in-process of converting interceptors to client and server interceptors

This commit is contained in:
harshabose
2025-05-06 21:08:46 +05:30
parent 2672ffe223
commit 5d89fe91d6
19 changed files with 810 additions and 288 deletions

View File

@@ -66,6 +66,8 @@ type Message interface {
GetNextProtocol() Protocol GetNextProtocol() Protocol
GetCurrentHeader() Header
// GetNext retrieves the next message in the chain, if one exists // GetNext retrieves the next message in the chain, if one exists
// Returns nil, nil if there is no next message // Returns nil, nil if there is no next message
GetNext(Registry) (Message, error) GetNext(Registry) (Message, error)
@@ -127,6 +129,10 @@ func (m *BaseMessage) GetNextProtocol() Protocol {
return m.NextProtocol return m.NextProtocol
} }
func (m *BaseMessage) GetCurrentHeader() Header {
return m.CurrentHeader
}
// GetNext retrieves the next message in the chain, if one exists. // GetNext retrieves the next message in the chain, if one exists.
// Returns nil, nil if NextProtocol is NoneProtocol. // Returns nil, nil if NextProtocol is NoneProtocol.
// Uses the provided Registry to create and unmarshal the next message. // Uses the provided Registry to create and unmarshal the next message.

View File

@@ -0,0 +1,13 @@
package chat
import (
"github.com/harshabose/socket-comm/pkg/interceptor"
)
type ClientInterceptor struct {
commonInterceptor
}
func (i *ClientInterceptor) BindSocketConnection(connection interceptor.Connection, writer interceptor.Writer, reader interceptor.Reader) (interceptor.Writer, interceptor.Reader, error) {
return i.commonInterceptor.BindSocketConnection(connection, writer, reader)
}

View File

@@ -0,0 +1,103 @@
package chat
import (
"context"
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/message"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
"github.com/harshabose/socket-comm/pkg/middleware/chat/state"
)
type commonInterceptor struct {
interceptor.NoOpInterceptor
readProcessMessages message.Registry
writeProcessMessages message.Registry
states interfaces.StateManager
}
func (i *commonInterceptor) BindSocketConnection(connection interceptor.Connection, writer interceptor.Writer, reader interceptor.Reader) (interceptor.Writer, interceptor.Reader, error) {
ctx, cancel := context.WithCancel(i.Ctx())
if err := i.states.SetState(connection, state.NewState(ctx, cancel, connection, writer, reader)); err != nil {
cancel()
return nil, nil, err
}
return writer, reader, nil
}
func (i *commonInterceptor) InterceptSocketWriter(writer interceptor.Writer) interceptor.Writer {
return interceptor.WriterFunc(func(connection interceptor.Connection, msg message.Message) error {
if msg == nil {
return nil
}
if !i.writeProcessMessages.Check(msg.GetProtocol()) {
return writer.Write(connection, msg)
}
m, ok := msg.(interceptor.Message)
if !ok {
return writer.Write(connection, msg)
}
next, err := m.GetNext(nil)
if err != nil {
return writer.Write(connection, msg)
}
if err := m.WriteProcess(i, connection); err != nil {
return writer.Write(connection, next)
}
return writer.Write(connection, next)
})
}
func (i *commonInterceptor) InterceptSocketReader(reader interceptor.Reader) interceptor.Reader {
return interceptor.ReaderFunc(func(connection interceptor.Connection) (message.Message, error) {
msg, err := reader.Read(connection)
if err != nil {
return msg, err
}
if msg == nil {
return nil, nil
}
if !i.readProcessMessages.Check(msg.GetProtocol()) {
return msg, nil
}
m, ok := msg.(interceptor.Message)
if !ok {
return msg, errors.ErrInterfaceMisMatch
}
next, err := m.GetNext(nil)
if err != nil {
return msg, nil
}
if err := m.ReadProcess(i, connection); err != nil {
return next, nil
}
return next, nil
})
}
func (i *commonInterceptor) UnBindSocketConnection(connection interceptor.Connection) {
}
func (i *commonInterceptor) Close() error {
// TODO: FIGURE OUR GOOD CLOSE STRATEGY
return nil
}
func (i *commonInterceptor) GetState(connection interceptor.Connection) (interfaces.State, error) {
return i.states.GetState(connection)
}

View File

@@ -1,5 +0,0 @@
package config
type Config struct {
IsServer bool
}

View File

