feat: update

This commit is contained in:
sujit
2025-09-17 06:38:56 +05:45
parent 06a935ce2f
commit 35fba839ed
9 changed files with 668 additions and 7 deletions

View File

@@ -152,12 +152,15 @@ func (e *Operation) ValidateFields(c context.Context, payload []byte) (map[strin
var data map[string]any var data map[string]any
err := json.Unmarshal(payload, &data) err := json.Unmarshal(payload, &data)
if err != nil { if err != nil {
return nil, err return nil, fmt.Errorf("failed to unmarshal payload: %w", err)
} }
for k, v := range e.Payload.Mapping { for k, v := range e.Payload.Mapping {
_, val := GetVal(c, v, data) _, val := GetVal(c, v, data)
if val != nil { if val != nil {
keys = append(keys, k) keys = append(keys, k)
} else {
// Log missing mapping
fmt.Printf("Warning: Mapping key %s not found for %s\n", k, v)
} }
} }
for k := range e.Payload.Data { for k := range e.Payload.Data {
@@ -165,7 +168,7 @@ func (e *Operation) ValidateFields(c context.Context, payload []byte) (map[strin
} }
for _, k := range e.RequiredFields { for _, k := range e.RequiredFields {
if !slices.Contains(keys, k) { if !slices.Contains(keys, k) {
return nil, errors.New("Required field doesn't exist") return nil, fmt.Errorf("required field '%s' is missing or could not be mapped", k)
} }
} }
return data, nil return data, nil
@@ -232,6 +235,13 @@ func GetVal(c context.Context, v string, data map[string]any) (key string, val a
} }
} }
// Log warning if value is nil and not expected
if val == nil && key != "" {
// Assuming logger is available, but since it's not in this file, perhaps add a field or use fmt
// For now, use fmt.Printf for warning
fmt.Printf("Warning: Value not found for key %s in mapping %s\n", key, v)
}
return return
} }

View File

