feat: add example

This commit is contained in:
sujit
2024-10-10 17:37:29 +05:45
parent 48fdb7d67c
commit ae356186d0

View File

@@ -76,6 +76,7 @@ func (c *Consumer) OnMessage(ctx context.Context, msg *codec.Message, conn net.C
headers := WithHeaders(ctx, map[string]string{
consts.ConsumerKey: c.id,
consts.ContentType: consts.TypeJson,
consts.QueueKey: msg.Queue,
})
taskID, _ := jsonparser.GetString(msg.Payload, "id")
reply := codec.NewMessage(consts.MESSAGE_ACK, []byte(fmt.Sprintf(`{"id":"%s"}`, taskID)), msg.Queue, headers)
@@ -85,13 +86,27 @@ func (c *Consumer) OnMessage(ctx context.Context, msg *codec.Message, conn net.C
var task Task
err := json.Unmarshal(msg.Payload, &task)
if err != nil {
log.Println("Error unmarshalling message:", err)
log.Printf("Error unmarshalling message: %v", err)
return
}
ctx = SetHeaders(ctx, map[string]string{consts.QueueKey: msg.Queue})
result := c.ProcessTask(ctx, &task)
result.TaskID = task.ID
result.Topic = msg.Queue
result.TaskID = taskID
err = c.MessageResponseCallback(ctx, result)
if err != nil {
log.Printf("Error on message callback: %v", err)
}
}
// MessageResponseCallback sends the result back to the broker.
func (c *Consumer) MessageResponseCallback(ctx context.Context, result Result) error {
headers := WithHeaders(ctx, map[string]string{
consts.ConsumerKey: c.id,
consts.QueueKey: result.Topic,
consts.ContentType: consts.TypeJson,
})
if result.Status == "" {
if result.Error != nil {
result.Status = "FAILED"
@@ -100,10 +115,11 @@ func (c *Consumer) OnMessage(ctx context.Context, msg *codec.Message, conn net.C
}
}
bt, _ := json.Marshal(result)
reply = codec.NewMessage(consts.MESSAGE_RESPONSE, bt, msg.Queue, headers)
if err := c.send(conn, reply); err != nil {
fmt.Printf("failed to send MESSAGE_RESPONSE for queue %s: %v", msg.Queue, err)
reply := codec.NewMessage(consts.MESSAGE_RESPONSE, bt, result.Topic, headers)
if err := codec.SendMessage(c.conn, reply, c.opts.aesKey, c.opts.hmacKey, c.opts.enableEncryption); err != nil {
return fmt.Errorf("failed to send MESSAGE_RESPONSE: %v", err)
}
return nil
}
// ProcessTask handles a received task message and invokes the appropriate handler.