@@ -3,9 +3,11 @@ package errors
import "errors" import "errors"
var ( var (
ErrContextCancelled = errors.New("context cancelled")
ErrInterfaceMisMatch = errors.New("unsatisfied interface triggered") ErrInterfaceMisMatch = errors.New("unsatisfied interface triggered")
ErrMessageForServerOnly = errors.New("message should only be processed by server") ErrMessageForServerOnly = errors.New("message should only be processed by server")
ErrMessageForClientOnly = errors.New("message should only be processed by client") ErrMessageForClientOnly = errors.New("message should only be processed by client")
ErrClientIDNotConsistent = errors.New("client id is not consistent throughout the connection")
ErrConnectionNotFound = errors.New("connection not registered") ErrConnectionNotFound = errors.New("connection not registered")
ErrConnectionExists = errors.New("connection already exists") ErrConnectionExists = errors.New("connection already exists")
@@ -19,3 +21,7 @@ var (
ErrClientNotAParticipant = errors.New("client is not a participant in the room at the moment") ErrClientNotAParticipant = errors.New("client is not a participant in the room at the moment")
ErrWrongRoom = errors.New("operation not permitted as room id did not match") ErrWrongRoom = errors.New("operation not permitted as room id did not match")
) )
func New(text string) error {
return errors.New(text)
}

View File

@@ -1,115 +0,0 @@
package chat
import (
"context"
"time"
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/message"
"github.com/harshabose/socket-comm/pkg/middleware/chat/config"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
"github.com/harshabose/socket-comm/pkg/middleware/chat/state"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
)
type Interceptor struct {
interceptor.NoOpInterceptor
localMessageRegistry message.Registry
states interfaces.StateManager
rooms interfaces.RoomManager
config config.Config
}
func (i *Interceptor) BindSocketConnection(connection interceptor.Connection, writer interceptor.Writer, reader interceptor.Reader) (interceptor.Writer, interceptor.Reader, error) {
ctx, cancel := context.WithCancel(i.Ctx())
if err := i.states.SetState(connection, state.NewState(ctx, cancel, i.config, connection, writer, reader)); err != nil {
cancel()
return nil, nil, err
}
return writer, reader, nil
}
func (i *Interceptor) Init(connection interceptor.Connection) error {
// TODO: ADD DEFAULT ROOM CREATION / JOIN ROOM
return nil
}
func (i *Interceptor) InterceptSocketWriter(writer interceptor.Writer) interceptor.Writer {
return interceptor.WriterFunc(func(connection interceptor.Connection, message message.Message) error {
return writer.Write(connection, message)
})
}
func (i *Interceptor) InterceptSocketReader(reader interceptor.Reader) interceptor.Reader {
return interceptor.ReaderFunc(func(connection interceptor.Connection) (message.Message, error) {
msg, err := reader.Read(connection)
if err != nil {
return msg, err
}
if !i.localMessageRegistry.Check(msg.GetProtocol()) {
return msg, nil
}
m, ok := msg.(interceptor.Message)
if !ok {
return msg, errors.ErrInterfaceMisMatch
}
if err := m.ReadProcess(i, connection); err != nil {
return nil, err
}
return m.GetNext(i.GetMessageRegistry())
})
}
func (i *Interceptor) UnBindSocketConnection(connection interceptor.Connection) {
}
func (i *Interceptor) Close() error {
return nil
}
func (i *Interceptor) GetState(connection interceptor.Connection) (interfaces.State, error) {
return i.states.GetState(connection)
}
func (i *Interceptor) CreateRoom(id types.RoomID, allowed []types.ClientID, ttl time.Duration) (interfaces.Room, error) {
return i.rooms.CreateRoom(id, allowed, ttl)
}
func (i *Interceptor) DeleteRoom(id types.RoomID) error {
return i.rooms.DeleteRoom(id)
}
func (i *Interceptor) WriteMessage(roomid types.RoomID, msg message.Message, from types.ClientID, tos ...types.ClientID) error {
w, ok := i.rooms.(interfaces.CanWriteRoomMessage)
if !ok {
return errors.ErrInterfaceMisMatch
}
return w.WriteMessage(roomid, msg, from, tos...)
}
func (i *Interceptor) Add(id types.RoomID, s interfaces.State) error {
a, ok := i.rooms.(interfaces.CanAdd)
if !ok {
return errors.ErrInterfaceMisMatch
}
return a.Add(id, s)
}
func (i *Interceptor) Remove(id types.RoomID, s interfaces.State) error {
r, ok := i.rooms.(interfaces.CanRemove)
if !ok {
return errors.ErrInterfaceMisMatch
}
return r.Remove(id, s)
}

View File

@@ -20,6 +20,7 @@ type Room interface {
CanAdd CanAdd
CanRemove CanRemove
ID() types.RoomID ID() types.RoomID
GetParticipants() []types.ClientID
io.Closer io.Closer
} }
@@ -28,7 +29,7 @@ type CanGetRoom interface {
} }
type CanWriteRoomMessage interface { type CanWriteRoomMessage interface {
WriteMessage(roomid types.RoomID, msg message.Message, from types.ClientID, tos ...types.ClientID) error WriteRoomMessage(roomid types.RoomID, msg message.Message, from types.ClientID, tos ...types.ClientID) error
} }
type CanCreateRoom interface { type CanCreateRoom interface {
@@ -42,12 +43,23 @@ type CanDeleteRoom interface {
type RoomManager interface { type RoomManager interface {
CanCreateRoom CanCreateRoom
CanDeleteRoom CanDeleteRoom
CanGetRoom
} }
type RoomProcessor interface { type RoomProcessor interface {
Process(CanProcess, State) error Process(CanProcess, State) error
} }
type RoomProcessorBackground interface {
ProcessBackground(CanProcessBackground, State) CanProcessBackground
}
type CanProcess interface { type CanProcess interface {
Process(CanGetRoom, State) error Process(CanGetRoom, State) error
} }
type CanProcessBackground interface {
ProcessBackground(room CanGetRoom, state State) CanProcessBackground
Wait() error
Stop()
}

View File

@@ -1,15 +1,20 @@
package interfaces package interfaces
import ( import (
"context"
"github.com/harshabose/socket-comm/pkg/interceptor" "github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/message" "github.com/harshabose/socket-comm/pkg/message"
"github.com/harshabose/socket-comm/pkg/middleware/chat/config"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types" "github.com/harshabose/socket-comm/pkg/middleware/chat/types"
) )
type CanSetClientID interface {
SetClientID(id types.ClientID) error
}
type State interface { type State interface {
GetClientID() (types.ClientID, error) GetClientID() (types.ClientID, error)
GetConfig() config.Config Ctx() context.Context
} }
type CanGetState interface { type CanGetState interface {

View File

@@ -0,0 +1,89 @@
package messages
import (
"fmt"
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/message"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
)
var (
IdentProtocol message.Protocol = "room:ident"
IdentResponseProtocol message.Protocol = "room:ident_response"
)
type Ident struct {
interceptor.BaseMessage
}
func (m *Ident) GetProtocol() message.Protocol {
return IdentProtocol
}
func (m *Ident) ReadProcess(_i interceptor.Interceptor, connection interceptor.Connection) error {
s, ok := _i.(interfaces.CanGetState)
if !ok {
return fmt.Errorf("error while processing 'Ident' message; err: %s", errors.ErrInterfaceMisMatch.Error())
}
ss, err := s.GetState(connection)
if err != nil {
return fmt.Errorf("error while processing 'Ident' message; err: %s", err.Error())
}
c, ok := ss.(interfaces.CanSetClientID)
if !ok {
return fmt.Errorf("error while processing 'Ident' message; err: %s", errors.ErrInterfaceMisMatch.Error())
}
if err := c.SetClientID(types.ClientID(m.CurrentHeader.Sender)); err != nil {
return fmt.Errorf("error while processing 'Ident' message; err: %s", err.Error())
}
w, ok := ss.(interfaces.CanWriteMessage)
if !ok {
return fmt.Errorf("error while processing 'Ident' message; err: %s", errors.ErrInterfaceMisMatch.Error())
}
if err := w.Write(&IdentResponse{}); err != nil {
return fmt.Errorf("error while processing 'Ident' message; err: %s", err.Error())
}
return nil
}
type IdentResponse struct {
interceptor.BaseMessage
}
// TODO: ADD THE FACTORY FOR IdentResponse
func (m *IdentResponse) GetProtocol() message.Protocol {
return IdentResponseProtocol
}
func (m *IdentResponse) ReadProcess(_i interceptor.Interceptor, connection interceptor.Connection) error {
s, ok := _i.(interfaces.CanGetState)
if !ok {
return fmt.Errorf("error while processing 'Ident' message; err: %s", errors.ErrInterfaceMisMatch.Error())
}
ss, err := s.GetState(connection)
if err != nil {
return fmt.Errorf("error while processing 'Ident' message; err: %s", err.Error())
}
c, ok := ss.(interfaces.CanSetClientID)
if !ok {
return fmt.Errorf("error while processing 'Ident' message; err: %s", errors.ErrInterfaceMisMatch.Error())
}
if err := c.SetClientID(types.ClientID(m.CurrentHeader.Sender)); err != nil {
return fmt.Errorf("error while processing 'Ident' message; err: %s", err.Error())
}
return nil
}

View File

@@ -0,0 +1,68 @@
package messages
import (
"fmt"
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/message"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
)
var (
RequestHealthProtocol message.Protocol = "room:request_health"
HealthResponseProtocol message.Protocol = "room:health_response"
)
type RequestHealth struct {
interceptor.BaseMessage
RoomID types.RoomID `json:"room_id"`
}
func NewRequestHealth(id types.RoomID) *RequestHealth {
return &RequestHealth{
RoomID: id,
}
}
func NewRequestHealthFactory(id types.RoomID) func() message.Message {
return func() message.Message {
return NewRequestHealth(id)
}
}
func (m *RequestHealth) GetProtocol() message.Protocol {
return RequestHealthProtocol
}
func (m *RequestHealth) ReadProcess(_i interceptor.Interceptor, connection interceptor.Connection) error {
}
func (m *RequestHealth) WriteProcess(_i interceptor.Interceptor, connection interceptor.Connection) error {
s, ok := _i.(interfaces.CanGetState)
if !ok {
return errors.ErrInterfaceMisMatch
}
ss, err := s.GetState(connection)
if err != nil {
return fmt.Errorf("error while read processing 'RequestHealth' msg; err: %s", err.Error())
}
id, err := ss.GetClientID()
if err != nil {
return fmt.Errorf("error while read processing 'RequestHealth' msg; err: %s", err.Error())
}
m.SetSender(message.Sender(_i.ID()))
m.SetReceiver(message.Receiver(id))
return nil
}
type HealthResponse struct {
interceptor.BaseMessage
RoomID types.RoomID `json:"room_id"`
}

View File

@@ -1,4 +1,4 @@
package room package messages
import ( import (
"fmt" "fmt"
@@ -47,10 +47,6 @@ func (m *CreateRoom) ReadProcess(_i interceptor.Interceptor, connection intercep
return fmt.Errorf("error while read processing 'CreateRoom' msg; err: %s", err.Error()) return fmt.Errorf("error while read processing 'CreateRoom' msg; err: %s", err.Error())
} }
if !ss.GetConfig().IsServer {
return fmt.Errorf("error while read processing 'CreateRoom' msg; err: %s", errors.ErrMessageForServerOnly)
}
room, err := i.CreateRoom(m.RoomID, m.Allowed, m.TTL) room, err := i.CreateRoom(m.RoomID, m.Allowed, m.TTL)
if err != nil { if err != nil {
return fmt.Errorf("error while read processing 'CreateRoom' msg; err: %s", err.Error()) return fmt.Errorf("error while read processing 'CreateRoom' msg; err: %s", err.Error())
@@ -72,27 +68,13 @@ func (m *DeleteRoom) GetProtocol() message.Protocol {
return DeleteRoomProtocol return DeleteRoomProtocol
} }
func (m *DeleteRoom) ReadProcess(_i interceptor.Interceptor, connection interceptor.Connection) error { func (m *DeleteRoom) ReadProcess(i interceptor.Interceptor, _ interceptor.Connection) error {
s, ok := _i.(interfaces.CanGetState) d, ok := i.(interfaces.CanDeleteRoom)
if !ok { if !ok {
return errors.ErrInterfaceMisMatch return errors.ErrInterfaceMisMatch
} }
i, ok := _i.(interfaces.CanDeleteRoom) if err := d.DeleteRoom(m.RoomID); err != nil {
if !ok {
return errors.ErrInterfaceMisMatch
}
ss, err := s.GetState(connection)
if err != nil {
return fmt.Errorf("error while read processing 'DeleteRoom' msg; err: %s", err.Error())
}
if !ss.GetConfig().IsServer {
return fmt.Errorf("error while read processing 'DeleteRoom' msg; err: %s", errors.ErrMessageForServerOnly)
}
if err := i.DeleteRoom(m.RoomID); err != nil {
return fmt.Errorf("error while read processing 'DeleteRoom' msg; err: %s", errors.ErrMessageForServerOnly) return fmt.Errorf("error while read processing 'DeleteRoom' msg; err: %s", errors.ErrMessageForServerOnly)
} }
@@ -111,23 +93,28 @@ func (m *ToForwardMessage) GetProtocol() message.Protocol {
} }
func (m *ToForwardMessage) ReadProcess(_i interceptor.Interceptor, connection interceptor.Connection) error { func (m *ToForwardMessage) ReadProcess(_i interceptor.Interceptor, connection interceptor.Connection) error {
s, ok := _i.(interfaces.CanGetState)
if !ok {
return errors.ErrInterfaceMisMatch
}
msg, err := newForwardedMessage(m) msg, err := newForwardedMessage(m)
if err != nil { if err != nil {
return fmt.Errorf("error while read processing 'ToForwardMessage'; err: %s", err.Error()) return fmt.Errorf("error while read processing 'ToForwardMessage'; err: %s", err.Error())
} }
ss, err := s.GetState(connection) s, ok := _i.(interfaces.CanGetState)
if err != nil { if !ok {
return fmt.Errorf("error while read processing 'ToForwardMessage' msg; err: %s", err.Error()) return fmt.Errorf("error while read processing 'ToForwardMessage'; err: %s", errors.ErrInterfaceMisMatch)
} }
if !ss.GetConfig().IsServer { ss, err := s.GetState(connection)
return fmt.Errorf("error while read processing 'ToForwardMessage' msg; err: %s", errors.ErrMessageForServerOnly) if err != nil {
return fmt.Errorf("error while read processing 'ToForwardMessage'; err: %s", errors.ErrInterfaceMisMatch)
}
clientID, err := ss.GetClientID()
if err != nil {
return fmt.Errorf("error while read processing 'ToForwardMessage'; err: %s", errors.ErrInterfaceMisMatch)
}
if m.From != clientID {
return fmt.Errorf("error while read processing 'ToForwardMessage'; From and ClientID did not match")
} }
w, ok := _i.(interfaces.CanWriteRoomMessage) w, ok := _i.(interfaces.CanWriteRoomMessage)
@@ -135,7 +122,7 @@ func (m *ToForwardMessage) ReadProcess(_i interceptor.Interceptor, connection in
return errors.ErrInterfaceMisMatch return errors.ErrInterfaceMisMatch
} }
if err := w.WriteMessage(m.RoomID, msg, m.From, m.To); err != nil { if err := w.WriteRoomMessage(m.RoomID, msg, m.From, m.To); err != nil {
return fmt.Errorf("error while read processing 'ToForwardMessage' msg; err: %s", err.Error()) return fmt.Errorf("error while read processing 'ToForwardMessage' msg; err: %s", err.Error())
} }
@@ -164,7 +151,8 @@ func newForwardedMessage(forward *ToForwardMessage) (*ForwardedMessage, error) {
type JoinRoom struct { type JoinRoom struct {
interceptor.BaseMessage interceptor.BaseMessage
RoomID types.RoomID RoomID types.RoomID `json:"room_id"`
WaitDuration time.Duration `json:"wait_duration"`
} }
func (m *JoinRoom) GetProtocol() message.Protocol { func (m *JoinRoom) GetProtocol() message.Protocol {
@@ -187,11 +175,21 @@ func (m *JoinRoom) ReadProcess(_i interceptor.Interceptor, connection intercepto
return errors.ErrInterfaceMisMatch return errors.ErrInterfaceMisMatch
} }
if err := a.Add(m.RoomID, ss); err != nil { timer := time.NewTimer(m.WaitDuration)
return fmt.Errorf("error while read processing 'JoinRoom' msg; err: %s", err.Error()) defer timer.Stop()
}
for {
select {
case <-timer.C:
return fmt.Errorf("error while read processing 'JoinRoom' msg; err: %s", errors.ErrMessageForServerOnly)
default:
err := a.Add(m.RoomID, ss)
if err == nil {
return nil return nil
}
fmt.Println(fmt.Errorf("error while read processing 'JoinRoom' msg; err: %s. retrying", err.Error()))
}
}
} }
type LeaveRoom struct { type LeaveRoom struct {

View File

@@ -0,0 +1,94 @@
package process
import (
"context"
"fmt"
"sync"
"time"
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
)
type DeleteRoomWaiter struct {
deleter interfaces.CanDeleteRoom
ttl time.Duration
roomid types.RoomID
err error
ctx context.Context
cancel context.CancelFunc
mux sync.RWMutex
done chan struct{}
}
func NewDeleteRoomWaiter(ctx context.Context, cancel context.CancelFunc, deleter interfaces.CanDeleteRoom, roomid types.RoomID, ttl time.Duration) *DeleteRoomWaiter {
return &DeleteRoomWaiter{
ctx: ctx,
cancel: cancel,
roomid: roomid,
deleter: deleter,
err: nil,
ttl: ttl,
done: make(chan struct{}),
}
}
func (p *DeleteRoomWaiter) Process(r interfaces.CanGetRoom, _ interfaces.State) error {
timer := time.NewTimer(p.ttl)
defer timer.Stop()
for {
select {
case <-timer.C:
if err := p.process(r); err != nil {
return fmt.Errorf("error while processing DeleteRoomWaiter process; err: %s", err.Error())
}
case <-p.ctx.Done():
return fmt.Errorf("context cancelled before process completion")
}
}
}
func (p *DeleteRoomWaiter) ProcessBackground(r interfaces.CanGetRoom, s interfaces.State) interfaces.CanProcessBackground {
go func() {
if err := p.Process(r, s); err != nil {
p.mux.Lock()
p.err = err
p.mux.Unlock()
p.done <- struct{}{}
fmt.Println(p.err)
}
}()
return p
}
func (p *DeleteRoomWaiter) Wait() error {
<-p.done
p.mux.RLock()
defer p.mux.RUnlock()
return p.err
}
func (p *DeleteRoomWaiter) Stop() {
p.cancel()
}
func (p *DeleteRoomWaiter) process(r interfaces.CanGetRoom) error {
room, err := r.GetRoom(p.roomid)
if err != nil {
return fmt.Errorf("error while processing DelteRoomWaiter process; err: %s", err.Error())
}
if err := room.Close(); err != nil {
return err
}
if err := p.deleter.DeleteRoom(p.roomid); err != nil {
return err
}
return nil
}

View File

@@ -0,0 +1,103 @@
package process
import (
"context"
"fmt"
"sync"
"time"
"github.com/harshabose/socket-comm/internal/util"
"github.com/harshabose/socket-comm/pkg/message"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
)
type SendMessageRoom struct {
msgFactory func() message.Message
roomid types.RoomID
interval time.Duration
err error
mux sync.RWMutex
done chan struct{}
ctx context.Context
cancel context.CancelFunc
}
func NewSendMessageRoom(ctx context.Context, cancel context.CancelFunc, msgFactory func() message.Message, roomid types.RoomID, interval time.Duration) *SendMessageRoom {
return &SendMessageRoom{
ctx: ctx,
cancel: cancel,
msgFactory: msgFactory,
roomid: roomid,
interval: interval,
done: make(chan struct{}),
}
}
func (p *SendMessageRoom) Process(r interfaces.CanGetRoom, s interfaces.State) error {
ticker := time.NewTicker(p.interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if err := p.process(r, s); err != nil {
fmt.Println("error while processing SendMessageRoom process; err: ", err.Error())
}
case <-p.ctx.Done():
return nil
}
}
}
func (p *SendMessageRoom) ProcessBackground(r interfaces.CanGetRoom, s interfaces.State) interfaces.CanProcessBackground {
go func() {
if err := p.Process(r, s); err != nil {
p.mux.Lock()
p.err = err
p.mux.Unlock()
p.done <- struct{}{}
fmt.Println(p.err)
}
}()
return p
}
func (p *SendMessageRoom) Wait() error {
<-p.done
p.mux.RLock()
defer p.mux.RUnlock()
return p.err
}
func (p *SendMessageRoom) Stop() {
p.cancel()
}
func (p *SendMessageRoom) process(r interfaces.CanGetRoom, _ interfaces.State) error {
room, err := r.GetRoom(p.roomid)
if err != nil {
return err
}
participants := room.GetParticipants()
w, ok := room.(interfaces.CanWriteRoomMessage)
if !ok {
return errors.ErrInterfaceMisMatch
}
merr := util.NewMultiError()
for _, participant := range participants {
if err := w.WriteRoomMessage(p.roomid, p.msgFactory(), "", participant); err != nil {
merr.Add(err)
}
}
return merr.ErrorOrNil()
}

View File

@@ -0,0 +1,57 @@
package process
import (
"context"
"fmt"
"time"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
)
type WaitUntilIdent struct {
ctx context.Context
duration time.Duration
}
type WaitUntilIdentOption func(*WaitUntilIdent)
func WithTickerDuration(duration time.Duration) WaitUntilIdentOption {
return WaitUntilIdentOption(func(ident *WaitUntilIdent) {
ident.duration = duration
})
}
func NewWaitUntilIdentComplete(ctx context.Context, options ...WaitUntilIdentOption) *WaitUntilIdent {
i := &WaitUntilIdent{
ctx: ctx,
duration: 500 * time.Millisecond,
}
for _, option := range options {
option(i)
}
return i
}
func (p WaitUntilIdent) Process(_ interfaces.CanGetRoom, s interfaces.State) error {
ticker := time.NewTicker(p.duration)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if p.process(s) {
return nil
}
case <-p.ctx.Done():
return fmt.Errorf("error while processing WaitUntilIdent process; err: %s", errors.ErrContextCancelled.Error())
}
}
}
func (p WaitUntilIdent) process(s interfaces.State) bool {
_, err := s.GetClientID()
return err == nil
}

View File

@@ -8,6 +8,8 @@ import (
"github.com/harshabose/socket-comm/pkg/message" "github.com/harshabose/socket-comm/pkg/message"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors" "github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces" "github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
"github.com/harshabose/socket-comm/pkg/middleware/chat/messages"
"github.com/harshabose/socket-comm/pkg/middleware/chat/process"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types" "github.com/harshabose/socket-comm/pkg/middleware/chat/types"
) )
@@ -44,12 +46,11 @@ func (m *Manager) CreateRoom(id types.RoomID, allowed []types.ClientID, ttl time
m.rooms[id] = room m.rooms[id] = room
go func() { // NOTE: THE FOLLOWING STARTS A BACKGROUND PROCESS WHICH WAITS UNTIL TTL AND KILLS THE ROOM. THIS DOES NOT KILL CONNECTION
if err := m.Process(NewDeleteRoomWaiter(ctx, m, id, ttl), nil); err != nil { _ = m.ProcessBackground(process.NewDeleteRoomWaiter(ctx, cancel, m, id, ttl), nil)
fmt.Println(err.Error())
}
}() // TODO: DO I NEED THIS?
// NOTE: THE FOLLOWING STARTS A BACKGROUND PROCESS WHICH CONSTANTLY SENDS THE GIVEN MESSAGE TO EVERY PARTICIPANT IN THE ROOM
_ = m.ProcessBackground(process.NewSendMessageRoom(ctx, cancel, messages.NewRequestHealthFactory(id), id, 5*time.Second), nil)
return room, nil return room, nil
} }
@@ -77,10 +78,11 @@ func (m *Manager) DeleteRoom(id types.RoomID) error {
return fmt.Errorf("error while deleting room with id: %s; err: %s", id, err.Error()) return fmt.Errorf("error while deleting room with id: %s; err: %s", id, err.Error())
} }
delete(m.rooms, id)
return nil return nil
} }
func (m *Manager) WriteMessage(roomid types.RoomID, msg message.Message, from types.ClientID, tos ...types.ClientID) error { func (m *Manager) WriteRoomMessage(roomid types.RoomID, msg message.Message, from types.ClientID, tos ...types.ClientID) error {
room, err := m.GetRoom(roomid) room, err := m.GetRoom(roomid)
if err != nil { if err != nil {
return err return err
@@ -91,9 +93,13 @@ func (m *Manager) WriteMessage(roomid types.RoomID, msg message.Message, from ty
return errors.ErrInterfaceMisMatch return errors.ErrInterfaceMisMatch
} }
return w.WriteMessage(roomid, msg, from, tos...) return w.WriteRoomMessage(roomid, msg, from, tos...)
} }
func (m *Manager) Process(process interfaces.CanProcess, state interfaces.State) error { func (m *Manager) Process(process interfaces.CanProcess, state interfaces.State) error {
return process.Process(m, state) return process.Process(m, state)
} }
func (m *Manager) ProcessBackground(process interfaces.CanProcessBackground, state interfaces.State) interfaces.CanProcessBackground {
return process.ProcessBackground(m, state)
}

View File

@@ -1,59 +0,0 @@
package room
import (
"context"
"fmt"
"time"
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
)
type DeleteRoomWaiter struct {
canDelete interfaces.CanDeleteRoom
ttl time.Duration
roomid types.RoomID
ctx context.Context
}
func NewDeleteRoomWaiter(ctx context.Context, manager interfaces.CanDeleteRoom, roomid types.RoomID, ttl time.Duration) *DeleteRoomWaiter {
return &DeleteRoomWaiter{
ctx: ctx,
roomid: roomid,
canDelete: manager,
ttl: ttl,
}
}
func (p DeleteRoomWaiter) Process(r interfaces.CanGetRoom, _ interfaces.State) error {
timer := time.NewTimer(p.ttl)
defer timer.Stop()
for {
select {
case <-timer.C:
if err := p.process(r); err != nil {
return fmt.Errorf("error while processing DeleteRoomWaiter process; err: %s", err.Error())
}
case <-p.ctx.Done():
return fmt.Errorf("context cancelled before process completion")
}
}
}
func (p DeleteRoomWaiter) process(r interfaces.CanGetRoom) error {
room, err := r.GetRoom(p.roomid)
if err != nil {
return fmt.Errorf("error while processing DelteRoomWaiter process; err: %s", err.Error())
}
if err := room.Close(); err != nil {
return err
}
if err := p.canDelete.DeleteRoom(p.roomid); err != nil {
return err
}
return nil
}

View File

@@ -43,6 +43,10 @@ func (r *Room) Add(roomid types.RoomID, s interfaces.State) error {
return err return err
} }
select {
case <-r.ctx.Done():
return fmt.Errorf("error while adding client to room. client ID: %s; room ID: %s; err: %s", id, r.roomid, errors.ErrContextCancelled.Error())
default:
if !r.isAllowed(id) { if !r.isAllowed(id) {
return fmt.Errorf("error while adding client to room. client ID: %s; room ID: %s; err: %s", id, r.roomid, errors.ErrClientNotAllowedInRoom.Error()) return fmt.Errorf("error while adding client to room. client ID: %s; room ID: %s; err: %s", id, r.roomid, errors.ErrClientNotAllowedInRoom.Error())
} }
@@ -53,9 +57,14 @@ func (r *Room) Add(roomid types.RoomID, s interfaces.State) error {
r.participants[id] = s r.participants[id] = s
return nil return nil
}
} }
func (r *Room) isAllowed(id types.ClientID) bool { func (r *Room) isAllowed(id types.ClientID) bool {
select {
case <-r.ctx.Done():
return false
default:
if len(r.allowed) == 0 { if len(r.allowed) == 0 {
return true return true
} }
@@ -67,9 +76,10 @@ func (r *Room) isAllowed(id types.ClientID) bool {
} }
return false return false
}
} }
func (r *Room) forEach(f func(id types.ClientID) bool, ids ...types.ClientID) bool { func (r *Room) forEachBoolean(f func(id types.ClientID) bool, ids ...types.ClientID) bool {
if len(ids) == 0 { if len(ids) == 0 {
return false return false
} }
@@ -93,6 +103,10 @@ func (r *Room) Remove(roomid types.RoomID, s interfaces.State) error {
return err return err
} }
select {
case <-r.ctx.Done():
return fmt.Errorf("error while removing client to room. client ID: %s; room ID: %s; err: %s", id, r.roomid, errors.ErrContextCancelled.Error())
default:
if !r.isAllowed(id) { if !r.isAllowed(id) {
return fmt.Errorf("error while removing client to room. client ID: %s; room ID: %s; err: %s", id, r.roomid, errors.ErrClientNotAllowedInRoom.Error()) return fmt.Errorf("error while removing client to room. client ID: %s; room ID: %s; err: %s", id, r.roomid, errors.ErrClientNotAllowedInRoom.Error())
} }
@@ -103,27 +117,37 @@ func (r *Room) Remove(roomid types.RoomID, s interfaces.State) error {
delete(r.participants, id) delete(r.participants, id)
return nil return nil
}
} }
func (r *Room) isParticipant(id types.ClientID) bool { func (r *Room) isParticipant(id types.ClientID) bool {
select {
case <-r.ctx.Done():
return false
default:
_, exists := r.participants[id] _, exists := r.participants[id]
return exists return exists
}
} }
func (r *Room) WriteMessage(roomid types.RoomID, msg message.Message, from types.ClientID, tos ...types.ClientID) error { func (r *Room) WriteRoomMessage(roomid types.RoomID, msg message.Message, from types.ClientID, tos ...types.ClientID) error {
select {
case <-r.ctx.Done():
return fmt.Errorf("error while sending message to peer in room; err: %s", errors.ErrContextCancelled.Error())
default:
if roomid != r.roomid { if roomid != r.roomid {
return errors.ErrWrongRoom return errors.ErrWrongRoom
} }
if len(tos) == 0 { if len(tos) == 0 {
return fmt.Errorf("atleast one receiver is need to use 'WriteMessage' message") return fmt.Errorf("atleast one receiver is need to use 'WriteRoomMessage' message")
} }
if !r.forEach(r.isAllowed, append(tos, from)...) { if !r.forEachBoolean(r.isAllowed, append(tos, from)...) {
return errors.ErrClientNotAllowedInRoom return errors.ErrClientNotAllowedInRoom
} }
if !r.forEach(r.isParticipant, append(tos, from)...) { if !r.forEachBoolean(r.isParticipant, append(tos, from)...) {
return errors.ErrClientNotAParticipant return errors.ErrClientNotAParticipant
} }
@@ -139,8 +163,25 @@ func (r *Room) WriteMessage(roomid types.RoomID, msg message.Message, from types
} }
return nil return nil
}
}
func (r *Room) GetParticipants() []types.ClientID {
select {
case <-r.ctx.Done():
return make([]types.ClientID, 0) // EMPTY LIST
default:
clients := make([]types.ClientID, 0)
for id, _ := range r.participants {
clients = append(clients, id)
}
return clients
}
} }
func (r *Room) Close() error { func (r *Room) Close() error {
r.cancel()
r.participants = make(map[types.ClientID]interfaces.State)
r.allowed = make([]types.ClientID, 0)
return nil return nil
} }

