remove PathManager.StartPublisher and StopPublisher (#4766)

simplify path manager usage.
This commit is contained in:
Alessandro Ros
2025-07-21 13:52:55 +02:00
committed by GitHub
parent cc27cf6563
commit 0b901ade3e
20 changed files with 314 additions and 462 deletions

View File

@@ -105,8 +105,6 @@ type path struct {
chDescribe chan defs.PathDescribeReq
chAddPublisher chan defs.PathAddPublisherReq
chRemovePublisher chan defs.PathRemovePublisherReq
chStartPublisher chan defs.PathStartPublisherReq
chStopPublisher chan defs.PathStopPublisherReq
chAddReader chan defs.PathAddReaderReq
chRemoveReader chan defs.PathRemoveReaderReq
chAPIPathsGet chan pathAPIPathsGetReq
@@ -131,8 +129,6 @@ func (pa *path) initialize() {
pa.chDescribe = make(chan defs.PathDescribeReq)
pa.chAddPublisher = make(chan defs.PathAddPublisherReq)
pa.chRemovePublisher = make(chan defs.PathRemovePublisherReq)
pa.chStartPublisher = make(chan defs.PathStartPublisherReq)
pa.chStopPublisher = make(chan defs.PathStopPublisherReq)
pa.chAddReader = make(chan defs.PathAddReaderReq)
pa.chRemoveReader = make(chan defs.PathRemoveReaderReq)
pa.chAPIPathsGet = make(chan pathAPIPathsGetReq)
@@ -297,16 +293,6 @@ func (pa *path) runInner() error {
return fmt.Errorf("not in use")
}
case req := <-pa.chStartPublisher:
pa.doStartPublisher(req)
case req := <-pa.chStopPublisher:
pa.doStopPublisher(req)
if pa.shouldClose() {
return fmt.Errorf("not in use")
}
case req := <-pa.chAddReader:
pa.doAddReader(req)
@@ -480,18 +466,10 @@ func (pa *path) doAddPublisher(req defs.PathAddPublisherReq) {
pa.source = req.Author
pa.publisherQuery = req.AccessRequest.Query
req.Res <- defs.PathAddPublisherRes{Path: pa}
}
func (pa *path) doStartPublisher(req defs.PathStartPublisherReq) {
if pa.source != req.Author {
req.Res <- defs.PathStartPublisherRes{Err: fmt.Errorf("publisher is not assigned to this path anymore")}
return
}
err := pa.setReady(req.Desc, req.GenerateRTPPackets)
if err != nil {
req.Res <- defs.PathStartPublisherRes{Err: err}
pa.source = nil
req.Res <- defs.PathAddPublisherRes{Err: err}
return
}
@@ -507,14 +485,10 @@ func (pa *path) doStartPublisher(req defs.PathStartPublisherReq) {
pa.consumeOnHoldRequests()
req.Res <- defs.PathStartPublisherRes{Stream: pa.stream}
}
func (pa *path) doStopPublisher(req defs.PathStopPublisherReq) {
if req.Author == pa.source && pa.stream != nil {
pa.setNotReady()
req.Res <- defs.PathAddPublisherRes{
Path: pa,
Stream: pa.stream,
}
close(req.Res)
}
func (pa *path) doAddReader(req defs.PathAddReaderReq) {
@@ -715,12 +689,12 @@ func (pa *path) setReady(desc *description.Session, allocateEncoder bool) error
return err
}
pa.readyTime = time.Now()
if pa.conf.Record {
pa.startRecording()
}
pa.readyTime = time.Now()
pa.onNotReadyHook = hooks.OnReady(hooks.OnReadyParams{
Logger: pa,
ExternalCmdPool: pa.externalCmdPool,
@@ -916,13 +890,13 @@ func (pa *path) describe(req defs.PathDescribeReq) defs.PathDescribeRes {
}
// addPublisher is called by a publisher through pathManager.
func (pa *path) addPublisher(req defs.PathAddPublisherReq) (defs.Path, error) {
func (pa *path) addPublisher(req defs.PathAddPublisherReq) (defs.Path, *stream.Stream, error) {
select {
case pa.chAddPublisher <- req:
res := <-req.Res
return res.Path, res.Err
return res.Path, res.Stream, res.Err
case <-pa.ctx.Done():
return nil, fmt.Errorf("terminated")
return nil, nil, fmt.Errorf("terminated")
}
}
@@ -936,28 +910,6 @@ func (pa *path) RemovePublisher(req defs.PathRemovePublisherReq) {
}
}
// StartPublisher is called by a publisher.
func (pa *path) StartPublisher(req defs.PathStartPublisherReq) (*stream.Stream, error) {
req.Res = make(chan defs.PathStartPublisherRes)
select {
case pa.chStartPublisher <- req:
res := <-req.Res
return res.Stream, res.Err
case <-pa.ctx.Done():
return nil, fmt.Errorf("terminated")
}
}
// StopPublisher is called by a publisher.
func (pa *path) StopPublisher(req defs.PathStopPublisherReq) {
req.Res = make(chan struct{})
select {
case pa.chStopPublisher <- req:
<-req.Res
case <-pa.ctx.Done():
}
}
// addReader is called by a reader through pathManager.
func (pa *path) addReader(req defs.PathAddReaderReq) (defs.Path, *stream.Stream, error) {
select {

View File

@@ -379,6 +379,11 @@ func (pm *pathManager) doAddPublisher(req defs.PathAddPublisherReq) {
return
}
if req.ConfToCompare != nil && !pathConf.Equal(req.ConfToCompare) {
req.Res <- defs.PathAddPublisherRes{Err: fmt.Errorf("configuration has changed")}
return
}
if !req.AccessRequest.SkipAuth {
err = pm.authManager.Authenticate(req.AccessRequest.ToAuthRequest())
if err != nil {
@@ -520,19 +525,19 @@ func (pm *pathManager) Describe(req defs.PathDescribeReq) defs.PathDescribeRes {
}
// AddPublisher is called by a publisher.
func (pm *pathManager) AddPublisher(req defs.PathAddPublisherReq) (defs.Path, error) {
func (pm *pathManager) AddPublisher(req defs.PathAddPublisherReq) (defs.Path, *stream.Stream, error) {
req.Res = make(chan defs.PathAddPublisherRes)
select {
case pm.chAddPublisher <- req:
res := <-req.Res
if res.Err != nil {
return nil, res.Err
return nil, nil, res.Err
}
return res.Path.(*path).addPublisher(req)
case <-pm.ctx.Done():
return nil, fmt.Errorf("terminated")
return nil, nil, fmt.Errorf("terminated")
}
}

View File

@@ -25,8 +25,6 @@ type Path interface {
Name() string
SafeConf() *conf.Path
ExternalCmdEnv() externalcmd.Environment
StartPublisher(req PathStartPublisherReq) (*stream.Stream, error)
StopPublisher(req PathStopPublisherReq)
RemovePublisher(req PathRemovePublisherReq)
RemoveReader(req PathRemoveReaderReq)
}
@@ -59,39 +57,23 @@ type PathDescribeReq struct {
// PathAddPublisherRes contains the response of AddPublisher().
type PathAddPublisherRes struct {
Path Path
Err error
}
// PathAddPublisherReq contains arguments of AddPublisher().
type PathAddPublisherReq struct {
Author Publisher
AccessRequest PathAccessRequest
Res chan PathAddPublisherRes
}
// PathRemovePublisherReq contains arguments of RemovePublisher().
type PathRemovePublisherReq struct {
Author Publisher
Res chan struct{}
}
// PathStartPublisherRes contains the response of StartPublisher().
type PathStartPublisherRes struct {
Path Path
Stream *stream.Stream
Err error
}
// PathStartPublisherReq contains arguments of StartPublisher().
type PathStartPublisherReq struct {
// PathAddPublisherReq contains arguments of AddPublisher().
type PathAddPublisherReq struct {
Author Publisher
Desc *description.Session
GenerateRTPPackets bool
Res chan PathStartPublisherRes
ConfToCompare *conf.Path
AccessRequest PathAccessRequest
Res chan PathAddPublisherRes
}
// PathStopPublisherReq contains arguments of StopPublisher().
type PathStopPublisherReq struct {
// PathRemovePublisherReq contains arguments of RemovePublisher().
type PathRemovePublisherReq struct {
Author Publisher
Res chan struct{}
}

View File

@@ -147,17 +147,15 @@ func (s *httpServer) onRequest(ctx *gin.Context) {
return
}
req := defs.PathAccessRequest{
Name: dir,
Query: ctx.Request.URL.RawQuery,
Publish: false,
Proto: auth.ProtocolHLS,
Credentials: httpp.Credentials(ctx.Request),
IP: net.ParseIP(ctx.ClientIP()),
}
pathConf, err := s.pathManager.FindPathConf(defs.PathFindPathConfReq{
AccessRequest: req,
AccessRequest: defs.PathAccessRequest{
Name: dir,
Query: ctx.Request.URL.RawQuery,
Publish: false,
Proto: auth.ProtocolHLS,
Credentials: httpp.Credentials(ctx.Request),
IP: net.ParseIP(ctx.ClientIP()),
},
})
if err != nil {
var terr auth.Error

View File

@@ -57,13 +57,6 @@ func (pa *dummyPath) ExternalCmdEnv() externalcmd.Environment {
return nil
}
func (pa *dummyPath) StartPublisher(_ defs.PathStartPublisherReq) (*stream.Stream, error) {
return nil, fmt.Errorf("unimplemented")
}
func (pa *dummyPath) StopPublisher(_ defs.PathStopPublisherReq) {
}
func (pa *dummyPath) RemovePublisher(_ defs.PathRemovePublisherReq) {
}

View File

@@ -22,13 +22,6 @@ import (
"github.com/bluenviron/mediamtx/internal/stream"
)
type connState int
const (
connStateRead connState = iota + 1
connStatePublish
)
type conn struct {
parentCtx context.Context
isTLS bool
@@ -50,7 +43,7 @@ type conn struct {
created time.Time
mutex sync.RWMutex
rconn *rtmp.ServerConn
state connState
state defs.APIRTMPConnState
pathName string
query string
}
@@ -60,6 +53,7 @@ func (c *conn) initialize() {
c.uuid = uuid.New()
c.created = time.Now()
c.state = defs.APIRTMPConnStateIdle
c.Log(logger.Info, "opened")
@@ -183,7 +177,7 @@ func (c *conn) runRead() error {
defer path.RemoveReader(defs.PathRemoveReaderReq{Author: c})
c.mutex.Lock()
c.state = connStateRead
c.state = defs.APIRTMPConnStateRead
c.pathName = pathName
c.query = c.rconn.URL.RawQuery
c.mutex.Unlock()
@@ -225,8 +219,26 @@ func (c *conn) runPublish() error {
pathName := strings.TrimLeft(c.rconn.URL.Path, "/")
query := c.rconn.URL.Query()
path, err := c.pathManager.AddPublisher(defs.PathAddPublisherReq{
Author: c,
r := &rtmp.Reader{
Conn: c.rconn,
}
err := r.Initialize()
if err != nil {
return err
}
var stream *stream.Stream
medias, err := rtmp.ToStream(r, &stream)
if err != nil {
return err
}
var path defs.Path
path, stream, err = c.pathManager.AddPublisher(defs.PathAddPublisherReq{
Author: c,
Desc: &description.Session{Medias: medias},
GenerateRTPPackets: true,
AccessRequest: defs.PathAccessRequest{
Name: pathName,
Query: c.rconn.URL.RawQuery,
@@ -253,35 +265,11 @@ func (c *conn) runPublish() error {
defer path.RemovePublisher(defs.PathRemovePublisherReq{Author: c})
c.mutex.Lock()
c.state = connStatePublish
c.state = defs.APIRTMPConnStatePublish
c.pathName = pathName
c.query = c.rconn.URL.RawQuery
c.mutex.Unlock()
r := &rtmp.Reader{
Conn: c.rconn,
}
err = r.Initialize()
if err != nil {
return err
}
var stream *stream.Stream
medias, err := rtmp.ToStream(r, &stream)
if err != nil {
return err
}
stream, err = path.StartPublisher(defs.PathStartPublisherReq{
Author: c,
Desc: &description.Session{Medias: medias},
GenerateRTPPackets: true,
})
if err != nil {
return err
}
// disable write deadline to allow outgoing acknowledges
c.nconn.SetWriteDeadline(time.Time{})
@@ -325,21 +313,10 @@ func (c *conn) apiItem() *defs.APIRTMPConn {
}
return &defs.APIRTMPConn{
ID: c.uuid,
Created: c.created,
RemoteAddr: c.remoteAddr().String(),
State: func() defs.APIRTMPConnState {
switch c.state {
case connStateRead:
return defs.APIRTMPConnStateRead
case connStatePublish:
return defs.APIRTMPConnStatePublish
default:
return defs.APIRTMPConnStateIdle
}
}(),
ID: c.uuid,
Created: c.created,
RemoteAddr: c.remoteAddr().String(),
State: c.state,
Path: c.pathName,
Query: c.query,
BytesReceived: bytesReceived,

View File

@@ -63,7 +63,7 @@ type serverMetrics interface {
}
type serverPathManager interface {
AddPublisher(req defs.PathAddPublisherReq) (defs.Path, error)
AddPublisher(req defs.PathAddPublisherReq) (defs.Path, *stream.Stream, error)
AddReader(req defs.PathAddReaderReq) (defs.Path, *stream.Stream, error)
}

View File

@@ -20,10 +20,7 @@ import (
"github.com/stretchr/testify/require"
)
type dummyPath struct {
stream *stream.Stream
streamCreated chan struct{}
}
type dummyPath struct{}
func (p *dummyPath) Name() string {
return "teststream"
@@ -37,26 +34,6 @@ func (p *dummyPath) ExternalCmdEnv() externalcmd.Environment {
return externalcmd.Environment{}
}
func (p *dummyPath) StartPublisher(req defs.PathStartPublisherReq) (*stream.Stream, error) {
p.stream = &stream.Stream{
WriteQueueSize: 512,
RTPMaxPayloadSize: 1450,
Desc: req.Desc,
GenerateRTPPackets: true,
Parent: test.NilLogger,
}
err := p.stream.Initialize()
if err != nil {
return nil, err
}
close(p.streamCreated)
return p.stream, nil
}
func (p *dummyPath) StopPublisher(_ defs.PathStopPublisherReq) {
}
func (p *dummyPath) RemovePublisher(_ defs.PathRemovePublisherReq) {
}
@@ -83,17 +60,29 @@ func TestServerPublish(t *testing.T) {
defer os.Remove(serverKeyFpath)
}
path := &dummyPath{
streamCreated: make(chan struct{}),
}
var strm *stream.Stream
streamCreated := make(chan struct{})
pathManager := &test.PathManager{
AddPublisherImpl: func(req defs.PathAddPublisherReq) (defs.Path, error) {
AddPublisherImpl: func(req defs.PathAddPublisherReq) (defs.Path, *stream.Stream, error) {
require.Equal(t, "teststream", req.AccessRequest.Name)
require.Equal(t, "user=myuser&pass=mypass&param=value", req.AccessRequest.Query)
require.Equal(t, "myuser", req.AccessRequest.Credentials.User)
require.Equal(t, "mypass", req.AccessRequest.Credentials.Pass)
return path, nil
strm = &stream.Stream{
WriteQueueSize: 512,
RTPMaxPayloadSize: 1450,
Desc: req.Desc,
GenerateRTPPackets: true,
Parent: test.NilLogger,
}
err := strm.Initialize()
require.NoError(t, err)
close(streamCreated)
return &dummyPath{}, strm, nil
},
}
@@ -152,16 +141,16 @@ func TestServerPublish(t *testing.T) {
})
require.NoError(t, err)
<-path.streamCreated
<-streamCreated
recv := make(chan struct{})
reader := test.NilLogger
path.stream.AddReader(
strm.AddReader(
reader,
path.stream.Desc.Medias[0],
path.stream.Desc.Medias[0].Formats[0],
strm.Desc.Medias[0],
strm.Desc.Medias[0].Formats[0],
func(u unit.Unit) error {
require.Equal(t, [][]byte{
test.FormatH264.SPS,
@@ -172,8 +161,8 @@ func TestServerPublish(t *testing.T) {
return nil
})
path.stream.StartReader(reader)
defer path.stream.RemoveReader(reader)
strm.StartReader(reader)
defer strm.RemoveReader(reader)
err = w.WriteH264(
3*time.Second, 3*time.Second, [][]byte{
@@ -217,15 +206,13 @@ func TestServerRead(t *testing.T) {
err := strm.Initialize()
require.NoError(t, err)
path := &dummyPath{stream: strm}
pathManager := &test.PathManager{
AddReaderImpl: func(req defs.PathAddReaderReq) (defs.Path, *stream.Stream, error) {
require.Equal(t, "teststream", req.AccessRequest.Name)
require.Equal(t, "user=myuser&pass=mypass&param=value", req.AccessRequest.Query)
require.Equal(t, "myuser", req.AccessRequest.Credentials.User)
require.Equal(t, "mypass", req.AccessRequest.Credentials.Pass)
return path, path.stream, nil
return &dummyPath{}, strm, nil
},
}

View File

@@ -156,18 +156,16 @@ func (c *conn) onDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx,
}
}
req := defs.PathAccessRequest{
Name: ctx.Path,
Query: ctx.Query,
Proto: auth.ProtocolRTSP,
ID: &c.uuid,
Credentials: rtsp.Credentials(ctx.Request),
IP: c.ip(),
CustomVerifyFunc: customVerifyFunc,
}
res := c.pathManager.Describe(defs.PathDescribeReq{
AccessRequest: req,
AccessRequest: defs.PathAccessRequest{
Name: ctx.Path,
Query: ctx.Query,
Proto: auth.ProtocolRTSP,
ID: &c.uuid,
Credentials: rtsp.Credentials(ctx.Request),
IP: c.ip(),
CustomVerifyFunc: customVerifyFunc,
},
})
if res.Err != nil {

View File

@@ -58,8 +58,9 @@ type serverMetrics interface {
}
type serverPathManager interface {
FindPathConf(req defs.PathFindPathConfReq) (*conf.Path, error)
Describe(req defs.PathDescribeReq) defs.PathDescribeRes
AddPublisher(_ defs.PathAddPublisherReq) (defs.Path, error)
AddPublisher(_ defs.PathAddPublisherReq) (defs.Path, *stream.Stream, error)
AddReader(_ defs.PathAddReaderReq) (defs.Path, *stream.Stream, error)
}

View File

@@ -20,10 +20,7 @@ import (
"github.com/stretchr/testify/require"
)
type dummyPath struct {
stream *stream.Stream
streamCreated chan struct{}
}
type dummyPath struct{}
func (p *dummyPath) Name() string {
return "teststream"
@@ -37,26 +34,6 @@ func (p *dummyPath) ExternalCmdEnv() externalcmd.Environment {
return externalcmd.Environment{}
}
func (p *dummyPath) StartPublisher(req defs.PathStartPublisherReq) (*stream.Stream, error) {
p.stream = &stream.Stream{
WriteQueueSize: 512,
RTPMaxPayloadSize: 1450,
Desc: req.Desc,
GenerateRTPPackets: true,
Parent: test.NilLogger,
}
err := p.stream.Initialize()
if err != nil {
return nil, err
}
close(p.streamCreated)
return p.stream, nil
}
func (p *dummyPath) StopPublisher(_ defs.PathStopPublisherReq) {
}
func (p *dummyPath) RemovePublisher(_ defs.PathRemovePublisherReq) {
}
@@ -66,22 +43,22 @@ func (p *dummyPath) RemoveReader(_ defs.PathRemoveReaderReq) {
func TestServerPublish(t *testing.T) {
for _, ca := range []string{"basic", "digest", "basic+digest"} {
t.Run(ca, func(t *testing.T) {
path := &dummyPath{
streamCreated: make(chan struct{}),
}
var strm *stream.Stream
streamCreated := make(chan struct{})
n := 0
pathManager := &test.PathManager{
AddPublisherImpl: func(req defs.PathAddPublisherReq) (defs.Path, error) {
FindPathConfImpl: func(req defs.PathFindPathConfReq) (*conf.Path, error) {
require.Equal(t, "teststream", req.AccessRequest.Name)
require.Equal(t, "param=value", req.AccessRequest.Query)
if ca == "basic" {
require.Nil(t, req.AccessRequest.CustomVerifyFunc)
if req.AccessRequest.Credentials.User == "" && req.AccessRequest.Credentials.Pass == "" {
return nil, auth.Error{Message: "", AskCredentials: true}
}
require.Equal(t, "teststream", req.AccessRequest.Name)
require.Equal(t, "param=value", req.AccessRequest.Query)
require.Equal(t, "myuser", req.AccessRequest.Credentials.User)
require.Equal(t, "mypass", req.AccessRequest.Credentials.Pass)
} else {
@@ -94,7 +71,26 @@ func TestServerPublish(t *testing.T) {
require.True(t, ok)
}
return path, nil
return &conf.Path{}, nil
},
AddPublisherImpl: func(req defs.PathAddPublisherReq) (defs.Path, *stream.Stream, error) {
require.Equal(t, "teststream", req.AccessRequest.Name)
require.Equal(t, "param=value", req.AccessRequest.Query)
require.True(t, req.AccessRequest.SkipAuth)
strm = &stream.Stream{
WriteQueueSize: 512,
RTPMaxPayloadSize: 1450,
Desc: req.Desc,
GenerateRTPPackets: true,
Parent: test.NilLogger,
}
err := strm.Initialize()
require.NoError(t, err)
close(streamCreated)
return &dummyPath{}, strm, nil
},
}
@@ -132,16 +128,16 @@ func TestServerPublish(t *testing.T) {
require.NoError(t, err)
defer source.Close()
<-path.streamCreated
<-streamCreated
reader := test.NilLogger
recv := make(chan struct{})
path.stream.AddReader(
strm.AddReader(
reader,
path.stream.Desc.Medias[0],
path.stream.Desc.Medias[0].Formats[0],
strm.Desc.Medias[0],
strm.Desc.Medias[0].Formats[0],
func(u unit.Unit) error {
require.Equal(t, [][]byte{
test.FormatH264.SPS,
@@ -152,8 +148,8 @@ func TestServerPublish(t *testing.T) {
return nil
})
path.stream.StartReader(reader)
defer path.stream.RemoveReader(reader)
strm.StartReader(reader)
defer strm.RemoveReader(reader)
err = source.WritePacketRTP(media0, &rtp.Packet{
Header: rtp.Header{
@@ -188,19 +184,20 @@ func TestServerRead(t *testing.T) {
err := strm.Initialize()
require.NoError(t, err)
path := &dummyPath{stream: strm}
n := 0
pathManager := &test.PathManager{
DescribeImpl: func(req defs.PathDescribeReq) defs.PathDescribeRes {
require.Equal(t, "teststream", req.AccessRequest.Name)
require.Equal(t, "param=value", req.AccessRequest.Query)
if ca == "basic" {
require.Nil(t, req.AccessRequest.CustomVerifyFunc)
if req.AccessRequest.Credentials.User == "" && req.AccessRequest.Credentials.Pass == "" {
return defs.PathDescribeRes{Err: auth.Error{Message: "", AskCredentials: true}}
}
require.Equal(t, "teststream", req.AccessRequest.Name)
require.Equal(t, "param=value", req.AccessRequest.Query)
require.Equal(t, "myuser", req.AccessRequest.Credentials.User)
require.Equal(t, "mypass", req.AccessRequest.Credentials.Pass)
} else {
@@ -214,17 +211,17 @@ func TestServerRead(t *testing.T) {
}
return defs.PathDescribeRes{
Path: path,
Stream: path.stream,
Path: &dummyPath{},
Stream: strm,
Err: nil,
}
},
AddReaderImpl: func(req defs.PathAddReaderReq) (defs.Path, *stream.Stream, error) {
require.Equal(t, "teststream", req.AccessRequest.Name)
require.Equal(t, "param=value", req.AccessRequest.Query)
if ca == "basic" {
require.Nil(t, req.AccessRequest.CustomVerifyFunc)
require.Equal(t, "teststream", req.AccessRequest.Name)
require.Equal(t, "param=value", req.AccessRequest.Query)
require.Equal(t, "myuser", req.AccessRequest.Credentials.User)
require.Equal(t, "mypass", req.AccessRequest.Credentials.Pass)
} else {
@@ -232,7 +229,7 @@ func TestServerRead(t *testing.T) {
require.True(t, ok)
}
return path, path.stream, nil
return &dummyPath{}, strm, nil
},
}
@@ -334,8 +331,6 @@ func TestServerRedirect(t *testing.T) {
err := strm.Initialize()
require.NoError(t, err)
path := &dummyPath{stream: strm}
pathManager := &test.PathManager{
DescribeImpl: func(req defs.PathDescribeReq) defs.PathDescribeRes {
if req.AccessRequest.Name == "path1" {
@@ -359,9 +354,8 @@ func TestServerRedirect(t *testing.T) {
require.Equal(t, "mypass", req.AccessRequest.Credentials.Pass)
return defs.PathDescribeRes{
Path: path,
Stream: path.stream,
Err: nil,
Path: &dummyPath{},
Stream: strm,
}
},
}

View File

@@ -36,11 +36,12 @@ type session struct {
uuid uuid.UUID
created time.Time
pathConf *conf.Path // record only
path defs.Path
stream *stream.Stream
onUnreadHook func()
mutex sync.Mutex
state gortsplib.ServerSessionState
state defs.APIRTSPSessionState
transport *gortsplib.Transport
pathName string
query string
@@ -52,6 +53,7 @@ type session struct {
func (s *session) initialize() {
s.uuid = uuid.New()
s.created = time.Now()
s.state = defs.APIRTSPSessionStateIdle
s.packetsLost = &counterdumper.CounterDumper{
OnReport: func(val uint64) {
@@ -126,7 +128,7 @@ func (s *session) onClose(err error) {
case gortsplib.ServerSessionStatePrePlay, gortsplib.ServerSessionStatePlay:
s.path.RemoveReader(defs.PathRemoveReaderReq{Author: s})
case gortsplib.ServerSessionStatePreRecord, gortsplib.ServerSessionStateRecord:
case gortsplib.ServerSessionStateRecord:
s.path.RemovePublisher(defs.PathRemovePublisherReq{Author: s})
}
@@ -154,20 +156,17 @@ func (s *session) onAnnounce(c *conn, ctx *gortsplib.ServerHandlerOnAnnounceCtx)
}
}
req := defs.PathAccessRequest{
Name: ctx.Path,
Query: ctx.Query,
Publish: true,
Proto: auth.ProtocolRTSP,
ID: &c.uuid,
Credentials: rtsp.Credentials(ctx.Request),
IP: c.ip(),
CustomVerifyFunc: customVerifyFunc,
}
path, err := s.pathManager.AddPublisher(defs.PathAddPublisherReq{
Author: s,
AccessRequest: req,
pathConf, err := s.pathManager.FindPathConf(defs.PathFindPathConfReq{
AccessRequest: defs.PathAccessRequest{
Name: ctx.Path,
Query: ctx.Query,
Publish: true,
Proto: auth.ProtocolRTSP,
ID: &c.uuid,
Credentials: rtsp.Credentials(ctx.Request),
IP: c.ip(),
CustomVerifyFunc: customVerifyFunc,
},
})
if err != nil {
var terr auth.Error
@@ -180,10 +179,9 @@ func (s *session) onAnnounce(c *conn, ctx *gortsplib.ServerHandlerOnAnnounceCtx)
}, err
}
s.path = path
s.pathConf = pathConf
s.mutex.Lock()
s.state = gortsplib.ServerSessionStatePreRecord
s.pathName = ctx.Path
s.query = ctx.Query
s.mutex.Unlock()
@@ -226,19 +224,17 @@ func (s *session) onSetup(c *conn, ctx *gortsplib.ServerHandlerOnSetupCtx,
switch s.rsession.State() {
case gortsplib.ServerSessionStateInitial, gortsplib.ServerSessionStatePrePlay: // play
req := defs.PathAccessRequest{
Name: ctx.Path,
Query: ctx.Query,
Proto: auth.ProtocolRTSP,
ID: &c.uuid,
Credentials: rtsp.Credentials(ctx.Request),
IP: c.ip(),
CustomVerifyFunc: customVerifyFunc,
}
path, stream, err := s.pathManager.AddReader(defs.PathAddReaderReq{
Author: s,
AccessRequest: req,
Author: s,
AccessRequest: defs.PathAccessRequest{
Name: ctx.Path,
Query: ctx.Query,
Proto: auth.ProtocolRTSP,
ID: &c.uuid,
Credentials: rtsp.Credentials(ctx.Request),
IP: c.ip(),
CustomVerifyFunc: customVerifyFunc,
},
})
if err != nil {
var terr auth.Error
@@ -263,7 +259,6 @@ func (s *session) onSetup(c *conn, ctx *gortsplib.ServerHandlerOnSetupCtx,
s.stream = stream
s.mutex.Lock()
s.state = gortsplib.ServerSessionStatePrePlay
s.pathName = ctx.Path
s.query = ctx.Query
s.mutex.Unlock()
@@ -306,7 +301,7 @@ func (s *session) onPlay(_ *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, e
})
s.mutex.Lock()
s.state = gortsplib.ServerSessionStatePlay
s.state = defs.APIRTSPSessionStateRead
s.transport = s.rsession.SetuppedTransport()
s.mutex.Unlock()
}
@@ -319,10 +314,17 @@ func (s *session) onPlay(_ *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, e
// onRecord is called by rtspServer.
func (s *session) onRecord(_ *gortsplib.ServerHandlerOnRecordCtx) (*base.Response, error) {
stream, err := s.path.StartPublisher(defs.PathStartPublisherReq{
path, stream, err := s.pathManager.AddPublisher(defs.PathAddPublisherReq{
Author: s,
Desc: s.rsession.AnnouncedDescription(),
GenerateRTPPackets: false,
ConfToCompare: s.pathConf,
AccessRequest: defs.PathAccessRequest{
Name: s.pathName,
Query: s.query,
Publish: true,
SkipAuth: true,
},
})
if err != nil {
return &base.Response{
@@ -330,6 +332,7 @@ func (s *session) onRecord(_ *gortsplib.ServerHandlerOnRecordCtx) (*base.Respons
}, err
}
s.path = path
s.stream = stream
rtsp.ToStream(
@@ -340,7 +343,7 @@ func (s *session) onRecord(_ *gortsplib.ServerHandlerOnRecordCtx) (*base.Respons
s)
s.mutex.Lock()
s.state = gortsplib.ServerSessionStateRecord
s.state = defs.APIRTSPSessionStatePublish
s.transport = s.rsession.SetuppedTransport()
s.mutex.Unlock()
@@ -356,14 +359,14 @@ func (s *session) onPause(_ *gortsplib.ServerHandlerOnPauseCtx) (*base.Response,
s.onUnreadHook()
s.mutex.Lock()
s.state = gortsplib.ServerSessionStatePrePlay
s.state = defs.APIRTSPSessionStateIdle
s.mutex.Unlock()
case gortsplib.ServerSessionStateRecord:
s.path.StopPublisher(defs.PathStopPublisherReq{Author: s})
s.path.RemovePublisher(defs.PathRemovePublisherReq{Author: s})
s.mutex.Lock()
s.state = gortsplib.ServerSessionStatePreRecord
s.state = defs.APIRTSPSessionStateIdle
s.mutex.Unlock()
}
@@ -416,20 +419,9 @@ func (s *session) apiItem() *defs.APIRTSPSession {
ID: s.uuid,
Created: s.created,
RemoteAddr: s.remoteAddr().String(),
State: func() defs.APIRTSPSessionState {
switch s.state {
case gortsplib.ServerSessionStatePrePlay,
gortsplib.ServerSessionStatePlay:
return defs.APIRTSPSessionStateRead
case gortsplib.ServerSessionStatePreRecord,
gortsplib.ServerSessionStateRecord:
return defs.APIRTSPSessionStatePublish
}
return defs.APIRTSPSessionStateIdle
}(),
Path: s.pathName,
Query: s.query,
State: s.state,
Path: s.pathName,
Query: s.query,
Transport: func() *string {
if s.transport == nil {
return nil

View File

@@ -41,13 +41,6 @@ func srtCheckPassphrase(connReq srt.ConnRequest, passphrase string) error {
return nil
}
type connState int
const (
connStateRead connState = iota + 1
connStatePublish
)
type conn struct {
parentCtx context.Context
rtspAddress string
@@ -68,7 +61,7 @@ type conn struct {
created time.Time
uuid uuid.UUID
mutex sync.RWMutex
state connState
state defs.APISRTConnState
pathName string
query string
sconn srt.Conn
@@ -79,6 +72,7 @@ func (c *conn) initialize() {
c.created = time.Now()
c.uuid = uuid.New()
c.state = defs.APISRTConnStateIdle
c.Log(logger.Info, "opened")
@@ -137,8 +131,7 @@ func (c *conn) runInner() error {
}
func (c *conn) runPublish(streamID *streamID) error {
path, err := c.pathManager.AddPublisher(defs.PathAddPublisherReq{
Author: c,
pathConf, err := c.pathManager.FindPathConf(defs.PathFindPathConfReq{
AccessRequest: defs.PathAccessRequest{
Name: streamID.path,
Query: streamID.query,
@@ -164,9 +157,7 @@ func (c *conn) runPublish(streamID *streamID) error {
return err
}
defer path.RemovePublisher(defs.PathRemovePublisherReq{Author: c})
err = srtCheckPassphrase(c.connReq, path.SafeConf().SRTPublishPassphrase)
err = srtCheckPassphrase(c.connReq, pathConf.SRTPublishPassphrase)
if err != nil {
c.connReq.Reject(srt.REJ_PEER)
return err
@@ -177,16 +168,9 @@ func (c *conn) runPublish(streamID *streamID) error {
return err
}
c.mutex.Lock()
c.state = connStatePublish
c.pathName = streamID.path
c.query = streamID.query
c.sconn = sconn
c.mutex.Unlock()
readerErr := make(chan error)
go func() {
readerErr <- c.runPublishReader(sconn, path)
readerErr <- c.runPublishReader(sconn, streamID, pathConf)
}()
select {
@@ -201,7 +185,7 @@ func (c *conn) runPublish(streamID *streamID) error {
}
}
func (c *conn) runPublishReader(sconn srt.Conn, path defs.Path) error {
func (c *conn) runPublishReader(sconn srt.Conn, streamID *streamID, pathConf *conf.Path) error {
sconn.SetReadDeadline(time.Now().Add(time.Duration(c.readTimeout)))
r := &mpegts.EnhancedReader{R: sconn}
err := r.Initialize()
@@ -236,15 +220,32 @@ func (c *conn) runPublishReader(sconn srt.Conn, path defs.Path) error {
return err
}
stream, err = path.StartPublisher(defs.PathStartPublisherReq{
var path defs.Path
path, stream, err = c.pathManager.AddPublisher(defs.PathAddPublisherReq{
Author: c,
Desc: &description.Session{Medias: medias},
GenerateRTPPackets: true,
ConfToCompare: pathConf,
AccessRequest: defs.PathAccessRequest{
Name: streamID.path,
Query: streamID.query,
Publish: true,
SkipAuth: true,
},
})
if err != nil {
return err
}
defer path.RemovePublisher(defs.PathRemovePublisherReq{Author: c})
c.mutex.Lock()
c.state = defs.APISRTConnStatePublish
c.pathName = streamID.path
c.query = streamID.query
c.sconn = sconn
c.mutex.Unlock()
for {
err = r.Read()
if err != nil {
@@ -294,13 +295,6 @@ func (c *conn) runRead(streamID *streamID) error {
}
defer sconn.Close()
c.mutex.Lock()
c.state = connStateRead
c.pathName = streamID.path
c.query = streamID.query
c.sconn = sconn
c.mutex.Unlock()
bw := bufio.NewWriterSize(sconn, srtMaxPayloadSize(c.udpMaxPayloadSize))
err = mpegts.FromStream(stream, c, bw, sconn, time.Duration(c.writeTimeout))
@@ -308,6 +302,13 @@ func (c *conn) runRead(streamID *streamID) error {
return err
}
c.mutex.Lock()
c.state = defs.APISRTConnStateRead
c.pathName = streamID.path
c.query = streamID.query
c.sconn = sconn
c.mutex.Unlock()
c.Log(logger.Info, "is reading from path '%s', %s",
path.Name(), defs.FormatsInfo(stream.ReaderFormats(c)))
@@ -357,20 +358,9 @@ func (c *conn) apiItem() *defs.APISRTConn {
ID: c.uuid,
Created: c.created,
RemoteAddr: c.connReq.RemoteAddr().String(),
State: func() defs.APISRTConnState {
switch c.state {
case connStateRead:
return defs.APISRTConnStateRead
case connStatePublish:
return defs.APISRTConnStatePublish
default:
return defs.APISRTConnStateIdle
}
}(),
Path: c.pathName,
Query: c.query,
State: c.state,
Path: c.pathName,
Query: c.query,
}
if c.sconn != nil {

View File

@@ -64,7 +64,8 @@ type serverMetrics interface {
}
type serverPathManager interface {
AddPublisher(req defs.PathAddPublisherReq) (defs.Path, error)
FindPathConf(req defs.PathFindPathConfReq) (*conf.Path, error)
AddPublisher(req defs.PathAddPublisherReq) (defs.Path, *stream.Stream, error)
AddReader(req defs.PathAddReaderReq) (defs.Path, *stream.Stream, error)
}

View File

@@ -17,10 +17,7 @@ import (
"github.com/stretchr/testify/require"
)
type dummyPath struct {
stream *stream.Stream
streamCreated chan struct{}
}
type dummyPath struct{}
func (p *dummyPath) Name() string {
return "teststream"
@@ -34,26 +31,6 @@ func (p *dummyPath) ExternalCmdEnv() externalcmd.Environment {
return externalcmd.Environment{}
}
func (p *dummyPath) StartPublisher(req defs.PathStartPublisherReq) (*stream.Stream, error) {
p.stream = &stream.Stream{
WriteQueueSize: 512,
RTPMaxPayloadSize: 1450,
Desc: req.Desc,
GenerateRTPPackets: true,
Parent: test.NilLogger,
}
err := p.stream.Initialize()
if err != nil {
return nil, err
}
close(p.streamCreated)
return p.stream, nil
}
func (p *dummyPath) StopPublisher(_ defs.PathStopPublisherReq) {
}
func (p *dummyPath) RemovePublisher(_ defs.PathRemovePublisherReq) {
}
@@ -65,17 +42,35 @@ func TestServerPublish(t *testing.T) {
externalCmdPool.Initialize()
defer externalCmdPool.Close()
path := &dummyPath{
streamCreated: make(chan struct{}),
}
var strm *stream.Stream
streamCreated := make(chan struct{})
pathManager := &test.PathManager{
AddPublisherImpl: func(req defs.PathAddPublisherReq) (defs.Path, error) {
FindPathConfImpl: func(req defs.PathFindPathConfReq) (*conf.Path, error) {
require.Equal(t, "teststream", req.AccessRequest.Name)
require.Equal(t, "param=value", req.AccessRequest.Query)
require.Equal(t, "myuser", req.AccessRequest.Credentials.User)
require.Equal(t, "mypass", req.AccessRequest.Credentials.Pass)
return path, nil
return &conf.Path{}, nil
},
AddPublisherImpl: func(req defs.PathAddPublisherReq) (defs.Path, *stream.Stream, error) {
require.Equal(t, "teststream", req.AccessRequest.Name)
require.Equal(t, "param=value", req.AccessRequest.Query)
require.True(t, req.AccessRequest.SkipAuth)
strm = &stream.Stream{
WriteQueueSize: 512,
RTPMaxPayloadSize: 1450,
Desc: req.Desc,
GenerateRTPPackets: true,
Parent: test.NilLogger,
}
err := strm.Initialize()
require.NoError(t, err)
close(streamCreated)
return &dummyPath{}, strm, nil
},
}
@@ -128,16 +123,16 @@ func TestServerPublish(t *testing.T) {
err = bw.Flush()
require.NoError(t, err)
<-path.streamCreated
<-streamCreated
reader := test.NilLogger
recv := make(chan struct{})
path.stream.AddReader(
strm.AddReader(
reader,
path.stream.Desc.Medias[0],
path.stream.Desc.Medias[0].Formats[0],
strm.Desc.Medias[0],
strm.Desc.Medias[0].Formats[0],
func(u unit.Unit) error {
require.Equal(t, [][]byte{
test.FormatH264.SPS,
@@ -148,8 +143,8 @@ func TestServerPublish(t *testing.T) {
return nil
})
path.stream.StartReader(reader)
defer path.stream.RemoveReader(reader)
strm.StartReader(reader)
defer strm.RemoveReader(reader)
err = w.WriteH264(track, 0, 0, [][]byte{
{5, 2},
@@ -179,15 +174,13 @@ func TestServerRead(t *testing.T) {
err := strm.Initialize()
require.NoError(t, err)
path := &dummyPath{stream: strm}
pathManager := &test.PathManager{
AddReaderImpl: func(req defs.PathAddReaderReq) (defs.Path, *stream.Stream, error) {
require.Equal(t, "teststream", req.AccessRequest.Name)
require.Equal(t, "param=value", req.AccessRequest.Query)
require.Equal(t, "myuser", req.AccessRequest.Credentials.User)
require.Equal(t, "mypass", req.AccessRequest.Credentials.Pass)
return path, path.stream, nil
return &dummyPath{}, strm, nil
},
}

View File

@@ -124,17 +124,15 @@ func (s *httpServer) close() {
}
func (s *httpServer) checkAuthOutsideSession(ctx *gin.Context, pathName string, publish bool) bool {
req := defs.PathAccessRequest{
Name: pathName,
Query: ctx.Request.URL.RawQuery,
Publish: publish,
Proto: auth.ProtocolWebRTC,
Credentials: httpp.Credentials(ctx.Request),
IP: net.ParseIP(ctx.ClientIP()),
}
_, err := s.pathManager.FindPathConf(defs.PathFindPathConfReq{
AccessRequest: req,
AccessRequest: defs.PathAccessRequest{
Name: pathName,
Query: ctx.Request.URL.RawQuery,
Publish: publish,
Proto: auth.ProtocolWebRTC,
Credentials: httpp.Credentials(ctx.Request),
IP: net.ParseIP(ctx.ClientIP()),
},
})
if err != nil {
var terr auth.Error

View File

@@ -174,7 +174,7 @@ type serverMetrics interface {
type serverPathManager interface {
FindPathConf(req defs.PathFindPathConfReq) (*conf.Path, error)
AddPublisher(req defs.PathAddPublisherReq) (defs.Path, error)
AddPublisher(req defs.PathAddPublisherReq) (defs.Path, *stream.Stream, error)
AddReader(req defs.PathAddReaderReq) (defs.Path, *stream.Stream, error)
}

View File

@@ -34,10 +34,7 @@ func checkClose(t *testing.T, closeFunc func() error) {
require.NoError(t, closeFunc())
}
type dummyPath struct {
stream *stream.Stream
streamCreated chan struct{}
}
type dummyPath struct{}
func (p *dummyPath) Name() string {
return "teststream"
@@ -51,26 +48,6 @@ func (p *dummyPath) ExternalCmdEnv() externalcmd.Environment {
return externalcmd.Environment{}
}
func (p *dummyPath) StartPublisher(req defs.PathStartPublisherReq) (*stream.Stream, error) {
p.stream = &stream.Stream{
WriteQueueSize: 512,
RTPMaxPayloadSize: 1450,
Desc: req.Desc,
GenerateRTPPackets: true,
Parent: test.NilLogger,
}
err := p.stream.Initialize()
if err != nil {
return nil, err
}
close(p.streamCreated)
return p.stream, nil
}
func (p *dummyPath) StopPublisher(_ defs.PathStopPublisherReq) {
}
func (p *dummyPath) RemovePublisher(_ defs.PathRemovePublisherReq) {
}
@@ -223,9 +200,8 @@ func TestServerOptionsICEServer(t *testing.T) {
}
func TestServerPublish(t *testing.T) {
path := &dummyPath{
streamCreated: make(chan struct{}),
}
var strm *stream.Stream
streamCreated := make(chan struct{})
pathManager := &test.PathManager{
FindPathConfImpl: func(req defs.PathFindPathConfReq) (*conf.Path, error) {
@@ -235,12 +211,24 @@ func TestServerPublish(t *testing.T) {
require.Equal(t, "mypass", req.AccessRequest.Credentials.Pass)
return &conf.Path{}, nil
},
AddPublisherImpl: func(req defs.PathAddPublisherReq) (defs.Path, error) {
AddPublisherImpl: func(req defs.PathAddPublisherReq) (defs.Path, *stream.Stream, error) {
require.Equal(t, "teststream", req.AccessRequest.Name)
require.Equal(t, "param=value", req.AccessRequest.Query)
require.Equal(t, "myuser", req.AccessRequest.Credentials.User)
require.Equal(t, "mypass", req.AccessRequest.Credentials.Pass)
return path, nil
require.True(t, req.AccessRequest.SkipAuth)
strm = &stream.Stream{
WriteQueueSize: 512,
RTPMaxPayloadSize: 1450,
Desc: req.Desc,
GenerateRTPPackets: true,
Parent: test.NilLogger,
}
err := strm.Initialize()
require.NoError(t, err)
close(streamCreated)
return &dummyPath{}, strm, nil
},
}
@@ -309,16 +297,16 @@ func TestServerPublish(t *testing.T) {
})
require.NoError(t, err)
<-path.streamCreated
<-streamCreated
reader := test.NilLogger
recv := make(chan struct{})
path.stream.AddReader(
strm.AddReader(
reader,
path.stream.Desc.Medias[0],
path.stream.Desc.Medias[0].Formats[0],
strm.Desc.Medias[0],
strm.Desc.Medias[0].Formats[0],
func(u unit.Unit) error {
select {
case <-recv:
@@ -334,8 +322,8 @@ func TestServerPublish(t *testing.T) {
return nil
})
path.stream.StartReader(reader)
defer path.stream.RemoveReader(reader)
strm.StartReader(reader)
defer strm.RemoveReader(reader)
err = track.WriteRTP(&rtp.Packet{
Header: rtp.Header{
@@ -516,8 +504,6 @@ func TestServerRead(t *testing.T) {
err := strm.Initialize()
require.NoError(t, err)
path := &dummyPath{stream: strm}
pathManager := &test.PathManager{
FindPathConfImpl: func(req defs.PathFindPathConfReq) (*conf.Path, error) {
require.Equal(t, "teststream", req.AccessRequest.Name)
@@ -531,7 +517,7 @@ func TestServerRead(t *testing.T) {
require.Equal(t, "param=value", req.AccessRequest.Query)
require.Equal(t, "myuser", req.AccessRequest.Credentials.User)
require.Equal(t, "mypass", req.AccessRequest.Credentials.Pass)
return path, strm, nil
return &dummyPath{}, strm, nil
},
}

View File

@@ -137,26 +137,21 @@ func (s *session) runInner2() (int, error) {
func (s *session) runPublish() (int, error) {
ip, _, _ := net.SplitHostPort(s.req.remoteAddr)
req := defs.PathAccessRequest{
Name: s.req.pathName,
Query: s.req.httpRequest.URL.RawQuery,
Publish: true,
Proto: auth.ProtocolWebRTC,
ID: &s.uuid,
Credentials: httpp.Credentials(s.req.httpRequest),
IP: net.ParseIP(ip),
}
path, err := s.pathManager.AddPublisher(defs.PathAddPublisherReq{
Author: s,
AccessRequest: req,
pathConf, err := s.pathManager.FindPathConf(defs.PathFindPathConfReq{
AccessRequest: defs.PathAccessRequest{
Name: s.req.pathName,
Query: s.req.httpRequest.URL.RawQuery,
Publish: true,
Proto: auth.ProtocolWebRTC,
ID: &s.uuid,
Credentials: httpp.Credentials(s.req.httpRequest),
IP: net.ParseIP(ip),
},
})
if err != nil {
return http.StatusBadRequest, err
}
defer path.RemovePublisher(defs.PathRemovePublisherReq{Author: s})
iceServers, err := s.parent.generateICEServers(false)
if err != nil {
return http.StatusInternalServerError, err
@@ -173,7 +168,7 @@ func (s *session) runPublish() (int, error) {
TrackGatherTimeout: s.trackGatherTimeout,
STUNGatherTimeout: s.stunGatherTimeout,
Publish: false,
UseAbsoluteTimestamp: path.SafeConf().UseAbsoluteTimestamp,
UseAbsoluteTimestamp: pathConf.UseAbsoluteTimestamp,
Log: s,
}
err = pc.Start()
@@ -230,15 +225,25 @@ func (s *session) runPublish() (int, error) {
return 0, err
}
stream, err = path.StartPublisher(defs.PathStartPublisherReq{
var path defs.Path
path, stream, err = s.pathManager.AddPublisher(defs.PathAddPublisherReq{
Author: s,
Desc: &description.Session{Medias: medias},
GenerateRTPPackets: false,
ConfToCompare: pathConf,
AccessRequest: defs.PathAccessRequest{
Name: s.req.pathName,
Query: s.req.httpRequest.URL.RawQuery,
Publish: true,
SkipAuth: true,
},
})
if err != nil {
return 0, err
}
defer path.RemovePublisher(defs.PathRemovePublisherReq{Author: s})
pc.StartReading()
select {

View File

@@ -10,7 +10,7 @@ import (
type PathManager struct {
FindPathConfImpl func(req defs.PathFindPathConfReq) (*conf.Path, error)
DescribeImpl func(req defs.PathDescribeReq) defs.PathDescribeRes
AddPublisherImpl func(req defs.PathAddPublisherReq) (defs.Path, error)
AddPublisherImpl func(req defs.PathAddPublisherReq) (defs.Path, *stream.Stream, error)
AddReaderImpl func(req defs.PathAddReaderReq) (defs.Path, *stream.Stream, error)
}
@@ -25,7 +25,7 @@ func (pm *PathManager) Describe(req defs.PathDescribeReq) defs.PathDescribeRes {
}
// AddPublisher implements PathManager.
func (pm *PathManager) AddPublisher(req defs.PathAddPublisherReq) (defs.Path, error) {
func (pm *PathManager) AddPublisher(req defs.PathAddPublisherReq) (defs.Path, *stream.Stream, error) {
return pm.AddPublisherImpl(req)
}