diff --git a/track/aac.go b/track/aac.go index 1536cb6..b20b50e 100644 --- a/track/aac.go +++ b/track/aac.go @@ -1,11 +1,9 @@ package track import ( - "fmt" "io" "net" - "github.com/bluenviron/mediacommon/pkg/bits" "go.uber.org/zap" "m7s.live/engine/v4/codec" . "m7s.live/engine/v4/common" @@ -18,9 +16,9 @@ func NewAAC(puber IPuber, stuff ...any) (aac *AAC) { aac = &AAC{ Mode: 2, } - aac.SizeLength = 13 - aac.IndexLength = 3 - aac.IndexDeltaLength = 3 + aac.AACDecoder.SizeLength = 13 + aac.AACDecoder.IndexLength = 3 + aac.AACDecoder.IndexDeltaLength = 3 aac.CodecID = codec.CodecID_AAC aac.Channels = 2 aac.SampleSize = 16 @@ -34,7 +32,6 @@ func NewAAC(puber IPuber, stuff ...any) (aac *AAC) { type AAC struct { Audio - Mode int // 1为lbr,2为hbr fragments *util.BLL // 用于处理不完整的AU,缺少的字节数 } @@ -72,97 +69,16 @@ func (aac *AAC) WriteADTS(ts uint32, b util.IBytes) { func (aac *AAC) WriteRTPFrame(rtpItem *LIRTP) { aac.Value.RTP.Push(rtpItem) frame := &rtpItem.Value - if len(frame.Payload) < 2 { - // aac.fragments = aac.fragments[:0] + au, err := aac.AACDecoder.Decode(frame.Packet) + if err != nil { + aac.Error("decode error", zap.Error(err)) return } - if aac.SampleRate != 90000 { - aac.generateTimestamp(uint32(uint64(frame.Timestamp) * 90000 / uint64(aac.SampleRate))) - } - auHeaderLen := util.ReadBE[int](frame.Payload[:2]) //通常为16,即一个AU Header的长度 - if auHeaderLen == 0 { - aac.Value.AUList.Push(aac.BytesPool.GetShell(frame.Payload[:2])) - aac.Flush() - } else { - payload := frame.Payload[2:] - // AU-headers - dataLens, err := aac.readAUHeaders(payload, auHeaderLen) - if err != nil { - // discard pending fragmented packets - return - } - - pos := (auHeaderLen >> 3) - if (auHeaderLen % 8) != 0 { - pos++ - } - payload = payload[pos:] - - if aac.fragments == nil { - if frame.Header.Marker { - // AUs - for _, dataLen := range dataLens { - if len(payload) < int(dataLen) { - aac.fragments = &util.BLL{} - aac.fragments.Push(aac.BytesPool.GetShell(payload)) - // aac.fragments = aac.fragments[:0] - // aac.Error("payload is too short 1", zap.Int("dataLen", int(dataLen)), zap.Int("len", len(payload))) - return - } - aac.AppendAuBytes(payload[:dataLen]) - payload = payload[dataLen:] - } - } else { - if len(dataLens) != 1 { - // aac.fragments = aac.fragments[:0] - aac.Error("a fragmented packet can only contain one AU") - return - } - aac.fragments = &util.BLL{} - // if len(payload) < int(dataLens[0]) { - // aac.fragments = aac.fragments[:0] - // aac.Error("payload is too short 2", zap.Int("dataLen", int(dataLens[0])), zap.Int("len", len(payload))) - // return - // } - aac.fragments.Push(aac.BytesPool.GetShell(payload)) - // aac.fragments = append(aac.fragments, payload[:dataLens[0]]) - return - } - } else { - // we are decoding a fragmented AU - if len(dataLens) != 1 { - aac.fragments.Recycle() - aac.fragments = nil - // aac.fragments = aac.fragments[:0] - aac.Error("a fragmented packet can only contain one AU") - return - } - - // if len(payload) < int(dataLens[0]) { - // aac.fragments = aac.fragments[:0] - // aac.Error("payload is too short 3", zap.Int("dataLen", int(dataLens[0])), zap.Int("len", len(payload))) - // return - // } - - // if fragmentedSize := util.SizeOfBuffers(aac.fragments) + int(dataLens[0]); fragmentedSize > 5*1024 { - // aac.fragments = aac.fragments[:0] // discard pending fragmented packets - // aac.Error(fmt.Sprintf("AU size (%d) is too big (maximum is %d)", fragmentedSize, 5*1024)) - // return - // } - - // aac.fragments = append(aac.fragments, payload[:dataLens[0]]) - aac.fragments.Push(aac.BytesPool.GetShell(payload)) - if !frame.Header.Marker { - return - } - if uint64(aac.fragments.ByteLength) != dataLens[0] { - aac.Error("fragmented AU size is not correct", zap.Uint64("dataLen", dataLens[0]), zap.Int("len", aac.fragments.ByteLength)) - } - aac.Value.AUList.PushValue(aac.fragments) - // aac.AppendAuBytes(aac.fragments...) - - aac.fragments = nil + if len(au) > 0 { + if aac.SampleRate != 90000 { + aac.generateTimestamp(uint32(uint64(frame.Timestamp) * 90000 / uint64(aac.SampleRate))) } + aac.AppendAuBytes(au...) aac.Flush() } } @@ -205,62 +121,3 @@ func (aac *AAC) CompleteRTP(value *AVFrame) { } aac.PacketizeRTP(packets...) } - -func (aac *AAC) readAUHeaders(buf []byte, headersLen int) ([]uint64, error) { - firstRead := false - - count := 0 - for i := 0; i < headersLen; { - if i == 0 { - i += aac.SizeLength - i += aac.IndexLength - } else { - i += aac.SizeLength - i += aac.IndexDeltaLength - } - count++ - } - - dataLens := make([]uint64, count) - - pos := 0 - i := 0 - - for headersLen > 0 { - dataLen, err := bits.ReadBits(buf, &pos, aac.SizeLength) - if err != nil { - return nil, err - } - headersLen -= aac.SizeLength - - if !firstRead { - firstRead = true - if aac.IndexLength > 0 { - auIndex, err := bits.ReadBits(buf, &pos, aac.IndexLength) - if err != nil { - return nil, err - } - headersLen -= aac.IndexLength - - if auIndex != 0 { - return nil, fmt.Errorf("AU-index different than zero is not supported") - } - } - } else if aac.IndexDeltaLength > 0 { - auIndexDelta, err := bits.ReadBits(buf, &pos, aac.IndexDeltaLength) - if err != nil { - return nil, err - } - headersLen -= aac.IndexDeltaLength - - if auIndexDelta != 0 { - return nil, fmt.Errorf("AU-index-delta different than zero is not supported") - } - } - - dataLens[i] = dataLen - i++ - } - - return dataLens, nil -} diff --git a/track/audio.go b/track/audio.go index 68545ab..b2010b5 100644 --- a/track/audio.go +++ b/track/audio.go @@ -1,6 +1,7 @@ package track import ( + "github.com/bluenviron/gortsplib/v4/pkg/format/rtpmpeg4audio" "go.uber.org/zap" "m7s.live/engine/v4/codec" . "m7s.live/engine/v4/common" @@ -9,14 +10,12 @@ import ( type Audio struct { Media - CodecID codec.AudioCodecID - Channels byte - SampleSize byte - SizeLength int // 通常为13 - IndexLength int - IndexDeltaLength int - AVCCHead []byte // 音频包在AVCC格式中,AAC会有两个字节,其他的只有一个字节 + CodecID codec.AudioCodecID + Channels byte + SampleSize byte + AVCCHead []byte // 音频包在AVCC格式中,AAC会有两个字节,其他的只有一个字节 codec.AudioSpecificConfig + AACDecoder rtpmpeg4audio.Decoder } func (a *Audio) Attach() { diff --git a/util/ring-writer.go b/util/ring-writer.go index fae4806..b65e551 100644 --- a/util/ring-writer.go +++ b/util/ring-writer.go @@ -81,7 +81,6 @@ func (rb *RingWriter[T, F]) Reduce(size int) { p := rb.Unlink(size) pSize := size rb.Size -= size - defer rb.recycle(p) for i := 0; i < size; i++ { if p.Value.StartWrite() { p.Value.Reset() @@ -90,6 +89,7 @@ func (rb *RingWriter[T, F]) Reduce(size int) { } else { p.Value.Reset() if pSize == 1 { + // last one,无法删除最后一个节点,直接返回即可 return } p = p.Prev() @@ -98,6 +98,7 @@ func (rb *RingWriter[T, F]) Reduce(size int) { } p = p.Next() } + rb.recycle(p) } func (rb *RingWriter[T, F]) Dispose() {