diff --git a/broker.go b/broker.go index 49ec808..9b0b497 100644 --- a/broker.go +++ b/broker.go @@ -263,18 +263,40 @@ func (b *Broker) Start(ctx context.Context) error { log.Println("BROKER - RUNNING ~> started on", b.opts.brokerAddr) } defer listener.Close() + + // Limit the number of concurrent connections + const maxConcurrentConnections = 100 + sem := make(chan struct{}, maxConcurrentConnections) + for { conn, err := listener.Accept() if err != nil { b.OnError(ctx, conn, err) + time.Sleep(50 * time.Millisecond) // Slow down retry on errors continue } + + // Control concurrency by using a semaphore + sem <- struct{}{} go func(c net.Conn) { - defer c.Close() + defer func() { + <-sem // Release semaphore + c.Close() + }() + + // Optionally set connection timeouts to prevent idle connections + c.SetReadDeadline(time.Now().Add(5 * time.Minute)) + for { + // Attempt to read the message err := b.readMessage(ctx, c) if err != nil { - break + if netErr, ok := err.(net.Error); ok && netErr.Temporary() { + log.Println("Temporary network error, retrying:", netErr) + continue // Retry on temporary errors + } + log.Println("Connection closed due to error:", err) + break // Break the loop and close the connection } } }(conn) diff --git a/consumer.go b/consumer.go index 062fc97..197c097 100644 --- a/consumer.go +++ b/consumer.go @@ -146,6 +146,7 @@ func (c *Consumer) ConsumeMessage(ctx context.Context, msg *codec.Message, conn } func (c *Consumer) ProcessTask(ctx context.Context, msg *Task) Result { + defer RecoverPanic(RecoverTitle) queue, _ := GetQueue(ctx) if msg.Topic == "" && queue != "" { msg.Topic = queue diff --git a/ctx.go b/ctx.go index 60c995e..d9ee35e 100644 --- a/ctx.go +++ b/ctx.go @@ -8,6 +8,7 @@ import ( "net" "os" + "github.com/oarkflow/errors" "github.com/oarkflow/xid" "github.com/oarkflow/mq/consts" @@ -126,3 +127,7 @@ func GetConnection(addr string, config TLSConfig) (net.Conn, error) { return net.Dial("tcp", addr) } } + +func WrapError(err error, msg, op string) error { + return errors.Wrap(err, msg, op) +} diff --git a/dag/api.go b/dag/api.go index 130e5d7..b8bbdb5 100644 --- a/dag/api.go +++ b/dag/api.go @@ -10,6 +10,7 @@ import ( "github.com/oarkflow/mq" "github.com/oarkflow/mq/consts" + "github.com/oarkflow/mq/metrics" ) type Request struct { @@ -21,6 +22,7 @@ type Request struct { } func (tm *DAG) Handlers() { + metrics.HandleHTTP() http.HandleFunc("POST /request", tm.Request) http.HandleFunc("POST /publish", tm.Publish) http.HandleFunc("POST /schedule", tm.Schedule) diff --git a/dag/dag.go b/dag/dag.go index 9b6c900..0e7a863 100644 --- a/dag/dag.go +++ b/dag/dag.go @@ -12,6 +12,7 @@ import ( "github.com/oarkflow/mq" "github.com/oarkflow/mq/consts" + "github.com/oarkflow/mq/metrics" ) type EdgeType int @@ -180,15 +181,19 @@ func (tm *DAG) GetStartNode() string { } func (tm *DAG) Start(ctx context.Context, addr string) error { + // Start the server in a separate goroutine + go func() { + defer mq.RecoverPanic(mq.RecoverTitle) + if err := tm.server.Start(ctx); err != nil { + panic(err) + } + }() + + // Start the node consumers if not in sync mode if !tm.server.SyncMode() { - go func() { - err := tm.server.Start(ctx) - if err != nil { - panic(err) - } - }() for _, con := range tm.nodes { go func(con *Node) { + defer mq.RecoverPanic(mq.RecoverTitle) limiter := rate.NewLimiter(rate.Every(1*time.Second), 1) // Retry every second for { err := con.processor.Consume(ctx) @@ -317,11 +322,20 @@ func (tm *DAG) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { if tm.consumer != nil { initialNode, err := tm.parseInitialNode(ctx) if err != nil { + metrics.TasksErrors.WithLabelValues("unknown").Inc() // Increase error count return mq.Result{Error: err} } task.Topic = initialNode } - return manager.processTask(ctx, task.Topic, task.Payload) + result := manager.processTask(ctx, task.Topic, task.Payload) + + if result.Error != nil { + metrics.TasksErrors.WithLabelValues(task.Topic).Inc() // Increase error count + } else { + metrics.TasksProcessed.WithLabelValues("success").Inc() // Increase processed task count + } + + return result } func (tm *DAG) Process(ctx context.Context, payload []byte) mq.Result { diff --git a/dag/task_manager.go b/dag/task_manager.go index 93a235d..1af2296 100644 --- a/dag/task_manager.go +++ b/dag/task_manager.go @@ -39,6 +39,7 @@ func (tm *TaskManager) updateTS(result *mq.Result) { } func (tm *TaskManager) processTask(ctx context.Context, nodeID string, payload json.RawMessage) mq.Result { + defer mq.RecoverPanic(mq.RecoverTitle) node, ok := tm.dag.nodes[nodeID] if !ok { return mq.Result{Error: fmt.Errorf("nodeID %s not found", nodeID)} @@ -178,6 +179,7 @@ func (tm *TaskManager) appendFinalResult(result mq.Result) { } func (tm *TaskManager) processNode(ctx context.Context, node *Node, payload json.RawMessage) { + defer mq.RecoverPanic(mq.RecoverTitle) dag, isDAG := isDAGNode(node) if isDAG { if tm.dag.server.SyncMode() && !dag.server.SyncMode() { diff --git a/examples/dag.go b/examples/dag.go index f483070..7583c69 100644 --- a/examples/dag.go +++ b/examples/dag.go @@ -13,7 +13,7 @@ import ( ) func main() { - Sync() + // Sync() aSync() } diff --git a/examples/tasks/operations.go b/examples/tasks/operations.go index be5158f..86cd592 100644 --- a/examples/tasks/operations.go +++ b/examples/tasks/operations.go @@ -94,6 +94,7 @@ type StoreData struct { } func (e *StoreData) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + panic("panic on store") return mq.Result{Payload: task.Payload, Ctx: ctx} } diff --git a/go.mod b/go.mod index 7b82534..ea15c29 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,19 @@ require ( github.com/oarkflow/expr v0.0.11 github.com/oarkflow/json v0.0.13 github.com/oarkflow/xid v1.2.5 + github.com/prometheus/client_golang v1.20.5 golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c golang.org/x/time v0.7.0 ) + +require ( + github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/klauspost/compress v1.17.9 // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/prometheus/client_model v0.6.1 // indirect + github.com/prometheus/common v0.55.0 // indirect + github.com/prometheus/procfs v0.15.1 // indirect + golang.org/x/sys v0.22.0 // indirect + google.golang.org/protobuf v1.34.2 // indirect +) diff --git a/go.sum b/go.sum index 00f370b..95e7b67 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,15 @@ +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= +github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/oarkflow/date v0.0.4 h1:EwY/wiS3CqZNBx7b2x+3kkJwVNuGk+G0dls76kL/fhU= github.com/oarkflow/date v0.0.4/go.mod h1:xQTFc6p6O5VX6J75ZrPJbelIFGca1ASmhpgirFqL8vM= github.com/oarkflow/dipper v0.0.6 h1:E+ak9i4R1lxx0B04CjfG5DTLTmwuWA1nrdS6KIHdUxQ= @@ -10,7 +22,19 @@ github.com/oarkflow/json v0.0.13 h1:/ZKW924/v4U1ht34WY7rj/GC/qW9+10IiV5+MR2vO0A= github.com/oarkflow/json v0.0.13/go.mod h1:S5BZA4/rM87+MY8mFrga3jISzxCL9RtLE6xHSk63VxI= github.com/oarkflow/xid v1.2.5 h1:6RcNJm9+oZ/B647gkME9trCzhpxGQaSdNoD56Vmkeho= github.com/oarkflow/xid v1.2.5/go.mod h1:jG4YBh+swbjlWApGWDBYnsJEa7hi3CCpmuqhB3RAxVo= +github.com/prometheus/client_golang v1.20.5 h1:cxppBPuYhUnsO6yo/aoRol4L7q7UFfdm+bR9r+8l63Y= +github.com/prometheus/client_golang v1.20.5/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE= +github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= +github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= +github.com/prometheus/common v0.55.0 h1:KEi6DK7lXW/m7Ig5i47x0vRzuBsHuvJdi5ee6Y3G1dc= +github.com/prometheus/common v0.55.0/go.mod h1:2SECS4xJG1kd8XF9IcM1gMX6510RAEL65zxzNImwdc8= +github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= +github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c h1:7dEasQXItcW1xKJ2+gg5VOiBnqWrJc+rq0DPKyvvdbY= golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c/go.mod h1:NQtJDoLvd6faHhE7m4T/1IY708gDefGGjR/iUW8yQQ8= +golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= +golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/time v0.7.0 h1:ntUhktv3OPE6TgYxXWv9vKvUSJyIFJlyohwbkEwPrKQ= golang.org/x/time v0.7.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= +google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= diff --git a/metrics/metrics.go b/metrics/metrics.go new file mode 100644 index 0000000..69682da --- /dev/null +++ b/metrics/metrics.go @@ -0,0 +1,34 @@ +package metrics + +import ( + "net/http" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +var ( + TasksProcessed = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "tasks_processed_total", + Help: "Total number of processed tasks.", + }, + []string{"status"}, + ) + TasksErrors = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "tasks_errors_total", + Help: "Total number of errors encountered while processing tasks.", + }, + []string{"node"}, + ) +) + +func init() { + prometheus.MustRegister(TasksProcessed) + prometheus.MustRegister(TasksErrors) +} + +func HandleHTTP() { + http.Handle("/metrics", promhttp.Handler()) +} diff --git a/recover.go b/recover.go new file mode 100644 index 0000000..8fad4d8 --- /dev/null +++ b/recover.go @@ -0,0 +1,39 @@ +package mq + +import ( + "fmt" + "log" + "runtime" + "runtime/debug" + + "github.com/oarkflow/mq/metrics" +) + +func RecoverPanic(labelGenerator func() string) { + if r := recover(); r != nil { + pc, file, line, ok := runtime.Caller(2) + funcName := "unknown" + if ok { + fn := runtime.FuncForPC(pc) + if fn != nil { + funcName = fn.Name() + } + } + log.Printf("[PANIC] - recovered from panic in %s (%s:%d): %v\nStack trace: %s", funcName, file, line, r, debug.Stack()) + label := labelGenerator() + metrics.TasksErrors.WithLabelValues(label).Inc() + } +} + +func RecoverTitle() string { + pc, _, line, ok := runtime.Caller(1) + if !ok { + return "Unknown" + } + fn := runtime.FuncForPC(pc) + funcName := "unknown" + if fn != nil { + funcName = fn.Name() + } + return fmt.Sprintf("%s:%d", funcName, line) +}