mirror of
https://github.com/go-eagle/eagle.git
synced 2025-09-27 12:52:11 +08:00
185 lines
5.0 KiB
Go
185 lines
5.0 KiB
Go
package cache
|
||
|
||
import (
|
||
"context"
|
||
"reflect"
|
||
"time"
|
||
|
||
"github.com/pkg/errors"
|
||
"github.com/redis/go-redis/v9"
|
||
|
||
"github.com/go-eagle/eagle/pkg/encoding"
|
||
"github.com/go-eagle/eagle/pkg/log"
|
||
)
|
||
|
||
// redisCache redis cache结构体
|
||
type redisCache struct {
|
||
client *redis.Client
|
||
KeyPrefix string
|
||
encoding encoding.Encoding
|
||
DefaultExpireTime time.Duration
|
||
newObject func() interface{}
|
||
}
|
||
|
||
// NewRedisCache new一个cache, client 参数是可传入的,方便进行单元测试
|
||
func NewRedisCache(client *redis.Client, keyPrefix string, encoding encoding.Encoding, newObject func() interface{}) Cache {
|
||
return &redisCache{
|
||
client: client,
|
||
KeyPrefix: keyPrefix,
|
||
encoding: encoding,
|
||
newObject: newObject,
|
||
}
|
||
}
|
||
|
||
func (c *redisCache) Set(ctx context.Context, key string, val interface{}, expiration time.Duration) error {
|
||
buf, err := encoding.Marshal(c.encoding, val)
|
||
if err != nil {
|
||
return errors.Wrapf(err, "marshal data err, value is %+v", val)
|
||
}
|
||
|
||
cacheKey, err := BuildCacheKey(c.KeyPrefix, key)
|
||
if err != nil {
|
||
return errors.Wrapf(err, "build cache key err, key is %+v", key)
|
||
}
|
||
if expiration == 0 {
|
||
expiration = DefaultExpireTime
|
||
}
|
||
err = c.client.Set(ctx, cacheKey, buf, expiration).Err()
|
||
if err != nil {
|
||
return errors.Wrapf(err, "redis set err: %+v", err)
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func (c *redisCache) Get(ctx context.Context, key string, val interface{}) error {
|
||
cacheKey, err := BuildCacheKey(c.KeyPrefix, key)
|
||
if err != nil {
|
||
return errors.Wrapf(err, "build cache key err, key is %+v", key)
|
||
}
|
||
|
||
bytes, err := c.client.Get(ctx, cacheKey).Bytes()
|
||
// NOTE: don't handle the case where redis value is nil
|
||
// but leave it to the upstream for processing if need
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
// 防止data为空时,Unmarshal报错
|
||
if string(bytes) == "" {
|
||
return nil
|
||
}
|
||
if string(bytes) == NotFoundPlaceholder {
|
||
return ErrPlaceholder
|
||
}
|
||
err = encoding.Unmarshal(c.encoding, bytes, val)
|
||
if err != nil {
|
||
return errors.Wrapf(err, "unmarshal data error, key=%s, cacheKey=%s type=%v, json is %+v ",
|
||
key, cacheKey, reflect.TypeOf(val), string(bytes))
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func (c *redisCache) MultiSet(ctx context.Context, valueMap map[string]interface{}, expiration time.Duration) error {
|
||
if len(valueMap) == 0 {
|
||
return nil
|
||
}
|
||
if expiration == 0 {
|
||
expiration = DefaultExpireTime
|
||
}
|
||
// key-value是成对的,所以这里的容量是map的2倍
|
||
paris := make([]interface{}, 0, 2*len(valueMap))
|
||
for key, value := range valueMap {
|
||
buf, err := encoding.Marshal(c.encoding, value)
|
||
if err != nil {
|
||
log.Warnf("marshal data err: %+v, value is %+v", err, value)
|
||
continue
|
||
}
|
||
cacheKey, err := BuildCacheKey(c.KeyPrefix, key)
|
||
if err != nil {
|
||
log.Warnf("build cache key err: %+v, key is %+v", err, key)
|
||
continue
|
||
}
|
||
paris = append(paris, []byte(cacheKey))
|
||
paris = append(paris, buf)
|
||
}
|
||
pipeline := c.client.Pipeline()
|
||
err := pipeline.MSet(ctx, paris...).Err()
|
||
if err != nil {
|
||
return errors.Wrapf(err, "redis multi set error")
|
||
}
|
||
for i := 0; i < len(paris); i = i + 2 {
|
||
switch paris[i].(type) {
|
||
case []byte:
|
||
pipeline.Expire(ctx, string(paris[i].([]byte)), expiration)
|
||
default:
|
||
log.Warnf("redis expire is unsupported key type: %+v", reflect.TypeOf(paris[i]))
|
||
}
|
||
}
|
||
_, err = pipeline.Exec(ctx)
|
||
if err != nil {
|
||
return errors.Wrapf(err, "redis multi set pipeline exec error")
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func (c *redisCache) MultiGet(ctx context.Context, keys []string, value interface{}) error {
|
||
if len(keys) == 0 {
|
||
return nil
|
||
}
|
||
cacheKeys := make([]string, len(keys))
|
||
for index, key := range keys {
|
||
cacheKey, err := BuildCacheKey(c.KeyPrefix, key)
|
||
if err != nil {
|
||
return errors.Wrapf(err, "build cache key err, key is %+v", key)
|
||
}
|
||
cacheKeys[index] = cacheKey
|
||
}
|
||
values, err := c.client.MGet(ctx, cacheKeys...).Result()
|
||
if err != nil {
|
||
return errors.Wrapf(err, "redis MGet error, keys is %+v", keys)
|
||
}
|
||
|
||
// 通过反射注入到map
|
||
valueMap := reflect.ValueOf(value)
|
||
for i, value := range values {
|
||
if value == nil {
|
||
continue
|
||
}
|
||
object := c.newObject()
|
||
err = encoding.Unmarshal(c.encoding, []byte(value.(string)), object)
|
||
if err != nil {
|
||
log.Warnf("unmarshal data error: %+v, key=%s, cacheKey=%s type=%v", err,
|
||
keys[i], cacheKeys[i], reflect.TypeOf(value))
|
||
continue
|
||
}
|
||
valueMap.SetMapIndex(reflect.ValueOf(cacheKeys[i]), reflect.ValueOf(object))
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func (c *redisCache) Del(ctx context.Context, keys ...string) error {
|
||
if len(keys) == 0 {
|
||
return nil
|
||
}
|
||
|
||
// 批量构建cacheKey
|
||
cacheKeys := make([]string, len(keys))
|
||
for index, key := range keys {
|
||
cacheKey, err := BuildCacheKey(c.KeyPrefix, key)
|
||
if err != nil {
|
||
log.Warnf("build cache key err: %+v, key is %+v", err, key)
|
||
continue
|
||
}
|
||
cacheKeys[index] = cacheKey
|
||
}
|
||
err := c.client.Del(ctx, cacheKeys...).Err()
|
||
if err != nil {
|
||
return errors.Wrapf(err, "redis delete error, keys is %+v", keys)
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func (c *redisCache) SetCacheWithNotFound(ctx context.Context, key string) error {
|
||
return c.client.Set(ctx, key, NotFoundPlaceholder, DefaultNotFoundExpireTime).Err()
|
||
}
|