feat: update

This commit is contained in:
sujit
2025-09-17 07:00:43 +05:45
parent 3b2cc20299
commit 52341cdf00
4 changed files with 303 additions and 4 deletions

View File

@@ -94,7 +94,7 @@ package main
import (
"context"
"github.com/oarkflow/mq"
"github.com/oarkflow/mq/examples/tasks"
)
@@ -141,7 +141,7 @@ package main
import (
"context"
"fmt"
"github.com/oarkflow/mq"
)
@@ -247,8 +247,84 @@ func requestHandler(requestType string) func(w http.ResponseWriter, r *http.Requ
}
```
## Middleware System
The DAG system supports a robust middleware system that allows you to add cross-cutting concerns like logging, validation, timing, and more to your DAG nodes.
### Global Middlewares
Global middlewares apply to all nodes in the DAG:
```go
flow.Use(LoggingMiddleware, ValidationMiddleware)
```
### Node-Specific Middlewares
You can apply middlewares to specific nodes:
```go
flow.UseNodeMiddlewares(
dag.NodeMiddleware{
Node: "process_a",
Middlewares: []mq.Handler{TimingMiddleware},
},
dag.NodeMiddleware{
Node: "process_b",
Middlewares: []mq.Handler{TimingMiddleware},
},
)
```
### Middleware Example
[middleware_example_main.go](./examples/middleware_example_main.go)
```go
// LoggingMiddleware logs the start and end of task processing
func LoggingMiddleware(ctx context.Context, task *mq.Task) mq.Result {
log.Printf("Middleware: Starting processing for node %s, task %s", task.Topic, task.ID)
// Return successful result to continue to next middleware/processor
return mq.Result{
Status: mq.Completed,
Ctx: ctx,
Payload: task.Payload,
}
}
// ValidationMiddleware validates the task payload
func ValidationMiddleware(ctx context.Context, task *mq.Task) mq.Result {
if len(task.Payload) == 0 {
return mq.Result{
Status: mq.Failed,
Error: fmt.Errorf("empty payload not allowed"),
Ctx: ctx,
}
}
return mq.Result{
Status: mq.Completed,
Ctx: ctx,
Payload: task.Payload,
}
}
```
### Middleware Execution Order
Middlewares are executed in the following order:
1. Global middlewares (in the order they were added)
2. Node-specific middlewares (in the order they were added)
3. The actual node processor
Each middleware can:
- Modify the context
- Modify the task payload
- Short-circuit processing by returning an error result
- Continue processing by returning a successful result
## TODOS
- Backend for task persistence
- Task scheduling
- Task scheduling

View File

@@ -32,6 +32,12 @@ type TaskMetrics struct {
Failed int
}
// NodeMiddleware represents middleware configuration for a specific node
type NodeMiddleware struct {
Node string
Middlewares []mq.Handler
}
type Node struct {
processor mq.Processor
Label string
@@ -114,6 +120,11 @@ type DAG struct {
// Debug configuration
debug bool // Global debug mode for the entire DAG
// Middleware configuration
globalMiddlewares []mq.Handler
nodeMiddlewares map[string][]mq.Handler
middlewaresMu sync.RWMutex
}
// SetPreProcessHook configures a function to be called before each node is processed.
@@ -183,6 +194,77 @@ func (tm *DAG) GetDebugInfo() map[string]interface{} {
return debugInfo
}
// Use adds global middleware handlers that will be executed for all nodes in the DAG
func (tm *DAG) Use(handlers ...mq.Handler) {
tm.middlewaresMu.Lock()
defer tm.middlewaresMu.Unlock()
tm.globalMiddlewares = append(tm.globalMiddlewares, handlers...)
}
// UseNodeMiddlewares adds middleware handlers for specific nodes
func (tm *DAG) UseNodeMiddlewares(nodeMiddlewares ...NodeMiddleware) {
tm.middlewaresMu.Lock()
defer tm.middlewaresMu.Unlock()
for _, nm := range nodeMiddlewares {
if nm.Node == "" {
continue
}
if tm.nodeMiddlewares[nm.Node] == nil {
tm.nodeMiddlewares[nm.Node] = make([]mq.Handler, 0)
}
tm.nodeMiddlewares[nm.Node] = append(tm.nodeMiddlewares[nm.Node], nm.Middlewares...)
}
}
// executeMiddlewares executes a chain of middlewares and then the final handler
func (tm *DAG) executeMiddlewares(ctx context.Context, task *mq.Task, middlewares []mq.Handler, finalHandler func(context.Context, *mq.Task) mq.Result) mq.Result {
if len(middlewares) == 0 {
return finalHandler(ctx, task)
}
// For this implementation, we'll execute middlewares sequentially
// Each middleware can modify the task/context and return a result
// If a middleware returns a result with Status == Completed and no error,
// we consider it as "continue to next"
for _, middleware := range middlewares {
result := middleware(ctx, task)
// If middleware returns an error or failed status, short-circuit
if result.Error != nil || result.Status == mq.Failed {
return result
}
// Update task payload if middleware modified it
if len(result.Payload) > 0 {
task.Payload = result.Payload
}
// Update context if middleware provided one
if result.Ctx != nil {
ctx = result.Ctx
}
}
// All middlewares passed, execute final handler
return finalHandler(ctx, task)
}
// getNodeMiddlewares returns all middlewares for a specific node (global + node-specific)
func (tm *DAG) getNodeMiddlewares(nodeID string) []mq.Handler {
tm.middlewaresMu.RLock()
defer tm.middlewaresMu.RUnlock()
var allMiddlewares []mq.Handler
// Add global middlewares first
allMiddlewares = append(allMiddlewares, tm.globalMiddlewares...)
// Add node-specific middlewares
if nodeMiddlewares, exists := tm.nodeMiddlewares[nodeID]; exists {
allMiddlewares = append(allMiddlewares, nodeMiddlewares...)
}
return allMiddlewares
}
func NewDAG(name, key string, finalResultCallback func(taskID string, result mq.Result), opts ...mq.Option) *DAG {
callback := func(ctx context.Context, result mq.Result) error { return nil }
d := &DAG{
@@ -197,6 +279,7 @@ func NewDAG(name, key string, finalResultCallback func(taskID string, result mq.
circuitBreakers: make(map[string]*CircuitBreaker),
nextNodesCache: make(map[string][]*Node),
prevNodesCache: make(map[string][]*Node),
nodeMiddlewares: make(map[string][]mq.Handler),
}
opts = append(opts,

View File

@@ -349,7 +349,13 @@ func (tm *TaskManager) processNode(exec *task) {
attempts := 0
for {
// log.Printf("Tracing: Start processing node %s (attempt %d) on flow %s", exec.nodeID, attempts+1, tm.dag.key)
result = node.processor.ProcessTask(exec.ctx, mq.NewTask(exec.taskID, exec.payload, exec.nodeID, mq.WithDAG(tm.dag)))
// Get middlewares for this node
middlewares := tm.dag.getNodeMiddlewares(pureNodeID)
// Execute middlewares and processor
result = tm.dag.executeMiddlewares(exec.ctx, mq.NewTask(exec.taskID, exec.payload, exec.nodeID, mq.WithDAG(tm.dag)), middlewares, func(ctx context.Context, task *mq.Task) mq.Result {
return node.processor.ProcessTask(ctx, task)
})
if result.Error != nil {
if te, ok := result.Error.(TaskError); ok && te.Recoverable {
if attempts < tm.maxRetries {

View File

@@ -0,0 +1,134 @@
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/oarkflow/mq"
"github.com/oarkflow/mq/dag"
)
// LoggingMiddleware logs the start and end of task processing
func LoggingMiddleware(ctx context.Context, task *mq.Task) mq.Result {
log.Printf("Middleware: Starting processing for node %s, task %s", task.Topic, task.ID)
start := time.Now()
// For middleware, we return a successful result to continue to next middleware/processor
// The actual processing will happen after all middlewares
result := mq.Result{
Status: mq.Completed,
Ctx: ctx,
Payload: task.Payload, // Pass through the payload
}
log.Printf("Middleware: Completed in %v", time.Since(start))
return result
}
// ValidationMiddleware validates the task payload
func ValidationMiddleware(ctx context.Context, task *mq.Task) mq.Result {
log.Printf("ValidationMiddleware: Validating payload for node %s", task.Topic)
// Check if payload is empty
if len(task.Payload) == 0 {
return mq.Result{
Status: mq.Failed,
Error: fmt.Errorf("empty payload not allowed"),
Ctx: ctx,
}
}
log.Printf("ValidationMiddleware: Payload validation passed")
return mq.Result{
Status: mq.Completed,
Ctx: ctx,
Payload: task.Payload,
}
}
// TimingMiddleware measures execution time
func TimingMiddleware(ctx context.Context, task *mq.Task) mq.Result {
log.Printf("TimingMiddleware: Starting timing for node %s", task.Topic)
// Add timing info to context
ctx = context.WithValue(ctx, "start_time", time.Now())
return mq.Result{
Status: mq.Completed,
Ctx: ctx,
Payload: task.Payload,
}
}
// Example processor that simulates some work
type ExampleProcessor struct {
dag.Operation
}
func (p *ExampleProcessor) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
log.Printf("Processor: Processing task %s on node %s", task.ID, task.Topic)
// Simulate some processing time
time.Sleep(100 * time.Millisecond)
// Check if timing middleware was used
if startTime, ok := ctx.Value("start_time").(time.Time); ok {
duration := time.Since(startTime)
log.Printf("Processor: Task completed in %v", duration)
}
result := fmt.Sprintf("Processed: %s", string(task.Payload))
return mq.Result{
Status: mq.Completed,
Payload: []byte(result),
Ctx: ctx,
}
}
func main() {
// Create a new DAG
flow := dag.NewDAG("Middleware Example", "middleware-example", func(taskID string, result mq.Result) {
log.Printf("Final result for task %s: %s", taskID, string(result.Payload))
})
// Add nodes
flow.AddNode(dag.Function, "Process A", "process_a", &ExampleProcessor{Operation: dag.Operation{Type: dag.Function}}, true)
flow.AddNode(dag.Function, "Process B", "process_b", &ExampleProcessor{Operation: dag.Operation{Type: dag.Function}})
flow.AddNode(dag.Function, "Process C", "process_c", &ExampleProcessor{Operation: dag.Operation{Type: dag.Function}})
// Add edges
flow.AddEdge(dag.Simple, "A to B", "process_a", "process_b")
flow.AddEdge(dag.Simple, "B to C", "process_b", "process_c")
// Add global middlewares that apply to all nodes
flow.Use(LoggingMiddleware, ValidationMiddleware)
// Add node-specific middlewares
flow.UseNodeMiddlewares(
dag.NodeMiddleware{
Node: "process_a",
Middlewares: []mq.Handler{TimingMiddleware},
},
dag.NodeMiddleware{
Node: "process_b",
Middlewares: []mq.Handler{TimingMiddleware},
},
)
if flow.Error != nil {
panic(flow.Error)
}
// Test the DAG with middleware
data := []byte(`{"message": "Hello from middleware example"}`)
log.Printf("Starting DAG processing with payload: %s", string(data))
result := flow.Process(context.Background(), data)
if result.Error != nil {
log.Printf("DAG processing failed: %v", result.Error)
} else {
log.Printf("DAG processing completed successfully: %s", string(result.Payload))
}
}