Files
mq/publisher.go
2024-09-27 15:45:48 +05:45

68 lines
1.5 KiB
Go

package mq
import (
"context"
"fmt"
"net"
)
type Publisher struct {
id string
brokerAddr string
}
func NewPublisher(id, brokerAddr string) *Publisher {
return &Publisher{brokerAddr: brokerAddr, id: id}
}
func (p *Publisher) Publish(ctx context.Context, queue string, task Task) error {
conn, err := net.Dial("tcp", p.brokerAddr)
if err != nil {
return fmt.Errorf("failed to connect to broker: %w", err)
}
defer conn.Close()
ctx = SetHeaders(ctx, map[string]string{
PublisherKey: p.id,
ContentType: TypeJson,
})
cmd := Command{
ID: NewID(),
Command: PUBLISH,
Queue: queue,
MessageID: task.ID,
Payload: task.Payload,
}
// Fire and forget: No need to wait for response
return Write(ctx, conn, cmd)
}
func (p *Publisher) Request(ctx context.Context, queue string, task Task) (Result, error) {
conn, err := net.Dial("tcp", p.brokerAddr)
if err != nil {
return Result{}, fmt.Errorf("failed to connect to broker: %w", err)
}
defer conn.Close()
ctx = SetHeaders(ctx, map[string]string{
PublisherKey: p.id,
ContentType: TypeJson,
})
cmd := Command{
ID: NewID(),
Command: REQUEST,
Queue: queue,
MessageID: task.ID,
Payload: task.Payload,
}
var result Result
err = Write(ctx, conn, cmd)
if err != nil {
return result, err
}
ReadFromConn(ctx, conn, func(ctx context.Context, conn net.Conn, bytes []byte) error {
fmt.Println(string(bytes), "Here")
return conn.Close()
})
return result, nil
}