From cc27cf656360b12086570b957bb82cb52009eefa Mon Sep 17 00:00:00 2001 From: Alessandro Ros Date: Mon, 21 Jul 2025 10:02:40 +0200 Subject: [PATCH] mpegts, srt: support MPEG-4 Audio LATM tracks (#4403) (#4759) --- go.mod | 2 +- go.sum | 4 +- internal/formatprocessor/mpeg4_audio_latm.go | 114 ++++++++++++++ internal/formatprocessor/mpeg4_video.go | 10 +- internal/formatprocessor/processor.go | 8 + internal/protocols/hls/from_stream.go | 57 +++++-- internal/protocols/mpegts/enhanced_reader.go | 78 ++++++++++ internal/protocols/mpegts/from_stream.go | 88 +++++++++-- internal/protocols/mpegts/to_stream.go | 63 +++++++- internal/protocols/mpegts/to_stream_test.go | 142 +++++++++++++++++- internal/protocols/rtmp/from_stream.go | 31 ++++ .../protocols/rtmp/message/opus_id_header.go | 2 +- internal/protocols/rtmp/writer.go | 12 +- internal/recorder/format_fmp4.go | 69 ++++++--- internal/recorder/format_mpegts.go | 48 +++++- internal/servers/srt/conn.go | 3 +- internal/staticsources/srt/source.go | 3 +- internal/staticsources/udp/source.go | 3 +- internal/unit/mpeg4_audio_latm.go | 7 + 19 files changed, 668 insertions(+), 76 deletions(-) create mode 100644 internal/formatprocessor/mpeg4_audio_latm.go create mode 100644 internal/protocols/mpegts/enhanced_reader.go create mode 100644 internal/unit/mpeg4_audio_latm.go diff --git a/go.mod b/go.mod index df482c12..2bf8545e 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/alecthomas/kong v1.12.0 github.com/asticode/go-astits v1.13.0 github.com/bluenviron/gohlslib/v2 v2.2.2 - github.com/bluenviron/gortsplib/v4 v4.15.0 + github.com/bluenviron/gortsplib/v4 v4.16.0 github.com/bluenviron/mediacommon/v2 v2.4.0 github.com/datarhei/gosrt v0.9.0 github.com/fsnotify/fsnotify v1.9.0 diff --git a/go.sum b/go.sum index 6c8caaa1..40acc71c 100644 --- a/go.sum +++ b/go.sum @@ -35,8 +35,8 @@ github.com/benburkert/openpgp v0.0.0-20160410205803-c2471f86866c h1:8XZeJrs4+ZYh github.com/benburkert/openpgp 
v0.0.0-20160410205803-c2471f86866c/go.mod h1:x1vxHcL/9AVzuk5HOloOEPrtJY0MaalYr78afXZ+pWI= github.com/bluenviron/gohlslib/v2 v2.2.2 h1:Q86VloPjwONKF8pu6jSEh9ENm4UzdMl5SzYvtjneL5k= github.com/bluenviron/gohlslib/v2 v2.2.2/go.mod h1:3Lby/VMDD/cN0B3uJPd3bEEiJZ34LqXs71FEvN/fq2k= -github.com/bluenviron/gortsplib/v4 v4.15.0 h1:R5mimKNlmzpUqcAVfCqkSznGk/2hl4Kk9LPFo2KZJeU= -github.com/bluenviron/gortsplib/v4 v4.15.0/go.mod h1:mqAxRuombKOUHREiKuKJ4VBjEC4U7VeMar4/G4Sbq04= +github.com/bluenviron/gortsplib/v4 v4.16.0 h1:qzJxlZXCv11oxNkNTAFMaeX0uEXJE0L6lDv3CKUYT/k= +github.com/bluenviron/gortsplib/v4 v4.16.0/go.mod h1:pcSNf/GToNEwdWy74moR4Tp5JWIEDJ0d9CzCSUPkiwM= github.com/bluenviron/mediacommon/v2 v2.4.0 h1:Ss1T7AMxTrICJ+a/N5urS/1lp1ZpsF+3iJq3B/RLDMw= github.com/bluenviron/mediacommon/v2 v2.4.0/go.mod h1:a6MbPmXtYda9mKibKVMZlW20GYLLrX2R7ZkUE+1pwV0= github.com/bytedance/sonic v1.13.2 h1:8/H1FempDZqC4VqjptGo14QQlJx8VdZJegxs6wwfqpQ= diff --git a/internal/formatprocessor/mpeg4_audio_latm.go b/internal/formatprocessor/mpeg4_audio_latm.go new file mode 100644 index 00000000..73e004f1 --- /dev/null +++ b/internal/formatprocessor/mpeg4_audio_latm.go @@ -0,0 +1,114 @@ +package formatprocessor + +import ( + "errors" + "fmt" + "time" + + "github.com/bluenviron/gortsplib/v4/pkg/format" + "github.com/bluenviron/gortsplib/v4/pkg/format/rtpfragmented" + "github.com/bluenviron/gortsplib/v4/pkg/format/rtpmpeg4audio" + "github.com/pion/rtp" + + "github.com/bluenviron/mediamtx/internal/logger" + "github.com/bluenviron/mediamtx/internal/unit" +) + +type mpeg4AudioLATM struct { + RTPMaxPayloadSize int + Format *format.MPEG4AudioLATM + GenerateRTPPackets bool + Parent logger.Writer + + encoder *rtpfragmented.Encoder + decoder *rtpfragmented.Decoder + randomStart uint32 +} + +func (t *mpeg4AudioLATM) initialize() error { + if t.GenerateRTPPackets { + err := t.createEncoder() + if err != nil { + return err + } + + t.randomStart, err = randUint32() + if err != nil { + return err + } + } + + return nil 
+} + +func (t *mpeg4AudioLATM) createEncoder() error { + t.encoder = &rtpfragmented.Encoder{ + PayloadMaxSize: t.RTPMaxPayloadSize, + PayloadType: t.Format.PayloadTyp, + } + return t.encoder.Init() +} + +func (t *mpeg4AudioLATM) ProcessUnit(uu unit.Unit) error { //nolint:dupl + u := uu.(*unit.MPEG4AudioLATM) + + pkts, err := t.encoder.Encode(u.Element) + if err != nil { + return err + } + u.RTPPackets = pkts + + for _, pkt := range u.RTPPackets { + pkt.Timestamp += t.randomStart + uint32(u.PTS) + } + + return nil +} + +func (t *mpeg4AudioLATM) ProcessRTPPacket( //nolint:dupl + pkt *rtp.Packet, + ntp time.Time, + pts int64, + hasNonRTSPReaders bool, +) (unit.Unit, error) { + u := &unit.MPEG4AudioLATM{ + Base: unit.Base{ + RTPPackets: []*rtp.Packet{pkt}, + NTP: ntp, + PTS: pts, + }, + } + + // remove padding + pkt.Padding = false + pkt.PaddingSize = 0 + + if len(pkt.Payload) > t.RTPMaxPayloadSize { + return nil, fmt.Errorf("RTP payload size (%d) is greater than maximum allowed (%d)", + len(pkt.Payload), t.RTPMaxPayloadSize) + } + + // decode from RTP + if hasNonRTSPReaders || t.decoder != nil { + if t.decoder == nil { + var err error + t.decoder, err = t.Format.CreateDecoder() + if err != nil { + return nil, err + } + } + + el, err := t.decoder.Decode(pkt) + if err != nil { + if errors.Is(err, rtpmpeg4audio.ErrMorePacketsNeeded) { + return u, nil + } + return nil, err + } + + u.Element = el + } + + // route packet as is + return u, nil +} diff --git a/internal/formatprocessor/mpeg4_video.go b/internal/formatprocessor/mpeg4_video.go index c7125ec5..9d65660f 100644 --- a/internal/formatprocessor/mpeg4_video.go +++ b/internal/formatprocessor/mpeg4_video.go @@ -7,7 +7,7 @@ import ( "time" "github.com/bluenviron/gortsplib/v4/pkg/format" - "github.com/bluenviron/gortsplib/v4/pkg/format/rtpmpeg4video" + "github.com/bluenviron/gortsplib/v4/pkg/format/rtpfragmented" "github.com/bluenviron/mediacommon/v2/pkg/codecs/mpeg4video" "github.com/pion/rtp" @@ -33,8 +33,8 @@ type 
mpeg4Video struct { GenerateRTPPackets bool Parent logger.Writer - encoder *rtpmpeg4video.Encoder - decoder *rtpmpeg4video.Decoder + encoder *rtpfragmented.Encoder + decoder *rtpfragmented.Decoder randomStart uint32 } @@ -55,7 +55,7 @@ func (t *mpeg4Video) initialize() error { } func (t *mpeg4Video) createEncoder() error { - t.encoder = &rtpmpeg4video.Encoder{ + t.encoder = &rtpfragmented.Encoder{ PayloadMaxSize: t.RTPMaxPayloadSize, PayloadType: t.Format.PayloadTyp, } @@ -154,7 +154,7 @@ func (t *mpeg4Video) ProcessRTPPacket( //nolint:dupl frame, err := t.decoder.Decode(pkt) if err != nil { - if errors.Is(err, rtpmpeg4video.ErrMorePacketsNeeded) { + if errors.Is(err, rtpfragmented.ErrMorePacketsNeeded) { return u, nil } return nil, err diff --git a/internal/formatprocessor/processor.go b/internal/formatprocessor/processor.go index 59fb999e..e35f6532 100644 --- a/internal/formatprocessor/processor.go +++ b/internal/formatprocessor/processor.go @@ -135,6 +135,14 @@ func New( Parent: parent, } + case *format.MPEG4AudioLATM: + proc = &mpeg4AudioLATM{ + RTPMaxPayloadSize: rtpMaxPayloadSize, + Format: forma, + GenerateRTPPackets: generateRTPPackets, + Parent: parent, + } + case *format.MPEG1Audio: proc = &mpeg1Audio{ RTPMaxPayloadSize: rtpMaxPayloadSize, diff --git a/internal/protocols/hls/from_stream.go b/internal/protocols/hls/from_stream.go index 4a2dd1d0..a3b4e003 100644 --- a/internal/protocols/hls/from_stream.go +++ b/internal/protocols/hls/from_stream.go @@ -9,6 +9,7 @@ import ( "github.com/bluenviron/gohlslib/v2/pkg/codecs" "github.com/bluenviron/gortsplib/v4/pkg/description" "github.com/bluenviron/gortsplib/v4/pkg/format" + "github.com/bluenviron/mediacommon/v2/pkg/codecs/mpeg4audio" "github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/stream" "github.com/bluenviron/mediamtx/internal/unit" @@ -233,11 +234,41 @@ func setupAudioTracks( }) case *format.MPEG4Audio: - co := forma.GetConfig() - if co != nil { + track := 
&gohlslib.Track{ + Codec: &codecs.MPEG4Audio{ + Config: *forma.Config, + }, + ClockRate: forma.ClockRate(), + } + + addTrack( + media, + forma, + track, + func(u unit.Unit) error { + tunit := u.(*unit.MPEG4Audio) + + if tunit.AUs == nil { + return nil + } + + err := muxer.WriteMPEG4Audio( + track, + tunit.NTP, + tunit.PTS, // no conversion is needed since we set gohlslib.Track.ClockRate = format.ClockRate + tunit.AUs) + if err != nil { + return fmt.Errorf("muxer error: %w", err) + } + + return nil + }) + + case *format.MPEG4AudioLATM: + if !forma.CPresent { track := &gohlslib.Track{ Codec: &codecs.MPEG4Audio{ - Config: *co, + Config: *forma.StreamMuxConfig.Programs[0].Layers[0].AudioSpecificConfig, }, ClockRate: forma.ClockRate(), } @@ -247,22 +278,24 @@ func setupAudioTracks( forma, track, func(u unit.Unit) error { - tunit := u.(*unit.MPEG4Audio) + tunit := u.(*unit.MPEG4AudioLATM) - if tunit.AUs == nil { + if tunit.Element == nil { return nil } - err := muxer.WriteMPEG4Audio( + var ame mpeg4audio.AudioMuxElement + ame.StreamMuxConfig = forma.StreamMuxConfig + err := ame.Unmarshal(tunit.Element) + if err != nil { + return err + } + + return muxer.WriteMPEG4Audio( track, tunit.NTP, tunit.PTS, // no conversion is needed since we set gohlslib.Track.ClockRate = format.ClockRate - tunit.AUs) - if err != nil { - return fmt.Errorf("muxer error: %w", err) - } - - return nil + [][]byte{ame.Payloads[0][0][0]}) }) } } diff --git a/internal/protocols/mpegts/enhanced_reader.go b/internal/protocols/mpegts/enhanced_reader.go new file mode 100644 index 00000000..835c2acd --- /dev/null +++ b/internal/protocols/mpegts/enhanced_reader.go @@ -0,0 +1,78 @@ +package mpegts + +import ( + "io" + + "github.com/bluenviron/mediacommon/v2/pkg/codecs/mpeg4audio" + mcmpegts "github.com/bluenviron/mediacommon/v2/pkg/formats/mpegts" + "github.com/bluenviron/mediacommon/v2/pkg/rewindablereader" +) + +// EnhancedReader is a mpegts.Reader wrapper +// that provides additional information that is
needed in order +// to perform conversion to RTSP. +type EnhancedReader struct { + R io.Reader + + *mcmpegts.Reader + + latmConfigs map[uint16]*mpeg4audio.StreamMuxConfig +} + +// Initialize initializes EnhancedReader. +func (r *EnhancedReader) Initialize() error { + rr := &rewindablereader.Reader{R: r.R} + mr := &mcmpegts.Reader{R: rr} + err := mr.Initialize() + if err != nil { + return err + } + + r.latmConfigs = make(map[uint16]*mpeg4audio.StreamMuxConfig) + tracksToParse := 0 + + for _, track := range mr.Tracks() { + if _, ok := track.Codec.(*mcmpegts.CodecMPEG4AudioLATM); ok { + cpid := track.PID + done := false + tracksToParse++ + + mr.OnDataMPEG4AudioLATM(track, func(_ int64, els [][]byte) error { + if done { + return nil + } + + var ame mpeg4audio.AudioMuxElement + ame.MuxConfigPresent = true + err2 := ame.Unmarshal(els[0]) + if err2 != nil { + return nil //nolint:nilerr + } + + if ame.MuxConfigPresent { + r.latmConfigs[cpid] = ame.StreamMuxConfig + tracksToParse-- + done = true + } + + return nil + }) + } + } + + for tracksToParse > 0 { + err = mr.Read() + if err != nil { + return err + } + } + + rr.Rewind() + r.Reader = &mcmpegts.Reader{R: rr} + err = r.Reader.Initialize() + if err != nil { + return err + } + + return err +} diff --git a/internal/protocols/mpegts/from_stream.go b/internal/protocols/mpegts/from_stream.go index 2311f3f8..c227bd92 100644 --- a/internal/protocols/mpegts/from_stream.go +++ b/internal/protocols/mpegts/from_stream.go @@ -10,6 +10,7 @@ import ( "github.com/bluenviron/mediacommon/v2/pkg/codecs/ac3" "github.com/bluenviron/mediacommon/v2/pkg/codecs/h264" "github.com/bluenviron/mediacommon/v2/pkg/codecs/h265" + "github.com/bluenviron/mediacommon/v2/pkg/codecs/mpeg4audio" mcmpegts "github.com/bluenviron/mediacommon/v2/pkg/formats/mpegts" srt "github.com/datarhei/gosrt" @@ -255,27 +256,89 @@ func FromStream( }) case *format.MPEG4Audio: - co := forma.GetConfig() - if co != nil { - track := &mcmpegts.Track{Codec: 
&mcmpegts.CodecMPEG4Audio{ - Config: *co, - }} + track := &mcmpegts.Track{Codec: &mcmpegts.CodecMPEG4Audio{ + Config: *forma.Config, + }} + addTrack( + media, + forma, + track, + func(u unit.Unit) error { + tunit := u.(*unit.MPEG4Audio) + if tunit.AUs == nil { + return nil + } + + sconn.SetWriteDeadline(time.Now().Add(writeTimeout)) + err := (*w).WriteMPEG4Audio( + track, + multiplyAndDivide(tunit.PTS, 90000, int64(clockRate)), + tunit.AUs) + if err != nil { + return err + } + return bw.Flush() + }) + + case *format.MPEG4AudioLATM: + track := &mcmpegts.Track{Codec: &mcmpegts.CodecMPEG4AudioLATM{}} + + if !forma.CPresent { addTrack( media, forma, track, func(u unit.Unit) error { - tunit := u.(*unit.MPEG4Audio) - if tunit.AUs == nil { + tunit := u.(*unit.MPEG4AudioLATM) + if tunit.Element == nil { + return nil + } + + var elIn mpeg4audio.AudioMuxElement + elIn.MuxConfigPresent = false + elIn.StreamMuxConfig = forma.StreamMuxConfig + err := elIn.Unmarshal(tunit.Element) + if err != nil { + return err + } + + var elOut mpeg4audio.AudioMuxElement + elOut.MuxConfigPresent = true + elOut.StreamMuxConfig = forma.StreamMuxConfig + elOut.UseSameStreamMux = false + elOut.Payloads = elIn.Payloads + buf, err := elOut.Marshal() + if err != nil { + return err + } + + sconn.SetWriteDeadline(time.Now().Add(writeTimeout)) + err = (*w).WriteMPEG4AudioLATM( + track, + multiplyAndDivide(tunit.PTS, 90000, int64(clockRate)), + [][]byte{buf}) + if err != nil { + return err + } + return bw.Flush() + }) + } else { + addTrack( + media, + forma, + track, + func(u unit.Unit) error { + tunit := u.(*unit.MPEG4AudioLATM) + if tunit.Element == nil { return nil } sconn.SetWriteDeadline(time.Now().Add(writeTimeout)) - err := (*w).WriteMPEG4Audio( + err := (*w).WriteMPEG4AudioLATM( track, multiplyAndDivide(tunit.PTS, 90000, int64(clockRate)), - tunit.AUs) + [][]byte{tunit.Element}) if err != nil { return err } @@ -353,10 +416,5 @@ func FromStream( } w = &mcmpegts.Writer{W: bw, Tracks: tracks} - err 
:= w.Initialize() - if err != nil { - panic(err) - } - - return nil + return w.Initialize() } diff --git a/internal/protocols/mpegts/to_stream.go b/internal/protocols/mpegts/to_stream.go index cc3291e6..012adca0 100644 --- a/internal/protocols/mpegts/to_stream.go +++ b/internal/protocols/mpegts/to_stream.go @@ -3,10 +3,13 @@ package mpegts import ( "errors" + "fmt" + "reflect" "time" "github.com/bluenviron/gortsplib/v4/pkg/description" "github.com/bluenviron/gortsplib/v4/pkg/format" + "github.com/bluenviron/mediacommon/v2/pkg/codecs/mpeg4audio" "github.com/bluenviron/mediacommon/v2/pkg/formats/mpegts" "github.com/bluenviron/mediamtx/internal/logger" @@ -20,7 +23,7 @@ var errNoSupportedCodecs = errors.New( // ToStream maps a MPEG-TS stream to a MediaMTX stream. func ToStream( - r *mpegts.Reader, + r *EnhancedReader, stream **stream.Stream, l logger.Writer, ) ([]*description.Media, error) { @@ -184,6 +187,64 @@ func ToStream( return nil }) + case *mpegts.CodecMPEG4AudioLATM: + // We are dealing with a LATM stream with in-band configuration. + // Although in theory this can be streamed with RTSP (RFC6416 with cpresent=1), + // in practice there is no player that supports it. + // Therefore, convert the stream to a LATM stream with out-of-band configuration. 
+ streamMuxConfig := r.latmConfigs[track.PID] + medi = &description.Media{ + Type: description.MediaTypeAudio, + Formats: []format.Format{&format.MPEG4AudioLATM{ + PayloadTyp: 96, + CPresent: false, + ProfileLevelID: 30, + StreamMuxConfig: streamMuxConfig, + }}, + } + clockRate := medi.Formats[0].ClockRate() + + r.OnDataMPEG4AudioLATM(track, func(pts int64, els [][]byte) error { + pts = td.Decode(pts) + + pts = multiplyAndDivide(pts, int64(clockRate), 90000) + + for _, el := range els { + var elIn mpeg4audio.AudioMuxElement + elIn.MuxConfigPresent = true + elIn.StreamMuxConfig = streamMuxConfig + err := elIn.Unmarshal(el) + if err != nil { + return err + } + + if !reflect.DeepEqual(elIn.StreamMuxConfig, streamMuxConfig) { + return fmt.Errorf("dynamic stream mux config is not supported") + } + + var elOut mpeg4audio.AudioMuxElement + elOut.MuxConfigPresent = false + elOut.StreamMuxConfig = streamMuxConfig + elOut.Payloads = elIn.Payloads + buf, err := elOut.Marshal() + if err != nil { + return err + } + + (*stream).WriteUnit(medi, medi.Formats[0], &unit.MPEG4AudioLATM{ + Base: unit.Base{ + NTP: time.Now(), + PTS: pts, + }, + Element: buf, + }) + + pts += mpeg4audio.SamplesPerAccessUnit + } + + return nil + }) + case *mpegts.CodecMPEG1Audio: medi = &description.Media{ Type: description.MediaTypeAudio, diff --git a/internal/protocols/mpegts/to_stream_test.go b/internal/protocols/mpegts/to_stream_test.go index de3b38ae..6be5c5d8 100644 --- a/internal/protocols/mpegts/to_stream_test.go +++ b/internal/protocols/mpegts/to_stream_test.go @@ -7,12 +7,148 @@ import ( "testing" "github.com/asticode/go-astits" - "github.com/bluenviron/mediacommon/v2/pkg/formats/mpegts" + "github.com/bluenviron/gortsplib/v4/pkg/description" + "github.com/bluenviron/gortsplib/v4/pkg/format" + "github.com/bluenviron/mediacommon/v2/pkg/codecs/mpeg4audio" "github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/test" "github.com/stretchr/testify/require" ) +func 
TestToStream(t *testing.T) { + for _, ca := range []string{ + "h265", + "h264", + "mpeg-4 audio latm", + } { + t.Run(ca, func(t *testing.T) { + var buf bytes.Buffer + mux := astits.NewMuxer(context.Background(), &buf) + + switch ca { + case "h265": + err := mux.AddElementaryStream(astits.PMTElementaryStream{ + ElementaryPID: 122, + StreamType: astits.StreamTypeH265Video, + }) + require.NoError(t, err) + + mux.SetPCRPID(122) + + _, err = mux.WriteTables() + require.NoError(t, err) + + case "h264": + err := mux.AddElementaryStream(astits.PMTElementaryStream{ + ElementaryPID: 122, + StreamType: astits.StreamTypeH264Video, + }) + require.NoError(t, err) + + mux.SetPCRPID(122) + + _, err = mux.WriteTables() + require.NoError(t, err) + + case "mpeg-4 audio latm": + err := mux.AddElementaryStream(astits.PMTElementaryStream{ + ElementaryPID: 122, + StreamType: astits.StreamTypeAACLATMAudio, + }) + require.NoError(t, err) + + mux.SetPCRPID(122) + + enc1, err := mpeg4audio.AudioMuxElement{ + MuxConfigPresent: true, + StreamMuxConfig: &mpeg4audio.StreamMuxConfig{ + Programs: []*mpeg4audio.StreamMuxConfigProgram{{ + Layers: []*mpeg4audio.StreamMuxConfigLayer{{ + AudioSpecificConfig: &mpeg4audio.AudioSpecificConfig{ + Type: 2, + SampleRate: 48000, + ChannelCount: 2, + }, + LatmBufferFullness: 255, + }}, + }}, + }, + Payloads: [][][][]byte{{{{1, 2, 3, 4}}}}, + }.Marshal() + require.NoError(t, err) + + enc2, err := mpeg4audio.AudioSyncStream{ + AudioMuxElements: [][]byte{enc1}, + }.Marshal() + require.NoError(t, err) + + _, err = mux.WriteData(&astits.MuxerData{ + PID: 122, + PES: &astits.PESData{ + Header: &astits.PESHeader{ + OptionalHeader: &astits.PESOptionalHeader{ + MarkerBits: 2, + PTSDTSIndicator: astits.PTSDTSIndicatorOnlyPTS, + PTS: &astits.ClockReference{Base: 90000}, + }, + StreamID: 192, + }, + Data: enc2, + }, + }) + require.NoError(t, err) + } + + r := &EnhancedReader{R: &buf} + err := r.Initialize() + require.NoError(t, err) + + desc, err := ToStream(r, nil, nil) 
+ require.NoError(t, err) + + switch ca { + case "h265": + require.Equal(t, []*description.Media{{ + Type: description.MediaTypeVideo, + Formats: []format.Format{&format.H265{ + PayloadTyp: 96, + }}, + }}, desc) + + case "h264": + require.Equal(t, []*description.Media{{ + Type: description.MediaTypeVideo, + Formats: []format.Format{&format.H264{ + PayloadTyp: 96, + PacketizationMode: 1, + }}, + }}, desc) + + case "mpeg-4 audio latm": + require.Equal(t, []*description.Media{{ + Type: description.MediaTypeAudio, + Formats: []format.Format{&format.MPEG4AudioLATM{ + PayloadTyp: 96, + ProfileLevelID: 30, + StreamMuxConfig: &mpeg4audio.StreamMuxConfig{ + Programs: []*mpeg4audio.StreamMuxConfigProgram{{ + Layers: []*mpeg4audio.StreamMuxConfigLayer{{ + AudioSpecificConfig: &mpeg4audio.AudioSpecificConfig{ + Type: 2, + SampleRate: 48000, + ChannelCount: 2, + }, + LatmBufferFullness: 255, + }}, + }}, + }, + }}, + }}, desc) + } + }) + } +} + func TestToStreamNoSupportedCodecs(t *testing.T) { var buf bytes.Buffer mux := astits.NewMuxer(context.Background(), &buf) @@ -28,7 +164,7 @@ func TestToStreamNoSupportedCodecs(t *testing.T) { _, err = mux.WriteTables() require.NoError(t, err) - r := &mpegts.Reader{R: &buf} + r := &EnhancedReader{R: &buf} err = r.Initialize() require.NoError(t, err) @@ -60,7 +196,7 @@ func TestToStreamSkipUnsupportedTracks(t *testing.T) { _, err = mux.WriteTables() require.NoError(t, err) - r := &mpegts.Reader{R: &buf} + r := &EnhancedReader{R: &buf} err = r.Initialize() require.NoError(t, err) diff --git a/internal/protocols/rtmp/from_stream.go b/internal/protocols/rtmp/from_stream.go index bc040ce3..f821db78 100644 --- a/internal/protocols/rtmp/from_stream.go +++ b/internal/protocols/rtmp/from_stream.go @@ -137,6 +137,37 @@ func setupAudio( return audioFormatMPEG4Audio } + var audioFormatMPEG4AudioLATM *format.MPEG4AudioLATM + audioMedia = str.Desc.FindFormat(&audioFormatMPEG4AudioLATM) + + if audioMedia != nil && !audioFormatMPEG4AudioLATM.CPresent { + 
str.AddReader( + reader, + audioMedia, + audioFormatMPEG4AudioLATM, + func(u unit.Unit) error { + tunit := u.(*unit.MPEG4AudioLATM) + if tunit.Element == nil { + return nil + } + + var ame mpeg4audio.AudioMuxElement + ame.StreamMuxConfig = audioFormatMPEG4AudioLATM.StreamMuxConfig + err := ame.Unmarshal(tunit.Element) + if err != nil { + return err + } + + nconn.SetWriteDeadline(time.Now().Add(writeTimeout)) + return (*w).WriteMPEG4Audio( + timestampToDuration(tunit.PTS, audioFormatMPEG4AudioLATM.ClockRate()), + ame.Payloads[0][0][0], + ) + }) + + return audioFormatMPEG4AudioLATM + } + var audioFormatMPEG1 *format.MPEG1Audio audioMedia = str.Desc.FindFormat(&audioFormatMPEG1) diff --git a/internal/protocols/rtmp/message/opus_id_header.go b/internal/protocols/rtmp/message/opus_id_header.go index fbd53c05..9d5a364c 100644 --- a/internal/protocols/rtmp/message/opus_id_header.go +++ b/internal/protocols/rtmp/message/opus_id_header.go @@ -8,7 +8,7 @@ import ( var magicSignature = []byte{'O', 'p', 'u', 's', 'H', 'e', 'a', 'd'} // OpusIDHeader is an Opus identification header. 
-// Specification: https://datatracker.ietf.org/doc/html/rfc7845#section-5.1 +// Specification: RFC7845, section 5.1 type OpusIDHeader struct { Version uint8 ChannelCount uint8 diff --git a/internal/protocols/rtmp/writer.go b/internal/protocols/rtmp/writer.go index f0ce6a00..c29a2faa 100644 --- a/internal/protocols/rtmp/writer.go +++ b/internal/protocols/rtmp/writer.go @@ -95,7 +95,7 @@ func (w *Writer) writeTracks() error { case *format.MPEG1Audio: return message.CodecMPEG1Audio - case *format.MPEG4Audio: + case *format.MPEG4Audio, *format.MPEG4AudioLATM: return message.CodecMPEG4Audio default: @@ -133,14 +133,16 @@ func (w *Writer) writeTracks() error { } } - var audioConfig *mpeg4audio.AudioSpecificConfig + var audioConf *mpeg4audio.AudioSpecificConfig if track, ok := w.AudioTrack.(*format.MPEG4Audio); ok { - audioConfig = track.GetConfig() + audioConf = track.Config + } else if track, ok := w.AudioTrack.(*format.MPEG4AudioLATM); ok { + audioConf = track.StreamMuxConfig.Programs[0].Layers[0].AudioSpecificConfig } - if audioConfig != nil { - enc, err := audioConfig.Marshal() + if audioConf != nil { + enc, err := audioConf.Marshal() if err != nil { return err } diff --git a/internal/recorder/format_fmp4.go b/internal/recorder/format_fmp4.go index 72d52da0..977b4003 100644 --- a/internal/recorder/format_fmp4.go +++ b/internal/recorder/format_fmp4.go @@ -640,10 +640,43 @@ func (f *formatFMP4) initialize() bool { }) case *rtspformat.MPEG4Audio: - co := forma.GetConfig() - if co != nil { + codec := &mp4.CodecMPEG4Audio{ + Config: *forma.Config, + } + track := addTrack(forma, codec) + + f.ri.stream.AddReader( + f.ri, + media, + forma, + func(u unit.Unit) error { + tunit := u.(*unit.MPEG4Audio) + if tunit.AUs == nil { + return nil + } + + for i, au := range tunit.AUs { + pts := tunit.PTS + int64(i)*mpeg4audio.SamplesPerAccessUnit + + err := track.write(&sample{ + Sample: &fmp4.Sample{ + Payload: au, + }, + dts: pts, + ntp: 
tunit.NTP.Add(timestampToDuration(pts-tunit.PTS, clockRate)), + }) + if err != nil { + return err + } + } + + return nil + }) + + case *rtspformat.MPEG4AudioLATM: + if !forma.CPresent { codec := &mp4.CodecMPEG4Audio{ - Config: *co, + Config: *forma.StreamMuxConfig.Programs[0].Layers[0].AudioSpecificConfig, } track := addTrack(forma, codec) @@ -652,27 +685,25 @@ func (f *formatFMP4) initialize() bool { media, forma, func(u unit.Unit) error { - tunit := u.(*unit.MPEG4Audio) - if tunit.AUs == nil { + tunit := u.(*unit.MPEG4AudioLATM) + if tunit.Element == nil { return nil } - for i, au := range tunit.AUs { - pts := tunit.PTS + int64(i)*mpeg4audio.SamplesPerAccessUnit - - err := track.write(&sample{ - Sample: &fmp4.Sample{ - Payload: au, - }, - dts: pts, - ntp: tunit.NTP.Add(timestampToDuration(pts-tunit.PTS, clockRate)), - }) - if err != nil { - return err - } + var ame mpeg4audio.AudioMuxElement + ame.StreamMuxConfig = forma.StreamMuxConfig + err := ame.Unmarshal(tunit.Element) + if err != nil { + return err } - return nil + return track.write(&sample{ + Sample: &fmp4.Sample{ + Payload: ame.Payloads[0][0][0], + }, + dts: tunit.PTS, + ntp: tunit.NTP, + }) }) } diff --git a/internal/recorder/format_mpegts.go b/internal/recorder/format_mpegts.go index 545f88cc..655b0113 100644 --- a/internal/recorder/format_mpegts.go +++ b/internal/recorder/format_mpegts.go @@ -11,6 +11,7 @@ import ( "github.com/bluenviron/mediacommon/v2/pkg/codecs/ac3" "github.com/bluenviron/mediacommon/v2/pkg/codecs/h264" "github.com/bluenviron/mediacommon/v2/pkg/codecs/h265" + "github.com/bluenviron/mediacommon/v2/pkg/codecs/mpeg4audio" "github.com/bluenviron/mediacommon/v2/pkg/codecs/mpeg4video" "github.com/bluenviron/mediacommon/v2/pkg/formats/mpegts" @@ -306,10 +307,38 @@ func (f *formatMPEGTS) initialize() bool { }) case *rtspformat.MPEG4Audio: - co := forma.GetConfig() - if co != nil { + track := addTrack(forma, &mpegts.CodecMPEG4Audio{ + Config: *forma.Config, + }) + + f.ri.stream.AddReader( + 
f.ri, + media, + forma, + func(u unit.Unit) error { + tunit := u.(*unit.MPEG4Audio) + if tunit.AUs == nil { + return nil + } + + return f.write( + timestampToDuration(tunit.PTS, clockRate), + tunit.NTP, + false, + true, + func() error { + return f.mw.WriteMPEG4Audio( + track, + multiplyAndDivide(tunit.PTS, 90000, int64(clockRate)), + tunit.AUs) + }, + ) + }) + + case *rtspformat.MPEG4AudioLATM: + if !forma.CPresent { track := addTrack(forma, &mpegts.CodecMPEG4Audio{ - Config: *co, + Config: *forma.StreamMuxConfig.Programs[0].Layers[0].AudioSpecificConfig, }) f.ri.stream.AddReader( @@ -317,11 +346,18 @@ func (f *formatMPEGTS) initialize() bool { media, forma, func(u unit.Unit) error { - tunit := u.(*unit.MPEG4Audio) - if tunit.AUs == nil { + tunit := u.(*unit.MPEG4AudioLATM) + if tunit.Element == nil { return nil } + var ame mpeg4audio.AudioMuxElement + ame.StreamMuxConfig = forma.StreamMuxConfig + err := ame.Unmarshal(tunit.Element) + if err != nil { + return err + } + return f.write( timestampToDuration(tunit.PTS, clockRate), tunit.NTP, @@ -331,7 +367,7 @@ func (f *formatMPEGTS) initialize() bool { return f.mw.WriteMPEG4Audio( track, multiplyAndDivide(tunit.PTS, 90000, int64(clockRate)), - tunit.AUs) + [][]byte{ame.Payloads[0][0][0]}) }, ) }) diff --git a/internal/servers/srt/conn.go b/internal/servers/srt/conn.go index f5a01b21..258b7814 100644 --- a/internal/servers/srt/conn.go +++ b/internal/servers/srt/conn.go @@ -10,7 +10,6 @@ import ( "time" "github.com/bluenviron/gortsplib/v4/pkg/description" - mcmpegts "github.com/bluenviron/mediacommon/v2/pkg/formats/mpegts" srt "github.com/datarhei/gosrt" "github.com/google/uuid" @@ -204,7 +203,7 @@ func (c *conn) runPublish(streamID *streamID) error { func (c *conn) runPublishReader(sconn srt.Conn, path defs.Path) error { sconn.SetReadDeadline(time.Now().Add(time.Duration(c.readTimeout))) - r := &mcmpegts.Reader{R: sconn} + r := &mpegts.EnhancedReader{R: sconn} err := r.Initialize() if err != nil { return err diff --git 
a/internal/staticsources/srt/source.go b/internal/staticsources/srt/source.go index ecd6d736..f41405db 100644 --- a/internal/staticsources/srt/source.go +++ b/internal/staticsources/srt/source.go @@ -5,7 +5,6 @@ import ( "time" "github.com/bluenviron/gortsplib/v4/pkg/description" - mcmpegts "github.com/bluenviron/mediacommon/v2/pkg/formats/mpegts" srt "github.com/datarhei/gosrt" "github.com/bluenviron/mediamtx/internal/conf" @@ -70,7 +69,7 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error { func (s *Source) runReader(sconn srt.Conn) error { sconn.SetReadDeadline(time.Now().Add(time.Duration(s.ReadTimeout))) - r := &mcmpegts.Reader{R: sconn} + r := &mpegts.EnhancedReader{R: sconn} err := r.Initialize() if err != nil { return err diff --git a/internal/staticsources/udp/source.go b/internal/staticsources/udp/source.go index e40a5d7e..80923cc9 100644 --- a/internal/staticsources/udp/source.go +++ b/internal/staticsources/udp/source.go @@ -9,7 +9,6 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/description" "github.com/bluenviron/gortsplib/v4/pkg/multicast" - mcmpegts "github.com/bluenviron/mediacommon/v2/pkg/formats/mpegts" "github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/counterdumper" @@ -137,7 +136,7 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error { func (s *Source) runReader(pc net.PacketConn, sourceIP net.IP) error { pc.SetReadDeadline(time.Now().Add(time.Duration(s.ReadTimeout))) pcr := &packetConnReader{pc: pc, sourceIP: sourceIP} - r := &mcmpegts.Reader{R: pcr} + r := &mpegts.EnhancedReader{R: pcr} err := r.Initialize() if err != nil { return err diff --git a/internal/unit/mpeg4_audio_latm.go b/internal/unit/mpeg4_audio_latm.go new file mode 100644 index 00000000..b2a1209b --- /dev/null +++ b/internal/unit/mpeg4_audio_latm.go @@ -0,0 +1,7 @@ +package unit + +// MPEG4AudioLATM is a MPEG-4 Audio LATM data unit. +type MPEG4AudioLATM struct { + Base + Element []byte +}