mirror of
https://github.com/oarkflow/mq.git
synced 2025-09-27 04:15:52 +08:00
feat: Add connection
This commit is contained in:
26
broker.go
26
broker.go
@@ -263,18 +263,40 @@ func (b *Broker) Start(ctx context.Context) error {
|
||||
log.Println("BROKER - RUNNING ~> started on", b.opts.brokerAddr)
|
||||
}
|
||||
defer listener.Close()
|
||||
|
||||
// Limit the number of concurrent connections
|
||||
const maxConcurrentConnections = 100
|
||||
sem := make(chan struct{}, maxConcurrentConnections)
|
||||
|
||||
for {
|
||||
conn, err := listener.Accept()
|
||||
if err != nil {
|
||||
b.OnError(ctx, conn, err)
|
||||
time.Sleep(50 * time.Millisecond) // Slow down retry on errors
|
||||
continue
|
||||
}
|
||||
|
||||
// Control concurrency by using a semaphore
|
||||
sem <- struct{}{}
|
||||
go func(c net.Conn) {
|
||||
defer c.Close()
|
||||
defer func() {
|
||||
<-sem // Release semaphore
|
||||
c.Close()
|
||||
}()
|
||||
|
||||
// Optionally set connection timeouts to prevent idle connections
|
||||
c.SetReadDeadline(time.Now().Add(5 * time.Minute))
|
||||
|
||||
for {
|
||||
// Attempt to read the message
|
||||
err := b.readMessage(ctx, c)
|
||||
if err != nil {
|
||||
break
|
||||
if netErr, ok := err.(net.Error); ok && netErr.Temporary() {
|
||||
log.Println("Temporary network error, retrying:", netErr)
|
||||
continue // Retry on temporary errors
|
||||
}
|
||||
log.Println("Connection closed due to error:", err)
|
||||
break // Break the loop and close the connection
|
||||
}
|
||||
}
|
||||
}(conn)
|
||||
|
@@ -146,6 +146,7 @@ func (c *Consumer) ConsumeMessage(ctx context.Context, msg *codec.Message, conn
|
||||
}
|
||||
|
||||
func (c *Consumer) ProcessTask(ctx context.Context, msg *Task) Result {
|
||||
defer RecoverPanic(RecoverTitle)
|
||||
queue, _ := GetQueue(ctx)
|
||||
if msg.Topic == "" && queue != "" {
|
||||
msg.Topic = queue
|
||||
|
5
ctx.go
5
ctx.go
@@ -8,6 +8,7 @@ import (
|
||||
"net"
|
||||
"os"
|
||||
|
||||
"github.com/oarkflow/errors"
|
||||
"github.com/oarkflow/xid"
|
||||
|
||||
"github.com/oarkflow/mq/consts"
|
||||
@@ -126,3 +127,7 @@ func GetConnection(addr string, config TLSConfig) (net.Conn, error) {
|
||||
return net.Dial("tcp", addr)
|
||||
}
|
||||
}
|
||||
|
||||
func WrapError(err error, msg, op string) error {
|
||||
return errors.Wrap(err, msg, op)
|
||||
}
|
||||
|
@@ -10,6 +10,7 @@ import (
|
||||
|
||||
"github.com/oarkflow/mq"
|
||||
"github.com/oarkflow/mq/consts"
|
||||
"github.com/oarkflow/mq/metrics"
|
||||
)
|
||||
|
||||
type Request struct {
|
||||
@@ -21,6 +22,7 @@ type Request struct {
|
||||
}
|
||||
|
||||
func (tm *DAG) Handlers() {
|
||||
metrics.HandleHTTP()
|
||||
http.HandleFunc("POST /request", tm.Request)
|
||||
http.HandleFunc("POST /publish", tm.Publish)
|
||||
http.HandleFunc("POST /schedule", tm.Schedule)
|
||||
|
22
dag/dag.go
22
dag/dag.go
@@ -12,6 +12,7 @@ import (
|
||||
|
||||
"github.com/oarkflow/mq"
|
||||
"github.com/oarkflow/mq/consts"
|
||||
"github.com/oarkflow/mq/metrics"
|
||||
)
|
||||
|
||||
type EdgeType int
|
||||
@@ -180,15 +181,19 @@ func (tm *DAG) GetStartNode() string {
|
||||
}
|
||||
|
||||
func (tm *DAG) Start(ctx context.Context, addr string) error {
|
||||
if !tm.server.SyncMode() {
|
||||
// Start the server in a separate goroutine
|
||||
go func() {
|
||||
err := tm.server.Start(ctx)
|
||||
if err != nil {
|
||||
defer mq.RecoverPanic(mq.RecoverTitle)
|
||||
if err := tm.server.Start(ctx); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}()
|
||||
|
||||
// Start the node consumers if not in sync mode
|
||||
if !tm.server.SyncMode() {
|
||||
for _, con := range tm.nodes {
|
||||
go func(con *Node) {
|
||||
defer mq.RecoverPanic(mq.RecoverTitle)
|
||||
limiter := rate.NewLimiter(rate.Every(1*time.Second), 1) // Retry every second
|
||||
for {
|
||||
err := con.processor.Consume(ctx)
|
||||
@@ -317,11 +322,20 @@ func (tm *DAG) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
|
||||
if tm.consumer != nil {
|
||||
initialNode, err := tm.parseInitialNode(ctx)
|
||||
if err != nil {
|
||||
metrics.TasksErrors.WithLabelValues("unknown").Inc() // Increase error count
|
||||
return mq.Result{Error: err}
|
||||
}
|
||||
task.Topic = initialNode
|
||||
}
|
||||
return manager.processTask(ctx, task.Topic, task.Payload)
|
||||
result := manager.processTask(ctx, task.Topic, task.Payload)
|
||||
|
||||
if result.Error != nil {
|
||||
metrics.TasksErrors.WithLabelValues(task.Topic).Inc() // Increase error count
|
||||
} else {
|
||||
metrics.TasksProcessed.WithLabelValues("success").Inc() // Increase processed task count
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
func (tm *DAG) Process(ctx context.Context, payload []byte) mq.Result {
|
||||
|
@@ -39,6 +39,7 @@ func (tm *TaskManager) updateTS(result *mq.Result) {
|
||||
}
|
||||
|
||||
func (tm *TaskManager) processTask(ctx context.Context, nodeID string, payload json.RawMessage) mq.Result {
|
||||
defer mq.RecoverPanic(mq.RecoverTitle)
|
||||
node, ok := tm.dag.nodes[nodeID]
|
||||
if !ok {
|
||||
return mq.Result{Error: fmt.Errorf("nodeID %s not found", nodeID)}
|
||||
@@ -178,6 +179,7 @@ func (tm *TaskManager) appendFinalResult(result mq.Result) {
|
||||
}
|
||||
|
||||
func (tm *TaskManager) processNode(ctx context.Context, node *Node, payload json.RawMessage) {
|
||||
defer mq.RecoverPanic(mq.RecoverTitle)
|
||||
dag, isDAG := isDAGNode(node)
|
||||
if isDAG {
|
||||
if tm.dag.server.SyncMode() && !dag.server.SyncMode() {
|
||||
|
@@ -13,7 +13,7 @@ import (
|
||||
)
|
||||
|
||||
func main() {
|
||||
Sync()
|
||||
// Sync()
|
||||
aSync()
|
||||
}
|
||||
|
||||
|
@@ -94,6 +94,7 @@ type StoreData struct {
|
||||
}
|
||||
|
||||
func (e *StoreData) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
|
||||
panic("panic on store")
|
||||
return mq.Result{Payload: task.Payload, Ctx: ctx}
|
||||
}
|
||||
|
||||
|
13
go.mod
13
go.mod
@@ -9,6 +9,19 @@ require (
|
||||
github.com/oarkflow/expr v0.0.11
|
||||
github.com/oarkflow/json v0.0.13
|
||||
github.com/oarkflow/xid v1.2.5
|
||||
github.com/prometheus/client_golang v1.20.5
|
||||
golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c
|
||||
golang.org/x/time v0.7.0
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/beorn7/perks v1.0.1 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.3.0 // indirect
|
||||
github.com/klauspost/compress v1.17.9 // indirect
|
||||
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
|
||||
github.com/prometheus/client_model v0.6.1 // indirect
|
||||
github.com/prometheus/common v0.55.0 // indirect
|
||||
github.com/prometheus/procfs v0.15.1 // indirect
|
||||
golang.org/x/sys v0.22.0 // indirect
|
||||
google.golang.org/protobuf v1.34.2 // indirect
|
||||
)
|
||||
|
24
go.sum
24
go.sum
@@ -1,3 +1,15 @@
|
||||
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
|
||||
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
|
||||
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
|
||||
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
|
||||
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||
github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA=
|
||||
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
|
||||
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
|
||||
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
|
||||
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
|
||||
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
|
||||
github.com/oarkflow/date v0.0.4 h1:EwY/wiS3CqZNBx7b2x+3kkJwVNuGk+G0dls76kL/fhU=
|
||||
github.com/oarkflow/date v0.0.4/go.mod h1:xQTFc6p6O5VX6J75ZrPJbelIFGca1ASmhpgirFqL8vM=
|
||||
github.com/oarkflow/dipper v0.0.6 h1:E+ak9i4R1lxx0B04CjfG5DTLTmwuWA1nrdS6KIHdUxQ=
|
||||
@@ -10,7 +22,19 @@ github.com/oarkflow/json v0.0.13 h1:/ZKW924/v4U1ht34WY7rj/GC/qW9+10IiV5+MR2vO0A=
|
||||
github.com/oarkflow/json v0.0.13/go.mod h1:S5BZA4/rM87+MY8mFrga3jISzxCL9RtLE6xHSk63VxI=
|
||||
github.com/oarkflow/xid v1.2.5 h1:6RcNJm9+oZ/B647gkME9trCzhpxGQaSdNoD56Vmkeho=
|
||||
github.com/oarkflow/xid v1.2.5/go.mod h1:jG4YBh+swbjlWApGWDBYnsJEa7hi3CCpmuqhB3RAxVo=
|
||||
github.com/prometheus/client_golang v1.20.5 h1:cxppBPuYhUnsO6yo/aoRol4L7q7UFfdm+bR9r+8l63Y=
|
||||
github.com/prometheus/client_golang v1.20.5/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE=
|
||||
github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E=
|
||||
github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY=
|
||||
github.com/prometheus/common v0.55.0 h1:KEi6DK7lXW/m7Ig5i47x0vRzuBsHuvJdi5ee6Y3G1dc=
|
||||
github.com/prometheus/common v0.55.0/go.mod h1:2SECS4xJG1kd8XF9IcM1gMX6510RAEL65zxzNImwdc8=
|
||||
github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc=
|
||||
github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
|
||||
golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c h1:7dEasQXItcW1xKJ2+gg5VOiBnqWrJc+rq0DPKyvvdbY=
|
||||
golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c/go.mod h1:NQtJDoLvd6faHhE7m4T/1IY708gDefGGjR/iUW8yQQ8=
|
||||
golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI=
|
||||
golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
golang.org/x/time v0.7.0 h1:ntUhktv3OPE6TgYxXWv9vKvUSJyIFJlyohwbkEwPrKQ=
|
||||
golang.org/x/time v0.7.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
|
||||
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
|
||||
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
|
||||
|
34
metrics/metrics.go
Normal file
34
metrics/metrics.go
Normal file
@@ -0,0 +1,34 @@
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
)
|
||||
|
||||
var (
|
||||
TasksProcessed = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Name: "tasks_processed_total",
|
||||
Help: "Total number of processed tasks.",
|
||||
},
|
||||
[]string{"status"},
|
||||
)
|
||||
TasksErrors = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Name: "tasks_errors_total",
|
||||
Help: "Total number of errors encountered while processing tasks.",
|
||||
},
|
||||
[]string{"node"},
|
||||
)
|
||||
)
|
||||
|
||||
func init() {
|
||||
prometheus.MustRegister(TasksProcessed)
|
||||
prometheus.MustRegister(TasksErrors)
|
||||
}
|
||||
|
||||
func HandleHTTP() {
|
||||
http.Handle("/metrics", promhttp.Handler())
|
||||
}
|
39
recover.go
Normal file
39
recover.go
Normal file
@@ -0,0 +1,39 @@
|
||||
package mq
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"runtime"
|
||||
"runtime/debug"
|
||||
|
||||
"github.com/oarkflow/mq/metrics"
|
||||
)
|
||||
|
||||
func RecoverPanic(labelGenerator func() string) {
|
||||
if r := recover(); r != nil {
|
||||
pc, file, line, ok := runtime.Caller(2)
|
||||
funcName := "unknown"
|
||||
if ok {
|
||||
fn := runtime.FuncForPC(pc)
|
||||
if fn != nil {
|
||||
funcName = fn.Name()
|
||||
}
|
||||
}
|
||||
log.Printf("[PANIC] - recovered from panic in %s (%s:%d): %v\nStack trace: %s", funcName, file, line, r, debug.Stack())
|
||||
label := labelGenerator()
|
||||
metrics.TasksErrors.WithLabelValues(label).Inc()
|
||||
}
|
||||
}
|
||||
|
||||
func RecoverTitle() string {
|
||||
pc, _, line, ok := runtime.Caller(1)
|
||||
if !ok {
|
||||
return "Unknown"
|
||||
}
|
||||
fn := runtime.FuncForPC(pc)
|
||||
funcName := "unknown"
|
||||
if fn != nil {
|
||||
funcName = fn.Name()
|
||||
}
|
||||
return fmt.Sprintf("%s:%d", funcName, line)
|
||||
}
|
Reference in New Issue
Block a user