From b292152663d17c54fafb7aa2bd301bcc0c4dcf1e Mon Sep 17 00:00:00 2001 From: sujit Date: Mon, 18 Nov 2024 22:05:20 +0545 Subject: [PATCH] feat: [wip] - Implement html node --- dag/v2/api.go | 74 +++++++++++++++ dag/v2/context.go | 81 ++++++++++++++++ dag/v2/dag.go | 175 ----------------------------------- examples/webroot/form.html | 2 +- examples/webroot/result.html | 2 +- 5 files changed, 157 insertions(+), 177 deletions(-) create mode 100644 dag/v2/api.go create mode 100644 dag/v2/context.go diff --git a/dag/v2/api.go b/dag/v2/api.go new file mode 100644 index 0000000..976cc85 --- /dev/null +++ b/dag/v2/api.go @@ -0,0 +1,74 @@ +package v2 + +import ( + "encoding/json" + "net/http" + + "github.com/oarkflow/mq/consts" + "github.com/oarkflow/mq/jsonparser" +) + +func (tm *DAG) render(w http.ResponseWriter, request *http.Request) { + ctx, data, err := parse(request) + if err != nil { + http.Error(w, err.Error(), http.StatusNotFound) + return + } + result := tm.ProcessTask(ctx, data) + contentType, ok := result.Ctx.Value(consts.ContentType).(string) + if !ok { + contentType = consts.TypeJson + } + switch contentType { + case consts.TypeHtml: + w.Header().Set(consts.ContentType, consts.TypeHtml) + data, err := jsonparser.GetString(result.Data, "html_content") + if err != nil { + return + } + w.Write([]byte(data)) + default: + if request.Method != "POST" { + http.Error(w, `{"message": "not allowed"}`, http.StatusMethodNotAllowed) + return + } + w.Header().Set(consts.ContentType, consts.TypeJson) + json.NewEncoder(w).Encode(result.Data) + } +} + +func (tm *DAG) taskStatusHandler(w http.ResponseWriter, r *http.Request) { + taskID := r.URL.Query().Get("taskID") + if taskID == "" { + http.Error(w, `{"message": "taskID is missing"}`, http.StatusBadRequest) + return + } + manager, ok := tm.taskManager.Get(taskID) + if !ok { + http.Error(w, `{"message": "Invalid TaskID"}`, http.StatusNotFound) + return + } + result := make(map[string]TaskState) + for key, value := range manager.taskStates { + rs := jsonparser.Delete(value.Result.Data, "html_content") + state := TaskState{ + NodeID: value.NodeID, + Status: value.Status, + UpdatedAt: value.UpdatedAt, + Result: Result{ + Data: rs, + Error: value.Result.Error, + Status: value.Result.Status, + }, + } + result[key] = state + } + w.Header().Set(consts.ContentType, consts.TypeJson) + json.NewEncoder(w).Encode(result) +} + +func (tm *DAG) Start(addr string) { + http.HandleFunc("/process", tm.render) + http.HandleFunc("/task/status", tm.taskStatusHandler) + http.ListenAndServe(addr, nil) +} diff --git a/dag/v2/context.go b/dag/v2/context.go new file mode 100644 index 0000000..938955f --- /dev/null +++ b/dag/v2/context.go @@ -0,0 +1,81 @@ +package v2 + +import ( + "context" + "encoding/json" + "io" + "net/http" +) + +type Context struct { + Query map[string]any +} + +func (ctx *Context) Get(key string) string { + if val, ok := ctx.Query[key]; ok { + switch val := val.(type) { + case []string: + return val[0] + case string: + return val + } + } + return "" +} + +func parse(r *http.Request) (context.Context, []byte, error) { + ctx := r.Context() + userContext := &Context{Query: make(map[string]any)} + result := make(map[string]any) + queryParams := r.URL.Query() + for key, values := range queryParams { + if len(values) > 1 { + userContext.Query[key] = values // Handle multiple values + } else { + userContext.Query[key] = values[0] // Single value + } + } + ctx = context.WithValue(ctx, "UserContext", userContext) + contentType := r.Header.Get("Content-Type") + switch { + case contentType == "application/json": + body, err := io.ReadAll(r.Body) + if err != nil { + return ctx, nil, err + } + defer r.Body.Close() + if body == nil { + return ctx, nil, nil + } + if err := json.Unmarshal(body, &result); err != nil { + return ctx, nil, err + } + + case contentType == "application/x-www-form-urlencoded": + if err := r.ParseForm(); err != nil { + return ctx, nil, err + } + result = make(map[string]any) + for key, values := range r.PostForm { + if len(values) > 1 { + result[key] = values + } else { + result[key] = values[0] + } + } + default: + return ctx, nil, nil + } + bt, err := json.Marshal(result) + if err != nil { + return ctx, nil, err + } + return ctx, bt, err +} + +func UserContext(ctx context.Context) *Context { + if userContext, ok := ctx.Value("UserContext").(*Context); ok { + return userContext + } + return &Context{Query: make(map[string]any)} +} diff --git a/dag/v2/dag.go b/dag/v2/dag.go index 28b7786..381cb13 100644 --- a/dag/v2/dag.go +++ b/dag/v2/dag.go @@ -4,13 +4,9 @@ import ( "context" "encoding/json" "fmt" - "io" - "net/http" "sync" "github.com/oarkflow/mq" - "github.com/oarkflow/mq/consts" - "github.com/oarkflow/mq/jsonparser" "github.com/oarkflow/mq/storage" "github.com/oarkflow/mq/storage/memory" ) @@ -170,43 +166,6 @@ func (tm *DAG) GetPreviousNodes(key string) ([]*Node, error) { } func (tm *DAG) ProcessTask(ctx context.Context, payload []byte) Result { - var taskID string - userCtx := UserContext(ctx) - if val := userCtx.Get("task_id"); val != "" { - taskID = val - } else { - taskID = mq.NewID() - } - ctx = context.WithValue(ctx, "task_id", taskID) - resultCh := make(chan Result, 1) - manager := NewTaskManager(tm, resultCh) - tm.taskManager.Set(taskID, manager) - firstNode, err := tm.parseInitialNode(ctx) - if err != nil { - return Result{Error: err} - } - manager.ProcessTask(ctx, taskID, firstNode, payload) - return <-resultCh -} - -func (tm *DAG) render(w http.ResponseWriter, request *http.Request) { - ctx, data, err := parse(request) - if err != nil { - http.Error(w, err.Error(), http.StatusNotFound) - return - } - result := tm.ProcessTask(ctx, data) - if contentType, ok := result.Ctx.Value(consts.ContentType).(string); ok && contentType == consts.TypeHtml { - w.Header().Set(consts.ContentType, consts.TypeHtml) - data, err := jsonparser.GetString(result.Data, "html_content") - if err != nil { - return - } - w.Write([]byte(data)) - } -} - -func (tm *DAG) submit(ctx context.Context, payload []byte) Result { var taskID string userCtx := UserContext(ctx) if val := userCtx.Get("task_id"); val != "" { @@ -241,137 +200,3 @@ func (tm *DAG) submit(ctx context.Context, payload []byte) Result { manager.ProcessTask(ctx, taskID, firstNode, payload) return <-resultCh } - -func (tm *DAG) taskRender(w http.ResponseWriter, r *http.Request) { - if r.Method == "GET" { - tm.render(w, r) - } else if r.Method == "POST" { - ctx, data, err := parse(r) - if err != nil { - http.Error(w, err.Error(), http.StatusNotFound) - return - } - result := tm.submit(ctx, data) - if result.Ctx == nil { - fmt.Println("Ctrl not set") - return - } - if contentType, ok := result.Ctx.Value(consts.ContentType).(string); ok && contentType == consts.TypeHtml { - w.Header().Set(consts.ContentType, consts.TypeHtml) - data, err := jsonparser.GetString(result.Data, "html_content") - if err != nil { - return - } - w.Write([]byte(data)) - } - } -} - -func (tm *DAG) taskStatusHandler(w http.ResponseWriter, r *http.Request) { - taskID := r.URL.Query().Get("taskID") - if taskID == "" { - http.Error(w, `{"message": "taskID is missing"}`, http.StatusBadRequest) - return - } - manager, ok := tm.taskManager.Get(taskID) - if !ok { - http.Error(w, `{"message": "Invalid TaskID"}`, http.StatusNotFound) - return - } - result := make(map[string]TaskState) - for key, value := range manager.taskStates { - rs := jsonparser.Delete(value.Result.Data, "html_content") - state := TaskState{ - NodeID: value.NodeID, - Status: value.Status, - UpdatedAt: value.UpdatedAt, - Result: Result{ - Data: rs, - Error: value.Result.Error, - Status: value.Result.Status, - }, - } - result[key] = state - } - w.Header().Set(consts.ContentType, consts.TypeJson) - json.NewEncoder(w).Encode(result) -} - -func (tm *DAG) Start(addr string) { - http.HandleFunc("/", tm.taskRender) - http.HandleFunc("/task-result", tm.taskStatusHandler) - http.ListenAndServe(addr, nil) -} - -type Context struct { - Query map[string]any -} - -func (ctx *Context) Get(key string) string { - if val, ok := ctx.Query[key]; ok { - switch val := val.(type) { - case []string: - return val[0] - case string: - return val - } - } - return "" -} - -func parse(r *http.Request) (context.Context, []byte, error) { - ctx := r.Context() - body, err := io.ReadAll(r.Body) - if err != nil { - return ctx, nil, err - } - defer r.Body.Close() - userContext := &Context{Query: make(map[string]any)} - result := make(map[string]any) - queryParams := r.URL.Query() - for key, values := range queryParams { - if len(values) > 1 { - userContext.Query[key] = values // Handle multiple values - } else { - userContext.Query[key] = values[0] // Single value - } - } - ctx = context.WithValue(ctx, "UserContext", userContext) - contentType := r.Header.Get("Content-Type") - switch { - case contentType == "application/json": - if body == nil { - return ctx, nil, nil - } - if err := json.Unmarshal(body, &result); err != nil { - return ctx, nil, err - } - - case contentType == "application/x-www-form-urlencoded": - if err := r.ParseForm(); err != nil { - return ctx, nil, err - } - result = make(map[string]any) - for key, values := range r.Form { - if len(values) > 1 { - result[key] = values - } else { - result[key] = values[0] - } - } - default: - return ctx, nil, nil - } - bt, err := json.Marshal(result) - if err != nil { - return ctx, nil, err - } - return ctx, bt, err -} - -func UserContext(ctx context.Context) *Context { - if userContext, ok := ctx.Value("UserContext").(*Context); ok { - return userContext - } - return &Context{Query: make(map[string]any)} -} diff --git a/examples/webroot/form.html b/examples/webroot/form.html index 9e56f0e..bcff360 100644 --- a/examples/webroot/form.html +++ b/examples/webroot/form.html @@ -7,7 +7,7 @@

Enter Your Information

-
+


diff --git a/examples/webroot/result.html b/examples/webroot/result.html index 6e0d7c7..13f1af9 100644 --- a/examples/webroot/result.html +++ b/examples/webroot/result.html @@ -96,7 +96,7 @@ // Fetch the task result const taskID = new URLSearchParams(window.location.search).get('task_id'); // Get taskID from URL if (taskID) { - fetch(`/task-result?taskID=${taskID}`) + fetch(`/task/status?taskID=${taskID}`) .then(response => response.json()) .then(data => { if(data?.message) {