🐛 FIX: 🫀出血

This commit is contained in:
dexter
2022-09-14 14:17:50 +08:00
parent eeda0d4fb5
commit b7969f9f58
2 changed files with 48 additions and 33 deletions

View File

@@ -21,6 +21,11 @@ const (
SUBTYPE_RTP SUBTYPE_RTP
SUBTYPE_FLV SUBTYPE_FLV
) )
const (
SUBSTATE_INIT = iota
SUBSTATE_FIRST
SUBSTATE_NORMAL
)
type AudioFrame AVFrame[AudioSlice] type AudioFrame AVFrame[AudioSlice]
type VideoFrame AVFrame[NALUSlice] type VideoFrame AVFrame[NALUSlice]
@@ -100,6 +105,18 @@ type TrackPlayer struct {
FirstAbsTS uint32 //订阅起始时间戳 FirstAbsTS uint32 //订阅起始时间戳
} }
func (tp *TrackPlayer) ReadVideo() (vp *AVFrame[NALUSlice]) {
vp = tp.Video.ring.Read(tp.Context)
tp.Video.Frame = vp
return
}
func (tp *TrackPlayer) ReadAudio() (ap *AVFrame[AudioSlice]) {
ap = tp.Audio.ring.Read(tp.Context)
tp.Audio.Frame = ap
return
}
// Subscriber 订阅者实体定义 // Subscriber 订阅者实体定义
type Subscriber struct { type Subscriber struct {
IO[config.Subscribe] IO[config.Subscribe]
@@ -180,6 +197,7 @@ func (s *Subscriber) PlayBlock(subType byte) {
s.Spesic.OnEvent(AudioDeConf(s.Audio.Track.DecoderConfiguration)) s.Spesic.OnEvent(AudioDeConf(s.Audio.Track.DecoderConfiguration))
} }
sendVideoFrame = func(frame *AVFrame[NALUSlice]) { sendVideoFrame = func(frame *AVFrame[NALUSlice]) {
// println(frame.Sequence, frame.AbsTime, frame.PTS, frame.DTS, frame.IFrame)
spesic.OnEvent((*VideoFrame)(frame)) spesic.OnEvent((*VideoFrame)(frame))
} }
sendAudioFrame = func(frame *AVFrame[AudioSlice]) { sendAudioFrame = func(frame *AVFrame[AudioSlice]) {
@@ -196,7 +214,6 @@ func (s *Subscriber) PlayBlock(subType byte) {
} }
var videoSeq, audioSeq uint16 var videoSeq, audioSeq uint16
sendVideoFrame = func(frame *AVFrame[NALUSlice]) { sendVideoFrame = func(frame *AVFrame[NALUSlice]) {
s.Video.Frame = frame
for _, p := range frame.RTP { for _, p := range frame.RTP {
videoSeq++ videoSeq++
vp := *p vp := *p
@@ -206,7 +223,6 @@ func (s *Subscriber) PlayBlock(subType byte) {
} }
} }
sendAudioFrame = func(frame *AVFrame[AudioSlice]) { sendAudioFrame = func(frame *AVFrame[AudioSlice]) {
s.Audio.Frame = frame
for _, p := range frame.RTP { for _, p := range frame.RTP {
audioSeq++ audioSeq++
vp := *p vp := *p
@@ -236,11 +252,10 @@ func (s *Subscriber) PlayBlock(subType byte) {
spesic.OnEvent(append(result, util.PutBE(flvHeadCache[11:15], dataSize+11))) spesic.OnEvent(append(result, util.PutBE(flvHeadCache[11:15], dataSize+11)))
} }
sendVideoFrame = func(frame *AVFrame[NALUSlice]) { sendVideoFrame = func(frame *AVFrame[NALUSlice]) {
s.Video.Frame = frame // println(frame.Sequence, frame.AbsTime, frame.PTS, frame.DTS, frame.IFrame)
sendFlvFrame(codec.FLV_TAG_TYPE_VIDEO, frame.AbsTime, frame.AVCC) sendFlvFrame(codec.FLV_TAG_TYPE_VIDEO, frame.AbsTime, frame.AVCC)
} }
sendAudioFrame = func(frame *AVFrame[AudioSlice]) { sendAudioFrame = func(frame *AVFrame[AudioSlice]) {
s.Audio.Frame = frame
sendFlvFrame(codec.FLV_TAG_TYPE_AUDIO, frame.AbsTime, frame.AVCC) sendFlvFrame(codec.FLV_TAG_TYPE_AUDIO, frame.AbsTime, frame.AVCC)
} }
} }
@@ -249,41 +264,42 @@ func (s *Subscriber) PlayBlock(subType byte) {
for ctx.Err() == nil { for ctx.Err() == nil {
hasVideo, hasAudio := s.Video.ring != nil && s.Config.SubVideo, s.Audio.ring != nil && s.Config.SubAudio hasVideo, hasAudio := s.Video.ring != nil && s.Config.SubVideo, s.Audio.ring != nil && s.Config.SubAudio
if hasVideo { if hasVideo {
for { for ctx.Err() == nil {
vp := s.Video.ring.Read(ctx) var vp *AVFrame[NALUSlice]
s.Video.Frame = vp
if ctx.Err() != nil {
return
}
if vp.IFrame && s.Video.decConfChanged() {
sendVideoDecConf()
}
switch vstate { switch vstate {
case 0: case SUBSTATE_INIT:
s.Video.ring.Ring = s.Video.Track.IDRing
vp = s.ReadVideo()
startTime = time.Now() startTime = time.Now()
s.FirstAbsTS = vp.AbsTime s.FirstAbsTS = vp.AbsTime
firstSeq = vp.Sequence firstSeq = vp.Sequence
s.Debug("firstIFrame", zap.Uint32("seq", vp.Sequence)) s.Info("firstIFrame", zap.Uint32("seq", vp.Sequence))
if s.Config.LiveMode { if s.Config.LiveMode {
vstate = 1 vstate = SUBSTATE_FIRST
} else { } else {
vstate = 3 vstate = SUBSTATE_NORMAL
} }
case 1: case SUBSTATE_FIRST:
if s.Video.Track.IDRing.Value.Sequence != firstSeq {
s.Video.ring.Ring = s.Video.Track.IDRing // 直接跳到最近的关键帧
vp = s.ReadVideo()
s.SkipTS = vp.AbsTime - beforeJump
s.Debug("skip to latest key frame", zap.Uint32("seq", vp.Sequence), zap.Uint32("skipTS", s.SkipTS))
vstate = SUBSTATE_NORMAL
} else {
vp = s.ReadVideo()
beforeJump = vp.AbsTime
// 防止过快消费 // 防止过快消费
if fast := time.Duration(vp.AbsTime-s.FirstAbsTS)*time.Millisecond - time.Since(startTime); fast > 0 && fast < time.Second { if fast := time.Duration(vp.AbsTime-s.FirstAbsTS)*time.Millisecond - time.Since(startTime); fast > 0 && fast < time.Second {
time.Sleep(fast) time.Sleep(fast)
} }
if s.Video.Track.IDRing.Value.Sequence != firstSeq {
s.Video.ring.Ring = s.Video.Track.IDRing // 直接跳到最近的关键帧
vstate = 2
beforeJump = vp.AbsTime
continue
} }
case 2: case SUBSTATE_NORMAL:
vstate = 3 vp = s.ReadVideo()
s.SkipTS = vp.AbsTime - beforeJump }
s.Debug("skip to latest key frame", zap.Uint32("seq", vp.Sequence), zap.Uint32("skipTS", s.SkipTS)) if vp.IFrame && s.Video.decConfChanged() {
// println(s.Video.confSeq, s.Video.Track.SPSInfo.Width, s.Video.Track.SPSInfo.Height)
sendVideoDecConf()
} }
if !s.Config.IFrameOnly || vp.IFrame { if !s.Config.IFrameOnly || vp.IFrame {
sendVideoFrame(vp) sendVideoFrame(vp)
@@ -294,7 +310,7 @@ func (s *Subscriber) PlayBlock(subType byte) {
break break
} }
} }
if vstate < 3 { if vstate < SUBSTATE_NORMAL {
continue continue
} }
} }
@@ -307,8 +323,7 @@ func (s *Subscriber) PlayBlock(subType byte) {
audioSent = true audioSent = true
} }
for { for {
ap := s.Audio.ring.Read(ctx) ap := s.ReadAudio()
s.Audio.Frame = ap
if ctx.Err() != nil { if ctx.Err() != nil {
return return
} }

View File

@@ -3,7 +3,7 @@ package track
import ( import (
"bytes" "bytes"
. "github.com/logrusorgru/aurora" // . "github.com/logrusorgru/aurora"
"go.uber.org/zap" "go.uber.org/zap"
"m7s.live/engine/v4/codec" "m7s.live/engine/v4/codec"
. "m7s.live/engine/v4/common" . "m7s.live/engine/v4/common"
@@ -65,7 +65,7 @@ func (vt *Video) ComputeGOP() {
vt.GOP = int(vt.AVRing.RingBuffer.Value.Sequence - vt.IDRing.Value.Sequence) vt.GOP = int(vt.AVRing.RingBuffer.Value.Sequence - vt.IDRing.Value.Sequence)
if l := vt.AVRing.RingBuffer.Size - vt.GOP - 5; l > 5 { if l := vt.AVRing.RingBuffer.Size - vt.GOP - 5; l > 5 {
vt.AVRing.RingBuffer.Size -= l vt.AVRing.RingBuffer.Size -= l
vt.Stream.Debug(Sprintf("resize(%d%s%d)", vt.AVRing.RingBuffer.Size+l, Blink("→"), vt.AVRing.RingBuffer.Size), zap.String("name", vt.Name)) vt.Stream.Debug("resize", zap.Int("before", vt.AVRing.RingBuffer.Size+l), zap.Int("after", vt.AVRing.RingBuffer.Size), zap.String("name", vt.Name))
//缩小缓冲环节省内存 //缩小缓冲环节省内存
vt.Unlink(l).Do(func(v AVFrame[NALUSlice]) { vt.Unlink(l).Do(func(v AVFrame[NALUSlice]) {
if v.IFrame { if v.IFrame {