split readPublisher into reader and publisher

This commit is contained in:
aler9
2021-07-31 20:46:06 +02:00
parent 1a7f26ce29
commit eee64a1450
12 changed files with 579 additions and 574 deletions

View File

@@ -165,17 +165,11 @@ func (r *hlsRemuxer) Close() {
r.ctxCancel() r.ctxCancel()
} }
// IsReadPublisher implements readPublisher.
func (r *hlsRemuxer) IsReadPublisher() {}
// IsSource implements source.
func (r *hlsRemuxer) IsSource() {}
func (r *hlsRemuxer) log(level logger.Level, format string, args ...interface{}) { func (r *hlsRemuxer) log(level logger.Level, format string, args ...interface{}) {
r.parent.Log(level, "[remuxer %s] "+format, append([]interface{}{r.pathName}, args...)...) r.parent.Log(level, "[remuxer %s] "+format, append([]interface{}{r.pathName}, args...)...)
} }
// PathName returns the path name of the readPublisher // PathName returns the path name.
func (r *hlsRemuxer) PathName() string { func (r *hlsRemuxer) PathName() string {
return r.pathName return r.pathName
} }
@@ -203,15 +197,11 @@ func (r *hlsRemuxer) run() {
r.ctxCancel() r.ctxCancel()
if r.path != nil {
r.path.OnReadPublisherRemove(readPublisherRemoveReq{Author: r})
}
r.parent.OnRemuxerClose(r) r.parent.OnRemuxerClose(r)
} }
func (r *hlsRemuxer) runInner(innerCtx context.Context) error { func (r *hlsRemuxer) runInner(innerCtx context.Context) error {
res := r.pathManager.OnReadPublisherSetupPlay(readPublisherSetupPlayReq{ res := r.pathManager.OnReaderSetupPlay(pathReaderSetupPlayReq{
Author: r, Author: r,
PathName: r.pathName, PathName: r.pathName,
IP: nil, IP: nil,
@@ -223,6 +213,11 @@ func (r *hlsRemuxer) runInner(innerCtx context.Context) error {
} }
r.path = res.Path r.path = res.Path
defer func() {
r.path.OnReaderRemove(pathReaderRemoveReq{Author: r})
}()
var videoTrack *gortsplib.Track var videoTrack *gortsplib.Track
videoTrackID := -1 videoTrackID := -1
var h264SPS []byte var h264SPS []byte
@@ -300,7 +295,7 @@ func (r *hlsRemuxer) runInner(innerCtx context.Context) error {
r.ringBuffer = ringbuffer.New(uint64(r.readBufferCount)) r.ringBuffer = ringbuffer.New(uint64(r.readBufferCount))
r.path.OnReadPublisherPlay(readPublisherPlayReq{ r.path.OnReaderPlay(pathReaderPlayReq{
Author: r, Author: r,
}) })
@@ -489,17 +484,13 @@ func (r *hlsRemuxer) OnRequest(req hlsRemuxerRequest) {
} }
} }
// OnReaderAccepted implements readPublisher. // OnReaderAccepted implements reader.
func (r *hlsRemuxer) OnReaderAccepted() { func (r *hlsRemuxer) OnReaderAccepted() {
r.log(logger.Info, "is remuxing into HLS") r.log(logger.Info, "is remuxing into HLS")
} }
// OnPublisherAccepted implements readPublisher. // OnReaderFrame implements reader.
func (r *hlsRemuxer) OnPublisherAccepted(tracksLen int) { func (r *hlsRemuxer) OnReaderFrame(trackID int, streamType gortsplib.StreamType, payload []byte) {
}
// OnFrame implements readPublisher.
func (r *hlsRemuxer) OnFrame(trackID int, streamType gortsplib.StreamType, payload []byte) {
if streamType == gortsplib.StreamTypeRTP { if streamType == gortsplib.StreamTypeRTP {
r.ringBuffer.Push(hlsRemuxerTrackIDPayloadPair{trackID, payload}) r.ringBuffer.Push(hlsRemuxerTrackIDPayloadPair{trackID, payload})
} }

File diff suppressed because it is too large Load Diff

View File

@@ -39,9 +39,9 @@ type pathManager struct {
confReload chan map[string]*conf.PathConf confReload chan map[string]*conf.PathConf
pathClose chan *path pathClose chan *path
pathSourceReady chan *path pathSourceReady chan *path
rpDescribe chan readPublisherDescribeReq describe chan pathDescribeReq
rpSetupPlay chan readPublisherSetupPlayReq readerSetupPlay chan pathReaderSetupPlayReq
rpAnnounce chan readPublisherAnnounceReq publisherAnnounce chan pathPublisherAnnounceReq
hlsServerSet chan *hlsServer hlsServerSet chan *hlsServer
} }
@@ -74,9 +74,9 @@ func newPathManager(
confReload: make(chan map[string]*conf.PathConf), confReload: make(chan map[string]*conf.PathConf),
pathClose: make(chan *path), pathClose: make(chan *path),
pathSourceReady: make(chan *path), pathSourceReady: make(chan *path),
rpDescribe: make(chan readPublisherDescribeReq), describe: make(chan pathDescribeReq),
rpSetupPlay: make(chan readPublisherSetupPlayReq), readerSetupPlay: make(chan pathReaderSetupPlayReq),
rpAnnounce: make(chan readPublisherAnnounceReq), publisherAnnounce: make(chan pathPublisherAnnounceReq),
hlsServerSet: make(chan *hlsServer), hlsServerSet: make(chan *hlsServer),
} }
@@ -150,10 +150,10 @@ outer:
pm.hlsServer.OnPathSourceReady(pa) pm.hlsServer.OnPathSourceReady(pa)
} }
case req := <-pm.rpDescribe: case req := <-pm.describe:
pathName, pathConf, err := pm.findPathConf(req.PathName) pathName, pathConf, err := pm.findPathConf(req.PathName)
if err != nil { if err != nil {
req.Res <- readPublisherDescribeRes{Err: err} req.Res <- pathDescribeRes{Err: err}
continue continue
} }
@@ -166,7 +166,7 @@ outer:
pathConf.ReadPass, pathConf.ReadPass,
) )
if err != nil { if err != nil {
req.Res <- readPublisherDescribeRes{Err: err} req.Res <- pathDescribeRes{Err: err}
continue continue
} }
@@ -175,12 +175,12 @@ outer:
pm.createPath(pathName, pathConf, req.PathName) pm.createPath(pathName, pathConf, req.PathName)
} }
pm.paths[req.PathName].OnPathManDescribe(req) pm.paths[req.PathName].OnDescribe(req)
case req := <-pm.rpSetupPlay: case req := <-pm.readerSetupPlay:
pathName, pathConf, err := pm.findPathConf(req.PathName) pathName, pathConf, err := pm.findPathConf(req.PathName)
if err != nil { if err != nil {
req.Res <- readPublisherSetupPlayRes{Err: err} req.Res <- pathReaderSetupPlayRes{Err: err}
continue continue
} }
@@ -193,7 +193,7 @@ outer:
pathConf.ReadPass, pathConf.ReadPass,
) )
if err != nil { if err != nil {
req.Res <- readPublisherSetupPlayRes{Err: err} req.Res <- pathReaderSetupPlayRes{Err: err}
continue continue
} }
@@ -202,12 +202,12 @@ outer:
pm.createPath(pathName, pathConf, req.PathName) pm.createPath(pathName, pathConf, req.PathName)
} }
pm.paths[req.PathName].OnPathManSetupPlay(req) pm.paths[req.PathName].OnReaderSetupPlay(req)
case req := <-pm.rpAnnounce: case req := <-pm.publisherAnnounce:
pathName, pathConf, err := pm.findPathConf(req.PathName) pathName, pathConf, err := pm.findPathConf(req.PathName)
if err != nil { if err != nil {
req.Res <- readPublisherAnnounceRes{Err: err} req.Res <- pathPublisherAnnounceRes{Err: err}
continue continue
} }
@@ -220,7 +220,7 @@ outer:
pathConf.PublishPass, pathConf.PublishPass,
) )
if err != nil { if err != nil {
req.Res <- readPublisherAnnounceRes{Err: err} req.Res <- pathPublisherAnnounceRes{Err: err}
continue continue
} }
@@ -229,7 +229,7 @@ outer:
pm.createPath(pathName, pathConf, req.PathName) pm.createPath(pathName, pathConf, req.PathName)
} }
pm.paths[req.PathName].OnPathManAnnounce(req) pm.paths[req.PathName].OnPublisherAnnounce(req)
case s := <-pm.hlsServerSet: case s := <-pm.hlsServerSet:
pm.hlsServer = s pm.hlsServer = s
@@ -298,7 +298,7 @@ func (pm *pathManager) authenticate(
// validate ip // validate ip
if pathIPs != nil && ip != nil { if pathIPs != nil && ip != nil {
if !ipEqualOrInRange(ip, pathIPs) { if !ipEqualOrInRange(ip, pathIPs) {
return readPublisherErrAuthCritical{ return pathErrAuthCritical{
Message: fmt.Sprintf("IP '%s' not allowed", ip), Message: fmt.Sprintf("IP '%s' not allowed", ip),
Response: &base.Response{ Response: &base.Response{
StatusCode: base.StatusUnauthorized, StatusCode: base.StatusUnauthorized,
@@ -342,36 +342,36 @@ func (pm *pathManager) OnPathClose(pa *path) {
} }
} }
// OnReadPublisherDescribe is called by a readPublisher. // OnDescribe is called by a reader or publisher.
func (pm *pathManager) OnReadPublisherDescribe(req readPublisherDescribeReq) readPublisherDescribeRes { func (pm *pathManager) OnDescribe(req pathDescribeReq) pathDescribeRes {
req.Res = make(chan readPublisherDescribeRes) req.Res = make(chan pathDescribeRes)
select { select {
case pm.rpDescribe <- req: case pm.describe <- req:
return <-req.Res return <-req.Res
case <-pm.ctx.Done(): case <-pm.ctx.Done():
return readPublisherDescribeRes{Err: fmt.Errorf("terminated")} return pathDescribeRes{Err: fmt.Errorf("terminated")}
} }
} }
// OnReadPublisherAnnounce is called by a readPublisher. // OnPublisherAnnounce is called by a publisher.
func (pm *pathManager) OnReadPublisherAnnounce(req readPublisherAnnounceReq) readPublisherAnnounceRes { func (pm *pathManager) OnPublisherAnnounce(req pathPublisherAnnounceReq) pathPublisherAnnounceRes {
req.Res = make(chan readPublisherAnnounceRes) req.Res = make(chan pathPublisherAnnounceRes)
select { select {
case pm.rpAnnounce <- req: case pm.publisherAnnounce <- req:
return <-req.Res return <-req.Res
case <-pm.ctx.Done(): case <-pm.ctx.Done():
return readPublisherAnnounceRes{Err: fmt.Errorf("terminated")} return pathPublisherAnnounceRes{Err: fmt.Errorf("terminated")}
} }
} }
// OnReadPublisherSetupPlay is called by a readPublisher. // OnReaderSetupPlay is called by a reader.
func (pm *pathManager) OnReadPublisherSetupPlay(req readPublisherSetupPlayReq) readPublisherSetupPlayRes { func (pm *pathManager) OnReaderSetupPlay(req pathReaderSetupPlayReq) pathReaderSetupPlayRes {
req.Res = make(chan readPublisherSetupPlayRes) req.Res = make(chan pathReaderSetupPlayRes)
select { select {
case pm.rpSetupPlay <- req: case pm.readerSetupPlay <- req:
return <-req.Res return <-req.Res
case <-pm.ctx.Done(): case <-pm.ctx.Done():
return readPublisherSetupPlayRes{Err: fmt.Errorf("terminated")} return pathReaderSetupPlayRes{Err: fmt.Errorf("terminated")}
} }
} }

View File

@@ -0,0 +1,8 @@
package core
// publisher is an entity that can publish a stream dynamically.
type publisher interface {
source
Close()
OnPublisherAccepted(tracksLen int)
}

View File

@@ -1,113 +0,0 @@
package core
import (
"fmt"
"net"
"github.com/aler9/gortsplib"
"github.com/aler9/gortsplib/pkg/base"
"github.com/aler9/gortsplib/pkg/headers"
)
type readPublisherErrNoOnePublishing struct {
PathName string
}
// Error implements the error interface.
func (e readPublisherErrNoOnePublishing) Error() string {
return fmt.Sprintf("no one is publishing to path '%s'", e.PathName)
}
type readPublisherErrAuthNotCritical struct {
*base.Response
}
// Error implements the error interface.
func (readPublisherErrAuthNotCritical) Error() string {
return "non-critical authentication error"
}
type readPublisherErrAuthCritical struct {
Message string
Response *base.Response
}
// Error implements the error interface.
func (readPublisherErrAuthCritical) Error() string {
return "critical authentication error"
}
type readPublisher interface {
IsReadPublisher()
IsSource()
Close()
OnReaderAccepted()
OnPublisherAccepted(tracksLen int)
OnFrame(int, gortsplib.StreamType, []byte)
}
type readPublisherDescribeRes struct {
Stream *gortsplib.ServerStream
Redirect string
Err error
}
type readPublisherDescribeReq struct {
PathName string
URL *base.URL
IP net.IP
ValidateCredentials func(authMethods []headers.AuthMethod, pathUser string, pathPass string) error
Res chan readPublisherDescribeRes
}
type readPublisherSetupPlayRes struct {
Path *path
Stream *gortsplib.ServerStream
Err error
}
type readPublisherSetupPlayReq struct {
Author readPublisher
PathName string
IP net.IP
ValidateCredentials func(authMethods []headers.AuthMethod, pathUser string, pathPass string) error
Res chan readPublisherSetupPlayRes
}
type readPublisherAnnounceRes struct {
Path *path
Err error
}
type readPublisherAnnounceReq struct {
Author readPublisher
PathName string
Tracks gortsplib.Tracks
IP net.IP
ValidateCredentials func(authMethods []headers.AuthMethod, pathUser string, pathPass string) error
Res chan readPublisherAnnounceRes
}
type readPublisherRemoveReq struct {
Author readPublisher
Res chan struct{}
}
type readPublisherPlayReq struct {
Author readPublisher
Res chan struct{}
}
type readPublisherRecordRes struct {
Err error
}
type readPublisherRecordReq struct {
Author readPublisher
Res chan readPublisherRecordRes
}
type readPublisherPauseReq struct {
Author readPublisher
Res chan struct{}
}

12
internal/core/reader.go Normal file
View File

@@ -0,0 +1,12 @@
package core
import (
"github.com/aler9/gortsplib"
)
// reader is an entity that can read a stream.
type reader interface {
Close()
OnReaderAccepted()
OnReaderFrame(int, gortsplib.StreamType, []byte)
}

View File

@@ -121,9 +121,6 @@ func (c *rtmpConn) Close() {
c.ctxCancel() c.ctxCancel()
} }
// IsReadPublisher implements readPublisher.
func (c *rtmpConn) IsReadPublisher() {}
// IsSource implements source. // IsSource implements source.
func (c *rtmpConn) IsSource() {} func (c *rtmpConn) IsSource() {}
@@ -168,10 +165,6 @@ func (c *rtmpConn) run() {
c.ctxCancel() c.ctxCancel()
if c.path != nil {
c.path.OnReadPublisherRemove(readPublisherRemoveReq{Author: c})
}
c.parent.OnConnClose(c) c.parent.OnConnClose(c)
} }
@@ -197,7 +190,7 @@ func (c *rtmpConn) runInner(ctx context.Context) error {
func (c *rtmpConn) runRead(ctx context.Context) error { func (c *rtmpConn) runRead(ctx context.Context) error {
pathName, query := pathNameAndQuery(c.conn.URL()) pathName, query := pathNameAndQuery(c.conn.URL())
res := c.pathManager.OnReadPublisherSetupPlay(readPublisherSetupPlayReq{ res := c.pathManager.OnReaderSetupPlay(pathReaderSetupPlayReq{
Author: c, Author: c,
PathName: pathName, PathName: pathName,
IP: c.ip(), IP: c.ip(),
@@ -207,7 +200,7 @@ func (c *rtmpConn) runRead(ctx context.Context) error {
}) })
if res.Err != nil { if res.Err != nil {
if terr, ok := res.Err.(readPublisherErrAuthCritical); ok { if terr, ok := res.Err.(pathErrAuthCritical); ok {
// wait some seconds to stop brute force attacks // wait some seconds to stop brute force attacks
<-time.After(rtmpConnPauseAfterAuthError) <-time.After(rtmpConnPauseAfterAuthError)
return errors.New(terr.Message) return errors.New(terr.Message)
@@ -217,6 +210,10 @@ func (c *rtmpConn) runRead(ctx context.Context) error {
c.path = res.Path c.path = res.Path
defer func() {
c.path.OnReaderRemove(pathReaderRemoveReq{Author: c})
}()
var videoTrack *gortsplib.Track var videoTrack *gortsplib.Track
videoTrackID := -1 videoTrackID := -1
var h264Decoder *rtph264.Decoder var h264Decoder *rtph264.Decoder
@@ -261,7 +258,7 @@ func (c *rtmpConn) runRead(ctx context.Context) error {
c.ringBuffer.Close() c.ringBuffer.Close()
}() }()
c.path.OnReadPublisherPlay(readPublisherPlayReq{ c.path.OnReaderPlay(pathReaderPlayReq{
Author: c, Author: c,
}) })
@@ -389,7 +386,7 @@ func (c *rtmpConn) runPublish(ctx context.Context) error {
pathName, query := pathNameAndQuery(c.conn.URL()) pathName, query := pathNameAndQuery(c.conn.URL())
res := c.pathManager.OnReadPublisherAnnounce(readPublisherAnnounceReq{ res := c.pathManager.OnPublisherAnnounce(pathPublisherAnnounceReq{
Author: c, Author: c,
PathName: pathName, PathName: pathName,
Tracks: tracks, Tracks: tracks,
@@ -400,7 +397,7 @@ func (c *rtmpConn) runPublish(ctx context.Context) error {
}) })
if res.Err != nil { if res.Err != nil {
if terr, ok := res.Err.(readPublisherErrAuthCritical); ok { if terr, ok := res.Err.(pathErrAuthCritical); ok {
// wait some seconds to stop brute force attacks // wait some seconds to stop brute force attacks
<-time.After(rtmpConnPauseAfterAuthError) <-time.After(rtmpConnPauseAfterAuthError)
return errors.New(terr.Message) return errors.New(terr.Message)
@@ -410,10 +407,14 @@ func (c *rtmpConn) runPublish(ctx context.Context) error {
c.path = res.Path c.path = res.Path
defer func() {
c.path.OnPublisherRemove(pathPublisherRemoveReq{Author: c})
}()
// disable write deadline // disable write deadline
c.conn.NetConn().SetWriteDeadline(time.Time{}) c.conn.NetConn().SetWriteDeadline(time.Time{})
rres := c.path.OnReadPublisherRecord(readPublisherRecordReq{Author: c}) rres := c.path.OnPublisherRecord(pathPublisherRecordReq{Author: c})
if rres.Err != nil { if rres.Err != nil {
return rres.Err return rres.Err
} }
@@ -497,7 +498,7 @@ func (c *rtmpConn) validateCredentials(
) error { ) error {
if query.Get("user") != pathUser || if query.Get("user") != pathUser ||
query.Get("pass") != pathPass { query.Get("pass") != pathPass {
return readPublisherErrAuthCritical{ return pathErrAuthCritical{
Message: "wrong username or password", Message: "wrong username or password",
} }
} }
@@ -505,12 +506,19 @@ func (c *rtmpConn) validateCredentials(
return nil return nil
} }
// OnReaderAccepted implements readPublisher. // OnReaderAccepted implements reader.
func (c *rtmpConn) OnReaderAccepted() { func (c *rtmpConn) OnReaderAccepted() {
c.log(logger.Info, "is reading from path '%s'", c.path.Name()) c.log(logger.Info, "is reading from path '%s'", c.path.Name())
} }
// OnPublisherAccepted implements readPublisher. // OnReaderFrame implements reader.
func (c *rtmpConn) OnReaderFrame(trackID int, streamType gortsplib.StreamType, payload []byte) {
if streamType == gortsplib.StreamTypeRTP {
c.ringBuffer.Push(rtmpConnTrackIDPayloadPair{trackID, payload})
}
}
// OnPublisherAccepted implements publisher.
func (c *rtmpConn) OnPublisherAccepted(tracksLen int) { func (c *rtmpConn) OnPublisherAccepted(tracksLen int) {
c.log(logger.Info, "is publishing to path '%s', %d %s", c.log(logger.Info, "is publishing to path '%s', %d %s",
c.path.Name(), c.path.Name(),
@@ -522,10 +530,3 @@ func (c *rtmpConn) OnPublisherAccepted(tracksLen int) {
return "tracks" return "tracks"
}()) }())
} }
// OnFrame implements readPublisher.
func (c *rtmpConn) OnFrame(trackID int, streamType gortsplib.StreamType, payload []byte) {
if streamType == gortsplib.StreamTypeRTP {
c.ringBuffer.Push(rtmpConnTrackIDPayloadPair{trackID, payload})
}
}

View File

@@ -24,8 +24,8 @@ const (
type rtmpSourceParent interface { type rtmpSourceParent interface {
Log(logger.Level, string, ...interface{}) Log(logger.Level, string, ...interface{})
OnSourceExternalSetReady(req sourceExtSetReadyReq) OnSourceStaticSetReady(req pathSourceStaticSetReadyReq)
OnSourceExternalSetNotReady(req sourceExtSetNotReadyReq) OnSourceStaticSetNotReady(req pathSourceStaticSetNotReadyReq)
OnSourceFrame(int, gortsplib.StreamType, []byte) OnSourceFrame(int, gortsplib.StreamType, []byte)
} }
@@ -81,8 +81,8 @@ func (s *rtmpSource) Close() {
// IsSource implements source. // IsSource implements source.
func (s *rtmpSource) IsSource() {} func (s *rtmpSource) IsSource() {}
// IsSourceExternal implements sourceExternal. // IsSourceStatic implements sourceStatic.
func (s *rtmpSource) IsSourceExternal() {} func (s *rtmpSource) IsSourceStatic() {}
func (s *rtmpSource) log(level logger.Level, format string, args ...interface{}) { func (s *rtmpSource) log(level logger.Level, format string, args ...interface{}) {
s.parent.Log(level, "[rtmp source] "+format, args...) s.parent.Log(level, "[rtmp source] "+format, args...)
@@ -168,12 +168,12 @@ func (s *rtmpSource) runInner() bool {
s.log(logger.Info, "ready") s.log(logger.Info, "ready")
s.parent.OnSourceExternalSetReady(sourceExtSetReadyReq{ s.parent.OnSourceStaticSetReady(pathSourceStaticSetReadyReq{
Tracks: tracks, Tracks: tracks,
}) })
defer func() { defer func() {
s.parent.OnSourceExternalSetNotReady(sourceExtSetNotReadyReq{}) s.parent.OnSourceStaticSetNotReady(pathSourceStaticSetNotReadyReq{})
}() }()
rtcpSenders := rtcpsenderset.New(tracks, s.parent.OnSourceFrame) rtcpSenders := rtcpsenderset.New(tracks, s.parent.OnSourceFrame)

View File

@@ -148,7 +148,7 @@ func (c *rtspConn) validateCredentials(
// 4) with password and username // 4) with password and username
// therefore we must allow up to 3 failures // therefore we must allow up to 3 failures
if c.authFailures > 3 { if c.authFailures > 3 {
return readPublisherErrAuthCritical{ return pathErrAuthCritical{
Message: "unauthorized: " + err.Error(), Message: "unauthorized: " + err.Error(),
Response: &base.Response{ Response: &base.Response{
StatusCode: base.StatusUnauthorized, StatusCode: base.StatusUnauthorized,
@@ -160,7 +160,7 @@ func (c *rtspConn) validateCredentials(
c.log(logger.Debug, "WARN: unauthorized: %s", err) c.log(logger.Debug, "WARN: unauthorized: %s", err)
} }
return readPublisherErrAuthNotCritical{ return pathErrAuthNotCritical{
Response: &base.Response{ Response: &base.Response{
StatusCode: base.StatusUnauthorized, StatusCode: base.StatusUnauthorized,
Header: base.Header{ Header: base.Header{
@@ -188,7 +188,7 @@ func (c *rtspConn) OnResponse(res *base.Response) {
// OnDescribe is called by rtspServer. // OnDescribe is called by rtspServer.
func (c *rtspConn) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, *gortsplib.ServerStream, error) { func (c *rtspConn) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, *gortsplib.ServerStream, error) {
res := c.pathManager.OnReadPublisherDescribe(readPublisherDescribeReq{ res := c.pathManager.OnDescribe(pathDescribeReq{
PathName: ctx.Path, PathName: ctx.Path,
URL: ctx.Req.URL, URL: ctx.Req.URL,
IP: c.ip(), IP: c.ip(),
@@ -199,16 +199,16 @@ func (c *rtspConn) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.
if res.Err != nil { if res.Err != nil {
switch terr := res.Err.(type) { switch terr := res.Err.(type) {
case readPublisherErrAuthNotCritical: case pathErrAuthNotCritical:
return terr.Response, nil, nil return terr.Response, nil, nil
case readPublisherErrAuthCritical: case pathErrAuthCritical:
// wait some seconds to stop brute force attacks // wait some seconds to stop brute force attacks
<-time.After(rtspConnPauseAfterAuthError) <-time.After(rtspConnPauseAfterAuthError)
return terr.Response, nil, errors.New(terr.Message) return terr.Response, nil, errors.New(terr.Message)
case readPublisherErrNoOnePublishing: case pathErrNoOnePublishing:
return &base.Response{ return &base.Response{
StatusCode: base.StatusNotFound, StatusCode: base.StatusNotFound,
}, nil, res.Err }, nil, res.Err

View File

@@ -66,8 +66,13 @@ func (s *rtspSession) ParentClose() {
} }
} }
if s.path != nil { switch s.ss.State() {
s.path.OnReadPublisherRemove(readPublisherRemoveReq{Author: s}) case gortsplib.ServerSessionStatePrePlay, gortsplib.ServerSessionStatePlay:
s.path.OnReaderRemove(pathReaderRemoveReq{Author: s})
s.path = nil
case gortsplib.ServerSessionStatePreRecord, gortsplib.ServerSessionStateRecord:
s.path.OnPublisherRemove(pathPublisherRemoveReq{Author: s})
s.path = nil s.path = nil
} }
@@ -79,13 +84,10 @@ func (s *rtspSession) Close() {
s.ss.Close() s.ss.Close()
} }
// IsReadPublisher implements readPublisher.
func (s *rtspSession) IsReadPublisher() {}
// IsSource implements source. // IsSource implements source.
func (s *rtspSession) IsSource() {} func (s *rtspSession) IsSource() {}
// IsRTSPSession implements path.rtspSession. // IsRTSPSession implements pathRTSPSession.
func (s *rtspSession) IsRTSPSession() {} func (s *rtspSession) IsRTSPSession() {}
// VisualID returns the visual ID of the session. // VisualID returns the visual ID of the session.
@@ -106,7 +108,7 @@ func (s *rtspSession) log(level logger.Level, format string, args ...interface{}
// OnAnnounce is called by rtspServer. // OnAnnounce is called by rtspServer.
func (s *rtspSession) OnAnnounce(c *rtspConn, ctx *gortsplib.ServerHandlerOnAnnounceCtx) (*base.Response, error) { func (s *rtspSession) OnAnnounce(c *rtspConn, ctx *gortsplib.ServerHandlerOnAnnounceCtx) (*base.Response, error) {
res := s.pathManager.OnReadPublisherAnnounce(readPublisherAnnounceReq{ res := s.pathManager.OnPublisherAnnounce(pathPublisherAnnounceReq{
Author: s, Author: s,
PathName: ctx.Path, PathName: ctx.Path,
Tracks: ctx.Tracks, Tracks: ctx.Tracks,
@@ -118,10 +120,10 @@ func (s *rtspSession) OnAnnounce(c *rtspConn, ctx *gortsplib.ServerHandlerOnAnno
if res.Err != nil { if res.Err != nil {
switch terr := res.Err.(type) { switch terr := res.Err.(type) {
case readPublisherErrAuthNotCritical: case pathErrAuthNotCritical:
return terr.Response, nil return terr.Response, nil
case readPublisherErrAuthCritical: case pathErrAuthCritical:
// wait some seconds to stop brute force attacks // wait some seconds to stop brute force attacks
<-time.After(pauseAfterAuthError) <-time.After(pauseAfterAuthError)
@@ -165,7 +167,7 @@ func (s *rtspSession) OnSetup(c *rtspConn, ctx *gortsplib.ServerHandlerOnSetupCt
switch s.ss.State() { switch s.ss.State() {
case gortsplib.ServerSessionStateInitial, gortsplib.ServerSessionStatePrePlay: // play case gortsplib.ServerSessionStateInitial, gortsplib.ServerSessionStatePrePlay: // play
res := s.pathManager.OnReadPublisherSetupPlay(readPublisherSetupPlayReq{ res := s.pathManager.OnReaderSetupPlay(pathReaderSetupPlayReq{
Author: s, Author: s,
PathName: ctx.Path, PathName: ctx.Path,
IP: ctx.Conn.NetConn().RemoteAddr().(*net.TCPAddr).IP, IP: ctx.Conn.NetConn().RemoteAddr().(*net.TCPAddr).IP,
@@ -176,16 +178,16 @@ func (s *rtspSession) OnSetup(c *rtspConn, ctx *gortsplib.ServerHandlerOnSetupCt
if res.Err != nil { if res.Err != nil {
switch terr := res.Err.(type) { switch terr := res.Err.(type) {
case readPublisherErrAuthNotCritical: case pathErrAuthNotCritical:
return terr.Response, nil, nil return terr.Response, nil, nil
case readPublisherErrAuthCritical: case pathErrAuthCritical:
// wait some seconds to stop brute force attacks // wait some seconds to stop brute force attacks
<-time.After(pauseAfterAuthError) <-time.After(pauseAfterAuthError)
return terr.Response, nil, errors.New(terr.Message) return terr.Response, nil, errors.New(terr.Message)
case readPublisherErrNoOnePublishing: case pathErrNoOnePublishing:
return &base.Response{ return &base.Response{
StatusCode: base.StatusNotFound, StatusCode: base.StatusNotFound,
}, nil, res.Err }, nil, res.Err
@@ -232,9 +234,7 @@ func (s *rtspSession) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Respo
}, fmt.Errorf("path has changed, was '%s', now is '%s'", s.path.Name(), ctx.Path) }, fmt.Errorf("path has changed, was '%s', now is '%s'", s.path.Name(), ctx.Path)
} }
s.path.OnReadPublisherPlay(readPublisherPlayReq{ s.path.OnReaderPlay(pathReaderPlayReq{Author: s})
Author: s,
})
if s.path.Conf().RunOnRead != "" { if s.path.Conf().RunOnRead != "" {
_, port, _ := net.SplitHostPort(s.rtspAddress) _, port, _ := net.SplitHostPort(s.rtspAddress)
@@ -259,7 +259,7 @@ func (s *rtspSession) OnRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.R
}, fmt.Errorf("path has changed, was '%s', now is '%s'", s.path.Name(), ctx.Path) }, fmt.Errorf("path has changed, was '%s', now is '%s'", s.path.Name(), ctx.Path)
} }
res := s.path.OnReadPublisherRecord(readPublisherRecordReq{Author: s}) res := s.path.OnPublisherRecord(pathPublisherRecordReq{Author: s})
if res.Err != nil { if res.Err != nil {
return &base.Response{ return &base.Response{
StatusCode: base.StatusBadRequest, StatusCode: base.StatusBadRequest,
@@ -279,10 +279,10 @@ func (s *rtspSession) OnPause(ctx *gortsplib.ServerHandlerOnPauseCtx) (*base.Res
s.onReadCmd.Close() s.onReadCmd.Close()
} }
s.path.OnReadPublisherPause(readPublisherPauseReq{Author: s}) s.path.OnReaderPause(pathReaderPauseReq{Author: s})
case gortsplib.ServerSessionStateRecord: case gortsplib.ServerSessionStateRecord:
s.path.OnReadPublisherPause(readPublisherPauseReq{Author: s}) s.path.OnPublisherPause(pathPublisherPauseReq{Author: s})
} }
return &base.Response{ return &base.Response{
@@ -290,7 +290,7 @@ func (s *rtspSession) OnPause(ctx *gortsplib.ServerHandlerOnPauseCtx) (*base.Res
}, nil }, nil
} }
// OnReaderAccepted implements readPublisher. // OnReaderAccepted implements reader.
func (s *rtspSession) OnReaderAccepted() { func (s *rtspSession) OnReaderAccepted() {
tracksLen := len(s.ss.SetuppedTracks()) tracksLen := len(s.ss.SetuppedTracks())
@@ -306,7 +306,12 @@ func (s *rtspSession) OnReaderAccepted() {
s.displayedProtocol()) s.displayedProtocol())
} }
// OnPublisherAccepted implements readPublisher. // OnReaderFrame implements reader.
func (s *rtspSession) OnReaderFrame(trackID int, streamType gortsplib.StreamType, payload []byte) {
s.ss.WriteFrame(trackID, streamType, payload)
}
// OnPublisherAccepted implements publisher.
func (s *rtspSession) OnPublisherAccepted(tracksLen int) { func (s *rtspSession) OnPublisherAccepted(tracksLen int) {
s.log(logger.Info, "is publishing to path '%s', %d %s with %s", s.log(logger.Info, "is publishing to path '%s', %d %s with %s",
s.path.Name(), s.path.Name(),
@@ -320,11 +325,6 @@ func (s *rtspSession) OnPublisherAccepted(tracksLen int) {
s.displayedProtocol()) s.displayedProtocol())
} }
// OnFrame implements readPublisher.
func (s *rtspSession) OnFrame(trackID int, streamType gortsplib.StreamType, payload []byte) {
s.ss.WriteFrame(trackID, streamType, payload)
}
// OnIncomingFrame is called by rtspServer. // OnIncomingFrame is called by rtspServer.
func (s *rtspSession) OnIncomingFrame(ctx *gortsplib.ServerHandlerOnFrameCtx) { func (s *rtspSession) OnIncomingFrame(ctx *gortsplib.ServerHandlerOnFrameCtx) {
if s.ss.State() != gortsplib.ServerSessionStateRecord { if s.ss.State() != gortsplib.ServerSessionStateRecord {

View File

@@ -23,8 +23,8 @@ const (
type rtspSourceParent interface { type rtspSourceParent interface {
Log(logger.Level, string, ...interface{}) Log(logger.Level, string, ...interface{})
OnSourceExternalSetReady(req sourceExtSetReadyReq) OnSourceStaticSetReady(req pathSourceStaticSetReadyReq)
OnSourceExternalSetNotReady(req sourceExtSetNotReadyReq) OnSourceStaticSetNotReady(req pathSourceStaticSetNotReadyReq)
OnSourceFrame(int, gortsplib.StreamType, []byte) OnSourceFrame(int, gortsplib.StreamType, []byte)
} }
@@ -94,8 +94,8 @@ func (s *rtspSource) Close() {
// IsSource implements source. // IsSource implements source.
func (s *rtspSource) IsSource() {} func (s *rtspSource) IsSource() {}
// IsSourceExternal implements sourceExternal. // IsSourceStatic implements sourceStatic.
func (s *rtspSource) IsSourceExternal() {} func (s *rtspSource) IsSourceStatic() {}
func (s *rtspSource) log(level logger.Level, format string, args ...interface{}) { func (s *rtspSource) log(level logger.Level, format string, args ...interface{}) {
s.parent.Log(level, "[rtsp source] "+format, args...) s.parent.Log(level, "[rtsp source] "+format, args...)
@@ -187,12 +187,12 @@ func (s *rtspSource) runInner() bool {
s.log(logger.Info, "ready") s.log(logger.Info, "ready")
s.parent.OnSourceExternalSetReady(sourceExtSetReadyReq{ s.parent.OnSourceStaticSetReady(pathSourceStaticSetReadyReq{
Tracks: conn.Tracks(), Tracks: conn.Tracks(),
}) })
defer func() { defer func() {
s.parent.OnSourceExternalSetNotReady(sourceExtSetNotReadyReq{}) s.parent.OnSourceStaticSetNotReady(pathSourceStaticSetNotReadyReq{})
}() }()
readErr := make(chan error) readErr := make(chan error)

View File

@@ -1,26 +1,13 @@
package core package core
import ( // source is an entity that can provide a stream, statically or dynamically.
"github.com/aler9/gortsplib"
)
type source interface { type source interface {
IsSource() IsSource()
} }
type sourceExternal interface { // sourceStatic is an entity that can provide a static stream.
IsSource() type sourceStatic interface {
IsSourceExternal() source
IsSourceStatic()
Close() Close()
} }
type sourceExtSetReadyRes struct{}
type sourceExtSetReadyReq struct {
Tracks gortsplib.Tracks
Res chan sourceExtSetReadyRes
}
type sourceExtSetNotReadyReq struct {
Res chan struct{}
}