refactoring mpegts muxer

This commit is contained in:
notch
2021-01-16 20:41:50 +08:00
parent 2a6a69a1ba
commit b7bd9ec7d7
6 changed files with 276 additions and 215 deletions

View File

@@ -0,0 +1,66 @@
// Copyright (c) 2019,CAOHONGJU All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package mpegts
import (
"fmt"
"github.com/cnotch/ipchub/av/codec"
"github.com/cnotch/ipchub/av/codec/aac"
)
// in ms, for aac flush the audio
const aacDelay = 100
type aacPacketizer struct {
meta *codec.AudioMeta
tsframeWriter FrameWriter
audioSps *aac.RawSPS
}
func NewAacPacketizer(meta *codec.AudioMeta, tsframeWriter FrameWriter) Packetizer {
ap := &aacPacketizer{
meta: meta,
tsframeWriter: tsframeWriter,
}
ap.prepareAsc()
return ap
}
func (ap *aacPacketizer) prepareAsc() (err error) {
if ap.audioSps != nil {
return
}
var asc aac.AudioSpecificConfig
asc.Decode(ap.meta.Sps)
if err = asc.Decode(ap.meta.Sps); err != nil {
return
}
if asc.ObjectType == aac.AOT_NULL || asc.ObjectType == aac.AOT_ESCAPE {
err = fmt.Errorf("tsmuxer decdoe audio aac sequence header failed, aac object type=%d", asc.ObjectType)
return
}
ap.audioSps = &asc
return
}
func (ap *aacPacketizer) Packetize(basePts int64, frame *codec.Frame) error {
pts := frame.AbsTimestamp - basePts + ptsDelay
pts *= 90
// set fields
tsframe := &Frame{
Pid: tsAudioPid,
StreamID: tsAudioAac,
Dts: pts,
Pts: pts,
Payload: frame.Payload,
}
tsframe.prepareAacHeader(ap.audioSps)
return ap.tsframeWriter.WriteMpegtsFrame(tsframe)
}

View File

@@ -0,0 +1,80 @@
// Copyright (c) 2019,CAOHONGJU All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package mpegts
import (
"github.com/cnotch/ipchub/av/codec"
"github.com/cnotch/ipchub/av/codec/h264"
)
type h264Packetizer struct {
meta *codec.VideoMeta
tsframeWriter FrameWriter
metaReady bool
nextDts float64
dtsStep float64
}
func NewH264Packetizer(meta *codec.VideoMeta, tsframeWriter FrameWriter) Packetizer {
h264p := &h264Packetizer{
meta: meta,
tsframeWriter: tsframeWriter,
}
h264p.prepareMetadata()
return h264p
}
func (h264p *h264Packetizer) prepareMetadata() error {
if h264p.metaReady {
return nil
}
if !h264.MetadataIsReady(h264p.meta) {
// not enough
return nil
}
if h264p.meta.FixedFrameRate {
h264p.dtsStep = 1000.0 / h264p.meta.FrameRate
} else { // TODO:
h264p.dtsStep = 1000.0 / 30
}
h264p.metaReady = true
return nil
}
func (h264p *h264Packetizer) Packetize(basePts int64, frame *codec.Frame) error {
if frame.Payload[0]&0x1F == h264.NalSps {
return h264p.prepareMetadata()
}
if frame.Payload[0]&0x1F == h264.NalPps {
return h264p.prepareMetadata()
}
dts := int64(h264p.nextDts)
h264p.nextDts += h264p.dtsStep
pts := frame.AbsTimestamp - basePts + ptsDelay
if dts > pts {
pts = dts
}
// set fields
tsframe := &Frame{
Pid: tsVideoPid,
StreamID: tsVideoAvc,
Dts: dts * 90,
Pts: pts * 90,
Payload: frame.Payload,
key: frame.Payload[0]&0x1F == h264.NalIdrSlice,
}
tsframe.prepareAvcHeader(h264p.meta.Sps, h264p.meta.Pps)
return h264p.tsframeWriter.WriteMpegtsFrame(tsframe)
}

View File

