diff --git a/cmd/streampanel/FyneApp.toml b/cmd/streampanel/FyneApp.toml index 534b6c1..0476553 100755 --- a/cmd/streampanel/FyneApp.toml +++ b/cmd/streampanel/FyneApp.toml @@ -5,4 +5,4 @@ Website = "https://github.com/xaionaro/streamctl" Name = "streampanel" ID = "center.dx.streampanel" Version = "0.1.0" - Build = 438 + Build = 439 diff --git a/pkg/streamcontrol/youtube/youtube.go b/pkg/streamcontrol/youtube/youtube.go index a8d7de5..c9174e6 100644 --- a/pkg/streamcontrol/youtube/youtube.go +++ b/pkg/streamcontrol/youtube/youtube.go @@ -907,7 +907,8 @@ func (yt *YouTube) StartStream( err = yt.YouTubeClient.UpdateVideo(ctx, video, videoParts) logger.Debugf(ctx, "YouTube.Update result: %v", err) if err != nil { - return fmt.Errorf("unable to update video data: %w", err) + videoJSON, _ := json.Marshal(video) + return fmt.Errorf("unable to update video data (%s: %s): %w", strings.Join(videoParts, "+"), videoJSON, err) } playlistIDs := make([]string, 0, len(playlistIDMap[templateBroadcastID])) diff --git a/pkg/streamd/chat.go b/pkg/streamd/chat.go index 1194ce3..0ee99c3 100644 --- a/pkg/streamd/chat.go +++ b/pkg/streamd/chat.go @@ -124,7 +124,7 @@ func (d *StreamD) shoutoutIfNeeded( } if !found { - logger.Debugf(ctx, "'%s' not in the list for auto-shoutout at '%s'", userID, msg.Platform) + logger.Debugf(ctx, "'%s' not in the list for auto-shoutout at '%s'", userID.User, msg.Platform) return false } diff --git a/pkg/streamd/config/shoutout.go b/pkg/streamd/config/shoutout.go index 3a12c5e..dbb5bdb 100644 --- a/pkg/streamd/config/shoutout.go +++ b/pkg/streamd/config/shoutout.go @@ -9,6 +9,10 @@ type ChatUserID struct { User streamcontrol.UserID `yaml:"user"` } +func (c ChatUserID) String() string { + return string(c.Platform) + ":" + string(c.User) +} + type Shoutout struct { AutoShoutoutOnMessage []ChatUserID `yaml:"auto_shoutout_on_message"` } diff --git a/pkg/streamplayer/stream_player.go b/pkg/streamplayer/stream_player.go index 1de5cd4..1384bd2 100644 --- a/pkg/streamplayer/stream_player.go +++ b/pkg/streamplayer/stream_player.go @@ -15,6 +15,7 @@ import ( "github.com/facebookincubator/go-belt/tool/experimental/errmon" "github.com/facebookincubator/go-belt/tool/logger" "github.com/hashicorp/go-multierror" + "github.com/xaionaro-go/avpipeline/indicator" "github.com/xaionaro-go/avpipeline/kernel" "github.com/xaionaro-go/avpipeline/packet" "github.com/xaionaro-go/avpipeline/processor" @@ -28,14 +29,14 @@ import ( ) const ( - enableSeekOnStart = true - enableTracksRotation = false - enableSlowDown = true - minSpeed = 0.975 - minSpeedDifferenceSlowDown = 0.001 - minJitterBufDuration = 100 * time.Millisecond - jitterBufDecayHalftime = 5 * time.Minute - playerCheckInterval = 100 * time.Millisecond + enableSeekOnStart = true + enableTracksRotation = false + enableSlowDown = true + minSpeed = 0.975 + minSpeedDifference = 0.01 + minJitterBufDuration = 500 * time.Millisecond + jitterBufDecayHalftime = 5 * time.Minute + playerCheckInterval = 100 * time.Millisecond ) type Publisher interface { @@ -94,6 +95,7 @@ type StreamPlayer struct { Backend player.Backend Config Config + WantSpeedAverage *indicator.MAMA[float64] CurrentJitterBufDuration time.Duration } @@ -129,9 +131,10 @@ func (sp *StreamPlayers) Create( Parent: sp, Cancel: cancel, StreamPlayer: StreamPlayer{ - Backend: backend, - Config: resultingOpts.Config(ctx), - StreamID: streamID, + Backend: backend, + Config: resultingOpts.Config(ctx), + StreamID: streamID, + WantSpeedAverage: indicator.NewMAMA[float64](10, 0.3, 0.05), }, } @@ -672,7 +675,8 @@ func (p *StreamPlayerHandler) controllerLoop( err = fmt.Errorf("unable to get position: %w", err) } if enableSeekOnStart && protocol != streamtypes.ServerTypeRTMP && !triedToSeek { - l, err := player.GetLength(ctx) + var l time.Duration + l, err = player.GetLength(ctx) if err != nil { err = fmt.Errorf("unable to get length: %w", err) } @@ -801,9 +805,10 @@ func (p *StreamPlayerHandler) controllerLoop( p.Config.JitterBufDuration, ) logger.Debugf(ctx, - "StreamPlayer[%s].controllerLoop: increased jitter buffer duration to %v", + "StreamPlayer[%s].controllerLoop: increased jitter buffer duration to %v (increased by %v)", p.StreamID, p.CurrentJitterBufDuration, + jitterBufDurationIncrease, ) jitterBufDurationIncrease = 0 } @@ -934,14 +939,15 @@ func (p *StreamPlayerHandler) controllerLoop( if enableSlowDown && protocol == streamtypes.ServerTypeRTMP && p.CurrentJitterBufDuration > time.Second && minBufDuration > lag { jitterBufDurationIncrease = max(jitterBufDurationIncrease, minBufDuration-lag) k := lag.Seconds() / minBufDuration.Seconds() - speed := 1 - (1-k)*(1-minSpeed) - if speed <= 0 { + wantSpeed := 1 - (1-k)*(1-minSpeed) + if wantSpeed <= 0 { return } - if math.Abs(speed-curSpeed) < minSpeedDifferenceSlowDown { + setSpeed := p.WantSpeedAverage.Update(wantSpeed) + if math.Abs(wantSpeed-curSpeed) < minSpeedDifference { return } - curSpeed = speed + curSpeed = setSpeed logger.Debugf(ctx, "StreamPlayer[%s].controllerLoop: slowing down to %f", p.StreamID, curSpeed, @@ -958,6 +964,7 @@ func (p *StreamPlayerHandler) controllerLoop( commitJitterBufferIncrease() } if lag <= p.CurrentJitterBufDuration { + p.WantSpeedAverage.Update(1) if curSpeed == 1 { return } @@ -979,29 +986,31 @@ func (p *StreamPlayerHandler) controllerLoop( // log(x) = log(0.5) / (halftime/interval) // x = e^(log(0.5)/(halftime/interval)) jitterBufFactor := math.Exp(math.Log(2) / (jitterBufDecayHalftime.Seconds() / playerCheckInterval.Seconds())) - logger.Tracef(ctx, - "StreamPlayer[%s].controllerLoop: increasing jitter buffer duration factor: %v (halftime: %v, interval: %v)", - p.StreamID, - jitterBufFactor, - jitterBufDecayHalftime, - playerCheckInterval, - ) p.CurrentJitterBufDuration = max( time.Duration(float64(p.CurrentJitterBufDuration)*jitterBufFactor), minJitterBufDuration, ) + logger.Tracef(ctx, + "StreamPlayer[%s].controllerLoop: increasing jitter buffer duration factor: %v (halftime: %v, interval: %v); new duration: %v", + p.StreamID, + jitterBufFactor, + jitterBufDecayHalftime, + playerCheckInterval, + p.CurrentJitterBufDuration, + ) - speed := float64(1) + + wantSpeed := float64(1) + (p.Config.CatchupMaxSpeedFactor-float64(1))* (lag-p.CurrentJitterBufDuration).Seconds()/ (p.Config.MaxCatchupAtLag-p.CurrentJitterBufDuration).Seconds() - speed = float64(uint(speed*100)) / 100 // to avoid flickering (for example between 1.0001 and 1.0) + setSpeed := p.WantSpeedAverage.Update(wantSpeed) + setSpeed = float64(uint(setSpeed*50)) / 50 // to avoid flickering (for example between 1.0001 and 1.0000) - if speed > p.Config.CatchupMaxSpeedFactor { + if setSpeed > p.Config.CatchupMaxSpeedFactor { logger.Warnf(ctx, "speed is calculated higher than the maximum: %v > %v: (%v-1)*(%v-%v)/(%v-%v); lag calculation: %v - %v", - speed, + setSpeed, p.Config.CatchupMaxSpeedFactor, p.Config.CatchupMaxSpeedFactor, lag.Seconds(), @@ -1010,22 +1019,24 @@ func (p *StreamPlayerHandler) controllerLoop( p.CurrentJitterBufDuration.Seconds(), l, pos, ) - speed = p.Config.CatchupMaxSpeedFactor + setSpeed = p.Config.CatchupMaxSpeedFactor } - if speed != curSpeed { - logger.Debugf( - ctx, - "StreamPlayer[%s].controllerLoop: setting the speed to %v: lag: %v - %v == %v", - p.StreamID, speed, l, pos, lag, - ) - err = player.SetSpeed(ctx, speed) - if err != nil { - logger.Errorf(ctx, "unable to set the speed to %v: %v", speed, err) - return - } - curSpeed = speed + if setSpeed == curSpeed { + return } + + logger.Debugf( + ctx, + "StreamPlayer[%s].controllerLoop: setting the speed to %v: lag: %v - %v == %v", + p.StreamID, setSpeed, l, pos, lag, + ) + err = player.SetSpeed(ctx, setSpeed) + if err != nil { + logger.Errorf(ctx, "unable to set the speed to %v: %v", setSpeed, err) + return + } + curSpeed = setSpeed }) if err != nil { logger.Error(ctx, "unable to get the player: %v", err)