This commit is contained in:
sujit
2025-08-06 10:48:37 +05:45
parent 29d09d2d11
commit f728f89ff9
3 changed files with 169 additions and 62 deletions

View File

@@ -49,6 +49,7 @@ func testFormatHandler() {
result := runHandler(handler, testData, "Uppercase Format") result := runHandler(handler, testData, "Uppercase Format")
printResult("Uppercase formatting", result) printResult("Uppercase formatting", result)
printRequestConfigResult(testData, config, result)
// Test currency formatting // Test currency formatting
currencyData := map[string]any{ currencyData := map[string]any{
@@ -69,6 +70,7 @@ func testFormatHandler() {
result = runHandler(currencyHandler, currencyData, "Currency Format") result = runHandler(currencyHandler, currencyData, "Currency Format")
printResult("Currency formatting", result) printResult("Currency formatting", result)
printRequestConfigResult(currencyData, currencyConfig, result)
// Test date formatting // Test date formatting
dateData := map[string]any{ dateData := map[string]any{
@@ -88,6 +90,7 @@ func testFormatHandler() {
result = runHandler(dateHandler, dateData, "Date Format") result = runHandler(dateHandler, dateData, "Date Format")
printResult("Date formatting", result) printResult("Date formatting", result)
printRequestConfigResult(dateData, dateConfig, result)
} }
func testGroupHandler() { func testGroupHandler() {
@@ -121,6 +124,7 @@ func testGroupHandler() {
result := runHandler(handler, testData, "Group by Department") result := runHandler(handler, testData, "Group by Department")
printResult("Data grouping", result) printResult("Data grouping", result)
printRequestConfigResult(testData, config, result)
} }
func testSplitJoinHandler() { func testSplitJoinHandler() {
@@ -134,10 +138,9 @@ func testSplitJoinHandler() {
"skills": "golang python javascript", "skills": "golang python javascript",
} }
splitHandler := handlers.NewSplitJoinHandler("split-test") splitHandler := handlers.NewSplitHandler("split-test")
splitConfig := dag.Payload{ splitConfig := dag.Payload{
Data: map[string]any{ Data: map[string]any{
"operation": "split",
"fields": []string{"full_name", "skills"}, "fields": []string{"full_name", "skills"},
"separator": " ", "separator": " ",
}, },
@@ -146,12 +149,12 @@ func testSplitJoinHandler() {
result := runHandler(splitHandler, testData, "Split Operation (space)") result := runHandler(splitHandler, testData, "Split Operation (space)")
printResult("String splitting with space", result) printResult("String splitting with space", result)
printRequestConfigResult(testData, splitConfig, result)
// Test split with comma // Test split with comma
splitHandler2 := handlers.NewSplitJoinHandler("split-test-2") splitHandler2 := handlers.NewSplitHandler("split-test-2")
splitConfig2 := dag.Payload{ splitConfig2 := dag.Payload{
Data: map[string]any{ Data: map[string]any{
"operation": "split",
"fields": []string{"tags"}, "fields": []string{"tags"},
"separator": ",", "separator": ",",
}, },
@@ -160,6 +163,7 @@ func testSplitJoinHandler() {
result = runHandler(splitHandler2, testData, "Split Operation (comma)") result = runHandler(splitHandler2, testData, "Split Operation (comma)")
printResult("String splitting with comma", result) printResult("String splitting with comma", result)
printRequestConfigResult(testData, splitConfig2, result)
// Test join operation // Test join operation
joinData := map[string]any{ joinData := map[string]any{
@@ -169,10 +173,9 @@ func testSplitJoinHandler() {
"title": "Mr.", "title": "Mr.",
} }
joinHandler := handlers.NewSplitJoinHandler("join-test") joinHandler := handlers.NewJoinHandler("join-test")
joinConfig := dag.Payload{ joinConfig := dag.Payload{
Data: map[string]any{ Data: map[string]any{
"operation": "join",
"source_fields": []string{"title", "first_name", "middle_name", "last_name"}, "source_fields": []string{"title", "first_name", "middle_name", "last_name"},
"target_field": "full_name_with_title", "target_field": "full_name_with_title",
"separator": " ", "separator": " ",
@@ -182,6 +185,7 @@ func testSplitJoinHandler() {
result = runHandler(joinHandler, joinData, "Join Operation") result = runHandler(joinHandler, joinData, "Join Operation")
printResult("String joining", result) printResult("String joining", result)
printRequestConfigResult(joinData, joinConfig, result)
} }
func testFlattenHandler() { func testFlattenHandler() {
@@ -211,6 +215,7 @@ func testFlattenHandler() {
result := runHandler(handler, testData, "Flatten Settings") result := runHandler(handler, testData, "Flatten Settings")
printResult("Settings flattening", result) printResult("Settings flattening", result)
printRequestConfigResult(testData, config, result)
// Test flatten key-value pairs // Test flatten key-value pairs
kvData := map[string]any{ kvData := map[string]any{
@@ -236,6 +241,7 @@ func testFlattenHandler() {
result = runHandler(kvHandler, kvData, "Flatten Key-Value") result = runHandler(kvHandler, kvData, "Flatten Key-Value")
printResult("Key-value flattening", result) printResult("Key-value flattening", result)
printRequestConfigResult(kvData, kvConfig, result)
// Test flatten nested objects // Test flatten nested objects
nestedData := map[string]any{ nestedData := map[string]any{
@@ -268,6 +274,7 @@ func testFlattenHandler() {
result = runHandler(nestedHandler, nestedData, "Flatten Nested Objects") result = runHandler(nestedHandler, nestedData, "Flatten Nested Objects")
printResult("Nested object flattening", result) printResult("Nested object flattening", result)
printRequestConfigResult(nestedData, nestedConfig, result)
} }
func testJSONHandler() { func testJSONHandler() {
@@ -292,6 +299,7 @@ func testJSONHandler() {
result := runHandler(parseHandler, testData, "JSON Parsing") result := runHandler(parseHandler, testData, "JSON Parsing")
printResult("JSON parsing", result) printResult("JSON parsing", result)
printRequestConfigResult(testData, parseConfig, result)
// Test JSON stringifying // Test JSON stringifying
objData := map[string]any{ objData := map[string]any{
@@ -320,6 +328,7 @@ func testJSONHandler() {
result = runHandler(stringifyHandler, objData, "JSON Stringifying") result = runHandler(stringifyHandler, objData, "JSON Stringifying")
printResult("JSON stringifying", result) printResult("JSON stringifying", result)
printRequestConfigResult(objData, stringifyConfig, result)
// Test JSON validation // Test JSON validation
validationData := map[string]any{ validationData := map[string]any{
@@ -339,6 +348,7 @@ func testJSONHandler() {
result = runHandler(validateHandler, validationData, "JSON Validation") result = runHandler(validateHandler, validationData, "JSON Validation")
printResult("JSON validation", result) printResult("JSON validation", result)
printRequestConfigResult(validationData, validateConfig, result)
} }
func testFieldHandler() { func testFieldHandler() {
@@ -370,6 +380,7 @@ func testFieldHandler() {
result := runHandler(filterHandler, testData, "Filter/Select Fields") result := runHandler(filterHandler, testData, "Filter/Select Fields")
printResult("Field filtering", result) printResult("Field filtering", result)
printRequestConfigResult(testData, filterConfig, result)
// Test field exclusion/removal // Test field exclusion/removal
excludeHandler := handlers.NewFieldHandler("exclude-test") excludeHandler := handlers.NewFieldHandler("exclude-test")
@@ -383,6 +394,7 @@ func testFieldHandler() {
result = runHandler(excludeHandler, testData, "Exclude Fields") result = runHandler(excludeHandler, testData, "Exclude Fields")
printResult("Field exclusion", result) printResult("Field exclusion", result)
printRequestConfigResult(testData, excludeConfig, result)
// Test field renaming // Test field renaming
renameHandler := handlers.NewFieldHandler("rename-test") renameHandler := handlers.NewFieldHandler("rename-test")
@@ -404,6 +416,7 @@ func testFieldHandler() {
result = runHandler(renameHandler, testData, "Rename Fields") result = runHandler(renameHandler, testData, "Rename Fields")
printResult("Field renaming", result) printResult("Field renaming", result)
printRequestConfigResult(testData, renameConfig, result)
// Test adding new fields // Test adding new fields
addHandler := handlers.NewFieldHandler("add-test") addHandler := handlers.NewFieldHandler("add-test")
@@ -424,6 +437,7 @@ func testFieldHandler() {
result = runHandler(addHandler, testData, "Add Fields") result = runHandler(addHandler, testData, "Add Fields")
printResult("Adding fields", result) printResult("Adding fields", result)
printRequestConfigResult(testData, addConfig, result)
// Test field copying // Test field copying
copyHandler := handlers.NewFieldHandler("copy-test") copyHandler := handlers.NewFieldHandler("copy-test")
@@ -441,6 +455,7 @@ func testFieldHandler() {
result = runHandler(copyHandler, testData, "Copy Fields") result = runHandler(copyHandler, testData, "Copy Fields")
printResult("Field copying", result) printResult("Field copying", result)
printRequestConfigResult(testData, copyConfig, result)
// Test key transformation // Test key transformation
transformHandler := handlers.NewFieldHandler("transform-test") transformHandler := handlers.NewFieldHandler("transform-test")
@@ -454,6 +469,7 @@ func testFieldHandler() {
result = runHandler(transformHandler, testData, "Transform Keys") result = runHandler(transformHandler, testData, "Transform Keys")
printResult("Key transformation", result) printResult("Key transformation", result)
printRequestConfigResult(testData, transformConfig, result)
} }
func testDataHandler() { func testDataHandler() {
@@ -482,6 +498,7 @@ func testDataHandler() {
result := runHandler(sortHandler, testData, "Sort Data by Salary (Desc)") result := runHandler(sortHandler, testData, "Sort Data by Salary (Desc)")
printResult("Data sorting", result) printResult("Data sorting", result)
printRequestConfigResult(testData, sortConfig, result)
// Test field calculations // Test field calculations
calcData := map[string]any{ calcData := map[string]any{
@@ -520,6 +537,7 @@ func testDataHandler() {
result = runHandler(calcHandler, calcData, "Field Calculations") result = runHandler(calcHandler, calcData, "Field Calculations")
printResult("Field calculations", result) printResult("Field calculations", result)
printRequestConfigResult(calcData, calcConfig, result)
// Test data deduplication // Test data deduplication
dupData := map[string]any{ dupData := map[string]any{
@@ -543,6 +561,7 @@ func testDataHandler() {
result = runHandler(dedupHandler, dupData, "Data Deduplication") result = runHandler(dedupHandler, dupData, "Data Deduplication")
printResult("Data deduplication", result) printResult("Data deduplication", result)
printRequestConfigResult(dupData, dedupConfig, result)
// Test type casting // Test type casting
castData := map[string]any{ castData := map[string]any{
@@ -574,6 +593,7 @@ func testDataHandler() {
result = runHandler(castHandler, castData, "Type Casting") result = runHandler(castHandler, castData, "Type Casting")
printResult("Type casting", result) printResult("Type casting", result)
printRequestConfigResult(castData, castConfig, result)
// Test conditional field setting // Test conditional field setting
condData := map[string]any{ condData := map[string]any{
@@ -604,6 +624,7 @@ func testDataHandler() {
result = runHandler(condHandler, condData, "Conditional Field Setting") result = runHandler(condHandler, condData, "Conditional Field Setting")
printResult("Conditional setting", result) printResult("Conditional setting", result)
printRequestConfigResult(condData, condConfig, result)
} }
// Helper functions // Helper functions
@@ -662,8 +683,36 @@ func printResult(operation string, result map[string]any) {
if len(resultStr) > 1000 { if len(resultStr) > 1000 {
resultStr = resultStr[:997] + "..." resultStr = resultStr[:997] + "..."
} }
}
fmt.Printf("Result:\n%s\n", resultStr) func printRequestConfigResult(requestData map[string]any, config dag.Payload, result map[string]any) {
fmt.Println("\n=== Request Data ===")
requestJSON, err := json.MarshalIndent(requestData, "", " ")
if err != nil {
fmt.Printf("Error formatting request data: %v\n", err)
} else {
fmt.Println(string(requestJSON))
}
fmt.Println("\n=== Configuration ===")
configJSON, err := json.MarshalIndent(config.Data, "", " ")
if err != nil {
fmt.Printf("Error formatting configuration: %v\n", err)
} else {
fmt.Println(string(configJSON))
}
fmt.Println("\n=== Result ===")
if result == nil {
fmt.Println("❌ Operation failed")
} else {
resultJSON, err := json.MarshalIndent(result, "", " ")
if err != nil {
fmt.Printf("Error formatting result: %v\n", err)
} else {
fmt.Println(string(resultJSON))
}
}
} }
// Example of chaining handlers in a DAG workflow // Example of chaining handlers in a DAG workflow

View File

@@ -242,6 +242,9 @@ func (h *GroupHandler) compareValues(a, b interface{}) int {
} }
func (h *GroupHandler) getGroupByFields() []string { func (h *GroupHandler) getGroupByFields() []string {
if fields, ok := h.Payload.Data["group_by"].([]string); ok {
return fields
}
if fields, ok := h.Payload.Data["group_by"].([]interface{}); ok { if fields, ok := h.Payload.Data["group_by"].([]interface{}); ok {
var result []string var result []string
for _, field := range fields { for _, field := range fields {

View File

@@ -10,12 +10,12 @@ import (
"github.com/oarkflow/mq/dag" "github.com/oarkflow/mq/dag"
) )
// SplitJoinHandler handles string split and join operations // SplitHandler handles string split operations
type SplitJoinHandler struct { type SplitHandler struct {
dag.Operation dag.Operation
} }
func (h *SplitJoinHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { func (h *SplitHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
var data map[string]any var data map[string]any
err := json.Unmarshal(task.Payload, &data) err := json.Unmarshal(task.Payload, &data)
if err != nil { if err != nil {
@@ -31,12 +31,8 @@ func (h *SplitJoinHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Re
switch operation { switch operation {
case "split": case "split":
result = h.splitOperation(data) result = h.splitOperation(data)
case "join":
result = h.joinOperation(data)
case "split_to_array": case "split_to_array":
result = h.splitToArrayOperation(data) result = h.splitToArrayOperation(data)
case "join_from_array":
result = h.joinFromArrayOperation(data)
default: default:
return mq.Result{Error: fmt.Errorf("unsupported operation: %s", operation)} return mq.Result{Error: fmt.Errorf("unsupported operation: %s", operation)}
} }
@@ -49,11 +45,13 @@ func (h *SplitJoinHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Re
return mq.Result{Payload: resultPayload, Ctx: ctx} return mq.Result{Payload: resultPayload, Ctx: ctx}
} }
func (h *SplitJoinHandler) splitOperation(data map[string]any) map[string]any { func (h *SplitHandler) splitOperation(data map[string]any) map[string]any {
result := make(map[string]any) result := make(map[string]any)
fields := h.getTargetFields() fields := h.getTargetFields()
separator := h.getSeparator() separator := h.getSeparator()
fmt.Printf("Split Operation: Fields=%v, Separator='%s'\n", fields, separator)
// Copy all original data // Copy all original data
for key, value := range data { for key, value := range data {
result[key] = value result[key] = value
@@ -64,6 +62,8 @@ func (h *SplitJoinHandler) splitOperation(data map[string]any) map[string]any {
if str, ok := val.(string); ok { if str, ok := val.(string); ok {
parts := strings.Split(str, separator) parts := strings.Split(str, separator)
fmt.Printf("Splitting field '%s': Original='%s', Parts=%v\n", field, str, parts)
// Create individual fields for each part // Create individual fields for each part
for i, part := range parts { for i, part := range parts {
result[fmt.Sprintf("%s_%d", field, i)] = strings.TrimSpace(part) result[fmt.Sprintf("%s_%d", field, i)] = strings.TrimSpace(part)
@@ -79,32 +79,7 @@ func (h *SplitJoinHandler) splitOperation(data map[string]any) map[string]any {
return result return result
} }
func (h *SplitJoinHandler) joinOperation(data map[string]any) map[string]any { func (h *SplitHandler) splitToArrayOperation(data map[string]any) map[string]any {
result := make(map[string]any)
targetField := h.getTargetField()
separator := h.getSeparator()
sourceFields := h.getSourceFields()
// Copy all original data
for key, value := range data {
result[key] = value
}
var parts []string
for _, field := range sourceFields {
if val, ok := data[field]; ok && val != nil {
parts = append(parts, fmt.Sprintf("%v", val))
}
}
if len(parts) > 0 {
result[targetField] = strings.Join(parts, separator)
}
return result
}
func (h *SplitJoinHandler) splitToArrayOperation(data map[string]any) map[string]any {
result := make(map[string]any) result := make(map[string]any)
fields := h.getTargetFields() fields := h.getTargetFields()
separator := h.getSeparator() separator := h.getSeparator()
@@ -130,7 +105,100 @@ func (h *SplitJoinHandler) splitToArrayOperation(data map[string]any) map[string
return result return result
} }
func (h *SplitJoinHandler) joinFromArrayOperation(data map[string]any) map[string]any { func (h *SplitHandler) getTargetFields() []string {
if fields, ok := h.Payload.Data["fields"].([]string); ok {
return fields
}
if fields, ok := h.Payload.Data["fields"].([]interface{}); ok {
var result []string
for _, field := range fields {
if str, ok := field.(string); ok {
result = append(result, str)
}
}
return result
}
return nil
}
func (h *SplitHandler) getSeparator() string {
if sep, ok := h.Payload.Data["separator"].(string); ok {
return sep
}
return "," // Default separator
}
func NewSplitHandler(id string) *SplitHandler {
return &SplitHandler{
Operation: dag.Operation{ID: id, Key: "split", Type: dag.Function, Tags: []string{"data", "transformation", "string"}},
}
}
// JoinHandler handles string join operations
type JoinHandler struct {
dag.Operation
}
func (h *JoinHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
var data map[string]any
err := json.Unmarshal(task.Payload, &data)
if err != nil {
return mq.Result{Error: fmt.Errorf("failed to unmarshal task payload: %w", err)}
}
operation, ok := h.Payload.Data["operation"].(string)
if !ok {
return mq.Result{Error: fmt.Errorf("operation not specified")}
}
var result map[string]any
switch operation {
case "join":
result = h.joinOperation(data)
case "join_from_array":
result = h.joinFromArrayOperation(data)
default:
return mq.Result{Error: fmt.Errorf("unsupported operation: %s", operation)}
}
resultPayload, err := json.Marshal(result)
if err != nil {
return mq.Result{Error: fmt.Errorf("failed to marshal result: %w", err)}
}
return mq.Result{Payload: resultPayload, Ctx: ctx}
}
func (h *JoinHandler) joinOperation(data map[string]any) map[string]any {
result := make(map[string]any)
targetField := h.getTargetField()
separator := h.getSeparator()
sourceFields := h.getSourceFields()
fmt.Printf("Join Operation: TargetField='%s', Separator='%s', SourceFields=%v\n", targetField, separator, sourceFields)
// Copy all original data
for key, value := range data {
result[key] = value
}
var parts []string
for _, field := range sourceFields {
if val, ok := data[field]; ok && val != nil {
parts = append(parts, fmt.Sprintf("%v", val))
}
}
fmt.Printf("Joining fields: Parts=%v\n", parts)
if len(parts) > 0 {
result[targetField] = strings.Join(parts, separator)
}
return result
}
func (h *JoinHandler) joinFromArrayOperation(data map[string]any) map[string]any {
result := make(map[string]any) result := make(map[string]any)
targetField := h.getTargetField() targetField := h.getTargetField()
separator := h.getSeparator() separator := h.getSeparator()
@@ -156,34 +224,21 @@ func (h *SplitJoinHandler) joinFromArrayOperation(data map[string]any) map[strin
return result return result
} }
func (h *SplitJoinHandler) getTargetFields() []string { func (h *JoinHandler) getTargetField() string {
if fields, ok := h.Payload.Data["fields"].([]interface{}); ok {
var result []string
for _, field := range fields {
if str, ok := field.(string); ok {
result = append(result, str)
}
}
return result
}
return nil
}
func (h *SplitJoinHandler) getTargetField() string {
if field, ok := h.Payload.Data["target_field"].(string); ok { if field, ok := h.Payload.Data["target_field"].(string); ok {
return field return field
} }
return "joined_field" return "joined_field"
} }
func (h *SplitJoinHandler) getSourceField() string { func (h *JoinHandler) getSourceField() string {
if field, ok := h.Payload.Data["source_field"].(string); ok { if field, ok := h.Payload.Data["source_field"].(string); ok {
return field return field
} }
return "" return ""
} }
func (h *SplitJoinHandler) getSourceFields() []string { func (h *JoinHandler) getSourceFields() []string {
if fields, ok := h.Payload.Data["source_fields"].([]interface{}); ok { if fields, ok := h.Payload.Data["source_fields"].([]interface{}); ok {
var result []string var result []string
for _, field := range fields { for _, field := range fields {
@@ -196,15 +251,15 @@ func (h *SplitJoinHandler) getSourceFields() []string {
return nil return nil
} }
func (h *SplitJoinHandler) getSeparator() string { func (h *JoinHandler) getSeparator() string {
if sep, ok := h.Payload.Data["separator"].(string); ok { if sep, ok := h.Payload.Data["separator"].(string); ok {
return sep return sep
} }
return "," // Default separator return "," // Default separator
} }
func NewSplitJoinHandler(id string) *SplitJoinHandler { func NewJoinHandler(id string) *JoinHandler {
return &SplitJoinHandler{ return &JoinHandler{
Operation: dag.Operation{ID: id, Key: "split_join", Type: dag.Function, Tags: []string{"data", "transformation", "string"}}, Operation: dag.Operation{ID: id, Key: "join", Type: dag.Function, Tags: []string{"data", "transformation", "string"}},
} }
} }