View File

@@ -0,0 +1,94 @@
package chat
import (
"context"
"fmt"
"time"
"github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/message"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
"github.com/harshabose/socket-comm/pkg/middleware/chat/process"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types"
)
type ServerInterceptor struct {
commonInterceptor
interceptor.Interceptor
rooms interfaces.RoomManager
}
func (i *ServerInterceptor) BindSocketConnection(connection interceptor.Connection, writer interceptor.Writer, reader interceptor.Reader) (interceptor.Writer, interceptor.Reader, error) {
return i.commonInterceptor.BindSocketConnection(connection, writer, reader)
}
func (i *ServerInterceptor) Init(connection interceptor.Connection) error {
s, err := i.GetState(connection)
if err != nil {
return fmt.Errorf("error while init; err: %s", err.Error())
}
w, ok := s.(interfaces.CanWriteMessage)
if !ok {
return errors.ErrInterfaceMisMatch
}
// TODO: SEND IDENT MESSAGE
if err := w.Write(); err != nil {
return fmt.Errorf("error while init; err: %s", err.Error())
}
ctx, cancel := context.WithTimeout(s.Ctx(), 10*time.Second)
defer cancel()
p := process.NewWaitUntilIdentComplete(ctx, process.WithTickerDuration(500*time.Millisecond))
if err := p.Process(nil, s); err != nil {
return fmt.Errorf("error while init; err: %s", err.Error())
}
return nil
}
func (i *ServerInterceptor) UnBindSocketConnection(connection interceptor.Connection) {
}
func (i *ServerInterceptor) Close() error {
return nil
}
func (i *ServerInterceptor) CreateRoom(id types.RoomID, allowed []types.ClientID, ttl time.Duration) (interfaces.Room, error) {
return i.rooms.CreateRoom(id, allowed, ttl)
}
func (i *ServerInterceptor) DeleteRoom(id types.RoomID) error {
return i.rooms.DeleteRoom(id)
}
func (i *ServerInterceptor) WriteRoomMessage(roomid types.RoomID, msg message.Message, from types.ClientID, tos ...types.ClientID) error {
w, ok := i.rooms.(interfaces.CanWriteRoomMessage)
if !ok {
return errors.ErrInterfaceMisMatch
}
return w.WriteRoomMessage(roomid, msg, from, tos...)
}
func (i *ServerInterceptor) Add(id types.RoomID, s interfaces.State) error {
a, ok := i.rooms.(interfaces.CanAdd)
if !ok {
return errors.ErrInterfaceMisMatch
}
return a.Add(id, s)
}
func (i *ServerInterceptor) Remove(id types.RoomID, s interfaces.State) error {
r, ok := i.rooms.(interfaces.CanRemove)
if !ok {
return errors.ErrInterfaceMisMatch
}
return r.Remove(id, s)
}

