diff --git a/dag/operation.go b/dag/operation.go index 91da8c3..dfffdb8 100644 --- a/dag/operation.go +++ b/dag/operation.go @@ -19,6 +19,20 @@ import ( "github.com/oarkflow/mq" ) +type Processor interface { + mq.Processor + SetConfig(Payload) +} + +type Condition interface { + Match(data any) bool +} + +type ConditionProcessor interface { + Processor + SetConditions(map[string]Condition) +} + type Provider struct { Mapping map[string]any `json:"mapping"` UpdateMapping map[string]any `json:"update_mapping"` @@ -47,19 +61,19 @@ type Operation struct { Payload Payload } -func (e *Operation) Consume(ctx context.Context) error { +func (e *Operation) Consume(_ context.Context) error { return nil } -func (e *Operation) Pause(ctx context.Context) error { +func (e *Operation) Pause(_ context.Context) error { return nil } -func (e *Operation) Resume(ctx context.Context) error { +func (e *Operation) Resume(_ context.Context) error { return nil } -func (e *Operation) Stop(ctx context.Context) error { +func (e *Operation) Stop(_ context.Context) error { return nil } @@ -67,11 +81,11 @@ func (e *Operation) Close() error { return nil } -func (e *Operation) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { +func (e *Operation) ProcessTask(_ context.Context, task *mq.Task) mq.Result { return mq.Result{Payload: task.Payload} } -func (e *Operation) SetPayload(payload Payload) { +func (e *Operation) SetConfig(payload Payload) { e.Payload = payload e.GeneratedFields = slices.Compact(append(e.GeneratedFields, payload.GeneratedFields...)) } diff --git a/dag/operations.go b/dag/operations.go index 4ca8aac..d56923f 100644 --- a/dag/operations.go +++ b/dag/operations.go @@ -30,3 +30,28 @@ func AvailableHandlers() []string { } return op } + +type List struct { + mu *sync.RWMutex + Handlers map[string]*DAG +} + +var dags = &List{mu: &sync.RWMutex{}, Handlers: make(map[string]*DAG)} + +func AddDAG(key string, handler *DAG) { + dags.mu.Lock() + dags.Handlers[key] = handler + dags.mu.Unlock() +} + +func GetDAG(key string) *DAG { + return dags.Handlers[key] +} + +func AvailableDAG() []string { + var op []string + for opt := range dags.Handlers { + op = append(op, opt) + } + return op +} diff --git a/go.mod b/go.mod index a13f379..8a70092 100644 --- a/go.mod +++ b/go.mod @@ -18,11 +18,11 @@ require ( require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect - github.com/klauspost/compress v1.17.11 // indirect + github.com/klauspost/compress v1.17.9 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/prometheus/client_model v0.6.1 // indirect - github.com/prometheus/common v0.60.1 // indirect + github.com/prometheus/common v0.55.0 // indirect github.com/prometheus/procfs v0.15.1 // indirect - golang.org/x/sys v0.26.0 // indirect - google.golang.org/protobuf v1.35.1 // indirect + golang.org/x/sys v0.22.0 // indirect + google.golang.org/protobuf v1.34.2 // indirect ) diff --git a/go.sum b/go.sum index 6e1ff0a..1605a9c 100644 --- a/go.sum +++ b/go.sum @@ -8,8 +8,6 @@ github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aN github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= -github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= -github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= @@ -32,19 +30,13 @@ github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= github.com/prometheus/common v0.55.0 h1:KEi6DK7lXW/m7Ig5i47x0vRzuBsHuvJdi5ee6Y3G1dc= github.com/prometheus/common v0.55.0/go.mod h1:2SECS4xJG1kd8XF9IcM1gMX6510RAEL65zxzNImwdc8= -github.com/prometheus/common v0.60.1 h1:FUas6GcOw66yB/73KC+BOZoFJmbo/1pojoILArPAaSc= -github.com/prometheus/common v0.60.1/go.mod h1:h0LYf1R1deLSKtD4Vdg8gy4RuOvENW2J/h19V5NADQw= github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c h1:7dEasQXItcW1xKJ2+gg5VOiBnqWrJc+rq0DPKyvvdbY= golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c/go.mod h1:NQtJDoLvd6faHhE7m4T/1IY708gDefGGjR/iUW8yQQ8= golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= -golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/time v0.7.0 h1:ntUhktv3OPE6TgYxXWv9vKvUSJyIFJlyohwbkEwPrKQ= golang.org/x/time v0.7.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= -google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA= -google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= diff --git a/options.go b/options.go index 126a6ed..2de23ad 100644 --- a/options.go +++ b/options.go @@ -28,7 +28,7 @@ type Result struct { func (r Result) MarshalJSON() ([]byte, error) { type Alias Result aux := &struct { - ErrorMsg string `json:"error"` + ErrorMsg string `json:"error,omitempty"` Alias }{ Alias: (Alias)(r), diff --git a/pool.go b/pool.go index 2b207d4..bb747e4 100644 --- a/pool.go +++ b/pool.go @@ -131,9 +131,10 @@ func (wp *Pool) processNextBatch() { wp.handleTask(task) } } - if len(tasks) > 0 { - wp.taskCompletionNotifier.Done() - } + // @TODO - Why was this done? + //if len(tasks) > 0 { + // wp.taskCompletionNotifier.Done() + //} } func (wp *Pool) handleTask(task *QueueTask) {