From f2847be29f10c9905f6218a0cf090cf862e3fe4d Mon Sep 17 00:00:00 2001 From: dexter <178529795@qq.com> Date: Wed, 1 Feb 2023 10:42:50 +0800 Subject: [PATCH] =?UTF-8?q?rtp=E6=94=B9=E6=88=90=E9=93=BE=E8=A1=A8?= =?UTF-8?q?=E5=BD=A2=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/frame.go | 14 +++------- subscriber.go | 34 ++++++++++++++--------- track/base.go | 15 ++++++----- track/h264.go | 2 +- track/h265.go | 2 +- track/rtp.go | 21 +++++---------- track/video.go | 29 +++++++------------- util/list.go | 72 ++++++++++++++++++++++++++++++++++++++++++++----- util/pool.go | 10 +++++++ 9 files changed, 128 insertions(+), 71 deletions(-) diff --git a/common/frame.go b/common/frame.go index b6f4b86..da15626 100644 --- a/common/frame.go +++ b/common/frame.go @@ -53,9 +53,9 @@ type AVFrame struct { IFrame bool PTS uint32 DTS uint32 - AVCC util.BLL `json:"-"` // 打包好的AVCC格式(MPEG-4格式、Byte-Stream Format) - RTP []*RTPFrame `json:"-"` - AUList util.BLLs `json:"-"` // 裸数据 + AVCC util.BLL `json:"-"` // 打包好的AVCC格式(MPEG-4格式、Byte-Stream Format) + RTP util.List[RTPFrame] `json:"-"` + AUList util.BLLs `json:"-"` // 裸数据 mem util.BLL CanRead bool `json:"-"` } @@ -79,15 +79,9 @@ func (av *AVFrame) AppendMem(item *util.ListItem[util.Buffer]) { av.mem.Push(item) } -func (av *AVFrame) AppendRTP(rtp *RTPFrame) { - av.RTP = append(av.RTP, rtp) -} - // Reset 重置数据,复用内存 func (av *AVFrame) Reset() { - if av.RTP != nil { - av.RTP = av.RTP[:0] - } + av.RTP.Recycle() av.mem.Recycle() av.AVCC.Recycle() av.AUList.Recycle() diff --git a/subscriber.go b/subscriber.go index 2a2ae15..799c6d4 100644 --- a/subscriber.go +++ b/subscriber.go @@ -192,22 +192,22 @@ func (s *Subscriber) PlayBlock(subType byte) { case SUBTYPE_RTP: var videoSeq, audioSeq uint16 sendVideoFrame = func(frame *AVFrame) { - for _, p := range frame.RTP { + frame.RTP.Range(func(vp RTPFrame) bool { videoSeq++ - vp := *p vp.Header.Timestamp = vp.Header.Timestamp - s.VideoReader.SkipTs*90 vp.Header.SequenceNumber = videoSeq spesic.OnEvent((VideoRTP)(vp)) - } + return true + }) } sendAudioFrame = func(frame *AVFrame) { - for _, p := range frame.RTP { + frame.RTP.Range(func(ap RTPFrame) bool { audioSeq++ - vp := *p - vp.Header.SequenceNumber = audioSeq - vp.Header.Timestamp = vp.Header.Timestamp - s.AudioReader.SkipTs*90 - spesic.OnEvent((AudioRTP)(vp)) - } + ap.Header.SequenceNumber = audioSeq + ap.Header.Timestamp = ap.Header.Timestamp - s.AudioReader.SkipTs*90 + spesic.OnEvent((AudioRTP)(ap)) + return true + }) } case SUBTYPE_FLV: flvHeadCache := make([]byte, 15) //内存复用 @@ -258,7 +258,9 @@ func (s *Subscriber) PlayBlock(subType byte) { hasVideo, hasAudio := s.VideoReader.Track != nil && s.Config.SubVideo, s.AudioReader.Track != nil && s.Config.SubAudio if hasVideo { if videoFrame != nil { - sendVideoFrame(videoFrame) + if videoFrame.CanRead { + sendVideoFrame(videoFrame) + } videoFrame = nil } for ctx.Err() == nil { @@ -270,7 +272,9 @@ func (s *Subscriber) PlayBlock(subType byte) { } if audioFrame != nil { if frame.AbsTime > audioFrame.AbsTime { - sendAudioFrame(audioFrame) + if audioFrame.CanRead { + sendAudioFrame(audioFrame) + } audioFrame = nil } } @@ -293,7 +297,9 @@ func (s *Subscriber) PlayBlock(subType byte) { // 正常模式下或者纯音频模式下,音频开始播放 if hasAudio { if audioFrame != nil { - sendAudioFrame(audioFrame) + if audioFrame.CanRead { + sendAudioFrame(audioFrame) + } audioFrame = nil } for ctx.Err() == nil { @@ -315,7 +321,9 @@ func (s *Subscriber) PlayBlock(subType byte) { } if videoFrame != nil { if frame.AbsTime > videoFrame.AbsTime { - sendVideoFrame(videoFrame) + if videoFrame.CanRead { + sendVideoFrame(videoFrame) + } videoFrame = nil } } diff --git a/track/base.go b/track/base.go index 4c038fc..380740d 100644 --- a/track/base.go +++ b/track/base.go @@ -4,7 +4,6 @@ import ( "time" "unsafe" - "go.uber.org/zap" . "m7s.live/engine/v4/common" "m7s.live/engine/v4/config" "m7s.live/engine/v4/util" @@ -65,8 +64,8 @@ func (p *IDRingList) AddIDR(IDRing *util.Ring[AVFrame]) { } func (p *IDRingList) ShiftIDR() { - p.HistoryRing = p.Next.Next.Value p.Shift() + p.HistoryRing = p.Next.Value } // Media 基础媒体Track类 @@ -78,7 +77,8 @@ type Media struct { SSRC uint32 PayloadType byte BytesPool util.BytesPool `json:"-"` - SequenceHead []byte `json:"-"` //H264(SPS、PPS) H265(VPS、SPS、PPS) AAC(config) + rtpPool util.Pool[RTPFrame] + SequenceHead []byte `json:"-"` //H264(SPS、PPS) H265(VPS、SPS、PPS) AAC(config) SequenceHeadSeq int RTPMuxer RTPDemuxer @@ -163,7 +163,7 @@ func (av *Media) AppendAuBytes(b ...[]byte) { func (av *Media) narrow(gop int) { if l := av.Size - gop - 5; l > 5 { - av.Stream.Debug("resize", zap.Int("before", av.Size), zap.Int("after", av.Size-l), zap.String("name", av.Name)) + // av.Stream.Debug("resize", zap.Int("before", av.Size), zap.Int("after", av.Size-l), zap.String("name", av.Name)) //缩小缓冲环节省内存 av.Reduce(l).Do(func(v AVFrame) { v.Reset() @@ -174,6 +174,9 @@ func (av *Media) narrow(gop int) { func (av *Media) AddIDR() { if av.Stream.GetPublisherConfig().BufferTime > 0 { av.IDRingList.AddIDR(av.Ring) + if av.HistoryRing == nil { + av.HistoryRing = av.IDRing + } } else { av.IDRing = av.Ring } @@ -189,14 +192,14 @@ func (av *Media) Flush() { // 下一帧为订阅起始帧,即将覆盖,需要扩环 if nextValue == av.IDRing || nextValue == av.HistoryRing { // if av.AVRing.Size < 512 { - av.Stream.Debug("resize", zap.Int("before", av.Size), zap.Int("after", av.Size+5), zap.String("name", av.Name)) + // av.Stream.Debug("resize", zap.Int("before", av.Size), zap.Int("after", av.Size+5), zap.String("name", av.Name)) av.Glow(5) // } else { // av.Stream.Error("sub ring overflow", zap.Int("size", av.AVRing.Size), zap.String("name", av.Name)) // } } // 补完RTP - if config.Global.EnableRTP && len(curValue.RTP) == 0 { + if config.Global.EnableRTP && curValue.RTP.Length == 0 { av.CompleteRTP(curValue) } // 补完AVCC diff --git a/track/h264.go b/track/h264.go index 5ef5f3b..c6d11a6 100644 --- a/track/h264.go +++ b/track/h264.go @@ -128,7 +128,7 @@ func (vt *H264) WriteRTPFrame(frame *RTPFrame) { // RTP格式补完 func (vt *H264) CompleteRTP(value *AVFrame) { - if len(value.RTP) > 0 { + if value.RTP.Length > 0 { if !vt.dcChanged && value.IFrame { vt.insertDCRtp() } diff --git a/track/h265.go b/track/h265.go index 04cb050..b230d46 100644 --- a/track/h265.go +++ b/track/h265.go @@ -120,7 +120,7 @@ func (vt *H265) WriteRTPFrame(frame *RTPFrame) { // RTP格式补完 func (vt *H265) CompleteRTP(value *AVFrame) { - if len(value.RTP) > 0 { + if value.RTP.Length > 0 { if !vt.dcChanged && value.IFrame { vt.insertDCRtp() } diff --git a/track/rtp.go b/track/rtp.go index 2cbe7f5..76ae010 100644 --- a/track/rtp.go +++ b/track/rtp.go @@ -33,7 +33,7 @@ func (av *Media) UnmarshalRTP(raw []byte) (frame *RTPFrame) { } func (av *Media) writeRTPFrame(frame *RTPFrame) { - av.Value.AppendRTP(frame) + av.Value.RTP.PushValue(*frame) av.WriteRTPFrame(frame) if frame.Marker { av.SpesificTrack.generateTimestamp(frame.Timestamp) @@ -59,31 +59,24 @@ func (av *Media) WriteRTP(raw []byte) { // Packetize packetizes the payload of an RTP packet and returns one or more RTP packets func (av *Media) PacketizeRTP(payloads ...[][]byte) { packetCount := len(payloads) - if cap(av.Value.RTP) < packetCount { - av.Value.RTP = make([]*RTPFrame, packetCount) - } else { - av.Value.RTP = av.Value.RTP[:packetCount] - } for i, pp := range payloads { av.rtpSequence++ - packet := av.Value.RTP[i] - if packet == nil { - packet = &RTPFrame{} - av.Value.RTP[i] = packet + rtpItem := av.rtpPool.Get() + packet := &rtpItem.Value + if packet.Payload == nil { + packet.Payload = make([]byte, 0, RTPMTU) packet.Version = 2 packet.PayloadType = av.PayloadType packet.SSRC = av.SSRC } - item := av.BytesPool.Get(RTPMTU) - av.Value.AppendMem(item) - packet.Payload = item.Value[:0] - // packet.Payload = packet.Payload[:0] + packet.Payload = packet.Payload[:0] packet.SequenceNumber = av.rtpSequence packet.Timestamp = av.Value.PTS packet.Marker = i == packetCount-1 for _, p := range pp { packet.Payload = append(packet.Payload, p...) } + av.Value.RTP.Push(rtpItem) } } diff --git a/track/video.go b/track/video.go index 188160e..7377beb 100644 --- a/track/video.go +++ b/track/video.go @@ -71,7 +71,7 @@ func (vt *Video) computeGOP() { vt.GOP = int(vt.Value.Sequence - vt.IDRing.Value.Sequence) vt.narrow(vt.GOP) } - vt.IDRing = vt.Ring + vt.AddIDR() // var n int // for i := 0; i < len(vt.BytesPool); i++ { // n += vt.BytesPool[i].Length @@ -149,33 +149,24 @@ func (vt *Video) WriteSliceByte(b ...byte) { // 在I帧前面插入sps pps webrtc需要 func (av *Video) insertDCRtp() { - seq := av.Value.RTP[0].SequenceNumber - l1, l2 := len(av.ParamaterSets), len(av.Value.RTP) - afterLen := l1 + l2 - if cap(av.Value.RTP) < afterLen { - rtps := make([]*RTPFrame, l1, afterLen) - av.Value.RTP = append(rtps, av.Value.RTP...) - } else { - av.Value.RTP = av.Value.RTP[:afterLen] - copy(av.Value.RTP[l1:], av.Value.RTP[:l2]) - } - for i, nalu := range av.ParamaterSets { - packet := &RTPFrame{} + head := av.Value.RTP.Next + seq := head.Value.SequenceNumber + for _, nalu := range av.ParamaterSets { + var packet RTPFrame packet.Version = 2 packet.PayloadType = av.PayloadType packet.Payload = nalu packet.SSRC = av.SSRC - packet.SequenceNumber = seq packet.Timestamp = av.Value.PTS packet.Marker = false - seq++ + head.InsertBeforeValue(packet) av.rtpSequence++ - av.Value.RTP[i] = packet } - for i := l1; i < afterLen; i++ { - av.Value.RTP[i].SequenceNumber = seq + av.Value.RTP.RangeItem(func(item *util.ListItem[RTPFrame]) bool { + item.Value.SequenceNumber = seq seq++ - } + return true + }) } func (av *Video) generateTimestamp(ts uint32) { diff --git a/util/list.go b/util/list.go index 084057e..767a779 100644 --- a/util/list.go +++ b/util/list.go @@ -15,6 +15,40 @@ type ListItem[T any] struct { list *List[T] } +func (item *ListItem[T]) InsertBefore(insert *ListItem[T]) { + if insert.list != nil { + panic("item already in list") + } + insert.list = item.list + insert.Pre = item.Pre + insert.Next = item + item.Pre.Next = insert + item.Pre = insert + item.list.Length++ +} +func (item *ListItem[T]) InsertBeforeValue(value T) (result *ListItem[T]) { + result = &ListItem[T]{Value: value} + item.InsertBefore(result) + return +} +func (item *ListItem[T]) InsertAfter(insert *ListItem[T]) { + if insert.list != nil { + panic("item already in list") + } + insert.list = item.list + insert.Next = item.Next + insert.Pre = item + item.Next.Pre = insert + item.Next = insert + item.list.Length++ +} + +func (item *ListItem[T]) InsertAfterValue(value T) (result *ListItem[T]) { + result = &ListItem[T]{Value: value} + item.InsertAfter(result) + return +} + func (item *ListItem[T]) IsRoot() bool { return &item.list.ListItem == item } @@ -53,13 +87,37 @@ func (p *List[T]) Push(item *ListItem[T]) { p.Pre = &p.ListItem p.ListItem.list = p } - item.list = p - item.Next = &p.ListItem - item.Pre = p.Pre - // p.Value = item.Value - p.Pre.Next = item - p.Pre = p.Pre.Next - p.Length++ + p.Pre.InsertAfter(item) + // item.list = p + // item.Next = &p.ListItem + // item.Pre = p.Pre + // // p.Value = item.Value + // p.Pre.Next = item + // p.Pre = p.Pre.Next + // p.Length++ +} + +func (p *List[T]) UnshiftValue(value T) { + p.Unshift(&ListItem[T]{Value: value}) +} + +func (p *List[T]) Unshift(item *ListItem[T]) { + if item.list != nil { + panic("item already in list") + } + if p.Length == 0 { + p.Next = &p.ListItem + p.Pre = &p.ListItem + p.ListItem.list = p + } + p.Next.InsertBefore(item) + // item.list = p + // item.Next = p.Next + // item.Pre = &p.ListItem + // // p.Value = item.Value + // p.Next.Pre = item + // p.Next = p.Next.Pre + // p.Length++ } func (p *List[T]) ShiftValue() T { diff --git a/util/pool.go b/util/pool.go index 3cba27e..6989288 100644 --- a/util/pool.go +++ b/util/pool.go @@ -278,3 +278,13 @@ func (p BytesPool) Get(size int) (item *ListItem[Buffer]) { } return } + +type Pool[T any] List[T] + +func (p *Pool[T]) Get() (item *ListItem[T]) { + item = (*List[T])(p).PoolShift() + if item == nil { + item = &ListItem[T]{} + } + return +}