update: dependencies

This commit is contained in:
Oarkflow
2024-11-22 16:40:21 +05:45
parent 175d1c3e0e
commit 8f2baf35e6
6 changed files with 267 additions and 42 deletions

View File

@@ -5,8 +5,9 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/oarkflow/mq" "github.com/oarkflow/mq"
"log" "github.com/oarkflow/mq/sio"
"net/http" "net/http"
"os"
"strings" "strings"
"github.com/oarkflow/mq/consts" "github.com/oarkflow/mq/consts"
@@ -108,14 +109,43 @@ func (tm *DAG) taskStatusHandler(w http.ResponseWriter, r *http.Request) {
json.NewEncoder(w).Encode(result) json.NewEncoder(w).Encode(result)
} }
func (tm *DAG) Start(addr string) { func (tm *DAG) SetupWS() *sio.Server {
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { ws := sio.New(sio.Config{
content := []byte(`<a href="/process">Start</a>`) CheckOrigin: func(r *http.Request) bool { return true },
w.Header().Set(consts.ContentType, consts.TypeHtml) EnableCompression: true,
w.Write(content) })
WsEvents(ws)
tm.Notifier = ws
return ws
}
func (tm *DAG) Handlers() {
http.Handle("/", http.FileServer(http.Dir("webroot")))
http.Handle("/notify", tm.SetupWS())
http.HandleFunc("/process", tm.render)
http.HandleFunc("/request", tm.render)
http.HandleFunc("/task/status", tm.taskStatusHandler)
http.HandleFunc("/dot", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/plain")
fmt.Fprintln(w, tm.ExportDOT())
})
http.HandleFunc("/ui", func(w http.ResponseWriter, r *http.Request) {
image := fmt.Sprintf("%s.svg", mq.NewID())
err := tm.SaveSVG(image)
if err != nil {
http.Error(w, "Failed to read request body", http.StatusBadRequest)
return
}
defer os.Remove(image)
svgBytes, err := os.ReadFile(image)
if err != nil {
http.Error(w, "Could not read SVG file", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "image/svg+xml")
if _, err := w.Write(svgBytes); err != nil {
http.Error(w, "Could not write SVG response", http.StatusInternalServerError)
return
}
}) })
http.HandleFunc("/process", tm.render)
http.HandleFunc("/task/status", tm.taskStatusHandler)
log.Printf("Server listening on http://%s", addr)
http.ListenAndServe(addr, nil)
} }

View File

