diff --git a/common/index.go b/common/index.go index c72ff10..150dff9 100644 --- a/common/index.go +++ b/common/index.go @@ -76,7 +76,7 @@ type AVTrack interface { CurrentFrame() *AVFrame Attach() Detach() - WriteAVCC(ts uint32, frame util.BLL) //写入AVCC格式的数据 + WriteAVCC(ts uint32, frame util.BLL) error //写入AVCC格式的数据 WriteRTP([]byte) WriteRTPPack(*rtp.Packet) Flush() diff --git a/subscriber.go b/subscriber.go index 5da7d94..829cc91 100644 --- a/subscriber.go +++ b/subscriber.go @@ -27,12 +27,16 @@ const ( SUBSTATE_NORMAL ) +// AVCC 格式的序列帧 type VideoDeConf []byte + +// AVCC 格式的序列帧 type AudioDeConf []byte type AudioFrame struct { *AVFrame AbsTime uint32 PTS uint32 + DTS uint32 } type VideoFrame struct { *AVFrame @@ -168,13 +172,13 @@ func (s *Subscriber) PlayBlock(subType byte) { s.TrackPlayer.Context, s.TrackPlayer.CancelFunc = context.WithCancel(s.IO) ctx := s.TrackPlayer.Context sendVideoDecConf := func() { + spesic.OnEvent(s.Video.ParamaterSets) spesic.OnEvent(VideoDeConf(s.VideoReader.Track.SequenceHead)) } sendAudioDecConf := func() { spesic.OnEvent(AudioDeConf(s.AudioReader.Track.SequenceHead)) } - var sendVideoFrame func(*AVFrame) - var sendAudioFrame func(*AVFrame) + var sendAudioFrame, sendVideoFrame func(*AVFrame) switch subType { case SUBTYPE_RAW: sendVideoFrame = func(frame *AVFrame) { @@ -183,7 +187,7 @@ func (s *Subscriber) PlayBlock(subType byte) { } sendAudioFrame = func(frame *AVFrame) { // println("a", frame.Sequence, s.AudioReader.AbsTime) - spesic.OnEvent(AudioFrame{frame, s.AudioReader.AbsTime, frame.DTS - s.VideoReader.SkipTs*90}) + spesic.OnEvent(AudioFrame{frame, s.AudioReader.AbsTime, s.AudioReader.AbsTime * 90, s.AudioReader.AbsTime * 90}) } case SUBTYPE_RTP: var videoSeq, audioSeq uint16 @@ -201,7 +205,7 @@ func (s *Subscriber) PlayBlock(subType byte) { audioSeq++ vp := *p vp.Header.SequenceNumber = audioSeq - vp.Header.Timestamp = vp.Header.Timestamp - s.VideoReader.SkipTs*90 + vp.Header.Timestamp = vp.Header.Timestamp - s.AudioReader.SkipTs*90 spesic.OnEvent((AudioRTP)(vp)) } } @@ -225,10 +229,11 @@ func (s *Subscriber) PlayBlock(subType byte) { // spesic.OnEvent(FLVFrame(copyBuffers(s.Audio.Track.DecoderConfiguration.FLV))) } sendVideoFrame = func(frame *AVFrame) { - // println(frame.Sequence, frame.AbsTime, frame.DeltaTime, frame.IFrame) + // println(frame.Sequence, s.VideoReader.AbsTime, frame.DeltaTime, frame.IFrame) sendFlvFrame(codec.FLV_TAG_TYPE_VIDEO, s.VideoReader.AbsTime, frame.AVCC.ToBuffers()...) } sendAudioFrame = func(frame *AVFrame) { + // println(frame.Sequence, s.AudioReader.AbsTime, frame.DeltaTime) sendFlvFrame(codec.FLV_TAG_TYPE_AUDIO, s.AudioReader.AbsTime, frame.AVCC.ToBuffers()...) } } @@ -237,32 +242,49 @@ func (s *Subscriber) PlayBlock(subType byte) { if s.Args.Has(s.Config.SubModeArgName) { subMode, _ = strconv.Atoi(s.Args.Get(s.Config.SubModeArgName)) } + var videoFrame, audioFrame *AVFrame for ctx.Err() == nil { hasVideo, hasAudio := s.VideoReader.Track != nil && s.Config.SubVideo, s.AudioReader.Track != nil && s.Config.SubAudio if hasVideo { + if videoFrame != nil { + sendVideoFrame(videoFrame) + videoFrame = nil + } for ctx.Err() == nil { s.VideoReader.Read(ctx, subMode) frame := s.VideoReader.Frame // println("video", frame.Sequence, frame.AbsTime) - if frame == nil { + if frame == nil || ctx.Err() != nil { return } + if audioFrame != nil { + if frame.AbsTime > audioFrame.AbsTime { + sendAudioFrame(audioFrame) + audioFrame = nil + } + } if frame.IFrame && s.VideoReader.DecConfChanged() { s.VideoReader.ConfSeq = s.VideoReader.Track.SequenceHeadSeq // println(s.Video.confSeq, s.Video.Track.SPSInfo.Width, s.Video.Track.SPSInfo.Height) sendVideoDecConf() } if !s.Config.IFrameOnly || frame.IFrame { - sendVideoFrame(frame) - } - if frame.AbsTime > lastPlayAbsTime { - lastPlayAbsTime = frame.AbsTime - break + if frame.AbsTime > lastPlayAbsTime { + lastPlayAbsTime = frame.AbsTime + videoFrame = frame + break + } else { + sendVideoFrame(frame) + } } } } // 正常模式下或者纯音频模式下,音频开始播放 if hasAudio { + if audioFrame != nil { + sendAudioFrame(audioFrame) + audioFrame = nil + } for ctx.Err() == nil { switch s.AudioReader.State { case track.READSTATE_INIT: @@ -277,17 +299,25 @@ func (s *Subscriber) PlayBlock(subType byte) { s.AudioReader.Read(ctx, subMode) frame := s.AudioReader.Frame // println("audio", frame.Sequence, frame.AbsTime) - if frame == nil { + if frame == nil || ctx.Err() != nil { return } + if videoFrame != nil { + if frame.AbsTime > videoFrame.AbsTime { + sendVideoFrame(videoFrame) + videoFrame = nil + } + } if s.AudioReader.DecConfChanged() { s.AudioReader.ConfSeq = s.AudioReader.Track.SequenceHeadSeq sendAudioDecConf() } - sendAudioFrame(frame) if frame.AbsTime > lastPlayAbsTime { lastPlayAbsTime = frame.AbsTime + audioFrame = frame break + } else { + sendAudioFrame(frame) } } } diff --git a/track/aac.go b/track/aac.go index c3f35e4..d467be1 100644 --- a/track/aac.go +++ b/track/aac.go @@ -1,6 +1,7 @@ package track import ( + "io" "net" "time" @@ -100,10 +101,10 @@ func (aac *AAC) WriteAVCCSequenceHead(sh []byte) { aac.WriteSequenceHead(append([]byte{0xAF, 0x00}, sh...)) } -func (aac *AAC) WriteAVCC(ts uint32, frame util.BLL) { +func (aac *AAC) WriteAVCC(ts uint32, frame util.BLL) error { if l := frame.ByteLength; l < 4 { aac.Stream.Error("AVCC data too short", zap.Int("len", l)) - return + return io.ErrShortWrite } if frame.GetByte(1) == 0 { aac.WriteSequenceHead(frame.ToBytes()) @@ -114,6 +115,7 @@ func (aac *AAC) WriteAVCC(ts uint32, frame util.BLL) { aac.AppendAuBytes(au...) aac.Audio.WriteAVCC(ts, frame) } + return nil } func (aac *AAC) CompleteRTP(value *AVFrame) { diff --git a/track/base.go b/track/base.go index f0ee042..4c038fc 100644 --- a/track/base.go +++ b/track/base.go @@ -93,7 +93,7 @@ func (av *Media) SnapForJson() { av.RawPart = av.RawPart[:0] } av.RawSize = v.AUList.ByteLength - r := v.AUList.Next.Value.NewReader() + r := v.AUList.NewReader() for b, err := r.ReadByte(); err == nil && len(av.RawPart) < 10; b, err = r.ReadByte() { av.RawPart = append(av.RawPart, int(b)) } diff --git a/track/g711.go b/track/g711.go index 17d5f4c..d89870e 100644 --- a/track/g711.go +++ b/track/g711.go @@ -1,6 +1,7 @@ package track import ( + "io" "time" "go.uber.org/zap" @@ -36,17 +37,18 @@ type G711 struct { Audio } -func (g711 *G711) WriteAVCC(ts uint32, frame util.BLL) { +func (g711 *G711) WriteAVCC(ts uint32, frame util.BLL) error { if l := frame.ByteLength; l < 2 { g711.Stream.Error("AVCC data too short", zap.Int("len", l)) - return + return io.ErrShortWrite } g711.Value.AUList.Push(g711.BytesPool.GetShell(frame.Next.Value[1:])) - frame.Next.Range(func(v util.BLI) bool { + frame.Range(func(v util.BLI) bool { g711.Value.AUList.Push(g711.BytesPool.GetShell(v)) return true }) g711.Audio.WriteAVCC(ts, frame) + return nil } func (g711 *G711) WriteRTPFrame(frame *RTPFrame) { diff --git a/track/h264.go b/track/h264.go index 8d5ace5..852c207 100644 --- a/track/h264.go +++ b/track/h264.go @@ -1,6 +1,7 @@ package track import ( + "io" "time" "go.uber.org/zap" @@ -65,22 +66,23 @@ func (vt *H264) WriteSliceBytes(slice []byte) { case codec.NALU_SEI: vt.AppendAuBytes(slice) case codec.NALU_Access_Unit_Delimiter: + case codec.NALU_Filler_Data: default: vt.Stream.Error("H264 WriteSliceBytes naluType not support", zap.Int("naluType", int(naluType))) } } -func (vt *H264) WriteAVCC(ts uint32, frame util.BLL) { +func (vt *H264) WriteAVCC(ts uint32, frame util.BLL) (err error) { if l := frame.ByteLength; l < 6 { vt.Stream.Error("AVCC data too short", zap.Int("len", l)) - return + return io.ErrShortWrite } if frame.GetByte(1) == 0 { vt.SequenceHead = frame.ToBytes() frame.Recycle() vt.updateSequeceHead() var info codec.AVCDecoderConfigurationRecord - if _, err := info.Unmarshal(vt.SequenceHead[5:]); err == nil { + if _, err = info.Unmarshal(vt.SequenceHead[5:]); err == nil { vt.SPSInfo, _ = codec.ParseSPS(info.SequenceParameterSetNALUnit) vt.nalulenSize = int(info.LengthSizeMinusOne&3 + 1) vt.SPS = info.SequenceParameterSetNALUnit @@ -91,8 +93,9 @@ func (vt *H264) WriteAVCC(ts uint32, frame util.BLL) { vt.Stream.Error("H264 ParseSpsPps Error") vt.Stream.Close() } + return } else { - vt.Video.WriteAVCC(ts, frame) + return vt.Video.WriteAVCC(ts, frame) } } diff --git a/track/h265.go b/track/h265.go index d3d2ba5..6394e6d 100644 --- a/track/h265.go +++ b/track/h265.go @@ -1,6 +1,7 @@ package track import ( + "io" "time" "go.uber.org/zap" @@ -62,27 +63,25 @@ func (vt *H265) WriteSliceBytes(slice []byte) { } } -func (vt *H265) WriteAVCC(ts uint32, frame util.BLL) { +func (vt *H265) WriteAVCC(ts uint32, frame util.BLL) (err error) { if l := frame.ByteLength; l < 6 { vt.Stream.Error("AVCC data too short", zap.Int("len", l)) - return + return io.ErrShortWrite } if frame.GetByte(1) == 0 { vt.SequenceHead = frame.ToBytes() frame.Recycle() vt.updateSequeceHead() - if vps, sps, pps, err := codec.ParseVpsSpsPpsFromSeqHeaderWithoutMalloc(vt.SequenceHead); err == nil { + if vt.VPS, vt.SPS, vt.PPS, err = codec.ParseVpsSpsPpsFromSeqHeaderWithoutMalloc(vt.SequenceHead); err == nil { vt.SPSInfo, _ = codec.ParseHevcSPS(vt.SequenceHead) vt.nalulenSize = (int(vt.SequenceHead[26]) & 0x03) + 1 - vt.VPS = vps - vt.SPS = sps - vt.PPS = pps } else { vt.Stream.Error("H265 ParseVpsSpsPps Error") vt.Stream.Close() } + return } else { - vt.Video.WriteAVCC(ts, frame) + return vt.Video.WriteAVCC(ts, frame) } } diff --git a/track/video.go b/track/video.go index 2568415..5d7b134 100644 --- a/track/video.go +++ b/track/video.go @@ -101,11 +101,11 @@ func (vt *Video) WriteAnnexB(pts uint32, dts uint32, frame AnnexBFrame) { vt.Flush() } -func (vt *Video) WriteAVCC(ts uint32, frame util.BLL) { +func (vt *Video) WriteAVCC(ts uint32, frame util.BLL) error { r := frame.NewReader() b, err := r.ReadByte() if err != nil { - return + return err } b = b >> 4 vt.Value.IFrame = b == 1 || b == 4 @@ -113,13 +113,14 @@ func (vt *Video) WriteAVCC(ts uint32, frame util.BLL) { vt.Value.WriteAVCC(ts, frame) cts, err := r.ReadBE(3) if err != nil { - return + return err } vt.Value.PTS = (ts + cts) * 90 for nalulen, err := r.ReadBE(vt.nalulenSize); err == nil; nalulen, err = r.ReadBE(vt.nalulenSize) { vt.AppendAuBytes(r.ReadN(int(nalulen))...) } vt.Flush() + return nil } func (vt *Video) WriteSliceByte(b ...byte) { diff --git a/util/amf.go b/util/amf.go index 121e7aa..498f3c3 100644 --- a/util/amf.go +++ b/util/amf.go @@ -83,27 +83,29 @@ type AMF struct { Buffer } -func (amf *AMF) ReadShortString() string { - value, _ := amf.Unmarshal() - return value.(string) -} - -func (amf *AMF) ReadNumber() float64 { - value, _ := amf.Unmarshal() - return value.(float64) -} - -func (amf *AMF) ReadObject() map[string]any { - value, _ := amf.Unmarshal() - if value == nil { - return nil +func ReadAMF[T any](amf *AMF) (result T) { + value, err := amf.Unmarshal() + if err != nil { + return } - return value.(map[string]any) + result, _ = value.(T) + return } -func (amf *AMF) ReadBool() bool { - value, _ := amf.Unmarshal() - return value.(bool) +func (amf *AMF) ReadShortString() (result string) { + return ReadAMF[string](amf) +} + +func (amf *AMF) ReadNumber() (result float64) { + return ReadAMF[float64](amf) +} + +func (amf *AMF) ReadObject() (result map[string]any) { + return ReadAMF[map[string]any](amf) +} + +func (amf *AMF) ReadBool() (result bool) { + return ReadAMF[bool](amf) } func (amf *AMF) readKey() (string, error) { diff --git a/util/pool.go b/util/pool.go index 9b37d8f..965b5f2 100644 --- a/util/pool.go +++ b/util/pool.go @@ -69,6 +69,29 @@ func (r *BLLReader) ReadN(n int) (result net.Buffers) { return } +type BLLsReader struct { + *ListItem[*BLL] + BLLReader +} + +func (r *BLLsReader) CanRead() bool { + return r.ListItem != nil && !r.IsRoot() +} + +func (r *BLLsReader) ReadByte() (b byte, err error) { + if r.BLLReader.CanRead() { + b, err = r.BLLReader.ReadByte() + if err == nil { + return + } + } + r.ListItem = r.Next + if !r.CanRead() { + return 0, io.EOF + } + r.BLLReader = *r.Value.NewReader() + return r.BLLReader.ReadByte() +} type BLLs struct { List[*BLL] ByteLength int @@ -121,6 +144,10 @@ func (list *BLLs) Recycle() { list.ByteLength = 0 } +func (list *BLLs) NewReader() *BLLsReader { + return &BLLsReader{list.Next, *list.Next.Value.NewReader()} +} + // ByteLinkList type BLL struct { List[BLI]