mirror of
https://github.com/xaionaro-go/streamctl.git
synced 2025-10-05 15:37:00 +08:00
128 lines
2.8 KiB
Go
128 lines
2.8 KiB
Go
package mainprocess
|
|
|
|
import (
|
|
"context"
|
|
"encoding/gob"
|
|
"fmt"
|
|
"net"
|
|
|
|
"github.com/facebookincubator/go-belt/tool/logger"
|
|
"github.com/xaionaro-go/observability"
|
|
"github.com/xaionaro-go/xsync"
|
|
)
|
|
|
|
type Client struct {
|
|
WriteLocker xsync.Mutex
|
|
Conn net.Conn
|
|
Password string
|
|
}
|
|
|
|
func NewClient(
|
|
myName ProcessName,
|
|
addr string,
|
|
password string,
|
|
) (*Client, error) {
|
|
conn, err := net.Dial("tcp", addr)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("unable to connect to '%s': %w", addr, err)
|
|
}
|
|
logger.Default().Tracef("connected to '%s' from '%s'", conn.RemoteAddr(), conn.LocalAddr())
|
|
conn.(*net.TCPConn).SetNoDelay(true)
|
|
|
|
msg := RegistrationMessage{
|
|
Password: password,
|
|
Source: myName,
|
|
}
|
|
encoder := gob.NewEncoder(conn)
|
|
if err := encoder.Encode(msg); err != nil {
|
|
return nil, fmt.Errorf("unable to encode&send the registration message %#+v: %w", msg, err)
|
|
}
|
|
|
|
var regResult RegistrationResult
|
|
decoder := gob.NewDecoder(conn)
|
|
if err := decoder.Decode(®Result); err != nil {
|
|
return nil, fmt.Errorf("unable to decode&receive the registration result: %w", err)
|
|
}
|
|
if regResult.Error != "" {
|
|
return nil, fmt.Errorf("registration error: %s", regResult.Error)
|
|
}
|
|
logger.Default().Tracef("successfully registered the process '%s'", myName)
|
|
|
|
return &Client{
|
|
Conn: conn,
|
|
Password: password,
|
|
}, nil
|
|
}
|
|
|
|
func (c *Client) SendMessage(
|
|
ctx context.Context,
|
|
dst ProcessName,
|
|
content any,
|
|
) error {
|
|
logger.Debugf(ctx, "SendMessage(ctx, '%s', %T)", dst, content)
|
|
defer logger.Debugf(ctx, "/SendMessage(ctx, '%s', %T)", dst, content)
|
|
encoder := gob.NewEncoder(c.Conn)
|
|
msg := MessageToMain{
|
|
Password: c.Password,
|
|
Destination: dst,
|
|
Content: content,
|
|
}
|
|
err := xsync.DoR1(ctx, &c.WriteLocker, func() error {
|
|
return encoder.Encode(msg)
|
|
})
|
|
logger.Tracef(ctx, "sending message %#+v: %v", msg, err)
|
|
if err != nil {
|
|
return fmt.Errorf("unable to encode&send message %#+v: %w", msg, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (c *Client) Close() error {
|
|
return c.Conn.Close()
|
|
}
|
|
|
|
func (c *Client) Serve(
|
|
ctx context.Context,
|
|
onReceivedMessage OnReceivedMessageFunc,
|
|
) error {
|
|
ctx, cancelFn := context.WithCancel(ctx)
|
|
defer cancelFn()
|
|
observability.Go(ctx, func(ctx context.Context) {
|
|
<-ctx.Done()
|
|
err := c.Close()
|
|
if err != nil {
|
|
logger.Error(ctx, err)
|
|
}
|
|
})
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil
|
|
default:
|
|
}
|
|
|
|
if err := c.ReadOne(ctx, onReceivedMessage); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *Client) ReadOne(
|
|
ctx context.Context,
|
|
onReceivedMessage OnReceivedMessageFunc,
|
|
) error {
|
|
var msg MessageFromMain
|
|
decoder := gob.NewDecoder(c.Conn)
|
|
err := decoder.Decode(&msg)
|
|
if err != nil {
|
|
return fmt.Errorf("unable to receive&decode message: %w", err)
|
|
}
|
|
|
|
if err := onReceivedMessage(ctx, msg.Source, msg.Content); err != nil {
|
|
return fmt.Errorf("unable to process the message '%#+v': %w", msg, err)
|
|
}
|
|
|
|
return nil
|
|
}
|