diff --git a/services/operation.go b/dag/operation.go similarity index 99% rename from services/operation.go rename to dag/operation.go index 45b0f0b..91da8c3 100644 --- a/services/operation.go +++ b/dag/operation.go @@ -1,4 +1,4 @@ -package services +package dag import ( "context" diff --git a/dag/operations.go b/dag/operations.go new file mode 100644 index 0000000..4ca8aac --- /dev/null +++ b/dag/operations.go @@ -0,0 +1,32 @@ +package dag + +import ( + "sync" + + "github.com/oarkflow/mq" +) + +type Operations struct { + mu *sync.RWMutex + Handlers map[string]func(string) mq.Processor +} + +var ops = &Operations{mu: &sync.RWMutex{}, Handlers: make(map[string]func(string) mq.Processor)} + +func AddHandler(key string, handler func(string) mq.Processor) { + ops.mu.Lock() + ops.Handlers[key] = handler + ops.mu.Unlock() +} + +func GetHandler(key string) func(string) mq.Processor { + return ops.Handlers[key] +} + +func AvailableHandlers() []string { + var op []string + for opt := range ops.Handlers { + op = append(op, opt) + } + return op +} diff --git a/dag/task_manager.go b/dag/task_manager.go index 70304ca..a91704e 100644 --- a/dag/task_manager.go +++ b/dag/task_manager.go @@ -92,6 +92,11 @@ func (tm *TaskManager) getConditionalEdges(node *Node, result mq.Result) []Edge } func (tm *TaskManager) handleNextTask(ctx context.Context, result mq.Result) mq.Result { + if result.Ctx != nil { + if headers, ok := mq.GetHeaders(ctx); ok { + ctx = mq.SetHeaders(result.Ctx, headers.AsMap()) + } + } defer func() { tm.wg.Done() mq.RecoverPanic(mq.RecoverTitle) diff --git a/examples/dag.go b/examples/dag.go index 16004f4..d5b4514 100644 --- a/examples/dag.go +++ b/examples/dag.go @@ -6,11 +6,9 @@ import ( "fmt" "log" - "github.com/oarkflow/mq/examples/tasks" - "github.com/oarkflow/mq/services" - "github.com/oarkflow/mq" "github.com/oarkflow/mq/dag" + "github.com/oarkflow/mq/examples/tasks" ) func main() { @@ -19,14 +17,14 @@ func main() { func setup(f *dag.DAG) { f. - AddNode("Email Delivery", "email:deliver", &tasks.EmailDelivery{Operation: services.Operation{Type: "process"}}). - AddNode("Prepare Email", "prepare:email", &tasks.PrepareEmail{Operation: services.Operation{Type: "process"}}). - AddNode("Get Input", "get:input", &tasks.GetData{Operation: services.Operation{Type: "input"}}, true). - AddNode("Iterator Processor", "loop", &tasks.Loop{Operation: services.Operation{Type: "loop"}}). - AddNode("Condition", "condition", &tasks.Condition{Operation: services.Operation{Type: "condition"}}). - AddNode("Store data", "store:data", &tasks.StoreData{Operation: services.Operation{Type: "process"}}). - AddNode("Send SMS", "send:sms", &tasks.SendSms{Operation: services.Operation{Type: "process"}}). - AddNode("Notification", "notification", &tasks.InAppNotification{Operation: services.Operation{Type: "process"}}). + AddNode("Email Delivery", "email:deliver", &tasks.EmailDelivery{Operation: dag.Operation{Type: "process"}}). + AddNode("Prepare Email", "prepare:email", &tasks.PrepareEmail{Operation: dag.Operation{Type: "process"}}). + AddNode("Get Input", "get:input", &tasks.GetData{Operation: dag.Operation{Type: "input"}}, true). + AddNode("Iterator Processor", "loop", &tasks.Loop{Operation: dag.Operation{Type: "loop"}}). + AddNode("Condition", "condition", &tasks.Condition{Operation: dag.Operation{Type: "condition"}}). + AddNode("Store data", "store:data", &tasks.StoreData{Operation: dag.Operation{Type: "process"}}). + AddNode("Send SMS", "send:sms", &tasks.SendSms{Operation: dag.Operation{Type: "process"}}). + AddNode("Notification", "notification", &tasks.InAppNotification{Operation: dag.Operation{Type: "process"}}). AddCondition("condition", map[dag.When]dag.Then{"pass": "email:deliver", "fail": "store:data"}). AddEdge("Get input to loop", "get:input", "loop"). AddIterator("Loop to prepare email", "loop", "prepare:email"). diff --git a/examples/tasks/operations.go b/examples/tasks/operations.go index acd3996..4713842 100644 --- a/examples/tasks/operations.go +++ b/examples/tasks/operations.go @@ -6,11 +6,11 @@ import ( "github.com/oarkflow/json" "github.com/oarkflow/mq" - "github.com/oarkflow/mq/services" + "github.com/oarkflow/mq/dag" ) type GetData struct { - services.Operation + dag.Operation } func (e *GetData) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { @@ -18,7 +18,7 @@ func (e *GetData) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { } type Loop struct { - services.Operation + dag.Operation } func (e *Loop) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { @@ -26,7 +26,7 @@ func (e *Loop) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { } type Condition struct { - services.Operation + dag.Operation } func (e *Condition) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { @@ -47,7 +47,7 @@ func (e *Condition) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { } type PrepareEmail struct { - services.Operation + dag.Operation } func (e *PrepareEmail) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { @@ -62,7 +62,7 @@ func (e *PrepareEmail) ProcessTask(ctx context.Context, task *mq.Task) mq.Result } type EmailDelivery struct { - services.Operation + dag.Operation } func (e *EmailDelivery) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { @@ -77,7 +77,7 @@ func (e *EmailDelivery) ProcessTask(ctx context.Context, task *mq.Task) mq.Resul } type SendSms struct { - services.Operation + dag.Operation } func (e *SendSms) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { @@ -90,7 +90,7 @@ func (e *SendSms) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { } type StoreData struct { - services.Operation + dag.Operation } func (e *StoreData) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { @@ -98,7 +98,7 @@ func (e *StoreData) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { } type InAppNotification struct { - services.Operation + dag.Operation } func (e *InAppNotification) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { diff --git a/examples/tasks/tasks.go b/examples/tasks/tasks.go index 52c9879..fd6c371 100644 --- a/examples/tasks/tasks.go +++ b/examples/tasks/tasks.go @@ -7,24 +7,24 @@ import ( "log" "github.com/oarkflow/mq" - "github.com/oarkflow/mq/services" + "github.com/oarkflow/mq/dag" ) -type Node1 struct{ services.Operation } +type Node1 struct{ dag.Operation } func (t *Node1) ProcessTask(_ context.Context, task *mq.Task) mq.Result { fmt.Println("Node 1", string(task.Payload)) return mq.Result{Payload: task.Payload, TaskID: task.ID} } -type Node2 struct{ services.Operation } +type Node2 struct{ dag.Operation } func (t *Node2) ProcessTask(_ context.Context, task *mq.Task) mq.Result { fmt.Println("Node 2", string(task.Payload)) return mq.Result{Payload: task.Payload, TaskID: task.ID} } -type Node3 struct{ services.Operation } +type Node3 struct{ dag.Operation } func (t *Node3) ProcessTask(_ context.Context, task *mq.Task) mq.Result { var user map[string]any @@ -39,7 +39,7 @@ func (t *Node3) ProcessTask(_ context.Context, task *mq.Task) mq.Result { return mq.Result{Payload: resultPayload, Status: status} } -type Node4 struct{ services.Operation } +type Node4 struct{ dag.Operation } func (t *Node4) ProcessTask(_ context.Context, task *mq.Task) mq.Result { var user map[string]any @@ -49,7 +49,7 @@ func (t *Node4) ProcessTask(_ context.Context, task *mq.Task) mq.Result { return mq.Result{Payload: resultPayload} } -type Node5 struct{ services.Operation } +type Node5 struct{ dag.Operation } func (t *Node5) ProcessTask(_ context.Context, task *mq.Task) mq.Result { var user map[string]any @@ -59,7 +59,7 @@ func (t *Node5) ProcessTask(_ context.Context, task *mq.Task) mq.Result { return mq.Result{Payload: resultPayload} } -type Node6 struct{ services.Operation } +type Node6 struct{ dag.Operation } func (t *Node6) ProcessTask(_ context.Context, task *mq.Task) mq.Result { var user map[string]any @@ -68,7 +68,7 @@ func (t *Node6) ProcessTask(_ context.Context, task *mq.Task) mq.Result { return mq.Result{Payload: resultPayload} } -type Node7 struct{ services.Operation } +type Node7 struct{ dag.Operation } func (t *Node7) ProcessTask(_ context.Context, task *mq.Task) mq.Result { var user map[string]any @@ -78,7 +78,7 @@ func (t *Node7) ProcessTask(_ context.Context, task *mq.Task) mq.Result { return mq.Result{Payload: resultPayload} } -type Node8 struct{ services.Operation } +type Node8 struct{ dag.Operation } func (t *Node8) ProcessTask(_ context.Context, task *mq.Task) mq.Result { var user map[string]any