diff --git a/pkg/streamd/streamd.go b/pkg/streamd/streamd.go index 8abec1a..a74e483 100644 --- a/pkg/streamd/streamd.go +++ b/pkg/streamd/streamd.go @@ -32,6 +32,7 @@ import ( "github.com/xaionaro-go/streamctl/pkg/streamplayer" sptypes "github.com/xaionaro-go/streamctl/pkg/streamplayer/types" "github.com/xaionaro-go/streamctl/pkg/streamserver" + streamserverimpl "github.com/xaionaro-go/streamctl/pkg/streamserver/implementations/yutopp-go-rtmp/streamserver" "github.com/xaionaro-go/streamctl/pkg/streamserver/types" "github.com/xaionaro-go/streamctl/pkg/streamtypes" "github.com/xaionaro-go/streamctl/pkg/xpath" @@ -156,7 +157,25 @@ func (d *StreamD) initStreamServer(ctx context.Context) error { d.StreamServer = streamserver.New(&d.Config.StreamServer, newPlatformsControllerAdapter(d.StreamControllers)) assert(d.StreamServer != nil) defer d.notifyAboutChange(ctx, events.StreamServersChange) - return d.StreamServer.Init(ctx) + return d.StreamServer.Init(ctx, streamserverimpl.InitOptionDefaultStreamPlayerOptions(d.streamPlayerOptions())) +} + +func (d *StreamD) streamPlayerOptions() sptypes.Options { + return sptypes.Options{ + sptypes.OptionNotifierStart{ + d.notifyStreamPlayerStart, + }, + } +} + +func (d *StreamD) notifyStreamPlayerStart( + ctx context.Context, + streamID streamtypes.StreamID, +) { + logger.Debugf(ctx, "notifyStreamPlayerStart") + defer logger.Debugf(ctx, "/notifyStreamPlayerStart") + + d.notifyAboutChange(ctx, events.StreamPlayersChange) } func (d *StreamD) readCache(ctx context.Context) error { @@ -1257,6 +1276,7 @@ func (d *StreamD) AddStreamPlayer( playerType, disabled, streamPlaybackConfig, + streamserverimpl.AddStreamPlayerOptionDefaultStreamPlayerOptions(d.streamPlayerOptions()), )) result = multierror.Append(result, d.SaveConfig(ctx)) return result.ErrorOrNil() @@ -1454,9 +1474,11 @@ func (d *StreamD) StreamPlayerClose( } func (d *StreamD) notifyAboutChange( - _ context.Context, + ctx context.Context, topic events.Event, ) { + logger.Debugf(ctx, "notifyAboutChange(ctx, '%s')", topic) + defer logger.Debugf(ctx, "/notifyAboutChange(ctx, '%s')", topic) d.EventBus.Publish(topic) } diff --git a/pkg/streampanel/monitor.go b/pkg/streampanel/monitor.go index 29ae9b4..72285ab 100644 --- a/pkg/streampanel/monitor.go +++ b/pkg/streampanel/monitor.go @@ -26,10 +26,6 @@ func (p *Panel) startMonitorPage( logger.Debugf(ctx, "startMonitorPage") defer logger.Debugf(ctx, "/startMonitorPage") - p.monitorPageUpdaterLocker.Lock() - defer p.monitorPageUpdaterLocker.Unlock() - ctx, cancelFn := context.WithCancel(ctx) - p.monitorPageUpdaterCancel = cancelFn go func(ctx context.Context) { p.updateMonitorPageImages(ctx) p.updateMonitorPageStreamStatus(ctx) @@ -62,23 +58,6 @@ func (p *Panel) startMonitorPage( }(ctx) } -func (p *Panel) stopMonitorPage( - ctx context.Context, -) { - logger.Debugf(ctx, "stopMonitorPage") - defer logger.Debugf(ctx, "/stopMonitorPage") - - p.monitorPageUpdaterLocker.Lock() - defer p.monitorPageUpdaterLocker.Unlock() - - if p.monitorPageUpdaterCancel == nil { - return - } - - p.monitorPageUpdaterCancel() - p.monitorPageUpdaterCancel = nil -} - func (p *Panel) updateMonitorPageImages( ctx context.Context, ) { @@ -97,9 +76,6 @@ func (p *Panel) updateMonitorPageImages( logger.Debugf(ctx, "window size changed %#+v -> %#+v", lastWinSize, winSize) } - p.monitorPageUpdaterLocker.Lock() - defer p.monitorPageUpdaterLocker.Unlock() - var wg sync.WaitGroup wg.Add(1) @@ -158,9 +134,6 @@ func (p *Panel) updateMonitorPageStreamStatus( logger.Tracef(ctx, "updateMonitorPageStreamStatus") defer logger.Tracef(ctx, "/updateMonitorPageStreamStatus") - p.monitorPageUpdaterLocker.Lock() - defer p.monitorPageUpdaterLocker.Unlock() - var wg sync.WaitGroup for _, platID := range []streamcontrol.PlatformName{ obs.ID, diff --git a/pkg/streampanel/panel.go b/pkg/streampanel/panel.go index 2548812..6804964 100644 --- a/pkg/streampanel/panel.go +++ b/pkg/streampanel/panel.go @@ -85,12 +85,10 @@ type Panel struct { streamTitleField *widget.Entry streamDescriptionField *widget.Entry - monitorPage *fyne.Container - monitorPageUpdaterLocker sync.Mutex - monitorPageUpdaterCancel context.CancelFunc - monitorLastWinSize fyne.Size - screenshotContainer *fyne.Container - chatContainer *fyne.Container + monitorPage *fyne.Container + monitorLastWinSize fyne.Size + screenshotContainer *fyne.Container + chatContainer *fyne.Container streamStatus map[streamcontrol.PlatformName]*widget.Label @@ -115,9 +113,6 @@ type Panel struct { lastDisplayedError error - restreamPageUpdaterLocker sync.Mutex - restreamPageUpdaterCancel context.CancelFunc - streamServersWidget *fyne.Container streamsWidget *fyne.Container destinationsWidget *fyne.Container @@ -128,6 +123,13 @@ type Panel struct { previousNumBytes map[any][4]uint64 previousNumBytesTS map[any]time.Time + streamServersLocker sync.Mutex + streamServersUpdaterCanceller context.CancelFunc + streamForwardersLocker sync.Mutex + streamForwardersUpdaterCanceller context.CancelFunc + streamPlayersLocker sync.Mutex + streamPlayersUpdaterCanceller context.CancelFunc + obsSelectScene *widget.Select } @@ -1697,17 +1699,18 @@ func (p *Panel) initMainWindow( ), ) + var cancelPage context.CancelFunc setPage := func(page consts.Page) { logger.Debugf(ctx, "setPage(%s)", page) defer logger.Debugf(ctx, "/setPage(%s)", page) - if page != consts.PageMonitor { - p.stopMonitorPage(ctx) - } - if page != consts.PageMonitor { - p.stopRestreamPage(ctx) + if cancelPage != nil { + cancelPage() } + var pageCtx context.Context + pageCtx, cancelPage = context.WithCancel(ctx) + switch page { case consts.PageControl: p.monitorPage.Hide() @@ -1721,7 +1724,7 @@ func (p *Panel) initMainWindow( restreamPage.Hide() obsPage.Hide() p.monitorPage.Show() - p.startMonitorPage(ctx) + p.startMonitorPage(pageCtx) case consts.PageOBS: controlPage.Hide() profileControl.Hide() @@ -1734,7 +1737,7 @@ func (p *Panel) initMainWindow( p.monitorPage.Hide() obsPage.Hide() restreamPage.Show() - p.startRestreamPage(ctx) + p.startRestreamPage(pageCtx) } } @@ -1770,21 +1773,23 @@ func (p *Panel) initMainWindow( } func (p *Panel) subscribeUpdateControlPage(ctx context.Context) { + logger.Debugf(ctx, "subscribe to streams and config changes") + defer logger.Debugf(ctx, "/subscribe to streams and config changes") + + chStreams, err := p.StreamD.SubscribeToStreamsChanges(ctx) + if err != nil { + p.DisplayError(err) + //return + } + chConfigs, err := p.StreamD.SubscribeToConfigChanges(ctx) + if err != nil { + p.DisplayError(err) + //return + } + + p.getUpdatedStatus(ctx) + go func() { - logger.Debugf(ctx, "subscribe to streams and config changes") - defer logger.Debugf(ctx, "/subscribe to streams and config changes") - - chStreams, err := p.StreamD.SubscribeToStreamsChanges(ctx) - if err != nil { - p.DisplayError(err) - //return - } - chConfigs, err := p.StreamD.SubscribeToConfigChanges(ctx) - if err != nil { - p.DisplayError(err) - //return - } - for { select { case <-ctx.Done(): @@ -2183,7 +2188,7 @@ func (p *Panel) profileWindow( ) w := p.app.NewWindow(windowName) - resizeWindow(w, fyne.NewSize(400, 300)) + resizeWindow(w, fyne.NewSize(1500, 1000)) profileName := widget.NewEntry() profileName.SetPlaceHolder("profile name") profileName.SetText(string(values.Name)) diff --git a/pkg/streampanel/restream.go b/pkg/streampanel/restream.go index 688d88a..c49a2f3 100644 --- a/pkg/streampanel/restream.go +++ b/pkg/streampanel/restream.go @@ -29,14 +29,7 @@ func (p *Panel) startRestreamPage( logger.Debugf(ctx, "startRestreamPage") defer logger.Debugf(ctx, "/startRestreamPage") - p.restreamPageUpdaterLocker.Lock() - defer p.restreamPageUpdaterLocker.Unlock() - - ctx, cancelFn := context.WithCancel(ctx) - p.restreamPageUpdaterCancel = cancelFn - p.initRestreamPage(ctx) - } func (p *Panel) initRestreamPage( @@ -193,7 +186,6 @@ func (p *Panel) openAddStreamServerWindow(ctx context.Context) { return } w.Close() - p.initRestreamPage(ctx) }) }) @@ -233,6 +225,8 @@ func (p *Panel) displayStreamServers( logger.Debugf(ctx, "displayStreamServers") defer logger.Debugf(ctx, "/displayStreamServers") + hasDynamicValue := false + var objs []fyne.CanvasObject for idx, srv := range streamServers { logger.Tracef(ctx, "streamServer[%3d] == %#+v", idx, srv) @@ -254,7 +248,6 @@ func (p *Panel) displayStreamServers( return } }) - p.initRestreamPage(ctx) }, p.mainWindow, ) @@ -273,7 +266,9 @@ func (p *Panel) displayStreamServers( p.previousNumBytesLocker.Lock() prevNumBytes := p.previousNumBytes[key] now := time.Now() - bwText := widget.NewRichTextWithText(bwString(srv.NumBytesProducerRead, prevNumBytes[0], srv.NumBytesConsumerWrote, prevNumBytes[1], now, p.previousNumBytesTS[key])) + bwStr := bwString(srv.NumBytesProducerRead, prevNumBytes[0], srv.NumBytesConsumerWrote, prevNumBytes[1], now, p.previousNumBytesTS[key]) + bwText := widget.NewRichTextWithText(bwStr) + hasDynamicValue = hasDynamicValue || bwStr != "" p.previousNumBytes[key] = [4]uint64{srv.NumBytesProducerRead, srv.NumBytesConsumerWrote} p.previousNumBytesTS[key] = now p.previousNumBytesLocker.Unlock() @@ -283,6 +278,20 @@ func (p *Panel) displayStreamServers( } p.streamServersWidget.Objects = objs p.streamServersWidget.Refresh() + + p.streamServersLocker.Lock() + defer p.streamServersLocker.Unlock() + cancelFn := p.streamServersUpdaterCanceller + if hasDynamicValue { + if cancelFn == nil { + p.streamServersUpdaterCanceller = p.streamServersUpdater(ctx) + } + } else { + if cancelFn != nil { + cancelFn() + p.streamServersUpdaterCanceller = nil + } + } } func bwString( @@ -328,7 +337,6 @@ func (p *Panel) openAddStreamWindow(ctx context.Context) { return } w.Close() - p.initRestreamPage(ctx) }) }) @@ -384,12 +392,10 @@ func (p *Panel) displayIncomingServers( return } }) - p.initRestreamPage(ctx) }, p.mainWindow, ) w.Show() - p.initRestreamPage(ctx) }) label := widget.NewLabel(string(stream.StreamID)) c.RemoveAll() @@ -419,7 +425,6 @@ func (p *Panel) openAddDestinationWindow(ctx context.Context) { return } w.Close() - p.initRestreamPage(ctx) }) }) @@ -473,12 +478,10 @@ func (p *Panel) displayStreamDestinations( return } }) - p.initRestreamPage(ctx) }, p.mainWindow, ) w.Show() - p.initRestreamPage(ctx) }) label := widget.NewLabel(string(dst.ID) + ": " + string(dst.URL)) objs = append(objs, container.NewHBox( @@ -643,7 +646,6 @@ func (p *Panel) openAddOrEditPlayerWindow( return } w.Close() - p.initRestreamPage(ctx) }) }) @@ -681,6 +683,8 @@ func (p *Panel) displayStreamPlayers( logger.Debugf(ctx, "displayStreamPlayers") defer logger.Debugf(ctx, "/displayStreamPlayers") + hasDynamicValue := false + sort.Slice(players, func(i, j int) bool { return players[i].StreamID < players[j].StreamID }) @@ -706,12 +710,10 @@ func (p *Panel) displayStreamPlayers( return } }) - p.initRestreamPage(ctx) }, p.mainWindow, ) w.Show() - p.initRestreamPage(ctx) }) editButton := widget.NewButtonWithIcon("", theme.ListIcon(), func() { p.openEditPlayerWindow(ctx, player.StreamID) @@ -747,12 +749,10 @@ func (p *Panel) displayStreamPlayers( return } }) - p.initRestreamPage(ctx) }, p.mainWindow, ) w.Show() - p.initRestreamPage(ctx) }) caption := widget.NewLabel(string(player.StreamID) + " (" + string(player.PlayerType) + ")") c.RemoveAll() @@ -766,13 +766,30 @@ func (p *Panel) displayStreamPlayers( logger.Errorf(ctx, "unable to get the current position at player '%s': %v", player.StreamID, err) } else { c.Add(widget.NewSeparator()) - c.Add(widget.NewLabel(pos.String())) + posStr := pos.String() + posLabel := widget.NewLabel(posStr) + hasDynamicValue = hasDynamicValue || posStr != "" + c.Add(posLabel) } } objs = append(objs, c) } p.playersWidget.Objects = objs p.playersWidget.Refresh() + + p.streamPlayersLocker.Lock() + defer p.streamPlayersLocker.Unlock() + cancelFn := p.streamPlayersUpdaterCanceller + if hasDynamicValue { + if cancelFn == nil { + p.streamPlayersUpdaterCanceller = p.startStreamPlayersUpdater(ctx) + } + } else { + if cancelFn != nil { + cancelFn() + p.streamPlayersUpdaterCanceller = nil + } + } } func (p *Panel) openAddRestreamWindow(ctx context.Context) { @@ -846,7 +863,6 @@ func (p *Panel) openAddRestreamWindow(ctx context.Context) { return } w.Close() - p.initRestreamPage(ctx) }) }) @@ -895,6 +911,8 @@ func (p *Panel) displayStreamForwards( logger.Debugf(ctx, "displayStreamForwards") defer logger.Debugf(ctx, "/displayStreamForwards") + hasDynamicValue := false + sort.Slice(fwds, func(i, j int) bool { if fwds[i].StreamID != fwds[j].StreamID { return fwds[i].StreamID < fwds[j].StreamID @@ -924,12 +942,10 @@ func (p *Panel) displayStreamForwards( return } }) - p.initRestreamPage(ctx) }, p.mainWindow, ) w.Show() - p.initRestreamPage(ctx) }) icon := theme.MediaPauseIcon() label := "Pause" @@ -962,12 +978,10 @@ func (p *Panel) displayStreamForwards( return } }) - p.initRestreamPage(ctx) }, p.mainWindow, ) w.Show() - p.initRestreamPage(ctx) }) caption := widget.NewLabel(string(fwd.StreamID) + " -> " + string(fwd.DestinationID)) c.RemoveAll() @@ -985,7 +999,9 @@ func (p *Panel) displayStreamForwards( now := time.Now() p.previousNumBytesLocker.Lock() prevNumBytes := p.previousNumBytes[key] - bwText := widget.NewRichTextWithText(bwString(fwd.NumBytesRead, prevNumBytes[0], fwd.NumBytesWrote, prevNumBytes[1], now, p.previousNumBytesTS[key])) + bwStr := bwString(fwd.NumBytesRead, prevNumBytes[0], fwd.NumBytesWrote, prevNumBytes[1], now, p.previousNumBytesTS[key]) + bwText := widget.NewRichTextWithText(bwStr) + hasDynamicValue = hasDynamicValue || bwStr != "" p.previousNumBytes[key] = [4]uint64{fwd.NumBytesRead, fwd.NumBytesWrote} p.previousNumBytesTS[key] = now p.previousNumBytesLocker.Unlock() @@ -996,21 +1012,129 @@ func (p *Panel) displayStreamForwards( } p.restreamsWidget.Objects = objs p.restreamsWidget.Refresh() -} -func (p *Panel) stopRestreamPage( - ctx context.Context, -) { - logger.Debugf(ctx, "stopRestreamPage") - defer logger.Debugf(ctx, "/stopRestreamPage") - - p.restreamPageUpdaterLocker.Lock() - defer p.restreamPageUpdaterLocker.Unlock() - - if p.restreamPageUpdaterCancel == nil { - return + p.streamForwardersLocker.Lock() + defer p.streamForwardersLocker.Unlock() + cancelFn := p.streamForwardersUpdaterCanceller + if hasDynamicValue { + if cancelFn == nil { + p.streamForwardersUpdaterCanceller = p.startStreamForwardersUpdater(ctx) + } + } else { + if cancelFn != nil { + cancelFn() + p.streamForwardersUpdaterCanceller = nil + } } - - p.restreamPageUpdaterCancel() - p.restreamPageUpdaterCancel = nil +} + +func (p *Panel) streamServersUpdater( + ctx context.Context, +) context.CancelFunc { + ctx, cancelFn := context.WithCancel(ctx) + go func() { + updateData := func() { + streamServers, err := p.StreamD.ListStreamServers(ctx) + if err != nil { + p.DisplayError(err) + } else { + p.displayStreamServers(ctx, streamServers) + } + } + + defer func() { + p.streamServersLocker.Lock() + defer p.streamServersLocker.Unlock() + p.streamServersUpdaterCanceller = nil + go updateData() + }() + + logger.Debugf(ctx, "streamServersUpdater") + defer logger.Debugf(ctx, "/streamServersUpdater") + + t := time.NewTicker(time.Second) + for { + select { + case <-ctx.Done(): + return + case <-t.C: + } + updateData() + } + }() + return cancelFn +} + +func (p *Panel) startStreamPlayersUpdater( + ctx context.Context, +) context.CancelFunc { + ctx, cancelFn := context.WithCancel(ctx) + go func() { + updateData := func() { + streamPlayers, err := p.StreamD.ListStreamPlayers(ctx) + if err != nil { + p.DisplayError(err) + } else { + p.displayStreamPlayers(ctx, streamPlayers) + } + } + + defer func() { + p.streamPlayersLocker.Lock() + defer p.streamPlayersLocker.Unlock() + p.streamPlayersUpdaterCanceller = nil + go updateData() + }() + + logger.Debugf(ctx, "streamPlayersUpdater") + defer logger.Debugf(ctx, "/streamPlayersUpdater") + + t := time.NewTicker(time.Second) + for { + select { + case <-ctx.Done(): + return + case <-t.C: + } + updateData() + } + }() + return cancelFn +} + +func (p *Panel) startStreamForwardersUpdater( + ctx context.Context, +) context.CancelFunc { + ctx, cancelFn := context.WithCancel(ctx) + go func() { + updateData := func() { + streamFwds, err := p.StreamD.ListStreamForwards(ctx) + if err != nil { + p.DisplayError(err) + } else { + p.displayStreamForwards(ctx, streamFwds) + } + } + + defer func() { + p.streamForwardersLocker.Lock() + defer p.streamForwardersLocker.Unlock() + p.streamForwardersUpdaterCanceller = nil + go updateData() + }() + + logger.Debugf(ctx, "streamForwardersUpdater") + defer logger.Debugf(ctx, "/streamForwardersUpdater") + + t := time.NewTicker(time.Second) + for { + select { + case <-ctx.Done(): + return + case <-t.C: + } + updateData() + } + }() + return cancelFn } diff --git a/pkg/streamplayer/stream_player.go b/pkg/streamplayer/stream_player.go index 28f0874..4f33897 100644 --- a/pkg/streamplayer/stream_player.go +++ b/pkg/streamplayer/stream_player.go @@ -233,6 +233,24 @@ func (p *StreamPlayer) Resetup(opts ...Option) { } } +func (p *StreamPlayer) notifyStart(ctx context.Context) { + logger.Debugf(ctx, "notifyStart") + defer logger.Debugf(ctx, "/notifyStart") + + for _, f := range p.Config.NotifierStart { + func(f FuncNotifyStart) { + defer func() { + r := recover() + if r != nil { + logger.Error(ctx, "got panic during notification about a start: %v", r) + } + }() + + f(ctx, p.StreamID) + }(f) + } +} + func (p *StreamPlayer) controllerLoop(ctx context.Context) { logger.Debugf(ctx, "StreamPlayer[%s].controllerLoop", p.StreamID) defer logger.Debugf(ctx, "/StreamPlayer[%s].controllerLoop", p.StreamID) @@ -269,9 +287,9 @@ func (p *StreamPlayer) controllerLoop(ctx context.Context) { for time.Since(startedWaitingForBuffering) <= p.Config.StartTimeout { pos, err := p.Player.GetPosition(ctx) if err != nil { - logger.Errorf(ctx, "StreamPlayer[%s].controllerLoop: unable to get the current position: %v", p.StreamID, err) - time.Sleep(time.Second) - return true + logger.Tracef(ctx, "StreamPlayer[%s].controllerLoop: unable to get the current position: %v", p.StreamID, err) + time.Sleep(100 * time.Millisecond) + continue } logger.Tracef(ctx, "StreamPlayer[%s].controllerLoop: pos == %v", p.StreamID, pos) if pos != 0 { @@ -284,6 +302,11 @@ func (p *StreamPlayer) controllerLoop(ctx context.Context) { } } + go func() { + time.Sleep(time.Second) // TODO: delete this ugly racy hack + p.notifyStart(context.WithValue(ctx, CtxKeyStreamPlayer, p)) + }() + logger.Debugf(ctx, "finished waiting for a publisher at '%s'", p.StreamID) t := time.NewTicker(100 * time.Millisecond) diff --git a/pkg/streamplayer/types.go b/pkg/streamplayer/types.go index 9d6e484..895022c 100644 --- a/pkg/streamplayer/types.go +++ b/pkg/streamplayer/types.go @@ -7,3 +7,9 @@ import ( type Config = types.Config type Option = types.Option type Options = types.Options +type FuncNotifyStart = types.FuncNotifyStart +type CtxKey = types.CtxKey + +const ( + CtxKeyStreamPlayer = types.CtxKeyStreamPlayer +) diff --git a/pkg/streamplayer/types/config.go b/pkg/streamplayer/types/config.go index 184a0ae..946554a 100644 --- a/pkg/streamplayer/types/config.go +++ b/pkg/streamplayer/types/config.go @@ -3,14 +3,19 @@ package types import ( "context" "time" + + "github.com/xaionaro-go/streamctl/pkg/streamtypes" ) +type FuncNotifyStart func(ctx context.Context, streamID streamtypes.StreamID) + type Config struct { JitterBufDuration time.Duration CatchupMaxSpeedFactor float64 MaxCatchupAtLag time.Duration StartTimeout time.Duration ReadTimeout time.Duration + NotifierStart []FuncNotifyStart } func (cfg Config) Options() Options { @@ -21,15 +26,18 @@ func (cfg Config) Options() Options { if cfg.CatchupMaxSpeedFactor != 0 { opts = append(opts, OptionCatchupMaxSpeedFactor(cfg.CatchupMaxSpeedFactor)) } - if cfg.CatchupMaxSpeedFactor != 0 { + if cfg.MaxCatchupAtLag != 0 { opts = append(opts, OptionMaxCatchupAtLag(cfg.MaxCatchupAtLag)) } - if cfg.CatchupMaxSpeedFactor != 0 { + if cfg.StartTimeout != 0 { opts = append(opts, OptionStartTimeout(cfg.StartTimeout)) } - if cfg.CatchupMaxSpeedFactor != 0 { + if cfg.ReadTimeout != 0 { opts = append(opts, OptionReadTimeout(cfg.ReadTimeout)) } + if cfg.NotifierStart != nil { + opts = append(opts, OptionNotifierStart(cfg.NotifierStart)) + } return opts } @@ -90,3 +98,9 @@ type OptionReadTimeout time.Duration func (s OptionReadTimeout) Apply(cfg *Config) { cfg.ReadTimeout = time.Duration(s) } + +type OptionNotifierStart []FuncNotifyStart + +func (s OptionNotifierStart) Apply(cfg *Config) { + cfg.NotifierStart = ([]FuncNotifyStart)(s) +} diff --git a/pkg/streamplayer/types/ctx_key.go b/pkg/streamplayer/types/ctx_key.go new file mode 100644 index 0000000..7d24dac --- /dev/null +++ b/pkg/streamplayer/types/ctx_key.go @@ -0,0 +1,7 @@ +package types + +type CtxKey string + +const ( + CtxKeyStreamPlayer = CtxKey("StreamPlayer") +) diff --git a/pkg/streamserver/implementations/yutopp-go-rtmp/streamserver/stream_player.go b/pkg/streamserver/implementations/yutopp-go-rtmp/streamserver/stream_player.go index fe876fe..1d260fa 100644 --- a/pkg/streamserver/implementations/yutopp-go-rtmp/streamserver/stream_player.go +++ b/pkg/streamserver/implementations/yutopp-go-rtmp/streamserver/stream_player.go @@ -62,9 +62,36 @@ func (s *StreamPlayerStreamServer) GetPortServers( return result, nil } +type setupStreamPlayersConfig struct { + DefaultStreamPlayerOptions streamplayer.Options +} + +type setupStreamPlayersOption interface { + apply(*setupStreamPlayersConfig) +} + +type setupStreamPlayersOptions []setupStreamPlayersOption + +func (s setupStreamPlayersOptions) Config() setupStreamPlayersConfig { + cfg := setupStreamPlayersConfig{} + for _, opt := range s { + opt.apply(&cfg) + } + return cfg +} + +type setupStreamPlayersOptionDefaultStreamPlayerOptions streamplayer.Options + +func (opt setupStreamPlayersOptionDefaultStreamPlayerOptions) apply(cfg *setupStreamPlayersConfig) { + cfg.DefaultStreamPlayerOptions = (streamplayer.Options)(opt) +} + func (s *StreamServer) setupStreamPlayers( ctx context.Context, + opts ...setupStreamPlayersOption, ) error { + setupCfg := setupStreamPlayersOptions(opts).Config() + var streamIDsToDelete []api.StreamID logger.Tracef(ctx, "p.StreamPlayers.StreamPlayersLocker.Lock()-ing") @@ -105,10 +132,14 @@ func (s *StreamServer) setupStreamPlayers( continue } + ssOpts := playerCfg.StreamPlayback.Options() + if setupCfg.DefaultStreamPlayerOptions != nil { + ssOpts = append(ssOpts, setupCfg.DefaultStreamPlayerOptions...) + } _, err := s.StreamPlayers.Create( detachDone(ctx), streamID, - playerCfg.StreamPlayback.Options()..., + ssOpts..., ) if err != nil { err = fmt.Errorf("unable to create a stream player for stream '%s': %w", streamID, err) @@ -126,21 +157,54 @@ func detachDone(ctx context.Context) context.Context { return belt.CtxWithBelt(context.Background(), belt.CtxBelt(ctx)) } +type AddStreamPlayerConfig struct { + DefaultStreamPlayerOptions streamplayer.Options +} + +type AddStreamPlayerOption interface { + apply(*AddStreamPlayerConfig) +} + +type AddStreamPlayerOptions []AddStreamPlayerOption + +func (s AddStreamPlayerOptions) Config() AddStreamPlayerConfig { + cfg := AddStreamPlayerConfig{} + for _, opt := range s { + opt.apply(&cfg) + } + return cfg +} + +type AddStreamPlayerOptionDefaultStreamPlayerOptions streamplayer.Options + +func (opt AddStreamPlayerOptionDefaultStreamPlayerOptions) apply(cfg *AddStreamPlayerConfig) { + cfg.DefaultStreamPlayerOptions = (streamplayer.Options)(opt) +} + func (s *StreamServer) AddStreamPlayer( ctx context.Context, streamID streamtypes.StreamID, playerType player.Backend, disabled bool, streamPlaybackConfig sptypes.Config, + opts ...AddStreamPlayerOption, ) error { + cfg := AddStreamPlayerOptions(opts).Config() + s.Config.Streams[streamID].Player = &sstypes.PlayerConfig{ Player: playerType, Disabled: disabled, StreamPlayback: streamPlaybackConfig, } - if err := s.setupStreamPlayers(ctx); err != nil { - return fmt.Errorf("unable to setup the stream players: %w", err) + { + var opts setupStreamPlayersOptions + if cfg.DefaultStreamPlayerOptions != nil { + opts = append(opts, setupStreamPlayersOptionDefaultStreamPlayerOptions(cfg.DefaultStreamPlayerOptions)) + } + if err := s.setupStreamPlayers(ctx, opts...); err != nil { + return fmt.Errorf("unable to setup the stream players: %w", err) + } } return nil diff --git a/pkg/streamserver/implementations/yutopp-go-rtmp/streamserver/stream_server.go b/pkg/streamserver/implementations/yutopp-go-rtmp/streamserver/stream_server.go index 3a6e2ec..0c404de 100644 --- a/pkg/streamserver/implementations/yutopp-go-rtmp/streamserver/stream_server.go +++ b/pkg/streamserver/implementations/yutopp-go-rtmp/streamserver/stream_server.go @@ -53,7 +53,36 @@ func New( return s } -func (s *StreamServer) Init(ctx context.Context) error { +type InitConfig struct { + DefaultStreamPlayerOptions streamplayer.Options +} + +type InitOption interface { + apply(*InitConfig) +} + +type InitOptions []InitOption + +func (s InitOptions) Config() InitConfig { + cfg := InitConfig{} + for _, opt := range s { + opt.apply(&cfg) + } + return cfg +} + +type InitOptionDefaultStreamPlayerOptions streamplayer.Options + +func (opt InitOptionDefaultStreamPlayerOptions) apply(cfg *InitConfig) { + cfg.DefaultStreamPlayerOptions = (streamplayer.Options)(opt) +} + +func (s *StreamServer) Init( + ctx context.Context, + opts ...InitOption, +) error { + initCfg := InitOptions(opts).Config() + ctx = belt.WithField(ctx, "module", "StreamServer") s.Lock() defer s.Unlock() @@ -92,7 +121,11 @@ func (s *StreamServer) Init(ctx context.Context) error { } go func() { - err := s.setupStreamPlayers(ctx) + var opts setupStreamPlayersOptions + if initCfg.DefaultStreamPlayerOptions != nil { + opts = append(opts, setupStreamPlayersOptionDefaultStreamPlayerOptions(initCfg.DefaultStreamPlayerOptions)) + } + err := s.setupStreamPlayers(ctx, opts...) if err != nil { logger.Error(ctx, err) }