diff --git a/dag/v2/api.go b/dag/v2/api.go index db46702..ac02df3 100644 --- a/dag/v2/api.go +++ b/dag/v2/api.go @@ -5,8 +5,9 @@ import ( "encoding/json" "fmt" "github.com/oarkflow/mq" - "log" + "github.com/oarkflow/mq/sio" "net/http" + "os" "strings" "github.com/oarkflow/mq/consts" @@ -108,14 +109,43 @@ func (tm *DAG) taskStatusHandler(w http.ResponseWriter, r *http.Request) { json.NewEncoder(w).Encode(result) } -func (tm *DAG) Start(addr string) { - http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { - content := []byte(`Start`) - w.Header().Set(consts.ContentType, consts.TypeHtml) - w.Write(content) +func (tm *DAG) SetupWS() *sio.Server { + ws := sio.New(sio.Config{ + CheckOrigin: func(r *http.Request) bool { return true }, + EnableCompression: true, + }) + WsEvents(ws) + tm.Notifier = ws + return ws +} + +func (tm *DAG) Handlers() { + http.Handle("/", http.FileServer(http.Dir("webroot"))) + http.Handle("/notify", tm.SetupWS()) + http.HandleFunc("/process", tm.render) + http.HandleFunc("/request", tm.render) + http.HandleFunc("/task/status", tm.taskStatusHandler) + http.HandleFunc("/dot", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/plain") + fmt.Fprintln(w, tm.ExportDOT()) + }) + http.HandleFunc("/ui", func(w http.ResponseWriter, r *http.Request) { + image := fmt.Sprintf("%s.svg", mq.NewID()) + err := tm.SaveSVG(image) + if err != nil { + http.Error(w, "Failed to read request body", http.StatusBadRequest) + return + } + defer os.Remove(image) + svgBytes, err := os.ReadFile(image) + if err != nil { + http.Error(w, "Could not read SVG file", http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "image/svg+xml") + if _, err := w.Write(svgBytes); err != nil { + http.Error(w, "Could not write SVG response", http.StatusInternalServerError) + return + } }) - http.HandleFunc("/process", tm.render) - http.HandleFunc("/task/status", tm.taskStatusHandler) - log.Printf("Server listening on http://%s", addr) - http.ListenAndServe(addr, nil) } diff --git a/dag/v2/dag.go b/dag/v2/dag.go index e48e209..552d5fe 100644 --- a/dag/v2/dag.go +++ b/dag/v2/dag.go @@ -3,9 +3,13 @@ package v2 import ( "context" "fmt" + "github.com/oarkflow/mq/consts" "github.com/oarkflow/mq/sio" + "golang.org/x/time/rate" "log" + "net/http" "strings" + "time" "github.com/oarkflow/mq" "github.com/oarkflow/mq/storage" @@ -318,3 +322,158 @@ func (tm *DAG) Process(ctx context.Context, payload []byte) mq.Result { } return tm.ProcessTask(ctx, mq.NewTask(taskID, payload, "")) } + +func (tm *DAG) Validate() error { + report, hasCycle, err := tm.ClassifyEdges() + if hasCycle || err != nil { + tm.Error = err + return err + } + tm.report = report + return nil +} + +func (tm *DAG) GetReport() string { + return tm.report +} + +func (tm *DAG) AddDAGNode(name string, key string, dag *DAG, firstNode ...bool) *DAG { + dag.AssignTopic(key) + tm.nodes.Set(key, &Node{ + Label: name, + ID: key, + processor: dag, + isReady: true, + }) + if len(firstNode) > 0 && firstNode[0] { + tm.startNode = key + } + return tm +} + +func (tm *DAG) Start(ctx context.Context, addr string) error { + // Start the server in a separate goroutine + go func() { + defer mq.RecoverPanic(mq.RecoverTitle) + if err := tm.server.Start(ctx); err != nil { + panic(err) + } + }() + + // Start the node consumers if not in sync mode + if !tm.server.SyncMode() { + tm.nodes.ForEach(func(_ string, con *Node) bool { + go func(con *Node) { + defer mq.RecoverPanic(mq.RecoverTitle) + limiter := rate.NewLimiter(rate.Every(1*time.Second), 1) // Retry every second + for { + err := con.processor.Consume(ctx) + if err != nil { + log.Printf("[ERROR] - Consumer %s failed to start: %v", con.ID, err) + } else { + log.Printf("[INFO] - Consumer %s started successfully", con.ID) + break + } + limiter.Wait(ctx) // Wait with rate limiting before retrying + } + }(con) + return true + }) + } + log.Printf("DAG - HTTP_SERVER ~> started on http://%s", addr) + tm.Handlers() + config := tm.server.TLSConfig() + log.Printf("Server listening on http://%s", addr) + if config.UseTLS { + return http.ListenAndServeTLS(addr, config.CertPath, config.KeyPath, nil) + } + return http.ListenAndServe(addr, nil) +} + +func (tm *DAG) ScheduleTask(ctx context.Context, payload []byte, opts ...mq.SchedulerOption) mq.Result { + var taskID string + userCtx := UserContext(ctx) + if val := userCtx.Get("task_id"); val != "" { + taskID = val + } else { + taskID = mq.NewID() + } + t := mq.NewTask(taskID, payload, "") + + ctx = context.WithValue(ctx, "task_id", taskID) + userContext := UserContext(ctx) + next := userContext.Get("next") + manager, ok := tm.taskManager.Get(taskID) + resultCh := make(chan mq.Result, 1) + if !ok { + manager = NewTaskManager(tm, taskID, resultCh, tm.iteratorNodes.Clone()) + tm.taskManager.Set(taskID, manager) + } else { + manager.resultCh = resultCh + } + currentNode := strings.Split(manager.currentNode, Delimiter)[0] + node, exists := tm.nodes.Get(currentNode) + method, ok := ctx.Value("method").(string) + if method == "GET" && exists && node.NodeType == Page { + ctx = context.WithValue(ctx, "initial_node", currentNode) + } else if next == "true" { + nodes, err := tm.GetNextNodes(currentNode) + if err != nil { + return mq.Result{Error: err, Ctx: ctx} + } + if len(nodes) > 0 { + ctx = context.WithValue(ctx, "initial_node", nodes[0].ID) + } + } + firstNode, err := tm.parseInitialNode(ctx) + if err != nil { + return mq.Result{Error: err, Ctx: ctx} + } + node, ok = tm.nodes.Get(firstNode) + if ok && node.NodeType != Page && t.Payload == nil { + return mq.Result{Error: fmt.Errorf("payload is required for node %s", firstNode), Ctx: ctx} + } + t.Topic = firstNode + ctx = context.WithValue(ctx, ContextIndex, "0") + + headers, ok := mq.GetHeaders(ctx) + ctxx := context.Background() + if ok { + ctxx = mq.SetHeaders(ctxx, headers.AsMap()) + } + tm.pool.Scheduler().AddTask(ctxx, t, opts...) + return mq.Result{CreatedAt: t.CreatedAt, TaskID: t.ID, Topic: t.Topic, Status: "PENDING"} +} + +func (tm *DAG) PauseConsumer(ctx context.Context, id string) { + tm.doConsumer(ctx, id, consts.CONSUMER_PAUSE) +} + +func (tm *DAG) ResumeConsumer(ctx context.Context, id string) { + tm.doConsumer(ctx, id, consts.CONSUMER_RESUME) +} + +func (tm *DAG) doConsumer(ctx context.Context, id string, action consts.CMD) { + if node, ok := tm.nodes.Get(id); ok { + switch action { + case consts.CONSUMER_PAUSE: + err := node.processor.Pause(ctx) + if err == nil { + node.isReady = false + log.Printf("[INFO] - Consumer %s paused successfully", node.ID) + } else { + log.Printf("[ERROR] - Failed to pause consumer %s: %v", node.ID, err) + } + case consts.CONSUMER_RESUME: + err := node.processor.Resume(ctx) + if err == nil { + node.isReady = true + log.Printf("[INFO] - Consumer %s resumed successfully", node.ID) + } else { + log.Printf("[ERROR] - Failed to resume consumer %s: %v", node.ID, err) + } + } + } else { + log.Printf("[WARNING] - Consumer %s not found", id) + } +} diff --git a/dag/v2/websocket.go b/dag/v2/websocket.go new file mode 100644 index 0000000..0f0534a --- /dev/null +++ b/dag/v2/websocket.go @@ -0,0 +1,32 @@ +package v2 + +import ( + "encoding/json" + "github.com/oarkflow/mq/sio" +) + +func WsEvents(s *sio.Server) { + s.On("join", join) + s.On("message", message) +} + +func join(s *sio.Socket, data []byte) { + //just one room at a time for the simple example + currentRooms := s.GetRooms() + for _, room := range currentRooms { + s.Leave(room) + } + s.Join(string(data)) + s.Emit("joinedRoom", string(data)) +} + +type msg struct { + Room string + Message string +} + +func message(s *sio.Socket, data []byte) { + var m msg + json.Unmarshal(data, &m) + s.ToRoom(m.Room, "message", m.Message) +} diff --git a/examples/dag.go b/examples/dag.go index aaa2afd..9a30d8e 100644 --- a/examples/dag.go +++ b/examples/dag.go @@ -4,14 +4,16 @@ import ( "context" "encoding/json" "fmt" + v2 "github.com/oarkflow/mq/dag/v2" "github.com/oarkflow/mq" - "github.com/oarkflow/mq/dag" "github.com/oarkflow/mq/examples/tasks" ) func main() { - f := dag.NewDAG("Sample DAG", "sample-dag", mq.WithSyncMode(true)) + f := v2.NewDAG("Sample DAG", "sample-dag", func(taskID string, result mq.Result) { + fmt.Printf("Final result for task %s: %s\n", taskID, string(result.Payload)) + }, mq.WithSyncMode(true)) f.SetNotifyResponse(func(ctx context.Context, result mq.Result) error { if f.Notifier != nil { f.Notifier.ToRoom("global", "final-message", result) @@ -27,33 +29,35 @@ func main() { sendData(f) } -func subDAG() *dag.DAG { - f := dag.NewDAG("Sub DAG", "sub-dag", mq.WithSyncMode(true)) +func subDAG() *v2.DAG { + f := v2.NewDAG("Sub DAG", "sub-dag", func(taskID string, result mq.Result) { + fmt.Printf("Final result for task %s: %s\n", taskID, string(result.Payload)) + }, mq.WithSyncMode(true)) f. - AddNode("Store data", "store:data", &tasks.StoreData{Operation: dag.Operation{Type: "process"}}, true). - AddNode("Send SMS", "send:sms", &tasks.SendSms{Operation: dag.Operation{Type: "process"}}). - AddNode("Notification", "notification", &tasks.InAppNotification{Operation: dag.Operation{Type: "process"}}). - AddEdge("Store Payload to send sms", "store:data", "send:sms"). - AddEdge("Store Payload to notification", "send:sms", "notification") + AddNode(v2.Function, "Store data", "store:data", &tasks.StoreData{Operation: v2.Operation{Type: "process"}}, true). + AddNode(v2.Function, "Send SMS", "send:sms", &tasks.SendSms{Operation: v2.Operation{Type: "process"}}). + AddNode(v2.Function, "Notification", "notification", &tasks.InAppNotification{Operation: v2.Operation{Type: "process"}}). + AddEdge(v2.Simple, "Store Payload to send sms", "store:data", "send:sms"). + AddEdge(v2.Simple, "Store Payload to notification", "send:sms", "notification") return f } -func setup(f *dag.DAG) { +func setup(f *v2.DAG) { f. - AddNode("Email Delivery", "email:deliver", &tasks.EmailDelivery{Operation: dag.Operation{Type: "process"}}). - AddNode("Prepare Email", "prepare:email", &tasks.PrepareEmail{Operation: dag.Operation{Type: "process"}}). - AddNode("Get Input", "get:input", &tasks.GetData{Operation: dag.Operation{Type: "input"}}, true). - AddNode("Final Payload", "final", &tasks.Final{Operation: dag.Operation{Type: "page"}}). - AddNode("Iterator Processor", "loop", &tasks.Loop{Operation: dag.Operation{Type: "loop"}}). - AddNode("Condition", "condition", &tasks.Condition{Operation: dag.Operation{Type: "condition"}}). + AddNode(v2.Function, "Email Delivery", "email:deliver", &tasks.EmailDelivery{Operation: v2.Operation{Type: "process"}}). + AddNode(v2.Function, "Prepare Email", "prepare:email", &tasks.PrepareEmail{Operation: v2.Operation{Type: "process"}}). + AddNode(v2.Function, "Get Input", "get:input", &tasks.GetData{Operation: v2.Operation{Type: "input"}}, true). + AddNode(v2.Function, "Final Payload", "final", &tasks.Final{Operation: v2.Operation{Type: "page"}}). + AddNode(v2.Function, "Iterator Processor", "loop", &tasks.Loop{Operation: v2.Operation{Type: "loop"}}). + AddNode(v2.Function, "Condition", "condition", &tasks.Condition{Operation: v2.Operation{Type: "condition"}}). AddDAGNode("Persistent", "persistent", subDAG()). - AddEdge("Get input to loop", "get:input", "loop"). - AddIterator("Loop to prepare email", "loop", "prepare:email"). - AddEdge("Prepare Email to condition", "prepare:email", "condition"). - AddCondition("condition", map[dag.When]dag.Then{"pass": "email:deliver", "fail": "persistent"}) + AddEdge(v2.Simple, "Get input to loop", "get:input", "loop"). + AddEdge(v2.Iterator, "Loop to prepare email", "loop", "prepare:email"). + AddEdge(v2.Simple, "Prepare Email to condition", "prepare:email", "condition"). + AddCondition("condition", map[string]string{"pass": "email:deliver", "fail": "persistent"}) } -func sendData(f *dag.DAG) { +func sendData(f *v2.DAG) { data := []map[string]any{ {"phone": "+123456789", "email": "abc.xyz@gmail.com"}, {"phone": "+98765412", "email": "xyz.abc@gmail.com"}, } diff --git a/examples/tasks/operations.go b/examples/tasks/operations.go index bed7779..62e0d6b 100644 --- a/examples/tasks/operations.go +++ b/examples/tasks/operations.go @@ -2,15 +2,15 @@ package tasks import ( "context" + v2 "github.com/oarkflow/mq/dag/v2" "github.com/oarkflow/json" "github.com/oarkflow/mq" - "github.com/oarkflow/mq/dag" ) type GetData struct { - dag.Operation + v2.Operation } func (e *GetData) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { @@ -18,7 +18,7 @@ func (e *GetData) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { } type Loop struct { - dag.Operation + v2.Operation } func (e *Loop) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { @@ -26,7 +26,7 @@ func (e *Loop) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { } type Condition struct { - dag.Operation + v2.Operation } func (e *Condition) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { @@ -47,7 +47,7 @@ func (e *Condition) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { } type PrepareEmail struct { - dag.Operation + v2.Operation } func (e *PrepareEmail) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { @@ -62,7 +62,7 @@ func (e *PrepareEmail) ProcessTask(ctx context.Context, task *mq.Task) mq.Result } type EmailDelivery struct { - dag.Operation + v2.Operation } func (e *EmailDelivery) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { @@ -77,7 +77,7 @@ func (e *EmailDelivery) ProcessTask(ctx context.Context, task *mq.Task) mq.Resul } type SendSms struct { - dag.Operation + v2.Operation } func (e *SendSms) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { @@ -90,7 +90,7 @@ func (e *SendSms) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { } type StoreData struct { - dag.Operation + v2.Operation } func (e *StoreData) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { @@ -98,7 +98,7 @@ func (e *StoreData) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { } type InAppNotification struct { - dag.Operation + v2.Operation } func (e *InAppNotification) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { @@ -111,7 +111,7 @@ func (e *InAppNotification) ProcessTask(ctx context.Context, task *mq.Task) mq.R } type Final struct { - dag.Operation + v2.Operation } func (e *Final) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { diff --git a/examples/v2.go b/examples/v2.go index e4b5d15..43b2ec1 100644 --- a/examples/v2.go +++ b/examples/v2.go @@ -128,5 +128,5 @@ func main() { if flow.Error != nil { panic(flow.Error) } - flow.Start("0.0.0.0:8080") + flow.Start(context.Background(), "0.0.0.0:8082") }