mirror of
https://github.com/oarkflow/mq.git
synced 2025-10-06 16:36:53 +08:00
feat: implement Validate
to check for cycle
This commit is contained in:
@@ -169,13 +169,11 @@ func (c *Consumer) OnResponse(ctx context.Context, result Result) error {
|
||||
result.Status = "SUCCESS"
|
||||
}
|
||||
}
|
||||
if result.Payload != nil || result.Error != nil {
|
||||
bt, _ := json.Marshal(result)
|
||||
reply := codec.NewMessage(consts.MESSAGE_RESPONSE, bt, result.Topic, headers)
|
||||
if err := c.send(ctx, c.conn, reply); err != nil {
|
||||
return fmt.Errorf("failed to send MESSAGE_RESPONSE: %v", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@@ -138,10 +138,6 @@ func (tm *DAG) request(w http.ResponseWriter, r *http.Request, async bool) {
|
||||
} else {
|
||||
rs = tm.Process(ctx, request.Payload)
|
||||
}
|
||||
if rs.Error != nil {
|
||||
http.Error(w, fmt.Sprintf("[DAG Error] - %v", rs.Error), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(rs)
|
||||
}
|
||||
|
@@ -91,11 +91,18 @@ func (tm *TaskManager) getConditionalEdges(node *Node, result mq.Result) []Edge
|
||||
}
|
||||
|
||||
func (tm *TaskManager) handleCallback(ctx context.Context, result mq.Result) mq.Result {
|
||||
defer tm.wg.Done()
|
||||
defer func() {
|
||||
tm.wg.Done()
|
||||
mq.RecoverPanic(mq.RecoverTitle)
|
||||
}()
|
||||
node, ok := tm.dag.nodes[result.Topic]
|
||||
if !ok {
|
||||
return result
|
||||
}
|
||||
if result.Error != nil {
|
||||
tm.appendResult(result, true)
|
||||
return result
|
||||
}
|
||||
edges := tm.getConditionalEdges(node, result)
|
||||
if len(edges) == 0 {
|
||||
tm.appendResult(result, true)
|
||||
@@ -203,7 +210,7 @@ func (tm *TaskManager) processNode(ctx context.Context, node *Node, payload json
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
result = mq.Result{TaskID: tm.taskID, Topic: node.Key, Error: ctx.Err(), Ctx: ctx}
|
||||
tm.appendResult(result, false)
|
||||
tm.appendResult(result, true)
|
||||
return
|
||||
default:
|
||||
ctx = mq.SetHeaders(ctx, map[string]string{consts.QueueKey: node.Key})
|
||||
@@ -214,14 +221,14 @@ func (tm *TaskManager) processNode(ctx context.Context, node *Node, payload json
|
||||
result.TaskID = tm.taskID
|
||||
}
|
||||
if result.Error != nil {
|
||||
tm.appendResult(result, false)
|
||||
tm.appendResult(result, true)
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
err := tm.dag.server.Publish(ctx, mq.NewTask(tm.taskID, payload, node.Key), node.Key)
|
||||
if err != nil {
|
||||
tm.appendResult(mq.Result{Error: err}, false)
|
||||
tm.appendResult(mq.Result{Error: err}, true)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
@@ -3,6 +3,7 @@ package tasks
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/oarkflow/errors"
|
||||
"github.com/oarkflow/json"
|
||||
|
||||
"github.com/oarkflow/mq"
|
||||
@@ -35,6 +36,7 @@ func (e *Condition) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return mq.Result{Error: errors.New("Condition error")}
|
||||
switch email := data["email"].(type) {
|
||||
case string:
|
||||
if email == "abc.xyz@gmail.com" {
|
||||
|
43
options.go
43
options.go
@@ -7,6 +7,8 @@ import (
|
||||
"runtime"
|
||||
"time"
|
||||
|
||||
"github.com/oarkflow/errors"
|
||||
|
||||
"github.com/oarkflow/mq/consts"
|
||||
)
|
||||
|
||||
@@ -14,7 +16,7 @@ type Result struct {
|
||||
CreatedAt time.Time `json:"created_at"`
|
||||
ProcessedAt time.Time `json:"processed_at,omitempty"`
|
||||
Latency string `json:"latency"`
|
||||
Error error `json:"error,omitempty"`
|
||||
Error error `json:"-"` // Keep error as an error type
|
||||
Topic string `json:"topic"`
|
||||
TaskID string `json:"task_id"`
|
||||
Status string `json:"status"`
|
||||
@@ -22,6 +24,45 @@ type Result struct {
|
||||
Payload json.RawMessage `json:"payload"`
|
||||
}
|
||||
|
||||
// MarshalJSON customizes the marshaling of Result
|
||||
func (r Result) MarshalJSON() ([]byte, error) {
|
||||
type Alias Result
|
||||
aux := &struct {
|
||||
ErrorMsg string `json:"error"`
|
||||
Alias
|
||||
}{
|
||||
Alias: (Alias)(r),
|
||||
}
|
||||
if r.Error != nil {
|
||||
aux.ErrorMsg = r.Error.Error()
|
||||
}
|
||||
return json.Marshal(aux)
|
||||
}
|
||||
|
||||
// UnmarshalJSON customizes the unmarshalling of Result
|
||||
func (r *Result) UnmarshalJSON(data []byte) error {
|
||||
type Alias Result
|
||||
aux := &struct {
|
||||
ErrMsg string `json:"error,omitempty"`
|
||||
*Alias
|
||||
}{
|
||||
Alias: (*Alias)(r),
|
||||
}
|
||||
|
||||
if err := json.Unmarshal(data, &aux); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Restore the error from string to error type
|
||||
if aux.ErrMsg != "" {
|
||||
r.Error = errors.New(aux.ErrMsg)
|
||||
} else {
|
||||
r.Error = nil
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r Result) Unmarshal(data any) error {
|
||||
if r.Payload == nil {
|
||||
return fmt.Errorf("payload is nil")
|
||||
|
Reference in New Issue
Block a user