rtmp: support ingesting AV1, VP9, H265, MP3, PCM from other servers (#3751)

This commit is contained in:
Alessandro Ros
2024-09-09 12:26:35 +02:00
committed by GitHub
parent ab85249638
commit e6653857aa
8 changed files with 353 additions and 352 deletions

View File

@@ -27,7 +27,7 @@ Live streams can be published to the server with:
|[RTSP clients](#rtsp-clients)|UDP, TCP, RTSPS|AV1, VP9, VP8, H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video, M-JPEG and any RTP-compatible codec|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), AC-3, G726, G722, G711 (PCMA, PCMU), LPCM and any RTP-compatible codec|
|[RTSP cameras and servers](#rtsp-cameras-and-servers)|UDP, UDP-Multicast, TCP, RTSPS|AV1, VP9, VP8, H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video, M-JPEG and any RTP-compatible codec|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), AC-3, G726, G722, G711 (PCMA, PCMU), LPCM and any RTP-compatible codec|
|[RTMP clients](#rtmp-clients)|RTMP, RTMPS, Enhanced RTMP|AV1, VP9, H265, H264|MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), G711 (PCMA, PCMU), LPCM|
|[RTMP cameras and servers](#rtmp-cameras-and-servers)|RTMP, RTMPS, Enhanced RTMP|H264|MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3)|
|[RTMP cameras and servers](#rtmp-cameras-and-servers)|RTMP, RTMPS, Enhanced RTMP|AV1, VP9, H265, H264|MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), G711 (PCMA, PCMU), LPCM|
|[HLS cameras and servers](#hls-cameras-and-servers)|Low-Latency HLS, MP4-based HLS, legacy HLS|AV1, VP9, H265, H264|Opus, MPEG-4 Audio (AAC)|
|[UDP/MPEG-TS](#udpmpeg-ts)|Unicast, broadcast, multicast|H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), AC-3|
|[Raspberry Pi Cameras](#raspberry-pi-cameras)||H264||

View File

@@ -21,7 +21,7 @@ func durationGoToMPEGTS(v time.Duration) int64 {
return int64(v.Seconds() * 90000)
}
// FromStream links a server stream to a MPEG-TS writer.
// FromStream maps a MediaMTX stream to a MPEG-TS writer.
func FromStream(
stream *stream.Stream,
writer *asyncwriter.Writer,

View File

@@ -17,7 +17,7 @@ import (
var ErrNoTracks = errors.New("no supported tracks found (supported are H265, H264," +
" MPEG-4 Video, MPEG-1/2 Video, Opus, MPEG-4 Audio, MPEG-1 Audio, AC-3")
// ToStream converts a MPEG-TS stream to a server stream.
// ToStream maps a MPEG-TS stream to a MediaMTX stream.
func ToStream(r *mpegts.Reader, stream **stream.Stream) ([]*description.Media, error) {
var medias []*description.Media //nolint:prealloc

View File

@@ -0,0 +1,204 @@
package rtmp
import (
"errors"
"fmt"
"net"
"time"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/mediacommon/pkg/codecs/h264"
"github.com/bluenviron/mediacommon/pkg/codecs/mpeg1audio"
"github.com/bluenviron/mediacommon/pkg/codecs/mpeg4audio"
"github.com/bluenviron/mediamtx/internal/asyncwriter"
"github.com/bluenviron/mediamtx/internal/stream"
"github.com/bluenviron/mediamtx/internal/unit"
)
var errNoSupportedCodecs = errors.New(
"the stream doesn't contain any supported codec, which are currently H264, MPEG-4 Audio, MPEG-1/2 Audio")
func setupVideo(
stream *stream.Stream,
writer *asyncwriter.Writer,
w **Writer,
nconn net.Conn,
writeTimeout time.Duration,
) format.Format {
var videoFormatH264 *format.H264
videoMedia := stream.Desc().FindFormat(&videoFormatH264)
if videoFormatH264 != nil {
var videoDTSExtractor *h264.DTSExtractor
stream.AddReader(writer, videoMedia, videoFormatH264, func(u unit.Unit) error {
tunit := u.(*unit.H264)
if tunit.AU == nil {
return nil
}
idrPresent := false
nonIDRPresent := false
for _, nalu := range tunit.AU {
typ := h264.NALUType(nalu[0] & 0x1F)
switch typ {
case h264.NALUTypeIDR:
idrPresent = true
case h264.NALUTypeNonIDR:
nonIDRPresent = true
}
}
var dts time.Duration
// wait until we receive an IDR
if videoDTSExtractor == nil {
if !idrPresent {
return nil
}
videoDTSExtractor = h264.NewDTSExtractor()
var err error
dts, err = videoDTSExtractor.Extract(tunit.AU, tunit.PTS)
if err != nil {
return err
}
} else {
if !idrPresent && !nonIDRPresent {
return nil
}
var err error
dts, err = videoDTSExtractor.Extract(tunit.AU, tunit.PTS)
if err != nil {
return err
}
}
nconn.SetWriteDeadline(time.Now().Add(writeTimeout))
return (*w).WriteH264(tunit.PTS, dts, idrPresent, tunit.AU)
})
return videoFormatH264
}
return nil
}
func setupAudio(
stream *stream.Stream,
writer *asyncwriter.Writer,
w **Writer,
nconn net.Conn,
writeTimeout time.Duration,
) format.Format {
var audioFormatMPEG4Audio *format.MPEG4Audio
audioMedia := stream.Desc().FindFormat(&audioFormatMPEG4Audio)
if audioMedia != nil {
stream.AddReader(writer, audioMedia, audioFormatMPEG4Audio, func(u unit.Unit) error {
tunit := u.(*unit.MPEG4Audio)
if tunit.AUs == nil {
return nil
}
for i, au := range tunit.AUs {
nconn.SetWriteDeadline(time.Now().Add(writeTimeout))
err := (*w).WriteMPEG4Audio(
tunit.PTS+time.Duration(i)*mpeg4audio.SamplesPerAccessUnit*
time.Second/time.Duration(audioFormatMPEG4Audio.ClockRate()),
au,
)
if err != nil {
return err
}
}
return nil
})
return audioFormatMPEG4Audio
}
var audioFormatMPEG1 *format.MPEG1Audio
audioMedia = stream.Desc().FindFormat(&audioFormatMPEG1)
if audioMedia != nil {
stream.AddReader(writer, audioMedia, audioFormatMPEG1, func(u unit.Unit) error {
tunit := u.(*unit.MPEG1Audio)
pts := tunit.PTS
for _, frame := range tunit.Frames {
var h mpeg1audio.FrameHeader
err := h.Unmarshal(frame)
if err != nil {
return err
}
if !(!h.MPEG2 && h.Layer == 3) {
return fmt.Errorf("RTMP only supports MPEG-1 layer 3 audio")
}
nconn.SetWriteDeadline(time.Now().Add(writeTimeout))
err = (*w).WriteMPEG1Audio(pts, &h, frame)
if err != nil {
return err
}
pts += time.Duration(h.SampleCount()) *
time.Second / time.Duration(h.SampleRate)
}
return nil
})
return audioFormatMPEG1
}
return nil
}
// FromStream maps a MediaMTX stream to a RTMP stream.
func FromStream(
stream *stream.Stream,
writer *asyncwriter.Writer,
conn *Conn,
nconn net.Conn,
writeTimeout time.Duration,
) error {
var w *Writer
videoFormat := setupVideo(
stream,
writer,
&w,
nconn,
writeTimeout,
)
audioFormat := setupAudio(
stream,
writer,
&w,
nconn,
writeTimeout,
)
if videoFormat == nil && audioFormat == nil {
return errNoSupportedCodecs
}
var err error
w, err = NewWriter(conn, videoFormat, audioFormat)
if err != nil {
return err
}
return nil
}

View File

@@ -0,0 +1,134 @@
package rtmp
import (
"fmt"
"time"
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/mediamtx/internal/stream"
"github.com/bluenviron/mediamtx/internal/unit"
)
// ToStream maps a RTMP stream to a MediaMTX stream.
func ToStream(r *Reader, stream **stream.Stream) ([]*description.Media, error) {
videoFormat, audioFormat := r.Tracks()
var medias []*description.Media
if videoFormat != nil {
medi := &description.Media{
Type: description.MediaTypeVideo,
Formats: []format.Format{videoFormat},
}
medias = append(medias, medi)
switch videoFormat.(type) {
case *format.AV1:
r.OnDataAV1(func(pts time.Duration, tu [][]byte) {
(*stream).WriteUnit(medi, videoFormat, &unit.AV1{
Base: unit.Base{
NTP: time.Now(),
PTS: pts,
},
TU: tu,
})
})
case *format.VP9:
r.OnDataVP9(func(pts time.Duration, frame []byte) {
(*stream).WriteUnit(medi, videoFormat, &unit.VP9{
Base: unit.Base{
NTP: time.Now(),
PTS: pts,
},
Frame: frame,
})
})
case *format.H265:
r.OnDataH265(func(pts time.Duration, au [][]byte) {
(*stream).WriteUnit(medi, videoFormat, &unit.H265{
Base: unit.Base{
NTP: time.Now(),
PTS: pts,
},
AU: au,
})
})
case *format.H264:
r.OnDataH264(func(pts time.Duration, au [][]byte) {
(*stream).WriteUnit(medi, videoFormat, &unit.H264{
Base: unit.Base{
NTP: time.Now(),
PTS: pts,
},
AU: au,
})
})
default:
return nil, fmt.Errorf("unsupported video codec: %T", videoFormat)
}
}
if audioFormat != nil {
medi := &description.Media{
Type: description.MediaTypeAudio,
Formats: []format.Format{audioFormat},
}
medias = append(medias, medi)
switch audioFormat.(type) {
case *format.MPEG4Audio:
r.OnDataMPEG4Audio(func(pts time.Duration, au []byte) {
(*stream).WriteUnit(medi, audioFormat, &unit.MPEG4Audio{
Base: unit.Base{
NTP: time.Now(),
PTS: pts,
},
AUs: [][]byte{au},
})
})
case *format.MPEG1Audio:
r.OnDataMPEG1Audio(func(pts time.Duration, frame []byte) {
(*stream).WriteUnit(medi, audioFormat, &unit.MPEG1Audio{
Base: unit.Base{
NTP: time.Now(),
PTS: pts,
},
Frames: [][]byte{frame},
})
})
case *format.G711:
r.OnDataG711(func(pts time.Duration, samples []byte) {
(*stream).WriteUnit(medi, audioFormat, &unit.G711{
Base: unit.Base{
NTP: time.Now(),
PTS: pts,
},
Samples: samples,
})
})
case *format.LPCM:
r.OnDataLPCM(func(pts time.Duration, samples []byte) {
(*stream).WriteUnit(medi, audioFormat, &unit.LPCM{
Base: unit.Base{
NTP: time.Now(),
PTS: pts,
},
Samples: samples,
})
})
default:
return nil, fmt.Errorf("unsupported audio codec: %T", audioFormat)
}
}
return medias, nil
}

View File

@@ -422,7 +422,7 @@ func TestPeerConnectionPublishRead(t *testing.T) {
}
}
// test that an audio codec is present regardless of the fact that an audio track is not.
// test that an audio codec is present regardless of the fact that an audio track is.
func TestPeerConnectionFallbackCodecs(t *testing.T) {
pc1 := &PeerConnection{
HandshakeTimeout: conf.StringDuration(10 * time.Second),

View File

@@ -11,10 +11,6 @@ import (
"time"
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/mediacommon/pkg/codecs/h264"
"github.com/bluenviron/mediacommon/pkg/codecs/mpeg1audio"
"github.com/bluenviron/mediacommon/pkg/codecs/mpeg4audio"
"github.com/google/uuid"
"github.com/bluenviron/mediamtx/internal/asyncwriter"
@@ -26,12 +22,8 @@ import (
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/protocols/rtmp"
"github.com/bluenviron/mediamtx/internal/stream"
"github.com/bluenviron/mediamtx/internal/unit"
)
var errNoSupportedCodecs = errors.New(
"the stream doesn't contain any supported codec, which are currently H264, MPEG-4 Audio, MPEG-1/2 Audio")
func pathNameAndQuery(inURL *url.URL) (string, url.Values, string) {
// remove leading and trailing slashes inserted by OBS and some other clients
tmp := strings.TrimRight(inURL.String(), "/")
@@ -199,20 +191,9 @@ func (c *conn) runRead(conn *rtmp.Conn, u *url.URL) error {
defer stream.RemoveReader(writer)
var w *rtmp.Writer
videoFormat := c.setupVideo(
&w,
stream,
writer)
audioFormat := c.setupAudio(
&w,
stream,
writer)
if videoFormat == nil && audioFormat == nil {
return errNoSupportedCodecs
err = rtmp.FromStream(stream, writer, conn, c.nconn, time.Duration(c.writeTimeout))
if err != nil {
return err
}
c.Log(logger.Info, "is reading from path '%s', %s",
@@ -228,11 +209,6 @@ func (c *conn) runRead(conn *rtmp.Conn, u *url.URL) error {
})
defer onUnreadHook()
w, err = rtmp.NewWriter(conn, videoFormat, audioFormat)
if err != nil {
return err
}
// disable read deadline
c.nconn.SetReadDeadline(time.Time{})
@@ -248,148 +224,6 @@ func (c *conn) runRead(conn *rtmp.Conn, u *url.URL) error {
}
}
func (c *conn) setupVideo(
w **rtmp.Writer,
stream *stream.Stream,
writer *asyncwriter.Writer,
) format.Format {
var videoFormatH264 *format.H264
videoMedia := stream.Desc().FindFormat(&videoFormatH264)
if videoFormatH264 != nil {
var videoDTSExtractor *h264.DTSExtractor
stream.AddReader(writer, videoMedia, videoFormatH264, func(u unit.Unit) error {
tunit := u.(*unit.H264)
if tunit.AU == nil {
return nil
}
idrPresent := false
nonIDRPresent := false
for _, nalu := range tunit.AU {
typ := h264.NALUType(nalu[0] & 0x1F)
switch typ {
case h264.NALUTypeIDR:
idrPresent = true
case h264.NALUTypeNonIDR:
nonIDRPresent = true
}
}
var dts time.Duration
// wait until we receive an IDR
if videoDTSExtractor == nil {
if !idrPresent {
return nil
}
videoDTSExtractor = h264.NewDTSExtractor()
var err error
dts, err = videoDTSExtractor.Extract(tunit.AU, tunit.PTS)
if err != nil {
return err
}
} else {
if !idrPresent && !nonIDRPresent {
return nil
}
var err error
dts, err = videoDTSExtractor.Extract(tunit.AU, tunit.PTS)
if err != nil {
return err
}
}
c.nconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout)))
return (*w).WriteH264(tunit.PTS, dts, idrPresent, tunit.AU)
})
return videoFormatH264
}
return nil
}
func (c *conn) setupAudio(
w **rtmp.Writer,
stream *stream.Stream,
writer *asyncwriter.Writer,
) format.Format {
var audioFormatMPEG4Audio *format.MPEG4Audio
audioMedia := stream.Desc().FindFormat(&audioFormatMPEG4Audio)
if audioMedia != nil {
stream.AddReader(writer, audioMedia, audioFormatMPEG4Audio, func(u unit.Unit) error {
tunit := u.(*unit.MPEG4Audio)
if tunit.AUs == nil {
return nil
}
for i, au := range tunit.AUs {
c.nconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout)))
err := (*w).WriteMPEG4Audio(
tunit.PTS+time.Duration(i)*mpeg4audio.SamplesPerAccessUnit*
time.Second/time.Duration(audioFormatMPEG4Audio.ClockRate()),
au,
)
if err != nil {
return err
}
}
return nil
})
return audioFormatMPEG4Audio
}
var audioFormatMPEG1 *format.MPEG1Audio
audioMedia = stream.Desc().FindFormat(&audioFormatMPEG1)
if audioMedia != nil {
stream.AddReader(writer, audioMedia, audioFormatMPEG1, func(u unit.Unit) error {
tunit := u.(*unit.MPEG1Audio)
pts := tunit.PTS
for _, frame := range tunit.Frames {
var h mpeg1audio.FrameHeader
err := h.Unmarshal(frame)
if err != nil {
return err
}
if !(!h.MPEG2 && h.Layer == 3) {
return fmt.Errorf("RTMP only supports MPEG-1 layer 3 audio")
}
c.nconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout)))
err = (*w).WriteMPEG1Audio(pts, &h, frame)
if err != nil {
return err
}
pts += time.Duration(h.SampleCount()) *
time.Second / time.Duration(h.SampleRate)
}
return nil
})
return audioFormatMPEG1
}
return nil
}
func (c *conn) runPublish(conn *rtmp.Conn, u *url.URL) error {
pathName, query, rawQuery := pathNameAndQuery(u)
@@ -428,123 +262,12 @@ func (c *conn) runPublish(conn *rtmp.Conn, u *url.URL) error {
if err != nil {
return err
}
videoFormat, audioFormat := r.Tracks()
var medias []*description.Media
var stream *stream.Stream
if videoFormat != nil {
videoMedia := &description.Media{
Type: description.MediaTypeVideo,
Formats: []format.Format{videoFormat},
}
medias = append(medias, videoMedia)
switch videoFormat.(type) {
case *format.AV1:
r.OnDataAV1(func(pts time.Duration, tu [][]byte) {
stream.WriteUnit(videoMedia, videoFormat, &unit.AV1{
Base: unit.Base{
NTP: time.Now(),
PTS: pts,
},
TU: tu,
})
})
case *format.VP9:
r.OnDataVP9(func(pts time.Duration, frame []byte) {
stream.WriteUnit(videoMedia, videoFormat, &unit.VP9{
Base: unit.Base{
NTP: time.Now(),
PTS: pts,
},
Frame: frame,
})
})
case *format.H265:
r.OnDataH265(func(pts time.Duration, au [][]byte) {
stream.WriteUnit(videoMedia, videoFormat, &unit.H265{
Base: unit.Base{
NTP: time.Now(),
PTS: pts,
},
AU: au,
})
})
case *format.H264:
r.OnDataH264(func(pts time.Duration, au [][]byte) {
stream.WriteUnit(videoMedia, videoFormat, &unit.H264{
Base: unit.Base{
NTP: time.Now(),
PTS: pts,
},
AU: au,
})
})
default:
return fmt.Errorf("unsupported video codec: %T", videoFormat)
}
}
if audioFormat != nil { //nolint:dupl
audioMedia := &description.Media{
Type: description.MediaTypeAudio,
Formats: []format.Format{audioFormat},
}
medias = append(medias, audioMedia)
switch audioFormat.(type) {
case *format.MPEG4Audio:
r.OnDataMPEG4Audio(func(pts time.Duration, au []byte) {
stream.WriteUnit(audioMedia, audioFormat, &unit.MPEG4Audio{
Base: unit.Base{
NTP: time.Now(),
PTS: pts,
},
AUs: [][]byte{au},
})
})
case *format.MPEG1Audio:
r.OnDataMPEG1Audio(func(pts time.Duration, frame []byte) {
stream.WriteUnit(audioMedia, audioFormat, &unit.MPEG1Audio{
Base: unit.Base{
NTP: time.Now(),
PTS: pts,
},
Frames: [][]byte{frame},
})
})
case *format.G711:
r.OnDataG711(func(pts time.Duration, samples []byte) {
stream.WriteUnit(audioMedia, audioFormat, &unit.G711{
Base: unit.Base{
NTP: time.Now(),
PTS: pts,
},
Samples: samples,
})
})
case *format.LPCM:
r.OnDataLPCM(func(pts time.Duration, samples []byte) {
stream.WriteUnit(audioMedia, audioFormat, &unit.LPCM{
Base: unit.Base{
NTP: time.Now(),
PTS: pts,
},
Samples: samples,
})
})
default:
return fmt.Errorf("unsupported audio codec: %T", audioFormat)
}
medias, err := rtmp.ToStream(r, &stream)
if err != nil {
return err
}
stream, err = path.StartPublisher(defs.PathStartPublisherReq{

View File

@@ -4,13 +4,11 @@ package rtmp
import (
"context"
ctls "crypto/tls"
"fmt"
"net"
"net/url"
"time"
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/defs"
@@ -18,7 +16,6 @@ import (
"github.com/bluenviron/mediamtx/internal/protocols/rtmp"
"github.com/bluenviron/mediamtx/internal/protocols/tls"
"github.com/bluenviron/mediamtx/internal/stream"
"github.com/bluenviron/mediamtx/internal/unit"
)
// Source is a RTMP static source.
@@ -97,73 +94,16 @@ func (s *Source) runReader(u *url.URL, nconn net.Conn) error {
return err
}
mc, err := rtmp.NewReader(conn)
r, err := rtmp.NewReader(conn)
if err != nil {
return err
}
videoFormat, audioFormat := mc.Tracks()
var medias []*description.Media
var stream *stream.Stream
if videoFormat != nil {
videoMedia := &description.Media{
Type: description.MediaTypeVideo,
Formats: []format.Format{videoFormat},
}
medias = append(medias, videoMedia)
switch videoFormat.(type) {
case *format.H264:
mc.OnDataH264(func(pts time.Duration, au [][]byte) {
stream.WriteUnit(videoMedia, videoFormat, &unit.H264{
Base: unit.Base{
NTP: time.Now(),
PTS: pts,
},
AU: au,
})
})
default:
return fmt.Errorf("unsupported video codec: %T", videoFormat)
}
}
if audioFormat != nil { //nolint:dupl
audioMedia := &description.Media{
Type: description.MediaTypeAudio,
Formats: []format.Format{audioFormat},
}
medias = append(medias, audioMedia)
switch audioFormat.(type) {
case *format.MPEG4Audio:
mc.OnDataMPEG4Audio(func(pts time.Duration, au []byte) {
stream.WriteUnit(audioMedia, audioFormat, &unit.MPEG4Audio{
Base: unit.Base{
NTP: time.Now(),
PTS: pts,
},
AUs: [][]byte{au},
})
})
case *format.MPEG1Audio:
mc.OnDataMPEG1Audio(func(pts time.Duration, frame []byte) {
stream.WriteUnit(audioMedia, audioFormat, &unit.MPEG1Audio{
Base: unit.Base{
NTP: time.Now(),
PTS: pts,
},
Frames: [][]byte{frame},
})
})
default:
return fmt.Errorf("unsupported audio codec: %T", audioFormat)
}
medias, err := rtmp.ToStream(r, &stream)
if err != nil {
return err
}
res := s.Parent.SetReady(defs.PathSourceStaticSetReadyReq{
@@ -183,7 +123,7 @@ func (s *Source) runReader(u *url.URL, nconn net.Conn) error {
for {
nconn.SetReadDeadline(time.Now().Add(time.Duration(s.ReadTimeout)))
err := mc.Read()
err := r.Read()
if err != nil {
return err
}