diff --git a/dag/fiber_api.go b/dag/fiber_api.go index d8b3a93..448fdc7 100644 --- a/dag/fiber_api.go +++ b/dag/fiber_api.go @@ -51,7 +51,9 @@ func (tm *DAG) RenderFiber(c *fiber.Ctx) error { if result.Error != nil { return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{"message": result.Error.Error()}) } - + if result.Ctx == nil { + result.Ctx = ctx + } contentType := consts.TypeJson if ct, ok := result.Ctx.Value(consts.ContentType).(string); ok { contentType = ct diff --git a/handlers/flatten_handler.go b/handlers/flatten_handler.go index 44a5dee..05ce6f9 100644 --- a/handlers/flatten_handler.go +++ b/handlers/flatten_handler.go @@ -18,12 +18,12 @@ func (h *FlattenHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Resu var data map[string]any err := json.Unmarshal(task.Payload, &data) if err != nil { - return mq.Result{Error: fmt.Errorf("failed to unmarshal task payload: %w", err)} + return mq.Result{Error: fmt.Errorf("failed to unmarshal task payload: %w", err), Ctx: ctx} } operation, ok := h.Payload.Data["operation"].(string) if !ok { - return mq.Result{Error: fmt.Errorf("operation not specified")} + return mq.Result{Error: fmt.Errorf("operation not specified"), Ctx: ctx} } var result map[string]any @@ -37,12 +37,12 @@ func (h *FlattenHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Resu case "flatten_array": result = h.flattenArray(data) default: - return mq.Result{Error: fmt.Errorf("unsupported operation: %s", operation)} + return mq.Result{Error: fmt.Errorf("unsupported operation: %s", operation), Ctx: ctx} } resultPayload, err := json.Marshal(result) if err != nil { - return mq.Result{Error: fmt.Errorf("failed to marshal result: %w", err)} + return mq.Result{Error: fmt.Errorf("failed to marshal result: %w", err), Ctx: ctx} } return mq.Result{Payload: resultPayload, Ctx: ctx} diff --git a/handlers/format_handler.go b/handlers/format_handler.go index 4a75b3b..24cd6a4 100644 --- a/handlers/format_handler.go +++ b/handlers/format_handler.go @@ -21,12 +21,12 @@ func (h *FormatHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Resul var data map[string]any err := json.Unmarshal(task.Payload, &data) if err != nil { - return mq.Result{Error: fmt.Errorf("failed to unmarshal task payload: %w", err)} + return mq.Result{Error: fmt.Errorf("failed to unmarshal task payload: %w", err), Ctx: ctx} } formatType, ok := h.Payload.Data["format_type"].(string) if !ok { - return mq.Result{Error: fmt.Errorf("format_type not specified")} + return mq.Result{Error: fmt.Errorf("format_type not specified"), Ctx: ctx} } var result map[string]any @@ -48,12 +48,12 @@ func (h *FormatHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Resul case "trim": result = h.formatTrim(data) default: - return mq.Result{Error: fmt.Errorf("unsupported format_type: %s", formatType)} + return mq.Result{Error: fmt.Errorf("unsupported format_type: %s", formatType), Ctx: ctx} } resultPayload, err := json.Marshal(result) if err != nil { - return mq.Result{Error: fmt.Errorf("failed to marshal result: %w", err)} + return mq.Result{Error: fmt.Errorf("failed to marshal result: %w", err), Ctx: ctx} } return mq.Result{Payload: resultPayload, Ctx: ctx} diff --git a/handlers/group_handler.go b/handlers/group_handler.go index 11ce307..0df001a 100644 --- a/handlers/group_handler.go +++ b/handlers/group_handler.go @@ -25,12 +25,12 @@ func (h *GroupHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Result // Extract the data array dataArray, ok := data["data"].([]interface{}) if !ok { - return mq.Result{Error: fmt.Errorf("expected 'data' field to be an array")} + return mq.Result{Error: fmt.Errorf("expected 'data' field to be an array"), Ctx: ctx} } groupByFields := h.getGroupByFields() if len(groupByFields) == 0 { - return mq.Result{Error: fmt.Errorf("group_by fields not specified")} + return mq.Result{Error: fmt.Errorf("group_by fields not specified"), Ctx: ctx} } aggregations := h.getAggregations() @@ -43,7 +43,7 @@ func (h *GroupHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Result resultPayload, err := json.Marshal(data) if err != nil { - return mq.Result{Error: fmt.Errorf("failed to marshal result: %w", err)} + return mq.Result{Error: fmt.Errorf("failed to marshal result: %w", err), Ctx: ctx} } return mq.Result{Payload: resultPayload, Ctx: ctx} diff --git a/handlers/json_handler.go b/handlers/json_handler.go index 8cf9f30..f2e03e6 100644 --- a/handlers/json_handler.go +++ b/handlers/json_handler.go @@ -23,7 +23,7 @@ func (h *JSONHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Result operation, ok := h.Payload.Data["operation"].(string) if !ok { - return mq.Result{Error: fmt.Errorf("operation not specified")} + return mq.Result{Error: fmt.Errorf("operation not specified"), Ctx: ctx} } var result map[string]any @@ -41,12 +41,12 @@ func (h *JSONHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Result case "extract_fields": result = h.extractFields(data) default: - return mq.Result{Error: fmt.Errorf("unsupported operation: %s", operation)} + return mq.Result{Error: fmt.Errorf("unsupported operation: %s", operation), Ctx: ctx} } resultPayload, err := json.Marshal(result) if err != nil { - return mq.Result{Error: fmt.Errorf("failed to marshal result: %w", err)} + return mq.Result{Error: fmt.Errorf("failed to marshal result: %w", err), Ctx: ctx} } return mq.Result{Payload: resultPayload, Ctx: ctx} diff --git a/services/go.mod b/services/go.mod index 75f43ad..2d8c085 100644 --- a/services/go.mod +++ b/services/go.mod @@ -69,10 +69,10 @@ require ( github.com/xhit/go-simple-mail/v2 v2.16.0 // indirect github.com/xo/terminfo v0.0.0-20210125001918-ca9a967f8778 // indirect github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect - golang.org/x/crypto v0.33.0 // indirect + golang.org/x/crypto v0.41.0 // indirect golang.org/x/exp v0.0.0-20250305212735-054e65f0b394 // indirect - golang.org/x/sync v0.14.0 // indirect - golang.org/x/sys v0.31.0 // indirect - golang.org/x/text v0.25.0 // indirect + golang.org/x/sync v0.16.0 // indirect + golang.org/x/sys v0.35.0 // indirect + golang.org/x/text v0.28.0 // indirect golang.org/x/time v0.11.0 // indirect ) diff --git a/services/go.sum b/services/go.sum index 7e31823..a57cccb 100644 --- a/services/go.sum +++ b/services/go.sum @@ -157,17 +157,21 @@ github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZ github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E= golang.org/x/crypto v0.33.0 h1:IOBPskki6Lysi0lo9qQvbxiQ+FvsCC/YWOecCHAixus= golang.org/x/crypto v0.33.0/go.mod h1:bVdXmD7IV/4GdElGPozy6U7lWdRXA4qyRVGJV57uQ5M= +golang.org/x/crypto v0.41.0/go.mod h1:pO5AFd7FA68rFak7rOAGVuygIISepHftHnr8dr6+sUc= golang.org/x/exp v0.0.0-20250305212735-054e65f0b394 h1:nDVHiLt8aIbd/VzvPWN6kSOPE7+F/fNFDSXLVYkE/Iw= golang.org/x/exp v0.0.0-20250305212735-054e65f0b394/go.mod h1:sIifuuw/Yco/y6yb6+bDNfyeQ/MdPUy/hKEMYQV17cM= golang.org/x/net v0.35.0 h1:T5GQRQb2y08kTAByq9L4/bz8cipCdA8FbRTXewonqY8= golang.org/x/net v0.35.0/go.mod h1:EglIi67kWsHKlRzzVMUD93VMSWGFOMSZgxFjparz1Qk= golang.org/x/sync v0.14.0 h1:woo0S4Yywslg6hp4eUFjTVOyKt0RookbpAHG4c1HmhQ= golang.org/x/sync v0.14.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik= golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/text v0.25.0 h1:qVyWApTSYLk/drJRO5mDlNYskwQznZmkpV2c8q9zls4= golang.org/x/text v0.25.0/go.mod h1:WEdwpYrmk1qmdHvhkSTNPm3app7v4rsT8F2UD6+VHIA= +golang.org/x/text v0.28.0/go.mod h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU= golang.org/x/time v0.11.0 h1:/bpjEDfN9tkoN/ryeYHnv5hcMlc8ncjMcM4XBk5NWV0= golang.org/x/time v0.11.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/services/setup.go b/services/setup.go index 2ede546..a44449e 100644 --- a/services/setup.go +++ b/services/setup.go @@ -9,6 +9,7 @@ import ( "slices" "sort" "strings" + "time" "github.com/gofiber/fiber/v2" "github.com/gofiber/fiber/v2/middleware/basicauth" @@ -31,14 +32,11 @@ import ( var ValidationInstance Validation -func Setup(loader *Loader, serverApp *fiber.App, brokerAddr string) { +func Setup(loader *Loader, serverApp *fiber.App, brokerAddr string) error { if loader.UserConfig == nil { - return - } - err := SetupServices(loader.Prefix(), serverApp, brokerAddr) - if err != nil { - panic(err) + return nil } + return SetupServices(loader.Prefix(), serverApp, brokerAddr) } func SetupHandler(handler Handler, brokerAddr string, async ...bool) *dag.DAG { @@ -209,6 +207,39 @@ func mapProviders(dataProviders interface{}) []dag.Provider { return providers } +func setupBackgroundHandlers(brokerAddress string) { + for _, cmd := range userConfig.Policy.BackgroundHandlers { + if cmd.Handler.Key == "" && cmd.HandlerKey != "" { + handler := userConfig.GetHandler(cmd.HandlerKey) + if handler == nil { + panic(fmt.Sprintf("Handler not found %s", cmd.HandlerKey)) + } + cmd.Handler = *handler + } + flow := SetupHandler(cmd.Handler, brokerAddress) + if flow.Error != nil { + panic(flow.Error) + } + flow.AssignTopic(cmd.Queue) + if cmd.Schedule != nil && cmd.Schedule.Enable { + duration, err := utils.ParseDuration(cmd.Schedule.Interval) + if err != nil { + duration = time.Minute + } + go func() { + time.Sleep(5 * time.Second) + flow.ScheduleTask(context.Background(), cmd.Payload, mq.WithInterval(duration)) + }() + } + go func() { + err := flow.Consume(context.Background()) + if err != nil { + panic(err) + } + }() + } +} + func SetupServices(prefix string, router fiber.Router, brokerAddr string) error { if router == nil { return nil @@ -217,6 +248,7 @@ func SetupServices(prefix string, router fiber.Router, brokerAddr string) error if err != nil { return err } + setupBackgroundHandlers(brokerAddr) static := userConfig.Policy.Web.Static if static != nil && static.Dir != "" { router.Static( @@ -244,7 +276,7 @@ func SetupAPI(prefix string, router fiber.Router, brokerAddr string) error { api := router.Group(prefix) for _, configRoute := range userConfig.Policy.Web.Apis { routeGroup := api.Group(configRoute.Prefix) - mws := setupMiddlewares(configRoute.Middlewares) + mws := setupMiddlewares(configRoute.Middlewares...) if len(mws) > 0 { routeGroup.Use(mws...) } @@ -252,7 +284,7 @@ func SetupAPI(prefix string, router fiber.Router, brokerAddr string) error { switch route.Operation { case "custom": flow := setupFlow(route, routeGroup, brokerAddr) - routeMiddlewares := setupMiddlewares(route.Middlewares) + routeMiddlewares := setupMiddlewares(route.Middlewares...) if len(routeMiddlewares) > 0 { routeGroup.Use(routeMiddlewares...) } @@ -516,7 +548,7 @@ func requestMiddleware(prefix string, route *Route) fiber.Handler { } } -func setupMiddlewares(middlewares []Middleware) (mid []any) { +func setupMiddlewares(middlewares ...Middleware) (mid []any) { for _, middleware := range middlewares { switch middleware.Name { case "cors":