重构缓存接口和redis

This commit is contained in:
黄孟柱
2022-10-18 19:12:26 +08:00
parent 3a0c03b365
commit b7f98c4750
12 changed files with 378 additions and 198 deletions

View File

@@ -0,0 +1,57 @@
package redis
import (
"context"
"errors"
"github.com/eolinker/apinto/resources"
"time"
)
var (
ErrorNotInitRedis = errors.New("redis not init")
intError = resources.NewIntResult(0, ErrorNotInitRedis)
boolError = resources.NewBoolResult(false, ErrorNotInitRedis)
stringError = resources.NewStringResult("", ErrorNotInitRedis)
statusError = resources.NewStatusResult(ErrorNotInitRedis)
)
type Empty struct {
}
func (e *Empty) Exec(ctx context.Context) error {
return ErrorNotInitRedis
}
func (e *Empty) Set(ctx context.Context, key string, value []byte, expiration time.Duration) resources.StatusResult {
return statusError
}
func (e *Empty) SetNX(ctx context.Context, key string, value []byte, expiration time.Duration) resources.BoolResult {
return boolError
}
func (e *Empty) DecrBy(ctx context.Context, key string, decrement int64) resources.IntResult {
return intError
}
func (e *Empty) IncrBy(ctx context.Context, key string, decrement int64) resources.IntResult {
return intError
}
func (e *Empty) Get(ctx context.Context, key string) resources.StringResult {
return stringError
}
func (e *Empty) GetDel(ctx context.Context, key string) resources.StringResult {
//TODO implement me
panic("implement me")
}
func (e *Empty) Del(ctx context.Context, keys ...string) resources.IntResult {
return intError
}
func (e *Empty) Tx() resources.TX {
return e
}

View File

@@ -1,8 +1,32 @@
package redis
import (
"context"
"fmt"
"github.com/eolinker/eosc"
"github.com/go-redis/redis/v8"
"time"
)
type Config struct {
Enable bool `json:"enable"`
Addrs []string `json:"addrs" label:"redis 节点列表"`
Username string `json:"username"`
Password string `json:"password"`
}
func (c *Config) connect() (*redis.ClusterClient, error) {
if len(c.Addrs) == 0 {
return nil, fmt.Errorf("addrs:%w", eosc.ErrorRequire)
}
nc := redis.NewClusterClient(&redis.ClusterOptions{
Addrs: c.Addrs,
Username: c.Username,
Password: c.Password,
})
timeout, _ := context.WithTimeout(context.Background(), time.Second)
if err := nc.Ping(timeout).Err(); err != nil {
nc.Close()
return nil, err
}
return nc, nil
}

View File

@@ -1,95 +0,0 @@
package redis
import (
"context"
"github.com/eolinker/apinto/resources"
"github.com/eolinker/eosc"
"github.com/eolinker/eosc/env"
"github.com/eolinker/eosc/log"
"github.com/go-redis/redis/v8"
"reflect"
)
type Controller struct {
current *_Cacher
config Config
}
func (m *Controller) ConfigType() reflect.Type {
return configType
}
func (m *Controller) shutdown() {
oldClient := m.current
if oldClient != nil {
m.current = nil
resources.ReplaceCacher()
oldClient.client.Close()
}
}
func (m *Controller) Set(conf interface{}) (err error) {
config, ok := conf.(*Config)
if ok && config != nil {
old := m.config
m.config = *config
if reflect.DeepEqual(old, m.config) {
return nil
}
if m.config.Enable && len(m.config.Addrs) > 0 {
client := redis.NewClusterClient(&redis.ClusterOptions{
Addrs: m.config.Addrs,
Username: m.config.Username,
Password: m.config.Password,
})
if res, errPing := client.Ping(context.Background()).Result(); errPing != nil {
log.Info("ping redis:", res, " error:", err)
client.Close()
return errPing
}
if env.Process() == eosc.ProcessWorker {
if m.current == nil {
m.current = newCacher(client)
resources.ReplaceCacher(m.current)
} else {
m.current.client = client
}
} else {
client.Close()
}
return nil
}
}
oldClient := m.current
if oldClient != nil {
resources.ReplaceCacher()
m.current = nil
oldClient.client.Close()
}
return nil
}
func (m *Controller) Get() interface{} {
return m.config
}
func (m *Controller) Mode() eosc.SettingMode {
return eosc.SettingModeSingleton
}
func (m *Controller) Check(cfg interface{}) (profession, name, driver, desc string, err error) {
err = eosc.ErrorUnsupportedKind
return
}
func (m *Controller) AllWorkers() []string {
return []string{"redis@setting"}
}
func NewController() *Controller {
return &Controller{}
}

View File

