almost done with chat interceptor. Implement message registories mechanism

This commit is contained in:
harshabose
2025-05-23 11:02:30 +05:30
parent 3bc71a1627
commit c1b249bc1f
67 changed files with 923 additions and 521 deletions

20
pkg/interceptor/errors.go Normal file
View File

@@ -0,0 +1,20 @@
package interceptor
import "errors"
// COMMON ERRORS
var (
ErrContextCancelled = errors.New("context cancelled")
ErrInterfaceMisMatch = errors.New("unsatisfied interface triggered")
ErrProcessExecutionStopped = errors.New("process execution stopped manually")
ErrClientIDNotConsistent = errors.New("client id is not consistent throughout the connection")
ErrConnectionNotFound = errors.New("connection not registered")
ErrConnectionExists = errors.New("connection already exists")
ErrInvalidInterceptor = errors.New("inappropriate interceptor for the message")
)
func NewError(text string) error {
return errors.New(text)
}

View File

@@ -50,7 +50,7 @@ type Connection interface {
}
type Interceptor interface {
ID() string
ID() ClientID
Ctx() context.Context
@@ -90,12 +90,12 @@ func (f WriterFunc) Write(ctx context.Context, connection Connection, message me
}
type NoOpInterceptor struct {
iD string
iD ClientID
messageRegistry message.Registry
ctx context.Context
}
func NewNoOpInterceptor(ctx context.Context, id string, registry message.Registry) NoOpInterceptor {
func NewNoOpInterceptor(ctx context.Context, id ClientID, registry message.Registry) NoOpInterceptor {
return NoOpInterceptor{
ctx: ctx,
iD: id,
@@ -107,7 +107,7 @@ func (interceptor *NoOpInterceptor) Ctx() context.Context {
return interceptor.ctx
}
func (interceptor *NoOpInterceptor) ID() string {
func (interceptor *NoOpInterceptor) ID() ClientID {
return interceptor.iD
}

View File

@@ -0,0 +1,26 @@
package interceptor
import "context"
type CanProcess interface {
Process(context.Context, CanBeProcessed, State) error
}
type CanProcessBackground interface {
ProcessBackground(context.Context, CanBeProcessedBackground, State) CanBeProcessedBackground
}
type Processor interface {
CanProcess
CanProcessBackground
}
type CanBeProcessed interface {
Process(context.Context, CanProcess, State) error
}
type CanBeProcessedBackground interface {
ProcessBackground(context.Context, CanProcessBackground, State) CanBeProcessedBackground
Wait() error
Stop()
}

14
pkg/interceptor/state.go Normal file
View File

@@ -0,0 +1,14 @@
package interceptor
import (
"context"
"github.com/harshabose/socket-comm/pkg/message"
)
type State interface {
Ctx() context.Context
GetClientID() (ClientID, error)
SetClientID(id ClientID) error
Write(ctx context.Context, msg message.Message) error
}

9
pkg/interceptor/types.go Normal file
View File

@@ -0,0 +1,9 @@
package interceptor
type (
ClientID string
)
const (
UnknownClientID ClientID = "unknown"
)

View File

@@ -6,13 +6,12 @@ import (
"time"
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
"github.com/harshabose/socket-comm/pkg/middleware/chat/process"
)
type ClientInterceptor struct {
*commonInterceptor
Health interfaces.Processor
Health interceptor.Processor
}
func (i *ClientInterceptor) BindSocketConnection(connection interceptor.Connection, writer interceptor.Writer, reader interceptor.Reader) (interceptor.Writer, interceptor.Reader, error) {

View File

@@ -5,8 +5,6 @@ import (
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/message"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
"github.com/harshabose/socket-comm/pkg/middleware/chat/state"
)
@@ -73,7 +71,7 @@ func (i *commonInterceptor) InterceptSocketReader(reader interceptor.Reader) int
m, ok := msg.(interceptor.Message)
if !ok {
return msg, errors.ErrInterfaceMisMatch
return msg, interceptor.ErrInterfaceMisMatch
}
next, err := m.GetNext(i.readProcessMessages)
@@ -102,10 +100,10 @@ func (i *commonInterceptor) GetState(connection interceptor.Connection) (*state.
return i.states.GetState(connection)
}
func (i *commonInterceptor) Process(ctx context.Context, process interfaces.CanBeProcessed, state *state.State) error {
return process.Process(ctx, i, state)
func (i *commonInterceptor) Process(ctx context.Context, process interceptor.CanBeProcessed, s interceptor.State) error {
return process.Process(ctx, i, s)
}
func (i *commonInterceptor) ProcessBackground(ctx context.Context, process interfaces.CanBeProcessedBackground, state *state.State) interfaces.CanBeProcessedBackground {
return process.ProcessBackground(ctx, i, state)
func (i *commonInterceptor) ProcessBackground(ctx context.Context, process interceptor.CanBeProcessedBackground, s interceptor.State) interceptor.CanBeProcessedBackground {
return process.ProcessBackground(ctx, i, s)
}

View File

@@ -1,28 +1,16 @@
package errors
import "errors"
var (
ErrContextCancelled = errors.New("context cancelled")
ErrInterfaceMisMatch = errors.New("unsatisfied interface triggered")
ErrMessageForServerOnly = errors.New("message should only be processed by server")
ErrMessageForClientOnly = errors.New("message should only be processed by client")
ErrClientIDNotConsistent = errors.New("client id is not consistent throughout the connection")
ErrProcessExecutionStopped = errors.New("process execution stopped manually")
ErrConnectionNotFound = errors.New("connection not registered")
ErrConnectionExists = errors.New("connection already exists")
ErrInvalidInterceptor = errors.New("inappropriate interceptor for the payload")
ErrRoomNotFound = errors.New("room does not exists")
ErrRoomAlreadyExists = errors.New("room already exists")
ErrUnknownClientIDState = errors.New("client id not known at the moment")
ErrClientNotAllowed = errors.New("client is not allowed in the room")
ErrClientIsAlreadyParticipant = errors.New("client is already a participant in the room")
ErrClientNotAParticipant = errors.New("client is not a participant in the room at the moment")
ErrWrongRoom = errors.New("operation not permitted as room id did not match")
import (
"github.com/harshabose/socket-comm/pkg/interceptor"
)
func New(text string) error {
return errors.New(text)
}
var (
ErrRoomNotFound = interceptor.NewError("room does not exists")
ErrRoomAlreadyExists = interceptor.NewError("room already exists")
ErrUnknownClientIDState = interceptor.NewError("client id not known at the moment")
ErrClientNotAllowed = interceptor.NewError("client is not allowed in the room")
ErrClientIsAlreadyParticipant = interceptor.NewError("client is already a participant in the room")
ErrClientNotAParticipant = interceptor.NewError("client is not a participant in the room at the moment")
ErrWrongRoom = interceptor.NewError("operation not permitted as room id did not match")
)

View File

@@ -7,6 +7,7 @@ import (
"sync"
"time"
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
)
@@ -28,10 +29,10 @@ type Health struct {
}
type Snapshot struct {
Roomid types.RoomID `json:"roomid"` // NOTE: ADDED WHEN HEALTH IS CREATED.
Allowed []types.ClientID `json:"allowed"`
TTL time.Duration `json:"ttl"`
Participants map[types.ClientID]*Stat `json:"participants"` // NOTE: ADDED WHEN CLIENT JOINS. UPDATED WHEN CLIENT SENDS HEALTH RESPONSE.
Roomid types.RoomID `json:"roomid"` // NOTE: ADDED WHEN HEALTH IS CREATED.
Allowed []interceptor.ClientID `json:"allowed"`
TTL time.Duration `json:"ttl"`
Participants map[interceptor.ClientID]*Stat `json:"participants"` // NOTE: ADDED WHEN CLIENT JOINS. UPDATED WHEN CLIENT SENDS HEALTH RESPONSE.
}
// Marshal marshals the health struct into a json byte array.
@@ -40,14 +41,14 @@ func (h *Snapshot) Marshal() ([]byte, error) {
return json.Marshal(h)
}
func NewHealth(ctx context.Context, id types.RoomID, allowed []types.ClientID, ttl time.Duration) *Health {
func NewHealth(ctx context.Context, id types.RoomID, allowed []interceptor.ClientID, ttl time.Duration) *Health {
ctx2, cancel := context.WithTimeout(ctx, ttl)
h := &Health{
Snapshot: Snapshot{
Roomid: id,
Allowed: allowed,
TTL: ttl,
Participants: make(map[types.ClientID]*Stat),
Participants: make(map[interceptor.ClientID]*Stat),
},
cancel: cancel,
ctx: ctx2,
@@ -84,14 +85,14 @@ func (h *Health) GetTTL() time.Duration {
return h.TTL
}
func (h *Health) GetAllowed() []types.ClientID {
func (h *Health) GetAllowed() []interceptor.ClientID {
h.mux.RLock()
defer h.mux.RUnlock()
return h.Allowed
}
func (h *Health) Add(roomid types.RoomID, id types.ClientID) error {
func (h *Health) Add(roomid types.RoomID, id interceptor.ClientID) error {
h.mux.Lock()
defer h.mux.Unlock()
@@ -101,7 +102,7 @@ func (h *Health) Add(roomid types.RoomID, id types.ClientID) error {
select {
case <-h.ctx.Done():
return fmt.Errorf("error while adding client to health stats. client id: %s; room id: %s; err: %s", id, h.Roomid, errors.ErrContextCancelled.Error())
return fmt.Errorf("error while adding client to health stats. client id: %s; room id: %s; err: %s", id, h.Roomid, interceptor.ErrContextCancelled.Error())
default:
if !h.IsAllowed(id) {
return fmt.Errorf("client with id '%s' is not allowed in the health room with id '%s'; err: %s", id, h.Roomid, errors.ErrClientNotAllowed.Error())
@@ -116,7 +117,7 @@ func (h *Health) Add(roomid types.RoomID, id types.ClientID) error {
}
}
func (h *Health) Remove(roomid types.RoomID, id types.ClientID) error {
func (h *Health) Remove(roomid types.RoomID, id interceptor.ClientID) error {
h.mux.Lock()
defer h.mux.Unlock()
@@ -126,7 +127,7 @@ func (h *Health) Remove(roomid types.RoomID, id types.ClientID) error {
select {
case <-h.ctx.Done():
return fmt.Errorf("error while removing client to health stats. client id: %s; room id: %s; err: %s", id, h.Roomid, errors.ErrContextCancelled.Error())
return fmt.Errorf("error while removing client to health stats. client id: %s; room id: %s; err: %s", id, h.Roomid, interceptor.ErrContextCancelled.Error())
default:
if !h.IsAllowed(id) {
return fmt.Errorf("client with id '%s' is not allowed in the health room with id '%s'; err: %s", id, h.Roomid, errors.ErrClientNotAllowed.Error())
@@ -141,7 +142,7 @@ func (h *Health) Remove(roomid types.RoomID, id types.ClientID) error {
}
}
func (h *Health) Update(roomid types.RoomID, id types.ClientID, s *Stat) error {
func (h *Health) Update(roomid types.RoomID, id interceptor.ClientID, s *Stat) error {
h.mux.Lock()
defer h.mux.Unlock()
@@ -151,7 +152,7 @@ func (h *Health) Update(roomid types.RoomID, id types.ClientID, s *Stat) error {
select {
case <-h.ctx.Done():
return fmt.Errorf("error while adding client to health stats. client id: %s; room id: %s; err: %s", id, h.Roomid, errors.ErrContextCancelled.Error())
return fmt.Errorf("error while adding client to health stats. client id: %s; room id: %s; err: %s", id, h.Roomid, interceptor.ErrContextCancelled.Error())
default:
if !h.IsAllowed(id) {
return fmt.Errorf("client with id '%s' is not allowed in the health room with id '%s'; err: %s", id, h.Roomid, errors.ErrClientNotAllowed.Error())
@@ -174,15 +175,15 @@ func (h *Health) Close() error {
select {
case <-h.ctx.Done():
return fmt.Errorf("error while closing the health room with id %s; err: %s", h.Roomid, errors.ErrContextCancelled.Error())
return fmt.Errorf("error while closing the health room with id %s; err: %s", h.Roomid, interceptor.ErrContextCancelled.Error())
default:
h.Allowed = make([]types.ClientID, 0)
h.Participants = make(map[types.ClientID]*Stat)
h.Allowed = make([]interceptor.ClientID, 0)
h.Participants = make(map[interceptor.ClientID]*Stat)
return nil
}
}
func (h *Health) IsParticipant(id types.ClientID) bool {
func (h *Health) IsParticipant(id interceptor.ClientID) bool {
select {
case <-h.ctx.Done():
return false
@@ -192,7 +193,7 @@ func (h *Health) IsParticipant(id types.ClientID) bool {
}
}
func (h *Health) IsAllowed(id types.ClientID) bool {
func (h *Health) IsAllowed(id interceptor.ClientID) bool {
select {
case <-h.ctx.Done():
return false

View File

@@ -3,21 +3,21 @@ package interfaces
import (
"time"
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/middleware/chat/health"
"github.com/harshabose/socket-comm/pkg/middleware/chat/state"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
)
type CanAddHealth interface {
Add(roomid types.RoomID, id types.ClientID) error
Add(roomid types.RoomID, id interceptor.ClientID) error
}
type CanRemoveHealth interface {
Remove(roomid types.RoomID, id types.ClientID) error
Remove(roomid types.RoomID, id interceptor.ClientID) error
}
type CanUpdate interface {
Update(roomid types.RoomID, id types.ClientID, s *health.Stat) error
Update(roomid types.RoomID, id interceptor.ClientID, s *health.Stat) error
}
type CanGetHealth interface {
@@ -25,15 +25,15 @@ type CanGetHealth interface {
}
type CanAddHealthSnapshotStreamer interface {
AddHealthSnapshotStreamer(types.RoomID, time.Duration, *state.State) error
AddHealthSnapshotStreamer(types.RoomID, interceptor.State, interceptor.CanBeProcessedBackground) error
}
type CanRemoveHealthSnapshotStreamer interface {
RemoveHealthSnapshotStreamer(types.RoomID, *state.State) error
RemoveHealthSnapshotStreamer(types.RoomID, interceptor.State) error
}
type CanCreateHealth interface {
CreateHealth(types.RoomID, []types.ClientID, time.Duration) (*health.Health, error)
CreateHealth(types.RoomID, []interceptor.ClientID, time.Duration) (*health.Health, error)
}
type CanDeleteHealth interface {

View File

@@ -1,30 +0,0 @@
package interfaces
import (
"context"
"github.com/harshabose/socket-comm/pkg/middleware/chat/state"
)
type CanProcess interface {
Process(context.Context, CanBeProcessed, *state.State) error
}
type CanProcessBackground interface {
ProcessBackground(context.Context, CanBeProcessedBackground, *state.State) CanBeProcessedBackground
}
type Processor interface {
CanProcess
CanProcessBackground
}
type CanBeProcessed interface {
Process(context.Context, Processor, *state.State) error
}
type CanBeProcessedBackground interface {
ProcessBackground(context.Context, Processor, *state.State) CanBeProcessedBackground
Wait() error
Stop()
}

View File

@@ -3,18 +3,18 @@ package interfaces
import (
"time"
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/message"
"github.com/harshabose/socket-comm/pkg/middleware/chat/room"
"github.com/harshabose/socket-comm/pkg/middleware/chat/state"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
)
type CanAdd interface {
Add(types.RoomID, *state.State) error
Add(types.RoomID, interceptor.State) error
}
type CanRemove interface {
Remove(types.RoomID, *state.State) error
Remove(types.RoomID, interceptor.State) error
}
type CanGetRoom interface {
@@ -22,11 +22,11 @@ type CanGetRoom interface {
}
type CanWriteRoomMessage interface {
WriteRoomMessage(roomid types.RoomID, msg message.Message, from types.ClientID, tos ...types.ClientID) error
WriteRoomMessage(roomid types.RoomID, msg message.Message, from interceptor.ClientID, tos ...interceptor.ClientID) error
}
type CanCreateRoom interface {
CreateRoom(types.RoomID, []types.ClientID, time.Duration) (*room.Room, error)
CreateRoom(types.RoomID, []interceptor.ClientID, time.Duration) (*room.Room, error)
}
type CanDeleteRoom interface {
@@ -34,7 +34,7 @@ type CanDeleteRoom interface {
}
type CanStartHealthTracking interface {
StartHealthTracking(types.RoomID, time.Duration) error
StartHealthTracking(types.RoomID, time.Duration, interceptor.CanBeProcessedBackground) error
IsHealthTracked(types.RoomID) (bool, error)
}

View File

@@ -1,23 +0,0 @@
package interfaces
import (
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/message"
"github.com/harshabose/socket-comm/pkg/middleware/chat/state"
)
type CanGetState interface {
GetState(interceptor.Connection) (*state.State, error)
}
type CanSetState interface {
SetState(interceptor.Connection, *state.State) error
}
type CanRemoveState interface {
RemoveState(connection interceptor.Connection) error
}
type CanWriteMessage interface {
Write(message.Message) error
}

View File

@@ -6,11 +6,10 @@ import (
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/message"
"github.com/harshabose/socket-comm/pkg/middleware/chat"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/middleware/chat/process"
)
var CreateRoomProtocol message.Protocol = "room:create_room"
const CreateRoomProtocol message.Protocol = "room:create_room"
// CreateRoom is the message sent by the client to the server when the client wants to create a room.
// When received by the server, the server will create a room with the given room id and allowed clients.
@@ -29,7 +28,8 @@ func (m *CreateRoom) GetProtocol() message.Protocol {
func (m *CreateRoom) ReadProcess(ctx context.Context, _i interceptor.Interceptor, connection interceptor.Connection) error {
i, ok := _i.(*chat.ServerInterceptor)
if !ok {
return errors.ErrInterfaceMisMatch
// NOTE: CANNOT SEND FAIL MESSAGE AS STATE IS NOT DISCOVERED
return interceptor.ErrInterfaceMisMatch
}
s, err := i.GetState(connection)
@@ -38,6 +38,7 @@ func (m *CreateRoom) ReadProcess(ctx context.Context, _i interceptor.Interceptor
}
if err := i.Rooms.Process(ctx, m, s); err != nil {
_ = process.NewSendMessage(NewFailCreateRoomMessageFactory(m.RoomID, err)).Process(ctx, nil, s)
return err
}

View File

@@ -6,11 +6,10 @@ import (
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/message"
"github.com/harshabose/socket-comm/pkg/middleware/chat"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/middleware/chat/process"
)
var DeleteRoomProtocol message.Protocol = "room:delete_room"
const DeleteRoomProtocol message.Protocol = "room:delete_room"
type DeleteRoom struct {
interceptor.BaseMessage
@@ -24,7 +23,7 @@ func (m *DeleteRoom) GetProtocol() message.Protocol {
func (m *DeleteRoom) ReadProcess(ctx context.Context, _i interceptor.Interceptor, connection interceptor.Connection) error {
i, ok := _i.(*chat.ServerInterceptor)
if !ok {
return errors.ErrInterfaceMisMatch
return interceptor.ErrInterfaceMisMatch
}
s, err := i.GetState(connection)
@@ -33,6 +32,7 @@ func (m *DeleteRoom) ReadProcess(ctx context.Context, _i interceptor.Interceptor
}
if err := i.Rooms.Process(ctx, m, s); err != nil {
_ = process.NewSendMessage(NewFailDeleteRoomMessageFactory(m.RoomID, err)).Process(ctx, nil, s)
return err
}

View File

@@ -0,0 +1,56 @@
package messages
import (
"context"
"fmt"
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/message"
"github.com/harshabose/socket-comm/pkg/middleware/chat"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
)
const FailCreateRoomProtocol message.Protocol = "room:fail_create_room"
type FailCreateRoom struct {
interceptor.BaseMessage
RoomID types.RoomID `json:"room_id"`
Error string `json:"error"`
}
func NewFailCreateRoomMessage(id types.RoomID, err error) (*FailCreateRoom, error) {
msg := &FailCreateRoom{
RoomID: id,
Error: err.Error(),
}
bmsg, err := interceptor.NewBaseMessage(message.NoneProtocol, nil, msg)
if err != nil {
return nil, err
}
msg.BaseMessage = bmsg
return msg, nil
}
func NewFailCreateRoomMessageFactory(id types.RoomID, err error) func() (message.Message, error) {
return func() (message.Message, error) {
return NewFailCreateRoomMessage(id, err)
}
}
func (m *FailCreateRoom) GetProtocol() message.Protocol {
return FailCreateRoomProtocol
}
func (m *FailCreateRoom) ReadProcess(_ context.Context, _i interceptor.Interceptor, _ interceptor.Connection) error {
_, ok := _i.(*chat.ClientInterceptor)
if !ok {
return interceptor.ErrInvalidInterceptor
}
fmt.Println("failed to create room:", m.Error)
// NOTE: INTENTIONALLY EMPTY
return nil
}

View File

@@ -0,0 +1,56 @@
package messages
import (
"context"
"fmt"
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/message"
"github.com/harshabose/socket-comm/pkg/middleware/chat"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
)
const FailDeleteRoomProtocol message.Protocol = "room:fail_delete_room"
type FailDeleteRoom struct {
interceptor.BaseMessage
RoomID types.RoomID `json:"room_id"`
Error string `json:"error"`
}
func NewFailDeleteRoomMessage(id types.RoomID, err error) (*FailDeleteRoom, error) {
msg := &FailDeleteRoom{
RoomID: id,
Error: err.Error(),
}
bmsg, err := interceptor.NewBaseMessage(message.NoneProtocol, nil, msg)
if err != nil {
return nil, err
}
msg.BaseMessage = bmsg
return msg, nil
}
func NewFailDeleteRoomMessageFactory(id types.RoomID, err error) func() (message.Message, error) {
return func() (message.Message, error) {
return NewFailDeleteRoomMessage(id, err)
}
}
func (m *FailDeleteRoom) GetProtocol() message.Protocol {
return FailDeleteRoomProtocol
}
func (m *FailDeleteRoom) ReadProcess(_ context.Context, _i interceptor.Interceptor, _ interceptor.Connection) error {
_, ok := _i.(*chat.ClientInterceptor)
if !ok {
return interceptor.ErrInvalidInterceptor
}
fmt.Println("failed to create room:", m.Error)
// NOTE: INTENTIONALLY EMPTY
return nil
}

View File

@@ -0,0 +1,56 @@
package messages
import (
"context"
"fmt"
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/message"
"github.com/harshabose/socket-comm/pkg/middleware/chat"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
)
const FailJoinRoomProtocol message.Protocol = "room:fail_join_room"
type FailJoinRoom struct {
interceptor.BaseMessage
RoomID types.RoomID `json:"room_id"`
Error string `json:"error"`
}
func NewFailJoinRoomMessage(id types.RoomID, err error) (*FailJoinRoom, error) {
msg := &FailJoinRoom{
RoomID: id,
Error: err.Error(),
}
bmsg, err := interceptor.NewBaseMessage(message.NoneProtocol, nil, msg)
if err != nil {
return nil, err
}
msg.BaseMessage = bmsg
return msg, nil
}
func NewFailJoinRoomMessageFactory(id types.RoomID, err error) func() (message.Message, error) {
return func() (message.Message, error) {
return NewFailJoinRoomMessage(id, err)
}
}
func (m *FailJoinRoom) GetProtocol() message.Protocol {
return FailJoinRoomProtocol
}
func (m *FailJoinRoom) ReadProcess(_ context.Context, _i interceptor.Interceptor, _ interceptor.Connection) error {
_, ok := _i.(*chat.ClientInterceptor)
if !ok {
return interceptor.ErrInvalidInterceptor
}
fmt.Println("failed to join room:", m.Error)
// NOTE: INTENTIONALLY EMPTY
return nil
}

View File

@@ -0,0 +1,56 @@
package messages
import (
"context"
"fmt"
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/message"
"github.com/harshabose/socket-comm/pkg/middleware/chat"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
)
const FailLeaveRoomProtocol message.Protocol = "room:fail_leave_room"
type FailLeaveRoom struct {
interceptor.BaseMessage
RoomID types.RoomID `json:"room_id"`
Error string `json:"error"`
}
func NewFailLeaveRoomMessage(id types.RoomID, err error) (*FailLeaveRoom, error) {
msg := &FailLeaveRoom{
RoomID: id,
Error: err.Error(),
}
bmsg, err := interceptor.NewBaseMessage(message.NoneProtocol, nil, msg)
if err != nil {
return nil, err
}
msg.BaseMessage = bmsg
return msg, nil
}
func NewFailLeaveRoomMessageFactory(id types.RoomID, err error) func() (message.Message, error) {
return func() (message.Message, error) {
return NewFailLeaveRoomMessage(id, err)
}
}
func (m *FailLeaveRoom) GetProtocol() message.Protocol {
return FailLeaveRoomProtocol
}
func (m *FailLeaveRoom) ReadProcess(_ context.Context, _i interceptor.Interceptor, _ interceptor.Connection) error {
_, ok := _i.(*chat.ClientInterceptor)
if !ok {
return interceptor.ErrInvalidInterceptor
}
fmt.Println("failed to leave room:", m.Error)
// NOTE: INTENTIONALLY EMPTY
return nil
}

View File

@@ -0,0 +1,56 @@
package messages
import (
"context"
"fmt"
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/message"
"github.com/harshabose/socket-comm/pkg/middleware/chat"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
)
const FailStartHealthTrackingProtocol message.Protocol = "chat:fail_track_health"
type FailStartHealthTracking struct {
interceptor.BaseMessage
RoomID types.RoomID `json:"room_id"`
Error string `json:"error"`
}
func NewFailStartHealthTrackingMessage(id types.RoomID, err error) (*FailStartHealthTracking, error) {
msg := &FailStartHealthTracking{
RoomID: id,
Error: err.Error(),
}
bmsg, err := interceptor.NewBaseMessage(message.NoneProtocol, nil, msg)
if err != nil {
return nil, err
}
msg.BaseMessage = bmsg
return msg, nil
}
func NewFailStartHealthTrackingMessageFactory(id types.RoomID, err error) func() (message.Message, error) {
return func() (message.Message, error) {
return NewFailStartHealthTrackingMessage(id, err)
}
}
func (m *FailStartHealthTracking) GetProtocol() message.Protocol {
return FailStartHealthTrackingProtocol
}
func (m *FailStartHealthTracking) ReadProcess(_ context.Context, _i interceptor.Interceptor, _ interceptor.Connection) error {
_, ok := _i.(*chat.ClientInterceptor)
if !ok {
return interceptor.ErrInvalidInterceptor
}
fmt.Println("failed to start health tracking:", m.Error)
// NOTE: INTENTIONALLY EMPTY
return nil
}

View File

@@ -0,0 +1,56 @@
package messages
import (
"context"
"fmt"
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/message"
"github.com/harshabose/socket-comm/pkg/middleware/chat"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
)
const FailStartHealthStreamingProtocol message.Protocol = "chat:fail_get_health_snapshot"
type FailStartHealthStreaming struct {
interceptor.BaseMessage
RoomID types.RoomID `json:"room_id"`
Error string `json:"error"`
}
func NewFailStartHealthStreamingMessage(id types.RoomID, err error) (*FailStartHealthStreaming, error) {
msg := &FailStartHealthStreaming{
RoomID: id,
Error: err.Error(),
}
bmsg, err := interceptor.NewBaseMessage(message.NoneProtocol, nil, msg)
if err != nil {
return nil, err
}
msg.BaseMessage = bmsg
return msg, nil
}
func NewFailStartHealthStreamingMessageFactory(id types.RoomID, err error) func() (message.Message, error) {
return func() (message.Message, error) {
return NewFailStartHealthStreamingMessage(id, err)
}
}
func (m *FailStartHealthStreaming) GetProtocol() message.Protocol {
return FailStartHealthStreamingProtocol
}
func (m *FailStartHealthStreaming) ReadProcess(_ context.Context, _i interceptor.Interceptor, _ interceptor.Connection) error {
_, ok := _i.(*chat.ClientInterceptor)
if !ok {
return interceptor.ErrInvalidInterceptor
}
fmt.Println("failed to start health streaming:", m.Error)
// NOTE: INTENTIONALLY EMPTY
return nil
}

View File

@@ -0,0 +1,56 @@
package messages
import (
"context"
"fmt"
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/message"
"github.com/harshabose/socket-comm/pkg/middleware/chat"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
)
const FailStopHealthTrackingProtocol message.Protocol = "chat:fail_untrack_health"
type FailStopHealthTracking struct {
interceptor.BaseMessage
RoomID types.RoomID `json:"room_id"`
Error string `json:"error"`
}
func NewFailStopHealthTrackingMessage(id types.RoomID, err error) (*FailStopHealthTracking, error) {
msg := &FailStopHealthTracking{
RoomID: id,
Error: err.Error(),
}
bmsg, err := interceptor.NewBaseMessage(message.NoneProtocol, nil, msg)
if err != nil {
return nil, err
}
msg.BaseMessage = bmsg
return msg, nil
}
func NewFailStopHealthTrackingMessageFactory(id types.RoomID, err error) func() (message.Message, error) {
return func() (message.Message, error) {
return NewFailStopHealthTrackingMessage(id, err)
}
}
func (m *FailStopHealthTracking) GetProtocol() message.Protocol {
return FailStopHealthTrackingProtocol
}
func (m *FailStopHealthTracking) ReadProcess(_ context.Context, _i interceptor.Interceptor, _ interceptor.Connection) error {
_, ok := _i.(*chat.ClientInterceptor)
if !ok {
return interceptor.ErrInvalidInterceptor
}
fmt.Println("failed to stop health tracking:", m.Error)
// NOTE: INTENTIONALLY EMPTY
return nil
}

View File

@@ -0,0 +1,56 @@
package messages
import (
"context"
"fmt"
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/message"
"github.com/harshabose/socket-comm/pkg/middleware/chat"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
)
const FailStopHealthStreamingProtocol message.Protocol = "chat:fail_stop_streaming_health_snapshot"
type FailStopHealthStreaming struct {
interceptor.BaseMessage
RoomID types.RoomID `json:"room_id"`
Error string `json:"error"`
}
func NewFailStopHealthStreamingMessage(id types.RoomID, err error) (*FailStopHealthStreaming, error) {
msg := &FailStopHealthStreaming{
RoomID: id,
Error: err.Error(),
}
bmsg, err := interceptor.NewBaseMessage(message.NoneProtocol, nil, msg)
if err != nil {
return nil, err
}
msg.BaseMessage = bmsg
return msg, nil
}
func NewFailStopHealthStreamingMessageFactory(id types.RoomID, err error) func() (message.Message, error) {
return func() (message.Message, error) {
return NewFailStopHealthStreamingMessage(id, err)
}
}
func (m *FailStopHealthStreaming) GetProtocol() message.Protocol {
return FailStopHealthStreamingProtocol
}
func (m *FailStopHealthStreaming) ReadProcess(_ context.Context, _i interceptor.Interceptor, _ interceptor.Connection) error {
_, ok := _i.(*chat.ClientInterceptor)
if !ok {
return interceptor.ErrInvalidInterceptor
}
fmt.Println("failed to stop health streaming:", m.Error)
// NOTE: INTENTIONALLY EMPTY
return nil
}

View File

@@ -1,8 +1,11 @@
package messages
import (
"context"
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/message"
"github.com/harshabose/socket-comm/pkg/middleware/chat"
)
var ForwardedMessageProtocol message.Protocol = "room:forwarded_message"
@@ -26,3 +29,15 @@ func newForwardedMessage(forward *ToForward) (*ForwardedMessage, error) {
return msg, nil
}
func (m *ForwardedMessage) ReadProcess(_ context.Context, _i interceptor.Interceptor, _ interceptor.Connection) error {
_, ok := _i.(*chat.ClientInterceptor)
if !ok {
return interceptor.ErrInterfaceMisMatch
}
// NOTE: INTENTIONALLY EMPTY
return nil
// NOTE: RETURNING NIL ASSUMING THAT THE NEXT PAYLOAD WILL BE MARSHALLED AFTER PROCESSING
}

View File

@@ -1,13 +1,12 @@
package messages
import (
"context"
"fmt"
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/message"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
"github.com/harshabose/socket-comm/pkg/middleware/chat"
)
var (
@@ -23,10 +22,10 @@ func (m *Ident) GetProtocol() message.Protocol {
return IdentProtocol
}
func (m *Ident) ReadProcess(_i interceptor.Interceptor, connection interceptor.Connection) error {
s, ok := _i.(interfaces.CanGetState)
func (m *Ident) ReadProcess(ctx context.Context, _i interceptor.Interceptor, connection interceptor.Connection) error {
s, ok := _i.(*chat.ClientInterceptor)
if !ok {
return fmt.Errorf("error while processing 'Ident' message; err: %s", errors.ErrInterfaceMisMatch.Error())
return fmt.Errorf("error while processing 'Ident' message; err: %s", interceptor.ErrInterfaceMisMatch.Error())
}
ss, err := s.GetState(connection)
@@ -34,11 +33,11 @@ func (m *Ident) ReadProcess(_i interceptor.Interceptor, connection interceptor.C
return fmt.Errorf("error while processing 'Ident' message; err: %s", err.Error())
}
if err := ss.SetClientID(types.ClientID(m.CurrentHeader.Sender)); err != nil {
if err := ss.SetClientID(interceptor.ClientID(m.CurrentHeader.Sender)); err != nil {
return fmt.Errorf("error while processing 'Ident' message; err: %s", err.Error())
}
if err := ss.Write(&IdentResponse{}); err != nil {
if err := ss.Write(ctx, &IdentResponse{}); err != nil {
return fmt.Errorf("error while processing 'Ident' message; err: %s", err.Error())
}
@@ -56,9 +55,9 @@ func (m *IdentResponse) GetProtocol() message.Protocol {
}
func (m *IdentResponse) ReadProcess(_i interceptor.Interceptor, connection interceptor.Connection) error {
s, ok := _i.(interfaces.CanGetState)
s, ok := _i.(*chat.ServerInterceptor)
if !ok {
return fmt.Errorf("error while processing 'Ident' message; err: %s", errors.ErrInterfaceMisMatch.Error())
return fmt.Errorf("error while processing 'Ident' message; err: %s", interceptor.ErrInterfaceMisMatch.Error())
}
ss, err := s.GetState(connection)
@@ -66,7 +65,7 @@ func (m *IdentResponse) ReadProcess(_i interceptor.Interceptor, connection inter
return fmt.Errorf("error while processing 'Ident' message; err: %s", err.Error())
}
if err := ss.SetClientID(types.ClientID(m.CurrentHeader.Sender)); err != nil {
if err := ss.SetClientID(interceptor.ClientID(m.CurrentHeader.Sender)); err != nil {
return fmt.Errorf("error while processing 'Ident' message; err: %s", err.Error())
}

View File

@@ -6,7 +6,6 @@ import (
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/message"
"github.com/harshabose/socket-comm/pkg/middleware/chat"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/middleware/chat/process"
)
@@ -24,7 +23,7 @@ func (m *JoinRoom) GetProtocol() message.Protocol {
func (m *JoinRoom) ReadProcess(ctx context.Context, _i interceptor.Interceptor, connection interceptor.Connection) error {
i, ok := _i.(*chat.ServerInterceptor)
if !ok {
return errors.ErrInterfaceMisMatch
return interceptor.ErrInterfaceMisMatch
}
s, err := i.GetState(connection)
@@ -33,8 +32,9 @@ func (m *JoinRoom) ReadProcess(ctx context.Context, _i interceptor.Interceptor,
}
if err := i.Rooms.Process(ctx, m, s); err != nil {
_ = process.NewSendMessage(NewFailJoinRoomMessageFactory(m.RoomID, err)).Process(ctx, nil, s)
return err
}
return process.NewSendMessageToAllParticipantsInRoom(m.RoomID, NewSuccessJoinRoomMessageFactory(m.RoomID)).Process(ctx, nil, s)
return process.NewSendMessageToAllParticipantsInRoom(m.RoomID, NewSuccessJoinRoomMessageFactory(m.RoomID, interceptor.ClientID(m.GetCurrentHeader().Sender))).Process(ctx, nil, s)
}

View File

@@ -6,7 +6,6 @@ import (
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/message"
"github.com/harshabose/socket-comm/pkg/middleware/chat"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/middleware/chat/process"
)
@@ -24,7 +23,7 @@ func (m *LeaveRoom) GetProtocol() message.Protocol {
func (m *LeaveRoom) ReadProcess(ctx context.Context, _i interceptor.Interceptor, connection interceptor.Connection) error {
i, ok := _i.(*chat.ServerInterceptor)
if !ok {
return errors.ErrInterfaceMisMatch
return interceptor.ErrInterfaceMisMatch
}
s, err := i.GetState(connection)
@@ -33,12 +32,9 @@ func (m *LeaveRoom) ReadProcess(ctx context.Context, _i interceptor.Interceptor,
}
if err := i.Rooms.Process(ctx, m, s); err != nil {
_ = process.NewSendMessage(NewFailLeaveRoomMessageFactory(m.RoomID, err)).Process(ctx, nil, s)
return err
}
if err := process.NewSendMessage(NewSuccessLeaveRoomMessageFactory(m.RoomID)).Process(ctx, nil, s); err != nil {
return err
}
return nil
return process.NewSendMessageToAllParticipantsInRoom(m.RoomID, NewSuccessLeaveRoomMessageFactory(m.RoomID, interceptor.ClientID(m.GetCurrentHeader().Sender))).Process(ctx, nil, s)
}

View File

@@ -7,13 +7,12 @@ import (
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/message"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
"github.com/harshabose/socket-comm/pkg/middleware/chat"
"github.com/harshabose/socket-comm/pkg/middleware/chat/process"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
)
var RequestHealthProtocol message.Protocol = "room:request_health"
const RequestHealthProtocol message.Protocol = "room:request_health"
// SendHealthStats is sent by the server to request the health from a client.
// The client then responds with UpdateHealthStat with the health.Stat embedded.
@@ -50,9 +49,9 @@ func (m *SendHealthStats) GetProtocol() message.Protocol {
}
func (m *SendHealthStats) ReadProcess(ctx context.Context, _i interceptor.Interceptor, connection interceptor.Connection) error {
s, ok := _i.(interfaces.CanGetState)
s, ok := _i.(*chat.ClientInterceptor)
if !ok {
return errors.ErrInterfaceMisMatch
return interceptor.ErrInterfaceMisMatch
}
ss, err := s.GetState(connection)
@@ -64,9 +63,9 @@ func (m *SendHealthStats) ReadProcess(ctx context.Context, _i interceptor.Interc
}
func (m *SendHealthStats) WriteProcess(ctx context.Context, _i interceptor.Interceptor, connection interceptor.Connection) error {
s, ok := _i.(interfaces.CanGetState)
s, ok := _i.(*chat.ClientInterceptor)
if !ok {
return errors.ErrInterfaceMisMatch
return interceptor.ErrInterfaceMisMatch
}
ss, err := s.GetState(connection)

View File

@@ -2,23 +2,25 @@ package messages
import (
"context"
"time"
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/message"
"github.com/harshabose/socket-comm/pkg/middleware/chat"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
"github.com/harshabose/socket-comm/pkg/middleware/chat/process"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
)
var MarkRoomForHealthTrackingProtocol message.Protocol = "chat:track_health"
// StartHealthTracking is sent by an interested client (who wants the stats of the whole room) to start tracking the health status.
// This does not send the stats data to the interested client; this just tells the server to track the data.
// If any client wants the data, they can send a StartHealthTracking to the server.
// If any client wants the data, they can send a _ to the server.
type StartHealthTracking struct {
interceptor.BaseMessage
process.StartHealthTracking
RoomID types.RoomID `json:"room_id"`
Interval time.Duration `json:"interval"`
}
func (m *StartHealthTracking) GetProtocol() message.Protocol {
@@ -28,29 +30,42 @@ func (m *StartHealthTracking) GetProtocol() message.Protocol {
func (m *StartHealthTracking) ReadProcess(ctx context.Context, _i interceptor.Interceptor, connection interceptor.Connection) error {
i, ok := _i.(*chat.ServerInterceptor)
if !ok {
return errors.ErrInterfaceMisMatch
// Can't send fail message here because we don't have a state
return interceptor.ErrInterfaceMisMatch
}
s, err := i.GetState(connection)
if err != nil {
// Can't send fail message here because we don't have a state
return err
}
r, ok := i.Rooms.(interfaces.CanGetRoom)
if !ok {
return errors.ErrInterfaceMisMatch
_ = process.NewSendMessage(NewFailStartHealthTrackingMessageFactory(m.RoomID, interceptor.ErrInterfaceMisMatch)).Process(ctx, nil, s)
return interceptor.ErrInterfaceMisMatch
}
room, err := r.GetRoom(m.RoomID)
if err != nil {
_ = process.NewSendMessage(NewFailStartHealthTrackingMessageFactory(m.RoomID, err)).Process(ctx, nil, s)
return err
}
if err := i.Rooms.Process(ctx, m, nil); err != nil {
t, ok := i.Rooms.(interfaces.CanStartHealthTracking)
if !ok {
_ = process.NewSendMessage(NewFailStartHealthTrackingMessageFactory(m.RoomID, interceptor.ErrInterfaceMisMatch)).Process(ctx, nil, s)
return interceptor.ErrInterfaceMisMatch
}
if err := t.StartHealthTracking(m.RoomID, m.Interval, process.NewSendMessageStreamToAllParticipants(nil, NewRequestHealthFactory(m.RoomID), m.RoomID, m.Interval, room.TTL())); err != nil {
_ = process.NewSendMessage(NewFailStartHealthTrackingMessageFactory(m.RoomID, err)).Process(ctx, nil, s)
return err
}
if err := process.NewCreateHealthRoom(m.RoomID, room.GetAllowed(), room.TTL()).Process(ctx, i.Health, s); err != nil {
healthRoom := process.NewCreateHealthRoom(m.RoomID, room.GetAllowed(), room.TTL())
if err := healthRoom.Process(ctx, i.Health, s); err != nil {
_ = process.NewSendMessage(NewFailStartHealthTrackingMessageFactory(m.RoomID, err)).Process(ctx, nil, s)
return err
}

View File

@@ -2,20 +2,22 @@ package messages
import (
"context"
"time"
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/message"
"github.com/harshabose/socket-comm/pkg/middleware/chat"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
"github.com/harshabose/socket-comm/pkg/middleware/chat/process"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
)
var GetHealthSnapshotProtocol message.Protocol = "chat:get_health_snapshot"
type StartStreamingHealthSnapshots struct {
interceptor.BaseMessage
process.StartStreamingHealthSnapshots
Roomid types.RoomID `json:"roomid"`
Interval time.Duration `json:"interval"`
}
func (m *StartStreamingHealthSnapshots) GetProtocol() message.Protocol {
@@ -25,7 +27,7 @@ func (m *StartStreamingHealthSnapshots) GetProtocol() message.Protocol {
func (m *StartStreamingHealthSnapshots) ReadProcess(ctx context.Context, _i interceptor.Interceptor, connection interceptor.Connection) error {
i, ok := _i.(*chat.ServerInterceptor)
if !ok {
return errors.ErrInvalidInterceptor
return interceptor.ErrInvalidInterceptor
}
s, err := i.GetState(connection)
@@ -33,21 +35,43 @@ func (m *StartStreamingHealthSnapshots) ReadProcess(ctx context.Context, _i inte
return err
}
if err := i.Health.Process(ctx, m, s); err != nil {
return err
}
r, ok := i.Rooms.(interfaces.CanGetRoom)
if !ok {
return errors.ErrInterfaceMisMatch
_ = process.NewSendMessage(NewFailStartHealthStreamingMessageFactory(m.Roomid, interceptor.ErrInterfaceMisMatch)).Process(ctx, nil, s)
return interceptor.ErrInterfaceMisMatch
}
room, err := r.GetRoom(m.Roomid)
if err != nil {
_ = process.NewSendMessage(NewFailStartHealthStreamingMessageFactory(m.Roomid, err)).Process(ctx, nil, s)
return err
}
if !room.IsRoomMarkedForHealthTracking() {
err := interceptor.NewError("to get snapshots room must first be marked for health tracking")
_ = process.NewSendMessage(NewFailStartHealthStreamingMessageFactory(m.Roomid, err)).Process(ctx, nil, s)
return err
}
h, ok := i.Health.(interfaces.CanAddHealthSnapshotStreamer)
if !ok {
_ = process.NewSendMessage(NewFailStartHealthStreamingMessageFactory(m.Roomid, interceptor.ErrInterfaceMisMatch)).Process(ctx, nil, s)
return interceptor.ErrInterfaceMisMatch
}
g, ok := i.Health.(interfaces.CanGetHealthSnapshot)
if !ok {
_ = process.NewSendMessage(NewFailStartHealthStreamingMessageFactory(m.Roomid, interceptor.ErrInterfaceMisMatch)).Process(ctx, nil, s)
return interceptor.ErrInterfaceMisMatch
}
if err := h.AddHealthSnapshotStreamer(m.Roomid, s, process.NewSendMessageStream(NewUpdateHealthSnapshotMessageFactory(m.Roomid, g), m.Interval)); err != nil {
_ = process.NewSendMessage(NewFailStartHealthStreamingMessageFactory(m.Roomid, err)).Process(ctx, nil, s)
return err
}
if err := process.NewSendMessage(NewSuccessStartHealthStreamingMessageFactory(m.Roomid, room.GetAllowed(), room.TTL())).Process(ctx, nil, s); err != nil {
// do not send a fail message here as failing to send a success message also means failing to send a failure message
return err
}

View File

@@ -6,41 +6,45 @@ import (
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/message"
"github.com/harshabose/socket-comm/pkg/middleware/chat"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/middleware/chat/process"
)
var UntrackHealthInRoomProtocol message.Protocol = "chat:untrack_health"
const UntrackHealthInRoomProtocol message.Protocol = "chat:untrack_health"
type UnMarkRoomForHealthTracking struct {
type StopHealthTracking struct {
interceptor.BaseMessage
process.UnMarkRoomForHealthTracking
process.StopHealthTracking
}
func (m *UnMarkRoomForHealthTracking) GetProtocol() message.Protocol {
func (m *StopHealthTracking) GetProtocol() message.Protocol {
return UntrackHealthInRoomProtocol
}
func (m *UnMarkRoomForHealthTracking) ReadProcess(ctx context.Context, _i interceptor.Interceptor, connection interceptor.Connection) error {
func (m *StopHealthTracking) ReadProcess(ctx context.Context, _i interceptor.Interceptor, connection interceptor.Connection) error {
i, ok := _i.(*chat.ServerInterceptor)
if !ok {
return errors.ErrInvalidInterceptor
// Can't send fail message here because we don't have a state
return interceptor.ErrInvalidInterceptor
}
s, err := i.GetState(connection)
if err != nil {
// Can't send fail message here because we don't have a state
return err
}
if err := i.Rooms.Process(ctx, m, nil); err != nil {
_ = process.NewSendMessage(NewFailStopHealthTrackingMessageFactory(m.RoomID, err)).Process(ctx, nil, s)
return err
}
if err := process.NewDeleteHealthRoom(m.RoomID).Process(ctx, i.Health, s); err != nil {
_ = process.NewSendMessage(NewFailStopHealthTrackingMessageFactory(m.RoomID, err)).Process(ctx, nil, s)
return err
}
if err := process.NewSendMessage(nil).Process(ctx, nil, s); err != nil {
if err := process.NewSendMessage(NewSuccessUntrackHealthInRoomMessageFactory(m.RoomID)).Process(ctx, nil, s); err != nil {
// do not send a fail message here as failing to send a success message also means failing to send a failure message
return err
}

View File

@@ -6,7 +6,6 @@ import (
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/message"
"github.com/harshabose/socket-comm/pkg/middleware/chat"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/middleware/chat/process"
)
@@ -24,19 +23,23 @@ func (m *StopStreamingHealthSnapshot) GetProtocol() message.Protocol {
func (m *StopStreamingHealthSnapshot) ReadProcess(ctx context.Context, _i interceptor.Interceptor, connection interceptor.Connection) error {
i, ok := _i.(*chat.ServerInterceptor)
if !ok {
return errors.ErrInvalidInterceptor
// Can't send fail message here because we don't have a state
return interceptor.ErrInvalidInterceptor
}
s, err := i.GetState(connection)
if err != nil {
// Can't send fail message here because we don't have a state
return err
}
if err := i.Health.Process(ctx, m, s); err != nil {
_ = process.NewSendMessage(NewFailStopHealthStreamingMessageFactory(m.Roomid, err)).Process(ctx, nil, s)
return err
}
if err := process.NewSendMessage(NewSuccessStopHealthStreamingMessageFactory(m.Roomid)).Process(ctx, nil, s); err != nil {
// do not send a fail message here as failing to send a success message also means failing to send a failure message
return err
}

View File

@@ -6,11 +6,10 @@ import (
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/message"
"github.com/harshabose/socket-comm/pkg/middleware/chat"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
)
var SuccessJoinRoomProtocol message.Protocol = "room:success_join_room"
const SuccessJoinRoomProtocol message.Protocol = "room:success_join_room"
// SuccessJoinRoom is the message sent by the server to the clients (including the requested client and roommates)
// when the client joins a room successfully.
@@ -18,12 +17,14 @@ var SuccessJoinRoomProtocol message.Protocol = "room:success_join_room"
// This marks the end of the JoinRoom topic.
type SuccessJoinRoom struct {
interceptor.BaseMessage
RoomID types.RoomID `json:"room_id"`
RoomID types.RoomID `json:"room_id"`
ClientID interceptor.ClientID `json:"client_id"`
}
func NewSuccessJoinRoomMessage(id types.RoomID) (*SuccessJoinRoom, error) {
func NewSuccessJoinRoomMessage(id types.RoomID, clientID interceptor.ClientID) (*SuccessJoinRoom, error) {
msg := &SuccessJoinRoom{
RoomID: id,
RoomID: id,
ClientID: clientID,
}
bmsg, err := interceptor.NewBaseMessage(message.NoneProtocol, nil, msg)
@@ -35,9 +36,9 @@ func NewSuccessJoinRoomMessage(id types.RoomID) (*SuccessJoinRoom, error) {
return msg, nil
}
func NewSuccessJoinRoomMessageFactory(id types.RoomID) func() (message.Message, error) {
func NewSuccessJoinRoomMessageFactory(id types.RoomID, clientID interceptor.ClientID) func() (message.Message, error) {
return func() (message.Message, error) {
return NewSuccessJoinRoomMessage(id)
return NewSuccessJoinRoomMessage(id, clientID)
}
}
@@ -48,7 +49,7 @@ func (m *SuccessJoinRoom) GetProtocol() message.Protocol {
func (m *SuccessJoinRoom) ReadProcess(_ context.Context, _i interceptor.Interceptor, _ interceptor.Connection) error {
_, ok := _i.(*chat.ClientInterceptor)
if !ok {
return errors.ErrInvalidInterceptor
return interceptor.ErrInvalidInterceptor
}
// NOTE: INTENTIONALLY EMPTY

View File

@@ -6,7 +6,6 @@ import (
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/message"
"github.com/harshabose/socket-comm/pkg/middleware/chat"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
)
@@ -14,12 +13,14 @@ var SuccessLeaveRoomProtocol message.Protocol = "room:success_leave_room"
type SuccessLeaveRoom struct {
interceptor.BaseMessage
RoomID types.RoomID `json:"room_id"`
RoomID types.RoomID `json:"room_id"`
ClientID interceptor.ClientID `json:"client_id"`
}
func NewSuccessLeaveRoomMessage(id types.RoomID) (*SuccessLeaveRoom, error) {
func NewSuccessLeaveRoomMessage(id types.RoomID, clientID interceptor.ClientID) (*SuccessLeaveRoom, error) {
msg := &SuccessLeaveRoom{
RoomID: id,
RoomID: id,
ClientID: clientID,
}
bmsg, err := interceptor.NewBaseMessage(message.NoneProtocol, nil, msg)
@@ -31,9 +32,9 @@ func NewSuccessLeaveRoomMessage(id types.RoomID) (*SuccessLeaveRoom, error) {
return msg, nil
}
func NewSuccessLeaveRoomMessageFactory(id types.RoomID) func() (message.Message, error) {
func NewSuccessLeaveRoomMessageFactory(id types.RoomID, clientID interceptor.ClientID) func() (message.Message, error) {
return func() (message.Message, error) {
return NewSuccessLeaveRoomMessage(id)
return NewSuccessLeaveRoomMessage(id, clientID)
}
}
@@ -44,7 +45,7 @@ func (m *SuccessLeaveRoom) GetProtocol() message.Protocol {
func (m *SuccessLeaveRoom) ReadProcess(_ context.Context, _i interceptor.Interceptor, _ interceptor.Connection) error {
_, ok := _i.(*chat.ClientInterceptor)
if !ok {
return errors.ErrInterfaceMisMatch
return interceptor.ErrInterfaceMisMatch
}
// NOTE: INTENTIONALLY EMPTY

View File

@@ -16,7 +16,7 @@ type SuccessStartHealthStreaming struct {
process.CreateHealthRoom
}
func NewSuccessStartHealthStreamingMessage(id types.RoomID, allowed []types.ClientID, ttl time.Duration) (*SuccessStartHealthStreaming, error) {
func NewSuccessStartHealthStreamingMessage(id types.RoomID, allowed []interceptor.ClientID, ttl time.Duration) (*SuccessStartHealthStreaming, error) {
msg := &SuccessStartHealthStreaming{
CreateHealthRoom: process.NewCreateHealthRoom(id, allowed, ttl),
}
@@ -29,7 +29,7 @@ func NewSuccessStartHealthStreamingMessage(id types.RoomID, allowed []types.Clie
return msg, nil
}
func NewSuccessStartHealthStreamingMessageFactory(id types.RoomID, allowed []types.ClientID, ttl time.Duration) func() (message.Message, error) {
func NewSuccessStartHealthStreamingMessageFactory(id types.RoomID, allowed []interceptor.ClientID, ttl time.Duration) func() (message.Message, error) {
return func() (message.Message, error) {
return NewSuccessStartHealthStreamingMessage(id, allowed, ttl)
}

View File

@@ -6,7 +6,6 @@ import (
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/message"
"github.com/harshabose/socket-comm/pkg/middleware/chat"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
)
@@ -46,7 +45,7 @@ func (m *SuccessCreateRoom) GetProtocol() message.Protocol {
func (m *SuccessCreateRoom) ReadProcess(_ context.Context, _i interceptor.Interceptor, _ interceptor.Connection) error {
_, ok := _i.(*chat.ClientInterceptor)
if !ok {
return errors.ErrInvalidInterceptor
return interceptor.ErrInvalidInterceptor
}
// NOTE: INTENTIONALLY EMPTY

View File

@@ -1,14 +1,13 @@
package messages
import (
"context"
"fmt"
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/message"
"github.com/harshabose/socket-comm/pkg/middleware/chat"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
"github.com/harshabose/socket-comm/pkg/middleware/chat/state"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
)
@@ -16,28 +15,28 @@ var ForwardMessageProtocol message.Protocol = "room:forward_message"
type ToForward struct {
interceptor.BaseMessage
RoomID types.RoomID `json:"room_id"`
To []types.ClientID `json:"to"`
RoomID types.RoomID `json:"room_id"`
To []interceptor.ClientID `json:"to"`
}
func (m *ToForward) GetProtocol() message.Protocol {
return ForwardMessageProtocol
}
func (m *ToForward) ReadProcess(_i interceptor.Interceptor, connection interceptor.Connection) error {
func (m *ToForward) ReadProcess(ctx context.Context, _i interceptor.Interceptor, connection interceptor.Connection) error {
i, ok := _i.(*chat.ServerInterceptor)
if !ok {
return errors.ErrInterfaceMisMatch
return interceptor.ErrInterfaceMisMatch
}
s, err := i.GetState(connection)
if err != nil {
return err
}
return i.Rooms.Process(m, s)
return i.Rooms.Process(ctx, m, s)
}
func (m *ToForward) Process(p interfaces.Processor, s *state.State) error {
func (m *ToForward) Process(_ context.Context, p interceptor.CanProcess, s interceptor.State) error {
msg, err := newForwardedMessage(m)
if err != nil {
return err
@@ -48,13 +47,13 @@ func (m *ToForward) Process(p interfaces.Processor, s *state.State) error {
return err
}
if types.ClientID(m.CurrentHeader.Sender) != clientID {
if interceptor.ClientID(m.CurrentHeader.Sender) != clientID {
return fmt.Errorf("error while read processing 'ToForward'; From and ClientID did not match")
}
w, ok := p.(interfaces.CanWriteRoomMessage)
if !ok {
return errors.ErrInterfaceMisMatch
return interceptor.ErrInterfaceMisMatch
}
if err := w.WriteRoomMessage(m.RoomID, msg, clientID, m.To...); err != nil {

View File

@@ -6,15 +6,40 @@ import (
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/message"
"github.com/harshabose/socket-comm/pkg/middleware/chat"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
"github.com/harshabose/socket-comm/pkg/middleware/chat/process"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
)
const UpdateHealthSnapshotProtocol message.Protocol = "chat:update_health_snapshot"
type UpdateHealthSnapshot struct {
interceptor.BaseMessage
process.UpdateHealthSnapshot
*process.UpdateHealthSnapshot
}
func NewUpdateHealthSnapshotMessageFactory(roomID types.RoomID, getter interfaces.CanGetHealthSnapshot) func() (message.Message, error) {
return func() (message.Message, error) {
return NewUpdateHealthSnapshotMessage(roomID, getter)
}
}
func NewUpdateHealthSnapshotMessage(roomID types.RoomID, getter interfaces.CanGetHealthSnapshot) (*UpdateHealthSnapshot, error) {
p, err := process.NewUpdateHealthSnapshot(roomID, getter)
if err != nil {
return nil, err
}
msg := &UpdateHealthSnapshot{
UpdateHealthSnapshot: p,
}
bmsg, err := interceptor.NewBaseMessage(message.NoneProtocol, nil, msg)
if err != nil {
return nil, err
}
msg.BaseMessage = bmsg
return msg, nil
}
func (m *UpdateHealthSnapshot) GetProtocol() message.Protocol {
@@ -24,7 +49,7 @@ func (m *UpdateHealthSnapshot) GetProtocol() message.Protocol {
func (m *UpdateHealthSnapshot) ReadProcess(ctx context.Context, _i interceptor.Interceptor, _ interceptor.Connection) error {
i, ok := _i.(*chat.ClientInterceptor)
if !ok {
return errors.ErrInvalidInterceptor
return interceptor.ErrInvalidInterceptor
}
return i.Health.Process(ctx, m, nil)

View File

@@ -12,8 +12,6 @@ import (
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/message"
"github.com/harshabose/socket-comm/pkg/middleware/chat"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
"github.com/harshabose/socket-comm/pkg/middleware/chat/process"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
)
@@ -67,7 +65,7 @@ func (m *UpdateHealthStat) GetProtocol() message.Protocol {
func (m *UpdateHealthStat) ReadProcess(ctx context.Context, _i interceptor.Interceptor, connection interceptor.Connection) error {
i, ok := _i.(*chat.ServerInterceptor)
if !ok {
return errors.ErrInterfaceMisMatch
return interceptor.ErrInterfaceMisMatch
}
s, err := i.GetState(connection)
@@ -78,10 +76,10 @@ func (m *UpdateHealthStat) ReadProcess(ctx context.Context, _i interceptor.Inter
return i.Health.Process(ctx, m, s)
}
func (m *UpdateHealthStat) WriteProcess(ctx context.Context, _i interceptor.Interceptor, connection interceptor.Connection) error {
s, ok := _i.(interfaces.CanGetState)
func (m *UpdateHealthStat) WriteProcess(_ context.Context, _i interceptor.Interceptor, connection interceptor.Connection) error {
s, ok := _i.(*chat.ServerInterceptor)
if !ok {
return errors.ErrInterfaceMisMatch
return interceptor.ErrInterfaceMisMatch
}
ss, err := s.GetState(connection)

View File

@@ -3,9 +3,8 @@ package process
import (
"context"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
"github.com/harshabose/socket-comm/pkg/middleware/chat/state"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
)
@@ -15,21 +14,21 @@ type AddToRoom struct {
AsyncProcess
}
func NewAddToRoom(roomID types.RoomID) interfaces.CanBeProcessed {
func NewAddToRoom(roomID types.RoomID) interceptor.CanBeProcessed {
return &AddToRoom{
RoomID: roomID,
}
}
// Process needs room processor
func (p *AddToRoom) Process(ctx context.Context, processor interfaces.Processor, s *state.State) error {
func (p *AddToRoom) Process(ctx context.Context, processor interceptor.CanProcess, s interceptor.State) error {
select {
case <-ctx.Done():
return errors.ErrContextCancelled
return interceptor.ErrContextCancelled
default:
r, ok := processor.(interfaces.CanAdd)
if !ok {
return errors.ErrInterfaceMisMatch
return interceptor.ErrInterfaceMisMatch
}
return r.Add(p.RoomID, s)

View File

@@ -5,14 +5,13 @@ import (
"fmt"
"sync"
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
"github.com/harshabose/socket-comm/pkg/middleware/chat/state"
"github.com/harshabose/socket-comm/pkg/interceptor"
)
// AsyncProcess is intended to be embedded in a process (can be message-tagged) to enable async capabilities
// NOTE: TO EMBED THIS, THE EMBEDDER NEEDS TO IMPLEMENT interfaces.CanBeProcessed. THIS CONTRACT IS LEFT TO THE DEVELOPER TO FULFILL
type AsyncProcess struct {
interfaces.CanBeProcessed
interceptor.CanBeProcessed
err error
done chan struct{}
ctx context.Context
@@ -28,10 +27,10 @@ func ManualAsyncProcessInitialisation(ctx context.Context, cancel context.Cancel
}
}
func (p *AsyncProcess) ProcessBackground(ctx context.Context, _p interfaces.Processor, s *state.State) interfaces.CanBeProcessedBackground {
func (p *AsyncProcess) ProcessBackground(ctx context.Context, _p interceptor.CanProcessBackground, s interceptor.State) interceptor.CanBeProcessedBackground {
if p.CanBeProcessed == nil {
fmt.Println("WARNING: AsyncProcess.CanBeProcessed is nil; this is not allowed")
// p.CanBeProcessed =
return nil
}
if p.done == nil { // ONLY POSSIBLE WHEN NOT ManualAsyncProcessInitialisation-ed
p.done = make(chan struct{})
@@ -47,7 +46,7 @@ func (p *AsyncProcess) ProcessBackground(ctx context.Context, _p interfaces.Proc
}
go func() {
err := p.Process(p.ctx, _p, s)
err := p.Process(p.ctx, _p.(interceptor.CanProcess), s)
p.mux.Lock()
defer p.mux.Unlock()
defer p.cancel()

View File

@@ -4,20 +4,19 @@ import (
"context"
"time"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
"github.com/harshabose/socket-comm/pkg/middleware/chat/state"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
)
type CreateHealthRoom struct {
RoomID types.RoomID `json:"room_id"`
Allowed []types.ClientID `json:"allowed"`
TTL time.Duration `json:"ttl"`
RoomID types.RoomID `json:"room_id"`
Allowed []interceptor.ClientID `json:"allowed"`
TTL time.Duration `json:"ttl"`
AsyncProcess
}
func NewCreateHealthRoom(id types.RoomID, allowed []types.ClientID, ttl time.Duration) CreateHealthRoom {
func NewCreateHealthRoom(id types.RoomID, allowed []interceptor.ClientID, ttl time.Duration) CreateHealthRoom {
return CreateHealthRoom{
RoomID: id,
Allowed: allowed,
@@ -25,14 +24,14 @@ func NewCreateHealthRoom(id types.RoomID, allowed []types.ClientID, ttl time.Dur
}
}
func (p *CreateHealthRoom) Process(ctx context.Context, processor interfaces.Processor, _ *state.State) error {
func (p *CreateHealthRoom) Process(ctx context.Context, processor interceptor.CanProcess, _ interceptor.State) error {
select {
case <-ctx.Done():
return errors.ErrContextCancelled
return interceptor.ErrContextCancelled
default:
r, ok := processor.(interfaces.CanCreateHealth)
if !ok {
return errors.ErrInterfaceMisMatch
return interceptor.ErrInterfaceMisMatch
}
_, err := r.CreateHealth(p.RoomID, p.Allowed, p.TTL)

View File

@@ -4,21 +4,20 @@ import (
"context"
"time"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
"github.com/harshabose/socket-comm/pkg/middleware/chat/state"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
)
// CreateRoom is a process that creates a room as per requested by the client and adds the client to it.
type CreateRoom struct {
RoomID types.RoomID `json:"room_id"`
Allowed []types.ClientID `json:"allowed"`
TTL time.Duration `json:"ttl"`
RoomID types.RoomID `json:"room_id"`
Allowed []interceptor.ClientID `json:"allowed"`
TTL time.Duration `json:"ttl"`
AsyncProcess
}
func NewCreateRoom(roomID types.RoomID, allowed []types.ClientID, ttl time.Duration) *CreateRoom {
func NewCreateRoom(roomID types.RoomID, allowed []interceptor.ClientID, ttl time.Duration) *CreateRoom {
return &CreateRoom{
RoomID: roomID,
Allowed: allowed,
@@ -27,14 +26,14 @@ func NewCreateRoom(roomID types.RoomID, allowed []types.ClientID, ttl time.Durat
}
// Process requires room processor to be passed in.
func (p *CreateRoom) Process(ctx context.Context, processor interfaces.Processor, _ *state.State) error {
func (p *CreateRoom) Process(ctx context.Context, processor interceptor.CanProcess, _ interceptor.State) error {
select {
case <-ctx.Done():
return errors.ErrContextCancelled
return interceptor.ErrContextCancelled
default:
r, ok := processor.(interfaces.CanCreateRoom)
if !ok {
return errors.ErrInterfaceMisMatch
return interceptor.ErrInterfaceMisMatch
}
_, err := r.CreateRoom(p.RoomID, p.Allowed, p.TTL)

View File

@@ -3,9 +3,8 @@ package process
import (
"context"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
"github.com/harshabose/socket-comm/pkg/middleware/chat/state"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
)
@@ -20,14 +19,14 @@ func NewDeleteHealthRoom(roomID types.RoomID) DeleteHealthRoom {
}
}
func (p *DeleteHealthRoom) Process(ctx context.Context, processor interfaces.Processor, _ *state.State) error {
func (p *DeleteHealthRoom) Process(ctx context.Context, processor interceptor.CanProcess, _ interceptor.State) error {
select {
case <-ctx.Done():
return errors.ErrContextCancelled
return interceptor.ErrContextCancelled
default:
d, ok := processor.(interfaces.CanDeleteHealth)
if !ok {
return errors.ErrInterfaceMisMatch
return interceptor.ErrInterfaceMisMatch
}
return d.DeleteHealth(p.RoomID)

View File

@@ -5,9 +5,8 @@ import (
"fmt"
"time"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
"github.com/harshabose/socket-comm/pkg/middleware/chat/state"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
)
@@ -25,10 +24,10 @@ func NewDeleteHealthWaiter(ctx context.Context, roomid types.RoomID, ttl time.Du
}
}
func (p *DeleteHealthWaiter) Process(ctx context.Context, processor interfaces.Processor, _ *state.State) error {
func (p *DeleteHealthWaiter) Process(ctx context.Context, processor interceptor.CanProcess, _ interceptor.State) error {
d, ok := processor.(interfaces.CanDeleteHealth)
if !ok {
return errors.ErrInterfaceMisMatch
return interceptor.ErrInterfaceMisMatch
}
timer := time.NewTimer(p.TTL)
@@ -37,20 +36,12 @@ func (p *DeleteHealthWaiter) Process(ctx context.Context, processor interfaces.P
for {
select {
case <-ctx.Done():
return errors.ErrContextCancelled
return interceptor.ErrContextCancelled
case <-timer.C:
if err := p.process(d); err != nil {
if err := d.DeleteHealth(p.RoomID); err != nil {
return fmt.Errorf("error while processing DeleteHealthWaiter process; err: %s", err.Error())
}
return nil
}
}
}
func (p *DeleteHealthWaiter) process(d interfaces.CanDeleteHealth) error {
if err := d.DeleteHealth(p.RoomID); err != nil {
return err
}
return nil
}

View File

@@ -3,9 +3,8 @@ package process
import (
"context"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
"github.com/harshabose/socket-comm/pkg/middleware/chat/state"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
)
@@ -15,14 +14,14 @@ type DeleteRoom struct {
}
// Process needs room processor to be passed in.
func (p *DeleteRoom) Process(ctx context.Context, processor interfaces.Processor, _ *state.State) error {
func (p *DeleteRoom) Process(ctx context.Context, processor interceptor.CanProcess, _ interceptor.State) error {
select {
case <-ctx.Done():
return errors.ErrContextCancelled
return interceptor.ErrContextCancelled
default:
d, ok := processor.(interfaces.CanDeleteRoom)
if !ok {
return errors.ErrInterfaceMisMatch
return interceptor.ErrInterfaceMisMatch
}
return d.DeleteRoom(p.RoomID)

View File

@@ -5,41 +5,34 @@ import (
"fmt"
"time"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
"github.com/harshabose/socket-comm/pkg/middleware/chat/state"
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
)
// DeleteRoomWaiter is a process that waits until TTL to delete a room.
// NOTE: THIS IS A PURE PROCESS; AND IS NOT ADVISED TO BE TAGGED IN A MESSAGE
type DeleteRoomWaiter struct {
TTL time.Duration `json:"ttl"`
RoomID types.RoomID `json:"room_id"`
TTL time.Duration `json:"ttl"`
DeleteRoom
AsyncProcess
}
func NewDeleteRoomWaiter(ctx context.Context, roomid types.RoomID, ttl time.Duration) *DeleteRoomWaiter {
return &DeleteRoomWaiter{
AsyncProcess: ManualAsyncProcessInitialisation(context.WithTimeout(ctx, ttl)),
RoomID: roomid,
DeleteRoom: DeleteRoom{RoomID: roomid},
TTL: ttl,
}
}
func (p *DeleteRoomWaiter) Process(ctx context.Context, processor interfaces.Processor, _ *state.State) error {
d, ok := processor.(interfaces.CanDeleteRoom)
if !ok {
return errors.ErrInterfaceMisMatch
}
func (p *DeleteRoomWaiter) Process(ctx context.Context, processor interceptor.CanProcess, _ interceptor.State) error {
timer := time.NewTimer(p.TTL)
defer timer.Stop()
for {
select {
case <-timer.C:
if err := p.process(d); err != nil {
if err := p.DeleteRoom.Process(ctx, processor, nil); err != nil {
return fmt.Errorf("error while processing DeleteRoomWaiter process; err: %s", err.Error())
}
return nil
@@ -48,11 +41,3 @@ func (p *DeleteRoomWaiter) Process(ctx context.Context, processor interfaces.Pro
}
}
}
func (p *DeleteRoomWaiter) process(d interfaces.CanDeleteRoom) error {
if err := d.DeleteRoom(p.RoomID); err != nil {
return err
}
return nil
}

View File

@@ -4,8 +4,7 @@ import (
"context"
"fmt"
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
"github.com/harshabose/socket-comm/pkg/middleware/chat/state"
"github.com/harshabose/socket-comm/pkg/interceptor"
)
type IdentInit struct {
@@ -16,8 +15,8 @@ func NewIdentInit() *IdentInit {
return &IdentInit{}
}
func (p *IdentInit) Process(ctx context.Context, _ interfaces.Processor, s *state.State) error {
// TODO: SEND IDENT MESSAGE
func (p *IdentInit) Process(ctx context.Context, _ interceptor.CanProcess, s interceptor.State) error {
// TODO: SEND IDENT MESSAGE // PROBLEM HERE AS PROCESS MODULE CANNOT IMPORT MESSAGE
if err := s.Write(nil); err != nil {
return err
}

View File

@@ -5,9 +5,7 @@ import (
"fmt"
"time"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
"github.com/harshabose/socket-comm/pkg/middleware/chat/state"
"github.com/harshabose/socket-comm/pkg/interceptor"
)
type IdentWaiterOptions func(*IdentWaiter)
@@ -29,14 +27,14 @@ type IdentWaiter struct {
AsyncProcess
}
func (p *IdentWaiter) Process(ctx context.Context, _ interfaces.Processor, s *state.State) error {
func (p *IdentWaiter) Process(ctx context.Context, _ interceptor.CanProcess, s interceptor.State) error {
ticker := time.NewTicker(p.duration)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return errors.ErrContextCancelled
return interceptor.ErrContextCancelled
case <-ticker.C:
if err := p.process(s); err == nil {
return nil
@@ -46,7 +44,7 @@ func (p *IdentWaiter) Process(ctx context.Context, _ interfaces.Processor, s *st
}
}
func (p *IdentWaiter) process(s *state.State) error {
func (p *IdentWaiter) process(s interceptor.State) error {
_, err := s.GetClientID()
return err
}

View File

@@ -3,20 +3,20 @@ package process
import (
"context"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
"github.com/harshabose/socket-comm/pkg/middleware/chat/state"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
)
type RemoveFromRoom struct {
RoomID types.RoomID `json:"room_id"`
AsyncProcess
}
func (p *RemoveFromRoom) Process(ctx context.Context, processor interfaces.Processor, s *state.State) error {
func (p *RemoveFromRoom) Process(_ context.Context, processor interceptor.CanProcess, s interceptor.State) error {
r, ok := processor.(interfaces.CanRemove)
if !ok {
return errors.ErrInterfaceMisMatch
return interceptor.ErrInterfaceMisMatch
}
return r.Remove(p.RoomID, s)

View File

@@ -3,10 +3,8 @@ package process
import (
"context"
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/message"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
"github.com/harshabose/socket-comm/pkg/middleware/chat/state"
)
type SendMessage struct {
@@ -20,15 +18,15 @@ func NewSendMessage(factory func() (message.Message, error)) *SendMessage {
}
}
func (p *SendMessage) Process(ctx context.Context, _ interfaces.Processor, s *state.State) error {
func (p *SendMessage) Process(ctx context.Context, _ interceptor.CanProcess, s interceptor.State) error {
select {
case <-ctx.Done():
return errors.ErrContextCancelled
return interceptor.ErrContextCancelled
default:
msg, err := p.factory()
if err != nil {
return err
}
return s.Write(msg)
return s.Write(ctx, msg)
}
}

View File

@@ -5,10 +5,9 @@ import (
"fmt"
"github.com/harshabose/socket-comm/internal/util"
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/message"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
"github.com/harshabose/socket-comm/pkg/middleware/chat/state"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
)
@@ -28,14 +27,14 @@ func NewSendMessageToAllParticipantsInRoom(roomid types.RoomID, msgFactory func(
}
// Process needs Room processor to be passed in.
func (p *SendMessageToAllParticipantsInRoom) Process(ctx context.Context, processor interfaces.Processor, _ *state.State) error {
func (p *SendMessageToAllParticipantsInRoom) Process(ctx context.Context, processor interceptor.CanProcess, _ interceptor.State) error {
select {
case <-ctx.Done():
return errors.ErrContextCancelled
return interceptor.ErrContextCancelled
default:
r, ok := processor.(interfaces.CanGetRoom)
if !ok {
return errors.ErrInterfaceMisMatch
return interceptor.ErrInterfaceMisMatch
}
room, err := r.GetRoom(p.Roomid)

View File

@@ -3,21 +3,20 @@ package process
import (
"context"
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/message"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
"github.com/harshabose/socket-comm/pkg/middleware/chat/state"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
)
type SendMessageRoom struct {
RoomID types.RoomID `json:"room_id"`
ClientID types.ClientID `json:"client_id"`
RoomID types.RoomID `json:"room_id"`
ClientID interceptor.ClientID `json:"client_id"`
messageFactory func() (message.Message, error)
AsyncProcess
}
func NewSendMessageBetweenParticipantsInRoom(roomID types.RoomID, clientID types.ClientID, messageFactory func() (message.Message, error)) *SendMessageRoom {
func NewSendMessageBetweenParticipantsInRoom(roomID types.RoomID, clientID interceptor.ClientID, messageFactory func() (message.Message, error)) *SendMessageRoom {
return &SendMessageRoom{
RoomID: roomID,
ClientID: clientID,
@@ -25,14 +24,14 @@ func NewSendMessageBetweenParticipantsInRoom(roomID types.RoomID, clientID types
}
}
func (p *SendMessageRoom) Process(ctx context.Context, processor interfaces.Processor, _ *state.State) error {
func (p *SendMessageRoom) Process(ctx context.Context, processor interceptor.CanProcess, _ interceptor.State) error {
select {
case <-ctx.Done():
return errors.ErrContextCancelled
return interceptor.ErrContextCancelled
default:
w, ok := processor.(interfaces.CanWriteRoomMessage)
if !ok {
return errors.ErrInterfaceMisMatch
return interceptor.ErrInterfaceMisMatch
}
msg, err := p.messageFactory()

View File

@@ -4,10 +4,8 @@ import (
"context"
"time"
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/message"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
"github.com/harshabose/socket-comm/pkg/middleware/chat/state"
)
type SendMessageStream struct {
@@ -23,7 +21,7 @@ func NewSendMessageStream(factory func() (message.Message, error), interval time
}
}
func (p *SendMessageStream) Process(ctx context.Context, _ interfaces.Processor, s *state.State) error {
func (p *SendMessageStream) Process(ctx context.Context, _ interceptor.CanProcess, s interceptor.State) error {
ticker := time.NewTicker(p.Interval)
defer ticker.Stop()
@@ -34,7 +32,7 @@ func (p *SendMessageStream) Process(ctx context.Context, _ interfaces.Processor,
return err
}
case <-ctx.Done():
return errors.ErrContextCancelled
return interceptor.ErrContextCancelled
}
}
}

View File

@@ -5,9 +5,8 @@ import (
"fmt"
"time"
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/message"
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
"github.com/harshabose/socket-comm/pkg/middleware/chat/state"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
)
@@ -30,8 +29,12 @@ func NewSendMessageStreamToAllParticipants(ctx context.Context, msgFactory func(
}
}
func (p *SendMessageStreamRoomToAllParticipants) SetInterval(interval time.Duration) {
p.interval = interval
}
// Process needs Room processor to be passed in.
func (p *SendMessageStreamRoomToAllParticipants) Process(ctx context.Context, r interfaces.Processor, _ *state.State) error {
func (p *SendMessageStreamRoomToAllParticipants) Process(ctx context.Context, r interceptor.CanProcess, _ interceptor.State) error {
ticker := time.NewTicker(p.interval)
defer ticker.Stop()

View File

@@ -1,41 +0,0 @@
package process
import (
"context"
"time"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
"github.com/harshabose/socket-comm/pkg/middleware/chat/state"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
)
type StartHealthTracking struct {
RoomID types.RoomID `json:"room_id"`
Interval time.Duration `json:"interval"`
AsyncProcess
}
func NewStartHealthTracking(roomID types.RoomID) interfaces.CanBeProcessed {
return &StartHealthTracking{
RoomID: roomID,
}
}
func (p *StartHealthTracking) Process(ctx context.Context, processor interfaces.Processor, _ *state.State) error {
select {
case <-ctx.Done():
return errors.ErrContextCancelled
default:
t, ok := processor.(interfaces.CanStartHealthTracking)
if !ok {
return errors.ErrInterfaceMisMatch
}
if err := t.StartHealthTracking(p.RoomID, p.Interval); err != nil {
return err
}
return nil
}
}

View File

@@ -1,37 +0,0 @@
package process
import (
"context"
"time"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
"github.com/harshabose/socket-comm/pkg/middleware/chat/state"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
)
type StartStreamingHealthSnapshots struct {
Roomid types.RoomID `json:"roomid"`
Interval time.Duration `json:"interval"`
AsyncProcess
}
func NewGetHealthSnapshot(roomid types.RoomID) *StartStreamingHealthSnapshots {
return &StartStreamingHealthSnapshots{
Roomid: roomid,
}
}
func (p *StartStreamingHealthSnapshots) Process(ctx context.Context, processor interfaces.Processor, s *state.State) error {
select {
case <-ctx.Done():
return errors.ErrContextCancelled
default:
h, ok := processor.(interfaces.CanAddHealthSnapshotStreamer)
if !ok {
return errors.ErrInterfaceMisMatch
}
return h.AddHealthSnapshotStreamer(p.Roomid, p.Interval, s)
}
}

View File

@@ -3,31 +3,30 @@ package process
import (
"context"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
"github.com/harshabose/socket-comm/pkg/middleware/chat/state"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
)
type UnMarkRoomForHealthTracking struct {
type StopHealthTracking struct {
RoomID types.RoomID
AsyncProcess
}
func NewUnMarkRoomForHealthTracking(roomID types.RoomID) *UnMarkRoomForHealthTracking {
return &UnMarkRoomForHealthTracking{
func NewUnMarkRoomForHealthTracking(roomID types.RoomID) *StopHealthTracking {
return &StopHealthTracking{
RoomID: roomID,
}
}
func (p *UnMarkRoomForHealthTracking) Process(ctx context.Context, processor interfaces.Processor, _ *state.State) error {
func (p *StopHealthTracking) Process(ctx context.Context, processor interceptor.CanProcess, _ interceptor.State) error {
select {
case <-ctx.Done():
return errors.ErrContextCancelled
return interceptor.ErrContextCancelled
default:
u, ok := processor.(interfaces.CanStopHealthTracking)
if !ok {
return errors.ErrInterfaceMisMatch
return interceptor.ErrInterfaceMisMatch
}
if err := u.StopHealthTracking(p.RoomID); err != nil {

View File

@@ -3,9 +3,8 @@ package process
import (
"context"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
"github.com/harshabose/socket-comm/pkg/middleware/chat/state"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
)
@@ -20,14 +19,14 @@ func NewStopStreamingHealthSnapshots(roomid types.RoomID) StopStreamingHealthSna
}
}
func (p *StopStreamingHealthSnapshots) Process(ctx context.Context, processor interfaces.Processor, s *state.State) error {
func (p *StopStreamingHealthSnapshots) Process(ctx context.Context, processor interceptor.CanProcess, s interceptor.State) error {
select {
case <-ctx.Done():
return nil
default:
h, ok := processor.(interfaces.CanRemoveHealthSnapshotStreamer)
if !ok {
return errors.ErrInterfaceMisMatch
return interceptor.ErrInterfaceMisMatch
}
return h.RemoveHealthSnapshotStreamer(p.Roomid, s)

View File

@@ -3,10 +3,9 @@ package process
import (
"context"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/middleware/chat/health"
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
"github.com/harshabose/socket-comm/pkg/middleware/chat/state"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
)
@@ -17,10 +16,10 @@ type UpdateHealthStat struct {
AsyncProcess
}
func (p *UpdateHealthStat) Process(ctx context.Context, processor interfaces.Processor, s *state.State) error {
func (p *UpdateHealthStat) Process(ctx context.Context, processor interceptor.CanProcess, s interceptor.State) error {
select {
case <-ctx.Done():
return errors.ErrContextCancelled
return interceptor.ErrContextCancelled
default:
id, err := s.GetClientID()
if err != nil {
@@ -29,7 +28,7 @@ func (p *UpdateHealthStat) Process(ctx context.Context, processor interfaces.Pro
u, ok := processor.(interfaces.CanUpdate)
if !ok {
return errors.ErrInterfaceMisMatch
return interceptor.ErrInterfaceMisMatch
}
if err := u.Update(p.RoomID, id, &p.Stat); err != nil {

View File

@@ -4,23 +4,19 @@ import (
"context"
"github.com/harshabose/socket-comm/internal/util"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/middleware/chat/health"
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
"github.com/harshabose/socket-comm/pkg/middleware/chat/state"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
)
type SnapshotGetter interface {
GetSnapshot() (health.Snapshot, error)
}
type UpdateHealthSnapshot struct {
health.Snapshot
Snapshot health.Snapshot `json:"snapshot"`
AsyncProcess
}
func NewUpdateHealthSnapshot(snapGetter SnapshotGetter) (*UpdateHealthSnapshot, error) {
snapshot, err := snapGetter.GetSnapshot()
func NewUpdateHealthSnapshot(roomID types.RoomID, snapGetter interfaces.CanGetHealthSnapshot) (*UpdateHealthSnapshot, error) {
snapshot, err := snapGetter.GetHealthSnapshot(roomID)
if err != nil {
return nil, err
}
@@ -29,28 +25,24 @@ func NewUpdateHealthSnapshot(snapGetter SnapshotGetter) (*UpdateHealthSnapshot,
}, nil
}
func (p *UpdateHealthSnapshot) Process(ctx context.Context, processor interfaces.Processor, s *state.State) error {
func (p *UpdateHealthSnapshot) Process(ctx context.Context, processor interceptor.CanProcess, _ interceptor.State) error {
select {
case <-ctx.Done():
return errors.ErrContextCancelled
return interceptor.ErrContextCancelled
default:
c, ok := processor.(interfaces.CanGetHealth)
if !ok {
return errors.ErrInterfaceMisMatch
return interceptor.ErrInterfaceMisMatch
}
h, err := c.GetHealth(p.Roomid)
h, err := c.GetHealth(p.Snapshot.Roomid)
if err != nil {
return err
}
// NOTE: I COULD JUST DO THE FOLLOWING:
// h.Snapshot = p.Snapshot
// BUT MAYBE SOME MISMATCH WITH TTL OR ROOMID OR SOMETHING CAN CORRUPT THIS DATA.
merr := util.NewMultiError()
for client, stat := range p.Participants {
merr.Add(h.Update(p.Roomid, client, stat))
for client, stat := range p.Snapshot.Participants {
merr.Add(h.Update(p.Snapshot.Roomid, client, stat))
}
return merr.ErrorOrNil()

View File

@@ -6,18 +6,17 @@ import (
"sync"
"time"
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/middleware/chat/health"
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
"github.com/harshabose/socket-comm/pkg/middleware/chat/process"
"github.com/harshabose/socket-comm/pkg/middleware/chat/state"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
)
type healthSession struct {
health *health.Health
healthTrackingDeleter interfaces.CanBeProcessedBackground
healthSnapshotStreamer map[types.ClientID]interfaces.CanBeProcessedBackground // receiver[]streamerProcess
healthTrackingDeleter interceptor.CanBeProcessedBackground
healthSnapshotStreamer map[interceptor.ClientID]interceptor.CanBeProcessedBackground // receiver[]streamerProcess
}
type Health struct {
@@ -37,7 +36,7 @@ func NewHealthProcessor(ctx context.Context) *Health {
// ================================ CORE METHODS ================================
// ==============================================================================
func (p *Health) CreateHealth(roomid types.RoomID, allowed []types.ClientID, ttl time.Duration) (*health.Health, error) {
func (p *Health) CreateHealth(roomid types.RoomID, allowed []interceptor.ClientID, ttl time.Duration) (*health.Health, error) {
if p.exists(roomid) {
return nil, fmt.Errorf("error while creating h with id %s; err: %s", roomid, errors.ErrRoomAlreadyExists)
}
@@ -45,7 +44,7 @@ func (p *Health) CreateHealth(roomid types.RoomID, allowed []types.ClientID, ttl
p.health[roomid] = &healthSession{
health: health.NewHealth(p.ctx, roomid, allowed, ttl),
healthTrackingDeleter: process.NewDeleteHealthWaiter(p.ctx, roomid, ttl).ProcessBackground(nil, p, nil),
healthSnapshotStreamer: make(map[types.ClientID]interfaces.CanBeProcessedBackground),
healthSnapshotStreamer: make(map[interceptor.ClientID]interceptor.CanBeProcessedBackground),
}
return p.health[roomid].health, nil
@@ -94,11 +93,11 @@ func (p *Health) exists(roomid types.RoomID) bool {
// ========================== INTERFACE IMPLEMENTATIONS =========================
// ==============================================================================
func (p *Health) Process(ctx context.Context, process interfaces.CanBeProcessed, state *state.State) error {
func (p *Health) Process(ctx context.Context, process interceptor.CanBeProcessed, state interceptor.State) error {
return process.Process(ctx, p, state)
}
func (p *Health) ProcessBackground(ctx context.Context, process interfaces.CanBeProcessedBackground, state *state.State) interfaces.CanBeProcessedBackground {
func (p *Health) ProcessBackground(ctx context.Context, process interceptor.CanBeProcessedBackground, state interceptor.State) interceptor.CanBeProcessedBackground {
return process.ProcessBackground(ctx, p, state)
}
@@ -115,7 +114,7 @@ func (p *Health) GetHealthSnapshot(roomid types.RoomID) (health.Snapshot, error)
// NOTE: FOLLOWING IS DEEP-COPYING SNAPSHOT
snapshot := health.Snapshot{
Roomid: h.Roomid,
Participants: make(map[types.ClientID]*health.Stat, len(h.Participants)),
Participants: make(map[interceptor.ClientID]*health.Stat, len(h.Participants)),
}
for id, stat := range h.Participants {
@@ -130,7 +129,7 @@ func (p *Health) GetHealthSnapshot(roomid types.RoomID) (health.Snapshot, error)
return snapshot, nil
}
func (p *Health) AddHealthSnapshotStreamer(roomid types.RoomID, interval time.Duration, s *state.State) error {
func (p *Health) AddHealthSnapshotStreamer(roomid types.RoomID, s interceptor.State, _p interceptor.CanBeProcessedBackground) error {
p.mux.Lock()
defer p.mux.Unlock()
@@ -158,15 +157,14 @@ func (p *Health) AddHealthSnapshotStreamer(roomid types.RoomID, interval time.Du
}
// WARN: MAYBE RISK OF INFINITE RECURSION HERE (NO REASON BUT THE EXISTENCE OF RECURSION ITSELF HAS RISKS
return p.AddHealthSnapshotStreamer(roomid, interval, s)
return p.AddHealthSnapshotStreamer(roomid, s, _p)
}
// TODO: SEND SNAP MESSAGE
session.healthSnapshotStreamer[id] = process.NewSendMessageStream(nil, interval).ProcessBackground(p.ctx, nil, s)
session.healthSnapshotStreamer[id] = _p.ProcessBackground(p.ctx, nil, s)
return nil
}
func (p *Health) RemoveHealthSnapshotStreamer(roomid types.RoomID, s *state.State) error {
func (p *Health) RemoveHealthSnapshotStreamer(roomid types.RoomID, s interceptor.State) error {
p.mux.Lock()
defer p.mux.Unlock()
@@ -193,7 +191,7 @@ func (p *Health) RemoveHealthSnapshotStreamer(roomid types.RoomID, s *state.Stat
// Add adds the given client to the health tracking in the given room.
// Only after calling this method, the stat responses from the clients are updated.
func (p *Health) Add(roomid types.RoomID, id types.ClientID) error {
func (p *Health) Add(roomid types.RoomID, id interceptor.ClientID) error {
p.mux.Lock()
defer p.mux.Unlock()
@@ -207,7 +205,7 @@ func (p *Health) Add(roomid types.RoomID, id types.ClientID) error {
// Remove removes the given client from the health tracking from the given room.
// After removing, the stat responses are not updated any more.
func (p *Health) Remove(roomid types.RoomID, id types.ClientID) error {
func (p *Health) Remove(roomid types.RoomID, id interceptor.ClientID) error {
p.mux.Lock()
defer p.mux.Unlock()
@@ -221,7 +219,7 @@ func (p *Health) Remove(roomid types.RoomID, id types.ClientID) error {
// Update updates the stats of the given client in the given room.
// If the client is not already added to the list, the update will fail.
func (p *Health) Update(roomid types.RoomID, id types.ClientID, stat *health.Stat) error {
func (p *Health) Update(roomid types.RoomID, id interceptor.ClientID, stat *health.Stat) error {
p.mux.Lock()
defer p.mux.Unlock()

View File

@@ -6,13 +6,11 @@ import (
"sync"
"time"
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/message"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
"github.com/harshabose/socket-comm/pkg/middleware/chat/messages"
"github.com/harshabose/socket-comm/pkg/middleware/chat/process"
"github.com/harshabose/socket-comm/pkg/middleware/chat/room"
"github.com/harshabose/socket-comm/pkg/middleware/chat/state"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
)
@@ -20,7 +18,7 @@ type roomSession struct {
// NOTE: AS OF NOW, I AM MANAGING THE ROOM-DELETION AND HEALTH-TRACKING LIKE THIS;
// NOTE: I AM NOT SURE IF THIS IS THE RIGHT WAY OR NOT
room *room.Room
deletionWaiter, healthTrackingRequestSender interfaces.CanBeProcessedBackground
deletionWaiter, healthTrackingRequestSender interceptor.CanBeProcessedBackground
}
type RoomManager struct {
@@ -36,7 +34,7 @@ func NewRoomProcessor(ctx context.Context) *RoomManager {
}
}
func (m *RoomManager) Add(id types.RoomID, s *state.State) error {
func (m *RoomManager) Add(id types.RoomID, s interceptor.State) error {
r, err := m.GetRoom(id)
if err != nil {
return err
@@ -45,7 +43,7 @@ func (m *RoomManager) Add(id types.RoomID, s *state.State) error {
return r.Add(id, s)
}
func (m *RoomManager) Remove(id types.RoomID, s *state.State) error {
func (m *RoomManager) Remove(id types.RoomID, s interceptor.State) error {
r, err := m.GetRoom(id)
if err != nil {
return err
@@ -65,7 +63,7 @@ func (m *RoomManager) Remove(id types.RoomID, s *state.State) error {
// Returns:
// - *room.Room: pointer to the newly created room
// - error: nil if successful, ErrRoomAlreadyExists if room already exists
func (m *RoomManager) CreateRoom(id types.RoomID, allowed []types.ClientID, ttl time.Duration) (*room.Room, error) {
func (m *RoomManager) CreateRoom(id types.RoomID, allowed []interceptor.ClientID, ttl time.Duration) (*room.Room, error) {
if m.exists(id) {
return nil, fmt.Errorf("error while creating r with id %s; err: %s", id, errors.ErrRoomAlreadyExists)
}
@@ -101,7 +99,12 @@ func (m *RoomManager) exists(id types.RoomID) bool {
//
// Returns:
// - error: nil if successful, ErrRoomNotFound if room does not exist, or other errors if marking fails
func (m *RoomManager) StartHealthTracking(id types.RoomID, interval time.Duration) error {
func (m *RoomManager) StartHealthTracking(id types.RoomID, interval time.Duration, _p interceptor.CanBeProcessedBackground) error {
p, ok := _p.(*process.SendMessageStreamRoomToAllParticipants)
if !ok {
return interceptor.ErrInterfaceMisMatch
}
if interval <= 0 {
return fmt.Errorf("health tracking interval must be positive, got: %v", interval)
}
@@ -119,12 +122,16 @@ func (m *RoomManager) StartHealthTracking(id types.RoomID, interval time.Duratio
return fmt.Errorf("health tracking interval too large (maximum %v = 10%% of TTL), got: %v", r.TTL()/10, interval)
}
p.SetInterval(interval)
if err := r.StartHealthTracking(id); err != nil {
return err
}
m.rooms[id].healthTrackingRequestSender = process.NewSendMessageStreamToAllParticipants(
m.ctx, messages.NewRequestHealthFactory(id), id, interval, r.TTL()).ProcessBackground(nil, m, nil)
// m.rooms[id].healthTrackingRequestSender = process.NewSendMessageStreamToAllParticipants(
// m.ctx, messages.NewRequestHealthFactory(id), id, interval, r.TTL()).ProcessBackground(nil, m, nil)
m.rooms[id].healthTrackingRequestSender = p.ProcessBackground(nil, m, nil)
return nil
}
@@ -188,7 +195,7 @@ func (m *RoomManager) DeleteRoom(id types.RoomID) error {
return nil
}
func (m *RoomManager) WriteRoomMessage(roomid types.RoomID, msg message.Message, from types.ClientID, tos ...types.ClientID) error {
func (m *RoomManager) WriteRoomMessage(roomid types.RoomID, msg message.Message, from interceptor.ClientID, tos ...interceptor.ClientID) error {
r, err := m.GetRoom(roomid)
if err != nil {
return err
@@ -197,10 +204,10 @@ func (m *RoomManager) WriteRoomMessage(roomid types.RoomID, msg message.Message,
return r.WriteRoomMessage(roomid, msg, from, tos...)
}
func (m *RoomManager) Process(ctx context.Context, process interfaces.CanBeProcessed, state *state.State) error {
func (m *RoomManager) Process(ctx context.Context, process interceptor.CanBeProcessed, state interceptor.State) error {
return process.Process(ctx, m, state)
}
func (m *RoomManager) ProcessBackground(ctx context.Context, process interfaces.CanBeProcessedBackground, state *state.State) interfaces.CanBeProcessedBackground {
func (m *RoomManager) ProcessBackground(ctx context.Context, process interceptor.CanBeProcessedBackground, state interceptor.State) interceptor.CanBeProcessedBackground {
return process.ProcessBackground(ctx, m, state)
}

View File

@@ -5,17 +5,17 @@ import (
"fmt"
"time"
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/message"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/middleware/chat/state"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
)
type Room struct {
// NOTE: MAYBE A CONFIG FOR ROOM?
roomid types.RoomID
allowed []types.ClientID
participants map[types.ClientID]*state.State
allowed []interceptor.ClientID
participants map[interceptor.ClientID]interceptor.State
ttl time.Duration
isHealthTracked bool
cancel context.CancelFunc
@@ -26,7 +26,7 @@ type Room struct {
// UPDATE: YES
// TODO: ADD SOME VALIDATION BEFORE CREATING THE ROOM
func NewRoom(ctx context.Context, id types.RoomID, allowed []types.ClientID, ttl time.Duration) *Room {
func NewRoom(ctx context.Context, id types.RoomID, allowed []interceptor.ClientID, ttl time.Duration) *Room {
ctx2, cancel := context.WithTimeout(ctx, ttl)
return &Room{
ctx: ctx2,
@@ -34,7 +34,7 @@ func NewRoom(ctx context.Context, id types.RoomID, allowed []types.ClientID, ttl
ttl: ttl,
roomid: id,
allowed: allowed,
participants: make(map[types.ClientID]*state.State),
participants: make(map[interceptor.ClientID]interceptor.State),
}
}
@@ -50,7 +50,7 @@ func (r *Room) TTL() time.Duration {
return r.ttl
}
func (r *Room) Add(roomid types.RoomID, s *state.State) error {
func (r *Room) Add(roomid types.RoomID, s interceptor.State) error {
if roomid != r.roomid {
return errors.ErrWrongRoom
}
@@ -62,7 +62,7 @@ func (r *Room) Add(roomid types.RoomID, s *state.State) error {
select {
case <-r.ctx.Done():
return fmt.Errorf("error while adding client to room. client id: %s; room id: %s; err: %s", id, r.roomid, errors.ErrContextCancelled.Error())
return fmt.Errorf("error while adding client to room. client id: %s; room id: %s; err: %s", id, r.roomid, interceptor.ErrContextCancelled.Error())
default:
if !r.isAllowed(id) {
return fmt.Errorf("error while adding client to room. client id: %s; room id: %s; err: %s", id, r.roomid, errors.ErrClientNotAllowed.Error())
@@ -77,7 +77,7 @@ func (r *Room) Add(roomid types.RoomID, s *state.State) error {
}
}
func (r *Room) isAllowed(id types.ClientID) bool {
func (r *Room) isAllowed(id interceptor.ClientID) bool {
select {
case <-r.ctx.Done():
return false
@@ -96,7 +96,7 @@ func (r *Room) isAllowed(id types.ClientID) bool {
}
}
func (r *Room) forEachBoolean(f func(id types.ClientID) bool, ids ...types.ClientID) bool {
func (r *Room) forEachBoolean(f func(id interceptor.ClientID) bool, ids ...interceptor.ClientID) bool {
if len(ids) == 0 {
return false
}
@@ -110,7 +110,7 @@ func (r *Room) forEachBoolean(f func(id types.ClientID) bool, ids ...types.Clien
return true
}
func (r *Room) Remove(roomid types.RoomID, s *state.State) error {
func (r *Room) Remove(roomid types.RoomID, s interceptor.State) error {
if roomid != r.roomid {
return errors.ErrWrongRoom
}
@@ -122,7 +122,7 @@ func (r *Room) Remove(roomid types.RoomID, s *state.State) error {
select {
case <-r.ctx.Done():
return fmt.Errorf("error while removing client to room. client id: %s; room id: %s; err: %s", id, r.roomid, errors.ErrContextCancelled.Error())
return fmt.Errorf("error while removing client to room. client id: %s; room id: %s; err: %s", id, r.roomid, interceptor.ErrContextCancelled.Error())
default:
if !r.isAllowed(id) {
return fmt.Errorf("error while removing client to room. client id: %s; room id: %s; err: %s", id, r.roomid, errors.ErrClientNotAllowed.Error())
@@ -137,7 +137,7 @@ func (r *Room) Remove(roomid types.RoomID, s *state.State) error {
}
}
func (r *Room) isParticipant(id types.ClientID) bool {
func (r *Room) isParticipant(id interceptor.ClientID) bool {
select {
case <-r.ctx.Done():
return false
@@ -147,10 +147,10 @@ func (r *Room) isParticipant(id types.ClientID) bool {
}
}
func (r *Room) WriteRoomMessage(roomid types.RoomID, msg message.Message, from types.ClientID, tos ...types.ClientID) error {
func (r *Room) WriteRoomMessage(roomid types.RoomID, msg message.Message, from interceptor.ClientID, tos ...interceptor.ClientID) error {
select {
case <-r.ctx.Done():
return fmt.Errorf("error while sending message to peer in room; err: %s", errors.ErrContextCancelled.Error())
return fmt.Errorf("error while sending message to peer in room; err: %s", interceptor.ErrContextCancelled.Error())
default:
if roomid != r.roomid {
return errors.ErrWrongRoom
@@ -169,7 +169,7 @@ func (r *Room) WriteRoomMessage(roomid types.RoomID, msg message.Message, from t
}
for _, to := range tos {
if err := r.participants[to].Write(msg); err != nil {
if err := r.participants[to].Write(r.ctx, msg); err != nil {
return fmt.Errorf("error while sending message to peer in room; err: %s", err.Error())
}
}
@@ -181,7 +181,7 @@ func (r *Room) WriteRoomMessage(roomid types.RoomID, msg message.Message, from t
func (r *Room) StartHealthTracking(roomid types.RoomID) error {
select {
case <-r.ctx.Done():
return fmt.Errorf("error while marking room as health tracked. room id: %s; err: %s", roomid, errors.ErrContextCancelled.Error())
return fmt.Errorf("error while marking room as health tracked. room id: %s; err: %s", roomid, interceptor.ErrContextCancelled.Error())
default:
if roomid != r.roomid {
return errors.ErrWrongRoom
@@ -203,7 +203,7 @@ func (r *Room) IsRoomMarkedForHealthTracking() bool {
func (r *Room) UnMarkRoomForHealthTracking() error {
select {
case <-r.ctx.Done():
return fmt.Errorf("error while unmarking room as health tracked. room id: %s; err: %s", r.roomid, errors.ErrContextCancelled.Error())
return fmt.Errorf("error while unmarking room as health tracked. room id: %s; err: %s", r.roomid, interceptor.ErrContextCancelled.Error())
default:
if !r.IsRoomMarkedForHealthTracking() {
return fmt.Errorf("room with id %s is not health tracked", r.roomid)
@@ -214,26 +214,26 @@ func (r *Room) UnMarkRoomForHealthTracking() error {
}
}
func (r *Room) GetParticipants() []types.ClientID {
func (r *Room) GetParticipants() []interceptor.ClientID {
select {
case <-r.ctx.Done():
return make([]types.ClientID, 0) // EMPTY LIST
return make([]interceptor.ClientID, 0) // EMPTY LIST
default:
clients := make([]types.ClientID, 0)
for id, _ := range r.participants {
clients := make([]interceptor.ClientID, 0)
for id := range r.participants {
clients = append(clients, id)
}
return clients
}
}
func (r *Room) GetAllowed() []types.ClientID {
func (r *Room) GetAllowed() []interceptor.ClientID {
return r.allowed
}
func (r *Room) Close() error {
r.cancel()
r.participants = make(map[types.ClientID]*state.State)
r.allowed = make([]types.ClientID, 0)
r.participants = make(map[interceptor.ClientID]interceptor.State)
r.allowed = make([]interceptor.ClientID, 0)
return nil
}

View File

@@ -6,14 +6,13 @@ import (
"time"
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
"github.com/harshabose/socket-comm/pkg/middleware/chat/process"
)
type ServerInterceptor struct {
*commonInterceptor
Rooms interfaces.Processor
Health interfaces.Processor
Rooms interceptor.Processor
Health interceptor.Processor
}
func (i *ServerInterceptor) BindSocketConnection(connection interceptor.Connection, writer interceptor.Writer, reader interceptor.Reader) (interceptor.Writer, interceptor.Reader, error) {

View File

@@ -6,11 +6,10 @@ import (
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/message"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
)
type State struct {
id types.ClientID
id interceptor.ClientID
connection interceptor.Connection
writer interceptor.Writer
reader interceptor.Reader
@@ -20,7 +19,7 @@ type State struct {
func NewState(ctx context.Context, cancel context.CancelFunc, connection interceptor.Connection, writer interceptor.Writer, reader interceptor.Reader) *State {
return &State{
id: types.UnKnownClient,
id: interceptor.UnknownClientID,
connection: connection,
writer: writer,
reader: reader,
@@ -33,21 +32,21 @@ func (s *State) Ctx() context.Context {
return s.ctx
}
func (s *State) GetClientID() (types.ClientID, error) {
if s.id == types.UnKnownClient {
func (s *State) GetClientID() (interceptor.ClientID, error) {
if s.id == interceptor.UnknownClientID {
return s.id, errors.ErrUnknownClientIDState
}
return s.id, nil
}
func (s *State) Write(msg message.Message) error {
return s.writer.Write(s.connection, msg)
func (s *State) Write(ctx context.Context, msg message.Message) error {
return s.writer.Write(ctx, s.connection, msg)
}
func (s *State) SetClientID(id types.ClientID) error {
if s.id != types.UnKnownClient {
return errors.ErrClientIDNotConsistent
func (s *State) SetClientID(id interceptor.ClientID) error {
if s.id != interceptor.UnknownClientID {
return interceptor.ErrClientIDNotConsistent
}
s.id = id

View File

@@ -1,10 +1,5 @@
package types
type (
ClientID string
RoomID string
)
const (
UnKnownClient ClientID = "unknown"
RoomID string
)