mirror of
https://github.com/langhuihui/monibuca.git
synced 2025-12-24 13:48:04 +08:00
fix: speed control
This commit is contained in:
108
pkg/track.go
108
pkg/track.go
@@ -14,6 +14,11 @@ import (
|
||||
"m7s.live/v5/pkg/util"
|
||||
)
|
||||
|
||||
const threshold = 10 * time.Millisecond
|
||||
const DROP_FRAME_LEVEL_NODROP = 0
|
||||
const DROP_FRAME_LEVEL_DROP_P = 1
|
||||
const DROP_FRAME_LEVEL_DROP_ALL = 2
|
||||
|
||||
type (
|
||||
Track struct {
|
||||
*slog.Logger
|
||||
@@ -33,6 +38,20 @@ type (
|
||||
BaseTs, LastTs, BeforeScaleChangedTs time.Duration
|
||||
LastScale float64
|
||||
}
|
||||
SpeedController struct {
|
||||
speed float64
|
||||
pausedTime time.Duration
|
||||
beginTime time.Time
|
||||
beginTimestamp time.Duration
|
||||
Delta time.Duration
|
||||
}
|
||||
DropController struct {
|
||||
acceptFrameCount int
|
||||
accpetFPS int
|
||||
LastDropLevelChange time.Time
|
||||
DropFrameLevel int // 0: no drop, 1: drop P-frame, 2: drop all
|
||||
}
|
||||
|
||||
AVTrack struct {
|
||||
Track
|
||||
*RingWriter
|
||||
@@ -41,8 +60,8 @@ type (
|
||||
SequenceFrame IAVFrame
|
||||
WrapIndex int
|
||||
TsTamer
|
||||
DropFrameLevel int // 0: no drop, 1: drop P-frame, 2: drop all
|
||||
LastDropLevelChange time.Time
|
||||
SpeedController
|
||||
DropController
|
||||
}
|
||||
)
|
||||
|
||||
@@ -90,18 +109,56 @@ func (t *Track) AddBytesIn(n int) {
|
||||
}
|
||||
}
|
||||
|
||||
func (t *AVTrack) FixFPS() {
|
||||
if dur := time.Since(t.lastBPSTime); dur > time.Second {
|
||||
t.BPS = int(float64(t.bytesIn) / dur.Seconds())
|
||||
t.bytesIn = 0
|
||||
t.FPS = int(float64(t.frameCount) / dur.Seconds())
|
||||
t.frameCount = 0
|
||||
t.lastBPSTime = time.Now()
|
||||
func (t *AVTrack) AddBytesIn(n int) {
|
||||
dur := time.Since(t.lastBPSTime)
|
||||
t.Track.AddBytesIn(n)
|
||||
if t.frameCount == 0 {
|
||||
t.accpetFPS = int(float64(t.acceptFrameCount) / dur.Seconds())
|
||||
t.acceptFrameCount = 0
|
||||
}
|
||||
}
|
||||
|
||||
func (t *AVTrack) AcceptFrame(data IAVFrame) {
|
||||
t.acceptFrameCount++
|
||||
t.Value.Wraps = append(t.Value.Wraps, data)
|
||||
}
|
||||
|
||||
func (t *AVTrack) changeDropFrameLevel(newLevel int) {
|
||||
t.Warn("change drop frame level", "from", t.DropFrameLevel, "to", newLevel)
|
||||
t.DropFrameLevel = newLevel
|
||||
t.LastDropLevelChange = time.Now()
|
||||
}
|
||||
|
||||
func (t *AVTrack) CheckIfNeedDropFrame(maxFPS int) (drop bool) {
|
||||
return maxFPS > 0 && (t.FPS > maxFPS || t.frameCount > maxFPS)
|
||||
drop = maxFPS > 0 && (t.accpetFPS > maxFPS)
|
||||
if drop {
|
||||
defer func() {
|
||||
if time.Since(t.LastDropLevelChange) > time.Second && t.DropFrameLevel > 0 {
|
||||
t.changeDropFrameLevel(t.DropFrameLevel + 1)
|
||||
}
|
||||
}()
|
||||
}
|
||||
// Enhanced frame dropping strategy based on DropFrameLevel
|
||||
switch t.DropFrameLevel {
|
||||
case DROP_FRAME_LEVEL_NODROP:
|
||||
if drop {
|
||||
t.changeDropFrameLevel(DROP_FRAME_LEVEL_DROP_P)
|
||||
}
|
||||
case DROP_FRAME_LEVEL_DROP_P: // Drop P-frame
|
||||
if !t.Value.IDR {
|
||||
return true
|
||||
} else if !drop {
|
||||
t.changeDropFrameLevel(DROP_FRAME_LEVEL_NODROP)
|
||||
}
|
||||
return false
|
||||
default:
|
||||
if !drop {
|
||||
t.changeDropFrameLevel(DROP_FRAME_LEVEL_DROP_P)
|
||||
} else {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (t *AVTrack) Ready(err error) {
|
||||
@@ -150,7 +207,7 @@ func (t *TsTamer) Tame(ts time.Duration, fps int, scale float64) (result time.Du
|
||||
result = max(1*time.Millisecond, t.BaseTs+ts)
|
||||
if fps > 0 {
|
||||
frameDur := float64(time.Second) / float64(fps)
|
||||
if math.Abs(float64(result-t.LastTs)) > 10*frameDur { //时间戳突变
|
||||
if math.Abs(float64(result-t.LastTs)) > 10*frameDur*scale { //时间戳突变
|
||||
// t.Warn("timestamp mutation", "fps", t.FPS, "lastTs", uint32(t.LastTs/time.Millisecond), "ts", uint32(frame.Timestamp/time.Millisecond), "frameDur", time.Duration(frameDur))
|
||||
result = t.LastTs + time.Duration(frameDur)
|
||||
t.BaseTs = result - ts
|
||||
@@ -164,3 +221,32 @@ func (t *TsTamer) Tame(ts time.Duration, fps int, scale float64) (result time.Du
|
||||
result = t.BeforeScaleChangedTs + time.Duration(float64(result-t.BeforeScaleChangedTs)/scale)
|
||||
return
|
||||
}
|
||||
|
||||
func (t *AVTrack) SpeedControl(speed float64) {
|
||||
t.speedControl(speed, t.LastTs)
|
||||
}
|
||||
|
||||
func (t *AVTrack) AddPausedTime(d time.Duration) {
|
||||
t.pausedTime += d
|
||||
}
|
||||
|
||||
func (s *SpeedController) speedControl(speed float64, ts time.Duration) {
|
||||
if speed != s.speed || s.beginTime.IsZero() {
|
||||
s.speed = speed
|
||||
s.beginTime = time.Now()
|
||||
s.beginTimestamp = ts
|
||||
s.pausedTime = 0
|
||||
} else {
|
||||
elapsed := time.Since(s.beginTime) - s.pausedTime
|
||||
if speed == 0 {
|
||||
s.Delta = ts - elapsed
|
||||
return
|
||||
}
|
||||
should := time.Duration(float64(ts-s.beginTimestamp) / speed)
|
||||
s.Delta = should - elapsed
|
||||
// fmt.Println(speed, elapsed, should, s.Delta)
|
||||
if s.Delta > threshold {
|
||||
time.Sleep(min(s.Delta, time.Millisecond*500))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
146
publisher.go
146
publisher.go
@@ -36,40 +36,8 @@ const (
|
||||
PublishTypeReplay = "replay"
|
||||
)
|
||||
|
||||
const threshold = 10 * time.Millisecond
|
||||
|
||||
type SpeedControl struct {
|
||||
speed float64
|
||||
pausedTime time.Duration
|
||||
beginTime time.Time
|
||||
beginTimestamp time.Duration
|
||||
Delta time.Duration
|
||||
}
|
||||
|
||||
func (s *SpeedControl) speedControl(speed float64, ts time.Duration) {
|
||||
if speed != s.speed || s.beginTime.IsZero() {
|
||||
s.speed = speed
|
||||
s.beginTime = time.Now()
|
||||
s.beginTimestamp = ts
|
||||
s.pausedTime = 0
|
||||
} else {
|
||||
elapsed := time.Since(s.beginTime) - s.pausedTime
|
||||
if speed == 0 {
|
||||
s.Delta = ts - elapsed
|
||||
return
|
||||
}
|
||||
should := time.Duration(float64(ts-s.beginTimestamp) / speed)
|
||||
s.Delta = should - elapsed
|
||||
// fmt.Println(speed, elapsed, should, s.Delta)
|
||||
if s.Delta > threshold {
|
||||
time.Sleep(min(s.Delta, time.Millisecond*500))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type AVTracks struct {
|
||||
*AVTrack
|
||||
SpeedControl
|
||||
util.Collection[reflect.Type, *AVTrack]
|
||||
sync.RWMutex
|
||||
baseTs time.Duration //from old publisher's lastTs
|
||||
@@ -145,7 +113,6 @@ type Publisher struct {
|
||||
OnGetPosition func() time.Time
|
||||
PullProxyConfig *PullProxyConfig
|
||||
dumpFile *os.File
|
||||
dropRate float64 // 丢帧率,0-1之间
|
||||
dropAfterTs time.Duration
|
||||
}
|
||||
|
||||
@@ -317,18 +284,21 @@ func (p *Publisher) AddSubscriber(subscriber *Subscriber) {
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Publisher) writeAV(t *AVTrack, data IAVFrame) {
|
||||
func (p *Publisher) fixTimestamp(t *AVTrack, data IAVFrame) {
|
||||
frame := &t.Value
|
||||
frame.Wraps = append(frame.Wraps, data)
|
||||
ts := data.GetTimestamp()
|
||||
frame.CTS = data.GetCTS()
|
||||
bytesIn := frame.Wraps[0].GetSize()
|
||||
bytesIn := data.GetSize()
|
||||
t.AddBytesIn(bytesIn)
|
||||
frame.Timestamp = t.Tame(ts, t.FPS, p.Scale)
|
||||
}
|
||||
|
||||
func (p *Publisher) writeAV(t *AVTrack, data IAVFrame) {
|
||||
t.AcceptFrame(data)
|
||||
if p.Enabled(p, task.TraceLevel) {
|
||||
frame := &t.Value
|
||||
codec := t.FourCC().String()
|
||||
data := frame.Wraps[0].String()
|
||||
p.Trace("write", "seq", frame.Sequence, "baseTs", int32(t.BaseTs/time.Millisecond), "ts0", uint32(ts/time.Millisecond), "ts", uint32(frame.Timestamp/time.Millisecond), "codec", codec, "size", bytesIn, "data", data)
|
||||
p.Trace("write", "seq", frame.Sequence, "baseTs", int32(t.BaseTs/time.Millisecond), "ts0", uint32(data.GetTimestamp()/time.Millisecond), "ts", uint32(frame.Timestamp/time.Millisecond), "codec", codec, "size", data.GetSize(), "data", data.String())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -341,53 +311,13 @@ func (p *Publisher) trackAdded() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Publisher) changeDropFrameLevel(newLevel int) {
|
||||
p.Warn("change drop frame level", "from", p.VideoTrack.AVTrack.DropFrameLevel, "to", newLevel)
|
||||
p.VideoTrack.AVTrack.DropFrameLevel = newLevel
|
||||
p.VideoTrack.AVTrack.LastDropLevelChange = time.Now()
|
||||
}
|
||||
|
||||
func (p *Publisher) dropFrame(t *AVTrack) (drop bool) {
|
||||
needDropFrame := t.CheckIfNeedDropFrame(p.MaxFPS)
|
||||
if needDropFrame {
|
||||
dropRatio := float64(t.FPS)/float64(p.MaxFPS) - 1 // How many frames to drop for each frame kept
|
||||
p.dropRate = dropRatio / (dropRatio + 1) // 计算丢帧率
|
||||
defer func() {
|
||||
if time.Since(t.LastDropLevelChange) > time.Second && t.DropFrameLevel > 0 {
|
||||
p.changeDropFrameLevel(t.DropFrameLevel + 1)
|
||||
}
|
||||
}()
|
||||
}
|
||||
// Enhanced frame dropping strategy based on DropFrameLevel
|
||||
switch t.DropFrameLevel {
|
||||
case 0:
|
||||
if needDropFrame {
|
||||
p.changeDropFrameLevel(1)
|
||||
}
|
||||
case 1: // Drop P-frame
|
||||
if !t.Value.IDR {
|
||||
return true
|
||||
} else if !needDropFrame {
|
||||
p.changeDropFrameLevel(0)
|
||||
}
|
||||
default:
|
||||
if !needDropFrame {
|
||||
p.changeDropFrameLevel(1)
|
||||
} else {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (p *Publisher) WriteVideo(data IAVFrame) (err error) {
|
||||
t := p.VideoTrack.AVTrack
|
||||
defer func() {
|
||||
if err != nil {
|
||||
data.Recycle()
|
||||
}
|
||||
if t != nil {
|
||||
t.FixFPS()
|
||||
if err == ErrSkip {
|
||||
err = nil
|
||||
}
|
||||
}
|
||||
}()
|
||||
if err = p.Err(); err != nil {
|
||||
@@ -399,15 +329,17 @@ func (p *Publisher) WriteVideo(data IAVFrame) (err error) {
|
||||
if !p.PubVideo {
|
||||
return ErrMuted
|
||||
}
|
||||
t := p.VideoTrack.AVTrack
|
||||
if t == nil {
|
||||
t = NewAVTrack(data, p.Logger.With("track", "video"), &p.Publish, p.videoReady)
|
||||
p.VideoTrack.Set(t)
|
||||
p.Call(p.trackAdded)
|
||||
}
|
||||
p.fixTimestamp(t, data)
|
||||
defer t.SpeedControl(p.Speed)
|
||||
oldCodecCtx := t.ICodecCtx
|
||||
err = data.Parse(t)
|
||||
if err == ErrSkip {
|
||||
data.Recycle()
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
codecCtxChanged := oldCodecCtx != t.ICodecCtx
|
||||
@@ -429,10 +361,9 @@ func (p *Publisher) WriteVideo(data IAVFrame) (err error) {
|
||||
var idr *util.Ring[AVFrame]
|
||||
if t.IDRingList.Len() > 0 {
|
||||
idr = t.IDRingList.Back().Value
|
||||
if p.dropFrame(t) {
|
||||
if t.CheckIfNeedDropFrame(p.MaxFPS) {
|
||||
p.dropAfterTs = t.LastTs
|
||||
data.Recycle()
|
||||
return nil
|
||||
return ErrSkip
|
||||
} else {
|
||||
p.dropAfterTs = 0
|
||||
}
|
||||
@@ -492,7 +423,6 @@ func (p *Publisher) WriteVideo(data IAVFrame) (err error) {
|
||||
}
|
||||
}
|
||||
t.Step()
|
||||
p.VideoTrack.speedControl(p.Speed, t.LastTs)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -501,9 +431,9 @@ func (p *Publisher) WriteAudio(data IAVFrame) (err error) {
|
||||
defer func() {
|
||||
if err != nil {
|
||||
data.Recycle()
|
||||
}
|
||||
if t != nil {
|
||||
t.FixFPS()
|
||||
if err == ErrSkip {
|
||||
err = nil
|
||||
}
|
||||
}
|
||||
}()
|
||||
if err = p.Err(); err != nil {
|
||||
@@ -515,26 +445,27 @@ func (p *Publisher) WriteAudio(data IAVFrame) (err error) {
|
||||
if !p.PubAudio {
|
||||
return ErrMuted
|
||||
}
|
||||
// 根据丢帧率进行音频帧丢弃
|
||||
if p.dropAfterTs > 0 {
|
||||
if t != nil {
|
||||
// 使用序列号进行平均丢帧
|
||||
if t.LastTs > p.dropAfterTs {
|
||||
data.Recycle()
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if t == nil {
|
||||
t = NewAVTrack(data, p.Logger.With("track", "audio"), &p.Publish, p.audioReady)
|
||||
p.AudioTrack.Set(t)
|
||||
p.Call(p.trackAdded)
|
||||
}
|
||||
p.fixTimestamp(t, data)
|
||||
defer t.SpeedControl(p.Speed)
|
||||
// 根据丢帧率进行音频帧丢弃
|
||||
if p.dropAfterTs > 0 {
|
||||
if t != nil {
|
||||
// 使用序列号进行平均丢帧
|
||||
if t.LastTs > p.dropAfterTs {
|
||||
return ErrSkip
|
||||
}
|
||||
}
|
||||
}
|
||||
oldCodecCtx := t.ICodecCtx
|
||||
err = data.Parse(t)
|
||||
if err == ErrSkip {
|
||||
data.Recycle()
|
||||
return nil
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
codecCtxChanged := oldCodecCtx != t.ICodecCtx
|
||||
if t.ICodecCtx == nil {
|
||||
@@ -584,7 +515,6 @@ func (p *Publisher) WriteAudio(data IAVFrame) (err error) {
|
||||
}
|
||||
}
|
||||
t.Step()
|
||||
p.AudioTrack.speedControl(p.Speed, t.LastTs)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -738,8 +668,12 @@ func (p *Publisher) Resume() {
|
||||
}
|
||||
p.Paused.Resolve()
|
||||
p.Paused = nil
|
||||
p.VideoTrack.pausedTime += time.Since(p.pauseTime)
|
||||
p.AudioTrack.pausedTime += time.Since(p.pauseTime)
|
||||
if p.HasVideoTrack() {
|
||||
p.VideoTrack.AddPausedTime(time.Since(p.pauseTime))
|
||||
}
|
||||
if p.HasAudioTrack() {
|
||||
p.AudioTrack.AddPausedTime(time.Since(p.pauseTime))
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Publisher) Seek(ts time.Time) {
|
||||
|
||||
Reference in New Issue
Block a user