refactoring RTP remuxer

This commit is contained in:
notch
2021-01-14 10:29:42 +08:00
parent 473c36f3a2
commit 21ed16382c
8 changed files with 104 additions and 78 deletions

View File

@@ -42,9 +42,7 @@ func TestFlvWriter(t *testing.T) {
writer, err := NewWriter(out, 5)
flvMuxer := NewMuxerAvcAac(video, audio, writer, xlog.L())
h264Depack := rtp.NewH264Depacketizer(flvMuxer)
mpesDepack := rtp.NewAacDepacketizer(flvMuxer, audio.SampleRate)
rtpDemuxer := rtp.NewDemuxer(h264Depack, mpesDepack, xlog.L())
rtpDemuxer,_ := rtp.NewDemuxer(&video,&audio,flvMuxer, xlog.L())
channels := []int{int(rtp.ChannelVideo), int(rtp.ChannelVideoControl), int(rtp.ChannelAudio), int(rtp.ChannelAudioControl)}
for {
packet, err := rtp.ReadPacket(reader, channels)

View File

@@ -42,9 +42,7 @@ func TestMpegtsWriter(t *testing.T) {
writer, err := NewWriter(out)
tsMuxer, _ := NewMuxerAvcAac(video, audio, writer, xlog.L())
h264Depack := rtp.NewH264Depacketizer(tsMuxer)
mpesDepack := rtp.NewAacDepacketizer(tsMuxer, audio.SampleRate)
rtpDemuxer := rtp.NewDemuxer(h264Depack, mpesDepack, xlog.L())
rtpDemuxer,_ := rtp.NewDemuxer(&video, &audio, tsMuxer, xlog.L())
channels := []int{int(rtp.ChannelVideo), int(rtp.ChannelVideoControl), int(rtp.ChannelAudio), int(rtp.ChannelAudioControl)}
for {
packet, err := rtp.ReadPacket(reader, channels)

View File

@@ -10,6 +10,7 @@ import (
)
type aacDepacketizer struct {
audio *codec.AudioMeta
w codec.FrameWriter
sizeLength int
indexLength int
@@ -18,13 +19,14 @@ type aacDepacketizer struct {
}
// NewAacDepacketizer 实例化 AAC 解包器
func NewAacDepacketizer(w codec.FrameWriter, rtpTimeUnit int) Depacketizer {
func NewAacDepacketizer(audio *codec.AudioMeta, w codec.FrameWriter) depacketizer {
fe := &aacDepacketizer{
audio: audio,
w: w,
sizeLength: 13,
indexLength: 3,
}
fe.syncClock.RTPTimeUnit = 1000.0 / float64(rtpTimeUnit)
fe.syncClock.RTPTimeUnit = 1000.0 / float64(audio.SampleRate)
return fe
}

View File

@@ -5,44 +5,55 @@
package rtp
import (
"fmt"
"runtime/debug"
"github.com/cnotch/ipchub/av/codec"
"github.com/cnotch/queue"
"github.com/cnotch/xlog"
)
// Depacketizer 解包器
type Depacketizer interface {
// depacketizer 解包器
type depacketizer interface {
Control(p *Packet) error
Depacketize(p *Packet) error
}
// Demuxer 帧转换器
type Demuxer struct {
closed bool
recvQueue *queue.SyncQueue
closed bool
recvQueue *queue.SyncQueue
depacketizeFuncs [4]func(packet *Packet) error
logger *xlog.Logger
logger *xlog.Logger
}
func emptyDepacketize(*Packet) error { return nil }
// NewDemuxer 创建 rtp.Packet 解封装处理器。
func NewDemuxer(videoDepacketizer Depacketizer, audioDepacketizer Depacketizer, logger *xlog.Logger) *Demuxer {
func NewDemuxer(video *codec.VideoMeta, audio *codec.AudioMeta, fw codec.FrameWriter, logger *xlog.Logger) (*Demuxer, error) {
fc := &Demuxer{
recvQueue: queue.NewSyncQueue(),
closed: false,
logger: logger,
}
if videoDepacketizer != nil {
fc.depacketizeFuncs[ChannelVideo] = videoDepacketizer.Depacketize
fc.depacketizeFuncs[ChannelVideoControl] = videoDepacketizer.Control
} else {
fc.depacketizeFuncs[ChannelVideo] = emptyDepacketize
fc.depacketizeFuncs[ChannelVideoControl] = emptyDepacketize
var videoDepacketizer, audioDepacketizer depacketizer
switch video.Codec {
case "H264":
videoDepacketizer = NewH264Depacketizer(video, fw)
case "H265":
videoDepacketizer = NewH265Depacketizer(video, fw)
}
if videoDepacketizer == nil {
return nil, fmt.Errorf("Unsupport video codec type:%s", video.Codec)
}
fc.depacketizeFuncs[ChannelVideo] = videoDepacketizer.Depacketize
fc.depacketizeFuncs[ChannelVideoControl] = videoDepacketizer.Control
if audio.Codec == "AAC" {
audioDepacketizer = NewAacDepacketizer(audio, fw)
}
if audioDepacketizer != nil {
fc.depacketizeFuncs[ChannelAudio] = audioDepacketizer.Depacketize
fc.depacketizeFuncs[ChannelAudioControl] = audioDepacketizer.Control
@@ -51,11 +62,11 @@ func NewDemuxer(videoDepacketizer Depacketizer, audioDepacketizer Depacketizer,
fc.depacketizeFuncs[ChannelAudioControl] = emptyDepacketize
}
go fc.convert()
return fc
go fc.process()
return fc, nil
}
func (dm *Demuxer) convert() {
func (dm *Demuxer) process() {
defer func() {
defer func() { // 避免 handler 再 panic
recover()

View File

@@ -7,20 +7,25 @@ package rtp
import (
"bufio"
"io"
"io/ioutil"
"os"
"testing"
"time"
"github.com/cnotch/ipchub/av/codec"
"github.com/cnotch/ipchub/av/codec/h264"
"github.com/cnotch/ipchub/av/format/sdp"
"github.com/cnotch/xlog"
"github.com/stretchr/testify/assert"
)
var demuxerTestCases = []struct {
sdpFile string
rtpFile string
frames frameWriter
}{
{"game.rtp", frameWriter{1354, 1937, 11, 0, 0}},
{"music.rtp", frameWriter{1505, 2569, 9, 0, 9}},
{"game.sdp", "game.rtp", frameWriter{1354, 1937, 11, 0, 0}},
{"music.sdp", "music.rtp", frameWriter{1505, 2569, 9, 0, 9}},
// {"4k.rtp", frameWriter{898, 1359, 28, 0, 27}},
}
@@ -28,6 +33,19 @@ func TestDemuxer(t *testing.T) {
channels := []int{int(ChannelVideo), int(ChannelVideoControl), int(ChannelAudio), int(ChannelAudioControl)}
for _, tt := range demuxerTestCases {
t.Run(tt.rtpFile, func(t *testing.T) {
sdpbytes, err := ioutil.ReadFile("../../../test/asserts/" + tt.sdpFile)
if err != nil {
t.Error(err)
return
}
var video codec.VideoMeta
var audio codec.AudioMeta
err = sdp.ParseMetadata(string(sdpbytes), &video, &audio)
if err != nil {
t.Error(err)
return
}
file, err := os.Open("../../../test/asserts/" + tt.rtpFile)
if err != nil {
t.Error(err)
@@ -37,8 +55,12 @@ func TestDemuxer(t *testing.T) {
reader := bufio.NewReader(file)
fw := &frameWriter{}
h264dp := NewH264Depacketizer(fw)
aacdp := NewAacDepacketizer(fw, 44100)
demuxer, err := NewDemuxer(&video, &audio, fw, xlog.L())
if err!=nil{
t.Error(err)
}
defer demuxer.Close()
for {
packet, err := ReadPacket(reader, channels)
if err == io.EOF {
@@ -48,21 +70,10 @@ func TestDemuxer(t *testing.T) {
t.Errorf("read packet error :%s", err.Error())
break
}
switch packet.Channel {
case ChannelAudio:
if err := aacdp.Depacketize(packet); err != nil {
t.Errorf("depacketiz aac error :%s", err.Error())
}
case ChannelVideo:
if err := h264dp.Depacketize(packet); err != nil {
t.Errorf("depacketiz h264 error :%s", err.Error())
}
case ChannelVideoControl:
h264dp.Control(packet)
case ChannelAudioControl:
aacdp.Control(packet)
}
demuxer.WriteRtpPacket(packet)
}
<-time.After(time.Second)
assert.Equal(t, tt.frames, *fw)
})
}

View File

@@ -13,17 +13,19 @@ import (
type h264Depacketizer struct {
fragments []*Packet // 分片包
video *codec.VideoMeta
w codec.FrameWriter
syncClock SyncClock
}
// NewH264Depacketizer 实例化 H264 帧提取器
func NewH264Depacketizer(w codec.FrameWriter) Depacketizer {
func NewH264Depacketizer(video *codec.VideoMeta, w codec.FrameWriter) depacketizer {
fe := &h264Depacketizer{
video: video,
fragments: make([]*Packet, 0, 16),
w: w,
}
fe.syncClock.RTPTimeUnit = 1000.0 / 90000
fe.syncClock.RTPTimeUnit = 1000.0 / float64(video.ClockRate)
return fe
}
@@ -63,15 +65,12 @@ func (h264dp *h264Depacketizer) Depacketize(packet *Packet) (err error) {
// | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | :...OPTIONAL RTP padding |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
if payload[0]&0x1f == h264.NalFillerData {
return
}
frame := &codec.Frame{
MediaType: codec.MediaTypeVideo,
AbsTimestamp: h264dp.rtp2ntp(packet.Timestamp),
Payload: payload,
}
err = h264dp.w.WriteFrame(frame)
err = h264dp.writeFrame(frame)
case naluType == h264.NalStapaInRtp:
err = h264dp.depacketizeStapa(packet)
case naluType == h264.NalFuAInRtp:
@@ -111,18 +110,17 @@ func (h264dp *h264Depacketizer) depacketizeStapa(packet *Packet) (err error) {
}
off += 2
if payload[off]&0x1f != h264.NalFillerData {
frame := &codec.Frame{
MediaType: codec.MediaTypeVideo,
AbsTimestamp: h264dp.rtp2ntp(packet.Timestamp),
Payload: make([]byte, nalSize),
}
copy(frame.Payload, payload[off:])
frame.Payload[0] = 0 | (header & 0x60) | (frame.Payload[0] & 0x1F)
if err = h264dp.w.WriteFrame(frame); err != nil {
return
}
frame := &codec.Frame{
MediaType: codec.MediaTypeVideo,
AbsTimestamp: h264dp.rtp2ntp(packet.Timestamp),
Payload: make([]byte, nalSize),
}
copy(frame.Payload, payload[off:])
frame.Payload[0] = 0 | (header & 0x60) | (frame.Payload[0] & 0x1F)
if err = h264dp.writeFrame(frame); err != nil {
return
}
off += int(nalSize)
if off >= len(payload) { // 扫描完成
break
@@ -152,9 +150,6 @@ func (h264dp *h264Depacketizer) depacketizeFuA(packet *Packet) (err error) {
// |S|E|R| Type |
// +---------------+
fuHeader := payload[1]
if fuHeader&0x1F == h264.NalFillerData {
return
}
if (fuHeader>>7)&1 == 1 { // 第一个分片包
h264dp.fragments = h264dp.fragments[:0]
@@ -190,7 +185,7 @@ func (h264dp *h264Depacketizer) depacketizeFuA(packet *Packet) (err error) {
// 清空分片缓存
h264dp.fragments = h264dp.fragments[:0]
err = h264dp.w.WriteFrame(frame)
err = h264dp.writeFrame(frame)
}
return
@@ -199,3 +194,20 @@ func (h264dp *h264Depacketizer) depacketizeFuA(packet *Packet) (err error) {
func (h264dp *h264Depacketizer) rtp2ntp(timestamp uint32) int64 {
return h264dp.syncClock.Rtp2Ntp(timestamp)
}
func (h264dp *h264Depacketizer) writeFrame(frame *codec.Frame) error {
nalType := frame.Payload[0] & 0x1f
switch nalType {
case h264.NalSps:
if len(h264dp.video.Sps) == 0 {
h264dp.video.Sps = frame.Payload
}
case h264.NalPps:
if len(h264dp.video.Pps) == 0 {
h264dp.video.Pps = frame.Payload
}
case h264.NalFillerData: // ?ignore...
return nil
}
return h264dp.w.WriteFrame(frame)
}

View File

@@ -17,7 +17,7 @@ type h265Depacketizer struct {
}
// NewH265Depacketizer 实例化 H265 帧提取器
func NewH265Depacketizer(video *codec.VideoMeta, w codec.FrameWriter) Depacketizer {
func NewH265Depacketizer(video *codec.VideoMeta, w codec.FrameWriter) depacketizer {
fe := &h265Depacketizer{
video: video,
fragments: make([]*Packet, 0, 16),

View File

@@ -100,20 +100,17 @@ func NewStream(path string, rawsdp string, options ...Option) *Stream {
}
func (s *Stream) prepareOtherStream() {
// steam(rtp)->frameconverter->stream(frame)->flvmuxer->stream(tag)
// steam(rtp)->rtpdemuxer->stream(frame)->flvmuxer->stream(tag)
s.flvCache = emptyCache{}
s.flvMuxer = emptyFlvMuxer{}
// prepare rtp.Packet -> av.Frame
var videoExtractor, audioExtractor rtp.Depacketizer
if s.Video.Codec == "H264" {
videoExtractor = rtp.NewH264Depacketizer(s)
}
if s.Audio.Codec == "AAC" {
audioExtractor = rtp.NewAacDepacketizer(s, s.Audio.SampleRate)
}
if videoExtractor == nil && audioExtractor == nil {
var err error
if s.rtpDemuxer, err = rtp.NewDemuxer(&s.Video, &s.Audio,
s, s.logger.With(xlog.Fields(xlog.F("extra", "rtp2frame")))); err != nil {
s.rtpDemuxer = emptyRtpDemuxer{}
} else {
s.rtpDemuxer = rtp.NewDemuxer(videoExtractor, audioExtractor,
s.logger.With(xlog.Fields(xlog.F("extra", "rtp2frame"))))
return
}
// prepare av.Frame -> flv.Tag
@@ -121,9 +118,6 @@ func (s *Stream) prepareOtherStream() {
s.flvCache = cache.NewFlvCache(config.CacheGop())
s.flvMuxer = flv.NewMuxerAvcAac(s.Video, s.Audio,
s, s.logger.With(xlog.Fields(xlog.F("extra", "frame2flv"))))
} else {
s.flvCache = emptyCache{}
s.flvMuxer = emptyFlvMuxer{}
}
// prepare av.Frame -> mpegts.Frame