@@ -3,9 +3,13 @@ package v2
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/oarkflow/mq/consts"
"github.com/oarkflow/mq/sio" "github.com/oarkflow/mq/sio"
"golang.org/x/time/rate"
"log" "log"
"net/http"
"strings" "strings"
"time"
"github.com/oarkflow/mq" "github.com/oarkflow/mq"
"github.com/oarkflow/mq/storage" "github.com/oarkflow/mq/storage"
@@ -318,3 +322,158 @@ func (tm *DAG) Process(ctx context.Context, payload []byte) mq.Result {
} }
return tm.ProcessTask(ctx, mq.NewTask(taskID, payload, "")) return tm.ProcessTask(ctx, mq.NewTask(taskID, payload, ""))
} }
func (tm *DAG) Validate() error {
report, hasCycle, err := tm.ClassifyEdges()
if hasCycle || err != nil {
tm.Error = err
return err
}
tm.report = report
return nil
}
func (tm *DAG) GetReport() string {
return tm.report
}
func (tm *DAG) AddDAGNode(name string, key string, dag *DAG, firstNode ...bool) *DAG {
dag.AssignTopic(key)
tm.nodes.Set(key, &Node{
Label: name,
ID: key,
processor: dag,
isReady: true,
})
if len(firstNode) > 0 && firstNode[0] {
tm.startNode = key
}
return tm
}
func (tm *DAG) Start(ctx context.Context, addr string) error {
// Start the server in a separate goroutine
go func() {
defer mq.RecoverPanic(mq.RecoverTitle)
if err := tm.server.Start(ctx); err != nil {
panic(err)
}
}()
// Start the node consumers if not in sync mode
if !tm.server.SyncMode() {
tm.nodes.ForEach(func(_ string, con *Node) bool {
go func(con *Node) {
defer mq.RecoverPanic(mq.RecoverTitle)
limiter := rate.NewLimiter(rate.Every(1*time.Second), 1) // Retry every second
for {
err := con.processor.Consume(ctx)
if err != nil {
log.Printf("[ERROR] - Consumer %s failed to start: %v", con.ID, err)
} else {
log.Printf("[INFO] - Consumer %s started successfully", con.ID)
break
}
limiter.Wait(ctx) // Wait with rate limiting before retrying
}
}(con)
return true
})
}
log.Printf("DAG - HTTP_SERVER ~> started on http://%s", addr)
tm.Handlers()
config := tm.server.TLSConfig()
log.Printf("Server listening on http://%s", addr)
if config.UseTLS {
return http.ListenAndServeTLS(addr, config.CertPath, config.KeyPath, nil)
}
return http.ListenAndServe(addr, nil)
}
func (tm *DAG) ScheduleTask(ctx context.Context, payload []byte, opts ...mq.SchedulerOption) mq.Result {
var taskID string
userCtx := UserContext(ctx)
if val := userCtx.Get("task_id"); val != "" {
taskID = val
} else {
taskID = mq.NewID()
}
t := mq.NewTask(taskID, payload, "")
ctx = context.WithValue(ctx, "task_id", taskID)
userContext := UserContext(ctx)
next := userContext.Get("next")
manager, ok := tm.taskManager.Get(taskID)
resultCh := make(chan mq.Result, 1)
if !ok {
manager = NewTaskManager(tm, taskID, resultCh, tm.iteratorNodes.Clone())
tm.taskManager.Set(taskID, manager)
} else {
manager.resultCh = resultCh
}
currentNode := strings.Split(manager.currentNode, Delimiter)[0]
node, exists := tm.nodes.Get(currentNode)
method, ok := ctx.Value("method").(string)
if method == "GET" && exists && node.NodeType == Page {
ctx = context.WithValue(ctx, "initial_node", currentNode)
} else if next == "true" {
nodes, err := tm.GetNextNodes(currentNode)
if err != nil {
return mq.Result{Error: err, Ctx: ctx}
}
if len(nodes) > 0 {
ctx = context.WithValue(ctx, "initial_node", nodes[0].ID)
}
}
firstNode, err := tm.parseInitialNode(ctx)
if err != nil {
return mq.Result{Error: err, Ctx: ctx}
}
node, ok = tm.nodes.Get(firstNode)
if ok && node.NodeType != Page && t.Payload == nil {
return mq.Result{Error: fmt.Errorf("payload is required for node %s", firstNode), Ctx: ctx}
}
t.Topic = firstNode
ctx = context.WithValue(ctx, ContextIndex, "0")
headers, ok := mq.GetHeaders(ctx)
ctxx := context.Background()
if ok {
ctxx = mq.SetHeaders(ctxx, headers.AsMap())
}
tm.pool.Scheduler().AddTask(ctxx, t, opts...)
return mq.Result{CreatedAt: t.CreatedAt, TaskID: t.ID, Topic: t.Topic, Status: "PENDING"}
}
func (tm *DAG) PauseConsumer(ctx context.Context, id string) {
tm.doConsumer(ctx, id, consts.CONSUMER_PAUSE)
}
func (tm *DAG) ResumeConsumer(ctx context.Context, id string) {
tm.doConsumer(ctx, id, consts.CONSUMER_RESUME)
}
func (tm *DAG) doConsumer(ctx context.Context, id string, action consts.CMD) {
if node, ok := tm.nodes.Get(id); ok {
switch action {
case consts.CONSUMER_PAUSE:
err := node.processor.Pause(ctx)
if err == nil {
node.isReady = false
log.Printf("[INFO] - Consumer %s paused successfully", node.ID)
} else {
log.Printf("[ERROR] - Failed to pause consumer %s: %v", node.ID, err)
}
case consts.CONSUMER_RESUME:
err := node.processor.Resume(ctx)
if err == nil {
node.isReady = true
log.Printf("[INFO] - Consumer %s resumed successfully", node.ID)
} else {
log.Printf("[ERROR] - Failed to resume consumer %s: %v", node.ID, err)
}
}
} else {
log.Printf("[WARNING] - Consumer %s not found", id)
}
}

32
dag/v2/websocket.go Normal file
View File

