Mirror of https://github.com/oarkflow/mq.git (synced 2025-10-05 07:57:00 +08:00)
fix: ui for DOT
broker.go (18 lines changed)
@@ -38,7 +38,7 @@ type Broker struct {
 	queues     storage.IMap[string, *Queue]
 	consumers  storage.IMap[string, *consumer]
 	publishers storage.IMap[string, *publisher]
-	deadLetter storage.IMap[string, *Queue] // DLQ mapping for each queue
+	deadLetter storage.IMap[string, *Queue]
 	opts       *Options
 }

@@ -241,7 +241,6 @@ func (b *Broker) SubscribeHandler(ctx context.Context, conn net.Conn, msg *codec
 func (b *Broker) Start(ctx context.Context) error {
 	var listener net.Listener
 	var err error

 	if b.opts.tlsConfig.UseTLS {
 		cert, err := tls.LoadX509KeyPair(b.opts.tlsConfig.CertPath, b.opts.tlsConfig.KeyPath)
 		if err != nil {
@@ -263,37 +262,30 @@ func (b *Broker) Start(ctx context.Context) error {
 		log.Println("BROKER - RUNNING ~> started on", b.opts.brokerAddr)
 	}
 	defer listener.Close()

-	// Limit the number of concurrent connections
 	const maxConcurrentConnections = 100
 	sem := make(chan struct{}, maxConcurrentConnections)

 	for {
 		conn, err := listener.Accept()
 		if err != nil {
 			b.OnError(ctx, conn, err)
-			time.Sleep(50 * time.Millisecond) // Slow down retry on errors
+			time.Sleep(50 * time.Millisecond)
 			continue
 		}

-		// Control concurrency by using a semaphore
 		sem <- struct{}{}
 		go func(c net.Conn) {
 			defer func() {
-				<-sem // Release semaphore
+				<-sem
 				c.Close()
 			}()

 			for {
-				// Attempt to read the message
 				err := b.readMessage(ctx, c)
 				if err != nil {
 					if netErr, ok := err.(net.Error); ok && netErr.Temporary() {
 						log.Println("Temporary network error, retrying:", netErr)
-						continue // Retry on temporary errors
+						continue
 					}
 					log.Println("Connection closed due to error:", err)
-					break // Break the loop and close the connection
+					break
 				}
 			}
 		}(conn)
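For readers skimming the diff: Broker.Start (above) bounds concurrent connections with a buffered channel used as a counting semaphore, and this commit only strips the inline comments around that logic. The standalone sketch below restates the pattern with placeholder names (serve, handleConn); it is an illustration, not code from this repository.

```go
package main

import (
	"log"
	"net"
	"time"
)

const maxConcurrentConnections = 100

// serve accepts connections and hands each one to a goroutine, but only
// after reserving a slot in the semaphore channel, so at most
// maxConcurrentConnections handlers are alive at any moment.
func serve(ln net.Listener) {
	sem := make(chan struct{}, maxConcurrentConnections)
	for {
		conn, err := ln.Accept()
		if err != nil {
			log.Println("accept error:", err)
			time.Sleep(50 * time.Millisecond) // brief back-off before retrying
			continue
		}
		sem <- struct{}{} // blocks once the limit is reached
		go func(c net.Conn) {
			defer func() {
				<-sem // release the slot
				c.Close()
			}()
			handleConn(c)
		}(conn)
	}
}

// handleConn stands in for the broker's per-connection read loop.
func handleConn(c net.Conn) {
	buf := make([]byte, 4096)
	for {
		if _, err := c.Read(buf); err != nil {
			return // connection closed or errored
		}
	}
}

func main() {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		log.Fatal(err)
	}
	log.Println("listening on", ln.Addr())
	serve(ln)
}
```

Taking the slot before spawning the goroutine means Accept itself stalls once the pool is full, which is what actually enforces the cap.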
@@ -56,7 +56,6 @@ func SendMessage(ctx context.Context, conn net.Conn, msg *Message) error {
 	if err != nil {
 		return err
 	}

 	totalLength := 4 + len(data)
 	buffer := byteBufferPool.Get().([]byte)
 	if cap(buffer) < totalLength {
@@ -65,10 +64,8 @@ func SendMessage(ctx context.Context, conn net.Conn, msg *Message) error {
 		buffer = buffer[:totalLength]
 	}
 	defer byteBufferPool.Put(buffer)

 	binary.BigEndian.PutUint32(buffer[:4], uint32(len(data)))
 	copy(buffer[4:], data)

 	writer := bufio.NewWriter(conn)
 	select {
 	case <-ctx.Done():
@@ -78,7 +75,6 @@ func SendMessage(ctx context.Context, conn net.Conn, msg *Message) error {
 			return err
 		}
 	}

 	return writer.Flush()
 }
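SendMessage, as shown in the hunks above, frames every message as a 4-byte big-endian length prefix followed by the encoded payload. The sketch below demonstrates that framing with hypothetical writeFrame/readFrame helpers; the broker's own readMessage is not part of this diff, so the read side here is an assumption about the wire format rather than the repository's implementation.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// writeFrame mirrors the SendMessage layout: 4-byte big-endian length, then payload.
func writeFrame(w io.Writer, data []byte) error {
	var prefix [4]byte
	binary.BigEndian.PutUint32(prefix[:], uint32(len(data)))
	if _, err := w.Write(prefix[:]); err != nil {
		return err
	}
	_, err := w.Write(data)
	return err
}

// readFrame reads one length-prefixed frame from r.
func readFrame(r io.Reader) ([]byte, error) {
	var prefix [4]byte
	if _, err := io.ReadFull(r, prefix[:]); err != nil {
		return nil, err
	}
	payload := make([]byte, binary.BigEndian.Uint32(prefix[:]))
	if _, err := io.ReadFull(r, payload); err != nil {
		return nil, err
	}
	return payload, nil
}

func main() {
	var buf bytes.Buffer
	_ = writeFrame(&buf, []byte(`{"topic":"demo"}`))
	frame, _ := readFrame(&buf)
	fmt.Println(string(frame)) // {"topic":"demo"}
}
```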
@@ -71,6 +71,10 @@ func (c *Consumer) SetKey(key string) {
 	c.id = key
 }

+func (c *Consumer) Metrics() Metrics {
+	return c.pool.Metrics()
+}
+
 func (c *Consumer) subscribe(ctx context.Context, queue string) error {
 	headers := HeadersWithConsumerID(ctx, c.id)
 	msg := codec.NewMessage(consts.SUBSCRIBE, utils.ToByte("{}"), queue, headers)
dag/dag.go (40 lines changed)
@@ -358,24 +358,31 @@ func (tm *DAG) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
 	return result
 }

-func (tm *DAG) Process(ctx context.Context, payload []byte) mq.Result {
+func (tm *DAG) check(ctx context.Context, payload []byte) (context.Context, *mq.Task, error) {
 	tm.mu.RLock()
 	if tm.paused {
 		tm.mu.RUnlock()
-		return mq.Result{Error: fmt.Errorf("unable to process task, error: DAG is not accepting any task")}
+		return ctx, nil, fmt.Errorf("unable to process task, error: DAG is not accepting any task")
 	}
 	tm.mu.RUnlock()
 	if !tm.IsReady() {
-		return mq.Result{Error: fmt.Errorf("unable to process task, error: DAG is not ready yet")}
+		return ctx, nil, fmt.Errorf("unable to process task, error: DAG is not ready yet")
 	}
 	initialNode, err := tm.parseInitialNode(ctx)
 	if err != nil {
-		return mq.Result{Error: err}
+		return ctx, nil, err
 	}
 	if tm.server.SyncMode() {
 		ctx = mq.SetHeaders(ctx, map[string]string{consts.AwaitResponseKey: "true"})
 	}
-	task := mq.NewTask(mq.NewID(), payload, initialNode)
+	return ctx, mq.NewTask(mq.NewID(), payload, initialNode), nil
+}
+
+func (tm *DAG) Process(ctx context.Context, payload []byte) mq.Result {
+	ctx, task, err := tm.check(ctx, payload)
+	if err != nil {
+		return mq.Result{Error: fmt.Errorf("unable to process task, error: DAG is not accepting any task")}
+	}
 	awaitResponse, _ := mq.GetAwaitResponse(ctx)
 	if awaitResponse != "true" {
 		headers, ok := mq.GetHeaders(ctx)
@@ -384,38 +391,25 @@ func (tm *DAG) Process(ctx context.Context, payload []byte) mq.Result {
 			ctxx = mq.SetHeaders(ctxx, headers.AsMap())
 		}
 		if err := tm.pool.EnqueueTask(ctxx, task, 0); err != nil {
-			return mq.Result{CreatedAt: task.CreatedAt, TaskID: task.ID, Topic: initialNode, Status: "FAILED", Error: err}
+			return mq.Result{CreatedAt: task.CreatedAt, TaskID: task.ID, Topic: task.Topic, Status: "FAILED", Error: err}
 		}
-		return mq.Result{CreatedAt: task.CreatedAt, TaskID: task.ID, Topic: initialNode, Status: "PENDING"}
+		return mq.Result{CreatedAt: task.CreatedAt, TaskID: task.ID, Topic: task.Topic, Status: "PENDING"}
 	}
 	return tm.ProcessTask(ctx, task)
 }

 func (tm *DAG) ScheduleTask(ctx context.Context, payload []byte, opts ...mq.SchedulerOption) mq.Result {
-	tm.mu.RLock()
-	if tm.paused {
-		tm.mu.RUnlock()
+	ctx, task, err := tm.check(ctx, payload)
+	if err != nil {
 		return mq.Result{Error: fmt.Errorf("unable to process task, error: DAG is not accepting any task")}
 	}
-	tm.mu.RUnlock()
-	if !tm.IsReady() {
-		return mq.Result{Error: fmt.Errorf("unable to process task, error: DAG is not ready yet")}
-	}
-	initialNode, err := tm.parseInitialNode(ctx)
-	if err != nil {
-		return mq.Result{Error: err}
-	}
-	if tm.server.SyncMode() {
-		ctx = mq.SetHeaders(ctx, map[string]string{consts.AwaitResponseKey: "true"})
-	}
-	task := mq.NewTask(mq.NewID(), payload, initialNode)
 	headers, ok := mq.GetHeaders(ctx)
 	ctxx := context.Background()
 	if ok {
 		ctxx = mq.SetHeaders(ctxx, headers.AsMap())
 	}
 	tm.pool.Scheduler().AddTask(ctxx, task, opts...)
-	return mq.Result{CreatedAt: task.CreatedAt, TaskID: task.ID, Topic: initialNode, Status: "PENDING"}
+	return mq.Result{CreatedAt: task.CreatedAt, TaskID: task.ID, Topic: task.Topic, Status: "PENDING"}
 }

 func (tm *DAG) parseInitialNode(ctx context.Context) (string, error) {
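The net effect of the dag/dag.go changes above is that Process and ScheduleTask no longer duplicate their paused/readiness/initial-node checks: both delegate to the new check helper and read the topic from the returned task instead of a local initialNode. As a reading aid, here is the same "shared precondition helper" shape in miniature, using invented stand-in types (task, result, dag) rather than the mq API:

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-in types; the real code uses *mq.Task and mq.Result.
type task struct{ ID, Topic string }

type result struct {
	TaskID string
	Topic  string
	Status string
	Err    error
}

type dag struct {
	paused bool
	ready  bool
}

// check centralizes the preconditions both entry points need and
// builds the task exactly once.
func (d *dag) check(payload []byte) (*task, error) {
	if d.paused {
		return nil, errors.New("DAG is not accepting any task")
	}
	if !d.ready {
		return nil, errors.New("DAG is not ready yet")
	}
	return &task{ID: "task-1", Topic: "initial-node"}, nil
}

// Process enqueues immediately; ScheduleTask would hand the task to a
// scheduler, but both share the same validation path.
func (d *dag) Process(payload []byte) result {
	t, err := d.check(payload)
	if err != nil {
		return result{Err: err}
	}
	return result{TaskID: t.ID, Topic: t.Topic, Status: "PENDING"}
}

func (d *dag) ScheduleTask(payload []byte) result {
	t, err := d.check(payload)
	if err != nil {
		return result{Err: err}
	}
	return result{TaskID: t.ID, Topic: t.Topic, Status: "PENDING"}
}

func main() {
	d := &dag{ready: true}
	fmt.Printf("%+v\n", d.Process([]byte(`{}`)))
	fmt.Printf("%+v\n", d.ScheduleTask([]byte(`{}`)))
}
```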
@@ -14,6 +14,7 @@ import (
 type TaskManager struct {
 	createdAt   time.Time
 	processedAt time.Time
 	status      string
 	dag         *DAG
 	nodeResults map[string]mq.Result
 	wg          *WaitGroup