diff --git a/dag/dag.go b/dag/dag.go index e720241..52a6b1c 100644 --- a/dag/dag.go +++ b/dag/dag.go @@ -21,41 +21,41 @@ import ( ) type Node struct { - NodeType NodeType + processor mq.Processor Label string ID string Edges []Edge - processor mq.Processor + NodeType NodeType isReady bool } type Edge struct { From *Node To *Node - Type EdgeType Label string + Type EdgeType } type DAG struct { - server *mq.Broker - consumer *mq.Consumer nodes storage.IMap[string, *Node] taskManager storage.IMap[string, *TaskManager] iteratorNodes storage.IMap[string, []Edge] + Error error + conditions map[string]map[string]string + consumer *mq.Consumer finalResult func(taskID string, result mq.Result) pool *mq.Pool - name string - key string - startNode string - opts []mq.Option - conditions map[string]map[string]string - consumerTopic string - hasPageNode bool - reportNodeResultCallback func(mq.Result) - Error error Notifier *sio.Server - paused bool + server *mq.Broker + reportNodeResultCallback func(mq.Result) + key string + consumerTopic string + startNode string + name string report string + opts []mq.Option + hasPageNode bool + paused bool } func NewDAG(name, key string, finalResultCallback func(taskID string, result mq.Result), opts ...mq.Option) *DAG { diff --git a/dag/operation.go b/dag/operation.go index 6098412..5ba5f5b 100644 --- a/dag/operation.go +++ b/dag/operation.go @@ -52,13 +52,13 @@ type Payload struct { } type Operation struct { - ID string `json:"id"` - Type NodeType `json:"type"` - Key string `json:"key"` + ID string `json:"id"` + Key string `json:"key"` + Payload Payload RequiredFields []string `json:"required_fields"` OptionalFields []string `json:"optional_fields"` GeneratedFields []string `json:"generated_fields"` - Payload Payload + Type NodeType `json:"type"` } func (e *Operation) Consume(_ context.Context) error { diff --git a/dag/task_manager.go b/dag/task_manager.go index c6cc526..8b926b6 100644 --- a/dag/task_manager.go +++ b/dag/task_manager.go @@ -15,11 +15,11 @@ import ( ) type TaskState struct { + UpdatedAt time.Time + targetResults storage.IMap[string, mq.Result] NodeID string Status mq.Status - UpdatedAt time.Time Result mq.Result - targetResults storage.IMap[string, mq.Result] } func newTaskState(nodeID string) *TaskState { @@ -39,6 +39,7 @@ type nodeResult struct { } type TaskManager struct { + createdAt time.Time taskStates storage.IMap[string, *TaskState] parentNodes storage.IMap[string, string] childNodes storage.IMap[string, int] @@ -46,15 +47,14 @@ type TaskManager struct { iteratorNodes storage.IMap[string, []Edge] currentNodePayload storage.IMap[string, json.RawMessage] currentNodeResult storage.IMap[string, mq.Result] - createdAt time.Time - latency string + taskQueue chan *task result *mq.Result dag *DAG - taskID string - taskQueue chan *task resultQueue chan nodeResult resultCh chan mq.Result stopCh chan struct{} + taskID string + latency string } type task struct { diff --git a/examples/parse.go b/examples/parse.go index 16e42f8..7298d2d 100644 --- a/examples/parse.go +++ b/examples/parse.go @@ -3,10 +3,10 @@ package main import ( "fmt" - "github.com/oarkflow/mq/utils" + "github.com/oarkflow/form" ) func main() { queryString := []byte("fields[0][method]=GET&fields[0][path]=/user/:id&fields[0][handlerMsg]=User Profile&fields[1][method]=POST&fields[1][path]=/user/create&fields[1][handlerMsg]=Create User") - fmt.Println(utils.DecodeForm(queryString)) + fmt.Println(form.DecodeForm(queryString)) } diff --git a/options.go b/options.go index c3a8cb1..0c5dac1 100644 --- a/options.go +++ b/options.go @@ -51,8 +51,8 @@ func (r Result) MarshalJSON() ([]byte, error) { func (r *Result) UnmarshalJSON(data []byte) error { type Alias Result aux := &struct { - ErrMsg string `json:"error,omitempty"` *Alias + ErrMsg string `json:"error,omitempty"` }{ Alias: (*Alias)(r), } @@ -110,18 +110,18 @@ type TLSConfig struct { } type Options struct { + storage TaskStorage consumerOnSubscribe func(ctx context.Context, topic, consumerName string) consumerOnClose func(ctx context.Context, topic, consumerName string) notifyResponse func(context.Context, Result) error - tlsConfig TLSConfig brokerAddr string + tlsConfig TLSConfig callback []func(context.Context, Result) Result - maxRetries int + queueSize int initialDelay time.Duration - storage TaskStorage maxBackoff time.Duration jitterPercent float64 - queueSize int + maxRetries int numOfWorkers int maxMemoryLoad int64 syncMode bool diff --git a/pool.go b/pool.go index bb747e4..60ec8b2 100644 --- a/pool.go +++ b/pool.go @@ -26,29 +26,29 @@ type Metrics struct { type Pool struct { taskStorage TaskStorage - taskQueue PriorityQueue - taskQueueLock sync.Mutex + scheduler *Scheduler stop chan struct{} taskNotify chan struct{} workerAdjust chan int - wg sync.WaitGroup - maxMemoryLoad int64 - numOfWorkers int32 - metrics Metrics - paused bool - scheduler *Scheduler - overflowBufferLock sync.RWMutex - overflowBuffer []*QueueTask - taskAvailableCond *sync.Cond handler Handler - callback Callback - batchSize int - timeout time.Duration completionCallback CompletionCallback + taskAvailableCond *sync.Cond + callback Callback + taskQueue PriorityQueue + overflowBuffer []*QueueTask + metrics Metrics + wg sync.WaitGroup taskCompletionNotifier sync.WaitGroup + timeout time.Duration + batchSize int + maxMemoryLoad int64 idleTimeout time.Duration backoffDuration time.Duration - maxRetries int // Max retries for tasks + maxRetries int + overflowBufferLock sync.RWMutex + taskQueueLock sync.Mutex + numOfWorkers int32 + paused bool } func NewPool(numOfWorkers int, opts ...PoolOption) *Pool { diff --git a/scheduler.go b/scheduler.go index f4a20e1..63e757a 100644 --- a/scheduler.go +++ b/scheduler.go @@ -12,8 +12,8 @@ import ( type ScheduleOptions struct { Handler Handler Callback Callback - Overlap bool Interval time.Duration + Overlap bool Recurring bool } @@ -74,12 +74,12 @@ type ScheduledTask struct { } type Schedule struct { - Interval time.Duration + TimeOfDay time.Time + CronSpec string DayOfWeek []time.Weekday DayOfMonth []int - TimeOfDay time.Time + Interval time.Duration Recurring bool - CronSpec string } func (s *Schedule) ToHumanReadable() string { @@ -216,9 +216,9 @@ func parseCronValue(field string) ([]string, error) { } type Scheduler struct { + pool *Pool tasks []ScheduledTask mu sync.Mutex - pool *Pool } func (s *Scheduler) Start() { diff --git a/sio/hub.go b/sio/hub.go index 59b565d..23f5f5e 100644 --- a/sio/hub.go +++ b/sio/hub.go @@ -6,52 +6,52 @@ import ( ) type hub struct { - sockets map[string]*Socket - rooms map[string]*room - shutdownCh chan bool + multihomeBackend Adapter + joinRoomCh chan *joinRequest + roomMsgCh chan *RoomMsg socketList chan []*Socket addCh chan *Socket delCh chan *Socket - joinRoomCh chan *joinRequest + sockets map[string]*Socket leaveRoomCh chan *leaveRequest - roomMsgCh chan *RoomMsg - broomcastCh chan *RoomMsg // for passing data from the backend + shutdownCh chan bool + broomcastCh chan *RoomMsg broadcastCh chan *BroadcastMsg bbroadcastCh chan *BroadcastMsg - multihomeEnabled bool - multihomeBackend Adapter + rooms map[string]*room l sync.RWMutex + multihomeEnabled bool } type room struct { - name string sockets map[string]*Socket + name string l sync.RWMutex } type joinRequest struct { - roomName string socket *Socket + roomName string } type leaveRequest struct { - roomName string socket *Socket + roomName string } // RoomMsg represents an event to be dispatched to a room of sockets type RoomMsg struct { - RoomName string - Except []string - EventName string Data any + RoomName string + EventName string + Except []string } // BroadcastMsg represents an event to be dispatched to all Sockets on the Server type BroadcastMsg struct { + Data any EventName string Except []string - Data any } func (h *hub) addSocket(s *Socket) { diff --git a/sio/server.go b/sio/server.go index 980db3f..1a05ab0 100644 --- a/sio/server.go +++ b/sio/server.go @@ -2,7 +2,6 @@ package sio import ( "context" - "github.com/gorilla/websocket" "io" "log/slog" "net/http" @@ -12,6 +11,8 @@ import ( "sync" "syscall" "time" + + "github.com/gorilla/websocket" ) const ( // ASCII chars @@ -23,8 +24,8 @@ const ( // ASCII chars ) type event struct { - eventName string eventHandler func(*Socket, []byte) + eventName string } // Config specifies parameters for upgrading an HTTP connection to a @@ -32,13 +33,14 @@ type event struct { // // It is safe to call Config's methods concurrently. type Config struct { - HandshakeTimeout time.Duration - ReadBufferSize, WriteBufferSize int - WriteBufferPool websocket.BufferPool - Subprotocols []string - Error func(w http.ResponseWriter, r *http.Request, status int, reason error) - CheckOrigin func(r *http.Request) bool - EnableCompression bool + WriteBufferPool websocket.BufferPool + Error func(w http.ResponseWriter, r *http.Request, status int, reason error) + CheckOrigin func(r *http.Request) bool + Subprotocols []string + HandshakeTimeout time.Duration + ReadBufferSize int + WriteBufferSize int + EnableCompression bool } // Server manages the coordination between @@ -182,7 +184,7 @@ type EventHandler interface { // Any event functions registered with On, must be safe for concurrent use by multiple // go routines func (serv *Server) On(eventName string, handleFunc func(*Socket, []byte)) { - serv.events[eventName] = &event{eventName, handleFunc} // you think you can handle the func? + serv.events[eventName] = &event{eventName: eventName, eventHandler: handleFunc} // you think you can handle the func? } // OnEvent has the same functionality as On, but accepts diff --git a/sio/socket.go b/sio/socket.go index 9a93204..94c5531 100644 --- a/sio/socket.go +++ b/sio/socket.go @@ -22,18 +22,18 @@ var ( // Socket represents a websocket connection type Socket struct { + context storage.IMap[string, any] l *sync.RWMutex - id string ws *websocket.Conn - closed bool serv *Server roomsl *sync.RWMutex request *http.Request - context storage.IMap[string, any] rooms map[string]bool pingTicker *time.Ticker tickerDone chan bool + id string pingInterval time.Duration + closed bool } const ( @@ -139,7 +139,7 @@ func (s *Socket) GetRooms() []string { func (s *Socket) Join(roomName string) { s.roomsl.Lock() defer s.roomsl.Unlock() - s.serv.hub.joinRoom(&joinRequest{roomName, s}) + s.serv.hub.joinRoom(&joinRequest{roomName: roomName, socket: s}) s.rooms[roomName] = true } @@ -149,7 +149,7 @@ func (s *Socket) Join(roomName string) { func (s *Socket) Leave(roomName string) { s.roomsl.Lock() defer s.roomsl.Unlock() - s.serv.hub.leaveRoom(&leaveRequest{roomName, s}) + s.serv.hub.leaveRoom(&leaveRequest{roomName: roomName, socket: s}) delete(s.rooms, roomName) }