From c8467a2180a2ce60e8b5f7e0c4e04185779039f4 Mon Sep 17 00:00:00 2001 From: cnotch Date: Fri, 29 Nov 2024 18:51:18 +0800 Subject: [PATCH] remove basePts --- av/format/rtp/aac_depacketizer.go | 60 ++++++++++++++---------------- av/format/rtp/demuxer.go | 37 +++++++----------- av/format/rtp/h264_depacketizer.go | 31 ++++++--------- av/format/rtp/h265_depacketizer.go | 31 ++++++--------- av/format/rtp/syncclock.go | 24 +++++++++++- 5 files changed, 87 insertions(+), 96 deletions(-) diff --git a/av/format/rtp/aac_depacketizer.go b/av/format/rtp/aac_depacketizer.go index a36aaa8..50631ce 100644 --- a/av/format/rtp/aac_depacketizer.go +++ b/av/format/rtp/aac_depacketizer.go @@ -5,8 +5,6 @@ package rtp import ( - "time" - "github.com/cnotch/ipchub/av/codec" "github.com/cnotch/ipchub/av/codec/aac" ) @@ -27,36 +25,34 @@ func NewAacDepacketizer(meta *codec.AudioMeta, w codec.FrameWriter) Depacketizer sizeLength: 13, indexLength: 3, } - aacdp.syncClock.RTPTimeUnit = float64(time.Second) / float64(meta.SampleRate) + aacdp.syncClock.Init(meta.SampleRate) return aacdp } -// 以下是当 sizelength=13;indexlength=3;indexdeltalength=3 时 -// Au-header = 13+3 bits(2byte) 的示意图 -// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 -// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-| -// | AU-headers-length | -// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-| -// | AU-header(1) | -// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-| -// | AU-header(2) | -// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-| -// | ... | -// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-| -// | AU-header(n) | -// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-| -// | pading bits | -// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-| +// 以下是当 sizelength=13;indexlength=3;indexdeltalength=3 时 +// Au-header = 13+3 bits(2byte) 的示意图 +// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-| +// | AU-headers-length | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-| +// | AU-header(1) | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-| +// | AU-header(2) | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-| +// | ... 
| +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-| +// | AU-header(n) | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-| +// | pading bits | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-| +// // 当 sizelength=6;indexlength=2;indexdeltalength=2 时 // 单帧封装时,rtp payload的长度 = AU-header-lengths(两个字节) + AU-header(6+2) + AU的长度 -func (aacdp *aacDepacketizer) Depacketize(basePts int64, packet *Packet) (err error) { - if aacdp.syncClock.NTPTime == 0 { // 未收到同步时钟信息,忽略任意包 - return - } - return aacdp.depacketizeFor2ByteAUHeader(basePts, packet) +func (aacdp *aacDepacketizer) Depacketize(packet *Packet) (err error) { + return aacdp.depacketizeFor2ByteAUHeader(packet) } -func (aacdp *aacDepacketizer) depacketizeFor2ByteAUHeader(basePts int64, packet *Packet) (err error) { +func (aacdp *aacDepacketizer) depacketizeFor2ByteAUHeader(packet *Packet) (err error) { payload := packet.Payload() // AU-headers-length 2bytes @@ -72,11 +68,11 @@ func (aacdp *aacDepacketizer) depacketizeFor2ByteAUHeader(basePts int64, packet for i := 0; i < int(auHeadersCount); i++ { auHeader := uint16(0) | (uint16(auHeaders[0]) << 8) | uint16(auHeaders[1]) frameSize := auHeader >> aacdp.indexLength - pts := aacdp.rtp2ntp(frameTimeStamp) - basePts + ptsDelay + pts := aacdp.rtp2ntp(frameTimeStamp) + ptsDelay frame := &codec.Frame{ MediaType: codec.MediaTypeAudio, - Dts: pts, - Pts: pts, + Dts: pts, + Pts: pts, Payload: framesPayload[:frameSize], } if err = aacdp.w.WriteFrame(frame); err != nil { @@ -92,7 +88,7 @@ func (aacdp *aacDepacketizer) depacketizeFor2ByteAUHeader(basePts int64, packet return } -func (aacdp *aacDepacketizer) depacketizeFor1ByteAUHeader(basePts int64, packet *Packet) (err error) { +func (aacdp *aacDepacketizer) depacketizeFor1ByteAUHeader(packet *Packet) (err error) { payload := packet.Payload() // AU-headers-length 2bytes @@ -108,11 +104,11 @@ func (aacdp *aacDepacketizer) depacketizeFor1ByteAUHeader(basePts int64, packet for i := 0; i < int(auHeadersCount); i++ { auHeader := auHeaders[0] frameSize := auHeader >> 
aacdp.indexLength - pts := aacdp.rtp2ntp(frameTimeStamp) - basePts + ptsDelay + pts := aacdp.rtp2ntp(frameTimeStamp) + ptsDelay frame := &codec.Frame{ MediaType: codec.MediaTypeAudio, - Dts: pts, - Pts: pts, + Dts: pts, + Pts: pts, Payload: framesPayload[:frameSize], } if err = aacdp.w.WriteFrame(frame); err != nil { diff --git a/av/format/rtp/demuxer.go b/av/format/rtp/demuxer.go index 2111b3f..0b7309b 100644 --- a/av/format/rtp/demuxer.go +++ b/av/format/rtp/demuxer.go @@ -16,45 +16,35 @@ import ( // 网络播放时 PTS(Presentation Time Stamp)的延时 const ( - ptsDelay = int64(time.Second)/2 + ptsDelay = int64(time.Second) / 2 ) // Depacketizer 解包器 type Depacketizer interface { - Control(basePts *int64, p *Packet) error - Depacketize(basePts int64, p *Packet) error + Control(p *Packet) error + Depacketize(p *Packet) error } type emptyDepacketizer struct{} -func (emptyDepacketizer) Control(basePts *int64, p *Packet) error { return nil } -func (emptyDepacketizer) Depacketize(basePts int64, p *Packet) error { return nil } +func (emptyDepacketizer) Control(p *Packet) error { return nil } +func (emptyDepacketizer) Depacketize(p *Packet) error { return nil } type depacketizer struct { syncClock SyncClock } -// func (dp *depacketizer) ForcInitSyncClock(basePts *int64, p *Packet) { -// if dp.syncClock.NTPTime == 0 { -// dp.syncClock.RTPTime = p.Timestamp -// dp.syncClock.NTPTime = time.Now().Local().UnixNano() -// if *basePts == 0 { -// *basePts = dp.syncClock.NTPTime -// } -// } -// } +func (dp *depacketizer) Control(p *Packet) error { + if dp.syncClock.RTPTime == 0 { + if ok := dp.syncClock.Decode(p.Data); ok { -func (dp *depacketizer) Control(basePts *int64, p *Packet) error { - if ok := dp.syncClock.Decode(p.Data); ok { - if *basePts == 0 { - *basePts = dp.syncClock.NTPTime } } return nil } func (dp *depacketizer) rtp2ntp(timestamp uint32) int64 { - return dp.syncClock.Rtp2Ntp(timestamp) + return dp.syncClock.RelativeNtp(timestamp) } // Demuxer 帧转换器 @@ -108,7 +98,6 @@ func 
(demuxer *Demuxer) process() { demuxer.recvQueue.Reset() }() - var basePts int64 for !demuxer.closed { p := demuxer.recvQueue.Pop() if p == nil { @@ -122,13 +111,13 @@ func (demuxer *Demuxer) process() { var err error switch packet.Channel { case ChannelVideo: - err = demuxer.vdp.Depacketize(basePts, packet) + err = demuxer.vdp.Depacketize(packet) case ChannelVideoControl: - err = demuxer.vdp.Control(&basePts, packet) + err = demuxer.vdp.Control(packet) case ChannelAudio: - err = demuxer.adp.Depacketize(basePts, packet) + err = demuxer.adp.Depacketize(packet) case ChannelAudioControl: - err = demuxer.adp.Control(&basePts, packet) + err = demuxer.adp.Control(packet) } if err != nil { diff --git a/av/format/rtp/h264_depacketizer.go b/av/format/rtp/h264_depacketizer.go index d27d1c2..1797e0c 100644 --- a/av/format/rtp/h264_depacketizer.go +++ b/av/format/rtp/h264_depacketizer.go @@ -19,7 +19,6 @@ type h264Depacketizer struct { metaReady bool nextDts float64 dtsStep float64 - startOn time.Time w codec.FrameWriter } @@ -30,15 +29,11 @@ func NewH264Depacketizer(meta *codec.VideoMeta, w codec.FrameWriter) Depacketize fragments: make([]*Packet, 0, 16), w: w, } - h264dp.syncClock.RTPTimeUnit = float64(time.Second) / float64(meta.ClockRate) + h264dp.syncClock.Init(meta.ClockRate) return h264dp } -func (h264dp *h264Depacketizer) Depacketize(basePts int64, packet *Packet) (err error) { - if h264dp.syncClock.NTPTime == 0 { // 未收到同步时钟信息,忽略任意包 - return - } - +func (h264dp *h264Depacketizer) Depacketize(packet *Packet) (err error) { payload := packet.Payload() if len(payload) < 3 { return @@ -69,18 +64,18 @@ func (h264dp *h264Depacketizer) Depacketize(basePts int64, packet *Packet) (err MediaType: codec.MediaTypeVideo, Payload: payload, } - err = h264dp.writeFrame(basePts, packet.Timestamp, frame) + err = h264dp.writeFrame(packet.Timestamp, frame) case naluType == h264.NalStapaInRtp: - err = h264dp.depacketizeStapa(basePts, packet) + err = h264dp.depacketizeStapa(packet) case 
naluType == h264.NalFuAInRtp: - err = h264dp.depacketizeFuA(basePts, packet) + err = h264dp.depacketizeFuA(packet) default: err = fmt.Errorf("nalu type %d is currently not handled", naluType) } return } -func (h264dp *h264Depacketizer) depacketizeStapa(basePts int64, packet *Packet) (err error) { +func (h264dp *h264Depacketizer) depacketizeStapa(packet *Packet) (err error) { payload := packet.Payload() header := payload[0] @@ -115,7 +110,7 @@ func (h264dp *h264Depacketizer) depacketizeStapa(basePts int64, packet *Packet) } copy(frame.Payload, payload[off:]) frame.Payload[0] = 0 | (header & 0x60) | (frame.Payload[0] & 0x1F) - if err = h264dp.writeFrame(basePts, packet.Timestamp, frame); err != nil { + if err = h264dp.writeFrame(packet.Timestamp, frame); err != nil { return } @@ -127,7 +122,7 @@ func (h264dp *h264Depacketizer) depacketizeStapa(basePts int64, packet *Packet) return } -func (h264dp *h264Depacketizer) depacketizeFuA(basePts int64, packet *Packet) (err error) { +func (h264dp *h264Depacketizer) depacketizeFuA(packet *Packet) (err error) { payload := packet.Payload() header := payload[0] @@ -182,13 +177,13 @@ func (h264dp *h264Depacketizer) depacketizeFuA(basePts int64, packet *Packet) (e // 清空分片缓存 h264dp.fragments = h264dp.fragments[:0] - err = h264dp.writeFrame(basePts, packet.Timestamp, frame) + err = h264dp.writeFrame(packet.Timestamp, frame) } return } -func (h264dp *h264Depacketizer) writeFrame(basePts int64, rtpTimestamp uint32, frame *codec.Frame) error { +func (h264dp *h264Depacketizer) writeFrame(rtpTimestamp uint32, frame *codec.Frame) error { nalType := frame.Payload[0] & 0x1f switch nalType { case h264.NalSps: @@ -209,18 +204,16 @@ func (h264dp *h264Depacketizer) writeFrame(basePts int64, rtpTimestamp uint32, f } if h264dp.meta.FixedFrameRate { h264dp.dtsStep = float64(time.Second) / h264dp.meta.FrameRate - } else { - h264dp.startOn = time.Now() } h264dp.metaReady = true } - frame.Pts = h264dp.rtp2ntp(rtpTimestamp) - basePts + ptsDelay + 
frame.Pts = h264dp.rtp2ntp(rtpTimestamp) + ptsDelay if h264dp.dtsStep > 0 { frame.Dts = int64(h264dp.nextDts) h264dp.nextDts += h264dp.dtsStep } else { - frame.Dts = int64(time.Now().Sub(h264dp.startOn)) + frame.Dts = h264dp.syncClock.RelativeNtpNow() } return h264dp.w.WriteFrame(frame) } diff --git a/av/format/rtp/h265_depacketizer.go b/av/format/rtp/h265_depacketizer.go index 46fa286..aeacb68 100644 --- a/av/format/rtp/h265_depacketizer.go +++ b/av/format/rtp/h265_depacketizer.go @@ -18,7 +18,6 @@ type h265Depacketizer struct { metaReady bool nextDts float64 dtsStep float64 - startOn time.Time w codec.FrameWriter } @@ -29,7 +28,7 @@ func NewH265Depacketizer(meta *codec.VideoMeta, w codec.FrameWriter) Depacketize fragments: make([]*Packet, 0, 16), w: w, } - h265dp.syncClock.RTPTimeUnit = float64(time.Second) / float64(meta.ClockRate) + h265dp.syncClock.Init(meta.ClockRate) return h265dp } @@ -57,11 +56,7 @@ func NewH265Depacketizer(meta *codec.VideoMeta, w codec.FrameWriter) Depacketize * End fragment (E): 1 bit * FuType: 6 bits */ -func (h265dp *h265Depacketizer) Depacketize(basePts int64, packet *Packet) (err error) { - if h265dp.syncClock.NTPTime == 0 { // 未收到同步时钟信息,忽略任意包 - return - } - +func (h265dp *h265Depacketizer) Depacketize(packet *Packet) (err error) { payload := packet.Payload() if len(payload) < 3 { return @@ -71,20 +66,20 @@ func (h265dp *h265Depacketizer) Depacketize(basePts int64, packet *Packet) (err switch naluType { case hevc.NalStapInRtp: // 在RTP中的聚合(AP) - return h265dp.depacketizeStap(basePts, packet) + return h265dp.depacketizeStap(packet) case hevc.NalFuInRtp: // 在RTP中的扩展,分片(FU) - return h265dp.depacketizeFu(basePts, packet) + return h265dp.depacketizeFu(packet) default: frame := &codec.Frame{ MediaType: codec.MediaTypeVideo, Payload: payload, } - err = h265dp.writeFrame(basePts, packet.Timestamp, frame) + err = h265dp.writeFrame(packet.Timestamp, frame) return } } -func (h265dp *h265Depacketizer) depacketizeStap(basePts int64, packet 
*Packet) (err error) { +func (h265dp *h265Depacketizer) depacketizeStap(packet *Packet) (err error) { payload := packet.Payload() off := 2 // 跳过 STAP NAL HDR @@ -102,7 +97,7 @@ func (h265dp *h265Depacketizer) depacketizeStap(basePts int64, packet *Packet) ( Payload: make([]byte, nalSize), } copy(frame.Payload, payload[off:]) - if err = h265dp.writeFrame(basePts, packet.Timestamp, frame); err != nil { + if err = h265dp.writeFrame(packet.Timestamp, frame); err != nil { return } off += int(nalSize) @@ -113,7 +108,7 @@ func (h265dp *h265Depacketizer) depacketizeStap(basePts int64, packet *Packet) ( return } -func (h265dp *h265Depacketizer) depacketizeFu(basePts int64, packet *Packet) (err error) { +func (h265dp *h265Depacketizer) depacketizeFu(packet *Packet) (err error) { payload := packet.Payload() rawDataOffset := 3 // 原始数据的偏移 = FU indicator + header @@ -162,13 +157,13 @@ func (h265dp *h265Depacketizer) depacketizeFu(basePts int64, packet *Packet) (er // 清空分片缓存 h265dp.fragments = h265dp.fragments[:0] - err = h265dp.writeFrame(basePts, packet.Timestamp, frame) + err = h265dp.writeFrame(packet.Timestamp, frame) } return } -func (h265dp *h265Depacketizer) writeFrame(basePts int64, rtpTimestamp uint32, frame *codec.Frame) error { +func (h265dp *h265Depacketizer) writeFrame(rtpTimestamp uint32, frame *codec.Frame) error { nalType := (frame.Payload[0] >> 1) & 0x3f switch nalType { case hevc.NalVps: @@ -191,18 +186,16 @@ func (h265dp *h265Depacketizer) writeFrame(basePts int64, rtpTimestamp uint32, f } if h265dp.meta.FixedFrameRate { h265dp.dtsStep = float64(time.Second) / h265dp.meta.FrameRate - } else { - h265dp.startOn = time.Now() } h265dp.metaReady = true } - frame.Pts = h265dp.rtp2ntp(rtpTimestamp) - basePts + ptsDelay + frame.Pts = h265dp.rtp2ntp(rtpTimestamp) + ptsDelay if h265dp.dtsStep > 0 { frame.Dts = int64(h265dp.nextDts) h265dp.nextDts += h265dp.dtsStep } else { - frame.Dts = int64(time.Now().Sub(h265dp.startOn)) + frame.Dts = 
h265dp.syncClock.RelativeNtpNow() } return h265dp.w.WriteFrame(frame) } diff --git a/av/format/rtp/syncclock.go b/av/format/rtp/syncclock.go index 87c51b8..92a5d40 100644 --- a/av/format/rtp/syncclock.go +++ b/av/format/rtp/syncclock.go @@ -22,6 +22,15 @@ type SyncClock struct { // 与RTP数据包中的RTP时间戳具有相同的单位和随机初始值。 RTPTime uint32 RTPTimeUnit float64 // RTP时间单位,每个RTP时间的纳秒数 + + initOn time.Time // 初始化时间 +} + +// Init 初始化同步时钟 +func (sc *SyncClock) Init(clockRate int) { + sc.initOn = time.Now() + sc.NTPTime = sc.initOn.UnixNano() + sc.RTPTimeUnit = float64(time.Second) / float64(clockRate) } // LocalTime 本地时间 @@ -41,8 +50,19 @@ func (sc *SyncClock) Decode(data []byte) (ok bool) { return } -// Rtp2Ntp . -func (sc *SyncClock) Rtp2Ntp(rtptime uint32) int64 { +// RelativeNtpNow returns the time elapsed since Init was called, in nanoseconds. +func (sc *SyncClock) RelativeNtpNow() int64 { + return int64(time.Now().Sub(sc.initOn)) +} + +// RelativeNtp converts an RTP timestamp to its offset from the reference RTPTime, in nanoseconds. +func (sc *SyncClock) RelativeNtp(rtptime uint32) int64 { + diff := int64(rtptime) - int64(sc.RTPTime) + return int64(float64(diff) * sc.RTPTimeUnit) +} + +// AbsoluteNtp converts an RTP timestamp to an absolute time: NTPTime plus the offset from RTPTime, in nanoseconds. +func (sc *SyncClock) AbsoluteNtp(rtptime uint32) int64 { diff := int64(rtptime) - int64(sc.RTPTime) return sc.NTPTime + int64(float64(diff)*sc.RTPTimeUnit) }