From 3bfd26c8edb08d9865cc2d7a7af4b5ec8afbfc47 Mon Sep 17 00:00:00 2001 From: Oarkflow Date: Sat, 16 Nov 2024 16:02:59 +0545 Subject: [PATCH] update: dependencies --- consts/constants.go | 3 ++- dag/api.go | 45 ++++++++++++++++++++++++++++++++-- dag/dag.go | 59 +++++++++++++++++++++++++++++++++++++++------ dag/task_manager.go | 37 ++++++++++++++++------------ dag/task_process.go | 37 +++++++++++++++++++++++++--- go.mod | 1 + 6 files changed, 152 insertions(+), 30 deletions(-) diff --git a/consts/constants.go b/consts/constants.go index c9280b8..4a80d6c 100644 --- a/consts/constants.go +++ b/consts/constants.go @@ -83,7 +83,8 @@ var ( ContentType = "Content-Type" AwaitResponseKey = "Await-Response" QueueKey = "Topic" - TypeJson = "application/json" + TypeJson = "application/json; charset=utf-8" + TypeHtml = "text/html; charset=utf-8" HeaderKey = "headers" TriggerNode = "triggerNode" ) diff --git a/dag/api.go b/dag/api.go index 980a05d..bc5df57 100644 --- a/dag/api.go +++ b/dag/api.go @@ -1,14 +1,18 @@ package dag import ( + "context" "encoding/json" "fmt" - "github.com/oarkflow/mq/sio" "io" "net/http" + "net/url" "os" "time" + "github.com/oarkflow/mq/jsonparser" + "github.com/oarkflow/mq/sio" + "github.com/oarkflow/mq" "github.com/oarkflow/mq/consts" "github.com/oarkflow/mq/metrics" @@ -36,6 +40,7 @@ func (tm *DAG) Handlers() { metrics.HandleHTTP() http.Handle("/", http.FileServer(http.Dir("webroot"))) http.Handle("/notify", tm.SetupWS()) + http.HandleFunc("GET /render", tm.Render) http.HandleFunc("POST /request", tm.Request) http.HandleFunc("POST /publish", tm.Publish) http.HandleFunc("POST /schedule", tm.Schedule) @@ -124,7 +129,7 @@ func (tm *DAG) request(w http.ResponseWriter, r *http.Request, async bool) { } err = json.Unmarshal(payload, &request) if err != nil { - http.Error(w, "Failed to read request body", http.StatusBadRequest) + http.Error(w, "Failed to unmarshal body", http.StatusBadRequest) return } } else { @@ -145,6 +150,7 @@ func (tm *DAG) request(w http.ResponseWriter, r *http.Request, async bool) { if request.Recurring { opts = append(opts, mq.WithRecurring()) } + ctx = context.WithValue(ctx, "query_params", r.URL.Query()) var rs mq.Result if request.Schedule { rs = tm.ScheduleTask(ctx, request.Payload, opts...) @@ -155,6 +161,19 @@ func (tm *DAG) request(w http.ResponseWriter, r *http.Request, async bool) { json.NewEncoder(w).Encode(rs) } +func (tm *DAG) Render(w http.ResponseWriter, r *http.Request) { + ctx := mq.SetHeaders(r.Context(), map[string]string{consts.AwaitResponseKey: "true", "request_type": "render"}) + ctx = context.WithValue(ctx, "query_params", r.URL.Query()) + rs := tm.Process(ctx, nil) + content, err := jsonparser.GetString(rs.Payload, "content") + if err != nil { + http.Error(w, "Failed to read request body", http.StatusBadRequest) + return + } + w.Header().Set("Content-Type", consts.TypeHtml) + w.Write([]byte(content)) +} + func (tm *DAG) Request(w http.ResponseWriter, r *http.Request) { tm.request(w, r, true) } @@ -166,3 +185,25 @@ func (tm *DAG) Publish(w http.ResponseWriter, r *http.Request) { func (tm *DAG) Schedule(w http.ResponseWriter, r *http.Request) { tm.request(w, r, false) } + +func GetTaskID(ctx context.Context) string { + if queryParams := ctx.Value("query_params"); queryParams != nil { + if params, ok := queryParams.(url.Values); ok { + if id := params.Get("taskID"); id != "" { + return id + } + } + } + return "" +} + +func CanNextNode(ctx context.Context) string { + if queryParams := ctx.Value("query_params"); queryParams != nil { + if params, ok := queryParams.(url.Values); ok { + if id := params.Get("next"); id != "" { + return id + } + } + } + return "" +} diff --git a/dag/dag.go b/dag/dag.go index 9559bd4..8bd7042 100644 --- a/dag/dag.go +++ b/dag/dag.go @@ -26,9 +26,19 @@ const ( Iterator ) +type NodeType int + +func (c NodeType) IsValid() bool { return c >= Process && c <= Page } + +const ( + Process NodeType = iota + Page +) + type Node struct { processor mq.Processor Name string + Type NodeType Key string Edges []Edge isReady bool @@ -60,6 +70,7 @@ type DAG struct { server *mq.Broker consumer *mq.Consumer taskContext map[string]*TaskManager + iteratorNodes map[string]struct{} conditions map[FromNode]map[When]Then pool *mq.Pool taskCleanupCh chan string @@ -136,6 +147,7 @@ func NewDAG(name, key string, opts ...mq.Option) *DAG { name: name, key: key, nodes: make(map[string]*Node), + iteratorNodes: make(map[string]struct{}), taskContext: make(map[string]*TaskManager), conditions: make(map[FromNode]map[When]Then), taskCleanupCh: make(chan string), @@ -259,6 +271,9 @@ func (tm *DAG) AddNode(name, key string, handler mq.Processor, firstNode ...bool Key: key, processor: con, } + if handler.GetType() == "page" { + n.Type = Page + } if tm.server.SyncMode() { n.isReady = true } @@ -305,6 +320,9 @@ func (tm *DAG) AddCondition(fromNode FromNode, conditions map[When]Then) *DAG { func (tm *DAG) AddIterator(label, from string, targets ...string) *DAG { tm.Error = tm.addEdge(Iterator, label, from, targets...) + tm.mu.Lock() + tm.iteratorNodes[from] = struct{}{} + tm.mu.Unlock() return tm } @@ -349,10 +367,15 @@ func (tm *DAG) GetReport() string { func (tm *DAG) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { tm.mu.Lock() - taskID := mq.NewID() - manager := NewTaskManager(tm, taskID) - manager.createdAt = task.CreatedAt - tm.taskContext[taskID] = manager + if task.ID == "" { + task.ID = mq.NewID() + } + manager, exists := tm.taskContext[task.ID] + if !exists { + manager = NewTaskManager(tm, task.ID, tm.iteratorNodes) + manager.createdAt = task.CreatedAt + tm.taskContext[task.ID] = manager + } tm.mu.Unlock() if tm.consumer != nil { @@ -363,6 +386,18 @@ func (tm *DAG) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { } task.Topic = initialNode } + if manager.topic != "" { + task.Topic = manager.topic + canNext := CanNextNode(ctx) + if canNext != "" { + if n, ok := tm.nodes[task.Topic]; ok { + if len(n.Edges) > 0 { + task.Topic = n.Edges[0].To[0].Key + } + } + } else { + } + } result := manager.processTask(ctx, task.Topic, task.Payload) if result.Error != nil { @@ -375,12 +410,9 @@ func (tm *DAG) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { } func (tm *DAG) check(ctx context.Context, payload []byte) (context.Context, *mq.Task, error) { - tm.mu.RLock() if tm.paused { - tm.mu.RUnlock() return ctx, nil, fmt.Errorf("unable to process task, error: DAG is not accepting any task") } - tm.mu.RUnlock() if !tm.IsReady() { return ctx, nil, fmt.Errorf("unable to process task, error: DAG is not ready yet") } @@ -391,7 +423,18 @@ func (tm *DAG) check(ctx context.Context, payload []byte) (context.Context, *mq. if tm.server.SyncMode() { ctx = mq.SetHeaders(ctx, map[string]string{consts.AwaitResponseKey: "true"}) } - return ctx, mq.NewTask(mq.NewID(), payload, initialNode), nil + taskID := GetTaskID(ctx) + tm.mu.RLock() + defer tm.mu.RUnlock() + if taskID != "" { + if _, exists := tm.taskContext[taskID]; !exists { + return ctx, nil, fmt.Errorf("provided task ID doesn't exist") + } + } + if taskID == "" { + taskID = mq.NewID() + } + return ctx, mq.NewTask(taskID, payload, initialNode), nil } func (tm *DAG) Process(ctx context.Context, payload []byte) mq.Result { diff --git a/dag/task_manager.go b/dag/task_manager.go index 3195ced..5972047 100644 --- a/dag/task_manager.go +++ b/dag/task_manager.go @@ -11,24 +11,30 @@ import ( ) type TaskManager struct { - createdAt time.Time - processedAt time.Time - status string - dag *DAG - nodeResults map[string]mq.Result - wg *WaitGroup - taskID string - results []mq.Result - mutex sync.Mutex + createdAt time.Time + processedAt time.Time + status string + dag *DAG + nodeResults map[string]mq.Result + wg *WaitGroup + taskID string + results []mq.Result + iteratorNodes map[string]struct{} + mutex sync.Mutex + topic string } -func NewTaskManager(d *DAG, taskID string) *TaskManager { +func NewTaskManager(d *DAG, taskID string, iteratorNodes map[string]struct{}) *TaskManager { + if iteratorNodes == nil { + iteratorNodes = make(map[string]struct{}) + } return &TaskManager{ - dag: d, - nodeResults: make(map[string]mq.Result), - results: make([]mq.Result, 0), - taskID: taskID, - wg: NewWaitGroup(), + dag: d, + nodeResults: make(map[string]mq.Result), + results: make([]mq.Result, 0), + taskID: taskID, + iteratorNodes: iteratorNodes, + wg: NewWaitGroup(), } } @@ -51,6 +57,7 @@ func (tm *TaskManager) dispatchFinalResult(ctx context.Context) mq.Result { _ = tm.dag.server.NotifyHandler()(ctx, rs) } tm.dag.taskCleanupCh <- tm.taskID + tm.topic = rs.Topic return rs } diff --git a/dag/task_process.go b/dag/task_process.go index 9b07542..1809ea3 100644 --- a/dag/task_process.go +++ b/dag/task_process.go @@ -9,6 +9,23 @@ import ( "time" ) +func (tm *TaskManager) renderResult(ctx context.Context) mq.Result { + if rs, ok := tm.nodeResults[tm.topic]; ok { + tm.updateTS(&rs) + return rs + } + var rs mq.Result + if len(tm.results) == 1 { + rs = tm.handleResult(ctx, tm.results[0]) + } else { + rs = tm.handleResult(ctx, tm.results) + } + tm.updateTS(&rs) + tm.dag.callbackToConsumer(ctx, rs) + tm.topic = rs.Topic + return rs +} + func (tm *TaskManager) processTask(ctx context.Context, nodeID string, payload json.RawMessage) mq.Result { defer mq.RecoverPanic(mq.RecoverTitle) node, ok := tm.dag.nodes[nodeID] @@ -23,6 +40,10 @@ func (tm *TaskManager) processTask(ctx context.Context, nodeID string, payload j go tm.processNode(ctx, node, payload) }() tm.wg.Wait() + requestType, ok := mq.GetHeader(ctx, "request_type") + if ok && requestType == "render" { + return tm.renderResult(ctx) + } return tm.dispatchFinalResult(ctx) } @@ -46,15 +67,16 @@ func (tm *TaskManager) getConditionalEdges(node *Node, result mq.Result) []Edge } func (tm *TaskManager) handleNextTask(ctx context.Context, result mq.Result) mq.Result { + tm.topic = result.Topic + defer func() { + tm.wg.Done() + mq.RecoverPanic(mq.RecoverTitle) + }() if result.Ctx != nil { if headers, ok := mq.GetHeaders(ctx); ok { ctx = mq.SetHeaders(result.Ctx, headers.AsMap()) } } - defer func() { - tm.wg.Done() - mq.RecoverPanic(mq.RecoverTitle) - }() node, ok := tm.dag.nodes[result.Topic] if !ok { return result @@ -70,6 +92,9 @@ func (tm *TaskManager) handleNextTask(ctx context.Context, result mq.Result) mq. } else { tm.appendResult(result, false) } + if node.Type == Page { + return result + } for _, edge := range edges { switch edge.Type { case Iterator: @@ -88,6 +113,10 @@ func (tm *TaskManager) handleNextTask(ctx context.Context, result mq.Result) mq. }(ctx, target, item) } } + } + } + for _, edge := range edges { + switch edge.Type { case Simple: for _, target := range edge.To { ctx = mq.SetHeaders(ctx, map[string]string{consts.QueueKey: target.Key}) diff --git a/go.mod b/go.mod index e136d46..11082cd 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/oarkflow/dipper v0.0.6 github.com/oarkflow/errors v0.0.6 github.com/oarkflow/expr v0.0.11 + github.com/oarkflow/jet v0.0.4 github.com/oarkflow/json v0.0.13 github.com/oarkflow/xid v1.2.5 github.com/prometheus/client_golang v1.20.5