remove basePts

This commit is contained in:
cnotch
2024-11-29 18:51:18 +08:00
parent b13c574d93
commit c8467a2180
5 changed files with 87 additions and 96 deletions

View File

@@ -5,8 +5,6 @@
package rtp package rtp
import ( import (
"time"
"github.com/cnotch/ipchub/av/codec" "github.com/cnotch/ipchub/av/codec"
"github.com/cnotch/ipchub/av/codec/aac" "github.com/cnotch/ipchub/av/codec/aac"
) )
@@ -27,36 +25,34 @@ func NewAacDepacketizer(meta *codec.AudioMeta, w codec.FrameWriter) Depacketizer
sizeLength: 13, sizeLength: 13,
indexLength: 3, indexLength: 3,
} }
aacdp.syncClock.RTPTimeUnit = float64(time.Second) / float64(meta.SampleRate) aacdp.syncClock.Init(meta.SampleRate)
return aacdp return aacdp
} }
// 以下是当 sizelength=13;indexlength=3;indexdeltalength=3 时 // 以下是当 sizelength=13;indexlength=3;indexdeltalength=3 时
// Au-header = 13+3 bits(2byte) 的示意图 // Au-header = 13+3 bits(2byte) 的示意图
// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 // 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-| // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-|
// | AU-headers-length | // | AU-headers-length |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-| // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-|
// | AU-header(1) | // | AU-header(1) |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-| // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-|
// | AU-header(2) | // | AU-header(2) |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-| // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-|
// | ... | // | ... |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-| // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-|
// | AU-header(n) | // | AU-header(n) |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-| // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-|
// | pading bits | // | pading bits |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-| // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-|
//
// 当 sizelength=6;indexlength=2;indexdeltalength=2 时 // 当 sizelength=6;indexlength=2;indexdeltalength=2 时
// 单帧封装时rtp payload的长度 = AU-header-lengths(两个字节) + AU-header(6+2) + AU的长度 // 单帧封装时rtp payload的长度 = AU-header-lengths(两个字节) + AU-header(6+2) + AU的长度
func (aacdp *aacDepacketizer) Depacketize(basePts int64, packet *Packet) (err error) { func (aacdp *aacDepacketizer) Depacketize(packet *Packet) (err error) {
if aacdp.syncClock.NTPTime == 0 { // 未收到同步时钟信息,忽略任意包 return aacdp.depacketizeFor2ByteAUHeader(packet)
return
}
return aacdp.depacketizeFor2ByteAUHeader(basePts, packet)
} }
func (aacdp *aacDepacketizer) depacketizeFor2ByteAUHeader(basePts int64, packet *Packet) (err error) { func (aacdp *aacDepacketizer) depacketizeFor2ByteAUHeader(packet *Packet) (err error) {
payload := packet.Payload() payload := packet.Payload()
// AU-headers-length 2bytes // AU-headers-length 2bytes
@@ -72,11 +68,11 @@ func (aacdp *aacDepacketizer) depacketizeFor2ByteAUHeader(basePts int64, packet
for i := 0; i < int(auHeadersCount); i++ { for i := 0; i < int(auHeadersCount); i++ {
auHeader := uint16(0) | (uint16(auHeaders[0]) << 8) | uint16(auHeaders[1]) auHeader := uint16(0) | (uint16(auHeaders[0]) << 8) | uint16(auHeaders[1])
frameSize := auHeader >> aacdp.indexLength frameSize := auHeader >> aacdp.indexLength
pts := aacdp.rtp2ntp(frameTimeStamp) - basePts + ptsDelay pts := aacdp.rtp2ntp(frameTimeStamp) + ptsDelay
frame := &codec.Frame{ frame := &codec.Frame{
MediaType: codec.MediaTypeAudio, MediaType: codec.MediaTypeAudio,
Dts: pts, Dts: pts,
Pts: pts, Pts: pts,
Payload: framesPayload[:frameSize], Payload: framesPayload[:frameSize],
} }
if err = aacdp.w.WriteFrame(frame); err != nil { if err = aacdp.w.WriteFrame(frame); err != nil {
@@ -92,7 +88,7 @@ func (aacdp *aacDepacketizer) depacketizeFor2ByteAUHeader(basePts int64, packet
return return
} }
func (aacdp *aacDepacketizer) depacketizeFor1ByteAUHeader(basePts int64, packet *Packet) (err error) { func (aacdp *aacDepacketizer) depacketizeFor1ByteAUHeader(packet *Packet) (err error) {
payload := packet.Payload() payload := packet.Payload()
// AU-headers-length 2bytes // AU-headers-length 2bytes
@@ -108,11 +104,11 @@ func (aacdp *aacDepacketizer) depacketizeFor1ByteAUHeader(basePts int64, packet
for i := 0; i < int(auHeadersCount); i++ { for i := 0; i < int(auHeadersCount); i++ {
auHeader := auHeaders[0] auHeader := auHeaders[0]
frameSize := auHeader >> aacdp.indexLength frameSize := auHeader >> aacdp.indexLength
pts := aacdp.rtp2ntp(frameTimeStamp) - basePts + ptsDelay pts := aacdp.rtp2ntp(frameTimeStamp) + ptsDelay
frame := &codec.Frame{ frame := &codec.Frame{
MediaType: codec.MediaTypeAudio, MediaType: codec.MediaTypeAudio,
Dts: pts, Dts: pts,
Pts: pts, Pts: pts,
Payload: framesPayload[:frameSize], Payload: framesPayload[:frameSize],
} }
if err = aacdp.w.WriteFrame(frame); err != nil { if err = aacdp.w.WriteFrame(frame); err != nil {

View File

@@ -16,45 +16,35 @@ import (
// 网络播放时 PTSPresentation Time Stamp的延时 // 网络播放时 PTSPresentation Time Stamp的延时
const ( const (
ptsDelay = int64(time.Second)/2 ptsDelay = int64(time.Second) / 2
) )
// Depacketizer 解包器 // Depacketizer 解包器
type Depacketizer interface { type Depacketizer interface {
Control(basePts *int64, p *Packet) error Control(p *Packet) error
Depacketize(basePts int64, p *Packet) error Depacketize(p *Packet) error
} }
type emptyDepacketizer struct{} type emptyDepacketizer struct{}
func (emptyDepacketizer) Control(basePts *int64, p *Packet) error { return nil } func (emptyDepacketizer) Control(p *Packet) error { return nil }
func (emptyDepacketizer) Depacketize(basePts int64, p *Packet) error { return nil } func (emptyDepacketizer) Depacketize(p *Packet) error { return nil }
type depacketizer struct { type depacketizer struct {
syncClock SyncClock syncClock SyncClock
} }
// func (dp *depacketizer) ForcInitSyncClock(basePts *int64, p *Packet) { func (dp *depacketizer) Control(p *Packet) error {
// if dp.syncClock.NTPTime == 0 { if dp.syncClock.RTPTime == 0 {
// dp.syncClock.RTPTime = p.Timestamp if ok := dp.syncClock.Decode(p.Data); ok {
// dp.syncClock.NTPTime = time.Now().Local().UnixNano()
// if *basePts == 0 {
// *basePts = dp.syncClock.NTPTime
// }
// }
// }
func (dp *depacketizer) Control(basePts *int64, p *Packet) error {
if ok := dp.syncClock.Decode(p.Data); ok {
if *basePts == 0 {
*basePts = dp.syncClock.NTPTime
} }
} }
return nil return nil
} }
func (dp *depacketizer) rtp2ntp(timestamp uint32) int64 { func (dp *depacketizer) rtp2ntp(timestamp uint32) int64 {
return dp.syncClock.Rtp2Ntp(timestamp) return dp.syncClock.RelativeNtp(timestamp)
} }
// Demuxer 帧转换器 // Demuxer 帧转换器
@@ -108,7 +98,6 @@ func (demuxer *Demuxer) process() {
demuxer.recvQueue.Reset() demuxer.recvQueue.Reset()
}() }()
var basePts int64
for !demuxer.closed { for !demuxer.closed {
p := demuxer.recvQueue.Pop() p := demuxer.recvQueue.Pop()
if p == nil { if p == nil {
@@ -122,13 +111,13 @@ func (demuxer *Demuxer) process() {
var err error var err error
switch packet.Channel { switch packet.Channel {
case ChannelVideo: case ChannelVideo:
err = demuxer.vdp.Depacketize(basePts, packet) err = demuxer.vdp.Depacketize(packet)
case ChannelVideoControl: case ChannelVideoControl:
err = demuxer.vdp.Control(&basePts, packet) err = demuxer.vdp.Control(packet)
case ChannelAudio: case ChannelAudio:
err = demuxer.adp.Depacketize(basePts, packet) err = demuxer.adp.Depacketize(packet)
case ChannelAudioControl: case ChannelAudioControl:
err = demuxer.adp.Control(&basePts, packet) err = demuxer.adp.Control(packet)
} }
if err != nil { if err != nil {

View File

@@ -19,7 +19,6 @@ type h264Depacketizer struct {
metaReady bool metaReady bool
nextDts float64 nextDts float64
dtsStep float64 dtsStep float64
startOn time.Time
w codec.FrameWriter w codec.FrameWriter
} }
@@ -30,15 +29,11 @@ func NewH264Depacketizer(meta *codec.VideoMeta, w codec.FrameWriter) Depacketize
fragments: make([]*Packet, 0, 16), fragments: make([]*Packet, 0, 16),
w: w, w: w,
} }
h264dp.syncClock.RTPTimeUnit = float64(time.Second) / float64(meta.ClockRate) h264dp.syncClock.Init(meta.ClockRate)
return h264dp return h264dp
} }
func (h264dp *h264Depacketizer) Depacketize(basePts int64, packet *Packet) (err error) { func (h264dp *h264Depacketizer) Depacketize(packet *Packet) (err error) {
if h264dp.syncClock.NTPTime == 0 { // 未收到同步时钟信息,忽略任意包
return
}
payload := packet.Payload() payload := packet.Payload()
if len(payload) < 3 { if len(payload) < 3 {
return return
@@ -69,18 +64,18 @@ func (h264dp *h264Depacketizer) Depacketize(basePts int64, packet *Packet) (err
MediaType: codec.MediaTypeVideo, MediaType: codec.MediaTypeVideo,
Payload: payload, Payload: payload,
} }
err = h264dp.writeFrame(basePts, packet.Timestamp, frame) err = h264dp.writeFrame(packet.Timestamp, frame)
case naluType == h264.NalStapaInRtp: case naluType == h264.NalStapaInRtp:
err = h264dp.depacketizeStapa(basePts, packet) err = h264dp.depacketizeStapa(packet)
case naluType == h264.NalFuAInRtp: case naluType == h264.NalFuAInRtp:
err = h264dp.depacketizeFuA(basePts, packet) err = h264dp.depacketizeFuA(packet)
default: default:
err = fmt.Errorf("nalu type %d is currently not handled", naluType) err = fmt.Errorf("nalu type %d is currently not handled", naluType)
} }
return return
} }
func (h264dp *h264Depacketizer) depacketizeStapa(basePts int64, packet *Packet) (err error) { func (h264dp *h264Depacketizer) depacketizeStapa(packet *Packet) (err error) {
payload := packet.Payload() payload := packet.Payload()
header := payload[0] header := payload[0]
@@ -115,7 +110,7 @@ func (h264dp *h264Depacketizer) depacketizeStapa(basePts int64, packet *Packet)
} }
copy(frame.Payload, payload[off:]) copy(frame.Payload, payload[off:])
frame.Payload[0] = 0 | (header & 0x60) | (frame.Payload[0] & 0x1F) frame.Payload[0] = 0 | (header & 0x60) | (frame.Payload[0] & 0x1F)
if err = h264dp.writeFrame(basePts, packet.Timestamp, frame); err != nil { if err = h264dp.writeFrame(packet.Timestamp, frame); err != nil {
return return
} }
@@ -127,7 +122,7 @@ func (h264dp *h264Depacketizer) depacketizeStapa(basePts int64, packet *Packet)
return return
} }
func (h264dp *h264Depacketizer) depacketizeFuA(basePts int64, packet *Packet) (err error) { func (h264dp *h264Depacketizer) depacketizeFuA(packet *Packet) (err error) {
payload := packet.Payload() payload := packet.Payload()
header := payload[0] header := payload[0]
@@ -182,13 +177,13 @@ func (h264dp *h264Depacketizer) depacketizeFuA(basePts int64, packet *Packet) (e
// 清空分片缓存 // 清空分片缓存
h264dp.fragments = h264dp.fragments[:0] h264dp.fragments = h264dp.fragments[:0]
err = h264dp.writeFrame(basePts, packet.Timestamp, frame) err = h264dp.writeFrame(packet.Timestamp, frame)
} }
return return
} }
func (h264dp *h264Depacketizer) writeFrame(basePts int64, rtpTimestamp uint32, frame *codec.Frame) error { func (h264dp *h264Depacketizer) writeFrame(rtpTimestamp uint32, frame *codec.Frame) error {
nalType := frame.Payload[0] & 0x1f nalType := frame.Payload[0] & 0x1f
switch nalType { switch nalType {
case h264.NalSps: case h264.NalSps:
@@ -209,18 +204,16 @@ func (h264dp *h264Depacketizer) writeFrame(basePts int64, rtpTimestamp uint32, f
} }
if h264dp.meta.FixedFrameRate { if h264dp.meta.FixedFrameRate {
h264dp.dtsStep = float64(time.Second) / h264dp.meta.FrameRate h264dp.dtsStep = float64(time.Second) / h264dp.meta.FrameRate
} else {
h264dp.startOn = time.Now()
} }
h264dp.metaReady = true h264dp.metaReady = true
} }
frame.Pts = h264dp.rtp2ntp(rtpTimestamp) - basePts + ptsDelay frame.Pts = h264dp.rtp2ntp(rtpTimestamp) + ptsDelay
if h264dp.dtsStep > 0 { if h264dp.dtsStep > 0 {
frame.Dts = int64(h264dp.nextDts) frame.Dts = int64(h264dp.nextDts)
h264dp.nextDts += h264dp.dtsStep h264dp.nextDts += h264dp.dtsStep
} else { } else {
frame.Dts = int64(time.Now().Sub(h264dp.startOn)) frame.Dts = h264dp.syncClock.RelativeNtpNow()
} }
return h264dp.w.WriteFrame(frame) return h264dp.w.WriteFrame(frame)
} }

View File

@@ -18,7 +18,6 @@ type h265Depacketizer struct {
metaReady bool metaReady bool
nextDts float64 nextDts float64
dtsStep float64 dtsStep float64
startOn time.Time
w codec.FrameWriter w codec.FrameWriter
} }
@@ -29,7 +28,7 @@ func NewH265Depacketizer(meta *codec.VideoMeta, w codec.FrameWriter) Depacketize
fragments: make([]*Packet, 0, 16), fragments: make([]*Packet, 0, 16),
w: w, w: w,
} }
h265dp.syncClock.RTPTimeUnit = float64(time.Second) / float64(meta.ClockRate) h265dp.syncClock.Init(meta.ClockRate)
return h265dp return h265dp
} }
@@ -57,11 +56,7 @@ func NewH265Depacketizer(meta *codec.VideoMeta, w codec.FrameWriter) Depacketize
* End fragment (E): 1 bit * End fragment (E): 1 bit
* FuType: 6 bits * FuType: 6 bits
*/ */
func (h265dp *h265Depacketizer) Depacketize(basePts int64, packet *Packet) (err error) { func (h265dp *h265Depacketizer) Depacketize(packet *Packet) (err error) {
if h265dp.syncClock.NTPTime == 0 { // 未收到同步时钟信息,忽略任意包
return
}
payload := packet.Payload() payload := packet.Payload()
if len(payload) < 3 { if len(payload) < 3 {
return return
@@ -71,20 +66,20 @@ func (h265dp *h265Depacketizer) Depacketize(basePts int64, packet *Packet) (err
switch naluType { switch naluType {
case hevc.NalStapInRtp: // 在RTP中的聚合AP case hevc.NalStapInRtp: // 在RTP中的聚合AP
return h265dp.depacketizeStap(basePts, packet) return h265dp.depacketizeStap(packet)
case hevc.NalFuInRtp: // 在RTP中的扩展,分片(FU) case hevc.NalFuInRtp: // 在RTP中的扩展,分片(FU)
return h265dp.depacketizeFu(basePts, packet) return h265dp.depacketizeFu(packet)
default: default:
frame := &codec.Frame{ frame := &codec.Frame{
MediaType: codec.MediaTypeVideo, MediaType: codec.MediaTypeVideo,
Payload: payload, Payload: payload,
} }
err = h265dp.writeFrame(basePts, packet.Timestamp, frame) err = h265dp.writeFrame(packet.Timestamp, frame)
return return
} }
} }
func (h265dp *h265Depacketizer) depacketizeStap(basePts int64, packet *Packet) (err error) { func (h265dp *h265Depacketizer) depacketizeStap(packet *Packet) (err error) {
payload := packet.Payload() payload := packet.Payload()
off := 2 // 跳过 STAP NAL HDR off := 2 // 跳过 STAP NAL HDR
@@ -102,7 +97,7 @@ func (h265dp *h265Depacketizer) depacketizeStap(basePts int64, packet *Packet) (
Payload: make([]byte, nalSize), Payload: make([]byte, nalSize),
} }
copy(frame.Payload, payload[off:]) copy(frame.Payload, payload[off:])
if err = h265dp.writeFrame(basePts, packet.Timestamp, frame); err != nil { if err = h265dp.writeFrame(packet.Timestamp, frame); err != nil {
return return
} }
off += int(nalSize) off += int(nalSize)
@@ -113,7 +108,7 @@ func (h265dp *h265Depacketizer) depacketizeStap(basePts int64, packet *Packet) (
return return
} }
func (h265dp *h265Depacketizer) depacketizeFu(basePts int64, packet *Packet) (err error) { func (h265dp *h265Depacketizer) depacketizeFu(packet *Packet) (err error) {
payload := packet.Payload() payload := packet.Payload()
rawDataOffset := 3 // 原始数据的偏移 = FU indicator + header rawDataOffset := 3 // 原始数据的偏移 = FU indicator + header
@@ -162,13 +157,13 @@ func (h265dp *h265Depacketizer) depacketizeFu(basePts int64, packet *Packet) (er
// 清空分片缓存 // 清空分片缓存
h265dp.fragments = h265dp.fragments[:0] h265dp.fragments = h265dp.fragments[:0]
err = h265dp.writeFrame(basePts, packet.Timestamp, frame) err = h265dp.writeFrame(packet.Timestamp, frame)
} }
return return
} }
func (h265dp *h265Depacketizer) writeFrame(basePts int64, rtpTimestamp uint32, frame *codec.Frame) error { func (h265dp *h265Depacketizer) writeFrame(rtpTimestamp uint32, frame *codec.Frame) error {
nalType := (frame.Payload[0] >> 1) & 0x3f nalType := (frame.Payload[0] >> 1) & 0x3f
switch nalType { switch nalType {
case hevc.NalVps: case hevc.NalVps:
@@ -191,18 +186,16 @@ func (h265dp *h265Depacketizer) writeFrame(basePts int64, rtpTimestamp uint32, f
} }
if h265dp.meta.FixedFrameRate { if h265dp.meta.FixedFrameRate {
h265dp.dtsStep = float64(time.Second) / h265dp.meta.FrameRate h265dp.dtsStep = float64(time.Second) / h265dp.meta.FrameRate
} else {
h265dp.startOn = time.Now()
} }
h265dp.metaReady = true h265dp.metaReady = true
} }
frame.Pts = h265dp.rtp2ntp(rtpTimestamp) - basePts + ptsDelay frame.Pts = h265dp.rtp2ntp(rtpTimestamp) + ptsDelay
if h265dp.dtsStep > 0 { if h265dp.dtsStep > 0 {
frame.Dts = int64(h265dp.nextDts) frame.Dts = int64(h265dp.nextDts)
h265dp.nextDts += h265dp.dtsStep h265dp.nextDts += h265dp.dtsStep
} else { } else {
frame.Dts = int64(time.Now().Sub(h265dp.startOn)) frame.Dts = h265dp.syncClock.RelativeNtpNow()
} }
return h265dp.w.WriteFrame(frame) return h265dp.w.WriteFrame(frame)
} }

View File

@@ -22,6 +22,15 @@ type SyncClock struct {
// 与RTP数据包中的RTP时间戳具有相同的单位和随机初始值。 // 与RTP数据包中的RTP时间戳具有相同的单位和随机初始值。
RTPTime uint32 RTPTime uint32
RTPTimeUnit float64 // RTP时间单位每个RTP时间的纳秒数 RTPTimeUnit float64 // RTP时间单位每个RTP时间的纳秒数
initOn time.Time // 初始化时间
}
// Init 初始化同步时钟
func (sc *SyncClock) Init(clockRate int) {
sc.initOn = time.Now()
sc.NTPTime = sc.initOn.UnixNano()
sc.RTPTimeUnit = float64(time.Second) / float64(clockRate)
} }
// LocalTime 本地时间 // LocalTime 本地时间
@@ -41,8 +50,19 @@ func (sc *SyncClock) Decode(data []byte) (ok bool) {
return return
} }
// Rtp2Ntp . // GetRelativeNtp .
func (sc *SyncClock) Rtp2Ntp(rtptime uint32) int64 { func (sc *SyncClock) RelativeNtpNow() int64 {
return int64(time.Now().Sub(sc.initOn))
}
// RelativeNtp .
func (sc *SyncClock) RelativeNtp(rtptime uint32) int64 {
diff := int64(rtptime) - int64(sc.RTPTime)
return int64(float64(diff) * sc.RTPTimeUnit)
}
// AbsoluteNtp .
func (sc *SyncClock) AbsoluteNtp(rtptime uint32) int64 {
diff := int64(rtptime) - int64(sc.RTPTime) diff := int64(rtptime) - int64(sc.RTPTime)
return sc.NTPTime + int64(float64(diff)*sc.RTPTimeUnit) return sc.NTPTime + int64(float64(diff)*sc.RTPTimeUnit)
} }