mirror of
https://github.com/oarkflow/mq.git
synced 2025-10-04 23:52:48 +08:00
feat: update
This commit is contained in:
@@ -51,7 +51,9 @@ func (tm *DAG) RenderFiber(c *fiber.Ctx) error {
|
||||
if result.Error != nil {
|
||||
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{"message": result.Error.Error()})
|
||||
}
|
||||
|
||||
if result.Ctx == nil {
|
||||
result.Ctx = ctx
|
||||
}
|
||||
contentType := consts.TypeJson
|
||||
if ct, ok := result.Ctx.Value(consts.ContentType).(string); ok {
|
||||
contentType = ct
|
||||
|
@@ -18,12 +18,12 @@ func (h *FlattenHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Resu
|
||||
var data map[string]any
|
||||
err := json.Unmarshal(task.Payload, &data)
|
||||
if err != nil {
|
||||
return mq.Result{Error: fmt.Errorf("failed to unmarshal task payload: %w", err)}
|
||||
return mq.Result{Error: fmt.Errorf("failed to unmarshal task payload: %w", err), Ctx: ctx}
|
||||
}
|
||||
|
||||
operation, ok := h.Payload.Data["operation"].(string)
|
||||
if !ok {
|
||||
return mq.Result{Error: fmt.Errorf("operation not specified")}
|
||||
return mq.Result{Error: fmt.Errorf("operation not specified"), Ctx: ctx}
|
||||
}
|
||||
|
||||
var result map[string]any
|
||||
@@ -37,12 +37,12 @@ func (h *FlattenHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Resu
|
||||
case "flatten_array":
|
||||
result = h.flattenArray(data)
|
||||
default:
|
||||
return mq.Result{Error: fmt.Errorf("unsupported operation: %s", operation)}
|
||||
return mq.Result{Error: fmt.Errorf("unsupported operation: %s", operation), Ctx: ctx}
|
||||
}
|
||||
|
||||
resultPayload, err := json.Marshal(result)
|
||||
if err != nil {
|
||||
return mq.Result{Error: fmt.Errorf("failed to marshal result: %w", err)}
|
||||
return mq.Result{Error: fmt.Errorf("failed to marshal result: %w", err), Ctx: ctx}
|
||||
}
|
||||
|
||||
return mq.Result{Payload: resultPayload, Ctx: ctx}
|
||||
|
@@ -21,12 +21,12 @@ func (h *FormatHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Resul
|
||||
var data map[string]any
|
||||
err := json.Unmarshal(task.Payload, &data)
|
||||
if err != nil {
|
||||
return mq.Result{Error: fmt.Errorf("failed to unmarshal task payload: %w", err)}
|
||||
return mq.Result{Error: fmt.Errorf("failed to unmarshal task payload: %w", err), Ctx: ctx}
|
||||
}
|
||||
|
||||
formatType, ok := h.Payload.Data["format_type"].(string)
|
||||
if !ok {
|
||||
return mq.Result{Error: fmt.Errorf("format_type not specified")}
|
||||
return mq.Result{Error: fmt.Errorf("format_type not specified"), Ctx: ctx}
|
||||
}
|
||||
|
||||
var result map[string]any
|
||||
@@ -48,12 +48,12 @@ func (h *FormatHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Resul
|
||||
case "trim":
|
||||
result = h.formatTrim(data)
|
||||
default:
|
||||
return mq.Result{Error: fmt.Errorf("unsupported format_type: %s", formatType)}
|
||||
return mq.Result{Error: fmt.Errorf("unsupported format_type: %s", formatType), Ctx: ctx}
|
||||
}
|
||||
|
||||
resultPayload, err := json.Marshal(result)
|
||||
if err != nil {
|
||||
return mq.Result{Error: fmt.Errorf("failed to marshal result: %w", err)}
|
||||
return mq.Result{Error: fmt.Errorf("failed to marshal result: %w", err), Ctx: ctx}
|
||||
}
|
||||
|
||||
return mq.Result{Payload: resultPayload, Ctx: ctx}
|
||||
|
@@ -25,12 +25,12 @@ func (h *GroupHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Result
|
||||
// Extract the data array
|
||||
dataArray, ok := data["data"].([]interface{})
|
||||
if !ok {
|
||||
return mq.Result{Error: fmt.Errorf("expected 'data' field to be an array")}
|
||||
return mq.Result{Error: fmt.Errorf("expected 'data' field to be an array"), Ctx: ctx}
|
||||
}
|
||||
|
||||
groupByFields := h.getGroupByFields()
|
||||
if len(groupByFields) == 0 {
|
||||
return mq.Result{Error: fmt.Errorf("group_by fields not specified")}
|
||||
return mq.Result{Error: fmt.Errorf("group_by fields not specified"), Ctx: ctx}
|
||||
}
|
||||
|
||||
aggregations := h.getAggregations()
|
||||
@@ -43,7 +43,7 @@ func (h *GroupHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Result
|
||||
|
||||
resultPayload, err := json.Marshal(data)
|
||||
if err != nil {
|
||||
return mq.Result{Error: fmt.Errorf("failed to marshal result: %w", err)}
|
||||
return mq.Result{Error: fmt.Errorf("failed to marshal result: %w", err), Ctx: ctx}
|
||||
}
|
||||
|
||||
return mq.Result{Payload: resultPayload, Ctx: ctx}
|
||||
|
@@ -23,7 +23,7 @@ func (h *JSONHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Result
|
||||
|
||||
operation, ok := h.Payload.Data["operation"].(string)
|
||||
if !ok {
|
||||
return mq.Result{Error: fmt.Errorf("operation not specified")}
|
||||
return mq.Result{Error: fmt.Errorf("operation not specified"), Ctx: ctx}
|
||||
}
|
||||
|
||||
var result map[string]any
|
||||
@@ -41,12 +41,12 @@ func (h *JSONHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Result
|
||||
case "extract_fields":
|
||||
result = h.extractFields(data)
|
||||
default:
|
||||
return mq.Result{Error: fmt.Errorf("unsupported operation: %s", operation)}
|
||||
return mq.Result{Error: fmt.Errorf("unsupported operation: %s", operation), Ctx: ctx}
|
||||
}
|
||||
|
||||
resultPayload, err := json.Marshal(result)
|
||||
if err != nil {
|
||||
return mq.Result{Error: fmt.Errorf("failed to marshal result: %w", err)}
|
||||
return mq.Result{Error: fmt.Errorf("failed to marshal result: %w", err), Ctx: ctx}
|
||||
}
|
||||
|
||||
return mq.Result{Payload: resultPayload, Ctx: ctx}
|
||||
|
@@ -69,10 +69,10 @@ require (
|
||||
github.com/xhit/go-simple-mail/v2 v2.16.0 // indirect
|
||||
github.com/xo/terminfo v0.0.0-20210125001918-ca9a967f8778 // indirect
|
||||
github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect
|
||||
golang.org/x/crypto v0.33.0 // indirect
|
||||
golang.org/x/crypto v0.41.0 // indirect
|
||||
golang.org/x/exp v0.0.0-20250305212735-054e65f0b394 // indirect
|
||||
golang.org/x/sync v0.14.0 // indirect
|
||||
golang.org/x/sys v0.31.0 // indirect
|
||||
golang.org/x/text v0.25.0 // indirect
|
||||
golang.org/x/sync v0.16.0 // indirect
|
||||
golang.org/x/sys v0.35.0 // indirect
|
||||
golang.org/x/text v0.28.0 // indirect
|
||||
golang.org/x/time v0.11.0 // indirect
|
||||
)
|
||||
|
@@ -157,17 +157,21 @@ github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZ
|
||||
github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E=
|
||||
golang.org/x/crypto v0.33.0 h1:IOBPskki6Lysi0lo9qQvbxiQ+FvsCC/YWOecCHAixus=
|
||||
golang.org/x/crypto v0.33.0/go.mod h1:bVdXmD7IV/4GdElGPozy6U7lWdRXA4qyRVGJV57uQ5M=
|
||||
golang.org/x/crypto v0.41.0/go.mod h1:pO5AFd7FA68rFak7rOAGVuygIISepHftHnr8dr6+sUc=
|
||||
golang.org/x/exp v0.0.0-20250305212735-054e65f0b394 h1:nDVHiLt8aIbd/VzvPWN6kSOPE7+F/fNFDSXLVYkE/Iw=
|
||||
golang.org/x/exp v0.0.0-20250305212735-054e65f0b394/go.mod h1:sIifuuw/Yco/y6yb6+bDNfyeQ/MdPUy/hKEMYQV17cM=
|
||||
golang.org/x/net v0.35.0 h1:T5GQRQb2y08kTAByq9L4/bz8cipCdA8FbRTXewonqY8=
|
||||
golang.org/x/net v0.35.0/go.mod h1:EglIi67kWsHKlRzzVMUD93VMSWGFOMSZgxFjparz1Qk=
|
||||
golang.org/x/sync v0.14.0 h1:woo0S4Yywslg6hp4eUFjTVOyKt0RookbpAHG4c1HmhQ=
|
||||
golang.org/x/sync v0.14.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
|
||||
golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
|
||||
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik=
|
||||
golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
|
||||
golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
|
||||
golang.org/x/text v0.25.0 h1:qVyWApTSYLk/drJRO5mDlNYskwQznZmkpV2c8q9zls4=
|
||||
golang.org/x/text v0.25.0/go.mod h1:WEdwpYrmk1qmdHvhkSTNPm3app7v4rsT8F2UD6+VHIA=
|
||||
golang.org/x/text v0.28.0/go.mod h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU=
|
||||
golang.org/x/time v0.11.0 h1:/bpjEDfN9tkoN/ryeYHnv5hcMlc8ncjMcM4XBk5NWV0=
|
||||
golang.org/x/time v0.11.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
|
@@ -9,6 +9,7 @@ import (
|
||||
"slices"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/gofiber/fiber/v2"
|
||||
"github.com/gofiber/fiber/v2/middleware/basicauth"
|
||||
@@ -31,14 +32,11 @@ import (
|
||||
|
||||
var ValidationInstance Validation
|
||||
|
||||
func Setup(loader *Loader, serverApp *fiber.App, brokerAddr string) {
|
||||
func Setup(loader *Loader, serverApp *fiber.App, brokerAddr string) error {
|
||||
if loader.UserConfig == nil {
|
||||
return
|
||||
}
|
||||
err := SetupServices(loader.Prefix(), serverApp, brokerAddr)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
return nil
|
||||
}
|
||||
return SetupServices(loader.Prefix(), serverApp, brokerAddr)
|
||||
}
|
||||
|
||||
func SetupHandler(handler Handler, brokerAddr string, async ...bool) *dag.DAG {
|
||||
@@ -209,6 +207,39 @@ func mapProviders(dataProviders interface{}) []dag.Provider {
|
||||
return providers
|
||||
}
|
||||
|
||||
func setupBackgroundHandlers(brokerAddress string) {
|
||||
for _, cmd := range userConfig.Policy.BackgroundHandlers {
|
||||
if cmd.Handler.Key == "" && cmd.HandlerKey != "" {
|
||||
handler := userConfig.GetHandler(cmd.HandlerKey)
|
||||
if handler == nil {
|
||||
panic(fmt.Sprintf("Handler not found %s", cmd.HandlerKey))
|
||||
}
|
||||
cmd.Handler = *handler
|
||||
}
|
||||
flow := SetupHandler(cmd.Handler, brokerAddress)
|
||||
if flow.Error != nil {
|
||||
panic(flow.Error)
|
||||
}
|
||||
flow.AssignTopic(cmd.Queue)
|
||||
if cmd.Schedule != nil && cmd.Schedule.Enable {
|
||||
duration, err := utils.ParseDuration(cmd.Schedule.Interval)
|
||||
if err != nil {
|
||||
duration = time.Minute
|
||||
}
|
||||
go func() {
|
||||
time.Sleep(5 * time.Second)
|
||||
flow.ScheduleTask(context.Background(), cmd.Payload, mq.WithInterval(duration))
|
||||
}()
|
||||
}
|
||||
go func() {
|
||||
err := flow.Consume(context.Background())
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
func SetupServices(prefix string, router fiber.Router, brokerAddr string) error {
|
||||
if router == nil {
|
||||
return nil
|
||||
@@ -217,6 +248,7 @@ func SetupServices(prefix string, router fiber.Router, brokerAddr string) error
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
setupBackgroundHandlers(brokerAddr)
|
||||
static := userConfig.Policy.Web.Static
|
||||
if static != nil && static.Dir != "" {
|
||||
router.Static(
|
||||
@@ -244,7 +276,7 @@ func SetupAPI(prefix string, router fiber.Router, brokerAddr string) error {
|
||||
api := router.Group(prefix)
|
||||
for _, configRoute := range userConfig.Policy.Web.Apis {
|
||||
routeGroup := api.Group(configRoute.Prefix)
|
||||
mws := setupMiddlewares(configRoute.Middlewares)
|
||||
mws := setupMiddlewares(configRoute.Middlewares...)
|
||||
if len(mws) > 0 {
|
||||
routeGroup.Use(mws...)
|
||||
}
|
||||
@@ -252,7 +284,7 @@ func SetupAPI(prefix string, router fiber.Router, brokerAddr string) error {
|
||||
switch route.Operation {
|
||||
case "custom":
|
||||
flow := setupFlow(route, routeGroup, brokerAddr)
|
||||
routeMiddlewares := setupMiddlewares(route.Middlewares)
|
||||
routeMiddlewares := setupMiddlewares(route.Middlewares...)
|
||||
if len(routeMiddlewares) > 0 {
|
||||
routeGroup.Use(routeMiddlewares...)
|
||||
}
|
||||
@@ -516,7 +548,7 @@ func requestMiddleware(prefix string, route *Route) fiber.Handler {
|
||||
}
|
||||
}
|
||||
|
||||
func setupMiddlewares(middlewares []Middleware) (mid []any) {
|
||||
func setupMiddlewares(middlewares ...Middleware) (mid []any) {
|
||||
for _, middleware := range middlewares {
|
||||
switch middleware.Name {
|
||||
case "cors":
|
||||
|
Reference in New Issue
Block a user