feat: sig

This commit is contained in:
sujit
2025-03-29 23:03:27 +05:45
parent b205d29bb5
commit ffaa1a137b
4 changed files with 108 additions and 2 deletions

View File

@@ -22,10 +22,12 @@ const (
CONSUMER_PAUSE
CONSUMER_RESUME
CONSUMER_STOP
CONSUMER_UPDATE
CONSUMER_PAUSED
CONSUMER_RESUMED
CONSUMER_STOPPED
CONSUMER_UPDATED
)
type ConsumerState byte
@@ -62,6 +64,8 @@ func (c CMD) String() string {
return "CONSUMER_PAUSE"
case CONSUMER_RESUME:
return "CONSUMER_RESUME"
case CONSUMER_UPDATE:
return "CONSUMER_UPDATE"
case CONSUMER_STOP:
return "CONSUMER_STOP"
case CONSUMER_PAUSED:
@@ -70,6 +74,8 @@ func (c CMD) String() string {
return "CONSUMER_RESUMED"
case CONSUMER_STOPPED:
return "CONSUMER_STOPPED"
case CONSUMER_UPDATED:
return "CONSUMER_UPDATED"
case RESPONSE:
return "RESPONSE"
default:

View File

@@ -118,6 +118,12 @@ func (c *Consumer) OnMessage(ctx context.Context, msg *codec.Message, conn net.C
log.Printf("Unable to stop consumer: %v", err)
}
return err
case consts.CONSUMER_UPDATE:
err := c.Update(ctx, msg.Payload)
if err != nil {
log.Printf("Unable to update consumer: %v", err)
}
return err
default:
log.Printf("CONSUMER - UNKNOWN_COMMAND ~> %s on %s", msg.Command, msg.Queue)
}
@@ -295,6 +301,21 @@ func (c *Consumer) Pause(ctx context.Context) error {
return c.operate(ctx, consts.CONSUMER_PAUSED, c.pool.Pause)
}
func (c *Consumer) Update(ctx context.Context, payload []byte) error {
var newConfig DynamicConfig
if err := json.Unmarshal(payload, &newConfig); err != nil {
log.Printf("Invalid payload for CONSUMER_UPDATE: %v", err)
return err
}
if c.pool != nil {
if err := c.pool.UpdateConfig(&newConfig); err != nil {
log.Printf("Failed to update pool config: %v", err)
return err
}
}
return c.sendOpsMessage(ctx, consts.CONSUMER_UPDATED)
}
func (c *Consumer) Resume(ctx context.Context) error {
return c.operate(ctx, consts.CONSUMER_RESUMED, c.pool.Resume)
}
@@ -304,10 +325,10 @@ func (c *Consumer) Stop(ctx context.Context) error {
}
func (c *Consumer) operate(ctx context.Context, cmd consts.CMD, poolOperation func()) error {
poolOperation()
if err := c.sendOpsMessage(ctx, cmd); err != nil {
return err
}
poolOperation()
return nil
}

55
mq.go
View File

@@ -321,6 +321,8 @@ func (b *Broker) OnMessage(ctx context.Context, msg *codec.Message, conn net.Con
b.OnConsumerResume(ctx, msg)
case consts.CONSUMER_STOPPED:
b.OnConsumerStop(ctx, msg)
case consts.CONSUMER_UPDATED:
b.OnConsumerUpdated(ctx, msg)
default:
log.Printf("BROKER - UNKNOWN_COMMAND ~> %s on %s", msg.Command, msg.Queue)
}
@@ -374,6 +376,22 @@ func (b *Broker) OnConsumerStop(ctx context.Context, _ *codec.Message) {
}
}
func (b *Broker) OnConsumerUpdated(ctx context.Context, msg *codec.Message) {
consumerID, _ := GetConsumerID(ctx)
if consumerID != "" {
log.Printf("BROKER - CONSUMER ~> Updated %s", consumerID)
if b.opts.notifyResponse != nil {
result := Result{
Status: "CONSUMER UPDATED",
TaskID: consumerID,
Ctx: ctx,
Payload: msg.Payload,
}
_ = b.opts.notifyResponse(ctx, result)
}
}
}
func (b *Broker) OnConsumerResume(ctx context.Context, _ *codec.Message) {
consumerID, _ := GetConsumerID(ctx)
if consumerID != "" {
@@ -582,7 +600,10 @@ func (b *Broker) RemoveConsumer(consumerID string, queues ...string) {
})
}
func (b *Broker) handleConsumer(ctx context.Context, cmd consts.CMD, state consts.ConsumerState, consumerID string, payload []byte, queues ...string) {
func (b *Broker) handleConsumer(
ctx context.Context, cmd consts.CMD, state consts.ConsumerState,
consumerID string, payload []byte, queues ...string,
) {
fn := func(queue *Queue) {
con, ok := queue.consumers.Get(consumerID)
if ok {
@@ -607,6 +628,38 @@ func (b *Broker) handleConsumer(ctx context.Context, cmd consts.CMD, state const
})
}
func (b *Broker) UpdateConsumer(ctx context.Context, consumerID string, config DynamicConfig, queues ...string) error {
var err error
payload, _ := json.Marshal(config)
fn := func(queue *Queue) error {
con, ok := queue.consumers.Get(consumerID)
if ok {
ack := codec.NewMessage(consts.CONSUMER_UPDATE, payload, queue.name, map[string]string{consts.ConsumerKey: consumerID})
return b.send(ctx, con.conn, ack)
}
return nil
}
if len(queues) > 0 {
for _, queueName := range queues {
if queue, ok := b.queues.Get(queueName); ok {
err = fn(queue)
if err != nil {
return err
}
}
}
return nil
}
b.queues.ForEach(func(queueName string, queue *Queue) bool {
err = fn(queue)
if err != nil {
return false
}
return true
})
return nil
}
func (b *Broker) PauseConsumer(ctx context.Context, consumerID string, queues ...string) {
b.handleConsumer(ctx, consts.CONSUMER_PAUSE, consts.ConsumerStatePaused, consumerID, utils.ToByte("{}"), queues...)
}

26
pool.go
View File

@@ -112,6 +112,7 @@ type DynamicConfig struct {
MaxRetries int
ReloadInterval time.Duration
WarningThreshold WarningThresholds
NumberOfWorkers int // <-- new field for worker count
}
var Config = &DynamicConfig{
@@ -126,6 +127,7 @@ var Config = &DynamicConfig{
HighMemory: 1 * 1024 * 1024,
LongExecution: 2 * time.Second,
},
NumberOfWorkers: 5, // <-- default worker count
}
type Pool struct {
@@ -591,3 +593,27 @@ func (wp *Pool) startHealthServer() {
}
}()
}
// New method to update pool configuration via POOL_UPDATE command.
func (wp *Pool) UpdateConfig(newConfig *DynamicConfig) error {
if err := validateDynamicConfig(newConfig); err != nil {
return err
}
wp.timeout = newConfig.Timeout
wp.batchSize = newConfig.BatchSize
wp.maxMemoryLoad = newConfig.MaxMemoryLoad
wp.idleTimeout = newConfig.IdleTimeout
wp.backoffDuration = newConfig.BackoffDuration
wp.maxRetries = newConfig.MaxRetries
wp.thresholds = ThresholdConfig{
HighMemory: newConfig.WarningThreshold.HighMemory,
LongExecution: newConfig.WarningThreshold.LongExecution,
}
newWorkerCount := newConfig.NumberOfWorkers
currentWorkers := int(atomic.LoadInt32(&wp.numOfWorkers))
if newWorkerCount != currentWorkers && newWorkerCount > 0 {
wp.adjustWorkers(newWorkerCount)
}
wp.logger.Info().Msg("Pool configuration updated via POOL_UPDATE")
return nil
}