remove redundant definitions (#4793)

This commit is contained in:
Alessandro Ros
2025-07-27 12:01:52 +02:00
committed by GitHub
parent b5c847bad6
commit b59c37ad4b
19 changed files with 328 additions and 245 deletions

View File

@@ -12,9 +12,9 @@ import (
// Source is an entity that can provide a stream.
// it can be:
// - publisher
// - Publisher
// - staticsources.Handler
// - redirectSource
// - core.sourceRedirect
type Source interface {
logger.Writer
APISourceDescribe() APIPathSourceOrReader

View File

@@ -4,23 +4,8 @@ import (
"context"
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/logger"
)
// StaticSource is a static source.
type StaticSource interface {
logger.Writer
Run(StaticSourceRunParams) error
APISourceDescribe() APIPathSourceOrReader
}
// StaticSourceParent is the parent of a static source.
type StaticSourceParent interface {
logger.Writer
SetReady(req PathSourceStaticSetReadyReq) PathSourceStaticSetReadyRes
SetNotReady(req PathSourceStaticSetNotReadyReq)
}
// StaticSourceRunParams is the set of params passed to Run().
type StaticSourceRunParams struct {
Context context.Context

View File

@@ -43,6 +43,12 @@ func resolveSource(s string, matches []string, query string) string {
return s
}
type staticSource interface {
logger.Writer
Run(defs.StaticSourceRunParams) error
APISourceDescribe() defs.APIPathSourceOrReader
}
type handlerPathManager interface {
AddReader(req defs.PathAddReaderReq) (defs.Path, *stream.Stream, error)
}
@@ -67,7 +73,7 @@ type Handler struct {
ctx context.Context
ctxCancel func()
instance defs.StaticSource
instance staticSource
running bool
query string

View File

@@ -17,10 +17,16 @@ import (
"github.com/bluenviron/mediamtx/internal/stream"
)
type parent interface {
logger.Writer
SetReady(req defs.PathSourceStaticSetReadyReq) defs.PathSourceStaticSetReadyRes
SetNotReady(req defs.PathSourceStaticSetNotReadyReq)
}
// Source is a HLS static source.
type Source struct {
ReadTimeout conf.Duration
Parent defs.StaticSourceParent
Parent parent
}
// Log implements logger.Writer.

View File

@@ -88,16 +88,28 @@ func TestSource(t *testing.T) {
go s.Serve(ln)
defer s.Shutdown(context.Background())
te := test.NewSourceTester(
func(p defs.StaticSourceParent) defs.StaticSource {
return &Source{
Parent: p,
}
},
"http://localhost:5780/stream.m3u8",
&conf.Path{},
)
defer te.Close()
p := &test.StaticSourceParent{}
p.Initialize()
defer p.Close()
<-te.Unit
so := &Source{
Parent: p,
}
done := make(chan struct{})
defer func() { <-done }()
ctx, ctxCancel := context.WithCancel(context.Background())
defer ctxCancel()
go func() {
so.Run(defs.StaticSourceRunParams{ //nolint:errcheck
Context: ctx,
ResolvedSource: "http://localhost:5780/stream.m3u8",
Conf: &conf.Path{},
})
close(done)
}()
<-p.Unit
}

View File

@@ -101,7 +101,9 @@ func (*secondaryReader) APIReaderDescribe() defs.APIPathSourceOrReader {
}
type parent interface {
defs.StaticSourceParent
logger.Writer
SetReady(req defs.PathSourceStaticSetReadyReq) defs.PathSourceStaticSetReadyRes
SetNotReady(req defs.PathSourceStaticSetNotReadyReq)
AddReader(req defs.PathAddReaderReq) (defs.Path, *stream.Stream, error)
}

View File

@@ -18,11 +18,17 @@ import (
"github.com/bluenviron/mediamtx/internal/stream"
)
type parent interface {
logger.Writer
SetReady(req defs.PathSourceStaticSetReadyReq) defs.PathSourceStaticSetReadyRes
SetNotReady(req defs.PathSourceStaticSetNotReadyReq)
}
// Source is a RTMP static source.
type Source struct {
ReadTimeout conf.Duration
WriteTimeout conf.Duration
Parent defs.StaticSourceParent
Parent parent
}
// Log implements logger.Writer.

View File

@@ -1,6 +1,7 @@
package rtmp
import (
"context"
"crypto/tls"
"net"
"os"
@@ -104,23 +105,34 @@ func TestSource(t *testing.T) {
source += "localhost/teststream"
te := test.NewSourceTester(
func(p defs.StaticSourceParent) defs.StaticSource {
return &Source{
ReadTimeout: conf.Duration(10 * time.Second),
WriteTimeout: conf.Duration(10 * time.Second),
Parent: p,
}
},
source,
&conf.Path{
SourceFingerprint: "33949E05FFFB5FF3E8AA16F8213A6251B4D9363804BA53233C4DA9A46D6F2739",
},
)
p := &test.StaticSourceParent{}
p.Initialize()
defer p.Close()
defer te.Close()
so := &Source{
ReadTimeout: conf.Duration(10 * time.Second),
WriteTimeout: conf.Duration(10 * time.Second),
Parent: p,
}
<-te.Unit
done := make(chan struct{})
defer func() { <-done }()
ctx, ctxCancel := context.WithCancel(context.Background())
defer ctxCancel()
go func() {
so.Run(defs.StaticSourceRunParams{ //nolint:errcheck
Context: ctx,
ResolvedSource: source,
Conf: &conf.Path{
SourceFingerprint: "33949E05FFFB5FF3E8AA16F8213A6251B4D9363804BA53233C4DA9A46D6F2739",
},
})
close(done)
}()
<-p.Unit
})
}
}

View File

@@ -61,12 +61,18 @@ func createRangeHeader(cnf *conf.Path) (*headers.Range, error) {
}
}
type parent interface {
logger.Writer
SetReady(req defs.PathSourceStaticSetReadyReq) defs.PathSourceStaticSetReadyRes
SetNotReady(req defs.PathSourceStaticSetNotReadyReq)
}
// Source is a RTSP static source.
type Source struct {
ReadTimeout conf.Duration
WriteTimeout conf.Duration
WriteQueueSize int
Parent defs.StaticSourceParent
Parent parent
}
// Log implements logger.Writer.

View File

@@ -1,6 +1,7 @@
package rtsp
import (
"context"
"crypto/tls"
"os"
"testing"
@@ -134,46 +135,50 @@ func TestSource(t *testing.T) {
require.NoError(t, err)
defer strm.Close()
var te *test.SourceTester
var ur string
var cnf *conf.Path
if source != "tls" {
ur = "rtsp://testuser:testpass@localhost:8555/teststream"
var sp conf.RTSPTransport
sp.UnmarshalJSON([]byte(`"` + source + `"`)) //nolint:errcheck
te = test.NewSourceTester(
func(p defs.StaticSourceParent) defs.StaticSource {
return &Source{
ReadTimeout: conf.Duration(10 * time.Second),
WriteTimeout: conf.Duration(10 * time.Second),
WriteQueueSize: 2048,
Parent: p,
}
},
"rtsp://testuser:testpass@localhost:8555/teststream",
&conf.Path{
RTSPTransport: sp,
},
)
cnf = &conf.Path{
RTSPTransport: sp,
}
} else {
te = test.NewSourceTester(
func(p defs.StaticSourceParent) defs.StaticSource {
return &Source{
ReadTimeout: conf.Duration(10 * time.Second),
WriteTimeout: conf.Duration(10 * time.Second),
WriteQueueSize: 2048,
Parent: p,
}
},
"rtsps://testuser:testpass@localhost:8555/teststream",
&conf.Path{
SourceFingerprint: "33949E05FFFB5FF3E8AA16F8213A6251B4D9363804BA53233C4DA9A46D6F2739",
},
)
ur = "rtsps://testuser:testpass@localhost:8555/teststream"
cnf = &conf.Path{
SourceFingerprint: "33949E05FFFB5FF3E8AA16F8213A6251B4D9363804BA53233C4DA9A46D6F2739",
}
}
defer te.Close()
p := &test.StaticSourceParent{}
p.Initialize()
defer p.Close()
<-te.Unit
so := &Source{
ReadTimeout: conf.Duration(10 * time.Second),
WriteTimeout: conf.Duration(10 * time.Second),
WriteQueueSize: 2048,
Parent: p,
}
done := make(chan struct{})
defer func() { <-done }()
ctx, ctxCancel := context.WithCancel(context.Background())
defer ctxCancel()
go func() {
so.Run(defs.StaticSourceRunParams{ //nolint:errcheck
Context: ctx,
ResolvedSource: ur,
Conf: cnf,
})
close(done)
}()
<-p.Unit
})
}
}
@@ -248,23 +253,35 @@ func TestSourceNoPassword(t *testing.T) {
var sp conf.RTSPTransport
sp.UnmarshalJSON([]byte(`"tcp"`)) //nolint:errcheck
te := test.NewSourceTester(
func(p defs.StaticSourceParent) defs.StaticSource {
return &Source{
ReadTimeout: conf.Duration(10 * time.Second),
WriteTimeout: conf.Duration(10 * time.Second),
WriteQueueSize: 2048,
Parent: p,
}
},
"rtsp://testuser:@127.0.0.1:8555/teststream",
&conf.Path{
RTSPTransport: sp,
},
)
defer te.Close()
p := &test.StaticSourceParent{}
p.Initialize()
defer p.Close()
<-te.Unit
so := &Source{
ReadTimeout: conf.Duration(10 * time.Second),
WriteTimeout: conf.Duration(10 * time.Second),
WriteQueueSize: 2048,
Parent: p,
}
done := make(chan struct{})
defer func() { <-done }()
ctx, ctxCancel := context.WithCancel(context.Background())
defer ctxCancel()
go func() {
so.Run(defs.StaticSourceRunParams{ //nolint:errcheck
Context: ctx,
ResolvedSource: "rtsp://testuser:@127.0.0.1:8555/teststream",
Conf: &conf.Path{
RTSPTransport: sp,
},
})
close(done)
}()
<-p.Unit
}
func TestSourceRange(t *testing.T) {
@@ -350,21 +367,33 @@ func TestSourceRange(t *testing.T) {
cnf.RTSPRangeStart = "130s"
}
te := test.NewSourceTester(
func(p defs.StaticSourceParent) defs.StaticSource {
return &Source{
ReadTimeout: conf.Duration(10 * time.Second),
WriteTimeout: conf.Duration(10 * time.Second),
WriteQueueSize: 2048,
Parent: p,
}
},
"rtsp://127.0.0.1:8555/teststream",
cnf,
)
defer te.Close()
p := &test.StaticSourceParent{}
p.Initialize()
defer p.Close()
<-te.Unit
so := &Source{
ReadTimeout: conf.Duration(10 * time.Second),
WriteTimeout: conf.Duration(10 * time.Second),
WriteQueueSize: 2048,
Parent: p,
}
done := make(chan struct{})
defer func() { <-done }()
ctx, ctxCancel := context.WithCancel(context.Background())
defer ctxCancel()
go func() {
so.Run(defs.StaticSourceRunParams{ //nolint:errcheck
Context: ctx,
ResolvedSource: "rtsp://127.0.0.1:8555/teststream",
Conf: cnf,
})
close(done)
}()
<-p.Unit
})
}
}

