From e7fc23e1dc4fc806114120d8b931be7e89ea8e56 Mon Sep 17 00:00:00 2001 From: dexter <178529795@qq.com> Date: Sat, 22 Oct 2022 22:20:05 +0800 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20FIX:=20=E4=B9=B1=E5=BA=8F?= =?UTF-8?q?=E9=87=8D=E6=8E=92=E9=9C=80=E8=A6=81=E5=85=8B=E9=9A=86=E6=95=B0?= =?UTF-8?q?=E6=8D=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/frame.go | 63 ++++++++++++++++++++++++++----------------------- track/base.go | 2 +- util/reorder.go | 41 +++++++++++++++++++------------- 3 files changed, 60 insertions(+), 46 deletions(-) diff --git a/common/frame.go b/common/frame.go index 40e129d..9a629a6 100644 --- a/common/frame.go +++ b/common/frame.go @@ -27,9 +27,9 @@ type RawSlice interface { ~[][]byte | ~[]byte } -// func (nalu *H264NALU) Append(slice ...NALUSlice) { -// *nalu = append(*nalu, slice...) -// } +// func (nalu *H264NALU) Append(slice ...NALUSlice) { +// *nalu = append(*nalu, slice...) +// } func (nalu NALUSlice) H264Type() (naluType codec.H264NALUType) { return naluType.Parse(nalu[0][0]) } @@ -79,6 +79,10 @@ type RTPFrame struct { rtp.Packet } +func (rtp *RTPFrame) Clone() *RTPFrame { + return &RTPFrame{*rtp.Packet.Clone()} +} + func (rtp *RTPFrame) H264Type() (naluType codec.H264NALUType) { return naluType.Parse(rtp.Payload[0]) } @@ -156,32 +160,33 @@ func (avcc AVCCFrame) AudioCodecID() codec.AudioCodecID { return codec.AudioCodecID(avcc[0] >> 4) } -// func (annexb AnnexBFrame) ToSlices() (ret []NALUSlice) { -// for len(annexb) > 0 { -// before, after, found := bytes.Cut(annexb, codec.NALU_Delimiter1) -// if !found { -// return append(ret, NALUSlice{annexb}) -// } -// if len(before) > 0 { -// ret = append(ret, NALUSlice{before}) -// } -// annexb = after -// } -// return -// } -// func (annexb AnnexBFrame) ToNALUs() (ret [][]NALUSlice) { -// for len(annexb) > 0 { -// before, after, found := bytes.Cut(annexb, codec.NALU_Delimiter1) -// if !found { -// return append(ret, annexb.ToSlices()) -// } -// if len(before) > 0 { -// ret = append(ret, AnnexBFrame(before).ToSlices()) -// } -// annexb = after -// } -// return -// } +// func (annexb AnnexBFrame) ToSlices() (ret []NALUSlice) { +// for len(annexb) > 0 { +// before, after, found := bytes.Cut(annexb, codec.NALU_Delimiter1) +// if !found { +// return append(ret, NALUSlice{annexb}) +// } +// if len(before) > 0 { +// ret = append(ret, NALUSlice{before}) +// } +// annexb = after +// } +// return +// } +// +// func (annexb AnnexBFrame) ToNALUs() (ret [][]NALUSlice) { +// for len(annexb) > 0 { +// before, after, found := bytes.Cut(annexb, codec.NALU_Delimiter1) +// if !found { +// return append(ret, annexb.ToSlices()) +// } +// if len(before) > 0 { +// ret = append(ret, AnnexBFrame(before).ToSlices()) +// } +// annexb = after +// } +// return +// } type DecoderConfiguration[T RawSlice] struct { PayloadType byte AVCC net.Buffers diff --git a/track/base.go b/track/base.go index 125b43b..40e3f00 100644 --- a/track/base.go +++ b/track/base.go @@ -50,7 +50,7 @@ type Media[T RawSlice] struct { rtpSequence uint16 //用于生成下一个rtp包的序号 lastSeq uint16 //上一个收到的序号,用于乱序重排 lastSeq2 uint16 //记录上上一个收到的序列号 - 乱序重排 util.RTPReorder[RTPFrame] + 乱序重排 util.RTPReorder[*RTPFrame] 流速控制 } diff --git a/util/reorder.go b/util/reorder.go index a80c9b7..19d4cfe 100644 --- a/util/reorder.go +++ b/util/reorder.go @@ -1,21 +1,28 @@ package util -// RTPReorder RTP包乱序重排 -type RTPReorder[T any] struct { - lastSeq uint16 //最新收到的rtp包序号 - queue []*T // 缓存队列,0号元素位置代表lastReq+1,永远保持为空 +type CloneType[T any] interface { + Clone() T + comparable } -func (p *RTPReorder[T]) Push(seq uint16, v *T) *T { +const queueLen = 20 + +// RTPReorder RTP包乱序重排 +type RTPReorder[T CloneType[T]] struct { + lastSeq uint16 //最新收到的rtp包序号 + queue []T // 缓存队列,0号元素位置代表lastReq+1,永远保持为空 +} + +func (p *RTPReorder[T]) Push(seq uint16, v T) (result T) { // 初始化 if len(p.queue) == 0 { p.lastSeq = seq - p.queue = make([]*T, 20) + p.queue = make([]T, 20) return v } if seq < p.lastSeq && p.lastSeq-seq < 0x8000 { // 旧的包直接丢弃 - return nil + return } delta := seq - p.lastSeq if delta == 1 { @@ -26,7 +33,6 @@ func (p *RTPReorder[T]) Push(seq uint16, v *T) *T { } if seq > p.lastSeq { //delta必然大于1 - queueLen := uint16(len(p.queue)) if queueLen < delta { //超过缓存最大范围,无法挽回,只能造成丢包(序号断裂) for { @@ -35,31 +41,34 @@ func (p *RTPReorder[T]) Push(seq uint16, v *T) *T { p.pop() // 可以放得进去了 if delta == queueLen-1 { - p.queue[queueLen-1] = v + p.queue[queueLen-1] = v.Clone() v = p.queue[0] - p.queue[0] = nil + p.queue[0] = result return v } } } else { // 出现后面的包先到达,缓存起来 - p.queue[delta-1] = v - return nil + p.queue[delta-1] = v.Clone() + return } } - return nil + return } -func (p *RTPReorder[T]) pop() { +func (p *RTPReorder[T]) pop() (result T) { copy(p.queue, p.queue[1:]) //整体数据向前移动一位,保持0号元素代表lastSeq+1 + p.queue[queueLen-1] = result + return p.queue[0] } // Pop 从缓存中取出一个包,需要连续调用直到返回nil -func (p *RTPReorder[T]) Pop() (next *T) { +func (p *RTPReorder[T]) Pop() (result T) { if len(p.queue) == 0 { return } - if next = p.queue[0]; next != nil { + if next := p.queue[0]; next != result { + result = next p.lastSeq++ p.pop() }