@@ -4,13 +4,13 @@ import (
"context" "context"
"fmt" "fmt"
"log" "log"
"math/rand" // ...new import for jitter...
"strings" "strings"
"sync" "sync"
"time" "time"
"math/rand" // ...new import for jitter...
"github.com/oarkflow/json" "github.com/oarkflow/json"
"github.com/oarkflow/mq" "github.com/oarkflow/mq"
"github.com/oarkflow/mq/logger" "github.com/oarkflow/mq/logger"
"github.com/oarkflow/mq/storage" "github.com/oarkflow/mq/storage"
@@ -191,6 +191,80 @@ func (tm *TaskManager) waitForResult() {
} }
} }
// areDependenciesMet checks if all previous nodes have completed successfully
func (tm *TaskManager) areDependenciesMet(nodeID string) bool {
pureNodeID := strings.Split(nodeID, Delimiter)[0]
// Get previous nodes
prevNodes, err := tm.dag.GetPreviousNodes(pureNodeID)
if err != nil {
tm.dag.Logger().Error("Error getting previous nodes", logger.Field{Key: "nodeID", Value: nodeID}, logger.Field{Key: "error", Value: err.Error()})
return false
}
// For iterator nodes, we need to be more selective about dependencies
// Iterator nodes should only depend on nodes that provide data to them,
// not on nodes that they create (which would be circular dependencies)
node, exists := tm.dag.nodes.Get(pureNodeID)
if exists {
// Check if this node has any iterator edges (meaning it's an iterator node)
hasIteratorEdges := false
for _, edge := range node.Edges {
if edge.Type == Iterator {
hasIteratorEdges = true
break
}
}
if hasIteratorEdges {
// For iterator nodes, only check dependencies from Simple edges
// Iterator edges represent outputs, not inputs
filteredPrevNodes := make([]*Node, 0)
for _, prevNode := range prevNodes {
// Check if there's a Simple edge from prevNode to this node
hasSimpleEdge := false
for _, edge := range prevNode.Edges {
if edge.To.ID == pureNodeID && edge.Type == Simple {
hasSimpleEdge = true
break
}
}
if hasSimpleEdge {
filteredPrevNodes = append(filteredPrevNodes, prevNode)
}
}
prevNodes = filteredPrevNodes
}
}
// Check if all relevant previous nodes have completed successfully
for _, prevNode := range prevNodes {
// Check both the pure node ID and the indexed node ID for state
state, exists := tm.taskStates.Get(prevNode.ID)
if !exists {
// Also check if there's a state with an index suffix
tm.taskStates.ForEach(func(key string, s *TaskState) bool {
if strings.Split(key, Delimiter)[0] == prevNode.ID {
state = s
exists = true
return false // Stop iteration
}
return true
})
}
if !exists || state.Status != mq.Completed {
tm.dag.Logger().Debug("Dependency not met",
logger.Field{Key: "nodeID", Value: nodeID},
logger.Field{Key: "dependency", Value: prevNode.ID},
logger.Field{Key: "stateExists", Value: exists},
logger.Field{Key: "stateStatus", Value: string(state.Status)})
return false
}
}
return true
}
func (tm *TaskManager) processNode(exec *task) { func (tm *TaskManager) processNode(exec *task) {
startTime := time.Now() startTime := time.Now()
pureNodeID := strings.Split(exec.nodeID, Delimiter)[0] pureNodeID := strings.Split(exec.nodeID, Delimiter)[0]
@@ -199,6 +273,15 @@ func (tm *TaskManager) processNode(exec *task) {
tm.dag.Logger().Error("Node not found", logger.Field{Key: "nodeID", Value: pureNodeID}) tm.dag.Logger().Error("Node not found", logger.Field{Key: "nodeID", Value: pureNodeID})
return return
} }
// Check if all dependencies are met before processing
if !tm.areDependenciesMet(pureNodeID) {
tm.dag.Logger().Warn("Dependencies not met for node, deferring", logger.Field{Key: "nodeID", Value: pureNodeID})
// Defer the task
tm.deferredTasks.Set(exec.taskID, exec)
return
}
// Wrap context with timeout if node.Timeout is configured. // Wrap context with timeout if node.Timeout is configured.
if node.Timeout > 0 { if node.Timeout > 0 {
var cancel context.CancelFunc var cancel context.CancelFunc
@@ -240,11 +323,15 @@ func (tm *TaskManager) processNode(exec *task) {
// add jitter to avoid thundering herd // add jitter to avoid thundering herd
jitter := time.Duration(rand.Int63n(int64(tm.baseBackoff))) jitter := time.Duration(rand.Int63n(int64(tm.baseBackoff)))
backoff += jitter backoff += jitter
log.Printf("Recoverable error on node %s, retrying in %s: %v", exec.nodeID, backoff, result.Error) tm.dag.Logger().Warn("Recoverable error on node, retrying",
logger.Field{Key: "nodeID", Value: exec.nodeID},
logger.Field{Key: "attempt", Value: attempts},
logger.Field{Key: "backoff", Value: backoff.String()},
logger.Field{Key: "error", Value: result.Error.Error()})
select { select {
case <-time.After(backoff): case <-time.After(backoff):
case <-exec.ctx.Done(): case <-exec.ctx.Done():
log.Printf("Context cancelled for node %s", exec.nodeID) tm.dag.Logger().Warn("Context cancelled for node", logger.Field{Key: "nodeID", Value: exec.nodeID})
return return
} }
continue continue
@@ -252,8 +339,13 @@ func (tm *TaskManager) processNode(exec *task) {
if err := tm.recoveryHandler(exec.ctx, result); err == nil { if err := tm.recoveryHandler(exec.ctx, result); err == nil {
result.Error = nil result.Error = nil
result.Status = mq.Completed result.Status = mq.Completed
} else {
result.Error = fmt.Errorf("recovery failed for node %s: %w", exec.nodeID, err)
} }
} }
} else {
// Wrap non-recoverable errors with context
result.Error = fmt.Errorf("node %s failed: %w", exec.nodeID, result.Error)
} }
} }
break break
@@ -284,6 +376,7 @@ func (tm *TaskManager) processNode(exec *task) {
result.Status = mq.Completed result.Status = mq.Completed
state.Result = result state.Result = result
state.Result.Status = mq.Completed state.Result.Status = mq.Completed
state.Status = mq.Completed
state.Result.Latency = nodeLatency.String() state.Result.Latency = nodeLatency.String()
result.Topic = node.ID result.Topic = node.ID
tm.updateTimestamps(&result) tm.updateTimestamps(&result)

122
examples/dag.go Normal file
View File

@@ -0,0 +1,122 @@
package main
import (
"context"
"fmt"
"github.com/oarkflow/json"
"github.com/oarkflow/mq"
"github.com/oarkflow/mq/dag"
"github.com/oarkflow/mq/examples/tasks"
)
func subDAG() *dag.DAG {
f := dag.NewDAG("Sub DAG", "sub-dag", func(taskID string, result mq.Result) {
fmt.Printf("Sub DAG Final result for task %s: %s\n", taskID, string(result.Payload))
}, mq.WithSyncMode(true))
f.
AddNode(dag.Function, "Store data", "store:data", &tasks.StoreData{Operation: dag.Operation{Type: dag.Function}}, true).
AddNode(dag.Function, "Send SMS", "send:sms", &tasks.SendSms{Operation: dag.Operation{Type: dag.Function}}).
AddNode(dag.Function, "Notification", "notification", &tasks.InAppNotification{Operation: dag.Operation{Type: dag.Function}}).
AddEdge(dag.Simple, "Store Payload to send sms", "store:data", "send:sms").
AddEdge(dag.Simple, "Store Payload to notification", "send:sms", "notification")
return f
}
func main() {
flow := dag.NewDAG("Sample DAG", "sample-dag", func(taskID string, result mq.Result) {
fmt.Printf("DAG Final result for task %s: %s\n", taskID, string(result.Payload))
})
flow.AddNode(dag.Function, "GetData", "GetData", &GetData{}, true)
flow.AddNode(dag.Function, "Loop", "Loop", &Loop{})
flow.AddNode(dag.Function, "ValidateAge", "ValidateAge", &ValidateAge{})
flow.AddNode(dag.Function, "ValidateGender", "ValidateGender", &ValidateGender{})
flow.AddNode(dag.Function, "Final", "Final", &Final{})
flow.AddDAGNode(dag.Function, "Check", "persistent", subDAG())
flow.AddEdge(dag.Simple, "GetData", "GetData", "Loop")
flow.AddEdge(dag.Iterator, "Validate age for each item", "Loop", "ValidateAge")
flow.AddCondition("ValidateAge", map[string]string{"pass": "ValidateGender", "default": "persistent"})
flow.AddEdge(dag.Simple, "Mark as Done", "Loop", "Final")
// flow.Start(":8080")
data := []byte(`[{"age": "15", "gender": "female"}, {"age": "18", "gender": "male"}]`)
if flow.Error != nil {
panic(flow.Error)
}
rs := flow.Process(context.Background(), data)
if rs.Error != nil {
panic(rs.Error)
}
fmt.Println(rs.Status, rs.Topic, string(rs.Payload))
}
type GetData struct {
dag.Operation
}
func (p *GetData) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
return mq.Result{Ctx: ctx, Payload: task.Payload}
}
type Loop struct {
dag.Operation
}
func (p *Loop) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
return mq.Result{Ctx: ctx, Payload: task.Payload}
}
type ValidateAge struct {
dag.Operation
}
func (p *ValidateAge) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
var data map[string]any
if err := json.Unmarshal(task.Payload, &data); err != nil {
return mq.Result{Error: fmt.Errorf("ValidateAge Error: %s", err.Error()), Ctx: ctx}
}
var status string
if data["age"] == "18" {
status = "pass"
} else {
status = "default"
}
updatedPayload, _ := json.Marshal(data)
return mq.Result{Payload: updatedPayload, Ctx: ctx, ConditionStatus: status}
}
type ValidateGender struct {
dag.Operation
}
func (p *ValidateGender) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
var data map[string]any
if err := json.Unmarshal(task.Payload, &data); err != nil {
return mq.Result{Error: fmt.Errorf("ValidateGender Error: %s", err.Error()), Ctx: ctx}
}
data["female_voter"] = data["gender"] == "female"
updatedPayload, _ := json.Marshal(data)
return mq.Result{Payload: updatedPayload, Ctx: ctx}
}
type Final struct {
dag.Operation
}
func (p *Final) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
var data []map[string]any
if err := json.Unmarshal(task.Payload, &data); err != nil {
return mq.Result{Error: fmt.Errorf("Final Error: %s", err.Error()), Ctx: ctx}
}
for i, row := range data {
row["done"] = true
data[i] = row
}
updatedPayload, err := json.Marshal(data)
if err != nil {
panic(err)
}
return mq.Result{Payload: updatedPayload, Ctx: ctx}
}