View File

@@ -15,10 +15,16 @@ import (
"github.com/bluenviron/mediamtx/internal/stream"
)
type parent interface {
logger.Writer
SetReady(req defs.PathSourceStaticSetReadyReq) defs.PathSourceStaticSetReadyRes
SetNotReady(req defs.PathSourceStaticSetNotReadyReq)
}
// Source is a SRT static source.
type Source struct {
ReadTimeout conf.Duration
Parent defs.StaticSourceParent
Parent parent
}
// Log implements logger.Writer.

View File

@@ -2,6 +2,7 @@ package srt
import (
"bufio"
"context"
"testing"
"time"
@@ -52,17 +53,29 @@ func TestSource(t *testing.T) {
time.Sleep(500 * time.Millisecond)
}()
te := test.NewSourceTester(
func(p defs.StaticSourceParent) defs.StaticSource {
return &Source{
ReadTimeout: conf.Duration(10 * time.Second),
Parent: p,
}
},
"srt://127.0.0.1:9002?streamid=sidname&passphrase=ttest1234567",
&conf.Path{},
)
defer te.Close()
p := &test.StaticSourceParent{}
p.Initialize()
defer p.Close()
<-te.Unit
so := &Source{
ReadTimeout: conf.Duration(10 * time.Second),
Parent: p,
}
done := make(chan struct{})
defer func() { <-done }()
ctx, ctxCancel := context.WithCancel(context.Background())
defer ctxCancel()
go func() {
so.Run(defs.StaticSourceRunParams{ //nolint:errcheck
Context: ctx,
ResolvedSource: "srt://127.0.0.1:9002?streamid=sidname&passphrase=ttest1234567",
Conf: &conf.Path{},
})
close(done)
}()
<-p.Unit
}

