diff --git a/audio_track.go b/audio_track.go index 31b3503..f94c5ba 100644 --- a/audio_track.go +++ b/audio_track.go @@ -24,9 +24,12 @@ type AudioTrack struct { } func (at *AudioTrack) pushByteStream(ts uint32, payload []byte) { + if len(payload) == 0 { + return + } switch at.CodecID = payload[0] >> 4; at.CodecID { case codec.CodecID_AAC: - if payload[1] != 0 { + if len(payload) < 4 || payload[1] != 0 { return } else { config1, config2 := payload[2], payload[3] diff --git a/dtsestimator.go b/dtsestimator.go new file mode 100644 index 0000000..94c35bc --- /dev/null +++ b/dtsestimator.go @@ -0,0 +1,52 @@ +package engine + +// DTSEstimator is a DTS estimator. +type DTSEstimator struct { + prevDTS uint32 + prevPTS uint32 + prevPrevPTS uint32 + dts func(uint32) uint32 +} + +func (d *DTSEstimator) _dts(pts uint32) uint32 { + // P or I frame + if pts > d.prevPTS { + // previous frame was B + // use the DTS of the previous frame + if d.prevPTS < d.prevPrevPTS { + return d.prevPTS + } + + // previous frame was P or I + // use two frames ago plus a small quantity + // to avoid non-monotonous DTS with B-frames + return d.prevPrevPTS + 1 + } + + // B Frame + // increase by a small quantity + return d.prevDTS + 1 +} + +// NewDTSEstimator allocates a DTSEstimator. +func NewDTSEstimator() *DTSEstimator { + result := &DTSEstimator{} + result.dts = func(pts uint32) uint32 { + if pts > 0 { + result.dts = result._dts + } + return pts + } + return result +} + +// Feed provides PTS to the estimator, and returns the estimated DTS. +func (d *DTSEstimator) Feed(pts uint32) uint32 { + dts := d.dts(pts) + + d.prevPrevPTS = d.prevPTS + d.prevPTS = pts + d.prevDTS = dts + + return dts +} diff --git a/rtp.go b/rtp.go new file mode 100644 index 0000000..082d010 --- /dev/null +++ b/rtp.go @@ -0,0 +1,57 @@ +package engine + +import ( + "time" + + "github.com/Monibuca/utils/v3" + "github.com/pion/rtp" +) + +// 对rtp包进行解封装,并修复时间戳,包括时间戳跳跃 +type RTPDemuxer struct { + rtp.Packet + Reorder bool // 是否支持乱序重排 + PTS uint32 // 修复后的时间戳(毫秒) + lastTs uint32 // 记录上一个收到的时间戳 + lastSeq uint16 // 记录上一个收到的序列号 + lastSeq2 uint16 // 记录上上一个收到的序列号 + timeBase uint64 // 采样率 + timestamp time.Time // 客观时间用于计算耗时 + OnDemux func(uint32, []byte) +} + +func (r *RTPDemuxer) Push(rtpRaw []byte) { + if err := r.Unmarshal(rtpRaw); err != nil { + utils.Println("RTP Unmarshal error", err) + return + } + // 本次序号是上上次的序号+1 说明中间隔了一个错误序号(某些rtsp流中的rtcp包写成了rtp包导致的) + if r.SequenceNumber == r.lastSeq2+1 { + r.lastSeq = r.SequenceNumber + } else { + r.lastSeq2 = r.lastSeq + r.lastSeq = r.SequenceNumber + if r.lastSeq != r.lastSeq2+1 { //序号不连续 + utils.Println("RTP SequenceNumber error", r.lastSeq2, r.lastSeq) + return + } + } + if r.Timestamp > r.lastTs { + delta := uint32(uint64(r.Timestamp-r.lastTs) * 1000 / r.timeBase) + if delta > 1000 { // 时间戳跳跃 + r.PTS += uint32(time.Since(r.timestamp) / time.Millisecond) + } else { + r.PTS += delta + } + } else if r.lastTs > r.Timestamp { + delta := uint32(uint64(r.lastTs-r.Timestamp) * 1000 / r.timeBase) + if delta > 1000 { // 时间戳跳跃 + r.PTS += uint32(time.Since(r.timestamp) / time.Millisecond) + } else { + r.PTS -= delta + } + } + r.timestamp = time.Now() + r.OnDemux(r.PTS, r.Payload) + r.lastTs = r.Timestamp +} diff --git a/rtp_audio.go b/rtp_audio.go index 04708fd..5e42978 100644 --- a/rtp_audio.go +++ b/rtp_audio.go @@ -1,21 +1,14 @@ package engine import ( + "time" + + "github.com/Monibuca/utils/v3" "github.com/Monibuca/utils/v3/codec" - "github.com/pion/rtp" ) -type RTPPublisher struct { - rtp.Packet `json:"-"` - lastTs uint32 - absTs uint32 - lastSeq uint16 - ts uint32 //毫秒单位的时间戳 - demux func() -} - type RTPAudio struct { - RTPPublisher + RTPDemuxer `json:"-"` *AudioTrack } @@ -23,48 +16,28 @@ func (s *Stream) NewRTPAudio(codec byte) (r *RTPAudio) { r = &RTPAudio{ AudioTrack: s.NewAudioTrack(codec), } - r.demux = r.push + r.OnDemux = r.push return } -func (v *RTPAudio) push() { +// 该函数只执行一次 +func (v *RTPAudio) push(ts uint32, payload []byte) { switch v.CodecID { case codec.CodecID_AAC: - v.demux = func() { - for _, payload := range codec.ParseRTPAAC(v.Payload) { - v.PushRaw(v.ts, payload) + v.OnDemux = func(ts uint32, payload []byte) { + for _, payload := range codec.ParseRTPAAC(payload) { + v.PushRaw(ts, payload) } } case codec.CodecID_PCMA, codec.CodecID_PCMU: - v.demux = func() { - v.PushRaw(v.ts, v.Payload) + v.OnDemux = func(ts uint32, payload []byte) { + v.PushRaw(ts, payload) } + default: + utils.Println("RTP Publisher: Unsupported codec", v.CodecID) + return // TODO } - v.demux() -} - -func (p *RTPAudio) Push(payload []byte) { - if p.Unmarshal(payload) == nil { - if p.lastTs != 0 { - if p.SequenceNumber != p.lastSeq+1 { - println("RTP Publisher: SequenceNumber error", p.lastSeq, p.SequenceNumber) - return - } else { - // if p.lastTs > p.Timestamp { - // if p.lastTs-p.Timestamp > 100000 { - // p.absTs += (p.Timestamp) - // } else { //B frame - // p.absTs -= (p.lastTs - p.Timestamp) - // } - // } else { - // p.absTs += (p.Timestamp - p.lastTs) - // } - p.absTs += (p.Timestamp - p.lastTs) - p.ts = uint32(uint64(p.absTs) * 1000 / uint64(p.SoundRate)) - } - } - p.lastTs = p.Timestamp - p.lastSeq = p.SequenceNumber - p.demux() - } + v.timeBase = uint64(v.AudioTrack.SoundRate) + v.timestamp = time.Now() + v.OnDemux(ts, payload) } diff --git a/rtp_video.go b/rtp_video.go index 103af52..a756575 100644 --- a/rtp_video.go +++ b/rtp_video.go @@ -2,7 +2,6 @@ package engine import ( "bytes" - "container/heap" "encoding/binary" "github.com/Monibuca/utils/v3" @@ -33,7 +32,7 @@ type RTPNalu struct { } type RTPVideo struct { - RTPPublisher + RTPDemuxer `json:"-"` *VideoTrack fuaBuffer *bytes.Buffer demuxNalu func([]byte) *RTPNalu @@ -43,13 +42,14 @@ func (s *Stream) NewRTPVideo(codecID byte) (r *RTPVideo) { r = &RTPVideo{ VideoTrack: s.NewVideoTrack(codecID), } + r.timeBase = 90000 switch codecID { case codec.CodecID_H264: r.demuxNalu = r.demuxH264 case codec.CodecID_H265: r.demuxNalu = r.demuxH265 } - r.demux = r._demux + r.OnDemux = r._demux return } @@ -69,7 +69,7 @@ func (v *RTPVideo) demuxH264(payload []byte) (result *RTPNalu) { utils.Printf("STAP-A declared size(%d) is larger then buffer(%d)", naluSize, naluLen-currOffset) return } - *current = &RTPNalu{Payload: payload[currOffset : currOffset+naluSize], PTS: v.Timestamp} + *current = &RTPNalu{Payload: payload[currOffset : currOffset+naluSize], PTS: v.PTS} current = &(*current).Next } case codec.NALU_MTAP16, codec.NALU_MTAP24: @@ -85,7 +85,7 @@ func (v *RTPVideo) demuxH264(payload []byte) (result *RTPNalu) { if lenSize == 5 { ts = (ts << 8) | uint16(payload[currOffset+5]) } - *current = &RTPNalu{Payload: payload[currOffset : currOffset+naluSize], PTS: v.Timestamp + uint32(ts)} + *current = &RTPNalu{Payload: payload[currOffset : currOffset+naluSize], PTS: v.PTS + uint32(ts)} current = &(*current).Next } /* @@ -133,12 +133,12 @@ func (v *RTPVideo) demuxH264(payload []byte) (result *RTPNalu) { } if v.fuaBuffer != nil { if v.fuaBuffer.Write(payload[lenSize:]); payload[1]&fuaEndBitmask != 0 { - result = &RTPNalu{Payload: v.fuaBuffer.Bytes(), PTS: v.Timestamp} + result = &RTPNalu{Payload: v.fuaBuffer.Bytes(), PTS: v.PTS} v.fuaBuffer = nil } } default: - return &RTPNalu{Payload: payload, PTS: v.Timestamp} + return &RTPNalu{Payload: payload, PTS: v.PTS} } return } @@ -266,61 +266,65 @@ func (p *RTPVideo) demuxH265(payload []byte) (result *RTPNalu) { return } -func (p *RTPVideo) _demux() { - if last := p.demuxNalu(p.Payload); last != nil { - p.demux = func() { - if current := p.demuxNalu(p.Payload); current != nil { - if last.PTS > current.PTS { //有B帧 - var b B - utils.Println("rtp has B-frame!!") - for heap.Push(&b, last); last.Next != nil; last = last.Next { - heap.Push(&b, last.Next) - } - for heap.Push(&b, current); current.Next != nil; current = current.Next { - heap.Push(&b, current.Next) - } - p.demux = func() { - if current := p.demuxNalu(p.Payload); current != nil { - if current.PTS > b.MaxTS { - for b.Len() > 0 { - el := heap.Pop(&b).(struct { - DTS uint32 - *RTPNalu - }) - p.absTs += (el.DTS - p.lastTs) - p.lastTs = el.DTS - p.PushNalu(p.absTs/90, (el.PTS/90 - el.DTS/90), el.Payload) - } - b.MaxTS = 0 - } - for heap.Push(&b, current); current.Next != nil; current = current.Next { - heap.Push(&b, current.Next) - } - } - } - return - } - if p.lastTs != 0 { - p.absTs += (last.PTS - p.lastTs) - } - p.lastTs = last.PTS - p.PushNalu(p.absTs/90, 0, last.Payload) - for last = current; last.Next != nil; last = last.Next { - p.absTs += (last.PTS - p.lastTs) - p.lastTs = last.PTS - p.PushNalu(p.absTs/90, 0, last.Payload) - } +// func (p *RTPVideo) _demux(ts uint32, payload []byte) { +// p.timestamp = time.Now() +// if last := p.demuxNalu(payload); last != nil { +// p.OnDemux = func(ts uint32, payload []byte) { +// if current := p.demuxNalu(payload); current != nil { +// if last.PTS > current.PTS { //有B帧 +// var b B +// utils.Println("rtp has B-frame!!") +// for heap.Push(&b, last); last.Next != nil; last = last.Next { +// heap.Push(&b, last.Next) +// } +// for heap.Push(&b, current); current.Next != nil; current = current.Next { +// heap.Push(&b, current.Next) +// } +// p.OnDemux = func(ts uint32, payload []byte) { +// if current := p.demuxNalu(payload); current != nil { +// if current.PTS > b.MaxTS { +// for b.Len() > 0 { +// el := heap.Pop(&b).(struct { +// DTS uint32 +// *RTPNalu +// }) +// p.PushNalu(el.DTS, (el.PTS - el.DTS), el.Payload) +// } +// b.MaxTS = 0 +// } +// for heap.Push(&b, current); current.Next != nil; current = current.Next { +// heap.Push(&b, current.Next) +// } +// } +// } +// return +// } +// p.PushNalu(p.PTS, 0, last.Payload) +// for last = current; last.Next != nil; last = last.Next { +// p.PushNalu(p.PTS, 0, last.Payload) +// } +// } +// } +// } +// } + +func (p *RTPVideo) _demux(ts uint32, payload []byte) { + if nalus := p.demuxNalu(payload); nalus != nil { + startPTS := nalus.PTS + dtsEst := NewDTSEstimator() + dts := dtsEst.Feed(0) + p.PushNalu(dts, 0, nalus.Payload) + for nalus = nalus.Next; nalus != nil; nalus = nalus.Next { + pts := nalus.PTS - startPTS + dts := dtsEst.Feed(pts) + p.PushNalu(dts, pts-dts, nalus.Payload) + } + p.OnDemux = func(ts uint32, payload []byte) { + for nalus := p.demuxNalu(p.Payload); nalus != nil; nalus = nalus.Next { + pts := nalus.PTS - startPTS + dts := dtsEst.Feed(pts) + p.PushNalu(dts, pts-dts, nalus.Payload) } } } } -func (p *RTPVideo) Push(payload []byte) { - if p.Unmarshal(payload) == nil { - if p.lastSeq > 0 && p.SequenceNumber != p.lastSeq+1 { - println("RTP Publisher: SequenceNumber error", p.lastSeq, p.SequenceNumber) - return - } - p.lastSeq = p.SequenceNumber - p.demux() - } -} diff --git a/video_track.go b/video_track.go index 3255256..7478f25 100644 --- a/video_track.go +++ b/video_track.go @@ -372,7 +372,7 @@ func (vt *VideoTrack) setIDR(idr bool) { vt.AVItem.Value = vt.keyFrameBuffers.Remove(cache) vt.VideoPack = vt.AVItem.Value.(*VideoPack) //设置当前操作的指针 } - } else {//原来是关键帧,现在是非关键帧,把原来的关键帧缓存放回去 + } else { //原来是关键帧,现在是非关键帧,把原来的关键帧缓存放回去 vt.keyFrameBuffers.PushBack(vt.AVItem.Value) vt.VideoPack = new(VideoPack) //设置当前操作的指针 vt.AVItem.Value = vt.VideoPack