乱序重排和B帧处理

This commit is contained in:
dexter
2022-02-12 12:44:09 +08:00
parent 717f2ae77d
commit b6df875a98
9 changed files with 202 additions and 117 deletions

View File

@@ -1,4 +1,4 @@
package engine package common
// DTSEstimator is a DTS estimator. // DTSEstimator is a DTS estimator.
type DTSEstimator struct { type DTSEstimator struct {

View File

@@ -119,7 +119,7 @@ type AVFrame[T RawSlice] struct {
DTS uint32 DTS uint32
FLV net.Buffers // 打包好的FLV Tag FLV net.Buffers // 打包好的FLV Tag
AVCC net.Buffers // 打包好的AVCC格式 AVCC net.Buffers // 打包好的AVCC格式
RTP []RTPFrame RTP []*RTPFrame
Raw []T //裸数据 Raw []T //裸数据
canRead bool canRead bool
} }
@@ -134,7 +134,7 @@ func (av *AVFrame[T]) FillFLV(t byte, ts uint32) {
func (av *AVFrame[T]) AppendAVCC(avcc ...[]byte) { func (av *AVFrame[T]) AppendAVCC(avcc ...[]byte) {
av.AVCC = append(av.AVCC, avcc...) av.AVCC = append(av.AVCC, avcc...)
} }
func (av *AVFrame[T]) AppendRTP(rtp ...RTPFrame) { func (av *AVFrame[T]) AppendRTP(rtp ...*RTPFrame) {
av.RTP = append(av.RTP, rtp...) av.RTP = append(av.RTP, rtp...)
} }

View File

@@ -127,13 +127,7 @@ func (r *Subscriber) WaitVideoTrack(names ...string) *track.Video {
if t := <-r.Stream.WaitTrack(names...); t == nil { if t := <-r.Stream.WaitTrack(names...); t == nil {
return nil return nil
} else { } else {
switch vt := t.(type) { return t.(*track.Video)
case *track.H264:
return (*track.Video)(vt)
case *track.H265:
return (*track.Video)(vt)
}
return nil
} }
} }
@@ -147,12 +141,6 @@ func (r *Subscriber) WaitAudioTrack(names ...string) *track.Audio {
if t := <-r.Stream.WaitTrack(names...); t == nil { if t := <-r.Stream.WaitTrack(names...); t == nil {
return nil return nil
} else { } else {
switch at := t.(type) { return t.(*track.Audio)
case *track.AAC:
return (*track.Audio)(at)
case *track.G711:
return (*track.Audio)(at)
}
return nil
} }
} }

View File

@@ -18,23 +18,27 @@ func NewAAC(stream IStream) (aac *AAC) {
aac.Init(stream, 32) aac.Init(stream, 32)
aac.Poll = time.Millisecond * 20 aac.Poll = time.Millisecond * 20
aac.DecoderConfiguration.PayloadType = 97 aac.DecoderConfiguration.PayloadType = 97
if config.Global.RTPReorder {
aac.orderQueue = make([]*RTPFrame, 20)
}
return return
} }
type AAC Audio type AAC struct {
Audio
}
func (aac *AAC) WriteRTP(raw []byte) { func (aac *AAC) WriteRTP(raw []byte) {
var packet RTPFrame for frame := aac.UnmarshalRTP(raw); frame != nil; frame = aac.nextRTPFrame() {
if frame := packet.Unmarshal(raw); frame == nil { for _, payload := range codec.ParseRTPAAC(frame.Payload) {
return
}
for _, payload := range codec.ParseRTPAAC(packet.Payload) {
aac.WriteSlice(payload) aac.WriteSlice(payload)
} }
aac.Value.AppendRTP(packet) aac.Value.AppendRTP(frame)
if packet.Marker { if frame.Marker {
aac.generateTimestamp()
aac.Flush() aac.Flush()
} }
}
} }
func (aac *AAC) WriteAVCC(ts uint32, frame AVCCFrame) { func (aac *AAC) WriteAVCC(ts uint32, frame AVCCFrame) {
@@ -51,7 +55,7 @@ func (aac *AAC) WriteAVCC(ts uint32, frame AVCCFrame) {
aac.DecoderConfiguration.Raw = AudioSlice(frame[2:]) aac.DecoderConfiguration.Raw = AudioSlice(frame[2:])
aac.DecoderConfiguration.FLV = net.Buffers{adcflv1, frame, adcflv2} aac.DecoderConfiguration.FLV = net.Buffers{adcflv1, frame, adcflv2}
} else { } else {
(*Audio)(aac).WriteAVCC(ts, frame) aac.Audio.WriteAVCC(ts, frame)
aac.Flush() aac.Flush()
} }
} }
@@ -73,5 +77,5 @@ func (aac *AAC) Flush() {
} }
aac.PacketizeRTP(o) aac.PacketizeRTP(o)
} }
(*Audio)(aac).Flush() aac.Audio.Flush()
} }

