diff --git a/dag/operation.go b/dag/operation.go index 18e3565..ef8b3f6 100644 --- a/dag/operation.go +++ b/dag/operation.go @@ -8,6 +8,7 @@ import ( "time" "github.com/oarkflow/json" + "github.com/oarkflow/mq/utils" "github.com/oarkflow/date" "github.com/oarkflow/dipper" @@ -170,6 +171,48 @@ func (e *Operation) ValidateFields(c context.Context, payload []byte) (map[strin return data, nil } +func (e *Operation) ParseMapping(ctx context.Context, data map[string]any) map[string]any { + templateData := make(map[string]any) + if e.Payload.Mapping != nil { + for k, v := range e.Payload.Mapping { + _, val := GetVal(ctx, v, data) + templateData[k] = val + } + } + return templateData +} + +func (e *Operation) ExceptFields(payload []byte) []byte { + except, ok := e.Payload.Data["except_fields"].([]string) + if !ok { + exceptAny, ok := e.Payload.Data["except_fields"].([]any) + if ok { + except = make([]string, len(exceptAny)) + for i, v := range exceptAny { + except[i], _ = v.(string) + } + } + } + return e.RemoveFields(payload, except...) +} + +func (e *Operation) RemoveFields(payload []byte, keys ...string) []byte { + for _, field := range keys { + payload = utils.RemoveRecursiveFromJSON(payload, field) + } + return payload +} + +func UnmarshalPayload[T any](c context.Context, payload []byte) (T, error) { + var data T + if len(payload) > 0 { + if err := json.Unmarshal(payload, &data); err != nil { + return data, err + } + } + return data, nil +} + func GetVal(c context.Context, v string, data map[string]any) (key string, val any) { key, val = getVal(c, v, data) if val == nil { diff --git a/handlers/common_handler.go b/handlers/common_handler.go index 97226c5..90c2c4f 100644 --- a/handlers/common_handler.go +++ b/handlers/common_handler.go @@ -4,7 +4,6 @@ import ( "context" "fmt" - "github.com/oarkflow/json" "github.com/oarkflow/mq" "github.com/oarkflow/mq/dag" ) @@ -36,10 +35,9 @@ func (e *Condition) SetConditions(conditions map[string]dag.Condition) { } func (e *Condition) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { - var data map[string]any - err := json.Unmarshal(task.Payload, &data) + data, err := dag.UnmarshalPayload[map[string]any](ctx, task.Payload) if err != nil { - panic(err) + return mq.Result{Error: err, Ctx: ctx} } var conditionStatus string _, ok := e.conditions[defaultKey] diff --git a/handlers/examples.go b/handlers/examples.go deleted file mode 100644 index 55065a6..0000000 --- a/handlers/examples.go +++ /dev/null @@ -1,208 +0,0 @@ -package handlers - -/* -Data Transformation Handlers Usage Examples - -This file contains examples of how to configure and use the various data transformation handlers. -All configurations are done through the dag.Operation.Payload.Data map - no handler-specific configurations. - -1. FORMAT HANDLER -================= -Supports: string, number, date, currency, uppercase, lowercase, capitalize, trim - -Example configuration: -{ - "format_type": "uppercase", - "fields": ["name", "title"], - "currency": "$", - "date_format": "2006-01-02" -} - -2. GROUP HANDLER -================ -Groups data with aggregation functions - -Example configuration: -{ - "group_by": ["department", "status"], - "aggregations": { - "salary": "sum", - "age": "avg", - "count": "count", - "name": "concat" - }, - "concat_separator": ", " -} - -3. SPLIT/JOIN HANDLER -==================== -Handles string operations - -Split example: -{ - "operation": "split", - "fields": ["full_name"], - "separator": " " -} - -Join example: -{ - "operation": "join", - "source_fields": ["first_name", "last_name"], - "target_field": "full_name", - "separator": " " -} - -4. FLATTEN HANDLER -================== -Flattens nested data structures - -Flatten settings example (key-value pairs): -{ - "operation": "flatten_settings", - "source_field": "settings", - "target_field": "config" -} - -Input: {"settings": [{"key": "theme", "value": "dark", "value_type": "string"}]} -Output: {"config": {"theme": "dark"}} - -5. JSON HANDLER -=============== -JSON parsing and manipulation - -Parse JSON string: -{ - "operation": "parse", - "fields": ["json_data"] -} - -Stringify object: -{ - "operation": "stringify", - "fields": ["object_data"], - "indent": true -} - -6. FIELD HANDLER -================ -Field manipulation operations - -Filter fields: -{ - "operation": "filter", - "fields": ["name", "email", "age"] -} - -Rename fields: -{ - "operation": "rename", - "mapping": { - "old_name": "new_name", - "email_addr": "email" - } -} - -Add fields: -{ - "operation": "add", - "new_fields": { - "created_at": "2023-01-01", - "status": "active" - } -} - -Transform keys: -{ - "operation": "transform_keys", - "transformation": "snake_case" // or camel_case, kebab_case, etc. -} - -7. DATA HANDLER -=============== -Miscellaneous data operations - -Sort data: -{ - "operation": "sort", - "sort_field": "created_at", - "sort_order": "desc" -} - -Deduplicate: -{ - "operation": "deduplicate", - "dedupe_fields": ["email", "phone"] -} - -Calculate fields: -{ - "operation": "calculate", - "calculations": { - "total": { - "operation": "sum", - "fields": ["amount1", "amount2"] - }, - "average_score": { - "operation": "average", - "fields": ["score1", "score2", "score3"] - } - } -} - -Type casting: -{ - "operation": "type_cast", - "cast": { - "age": "int", - "salary": "float", - "active": "bool" - } -} - -Validate fields: -{ - "operation": "validate_fields", - "validation_rules": { - "email": { - "required": true, - "type": "string" - }, - "age": { - "required": true, - "type": "int", - "min": 0 - } - } -} - -USAGE IN DAG: -============= - -import "github.com/oarkflow/mq/handlers" -import "github.com/oarkflow/mq/dag" - -// Create handler -formatHandler := handlers.NewFormatHandler("format-1") - -// Configure through Operation.Payload -config := dag.Payload{ - Data: map[string]any{ - "format_type": "uppercase", - "fields": []string{"name", "title"}, - }, -} -formatHandler.SetConfig(config) - -// Use in DAG -dag := dag.NewDAG("data-processing") -dag.AddNode(formatHandler) - -CHAINING OPERATIONS: -=================== - -You can chain multiple handlers in a DAG: -1. Parse JSON → 2. Flatten → 3. Filter fields → 4. Format → 5. Group - -Each handler receives the output of the previous handler as input. -*/ diff --git a/handlers/html_handler.go b/handlers/html_handler.go index c82a10d..84a329c 100644 --- a/handlers/html_handler.go +++ b/handlers/html_handler.go @@ -50,11 +50,9 @@ func (c *RenderHTMLNode) ProcessTask(ctx context.Context, task *mq.Task) mq.Resu templateStr, _ = data["template"].(string) templateFile, _ = data["template_file"].(string) ) - var templateData map[string]any - if len(task.Payload) > 0 { - if err := json.Unmarshal(task.Payload, &templateData); err != nil { - return mq.Result{Payload: task.Payload, Error: err, Ctx: ctx} - } + templateData, err := dag.UnmarshalPayload[map[string]any](ctx, task.Payload) + if err != nil { + return mq.Result{Error: err, Ctx: ctx} } if templateData == nil { templateData = make(map[string]any) @@ -68,7 +66,6 @@ func (c *RenderHTMLNode) ProcessTask(ctx context.Context, task *mq.Task) mq.Resu } templateData["task_id"] = ctx.Value("task_id") var renderedHTML string - var err error parser := jet.NewWithMemory(jet.WithDelims("{{", "}}")) switch { // 1. JSONSchema + HTML Template diff --git a/handlers/output_handler.go b/handlers/output_handler.go index 943a883..7c86f58 100644 --- a/handlers/output_handler.go +++ b/handlers/output_handler.go @@ -11,7 +11,6 @@ import ( "github.com/oarkflow/mq" "github.com/oarkflow/mq/dag" - "github.com/oarkflow/mq/utils" ) type OutputHandler struct { @@ -19,31 +18,14 @@ type OutputHandler struct { } func (c *OutputHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { - var templateData map[string]any - if len(task.Payload) > 0 { - if err := json.Unmarshal(task.Payload, &templateData); err != nil { - return mq.Result{Error: err, Ctx: ctx} - } + templateData, err := dag.UnmarshalPayload[map[string]any](ctx, task.Payload) + if err != nil { + return mq.Result{Error: err, Ctx: ctx} } if templateData == nil { templateData = make(map[string]any) } - if c.Payload.Mapping != nil { - for k, v := range c.Payload.Mapping { - _, val := dag.GetVal(ctx, v, templateData) - templateData[k] = val - } - } - except, ok := c.Payload.Data["except_fields"].([]string) - if !ok { - exceptAny, ok := c.Payload.Data["except_fields"].([]any) - if ok { - except = make([]string, len(exceptAny)) - for i, v := range exceptAny { - except[i], _ = v.(string) - } - } - } + templateData = c.ParseMapping(ctx, templateData) outputType, _ := c.Payload.Data["output_type"].(string) switch outputType { case "stdout": @@ -132,11 +114,8 @@ func (c *OutputHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Resul "message": "Data sent to API successfully", } } - bt, _ := json.Marshal(templateData) - for _, field := range except { - bt = utils.RemoveRecursiveFromJSON(bt, field) - } + bt = c.ExceptFields(bt) return mq.Result{Payload: bt, Ctx: ctx} }