feat: add jsonparser
@@ -3,6 +3,7 @@ package main
import (
	"context"
	"fmt"
+	"time"

	mq "github.com/oarkflow/mq/v2"
)
@@ -14,7 +15,7 @@ func main() {
	}
	publisher := mq.NewPublisher("publish-1")
	// publisher := mq.NewPublisher("publish-1", mq.WithTLS(true, "./certs/server.crt", "./certs/server.key"))
-	err := publisher.Publish(context.Background(), "queue1", task)
+	err := publisher.Publish(context.Background(), task, "queue1")
	if err != nil {
		panic(err)
	}
@@ -23,9 +24,11 @@ func main() {
	task = mq.Task{
		Payload: payload,
	}
-	result := publisher.Request(context.Background(), "queue1", task)
-	if result.Error != nil {
-		panic(result.Error)
+	for i := 0; i < 100; i++ {
+		time.Sleep(500 * time.Millisecond)
+		err := publisher.Publish(context.Background(), task, "queue1")
+		if err != nil {
+			panic(err)
+		}
	}
-	fmt.Printf("Sync task published. Result: %v\n", string(result.Payload))
}
@@ -9,7 +9,7 @@ import (
)

func Node1(ctx context.Context, task mq.Task) mq.Result {
-	fmt.Println("Processing queue1")
+	fmt.Println("Processing queue1", task.ID)
	return mq.Result{Payload: task.Payload, MessageID: task.ID}
}

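The handler above now logs the ID of the task it is processing. Below is a hedged sketch of a second handler with the same signature, showing how the new jsonparser package could be used inside a handler. Node2, the "user" field, and the extra import are illustrative only and are not part of this commit; the surrounding example file already imports context, fmt, and mq.

// Requires "github.com/oarkflow/mq/jsonparser" to be added to the example's imports.
func Node2(ctx context.Context, task mq.Task) mq.Result {
	// Pull a single field out of the raw JSON payload without unmarshalling the whole document.
	user, err := jsonparser.GetString(task.Payload, "user")
	if err != nil {
		return mq.Result{Error: err, MessageID: task.ID}
	}
	fmt.Println("Processing queue2 for", user, "task", task.ID)
	return mq.Result{Payload: task.Payload, MessageID: task.ID}
}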
jsonparser/LICENSE (new file, 21 lines)
@@ -0,0 +1,21 @@
MIT License

Copyright (c) 2016 Leonid Bugaev

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
jsonparser/bytes.go (new file, 51 lines)
@@ -0,0 +1,51 @@
package jsonparser

const absMinInt64 = 1 << 63
const maxInt64 = 1<<63 - 1
const maxUint64 = 1<<64 - 1

// About 2x faster than strconv.ParseInt because it only supports base 10, which is enough for JSON
func parseInt(bytes []byte) (v int64, ok bool, overflow bool) {
	if len(bytes) == 0 {
		return 0, false, false
	}

	var neg bool = false
	if bytes[0] == '-' {
		neg = true
		bytes = bytes[1:]
	}

	var n uint64 = 0
	for idx, c := range bytes {
		if c < '0' || c > '9' {
			return 0, false, false
		}
		// Deal with invalid data such as "00"
		if n == 0 && idx != 0 {
			return 0, false, false
		}
		if n > maxUint64/10 {
			return 0, false, true
		}
		n *= 10
		n1 := n + uint64(c-'0')
		if n1 < n {
			return 0, false, true
		}
		n = n1
	}

	if n > maxInt64 {
		if neg && n == absMinInt64 {
			return -absMinInt64, true, false
		}
		return 0, false, true
	}

	if neg {
		return -int64(n), true, false
	} else {
		return int64(n), true, false
	}
}
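Not part of the diff: a small in-package sketch (for example in a scratch _test.go file) of what parseInt returns for a few representative inputs; the expected values follow directly from the code above.

package jsonparser

import "fmt"

func exampleParseInt() {
	v, ok, overflow := parseInt([]byte("123"))
	fmt.Println(v, ok, overflow) // 123 true false

	v, ok, overflow = parseInt([]byte("-9223372036854775808"))
	fmt.Println(v, ok, overflow) // -9223372036854775808 true false (exactly the int64 minimum)

	v, ok, overflow = parseInt([]byte("9223372036854775808"))
	fmt.Println(v, ok, overflow) // 0 false true (one past the int64 maximum overflows)

	v, ok, overflow = parseInt([]byte("007"))
	fmt.Println(v, ok, overflow) // 0 false false (leading zeros are rejected as invalid)
}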
jsonparser/bytes_safe.go (new file, 26 lines)
@@ -0,0 +1,26 @@
//go:build appengine || appenginevm
// +build appengine appenginevm

package jsonparser

import (
	"strconv"
)

// See bytes_unsafe.go for an explanation of why *[]byte is used (signatures must be consistent with those in that file)

func equalStr(b *[]byte, s string) bool {
	return string(*b) == s
}

func parseFloat(b *[]byte) (float64, error) {
	return strconv.ParseFloat(string(*b), 64)
}

func bytesToString(b *[]byte) string {
	return string(*b)
}

func StringToBytes(s string) []byte {
	return []byte(s)
}
jsonparser/bytes_unsafe.go (new file, 43 lines)
@@ -0,0 +1,43 @@
//go:build !appengine && !appenginevm
// +build !appengine,!appenginevm

package jsonparser

import (
	"reflect"
	"runtime"
	"strconv"
	"unsafe"
)

// The reason for using *[]byte rather than []byte in parameters is an optimization. As of Go 1.6,
// the compiler cannot perfectly inline the function when using a non-pointer slice. That is,
// the non-pointer []byte parameter version is slower than if its function body is manually
// inlined, whereas the pointer []byte version is as fast as the manually inlined
// version. Instruction count in assembly taken from "go tool compile" confirms this difference.
//
// TODO: Remove hack after Go 1.7 release
func equalStr(b *[]byte, s string) bool {
	return *(*string)(unsafe.Pointer(b)) == s
}

func parseFloat(b *[]byte) (float64, error) {
	return strconv.ParseFloat(*(*string)(unsafe.Pointer(b)), 64)
}

// A hack until issue golang/go#2632 is fixed.
// See: https://github.com/golang/go/issues/2632
func bytesToString(b *[]byte) string {
	return *(*string)(unsafe.Pointer(b))
}

func StringToBytes(s string) []byte {
	b := make([]byte, 0, 0)
	bh := (*reflect.SliceHeader)(unsafe.Pointer(&b))
	sh := (*reflect.StringHeader)(unsafe.Pointer(&s))
	bh.Data = sh.Data
	bh.Cap = sh.Len
	bh.Len = sh.Len
	runtime.KeepAlive(s)
	return b
}
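bytes_safe.go and bytes_unsafe.go expose the same helpers under opposite build tags. On regular (non-appengine) builds the StringToBytes above aliases the string's backing array instead of copying it, so the returned slice has to be treated as read-only; on appengine builds the safe variant simply copies. A short in-package sketch of that contract, not part of the diff:

package jsonparser

import "fmt"

func exampleStringToBytes() {
	key := "user.name"
	b := StringToBytes(key) // zero-copy on unsafe builds: b shares key's memory
	fmt.Println(len(b), cap(b), string(b)) // 9 9 user.name

	// Do not write to b: on unsafe builds it aliases an immutable string.
	// On appengine builds the safe variant returns an ordinary copy instead.
}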
jsonparser/escape.go (new file, 178 lines)
@@ -0,0 +1,178 @@
package jsonparser

import (
	"bytes"
	"unicode/utf8"
)

// JSON Unicode stuff: see https://tools.ietf.org/html/rfc7159#section-7

const supplementalPlanesOffset = 0x10000
const highSurrogateOffset = 0xD800
const lowSurrogateOffset = 0xDC00

const basicMultilingualPlaneReservedOffset = 0xDFFF
const basicMultilingualPlaneOffset = 0xFFFF

func combineUTF16Surrogates(high, low rune) rune {
	return supplementalPlanesOffset + (high-highSurrogateOffset)<<10 + (low - lowSurrogateOffset)
}

const badHex = -1

func h2I(c byte) int {
	switch {
	case c >= '0' && c <= '9':
		return int(c - '0')
	case c >= 'A' && c <= 'F':
		return int(c - 'A' + 10)
	case c >= 'a' && c <= 'f':
		return int(c - 'a' + 10)
	}
	return badHex
}

// decodeSingleUnicodeEscape decodes a single \uXXXX escape sequence. The prefix \u is assumed to be present and
// is not checked.
// In JSON, these escapes can either come alone or as part of "UTF16 surrogate pairs" that must be handled together.
// This function only handles one; decodeUnicodeEscape handles this more complex case.
func decodeSingleUnicodeEscape(in []byte) (rune, bool) {
	// We need at least 6 characters total
	if len(in) < 6 {
		return utf8.RuneError, false
	}

	// Convert hex to decimal
	h1, h2, h3, h4 := h2I(in[2]), h2I(in[3]), h2I(in[4]), h2I(in[5])
	if h1 == badHex || h2 == badHex || h3 == badHex || h4 == badHex {
		return utf8.RuneError, false
	}

	// Compose the hex digits
	return rune(h1<<12 + h2<<8 + h3<<4 + h4), true
}

// isUTF16EncodedRune checks if a rune is in the range for non-BMP characters,
// which is used to describe UTF16 chars.
// Source: https://en.wikipedia.org/wiki/Plane_(Unicode)#Basic_Multilingual_Plane
func isUTF16EncodedRune(r rune) bool {
	return highSurrogateOffset <= r && r <= basicMultilingualPlaneReservedOffset
}

func decodeUnicodeEscape(in []byte) (rune, int) {
	if r, ok := decodeSingleUnicodeEscape(in); !ok {
		// Invalid Unicode escape
		return utf8.RuneError, -1
	} else if r <= basicMultilingualPlaneOffset && !isUTF16EncodedRune(r) {
		// Valid Unicode escape in Basic Multilingual Plane
		return r, 6
	} else if r2, ok := decodeSingleUnicodeEscape(in[6:]); !ok { // Note: previous decodeSingleUnicodeEscape success guarantees at least 6 bytes remain
		// UTF16 "high surrogate" without mandatory valid following Unicode escape for the "low surrogate"
		return utf8.RuneError, -1
	} else if r2 < lowSurrogateOffset {
		// Invalid UTF16 "low surrogate"
		return utf8.RuneError, -1
	} else {
		// Valid UTF16 surrogate pair
		return combineUTF16Surrogates(r, r2), 12
	}
}

// backslashCharEscapeTable: when '\X' is found for some byte X, it is to be replaced with backslashCharEscapeTable[X]
var backslashCharEscapeTable = [...]byte{
	'"':  '"',
	'\\': '\\',
	'/':  '/',
	'b':  '\b',
	'f':  '\f',
	'n':  '\n',
	'r':  '\r',
	't':  '\t',
}

// unescapeToUTF8 unescapes the single escape sequence starting at 'in' into 'out' and returns
// how many characters were consumed from 'in' and emitted into 'out'.
// If a valid escape sequence does not appear as a prefix of 'in', (-1, -1) is returned to signal the error.
func unescapeToUTF8(in, out []byte) (inLen int, outLen int) {
	if len(in) < 2 || in[0] != '\\' {
		// Invalid escape due to insufficient characters for any escape or no initial backslash
		return -1, -1
	}

	// https://tools.ietf.org/html/rfc7159#section-7
	switch e := in[1]; e {
	case '"', '\\', '/', 'b', 'f', 'n', 'r', 't':
		// Valid basic 2-character escapes (use lookup table)
		out[0] = backslashCharEscapeTable[e]
		return 2, 1
	case 'u':
		// Unicode escape
		if r, inLen := decodeUnicodeEscape(in); inLen == -1 {
			// Invalid Unicode escape
			return -1, -1
		} else {
			// Valid Unicode escape; re-encode as UTF8
			outLen := utf8.EncodeRune(out, r)
			return inLen, outLen
		}
	}

	return -1, -1
}

// Unescape unescapes the string contained in 'in' and returns it as a slice.
// If 'in' contains no escaped characters:
//
//	Returns 'in'.
//
// Else, if 'out' is of sufficient capacity (guaranteed if cap(out) >= len(in)):
//
//	'out' is used to build the unescaped string and is returned with no extra allocation
//
// Else:
//
//	A new slice is allocated and returned.
func Unescape(in, out []byte) ([]byte, error) {
	firstBackslash := bytes.IndexByte(in, '\\')
	if firstBackslash == -1 {
		return in, nil
	}

	// Get a buffer of sufficient size (allocate if needed)
	if cap(out) < len(in) {
		out = make([]byte, len(in))
	} else {
		out = out[0:len(in)]
	}

	// Copy the first sequence of unescaped bytes to the output and obtain a buffer pointer (subslice)
	copy(out, in[:firstBackslash])
	in = in[firstBackslash:]
	buf := out[firstBackslash:]

	for len(in) > 0 {
		// Unescape the next escaped character
		inLen, bufLen := unescapeToUTF8(in, buf)
		if inLen == -1 {
			return nil, MalformedStringEscapeError
		}

		in = in[inLen:]
		buf = buf[bufLen:]

		// Copy everything up until the next backslash
		nextBackslash := bytes.IndexByte(in, '\\')
		if nextBackslash == -1 {
			copy(buf, in)
			buf = buf[len(in):]
			break
		} else {
			copy(buf, in[:nextBackslash])
			buf = buf[nextBackslash:]
			in = in[nextBackslash:]
		}
	}

	// Trim the out buffer to the amount that was actually emitted
	return out[:len(out)-len(buf)], nil
}
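Not part of the diff: a minimal usage sketch of the exported Unescape helper above. The input is the raw content of a JSON string (without the surrounding quotes); passing a reusable buffer with cap(buf) >= len(in) avoids an extra allocation.

package main

import (
	"fmt"

	"github.com/oarkflow/mq/jsonparser"
)

func main() {
	in := []byte(`line1\nline2 \u263A`) // backslash escapes exactly as they appear inside a JSON string

	buf := make([]byte, 0, 64)
	out, err := jsonparser.Unescape(in, buf)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out)) // prints "line1", then "line2 ☺" on the next line
}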
jsonparser/fuzz.go (new file, 117 lines)
@@ -0,0 +1,117 @@
package jsonparser

func FuzzParseString(data []byte) int {
	r, err := ParseString(data)
	if err != nil || r == "" {
		return 0
	}
	return 1
}

func FuzzEachKey(data []byte) int {
	paths := [][]string{
		{"name"},
		{"order"},
		{"nested", "a"},
		{"nested", "b"},
		{"nested2", "a"},
		{"nested", "nested3", "b"},
		{"arr", "[1]", "b"},
		{"arrInt", "[3]"},
		{"arrInt", "[5]"},
		{"nested"},
		{"arr", "["},
		{"a\n", "b\n"},
	}
	EachKey(data, func(idx int, value []byte, vt ValueType, err error) {}, paths...)
	return 1
}

func FuzzDelete(data []byte) int {
	Delete(data, "test")
	return 1
}

func FuzzSet(data []byte) int {
	_, err := Set(data, []byte(`"new value"`), "test")
	if err != nil {
		return 0
	}
	return 1
}

func FuzzObjectEach(data []byte) int {
	_ = ObjectEach(data, func(key, value []byte, valueType ValueType, off int) error {
		return nil
	})
	return 1
}

func FuzzParseFloat(data []byte) int {
	_, err := ParseFloat(data)
	if err != nil {
		return 0
	}
	return 1
}

func FuzzParseInt(data []byte) int {
	_, err := ParseInt(data)
	if err != nil {
		return 0
	}
	return 1
}

func FuzzParseBool(data []byte) int {
	_, err := ParseBoolean(data)
	if err != nil {
		return 0
	}
	return 1
}

func FuzzTokenStart(data []byte) int {
	_ = tokenStart(data)
	return 1
}

func FuzzGetString(data []byte) int {
	_, err := GetString(data, "test")
	if err != nil {
		return 0
	}
	return 1
}

func FuzzGetFloat(data []byte) int {
	_, err := GetFloat(data, "test")
	if err != nil {
		return 0
	}
	return 1
}

func FuzzGetInt(data []byte) int {
	_, err := GetInt(data, "test")
	if err != nil {
		return 0
	}
	return 1
}

func FuzzGetBoolean(data []byte) int {
	_, err := GetBoolean(data, "test")
	if err != nil {
		return 0
	}
	return 1
}

func FuzzGetUnsafeString(data []byte) int {
	_, err := GetUnsafeString(data, "test")
	if err != nil {
		return 0
	}
	return 1
}
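These harnesses follow the go-fuzz convention of returning 0 or 1 to steer the corpus. Below is a sketch of reusing one of them with Go's native fuzzer instead (go test -fuzz); the file name and seed input are illustrative, not part of this commit.

// fuzz_native_test.go (sketch)
package jsonparser

import "testing"

func FuzzParseStringNative(f *testing.F) {
	f.Add([]byte(`"hello\u263A"`))
	f.Fuzz(func(t *testing.T, data []byte) {
		// We only require that the parser never panics on arbitrary input.
		FuzzParseString(data)
	})
}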
jsonparser/parser.go (new file, 1315 lines)
File diff suppressed because it is too large
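parser.go is collapsed in this view, but its Get-style API is what the broker, consumer, and publisher changes below rely on. A minimal usage sketch based on those call sites, assuming the helpers follow the familiar GetString(data, keys...) shape that the calls in this commit suggest:

package main

import (
	"fmt"

	"github.com/oarkflow/mq/jsonparser"
)

func main() {
	payload := []byte(`{"id":"task-123","payload":{"user":"alice"}}`)

	id, err := jsonparser.GetString(payload, "id") // same call shape as in broker.go below
	if err != nil {
		panic(err)
	}
	user, _ := jsonparser.GetString(payload, "payload", "user") // nested lookup via a key path
	fmt.Println(id, user) // task-123 alice
}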
v2/broker.go (77 lines changed)
@@ -7,13 +7,21 @@ import (
	"log"
	"net"
	"strings"
	"time"

	"github.com/oarkflow/xsync"

	"github.com/oarkflow/mq/codec"
	"github.com/oarkflow/mq/consts"
	"github.com/oarkflow/mq/jsonparser"
	"github.com/oarkflow/mq/utils"
)

type QueuedTask struct {
	Message    *codec.Message
	RetryCount int
}

type consumer struct {
	id   string
	conn net.Conn
@@ -82,7 +90,8 @@ func (b *Broker) OnMessage(ctx context.Context, msg *codec.Message, conn net.Con

func (b *Broker) MessageAck(ctx context.Context, msg *codec.Message) {
	consumerID, _ := GetConsumerID(ctx)
-	log.Printf("BROKER - MESSAGE_ACK ~> %s on %s", consumerID, msg.Queue)
+	taskID, _ := jsonparser.GetString(msg.Payload, "id")
+	log.Printf("BROKER - MESSAGE_ACK ~> %s on %s for Task %s", consumerID, msg.Queue, taskID)
}

func (b *Broker) MessageResponseHandler(ctx context.Context, msg *codec.Message) {
@@ -110,10 +119,19 @@ func (b *Broker) MessageResponseHandler(ctx context.Context, msg *codec.Message)
	}
}

+func (b *Broker) Publish(ctx context.Context, task Task, queue string) error {
+	headers, _ := GetHeaders(ctx)
+	msg := codec.NewMessage(consts.PUBLISH, task.Payload, queue, headers)
+	b.broadcastToConsumers(ctx, msg)
+	return nil
+}
+
func (b *Broker) PublishHandler(ctx context.Context, conn net.Conn, msg *codec.Message) {
	pub := b.addPublisher(ctx, msg.Queue, conn)
-	log.Printf("BROKER - PUBLISH ~> received from %s on %s", pub.id, msg.Queue)
-	ack := codec.NewMessage(consts.PUBLISH_ACK, nil, msg.Queue, msg.Headers)
+	taskID, _ := jsonparser.GetString(msg.Payload, "id")
+	log.Printf("BROKER - PUBLISH ~> received from %s on %s for Task %s", pub.id, msg.Queue, taskID)
+
+	ack := codec.NewMessage(consts.PUBLISH_ACK, []byte(fmt.Sprintf(`{"id":"%s"}`, taskID)), msg.Queue, msg.Headers)
	if err := b.send(conn, ack); err != nil {
		log.Printf("Error sending PUBLISH_ACK: %v\n", err)
	}
@@ -193,13 +211,9 @@ func (b *Broker) receive(c net.Conn) (*codec.Message, error) {

func (b *Broker) broadcastToConsumers(ctx context.Context, msg *codec.Message) {
	if queue, ok := b.queues.Get(msg.Queue); ok {
-		queue.consumers.ForEach(func(_ string, con *consumer) bool {
-			msg.Command = consts.MESSAGE_SEND
-			if err := b.send(con.conn, msg); err != nil {
-				log.Printf("Error sending Message: %v\n", err)
-			}
-			return true
-		})
+		task := &QueuedTask{Message: msg, RetryCount: 0}
+		queue.tasks <- task
+		log.Printf("Task enqueued for queue %s", msg.Queue)
	}
}

@@ -264,3 +278,46 @@ func (b *Broker) readMessage(ctx context.Context, c net.Conn) error {
	b.OnError(ctx, c, err)
	return err
}
+
+func (b *Broker) dispatchWorker(queue *Queue) {
+	delay := b.opts.initialDelay
+	for task := range queue.tasks {
+		success := false
+		for !success && task.RetryCount <= b.opts.maxRetries {
+			if b.dispatchTaskToConsumer(queue, task) {
+				success = true
+			} else {
+				task.RetryCount++
+				delay = b.backoffRetry(queue, task, delay)
+			}
+		}
+	}
+}
+
+func (b *Broker) dispatchTaskToConsumer(queue *Queue, task *QueuedTask) bool {
+	var consumerFound bool
+	queue.consumers.ForEach(func(_ string, con *consumer) bool {
+		if err := b.send(con.conn, task.Message); err == nil {
+			consumerFound = true
+			log.Printf("Task dispatched to consumer %s on queue %s", con.id, queue.name)
+			return false // break the loop once a consumer is found
+		}
+		return true
+	})
+	if !consumerFound {
+		log.Printf("No available consumers for queue %s, retrying...", queue.name)
+	}
+	return consumerFound
+}
+
+func (b *Broker) backoffRetry(queue *Queue, task *QueuedTask, delay time.Duration) time.Duration {
+	backoffDuration := utils.CalculateJitter(delay, b.opts.jitterPercent)
+	log.Printf("Backing off for %v before retrying task for queue %s", backoffDuration, task.Message.Queue)
+	time.Sleep(backoffDuration)
+	queue.tasks <- task
+	delay *= 2
+	if delay > b.opts.maxBackoff {
+		delay = b.opts.maxBackoff
+	}
+	return delay
+}
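Not part of the diff: a standalone sketch of the delay progression driven by backoffRetry above. The delay starts at initialDelay, doubles after each failed dispatch, and is capped at maxBackoff; utils.CalculateJitter (not shown in this commit) then perturbs each individual sleep.

package main

import (
	"fmt"
	"time"
)

func main() {
	delay := 2 * time.Second       // defaultOptions: initialDelay
	maxBackoff := 20 * time.Second // defaultOptions: maxBackoff

	for attempt := 1; attempt <= 6; attempt++ {
		fmt.Printf("attempt %d: back off ~%v (plus jitter)\n", attempt, delay)
		delay *= 2
		if delay > maxBackoff {
			delay = maxBackoff
		}
	}
	// ~2s, ~4s, ~8s, ~16s, then capped at ~20s for later attempts
}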
@@ -13,6 +13,7 @@ import (

	"github.com/oarkflow/mq/codec"
	"github.com/oarkflow/mq/consts"
+	"github.com/oarkflow/mq/jsonparser"
	"github.com/oarkflow/mq/utils"
)

@@ -76,7 +77,8 @@ func (c *Consumer) OnMessage(ctx context.Context, msg *codec.Message, conn net.C
		consts.ConsumerKey: c.id,
		consts.ContentType: consts.TypeJson,
	})
-	reply := codec.NewMessage(consts.MESSAGE_ACK, nil, msg.Queue, headers)
+	taskID, _ := jsonparser.GetString(msg.Payload, "id")
+	reply := codec.NewMessage(consts.MESSAGE_ACK, []byte(fmt.Sprintf(`{"id":"%s"}`, taskID)), msg.Queue, headers)
	if err := c.send(conn, reply); err != nil {
		fmt.Printf("failed to send MESSAGE_ACK for queue %s: %v", msg.Queue, err)
	}
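With this change both the broker's PUBLISH_ACK and the consumer's MESSAGE_ACK carry the task id as a tiny JSON body instead of an empty payload, and the receiving side (MessageAck, waitForAck) reads it back with jsonparser.GetString. A round-trip sketch, not part of the diff:

package main

import (
	"fmt"

	"github.com/oarkflow/mq/jsonparser"
)

func main() {
	taskID := "task-123"

	// Sender side: the ack body as built in PublishHandler and Consumer.OnMessage.
	ack := []byte(fmt.Sprintf(`{"id":"%s"}`, taskID))

	// Receiver side: recover the id from the ack payload.
	id, err := jsonparser.GetString(ack, "id")
	if err != nil {
		panic(err)
	}
	fmt.Println(id) // task-123
}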
@@ -33,6 +33,7 @@ type Options struct {
	aesKey           json.RawMessage
	hmacKey          json.RawMessage
	enableEncryption bool
+	queueSize        int
}

func defaultOptions() Options {
@@ -43,6 +44,7 @@ func defaultOptions() {
		initialDelay:  2 * time.Second,
		maxBackoff:    20 * time.Second,
		jitterPercent: 0.5,
+		queueSize:     100,
	}
}

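Only the queueSize field and its default of 100 are added here; no public setter is visible in this hunk. If the package's existing functional-option style (compare mq.WithTLS in the example above) were extended, a setter could look like the hypothetical sketch below; WithQueueSize and the Option signature are assumptions, not part of this commit.

// Hypothetical, for illustration only: assumes Option is func(*Options).
func WithQueueSize(n int) Option {
	return func(o *Options) {
		if n > 0 {
			o.queueSize = n
		}
	}
}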
@@ -10,6 +10,7 @@ import (

	"github.com/oarkflow/mq/codec"
	"github.com/oarkflow/mq/consts"
+	"github.com/oarkflow/mq/jsonparser"
)

type Publisher struct {
@@ -49,7 +50,8 @@ func (p *Publisher) waitForAck(conn net.Conn) error {
		return err
	}
	if msg.Command == consts.PUBLISH_ACK {
-		log.Printf("PUBLISHER - PUBLISH_ACK ~> from %s on %s", p.id, msg.Queue)
+		taskID, _ := jsonparser.GetString(msg.Payload, "id")
+		log.Printf("PUBLISHER - PUBLISH_ACK ~> from %s on %s for Task %s", p.id, msg.Queue, taskID)
		return nil
	}
	return fmt.Errorf("expected PUBLISH_ACK, got: %v", msg.Command)
@@ -69,7 +71,7 @@ func (p *Publisher) waitForResponse(conn net.Conn) Result {
	return Result{Error: err}
}

-func (p *Publisher) Publish(ctx context.Context, queue string, task Task) error {
+func (p *Publisher) Publish(ctx context.Context, task Task, queue string) error {
	conn, err := GetConnection(p.opts.brokerAddr, p.opts.tlsConfig)
	if err != nil {
		return fmt.Errorf("failed to connect to broker: %w", err)
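This exported signature change is what forces the call-site update in the example at the top of this commit; callers simply swap the last two arguments:

// before: err := publisher.Publish(context.Background(), "queue1", task)
// after:  err := publisher.Publish(context.Background(), task, "queue1")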
@@ -7,12 +7,14 @@ import (
type Queue struct {
	name      string
	consumers xsync.IMap[string, *consumer]
+	tasks     chan *QueuedTask // channel to hold tasks
}

-func newQueue(name string) *Queue {
+func newQueue(name string, queueSize int) *Queue {
	return &Queue{
		name:      name,
		consumers: xsync.NewMap[string, *consumer](),
+		tasks:     make(chan *QueuedTask, queueSize), // buffer size for tasks
	}
}

@@ -21,7 +23,8 @@ func (b *Broker) NewQueue(qName string) *Queue {
	if ok {
		return q
	}
-	q = newQueue(qName)
+	q = newQueue(qName, b.opts.queueSize)
	b.queues.Set(qName, q)
+	go b.dispatchWorker(q)
	return q
}
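Each queue now owns a buffered channel of queueSize tasks: broadcastToConsumers enqueues, and the per-queue dispatchWorker goroutine started in NewQueue drains it. A self-contained sketch of that buffering behaviour, not part of the diff (queueSize of 3 chosen only to keep the output short):

package main

import "fmt"

type queuedTask struct{ id int }

func main() {
	queueSize := 3 // stands in for opts.queueSize (default 100 in this commit)
	tasks := make(chan *queuedTask, queueSize)

	// The producer can enqueue up to queueSize tasks with no consumer attached;
	// the next send would block until the dispatch worker receives one.
	for i := 1; i <= queueSize; i++ {
		tasks <- &queuedTask{id: i}
	}
	fmt.Println("buffered:", len(tasks), "cap:", cap(tasks)) // buffered: 3 cap: 3

	// The worker drains with a range over the channel, like dispatchWorker does.
	close(tasks)
	for t := range tasks {
		fmt.Println("dispatch", t.id)
	}
}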