mirror of
https://github.com/aler9/rtsp-simple-server
synced 2025-09-27 03:56:15 +08:00
280 lines
6.0 KiB
Go
280 lines
6.0 KiB
Go
// Package stream contains the Stream object.
|
|
package stream
|
|
|
|
import (
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/bluenviron/gortsplib/v5"
|
|
"github.com/bluenviron/gortsplib/v5/pkg/description"
|
|
"github.com/bluenviron/gortsplib/v5/pkg/format"
|
|
"github.com/pion/rtp"
|
|
|
|
"github.com/bluenviron/mediamtx/internal/counterdumper"
|
|
"github.com/bluenviron/mediamtx/internal/logger"
|
|
"github.com/bluenviron/mediamtx/internal/unit"
|
|
)
|
|
|
|
// Reader is a stream reader.
|
|
type Reader interface {
|
|
logger.Writer
|
|
}
|
|
|
|
// ReadFunc is the callback passed to AddReader().
|
|
type ReadFunc func(unit.Unit) error
|
|
|
|
// Stream is a media stream.
|
|
// It stores tracks, readers and allows to write data to readers, converting it when needed.
|
|
type Stream struct {
|
|
WriteQueueSize int
|
|
RTPMaxPayloadSize int
|
|
Desc *description.Session
|
|
GenerateRTPPackets bool
|
|
Parent logger.Writer
|
|
|
|
bytesReceived *uint64
|
|
bytesSent *uint64
|
|
streamMedias map[*description.Media]*streamMedia
|
|
mutex sync.RWMutex
|
|
rtspStream *gortsplib.ServerStream
|
|
rtspsStream *gortsplib.ServerStream
|
|
streamReaders map[Reader]*streamReader
|
|
processingErrors *counterdumper.CounterDumper
|
|
|
|
readerRunning chan struct{}
|
|
}
|
|
|
|
// Initialize initializes a Stream.
|
|
func (s *Stream) Initialize() error {
|
|
s.bytesReceived = new(uint64)
|
|
s.bytesSent = new(uint64)
|
|
s.streamMedias = make(map[*description.Media]*streamMedia)
|
|
s.streamReaders = make(map[Reader]*streamReader)
|
|
s.readerRunning = make(chan struct{})
|
|
|
|
s.processingErrors = &counterdumper.CounterDumper{
|
|
OnReport: func(val uint64) {
|
|
s.Parent.Log(logger.Warn, "%d processing %s",
|
|
val,
|
|
func() string {
|
|
if val == 1 {
|
|
return "error"
|
|
}
|
|
return "errors"
|
|
}())
|
|
},
|
|
}
|
|
s.processingErrors.Start()
|
|
|
|
for _, media := range s.Desc.Medias {
|
|
s.streamMedias[media] = &streamMedia{
|
|
rtpMaxPayloadSize: s.RTPMaxPayloadSize,
|
|
media: media,
|
|
generateRTPPackets: s.GenerateRTPPackets,
|
|
processingErrors: s.processingErrors,
|
|
parent: s.Parent,
|
|
}
|
|
err := s.streamMedias[media].initialize()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Close closes all resources of the stream.
|
|
func (s *Stream) Close() {
|
|
s.processingErrors.Stop()
|
|
|
|
if s.rtspStream != nil {
|
|
s.rtspStream.Close()
|
|
}
|
|
if s.rtspsStream != nil {
|
|
s.rtspsStream.Close()
|
|
}
|
|
}
|
|
|
|
// BytesReceived returns received bytes.
|
|
func (s *Stream) BytesReceived() uint64 {
|
|
return atomic.LoadUint64(s.bytesReceived)
|
|
}
|
|
|
|
// BytesSent returns sent bytes.
|
|
func (s *Stream) BytesSent() uint64 {
|
|
s.mutex.RLock()
|
|
defer s.mutex.RUnlock()
|
|
|
|
bytesSent := atomic.LoadUint64(s.bytesSent)
|
|
if s.rtspStream != nil {
|
|
stats := s.rtspStream.Stats()
|
|
bytesSent += stats.BytesSent
|
|
}
|
|
if s.rtspsStream != nil {
|
|
stats := s.rtspsStream.Stats()
|
|
bytesSent += stats.BytesSent
|
|
}
|
|
return bytesSent
|
|
}
|
|
|
|
// RTSPStream returns the RTSP stream.
|
|
func (s *Stream) RTSPStream(server *gortsplib.Server) *gortsplib.ServerStream {
|
|
s.mutex.Lock()
|
|
defer s.mutex.Unlock()
|
|
|
|
if s.rtspStream == nil {
|
|
s.rtspStream = &gortsplib.ServerStream{
|
|
Server: server,
|
|
Desc: s.Desc,
|
|
}
|
|
err := s.rtspStream.Initialize()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
}
|
|
return s.rtspStream
|
|
}
|
|
|
|
// RTSPSStream returns the RTSPS stream.
|
|
func (s *Stream) RTSPSStream(server *gortsplib.Server) *gortsplib.ServerStream {
|
|
s.mutex.Lock()
|
|
defer s.mutex.Unlock()
|
|
|
|
if s.rtspsStream == nil {
|
|
s.rtspsStream = &gortsplib.ServerStream{
|
|
Server: server,
|
|
Desc: s.Desc,
|
|
}
|
|
err := s.rtspsStream.Initialize()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
}
|
|
return s.rtspsStream
|
|
}
|
|
|
|
// AddReader adds a reader.
|
|
// Used by all protocols except RTSP.
|
|
func (s *Stream) AddReader(reader Reader, medi *description.Media, forma format.Format, cb ReadFunc) {
|
|
s.mutex.Lock()
|
|
defer s.mutex.Unlock()
|
|
|
|
sr, ok := s.streamReaders[reader]
|
|
if !ok {
|
|
sr = &streamReader{
|
|
queueSize: s.WriteQueueSize,
|
|
parent: reader,
|
|
}
|
|
sr.initialize()
|
|
|
|
s.streamReaders[reader] = sr
|
|
}
|
|
|
|
sm := s.streamMedias[medi]
|
|
sf := sm.formats[forma]
|
|
sf.addReader(sr, cb)
|
|
}
|
|
|
|
// RemoveReader removes a reader.
|
|
// Used by all protocols except RTSP.
|
|
func (s *Stream) RemoveReader(reader Reader) {
|
|
s.mutex.Lock()
|
|
defer s.mutex.Unlock()
|
|
|
|
sr := s.streamReaders[reader]
|
|
|
|
for _, sm := range s.streamMedias {
|
|
for _, sf := range sm.formats {
|
|
sf.removeReader(sr)
|
|
}
|
|
}
|
|
|
|
delete(s.streamReaders, reader)
|
|
|
|
sr.stop()
|
|
}
|
|
|
|
// StartReader starts a reader.
|
|
// Used by all protocols except RTSP.
|
|
func (s *Stream) StartReader(reader Reader) {
|
|
s.mutex.Lock()
|
|
defer s.mutex.Unlock()
|
|
|
|
sr := s.streamReaders[reader]
|
|
|
|
sr.start()
|
|
|
|
for _, sm := range s.streamMedias {
|
|
for _, sf := range sm.formats {
|
|
sf.startReader(sr)
|
|
}
|
|
}
|
|
|
|
select {
|
|
case <-s.readerRunning:
|
|
default:
|
|
close(s.readerRunning)
|
|
}
|
|
}
|
|
|
|
// ReaderError returns whenever there's an error.
|
|
func (s *Stream) ReaderError(reader Reader) chan error {
|
|
sr := s.streamReaders[reader]
|
|
return sr.error()
|
|
}
|
|
|
|
// ReaderFormats returns all formats that a reader is reading.
|
|
func (s *Stream) ReaderFormats(reader Reader) []format.Format {
|
|
s.mutex.RLock()
|
|
defer s.mutex.RUnlock()
|
|
|
|
sr := s.streamReaders[reader]
|
|
var formats []format.Format
|
|
|
|
for _, sm := range s.streamMedias {
|
|
for forma, sf := range sm.formats {
|
|
if _, ok := sf.pausedReaders[sr]; ok {
|
|
formats = append(formats, forma)
|
|
} else if _, ok = sf.runningReaders[sr]; ok {
|
|
formats = append(formats, forma)
|
|
}
|
|
}
|
|
}
|
|
|
|
return formats
|
|
}
|
|
|
|
// WaitRunningReader waits for a running reader.
|
|
func (s *Stream) WaitRunningReader() {
|
|
<-s.readerRunning
|
|
}
|
|
|
|
// WriteUnit writes a Unit.
|
|
func (s *Stream) WriteUnit(medi *description.Media, forma format.Format, u unit.Unit) {
|
|
sm := s.streamMedias[medi]
|
|
sf := sm.formats[forma]
|
|
|
|
s.mutex.RLock()
|
|
defer s.mutex.RUnlock()
|
|
|
|
sf.writeUnit(s, medi, u)
|
|
}
|
|
|
|
// WriteRTPPacket writes a RTP packet.
|
|
func (s *Stream) WriteRTPPacket(
|
|
medi *description.Media,
|
|
forma format.Format,
|
|
pkt *rtp.Packet,
|
|
ntp time.Time,
|
|
pts int64,
|
|
) {
|
|
sm := s.streamMedias[medi]
|
|
sf := sm.formats[forma]
|
|
|
|
s.mutex.RLock()
|
|
defer s.mutex.RUnlock()
|
|
|
|
sf.writeRTPPacket(s, medi, pkt, ntp, pts)
|
|
}
|