almost done with chat interceptor. Implement message registories mechanism

This commit is contained in:
harshabose
2025-05-23 11:40:50 +05:30
parent c1b249bc1f
commit 812110b775
19 changed files with 69 additions and 32 deletions

View File

@@ -9,6 +9,7 @@ import (
type Registry struct { type Registry struct {
factories []Factory factories []Factory
messages message.Registry
} }
func NewRegistry() *Registry { func NewRegistry() *Registry {
@@ -21,14 +22,14 @@ func (registry *Registry) Register(factory Factory) {
registry.factories = append(registry.factories, factory) registry.factories = append(registry.factories, factory)
} }
func (registry *Registry) Build(ctx context.Context, id string) (Interceptor, error) { func (registry *Registry) Build(ctx context.Context, id ClientID) (Interceptor, error) {
if len(registry.factories) == 0 { if len(registry.factories) == 0 {
return &NoOpInterceptor{}, nil return &NoOpInterceptor{}, nil
} }
interceptors := make([]Interceptor, 0) interceptors := make([]Interceptor, 0)
for _, factory := range registry.factories { for _, factory := range registry.factories {
interceptor, err := factory.NewInterceptor(ctx, id) interceptor, err := factory.NewInterceptor(ctx, id, registry.messages)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -40,7 +41,7 @@ func (registry *Registry) Build(ctx context.Context, id string) (Interceptor, er
} }
type Factory interface { type Factory interface {
NewInterceptor(context.Context, string) (Interceptor, error) NewInterceptor(context.Context, ClientID, message.Registry) (Interceptor, error)
} }
type Connection interface { type Connection interface {

View File

@@ -41,7 +41,7 @@ func (i *commonInterceptor) InterceptSocketWriter(writer interceptor.Writer) int
return writer.Write(ctx, connection, msg) return writer.Write(ctx, connection, msg)
} }
next, err := m.GetNext(i.writeProcessMessages) next, err := m.GetNext(i.GetMessageRegistry())
if err != nil { if err != nil {
return writer.Write(ctx, connection, msg) return writer.Write(ctx, connection, msg)
} }
@@ -74,7 +74,7 @@ func (i *commonInterceptor) InterceptSocketReader(reader interceptor.Reader) int
return msg, interceptor.ErrInterfaceMisMatch return msg, interceptor.ErrInterfaceMisMatch
} }
next, err := m.GetNext(i.readProcessMessages) next, err := m.GetNext(i.GetMessageRegistry())
if err != nil { if err != nil {
return msg, nil return msg, nil
} }

View File

@@ -6,7 +6,6 @@ import (
"github.com/harshabose/socket-comm/pkg/interceptor" "github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/message" "github.com/harshabose/socket-comm/pkg/message"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/middleware/chat/processors" "github.com/harshabose/socket-comm/pkg/middleware/chat/processors"
"github.com/harshabose/socket-comm/pkg/middleware/chat/state" "github.com/harshabose/socket-comm/pkg/middleware/chat/state"
) )
@@ -26,7 +25,7 @@ func NewInterceptorFactory(options ...Option) *InterceptorFactory {
func WithServerInterceptor(i interceptor.Interceptor) error { func WithServerInterceptor(i interceptor.Interceptor) error {
c, ok := i.(*commonInterceptor) c, ok := i.(*commonInterceptor)
if !ok { if !ok {
return fmt.Errorf("can only convert common chat interceptor to client chat interceptor; err: %s", errors.ErrInterfaceMisMatch.Error()) return fmt.Errorf("can only convert common chat interceptor to client chat interceptor; err: %s", interceptor.ErrInterfaceMisMatch.Error())
} }
i = &ServerInterceptor{ i = &ServerInterceptor{
@@ -35,13 +34,15 @@ func WithServerInterceptor(i interceptor.Interceptor) error {
Health: processors.NewHealthProcessor(c.Ctx()), Health: processors.NewHealthProcessor(c.Ctx()),
} }
// TODO: add server messages to the registry
return nil return nil
} }
func WithClientInterceptor(i interceptor.Interceptor) error { func WithClientInterceptor(i interceptor.Interceptor) error {
c, ok := i.(*commonInterceptor) c, ok := i.(*commonInterceptor)
if !ok { if !ok {
return fmt.Errorf("can only convert common chat interceptor to client chat interceptor; err: %s", errors.ErrInterfaceMisMatch.Error()) return fmt.Errorf("can only convert common chat interceptor to client chat interceptor; err: %s", interceptor.ErrInterfaceMisMatch.Error())
} }
i = &ClientInterceptor{ i = &ClientInterceptor{
@@ -49,10 +50,12 @@ func WithClientInterceptor(i interceptor.Interceptor) error {
Health: processors.NewHealthProcessor(c.Ctx()), Health: processors.NewHealthProcessor(c.Ctx()),
} }
// TODO: add client messages to the registry
return nil return nil
} }
func (f *InterceptorFactory) NewInterceptor(ctx context.Context, id string, registry message.Registry) (interceptor.Interceptor, error) { func (f *InterceptorFactory) NewInterceptor(ctx context.Context, id interceptor.ClientID, registry message.Registry) (interceptor.Interceptor, error) {
i := &commonInterceptor{ i := &commonInterceptor{
NoOpInterceptor: interceptor.NewNoOpInterceptor(ctx, id, registry), NoOpInterceptor: interceptor.NewNoOpInterceptor(ctx, id, registry),
readProcessMessages: message.NewDefaultRegistry(), readProcessMessages: message.NewDefaultRegistry(),
@@ -60,11 +63,15 @@ func (f *InterceptorFactory) NewInterceptor(ctx context.Context, id string, regi
states: state.NewManager(), states: state.NewManager(),
} }
// TODO: add common messages to the registry
for _, option := range f.options { for _, option := range f.options {
if err := option(i); err != nil { if err := option(i); err != nil {
return nil, err return nil, err
} }
} }
// TODO: copy readProcessMessages and writeProcessMessages to registry
return i, nil return i, nil
} }

View File

@@ -8,7 +8,7 @@ import (
"github.com/harshabose/socket-comm/pkg/middleware/chat" "github.com/harshabose/socket-comm/pkg/middleware/chat"
) )
var ForwardedMessageProtocol message.Protocol = "room:forwarded_message" const ForwardedMessageProtocol message.Protocol = "room:forwarded_message"
type ForwardedMessage struct { type ForwardedMessage struct {
interceptor.BaseMessage interceptor.BaseMessage

View File

@@ -9,7 +9,7 @@ import (
"github.com/harshabose/socket-comm/pkg/middleware/chat" "github.com/harshabose/socket-comm/pkg/middleware/chat"
) )
var ( const (
IdentProtocol message.Protocol = "room:ident" IdentProtocol message.Protocol = "room:ident"
IdentResponseProtocol message.Protocol = "room:ident_response" IdentResponseProtocol message.Protocol = "room:ident_response"
) )

View File

@@ -9,7 +9,7 @@ import (
"github.com/harshabose/socket-comm/pkg/middleware/chat/process" "github.com/harshabose/socket-comm/pkg/middleware/chat/process"
) )
var JoinRoomProtocol message.Protocol = "room:join_room" const JoinRoomProtocol message.Protocol = "room:join_room"
type JoinRoom struct { type JoinRoom struct {
interceptor.BaseMessage interceptor.BaseMessage

View File

@@ -9,7 +9,7 @@ import (
"github.com/harshabose/socket-comm/pkg/middleware/chat/process" "github.com/harshabose/socket-comm/pkg/middleware/chat/process"
) )
var LeaveRoomProtocol message.Protocol = "room:leave_room" const LeaveRoomProtocol message.Protocol = "room:leave_room"
type LeaveRoom struct { type LeaveRoom struct {
interceptor.BaseMessage interceptor.BaseMessage

View File

@@ -12,7 +12,7 @@ import (
"github.com/harshabose/socket-comm/pkg/middleware/chat/types" "github.com/harshabose/socket-comm/pkg/middleware/chat/types"
) )
var MarkRoomForHealthTrackingProtocol message.Protocol = "chat:track_health" const MarkRoomForHealthTrackingProtocol message.Protocol = "chat:track_health"
// StartHealthTracking is sent by an interested client (who wants the stats of the whole room) to start tracking the health status. // StartHealthTracking is sent by an interested client (who wants the stats of the whole room) to start tracking the health status.
// This does not send the stats data to the interested client; this just tells the server to track the data. // This does not send the stats data to the interested client; this just tells the server to track the data.

View File

@@ -12,7 +12,7 @@ import (
"github.com/harshabose/socket-comm/pkg/middleware/chat/types" "github.com/harshabose/socket-comm/pkg/middleware/chat/types"
) )
var GetHealthSnapshotProtocol message.Protocol = "chat:get_health_snapshot" const GetHealthSnapshotProtocol message.Protocol = "chat:get_health_snapshot"
type StartStreamingHealthSnapshots struct { type StartStreamingHealthSnapshots struct {
interceptor.BaseMessage interceptor.BaseMessage

View File

@@ -9,7 +9,7 @@ import (
"github.com/harshabose/socket-comm/pkg/middleware/chat/process" "github.com/harshabose/socket-comm/pkg/middleware/chat/process"
) )
var StopStreamingHealthSnapshotProtocol message.Protocol = "chat:stop_streaming_health_snapshot" const StopStreamingHealthSnapshotProtocol message.Protocol = "chat:stop_streaming_health_snapshot"
type StopStreamingHealthSnapshot struct { type StopStreamingHealthSnapshot struct {
interceptor.BaseMessage interceptor.BaseMessage

View File

@@ -9,7 +9,7 @@ import (
"github.com/harshabose/socket-comm/pkg/middleware/chat/types" "github.com/harshabose/socket-comm/pkg/middleware/chat/types"
) )
var SuccessCreateRoomProtocol message.Protocol = "room:success_create_room" const SuccessCreateRoomProtocol message.Protocol = "room:success_create_room"
// SuccessCreateRoom is the message sent by the server to the client after successful creation of the requested room. // SuccessCreateRoom is the message sent by the server to the client after successful creation of the requested room.
// This marks the end of the CreateRoom topic. // This marks the end of the CreateRoom topic.

View File

@@ -6,11 +6,10 @@ import (
"github.com/harshabose/socket-comm/pkg/interceptor" "github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/message" "github.com/harshabose/socket-comm/pkg/message"
"github.com/harshabose/socket-comm/pkg/middleware/chat" "github.com/harshabose/socket-comm/pkg/middleware/chat"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types" "github.com/harshabose/socket-comm/pkg/middleware/chat/types"
) )
var SuccessDeleteRoomProtocol message.Protocol = "room:success_delete_room" const SuccessDeleteRoomProtocol message.Protocol = "room:success_delete_room"
type SuccessDeleteRoom struct { type SuccessDeleteRoom struct {
interceptor.BaseMessage interceptor.BaseMessage
@@ -44,7 +43,7 @@ func (m *SuccessDeleteRoom) GetProtocol() message.Protocol {
func (m *SuccessDeleteRoom) ReadProcess(_ context.Context, _i interceptor.Interceptor, _ interceptor.Connection) error { func (m *SuccessDeleteRoom) ReadProcess(_ context.Context, _i interceptor.Interceptor, _ interceptor.Connection) error {
_, ok := _i.(*chat.ClientInterceptor) _, ok := _i.(*chat.ClientInterceptor)
if !ok { if !ok {
return errors.ErrInterfaceMisMatch return interceptor.ErrInterfaceMisMatch
} }
// NOTE: INTENTIONALLY EMPTY // NOTE: INTENTIONALLY EMPTY

View File

@@ -9,7 +9,7 @@ import (
"github.com/harshabose/socket-comm/pkg/middleware/chat/types" "github.com/harshabose/socket-comm/pkg/middleware/chat/types"
) )
var SuccessLeaveRoomProtocol message.Protocol = "room:success_leave_room" const SuccessLeaveRoomProtocol message.Protocol = "room:success_leave_room"
type SuccessLeaveRoom struct { type SuccessLeaveRoom struct {
interceptor.BaseMessage interceptor.BaseMessage

View File

@@ -1,15 +1,17 @@
package messages package messages
import ( import (
"context"
"time" "time"
"github.com/harshabose/socket-comm/pkg/interceptor" "github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/message" "github.com/harshabose/socket-comm/pkg/message"
"github.com/harshabose/socket-comm/pkg/middleware/chat"
"github.com/harshabose/socket-comm/pkg/middleware/chat/process" "github.com/harshabose/socket-comm/pkg/middleware/chat/process"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types" "github.com/harshabose/socket-comm/pkg/middleware/chat/types"
) )
var SuccessStartHealthStreamingProtocol message.Protocol = "chat:success_start_health_streaming" const SuccessStartHealthStreamingProtocol message.Protocol = "chat:success_start_health_streaming"
type SuccessStartHealthStreaming struct { type SuccessStartHealthStreaming struct {
interceptor.BaseMessage interceptor.BaseMessage
@@ -38,3 +40,17 @@ func NewSuccessStartHealthStreamingMessageFactory(id types.RoomID, allowed []int
func (m *SuccessStartHealthStreaming) GetProtocol() message.Protocol { func (m *SuccessStartHealthStreaming) GetProtocol() message.Protocol {
return SuccessStartHealthStreamingProtocol return SuccessStartHealthStreamingProtocol
} }
func (m *SuccessStartHealthStreaming) ReadProcess(ctx context.Context, _i interceptor.Interceptor, _ interceptor.Connection) error {
i, ok := _i.(*chat.ClientInterceptor)
if !ok {
return interceptor.ErrInvalidInterceptor
}
if err := i.Health.Process(ctx, m, nil); err != nil {
return err
}
return nil
// NOTE: NO SUCCESS TRAIL MESSAGE
}

View File

@@ -6,11 +6,10 @@ import (
"github.com/harshabose/socket-comm/pkg/interceptor" "github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/message" "github.com/harshabose/socket-comm/pkg/message"
"github.com/harshabose/socket-comm/pkg/middleware/chat" "github.com/harshabose/socket-comm/pkg/middleware/chat"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types" "github.com/harshabose/socket-comm/pkg/middleware/chat/types"
) )
var SuccessTrackHealthInRoomProtocol message.Protocol = "chat:success_track_health" const SuccessTrackHealthInRoomProtocol message.Protocol = "chat:success_track_health"
type SuccessTrackHealthInRoom struct { type SuccessTrackHealthInRoom struct {
interceptor.BaseMessage interceptor.BaseMessage
@@ -43,7 +42,7 @@ func NewSuccessTrackHealthInRoomMessageFactory(id types.RoomID) func() (message.
func (m *SuccessTrackHealthInRoom) ReadProcess(_ context.Context, _i interceptor.Interceptor, _ interceptor.Connection) error { func (m *SuccessTrackHealthInRoom) ReadProcess(_ context.Context, _i interceptor.Interceptor, _ interceptor.Connection) error {
_, ok := _i.(*chat.ServerInterceptor) _, ok := _i.(*chat.ServerInterceptor)
if !ok { if !ok {
return errors.ErrInterfaceMisMatch return interceptor.ErrInterfaceMisMatch
} }
// NOTE: INTENTIONALLY EMPTY // NOTE: INTENTIONALLY EMPTY

View File

@@ -1,13 +1,16 @@
package messages package messages
import ( import (
"context"
"github.com/harshabose/socket-comm/pkg/interceptor" "github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/message" "github.com/harshabose/socket-comm/pkg/message"
"github.com/harshabose/socket-comm/pkg/middleware/chat"
"github.com/harshabose/socket-comm/pkg/middleware/chat/process" "github.com/harshabose/socket-comm/pkg/middleware/chat/process"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types" "github.com/harshabose/socket-comm/pkg/middleware/chat/types"
) )
var SuccessStopHealthStreamingProtocol message.Protocol = "chat:success_stop_health_streaming" const SuccessStopHealthStreamingProtocol message.Protocol = "chat:success_stop_health_streaming"
type SuccessStopHealthStreaming struct { type SuccessStopHealthStreaming struct {
interceptor.BaseMessage interceptor.BaseMessage
@@ -36,3 +39,17 @@ func NewSuccessStopHealthStreamingMessageFactory(id types.RoomID) func() (messag
func (m *SuccessStopHealthStreaming) GetProtocol() message.Protocol { func (m *SuccessStopHealthStreaming) GetProtocol() message.Protocol {
return SuccessStopHealthStreamingProtocol return SuccessStopHealthStreamingProtocol
} }
func (m *SuccessStopHealthStreaming) ReadProcess(ctx context.Context, _i interceptor.Interceptor, _ interceptor.Connection) error {
i, ok := _i.(*chat.ClientInterceptor)
if !ok {
return interceptor.ErrInvalidInterceptor
}
if err := i.Health.Process(ctx, m, nil); err != nil {
return err
}
return nil
// NOTE: NO SUCCESS TRAIL MESSAGES
}

View File

@@ -6,11 +6,10 @@ import (
"github.com/harshabose/socket-comm/pkg/interceptor" "github.com/harshabose/socket-comm/pkg/interceptor"
"github.com/harshabose/socket-comm/pkg/message" "github.com/harshabose/socket-comm/pkg/message"
"github.com/harshabose/socket-comm/pkg/middleware/chat" "github.com/harshabose/socket-comm/pkg/middleware/chat"
"github.com/harshabose/socket-comm/pkg/middleware/chat/errors"
"github.com/harshabose/socket-comm/pkg/middleware/chat/types" "github.com/harshabose/socket-comm/pkg/middleware/chat/types"
) )
var SuccessUnmarkRoomForHealthTrackingProtocol message.Protocol = "chat:success_untrack_health" const SuccessUnmarkRoomForHealthTrackingProtocol message.Protocol = "chat:success_untrack_health"
type SuccessUnmarkRoomForHealthTracking struct { type SuccessUnmarkRoomForHealthTracking struct {
interceptor.BaseMessage interceptor.BaseMessage
@@ -41,10 +40,10 @@ func NewSuccessUntrackHealthInRoomMessageFactory(id types.RoomID) func() (messag
} }
} }
func (m *SuccessUnmarkRoomForHealthTracking) ReadProcess(ctx context.Context, _i interceptor.Interceptor, _ interceptor.Connection) error { func (m *SuccessUnmarkRoomForHealthTracking) ReadProcess(_ context.Context, _i interceptor.Interceptor, _ interceptor.Connection) error {
_, ok := _i.(*chat.ClientInterceptor) _, ok := _i.(*chat.ClientInterceptor)
if !ok { if !ok {
return errors.ErrInvalidInterceptor return interceptor.ErrInvalidInterceptor
} }
// NOTE: INTENTIONALLY EMPTY // NOTE: INTENTIONALLY EMPTY

View File

@@ -1 +0,0 @@
package messages

View File

@@ -11,7 +11,7 @@ import (
"github.com/harshabose/socket-comm/pkg/middleware/chat/types" "github.com/harshabose/socket-comm/pkg/middleware/chat/types"
) )
var ForwardMessageProtocol message.Protocol = "room:forward_message" const ForwardMessageProtocol message.Protocol = "room:forward_message"
type ToForward struct { type ToForward struct {
interceptor.BaseMessage interceptor.BaseMessage