View File

@@ -46,10 +46,16 @@ type packetConn interface {
SetReadBuffer(int) error
}
type parent interface {
logger.Writer
SetReady(req defs.PathSourceStaticSetReadyReq) defs.PathSourceStaticSetReadyRes
SetNotReady(req defs.PathSourceStaticSetNotReadyReq)
}
// Source is a UDP static source.
type Source struct {
ReadTimeout conf.Duration
Parent defs.StaticSourceParent
Parent parent
}
// Log implements logger.Writer.

View File

@@ -2,6 +2,7 @@ package udp
import (
"bufio"
"context"
"net"
"testing"
"time"
@@ -52,17 +53,29 @@ func TestSource(t *testing.T) {
src = "udp://127.0.0.1:9001?source=127.0.1.1"
}
te := test.NewSourceTester(
func(p defs.StaticSourceParent) defs.StaticSource {
return &Source{
ReadTimeout: conf.Duration(10 * time.Second),
Parent: p,
}
},
src,
&conf.Path{},
)
defer te.Close()
p := &test.StaticSourceParent{}
p.Initialize()
defer p.Close()
so := &Source{
ReadTimeout: conf.Duration(10 * time.Second),
Parent: p,
}
done := make(chan struct{})
defer func() { <-done }()
ctx, ctxCancel := context.WithCancel(context.Background())
defer ctxCancel()
go func() {
so.Run(defs.StaticSourceRunParams{ //nolint:errcheck
Context: ctx,
ResolvedSource: src,
Conf: &conf.Path{},
})
close(done)
}()
time.Sleep(50 * time.Millisecond)
@@ -117,7 +130,7 @@ func TestSource(t *testing.T) {
err = bw.Flush()
require.NoError(t, err)
<-te.Unit
<-p.Unit
})
}
}