@@ -41,7 +41,7 @@ func TestMpegtsWriter(t *testing.T) {
var audio codec.AudioMeta var audio codec.AudioMeta
sdp.ParseMetadata(string(sdpraw), &video, &audio) sdp.ParseMetadata(string(sdpraw), &video, &audio)
writer, err := NewWriter(out) writer, err := NewWriter(out)
tsMuxer, _ := NewMuxerAvcAac(video, audio, writer, xlog.L()) tsMuxer, _ := NewMuxer(&video, &audio, writer, xlog.L())
rtpDemuxer, _ := rtp.NewDemuxer(&video, &audio, tsMuxer, xlog.L()) rtpDemuxer, _ := rtp.NewDemuxer(&video, &audio, tsMuxer, xlog.L())
channels := []int{int(rtp.ChannelVideo), int(rtp.ChannelVideoControl), int(rtp.ChannelAudio), int(rtp.ChannelAudioControl)} channels := []int{int(rtp.ChannelVideo), int(rtp.ChannelVideoControl), int(rtp.ChannelAudio), int(rtp.ChannelAudioControl)}

124
av/format/mpegts/muxer.go Normal file
View File

@@ -0,0 +1,124 @@
// Copyright (c) 2019,CAOHONGJU All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package mpegts
import (
"fmt"
"runtime/debug"
"github.com/cnotch/ipchub/av/codec"
"github.com/cnotch/queue"
"github.com/cnotch/xlog"
)
// Packetizer 封包器
type Packetizer interface {
Packetize(basePts int64, frame *codec.Frame) error
}
type emptyPacketizer struct{}
func (emptyPacketizer) Packetize(basePts int64, frame *codec.Frame) error { return nil }
// 网络播放时 PTSPresentation Time Stamp的延时
// 影响视频 Tag 的 CTS 和音频的 DTSDecoding Time Stamp
const (
ptsDelay = 1000
)
// Muxer mpegts muxer from av.Frame(H264[+AAC])
type Muxer struct {
recvQueue *queue.SyncQueue
closed bool
logger *xlog.Logger // 日志对象
}
// NewMuxer .
func NewMuxer(videoMeta *codec.VideoMeta, audioMeta *codec.AudioMeta, tsframeWriter FrameWriter, logger *xlog.Logger) (*Muxer, error) {
muxer := &Muxer{
recvQueue: queue.NewSyncQueue(),
closed: false,
logger: logger,
}
var vp Packetizer = emptyPacketizer{}
var ap Packetizer = emptyPacketizer{}
switch videoMeta.Codec {
case "H264":
vp = NewH264Packetizer(videoMeta, tsframeWriter)
default:
return nil, fmt.Errorf("ts muxer unsupport video codec type:%s", videoMeta.Codec)
}
switch audioMeta.Codec {
case "AAC":
ap = NewAacPacketizer(audioMeta, tsframeWriter)
default:
return nil, fmt.Errorf("ts muxer unsupport audio codec type:%s", videoMeta.Codec)
}
go muxer.process(vp, ap)
return muxer, nil
}
// WriteFrame .
func (muxer *Muxer) WriteFrame(frame *codec.Frame) error {
muxer.recvQueue.Push(frame)
return nil
}
// Close .
func (muxer *Muxer) Close() error {
if muxer.closed {
return nil
}
muxer.closed = true
muxer.recvQueue.Signal()
return nil
}
func (muxer *Muxer) process(vp, ap Packetizer) {
defer func() {
defer func() { // 避免 handler 再 panic
recover()
}()
if r := recover(); r != nil {
muxer.logger.Errorf("ts muxer routine panicr = %v \n %s", r, debug.Stack())
}
// 尽早通知GC回收内存
muxer.recvQueue.Reset()
}()
var basePts int64
for !muxer.closed {
f := muxer.recvQueue.Pop()
if f == nil {
if !muxer.closed {
muxer.logger.Warn("tsmuxer: receive nil frame")
}
continue
}
frame := f.(*codec.Frame)
if basePts == 0 {
basePts = frame.AbsTimestamp
}
switch frame.MediaType {
case codec.MediaTypeVideo:
if err := vp.Packetize(basePts, frame); err != nil {
muxer.logger.Errorf("tsmuxer: muxVideoTag error - %s", err.Error())
}
case codec.MediaTypeAudio:
if err := ap.Packetize(basePts, frame); err != nil {
muxer.logger.Errorf("tsmuxer: muxAudioTag error - %s", err.Error())
}
default:
}
}
}

View File

@@ -1,212 +0,0 @@
// Copyright (c) 2019,CAOHONGJU All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package mpegts
import (
"fmt"
"io"
"runtime/debug"
"github.com/cnotch/ipchub/av/codec"
"github.com/cnotch/ipchub/av/codec/aac"
"github.com/cnotch/ipchub/av/codec/h264"
"github.com/cnotch/queue"
"github.com/cnotch/xlog"
)
// 网络播放时 PTSPresentation Time Stamp的延时
// 影响视频 Tag 的 CTS 和音频的 DTSDecoding Time Stamp
const (
dtsDelay = 0
ptsDelay = 1000
)
// MuxerAvcAac flv muxer from av.Frame(H264[+AAC])
type MuxerAvcAac struct {
videoMeta codec.VideoMeta
audioMeta codec.AudioMeta
hasAudio bool
audioSps aac.RawSPS
recvQueue *queue.SyncQueue
tsframeWriter FrameWriter
closed bool
metaReady bool
basePts int64
nextDts float64
dtsStep float64
logger *xlog.Logger // 日志对象
}
// NewMuxerAvcAac .
func NewMuxerAvcAac(videoMeta codec.VideoMeta, audioMeta codec.AudioMeta, tsframeWriter FrameWriter, logger *xlog.Logger) (*MuxerAvcAac, error) {
muxer := &MuxerAvcAac{
recvQueue: queue.NewSyncQueue(),
videoMeta: videoMeta,
audioMeta: audioMeta,
hasAudio: audioMeta.Codec == "AAC",
tsframeWriter: tsframeWriter,
closed: false,
nextDts: dtsDelay,
logger: logger,
}
if videoMeta.FrameRate > 0 {
muxer.dtsStep = 1000.0 / videoMeta.FrameRate
}
if muxer.hasAudio {
if err := muxer.prepareAacSps(); err != nil {
return nil, err
}
}
go muxer.process()
return muxer, nil
}
func (muxer *MuxerAvcAac) prepareAacSps() (err error) {
if err = muxer.audioSps.Decode(muxer.audioMeta.Sps); err != nil {
return
}
if muxer.audioSps.ObjectType == aac.AOT_NULL || muxer.audioSps.ObjectType == aac.AOT_ESCAPE {
err = fmt.Errorf("tsmuxer decdoe audio aac sequence header failed, aac object type=%d", muxer.audioSps.ObjectType)
return
}
return
}
// WriteFrame .
func (muxer *MuxerAvcAac) WriteFrame(frame *codec.Frame) error {
muxer.recvQueue.Push(frame)
return nil
}
// Close .
func (muxer *MuxerAvcAac) Close() error {
if muxer.closed {
return nil
}
muxer.closed = true
muxer.recvQueue.Signal()
return nil
}
func (muxer *MuxerAvcAac) process() {
defer func() {
defer func() { // 避免 handler 再 panic
recover()
}()
if r := recover(); r != nil {
muxer.logger.Errorf("tsmuxer routine panicr = %v \n %s", r, debug.Stack())
}
// 尽早通知GC回收内存
muxer.recvQueue.Reset()
if closer, ok := muxer.tsframeWriter.(io.Closer); ok {
closer.Close()
}
}()
for !muxer.closed {
f := muxer.recvQueue.Pop()
if f == nil {
if !muxer.closed {
muxer.logger.Warn("tsmuxer:receive nil frame")
}
continue
}
frame := f.(*codec.Frame)
if muxer.basePts == 0 {
muxer.basePts = frame.AbsTimestamp
}
if frame.MediaType == codec.MediaTypeVideo {
if err := muxer.muxVideoTag(frame); err != nil {
muxer.logger.Errorf("tsmuxer: muxVideoFrame error - %s", err.Error())
}
} else {
if err := muxer.muxAudioTag(frame); err != nil {
muxer.logger.Errorf("tsmuxer: muxAudioFrame error - %s", err.Error())
}
}
}
}
func (muxer *MuxerAvcAac) muxVideoTag(frame *codec.Frame) (err error) {
if frame.Payload[0]&0x1F == h264.NalSps {
if len(muxer.videoMeta.Sps) == 0 {
muxer.videoMeta.Sps = frame.Payload
}
muxer.preparMetadata()
return
}
if frame.Payload[0]&0x1F == h264.NalPps {
if len(muxer.videoMeta.Pps) == 0 {
muxer.videoMeta.Pps = frame.Payload
}
muxer.preparMetadata()
return
}
dts := int64(muxer.nextDts)
muxer.nextDts += muxer.dtsStep
pts := frame.AbsTimestamp - muxer.basePts + ptsDelay
if dts > pts {
pts = dts
}
// set fields
tsframe := &Frame{
Pid: tsVideoPid,
StreamID: tsVideoAvc,
Dts: dts * 90,
Pts: pts * 90,
Payload: frame.Payload,
key: frame.Payload[0]&0x1F == h264.NalIdrSlice,
}
tsframe.prepareAvcHeader(muxer.videoMeta.Sps, muxer.videoMeta.Pps)
return muxer.tsframeWriter.WriteMpegtsFrame(tsframe)
}
func (muxer *MuxerAvcAac) preparMetadata() {
if muxer.metaReady {
return
}
if !h264.MetadataIsReady(&muxer.videoMeta) {
// not enough
return
}
if muxer.videoMeta.FixedFrameRate {
muxer.dtsStep = 1000.0 / muxer.videoMeta.FrameRate
} else { // TODO:
muxer.dtsStep = 1000.0 / 30
}
muxer.metaReady = true
}
func (muxer *MuxerAvcAac) muxAudioTag(frame *codec.Frame) error {
pts := frame.AbsTimestamp - muxer.basePts + ptsDelay
pts *= 90
// set fields
tsframe := &Frame{
Pid: tsAudioPid,
StreamID: tsAudioAac,
Dts: pts,
Pts: pts,
Payload: frame.Payload,
}
tsframe.prepareAacHeader(&muxer.audioSps)
return muxer.tsframeWriter.WriteMpegtsFrame(tsframe)
}

View File

@@ -55,7 +55,8 @@ type Stream struct {
flvMuxer flvMuxer flvMuxer flvMuxer
flvConsumptions consumptions flvConsumptions consumptions
flvCache packCache flvCache packCache
tsMuxer *mpegts.MuxerAvcAac tsMuxer *mpegts.Muxer
hlsSG *hls.SegmentGenerator
hlsPlaylist *hls.Playlist hlsPlaylist *hls.Playlist
attrs map[string]string // 流属性 attrs map[string]string // 流属性
multicast Multicastable multicast Multicastable
@@ -131,12 +132,13 @@ func (s *Stream) prepareOtherStream() {
if err != nil { if err != nil {
return return
} }
tsMuxer, err2 := mpegts.NewMuxerAvcAac(s.Video, s.Audio, sg, tsMuxer, err2 := mpegts.NewMuxer(&s.Video, &s.Audio, sg,
s.logger.With(xlog.Fields(xlog.F("extra", "ts.Muxer")))) s.logger.With(xlog.Fields(xlog.F("extra", "ts.Muxer"))))
if err2 != nil { if err2 != nil {
return return
} }
s.tsMuxer = tsMuxer s.tsMuxer = tsMuxer
s.hlsSG = sg
s.hlsPlaylist = hlsPlaylist s.hlsPlaylist = hlsPlaylist
} }
} }
@@ -179,6 +181,7 @@ func (s *Stream) close(status int32) error {
// 关闭 hls // 关闭 hls
if s.tsMuxer != nil { if s.tsMuxer != nil {
s.tsMuxer.Close() s.tsMuxer.Close()
s.hlsSG.Close()
s.hlsPlaylist.Close() s.hlsPlaylist.Close()
} }