🐛 FIX: 乱序重排需要克隆数据

This commit is contained in:
dexter
2022-10-22 22:20:05 +08:00
parent 2b2815e213
commit e7fc23e1dc
3 changed files with 60 additions and 46 deletions

View File

@@ -79,6 +79,10 @@ type RTPFrame struct {
rtp.Packet rtp.Packet
} }
func (rtp *RTPFrame) Clone() *RTPFrame {
return &RTPFrame{*rtp.Packet.Clone()}
}
func (rtp *RTPFrame) H264Type() (naluType codec.H264NALUType) { func (rtp *RTPFrame) H264Type() (naluType codec.H264NALUType) {
return naluType.Parse(rtp.Payload[0]) return naluType.Parse(rtp.Payload[0])
} }
@@ -169,6 +173,7 @@ func (avcc AVCCFrame) AudioCodecID() codec.AudioCodecID {
// } // }
// return // return
// } // }
//
// func (annexb AnnexBFrame) ToNALUs() (ret [][]NALUSlice) { // func (annexb AnnexBFrame) ToNALUs() (ret [][]NALUSlice) {
// for len(annexb) > 0 { // for len(annexb) > 0 {
// before, after, found := bytes.Cut(annexb, codec.NALU_Delimiter1) // before, after, found := bytes.Cut(annexb, codec.NALU_Delimiter1)

View File

@@ -50,7 +50,7 @@ type Media[T RawSlice] struct {
rtpSequence uint16 //用于生成下一个rtp包的序号 rtpSequence uint16 //用于生成下一个rtp包的序号
lastSeq uint16 //上一个收到的序号,用于乱序重排 lastSeq uint16 //上一个收到的序号,用于乱序重排
lastSeq2 uint16 //记录上上一个收到的序列号 lastSeq2 uint16 //记录上上一个收到的序列号
乱序重排 util.RTPReorder[RTPFrame] 乱序重排 util.RTPReorder[*RTPFrame]
流速控制 流速控制
} }

View File

@@ -1,21 +1,28 @@
package util package util
// RTPReorder RTP包乱序重排 type CloneType[T any] interface {
type RTPReorder[T any] struct { Clone() T
lastSeq uint16 //最新收到的rtp包序号 comparable
queue []*T // 缓存队列,0号元素位置代表lastReq+1永远保持为空
} }
func (p *RTPReorder[T]) Push(seq uint16, v *T) *T { const queueLen = 20
// RTPReorder RTP包乱序重排
type RTPReorder[T CloneType[T]] struct {
lastSeq uint16 //最新收到的rtp包序号
queue []T // 缓存队列,0号元素位置代表lastReq+1永远保持为空
}
func (p *RTPReorder[T]) Push(seq uint16, v T) (result T) {
// 初始化 // 初始化
if len(p.queue) == 0 { if len(p.queue) == 0 {
p.lastSeq = seq p.lastSeq = seq
p.queue = make([]*T, 20) p.queue = make([]T, 20)
return v return v
} }
if seq < p.lastSeq && p.lastSeq-seq < 0x8000 { if seq < p.lastSeq && p.lastSeq-seq < 0x8000 {
// 旧的包直接丢弃 // 旧的包直接丢弃
return nil return
} }
delta := seq - p.lastSeq delta := seq - p.lastSeq
if delta == 1 { if delta == 1 {
@@ -26,7 +33,6 @@ func (p *RTPReorder[T]) Push(seq uint16, v *T) *T {
} }
if seq > p.lastSeq { if seq > p.lastSeq {
//delta必然大于1 //delta必然大于1
queueLen := uint16(len(p.queue))
if queueLen < delta { if queueLen < delta {
//超过缓存最大范围,无法挽回,只能造成丢包(序号断裂) //超过缓存最大范围,无法挽回,只能造成丢包(序号断裂)
for { for {
@@ -35,31 +41,34 @@ func (p *RTPReorder[T]) Push(seq uint16, v *T) *T {
p.pop() p.pop()
// 可以放得进去了 // 可以放得进去了
if delta == queueLen-1 { if delta == queueLen-1 {
p.queue[queueLen-1] = v p.queue[queueLen-1] = v.Clone()
v = p.queue[0] v = p.queue[0]
p.queue[0] = nil p.queue[0] = result
return v return v
} }
} }
} else { } else {
// 出现后面的包先到达,缓存起来 // 出现后面的包先到达,缓存起来
p.queue[delta-1] = v p.queue[delta-1] = v.Clone()
return nil return
} }
} }
return nil return
} }
func (p *RTPReorder[T]) pop() { func (p *RTPReorder[T]) pop() (result T) {
copy(p.queue, p.queue[1:]) //整体数据向前移动一位保持0号元素代表lastSeq+1 copy(p.queue, p.queue[1:]) //整体数据向前移动一位保持0号元素代表lastSeq+1
p.queue[queueLen-1] = result
return p.queue[0]
} }
// Pop 从缓存中取出一个包需要连续调用直到返回nil // Pop 从缓存中取出一个包需要连续调用直到返回nil
func (p *RTPReorder[T]) Pop() (next *T) { func (p *RTPReorder[T]) Pop() (result T) {
if len(p.queue) == 0 { if len(p.queue) == 0 {
return return
} }
if next = p.queue[0]; next != nil { if next := p.queue[0]; next != result {
result = next
p.lastSeq++ p.lastSeq++
p.pop() p.pop()
} }