From b7bd9ec7d7c2afc5f01dcaa078d14de7f5235733 Mon Sep 17 00:00:00 2001 From: notch Date: Sat, 16 Jan 2021 20:41:50 +0800 Subject: [PATCH] refactoring mpegts muxer --- av/format/mpegts/aac_packetizer.go | 66 +++++++++ av/format/mpegts/h264_packetizer.go | 80 +++++++++++ av/format/mpegts/mpegts_test.go | 2 +- av/format/mpegts/muxer.go | 124 ++++++++++++++++ av/format/mpegts/muxeravcaac.go | 212 ---------------------------- media/stream.go | 7 +- 6 files changed, 276 insertions(+), 215 deletions(-) create mode 100644 av/format/mpegts/aac_packetizer.go create mode 100644 av/format/mpegts/h264_packetizer.go create mode 100644 av/format/mpegts/muxer.go delete mode 100644 av/format/mpegts/muxeravcaac.go diff --git a/av/format/mpegts/aac_packetizer.go b/av/format/mpegts/aac_packetizer.go new file mode 100644 index 0000000..37b8822 --- /dev/null +++ b/av/format/mpegts/aac_packetizer.go @@ -0,0 +1,66 @@ +// Copyright (c) 2019,CAOHONGJU All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package mpegts + +import ( + "fmt" + + "github.com/cnotch/ipchub/av/codec" + "github.com/cnotch/ipchub/av/codec/aac" +) + +// in ms, for aac flush the audio +const aacDelay = 100 + +type aacPacketizer struct { + meta *codec.AudioMeta + tsframeWriter FrameWriter + audioSps *aac.RawSPS +} + +func NewAacPacketizer(meta *codec.AudioMeta, tsframeWriter FrameWriter) Packetizer { + ap := &aacPacketizer{ + meta: meta, + tsframeWriter: tsframeWriter, + } + ap.prepareAsc() + return ap +} + +func (ap *aacPacketizer) prepareAsc() (err error) { + if ap.audioSps != nil { + return + } + + var asc aac.AudioSpecificConfig + asc.Decode(ap.meta.Sps) + if err = asc.Decode(ap.meta.Sps); err != nil { + return + } + + if asc.ObjectType == aac.AOT_NULL || asc.ObjectType == aac.AOT_ESCAPE { + err = fmt.Errorf("tsmuxer decdoe audio aac sequence header failed, aac object type=%d", asc.ObjectType) + return + } + ap.audioSps = &asc + return +} + +func (ap *aacPacketizer) Packetize(basePts int64, frame *codec.Frame) error { + pts := frame.AbsTimestamp - basePts + ptsDelay + pts *= 90 + + // set fields + tsframe := &Frame{ + Pid: tsAudioPid, + StreamID: tsAudioAac, + Dts: pts, + Pts: pts, + Payload: frame.Payload, + } + + tsframe.prepareAacHeader(ap.audioSps) + return ap.tsframeWriter.WriteMpegtsFrame(tsframe) +} diff --git a/av/format/mpegts/h264_packetizer.go b/av/format/mpegts/h264_packetizer.go new file mode 100644 index 0000000..7c73e7b --- /dev/null +++ b/av/format/mpegts/h264_packetizer.go @@ -0,0 +1,80 @@ +// Copyright (c) 2019,CAOHONGJU All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package mpegts + +import ( + "github.com/cnotch/ipchub/av/codec" + "github.com/cnotch/ipchub/av/codec/h264" +) + +type h264Packetizer struct { + meta *codec.VideoMeta + tsframeWriter FrameWriter + metaReady bool + nextDts float64 + dtsStep float64 +} + +func NewH264Packetizer(meta *codec.VideoMeta, tsframeWriter FrameWriter) Packetizer { + h264p := &h264Packetizer{ + meta: meta, + tsframeWriter: tsframeWriter, + } + + h264p.prepareMetadata() + + return h264p +} + +func (h264p *h264Packetizer) prepareMetadata() error { + if h264p.metaReady { + return nil + } + + if !h264.MetadataIsReady(h264p.meta) { + // not enough + return nil + } + + if h264p.meta.FixedFrameRate { + h264p.dtsStep = 1000.0 / h264p.meta.FrameRate + } else { // TODO: + h264p.dtsStep = 1000.0 / 30 + } + h264p.metaReady = true + + return nil +} + +func (h264p *h264Packetizer) Packetize(basePts int64, frame *codec.Frame) error { + if frame.Payload[0]&0x1F == h264.NalSps { + return h264p.prepareMetadata() + } + + if frame.Payload[0]&0x1F == h264.NalPps { + return h264p.prepareMetadata() + } + + dts := int64(h264p.nextDts) + h264p.nextDts += h264p.dtsStep + pts := frame.AbsTimestamp - basePts + ptsDelay + if dts > pts { + pts = dts + } + + // set fields + tsframe := &Frame{ + Pid: tsVideoPid, + StreamID: tsVideoAvc, + Dts: dts * 90, + Pts: pts * 90, + Payload: frame.Payload, + key: frame.Payload[0]&0x1F == h264.NalIdrSlice, + } + + tsframe.prepareAvcHeader(h264p.meta.Sps, h264p.meta.Pps) + + return h264p.tsframeWriter.WriteMpegtsFrame(tsframe) +} diff --git a/av/format/mpegts/mpegts_test.go b/av/format/mpegts/mpegts_test.go index eafef69..9a23897 100644 --- a/av/format/mpegts/mpegts_test.go +++ b/av/format/mpegts/mpegts_test.go @@ -41,7 +41,7 @@ func TestMpegtsWriter(t *testing.T) { var audio codec.AudioMeta sdp.ParseMetadata(string(sdpraw), &video, &audio) writer, err := NewWriter(out) - tsMuxer, _ := NewMuxerAvcAac(video, audio, writer, xlog.L()) + tsMuxer, _ := NewMuxer(&video, &audio, writer, xlog.L()) rtpDemuxer, _ := rtp.NewDemuxer(&video, &audio, tsMuxer, xlog.L()) channels := []int{int(rtp.ChannelVideo), int(rtp.ChannelVideoControl), int(rtp.ChannelAudio), int(rtp.ChannelAudioControl)} diff --git a/av/format/mpegts/muxer.go b/av/format/mpegts/muxer.go new file mode 100644 index 0000000..1a937eb --- /dev/null +++ b/av/format/mpegts/muxer.go @@ -0,0 +1,124 @@ +// Copyright (c) 2019,CAOHONGJU All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package mpegts + +import ( + "fmt" + "runtime/debug" + + "github.com/cnotch/ipchub/av/codec" + "github.com/cnotch/queue" + "github.com/cnotch/xlog" +) + +// Packetizer 封包器 +type Packetizer interface { + Packetize(basePts int64, frame *codec.Frame) error +} + +type emptyPacketizer struct{} + +func (emptyPacketizer) Packetize(basePts int64, frame *codec.Frame) error { return nil } + +// 网络播放时 PTS(Presentation Time Stamp)的延时 +// 影响视频 Tag 的 CTS 和音频的 DTS(Decoding Time Stamp) +const ( + ptsDelay = 1000 +) + +// Muxer mpegts muxer from av.Frame(H264[+AAC]) +type Muxer struct { + recvQueue *queue.SyncQueue + closed bool + logger *xlog.Logger // 日志对象 +} + +// NewMuxer . +func NewMuxer(videoMeta *codec.VideoMeta, audioMeta *codec.AudioMeta, tsframeWriter FrameWriter, logger *xlog.Logger) (*Muxer, error) { + muxer := &Muxer{ + recvQueue: queue.NewSyncQueue(), + closed: false, + logger: logger, + } + var vp Packetizer = emptyPacketizer{} + var ap Packetizer = emptyPacketizer{} + + switch videoMeta.Codec { + case "H264": + vp = NewH264Packetizer(videoMeta, tsframeWriter) + default: + return nil, fmt.Errorf("ts muxer unsupport video codec type:%s", videoMeta.Codec) + } + + switch audioMeta.Codec { + case "AAC": + ap = NewAacPacketizer(audioMeta, tsframeWriter) + default: + return nil, fmt.Errorf("ts muxer unsupport audio codec type:%s", videoMeta.Codec) + } + + go muxer.process(vp, ap) + return muxer, nil +} + +// WriteFrame . +func (muxer *Muxer) WriteFrame(frame *codec.Frame) error { + muxer.recvQueue.Push(frame) + return nil +} + +// Close . +func (muxer *Muxer) Close() error { + if muxer.closed { + return nil + } + + muxer.closed = true + muxer.recvQueue.Signal() + return nil +} + +func (muxer *Muxer) process(vp, ap Packetizer) { + defer func() { + defer func() { // 避免 handler 再 panic + recover() + }() + + if r := recover(); r != nil { + muxer.logger.Errorf("ts muxer routine panic;r = %v \n %s", r, debug.Stack()) + } + + // 尽早通知GC,回收内存 + muxer.recvQueue.Reset() + }() + + var basePts int64 + for !muxer.closed { + f := muxer.recvQueue.Pop() + if f == nil { + if !muxer.closed { + muxer.logger.Warn("tsmuxer: receive nil frame") + } + continue + } + + frame := f.(*codec.Frame) + if basePts == 0 { + basePts = frame.AbsTimestamp + } + + switch frame.MediaType { + case codec.MediaTypeVideo: + if err := vp.Packetize(basePts, frame); err != nil { + muxer.logger.Errorf("tsmuxer: muxVideoTag error - %s", err.Error()) + } + case codec.MediaTypeAudio: + if err := ap.Packetize(basePts, frame); err != nil { + muxer.logger.Errorf("tsmuxer: muxAudioTag error - %s", err.Error()) + } + default: + } + } +} diff --git a/av/format/mpegts/muxeravcaac.go b/av/format/mpegts/muxeravcaac.go deleted file mode 100644 index e06c860..0000000 --- a/av/format/mpegts/muxeravcaac.go +++ /dev/null @@ -1,212 +0,0 @@ -// Copyright (c) 2019,CAOHONGJU All rights reserved. -// Use of this source code is governed by a MIT-style -// license that can be found in the LICENSE file. - -package mpegts - -import ( - "fmt" - "io" - "runtime/debug" - - "github.com/cnotch/ipchub/av/codec" - "github.com/cnotch/ipchub/av/codec/aac" - "github.com/cnotch/ipchub/av/codec/h264" - "github.com/cnotch/queue" - "github.com/cnotch/xlog" -) - -// 网络播放时 PTS(Presentation Time Stamp)的延时 -// 影响视频 Tag 的 CTS 和音频的 DTS(Decoding Time Stamp) -const ( - dtsDelay = 0 - ptsDelay = 1000 -) - -// MuxerAvcAac flv muxer from av.Frame(H264[+AAC]) -type MuxerAvcAac struct { - videoMeta codec.VideoMeta - audioMeta codec.AudioMeta - hasAudio bool - audioSps aac.RawSPS - recvQueue *queue.SyncQueue - tsframeWriter FrameWriter - closed bool - metaReady bool - basePts int64 - nextDts float64 - dtsStep float64 - logger *xlog.Logger // 日志对象 -} - -// NewMuxerAvcAac . -func NewMuxerAvcAac(videoMeta codec.VideoMeta, audioMeta codec.AudioMeta, tsframeWriter FrameWriter, logger *xlog.Logger) (*MuxerAvcAac, error) { - muxer := &MuxerAvcAac{ - recvQueue: queue.NewSyncQueue(), - videoMeta: videoMeta, - audioMeta: audioMeta, - hasAudio: audioMeta.Codec == "AAC", - tsframeWriter: tsframeWriter, - closed: false, - nextDts: dtsDelay, - logger: logger, - } - - if videoMeta.FrameRate > 0 { - muxer.dtsStep = 1000.0 / videoMeta.FrameRate - } - - if muxer.hasAudio { - if err := muxer.prepareAacSps(); err != nil { - return nil, err - } - } - go muxer.process() - return muxer, nil -} - -func (muxer *MuxerAvcAac) prepareAacSps() (err error) { - if err = muxer.audioSps.Decode(muxer.audioMeta.Sps); err != nil { - return - } - - if muxer.audioSps.ObjectType == aac.AOT_NULL || muxer.audioSps.ObjectType == aac.AOT_ESCAPE { - err = fmt.Errorf("tsmuxer decdoe audio aac sequence header failed, aac object type=%d", muxer.audioSps.ObjectType) - return - } - return -} - -// WriteFrame . -func (muxer *MuxerAvcAac) WriteFrame(frame *codec.Frame) error { - muxer.recvQueue.Push(frame) - return nil -} - -// Close . -func (muxer *MuxerAvcAac) Close() error { - if muxer.closed { - return nil - } - - muxer.closed = true - muxer.recvQueue.Signal() - return nil -} - -func (muxer *MuxerAvcAac) process() { - defer func() { - defer func() { // 避免 handler 再 panic - recover() - }() - - if r := recover(); r != nil { - muxer.logger.Errorf("tsmuxer routine panic;r = %v \n %s", r, debug.Stack()) - } - - // 尽早通知GC,回收内存 - muxer.recvQueue.Reset() - if closer, ok := muxer.tsframeWriter.(io.Closer); ok { - closer.Close() - } - }() - - for !muxer.closed { - f := muxer.recvQueue.Pop() - if f == nil { - if !muxer.closed { - muxer.logger.Warn("tsmuxer:receive nil frame") - } - continue - } - - frame := f.(*codec.Frame) - if muxer.basePts == 0 { - muxer.basePts = frame.AbsTimestamp - } - - if frame.MediaType == codec.MediaTypeVideo { - if err := muxer.muxVideoTag(frame); err != nil { - muxer.logger.Errorf("tsmuxer: muxVideoFrame error - %s", err.Error()) - } - } else { - if err := muxer.muxAudioTag(frame); err != nil { - muxer.logger.Errorf("tsmuxer: muxAudioFrame error - %s", err.Error()) - } - } - } -} - -func (muxer *MuxerAvcAac) muxVideoTag(frame *codec.Frame) (err error) { - if frame.Payload[0]&0x1F == h264.NalSps { - if len(muxer.videoMeta.Sps) == 0 { - muxer.videoMeta.Sps = frame.Payload - } - muxer.preparMetadata() - return - } - - if frame.Payload[0]&0x1F == h264.NalPps { - if len(muxer.videoMeta.Pps) == 0 { - muxer.videoMeta.Pps = frame.Payload - } - muxer.preparMetadata() - return - } - - dts := int64(muxer.nextDts) - muxer.nextDts += muxer.dtsStep - pts := frame.AbsTimestamp - muxer.basePts + ptsDelay - if dts > pts { - pts = dts - } - - // set fields - tsframe := &Frame{ - Pid: tsVideoPid, - StreamID: tsVideoAvc, - Dts: dts * 90, - Pts: pts * 90, - Payload: frame.Payload, - key: frame.Payload[0]&0x1F == h264.NalIdrSlice, - } - - tsframe.prepareAvcHeader(muxer.videoMeta.Sps, muxer.videoMeta.Pps) - - return muxer.tsframeWriter.WriteMpegtsFrame(tsframe) -} - -func (muxer *MuxerAvcAac) preparMetadata() { - if muxer.metaReady { - return - } - - if !h264.MetadataIsReady(&muxer.videoMeta) { - // not enough - return - } - - if muxer.videoMeta.FixedFrameRate { - muxer.dtsStep = 1000.0 / muxer.videoMeta.FrameRate - } else { // TODO: - muxer.dtsStep = 1000.0 / 30 - } - muxer.metaReady = true -} - -func (muxer *MuxerAvcAac) muxAudioTag(frame *codec.Frame) error { - pts := frame.AbsTimestamp - muxer.basePts + ptsDelay - pts *= 90 - - // set fields - tsframe := &Frame{ - Pid: tsAudioPid, - StreamID: tsAudioAac, - Dts: pts, - Pts: pts, - Payload: frame.Payload, - } - - tsframe.prepareAacHeader(&muxer.audioSps) - return muxer.tsframeWriter.WriteMpegtsFrame(tsframe) -} diff --git a/media/stream.go b/media/stream.go index 758252d..31e7b92 100755 --- a/media/stream.go +++ b/media/stream.go @@ -55,7 +55,8 @@ type Stream struct { flvMuxer flvMuxer flvConsumptions consumptions flvCache packCache - tsMuxer *mpegts.MuxerAvcAac + tsMuxer *mpegts.Muxer + hlsSG *hls.SegmentGenerator hlsPlaylist *hls.Playlist attrs map[string]string // 流属性 multicast Multicastable @@ -131,12 +132,13 @@ func (s *Stream) prepareOtherStream() { if err != nil { return } - tsMuxer, err2 := mpegts.NewMuxerAvcAac(s.Video, s.Audio, sg, + tsMuxer, err2 := mpegts.NewMuxer(&s.Video, &s.Audio, sg, s.logger.With(xlog.Fields(xlog.F("extra", "ts.Muxer")))) if err2 != nil { return } s.tsMuxer = tsMuxer + s.hlsSG = sg s.hlsPlaylist = hlsPlaylist } } @@ -179,6 +181,7 @@ func (s *Stream) close(status int32) error { // 关闭 hls if s.tsMuxer != nil { s.tsMuxer.Close() + s.hlsSG.Close() s.hlsPlaylist.Close() }