From c96eef945d66c84f0d46b44c2418276a9fcf4aa2 Mon Sep 17 00:00:00 2001 From: sujit Date: Mon, 18 Nov 2024 20:37:24 +0545 Subject: [PATCH] feat: [wip] - Implement html node --- dag/api.go | 2 +- dag/v2/dag.go | 92 ++++++++++++++++++++++++++++++------ dag/v2/task_manager.go | 1 + examples/tasks/operations.go | 2 +- examples/v2.go | 57 +++++++++------------- examples/webroot/form.html | 4 +- examples/webroot/result.html | 10 ++-- 7 files changed, 110 insertions(+), 58 deletions(-) diff --git a/dag/api.go b/dag/api.go index bc5df57..f37fe4d 100644 --- a/dag/api.go +++ b/dag/api.go @@ -165,7 +165,7 @@ func (tm *DAG) Render(w http.ResponseWriter, r *http.Request) { ctx := mq.SetHeaders(r.Context(), map[string]string{consts.AwaitResponseKey: "true", "request_type": "render"}) ctx = context.WithValue(ctx, "query_params", r.URL.Query()) rs := tm.Process(ctx, nil) - content, err := jsonparser.GetString(rs.Payload, "content") + content, err := jsonparser.GetString(rs.Payload, "html_content") if err != nil { http.Error(w, "Failed to read request body", http.StatusBadRequest) return diff --git a/dag/v2/dag.go b/dag/v2/dag.go index 865f70b..056387b 100644 --- a/dag/v2/dag.go +++ b/dag/v2/dag.go @@ -10,6 +10,7 @@ import ( "github.com/oarkflow/mq" "github.com/oarkflow/mq/consts" + "github.com/oarkflow/mq/jsonparser" "github.com/oarkflow/mq/storage" "github.com/oarkflow/mq/storage/memory" ) @@ -24,7 +25,7 @@ const ( ) type Result struct { - Ctx context.Context + Ctx context.Context `json:"-"` Data json.RawMessage Error error Status TaskStatus @@ -188,6 +189,59 @@ func (tm *DAG) ProcessTask(ctx context.Context, payload []byte) Result { return <-resultCh } +func (tm *DAG) render(w http.ResponseWriter, request *http.Request) { + ctx, data, err := parse(request) + if err != nil { + http.Error(w, err.Error(), http.StatusNotFound) + return + } + result := tm.ProcessTask(ctx, data) + if contentType, ok := result.Ctx.Value(consts.ContentType).(string); ok && contentType == consts.TypeHtml { + w.Header().Set(consts.ContentType, consts.TypeHtml) + data, err := jsonparser.GetString(result.Data, "content") + if err != nil { + return + } + w.Write([]byte(data)) + } +} + +func (tm *DAG) submit(ctx context.Context, payload []byte) Result { + var taskID string + userCtx := UserContext(ctx) + if val := userCtx.Get("task_id"); val != "" { + taskID = val + } else { + taskID = mq.NewID() + } + ctx = context.WithValue(ctx, "task_id", taskID) + userContext := UserContext(ctx) + next := userContext.Get("next") + manager, ok := tm.taskManager.Get(taskID) + resultCh := make(chan Result, 1) + if !ok { + manager = NewTaskManager(tm, resultCh) + tm.taskManager.Set(taskID, manager) + } else { + manager.resultCh = resultCh + } + if next == "true" { + nodes, err := tm.GetNextNodes(manager.currentNode) + if err != nil { + return Result{Error: err} + } + if len(nodes) > 0 { + ctx = context.WithValue(ctx, "initial_node", nodes[0].ID) + } + } + firstNode, err := tm.parseInitialNode(ctx) + if err != nil { + return Result{Error: err} + } + manager.ProcessTask(ctx, taskID, firstNode, payload) + return <-resultCh +} + func (tm *DAG) formHandler(w http.ResponseWriter, r *http.Request) { if r.Method == "GET" { http.ServeFile(w, r, "webroot/form.html") @@ -221,25 +275,35 @@ func (tm *DAG) taskStatusHandler(w http.ResponseWriter, r *http.Request) { http.Error(w, `{"message": "Invalid TaskID"}`, http.StatusNotFound) return } - w.Header().Set("Content-Type", "application/json") + w.Header().Set(consts.ContentType, consts.TypeJson) json.NewEncoder(w).Encode(manager.taskStates) } func (tm *DAG) Start(addr string) { - http.HandleFunc("/", func(w http.ResponseWriter, request *http.Request) { - ctx, data, err := parse(request) - if err != nil { - http.Error(w, err.Error(), http.StatusNotFound) - return - } - result := tm.ProcessTask(ctx, data) - if contentType, ok := result.Ctx.Value(consts.ContentType).(string); ok && contentType == consts.TypeHtml { - w.Header().Set(consts.ContentType, consts.TypeHtml) - w.Write(result.Data) + http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + if r.Method == "GET" { + tm.render(w, r) + } else if r.Method == "POST" { + ctx, data, err := parse(r) + if err != nil { + http.Error(w, err.Error(), http.StatusNotFound) + return + } + result := tm.submit(ctx, data) + if result.Ctx == nil { + fmt.Println("Ctrl not set") + return + } + if contentType, ok := result.Ctx.Value(consts.ContentType).(string); ok && contentType == consts.TypeHtml { + w.Header().Set(consts.ContentType, consts.TypeHtml) + data, err := jsonparser.GetString(result.Data, "content") + if err != nil { + return + } + w.Write([]byte(data)) + } } }) - http.HandleFunc("/form", tm.formHandler) - http.HandleFunc("/result", tm.resultHandler) http.HandleFunc("/task-result", tm.taskStatusHandler) http.ListenAndServe(addr, nil) } diff --git a/dag/v2/task_manager.go b/dag/v2/task_manager.go index c647895..1fb7e3b 100644 --- a/dag/v2/task_manager.go +++ b/dag/v2/task_manager.go @@ -200,6 +200,7 @@ func (tm *TaskManager) aggregateResults(parentNode string, taskID string) { } else if state.targetResults.Size() == 1 { state.Result = state.targetResults.Values()[0] } + tm.resultCh <- state.Result tm.processFinalResult(taskID, state) } diff --git a/examples/tasks/operations.go b/examples/tasks/operations.go index ee3d091..bed7779 100644 --- a/examples/tasks/operations.go +++ b/examples/tasks/operations.go @@ -116,7 +116,7 @@ type Final struct { func (e *Final) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { rs := map[string]any{ - "content": `Processed successfully!`, + "html_content": `Processed successfully!`, } bt, _ := json.Marshal(rs) return mq.Result{Payload: bt, Ctx: ctx} diff --git a/examples/v2.go b/examples/v2.go index 80aa1f0..cd44e71 100644 --- a/examples/v2.go +++ b/examples/v2.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "os" "github.com/oarkflow/jet" @@ -12,45 +13,23 @@ import ( ) func Form(ctx context.Context, payload json.RawMessage) v2.Result { - template := ` - - - - - - User Data Form - - -

Enter Your Information

-
-
-

- -
-

- -
-

- - -
- - - -` + bt, err := os.ReadFile("webroot/form.html") + if err != nil { + return v2.Result{Error: err, Ctx: ctx} + } parser := jet.NewWithMemory(jet.WithDelims("{{", "}}")) - rs, err := parser.ParseTemplate(template, map[string]any{ + rs, err := parser.ParseTemplate(string(bt), map[string]any{ "task_id": ctx.Value("task_id"), }) if err != nil { return v2.Result{Error: err, Ctx: ctx} } ctx = context.WithValue(ctx, consts.ContentType, consts.TypeHtml) - return v2.Result{Data: []byte(rs), Ctx: ctx} + data := map[string]any{ + "content": rs, + } + bt, _ = json.Marshal(data) + return v2.Result{Data: bt, Ctx: ctx} } func NodeA(ctx context.Context, payload json.RawMessage) v2.Result { @@ -84,18 +63,26 @@ func NodeC(ctx context.Context, payload json.RawMessage) v2.Result { } func Result(ctx context.Context, payload json.RawMessage) v2.Result { + bt, err := os.ReadFile("webroot/result.html") + if err != nil { + return v2.Result{Error: err, Ctx: ctx} + } var data map[string]any if err := json.Unmarshal(payload, &data); err != nil { return v2.Result{Error: err, Ctx: ctx} } - if templateFile, ok := data["html_content"].(string); ok { + if bt != nil { parser := jet.NewWithMemory(jet.WithDelims("{{", "}}")) - rs, err := parser.ParseTemplate(templateFile, data) + rs, err := parser.ParseTemplate(string(bt), data) if err != nil { return v2.Result{Error: err, Ctx: ctx} } ctx = context.WithValue(ctx, consts.ContentType, consts.TypeHtml) - return v2.Result{Data: []byte(rs), Ctx: ctx} + data := map[string]any{ + "content": rs, + } + bt, _ := json.Marshal(data) + return v2.Result{Data: bt, Ctx: ctx} } return v2.Result{Data: payload, Ctx: ctx} } diff --git a/examples/webroot/form.html b/examples/webroot/form.html index 0cfec3c..9e56f0e 100644 --- a/examples/webroot/form.html +++ b/examples/webroot/form.html @@ -7,7 +7,7 @@

Enter Your Information

-
+


@@ -24,4 +24,4 @@
- + \ No newline at end of file diff --git a/examples/webroot/result.html b/examples/webroot/result.html index f425325..6e0d7c7 100644 --- a/examples/webroot/result.html +++ b/examples/webroot/result.html @@ -94,7 +94,7 @@ } // Fetch the task result - const taskID = new URLSearchParams(window.location.search).get('taskID'); // Get taskID from URL + const taskID = new URLSearchParams(window.location.search).get('task_id'); // Get taskID from URL if (taskID) { fetch(`/task-result?taskID=${taskID}`) .then(response => response.json()) @@ -111,13 +111,13 @@ Task ID Status - Timestamp + UpdatedAt Result Data ${taskID} ${data.Result.Status} - ${formatDate(data.Result.Timestamp)} + ${formatDate(data.Result.UpdatedAt)}
${JSON.stringify(data.Result.Result.Data, null, 2)}
@@ -129,7 +129,7 @@ Node ID Status - Timestamp + UpdatedAt Node Result Data `; @@ -140,7 +140,7 @@ ${node.NodeID} ${node.Status} - ${formatDate(node.Timestamp)} + ${formatDate(node.UpdatedAt)}
${JSON.stringify(node.Result.Data, null, 2)}
`;