feat: Add connection

This commit is contained in:
sujit
2024-10-28 08:38:36 +05:45
parent a41347786c
commit b5ca85a469
4 changed files with 18 additions and 19 deletions

View File

@@ -170,7 +170,7 @@ func (tm *DAG) callbackToConsumer(ctx context.Context, result mq.Result) {
func (tm *DAG) onTaskCallback(ctx context.Context, result mq.Result) mq.Result { func (tm *DAG) onTaskCallback(ctx context.Context, result mq.Result) mq.Result {
if taskContext, ok := tm.taskContext[result.TaskID]; ok && result.Topic != "" { if taskContext, ok := tm.taskContext[result.TaskID]; ok && result.Topic != "" {
return taskContext.handleCallback(ctx, result) return taskContext.handleNextTask(ctx, result)
} }
return mq.Result{} return mq.Result{}
} }

View File

@@ -75,9 +75,9 @@ func (tm *TaskManager) dispatchFinalResult(ctx context.Context) mq.Result {
func (tm *TaskManager) getConditionalEdges(node *Node, result mq.Result) []Edge { func (tm *TaskManager) getConditionalEdges(node *Node, result mq.Result) []Edge {
edges := make([]Edge, len(node.Edges)) edges := make([]Edge, len(node.Edges))
copy(edges, node.Edges) copy(edges, node.Edges)
if result.Status != "" { if result.ConditionStatus != "" {
if conditions, ok := tm.dag.conditions[FromNode(result.Topic)]; ok { if conditions, ok := tm.dag.conditions[FromNode(result.Topic)]; ok {
if targetNodeKey, ok := conditions[When(result.Status)]; ok { if targetNodeKey, ok := conditions[When(result.ConditionStatus)]; ok {
if targetNode, ok := tm.dag.nodes[string(targetNodeKey)]; ok { if targetNode, ok := tm.dag.nodes[string(targetNodeKey)]; ok {
edges = append(edges, Edge{From: node, To: []*Node{targetNode}}) edges = append(edges, Edge{From: node, To: []*Node{targetNode}})
} }
@@ -91,7 +91,7 @@ func (tm *TaskManager) getConditionalEdges(node *Node, result mq.Result) []Edge
return edges return edges
} }
func (tm *TaskManager) handleCallback(ctx context.Context, result mq.Result) mq.Result { func (tm *TaskManager) handleNextTask(ctx context.Context, result mq.Result) mq.Result {
defer func() { defer func() {
tm.wg.Done() tm.wg.Done()
mq.RecoverPanic(mq.RecoverTitle) mq.RecoverPanic(mq.RecoverTitle)
@@ -207,7 +207,7 @@ func (tm *TaskManager) processNode(ctx context.Context, node *Node, payload json
defer func() { defer func() {
result.Topic = node.Key result.Topic = node.Key
tm.appendResult(result, false) tm.appendResult(result, false)
tm.handleCallback(ctx, result) tm.handleNextTask(ctx, result)
}() }()
} }
select { select {

View File

@@ -38,11 +38,11 @@ func (e *Condition) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
switch email := data["email"].(type) { switch email := data["email"].(type) {
case string: case string:
if email == "abc.xyz@gmail.com" { if email == "abc.xyz@gmail.com" {
return mq.Result{Payload: task.Payload, Status: "pass", Ctx: ctx} return mq.Result{Payload: task.Payload, ConditionStatus: "pass", Ctx: ctx}
} }
return mq.Result{Payload: task.Payload, Status: "fail", Ctx: ctx} return mq.Result{Payload: task.Payload, ConditionStatus: "fail", Ctx: ctx}
default: default:
return mq.Result{Payload: task.Payload, Status: "fail", Ctx: ctx} return mq.Result{Payload: task.Payload, ConditionStatus: "fail", Ctx: ctx}
} }
} }

View File

@@ -13,15 +13,16 @@ import (
) )
type Result struct { type Result struct {
CreatedAt time.Time `json:"created_at"` CreatedAt time.Time `json:"created_at"`
ProcessedAt time.Time `json:"processed_at,omitempty"` ProcessedAt time.Time `json:"processed_at,omitempty"`
Latency string `json:"latency"` Latency string `json:"latency"`
Error error `json:"-"` // Keep error as an error type Error error `json:"-"` // Keep error as an error type
Topic string `json:"topic"` Topic string `json:"topic"`
TaskID string `json:"task_id"` TaskID string `json:"task_id"`
Status string `json:"status"` Status string `json:"status"`
Ctx context.Context `json:"-"` ConditionStatus string `json:"condition_status"`
Payload json.RawMessage `json:"payload"` Ctx context.Context `json:"-"`
Payload json.RawMessage `json:"payload"`
} }
func (r Result) MarshalJSON() ([]byte, error) { func (r Result) MarshalJSON() ([]byte, error) {
@@ -50,8 +51,6 @@ func (r *Result) UnmarshalJSON(data []byte) error {
if err := json.Unmarshal(data, &aux); err != nil { if err := json.Unmarshal(data, &aux); err != nil {
return err return err
} }
// Restore the error from string to error type
if aux.ErrMsg != "" { if aux.ErrMsg != "" {
r.Error = errors.New(aux.ErrMsg) r.Error = errors.New(aux.ErrMsg)
} else { } else {