160
examples/form.go Normal file
View File

@@ -0,0 +1,160 @@
package main
import (
"context"
"fmt"
"github.com/oarkflow/json"
"github.com/oarkflow/mq/dag"
"github.com/oarkflow/jet"
"github.com/oarkflow/mq"
"github.com/oarkflow/mq/consts"
)
func main() {
flow := dag.NewDAG("Multi-Step Form", "multi-step-form", func(taskID string, result mq.Result) {
fmt.Printf("Final result for task %s: %s\n", taskID, string(result.Payload))
})
flow.AddNode(dag.Page, "Form Step1", "FormStep1", &FormStep1{})
flow.AddNode(dag.Page, "Form Step2", "FormStep2", &FormStep2{})
flow.AddNode(dag.Page, "Form Result", "FormResult", &FormResult{})
// Define edges
flow.AddEdge(dag.Simple, "Form Step1", "FormStep1", "FormStep2")
flow.AddEdge(dag.Simple, "Form Step2", "FormStep2", "FormResult")
// Start the flow
if flow.Error != nil {
panic(flow.Error)
}
flow.Start(context.Background(), "0.0.0.0:8082")
}
type FormStep1 struct {
dag.Operation
}
func (p *FormStep1) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
bt := []byte(`
<html>
<body>
<form method="post" action="/process?task_id={{task_id}}&next=true">
<label>Name:</label>
<input type="text" name="name" required>
<label>Age:</label>
<input type="number" name="age" required>
<button type="submit">Next</button>
</form>
</body
</html
`)
parser := jet.NewWithMemory(jet.WithDelims("{{", "}}"))
rs, err := parser.ParseTemplate(string(bt), map[string]any{
"task_id": ctx.Value("task_id"),
})
if err != nil {
fmt.Println("FormStep1", string(task.Payload))
return mq.Result{Error: err, Ctx: ctx}
}
ctx = context.WithValue(ctx, consts.ContentType, consts.TypeHtml)
data := map[string]any{
"html_content": rs,
}
bt, _ = json.Marshal(data)
return mq.Result{Payload: bt, Ctx: ctx}
}
type FormStep2 struct {
dag.Operation
}
func (p *FormStep2) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
// Parse input from Step 1
var inputData map[string]any
if err := json.Unmarshal(task.Payload, &inputData); err != nil {
return mq.Result{Error: err, Ctx: ctx}
}
// Determine dynamic content
isEligible := inputData["age"] == "18"
inputData["show_voting_controls"] = isEligible
bt := []byte(`
<html>
<body>
<form method="post" action="/process?task_id={{task_id}}&next=true">
{{ if show_voting_controls }}
<label>Do you want to register to vote?</label>
<input type="checkbox" name="register_vote">
<button type="submit">Next</button>
{{ else }}
<p>You are not eligible to vote.</p>
{{ end }}
</form>
</body>
</html>
`)
parser := jet.NewWithMemory(jet.WithDelims("{{", "}}"))
inputData["task_id"] = ctx.Value("task_id")
rs, err := parser.ParseTemplate(string(bt), inputData)
if err != nil {
fmt.Println("FormStep2", inputData)
return mq.Result{Error: err, Ctx: ctx}
}
ctx = context.WithValue(ctx, consts.ContentType, consts.TypeHtml)
inputData["html_content"] = rs
bt, _ = json.Marshal(inputData)
return mq.Result{Payload: bt, Ctx: ctx}
}
type FormResult struct {
dag.Operation
}
func (p *FormResult) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
// Load HTML template for results
bt := []byte(`
<html>
<body>
<h1>Form Summary</h1>
<p>Name: {{ name }}</p>
<p>Age: {{ age }}</p>
{{ if register_vote }}
<p>You have registered to vote!</p>
{{ else }}
<p>You did not register to vote.</p>
{{ end }}
</body>
</html>
`)
var inputData map[string]any
if task.Payload != nil {
if err := json.Unmarshal(task.Payload, &inputData); err != nil {
return mq.Result{Error: err, Ctx: ctx}
}
}
if inputData != nil {
if isEligible, ok := inputData["register_vote"].(string); ok {
inputData["register_vote"] = isEligible
} else {
inputData["register_vote"] = false
}
}
parser := jet.NewWithMemory(jet.WithDelims("{{", "}}"))
rs, err := parser.ParseTemplate(string(bt), inputData)
if err != nil {
return mq.Result{Error: err, Ctx: ctx}
}
ctx = context.WithValue(ctx, consts.ContentType, consts.TypeHtml)
inputData["html_content"] = rs
bt, _ = json.Marshal(inputData)
return mq.Result{Payload: bt, Ctx: ctx}
}

