From 1cffe91b4d57a79a21f4839ffdf2eb9f200f770a Mon Sep 17 00:00:00 2001 From: harshabose Date: Thu, 8 May 2025 00:53:38 +0530 Subject: [PATCH] added health processor and few related messages and processes --- pkg/middleware/chat/client_interceptor.go | 2 + pkg/middleware/chat/health/health.go | 157 ++++++++++++ pkg/middleware/chat/interfaces/health.go | 29 +++ pkg/middleware/chat/messages/create_room.go | 54 ++++ pkg/middleware/chat/messages/delete_room.go | 40 +++ pkg/middleware/chat/messages/forwarded.go | 28 +++ .../chat/messages/health_request.go | 91 +++++++ ...st_health_stream.go => health_response.go} | 152 +++++------ pkg/middleware/chat/messages/join_room.go | 66 +++++ pkg/middleware/chat/messages/leave_room.go | 45 ++++ pkg/middleware/chat/messages/room_messages.go | 238 ------------------ .../chat/messages/success_join_room.go | 43 ++++ pkg/middleware/chat/messages/to_forward.go | 65 +++++ pkg/middleware/chat/processors/health.go | 105 +++++++- 14 files changed, 784 insertions(+), 331 deletions(-) create mode 100644 pkg/middleware/chat/health/health.go create mode 100644 pkg/middleware/chat/messages/create_room.go create mode 100644 pkg/middleware/chat/messages/delete_room.go create mode 100644 pkg/middleware/chat/messages/forwarded.go create mode 100644 pkg/middleware/chat/messages/health_request.go rename pkg/middleware/chat/messages/{request_health_stream.go => health_response.go} (59%) create mode 100644 pkg/middleware/chat/messages/join_room.go create mode 100644 pkg/middleware/chat/messages/leave_room.go delete mode 100644 pkg/middleware/chat/messages/room_messages.go create mode 100644 pkg/middleware/chat/messages/success_join_room.go create mode 100644 pkg/middleware/chat/messages/to_forward.go diff --git a/pkg/middleware/chat/client_interceptor.go b/pkg/middleware/chat/client_interceptor.go index 1871c61..2b3377e 100644 --- a/pkg/middleware/chat/client_interceptor.go +++ b/pkg/middleware/chat/client_interceptor.go @@ -6,11 +6,13 @@ import ( "time" "github.com/harshabose/socket-comm/pkg/interceptor" + "github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces" "github.com/harshabose/socket-comm/pkg/middleware/chat/process" ) type ClientInterceptor struct { commonInterceptor + Health interfaces.Processor } func (i *ClientInterceptor) BindSocketConnection(connection interceptor.Connection, writer interceptor.Writer, reader interceptor.Reader) (interceptor.Writer, interceptor.Reader, error) { diff --git a/pkg/middleware/chat/health/health.go b/pkg/middleware/chat/health/health.go new file mode 100644 index 0000000..6e574bd --- /dev/null +++ b/pkg/middleware/chat/health/health.go @@ -0,0 +1,157 @@ +package health + +import ( + "context" + "encoding/json" + "fmt" + "sync" + + "github.com/harshabose/socket-comm/pkg/middleware/chat/errors" + "github.com/harshabose/socket-comm/pkg/middleware/chat/types" +) + +type Stat struct { + ConnectionStatus types.ConnectionState `json:"connection_status"` + ConnectionUptime types.ConnectionUptime `json:"connection_uptime"` + CPUUsage types.CPUUsage `json:"cpu_usage"` + MemoryUsage types.MemoryUsage `json:"memory_usage"` + NetworkUsage types.NetworkUsage `json:"network_usage"` + Latency types.LatencyMs `json:"latency"` +} + +type Health struct { + Snapshot + mux sync.RWMutex + ctx context.Context +} + +type Snapshot struct { + Roomid types.RoomID `json:"roomid"` // NOTE: ADDED WHEN HEALTH IS CREATED. + Allowed []types.ClientID `json:"allowed"` // NOTE: ADDED WHEN HEALTH IS CREATED. + Participants map[types.ClientID]*Stat `json:"participants"` // NOTE: ADDED WHEN CLIENT JOINS. UPDATED WHEN CLIENT SENDS HEALTH RESPONSE. +} + +// Marshal marshals the health struct into a json byte array. +// NOTE: THIS IS USED TO SEND HEALTH STATS TO THE CLIENT. +func (h *Snapshot) Marshal() ([]byte, error) { + return json.Marshal(h) +} + +func NewHealth(ctx context.Context, id types.RoomID, allowed []types.ClientID) *Health { + return &Health{ + Snapshot: Snapshot{ + Roomid: id, + Allowed: allowed, + Participants: make(map[types.ClientID]*Stat), + }, + ctx: ctx, + } +} + +func (h *Health) ID() types.RoomID { + return h.Roomid +} + +func (h *Health) Add(roomid types.RoomID, id types.ClientID) error { + h.mux.Lock() + defer h.mux.Unlock() + + if roomid != h.Roomid { + return errors.ErrWrongRoom + } + + select { + case <-h.ctx.Done(): + return fmt.Errorf("error while adding client to health stats. client ID: %s; room ID: %s; err: %s", id, h.Roomid, errors.ErrContextCancelled.Error()) + default: + if !h.isAllowed(id) { + return fmt.Errorf("error while adding client to health stats. client ID: %s; room ID: %s; err: %s", id, h.Roomid, errors.ErrClientNotAllowedInRoom.Error()) + } + + if h.isParticipant(id) { + return fmt.Errorf("client with id '%s' already existing in the health stats with id %s; err: %s", id, h.Roomid, errors.ErrClientIsAlreadyParticipant) + } + + h.Participants[id] = &Stat{} + return nil + } +} + +func (h *Health) Remove(roomid types.RoomID, id types.ClientID) error { + h.mux.Lock() + defer h.mux.Unlock() + + if roomid != h.Roomid { + return errors.ErrWrongRoom + } + + select { + case <-h.ctx.Done(): + return fmt.Errorf("error while removing client to health stats. client ID: %s; room ID: %s; err: %s", id, h.Roomid, errors.ErrContextCancelled.Error()) + default: + if !h.isAllowed(id) { + return fmt.Errorf("error while removing client to health stats. client ID: %s; room ID: %s; err: %s", id, h.Roomid, errors.ErrClientNotAllowedInRoom.Error()) + } + + if !h.isParticipant(id) { + return fmt.Errorf("client with id '%s' does not exist in the health stats with id %s; err: %s", id, h.Roomid, errors.ErrClientNotAParticipant.Error()) + } + + delete(h.Participants, id) + return nil + } +} + +func (h *Health) Update(roomid types.RoomID, id types.ClientID, s *Stat) error { + h.mux.Lock() + defer h.mux.Unlock() + + if roomid != h.Roomid { + return errors.ErrWrongRoom + } + + select { + case <-h.ctx.Done(): + return fmt.Errorf("error while adding client to health stats. client ID: %s; room ID: %s; err: %s", id, h.Roomid, errors.ErrContextCancelled.Error()) + default: + if !h.isAllowed(id) { + return fmt.Errorf("error while adding client to health stats. client ID: %s; room ID: %s; err: %s", id, h.Roomid, errors.ErrClientNotAllowedInRoom.Error()) + } + + if !h.isParticipant(id) { + return fmt.Errorf("client with id '%s' does not exist in the health stats with id %s; err: %s", id, h.Roomid, errors.ErrClientNotAParticipant.Error()) + } + + h.Participants[id] = s + return nil + } +} + +func (h *Health) isAllowed(id types.ClientID) bool { + select { + case <-h.ctx.Done(): + return false + default: + if len(h.Allowed) == 0 { + return true + } + + for _, allowedID := range h.Allowed { + if allowedID == id { + return true + } + } + + return false + } +} + +func (h *Health) isParticipant(id types.ClientID) bool { + select { + case <-h.ctx.Done(): + return false + default: + _, exists := h.Participants[id] + return exists + } +} diff --git a/pkg/middleware/chat/interfaces/health.go b/pkg/middleware/chat/interfaces/health.go index 08badf2..5b91fbc 100644 --- a/pkg/middleware/chat/interfaces/health.go +++ b/pkg/middleware/chat/interfaces/health.go @@ -1 +1,30 @@ package interfaces + +import ( + "github.com/harshabose/socket-comm/pkg/middleware/chat/health" + "github.com/harshabose/socket-comm/pkg/middleware/chat/types" +) + +type CanAddHealth interface { + Add(roomid types.RoomID, id types.ClientID) error +} + +type CanRemoveHealth interface { + Remove(roomid types.RoomID, id types.ClientID) error +} + +type CanUpdate interface { + Update(roomid types.RoomID, id types.ClientID, s *health.Stat) error +} + +type CanCreateHealth interface { + CreateHealth(types.RoomID) (*health.Health, error) +} + +type CanDeleteHealth interface { + DeleteHealth(types.RoomID) error +} + +type CanGetHealthSnapshot interface { + GetHealthSnapshot(types.RoomID) (health.Snapshot, error) +} diff --git a/pkg/middleware/chat/messages/create_room.go b/pkg/middleware/chat/messages/create_room.go new file mode 100644 index 0000000..ba524cb --- /dev/null +++ b/pkg/middleware/chat/messages/create_room.go @@ -0,0 +1,54 @@ +package messages + +import ( + "time" + + "github.com/harshabose/socket-comm/pkg/interceptor" + "github.com/harshabose/socket-comm/pkg/message" + "github.com/harshabose/socket-comm/pkg/middleware/chat" + "github.com/harshabose/socket-comm/pkg/middleware/chat/errors" + "github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces" + "github.com/harshabose/socket-comm/pkg/middleware/chat/state" + "github.com/harshabose/socket-comm/pkg/middleware/chat/types" +) + +var CreateRoomProtocol message.Protocol = "room:create_room" + +type CreateRoom struct { + interceptor.BaseMessage + RoomID types.RoomID `json:"room_id"` + Allowed []types.ClientID `json:"allowed"` + TTL time.Duration `json:"ttl"` +} + +func (m *CreateRoom) GetProtocol() message.Protocol { + return CreateRoomProtocol +} + +func (m *CreateRoom) ReadProcess(_i interceptor.Interceptor, connection interceptor.Connection) error { + i, ok := _i.(*chat.ServerInterceptor) + if !ok { + return errors.ErrInterfaceMisMatch + } + + s, err := i.GetState(connection) + if err != nil { + return err + } + + return i.Rooms.Process(m, s) +} + +func (m *CreateRoom) Process(p interfaces.Processor, s *state.State) error { + r, ok := p.(interfaces.CanCreateRoom) + if !ok { + return errors.ErrInterfaceMisMatch + } + + room, err := r.CreateRoom(m.RoomID, m.Allowed, m.TTL) + if err != nil { + return err + } + + return room.Add(m.RoomID, s) +} diff --git a/pkg/middleware/chat/messages/delete_room.go b/pkg/middleware/chat/messages/delete_room.go new file mode 100644 index 0000000..0671bc3 --- /dev/null +++ b/pkg/middleware/chat/messages/delete_room.go @@ -0,0 +1,40 @@ +package messages + +import ( + "github.com/harshabose/socket-comm/pkg/interceptor" + "github.com/harshabose/socket-comm/pkg/message" + "github.com/harshabose/socket-comm/pkg/middleware/chat" + "github.com/harshabose/socket-comm/pkg/middleware/chat/errors" + "github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces" + "github.com/harshabose/socket-comm/pkg/middleware/chat/state" + "github.com/harshabose/socket-comm/pkg/middleware/chat/types" +) + +var DeleteRoomProtocol message.Protocol = "room:delete_room" + +type DeleteRoom struct { + interceptor.BaseMessage + RoomID types.RoomID `json:"room_id"` +} + +func (m *DeleteRoom) GetProtocol() message.Protocol { + return DeleteRoomProtocol +} + +func (m *DeleteRoom) ReadProcess(_i interceptor.Interceptor, _ interceptor.Connection) error { + i, ok := _i.(*chat.ServerInterceptor) + if !ok { + return errors.ErrInterfaceMisMatch + } + + return i.Rooms.Process(m, nil) +} + +func (m *DeleteRoom) Process(p interfaces.Processor, _ *state.State) error { + r, ok := p.(interfaces.CanDeleteRoom) + if !ok { + return errors.ErrInterfaceMisMatch + } + + return r.DeleteRoom(m.RoomID) +} diff --git a/pkg/middleware/chat/messages/forwarded.go b/pkg/middleware/chat/messages/forwarded.go new file mode 100644 index 0000000..6afcf89 --- /dev/null +++ b/pkg/middleware/chat/messages/forwarded.go @@ -0,0 +1,28 @@ +package messages + +import ( + "github.com/harshabose/socket-comm/pkg/interceptor" + "github.com/harshabose/socket-comm/pkg/message" +) + +var ForwardedMessageProtocol message.Protocol = "room:forwarded_message" + +type ForwardedMessage struct { + interceptor.BaseMessage +} + +func (m *ForwardedMessage) GetProtocol() message.Protocol { + return ForwardedMessageProtocol +} + +func newForwardedMessage(forward *ToForward) (*ForwardedMessage, error) { + msg := &ForwardedMessage{} + bmsg, err := interceptor.NewBaseMessage(forward.GetNextProtocol(), forward.NextPayload, msg) + if err != nil { + return nil, err + } + + msg.BaseMessage = bmsg + + return msg, nil +} diff --git a/pkg/middleware/chat/messages/health_request.go b/pkg/middleware/chat/messages/health_request.go new file mode 100644 index 0000000..0bc0bfd --- /dev/null +++ b/pkg/middleware/chat/messages/health_request.go @@ -0,0 +1,91 @@ +package messages + +import ( + "fmt" + "time" + + "github.com/harshabose/socket-comm/pkg/interceptor" + "github.com/harshabose/socket-comm/pkg/message" + "github.com/harshabose/socket-comm/pkg/middleware/chat/errors" + "github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces" + "github.com/harshabose/socket-comm/pkg/middleware/chat/types" +) + +var RequestHealthProtocol message.Protocol = "room:request_health" + +type RequestHealth struct { + interceptor.BaseMessage + RoomID types.RoomID `json:"room_id"` + Timestamp int64 `json:"timestamp"` // in nanoseconds + ConnectionStartTime int64 `json:"connection_start_time"` +} + +func NewRequestHealth(id types.RoomID) (*RequestHealth, error) { + msg := &RequestHealth{ + RoomID: id, + Timestamp: time.Now().UnixNano(), + } + + bmsg, err := interceptor.NewBaseMessage(message.NoneProtocol, nil, msg) + if err != nil { + panic(err) + } + msg.BaseMessage = bmsg + + return msg, nil +} + +func NewRequestHealthFactory(id types.RoomID) func() (message.Message, error) { + return func() (message.Message, error) { + return NewRequestHealth(id) + } +} + +func (m *RequestHealth) GetProtocol() message.Protocol { + return RequestHealthProtocol +} + +func (m *RequestHealth) ReadProcess(_i interceptor.Interceptor, connection interceptor.Connection) error { + s, ok := _i.(interfaces.CanGetState) + if !ok { + return errors.ErrInterfaceMisMatch + } + + ss, err := s.GetState(connection) + if err != nil { + return fmt.Errorf("error while read processing 'RequestHealth' msg; err: %s", err.Error()) + } + + msg, err := NewHealthResponse(m, 5*time.Second) + if err != nil { + return fmt.Errorf("error while read processing 'RequestHealth' msg; err: %s", err.Error()) + } + + if err := ss.Write(msg); err != nil { + return fmt.Errorf("error while read processing 'RequestHealth' msg; err: %s", err.Error()) + } + + return nil +} + +func (m *RequestHealth) WriteProcess(_i interceptor.Interceptor, connection interceptor.Connection) error { + s, ok := _i.(interfaces.CanGetState) + if !ok { + return errors.ErrInterfaceMisMatch + } + + ss, err := s.GetState(connection) + if err != nil { + return fmt.Errorf("error while read processing 'RequestHealth' msg; err: %s", err.Error()) + } + + id, err := ss.GetClientID() + if err != nil { + return fmt.Errorf("error while read processing 'RequestHealth' msg; err: %s", err.Error()) + } + + m.SetSender(message.Sender(_i.ID())) + m.SetReceiver(message.Receiver(id)) + + return nil +} diff --git a/pkg/middleware/chat/messages/request_health_stream.go b/pkg/middleware/chat/messages/health_response.go similarity index 59% rename from pkg/middleware/chat/messages/request_health_stream.go rename to pkg/middleware/chat/messages/health_response.go index e1f1223..f6db630 100644 --- a/pkg/middleware/chat/messages/request_health_stream.go +++ b/pkg/middleware/chat/messages/health_response.go @@ -10,112 +10,32 @@ import ( "github.com/harshabose/socket-comm/pkg/interceptor" "github.com/harshabose/socket-comm/pkg/message" + "github.com/harshabose/socket-comm/pkg/middleware/chat" "github.com/harshabose/socket-comm/pkg/middleware/chat/errors" + "github.com/harshabose/socket-comm/pkg/middleware/chat/health" "github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces" + "github.com/harshabose/socket-comm/pkg/middleware/chat/state" "github.com/harshabose/socket-comm/pkg/middleware/chat/types" ) -var ( - RequestHealthProtocol message.Protocol = "room:request_health" - HealthResponseProtocol message.Protocol = "room:health_response" -) - -type RequestHealth struct { - interceptor.BaseMessage - RoomID types.RoomID `json:"room_id"` - Timestamp int64 `json:"timestamp"` // in nanoseconds - ConnectionStartTime int64 `json:"connection_start_time"` -} - -func NewRequestHealth(id types.RoomID) (*RequestHealth, error) { - msg := &RequestHealth{ - RoomID: id, - Timestamp: time.Now().UnixNano(), - } - - bmsg, err := interceptor.NewBaseMessage(message.NoneProtocol, nil, msg) - if err != nil { - panic(err) - } - msg.BaseMessage = bmsg - - return msg, nil -} - -func NewRequestHealthFactory(id types.RoomID) func() (message.Message, error) { - return func() (message.Message, error) { - return NewRequestHealth(id) - } -} - -func (m *RequestHealth) GetProtocol() message.Protocol { - return RequestHealthProtocol -} - -func (m *RequestHealth) ReadProcess(_i interceptor.Interceptor, connection interceptor.Connection) error { - s, ok := _i.(interfaces.CanGetState) - if !ok { - return errors.ErrInterfaceMisMatch - } - - ss, err := s.GetState(connection) - if err != nil { - return fmt.Errorf("error while read processing 'RequestHealth' msg; err: %s", err.Error()) - } - - msg, err := NewHealthResponse(m) - if err != nil { - return fmt.Errorf("error while read processing 'RequestHealth' msg; err: %s", err.Error()) - } - - if err := ss.Write(msg); err != nil { - return fmt.Errorf("error while read processing 'RequestHealth' msg; err: %s", err.Error()) - } - - return nil -} - -func (m *RequestHealth) WriteProcess(_i interceptor.Interceptor, connection interceptor.Connection) error { - s, ok := _i.(interfaces.CanGetState) - if !ok { - return errors.ErrInterfaceMisMatch - } - - ss, err := s.GetState(connection) - if err != nil { - return fmt.Errorf("error while read processing 'RequestHealth' msg; err: %s", err.Error()) - } - - id, err := ss.GetClientID() - if err != nil { - return fmt.Errorf("error while read processing 'RequestHealth' msg; err: %s", err.Error()) - } - - m.SetSender(message.Sender(_i.ID())) - m.SetReceiver(message.Receiver(id)) - - return nil -} +var HealthResponseProtocol message.Protocol = "room:health_response" // NOTE: BASIC HEALTH RESPONSE FOR ROOM MANAGEMENT, OTHER METRICS WILL BE DEALT WITH LATER type HealthResponse struct { interceptor.BaseMessage - RequestTimeStamp int64 `json:"-"` // in nanoseconds - RoomID types.RoomID `json:"room_id"` - ConnectionStatus types.ConnectionState `json:"connection_status"` - ConnectionUptime types.ConnectionUptime `json:"connection_uptime"` - CPUUsage types.CPUUsage `json:"cpu_usage"` - MemoryUsage types.MemoryUsage `json:"memory_usage"` - NetworkUsage types.NetworkUsage `json:"network_usage"` - Latency types.LatencyMs `json:"latency"` + health.Stat + RequestTimeStamp int64 `json:"-"` // in nanoseconds + RoomID types.RoomID `json:"room_id"` + Validity time.Duration `json:"validity"` } -func NewHealthResponse(request *RequestHealth) (*HealthResponse, error) { +func NewHealthResponse(request *RequestHealth, validity time.Duration) (*HealthResponse, error) { response := &HealthResponse{} response.RequestTimeStamp = request.Timestamp response.RoomID = request.RoomID + response.Validity = validity response.setConnectionStatus(request.ConnectionStartTime) @@ -145,9 +65,52 @@ func (m *HealthResponse) GetProtocol() message.Protocol { } func (m *HealthResponse) ReadProcess(_i interceptor.Interceptor, connection interceptor.Connection) error { - // TODO: IMPLEMENT HEALTH MANAGEMENT + i, ok := _i.(*chat.ServerInterceptor) + if !ok { + return errors.ErrInterfaceMisMatch + } - return nil + s, err := i.GetState(connection) + if err != nil { + return err + } + + return i.Health.Process(m, s) +} + +func (m *HealthResponse) Process(p interfaces.Processor, s *state.State) error { + id, err := s.GetClientID() + if err != nil { + return err + } + + if id != types.ClientID(m.CurrentHeader.Sender) { + return fmt.Errorf("error while processing 'HealthResponse' message; err: 'sender id does not match'") + } + + u, ok := p.(interfaces.CanUpdate) + if !ok { + return errors.ErrInterfaceMisMatch + } + + // NOTE: BE VERY CAREFUL WITH THIS. STAT IS PASSED AS POINTER. ANY CHANGES LATER TO HealthResponse WILL BE REFLECTED IN CanUpdate + timer := time.NewTimer(m.Validity) + defer timer.Stop() + + for { + // NOTE: THIS IS A BLOCKING CALL. WE NEED TO WAIT FOR THE VALIDITY TO EXPIRE + // NOTE: THIS ALSO MAKES SURE THAT THE ROOM ACTUALLY EXISTS BEFORE UPDATING THE HEALTH STATS + select { + case <-timer.C: + return fmt.Errorf("error while processing 'HealthResponse' message; err: 'validity expired'") + default: + err := u.Update(m.RoomID, id, &m.Stat) + if err == nil { + return nil + } + fmt.Println("error while reading CPU usage; err: ", err.Error()) + } + } } func (m *HealthResponse) WriteProcess(_i interceptor.Interceptor, connection interceptor.Connection) error { @@ -225,6 +188,11 @@ func (m *HealthResponse) setMemoryUsage() error { return nil } +func (m *HealthResponse) setNetworkUsage() error { + // TODO: IMPLEMENT THIS LATER + return nil +} + func (m *HealthResponse) setLatency() error { latencyNano := time.Now().UnixNano() - m.RequestTimeStamp diff --git a/pkg/middleware/chat/messages/join_room.go b/pkg/middleware/chat/messages/join_room.go new file mode 100644 index 0000000..6feb9f0 --- /dev/null +++ b/pkg/middleware/chat/messages/join_room.go @@ -0,0 +1,66 @@ +package messages + +import ( + "fmt" + "time" + + "github.com/harshabose/socket-comm/pkg/interceptor" + "github.com/harshabose/socket-comm/pkg/message" + "github.com/harshabose/socket-comm/pkg/middleware/chat" + "github.com/harshabose/socket-comm/pkg/middleware/chat/errors" + "github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces" + "github.com/harshabose/socket-comm/pkg/middleware/chat/state" + "github.com/harshabose/socket-comm/pkg/middleware/chat/types" +) + +var JoinRoomProtocol message.Protocol = "room:join_room" + +type JoinRoom struct { + interceptor.BaseMessage + RoomID types.RoomID `json:"room_id"` + JoinDeadline time.Duration `json:"join_deadline"` +} + +func (m *JoinRoom) GetProtocol() message.Protocol { + return JoinRoomProtocol +} + +func (m *JoinRoom) ReadProcess(_i interceptor.Interceptor, connection interceptor.Connection) error { + i, ok := _i.(*chat.ServerInterceptor) + if !ok { + return errors.ErrInterfaceMisMatch + } + + s, err := i.GetState(connection) + if err != nil { + return err + } + + return i.Rooms.Process(m, s) +} + +func (m *JoinRoom) Process(p interfaces.Processor, s *state.State) error { + a, ok := p.(interfaces.CanAdd) + if !ok { + return errors.ErrInterfaceMisMatch + } + + timer := time.NewTimer(m.JoinDeadline) + defer timer.Stop() + + for { + select { + case <-timer.C: + return fmt.Errorf("error while read processing 'JoinRoom' msg; err: %s", errors.ErrContextCancelled) + default: + err := a.Add(m.RoomID, s) + if err == nil { + return nil + } + fmt.Println(fmt.Errorf("error while read processing 'JoinRoom' msg; err: %s. retrying", err.Error())) + } + } + + // TODO: AFTER SUCCESS, SEND ROOM CURRENT STATE TO THE CLIENT + // TODO: THEN SEND SuccessJoinRoom MESSAGE TO THE CLIENT +} diff --git a/pkg/middleware/chat/messages/leave_room.go b/pkg/middleware/chat/messages/leave_room.go new file mode 100644 index 0000000..2651484 --- /dev/null +++ b/pkg/middleware/chat/messages/leave_room.go @@ -0,0 +1,45 @@ +package messages + +import ( + "github.com/harshabose/socket-comm/pkg/interceptor" + "github.com/harshabose/socket-comm/pkg/message" + "github.com/harshabose/socket-comm/pkg/middleware/chat" + "github.com/harshabose/socket-comm/pkg/middleware/chat/errors" + "github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces" + "github.com/harshabose/socket-comm/pkg/middleware/chat/state" + "github.com/harshabose/socket-comm/pkg/middleware/chat/types" +) + +var LeaveRoomProtocol message.Protocol = "room:leave_room" + +type LeaveRoom struct { + interceptor.BaseMessage + RoomID types.RoomID +} + +func (m *LeaveRoom) GetProtocol() message.Protocol { + return LeaveRoomProtocol +} + +func (m *LeaveRoom) ReadProcess(_i interceptor.Interceptor, connection interceptor.Connection) error { + i, ok := _i.(*chat.ServerInterceptor) + if !ok { + return errors.ErrInterfaceMisMatch + } + + s, err := i.GetState(connection) + if err != nil { + return err + } + + return i.Rooms.Process(m, s) +} + +func (m *LeaveRoom) Process(p interfaces.Processor, s *state.State) error { + r, ok := p.(interfaces.CanRemove) + if !ok { + return errors.ErrInterfaceMisMatch + } + + return r.Remove(m.RoomID, s) +} diff --git a/pkg/middleware/chat/messages/room_messages.go b/pkg/middleware/chat/messages/room_messages.go deleted file mode 100644 index 055a3e8..0000000 --- a/pkg/middleware/chat/messages/room_messages.go +++ /dev/null @@ -1,238 +0,0 @@ -package messages - -import ( - "fmt" - "time" - - "github.com/harshabose/socket-comm/pkg/interceptor" - "github.com/harshabose/socket-comm/pkg/message" - "github.com/harshabose/socket-comm/pkg/middleware/chat" - "github.com/harshabose/socket-comm/pkg/middleware/chat/errors" - "github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces" - "github.com/harshabose/socket-comm/pkg/middleware/chat/state" - "github.com/harshabose/socket-comm/pkg/middleware/chat/types" -) - -var ( - CreateRoomProtocol message.Protocol = "room:create_room" - DeleteRoomProtocol message.Protocol = "room:delete_room" - ForwardMessageProtocol message.Protocol = "room:forward_message" - ForwardedMessageProtocol message.Protocol = "room:forwarded_message" - JoinRoomProtocol message.Protocol = "room:join_room" - LeaveRoomProtocol message.Protocol = "room:leave_room" -) - -type CreateRoom struct { - interceptor.BaseMessage - RoomID types.RoomID `json:"room_id"` - Allowed []types.ClientID `json:"allowed"` - TTL time.Duration `json:"ttl"` -} - -func (m *CreateRoom) GetProtocol() message.Protocol { - return CreateRoomProtocol -} - -func (m *CreateRoom) ReadProcess(_i interceptor.Interceptor, connection interceptor.Connection) error { - i, ok := _i.(*chat.ServerInterceptor) - if !ok { - return errors.ErrInterfaceMisMatch - } - - s, err := i.GetState(connection) - if err != nil { - return err - } - - return i.Rooms.Process(m, s) -} - -func (m *CreateRoom) Process(p interfaces.Processor, s *state.State) error { - r, ok := p.(interfaces.CanCreateRoom) - if !ok { - return errors.ErrInterfaceMisMatch - } - - room, err := r.CreateRoom(m.RoomID, m.Allowed, m.TTL) - if err != nil { - return err - } - - return room.Add(m.RoomID, s) -} - -type DeleteRoom struct { - interceptor.BaseMessage - RoomID types.RoomID `json:"room_id"` -} - -func (m *DeleteRoom) GetProtocol() message.Protocol { - return DeleteRoomProtocol -} - -func (m *DeleteRoom) ReadProcess(_i interceptor.Interceptor, _ interceptor.Connection) error { - i, ok := _i.(*chat.ServerInterceptor) - if !ok { - return errors.ErrInterfaceMisMatch - } - - return i.Rooms.Process(m, nil) -} - -func (m *DeleteRoom) Process(p interfaces.Processor, _ *state.State) error { - r, ok := p.(interfaces.CanDeleteRoom) - if !ok { - return errors.ErrInterfaceMisMatch - } - - return r.DeleteRoom(m.RoomID) -} - -type ToForwardMessage struct { - interceptor.BaseMessage - RoomID types.RoomID `json:"room_id"` - To []types.ClientID `json:"to"` -} - -func (m *ToForwardMessage) GetProtocol() message.Protocol { - return ForwardMessageProtocol -} - -func (m *ToForwardMessage) ReadProcess(_i interceptor.Interceptor, connection interceptor.Connection) error { - i, ok := _i.(*chat.ServerInterceptor) - if !ok { - return errors.ErrInterfaceMisMatch - } - s, err := i.GetState(connection) - if err != nil { - return err - } - - return i.Rooms.Process(m, s) -} - -func (m *ToForwardMessage) Process(p interfaces.Processor, s *state.State) error { - msg, err := newForwardedMessage(m) - if err != nil { - return err - } - - clientID, err := s.GetClientID() - if err != nil { - return err - } - - if types.ClientID(m.CurrentHeader.Sender) != clientID { - return fmt.Errorf("error while read processing 'ToForwardMessage'; From and ClientID did not match") - } - - w, ok := p.(interfaces.CanWriteRoomMessage) - if !ok { - return errors.ErrInterfaceMisMatch - } - - if err := w.WriteRoomMessage(m.RoomID, msg, clientID, m.To...); err != nil { - return err - } - - return nil -} - -type ForwardedMessage struct { - interceptor.BaseMessage -} - -func (m *ForwardedMessage) GetProtocol() message.Protocol { - return ForwardedMessageProtocol -} - -func newForwardedMessage(forward *ToForwardMessage) (*ForwardedMessage, error) { - msg := &ForwardedMessage{} - bmsg, err := interceptor.NewBaseMessage(forward.GetNextProtocol(), forward.NextPayload, msg) - if err != nil { - return nil, err - } - - msg.BaseMessage = bmsg - - return msg, nil -} - -type JoinRoom struct { - interceptor.BaseMessage - RoomID types.RoomID `json:"room_id"` - WaitDuration time.Duration `json:"wait_duration"` -} - -func (m *JoinRoom) GetProtocol() message.Protocol { - return JoinRoomProtocol -} - -func (m *JoinRoom) ReadProcess(_i interceptor.Interceptor, connection interceptor.Connection) error { - i, ok := _i.(*chat.ServerInterceptor) - if !ok { - return errors.ErrInterfaceMisMatch - } - - s, err := i.GetState(connection) - if err != nil { - return err - } - - return i.Rooms.Process(m, s) -} - -func (m *JoinRoom) Process(p interfaces.Processor, s *state.State) error { - a, ok := p.(interfaces.CanAdd) - if !ok { - return errors.ErrInterfaceMisMatch - } - - timer := time.NewTimer(m.WaitDuration) - defer timer.Stop() - - for { - select { - case <-timer.C: - return fmt.Errorf("error while read processing 'JoinRoom' msg; err: %s", errors.ErrContextCancelled) - default: - err := a.Add(m.RoomID, s) - if err == nil { - return nil - } - fmt.Println(fmt.Errorf("error while read processing 'JoinRoom' msg; err: %s. retrying", err.Error())) - } - } -} - -type LeaveRoom struct { - interceptor.BaseMessage - RoomID types.RoomID -} - -func (m *LeaveRoom) GetProtocol() message.Protocol { - return LeaveRoomProtocol -} - -func (m *LeaveRoom) ReadProcess(_i interceptor.Interceptor, connection interceptor.Connection) error { - i, ok := _i.(*chat.ServerInterceptor) - if !ok { - return errors.ErrInterfaceMisMatch - } - - s, err := i.GetState(connection) - if err != nil { - return err - } - - return i.Rooms.Process(m, s) -} - -func (m *LeaveRoom) Process(p interfaces.Processor, s *state.State) error { - r, ok := p.(interfaces.CanRemove) - if !ok { - return errors.ErrInterfaceMisMatch - } - - return r.Remove(m.RoomID, s) -} diff --git a/pkg/middleware/chat/messages/success_join_room.go b/pkg/middleware/chat/messages/success_join_room.go new file mode 100644 index 0000000..864fa31 --- /dev/null +++ b/pkg/middleware/chat/messages/success_join_room.go @@ -0,0 +1,43 @@ +package messages + +import ( + "github.com/harshabose/socket-comm/pkg/interceptor" + "github.com/harshabose/socket-comm/pkg/message" + "github.com/harshabose/socket-comm/pkg/middleware/chat" + "github.com/harshabose/socket-comm/pkg/middleware/chat/errors" + "github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces" + "github.com/harshabose/socket-comm/pkg/middleware/chat/state" + "github.com/harshabose/socket-comm/pkg/middleware/chat/types" +) + +var SuccessJoinRoomProtocol message.Protocol = "room:success_join_room" + +// SuccessJoinRoom is the message sent by the server to the clients (including the requested client and roommates) +// when the client joins a room successfully. +type SuccessJoinRoom struct { + interceptor.BaseMessage + RoomID types.RoomID `json:"room_id"` + ClientID types.ClientID `json:"client_id"` +} + +func (m *SuccessJoinRoom) GetProtocol() message.Protocol { + return SuccessJoinRoomProtocol +} + +func (m *SuccessJoinRoom) ReadProcess(_i interceptor.Interceptor, _ interceptor.Connection) error { + i, ok := _i.(*chat.ClientInterceptor) + if !ok { + return errors.ErrInterfaceMisMatch + } + + return i.Health.Process(m, nil) +} + +func (m *SuccessJoinRoom) Process(p interfaces.Processor, _ *state.State) error { + a, ok := p.(interfaces.CanAddHealth) + if !ok { + return errors.ErrInterfaceMisMatch + } + // NOTE: MIGHT FAIL IF THE ROOM CREATION MESSAGE IS NOT RECEIVED BY THE CLIENT BEFORE THIS MESSAGE IS SENT. + return a.Add(m.RoomID, m.ClientID) +} diff --git a/pkg/middleware/chat/messages/to_forward.go b/pkg/middleware/chat/messages/to_forward.go new file mode 100644 index 0000000..0ea5486 --- /dev/null +++ b/pkg/middleware/chat/messages/to_forward.go @@ -0,0 +1,65 @@ +package messages + +import ( + "fmt" + + "github.com/harshabose/socket-comm/pkg/interceptor" + "github.com/harshabose/socket-comm/pkg/message" + "github.com/harshabose/socket-comm/pkg/middleware/chat" + "github.com/harshabose/socket-comm/pkg/middleware/chat/errors" + "github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces" + "github.com/harshabose/socket-comm/pkg/middleware/chat/state" + "github.com/harshabose/socket-comm/pkg/middleware/chat/types" +) + +var ForwardMessageProtocol message.Protocol = "room:forward_message" + +type ToForward struct { + interceptor.BaseMessage + RoomID types.RoomID `json:"room_id"` + To []types.ClientID `json:"to"` +} + +func (m *ToForward) GetProtocol() message.Protocol { + return ForwardMessageProtocol +} + +func (m *ToForward) ReadProcess(_i interceptor.Interceptor, connection interceptor.Connection) error { + i, ok := _i.(*chat.ServerInterceptor) + if !ok { + return errors.ErrInterfaceMisMatch + } + s, err := i.GetState(connection) + if err != nil { + return err + } + + return i.Rooms.Process(m, s) +} + +func (m *ToForward) Process(p interfaces.Processor, s *state.State) error { + msg, err := newForwardedMessage(m) + if err != nil { + return err + } + + clientID, err := s.GetClientID() + if err != nil { + return err + } + + if types.ClientID(m.CurrentHeader.Sender) != clientID { + return fmt.Errorf("error while read processing 'ToForward'; From and ClientID did not match") + } + + w, ok := p.(interfaces.CanWriteRoomMessage) + if !ok { + return errors.ErrInterfaceMisMatch + } + + if err := w.WriteRoomMessage(m.RoomID, msg, clientID, m.To...); err != nil { + return err + } + + return nil +} diff --git a/pkg/middleware/chat/processors/health.go b/pkg/middleware/chat/processors/health.go index ee7a394..1533844 100644 --- a/pkg/middleware/chat/processors/health.go +++ b/pkg/middleware/chat/processors/health.go @@ -1,13 +1,56 @@ package processors import ( + "context" + "fmt" + "sync" + + "github.com/harshabose/socket-comm/pkg/middleware/chat/errors" + "github.com/harshabose/socket-comm/pkg/middleware/chat/health" "github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces" "github.com/harshabose/socket-comm/pkg/middleware/chat/state" + "github.com/harshabose/socket-comm/pkg/middleware/chat/types" ) type Health struct { + health map[types.RoomID]*health.Health + mux sync.RWMutex + ctx context.Context } +// ============================================================================== +// ================================ CORE METHODS ================================ +// ============================================================================== + +func (p *Health) CreateHealth(roomid types.RoomID, allowed []types.ClientID) (*health.Health, error) { + if p.exists(roomid) { + return nil, fmt.Errorf("error while creating h with id %s; err: %s", roomid, errors.ErrRoomAlreadyExists) + } + + h := health.NewHealth(p.ctx, roomid, allowed) + + p.health[roomid] = h + return h, nil +} + +func (p *Health) DeleteHealth(roomid types.RoomID) error { + if !p.exists(roomid) { + return fmt.Errorf("error while deleting h with id: %s; err: %s", roomid, errors.ErrRoomNotFound) + } + // TODO: DO I NEED TO CLOSE THE HEALTH? + delete(p.health, roomid) + return nil +} + +func (p *Health) exists(roomid types.RoomID) bool { + _, exists := p.health[roomid] + return exists +} + +// ============================================================================== +// ========================== INTERFACE IMPLEMENTATIONS ========================= +// ============================================================================== + func (p *Health) Process(process interfaces.CanBeProcessed, state *state.State) error { return process.Process(p, state) } @@ -16,6 +59,66 @@ func (p *Health) ProcessBackground(process interfaces.CanBeProcessedBackground, return process.ProcessBackground(p, state) } -func (p *Health) GetSnapshot() { +func (p *Health) GetHealthSnapshot(roomid types.RoomID) (health.Snapshot, error) { + p.mux.RLock() + defer p.mux.RUnlock() + if !p.exists(roomid) { + return health.Snapshot{}, fmt.Errorf("error while getting snapshot for room with id: %s; err: %s", roomid, errors.ErrRoomNotFound) + } + + h := p.health[roomid] + + // NOTE: FOLLOWING IS DEEP-COPYING SNAPSHOT + snapshot := health.Snapshot{ + Roomid: h.Roomid, + Allowed: make([]types.ClientID, len(h.Allowed)), + Participants: make(map[types.ClientID]*health.Stat, len(h.Participants)), + } + + copy(snapshot.Allowed, h.Allowed) + + for id, stat := range h.Participants { + if stat != nil { + statCopy := *stat + snapshot.Participants[id] = &statCopy + } else { + snapshot.Participants[id] = nil + } + } + + return snapshot, nil +} + +func (p *Health) Add(roomid types.RoomID, id types.ClientID) error { + p.mux.Lock() + defer p.mux.Unlock() + + if !p.exists(roomid) { + return fmt.Errorf("error while adding participant with id: %s; err: %s", id, errors.ErrRoomNotFound) + } + + return p.health[roomid].Add(roomid, id) +} + +func (p *Health) Remove(roomid types.RoomID, id types.ClientID) error { + p.mux.Lock() + defer p.mux.Unlock() + + if !p.exists(roomid) { + return fmt.Errorf("error while removing participant with id: %s; err: %s", id, errors.ErrRoomNotFound) + } + + return p.health[roomid].Remove(roomid, id) +} + +func (p *Health) Update(roomid types.RoomID, id types.ClientID, stat *health.Stat) error { + p.mux.Lock() + defer p.mux.Unlock() + + if !p.exists(roomid) { + return fmt.Errorf("error while updating participant with id: %s; err: %s", id, errors.ErrRoomNotFound) + } + + return p.health[roomid].Update(roomid, id, stat) }