From b77e57b1bbcadb31a8bfd6379ced4d4f53e19f3c Mon Sep 17 00:00:00 2001 From: dexter <178529795@qq.com> Date: Thu, 19 Jan 2023 20:33:20 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=B9=E5=8F=98avcc=E7=B1=BB=E5=9E=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/frame.go | 55 +++++++++++++++----- publisher.go | 10 ++-- track/aac.go | 15 +++--- track/audio.go | 5 +- track/g711.go | 10 ++-- track/h264.go | 8 +-- track/h265.go | 12 ++--- track/video.go | 52 ++++++++++--------- {codec => util}/amf.go | 10 ++-- util/buffer.go | 21 ++++++++ util/pool.go | 113 +++++++++++++++++++++++++++++++++++++++++ 11 files changed, 241 insertions(+), 70 deletions(-) rename {codec => util}/amf.go (98%) create mode 100644 util/pool.go diff --git a/common/frame.go b/common/frame.go index 544f7c8..e39eba8 100644 --- a/common/frame.go +++ b/common/frame.go @@ -10,6 +10,7 @@ import ( ) type NALUSlice net.Buffers + // 裸数据片段 type RawSlice interface { ~[][]byte | ~[]byte @@ -61,8 +62,8 @@ func (nalu *NALUSlice) Append(b ...[]byte) { // return false // } -type AVCCFrame []byte // 一帧AVCC格式的数据 -type AnnexBFrame []byte // 一帧AnnexB格式数据 +type AVCCFrame net.Buffers // 一帧AVCC格式的数据 +type AnnexBFrame []byte // 一帧AnnexB格式数据 type RTPFrame struct { rtp.Packet } @@ -110,16 +111,16 @@ type AVFrame[T RawSlice] struct { canRead bool } -func (av *AVFrame[T]) AppendRaw(raw ...T) { - av.Raw = append(av.Raw, raw...) +func (av *AVFrame[T]) AppendRaw(raw T) { + av.Raw = append(av.Raw, raw) } -func (av *AVFrame[T]) AppendAVCC(avcc ...[]byte) { +func (av *AVFrame[T]) AppendAVCC(avcc AVCCFrame) { av.AVCC = append(av.AVCC, avcc...) } -func (av *AVFrame[T]) AppendRTP(rtp ...*RTPFrame) { - av.RTP = append(av.RTP, rtp...) +func (av *AVFrame[T]) AppendRTP(rtp *RTPFrame) { + av.RTP = append(av.RTP, rtp) } // Clear 清空数据 gc @@ -147,20 +148,50 @@ func (av *AVFrame[T]) Reset() { } func (avcc AVCCFrame) IsIDR() bool { - v := avcc[0] >> 4 + v := avcc[0][0] >> 4 return v == 1 || v == 4 //generated keyframe } func (avcc AVCCFrame) IsSequence() bool { - return avcc[1] == 0 + return avcc[0][1] == 0 } func (avcc AVCCFrame) CTS() uint32 { - return uint32(avcc[2])<<24 | uint32(avcc[3])<<8 | uint32(avcc[4]) + return uint32(avcc[0][2])<<24 | uint32(avcc[0][3])<<8 | uint32(avcc[0][4]) } func (avcc AVCCFrame) VideoCodecID() codec.VideoCodecID { - return codec.VideoCodecID(avcc[0] & 0x0F) + return codec.VideoCodecID(avcc[0][0] & 0x0F) } func (avcc AVCCFrame) AudioCodecID() codec.AudioCodecID { - return codec.AudioCodecID(avcc[0] >> 4) + return codec.AudioCodecID(avcc[0][0] >> 4) +} + +func (avcc *AVCCFrame) ReadByte() (b byte) { + cur := *avcc + b = cur[0][0] + if len(cur[0]) == 1 { + *avcc = cur[1:] + } else { + cur[0] = cur[0][1:] + } + return +} + +func (avcc *AVCCFrame) ReadN(n int) (result net.Buffers) { + require := n + cur := *avcc + for require > 0 && len(cur) > 0 { + firstLen := len(cur[0]) + if firstLen > require { + result = append(result, cur[0][:require]) + cur[0] = cur[0][require:] + return + } else { + result = append(result, cur[0]) + require -= firstLen + cur = cur[1:] + *avcc = cur + } + } + return } // func (annexb AnnexBFrame) ToSlices() (ret []NALUSlice) { diff --git a/publisher.go b/publisher.go index 5123321..f1d7a73 100644 --- a/publisher.go +++ b/publisher.go @@ -93,7 +93,7 @@ func (p *Publisher) WriteAVCCAudio(ts uint32, frame common.AVCCFrame) { a := track.NewAAC(p.Stream) p.AudioTrack = a a.Audio.SampleSize = 16 - a.AVCCHead = []byte{frame[0], 1} + a.AVCCHead = []byte{frame[0][0], 1} a.WriteAVCC(0, frame) case codec.CodecID_PCMA, codec.CodecID_PCMU: @@ -103,13 +103,13 @@ func (p *Publisher) WriteAVCCAudio(ts uint32, frame common.AVCCFrame) { } a := track.NewG711(p.Stream, alaw) p.AudioTrack = a - a.Audio.SampleRate = uint32(codec.SoundRate[(frame[0]&0x0c)>>2]) + a.Audio.SampleRate = uint32(codec.SoundRate[(frame[0][0]&0x0c)>>2]) a.Audio.SampleSize = 16 - if frame[0]&0x02 == 0 { + if frame[0][0]&0x02 == 0 { a.Audio.SampleSize = 8 } - a.Channels = frame[0]&0x01 + 1 - a.AVCCHead = frame[:1] + a.Channels = frame[0][0]&0x01 + 1 + a.AVCCHead = frame[0][:1] p.AudioTrack.WriteAVCC(ts, frame) default: p.Stream.Error("audio codec not support yet", zap.Uint8("codecId", uint8(codecID))) diff --git a/track/aac.go b/track/aac.go index a0d59b5..e1b4c05 100644 --- a/track/aac.go +++ b/track/aac.go @@ -74,20 +74,23 @@ func (aac *AAC) WriteRTPFrame(frame *RTPFrame) { } func (aac *AAC) WriteAVCC(ts uint32, frame AVCCFrame) { - if len(frame) < 4 { - aac.Audio.Stream.Error("AVCC data too short", zap.ByteString("data", frame)) + if l := util.SizeOfBuffers(frame); l < 4 { + aac.Stream.Error("AVCC data too short", zap.Int("len", l)) return } if frame.IsSequence() { - aac.Audio.DecoderConfiguration.AVCC = net.Buffers{frame} - config1, config2 := frame[2], frame[3] + aac.Audio.DecoderConfiguration.AVCC = net.Buffers(frame) + config1, config2 := frame[0][2], frame[0][3] aac.Profile = (config1 & 0xF8) >> 3 aac.Channels = ((config2 >> 3) & 0x0F) //声道 aac.Audio.SampleRate = uint32(codec.SamplingFrequencies[((config1&0x7)<<1)|(config2>>7)]) - aac.Audio.DecoderConfiguration.Raw = frame[2:] + aac.Audio.DecoderConfiguration.Raw = frame[0][2:] aac.Attach() } else { - aac.Value.AppendRaw(frame[2:]) + aac.Value.AppendRaw(frame[0][2:]) + for _, data := range frame[1:] { + aac.Value.AppendRaw(data) + } aac.Audio.WriteAVCC(ts, frame) } } diff --git a/track/audio.go b/track/audio.go index 97be1d3..0764823 100644 --- a/track/audio.go +++ b/track/audio.go @@ -99,10 +99,7 @@ func (av *Audio) WriteAVCC(ts uint32, frame AVCCFrame) { } func (a *Audio) CompleteAVCC(value *AVFrame[[]byte]) { - value.AppendAVCC(a.AVCCHead) - for _, raw := range value.Raw { - value.AppendAVCC(raw) - } + value.AVCC = append(append(value.AVCC, a.AVCCHead), value.Raw...) } func (a *Audio) CompleteRTP(value *AVFrame[[]byte]) { diff --git a/track/g711.go b/track/g711.go index f304283..6f9dd3e 100644 --- a/track/g711.go +++ b/track/g711.go @@ -6,6 +6,7 @@ import ( "go.uber.org/zap" "m7s.live/engine/v4/codec" . "m7s.live/engine/v4/common" + "m7s.live/engine/v4/util" ) func NewG711(stream IStream, alaw bool) (g711 *G711) { @@ -33,11 +34,14 @@ type G711 struct { } func (g711 *G711) WriteAVCC(ts uint32, frame AVCCFrame) { - if len(frame) < 2 { - g711.Stream.Error("AVCC data too short", zap.ByteString("data", frame)) + if l := util.SizeOfBuffers(frame); l < 2 { + g711.Stream.Error("AVCC data too short", zap.Int("len", l)) return } - g711.Value.AppendRaw(frame[1:]) + g711.Value.AppendRaw(frame[0][1:]) + for _, data := range frame[1:] { + g711.Value.AppendRaw(data) + } g711.Audio.WriteAVCC(ts, frame) } diff --git a/track/h264.go b/track/h264.go index 8f57588..9abb213 100644 --- a/track/h264.go +++ b/track/h264.go @@ -65,16 +65,16 @@ func (vt *H264) WriteSliceBytes(slice []byte) { } func (vt *H264) WriteAVCC(ts uint32, frame AVCCFrame) { - if len(frame) < 6 { - vt.Stream.Error("AVCC data too short", zap.ByteString("data", frame)) + if l:=util.SizeOfBuffers(frame);l < 6 { + vt.Stream.Error("AVCC data too short", zap.Int("len", l)) return } if frame.IsSequence() { vt.dcChanged = true vt.Video.DecoderConfiguration.Seq++ - vt.Video.DecoderConfiguration.AVCC = net.Buffers{frame} + vt.Video.DecoderConfiguration.AVCC = net.Buffers(frame) var info codec.AVCDecoderConfigurationRecord - if _, err := info.Unmarshal(frame[5:]); err == nil { + if _, err := info.Unmarshal(frame[0][5:]); err == nil { vt.SPSInfo, _ = codec.ParseSPS(info.SequenceParameterSetNALUnit) vt.nalulenSize = int(info.LengthSizeMinusOne&3 + 1) vt.Video.DecoderConfiguration.Raw[0] = info.SequenceParameterSetNALUnit diff --git a/track/h265.go b/track/h265.go index 85e6e64..8d7b41b 100644 --- a/track/h265.go +++ b/track/h265.go @@ -60,17 +60,17 @@ func (vt *H265) WriteSliceBytes(slice []byte) { } func (vt *H265) WriteAVCC(ts uint32, frame AVCCFrame) { - if len(frame) < 6 { - vt.Stream.Error("AVCC data too short", zap.ByteString("data", frame)) + if l := util.SizeOfBuffers(frame); l < 6 { + vt.Stream.Error("AVCC data too short", zap.Int("len", l)) return } if frame.IsSequence() { vt.Video.dcChanged = true vt.Video.DecoderConfiguration.Seq++ - vt.Video.DecoderConfiguration.AVCC = net.Buffers{frame} - if vps, sps, pps, err := codec.ParseVpsSpsPpsFromSeqHeaderWithoutMalloc(frame); err == nil { - vt.Video.SPSInfo, _ = codec.ParseHevcSPS(frame) - vt.Video.nalulenSize = (int(frame[26]) & 0x03) + 1 + vt.Video.DecoderConfiguration.AVCC = net.Buffers(frame) + if vps, sps, pps, err := codec.ParseVpsSpsPpsFromSeqHeaderWithoutMalloc(frame[0]); err == nil { + vt.Video.SPSInfo, _ = codec.ParseHevcSPS(frame[0]) + vt.Video.nalulenSize = (int(frame[0][26]) & 0x03) + 1 vt.Video.DecoderConfiguration.Raw[0] = vps vt.Video.DecoderConfiguration.Raw[1] = sps vt.Video.DecoderConfiguration.Raw[2] = pps diff --git a/track/video.go b/track/video.go index f8bc62b..5fff597 100644 --- a/track/video.go +++ b/track/video.go @@ -89,10 +89,10 @@ func (vt *Video) PlayFullAnnexB(ctx context.Context, onMedia func(net.Buffers) e func (vt *Video) computeGOP() { vt.idrCount++ if vt.IDRing != nil { - vt.GOP = int(vt.AVRing.RingBuffer.Value.Sequence - vt.IDRing.Value.Sequence) - if l := vt.AVRing.RingBuffer.Size - vt.GOP - 5; l > 5 { - vt.AVRing.RingBuffer.Size -= l - vt.Stream.Debug("resize", zap.Int("before", vt.AVRing.RingBuffer.Size+l), zap.Int("after", vt.AVRing.RingBuffer.Size), zap.String("name", vt.Name)) + vt.GOP = int(vt.Value.Sequence - vt.IDRing.Value.Sequence) + if l := vt.AVRing.Size - vt.GOP - 5; l > 5 { + vt.AVRing.Size -= l + vt.Stream.Debug("resize", zap.Int("before", vt.AVRing.Size+l), zap.Int("after", vt.AVRing.Size), zap.String("name", vt.Name)) //缩小缓冲环节省内存 vt.Unlink(l).Do(func(v AVFrame[NALUSlice]) { if v.IFrame { @@ -102,7 +102,7 @@ func (vt *Video) computeGOP() { }) } } - vt.IDRing = vt.AVRing.RingBuffer.Ring + vt.IDRing = vt.AVRing.Ring } func (vt *Video) writeAnnexBSlice(annexb AnnexBFrame) { @@ -134,19 +134,17 @@ func (vt *Video) WriteAVCC(ts uint32, frame AVCCFrame) { vt.Media.WriteAVCC(ts, frame) vt.Value.DTS = ts * 90 vt.Value.PTS = (ts + frame.CTS()) * 90 - for nalus := frame[5:]; len(nalus) > vt.nalulenSize; { - nalulen := util.ReadBE[int](nalus[:vt.nalulenSize]) + frame.ReadN(5) + for len(frame) > 0 { + nalulen := 0 + for i, n := 0, vt.nalulenSize; i < n; i++ { + nalulen += int(frame.ReadByte()) << ((n - i - 1) << 3) + } if nalulen == 0 { - vt.Stream.Warn("WriteAVCC with nalulen=0", zap.Int("len", len(nalus))) + vt.Stream.Warn("WriteAVCC with nalulen=0") return } - if end := nalulen + vt.nalulenSize; len(nalus) >= end { - vt.WriteRawBytes(nalus[vt.nalulenSize:end]) - nalus = nalus[end:] - } else { - vt.Stream.Error("WriteAVCC", zap.Int("len", len(nalus)), zap.Int("naluLenSize", vt.nalulenSize), zap.Int("end", end)) - break - } + vt.WriteSlice(NALUSlice(frame.ReadN(nalulen))) } vt.Flush() } @@ -155,8 +153,12 @@ func (vt *Video) WriteSliceByte(b ...byte) { vt.WriteSliceBytes(b) } +func (vt *Video) WriteSlice(slice NALUSlice) { + vt.Value.AppendRaw(slice) +} + func (vt *Video) WriteRawBytes(slice []byte) { - if naluSlice := util.MallocSlice(&vt.AVRing.Value.Raw); naluSlice == nil { + if naluSlice := util.MallocSlice(&vt.Value.Raw); naluSlice == nil { vt.Value.AppendRaw(NALUSlice{slice}) } else { naluSlice.Reset(slice) @@ -221,10 +223,10 @@ func (vt *Video) CompleteAVCC(rv *AVFrame[NALUSlice]) { // 写入CTS util.PutBE(b[2:5], (rv.PTS-rv.DTS)/90) lengths := b.Malloc(len(rv.Raw) * 4) //每个slice的长度内存复用 - rv.AppendAVCC(b.SubBuf(0, 5)) + rv.AVCC = append(rv.AVCC, b.SubBuf(0, 5)) for i, nalu := range rv.Raw { - rv.AppendAVCC(util.PutBE(lengths.SubBuf(i*4, 4), util.SizeOfBuffers(nalu))) - rv.AppendAVCC(nalu...) + rv.AVCC = append(rv.AVCC, util.PutBE(lengths.SubBuf(i*4, 4), util.SizeOfBuffers(nalu))) + rv.AVCC = append(rv.AVCC, nalu...) } } @@ -250,17 +252,19 @@ func (vt *Video) Flush() { return } } - // 仅存一枚I帧 - if vt.idrCount == 1 { - // 下一帧为I帧,即将覆盖,需要扩环 - if vt.Next().Value.IFrame { - if vt.AVRing.RingBuffer.Size < 256 { + // 下一帧为I帧,即将覆盖,需要扩环 + if vt.Next().Value.IFrame { + // 仅存一枚I帧 + if vt.idrCount == 1 { + if vt.AVRing.Size < 256 { + vt.Stream.Debug("resize", zap.Int("before", vt.AVRing.Size), zap.Int("after", vt.AVRing.Size+5), zap.String("name", vt.Name)) vt.Link(util.NewRing[AVFrame[NALUSlice]](5)) // 扩大缓冲环 } } else { vt.idrCount-- } } + vt.Media.Flush() vt.dcChanged = false } diff --git a/codec/amf.go b/util/amf.go similarity index 98% rename from codec/amf.go rename to util/amf.go index b4ae9ad..121e7aa 100644 --- a/codec/amf.go +++ b/util/amf.go @@ -1,11 +1,9 @@ -package codec +package util import ( "fmt" "io" "reflect" - - "m7s.live/engine/v4/util" ) // Action Message Format -- AMF 0 @@ -82,7 +80,7 @@ var ( type EcmaArray map[string]any type AMF struct { - util.Buffer + Buffer } func (amf *AMF) ReadShortString() string { @@ -136,7 +134,7 @@ func (amf *AMF) Unmarshal() (obj any, err error) { if !amf.CanRead() { return nil, io.ErrUnexpectedEOF } - defer func(b util.Buffer) { + defer func(b Buffer) { if err != nil { amf.Buffer = b } @@ -239,7 +237,7 @@ func (amf *AMF) Marshal(v any) []byte { amf.WriteString(vv) case float64, uint, float32, int, int16, int32, int64, uint16, uint32, uint64, uint8, int8: amf.WriteByte(AMF0_NUMBER) - amf.WriteFloat64(util.ToFloat64(vv)) + amf.WriteFloat64(ToFloat64(vv)) case bool: amf.WriteByte(AMF0_BOOLEAN) if vv { diff --git a/util/buffer.go b/util/buffer.go index bb66ee5..26a14db 100644 --- a/util/buffer.go +++ b/util/buffer.go @@ -3,6 +3,7 @@ package util import ( "encoding/binary" "math" + "net" ) type Buffer []byte @@ -83,15 +84,35 @@ func (b *Buffer) Malloc(count int) Buffer { } return b.SubBuf(l, count) } + func (b *Buffer) Reset() { *b = b.SubBuf(0, 0) } + func (b *Buffer) Glow(n int) { l := b.Len() b.Malloc(n) *b = b.SubBuf(0, l) } +func (b *Buffer) Split(n int) (result net.Buffers) { + origin := *b + for { + if b.CanReadN(n) { + result = append(result, b.ReadN(n)) + } else { + result = append(result, *b) + *b = origin + return + } + } +} + +func (b *Buffer) MarshalAMFs(v ...any) { + amf := AMF{*b} + *b = amf.Marshals(v...) +} + // MallocSlice 用来对容量够的slice进行长度扩展+1,并返回新的位置的指针,用于写入 func MallocSlice[T any](slice *[]T) *T { oslice := *slice diff --git a/util/pool.go b/util/pool.go new file mode 100644 index 0000000..27dea41 --- /dev/null +++ b/util/pool.go @@ -0,0 +1,113 @@ +package util + +import "net" + +type BytesLinkList struct { + Head *BytesLinkItem + Tail *BytesLinkItem + Length int + ByteLength int +} + +func (list *BytesLinkList) Push(item *BytesLinkItem) { + if list == nil { + return + } + if list.Head == nil { + list.Head = item + list.Tail = item + list.Length = 1 + list.ByteLength = item.Len() + return + } + list.Tail.Next = item + list.Tail = item + list.Length++ + list.ByteLength += item.Len() +} + +func (list *BytesLinkList) Shift() (item *BytesLinkItem) { + if list.Head == nil { + return nil + } + item = list.Head + list.Head = list.Head.Next + list.Length-- + list.ByteLength -= item.Len() + return +} + +func (list *BytesLinkList) ToBuffers() (result net.Buffers) { + for p := list.Head; p != nil; p = p.Next { + result = append(result, p.Bytes) + } + return +} + +// 全部回收掉 +func (list *BytesLinkList) Recycle() { + for p := list.Head; p != nil; p = p.Next { + p.Pool.Push(p) + } + list.Head = nil + list.Tail = nil + list.Length = 0 + list.ByteLength = 0 +} + +type BytesLinkItem struct { + Next *BytesLinkItem + Bytes []byte + Pool *BytesLinkList +} + +func (b *BytesLinkItem) Len() int { + return len(b.Bytes) +} + +func (b *BytesLinkItem) Recycle() { + b.Pool.Push(b) +} + +func (b *BytesLinkItem) ToBuffers() (result net.Buffers) { + for p := b; p != nil; p = p.Next { + result = append(result, p.Bytes) + } + return +} + +type BytesPool []BytesLinkList + +// 获取来自真实内存的切片的——假内存块,即只回收外壳 +func (p BytesPool) GetFake() (item *BytesLinkItem) { + if p[0].Length > 0 { + return p[0].Shift() + } else { + return &BytesLinkItem{ + Pool: &p[0], + } + } +} + +func (p BytesPool) Get(size int) (item *BytesLinkItem) { + for i := 1; i < len(p); i++ { + level := 1 << i + if level >= size { + if p[i].Length > 0 { + item = p[i].Shift() + item.Bytes = item.Bytes[:size] + } else { + item = &BytesLinkItem{ + Bytes: make([]byte, size, level), + Pool: &p[i], + } + } + } + } + if item == nil { + item = &BytesLinkItem{ + Bytes: make([]byte, size), + } + } + return +}