diff --git a/pkg/streamcontrol/config.go b/pkg/streamcontrol/config.go index fbb416b..9ef9411 100644 --- a/pkg/streamcontrol/config.go +++ b/pkg/streamcontrol/config.go @@ -287,6 +287,9 @@ func ConvertPlatformConfig[T any, S StreamProfile]( ctx context.Context, platCfg *AbstractPlatformConfig, ) *PlatformConfig[T, S] { + if platCfg == nil { + platCfg = &AbstractPlatformConfig{} + } return &PlatformConfig[T, S]{ Enable: platCfg.Enable, Config: GetPlatformSpecificConfig[T](ctx, platCfg.Config), @@ -299,6 +302,10 @@ func GetPlatformSpecificConfig[T any]( ctx context.Context, platCfgCfg any, ) T { + if platCfgCfg == nil { + var zeroValue T + return zeroValue + } switch platCfgCfg := platCfgCfg.(type) { case T: return platCfgCfg diff --git a/pkg/streamcontrol/kick/chat_handler.go b/pkg/streamcontrol/kick/chat_handler.go index 87f907c..2480f85 100644 --- a/pkg/streamcontrol/kick/chat_handler.go +++ b/pkg/streamcontrol/kick/chat_handler.go @@ -57,6 +57,21 @@ func NewChatHandler( close(h.messagesOutChan) }() + h.sendMessage(kickcom.ChatMessageV2{ + ID: "", + ChatID: 0, + UserID: 0, + Content: "test\nmultiline message", + Type: "", + Metadata: nil, + CreatedAt: time.Time{}, + Sender: kickcom.ChatMessageSenderV2{ + ID: 0, + Slug: "", + Username: "test-user", + Identity: kickcom.Identity{}, + }, + }) err := h.iterate(ctx) if err != nil { logger.Errorf(ctx, "unable to perform an iteration: %w", err) diff --git a/pkg/streamcontrol/twitch/chat_handler.go b/pkg/streamcontrol/twitch/chat_handler.go index 2b215eb..c6a329e 100644 --- a/pkg/streamcontrol/twitch/chat_handler.go +++ b/pkg/streamcontrol/twitch/chat_handler.go @@ -3,6 +3,7 @@ package twitch import ( "context" "fmt" + "time" "github.com/adeithe/go-twitch/irc" "github.com/xaionaro-go/streamctl/pkg/observability" @@ -47,6 +48,13 @@ func newChatHandler( messagesOutChan: make(chan streamcontrol.ChatMessage, 100), } + h.messagesOutChan <- streamcontrol.ChatMessage{ + CreatedAt: time.Now(), + UserID: "test-twitch-user", + MessageID: "test-message-id", + Message: "test\nmultiline message", + } + observability.Go(ctx, func() { defer func() { h.client.Close() diff --git a/pkg/streamd/chat.go b/pkg/streamd/chat.go index d44ec36..f9d358a 100644 --- a/pkg/streamd/chat.go +++ b/pkg/streamd/chat.go @@ -5,6 +5,7 @@ import ( "fmt" "time" + "github.com/facebookincubator/go-belt/tool/logger" "github.com/xaionaro-go/streamctl/pkg/observability" "github.com/xaionaro-go/streamctl/pkg/streamcontrol" "github.com/xaionaro-go/streamctl/pkg/streamd/api" @@ -14,6 +15,7 @@ func (d *StreamD) startListeningForChatMessages( ctx context.Context, platName streamcontrol.PlatformName, ) error { + logger.Debugf(ctx, "startListeningForChatMessages(ctx, '%s')", platName) ctrl, err := d.streamController(ctx, platName) if err != nil { return fmt.Errorf("unable to get the just initialized '%s': %w", platName, err) @@ -23,6 +25,7 @@ func (d *StreamD) startListeningForChatMessages( return fmt.Errorf("unable to get the channel for chat messages of '%s': %w", platName, err) } observability.Go(ctx, func() { + logger.Debugf(ctx, "/startListeningForChatMessages(ctx, '%s')", platName) for { select { case <-ctx.Done(): diff --git a/pkg/streamd/stream_controller.go b/pkg/streamd/stream_controller.go index 846748b..c331794 100644 --- a/pkg/streamd/stream_controller.go +++ b/pkg/streamd/stream_controller.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "sort" "strings" "time" @@ -22,16 +21,16 @@ import ( "github.com/xaionaro-go/streamctl/pkg/xsync" ) -func (d *StreamD) EXPERIMENTAL_ReinitStreamControllers(ctx context.Context) error { - platNames := make([]streamcontrol.PlatformName, 0, len(d.Config.Backends)) - for platName := range d.Config.Backends { - platNames = append(platNames, platName) - } - sort.Slice(platNames, func(i, j int) bool { - return platNames[i] < platNames[j] - }) +func (d *StreamD) EXPERIMENTAL_ReinitStreamControllers(ctx context.Context) (_err error) { + logger.Debugf(ctx, "ReinitStreamControllers") + defer func() { logger.Debugf(ctx, "/ReinitStreamControllers: %v", _err) }() var result *multierror.Error - for _, platName := range platNames { + for _, platName := range []streamcontrol.PlatformName{ + youtube.ID, + twitch.ID, + kick.ID, + obs.ID, + } { var err error switch strings.ToLower(string(platName)) { case strings.ToLower(string(obs.ID)): @@ -76,7 +75,9 @@ func newOBS( ) { logger.Debugf(ctx, "newOBS(ctx, %#+v, ...)", cfg) defer func() { logger.Debugf(ctx, "/newOBS: %v", _err) }() - + if cfg == nil { + cfg = &streamcontrol.AbstractPlatformConfig{} + } platCfg := streamcontrol.ConvertPlatformConfig[ obs.PlatformSpecificConfig, obs.StreamProfile, ]( @@ -139,6 +140,9 @@ func newTwitch( *twitch.Twitch, error, ) { + if cfg == nil { + cfg = &streamcontrol.AbstractPlatformConfig{} + } platCfg := streamcontrol.ConvertPlatformConfig[twitch.PlatformSpecificConfig, twitch.StreamProfile]( ctx, cfg, @@ -210,6 +214,9 @@ func newKick( *kick.Kick, error, ) { + if cfg == nil { + cfg = &streamcontrol.AbstractPlatformConfig{} + } platCfg := streamcontrol.ConvertPlatformConfig[kick.PlatformSpecificConfig, kick.StreamProfile]( ctx, cfg, @@ -222,6 +229,27 @@ func newKick( return nil, ErrSkipBackend } + hadSetNewUserData := false + if platCfg.Config.Channel == "" { + ok, err := setUserData(ctx, platCfg) + if !ok { + err := saveCfgFunc(&streamcontrol.AbstractPlatformConfig{ + Enable: platCfg.Enable, + Config: platCfg.Config, + StreamProfiles: streamcontrol.ToAbstractStreamProfiles(platCfg.StreamProfiles), + Custom: platCfg.Custom, + }) + if err != nil { + logger.Error(ctx, err) + } + return nil, ErrSkipBackend + } + if err != nil { + return nil, fmt.Errorf("unable to set user info: %w", err) + } + hadSetNewUserData = true + } + logger.Debugf(ctx, "kick config: %#+v", platCfg) cfg = streamcontrol.ToAbstractPlatformConfig(ctx, platCfg) kick, err := kick.New(ctx, *platCfg, @@ -237,6 +265,12 @@ func newKick( if err != nil { return nil, fmt.Errorf("unable to initialize Kick client: %w", err) } + if hadSetNewUserData { + logger.Debugf(ctx, "confirmed new youtube user data, saving it") + if err := saveCfgFunc(cfg); err != nil { + return nil, fmt.Errorf("unable to save the configuration: %w", err) + } + } return kick, nil } @@ -251,6 +285,9 @@ func newYouTube( *youtube.YouTube, error, ) { + if cfg == nil { + cfg = &streamcontrol.AbstractPlatformConfig{} + } platCfg := streamcontrol.ConvertPlatformConfig[youtube.PlatformSpecificConfig, youtube.StreamProfile]( ctx, cfg, diff --git a/pkg/streamd/streamd.go b/pkg/streamd/streamd.go index a6f843f..0436afa 100644 --- a/pkg/streamd/streamd.go +++ b/pkg/streamd/streamd.go @@ -318,6 +318,9 @@ func (d *StreamD) setPlatformConfig( logger.Debugf(ctx, "setPlatformConfig('%s', '%#+v')", platID, platCfg) defer logger.Debugf(ctx, "endof setPlatformConfig('%s', '%#+v')", platID, platCfg) return xsync.DoR1(ctx, &d.ConfigLock, func() error { + if d.Config.Backends == nil { + d.Config.Backends = make(streamcontrol.Config) + } d.Config.Backends[platID] = platCfg return d.SaveConfig(ctx) })