From aa0cecf1fe59a5c37823833c31719b51bf3e07a9 Mon Sep 17 00:00:00 2001 From: sujit Date: Mon, 14 Oct 2024 22:21:53 +0545 Subject: [PATCH] feat: remove un-necessary dependencies --- broker.go | 16 +++---- codec/codec.go | 6 +-- codec/serializer.go | 37 +++++++++++++++ ctx.go | 11 +++-- examples/dag.go | 22 +++++---- go.mod | 8 +--- go.sum | 6 --- queue.go | 7 +-- storage/interface.go | 15 ++++++ storage/memory/memory.go | 99 ++++++++++++++++++++++++++++++++++++++++ 10 files changed, 185 insertions(+), 42 deletions(-) create mode 100644 codec/serializer.go create mode 100644 storage/interface.go create mode 100644 storage/memory/memory.go diff --git a/broker.go b/broker.go index 57bd824..946ba61 100644 --- a/broker.go +++ b/broker.go @@ -10,11 +10,11 @@ import ( "strings" "time" - "github.com/oarkflow/xsync" - "github.com/oarkflow/mq/codec" "github.com/oarkflow/mq/consts" "github.com/oarkflow/mq/jsonparser" + "github.com/oarkflow/mq/storage" + "github.com/oarkflow/mq/storage/memory" "github.com/oarkflow/mq/utils" ) @@ -35,18 +35,18 @@ type publisher struct { } type Broker struct { - queues xsync.IMap[string, *Queue] - consumers xsync.IMap[string, *consumer] - publishers xsync.IMap[string, *publisher] + queues storage.IMap[string, *Queue] + consumers storage.IMap[string, *consumer] + publishers storage.IMap[string, *publisher] opts *Options } func NewBroker(opts ...Option) *Broker { options := SetupOptions(opts...) return &Broker{ - queues: xsync.NewMap[string, *Queue](), - publishers: xsync.NewMap[string, *publisher](), - consumers: xsync.NewMap[string, *consumer](), + queues: memory.New[string, *Queue](), + publishers: memory.New[string, *publisher](), + consumers: memory.New[string, *consumer](), opts: options, } } diff --git a/codec/codec.go b/codec/codec.go index 042f60e..1d519f0 100644 --- a/codec/codec.go +++ b/codec/codec.go @@ -5,8 +5,6 @@ import ( "net" "sync" - "github.com/oarkflow/msgpack" - "github.com/oarkflow/mq/consts" ) @@ -30,7 +28,7 @@ func NewMessage(cmd consts.CMD, payload []byte, queue string, headers map[string func (m *Message) Serialize() ([]byte, error) { m.m.RLock() defer m.m.RUnlock() - data, err := msgpack.Marshal(m) + data, err := Marshal(m) if err != nil { return nil, err } @@ -39,7 +37,7 @@ func (m *Message) Serialize() ([]byte, error) { func Deserialize(data []byte) (*Message, error) { var msg Message - if err := msgpack.Unmarshal(data, &msg); err != nil { + if err := Unmarshal(data, &msg); err != nil { return nil, err } diff --git a/codec/serializer.go b/codec/serializer.go new file mode 100644 index 0000000..0d49514 --- /dev/null +++ b/codec/serializer.go @@ -0,0 +1,37 @@ +package codec + +import ( + "encoding/json" +) + +type MarshallerFunc func(v any) ([]byte, error) + +type UnmarshallerFunc func(data []byte, v any) error + +func (f MarshallerFunc) Marshal(v any) ([]byte, error) { + return f(v) +} + +func (f UnmarshallerFunc) Unmarshal(data []byte, v any) error { + return f(data, v) +} + +var defaultMarshaller MarshallerFunc = json.Marshal + +var defaultUnmarshaller UnmarshallerFunc = json.Unmarshal + +func SetMarshaller(marshaller MarshallerFunc) { + defaultMarshaller = marshaller +} + +func SetUnmarshaller(unmarshaller UnmarshallerFunc) { + defaultUnmarshaller = unmarshaller +} + +func Marshal(v any) ([]byte, error) { + return defaultMarshaller(v) +} + +func Unmarshal(data []byte, v any) error { + return defaultUnmarshaller(data, v) +} diff --git a/ctx.go b/ctx.go index f739a44..122bb09 100644 --- a/ctx.go +++ b/ctx.go @@ -11,9 +11,10 @@ import ( "time" "github.com/oarkflow/xid" - "github.com/oarkflow/xsync" "github.com/oarkflow/mq/consts" + "github.com/oarkflow/mq/storage" + "github.com/oarkflow/mq/storage/memory" ) type Task struct { @@ -41,7 +42,7 @@ func IsClosed(conn net.Conn) bool { func SetHeaders(ctx context.Context, headers map[string]string) context.Context { hd, _ := GetHeaders(ctx) if hd == nil { - hd = xsync.NewMap[string, string]() + hd = memory.New[string, string]() } for key, val := range headers { hd.Set(key, val) @@ -52,7 +53,7 @@ func SetHeaders(ctx context.Context, headers map[string]string) context.Context func WithHeaders(ctx context.Context, headers map[string]string) map[string]string { hd, _ := GetHeaders(ctx) if hd == nil { - hd = xsync.NewMap[string, string]() + hd = memory.New[string, string]() } for key, val := range headers { hd.Set(key, val) @@ -60,8 +61,8 @@ func WithHeaders(ctx context.Context, headers map[string]string) map[string]stri return hd.AsMap() } -func GetHeaders(ctx context.Context) (xsync.IMap[string, string], bool) { - headers, ok := ctx.Value(consts.HeaderKey).(xsync.IMap[string, string]) +func GetHeaders(ctx context.Context) (storage.IMap[string, string], bool) { + headers, ok := ctx.Value(consts.HeaderKey).(storage.IMap[string, string]) return headers, ok } diff --git a/examples/dag.go b/examples/dag.go index c84ab5c..9e69eb8 100644 --- a/examples/dag.go +++ b/examples/dag.go @@ -25,22 +25,26 @@ var ( ) func main() { + subDag := dag.NewDAG( + "Sub DAG", + "D", + mq.WithNotifyResponse(tasks.NotifySubDAGResponse), + ) + subDag.AddNode("D", "D", tasks.Node4, true) + subDag.AddNode("F", "F", tasks.Node6) + subDag.AddNode("G", "G", tasks.Node7) + subDag.AddNode("H", "H", tasks.Node8) + subDag.AddEdge("Label 2", "D", "F") + subDag.AddEdge("Label 4", "F", "G", "H") + d.AddNode("A", "A", tasks.Node1, true) d.AddNode("B", "B", tasks.Node2) d.AddNode("C", "C", tasks.Node3) - d.AddNode("D", "D", tasks.Node4) + d.AddDAGNode("D", "D", subDag) d.AddNode("E", "E", tasks.Node5) - d.AddNode("F", "F", tasks.Node6) - d.AddNode("G", "G", tasks.Node7) - d.AddNode("H", "H", tasks.Node8) - d.AddLoop("Send each item", "A", "B") d.AddCondition("C", map[dag.When]dag.Then{"PASS": "D", "FAIL": "E"}) d.AddEdge("Label 1", "B", "C") - d.AddEdge("Label 2", "D", "F") - d.AddEdge("Label 3", "E", "F") - d.AddEdge("Label 4", "F", "G", "H") - // Classify edges // d.ClassifyEdges() // fmt.Println(d.ExportDOT()) diff --git a/go.mod b/go.mod index 22cf9e1..1c5abaa 100644 --- a/go.mod +++ b/go.mod @@ -2,10 +2,4 @@ module github.com/oarkflow/mq go 1.23.0 -require ( - github.com/oarkflow/msgpack v0.0.1 - github.com/oarkflow/xid v1.2.5 - github.com/oarkflow/xsync v0.0.5 -) - -require github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect +require github.com/oarkflow/xid v1.2.5 diff --git a/go.sum b/go.sum index f859e8d..bc4a7ee 100644 --- a/go.sum +++ b/go.sum @@ -1,8 +1,2 @@ -github.com/oarkflow/msgpack v0.0.1 h1:q7CvtT1/2TU+qoXgiQZ6BW4E9fAGOQ9bATWFFdFNZUI= -github.com/oarkflow/msgpack v0.0.1/go.mod h1:LthukEYeLGz+NEYzN6voNUVHAiLR8A3HX2DM40O3QBg= github.com/oarkflow/xid v1.2.5 h1:6RcNJm9+oZ/B647gkME9trCzhpxGQaSdNoD56Vmkeho= github.com/oarkflow/xid v1.2.5/go.mod h1:jG4YBh+swbjlWApGWDBYnsJEa7hi3CCpmuqhB3RAxVo= -github.com/oarkflow/xsync v0.0.5 h1:7HBQjmDus4YFLQFC5D197TB4c2YJTVwsTFuqk5zWKBM= -github.com/oarkflow/xsync v0.0.5/go.mod h1:KAaEc506OEd3ISxfhgUBKxk8eQzkz+mb0JkpGGd/QwU= -github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= -github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= diff --git a/queue.go b/queue.go index 2a190de..6de8a23 100644 --- a/queue.go +++ b/queue.go @@ -1,11 +1,12 @@ package mq import ( - "github.com/oarkflow/xsync" + "github.com/oarkflow/mq/storage" + "github.com/oarkflow/mq/storage/memory" ) type Queue struct { - consumers xsync.IMap[string, *consumer] + consumers storage.IMap[string, *consumer] tasks chan *QueuedTask // channel to hold tasks name string } @@ -13,7 +14,7 @@ type Queue struct { func newQueue(name string, queueSize int) *Queue { return &Queue{ name: name, - consumers: xsync.NewMap[string, *consumer](), + consumers: memory.New[string, *consumer](), tasks: make(chan *QueuedTask, queueSize), // buffer size for tasks } } diff --git a/storage/interface.go b/storage/interface.go new file mode 100644 index 0000000..4bfaacc --- /dev/null +++ b/storage/interface.go @@ -0,0 +1,15 @@ +package storage + +// IMap is a thread-safe map interface. +type IMap[K comparable, V any] interface { + Get(K) (V, bool) + Set(K, V) + Del(K) + ForEach(func(K, V) bool) + Clear() + Size() int + Keys() []K + Values() []V + AsMap() map[K]V + Clone() IMap[K, V] +} diff --git a/storage/memory/memory.go b/storage/memory/memory.go new file mode 100644 index 0000000..0281fc4 --- /dev/null +++ b/storage/memory/memory.go @@ -0,0 +1,99 @@ +package memory + +import ( + "sync" + + "github.com/oarkflow/mq/storage" +) + +type Map[K comparable, V any] struct { + data map[K]V + mu sync.RWMutex +} + +func New[K comparable, V any]() *Map[K, V] { + return &Map[K, V]{ + data: make(map[K]V), + } +} + +func (m *Map[K, V]) Get(key K) (V, bool) { + m.mu.RLock() + defer m.mu.RUnlock() + val, exists := m.data[key] + return val, exists +} + +func (m *Map[K, V]) Set(key K, value V) { + m.mu.Lock() + defer m.mu.Unlock() + m.data[key] = value +} + +func (m *Map[K, V]) Del(key K) { + m.mu.Lock() + defer m.mu.Unlock() + delete(m.data, key) +} + +func (m *Map[K, V]) ForEach(f func(K, V) bool) { + m.mu.RLock() + defer m.mu.RUnlock() + for k, v := range m.data { + if !f(k, v) { + break + } + } +} + +func (m *Map[K, V]) Clear() { + m.mu.Lock() + defer m.mu.Unlock() + m.data = make(map[K]V) +} + +func (m *Map[K, V]) Size() int { + m.mu.RLock() + defer m.mu.RUnlock() + return len(m.data) +} + +func (m *Map[K, V]) Keys() []K { + m.mu.RLock() + defer m.mu.RUnlock() + keys := make([]K, 0, len(m.data)) + for k := range m.data { + keys = append(keys, k) + } + return keys +} + +func (m *Map[K, V]) Values() []V { + m.mu.RLock() + defer m.mu.RUnlock() + values := make([]V, 0, len(m.data)) + for _, v := range m.data { + values = append(values, v) + } + return values +} + +func (m *Map[K, V]) AsMap() map[K]V { + m.mu.RLock() + defer m.mu.RUnlock() + copiedMap := make(map[K]V, len(m.data)) + for k, v := range m.data { + copiedMap[k] = v + } + return copiedMap +} + +func (m *Map[K, V]) Clone() storage.IMap[K, V] { + m.mu.RLock() + defer m.mu.RUnlock() + clonedMap := New[K, V]() + for k, v := range m.data { + clonedMap.Set(k, v) + } + return clonedMap +}