From 3d6d618a792e2c1e1706ffe236a388d091d0bbda Mon Sep 17 00:00:00 2001 From: langhuihui <178529795@qq.com> Date: Mon, 14 Apr 2025 13:58:21 +0800 Subject: [PATCH] fix: speed control --- pkg/track.go | 108 +++++++++++++++++++++++++++++++++---- publisher.go | 146 ++++++++++++++------------------------------------- 2 files changed, 137 insertions(+), 117 deletions(-) diff --git a/pkg/track.go b/pkg/track.go index 3128261..d85f65b 100644 --- a/pkg/track.go +++ b/pkg/track.go @@ -14,6 +14,11 @@ import ( "m7s.live/v5/pkg/util" ) +const threshold = 10 * time.Millisecond +const DROP_FRAME_LEVEL_NODROP = 0 +const DROP_FRAME_LEVEL_DROP_P = 1 +const DROP_FRAME_LEVEL_DROP_ALL = 2 + type ( Track struct { *slog.Logger @@ -33,6 +38,20 @@ type ( BaseTs, LastTs, BeforeScaleChangedTs time.Duration LastScale float64 } + SpeedController struct { + speed float64 + pausedTime time.Duration + beginTime time.Time + beginTimestamp time.Duration + Delta time.Duration + } + DropController struct { + acceptFrameCount int + accpetFPS int + LastDropLevelChange time.Time + DropFrameLevel int // 0: no drop, 1: drop P-frame, 2: drop all + } + AVTrack struct { Track *RingWriter @@ -41,8 +60,8 @@ type ( SequenceFrame IAVFrame WrapIndex int TsTamer - DropFrameLevel int // 0: no drop, 1: drop P-frame, 2: drop all - LastDropLevelChange time.Time + SpeedController + DropController } ) @@ -90,18 +109,56 @@ func (t *Track) AddBytesIn(n int) { } } -func (t *AVTrack) FixFPS() { - if dur := time.Since(t.lastBPSTime); dur > time.Second { - t.BPS = int(float64(t.bytesIn) / dur.Seconds()) - t.bytesIn = 0 - t.FPS = int(float64(t.frameCount) / dur.Seconds()) - t.frameCount = 0 - t.lastBPSTime = time.Now() +func (t *AVTrack) AddBytesIn(n int) { + dur := time.Since(t.lastBPSTime) + t.Track.AddBytesIn(n) + if t.frameCount == 0 { + t.accpetFPS = int(float64(t.acceptFrameCount) / dur.Seconds()) + t.acceptFrameCount = 0 } } +func (t *AVTrack) AcceptFrame(data IAVFrame) { + t.acceptFrameCount++ + t.Value.Wraps = append(t.Value.Wraps, data) +} + +func (t *AVTrack) changeDropFrameLevel(newLevel int) { + t.Warn("change drop frame level", "from", t.DropFrameLevel, "to", newLevel) + t.DropFrameLevel = newLevel + t.LastDropLevelChange = time.Now() +} + func (t *AVTrack) CheckIfNeedDropFrame(maxFPS int) (drop bool) { - return maxFPS > 0 && (t.FPS > maxFPS || t.frameCount > maxFPS) + drop = maxFPS > 0 && (t.accpetFPS > maxFPS) + if drop { + defer func() { + if time.Since(t.LastDropLevelChange) > time.Second && t.DropFrameLevel > 0 { + t.changeDropFrameLevel(t.DropFrameLevel + 1) + } + }() + } + // Enhanced frame dropping strategy based on DropFrameLevel + switch t.DropFrameLevel { + case DROP_FRAME_LEVEL_NODROP: + if drop { + t.changeDropFrameLevel(DROP_FRAME_LEVEL_DROP_P) + } + case DROP_FRAME_LEVEL_DROP_P: // Drop P-frame + if !t.Value.IDR { + return true + } else if !drop { + t.changeDropFrameLevel(DROP_FRAME_LEVEL_NODROP) + } + return false + default: + if !drop { + t.changeDropFrameLevel(DROP_FRAME_LEVEL_DROP_P) + } else { + return true + } + } + return } func (t *AVTrack) Ready(err error) { @@ -150,7 +207,7 @@ func (t *TsTamer) Tame(ts time.Duration, fps int, scale float64) (result time.Du result = max(1*time.Millisecond, t.BaseTs+ts) if fps > 0 { frameDur := float64(time.Second) / float64(fps) - if math.Abs(float64(result-t.LastTs)) > 10*frameDur { //时间戳突变 + if math.Abs(float64(result-t.LastTs)) > 10*frameDur*scale { //时间戳突变 // t.Warn("timestamp mutation", "fps", t.FPS, "lastTs", uint32(t.LastTs/time.Millisecond), "ts", uint32(frame.Timestamp/time.Millisecond), "frameDur", time.Duration(frameDur)) result = t.LastTs + time.Duration(frameDur) t.BaseTs = result - ts @@ -164,3 +221,32 @@ func (t *TsTamer) Tame(ts time.Duration, fps int, scale float64) (result time.Du result = t.BeforeScaleChangedTs + time.Duration(float64(result-t.BeforeScaleChangedTs)/scale) return } + +func (t *AVTrack) SpeedControl(speed float64) { + t.speedControl(speed, t.LastTs) +} + +func (t *AVTrack) AddPausedTime(d time.Duration) { + t.pausedTime += d +} + +func (s *SpeedController) speedControl(speed float64, ts time.Duration) { + if speed != s.speed || s.beginTime.IsZero() { + s.speed = speed + s.beginTime = time.Now() + s.beginTimestamp = ts + s.pausedTime = 0 + } else { + elapsed := time.Since(s.beginTime) - s.pausedTime + if speed == 0 { + s.Delta = ts - elapsed + return + } + should := time.Duration(float64(ts-s.beginTimestamp) / speed) + s.Delta = should - elapsed + // fmt.Println(speed, elapsed, should, s.Delta) + if s.Delta > threshold { + time.Sleep(min(s.Delta, time.Millisecond*500)) + } + } +} diff --git a/publisher.go b/publisher.go index b88348e..8f867d8 100644 --- a/publisher.go +++ b/publisher.go @@ -36,40 +36,8 @@ const ( PublishTypeReplay = "replay" ) -const threshold = 10 * time.Millisecond - -type SpeedControl struct { - speed float64 - pausedTime time.Duration - beginTime time.Time - beginTimestamp time.Duration - Delta time.Duration -} - -func (s *SpeedControl) speedControl(speed float64, ts time.Duration) { - if speed != s.speed || s.beginTime.IsZero() { - s.speed = speed - s.beginTime = time.Now() - s.beginTimestamp = ts - s.pausedTime = 0 - } else { - elapsed := time.Since(s.beginTime) - s.pausedTime - if speed == 0 { - s.Delta = ts - elapsed - return - } - should := time.Duration(float64(ts-s.beginTimestamp) / speed) - s.Delta = should - elapsed - // fmt.Println(speed, elapsed, should, s.Delta) - if s.Delta > threshold { - time.Sleep(min(s.Delta, time.Millisecond*500)) - } - } -} - type AVTracks struct { *AVTrack - SpeedControl util.Collection[reflect.Type, *AVTrack] sync.RWMutex baseTs time.Duration //from old publisher's lastTs @@ -145,7 +113,6 @@ type Publisher struct { OnGetPosition func() time.Time PullProxyConfig *PullProxyConfig dumpFile *os.File - dropRate float64 // 丢帧率,0-1之间 dropAfterTs time.Duration } @@ -317,18 +284,21 @@ func (p *Publisher) AddSubscriber(subscriber *Subscriber) { } } -func (p *Publisher) writeAV(t *AVTrack, data IAVFrame) { +func (p *Publisher) fixTimestamp(t *AVTrack, data IAVFrame) { frame := &t.Value - frame.Wraps = append(frame.Wraps, data) ts := data.GetTimestamp() frame.CTS = data.GetCTS() - bytesIn := frame.Wraps[0].GetSize() + bytesIn := data.GetSize() t.AddBytesIn(bytesIn) frame.Timestamp = t.Tame(ts, t.FPS, p.Scale) +} + +func (p *Publisher) writeAV(t *AVTrack, data IAVFrame) { + t.AcceptFrame(data) if p.Enabled(p, task.TraceLevel) { + frame := &t.Value codec := t.FourCC().String() - data := frame.Wraps[0].String() - p.Trace("write", "seq", frame.Sequence, "baseTs", int32(t.BaseTs/time.Millisecond), "ts0", uint32(ts/time.Millisecond), "ts", uint32(frame.Timestamp/time.Millisecond), "codec", codec, "size", bytesIn, "data", data) + p.Trace("write", "seq", frame.Sequence, "baseTs", int32(t.BaseTs/time.Millisecond), "ts0", uint32(data.GetTimestamp()/time.Millisecond), "ts", uint32(frame.Timestamp/time.Millisecond), "codec", codec, "size", data.GetSize(), "data", data.String()) } } @@ -341,53 +311,13 @@ func (p *Publisher) trackAdded() error { return nil } -func (p *Publisher) changeDropFrameLevel(newLevel int) { - p.Warn("change drop frame level", "from", p.VideoTrack.AVTrack.DropFrameLevel, "to", newLevel) - p.VideoTrack.AVTrack.DropFrameLevel = newLevel - p.VideoTrack.AVTrack.LastDropLevelChange = time.Now() -} - -func (p *Publisher) dropFrame(t *AVTrack) (drop bool) { - needDropFrame := t.CheckIfNeedDropFrame(p.MaxFPS) - if needDropFrame { - dropRatio := float64(t.FPS)/float64(p.MaxFPS) - 1 // How many frames to drop for each frame kept - p.dropRate = dropRatio / (dropRatio + 1) // 计算丢帧率 - defer func() { - if time.Since(t.LastDropLevelChange) > time.Second && t.DropFrameLevel > 0 { - p.changeDropFrameLevel(t.DropFrameLevel + 1) - } - }() - } - // Enhanced frame dropping strategy based on DropFrameLevel - switch t.DropFrameLevel { - case 0: - if needDropFrame { - p.changeDropFrameLevel(1) - } - case 1: // Drop P-frame - if !t.Value.IDR { - return true - } else if !needDropFrame { - p.changeDropFrameLevel(0) - } - default: - if !needDropFrame { - p.changeDropFrameLevel(1) - } else { - return true - } - } - return -} - func (p *Publisher) WriteVideo(data IAVFrame) (err error) { - t := p.VideoTrack.AVTrack defer func() { if err != nil { data.Recycle() - } - if t != nil { - t.FixFPS() + if err == ErrSkip { + err = nil + } } }() if err = p.Err(); err != nil { @@ -399,15 +329,17 @@ func (p *Publisher) WriteVideo(data IAVFrame) (err error) { if !p.PubVideo { return ErrMuted } + t := p.VideoTrack.AVTrack if t == nil { t = NewAVTrack(data, p.Logger.With("track", "video"), &p.Publish, p.videoReady) p.VideoTrack.Set(t) p.Call(p.trackAdded) } + p.fixTimestamp(t, data) + defer t.SpeedControl(p.Speed) oldCodecCtx := t.ICodecCtx err = data.Parse(t) - if err == ErrSkip { - data.Recycle() + if err != nil { return nil } codecCtxChanged := oldCodecCtx != t.ICodecCtx @@ -429,10 +361,9 @@ func (p *Publisher) WriteVideo(data IAVFrame) (err error) { var idr *util.Ring[AVFrame] if t.IDRingList.Len() > 0 { idr = t.IDRingList.Back().Value - if p.dropFrame(t) { + if t.CheckIfNeedDropFrame(p.MaxFPS) { p.dropAfterTs = t.LastTs - data.Recycle() - return nil + return ErrSkip } else { p.dropAfterTs = 0 } @@ -492,7 +423,6 @@ func (p *Publisher) WriteVideo(data IAVFrame) (err error) { } } t.Step() - p.VideoTrack.speedControl(p.Speed, t.LastTs) return } @@ -501,9 +431,9 @@ func (p *Publisher) WriteAudio(data IAVFrame) (err error) { defer func() { if err != nil { data.Recycle() - } - if t != nil { - t.FixFPS() + if err == ErrSkip { + err = nil + } } }() if err = p.Err(); err != nil { @@ -515,26 +445,27 @@ func (p *Publisher) WriteAudio(data IAVFrame) (err error) { if !p.PubAudio { return ErrMuted } - // 根据丢帧率进行音频帧丢弃 - if p.dropAfterTs > 0 { - if t != nil { - // 使用序列号进行平均丢帧 - if t.LastTs > p.dropAfterTs { - data.Recycle() - return nil - } - } - } + if t == nil { t = NewAVTrack(data, p.Logger.With("track", "audio"), &p.Publish, p.audioReady) p.AudioTrack.Set(t) p.Call(p.trackAdded) } + p.fixTimestamp(t, data) + defer t.SpeedControl(p.Speed) + // 根据丢帧率进行音频帧丢弃 + if p.dropAfterTs > 0 { + if t != nil { + // 使用序列号进行平均丢帧 + if t.LastTs > p.dropAfterTs { + return ErrSkip + } + } + } oldCodecCtx := t.ICodecCtx err = data.Parse(t) - if err == ErrSkip { - data.Recycle() - return nil + if err != nil { + return } codecCtxChanged := oldCodecCtx != t.ICodecCtx if t.ICodecCtx == nil { @@ -584,7 +515,6 @@ func (p *Publisher) WriteAudio(data IAVFrame) (err error) { } } t.Step() - p.AudioTrack.speedControl(p.Speed, t.LastTs) return } @@ -738,8 +668,12 @@ func (p *Publisher) Resume() { } p.Paused.Resolve() p.Paused = nil - p.VideoTrack.pausedTime += time.Since(p.pauseTime) - p.AudioTrack.pausedTime += time.Since(p.pauseTime) + if p.HasVideoTrack() { + p.VideoTrack.AddPausedTime(time.Since(p.pauseTime)) + } + if p.HasAudioTrack() { + p.AudioTrack.AddPausedTime(time.Since(p.pauseTime)) + } } func (p *Publisher) Seek(ts time.Time) {