refactoring dts and pts process

This commit is contained in:
notch
2021-01-18 11:44:04 +08:00
parent b7bd9ec7d7
commit 710cc55d88
14 changed files with 241 additions and 276 deletions

View File

@@ -74,9 +74,10 @@ func (mt *MediaType) unmarshalText(text string) bool {
// Frame 音视频完整帧 // Frame 音视频完整帧
type Frame struct { type Frame struct {
MediaType // 媒体类型 MediaType // 媒体类型
AbsTimestamp int64 // 绝对时间戳(主要用于表示 pts),单位为 ms 的 UNIX 时间 Dts int64 // DTS,单位为 ns
Payload []byte // 媒体数据载荷 Pts int64 // PTS,单位为 ns
Payload []byte // 媒体数据载荷
} }
// FrameWriter 包装 WriteFrame 方法的接口 // FrameWriter 包装 WriteFrame 方法的接口

View File

@@ -4,7 +4,11 @@
package flv package flv
import "github.com/cnotch/ipchub/av/codec" import (
"time"
"github.com/cnotch/ipchub/av/codec"
)
// in ms, for aac flush the audio // in ms, for aac flush the audio
const aacDelay = 100 const aacDelay = 100
@@ -13,7 +17,6 @@ type aacPacketizer struct {
meta *codec.AudioMeta meta *codec.AudioMeta
dataTemplate *AudioData dataTemplate *AudioData
tagWriter TagWriter tagWriter TagWriter
spsMuxed bool
} }
func NewAacPacketizer(meta *codec.AudioMeta, tagWriter TagWriter) Packetizer { func NewAacPacketizer(meta *codec.AudioMeta, tagWriter TagWriter) Packetizer {
@@ -61,11 +64,6 @@ func (ap *aacPacketizer) prepareTemplate() {
} }
func (ap *aacPacketizer) PacketizeSequenceHeader() error { func (ap *aacPacketizer) PacketizeSequenceHeader() error {
if ap.spsMuxed {
return nil
}
ap.spsMuxed = true
audioData := *ap.dataTemplate audioData := *ap.dataTemplate
audioData.AACPacketType = AACPacketTypeSequenceHeader audioData.AACPacketType = AACPacketTypeSequenceHeader
audioData.Body = ap.meta.Sps audioData.Body = ap.meta.Sps
@@ -81,12 +79,12 @@ func (ap *aacPacketizer) PacketizeSequenceHeader() error {
return ap.tagWriter.WriteFlvTag(tag) return ap.tagWriter.WriteFlvTag(tag)
} }
func (ap *aacPacketizer) Packetize(basePts int64, frame *codec.Frame) error { func (ap *aacPacketizer) Packetize(frame *codec.Frame) error {
audioData := *ap.dataTemplate audioData := *ap.dataTemplate
audioData.Body = frame.Payload audioData.Body = frame.Payload
data, _ := audioData.Marshal() data, _ := audioData.Marshal()
pts := aacDelay + frame.AbsTimestamp - basePts + ptsDelay pts := frame.Pts / int64(time.Millisecond)
tag := &Tag{ tag := &Tag{
TagType: TagTypeAudio, TagType: TagTypeAudio,
DataSize: uint32(len(data)), DataSize: uint32(len(data)),

View File

@@ -5,6 +5,8 @@
package flv package flv
import ( import (
"time"
"github.com/cnotch/ipchub/av/codec" "github.com/cnotch/ipchub/av/codec"
"github.com/cnotch/ipchub/av/codec/h264" "github.com/cnotch/ipchub/av/codec/h264"
) )
@@ -12,9 +14,6 @@ import (
type h264Packetizer struct { type h264Packetizer struct {
meta *codec.VideoMeta meta *codec.VideoMeta
tagWriter TagWriter tagWriter TagWriter
spsMuxed bool
nextDts float64
dtsStep float64
} }
func NewH264Packetizer(meta *codec.VideoMeta, tagWriter TagWriter) Packetizer { func NewH264Packetizer(meta *codec.VideoMeta, tagWriter TagWriter) Packetizer {
@@ -22,38 +21,17 @@ func NewH264Packetizer(meta *codec.VideoMeta, tagWriter TagWriter) Packetizer {
meta: meta, meta: meta,
tagWriter: tagWriter, tagWriter: tagWriter,
} }
if meta.FrameRate > 0 {
h264p.dtsStep = 1000.0 / meta.FrameRate
}
return h264p return h264p
} }
func (h264p *h264Packetizer) PacketizeSequenceHeader() error { func (h264p *h264Packetizer) PacketizeSequenceHeader() error {
if h264p.spsMuxed {
return nil
}
if !h264.MetadataIsReady(h264p.meta) {
// not enough
return nil
}
h264p.spsMuxed = true
if h264p.meta.FixedFrameRate {
h264p.dtsStep = 1000.0 / h264p.meta.FrameRate
} else { // TODO:
h264p.dtsStep = 1000.0 / 30
}
record := NewAVCDecoderConfigurationRecord(h264p.meta.Sps, h264p.meta.Pps) record := NewAVCDecoderConfigurationRecord(h264p.meta.Sps, h264p.meta.Pps)
body, _ := record.Marshal() body, _ := record.Marshal()
videoData := &VideoData{ videoData := &VideoData{
FrameType: FrameTypeKeyFrame, FrameType: FrameTypeKeyFrame,
CodecID: CodecIDAVC, CodecID: CodecIDAVC,
H2645PacketType: H2645PacketTypeSequenceHeader, H2645PacketType: H2645PacketTypeSequenceHeader,
CompositionTime: 0, CompositionTime: 0,
Body: body, Body: body,
} }
@@ -70,26 +48,15 @@ func (h264p *h264Packetizer) PacketizeSequenceHeader() error {
return h264p.tagWriter.WriteFlvTag(tag) return h264p.tagWriter.WriteFlvTag(tag)
} }
func (h264p *h264Packetizer) Packetize(basePts int64, frame *codec.Frame) error { func (h264p *h264Packetizer) Packetize(frame *codec.Frame) error {
if frame.Payload[0]&0x1F == h264.NalSps {
return h264p.PacketizeSequenceHeader()
}
if frame.Payload[0]&0x1F == h264.NalPps { dts := frame.Dts / int64(time.Millisecond)
return h264p.PacketizeSequenceHeader() pts := frame.Pts / int64(time.Millisecond)
}
dts := int64(h264p.nextDts)
h264p.nextDts += h264p.dtsStep
pts := frame.AbsTimestamp - basePts + ptsDelay
if dts > pts {
pts = dts
}
videoData := &VideoData{ videoData := &VideoData{
FrameType: FrameTypeInterFrame, FrameType: FrameTypeInterFrame,
CodecID: CodecIDAVC, CodecID: CodecIDAVC,
H2645PacketType: H2645PacketTypeNALU, H2645PacketType: H2645PacketTypeNALU,
CompositionTime: uint32(pts - dts), CompositionTime: uint32(pts - dts),
Body: frame.Payload, Body: frame.Payload,
} }

View File

@@ -5,6 +5,8 @@
package flv package flv
import ( import (
"time"
"github.com/cnotch/ipchub/av/codec" "github.com/cnotch/ipchub/av/codec"
"github.com/cnotch/ipchub/av/codec/hevc" "github.com/cnotch/ipchub/av/codec/hevc"
) )
@@ -12,9 +14,6 @@ import (
type h265Packetizer struct { type h265Packetizer struct {
meta *codec.VideoMeta meta *codec.VideoMeta
tagWriter TagWriter tagWriter TagWriter
spsMuxed bool
nextDts float64
dtsStep float64
} }
func NewH265Packetizer(meta *codec.VideoMeta, tagWriter TagWriter) Packetizer { func NewH265Packetizer(meta *codec.VideoMeta, tagWriter TagWriter) Packetizer {
@@ -22,31 +21,10 @@ func NewH265Packetizer(meta *codec.VideoMeta, tagWriter TagWriter) Packetizer {
meta: meta, meta: meta,
tagWriter: tagWriter, tagWriter: tagWriter,
} }
if meta.FrameRate > 0 {
h265p.dtsStep = 1000.0 / meta.FrameRate
}
return h265p return h265p
} }
func (h265p *h265Packetizer) PacketizeSequenceHeader() error { func (h265p *h265Packetizer) PacketizeSequenceHeader() error {
if h265p.spsMuxed {
return nil
}
if !hevc.MetadataIsReady(h265p.meta) {
// not enough
return nil
}
h265p.spsMuxed = true
if h265p.meta.FixedFrameRate {
h265p.dtsStep = 1000.0 / h265p.meta.FrameRate
} else { // TODO:
h265p.dtsStep = 1000.0 / 30
}
record := NewHEVCDecoderConfigurationRecord(h265p.meta.Vps, h265p.meta.Sps, h265p.meta.Pps) record := NewHEVCDecoderConfigurationRecord(h265p.meta.Vps, h265p.meta.Sps, h265p.meta.Pps)
body, _ := record.Marshal() body, _ := record.Marshal()
@@ -70,20 +48,10 @@ func (h265p *h265Packetizer) PacketizeSequenceHeader() error {
return h265p.tagWriter.WriteFlvTag(tag) return h265p.tagWriter.WriteFlvTag(tag)
} }
func (h265p *h265Packetizer) Packetize(basePts int64, frame *codec.Frame) error { func (h265p *h265Packetizer) Packetize(frame *codec.Frame) error {
nalType := (frame.Payload[0] >> 1) & 0x3f nalType := (frame.Payload[0] >> 1) & 0x3f
if nalType == hevc.NalVps || dts := frame.Dts / int64(time.Millisecond)
nalType == hevc.NalSps || pts := frame.Pts / int64(time.Millisecond)
nalType == hevc.NalPps {
return h265p.PacketizeSequenceHeader()
}
dts := int64(h265p.nextDts)
h265p.nextDts += h265p.dtsStep
pts := frame.AbsTimestamp - basePts + ptsDelay
if dts > pts {
pts = dts
}
videoData := &VideoData{ videoData := &VideoData{
FrameType: FrameTypeInterFrame, FrameType: FrameTypeInterFrame,

View File

@@ -18,19 +18,13 @@ import (
// Packetizer 封包器 // Packetizer 封包器
type Packetizer interface { type Packetizer interface {
PacketizeSequenceHeader() error PacketizeSequenceHeader() error
Packetize(basePts int64, frame *codec.Frame) error Packetize(frame *codec.Frame) error
} }
type emptyPacketizer struct{} type emptyPacketizer struct{}
func (emptyPacketizer) PacketizeSequenceHeader() error { return nil } func (emptyPacketizer) PacketizeSequenceHeader() error { return nil }
func (emptyPacketizer) Packetize(basePts int64, frame *codec.Frame) error { return nil } func (emptyPacketizer) Packetize(frame *codec.Frame) error { return nil }
// 网络播放时 PTS(Presentation Time Stamp)的延时
// 影响视频 Tag 的 CTS 和音频的 DTS(Decoding Time Stamp)
const (
ptsDelay = 1000
)
// Muxer flv muxer from av.Frame(H264[+AAC]) // Muxer flv muxer from av.Frame(H264[+AAC])
type Muxer struct { type Muxer struct {
@@ -113,7 +107,8 @@ func (muxer *Muxer) process() {
muxer.recvQueue.Reset() muxer.recvQueue.Reset()
}() }()
var basePts int64 var packSequenceHeader bool
for !muxer.closed { for !muxer.closed {
f := muxer.recvQueue.Pop() f := muxer.recvQueue.Pop()
if f == nil { if f == nil {
@@ -123,21 +118,22 @@ func (muxer *Muxer) process() {
continue continue
} }
frame := f.(*codec.Frame) if !packSequenceHeader{
if basePts == 0 {
basePts = frame.AbsTimestamp
muxer.muxMetadataTag() muxer.muxMetadataTag()
muxer.vp.PacketizeSequenceHeader() muxer.vp.PacketizeSequenceHeader()
muxer.ap.PacketizeSequenceHeader() muxer.ap.PacketizeSequenceHeader()
packSequenceHeader = true
} }
frame := f.(*codec.Frame)
switch frame.MediaType { switch frame.MediaType {
case codec.MediaTypeVideo: case codec.MediaTypeVideo:
if err := muxer.vp.Packetize(basePts, frame); err != nil { if err := muxer.vp.Packetize(frame); err != nil {
muxer.logger.Errorf("flvmuxer: muxVideoTag error - %s", err.Error()) muxer.logger.Errorf("flvmuxer: muxVideoTag error - %s", err.Error())
} }
case codec.MediaTypeAudio: case codec.MediaTypeAudio:
if err := muxer.ap.Packetize(basePts, frame); err != nil { if err := muxer.ap.Packetize(frame); err != nil {
muxer.logger.Errorf("flvmuxer: muxAudioTag error - %s", err.Error()) muxer.logger.Errorf("flvmuxer: muxAudioTag error - %s", err.Error())
} }
default: default:

View File

@@ -6,6 +6,7 @@ package mpegts
import ( import (
"fmt" "fmt"
"time"
"github.com/cnotch/ipchub/av/codec" "github.com/cnotch/ipchub/av/codec"
"github.com/cnotch/ipchub/av/codec/aac" "github.com/cnotch/ipchub/av/codec/aac"
@@ -48,9 +49,8 @@ func (ap *aacPacketizer) prepareAsc() (err error) {
return return
} }
func (ap *aacPacketizer) Packetize(basePts int64, frame *codec.Frame) error { func (ap *aacPacketizer) Packetize(frame *codec.Frame) error {
pts := frame.AbsTimestamp - basePts + ptsDelay pts := frame.Pts * 90000 / int64(time.Second) // 90000Hz
pts *= 90
// set fields // set fields
tsframe := &Frame{ tsframe := &Frame{

View File

@@ -5,6 +5,8 @@
package mpegts package mpegts
import ( import (
"time"
"github.com/cnotch/ipchub/av/codec" "github.com/cnotch/ipchub/av/codec"
"github.com/cnotch/ipchub/av/codec/h264" "github.com/cnotch/ipchub/av/codec/h264"
) )
@@ -12,9 +14,6 @@ import (
type h264Packetizer struct { type h264Packetizer struct {
meta *codec.VideoMeta meta *codec.VideoMeta
tsframeWriter FrameWriter tsframeWriter FrameWriter
metaReady bool
nextDts float64
dtsStep float64
} }
func NewH264Packetizer(meta *codec.VideoMeta, tsframeWriter FrameWriter) Packetizer { func NewH264Packetizer(meta *codec.VideoMeta, tsframeWriter FrameWriter) Packetizer {
@@ -22,56 +21,22 @@ func NewH264Packetizer(meta *codec.VideoMeta, tsframeWriter FrameWriter) Packeti
meta: meta, meta: meta,
tsframeWriter: tsframeWriter, tsframeWriter: tsframeWriter,
} }
h264p.prepareMetadata()
return h264p return h264p
} }
func (h264p *h264Packetizer) prepareMetadata() error { func (h264p *h264Packetizer) Packetize(frame *codec.Frame) error {
if h264p.metaReady { nalType := frame.Payload[0] & 0x1F
return nil
}
if !h264.MetadataIsReady(h264p.meta) {
// not enough
return nil
}
if h264p.meta.FixedFrameRate {
h264p.dtsStep = 1000.0 / h264p.meta.FrameRate
} else { // TODO:
h264p.dtsStep = 1000.0 / 30
}
h264p.metaReady = true
return nil
}
func (h264p *h264Packetizer) Packetize(basePts int64, frame *codec.Frame) error {
if frame.Payload[0]&0x1F == h264.NalSps {
return h264p.prepareMetadata()
}
if frame.Payload[0]&0x1F == h264.NalPps {
return h264p.prepareMetadata()
}
dts := int64(h264p.nextDts)
h264p.nextDts += h264p.dtsStep
pts := frame.AbsTimestamp - basePts + ptsDelay
if dts > pts {
pts = dts
}
dts := frame.Dts * 90000 / int64(time.Second) // 90000Hz
pts := frame.Pts * 90000 / int64(time.Second) // 90000Hz
// set fields // set fields
tsframe := &Frame{ tsframe := &Frame{
Pid: tsVideoPid, Pid: tsVideoPid,
StreamID: tsVideoAvc, StreamID: tsVideoAvc,
Dts: dts * 90, Dts: dts,
Pts: pts * 90, Pts: pts,
Payload: frame.Payload, Payload: frame.Payload,
key: frame.Payload[0]&0x1F == h264.NalIdrSlice, key: nalType == h264.NalIdrSlice,
} }
tsframe.prepareAvcHeader(h264p.meta.Sps, h264p.meta.Pps) tsframe.prepareAvcHeader(h264p.meta.Sps, h264p.meta.Pps)

View File

@@ -15,18 +15,12 @@ import (
// Packetizer 封包器 // Packetizer 封包器
type Packetizer interface { type Packetizer interface {
Packetize(basePts int64, frame *codec.Frame) error Packetize(frame *codec.Frame) error
} }
type emptyPacketizer struct{} type emptyPacketizer struct{}
func (emptyPacketizer) Packetize(basePts int64, frame *codec.Frame) error { return nil } func (emptyPacketizer) Packetize(frame *codec.Frame) error { return nil }
// 网络播放时 PTS(Presentation Time Stamp)的延时
// 影响视频 Tag 的 CTS 和音频的 DTS(Decoding Time Stamp)
const (
ptsDelay = 1000
)
// Muxer mpegts muxer from av.Frame(H264[+AAC]) // Muxer mpegts muxer from av.Frame(H264[+AAC])
type Muxer struct { type Muxer struct {
@@ -94,7 +88,6 @@ func (muxer *Muxer) process(vp, ap Packetizer) {
muxer.recvQueue.Reset() muxer.recvQueue.Reset()
}() }()
var basePts int64
for !muxer.closed { for !muxer.closed {
f := muxer.recvQueue.Pop() f := muxer.recvQueue.Pop()
if f == nil { if f == nil {
@@ -105,17 +98,14 @@ func (muxer *Muxer) process(vp, ap Packetizer) {
} }
frame := f.(*codec.Frame) frame := f.(*codec.Frame)
if basePts == 0 {
basePts = frame.AbsTimestamp
}
switch frame.MediaType { switch frame.MediaType {
case codec.MediaTypeVideo: case codec.MediaTypeVideo:
if err := vp.Packetize(basePts, frame); err != nil { if err := vp.Packetize(frame); err != nil {
muxer.logger.Errorf("tsmuxer: muxVideoTag error - %s", err.Error()) muxer.logger.Errorf("tsmuxer: muxVideoTag error - %s", err.Error())
} }
case codec.MediaTypeAudio: case codec.MediaTypeAudio:
if err := ap.Packetize(basePts, frame); err != nil { if err := ap.Packetize(frame); err != nil {
muxer.logger.Errorf("tsmuxer: muxAudioTag error - %s", err.Error()) muxer.logger.Errorf("tsmuxer: muxAudioTag error - %s", err.Error())
} }
default: default:

View File

@@ -5,33 +5,38 @@
package rtp package rtp
import ( import (
"time"
"github.com/cnotch/ipchub/av/codec" "github.com/cnotch/ipchub/av/codec"
"github.com/cnotch/ipchub/av/codec/aac" "github.com/cnotch/ipchub/av/codec/aac"
) )
type aacDepacketizer struct { type aacDepacketizer struct {
meta *codec.AudioMeta meta *codec.AudioMeta
w codec.FrameWriter w codec.FrameWriter
sizeLength int sizeLength int
indexLength int indexLength int
// depacketizeFunc func(packet *Packet) error
syncClock SyncClock syncClock SyncClock
} }
// NewAacDepacketizer 实例化 AAC 解包器 // NewAacDepacketizer 实例化 AAC 解包器
func NewAacDepacketizer(meta *codec.AudioMeta, w codec.FrameWriter) Depacketizer { func NewAacDepacketizer(meta *codec.AudioMeta, w codec.FrameWriter) Depacketizer {
fe := &aacDepacketizer{ aacdp := &aacDepacketizer{
meta: meta, meta: meta,
w: w, w: w,
sizeLength: 13, sizeLength: 13,
indexLength: 3, indexLength: 3,
} }
fe.syncClock.RTPTimeUnit = 1000.0 / float64(meta.SampleRate) aacdp.syncClock.RTPTimeUnit = float64(time.Second) / float64(meta.SampleRate)
return fe return aacdp
} }
func (aacdp *aacDepacketizer) Control(p *Packet) error { func (aacdp *aacDepacketizer) Control(basePts *int64, p *Packet) error {
aacdp.syncClock.Decode(p.Data) if ok := aacdp.syncClock.Decode(p.Data); ok {
if *basePts == 0 {
*basePts = aacdp.syncClock.NTPTime
}
}
return nil return nil
} }
@@ -53,14 +58,14 @@ func (aacdp *aacDepacketizer) Control(p *Packet) error {
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-| // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-|
// 当 sizelength=6;indexlength=2;indexdeltalength=2 时 // 当 sizelength=6;indexlength=2;indexdeltalength=2 时
// 单帧封装时rtp payload的长度 = AU-header-lengths(两个字节) + AU-header(6+2) + AU的长度 // 单帧封装时rtp payload的长度 = AU-header-lengths(两个字节) + AU-header(6+2) + AU的长度
func (aacdp *aacDepacketizer) Depacketize(packet *Packet) (err error) { func (aacdp *aacDepacketizer) Depacketize(basePts int64, packet *Packet) (err error) {
if aacdp.syncClock.NTPTime == 0 { // 未收到同步时钟信息,忽略任意包 if aacdp.syncClock.NTPTime == 0 { // 未收到同步时钟信息,忽略任意包
return return
} }
return aacdp.depacketizeFor2ByteAUHeader(packet) return aacdp.depacketizeFor2ByteAUHeader(basePts, packet)
} }
func (aacdp *aacDepacketizer) depacketizeFor2ByteAUHeader(packet *Packet) (err error) { func (aacdp *aacDepacketizer) depacketizeFor2ByteAUHeader(basePts int64, packet *Packet) (err error) {
payload := packet.Payload() payload := packet.Payload()
// AU-headers-length 2bytes // AU-headers-length 2bytes
@@ -76,10 +81,12 @@ func (aacdp *aacDepacketizer) depacketizeFor2ByteAUHeader(packet *Packet) (err e
for i := 0; i < int(auHeadersCount); i++ { for i := 0; i < int(auHeadersCount); i++ {
auHeader := uint16(0) | (uint16(auHeaders[0]) << 8) | uint16(auHeaders[1]) auHeader := uint16(0) | (uint16(auHeaders[0]) << 8) | uint16(auHeaders[1])
frameSize := auHeader >> aacdp.indexLength frameSize := auHeader >> aacdp.indexLength
pts := aacdp.rtp2ntp(frameTimeStamp) - basePts + ptsDelay
frame := &codec.Frame{ frame := &codec.Frame{
MediaType: codec.MediaTypeAudio, MediaType: codec.MediaTypeAudio,
AbsTimestamp: aacdp.rtp2ntp(frameTimeStamp), Dts: pts,
Payload: framesPayload[:frameSize], Pts: pts,
Payload: framesPayload[:frameSize],
} }
if err = aacdp.w.WriteFrame(frame); err != nil { if err = aacdp.w.WriteFrame(frame); err != nil {
return return
@@ -94,7 +101,7 @@ func (aacdp *aacDepacketizer) depacketizeFor2ByteAUHeader(packet *Packet) (err e
return return
} }
func (aacdp *aacDepacketizer) depacketizeFor1ByteAUHeader(packet *Packet) (err error) { func (aacdp *aacDepacketizer) depacketizeFor1ByteAUHeader(basePts int64, packet *Packet) (err error) {
payload := packet.Payload() payload := packet.Payload()
// AU-headers-length 2bytes // AU-headers-length 2bytes
@@ -110,10 +117,12 @@ func (aacdp *aacDepacketizer) depacketizeFor1ByteAUHeader(packet *Packet) (err e
for i := 0; i < int(auHeadersCount); i++ { for i := 0; i < int(auHeadersCount); i++ {
auHeader := auHeaders[0] auHeader := auHeaders[0]
frameSize := auHeader >> aacdp.indexLength frameSize := auHeader >> aacdp.indexLength
pts := aacdp.rtp2ntp(frameTimeStamp) - basePts + ptsDelay
frame := &codec.Frame{ frame := &codec.Frame{
MediaType: codec.MediaTypeAudio, MediaType: codec.MediaTypeAudio,
AbsTimestamp: aacdp.rtp2ntp(frameTimeStamp), Dts: pts,
Payload: framesPayload[:frameSize], Pts: pts,
Payload: framesPayload[:frameSize],
} }
if err = aacdp.w.WriteFrame(frame); err != nil { if err = aacdp.w.WriteFrame(frame); err != nil {
return return

View File

@@ -7,108 +7,123 @@ package rtp
import ( import (
"fmt" "fmt"
"runtime/debug" "runtime/debug"
"time"
"github.com/cnotch/ipchub/av/codec" "github.com/cnotch/ipchub/av/codec"
"github.com/cnotch/queue" "github.com/cnotch/queue"
"github.com/cnotch/xlog" "github.com/cnotch/xlog"
) )
// 网络播放时 PTS(Presentation Time Stamp)的延时
const (
ptsDelay = int64(time.Second)
)
// Depacketizer 解包器 // Depacketizer 解包器
type Depacketizer interface { type Depacketizer interface {
Control(p *Packet) error Control(basePts *int64, p *Packet) error
Depacketize(p *Packet) error Depacketize(basePts int64, p *Packet) error
} }
type emptyDepacketizer struct{}
func (emptyDepacketizer) Control(basePts *int64, p *Packet) error { return nil }
func (emptyDepacketizer) Depacketize(basePts int64, p *Packet) error { return nil }
// Demuxer 帧转换器 // Demuxer 帧转换器
type Demuxer struct { type Demuxer struct {
closed bool closed bool
recvQueue *queue.SyncQueue recvQueue *queue.SyncQueue
depacketizeFuncs [4]func(packet *Packet) error vdp Depacketizer
logger *xlog.Logger adp Depacketizer
logger *xlog.Logger
} }
func emptyDepacketize(*Packet) error { return nil } func emptyDepacketize(*int64, *Packet) error { return nil }
// NewDemuxer 创建 rtp.Packet 解封装处理器。 // NewDemuxer 创建 rtp.Packet 解封装处理器。
func NewDemuxer(video *codec.VideoMeta, audio *codec.AudioMeta, fw codec.FrameWriter, logger *xlog.Logger) (*Demuxer, error) { func NewDemuxer(video *codec.VideoMeta, audio *codec.AudioMeta, fw codec.FrameWriter, logger *xlog.Logger) (*Demuxer, error) {
fc := &Demuxer{ demuxer := &Demuxer{
recvQueue: queue.NewSyncQueue(), recvQueue: queue.NewSyncQueue(),
closed: false, closed: false,
logger: logger, logger: logger,
} }
var videoDepacketizer, audioDepacketizer Depacketizer
switch video.Codec { switch video.Codec {
case "H264": case "H264":
videoDepacketizer = NewH264Depacketizer(video, fw) demuxer.vdp = NewH264Depacketizer(video, fw)
case "H265": case "H265":
videoDepacketizer = NewH265Depacketizer(video, fw) demuxer.vdp = NewH265Depacketizer(video, fw)
default: default:
return nil, fmt.Errorf("rtp demuxer unsupport video codec type:%s", video.Codec) return nil, fmt.Errorf("rtp demuxer unsupport video codec type:%s", video.Codec)
} }
fc.depacketizeFuncs[ChannelVideo] = videoDepacketizer.Depacketize
fc.depacketizeFuncs[ChannelVideoControl] = videoDepacketizer.Control
if audio.Codec == "AAC" { if audio.Codec == "AAC" {
audioDepacketizer = NewAacDepacketizer(audio, fw) demuxer.adp = NewAacDepacketizer(audio, fw)
}
if audioDepacketizer != nil {
fc.depacketizeFuncs[ChannelAudio] = audioDepacketizer.Depacketize
fc.depacketizeFuncs[ChannelAudioControl] = audioDepacketizer.Control
} else { } else {
fc.depacketizeFuncs[ChannelAudio] = emptyDepacketize demuxer.adp = emptyDepacketizer{}
fc.depacketizeFuncs[ChannelAudioControl] = emptyDepacketize
} }
go fc.process() go demuxer.process()
return fc, nil return demuxer, nil
} }
func (dm *Demuxer) process() { func (demuxer *Demuxer) process() {
defer func() { defer func() {
defer func() { // 避免 handler 再 panic defer func() { // 避免 handler 再 panic
recover() recover()
}() }()
if r := recover(); r != nil { if r := recover(); r != nil {
dm.logger.Errorf("FrameConverter routine panicr = %v \n %s", r, debug.Stack()) demuxer.logger.Errorf("FrameConverter routine panicr = %v \n %s", r, debug.Stack())
} }
// 尽早通知GC回收内存 // 尽早通知GC回收内存
dm.recvQueue.Reset() demuxer.recvQueue.Reset()
}() }()
for !dm.closed { var basePts int64
p := dm.recvQueue.Pop() for !demuxer.closed {
p := demuxer.recvQueue.Pop()
if p == nil { if p == nil {
if !dm.closed { if !demuxer.closed {
dm.logger.Warn("FrameConverter:receive nil packet") demuxer.logger.Warn("FrameConverter:receive nil packet")
} }
continue continue
} }
packet := p.(*Packet) packet := p.(*Packet)
if err := dm.depacketizeFuncs[int(packet.Channel)](packet); err != nil { var err error
dm.logger.Errorf("FrameConverter: extract rtp frame error :%s", err.Error()) switch packet.Channel {
case ChannelVideo:
err = demuxer.vdp.Depacketize(basePts, packet)
case ChannelVideoControl:
err = demuxer.vdp.Control(&basePts, packet)
case ChannelAudio:
err = demuxer.adp.Depacketize(basePts, packet)
case ChannelAudioControl:
err = demuxer.adp.Control(&basePts, packet)
}
if err != nil {
demuxer.logger.Errorf("rtp demuxer: depackeetize rtp frame error :%s", err.Error())
// break // break
} }
} }
} }
// Close . // Close .
func (dm *Demuxer) Close() error { func (demuxer *Demuxer) Close() error {
if dm.closed { if demuxer.closed {
return nil return nil
} }
dm.closed = true demuxer.closed = true
dm.recvQueue.Signal() demuxer.recvQueue.Signal()
return nil return nil
} }
// WritePacket . // WritePacket .
func (fc *Demuxer) WriteRtpPacket(packet *Packet) error { func (demuxer *Demuxer) WriteRtpPacket(packet *Packet) error {
fc.recvQueue.Push(packet) demuxer.recvQueue.Push(packet)
return nil return nil
} }

View File

@@ -86,6 +86,10 @@ type frameWriter struct {
} }
func (fw *frameWriter) WriteFrame(frame *codec.Frame) (err error) { func (fw *frameWriter) WriteFrame(frame *codec.Frame) (err error) {
dts := frame.Dts / int64(time.Millisecond)
pts := frame.Pts / int64(time.Millisecond)
_ = dts
_ = pts
if frame.MediaType == codec.MediaTypeVideo { if frame.MediaType == codec.MediaTypeVideo {
fw.videoFrames++ fw.videoFrames++
} else { } else {

View File

@@ -6,6 +6,7 @@ package rtp
import ( import (
"fmt" "fmt"
"time"
"github.com/cnotch/ipchub/av/codec" "github.com/cnotch/ipchub/av/codec"
"github.com/cnotch/ipchub/av/codec/h264" "github.com/cnotch/ipchub/av/codec/h264"
@@ -13,28 +14,36 @@ import (
type h264Depacketizer struct { type h264Depacketizer struct {
fragments []*Packet // 分片包 fragments []*Packet // 分片包
meta *codec.VideoMeta meta *codec.VideoMeta
metaReady bool
nextDts float64
dtsStep float64
startOn time.Time
w codec.FrameWriter w codec.FrameWriter
syncClock SyncClock syncClock SyncClock
} }
// NewH264Depacketizer 实例化 H264 帧提取器 // NewH264Depacketizer 实例化 H264 帧提取器
func NewH264Depacketizer(meta *codec.VideoMeta, w codec.FrameWriter) Depacketizer { func NewH264Depacketizer(meta *codec.VideoMeta, w codec.FrameWriter) Depacketizer {
fe := &h264Depacketizer{ h264dp := &h264Depacketizer{
meta: meta, meta: meta,
fragments: make([]*Packet, 0, 16), fragments: make([]*Packet, 0, 16),
w: w, w: w,
} }
fe.syncClock.RTPTimeUnit = 1000.0 / float64(meta.ClockRate) h264dp.syncClock.RTPTimeUnit = float64(time.Second) / float64(meta.ClockRate)
return fe return h264dp
} }
func (h264dp *h264Depacketizer) Control(p *Packet) error { func (h264dp *h264Depacketizer) Control(basePts *int64, p *Packet) error {
h264dp.syncClock.Decode(p.Data) if ok := h264dp.syncClock.Decode(p.Data); ok {
if *basePts == 0 {
*basePts = h264dp.syncClock.NTPTime
}
}
return nil return nil
} }
func (h264dp *h264Depacketizer) Depacketize(packet *Packet) (err error) { func (h264dp *h264Depacketizer) Depacketize(basePts int64, packet *Packet) (err error) {
if h264dp.syncClock.NTPTime == 0 { // 未收到同步时钟信息,忽略任意包 if h264dp.syncClock.NTPTime == 0 { // 未收到同步时钟信息,忽略任意包
return return
} }
@@ -66,22 +75,21 @@ func (h264dp *h264Depacketizer) Depacketize(packet *Packet) (err error) {
// | :...OPTIONAL RTP padding | // | :...OPTIONAL RTP padding |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
frame := &codec.Frame{ frame := &codec.Frame{
MediaType: codec.MediaTypeVideo, MediaType: codec.MediaTypeVideo,
AbsTimestamp: h264dp.rtp2ntp(packet.Timestamp), Payload: payload,
Payload: payload,
} }
err = h264dp.writeFrame(frame) err = h264dp.writeFrame(basePts, packet.Timestamp, frame)
case naluType == h264.NalStapaInRtp: case naluType == h264.NalStapaInRtp:
err = h264dp.depacketizeStapa(packet) err = h264dp.depacketizeStapa(basePts, packet)
case naluType == h264.NalFuAInRtp: case naluType == h264.NalFuAInRtp:
err = h264dp.depacketizeFuA(packet) err = h264dp.depacketizeFuA(basePts, packet)
default: default:
err = fmt.Errorf("nalu type %d is currently not handled", naluType) err = fmt.Errorf("nalu type %d is currently not handled", naluType)
} }
return return
} }
func (h264dp *h264Depacketizer) depacketizeStapa(packet *Packet) (err error) { func (h264dp *h264Depacketizer) depacketizeStapa(basePts int64, packet *Packet) (err error) {
payload := packet.Payload() payload := packet.Payload()
header := payload[0] header := payload[0]
@@ -111,13 +119,12 @@ func (h264dp *h264Depacketizer) depacketizeStapa(packet *Packet) (err error) {
off += 2 off += 2
frame := &codec.Frame{ frame := &codec.Frame{
MediaType: codec.MediaTypeVideo, MediaType: codec.MediaTypeVideo,
AbsTimestamp: h264dp.rtp2ntp(packet.Timestamp), Payload: make([]byte, nalSize),
Payload: make([]byte, nalSize),
} }
copy(frame.Payload, payload[off:]) copy(frame.Payload, payload[off:])
frame.Payload[0] = 0 | (header & 0x60) | (frame.Payload[0] & 0x1F) frame.Payload[0] = 0 | (header & 0x60) | (frame.Payload[0] & 0x1F)
if err = h264dp.writeFrame(frame); err != nil { if err = h264dp.writeFrame(basePts, packet.Timestamp,frame); err != nil {
return return
} }
@@ -129,7 +136,7 @@ func (h264dp *h264Depacketizer) depacketizeStapa(packet *Packet) (err error) {
return return
} }
func (h264dp *h264Depacketizer) depacketizeFuA(packet *Packet) (err error) { func (h264dp *h264Depacketizer) depacketizeFuA(basePts int64, packet *Packet) (err error) {
payload := packet.Payload() payload := packet.Payload()
header := payload[0] header := payload[0]
@@ -171,9 +178,8 @@ func (h264dp *h264Depacketizer) depacketizeFuA(packet *Packet) (err error) {
} }
frame := &codec.Frame{ frame := &codec.Frame{
MediaType: codec.MediaTypeVideo, MediaType: codec.MediaTypeVideo,
AbsTimestamp: h264dp.rtp2ntp(packet.Timestamp), Payload: make([]byte, frameLen)}
Payload: make([]byte, frameLen)}
frame.Payload[0] = (header & 0x60) | (fuHeader & 0x1F) frame.Payload[0] = (header & 0x60) | (fuHeader & 0x1F)
offset := 1 offset := 1
@@ -185,7 +191,7 @@ func (h264dp *h264Depacketizer) depacketizeFuA(packet *Packet) (err error) {
// 清空分片缓存 // 清空分片缓存
h264dp.fragments = h264dp.fragments[:0] h264dp.fragments = h264dp.fragments[:0]
err = h264dp.writeFrame(frame) err = h264dp.writeFrame(basePts, packet.Timestamp,frame)
} }
return return
@@ -195,7 +201,7 @@ func (h264dp *h264Depacketizer) rtp2ntp(timestamp uint32) int64 {
return h264dp.syncClock.Rtp2Ntp(timestamp) return h264dp.syncClock.Rtp2Ntp(timestamp)
} }
func (h264dp *h264Depacketizer) writeFrame(frame *codec.Frame) error { func (h264dp *h264Depacketizer) writeFrame(basePts int64, rtpTimestamp uint32, frame *codec.Frame) error {
nalType := frame.Payload[0] & 0x1f nalType := frame.Payload[0] & 0x1f
switch nalType { switch nalType {
case h264.NalSps: case h264.NalSps:
@@ -209,5 +215,25 @@ func (h264dp *h264Depacketizer) writeFrame(frame *codec.Frame) error {
case h264.NalFillerData: // ?ignore... case h264.NalFillerData: // ?ignore...
return nil return nil
} }
if !h264dp.metaReady {
if !h264.MetadataIsReady(h264dp.meta) {
return nil
}
if h264dp.meta.FixedFrameRate {
h264dp.dtsStep = float64(time.Second) / h264dp.meta.FrameRate
} else {
h264dp.startOn = time.Now()
}
h264dp.metaReady = true
}
frame.Pts = h264dp.rtp2ntp(rtpTimestamp) - basePts+ptsDelay
if h264dp.dtsStep > 0 {
frame.Dts = int64(h264dp.nextDts)
h264dp.nextDts += h264dp.dtsStep
} else {
frame.Dts = int64(time.Now().Sub(h264dp.startOn))
}
return h264dp.w.WriteFrame(frame) return h264dp.w.WriteFrame(frame)
} }

View File

@@ -5,30 +5,40 @@
package rtp package rtp
import ( import (
"time"
"github.com/cnotch/ipchub/av/codec" "github.com/cnotch/ipchub/av/codec"
"github.com/cnotch/ipchub/av/codec/hevc" "github.com/cnotch/ipchub/av/codec/hevc"
) )
type h265Depacketizer struct { type h265Depacketizer struct {
fragments []*Packet // 分片包 fragments []*Packet // 分片包
meta *codec.VideoMeta meta *codec.VideoMeta
metaReady bool
nextDts float64
dtsStep float64
startOn time.Time
w codec.FrameWriter w codec.FrameWriter
syncClock SyncClock syncClock SyncClock
} }
// NewH265Depacketizer 实例化 H265 帧提取器 // NewH265Depacketizer 实例化 H265 帧提取器
func NewH265Depacketizer(meta *codec.VideoMeta, w codec.FrameWriter) Depacketizer { func NewH265Depacketizer(meta *codec.VideoMeta, w codec.FrameWriter) Depacketizer {
fe := &h265Depacketizer{ h265dp := &h265Depacketizer{
meta: meta, meta: meta,
fragments: make([]*Packet, 0, 16), fragments: make([]*Packet, 0, 16),
w: w, w: w,
} }
fe.syncClock.RTPTimeUnit = 1000.0 / float64(meta.ClockRate) h265dp.syncClock.RTPTimeUnit = float64(time.Second) / float64(meta.ClockRate)
return fe return h265dp
} }
func (h265dp *h265Depacketizer) Control(p *Packet) error { func (h265dp *h265Depacketizer) Control(basePts *int64, p *Packet) error {
h265dp.syncClock.Decode(p.Data) if ok := h265dp.syncClock.Decode(p.Data); ok {
if *basePts == 0 {
*basePts = h265dp.syncClock.NTPTime
}
}
return nil return nil
} }
@@ -56,7 +66,7 @@ func (h265dp *h265Depacketizer) Control(p *Packet) error {
* End fragment (E): 1 bit * End fragment (E): 1 bit
* FuType: 6 bits * FuType: 6 bits
*/ */
func (h265dp *h265Depacketizer) Depacketize(packet *Packet) (err error) { func (h265dp *h265Depacketizer) Depacketize(basePts int64, packet *Packet) (err error) {
if h265dp.syncClock.NTPTime == 0 { // 未收到同步时钟信息,忽略任意包 if h265dp.syncClock.NTPTime == 0 { // 未收到同步时钟信息,忽略任意包
return return
} }
@@ -70,21 +80,20 @@ func (h265dp *h265Depacketizer) Depacketize(packet *Packet) (err error) {
switch naluType { switch naluType {
case hevc.NalStapInRtp: // 在RTP中的聚合AP case hevc.NalStapInRtp: // 在RTP中的聚合AP
return h265dp.depacketizeStap(packet) return h265dp.depacketizeStap(basePts, packet)
case hevc.NalFuInRtp: // 在RTP中的扩展,分片(FU) case hevc.NalFuInRtp: // 在RTP中的扩展,分片(FU)
return h265dp.depacketizeFu(packet) return h265dp.depacketizeFu(basePts, packet)
default: default:
frame := &codec.Frame{ frame := &codec.Frame{
MediaType: codec.MediaTypeVideo, MediaType: codec.MediaTypeVideo,
AbsTimestamp: h265dp.rtp2ntp(packet.Timestamp), Payload: payload,
Payload: payload,
} }
err = h265dp.writeFrame(frame) err = h265dp.writeFrame(basePts, packet.Timestamp, frame)
return return
} }
} }
func (h265dp *h265Depacketizer) depacketizeStap(packet *Packet) (err error) { func (h265dp *h265Depacketizer) depacketizeStap(basePts int64, packet *Packet) (err error) {
payload := packet.Payload() payload := packet.Payload()
off := 2 // 跳过 STAP NAL HDR off := 2 // 跳过 STAP NAL HDR
@@ -98,12 +107,11 @@ func (h265dp *h265Depacketizer) depacketizeStap(packet *Packet) (err error) {
off += 2 off += 2
frame := &codec.Frame{ frame := &codec.Frame{
MediaType: codec.MediaTypeVideo, MediaType: codec.MediaTypeVideo,
AbsTimestamp: h265dp.rtp2ntp(packet.Timestamp), Payload: make([]byte, nalSize),
Payload: make([]byte, nalSize),
} }
copy(frame.Payload, payload[off:]) copy(frame.Payload, payload[off:])
if err = h265dp.writeFrame(frame); err != nil { if err = h265dp.writeFrame(basePts, packet.Timestamp, frame); err != nil {
return return
} }
off += int(nalSize) off += int(nalSize)
@@ -114,7 +122,7 @@ func (h265dp *h265Depacketizer) depacketizeStap(packet *Packet) (err error) {
return return
} }
func (h265dp *h265Depacketizer) depacketizeFu(packet *Packet) (err error) { func (h265dp *h265Depacketizer) depacketizeFu(basePts int64, packet *Packet) (err error) {
payload := packet.Payload() payload := packet.Payload()
rawDataOffset := 3 // 原始数据的偏移 = FU indicator + header rawDataOffset := 3 // 原始数据的偏移 = FU indicator + header
@@ -148,9 +156,8 @@ func (h265dp *h265Depacketizer) depacketizeFu(packet *Packet) (err error) {
} }
frame := &codec.Frame{ frame := &codec.Frame{
MediaType: codec.MediaTypeVideo, MediaType: codec.MediaTypeVideo,
AbsTimestamp: h265dp.rtp2ntp(packet.Timestamp), Payload: make([]byte, frameLen),
Payload: make([]byte, frameLen),
} }
frame.Payload[0] = (payload[0] & 0x81) | (fuHeader&0x3f)<<1 frame.Payload[0] = (payload[0] & 0x81) | (fuHeader&0x3f)<<1
@@ -164,7 +171,7 @@ func (h265dp *h265Depacketizer) depacketizeFu(packet *Packet) (err error) {
// 清空分片缓存 // 清空分片缓存
h265dp.fragments = h265dp.fragments[:0] h265dp.fragments = h265dp.fragments[:0]
err = h265dp.writeFrame(frame) err = h265dp.writeFrame(basePts, packet.Timestamp, frame)
} }
return return
@@ -174,7 +181,7 @@ func (h265dp *h265Depacketizer) rtp2ntp(timestamp uint32) int64 {
return h265dp.syncClock.Rtp2Ntp(timestamp) return h265dp.syncClock.Rtp2Ntp(timestamp)
} }
func (h265dp *h265Depacketizer) writeFrame(frame *codec.Frame) error { func (h265dp *h265Depacketizer) writeFrame(basePts int64, rtpTimestamp uint32, frame *codec.Frame) error {
nalType := (frame.Payload[0] >> 1) & 0x3f nalType := (frame.Payload[0] >> 1) & 0x3f
switch nalType { switch nalType {
case hevc.NalVps: case hevc.NalVps:
@@ -190,5 +197,25 @@ func (h265dp *h265Depacketizer) writeFrame(frame *codec.Frame) error {
h265dp.meta.Pps = frame.Payload h265dp.meta.Pps = frame.Payload
} }
} }
if !h265dp.metaReady {
if !hevc.MetadataIsReady(h265dp.meta) {
return nil
}
if h265dp.meta.FixedFrameRate {
h265dp.dtsStep = float64(time.Second) / h265dp.meta.FrameRate
} else {
h265dp.startOn = time.Now()
}
h265dp.metaReady = true
}
frame.Pts = h265dp.rtp2ntp(rtpTimestamp) - basePts + ptsDelay
if h265dp.dtsStep > 0 {
frame.Dts = int64(h265dp.nextDts)
h265dp.nextDts += h265dp.dtsStep
} else {
frame.Dts = int64(time.Now().Sub(h265dp.startOn))
}
return h265dp.w.WriteFrame(frame) return h265dp.w.WriteFrame(frame)
} }

View File

@@ -21,12 +21,12 @@ type SyncClock struct {
// RTP Timestamp与NTP时间戳对应 // RTP Timestamp与NTP时间戳对应
// 与RTP数据包中的RTP时间戳具有相同的单位和随机初始值。 // 与RTP数据包中的RTP时间戳具有相同的单位和随机初始值。
RTPTime uint32 RTPTime uint32
RTPTimeUnit float64 // RTP时间单位每个RTP时间的秒数 RTPTimeUnit float64 // RTP时间单位每个RTP时间的秒数
} }
// LocalTime 本地时间 // LocalTime 本地时间
func (sc *SyncClock) LocalTime() time.Time { func (sc *SyncClock) LocalTime() time.Time {
return time.Unix(0, sc.NTPTime*int64(time.Millisecond)).In(time.Local) return time.Unix(0, sc.NTPTime).In(time.Local)
} }
// Decode . // Decode .
@@ -36,7 +36,6 @@ func (sc *SyncClock) Decode(data []byte) (ok bool) {
lsw := binary.BigEndian.Uint32(data[12:]) lsw := binary.BigEndian.Uint32(data[12:])
sc.RTPTime = binary.BigEndian.Uint32(data[16:]) sc.RTPTime = binary.BigEndian.Uint32(data[16:])
sc.NTPTime = int64(msw-jan1970)*int64(time.Second) + (int64(lsw)*1000_000_000)>>32 sc.NTPTime = int64(msw-jan1970)*int64(time.Second) + (int64(lsw)*1000_000_000)>>32
sc.NTPTime /= int64(time.Millisecond)
ok = true ok = true
} }
return return