mirror of
https://github.com/aler9/rtsp-simple-server
synced 2025-10-06 16:17:59 +08:00

in API (/paths/list, /paths/get) and metrics (paths_bytes_sent), the amount of sent bytes was increased even in case of writes to the recorder and HLS muxer, which are not generating network traffic. This fixes the issue.
432 lines
9.5 KiB
Go
432 lines
9.5 KiB
Go
package mpegts
|
|
|
|
import (
|
|
"bufio"
|
|
"fmt"
|
|
"slices"
|
|
"time"
|
|
|
|
"github.com/bluenviron/gortsplib/v5/pkg/description"
|
|
"github.com/bluenviron/gortsplib/v5/pkg/format"
|
|
"github.com/bluenviron/mediacommon/v2/pkg/codecs/ac3"
|
|
"github.com/bluenviron/mediacommon/v2/pkg/codecs/h264"
|
|
"github.com/bluenviron/mediacommon/v2/pkg/codecs/h265"
|
|
"github.com/bluenviron/mediacommon/v2/pkg/codecs/mpeg4audio"
|
|
mcmpegts "github.com/bluenviron/mediacommon/v2/pkg/formats/mpegts"
|
|
srt "github.com/datarhei/gosrt"
|
|
|
|
"github.com/bluenviron/mediamtx/internal/logger"
|
|
"github.com/bluenviron/mediamtx/internal/stream"
|
|
"github.com/bluenviron/mediamtx/internal/unit"
|
|
)
|
|
|
|
func multiplyAndDivide(v, m, d int64) int64 {
|
|
secs := v / d
|
|
dec := v % d
|
|
return (secs*m + dec*m/d)
|
|
}
|
|
|
|
// FromStream maps a MediaMTX stream to a MPEG-TS writer.
|
|
func FromStream(
|
|
desc *description.Session,
|
|
r *stream.Reader,
|
|
bw *bufio.Writer,
|
|
sconn srt.Conn,
|
|
writeTimeout time.Duration,
|
|
) error {
|
|
var w *mcmpegts.Writer
|
|
var tracks []*mcmpegts.Track
|
|
|
|
addTrack := func(
|
|
media *description.Media,
|
|
forma format.Format,
|
|
track *mcmpegts.Track,
|
|
onData stream.OnDataFunc,
|
|
) {
|
|
tracks = append(tracks, track)
|
|
r.OnData(media, forma, onData)
|
|
}
|
|
|
|
for _, media := range desc.Medias {
|
|
for _, forma := range media.Formats {
|
|
clockRate := forma.ClockRate()
|
|
|
|
switch forma := forma.(type) {
|
|
case *format.H265: //nolint:dupl
|
|
track := &mcmpegts.Track{Codec: &mcmpegts.CodecH265{}}
|
|
|
|
var dtsExtractor *h265.DTSExtractor
|
|
|
|
addTrack(
|
|
media,
|
|
forma,
|
|
track,
|
|
func(u unit.Unit) error {
|
|
tunit := u.(*unit.H265)
|
|
|
|
if tunit.AU == nil {
|
|
return nil
|
|
}
|
|
|
|
if dtsExtractor == nil {
|
|
if !h265.IsRandomAccess(tunit.AU) {
|
|
return nil
|
|
}
|
|
dtsExtractor = &h265.DTSExtractor{}
|
|
dtsExtractor.Initialize()
|
|
}
|
|
|
|
dts, err := dtsExtractor.Extract(tunit.AU, tunit.PTS)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
sconn.SetWriteDeadline(time.Now().Add(writeTimeout))
|
|
err = (*w).WriteH265(
|
|
track,
|
|
tunit.PTS, // no conversion is needed since clock rate is 90khz in both MPEG-TS and RTSP
|
|
dts,
|
|
tunit.AU)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return bw.Flush()
|
|
})
|
|
|
|
case *format.H264: //nolint:dupl
|
|
track := &mcmpegts.Track{Codec: &mcmpegts.CodecH264{}}
|
|
|
|
var dtsExtractor *h264.DTSExtractor
|
|
|
|
addTrack(
|
|
media,
|
|
forma,
|
|
track,
|
|
func(u unit.Unit) error {
|
|
tunit := u.(*unit.H264)
|
|
|
|
if tunit.AU == nil {
|
|
return nil
|
|
}
|
|
|
|
idrPresent := h264.IsRandomAccess(tunit.AU)
|
|
|
|
if dtsExtractor == nil {
|
|
if !idrPresent {
|
|
return nil
|
|
}
|
|
dtsExtractor = &h264.DTSExtractor{}
|
|
dtsExtractor.Initialize()
|
|
}
|
|
|
|
dts, err := dtsExtractor.Extract(tunit.AU, tunit.PTS)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
sconn.SetWriteDeadline(time.Now().Add(writeTimeout))
|
|
err = (*w).WriteH264(
|
|
track,
|
|
tunit.PTS, // no conversion is needed since clock rate is 90khz in both MPEG-TS and RTSP
|
|
dts,
|
|
tunit.AU)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return bw.Flush()
|
|
})
|
|
|
|
case *format.MPEG4Video:
|
|
track := &mcmpegts.Track{Codec: &mcmpegts.CodecMPEG4Video{}}
|
|
|
|
firstReceived := false
|
|
var lastPTS int64
|
|
|
|
addTrack(
|
|
media,
|
|
forma,
|
|
track,
|
|
func(u unit.Unit) error {
|
|
tunit := u.(*unit.MPEG4Video)
|
|
|
|
if tunit.Frame == nil {
|
|
return nil
|
|
}
|
|
|
|
if !firstReceived {
|
|
firstReceived = true
|
|
} else if tunit.PTS < lastPTS {
|
|
return fmt.Errorf("MPEG-4 Video streams with B-frames are not supported (yet)")
|
|
}
|
|
lastPTS = tunit.PTS
|
|
|
|
sconn.SetWriteDeadline(time.Now().Add(writeTimeout))
|
|
err := (*w).WriteMPEG4Video(
|
|
track,
|
|
tunit.PTS, // no conversion is needed since clock rate is 90khz in both MPEG-TS and RTSP
|
|
tunit.Frame)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return bw.Flush()
|
|
})
|
|
|
|
case *format.MPEG1Video:
|
|
track := &mcmpegts.Track{Codec: &mcmpegts.CodecMPEG1Video{}}
|
|
|
|
firstReceived := false
|
|
var lastPTS int64
|
|
|
|
addTrack(
|
|
media,
|
|
forma,
|
|
track,
|
|
func(u unit.Unit) error {
|
|
tunit := u.(*unit.MPEG1Video)
|
|
|
|
if tunit.Frame == nil {
|
|
return nil
|
|
}
|
|
|
|
if !firstReceived {
|
|
firstReceived = true
|
|
} else if tunit.PTS < lastPTS {
|
|
return fmt.Errorf("MPEG-1 Video streams with B-frames are not supported (yet)")
|
|
}
|
|
lastPTS = tunit.PTS
|
|
|
|
sconn.SetWriteDeadline(time.Now().Add(writeTimeout))
|
|
err := (*w).WriteMPEG1Video(
|
|
track,
|
|
tunit.PTS, // no conversion is needed since clock rate is 90khz in both MPEG-TS and RTSP
|
|
tunit.Frame)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return bw.Flush()
|
|
})
|
|
|
|
case *format.Opus:
|
|
track := &mcmpegts.Track{Codec: &mcmpegts.CodecOpus{
|
|
ChannelCount: forma.ChannelCount,
|
|
}}
|
|
|
|
addTrack(
|
|
media,
|
|
forma,
|
|
track,
|
|
func(u unit.Unit) error {
|
|
tunit := u.(*unit.Opus)
|
|
|
|
if tunit.Packets == nil {
|
|
return nil
|
|
}
|
|
|
|
sconn.SetWriteDeadline(time.Now().Add(writeTimeout))
|
|
err := (*w).WriteOpus(
|
|
track,
|
|
multiplyAndDivide(tunit.PTS, 90000, int64(clockRate)),
|
|
tunit.Packets)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return bw.Flush()
|
|
})
|
|
|
|
case *format.KLV:
|
|
track := &mcmpegts.Track{
|
|
Codec: &mcmpegts.CodecKLV{
|
|
Synchronous: true,
|
|
},
|
|
}
|
|
|
|
addTrack(
|
|
media,
|
|
forma,
|
|
track,
|
|
func(u unit.Unit) error {
|
|
tunit := u.(*unit.KLV)
|
|
|
|
if tunit.Unit == nil {
|
|
return nil
|
|
}
|
|
|
|
sconn.SetWriteDeadline(time.Now().Add(writeTimeout))
|
|
err := (*w).WriteKLV(track, multiplyAndDivide(tunit.PTS, 90000, 90000), tunit.Unit)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return bw.Flush()
|
|
})
|
|
|
|
case *format.MPEG4Audio:
|
|
track := &mcmpegts.Track{Codec: &mcmpegts.CodecMPEG4Audio{
|
|
Config: *forma.Config,
|
|
}}
|
|
|
|
addTrack(
|
|
media,
|
|
forma,
|
|
track,
|
|
func(u unit.Unit) error {
|
|
tunit := u.(*unit.MPEG4Audio)
|
|
|
|
if tunit.AUs == nil {
|
|
return nil
|
|
}
|
|
|
|
sconn.SetWriteDeadline(time.Now().Add(writeTimeout))
|
|
err := (*w).WriteMPEG4Audio(
|
|
track,
|
|
multiplyAndDivide(tunit.PTS, 90000, int64(clockRate)),
|
|
tunit.AUs)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return bw.Flush()
|
|
})
|
|
|
|
case *format.MPEG4AudioLATM:
|
|
track := &mcmpegts.Track{Codec: &mcmpegts.CodecMPEG4AudioLATM{}}
|
|
|
|
if !forma.CPresent {
|
|
addTrack(
|
|
media,
|
|
forma,
|
|
track,
|
|
func(u unit.Unit) error {
|
|
tunit := u.(*unit.MPEG4AudioLATM)
|
|
|
|
if tunit.Element == nil {
|
|
return nil
|
|
}
|
|
|
|
var elIn mpeg4audio.AudioMuxElement
|
|
elIn.MuxConfigPresent = false
|
|
elIn.StreamMuxConfig = forma.StreamMuxConfig
|
|
err := elIn.Unmarshal(tunit.Element)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var elOut mpeg4audio.AudioMuxElement
|
|
elOut.MuxConfigPresent = true
|
|
elOut.StreamMuxConfig = forma.StreamMuxConfig
|
|
elOut.UseSameStreamMux = false
|
|
elOut.Payloads = elIn.Payloads
|
|
buf, err := elOut.Marshal()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
sconn.SetWriteDeadline(time.Now().Add(writeTimeout))
|
|
err = (*w).WriteMPEG4AudioLATM(
|
|
track,
|
|
multiplyAndDivide(tunit.PTS, 90000, int64(clockRate)),
|
|
[][]byte{buf})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return bw.Flush()
|
|
})
|
|
} else {
|
|
addTrack(
|
|
media,
|
|
forma,
|
|
track,
|
|
func(u unit.Unit) error {
|
|
tunit := u.(*unit.MPEG4AudioLATM)
|
|
|
|
if tunit.Element == nil {
|
|
return nil
|
|
}
|
|
|
|
sconn.SetWriteDeadline(time.Now().Add(writeTimeout))
|
|
err := (*w).WriteMPEG4AudioLATM(
|
|
track,
|
|
multiplyAndDivide(tunit.PTS, 90000, int64(clockRate)),
|
|
[][]byte{tunit.Element})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return bw.Flush()
|
|
})
|
|
}
|
|
|
|
case *format.MPEG1Audio:
|
|
track := &mcmpegts.Track{Codec: &mcmpegts.CodecMPEG1Audio{}}
|
|
|
|
addTrack(
|
|
media,
|
|
forma,
|
|
track,
|
|
func(u unit.Unit) error {
|
|
tunit := u.(*unit.MPEG1Audio)
|
|
|
|
if tunit.Frames == nil {
|
|
return nil
|
|
}
|
|
|
|
sconn.SetWriteDeadline(time.Now().Add(writeTimeout))
|
|
err := (*w).WriteMPEG1Audio(
|
|
track,
|
|
tunit.PTS, // no conversion is needed since clock rate is 90khz in both MPEG-TS and RTSP
|
|
tunit.Frames)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return bw.Flush()
|
|
})
|
|
|
|
case *format.AC3:
|
|
track := &mcmpegts.Track{Codec: &mcmpegts.CodecAC3{}}
|
|
|
|
addTrack(
|
|
media,
|
|
forma,
|
|
track,
|
|
func(u unit.Unit) error {
|
|
tunit := u.(*unit.AC3)
|
|
|
|
if tunit.Frames == nil {
|
|
return nil
|
|
}
|
|
|
|
for i, frame := range tunit.Frames {
|
|
framePTS := tunit.PTS + int64(i)*ac3.SamplesPerFrame
|
|
|
|
sconn.SetWriteDeadline(time.Now().Add(writeTimeout))
|
|
err := (*w).WriteAC3(
|
|
track,
|
|
multiplyAndDivide(framePTS, 90000, int64(clockRate)),
|
|
frame)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return bw.Flush()
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
if len(tracks) == 0 {
|
|
return errNoSupportedCodecs
|
|
}
|
|
|
|
setuppedFormats := r.Formats()
|
|
|
|
n := 1
|
|
for _, medi := range desc.Medias {
|
|
for _, forma := range medi.Formats {
|
|
if !slices.Contains(setuppedFormats, forma) {
|
|
r.Parent.Log(logger.Warn, "skipping track %d (%s)", n, forma.Codec())
|
|
}
|
|
n++
|
|
}
|
|
}
|
|
|
|
w = &mcmpegts.Writer{W: bw, Tracks: tracks}
|
|
return w.Initialize()
|
|
}
|