mirror of
https://github.com/oarkflow/mq.git
synced 2025-12-24 13:57:52 +08:00
feat: [wip] - Implement html node
This commit is contained in:
74
dag/v2/api.go
Normal file
74
dag/v2/api.go
Normal file
@@ -0,0 +1,74 @@
|
||||
package v2
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
|
||||
"github.com/oarkflow/mq/consts"
|
||||
"github.com/oarkflow/mq/jsonparser"
|
||||
)
|
||||
|
||||
func (tm *DAG) render(w http.ResponseWriter, request *http.Request) {
|
||||
ctx, data, err := parse(request)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
result := tm.ProcessTask(ctx, data)
|
||||
contentType, ok := result.Ctx.Value(consts.ContentType).(string)
|
||||
if !ok {
|
||||
contentType = consts.TypeJson
|
||||
}
|
||||
switch contentType {
|
||||
case consts.TypeHtml:
|
||||
w.Header().Set(consts.ContentType, consts.TypeHtml)
|
||||
data, err := jsonparser.GetString(result.Data, "html_content")
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
w.Write([]byte(data))
|
||||
default:
|
||||
if request.Method != "POST" {
|
||||
http.Error(w, `{"message": "not allowed"}`, http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
w.Header().Set(consts.ContentType, consts.TypeJson)
|
||||
json.NewEncoder(w).Encode(result.Data)
|
||||
}
|
||||
}
|
||||
|
||||
func (tm *DAG) taskStatusHandler(w http.ResponseWriter, r *http.Request) {
|
||||
taskID := r.URL.Query().Get("taskID")
|
||||
if taskID == "" {
|
||||
http.Error(w, `{"message": "taskID is missing"}`, http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
manager, ok := tm.taskManager.Get(taskID)
|
||||
if !ok {
|
||||
http.Error(w, `{"message": "Invalid TaskID"}`, http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
result := make(map[string]TaskState)
|
||||
for key, value := range manager.taskStates {
|
||||
rs := jsonparser.Delete(value.Result.Data, "html_content")
|
||||
state := TaskState{
|
||||
NodeID: value.NodeID,
|
||||
Status: value.Status,
|
||||
UpdatedAt: value.UpdatedAt,
|
||||
Result: Result{
|
||||
Data: rs,
|
||||
Error: value.Result.Error,
|
||||
Status: value.Result.Status,
|
||||
},
|
||||
}
|
||||
result[key] = state
|
||||
}
|
||||
w.Header().Set(consts.ContentType, consts.TypeJson)
|
||||
json.NewEncoder(w).Encode(result)
|
||||
}
|
||||
|
||||
func (tm *DAG) Start(addr string) {
|
||||
http.HandleFunc("/process", tm.render)
|
||||
http.HandleFunc("/task/status", tm.taskStatusHandler)
|
||||
http.ListenAndServe(addr, nil)
|
||||
}
|
||||
81
dag/v2/context.go
Normal file
81
dag/v2/context.go
Normal file
@@ -0,0 +1,81 @@
|
||||
package v2
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
type Context struct {
|
||||
Query map[string]any
|
||||
}
|
||||
|
||||
func (ctx *Context) Get(key string) string {
|
||||
if val, ok := ctx.Query[key]; ok {
|
||||
switch val := val.(type) {
|
||||
case []string:
|
||||
return val[0]
|
||||
case string:
|
||||
return val
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func parse(r *http.Request) (context.Context, []byte, error) {
|
||||
ctx := r.Context()
|
||||
userContext := &Context{Query: make(map[string]any)}
|
||||
result := make(map[string]any)
|
||||
queryParams := r.URL.Query()
|
||||
for key, values := range queryParams {
|
||||
if len(values) > 1 {
|
||||
userContext.Query[key] = values // Handle multiple values
|
||||
} else {
|
||||
userContext.Query[key] = values[0] // Single value
|
||||
}
|
||||
}
|
||||
ctx = context.WithValue(ctx, "UserContext", userContext)
|
||||
contentType := r.Header.Get("Content-Type")
|
||||
switch {
|
||||
case contentType == "application/json":
|
||||
body, err := io.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
return ctx, nil, err
|
||||
}
|
||||
defer r.Body.Close()
|
||||
if body == nil {
|
||||
return ctx, nil, nil
|
||||
}
|
||||
if err := json.Unmarshal(body, &result); err != nil {
|
||||
return ctx, nil, err
|
||||
}
|
||||
|
||||
case contentType == "application/x-www-form-urlencoded":
|
||||
if err := r.ParseForm(); err != nil {
|
||||
return ctx, nil, err
|
||||
}
|
||||
result = make(map[string]any)
|
||||
for key, values := range r.PostForm {
|
||||
if len(values) > 1 {
|
||||
result[key] = values
|
||||
} else {
|
||||
result[key] = values[0]
|
||||
}
|
||||
}
|
||||
default:
|
||||
return ctx, nil, nil
|
||||
}
|
||||
bt, err := json.Marshal(result)
|
||||
if err != nil {
|
||||
return ctx, nil, err
|
||||
}
|
||||
return ctx, bt, err
|
||||
}
|
||||
|
||||
func UserContext(ctx context.Context) *Context {
|
||||
if userContext, ok := ctx.Value("UserContext").(*Context); ok {
|
||||
return userContext
|
||||
}
|
||||
return &Context{Query: make(map[string]any)}
|
||||
}
|
||||
175
dag/v2/dag.go
175
dag/v2/dag.go
@@ -4,13 +4,9 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"sync"
|
||||
|
||||
"github.com/oarkflow/mq"
|
||||
"github.com/oarkflow/mq/consts"
|
||||
"github.com/oarkflow/mq/jsonparser"
|
||||
"github.com/oarkflow/mq/storage"
|
||||
"github.com/oarkflow/mq/storage/memory"
|
||||
)
|
||||
@@ -170,43 +166,6 @@ func (tm *DAG) GetPreviousNodes(key string) ([]*Node, error) {
|
||||
}
|
||||
|
||||
func (tm *DAG) ProcessTask(ctx context.Context, payload []byte) Result {
|
||||
var taskID string
|
||||
userCtx := UserContext(ctx)
|
||||
if val := userCtx.Get("task_id"); val != "" {
|
||||
taskID = val
|
||||
} else {
|
||||
taskID = mq.NewID()
|
||||
}
|
||||
ctx = context.WithValue(ctx, "task_id", taskID)
|
||||
resultCh := make(chan Result, 1)
|
||||
manager := NewTaskManager(tm, resultCh)
|
||||
tm.taskManager.Set(taskID, manager)
|
||||
firstNode, err := tm.parseInitialNode(ctx)
|
||||
if err != nil {
|
||||
return Result{Error: err}
|
||||
}
|
||||
manager.ProcessTask(ctx, taskID, firstNode, payload)
|
||||
return <-resultCh
|
||||
}
|
||||
|
||||
func (tm *DAG) render(w http.ResponseWriter, request *http.Request) {
|
||||
ctx, data, err := parse(request)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
result := tm.ProcessTask(ctx, data)
|
||||
if contentType, ok := result.Ctx.Value(consts.ContentType).(string); ok && contentType == consts.TypeHtml {
|
||||
w.Header().Set(consts.ContentType, consts.TypeHtml)
|
||||
data, err := jsonparser.GetString(result.Data, "html_content")
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
w.Write([]byte(data))
|
||||
}
|
||||
}
|
||||
|
||||
func (tm *DAG) submit(ctx context.Context, payload []byte) Result {
|
||||
var taskID string
|
||||
userCtx := UserContext(ctx)
|
||||
if val := userCtx.Get("task_id"); val != "" {
|
||||
@@ -241,137 +200,3 @@ func (tm *DAG) submit(ctx context.Context, payload []byte) Result {
|
||||
manager.ProcessTask(ctx, taskID, firstNode, payload)
|
||||
return <-resultCh
|
||||
}
|
||||
|
||||
func (tm *DAG) taskRender(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method == "GET" {
|
||||
tm.render(w, r)
|
||||
} else if r.Method == "POST" {
|
||||
ctx, data, err := parse(r)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
result := tm.submit(ctx, data)
|
||||
if result.Ctx == nil {
|
||||
fmt.Println("Ctrl not set")
|
||||
return
|
||||
}
|
||||
if contentType, ok := result.Ctx.Value(consts.ContentType).(string); ok && contentType == consts.TypeHtml {
|
||||
w.Header().Set(consts.ContentType, consts.TypeHtml)
|
||||
data, err := jsonparser.GetString(result.Data, "html_content")
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
w.Write([]byte(data))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (tm *DAG) taskStatusHandler(w http.ResponseWriter, r *http.Request) {
|
||||
taskID := r.URL.Query().Get("taskID")
|
||||
if taskID == "" {
|
||||
http.Error(w, `{"message": "taskID is missing"}`, http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
manager, ok := tm.taskManager.Get(taskID)
|
||||
if !ok {
|
||||
http.Error(w, `{"message": "Invalid TaskID"}`, http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
result := make(map[string]TaskState)
|
||||
for key, value := range manager.taskStates {
|
||||
rs := jsonparser.Delete(value.Result.Data, "html_content")
|
||||
state := TaskState{
|
||||
NodeID: value.NodeID,
|
||||
Status: value.Status,
|
||||
UpdatedAt: value.UpdatedAt,
|
||||
Result: Result{
|
||||
Data: rs,
|
||||
Error: value.Result.Error,
|
||||
Status: value.Result.Status,
|
||||
},
|
||||
}
|
||||
result[key] = state
|
||||
}
|
||||
w.Header().Set(consts.ContentType, consts.TypeJson)
|
||||
json.NewEncoder(w).Encode(result)
|
||||
}
|
||||
|
||||
func (tm *DAG) Start(addr string) {
|
||||
http.HandleFunc("/", tm.taskRender)
|
||||
http.HandleFunc("/task-result", tm.taskStatusHandler)
|
||||
http.ListenAndServe(addr, nil)
|
||||
}
|
||||
|
||||
type Context struct {
|
||||
Query map[string]any
|
||||
}
|
||||
|
||||
func (ctx *Context) Get(key string) string {
|
||||
if val, ok := ctx.Query[key]; ok {
|
||||
switch val := val.(type) {
|
||||
case []string:
|
||||
return val[0]
|
||||
case string:
|
||||
return val
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func parse(r *http.Request) (context.Context, []byte, error) {
|
||||
ctx := r.Context()
|
||||
body, err := io.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
return ctx, nil, err
|
||||
}
|
||||
defer r.Body.Close()
|
||||
userContext := &Context{Query: make(map[string]any)}
|
||||
result := make(map[string]any)
|
||||
queryParams := r.URL.Query()
|
||||
for key, values := range queryParams {
|
||||
if len(values) > 1 {
|
||||
userContext.Query[key] = values // Handle multiple values
|
||||
} else {
|
||||
userContext.Query[key] = values[0] // Single value
|
||||
}
|
||||
}
|
||||
ctx = context.WithValue(ctx, "UserContext", userContext)
|
||||
contentType := r.Header.Get("Content-Type")
|
||||
switch {
|
||||
case contentType == "application/json":
|
||||
if body == nil {
|
||||
return ctx, nil, nil
|
||||
}
|
||||
if err := json.Unmarshal(body, &result); err != nil {
|
||||
return ctx, nil, err
|
||||
}
|
||||
|
||||
case contentType == "application/x-www-form-urlencoded":
|
||||
if err := r.ParseForm(); err != nil {
|
||||
return ctx, nil, err
|
||||
}
|
||||
result = make(map[string]any)
|
||||
for key, values := range r.Form {
|
||||
if len(values) > 1 {
|
||||
result[key] = values
|
||||
} else {
|
||||
result[key] = values[0]
|
||||
}
|
||||
}
|
||||
default:
|
||||
return ctx, nil, nil
|
||||
}
|
||||
bt, err := json.Marshal(result)
|
||||
if err != nil {
|
||||
return ctx, nil, err
|
||||
}
|
||||
return ctx, bt, err
|
||||
}
|
||||
|
||||
func UserContext(ctx context.Context) *Context {
|
||||
if userContext, ok := ctx.Value("UserContext").(*Context); ok {
|
||||
return userContext
|
||||
}
|
||||
return &Context{Query: make(map[string]any)}
|
||||
}
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
</head>
|
||||
<body>
|
||||
<h1>Enter Your Information</h1>
|
||||
<form action="/?task_id={{task_id}}&next=true" method="POST">
|
||||
<form action="/process?task_id={{task_id}}&next=true" method="POST">
|
||||
<label for="email">Email:</label><br>
|
||||
<input type="email" id="email" name="email" value="s.baniya.np@gmail.com" required><br><br>
|
||||
|
||||
|
||||
@@ -96,7 +96,7 @@
|
||||
// Fetch the task result
|
||||
const taskID = new URLSearchParams(window.location.search).get('task_id'); // Get taskID from URL
|
||||
if (taskID) {
|
||||
fetch(`/task-result?taskID=${taskID}`)
|
||||
fetch(`/task/status?taskID=${taskID}`)
|
||||
.then(response => response.json())
|
||||
.then(data => {
|
||||
if(data?.message) {
|
||||
|
||||
Reference in New Issue
Block a user