Start implementing the chat widget for the dashboard

This commit is contained in:
Dmitrii Okunev
2025-06-15 00:06:04 +01:00
parent 6d6a1b4b26
commit 20072c81c0
22 changed files with 1182 additions and 642 deletions

View File

@@ -10,6 +10,7 @@ import (
"net/http"
_ "net/http/pprof"
"os"
"time"
"github.com/davecgh/go-spew/spew"
"github.com/facebookincubator/go-belt/tool/logger"
@@ -318,7 +319,7 @@ func chatListen(cmd *cobra.Command, args []string) {
assertNoError(ctx, err)
fmt.Println("subscribing...")
ch, err := streamD.SubscribeToChatMessages(ctx)
ch, err := streamD.SubscribeToChatMessages(ctx, time.Now().Add(-time.Minute))
assertNoError(ctx, err)
fmt.Println("started listening...")

View File

@@ -200,6 +200,7 @@ func runPanel(
err := mainProcess.SendMessage(ctx, ProcessNameMain, MessageApplicationRestart{})
if err != nil {
logger.Errorf(ctx, "unable to send a request to restart the application: %v; closing the panel instead (hoping it will trigger a restart)", err)
belt.Flush(ctx)
panel.Close()
}
},
@@ -212,6 +213,8 @@ func runPanel(
if err != nil {
logger.Panic(ctx, err)
}
logger.Debugf(ctx, "/panel.Loop")
belt.Flush(ctx)
err = panel.Close()
if err != nil {
logger.Error(ctx, err)

View File

@@ -0,0 +1,39 @@
package chatmessagesstorage
import (
"context"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/xaionaro-go/streamctl/pkg/streamd/api"
"github.com/xaionaro-go/xsync"
)
func (s *ChatMessagesStorage) AddMessage(
ctx context.Context,
msg api.ChatMessage,
) error {
xsync.DoA2(ctx, &s.Mutex, s.addMessageLocked, ctx, msg)
return nil
}
func (s *ChatMessagesStorage) addMessageLocked(
ctx context.Context,
msg api.ChatMessage,
) {
logger.Tracef(ctx, "addMessageLocked(ctx, %#+v)", msg)
defer func() { logger.Tracef(ctx, "/addMessageLocked(ctx, %#+v)", msg) }()
if len(s.Messages) > 0 && !msg.CreatedAt.After(s.Messages[len(s.Messages)-1].CreatedAt) {
s.IsSorted = false
}
s.Messages = append(s.Messages, msg)
s.IsChanged = true
if len(s.Messages) <= MaxMessages {
return
}
if s.IsSorted {
s.Messages = s.Messages[len(s.Messages)-MaxMessages:]
return
}
s.sortAndDeduplicateAndTruncate(ctx)
}

View File

@@ -0,0 +1,26 @@
package chatmessagesstorage
import (
"github.com/xaionaro-go/streamctl/pkg/streamd/api"
"github.com/xaionaro-go/xsync"
)
const (
MaxMessages = 1000
)
type ChatMessagesStorage struct {
xsync.Mutex
FilePath string
IsSorted bool
IsChanged bool
Messages []api.ChatMessage
}
func New(
filePath string,
) *ChatMessagesStorage {
return &ChatMessagesStorage{
FilePath: filePath,
}
}

View File

@@ -0,0 +1,49 @@
package chatmessagesstorage
import (
"context"
"slices"
"sort"
"time"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/xaionaro-go/streamctl/pkg/streamd/api"
"github.com/xaionaro-go/xsync"
)
func (s *ChatMessagesStorage) GetMessagesSince(
ctx context.Context,
since time.Time,
) ([]api.ChatMessage, error) {
return xsync.DoA2R2(ctx, &s.Mutex, s.getMessagesSinceLocked, ctx, since)
}
func (s *ChatMessagesStorage) getMessagesSinceLocked(
ctx context.Context,
since time.Time,
) (_ret []api.ChatMessage, _err error) {
logger.Tracef(ctx, "getMessagesSinceLocked(ctx, %v)", since)
defer func() { logger.Tracef(ctx, "/getMessagesSinceLocked(ctx, %v): len:%d, %v", since, len(_ret), _err) }()
if len(s.Messages) == 0 {
return nil, nil
}
if !s.IsSorted {
s.sortAndDeduplicateAndTruncate(ctx)
}
idx := sort.Search(len(s.Messages), func(i int) bool {
m := &s.Messages[i]
return !m.CreatedAt.After(since)
})
if idx >= len(s.Messages) {
if !since.Before(s.Messages[0].CreatedAt) {
return nil, nil
}
idx = 0
}
return slices.Clone(s.Messages[idx:]), nil
}

View File

@@ -0,0 +1,38 @@
package chatmessagesstorage
import (
"context"
"encoding/json"
"fmt"
"os"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/xaionaro-go/xsync"
)
func (s *ChatMessagesStorage) Load(ctx context.Context) error {
return xsync.DoA1R1(ctx, &s.Mutex, s.loadLocked, ctx)
}
func (s *ChatMessagesStorage) loadLocked(ctx context.Context) (_err error) {
logger.Tracef(ctx, "storeLocked(ctx)")
defer func() { logger.Tracef(ctx, "/storeLocked(ctx): %v", _err) }()
f, err := os.OpenFile(s.FilePath, os.O_RDONLY, 0)
if err != nil {
if os.IsNotExist(err) {
s.Messages = s.Messages[:0]
return nil
}
return fmt.Errorf("unable to open file '%s' for reading: %w", s.FilePath, err)
}
defer f.Close()
d := json.NewDecoder(f)
err = d.Decode(&s.Messages)
if err != nil {
return fmt.Errorf("unable to parse file '%s': %w", s.FilePath, err)
}
s.sortAndDeduplicateAndTruncate(ctx)
return nil
}

View File

@@ -0,0 +1,37 @@
package chatmessagesstorage
import (
"context"
"fmt"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/xaionaro-go/streamctl/pkg/streamcontrol"
"github.com/xaionaro-go/xsync"
)
func (s *ChatMessagesStorage) RemoveMessage(
ctx context.Context,
msgID streamcontrol.ChatMessageID,
) error {
return xsync.DoA2R1(ctx, &s.Mutex, s.removeMessageLocked, ctx, msgID)
}
func (s *ChatMessagesStorage) removeMessageLocked(
ctx context.Context,
msgID streamcontrol.ChatMessageID,
) (_err error) {
logger.Tracef(ctx, "removeMessageLocked(ctx, '%v')", msgID)
defer func() { logger.Tracef(ctx, "/removeMessageLocked(ctx, '%v'): %v", msgID, _err) }()
for idx := range s.Messages {
if s.Messages[idx].MessageID != msgID {
continue
}
s.Messages[idx] = s.Messages[len(s.Messages)-1]
s.Messages = s.Messages[:len(s.Messages)-1]
s.IsSorted = false
s.IsChanged = true
return nil
}
return fmt.Errorf("message with ID '%s' was not found", msgID)
}

View File

@@ -0,0 +1,56 @@
package chatmessagesstorage
import (
"context"
"sort"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/xaionaro-go/streamctl/pkg/streamd/api"
)
func msgLess(ctx context.Context, a *api.ChatMessage, b *api.ChatMessage) bool {
if a.CreatedAt.Before(b.CreatedAt) {
return true
}
if a.Platform < b.Platform {
return true
}
if a.Username < b.Username {
return true
}
if a.MessageID < b.MessageID {
return true
}
if a.Message < b.Message {
return true
}
if a != b {
logger.Errorf(ctx, "msgs A and B look equal, but are not: A:%#+v B:%#+v", a, b)
}
return false
}
func (s *ChatMessagesStorage) sortAndDeduplicateAndTruncate(ctx context.Context) {
if len(s.Messages) == 0 {
return
}
sort.Slice(s.Messages, func(i, j int) bool {
return msgLess(ctx, &s.Messages[i], &s.Messages[j])
})
s.IsSorted = true
dedup := make([]api.ChatMessage, 0, len(s.Messages))
dedup = append(dedup, s.Messages[0])
for _, msg := range s.Messages[1:] {
if msg == dedup[len(dedup)-1] {
continue
}
dedup = append(dedup, msg)
}
s.Messages = dedup
if len(s.Messages) > MaxMessages {
s.Messages = s.Messages[len(s.Messages)-MaxMessages:]
}
}

View File

@@ -0,0 +1,59 @@
package chatmessagesstorage
import (
"context"
"encoding/json"
"fmt"
"os"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/xaionaro-go/xsync"
)
func (s *ChatMessagesStorage) Store(ctx context.Context) error {
return xsync.DoA1R1(ctx, &s.Mutex, s.storeLocked, ctx)
}
func (s *ChatMessagesStorage) storeLocked(ctx context.Context) (_err error) {
logger.Tracef(ctx, "storeLocked(ctx)")
defer func() { logger.Tracef(ctx, "/storeLocked(ctx): %v", _err) }()
if !s.IsChanged {
return nil
}
newFilePath := s.FilePath + ".new"
f, err := os.OpenFile(newFilePath, os.O_WRONLY|os.O_CREATE, 0640)
if err != nil {
return fmt.Errorf("unable to open file '%s' for writing: %w", newFilePath, err)
}
s.sortAndDeduplicateAndTruncate(ctx)
d := json.NewEncoder(f)
err = d.Encode(s.Messages)
f.Close()
if err != nil {
return fmt.Errorf("unable to serialize the messages and write them to file '%s': %w", newFilePath, err)
}
oldFilePath := s.FilePath + ".old"
err = os.Rename(s.FilePath, oldFilePath)
if err != nil {
if !os.IsNotExist(err) {
return fmt.Errorf("unable to move '%s' to '%s': %w", s.FilePath, oldFilePath)
}
}
err = os.Rename(newFilePath, s.FilePath)
if err != nil {
return fmt.Errorf("unable to move '%s' to '%s': %w", newFilePath, s.FilePath)
}
err = os.Remove(oldFilePath)
if err != nil {
if !os.IsNotExist(err) {
logger.Errorf(ctx, "unable to delete '%s': %v", s.FilePath, err)
}
}
return nil
}

View File

@@ -277,6 +277,7 @@ type StreamD interface {
SubscribeToChatMessages(
ctx context.Context,
since time.Time,
) (<-chan ChatMessage, error)
SendChatMessage(
ctx context.Context,

View File

@@ -11,6 +11,14 @@ import (
"github.com/xaionaro-go/streamctl/pkg/streamd/api"
)
type ChatMessageStorage interface {
AddMessage(context.Context, api.ChatMessage) error
RemoveMessage(context.Context, streamcontrol.ChatMessageID) error
Load(ctx context.Context) error
Store(ctx context.Context) error
GetMessagesSince(context.Context, time.Time) ([]api.ChatMessage, error)
}
func (d *StreamD) startListeningForChatMessages(
ctx context.Context,
platName streamcontrol.PlatformName,
@@ -25,19 +33,24 @@ func (d *StreamD) startListeningForChatMessages(
return fmt.Errorf("unable to get the channel for chat messages of '%s': %w", platName, err)
}
observability.Go(ctx, func() {
logger.Debugf(ctx, "/startListeningForChatMessages(ctx, '%s')", platName)
defer logger.Debugf(ctx, "/startListeningForChatMessages(ctx, '%s')", platName)
for {
select {
case <-ctx.Done():
logger.Debugf(ctx, "startListeningForChatMessages(ctx, '%s'): context is closed; %v", platName, ctx.Err())
return
case ev, ok := <-ch:
if !ok {
return
}
d.publishEvent(ctx, api.ChatMessage{
msg := api.ChatMessage{
ChatMessage: ev,
Platform: platName,
})
}
if err := d.ChatMessagesStorage.AddMessage(ctx, msg); err != nil {
logger.Errorf(ctx, "unable to add the message %#+v to the chat messages storage: %v", msg, err)
}
d.publishEvent(ctx, msg)
}
}
})
@@ -59,6 +72,10 @@ func (d *StreamD) RemoveChatMessage(
return fmt.Errorf("unable to remove message '%s' on '%s': %w", msgID, platID, err)
}
if err := d.ChatMessagesStorage.RemoveMessage(ctx, msgID); err != nil {
logger.Errorf(ctx, "unable to remove the message from the chat messages storage: %v", err)
}
return nil
}
@@ -81,3 +98,46 @@ func (d *StreamD) BanUser(
return nil
}
func (d *StreamD) SubscribeToChatMessages(
ctx context.Context,
since time.Time,
) (<-chan api.ChatMessage, error) {
return eventSubToChan[api.ChatMessage](
ctx, d,
func(ctx context.Context, outCh chan api.ChatMessage) {
msgs, err := d.ChatMessagesStorage.GetMessagesSince(ctx, since)
if err != nil {
logger.Errorf(ctx, "unable to get the messages from the storage: %v", err)
return
}
for _, msg := range msgs {
outCh <- msg
}
},
)
}
func (d *StreamD) SendChatMessage(
ctx context.Context,
platID streamcontrol.PlatformName,
message string,
) (_err error) {
logger.Debugf(ctx, "SendChatMessage(ctx, '%s', '%s')", platID, message)
defer func() { logger.Debugf(ctx, "/SendChatMessage(ctx, '%s', '%s'): %v", platID, message, _err) }()
if message == "" {
return nil
}
ctrl, err := d.streamController(ctx, platID)
if err != nil {
return fmt.Errorf("unable to get stream controller for platform '%s': %w", platID, err)
}
err = ctrl.SendChatMessage(ctx, message)
if err != nil {
return fmt.Errorf("unable to send message '%s' to platform '%s': %w", message, platID, err)
}
return nil
}

View File

@@ -631,7 +631,10 @@ func (c *Client) SetConfig(
func (c *Client) IsBackendEnabled(
ctx context.Context,
id streamcontrol.PlatformName,
) (bool, error) {
) (_ret bool, _err error) {
logger.Tracef(ctx, "IsBackendEnabled(ctx, '%s')", id)
defer func() { logger.Tracef(ctx, "/IsBackendEnabled(ctx, '%s'): %v %v", id, _ret, _err) }()
reply, err := withStreamDClient(ctx, c, func(
ctx context.Context,
client streamd_grpc.StreamDClient,
@@ -715,7 +718,9 @@ func (c *Client) EndStream(
func (c *Client) GetBackendInfo(
ctx context.Context,
platID streamcontrol.PlatformName,
) (*api.BackendInfo, error) {
) (_ret *api.BackendInfo, _err error) {
logger.Tracef(ctx, "GetBackendInfo(ctx, '%s')", platID)
defer func() { logger.Tracef(ctx, "/GetBackendInfo(ctx, '%s'): %v %v", platID, _ret, _err) }()
reply, err := withStreamDClient(ctx, c, func(
ctx context.Context,
client streamd_grpc.StreamDClient,
@@ -2618,6 +2623,7 @@ func (c *Client) SubmitEvent(
func (c *Client) SubscribeToChatMessages(
ctx context.Context,
since time.Time,
) (<-chan api.ChatMessage, error) {
return unwrapStreamDChan(
ctx,
@@ -2630,19 +2636,21 @@ func (c *Client) SubscribeToChatMessages(
ctx,
c,
client.SubscribeToChatMessages,
&streamd_grpc.SubscribeToChatMessagesRequest{},
&streamd_grpc.SubscribeToChatMessagesRequest{
SinceUNIXNano: uint64(since.UnixNano()),
},
)
},
func(
ctx context.Context,
event *streamd_grpc.ChatMessage,
) api.ChatMessage {
createdAtUnix := event.GetCreatedAtNano()
createdAtUNIXNano := event.GetCreatedAtUNIXNano()
return api.ChatMessage{
ChatMessage: streamcontrol.ChatMessage{
CreatedAt: time.Unix(
int64(createdAtUnix)/int64(time.Second),
(int64(createdAtUnix)%int64(time.Second))/int64(time.Nanosecond),
int64(createdAtUNIXNano)/int64(time.Second),
(int64(createdAtUNIXNano)%int64(time.Second))/int64(time.Nanosecond),
),
UserID: streamcontrol.ChatUserID(event.GetUserID()),
Username: event.GetUsername(),

View File

@@ -0,0 +1,28 @@
package config
import (
"context"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/xaionaro-go/xpath"
)
const (
DefaultChatMessagesPath = "~/.streampanel.chat-messages"
)
func (cfg *Config) GetChatMessageStorage(
ctx context.Context,
) string {
if cfg.ChatMessagesStorage != nil {
return *cfg.ChatMessagesStorage
}
path, err := xpath.Expand(DefaultChatMessagesPath)
if err != nil {
logger.Errorf(ctx, "unable to expand '%s': %w", DefaultChatMessagesPath, err)
return ".streampanel.chat-messages"
}
return path
}

View File

@@ -22,15 +22,16 @@ type ProfileMetadata struct {
}
type config struct {
CachePath *string `yaml:"cache_path"`
GitRepo GitRepoConfig
Backends streamcontrol.Config
ProfileMetadata map[streamcontrol.ProfileName]ProfileMetadata
StreamServer streamserver.Config `yaml:"stream_server"`
Dashboard DashboardConfig `yaml:"monitor"` // TODO: rename to `dashboard`
TriggerRules TriggerRules `yaml:"trigger_rules"`
P2PNetwork P2PNetwork `yaml:"p2p_network"`
LLM LLM `yaml:"llm"`
CachePath *string `yaml:"cache_path"`
ChatMessagesStorage *string `yaml:"chat_messages_storage"`
GitRepo GitRepoConfig
Backends streamcontrol.Config
ProfileMetadata map[streamcontrol.ProfileName]ProfileMetadata
StreamServer streamserver.Config `yaml:"stream_server"`
Dashboard DashboardConfig `yaml:"monitor"` // TODO: rename to `dashboard`
TriggerRules TriggerRules `yaml:"trigger_rules"`
P2PNetwork P2PNetwork `yaml:"p2p_network"`
LLM LLM `yaml:"llm"`
}
type Config config

View File

@@ -93,8 +93,12 @@ func (d *StreamD) doAction(
func eventSubToChan[T any](
ctx context.Context,
d *StreamD,
onReady func(ctx context.Context, outCh chan T),
) (<-chan T, error) {
var sample T
logger.Debugf(ctx, "eventSubToChan[%T]", sample)
defer func() { logger.Debugf(ctx, "/eventSubToChan[%T]", sample) }()
topic := eventTopic(sample)
var mutex sync.Mutex
@@ -117,11 +121,22 @@ func eventSubToChan[T any](
}
}
if onReady != nil {
mutex.Lock()
}
err := d.EventBus.SubscribeAsync(topic, callback, true)
if err != nil {
return nil, fmt.Errorf("unable to subscribe: %w", err)
}
if onReady != nil {
observability.Go(ctx, func() {
defer mutex.Unlock()
onReady(ctx, r)
})
}
observability.Go(ctx, func() {
<-ctx.Done()
@@ -139,49 +154,49 @@ func eventSubToChan[T any](
func (d *StreamD) SubscribeToDashboardChanges(
ctx context.Context,
) (<-chan api.DiffDashboard, error) {
return eventSubToChan[api.DiffDashboard](ctx, d)
return eventSubToChan[api.DiffDashboard](ctx, d, nil)
}
func (d *StreamD) SubscribeToConfigChanges(
ctx context.Context,
) (<-chan api.DiffConfig, error) {
return eventSubToChan[api.DiffConfig](ctx, d)
return eventSubToChan[api.DiffConfig](ctx, d, nil)
}
func (d *StreamD) SubscribeToStreamsChanges(
ctx context.Context,
) (<-chan api.DiffStreams, error) {
return eventSubToChan[api.DiffStreams](ctx, d)
return eventSubToChan[api.DiffStreams](ctx, d, nil)
}
func (d *StreamD) SubscribeToStreamServersChanges(
ctx context.Context,
) (<-chan api.DiffStreamServers, error) {
return eventSubToChan[api.DiffStreamServers](ctx, d)
return eventSubToChan[api.DiffStreamServers](ctx, d, nil)
}
func (d *StreamD) SubscribeToStreamDestinationsChanges(
ctx context.Context,
) (<-chan api.DiffStreamDestinations, error) {
return eventSubToChan[api.DiffStreamDestinations](ctx, d)
return eventSubToChan[api.DiffStreamDestinations](ctx, d, nil)
}
func (d *StreamD) SubscribeToIncomingStreamsChanges(
ctx context.Context,
) (<-chan api.DiffIncomingStreams, error) {
return eventSubToChan[api.DiffIncomingStreams](ctx, d)
return eventSubToChan[api.DiffIncomingStreams](ctx, d, nil)
}
func (d *StreamD) SubscribeToStreamForwardsChanges(
ctx context.Context,
) (<-chan api.DiffStreamForwards, error) {
return eventSubToChan[api.DiffStreamForwards](ctx, d)
return eventSubToChan[api.DiffStreamForwards](ctx, d, nil)
}
func (d *StreamD) SubscribeToStreamPlayersChanges(
ctx context.Context,
) (<-chan api.DiffStreamPlayers, error) {
return eventSubToChan[api.DiffStreamPlayers](ctx, d)
return eventSubToChan[api.DiffStreamPlayers](ctx, d, nil)
}
func (d *StreamD) notifyStreamPlayerStart(

File diff suppressed because it is too large Load Diff

View File

@@ -745,9 +745,10 @@ message SubmitEventRequest {
message SubmitEventReply {}
message SubscribeToChatMessagesRequest {
uint64 sinceUNIXNano = 1;
}
message ChatMessage {
uint64 createdAtNano = 1;
uint64 createdAtUNIXNano = 1;
string platID = 2;
string userID = 3;
string username = 4;

View File

@@ -300,13 +300,29 @@ func (grpc *GRPCServer) EndStream(
return &streamd_grpc.EndStreamReply{}, nil
}
func (grpc *GRPCServer) GetStreamD() api.StreamD {
if grpc.StreamD == nil {
panic("grpc.StreamD == nil")
}
streamD, ok := grpc.StreamD.(*streamd.StreamD)
if !ok {
return grpc.StreamD
}
<-streamD.ReadyChan
return streamD
}
func (grpc *GRPCServer) IsBackendEnabled(
ctx context.Context,
req *streamd_grpc.IsBackendEnabledRequest,
) (*streamd_grpc.IsBackendEnabledReply, error) {
enabled, err := grpc.StreamD.IsBackendEnabled(
) (_ret *streamd_grpc.IsBackendEnabledReply, _err error) {
platID := streamcontrol.PlatformName(req.GetPlatID())
logger.Tracef(ctx, "IsBackendEnabled(ctx, '%s')", platID)
defer func() { logger.Tracef(ctx, "/IsBackendEnabled(ctx, '%s'): %v %v", platID, _ret, _err) }()
enabled, err := grpc.GetStreamD().IsBackendEnabled(
ctx,
streamcontrol.PlatformName(req.GetPlatID()),
platID,
)
if err != nil {
return nil, fmt.Errorf(
@@ -323,9 +339,11 @@ func (grpc *GRPCServer) IsBackendEnabled(
func (grpc *GRPCServer) GetBackendInfo(
ctx context.Context,
req *streamd_grpc.GetBackendInfoRequest,
) (*streamd_grpc.GetBackendInfoReply, error) {
) (_ret *streamd_grpc.GetBackendInfoReply, _err error) {
platID := streamcontrol.PlatformName(req.GetPlatID())
isEnabled, err := grpc.StreamD.IsBackendEnabled(
logger.Tracef(ctx, "GetBackendInfo(ctx, '%s')", platID)
defer func() { logger.Tracef(ctx, "/GetBackendInfo(ctx, '%s'): %v %v", platID, _ret, _err) }()
isEnabled, err := grpc.GetStreamD().IsBackendEnabled(
ctx,
platID,
)
@@ -1890,17 +1908,24 @@ func (grpc *GRPCServer) SubscribeToChatMessages(
req *streamd_grpc.SubscribeToChatMessagesRequest,
srv streamd_grpc.StreamD_SubscribeToChatMessagesServer,
) error {
ts := req.GetSinceUNIXNano()
since := time.Unix(
int64(ts)/int64(time.Second.Nanoseconds()),
int64(ts)%int64(time.Second.Nanoseconds()),
)
return wrapChan(
grpc.StreamD.SubscribeToChatMessages,
func(ctx context.Context) (<-chan api.ChatMessage, error) {
return grpc.StreamD.SubscribeToChatMessages(ctx, since)
},
srv,
func(input api.ChatMessage) streamd_grpc.ChatMessage {
return streamd_grpc.ChatMessage{
CreatedAtNano: uint64(input.CreatedAt.UnixNano()),
PlatID: string(input.Platform),
UserID: string(input.UserID),
Username: input.Username,
MessageID: string(input.MessageID),
Message: input.Message,
CreatedAtUNIXNano: uint64(input.CreatedAt.UnixNano()),
PlatID: string(input.Platform),
UserID: string(input.UserID),
Username: input.Username,
MessageID: string(input.MessageID),
Message: input.Message,
}
},
)

View File

@@ -19,6 +19,7 @@ import (
"github.com/hashicorp/go-multierror"
"github.com/xaionaro-go/observability"
"github.com/xaionaro-go/player/pkg/player"
"github.com/xaionaro-go/streamctl/pkg/chatmessagesstorage"
"github.com/xaionaro-go/streamctl/pkg/p2p"
"github.com/xaionaro-go/streamctl/pkg/repository"
"github.com/xaionaro-go/streamctl/pkg/streamcontrol"
@@ -65,10 +66,13 @@ type StreamD struct {
SaveConfigFunc SaveConfigFunc
ConfigLock xsync.Gorex
Config config.Config
ReadyChan chan struct{}
CacheLock xsync.Mutex
Cache *cache.Cache
ChatMessagesStorage ChatMessageStorage
GitStorage *repository.GIT
CancelGitSyncer context.CancelFunc
@@ -133,20 +137,28 @@ func New(
}
d := &StreamD{
UI: ui,
SaveConfigFunc: saveCfgFunc,
Config: cfg,
Cache: &cache.Cache{},
OAuthListenPorts: map[uint16]struct{}{},
StreamStatusCache: memoize.NewMemoizeData(),
EventBus: eventbus.New(),
UI: ui,
SaveConfigFunc: saveCfgFunc,
Config: cfg,
ChatMessagesStorage: chatmessagesstorage.New(cfg.GetChatMessageStorage(ctx)),
Cache: &cache.Cache{},
OAuthListenPorts: map[uint16]struct{}{},
StreamStatusCache: memoize.NewMemoizeData(),
EventBus: eventbus.New(),
OBSState: OBSState{
VolumeMeters: map[string][][3]float64{},
},
Timers: map[api.TimerID]*Timer{},
Options: Options(options).Aggregate(),
Timers: map[api.TimerID]*Timer{},
Options: Options(options).Aggregate(),
ReadyChan: make(chan struct{}),
}
// TODO: move this to Run()
if err := d.ChatMessagesStorage.Load(ctx); err != nil {
logger.FromBelt(b).Errorf("unable to read the chat messages: %v", err)
}
// TODO: move this to Run()
err = d.readCache(ctx)
if err != nil {
logger.FromBelt(b).Errorf("unable to read cache: %v", err)
@@ -200,6 +212,11 @@ func (d *StreamD) Run(ctx context.Context) (_ret error) { // TODO: delete the fe
d.UI.DisplayError(fmt.Errorf("unable to initialize the P2P network: %w", err))
}
d.UI.SetStatus("Initializing chat messages storage...")
if err := d.initChatMessagesStorage(ctx); err != nil {
d.UI.DisplayError(fmt.Errorf("unable to initialize the chat messages storage: %w", err))
}
d.UI.SetStatus("OBS restarter...")
if err := d.initOBSRestarter(ctx); err != nil {
d.UI.DisplayError(fmt.Errorf("unable to initialize the OBS restarter: %w", err))
@@ -211,6 +228,32 @@ func (d *StreamD) Run(ctx context.Context) (_ret error) { // TODO: delete the fe
}
d.UI.SetStatus("Initializing UI...")
close(d.ReadyChan)
return nil
}
func (d *StreamD) initChatMessagesStorage(ctx context.Context) (_err error) {
logger.Debugf(ctx, "initChatMessagesStorage")
defer logger.Debugf(ctx, "/initChatMessagesStorage: %v", _err)
observability.Go(ctx, func() {
logger.Debugf(ctx, "initChatMessagesStorage-refresherLoop")
defer logger.Debugf(ctx, "/initChatMessagesStorage-refresherLoop")
t := time.NewTicker(5 * time.Second)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
}
err := d.ChatMessagesStorage.Store(ctx)
if err != nil {
d.UI.DisplayError(fmt.Errorf("unable to store the chat messages: %w", err))
}
}
})
return nil
}
@@ -218,7 +261,7 @@ func (d *StreamD) secretsProviderUpdater(ctx context.Context) (_err error) {
logger.Debugf(ctx, "secretsProviderUpdater")
defer logger.Debugf(ctx, "/secretsProviderUpdater: %v", _err)
cfgChangeCh, err := eventSubToChan[api.DiffConfig](ctx, d)
cfgChangeCh, err := eventSubToChan[api.DiffConfig](ctx, d, nil)
if err != nil {
return fmt.Errorf("unable to subscribe to config changes: %w", err)
}
@@ -573,8 +616,11 @@ func (d *StreamD) onUpdateConfig(
func (d *StreamD) IsBackendEnabled(
ctx context.Context,
id streamcontrol.PlatformName,
) (bool, error) {
return xsync.RDoR2(ctx, &d.ControllersLocker, func() (bool, error) {
) (_ret bool, _err error) {
logger.Tracef(ctx, "IsBackendEnabled(ctx, '%s')", id)
defer func() { logger.Tracef(ctx, "/IsBackendEnabled(ctx, '%s'): %v %v", id, _ret, _err) }()
return xsync.RDoR2(ctx, &d.ControllersLocker, func() (_ret bool, _err error) {
switch id {
case obs.ID:
return d.StreamControllers.OBS != nil, nil
@@ -758,7 +804,10 @@ func (d *StreamD) EndStream(ctx context.Context, platID streamcontrol.PlatformNa
func (d *StreamD) GetBackendInfo(
ctx context.Context,
platID streamcontrol.PlatformName,
) (*api.BackendInfo, error) {
) (_ret *api.BackendInfo, _err error) {
logger.Tracef(ctx, "GetBackendInfo(ctx, '%s')", platID)
defer func() { logger.Tracef(ctx, "/GetBackendInfo(ctx, '%s'): %v %v", platID, _ret, _err) }()
ctrl, err := d.streamController(ctx, platID)
if err != nil {
return nil, fmt.Errorf("unable to get stream controller for platform '%s': %w", platID, err)
@@ -1920,36 +1969,6 @@ func (d *StreamD) listTimers(
return result, nil
}
func (d *StreamD) SubscribeToChatMessages(
ctx context.Context,
) (<-chan api.ChatMessage, error) {
return eventSubToChan[api.ChatMessage](ctx, d)
}
func (d *StreamD) SendChatMessage(
ctx context.Context,
platID streamcontrol.PlatformName,
message string,
) (_err error) {
logger.Debugf(ctx, "SendChatMessage(ctx, '%s', '%s')", platID, message)
defer func() { logger.Debugf(ctx, "/SendChatMessage(ctx, '%s', '%s'): %v", platID, message, _err) }()
if message == "" {
return nil
}
ctrl, err := d.streamController(ctx, platID)
if err != nil {
return fmt.Errorf("unable to get stream controller for platform '%s': %w", platID, err)
}
err = ctrl.SendChatMessage(ctx, message)
if err != nil {
return fmt.Errorf("unable to send message '%s' to platform '%s': %w", message, platID, err)
}
return nil
}
func (d *StreamD) DialContext(
ctx context.Context,
network string,

View File

@@ -30,6 +30,7 @@ type chatUI struct {
List *widget.List
MessagesHistoryLocker sync.Mutex
MessagesHistory []api.ChatMessage
EnableButtons bool
CapabilitiesCacheLocker sync.Mutex
CapabilitiesCache map[streamcontrol.PlatformName]map[streamcontrol.Capability]struct{}
@@ -42,10 +43,15 @@ type chatUI struct {
func newChatUI(
ctx context.Context,
enableButtons bool,
panel *Panel,
) (*chatUI, error) {
) (_ret *chatUI, _err error) {
logger.Debugf(ctx, "newChatUI")
defer func() { logger.Debugf(ctx, "/newChatUI: %v %v", _ret, _err) }()
ui := &chatUI{
Panel: panel,
EnableButtons: enableButtons,
CapabilitiesCache: make(map[streamcontrol.PlatformName]map[streamcontrol.Capability]struct{}),
ctx: ctx,
}
@@ -57,16 +63,11 @@ func newChatUI(
func (ui *chatUI) init(
ctx context.Context,
) error {
ui.List = widget.NewList(ui.listLength, ui.listCreateItem, ui.listUpdateItem)
msgCh, err := ui.Panel.StreamD.SubscribeToChatMessages(ctx)
if err != nil {
return fmt.Errorf("unable to subscribe to chat messages: %w", err)
}
) (_err error) {
logger.Debugf(ctx, "init")
defer func() { logger.Debugf(ctx, "/init: %v", _err) }()
observability.Go(ctx, func() {
ui.messageReceiverLoop(ctx, msgCh)
})
ui.List = widget.NewList(ui.listLength, ui.listCreateItem, ui.listUpdateItem)
messageInputEntry := widget.NewEntry()
messageInputEntry.OnSubmitted = func(s string) {
@@ -79,26 +80,43 @@ func (ui *chatUI) init(
messageInputEntry.OnSubmitted(messageInputEntry.Text)
})
ui.CanvasObject = container.NewBorder(
nil,
container.NewBorder(
var buttonPanel fyne.CanvasObject
if ui.EnableButtons {
buttonPanel = container.NewBorder(
nil,
nil,
nil,
messageSendButton,
messageInputEntry,
),
)
}
ui.CanvasObject = container.NewBorder(
nil,
buttonPanel,
nil,
nil,
ui.List,
)
msgCh, err := ui.Panel.StreamD.SubscribeToChatMessages(ctx, time.Now().Add(-7*24*time.Hour))
if err != nil {
return fmt.Errorf("unable to subscribe to chat messages: %w", err)
}
observability.Go(ctx, func() {
time.Sleep(2 * time.Second) // TODO: delete this ugliness
ui.messageReceiverLoop(ctx, msgCh)
})
return nil
}
func (ui *chatUI) sendMessage(
ctx context.Context,
message string,
) error {
) (_err error) {
logger.Tracef(ctx, "sendMessage")
defer func() { logger.Tracef(ctx, "/sendMessage: %v", _err) }()
var result *multierror.Error
panel := ui.Panel
streamD := panel.StreamD
@@ -132,6 +150,8 @@ func (ui *chatUI) messageReceiverLoop(
ctx context.Context,
msgCh <-chan api.ChatMessage,
) {
logger.Tracef(ctx, "messageReceiverLoop")
defer func() { logger.Tracef(ctx, "/messageReceiverLoop") }()
for {
select {
case <-ctx.Done():
@@ -141,7 +161,7 @@ func (ui *chatUI) messageReceiverLoop(
logger.Errorf(ctx, "message channel got closed")
return
}
ui.onReceiveMessage(ctx, msg)
ui.onReceiveMessage(ctx, msg, false)
}
}
}
@@ -149,21 +169,38 @@ func (ui *chatUI) messageReceiverLoop(
func (ui *chatUI) onReceiveMessage(
ctx context.Context,
msg api.ChatMessage,
muteNotifications bool,
) {
logger.Debugf(ctx, "onReceiveMessage(ctx, %s)", spew.Sdump(msg))
defer func() { logger.Tracef(ctx, "/onReceiveMessage(ctx, %s)", spew.Sdump(msg)) }()
ui.MessagesHistoryLocker.Lock()
defer ui.MessagesHistoryLocker.Unlock()
ui.MessagesHistory = append(ui.MessagesHistory, msg)
observability.Go(ctx, func() {
ui.List.Refresh()
})
observability.Go(ctx, func() {
notificationsEnabled := xsync.DoR1(ctx, &ui.Panel.configLocker, func() bool {
return ui.Panel.Config.Chat.NotificationsEnabled()
observability.GoSafe(ctx, func() {
commandTemplate := xsync.DoR1(ctx, &ui.Panel.configLocker, func() string {
return ui.Panel.Config.Chat.CommandOnReceiveMessage
})
if !notificationsEnabled {
if commandTemplate == "" {
return
}
logger.Debugf(ctx, "CommandOnReceiveMessage: <%s>", commandTemplate)
defer logger.Debugf(ctx, "/CommandOnReceiveMessage")
ui.Panel.execCommand(ctx, commandTemplate, msg)
})
notificationsEnabled := xsync.DoR1(ctx, &ui.Panel.configLocker, func() bool {
return ui.Panel.Config.Chat.NotificationsEnabled()
})
if muteNotifications {
notificationsEnabled = false
}
if !notificationsEnabled {
return
}
observability.GoSafe(ctx, func() {
logger.Debugf(ctx, "SendNotification")
defer logger.Debugf(ctx, "/SendNotification")
ui.Panel.app.SendNotification(&fyne.Notification{
@@ -171,7 +208,7 @@ func (ui *chatUI) onReceiveMessage(
Content: msg.Username + ": " + msg.Message,
})
})
observability.Go(ctx, func() {
observability.GoSafe(ctx, func() {
soundEnabled := xsync.DoR1(ctx, &ui.Panel.configLocker, func() bool {
return ui.Panel.Config.Chat.ReceiveMessageSoundAlarmEnabled()
})
@@ -191,18 +228,6 @@ func (ui *chatUI) onReceiveMessage(
logger.Errorf(ctx, "unable to playback the chat message sound: %v", err)
}
})
observability.Go(ctx, func() {
commandTemplate := xsync.DoR1(ctx, &ui.Panel.configLocker, func() string {
return ui.Panel.Config.Chat.CommandOnReceiveMessage
})
if commandTemplate == "" {
return
}
logger.Debugf(ctx, "CommandOnReceiveMessage: <%s>", commandTemplate)
defer logger.Debugf(ctx, "/CommandOnReceiveMessage")
ui.Panel.execCommand(ctx, commandTemplate, msg)
})
}
func (ui *chatUI) listLength() int {
@@ -216,13 +241,17 @@ func (ui *chatUI) listCreateItem() fyne.CanvasObject {
removeMsgButton := widget.NewButtonWithIcon("", theme.DeleteIcon(), func() {})
label := widget.NewLabel("<...loading...>")
label.Wrapping = fyne.TextWrapWord
var leftPanel fyne.CanvasObject
if ui.EnableButtons {
leftPanel = container.NewHBox(
banUserButton,
removeMsgButton,
)
}
return container.NewBorder(
nil,
nil,
container.NewHBox(
banUserButton,
removeMsgButton,
),
leftPanel,
nil,
label,
)
@@ -241,6 +270,10 @@ func (ui *chatUI) getPlatformCapabilities(
return m, nil
}
if ui.Panel.StreamD == nil {
return nil, fmt.Errorf("ui.Panel.StreamD == nil")
}
info, err := ui.Panel.StreamD.GetBackendInfo(ctx, platID)
if err != nil {
return nil, fmt.Errorf("GetBackendInfo returned error: %w", err)
@@ -258,6 +291,10 @@ func (ui *chatUI) listUpdateItem(
ui.MessagesHistoryLocker.Lock()
defer ui.MessagesHistoryLocker.Unlock()
entryID := len(ui.MessagesHistory) - 1 - rowID
if entryID < 0 || entryID >= len(ui.MessagesHistory) {
logger.Errorf(ctx, "invalid entry ID: %d", entryID)
return
}
msg := ui.MessagesHistory[entryID]
platCaps, err := ui.getPlatformCapabilities(ctx, msg.Platform)

View File

@@ -60,10 +60,11 @@ func (p *Panel) getErrorReports() []errorReport {
func (p *Panel) statusPanelSet(text string) {
ctx := context.TODO()
p.statusPanelLocker.Do(ctx, func() {
if p.statusPanel == nil {
panel := p.statusPanel
if panel == nil {
return
}
p.statusPanel.SetText("status: " + text)
panel.SetText("status: " + text)
})
}

View File

@@ -22,6 +22,7 @@ import (
"fyne.io/fyne/v2/theme"
"fyne.io/fyne/v2/widget"
child_process_manager "github.com/AgustinSRG/go-child-process-manager"
"github.com/facebookincubator/go-belt"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/hashicorp/go-multierror"
"github.com/tiendc/go-deepcopy"
@@ -231,7 +232,10 @@ func (p *Panel) dumpConfig(ctx context.Context) {
logger.Tracef(ctx, "the current config is: %s", buf.String())
}
func (p *Panel) Loop(ctx context.Context, opts ...LoopOption) error {
func (p *Panel) Loop(ctx context.Context, opts ...LoopOption) (_err error) {
logger.Debugf(ctx, "Loop")
defer func() { logger.Debugf(ctx, "/Loop: %v (ctx:%v)", _err, ctx.Err()) }()
if p.defaultContext != nil {
return fmt.Errorf("Loop was already used, and cannot be used the second time")
}
@@ -249,6 +253,7 @@ func (p *Panel) Loop(ctx context.Context, opts ...LoopOption) error {
p.app = fyneapp.New()
p.app.Driver().SetDisableScreenBlanking(true)
logger.Tracef(ctx, "SetDisableScreenBlanking(true)")
p.createMainWindow(ctx)
var loadingWindow fyne.Window
if ignoreError(p.GetConfig(ctx)).RemoteStreamDAddr == "" {
@@ -353,6 +358,7 @@ func (p *Panel) Loop(ctx context.Context, opts ...LoopOption) error {
})
p.app.Run()
logger.Infof(ctx, "p.app.Run finished")
return nil
}
@@ -1250,15 +1256,30 @@ func (p *Panel) getUpdatedStatus_startStopStreamButton_noLock(ctx context.Contex
}
}
func (p *Panel) createMainWindow(
ctx context.Context,
) {
logger.Debugf(ctx, "createMainWindow")
defer belt.Flush(ctx)
defer func() { logger.Debugf(ctx, "/createMainWindow") }()
w := p.newPermanentWindow(ctx, gconsts.AppName)
w.SetCloseIntercept(func() {
logger.Debugf(ctx, "main window 'close' was clicked")
})
w.SetMaster()
p.mainWindow = w
}
func (p *Panel) initMainWindow(
ctx context.Context,
startingPage consts.Page,
) {
logger.Debugf(ctx, "initMainWindow")
defer logger.Debugf(ctx, "/initMainWindow")
defer belt.Flush(ctx)
defer func() { logger.Debugf(ctx, "/initMainWindow") }()
w := p.newPermanentWindow(ctx, gconsts.AppName)
w.SetCloseIntercept(func() {})
w := p.mainWindow
menu := fyne.NewMainMenu(fyne.NewMenu("Main",
fyne.NewMenuItem("Settings", func() {
err := p.openSettingsWindow(ctx)
@@ -1301,13 +1322,13 @@ func (p *Panel) initMainWindow(
}),
fyne.NewMenuItemSeparator(),
fyne.NewMenuItem("Quit", func() {
logger.Debugf(ctx, "Quit was clicked")
p.app.Quit()
w.Close()
}),
))
w.SetMainMenu(menu)
p.mainWindow = w
w.SetMaster()
resizeWindow(w, fyne.NewSize(600, 1000))
profileFilter := widget.NewEntry()
@@ -1637,7 +1658,7 @@ func (p *Panel) initMainWindow(
)
chatPage := container.NewBorder(nil, nil, nil, nil)
chatUI, err := newChatUI(ctx, p)
chatUI, err := newChatUI(ctx, true, p)
if err != nil {
logger.Errorf(ctx, "unable to initialize the page for chat: %v", err)
} else {
@@ -2320,7 +2341,10 @@ func (p *Panel) showWaitStreamDConnectWindow(ctx context.Context) {
})
}
func (p *Panel) Close() error {
func (p *Panel) Close() (_err error) {
ctx := context.TODO()
logger.Debugf(ctx, "Close()")
defer func() { logger.Debugf(ctx, "/Close(): %v", _err) }()
var err *multierror.Error
err = multierror.Append(err, p.eventSensor.Close())
// TODO: remove observability.Go, Quit should be executed synchronously,