@@ -0,0 +1,32 @@
package v2
import (
"encoding/json"
"github.com/oarkflow/mq/sio"
)
func WsEvents(s *sio.Server) {
s.On("join", join)
s.On("message", message)
}
func join(s *sio.Socket, data []byte) {
//just one room at a time for the simple example
currentRooms := s.GetRooms()
for _, room := range currentRooms {
s.Leave(room)
}
s.Join(string(data))
s.Emit("joinedRoom", string(data))
}
type msg struct {
Room string
Message string
}
func message(s *sio.Socket, data []byte) {
var m msg
json.Unmarshal(data, &m)
s.ToRoom(m.Room, "message", m.Message)
}

View File

@@ -4,14 +4,16 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
v2 "github.com/oarkflow/mq/dag/v2"
"github.com/oarkflow/mq" "github.com/oarkflow/mq"
"github.com/oarkflow/mq/dag"
"github.com/oarkflow/mq/examples/tasks" "github.com/oarkflow/mq/examples/tasks"
) )
func main() { func main() {
f := dag.NewDAG("Sample DAG", "sample-dag", mq.WithSyncMode(true)) f := v2.NewDAG("Sample DAG", "sample-dag", func(taskID string, result mq.Result) {
fmt.Printf("Final result for task %s: %s\n", taskID, string(result.Payload))
}, mq.WithSyncMode(true))
f.SetNotifyResponse(func(ctx context.Context, result mq.Result) error { f.SetNotifyResponse(func(ctx context.Context, result mq.Result) error {
if f.Notifier != nil { if f.Notifier != nil {
f.Notifier.ToRoom("global", "final-message", result) f.Notifier.ToRoom("global", "final-message", result)
@@ -27,33 +29,35 @@ func main() {
sendData(f) sendData(f)
} }
func subDAG() *dag.DAG { func subDAG() *v2.DAG {
f := dag.NewDAG("Sub DAG", "sub-dag", mq.WithSyncMode(true)) f := v2.NewDAG("Sub DAG", "sub-dag", func(taskID string, result mq.Result) {
fmt.Printf("Final result for task %s: %s\n", taskID, string(result.Payload))
}, mq.WithSyncMode(true))
f. f.
AddNode("Store data", "store:data", &tasks.StoreData{Operation: dag.Operation{Type: "process"}}, true). AddNode(v2.Function, "Store data", "store:data", &tasks.StoreData{Operation: v2.Operation{Type: "process"}}, true).
AddNode("Send SMS", "send:sms", &tasks.SendSms{Operation: dag.Operation{Type: "process"}}). AddNode(v2.Function, "Send SMS", "send:sms", &tasks.SendSms{Operation: v2.Operation{Type: "process"}}).
AddNode("Notification", "notification", &tasks.InAppNotification{Operation: dag.Operation{Type: "process"}}). AddNode(v2.Function, "Notification", "notification", &tasks.InAppNotification{Operation: v2.Operation{Type: "process"}}).
AddEdge("Store Payload to send sms", "store:data", "send:sms"). AddEdge(v2.Simple, "Store Payload to send sms", "store:data", "send:sms").
AddEdge("Store Payload to notification", "send:sms", "notification") AddEdge(v2.Simple, "Store Payload to notification", "send:sms", "notification")
return f return f
} }
func setup(f *dag.DAG) { func setup(f *v2.DAG) {
f. f.
AddNode("Email Delivery", "email:deliver", &tasks.EmailDelivery{Operation: dag.Operation{Type: "process"}}). AddNode(v2.Function, "Email Delivery", "email:deliver", &tasks.EmailDelivery{Operation: v2.Operation{Type: "process"}}).
AddNode("Prepare Email", "prepare:email", &tasks.PrepareEmail{Operation: dag.Operation{Type: "process"}}). AddNode(v2.Function, "Prepare Email", "prepare:email", &tasks.PrepareEmail{Operation: v2.Operation{Type: "process"}}).
AddNode("Get Input", "get:input", &tasks.GetData{Operation: dag.Operation{Type: "input"}}, true). AddNode(v2.Function, "Get Input", "get:input", &tasks.GetData{Operation: v2.Operation{Type: "input"}}, true).
AddNode("Final Payload", "final", &tasks.Final{Operation: dag.Operation{Type: "page"}}). AddNode(v2.Function, "Final Payload", "final", &tasks.Final{Operation: v2.Operation{Type: "page"}}).
AddNode("Iterator Processor", "loop", &tasks.Loop{Operation: dag.Operation{Type: "loop"}}). AddNode(v2.Function, "Iterator Processor", "loop", &tasks.Loop{Operation: v2.Operation{Type: "loop"}}).
AddNode("Condition", "condition", &tasks.Condition{Operation: dag.Operation{Type: "condition"}}). AddNode(v2.Function, "Condition", "condition", &tasks.Condition{Operation: v2.Operation{Type: "condition"}}).
AddDAGNode("Persistent", "persistent", subDAG()). AddDAGNode("Persistent", "persistent", subDAG()).
AddEdge("Get input to loop", "get:input", "loop"). AddEdge(v2.Simple, "Get input to loop", "get:input", "loop").
AddIterator("Loop to prepare email", "loop", "prepare:email"). AddEdge(v2.Iterator, "Loop to prepare email", "loop", "prepare:email").
AddEdge("Prepare Email to condition", "prepare:email", "condition"). AddEdge(v2.Simple, "Prepare Email to condition", "prepare:email", "condition").
AddCondition("condition", map[dag.When]dag.Then{"pass": "email:deliver", "fail": "persistent"}) AddCondition("condition", map[string]string{"pass": "email:deliver", "fail": "persistent"})
} }
func sendData(f *dag.DAG) { func sendData(f *v2.DAG) {
data := []map[string]any{ data := []map[string]any{
{"phone": "+123456789", "email": "abc.xyz@gmail.com"}, {"phone": "+98765412", "email": "xyz.abc@gmail.com"}, {"phone": "+123456789", "email": "abc.xyz@gmail.com"}, {"phone": "+98765412", "email": "xyz.abc@gmail.com"},
} }

View File

@@ -2,15 +2,15 @@ package tasks
import ( import (
"context" "context"
v2 "github.com/oarkflow/mq/dag/v2"
"github.com/oarkflow/json" "github.com/oarkflow/json"
"github.com/oarkflow/mq" "github.com/oarkflow/mq"
"github.com/oarkflow/mq/dag"
) )
type GetData struct { type GetData struct {
dag.Operation v2.Operation
} }
func (e *GetData) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { func (e *GetData) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
@@ -18,7 +18,7 @@ func (e *GetData) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
} }
type Loop struct { type Loop struct {
dag.Operation v2.Operation
} }
func (e *Loop) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { func (e *Loop) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
@@ -26,7 +26,7 @@ func (e *Loop) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
} }
type Condition struct { type Condition struct {
dag.Operation v2.Operation
} }
func (e *Condition) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { func (e *Condition) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
@@ -47,7 +47,7 @@ func (e *Condition) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
} }
type PrepareEmail struct { type PrepareEmail struct {
dag.Operation v2.Operation
} }
func (e *PrepareEmail) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { func (e *PrepareEmail) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
@@ -62,7 +62,7 @@ func (e *PrepareEmail) ProcessTask(ctx context.Context, task *mq.Task) mq.Result
} }
type EmailDelivery struct { type EmailDelivery struct {
dag.Operation v2.Operation
} }
func (e *EmailDelivery) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { func (e *EmailDelivery) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
@@ -77,7 +77,7 @@ func (e *EmailDelivery) ProcessTask(ctx context.Context, task *mq.Task) mq.Resul
} }
type SendSms struct { type SendSms struct {
dag.Operation v2.Operation
} }
func (e *SendSms) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { func (e *SendSms) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
@@ -90,7 +90,7 @@ func (e *SendSms) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
} }
type StoreData struct { type StoreData struct {
dag.Operation v2.Operation
} }
func (e *StoreData) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { func (e *StoreData) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
@@ -98,7 +98,7 @@ func (e *StoreData) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
} }
type InAppNotification struct { type InAppNotification struct {
dag.Operation v2.Operation
} }
func (e *InAppNotification) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { func (e *InAppNotification) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
@@ -111,7 +111,7 @@ func (e *InAppNotification) ProcessTask(ctx context.Context, task *mq.Task) mq.R
} }
type Final struct { type Final struct {
dag.Operation v2.Operation
} }
func (e *Final) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { func (e *Final) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {

View File

@@ -128,5 +128,5 @@ func main() {
if flow.Error != nil { if flow.Error != nil {
panic(flow.Error) panic(flow.Error)
} }
flow.Start("0.0.0.0:8080") flow.Start(context.Background(), "0.0.0.0:8082")
} }