This commit is contained in:
sujit
2025-08-11 12:58:16 +05:45
parent 3bc33accac
commit 82c1afbc8e
5 changed files with 53 additions and 244 deletions

View File

@@ -8,6 +8,7 @@ import (
"time"
"github.com/oarkflow/json"
"github.com/oarkflow/mq/utils"
"github.com/oarkflow/date"
"github.com/oarkflow/dipper"
@@ -170,6 +171,48 @@ func (e *Operation) ValidateFields(c context.Context, payload []byte) (map[strin
return data, nil
}
func (e *Operation) ParseMapping(ctx context.Context, data map[string]any) map[string]any {
templateData := make(map[string]any)
if e.Payload.Mapping != nil {
for k, v := range e.Payload.Mapping {
_, val := GetVal(ctx, v, data)
templateData[k] = val
}
}
return templateData
}
func (e *Operation) ExceptFields(payload []byte) []byte {
except, ok := e.Payload.Data["except_fields"].([]string)
if !ok {
exceptAny, ok := e.Payload.Data["except_fields"].([]any)
if ok {
except = make([]string, len(exceptAny))
for i, v := range exceptAny {
except[i], _ = v.(string)
}
}
}
return e.RemoveFields(payload, except...)
}
func (e *Operation) RemoveFields(payload []byte, keys ...string) []byte {
for _, field := range keys {
payload = utils.RemoveRecursiveFromJSON(payload, field)
}
return payload
}
func UnmarshalPayload[T any](c context.Context, payload []byte) (T, error) {
var data T
if len(payload) > 0 {
if err := json.Unmarshal(payload, &data); err != nil {
return data, err
}
}
return data, nil
}
func GetVal(c context.Context, v string, data map[string]any) (key string, val any) {
key, val = getVal(c, v, data)
if val == nil {

View File

@@ -4,7 +4,6 @@ import (
"context"
"fmt"
"github.com/oarkflow/json"
"github.com/oarkflow/mq"
"github.com/oarkflow/mq/dag"
)
@@ -36,10 +35,9 @@ func (e *Condition) SetConditions(conditions map[string]dag.Condition) {
}
func (e *Condition) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
var data map[string]any
err := json.Unmarshal(task.Payload, &data)
data, err := dag.UnmarshalPayload[map[string]any](ctx, task.Payload)
if err != nil {
panic(err)
return mq.Result{Error: err, Ctx: ctx}
}
var conditionStatus string
_, ok := e.conditions[defaultKey]

View File

@@ -1,208 +0,0 @@
package handlers
/*
Data Transformation Handlers Usage Examples
This file contains examples of how to configure and use the various data transformation handlers.
All configurations are done through the dag.Operation.Payload.Data map - no handler-specific configurations.
1. FORMAT HANDLER
=================
Supports: string, number, date, currency, uppercase, lowercase, capitalize, trim
Example configuration:
{
"format_type": "uppercase",
"fields": ["name", "title"],
"currency": "$",
"date_format": "2006-01-02"
}
2. GROUP HANDLER
================
Groups data with aggregation functions
Example configuration:
{
"group_by": ["department", "status"],
"aggregations": {
"salary": "sum",
"age": "avg",
"count": "count",
"name": "concat"
},
"concat_separator": ", "
}
3. SPLIT/JOIN HANDLER
====================
Handles string operations
Split example:
{
"operation": "split",
"fields": ["full_name"],
"separator": " "
}
Join example:
{
"operation": "join",
"source_fields": ["first_name", "last_name"],
"target_field": "full_name",
"separator": " "
}
4. FLATTEN HANDLER
==================
Flattens nested data structures
Flatten settings example (key-value pairs):
{
"operation": "flatten_settings",
"source_field": "settings",
"target_field": "config"
}
Input: {"settings": [{"key": "theme", "value": "dark", "value_type": "string"}]}
Output: {"config": {"theme": "dark"}}
5. JSON HANDLER
===============
JSON parsing and manipulation
Parse JSON string:
{
"operation": "parse",
"fields": ["json_data"]
}
Stringify object:
{
"operation": "stringify",
"fields": ["object_data"],
"indent": true
}
6. FIELD HANDLER
================
Field manipulation operations
Filter fields:
{
"operation": "filter",
"fields": ["name", "email", "age"]
}
Rename fields:
{
"operation": "rename",
"mapping": {
"old_name": "new_name",
"email_addr": "email"
}
}
Add fields:
{
"operation": "add",
"new_fields": {
"created_at": "2023-01-01",
"status": "active"
}
}
Transform keys:
{
"operation": "transform_keys",
"transformation": "snake_case" // or camel_case, kebab_case, etc.
}
7. DATA HANDLER
===============
Miscellaneous data operations
Sort data:
{
"operation": "sort",
"sort_field": "created_at",
"sort_order": "desc"
}
Deduplicate:
{
"operation": "deduplicate",
"dedupe_fields": ["email", "phone"]
}
Calculate fields:
{
"operation": "calculate",
"calculations": {
"total": {
"operation": "sum",
"fields": ["amount1", "amount2"]
},
"average_score": {
"operation": "average",
"fields": ["score1", "score2", "score3"]
}
}
}
Type casting:
{
"operation": "type_cast",
"cast": {
"age": "int",
"salary": "float",
"active": "bool"
}
}
Validate fields:
{
"operation": "validate_fields",
"validation_rules": {
"email": {
"required": true,
"type": "string"
},
"age": {
"required": true,
"type": "int",
"min": 0
}
}
}
USAGE IN DAG:
=============
import "github.com/oarkflow/mq/handlers"
import "github.com/oarkflow/mq/dag"
// Create handler
formatHandler := handlers.NewFormatHandler("format-1")
// Configure through Operation.Payload
config := dag.Payload{
Data: map[string]any{
"format_type": "uppercase",
"fields": []string{"name", "title"},
},
}
formatHandler.SetConfig(config)
// Use in DAG
dag := dag.NewDAG("data-processing")
dag.AddNode(formatHandler)
CHAINING OPERATIONS:
===================
You can chain multiple handlers in a DAG:
1. Parse JSON → 2. Flatten → 3. Filter fields → 4. Format → 5. Group
Each handler receives the output of the previous handler as input.
*/

View File

@@ -50,11 +50,9 @@ func (c *RenderHTMLNode) ProcessTask(ctx context.Context, task *mq.Task) mq.Resu
templateStr, _ = data["template"].(string)
templateFile, _ = data["template_file"].(string)
)
var templateData map[string]any
if len(task.Payload) > 0 {
if err := json.Unmarshal(task.Payload, &templateData); err != nil {
return mq.Result{Payload: task.Payload, Error: err, Ctx: ctx}
}
templateData, err := dag.UnmarshalPayload[map[string]any](ctx, task.Payload)
if err != nil {
return mq.Result{Error: err, Ctx: ctx}
}
if templateData == nil {
templateData = make(map[string]any)
@@ -68,7 +66,6 @@ func (c *RenderHTMLNode) ProcessTask(ctx context.Context, task *mq.Task) mq.Resu
}
templateData["task_id"] = ctx.Value("task_id")
var renderedHTML string
var err error
parser := jet.NewWithMemory(jet.WithDelims("{{", "}}"))
switch {
// 1. JSONSchema + HTML Template

View File

@@ -11,7 +11,6 @@ import (
"github.com/oarkflow/mq"
"github.com/oarkflow/mq/dag"
"github.com/oarkflow/mq/utils"
)
type OutputHandler struct {
@@ -19,31 +18,14 @@ type OutputHandler struct {
}
func (c *OutputHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
var templateData map[string]any
if len(task.Payload) > 0 {
if err := json.Unmarshal(task.Payload, &templateData); err != nil {
templateData, err := dag.UnmarshalPayload[map[string]any](ctx, task.Payload)
if err != nil {
return mq.Result{Error: err, Ctx: ctx}
}
}
if templateData == nil {
templateData = make(map[string]any)
}
if c.Payload.Mapping != nil {
for k, v := range c.Payload.Mapping {
_, val := dag.GetVal(ctx, v, templateData)
templateData[k] = val
}
}
except, ok := c.Payload.Data["except_fields"].([]string)
if !ok {
exceptAny, ok := c.Payload.Data["except_fields"].([]any)
if ok {
except = make([]string, len(exceptAny))
for i, v := range exceptAny {
except[i], _ = v.(string)
}
}
}
templateData = c.ParseMapping(ctx, templateData)
outputType, _ := c.Payload.Data["output_type"].(string)
switch outputType {
case "stdout":
@@ -132,11 +114,8 @@ func (c *OutputHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Resul
"message": "Data sent to API successfully",
}
}
bt, _ := json.Marshal(templateData)
for _, field := range except {
bt = utils.RemoveRecursiveFromJSON(bt, field)
}
bt = c.ExceptFields(bt)
return mq.Result{Payload: bt, Ctx: ctx}
}