View File

@@ -102,7 +102,7 @@ func (at *UnknowAudio) WriteAVCC(ts uint32, frame AVCCFrame) {
a.SampleSize = 16 a.SampleSize = 16
a.avccHead = []byte{frame[0], 1} a.avccHead = []byte{frame[0], 1}
a.WriteAVCC(0, frame) a.WriteAVCC(0, frame)
a.Stream.AddTrack(a) a.Stream.AddTrack(&a.Audio)
case codec.CodecID_PCMA, case codec.CodecID_PCMA,
codec.CodecID_PCMU: codec.CodecID_PCMU:
alaw := true alaw := true
@@ -118,7 +118,7 @@ func (at *UnknowAudio) WriteAVCC(ts uint32, frame AVCCFrame) {
} }
a.Channels = frame[0]&0x01 + 1 a.Channels = frame[0]&0x01 + 1
a.avccHead = frame[:1] a.avccHead = frame[:1]
a.Stream.AddTrack(a) a.Stream.AddTrack(&a.Audio)
} }
} else { } else {
at.Know.WriteAVCC(ts, frame) at.Know.WriteAVCC(ts, frame)

View File

@@ -2,6 +2,7 @@ package track
import ( import (
. "github.com/Monibuca/engine/v4/common" . "github.com/Monibuca/engine/v4/common"
"github.com/Monibuca/engine/v4/config"
"github.com/pion/rtp" "github.com/pion/rtp"
) )
@@ -30,8 +31,83 @@ type Media[T RawSlice] struct {
SampleSize byte SampleSize byte
DecoderConfiguration DecoderConfiguration[T] `json:"-"` //H264(SPS、PPS) H265(VPS、SPS、PPS) AAC(config) DecoderConfiguration DecoderConfiguration[T] `json:"-"` //H264(SPS、PPS) H265(VPS、SPS、PPS) AAC(config)
// util.BytesPool //无锁内存池,用于发布者(在同一个协程中)复用小块的内存,通常是解包时需要临时使用 // util.BytesPool //无锁内存池,用于发布者(在同一个协程中)复用小块的内存,通常是解包时需要临时使用
lastAvccTS uint32 //上一个avcc帧的时间戳 rtpSequence uint16 //用于生成下一个rtp包的序号
rtpSequence uint16 orderQueue []*RTPFrame //rtp包的缓存队列用于乱序重排
lastSeq uint16 //上一个收到的序号,用于乱序重排
lastSeq2 uint16 //记录上上一个收到的序列号
}
// 获取缓存中下一个rtpFrame
func (av *Media[T]) nextRTPFrame() (frame *RTPFrame) {
if config.Global.RTPReorder {
frame = av.orderQueue[0]
av.lastSeq++
copy(av.orderQueue, av.orderQueue[1:])
}
return
}
// generateTimestamp stamps the current frame from the RTP timestamp of the
// first packet collected for it; PTS and DTS are set to the same value.
func (av *Media[T]) generateTimestamp() {
	av.Value.PTS = av.Value.RTP[0].Timestamp
	av.Value.DTS = av.Value.PTS
}
func (av *Media[T]) UnmarshalRTP(raw []byte) (frame *RTPFrame) {
if frame = new(RTPFrame); frame.Unmarshal(raw) == nil {
return
}
if config.Global.RTPReorder {
if frame.SequenceNumber < av.lastSeq {
// 出现旧的包直接丢弃
return nil
} else if av.lastSeq == 0 {
// 初始化
av.lastSeq = frame.SequenceNumber
return
} else if av.lastSeq+1 == frame.SequenceNumber {
// 正常顺序
av.lastSeq = frame.SequenceNumber
copy(av.orderQueue, av.orderQueue[1:])
return
} else if frame.SequenceNumber > av.lastSeq {
delta := int(frame.SequenceNumber - av.lastSeq)
queueLen := len(av.orderQueue)
// 超过缓存队列长度,TODO: 可能会丢弃正确的包
if queueLen < delta {
for {
av.lastSeq++
delta = int(frame.SequenceNumber - av.lastSeq)
copy(av.orderQueue, av.orderQueue[1:])
// 可以放得进去了
if delta == queueLen-1 {
av.orderQueue[queueLen-1] = frame
frame, av.orderQueue[0] = av.orderQueue[0], nil
return frame
}
}
}
// 出现后面的包先到达,缓存起来
av.orderQueue[delta-1] = frame
return nil
} else {
return nil
}
} else {
if av.lastSeq == 0 {
av.lastSeq = frame.SequenceNumber
} else if frame.SequenceNumber == av.lastSeq2+1 { // 本次序号是上上次的序号+1 说明中间隔了一个错误序号某些rtsp流中的rtcp包写成了rtp包导致的
av.lastSeq = frame.SequenceNumber
} else {
av.lastSeq2 = av.lastSeq
av.lastSeq = frame.SequenceNumber
if av.lastSeq != av.lastSeq2+1 { //序号不连续
av.Stream.Warnln("RTP SequenceNumber error", av.lastSeq2, av.lastSeq)
return
}
}
return
}
} }
func (av *Media[T]) WriteSlice(slice T) { func (av *Media[T]) WriteSlice(slice T) {
@@ -39,11 +115,6 @@ func (av *Media[T]) WriteSlice(slice T) {
} }
func (av *Media[T]) WriteAVCC(ts uint32, frame AVCCFrame) { func (av *Media[T]) WriteAVCC(ts uint32, frame AVCCFrame) {
if av.lastAvccTS == 0 {
av.lastAvccTS = ts
} else {
av.Value.DeltaTime = ts - av.lastAvccTS
}
cts := frame.CTS() cts := frame.CTS()
av.Value.BytesIn = len(frame) av.Value.BytesIn = len(frame)
av.Value.AppendAVCC(frame) av.Value.AppendAVCC(frame)
@@ -53,6 +124,9 @@ func (av *Media[T]) WriteAVCC(ts uint32, frame AVCCFrame) {
} }
func (av *Media[T]) Flush() { func (av *Media[T]) Flush() {
if av.Prev().Value.DTS != 0 {
av.Value.DeltaTime = (av.Value.DTS - av.Prev().Value.DTS) / 90
}
av.Base.Flush(&av.Value.BaseFrame) av.Base.Flush(&av.Value.BaseFrame)
av.Step() av.Step()
} }
@@ -61,7 +135,7 @@ func (av *Media[T]) Flush() {
func (av *Media[T]) PacketizeRTP(payloads ...[]byte) { func (av *Media[T]) PacketizeRTP(payloads ...[]byte) {
for i, pp := range payloads { for i, pp := range payloads {
av.rtpSequence++ av.rtpSequence++
var frame = RTPFrame{Packet: rtp.Packet{ var frame = &RTPFrame{Packet: rtp.Packet{
Header: rtp.Header{ Header: rtp.Header{
Version: 2, Version: 2,
Padding: false, Padding: false,
@@ -69,7 +143,7 @@ func (av *Media[T]) PacketizeRTP(payloads ...[]byte) {
Marker: i == len(payloads)-1, Marker: i == len(payloads)-1,
PayloadType: av.DecoderConfiguration.PayloadType, PayloadType: av.DecoderConfiguration.PayloadType,
SequenceNumber: av.rtpSequence, SequenceNumber: av.rtpSequence,
Timestamp: av.Value.DTS, // Figure out how to do timestamps Timestamp: av.Value.PTS, // Figure out how to do timestamps
SSRC: av.Stream.SSRC(), SSRC: av.Stream.SSRC(),
}, },
Payload: pp, Payload: pp,

View File

@@ -5,6 +5,7 @@ import (
"github.com/Monibuca/engine/v4/codec" "github.com/Monibuca/engine/v4/codec"
. "github.com/Monibuca/engine/v4/common" . "github.com/Monibuca/engine/v4/common"
"github.com/Monibuca/engine/v4/config"
) )
func NewG711(stream IStream, alaw bool) (g711 *G711) { func NewG711(stream IStream, alaw bool) (g711 *G711) {
@@ -18,25 +19,29 @@ func NewG711(stream IStream, alaw bool) (g711 *G711) {
g711.Init(stream, 32) g711.Init(stream, 32)
g711.Poll = time.Millisecond * 20 g711.Poll = time.Millisecond * 20
g711.DecoderConfiguration.PayloadType = 97 g711.DecoderConfiguration.PayloadType = 97
if config.Global.RTPReorder {
g711.orderQueue = make([]*RTPFrame, 20)
}
return return
} }
type G711 Audio type G711 struct {
Audio
}
func (g711 *G711) WriteAVCC(ts uint32, frame AVCCFrame) { func (g711 *G711) WriteAVCC(ts uint32, frame AVCCFrame) {
g711.WriteSlice(AudioSlice(frame[1:])) g711.WriteSlice(AudioSlice(frame[1:]))
(*Audio)(g711).WriteAVCC(ts, frame) g711.Audio.WriteAVCC(ts, frame)
g711.Flush() g711.Flush()
} }
func (g711 *G711) WriteRTP(raw []byte) { func (g711 *G711) WriteRTP(raw []byte) {
var packet RTPFrame for frame := g711.UnmarshalRTP(raw); frame != nil; frame = g711.nextRTPFrame() {
if frame := packet.Unmarshal(raw); frame == nil { g711.WriteSlice(frame.Payload)
return g711.Value.AppendRTP(frame)
} if frame.Marker {
g711.WriteSlice(packet.Payload) g711.generateTimestamp()
g711.Value.AppendRTP(packet)
if packet.Marker {
g711.Flush() g711.Flush()
} }
}
} }

View File

@@ -10,7 +10,10 @@ import (
"github.com/Monibuca/engine/v4/util" "github.com/Monibuca/engine/v4/util"
) )
type H264 Video type H264 struct {
Video
dtsEst *DTSEstimator
}
func NewH264(stream IStream) (vt *H264) { func NewH264(stream IStream) (vt *H264) {
vt = &H264{} vt = &H264{}
@@ -21,10 +24,14 @@ func NewH264(stream IStream) (vt *H264) {
vt.Init(stream, 256) vt.Init(stream, 256)
vt.Poll = time.Millisecond * 20 vt.Poll = time.Millisecond * 20
vt.DecoderConfiguration.PayloadType = 96 vt.DecoderConfiguration.PayloadType = 96
if config.Global.RTPReorder {
vt.orderQueue = make([]*RTPFrame, 20)
}
vt.dtsEst = NewDTSEstimator()
return return
} }
func (vt *H264) WriteAnnexB(pts uint32, dts uint32, frame AnnexBFrame) { func (vt *H264) WriteAnnexB(pts uint32, dts uint32, frame AnnexBFrame) {
(*Video)(vt).WriteAnnexB(pts, dts, frame) vt.Video.WriteAnnexB(pts, dts, frame)
vt.Flush() vt.Flush()
} }
func (vt *H264) WriteSlice(slice NALUSlice) { func (vt *H264) WriteSlice(slice NALUSlice) {
@@ -66,17 +73,14 @@ func (vt *H264) WriteAVCC(ts uint32, frame AVCCFrame) {
} }
vt.DecoderConfiguration.FLV = codec.VideoAVCC2FLV(net.Buffers(vt.DecoderConfiguration.AVCC), 0) vt.DecoderConfiguration.FLV = codec.VideoAVCC2FLV(net.Buffers(vt.DecoderConfiguration.AVCC), 0)
} else { } else {
(*Video)(vt).WriteAVCC(ts, frame) vt.Video.WriteAVCC(ts, frame)
vt.Value.IFrame = frame.IsIDR() vt.Value.IFrame = frame.IsIDR()
vt.Flush() vt.Flush()
} }
} }
func (vt *H264) WriteRTP(raw []byte) { func (vt *H264) WriteRTP(raw []byte) {
var frame RTPFrame for frame := vt.UnmarshalRTP(raw); frame != nil; frame = vt.nextRTPFrame() {
if packet := frame.Unmarshal(raw); packet == nil {
return
}
if naluType := frame.H264Type(); naluType < 24 { if naluType := frame.H264Type(); naluType < 24 {
vt.WriteSlice(NALUSlice{frame.Payload}) vt.WriteSlice(NALUSlice{frame.Payload})
} else { } else {
@@ -99,16 +103,19 @@ func (vt *H264) WriteRTP(raw []byte) {
} }
vt.Value.AppendRTP(frame) vt.Value.AppendRTP(frame)
if frame.Marker { if frame.Marker {
vt.Value.PTS = frame.Timestamp
vt.Value.DTS = vt.dtsEst.Feed(frame.Timestamp)
vt.Flush() vt.Flush()
} }
}
} }
func (vt *H264) Flush() { func (vt *H264) Flush() {
if vt.Value.IFrame { if vt.Value.IFrame {
if vt.IDRing == nil { if vt.IDRing == nil {
defer vt.Stream.AddTrack(vt) defer vt.Stream.AddTrack(&vt.Video)
} }
(*Video)(vt).ComputeGOP() vt.Video.ComputeGOP()
} }
// RTP格式补完 // RTP格式补完
if vt.Value.RTP == nil && config.Global.EnableRTP { if vt.Value.RTP == nil && config.Global.EnableRTP {
@@ -142,5 +149,5 @@ func (vt *H264) Flush() {
} }
vt.PacketizeRTP(out...) vt.PacketizeRTP(out...)
} }
(*Video)(vt).Flush() vt.Video.Flush()
} }

View File

@@ -10,7 +10,9 @@ import (
"github.com/Monibuca/engine/v4/util" "github.com/Monibuca/engine/v4/util"
) )
type H265 Video type H265 struct {
Video
}
func NewH265(stream IStream) (vt *H265) { func NewH265(stream IStream) (vt *H265) {
vt = &H265{} vt = &H265{}
@@ -21,10 +23,13 @@ func NewH265(stream IStream) (vt *H265) {
vt.Init(stream, 256) vt.Init(stream, 256)
vt.Poll = time.Millisecond * 20 vt.Poll = time.Millisecond * 20
vt.DecoderConfiguration.PayloadType = 96 vt.DecoderConfiguration.PayloadType = 96
if config.Global.RTPReorder {
vt.orderQueue = make([]*RTPFrame, 20)
}
return return
} }
func (vt *H265) WriteAnnexB(pts uint32, dts uint32, frame AnnexBFrame) { func (vt *H265) WriteAnnexB(pts uint32, dts uint32, frame AnnexBFrame) {
(*Video)(vt).WriteAnnexB(pts, dts, frame) vt.Video.WriteAnnexB(pts, dts, frame)
vt.Flush() vt.Flush()
} }
func (vt *H265) WriteSlice(slice NALUSlice) { func (vt *H265) WriteSlice(slice NALUSlice) {
@@ -64,16 +69,13 @@ func (vt *H265) WriteAVCC(ts uint32, frame AVCCFrame) {
} }
vt.DecoderConfiguration.FLV = codec.VideoAVCC2FLV(net.Buffers(vt.DecoderConfiguration.AVCC), 0) vt.DecoderConfiguration.FLV = codec.VideoAVCC2FLV(net.Buffers(vt.DecoderConfiguration.AVCC), 0)
} else { } else {
(*Video)(vt).WriteAVCC(ts, frame) vt.Video.WriteAVCC(ts, frame)
vt.Value.IFrame = frame.IsIDR() vt.Value.IFrame = frame.IsIDR()
vt.Flush() vt.Flush()
} }
} }
func (vt *H265) WriteRTP(raw []byte) { func (vt *H265) WriteRTP(raw []byte) {
var frame RTPFrame for frame := vt.UnmarshalRTP(raw); frame != nil; frame = vt.nextRTPFrame() {
if packet := frame.Unmarshal(raw); packet == nil {
return
}
// TODO: DONL may need to be parsed if `sprop-max-don-diff` is greater than 0 on the RTP stream. // TODO: DONL may need to be parsed if `sprop-max-don-diff` is greater than 0 on the RTP stream.
var usingDonlField bool var usingDonlField bool
var buffer = util.Buffer(frame.Payload) var buffer = util.Buffer(frame.Payload)
@@ -105,13 +107,18 @@ func (vt *H265) WriteRTP(raw []byte) {
vt.WriteSlice(vt.Value.Raw[lastIndex]) vt.WriteSlice(vt.Value.Raw[lastIndex])
} }
} }
if frame.Marker {
vt.generateTimestamp()
vt.Flush()
}
}
} }
func (vt *H265) Flush() { func (vt *H265) Flush() {
if vt.Value.IFrame { if vt.Value.IFrame {
if vt.IDRing == nil { if vt.IDRing == nil {
defer vt.Stream.AddTrack(vt) defer vt.Stream.AddTrack(&vt.Video)
} }
(*Video)(vt).ComputeGOP() vt.Video.ComputeGOP()
} }
// RTP格式补完 // RTP格式补完
if vt.Value.RTP == nil && config.Global.EnableRTP { if vt.Value.RTP == nil && config.Global.EnableRTP {
@@ -145,5 +152,5 @@ func (vt *H265) Flush() {
} }
vt.PacketizeRTP(out...) vt.PacketizeRTP(out...)
} }
(*Video)(vt).Flush() vt.Video.Flush()
} }