集成asynq(基于Redis实现异步任务队列)

This commit is contained in:
kwin
2024-09-12 11:40:17 +08:00
parent 076d7f291d
commit cac791dfd5
14 changed files with 427 additions and 24 deletions

View File

@@ -78,4 +78,7 @@ deploy-dev:
make run
swag-api:
@swag init --parseDependency --parseInternal --parseGoList=false --parseDepth=6 -o ./docs -d .
@swag init --parseDependency --parseInternal --parseGoList=false --parseDepth=6 -o ./docs -d .
go-run:
@go run main.go server -c config.yaml

108
README.md
View File

@@ -25,6 +25,7 @@ https://github.com/kwinH/fastApi
11. [go-nsq](github.com/nsqio/go-nsq): nsq 是一款基于 go 语言开发实现的分布式消息队列组件
12. [endless](github.com/fvbock/endless) 用于创建和管理 HTTP 服务器的 Go 包,特别是提供了优雅的停机、热重启支持功能
13. [OpenTelemetry](https://pkg.go.dev/go.opentelemetry.io/otel) 实现分布式追踪和指标收集的功能
14. [asynq](github.com/hibiken/asynq) 基于 Redis 实现异步任务队列
本项目已经预先实现了一些常用的代码方便参考和复用:
@@ -160,7 +161,7 @@ bin/fast-api-linux server restart
bin/fast-api-linux cron -c config.yaml
```
## 消息队列
## nsq消息队列
### 先启动nsq服务
@@ -178,14 +179,117 @@ nsq:
consumer: 127.0.0.1:4161
```
### 生产者
> 所有的生产者代码都放在`mq`目录下,并且都继承`BaseMQ``BaseMQ`中封装了`Producer`方法,只需要实现`HandleMessage`方法即可
```go
package mq
import (
"context"
"fastApi/app/model"
"fastApi/core/logger"
"github.com/nsqio/go-nsq"
)
type SendRegisteredEmail struct {
BaseMQ
}
func (c *SendRegisteredEmail) HandleMessage(msg *nsq.Message) error {
return c.Handle(msg, func(ctx context.Context, data string) error {
logger.Log(ctx).Info("ok")
return nil
})
}
func init() {
MQList = append(MQList, NewSendRegisteredEmail())
}
func NewSendRegisteredEmail() *SendRegisteredEmail {
return &SendRegisteredEmail{
BaseMQ: BaseMQ{
Topic: "sendRegisteredEmail",
}}
}
```
### 触发生产者
```go
err := mq.NewSendRegisteredEmail().Producer(ctx, []byte("test"), 1*time.Second)
fmt.Printf("err%v", err)
```
### 运行消费者
```shell
bin/fast-api-linux nq -c config.yaml
```
## asysq消息队列基于redis实现
### 开启config.yaml配置中的`asysq`选项
```yaml
...
asynq:
addr: 127.0.0.1:6379
password: "Q#W*9hETg*fp0)@jWmXl"
db: 0
```
### 生产者
> 所有的生产者代码都放在`asynq`目录下,并且都继承`BaseMQ``BaseMQ`中封装了`Producer`方法,只需要实现`Consumer`方法即可
```go
package asynq
import (
"context"
"fastApi/core/logger"
"github.com/hibiken/asynq"
)
func init() {
MQList = append(MQList, NewTest())
}
type Test struct {
BaseMQ
}
func NewTest() *Test {
return &Test{
BaseMQ: BaseMQ{
Typename: "test",
}}
}
func (c *Test) Consumer(ctx context.Context, t *asynq.Task) error {
logger.SLog(ctx).Infof("type: %v, payload: %s", t.Type(), string(t.Payload()))
return nil
}
```
### 触发生产者
```go
rawData := `{"name":"kwinwong"}`
err := asynq.NewTest().Producer(c, []byte(rawData))
fmt.Printf("err%v", err)
```
### 运行消费者
```shell
bin/fast-api-linux asynq -c config.yaml
```
# 链路追踪
fast-api 中基于OpenTelemetry集成了链路追踪配置如下
fast-api 中基于OpenTelemetry集成了链路追踪`config.yaml`配置如下:
```yaml
#链路追踪
telemetry:

76
asynq/mq_base.go Normal file
View File

@@ -0,0 +1,76 @@
package asynq
import (
"context"
"encoding/json"
"fastApi/core/logger"
"github.com/hibiken/asynq"
"github.com/spf13/viper"
)
var AsynqClient *asynq.Client
var MQList []InterfaceMQ
type InterfaceMQ interface {
Producer(ctx context.Context, message []byte, opts ...asynq.Option) error
Consumer(ctx context.Context, t *asynq.Task) error
GetTypename() string
}
type BaseMQ struct {
Typename string
}
func (c *BaseMQ) GetTypename() string {
return c.Typename
}
func GetAsynqClient() {
if AsynqClient == nil {
r := asynq.RedisClientOpt{
Addr: viper.GetString("asynq.addr"),
Password: viper.GetString("asynq.password"),
DB: viper.GetInt("asynq.db"),
}
AsynqClient = asynq.NewClient(r)
}
}
func (c *BaseMQ) Producer(ctx context.Context, payload []byte, opts ...asynq.Option) (err error) {
GetAsynqClient()
traceId := ctx.Value(logger.TraceId).(string)
if traceId == "" {
traceId, _ = logger.CalcTraceId(ctx)
}
data := map[string]string{
"traceId": traceId,
"payload": string(payload),
}
payload, _ = json.Marshal(data)
task := asynq.NewTask(c.GetTypename(), payload)
//延时消费 5秒后消费
//opts = append(opts, asynq.ProcessIn(5*time.Second))
// 10秒超时
//opts = append(opts, asynq.Timeout(10*time.Second))
//最多重试次参数
opts = append(opts, asynq.MaxRetry(100))
//20秒后过期
//opts = append(opts, asynq.Deadline(time.Now().Add(20*time.Second)))
res, err := AsynqClient.Enqueue(task, opts...)
if err != nil {
logger.SLog(ctx).Errorf("加入队列(%s)失败:%v", res.Type, err.Error())
} else {
logger.SLog(ctx).Infof("加入队列:%s成功参数%s", res.Type, string(payload))
}
return
}

28
asynq/test.go Normal file
View File

@@ -0,0 +1,28 @@
package asynq
import (
"context"
"fastApi/core/logger"
"github.com/hibiken/asynq"
)
func init() {
MQList = append(MQList, NewTest())
}
type Test struct {
BaseMQ
}
func NewTest() *Test {
return &Test{
BaseMQ: BaseMQ{
Typename: "test",
}}
}
func (c *Test) Consumer(ctx context.Context, t *asynq.Task) error {
logger.SLog(ctx).Infof("type: %v, payload: %s", t.Type(), string(t.Payload()))
return nil
}

View File

@@ -22,6 +22,7 @@ func init() {
RootCmd.AddCommand(server.StartApi)
RootCmd.AddCommand(server.StartCmd)
RootCmd.AddCommand(server.StartMQ)
RootCmd.AddCommand(server.StartAsynq)
}
func persistentPreRun(cmd *cobra.Command, args []string) {

163
cmd/server/asynq.go Normal file
View File

@@ -0,0 +1,163 @@
package server
import (
"context"
"encoding/json"
nq "fastApi/asynq"
"fastApi/core/global"
"fastApi/core/logger"
"fastApi/util"
"fmt"
"github.com/hibiken/asynq"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"go.opentelemetry.io/otel/attribute"
oteltrace "go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"os"
"os/signal"
"syscall"
"time"
)
var (
StartAsynq = &cobra.Command{
Use: "asynq",
Short: "Welcome to a Tour of Asynq!",
Example: "Asynq 是一个由Redis实现的异步队列 Go 库",
PreRun: func(cmd *cobra.Command, args []string) {
},
RunE: func(cmd *cobra.Command, args []string) error {
return run()
},
}
)
func run() error {
// asynq server
srv := asynq.NewServer(
asynq.RedisClientOpt{
Addr: viper.GetString("asynq.addr"),
Password: viper.GetString("asynq.password"),
DB: viper.GetInt("asynq.db"),
},
asynq.Config{
Logger: global.SLog,
// Concurrency: 20, //worker数量,默认启动的worker数量是服务器的CPU个数
RetryDelayFunc: func(n int, e error, t *asynq.Task) time.Duration {
delay := 1 * time.Second
if n < 15 {
delay = 60 * time.Second
} else if n < 25 {
delay = 300 * time.Second
} else if n < 60 {
delay = 600 * time.Second
} else {
delay = 3600 * time.Second
}
return delay
},
},
)
mux := asynq.NewServeMux()
// some middlewares
mux.Use(func(next asynq.Handler) asynq.Handler {
return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {
var log *zap.Logger
var span oteltrace.Span
defer func() {
if r := recover(); r != nil {
if log == nil {
log = logger.Log(ctx).With(
zap.String("url", t.Type()),
)
}
log.Sugar().Errorf("panic: %v", r)
}
}()
ctx, span, _ = util.ContextWithSpanContext(ctx, "", "", "queue-consumer", t.Type())
if span != nil {
defer span.End()
}
startTime := time.Now()
var data map[string]string
err := json.Unmarshal(t.Payload(), &data)
if err != nil {
log = logger.Log(ctx).With(
zap.String("url", t.Type()),
zap.String("params", string(t.Payload())),
)
log.Error("数据解析失败: " + err.Error())
}
if span != nil {
span.SetAttributes(attribute.String("traceId", data["traceId"]))
}
ctx = logger.WithC(ctx,
zap.String("url", t.Type()),
zap.String("traceId", data["traceId"]),
)
t = asynq.NewTask(t.Type(), []byte(data["payload"]))
err = next.ProcessTask(ctx, t)
endTime := time.Now()
latencyTime := endTime.Sub(startTime)
log = logger.Log(ctx).With(
zap.String("params", data["payload"]),
zap.Duration("runtime", latencyTime),
)
if err != nil {
log.Error("任务执行失败: " + err.Error())
} else {
log.Info("任务执行成功")
}
return err
})
})
for _, q := range nq.MQList {
mux.HandleFunc(q.GetTypename(), q.Consumer)
}
// start server
if err := srv.Start(mux); err != nil {
panic(err)
}
// Wait for termination signal.
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM,
syscall.SIGQUIT,
//syscall.SIGUSR1, syscall.SIGUSR2,
)
for {
s := <-c
switch s {
case syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT:
fmt.Println("Program Exit...", s)
srv.Shutdown()
srv.Stop()
return nil
//case syscall.SIGUSR1:
// fmt.Println("usr1 signal", s)
//case syscall.SIGUSR2:
// fmt.Println("usr2 signal", s)
default:
fmt.Println("other signal", s)
}
}
}

View File

@@ -32,7 +32,7 @@ func initConsumer(topic string, channel string, address string, handler nsq.Hand
if err != nil {
panic(err)
}
c.SetLoggerLevel(nsq.LogLevelWarning) //屏蔽系统日志
c.SetLoggerLevel(nsq.LogLevelWarning) //支显示警告级别及以上的日志
c.AddHandler(handler) // 添加消费者接口
//建立NSQLookupd连接

View File

@@ -18,7 +18,6 @@ redis:
password: ""
db: 0
jwt:
# token 密钥
key: "dgE7B6SPrS%9yLE"
@@ -35,6 +34,11 @@ logger:
# producer: 127.0.0.1:4150
# consumer: 127.0.0.1:4161
#asynq:
# addr: 127.0.0.1:6379
# password: "Q#W*9hETg*fp0)@jWmXl"
# db: 0
#链路追踪
#telemetry:
# name: fast-api

View File

@@ -15,6 +15,7 @@ const DBKey = "DB"
var (
Trans ut.Translator // 定义一个全局翻译器T
Log *zap.Logger
SLog *zap.SugaredLogger
GDB *gorm.DB // DB 数据库链接单例
Redis *redis.Client
Producer *nsq.Producer

View File

@@ -9,6 +9,7 @@ import (
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"gorm.io/gorm"
"io"
"os"
)
@@ -41,6 +42,7 @@ func InitLogger() *zap.Logger {
)
global.Log = zap.New(zapCore)
global.SLog = global.Log.Sugar()
logger = global.Log
return logger
}
@@ -68,24 +70,30 @@ func CalcTraceId(ctx context.Context) (traceId, spanId string) {
return uuid.New().String(), ""
}
func With(c *gin.Context, fields ...zap.Field) {
func injectToContext(log *zap.Logger, slog *zap.SugaredLogger, db *gorm.DB, store func(key string, value interface{})) {
store(loggerKey, log)
store(loggerSugarKey, slog)
store(global.DBKey, db)
}
func createLoggerAndDB(fields ...zap.Field) (*zap.Logger, *zap.SugaredLogger, *gorm.DB) {
log := logger.With(fields...)
slog := log.Sugar()
db := global.GDB
db.Logger = NewGormLog(log)
c.Set(loggerKey, log)
c.Set(loggerSugarKey, slog)
c.Set(global.DBKey, db)
return log, slog, db
}
func With(c *gin.Context, fields ...zap.Field) {
log, slog, db := createLoggerAndDB(fields...)
injectToContext(log, slog, db, c.Set)
}
func WithC(c context.Context, fields ...zap.Field) context.Context {
log := logger.With(fields...)
slog := log.Sugar()
db := global.GDB
db.Logger = NewGormLog(log)
c = context.WithValue(c, loggerKey, log)
c = context.WithValue(c, loggerSugarKey, slog)
c = context.WithValue(c, global.DBKey, db)
log, slog, db := createLoggerAndDB(fields...)
injectToContext(log, slog, db, func(key string, value interface{}) {
c = context.WithValue(c, key, value)
})
return c
}

View File

@@ -50,7 +50,7 @@ func CronInit() {
Cron.Start()
}
func WithRequestId(ctx context.Context, name, traceId string) context.Context {
func WithTraceId(ctx context.Context, name, traceId string) context.Context {
return logger.WithC(
ctx,
zap.String("traceId", traceId),
@@ -63,7 +63,7 @@ func BaseCronFuc(name string, cmd func(context.Context)) func() {
traceId := uuid.New().String()
ctx := context.WithValue(context.Background(), logger.TraceId, traceId)
ctx = WithRequestId(ctx, name, traceId)
ctx = WithTraceId(ctx, name, traceId)
cmd(ctx)
}
}

6
go.mod
View File

@@ -15,6 +15,7 @@ require (
github.com/go-redis/redis/v8 v8.11.5
github.com/golang-jwt/jwt/v4 v4.4.2
github.com/google/uuid v1.6.0
github.com/hibiken/asynq v0.24.1
github.com/nsqio/go-nsq v1.1.0
github.com/robfig/cron/v3 v3.0.1
github.com/spf13/cobra v1.5.0
@@ -38,7 +39,7 @@ require (
github.com/KyleBanks/depth v1.2.1 // indirect
github.com/bytedance/sonic v1.12.1 // indirect
github.com/bytedance/sonic/loader v0.2.0 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cloudwego/base64x v0.1.4 // indirect
github.com/cloudwego/iasm v0.2.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
@@ -54,6 +55,7 @@ require (
github.com/go-redis/redis/extra/rediscmd/v8 v8.11.5 // indirect
github.com/go-sql-driver/mysql v1.7.0 // indirect
github.com/goccy/go-json v0.10.3 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.3 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
@@ -71,6 +73,7 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pelletier/go-toml v1.9.5 // indirect
github.com/pelletier/go-toml/v2 v2.2.2 // indirect
github.com/redis/go-redis/v9 v9.0.3 // indirect
github.com/spf13/afero v1.8.2 // indirect
github.com/spf13/cast v1.5.0 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
@@ -85,6 +88,7 @@ require (
golang.org/x/net v0.28.0 // indirect
golang.org/x/sys v0.24.0 // indirect
golang.org/x/text v0.17.0 // indirect
golang.org/x/time v0.0.0-20191024005414-555d28b269f0 // indirect
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/ini.v1 v1.66.4 // indirect

View File

@@ -89,6 +89,12 @@ func (b *BaseMQ) Handle(msg *nsq.Message, h HandleFunc) error {
}
}()
var span oteltrace.Span
ctx, span, _ = util.ContextWithSpanContext(ctx, "", "", "queue-consumer", b.GetTopic())
if span != nil {
defer span.End()
}
startTime := time.Now()
var data map[string]string
@@ -102,11 +108,8 @@ func (b *BaseMQ) Handle(msg *nsq.Message, h HandleFunc) error {
).Error("数据解析失败: " + err.Error())
}
var span oteltrace.Span
ctx, span, _ = util.ContextWithSpanContext(ctx, "", "", "queue-consumer", b.GetTopic())
if span != nil {
span.SetAttributes(attribute.String("traceId", data["traceId"]))
defer span.End()
}
ctx = context.WithValue(ctx, logger.TraceId, data["traceId"])

View File

@@ -1,6 +1,7 @@
package router
import (
"fastApi/asynq"
"fastApi/core/middleware"
_ "fastApi/docs" // 千万不要忘了导入把你上一步生成的docs
"fastApi/mq"
@@ -35,12 +36,19 @@ func initGin() *gin.Engine {
c.String(200, "pong")
})
engine.GET("/queue_test", func(c *gin.Context) {
engine.GET("/nsq_test", func(c *gin.Context) {
err := mq.NewSendRegisteredEmail().Producer(c, []byte("test"), 1*time.Second)
fmt.Printf("\n\n%#v\n\n", err)
fmt.Printf("err%#v", err)
c.String(200, "pong")
})
engine.GET("/asynq_test", func(c *gin.Context) {
rawData := `{"name":"kwinwong"}`
err := asynq.NewTest().Producer(c, []byte(rawData))
fmt.Printf("err%#v", err)
c.String(200, "pong")
})
return engine
}