View File

@@ -0,0 +1,135 @@
package tasks
import (
"context"
"github.com/oarkflow/json"
v2 "github.com/oarkflow/mq/dag"
"github.com/oarkflow/mq"
)
type GetData struct {
v2.Operation
}
func (e *GetData) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
return mq.Result{Payload: task.Payload, Ctx: ctx}
}
type Loop struct {
v2.Operation
}
func (e *Loop) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
return mq.Result{Payload: task.Payload, Ctx: ctx}
}
type Condition struct {
v2.Operation
}
func (e *Condition) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
var data map[string]any
err := json.Unmarshal(task.Payload, &data)
if err != nil {
panic(err)
}
switch email := data["email"].(type) {
case string:
if email == "abc.xyz@gmail.com" {
return mq.Result{Payload: task.Payload, ConditionStatus: "pass", Ctx: ctx}
}
return mq.Result{Payload: task.Payload, ConditionStatus: "fail", Ctx: ctx}
default:
return mq.Result{Payload: task.Payload, ConditionStatus: "fail", Ctx: ctx}
}
}
type PrepareEmail struct {
v2.Operation
}
func (e *PrepareEmail) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
var data map[string]any
err := json.Unmarshal(task.Payload, &data)
if err != nil {
panic(err)
}
data["email_valid"] = true
d, _ := json.Marshal(data)
return mq.Result{Payload: d, Ctx: ctx}
}
type EmailDelivery struct {
v2.Operation
}
func (e *EmailDelivery) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
var data map[string]any
err := json.Unmarshal(task.Payload, &data)
if err != nil {
panic(err)
}
data["email_sent"] = true
d, _ := json.Marshal(data)
return mq.Result{Payload: d, Ctx: ctx}
}
type SendSms struct {
v2.Operation
}
func (e *SendSms) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
var data map[string]any
err := json.Unmarshal(task.Payload, &data)
if err != nil {
panic(err)
}
data["sms_sent"] = true
d, _ := json.Marshal(data)
return mq.Result{Payload: d, Ctx: ctx}
}
type StoreData struct {
v2.Operation
}
func (e *StoreData) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
var data map[string]any
err := json.Unmarshal(task.Payload, &data)
if err != nil {
panic(err)
}
data["stored"] = true
d, _ := json.Marshal(data)
return mq.Result{Payload: d, Ctx: ctx}
}
type InAppNotification struct {
v2.Operation
}
func (e *InAppNotification) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
var data map[string]any
err := json.Unmarshal(task.Payload, &data)
if err != nil {
panic(err)
}
data["notified"] = true
d, _ := json.Marshal(data)
return mq.Result{Payload: d, Ctx: ctx}
}
type Final struct {
v2.Operation
}
func (e *Final) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
rs := map[string]any{
"html_content": `<strong>Processed successfully!</strong>`,
}
bt, _ := json.Marshal(rs)
return mq.Result{Payload: bt, Ctx: ctx}
}

