[streamplayer] Smoother control over playback speed
Some checks failed
rolling-release / build (push) Has been cancelled
rolling-release / rolling-release (push) Has been cancelled

This commit is contained in:
Dmitrii Okunev
2025-11-10 23:44:30 +00:00
parent 335f3cf154
commit a825d193e3
5 changed files with 60 additions and 44 deletions

View File

@@ -5,4 +5,4 @@ Website = "https://github.com/xaionaro/streamctl"
Name = "streampanel"
ID = "center.dx.streampanel"
Version = "0.1.0"
Build = 438
Build = 439

View File

@@ -907,7 +907,8 @@ func (yt *YouTube) StartStream(
err = yt.YouTubeClient.UpdateVideo(ctx, video, videoParts)
logger.Debugf(ctx, "YouTube.Update result: %v", err)
if err != nil {
return fmt.Errorf("unable to update video data: %w", err)
videoJSON, _ := json.Marshal(video)
return fmt.Errorf("unable to update video data (%s: %s): %w", strings.Join(videoParts, "+"), videoJSON, err)
}
playlistIDs := make([]string, 0, len(playlistIDMap[templateBroadcastID]))

View File

@@ -124,7 +124,7 @@ func (d *StreamD) shoutoutIfNeeded(
}
if !found {
logger.Debugf(ctx, "'%s' not in the list for auto-shoutout at '%s'", userID, msg.Platform)
logger.Debugf(ctx, "'%s' not in the list for auto-shoutout at '%s'", userID.User, msg.Platform)
return false
}

View File

@@ -9,6 +9,10 @@ type ChatUserID struct {
User streamcontrol.UserID `yaml:"user"`
}
func (c ChatUserID) String() string {
return string(c.Platform) + ":" + string(c.User)
}
type Shoutout struct {
AutoShoutoutOnMessage []ChatUserID `yaml:"auto_shoutout_on_message"`
}

View File

@@ -15,6 +15,7 @@ import (
"github.com/facebookincubator/go-belt/tool/experimental/errmon"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/hashicorp/go-multierror"
"github.com/xaionaro-go/avpipeline/indicator"
"github.com/xaionaro-go/avpipeline/kernel"
"github.com/xaionaro-go/avpipeline/packet"
"github.com/xaionaro-go/avpipeline/processor"
@@ -28,14 +29,14 @@ import (
)
const (
enableSeekOnStart = true
enableTracksRotation = false
enableSlowDown = true
minSpeed = 0.975
minSpeedDifferenceSlowDown = 0.001
minJitterBufDuration = 100 * time.Millisecond
jitterBufDecayHalftime = 5 * time.Minute
playerCheckInterval = 100 * time.Millisecond
enableSeekOnStart = true
enableTracksRotation = false
enableSlowDown = true
minSpeed = 0.975
minSpeedDifference = 0.01
minJitterBufDuration = 500 * time.Millisecond
jitterBufDecayHalftime = 5 * time.Minute
playerCheckInterval = 100 * time.Millisecond
)
type Publisher interface {
@@ -94,6 +95,7 @@ type StreamPlayer struct {
Backend player.Backend
Config Config
WantSpeedAverage *indicator.MAMA[float64]
CurrentJitterBufDuration time.Duration
}
@@ -129,9 +131,10 @@ func (sp *StreamPlayers) Create(
Parent: sp,
Cancel: cancel,
StreamPlayer: StreamPlayer{
Backend: backend,
Config: resultingOpts.Config(ctx),
StreamID: streamID,
Backend: backend,
Config: resultingOpts.Config(ctx),
StreamID: streamID,
WantSpeedAverage: indicator.NewMAMA[float64](10, 0.3, 0.05),
},
}
@@ -672,7 +675,8 @@ func (p *StreamPlayerHandler) controllerLoop(
err = fmt.Errorf("unable to get position: %w", err)
}
if enableSeekOnStart && protocol != streamtypes.ServerTypeRTMP && !triedToSeek {
l, err := player.GetLength(ctx)
var l time.Duration
l, err = player.GetLength(ctx)
if err != nil {
err = fmt.Errorf("unable to get length: %w", err)
}
@@ -801,9 +805,10 @@ func (p *StreamPlayerHandler) controllerLoop(
p.Config.JitterBufDuration,
)
logger.Debugf(ctx,
"StreamPlayer[%s].controllerLoop: increased jitter buffer duration to %v",
"StreamPlayer[%s].controllerLoop: increased jitter buffer duration to %v (increased by %v)",
p.StreamID,
p.CurrentJitterBufDuration,
jitterBufDurationIncrease,
)
jitterBufDurationIncrease = 0
}
@@ -934,14 +939,15 @@ func (p *StreamPlayerHandler) controllerLoop(
if enableSlowDown && protocol == streamtypes.ServerTypeRTMP && p.CurrentJitterBufDuration > time.Second && minBufDuration > lag {
jitterBufDurationIncrease = max(jitterBufDurationIncrease, minBufDuration-lag)
k := lag.Seconds() / minBufDuration.Seconds()
speed := 1 - (1-k)*(1-minSpeed)
if speed <= 0 {
wantSpeed := 1 - (1-k)*(1-minSpeed)
if wantSpeed <= 0 {
return
}
if math.Abs(speed-curSpeed) < minSpeedDifferenceSlowDown {
setSpeed := p.WantSpeedAverage.Update(wantSpeed)
if math.Abs(wantSpeed-curSpeed) < minSpeedDifference {
return
}
curSpeed = speed
curSpeed = setSpeed
logger.Debugf(ctx,
"StreamPlayer[%s].controllerLoop: slowing down to %f",
p.StreamID, curSpeed,
@@ -958,6 +964,7 @@ func (p *StreamPlayerHandler) controllerLoop(
commitJitterBufferIncrease()
}
if lag <= p.CurrentJitterBufDuration {
p.WantSpeedAverage.Update(1)
if curSpeed == 1 {
return
}
@@ -979,29 +986,31 @@ func (p *StreamPlayerHandler) controllerLoop(
// log(x) = log(0.5) / (halftime/interval)
// x = e^(log(0.5)/(halftime/interval))
jitterBufFactor := math.Exp(math.Log(2) / (jitterBufDecayHalftime.Seconds() / playerCheckInterval.Seconds()))
logger.Tracef(ctx,
"StreamPlayer[%s].controllerLoop: increasing jitter buffer duration factor: %v (halftime: %v, interval: %v)",
p.StreamID,
jitterBufFactor,
jitterBufDecayHalftime,
playerCheckInterval,
)
p.CurrentJitterBufDuration = max(
time.Duration(float64(p.CurrentJitterBufDuration)*jitterBufFactor),
minJitterBufDuration,
)
logger.Tracef(ctx,
"StreamPlayer[%s].controllerLoop: increasing jitter buffer duration factor: %v (halftime: %v, interval: %v); new duration: %v",
p.StreamID,
jitterBufFactor,
jitterBufDecayHalftime,
playerCheckInterval,
p.CurrentJitterBufDuration,
)
speed := float64(1) +
wantSpeed := float64(1) +
(p.Config.CatchupMaxSpeedFactor-float64(1))*
(lag-p.CurrentJitterBufDuration).Seconds()/
(p.Config.MaxCatchupAtLag-p.CurrentJitterBufDuration).Seconds()
speed = float64(uint(speed*100)) / 100 // to avoid flickering (for example between 1.0001 and 1.0)
setSpeed := p.WantSpeedAverage.Update(wantSpeed)
setSpeed = float64(uint(setSpeed*50)) / 50 // to avoid flickering (for example between 1.0001 and 1.0000)
if speed > p.Config.CatchupMaxSpeedFactor {
if setSpeed > p.Config.CatchupMaxSpeedFactor {
logger.Warnf(ctx,
"speed is calculated higher than the maximum: %v > %v: (%v-1)*(%v-%v)/(%v-%v); lag calculation: %v - %v",
speed,
setSpeed,
p.Config.CatchupMaxSpeedFactor,
p.Config.CatchupMaxSpeedFactor,
lag.Seconds(),
@@ -1010,22 +1019,24 @@ func (p *StreamPlayerHandler) controllerLoop(
p.CurrentJitterBufDuration.Seconds(),
l, pos,
)
speed = p.Config.CatchupMaxSpeedFactor
setSpeed = p.Config.CatchupMaxSpeedFactor
}
if speed != curSpeed {
logger.Debugf(
ctx,
"StreamPlayer[%s].controllerLoop: setting the speed to %v: lag: %v - %v == %v",
p.StreamID, speed, l, pos, lag,
)
err = player.SetSpeed(ctx, speed)
if err != nil {
logger.Errorf(ctx, "unable to set the speed to %v: %v", speed, err)
return
}
curSpeed = speed
if setSpeed == curSpeed {
return
}
logger.Debugf(
ctx,
"StreamPlayer[%s].controllerLoop: setting the speed to %v: lag: %v - %v == %v",
p.StreamID, setSpeed, l, pos, lag,
)
err = player.SetSpeed(ctx, setSpeed)
if err != nil {
logger.Errorf(ctx, "unable to set the speed to %v: %v", setSpeed, err)
return
}
curSpeed = setSpeed
})
if err != nil {
logger.Error(ctx, "unable to get the player: %v", err)