Files
rtsp-simple-server/internal/stream/stream_format.go
Alessandro Ros cd80814009 do not include recorder and HLS muxer in sent bytes (#4380) (#5039)
in API (/paths/list, /paths/get) and metrics (paths_bytes_sent), the
amount of sent bytes was increased even in case of writes to the
recorder and HLS muxer, which are not generating network traffic. This
fixes the issue.
2025-09-29 09:28:04 +02:00

104 lines
2.2 KiB
Go

package stream
import (
"sync/atomic"
"time"
"github.com/bluenviron/gortsplib/v5/pkg/description"
"github.com/bluenviron/gortsplib/v5/pkg/format"
"github.com/pion/rtp"
"github.com/bluenviron/mediamtx/internal/codecprocessor"
"github.com/bluenviron/mediamtx/internal/counterdumper"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/unit"
)
func unitSize(u unit.Unit) uint64 {
n := uint64(0)
for _, pkt := range u.GetRTPPackets() {
n += uint64(pkt.MarshalSize())
}
return n
}
type streamFormat struct {
rtpMaxPayloadSize int
format format.Format
generateRTPPackets bool
processingErrors *counterdumper.CounterDumper
parent logger.Writer
proc codecprocessor.Processor
onDatas map[*Reader]OnDataFunc
}
func (sf *streamFormat) initialize() error {
sf.onDatas = make(map[*Reader]OnDataFunc)
var err error
sf.proc, err = codecprocessor.New(sf.rtpMaxPayloadSize, sf.format, sf.generateRTPPackets, sf.parent)
if err != nil {
return err
}
return nil
}
func (sf *streamFormat) writeUnit(s *Stream, medi *description.Media, u unit.Unit) {
err := sf.proc.ProcessUnit(u)
if err != nil {
sf.processingErrors.Increase()
return
}
sf.writeUnitInner(s, medi, u)
}
func (sf *streamFormat) writeRTPPacket(
s *Stream,
medi *description.Media,
pkt *rtp.Packet,
ntp time.Time,
pts int64,
) {
hasNonRTSPReaders := len(sf.onDatas) > 0
u, err := sf.proc.ProcessRTPPacket(pkt, ntp, pts, hasNonRTSPReaders)
if err != nil {
sf.processingErrors.Increase()
return
}
sf.writeUnitInner(s, medi, u)
}
func (sf *streamFormat) writeUnitInner(s *Stream, medi *description.Media, u unit.Unit) {
size := unitSize(u)
atomic.AddUint64(s.bytesReceived, size)
if s.rtspStream != nil {
for _, pkt := range u.GetRTPPackets() {
s.rtspStream.WritePacketRTPWithNTP(medi, pkt, u.GetNTP()) //nolint:errcheck
}
}
if s.rtspsStream != nil {
for _, pkt := range u.GetRTPPackets() {
s.rtspsStream.WritePacketRTPWithNTP(medi, pkt, u.GetNTP()) //nolint:errcheck
}
}
for sr, onData := range sf.onDatas {
csr := sr
cOnData := onData
sr.push(func() error {
if !csr.SkipBytesSent {
atomic.AddUint64(s.bytesSent, size)
}
return cOnData(u)
})
}
}