add option sourceOnDemand to pull sources only when there are connected clients (#36)

This commit is contained in:
aler9
2020-07-29 23:30:42 +02:00
parent df10e8898f
commit d7d2ba38f1
10 changed files with 543 additions and 378 deletions

View File

@@ -74,9 +74,10 @@ paths:
# readUser: test
# readPass: tast
# proxied:
# source: rtsp://localhost:8554/mystream
# sourceProtocol: tcp
proxied:
source: rtsp://192.168.2.198:8554/stream
sourceProtocol: tcp
sourceOnDemand: yes
# original:
# runOnPublish: ffmpeg -i rtsp://localhost:8554/original -b:a 64k -c:v libx264 -preset ultrafast -b:v 500k -max_muxing_queue_size 1024 -f rtsp rtsp://localhost:8554/compressed

View File

@@ -71,7 +71,7 @@ docker run --rm -it -v $PWD/rtsp-simple-server.yml:/rtsp-simple-server.yml -p 85
#### Full configuration file
To change the configuration, it's enough to edit the `rtsp-simple-server.yml` file, provided with the executable. The default configuration is [available here](rtsp-simple-server.yml).
To see or change the configuration, edit the `rtsp-simple-server.yml` file, provided with the executable. The default configuration is [available here](rtsp-simple-server.yml).
#### Usage as RTSP Proxy

View File

@@ -23,25 +23,26 @@ const (
clientUdpWriteBufferSize = 128 * 1024
)
type serverClientTrack struct {
type clientTrack struct {
rtpPort int
rtcpPort int
}
type serverClientEvent interface {
type clientEvent interface {
isServerClientEvent()
}
type serverClientEventFrameTcp struct {
type clientEventFrameTcp struct {
frame *gortsplib.InterleavedFrame
}
func (serverClientEventFrameTcp) isServerClientEvent() {}
func (clientEventFrameTcp) isServerClientEvent() {}
type serverClientState int
type clientState int
const (
clientStateStarting serverClientState = iota
clientStateInitial clientState = iota
clientStateWaitingDescription
clientStateAnnounce
clientStatePrePlay
clientStatePlay
@@ -49,10 +50,10 @@ const (
clientStateRecord
)
func (cs serverClientState) String() string {
func (cs clientState) String() string {
switch cs {
case clientStateStarting:
return "STARTING"
case clientStateInitial:
return "INITIAL"
case clientStateAnnounce:
return "ANNOUNCE"
@@ -72,36 +73,35 @@ func (cs serverClientState) String() string {
return "UNKNOWN"
}
type serverClient struct {
type client struct {
p *program
conn *gortsplib.ConnServer
state serverClientState
state clientState
path string
authUser string
authPass string
authHelper *gortsplib.AuthServer
authFailures int
streamSdpText []byte // only if publisher
streamSdpParsed *sdp.SessionDescription // only if publisher
streamProtocol gortsplib.StreamProtocol
streamTracks []*serverClientTrack
streamTracks []*clientTrack
rtcpReceivers []*gortsplib.RtcpReceiver
readBuf *doubleBuffer
writeBuf *doubleBuffer
events chan serverClientEvent // only if state = Play and gortsplib.StreamProtocol = TCP
describeRes chan []byte
events chan clientEvent // only if state = Play and gortsplib.StreamProtocol = TCP
done chan struct{}
}
func newServerClient(p *program, nconn net.Conn) *serverClient {
c := &serverClient{
func newServerClient(p *program, nconn net.Conn) *client {
c := &client{
p: p,
conn: gortsplib.NewConnServer(gortsplib.ConnServerConf{
Conn: nconn,
ReadTimeout: p.conf.ReadTimeout,
WriteTimeout: p.conf.WriteTimeout,
}),
state: clientStateStarting,
state: clientStateInitial,
readBuf: newDoubleBuffer(clientTcpReadBufferSize),
done: make(chan struct{}),
}
@@ -110,31 +110,21 @@ func newServerClient(p *program, nconn net.Conn) *serverClient {
return c
}
func (c *serverClient) log(format string, args ...interface{}) {
func (c *client) log(format string, args ...interface{}) {
c.p.log("[client %s] "+format, append([]interface{}{c.conn.NetConn().RemoteAddr().String()}, args...)...)
}
func (c *serverClient) ip() net.IP {
func (c *client) isPublisher() {}
func (c *client) ip() net.IP {
return c.conn.NetConn().RemoteAddr().(*net.TCPAddr).IP
}
func (c *serverClient) zone() string {
func (c *client) zone() string {
return c.conn.NetConn().RemoteAddr().(*net.TCPAddr).Zone
}
func (c *serverClient) publisherIsReady() bool {
return c.state == clientStateRecord
}
func (c *serverClient) publisherSdpText() []byte {
return c.streamSdpText
}
func (c *serverClient) publisherSdpParsed() *sdp.SessionDescription {
return c.streamSdpParsed
}
func (c *serverClient) run() {
func (c *client) run() {
var runOnConnectCmd *exec.Cmd
if c.p.conf.RunOnConnect != "" {
runOnConnectCmd = exec.Command("/bin/sh", "-c", c.p.conf.RunOnConnect)
@@ -176,12 +166,7 @@ outer:
close(c.done) // close() never blocks
}
func (c *serverClient) close() {
c.conn.NetConn().Close()
<-c.done
}
func (c *serverClient) writeResError(req *gortsplib.Request, code gortsplib.StatusCode, err error) {
func (c *client) writeResError(req *gortsplib.Request, code gortsplib.StatusCode, err error) {
c.log("ERR: %s", err)
header := gortsplib.Header{}
@@ -195,22 +180,10 @@ func (c *serverClient) writeResError(req *gortsplib.Request, code gortsplib.Stat
})
}
func (c *serverClient) findConfForPath(path string) *ConfPath {
if pconf, ok := c.p.conf.Paths[path]; ok {
return pconf
}
if pconf, ok := c.p.conf.Paths["all"]; ok {
return pconf
}
return nil
}
var errAuthCritical = errors.New("auth critical")
var errAuthNotCritical = errors.New("auth not critical")
func (c *serverClient) authenticate(ips []interface{}, user string, pass string, req *gortsplib.Request) error {
func (c *client) authenticate(ips []interface{}, user string, pass string, req *gortsplib.Request) error {
// validate ip
err := func() error {
if ips == nil {
@@ -288,7 +261,7 @@ func (c *serverClient) authenticate(ips []interface{}, user string, pass string,
return nil
}
func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
func (c *client) handleRequest(req *gortsplib.Request) bool {
c.log(string(req.Method))
cseq, ok := req.Header["CSeq"]
@@ -315,9 +288,6 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
switch req.Method {
case gortsplib.OPTIONS:
// do not check state, since OPTIONS can be requested
// in any state
c.conn.WriteResponse(&gortsplib.Response{
StatusCode: gortsplib.StatusOK,
Header: gortsplib.Header{
@@ -335,13 +305,13 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
return true
case gortsplib.DESCRIBE:
if c.state != clientStateStarting {
if c.state != clientStateInitial {
c.writeResError(req, gortsplib.StatusBadRequest,
fmt.Errorf("client is in state '%s' instead of '%s'", c.state, clientStateStarting))
fmt.Errorf("client is in state '%s' instead of '%s'", c.state, clientStateInitial))
return false
}
pconf := c.findConfForPath(path)
pconf := c.p.findConfForPath(path)
if pconf == nil {
c.writeResError(req, gortsplib.StatusBadRequest,
fmt.Errorf("unable to find a valid configuration for path '%s'", path))
@@ -356,9 +326,9 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
return true
}
res := make(chan []byte)
c.p.events <- programEventClientDescribe{path, res}
sdp := <-res
c.describeRes = make(chan []byte)
c.p.events <- programEventClientDescribe{c, path}
sdp := <-c.describeRes
if sdp == nil {
c.writeResError(req, gortsplib.StatusNotFound, fmt.Errorf("no one is publishing on path '%s'", path))
return false
@@ -376,9 +346,9 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
return true
case gortsplib.ANNOUNCE:
if c.state != clientStateStarting {
if c.state != clientStateInitial {
c.writeResError(req, gortsplib.StatusBadRequest,
fmt.Errorf("client is in state '%s' instead of '%s'", c.state, clientStateStarting))
fmt.Errorf("client is in state '%s' instead of '%s'", c.state, clientStateInitial))
return false
}
@@ -387,7 +357,7 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
return false
}
pconf := c.findConfForPath(path)
pconf := c.p.findConfForPath(path)
if pconf == nil {
c.writeResError(req, gortsplib.StatusBadRequest,
fmt.Errorf("unable to find a valid configuration for path '%s'", path))
@@ -435,16 +405,13 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
sdpParsed, req.Content = sdpForServer(tracks)
res := make(chan error)
c.p.events <- programEventClientAnnounce{res, c, path}
c.p.events <- programEventClientAnnounce{res, c, path, req.Content, sdpParsed}
err = <-res
if err != nil {
c.writeResError(req, gortsplib.StatusBadRequest, err)
return false
}
c.streamSdpText = req.Content
c.streamSdpParsed = sdpParsed
c.conn.WriteResponse(&gortsplib.Response{
StatusCode: gortsplib.StatusOK,
Header: gortsplib.Header{
@@ -467,8 +434,8 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
switch c.state {
// play
case clientStateStarting, clientStatePrePlay:
pconf := c.findConfForPath(path)
case clientStateInitial, clientStatePrePlay:
pconf := c.p.findConfForPath(path)
if pconf == nil {
c.writeResError(req, gortsplib.StatusBadRequest,
fmt.Errorf("unable to find a valid configuration for path '%s'", path))
@@ -626,7 +593,7 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
return false
}
if len(c.streamTracks) >= len(c.streamSdpParsed.MediaDescriptions) {
if len(c.streamTracks) >= len(c.p.paths[c.path].publisherSdpParsed.MediaDescriptions) {
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("all the tracks have already been setup"))
return false
}
@@ -678,7 +645,7 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
return false
}
if len(c.streamTracks) >= len(c.streamSdpParsed.MediaDescriptions) {
if len(c.streamTracks) >= len(c.p.paths[c.path].publisherSdpParsed.MediaDescriptions) {
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("all the tracks have already been setup"))
return false
}
@@ -762,7 +729,7 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
return false
}
if len(c.streamTracks) != len(c.streamSdpParsed.MediaDescriptions) {
if len(c.streamTracks) != len(c.p.paths[c.path].publisherSdpParsed.MediaDescriptions) {
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("not all tracks have been setup"))
return false
}
@@ -788,12 +755,12 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
}
}
func (c *serverClient) runPlay(path string) {
pconf := c.findConfForPath(path)
func (c *client) runPlay(path string) {
pconf := c.p.findConfForPath(path)
if c.streamProtocol == gortsplib.StreamProtocolTcp {
c.writeBuf = newDoubleBuffer(clientTcpWriteBufferSize)
c.events = make(chan serverClientEvent)
c.events = make(chan clientEvent)
}
done := make(chan struct{})
@@ -863,7 +830,7 @@ func (c *serverClient) runPlay(path string) {
case rawEvt := <-c.events:
switch evt := rawEvt.(type) {
case serverClientEventFrameTcp:
case clientEventFrameTcp:
c.conn.WriteFrame(evt.frame)
}
}
@@ -887,8 +854,8 @@ func (c *serverClient) runPlay(path string) {
}
}
func (c *serverClient) runRecord(path string) {
pconf := c.findConfForPath(path)
func (c *client) runRecord(path string) {
pconf := c.p.findConfForPath(path)
c.rtcpReceivers = make([]*gortsplib.RtcpReceiver, len(c.streamTracks))
for trackId := range c.streamTracks {

View File

@@ -12,11 +12,12 @@ import (
"gopkg.in/yaml.v2"
)
type ConfPath struct {
type confPath struct {
Source string `yaml:"source"`
sourceUrl *url.URL
SourceProtocol string `yaml:"sourceProtocol"`
sourceProtocolParsed gortsplib.StreamProtocol
SourceOnDemand bool `yaml:"sourceOnDemand"`
PublishUser string `yaml:"publishUser"`
PublishPass string `yaml:"publishPass"`
PublishIps []string `yaml:"publishIps"`
@@ -41,7 +42,7 @@ type conf struct {
AuthMethods []string `yaml:"authMethods"`
authMethodsParsed []gortsplib.AuthMethod
Pprof bool `yaml:"pprof"`
Paths map[string]*ConfPath `yaml:"paths"`
Paths map[string]*confPath `yaml:"paths"`
}
func loadConf(fpath string, stdin io.Reader) (*conf, error) {
@@ -142,14 +143,14 @@ func loadConf(fpath string, stdin io.Reader) (*conf, error) {
}
if len(conf.Paths) == 0 {
conf.Paths = map[string]*ConfPath{
conf.Paths = map[string]*confPath{
"all": {},
}
}
for path, pconf := range conf.Paths {
if pconf == nil {
conf.Paths[path] = &ConfPath{}
conf.Paths[path] = &confPath{}
pconf = conf.Paths[path]
}

211
main.go
View File

@@ -8,6 +8,7 @@ import (
"net/http"
_ "net/http/pprof"
"os"
"time"
"github.com/aler9/gortsplib"
"github.com/aler9/sdp/v3"
@@ -28,29 +29,31 @@ func (programEventClientNew) isProgramEvent() {}
type programEventClientClose struct {
done chan struct{}
client *serverClient
client *client
}
func (programEventClientClose) isProgramEvent() {}
type programEventClientDescribe struct {
client *client
path string
res chan []byte
}
func (programEventClientDescribe) isProgramEvent() {}
type programEventClientAnnounce struct {
res chan error
client *serverClient
client *client
path string
sdpText []byte
sdpParsed *sdp.SessionDescription
}
func (programEventClientAnnounce) isProgramEvent() {}
type programEventClientSetupPlay struct {
res chan error
client *serverClient
client *client
path string
protocol gortsplib.StreamProtocol
rtpPort int
@@ -61,7 +64,7 @@ func (programEventClientSetupPlay) isProgramEvent() {}
type programEventClientSetupRecord struct {
res chan error
client *serverClient
client *client
protocol gortsplib.StreamProtocol
rtpPort int
rtcpPort int
@@ -71,35 +74,35 @@ func (programEventClientSetupRecord) isProgramEvent() {}
type programEventClientPlay1 struct {
res chan error
client *serverClient
client *client
}
func (programEventClientPlay1) isProgramEvent() {}
type programEventClientPlay2 struct {
done chan struct{}
client *serverClient
client *client
}
func (programEventClientPlay2) isProgramEvent() {}
type programEventClientPlayStop struct {
done chan struct{}
client *serverClient
client *client
}
func (programEventClientPlayStop) isProgramEvent() {}
type programEventClientRecord struct {
done chan struct{}
client *serverClient
client *client
}
func (programEventClientRecord) isProgramEvent() {}
type programEventClientRecordStop struct {
done chan struct{}
client *serverClient
client *client
}
func (programEventClientRecordStop) isProgramEvent() {}
@@ -121,46 +124,45 @@ type programEventClientFrameTcp struct {
func (programEventClientFrameTcp) isProgramEvent() {}
type programEventStreamerReady struct {
type programEventSourceReady struct {
source *source
}
func (programEventStreamerReady) isProgramEvent() {}
func (programEventSourceReady) isProgramEvent() {}
type programEventStreamerNotReady struct {
type programEventSourceNotReady struct {
source *source
}
func (programEventStreamerNotReady) isProgramEvent() {}
func (programEventSourceNotReady) isProgramEvent() {}
type programEventStreamerFrame struct {
type programEventSourceFrame struct {
source *source
trackId int
streamType gortsplib.StreamType
buf []byte
}
func (programEventStreamerFrame) isProgramEvent() {}
func (programEventSourceFrame) isProgramEvent() {}
type programEventSourceReset struct {
source *source
}
func (programEventSourceReset) isProgramEvent() {}
type programEventTerminate struct{}
func (programEventTerminate) isProgramEvent() {}
// a publisher can be either a serverClient or a source
type publisher interface {
publisherIsReady() bool
publisherSdpText() []byte
publisherSdpParsed() *sdp.SessionDescription
}
type program struct {
conf *conf
rtspl *serverTcpListener
rtpl *serverUdpListener
rtcpl *serverUdpListener
clients map[*serverClient]struct{}
rtspl *serverTcp
rtpl *serverUdp
rtcpl *serverUdp
sources []*source
publishers map[string]publisher
clients map[*client]struct{}
paths map[string]*path
publisherCount int
readerCount int
@@ -189,17 +191,17 @@ func newProgram(args []string, stdin io.Reader) (*program, error) {
p := &program{
conf: conf,
clients: make(map[*serverClient]struct{}),
publishers: make(map[string]publisher),
clients: make(map[*client]struct{}),
paths: make(map[string]*path),
events: make(chan programEvent),
done: make(chan struct{}),
}
for path, pconf := range conf.Paths {
if pconf.Source != "record" {
s := newSource(p, path, pconf.sourceUrl, pconf.sourceProtocolParsed)
s := newSource(p, path, pconf)
p.sources = append(p.sources, s)
p.publishers[path] = s
p.paths[path] = newPath(p, path, s)
}
}
@@ -217,17 +219,17 @@ func newProgram(args []string, stdin io.Reader) (*program, error) {
http.DefaultServeMux = http.NewServeMux()
}
p.rtpl, err = newServerUdpListener(p, conf.RtpPort, gortsplib.StreamTypeRtp)
p.rtpl, err = newServerUdp(p, conf.RtpPort, gortsplib.StreamTypeRtp)
if err != nil {
return nil, err
}
p.rtcpl, err = newServerUdpListener(p, conf.RtcpPort, gortsplib.StreamTypeRtcp)
p.rtcpl, err = newServerUdp(p, conf.RtcpPort, gortsplib.StreamTypeRtcp)
if err != nil {
return nil, err
}
p.rtspl, err = newServerTcpListener(p)
p.rtspl, err = newServerTcp(p)
if err != nil {
return nil, err
}
@@ -249,8 +251,18 @@ func (p *program) log(format string, args ...interface{}) {
}
func (p *program) run() {
checkPathsTicker := time.NewTicker(5 * time.Second)
defer checkPathsTicker.Stop()
outer:
for rawEvt := range p.events {
for {
select {
case <-checkPathsTicker.C:
for _, path := range p.paths {
path.check()
}
case rawEvt := <-p.events:
switch evt := rawEvt.(type) {
case programEventClientNew:
c := newServerClient(p, evt.nconn)
@@ -258,17 +270,17 @@ outer:
c.log("connected")
case programEventClientClose:
// already deleted
if _, ok := p.clients[evt.client]; !ok {
close(evt.done)
continue
}
delete(p.clients, evt.client)
if evt.client.path != "" {
if pub, ok := p.publishers[evt.client.path]; ok && pub == evt.client {
delete(p.publishers, evt.client.path)
if path, ok := p.paths[evt.client.path]; ok {
// if this is a publisher
if path.publisher == evt.client {
path.publisherReset()
// delete the path
delete(p.paths, evt.client.path)
}
}
}
@@ -276,16 +288,26 @@ outer:
close(evt.done)
case programEventClientDescribe:
pub, ok := p.publishers[evt.path]
if !ok || !pub.publisherIsReady() {
evt.res <- nil
path, ok := p.paths[evt.path]
// no path: return 404
if !ok {
evt.client.describeRes <- nil
continue
}
evt.res <- pub.publisherSdpText()
sdpText, wait := path.describe()
if wait {
evt.client.path = evt.path
evt.client.state = clientStateWaitingDescription
continue
}
evt.client.describeRes <- sdpText
case programEventClientAnnounce:
_, ok := p.publishers[evt.path]
_, ok := p.paths[evt.path]
if ok {
evt.res <- fmt.Errorf("someone is already publishing on path '%s'", evt.path)
continue
@@ -293,26 +315,26 @@ outer:
evt.client.path = evt.path
evt.client.state = clientStateAnnounce
p.publishers[evt.path] = evt.client
p.paths[evt.path] = newPath(p, evt.path, evt.client)
p.paths[evt.path].publisherSdpText = evt.sdpText
p.paths[evt.path].publisherSdpParsed = evt.sdpParsed
evt.res <- nil
case programEventClientSetupPlay:
pub, ok := p.publishers[evt.path]
if !ok || !pub.publisherIsReady() {
evt.res <- fmt.Errorf("no one is streaming on path '%s'", evt.path)
path, ok := p.paths[evt.path]
if !ok || !path.publisherReady {
evt.res <- fmt.Errorf("no one is publishing on path '%s'", evt.path)
continue
}
sdpParsed := pub.publisherSdpParsed()
if len(evt.client.streamTracks) >= len(sdpParsed.MediaDescriptions) {
if len(evt.client.streamTracks) >= len(path.publisherSdpParsed.MediaDescriptions) {
evt.res <- fmt.Errorf("all the tracks have already been setup")
continue
}
evt.client.path = evt.path
evt.client.streamProtocol = evt.protocol
evt.client.streamTracks = append(evt.client.streamTracks, &serverClientTrack{
evt.client.streamTracks = append(evt.client.streamTracks, &clientTrack{
rtpPort: evt.rtpPort,
rtcpPort: evt.rtcpPort,
})
@@ -321,7 +343,7 @@ outer:
case programEventClientSetupRecord:
evt.client.streamProtocol = evt.protocol
evt.client.streamTracks = append(evt.client.streamTracks, &serverClientTrack{
evt.client.streamTracks = append(evt.client.streamTracks, &clientTrack{
rtpPort: evt.rtpPort,
rtcpPort: evt.rtcpPort,
})
@@ -329,15 +351,13 @@ outer:
evt.res <- nil
case programEventClientPlay1:
pub, ok := p.publishers[evt.client.path]
if !ok || !pub.publisherIsReady() {
evt.res <- fmt.Errorf("no one is streaming on path '%s'", evt.client.path)
path, ok := p.paths[evt.client.path]
if !ok || !path.publisherReady {
evt.res <- fmt.Errorf("no one is publishing on path '%s'", evt.client.path)
continue
}
sdpParsed := pub.publisherSdpParsed()
if len(evt.client.streamTracks) != len(sdpParsed.MediaDescriptions) {
if len(evt.client.streamTracks) != len(path.publisherSdpParsed.MediaDescriptions) {
evt.res <- fmt.Errorf("not all tracks have been setup")
continue
}
@@ -357,23 +377,17 @@ outer:
case programEventClientRecord:
p.publisherCount += 1
evt.client.state = clientStateRecord
p.paths[evt.client.path].publisherSetReady()
close(evt.done)
case programEventClientRecordStop:
p.publisherCount -= 1
evt.client.state = clientStatePreRecord
// close all other clients that share the same path
for oc := range p.clients {
if oc != evt.client && oc.path == evt.client.path {
go oc.close()
}
}
p.paths[evt.client.path].publisherSetNotReady()
close(evt.done)
case programEventClientFrameUdp:
client, trackId := p.findPublisher(evt.addr, evt.streamType)
client, trackId := p.findClientPublisher(evt.addr, evt.streamType)
if client == nil {
continue
}
@@ -384,30 +398,25 @@ outer:
case programEventClientFrameTcp:
p.forwardFrame(evt.path, evt.trackId, evt.streamType, evt.buf)
case programEventStreamerReady:
evt.source.ready = true
p.publisherCount += 1
case programEventSourceReady:
evt.source.log("ready")
p.paths[evt.source.path].publisherSetReady()
case programEventStreamerNotReady:
evt.source.ready = false
p.publisherCount -= 1
case programEventSourceNotReady:
evt.source.log("not ready")
p.paths[evt.source.path].publisherSetNotReady()
// close all clients that share the same path
for oc := range p.clients {
if oc.path == evt.source.path {
go oc.close()
}
}
case programEventStreamerFrame:
case programEventSourceFrame:
p.forwardFrame(evt.source.path, evt.trackId, evt.streamType, evt.buf)
case programEventSourceReset:
p.paths[evt.source.path].publisherReset()
case programEventTerminate:
break outer
}
}
}
go func() {
for rawEvt := range p.events {
@@ -416,7 +425,7 @@ outer:
close(evt.done)
case programEventClientDescribe:
evt.res <- nil
evt.client.describeRes <- nil
case programEventClientAnnounce:
evt.res <- fmt.Errorf("terminated")
@@ -446,7 +455,8 @@ outer:
}()
for _, s := range p.sources {
s.close()
s.events <- sourceEventTerminate{}
<-s.done
}
p.rtspl.close()
@@ -454,7 +464,8 @@ outer:
p.rtpl.close()
for c := range p.clients {
c.close()
c.conn.NetConn().Close()
<-c.done
}
close(p.events)
@@ -466,9 +477,21 @@ func (p *program) close() {
<-p.done
}
func (p *program) findPublisher(addr *net.UDPAddr, streamType gortsplib.StreamType) (*serverClient, int) {
for _, pub := range p.publishers {
cl, ok := pub.(*serverClient)
func (p *program) findConfForPath(path string) *confPath {
if pconf, ok := p.conf.Paths[path]; ok {
return pconf
}
if pconf, ok := p.conf.Paths["all"]; ok {
return pconf
}
return nil
}
func (p *program) findClientPublisher(addr *net.UDPAddr, streamType gortsplib.StreamType) (*client, int) {
for _, path := range p.paths {
cl, ok := path.publisher.(*client)
if !ok {
continue
}
@@ -523,7 +546,7 @@ func (p *program) forwardFrame(path string, trackId int, streamType gortsplib.St
buf = buf[:len(frame)]
copy(buf, frame)
client.events <- serverClientEventFrameTcp{
client.events <- clientEventFrameTcp{
frame: &gortsplib.InterleavedFrame{
TrackId: trackId,
StreamType: streamType,

110
path.go Normal file
View File

@@ -0,0 +1,110 @@
package main
import (
"time"
"github.com/aler9/sdp/v3"
)
// a publisher is either a client or a source
type publisher interface {
isPublisher()
}
type path struct {
p *program
id string
publisher publisher
publisherReady bool
publisherSdpText []byte
publisherSdpParsed *sdp.SessionDescription
lastRequested time.Time
}
func newPath(p *program, id string, publisher publisher) *path {
return &path{
p: p,
id: id,
publisher: publisher,
}
}
func (p *path) check() {
hasClients := func() bool {
for c := range p.p.clients {
if c.path == p.id {
return true
}
}
return false
}()
source, publisherIsSource := p.publisher.(*source)
// stop source if needed
if !hasClients &&
publisherIsSource &&
source.state == sourceStateRunning &&
time.Since(p.lastRequested) >= 10*time.Second {
source.log("stopping due to inactivity")
source.state = sourceStateStopped
source.events <- sourceEventApplyState{source.state}
}
}
func (p *path) describe() ([]byte, bool) {
p.lastRequested = time.Now()
// publisher was found but is not ready: wait
if !p.publisherReady {
// start source if needed
if source, ok := p.publisher.(*source); ok && source.state == sourceStateStopped {
source.log("starting on demand")
source.state = sourceStateRunning
source.events <- sourceEventApplyState{source.state}
}
return nil, true
}
// publisher was found and is ready
return p.publisherSdpText, false
}
func (p *path) publisherSetReady() {
p.publisherReady = true
// reply to all clients that are waiting for a description
for c := range p.p.clients {
if c.state == clientStateWaitingDescription &&
c.path == p.id {
c.path = ""
c.state = clientStateInitial
c.describeRes <- p.publisherSdpText
}
}
}
func (p *path) publisherSetNotReady() {
p.publisherReady = false
// close all clients that are reading
for c := range p.p.clients {
if c.state != clientStateWaitingDescription &&
c != p.publisher &&
c.path == p.id {
c.conn.NetConn().Close()
}
}
}
func (p *path) publisherReset() {
// reply to all clients that were waiting for a description
for oc := range p.p.clients {
if oc.state == clientStateWaitingDescription &&
oc.path == p.id {
oc.path = ""
oc.state = clientStateInitial
oc.describeRes <- nil
}
}
}

View File

@@ -15,6 +15,7 @@ readTimeout: 10s
# timeout of write operations
writeTimeout: 5s
# supported authentication methods
# WARNING: both methods are insecure, use RTSP inside a VPN to enforce security.
authMethods: [basic, digest]
# enable pprof on port 9999 to monitor performances
pprof: false
@@ -29,6 +30,9 @@ paths:
source: record
# if the source is an RTSP url, this is the protocol that will be used to pull the stream
sourceProtocol: udp
# if the source is an RTSP url, it will be pulled only when at least one reader
# is connected, saving bandwidth
sourceOnDemand: no
# username required to publish
publishUser:

View File

@@ -4,14 +4,14 @@ import (
"net"
)
type serverTcpListener struct {
type serverTcp struct {
p *program
nconn *net.TCPListener
done chan struct{}
}
func newServerTcpListener(p *program) (*serverTcpListener, error) {
func newServerTcp(p *program) (*serverTcp, error) {
nconn, err := net.ListenTCP("tcp", &net.TCPAddr{
Port: p.conf.RtspPort,
})
@@ -19,7 +19,7 @@ func newServerTcpListener(p *program) (*serverTcpListener, error) {
return nil, err
}
l := &serverTcpListener{
l := &serverTcp{
p: p,
nconn: nconn,
done: make(chan struct{}),
@@ -29,11 +29,11 @@ func newServerTcpListener(p *program) (*serverTcpListener, error) {
return l, nil
}
func (l *serverTcpListener) log(format string, args ...interface{}) {
func (l *serverTcp) log(format string, args ...interface{}) {
l.p.log("[TCP listener] "+format, args...)
}
func (l *serverTcpListener) run() {
func (l *serverTcp) run() {
for {
nconn, err := l.nconn.AcceptTCP()
if err != nil {
@@ -46,7 +46,7 @@ func (l *serverTcpListener) run() {
close(l.done)
}
func (l *serverTcpListener) close() {
func (l *serverTcp) close() {
l.nconn.Close()
<-l.done
}

View File

@@ -12,7 +12,7 @@ type udpAddrBufPair struct {
buf []byte
}
type serverUdpListener struct {
type serverUdp struct {
p *program
nconn *net.UDPConn
streamType gortsplib.StreamType
@@ -23,7 +23,7 @@ type serverUdpListener struct {
done chan struct{}
}
func newServerUdpListener(p *program, port int, streamType gortsplib.StreamType) (*serverUdpListener, error) {
func newServerUdp(p *program, port int, streamType gortsplib.StreamType) (*serverUdp, error) {
nconn, err := net.ListenUDP("udp", &net.UDPAddr{
Port: port,
})
@@ -31,7 +31,7 @@ func newServerUdpListener(p *program, port int, streamType gortsplib.StreamType)
return nil, err
}
l := &serverUdpListener{
l := &serverUdp{
p: p,
nconn: nconn,
streamType: streamType,
@@ -45,7 +45,7 @@ func newServerUdpListener(p *program, port int, streamType gortsplib.StreamType)
return l, nil
}
func (l *serverUdpListener) log(format string, args ...interface{}) {
func (l *serverUdp) log(format string, args ...interface{}) {
var label string
if l.streamType == gortsplib.StreamTypeRtp {
label = "RTP"
@@ -55,7 +55,7 @@ func (l *serverUdpListener) log(format string, args ...interface{}) {
l.p.log("[UDP/"+label+" listener] "+format, args...)
}
func (l *serverUdpListener) run() {
func (l *serverUdp) run() {
writeDone := make(chan struct{})
go func() {
defer close(writeDone)
@@ -85,12 +85,12 @@ func (l *serverUdpListener) run() {
close(l.done)
}
func (l *serverUdpListener) close() {
func (l *serverUdp) close() {
l.nconn.Close()
<-l.done
}
func (l *serverUdpListener) write(pair *udpAddrBufPair) {
func (l *serverUdp) write(pair *udpAddrBufPair) {
// replace input buffer with write buffer
buf := l.writeBuf.swap()
buf = buf[:len(pair.buf)]

187
source.go
View File

@@ -3,13 +3,11 @@ package main
import (
"math/rand"
"net"
"net/url"
"os"
"sync"
"time"
"github.com/aler9/gortsplib"
"github.com/aler9/sdp/v3"
)
const (
@@ -18,30 +16,53 @@ const (
sourceTcpReadBufferSize = 128 * 1024
)
type sourceState int
const (
sourceStateStopped sourceState = iota
sourceStateRunning
)
type sourceEvent interface {
isSourceEvent()
}
type sourceEventApplyState struct {
state sourceState
}
func (sourceEventApplyState) isSourceEvent() {}
type sourceEventTerminate struct{}
func (sourceEventTerminate) isSourceEvent() {}
type source struct {
p *program
path string
u *url.URL
proto gortsplib.StreamProtocol
ready bool
pconf *confPath
state sourceState
tracks []*gortsplib.Track
serverSdpText []byte
serverSdpParsed *sdp.SessionDescription
terminate chan struct{}
events chan sourceEvent
done chan struct{}
}
func newSource(p *program, path string, u *url.URL, proto gortsplib.StreamProtocol) *source {
func newSource(p *program, path string, pconf *confPath) *source {
s := &source{
p: p,
path: path,
u: u,
proto: proto,
terminate: make(chan struct{}),
pconf: pconf,
events: make(chan sourceEvent),
done: make(chan struct{}),
}
if pconf.SourceOnDemand {
s.state = sourceStateStopped
} else {
s.state = sourceStateRunning
}
return s
}
@@ -49,45 +70,88 @@ func (s *source) log(format string, args ...interface{}) {
s.p.log("[source "+s.path+"] "+format, args...)
}
func (s *source) publisherIsReady() bool {
return s.ready
}
func (s *source) publisherSdpText() []byte {
return s.serverSdpText
}
func (s *source) publisherSdpParsed() *sdp.SessionDescription {
return s.serverSdpParsed
}
func (s *source) isPublisher() {}
func (s *source) run() {
for {
ok := s.do()
if !ok {
break
running := false
var doTerminate chan struct{}
var doDone chan struct{}
applyState := func(state sourceState) {
if state == sourceStateRunning {
if !running {
s.log("started")
running = true
doTerminate = make(chan struct{})
doDone = make(chan struct{})
go s.do(doTerminate, doDone)
}
} else {
if running {
close(doTerminate)
<-doDone
running = false
s.log("stopped")
}
}
}
t := time.NewTimer(sourceRetryInterval)
select {
case <-s.terminate:
break
case <-t.C:
applyState(s.state)
outer:
for rawEvt := range s.events {
switch evt := rawEvt.(type) {
case sourceEventApplyState:
applyState(evt.state)
case sourceEventTerminate:
break outer
}
}
if running {
close(doTerminate)
<-doDone
}
close(s.done)
}
func (s *source) do() bool {
s.log("initializing with protocol %s", s.proto)
func (s *source) do(terminate chan struct{}, done chan struct{}) {
defer close(done)
for {
ok := s.doInner(terminate)
if !ok {
break
}
s.p.events <- programEventSourceReset{s}
if !func() bool {
t := time.NewTimer(sourceRetryInterval)
defer t.Stop()
select {
case <-terminate:
return false
case <-t.C:
return true
}
}() {
break
}
}
}
func (s *source) doInner(terminate chan struct{}) bool {
s.log("connecting")
var conn *gortsplib.ConnClient
var err error
dialDone := make(chan struct{})
go func() {
conn, err = gortsplib.NewConnClient(gortsplib.ConnClientConf{
Host: s.u.Host,
Host: s.pconf.sourceUrl.Host,
ReadTimeout: s.p.conf.ReadTimeout,
WriteTimeout: s.p.conf.WriteTimeout,
})
@@ -95,7 +159,7 @@ func (s *source) do() bool {
}()
select {
case <-s.terminate:
case <-terminate:
return false
case <-dialDone:
}
@@ -107,13 +171,13 @@ func (s *source) do() bool {
defer conn.Close()
_, err = conn.Options(s.u)
_, err = conn.Options(s.pconf.sourceUrl)
if err != nil {
s.log("ERR: %s", err)
return true
}
tracks, _, err := conn.Describe(s.u)
tracks, _, err := conn.Describe(s.pconf.sourceUrl)
if err != nil {
s.log("ERR: %s", err)
return true
@@ -123,17 +187,17 @@ func (s *source) do() bool {
serverSdpParsed, serverSdpText := sdpForServer(tracks)
s.tracks = tracks
s.serverSdpText = serverSdpText
s.serverSdpParsed = serverSdpParsed
s.p.paths[s.path].publisherSdpText = serverSdpText
s.p.paths[s.path].publisherSdpParsed = serverSdpParsed
if s.proto == gortsplib.StreamProtocolUdp {
return s.runUdp(conn)
if s.pconf.sourceProtocolParsed == gortsplib.StreamProtocolUdp {
return s.runUdp(terminate, conn)
} else {
return s.runTcp(conn)
return s.runTcp(terminate, conn)
}
}
func (s *source) runUdp(conn *gortsplib.ConnClient) bool {
func (s *source) runUdp(terminate chan struct{}, conn *gortsplib.ConnClient) bool {
type trackListenerPair struct {
rtpl *gortsplib.ConnClientUdpListener
rtcpl *gortsplib.ConnClientUdpListener
@@ -151,7 +215,7 @@ func (s *source) runUdp(conn *gortsplib.ConnClient) bool {
rtpPort := (rand.Intn((65535-10000)/2) * 2) + 10000
rtcpPort := rtpPort + 1
rtpl, rtcpl, _, err = conn.SetupUdp(s.u, track, rtpPort, rtcpPort)
rtpl, rtcpl, _, err = conn.SetupUdp(s.pconf.sourceUrl, track, rtpPort, rtcpPort)
if err != nil {
// retry if it's a bind error
if nerr, ok := err.(*net.OpError); ok {
@@ -175,13 +239,13 @@ func (s *source) runUdp(conn *gortsplib.ConnClient) bool {
})
}
_, err := conn.Play(s.u)
_, err := conn.Play(s.pconf.sourceUrl)
if err != nil {
s.log("ERR: %s", err)
return true
}
s.p.events <- programEventStreamerReady{s}
s.p.events <- programEventSourceReady{s}
var wg sync.WaitGroup
@@ -201,7 +265,7 @@ func (s *source) runUdp(conn *gortsplib.ConnClient) bool {
break
}
s.p.events <- programEventStreamerFrame{s, trackId, gortsplib.StreamTypeRtp, buf[:n]}
s.p.events <- programEventSourceFrame{s, trackId, gortsplib.StreamTypeRtp, buf[:n]}
}
}(trackId, lp.rtpl)
@@ -218,14 +282,14 @@ func (s *source) runUdp(conn *gortsplib.ConnClient) bool {
break
}
s.p.events <- programEventStreamerFrame{s, trackId, gortsplib.StreamTypeRtcp, buf[:n]}
s.p.events <- programEventSourceFrame{s, trackId, gortsplib.StreamTypeRtcp, buf[:n]}
}
}(trackId, lp.rtcpl)
}
tcpConnDone := make(chan error)
go func() {
tcpConnDone <- conn.LoopUDP(s.u)
tcpConnDone <- conn.LoopUDP(s.pconf.sourceUrl)
}()
var ret bool
@@ -233,7 +297,7 @@ func (s *source) runUdp(conn *gortsplib.ConnClient) bool {
outer:
for {
select {
case <-s.terminate:
case <-terminate:
conn.NetConn().Close()
<-tcpConnDone
ret = false
@@ -246,7 +310,7 @@ outer:
}
}
s.p.events <- programEventStreamerNotReady{s}
s.p.events <- programEventSourceNotReady{s}
for _, lp := range listeners {
lp.rtpl.Close()
@@ -257,22 +321,22 @@ outer:
return ret
}
func (s *source) runTcp(conn *gortsplib.ConnClient) bool {
func (s *source) runTcp(terminate chan struct{}, conn *gortsplib.ConnClient) bool {
for _, track := range s.tracks {
_, err := conn.SetupTcp(s.u, track)
_, err := conn.SetupTcp(s.pconf.sourceUrl, track)
if err != nil {
s.log("ERR: %s", err)
return true
}
}
_, err := conn.Play(s.u)
_, err := conn.Play(s.pconf.sourceUrl)
if err != nil {
s.log("ERR: %s", err)
return true
}
s.p.events <- programEventStreamerReady{s}
s.p.events <- programEventSourceReady{s}
frame := &gortsplib.InterleavedFrame{}
doubleBuf := newDoubleBuffer(sourceTcpReadBufferSize)
@@ -289,7 +353,7 @@ func (s *source) runTcp(conn *gortsplib.ConnClient) bool {
return
}
s.p.events <- programEventStreamerFrame{s, frame.TrackId, frame.StreamType, frame.Content}
s.p.events <- programEventSourceFrame{s, frame.TrackId, frame.StreamType, frame.Content}
}
}()
@@ -298,7 +362,7 @@ func (s *source) runTcp(conn *gortsplib.ConnClient) bool {
outer:
for {
select {
case <-s.terminate:
case <-terminate:
conn.NetConn().Close()
<-tcpConnDone
ret = false
@@ -311,12 +375,7 @@ outer:
}
}
s.p.events <- programEventStreamerNotReady{s}
s.p.events <- programEventSourceNotReady{s}
return ret
}
func (s *source) close() {
close(s.terminate)
<-s.done
}