View File

@@ -5,7 +5,6 @@ import (
"github.com/harshabose/socket-comm/pkg/interceptor" "github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/message" "github.com/harshabose/socket-comm/pkg/message"
"github.com/harshabose/socket-comm/pkg/middleware/chat/config"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors" "github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces" "github.com/harshabose/socket-comm/pkg/middleware/chat/interfaces"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types" "github.com/harshabose/socket-comm/pkg/middleware/chat/types"
@@ -16,15 +15,13 @@ type state struct {
connection interceptor.Connection connection interceptor.Connection
writer interceptor.Writer writer interceptor.Writer
reader interceptor.Reader reader interceptor.Reader
config config.Config
cancel context.CancelFunc cancel context.CancelFunc
ctx context.Context ctx context.Context
} }
func NewState(ctx context.Context, cancel context.CancelFunc, config config.Config, connection interceptor.Connection, writer interceptor.Writer, reader interceptor.Reader) interfaces.State { func NewState(ctx context.Context, cancel context.CancelFunc, connection interceptor.Connection, writer interceptor.Writer, reader interceptor.Reader) interfaces.State {
return &state{ return &state{
id: types.UnKnownClient, id: types.UnKnownClient,
config: config,
connection: connection, connection: connection,
writer: writer, writer: writer,
reader: reader, reader: reader,
@@ -33,6 +30,10 @@ func NewState(ctx context.Context, cancel context.CancelFunc, config config.Conf
} }
} }
func (s *state) Ctx() context.Context {
return s.ctx
}
func (s *state) GetClientID() (types.ClientID, error) { func (s *state) GetClientID() (types.ClientID, error) {
if s.id == types.UnKnownClient { if s.id == types.UnKnownClient {
return s.id, errors.ErrUnknownClientIDState return s.id, errors.ErrUnknownClientIDState
@@ -45,6 +46,11 @@ func (s *state) Write(msg message.Message) error {
return s.writer.Write(s.connection, msg) return s.writer.Write(s.connection, msg)
} }
func (s *state) GetConfig() config.Config { func (s *state) SetClientID(id types.ClientID) error {
return s.config if s.id != types.UnKnownClient {
return errors.ErrClientIDNotConsistent
}
s.id = id
return nil
} }