mirror of
https://github.com/xaionaro-go/streamctl.git
synced 2025-10-27 09:30:23 +08:00
Render a basic chat window
This commit is contained in:
@@ -264,12 +264,23 @@ func (obs *OBS) GetChatMessagesChan(
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (obs *OBS) SendChatMessage(ctx context.Context, message string) error {
|
||||
func (obs *OBS) SendChatMessage(
|
||||
ctx context.Context,
|
||||
message string,
|
||||
) error {
|
||||
return fmt.Errorf("not implemented, yet")
|
||||
}
|
||||
func (obs *OBS) DeleteChatMessage(ctx context.Context, messageID string) error {
|
||||
func (obs *OBS) RemoveChatMessage(
|
||||
ctx context.Context,
|
||||
messageID streamcontrol.ChatMessageID,
|
||||
) error {
|
||||
return fmt.Errorf("not implemented, yet")
|
||||
}
|
||||
func (obs *OBS) BanUser(ctx context.Context, userID string, reason string, deadline time.Time) error {
|
||||
func (obs *OBS) BanUser(
|
||||
ctx context.Context,
|
||||
userID streamcontrol.ChatUserID,
|
||||
reason string,
|
||||
deadline time.Time,
|
||||
) error {
|
||||
return fmt.Errorf("not implemented, yet")
|
||||
}
|
||||
|
||||
@@ -105,10 +105,13 @@ type StreamStatus struct {
|
||||
CustomData any `json:",omitempty"`
|
||||
}
|
||||
|
||||
type ChatUserID string
|
||||
type ChatMessageID string
|
||||
|
||||
type ChatMessage struct {
|
||||
CreatedAt time.Time
|
||||
UserID string
|
||||
MessageID string
|
||||
UserID ChatUserID
|
||||
MessageID ChatMessageID
|
||||
Message string
|
||||
}
|
||||
|
||||
@@ -124,8 +127,8 @@ type StreamControllerCommons interface {
|
||||
|
||||
GetChatMessagesChan(ctx context.Context) (<-chan ChatMessage, error)
|
||||
SendChatMessage(ctx context.Context, message string) error
|
||||
DeleteChatMessage(ctx context.Context, messageID string) error
|
||||
BanUser(ctx context.Context, userID string, reason string, deadline time.Time) error
|
||||
RemoveChatMessage(ctx context.Context, messageID ChatMessageID) error
|
||||
BanUser(ctx context.Context, userID ChatUserID, reason string, deadline time.Time) error
|
||||
}
|
||||
|
||||
type StreamController[ProfileType StreamProfile] interface {
|
||||
@@ -230,10 +233,10 @@ func (c *abstractStreamController) GetChatMessagesChan(ctx context.Context) (<-c
|
||||
func (c *abstractStreamController) SendChatMessage(ctx context.Context, message string) error {
|
||||
return c.StreamController.SendChatMessage(ctx, message)
|
||||
}
|
||||
func (c *abstractStreamController) DeleteChatMessage(ctx context.Context, messageID string) error {
|
||||
return c.StreamController.DeleteChatMessage(ctx, messageID)
|
||||
func (c *abstractStreamController) RemoveChatMessage(ctx context.Context, messageID ChatMessageID) error {
|
||||
return c.StreamController.RemoveChatMessage(ctx, messageID)
|
||||
}
|
||||
func (c *abstractStreamController) BanUser(ctx context.Context, userID string, reason string, deadline time.Time) error {
|
||||
func (c *abstractStreamController) BanUser(ctx context.Context, userID ChatUserID, reason string, deadline time.Time) error {
|
||||
return c.StreamController.BanUser(ctx, userID, reason, deadline)
|
||||
}
|
||||
|
||||
|
||||
@@ -63,8 +63,8 @@ func newChatHandler(
|
||||
select {
|
||||
case h.messagesOutChan <- streamcontrol.ChatMessage{
|
||||
CreatedAt: ev.CreatedAt,
|
||||
UserID: ev.Sender.Username,
|
||||
MessageID: ev.ID,
|
||||
UserID: streamcontrol.ChatUserID(ev.Sender.Username),
|
||||
MessageID: streamcontrol.ChatMessageID(ev.ID),
|
||||
Message: ev.Text, // TODO: investigate if we need ev.IRCMessage.Text
|
||||
}:
|
||||
default:
|
||||
|
||||
@@ -73,9 +73,9 @@ func TestChatHandler(t *testing.T) {
|
||||
|
||||
callback(0, irc.ChatMessage{
|
||||
Sender: irc.ChatSender{
|
||||
Username: expectedEvent.UserID,
|
||||
Username: string(expectedEvent.UserID),
|
||||
},
|
||||
ID: expectedEvent.MessageID,
|
||||
ID: string(expectedEvent.MessageID),
|
||||
Channel: channelID,
|
||||
Text: expectedEvent.Message,
|
||||
CreatedAt: time.Now(),
|
||||
|
||||
@@ -844,20 +844,28 @@ func (t *Twitch) SendChatMessage(ctx context.Context, message string) (_ret erro
|
||||
})
|
||||
return err
|
||||
}
|
||||
func (t *Twitch) DeleteChatMessage(ctx context.Context, messageID string) (_ret error) {
|
||||
logger.Debugf(ctx, "DeleteChatMessage(ctx, '%s')", messageID)
|
||||
defer func() { logger.Debugf(ctx, "/DeleteChatMessage(ctx, '%s'): %v", messageID, _ret) }()
|
||||
func (t *Twitch) RemoveChatMessage(
|
||||
ctx context.Context,
|
||||
messageID streamcontrol.ChatMessageID,
|
||||
) (_ret error) {
|
||||
logger.Debugf(ctx, "RemoveChatMessage(ctx, '%s')", messageID)
|
||||
defer func() { logger.Debugf(ctx, "/RemoveChatMessage(ctx, '%s'): %v", messageID, _ret) }()
|
||||
|
||||
t.prepare(ctx)
|
||||
|
||||
_, err := t.client.DeleteChatMessage(&helix.DeleteChatMessageParams{
|
||||
BroadcasterID: t.broadcasterID,
|
||||
ModeratorID: t.broadcasterID,
|
||||
MessageID: messageID,
|
||||
MessageID: string(messageID),
|
||||
})
|
||||
return err
|
||||
}
|
||||
func (t *Twitch) BanUser(ctx context.Context, userID string, reason string, deadline time.Time) (_err error) {
|
||||
func (t *Twitch) BanUser(
|
||||
ctx context.Context,
|
||||
userID streamcontrol.ChatUserID,
|
||||
reason string,
|
||||
deadline time.Time,
|
||||
) (_err error) {
|
||||
logger.Debugf(ctx, "BanUser(ctx, '%s', '%s', %v)", userID, reason, deadline)
|
||||
defer func() { logger.Debugf(ctx, "/BanUser(ctx, '%s', '%s', %v): %v", userID, reason, deadline, _err) }()
|
||||
|
||||
@@ -873,7 +881,7 @@ func (t *Twitch) BanUser(ctx context.Context, userID string, reason string, dead
|
||||
Body: helix.BanUserRequestBody{
|
||||
Duration: duration,
|
||||
Reason: reason,
|
||||
UserId: userID,
|
||||
UserId: string(userID),
|
||||
},
|
||||
})
|
||||
return err
|
||||
|
||||
@@ -1170,12 +1170,23 @@ func (yt *YouTube) GetChatMessagesChan(
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (yt *YouTube) SendChatMessage(ctx context.Context, message string) error {
|
||||
func (yt *YouTube) SendChatMessage(
|
||||
ctx context.Context,
|
||||
message string,
|
||||
) error {
|
||||
return fmt.Errorf("not implemented, yet")
|
||||
}
|
||||
func (yt *YouTube) DeleteChatMessage(ctx context.Context, messageID string) error {
|
||||
func (yt *YouTube) RemoveChatMessage(
|
||||
ctx context.Context,
|
||||
messageID streamcontrol.ChatMessageID,
|
||||
) error {
|
||||
return fmt.Errorf("not implemented, yet")
|
||||
}
|
||||
func (yt *YouTube) BanUser(ctx context.Context, userID string, reason string, deadline time.Time) error {
|
||||
func (yt *YouTube) BanUser(
|
||||
ctx context.Context,
|
||||
userID streamcontrol.ChatUserID,
|
||||
reason string,
|
||||
deadline time.Time,
|
||||
) error {
|
||||
return fmt.Errorf("not implemented, yet")
|
||||
}
|
||||
|
||||
@@ -276,6 +276,18 @@ type StreamD interface {
|
||||
SubscribeToChatMessages(
|
||||
ctx context.Context,
|
||||
) (<-chan ChatMessage, error)
|
||||
RemoveChatMessage(
|
||||
ctx context.Context,
|
||||
platID streamcontrol.PlatformName,
|
||||
msgID streamcontrol.ChatMessageID,
|
||||
) error
|
||||
BanUser(
|
||||
ctx context.Context,
|
||||
platID streamcontrol.PlatformName,
|
||||
userID streamcontrol.ChatUserID,
|
||||
reason string,
|
||||
deadline time.Time,
|
||||
) error
|
||||
}
|
||||
|
||||
type StreamPlayer = sstypes.StreamPlayer
|
||||
|
||||
@@ -3,6 +3,7 @@ package streamd
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/xaionaro-go/streamctl/pkg/observability"
|
||||
"github.com/xaionaro-go/streamctl/pkg/streamcontrol"
|
||||
@@ -36,3 +37,41 @@ func (d *StreamD) startListeningForChatMessages(
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *StreamD) RemoveChatMessage(
|
||||
ctx context.Context,
|
||||
platID streamcontrol.PlatformName,
|
||||
msgID streamcontrol.ChatMessageID,
|
||||
) error {
|
||||
ctrl, err := d.streamController(ctx, platID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to get stream controller '%s': %w", platID, err)
|
||||
}
|
||||
|
||||
err = ctrl.RemoveChatMessage(ctx, msgID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to remove message '%s' on '%s': %w", msgID, platID, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *StreamD) BanUser(
|
||||
ctx context.Context,
|
||||
platID streamcontrol.PlatformName,
|
||||
userID streamcontrol.ChatUserID,
|
||||
reason string,
|
||||
deadline time.Time,
|
||||
) error {
|
||||
ctrl, err := d.streamController(ctx, platID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to get stream controller '%s': %w", platID, err)
|
||||
}
|
||||
|
||||
err = ctrl.BanUser(ctx, streamcontrol.ChatUserID(userID), reason, deadline)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to ban user '%s' on '%s': %w", userID, platID, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -2701,8 +2701,8 @@ func (c *Client) SubscribeToChatMessages(
|
||||
int64(createdAtUnix)/int64(time.Second),
|
||||
(int64(createdAtUnix)%int64(time.Second))/int64(time.Nanosecond),
|
||||
),
|
||||
UserID: event.GetUserID(),
|
||||
MessageID: event.GetMessageID(),
|
||||
UserID: streamcontrol.ChatUserID(event.GetUserID()),
|
||||
MessageID: streamcontrol.ChatMessageID(event.GetMessageID()),
|
||||
Message: event.GetMessage(),
|
||||
},
|
||||
Platform: streamcontrol.PlatformName(event.GetPlatID()),
|
||||
@@ -2710,3 +2710,62 @@ func (c *Client) SubscribeToChatMessages(
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
func (c *Client) RemoveChatMessage(
|
||||
ctx context.Context,
|
||||
platID streamcontrol.PlatformName,
|
||||
msgID streamcontrol.ChatMessageID,
|
||||
) error {
|
||||
_, err := withStreamDClient(ctx, c, func(
|
||||
ctx context.Context,
|
||||
client streamd_grpc.StreamDClient,
|
||||
conn io.Closer,
|
||||
) (*streamd_grpc.RemoveChatMessageReply, error) {
|
||||
return callWrapper(
|
||||
ctx,
|
||||
c,
|
||||
client.RemoveChatMessage,
|
||||
&streamd_grpc.RemoveChatMessageRequest{
|
||||
PlatID: string(platID),
|
||||
MessageID: string(msgID),
|
||||
},
|
||||
)
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to submit the event: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func (c *Client) BanUser(
|
||||
ctx context.Context,
|
||||
platID streamcontrol.PlatformName,
|
||||
userID streamcontrol.ChatUserID,
|
||||
reason string,
|
||||
deadline time.Time,
|
||||
) error {
|
||||
_, err := withStreamDClient(ctx, c, func(
|
||||
ctx context.Context,
|
||||
client streamd_grpc.StreamDClient,
|
||||
conn io.Closer,
|
||||
) (*streamd_grpc.BanUserReply, error) {
|
||||
var deadlineNano *int64
|
||||
if !deadline.IsZero() {
|
||||
deadlineNano = ptr(int64(deadline.UnixNano()))
|
||||
}
|
||||
return callWrapper(
|
||||
ctx,
|
||||
c,
|
||||
client.BanUser,
|
||||
&streamd_grpc.BanUserRequest{
|
||||
PlatID: string(platID),
|
||||
UserID: string(userID),
|
||||
Reason: reason,
|
||||
DeadlineUnixNano: deadlineNano,
|
||||
},
|
||||
)
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to submit the event: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -91,6 +91,8 @@ type StreamDClient interface {
|
||||
UpdateTriggerRule(ctx context.Context, in *UpdateTriggerRuleRequest, opts ...grpc.CallOption) (*UpdateTriggerRuleReply, error)
|
||||
SubmitEvent(ctx context.Context, in *SubmitEventRequest, opts ...grpc.CallOption) (*SubmitEventReply, error)
|
||||
SubscribeToChatMessages(ctx context.Context, in *SubscribeToChatMessagesRequest, opts ...grpc.CallOption) (StreamD_SubscribeToChatMessagesClient, error)
|
||||
RemoveChatMessage(ctx context.Context, in *RemoveChatMessageRequest, opts ...grpc.CallOption) (*RemoveChatMessageReply, error)
|
||||
BanUser(ctx context.Context, in *BanUserRequest, opts ...grpc.CallOption) (*BanUserReply, error)
|
||||
}
|
||||
|
||||
type streamDClient struct {
|
||||
@@ -1020,6 +1022,24 @@ func (x *streamDSubscribeToChatMessagesClient) Recv() (*ChatMessage, error) {
|
||||
return m, nil
|
||||
}
|
||||
|
||||
func (c *streamDClient) RemoveChatMessage(ctx context.Context, in *RemoveChatMessageRequest, opts ...grpc.CallOption) (*RemoveChatMessageReply, error) {
|
||||
out := new(RemoveChatMessageReply)
|
||||
err := c.cc.Invoke(ctx, "/streamd.StreamD/RemoveChatMessage", in, out, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *streamDClient) BanUser(ctx context.Context, in *BanUserRequest, opts ...grpc.CallOption) (*BanUserReply, error) {
|
||||
out := new(BanUserReply)
|
||||
err := c.cc.Invoke(ctx, "/streamd.StreamD/BanUser", in, out, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// StreamDServer is the server API for StreamD service.
|
||||
// All implementations must embed UnimplementedStreamDServer
|
||||
// for forward compatibility
|
||||
@@ -1098,6 +1118,8 @@ type StreamDServer interface {
|
||||
UpdateTriggerRule(context.Context, *UpdateTriggerRuleRequest) (*UpdateTriggerRuleReply, error)
|
||||
SubmitEvent(context.Context, *SubmitEventRequest) (*SubmitEventReply, error)
|
||||
SubscribeToChatMessages(*SubscribeToChatMessagesRequest, StreamD_SubscribeToChatMessagesServer) error
|
||||
RemoveChatMessage(context.Context, *RemoveChatMessageRequest) (*RemoveChatMessageReply, error)
|
||||
BanUser(context.Context, *BanUserRequest) (*BanUserReply, error)
|
||||
mustEmbedUnimplementedStreamDServer()
|
||||
}
|
||||
|
||||
@@ -1327,6 +1349,12 @@ func (UnimplementedStreamDServer) SubmitEvent(context.Context, *SubmitEventReque
|
||||
func (UnimplementedStreamDServer) SubscribeToChatMessages(*SubscribeToChatMessagesRequest, StreamD_SubscribeToChatMessagesServer) error {
|
||||
return status.Errorf(codes.Unimplemented, "method SubscribeToChatMessages not implemented")
|
||||
}
|
||||
func (UnimplementedStreamDServer) RemoveChatMessage(context.Context, *RemoveChatMessageRequest) (*RemoveChatMessageReply, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method RemoveChatMessage not implemented")
|
||||
}
|
||||
func (UnimplementedStreamDServer) BanUser(context.Context, *BanUserRequest) (*BanUserReply, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method BanUser not implemented")
|
||||
}
|
||||
func (UnimplementedStreamDServer) mustEmbedUnimplementedStreamDServer() {}
|
||||
|
||||
// UnsafeStreamDServer may be embedded to opt out of forward compatibility for this service.
|
||||
@@ -2705,6 +2733,42 @@ func (x *streamDSubscribeToChatMessagesServer) Send(m *ChatMessage) error {
|
||||
return x.ServerStream.SendMsg(m)
|
||||
}
|
||||
|
||||
func _StreamD_RemoveChatMessage_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(RemoveChatMessageRequest)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if interceptor == nil {
|
||||
return srv.(StreamDServer).RemoveChatMessage(ctx, in)
|
||||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: "/streamd.StreamD/RemoveChatMessage",
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(StreamDServer).RemoveChatMessage(ctx, req.(*RemoveChatMessageRequest))
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _StreamD_BanUser_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(BanUserRequest)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if interceptor == nil {
|
||||
return srv.(StreamDServer).BanUser(ctx, in)
|
||||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: "/streamd.StreamD/BanUser",
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(StreamDServer).BanUser(ctx, req.(*BanUserRequest))
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
var _StreamD_serviceDesc = grpc.ServiceDesc{
|
||||
ServiceName: "streamd.StreamD",
|
||||
HandlerType: (*StreamDServer)(nil),
|
||||
@@ -2961,6 +3025,14 @@ var _StreamD_serviceDesc = grpc.ServiceDesc{
|
||||
MethodName: "SubmitEvent",
|
||||
Handler: _StreamD_SubmitEvent_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "RemoveChatMessage",
|
||||
Handler: _StreamD_RemoveChatMessage_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "BanUser",
|
||||
Handler: _StreamD_BanUser_Handler,
|
||||
},
|
||||
},
|
||||
Streams: []grpc.StreamDesc{
|
||||
{
|
||||
|
||||
12
pkg/streamd/grpc/goconv/time.go
Normal file
12
pkg/streamd/grpc/goconv/time.go
Normal file
@@ -0,0 +1,12 @@
|
||||
package goconv
|
||||
|
||||
import (
|
||||
"time"
|
||||
)
|
||||
|
||||
func UnixGRPC2Go(unixNano int64) time.Time {
|
||||
return time.Unix(
|
||||
unixNano/int64(time.Second),
|
||||
unixNano%int64(time.Second),
|
||||
)
|
||||
}
|
||||
@@ -88,6 +88,8 @@ service StreamD {
|
||||
rpc SubmitEvent(SubmitEventRequest) returns (SubmitEventReply) {}
|
||||
|
||||
rpc SubscribeToChatMessages(SubscribeToChatMessagesRequest) returns (stream ChatMessage) {}
|
||||
rpc RemoveChatMessage(RemoveChatMessageRequest) returns (RemoveChatMessageReply) {}
|
||||
rpc BanUser(BanUserRequest) returns (BanUserReply) {}
|
||||
}
|
||||
|
||||
message PingRequest {
|
||||
@@ -683,3 +685,16 @@ message ChatMessage {
|
||||
string messageID = 4;
|
||||
string message = 5;
|
||||
}
|
||||
|
||||
message RemoveChatMessageRequest {
|
||||
string platID = 1;
|
||||
string messageID = 2;
|
||||
}
|
||||
message RemoveChatMessageReply {}
|
||||
message BanUserRequest {
|
||||
string platID = 1;
|
||||
string userID = 2;
|
||||
string reason = 3;
|
||||
optional int64 deadlineUnixNano = 4;
|
||||
}
|
||||
message BanUserReply {}
|
||||
|
||||
@@ -1948,10 +1948,46 @@ func (grpc *GRPCServer) SubscribeToChatMessages(
|
||||
return streamd_grpc.ChatMessage{
|
||||
CreatedAtNano: uint64(input.CreatedAt.UnixNano()),
|
||||
PlatID: string(input.Platform),
|
||||
UserID: input.UserID,
|
||||
MessageID: input.MessageID,
|
||||
UserID: string(input.UserID),
|
||||
MessageID: string(input.MessageID),
|
||||
Message: input.Message,
|
||||
}
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
func (grpc *GRPCServer) RemoveChatMessage(
|
||||
ctx context.Context,
|
||||
req *streamd_grpc.RemoveChatMessageRequest,
|
||||
) (*streamd_grpc.RemoveChatMessageReply, error) {
|
||||
err := grpc.StreamD.RemoveChatMessage(
|
||||
ctx,
|
||||
streamcontrol.PlatformName(req.GetPlatID()),
|
||||
streamcontrol.ChatMessageID(req.GetMessageID()),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &streamd_grpc.RemoveChatMessageReply{}, nil
|
||||
}
|
||||
func (grpc *GRPCServer) BanUser(
|
||||
ctx context.Context,
|
||||
req *streamd_grpc.BanUserRequest,
|
||||
) (*streamd_grpc.BanUserReply, error) {
|
||||
|
||||
var deadline time.Time
|
||||
if req.DeadlineUnixNano != nil {
|
||||
deadline = goconv.UnixGRPC2Go(*req.DeadlineUnixNano)
|
||||
}
|
||||
err := grpc.StreamD.BanUser(
|
||||
ctx,
|
||||
streamcontrol.PlatformName(req.GetPlatID()),
|
||||
streamcontrol.ChatUserID(req.GetUserID()),
|
||||
req.GetReason(),
|
||||
deadline,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &streamd_grpc.BanUserReply{}, nil
|
||||
}
|
||||
|
||||
143
pkg/streampanel/chat.go
Normal file
143
pkg/streampanel/chat.go
Normal file
@@ -0,0 +1,143 @@
|
||||
package streampanel
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"fyne.io/fyne/v2"
|
||||
"fyne.io/fyne/v2/container"
|
||||
"fyne.io/fyne/v2/theme"
|
||||
"fyne.io/fyne/v2/widget"
|
||||
"github.com/xaionaro-go/streamctl/pkg/observability"
|
||||
"github.com/xaionaro-go/streamctl/pkg/streamcontrol"
|
||||
"github.com/xaionaro-go/streamctl/pkg/streamd/api"
|
||||
)
|
||||
|
||||
type chatUI struct {
|
||||
CanvasObject fyne.CanvasObject
|
||||
Panel *Panel
|
||||
List *widget.List
|
||||
MessagesHistoryLocker sync.Mutex
|
||||
MessagesHistory []api.ChatMessage
|
||||
|
||||
// TODO: do not store ctx in a struct:
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
func newChatUI(
|
||||
ctx context.Context,
|
||||
panel *Panel,
|
||||
) (*chatUI, error) {
|
||||
ui := &chatUI{
|
||||
Panel: panel,
|
||||
ctx: ctx,
|
||||
}
|
||||
if err := ui.init(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return ui, nil
|
||||
}
|
||||
|
||||
func (ui *chatUI) init(
|
||||
ctx context.Context,
|
||||
) error {
|
||||
ui.List = widget.NewList(ui.listLength, ui.listCreateItem, ui.listUpdateItem)
|
||||
msgCh, err := ui.Panel.StreamD.SubscribeToChatMessages(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to subscribe to chat messages: %w", err)
|
||||
}
|
||||
|
||||
observability.Go(ctx, func() {
|
||||
ui.messageReceiverLoop(ctx, msgCh)
|
||||
})
|
||||
ui.CanvasObject = ui.List
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ui *chatUI) messageReceiverLoop(
|
||||
ctx context.Context,
|
||||
msgCh <-chan api.ChatMessage,
|
||||
) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case msg := <-msgCh:
|
||||
ui.onReceiveMessage(msg)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (ui *chatUI) onReceiveMessage(
|
||||
msg api.ChatMessage,
|
||||
) {
|
||||
ui.MessagesHistoryLocker.Lock()
|
||||
defer ui.MessagesHistoryLocker.Unlock()
|
||||
ui.MessagesHistory = append(ui.MessagesHistory, msg)
|
||||
}
|
||||
|
||||
func (ui *chatUI) listLength() int {
|
||||
ui.MessagesHistoryLocker.Lock()
|
||||
defer ui.MessagesHistoryLocker.Unlock()
|
||||
return len(ui.MessagesHistory)
|
||||
}
|
||||
|
||||
func (ui *chatUI) listCreateItem() fyne.CanvasObject {
|
||||
container := container.NewHBox()
|
||||
return container
|
||||
}
|
||||
|
||||
func (ui *chatUI) listUpdateItem(
|
||||
itemID int,
|
||||
obj fyne.CanvasObject,
|
||||
) {
|
||||
ui.MessagesHistoryLocker.Lock()
|
||||
defer ui.MessagesHistoryLocker.Unlock()
|
||||
container := obj.(*fyne.Container)
|
||||
msg := ui.MessagesHistory[itemID]
|
||||
|
||||
label := widget.NewLabel(
|
||||
fmt.Sprintf(
|
||||
"%s: %s: %s: %s",
|
||||
msg.CreatedAt.Format("15:04"),
|
||||
msg.Platform,
|
||||
msg.UserID,
|
||||
msg.Message,
|
||||
),
|
||||
)
|
||||
container.RemoveAll()
|
||||
container.Add(widget.NewButtonWithIcon("", theme.ContentRemoveIcon(), func() {
|
||||
ui.onRemoveClicked(itemID)
|
||||
}))
|
||||
container.Add(label)
|
||||
|
||||
// TODO: think of how to get rid of this racy hack:
|
||||
observability.Go(context.TODO(), func() { ui.List.SetItemHeight(itemID, label.MinSize().Height) })
|
||||
}
|
||||
|
||||
func (ui *chatUI) onRemoveClicked(
|
||||
itemID int,
|
||||
) {
|
||||
ui.MessagesHistoryLocker.Lock()
|
||||
defer ui.MessagesHistoryLocker.Unlock()
|
||||
if itemID < 0 || itemID >= len(ui.MessagesHistory) {
|
||||
return
|
||||
}
|
||||
msg := ui.MessagesHistory[itemID]
|
||||
ui.MessagesHistory = append(ui.MessagesHistory[:itemID], ui.MessagesHistory[itemID+1:]...)
|
||||
ui.Panel.chatMessageRemove(ui.ctx, msg.Platform, msg.MessageID)
|
||||
ui.CanvasObject.Refresh()
|
||||
}
|
||||
|
||||
func (p *Panel) chatMessageRemove(
|
||||
ctx context.Context,
|
||||
platID streamcontrol.PlatformName,
|
||||
msgID streamcontrol.ChatMessageID,
|
||||
) {
|
||||
|
||||
err := p.StreamD.RemoveChatMessage(ctx, platID, msgID)
|
||||
if err != nil {
|
||||
p.DisplayError(err)
|
||||
}
|
||||
}
|
||||
@@ -21,6 +21,7 @@ type Page string
|
||||
const (
|
||||
PageControl = Page("Control")
|
||||
PageMoreControl = Page("More control")
|
||||
PageChat = Page("Chat")
|
||||
PageMonitor = Page("Monitor")
|
||||
PageOBS = Page("OBS")
|
||||
PageRestream = Page("Restream")
|
||||
|
||||
@@ -2018,6 +2018,20 @@ func (p *Panel) initMainWindow(
|
||||
),
|
||||
)
|
||||
|
||||
chatPage := container.NewBorder(nil, nil, nil, nil)
|
||||
chatUI, err := newChatUI(ctx, p)
|
||||
if err != nil {
|
||||
logger.Errorf(ctx, "unable to initialize the page for chat: %v", err)
|
||||
} else {
|
||||
chatPage = container.NewBorder(
|
||||
nil,
|
||||
nil,
|
||||
nil,
|
||||
nil,
|
||||
chatUI.CanvasObject,
|
||||
)
|
||||
}
|
||||
|
||||
var cancelPage context.CancelFunc
|
||||
setPage := func(page consts.Page) {
|
||||
logger.Debugf(ctx, "setPage(%s)", page)
|
||||
@@ -2036,6 +2050,7 @@ func (p *Panel) initMainWindow(
|
||||
obsPage.Hide()
|
||||
restreamPage.Hide()
|
||||
moreControlPage.Hide()
|
||||
chatPage.Hide()
|
||||
profileControl.Show()
|
||||
monitorControl.Hide()
|
||||
timersUI.StopRefreshingFromRemote(ctx)
|
||||
@@ -2044,12 +2059,21 @@ func (p *Panel) initMainWindow(
|
||||
p.monitorPage.Hide()
|
||||
obsPage.Hide()
|
||||
restreamPage.Hide()
|
||||
moreControlPage.Hide()
|
||||
chatPage.Hide()
|
||||
profileControl.Hide()
|
||||
monitorControl.Hide()
|
||||
controlPage.Hide()
|
||||
moreControlPage.Show()
|
||||
timersUI.StartRefreshingFromRemote(ctx)
|
||||
case consts.PageChat:
|
||||
p.monitorPage.Hide()
|
||||
obsPage.Hide()
|
||||
restreamPage.Hide()
|
||||
moreControlPage.Hide()
|
||||
profileControl.Hide()
|
||||
monitorControl.Hide()
|
||||
controlPage.Hide()
|
||||
chatPage.Show()
|
||||
case consts.PageMonitor:
|
||||
controlPage.Hide()
|
||||
profileControl.Hide()
|
||||
@@ -2086,6 +2110,7 @@ func (p *Panel) initMainWindow(
|
||||
[]string{
|
||||
string(consts.PageControl),
|
||||
string(consts.PageMoreControl),
|
||||
string(consts.PageChat),
|
||||
string(consts.PageMonitor),
|
||||
string(consts.PageOBS),
|
||||
string(consts.PageRestream),
|
||||
@@ -2112,7 +2137,7 @@ func (p *Panel) initMainWindow(
|
||||
),
|
||||
nil,
|
||||
nil,
|
||||
container.NewStack(controlPage, moreControlPage, p.monitorPage, obsPage, restreamPage),
|
||||
container.NewStack(controlPage, moreControlPage, chatPage, p.monitorPage, obsPage, restreamPage),
|
||||
))
|
||||
|
||||
w.Show()
|
||||
|
||||
Reference in New Issue
Block a user