feat: add task completion

This commit is contained in:
sujit
2024-12-26 13:44:06 +05:45
parent 05fe8458f3
commit 807c845254
10 changed files with 84 additions and 82 deletions

View File

@@ -21,41 +21,41 @@ import (
) )
type Node struct { type Node struct {
NodeType NodeType processor mq.Processor
Label string Label string
ID string ID string
Edges []Edge Edges []Edge
processor mq.Processor NodeType NodeType
isReady bool isReady bool
} }
type Edge struct { type Edge struct {
From *Node From *Node
To *Node To *Node
Type EdgeType
Label string Label string
Type EdgeType
} }
type DAG struct { type DAG struct {
server *mq.Broker
consumer *mq.Consumer
nodes storage.IMap[string, *Node] nodes storage.IMap[string, *Node]
taskManager storage.IMap[string, *TaskManager] taskManager storage.IMap[string, *TaskManager]
iteratorNodes storage.IMap[string, []Edge] iteratorNodes storage.IMap[string, []Edge]
Error error
conditions map[string]map[string]string
consumer *mq.Consumer
finalResult func(taskID string, result mq.Result) finalResult func(taskID string, result mq.Result)
pool *mq.Pool pool *mq.Pool
name string
key string
startNode string
opts []mq.Option
conditions map[string]map[string]string
consumerTopic string
hasPageNode bool
reportNodeResultCallback func(mq.Result)
Error error
Notifier *sio.Server Notifier *sio.Server
paused bool server *mq.Broker
reportNodeResultCallback func(mq.Result)
key string
consumerTopic string
startNode string
name string
report string report string
opts []mq.Option
hasPageNode bool
paused bool
} }
func NewDAG(name, key string, finalResultCallback func(taskID string, result mq.Result), opts ...mq.Option) *DAG { func NewDAG(name, key string, finalResultCallback func(taskID string, result mq.Result), opts ...mq.Option) *DAG {

View File

@@ -52,13 +52,13 @@ type Payload struct {
} }
type Operation struct { type Operation struct {
ID string `json:"id"` ID string `json:"id"`
Type NodeType `json:"type"` Key string `json:"key"`
Key string `json:"key"` Payload Payload
RequiredFields []string `json:"required_fields"` RequiredFields []string `json:"required_fields"`
OptionalFields []string `json:"optional_fields"` OptionalFields []string `json:"optional_fields"`
GeneratedFields []string `json:"generated_fields"` GeneratedFields []string `json:"generated_fields"`
Payload Payload Type NodeType `json:"type"`
} }
func (e *Operation) Consume(_ context.Context) error { func (e *Operation) Consume(_ context.Context) error {

View File

@@ -15,11 +15,11 @@ import (
) )
type TaskState struct { type TaskState struct {
UpdatedAt time.Time
targetResults storage.IMap[string, mq.Result]
NodeID string NodeID string
Status mq.Status Status mq.Status
UpdatedAt time.Time
Result mq.Result Result mq.Result
targetResults storage.IMap[string, mq.Result]
} }
func newTaskState(nodeID string) *TaskState { func newTaskState(nodeID string) *TaskState {
@@ -39,6 +39,7 @@ type nodeResult struct {
} }
type TaskManager struct { type TaskManager struct {
createdAt time.Time
taskStates storage.IMap[string, *TaskState] taskStates storage.IMap[string, *TaskState]
parentNodes storage.IMap[string, string] parentNodes storage.IMap[string, string]
childNodes storage.IMap[string, int] childNodes storage.IMap[string, int]
@@ -46,15 +47,14 @@ type TaskManager struct {
iteratorNodes storage.IMap[string, []Edge] iteratorNodes storage.IMap[string, []Edge]
currentNodePayload storage.IMap[string, json.RawMessage] currentNodePayload storage.IMap[string, json.RawMessage]
currentNodeResult storage.IMap[string, mq.Result] currentNodeResult storage.IMap[string, mq.Result]
createdAt time.Time taskQueue chan *task
latency string
result *mq.Result result *mq.Result
dag *DAG dag *DAG
taskID string
taskQueue chan *task
resultQueue chan nodeResult resultQueue chan nodeResult
resultCh chan mq.Result resultCh chan mq.Result
stopCh chan struct{} stopCh chan struct{}
taskID string
latency string
} }
type task struct { type task struct {

View File

@@ -3,10 +3,10 @@ package main
import ( import (
"fmt" "fmt"
"github.com/oarkflow/mq/utils" "github.com/oarkflow/form"
) )
func main() { func main() {
queryString := []byte("fields[0][method]=GET&fields[0][path]=/user/:id&fields[0][handlerMsg]=User Profile&fields[1][method]=POST&fields[1][path]=/user/create&fields[1][handlerMsg]=Create User") queryString := []byte("fields[0][method]=GET&fields[0][path]=/user/:id&fields[0][handlerMsg]=User Profile&fields[1][method]=POST&fields[1][path]=/user/create&fields[1][handlerMsg]=Create User")
fmt.Println(utils.DecodeForm(queryString)) fmt.Println(form.DecodeForm(queryString))
} }

View File

@@ -51,8 +51,8 @@ func (r Result) MarshalJSON() ([]byte, error) {
func (r *Result) UnmarshalJSON(data []byte) error { func (r *Result) UnmarshalJSON(data []byte) error {
type Alias Result type Alias Result
aux := &struct { aux := &struct {
ErrMsg string `json:"error,omitempty"`
*Alias *Alias
ErrMsg string `json:"error,omitempty"`
}{ }{
Alias: (*Alias)(r), Alias: (*Alias)(r),
} }
@@ -110,18 +110,18 @@ type TLSConfig struct {
} }
type Options struct { type Options struct {
storage TaskStorage
consumerOnSubscribe func(ctx context.Context, topic, consumerName string) consumerOnSubscribe func(ctx context.Context, topic, consumerName string)
consumerOnClose func(ctx context.Context, topic, consumerName string) consumerOnClose func(ctx context.Context, topic, consumerName string)
notifyResponse func(context.Context, Result) error notifyResponse func(context.Context, Result) error
tlsConfig TLSConfig
brokerAddr string brokerAddr string
tlsConfig TLSConfig
callback []func(context.Context, Result) Result callback []func(context.Context, Result) Result
maxRetries int queueSize int
initialDelay time.Duration initialDelay time.Duration
storage TaskStorage
maxBackoff time.Duration maxBackoff time.Duration
jitterPercent float64 jitterPercent float64
queueSize int maxRetries int
numOfWorkers int numOfWorkers int
maxMemoryLoad int64 maxMemoryLoad int64
syncMode bool syncMode bool

30
pool.go
View File

@@ -26,29 +26,29 @@ type Metrics struct {
type Pool struct { type Pool struct {
taskStorage TaskStorage taskStorage TaskStorage
taskQueue PriorityQueue scheduler *Scheduler
taskQueueLock sync.Mutex
stop chan struct{} stop chan struct{}
taskNotify chan struct{} taskNotify chan struct{}
workerAdjust chan int workerAdjust chan int
wg sync.WaitGroup
maxMemoryLoad int64
numOfWorkers int32
metrics Metrics
paused bool
scheduler *Scheduler
overflowBufferLock sync.RWMutex
overflowBuffer []*QueueTask
taskAvailableCond *sync.Cond
handler Handler handler Handler
callback Callback
batchSize int
timeout time.Duration
completionCallback CompletionCallback completionCallback CompletionCallback
taskAvailableCond *sync.Cond
callback Callback
taskQueue PriorityQueue
overflowBuffer []*QueueTask
metrics Metrics
wg sync.WaitGroup
taskCompletionNotifier sync.WaitGroup taskCompletionNotifier sync.WaitGroup
timeout time.Duration
batchSize int
maxMemoryLoad int64
idleTimeout time.Duration idleTimeout time.Duration
backoffDuration time.Duration backoffDuration time.Duration
maxRetries int // Max retries for tasks maxRetries int
overflowBufferLock sync.RWMutex
taskQueueLock sync.Mutex
numOfWorkers int32
paused bool
} }
func NewPool(numOfWorkers int, opts ...PoolOption) *Pool { func NewPool(numOfWorkers int, opts ...PoolOption) *Pool {

View File

@@ -12,8 +12,8 @@ import (
type ScheduleOptions struct { type ScheduleOptions struct {
Handler Handler Handler Handler
Callback Callback Callback Callback
Overlap bool
Interval time.Duration Interval time.Duration
Overlap bool
Recurring bool Recurring bool
} }
@@ -74,12 +74,12 @@ type ScheduledTask struct {
} }
type Schedule struct { type Schedule struct {
Interval time.Duration TimeOfDay time.Time
CronSpec string
DayOfWeek []time.Weekday DayOfWeek []time.Weekday
DayOfMonth []int DayOfMonth []int
TimeOfDay time.Time Interval time.Duration
Recurring bool Recurring bool
CronSpec string
} }
func (s *Schedule) ToHumanReadable() string { func (s *Schedule) ToHumanReadable() string {
@@ -216,9 +216,9 @@ func parseCronValue(field string) ([]string, error) {
} }
type Scheduler struct { type Scheduler struct {
pool *Pool
tasks []ScheduledTask tasks []ScheduledTask
mu sync.Mutex mu sync.Mutex
pool *Pool
} }
func (s *Scheduler) Start() { func (s *Scheduler) Start() {

View File

@@ -6,52 +6,52 @@ import (
) )
type hub struct { type hub struct {
sockets map[string]*Socket multihomeBackend Adapter
rooms map[string]*room joinRoomCh chan *joinRequest
shutdownCh chan bool roomMsgCh chan *RoomMsg
socketList chan []*Socket socketList chan []*Socket
addCh chan *Socket addCh chan *Socket
delCh chan *Socket delCh chan *Socket
joinRoomCh chan *joinRequest sockets map[string]*Socket
leaveRoomCh chan *leaveRequest leaveRoomCh chan *leaveRequest
roomMsgCh chan *RoomMsg shutdownCh chan bool
broomcastCh chan *RoomMsg // for passing data from the backend broomcastCh chan *RoomMsg
broadcastCh chan *BroadcastMsg broadcastCh chan *BroadcastMsg
bbroadcastCh chan *BroadcastMsg bbroadcastCh chan *BroadcastMsg
multihomeEnabled bool rooms map[string]*room
multihomeBackend Adapter
l sync.RWMutex l sync.RWMutex
multihomeEnabled bool
} }
type room struct { type room struct {
name string
sockets map[string]*Socket sockets map[string]*Socket
name string
l sync.RWMutex l sync.RWMutex
} }
type joinRequest struct { type joinRequest struct {
roomName string
socket *Socket socket *Socket
roomName string
} }
type leaveRequest struct { type leaveRequest struct {
roomName string
socket *Socket socket *Socket
roomName string
} }
// RoomMsg represents an event to be dispatched to a room of sockets // RoomMsg represents an event to be dispatched to a room of sockets
type RoomMsg struct { type RoomMsg struct {
RoomName string
Except []string
EventName string
Data any Data any
RoomName string
EventName string
Except []string
} }
// BroadcastMsg represents an event to be dispatched to all Sockets on the Server // BroadcastMsg represents an event to be dispatched to all Sockets on the Server
type BroadcastMsg struct { type BroadcastMsg struct {
Data any
EventName string EventName string
Except []string Except []string
Data any
} }
func (h *hub) addSocket(s *Socket) { func (h *hub) addSocket(s *Socket) {

View File

@@ -2,7 +2,6 @@ package sio
import ( import (
"context" "context"
"github.com/gorilla/websocket"
"io" "io"
"log/slog" "log/slog"
"net/http" "net/http"
@@ -12,6 +11,8 @@ import (
"sync" "sync"
"syscall" "syscall"
"time" "time"
"github.com/gorilla/websocket"
) )
const ( // ASCII chars const ( // ASCII chars
@@ -23,8 +24,8 @@ const ( // ASCII chars
) )
type event struct { type event struct {
eventName string
eventHandler func(*Socket, []byte) eventHandler func(*Socket, []byte)
eventName string
} }
// Config specifies parameters for upgrading an HTTP connection to a // Config specifies parameters for upgrading an HTTP connection to a
@@ -32,13 +33,14 @@ type event struct {
// //
// It is safe to call Config's methods concurrently. // It is safe to call Config's methods concurrently.
type Config struct { type Config struct {
HandshakeTimeout time.Duration WriteBufferPool websocket.BufferPool
ReadBufferSize, WriteBufferSize int Error func(w http.ResponseWriter, r *http.Request, status int, reason error)
WriteBufferPool websocket.BufferPool CheckOrigin func(r *http.Request) bool
Subprotocols []string Subprotocols []string
Error func(w http.ResponseWriter, r *http.Request, status int, reason error) HandshakeTimeout time.Duration
CheckOrigin func(r *http.Request) bool ReadBufferSize int
EnableCompression bool WriteBufferSize int
EnableCompression bool
} }
// Server manages the coordination between // Server manages the coordination between
@@ -182,7 +184,7 @@ type EventHandler interface {
// Any event functions registered with On, must be safe for concurrent use by multiple // Any event functions registered with On, must be safe for concurrent use by multiple
// go routines // go routines
func (serv *Server) On(eventName string, handleFunc func(*Socket, []byte)) { func (serv *Server) On(eventName string, handleFunc func(*Socket, []byte)) {
serv.events[eventName] = &event{eventName, handleFunc} // you think you can handle the func? serv.events[eventName] = &event{eventName: eventName, eventHandler: handleFunc} // you think you can handle the func?
} }
// OnEvent has the same functionality as On, but accepts // OnEvent has the same functionality as On, but accepts

View File

@@ -22,18 +22,18 @@ var (
// Socket represents a websocket connection // Socket represents a websocket connection
type Socket struct { type Socket struct {
context storage.IMap[string, any]
l *sync.RWMutex l *sync.RWMutex
id string
ws *websocket.Conn ws *websocket.Conn
closed bool
serv *Server serv *Server
roomsl *sync.RWMutex roomsl *sync.RWMutex
request *http.Request request *http.Request
context storage.IMap[string, any]
rooms map[string]bool rooms map[string]bool
pingTicker *time.Ticker pingTicker *time.Ticker
tickerDone chan bool tickerDone chan bool
id string
pingInterval time.Duration pingInterval time.Duration
closed bool
} }
const ( const (
@@ -139,7 +139,7 @@ func (s *Socket) GetRooms() []string {
func (s *Socket) Join(roomName string) { func (s *Socket) Join(roomName string) {
s.roomsl.Lock() s.roomsl.Lock()
defer s.roomsl.Unlock() defer s.roomsl.Unlock()
s.serv.hub.joinRoom(&joinRequest{roomName, s}) s.serv.hub.joinRoom(&joinRequest{roomName: roomName, socket: s})
s.rooms[roomName] = true s.rooms[roomName] = true
} }
@@ -149,7 +149,7 @@ func (s *Socket) Join(roomName string) {
func (s *Socket) Leave(roomName string) { func (s *Socket) Leave(roomName string) {
s.roomsl.Lock() s.roomsl.Lock()
defer s.roomsl.Unlock() defer s.roomsl.Unlock()
s.serv.hub.leaveRoom(&leaveRequest{roomName, s}) s.serv.hub.leaveRoom(&leaveRequest{roomName: roomName, socket: s})
delete(s.rooms, roomName) delete(s.rooms, roomName)
} }