Files
eagle/pkg/cache/redis.go
2023-08-08 18:03:43 +08:00

185 lines
5.0 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package cache
import (
"context"
"reflect"
"time"
"github.com/pkg/errors"
"github.com/redis/go-redis/v9"
"github.com/go-eagle/eagle/pkg/encoding"
"github.com/go-eagle/eagle/pkg/log"
)
// redisCache redis cache结构体
type redisCache struct {
client *redis.Client
KeyPrefix string
encoding encoding.Encoding
DefaultExpireTime time.Duration
newObject func() interface{}
}
// NewRedisCache new一个cache, client 参数是可传入的,方便进行单元测试
func NewRedisCache(client *redis.Client, keyPrefix string, encoding encoding.Encoding, newObject func() interface{}) Cache {
return &redisCache{
client: client,
KeyPrefix: keyPrefix,
encoding: encoding,
newObject: newObject,
}
}
func (c *redisCache) Set(ctx context.Context, key string, val interface{}, expiration time.Duration) error {
buf, err := encoding.Marshal(c.encoding, val)
if err != nil {
return errors.Wrapf(err, "marshal data err, value is %+v", val)
}
cacheKey, err := BuildCacheKey(c.KeyPrefix, key)
if err != nil {
return errors.Wrapf(err, "build cache key err, key is %+v", key)
}
if expiration == 0 {
expiration = DefaultExpireTime
}
err = c.client.Set(ctx, cacheKey, buf, expiration).Err()
if err != nil {
return errors.Wrapf(err, "redis set err: %+v", err)
}
return nil
}
func (c *redisCache) Get(ctx context.Context, key string, val interface{}) error {
cacheKey, err := BuildCacheKey(c.KeyPrefix, key)
if err != nil {
return errors.Wrapf(err, "build cache key err, key is %+v", key)
}
bytes, err := c.client.Get(ctx, cacheKey).Bytes()
// NOTE: don't handle the case where redis value is nil
// but leave it to the upstream for processing if need
if err != nil {
return err
}
// 防止data为空时Unmarshal报错
if string(bytes) == "" {
return nil
}
if string(bytes) == NotFoundPlaceholder {
return ErrPlaceholder
}
err = encoding.Unmarshal(c.encoding, bytes, val)
if err != nil {
return errors.Wrapf(err, "unmarshal data error, key=%s, cacheKey=%s type=%v, json is %+v ",
key, cacheKey, reflect.TypeOf(val), string(bytes))
}
return nil
}
func (c *redisCache) MultiSet(ctx context.Context, valueMap map[string]interface{}, expiration time.Duration) error {
if len(valueMap) == 0 {
return nil
}
if expiration == 0 {
expiration = DefaultExpireTime
}
// key-value是成对的所以这里的容量是map的2倍
paris := make([]interface{}, 0, 2*len(valueMap))
for key, value := range valueMap {
buf, err := encoding.Marshal(c.encoding, value)
if err != nil {
log.Warnf("marshal data err: %+v, value is %+v", err, value)
continue
}
cacheKey, err := BuildCacheKey(c.KeyPrefix, key)
if err != nil {
log.Warnf("build cache key err: %+v, key is %+v", err, key)
continue
}
paris = append(paris, []byte(cacheKey))
paris = append(paris, buf)
}
pipeline := c.client.Pipeline()
err := pipeline.MSet(ctx, paris...).Err()
if err != nil {
return errors.Wrapf(err, "redis multi set error")
}
for i := 0; i < len(paris); i = i + 2 {
switch paris[i].(type) {
case []byte:
pipeline.Expire(ctx, string(paris[i].([]byte)), expiration)
default:
log.Warnf("redis expire is unsupported key type: %+v", reflect.TypeOf(paris[i]))
}
}
_, err = pipeline.Exec(ctx)
if err != nil {
return errors.Wrapf(err, "redis multi set pipeline exec error")
}
return nil
}
func (c *redisCache) MultiGet(ctx context.Context, keys []string, value interface{}) error {
if len(keys) == 0 {
return nil
}
cacheKeys := make([]string, len(keys))
for index, key := range keys {
cacheKey, err := BuildCacheKey(c.KeyPrefix, key)
if err != nil {
return errors.Wrapf(err, "build cache key err, key is %+v", key)
}
cacheKeys[index] = cacheKey
}
values, err := c.client.MGet(ctx, cacheKeys...).Result()
if err != nil {
return errors.Wrapf(err, "redis MGet error, keys is %+v", keys)
}
// 通过反射注入到map
valueMap := reflect.ValueOf(value)
for i, value := range values {
if value == nil {
continue
}
object := c.newObject()
err = encoding.Unmarshal(c.encoding, []byte(value.(string)), object)
if err != nil {
log.Warnf("unmarshal data error: %+v, key=%s, cacheKey=%s type=%v", err,
keys[i], cacheKeys[i], reflect.TypeOf(value))
continue
}
valueMap.SetMapIndex(reflect.ValueOf(cacheKeys[i]), reflect.ValueOf(object))
}
return nil
}
func (c *redisCache) Del(ctx context.Context, keys ...string) error {
if len(keys) == 0 {
return nil
}
// 批量构建cacheKey
cacheKeys := make([]string, len(keys))
for index, key := range keys {
cacheKey, err := BuildCacheKey(c.KeyPrefix, key)
if err != nil {
log.Warnf("build cache key err: %+v, key is %+v", err, key)
continue
}
cacheKeys[index] = cacheKey
}
err := c.client.Del(ctx, cacheKeys...).Err()
if err != nil {
return errors.Wrapf(err, "redis delete error, keys is %+v", keys)
}
return nil
}
func (c *redisCache) SetCacheWithNotFound(ctx context.Context, key string) error {
return c.client.Set(ctx, key, NotFoundPlaceholder, DefaultNotFoundExpireTime).Err()
}