diff --git a/codec/codec.go b/codec/codec.go index 2d397bf..ac71082 100644 --- a/codec/codec.go +++ b/codec/codec.go @@ -31,7 +31,7 @@ func NewMessage(cmd consts.CMD, payload json.RawMessage, queue string, headers m func (m *Message) Serialize(aesKey, hmacKey []byte, encrypt bool) ([]byte, string, error) { var buf bytes.Buffer - // Serialize Headers, Queue, Command, Payload, and Metadata + // Serialize Headers, Topic, Command, Payload, and Metadata if err := writeLengthPrefixedJSON(&buf, m.Headers); err != nil { return nil, "", fmt.Errorf("error serializing headers: %v", err) } @@ -62,7 +62,7 @@ func Deserialize(data, aesKey, hmacKey []byte, receivedHMAC string, decrypt bool buf := bytes.NewReader(data) - // Deserialize Headers, Queue, Command, Payload, and Metadata + // Deserialize Headers, Topic, Command, Payload, and Metadata headers := make(map[string]string) if err := readLengthPrefixedJSON(buf, &headers); err != nil { return nil, fmt.Errorf("error deserializing headers: %v", err) diff --git a/consts/constants.go b/consts/constants.go index 2776c94..9abbc6b 100644 --- a/consts/constants.go +++ b/consts/constants.go @@ -54,7 +54,7 @@ var ( PublisherKey = "Publisher-Key" ContentType = "Content-Type" AwaitResponseKey = "Await-Response" - QueueKey = "Queue" + QueueKey = "Topic" TypeJson = "application/json" HeaderKey = "headers" TriggerNode = "triggerNode" diff --git a/consumer.go b/consumer.go index 2aa2926..609ead1 100644 --- a/consumer.go +++ b/consumer.go @@ -89,9 +89,9 @@ func (c *Consumer) OnMessage(ctx context.Context, msg *codec.Message, conn net.C return } ctx = SetHeaders(ctx, map[string]string{consts.QueueKey: msg.Queue}) - result := c.ProcessTask(ctx, task) - result.MessageID = task.ID - result.Queue = msg.Queue + result := c.ProcessTask(ctx, &task) + result.TaskID = task.ID + result.Topic = msg.Queue if result.Status == "" { if result.Error != nil { result.Status = "FAILED" @@ -107,7 +107,7 @@ func (c *Consumer) OnMessage(ctx context.Context, msg *codec.Message, conn net.C } // ProcessTask handles a received task message and invokes the appropriate handler. -func (c *Consumer) ProcessTask(ctx context.Context, msg Task) Result { +func (c *Consumer) ProcessTask(ctx context.Context, msg *Task) Result { queue, _ := GetQueue(ctx) handler, exists := c.handlers[queue] if !exists { diff --git a/ctx.go b/ctx.go index 2817b3e..b1e66d1 100644 --- a/ctx.go +++ b/ctx.go @@ -16,15 +16,17 @@ import ( ) type Task struct { - ID string `json:"id"` - Payload json.RawMessage `json:"payload"` - CreatedAt time.Time `json:"created_at"` - ProcessedAt time.Time `json:"processed_at"` - Status string `json:"status"` - Error error `json:"error"` + ID string `json:"id"` + Results map[string]Result `json:"results"` + Topic string `json:"topic"` + Payload json.RawMessage `json:"payload"` + CreatedAt time.Time `json:"created_at"` + ProcessedAt time.Time `json:"processed_at"` + Status string `json:"status"` + Error error `json:"error"` } -type Handler func(context.Context, Task) Result +type Handler func(context.Context, *Task) Result func IsClosed(conn net.Conn) bool { _, err := conn.Read(make([]byte, 1)) diff --git a/dag/dag.go b/dag/dag.go index c0745ef..0f2b51f 100644 --- a/dag/dag.go +++ b/dag/dag.go @@ -124,9 +124,9 @@ func (d *DAG) PublishTask(ctx context.Context, payload json.RawMessage, taskID . return mq.Result{Error: err} } return mq.Result{ - Payload: payload, - Queue: queue, - MessageID: id, + Payload: payload, + Topic: queue, + TaskID: id, } } @@ -168,37 +168,37 @@ func (d *DAG) Send(ctx context.Context, payload []byte) mq.Result { return result } d.mu.Lock() - d.taskChMap[result.MessageID] = resultCh + d.taskChMap[result.TaskID] = resultCh d.mu.Unlock() finalResult := <-resultCh return finalResult } func (d *DAG) processNode(ctx context.Context, task mq.Result) mq.Result { - if con, ok := d.nodes[task.Queue]; ok { + if con, ok := d.nodes[task.Topic]; ok { return con.ProcessTask(ctx, mq.Task{ - ID: task.MessageID, + ID: task.TaskID, Payload: task.Payload, }) } - return mq.Result{Error: fmt.Errorf("no consumer to process %s", task.Queue)} + return mq.Result{Error: fmt.Errorf("no consumer to process %s", task.Topic)} } func (d *DAG) sendSync(ctx context.Context, task mq.Result) mq.Result { - if task.MessageID == "" { - task.MessageID = mq.NewID() + if task.TaskID == "" { + task.TaskID = mq.NewID() } - if task.Queue == "" { - task.Queue = d.FirstNode + if task.Topic == "" { + task.Topic = d.FirstNode } ctx = mq.SetHeaders(ctx, map[string]string{ - consts.QueueKey: task.Queue, + consts.QueueKey: task.Topic, }) result := d.processNode(ctx, task) if result.Error != nil { return result } - for _, target := range d.loopEdges[task.Queue] { + for _, target := range d.loopEdges[task.Topic] { var items, results []json.RawMessage if err := json.Unmarshal(result.Payload, &items); err != nil { return mq.Result{Error: err} @@ -208,9 +208,9 @@ func (d *DAG) sendSync(ctx context.Context, task mq.Result) mq.Result { consts.QueueKey: target, }) result = d.sendSync(ctx, mq.Result{ - Payload: item, - Queue: target, - MessageID: result.MessageID, + Payload: item, + Topic: target, + TaskID: result.TaskID, }) if result.Error != nil { return result @@ -223,29 +223,29 @@ func (d *DAG) sendSync(ctx context.Context, task mq.Result) mq.Result { } result.Payload = bt } - if conditions, ok := d.conditions[task.Queue]; ok { + if conditions, ok := d.conditions[task.Topic]; ok { if target, exists := conditions[result.Status]; exists { ctx = mq.SetHeaders(ctx, map[string]string{ consts.QueueKey: target, }) result = d.sendSync(ctx, mq.Result{ - Payload: result.Payload, - Queue: target, - MessageID: result.MessageID, + Payload: result.Payload, + Topic: target, + TaskID: result.TaskID, }) if result.Error != nil { return result } } } - if target, ok := d.edges[task.Queue]; ok { + if target, ok := d.edges[task.Topic]; ok { ctx = mq.SetHeaders(ctx, map[string]string{ consts.QueueKey: target, }) result = d.sendSync(ctx, mq.Result{ - Payload: result.Payload, - Queue: target, - MessageID: result.MessageID, + Payload: result.Payload, + Topic: target, + TaskID: result.TaskID, }) if result.Error != nil { return result @@ -260,7 +260,7 @@ func (d *DAG) getCompletedResults(task mq.Result, ok bool, triggeredNode string) completed := false multipleResults := false if ok && triggeredNode != "" { - taskResults, ok := d.taskResults[task.MessageID] + taskResults, ok := d.taskResults[task.TaskID] if ok { nodeResult, exists := taskResults[triggeredNode] if exists { @@ -300,25 +300,25 @@ func (d *DAG) TaskCallback(ctx context.Context, task mq.Result) mq.Result { } triggeredNode, ok := mq.GetTriggerNode(ctx) payload, completed, multipleResults := d.getCompletedResults(task, ok, triggeredNode) - if loopNodes, exists := d.loopEdges[task.Queue]; exists { + if loopNodes, exists := d.loopEdges[task.Topic]; exists { var items []json.RawMessage if err := json.Unmarshal(payload, &items); err != nil { return mq.Result{Error: task.Error} } - d.taskResults[task.MessageID] = map[string]*taskContext{ - task.Queue: { + d.taskResults[task.TaskID] = map[string]*taskContext{ + task.Topic: { totalItems: len(items), multipleResults: true, }, } - ctx = mq.SetHeaders(ctx, map[string]string{consts.TriggerNode: task.Queue}) + ctx = mq.SetHeaders(ctx, map[string]string{consts.TriggerNode: task.Topic}) for _, loopNode := range loopNodes { for _, item := range items { ctx = mq.SetHeaders(ctx, map[string]string{ consts.QueueKey: loopNode, }) - result := d.PublishTask(ctx, item, task.MessageID) + result := d.PublishTask(ctx, item, task.TaskID) if result.Error != nil { return result } @@ -328,51 +328,51 @@ func (d *DAG) TaskCallback(ctx context.Context, task mq.Result) mq.Result { return task } if multipleResults && completed { - task.Queue = triggeredNode + task.Topic = triggeredNode } - if conditions, ok := d.conditions[task.Queue]; ok { + if conditions, ok := d.conditions[task.Topic]; ok { if target, exists := conditions[task.Status]; exists { - d.taskResults[task.MessageID] = map[string]*taskContext{ - task.Queue: { + d.taskResults[task.TaskID] = map[string]*taskContext{ + task.Topic: { totalItems: len(conditions), }, } ctx = mq.SetHeaders(ctx, map[string]string{ consts.QueueKey: target, - consts.TriggerNode: task.Queue, + consts.TriggerNode: task.Topic, }) - result := d.PublishTask(ctx, payload, task.MessageID) + result := d.PublishTask(ctx, payload, task.TaskID) if result.Error != nil { return result } } } else { - ctx = mq.SetHeaders(ctx, map[string]string{consts.TriggerNode: task.Queue}) - edge, exists := d.edges[task.Queue] + ctx = mq.SetHeaders(ctx, map[string]string{consts.TriggerNode: task.Topic}) + edge, exists := d.edges[task.Topic] if exists { - d.taskResults[task.MessageID] = map[string]*taskContext{ - task.Queue: { + d.taskResults[task.TaskID] = map[string]*taskContext{ + task.Topic: { totalItems: 1, }, } ctx = mq.SetHeaders(ctx, map[string]string{ consts.QueueKey: edge, }) - result := d.PublishTask(ctx, payload, task.MessageID) + result := d.PublishTask(ctx, payload, task.TaskID) if result.Error != nil { return result } } else if completed { d.mu.Lock() - if resultCh, ok := d.taskChMap[task.MessageID]; ok { + if resultCh, ok := d.taskChMap[task.TaskID]; ok { resultCh <- mq.Result{ - Payload: payload, - Queue: task.Queue, - MessageID: task.MessageID, - Status: "done", + Payload: payload, + Topic: task.Topic, + TaskID: task.TaskID, + Status: "done", } - delete(d.taskChMap, task.MessageID) - delete(d.taskResults, task.MessageID) + delete(d.taskChMap, task.TaskID) + delete(d.taskResults, task.TaskID) } d.mu.Unlock() } diff --git a/examples/dag.go b/examples/dag.go index 992b303..f2e66cd 100644 --- a/examples/dag.go +++ b/examples/dag.go @@ -82,7 +82,7 @@ func requestHandler(requestType string) func(w http.ResponseWriter, r *http.Requ } w.Header().Set("Content-Type", "application/json") result := map[string]any{ - "message_id": rs.MessageID, + "message_id": rs.TaskID, "payload": string(rs.Payload), "error": rs.Error, } diff --git a/examples/tasks/tasks.go b/examples/tasks/tasks.go index fd7d534..3a6f64c 100644 --- a/examples/tasks/tasks.go +++ b/examples/tasks/tasks.go @@ -4,18 +4,19 @@ import ( "context" "encoding/json" "fmt" + "github.com/oarkflow/mq" ) -func Node1(ctx context.Context, task mq.Task) mq.Result { - return mq.Result{Payload: task.Payload, MessageID: task.ID} +func Node1(ctx context.Context, task *mq.Task) mq.Result { + return mq.Result{Payload: task.Payload, TaskID: task.ID} } -func Node2(ctx context.Context, task mq.Task) mq.Result { - return mq.Result{Payload: task.Payload, MessageID: task.ID} +func Node2(ctx context.Context, task *mq.Task) mq.Result { + return mq.Result{Payload: task.Payload, TaskID: task.ID} } -func Node3(ctx context.Context, task mq.Task) mq.Result { +func Node3(ctx context.Context, task *mq.Task) mq.Result { var data map[string]any err := json.Unmarshal(task.Payload, &data) if err != nil { @@ -23,10 +24,10 @@ func Node3(ctx context.Context, task mq.Task) mq.Result { } data["salary"] = fmt.Sprintf("12000%v", data["user_id"]) bt, _ := json.Marshal(data) - return mq.Result{Payload: bt, MessageID: task.ID} + return mq.Result{Payload: bt, TaskID: task.ID} } -func Node4(ctx context.Context, task mq.Task) mq.Result { +func Node4(ctx context.Context, task *mq.Task) mq.Result { var data []map[string]any err := json.Unmarshal(task.Payload, &data) if err != nil { @@ -34,10 +35,10 @@ func Node4(ctx context.Context, task mq.Task) mq.Result { } payload := map[string]any{"storage": data} bt, _ := json.Marshal(payload) - return mq.Result{Payload: bt, MessageID: task.ID} + return mq.Result{Payload: bt, TaskID: task.ID} } -func CheckCondition(ctx context.Context, task mq.Task) mq.Result { +func CheckCondition(ctx context.Context, task *mq.Task) mq.Result { var data map[string]any err := json.Unmarshal(task.Payload, &data) if err != nil { @@ -49,20 +50,20 @@ func CheckCondition(ctx context.Context, task mq.Task) mq.Result { } else { status = "fail" } - return mq.Result{Status: status, Payload: task.Payload, MessageID: task.ID} + return mq.Result{Status: status, Payload: task.Payload, TaskID: task.ID} } -func Pass(ctx context.Context, task mq.Task) mq.Result { +func Pass(ctx context.Context, task *mq.Task) mq.Result { fmt.Println("Pass") return mq.Result{Payload: task.Payload} } -func Fail(ctx context.Context, task mq.Task) mq.Result { +func Fail(ctx context.Context, task *mq.Task) mq.Result { fmt.Println("Fail") return mq.Result{Payload: []byte(`{"test2": "asdsa"}`)} } func Callback(ctx context.Context, task mq.Result) mq.Result { - fmt.Println("Received task", task.MessageID, "Payload", string(task.Payload), task.Error, task.Queue) + fmt.Println("Received task", task.TaskID, "Payload", string(task.Payload), task.Error, task.Topic) return mq.Result{} } diff --git a/options.go b/options.go index 76ea6ea..4f98d04 100644 --- a/options.go +++ b/options.go @@ -7,11 +7,11 @@ import ( ) type Result struct { - Payload json.RawMessage `json:"payload"` - Queue string `json:"queue"` - MessageID string `json:"message_id"` - Error error `json:"error,omitempty"` - Status string `json:"status"` + Payload json.RawMessage `json:"payload"` + Topic string `json:"topic"` + TaskID string `json:"task_id"` + Error error `json:"error,omitempty"` + Status string `json:"status"` } type TLSConfig struct {