diff --git a/dag/operation.go b/dag/operation.go index ef8b3f6..7915cec 100644 --- a/dag/operation.go +++ b/dag/operation.go @@ -152,12 +152,15 @@ func (e *Operation) ValidateFields(c context.Context, payload []byte) (map[strin var data map[string]any err := json.Unmarshal(payload, &data) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to unmarshal payload: %w", err) } for k, v := range e.Payload.Mapping { _, val := GetVal(c, v, data) if val != nil { keys = append(keys, k) + } else { + // Log missing mapping + fmt.Printf("Warning: Mapping key %s not found for %s\n", k, v) } } for k := range e.Payload.Data { @@ -165,7 +168,7 @@ func (e *Operation) ValidateFields(c context.Context, payload []byte) (map[strin } for _, k := range e.RequiredFields { if !slices.Contains(keys, k) { - return nil, errors.New("Required field doesn't exist") + return nil, fmt.Errorf("required field '%s' is missing or could not be mapped", k) } } return data, nil @@ -232,6 +235,13 @@ func GetVal(c context.Context, v string, data map[string]any) (key string, val a } } + // Log warning if value is nil and not expected + if val == nil && key != "" { + // Assuming logger is available, but since it's not in this file, perhaps add a field or use fmt + // For now, use fmt.Printf for warning + fmt.Printf("Warning: Value not found for key %s in mapping %s\n", key, v) + } + return } diff --git a/dag/task_manager.go b/dag/task_manager.go index 88d9c44..dab9166 100644 --- a/dag/task_manager.go +++ b/dag/task_manager.go @@ -4,13 +4,13 @@ import ( "context" "fmt" "log" + "math/rand" // ...new import for jitter... "strings" "sync" "time" - "math/rand" // ...new import for jitter... - "github.com/oarkflow/json" + "github.com/oarkflow/mq" "github.com/oarkflow/mq/logger" "github.com/oarkflow/mq/storage" @@ -191,6 +191,80 @@ func (tm *TaskManager) waitForResult() { } } +// areDependenciesMet checks if all previous nodes have completed successfully +func (tm *TaskManager) areDependenciesMet(nodeID string) bool { + pureNodeID := strings.Split(nodeID, Delimiter)[0] + + // Get previous nodes + prevNodes, err := tm.dag.GetPreviousNodes(pureNodeID) + if err != nil { + tm.dag.Logger().Error("Error getting previous nodes", logger.Field{Key: "nodeID", Value: nodeID}, logger.Field{Key: "error", Value: err.Error()}) + return false + } + + // For iterator nodes, we need to be more selective about dependencies + // Iterator nodes should only depend on nodes that provide data to them, + // not on nodes that they create (which would be circular dependencies) + node, exists := tm.dag.nodes.Get(pureNodeID) + if exists { + // Check if this node has any iterator edges (meaning it's an iterator node) + hasIteratorEdges := false + for _, edge := range node.Edges { + if edge.Type == Iterator { + hasIteratorEdges = true + break + } + } + + if hasIteratorEdges { + // For iterator nodes, only check dependencies from Simple edges + // Iterator edges represent outputs, not inputs + filteredPrevNodes := make([]*Node, 0) + for _, prevNode := range prevNodes { + // Check if there's a Simple edge from prevNode to this node + hasSimpleEdge := false + for _, edge := range prevNode.Edges { + if edge.To.ID == pureNodeID && edge.Type == Simple { + hasSimpleEdge = true + break + } + } + if hasSimpleEdge { + filteredPrevNodes = append(filteredPrevNodes, prevNode) + } + } + prevNodes = filteredPrevNodes + } + } + + // Check if all relevant previous nodes have completed successfully + for _, prevNode := range prevNodes { + // Check both the pure node ID and the indexed node ID for state + state, exists := tm.taskStates.Get(prevNode.ID) + if !exists { + // Also check if there's a state with an index suffix + tm.taskStates.ForEach(func(key string, s *TaskState) bool { + if strings.Split(key, Delimiter)[0] == prevNode.ID { + state = s + exists = true + return false // Stop iteration + } + return true + }) + } + if !exists || state.Status != mq.Completed { + tm.dag.Logger().Debug("Dependency not met", + logger.Field{Key: "nodeID", Value: nodeID}, + logger.Field{Key: "dependency", Value: prevNode.ID}, + logger.Field{Key: "stateExists", Value: exists}, + logger.Field{Key: "stateStatus", Value: string(state.Status)}) + return false + } + } + + return true +} + func (tm *TaskManager) processNode(exec *task) { startTime := time.Now() pureNodeID := strings.Split(exec.nodeID, Delimiter)[0] @@ -199,6 +273,15 @@ func (tm *TaskManager) processNode(exec *task) { tm.dag.Logger().Error("Node not found", logger.Field{Key: "nodeID", Value: pureNodeID}) return } + + // Check if all dependencies are met before processing + if !tm.areDependenciesMet(pureNodeID) { + tm.dag.Logger().Warn("Dependencies not met for node, deferring", logger.Field{Key: "nodeID", Value: pureNodeID}) + // Defer the task + tm.deferredTasks.Set(exec.taskID, exec) + return + } + // Wrap context with timeout if node.Timeout is configured. if node.Timeout > 0 { var cancel context.CancelFunc @@ -240,11 +323,15 @@ func (tm *TaskManager) processNode(exec *task) { // add jitter to avoid thundering herd jitter := time.Duration(rand.Int63n(int64(tm.baseBackoff))) backoff += jitter - log.Printf("Recoverable error on node %s, retrying in %s: %v", exec.nodeID, backoff, result.Error) + tm.dag.Logger().Warn("Recoverable error on node, retrying", + logger.Field{Key: "nodeID", Value: exec.nodeID}, + logger.Field{Key: "attempt", Value: attempts}, + logger.Field{Key: "backoff", Value: backoff.String()}, + logger.Field{Key: "error", Value: result.Error.Error()}) select { case <-time.After(backoff): case <-exec.ctx.Done(): - log.Printf("Context cancelled for node %s", exec.nodeID) + tm.dag.Logger().Warn("Context cancelled for node", logger.Field{Key: "nodeID", Value: exec.nodeID}) return } continue @@ -252,8 +339,13 @@ func (tm *TaskManager) processNode(exec *task) { if err := tm.recoveryHandler(exec.ctx, result); err == nil { result.Error = nil result.Status = mq.Completed + } else { + result.Error = fmt.Errorf("recovery failed for node %s: %w", exec.nodeID, err) } } + } else { + // Wrap non-recoverable errors with context + result.Error = fmt.Errorf("node %s failed: %w", exec.nodeID, result.Error) } } break @@ -284,6 +376,7 @@ func (tm *TaskManager) processNode(exec *task) { result.Status = mq.Completed state.Result = result state.Result.Status = mq.Completed + state.Status = mq.Completed state.Result.Latency = nodeLatency.String() result.Topic = node.ID tm.updateTimestamps(&result) diff --git a/examples/dag.go b/examples/dag.go new file mode 100644 index 0000000..cee5e03 --- /dev/null +++ b/examples/dag.go @@ -0,0 +1,122 @@ +package main + +import ( + "context" + "fmt" + + "github.com/oarkflow/json" + + "github.com/oarkflow/mq" + "github.com/oarkflow/mq/dag" + "github.com/oarkflow/mq/examples/tasks" +) + +func subDAG() *dag.DAG { + f := dag.NewDAG("Sub DAG", "sub-dag", func(taskID string, result mq.Result) { + fmt.Printf("Sub DAG Final result for task %s: %s\n", taskID, string(result.Payload)) + }, mq.WithSyncMode(true)) + f. + AddNode(dag.Function, "Store data", "store:data", &tasks.StoreData{Operation: dag.Operation{Type: dag.Function}}, true). + AddNode(dag.Function, "Send SMS", "send:sms", &tasks.SendSms{Operation: dag.Operation{Type: dag.Function}}). + AddNode(dag.Function, "Notification", "notification", &tasks.InAppNotification{Operation: dag.Operation{Type: dag.Function}}). + AddEdge(dag.Simple, "Store Payload to send sms", "store:data", "send:sms"). + AddEdge(dag.Simple, "Store Payload to notification", "send:sms", "notification") + return f +} + +func main() { + flow := dag.NewDAG("Sample DAG", "sample-dag", func(taskID string, result mq.Result) { + fmt.Printf("DAG Final result for task %s: %s\n", taskID, string(result.Payload)) + }) + flow.AddNode(dag.Function, "GetData", "GetData", &GetData{}, true) + flow.AddNode(dag.Function, "Loop", "Loop", &Loop{}) + flow.AddNode(dag.Function, "ValidateAge", "ValidateAge", &ValidateAge{}) + flow.AddNode(dag.Function, "ValidateGender", "ValidateGender", &ValidateGender{}) + flow.AddNode(dag.Function, "Final", "Final", &Final{}) + flow.AddDAGNode(dag.Function, "Check", "persistent", subDAG()) + flow.AddEdge(dag.Simple, "GetData", "GetData", "Loop") + flow.AddEdge(dag.Iterator, "Validate age for each item", "Loop", "ValidateAge") + flow.AddCondition("ValidateAge", map[string]string{"pass": "ValidateGender", "default": "persistent"}) + flow.AddEdge(dag.Simple, "Mark as Done", "Loop", "Final") + + // flow.Start(":8080") + data := []byte(`[{"age": "15", "gender": "female"}, {"age": "18", "gender": "male"}]`) + if flow.Error != nil { + panic(flow.Error) + } + + rs := flow.Process(context.Background(), data) + if rs.Error != nil { + panic(rs.Error) + } + fmt.Println(rs.Status, rs.Topic, string(rs.Payload)) +} + +type GetData struct { + dag.Operation +} + +func (p *GetData) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + return mq.Result{Ctx: ctx, Payload: task.Payload} +} + +type Loop struct { + dag.Operation +} + +func (p *Loop) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + return mq.Result{Ctx: ctx, Payload: task.Payload} +} + +type ValidateAge struct { + dag.Operation +} + +func (p *ValidateAge) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + var data map[string]any + if err := json.Unmarshal(task.Payload, &data); err != nil { + return mq.Result{Error: fmt.Errorf("ValidateAge Error: %s", err.Error()), Ctx: ctx} + } + var status string + if data["age"] == "18" { + status = "pass" + } else { + status = "default" + } + updatedPayload, _ := json.Marshal(data) + return mq.Result{Payload: updatedPayload, Ctx: ctx, ConditionStatus: status} +} + +type ValidateGender struct { + dag.Operation +} + +func (p *ValidateGender) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + var data map[string]any + if err := json.Unmarshal(task.Payload, &data); err != nil { + return mq.Result{Error: fmt.Errorf("ValidateGender Error: %s", err.Error()), Ctx: ctx} + } + data["female_voter"] = data["gender"] == "female" + updatedPayload, _ := json.Marshal(data) + return mq.Result{Payload: updatedPayload, Ctx: ctx} +} + +type Final struct { + dag.Operation +} + +func (p *Final) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + var data []map[string]any + if err := json.Unmarshal(task.Payload, &data); err != nil { + return mq.Result{Error: fmt.Errorf("Final Error: %s", err.Error()), Ctx: ctx} + } + for i, row := range data { + row["done"] = true + data[i] = row + } + updatedPayload, err := json.Marshal(data) + if err != nil { + panic(err) + } + return mq.Result{Payload: updatedPayload, Ctx: ctx} +} diff --git a/examples/form.go b/examples/form.go new file mode 100644 index 0000000..b63b356 --- /dev/null +++ b/examples/form.go @@ -0,0 +1,160 @@ +package main + +import ( + "context" + "fmt" + + "github.com/oarkflow/json" + + "github.com/oarkflow/mq/dag" + + "github.com/oarkflow/jet" + + "github.com/oarkflow/mq" + "github.com/oarkflow/mq/consts" +) + +func main() { + flow := dag.NewDAG("Multi-Step Form", "multi-step-form", func(taskID string, result mq.Result) { + fmt.Printf("Final result for task %s: %s\n", taskID, string(result.Payload)) + }) + flow.AddNode(dag.Page, "Form Step1", "FormStep1", &FormStep1{}) + flow.AddNode(dag.Page, "Form Step2", "FormStep2", &FormStep2{}) + flow.AddNode(dag.Page, "Form Result", "FormResult", &FormResult{}) + + // Define edges + flow.AddEdge(dag.Simple, "Form Step1", "FormStep1", "FormStep2") + flow.AddEdge(dag.Simple, "Form Step2", "FormStep2", "FormResult") + + // Start the flow + if flow.Error != nil { + panic(flow.Error) + } + flow.Start(context.Background(), "0.0.0.0:8082") +} + +type FormStep1 struct { + dag.Operation +} + +func (p *FormStep1) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + bt := []byte(` + + + +
+ + + + + +
+ + + +
+ {{ if show_voting_controls }} + + + + {{ else }} +

You are not eligible to vote.

+ {{ end }} +
+ + +`) + parser := jet.NewWithMemory(jet.WithDelims("{{", "}}")) + inputData["task_id"] = ctx.Value("task_id") + rs, err := parser.ParseTemplate(string(bt), inputData) + if err != nil { + fmt.Println("FormStep2", inputData) + return mq.Result{Error: err, Ctx: ctx} + } + ctx = context.WithValue(ctx, consts.ContentType, consts.TypeHtml) + inputData["html_content"] = rs + bt, _ = json.Marshal(inputData) + return mq.Result{Payload: bt, Ctx: ctx} +} + +type FormResult struct { + dag.Operation +} + +func (p *FormResult) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + // Load HTML template for results + bt := []byte(` + + + +

Form Summary

+

Name: {{ name }}

+

Age: {{ age }}

+{{ if register_vote }} +

You have registered to vote!

+{{ else }} +

You did not register to vote.

+{{ end }} + + + + +`) + var inputData map[string]any + if task.Payload != nil { + if err := json.Unmarshal(task.Payload, &inputData); err != nil { + return mq.Result{Error: err, Ctx: ctx} + } + } + if inputData != nil { + if isEligible, ok := inputData["register_vote"].(string); ok { + inputData["register_vote"] = isEligible + } else { + inputData["register_vote"] = false + } + } + parser := jet.NewWithMemory(jet.WithDelims("{{", "}}")) + rs, err := parser.ParseTemplate(string(bt), inputData) + if err != nil { + return mq.Result{Error: err, Ctx: ctx} + } + ctx = context.WithValue(ctx, consts.ContentType, consts.TypeHtml) + inputData["html_content"] = rs + bt, _ = json.Marshal(inputData) + return mq.Result{Payload: bt, Ctx: ctx} +} diff --git a/examples/tasks/operations.go b/examples/tasks/operations.go new file mode 100644 index 0000000..9f1513e --- /dev/null +++ b/examples/tasks/operations.go @@ -0,0 +1,135 @@ +package tasks + +import ( + "context" + + "github.com/oarkflow/json" + + v2 "github.com/oarkflow/mq/dag" + + "github.com/oarkflow/mq" +) + +type GetData struct { + v2.Operation +} + +func (e *GetData) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + return mq.Result{Payload: task.Payload, Ctx: ctx} +} + +type Loop struct { + v2.Operation +} + +func (e *Loop) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + return mq.Result{Payload: task.Payload, Ctx: ctx} +} + +type Condition struct { + v2.Operation +} + +func (e *Condition) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + var data map[string]any + err := json.Unmarshal(task.Payload, &data) + if err != nil { + panic(err) + } + switch email := data["email"].(type) { + case string: + if email == "abc.xyz@gmail.com" { + return mq.Result{Payload: task.Payload, ConditionStatus: "pass", Ctx: ctx} + } + return mq.Result{Payload: task.Payload, ConditionStatus: "fail", Ctx: ctx} + default: + return mq.Result{Payload: task.Payload, ConditionStatus: "fail", Ctx: ctx} + } +} + +type PrepareEmail struct { + v2.Operation +} + +func (e *PrepareEmail) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + var data map[string]any + err := json.Unmarshal(task.Payload, &data) + if err != nil { + panic(err) + } + data["email_valid"] = true + d, _ := json.Marshal(data) + return mq.Result{Payload: d, Ctx: ctx} +} + +type EmailDelivery struct { + v2.Operation +} + +func (e *EmailDelivery) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + var data map[string]any + err := json.Unmarshal(task.Payload, &data) + if err != nil { + panic(err) + } + data["email_sent"] = true + d, _ := json.Marshal(data) + return mq.Result{Payload: d, Ctx: ctx} +} + +type SendSms struct { + v2.Operation +} + +func (e *SendSms) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + var data map[string]any + err := json.Unmarshal(task.Payload, &data) + if err != nil { + panic(err) + } + data["sms_sent"] = true + d, _ := json.Marshal(data) + return mq.Result{Payload: d, Ctx: ctx} +} + +type StoreData struct { + v2.Operation +} + +func (e *StoreData) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + var data map[string]any + err := json.Unmarshal(task.Payload, &data) + if err != nil { + panic(err) + } + data["stored"] = true + d, _ := json.Marshal(data) + return mq.Result{Payload: d, Ctx: ctx} +} + +type InAppNotification struct { + v2.Operation +} + +func (e *InAppNotification) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + var data map[string]any + err := json.Unmarshal(task.Payload, &data) + if err != nil { + panic(err) + } + data["notified"] = true + d, _ := json.Marshal(data) + return mq.Result{Payload: d, Ctx: ctx} +} + +type Final struct { + v2.Operation +} + +func (e *Final) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + rs := map[string]any{ + "html_content": `Processed successfully!`, + } + bt, _ := json.Marshal(rs) + return mq.Result{Payload: bt, Ctx: ctx} +} diff --git a/examples/tasks/scheduler.go b/examples/tasks/scheduler.go new file mode 100644 index 0000000..f913926 --- /dev/null +++ b/examples/tasks/scheduler.go @@ -0,0 +1,20 @@ +package tasks + +import ( + "context" + "fmt" + + "github.com/oarkflow/mq" +) + +func SchedulerHandler(ctx context.Context, task *mq.Task) mq.Result { + fmt.Printf("Processing task: %s\n", task.ID) + return mq.Result{Error: nil} +} + +func SchedulerCallback(ctx context.Context, result mq.Result) error { + if result.Error != nil { + fmt.Println("Task failed!", result.Error.Error()) + } + return nil +} diff --git a/examples/tasks/tasks.go b/examples/tasks/tasks.go new file mode 100644 index 0000000..aac868f --- /dev/null +++ b/examples/tasks/tasks.go @@ -0,0 +1,110 @@ +package tasks + +import ( + "context" + "fmt" + "log" + + "github.com/oarkflow/json" + + v2 "github.com/oarkflow/mq/dag" + + "github.com/oarkflow/mq" +) + +type Node1 struct{ v2.Operation } + +func (t *Node1) ProcessTask(_ context.Context, task *mq.Task) mq.Result { + fmt.Println("Node 1", string(task.Payload)) + return mq.Result{Payload: task.Payload, TaskID: task.ID} +} + +type Node2 struct{ v2.Operation } + +func (t *Node2) ProcessTask(_ context.Context, task *mq.Task) mq.Result { + fmt.Println("Node 2", string(task.Payload)) + return mq.Result{Payload: task.Payload, TaskID: task.ID} +} + +type Node3 struct{ v2.Operation } + +func (t *Node3) ProcessTask(_ context.Context, task *mq.Task) mq.Result { + var user map[string]any + fmt.Println(string(task.Payload)) + err := json.Unmarshal(task.Payload, &user) + if err != nil { + panic(err) + } + age := int(user["age"].(float64)) + status := "FAIL" + if age > 20 { + status = "PASS" + } + user["status"] = status + resultPayload, _ := json.Marshal(user) + return mq.Result{Payload: resultPayload, ConditionStatus: status} +} + +type Node4 struct{ v2.Operation } + +func (t *Node4) ProcessTask(_ context.Context, task *mq.Task) mq.Result { + var user map[string]any + _ = json.Unmarshal(task.Payload, &user) + user["node"] = "D" + resultPayload, _ := json.Marshal(user) + return mq.Result{Payload: resultPayload} +} + +type Node5 struct{ v2.Operation } + +func (t *Node5) ProcessTask(_ context.Context, task *mq.Task) mq.Result { + var user map[string]any + _ = json.Unmarshal(task.Payload, &user) + user["node"] = "E" + resultPayload, _ := json.Marshal(user) + return mq.Result{Payload: resultPayload} +} + +type Node6 struct{ v2.Operation } + +func (t *Node6) ProcessTask(_ context.Context, task *mq.Task) mq.Result { + var user map[string]any + _ = json.Unmarshal(task.Payload, &user) + resultPayload, _ := json.Marshal(map[string]any{"storage": user}) + return mq.Result{Payload: resultPayload} +} + +type Node7 struct{ v2.Operation } + +func (t *Node7) ProcessTask(_ context.Context, task *mq.Task) mq.Result { + var user map[string]any + _ = json.Unmarshal(task.Payload, &user) + user["node"] = "G" + resultPayload, _ := json.Marshal(user) + return mq.Result{Payload: resultPayload} +} + +type Node8 struct{ v2.Operation } + +func (t *Node8) ProcessTask(_ context.Context, task *mq.Task) mq.Result { + var user map[string]any + _ = json.Unmarshal(task.Payload, &user) + user["node"] = "H" + resultPayload, _ := json.Marshal(user) + return mq.Result{Payload: resultPayload} +} + +func Callback(_ context.Context, task mq.Result) mq.Result { + fmt.Println("Received task", task.TaskID, "Payload", string(task.Payload), task.Error, task.Topic) + return mq.Result{} +} + +func NotifyResponse(_ context.Context, result mq.Result) error { + log.Printf("DAG - FINAL_RESPONSE ~> TaskID: %s, Payload: %s, Topic: %s, Error: %v, Latency: %s", result.TaskID, result.Payload, result.Topic, result.Error, result.Latency) + return nil +} + +func NotifySubDAGResponse(_ context.Context, result mq.Result) error { + log.Printf("SUB DAG - FINAL_RESPONSE ~> TaskID: %s, Payload: %s, Topic: %s, Error: %v, Latency: %s", result.TaskID, result.Payload, result.Topic, result.Error, result.Latency) + return nil +} diff --git a/handlers/data_handler.go b/handlers/data_handler.go index f5b61e3..60823e7 100644 --- a/handlers/data_handler.go +++ b/handlers/data_handler.go @@ -736,6 +736,6 @@ func (h *DataHandler) getUnpivotFields() []string { func NewDataHandler(id string) *DataHandler { return &DataHandler{ - Operation: dag.Operation{ID: id, Key: "data", Type: dag.Function, Tags: []string{"data", "transformation", "misc"}}, + Operation: dag.Operation{ID: id, Key: "data:transform", Type: dag.Function, Tags: []string{"data", "transformation", "misc"}}, } } diff --git a/handlers/init.go b/handlers/init.go index ea5954a..c9b3561 100644 --- a/handlers/init.go +++ b/handlers/init.go @@ -6,10 +6,21 @@ import ( ) func Init() { + // Basic handlers dag.AddHandler("start", func(id string) mq.Processor { return NewStartHandler(id) }) dag.AddHandler("loop", func(id string) mq.Processor { return NewLoop(id) }) dag.AddHandler("condition", func(id string) mq.Processor { return NewCondition(id) }) dag.AddHandler("print", func(id string) mq.Processor { return NewPrintHandler(id) }) dag.AddHandler("render", func(id string) mq.Processor { return NewRenderHTMLNode(id) }) dag.AddHandler("log", func(id string) mq.Processor { return NewLogHandler(id) }) + + // Data transformation handlers + dag.AddHandler("data:transform", func(id string) mq.Processor { return NewDataHandler(id) }) + dag.AddHandler("field", func(id string) mq.Processor { return NewFieldHandler(id) }) + dag.AddHandler("format", func(id string) mq.Processor { return NewFormatHandler(id) }) + dag.AddHandler("group", func(id string) mq.Processor { return NewGroupHandler(id) }) + dag.AddHandler("flatten", func(id string) mq.Processor { return NewFlattenHandler(id) }) + dag.AddHandler("output", func(id string) mq.Processor { return NewOutputHandler(id) }) + dag.AddHandler("split", func(id string) mq.Processor { return NewSplitHandler(id) }) + dag.AddHandler("join", func(id string) mq.Processor { return NewJoinHandler(id) }) }