added health processor and few related messages and processes

This commit is contained in:
harshabose
2025-05-08 00:53:38 +05:30
parent 51bceea105
commit 1cffe91b4d
14 changed files with 784 additions and 331 deletions

View File

@@ -6,11 +6,13 @@ import (
"time"
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
"github.com/harshabose/socket-comm/pkg/middleware/chat/process"
)
type ClientInterceptor struct {
commonInterceptor
Health interfaces.Processor
}
func (i *ClientInterceptor) BindSocketConnection(connection interceptor.Connection, writer interceptor.Writer, reader interceptor.Reader) (interceptor.Writer, interceptor.Reader, error) {

View File

@@ -0,0 +1,157 @@
package health
import (
"context"
"encoding/json"
"fmt"
"sync"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
)
type Stat struct {
ConnectionStatus types.ConnectionState `json:"connection_status"`
ConnectionUptime types.ConnectionUptime `json:"connection_uptime"`
CPUUsage types.CPUUsage `json:"cpu_usage"`
MemoryUsage types.MemoryUsage `json:"memory_usage"`
NetworkUsage types.NetworkUsage `json:"network_usage"`
Latency types.LatencyMs `json:"latency"`
}
type Health struct {
Snapshot
mux sync.RWMutex
ctx context.Context
}
type Snapshot struct {
Roomid types.RoomID `json:"roomid"` // NOTE: ADDED WHEN HEALTH IS CREATED.
Allowed []types.ClientID `json:"allowed"` // NOTE: ADDED WHEN HEALTH IS CREATED.
Participants map[types.ClientID]*Stat `json:"participants"` // NOTE: ADDED WHEN CLIENT JOINS. UPDATED WHEN CLIENT SENDS HEALTH RESPONSE.
}
// Marshal marshals the health struct into a json byte array.
// NOTE: THIS IS USED TO SEND HEALTH STATS TO THE CLIENT.
func (h *Snapshot) Marshal() ([]byte, error) {
return json.Marshal(h)
}
func NewHealth(ctx context.Context, id types.RoomID, allowed []types.ClientID) *Health {
return &Health{
Snapshot: Snapshot{
Roomid: id,
Allowed: allowed,
Participants: make(map[types.ClientID]*Stat),
},
ctx: ctx,
}
}
func (h *Health) ID() types.RoomID {
return h.Roomid
}
func (h *Health) Add(roomid types.RoomID, id types.ClientID) error {
h.mux.Lock()
defer h.mux.Unlock()
if roomid != h.Roomid {
return errors.ErrWrongRoom
}
select {
case <-h.ctx.Done():
return fmt.Errorf("error while adding client to health stats. client ID: %s; room ID: %s; err: %s", id, h.Roomid, errors.ErrContextCancelled.Error())
default:
if !h.isAllowed(id) {
return fmt.Errorf("error while adding client to health stats. client ID: %s; room ID: %s; err: %s", id, h.Roomid, errors.ErrClientNotAllowedInRoom.Error())
}
if h.isParticipant(id) {
return fmt.Errorf("client with id '%s' already existing in the health stats with id %s; err: %s", id, h.Roomid, errors.ErrClientIsAlreadyParticipant)
}
h.Participants[id] = &Stat{}
return nil
}
}
func (h *Health) Remove(roomid types.RoomID, id types.ClientID) error {
h.mux.Lock()
defer h.mux.Unlock()
if roomid != h.Roomid {
return errors.ErrWrongRoom
}
select {
case <-h.ctx.Done():
return fmt.Errorf("error while removing client to health stats. client ID: %s; room ID: %s; err: %s", id, h.Roomid, errors.ErrContextCancelled.Error())
default:
if !h.isAllowed(id) {
return fmt.Errorf("error while removing client to health stats. client ID: %s; room ID: %s; err: %s", id, h.Roomid, errors.ErrClientNotAllowedInRoom.Error())
}
if !h.isParticipant(id) {
return fmt.Errorf("client with id '%s' does not exist in the health stats with id %s; err: %s", id, h.Roomid, errors.ErrClientNotAParticipant.Error())
}
delete(h.Participants, id)
return nil
}
}
func (h *Health) Update(roomid types.RoomID, id types.ClientID, s *Stat) error {
h.mux.Lock()
defer h.mux.Unlock()
if roomid != h.Roomid {
return errors.ErrWrongRoom
}
select {
case <-h.ctx.Done():
return fmt.Errorf("error while adding client to health stats. client ID: %s; room ID: %s; err: %s", id, h.Roomid, errors.ErrContextCancelled.Error())
default:
if !h.isAllowed(id) {
return fmt.Errorf("error while adding client to health stats. client ID: %s; room ID: %s; err: %s", id, h.Roomid, errors.ErrClientNotAllowedInRoom.Error())
}
if !h.isParticipant(id) {
return fmt.Errorf("client with id '%s' does not exist in the health stats with id %s; err: %s", id, h.Roomid, errors.ErrClientNotAParticipant.Error())
}
h.Participants[id] = s
return nil
}
}
func (h *Health) isAllowed(id types.ClientID) bool {
select {
case <-h.ctx.Done():
return false
default:
if len(h.Allowed) == 0 {
return true
}
for _, allowedID := range h.Allowed {
if allowedID == id {
return true
}
}
return false
}
}
func (h *Health) isParticipant(id types.ClientID) bool {
select {
case <-h.ctx.Done():
return false
default:
_, exists := h.Participants[id]
return exists
}
}

View File

@@ -1 +1,30 @@
package interfaces
import (
"github.com/harshabose/socket-comm/pkg/middleware/chat/health"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
)
type CanAddHealth interface {
Add(roomid types.RoomID, id types.ClientID) error
}
type CanRemoveHealth interface {
Remove(roomid types.RoomID, id types.ClientID) error
}
type CanUpdate interface {
Update(roomid types.RoomID, id types.ClientID, s *health.Stat) error
}
type CanCreateHealth interface {
CreateHealth(types.RoomID) (*health.Health, error)
}
type CanDeleteHealth interface {
DeleteHealth(types.RoomID) error
}
type CanGetHealthSnapshot interface {
GetHealthSnapshot(types.RoomID) (health.Snapshot, error)
}

View File

@@ -0,0 +1,54 @@
package messages
import (
"time"
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/message"
"github.com/harshabose/socket-comm/pkg/middleware/chat"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
"github.com/harshabose/socket-comm/pkg/middleware/chat/state"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
)
var CreateRoomProtocol message.Protocol = "room:create_room"
type CreateRoom struct {
interceptor.BaseMessage
RoomID types.RoomID `json:"room_id"`
Allowed []types.ClientID `json:"allowed"`
TTL time.Duration `json:"ttl"`
}
func (m *CreateRoom) GetProtocol() message.Protocol {
return CreateRoomProtocol
}
func (m *CreateRoom) ReadProcess(_i interceptor.Interceptor, connection interceptor.Connection) error {
i, ok := _i.(*chat.ServerInterceptor)
if !ok {
return errors.ErrInterfaceMisMatch
}
s, err := i.GetState(connection)
if err != nil {
return err
}
return i.Rooms.Process(m, s)
}
func (m *CreateRoom) Process(p interfaces.Processor, s *state.State) error {
r, ok := p.(interfaces.CanCreateRoom)
if !ok {
return errors.ErrInterfaceMisMatch
}
room, err := r.CreateRoom(m.RoomID, m.Allowed, m.TTL)
if err != nil {
return err
}
return room.Add(m.RoomID, s)
}

View File

@@ -0,0 +1,40 @@
package messages
import (
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/message"
"github.com/harshabose/socket-comm/pkg/middleware/chat"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
"github.com/harshabose/socket-comm/pkg/middleware/chat/state"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
)
var DeleteRoomProtocol message.Protocol = "room:delete_room"
type DeleteRoom struct {
interceptor.BaseMessage
RoomID types.RoomID `json:"room_id"`
}
func (m *DeleteRoom) GetProtocol() message.Protocol {
return DeleteRoomProtocol
}
func (m *DeleteRoom) ReadProcess(_i interceptor.Interceptor, _ interceptor.Connection) error {
i, ok := _i.(*chat.ServerInterceptor)
if !ok {
return errors.ErrInterfaceMisMatch
}
return i.Rooms.Process(m, nil)
}
func (m *DeleteRoom) Process(p interfaces.Processor, _ *state.State) error {
r, ok := p.(interfaces.CanDeleteRoom)
if !ok {
return errors.ErrInterfaceMisMatch
}
return r.DeleteRoom(m.RoomID)
}

View File

@@ -0,0 +1,28 @@
package messages
import (
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/message"
)
var ForwardedMessageProtocol message.Protocol = "room:forwarded_message"
type ForwardedMessage struct {
interceptor.BaseMessage
}
func (m *ForwardedMessage) GetProtocol() message.Protocol {
return ForwardedMessageProtocol
}
func newForwardedMessage(forward *ToForward) (*ForwardedMessage, error) {
msg := &ForwardedMessage{}
bmsg, err := interceptor.NewBaseMessage(forward.GetNextProtocol(), forward.NextPayload, msg)
if err != nil {
return nil, err
}
msg.BaseMessage = bmsg
return msg, nil
}

View File

@@ -0,0 +1,91 @@
package messages
import (
"fmt"
"time"
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/message"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
)
var RequestHealthProtocol message.Protocol = "room:request_health"
type RequestHealth struct {
interceptor.BaseMessage
RoomID types.RoomID `json:"room_id"`
Timestamp int64 `json:"timestamp"` // in nanoseconds
ConnectionStartTime int64 `json:"connection_start_time"`
}
func NewRequestHealth(id types.RoomID) (*RequestHealth, error) {
msg := &RequestHealth{
RoomID: id,
Timestamp: time.Now().UnixNano(),
}
bmsg, err := interceptor.NewBaseMessage(message.NoneProtocol, nil, msg)
if err != nil {
panic(err)
}
msg.BaseMessage = bmsg
return msg, nil
}
func NewRequestHealthFactory(id types.RoomID) func() (message.Message, error) {
return func() (message.Message, error) {
return NewRequestHealth(id)
}
}
func (m *RequestHealth) GetProtocol() message.Protocol {
return RequestHealthProtocol
}
func (m *RequestHealth) ReadProcess(_i interceptor.Interceptor, connection interceptor.Connection) error {
s, ok := _i.(interfaces.CanGetState)
if !ok {
return errors.ErrInterfaceMisMatch
}
ss, err := s.GetState(connection)
if err != nil {
return fmt.Errorf("error while read processing 'RequestHealth' msg; err: %s", err.Error())
}
msg, err := NewHealthResponse(m, 5*time.Second)
if err != nil {
return fmt.Errorf("error while read processing 'RequestHealth' msg; err: %s", err.Error())
}
if err := ss.Write(msg); err != nil {
return fmt.Errorf("error while read processing 'RequestHealth' msg; err: %s", err.Error())
}
return nil
}
func (m *RequestHealth) WriteProcess(_i interceptor.Interceptor, connection interceptor.Connection) error {
s, ok := _i.(interfaces.CanGetState)
if !ok {
return errors.ErrInterfaceMisMatch
}
ss, err := s.GetState(connection)
if err != nil {
return fmt.Errorf("error while read processing 'RequestHealth' msg; err: %s", err.Error())
}
id, err := ss.GetClientID()
if err != nil {
return fmt.Errorf("error while read processing 'RequestHealth' msg; err: %s", err.Error())
}
m.SetSender(message.Sender(_i.ID()))
m.SetReceiver(message.Receiver(id))
return nil
}

View File

@@ -10,112 +10,32 @@ import (
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/message"
"github.com/harshabose/socket-comm/pkg/middleware/chat"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/middleware/chat/health"
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
"github.com/harshabose/socket-comm/pkg/middleware/chat/state"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
)
var (
RequestHealthProtocol message.Protocol = "room:request_health"
HealthResponseProtocol message.Protocol = "room:health_response"
)
type RequestHealth struct {
interceptor.BaseMessage
RoomID types.RoomID `json:"room_id"`
Timestamp int64 `json:"timestamp"` // in nanoseconds
ConnectionStartTime int64 `json:"connection_start_time"`
}
func NewRequestHealth(id types.RoomID) (*RequestHealth, error) {
msg := &RequestHealth{
RoomID: id,
Timestamp: time.Now().UnixNano(),
}
bmsg, err := interceptor.NewBaseMessage(message.NoneProtocol, nil, msg)
if err != nil {
panic(err)
}
msg.BaseMessage = bmsg
return msg, nil
}
func NewRequestHealthFactory(id types.RoomID) func() (message.Message, error) {
return func() (message.Message, error) {
return NewRequestHealth(id)
}
}
func (m *RequestHealth) GetProtocol() message.Protocol {
return RequestHealthProtocol
}
func (m *RequestHealth) ReadProcess(_i interceptor.Interceptor, connection interceptor.Connection) error {
s, ok := _i.(interfaces.CanGetState)
if !ok {
return errors.ErrInterfaceMisMatch
}
ss, err := s.GetState(connection)
if err != nil {
return fmt.Errorf("error while read processing 'RequestHealth' msg; err: %s", err.Error())
}
msg, err := NewHealthResponse(m)
if err != nil {
return fmt.Errorf("error while read processing 'RequestHealth' msg; err: %s", err.Error())
}
if err := ss.Write(msg); err != nil {
return fmt.Errorf("error while read processing 'RequestHealth' msg; err: %s", err.Error())
}
return nil
}
func (m *RequestHealth) WriteProcess(_i interceptor.Interceptor, connection interceptor.Connection) error {
s, ok := _i.(interfaces.CanGetState)
if !ok {
return errors.ErrInterfaceMisMatch
}
ss, err := s.GetState(connection)
if err != nil {
return fmt.Errorf("error while read processing 'RequestHealth' msg; err: %s", err.Error())
}
id, err := ss.GetClientID()
if err != nil {
return fmt.Errorf("error while read processing 'RequestHealth' msg; err: %s", err.Error())
}
m.SetSender(message.Sender(_i.ID()))
m.SetReceiver(message.Receiver(id))
return nil
}
var HealthResponseProtocol message.Protocol = "room:health_response"
// NOTE: BASIC HEALTH RESPONSE FOR ROOM MANAGEMENT, OTHER METRICS WILL BE DEALT WITH LATER
type HealthResponse struct {
interceptor.BaseMessage
RequestTimeStamp int64 `json:"-"` // in nanoseconds
RoomID types.RoomID `json:"room_id"`
ConnectionStatus types.ConnectionState `json:"connection_status"`
ConnectionUptime types.ConnectionUptime `json:"connection_uptime"`
CPUUsage types.CPUUsage `json:"cpu_usage"`
MemoryUsage types.MemoryUsage `json:"memory_usage"`
NetworkUsage types.NetworkUsage `json:"network_usage"`
Latency types.LatencyMs `json:"latency"`
health.Stat
RequestTimeStamp int64 `json:"-"` // in nanoseconds
RoomID types.RoomID `json:"room_id"`
Validity time.Duration `json:"validity"`
}
func NewHealthResponse(request *RequestHealth) (*HealthResponse, error) {
func NewHealthResponse(request *RequestHealth, validity time.Duration) (*HealthResponse, error) {
response := &HealthResponse{}
response.RequestTimeStamp = request.Timestamp
response.RoomID = request.RoomID
response.Validity = validity
response.setConnectionStatus(request.ConnectionStartTime)
@@ -145,9 +65,52 @@ func (m *HealthResponse) GetProtocol() message.Protocol {
}
func (m *HealthResponse) ReadProcess(_i interceptor.Interceptor, connection interceptor.Connection) error {
// TODO: IMPLEMENT HEALTH MANAGEMENT
i, ok := _i.(*chat.ServerInterceptor)
if !ok {
return errors.ErrInterfaceMisMatch
}
return nil
s, err := i.GetState(connection)
if err != nil {
return err
}
return i.Health.Process(m, s)
}
func (m *HealthResponse) Process(p interfaces.Processor, s *state.State) error {
id, err := s.GetClientID()
if err != nil {
return err
}
if id != types.ClientID(m.CurrentHeader.Sender) {
return fmt.Errorf("error while processing 'HealthResponse' message; err: 'sender id does not match'")
}
u, ok := p.(interfaces.CanUpdate)
if !ok {
return errors.ErrInterfaceMisMatch
}
// NOTE: BE VERY CAREFUL WITH THIS. STAT IS PASSED AS POINTER. ANY CHANGES LATER TO HealthResponse WILL BE REFLECTED IN CanUpdate
timer := time.NewTimer(m.Validity)
defer timer.Stop()
for {
// NOTE: THIS IS A BLOCKING CALL. WE NEED TO WAIT FOR THE VALIDITY TO EXPIRE
// NOTE: THIS ALSO MAKES SURE THAT THE ROOM ACTUALLY EXISTS BEFORE UPDATING THE HEALTH STATS
select {
case <-timer.C:
return fmt.Errorf("error while processing 'HealthResponse' message; err: 'validity expired'")
default:
err := u.Update(m.RoomID, id, &m.Stat)
if err == nil {
return nil
}
fmt.Println("error while reading CPU usage; err: ", err.Error())
}
}
}
func (m *HealthResponse) WriteProcess(_i interceptor.Interceptor, connection interceptor.Connection) error {
@@ -225,6 +188,11 @@ func (m *HealthResponse) setMemoryUsage() error {
return nil
}
func (m *HealthResponse) setNetworkUsage() error {
// TODO: IMPLEMENT THIS LATER
return nil
}
func (m *HealthResponse) setLatency() error {
latencyNano := time.Now().UnixNano() - m.RequestTimeStamp

View File

@@ -0,0 +1,66 @@
package messages
import (
"fmt"
"time"
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/message"
"github.com/harshabose/socket-comm/pkg/middleware/chat"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
"github.com/harshabose/socket-comm/pkg/middleware/chat/state"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
)
var JoinRoomProtocol message.Protocol = "room:join_room"
type JoinRoom struct {
interceptor.BaseMessage
RoomID types.RoomID `json:"room_id"`
JoinDeadline time.Duration `json:"join_deadline"`
}
func (m *JoinRoom) GetProtocol() message.Protocol {
return JoinRoomProtocol
}
func (m *JoinRoom) ReadProcess(_i interceptor.Interceptor, connection interceptor.Connection) error {
i, ok := _i.(*chat.ServerInterceptor)
if !ok {
return errors.ErrInterfaceMisMatch
}
s, err := i.GetState(connection)
if err != nil {
return err
}
return i.Rooms.Process(m, s)
}
func (m *JoinRoom) Process(p interfaces.Processor, s *state.State) error {
a, ok := p.(interfaces.CanAdd)
if !ok {
return errors.ErrInterfaceMisMatch
}
timer := time.NewTimer(m.JoinDeadline)
defer timer.Stop()
for {
select {
case <-timer.C:
return fmt.Errorf("error while read processing 'JoinRoom' msg; err: %s", errors.ErrContextCancelled)
default:
err := a.Add(m.RoomID, s)
if err == nil {
return nil
}
fmt.Println(fmt.Errorf("error while read processing 'JoinRoom' msg; err: %s. retrying", err.Error()))
}
}
// TODO: AFTER SUCCESS, SEND ROOM CURRENT STATE TO THE CLIENT
// TODO: THEN SEND SuccessJoinRoom MESSAGE TO THE CLIENT
}

View File

@@ -0,0 +1,45 @@
package messages
import (
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/message"
"github.com/harshabose/socket-comm/pkg/middleware/chat"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
"github.com/harshabose/socket-comm/pkg/middleware/chat/state"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
)
var LeaveRoomProtocol message.Protocol = "room:leave_room"
type LeaveRoom struct {
interceptor.BaseMessage
RoomID types.RoomID
}
func (m *LeaveRoom) GetProtocol() message.Protocol {
return LeaveRoomProtocol
}
func (m *LeaveRoom) ReadProcess(_i interceptor.Interceptor, connection interceptor.Connection) error {
i, ok := _i.(*chat.ServerInterceptor)
if !ok {
return errors.ErrInterfaceMisMatch
}
s, err := i.GetState(connection)
if err != nil {
return err
}
return i.Rooms.Process(m, s)
}
func (m *LeaveRoom) Process(p interfaces.Processor, s *state.State) error {
r, ok := p.(interfaces.CanRemove)
if !ok {
return errors.ErrInterfaceMisMatch
}
return r.Remove(m.RoomID, s)
}

View File

@@ -1,238 +0,0 @@
package messages
import (
"fmt"
"time"
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/message"
"github.com/harshabose/socket-comm/pkg/middleware/chat"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
"github.com/harshabose/socket-comm/pkg/middleware/chat/state"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
)
var (
CreateRoomProtocol message.Protocol = "room:create_room"
DeleteRoomProtocol message.Protocol = "room:delete_room"
ForwardMessageProtocol message.Protocol = "room:forward_message"
ForwardedMessageProtocol message.Protocol = "room:forwarded_message"
JoinRoomProtocol message.Protocol = "room:join_room"
LeaveRoomProtocol message.Protocol = "room:leave_room"
)
type CreateRoom struct {
interceptor.BaseMessage
RoomID types.RoomID `json:"room_id"`
Allowed []types.ClientID `json:"allowed"`
TTL time.Duration `json:"ttl"`
}
func (m *CreateRoom) GetProtocol() message.Protocol {
return CreateRoomProtocol
}
func (m *CreateRoom) ReadProcess(_i interceptor.Interceptor, connection interceptor.Connection) error {
i, ok := _i.(*chat.ServerInterceptor)
if !ok {
return errors.ErrInterfaceMisMatch
}
s, err := i.GetState(connection)
if err != nil {
return err
}
return i.Rooms.Process(m, s)
}
func (m *CreateRoom) Process(p interfaces.Processor, s *state.State) error {
r, ok := p.(interfaces.CanCreateRoom)
if !ok {
return errors.ErrInterfaceMisMatch
}
room, err := r.CreateRoom(m.RoomID, m.Allowed, m.TTL)
if err != nil {
return err
}
return room.Add(m.RoomID, s)
}
type DeleteRoom struct {
interceptor.BaseMessage
RoomID types.RoomID `json:"room_id"`
}
func (m *DeleteRoom) GetProtocol() message.Protocol {
return DeleteRoomProtocol
}
func (m *DeleteRoom) ReadProcess(_i interceptor.Interceptor, _ interceptor.Connection) error {
i, ok := _i.(*chat.ServerInterceptor)
if !ok {
return errors.ErrInterfaceMisMatch
}
return i.Rooms.Process(m, nil)
}
func (m *DeleteRoom) Process(p interfaces.Processor, _ *state.State) error {
r, ok := p.(interfaces.CanDeleteRoom)
if !ok {
return errors.ErrInterfaceMisMatch
}
return r.DeleteRoom(m.RoomID)
}
type ToForwardMessage struct {
interceptor.BaseMessage
RoomID types.RoomID `json:"room_id"`
To []types.ClientID `json:"to"`
}
func (m *ToForwardMessage) GetProtocol() message.Protocol {
return ForwardMessageProtocol
}
func (m *ToForwardMessage) ReadProcess(_i interceptor.Interceptor, connection interceptor.Connection) error {
i, ok := _i.(*chat.ServerInterceptor)
if !ok {
return errors.ErrInterfaceMisMatch
}
s, err := i.GetState(connection)
if err != nil {
return err
}
return i.Rooms.Process(m, s)
}
func (m *ToForwardMessage) Process(p interfaces.Processor, s *state.State) error {
msg, err := newForwardedMessage(m)
if err != nil {
return err
}
clientID, err := s.GetClientID()
if err != nil {
return err
}
if types.ClientID(m.CurrentHeader.Sender) != clientID {
return fmt.Errorf("error while read processing 'ToForwardMessage'; From and ClientID did not match")
}
w, ok := p.(interfaces.CanWriteRoomMessage)
if !ok {
return errors.ErrInterfaceMisMatch
}
if err := w.WriteRoomMessage(m.RoomID, msg, clientID, m.To...); err != nil {
return err
}
return nil
}
type ForwardedMessage struct {
interceptor.BaseMessage
}
func (m *ForwardedMessage) GetProtocol() message.Protocol {
return ForwardedMessageProtocol
}
func newForwardedMessage(forward *ToForwardMessage) (*ForwardedMessage, error) {
msg := &ForwardedMessage{}
bmsg, err := interceptor.NewBaseMessage(forward.GetNextProtocol(), forward.NextPayload, msg)
if err != nil {
return nil, err
}
msg.BaseMessage = bmsg
return msg, nil
}
type JoinRoom struct {
interceptor.BaseMessage
RoomID types.RoomID `json:"room_id"`
WaitDuration time.Duration `json:"wait_duration"`
}
func (m *JoinRoom) GetProtocol() message.Protocol {
return JoinRoomProtocol
}
func (m *JoinRoom) ReadProcess(_i interceptor.Interceptor, connection interceptor.Connection) error {
i, ok := _i.(*chat.ServerInterceptor)
if !ok {
return errors.ErrInterfaceMisMatch
}
s, err := i.GetState(connection)
if err != nil {
return err
}
return i.Rooms.Process(m, s)
}
func (m *JoinRoom) Process(p interfaces.Processor, s *state.State) error {
a, ok := p.(interfaces.CanAdd)
if !ok {
return errors.ErrInterfaceMisMatch
}
timer := time.NewTimer(m.WaitDuration)
defer timer.Stop()
for {
select {
case <-timer.C:
return fmt.Errorf("error while read processing 'JoinRoom' msg; err: %s", errors.ErrContextCancelled)
default:
err := a.Add(m.RoomID, s)
if err == nil {
return nil
}
fmt.Println(fmt.Errorf("error while read processing 'JoinRoom' msg; err: %s. retrying", err.Error()))
}
}
}
type LeaveRoom struct {
interceptor.BaseMessage
RoomID types.RoomID
}
func (m *LeaveRoom) GetProtocol() message.Protocol {
return LeaveRoomProtocol
}
func (m *LeaveRoom) ReadProcess(_i interceptor.Interceptor, connection interceptor.Connection) error {
i, ok := _i.(*chat.ServerInterceptor)
if !ok {
return errors.ErrInterfaceMisMatch
}
s, err := i.GetState(connection)
if err != nil {
return err
}
return i.Rooms.Process(m, s)
}
func (m *LeaveRoom) Process(p interfaces.Processor, s *state.State) error {
r, ok := p.(interfaces.CanRemove)
if !ok {
return errors.ErrInterfaceMisMatch
}
return r.Remove(m.RoomID, s)
}

View File

@@ -0,0 +1,43 @@
package messages
import (
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/message"
"github.com/harshabose/socket-comm/pkg/middleware/chat"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
"github.com/harshabose/socket-comm/pkg/middleware/chat/state"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
)
var SuccessJoinRoomProtocol message.Protocol = "room:success_join_room"
// SuccessJoinRoom is the message sent by the server to the clients (including the requested client and roommates)
// when the client joins a room successfully.
type SuccessJoinRoom struct {
interceptor.BaseMessage
RoomID types.RoomID `json:"room_id"`
ClientID types.ClientID `json:"client_id"`
}
func (m *SuccessJoinRoom) GetProtocol() message.Protocol {
return SuccessJoinRoomProtocol
}
func (m *SuccessJoinRoom) ReadProcess(_i interceptor.Interceptor, _ interceptor.Connection) error {
i, ok := _i.(*chat.ClientInterceptor)
if !ok {
return errors.ErrInterfaceMisMatch
}
return i.Health.Process(m, nil)
}
func (m *SuccessJoinRoom) Process(p interfaces.Processor, _ *state.State) error {
a, ok := p.(interfaces.CanAddHealth)
if !ok {
return errors.ErrInterfaceMisMatch
}
// NOTE: MIGHT FAIL IF THE ROOM CREATION MESSAGE IS NOT RECEIVED BY THE CLIENT BEFORE THIS MESSAGE IS SENT.
return a.Add(m.RoomID, m.ClientID)
}

View File

@@ -0,0 +1,65 @@
package messages
import (
"fmt"
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/message"
"github.com/harshabose/socket-comm/pkg/middleware/chat"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
"github.com/harshabose/socket-comm/pkg/middleware/chat/state"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
)
var ForwardMessageProtocol message.Protocol = "room:forward_message"
type ToForward struct {
interceptor.BaseMessage
RoomID types.RoomID `json:"room_id"`
To []types.ClientID `json:"to"`
}
func (m *ToForward) GetProtocol() message.Protocol {
return ForwardMessageProtocol
}
func (m *ToForward) ReadProcess(_i interceptor.Interceptor, connection interceptor.Connection) error {
i, ok := _i.(*chat.ServerInterceptor)
if !ok {
return errors.ErrInterfaceMisMatch
}
s, err := i.GetState(connection)
if err != nil {
return err
}
return i.Rooms.Process(m, s)
}
func (m *ToForward) Process(p interfaces.Processor, s *state.State) error {
msg, err := newForwardedMessage(m)
if err != nil {
return err
}
clientID, err := s.GetClientID()
if err != nil {
return err
}
if types.ClientID(m.CurrentHeader.Sender) != clientID {
return fmt.Errorf("error while read processing 'ToForward'; From and ClientID did not match")
}
w, ok := p.(interfaces.CanWriteRoomMessage)
if !ok {
return errors.ErrInterfaceMisMatch
}
if err := w.WriteRoomMessage(m.RoomID, msg, clientID, m.To...); err != nil {
return err
}
return nil
}

View File

@@ -1,13 +1,56 @@
package processors
import (
"context"
"fmt"
"sync"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/middleware/chat/health"
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
"github.com/harshabose/socket-comm/pkg/middleware/chat/state"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
)
type Health struct {
health map[types.RoomID]*health.Health
mux sync.RWMutex
ctx context.Context
}
// ==============================================================================
// ================================ CORE METHODS ================================
// ==============================================================================
func (p *Health) CreateHealth(roomid types.RoomID, allowed []types.ClientID) (*health.Health, error) {
if p.exists(roomid) {
return nil, fmt.Errorf("error while creating h with id %s; err: %s", roomid, errors.ErrRoomAlreadyExists)
}
h := health.NewHealth(p.ctx, roomid, allowed)
p.health[roomid] = h
return h, nil
}
func (p *Health) DeleteHealth(roomid types.RoomID) error {
if !p.exists(roomid) {
return fmt.Errorf("error while deleting h with id: %s; err: %s", roomid, errors.ErrRoomNotFound)
}
// TODO: DO I NEED TO CLOSE THE HEALTH?
delete(p.health, roomid)
return nil
}
func (p *Health) exists(roomid types.RoomID) bool {
_, exists := p.health[roomid]
return exists
}
// ==============================================================================
// ========================== INTERFACE IMPLEMENTATIONS =========================
// ==============================================================================
func (p *Health) Process(process interfaces.CanBeProcessed, state *state.State) error {
return process.Process(p, state)
}
@@ -16,6 +59,66 @@ func (p *Health) ProcessBackground(process interfaces.CanBeProcessedBackground,
return process.ProcessBackground(p, state)
}
func (p *Health) GetSnapshot() {
func (p *Health) GetHealthSnapshot(roomid types.RoomID) (health.Snapshot, error) {
p.mux.RLock()
defer p.mux.RUnlock()
if !p.exists(roomid) {
return health.Snapshot{}, fmt.Errorf("error while getting snapshot for room with id: %s; err: %s", roomid, errors.ErrRoomNotFound)
}
h := p.health[roomid]
// NOTE: FOLLOWING IS DEEP-COPYING SNAPSHOT
snapshot := health.Snapshot{
Roomid: h.Roomid,
Allowed: make([]types.ClientID, len(h.Allowed)),
Participants: make(map[types.ClientID]*health.Stat, len(h.Participants)),
}
copy(snapshot.Allowed, h.Allowed)
for id, stat := range h.Participants {
if stat != nil {
statCopy := *stat
snapshot.Participants[id] = &statCopy
} else {
snapshot.Participants[id] = nil
}
}
return snapshot, nil
}
func (p *Health) Add(roomid types.RoomID, id types.ClientID) error {
p.mux.Lock()
defer p.mux.Unlock()
if !p.exists(roomid) {
return fmt.Errorf("error while adding participant with id: %s; err: %s", id, errors.ErrRoomNotFound)
}
return p.health[roomid].Add(roomid, id)
}
func (p *Health) Remove(roomid types.RoomID, id types.ClientID) error {
p.mux.Lock()
defer p.mux.Unlock()
if !p.exists(roomid) {
return fmt.Errorf("error while removing participant with id: %s; err: %s", id, errors.ErrRoomNotFound)
}
return p.health[roomid].Remove(roomid, id)
}
func (p *Health) Update(roomid types.RoomID, id types.ClientID, stat *health.Stat) error {
p.mux.Lock()
defer p.mux.Unlock()
if !p.exists(roomid) {
return fmt.Errorf("error while updating participant with id: %s; err: %s", id, errors.ErrRoomNotFound)
}
return p.health[roomid].Update(roomid, id, stat)
}