View File

@@ -0,0 +1,20 @@
package tasks
import (
"context"
"fmt"
"github.com/oarkflow/mq"
)
func SchedulerHandler(ctx context.Context, task *mq.Task) mq.Result {
fmt.Printf("Processing task: %s\n", task.ID)
return mq.Result{Error: nil}
}
func SchedulerCallback(ctx context.Context, result mq.Result) error {
if result.Error != nil {
fmt.Println("Task failed!", result.Error.Error())
}
return nil
}

110
examples/tasks/tasks.go Normal file
View File

@@ -0,0 +1,110 @@
package tasks
import (
"context"
"fmt"
"log"
"github.com/oarkflow/json"
v2 "github.com/oarkflow/mq/dag"
"github.com/oarkflow/mq"
)
type Node1 struct{ v2.Operation }
func (t *Node1) ProcessTask(_ context.Context, task *mq.Task) mq.Result {
fmt.Println("Node 1", string(task.Payload))
return mq.Result{Payload: task.Payload, TaskID: task.ID}
}
type Node2 struct{ v2.Operation }
func (t *Node2) ProcessTask(_ context.Context, task *mq.Task) mq.Result {
fmt.Println("Node 2", string(task.Payload))
return mq.Result{Payload: task.Payload, TaskID: task.ID}
}
type Node3 struct{ v2.Operation }
func (t *Node3) ProcessTask(_ context.Context, task *mq.Task) mq.Result {
var user map[string]any
fmt.Println(string(task.Payload))
err := json.Unmarshal(task.Payload, &user)
if err != nil {
panic(err)
}
age := int(user["age"].(float64))
status := "FAIL"
if age > 20 {
status = "PASS"
}
user["status"] = status
resultPayload, _ := json.Marshal(user)
return mq.Result{Payload: resultPayload, ConditionStatus: status}
}
type Node4 struct{ v2.Operation }
func (t *Node4) ProcessTask(_ context.Context, task *mq.Task) mq.Result {
var user map[string]any
_ = json.Unmarshal(task.Payload, &user)
user["node"] = "D"
resultPayload, _ := json.Marshal(user)
return mq.Result{Payload: resultPayload}
}
type Node5 struct{ v2.Operation }
func (t *Node5) ProcessTask(_ context.Context, task *mq.Task) mq.Result {
var user map[string]any
_ = json.Unmarshal(task.Payload, &user)
user["node"] = "E"
resultPayload, _ := json.Marshal(user)
return mq.Result{Payload: resultPayload}
}
type Node6 struct{ v2.Operation }
func (t *Node6) ProcessTask(_ context.Context, task *mq.Task) mq.Result {
var user map[string]any
_ = json.Unmarshal(task.Payload, &user)
resultPayload, _ := json.Marshal(map[string]any{"storage": user})
return mq.Result{Payload: resultPayload}
}
type Node7 struct{ v2.Operation }
func (t *Node7) ProcessTask(_ context.Context, task *mq.Task) mq.Result {
var user map[string]any
_ = json.Unmarshal(task.Payload, &user)
user["node"] = "G"
resultPayload, _ := json.Marshal(user)
return mq.Result{Payload: resultPayload}
}
type Node8 struct{ v2.Operation }
func (t *Node8) ProcessTask(_ context.Context, task *mq.Task) mq.Result {
var user map[string]any
_ = json.Unmarshal(task.Payload, &user)
user["node"] = "H"
resultPayload, _ := json.Marshal(user)
return mq.Result{Payload: resultPayload}
}
func Callback(_ context.Context, task mq.Result) mq.Result {
fmt.Println("Received task", task.TaskID, "Payload", string(task.Payload), task.Error, task.Topic)
return mq.Result{}
}
func NotifyResponse(_ context.Context, result mq.Result) error {
log.Printf("DAG - FINAL_RESPONSE ~> TaskID: %s, Payload: %s, Topic: %s, Error: %v, Latency: %s", result.TaskID, result.Payload, result.Topic, result.Error, result.Latency)
return nil
}
func NotifySubDAGResponse(_ context.Context, result mq.Result) error {
log.Printf("SUB DAG - FINAL_RESPONSE ~> TaskID: %s, Payload: %s, Topic: %s, Error: %v, Latency: %s", result.TaskID, result.Payload, result.Topic, result.Error, result.Latency)
return nil
}

