mirror of
https://github.com/go-eagle/eagle.git
synced 2025-12-24 13:37:56 +08:00
chore: improve redis queue and add example
This commit is contained in:
22
examples/queue/redis/app.yaml
Normal file
22
examples/queue/redis/app.yaml
Normal file
@@ -0,0 +1,22 @@
|
||||
Name: eagle
|
||||
Version: 1.0.0
|
||||
PprofPort: :5555
|
||||
Mode: debug # debug, release, test
|
||||
JwtSecret: JWT_SECRET
|
||||
JwtTimeout: 86400
|
||||
CookieName: jwt-token
|
||||
SSL: true
|
||||
CtxDefaultTimeout: 12
|
||||
CSRF: true
|
||||
Debug: false
|
||||
EnableTrace: false
|
||||
EnablePprof: true
|
||||
|
||||
HTTP:
|
||||
Addr: :8080
|
||||
ReadTimeout: 3s
|
||||
WriteTimeout: 3s
|
||||
GRPC:
|
||||
Addr: :9090
|
||||
ReadTimeout: 5s
|
||||
WriteTimeout: 5s
|
||||
50
examples/queue/redis/client.go
Normal file
50
examples/queue/redis/client.go
Normal file
@@ -0,0 +1,50 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/go-eagle/eagle/pkg/config"
|
||||
"github.com/hibiken/asynq"
|
||||
)
|
||||
|
||||
var (
|
||||
client *asynq.Client
|
||||
once sync.Once
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
Queue struct {
|
||||
Addr string
|
||||
Password string
|
||||
DB int
|
||||
MinIdleConn int
|
||||
DialTimeout time.Duration
|
||||
ReadTimeout time.Duration
|
||||
WriteTimeout time.Duration
|
||||
PoolSize int
|
||||
PoolTimeout time.Duration
|
||||
Concurrency int //并发数
|
||||
} `json:"redis"`
|
||||
}
|
||||
|
||||
func GetClient() *asynq.Client {
|
||||
once.Do(func() {
|
||||
//c := config.New("config", config.WithEnv("local"))
|
||||
c := config.New(".")
|
||||
var cfg Config
|
||||
if err := c.Load("redis", &cfg); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
client = asynq.NewClient(asynq.RedisClientOpt{
|
||||
Addr: cfg.Queue.Addr,
|
||||
Password: cfg.Queue.Password,
|
||||
DB: cfg.Queue.DB,
|
||||
DialTimeout: cfg.Queue.DialTimeout,
|
||||
ReadTimeout: cfg.Queue.ReadTimeout,
|
||||
WriteTimeout: cfg.Queue.WriteTimeout,
|
||||
PoolSize: cfg.Queue.PoolSize,
|
||||
})
|
||||
})
|
||||
return client
|
||||
}
|
||||
56
examples/queue/redis/handler.go
Normal file
56
examples/queue/redis/handler.go
Normal file
@@ -0,0 +1,56 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"github.com/hibiken/asynq"
|
||||
)
|
||||
|
||||
const (
|
||||
TypeEmailWelcome = "email:welcome"
|
||||
)
|
||||
|
||||
type EmailWelcomePayload struct {
|
||||
UserID int64
|
||||
}
|
||||
|
||||
//----------------------------------------------
|
||||
// Write a function NewXXXTask to create a task.
|
||||
// A task consists of a type and a payload.
|
||||
//----------------------------------------------
|
||||
|
||||
func NewEmailWelcomeTask(data EmailWelcomePayload) error {
|
||||
payload, err := json.Marshal(data)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "json marshal error, name: %s", TypeEmailWelcome)
|
||||
}
|
||||
task := asynq.NewTask(TypeEmailWelcome, payload)
|
||||
_, err = GetClient().Enqueue(task)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "Enqueue task error, name: %s", TypeEmailWelcome)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
//---------------------------------------------------------------
|
||||
// Write a function HandleXXXTask to handle the input task.
|
||||
// Note that it satisfies the asynq.HandlerFunc interface.
|
||||
//
|
||||
// Handler doesn't need to be a function. You can define a type
|
||||
// that satisfies asynq.Handler interface. See examples below.
|
||||
//---------------------------------------------------------------
|
||||
|
||||
func HandleEmailWelcomeTask(ctx context.Context, t *asynq.Task) error {
|
||||
var p EmailWelcomePayload
|
||||
if err := json.Unmarshal(t.Payload(), &p); err != nil {
|
||||
return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
|
||||
}
|
||||
log.Printf("Sending Email to User: user_id=%d", p.UserID)
|
||||
// Email delivery code ...
|
||||
return nil
|
||||
}
|
||||
14
examples/queue/redis/logger.yaml
Normal file
14
examples/queue/redis/logger.yaml
Normal file
@@ -0,0 +1,14 @@
|
||||
Development: false
|
||||
DisableCaller: false
|
||||
DisableStacktrace: false
|
||||
Encoding: console # json or console
|
||||
Level: info # 日志级别,INFO, WARN, ERROR
|
||||
Name: eagle
|
||||
Writers: console # 有2个可选项:file,console 选择file会将日志记录到logger_file指定的日志文件中,选择console会将日志输出到标准输出,当然也可以两者同时选择
|
||||
LoggerFile: /tmp/log/eagle.log
|
||||
LoggerWarnFile: /tmp/log/eagle.wf.log
|
||||
LoggerErrorFile: /tmp/log/eagle.err.log
|
||||
LogRollingPolicy: daily
|
||||
LogRotateDate: 1
|
||||
LogRotateSize: 1
|
||||
LogBackupCount: 7
|
||||
17
examples/queue/redis/producer.go
Normal file
17
examples/queue/redis/producer.go
Normal file
@@ -0,0 +1,17 @@
|
||||
package main
|
||||
|
||||
import "time"
|
||||
|
||||
// cd examples/queue/redis/consumer/
|
||||
// go run producer.go handler.go client.go
|
||||
func main() {
|
||||
for i := 0; i < 10; i++ {
|
||||
err := NewEmailWelcomeTask(EmailWelcomePayload{
|
||||
UserID: time.Now().Unix(),
|
||||
})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
}
|
||||
11
examples/queue/redis/redis.yaml
Normal file
11
examples/queue/redis/redis.yaml
Normal file
@@ -0,0 +1,11 @@
|
||||
Queue:
|
||||
Addr: 127.0.0.1:6379
|
||||
Password: ""
|
||||
DB: 0
|
||||
MinIdleConn: 200
|
||||
DialTimeout: 60s
|
||||
ReadTimeout: 500ms
|
||||
WriteTimeout: 500ms
|
||||
PoolSize: 100
|
||||
PoolTimeout: 240s
|
||||
EnableTrace: true
|
||||
61
examples/queue/redis/server.go
Normal file
61
examples/queue/redis/server.go
Normal file
@@ -0,0 +1,61 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
eagle "github.com/go-eagle/eagle/pkg/app"
|
||||
"github.com/go-eagle/eagle/pkg/config"
|
||||
logger "github.com/go-eagle/eagle/pkg/log"
|
||||
redisMQ "github.com/go-eagle/eagle/pkg/transport/consumer/redis"
|
||||
"github.com/hibiken/asynq"
|
||||
"github.com/spf13/pflag"
|
||||
)
|
||||
|
||||
// redis queue consumer
|
||||
// cd examples/queue/redis/consumer/
|
||||
// go run server.go handler.go client.go
|
||||
func main() {
|
||||
pflag.Parse()
|
||||
|
||||
// init config
|
||||
c := config.New(".")
|
||||
var cfg eagle.Config
|
||||
if err := c.Load("app", &cfg); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
// set global
|
||||
eagle.Conf = &cfg
|
||||
|
||||
logger.Init()
|
||||
|
||||
srv := redisMQ.NewServer(
|
||||
asynq.RedisClientOpt{Addr: "localhost:6379"},
|
||||
asynq.Config{
|
||||
// Specify how many concurrent workers to use
|
||||
Concurrency: 10,
|
||||
// Optionally specify multiple queues with different priority.
|
||||
Queues: map[string]int{
|
||||
redisMQ.QueueCritical: 6,
|
||||
redisMQ.QueueDefault: 3,
|
||||
redisMQ.QueueLow: 1,
|
||||
},
|
||||
// See the godoc for other configuration options
|
||||
},
|
||||
)
|
||||
|
||||
// register handler
|
||||
srv.RegisterHandler(TypeEmailWelcome, HandleEmailWelcomeTask)
|
||||
// here register other handlers...
|
||||
|
||||
// start app
|
||||
app := eagle.New(
|
||||
eagle.WithName(cfg.Name),
|
||||
eagle.WithVersion(cfg.Version),
|
||||
eagle.WithLogger(logger.GetLogger()),
|
||||
eagle.WithServer(
|
||||
srv,
|
||||
),
|
||||
)
|
||||
|
||||
if err := app.Run(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
@@ -2,45 +2,40 @@ package redis
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"github.com/hibiken/asynq"
|
||||
)
|
||||
|
||||
const (
|
||||
// QueueCritical queue priority
|
||||
QueueCritical = "critical"
|
||||
QueueDefault = "default"
|
||||
QueueLow = "low"
|
||||
)
|
||||
|
||||
// Server async server
|
||||
type Server struct {
|
||||
clientOpt asynq.RedisClientOpt
|
||||
|
||||
// async server
|
||||
srv *asynq.Server
|
||||
mux *asynq.ServeMux
|
||||
|
||||
// async schedule
|
||||
sche *asynq.Scheduler
|
||||
}
|
||||
|
||||
// NewServer new async server
|
||||
func NewServer(redisOpt asynq.RedisClientOpt, asyncCfg asynq.Config) *Server {
|
||||
srv := &Server{
|
||||
srv: asynq.NewServer(redisOpt, asyncCfg),
|
||||
mux: asynq.NewServeMux(),
|
||||
sche: asynq.NewScheduler(
|
||||
redisOpt,
|
||||
&asynq.SchedulerOpts{Location: time.Local},
|
||||
),
|
||||
}
|
||||
|
||||
return srv
|
||||
}
|
||||
|
||||
// Start async server
|
||||
func (s *Server) Start(ctx context.Context) error {
|
||||
go func() {
|
||||
err := s.sche.Run()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}()
|
||||
|
||||
err := s.srv.Run(s.mux)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "failed to run async server: %v")
|
||||
@@ -49,18 +44,13 @@ func (s *Server) Start(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop async server
|
||||
func (s *Server) Stop(ctx context.Context) error {
|
||||
s.srv.Shutdown()
|
||||
s.sche.Shutdown()
|
||||
return nil
|
||||
}
|
||||
|
||||
// RegisterTask register task
|
||||
func (s *Server) RegisterTask(schedule string, task *asynq.Task) (entryID string, err error) {
|
||||
return s.sche.Register(schedule, task)
|
||||
}
|
||||
|
||||
// RegisterHandle register handler
|
||||
func (s *Server) RegisterHandle(pattern string, handler func(context.Context, *asynq.Task) error) {
|
||||
// RegisterHandler register handler
|
||||
func (s *Server) RegisterHandler(pattern string, handler func(context.Context, *asynq.Task) error) {
|
||||
s.mux.HandleFunc(pattern, handler)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user