diff --git a/dag/dag.go b/dag/dag.go index c9b3bca..5c8bdd8 100644 --- a/dag/dag.go +++ b/dag/dag.go @@ -245,12 +245,14 @@ func (d *DAG) updateTaskMetrics(taskID string, result mq.Result, duration time.D case mq.Cancelled: d.metrics.Cancelled++ } - d.Logger().Info("Updating task metrics", - logger.Field{Key: "taskID", Value: taskID}, - logger.Field{Key: "lastExecuted", Value: time.Now()}, - logger.Field{Key: "duration", Value: duration}, - logger.Field{Key: "success", Value: result.Status}, - ) + if d.debug { + d.Logger().Info("Updating task metrics", + logger.Field{Key: "taskID", Value: taskID}, + logger.Field{Key: "lastExecuted", Value: time.Now()}, + logger.Field{Key: "duration", Value: duration}, + logger.Field{Key: "success", Value: result.Status}, + ) + } } // Getter for task metrics. diff --git a/dag/enhancements.go b/dag/enhancements.go index 8fcaa0d..2af7bc6 100644 --- a/dag/enhancements.go +++ b/dag/enhancements.go @@ -255,10 +255,12 @@ func (tm *TransactionManager) CommitTransaction(txID string) error { tx.Status = TransactionStatusCommitted tx.EndTime = time.Now() - tm.logger.Info("Transaction committed", - logger.Field{Key: "transaction_id", Value: txID}, - logger.Field{Key: "operations_count", Value: len(tx.Operations)}, - ) + if tm.dag.debug { + tm.logger.Info("Transaction committed", + logger.Field{Key: "transaction_id", Value: txID}, + logger.Field{Key: "operations_count", Value: len(tx.Operations)}, + ) + } // Clean up save points delete(tm.savePoints, txID) diff --git a/dag/task_manager.go b/dag/task_manager.go index 4d99b1a..0ab8011 100644 --- a/dag/task_manager.go +++ b/dag/task_manager.go @@ -4,13 +4,13 @@ import ( "context" "fmt" "log" + "math/rand" // ...new import for jitter... "strings" "sync" "time" - "math/rand" // ...new import for jitter... - "github.com/oarkflow/json" + "github.com/oarkflow/mq" "github.com/oarkflow/mq/logger" "github.com/oarkflow/mq/storage" @@ -476,7 +476,9 @@ func (tm *TaskManager) logNodeExecution(exec *task, pureNodeID string, result mq fields = append(fields, logger.Field{Key: "error", Value: result.Error.Error()}) tm.dag.Logger().Error("Node execution failed", fields...) } else { - tm.dag.Logger().Info("Node execution completed", fields...) + if tm.dag.debug { + tm.dag.Logger().Info("Node execution completed", fields...) + } } } diff --git a/performance.go b/performance.go index ba4b1a6..9a939ad 100644 --- a/performance.go +++ b/performance.go @@ -284,8 +284,6 @@ func (po *PerformanceOptimizer) adjustWorkerCount() { // Apply scaling if targetWorkers != currentWorkers { - po.workerPool.logger.Info().Msg(fmt.Sprintf("Auto-scaling workers from %d to %d (queue: %d)", - currentWorkers, targetWorkers, queueDepth)) po.workerPool.AdjustWorkerCount(targetWorkers) } } diff --git a/pool.go b/pool.go index bf1f60a..eabfff9 100644 --- a/pool.go +++ b/pool.go @@ -1356,8 +1356,6 @@ func (wp *Pool) adjustWorkersBasedOnLoad() { targetWorkers = max(minWorkers, min(maxWorkers, targetWorkers)) if targetWorkers != currentWorkers { - wp.logger.Info().Msgf("Auto-scaling workers from %d to %d (queue: %d, overflow: %d)", - currentWorkers, targetWorkers, queueLen, overflowLen) wp.AdjustWorkerCount(targetWorkers) } } diff --git a/services/http/responses/responses.go b/services/http/responses/responses.go index cb05dd2..d727634 100644 --- a/services/http/responses/responses.go +++ b/services/http/responses/responses.go @@ -11,7 +11,7 @@ import ( type Response struct { Additional any `json:"additional,omitempty"` Data any `json:"data"` - Message string `json:"message,omitempty"` + Message string `json:"message"` StackTrace string `json:"stack_trace,omitempty"` Code int `json:"code"` Success bool `json:"success"` diff --git a/services/setup.go b/services/setup.go index 21d2d20..4e5ab52 100644 --- a/services/setup.go +++ b/services/setup.go @@ -19,8 +19,6 @@ import ( "github.com/oarkflow/form" "github.com/oarkflow/json" "github.com/oarkflow/log" - "github.com/oarkflow/protocol/utils/str" - "github.com/oarkflow/mq" "github.com/oarkflow/mq/consts" "github.com/oarkflow/mq/dag" @@ -28,6 +26,7 @@ import ( "github.com/oarkflow/mq/services/middlewares" "github.com/oarkflow/mq/services/renderer" "github.com/oarkflow/mq/services/utils" + "github.com/oarkflow/protocol/utils/str" ) var ValidationInstance Validation @@ -142,41 +141,34 @@ func prepareNode(flow *dag.DAG, node Node) error { GeneratedFields: node.Data.GeneratedFields, Providers: providers, }) - if s, ok := node.Data.AdditionalData["conditions"]; ok { - var fil map[string]*Filter - err := Map(&fil, s) - if err != nil { - return err - } - condition := make(map[string]string) - conditions := make(map[string]dag.Condition) - for key, cond := range fil { - condition[key] = cond.Node - if cond.Filter != nil { - conditions[key] = cond.Filter - } else if cond.FilterGroup != nil { - cond.FilterGroup.Operator = strings.ToUpper(cond.FilterGroup.Operator) - if !slices.Contains([]string{"AND", "OR"}, cond.FilterGroup.Operator) { - cond.FilterGroup.Operator = "AND" - } - var fillers []filters.Condition - for _, f := range cond.FilterGroup.Filters { - if f != nil { - fillers = append(fillers, f) - } - } - conditions[key] = &filters.FilterGroup{ - Operator: filters.Boolean(cond.FilterGroup.Operator), - Reverse: cond.FilterGroup.Reverse, - Filters: fillers, - } - } else { - conditions[key] = nil + condition := make(map[string]string) + conditions := make(map[string]dag.Condition) + for key, cond := range node.Data.Conditions { + condition[key] = cond.Node + if cond.Filter != nil { + conditions[key] = cond.Filter + } else if cond.FilterGroup != nil { + cond.FilterGroup.Operator = strings.ToUpper(cond.FilterGroup.Operator) + if !slices.Contains([]string{"AND", "OR"}, cond.FilterGroup.Operator) { + cond.FilterGroup.Operator = "AND" } + var fillers []filters.Condition + for _, f := range cond.FilterGroup.Filters { + if f != nil { + fillers = append(fillers, f) + } + } + conditions[key] = &filters.FilterGroup{ + Operator: filters.Boolean(cond.FilterGroup.Operator), + Reverse: cond.FilterGroup.Reverse, + Filters: fillers, + } + } else { + conditions[key] = nil } - flow.AddCondition(node.ID, condition) - nodeHandler.SetConditions(conditions) } + flow.AddCondition(node.ID, condition) + nodeHandler.SetConditions(conditions) case dag.Processor: nodeHandler.SetConfig(dag.Payload{ Mapping: node.Data.Mapping, @@ -459,7 +451,7 @@ func customHandler(flow *dag.DAG) fiber.Handler { if contentType == "" || contentType == fiber.MIMEApplicationJSON || contentType == fiber.MIMEApplicationJSONCharsetUTF8 { - return ctx.JSON(result) + return responses.Success(ctx, 200, result.Payload) } var resultData map[string]any diff --git a/services/user_config.go b/services/user_config.go index 43b4e22..7808e0e 100644 --- a/services/user_config.go +++ b/services/user_config.go @@ -126,6 +126,7 @@ type Data struct { Mapping map[string]string `json:"mapping,omitempty" yaml:"mapping,omitempty"` AdditionalData map[string]any `json:"additional_data,omitempty" yaml:"additional_data,omitempty"` GeneratedFields []string `json:"generated_fields,omitempty" yaml:"generated_fields,omitempty"` + Conditions map[string]Filter `json:"conditions,omitempty" yaml:"conditions,omitempty"` Providers []Provider `json:"providers,omitempty" yaml:"providers,omitempty"` }