diff --git a/pkg/message/message.go b/pkg/message/message.go index d71a6c1..615210f 100644 --- a/pkg/message/message.go +++ b/pkg/message/message.go @@ -66,6 +66,8 @@ type Message interface { GetNextProtocol() Protocol + GetCurrentHeader() Header + // GetNext retrieves the next message in the chain, if one exists // Returns nil, nil if there is no next message GetNext(Registry) (Message, error) @@ -127,6 +129,10 @@ func (m *BaseMessage) GetNextProtocol() Protocol { return m.NextProtocol } +func (m *BaseMessage) GetCurrentHeader() Header { + return m.CurrentHeader +} + // GetNext retrieves the next message in the chain, if one exists. // Returns nil, nil if NextProtocol is NoneProtocol. // Uses the provided Registry to create and unmarshal the next message. diff --git a/pkg/middleware/chat/client_interceptor.go b/pkg/middleware/chat/client_interceptor.go new file mode 100644 index 0000000..309e8b5 --- /dev/null +++ b/pkg/middleware/chat/client_interceptor.go @@ -0,0 +1,13 @@ +package chat + +import ( + "github.com/harshabose/socket-comm/pkg/interceptor" +) + +type ClientInterceptor struct { + commonInterceptor +} + +func (i *ClientInterceptor) BindSocketConnection(connection interceptor.Connection, writer interceptor.Writer, reader interceptor.Reader) (interceptor.Writer, interceptor.Reader, error) { + return i.commonInterceptor.BindSocketConnection(connection, writer, reader) +} diff --git a/pkg/middleware/chat/common_interceptor.go b/pkg/middleware/chat/common_interceptor.go new file mode 100644 index 0000000..5320865 --- /dev/null +++ b/pkg/middleware/chat/common_interceptor.go @@ -0,0 +1,103 @@ +package chat + +import ( + "context" + + "github.com/harshabose/socket-comm/pkg/interceptor" + "github.com/harshabose/socket-comm/pkg/message" + "github.com/harshabose/socket-comm/pkg/middleware/chat/errors" + "github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces" + "github.com/harshabose/socket-comm/pkg/middleware/chat/state" +) + +type commonInterceptor struct { + interceptor.NoOpInterceptor + readProcessMessages message.Registry + writeProcessMessages message.Registry + states interfaces.StateManager +} + +func (i *commonInterceptor) BindSocketConnection(connection interceptor.Connection, writer interceptor.Writer, reader interceptor.Reader) (interceptor.Writer, interceptor.Reader, error) { + ctx, cancel := context.WithCancel(i.Ctx()) + + if err := i.states.SetState(connection, state.NewState(ctx, cancel, connection, writer, reader)); err != nil { + cancel() + return nil, nil, err + } + + return writer, reader, nil +} + +func (i *commonInterceptor) InterceptSocketWriter(writer interceptor.Writer) interceptor.Writer { + return interceptor.WriterFunc(func(connection interceptor.Connection, msg message.Message) error { + if msg == nil { + return nil + } + + if !i.writeProcessMessages.Check(msg.GetProtocol()) { + return writer.Write(connection, msg) + } + + m, ok := msg.(interceptor.Message) + if !ok { + return writer.Write(connection, msg) + } + + next, err := m.GetNext(nil) + if err != nil { + return writer.Write(connection, msg) + } + + if err := m.WriteProcess(i, connection); err != nil { + return writer.Write(connection, next) + } + + return writer.Write(connection, next) + }) +} + +func (i *commonInterceptor) InterceptSocketReader(reader interceptor.Reader) interceptor.Reader { + return interceptor.ReaderFunc(func(connection interceptor.Connection) (message.Message, error) { + msg, err := reader.Read(connection) + if err != nil { + return msg, err + } + + if msg == nil { + return nil, nil + } + + if !i.readProcessMessages.Check(msg.GetProtocol()) { + return msg, nil + } + + m, ok := msg.(interceptor.Message) + if !ok { + return msg, errors.ErrInterfaceMisMatch + } + + next, err := m.GetNext(nil) + if err != nil { + return msg, nil + } + + if err := m.ReadProcess(i, connection); err != nil { + return next, nil + } + + return next, nil + }) +} + +func (i *commonInterceptor) UnBindSocketConnection(connection interceptor.Connection) { + +} + +func (i *commonInterceptor) Close() error { + // TODO: FIGURE OUR GOOD CLOSE STRATEGY + return nil +} + +func (i *commonInterceptor) GetState(connection interceptor.Connection) (interfaces.State, error) { + return i.states.GetState(connection) +} diff --git a/pkg/middleware/chat/config/config.go b/pkg/middleware/chat/config/config.go deleted file mode 100644 index 0cfeeb6..0000000 --- a/pkg/middleware/chat/config/config.go +++ /dev/null @@ -1,5 +0,0 @@ -package config - -type Config struct { - IsServer bool -} diff --git a/pkg/middleware/chat/errors/errors.go b/pkg/middleware/chat/errors/errors.go index 2b4bc4c..7ef0148 100644 --- a/pkg/middleware/chat/errors/errors.go +++ b/pkg/middleware/chat/errors/errors.go @@ -3,9 +3,11 @@ package errors import "errors" var ( - ErrInterfaceMisMatch = errors.New("unsatisfied interface triggered") - ErrMessageForServerOnly = errors.New("message should only be processed by server") - ErrMessageForClientOnly = errors.New("message should only be processed by client") + ErrContextCancelled = errors.New("context cancelled") + ErrInterfaceMisMatch = errors.New("unsatisfied interface triggered") + ErrMessageForServerOnly = errors.New("message should only be processed by server") + ErrMessageForClientOnly = errors.New("message should only be processed by client") + ErrClientIDNotConsistent = errors.New("client id is not consistent throughout the connection") ErrConnectionNotFound = errors.New("connection not registered") ErrConnectionExists = errors.New("connection already exists") @@ -19,3 +21,7 @@ var ( ErrClientNotAParticipant = errors.New("client is not a participant in the room at the moment") ErrWrongRoom = errors.New("operation not permitted as room id did not match") ) + +func New(text string) error { + return errors.New(text) +} diff --git a/pkg/middleware/chat/interceptor.go b/pkg/middleware/chat/interceptor.go deleted file mode 100644 index acc1d60..0000000 --- a/pkg/middleware/chat/interceptor.go +++ /dev/null @@ -1,115 +0,0 @@ -package chat - -import ( - "context" - "time" - - "github.com/harshabose/socket-comm/pkg/interceptor" - "github.com/harshabose/socket-comm/pkg/message" - "github.com/harshabose/socket-comm/pkg/middleware/chat/config" - "github.com/harshabose/socket-comm/pkg/middleware/chat/errors" - "github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces" - "github.com/harshabose/socket-comm/pkg/middleware/chat/state" - "github.com/harshabose/socket-comm/pkg/middleware/chat/types" -) - -type Interceptor struct { - interceptor.NoOpInterceptor - localMessageRegistry message.Registry - states interfaces.StateManager - rooms interfaces.RoomManager - config config.Config -} - -func (i *Interceptor) BindSocketConnection(connection interceptor.Connection, writer interceptor.Writer, reader interceptor.Reader) (interceptor.Writer, interceptor.Reader, error) { - ctx, cancel := context.WithCancel(i.Ctx()) - - if err := i.states.SetState(connection, state.NewState(ctx, cancel, i.config, connection, writer, reader)); err != nil { - cancel() - return nil, nil, err - } - - return writer, reader, nil -} - -func (i *Interceptor) Init(connection interceptor.Connection) error { - // TODO: ADD DEFAULT ROOM CREATION / JOIN ROOM - return nil -} - -func (i *Interceptor) InterceptSocketWriter(writer interceptor.Writer) interceptor.Writer { - return interceptor.WriterFunc(func(connection interceptor.Connection, message message.Message) error { - return writer.Write(connection, message) - }) -} - -func (i *Interceptor) InterceptSocketReader(reader interceptor.Reader) interceptor.Reader { - return interceptor.ReaderFunc(func(connection interceptor.Connection) (message.Message, error) { - msg, err := reader.Read(connection) - if err != nil { - return msg, err - } - - if !i.localMessageRegistry.Check(msg.GetProtocol()) { - return msg, nil - } - - m, ok := msg.(interceptor.Message) - if !ok { - return msg, errors.ErrInterfaceMisMatch - } - - if err := m.ReadProcess(i, connection); err != nil { - return nil, err - } - - return m.GetNext(i.GetMessageRegistry()) - }) -} - -func (i *Interceptor) UnBindSocketConnection(connection interceptor.Connection) { - -} - -func (i *Interceptor) Close() error { - return nil -} - -func (i *Interceptor) GetState(connection interceptor.Connection) (interfaces.State, error) { - return i.states.GetState(connection) -} - -func (i *Interceptor) CreateRoom(id types.RoomID, allowed []types.ClientID, ttl time.Duration) (interfaces.Room, error) { - return i.rooms.CreateRoom(id, allowed, ttl) -} - -func (i *Interceptor) DeleteRoom(id types.RoomID) error { - return i.rooms.DeleteRoom(id) -} - -func (i *Interceptor) WriteMessage(roomid types.RoomID, msg message.Message, from types.ClientID, tos ...types.ClientID) error { - w, ok := i.rooms.(interfaces.CanWriteRoomMessage) - if !ok { - return errors.ErrInterfaceMisMatch - } - - return w.WriteMessage(roomid, msg, from, tos...) -} - -func (i *Interceptor) Add(id types.RoomID, s interfaces.State) error { - a, ok := i.rooms.(interfaces.CanAdd) - if !ok { - return errors.ErrInterfaceMisMatch - } - - return a.Add(id, s) -} - -func (i *Interceptor) Remove(id types.RoomID, s interfaces.State) error { - r, ok := i.rooms.(interfaces.CanRemove) - if !ok { - return errors.ErrInterfaceMisMatch - } - - return r.Remove(id, s) -} diff --git a/pkg/middleware/chat/interfaces/room.go b/pkg/middleware/chat/interfaces/room.go index b9cfc94..adf7ac4 100644 --- a/pkg/middleware/chat/interfaces/room.go +++ b/pkg/middleware/chat/interfaces/room.go @@ -20,6 +20,7 @@ type Room interface { CanAdd CanRemove ID() types.RoomID + GetParticipants() []types.ClientID io.Closer } @@ -28,7 +29,7 @@ type CanGetRoom interface { } type CanWriteRoomMessage interface { - WriteMessage(roomid types.RoomID, msg message.Message, from types.ClientID, tos ...types.ClientID) error + WriteRoomMessage(roomid types.RoomID, msg message.Message, from types.ClientID, tos ...types.ClientID) error } type CanCreateRoom interface { @@ -42,12 +43,23 @@ type CanDeleteRoom interface { type RoomManager interface { CanCreateRoom CanDeleteRoom + CanGetRoom } type RoomProcessor interface { Process(CanProcess, State) error } +type RoomProcessorBackground interface { + ProcessBackground(CanProcessBackground, State) CanProcessBackground +} + type CanProcess interface { Process(CanGetRoom, State) error } + +type CanProcessBackground interface { + ProcessBackground(room CanGetRoom, state State) CanProcessBackground + Wait() error + Stop() +} diff --git a/pkg/middleware/chat/interfaces/state.go b/pkg/middleware/chat/interfaces/state.go index 989fd4f..9677dc2 100644 --- a/pkg/middleware/chat/interfaces/state.go +++ b/pkg/middleware/chat/interfaces/state.go @@ -1,15 +1,20 @@ package interfaces import ( + "context" + "github.com/harshabose/socket-comm/pkg/interceptor" "github.com/harshabose/socket-comm/pkg/message" - "github.com/harshabose/socket-comm/pkg/middleware/chat/config" "github.com/harshabose/socket-comm/pkg/middleware/chat/types" ) +type CanSetClientID interface { + SetClientID(id types.ClientID) error +} + type State interface { GetClientID() (types.ClientID, error) - GetConfig() config.Config + Ctx() context.Context } type CanGetState interface { diff --git a/pkg/middleware/chat/messages/ident_messages.go b/pkg/middleware/chat/messages/ident_messages.go new file mode 100644 index 0000000..29d32b5 --- /dev/null +++ b/pkg/middleware/chat/messages/ident_messages.go @@ -0,0 +1,89 @@ +package messages + +import ( + "fmt" + + "github.com/harshabose/socket-comm/pkg/interceptor" + "github.com/harshabose/socket-comm/pkg/message" + "github.com/harshabose/socket-comm/pkg/middleware/chat/errors" + "github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces" + "github.com/harshabose/socket-comm/pkg/middleware/chat/types" +) + +var ( + IdentProtocol message.Protocol = "room:ident" + IdentResponseProtocol message.Protocol = "room:ident_response" +) + +type Ident struct { + interceptor.BaseMessage +} + +func (m *Ident) GetProtocol() message.Protocol { + return IdentProtocol +} + +func (m *Ident) ReadProcess(_i interceptor.Interceptor, connection interceptor.Connection) error { + s, ok := _i.(interfaces.CanGetState) + if !ok { + return fmt.Errorf("error while processing 'Ident' message; err: %s", errors.ErrInterfaceMisMatch.Error()) + } + + ss, err := s.GetState(connection) + if err != nil { + return fmt.Errorf("error while processing 'Ident' message; err: %s", err.Error()) + } + + c, ok := ss.(interfaces.CanSetClientID) + if !ok { + return fmt.Errorf("error while processing 'Ident' message; err: %s", errors.ErrInterfaceMisMatch.Error()) + } + + if err := c.SetClientID(types.ClientID(m.CurrentHeader.Sender)); err != nil { + return fmt.Errorf("error while processing 'Ident' message; err: %s", err.Error()) + } + + w, ok := ss.(interfaces.CanWriteMessage) + if !ok { + return fmt.Errorf("error while processing 'Ident' message; err: %s", errors.ErrInterfaceMisMatch.Error()) + } + + if err := w.Write(&IdentResponse{}); err != nil { + return fmt.Errorf("error while processing 'Ident' message; err: %s", err.Error()) + } + + return nil +} + +type IdentResponse struct { + interceptor.BaseMessage +} + +// TODO: ADD THE FACTORY FOR IdentResponse + +func (m *IdentResponse) GetProtocol() message.Protocol { + return IdentResponseProtocol +} + +func (m *IdentResponse) ReadProcess(_i interceptor.Interceptor, connection interceptor.Connection) error { + s, ok := _i.(interfaces.CanGetState) + if !ok { + return fmt.Errorf("error while processing 'Ident' message; err: %s", errors.ErrInterfaceMisMatch.Error()) + } + + ss, err := s.GetState(connection) + if err != nil { + return fmt.Errorf("error while processing 'Ident' message; err: %s", err.Error()) + } + + c, ok := ss.(interfaces.CanSetClientID) + if !ok { + return fmt.Errorf("error while processing 'Ident' message; err: %s", errors.ErrInterfaceMisMatch.Error()) + } + + if err := c.SetClientID(types.ClientID(m.CurrentHeader.Sender)); err != nil { + return fmt.Errorf("error while processing 'Ident' message; err: %s", err.Error()) + } + + return nil +} diff --git a/pkg/middleware/chat/messages/request_health_stream.go b/pkg/middleware/chat/messages/request_health_stream.go new file mode 100644 index 0000000..f93c355 --- /dev/null +++ b/pkg/middleware/chat/messages/request_health_stream.go @@ -0,0 +1,68 @@ +package messages + +import ( + "fmt" + + "github.com/harshabose/socket-comm/pkg/interceptor" + "github.com/harshabose/socket-comm/pkg/message" + "github.com/harshabose/socket-comm/pkg/middleware/chat/errors" + "github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces" + "github.com/harshabose/socket-comm/pkg/middleware/chat/types" +) + +var ( + RequestHealthProtocol message.Protocol = "room:request_health" + HealthResponseProtocol message.Protocol = "room:health_response" +) + +type RequestHealth struct { + interceptor.BaseMessage + RoomID types.RoomID `json:"room_id"` +} + +func NewRequestHealth(id types.RoomID) *RequestHealth { + return &RequestHealth{ + RoomID: id, + } +} + +func NewRequestHealthFactory(id types.RoomID) func() message.Message { + return func() message.Message { + return NewRequestHealth(id) + } +} + +func (m *RequestHealth) GetProtocol() message.Protocol { + return RequestHealthProtocol +} + +func (m *RequestHealth) ReadProcess(_i interceptor.Interceptor, connection interceptor.Connection) error { + +} + +func (m *RequestHealth) WriteProcess(_i interceptor.Interceptor, connection interceptor.Connection) error { + s, ok := _i.(interfaces.CanGetState) + if !ok { + return errors.ErrInterfaceMisMatch + } + + ss, err := s.GetState(connection) + if err != nil { + return fmt.Errorf("error while read processing 'RequestHealth' msg; err: %s", err.Error()) + } + + id, err := ss.GetClientID() + if err != nil { + return fmt.Errorf("error while read processing 'RequestHealth' msg; err: %s", err.Error()) + } + + m.SetSender(message.Sender(_i.ID())) + m.SetReceiver(message.Receiver(id)) + + return nil +} + +type HealthResponse struct { + interceptor.BaseMessage + RoomID types.RoomID `json:"room_id"` +} diff --git a/pkg/middleware/chat/room/messages.go b/pkg/middleware/chat/messages/room_messages.go similarity index 81% rename from pkg/middleware/chat/room/messages.go rename to pkg/middleware/chat/messages/room_messages.go index 3b4733b..96ba7c4 100644 --- a/pkg/middleware/chat/room/messages.go +++ b/pkg/middleware/chat/messages/room_messages.go @@ -1,4 +1,4 @@ -package room +package messages import ( "fmt" @@ -47,10 +47,6 @@ func (m *CreateRoom) ReadProcess(_i interceptor.Interceptor, connection intercep return fmt.Errorf("error while read processing 'CreateRoom' msg; err: %s", err.Error()) } - if !ss.GetConfig().IsServer { - return fmt.Errorf("error while read processing 'CreateRoom' msg; err: %s", errors.ErrMessageForServerOnly) - } - room, err := i.CreateRoom(m.RoomID, m.Allowed, m.TTL) if err != nil { return fmt.Errorf("error while read processing 'CreateRoom' msg; err: %s", err.Error()) @@ -72,27 +68,13 @@ func (m *DeleteRoom) GetProtocol() message.Protocol { return DeleteRoomProtocol } -func (m *DeleteRoom) ReadProcess(_i interceptor.Interceptor, connection interceptor.Connection) error { - s, ok := _i.(interfaces.CanGetState) +func (m *DeleteRoom) ReadProcess(i interceptor.Interceptor, _ interceptor.Connection) error { + d, ok := i.(interfaces.CanDeleteRoom) if !ok { return errors.ErrInterfaceMisMatch } - i, ok := _i.(interfaces.CanDeleteRoom) - if !ok { - return errors.ErrInterfaceMisMatch - } - - ss, err := s.GetState(connection) - if err != nil { - return fmt.Errorf("error while read processing 'DeleteRoom' msg; err: %s", err.Error()) - } - - if !ss.GetConfig().IsServer { - return fmt.Errorf("error while read processing 'DeleteRoom' msg; err: %s", errors.ErrMessageForServerOnly) - } - - if err := i.DeleteRoom(m.RoomID); err != nil { + if err := d.DeleteRoom(m.RoomID); err != nil { return fmt.Errorf("error while read processing 'DeleteRoom' msg; err: %s", errors.ErrMessageForServerOnly) } @@ -111,23 +93,28 @@ func (m *ToForwardMessage) GetProtocol() message.Protocol { } func (m *ToForwardMessage) ReadProcess(_i interceptor.Interceptor, connection interceptor.Connection) error { - s, ok := _i.(interfaces.CanGetState) - if !ok { - return errors.ErrInterfaceMisMatch - } - msg, err := newForwardedMessage(m) if err != nil { return fmt.Errorf("error while read processing 'ToForwardMessage'; err: %s", err.Error()) } - ss, err := s.GetState(connection) - if err != nil { - return fmt.Errorf("error while read processing 'ToForwardMessage' msg; err: %s", err.Error()) + s, ok := _i.(interfaces.CanGetState) + if !ok { + return fmt.Errorf("error while read processing 'ToForwardMessage'; err: %s", errors.ErrInterfaceMisMatch) } - if !ss.GetConfig().IsServer { - return fmt.Errorf("error while read processing 'ToForwardMessage' msg; err: %s", errors.ErrMessageForServerOnly) + ss, err := s.GetState(connection) + if err != nil { + return fmt.Errorf("error while read processing 'ToForwardMessage'; err: %s", errors.ErrInterfaceMisMatch) + } + + clientID, err := ss.GetClientID() + if err != nil { + return fmt.Errorf("error while read processing 'ToForwardMessage'; err: %s", errors.ErrInterfaceMisMatch) + } + + if m.From != clientID { + return fmt.Errorf("error while read processing 'ToForwardMessage'; From and ClientID did not match") } w, ok := _i.(interfaces.CanWriteRoomMessage) @@ -135,7 +122,7 @@ func (m *ToForwardMessage) ReadProcess(_i interceptor.Interceptor, connection in return errors.ErrInterfaceMisMatch } - if err := w.WriteMessage(m.RoomID, msg, m.From, m.To); err != nil { + if err := w.WriteRoomMessage(m.RoomID, msg, m.From, m.To); err != nil { return fmt.Errorf("error while read processing 'ToForwardMessage' msg; err: %s", err.Error()) } @@ -164,7 +151,8 @@ func newForwardedMessage(forward *ToForwardMessage) (*ForwardedMessage, error) { type JoinRoom struct { interceptor.BaseMessage - RoomID types.RoomID + RoomID types.RoomID `json:"room_id"` + WaitDuration time.Duration `json:"wait_duration"` } func (m *JoinRoom) GetProtocol() message.Protocol { @@ -187,11 +175,21 @@ func (m *JoinRoom) ReadProcess(_i interceptor.Interceptor, connection intercepto return errors.ErrInterfaceMisMatch } - if err := a.Add(m.RoomID, ss); err != nil { - return fmt.Errorf("error while read processing 'JoinRoom' msg; err: %s", err.Error()) - } + timer := time.NewTimer(m.WaitDuration) + defer timer.Stop() - return nil + for { + select { + case <-timer.C: + return fmt.Errorf("error while read processing 'JoinRoom' msg; err: %s", errors.ErrMessageForServerOnly) + default: + err := a.Add(m.RoomID, ss) + if err == nil { + return nil + } + fmt.Println(fmt.Errorf("error while read processing 'JoinRoom' msg; err: %s. retrying", err.Error())) + } + } } type LeaveRoom struct { diff --git a/pkg/middleware/chat/process/delete_room_waiter.go b/pkg/middleware/chat/process/delete_room_waiter.go new file mode 100644 index 0000000..3760d19 --- /dev/null +++ b/pkg/middleware/chat/process/delete_room_waiter.go @@ -0,0 +1,94 @@ +package process + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces" + "github.com/harshabose/socket-comm/pkg/middleware/chat/types" +) + +type DeleteRoomWaiter struct { + deleter interfaces.CanDeleteRoom + ttl time.Duration + roomid types.RoomID + err error + ctx context.Context + cancel context.CancelFunc + mux sync.RWMutex + done chan struct{} +} + +func NewDeleteRoomWaiter(ctx context.Context, cancel context.CancelFunc, deleter interfaces.CanDeleteRoom, roomid types.RoomID, ttl time.Duration) *DeleteRoomWaiter { + return &DeleteRoomWaiter{ + ctx: ctx, + cancel: cancel, + roomid: roomid, + deleter: deleter, + err: nil, + ttl: ttl, + done: make(chan struct{}), + } +} + +func (p *DeleteRoomWaiter) Process(r interfaces.CanGetRoom, _ interfaces.State) error { + timer := time.NewTimer(p.ttl) + defer timer.Stop() + + for { + select { + case <-timer.C: + if err := p.process(r); err != nil { + return fmt.Errorf("error while processing DeleteRoomWaiter process; err: %s", err.Error()) + } + case <-p.ctx.Done(): + return fmt.Errorf("context cancelled before process completion") + } + } +} + +func (p *DeleteRoomWaiter) ProcessBackground(r interfaces.CanGetRoom, s interfaces.State) interfaces.CanProcessBackground { + go func() { + if err := p.Process(r, s); err != nil { + p.mux.Lock() + p.err = err + p.mux.Unlock() + p.done <- struct{}{} + + fmt.Println(p.err) + } + }() + + return p +} + +func (p *DeleteRoomWaiter) Wait() error { + <-p.done + p.mux.RLock() + defer p.mux.RUnlock() + + return p.err +} + +func (p *DeleteRoomWaiter) Stop() { + p.cancel() +} + +func (p *DeleteRoomWaiter) process(r interfaces.CanGetRoom) error { + room, err := r.GetRoom(p.roomid) + if err != nil { + return fmt.Errorf("error while processing DelteRoomWaiter process; err: %s", err.Error()) + } + + if err := room.Close(); err != nil { + return err + } + + if err := p.deleter.DeleteRoom(p.roomid); err != nil { + return err + } + + return nil +} diff --git a/pkg/middleware/chat/process/send_message_room.go b/pkg/middleware/chat/process/send_message_room.go new file mode 100644 index 0000000..d742012 --- /dev/null +++ b/pkg/middleware/chat/process/send_message_room.go @@ -0,0 +1,103 @@ +package process + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/harshabose/socket-comm/internal/util" + "github.com/harshabose/socket-comm/pkg/message" + "github.com/harshabose/socket-comm/pkg/middleware/chat/errors" + "github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces" + "github.com/harshabose/socket-comm/pkg/middleware/chat/types" +) + +type SendMessageRoom struct { + msgFactory func() message.Message + roomid types.RoomID + interval time.Duration + err error + mux sync.RWMutex + done chan struct{} + ctx context.Context + cancel context.CancelFunc +} + +func NewSendMessageRoom(ctx context.Context, cancel context.CancelFunc, msgFactory func() message.Message, roomid types.RoomID, interval time.Duration) *SendMessageRoom { + return &SendMessageRoom{ + ctx: ctx, + cancel: cancel, + msgFactory: msgFactory, + roomid: roomid, + interval: interval, + done: make(chan struct{}), + } +} + +func (p *SendMessageRoom) Process(r interfaces.CanGetRoom, s interfaces.State) error { + ticker := time.NewTicker(p.interval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + if err := p.process(r, s); err != nil { + fmt.Println("error while processing SendMessageRoom process; err: ", err.Error()) + } + case <-p.ctx.Done(): + return nil + } + } +} + +func (p *SendMessageRoom) ProcessBackground(r interfaces.CanGetRoom, s interfaces.State) interfaces.CanProcessBackground { + go func() { + if err := p.Process(r, s); err != nil { + p.mux.Lock() + p.err = err + p.mux.Unlock() + p.done <- struct{}{} + + fmt.Println(p.err) + } + }() + + return p +} + +func (p *SendMessageRoom) Wait() error { + <-p.done + p.mux.RLock() + defer p.mux.RUnlock() + + return p.err +} + +func (p *SendMessageRoom) Stop() { + p.cancel() +} + +func (p *SendMessageRoom) process(r interfaces.CanGetRoom, _ interfaces.State) error { + room, err := r.GetRoom(p.roomid) + if err != nil { + return err + } + + participants := room.GetParticipants() + + w, ok := room.(interfaces.CanWriteRoomMessage) + if !ok { + return errors.ErrInterfaceMisMatch + } + + merr := util.NewMultiError() + + for _, participant := range participants { + if err := w.WriteRoomMessage(p.roomid, p.msgFactory(), "", participant); err != nil { + merr.Add(err) + } + } + + return merr.ErrorOrNil() +} diff --git a/pkg/middleware/chat/process/wait_until_ident.go b/pkg/middleware/chat/process/wait_until_ident.go new file mode 100644 index 0000000..220f99e --- /dev/null +++ b/pkg/middleware/chat/process/wait_until_ident.go @@ -0,0 +1,57 @@ +package process + +import ( + "context" + "fmt" + "time" + + "github.com/harshabose/socket-comm/pkg/middleware/chat/errors" + "github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces" +) + +type WaitUntilIdent struct { + ctx context.Context + duration time.Duration +} + +type WaitUntilIdentOption func(*WaitUntilIdent) + +func WithTickerDuration(duration time.Duration) WaitUntilIdentOption { + return WaitUntilIdentOption(func(ident *WaitUntilIdent) { + ident.duration = duration + }) +} + +func NewWaitUntilIdentComplete(ctx context.Context, options ...WaitUntilIdentOption) *WaitUntilIdent { + i := &WaitUntilIdent{ + ctx: ctx, + duration: 500 * time.Millisecond, + } + + for _, option := range options { + option(i) + } + + return i +} + +func (p WaitUntilIdent) Process(_ interfaces.CanGetRoom, s interfaces.State) error { + ticker := time.NewTicker(p.duration) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + if p.process(s) { + return nil + } + case <-p.ctx.Done(): + return fmt.Errorf("error while processing WaitUntilIdent process; err: %s", errors.ErrContextCancelled.Error()) + } + } +} + +func (p WaitUntilIdent) process(s interfaces.State) bool { + _, err := s.GetClientID() + return err == nil +} diff --git a/pkg/middleware/chat/room/manager.go b/pkg/middleware/chat/room/manager.go index ac6ad05..b4e0a0f 100644 --- a/pkg/middleware/chat/room/manager.go +++ b/pkg/middleware/chat/room/manager.go @@ -8,6 +8,8 @@ import ( "github.com/harshabose/socket-comm/pkg/message" "github.com/harshabose/socket-comm/pkg/middleware/chat/errors" "github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces" + "github.com/harshabose/socket-comm/pkg/middleware/chat/messages" + "github.com/harshabose/socket-comm/pkg/middleware/chat/process" "github.com/harshabose/socket-comm/pkg/middleware/chat/types" ) @@ -44,12 +46,11 @@ func (m *Manager) CreateRoom(id types.RoomID, allowed []types.ClientID, ttl time m.rooms[id] = room - go func() { - if err := m.Process(NewDeleteRoomWaiter(ctx, m, id, ttl), nil); err != nil { - fmt.Println(err.Error()) - } - }() // TODO: DO I NEED THIS? + // NOTE: THE FOLLOWING STARTS A BACKGROUND PROCESS WHICH WAITS UNTIL TTL AND KILLS THE ROOM. THIS DOES NOT KILL CONNECTION + _ = m.ProcessBackground(process.NewDeleteRoomWaiter(ctx, cancel, m, id, ttl), nil) + // NOTE: THE FOLLOWING STARTS A BACKGROUND PROCESS WHICH CONSTANTLY SENDS THE GIVEN MESSAGE TO EVERY PARTICIPANT IN THE ROOM + _ = m.ProcessBackground(process.NewSendMessageRoom(ctx, cancel, messages.NewRequestHealthFactory(id), id, 5*time.Second), nil) return room, nil } @@ -77,10 +78,11 @@ func (m *Manager) DeleteRoom(id types.RoomID) error { return fmt.Errorf("error while deleting room with id: %s; err: %s", id, err.Error()) } + delete(m.rooms, id) return nil } -func (m *Manager) WriteMessage(roomid types.RoomID, msg message.Message, from types.ClientID, tos ...types.ClientID) error { +func (m *Manager) WriteRoomMessage(roomid types.RoomID, msg message.Message, from types.ClientID, tos ...types.ClientID) error { room, err := m.GetRoom(roomid) if err != nil { return err @@ -91,9 +93,13 @@ func (m *Manager) WriteMessage(roomid types.RoomID, msg message.Message, from ty return errors.ErrInterfaceMisMatch } - return w.WriteMessage(roomid, msg, from, tos...) + return w.WriteRoomMessage(roomid, msg, from, tos...) } func (m *Manager) Process(process interfaces.CanProcess, state interfaces.State) error { return process.Process(m, state) } + +func (m *Manager) ProcessBackground(process interfaces.CanProcessBackground, state interfaces.State) interfaces.CanProcessBackground { + return process.ProcessBackground(m, state) +} diff --git a/pkg/middleware/chat/room/processes.go b/pkg/middleware/chat/room/processes.go deleted file mode 100644 index bdd203f..0000000 --- a/pkg/middleware/chat/room/processes.go +++ /dev/null @@ -1,59 +0,0 @@ -package room - -import ( - "context" - "fmt" - "time" - - "github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces" - "github.com/harshabose/socket-comm/pkg/middleware/chat/types" -) - -type DeleteRoomWaiter struct { - canDelete interfaces.CanDeleteRoom - ttl time.Duration - roomid types.RoomID - ctx context.Context -} - -func NewDeleteRoomWaiter(ctx context.Context, manager interfaces.CanDeleteRoom, roomid types.RoomID, ttl time.Duration) *DeleteRoomWaiter { - return &DeleteRoomWaiter{ - ctx: ctx, - roomid: roomid, - canDelete: manager, - ttl: ttl, - } -} - -func (p DeleteRoomWaiter) Process(r interfaces.CanGetRoom, _ interfaces.State) error { - timer := time.NewTimer(p.ttl) - defer timer.Stop() - - for { - select { - case <-timer.C: - if err := p.process(r); err != nil { - return fmt.Errorf("error while processing DeleteRoomWaiter process; err: %s", err.Error()) - } - case <-p.ctx.Done(): - return fmt.Errorf("context cancelled before process completion") - } - } -} - -func (p DeleteRoomWaiter) process(r interfaces.CanGetRoom) error { - room, err := r.GetRoom(p.roomid) - if err != nil { - return fmt.Errorf("error while processing DelteRoomWaiter process; err: %s", err.Error()) - } - - if err := room.Close(); err != nil { - return err - } - - if err := p.canDelete.DeleteRoom(p.roomid); err != nil { - return err - } - - return nil -} diff --git a/pkg/middleware/chat/room/room.go b/pkg/middleware/chat/room/room.go index 7abb4ee..6bb77d4 100644 --- a/pkg/middleware/chat/room/room.go +++ b/pkg/middleware/chat/room/room.go @@ -43,33 +43,43 @@ func (r *Room) Add(roomid types.RoomID, s interfaces.State) error { return err } - if !r.isAllowed(id) { - return fmt.Errorf("error while adding client to room. client ID: %s; room ID: %s; err: %s", id, r.roomid, errors.ErrClientNotAllowedInRoom.Error()) - } + select { + case <-r.ctx.Done(): + return fmt.Errorf("error while adding client to room. client ID: %s; room ID: %s; err: %s", id, r.roomid, errors.ErrContextCancelled.Error()) + default: + if !r.isAllowed(id) { + return fmt.Errorf("error while adding client to room. client ID: %s; room ID: %s; err: %s", id, r.roomid, errors.ErrClientNotAllowedInRoom.Error()) + } - if r.isParticipant(id) { - return fmt.Errorf("client with id '%s' already existing in the room with id %s; err: %s", id, r.roomid, errors.ErrClientIsAlreadyParticipant) - } + if r.isParticipant(id) { + return fmt.Errorf("client with id '%s' already existing in the room with id %s; err: %s", id, r.roomid, errors.ErrClientIsAlreadyParticipant) + } - r.participants[id] = s - return nil + r.participants[id] = s + return nil + } } func (r *Room) isAllowed(id types.ClientID) bool { - if len(r.allowed) == 0 { - return true - } - - for _, allowedID := range r.allowed { - if allowedID == id { + select { + case <-r.ctx.Done(): + return false + default: + if len(r.allowed) == 0 { return true } - } - return false + for _, allowedID := range r.allowed { + if allowedID == id { + return true + } + } + + return false + } } -func (r *Room) forEach(f func(id types.ClientID) bool, ids ...types.ClientID) bool { +func (r *Room) forEachBoolean(f func(id types.ClientID) bool, ids ...types.ClientID) bool { if len(ids) == 0 { return false } @@ -93,54 +103,85 @@ func (r *Room) Remove(roomid types.RoomID, s interfaces.State) error { return err } - if !r.isAllowed(id) { - return fmt.Errorf("error while removing client to room. client ID: %s; room ID: %s; err: %s", id, r.roomid, errors.ErrClientNotAllowedInRoom.Error()) - } + select { + case <-r.ctx.Done(): + return fmt.Errorf("error while removing client to room. client ID: %s; room ID: %s; err: %s", id, r.roomid, errors.ErrContextCancelled.Error()) + default: + if !r.isAllowed(id) { + return fmt.Errorf("error while removing client to room. client ID: %s; room ID: %s; err: %s", id, r.roomid, errors.ErrClientNotAllowedInRoom.Error()) + } - if !r.isParticipant(id) { - return fmt.Errorf("client with id '%s' does not exist in the room with id %s; err: %s", id, r.roomid, errors.ErrClientNotAParticipant.Error()) - } + if !r.isParticipant(id) { + return fmt.Errorf("client with id '%s' does not exist in the room with id %s; err: %s", id, r.roomid, errors.ErrClientNotAParticipant.Error()) + } - delete(r.participants, id) - return nil + delete(r.participants, id) + return nil + } } func (r *Room) isParticipant(id types.ClientID) bool { - _, exists := r.participants[id] - return exists + select { + case <-r.ctx.Done(): + return false + default: + _, exists := r.participants[id] + return exists + } } -func (r *Room) WriteMessage(roomid types.RoomID, msg message.Message, from types.ClientID, tos ...types.ClientID) error { - if roomid != r.roomid { - return errors.ErrWrongRoom - } - - if len(tos) == 0 { - return fmt.Errorf("atleast one receiver is need to use 'WriteMessage' message") - } - - if !r.forEach(r.isAllowed, append(tos, from)...) { - return errors.ErrClientNotAllowedInRoom - } - - if !r.forEach(r.isParticipant, append(tos, from)...) { - return errors.ErrClientNotAParticipant - } - - for _, to := range tos { - s, ok := (r.participants[to]).(interfaces.CanWriteMessage) - if !ok { - return errors.ErrInterfaceMisMatch +func (r *Room) WriteRoomMessage(roomid types.RoomID, msg message.Message, from types.ClientID, tos ...types.ClientID) error { + select { + case <-r.ctx.Done(): + return fmt.Errorf("error while sending message to peer in room; err: %s", errors.ErrContextCancelled.Error()) + default: + if roomid != r.roomid { + return errors.ErrWrongRoom } - if err := s.Write(msg); err != nil { - return fmt.Errorf("error while sending message to peer in room; err: %s", err.Error()) + if len(tos) == 0 { + return fmt.Errorf("atleast one receiver is need to use 'WriteRoomMessage' message") } - } - return nil + if !r.forEachBoolean(r.isAllowed, append(tos, from)...) { + return errors.ErrClientNotAllowedInRoom + } + + if !r.forEachBoolean(r.isParticipant, append(tos, from)...) { + return errors.ErrClientNotAParticipant + } + + for _, to := range tos { + s, ok := (r.participants[to]).(interfaces.CanWriteMessage) + if !ok { + return errors.ErrInterfaceMisMatch + } + + if err := s.Write(msg); err != nil { + return fmt.Errorf("error while sending message to peer in room; err: %s", err.Error()) + } + } + + return nil + } +} + +func (r *Room) GetParticipants() []types.ClientID { + select { + case <-r.ctx.Done(): + return make([]types.ClientID, 0) // EMPTY LIST + default: + clients := make([]types.ClientID, 0) + for id, _ := range r.participants { + clients = append(clients, id) + } + return clients + } } func (r *Room) Close() error { + r.cancel() + r.participants = make(map[types.ClientID]interfaces.State) + r.allowed = make([]types.ClientID, 0) return nil } diff --git a/pkg/middleware/chat/server_interceptor.go b/pkg/middleware/chat/server_interceptor.go new file mode 100644 index 0000000..c36f310 --- /dev/null +++ b/pkg/middleware/chat/server_interceptor.go @@ -0,0 +1,94 @@ +package chat + +import ( + "context" + "fmt" + "time" + + "github.com/harshabose/socket-comm/pkg/interceptor" + "github.com/harshabose/socket-comm/pkg/message" + "github.com/harshabose/socket-comm/pkg/middleware/chat/errors" + "github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces" + "github.com/harshabose/socket-comm/pkg/middleware/chat/process" + "github.com/harshabose/socket-comm/pkg/middleware/chat/types" +) + +type ServerInterceptor struct { + commonInterceptor + interceptor.Interceptor + rooms interfaces.RoomManager +} + +func (i *ServerInterceptor) BindSocketConnection(connection interceptor.Connection, writer interceptor.Writer, reader interceptor.Reader) (interceptor.Writer, interceptor.Reader, error) { + return i.commonInterceptor.BindSocketConnection(connection, writer, reader) +} + +func (i *ServerInterceptor) Init(connection interceptor.Connection) error { + s, err := i.GetState(connection) + if err != nil { + return fmt.Errorf("error while init; err: %s", err.Error()) + } + + w, ok := s.(interfaces.CanWriteMessage) + if !ok { + return errors.ErrInterfaceMisMatch + } + + // TODO: SEND IDENT MESSAGE + if err := w.Write(); err != nil { + return fmt.Errorf("error while init; err: %s", err.Error()) + } + + ctx, cancel := context.WithTimeout(s.Ctx(), 10*time.Second) + defer cancel() + + p := process.NewWaitUntilIdentComplete(ctx, process.WithTickerDuration(500*time.Millisecond)) + if err := p.Process(nil, s); err != nil { + return fmt.Errorf("error while init; err: %s", err.Error()) + } + + return nil +} + +func (i *ServerInterceptor) UnBindSocketConnection(connection interceptor.Connection) { + +} + +func (i *ServerInterceptor) Close() error { + return nil +} + +func (i *ServerInterceptor) CreateRoom(id types.RoomID, allowed []types.ClientID, ttl time.Duration) (interfaces.Room, error) { + return i.rooms.CreateRoom(id, allowed, ttl) +} + +func (i *ServerInterceptor) DeleteRoom(id types.RoomID) error { + return i.rooms.DeleteRoom(id) +} + +func (i *ServerInterceptor) WriteRoomMessage(roomid types.RoomID, msg message.Message, from types.ClientID, tos ...types.ClientID) error { + w, ok := i.rooms.(interfaces.CanWriteRoomMessage) + if !ok { + return errors.ErrInterfaceMisMatch + } + + return w.WriteRoomMessage(roomid, msg, from, tos...) +} + +func (i *ServerInterceptor) Add(id types.RoomID, s interfaces.State) error { + a, ok := i.rooms.(interfaces.CanAdd) + if !ok { + return errors.ErrInterfaceMisMatch + } + + return a.Add(id, s) +} + +func (i *ServerInterceptor) Remove(id types.RoomID, s interfaces.State) error { + r, ok := i.rooms.(interfaces.CanRemove) + if !ok { + return errors.ErrInterfaceMisMatch + } + + return r.Remove(id, s) +} diff --git a/pkg/middleware/chat/state/state.go b/pkg/middleware/chat/state/state.go index a7fdd63..10cc453 100644 --- a/pkg/middleware/chat/state/state.go +++ b/pkg/middleware/chat/state/state.go @@ -5,7 +5,6 @@ import ( "github.com/harshabose/socket-comm/pkg/interceptor" "github.com/harshabose/socket-comm/pkg/message" - "github.com/harshabose/socket-comm/pkg/middleware/chat/config" "github.com/harshabose/socket-comm/pkg/middleware/chat/errors" "github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces" "github.com/harshabose/socket-comm/pkg/middleware/chat/types" @@ -16,15 +15,13 @@ type state struct { connection interceptor.Connection writer interceptor.Writer reader interceptor.Reader - config config.Config cancel context.CancelFunc ctx context.Context } -func NewState(ctx context.Context, cancel context.CancelFunc, config config.Config, connection interceptor.Connection, writer interceptor.Writer, reader interceptor.Reader) interfaces.State { +func NewState(ctx context.Context, cancel context.CancelFunc, connection interceptor.Connection, writer interceptor.Writer, reader interceptor.Reader) interfaces.State { return &state{ id: types.UnKnownClient, - config: config, connection: connection, writer: writer, reader: reader, @@ -33,6 +30,10 @@ func NewState(ctx context.Context, cancel context.CancelFunc, config config.Conf } } +func (s *state) Ctx() context.Context { + return s.ctx +} + func (s *state) GetClientID() (types.ClientID, error) { if s.id == types.UnKnownClient { return s.id, errors.ErrUnknownClientIDState @@ -45,6 +46,11 @@ func (s *state) Write(msg message.Message) error { return s.writer.Write(s.connection, msg) } -func (s *state) GetConfig() config.Config { - return s.config +func (s *state) SetClientID(id types.ClientID) error { + if s.id != types.UnKnownClient { + return errors.ErrClientIDNotConsistent + } + + s.id = id + return nil }