From cac791dfd52e4131fd4c181e3f50d34c338ef752 Mon Sep 17 00:00:00 2001 From: kwin Date: Thu, 12 Sep 2024 11:40:17 +0800 Subject: [PATCH] =?UTF-8?q?=E9=9B=86=E6=88=90asynq=EF=BC=88=E5=9F=BA?= =?UTF-8?q?=E4=BA=8ERedis=E5=AE=9E=E7=8E=B0=E5=BC=82=E6=AD=A5=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E9=98=9F=E5=88=97=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Makefile | 5 +- README.md | 108 +++++++++++++++++++++++++++- asynq/mq_base.go | 76 ++++++++++++++++++++ asynq/test.go | 28 ++++++++ cmd/main.go | 1 + cmd/server/asynq.go | 163 +++++++++++++++++++++++++++++++++++++++++++ cmd/server/mq.go | 2 +- config.yaml | 6 +- core/global/core.go | 1 + core/logger/zap.go | 30 +++++--- crontab/cron_init.go | 4 +- go.mod | 6 +- mq/mq_base.go | 9 ++- router/router.go | 12 +++- 14 files changed, 427 insertions(+), 24 deletions(-) create mode 100644 asynq/mq_base.go create mode 100644 asynq/test.go create mode 100644 cmd/server/asynq.go diff --git a/Makefile b/Makefile index 4923bf0..b02362c 100644 --- a/Makefile +++ b/Makefile @@ -78,4 +78,7 @@ deploy-dev: make run swag-api: - @swag init --parseDependency --parseInternal --parseGoList=false --parseDepth=6 -o ./docs -d . \ No newline at end of file + @swag init --parseDependency --parseInternal --parseGoList=false --parseDepth=6 -o ./docs -d . + +go-run: + @go run main.go server -c config.yaml \ No newline at end of file diff --git a/README.md b/README.md index 3b3d0f8..7bdab8e 100644 --- a/README.md +++ b/README.md @@ -25,6 +25,7 @@ https://github.com/kwinH/fastApi 11. [go-nsq](github.com/nsqio/go-nsq): nsq 是一款基于 go 语言开发实现的分布式消息队列组件 12. [endless](github.com/fvbock/endless) 用于创建和管理 HTTP 服务器的 Go 包,特别是提供了优雅的停机、热重启支持功能 13. [OpenTelemetry](https://pkg.go.dev/go.opentelemetry.io/otel) 实现分布式追踪和指标收集的功能 +14. [asynq](github.com/hibiken/asynq) 基于 Redis 实现异步任务队列 本项目已经预先实现了一些常用的代码方便参考和复用: @@ -160,7 +161,7 @@ bin/fast-api-linux server restart bin/fast-api-linux cron -c config.yaml ``` -## 消息队列 +## nsq消息队列 ### 先启动nsq服务 @@ -178,14 +179,117 @@ nsq: consumer: 127.0.0.1:4161 ``` +### 生产者 +> 所有的生产者代码都放在`mq`目录下,并且都继承`BaseMQ`,`BaseMQ`中封装了`Producer`方法,只需要实现`HandleMessage`方法即可 +```go +package mq + +import ( + "context" + "fastApi/app/model" + "fastApi/core/logger" + "github.com/nsqio/go-nsq" +) + +type SendRegisteredEmail struct { + BaseMQ +} + +func (c *SendRegisteredEmail) HandleMessage(msg *nsq.Message) error { + return c.Handle(msg, func(ctx context.Context, data string) error { + logger.Log(ctx).Info("ok") + return nil + }) +} + +func init() { + MQList = append(MQList, NewSendRegisteredEmail()) +} + +func NewSendRegisteredEmail() *SendRegisteredEmail { + return &SendRegisteredEmail{ + BaseMQ: BaseMQ{ + Topic: "sendRegisteredEmail", + }} +} + +``` + +### 触发生产者 +```go +err := mq.NewSendRegisteredEmail().Producer(ctx, []byte("test"), 1*time.Second) +fmt.Printf("err:%v", err) +``` + ### 运行消费者 ```shell bin/fast-api-linux nq -c config.yaml ``` +## asysq消息队列(基于redis实现) + +### 开启config.yaml配置中的`asysq`选项 + +```yaml +... +asynq: + addr: 127.0.0.1:6379 + password: "Q#W*9hETg*fp0)@jWmXl" + db: 0 +``` + +### 生产者 +> 所有的生产者代码都放在`asynq`目录下,并且都继承`BaseMQ`,`BaseMQ`中封装了`Producer`方法,只需要实现`Consumer`方法即可 +```go +package asynq + +import ( + "context" + "fastApi/core/logger" + "github.com/hibiken/asynq" +) + +func init() { + MQList = append(MQList, NewTest()) +} + +type Test struct { + BaseMQ +} + +func NewTest() *Test { + return &Test{ + BaseMQ: BaseMQ{ + Typename: "test", + }} +} + +func (c *Test) Consumer(ctx context.Context, t *asynq.Task) error { + logger.SLog(ctx).Infof("type: %v, payload: %s", t.Type(), string(t.Payload())) + + return nil +} + + +``` + +### 触发生产者 +```go +rawData := `{"name":"kwinwong"}` +err := asynq.NewTest().Producer(c, []byte(rawData)) +fmt.Printf("err:%v", err) +``` + +### 运行消费者 + +```shell +bin/fast-api-linux asynq -c config.yaml +``` + + # 链路追踪 -fast-api 中基于OpenTelemetry集成了链路追踪,配置如下: +fast-api 中基于OpenTelemetry集成了链路追踪,`config.yaml`配置如下: ```yaml #链路追踪 telemetry: diff --git a/asynq/mq_base.go b/asynq/mq_base.go new file mode 100644 index 0000000..8f17709 --- /dev/null +++ b/asynq/mq_base.go @@ -0,0 +1,76 @@ +package asynq + +import ( + "context" + "encoding/json" + "fastApi/core/logger" + "github.com/hibiken/asynq" + "github.com/spf13/viper" +) + +var AsynqClient *asynq.Client +var MQList []InterfaceMQ + +type InterfaceMQ interface { + Producer(ctx context.Context, message []byte, opts ...asynq.Option) error + Consumer(ctx context.Context, t *asynq.Task) error + GetTypename() string +} + +type BaseMQ struct { + Typename string +} + +func (c *BaseMQ) GetTypename() string { + return c.Typename +} + +func GetAsynqClient() { + if AsynqClient == nil { + r := asynq.RedisClientOpt{ + Addr: viper.GetString("asynq.addr"), + Password: viper.GetString("asynq.password"), + DB: viper.GetInt("asynq.db"), + } + AsynqClient = asynq.NewClient(r) + } +} + +func (c *BaseMQ) Producer(ctx context.Context, payload []byte, opts ...asynq.Option) (err error) { + GetAsynqClient() + + traceId := ctx.Value(logger.TraceId).(string) + if traceId == "" { + traceId, _ = logger.CalcTraceId(ctx) + } + + data := map[string]string{ + "traceId": traceId, + "payload": string(payload), + } + payload, _ = json.Marshal(data) + + task := asynq.NewTask(c.GetTypename(), payload) + + //延时消费 5秒后消费 + //opts = append(opts, asynq.ProcessIn(5*time.Second)) + + // 10秒超时 + //opts = append(opts, asynq.Timeout(10*time.Second)) + + //最多重试次参数 + opts = append(opts, asynq.MaxRetry(100)) + + //20秒后过期 + //opts = append(opts, asynq.Deadline(time.Now().Add(20*time.Second))) + + res, err := AsynqClient.Enqueue(task, opts...) + + if err != nil { + logger.SLog(ctx).Errorf("加入队列(%s)失败:%v", res.Type, err.Error()) + } else { + logger.SLog(ctx).Infof("加入队列:%s成功,参数:%s", res.Type, string(payload)) + } + + return +} diff --git a/asynq/test.go b/asynq/test.go new file mode 100644 index 0000000..6f9417f --- /dev/null +++ b/asynq/test.go @@ -0,0 +1,28 @@ +package asynq + +import ( + "context" + "fastApi/core/logger" + "github.com/hibiken/asynq" +) + +func init() { + MQList = append(MQList, NewTest()) +} + +type Test struct { + BaseMQ +} + +func NewTest() *Test { + return &Test{ + BaseMQ: BaseMQ{ + Typename: "test", + }} +} + +func (c *Test) Consumer(ctx context.Context, t *asynq.Task) error { + logger.SLog(ctx).Infof("type: %v, payload: %s", t.Type(), string(t.Payload())) + + return nil +} diff --git a/cmd/main.go b/cmd/main.go index f915bde..efa097f 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -22,6 +22,7 @@ func init() { RootCmd.AddCommand(server.StartApi) RootCmd.AddCommand(server.StartCmd) RootCmd.AddCommand(server.StartMQ) + RootCmd.AddCommand(server.StartAsynq) } func persistentPreRun(cmd *cobra.Command, args []string) { diff --git a/cmd/server/asynq.go b/cmd/server/asynq.go new file mode 100644 index 0000000..deda46a --- /dev/null +++ b/cmd/server/asynq.go @@ -0,0 +1,163 @@ +package server + +import ( + "context" + "encoding/json" + nq "fastApi/asynq" + "fastApi/core/global" + "fastApi/core/logger" + "fastApi/util" + "fmt" + "github.com/hibiken/asynq" + "github.com/spf13/cobra" + "github.com/spf13/viper" + "go.opentelemetry.io/otel/attribute" + oteltrace "go.opentelemetry.io/otel/trace" + "go.uber.org/zap" + "os" + "os/signal" + "syscall" + "time" +) + +var ( + StartAsynq = &cobra.Command{ + Use: "asynq", + Short: "Welcome to a Tour of Asynq!", + Example: "Asynq 是一个由Redis实现的异步队列 Go 库", + PreRun: func(cmd *cobra.Command, args []string) { + + }, + RunE: func(cmd *cobra.Command, args []string) error { + return run() + }, + } +) + +func run() error { + // asynq server + srv := asynq.NewServer( + asynq.RedisClientOpt{ + Addr: viper.GetString("asynq.addr"), + Password: viper.GetString("asynq.password"), + DB: viper.GetInt("asynq.db"), + }, + asynq.Config{ + Logger: global.SLog, + // Concurrency: 20, //worker数量,默认启动的worker数量是服务器的CPU个数 + RetryDelayFunc: func(n int, e error, t *asynq.Task) time.Duration { + delay := 1 * time.Second + if n < 15 { + delay = 60 * time.Second + } else if n < 25 { + delay = 300 * time.Second + } else if n < 60 { + delay = 600 * time.Second + } else { + delay = 3600 * time.Second + } + + return delay + }, + }, + ) + + mux := asynq.NewServeMux() + + // some middlewares + mux.Use(func(next asynq.Handler) asynq.Handler { + return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error { + var log *zap.Logger + var span oteltrace.Span + + defer func() { + if r := recover(); r != nil { + if log == nil { + log = logger.Log(ctx).With( + zap.String("url", t.Type()), + ) + } + log.Sugar().Errorf("panic: %v", r) + } + }() + + ctx, span, _ = util.ContextWithSpanContext(ctx, "", "", "queue-consumer", t.Type()) + if span != nil { + defer span.End() + } + + startTime := time.Now() + var data map[string]string + err := json.Unmarshal(t.Payload(), &data) + + if err != nil { + log = logger.Log(ctx).With( + zap.String("url", t.Type()), + zap.String("params", string(t.Payload())), + ) + + log.Error("数据解析失败: " + err.Error()) + } + + if span != nil { + span.SetAttributes(attribute.String("traceId", data["traceId"])) + } + + ctx = logger.WithC(ctx, + zap.String("url", t.Type()), + zap.String("traceId", data["traceId"]), + ) + + t = asynq.NewTask(t.Type(), []byte(data["payload"])) + err = next.ProcessTask(ctx, t) + + endTime := time.Now() + latencyTime := endTime.Sub(startTime) + log = logger.Log(ctx).With( + zap.String("params", data["payload"]), + zap.Duration("runtime", latencyTime), + ) + if err != nil { + log.Error("任务执行失败: " + err.Error()) + } else { + log.Info("任务执行成功") + } + + return err + }) + }) + + for _, q := range nq.MQList { + mux.HandleFunc(q.GetTypename(), q.Consumer) + } + + // start server + if err := srv.Start(mux); err != nil { + panic(err) + } + + // Wait for termination signal. + c := make(chan os.Signal, 1) + + signal.Notify(c, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, + syscall.SIGQUIT, + //syscall.SIGUSR1, syscall.SIGUSR2, + ) + + for { + s := <-c + switch s { + case syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT: + fmt.Println("Program Exit...", s) + srv.Shutdown() + srv.Stop() + return nil + //case syscall.SIGUSR1: + // fmt.Println("usr1 signal", s) + //case syscall.SIGUSR2: + // fmt.Println("usr2 signal", s) + default: + fmt.Println("other signal", s) + } + } +} diff --git a/cmd/server/mq.go b/cmd/server/mq.go index 9569971..625d5ab 100644 --- a/cmd/server/mq.go +++ b/cmd/server/mq.go @@ -32,7 +32,7 @@ func initConsumer(topic string, channel string, address string, handler nsq.Hand if err != nil { panic(err) } - c.SetLoggerLevel(nsq.LogLevelWarning) //屏蔽系统日志 + c.SetLoggerLevel(nsq.LogLevelWarning) //支显示警告级别及以上的日志 c.AddHandler(handler) // 添加消费者接口 //建立NSQLookupd连接 diff --git a/config.yaml b/config.yaml index 3250505..31fd62a 100644 --- a/config.yaml +++ b/config.yaml @@ -18,7 +18,6 @@ redis: password: "" db: 0 - jwt: # token 密钥 key: "dgE7B6SPrS%9yLE" @@ -35,6 +34,11 @@ logger: # producer: 127.0.0.1:4150 # consumer: 127.0.0.1:4161 +#asynq: +# addr: 127.0.0.1:6379 +# password: "Q#W*9hETg*fp0)@jWmXl" +# db: 0 + #链路追踪 #telemetry: # name: fast-api diff --git a/core/global/core.go b/core/global/core.go index 361f97b..e30b0d9 100644 --- a/core/global/core.go +++ b/core/global/core.go @@ -15,6 +15,7 @@ const DBKey = "DB" var ( Trans ut.Translator // 定义一个全局翻译器T Log *zap.Logger + SLog *zap.SugaredLogger GDB *gorm.DB // DB 数据库链接单例 Redis *redis.Client Producer *nsq.Producer diff --git a/core/logger/zap.go b/core/logger/zap.go index 85b11d6..111ccd2 100644 --- a/core/logger/zap.go +++ b/core/logger/zap.go @@ -9,6 +9,7 @@ import ( "go.opentelemetry.io/otel/trace" "go.uber.org/zap" "go.uber.org/zap/zapcore" + "gorm.io/gorm" "io" "os" ) @@ -41,6 +42,7 @@ func InitLogger() *zap.Logger { ) global.Log = zap.New(zapCore) + global.SLog = global.Log.Sugar() logger = global.Log return logger } @@ -68,24 +70,30 @@ func CalcTraceId(ctx context.Context) (traceId, spanId string) { return uuid.New().String(), "" } -func With(c *gin.Context, fields ...zap.Field) { +func injectToContext(log *zap.Logger, slog *zap.SugaredLogger, db *gorm.DB, store func(key string, value interface{})) { + store(loggerKey, log) + store(loggerSugarKey, slog) + store(global.DBKey, db) +} + +func createLoggerAndDB(fields ...zap.Field) (*zap.Logger, *zap.SugaredLogger, *gorm.DB) { log := logger.With(fields...) slog := log.Sugar() db := global.GDB db.Logger = NewGormLog(log) - c.Set(loggerKey, log) - c.Set(loggerSugarKey, slog) - c.Set(global.DBKey, db) + return log, slog, db +} + +func With(c *gin.Context, fields ...zap.Field) { + log, slog, db := createLoggerAndDB(fields...) + injectToContext(log, slog, db, c.Set) } func WithC(c context.Context, fields ...zap.Field) context.Context { - log := logger.With(fields...) - slog := log.Sugar() - db := global.GDB - db.Logger = NewGormLog(log) - c = context.WithValue(c, loggerKey, log) - c = context.WithValue(c, loggerSugarKey, slog) - c = context.WithValue(c, global.DBKey, db) + log, slog, db := createLoggerAndDB(fields...) + injectToContext(log, slog, db, func(key string, value interface{}) { + c = context.WithValue(c, key, value) + }) return c } diff --git a/crontab/cron_init.go b/crontab/cron_init.go index 95a4dff..8964983 100644 --- a/crontab/cron_init.go +++ b/crontab/cron_init.go @@ -50,7 +50,7 @@ func CronInit() { Cron.Start() } -func WithRequestId(ctx context.Context, name, traceId string) context.Context { +func WithTraceId(ctx context.Context, name, traceId string) context.Context { return logger.WithC( ctx, zap.String("traceId", traceId), @@ -63,7 +63,7 @@ func BaseCronFuc(name string, cmd func(context.Context)) func() { traceId := uuid.New().String() ctx := context.WithValue(context.Background(), logger.TraceId, traceId) - ctx = WithRequestId(ctx, name, traceId) + ctx = WithTraceId(ctx, name, traceId) cmd(ctx) } } diff --git a/go.mod b/go.mod index 47258c6..f4022e2 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( github.com/go-redis/redis/v8 v8.11.5 github.com/golang-jwt/jwt/v4 v4.4.2 github.com/google/uuid v1.6.0 + github.com/hibiken/asynq v0.24.1 github.com/nsqio/go-nsq v1.1.0 github.com/robfig/cron/v3 v3.0.1 github.com/spf13/cobra v1.5.0 @@ -38,7 +39,7 @@ require ( github.com/KyleBanks/depth v1.2.1 // indirect github.com/bytedance/sonic v1.12.1 // indirect github.com/bytedance/sonic/loader v0.2.0 // indirect - github.com/cespare/xxhash/v2 v2.1.2 // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/cloudwego/base64x v0.1.4 // indirect github.com/cloudwego/iasm v0.2.0 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect @@ -54,6 +55,7 @@ require ( github.com/go-redis/redis/extra/rediscmd/v8 v8.11.5 // indirect github.com/go-sql-driver/mysql v1.7.0 // indirect github.com/goccy/go-json v0.10.3 // indirect + github.com/golang/protobuf v1.5.3 // indirect github.com/golang/snappy v0.0.3 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/inconshreveable/mousetrap v1.0.0 // indirect @@ -71,6 +73,7 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pelletier/go-toml v1.9.5 // indirect github.com/pelletier/go-toml/v2 v2.2.2 // indirect + github.com/redis/go-redis/v9 v9.0.3 // indirect github.com/spf13/afero v1.8.2 // indirect github.com/spf13/cast v1.5.0 // indirect github.com/spf13/jwalterweatherman v1.1.0 // indirect @@ -85,6 +88,7 @@ require ( golang.org/x/net v0.28.0 // indirect golang.org/x/sys v0.24.0 // indirect golang.org/x/text v0.17.0 // indirect + golang.org/x/time v0.0.0-20191024005414-555d28b269f0 // indirect golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect google.golang.org/protobuf v1.34.2 // indirect gopkg.in/ini.v1 v1.66.4 // indirect diff --git a/mq/mq_base.go b/mq/mq_base.go index 3d9c61a..623c946 100644 --- a/mq/mq_base.go +++ b/mq/mq_base.go @@ -89,6 +89,12 @@ func (b *BaseMQ) Handle(msg *nsq.Message, h HandleFunc) error { } }() + var span oteltrace.Span + ctx, span, _ = util.ContextWithSpanContext(ctx, "", "", "queue-consumer", b.GetTopic()) + if span != nil { + defer span.End() + } + startTime := time.Now() var data map[string]string @@ -102,11 +108,8 @@ func (b *BaseMQ) Handle(msg *nsq.Message, h HandleFunc) error { ).Error("数据解析失败: " + err.Error()) } - var span oteltrace.Span - ctx, span, _ = util.ContextWithSpanContext(ctx, "", "", "queue-consumer", b.GetTopic()) if span != nil { span.SetAttributes(attribute.String("traceId", data["traceId"])) - defer span.End() } ctx = context.WithValue(ctx, logger.TraceId, data["traceId"]) diff --git a/router/router.go b/router/router.go index 125a0f5..6dd8743 100644 --- a/router/router.go +++ b/router/router.go @@ -1,6 +1,7 @@ package router import ( + "fastApi/asynq" "fastApi/core/middleware" _ "fastApi/docs" // 千万不要忘了导入把你上一步生成的docs "fastApi/mq" @@ -35,12 +36,19 @@ func initGin() *gin.Engine { c.String(200, "pong") }) - engine.GET("/queue_test", func(c *gin.Context) { + engine.GET("/nsq_test", func(c *gin.Context) { err := mq.NewSendRegisteredEmail().Producer(c, []byte("test"), 1*time.Second) - fmt.Printf("\n\n%#v\n\n", err) + fmt.Printf("err:%#v", err) c.String(200, "pong") }) + engine.GET("/asynq_test", func(c *gin.Context) { + rawData := `{"name":"kwinwong"}` + err := asynq.NewTest().Producer(c, []byte(rawData)) + fmt.Printf("err:%#v", err) + + c.String(200, "pong") + }) return engine }