From 0205cf35ef0cc2fdb1baeb9891d3564e83e90b7f Mon Sep 17 00:00:00 2001 From: sujit Date: Sat, 29 Mar 2025 19:06:28 +0545 Subject: [PATCH] feat: sig --- mq.go | 7 ++----- options.go | 26 ++++++++++++++++++++++++++ 2 files changed, 28 insertions(+), 5 deletions(-) diff --git a/mq.go b/mq.go index 80d7f61..443cc00 100644 --- a/mq.go +++ b/mq.go @@ -125,16 +125,13 @@ type RateLimiter struct { C chan struct{} } +// Modified RateLimiter: use blocking send to avoid discarding tokens. func NewRateLimiter(rate int, burst int) *RateLimiter { rl := &RateLimiter{C: make(chan struct{}, burst)} ticker := time.NewTicker(time.Second / time.Duration(rate)) go func() { for range ticker.C { - select { - case rl.C <- struct{}{}: - default: - // bucket full; token discarded - } + rl.C <- struct{}{} // blocking send; tokens queue for deferred task processing } }() return rl diff --git a/options.go b/options.go index 9959484..8e1dbd5 100644 --- a/options.go +++ b/options.go @@ -130,6 +130,8 @@ func defaultOptions() *Options { maxMemoryLoad: 5000000, storage: NewMemoryTaskStorage(10 * time.Minute), logger: logger.NewDefaultLogger(), + BrokerRateLimiter: NewRateLimiter(10, 5), + ConsumerRateLimiter: NewRateLimiter(10, 5), } } @@ -254,3 +256,27 @@ func WithJitterPercent(val float64) Option { opts.jitterPercent = val } } + +func WithBrokerRateLimiter(rate int, burst int) Option { + return func(opts *Options) { + opts.BrokerRateLimiter = NewRateLimiter(rate, burst) + } +} + +func WithConsumerRateLimiter(rate int, burst int) Option { + return func(opts *Options) { + opts.ConsumerRateLimiter = NewRateLimiter(rate, burst) + } +} + +func DisableBrokerRateLimit() Option { + return func(opts *Options) { + opts.BrokerRateLimiter = nil + } +} + +func DisableConsumerRateLimit() Option { + return func(opts *Options) { + opts.ConsumerRateLimiter = nil + } +}