diff --git a/examples/data_transform_demo.go b/examples/data_transform_demo.go index 48007c9..20f3ba6 100644 --- a/examples/data_transform_demo.go +++ b/examples/data_transform_demo.go @@ -49,6 +49,7 @@ func testFormatHandler() { result := runHandler(handler, testData, "Uppercase Format") printResult("Uppercase formatting", result) + printRequestConfigResult(testData, config, result) // Test currency formatting currencyData := map[string]any{ @@ -69,6 +70,7 @@ func testFormatHandler() { result = runHandler(currencyHandler, currencyData, "Currency Format") printResult("Currency formatting", result) + printRequestConfigResult(currencyData, currencyConfig, result) // Test date formatting dateData := map[string]any{ @@ -88,6 +90,7 @@ func testFormatHandler() { result = runHandler(dateHandler, dateData, "Date Format") printResult("Date formatting", result) + printRequestConfigResult(dateData, dateConfig, result) } func testGroupHandler() { @@ -121,6 +124,7 @@ func testGroupHandler() { result := runHandler(handler, testData, "Group by Department") printResult("Data grouping", result) + printRequestConfigResult(testData, config, result) } func testSplitJoinHandler() { @@ -134,10 +138,9 @@ func testSplitJoinHandler() { "skills": "golang python javascript", } - splitHandler := handlers.NewSplitJoinHandler("split-test") + splitHandler := handlers.NewSplitHandler("split-test") splitConfig := dag.Payload{ Data: map[string]any{ - "operation": "split", "fields": []string{"full_name", "skills"}, "separator": " ", }, @@ -146,12 +149,12 @@ func testSplitJoinHandler() { result := runHandler(splitHandler, testData, "Split Operation (space)") printResult("String splitting with space", result) + printRequestConfigResult(testData, splitConfig, result) // Test split with comma - splitHandler2 := handlers.NewSplitJoinHandler("split-test-2") + splitHandler2 := handlers.NewSplitHandler("split-test-2") splitConfig2 := dag.Payload{ Data: map[string]any{ - "operation": "split", "fields": []string{"tags"}, "separator": ",", }, @@ -160,6 +163,7 @@ func testSplitJoinHandler() { result = runHandler(splitHandler2, testData, "Split Operation (comma)") printResult("String splitting with comma", result) + printRequestConfigResult(testData, splitConfig2, result) // Test join operation joinData := map[string]any{ @@ -169,10 +173,9 @@ func testSplitJoinHandler() { "title": "Mr.", } - joinHandler := handlers.NewSplitJoinHandler("join-test") + joinHandler := handlers.NewJoinHandler("join-test") joinConfig := dag.Payload{ Data: map[string]any{ - "operation": "join", "source_fields": []string{"title", "first_name", "middle_name", "last_name"}, "target_field": "full_name_with_title", "separator": " ", @@ -182,6 +185,7 @@ func testSplitJoinHandler() { result = runHandler(joinHandler, joinData, "Join Operation") printResult("String joining", result) + printRequestConfigResult(joinData, joinConfig, result) } func testFlattenHandler() { @@ -211,6 +215,7 @@ func testFlattenHandler() { result := runHandler(handler, testData, "Flatten Settings") printResult("Settings flattening", result) + printRequestConfigResult(testData, config, result) // Test flatten key-value pairs kvData := map[string]any{ @@ -236,6 +241,7 @@ func testFlattenHandler() { result = runHandler(kvHandler, kvData, "Flatten Key-Value") printResult("Key-value flattening", result) + printRequestConfigResult(kvData, kvConfig, result) // Test flatten nested objects nestedData := map[string]any{ @@ -268,6 +274,7 @@ func testFlattenHandler() { result = runHandler(nestedHandler, nestedData, "Flatten Nested Objects") printResult("Nested object flattening", result) + printRequestConfigResult(nestedData, nestedConfig, result) } func testJSONHandler() { @@ -292,6 +299,7 @@ func testJSONHandler() { result := runHandler(parseHandler, testData, "JSON Parsing") printResult("JSON parsing", result) + printRequestConfigResult(testData, parseConfig, result) // Test JSON stringifying objData := map[string]any{ @@ -320,6 +328,7 @@ func testJSONHandler() { result = runHandler(stringifyHandler, objData, "JSON Stringifying") printResult("JSON stringifying", result) + printRequestConfigResult(objData, stringifyConfig, result) // Test JSON validation validationData := map[string]any{ @@ -339,6 +348,7 @@ func testJSONHandler() { result = runHandler(validateHandler, validationData, "JSON Validation") printResult("JSON validation", result) + printRequestConfigResult(validationData, validateConfig, result) } func testFieldHandler() { @@ -370,6 +380,7 @@ func testFieldHandler() { result := runHandler(filterHandler, testData, "Filter/Select Fields") printResult("Field filtering", result) + printRequestConfigResult(testData, filterConfig, result) // Test field exclusion/removal excludeHandler := handlers.NewFieldHandler("exclude-test") @@ -383,6 +394,7 @@ func testFieldHandler() { result = runHandler(excludeHandler, testData, "Exclude Fields") printResult("Field exclusion", result) + printRequestConfigResult(testData, excludeConfig, result) // Test field renaming renameHandler := handlers.NewFieldHandler("rename-test") @@ -404,6 +416,7 @@ func testFieldHandler() { result = runHandler(renameHandler, testData, "Rename Fields") printResult("Field renaming", result) + printRequestConfigResult(testData, renameConfig, result) // Test adding new fields addHandler := handlers.NewFieldHandler("add-test") @@ -424,6 +437,7 @@ func testFieldHandler() { result = runHandler(addHandler, testData, "Add Fields") printResult("Adding fields", result) + printRequestConfigResult(testData, addConfig, result) // Test field copying copyHandler := handlers.NewFieldHandler("copy-test") @@ -441,6 +455,7 @@ func testFieldHandler() { result = runHandler(copyHandler, testData, "Copy Fields") printResult("Field copying", result) + printRequestConfigResult(testData, copyConfig, result) // Test key transformation transformHandler := handlers.NewFieldHandler("transform-test") @@ -454,6 +469,7 @@ func testFieldHandler() { result = runHandler(transformHandler, testData, "Transform Keys") printResult("Key transformation", result) + printRequestConfigResult(testData, transformConfig, result) } func testDataHandler() { @@ -482,6 +498,7 @@ func testDataHandler() { result := runHandler(sortHandler, testData, "Sort Data by Salary (Desc)") printResult("Data sorting", result) + printRequestConfigResult(testData, sortConfig, result) // Test field calculations calcData := map[string]any{ @@ -520,6 +537,7 @@ func testDataHandler() { result = runHandler(calcHandler, calcData, "Field Calculations") printResult("Field calculations", result) + printRequestConfigResult(calcData, calcConfig, result) // Test data deduplication dupData := map[string]any{ @@ -543,6 +561,7 @@ func testDataHandler() { result = runHandler(dedupHandler, dupData, "Data Deduplication") printResult("Data deduplication", result) + printRequestConfigResult(dupData, dedupConfig, result) // Test type casting castData := map[string]any{ @@ -574,6 +593,7 @@ func testDataHandler() { result = runHandler(castHandler, castData, "Type Casting") printResult("Type casting", result) + printRequestConfigResult(castData, castConfig, result) // Test conditional field setting condData := map[string]any{ @@ -604,6 +624,7 @@ func testDataHandler() { result = runHandler(condHandler, condData, "Conditional Field Setting") printResult("Conditional setting", result) + printRequestConfigResult(condData, condConfig, result) } // Helper functions @@ -662,8 +683,36 @@ func printResult(operation string, result map[string]any) { if len(resultStr) > 1000 { resultStr = resultStr[:997] + "..." } +} - fmt.Printf("Result:\n%s\n", resultStr) +func printRequestConfigResult(requestData map[string]any, config dag.Payload, result map[string]any) { + fmt.Println("\n=== Request Data ===") + requestJSON, err := json.MarshalIndent(requestData, "", " ") + if err != nil { + fmt.Printf("Error formatting request data: %v\n", err) + } else { + fmt.Println(string(requestJSON)) + } + + fmt.Println("\n=== Configuration ===") + configJSON, err := json.MarshalIndent(config.Data, "", " ") + if err != nil { + fmt.Printf("Error formatting configuration: %v\n", err) + } else { + fmt.Println(string(configJSON)) + } + + fmt.Println("\n=== Result ===") + if result == nil { + fmt.Println("❌ Operation failed") + } else { + resultJSON, err := json.MarshalIndent(result, "", " ") + if err != nil { + fmt.Printf("Error formatting result: %v\n", err) + } else { + fmt.Println(string(resultJSON)) + } + } } // Example of chaining handlers in a DAG workflow diff --git a/handlers/group_handler.go b/handlers/group_handler.go index a0d5142..11ce307 100644 --- a/handlers/group_handler.go +++ b/handlers/group_handler.go @@ -242,6 +242,9 @@ func (h *GroupHandler) compareValues(a, b interface{}) int { } func (h *GroupHandler) getGroupByFields() []string { + if fields, ok := h.Payload.Data["group_by"].([]string); ok { + return fields + } if fields, ok := h.Payload.Data["group_by"].([]interface{}); ok { var result []string for _, field := range fields { diff --git a/handlers/split_join_handler.go b/handlers/split_join_handler.go index b96a491..c7d8950 100644 --- a/handlers/split_join_handler.go +++ b/handlers/split_join_handler.go @@ -10,12 +10,12 @@ import ( "github.com/oarkflow/mq/dag" ) -// SplitJoinHandler handles string split and join operations -type SplitJoinHandler struct { +// SplitHandler handles string split operations +type SplitHandler struct { dag.Operation } -func (h *SplitJoinHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { +func (h *SplitHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { var data map[string]any err := json.Unmarshal(task.Payload, &data) if err != nil { @@ -31,12 +31,8 @@ func (h *SplitJoinHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Re switch operation { case "split": result = h.splitOperation(data) - case "join": - result = h.joinOperation(data) case "split_to_array": result = h.splitToArrayOperation(data) - case "join_from_array": - result = h.joinFromArrayOperation(data) default: return mq.Result{Error: fmt.Errorf("unsupported operation: %s", operation)} } @@ -49,11 +45,13 @@ func (h *SplitJoinHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Re return mq.Result{Payload: resultPayload, Ctx: ctx} } -func (h *SplitJoinHandler) splitOperation(data map[string]any) map[string]any { +func (h *SplitHandler) splitOperation(data map[string]any) map[string]any { result := make(map[string]any) fields := h.getTargetFields() separator := h.getSeparator() + fmt.Printf("Split Operation: Fields=%v, Separator='%s'\n", fields, separator) + // Copy all original data for key, value := range data { result[key] = value @@ -64,6 +62,8 @@ func (h *SplitJoinHandler) splitOperation(data map[string]any) map[string]any { if str, ok := val.(string); ok { parts := strings.Split(str, separator) + fmt.Printf("Splitting field '%s': Original='%s', Parts=%v\n", field, str, parts) + // Create individual fields for each part for i, part := range parts { result[fmt.Sprintf("%s_%d", field, i)] = strings.TrimSpace(part) @@ -79,32 +79,7 @@ func (h *SplitJoinHandler) splitOperation(data map[string]any) map[string]any { return result } -func (h *SplitJoinHandler) joinOperation(data map[string]any) map[string]any { - result := make(map[string]any) - targetField := h.getTargetField() - separator := h.getSeparator() - sourceFields := h.getSourceFields() - - // Copy all original data - for key, value := range data { - result[key] = value - } - - var parts []string - for _, field := range sourceFields { - if val, ok := data[field]; ok && val != nil { - parts = append(parts, fmt.Sprintf("%v", val)) - } - } - - if len(parts) > 0 { - result[targetField] = strings.Join(parts, separator) - } - - return result -} - -func (h *SplitJoinHandler) splitToArrayOperation(data map[string]any) map[string]any { +func (h *SplitHandler) splitToArrayOperation(data map[string]any) map[string]any { result := make(map[string]any) fields := h.getTargetFields() separator := h.getSeparator() @@ -130,7 +105,100 @@ func (h *SplitJoinHandler) splitToArrayOperation(data map[string]any) map[string return result } -func (h *SplitJoinHandler) joinFromArrayOperation(data map[string]any) map[string]any { +func (h *SplitHandler) getTargetFields() []string { + if fields, ok := h.Payload.Data["fields"].([]string); ok { + return fields + } + if fields, ok := h.Payload.Data["fields"].([]interface{}); ok { + var result []string + for _, field := range fields { + if str, ok := field.(string); ok { + result = append(result, str) + } + } + return result + } + return nil +} + +func (h *SplitHandler) getSeparator() string { + if sep, ok := h.Payload.Data["separator"].(string); ok { + return sep + } + return "," // Default separator +} + +func NewSplitHandler(id string) *SplitHandler { + return &SplitHandler{ + Operation: dag.Operation{ID: id, Key: "split", Type: dag.Function, Tags: []string{"data", "transformation", "string"}}, + } +} + +// JoinHandler handles string join operations +type JoinHandler struct { + dag.Operation +} + +func (h *JoinHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + var data map[string]any + err := json.Unmarshal(task.Payload, &data) + if err != nil { + return mq.Result{Error: fmt.Errorf("failed to unmarshal task payload: %w", err)} + } + + operation, ok := h.Payload.Data["operation"].(string) + if !ok { + return mq.Result{Error: fmt.Errorf("operation not specified")} + } + + var result map[string]any + switch operation { + case "join": + result = h.joinOperation(data) + case "join_from_array": + result = h.joinFromArrayOperation(data) + default: + return mq.Result{Error: fmt.Errorf("unsupported operation: %s", operation)} + } + + resultPayload, err := json.Marshal(result) + if err != nil { + return mq.Result{Error: fmt.Errorf("failed to marshal result: %w", err)} + } + + return mq.Result{Payload: resultPayload, Ctx: ctx} +} + +func (h *JoinHandler) joinOperation(data map[string]any) map[string]any { + result := make(map[string]any) + targetField := h.getTargetField() + separator := h.getSeparator() + sourceFields := h.getSourceFields() + + fmt.Printf("Join Operation: TargetField='%s', Separator='%s', SourceFields=%v\n", targetField, separator, sourceFields) + + // Copy all original data + for key, value := range data { + result[key] = value + } + + var parts []string + for _, field := range sourceFields { + if val, ok := data[field]; ok && val != nil { + parts = append(parts, fmt.Sprintf("%v", val)) + } + } + + fmt.Printf("Joining fields: Parts=%v\n", parts) + + if len(parts) > 0 { + result[targetField] = strings.Join(parts, separator) + } + + return result +} + +func (h *JoinHandler) joinFromArrayOperation(data map[string]any) map[string]any { result := make(map[string]any) targetField := h.getTargetField() separator := h.getSeparator() @@ -156,34 +224,21 @@ func (h *SplitJoinHandler) joinFromArrayOperation(data map[string]any) map[strin return result } -func (h *SplitJoinHandler) getTargetFields() []string { - if fields, ok := h.Payload.Data["fields"].([]interface{}); ok { - var result []string - for _, field := range fields { - if str, ok := field.(string); ok { - result = append(result, str) - } - } - return result - } - return nil -} - -func (h *SplitJoinHandler) getTargetField() string { +func (h *JoinHandler) getTargetField() string { if field, ok := h.Payload.Data["target_field"].(string); ok { return field } return "joined_field" } -func (h *SplitJoinHandler) getSourceField() string { +func (h *JoinHandler) getSourceField() string { if field, ok := h.Payload.Data["source_field"].(string); ok { return field } return "" } -func (h *SplitJoinHandler) getSourceFields() []string { +func (h *JoinHandler) getSourceFields() []string { if fields, ok := h.Payload.Data["source_fields"].([]interface{}); ok { var result []string for _, field := range fields { @@ -196,15 +251,15 @@ func (h *SplitJoinHandler) getSourceFields() []string { return nil } -func (h *SplitJoinHandler) getSeparator() string { +func (h *JoinHandler) getSeparator() string { if sep, ok := h.Payload.Data["separator"].(string); ok { return sep } return "," // Default separator } -func NewSplitJoinHandler(id string) *SplitJoinHandler { - return &SplitJoinHandler{ - Operation: dag.Operation{ID: id, Key: "split_join", Type: dag.Function, Tags: []string{"data", "transformation", "string"}}, +func NewJoinHandler(id string) *JoinHandler { + return &JoinHandler{ + Operation: dag.Operation{ID: id, Key: "join", Type: dag.Function, Tags: []string{"data", "transformation", "string"}}, } }