From e9f98a0816a19d88042c4e019162760af4bb92b7 Mon Sep 17 00:00:00 2001 From: sujit Date: Tue, 15 Oct 2024 20:34:56 +0545 Subject: [PATCH] feat: remove un-necessary dependencies --- codec/codec.go | 2 +- consumer.go | 2 +- dag/dag.go | 16 ++++++++-------- dag/task_manager.go | 8 ++++---- dag/waitgroup.go | 4 ++-- go.mod | 2 +- publisher.go | 2 +- 7 files changed, 18 insertions(+), 18 deletions(-) diff --git a/codec/codec.go b/codec/codec.go index 1d519f0..99e5e7a 100644 --- a/codec/codec.go +++ b/codec/codec.go @@ -11,9 +11,9 @@ import ( type Message struct { Headers map[string]string `msgpack:"h"` Queue string `msgpack:"q"` - Command consts.CMD `msgpack:"c"` Payload []byte `msgpack:"p"` m sync.RWMutex + Command consts.CMD `msgpack:"c"` } func NewMessage(cmd consts.CMD, payload []byte, queue string, headers map[string]string) *Message { diff --git a/consumer.go b/consumer.go index 6ae840b..9947645 100644 --- a/consumer.go +++ b/consumer.go @@ -32,9 +32,9 @@ type Consumer struct { conn net.Conn handler Handler pool *Pool + opts *Options id string queue string - opts *Options } func NewConsumer(id string, queue string, handler Handler, opts ...Option) *Consumer { diff --git a/dag/dag.go b/dag/dag.go index 1c29fc0..1a5db9a 100644 --- a/dag/dag.go +++ b/dag/dag.go @@ -32,11 +32,11 @@ const ( ) type Node struct { + processor mq.Processor Name string Key string Edges []Edge isReady bool - processor mq.Processor } func (n *Node) ProcessTask(ctx context.Context, msg *mq.Task) mq.Result { @@ -61,20 +61,20 @@ type ( ) type DAG struct { - name string - key string - startNode string - consumerTopic string nodes map[string]*Node server *mq.Broker consumer *mq.Consumer taskContext map[string]*TaskManager conditions map[FromNode]map[When]Then - mu sync.RWMutex - paused bool - opts []mq.Option pool *mq.Pool taskCleanupCh chan string + name string + key string + startNode string + consumerTopic string + opts []mq.Option + mu sync.RWMutex + paused bool } func (tm *DAG) SetKey(key string) { diff --git a/dag/task_manager.go b/dag/task_manager.go index 346043d..9e08c0e 100644 --- a/dag/task_manager.go +++ b/dag/task_manager.go @@ -12,14 +12,14 @@ import ( ) type TaskManager struct { - taskID string - dag *DAG - mutex sync.Mutex createdAt time.Time processedAt time.Time - results []mq.Result + dag *DAG nodeResults map[string]mq.Result wg *WaitGroup + taskID string + results []mq.Result + mutex sync.Mutex } func NewTaskManager(d *DAG, taskID string) *TaskManager { diff --git a/dag/waitgroup.go b/dag/waitgroup.go index defa02d..e63f9bf 100644 --- a/dag/waitgroup.go +++ b/dag/waitgroup.go @@ -5,9 +5,9 @@ import ( ) type WaitGroup struct { - sync.Mutex - counter int cond *sync.Cond + counter int + sync.Mutex } func NewWaitGroup() *WaitGroup { diff --git a/go.mod b/go.mod index ce058dd..f5ff52e 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/oarkflow/mq -go 1.23.0 +go 1.22.0 require ( github.com/oarkflow/date v0.0.4 diff --git a/publisher.go b/publisher.go index 44e16b3..b28bb13 100644 --- a/publisher.go +++ b/publisher.go @@ -14,8 +14,8 @@ import ( ) type Publisher struct { - id string opts *Options + id string } func NewPublisher(id string, opts ...Option) *Publisher {