Files
streamctl/pkg/mainprocess/client.go
2025-06-22 14:59:00 +01:00

128 lines
2.8 KiB
Go

package mainprocess
import (
"context"
"encoding/gob"
"fmt"
"net"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/xaionaro-go/observability"
"github.com/xaionaro-go/xsync"
)
type Client struct {
WriteLocker xsync.Mutex
Conn net.Conn
Password string
}
func NewClient(
myName ProcessName,
addr string,
password string,
) (*Client, error) {
conn, err := net.Dial("tcp", addr)
if err != nil {
return nil, fmt.Errorf("unable to connect to '%s': %w", addr, err)
}
logger.Default().Tracef("connected to '%s' from '%s'", conn.RemoteAddr(), conn.LocalAddr())
conn.(*net.TCPConn).SetNoDelay(true)
msg := RegistrationMessage{
Password: password,
Source: myName,
}
encoder := gob.NewEncoder(conn)
if err := encoder.Encode(msg); err != nil {
return nil, fmt.Errorf("unable to encode&send the registration message %#+v: %w", msg, err)
}
var regResult RegistrationResult
decoder := gob.NewDecoder(conn)
if err := decoder.Decode(&regResult); err != nil {
return nil, fmt.Errorf("unable to decode&receive the registration result: %w", err)
}
if regResult.Error != "" {
return nil, fmt.Errorf("registration error: %s", regResult.Error)
}
logger.Default().Tracef("successfully registered the process '%s'", myName)
return &Client{
Conn: conn,
Password: password,
}, nil
}
func (c *Client) SendMessage(
ctx context.Context,
dst ProcessName,
content any,
) error {
logger.Debugf(ctx, "SendMessage(ctx, '%s', %T)", dst, content)
defer logger.Debugf(ctx, "/SendMessage(ctx, '%s', %T)", dst, content)
encoder := gob.NewEncoder(c.Conn)
msg := MessageToMain{
Password: c.Password,
Destination: dst,
Content: content,
}
err := xsync.DoR1(ctx, &c.WriteLocker, func() error {
return encoder.Encode(msg)
})
logger.Tracef(ctx, "sending message %#+v: %v", msg, err)
if err != nil {
return fmt.Errorf("unable to encode&send message %#+v: %w", msg, err)
}
return nil
}
func (c *Client) Close() error {
return c.Conn.Close()
}
func (c *Client) Serve(
ctx context.Context,
onReceivedMessage OnReceivedMessageFunc,
) error {
ctx, cancelFn := context.WithCancel(ctx)
defer cancelFn()
observability.Go(ctx, func(ctx context.Context) {
<-ctx.Done()
err := c.Close()
if err != nil {
logger.Error(ctx, err)
}
})
for {
select {
case <-ctx.Done():
return nil
default:
}
if err := c.ReadOne(ctx, onReceivedMessage); err != nil {
return err
}
}
}
func (c *Client) ReadOne(
ctx context.Context,
onReceivedMessage OnReceivedMessageFunc,
) error {
var msg MessageFromMain
decoder := gob.NewDecoder(c.Conn)
err := decoder.Decode(&msg)
if err != nil {
return fmt.Errorf("unable to receive&decode message: %w", err)
}
if err := onReceivedMessage(ctx, msg.Source, msg.Content); err != nil {
return fmt.Errorf("unable to process the message '%#+v': %w", msg, err)
}
return nil
}