From 710cc55d8874fcaa83580c510408c3783a6e3d75 Mon Sep 17 00:00:00 2001 From: notch Date: Mon, 18 Jan 2021 11:44:04 +0800 Subject: [PATCH] refactoring dts and pts process --- av/codec/frame.go | 7 ++- av/format/flv/aac_packetizer.go | 18 +++--- av/format/flv/h264_packetizer.go | 47 +++------------ av/format/flv/h265_packetizer.go | 42 ++----------- av/format/flv/muxer.go | 26 ++++---- av/format/mpegts/aac_packetizer.go | 6 +- av/format/mpegts/h264_packetizer.go | 53 +++------------- av/format/mpegts/muxer.go | 18 ++---- av/format/rtp/aac_depacketizer.go | 45 ++++++++------ av/format/rtp/demuxer.go | 93 +++++++++++++++++------------ av/format/rtp/demuxer_test.go | 4 ++ av/format/rtp/h264_depacketizer.go | 76 +++++++++++++++-------- av/format/rtp/h265_depacketizer.go | 77 ++++++++++++++++-------- av/format/rtp/syncclock.go | 5 +- 14 files changed, 241 insertions(+), 276 deletions(-) diff --git a/av/codec/frame.go b/av/codec/frame.go index 5943d37..99ef4aa 100644 --- a/av/codec/frame.go +++ b/av/codec/frame.go @@ -74,9 +74,10 @@ func (mt *MediaType) unmarshalText(text string) bool { // Frame 音视频完整帧 type Frame struct { - MediaType // 媒体类型 - AbsTimestamp int64 // 绝对时间戳(主要用于表示 pts),单位为 ms 的 UNIX 时间 - Payload []byte // 媒体数据载荷 + MediaType // 媒体类型 + Dts int64 // DTS,单位为 ns + Pts int64 // PTS,单位为 ns + Payload []byte // 媒体数据载荷 } // FrameWriter 包装 WriteFrame 方法的接口 diff --git a/av/format/flv/aac_packetizer.go b/av/format/flv/aac_packetizer.go index 9d7c3f0..6fca54d 100644 --- a/av/format/flv/aac_packetizer.go +++ b/av/format/flv/aac_packetizer.go @@ -4,7 +4,11 @@ package flv -import "github.com/cnotch/ipchub/av/codec" +import ( + "time" + + "github.com/cnotch/ipchub/av/codec" +) // in ms, for aac flush the audio const aacDelay = 100 @@ -13,7 +17,6 @@ type aacPacketizer struct { meta *codec.AudioMeta dataTemplate *AudioData tagWriter TagWriter - spsMuxed bool } func NewAacPacketizer(meta *codec.AudioMeta, tagWriter TagWriter) Packetizer { @@ -61,11 +64,6 @@ func (ap *aacPacketizer) prepareTemplate() { } func (ap *aacPacketizer) PacketizeSequenceHeader() error { - if ap.spsMuxed { - return nil - } - - ap.spsMuxed = true audioData := *ap.dataTemplate audioData.AACPacketType = AACPacketTypeSequenceHeader audioData.Body = ap.meta.Sps @@ -81,12 +79,12 @@ func (ap *aacPacketizer) PacketizeSequenceHeader() error { return ap.tagWriter.WriteFlvTag(tag) } -func (ap *aacPacketizer) Packetize(basePts int64, frame *codec.Frame) error { +func (ap *aacPacketizer) Packetize(frame *codec.Frame) error { audioData := *ap.dataTemplate audioData.Body = frame.Payload data, _ := audioData.Marshal() - pts := aacDelay + frame.AbsTimestamp - basePts + ptsDelay - + pts := frame.Pts / int64(time.Millisecond) + tag := &Tag{ TagType: TagTypeAudio, DataSize: uint32(len(data)), diff --git a/av/format/flv/h264_packetizer.go b/av/format/flv/h264_packetizer.go index 0920e4d..01aac23 100644 --- a/av/format/flv/h264_packetizer.go +++ b/av/format/flv/h264_packetizer.go @@ -5,6 +5,8 @@ package flv import ( + "time" + "github.com/cnotch/ipchub/av/codec" "github.com/cnotch/ipchub/av/codec/h264" ) @@ -12,9 +14,6 @@ import ( type h264Packetizer struct { meta *codec.VideoMeta tagWriter TagWriter - spsMuxed bool - nextDts float64 - dtsStep float64 } func NewH264Packetizer(meta *codec.VideoMeta, tagWriter TagWriter) Packetizer { @@ -22,38 +21,17 @@ func NewH264Packetizer(meta *codec.VideoMeta, tagWriter TagWriter) Packetizer { meta: meta, tagWriter: tagWriter, } - - if meta.FrameRate > 0 { - h264p.dtsStep = 1000.0 / meta.FrameRate - } return h264p } func (h264p *h264Packetizer) PacketizeSequenceHeader() error { - if h264p.spsMuxed { - return nil - } - - if !h264.MetadataIsReady(h264p.meta) { - // not enough - return nil - } - - h264p.spsMuxed = true - - if h264p.meta.FixedFrameRate { - h264p.dtsStep = 1000.0 / h264p.meta.FrameRate - } else { // TODO: - h264p.dtsStep = 1000.0 / 30 - } - record := NewAVCDecoderConfigurationRecord(h264p.meta.Sps, h264p.meta.Pps) body, _ := record.Marshal() videoData := &VideoData{ FrameType: FrameTypeKeyFrame, CodecID: CodecIDAVC, - H2645PacketType: H2645PacketTypeSequenceHeader, + H2645PacketType: H2645PacketTypeSequenceHeader, CompositionTime: 0, Body: body, } @@ -70,26 +48,15 @@ func (h264p *h264Packetizer) PacketizeSequenceHeader() error { return h264p.tagWriter.WriteFlvTag(tag) } -func (h264p *h264Packetizer) Packetize(basePts int64, frame *codec.Frame) error { - if frame.Payload[0]&0x1F == h264.NalSps { - return h264p.PacketizeSequenceHeader() - } +func (h264p *h264Packetizer) Packetize(frame *codec.Frame) error { - if frame.Payload[0]&0x1F == h264.NalPps { - return h264p.PacketizeSequenceHeader() - } - - dts := int64(h264p.nextDts) - h264p.nextDts += h264p.dtsStep - pts := frame.AbsTimestamp - basePts + ptsDelay - if dts > pts { - pts = dts - } + dts := frame.Dts / int64(time.Millisecond) + pts := frame.Pts / int64(time.Millisecond) videoData := &VideoData{ FrameType: FrameTypeInterFrame, CodecID: CodecIDAVC, - H2645PacketType: H2645PacketTypeNALU, + H2645PacketType: H2645PacketTypeNALU, CompositionTime: uint32(pts - dts), Body: frame.Payload, } diff --git a/av/format/flv/h265_packetizer.go b/av/format/flv/h265_packetizer.go index 7218e88..74d63b1 100644 --- a/av/format/flv/h265_packetizer.go +++ b/av/format/flv/h265_packetizer.go @@ -5,6 +5,8 @@ package flv import ( + "time" + "github.com/cnotch/ipchub/av/codec" "github.com/cnotch/ipchub/av/codec/hevc" ) @@ -12,9 +14,6 @@ import ( type h265Packetizer struct { meta *codec.VideoMeta tagWriter TagWriter - spsMuxed bool - nextDts float64 - dtsStep float64 } func NewH265Packetizer(meta *codec.VideoMeta, tagWriter TagWriter) Packetizer { @@ -22,31 +21,10 @@ func NewH265Packetizer(meta *codec.VideoMeta, tagWriter TagWriter) Packetizer { meta: meta, tagWriter: tagWriter, } - - if meta.FrameRate > 0 { - h265p.dtsStep = 1000.0 / meta.FrameRate - } return h265p } func (h265p *h265Packetizer) PacketizeSequenceHeader() error { - if h265p.spsMuxed { - return nil - } - - if !hevc.MetadataIsReady(h265p.meta) { - // not enough - return nil - } - - h265p.spsMuxed = true - - if h265p.meta.FixedFrameRate { - h265p.dtsStep = 1000.0 / h265p.meta.FrameRate - } else { // TODO: - h265p.dtsStep = 1000.0 / 30 - } - record := NewHEVCDecoderConfigurationRecord(h265p.meta.Vps, h265p.meta.Sps, h265p.meta.Pps) body, _ := record.Marshal() @@ -70,20 +48,10 @@ func (h265p *h265Packetizer) PacketizeSequenceHeader() error { return h265p.tagWriter.WriteFlvTag(tag) } -func (h265p *h265Packetizer) Packetize(basePts int64, frame *codec.Frame) error { +func (h265p *h265Packetizer) Packetize(frame *codec.Frame) error { nalType := (frame.Payload[0] >> 1) & 0x3f - if nalType == hevc.NalVps || - nalType == hevc.NalSps || - nalType == hevc.NalPps { - return h265p.PacketizeSequenceHeader() - } - - dts := int64(h265p.nextDts) - h265p.nextDts += h265p.dtsStep - pts := frame.AbsTimestamp - basePts + ptsDelay - if dts > pts { - pts = dts - } + dts := frame.Dts / int64(time.Millisecond) + pts := frame.Pts / int64(time.Millisecond) videoData := &VideoData{ FrameType: FrameTypeInterFrame, diff --git a/av/format/flv/muxer.go b/av/format/flv/muxer.go index 7fb4e1f..39caf44 100644 --- a/av/format/flv/muxer.go +++ b/av/format/flv/muxer.go @@ -18,19 +18,13 @@ import ( // Packetizer 封包器 type Packetizer interface { PacketizeSequenceHeader() error - Packetize(basePts int64, frame *codec.Frame) error + Packetize(frame *codec.Frame) error } type emptyPacketizer struct{} -func (emptyPacketizer) PacketizeSequenceHeader() error { return nil } -func (emptyPacketizer) Packetize(basePts int64, frame *codec.Frame) error { return nil } - -// 网络播放时 PTS(Presentation Time Stamp)的延时 -// 影响视频 Tag 的 CTS 和音频的 DTS(Decoding Time Stamp) -const ( - ptsDelay = 1000 -) +func (emptyPacketizer) PacketizeSequenceHeader() error { return nil } +func (emptyPacketizer) Packetize(frame *codec.Frame) error { return nil } // Muxer flv muxer from av.Frame(H264[+AAC]) type Muxer struct { @@ -113,7 +107,8 @@ func (muxer *Muxer) process() { muxer.recvQueue.Reset() }() - var basePts int64 + var packSequenceHeader bool + for !muxer.closed { f := muxer.recvQueue.Pop() if f == nil { @@ -123,21 +118,22 @@ func (muxer *Muxer) process() { continue } - frame := f.(*codec.Frame) - if basePts == 0 { - basePts = frame.AbsTimestamp + if !packSequenceHeader{ muxer.muxMetadataTag() muxer.vp.PacketizeSequenceHeader() muxer.ap.PacketizeSequenceHeader() + packSequenceHeader = true } + + frame := f.(*codec.Frame) switch frame.MediaType { case codec.MediaTypeVideo: - if err := muxer.vp.Packetize(basePts, frame); err != nil { + if err := muxer.vp.Packetize(frame); err != nil { muxer.logger.Errorf("flvmuxer: muxVideoTag error - %s", err.Error()) } case codec.MediaTypeAudio: - if err := muxer.ap.Packetize(basePts, frame); err != nil { + if err := muxer.ap.Packetize(frame); err != nil { muxer.logger.Errorf("flvmuxer: muxAudioTag error - %s", err.Error()) } default: diff --git a/av/format/mpegts/aac_packetizer.go b/av/format/mpegts/aac_packetizer.go index 37b8822..02f8d77 100644 --- a/av/format/mpegts/aac_packetizer.go +++ b/av/format/mpegts/aac_packetizer.go @@ -6,6 +6,7 @@ package mpegts import ( "fmt" + "time" "github.com/cnotch/ipchub/av/codec" "github.com/cnotch/ipchub/av/codec/aac" @@ -48,9 +49,8 @@ func (ap *aacPacketizer) prepareAsc() (err error) { return } -func (ap *aacPacketizer) Packetize(basePts int64, frame *codec.Frame) error { - pts := frame.AbsTimestamp - basePts + ptsDelay - pts *= 90 +func (ap *aacPacketizer) Packetize(frame *codec.Frame) error { + pts := frame.Pts * 90000 / int64(time.Second) // 90000Hz // set fields tsframe := &Frame{ diff --git a/av/format/mpegts/h264_packetizer.go b/av/format/mpegts/h264_packetizer.go index 7c73e7b..1ca2563 100644 --- a/av/format/mpegts/h264_packetizer.go +++ b/av/format/mpegts/h264_packetizer.go @@ -5,6 +5,8 @@ package mpegts import ( + "time" + "github.com/cnotch/ipchub/av/codec" "github.com/cnotch/ipchub/av/codec/h264" ) @@ -12,9 +14,6 @@ import ( type h264Packetizer struct { meta *codec.VideoMeta tsframeWriter FrameWriter - metaReady bool - nextDts float64 - dtsStep float64 } func NewH264Packetizer(meta *codec.VideoMeta, tsframeWriter FrameWriter) Packetizer { @@ -22,56 +21,22 @@ func NewH264Packetizer(meta *codec.VideoMeta, tsframeWriter FrameWriter) Packeti meta: meta, tsframeWriter: tsframeWriter, } - - h264p.prepareMetadata() - return h264p } -func (h264p *h264Packetizer) prepareMetadata() error { - if h264p.metaReady { - return nil - } - - if !h264.MetadataIsReady(h264p.meta) { - // not enough - return nil - } - - if h264p.meta.FixedFrameRate { - h264p.dtsStep = 1000.0 / h264p.meta.FrameRate - } else { // TODO: - h264p.dtsStep = 1000.0 / 30 - } - h264p.metaReady = true - - return nil -} - -func (h264p *h264Packetizer) Packetize(basePts int64, frame *codec.Frame) error { - if frame.Payload[0]&0x1F == h264.NalSps { - return h264p.prepareMetadata() - } - - if frame.Payload[0]&0x1F == h264.NalPps { - return h264p.prepareMetadata() - } - - dts := int64(h264p.nextDts) - h264p.nextDts += h264p.dtsStep - pts := frame.AbsTimestamp - basePts + ptsDelay - if dts > pts { - pts = dts - } +func (h264p *h264Packetizer) Packetize(frame *codec.Frame) error { + nalType := frame.Payload[0] & 0x1F + dts := frame.Dts * 90000 / int64(time.Second) // 90000Hz + pts := frame.Pts * 90000 / int64(time.Second) // 90000Hz // set fields tsframe := &Frame{ Pid: tsVideoPid, StreamID: tsVideoAvc, - Dts: dts * 90, - Pts: pts * 90, + Dts: dts, + Pts: pts, Payload: frame.Payload, - key: frame.Payload[0]&0x1F == h264.NalIdrSlice, + key: nalType == h264.NalIdrSlice, } tsframe.prepareAvcHeader(h264p.meta.Sps, h264p.meta.Pps) diff --git a/av/format/mpegts/muxer.go b/av/format/mpegts/muxer.go index 1a937eb..9824a86 100644 --- a/av/format/mpegts/muxer.go +++ b/av/format/mpegts/muxer.go @@ -15,18 +15,12 @@ import ( // Packetizer 封包器 type Packetizer interface { - Packetize(basePts int64, frame *codec.Frame) error + Packetize(frame *codec.Frame) error } type emptyPacketizer struct{} -func (emptyPacketizer) Packetize(basePts int64, frame *codec.Frame) error { return nil } - -// 网络播放时 PTS(Presentation Time Stamp)的延时 -// 影响视频 Tag 的 CTS 和音频的 DTS(Decoding Time Stamp) -const ( - ptsDelay = 1000 -) +func (emptyPacketizer) Packetize(frame *codec.Frame) error { return nil } // Muxer mpegts muxer from av.Frame(H264[+AAC]) type Muxer struct { @@ -94,7 +88,6 @@ func (muxer *Muxer) process(vp, ap Packetizer) { muxer.recvQueue.Reset() }() - var basePts int64 for !muxer.closed { f := muxer.recvQueue.Pop() if f == nil { @@ -105,17 +98,14 @@ func (muxer *Muxer) process(vp, ap Packetizer) { } frame := f.(*codec.Frame) - if basePts == 0 { - basePts = frame.AbsTimestamp - } switch frame.MediaType { case codec.MediaTypeVideo: - if err := vp.Packetize(basePts, frame); err != nil { + if err := vp.Packetize(frame); err != nil { muxer.logger.Errorf("tsmuxer: muxVideoTag error - %s", err.Error()) } case codec.MediaTypeAudio: - if err := ap.Packetize(basePts, frame); err != nil { + if err := ap.Packetize(frame); err != nil { muxer.logger.Errorf("tsmuxer: muxAudioTag error - %s", err.Error()) } default: diff --git a/av/format/rtp/aac_depacketizer.go b/av/format/rtp/aac_depacketizer.go index 069ef33..caf6d82 100644 --- a/av/format/rtp/aac_depacketizer.go +++ b/av/format/rtp/aac_depacketizer.go @@ -5,33 +5,38 @@ package rtp import ( + "time" + "github.com/cnotch/ipchub/av/codec" "github.com/cnotch/ipchub/av/codec/aac" ) type aacDepacketizer struct { - meta *codec.AudioMeta + meta *codec.AudioMeta w codec.FrameWriter sizeLength int indexLength int - // depacketizeFunc func(packet *Packet) error syncClock SyncClock } // NewAacDepacketizer 实例化 AAC 解包器 func NewAacDepacketizer(meta *codec.AudioMeta, w codec.FrameWriter) Depacketizer { - fe := &aacDepacketizer{ - meta: meta, + aacdp := &aacDepacketizer{ + meta: meta, w: w, sizeLength: 13, indexLength: 3, } - fe.syncClock.RTPTimeUnit = 1000.0 / float64(meta.SampleRate) - return fe + aacdp.syncClock.RTPTimeUnit = float64(time.Second) / float64(meta.SampleRate) + return aacdp } -func (aacdp *aacDepacketizer) Control(p *Packet) error { - aacdp.syncClock.Decode(p.Data) +func (aacdp *aacDepacketizer) Control(basePts *int64, p *Packet) error { + if ok := aacdp.syncClock.Decode(p.Data); ok { + if *basePts == 0 { + *basePts = aacdp.syncClock.NTPTime + } + } return nil } @@ -53,14 +58,14 @@ func (aacdp *aacDepacketizer) Control(p *Packet) error { // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-| // 当 sizelength=6;indexlength=2;indexdeltalength=2 时 // 单帧封装时,rtp payload的长度 = AU-header-lengths(两个字节) + AU-header(6+2) + AU的长度 -func (aacdp *aacDepacketizer) Depacketize(packet *Packet) (err error) { +func (aacdp *aacDepacketizer) Depacketize(basePts int64, packet *Packet) (err error) { if aacdp.syncClock.NTPTime == 0 { // 未收到同步时钟信息,忽略任意包 return } - return aacdp.depacketizeFor2ByteAUHeader(packet) + return aacdp.depacketizeFor2ByteAUHeader(basePts, packet) } -func (aacdp *aacDepacketizer) depacketizeFor2ByteAUHeader(packet *Packet) (err error) { +func (aacdp *aacDepacketizer) depacketizeFor2ByteAUHeader(basePts int64, packet *Packet) (err error) { payload := packet.Payload() // AU-headers-length 2bytes @@ -76,10 +81,12 @@ func (aacdp *aacDepacketizer) depacketizeFor2ByteAUHeader(packet *Packet) (err e for i := 0; i < int(auHeadersCount); i++ { auHeader := uint16(0) | (uint16(auHeaders[0]) << 8) | uint16(auHeaders[1]) frameSize := auHeader >> aacdp.indexLength + pts := aacdp.rtp2ntp(frameTimeStamp) - basePts + ptsDelay frame := &codec.Frame{ - MediaType: codec.MediaTypeAudio, - AbsTimestamp: aacdp.rtp2ntp(frameTimeStamp), - Payload: framesPayload[:frameSize], + MediaType: codec.MediaTypeAudio, + Dts: pts, + Pts: pts, + Payload: framesPayload[:frameSize], } if err = aacdp.w.WriteFrame(frame); err != nil { return @@ -94,7 +101,7 @@ func (aacdp *aacDepacketizer) depacketizeFor2ByteAUHeader(packet *Packet) (err e return } -func (aacdp *aacDepacketizer) depacketizeFor1ByteAUHeader(packet *Packet) (err error) { +func (aacdp *aacDepacketizer) depacketizeFor1ByteAUHeader(basePts int64, packet *Packet) (err error) { payload := packet.Payload() // AU-headers-length 2bytes @@ -110,10 +117,12 @@ func (aacdp *aacDepacketizer) depacketizeFor1ByteAUHeader(packet *Packet) (err e for i := 0; i < int(auHeadersCount); i++ { auHeader := auHeaders[0] frameSize := auHeader >> aacdp.indexLength + pts := aacdp.rtp2ntp(frameTimeStamp) - basePts + ptsDelay frame := &codec.Frame{ - MediaType: codec.MediaTypeAudio, - AbsTimestamp: aacdp.rtp2ntp(frameTimeStamp), - Payload: framesPayload[:frameSize], + MediaType: codec.MediaTypeAudio, + Dts: pts, + Pts: pts, + Payload: framesPayload[:frameSize], } if err = aacdp.w.WriteFrame(frame); err != nil { return diff --git a/av/format/rtp/demuxer.go b/av/format/rtp/demuxer.go index a1b1714..b14f1b5 100644 --- a/av/format/rtp/demuxer.go +++ b/av/format/rtp/demuxer.go @@ -7,108 +7,123 @@ package rtp import ( "fmt" "runtime/debug" + "time" "github.com/cnotch/ipchub/av/codec" "github.com/cnotch/queue" "github.com/cnotch/xlog" ) +// 网络播放时 PTS(Presentation Time Stamp)的延时 +const ( + ptsDelay = int64(time.Second) +) + // Depacketizer 解包器 type Depacketizer interface { - Control(p *Packet) error - Depacketize(p *Packet) error + Control(basePts *int64, p *Packet) error + Depacketize(basePts int64, p *Packet) error } +type emptyDepacketizer struct{} + +func (emptyDepacketizer) Control(basePts *int64, p *Packet) error { return nil } +func (emptyDepacketizer) Depacketize(basePts int64, p *Packet) error { return nil } + // Demuxer 帧转换器 type Demuxer struct { - closed bool - recvQueue *queue.SyncQueue - depacketizeFuncs [4]func(packet *Packet) error - logger *xlog.Logger + closed bool + recvQueue *queue.SyncQueue + vdp Depacketizer + adp Depacketizer + logger *xlog.Logger } -func emptyDepacketize(*Packet) error { return nil } +func emptyDepacketize(*int64, *Packet) error { return nil } // NewDemuxer 创建 rtp.Packet 解封装处理器。 func NewDemuxer(video *codec.VideoMeta, audio *codec.AudioMeta, fw codec.FrameWriter, logger *xlog.Logger) (*Demuxer, error) { - fc := &Demuxer{ + demuxer := &Demuxer{ recvQueue: queue.NewSyncQueue(), closed: false, logger: logger, } - var videoDepacketizer, audioDepacketizer Depacketizer switch video.Codec { case "H264": - videoDepacketizer = NewH264Depacketizer(video, fw) + demuxer.vdp = NewH264Depacketizer(video, fw) case "H265": - videoDepacketizer = NewH265Depacketizer(video, fw) + demuxer.vdp = NewH265Depacketizer(video, fw) default: return nil, fmt.Errorf("rtp demuxer unsupport video codec type:%s", video.Codec) } - - fc.depacketizeFuncs[ChannelVideo] = videoDepacketizer.Depacketize - fc.depacketizeFuncs[ChannelVideoControl] = videoDepacketizer.Control - if audio.Codec == "AAC" { - audioDepacketizer = NewAacDepacketizer(audio, fw) - } - if audioDepacketizer != nil { - fc.depacketizeFuncs[ChannelAudio] = audioDepacketizer.Depacketize - fc.depacketizeFuncs[ChannelAudioControl] = audioDepacketizer.Control + demuxer.adp = NewAacDepacketizer(audio, fw) } else { - fc.depacketizeFuncs[ChannelAudio] = emptyDepacketize - fc.depacketizeFuncs[ChannelAudioControl] = emptyDepacketize + demuxer.adp = emptyDepacketizer{} } - go fc.process() - return fc, nil + go demuxer.process() + return demuxer, nil } -func (dm *Demuxer) process() { +func (demuxer *Demuxer) process() { defer func() { defer func() { // 避免 handler 再 panic recover() }() if r := recover(); r != nil { - dm.logger.Errorf("FrameConverter routine panic;r = %v \n %s", r, debug.Stack()) + demuxer.logger.Errorf("FrameConverter routine panic;r = %v \n %s", r, debug.Stack()) } // 尽早通知GC,回收内存 - dm.recvQueue.Reset() + demuxer.recvQueue.Reset() }() - for !dm.closed { - p := dm.recvQueue.Pop() + var basePts int64 + for !demuxer.closed { + p := demuxer.recvQueue.Pop() if p == nil { - if !dm.closed { - dm.logger.Warn("FrameConverter:receive nil packet") + if !demuxer.closed { + demuxer.logger.Warn("FrameConverter:receive nil packet") } continue } packet := p.(*Packet) - if err := dm.depacketizeFuncs[int(packet.Channel)](packet); err != nil { - dm.logger.Errorf("FrameConverter: extract rtp frame error :%s", err.Error()) + var err error + switch packet.Channel { + case ChannelVideo: + err = demuxer.vdp.Depacketize(basePts, packet) + case ChannelVideoControl: + err = demuxer.vdp.Control(&basePts, packet) + case ChannelAudio: + err = demuxer.adp.Depacketize(basePts, packet) + case ChannelAudioControl: + err = demuxer.adp.Control(&basePts, packet) + } + + if err != nil { + demuxer.logger.Errorf("rtp demuxer: depackeetize rtp frame error :%s", err.Error()) // break } } } // Close . -func (dm *Demuxer) Close() error { - if dm.closed { +func (demuxer *Demuxer) Close() error { + if demuxer.closed { return nil } - dm.closed = true - dm.recvQueue.Signal() + demuxer.closed = true + demuxer.recvQueue.Signal() return nil } // WritePacket . -func (fc *Demuxer) WriteRtpPacket(packet *Packet) error { - fc.recvQueue.Push(packet) +func (demuxer *Demuxer) WriteRtpPacket(packet *Packet) error { + demuxer.recvQueue.Push(packet) return nil } diff --git a/av/format/rtp/demuxer_test.go b/av/format/rtp/demuxer_test.go index 1fba464..756831e 100644 --- a/av/format/rtp/demuxer_test.go +++ b/av/format/rtp/demuxer_test.go @@ -86,6 +86,10 @@ type frameWriter struct { } func (fw *frameWriter) WriteFrame(frame *codec.Frame) (err error) { + dts := frame.Dts / int64(time.Millisecond) + pts := frame.Pts / int64(time.Millisecond) + _ = dts + _ = pts if frame.MediaType == codec.MediaTypeVideo { fw.videoFrames++ } else { diff --git a/av/format/rtp/h264_depacketizer.go b/av/format/rtp/h264_depacketizer.go index 021fee8..dc8c059 100644 --- a/av/format/rtp/h264_depacketizer.go +++ b/av/format/rtp/h264_depacketizer.go @@ -6,6 +6,7 @@ package rtp import ( "fmt" + "time" "github.com/cnotch/ipchub/av/codec" "github.com/cnotch/ipchub/av/codec/h264" @@ -13,28 +14,36 @@ import ( type h264Depacketizer struct { fragments []*Packet // 分片包 - meta *codec.VideoMeta + meta *codec.VideoMeta + metaReady bool + nextDts float64 + dtsStep float64 + startOn time.Time w codec.FrameWriter syncClock SyncClock } // NewH264Depacketizer 实例化 H264 帧提取器 func NewH264Depacketizer(meta *codec.VideoMeta, w codec.FrameWriter) Depacketizer { - fe := &h264Depacketizer{ - meta: meta, + h264dp := &h264Depacketizer{ + meta: meta, fragments: make([]*Packet, 0, 16), w: w, } - fe.syncClock.RTPTimeUnit = 1000.0 / float64(meta.ClockRate) - return fe + h264dp.syncClock.RTPTimeUnit = float64(time.Second) / float64(meta.ClockRate) + return h264dp } -func (h264dp *h264Depacketizer) Control(p *Packet) error { - h264dp.syncClock.Decode(p.Data) +func (h264dp *h264Depacketizer) Control(basePts *int64, p *Packet) error { + if ok := h264dp.syncClock.Decode(p.Data); ok { + if *basePts == 0 { + *basePts = h264dp.syncClock.NTPTime + } + } return nil } -func (h264dp *h264Depacketizer) Depacketize(packet *Packet) (err error) { +func (h264dp *h264Depacketizer) Depacketize(basePts int64, packet *Packet) (err error) { if h264dp.syncClock.NTPTime == 0 { // 未收到同步时钟信息,忽略任意包 return } @@ -66,22 +75,21 @@ func (h264dp *h264Depacketizer) Depacketize(packet *Packet) (err error) { // | :...OPTIONAL RTP padding | // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ frame := &codec.Frame{ - MediaType: codec.MediaTypeVideo, - AbsTimestamp: h264dp.rtp2ntp(packet.Timestamp), - Payload: payload, + MediaType: codec.MediaTypeVideo, + Payload: payload, } - err = h264dp.writeFrame(frame) + err = h264dp.writeFrame(basePts, packet.Timestamp, frame) case naluType == h264.NalStapaInRtp: - err = h264dp.depacketizeStapa(packet) + err = h264dp.depacketizeStapa(basePts, packet) case naluType == h264.NalFuAInRtp: - err = h264dp.depacketizeFuA(packet) + err = h264dp.depacketizeFuA(basePts, packet) default: err = fmt.Errorf("nalu type %d is currently not handled", naluType) } return } -func (h264dp *h264Depacketizer) depacketizeStapa(packet *Packet) (err error) { +func (h264dp *h264Depacketizer) depacketizeStapa(basePts int64, packet *Packet) (err error) { payload := packet.Payload() header := payload[0] @@ -111,13 +119,12 @@ func (h264dp *h264Depacketizer) depacketizeStapa(packet *Packet) (err error) { off += 2 frame := &codec.Frame{ - MediaType: codec.MediaTypeVideo, - AbsTimestamp: h264dp.rtp2ntp(packet.Timestamp), - Payload: make([]byte, nalSize), + MediaType: codec.MediaTypeVideo, + Payload: make([]byte, nalSize), } copy(frame.Payload, payload[off:]) frame.Payload[0] = 0 | (header & 0x60) | (frame.Payload[0] & 0x1F) - if err = h264dp.writeFrame(frame); err != nil { + if err = h264dp.writeFrame(basePts, packet.Timestamp,frame); err != nil { return } @@ -129,7 +136,7 @@ func (h264dp *h264Depacketizer) depacketizeStapa(packet *Packet) (err error) { return } -func (h264dp *h264Depacketizer) depacketizeFuA(packet *Packet) (err error) { +func (h264dp *h264Depacketizer) depacketizeFuA(basePts int64, packet *Packet) (err error) { payload := packet.Payload() header := payload[0] @@ -171,9 +178,8 @@ func (h264dp *h264Depacketizer) depacketizeFuA(packet *Packet) (err error) { } frame := &codec.Frame{ - MediaType: codec.MediaTypeVideo, - AbsTimestamp: h264dp.rtp2ntp(packet.Timestamp), - Payload: make([]byte, frameLen)} + MediaType: codec.MediaTypeVideo, + Payload: make([]byte, frameLen)} frame.Payload[0] = (header & 0x60) | (fuHeader & 0x1F) offset := 1 @@ -185,7 +191,7 @@ func (h264dp *h264Depacketizer) depacketizeFuA(packet *Packet) (err error) { // 清空分片缓存 h264dp.fragments = h264dp.fragments[:0] - err = h264dp.writeFrame(frame) + err = h264dp.writeFrame(basePts, packet.Timestamp,frame) } return @@ -195,7 +201,7 @@ func (h264dp *h264Depacketizer) rtp2ntp(timestamp uint32) int64 { return h264dp.syncClock.Rtp2Ntp(timestamp) } -func (h264dp *h264Depacketizer) writeFrame(frame *codec.Frame) error { +func (h264dp *h264Depacketizer) writeFrame(basePts int64, rtpTimestamp uint32, frame *codec.Frame) error { nalType := frame.Payload[0] & 0x1f switch nalType { case h264.NalSps: @@ -209,5 +215,25 @@ func (h264dp *h264Depacketizer) writeFrame(frame *codec.Frame) error { case h264.NalFillerData: // ?ignore... return nil } + + if !h264dp.metaReady { + if !h264.MetadataIsReady(h264dp.meta) { + return nil + } + if h264dp.meta.FixedFrameRate { + h264dp.dtsStep = float64(time.Second) / h264dp.meta.FrameRate + } else { + h264dp.startOn = time.Now() + } + h264dp.metaReady = true + } + + frame.Pts = h264dp.rtp2ntp(rtpTimestamp) - basePts+ptsDelay + if h264dp.dtsStep > 0 { + frame.Dts = int64(h264dp.nextDts) + h264dp.nextDts += h264dp.dtsStep + } else { + frame.Dts = int64(time.Now().Sub(h264dp.startOn)) + } return h264dp.w.WriteFrame(frame) } diff --git a/av/format/rtp/h265_depacketizer.go b/av/format/rtp/h265_depacketizer.go index 08bee2e..ade1315 100644 --- a/av/format/rtp/h265_depacketizer.go +++ b/av/format/rtp/h265_depacketizer.go @@ -5,30 +5,40 @@ package rtp import ( + "time" + "github.com/cnotch/ipchub/av/codec" "github.com/cnotch/ipchub/av/codec/hevc" ) type h265Depacketizer struct { fragments []*Packet // 分片包 - meta *codec.VideoMeta + meta *codec.VideoMeta + metaReady bool + nextDts float64 + dtsStep float64 + startOn time.Time w codec.FrameWriter syncClock SyncClock } // NewH265Depacketizer 实例化 H265 帧提取器 func NewH265Depacketizer(meta *codec.VideoMeta, w codec.FrameWriter) Depacketizer { - fe := &h265Depacketizer{ - meta: meta, + h265dp := &h265Depacketizer{ + meta: meta, fragments: make([]*Packet, 0, 16), w: w, } - fe.syncClock.RTPTimeUnit = 1000.0 / float64(meta.ClockRate) - return fe + h265dp.syncClock.RTPTimeUnit = float64(time.Second) / float64(meta.ClockRate) + return h265dp } -func (h265dp *h265Depacketizer) Control(p *Packet) error { - h265dp.syncClock.Decode(p.Data) +func (h265dp *h265Depacketizer) Control(basePts *int64, p *Packet) error { + if ok := h265dp.syncClock.Decode(p.Data); ok { + if *basePts == 0 { + *basePts = h265dp.syncClock.NTPTime + } + } return nil } @@ -56,7 +66,7 @@ func (h265dp *h265Depacketizer) Control(p *Packet) error { * End fragment (E): 1 bit * FuType: 6 bits */ -func (h265dp *h265Depacketizer) Depacketize(packet *Packet) (err error) { +func (h265dp *h265Depacketizer) Depacketize(basePts int64, packet *Packet) (err error) { if h265dp.syncClock.NTPTime == 0 { // 未收到同步时钟信息,忽略任意包 return } @@ -70,21 +80,20 @@ func (h265dp *h265Depacketizer) Depacketize(packet *Packet) (err error) { switch naluType { case hevc.NalStapInRtp: // 在RTP中的聚合(AP) - return h265dp.depacketizeStap(packet) + return h265dp.depacketizeStap(basePts, packet) case hevc.NalFuInRtp: // 在RTP中的扩展,分片(FU) - return h265dp.depacketizeFu(packet) + return h265dp.depacketizeFu(basePts, packet) default: frame := &codec.Frame{ - MediaType: codec.MediaTypeVideo, - AbsTimestamp: h265dp.rtp2ntp(packet.Timestamp), - Payload: payload, + MediaType: codec.MediaTypeVideo, + Payload: payload, } - err = h265dp.writeFrame(frame) + err = h265dp.writeFrame(basePts, packet.Timestamp, frame) return } } -func (h265dp *h265Depacketizer) depacketizeStap(packet *Packet) (err error) { +func (h265dp *h265Depacketizer) depacketizeStap(basePts int64, packet *Packet) (err error) { payload := packet.Payload() off := 2 // 跳过 STAP NAL HDR @@ -98,12 +107,11 @@ func (h265dp *h265Depacketizer) depacketizeStap(packet *Packet) (err error) { off += 2 frame := &codec.Frame{ - MediaType: codec.MediaTypeVideo, - AbsTimestamp: h265dp.rtp2ntp(packet.Timestamp), - Payload: make([]byte, nalSize), + MediaType: codec.MediaTypeVideo, + Payload: make([]byte, nalSize), } copy(frame.Payload, payload[off:]) - if err = h265dp.writeFrame(frame); err != nil { + if err = h265dp.writeFrame(basePts, packet.Timestamp, frame); err != nil { return } off += int(nalSize) @@ -114,7 +122,7 @@ func (h265dp *h265Depacketizer) depacketizeStap(packet *Packet) (err error) { return } -func (h265dp *h265Depacketizer) depacketizeFu(packet *Packet) (err error) { +func (h265dp *h265Depacketizer) depacketizeFu(basePts int64, packet *Packet) (err error) { payload := packet.Payload() rawDataOffset := 3 // 原始数据的偏移 = FU indicator + header @@ -148,9 +156,8 @@ func (h265dp *h265Depacketizer) depacketizeFu(packet *Packet) (err error) { } frame := &codec.Frame{ - MediaType: codec.MediaTypeVideo, - AbsTimestamp: h265dp.rtp2ntp(packet.Timestamp), - Payload: make([]byte, frameLen), + MediaType: codec.MediaTypeVideo, + Payload: make([]byte, frameLen), } frame.Payload[0] = (payload[0] & 0x81) | (fuHeader&0x3f)<<1 @@ -164,7 +171,7 @@ func (h265dp *h265Depacketizer) depacketizeFu(packet *Packet) (err error) { // 清空分片缓存 h265dp.fragments = h265dp.fragments[:0] - err = h265dp.writeFrame(frame) + err = h265dp.writeFrame(basePts, packet.Timestamp, frame) } return @@ -174,7 +181,7 @@ func (h265dp *h265Depacketizer) rtp2ntp(timestamp uint32) int64 { return h265dp.syncClock.Rtp2Ntp(timestamp) } -func (h265dp *h265Depacketizer) writeFrame(frame *codec.Frame) error { +func (h265dp *h265Depacketizer) writeFrame(basePts int64, rtpTimestamp uint32, frame *codec.Frame) error { nalType := (frame.Payload[0] >> 1) & 0x3f switch nalType { case hevc.NalVps: @@ -190,5 +197,25 @@ func (h265dp *h265Depacketizer) writeFrame(frame *codec.Frame) error { h265dp.meta.Pps = frame.Payload } } + + if !h265dp.metaReady { + if !hevc.MetadataIsReady(h265dp.meta) { + return nil + } + if h265dp.meta.FixedFrameRate { + h265dp.dtsStep = float64(time.Second) / h265dp.meta.FrameRate + } else { + h265dp.startOn = time.Now() + } + h265dp.metaReady = true + } + + frame.Pts = h265dp.rtp2ntp(rtpTimestamp) - basePts + ptsDelay + if h265dp.dtsStep > 0 { + frame.Dts = int64(h265dp.nextDts) + h265dp.nextDts += h265dp.dtsStep + } else { + frame.Dts = int64(time.Now().Sub(h265dp.startOn)) + } return h265dp.w.WriteFrame(frame) } diff --git a/av/format/rtp/syncclock.go b/av/format/rtp/syncclock.go index 4bbe1f3..87c51b8 100644 --- a/av/format/rtp/syncclock.go +++ b/av/format/rtp/syncclock.go @@ -21,12 +21,12 @@ type SyncClock struct { // RTP Timestamp:与NTP时间戳对应, // 与RTP数据包中的RTP时间戳具有相同的单位和随机初始值。 RTPTime uint32 - RTPTimeUnit float64 // RTP时间单位,每个RTP时间的豪秒数 + RTPTimeUnit float64 // RTP时间单位,每个RTP时间的纳秒数 } // LocalTime 本地时间 func (sc *SyncClock) LocalTime() time.Time { - return time.Unix(0, sc.NTPTime*int64(time.Millisecond)).In(time.Local) + return time.Unix(0, sc.NTPTime).In(time.Local) } // Decode . @@ -36,7 +36,6 @@ func (sc *SyncClock) Decode(data []byte) (ok bool) { lsw := binary.BigEndian.Uint32(data[12:]) sc.RTPTime = binary.BigEndian.Uint32(data[16:]) sc.NTPTime = int64(msw-jan1970)*int64(time.Second) + (int64(lsw)*1000_000_000)>>32 - sc.NTPTime /= int64(time.Millisecond) ok = true } return