update: dependencies

This commit is contained in:
Oarkflow
2024-11-21 16:58:20 +05:45
parent 3ffe2507ca
commit aca6c4e57b
7 changed files with 43 additions and 38 deletions

View File

@@ -55,7 +55,7 @@ func (tm *DAG) render(w http.ResponseWriter, r *http.Request) {
switch contentType {
case consts.TypeHtml:
w.Header().Set(consts.ContentType, consts.TypeHtml)
data, err := jsonparser.GetString(result.Data, "html_content")
data, err := jsonparser.GetString(result.Payload, "html_content")
if err != nil {
return
}
@@ -66,7 +66,7 @@ func (tm *DAG) render(w http.ResponseWriter, r *http.Request) {
return
}
w.Header().Set(consts.ContentType, consts.TypeJson)
json.NewEncoder(w).Encode(result.Data)
json.NewEncoder(w).Encode(result.Payload)
}
}
@@ -85,7 +85,7 @@ func (tm *DAG) taskStatusHandler(w http.ResponseWriter, r *http.Request) {
manager.taskStates.ForEach(func(key string, value *TaskState) bool {
key = strings.Split(key, Delimiter)[0]
nodeID := strings.Split(value.NodeID, Delimiter)[0]
rs := jsonparser.Delete(value.Result.Data, "html_content")
rs := jsonparser.Delete(value.Result.Payload, "html_content")
status := value.Status
if status == StatusProcessing {
status = StatusCompleted
@@ -95,7 +95,7 @@ func (tm *DAG) taskStatusHandler(w http.ResponseWriter, r *http.Request) {
Status: status,
UpdatedAt: value.UpdatedAt,
Result: Result{
Data: rs,
Payload: rs,
Error: value.Result.Error,
Status: status,
},

View File

@@ -22,7 +22,7 @@ const (
type Result struct {
Ctx context.Context `json:"-"`
Data json.RawMessage
Payload json.RawMessage
Error error
Status TaskStatus
ConditionStatus string
@@ -261,12 +261,13 @@ func (tm *DAG) ProcessTask(ctx context.Context, payload []byte) Result {
} else {
manager.resultCh = resultCh
}
node, exists := tm.nodes.Get(manager.currentNode)
currentNode := strings.Split(manager.currentNode, Delimiter)[0]
node, exists := tm.nodes.Get(currentNode)
method, ok := ctx.Value("method").(string)
if method == "GET" && exists && node.Type == Page {
ctx = context.WithValue(ctx, "initial_node", manager.currentNode)
ctx = context.WithValue(ctx, "initial_node", currentNode)
} else if next == "true" {
nodes, err := tm.GetNextNodes(manager.currentNode)
nodes, err := tm.GetNextNodes(currentNode)
if err != nil {
return Result{Error: err, Ctx: ctx}
}

View File

@@ -151,6 +151,7 @@ func (tm *TaskManager) processNode(exec *Task) {
}
state.Status = StatusProcessing
state.UpdatedAt = time.Now()
fmt.Println(exec.nodeID)
tm.currentNode = exec.nodeID
result := node.Handler(exec.ctx, exec.payload)
state.Result = result
@@ -178,7 +179,7 @@ func (tm *TaskManager) handlePrevious(ctx context.Context, state *TaskState, res
aggregatedData := make([]json.RawMessage, size)
i := 0
state.targetResults.ForEach(func(_ string, rs Result) bool {
aggregatedData[i] = rs.Data
aggregatedData[i] = rs.Payload
i++
return true
})
@@ -186,15 +187,15 @@ func (tm *TaskManager) handlePrevious(ctx context.Context, state *TaskState, res
if err != nil {
panic(err)
}
state.Result = Result{Data: aggregatedPayload, Status: StatusCompleted, Ctx: ctx, Topic: state.NodeID}
state.Result = Result{Payload: aggregatedPayload, Status: StatusCompleted, Ctx: ctx, Topic: state.NodeID}
} else if size == 1 {
state.Result = state.targetResults.Values()[0]
}
state.Status = result.Status
state.Result.Status = result.Status
}
if state.Result.Data == nil {
state.Result.Data = result.Data
if state.Result.Payload == nil {
state.Result.Payload = result.Payload
}
state.UpdatedAt = time.Now()
if result.Ctx == nil {
@@ -227,6 +228,7 @@ func (tm *TaskManager) handlePrevious(ctx context.Context, state *TaskState, res
}
}
} else {
state.Result.Topic = strings.Split(state.NodeID, Delimiter)[0]
tm.resultCh <- state.Result
tm.processFinalResult(state)
}
@@ -318,7 +320,7 @@ func (tm *TaskManager) handleEdges(currentResult nodeResult, edges []Edge) {
}
if edge.Type == Iterator {
var items []json.RawMessage
err := json.Unmarshal(currentResult.result.Data, &items)
err := json.Unmarshal(currentResult.result.Payload, &items)
if err != nil {
log.Printf("Error unmarshalling data for node %s: %v\n", edge.To.ID, err)
tm.resultQueue <- nodeResult{
@@ -345,7 +347,7 @@ func (tm *TaskManager) handleEdges(currentResult nodeResult, edges []Edge) {
childNode := fmt.Sprintf("%s%s%s", edge.To.ID, Delimiter, idx)
ctx := context.WithValue(currentResult.ctx, ContextIndex, idx)
tm.parentNodes.Set(childNode, parentNode)
tm.send(ctx, edge.To.ID, tm.taskID, currentResult.result.Data)
tm.send(ctx, edge.To.ID, tm.taskID, currentResult.result.Payload)
}
}
}

View File

@@ -33,8 +33,8 @@ func subDAG() *dag.DAG {
AddNode("Store data", "store:data", &tasks.StoreData{Operation: dag.Operation{Type: "process"}}, true).
AddNode("Send SMS", "send:sms", &tasks.SendSms{Operation: dag.Operation{Type: "process"}}).
AddNode("Notification", "notification", &tasks.InAppNotification{Operation: dag.Operation{Type: "process"}}).
AddEdge("Store Data to send sms", "store:data", "send:sms").
AddEdge("Store Data to notification", "send:sms", "notification")
AddEdge("Store Payload to send sms", "store:data", "send:sms").
AddEdge("Store Payload to notification", "send:sms", "notification")
return f
}
@@ -43,7 +43,7 @@ func setup(f *dag.DAG) {
AddNode("Email Delivery", "email:deliver", &tasks.EmailDelivery{Operation: dag.Operation{Type: "process"}}).
AddNode("Prepare Email", "prepare:email", &tasks.PrepareEmail{Operation: dag.Operation{Type: "process"}}).
AddNode("Get Input", "get:input", &tasks.GetData{Operation: dag.Operation{Type: "input"}}, true).
AddNode("Final Data", "final", &tasks.Final{Operation: dag.Operation{Type: "page"}}).
AddNode("Final Payload", "final", &tasks.Final{Operation: dag.Operation{Type: "page"}}).
AddNode("Iterator Processor", "loop", &tasks.Loop{Operation: dag.Operation{Type: "loop"}}).
AddNode("Condition", "condition", &tasks.Condition{Operation: dag.Operation{Type: "condition"}}).
AddDAGNode("Persistent", "persistent", subDAG()).

View File

@@ -29,7 +29,7 @@ func Form(ctx context.Context, payload json.RawMessage) v2.Result {
"html_content": rs,
}
bt, _ = json.Marshal(data)
return v2.Result{Data: bt, Ctx: ctx}
return v2.Result{Payload: bt, Ctx: ctx}
}
func NodeA(ctx context.Context, payload json.RawMessage) v2.Result {
@@ -39,7 +39,7 @@ func NodeA(ctx context.Context, payload json.RawMessage) v2.Result {
}
data["allowed_voting"] = data["age"] == "18"
updatedPayload, _ := json.Marshal(data)
return v2.Result{Data: updatedPayload, Ctx: ctx}
return v2.Result{Payload: updatedPayload, Ctx: ctx}
}
func NodeB(ctx context.Context, payload json.RawMessage) v2.Result {
@@ -49,7 +49,7 @@ func NodeB(ctx context.Context, payload json.RawMessage) v2.Result {
}
data["female_voter"] = data["gender"] == "female"
updatedPayload, _ := json.Marshal(data)
return v2.Result{Data: updatedPayload, Ctx: ctx}
return v2.Result{Payload: updatedPayload, Ctx: ctx}
}
func NodeC(ctx context.Context, payload json.RawMessage) v2.Result {
@@ -59,7 +59,7 @@ func NodeC(ctx context.Context, payload json.RawMessage) v2.Result {
}
data["voted"] = true
updatedPayload, _ := json.Marshal(data)
return v2.Result{Data: updatedPayload, Ctx: ctx}
return v2.Result{Payload: updatedPayload, Ctx: ctx}
}
func Result(ctx context.Context, payload json.RawMessage) v2.Result {
@@ -68,9 +68,11 @@ func Result(ctx context.Context, payload json.RawMessage) v2.Result {
return v2.Result{Error: err, Ctx: ctx}
}
var data map[string]any
if payload != nil {
if err := json.Unmarshal(payload, &data); err != nil {
return v2.Result{Error: err, Ctx: ctx}
}
}
if bt != nil {
parser := jet.NewWithMemory(jet.WithDelims("{{", "}}"))
rs, err := parser.ParseTemplate(string(bt), data)
@@ -82,13 +84,13 @@ func Result(ctx context.Context, payload json.RawMessage) v2.Result {
"html_content": rs,
}
bt, _ := json.Marshal(data)
return v2.Result{Data: bt, Ctx: ctx}
return v2.Result{Payload: bt, Ctx: ctx}
}
return v2.Result{Data: payload, Ctx: ctx}
return v2.Result{Payload: payload, Ctx: ctx}
}
func notify(taskID string, result v2.Result) {
fmt.Printf("Final result for Task %s: %s\n", taskID, string(result.Data))
fmt.Printf("Final result for Task %s: %s\n", taskID, string(result.Payload))
}
func main() {

View File

@@ -10,7 +10,7 @@ import (
func main() {
dag := v2.NewDAG(func(taskID string, result v2.Result) {
// fmt.Printf("Final resuslt for Task %s: %s\n", taskID, string(result.Data))
// fmt.Printf("Final resuslt for Task %s: %s\n", taskID, string(result.Payload))
})
dag.AddNode(v2.Function, "GetData", GetData, true)
dag.AddNode(v2.Function, "Loop", Loop)
@@ -32,15 +32,15 @@ func main() {
if rs.Error != nil {
panic(rs.Error)
}
fmt.Println(rs.Status, rs.Topic, string(rs.Data))
fmt.Println(rs.Status, rs.Topic, string(rs.Payload))
}
func GetData(ctx context.Context, payload json.RawMessage) v2.Result {
return v2.Result{Ctx: ctx, Data: payload}
return v2.Result{Ctx: ctx, Payload: payload}
}
func Loop(ctx context.Context, payload json.RawMessage) v2.Result {
return v2.Result{Ctx: ctx, Data: payload}
return v2.Result{Ctx: ctx, Payload: payload}
}
func ValidateAge(ctx context.Context, payload json.RawMessage) v2.Result {
@@ -56,7 +56,7 @@ func ValidateAge(ctx context.Context, payload json.RawMessage) v2.Result {
}
data["age_voter"] = data["age"] == "18"
updatedPayload, _ := json.Marshal(data)
return v2.Result{Data: updatedPayload, Ctx: ctx, ConditionStatus: status}
return v2.Result{Payload: updatedPayload, Ctx: ctx, ConditionStatus: status}
}
func ValidateGender(ctx context.Context, payload json.RawMessage) v2.Result {
@@ -66,7 +66,7 @@ func ValidateGender(ctx context.Context, payload json.RawMessage) v2.Result {
}
data["female_voter"] = data["gender"] == "female"
updatedPayload, _ := json.Marshal(data)
return v2.Result{Data: updatedPayload, Ctx: ctx}
return v2.Result{Payload: updatedPayload, Ctx: ctx}
}
func Final(ctx context.Context, payload json.RawMessage) v2.Result {
@@ -82,5 +82,5 @@ func Final(ctx context.Context, payload json.RawMessage) v2.Result {
if err != nil {
panic(err)
}
return v2.Result{Data: updatedPayload, Ctx: ctx}
return v2.Result{Payload: updatedPayload, Ctx: ctx}
}

View File

@@ -173,13 +173,13 @@
<th>Task ID</th>
<th>Status</th>
<th>UpdatedAt</th>
<th>Result Data</th>
<th>Result</th>
</tr>
<tr>
<td>${taskID}</td>
<td class="${getStatusClass(data.Result.Status)}">${data.Result.Status}</td>
<td>${formatDate(data.Result.UpdatedAt)}</td>
<td><pre>${JSON.stringify(data.Result.Result.Data, null, 2)}</pre></td>
<td><pre>${JSON.stringify(data.Result.Result.Payload, null, 2)}</pre></td>
</tr>
</table>
`;
@@ -202,7 +202,7 @@
<td>${node.NodeID}</td>
<td class="${getStatusClass(node.Status)}">${node.Status}</td>
<td>${formatDate(node.UpdatedAt)}</td>
<td><pre>${JSON.stringify(node.Result.Data, null, 2)}</pre></td>
<td><pre>${JSON.stringify(node.Result.Payload, null, 2)}</pre></td>
</tr>
`;
}