mirror of
https://github.com/harshabose/socket-comm.git
synced 2025-10-06 16:18:05 +08:00
added the Processor interface
This commit is contained in:
@@ -1,7 +1,12 @@
|
|||||||
package chat
|
package chat
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/harshabose/socket-comm/pkg/interceptor"
|
"github.com/harshabose/socket-comm/pkg/interceptor"
|
||||||
|
"github.com/harshabose/socket-comm/pkg/middleware/chat/process"
|
||||||
)
|
)
|
||||||
|
|
||||||
type ClientInterceptor struct {
|
type ClientInterceptor struct {
|
||||||
@@ -11,3 +16,20 @@ type ClientInterceptor struct {
|
|||||||
func (i *ClientInterceptor) BindSocketConnection(connection interceptor.Connection, writer interceptor.Writer, reader interceptor.Reader) (interceptor.Writer, interceptor.Reader, error) {
|
func (i *ClientInterceptor) BindSocketConnection(connection interceptor.Connection, writer interceptor.Writer, reader interceptor.Reader) (interceptor.Writer, interceptor.Reader, error) {
|
||||||
return i.commonInterceptor.BindSocketConnection(connection, writer, reader)
|
return i.commonInterceptor.BindSocketConnection(connection, writer, reader)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (i *ClientInterceptor) Init(connection interceptor.Connection) error {
|
||||||
|
s, err := i.GetState(connection)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error while init; err: %s", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
p := process.NewIdentWaiter(ctx)
|
||||||
|
if err := p.Process(nil, s); err != nil {
|
||||||
|
return fmt.Errorf("error while init; err: %s", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
@@ -6,7 +6,6 @@ import (
|
|||||||
"github.com/harshabose/socket-comm/pkg/interceptor"
|
"github.com/harshabose/socket-comm/pkg/interceptor"
|
||||||
"github.com/harshabose/socket-comm/pkg/message"
|
"github.com/harshabose/socket-comm/pkg/message"
|
||||||
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
|
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
|
||||||
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
|
|
||||||
"github.com/harshabose/socket-comm/pkg/middleware/chat/state"
|
"github.com/harshabose/socket-comm/pkg/middleware/chat/state"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -14,7 +13,7 @@ type commonInterceptor struct {
|
|||||||
interceptor.NoOpInterceptor
|
interceptor.NoOpInterceptor
|
||||||
readProcessMessages message.Registry
|
readProcessMessages message.Registry
|
||||||
writeProcessMessages message.Registry
|
writeProcessMessages message.Registry
|
||||||
states interfaces.StateManager
|
states state.Manager
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i *commonInterceptor) BindSocketConnection(connection interceptor.Connection, writer interceptor.Writer, reader interceptor.Reader) (interceptor.Writer, interceptor.Reader, error) {
|
func (i *commonInterceptor) BindSocketConnection(connection interceptor.Connection, writer interceptor.Writer, reader interceptor.Reader) (interceptor.Writer, interceptor.Reader, error) {
|
||||||
@@ -98,6 +97,6 @@ func (i *commonInterceptor) Close() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i *commonInterceptor) GetState(connection interceptor.Connection) (interfaces.State, error) {
|
func (i *commonInterceptor) GetState(connection interceptor.Connection) (*state.State, error) {
|
||||||
return i.states.GetState(connection)
|
return i.states.GetState(connection)
|
||||||
}
|
}
|
||||||
|
1
pkg/middleware/chat/interfaces/health.go
Normal file
1
pkg/middleware/chat/interfaces/health.go
Normal file
@@ -0,0 +1 @@
|
|||||||
|
package interfaces
|
26
pkg/middleware/chat/interfaces/process.go
Normal file
26
pkg/middleware/chat/interfaces/process.go
Normal file
@@ -0,0 +1,26 @@
|
|||||||
|
package interfaces
|
||||||
|
|
||||||
|
import "github.com/harshabose/socket-comm/pkg/middleware/chat/state"
|
||||||
|
|
||||||
|
type CanProcess interface {
|
||||||
|
Process(CanBeProcessed, *state.State) error
|
||||||
|
}
|
||||||
|
|
||||||
|
type CanProcessBackground interface {
|
||||||
|
ProcessBackground(CanBeProcessedBackground, *state.State) CanBeProcessedBackground
|
||||||
|
}
|
||||||
|
|
||||||
|
type Processor interface {
|
||||||
|
CanProcess
|
||||||
|
CanProcessBackground
|
||||||
|
}
|
||||||
|
|
||||||
|
type CanBeProcessed interface {
|
||||||
|
Process(Processor, *state.State) error
|
||||||
|
}
|
||||||
|
|
||||||
|
type CanBeProcessedBackground interface {
|
||||||
|
ProcessBackground(Processor, *state.State) CanBeProcessedBackground
|
||||||
|
Wait() error
|
||||||
|
Stop()
|
||||||
|
}
|
@@ -1,31 +1,24 @@
|
|||||||
package interfaces
|
package interfaces
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"io"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/harshabose/socket-comm/pkg/message"
|
"github.com/harshabose/socket-comm/pkg/message"
|
||||||
|
"github.com/harshabose/socket-comm/pkg/middleware/chat/room"
|
||||||
|
"github.com/harshabose/socket-comm/pkg/middleware/chat/state"
|
||||||
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
|
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
type CanAdd interface {
|
type CanAdd interface {
|
||||||
Add(types.RoomID, State) error
|
Add(types.RoomID, *state.State) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type CanRemove interface {
|
type CanRemove interface {
|
||||||
Remove(types.RoomID, State) error
|
Remove(types.RoomID, *state.State) error
|
||||||
}
|
|
||||||
|
|
||||||
type Room interface {
|
|
||||||
CanAdd
|
|
||||||
CanRemove
|
|
||||||
ID() types.RoomID
|
|
||||||
GetParticipants() []types.ClientID
|
|
||||||
io.Closer
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type CanGetRoom interface {
|
type CanGetRoom interface {
|
||||||
GetRoom(id types.RoomID) (Room, error)
|
GetRoom(id types.RoomID) (*room.Room, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type CanWriteRoomMessage interface {
|
type CanWriteRoomMessage interface {
|
||||||
@@ -33,33 +26,9 @@ type CanWriteRoomMessage interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type CanCreateRoom interface {
|
type CanCreateRoom interface {
|
||||||
CreateRoom(types.RoomID, []types.ClientID, time.Duration) (Room, error)
|
CreateRoom(types.RoomID, []types.ClientID, time.Duration) (*room.Room, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type CanDeleteRoom interface {
|
type CanDeleteRoom interface {
|
||||||
DeleteRoom(types.RoomID) error
|
DeleteRoom(types.RoomID) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type RoomManager interface {
|
|
||||||
CanCreateRoom
|
|
||||||
CanDeleteRoom
|
|
||||||
CanGetRoom
|
|
||||||
}
|
|
||||||
|
|
||||||
type RoomProcessor interface {
|
|
||||||
Process(CanProcess, State) error
|
|
||||||
}
|
|
||||||
|
|
||||||
type RoomProcessorBackground interface {
|
|
||||||
ProcessBackground(CanProcessBackground, State) CanProcessBackground
|
|
||||||
}
|
|
||||||
|
|
||||||
type CanProcess interface {
|
|
||||||
Process(CanGetRoom, State) error
|
|
||||||
}
|
|
||||||
|
|
||||||
type CanProcessBackground interface {
|
|
||||||
ProcessBackground(room CanGetRoom, state State) CanProcessBackground
|
|
||||||
Wait() error
|
|
||||||
Stop()
|
|
||||||
}
|
|
||||||
|
@@ -1,28 +1,17 @@
|
|||||||
package interfaces
|
package interfaces
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
|
|
||||||
"github.com/harshabose/socket-comm/pkg/interceptor"
|
"github.com/harshabose/socket-comm/pkg/interceptor"
|
||||||
"github.com/harshabose/socket-comm/pkg/message"
|
"github.com/harshabose/socket-comm/pkg/message"
|
||||||
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
|
"github.com/harshabose/socket-comm/pkg/middleware/chat/state"
|
||||||
)
|
)
|
||||||
|
|
||||||
type CanSetClientID interface {
|
|
||||||
SetClientID(id types.ClientID) error
|
|
||||||
}
|
|
||||||
|
|
||||||
type State interface {
|
|
||||||
GetClientID() (types.ClientID, error)
|
|
||||||
Ctx() context.Context
|
|
||||||
}
|
|
||||||
|
|
||||||
type CanGetState interface {
|
type CanGetState interface {
|
||||||
GetState(interceptor.Connection) (State, error)
|
GetState(interceptor.Connection) (*state.State, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type CanSetState interface {
|
type CanSetState interface {
|
||||||
SetState(interceptor.Connection, State) error
|
SetState(interceptor.Connection, *state.State) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type CanRemoveState interface {
|
type CanRemoveState interface {
|
||||||
@@ -32,10 +21,3 @@ type CanRemoveState interface {
|
|||||||
type CanWriteMessage interface {
|
type CanWriteMessage interface {
|
||||||
Write(message.Message) error
|
Write(message.Message) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type StateManager interface {
|
|
||||||
CanGetState
|
|
||||||
CanSetState
|
|
||||||
CanRemoveState
|
|
||||||
ForEach(func(interceptor.Connection, State) error) error
|
|
||||||
}
|
|
||||||
|
@@ -34,21 +34,11 @@ func (m *Ident) ReadProcess(_i interceptor.Interceptor, connection interceptor.C
|
|||||||
return fmt.Errorf("error while processing 'Ident' message; err: %s", err.Error())
|
return fmt.Errorf("error while processing 'Ident' message; err: %s", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
c, ok := ss.(interfaces.CanSetClientID)
|
if err := ss.SetClientID(types.ClientID(m.CurrentHeader.Sender)); err != nil {
|
||||||
if !ok {
|
|
||||||
return fmt.Errorf("error while processing 'Ident' message; err: %s", errors.ErrInterfaceMisMatch.Error())
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := c.SetClientID(types.ClientID(m.CurrentHeader.Sender)); err != nil {
|
|
||||||
return fmt.Errorf("error while processing 'Ident' message; err: %s", err.Error())
|
return fmt.Errorf("error while processing 'Ident' message; err: %s", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
w, ok := ss.(interfaces.CanWriteMessage)
|
if err := ss.Write(&IdentResponse{}); err != nil {
|
||||||
if !ok {
|
|
||||||
return fmt.Errorf("error while processing 'Ident' message; err: %s", errors.ErrInterfaceMisMatch.Error())
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := w.Write(&IdentResponse{}); err != nil {
|
|
||||||
return fmt.Errorf("error while processing 'Ident' message; err: %s", err.Error())
|
return fmt.Errorf("error while processing 'Ident' message; err: %s", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -76,12 +66,7 @@ func (m *IdentResponse) ReadProcess(_i interceptor.Interceptor, connection inter
|
|||||||
return fmt.Errorf("error while processing 'Ident' message; err: %s", err.Error())
|
return fmt.Errorf("error while processing 'Ident' message; err: %s", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
c, ok := ss.(interfaces.CanSetClientID)
|
if err := ss.SetClientID(types.ClientID(m.CurrentHeader.Sender)); err != nil {
|
||||||
if !ok {
|
|
||||||
return fmt.Errorf("error while processing 'Ident' message; err: %s", errors.ErrInterfaceMisMatch.Error())
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := c.SetClientID(types.ClientID(m.CurrentHeader.Sender)); err != nil {
|
|
||||||
return fmt.Errorf("error while processing 'Ident' message; err: %s", err.Error())
|
return fmt.Errorf("error while processing 'Ident' message; err: %s", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -68,12 +68,7 @@ func (m *RequestHealth) ReadProcess(_i interceptor.Interceptor, connection inter
|
|||||||
return fmt.Errorf("error while read processing 'RequestHealth' msg; err: %s", err.Error())
|
return fmt.Errorf("error while read processing 'RequestHealth' msg; err: %s", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
w, ok := ss.(interfaces.CanWriteMessage)
|
if err := ss.Write(msg); err != nil {
|
||||||
if !ok {
|
|
||||||
return errors.ErrInterfaceMisMatch
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := w.Write(msg); err != nil {
|
|
||||||
return fmt.Errorf("error while read processing 'RequestHealth' msg; err: %s", err.Error())
|
return fmt.Errorf("error while read processing 'RequestHealth' msg; err: %s", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -6,8 +6,10 @@ import (
|
|||||||
|
|
||||||
"github.com/harshabose/socket-comm/pkg/interceptor"
|
"github.com/harshabose/socket-comm/pkg/interceptor"
|
||||||
"github.com/harshabose/socket-comm/pkg/message"
|
"github.com/harshabose/socket-comm/pkg/message"
|
||||||
|
"github.com/harshabose/socket-comm/pkg/middleware/chat"
|
||||||
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
|
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
|
||||||
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
|
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
|
||||||
|
"github.com/harshabose/socket-comm/pkg/middleware/chat/state"
|
||||||
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
|
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -32,31 +34,31 @@ func (m *CreateRoom) GetProtocol() message.Protocol {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *CreateRoom) ReadProcess(_i interceptor.Interceptor, connection interceptor.Connection) error {
|
func (m *CreateRoom) ReadProcess(_i interceptor.Interceptor, connection interceptor.Connection) error {
|
||||||
s, ok := _i.(interfaces.CanGetState)
|
i, ok := _i.(*chat.ServerInterceptor)
|
||||||
if !ok {
|
if !ok {
|
||||||
return errors.ErrInterfaceMisMatch
|
return errors.ErrInterfaceMisMatch
|
||||||
}
|
}
|
||||||
|
|
||||||
i, ok := _i.(interfaces.CanCreateRoom)
|
s, err := i.GetState(connection)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return i.Rooms.Process(m, s)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *CreateRoom) Process(p interfaces.Processor, s *state.State) error {
|
||||||
|
r, ok := p.(interfaces.CanCreateRoom)
|
||||||
if !ok {
|
if !ok {
|
||||||
return errors.ErrInterfaceMisMatch
|
return errors.ErrInterfaceMisMatch
|
||||||
}
|
}
|
||||||
|
|
||||||
ss, err := s.GetState(connection)
|
room, err := r.CreateRoom(m.RoomID, m.Allowed, m.TTL)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error while read processing 'CreateRoom' msg; err: %s", err.Error())
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
room, err := i.CreateRoom(m.RoomID, m.Allowed, m.TTL)
|
return room.Add(m.RoomID, s)
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("error while read processing 'CreateRoom' msg; err: %s", err.Error())
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := room.Add(m.RoomID, ss); err != nil {
|
|
||||||
return fmt.Errorf("error while read processing 'CreateRoom' msg; err: %s", err.Error())
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type DeleteRoom struct {
|
type DeleteRoom struct {
|
||||||
@@ -68,24 +70,28 @@ func (m *DeleteRoom) GetProtocol() message.Protocol {
|
|||||||
return DeleteRoomProtocol
|
return DeleteRoomProtocol
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *DeleteRoom) ReadProcess(i interceptor.Interceptor, _ interceptor.Connection) error {
|
func (m *DeleteRoom) ReadProcess(_i interceptor.Interceptor, _ interceptor.Connection) error {
|
||||||
d, ok := i.(interfaces.CanDeleteRoom)
|
i, ok := _i.(*chat.ServerInterceptor)
|
||||||
if !ok {
|
if !ok {
|
||||||
return errors.ErrInterfaceMisMatch
|
return errors.ErrInterfaceMisMatch
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := d.DeleteRoom(m.RoomID); err != nil {
|
return i.Rooms.Process(m, nil)
|
||||||
return fmt.Errorf("error while read processing 'DeleteRoom' msg; err: %s", errors.ErrMessageForServerOnly)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
func (m *DeleteRoom) Process(p interfaces.Processor, _ *state.State) error {
|
||||||
|
r, ok := p.(interfaces.CanDeleteRoom)
|
||||||
|
if !ok {
|
||||||
|
return errors.ErrInterfaceMisMatch
|
||||||
|
}
|
||||||
|
|
||||||
|
return r.DeleteRoom(m.RoomID)
|
||||||
}
|
}
|
||||||
|
|
||||||
type ToForwardMessage struct {
|
type ToForwardMessage struct {
|
||||||
interceptor.BaseMessage
|
interceptor.BaseMessage
|
||||||
RoomID types.RoomID `json:"room_id"`
|
RoomID types.RoomID `json:"room_id"`
|
||||||
From types.ClientID `json:"from"`
|
To []types.ClientID `json:"to"`
|
||||||
To types.ClientID `json:"to"`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *ToForwardMessage) GetProtocol() message.Protocol {
|
func (m *ToForwardMessage) GetProtocol() message.Protocol {
|
||||||
@@ -93,37 +99,40 @@ func (m *ToForwardMessage) GetProtocol() message.Protocol {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *ToForwardMessage) ReadProcess(_i interceptor.Interceptor, connection interceptor.Connection) error {
|
func (m *ToForwardMessage) ReadProcess(_i interceptor.Interceptor, connection interceptor.Connection) error {
|
||||||
|
i, ok := _i.(*chat.ServerInterceptor)
|
||||||
|
if !ok {
|
||||||
|
return errors.ErrInterfaceMisMatch
|
||||||
|
}
|
||||||
|
s, err := i.GetState(connection)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return i.Rooms.Process(m, s)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *ToForwardMessage) Process(p interfaces.Processor, s *state.State) error {
|
||||||
msg, err := newForwardedMessage(m)
|
msg, err := newForwardedMessage(m)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error while read processing 'ToForwardMessage'; err: %s", err.Error())
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
s, ok := _i.(interfaces.CanGetState)
|
clientID, err := s.GetClientID()
|
||||||
if !ok {
|
|
||||||
return fmt.Errorf("error while read processing 'ToForwardMessage'; err: %s", errors.ErrInterfaceMisMatch)
|
|
||||||
}
|
|
||||||
|
|
||||||
ss, err := s.GetState(connection)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error while read processing 'ToForwardMessage'; err: %s", errors.ErrInterfaceMisMatch)
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
clientID, err := ss.GetClientID()
|
if types.ClientID(m.CurrentHeader.Sender) != clientID {
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("error while read processing 'ToForwardMessage'; err: %s", errors.ErrInterfaceMisMatch)
|
|
||||||
}
|
|
||||||
|
|
||||||
if m.From != clientID {
|
|
||||||
return fmt.Errorf("error while read processing 'ToForwardMessage'; From and ClientID did not match")
|
return fmt.Errorf("error while read processing 'ToForwardMessage'; From and ClientID did not match")
|
||||||
}
|
}
|
||||||
|
|
||||||
w, ok := _i.(interfaces.CanWriteRoomMessage)
|
w, ok := p.(interfaces.CanWriteRoomMessage)
|
||||||
if !ok {
|
if !ok {
|
||||||
return errors.ErrInterfaceMisMatch
|
return errors.ErrInterfaceMisMatch
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := w.WriteRoomMessage(m.RoomID, msg, m.From, m.To); err != nil {
|
if err := w.WriteRoomMessage(m.RoomID, msg, clientID, m.To...); err != nil {
|
||||||
return fmt.Errorf("error while read processing 'ToForwardMessage' msg; err: %s", err.Error())
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@@ -160,17 +169,21 @@ func (m *JoinRoom) GetProtocol() message.Protocol {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *JoinRoom) ReadProcess(_i interceptor.Interceptor, connection interceptor.Connection) error {
|
func (m *JoinRoom) ReadProcess(_i interceptor.Interceptor, connection interceptor.Connection) error {
|
||||||
s, ok := _i.(interfaces.CanGetState)
|
i, ok := _i.(*chat.ServerInterceptor)
|
||||||
if !ok {
|
if !ok {
|
||||||
return errors.ErrInterfaceMisMatch
|
return errors.ErrInterfaceMisMatch
|
||||||
}
|
}
|
||||||
|
|
||||||
ss, err := s.GetState(connection)
|
s, err := i.GetState(connection)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error while read processing 'JoinRoom' msg; err: %s", err.Error())
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
a, ok := _i.(interfaces.CanAdd)
|
return i.Rooms.Process(m, s)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *JoinRoom) Process(p interfaces.Processor, s *state.State) error {
|
||||||
|
a, ok := p.(interfaces.CanAdd)
|
||||||
if !ok {
|
if !ok {
|
||||||
return errors.ErrInterfaceMisMatch
|
return errors.ErrInterfaceMisMatch
|
||||||
}
|
}
|
||||||
@@ -181,9 +194,9 @@ func (m *JoinRoom) ReadProcess(_i interceptor.Interceptor, connection intercepto
|
|||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-timer.C:
|
case <-timer.C:
|
||||||
return fmt.Errorf("error while read processing 'JoinRoom' msg; err: %s", errors.ErrMessageForServerOnly)
|
return fmt.Errorf("error while read processing 'JoinRoom' msg; err: %s", errors.ErrContextCancelled)
|
||||||
default:
|
default:
|
||||||
err := a.Add(m.RoomID, ss)
|
err := a.Add(m.RoomID, s)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -202,24 +215,24 @@ func (m *LeaveRoom) GetProtocol() message.Protocol {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *LeaveRoom) ReadProcess(_i interceptor.Interceptor, connection interceptor.Connection) error {
|
func (m *LeaveRoom) ReadProcess(_i interceptor.Interceptor, connection interceptor.Connection) error {
|
||||||
s, ok := _i.(interfaces.CanGetState)
|
i, ok := _i.(*chat.ServerInterceptor)
|
||||||
if !ok {
|
if !ok {
|
||||||
return errors.ErrInterfaceMisMatch
|
return errors.ErrInterfaceMisMatch
|
||||||
}
|
}
|
||||||
|
|
||||||
ss, err := s.GetState(connection)
|
s, err := i.GetState(connection)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error while read processing 'JoinRoom' msg; err: %s", err.Error())
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
a, ok := _i.(interfaces.CanRemove)
|
return i.Rooms.Process(m, s)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *LeaveRoom) Process(p interfaces.Processor, s *state.State) error {
|
||||||
|
r, ok := p.(interfaces.CanRemove)
|
||||||
if !ok {
|
if !ok {
|
||||||
return errors.ErrInterfaceMisMatch
|
return errors.ErrInterfaceMisMatch
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := a.Remove(m.RoomID, ss); err != nil {
|
return r.Remove(m.RoomID, s)
|
||||||
return fmt.Errorf("error while read processing 'JoinRoom' msg; err: %s", err.Error())
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
@@ -6,7 +6,9 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
|
||||||
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
|
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
|
||||||
|
"github.com/harshabose/socket-comm/pkg/middleware/chat/state"
|
||||||
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
|
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -33,7 +35,7 @@ func NewDeleteRoomWaiter(ctx context.Context, cancel context.CancelFunc, deleter
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *DeleteRoomWaiter) Process(r interfaces.CanGetRoom, _ interfaces.State) error {
|
func (p *DeleteRoomWaiter) Process(r interfaces.Processor, _ *state.State) error {
|
||||||
timer := time.NewTimer(p.ttl)
|
timer := time.NewTimer(p.ttl)
|
||||||
defer timer.Stop()
|
defer timer.Stop()
|
||||||
|
|
||||||
@@ -49,7 +51,7 @@ func (p *DeleteRoomWaiter) Process(r interfaces.CanGetRoom, _ interfaces.State)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *DeleteRoomWaiter) ProcessBackground(r interfaces.CanGetRoom, s interfaces.State) interfaces.CanProcessBackground {
|
func (p *DeleteRoomWaiter) ProcessBackground(r interfaces.Processor, s *state.State) interfaces.CanBeProcessedBackground {
|
||||||
go func() {
|
go func() {
|
||||||
if err := p.Process(r, s); err != nil {
|
if err := p.Process(r, s); err != nil {
|
||||||
p.mux.Lock()
|
p.mux.Lock()
|
||||||
@@ -76,7 +78,12 @@ func (p *DeleteRoomWaiter) Stop() {
|
|||||||
p.cancel()
|
p.cancel()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *DeleteRoomWaiter) process(r interfaces.CanGetRoom) error {
|
func (p *DeleteRoomWaiter) process(_r interfaces.Processor) error {
|
||||||
|
r, ok := _r.(interfaces.CanGetRoom)
|
||||||
|
if !ok {
|
||||||
|
return errors.ErrInterfaceMisMatch
|
||||||
|
}
|
||||||
|
|
||||||
room, err := r.GetRoom(p.roomid)
|
room, err := r.GetRoom(p.roomid)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error while processing DelteRoomWaiter process; err: %s", err.Error())
|
return fmt.Errorf("error while processing DelteRoomWaiter process; err: %s", err.Error())
|
||||||
|
33
pkg/middleware/chat/process/ident_starter.go
Normal file
33
pkg/middleware/chat/process/ident_starter.go
Normal file
@@ -0,0 +1,33 @@
|
|||||||
|
package process
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
|
||||||
|
"github.com/harshabose/socket-comm/pkg/middleware/chat/state"
|
||||||
|
)
|
||||||
|
|
||||||
|
type IdentInit struct {
|
||||||
|
ctx context.Context
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewIdentInit(ctx context.Context) *IdentInit {
|
||||||
|
return &IdentInit{
|
||||||
|
ctx: ctx,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *IdentInit) Process(r interfaces.Processor, s *state.State) error {
|
||||||
|
// TODO: SEND IDENT MESSAGE
|
||||||
|
if err := s.Write(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
waiter := NewIdentWaiter(p.ctx)
|
||||||
|
if err := waiter.Process(r, s); err != nil {
|
||||||
|
return fmt.Errorf("error while processing IdentInit process; err: %s", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
53
pkg/middleware/chat/process/ident_waiter.go
Normal file
53
pkg/middleware/chat/process/ident_waiter.go
Normal file
@@ -0,0 +1,53 @@
|
|||||||
|
package process
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
|
||||||
|
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
|
||||||
|
"github.com/harshabose/socket-comm/pkg/middleware/chat/state"
|
||||||
|
)
|
||||||
|
|
||||||
|
type IdentWaiterOptions func(*IdentWaiter)
|
||||||
|
|
||||||
|
func NewIdentWaiter(ctx context.Context, options ...IdentWaiterOptions) *IdentWaiter {
|
||||||
|
p := &IdentWaiter{
|
||||||
|
ctx: ctx,
|
||||||
|
duration: 500 * time.Millisecond,
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, option := range options {
|
||||||
|
option(p)
|
||||||
|
}
|
||||||
|
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
|
||||||
|
type IdentWaiter struct {
|
||||||
|
ctx context.Context
|
||||||
|
duration time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *IdentWaiter) Process(_ interfaces.Processor, s *state.State) error {
|
||||||
|
ticker := time.NewTicker(p.duration)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-p.ctx.Done():
|
||||||
|
return errors.ErrContextCancelled
|
||||||
|
case <-ticker.C:
|
||||||
|
if err := p.process(nil, s); err == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
fmt.Println("waiting for ident...")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *IdentWaiter) process(_ interfaces.Processor, s *state.State) error {
|
||||||
|
_, err := s.GetClientID()
|
||||||
|
return err
|
||||||
|
}
|
@@ -10,6 +10,7 @@ import (
|
|||||||
"github.com/harshabose/socket-comm/pkg/message"
|
"github.com/harshabose/socket-comm/pkg/message"
|
||||||
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
|
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
|
||||||
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
|
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
|
||||||
|
"github.com/harshabose/socket-comm/pkg/middleware/chat/state"
|
||||||
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
|
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -35,7 +36,7 @@ func NewSendMessageRoom(ctx context.Context, cancel context.CancelFunc, msgFacto
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *SendMessageRoom) Process(r interfaces.CanGetRoom, s interfaces.State) error {
|
func (p *SendMessageRoom) Process(r interfaces.Processor, s *state.State) error {
|
||||||
ticker := time.NewTicker(p.interval)
|
ticker := time.NewTicker(p.interval)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
|
||||||
@@ -51,9 +52,9 @@ func (p *SendMessageRoom) Process(r interfaces.CanGetRoom, s interfaces.State) e
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *SendMessageRoom) ProcessBackground(r interfaces.CanGetRoom, s interfaces.State) interfaces.CanProcessBackground {
|
func (p *SendMessageRoom) ProcessBackground(_r interfaces.Processor, s *state.State) interfaces.CanBeProcessedBackground {
|
||||||
go func() {
|
go func() {
|
||||||
if err := p.Process(r, s); err != nil {
|
if err := p.Process(_r, s); err != nil {
|
||||||
p.mux.Lock()
|
p.mux.Lock()
|
||||||
p.err = err
|
p.err = err
|
||||||
p.mux.Unlock()
|
p.mux.Unlock()
|
||||||
@@ -78,7 +79,12 @@ func (p *SendMessageRoom) Stop() {
|
|||||||
p.cancel()
|
p.cancel()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *SendMessageRoom) process(r interfaces.CanGetRoom, _ interfaces.State) error {
|
func (p *SendMessageRoom) process(_r interfaces.Processor, _ *state.State) error {
|
||||||
|
r, ok := _r.(interfaces.CanGetRoom)
|
||||||
|
if !ok {
|
||||||
|
return errors.ErrInterfaceMisMatch
|
||||||
|
}
|
||||||
|
|
||||||
room, err := r.GetRoom(p.roomid)
|
room, err := r.GetRoom(p.roomid)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -86,11 +92,6 @@ func (p *SendMessageRoom) process(r interfaces.CanGetRoom, _ interfaces.State) e
|
|||||||
|
|
||||||
participants := room.GetParticipants()
|
participants := room.GetParticipants()
|
||||||
|
|
||||||
w, ok := room.(interfaces.CanWriteRoomMessage)
|
|
||||||
if !ok {
|
|
||||||
return errors.ErrInterfaceMisMatch
|
|
||||||
}
|
|
||||||
|
|
||||||
merr := util.NewMultiError()
|
merr := util.NewMultiError()
|
||||||
|
|
||||||
for _, participant := range participants {
|
for _, participant := range participants {
|
||||||
@@ -100,7 +101,7 @@ func (p *SendMessageRoom) process(r interfaces.CanGetRoom, _ interfaces.State) e
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := w.WriteRoomMessage(p.roomid, msg, "", participant); err != nil {
|
if err := room.WriteRoomMessage(p.roomid, msg, "", participant); err != nil {
|
||||||
merr.Add(err)
|
merr.Add(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -1,57 +0,0 @@
|
|||||||
package process
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
|
|
||||||
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
|
|
||||||
)
|
|
||||||
|
|
||||||
type WaitUntilIdent struct {
|
|
||||||
ctx context.Context
|
|
||||||
duration time.Duration
|
|
||||||
}
|
|
||||||
|
|
||||||
type WaitUntilIdentOption func(*WaitUntilIdent)
|
|
||||||
|
|
||||||
func WithTickerDuration(duration time.Duration) WaitUntilIdentOption {
|
|
||||||
return WaitUntilIdentOption(func(ident *WaitUntilIdent) {
|
|
||||||
ident.duration = duration
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewWaitUntilIdentComplete(ctx context.Context, options ...WaitUntilIdentOption) *WaitUntilIdent {
|
|
||||||
i := &WaitUntilIdent{
|
|
||||||
ctx: ctx,
|
|
||||||
duration: 500 * time.Millisecond,
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, option := range options {
|
|
||||||
option(i)
|
|
||||||
}
|
|
||||||
|
|
||||||
return i
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p WaitUntilIdent) Process(_ interfaces.CanGetRoom, s interfaces.State) error {
|
|
||||||
ticker := time.NewTicker(p.duration)
|
|
||||||
defer ticker.Stop()
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ticker.C:
|
|
||||||
if p.process(s) {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
case <-p.ctx.Done():
|
|
||||||
return fmt.Errorf("error while processing WaitUntilIdent process; err: %s", errors.ErrContextCancelled.Error())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p WaitUntilIdent) process(s interfaces.State) bool {
|
|
||||||
_, err := s.GetClientID()
|
|
||||||
return err == nil
|
|
||||||
}
|
|
21
pkg/middleware/chat/processors/health.go
Normal file
21
pkg/middleware/chat/processors/health.go
Normal file
@@ -0,0 +1,21 @@
|
|||||||
|
package processors
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
|
||||||
|
"github.com/harshabose/socket-comm/pkg/middleware/chat/state"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Health struct {
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Health) Process(process interfaces.CanBeProcessed, state *state.State) error {
|
||||||
|
return process.Process(p, state)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Health) ProcessBackground(process interfaces.CanBeProcessedBackground, state *state.State) interfaces.CanBeProcessedBackground {
|
||||||
|
return process.ProcessBackground(p, state)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Health) GetSnapshot() {
|
||||||
|
|
||||||
|
}
|
106
pkg/middleware/chat/processors/room.go
Normal file
106
pkg/middleware/chat/processors/room.go
Normal file
@@ -0,0 +1,106 @@
|
|||||||
|
package processors
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/harshabose/socket-comm/pkg/message"
|
||||||
|
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
|
||||||
|
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
|
||||||
|
"github.com/harshabose/socket-comm/pkg/middleware/chat/messages"
|
||||||
|
"github.com/harshabose/socket-comm/pkg/middleware/chat/process"
|
||||||
|
"github.com/harshabose/socket-comm/pkg/middleware/chat/room"
|
||||||
|
"github.com/harshabose/socket-comm/pkg/middleware/chat/state"
|
||||||
|
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
type RoomManager struct {
|
||||||
|
rooms map[types.RoomID]*room.Room
|
||||||
|
mux sync.RWMutex
|
||||||
|
ctx context.Context
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: DO I NEED MUX HERE?
|
||||||
|
|
||||||
|
func (m *RoomManager) Add(id types.RoomID, s *state.State) error {
|
||||||
|
r, err := m.GetRoom(id)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return r.Add(id, s)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *RoomManager) Remove(id types.RoomID, s *state.State) error {
|
||||||
|
r, err := m.GetRoom(id)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return r.Remove(id, s)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *RoomManager) CreateRoom(id types.RoomID, allowed []types.ClientID, ttl time.Duration) (*room.Room, error) {
|
||||||
|
if m.exists(id) {
|
||||||
|
return nil, fmt.Errorf("error while creating r with id %s; err: %s", id, errors.ErrRoomAlreadyExists)
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(m.ctx, ttl)
|
||||||
|
r := room.NewRoom(ctx, cancel, id, allowed)
|
||||||
|
|
||||||
|
m.rooms[id] = r
|
||||||
|
|
||||||
|
// NOTE: THE FOLLOWING STARTS A BACKGROUND PROCESS WHICH WAITS UNTIL TTL AND KILLS THE ROOM. THIS DOES NOT KILL CONNECTION
|
||||||
|
_ = m.ProcessBackground(process.NewDeleteRoomWaiter(ctx, cancel, m, id, ttl), nil)
|
||||||
|
|
||||||
|
// NOTE: THE FOLLOWING STARTS A BACKGROUND PROCESS WHICH CONSTANTLY SENDS THE GIVEN MESSAGE TO EVERY PARTICIPANT IN THE ROOM
|
||||||
|
_ = m.ProcessBackground(process.NewSendMessageRoom(ctx, cancel, messages.NewRequestHealthFactory(id), id, 5*time.Second), nil)
|
||||||
|
return r, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *RoomManager) GetRoom(id types.RoomID) (*room.Room, error) {
|
||||||
|
exists := m.exists(id)
|
||||||
|
if !exists {
|
||||||
|
return nil, fmt.Errorf("error while getting room with id %s; err: %s", id, errors.ErrRoomNotFound)
|
||||||
|
}
|
||||||
|
|
||||||
|
return m.rooms[id], nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *RoomManager) exists(id types.RoomID) bool {
|
||||||
|
_, exists := m.rooms[id]
|
||||||
|
return exists
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *RoomManager) DeleteRoom(id types.RoomID) error {
|
||||||
|
r, err := m.GetRoom(id)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error while deleting r with id: %s; err: %s", id, err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := r.Close(); err != nil {
|
||||||
|
return fmt.Errorf("error while deleting r with id: %s; err: %s", id, err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
delete(m.rooms, id)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *RoomManager) WriteRoomMessage(roomid types.RoomID, msg message.Message, from types.ClientID, tos ...types.ClientID) error {
|
||||||
|
r, err := m.GetRoom(roomid)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return r.WriteRoomMessage(roomid, msg, from, tos...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *RoomManager) Process(process interfaces.CanBeProcessed, state *state.State) error {
|
||||||
|
return process.Process(m, state)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *RoomManager) ProcessBackground(process interfaces.CanBeProcessedBackground, state *state.State) interfaces.CanBeProcessedBackground {
|
||||||
|
return process.ProcessBackground(m, state)
|
||||||
|
}
|
@@ -1,105 +0,0 @@
|
|||||||
package room
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/harshabose/socket-comm/pkg/message"
|
|
||||||
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
|
|
||||||
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
|
|
||||||
"github.com/harshabose/socket-comm/pkg/middleware/chat/messages"
|
|
||||||
"github.com/harshabose/socket-comm/pkg/middleware/chat/process"
|
|
||||||
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Manager struct {
|
|
||||||
rooms map[types.RoomID]interfaces.Room
|
|
||||||
ctx context.Context
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Manager) Add(id types.RoomID, s interfaces.State) error {
|
|
||||||
room, err := m.GetRoom(id)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return room.Add(id, s)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Manager) Remove(id types.RoomID, s interfaces.State) error {
|
|
||||||
room, err := m.GetRoom(id)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return room.Remove(id, s)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Manager) CreateRoom(id types.RoomID, allowed []types.ClientID, ttl time.Duration) (interfaces.Room, error) {
|
|
||||||
if m.exists(id) {
|
|
||||||
return nil, fmt.Errorf("error while creating room with id %s; err: %s", id, errors.ErrRoomAlreadyExists)
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(m.ctx, ttl)
|
|
||||||
room := NewRoom(ctx, cancel, id, allowed)
|
|
||||||
|
|
||||||
m.rooms[id] = room
|
|
||||||
|
|
||||||
// NOTE: THE FOLLOWING STARTS A BACKGROUND PROCESS WHICH WAITS UNTIL TTL AND KILLS THE ROOM. THIS DOES NOT KILL CONNECTION
|
|
||||||
_ = m.ProcessBackground(process.NewDeleteRoomWaiter(ctx, cancel, m, id, ttl), nil)
|
|
||||||
|
|
||||||
// NOTE: THE FOLLOWING STARTS A BACKGROUND PROCESS WHICH CONSTANTLY SENDS THE GIVEN MESSAGE TO EVERY PARTICIPANT IN THE ROOM
|
|
||||||
_ = m.ProcessBackground(process.NewSendMessageRoom(ctx, cancel, messages.NewRequestHealthFactory(id), id, 5*time.Second), nil)
|
|
||||||
return room, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Manager) GetRoom(id types.RoomID) (interfaces.Room, error) {
|
|
||||||
exists := m.exists(id)
|
|
||||||
if !exists {
|
|
||||||
return nil, fmt.Errorf("error while getting room with id %s; err: %s", id, errors.ErrRoomNotFound)
|
|
||||||
}
|
|
||||||
|
|
||||||
return m.rooms[id], nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Manager) exists(id types.RoomID) bool {
|
|
||||||
_, exists := m.rooms[id]
|
|
||||||
return exists
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Manager) DeleteRoom(id types.RoomID) error {
|
|
||||||
room, err := m.GetRoom(id)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("error while deleting room with id: %s; err: %s", id, err.Error())
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := room.Close(); err != nil {
|
|
||||||
return fmt.Errorf("error while deleting room with id: %s; err: %s", id, err.Error())
|
|
||||||
}
|
|
||||||
|
|
||||||
delete(m.rooms, id)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Manager) WriteRoomMessage(roomid types.RoomID, msg message.Message, from types.ClientID, tos ...types.ClientID) error {
|
|
||||||
room, err := m.GetRoom(roomid)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
w, ok := room.(interfaces.CanWriteRoomMessage)
|
|
||||||
if !ok {
|
|
||||||
return errors.ErrInterfaceMisMatch
|
|
||||||
}
|
|
||||||
|
|
||||||
return w.WriteRoomMessage(roomid, msg, from, tos...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Manager) Process(process interfaces.CanProcess, state interfaces.State) error {
|
|
||||||
return process.Process(m, state)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Manager) ProcessBackground(process interfaces.CanProcessBackground, state interfaces.State) interfaces.CanProcessBackground {
|
|
||||||
return process.ProcessBackground(m, state)
|
|
||||||
}
|
|
@@ -6,7 +6,7 @@ import (
|
|||||||
|
|
||||||
"github.com/harshabose/socket-comm/pkg/message"
|
"github.com/harshabose/socket-comm/pkg/message"
|
||||||
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
|
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
|
||||||
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
|
"github.com/harshabose/socket-comm/pkg/middleware/chat/state"
|
||||||
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
|
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -14,18 +14,20 @@ type Room struct {
|
|||||||
// NOTE: MAYBE A CONFIG FOR ROOM?
|
// NOTE: MAYBE A CONFIG FOR ROOM?
|
||||||
roomid types.RoomID
|
roomid types.RoomID
|
||||||
allowed []types.ClientID
|
allowed []types.ClientID
|
||||||
participants map[types.ClientID]interfaces.State
|
participants map[types.ClientID]*state.State
|
||||||
cancel context.CancelFunc
|
cancel context.CancelFunc
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: DO I NEED MUX HERE?
|
||||||
|
|
||||||
func NewRoom(ctx context.Context, cancel context.CancelFunc, id types.RoomID, allowed []types.ClientID) *Room {
|
func NewRoom(ctx context.Context, cancel context.CancelFunc, id types.RoomID, allowed []types.ClientID) *Room {
|
||||||
return &Room{
|
return &Room{
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
cancel: cancel,
|
cancel: cancel,
|
||||||
roomid: id,
|
roomid: id,
|
||||||
allowed: allowed,
|
allowed: allowed,
|
||||||
participants: make(map[types.ClientID]interfaces.State),
|
participants: make(map[types.ClientID]*state.State),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -33,7 +35,7 @@ func (r *Room) ID() types.RoomID {
|
|||||||
return r.roomid
|
return r.roomid
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Room) Add(roomid types.RoomID, s interfaces.State) error {
|
func (r *Room) Add(roomid types.RoomID, s *state.State) error {
|
||||||
if roomid != r.roomid {
|
if roomid != r.roomid {
|
||||||
return errors.ErrWrongRoom
|
return errors.ErrWrongRoom
|
||||||
}
|
}
|
||||||
@@ -93,7 +95,7 @@ func (r *Room) forEachBoolean(f func(id types.ClientID) bool, ids ...types.Clien
|
|||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Room) Remove(roomid types.RoomID, s interfaces.State) error {
|
func (r *Room) Remove(roomid types.RoomID, s *state.State) error {
|
||||||
if roomid != r.roomid {
|
if roomid != r.roomid {
|
||||||
return errors.ErrWrongRoom
|
return errors.ErrWrongRoom
|
||||||
}
|
}
|
||||||
@@ -152,12 +154,7 @@ func (r *Room) WriteRoomMessage(roomid types.RoomID, msg message.Message, from t
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, to := range tos {
|
for _, to := range tos {
|
||||||
s, ok := (r.participants[to]).(interfaces.CanWriteMessage)
|
if err := r.participants[to].Write(msg); err != nil {
|
||||||
if !ok {
|
|
||||||
return errors.ErrInterfaceMisMatch
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := s.Write(msg); err != nil {
|
|
||||||
return fmt.Errorf("error while sending message to peer in room; err: %s", err.Error())
|
return fmt.Errorf("error while sending message to peer in room; err: %s", err.Error())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -181,7 +178,7 @@ func (r *Room) GetParticipants() []types.ClientID {
|
|||||||
|
|
||||||
func (r *Room) Close() error {
|
func (r *Room) Close() error {
|
||||||
r.cancel()
|
r.cancel()
|
||||||
r.participants = make(map[types.ClientID]interfaces.State)
|
r.participants = make(map[types.ClientID]*state.State)
|
||||||
r.allowed = make([]types.ClientID, 0)
|
r.allowed = make([]types.ClientID, 0)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@@ -6,17 +6,14 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/harshabose/socket-comm/pkg/interceptor"
|
"github.com/harshabose/socket-comm/pkg/interceptor"
|
||||||
"github.com/harshabose/socket-comm/pkg/message"
|
|
||||||
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
|
|
||||||
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
|
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
|
||||||
"github.com/harshabose/socket-comm/pkg/middleware/chat/process"
|
"github.com/harshabose/socket-comm/pkg/middleware/chat/process"
|
||||||
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type ServerInterceptor struct {
|
type ServerInterceptor struct {
|
||||||
commonInterceptor
|
commonInterceptor
|
||||||
interceptor.Interceptor
|
Rooms interfaces.Processor
|
||||||
rooms interfaces.RoomManager
|
Health interfaces.Processor
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i *ServerInterceptor) BindSocketConnection(connection interceptor.Connection, writer interceptor.Writer, reader interceptor.Reader) (interceptor.Writer, interceptor.Reader, error) {
|
func (i *ServerInterceptor) BindSocketConnection(connection interceptor.Connection, writer interceptor.Writer, reader interceptor.Reader) (interceptor.Writer, interceptor.Reader, error) {
|
||||||
@@ -29,21 +26,11 @@ func (i *ServerInterceptor) Init(connection interceptor.Connection) error {
|
|||||||
return fmt.Errorf("error while init; err: %s", err.Error())
|
return fmt.Errorf("error while init; err: %s", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
w, ok := s.(interfaces.CanWriteMessage)
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
if !ok {
|
|
||||||
return errors.ErrInterfaceMisMatch
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: SEND IDENT MESSAGE
|
|
||||||
if err := w.Write(); err != nil {
|
|
||||||
return fmt.Errorf("error while init; err: %s", err.Error())
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(s.Ctx(), 10*time.Second)
|
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
p := process.NewWaitUntilIdentComplete(ctx, process.WithTickerDuration(500*time.Millisecond))
|
p := process.NewIdentInit(ctx)
|
||||||
if err := p.Process(nil, s); err != nil {
|
if err := p.Process(i.Rooms, s); err != nil {
|
||||||
return fmt.Errorf("error while init; err: %s", err.Error())
|
return fmt.Errorf("error while init; err: %s", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -57,38 +44,3 @@ func (i *ServerInterceptor) UnBindSocketConnection(connection interceptor.Connec
|
|||||||
func (i *ServerInterceptor) Close() error {
|
func (i *ServerInterceptor) Close() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i *ServerInterceptor) CreateRoom(id types.RoomID, allowed []types.ClientID, ttl time.Duration) (interfaces.Room, error) {
|
|
||||||
return i.rooms.CreateRoom(id, allowed, ttl)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (i *ServerInterceptor) DeleteRoom(id types.RoomID) error {
|
|
||||||
return i.rooms.DeleteRoom(id)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (i *ServerInterceptor) WriteRoomMessage(roomid types.RoomID, msg message.Message, from types.ClientID, tos ...types.ClientID) error {
|
|
||||||
w, ok := i.rooms.(interfaces.CanWriteRoomMessage)
|
|
||||||
if !ok {
|
|
||||||
return errors.ErrInterfaceMisMatch
|
|
||||||
}
|
|
||||||
|
|
||||||
return w.WriteRoomMessage(roomid, msg, from, tos...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (i *ServerInterceptor) Add(id types.RoomID, s interfaces.State) error {
|
|
||||||
a, ok := i.rooms.(interfaces.CanAdd)
|
|
||||||
if !ok {
|
|
||||||
return errors.ErrInterfaceMisMatch
|
|
||||||
}
|
|
||||||
|
|
||||||
return a.Add(id, s)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (i *ServerInterceptor) Remove(id types.RoomID, s interfaces.State) error {
|
|
||||||
r, ok := i.rooms.(interfaces.CanRemove)
|
|
||||||
if !ok {
|
|
||||||
return errors.ErrInterfaceMisMatch
|
|
||||||
}
|
|
||||||
|
|
||||||
return r.Remove(id, s)
|
|
||||||
}
|
|
||||||
|
@@ -6,27 +6,26 @@ import (
|
|||||||
"github.com/harshabose/socket-comm/internal/util"
|
"github.com/harshabose/socket-comm/internal/util"
|
||||||
"github.com/harshabose/socket-comm/pkg/interceptor"
|
"github.com/harshabose/socket-comm/pkg/interceptor"
|
||||||
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
|
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
|
||||||
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type Manager struct {
|
type Manager struct {
|
||||||
states map[interceptor.Connection]interfaces.State
|
states map[interceptor.Connection]*State
|
||||||
mux sync.RWMutex
|
mux sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Manager) GetState(connection interceptor.Connection) (interfaces.State, error) {
|
func (m *Manager) GetState(connection interceptor.Connection) (*State, error) {
|
||||||
m.mux.RLock()
|
m.mux.RLock()
|
||||||
defer m.mux.RUnlock()
|
defer m.mux.RUnlock()
|
||||||
|
|
||||||
state, exists := m.states[connection]
|
s, exists := m.states[connection]
|
||||||
if !exists {
|
if !exists {
|
||||||
return nil, errors.ErrConnectionNotFound
|
return nil, errors.ErrConnectionNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
return state, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Manager) SetState(connection interceptor.Connection, s interfaces.State) error {
|
func (m *Manager) SetState(connection interceptor.Connection, s *State) error {
|
||||||
m.mux.Lock()
|
m.mux.Lock()
|
||||||
defer m.mux.Unlock()
|
defer m.mux.Unlock()
|
||||||
|
|
||||||
@@ -38,7 +37,7 @@ func (m *Manager) SetState(connection interceptor.Connection, s interfaces.State
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// RemoveState removes a connection's state
|
// RemoveState removes a connection's State
|
||||||
func (m *Manager) RemoveState(connection interceptor.Connection) error {
|
func (m *Manager) RemoveState(connection interceptor.Connection) error {
|
||||||
m.mux.Lock()
|
m.mux.Lock()
|
||||||
defer m.mux.Unlock()
|
defer m.mux.Unlock()
|
||||||
@@ -52,14 +51,14 @@ func (m *Manager) RemoveState(connection interceptor.Connection) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ForEach executes the provided function for each state in the manager
|
// ForEach executes the provided function for each State in the manager
|
||||||
func (m *Manager) ForEach(fn func(connection interceptor.Connection, state interfaces.State) error) error {
|
func (m *Manager) ForEach(fn func(connection interceptor.Connection, state *State) error) error {
|
||||||
m.mux.RLock()
|
m.mux.RLock()
|
||||||
defer m.mux.RUnlock()
|
defer m.mux.RUnlock()
|
||||||
|
|
||||||
var errs util.MultiError
|
var errs util.MultiError
|
||||||
for conn, state := range m.states {
|
for conn, s := range m.states {
|
||||||
if err := fn(conn, state); err != nil {
|
if err := fn(conn, s); err != nil {
|
||||||
errs.Add(err)
|
errs.Add(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -6,11 +6,10 @@ import (
|
|||||||
"github.com/harshabose/socket-comm/pkg/interceptor"
|
"github.com/harshabose/socket-comm/pkg/interceptor"
|
||||||
"github.com/harshabose/socket-comm/pkg/message"
|
"github.com/harshabose/socket-comm/pkg/message"
|
||||||
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
|
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
|
||||||
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
|
|
||||||
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
|
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
type state struct {
|
type State struct {
|
||||||
id types.ClientID
|
id types.ClientID
|
||||||
connection interceptor.Connection
|
connection interceptor.Connection
|
||||||
writer interceptor.Writer
|
writer interceptor.Writer
|
||||||
@@ -19,8 +18,8 @@ type state struct {
|
|||||||
ctx context.Context
|
ctx context.Context
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewState(ctx context.Context, cancel context.CancelFunc, connection interceptor.Connection, writer interceptor.Writer, reader interceptor.Reader) interfaces.State {
|
func NewState(ctx context.Context, cancel context.CancelFunc, connection interceptor.Connection, writer interceptor.Writer, reader interceptor.Reader) *State {
|
||||||
return &state{
|
return &State{
|
||||||
id: types.UnKnownClient,
|
id: types.UnKnownClient,
|
||||||
connection: connection,
|
connection: connection,
|
||||||
writer: writer,
|
writer: writer,
|
||||||
@@ -30,11 +29,11 @@ func NewState(ctx context.Context, cancel context.CancelFunc, connection interce
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *state) Ctx() context.Context {
|
func (s *State) Ctx() context.Context {
|
||||||
return s.ctx
|
return s.ctx
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *state) GetClientID() (types.ClientID, error) {
|
func (s *State) GetClientID() (types.ClientID, error) {
|
||||||
if s.id == types.UnKnownClient {
|
if s.id == types.UnKnownClient {
|
||||||
return s.id, errors.ErrUnknownClientIDState
|
return s.id, errors.ErrUnknownClientIDState
|
||||||
}
|
}
|
||||||
@@ -42,11 +41,11 @@ func (s *state) GetClientID() (types.ClientID, error) {
|
|||||||
return s.id, nil
|
return s.id, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *state) Write(msg message.Message) error {
|
func (s *State) Write(msg message.Message) error {
|
||||||
return s.writer.Write(s.connection, msg)
|
return s.writer.Write(s.connection, msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *state) SetClientID(id types.ClientID) error {
|
func (s *State) SetClientID(id types.ClientID) error {
|
||||||
if s.id != types.UnKnownClient {
|
if s.id != types.UnKnownClient {
|
||||||
return errors.ErrClientIDNotConsistent
|
return errors.ErrClientIDNotConsistent
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user