diff --git a/handlers/data_transformation_handler.go b/handlers/data_transformation_handler.go
new file mode 100644
index 0000000..cbfc130
--- /dev/null
+++ b/handlers/data_transformation_handler.go
@@ -0,0 +1,765 @@
+package handlers
+
+import (
+	"context"
+	"encoding/base64"
+	"encoding/hex"
+	"fmt"
+	"math"
+	"reflect"
+	"regexp"
+	"sort"
+	"strconv"
+	"strings"
+
+	"github.com/oarkflow/json"
+	"github.com/oarkflow/mq"
+	"github.com/oarkflow/mq/dag"
+)
+
+// DataTransformationHandler provides comprehensive data transformation capabilities
+type DataTransformationHandler struct {
+	dag.Operation
+	Transformations []DataTransformation `json:"transformations"` // list of transformations to apply
+}
+
+type DataTransformation struct {
+	Name        string              `json:"name"`         // transformation name/identifier
+	Type        string              `json:"type"`         // transformation type
+	SourceField string              `json:"source_field"` // source field (can be empty for data-wide operations)
+	TargetField string              `json:"target_field"` // target field (can be empty to overwrite source)
+	Config      map[string]any      `json:"config"`       // transformation configuration
+	Condition   *TransformCondition `json:"condition"`    // optional condition for when to apply
+}
+
+type TransformCondition struct {
+	Field    string `json:"field"`    // field to check
+	Operator string `json:"operator"` // eq, ne, gt, lt, ge, le, contains, regex
+	Value    any    `json:"value"`    // value to compare against
+}
+
+func (d *DataTransformationHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
+	var data map[string]any
+	if err := json.Unmarshal(task.Payload, &data); err != nil {
+		return mq.Result{Error: fmt.Errorf("failed to unmarshal data: %v", err), Ctx: ctx}
+	}
+
+	// Apply transformations in sequence
+	for i, transformation := range d.Transformations {
+		// Check condition if specified
+		if transformation.Condition != nil {
+			if !d.evaluateCondition(data, transformation.Condition) {
+				continue // skip this transformation
+			}
+		}
+
+		var err error
+		data, err = d.applyTransformation(data, transformation)
+		if err != nil {
+			return mq.Result{Error: fmt.Errorf("transformation %d (%s) failed: %v", i+1, transformation.Name, err), Ctx: ctx}
+		}
+	}
+
+	bt, _ := json.Marshal(data)
+	return mq.Result{Payload: bt, Ctx: ctx}
+}
+
+func (d *DataTransformationHandler) evaluateCondition(data map[string]any, condition *TransformCondition) bool {
+	fieldValue, exists := data[condition.Field]
+	if !exists {
+		return false
+	}
+
+	switch condition.Operator {
+	case "eq":
+		return fmt.Sprintf("%v", fieldValue) == fmt.Sprintf("%v", condition.Value)
+	case "ne":
+		return fmt.Sprintf("%v", fieldValue) != fmt.Sprintf("%v", condition.Value)
+	case "gt":
+		return d.compareNumeric(fieldValue, condition.Value) > 0
+	case "lt":
+		return d.compareNumeric(fieldValue, condition.Value) < 0
+	case "ge":
+		return d.compareNumeric(fieldValue, condition.Value) >= 0
+	case "le":
+		return d.compareNumeric(fieldValue, condition.Value) <= 0
+	case "contains":
+		return strings.Contains(fmt.Sprintf("%v", fieldValue), fmt.Sprintf("%v", condition.Value))
+	case "regex":
+		re, err := regexp.Compile(fmt.Sprintf("%v", condition.Value))
+		if err != nil {
+			return false // an invalid pattern never matches
+		}
+		return re.MatchString(fmt.Sprintf("%v", fieldValue))
+	default:
+		return false
+	}
+}
+
+func (d *DataTransformationHandler) compareNumeric(a, b any) int {
+	aFloat := d.toFloat64(a)
+	bFloat := d.toFloat64(b)
+
+	if aFloat < bFloat {
+		return -1
+	} else if aFloat > bFloat {
+		return 1
+	}
+	return 0
+}
+
+func (d *DataTransformationHandler) applyTransformation(data map[string]any,
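+	// Example: a conditional transformation list as it would be decoded from
+	// handler JSON (illustrative field names only). The condition gates the
+	// step using the operators handled by evaluateCondition above:
+	//
+	//	[{"name": "vip_score", "type": "calculate",
+	//	  "target_field": "vip_score",
+	//	  "config": {"expression": "purchases * 2"},
+	//	  "condition": {"field": "tier", "operator": "eq", "value": "gold"}}]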
transformation DataTransformation) (map[string]any, error) { + switch transformation.Type { + case "normalize": + return d.normalizeData(data, transformation) + case "aggregate": + return d.aggregateData(data, transformation) + case "pivot": + return d.pivotData(data, transformation) + case "unpivot": + return d.unpivotData(data, transformation) + case "calculate": + return d.calculateField(data, transformation) + case "lookup": + return d.lookupTransform(data, transformation) + case "bucket": + return d.bucketize(data, transformation) + case "rank": + return d.rankData(data, transformation) + case "window": + return d.windowFunction(data, transformation) + case "encode": + return d.encodeData(data, transformation) + case "decode": + return d.decodeData(data, transformation) + case "validate": + return d.validateData(data, transformation) + default: + return nil, fmt.Errorf("unsupported transformation type: %s", transformation.Type) + } +} + +func (d *DataTransformationHandler) normalizeData(data map[string]any, transformation DataTransformation) (map[string]any, error) { + sourceValue := data[transformation.SourceField] + normalizeType, _ := transformation.Config["type"].(string) + + var normalized any + var err error + + switch normalizeType { + case "min_max": + normalized, err = d.minMaxNormalize(sourceValue, transformation.Config) + case "z_score": + normalized, err = d.zScoreNormalize(sourceValue, transformation.Config) + case "unit_vector": + normalized, err = d.unitVectorNormalize(sourceValue, transformation.Config) + default: + return nil, fmt.Errorf("unsupported normalization type: %s", normalizeType) + } + + if err != nil { + return nil, err + } + + targetField := transformation.TargetField + if targetField == "" { + targetField = transformation.SourceField + } + + result := make(map[string]any) + for k, v := range data { + result[k] = v + } + result[targetField] = normalized + + return result, nil +} + +func (d *DataTransformationHandler) minMaxNormalize(value any, config map[string]any) (float64, error) { + num := d.toFloat64(value) + min, _ := config["min"].(float64) + max, _ := config["max"].(float64) + + if max == min { + return 0, nil + } + + return (num - min) / (max - min), nil +} + +func (d *DataTransformationHandler) zScoreNormalize(value any, config map[string]any) (float64, error) { + num := d.toFloat64(value) + mean, _ := config["mean"].(float64) + stdDev, _ := config["std_dev"].(float64) + + if stdDev == 0 { + return 0, nil + } + + return (num - mean) / stdDev, nil +} + +func (d *DataTransformationHandler) unitVectorNormalize(value any, config map[string]any) (float64, error) { + num := d.toFloat64(value) + magnitude, _ := config["magnitude"].(float64) + + if magnitude == 0 { + return 0, nil + } + + return num / magnitude, nil +} + +func (d *DataTransformationHandler) calculateField(data map[string]any, transformation DataTransformation) (map[string]any, error) { + expression, _ := transformation.Config["expression"].(string) + + // Simple expression evaluator - in production, use a proper expression library + result, err := d.evaluateExpression(expression, data) + if err != nil { + return nil, err + } + + targetField := transformation.TargetField + if targetField == "" { + return nil, fmt.Errorf("target field is required for calculate transformation") + } + + resultData := make(map[string]any) + for k, v := range data { + resultData[k] = v + } + resultData[targetField] = result + + return resultData, nil +} + +func (d *DataTransformationHandler) 
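+// Example: a min_max normalization step as decoded from handler JSON
+// (illustrative field names). Numeric config values typically arrive as
+// float64 after JSON unmarshalling, which is what the type assertions in
+// minMaxNormalize expect:
+//
+//	{"name": "scale_score", "type": "normalize",
+//	 "source_field": "score", "target_field": "score_scaled",
+//	 "config": {"type": "min_max", "min": 0, "max": 100}}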
evaluateExpression(expression string, data map[string]any) (any, error) { + // Basic expression evaluation - replace with proper expression evaluator + // This is a simplified implementation for common cases + + expression = strings.TrimSpace(expression) + + // Handle simple field references + if value, exists := data[expression]; exists { + return value, nil + } + + // Handle simple arithmetic operations + if strings.Contains(expression, "+") { + parts := strings.Split(expression, "+") + if len(parts) == 2 { + left := strings.TrimSpace(parts[0]) + right := strings.TrimSpace(parts[1]) + + leftVal := d.getValueOrNumber(left, data) + rightVal := d.getValueOrNumber(right, data) + + return d.toFloat64(leftVal) + d.toFloat64(rightVal), nil + } + } + + if strings.Contains(expression, "-") { + parts := strings.Split(expression, "-") + if len(parts) == 2 { + left := strings.TrimSpace(parts[0]) + right := strings.TrimSpace(parts[1]) + + leftVal := d.getValueOrNumber(left, data) + rightVal := d.getValueOrNumber(right, data) + + return d.toFloat64(leftVal) - d.toFloat64(rightVal), nil + } + } + + if strings.Contains(expression, "*") { + parts := strings.Split(expression, "*") + if len(parts) == 2 { + left := strings.TrimSpace(parts[0]) + right := strings.TrimSpace(parts[1]) + + leftVal := d.getValueOrNumber(left, data) + rightVal := d.getValueOrNumber(right, data) + + return d.toFloat64(leftVal) * d.toFloat64(rightVal), nil + } + } + + if strings.Contains(expression, "/") { + parts := strings.Split(expression, "/") + if len(parts) == 2 { + left := strings.TrimSpace(parts[0]) + right := strings.TrimSpace(parts[1]) + + leftVal := d.getValueOrNumber(left, data) + rightVal := d.toFloat64(d.getValueOrNumber(right, data)) + + if rightVal == 0 { + return nil, fmt.Errorf("division by zero") + } + + return d.toFloat64(leftVal) / rightVal, nil + } + } + + return nil, fmt.Errorf("unable to evaluate expression: %s", expression) +} + +func (d *DataTransformationHandler) getValueOrNumber(str string, data map[string]any) any { + // Check if it's a field reference + if value, exists := data[str]; exists { + return value + } + + // Try to parse as number + if num, err := strconv.ParseFloat(str, 64); err == nil { + return num + } + + // Return as string + return str +} + +func (d *DataTransformationHandler) bucketize(data map[string]any, transformation DataTransformation) (map[string]any, error) { + sourceValue := data[transformation.SourceField] + buckets, _ := transformation.Config["buckets"].([]any) + labels, _ := transformation.Config["labels"].([]any) + + num := d.toFloat64(sourceValue) + + // Find the appropriate bucket + var bucketIndex int = -1 + for i, bucket := range buckets { + if bucketVal := d.toFloat64(bucket); num <= bucketVal { + bucketIndex = i + break + } + } + + var result any + if bucketIndex >= 0 && bucketIndex < len(labels) { + result = labels[bucketIndex] + } else { + result = "out_of_range" + } + + targetField := transformation.TargetField + if targetField == "" { + targetField = transformation.SourceField + } + + resultData := make(map[string]any) + for k, v := range data { + resultData[k] = v + } + resultData[targetField] = result + + return resultData, nil +} + +func (d *DataTransformationHandler) encodeData(data map[string]any, transformation DataTransformation) (map[string]any, error) { + sourceValue := data[transformation.SourceField] + encodingType, _ := transformation.Config["type"].(string) + + var encoded any + var err error + + switch encodingType { + case "one_hot": + encoded, err = 
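+	// NOTE: evaluateExpression above handles a single binary operator and does
+	// not recurse: "base + bonus * 2" splits on "+" first, fails to resolve
+	// "bonus * 2" as a field or number, and so treats it as 0. Negative
+	// literals ("total - -5") also mis-split. For anything beyond "a <op> b",
+	// swap in a real expression library, as the inline comment already
+	// suggests. Likewise, bucketize assumes the "buckets" thresholds are
+	// sorted ascending; unsorted thresholds will assign the wrong labels.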
d.oneHotEncode(sourceValue, transformation.Config) + case "label": + encoded, err = d.labelEncode(sourceValue, transformation.Config) + case "ordinal": + encoded, err = d.ordinalEncode(sourceValue, transformation.Config) + default: + return nil, fmt.Errorf("unsupported encoding type: %s", encodingType) + } + + if err != nil { + return nil, err + } + + targetField := transformation.TargetField + if targetField == "" { + targetField = transformation.SourceField + } + + result := make(map[string]any) + for k, v := range data { + result[k] = v + } + result[targetField] = encoded + + return result, nil +} + +func (d *DataTransformationHandler) oneHotEncode(value any, config map[string]any) (map[string]any, error) { + categories, _ := config["categories"].([]any) + valueStr := fmt.Sprintf("%v", value) + + result := make(map[string]any) + for _, category := range categories { + categoryStr := fmt.Sprintf("%v", category) + if valueStr == categoryStr { + result[categoryStr] = 1 + } else { + result[categoryStr] = 0 + } + } + + return result, nil +} + +func (d *DataTransformationHandler) labelEncode(value any, config map[string]any) (int, error) { + mapping, _ := config["mapping"].(map[string]any) + valueStr := fmt.Sprintf("%v", value) + + if encoded, exists := mapping[valueStr]; exists { + return int(d.toFloat64(encoded)), nil + } + + return -1, fmt.Errorf("value '%s' not found in encoding mapping", valueStr) +} + +func (d *DataTransformationHandler) ordinalEncode(value any, config map[string]any) (int, error) { + order, _ := config["order"].([]any) + valueStr := fmt.Sprintf("%v", value) + + for i, item := range order { + if fmt.Sprintf("%v", item) == valueStr { + return i, nil + } + } + + return -1, fmt.Errorf("value '%s' not found in ordinal order", valueStr) +} + +func (d *DataTransformationHandler) aggregateData(data map[string]any, transformation DataTransformation) (map[string]any, error) { + // This is a simplified version - for complex aggregations, use GroupingHandler + aggregationType, _ := transformation.Config["type"].(string) + sourceField := transformation.SourceField + + // Assume source field contains an array of values + sourceValue, exists := data[sourceField] + if !exists { + return nil, fmt.Errorf("source field '%s' not found", sourceField) + } + + values := d.extractNumbers(sourceValue) + if len(values) == 0 { + return nil, fmt.Errorf("no numeric values found in source field") + } + + var result float64 + + switch aggregationType { + case "sum": + for _, v := range values { + result += v + } + case "avg", "mean": + for _, v := range values { + result += v + } + result /= float64(len(values)) + case "min": + result = values[0] + for _, v := range values { + if v < result { + result = v + } + } + case "max": + result = values[0] + for _, v := range values { + if v > result { + result = v + } + } + case "std": + // Calculate standard deviation + mean := 0.0 + for _, v := range values { + mean += v + } + mean /= float64(len(values)) + + variance := 0.0 + for _, v := range values { + variance += math.Pow(v-mean, 2) + } + variance /= float64(len(values)) + result = math.Sqrt(variance) + default: + return nil, fmt.Errorf("unsupported aggregation type: %s", aggregationType) + } + + targetField := transformation.TargetField + if targetField == "" { + targetField = sourceField + } + + resultData := make(map[string]any) + for k, v := range data { + resultData[k] = v + } + resultData[targetField] = result + + return resultData, nil +} + +func (d *DataTransformationHandler) 
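+// Example: a one_hot encoding step (illustrative names). Each category
+// becomes an indicator key on the target field; values outside the list
+// yield all zeros:
+//
+//	{"name": "encode_color", "type": "encode", "source_field": "color",
+//	 "target_field": "color_onehot",
+//	 "config": {"type": "one_hot", "categories": ["red", "green", "blue"]}}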
extractNumbers(value any) []float64 {
+	// Collect numeric values, including legitimate zeros; the previous
+	// `num != 0` guard silently dropped zero entries and made conversion
+	// failures indistinguishable from actual zeros.
+	var numbers []float64
+	appendNumeric := func(v any) {
+		switch n := v.(type) {
+		case int, int32, int64, float32, float64:
+			numbers = append(numbers, d.toFloat64(n))
+		case string:
+			if parsed, err := strconv.ParseFloat(n, 64); err == nil {
+				numbers = append(numbers, parsed)
+			}
+		}
+	}
+
+	rv := reflect.ValueOf(value)
+	if rv.Kind() == reflect.Slice || rv.Kind() == reflect.Array {
+		for i := 0; i < rv.Len(); i++ {
+			appendNumeric(rv.Index(i).Interface())
+		}
+	} else {
+		appendNumeric(value)
+	}
+
+	return numbers
+}
+
+func (d *DataTransformationHandler) rankData(data map[string]any, transformation DataTransformation) (map[string]any, error) {
+	// For ranking, we need the data to contain an array of items
+	arrayField, _ := transformation.Config["array_field"].(string)
+	rankField := transformation.SourceField
+
+	arrayData, exists := data[arrayField]
+	if !exists {
+		return nil, fmt.Errorf("array field '%s' not found", arrayField)
+	}
+
+	// Convert to slice and extract values for ranking
+	rv := reflect.ValueOf(arrayData)
+	if rv.Kind() != reflect.Slice && rv.Kind() != reflect.Array {
+		return nil, fmt.Errorf("array field must contain an array")
+	}
+
+	type rankItem struct {
+		index int
+		value float64
+	}
+
+	var items []rankItem
+	for i := 0; i < rv.Len(); i++ {
+		item := rv.Index(i).Interface()
+		if itemMap, ok := item.(map[string]any); ok {
+			if val, exists := itemMap[rankField]; exists {
+				items = append(items, rankItem{
+					index: i,
+					value: d.toFloat64(val),
+				})
+			}
+		}
+	}
+
+	// Sort by value
+	sort.Slice(items, func(i, j int) bool {
+		return items[i].value > items[j].value // descending order
+	})
+
+	// Assign ranks
+	ranks := make(map[int]int)
+	for rank, item := range items {
+		ranks[item.index] = rank + 1
+	}
+
+	// Update the original data with ranks
+	targetField := transformation.TargetField
+	if targetField == "" {
+		targetField = rankField + "_rank"
+	}
+
+	for i := 0; i < rv.Len(); i++ {
+		item := rv.Index(i).Interface()
+		if itemMap, ok := item.(map[string]any); ok {
+			itemMap[targetField] = ranks[i]
+		}
+	}
+
+	return data, nil
+}
+
+func (d *DataTransformationHandler) pivotData(data map[string]any, transformation DataTransformation) (map[string]any, error) {
+	// Pivot transformation implementation
+	pivotField, _ := transformation.Config["pivot_field"].(string)
+	valueField, _ := transformation.Config["value_field"].(string)
+
+	if pivotField == "" || valueField == "" {
+		return nil, fmt.Errorf("pivot_field and value_field are required")
+	}
+
+	result := make(map[string]any)
+	for key, value := range data {
+		if key == pivotField {
+			result[fmt.Sprintf("%v", value)] = data[valueField]
+		}
+	}
+
+	return result, nil
+}
+
+func (d *DataTransformationHandler) unpivotData(data map[string]any, transformation DataTransformation) (map[string]any, error) {
+	// Unpivot transformation implementation. JSON decodes arrays as []any,
+	// so a direct []string assertion would always fail here.
+	rawFields, _ := transformation.Config["fields"].([]any)
+	if len(rawFields) == 0 {
+		return nil, fmt.Errorf("fields for unpivoting are required")
+	}
+
+	result := make(map[string]any)
+	for _, rawField := range rawFields {
+		field := fmt.Sprintf("%v", rawField)
+		if value, exists := data[field]; exists {
+			result[field] = value
+		}
+	}
+
+	return result, nil
+}
+
+func (d *DataTransformationHandler) lookupTransform(data map[string]any, transformation DataTransformation) (map[string]any, error) {
+	// Lookup transformation implementation
+	lookupTable, _ := transformation.Config["lookup_table"].(map[string]any)
+	lookupKey, _ := transformation.Config["lookup_key"].(string)
+
+	if lookupTable == nil || lookupKey == "" {
+		return nil, fmt.Errorf("lookup_table and lookup_key are required")
+	}
+
+	lookupValue := data[lookupKey]
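+	// NOTE: rankData above writes ranks into the item maps in place (maps are
+	// references, so the caller's payload is mutated) and items missing the
+	// rank field receive the zero rank. pivotData and unpivotData return only
+	// the pivoted/selected fields, dropping the rest of the payload,
+	// apparently by design.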
+	if result, exists := lookupTable[fmt.Sprintf("%v", lookupValue)]; exists {
+		return map[string]any{lookupKey: result}, nil
+	}
+
+	return nil, fmt.Errorf("lookup value not found")
+}
+
+func (d *DataTransformationHandler) windowFunction(data map[string]any, transformation DataTransformation) (map[string]any, error) {
+	// Window function transformation implementation
+	windowField, _ := transformation.Config["window_field"].(string)
+	operation, _ := transformation.Config["operation"].(string)
+
+	if windowField == "" || operation == "" {
+		return nil, fmt.Errorf("window_field and operation are required")
+	}
+
+	values := d.extractNumbers(data[windowField])
+	if len(values) == 0 {
+		return nil, fmt.Errorf("no numeric values found in window_field")
+	}
+
+	var result float64
+	switch operation {
+	case "sum":
+		for _, v := range values {
+			result += v
+		}
+	case "avg":
+		for _, v := range values {
+			result += v
+		}
+		result /= float64(len(values))
+	default:
+		return nil, fmt.Errorf("unsupported window operation: %s", operation)
+	}
+
+	return map[string]any{windowField: result}, nil
+}
+
+func (d *DataTransformationHandler) decodeData(data map[string]any, transformation DataTransformation) (map[string]any, error) {
+	// Data decoding implementation
+	encodingType, _ := transformation.Config["type"].(string)
+
+	if encodingType == "" {
+		return nil, fmt.Errorf("encoding type is required")
+	}
+
+	sourceValue := data[transformation.SourceField]
+	var decoded any
+	var err error
+
+	switch encodingType {
+	case "base64":
+		decoded, err = d.decodeBase64(fmt.Sprintf("%v", sourceValue))
+	case "hex":
+		decoded, err = d.decodeHex(fmt.Sprintf("%v", sourceValue))
+	default:
+		return nil, fmt.Errorf("unsupported decoding type: %s", encodingType)
+	}
+
+	if err != nil {
+		return nil, err
+	}
+
+	targetField := transformation.TargetField
+	if targetField == "" {
+		targetField = transformation.SourceField
+	}
+
+	return map[string]any{targetField: decoded}, nil
+}
+
+func (d *DataTransformationHandler) decodeBase64(value string) (string, error) {
+	// Decode using the standard base64 alphabet (the earlier version parsed
+	// the input as a float, which could never round-trip encoded data).
+	decoded, err := base64.StdEncoding.DecodeString(value)
+	if err != nil {
+		return "", err
+	}
+	return string(decoded), nil
+}
+
+func (d *DataTransformationHandler) decodeHex(value string) (string, error) {
+	decoded, err := hex.DecodeString(value)
+	if err != nil {
+		return "", err
+	}
+	return string(decoded), nil
+}
+
+func (d *DataTransformationHandler) validateData(data map[string]any, transformation DataTransformation) (map[string]any, error) {
+	// Data validation implementation. Rules decoded from JSON arrive as
+	// []any of map[string]any, so unwrap accordingly.
+	rawRules, _ := transformation.Config["rules"].([]any)
+
+	if len(rawRules) == 0 {
+		return nil, fmt.Errorf("validation rules are required")
+	}
+
+	for _, rawRule := range rawRules {
+		rule, ok := rawRule.(map[string]any)
+		if !ok {
+			continue
+		}
+
+		field, _ := rule["field"].(string)
+		operator, _ := rule["operator"].(string)
+		value := rule["value"]
+
+		if !d.evaluateCondition(data, &TransformCondition{Field: field, Operator: operator, Value: value}) {
+			return nil, fmt.Errorf("validation failed for field: %s", field)
+		}
+	}
+
+	return data, nil
+}
+
+func (d *DataTransformationHandler) toFloat64(value any) float64 {
+	switch v := value.(type) {
+	case int:
+		return float64(v)
+	case int32:
+		return float64(v)
+	case int64:
+		return float64(v)
+	case float32:
+		return float64(v)
+	case float64:
+		return v
+	case string:
+		if num, err := strconv.ParseFloat(v, 64); err == nil {
+			return num
+		}
+	}
+	return 0
+}
+
+// Factory function
+func NewDataTransformationHandler(id string, transformations []DataTransformation) *DataTransformationHandler {
+	return
&DataTransformationHandler{ + Operation: dag.Operation{ + ID: id, + Key: "data_transformation", + Type: dag.Function, + Tags: []string{"data", "transformation", "advanced"}, + }, + Transformations: transformations, + } +} diff --git a/handlers/data_utils_handler.go b/handlers/data_utils_handler.go new file mode 100644 index 0000000..7ca9033 --- /dev/null +++ b/handlers/data_utils_handler.go @@ -0,0 +1,494 @@ +package handlers + +import ( + "context" + "fmt" + + "github.com/oarkflow/json" + "github.com/oarkflow/mq" + "github.com/oarkflow/mq/dag" +) + +// DataUtilsHandler provides utility functions for common data operations +type DataUtilsHandler struct { + dag.Operation + UtilityType string `json:"utility_type"` // type of utility operation + Config map[string]any `json:"config"` // operation configuration +} + +// Utility operation types: +// - "deduplicate": Remove duplicate entries from arrays or objects +// - "merge": Merge multiple objects or arrays +// - "diff": Compare two data structures and return differences +// - "sort": Sort arrays or object keys +// - "reverse": Reverse arrays or strings +// - "sample": Take a sample of data +// - "validate_schema": Validate data against a schema +// - "convert_types": Convert data types in bulk + +func (d *DataUtilsHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + var data map[string]any + if err := json.Unmarshal(task.Payload, &data); err != nil { + return mq.Result{Error: fmt.Errorf("failed to unmarshal data: %v", err), Ctx: ctx} + } + + var result map[string]any + var err error + + switch d.UtilityType { + case "deduplicate": + result, err = d.deduplicate(data) + case "merge": + result, err = d.merge(data) + case "diff": + result, err = d.diff(data) + case "sort": + result, err = d.sort(data) + case "reverse": + result, err = d.reverse(data) + case "sample": + result, err = d.sample(data) + case "validate_schema": + result, err = d.validateSchema(data) + case "convert_types": + result, err = d.convertTypes(data) + default: + return mq.Result{Error: fmt.Errorf("unsupported utility type: %s", d.UtilityType), Ctx: ctx} + } + + if err != nil { + return mq.Result{Error: err, Ctx: ctx} + } + + bt, _ := json.Marshal(result) + return mq.Result{Payload: bt, Ctx: ctx} +} + +func (d *DataUtilsHandler) deduplicate(data map[string]any) (map[string]any, error) { + sourceField, _ := d.Config["source_field"].(string) + targetField, _ := d.Config["target_field"].(string) + dedupeBy, _ := d.Config["dedupe_by"].(string) // field to dedupe by, or empty for exact match + + if targetField == "" { + targetField = sourceField + } + + sourceData, exists := data[sourceField] + if !exists { + return nil, fmt.Errorf("source field '%s' not found", sourceField) + } + + // Implementation depends on data type + result := make(map[string]any) + for k, v := range data { + result[k] = v + } + + // Basic deduplication logic - can be extended + if arrayData, ok := sourceData.([]any); ok { + seen := make(map[string]bool) + var dedupedArray []any + + for _, item := range arrayData { + var key string + if dedupeBy != "" { + // Dedupe by specific field + if itemMap, ok := item.(map[string]any); ok { + key = fmt.Sprintf("%v", itemMap[dedupeBy]) + } + } else { + // Dedupe by entire item + key = fmt.Sprintf("%v", item) + } + + if !seen[key] { + seen[key] = true + dedupedArray = append(dedupedArray, item) + } + } + + result[targetField] = dedupedArray + } + + return result, nil +} + +func (d *DataUtilsHandler) merge(data map[string]any) (map[string]any, error) { 
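+	// NOTE: the strategy comment below also lists "combine", but only
+	// "overwrite" (map merge) and "append" (array concatenation) are
+	// implemented; "combine" currently falls through to the
+	// unsupported-strategy error. Example config for merging two object
+	// fields (illustrative names):
+	//
+	//	{"source_fields": ["defaults", "overrides"],
+	//	 "target_field": "settings", "strategy": "overwrite"}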
+ sourceFields, _ := d.Config["source_fields"].([]any) + targetField, _ := d.Config["target_field"].(string) + mergeStrategy, _ := d.Config["strategy"].(string) // "overwrite", "append", "combine" + + if len(sourceFields) < 2 { + return nil, fmt.Errorf("at least 2 source fields required for merge") + } + + var mergedResult any + + switch mergeStrategy { + case "overwrite": + // Merge objects by overwriting keys + merged := make(map[string]any) + for _, fieldName := range sourceFields { + if field, ok := fieldName.(string); ok { + if fieldData, exists := data[field]; exists { + if fieldMap, ok := fieldData.(map[string]any); ok { + for k, v := range fieldMap { + merged[k] = v + } + } + } + } + } + mergedResult = merged + + case "append": + // Merge arrays by appending + var merged []any + for _, fieldName := range sourceFields { + if field, ok := fieldName.(string); ok { + if fieldData, exists := data[field]; exists { + if fieldArray, ok := fieldData.([]any); ok { + merged = append(merged, fieldArray...) + } + } + } + } + mergedResult = merged + + default: + return nil, fmt.Errorf("unsupported merge strategy: %s", mergeStrategy) + } + + result := make(map[string]any) + for k, v := range data { + result[k] = v + } + result[targetField] = mergedResult + + return result, nil +} + +func (d *DataUtilsHandler) diff(data map[string]any) (map[string]any, error) { + field1, _ := d.Config["first_field"].(string) + field2, _ := d.Config["second_field"].(string) + targetField, _ := d.Config["target_field"].(string) + + data1, exists1 := data[field1] + data2, exists2 := data[field2] + + if !exists1 || !exists2 { + return nil, fmt.Errorf("both comparison fields must exist") + } + + // Basic diff implementation + diffResult := map[string]any{ + "equal": fmt.Sprintf("%v", data1) == fmt.Sprintf("%v", data2), + "first_only": d.findUniqueElements(data1, data2), + "second_only": d.findUniqueElements(data2, data1), + "common": d.findCommonElements(data1, data2), + } + + result := make(map[string]any) + for k, v := range data { + result[k] = v + } + result[targetField] = diffResult + + return result, nil +} + +func (d *DataUtilsHandler) findUniqueElements(data1, data2 any) []any { + // Simplified implementation for arrays + if array1, ok := data1.([]any); ok { + if array2, ok := data2.([]any); ok { + set2 := make(map[string]bool) + for _, item := range array2 { + set2[fmt.Sprintf("%v", item)] = true + } + + var unique []any + for _, item := range array1 { + if !set2[fmt.Sprintf("%v", item)] { + unique = append(unique, item) + } + } + return unique + } + } + return nil +} + +func (d *DataUtilsHandler) findCommonElements(data1, data2 any) []any { + // Simplified implementation for arrays + if array1, ok := data1.([]any); ok { + if array2, ok := data2.([]any); ok { + set2 := make(map[string]bool) + for _, item := range array2 { + set2[fmt.Sprintf("%v", item)] = true + } + + var common []any + seen := make(map[string]bool) + for _, item := range array1 { + key := fmt.Sprintf("%v", item) + if set2[key] && !seen[key] { + common = append(common, item) + seen[key] = true + } + } + return common + } + } + return nil +} + +func (d *DataUtilsHandler) sort(data map[string]any) (map[string]any, error) { + sourceField, _ := d.Config["source_field"].(string) + targetField, _ := d.Config["target_field"].(string) + // sortBy, _ := d.Config["sort_by"].(string) + // direction, _ := d.Config["direction"].(string) // "asc" or "desc" + + if targetField == "" { + targetField = sourceField + } + + sourceData, exists := data[sourceField] 
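+	// NOTE: sorting below is currently a pass-through stub. A minimal sketch
+	// for arrays of objects, assuming the commented-out "sort_by" config key
+	// and an added "sort" import, ascending by numeric value:
+	//
+	//	if arr, ok := sourceData.([]any); ok {
+	//		sort.Slice(arr, func(i, j int) bool {
+	//			mi, _ := arr[i].(map[string]any)
+	//			mj, _ := arr[j].(map[string]any)
+	//			return d.toFloat64(mi[sortBy]) < d.toFloat64(mj[sortBy])
+	//		})
+	//	}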
+ if !exists { + return nil, fmt.Errorf("source field '%s' not found", sourceField) + } + + // Basic sorting implementation + // For production, use more sophisticated sorting + result := make(map[string]any) + for k, v := range data { + result[k] = v + } + + // This is a placeholder - implement proper sorting based on data type + result[targetField] = sourceData + + return result, nil +} + +func (d *DataUtilsHandler) reverse(data map[string]any) (map[string]any, error) { + sourceField, _ := d.Config["source_field"].(string) + targetField, _ := d.Config["target_field"].(string) + + if targetField == "" { + targetField = sourceField + } + + sourceData, exists := data[sourceField] + if !exists { + return nil, fmt.Errorf("source field '%s' not found", sourceField) + } + + result := make(map[string]any) + for k, v := range data { + result[k] = v + } + + // Reverse arrays + if arrayData, ok := sourceData.([]any); ok { + reversed := make([]any, len(arrayData)) + for i, item := range arrayData { + reversed[len(arrayData)-1-i] = item + } + result[targetField] = reversed + } else if strData, ok := sourceData.(string); ok { + // Reverse strings + runes := []rune(strData) + for i, j := 0, len(runes)-1; i < j; i, j = i+1, j-1 { + runes[i], runes[j] = runes[j], runes[i] + } + result[targetField] = string(runes) + } else { + result[targetField] = sourceData + } + + return result, nil +} + +func (d *DataUtilsHandler) sample(data map[string]any) (map[string]any, error) { + sourceField, _ := d.Config["source_field"].(string) + targetField, _ := d.Config["target_field"].(string) + sampleSize, _ := d.Config["sample_size"].(float64) + + if targetField == "" { + targetField = sourceField + } + + sourceData, exists := data[sourceField] + if !exists { + return nil, fmt.Errorf("source field '%s' not found", sourceField) + } + + result := make(map[string]any) + for k, v := range data { + result[k] = v + } + + // Basic sampling for arrays + if arrayData, ok := sourceData.([]any); ok { + size := int(sampleSize) + if size > len(arrayData) { + size = len(arrayData) + } + + if size <= 0 { + result[targetField] = []any{} + } else if size >= len(arrayData) { + result[targetField] = arrayData + } else { + // Simple sampling - take first N elements + // For production, implement proper random sampling + sample := make([]any, size) + copy(sample, arrayData[:size]) + result[targetField] = sample + } + } else { + result[targetField] = sourceData + } + + return result, nil +} + +func (d *DataUtilsHandler) validateSchema(data map[string]any) (map[string]any, error) { + // Basic schema validation placeholder + // For production, implement proper JSON schema validation + sourceField, _ := d.Config["source_field"].(string) + schema, _ := d.Config["schema"].(map[string]any) + + result := make(map[string]any) + for k, v := range data { + result[k] = v + } + + // Placeholder validation result + result["validation_result"] = map[string]any{ + "valid": true, + "errors": []string{}, + "schema": schema, + "data": data[sourceField], + } + + return result, nil +} + +func (d *DataUtilsHandler) convertTypes(data map[string]any) (map[string]any, error) { + conversions, _ := d.Config["conversions"].(map[string]any) + + result := make(map[string]any) + for k, v := range data { + result[k] = v + } + + // Apply type conversions + for field, targetType := range conversions { + if value, exists := result[field]; exists { + converted, err := d.convertType(value, fmt.Sprintf("%v", targetType)) + if err == nil { + result[field] = converted + } + } + } 
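+	// Example conversions config; keys are field names, values are the target
+	// types understood by convertType below ("string", "int", "float", "bool"):
+	//
+	//	{"conversions": {"age": "int", "active": "bool", "score": "float"}}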
+ + return result, nil +} + +func (d *DataUtilsHandler) convertType(value any, targetType string) (any, error) { + switch targetType { + case "string": + return fmt.Sprintf("%v", value), nil + case "int": + if num := d.toFloat64(value); num != 0 { + return int(num), nil + } + return 0, nil + case "float": + return d.toFloat64(value), nil + case "bool": + str := fmt.Sprintf("%v", value) + return str == "true" || str == "1" || str == "yes", nil + default: + return value, fmt.Errorf("unsupported target type: %s", targetType) + } +} + +func (d *DataUtilsHandler) toFloat64(value any) float64 { + switch v := value.(type) { + case int: + return float64(v) + case int32: + return float64(v) + case int64: + return float64(v) + case float32: + return float64(v) + case float64: + return v + case string: + var result float64 + if n, err := fmt.Sscanf(v, "%f", &result); err == nil && n == 1 { + return result + } + } + return 0 +} + +// Factory functions for common utilities +func NewDeduplicateHandler(id, sourceField, targetField, dedupeBy string) *DataUtilsHandler { + return &DataUtilsHandler{ + Operation: dag.Operation{ + ID: id, + Key: "deduplicate_data", + Type: dag.Function, + Tags: []string{"data", "utils", "deduplicate"}, + }, + UtilityType: "deduplicate", + Config: map[string]any{ + "source_field": sourceField, + "target_field": targetField, + "dedupe_by": dedupeBy, + }, + } +} + +func NewMergeHandler(id string, sourceFields []string, targetField, strategy string) *DataUtilsHandler { + var anyFields []any + for _, field := range sourceFields { + anyFields = append(anyFields, field) + } + + return &DataUtilsHandler{ + Operation: dag.Operation{ + ID: id, + Key: "merge_data", + Type: dag.Function, + Tags: []string{"data", "utils", "merge"}, + }, + UtilityType: "merge", + Config: map[string]any{ + "source_fields": anyFields, + "target_field": targetField, + "strategy": strategy, + }, + } +} + +func NewDataDiffHandler(id, field1, field2, targetField string) *DataUtilsHandler { + return &DataUtilsHandler{ + Operation: dag.Operation{ + ID: id, + Key: "diff_data", + Type: dag.Function, + Tags: []string{"data", "utils", "diff"}, + }, + UtilityType: "diff", + Config: map[string]any{ + "first_field": field1, + "second_field": field2, + "target_field": targetField, + }, + } +} diff --git a/handlers/field_manipulation_handler.go b/handlers/field_manipulation_handler.go new file mode 100644 index 0000000..3222fe1 --- /dev/null +++ b/handlers/field_manipulation_handler.go @@ -0,0 +1,501 @@ +package handlers + +import ( + "context" + "fmt" + "reflect" + "regexp" + "strings" + + "github.com/oarkflow/json" + "github.com/oarkflow/mq" + "github.com/oarkflow/mq/dag" +) + +// FieldManipulationHandler handles various field operations on data +type FieldManipulationHandler struct { + dag.Operation + Operations []FieldOperation `json:"operations"` // list of field operations to perform +} + +type FieldOperation struct { + Type string `json:"type"` // "filter", "add", "remove", "rename", "copy", "transform" + Config FieldOperationConfig `json:"config"` // operation-specific configuration +} + +type FieldOperationConfig struct { + // Common fields + Fields []string `json:"fields"` // fields to operate on + Pattern string `json:"pattern"` // regex pattern for field matching + CaseSensitive bool `json:"case_sensitive"` // case sensitive pattern matching + + // Filter operation + IncludeOnly []string `json:"include_only"` // only include these fields + Exclude []string `json:"exclude"` // exclude these fields + KeepNulls 
bool `json:"keep_nulls"` // keep fields with null values + KeepEmpty bool `json:"keep_empty"` // keep fields with empty values + + // Add operation + NewFields map[string]any `json:"new_fields"` // fields to add with their values + DefaultValue any `json:"default_value"` // default value for new fields + + // Rename operation + FieldMapping map[string]string `json:"field_mapping"` // old field name -> new field name + + // Copy operation + CopyMapping map[string]string `json:"copy_mapping"` // source field -> target field + OverwriteCopy bool `json:"overwrite_copy"` // overwrite target if exists + + // Transform operation + Transformation string `json:"transformation"` // transformation type + TransformConfig map[string]any `json:"transform_config"` // transformation configuration +} + +func (f *FieldManipulationHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + var data map[string]any + if err := json.Unmarshal(task.Payload, &data); err != nil { + return mq.Result{Error: fmt.Errorf("failed to unmarshal data: %v", err), Ctx: ctx} + } + + // Apply operations in sequence + for i, operation := range f.Operations { + var err error + + switch operation.Type { + case "filter": + data, err = f.filterFields(data, operation.Config) + case "add": + data, err = f.addFields(data, operation.Config) + case "remove": + data, err = f.removeFields(data, operation.Config) + case "rename": + data, err = f.renameFields(data, operation.Config) + case "copy": + data, err = f.copyFields(data, operation.Config) + case "transform": + data, err = f.transformFields(data, operation.Config) + default: + return mq.Result{Error: fmt.Errorf("unsupported operation type: %s", operation.Type), Ctx: ctx} + } + + if err != nil { + return mq.Result{Error: fmt.Errorf("operation %d (%s) failed: %v", i+1, operation.Type, err), Ctx: ctx} + } + } + + bt, _ := json.Marshal(data) + return mq.Result{Payload: bt, Ctx: ctx} +} + +func (f *FieldManipulationHandler) filterFields(data map[string]any, config FieldOperationConfig) (map[string]any, error) { + result := make(map[string]any) + + // If include_only is specified, only include those fields + if len(config.IncludeOnly) > 0 { + for _, field := range config.IncludeOnly { + if value, exists := data[field]; exists { + if f.shouldKeepValue(value, config) { + result[field] = value + } + } + } + return result, nil + } + + // Otherwise, include all except excluded fields + excludeSet := make(map[string]bool) + for _, field := range config.Exclude { + excludeSet[field] = true + } + + // Compile regex pattern if provided + var pattern *regexp.Regexp + if config.Pattern != "" { + flags := "" + if !config.CaseSensitive { + flags = "(?i)" + } + var err error + pattern, err = regexp.Compile(flags + config.Pattern) + if err != nil { + return nil, fmt.Errorf("invalid regex pattern: %v", err) + } + } + + for field, value := range data { + // Check if field should be excluded + if excludeSet[field] { + continue + } + + // Check pattern matching + if pattern != nil && !pattern.MatchString(field) { + continue + } + + // Check value conditions + if f.shouldKeepValue(value, config) { + result[field] = value + } + } + + return result, nil +} + +func (f *FieldManipulationHandler) shouldKeepValue(value any, config FieldOperationConfig) bool { + if value == nil { + return config.KeepNulls + } + + // Check for empty values + if f.isEmpty(value) { + return config.KeepEmpty + } + + return true +} + +func (f *FieldManipulationHandler) isEmpty(value any) bool { + if value == nil { + return 
true + } + + rv := reflect.ValueOf(value) + switch rv.Kind() { + case reflect.String: + return rv.String() == "" + case reflect.Slice, reflect.Array, reflect.Map: + return rv.Len() == 0 + default: + return false + } +} + +func (f *FieldManipulationHandler) addFields(data map[string]any, config FieldOperationConfig) (map[string]any, error) { + result := make(map[string]any) + + // Copy existing data + for k, v := range data { + result[k] = v + } + + // Add new fields from new_fields map + for field, value := range config.NewFields { + result[field] = value + } + + // Add fields from fields list with default value + for _, field := range config.Fields { + if _, exists := result[field]; !exists { + result[field] = config.DefaultValue + } + } + + return result, nil +} + +func (f *FieldManipulationHandler) removeFields(data map[string]any, config FieldOperationConfig) (map[string]any, error) { + result := make(map[string]any) + + // Create set of fields to remove + removeSet := make(map[string]bool) + for _, field := range config.Fields { + removeSet[field] = true + } + + // Compile regex pattern if provided + var pattern *regexp.Regexp + if config.Pattern != "" { + flags := "" + if !config.CaseSensitive { + flags = "(?i)" + } + var err error + pattern, err = regexp.Compile(flags + config.Pattern) + if err != nil { + return nil, fmt.Errorf("invalid regex pattern: %v", err) + } + } + + // Copy fields that should not be removed + for field, value := range data { + shouldRemove := removeSet[field] + + // Check pattern matching + if !shouldRemove && pattern != nil { + shouldRemove = pattern.MatchString(field) + } + + if !shouldRemove { + result[field] = value + } + } + + return result, nil +} + +func (f *FieldManipulationHandler) renameFields(data map[string]any, config FieldOperationConfig) (map[string]any, error) { + result := make(map[string]any) + + // Copy and rename fields + for field, value := range data { + newName := field + if mappedName, exists := config.FieldMapping[field]; exists { + newName = mappedName + } + result[newName] = value + } + + return result, nil +} + +func (f *FieldManipulationHandler) copyFields(data map[string]any, config FieldOperationConfig) (map[string]any, error) { + result := make(map[string]any) + + // Copy existing data + for k, v := range data { + result[k] = v + } + + // Copy fields based on mapping + for sourceField, targetField := range config.CopyMapping { + if value, exists := data[sourceField]; exists { + // Check if target already exists and overwrite is not allowed + if _, targetExists := result[targetField]; targetExists && !config.OverwriteCopy { + continue + } + result[targetField] = value + } + } + + return result, nil +} + +func (f *FieldManipulationHandler) transformFields(data map[string]any, config FieldOperationConfig) (map[string]any, error) { + result := make(map[string]any) + + // Copy existing data + for k, v := range data { + result[k] = v + } + + // Apply transformations to specified fields + for _, field := range config.Fields { + if value, exists := result[field]; exists { + transformedValue, err := f.applyTransformation(value, config.Transformation, config.TransformConfig) + if err != nil { + return nil, fmt.Errorf("transformation failed for field '%s': %v", field, err) + } + result[field] = transformedValue + } + } + + return result, nil +} + +func (f *FieldManipulationHandler) applyTransformation(value any, transformationType string, config map[string]any) (any, error) { + switch transformationType { + case "uppercase": + return 
strings.ToUpper(fmt.Sprintf("%v", value)), nil
+
+	case "lowercase":
+		return strings.ToLower(fmt.Sprintf("%v", value)), nil
+
+	case "title":
+		// strings.Title is deprecated (it title-cases by ASCII word rules);
+		// it is kept here for simplicity, but golang.org/x/text/cases is the
+		// recommended replacement for locale-aware casing.
+		return strings.Title(fmt.Sprintf("%v", value)), nil
+
+	case "trim":
+		return strings.TrimSpace(fmt.Sprintf("%v", value)), nil
+
+	case "prefix":
+		prefix, _ := config["prefix"].(string)
+		return prefix + fmt.Sprintf("%v", value), nil
+
+	case "suffix":
+		suffix, _ := config["suffix"].(string)
+		return fmt.Sprintf("%v", value) + suffix, nil
+
+	case "replace":
+		oldStr, _ := config["old"].(string)
+		newStr, _ := config["new"].(string)
+		return strings.ReplaceAll(fmt.Sprintf("%v", value), oldStr, newStr), nil
+
+	case "regex_replace":
+		pattern, _ := config["pattern"].(string)
+		replacement, _ := config["replacement"].(string)
+		re, err := regexp.Compile(pattern)
+		if err != nil {
+			return nil, fmt.Errorf("invalid regex pattern: %v", err)
+		}
+		return re.ReplaceAllString(fmt.Sprintf("%v", value), replacement), nil
+
+	case "multiply":
+		// Apply directly rather than guarding on num != 0, which previously
+		// skipped legitimate zero values.
+		if multiplier, ok := config["multiplier"].(float64); ok {
+			return f.toFloat64(value) * multiplier, nil
+		}
+		return value, nil
+
+	case "add":
+		// Same fix as "multiply": with the old guard, adding to a zero value
+		// returned 0 instead of the addend.
+		if addend, ok := config["addend"].(float64); ok {
+			return f.toFloat64(value) + addend, nil
+		}
+		return value, nil
+
+	case "absolute":
+		num := f.toFloat64(value)
+		if num < 0 {
+			return -num, nil
+		}
+		return num, nil
+
+	case "default_if_empty":
+		defaultVal := config["default"]
+		if f.isEmpty(value) {
+			return defaultVal, nil
+		}
+		return value, nil
+
+	default:
+		return nil, fmt.Errorf("unsupported transformation type: %s", transformationType)
+	}
+}
+
+func (f *FieldManipulationHandler) toFloat64(value any) float64 {
+	switch v := value.(type) {
+	case int:
+		return float64(v)
+	case int32:
+		return float64(v)
+	case int64:
+		return float64(v)
+	case float32:
+		return float64(v)
+	case float64:
+		return v
+	case string:
+		var result float64
+		if n, err := fmt.Sscanf(v, "%f", &result); err == nil && n == 1 {
+			return result
+		}
+	}
+	return 0
+}
+
+// Factory functions for common operations
+func NewFieldFilter(id string, includeOnly, exclude []string, options FieldOperationConfig) *FieldManipulationHandler {
+	options.IncludeOnly = includeOnly
+	options.Exclude = exclude
+
+	return &FieldManipulationHandler{
+		Operation: dag.Operation{
+			ID:   id,
+			Key:  "filter_fields",
+			Type: dag.Function,
+			Tags: []string{"data", "fields", "filter"},
+		},
+		Operations: []FieldOperation{
+			{
+				Type:   "filter",
+				Config: options,
+			},
+		},
+	}
+}
+
+func NewFieldAdder(id string, newFields map[string]any, defaultValue any) *FieldManipulationHandler {
+	return &FieldManipulationHandler{
+		Operation: dag.Operation{
+			ID:   id,
+			Key:  "add_fields",
+			Type: dag.Function,
+			Tags: []string{"data", "fields", "add"},
+		},
+		Operations: []FieldOperation{
+			{
+				Type: "add",
+				Config: FieldOperationConfig{
+					NewFields:    newFields,
+					DefaultValue: defaultValue,
+				},
+			},
+		},
+	}
+}
+
+func NewFieldRemover(id string, fieldsToRemove []string, pattern string) *FieldManipulationHandler {
+	return &FieldManipulationHandler{
+		Operation: dag.Operation{
+			ID:   id,
+			Key:  "remove_fields",
+			Type: dag.Function,
+			Tags: []string{"data", "fields", "remove"},
+		},
+		Operations: []FieldOperation{
+			{
+				Type: "remove",
+				Config: FieldOperationConfig{
+					Fields:  fieldsToRemove,
+					Pattern: pattern,
+				},
+			},
+		},
+	}
+}
+
+func NewFieldRenamer(id string, fieldMapping map[string]string) *FieldManipulationHandler {
+	return
&FieldManipulationHandler{ + Operation: dag.Operation{ + ID: id, + Key: "rename_fields", + Type: dag.Function, + Tags: []string{"data", "fields", "rename"}, + }, + Operations: []FieldOperation{ + { + Type: "rename", + Config: FieldOperationConfig{ + FieldMapping: fieldMapping, + }, + }, + }, + } +} + +func NewFieldTransformer(id string, fields []string, transformation string, transformConfig map[string]any) *FieldManipulationHandler { + return &FieldManipulationHandler{ + Operation: dag.Operation{ + ID: id, + Key: "transform_fields", + Type: dag.Function, + Tags: []string{"data", "fields", "transform"}, + }, + Operations: []FieldOperation{ + { + Type: "transform", + Config: FieldOperationConfig{ + Fields: fields, + Transformation: transformation, + TransformConfig: transformConfig, + }, + }, + }, + } +} + +func NewAdvancedFieldManipulator(id string, operations []FieldOperation) *FieldManipulationHandler { + return &FieldManipulationHandler{ + Operation: dag.Operation{ + ID: id, + Key: "advanced_field_manipulation", + Type: dag.Function, + Tags: []string{"data", "fields", "advanced"}, + }, + Operations: operations, + } +} diff --git a/handlers/flatten_handler.go b/handlers/flatten_handler.go new file mode 100644 index 0000000..66c3498 --- /dev/null +++ b/handlers/flatten_handler.go @@ -0,0 +1,387 @@ +package handlers + +import ( + "context" + "fmt" + "reflect" + "strings" + + "github.com/oarkflow/json" + "github.com/oarkflow/mq" + "github.com/oarkflow/mq/dag" +) + +// FlattenHandler flattens array of objects to a single object or performs other flattening operations +type FlattenHandler struct { + dag.Operation + FlattenType string `json:"flatten_type"` // "array_to_object", "nested_object", "key_value_pairs" + SourceField string `json:"source_field"` // field containing data to flatten + TargetField string `json:"target_field"` // field to store flattened result + Config FlattenConfiguration `json:"config"` // configuration for flattening +} + +type FlattenConfiguration struct { + // For array_to_object flattening + KeyField string `json:"key_field"` // field to use as key + ValueField string `json:"value_field"` // field to use as value + TypeField string `json:"type_field"` // optional field for value type conversion + + // For nested_object flattening + Separator string `json:"separator"` // separator for nested keys (default: ".") + MaxDepth int `json:"max_depth"` // maximum depth to flatten (-1 for unlimited) + Prefix string `json:"prefix"` // prefix for flattened keys + SkipArrays bool `json:"skip_arrays"` // skip array flattening + SkipObjects bool `json:"skip_objects"` // skip object flattening + + // For key_value_pairs flattening + PairSeparator string `json:"pair_separator"` // separator between key-value pairs + KVSeparator string `json:"kv_separator"` // separator between key and value + + // General options + OverwriteExisting bool `json:"overwrite_existing"` // overwrite existing keys + PreserveTypes bool `json:"preserve_types"` // preserve original data types +} + +func (f *FlattenHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + var data map[string]any + if err := json.Unmarshal(task.Payload, &data); err != nil { + return mq.Result{Error: fmt.Errorf("failed to unmarshal data: %v", err), Ctx: ctx} + } + + // Get source data + sourceData, exists := data[f.SourceField] + if !exists { + return mq.Result{Error: fmt.Errorf("source field '%s' not found", f.SourceField), Ctx: ctx} + } + + var result any + var err error + + switch f.FlattenType { + case 
"array_to_object": + result, err = f.flattenArrayToObject(sourceData) + case "nested_object": + result, err = f.flattenNestedObject(sourceData) + case "key_value_pairs": + result, err = f.flattenKeyValuePairs(sourceData) + default: + return mq.Result{Error: fmt.Errorf("unsupported flatten type: %s", f.FlattenType), Ctx: ctx} + } + + if err != nil { + return mq.Result{Error: err, Ctx: ctx} + } + + // Set target field + targetField := f.TargetField + if targetField == "" { + targetField = f.SourceField // overwrite source if no target specified + } + data[targetField] = result + + bt, _ := json.Marshal(data) + return mq.Result{Payload: bt, Ctx: ctx} +} + +func (f *FlattenHandler) flattenArrayToObject(data any) (map[string]any, error) { + // Convert to slice of maps + items, err := f.convertToSliceOfMaps(data) + if err != nil { + return nil, err + } + + result := make(map[string]any) + + for _, item := range items { + key, keyExists := item[f.Config.KeyField] + if !keyExists { + continue + } + + value, valueExists := item[f.Config.ValueField] + if !valueExists { + continue + } + + keyStr := fmt.Sprintf("%v", key) + + // Handle type conversion if type field is specified + if f.Config.TypeField != "" { + if typeValue, typeExists := item[f.Config.TypeField]; typeExists { + convertedValue, err := f.convertValueByType(value, fmt.Sprintf("%v", typeValue)) + if err == nil { + value = convertedValue + } + } + } + + // Check for overwrites + if !f.Config.OverwriteExisting { + if _, exists := result[keyStr]; exists { + continue // skip if key already exists + } + } + + result[keyStr] = value + } + + return result, nil +} + +func (f *FlattenHandler) flattenNestedObject(data any) (map[string]any, error) { + result := make(map[string]any) + f.flattenRecursive(data, "", result, 0) + return result, nil +} + +func (f *FlattenHandler) flattenRecursive(data any, prefix string, result map[string]any, depth int) { + // Check depth limit + if f.Config.MaxDepth > 0 && depth >= f.Config.MaxDepth { + key := prefix + if key == "" { + key = "root" + } + result[key] = data + return + } + + rv := reflect.ValueOf(data) + if !rv.IsValid() { + return + } + + switch rv.Kind() { + case reflect.Map: + if f.Config.SkipObjects { + result[prefix] = data + return + } + + if rv.Type().Key().Kind() == reflect.String { + for _, key := range rv.MapKeys() { + keyStr := key.String() + value := rv.MapIndex(key).Interface() + + newPrefix := keyStr + if prefix != "" { + separator := f.Config.Separator + if separator == "" { + separator = "." 
+ } + newPrefix = prefix + separator + keyStr + } + if f.Config.Prefix != "" { + newPrefix = f.Config.Prefix + newPrefix + } + + f.flattenRecursive(value, newPrefix, result, depth+1) + } + } else { + result[prefix] = data + } + + case reflect.Slice, reflect.Array: + if f.Config.SkipArrays { + result[prefix] = data + return + } + + for i := 0; i < rv.Len(); i++ { + value := rv.Index(i).Interface() + newPrefix := fmt.Sprintf("%s[%d]", prefix, i) + f.flattenRecursive(value, newPrefix, result, depth+1) + } + + default: + if prefix == "" { + prefix = "value" + } + result[prefix] = data + } +} + +func (f *FlattenHandler) flattenKeyValuePairs(data any) (map[string]any, error) { + str := fmt.Sprintf("%v", data) + result := make(map[string]any) + + pairSeparator := f.Config.PairSeparator + if pairSeparator == "" { + pairSeparator = "," + } + + kvSeparator := f.Config.KVSeparator + if kvSeparator == "" { + kvSeparator = "=" + } + + pairs := strings.Split(str, pairSeparator) + for _, pair := range pairs { + pair = strings.TrimSpace(pair) + if pair == "" { + continue + } + + kv := strings.SplitN(pair, kvSeparator, 2) + if len(kv) != 2 { + continue + } + + key := strings.TrimSpace(kv[0]) + value := strings.TrimSpace(kv[1]) + + // Check for overwrites + if !f.Config.OverwriteExisting { + if _, exists := result[key]; exists { + continue + } + } + + // Try to preserve types if requested + if f.Config.PreserveTypes { + if convertedValue := f.tryConvertType(value); convertedValue != nil { + result[key] = convertedValue + } else { + result[key] = value + } + } else { + result[key] = value + } + } + + return result, nil +} + +func (f *FlattenHandler) convertToSliceOfMaps(data any) ([]map[string]any, error) { + rv := reflect.ValueOf(data) + if rv.Kind() != reflect.Slice && rv.Kind() != reflect.Array { + return nil, fmt.Errorf("data must be an array or slice") + } + + var items []map[string]any + for i := 0; i < rv.Len(); i++ { + item := rv.Index(i).Interface() + + // Convert item to map[string]any + itemMap := make(map[string]any) + itemBytes, err := json.Marshal(item) + if err != nil { + return nil, fmt.Errorf("failed to marshal item at index %d: %v", i, err) + } + + if err := json.Unmarshal(itemBytes, &itemMap); err != nil { + return nil, fmt.Errorf("failed to unmarshal item at index %d: %v", i, err) + } + + items = append(items, itemMap) + } + + return items, nil +} + +func (f *FlattenHandler) convertValueByType(value any, typeStr string) (any, error) { + valueStr := fmt.Sprintf("%v", value) + + switch strings.ToLower(typeStr) { + case "string", "str": + return valueStr, nil + case "int", "integer": + if i, err := fmt.Sscanf(valueStr, "%d", new(int)); err == nil && i == 1 { + var result int + fmt.Sscanf(valueStr, "%d", &result) + return result, nil + } + case "float", "double", "number": + if i, err := fmt.Sscanf(valueStr, "%f", new(float64)); err == nil && i == 1 { + var result float64 + fmt.Sscanf(valueStr, "%f", &result) + return result, nil + } + case "bool", "boolean": + lower := strings.ToLower(valueStr) + return lower == "true" || lower == "yes" || lower == "1" || lower == "on", nil + case "json": + var result any + if err := json.Unmarshal([]byte(valueStr), &result); err == nil { + return result, nil + } + } + + return value, fmt.Errorf("unable to convert to type %s", typeStr) +} + +func (f *FlattenHandler) tryConvertType(value string) any { + // Try int + var intVal int + if n, err := fmt.Sscanf(value, "%d", &intVal); err == nil && n == 1 { + return intVal + } + + // Try float + var floatVal 
float64 + if n, err := fmt.Sscanf(value, "%f", &floatVal); err == nil && n == 1 { + return floatVal + } + + // Try bool + lower := strings.ToLower(value) + if lower == "true" || lower == "false" { + return lower == "true" + } + + // Try JSON + var jsonVal any + if err := json.Unmarshal([]byte(value), &jsonVal); err == nil { + return jsonVal + } + + return nil // Unable to convert, return nil to use original string +} + +// Factory functions +func NewArrayToObjectFlattener(id, sourceField, targetField, keyField, valueField string, config FlattenConfiguration) *FlattenHandler { + config.KeyField = keyField + config.ValueField = valueField + + return &FlattenHandler{ + Operation: dag.Operation{ + ID: id, + Key: "flatten_array_to_object", + Type: dag.Function, + Tags: []string{"data", "flatten", "array", "object"}, + }, + FlattenType: "array_to_object", + SourceField: sourceField, + TargetField: targetField, + Config: config, + } +} + +func NewNestedObjectFlattener(id, sourceField, targetField string, config FlattenConfiguration) *FlattenHandler { + return &FlattenHandler{ + Operation: dag.Operation{ + ID: id, + Key: "flatten_nested_object", + Type: dag.Function, + Tags: []string{"data", "flatten", "nested", "object"}, + }, + FlattenType: "nested_object", + SourceField: sourceField, + TargetField: targetField, + Config: config, + } +} + +func NewKeyValuePairsFlattener(id, sourceField, targetField string, config FlattenConfiguration) *FlattenHandler { + return &FlattenHandler{ + Operation: dag.Operation{ + ID: id, + Key: "flatten_key_value_pairs", + Type: dag.Function, + Tags: []string{"data", "flatten", "key-value", "string"}, + }, + FlattenType: "key_value_pairs", + SourceField: sourceField, + TargetField: targetField, + Config: config, + } +} diff --git a/handlers/format_handler.go b/handlers/format_handler.go new file mode 100644 index 0000000..3e371ac --- /dev/null +++ b/handlers/format_handler.go @@ -0,0 +1,314 @@ +package handlers + +import ( + "context" + "fmt" + "reflect" + "strconv" + "strings" + "time" + + "github.com/oarkflow/json" + "github.com/oarkflow/mq" + "github.com/oarkflow/mq/dag" +) + +// FormatHandler handles various data formatting operations +type FormatHandler struct { + dag.Operation + FormatType string `json:"format_type"` // date, number, string, currency, etc. 
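+	// Example handler config, assuming a date reformatting step;
+	// output_format uses Go's reference-time layout:
+	//
+	//	{"format_type": "date", "source_field": "created_at",
+	//	 "target_field": "created_on",
+	//	 "format_config": {"output_format": "2006-01-02"}}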
diff --git a/handlers/format_handler.go b/handlers/format_handler.go
new file mode 100644
index 0000000..3e371ac
--- /dev/null
+++ b/handlers/format_handler.go
@@ -0,0 +1,314 @@
+package handlers
+
+import (
+	"context"
+	"fmt"
+	"reflect"
+	"strconv"
+	"strings"
+	"time"
+
+	"github.com/oarkflow/json"
+	"github.com/oarkflow/mq"
+	"github.com/oarkflow/mq/dag"
+)
+
+// FormatHandler handles various data formatting operations
+type FormatHandler struct {
+	dag.Operation
+	FormatType   string            `json:"format_type"`   // date, number, string, currency, etc.
+	SourceField  string            `json:"source_field"`  // field to format
+	TargetField  string            `json:"target_field"`  // field to store formatted result
+	FormatConfig map[string]string `json:"format_config"` // format-specific configuration
+}
+
+func (f *FormatHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
+	var data map[string]any
+	if err := json.Unmarshal(task.Payload, &data); err != nil {
+		return mq.Result{Error: fmt.Errorf("failed to unmarshal data: %v", err), Ctx: ctx}
+	}
+
+	// Get source value
+	sourceValue, exists := data[f.SourceField]
+	if !exists {
+		return mq.Result{Error: fmt.Errorf("source field '%s' not found", f.SourceField), Ctx: ctx}
+	}
+
+	// Format based on type
+	var formattedValue any
+	var err error
+
+	switch f.FormatType {
+	case "date":
+		formattedValue, err = f.formatDate(sourceValue)
+	case "number":
+		formattedValue, err = f.formatNumber(sourceValue)
+	case "currency":
+		formattedValue, err = f.formatCurrency(sourceValue)
+	case "string":
+		formattedValue, err = f.formatString(sourceValue)
+	case "boolean":
+		formattedValue, err = f.formatBoolean(sourceValue)
+	case "array":
+		formattedValue, err = f.formatArray(sourceValue)
+	default:
+		return mq.Result{Error: fmt.Errorf("unsupported format type: %s", f.FormatType), Ctx: ctx}
+	}
+
+	if err != nil {
+		return mq.Result{Error: err, Ctx: ctx}
+	}
+
+	// Set target field
+	targetField := f.TargetField
+	if targetField == "" {
+		targetField = f.SourceField // overwrite source if no target specified
+	}
+	data[targetField] = formattedValue
+
+	bt, _ := json.Marshal(data)
+	return mq.Result{Payload: bt, Ctx: ctx}
+}
+
+func (f *FormatHandler) formatDate(value any) (string, error) {
+	var t time.Time
+	var err error
+
+	switch v := value.(type) {
+	case string:
+		// Try parsing various date formats
+		formats := []string{
+			time.RFC3339,
+			"2006-01-02 15:04:05",
+			"2006-01-02",
+			"01/02/2006",
+			"02-01-2006",
+			"2006/01/02",
+		}
+
+		for _, format := range formats {
+			if t, err = time.Parse(format, v); err == nil {
+				break
+			}
+		}
+		if err != nil {
+			return "", fmt.Errorf("unable to parse date string: %s", v)
+		}
+	case time.Time:
+		t = v
+	case int64:
+		t = time.Unix(v, 0)
+	case float64:
+		t = time.Unix(int64(v), 0)
+	default:
+		return "", fmt.Errorf("unsupported date type: %T", value)
+	}
+
+	// Get output format from config
+	outputFormat := f.FormatConfig["output_format"]
+	if outputFormat == "" {
+		outputFormat = "2006-01-02 15:04:05" // default format
+	}
+
+	return t.Format(outputFormat), nil
+}
+
+func (f *FormatHandler) formatNumber(value any) (string, error) {
+	var num float64
+	var err error
+
+	switch v := value.(type) {
+	case string:
+		num, err = strconv.ParseFloat(v, 64)
+		if err != nil {
+			return "", fmt.Errorf("unable to parse number string: %s", v)
+		}
+	case int:
+		num = float64(v)
+	case int32:
+		num = float64(v)
+	case int64:
+		num = float64(v)
+	case float32:
+		num = float64(v)
+	case float64:
+		num = v
+	default:
+		return "", fmt.Errorf("unsupported number type: %T", value)
+	}
+
+	// Get precision from config
+	precision := 2
+	if p, exists := f.FormatConfig["precision"]; exists {
+		if parsed, err := strconv.Atoi(p); err == nil {
+			precision = parsed
+		}
+	}
+
+	// Get format style
+	style := f.FormatConfig["style"]
+	switch style {
+	case "scientific":
+		return fmt.Sprintf("%e", num), nil
+	case "percentage":
+		return fmt.Sprintf("%."+strconv.Itoa(precision)+"f%%", num*100), nil
+	default:
+		return fmt.Sprintf("%."+strconv.Itoa(precision)+"f", num), nil
+	}
+}
+
+func (f *FormatHandler) formatCurrency(value any) (string, error) {
+	num, err := f.formatNumber(value)
+	if err != nil {
+		return "", err
+	}
+
+	symbol := f.FormatConfig["symbol"]
+	if symbol == "" {
+		symbol = "$" // default currency symbol
+	}
+
+	position := f.FormatConfig["position"]
+	if position == "suffix" {
+		return num + " " + symbol, nil
+	}
+	return symbol + num, nil
+}
+
+func (f *FormatHandler) formatString(value any) (string, error) {
+	str := fmt.Sprintf("%v", value)
+
+	operation := f.FormatConfig["operation"]
+	switch operation {
+	case "uppercase":
+		return strings.ToUpper(str), nil
+	case "lowercase":
+		return strings.ToLower(str), nil
+	case "title":
+		return strings.Title(str), nil // deprecated in recent Go releases; kept for simple ASCII titling
+	case "trim":
+		return strings.TrimSpace(str), nil
+	case "truncate":
+		if lengthStr, exists := f.FormatConfig["length"]; exists {
+			if length, err := strconv.Atoi(lengthStr); err == nil {
+				// Slice runes, not bytes, so multi-byte characters are not split
+				if runes := []rune(str); len(runes) > length {
+					return string(runes[:length]) + "...", nil
+				}
+			}
+		}
+		return str, nil
+	default:
+		return str, nil
+	}
+}
+
+func (f *FormatHandler) formatBoolean(value any) (string, error) {
+	var boolVal bool
+
+	switch v := value.(type) {
+	case bool:
+		boolVal = v
+	case string:
+		lower := strings.ToLower(v)
+		boolVal = lower == "true" || lower == "yes" || lower == "1" || lower == "on"
+	case int, int32, int64:
+		boolVal = reflect.ValueOf(v).Int() != 0
+	case float32, float64:
+		boolVal = reflect.ValueOf(v).Float() != 0
+	default:
+		return "", fmt.Errorf("unsupported boolean type: %T", value)
+	}
+
+	trueValue := f.FormatConfig["true_value"]
+	falseValue := f.FormatConfig["false_value"]
+
+	if trueValue == "" {
+		trueValue = "true"
+	}
+	if falseValue == "" {
+		falseValue = "false"
+	}
+
+	if boolVal {
+		return trueValue, nil
+	}
+	return falseValue, nil
+}
+
+func (f *FormatHandler) formatArray(value any) (string, error) {
+	rv := reflect.ValueOf(value)
+	if rv.Kind() != reflect.Slice && rv.Kind() != reflect.Array {
+		return "", fmt.Errorf("value is not an array or slice")
+	}
+
+	separator := f.FormatConfig["separator"]
+	if separator == "" {
+		separator = ", "
+	}
+
+	var elements []string
+	for i := 0; i < rv.Len(); i++ {
+		elements = append(elements, fmt.Sprintf("%v", rv.Index(i).Interface()))
+	}
+
+	return strings.Join(elements, separator), nil
+}
+
+// Factory functions for different format types
+func NewDateFormatter(id, sourceField, targetField string, config map[string]string) *FormatHandler {
+	return &FormatHandler{
+		Operation: dag.Operation{
+			ID:   id,
+			Key:  "format_date",
+			Type: dag.Function,
+			Tags: []string{"data", "format", "date"},
+		},
+		FormatType:   "date",
+		SourceField:  sourceField,
+		TargetField:  targetField,
+		FormatConfig: config,
+	}
+}
+
+func NewNumberFormatter(id, sourceField, targetField string, config map[string]string) *FormatHandler {
+	return &FormatHandler{
+		Operation: dag.Operation{
+			ID:   id,
+			Key:  "format_number",
+			Type: dag.Function,
+			Tags: []string{"data", "format", "number"},
+		},
+		FormatType:   "number",
+		SourceField:  sourceField,
+		TargetField:  targetField,
+		FormatConfig: config,
+	}
+}
+
+func NewCurrencyFormatter(id, sourceField, targetField string, config map[string]string) *FormatHandler {
+	return &FormatHandler{
+		Operation: dag.Operation{
+			ID:   id,
+			Key:  "format_currency",
+			Type: dag.Function,
+			Tags: []string{"data", "format", "currency"},
+		},
+		FormatType:   "currency",
+		SourceField:  sourceField,
+		TargetField:  targetField,
+		FormatConfig: config,
+	}
+}
+
+func NewStringFormatter(id, sourceField, targetField string, config map[string]string) *FormatHandler {
+	return &FormatHandler{
+		Operation: dag.Operation{
+			ID:   id,
+			Key:  "format_string",
+			Type: dag.Function,
+			Tags: []string{"data", "format", "string"},
+		},
+		FormatType:   "string",
+		SourceField:  sourceField,
+		TargetField:  targetField,
+		FormatConfig: config,
+	}
+}
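A corresponding sketch for the formatters, reusing the main/import scaffold from the flatten example above; the field names and config values are illustrative:

	// Format data["amount"] as a euro string in data["amount_display"]
	h := handlers.NewCurrencyFormatter("format-1", "amount", "amount_display", map[string]string{
		"precision": "2",
		"symbol":    "€",
		"position":  "prefix",
	})
	res := h.ProcessTask(context.Background(), &mq.Task{Payload: []byte(`{"amount": 1234.5}`)})
	// res.Payload: {"amount":1234.5,"amount_display":"€1234.50"}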
diff --git a/handlers/grouping_handler.go b/handlers/grouping_handler.go
new file mode 100644
index 0000000..b02bf30
--- /dev/null
+++ b/handlers/grouping_handler.go
@@ -0,0 +1,338 @@
+package handlers
+
+import (
+	"context"
+	"fmt"
+	"reflect"
+	"sort"
+	"strconv"
+	"strings"
+
+	"github.com/oarkflow/json"
+	"github.com/oarkflow/mq"
+	"github.com/oarkflow/mq/dag"
+)
+
+// GroupingHandler groups data by specified fields and applies aggregations
+type GroupingHandler struct {
+	dag.Operation
+	GroupByFields []string            `json:"group_by_fields"` // fields to group by
+	Aggregations  []AggregationConfig `json:"aggregations"`    // aggregation configurations
+	SourceField   string              `json:"source_field"`    // field containing array to group
+	TargetField   string              `json:"target_field"`    // field to store grouped result
+	Options       GroupingOptions     `json:"options"`         // additional options
+}
+
+type AggregationConfig struct {
+	Field     string `json:"field"`     // field to aggregate
+	Operation string `json:"operation"` // sum, count, avg, min, max, concat, first, last, unique
+	Alias     string `json:"alias"`     // optional alias for result field
+}
+
+type GroupingOptions struct {
+	SortBy        string `json:"sort_by"`        // field to sort groups by
+	SortDirection string `json:"sort_direction"` // asc or desc
+	IncludeCount  bool   `json:"include_count"`  // include count of items in each group
+	CountAlias    string `json:"count_alias"`    // alias for count field (default: "count")
+}
+
+func (g *GroupingHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
+	var data map[string]any
+	if err := json.Unmarshal(task.Payload, &data); err != nil {
+		return mq.Result{Error: fmt.Errorf("failed to unmarshal data: %v", err), Ctx: ctx}
+	}
+
+	// Get source data
+	sourceData, exists := data[g.SourceField]
+	if !exists {
+		return mq.Result{Error: fmt.Errorf("source field '%s' not found", g.SourceField), Ctx: ctx}
+	}
+
+	// Convert to slice of maps
+	items, err := g.convertToSliceOfMaps(sourceData)
+	if err != nil {
+		return mq.Result{Error: err, Ctx: ctx}
+	}
+
+	// Group the data
+	groups := g.groupData(items)
+
+	// Apply aggregations
+	result := g.applyAggregations(groups)
+
+	// Sort if requested
+	if g.Options.SortBy != "" {
+		result = g.sortGroups(result)
+	}
+
+	// Set target field
+	targetField := g.TargetField
+	if targetField == "" {
+		targetField = "grouped_data"
+	}
+	data[targetField] = result
+
+	bt, _ := json.Marshal(data)
+	return mq.Result{Payload: bt, Ctx: ctx}
+}
+
+func (g *GroupingHandler) convertToSliceOfMaps(data any) ([]map[string]any, error) {
+	rv := reflect.ValueOf(data)
+
+	if rv.Kind() != reflect.Slice && rv.Kind() != reflect.Array {
+		return nil, fmt.Errorf("source data must be an array or slice")
+	}
+
+	var items []map[string]any
+	for i := 0; i < rv.Len(); i++ {
+		item := rv.Index(i).Interface()
+
+		// Round-trip through JSON to normalize each item into map[string]any
+		itemMap := make(map[string]any)
+		itemBytes, err := json.Marshal(item)
+		if err != nil {
+			return nil, fmt.Errorf("failed to marshal item at index %d: %v", i, err)
+		}
+
+		if err := json.Unmarshal(itemBytes, &itemMap); err != nil {
+			return nil, fmt.Errorf("failed to unmarshal item at index %d: %v", i, err)
+		}
+
+		items = append(items, itemMap)
+	}
+
+	return items, nil
+}
+
+func (g *GroupingHandler) groupData(items []map[string]any) map[string][]map[string]any {
+	groups := make(map[string][]map[string]any)
+
+	for _, item := range items {
+		// Create group key
+		var keyParts []string
+		for _, field := range g.GroupByFields {
+			value := fmt.Sprintf("%v", item[field])
+			keyParts = append(keyParts, value)
+		}
+		groupKey := strings.Join(keyParts, "|")
+
+		// Add item to group
+		groups[groupKey] = append(groups[groupKey], item)
+	}
+
+	return groups
+}
+
+func (g *GroupingHandler) applyAggregations(groups map[string][]map[string]any) []map[string]any {
+	var result []map[string]any
+
+	for groupKey, items := range groups {
+		groupResult := make(map[string]any)
+
+		// Add group key fields
+		keyParts := strings.Split(groupKey, "|")
+		for i, field := range g.GroupByFields {
+			if i < len(keyParts) {
+				groupResult[field] = keyParts[i]
+			}
+		}
+
+		// Add count if requested
+		if g.Options.IncludeCount {
+			countAlias := g.Options.CountAlias
+			if countAlias == "" {
+				countAlias = "count"
+			}
+			groupResult[countAlias] = len(items)
+		}
+
+		// Apply aggregations
+		for _, agg := range g.Aggregations {
+			fieldAlias := agg.Alias
+			if fieldAlias == "" {
+				fieldAlias = agg.Field + "_" + agg.Operation
+			}
+
+			aggregatedValue := g.performAggregation(items, agg)
+			groupResult[fieldAlias] = aggregatedValue
+		}
+
+		result = append(result, groupResult)
+	}
+
+	return result
+}
+
+func (g *GroupingHandler) performAggregation(items []map[string]any, agg AggregationConfig) any {
+	switch agg.Operation {
+	case "count":
+		return len(items)
+	case "sum":
+		return g.sumValues(items, agg.Field)
+	case "avg":
+		sum := g.sumValues(items, agg.Field)
+		if count := len(items); count > 0 {
+			return sum / float64(count)
+		}
+		return 0.0
+	case "min":
+		return g.minValue(items, agg.Field)
+	case "max":
+		return g.maxValue(items, agg.Field)
+	case "first":
+		if len(items) > 0 {
+			return items[0][agg.Field]
+		}
+		return nil
+	case "last":
+		if len(items) > 0 {
+			return items[len(items)-1][agg.Field]
+		}
+		return nil
+	case "concat":
+		return g.concatValues(items, agg.Field)
+	case "unique":
+		return g.uniqueValues(items, agg.Field)
+	default:
+		return nil
+	}
+}
+
+func (g *GroupingHandler) sumValues(items []map[string]any, field string) float64 {
+	var sum float64
+	for _, item := range items {
+		if value, exists := item[field]; exists {
+			sum += g.toFloat64(value)
+		}
+	}
+	return sum
+}
+
+func (g *GroupingHandler) minValue(items []map[string]any, field string) any {
+	var min any
+	for _, item := range items {
+		if value, exists := item[field]; exists {
+			if min == nil {
+				min = value
+			} else {
+				if g.compareValues(value, min) < 0 {
+					min = value
+				}
+			}
+		}
+	}
+	return min
+}
+
+func (g *GroupingHandler) maxValue(items []map[string]any, field string) any {
+	var max any
+	for _, item := range items {
+		if value, exists := item[field]; exists {
+			if max == nil {
+				max = value
+			} else {
+				if g.compareValues(value, max) > 0 {
+					max = value
+				}
+			}
+		}
+	}
+	return max
+}
+
+func (g *GroupingHandler) concatValues(items []map[string]any, field string) string {
+	var values []string
+	for _, item := range items {
+		if value, exists := item[field]; exists {
+			values = append(values, fmt.Sprintf("%v", value))
+		}
+	}
+	return strings.Join(values, ", ")
+}
+
+func (g *GroupingHandler) uniqueValues(items []map[string]any, field string) []any {
+	seen := make(map[string]bool)
+	var unique []any
+
+	for _, item := range items {
+		if value, exists := item[field]; exists {
+			key := fmt.Sprintf("%v", value)
+			if !seen[key] {
+				seen[key] = true
+				unique = append(unique, value)
+			}
+		}
+	}
+	return unique
+}
+
+func (g *GroupingHandler) toFloat64(value any) float64 {
+	switch v := value.(type) {
+	case int:
+		return float64(v)
+	case int32:
+		return float64(v)
+	case int64:
+		return float64(v)
+	case float32:
+		return float64(v)
+	case float64:
+		return v
+	case string:
+		if num, err := strconv.ParseFloat(v, 64); err == nil {
+			return num
+		}
+	}
+	return 0
+}
+
+func (g *GroupingHandler) compareValues(a, b any) int {
+	aFloat := g.toFloat64(a)
+	bFloat := g.toFloat64(b)
+
+	if aFloat < bFloat {
+		return -1
+	} else if aFloat > bFloat {
+		return 1
+	}
+
+	// Numeric comparison was inconclusive (equal, or neither value parsed
+	// as a number, in which case both are 0); fall back to string comparison
+	aStr := fmt.Sprintf("%v", a)
+	bStr := fmt.Sprintf("%v", b)
+	return strings.Compare(aStr, bStr)
+}
+
+func (g *GroupingHandler) sortGroups(groups []map[string]any) []map[string]any {
+	sort.Slice(groups, func(i, j int) bool {
+		valueI := groups[i][g.Options.SortBy]
+		valueJ := groups[j][g.Options.SortBy]
+
+		comparison := g.compareValues(valueI, valueJ)
+
+		if g.Options.SortDirection == "desc" {
+			return comparison > 0
+		}
+		return comparison < 0
+	})
+
+	return groups
+}
+
+// Factory function
+func NewGroupingHandler(id, sourceField, targetField string, groupByFields []string, aggregations []AggregationConfig, options GroupingOptions) *GroupingHandler {
+	return &GroupingHandler{
+		Operation: dag.Operation{
+			ID:   id,
+			Key:  "group_data",
+			Type: dag.Function,
+			Tags: []string{"data", "grouping", "aggregation"},
+		},
+		GroupByFields: groupByFields,
+		Aggregations:  aggregations,
+		SourceField:   sourceField,
+		TargetField:   targetField,
+		Options:       options,
+	}
+}
diff --git a/handlers/json_handler.go b/handlers/json_handler.go
new file mode 100644
index 0000000..51387d1
--- /dev/null
+++ b/handlers/json_handler.go
@@ -0,0 +1,366 @@
+package handlers
+
+import (
+	"context"
+	"fmt"
+	"strings"
+
+	"github.com/oarkflow/json"
+	"github.com/oarkflow/mq"
+	"github.com/oarkflow/mq/dag"
+)
+
+// JSONHandler handles JSON parsing and stringification operations
+type JSONHandler struct {
+	dag.Operation
+	OperationType string      `json:"operation_type"` // "parse" or "stringify"
+	SourceField   string      `json:"source_field"`   // field containing data to process
+	TargetField   string      `json:"target_field"`   // field to store result
+	Options       JSONOptions `json:"options"`        // processing options
+}
+
+type JSONOptions struct {
+	Pretty         bool   `json:"pretty"`           // pretty print JSON (stringify only)
+	Indent         string `json:"indent"`           // indentation string (stringify only)
+	EscapeHTML     bool   `json:"escape_html"`      // escape HTML in JSON strings (stringify only; not yet applied)
+	ValidateOnly   bool   `json:"validate_only"`    // only validate, don't parse (parse only)
+	ErrorOnInvalid bool   `json:"error_on_invalid"` // return error if JSON is invalid
+	DefaultOnError any    `json:"default_on_error"` // default value to use if parsing fails
+	StrictMode     bool   `json:"strict_mode"`      // strict JSON parsing (not yet applied)
+	AllowComments  bool   `json:"allow_comments"`   // allow comments in JSON (parse only)
+	AllowTrailing  bool   `json:"allow_trailing"`   // allow trailing commas (parse only)
+}
+
+func (j *JSONHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
+	var data map[string]any
+	if err := json.Unmarshal(task.Payload, &data); err != nil {
+		return mq.Result{Error: fmt.Errorf("failed to unmarshal data: %v", err), Ctx: ctx}
+	}
+
+	// Get source value
+	sourceValue, exists := data[j.SourceField]
+	if !exists {
+		return mq.Result{Error: fmt.Errorf("source field '%s' not found", j.SourceField), Ctx: ctx}
+	}
+
+	var result any
+	var err error
+
+	switch j.OperationType {
+	case "parse":
+		result, err = j.parseJSON(sourceValue)
+	case "stringify":
+		result, err = j.stringifyJSON(sourceValue)
+	default:
+		return mq.Result{Error: fmt.Errorf("unsupported operation type: %s", j.OperationType), Ctx: ctx}
+	}
+
+	if err != nil {
+		if j.Options.ErrorOnInvalid {
+			return mq.Result{Error: err, Ctx: ctx}
+		}
+		// Use default value if specified
+		if j.Options.DefaultOnError != nil {
+			result = j.Options.DefaultOnError
+		} else {
+			result = sourceValue // keep original value
+		}
+	}
+
+	// Set target field
+	targetField := j.TargetField
+	if targetField == "" {
+		targetField = j.SourceField // overwrite source if no target specified
+	}
+	data[targetField] = result
+
+	bt, _ := json.Marshal(data)
+	return mq.Result{Payload: bt, Ctx: ctx}
+}
+
+func (j *JSONHandler) parseJSON(value any) (any, error) {
+	// Convert value to string
+	jsonStr := fmt.Sprintf("%v", value)
+
+	// Validate only if requested
+	if j.Options.ValidateOnly {
+		var temp any
+		err := json.Unmarshal([]byte(jsonStr), &temp)
+		if err != nil {
+			return false, fmt.Errorf("invalid JSON: %v", err)
+		}
+		return true, nil
+	}
+
+	// Preprocess if needed
+	if j.Options.AllowComments {
+		jsonStr = j.removeComments(jsonStr)
+	}
+
+	if j.Options.AllowTrailing {
+		jsonStr = j.removeTrailingCommas(jsonStr)
+	}
+
+	// Parse JSON
+	var result any
+	err := json.Unmarshal([]byte(jsonStr), &result)
+	if err != nil {
+		return nil, fmt.Errorf("failed to parse JSON: %v", err)
+	}
+
+	return result, nil
+}
+
+func (j *JSONHandler) stringifyJSON(value any) (string, error) {
+	var result []byte
+	var err error
+
+	if j.Options.Pretty {
+		indent := j.Options.Indent
+		if indent == "" {
+			indent = "  " // default indentation
+		}
+		result, err = json.MarshalIndent(value, "", indent)
+	} else {
+		result, err = json.Marshal(value)
+	}
+
+	if err != nil {
+		return "", fmt.Errorf("failed to stringify JSON: %v", err)
+	}
+
+	return string(result), nil
+}
+
+func (j *JSONHandler) removeComments(jsonStr string) string {
+	lines := strings.Split(jsonStr, "\n")
+	var cleanLines []string
+
+	for _, line := range lines {
+		// Remove single-line comments
+		// NOTE: naive; also strips "//" occurring inside string values (e.g. URLs)
+		if commentIndex := strings.Index(line, "//"); commentIndex != -1 {
+			line = line[:commentIndex]
+		}
+		cleanLines = append(cleanLines, line)
+	}
+
+	result := strings.Join(cleanLines, "\n")
+
+	// Remove multi-line comments (basic implementation)
+	for {
+		start := strings.Index(result, "/*")
+		if start == -1 {
+			break
+		}
+		end := strings.Index(result[start:], "*/")
+		if end == -1 {
+			break
+		}
+		result = result[:start] + result[start+end+2:]
+	}
+
+	return result
+}
+
+func (j *JSONHandler) removeTrailingCommas(jsonStr string) string {
+	// Basic implementation - remove commas immediately before closing brackets/braces
+	// (does not catch ", }" with intervening whitespace)
+	jsonStr = strings.ReplaceAll(jsonStr, ",}", "}")
+	jsonStr = strings.ReplaceAll(jsonStr, ",]", "]")
+	return jsonStr
+}
+
+// Advanced JSON handler for complex operations
+type AdvancedJSONHandler struct {
+	dag.Operation
+	Operations []JSONOperation `json:"operations"` // chain of JSON operations
+}
+
+type JSONOperation struct {
+	Type        string      `json:"type"`         // "parse", "stringify", "validate", "extract", "merge"
+	SourceField string      `json:"source_field"` // field to operate on
+	TargetField string      `json:"target_field"` // field to store result
+	Options     JSONOptions `json:"options"`      // operation options
+	Path        string      `json:"path"`         // JSON path for extraction (extract only)
+	MergeWith   string      `json:"merge_with"`   // field to merge with (merge only)
+}
+
+func (a *AdvancedJSONHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
+	var data map[string]any
+	if err := json.Unmarshal(task.Payload, &data); err != nil {
+		return mq.Result{Error: fmt.Errorf("failed to unmarshal data: %v", err), Ctx: ctx}
+	}
+
+	// Execute operations in sequence
+	for i, op := range a.Operations {
+		var result any
+		var err error
+
+		switch op.Type {
+		case "parse", "stringify":
+			handler := &JSONHandler{
+				OperationType: op.Type,
+				SourceField:   op.SourceField,
+				TargetField:   op.TargetField,
+				Options:       op.Options,
+			}
+
+			tempData, _ := json.Marshal(data)
+			tempTask := &mq.Task{Payload: tempData}
+
+			handlerResult := handler.ProcessTask(ctx, tempTask)
+			if handlerResult.Error != nil {
+				return mq.Result{Error: fmt.Errorf("operation %d failed: %v", i+1, handlerResult.Error), Ctx: ctx}
+			}
+
+			if err := json.Unmarshal(handlerResult.Payload, &data); err != nil {
+				return mq.Result{Error: fmt.Errorf("failed to unmarshal result from operation %d: %v", i+1, err), Ctx: ctx}
+			}
+			continue
+
+		case "validate":
+			result, err = a.validateJSON(data[op.SourceField])
+		case "extract":
+			result, err = a.extractFromJSON(data[op.SourceField], op.Path)
+		case "merge":
+			result, err = a.mergeJSON(data[op.SourceField], data[op.MergeWith])
+		default:
+			return mq.Result{Error: fmt.Errorf("unsupported operation type: %s", op.Type), Ctx: ctx}
+		}
+
+		if err != nil {
+			if op.Options.ErrorOnInvalid {
+				return mq.Result{Error: fmt.Errorf("operation %d failed: %v", i+1, err), Ctx: ctx}
+			}
+			result = op.Options.DefaultOnError
+		}
+
+		// Set target field
+		targetField := op.TargetField
+		if targetField == "" {
+			targetField = op.SourceField
+		}
+		data[targetField] = result
+	}
+
+	bt, _ := json.Marshal(data)
+	return mq.Result{Payload: bt, Ctx: ctx}
+}
+
+func (a *AdvancedJSONHandler) validateJSON(value any) (bool, error) {
+	jsonStr := fmt.Sprintf("%v", value)
+	var temp any
+	err := json.Unmarshal([]byte(jsonStr), &temp)
+	return err == nil, err
+}
+
+func (a *AdvancedJSONHandler) extractFromJSON(value any, path string) (any, error) {
+	// Basic JSON path extraction (simplified implementation)
+	// For production use, consider using a proper JSON path library
+
+	var jsonData any
+	if str, ok := value.(string); ok {
+		if err := json.Unmarshal([]byte(str), &jsonData); err != nil {
+			return nil, fmt.Errorf("invalid JSON: %v", err)
+		}
+	} else {
+		jsonData = value
+	}
+
+	// Split path and navigate
+	parts := strings.Split(strings.Trim(path, "."), ".")
+	current := jsonData
+
+	for _, part := range parts {
+		if part == "" {
+			continue
+		}
+
+		switch v := current.(type) {
+		case map[string]any:
+			current = v[part]
+		default:
+			return nil, fmt.Errorf("cannot navigate path '%s' at part '%s'", path, part)
+		}
+	}
+
+	return current, nil
+}
+
+func (a *AdvancedJSONHandler) mergeJSON(value1, value2 any) (any, error) {
+	// Convert both values to maps if they're JSON strings
+	var map1, map2 map[string]any
+
+	if str, ok := value1.(string); ok {
+		if err := json.Unmarshal([]byte(str), &map1); err != nil {
+			return nil, fmt.Errorf("invalid JSON in first value: %v", err)
+		}
+	} else if m, ok := value1.(map[string]any); ok {
+		map1 = m
+	} else {
+		return nil, fmt.Errorf("first value is not a JSON object")
+	}
+
+	if str, ok := value2.(string); ok {
+		if err := json.Unmarshal([]byte(str), &map2); err != nil {
+			return nil, fmt.Errorf("invalid JSON in second value: %v", err)
+		}
+	} else if m, ok := value2.(map[string]any); ok {
+		map2 = m
+	} else {
+		return nil, fmt.Errorf("second value is not a JSON object")
+	}
+
+	// Merge maps (second value wins on key conflicts)
+	result := make(map[string]any)
+	for k, v := range map1 {
+		result[k] = v
+	}
+	for k, v := range map2 {
+		result[k] = v // overwrites if key exists
+	}
+
+	return result, nil
+}
+
+// Factory functions
+func NewJSONParser(id, sourceField, targetField string, options JSONOptions) *JSONHandler {
+	return &JSONHandler{
+		Operation: dag.Operation{
+			ID:   id,
+			Key:  "json_parse",
+			Type: dag.Function,
+			Tags: []string{"data", "json", "parse"},
+		},
+		OperationType: "parse",
+		SourceField:   sourceField,
+		TargetField:   targetField,
+		Options:       options,
+	}
+}
+
+func NewJSONStringifier(id, sourceField, targetField string, options JSONOptions) *JSONHandler {
+	return &JSONHandler{
+		Operation: dag.Operation{
+			ID:   id,
+			Key:  "json_stringify",
+			Type: dag.Function,
+			Tags: []string{"data", "json", "stringify"},
+		},
+		OperationType: "stringify",
+		SourceField:   sourceField,
+		TargetField:   targetField,
+		Options:       options,
+	}
+}
+
+func NewAdvancedJSONHandler(id string, operations []JSONOperation) *AdvancedJSONHandler {
+	return &AdvancedJSONHandler{
+		Operation: dag.Operation{
+			ID:   id,
+			Key:  "advanced_json",
+			Type: dag.Function,
+			Tags: []string{"data", "json", "advanced"},
+		},
+		Operations: operations,
+	}
+}
diff --git a/handlers/split_join_handler.go b/handlers/split_join_handler.go
new file mode 100644
index 0000000..7da1ad9
--- /dev/null
+++ b/handlers/split_join_handler.go
@@ -0,0 +1,259 @@
+package handlers
+
+import (
+	"context"
+	"fmt"
+	"reflect"
+	"regexp"
+	"strings"
+
+	"github.com/oarkflow/json"
+	"github.com/oarkflow/mq"
+	"github.com/oarkflow/mq/dag"
+)
+
+// SplitJoinHandler handles splitting strings into arrays and joining arrays into strings
+type SplitJoinHandler struct {
+	dag.Operation
+	OpType      string           `json:"op_type"`      // "split" or "join"
+	SourceField string           `json:"source_field"` // field to operate on
+	TargetField string           `json:"target_field"` // field to store result
+	Delimiter   string           `json:"delimiter"`    // delimiter for split/join
+	Options     SplitJoinOptions `json:"options"`
+}
+
+type SplitJoinOptions struct {
+	TrimSpaces      bool   `json:"trim_spaces"`      // trim spaces from elements (split only)
+	RemoveEmpty     bool   `json:"remove_empty"`     // remove empty elements (split only)
+	MaxSplit        int    `json:"max_split"`        // maximum number of splits (<= 0 for unlimited)
+	UseRegex        bool   `json:"use_regex"`        // treat delimiter as regex pattern (split only)
+	CaseInsensitive bool   `json:"case_insensitive"` // case insensitive regex (split only)
+	Prefix          string `json:"prefix"`           // prefix for joined string (join only)
+	Suffix          string `json:"suffix"`           // suffix for joined string (join only)
+}
+
+func (s *SplitJoinHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
+	var data map[string]any
+	if err := json.Unmarshal(task.Payload, &data); err != nil {
+		return mq.Result{Error: fmt.Errorf("failed to unmarshal data: %v", err), Ctx: ctx}
+	}
+
+	// Get source value
+	sourceValue, exists := data[s.SourceField]
+	if !exists {
+		return mq.Result{Error: fmt.Errorf("source field '%s' not found", s.SourceField), Ctx: ctx}
+	}
+
+	var result any
+	var err error
+
+	switch s.OpType {
+	case "split":
+		result, err = s.performSplit(sourceValue)
+	case "join":
+		result, err = s.performJoin(sourceValue)
+	default:
+		return mq.Result{Error: fmt.Errorf("unsupported operation: %s", s.OpType), Ctx: ctx}
+	}
+
+	if err != nil {
+		return mq.Result{Error: err, Ctx: ctx}
+	}
+
+	// Set target field
+	targetField := s.TargetField
+	if targetField == "" {
+		targetField = s.SourceField // overwrite source if no target specified
+	}
+	data[targetField] = result
+
+	bt, _ := json.Marshal(data)
+	return mq.Result{Payload: bt, Ctx: ctx}
+}
+
+func (s *SplitJoinHandler) performSplit(value any) ([]string, error) {
+	// Convert value to string
+	str := fmt.Sprintf("%v", value)
+
+	var parts []string
+
+	if s.Options.UseRegex {
+		// Use regex for splitting
+		flags := ""
+		if s.Options.CaseInsensitive {
+			flags = "(?i)"
+		}
+		pattern := flags + s.Delimiter
+
+		re, err := regexp.Compile(pattern)
+		if err != nil {
+			return nil, fmt.Errorf("invalid regex pattern '%s': %v", pattern, err)
+		}
+
+		if s.Options.MaxSplit > 0 {
+			parts = re.Split(str, s.Options.MaxSplit+1)
+		} else {
+			parts = re.Split(str, -1)
+		}
+	} else {
+		// Use simple string splitting
+		if s.Options.MaxSplit > 0 {
+			parts = strings.SplitN(str, s.Delimiter, s.Options.MaxSplit+1)
+		} else {
+			parts = strings.Split(str, s.Delimiter)
+		}
+	}
+
+	// Process the parts based on options
+	var processedParts []string
+	for _, part := range parts {
+		if s.Options.TrimSpaces {
+			part = strings.TrimSpace(part)
+		}
+
+		if s.Options.RemoveEmpty && part == "" {
+			continue
+		}
+
+		processedParts = append(processedParts, part)
+	}
+
+	return processedParts, nil
+}
+
+func (s *SplitJoinHandler) performJoin(value any) (string, error) {
+	// Convert value to slice of strings
+	parts, err := s.convertToStringSlice(value)
+	if err != nil {
+		return "", err
+	}
+
+	// Join the parts
+	joined := strings.Join(parts, s.Delimiter)
+
+	// Add prefix/suffix if specified
+	if s.Options.Prefix != "" {
+		joined = s.Options.Prefix + joined
+	}
+	if s.Options.Suffix != "" {
+		joined = joined + s.Options.Suffix
+	}
+
+	return joined, nil
+}
+
+func (s *SplitJoinHandler) convertToStringSlice(value any) ([]string, error) {
+	rv := reflect.ValueOf(value)
+
+	if rv.Kind() != reflect.Slice && rv.Kind() != reflect.Array {
+		return nil, fmt.Errorf("value must be an array or slice for join operation")
+	}
+
+	var parts []string
+	for i := 0; i < rv.Len(); i++ {
+		element := rv.Index(i).Interface()
+		parts = append(parts, fmt.Sprintf("%v", element))
+	}
+
+	return parts, nil
+}
+
+// Advanced split/join handler for complex operations
+type AdvancedSplitJoinHandler struct {
+	dag.Operation
+	Operations []SplitJoinOperation `json:"operations"` // chain of split/join operations
+}
+
+type SplitJoinOperation struct {
+	Type        string           `json:"type"`         // "split" or "join"
+	SourceField string           `json:"source_field"` // field to operate on
+	TargetField string           `json:"target_field"` // field to store result
+	Delimiter   string           `json:"delimiter"`    // delimiter for operation
+	Options     SplitJoinOptions `json:"options"`      // operation options
+}
+
+func (a *AdvancedSplitJoinHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
+	var data map[string]any
+	if err := json.Unmarshal(task.Payload, &data); err != nil {
+		return mq.Result{Error: fmt.Errorf("failed to unmarshal data: %v", err), Ctx: ctx}
+	}
+
+	// Execute operations in sequence
+	for i, op := range a.Operations {
+		handler := &SplitJoinHandler{
+			Operation: dag.Operation{
+				ID:   fmt.Sprintf("%s_op_%d", a.ID, i),
+				Key:  "temp_split_join",
+				Type: dag.Function,
+				Tags: []string{"data", "temp"},
+			},
+			OpType:      op.Type,
+			SourceField: op.SourceField,
+			TargetField: op.TargetField,
+			Delimiter:   op.Delimiter,
+			Options:     op.Options,
+		}
+
+		// Create a temporary task for this operation
+		tempData, _ := json.Marshal(data)
+		tempTask := &mq.Task{Payload: tempData}
+
+		result := handler.ProcessTask(ctx, tempTask)
+		if result.Error != nil {
+			return mq.Result{Error: fmt.Errorf("operation %d failed: %v", i+1, result.Error), Ctx: ctx}
+		}
+
+		// Update data with the result
+		if err := json.Unmarshal(result.Payload, &data); err != nil {
+			return mq.Result{Error: fmt.Errorf("failed to unmarshal result from operation %d: %v", i+1, err), Ctx: ctx}
+		}
+	}
+
+	bt, _ := json.Marshal(data)
+	return mq.Result{Payload: bt, Ctx: ctx}
+}
+
+// Factory functions
+func NewSplitHandler(id, sourceField, targetField, delimiter string, options SplitJoinOptions) *SplitJoinHandler {
+	return &SplitJoinHandler{
+		Operation: dag.Operation{
+			ID:   id,
+			Key:  "split_string",
+			Type: dag.Function,
+			Tags: []string{"data", "string", "split"},
+		},
+		OpType:      "split",
+		SourceField: sourceField,
+		TargetField: targetField,
+		Delimiter:   delimiter,
+		Options:     options,
+	}
+}
+
+func NewJoinHandler(id, sourceField, targetField, delimiter string, options SplitJoinOptions) *SplitJoinHandler {
+	return &SplitJoinHandler{
+		Operation: dag.Operation{
+			ID:   id,
+			Key:  "join_array",
+			Type: dag.Function,
+			Tags: []string{"data", "array", "join"},
+		},
+		OpType:      "join",
+		SourceField: sourceField,
+		TargetField: targetField,
+		Delimiter:   delimiter,
+		Options:     options,
+	}
+}
+
+func NewAdvancedSplitJoinHandler(id string, operations []SplitJoinOperation) *AdvancedSplitJoinHandler {
+	return &AdvancedSplitJoinHandler{
+		Operation: dag.Operation{
+			ID:   id,
+			Key:  "advanced_split_join",
+			Type: dag.Function,
+			Tags: []string{"data", "string", "array", "advanced"},
+		},
+		Operations: operations,
+	}
+}
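Finally, a sketch for the split/join pair, using the same scaffold as the examples above:

	// Split a comma-separated string, dropping blanks and padding
	h := handlers.NewSplitHandler("split-1", "csv", "fields", ",", handlers.SplitJoinOptions{
		TrimSpaces:  true,
		RemoveEmpty: true,
	})
	res := h.ProcessTask(context.Background(), &mq.Task{Payload: []byte(`{"csv": " a, b,, c "}`)})
	// fields: ["a","b","c"]

	// Join the result back with a different delimiter and a prefix
	j := handlers.NewJoinHandler("join-1", "fields", "joined", "|", handlers.SplitJoinOptions{Prefix: "ids:"})
	res = j.ProcessTask(context.Background(), &mq.Task{Payload: res.Payload})
	// joined: "ids:a|b|c"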