View File

@@ -736,6 +736,6 @@ func (h *DataHandler) getUnpivotFields() []string {
func NewDataHandler(id string) *DataHandler { func NewDataHandler(id string) *DataHandler {
return &DataHandler{ return &DataHandler{
Operation: dag.Operation{ID: id, Key: "data", Type: dag.Function, Tags: []string{"data", "transformation", "misc"}}, Operation: dag.Operation{ID: id, Key: "data:transform", Type: dag.Function, Tags: []string{"data", "transformation", "misc"}},
} }
} }

View File

@@ -6,10 +6,21 @@ import (
) )
func Init() { func Init() {
// Basic handlers
dag.AddHandler("start", func(id string) mq.Processor { return NewStartHandler(id) }) dag.AddHandler("start", func(id string) mq.Processor { return NewStartHandler(id) })
dag.AddHandler("loop", func(id string) mq.Processor { return NewLoop(id) }) dag.AddHandler("loop", func(id string) mq.Processor { return NewLoop(id) })
dag.AddHandler("condition", func(id string) mq.Processor { return NewCondition(id) }) dag.AddHandler("condition", func(id string) mq.Processor { return NewCondition(id) })
dag.AddHandler("print", func(id string) mq.Processor { return NewPrintHandler(id) }) dag.AddHandler("print", func(id string) mq.Processor { return NewPrintHandler(id) })
dag.AddHandler("render", func(id string) mq.Processor { return NewRenderHTMLNode(id) }) dag.AddHandler("render", func(id string) mq.Processor { return NewRenderHTMLNode(id) })
dag.AddHandler("log", func(id string) mq.Processor { return NewLogHandler(id) }) dag.AddHandler("log", func(id string) mq.Processor { return NewLogHandler(id) })
// Data transformation handlers
dag.AddHandler("data:transform", func(id string) mq.Processor { return NewDataHandler(id) })
dag.AddHandler("field", func(id string) mq.Processor { return NewFieldHandler(id) })
dag.AddHandler("format", func(id string) mq.Processor { return NewFormatHandler(id) })
dag.AddHandler("group", func(id string) mq.Processor { return NewGroupHandler(id) })
dag.AddHandler("flatten", func(id string) mq.Processor { return NewFlattenHandler(id) })
dag.AddHandler("output", func(id string) mq.Processor { return NewOutputHandler(id) })
dag.AddHandler("split", func(id string) mq.Processor { return NewSplitHandler(id) })
dag.AddHandler("join", func(id string) mq.Processor { return NewJoinHandler(id) })
} }