diff --git a/README.md b/README.md index f4bf47b5..dbea4fc4 100644 --- a/README.md +++ b/README.md @@ -10,10 +10,10 @@ Live streams can be published to the server with: |protocol|variants|codecs| |--------|--------|------| -|RTSP clients (FFmpeg, GStreamer, etc)|UDP, TCP, RTSPS|H264, H265, VP8, VP9, AV1, MPEG-2 video, MPEG-2 audio, M-JPEG, MP3, MPEG-4 video, MPEG-4 Audio (AAC), Opus, G711, G722, LPCM and any RTP-compatible codec| -|RTSP servers and cameras|UDP, UDP-Multicast, TCP, RTSPS|H264, H265, VP8, VP9, AV1, MPEG-2 video, MPEG-2 audio, M-JPEG, MP3, MPEG-4 video, MPEG-4 Audio (AAC), Opus, G711, G722, LPCM and any RTP-compatible codec| -|RTMP clients (OBS Studio)|RTMP, RTMPS|H264, H265, MPEG-4 Audio (AAC)| -|RTMP servers and cameras|RTMP, RTMPS|H264, MPEG-4 Audio (AAC)| +|RTSP clients (FFmpeg, GStreamer, etc)|UDP, TCP, RTSPS|H264, H265, VP8, VP9, AV1, MPEG-2 video, M-JPEG, MPEG-4 video, MPEG-2 audio (MP3), MPEG-4 Audio (AAC), Opus, G711, G722, LPCM and any RTP-compatible codec| +|RTSP servers and cameras|UDP, UDP-Multicast, TCP, RTSPS|H264, H265, VP8, VP9, AV1, MPEG-2 video, M-JPEG, MPEG-4 video, MPEG-2 audio (MP3), MPEG-4 Audio (AAC), Opus, G711, G722, LPCM and any RTP-compatible codec| +|RTMP clients (OBS Studio)|RTMP, RTMPS|H264, H265, MPEG-2 audio (MP3), MPEG-4 Audio (AAC)| +|RTMP servers and cameras|RTMP, RTMPS|H264, MPEG-2 audio (MP3), MPEG-4 Audio (AAC)| |HLS servers and cameras|Low-Latency HLS, MP4-based HLS, legacy HLS|H264, H265, MPEG-4 Audio (AAC), Opus| |UDP/MPEG-TS streams|Unicast, broadcast, multicast|H264, H265, MPEG-4 Audio (AAC), Opus| |Raspberry Pi Cameras||H264| @@ -22,7 +22,7 @@ And can be read from the server with: |protocol|variants|codecs| |--------|--------|------| -|RTSP|UDP, UDP-Multicast, TCP, RTSPS|H264, H265, VP8, VP9, AV1, MPEG-2 video, MPEG-2 audio, M-JPEG, MP3, MPEG-4 video, MPEG-4 Audio (AAC), Opus, G711, G722, LPCM and any RTP-compatible codec| +|RTSP|UDP, UDP-Multicast, TCP, RTSPS|H264, H265, VP8, VP9, AV1, MPEG-2 video, M-JPEG, MPEG-4 video, MPEG-2 audio (MP3), MPEG-4 Audio (AAC), Opus, G711, G722, LPCM and any RTP-compatible codec| |RTMP|RTMP, RTMPS|H264, MPEG-4 Audio (AAC)| |HLS|Low-Latency HLS, MP4-based HLS, legacy HLS|H264, H265, MPEG-4 Audio (AAC), Opus| |WebRTC||H264, VP8, VP9, Opus, G711, G722| diff --git a/go.mod b/go.mod index ac9afc0d..1cfdc825 100644 --- a/go.mod +++ b/go.mod @@ -7,8 +7,8 @@ require ( github.com/alecthomas/kong v0.7.1 github.com/asticode/go-astits v1.11.0 github.com/bluenviron/gohlslib v0.2.1 - github.com/bluenviron/gortsplib/v3 v3.2.1 - github.com/bluenviron/mediacommon v0.3.1 + github.com/bluenviron/gortsplib/v3 v3.3.0 + github.com/bluenviron/mediacommon v0.4.1 github.com/fsnotify/fsnotify v1.6.0 github.com/gin-gonic/gin v1.9.0 github.com/google/uuid v1.3.0 diff --git a/go.sum b/go.sum index 2f3331b0..26e9560e 100644 --- a/go.sum +++ b/go.sum @@ -14,10 +14,10 @@ github.com/asticode/go-astits v1.11.0 h1:GTHUXht0ZXAJXsVbsLIcyfHr1Bchi4QQwMARw2Z github.com/asticode/go-astits v1.11.0/go.mod h1:QSHmknZ51pf6KJdHKZHJTLlMegIrhega3LPWz3ND/iI= github.com/bluenviron/gohlslib v0.2.1 h1:ZDbC87oaSv6B85o5lYC+gcOISJsg1z9IX4xTbxMqJh4= github.com/bluenviron/gohlslib v0.2.1/go.mod h1:fxwqh+twBM2Mi3AZ05nuQ7qvp8un833dFqcyykzv8bc= -github.com/bluenviron/gortsplib/v3 v3.2.1 h1:wdMocTWu1EWa9PPWb8F/S6LY2hZikgrs7zgDtnwBPO0= -github.com/bluenviron/gortsplib/v3 v3.2.1/go.mod h1:AzHdywoBckre5q9Y581xg93PVthXayVHVqYMc3hwBlk= -github.com/bluenviron/mediacommon v0.3.1 h1:C4okNqyN1Mg5CVGcGKk2tEk9Uj2hHZusHV7nqdjn1Lk= -github.com/bluenviron/mediacommon v0.3.1/go.mod h1:t0dqPsWUTchyvib0MhixIwXEgvDX4V9G+I0GzWLQRb8= +github.com/bluenviron/gortsplib/v3 v3.3.0 h1:g7hXsLSXk8Z/qiJ70zLdiWb/HTn5L59ngIAsdFREBLk= +github.com/bluenviron/gortsplib/v3 v3.3.0/go.mod h1:7p+nkw/4yyNrKxHaLwskMhDlXXHKJXqO85kpVOi+eXc= +github.com/bluenviron/mediacommon v0.4.1 h1:oiqvqwnZ0NbB+mCZjuyBtgY7cF88rZdhb/PTSNG9M+Q= +github.com/bluenviron/mediacommon v0.4.1/go.mod h1:t0dqPsWUTchyvib0MhixIwXEgvDX4V9G+I0GzWLQRb8= github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM= github.com/bytedance/sonic v1.8.0 h1:ea0Xadu+sHlu7x5O3gKhRpQ1IKiMrSiHttPF0ybECuA= github.com/bytedance/sonic v1.8.0/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZXU064P/U= diff --git a/internal/core/hls_muxer.go b/internal/core/hls_muxer.go index 3de2425a..1171baa6 100644 --- a/internal/core/hls_muxer.go +++ b/internal/core/hls_muxer.go @@ -296,7 +296,7 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) if medias == nil { return fmt.Errorf( - "the stream doesn't contain any supported codec (which are currently H264, H265, MPEG4-Audio, Opus)") + "the stream doesn't contain any supported codec, which are currently H264, H265, MPEG4-Audio, Opus") } var muxerDirectory string diff --git a/internal/core/hls_source.go b/internal/core/hls_source.go index d31f4109..3988d573 100644 --- a/internal/core/hls_source.go +++ b/internal/core/hls_source.go @@ -103,7 +103,7 @@ func (s *hlsSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan } c.OnData(track, func(pts time.Duration, unit interface{}) { - err := stream.writeData(medi, medi.Formats[0], &formatprocessor.UnitH264{ + err := stream.writeUnit(medi, medi.Formats[0], &formatprocessor.UnitH264{ PTS: pts, AU: unit.([][]byte), NTP: time.Now(), @@ -125,7 +125,7 @@ func (s *hlsSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan } c.OnData(track, func(pts time.Duration, unit interface{}) { - err := stream.writeData(medi, medi.Formats[0], &formatprocessor.UnitH265{ + err := stream.writeUnit(medi, medi.Formats[0], &formatprocessor.UnitH265{ PTS: pts, AU: unit.([][]byte), NTP: time.Now(), @@ -148,7 +148,7 @@ func (s *hlsSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan } c.OnData(track, func(pts time.Duration, unit interface{}) { - err := stream.writeData(medi, medi.Formats[0], &formatprocessor.UnitMPEG4Audio{ + err := stream.writeUnit(medi, medi.Formats[0], &formatprocessor.UnitMPEG4Audio{ PTS: pts, AUs: [][]byte{unit.([]byte)}, NTP: time.Now(), @@ -168,7 +168,7 @@ func (s *hlsSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan } c.OnData(track, func(pts time.Duration, unit interface{}) { - err := stream.writeData(medi, medi.Formats[0], &formatprocessor.UnitOpus{ + err := stream.writeUnit(medi, medi.Formats[0], &formatprocessor.UnitOpus{ PTS: pts, Frame: unit.([]byte), NTP: time.Now(), diff --git a/internal/core/hls_source_test.go b/internal/core/hls_source_test.go index 2f42d21f..32003ebd 100644 --- a/internal/core/hls_source_test.go +++ b/internal/core/hls_source_test.go @@ -262,7 +262,8 @@ func TestHLSSource(t *testing.T) { Control: medias[1].Control, Formats: []formats.Format{ &formats.MPEG4Audio{ - PayloadTyp: 96, + PayloadTyp: 96, + ProfileLevelID: 1, Config: &mpeg4audio.Config{ Type: 2, SampleRate: 44100, diff --git a/internal/core/rpicamera_source.go b/internal/core/rpicamera_source.go index b9ab75a4..63c6fc30 100644 --- a/internal/core/rpicamera_source.go +++ b/internal/core/rpicamera_source.go @@ -97,7 +97,7 @@ func (s *rpiCameraSource) run(ctx context.Context, cnf *conf.PathConf, reloadCon stream = res.stream } - err := stream.writeData(medi, medi.Formats[0], &formatprocessor.UnitH264{ + err := stream.writeUnit(medi, medi.Formats[0], &formatprocessor.UnitH264{ PTS: dts, AU: au, NTP: time.Now(), diff --git a/internal/core/rtmp_conn.go b/internal/core/rtmp_conn.go index 577b01f0..63ef3c50 100644 --- a/internal/core/rtmp_conn.go +++ b/internal/core/rtmp_conn.go @@ -14,6 +14,7 @@ import ( "github.com/bluenviron/gortsplib/v3/pkg/media" "github.com/bluenviron/gortsplib/v3/pkg/ringbuffer" "github.com/bluenviron/mediacommon/pkg/codecs/h264" + "github.com/bluenviron/mediacommon/pkg/codecs/mpeg2audio" "github.com/bluenviron/mediacommon/pkg/codecs/mpeg4audio" "github.com/google/uuid" "github.com/notedit/rtmp/format/flv/flvio" @@ -39,6 +40,96 @@ func pathNameAndQuery(inURL *url.URL) (string, url.Values, string) { return pathName, ur.Query(), ur.RawQuery } +type rtmpWriteFunc func(msg interface{}) error + +func getRTMPWriteFunc(medi *media.Media, format formats.Format, stream *stream) rtmpWriteFunc { + switch format.(type) { + case *formats.H264: + return func(msg interface{}) error { + tmsg := msg.(*message.MsgVideo) + + if tmsg.H264Type == flvio.AVC_SEQHDR { + var conf h264conf.Conf + err := conf.Unmarshal(tmsg.Payload) + if err != nil { + return fmt.Errorf("unable to parse H264 config: %v", err) + } + + au := [][]byte{ + conf.SPS, + conf.PPS, + } + + return stream.writeUnit(medi, format, &formatprocessor.UnitH264{ + PTS: tmsg.DTS + tmsg.PTSDelta, + AU: au, + NTP: time.Now(), + }) + } + + if tmsg.H264Type == flvio.AVC_NALU { + au, err := h264.AVCCUnmarshal(tmsg.Payload) + if err != nil { + return fmt.Errorf("unable to decode AVCC: %v", err) + } + + return stream.writeUnit(medi, format, &formatprocessor.UnitH264{ + PTS: tmsg.DTS + tmsg.PTSDelta, + AU: au, + NTP: time.Now(), + }) + } + + return nil + } + + case *formats.H265: + return func(msg interface{}) error { + tmsg := msg.(*message.MsgVideo) + + au, err := h264.AVCCUnmarshal(tmsg.Payload) + if err != nil { + return fmt.Errorf("unable to decode AVCC: %v", err) + } + + return stream.writeUnit(medi, format, &formatprocessor.UnitH265{ + PTS: tmsg.DTS + tmsg.PTSDelta, + AU: au, + NTP: time.Now(), + }) + } + + case *formats.MPEG2Audio: + return func(msg interface{}) error { + tmsg := msg.(*message.MsgAudio) + + return stream.writeUnit(medi, format, &formatprocessor.UnitMPEG2Audio{ + PTS: tmsg.DTS, + Frames: [][]byte{tmsg.Payload}, + NTP: time.Now(), + }) + } + + case *formats.MPEG4Audio: + return func(msg interface{}) error { + tmsg := msg.(*message.MsgAudio) + + if tmsg.AACType != flvio.AAC_RAW { + return nil + } + + return stream.writeUnit(medi, format, &formatprocessor.UnitMPEG4Audio{ + PTS: tmsg.DTS, + AUs: [][]byte{tmsg.Payload}, + NTP: time.Now(), + }) + } + + default: + return nil + } +} + type rtmpConnState int const ( @@ -73,11 +164,10 @@ type rtmpConn struct { pathManager rtmpConnPathManager parent rtmpConnParent - ctx context.Context - ctxCancel func() - uuid uuid.UUID - created time.Time - // path *path + ctx context.Context + ctxCancel func() + uuid uuid.UUID + created time.Time state rtmpConnState stateMutex sync.Mutex } @@ -252,18 +342,6 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error { c.state = rtmpConnStateRead c.stateMutex.Unlock() - var videoFormat *formats.H264 - videoMedia := res.stream.medias().FindFormat(&videoFormat) - videoFirstIDRFound := false - var videoStartDTS time.Duration - - var audioFormat *formats.MPEG4Audio - audioMedia := res.stream.medias().FindFormat(&audioFormat) - - if videoFormat == nil && audioFormat == nil { - return fmt.Errorf("the stream doesn't contain an H264 track or an AAC track") - } - ringBuffer, _ := ringbuffer.New(uint64(c.readBufferCount)) go func() { <-ctx.Done() @@ -271,152 +349,24 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error { }() var medias media.Medias + videoFirstIDRFound := false + var videoStartDTS time.Duration + + videoMedia, videoFormat := c.findVideoFormat(res.stream, ringBuffer, + &videoFirstIDRFound, &videoStartDTS) if videoMedia != nil { medias = append(medias, videoMedia) - - videoStartPTSFilled := false - var videoStartPTS time.Duration - var videoDTSExtractor *h264.DTSExtractor - - res.stream.readerAdd(c, videoMedia, videoFormat, func(unit formatprocessor.Unit) { - ringBuffer.Push(func() error { - tunit := unit.(*formatprocessor.UnitH264) - - if tunit.AU == nil { - return nil - } - - if !videoStartPTSFilled { - videoStartPTSFilled = true - videoStartPTS = tunit.PTS - } - pts := tunit.PTS - videoStartPTS - - idrPresent := false - nonIDRPresent := false - - for _, nalu := range tunit.AU { - typ := h264.NALUType(nalu[0] & 0x1F) - switch typ { - case h264.NALUTypeIDR: - idrPresent = true - - case h264.NALUTypeNonIDR: - nonIDRPresent = true - } - } - - var dts time.Duration - - // wait until we receive an IDR - if !videoFirstIDRFound { - if !idrPresent { - return nil - } - - videoFirstIDRFound = true - videoDTSExtractor = h264.NewDTSExtractor() - - var err error - dts, err = videoDTSExtractor.Extract(tunit.AU, pts) - if err != nil { - return err - } - - videoStartDTS = dts - dts = 0 - pts -= videoStartDTS - } else { - if !idrPresent && !nonIDRPresent { - return nil - } - - var err error - dts, err = videoDTSExtractor.Extract(tunit.AU, pts) - if err != nil { - return err - } - - dts -= videoStartDTS - pts -= videoStartDTS - } - - avcc, err := h264.AVCCMarshal(tunit.AU) - if err != nil { - return err - } - - c.nconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) - err = c.conn.WriteMessage(&message.MsgVideo{ - ChunkStreamID: message.MsgVideoChunkStreamID, - MessageStreamID: 0x1000000, - IsKeyFrame: idrPresent, - H264Type: flvio.AVC_NALU, - Payload: avcc, - DTS: dts, - PTSDelta: pts - dts, - }) - if err != nil { - return err - } - - return nil - }) - }) } - if audioMedia != nil { + audioMedia, audioFormat := c.findAudioFormat(res.stream, ringBuffer, + videoFormat, &videoFirstIDRFound, &videoStartDTS) + if audioFormat != nil { medias = append(medias, audioMedia) + } - audioStartPTSFilled := false - var audioStartPTS time.Duration - - res.stream.readerAdd(c, audioMedia, audioFormat, func(unit formatprocessor.Unit) { - ringBuffer.Push(func() error { - tunit := unit.(*formatprocessor.UnitMPEG4Audio) - - if tunit.AUs == nil { - return nil - } - - if !audioStartPTSFilled { - audioStartPTSFilled = true - audioStartPTS = tunit.PTS - } - pts := tunit.PTS - audioStartPTS - - if videoFormat != nil { - if !videoFirstIDRFound { - return nil - } - - pts -= videoStartDTS - if pts < 0 { - return nil - } - } - - for i, au := range tunit.AUs { - c.nconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) - err := c.conn.WriteMessage(&message.MsgAudio{ - ChunkStreamID: message.MsgAudioChunkStreamID, - MessageStreamID: 0x1000000, - Rate: flvio.SOUND_44Khz, - Depth: flvio.SOUND_16BIT, - Channels: flvio.SOUND_STEREO, - AACType: flvio.AAC_RAW, - Payload: au, - DTS: pts + time.Duration(i)*mpeg4audio.SamplesPerAccessUnit* - time.Second/time.Duration(audioFormat.ClockRate()), - }) - if err != nil { - return err - } - } - - return nil - }) - }) + if videoFormat == nil && audioFormat == nil { + return fmt.Errorf( + "the stream doesn't contain any supported codec, which are currently H264, MPEG2-Audio, MPEG4-Audio") } defer res.stream.readerRemove(c) @@ -463,6 +413,245 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error { } } +func (c *rtmpConn) findVideoFormat(stream *stream, ringBuffer *ringbuffer.RingBuffer, + videoFirstIDRFound *bool, videoStartDTS *time.Duration, +) (*media.Media, formats.Format) { + var videoFormatH264 *formats.H264 + videoMedia := stream.medias().FindFormat(&videoFormatH264) + + if videoFormatH264 != nil { + videoStartPTSFilled := false + var videoStartPTS time.Duration + var videoDTSExtractor *h264.DTSExtractor + + stream.readerAdd(c, videoMedia, videoFormatH264, func(unit formatprocessor.Unit) { + ringBuffer.Push(func() error { + tunit := unit.(*formatprocessor.UnitH264) + + if tunit.AU == nil { + return nil + } + + if !videoStartPTSFilled { + videoStartPTSFilled = true + videoStartPTS = tunit.PTS + } + pts := tunit.PTS - videoStartPTS + + idrPresent := false + nonIDRPresent := false + + for _, nalu := range tunit.AU { + typ := h264.NALUType(nalu[0] & 0x1F) + switch typ { + case h264.NALUTypeIDR: + idrPresent = true + + case h264.NALUTypeNonIDR: + nonIDRPresent = true + } + } + + var dts time.Duration + + // wait until we receive an IDR + if !*videoFirstIDRFound { + if !idrPresent { + return nil + } + + *videoFirstIDRFound = true + videoDTSExtractor = h264.NewDTSExtractor() + + var err error + dts, err = videoDTSExtractor.Extract(tunit.AU, pts) + if err != nil { + return err + } + + *videoStartDTS = dts + dts = 0 + pts -= *videoStartDTS + } else { + if !idrPresent && !nonIDRPresent { + return nil + } + + var err error + dts, err = videoDTSExtractor.Extract(tunit.AU, pts) + if err != nil { + return err + } + + dts -= *videoStartDTS + pts -= *videoStartDTS + } + + avcc, err := h264.AVCCMarshal(tunit.AU) + if err != nil { + return err + } + + c.nconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) + err = c.conn.WriteMessage(&message.MsgVideo{ + ChunkStreamID: message.MsgVideoChunkStreamID, + MessageStreamID: 0x1000000, + IsKeyFrame: idrPresent, + H264Type: flvio.AVC_NALU, + Payload: avcc, + DTS: dts, + PTSDelta: pts - dts, + }) + if err != nil { + return err + } + + return nil + }) + }) + + return videoMedia, videoFormatH264 + } + + return nil, nil +} + +func (c *rtmpConn) findAudioFormat(stream *stream, ringBuffer *ringbuffer.RingBuffer, + videoFormat formats.Format, videoFirstIDRFound *bool, videoStartDTS *time.Duration, +) (*media.Media, formats.Format) { + var audioFormatMPEG4 *formats.MPEG4Audio + audioMedia := stream.medias().FindFormat(&audioFormatMPEG4) + + if audioMedia != nil { + audioStartPTSFilled := false + var audioStartPTS time.Duration + + stream.readerAdd(c, audioMedia, audioFormatMPEG4, func(unit formatprocessor.Unit) { + ringBuffer.Push(func() error { + tunit := unit.(*formatprocessor.UnitMPEG4Audio) + + if tunit.AUs == nil { + return nil + } + + if !audioStartPTSFilled { + audioStartPTSFilled = true + audioStartPTS = tunit.PTS + } + pts := tunit.PTS - audioStartPTS + + if videoFormat != nil { + if !*videoFirstIDRFound { + return nil + } + + pts -= *videoStartDTS + if pts < 0 { + return nil + } + } + + for i, au := range tunit.AUs { + c.nconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) + err := c.conn.WriteMessage(&message.MsgAudio{ + ChunkStreamID: message.MsgAudioChunkStreamID, + MessageStreamID: 0x1000000, + Codec: message.CodecMPEG4Audio, + Rate: flvio.SOUND_44Khz, + Depth: flvio.SOUND_16BIT, + Channels: flvio.SOUND_STEREO, + AACType: flvio.AAC_RAW, + Payload: au, + DTS: pts + time.Duration(i)*mpeg4audio.SamplesPerAccessUnit* + time.Second/time.Duration(audioFormatMPEG4.ClockRate()), + }) + if err != nil { + return err + } + } + + return nil + }) + }) + + return audioMedia, audioFormatMPEG4 + } + + var audioFormatMPEG2 *formats.MPEG2Audio + audioMedia = stream.medias().FindFormat(&audioFormatMPEG2) + + if audioMedia != nil { + audioStartPTSFilled := false + var audioStartPTS time.Duration + + stream.readerAdd(c, audioMedia, audioFormatMPEG2, func(unit formatprocessor.Unit) { + ringBuffer.Push(func() error { + tunit := unit.(*formatprocessor.UnitMPEG2Audio) + + if !audioStartPTSFilled { + audioStartPTSFilled = true + audioStartPTS = tunit.PTS + } + pts := tunit.PTS - audioStartPTS + + if videoFormat != nil { + if !*videoFirstIDRFound { + return nil + } + + pts -= *videoStartDTS + if pts < 0 { + return nil + } + } + + for _, frame := range tunit.Frames { + var h mpeg2audio.FrameHeader + err := h.Unmarshal(frame) + if err != nil { + return err + } + + if !(!h.MPEG2 && h.Layer == 3) { + return fmt.Errorf("RTMP only supports MPEG-1 audio layer 3") + } + + channels := uint8(flvio.SOUND_STEREO) + if h.ChannelMode == mpeg2audio.ChannelModeMono { + channels = flvio.SOUND_MONO + } + + msg := &message.MsgAudio{ + ChunkStreamID: message.MsgAudioChunkStreamID, + MessageStreamID: 0x1000000, + Codec: message.CodecMPEG2Audio, + Rate: flvio.SOUND_44Khz, + Depth: flvio.SOUND_16BIT, + Channels: channels, + Payload: frame, + DTS: pts, + } + + c.nconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) + err = c.conn.WriteMessage(msg) + if err != nil { + return err + } + + pts += time.Duration(h.SampleCount()) * + time.Second / time.Duration(h.SampleRate) + } + + return nil + }) + }) + + return audioMedia, audioFormatMPEG2 + } + + return nil, nil +} + func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error { pathName, query, rawQuery := pathNameAndQuery(u) @@ -538,31 +727,8 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error { // disable write deadline to allow outgoing acknowledges c.nconn.SetWriteDeadline(time.Time{}) - var onVideoData func(time.Duration, [][]byte) - - if _, ok := videoFormat.(*formats.H264); ok { - onVideoData = func(pts time.Duration, au [][]byte) { - err = rres.stream.writeData(videoMedia, videoFormat, &formatprocessor.UnitH264{ - PTS: pts, - AU: au, - NTP: time.Now(), - }) - if err != nil { - c.log(logger.Warn, "%v", err) - } - } - } else { - onVideoData = func(pts time.Duration, au [][]byte) { - err = rres.stream.writeData(videoMedia, videoFormat, &formatprocessor.UnitH265{ - PTS: pts, - AU: au, - NTP: time.Now(), - }) - if err != nil { - c.log(logger.Warn, "%v", err) - } - } - } + videoWriteFunc := getRTMPWriteFunc(videoMedia, videoFormat, rres.stream) + audioWriteFunc := getRTMPWriteFunc(audioMedia, audioFormat, rres.stream) for { c.nconn.SetReadDeadline(time.Now().Add(time.Duration(c.readTimeout))) @@ -577,34 +743,9 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error { return fmt.Errorf("received a video packet, but track is not set up") } - if tmsg.H264Type == flvio.AVC_SEQHDR { - var conf h264conf.Conf - err = conf.Unmarshal(tmsg.Payload) - if err != nil { - return fmt.Errorf("unable to parse H264 config: %v", err) - } - - au := [][]byte{ - conf.SPS, - conf.PPS, - } - - err := rres.stream.writeData(videoMedia, videoFormat, &formatprocessor.UnitH264{ - PTS: tmsg.DTS + tmsg.PTSDelta, - AU: au, - NTP: time.Now(), - }) - if err != nil { - c.log(logger.Warn, "%v", err) - } - } else if tmsg.H264Type == flvio.AVC_NALU { - au, err := h264.AVCCUnmarshal(tmsg.Payload) - if err != nil { - c.log(logger.Warn, "unable to decode AVCC: %v", err) - continue - } - - onVideoData(tmsg.DTS+tmsg.PTSDelta, au) + err := videoWriteFunc(tmsg) + if err != nil { + c.log(logger.Warn, "%v", err) } case *message.MsgAudio: @@ -612,15 +753,9 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error { return fmt.Errorf("received an audio packet, but track is not set up") } - if tmsg.AACType == flvio.AAC_RAW { - err := rres.stream.writeData(audioMedia, audioFormat, &formatprocessor.UnitMPEG4Audio{ - PTS: tmsg.DTS, - AUs: [][]byte{tmsg.Payload}, - NTP: time.Now(), - }) - if err != nil { - c.log(logger.Warn, "%v", err) - } + err := audioWriteFunc(tmsg) + if err != nil { + c.log(logger.Warn, "%v", err) } } } diff --git a/internal/core/rtmp_source.go b/internal/core/rtmp_source.go index 3e149e0e..792dea2f 100644 --- a/internal/core/rtmp_source.go +++ b/internal/core/rtmp_source.go @@ -13,11 +13,8 @@ import ( "github.com/bluenviron/gortsplib/v3/pkg/formats" "github.com/bluenviron/gortsplib/v3/pkg/media" - "github.com/bluenviron/mediacommon/pkg/codecs/h264" - "github.com/notedit/rtmp/format/flv/flvio" "github.com/aler9/mediamtx/internal/conf" - "github.com/aler9/mediamtx/internal/formatprocessor" "github.com/aler9/mediamtx/internal/logger" "github.com/aler9/mediamtx/internal/rtmp" "github.com/aler9/mediamtx/internal/rtmp/message" @@ -154,6 +151,9 @@ func (s *rtmpSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf cha s.parent.sourceStaticImplSetNotReady(pathSourceStaticSetNotReadyReq{}) }() + videoWriteFunc := getRTMPWriteFunc(videoMedia, videoFormat, res.stream) + audioWriteFunc := getRTMPWriteFunc(audioMedia, audioFormat, res.stream) + // disable write deadline to allow outgoing acknowledges nconn.SetWriteDeadline(time.Time{}) @@ -166,41 +166,23 @@ func (s *rtmpSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf cha switch tmsg := msg.(type) { case *message.MsgVideo: - if tmsg.H264Type == flvio.AVC_NALU { - if videoFormat == nil { - return fmt.Errorf("received an H264 packet, but track is not set up") - } + if videoFormat == nil { + return fmt.Errorf("received an H264 packet, but track is not set up") + } - au, err := h264.AVCCUnmarshal(tmsg.Payload) - if err != nil { - s.Log(logger.Warn, "unable to decode AVCC: %v", err) - continue - } - - err = res.stream.writeData(videoMedia, videoFormat, &formatprocessor.UnitH264{ - PTS: tmsg.DTS + tmsg.PTSDelta, - AU: au, - NTP: time.Now(), - }) - if err != nil { - s.Log(logger.Warn, "%v", err) - } + err := videoWriteFunc(tmsg) + if err != nil { + s.Log(logger.Warn, "%v", err) } case *message.MsgAudio: - if tmsg.AACType == flvio.AAC_RAW { - if audioFormat == nil { - return fmt.Errorf("received an AAC packet, but track is not set up") - } + if audioFormat == nil { + return fmt.Errorf("received an AAC packet, but track is not set up") + } - err := res.stream.writeData(audioMedia, audioFormat, &formatprocessor.UnitMPEG4Audio{ - PTS: tmsg.DTS, - AUs: [][]byte{tmsg.Payload}, - NTP: time.Now(), - }) - if err != nil { - s.Log(logger.Warn, "%v", err) - } + err := audioWriteFunc(tmsg) + if err != nil { + s.Log(logger.Warn, "%v", err) } } } diff --git a/internal/core/rtsp_session.go b/internal/core/rtsp_session.go index 4f0d65a3..be8aae72 100644 --- a/internal/core/rtsp_session.go +++ b/internal/core/rtsp_session.go @@ -11,6 +11,7 @@ import ( "github.com/bluenviron/gortsplib/v3" "github.com/bluenviron/gortsplib/v3/pkg/base" "github.com/bluenviron/gortsplib/v3/pkg/formats" + "github.com/bluenviron/gortsplib/v3/pkg/media" "github.com/bluenviron/gortsplib/v3/pkg/url" "github.com/google/uuid" "github.com/pion/rtp" @@ -25,6 +26,76 @@ const ( pauseAfterAuthError = 2 * time.Second ) +type rtspWriteFunc func(*rtp.Packet) error + +func getRTSPWriteFunc(medi *media.Media, forma formats.Format, stream *stream) rtspWriteFunc { + switch forma.(type) { + case *formats.H264: + return func(pkt *rtp.Packet) error { + return stream.writeUnit(medi, forma, &formatprocessor.UnitH264{ + RTPPackets: []*rtp.Packet{pkt}, + NTP: time.Now(), + }) + } + + case *formats.H265: + return func(pkt *rtp.Packet) error { + return stream.writeUnit(medi, forma, &formatprocessor.UnitH265{ + RTPPackets: []*rtp.Packet{pkt}, + NTP: time.Now(), + }) + } + + case *formats.VP8: + return func(pkt *rtp.Packet) error { + return stream.writeUnit(medi, forma, &formatprocessor.UnitVP8{ + RTPPackets: []*rtp.Packet{pkt}, + NTP: time.Now(), + }) + } + + case *formats.VP9: + return func(pkt *rtp.Packet) error { + return stream.writeUnit(medi, forma, &formatprocessor.UnitVP9{ + RTPPackets: []*rtp.Packet{pkt}, + NTP: time.Now(), + }) + } + + case *formats.MPEG2Audio: + return func(pkt *rtp.Packet) error { + return stream.writeUnit(medi, forma, &formatprocessor.UnitMPEG2Audio{ + RTPPackets: []*rtp.Packet{pkt}, + NTP: time.Now(), + }) + } + + case *formats.MPEG4Audio: + return func(pkt *rtp.Packet) error { + return stream.writeUnit(medi, forma, &formatprocessor.UnitMPEG4Audio{ + RTPPackets: []*rtp.Packet{pkt}, + NTP: time.Now(), + }) + } + + case *formats.Opus: + return func(pkt *rtp.Packet) error { + return stream.writeUnit(medi, forma, &formatprocessor.UnitOpus{ + RTPPackets: []*rtp.Packet{pkt}, + NTP: time.Now(), + }) + } + + default: + return func(pkt *rtp.Packet) error { + return stream.writeUnit(medi, forma, &formatprocessor.UnitGeneric{ + RTPPackets: []*rtp.Packet{pkt}, + NTP: time.Now(), + }) + } + } +} + type rtspSessionPathManager interface { publisherAdd(req pathPublisherAddReq) pathPublisherAnnounceRes readerAdd(req pathReaderAddReq) pathReaderSetupPlayRes @@ -321,87 +392,14 @@ func (s *rtspSession) onRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.R for _, medi := range s.session.AnnouncedMedias() { for _, forma := range medi.Formats { - cmedia := medi - cformat := forma + writeFunc := getRTSPWriteFunc(medi, forma, s.stream) - switch forma.(type) { - case *formats.H264: - ctx.Session.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { - err := s.stream.writeData(cmedia, cformat, &formatprocessor.UnitH264{ - RTPPackets: []*rtp.Packet{pkt}, - NTP: time.Now(), - }) - if err != nil { - s.log(logger.Warn, "%v", err) - } - }) - - case *formats.H265: - ctx.Session.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { - err := s.stream.writeData(cmedia, cformat, &formatprocessor.UnitH265{ - RTPPackets: []*rtp.Packet{pkt}, - NTP: time.Now(), - }) - if err != nil { - s.log(logger.Warn, "%v", err) - } - }) - - case *formats.VP8: - ctx.Session.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { - err := s.stream.writeData(cmedia, cformat, &formatprocessor.UnitVP8{ - RTPPackets: []*rtp.Packet{pkt}, - NTP: time.Now(), - }) - if err != nil { - s.log(logger.Warn, "%v", err) - } - }) - - case *formats.VP9: - ctx.Session.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { - err := s.stream.writeData(cmedia, cformat, &formatprocessor.UnitVP9{ - RTPPackets: []*rtp.Packet{pkt}, - NTP: time.Now(), - }) - if err != nil { - s.log(logger.Warn, "%v", err) - } - }) - - case *formats.MPEG4Audio: - ctx.Session.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { - err := s.stream.writeData(cmedia, cformat, &formatprocessor.UnitMPEG4Audio{ - RTPPackets: []*rtp.Packet{pkt}, - NTP: time.Now(), - }) - if err != nil { - s.log(logger.Warn, "%v", err) - } - }) - - case *formats.Opus: - ctx.Session.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { - err := s.stream.writeData(cmedia, cformat, &formatprocessor.UnitOpus{ - RTPPackets: []*rtp.Packet{pkt}, - NTP: time.Now(), - }) - if err != nil { - s.log(logger.Warn, "%v", err) - } - }) - - default: - ctx.Session.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { - err := s.stream.writeData(cmedia, cformat, &formatprocessor.UnitGeneric{ - RTPPackets: []*rtp.Packet{pkt}, - NTP: time.Now(), - }) - if err != nil { - s.log(logger.Warn, "%v", err) - } - }) - } + ctx.Session.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { + err := writeFunc(pkt) + if err != nil { + s.log(logger.Warn, "%v", err) + } + }) } } diff --git a/internal/core/rtsp_source.go b/internal/core/rtsp_source.go index f7d2ed15..4b7f2184 100644 --- a/internal/core/rtsp_source.go +++ b/internal/core/rtsp_source.go @@ -11,11 +11,9 @@ import ( "github.com/bluenviron/gortsplib/v3" "github.com/bluenviron/gortsplib/v3/pkg/base" - "github.com/bluenviron/gortsplib/v3/pkg/formats" "github.com/pion/rtp" "github.com/aler9/mediamtx/internal/conf" - "github.com/aler9/mediamtx/internal/formatprocessor" "github.com/aler9/mediamtx/internal/logger" "github.com/bluenviron/gortsplib/v3/pkg/url" ) @@ -139,87 +137,14 @@ func (s *rtspSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf cha for _, medi := range medias { for _, forma := range medi.Formats { - cmedia := medi - cformat := forma + writeFunc := getRTSPWriteFunc(medi, forma, res.stream) - switch forma.(type) { - case *formats.H264: - c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { - err := res.stream.writeData(cmedia, cformat, &formatprocessor.UnitH264{ - RTPPackets: []*rtp.Packet{pkt}, - NTP: time.Now(), - }) - if err != nil { - s.Log(logger.Warn, "%v", err) - } - }) - - case *formats.H265: - c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { - err := res.stream.writeData(cmedia, cformat, &formatprocessor.UnitH265{ - RTPPackets: []*rtp.Packet{pkt}, - NTP: time.Now(), - }) - if err != nil { - s.Log(logger.Warn, "%v", err) - } - }) - - case *formats.VP8: - c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { - err := res.stream.writeData(cmedia, cformat, &formatprocessor.UnitVP8{ - RTPPackets: []*rtp.Packet{pkt}, - NTP: time.Now(), - }) - if err != nil { - s.Log(logger.Warn, "%v", err) - } - }) - - case *formats.VP9: - c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { - err := res.stream.writeData(cmedia, cformat, &formatprocessor.UnitVP9{ - RTPPackets: []*rtp.Packet{pkt}, - NTP: time.Now(), - }) - if err != nil { - s.Log(logger.Warn, "%v", err) - } - }) - - case *formats.MPEG4Audio: - c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { - err := res.stream.writeData(cmedia, cformat, &formatprocessor.UnitMPEG4Audio{ - RTPPackets: []*rtp.Packet{pkt}, - NTP: time.Now(), - }) - if err != nil { - s.Log(logger.Warn, "%v", err) - } - }) - - case *formats.Opus: - c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { - err := res.stream.writeData(cmedia, cformat, &formatprocessor.UnitOpus{ - RTPPackets: []*rtp.Packet{pkt}, - NTP: time.Now(), - }) - if err != nil { - s.Log(logger.Warn, "%v", err) - } - }) - - default: - c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { - err := res.stream.writeData(cmedia, cformat, &formatprocessor.UnitGeneric{ - RTPPackets: []*rtp.Packet{pkt}, - NTP: time.Now(), - }) - if err != nil { - s.Log(logger.Warn, "%v", err) - } - }) - } + c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { + err := writeFunc(pkt) + if err != nil { + s.Log(logger.Warn, "%v", err) + } + }) } } diff --git a/internal/core/stream.go b/internal/core/stream.go index 40f4a738..0c6000bc 100644 --- a/internal/core/stream.go +++ b/internal/core/stream.go @@ -60,8 +60,8 @@ func (s *stream) readerRemove(r reader) { } } -func (s *stream) writeData(medi *media.Media, forma formats.Format, data formatprocessor.Unit) error { +func (s *stream) writeUnit(medi *media.Media, forma formats.Format, data formatprocessor.Unit) error { sm := s.smedias[medi] sf := sm.formats[forma] - return sf.writeData(s, medi, data) + return sf.writeUnit(s, medi, data) } diff --git a/internal/core/stream_format.go b/internal/core/stream_format.go index b2b5f5bd..bc43a1c6 100644 --- a/internal/core/stream_format.go +++ b/internal/core/stream_format.go @@ -46,7 +46,7 @@ func (sf *streamFormat) readerRemove(r reader) { delete(sf.nonRTSPReaders, r) } -func (sf *streamFormat) writeData(s *stream, medi *media.Media, data formatprocessor.Unit) error { +func (sf *streamFormat) writeUnit(s *stream, medi *media.Media, data formatprocessor.Unit) error { sf.mutex.RLock() defer sf.mutex.RUnlock() diff --git a/internal/core/udp_source.go b/internal/core/udp_source.go index 642132a4..2d7d4211 100644 --- a/internal/core/udp_source.go +++ b/internal/core/udp_source.go @@ -196,7 +196,7 @@ func (s *udpSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan return } - err = stream.writeData(medi, medi.Formats[0], &formatprocessor.UnitH264{ + err = stream.writeUnit(medi, medi.Formats[0], &formatprocessor.UnitH264{ PTS: pts, AU: au, NTP: time.Now(), @@ -221,7 +221,7 @@ func (s *udpSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan return } - err = stream.writeData(medi, medi.Formats[0], &formatprocessor.UnitH265{ + err = stream.writeUnit(medi, medi.Formats[0], &formatprocessor.UnitH265{ PTS: pts, AU: au, NTP: time.Now(), @@ -256,7 +256,7 @@ func (s *udpSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan aus[i] = pkt.AU } - err = stream.writeData(medi, medi.Formats[0], &formatprocessor.UnitMPEG4Audio{ + err = stream.writeUnit(medi, medi.Formats[0], &formatprocessor.UnitMPEG4Audio{ PTS: pts, AUs: aus, NTP: time.Now(), @@ -287,7 +287,7 @@ func (s *udpSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan } pos += n - err = stream.writeData(medi, medi.Formats[0], &formatprocessor.UnitOpus{ + err = stream.writeUnit(medi, medi.Formats[0], &formatprocessor.UnitOpus{ PTS: pts, Frame: au.Frame, NTP: time.Now(), diff --git a/internal/formatprocessor/generic_test.go b/internal/formatprocessor/generic_test.go index e72626c0..d407368d 100644 --- a/internal/formatprocessor/generic_test.go +++ b/internal/formatprocessor/generic_test.go @@ -11,7 +11,7 @@ import ( func TestGenericRemovePadding(t *testing.T) { forma := &formats.Generic{ PayloadTyp: 96, - RTPMap: "private/90000", + RTPMa: "private/90000", } forma.Init() diff --git a/internal/formatprocessor/h264.go b/internal/formatprocessor/h264.go index 4b7f9a3e..d116f98d 100644 --- a/internal/formatprocessor/h264.go +++ b/internal/formatprocessor/h264.go @@ -104,7 +104,12 @@ func newH264( } if allocateEncoder { - t.encoder = forma.CreateEncoder() + t.encoder = &rtph264.Encoder{ + PayloadMaxSize: udpMaxPayloadSize - 12, + PayloadType: forma.PayloadTyp, + PacketizationMode: forma.PacketizationMode, + } + t.encoder.Init() } return t, nil @@ -280,6 +285,7 @@ func (t *formatProcessorH264) Process(unit Unit, hasNonRTSPReaders bool) error { tunit.AU = t.remuxAccessUnit(tunit.AU) } + // encode into RTP if len(tunit.AU) != 0 { pkts, err := t.encoder.Encode(tunit.AU, tunit.PTS) if err != nil { diff --git a/internal/formatprocessor/h265.go b/internal/formatprocessor/h265.go index 2f4e05ad..a74a7933 100644 --- a/internal/formatprocessor/h265.go +++ b/internal/formatprocessor/h265.go @@ -111,7 +111,11 @@ func newH265( } if allocateEncoder { - t.encoder = forma.CreateEncoder() + t.encoder = &rtph265.Encoder{ + PayloadMaxSize: t.udpMaxPayloadSize - 12, + PayloadType: forma.PayloadTyp, + } + t.encoder.Init() } return t, nil @@ -303,6 +307,7 @@ func (t *formatProcessorH265) Process(unit Unit, hasNonRTSPReaders bool) error { tunit.AU = t.remuxAccessUnit(tunit.AU) } + // encode into RTP if len(tunit.AU) != 0 { pkts, err := t.encoder.Encode(tunit.AU, tunit.PTS) if err != nil { diff --git a/internal/formatprocessor/mpeg2audio.go b/internal/formatprocessor/mpeg2audio.go new file mode 100644 index 00000000..d93e075b --- /dev/null +++ b/internal/formatprocessor/mpeg2audio.go @@ -0,0 +1,99 @@ +package formatprocessor + +import ( + "fmt" + "time" + + "github.com/bluenviron/gortsplib/v3/pkg/formats" + "github.com/bluenviron/gortsplib/v3/pkg/formats/rtpmpeg2audio" + "github.com/pion/rtp" +) + +// UnitMPEG2Audio is a MPEG2-audio data unit. +type UnitMPEG2Audio struct { + RTPPackets []*rtp.Packet + NTP time.Time + PTS time.Duration + Frames [][]byte +} + +// GetRTPPackets implements Unit. +func (d *UnitMPEG2Audio) GetRTPPackets() []*rtp.Packet { + return d.RTPPackets +} + +// GetNTP implements Unit. +func (d *UnitMPEG2Audio) GetNTP() time.Time { + return d.NTP +} + +type formatProcessorMPEG2Audio struct { + udpMaxPayloadSize int + format *formats.MPEG2Audio + encoder *rtpmpeg2audio.Encoder + decoder *rtpmpeg2audio.Decoder +} + +func newMPEG2Audio( + udpMaxPayloadSize int, + forma *formats.MPEG2Audio, + allocateEncoder bool, +) (*formatProcessorMPEG2Audio, error) { + t := &formatProcessorMPEG2Audio{ + udpMaxPayloadSize: udpMaxPayloadSize, + format: forma, + } + + if allocateEncoder { + t.encoder = &rtpmpeg2audio.Encoder{ + PayloadMaxSize: t.udpMaxPayloadSize - 12, + } + t.encoder.Init() + } + + return t, nil +} + +func (t *formatProcessorMPEG2Audio) Process(unit Unit, hasNonRTSPReaders bool) error { //nolint:dupl + tunit := unit.(*UnitMPEG2Audio) + + if tunit.RTPPackets != nil { + pkt := tunit.RTPPackets[0] + + // remove padding + pkt.Header.Padding = false + pkt.PaddingSize = 0 + + if pkt.MarshalSize() > t.udpMaxPayloadSize { + return fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)", + pkt.MarshalSize(), t.udpMaxPayloadSize) + } + + // decode from RTP + if hasNonRTSPReaders { + if t.decoder == nil { + t.decoder = t.format.CreateDecoder() + } + + frames, pts, err := t.decoder.Decode(pkt) + if err != nil { + return err + } + + tunit.Frames = frames + tunit.PTS = pts + } + + // route packet as is + return nil + } + + // encode into RTP + pkts, err := t.encoder.Encode(tunit.Frames, tunit.PTS) + if err != nil { + return err + } + tunit.RTPPackets = pkts + + return nil +} diff --git a/internal/formatprocessor/mpeg4audio.go b/internal/formatprocessor/mpeg4audio.go index 206a9e9a..1fd0d584 100644 --- a/internal/formatprocessor/mpeg4audio.go +++ b/internal/formatprocessor/mpeg4audio.go @@ -45,7 +45,15 @@ func newMPEG4Audio( } if allocateEncoder { - t.encoder = forma.CreateEncoder() + t.encoder = &rtpmpeg4audio.Encoder{ + PayloadMaxSize: t.udpMaxPayloadSize - 12, + PayloadType: forma.PayloadTyp, + SampleRate: forma.Config.SampleRate, + SizeLength: forma.SizeLength, + IndexLength: forma.IndexLength, + IndexDeltaLength: forma.IndexDeltaLength, + } + t.encoder.Init() } return t, nil @@ -88,11 +96,12 @@ func (t *formatProcessorMPEG4Audio) Process(unit Unit, hasNonRTSPReaders bool) e return nil } + // encode into RTP pkts, err := t.encoder.Encode(tunit.AUs, tunit.PTS) if err != nil { return err } - tunit.RTPPackets = pkts + return nil } diff --git a/internal/formatprocessor/opus.go b/internal/formatprocessor/opus.go index 073d5d4e..f4022e1e 100644 --- a/internal/formatprocessor/opus.go +++ b/internal/formatprocessor/opus.go @@ -45,7 +45,12 @@ func newOpus( } if allocateEncoder { - t.encoder = forma.CreateEncoder() + t.encoder = &rtpsimpleaudio.Encoder{ + PayloadMaxSize: t.udpMaxPayloadSize - 12, + PayloadType: forma.PayloadTyp, + SampleRate: 48000, + } + t.encoder.Init() } return t, nil @@ -85,11 +90,12 @@ func (t *formatProcessorOpus) Process(unit Unit, hasNonRTSPReaders bool) error { return nil } + // encode into RTP pkt, err := t.encoder.Encode(tunit.Frame, tunit.PTS) if err != nil { return err } - tunit.RTPPackets = []*rtp.Packet{pkt} + return nil } diff --git a/internal/formatprocessor/processor.go b/internal/formatprocessor/processor.go index c82d80f6..05fb47ec 100644 --- a/internal/formatprocessor/processor.go +++ b/internal/formatprocessor/processor.go @@ -30,6 +30,9 @@ func New( case *formats.VP9: return newVP9(udpMaxPayloadSize, forma, generateRTPPackets) + case *formats.MPEG2Audio: + return newMPEG2Audio(udpMaxPayloadSize, forma, generateRTPPackets) + case *formats.MPEG4Audio: return newMPEG4Audio(udpMaxPayloadSize, forma, generateRTPPackets) diff --git a/internal/formatprocessor/vp8.go b/internal/formatprocessor/vp8.go index 3f9a8d2d..041321f5 100644 --- a/internal/formatprocessor/vp8.go +++ b/internal/formatprocessor/vp8.go @@ -45,7 +45,11 @@ func newVP8( } if allocateEncoder { - t.encoder = forma.CreateEncoder() + t.encoder = &rtpvp8.Encoder{ + PayloadMaxSize: t.udpMaxPayloadSize - 12, + PayloadType: forma.PayloadTyp, + } + t.encoder.Init() } return t, nil @@ -88,11 +92,12 @@ func (t *formatProcessorVP8) Process(unit Unit, hasNonRTSPReaders bool) error { return nil } + // encode into RTP pkts, err := t.encoder.Encode(tunit.Frame, tunit.PTS) if err != nil { return err } - tunit.RTPPackets = pkts + return nil } diff --git a/internal/formatprocessor/vp9.go b/internal/formatprocessor/vp9.go index 3fee3cd9..d886a5cc 100644 --- a/internal/formatprocessor/vp9.go +++ b/internal/formatprocessor/vp9.go @@ -45,7 +45,11 @@ func newVP9( } if allocateEncoder { - t.encoder = forma.CreateEncoder() + t.encoder = &rtpvp9.Encoder{ + PayloadMaxSize: t.udpMaxPayloadSize - 12, + PayloadType: forma.PayloadTyp, + } + t.encoder.Init() } return t, nil @@ -88,11 +92,12 @@ func (t *formatProcessorVP9) Process(unit Unit, hasNonRTSPReaders bool) error { return nil } + // encode into RTP pkts, err := t.encoder.Encode(tunit.Frame, tunit.PTS) if err != nil { return err } - tunit.RTPPackets = pkts + return nil } diff --git a/internal/rtmp/conn.go b/internal/rtmp/conn.go index b19caa93..5920f1b6 100644 --- a/internal/rtmp/conn.go +++ b/internal/rtmp/conn.go @@ -23,7 +23,6 @@ import ( const ( codecH264 = 7 - codecAAC = 10 ) func resultIsOK1(res *message.MsgCommandAMF0) bool { @@ -617,7 +616,7 @@ func trackFromAACDecoderConfig(data []byte) (*formats.MPEG4Audio, error) { var errEmptyMetadata = errors.New("metadata is empty") -func (c *Conn) readTracksFromMetadata(payload []interface{}) (formats.Format, *formats.MPEG4Audio, error) { +func (c *Conn) readTracksFromMetadata(payload []interface{}) (formats.Format, formats.Format, error) { if len(payload) != 1 { return nil, nil, fmt.Errorf("invalid metadata") } @@ -627,6 +626,9 @@ func (c *Conn) readTracksFromMetadata(payload []interface{}) (formats.Format, *f return nil, nil, fmt.Errorf("invalid metadata") } + var videoTrack formats.Format + var audioTrack formats.Format + hasVideo, err := func() (bool, error) { v, ok := md.GetV("videocodecid") if !ok { @@ -667,7 +669,11 @@ func (c *Conn) readTracksFromMetadata(payload []interface{}) (formats.Format, *f case 0: return false, nil - case codecAAC: + case message.CodecMPEG2Audio: + audioTrack = &formats.MPEG2Audio{} + return true, nil + + case message.CodecMPEG4Audio: return true, nil } @@ -687,9 +693,6 @@ func (c *Conn) readTracksFromMetadata(payload []interface{}) (formats.Format, *f return nil, nil, errEmptyMetadata } - var videoTrack formats.Format - var audioTrack *formats.MPEG4Audio - for { msg, err := c.ReadMessage() if err != nil { @@ -750,7 +753,7 @@ func (c *Conn) readTracksFromMetadata(payload []interface{}) (formats.Format, *f } if audioTrack == nil { - if tmsg.AACType == flvio.AVC_SEQHDR { + if tmsg.Codec == message.CodecMPEG4Audio && tmsg.AACType == flvio.AVC_SEQHDR { audioTrack, err = trackFromAACDecoderConfig(tmsg.Payload) if err != nil { return nil, nil, err @@ -842,7 +845,7 @@ outer: // ReadTracks reads track informations. // It returns the video track and the audio track. -func (c *Conn) ReadTracks() (formats.Format, *formats.MPEG4Audio, error) { +func (c *Conn) ReadTracks() (formats.Format, formats.Format, error) { msg, err := func() (message.Message, error) { for { msg, err := c.ReadMessage() @@ -901,7 +904,7 @@ func (c *Conn) ReadTracks() (formats.Format, *formats.MPEG4Audio, error) { } // WriteTracks writes track informations. -func (c *Conn) WriteTracks(videoTrack *formats.H264, audioTrack *formats.MPEG4Audio) error { +func (c *Conn) WriteTracks(videoTrack formats.Format, audioTrack formats.Format) error { err := c.WriteMessage(&message.MsgDataAMF0{ ChunkStreamID: 4, MessageStreamID: 0x1000000, @@ -916,10 +919,13 @@ func (c *Conn) WriteTracks(videoTrack *formats.H264, audioTrack *formats.MPEG4Au { K: "videocodecid", V: func() float64 { - if videoTrack != nil { + switch videoTrack.(type) { + case *formats.H264: return codecH264 + + default: + return 0 } - return 0 }(), }, { @@ -929,10 +935,16 @@ func (c *Conn) WriteTracks(videoTrack *formats.H264, audioTrack *formats.MPEG4Au { K: "audiocodecid", V: func() float64 { - if audioTrack != nil { - return codecAAC + switch audioTrack.(type) { + case *formats.MPEG2Audio: + return message.CodecMPEG2Audio + + case *formats.MPEG4Audio: + return message.CodecMPEG4Audio + + default: + return 0 } - return 0 }(), }, }, @@ -942,10 +954,11 @@ func (c *Conn) WriteTracks(videoTrack *formats.H264, audioTrack *formats.MPEG4Au return err } - // write decoder config only if SPS and PPS are available. - // if they're not available yet, they're sent later. - if videoTrack != nil { - sps, pps := videoTrack.SafeParams() + if h264Track, ok := videoTrack.(*formats.H264); ok { + sps, pps := h264Track.SafeParams() + + // write decoder config only if SPS and PPS are available. + // if they're not available yet, they're sent later. if sps != nil && pps != nil { buf, _ := h264conf.Conf{ SPS: sps, @@ -965,8 +978,8 @@ func (c *Conn) WriteTracks(videoTrack *formats.H264, audioTrack *formats.MPEG4Au } } - if audioTrack != nil { - enc, err := audioTrack.Config.Marshal() + if mpeg4audioTrack, ok := audioTrack.(*formats.MPEG4Audio); ok { + enc, err := mpeg4audioTrack.Config.Marshal() if err != nil { return err } @@ -974,6 +987,7 @@ func (c *Conn) WriteTracks(videoTrack *formats.H264, audioTrack *formats.MPEG4Au err = c.WriteMessage(&message.MsgAudio{ ChunkStreamID: message.MsgAudioChunkStreamID, MessageStreamID: 0x1000000, + Codec: message.CodecMPEG4Audio, Rate: flvio.SOUND_44Khz, Depth: flvio.SOUND_16BIT, Channels: flvio.SOUND_STEREO, diff --git a/internal/rtmp/conn_test.go b/internal/rtmp/conn_test.go index 30ae11c0..60ec26b7 100644 --- a/internal/rtmp/conn_test.go +++ b/internal/rtmp/conn_test.go @@ -515,7 +515,7 @@ func TestReadTracks(t *testing.T) { PPS: pps, PacketizationMode: 1, }, - (*formats.MPEG4Audio)(nil), + nil, }, { "metadata without codec id", @@ -801,7 +801,7 @@ func TestReadTracks(t *testing.T) { }, { K: "audiocodecid", - V: float64(codecAAC), + V: float64(message.CodecMPEG4Audio), }, }, }, @@ -832,6 +832,7 @@ func TestReadTracks(t *testing.T) { err = mrw.Write(&message.MsgAudio{ ChunkStreamID: message.MsgAudioChunkStreamID, MessageStreamID: 0x1000000, + Codec: message.CodecMPEG4Audio, Rate: flvio.SOUND_44Khz, Depth: flvio.SOUND_16BIT, Channels: flvio.SOUND_STEREO, @@ -932,6 +933,7 @@ func TestReadTracks(t *testing.T) { err = mrw.Write(&message.MsgAudio{ ChunkStreamID: message.MsgAudioChunkStreamID, MessageStreamID: 0x1000000, + Codec: message.CodecMPEG4Audio, Rate: flvio.SOUND_44Khz, Depth: flvio.SOUND_16BIT, Channels: flvio.SOUND_STEREO, @@ -965,6 +967,7 @@ func TestReadTracks(t *testing.T) { err = mrw.Write(&message.MsgAudio{ ChunkStreamID: message.MsgAudioChunkStreamID, MessageStreamID: 0x1000000, + Codec: message.CodecMPEG4Audio, Rate: flvio.SOUND_44Khz, Depth: flvio.SOUND_16BIT, Channels: flvio.SOUND_STEREO, @@ -984,6 +987,7 @@ func TestReadTracks(t *testing.T) { err = mrw.Write(&message.MsgAudio{ ChunkStreamID: message.MsgAudioChunkStreamID, MessageStreamID: 0x1000000, + Codec: message.CodecMPEG4Audio, Rate: flvio.SOUND_44Khz, Depth: flvio.SOUND_16BIT, Channels: flvio.SOUND_STEREO, @@ -995,6 +999,7 @@ func TestReadTracks(t *testing.T) { err = mrw.Write(&message.MsgAudio{ ChunkStreamID: message.MsgAudioChunkStreamID, MessageStreamID: 0x1000000, + Codec: message.CodecMPEG4Audio, Rate: flvio.SOUND_44Khz, Depth: flvio.SOUND_16BIT, Channels: flvio.SOUND_STEREO, @@ -1026,7 +1031,7 @@ func TestReadTracks(t *testing.T) { }, { K: "audiocodecid", - V: float64(codecAAC), + V: float64(message.CodecMPEG4Audio), }, }, }, @@ -1075,6 +1080,7 @@ func TestReadTracks(t *testing.T) { err = mrw.Write(&message.MsgAudio{ ChunkStreamID: message.MsgAudioChunkStreamID, MessageStreamID: 0x1000000, + Codec: message.CodecMPEG4Audio, Rate: flvio.SOUND_44Khz, Depth: flvio.SOUND_16BIT, Channels: flvio.SOUND_STEREO, @@ -1375,6 +1381,7 @@ func TestWriteTracks(t *testing.T) { require.Equal(t, &message.MsgAudio{ ChunkStreamID: message.MsgAudioChunkStreamID, MessageStreamID: 0x1000000, + Codec: message.CodecMPEG4Audio, Rate: flvio.SOUND_44Khz, Depth: flvio.SOUND_16BIT, Channels: flvio.SOUND_STEREO, diff --git a/internal/rtmp/message/msg_audio.go b/internal/rtmp/message/msg_audio.go index 8a4eec42..5a80b3e8 100644 --- a/internal/rtmp/message/msg_audio.go +++ b/internal/rtmp/message/msg_audio.go @@ -4,15 +4,19 @@ import ( "fmt" "time" - "github.com/notedit/rtmp/format/flv/flvio" - "github.com/aler9/mediamtx/internal/rtmp/chunk" "github.com/aler9/mediamtx/internal/rtmp/rawmessage" ) const ( // MsgAudioChunkStreamID is the chunk stream ID that is usually used to send MsgAudio{} - MsgAudioChunkStreamID = 6 + MsgAudioChunkStreamID = 4 +) + +// supported audio codecs +const ( + CodecMPEG2Audio = 2 + CodecMPEG4Audio = 10 ) // MsgAudio is an audio message. @@ -20,10 +24,11 @@ type MsgAudio struct { ChunkStreamID byte DTS time.Duration MessageStreamID uint32 + Codec uint8 Rate uint8 Depth uint8 Channels uint8 - AACType uint8 + AACType uint8 // only for CodecMPEG4Audio Payload []byte } @@ -37,28 +42,45 @@ func (m *MsgAudio) Unmarshal(raw *rawmessage.Message) error { return fmt.Errorf("invalid body size") } - codec := raw.Body[0] >> 4 - if codec != flvio.SOUND_AAC { - return fmt.Errorf("unsupported audio codec: %d", codec) + m.Codec = raw.Body[0] >> 4 + switch m.Codec { + case CodecMPEG2Audio, CodecMPEG4Audio: + default: + return fmt.Errorf("unsupported audio codec: %d", m.Codec) } m.Rate = (raw.Body[0] >> 2) & 0x03 m.Depth = (raw.Body[0] >> 1) & 0x01 m.Channels = raw.Body[0] & 0x01 - m.AACType = raw.Body[1] - m.Payload = raw.Body[2:] + + if m.Codec == CodecMPEG2Audio { + m.Payload = raw.Body[1:] + } else { + m.AACType = raw.Body[1] + m.Payload = raw.Body[2:] + } return nil } // Marshal implements Message. func (m MsgAudio) Marshal() (*rawmessage.Message, error) { - body := make([]byte, 2+len(m.Payload)) + var l int + if m.Codec == CodecMPEG2Audio { + l = 1 + len(m.Payload) + } else { + l = 2 + len(m.Payload) + } + body := make([]byte, l) - body[0] = flvio.SOUND_AAC<<4 | m.Rate<<2 | m.Depth<<1 | m.Channels - body[1] = m.AACType + body[0] = m.Codec<<4 | m.Rate<<2 | m.Depth<<1 | m.Channels - copy(body[2:], m.Payload) + if m.Codec == CodecMPEG2Audio { + copy(body[1:], m.Payload) + } else { + body[1] = m.AACType + copy(body[2:], m.Payload) + } return &rawmessage.Message{ ChunkStreamID: m.ChunkStreamID, diff --git a/internal/rtmp/message/reader_test.go b/internal/rtmp/message/reader_test.go index 789a88fd..2b5c3bc1 100644 --- a/internal/rtmp/message/reader_test.go +++ b/internal/rtmp/message/reader_test.go @@ -27,11 +27,29 @@ var readWriterCases = []struct { }, }, { - "audio", + "audio mpeg2", &MsgAudio{ ChunkStreamID: 7, DTS: 6013806 * time.Millisecond, MessageStreamID: 4534543, + Codec: CodecMPEG2Audio, + Rate: flvio.SOUND_44Khz, + Depth: flvio.SOUND_16BIT, + Channels: flvio.SOUND_STEREO, + Payload: []byte{0x01, 0x02, 0x03, 0x04}, + }, + []byte{ + 0x7, 0x5b, 0xc3, 0x6e, 0x0, 0x0, 0x5, 0x8, 0x0, 0x45, 0x31, 0xf, 0x2f, + 0x01, 0x02, 0x03, 0x04, + }, + }, + { + "audio mpeg4", + &MsgAudio{ + ChunkStreamID: 7, + DTS: 6013806 * time.Millisecond, + MessageStreamID: 4534543, + Codec: CodecMPEG4Audio, Rate: flvio.SOUND_44Khz, Depth: flvio.SOUND_16BIT, Channels: flvio.SOUND_STEREO,