Files
2025-09-16 13:10:34 +02:00

280 lines
6.0 KiB
Go

// Package stream contains the Stream object.
package stream
import (
"sync"
"sync/atomic"
"time"
"github.com/bluenviron/gortsplib/v5"
"github.com/bluenviron/gortsplib/v5/pkg/description"
"github.com/bluenviron/gortsplib/v5/pkg/format"
"github.com/pion/rtp"
"github.com/bluenviron/mediamtx/internal/counterdumper"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/unit"
)
// Reader is a stream reader.
type Reader interface {
logger.Writer
}
// ReadFunc is the callback passed to AddReader().
type ReadFunc func(unit.Unit) error
// Stream is a media stream.
// It stores tracks, readers and allows to write data to readers, converting it when needed.
type Stream struct {
WriteQueueSize int
RTPMaxPayloadSize int
Desc *description.Session
GenerateRTPPackets bool
Parent logger.Writer
bytesReceived *uint64
bytesSent *uint64
streamMedias map[*description.Media]*streamMedia
mutex sync.RWMutex
rtspStream *gortsplib.ServerStream
rtspsStream *gortsplib.ServerStream
streamReaders map[Reader]*streamReader
processingErrors *counterdumper.CounterDumper
readerRunning chan struct{}
}
// Initialize initializes a Stream.
func (s *Stream) Initialize() error {
s.bytesReceived = new(uint64)
s.bytesSent = new(uint64)
s.streamMedias = make(map[*description.Media]*streamMedia)
s.streamReaders = make(map[Reader]*streamReader)
s.readerRunning = make(chan struct{})
s.processingErrors = &counterdumper.CounterDumper{
OnReport: func(val uint64) {
s.Parent.Log(logger.Warn, "%d processing %s",
val,
func() string {
if val == 1 {
return "error"
}
return "errors"
}())
},
}
s.processingErrors.Start()
for _, media := range s.Desc.Medias {
s.streamMedias[media] = &streamMedia{
rtpMaxPayloadSize: s.RTPMaxPayloadSize,
media: media,
generateRTPPackets: s.GenerateRTPPackets,
processingErrors: s.processingErrors,
parent: s.Parent,
}
err := s.streamMedias[media].initialize()
if err != nil {
return err
}
}
return nil
}
// Close closes all resources of the stream.
func (s *Stream) Close() {
s.processingErrors.Stop()
if s.rtspStream != nil {
s.rtspStream.Close()
}
if s.rtspsStream != nil {
s.rtspsStream.Close()
}
}
// BytesReceived returns received bytes.
func (s *Stream) BytesReceived() uint64 {
return atomic.LoadUint64(s.bytesReceived)
}
// BytesSent returns sent bytes.
func (s *Stream) BytesSent() uint64 {
s.mutex.RLock()
defer s.mutex.RUnlock()
bytesSent := atomic.LoadUint64(s.bytesSent)
if s.rtspStream != nil {
stats := s.rtspStream.Stats()
bytesSent += stats.BytesSent
}
if s.rtspsStream != nil {
stats := s.rtspsStream.Stats()
bytesSent += stats.BytesSent
}
return bytesSent
}
// RTSPStream returns the RTSP stream.
func (s *Stream) RTSPStream(server *gortsplib.Server) *gortsplib.ServerStream {
s.mutex.Lock()
defer s.mutex.Unlock()
if s.rtspStream == nil {
s.rtspStream = &gortsplib.ServerStream{
Server: server,
Desc: s.Desc,
}
err := s.rtspStream.Initialize()
if err != nil {
panic(err)
}
}
return s.rtspStream
}
// RTSPSStream returns the RTSPS stream.
func (s *Stream) RTSPSStream(server *gortsplib.Server) *gortsplib.ServerStream {
s.mutex.Lock()
defer s.mutex.Unlock()
if s.rtspsStream == nil {
s.rtspsStream = &gortsplib.ServerStream{
Server: server,
Desc: s.Desc,
}
err := s.rtspsStream.Initialize()
if err != nil {
panic(err)
}
}
return s.rtspsStream
}
// AddReader adds a reader.
// Used by all protocols except RTSP.
func (s *Stream) AddReader(reader Reader, medi *description.Media, forma format.Format, cb ReadFunc) {
s.mutex.Lock()
defer s.mutex.Unlock()
sr, ok := s.streamReaders[reader]
if !ok {
sr = &streamReader{
queueSize: s.WriteQueueSize,
parent: reader,
}
sr.initialize()
s.streamReaders[reader] = sr
}
sm := s.streamMedias[medi]
sf := sm.formats[forma]
sf.addReader(sr, cb)
}
// RemoveReader removes a reader.
// Used by all protocols except RTSP.
func (s *Stream) RemoveReader(reader Reader) {
s.mutex.Lock()
defer s.mutex.Unlock()
sr := s.streamReaders[reader]
for _, sm := range s.streamMedias {
for _, sf := range sm.formats {
sf.removeReader(sr)
}
}
delete(s.streamReaders, reader)
sr.stop()
}
// StartReader starts a reader.
// Used by all protocols except RTSP.
func (s *Stream) StartReader(reader Reader) {
s.mutex.Lock()
defer s.mutex.Unlock()
sr := s.streamReaders[reader]
sr.start()
for _, sm := range s.streamMedias {
for _, sf := range sm.formats {
sf.startReader(sr)
}
}
select {
case <-s.readerRunning:
default:
close(s.readerRunning)
}
}
// ReaderError returns whenever there's an error.
func (s *Stream) ReaderError(reader Reader) chan error {
sr := s.streamReaders[reader]
return sr.error()
}
// ReaderFormats returns all formats that a reader is reading.
func (s *Stream) ReaderFormats(reader Reader) []format.Format {
s.mutex.RLock()
defer s.mutex.RUnlock()
sr := s.streamReaders[reader]
var formats []format.Format
for _, sm := range s.streamMedias {
for forma, sf := range sm.formats {
if _, ok := sf.pausedReaders[sr]; ok {
formats = append(formats, forma)
} else if _, ok = sf.runningReaders[sr]; ok {
formats = append(formats, forma)
}
}
}
return formats
}
// WaitRunningReader waits for a running reader.
func (s *Stream) WaitRunningReader() {
<-s.readerRunning
}
// WriteUnit writes a Unit.
func (s *Stream) WriteUnit(medi *description.Media, forma format.Format, u unit.Unit) {
sm := s.streamMedias[medi]
sf := sm.formats[forma]
s.mutex.RLock()
defer s.mutex.RUnlock()
sf.writeUnit(s, medi, u)
}
// WriteRTPPacket writes a RTP packet.
func (s *Stream) WriteRTPPacket(
medi *description.Media,
forma format.Format,
pkt *rtp.Packet,
ntp time.Time,
pts int64,
) {
sm := s.streamMedias[medi]
sf := sm.formats[forma]
s.mutex.RLock()
defer s.mutex.RUnlock()
sf.writeRTPPacket(s, medi, pkt, ntp, pts)
}