feat: add example

This commit is contained in:
sujit
2024-10-08 16:24:52 +05:45
parent a4dff9c77b
commit 612eb535ec
4 changed files with 87 additions and 99 deletions

View File

@@ -5,20 +5,21 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/oarkflow/mq"
"github.com/oarkflow/mq/v2" "github.com/oarkflow/mq/v2"
) )
func handler1(ctx context.Context, task *v2.Task) v2.Result { func handler1(ctx context.Context, task *mq.Task) mq.Result {
return v2.Result{Payload: task.Payload, Ctx: ctx} return mq.Result{Payload: task.Payload, Ctx: ctx}
} }
func handler2(ctx context.Context, task *v2.Task) v2.Result { func handler2(ctx context.Context, task *mq.Task) mq.Result {
var user map[string]any var user map[string]any
json.Unmarshal(task.Payload, &user) json.Unmarshal(task.Payload, &user)
return v2.Result{Payload: task.Payload, Ctx: ctx} return mq.Result{Payload: task.Payload, Ctx: ctx}
} }
func handler3(ctx context.Context, task *v2.Task) v2.Result { func handler3(ctx context.Context, task *mq.Task) mq.Result {
var user map[string]any var user map[string]any
json.Unmarshal(task.Payload, &user) json.Unmarshal(task.Payload, &user)
age := int(user["age"].(float64)) age := int(user["age"].(float64))
@@ -28,30 +29,30 @@ func handler3(ctx context.Context, task *v2.Task) v2.Result {
} }
user["status"] = status user["status"] = status
resultPayload, _ := json.Marshal(user) resultPayload, _ := json.Marshal(user)
return v2.Result{Payload: resultPayload, Status: status, Ctx: ctx} return mq.Result{Payload: resultPayload, Status: status, Ctx: ctx}
} }
func handler4(ctx context.Context, task *v2.Task) v2.Result { func handler4(ctx context.Context, task *mq.Task) mq.Result {
var user map[string]any var user map[string]any
json.Unmarshal(task.Payload, &user) json.Unmarshal(task.Payload, &user)
user["final"] = "D" user["final"] = "D"
resultPayload, _ := json.Marshal(user) resultPayload, _ := json.Marshal(user)
return v2.Result{Payload: resultPayload, Ctx: ctx} return mq.Result{Payload: resultPayload, Ctx: ctx}
} }
func handler5(ctx context.Context, task *v2.Task) v2.Result { func handler5(ctx context.Context, task *mq.Task) mq.Result {
var user map[string]any var user map[string]any
json.Unmarshal(task.Payload, &user) json.Unmarshal(task.Payload, &user)
user["salary"] = "E" user["salary"] = "E"
resultPayload, _ := json.Marshal(user) resultPayload, _ := json.Marshal(user)
return v2.Result{Payload: resultPayload, Ctx: ctx} return mq.Result{Payload: resultPayload, Ctx: ctx}
} }
func handler6(ctx context.Context, task *v2.Task) v2.Result { func handler6(ctx context.Context, task *mq.Task) mq.Result {
var user map[string]any var user map[string]any
json.Unmarshal(task.Payload, &user) json.Unmarshal(task.Payload, &user)
resultPayload, _ := json.Marshal(map[string]any{"storage": user}) resultPayload, _ := json.Marshal(map[string]any{"storage": user})
return v2.Result{Payload: resultPayload, Ctx: ctx} return mq.Result{Payload: resultPayload, Ctx: ctx}
} }
func main() { func main() {

View File

@@ -3,10 +3,12 @@ package mq
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"fmt"
"time" "time"
) )
type Result struct { type Result struct {
Ctx context.Context
Payload json.RawMessage `json:"payload"` Payload json.RawMessage `json:"payload"`
Topic string `json:"topic"` Topic string `json:"topic"`
TaskID string `json:"task_id"` TaskID string `json:"task_id"`
@@ -14,6 +16,44 @@ type Result struct {
Status string `json:"status"` Status string `json:"status"`
} }
func (r Result) Unmarshal(data any) error {
if r.Payload == nil {
return fmt.Errorf("payload is nil")
}
return json.Unmarshal(r.Payload, data)
}
func (r Result) String() string {
return string(r.Payload)
}
func HandleError(ctx context.Context, err error, status ...string) Result {
st := "Failed"
if len(status) > 0 {
st = status[0]
}
if err == nil {
return Result{}
}
return Result{
Status: st,
Error: err,
Ctx: ctx,
}
}
func (r Result) WithData(status string, data []byte) Result {
if r.Error != nil {
return r
}
return Result{
Status: status,
Payload: data,
Error: nil,
Ctx: r.Ctx,
}
}
type TLSConfig struct { type TLSConfig struct {
UseTLS bool UseTLS bool
CertPath string CertPath string

View File

@@ -3,83 +3,28 @@ package v2
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"fmt"
"sync" "sync"
"github.com/oarkflow/xid" "github.com/oarkflow/xid"
"github.com/oarkflow/mq"
) )
type Handler func(ctx context.Context, task *Task) Result func NewTask(id string, payload json.RawMessage, nodeKey string, results ...map[string]mq.Result) *mq.Task {
type Result struct {
Ctx context.Context
TaskID string `json:"task_id"`
Payload json.RawMessage `json:"payload"`
Status string `json:"status"`
Error error `json:"error"`
nodeKey string
}
func (r Result) Unmarshal(data any) error {
if r.Payload == nil {
return fmt.Errorf("payload is nil")
}
return json.Unmarshal(r.Payload, data)
}
func (r Result) String() string {
return string(r.Payload)
}
func HandleError(ctx context.Context, err error, status ...string) Result {
st := "Failed"
if len(status) > 0 {
st = status[0]
}
if err == nil {
return Result{}
}
return Result{
Status: st,
Error: err,
Ctx: ctx,
}
}
func (r Result) WithData(status string, data []byte) Result {
if r.Error != nil {
return r
}
return Result{
Status: status,
Payload: data,
Error: nil,
Ctx: r.Ctx,
}
}
type Task struct {
ID string `json:"id"`
NodeKey string `json:"node_key"`
Payload json.RawMessage `json:"payload"`
Results map[string]Result `json:"results"`
}
func NewTask(id string, payload json.RawMessage, nodeKey string, results ...map[string]Result) *Task {
if id == "" { if id == "" {
id = xid.New().String() id = xid.New().String()
} }
result := make(map[string]Result) result := make(map[string]mq.Result)
if len(results) > 0 && results[0] != nil { if len(results) > 0 && results[0] != nil {
result = results[0] result = results[0]
} }
return &Task{ID: id, Payload: payload, NodeKey: nodeKey, Results: result} return &mq.Task{ID: id, Payload: payload, Topic: nodeKey, Results: result}
} }
type Node struct { type Node struct {
Key string Key string
Edges []Edge Edges []Edge
handler Handler handler mq.Handler
} }
type EdgeType int type EdgeType int
@@ -112,7 +57,7 @@ func NewDAG() *DAG {
} }
} }
func (tm *DAG) AddNode(key string, handler Handler) { func (tm *DAG) AddNode(key string, handler mq.Handler) {
tm.mu.Lock() tm.mu.Lock()
defer tm.mu.Unlock() defer tm.mu.Unlock()
tm.Nodes[key] = &Node{ tm.Nodes[key] = &Node{
@@ -145,11 +90,11 @@ func (tm *DAG) AddEdge(from, to string, edgeTypes ...EdgeType) {
fromNode.Edges = append(fromNode.Edges, edge) fromNode.Edges = append(fromNode.Edges, edge)
} }
func (tm *DAG) ProcessTask(ctx context.Context, node string, payload []byte) Result { func (tm *DAG) ProcessTask(ctx context.Context, node string, payload []byte) mq.Result {
tm.mu.Lock() tm.mu.Lock()
defer tm.mu.Unlock() defer tm.mu.Unlock()
taskID := xid.New().String() taskID := xid.New().String()
task := NewTask(taskID, payload, node, make(map[string]Result)) task := NewTask(taskID, payload, node, make(map[string]mq.Result))
manager := NewTaskManager(tm) manager := NewTaskManager(tm)
tm.taskContext[taskID] = manager tm.taskContext[taskID] = manager
return manager.processTask(ctx, node, task) return manager.processTask(ctx, node, task)

View File

@@ -5,30 +5,32 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"sync" "sync"
"github.com/oarkflow/mq"
) )
type TaskManager struct { type TaskManager struct {
dag *DAG dag *DAG
wg sync.WaitGroup wg sync.WaitGroup
mutex sync.Mutex mutex sync.Mutex
results []Result results []mq.Result
nodeResults map[string]Result nodeResults map[string]mq.Result
done chan struct{} done chan struct{}
} }
func NewTaskManager(d *DAG) *TaskManager { func NewTaskManager(d *DAG) *TaskManager {
return &TaskManager{ return &TaskManager{
dag: d, dag: d,
nodeResults: make(map[string]Result), nodeResults: make(map[string]mq.Result),
results: make([]Result, 0), results: make([]mq.Result, 0),
done: make(chan struct{}), done: make(chan struct{}),
} }
} }
func (tm *TaskManager) processTask(ctx context.Context, nodeID string, task *Task) Result { func (tm *TaskManager) processTask(ctx context.Context, nodeID string, task *mq.Task) mq.Result {
node, ok := tm.dag.Nodes[nodeID] node, ok := tm.dag.Nodes[nodeID]
if !ok { if !ok {
return Result{Error: fmt.Errorf("nodeID %s not found", nodeID)} return mq.Result{Error: fmt.Errorf("nodeID %s not found", nodeID)}
} }
tm.wg.Add(1) tm.wg.Add(1)
go tm.processNode(ctx, node, task, nil) go tm.processNode(ctx, node, task, nil)
@@ -38,7 +40,7 @@ func (tm *TaskManager) processTask(ctx context.Context, nodeID string, task *Tas
}() }()
select { select {
case <-ctx.Done(): case <-ctx.Done():
return Result{Error: ctx.Err()} return mq.Result{Error: ctx.Err()}
case <-tm.done: case <-tm.done:
tm.mutex.Lock() tm.mutex.Lock()
defer tm.mutex.Unlock() defer tm.mutex.Unlock()
@@ -49,10 +51,10 @@ func (tm *TaskManager) processTask(ctx context.Context, nodeID string, task *Tas
} }
} }
func (tm *TaskManager) callback(ctx context.Context, results any) Result { func (tm *TaskManager) callback(ctx context.Context, results any) mq.Result {
var rs Result var rs mq.Result
switch res := results.(type) { switch res := results.(type) {
case []Result: case []mq.Result:
aggregatedOutput := make([]json.RawMessage, 0) aggregatedOutput := make([]json.RawMessage, 0)
for i, result := range res { for i, result := range res {
if i == 0 { if i == 0 {
@@ -62,43 +64,43 @@ func (tm *TaskManager) callback(ctx context.Context, results any) Result {
var item json.RawMessage var item json.RawMessage
err := json.Unmarshal(result.Payload, &item) err := json.Unmarshal(result.Payload, &item)
if err != nil { if err != nil {
return HandleError(ctx, err) return mq.HandleError(ctx, err)
} }
aggregatedOutput = append(aggregatedOutput, item) aggregatedOutput = append(aggregatedOutput, item)
} }
finalOutput, err := json.Marshal(aggregatedOutput) finalOutput, err := json.Marshal(aggregatedOutput)
return HandleError(ctx, err).WithData(rs.Status, finalOutput) return mq.HandleError(ctx, err).WithData(rs.Status, finalOutput)
case Result: case mq.Result:
rs.TaskID = res.TaskID rs.TaskID = res.TaskID
var item json.RawMessage var item json.RawMessage
err := json.Unmarshal(res.Payload, &item) err := json.Unmarshal(res.Payload, &item)
if err != nil { if err != nil {
return HandleError(ctx, err) return mq.HandleError(ctx, err)
} }
finalOutput, err := json.Marshal(item) finalOutput, err := json.Marshal(item)
return HandleError(ctx, err).WithData(res.Status, finalOutput) return mq.HandleError(ctx, err).WithData(res.Status, finalOutput)
} }
return rs return rs
} }
func (tm *TaskManager) appendFinalResult(result Result) { func (tm *TaskManager) appendFinalResult(result mq.Result) {
tm.mutex.Lock() tm.mutex.Lock()
tm.results = append(tm.results, result) tm.results = append(tm.results, result)
tm.nodeResults[result.nodeKey] = result tm.nodeResults[result.Topic] = result
tm.mutex.Unlock() tm.mutex.Unlock()
} }
func (tm *TaskManager) processNode(ctx context.Context, node *Node, task *Task, parentNode *Node) { func (tm *TaskManager) processNode(ctx context.Context, node *Node, task *mq.Task, parentNode *Node) {
defer tm.wg.Done() defer tm.wg.Done()
var result Result var result mq.Result
select { select {
case <-ctx.Done(): case <-ctx.Done():
result = Result{TaskID: task.ID, nodeKey: node.Key, Error: ctx.Err()} result = mq.Result{TaskID: task.ID, Topic: node.Key, Error: ctx.Err()}
tm.appendFinalResult(result) tm.appendFinalResult(result)
return return
default: default:
result = node.handler(ctx, task) result = node.handler(ctx, task)
result.nodeKey = node.Key result.Topic = node.Key
if result.Error != nil { if result.Error != nil {
tm.appendFinalResult(result) tm.appendFinalResult(result)
return return
@@ -113,7 +115,7 @@ func (tm *TaskManager) processNode(ctx context.Context, node *Node, task *Task,
edges := make([]Edge, len(node.Edges)) edges := make([]Edge, len(node.Edges))
copy(edges, node.Edges) copy(edges, node.Edges)
if result.Status != "" { if result.Status != "" {
if conditions, ok := tm.dag.conditions[result.nodeKey]; ok { if conditions, ok := tm.dag.conditions[result.Topic]; ok {
if targetNodeKey, ok := conditions[result.Status]; ok { if targetNodeKey, ok := conditions[result.Status]; ok {
if targetNode, ok := tm.dag.Nodes[targetNodeKey]; ok { if targetNode, ok := tm.dag.Nodes[targetNodeKey]; ok {
edges = append(edges, Edge{From: node, To: targetNode}) edges = append(edges, Edge{From: node, To: targetNode})
@@ -133,7 +135,7 @@ func (tm *TaskManager) processNode(ctx context.Context, node *Node, task *Task,
var items []json.RawMessage var items []json.RawMessage
err := json.Unmarshal(result.Payload, &items) err := json.Unmarshal(result.Payload, &items)
if err != nil { if err != nil {
tm.appendFinalResult(Result{TaskID: task.ID, nodeKey: node.Key, Error: err}) tm.appendFinalResult(mq.Result{TaskID: task.ID, Topic: node.Key, Error: err})
return return
} }
for _, item := range items { for _, item := range items {