From 812110b775e0184e0fc4e5c189f926e4a7d98c27 Mon Sep 17 00:00:00 2001 From: harshabose Date: Fri, 23 May 2025 11:40:50 +0530 Subject: [PATCH] almost done with chat interceptor. Implement message registories mechanism --- pkg/interceptor/interceptor.go | 7 ++++--- pkg/middleware/chat/common_interceptor.go | 4 ++-- pkg/middleware/chat/factory.go | 15 +++++++++++---- pkg/middleware/chat/messages/forwarded.go | 2 +- .../chat/messages/ident_messages.go | 2 +- pkg/middleware/chat/messages/join_room.go | 2 +- pkg/middleware/chat/messages/leave_room.go | 2 +- .../chat/messages/start_health_tracking.go | 2 +- .../start_streaming_health_snapshot.go | 2 +- .../stop_streaming_health_snapshot.go | 2 +- ..._create_room.go => success_create_room.go} | 2 +- .../chat/messages/success_delete_room.go | 5 ++--- .../chat/messages/success_leave_room.go | 2 +- .../success_start_health_streaming.go | 18 +++++++++++++++++- .../messages/success_start_health_tracking.go | 5 ++--- .../messages/success_stop_health_streaming.go | 19 ++++++++++++++++++- .../messages/success_stop_health_tracking.go | 7 +++---- .../chat/messages/sucess_leave_room.go | 1 - pkg/middleware/chat/messages/to_forward.go | 2 +- 19 files changed, 69 insertions(+), 32 deletions(-) rename pkg/middleware/chat/messages/{sucess_create_room.go => success_create_room.go} (94%) delete mode 100644 pkg/middleware/chat/messages/sucess_leave_room.go diff --git a/pkg/interceptor/interceptor.go b/pkg/interceptor/interceptor.go index 36d5554..902b9d8 100644 --- a/pkg/interceptor/interceptor.go +++ b/pkg/interceptor/interceptor.go @@ -9,6 +9,7 @@ import ( type Registry struct { factories []Factory + messages message.Registry } func NewRegistry() *Registry { @@ -21,14 +22,14 @@ func (registry *Registry) Register(factory Factory) { registry.factories = append(registry.factories, factory) } -func (registry *Registry) Build(ctx context.Context, id string) (Interceptor, error) { +func (registry *Registry) Build(ctx context.Context, id ClientID) (Interceptor, error) { if len(registry.factories) == 0 { return &NoOpInterceptor{}, nil } interceptors := make([]Interceptor, 0) for _, factory := range registry.factories { - interceptor, err := factory.NewInterceptor(ctx, id) + interceptor, err := factory.NewInterceptor(ctx, id, registry.messages) if err != nil { return nil, err } @@ -40,7 +41,7 @@ func (registry *Registry) Build(ctx context.Context, id string) (Interceptor, er } type Factory interface { - NewInterceptor(context.Context, string) (Interceptor, error) + NewInterceptor(context.Context, ClientID, message.Registry) (Interceptor, error) } type Connection interface { diff --git a/pkg/middleware/chat/common_interceptor.go b/pkg/middleware/chat/common_interceptor.go index ea22783..f17ff74 100644 --- a/pkg/middleware/chat/common_interceptor.go +++ b/pkg/middleware/chat/common_interceptor.go @@ -41,7 +41,7 @@ func (i *commonInterceptor) InterceptSocketWriter(writer interceptor.Writer) int return writer.Write(ctx, connection, msg) } - next, err := m.GetNext(i.writeProcessMessages) + next, err := m.GetNext(i.GetMessageRegistry()) if err != nil { return writer.Write(ctx, connection, msg) } @@ -74,7 +74,7 @@ func (i *commonInterceptor) InterceptSocketReader(reader interceptor.Reader) int return msg, interceptor.ErrInterfaceMisMatch } - next, err := m.GetNext(i.readProcessMessages) + next, err := m.GetNext(i.GetMessageRegistry()) if err != nil { return msg, nil } diff --git a/pkg/middleware/chat/factory.go b/pkg/middleware/chat/factory.go index ebccebf..934977b 100644 --- a/pkg/middleware/chat/factory.go +++ b/pkg/middleware/chat/factory.go @@ -6,7 +6,6 @@ import ( "github.com/harshabose/socket-comm/pkg/interceptor" "github.com/harshabose/socket-comm/pkg/message" - "github.com/harshabose/socket-comm/pkg/middleware/chat/errors" "github.com/harshabose/socket-comm/pkg/middleware/chat/processors" "github.com/harshabose/socket-comm/pkg/middleware/chat/state" ) @@ -26,7 +25,7 @@ func NewInterceptorFactory(options ...Option) *InterceptorFactory { func WithServerInterceptor(i interceptor.Interceptor) error { c, ok := i.(*commonInterceptor) if !ok { - return fmt.Errorf("can only convert common chat interceptor to client chat interceptor; err: %s", errors.ErrInterfaceMisMatch.Error()) + return fmt.Errorf("can only convert common chat interceptor to client chat interceptor; err: %s", interceptor.ErrInterfaceMisMatch.Error()) } i = &ServerInterceptor{ @@ -35,13 +34,15 @@ func WithServerInterceptor(i interceptor.Interceptor) error { Health: processors.NewHealthProcessor(c.Ctx()), } + // TODO: add server messages to the registry + return nil } func WithClientInterceptor(i interceptor.Interceptor) error { c, ok := i.(*commonInterceptor) if !ok { - return fmt.Errorf("can only convert common chat interceptor to client chat interceptor; err: %s", errors.ErrInterfaceMisMatch.Error()) + return fmt.Errorf("can only convert common chat interceptor to client chat interceptor; err: %s", interceptor.ErrInterfaceMisMatch.Error()) } i = &ClientInterceptor{ @@ -49,10 +50,12 @@ func WithClientInterceptor(i interceptor.Interceptor) error { Health: processors.NewHealthProcessor(c.Ctx()), } + // TODO: add client messages to the registry + return nil } -func (f *InterceptorFactory) NewInterceptor(ctx context.Context, id string, registry message.Registry) (interceptor.Interceptor, error) { +func (f *InterceptorFactory) NewInterceptor(ctx context.Context, id interceptor.ClientID, registry message.Registry) (interceptor.Interceptor, error) { i := &commonInterceptor{ NoOpInterceptor: interceptor.NewNoOpInterceptor(ctx, id, registry), readProcessMessages: message.NewDefaultRegistry(), @@ -60,11 +63,15 @@ func (f *InterceptorFactory) NewInterceptor(ctx context.Context, id string, regi states: state.NewManager(), } + // TODO: add common messages to the registry + for _, option := range f.options { if err := option(i); err != nil { return nil, err } } + // TODO: copy readProcessMessages and writeProcessMessages to registry + return i, nil } diff --git a/pkg/middleware/chat/messages/forwarded.go b/pkg/middleware/chat/messages/forwarded.go index d36a1ff..63c1030 100644 --- a/pkg/middleware/chat/messages/forwarded.go +++ b/pkg/middleware/chat/messages/forwarded.go @@ -8,7 +8,7 @@ import ( "github.com/harshabose/socket-comm/pkg/middleware/chat" ) -var ForwardedMessageProtocol message.Protocol = "room:forwarded_message" +const ForwardedMessageProtocol message.Protocol = "room:forwarded_message" type ForwardedMessage struct { interceptor.BaseMessage diff --git a/pkg/middleware/chat/messages/ident_messages.go b/pkg/middleware/chat/messages/ident_messages.go index dc91d1d..038d564 100644 --- a/pkg/middleware/chat/messages/ident_messages.go +++ b/pkg/middleware/chat/messages/ident_messages.go @@ -9,7 +9,7 @@ import ( "github.com/harshabose/socket-comm/pkg/middleware/chat" ) -var ( +const ( IdentProtocol message.Protocol = "room:ident" IdentResponseProtocol message.Protocol = "room:ident_response" ) diff --git a/pkg/middleware/chat/messages/join_room.go b/pkg/middleware/chat/messages/join_room.go index d766ca4..a14ea12 100644 --- a/pkg/middleware/chat/messages/join_room.go +++ b/pkg/middleware/chat/messages/join_room.go @@ -9,7 +9,7 @@ import ( "github.com/harshabose/socket-comm/pkg/middleware/chat/process" ) -var JoinRoomProtocol message.Protocol = "room:join_room" +const JoinRoomProtocol message.Protocol = "room:join_room" type JoinRoom struct { interceptor.BaseMessage diff --git a/pkg/middleware/chat/messages/leave_room.go b/pkg/middleware/chat/messages/leave_room.go index 31c1a70..4d0483b 100644 --- a/pkg/middleware/chat/messages/leave_room.go +++ b/pkg/middleware/chat/messages/leave_room.go @@ -9,7 +9,7 @@ import ( "github.com/harshabose/socket-comm/pkg/middleware/chat/process" ) -var LeaveRoomProtocol message.Protocol = "room:leave_room" +const LeaveRoomProtocol message.Protocol = "room:leave_room" type LeaveRoom struct { interceptor.BaseMessage diff --git a/pkg/middleware/chat/messages/start_health_tracking.go b/pkg/middleware/chat/messages/start_health_tracking.go index c1c3bd8..1f4338a 100644 --- a/pkg/middleware/chat/messages/start_health_tracking.go +++ b/pkg/middleware/chat/messages/start_health_tracking.go @@ -12,7 +12,7 @@ import ( "github.com/harshabose/socket-comm/pkg/middleware/chat/types" ) -var MarkRoomForHealthTrackingProtocol message.Protocol = "chat:track_health" +const MarkRoomForHealthTrackingProtocol message.Protocol = "chat:track_health" // StartHealthTracking is sent by an interested client (who wants the stats of the whole room) to start tracking the health status. // This does not send the stats data to the interested client; this just tells the server to track the data. diff --git a/pkg/middleware/chat/messages/start_streaming_health_snapshot.go b/pkg/middleware/chat/messages/start_streaming_health_snapshot.go index 041de00..f6a9f83 100644 --- a/pkg/middleware/chat/messages/start_streaming_health_snapshot.go +++ b/pkg/middleware/chat/messages/start_streaming_health_snapshot.go @@ -12,7 +12,7 @@ import ( "github.com/harshabose/socket-comm/pkg/middleware/chat/types" ) -var GetHealthSnapshotProtocol message.Protocol = "chat:get_health_snapshot" +const GetHealthSnapshotProtocol message.Protocol = "chat:get_health_snapshot" type StartStreamingHealthSnapshots struct { interceptor.BaseMessage diff --git a/pkg/middleware/chat/messages/stop_streaming_health_snapshot.go b/pkg/middleware/chat/messages/stop_streaming_health_snapshot.go index 805dd50..4c1dd1c 100644 --- a/pkg/middleware/chat/messages/stop_streaming_health_snapshot.go +++ b/pkg/middleware/chat/messages/stop_streaming_health_snapshot.go @@ -9,7 +9,7 @@ import ( "github.com/harshabose/socket-comm/pkg/middleware/chat/process" ) -var StopStreamingHealthSnapshotProtocol message.Protocol = "chat:stop_streaming_health_snapshot" +const StopStreamingHealthSnapshotProtocol message.Protocol = "chat:stop_streaming_health_snapshot" type StopStreamingHealthSnapshot struct { interceptor.BaseMessage diff --git a/pkg/middleware/chat/messages/sucess_create_room.go b/pkg/middleware/chat/messages/success_create_room.go similarity index 94% rename from pkg/middleware/chat/messages/sucess_create_room.go rename to pkg/middleware/chat/messages/success_create_room.go index f11b23f..e43b652 100644 --- a/pkg/middleware/chat/messages/sucess_create_room.go +++ b/pkg/middleware/chat/messages/success_create_room.go @@ -9,7 +9,7 @@ import ( "github.com/harshabose/socket-comm/pkg/middleware/chat/types" ) -var SuccessCreateRoomProtocol message.Protocol = "room:success_create_room" +const SuccessCreateRoomProtocol message.Protocol = "room:success_create_room" // SuccessCreateRoom is the message sent by the server to the client after successful creation of the requested room. // This marks the end of the CreateRoom topic. diff --git a/pkg/middleware/chat/messages/success_delete_room.go b/pkg/middleware/chat/messages/success_delete_room.go index 2410e1f..818760c 100644 --- a/pkg/middleware/chat/messages/success_delete_room.go +++ b/pkg/middleware/chat/messages/success_delete_room.go @@ -6,11 +6,10 @@ import ( "github.com/harshabose/socket-comm/pkg/interceptor" "github.com/harshabose/socket-comm/pkg/message" "github.com/harshabose/socket-comm/pkg/middleware/chat" - "github.com/harshabose/socket-comm/pkg/middleware/chat/errors" "github.com/harshabose/socket-comm/pkg/middleware/chat/types" ) -var SuccessDeleteRoomProtocol message.Protocol = "room:success_delete_room" +const SuccessDeleteRoomProtocol message.Protocol = "room:success_delete_room" type SuccessDeleteRoom struct { interceptor.BaseMessage @@ -44,7 +43,7 @@ func (m *SuccessDeleteRoom) GetProtocol() message.Protocol { func (m *SuccessDeleteRoom) ReadProcess(_ context.Context, _i interceptor.Interceptor, _ interceptor.Connection) error { _, ok := _i.(*chat.ClientInterceptor) if !ok { - return errors.ErrInterfaceMisMatch + return interceptor.ErrInterfaceMisMatch } // NOTE: INTENTIONALLY EMPTY diff --git a/pkg/middleware/chat/messages/success_leave_room.go b/pkg/middleware/chat/messages/success_leave_room.go index a3f0607..33e0794 100644 --- a/pkg/middleware/chat/messages/success_leave_room.go +++ b/pkg/middleware/chat/messages/success_leave_room.go @@ -9,7 +9,7 @@ import ( "github.com/harshabose/socket-comm/pkg/middleware/chat/types" ) -var SuccessLeaveRoomProtocol message.Protocol = "room:success_leave_room" +const SuccessLeaveRoomProtocol message.Protocol = "room:success_leave_room" type SuccessLeaveRoom struct { interceptor.BaseMessage diff --git a/pkg/middleware/chat/messages/success_start_health_streaming.go b/pkg/middleware/chat/messages/success_start_health_streaming.go index 0984429..7699344 100644 --- a/pkg/middleware/chat/messages/success_start_health_streaming.go +++ b/pkg/middleware/chat/messages/success_start_health_streaming.go @@ -1,15 +1,17 @@ package messages import ( + "context" "time" "github.com/harshabose/socket-comm/pkg/interceptor" "github.com/harshabose/socket-comm/pkg/message" + "github.com/harshabose/socket-comm/pkg/middleware/chat" "github.com/harshabose/socket-comm/pkg/middleware/chat/process" "github.com/harshabose/socket-comm/pkg/middleware/chat/types" ) -var SuccessStartHealthStreamingProtocol message.Protocol = "chat:success_start_health_streaming" +const SuccessStartHealthStreamingProtocol message.Protocol = "chat:success_start_health_streaming" type SuccessStartHealthStreaming struct { interceptor.BaseMessage @@ -38,3 +40,17 @@ func NewSuccessStartHealthStreamingMessageFactory(id types.RoomID, allowed []int func (m *SuccessStartHealthStreaming) GetProtocol() message.Protocol { return SuccessStartHealthStreamingProtocol } + +func (m *SuccessStartHealthStreaming) ReadProcess(ctx context.Context, _i interceptor.Interceptor, _ interceptor.Connection) error { + i, ok := _i.(*chat.ClientInterceptor) + if !ok { + return interceptor.ErrInvalidInterceptor + } + + if err := i.Health.Process(ctx, m, nil); err != nil { + return err + } + + return nil + // NOTE: NO SUCCESS TRAIL MESSAGE +} diff --git a/pkg/middleware/chat/messages/success_start_health_tracking.go b/pkg/middleware/chat/messages/success_start_health_tracking.go index ee048c2..9d6945f 100644 --- a/pkg/middleware/chat/messages/success_start_health_tracking.go +++ b/pkg/middleware/chat/messages/success_start_health_tracking.go @@ -6,11 +6,10 @@ import ( "github.com/harshabose/socket-comm/pkg/interceptor" "github.com/harshabose/socket-comm/pkg/message" "github.com/harshabose/socket-comm/pkg/middleware/chat" - "github.com/harshabose/socket-comm/pkg/middleware/chat/errors" "github.com/harshabose/socket-comm/pkg/middleware/chat/types" ) -var SuccessTrackHealthInRoomProtocol message.Protocol = "chat:success_track_health" +const SuccessTrackHealthInRoomProtocol message.Protocol = "chat:success_track_health" type SuccessTrackHealthInRoom struct { interceptor.BaseMessage @@ -43,7 +42,7 @@ func NewSuccessTrackHealthInRoomMessageFactory(id types.RoomID) func() (message. func (m *SuccessTrackHealthInRoom) ReadProcess(_ context.Context, _i interceptor.Interceptor, _ interceptor.Connection) error { _, ok := _i.(*chat.ServerInterceptor) if !ok { - return errors.ErrInterfaceMisMatch + return interceptor.ErrInterfaceMisMatch } // NOTE: INTENTIONALLY EMPTY diff --git a/pkg/middleware/chat/messages/success_stop_health_streaming.go b/pkg/middleware/chat/messages/success_stop_health_streaming.go index aaafc53..4a3ea53 100644 --- a/pkg/middleware/chat/messages/success_stop_health_streaming.go +++ b/pkg/middleware/chat/messages/success_stop_health_streaming.go @@ -1,13 +1,16 @@ package messages import ( + "context" + "github.com/harshabose/socket-comm/pkg/interceptor" "github.com/harshabose/socket-comm/pkg/message" + "github.com/harshabose/socket-comm/pkg/middleware/chat" "github.com/harshabose/socket-comm/pkg/middleware/chat/process" "github.com/harshabose/socket-comm/pkg/middleware/chat/types" ) -var SuccessStopHealthStreamingProtocol message.Protocol = "chat:success_stop_health_streaming" +const SuccessStopHealthStreamingProtocol message.Protocol = "chat:success_stop_health_streaming" type SuccessStopHealthStreaming struct { interceptor.BaseMessage @@ -36,3 +39,17 @@ func NewSuccessStopHealthStreamingMessageFactory(id types.RoomID) func() (messag func (m *SuccessStopHealthStreaming) GetProtocol() message.Protocol { return SuccessStopHealthStreamingProtocol } + +func (m *SuccessStopHealthStreaming) ReadProcess(ctx context.Context, _i interceptor.Interceptor, _ interceptor.Connection) error { + i, ok := _i.(*chat.ClientInterceptor) + if !ok { + return interceptor.ErrInvalidInterceptor + } + + if err := i.Health.Process(ctx, m, nil); err != nil { + return err + } + + return nil + // NOTE: NO SUCCESS TRAIL MESSAGES +} diff --git a/pkg/middleware/chat/messages/success_stop_health_tracking.go b/pkg/middleware/chat/messages/success_stop_health_tracking.go index d9ce107..4ca3776 100644 --- a/pkg/middleware/chat/messages/success_stop_health_tracking.go +++ b/pkg/middleware/chat/messages/success_stop_health_tracking.go @@ -6,11 +6,10 @@ import ( "github.com/harshabose/socket-comm/pkg/interceptor" "github.com/harshabose/socket-comm/pkg/message" "github.com/harshabose/socket-comm/pkg/middleware/chat" - "github.com/harshabose/socket-comm/pkg/middleware/chat/errors" "github.com/harshabose/socket-comm/pkg/middleware/chat/types" ) -var SuccessUnmarkRoomForHealthTrackingProtocol message.Protocol = "chat:success_untrack_health" +const SuccessUnmarkRoomForHealthTrackingProtocol message.Protocol = "chat:success_untrack_health" type SuccessUnmarkRoomForHealthTracking struct { interceptor.BaseMessage @@ -41,10 +40,10 @@ func NewSuccessUntrackHealthInRoomMessageFactory(id types.RoomID) func() (messag } } -func (m *SuccessUnmarkRoomForHealthTracking) ReadProcess(ctx context.Context, _i interceptor.Interceptor, _ interceptor.Connection) error { +func (m *SuccessUnmarkRoomForHealthTracking) ReadProcess(_ context.Context, _i interceptor.Interceptor, _ interceptor.Connection) error { _, ok := _i.(*chat.ClientInterceptor) if !ok { - return errors.ErrInvalidInterceptor + return interceptor.ErrInvalidInterceptor } // NOTE: INTENTIONALLY EMPTY diff --git a/pkg/middleware/chat/messages/sucess_leave_room.go b/pkg/middleware/chat/messages/sucess_leave_room.go deleted file mode 100644 index 38640a1..0000000 --- a/pkg/middleware/chat/messages/sucess_leave_room.go +++ /dev/null @@ -1 +0,0 @@ -package messages diff --git a/pkg/middleware/chat/messages/to_forward.go b/pkg/middleware/chat/messages/to_forward.go index e08418c..5ed9368 100644 --- a/pkg/middleware/chat/messages/to_forward.go +++ b/pkg/middleware/chat/messages/to_forward.go @@ -11,7 +11,7 @@ import ( "github.com/harshabose/socket-comm/pkg/middleware/chat/types" ) -var ForwardMessageProtocol message.Protocol = "room:forward_message" +const ForwardMessageProtocol message.Protocol = "room:forward_message" type ToForward struct { interceptor.BaseMessage