mirror of
https://github.com/aler9/rtsp-simple-server
synced 2025-09-27 03:56:15 +08:00
87 lines
1.4 KiB
Go
87 lines
1.4 KiB
Go
package stream
|
|
|
|
import (
|
|
"fmt"
|
|
|
|
"github.com/bluenviron/gortsplib/v5/pkg/ringbuffer"
|
|
"github.com/bluenviron/mediamtx/internal/counterdumper"
|
|
"github.com/bluenviron/mediamtx/internal/logger"
|
|
)
|
|
|
|
type streamReader struct {
|
|
queueSize int
|
|
parent logger.Writer
|
|
|
|
buffer *ringbuffer.RingBuffer
|
|
started bool
|
|
discardedFrames *counterdumper.CounterDumper
|
|
|
|
// out
|
|
err chan error
|
|
}
|
|
|
|
func (w *streamReader) initialize() {
|
|
buffer, _ := ringbuffer.New(uint64(w.queueSize))
|
|
w.buffer = buffer
|
|
w.err = make(chan error)
|
|
}
|
|
|
|
func (w *streamReader) start() {
|
|
w.started = true
|
|
|
|
w.discardedFrames = &counterdumper.CounterDumper{
|
|
OnReport: func(val uint64) {
|
|
w.parent.Log(logger.Warn, "reader is too slow, discarding %d %s",
|
|
val,
|
|
func() string {
|
|
if val == 1 {
|
|
return "frame"
|
|
}
|
|
return "frames"
|
|
}())
|
|
},
|
|
}
|
|
w.discardedFrames.Start()
|
|
|
|
go w.run()
|
|
}
|
|
|
|
func (w *streamReader) stop() {
|
|
w.buffer.Close()
|
|
|
|
if w.started {
|
|
w.discardedFrames.Stop()
|
|
<-w.err
|
|
}
|
|
}
|
|
|
|
func (w *streamReader) error() chan error {
|
|
return w.err
|
|
}
|
|
|
|
func (w *streamReader) run() {
|
|
w.err <- w.runInner()
|
|
close(w.err)
|
|
}
|
|
|
|
func (w *streamReader) runInner() error {
|
|
for {
|
|
cb, ok := w.buffer.Pull()
|
|
if !ok {
|
|
return fmt.Errorf("terminated")
|
|
}
|
|
|
|
err := cb.(func() error)()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
func (w *streamReader) push(cb func() error) {
|
|
ok := w.buffer.Push(cb)
|
|
if !ok {
|
|
w.discardedFrames.Increase()
|
|
}
|
|
}
|