View File

@@ -18,10 +18,16 @@ import (
"github.com/bluenviron/mediamtx/internal/stream"
)
type parent interface {
logger.Writer
SetReady(req defs.PathSourceStaticSetReadyReq) defs.PathSourceStaticSetReadyRes
SetNotReady(req defs.PathSourceStaticSetNotReadyReq)
}
// Source is a WebRTC static source.
type Source struct {
ReadTimeout conf.Duration
Parent defs.StaticSourceParent
Parent parent
}
// Log implements logger.Writer.

View File

@@ -123,17 +123,29 @@ func TestSource(t *testing.T) {
go httpServ.Serve(ln)
defer httpServ.Shutdown(context.Background())
te := test.NewSourceTester(
func(p defs.StaticSourceParent) defs.StaticSource {
return &Source{
ReadTimeout: conf.Duration(10 * time.Second),
Parent: p,
}
},
"whep://localhost:9003/my/resource",
&conf.Path{},
)
defer te.Close()
p := &test.StaticSourceParent{}
p.Initialize()
defer p.Close()
<-te.Unit
so := &Source{
ReadTimeout: conf.Duration(10 * time.Second),
Parent: p,
}
done := make(chan struct{})
defer func() { <-done }()
ctx, ctxCancel := context.WithCancel(context.Background())
defer ctxCancel()
go func() {
so.Run(defs.StaticSourceRunParams{ //nolint:errcheck
Context: ctx,
ResolvedSource: "whep://localhost:9003/my/resource",
Conf: &conf.Path{},
})
close(done)
}()
<-p.Unit
}

View File