@@ -0,0 +1,43 @@
package redis
import (
"github.com/eolinker/eosc"
"reflect"
)
type Driver struct {
}
func (d *Driver) Check(v interface{}, workers map[eosc.RequireId]eosc.IWorker) error {
_, err := checkConfig(v)
return err
}
func checkConfig(v interface{}) (*Config, error) {
cfg, ok := v.(*Config)
if !ok {
return nil, eosc.ErrorConfigIsNil
}
return cfg, nil
}
func (d *Driver) ConfigType() reflect.Type {
return configType
}
func (d *Driver) Create(id, name string, v interface{}, workers map[eosc.RequireId]eosc.IWorker) (eosc.IWorker, error) {
if err := d.Check(v, workers); err != nil {
return nil, err
}
w := &Worker{
ICache: &Empty{},
config: nil,
client: nil,
id: id,
name: name,
}
err := w.Reset(v, workers)
if err != nil {
return nil, err
}
return w, nil
}

View File

@@ -2,20 +2,31 @@ package redis
import (
"github.com/eolinker/eosc"
"github.com/eolinker/eosc/setting"
"github.com/eolinker/eosc/utils/schema"
"reflect"
)
var (
singleton *Controller
_ eosc.ISetting = singleton
configType = reflect.TypeOf(new(Config))
configType = reflect.TypeOf(new(Config))
render interface{}
)
func init() {
singleton = NewController()
render, _ = schema.Generate(configType, nil)
}
func Register(register eosc.IExtenderDriverRegister) {
setting.RegisterSetting("redis", singleton)
register.RegisterExtenderDriver("redis", new(Factory))
}
type Factory struct {
}
func (f *Factory) Render() interface{} {
return render
}
func (f *Factory) Create(profession string, name string, label string, desc string, params map[string]interface{}) (eosc.IExtenderDriver, error) {
return new(Driver), nil
}

View File

