diff --git a/subscriber.go b/subscriber.go index 1eac12c..fefa866 100644 --- a/subscriber.go +++ b/subscriber.go @@ -21,6 +21,11 @@ const ( SUBTYPE_RTP SUBTYPE_FLV ) +const ( + SUBSTATE_INIT = iota + SUBSTATE_FIRST + SUBSTATE_NORMAL +) type AudioFrame AVFrame[AudioSlice] type VideoFrame AVFrame[NALUSlice] @@ -100,6 +105,18 @@ type TrackPlayer struct { FirstAbsTS uint32 //订阅起始时间戳 } +func (tp *TrackPlayer) ReadVideo() (vp *AVFrame[NALUSlice]) { + vp = tp.Video.ring.Read(tp.Context) + tp.Video.Frame = vp + return +} + +func (tp *TrackPlayer) ReadAudio() (ap *AVFrame[AudioSlice]) { + ap = tp.Audio.ring.Read(tp.Context) + tp.Audio.Frame = ap + return +} + // Subscriber 订阅者实体定义 type Subscriber struct { IO[config.Subscribe] @@ -180,6 +197,7 @@ func (s *Subscriber) PlayBlock(subType byte) { s.Spesic.OnEvent(AudioDeConf(s.Audio.Track.DecoderConfiguration)) } sendVideoFrame = func(frame *AVFrame[NALUSlice]) { + // println(frame.Sequence, frame.AbsTime, frame.PTS, frame.DTS, frame.IFrame) spesic.OnEvent((*VideoFrame)(frame)) } sendAudioFrame = func(frame *AVFrame[AudioSlice]) { @@ -196,7 +214,6 @@ func (s *Subscriber) PlayBlock(subType byte) { } var videoSeq, audioSeq uint16 sendVideoFrame = func(frame *AVFrame[NALUSlice]) { - s.Video.Frame = frame for _, p := range frame.RTP { videoSeq++ vp := *p @@ -206,7 +223,6 @@ func (s *Subscriber) PlayBlock(subType byte) { } } sendAudioFrame = func(frame *AVFrame[AudioSlice]) { - s.Audio.Frame = frame for _, p := range frame.RTP { audioSeq++ vp := *p @@ -236,11 +252,10 @@ func (s *Subscriber) PlayBlock(subType byte) { spesic.OnEvent(append(result, util.PutBE(flvHeadCache[11:15], dataSize+11))) } sendVideoFrame = func(frame *AVFrame[NALUSlice]) { - s.Video.Frame = frame + // println(frame.Sequence, frame.AbsTime, frame.PTS, frame.DTS, frame.IFrame) sendFlvFrame(codec.FLV_TAG_TYPE_VIDEO, frame.AbsTime, frame.AVCC) } sendAudioFrame = func(frame *AVFrame[AudioSlice]) { - s.Audio.Frame = frame sendFlvFrame(codec.FLV_TAG_TYPE_AUDIO, frame.AbsTime, frame.AVCC) } } @@ -249,41 +264,42 @@ func (s *Subscriber) PlayBlock(subType byte) { for ctx.Err() == nil { hasVideo, hasAudio := s.Video.ring != nil && s.Config.SubVideo, s.Audio.ring != nil && s.Config.SubAudio if hasVideo { - for { - vp := s.Video.ring.Read(ctx) - s.Video.Frame = vp - if ctx.Err() != nil { - return - } - if vp.IFrame && s.Video.decConfChanged() { - sendVideoDecConf() - } + for ctx.Err() == nil { + var vp *AVFrame[NALUSlice] switch vstate { - case 0: + case SUBSTATE_INIT: + s.Video.ring.Ring = s.Video.Track.IDRing + vp = s.ReadVideo() startTime = time.Now() s.FirstAbsTS = vp.AbsTime firstSeq = vp.Sequence - s.Debug("firstIFrame", zap.Uint32("seq", vp.Sequence)) + s.Info("firstIFrame", zap.Uint32("seq", vp.Sequence)) if s.Config.LiveMode { - vstate = 1 + vstate = SUBSTATE_FIRST } else { - vstate = 3 - } - case 1: - // 防止过快消费 - if fast := time.Duration(vp.AbsTime-s.FirstAbsTS)*time.Millisecond - time.Since(startTime); fast > 0 && fast < time.Second { - time.Sleep(fast) + vstate = SUBSTATE_NORMAL } + case SUBSTATE_FIRST: if s.Video.Track.IDRing.Value.Sequence != firstSeq { s.Video.ring.Ring = s.Video.Track.IDRing // 直接跳到最近的关键帧 - vstate = 2 + vp = s.ReadVideo() + s.SkipTS = vp.AbsTime - beforeJump + s.Debug("skip to latest key frame", zap.Uint32("seq", vp.Sequence), zap.Uint32("skipTS", s.SkipTS)) + vstate = SUBSTATE_NORMAL + } else { + vp = s.ReadVideo() beforeJump = vp.AbsTime - continue + // 防止过快消费 + if fast := time.Duration(vp.AbsTime-s.FirstAbsTS)*time.Millisecond - time.Since(startTime); fast > 0 && fast < time.Second { + time.Sleep(fast) + } } - case 2: - vstate = 3 - s.SkipTS = vp.AbsTime - beforeJump - s.Debug("skip to latest key frame", zap.Uint32("seq", vp.Sequence), zap.Uint32("skipTS", s.SkipTS)) + case SUBSTATE_NORMAL: + vp = s.ReadVideo() + } + if vp.IFrame && s.Video.decConfChanged() { + // println(s.Video.confSeq, s.Video.Track.SPSInfo.Width, s.Video.Track.SPSInfo.Height) + sendVideoDecConf() } if !s.Config.IFrameOnly || vp.IFrame { sendVideoFrame(vp) @@ -294,7 +310,7 @@ func (s *Subscriber) PlayBlock(subType byte) { break } } - if vstate < 3 { + if vstate < SUBSTATE_NORMAL { continue } } @@ -307,8 +323,7 @@ func (s *Subscriber) PlayBlock(subType byte) { audioSent = true } for { - ap := s.Audio.ring.Read(ctx) - s.Audio.Frame = ap + ap := s.ReadAudio() if ctx.Err() != nil { return } diff --git a/track/video.go b/track/video.go index d7b6fa1..96ee363 100644 --- a/track/video.go +++ b/track/video.go @@ -3,7 +3,7 @@ package track import ( "bytes" - . "github.com/logrusorgru/aurora" + // . "github.com/logrusorgru/aurora" "go.uber.org/zap" "m7s.live/engine/v4/codec" . "m7s.live/engine/v4/common" @@ -65,7 +65,7 @@ func (vt *Video) ComputeGOP() { vt.GOP = int(vt.AVRing.RingBuffer.Value.Sequence - vt.IDRing.Value.Sequence) if l := vt.AVRing.RingBuffer.Size - vt.GOP - 5; l > 5 { vt.AVRing.RingBuffer.Size -= l - vt.Stream.Debug(Sprintf("resize(%d%s%d)", vt.AVRing.RingBuffer.Size+l, Blink("→"), vt.AVRing.RingBuffer.Size), zap.String("name", vt.Name)) + vt.Stream.Debug("resize", zap.Int("before", vt.AVRing.RingBuffer.Size+l), zap.Int("after", vt.AVRing.RingBuffer.Size), zap.String("name", vt.Name)) //缩小缓冲环节省内存 vt.Unlink(l).Do(func(v AVFrame[NALUSlice]) { if v.IFrame {