@@ -1,3 +1,4 @@
// Package test contains test utilities.
package test
import "github.com/bluenviron/mediamtx/internal/auth"

View File

@@ -1,96 +0,0 @@
// Package test contains test utilities.
package test
import (
"context"
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/defs"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/stream"
"github.com/bluenviron/mediamtx/internal/unit"
)
// SourceTester is a static source tester.
type SourceTester struct {
ctx context.Context
ctxCancel func()
stream *stream.Stream
reader stream.Reader
Unit chan unit.Unit
done chan struct{}
}
// NewSourceTester allocates a SourceTester.
func NewSourceTester(
createFunc func(defs.StaticSourceParent) defs.StaticSource,
resolvedSource string,
conf *conf.Path,
) *SourceTester {
ctx, ctxCancel := context.WithCancel(context.Background())
t := &SourceTester{
ctx: ctx,
ctxCancel: ctxCancel,
Unit: make(chan unit.Unit),
done: make(chan struct{}),
}
s := createFunc(t)
go func() {
s.Run(defs.StaticSourceRunParams{ //nolint:errcheck
Context: ctx,
ResolvedSource: resolvedSource,
Conf: conf,
})
close(t.done)
}()
return t
}
// Close closes the tester.
func (t *SourceTester) Close() {
t.ctxCancel()
t.stream.RemoveReader(t.reader)
<-t.done
}
// Log implements StaticSourceParent.
func (t *SourceTester) Log(_ logger.Level, _ string, _ ...interface{}) {
}
// SetReady implements StaticSourceParent.
func (t *SourceTester) SetReady(req defs.PathSourceStaticSetReadyReq) defs.PathSourceStaticSetReadyRes {
t.stream = &stream.Stream{
WriteQueueSize: 512,
RTPMaxPayloadSize: 1450,
Desc: req.Desc,
GenerateRTPPackets: req.GenerateRTPPackets,
Parent: t,
}
err := t.stream.Initialize()
if err != nil {
panic(err)
}
t.reader = NilLogger
t.stream.AddReader(t.reader, req.Desc.Medias[0], req.Desc.Medias[0].Formats[0], func(u unit.Unit) error {
t.Unit <- u
close(t.Unit)
return nil
})
t.stream.StartReader(t.reader)
return defs.PathSourceStaticSetReadyRes{
Stream: t.stream,
}
}
// SetNotReady implements StaticSourceParent.
func (t *SourceTester) SetNotReady(_ defs.PathSourceStaticSetNotReadyReq) {
}

View File

@@ -0,0 +1,58 @@
package test
import (
"github.com/bluenviron/mediamtx/internal/defs"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/stream"
"github.com/bluenviron/mediamtx/internal/unit"
)
// StaticSourceParent is a dummy static source parent.
type StaticSourceParent struct {
stream *stream.Stream
reader stream.Reader
Unit chan unit.Unit
}
// Log implements logger.Writer.
func (*StaticSourceParent) Log(logger.Level, string, ...interface{}) {}
// Initialize initializes StaticSourceParent.
func (p *StaticSourceParent) Initialize() {
p.Unit = make(chan unit.Unit)
}
// Close closes StaticSourceParent.
func (p *StaticSourceParent) Close() {
p.stream.RemoveReader(p.reader)
}
// SetReady implements parent.
func (p *StaticSourceParent) SetReady(req defs.PathSourceStaticSetReadyReq) defs.PathSourceStaticSetReadyRes {
p.stream = &stream.Stream{
WriteQueueSize: 512,
RTPMaxPayloadSize: 1450,
Desc: req.Desc,
GenerateRTPPackets: req.GenerateRTPPackets,
Parent: p,
}
err := p.stream.Initialize()
if err != nil {
panic(err)
}
p.reader = NilLogger
p.stream.AddReader(p.reader, req.Desc.Medias[0], req.Desc.Medias[0].Formats[0], func(u unit.Unit) error {
p.Unit <- u
close(p.Unit)
return nil
})
p.stream.StartReader(p.reader)
return defs.PathSourceStaticSetReadyRes{Stream: p.stream}
}
// SetNotReady implements parent.
func (StaticSourceParent) SetNotReady(_ defs.PathSourceStaticSetNotReadyReq) {}