diff --git a/pkg/middleware/chat/client_interceptor.go b/pkg/middleware/chat/client_interceptor.go index 309e8b5..1871c61 100644 --- a/pkg/middleware/chat/client_interceptor.go +++ b/pkg/middleware/chat/client_interceptor.go @@ -1,7 +1,12 @@ package chat import ( + "context" + "fmt" + "time" + "github.com/harshabose/socket-comm/pkg/interceptor" + "github.com/harshabose/socket-comm/pkg/middleware/chat/process" ) type ClientInterceptor struct { @@ -11,3 +16,20 @@ type ClientInterceptor struct { func (i *ClientInterceptor) BindSocketConnection(connection interceptor.Connection, writer interceptor.Writer, reader interceptor.Reader) (interceptor.Writer, interceptor.Reader, error) { return i.commonInterceptor.BindSocketConnection(connection, writer, reader) } + +func (i *ClientInterceptor) Init(connection interceptor.Connection) error { + s, err := i.GetState(connection) + if err != nil { + return fmt.Errorf("error while init; err: %s", err.Error()) + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + p := process.NewIdentWaiter(ctx) + if err := p.Process(nil, s); err != nil { + return fmt.Errorf("error while init; err: %s", err.Error()) + } + + return nil +} diff --git a/pkg/middleware/chat/common_interceptor.go b/pkg/middleware/chat/common_interceptor.go index 5320865..175152a 100644 --- a/pkg/middleware/chat/common_interceptor.go +++ b/pkg/middleware/chat/common_interceptor.go @@ -6,7 +6,6 @@ import ( "github.com/harshabose/socket-comm/pkg/interceptor" "github.com/harshabose/socket-comm/pkg/message" "github.com/harshabose/socket-comm/pkg/middleware/chat/errors" - "github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces" "github.com/harshabose/socket-comm/pkg/middleware/chat/state" ) @@ -14,7 +13,7 @@ type commonInterceptor struct { interceptor.NoOpInterceptor readProcessMessages message.Registry writeProcessMessages message.Registry - states interfaces.StateManager + states state.Manager } func (i *commonInterceptor) BindSocketConnection(connection interceptor.Connection, writer interceptor.Writer, reader interceptor.Reader) (interceptor.Writer, interceptor.Reader, error) { @@ -98,6 +97,6 @@ func (i *commonInterceptor) Close() error { return nil } -func (i *commonInterceptor) GetState(connection interceptor.Connection) (interfaces.State, error) { +func (i *commonInterceptor) GetState(connection interceptor.Connection) (*state.State, error) { return i.states.GetState(connection) } diff --git a/pkg/middleware/chat/interfaces/health.go b/pkg/middleware/chat/interfaces/health.go new file mode 100644 index 0000000..08badf2 --- /dev/null +++ b/pkg/middleware/chat/interfaces/health.go @@ -0,0 +1 @@ +package interfaces diff --git a/pkg/middleware/chat/interfaces/process.go b/pkg/middleware/chat/interfaces/process.go new file mode 100644 index 0000000..af25c1a --- /dev/null +++ b/pkg/middleware/chat/interfaces/process.go @@ -0,0 +1,26 @@ +package interfaces + +import "github.com/harshabose/socket-comm/pkg/middleware/chat/state" + +type CanProcess interface { + Process(CanBeProcessed, *state.State) error +} + +type CanProcessBackground interface { + ProcessBackground(CanBeProcessedBackground, *state.State) CanBeProcessedBackground +} + +type Processor interface { + CanProcess + CanProcessBackground +} + +type CanBeProcessed interface { + Process(Processor, *state.State) error +} + +type CanBeProcessedBackground interface { + ProcessBackground(Processor, *state.State) CanBeProcessedBackground + Wait() error + Stop() +} diff --git a/pkg/middleware/chat/interfaces/room.go b/pkg/middleware/chat/interfaces/room.go index adf7ac4..3c204e2 100644 --- a/pkg/middleware/chat/interfaces/room.go +++ b/pkg/middleware/chat/interfaces/room.go @@ -1,31 +1,24 @@ package interfaces import ( - "io" "time" "github.com/harshabose/socket-comm/pkg/message" + "github.com/harshabose/socket-comm/pkg/middleware/chat/room" + "github.com/harshabose/socket-comm/pkg/middleware/chat/state" "github.com/harshabose/socket-comm/pkg/middleware/chat/types" ) type CanAdd interface { - Add(types.RoomID, State) error + Add(types.RoomID, *state.State) error } type CanRemove interface { - Remove(types.RoomID, State) error -} - -type Room interface { - CanAdd - CanRemove - ID() types.RoomID - GetParticipants() []types.ClientID - io.Closer + Remove(types.RoomID, *state.State) error } type CanGetRoom interface { - GetRoom(id types.RoomID) (Room, error) + GetRoom(id types.RoomID) (*room.Room, error) } type CanWriteRoomMessage interface { @@ -33,33 +26,9 @@ type CanWriteRoomMessage interface { } type CanCreateRoom interface { - CreateRoom(types.RoomID, []types.ClientID, time.Duration) (Room, error) + CreateRoom(types.RoomID, []types.ClientID, time.Duration) (*room.Room, error) } type CanDeleteRoom interface { DeleteRoom(types.RoomID) error } - -type RoomManager interface { - CanCreateRoom - CanDeleteRoom - CanGetRoom -} - -type RoomProcessor interface { - Process(CanProcess, State) error -} - -type RoomProcessorBackground interface { - ProcessBackground(CanProcessBackground, State) CanProcessBackground -} - -type CanProcess interface { - Process(CanGetRoom, State) error -} - -type CanProcessBackground interface { - ProcessBackground(room CanGetRoom, state State) CanProcessBackground - Wait() error - Stop() -} diff --git a/pkg/middleware/chat/interfaces/state.go b/pkg/middleware/chat/interfaces/state.go index 9677dc2..f88ce50 100644 --- a/pkg/middleware/chat/interfaces/state.go +++ b/pkg/middleware/chat/interfaces/state.go @@ -1,28 +1,17 @@ package interfaces import ( - "context" - "github.com/harshabose/socket-comm/pkg/interceptor" "github.com/harshabose/socket-comm/pkg/message" - "github.com/harshabose/socket-comm/pkg/middleware/chat/types" + "github.com/harshabose/socket-comm/pkg/middleware/chat/state" ) -type CanSetClientID interface { - SetClientID(id types.ClientID) error -} - -type State interface { - GetClientID() (types.ClientID, error) - Ctx() context.Context -} - type CanGetState interface { - GetState(interceptor.Connection) (State, error) + GetState(interceptor.Connection) (*state.State, error) } type CanSetState interface { - SetState(interceptor.Connection, State) error + SetState(interceptor.Connection, *state.State) error } type CanRemoveState interface { @@ -32,10 +21,3 @@ type CanRemoveState interface { type CanWriteMessage interface { Write(message.Message) error } - -type StateManager interface { - CanGetState - CanSetState - CanRemoveState - ForEach(func(interceptor.Connection, State) error) error -} diff --git a/pkg/middleware/chat/messages/ident_messages.go b/pkg/middleware/chat/messages/ident_messages.go index 29d32b5..89c34a5 100644 --- a/pkg/middleware/chat/messages/ident_messages.go +++ b/pkg/middleware/chat/messages/ident_messages.go @@ -34,21 +34,11 @@ func (m *Ident) ReadProcess(_i interceptor.Interceptor, connection interceptor.C return fmt.Errorf("error while processing 'Ident' message; err: %s", err.Error()) } - c, ok := ss.(interfaces.CanSetClientID) - if !ok { - return fmt.Errorf("error while processing 'Ident' message; err: %s", errors.ErrInterfaceMisMatch.Error()) - } - - if err := c.SetClientID(types.ClientID(m.CurrentHeader.Sender)); err != nil { + if err := ss.SetClientID(types.ClientID(m.CurrentHeader.Sender)); err != nil { return fmt.Errorf("error while processing 'Ident' message; err: %s", err.Error()) } - w, ok := ss.(interfaces.CanWriteMessage) - if !ok { - return fmt.Errorf("error while processing 'Ident' message; err: %s", errors.ErrInterfaceMisMatch.Error()) - } - - if err := w.Write(&IdentResponse{}); err != nil { + if err := ss.Write(&IdentResponse{}); err != nil { return fmt.Errorf("error while processing 'Ident' message; err: %s", err.Error()) } @@ -76,12 +66,7 @@ func (m *IdentResponse) ReadProcess(_i interceptor.Interceptor, connection inter return fmt.Errorf("error while processing 'Ident' message; err: %s", err.Error()) } - c, ok := ss.(interfaces.CanSetClientID) - if !ok { - return fmt.Errorf("error while processing 'Ident' message; err: %s", errors.ErrInterfaceMisMatch.Error()) - } - - if err := c.SetClientID(types.ClientID(m.CurrentHeader.Sender)); err != nil { + if err := ss.SetClientID(types.ClientID(m.CurrentHeader.Sender)); err != nil { return fmt.Errorf("error while processing 'Ident' message; err: %s", err.Error()) } diff --git a/pkg/middleware/chat/messages/request_health_stream.go b/pkg/middleware/chat/messages/request_health_stream.go index 9633e33..e1f1223 100644 --- a/pkg/middleware/chat/messages/request_health_stream.go +++ b/pkg/middleware/chat/messages/request_health_stream.go @@ -68,12 +68,7 @@ func (m *RequestHealth) ReadProcess(_i interceptor.Interceptor, connection inter return fmt.Errorf("error while read processing 'RequestHealth' msg; err: %s", err.Error()) } - w, ok := ss.(interfaces.CanWriteMessage) - if !ok { - return errors.ErrInterfaceMisMatch - } - - if err := w.Write(msg); err != nil { + if err := ss.Write(msg); err != nil { return fmt.Errorf("error while read processing 'RequestHealth' msg; err: %s", err.Error()) } diff --git a/pkg/middleware/chat/messages/room_messages.go b/pkg/middleware/chat/messages/room_messages.go index 96ba7c4..055a3e8 100644 --- a/pkg/middleware/chat/messages/room_messages.go +++ b/pkg/middleware/chat/messages/room_messages.go @@ -6,8 +6,10 @@ import ( "github.com/harshabose/socket-comm/pkg/interceptor" "github.com/harshabose/socket-comm/pkg/message" + "github.com/harshabose/socket-comm/pkg/middleware/chat" "github.com/harshabose/socket-comm/pkg/middleware/chat/errors" "github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces" + "github.com/harshabose/socket-comm/pkg/middleware/chat/state" "github.com/harshabose/socket-comm/pkg/middleware/chat/types" ) @@ -32,31 +34,31 @@ func (m *CreateRoom) GetProtocol() message.Protocol { } func (m *CreateRoom) ReadProcess(_i interceptor.Interceptor, connection interceptor.Connection) error { - s, ok := _i.(interfaces.CanGetState) + i, ok := _i.(*chat.ServerInterceptor) if !ok { return errors.ErrInterfaceMisMatch } - i, ok := _i.(interfaces.CanCreateRoom) + s, err := i.GetState(connection) + if err != nil { + return err + } + + return i.Rooms.Process(m, s) +} + +func (m *CreateRoom) Process(p interfaces.Processor, s *state.State) error { + r, ok := p.(interfaces.CanCreateRoom) if !ok { return errors.ErrInterfaceMisMatch } - ss, err := s.GetState(connection) + room, err := r.CreateRoom(m.RoomID, m.Allowed, m.TTL) if err != nil { - return fmt.Errorf("error while read processing 'CreateRoom' msg; err: %s", err.Error()) + return err } - room, err := i.CreateRoom(m.RoomID, m.Allowed, m.TTL) - if err != nil { - return fmt.Errorf("error while read processing 'CreateRoom' msg; err: %s", err.Error()) - } - - if err := room.Add(m.RoomID, ss); err != nil { - return fmt.Errorf("error while read processing 'CreateRoom' msg; err: %s", err.Error()) - } - - return nil + return room.Add(m.RoomID, s) } type DeleteRoom struct { @@ -68,24 +70,28 @@ func (m *DeleteRoom) GetProtocol() message.Protocol { return DeleteRoomProtocol } -func (m *DeleteRoom) ReadProcess(i interceptor.Interceptor, _ interceptor.Connection) error { - d, ok := i.(interfaces.CanDeleteRoom) +func (m *DeleteRoom) ReadProcess(_i interceptor.Interceptor, _ interceptor.Connection) error { + i, ok := _i.(*chat.ServerInterceptor) if !ok { return errors.ErrInterfaceMisMatch } - if err := d.DeleteRoom(m.RoomID); err != nil { - return fmt.Errorf("error while read processing 'DeleteRoom' msg; err: %s", errors.ErrMessageForServerOnly) + return i.Rooms.Process(m, nil) +} + +func (m *DeleteRoom) Process(p interfaces.Processor, _ *state.State) error { + r, ok := p.(interfaces.CanDeleteRoom) + if !ok { + return errors.ErrInterfaceMisMatch } - return nil + return r.DeleteRoom(m.RoomID) } type ToForwardMessage struct { interceptor.BaseMessage - RoomID types.RoomID `json:"room_id"` - From types.ClientID `json:"from"` - To types.ClientID `json:"to"` + RoomID types.RoomID `json:"room_id"` + To []types.ClientID `json:"to"` } func (m *ToForwardMessage) GetProtocol() message.Protocol { @@ -93,37 +99,40 @@ func (m *ToForwardMessage) GetProtocol() message.Protocol { } func (m *ToForwardMessage) ReadProcess(_i interceptor.Interceptor, connection interceptor.Connection) error { + i, ok := _i.(*chat.ServerInterceptor) + if !ok { + return errors.ErrInterfaceMisMatch + } + s, err := i.GetState(connection) + if err != nil { + return err + } + + return i.Rooms.Process(m, s) +} + +func (m *ToForwardMessage) Process(p interfaces.Processor, s *state.State) error { msg, err := newForwardedMessage(m) if err != nil { - return fmt.Errorf("error while read processing 'ToForwardMessage'; err: %s", err.Error()) + return err } - s, ok := _i.(interfaces.CanGetState) - if !ok { - return fmt.Errorf("error while read processing 'ToForwardMessage'; err: %s", errors.ErrInterfaceMisMatch) - } - - ss, err := s.GetState(connection) + clientID, err := s.GetClientID() if err != nil { - return fmt.Errorf("error while read processing 'ToForwardMessage'; err: %s", errors.ErrInterfaceMisMatch) + return err } - clientID, err := ss.GetClientID() - if err != nil { - return fmt.Errorf("error while read processing 'ToForwardMessage'; err: %s", errors.ErrInterfaceMisMatch) - } - - if m.From != clientID { + if types.ClientID(m.CurrentHeader.Sender) != clientID { return fmt.Errorf("error while read processing 'ToForwardMessage'; From and ClientID did not match") } - w, ok := _i.(interfaces.CanWriteRoomMessage) + w, ok := p.(interfaces.CanWriteRoomMessage) if !ok { return errors.ErrInterfaceMisMatch } - if err := w.WriteRoomMessage(m.RoomID, msg, m.From, m.To); err != nil { - return fmt.Errorf("error while read processing 'ToForwardMessage' msg; err: %s", err.Error()) + if err := w.WriteRoomMessage(m.RoomID, msg, clientID, m.To...); err != nil { + return err } return nil @@ -160,17 +169,21 @@ func (m *JoinRoom) GetProtocol() message.Protocol { } func (m *JoinRoom) ReadProcess(_i interceptor.Interceptor, connection interceptor.Connection) error { - s, ok := _i.(interfaces.CanGetState) + i, ok := _i.(*chat.ServerInterceptor) if !ok { return errors.ErrInterfaceMisMatch } - ss, err := s.GetState(connection) + s, err := i.GetState(connection) if err != nil { - return fmt.Errorf("error while read processing 'JoinRoom' msg; err: %s", err.Error()) + return err } - a, ok := _i.(interfaces.CanAdd) + return i.Rooms.Process(m, s) +} + +func (m *JoinRoom) Process(p interfaces.Processor, s *state.State) error { + a, ok := p.(interfaces.CanAdd) if !ok { return errors.ErrInterfaceMisMatch } @@ -181,9 +194,9 @@ func (m *JoinRoom) ReadProcess(_i interceptor.Interceptor, connection intercepto for { select { case <-timer.C: - return fmt.Errorf("error while read processing 'JoinRoom' msg; err: %s", errors.ErrMessageForServerOnly) + return fmt.Errorf("error while read processing 'JoinRoom' msg; err: %s", errors.ErrContextCancelled) default: - err := a.Add(m.RoomID, ss) + err := a.Add(m.RoomID, s) if err == nil { return nil } @@ -202,24 +215,24 @@ func (m *LeaveRoom) GetProtocol() message.Protocol { } func (m *LeaveRoom) ReadProcess(_i interceptor.Interceptor, connection interceptor.Connection) error { - s, ok := _i.(interfaces.CanGetState) + i, ok := _i.(*chat.ServerInterceptor) if !ok { return errors.ErrInterfaceMisMatch } - ss, err := s.GetState(connection) + s, err := i.GetState(connection) if err != nil { - return fmt.Errorf("error while read processing 'JoinRoom' msg; err: %s", err.Error()) + return err } - a, ok := _i.(interfaces.CanRemove) + return i.Rooms.Process(m, s) +} + +func (m *LeaveRoom) Process(p interfaces.Processor, s *state.State) error { + r, ok := p.(interfaces.CanRemove) if !ok { return errors.ErrInterfaceMisMatch } - if err := a.Remove(m.RoomID, ss); err != nil { - return fmt.Errorf("error while read processing 'JoinRoom' msg; err: %s", err.Error()) - } - - return nil + return r.Remove(m.RoomID, s) } diff --git a/pkg/middleware/chat/process/delete_room_waiter.go b/pkg/middleware/chat/process/delete_room_waiter.go index 3760d19..24ccf85 100644 --- a/pkg/middleware/chat/process/delete_room_waiter.go +++ b/pkg/middleware/chat/process/delete_room_waiter.go @@ -6,7 +6,9 @@ import ( "sync" "time" + "github.com/harshabose/socket-comm/pkg/middleware/chat/errors" "github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces" + "github.com/harshabose/socket-comm/pkg/middleware/chat/state" "github.com/harshabose/socket-comm/pkg/middleware/chat/types" ) @@ -33,7 +35,7 @@ func NewDeleteRoomWaiter(ctx context.Context, cancel context.CancelFunc, deleter } } -func (p *DeleteRoomWaiter) Process(r interfaces.CanGetRoom, _ interfaces.State) error { +func (p *DeleteRoomWaiter) Process(r interfaces.Processor, _ *state.State) error { timer := time.NewTimer(p.ttl) defer timer.Stop() @@ -49,7 +51,7 @@ func (p *DeleteRoomWaiter) Process(r interfaces.CanGetRoom, _ interfaces.State) } } -func (p *DeleteRoomWaiter) ProcessBackground(r interfaces.CanGetRoom, s interfaces.State) interfaces.CanProcessBackground { +func (p *DeleteRoomWaiter) ProcessBackground(r interfaces.Processor, s *state.State) interfaces.CanBeProcessedBackground { go func() { if err := p.Process(r, s); err != nil { p.mux.Lock() @@ -76,7 +78,12 @@ func (p *DeleteRoomWaiter) Stop() { p.cancel() } -func (p *DeleteRoomWaiter) process(r interfaces.CanGetRoom) error { +func (p *DeleteRoomWaiter) process(_r interfaces.Processor) error { + r, ok := _r.(interfaces.CanGetRoom) + if !ok { + return errors.ErrInterfaceMisMatch + } + room, err := r.GetRoom(p.roomid) if err != nil { return fmt.Errorf("error while processing DelteRoomWaiter process; err: %s", err.Error()) diff --git a/pkg/middleware/chat/process/ident_starter.go b/pkg/middleware/chat/process/ident_starter.go new file mode 100644 index 0000000..17d1ed4 --- /dev/null +++ b/pkg/middleware/chat/process/ident_starter.go @@ -0,0 +1,33 @@ +package process + +import ( + "context" + "fmt" + + "github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces" + "github.com/harshabose/socket-comm/pkg/middleware/chat/state" +) + +type IdentInit struct { + ctx context.Context +} + +func NewIdentInit(ctx context.Context) *IdentInit { + return &IdentInit{ + ctx: ctx, + } +} + +func (p *IdentInit) Process(r interfaces.Processor, s *state.State) error { + // TODO: SEND IDENT MESSAGE + if err := s.Write(); err != nil { + return err + } + + waiter := NewIdentWaiter(p.ctx) + if err := waiter.Process(r, s); err != nil { + return fmt.Errorf("error while processing IdentInit process; err: %s", err.Error()) + } + + return nil +} diff --git a/pkg/middleware/chat/process/ident_waiter.go b/pkg/middleware/chat/process/ident_waiter.go new file mode 100644 index 0000000..5ad5c26 --- /dev/null +++ b/pkg/middleware/chat/process/ident_waiter.go @@ -0,0 +1,53 @@ +package process + +import ( + "context" + "fmt" + "time" + + "github.com/harshabose/socket-comm/pkg/middleware/chat/errors" + "github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces" + "github.com/harshabose/socket-comm/pkg/middleware/chat/state" +) + +type IdentWaiterOptions func(*IdentWaiter) + +func NewIdentWaiter(ctx context.Context, options ...IdentWaiterOptions) *IdentWaiter { + p := &IdentWaiter{ + ctx: ctx, + duration: 500 * time.Millisecond, + } + + for _, option := range options { + option(p) + } + + return p +} + +type IdentWaiter struct { + ctx context.Context + duration time.Duration +} + +func (p *IdentWaiter) Process(_ interfaces.Processor, s *state.State) error { + ticker := time.NewTicker(p.duration) + defer ticker.Stop() + + for { + select { + case <-p.ctx.Done(): + return errors.ErrContextCancelled + case <-ticker.C: + if err := p.process(nil, s); err == nil { + return nil + } + fmt.Println("waiting for ident...") + } + } +} + +func (p *IdentWaiter) process(_ interfaces.Processor, s *state.State) error { + _, err := s.GetClientID() + return err +} diff --git a/pkg/middleware/chat/process/send_message_room.go b/pkg/middleware/chat/process/send_message_room.go index 96c6782..8faf2c8 100644 --- a/pkg/middleware/chat/process/send_message_room.go +++ b/pkg/middleware/chat/process/send_message_room.go @@ -10,6 +10,7 @@ import ( "github.com/harshabose/socket-comm/pkg/message" "github.com/harshabose/socket-comm/pkg/middleware/chat/errors" "github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces" + "github.com/harshabose/socket-comm/pkg/middleware/chat/state" "github.com/harshabose/socket-comm/pkg/middleware/chat/types" ) @@ -35,7 +36,7 @@ func NewSendMessageRoom(ctx context.Context, cancel context.CancelFunc, msgFacto } } -func (p *SendMessageRoom) Process(r interfaces.CanGetRoom, s interfaces.State) error { +func (p *SendMessageRoom) Process(r interfaces.Processor, s *state.State) error { ticker := time.NewTicker(p.interval) defer ticker.Stop() @@ -51,9 +52,9 @@ func (p *SendMessageRoom) Process(r interfaces.CanGetRoom, s interfaces.State) e } } -func (p *SendMessageRoom) ProcessBackground(r interfaces.CanGetRoom, s interfaces.State) interfaces.CanProcessBackground { +func (p *SendMessageRoom) ProcessBackground(_r interfaces.Processor, s *state.State) interfaces.CanBeProcessedBackground { go func() { - if err := p.Process(r, s); err != nil { + if err := p.Process(_r, s); err != nil { p.mux.Lock() p.err = err p.mux.Unlock() @@ -78,7 +79,12 @@ func (p *SendMessageRoom) Stop() { p.cancel() } -func (p *SendMessageRoom) process(r interfaces.CanGetRoom, _ interfaces.State) error { +func (p *SendMessageRoom) process(_r interfaces.Processor, _ *state.State) error { + r, ok := _r.(interfaces.CanGetRoom) + if !ok { + return errors.ErrInterfaceMisMatch + } + room, err := r.GetRoom(p.roomid) if err != nil { return err @@ -86,11 +92,6 @@ func (p *SendMessageRoom) process(r interfaces.CanGetRoom, _ interfaces.State) e participants := room.GetParticipants() - w, ok := room.(interfaces.CanWriteRoomMessage) - if !ok { - return errors.ErrInterfaceMisMatch - } - merr := util.NewMultiError() for _, participant := range participants { @@ -100,7 +101,7 @@ func (p *SendMessageRoom) process(r interfaces.CanGetRoom, _ interfaces.State) e continue } - if err := w.WriteRoomMessage(p.roomid, msg, "", participant); err != nil { + if err := room.WriteRoomMessage(p.roomid, msg, "", participant); err != nil { merr.Add(err) } } diff --git a/pkg/middleware/chat/process/wait_until_ident.go b/pkg/middleware/chat/process/wait_until_ident.go deleted file mode 100644 index 220f99e..0000000 --- a/pkg/middleware/chat/process/wait_until_ident.go +++ /dev/null @@ -1,57 +0,0 @@ -package process - -import ( - "context" - "fmt" - "time" - - "github.com/harshabose/socket-comm/pkg/middleware/chat/errors" - "github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces" -) - -type WaitUntilIdent struct { - ctx context.Context - duration time.Duration -} - -type WaitUntilIdentOption func(*WaitUntilIdent) - -func WithTickerDuration(duration time.Duration) WaitUntilIdentOption { - return WaitUntilIdentOption(func(ident *WaitUntilIdent) { - ident.duration = duration - }) -} - -func NewWaitUntilIdentComplete(ctx context.Context, options ...WaitUntilIdentOption) *WaitUntilIdent { - i := &WaitUntilIdent{ - ctx: ctx, - duration: 500 * time.Millisecond, - } - - for _, option := range options { - option(i) - } - - return i -} - -func (p WaitUntilIdent) Process(_ interfaces.CanGetRoom, s interfaces.State) error { - ticker := time.NewTicker(p.duration) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - if p.process(s) { - return nil - } - case <-p.ctx.Done(): - return fmt.Errorf("error while processing WaitUntilIdent process; err: %s", errors.ErrContextCancelled.Error()) - } - } -} - -func (p WaitUntilIdent) process(s interfaces.State) bool { - _, err := s.GetClientID() - return err == nil -} diff --git a/pkg/middleware/chat/processors/health.go b/pkg/middleware/chat/processors/health.go new file mode 100644 index 0000000..ee7a394 --- /dev/null +++ b/pkg/middleware/chat/processors/health.go @@ -0,0 +1,21 @@ +package processors + +import ( + "github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces" + "github.com/harshabose/socket-comm/pkg/middleware/chat/state" +) + +type Health struct { +} + +func (p *Health) Process(process interfaces.CanBeProcessed, state *state.State) error { + return process.Process(p, state) +} + +func (p *Health) ProcessBackground(process interfaces.CanBeProcessedBackground, state *state.State) interfaces.CanBeProcessedBackground { + return process.ProcessBackground(p, state) +} + +func (p *Health) GetSnapshot() { + +} diff --git a/pkg/middleware/chat/processors/room.go b/pkg/middleware/chat/processors/room.go new file mode 100644 index 0000000..1fa8169 --- /dev/null +++ b/pkg/middleware/chat/processors/room.go @@ -0,0 +1,106 @@ +package processors + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/harshabose/socket-comm/pkg/message" + "github.com/harshabose/socket-comm/pkg/middleware/chat/errors" + "github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces" + "github.com/harshabose/socket-comm/pkg/middleware/chat/messages" + "github.com/harshabose/socket-comm/pkg/middleware/chat/process" + "github.com/harshabose/socket-comm/pkg/middleware/chat/room" + "github.com/harshabose/socket-comm/pkg/middleware/chat/state" + "github.com/harshabose/socket-comm/pkg/middleware/chat/types" +) + +type RoomManager struct { + rooms map[types.RoomID]*room.Room + mux sync.RWMutex + ctx context.Context +} + +// TODO: DO I NEED MUX HERE? + +func (m *RoomManager) Add(id types.RoomID, s *state.State) error { + r, err := m.GetRoom(id) + if err != nil { + return err + } + + return r.Add(id, s) +} + +func (m *RoomManager) Remove(id types.RoomID, s *state.State) error { + r, err := m.GetRoom(id) + if err != nil { + return err + } + + return r.Remove(id, s) +} + +func (m *RoomManager) CreateRoom(id types.RoomID, allowed []types.ClientID, ttl time.Duration) (*room.Room, error) { + if m.exists(id) { + return nil, fmt.Errorf("error while creating r with id %s; err: %s", id, errors.ErrRoomAlreadyExists) + } + + ctx, cancel := context.WithTimeout(m.ctx, ttl) + r := room.NewRoom(ctx, cancel, id, allowed) + + m.rooms[id] = r + + // NOTE: THE FOLLOWING STARTS A BACKGROUND PROCESS WHICH WAITS UNTIL TTL AND KILLS THE ROOM. THIS DOES NOT KILL CONNECTION + _ = m.ProcessBackground(process.NewDeleteRoomWaiter(ctx, cancel, m, id, ttl), nil) + + // NOTE: THE FOLLOWING STARTS A BACKGROUND PROCESS WHICH CONSTANTLY SENDS THE GIVEN MESSAGE TO EVERY PARTICIPANT IN THE ROOM + _ = m.ProcessBackground(process.NewSendMessageRoom(ctx, cancel, messages.NewRequestHealthFactory(id), id, 5*time.Second), nil) + return r, nil +} + +func (m *RoomManager) GetRoom(id types.RoomID) (*room.Room, error) { + exists := m.exists(id) + if !exists { + return nil, fmt.Errorf("error while getting room with id %s; err: %s", id, errors.ErrRoomNotFound) + } + + return m.rooms[id], nil +} + +func (m *RoomManager) exists(id types.RoomID) bool { + _, exists := m.rooms[id] + return exists +} + +func (m *RoomManager) DeleteRoom(id types.RoomID) error { + r, err := m.GetRoom(id) + if err != nil { + return fmt.Errorf("error while deleting r with id: %s; err: %s", id, err.Error()) + } + + if err := r.Close(); err != nil { + return fmt.Errorf("error while deleting r with id: %s; err: %s", id, err.Error()) + } + + delete(m.rooms, id) + return nil +} + +func (m *RoomManager) WriteRoomMessage(roomid types.RoomID, msg message.Message, from types.ClientID, tos ...types.ClientID) error { + r, err := m.GetRoom(roomid) + if err != nil { + return err + } + + return r.WriteRoomMessage(roomid, msg, from, tos...) +} + +func (m *RoomManager) Process(process interfaces.CanBeProcessed, state *state.State) error { + return process.Process(m, state) +} + +func (m *RoomManager) ProcessBackground(process interfaces.CanBeProcessedBackground, state *state.State) interfaces.CanBeProcessedBackground { + return process.ProcessBackground(m, state) +} diff --git a/pkg/middleware/chat/room/manager.go b/pkg/middleware/chat/room/manager.go deleted file mode 100644 index b4e0a0f..0000000 --- a/pkg/middleware/chat/room/manager.go +++ /dev/null @@ -1,105 +0,0 @@ -package room - -import ( - "context" - "fmt" - "time" - - "github.com/harshabose/socket-comm/pkg/message" - "github.com/harshabose/socket-comm/pkg/middleware/chat/errors" - "github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces" - "github.com/harshabose/socket-comm/pkg/middleware/chat/messages" - "github.com/harshabose/socket-comm/pkg/middleware/chat/process" - "github.com/harshabose/socket-comm/pkg/middleware/chat/types" -) - -type Manager struct { - rooms map[types.RoomID]interfaces.Room - ctx context.Context -} - -func (m *Manager) Add(id types.RoomID, s interfaces.State) error { - room, err := m.GetRoom(id) - if err != nil { - return err - } - - return room.Add(id, s) -} - -func (m *Manager) Remove(id types.RoomID, s interfaces.State) error { - room, err := m.GetRoom(id) - if err != nil { - return err - } - - return room.Remove(id, s) -} - -func (m *Manager) CreateRoom(id types.RoomID, allowed []types.ClientID, ttl time.Duration) (interfaces.Room, error) { - if m.exists(id) { - return nil, fmt.Errorf("error while creating room with id %s; err: %s", id, errors.ErrRoomAlreadyExists) - } - - ctx, cancel := context.WithTimeout(m.ctx, ttl) - room := NewRoom(ctx, cancel, id, allowed) - - m.rooms[id] = room - - // NOTE: THE FOLLOWING STARTS A BACKGROUND PROCESS WHICH WAITS UNTIL TTL AND KILLS THE ROOM. THIS DOES NOT KILL CONNECTION - _ = m.ProcessBackground(process.NewDeleteRoomWaiter(ctx, cancel, m, id, ttl), nil) - - // NOTE: THE FOLLOWING STARTS A BACKGROUND PROCESS WHICH CONSTANTLY SENDS THE GIVEN MESSAGE TO EVERY PARTICIPANT IN THE ROOM - _ = m.ProcessBackground(process.NewSendMessageRoom(ctx, cancel, messages.NewRequestHealthFactory(id), id, 5*time.Second), nil) - return room, nil -} - -func (m *Manager) GetRoom(id types.RoomID) (interfaces.Room, error) { - exists := m.exists(id) - if !exists { - return nil, fmt.Errorf("error while getting room with id %s; err: %s", id, errors.ErrRoomNotFound) - } - - return m.rooms[id], nil -} - -func (m *Manager) exists(id types.RoomID) bool { - _, exists := m.rooms[id] - return exists -} - -func (m *Manager) DeleteRoom(id types.RoomID) error { - room, err := m.GetRoom(id) - if err != nil { - return fmt.Errorf("error while deleting room with id: %s; err: %s", id, err.Error()) - } - - if err := room.Close(); err != nil { - return fmt.Errorf("error while deleting room with id: %s; err: %s", id, err.Error()) - } - - delete(m.rooms, id) - return nil -} - -func (m *Manager) WriteRoomMessage(roomid types.RoomID, msg message.Message, from types.ClientID, tos ...types.ClientID) error { - room, err := m.GetRoom(roomid) - if err != nil { - return err - } - - w, ok := room.(interfaces.CanWriteRoomMessage) - if !ok { - return errors.ErrInterfaceMisMatch - } - - return w.WriteRoomMessage(roomid, msg, from, tos...) -} - -func (m *Manager) Process(process interfaces.CanProcess, state interfaces.State) error { - return process.Process(m, state) -} - -func (m *Manager) ProcessBackground(process interfaces.CanProcessBackground, state interfaces.State) interfaces.CanProcessBackground { - return process.ProcessBackground(m, state) -} diff --git a/pkg/middleware/chat/room/room.go b/pkg/middleware/chat/room/room.go index 6bb77d4..fc4de9d 100644 --- a/pkg/middleware/chat/room/room.go +++ b/pkg/middleware/chat/room/room.go @@ -6,7 +6,7 @@ import ( "github.com/harshabose/socket-comm/pkg/message" "github.com/harshabose/socket-comm/pkg/middleware/chat/errors" - "github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces" + "github.com/harshabose/socket-comm/pkg/middleware/chat/state" "github.com/harshabose/socket-comm/pkg/middleware/chat/types" ) @@ -14,18 +14,20 @@ type Room struct { // NOTE: MAYBE A CONFIG FOR ROOM? roomid types.RoomID allowed []types.ClientID - participants map[types.ClientID]interfaces.State + participants map[types.ClientID]*state.State cancel context.CancelFunc ctx context.Context } +// TODO: DO I NEED MUX HERE? + func NewRoom(ctx context.Context, cancel context.CancelFunc, id types.RoomID, allowed []types.ClientID) *Room { return &Room{ ctx: ctx, cancel: cancel, roomid: id, allowed: allowed, - participants: make(map[types.ClientID]interfaces.State), + participants: make(map[types.ClientID]*state.State), } } @@ -33,7 +35,7 @@ func (r *Room) ID() types.RoomID { return r.roomid } -func (r *Room) Add(roomid types.RoomID, s interfaces.State) error { +func (r *Room) Add(roomid types.RoomID, s *state.State) error { if roomid != r.roomid { return errors.ErrWrongRoom } @@ -93,7 +95,7 @@ func (r *Room) forEachBoolean(f func(id types.ClientID) bool, ids ...types.Clien return true } -func (r *Room) Remove(roomid types.RoomID, s interfaces.State) error { +func (r *Room) Remove(roomid types.RoomID, s *state.State) error { if roomid != r.roomid { return errors.ErrWrongRoom } @@ -152,12 +154,7 @@ func (r *Room) WriteRoomMessage(roomid types.RoomID, msg message.Message, from t } for _, to := range tos { - s, ok := (r.participants[to]).(interfaces.CanWriteMessage) - if !ok { - return errors.ErrInterfaceMisMatch - } - - if err := s.Write(msg); err != nil { + if err := r.participants[to].Write(msg); err != nil { return fmt.Errorf("error while sending message to peer in room; err: %s", err.Error()) } } @@ -181,7 +178,7 @@ func (r *Room) GetParticipants() []types.ClientID { func (r *Room) Close() error { r.cancel() - r.participants = make(map[types.ClientID]interfaces.State) + r.participants = make(map[types.ClientID]*state.State) r.allowed = make([]types.ClientID, 0) return nil } diff --git a/pkg/middleware/chat/server_interceptor.go b/pkg/middleware/chat/server_interceptor.go index c36f310..f9279e7 100644 --- a/pkg/middleware/chat/server_interceptor.go +++ b/pkg/middleware/chat/server_interceptor.go @@ -6,17 +6,14 @@ import ( "time" "github.com/harshabose/socket-comm/pkg/interceptor" - "github.com/harshabose/socket-comm/pkg/message" - "github.com/harshabose/socket-comm/pkg/middleware/chat/errors" "github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces" "github.com/harshabose/socket-comm/pkg/middleware/chat/process" - "github.com/harshabose/socket-comm/pkg/middleware/chat/types" ) type ServerInterceptor struct { commonInterceptor - interceptor.Interceptor - rooms interfaces.RoomManager + Rooms interfaces.Processor + Health interfaces.Processor } func (i *ServerInterceptor) BindSocketConnection(connection interceptor.Connection, writer interceptor.Writer, reader interceptor.Reader) (interceptor.Writer, interceptor.Reader, error) { @@ -29,21 +26,11 @@ func (i *ServerInterceptor) Init(connection interceptor.Connection) error { return fmt.Errorf("error while init; err: %s", err.Error()) } - w, ok := s.(interfaces.CanWriteMessage) - if !ok { - return errors.ErrInterfaceMisMatch - } - - // TODO: SEND IDENT MESSAGE - if err := w.Write(); err != nil { - return fmt.Errorf("error while init; err: %s", err.Error()) - } - - ctx, cancel := context.WithTimeout(s.Ctx(), 10*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - p := process.NewWaitUntilIdentComplete(ctx, process.WithTickerDuration(500*time.Millisecond)) - if err := p.Process(nil, s); err != nil { + p := process.NewIdentInit(ctx) + if err := p.Process(i.Rooms, s); err != nil { return fmt.Errorf("error while init; err: %s", err.Error()) } @@ -57,38 +44,3 @@ func (i *ServerInterceptor) UnBindSocketConnection(connection interceptor.Connec func (i *ServerInterceptor) Close() error { return nil } - -func (i *ServerInterceptor) CreateRoom(id types.RoomID, allowed []types.ClientID, ttl time.Duration) (interfaces.Room, error) { - return i.rooms.CreateRoom(id, allowed, ttl) -} - -func (i *ServerInterceptor) DeleteRoom(id types.RoomID) error { - return i.rooms.DeleteRoom(id) -} - -func (i *ServerInterceptor) WriteRoomMessage(roomid types.RoomID, msg message.Message, from types.ClientID, tos ...types.ClientID) error { - w, ok := i.rooms.(interfaces.CanWriteRoomMessage) - if !ok { - return errors.ErrInterfaceMisMatch - } - - return w.WriteRoomMessage(roomid, msg, from, tos...) -} - -func (i *ServerInterceptor) Add(id types.RoomID, s interfaces.State) error { - a, ok := i.rooms.(interfaces.CanAdd) - if !ok { - return errors.ErrInterfaceMisMatch - } - - return a.Add(id, s) -} - -func (i *ServerInterceptor) Remove(id types.RoomID, s interfaces.State) error { - r, ok := i.rooms.(interfaces.CanRemove) - if !ok { - return errors.ErrInterfaceMisMatch - } - - return r.Remove(id, s) -} diff --git a/pkg/middleware/chat/state/manager.go b/pkg/middleware/chat/state/manager.go index 5b4c884..b83ec2f 100644 --- a/pkg/middleware/chat/state/manager.go +++ b/pkg/middleware/chat/state/manager.go @@ -6,27 +6,26 @@ import ( "github.com/harshabose/socket-comm/internal/util" "github.com/harshabose/socket-comm/pkg/interceptor" "github.com/harshabose/socket-comm/pkg/middleware/chat/errors" - "github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces" ) type Manager struct { - states map[interceptor.Connection]interfaces.State + states map[interceptor.Connection]*State mux sync.RWMutex } -func (m *Manager) GetState(connection interceptor.Connection) (interfaces.State, error) { +func (m *Manager) GetState(connection interceptor.Connection) (*State, error) { m.mux.RLock() defer m.mux.RUnlock() - state, exists := m.states[connection] + s, exists := m.states[connection] if !exists { return nil, errors.ErrConnectionNotFound } - return state, nil + return s, nil } -func (m *Manager) SetState(connection interceptor.Connection, s interfaces.State) error { +func (m *Manager) SetState(connection interceptor.Connection, s *State) error { m.mux.Lock() defer m.mux.Unlock() @@ -38,7 +37,7 @@ func (m *Manager) SetState(connection interceptor.Connection, s interfaces.State return nil } -// RemoveState removes a connection's state +// RemoveState removes a connection's State func (m *Manager) RemoveState(connection interceptor.Connection) error { m.mux.Lock() defer m.mux.Unlock() @@ -52,14 +51,14 @@ func (m *Manager) RemoveState(connection interceptor.Connection) error { return nil } -// ForEach executes the provided function for each state in the manager -func (m *Manager) ForEach(fn func(connection interceptor.Connection, state interfaces.State) error) error { +// ForEach executes the provided function for each State in the manager +func (m *Manager) ForEach(fn func(connection interceptor.Connection, state *State) error) error { m.mux.RLock() defer m.mux.RUnlock() var errs util.MultiError - for conn, state := range m.states { - if err := fn(conn, state); err != nil { + for conn, s := range m.states { + if err := fn(conn, s); err != nil { errs.Add(err) } } diff --git a/pkg/middleware/chat/state/state.go b/pkg/middleware/chat/state/state.go index 10cc453..7568977 100644 --- a/pkg/middleware/chat/state/state.go +++ b/pkg/middleware/chat/state/state.go @@ -6,11 +6,10 @@ import ( "github.com/harshabose/socket-comm/pkg/interceptor" "github.com/harshabose/socket-comm/pkg/message" "github.com/harshabose/socket-comm/pkg/middleware/chat/errors" - "github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces" "github.com/harshabose/socket-comm/pkg/middleware/chat/types" ) -type state struct { +type State struct { id types.ClientID connection interceptor.Connection writer interceptor.Writer @@ -19,8 +18,8 @@ type state struct { ctx context.Context } -func NewState(ctx context.Context, cancel context.CancelFunc, connection interceptor.Connection, writer interceptor.Writer, reader interceptor.Reader) interfaces.State { - return &state{ +func NewState(ctx context.Context, cancel context.CancelFunc, connection interceptor.Connection, writer interceptor.Writer, reader interceptor.Reader) *State { + return &State{ id: types.UnKnownClient, connection: connection, writer: writer, @@ -30,11 +29,11 @@ func NewState(ctx context.Context, cancel context.CancelFunc, connection interce } } -func (s *state) Ctx() context.Context { +func (s *State) Ctx() context.Context { return s.ctx } -func (s *state) GetClientID() (types.ClientID, error) { +func (s *State) GetClientID() (types.ClientID, error) { if s.id == types.UnKnownClient { return s.id, errors.ErrUnknownClientIDState } @@ -42,11 +41,11 @@ func (s *state) GetClientID() (types.ClientID, error) { return s.id, nil } -func (s *state) Write(msg message.Message) error { +func (s *State) Write(msg message.Message) error { return s.writer.Write(s.connection, msg) } -func (s *state) SetClientID(id types.ClientID) error { +func (s *State) SetClientID(id types.ClientID) error { if s.id != types.UnKnownClient { return errors.ErrClientIDNotConsistent }