@@ -7,59 +7,72 @@ import (
"time"
)
var (
_ resources.ICache = (*_Cacher)(nil)
)
type _Cacher struct {
client *redis.ClusterClient
type statusResult struct {
statusCmd *redis.StatusCmd
}
func (r *_Cacher) Close() error {
if r.client == nil {
e := r.client.Close()
r.client = nil
return e
func (s *statusResult) Result() error {
return s.statusCmd.Err()
}
type Cmdable struct {
cmdable redis.Cmdable
}
func (r *Cmdable) Tx() resources.TX {
tx := r.cmdable.TxPipeline()
return &TxPipeline{
Cmdable: Cmdable{
cmdable: tx,
},
p: tx,
}
return nil
}
func (r *_Cacher) Set(ctx context.Context, key string, value []byte, expiration time.Duration) error {
func (r *Cmdable) Set(ctx context.Context, key string, value []byte, expiration time.Duration) resources.StatusResult {
return r.client.Set(ctx, key, value, expiration).Err()
return &statusResult{statusCmd: r.cmdable.Set(ctx, key, value, expiration)}
}
func (r *_Cacher) SetNX(ctx context.Context, key string, value []byte, expiration time.Duration) (bool, error) {
func (r *Cmdable) SetNX(ctx context.Context, key string, value []byte, expiration time.Duration) resources.BoolResult {
return r.client.SetNX(ctx, key, value, expiration).Result()
return r.cmdable.SetNX(ctx, key, value, expiration)
}
func (r *_Cacher) DecrBy(ctx context.Context, key string, decrement int64) (int64, error) {
func (r *Cmdable) DecrBy(ctx context.Context, key string, decrement int64) resources.IntResult {
return r.client.DecrBy(ctx, key, decrement).Result()
return r.cmdable.DecrBy(ctx, key, decrement)
}
func (r *_Cacher) IncrBy(ctx context.Context, key string, decrement int64) (int64, error) {
return r.client.IncrBy(ctx, key, decrement).Result()
func (r *Cmdable) IncrBy(ctx context.Context, key string, decrement int64) resources.IntResult {
return r.cmdable.IncrBy(ctx, key, decrement)
}
func (r *_Cacher) Get(ctx context.Context, key string) ([]byte, error) {
return r.client.Get(ctx, key).Bytes()
func (r *Cmdable) Get(ctx context.Context, key string) resources.StringResult {
return r.cmdable.Get(ctx, key)
}
func (r *_Cacher) GetDel(ctx context.Context, key string) ([]byte, error) {
return r.client.GetDel(ctx, key).Bytes()
func (r *Cmdable) GetDel(ctx context.Context, key string) resources.StringResult {
return r.cmdable.GetDel(ctx, key)
}
func (r *_Cacher) Del(ctx context.Context, keys ...string) (int64, error) {
return r.client.Del(ctx, keys...).Result()
func (r *Cmdable) Del(ctx context.Context, keys ...string) resources.IntResult {
return r.cmdable.Del(ctx, keys...)
}
func newCacher(client *redis.ClusterClient) *_Cacher {
if client == nil {
return nil
}
return &_Cacher{client: client}
type TxPipeline struct {
Cmdable
p redis.Pipeliner
}
func (tx *TxPipeline) Tx() resources.TX {
return tx
}
func (tx *TxPipeline) Exec(ctx context.Context) error {
_, err := tx.p.Exec(ctx)
return err
}

View File

@@ -0,0 +1,81 @@
package redis
import (
"github.com/eolinker/apinto/resources"
"github.com/eolinker/eosc"
"github.com/go-redis/redis/v8"
"reflect"
)
type Worker struct {
resources.ICache
config *Config
client *redis.ClusterClient
id string
name string
isRunning bool
}
func (w *Worker) Id() string {
return w.id
}
func (w *Worker) Start() error {
if w.isRunning {
return nil
}
if len(w.config.Addrs) == 0 {
return eosc.ErrorConfigIsNil
}
client, err := w.config.connect()
if err != nil {
return err
}
w.client, w.ICache = client, &Cmdable{cmdable: client}
w.isRunning = true
return nil
}
func (w *Worker) Reset(conf interface{}, workers map[eosc.RequireId]eosc.IWorker) error {
cfg, err := checkConfig(conf)
if err != nil {
return err
}
if w.config == nil || !reflect.DeepEqual(w.config, cfg) {
w.config = cfg
client, err := cfg.connect()
if err != nil {
return err
}
if w.isRunning {
oc := w.client
w.client, w.ICache = client, &Cmdable{cmdable: client}
oc.Close()
} else {
client.Close()
}
}
return nil
}
func (w *Worker) Stop() error {
if !w.isRunning {
return eosc.ErrorWorkerNotRunning
}
w.isRunning = false
if w.client != nil {
w.ICache = &Empty{}
e := w.client.Close()
w.client = nil
return e
}
return nil
}
func (w *Worker) CheckSkill(skill string) bool {
return skill == resources.CacheSkill
}

View File

@@ -2,7 +2,6 @@ package limiting_strategy
import (
"github.com/eolinker/apinto/drivers/strategy/limiting-strategy/scalar"
"github.com/eolinker/apinto/strategy"
"github.com/eolinker/eosc/eocontext"
"sort"
"sync"
@@ -15,15 +14,16 @@ var (
func init() {
actuator := newActuator()
actuatorSet = actuator
strategy.AddStrategyHandler(actuator)
}
type ActuatorSet interface {
eocontext.IFilter
Set(id string, limiting *LimitingHandler)
Del(id string)
}
type tActuator struct {
type tActuatorSet struct {
lock sync.RWMutex
all map[string]*LimitingHandler
handlers []*LimitingHandler
@@ -31,24 +31,24 @@ type tActuator struct {
traffics scalar.Manager
}
func (a *tActuator) Destroy() {
func (a *tActuatorSet) Destroy() {
}
func (a *tActuator) Set(id string, limiting *LimitingHandler) {
func (a *tActuatorSet) Set(id string, limiting *LimitingHandler) {
// 调用来源有锁
a.all[id] = limiting
a.rebuild()
}
func (a *tActuator) Del(id string) {
func (a *tActuatorSet) Del(id string) {
// 调用来源有锁
delete(a.all, id)
a.rebuild()
}
func (a *tActuator) rebuild() {
func (a *tActuatorSet) rebuild() {
handlers := make([]*LimitingHandler, 0, len(a.all))
for _, h := range a.all {
@@ -61,15 +61,15 @@ func (a *tActuator) rebuild() {
defer a.lock.Unlock()
a.handlers = handlers
}
func newActuator() *tActuator {
return &tActuator{
func newActuator() *tActuatorSet {
return &tActuatorSet{
queryScalar: scalar.NewManager(),
traffics: scalar.NewManager(),
all: make(map[string]*LimitingHandler),
}
}
func (a *tActuator) DoFilter(ctx eocontext.EoContext, next eocontext.IChain) error {
func (a *tActuatorSet) DoFilter(ctx eocontext.EoContext, next eocontext.IChain) error {
a.lock.RLock()
handlers := a.handlers

1
go.mod
View File

@@ -11,6 +11,7 @@ require (
github.com/hashicorp/consul/api v1.9.1
github.com/nsqio/go-nsq v1.1.0
github.com/ohler55/ojg v1.12.9
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/pkg/sftp v1.13.4
github.com/robertkrimen/otto v0.0.0-20211024170158-b87d35c0b86f
github.com/satori/go.uuid v1.2.0

View File

@@ -3,6 +3,8 @@ package resources
import (
"context"
"github.com/coocood/freecache"
"github.com/patrickmn/go-cache"
"sync"
"time"
)
@@ -11,7 +13,16 @@ var (
)
type NoCache struct {
client *freecache.Cache
tx sync.Mutex
client *freecache.Cache
comparableCache *cache.Cache
}
func (n *NoCache) Tx(ctx context.Context, f func(c ICache) error) error {
n.tx.Lock()
defer n.tx.Unlock()
return f(n)
}
func (n *NoCache) Close() error {
@@ -34,6 +45,7 @@ func (n *NoCache) SetNX(ctx context.Context, key string, value []byte, expiratio
}
func (n *NoCache) DecrBy(ctx context.Context, key string, decrement int64) (int64, error) {
return 0, ErrorNoCache
}

View File

@@ -3,50 +3,102 @@ package resources
import (
"context"
"errors"
"github.com/eolinker/eosc/utils/config"
"time"
"unsafe"
)
var CacheSkill = config.TypeNameOf(ICache(nil))
var (
ErrorNoCache = errors.New("no cache")
_ ICache = (*_Proxy)(nil)
ErrorNoCache = errors.New("no cache")
)
var (
singCacheProxy *_Proxy
)
func init() {
singCacheProxy = newProxy(new(NoCache))
}
func ReplaceCacher(caches ...ICache) {
if len(caches) < 1 || caches[0] == nil {
if singCacheProxy.ICache != nil {
singCacheProxy.ICache.Close()
}
singCacheProxy.ICache = NewCacher()
return
}
singCacheProxy.ICache = caches[0]
}
func Cacher() ICache {
return singCacheProxy
}
type ICache interface {
Set(ctx context.Context, key string, value []byte, expiration time.Duration) error
SetNX(ctx context.Context, key string, value []byte, expiration time.Duration) (bool, error)
DecrBy(ctx context.Context, key string, decrement int64) (int64, error)
IncrBy(ctx context.Context, key string, decrement int64) (int64, error)
Get(ctx context.Context, key string) ([]byte, error)
GetDel(ctx context.Context, key string) ([]byte, error)
Del(ctx context.Context, keys ...string) (int64, error)
Close() error
}
Set(ctx context.Context, key string, value []byte, expiration time.Duration) StatusResult
SetNX(ctx context.Context, key string, value []byte, expiration time.Duration) BoolResult
DecrBy(ctx context.Context, key string, decrement int64) IntResult
IncrBy(ctx context.Context, key string, decrement int64) IntResult
Get(ctx context.Context, key string) StringResult
GetDel(ctx context.Context, key string) StringResult
Del(ctx context.Context, keys ...string) IntResult
Tx() TX
type _Proxy struct {
//Close() error
}
type TX interface {
ICache
Exec(ctx context.Context) error
}
type BoolResult interface {
Result() (bool, error)
}
type IntResult interface {
Result() (int64, error)
}
type StringResult interface {
Result() (string, error)
Bytes() ([]byte, error)
}
type StatusResult interface {
Result() error
}
func newProxy(target ICache) *_Proxy {
return &_Proxy{ICache: target}
type statusResult struct {
err error
}
func NewStatusResult(err error) *statusResult {
return &statusResult{err: err}
}
func (s *statusResult) Result() error {
return s.err
}
type stringResult struct {
err error
val string
}
func NewStringResult(val string, err error) *stringResult {
return &stringResult{err: err, val: val}
}
func (s *stringResult) Result() (string, error) {
return s.val, s.err
}
func (s *stringResult) Bytes() ([]byte, error) {
return *(*[]byte)(unsafe.Pointer(
&struct {
string
Cap int
}{s.val, len(s.val)},
)), s.err
}
type boolResult struct {
val bool
err error
}
func NewBoolResult(val bool, err error) *boolResult {
return &boolResult{val: val, err: err}
}
func (b *boolResult) Result() (bool, error) {
return b.val, b.err
}
type intResult struct {
val int64
err error
}
func NewIntResult(val int64, err error) *intResult {
return &intResult{val: val, err: err}
}
func (b *intResult) Result() (int64, error) {
return b.val, b.err
}

View File

@@ -1,20 +1 @@
package strategy
import "github.com/eolinker/eosc/eocontext"
type IStrategyManager interface {
AddStrategyHandler(handler eocontext.IFilter)
Strategy(ctx eocontext.EoContext, next eocontext.IChain) error
}
var (
handlers eocontext.Filters
)
func AddStrategyHandler(handler eocontext.IFilter) {
handlers = append(handlers, handler)
}
func Strategy(ctx eocontext.EoContext, next eocontext.IChain) error {
return eocontext.DoChain(ctx, handlers, eocontext.ToFilter(next))
}