From d7d2ba38f1340b97f2b38ba18a307b43c5bf6d23 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Wed, 29 Jul 2020 23:30:42 +0200 Subject: [PATCH] add option sourceOnDemand to pull sources only when there are connected clients (#36) --- Makefile | 7 +- README.md | 2 +- server-client.go => client.go | 153 +++++------- conf.go | 9 +- main.go | 405 +++++++++++++++++--------------- path.go | 110 +++++++++ rtsp-simple-server.yml | 4 + server-tcpl.go => server-tcp.go | 12 +- server-udpl.go => server-udp.go | 14 +- source.go | 205 ++++++++++------ 10 files changed, 543 insertions(+), 378 deletions(-) rename server-client.go => client.go (88%) create mode 100644 path.go rename server-tcpl.go => server-tcp.go (67%) rename server-udpl.go => server-udp.go (80%) diff --git a/Makefile b/Makefile index e3fb3b99..e22b30ad 100644 --- a/Makefile +++ b/Makefile @@ -74,9 +74,10 @@ paths: # readUser: test # readPass: tast -# proxied: -# source: rtsp://localhost:8554/mystream -# sourceProtocol: tcp + proxied: + source: rtsp://192.168.2.198:8554/stream + sourceProtocol: tcp + sourceOnDemand: yes # original: # runOnPublish: ffmpeg -i rtsp://localhost:8554/original -b:a 64k -c:v libx264 -preset ultrafast -b:v 500k -max_muxing_queue_size 1024 -f rtsp rtsp://localhost:8554/compressed diff --git a/README.md b/README.md index 78454618..abd18380 100644 --- a/README.md +++ b/README.md @@ -71,7 +71,7 @@ docker run --rm -it -v $PWD/rtsp-simple-server.yml:/rtsp-simple-server.yml -p 85 #### Full configuration file -To change the configuration, it's enough to edit the `rtsp-simple-server.yml` file, provided with the executable. The default configuration is [available here](rtsp-simple-server.yml). +To see or change the configuration, edit the `rtsp-simple-server.yml` file, provided with the executable. The default configuration is [available here](rtsp-simple-server.yml). #### Usage as RTSP Proxy diff --git a/server-client.go b/client.go similarity index 88% rename from server-client.go rename to client.go index 7f6e0fbb..be50cbbc 100644 --- a/server-client.go +++ b/client.go @@ -23,25 +23,26 @@ const ( clientUdpWriteBufferSize = 128 * 1024 ) -type serverClientTrack struct { +type clientTrack struct { rtpPort int rtcpPort int } -type serverClientEvent interface { +type clientEvent interface { isServerClientEvent() } -type serverClientEventFrameTcp struct { +type clientEventFrameTcp struct { frame *gortsplib.InterleavedFrame } -func (serverClientEventFrameTcp) isServerClientEvent() {} +func (clientEventFrameTcp) isServerClientEvent() {} -type serverClientState int +type clientState int const ( - clientStateStarting serverClientState = iota + clientStateInitial clientState = iota + clientStateWaitingDescription clientStateAnnounce clientStatePrePlay clientStatePlay @@ -49,10 +50,10 @@ const ( clientStateRecord ) -func (cs serverClientState) String() string { +func (cs clientState) String() string { switch cs { - case clientStateStarting: - return "STARTING" + case clientStateInitial: + return "INITIAL" case clientStateAnnounce: return "ANNOUNCE" @@ -72,36 +73,35 @@ func (cs serverClientState) String() string { return "UNKNOWN" } -type serverClient struct { - p *program - conn *gortsplib.ConnServer - state serverClientState - path string - authUser string - authPass string - authHelper *gortsplib.AuthServer - authFailures int - streamSdpText []byte // only if publisher - streamSdpParsed *sdp.SessionDescription // only if publisher - streamProtocol gortsplib.StreamProtocol - streamTracks []*serverClientTrack - rtcpReceivers []*gortsplib.RtcpReceiver - readBuf *doubleBuffer - writeBuf *doubleBuffer +type client struct { + p *program + conn *gortsplib.ConnServer + state clientState + path string + authUser string + authPass string + authHelper *gortsplib.AuthServer + authFailures int + streamProtocol gortsplib.StreamProtocol + streamTracks []*clientTrack + rtcpReceivers []*gortsplib.RtcpReceiver + readBuf *doubleBuffer + writeBuf *doubleBuffer - events chan serverClientEvent // only if state = Play and gortsplib.StreamProtocol = TCP - done chan struct{} + describeRes chan []byte + events chan clientEvent // only if state = Play and gortsplib.StreamProtocol = TCP + done chan struct{} } -func newServerClient(p *program, nconn net.Conn) *serverClient { - c := &serverClient{ +func newServerClient(p *program, nconn net.Conn) *client { + c := &client{ p: p, conn: gortsplib.NewConnServer(gortsplib.ConnServerConf{ Conn: nconn, ReadTimeout: p.conf.ReadTimeout, WriteTimeout: p.conf.WriteTimeout, }), - state: clientStateStarting, + state: clientStateInitial, readBuf: newDoubleBuffer(clientTcpReadBufferSize), done: make(chan struct{}), } @@ -110,31 +110,21 @@ func newServerClient(p *program, nconn net.Conn) *serverClient { return c } -func (c *serverClient) log(format string, args ...interface{}) { +func (c *client) log(format string, args ...interface{}) { c.p.log("[client %s] "+format, append([]interface{}{c.conn.NetConn().RemoteAddr().String()}, args...)...) } -func (c *serverClient) ip() net.IP { +func (c *client) isPublisher() {} + +func (c *client) ip() net.IP { return c.conn.NetConn().RemoteAddr().(*net.TCPAddr).IP } -func (c *serverClient) zone() string { +func (c *client) zone() string { return c.conn.NetConn().RemoteAddr().(*net.TCPAddr).Zone } -func (c *serverClient) publisherIsReady() bool { - return c.state == clientStateRecord -} - -func (c *serverClient) publisherSdpText() []byte { - return c.streamSdpText -} - -func (c *serverClient) publisherSdpParsed() *sdp.SessionDescription { - return c.streamSdpParsed -} - -func (c *serverClient) run() { +func (c *client) run() { var runOnConnectCmd *exec.Cmd if c.p.conf.RunOnConnect != "" { runOnConnectCmd = exec.Command("/bin/sh", "-c", c.p.conf.RunOnConnect) @@ -176,12 +166,7 @@ outer: close(c.done) // close() never blocks } -func (c *serverClient) close() { - c.conn.NetConn().Close() - <-c.done -} - -func (c *serverClient) writeResError(req *gortsplib.Request, code gortsplib.StatusCode, err error) { +func (c *client) writeResError(req *gortsplib.Request, code gortsplib.StatusCode, err error) { c.log("ERR: %s", err) header := gortsplib.Header{} @@ -195,22 +180,10 @@ func (c *serverClient) writeResError(req *gortsplib.Request, code gortsplib.Stat }) } -func (c *serverClient) findConfForPath(path string) *ConfPath { - if pconf, ok := c.p.conf.Paths[path]; ok { - return pconf - } - - if pconf, ok := c.p.conf.Paths["all"]; ok { - return pconf - } - - return nil -} - var errAuthCritical = errors.New("auth critical") var errAuthNotCritical = errors.New("auth not critical") -func (c *serverClient) authenticate(ips []interface{}, user string, pass string, req *gortsplib.Request) error { +func (c *client) authenticate(ips []interface{}, user string, pass string, req *gortsplib.Request) error { // validate ip err := func() error { if ips == nil { @@ -288,7 +261,7 @@ func (c *serverClient) authenticate(ips []interface{}, user string, pass string, return nil } -func (c *serverClient) handleRequest(req *gortsplib.Request) bool { +func (c *client) handleRequest(req *gortsplib.Request) bool { c.log(string(req.Method)) cseq, ok := req.Header["CSeq"] @@ -315,9 +288,6 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool { switch req.Method { case gortsplib.OPTIONS: - // do not check state, since OPTIONS can be requested - // in any state - c.conn.WriteResponse(&gortsplib.Response{ StatusCode: gortsplib.StatusOK, Header: gortsplib.Header{ @@ -335,13 +305,13 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool { return true case gortsplib.DESCRIBE: - if c.state != clientStateStarting { + if c.state != clientStateInitial { c.writeResError(req, gortsplib.StatusBadRequest, - fmt.Errorf("client is in state '%s' instead of '%s'", c.state, clientStateStarting)) + fmt.Errorf("client is in state '%s' instead of '%s'", c.state, clientStateInitial)) return false } - pconf := c.findConfForPath(path) + pconf := c.p.findConfForPath(path) if pconf == nil { c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("unable to find a valid configuration for path '%s'", path)) @@ -356,9 +326,9 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool { return true } - res := make(chan []byte) - c.p.events <- programEventClientDescribe{path, res} - sdp := <-res + c.describeRes = make(chan []byte) + c.p.events <- programEventClientDescribe{c, path} + sdp := <-c.describeRes if sdp == nil { c.writeResError(req, gortsplib.StatusNotFound, fmt.Errorf("no one is publishing on path '%s'", path)) return false @@ -376,9 +346,9 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool { return true case gortsplib.ANNOUNCE: - if c.state != clientStateStarting { + if c.state != clientStateInitial { c.writeResError(req, gortsplib.StatusBadRequest, - fmt.Errorf("client is in state '%s' instead of '%s'", c.state, clientStateStarting)) + fmt.Errorf("client is in state '%s' instead of '%s'", c.state, clientStateInitial)) return false } @@ -387,7 +357,7 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool { return false } - pconf := c.findConfForPath(path) + pconf := c.p.findConfForPath(path) if pconf == nil { c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("unable to find a valid configuration for path '%s'", path)) @@ -435,16 +405,13 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool { sdpParsed, req.Content = sdpForServer(tracks) res := make(chan error) - c.p.events <- programEventClientAnnounce{res, c, path} + c.p.events <- programEventClientAnnounce{res, c, path, req.Content, sdpParsed} err = <-res if err != nil { c.writeResError(req, gortsplib.StatusBadRequest, err) return false } - c.streamSdpText = req.Content - c.streamSdpParsed = sdpParsed - c.conn.WriteResponse(&gortsplib.Response{ StatusCode: gortsplib.StatusOK, Header: gortsplib.Header{ @@ -467,8 +434,8 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool { switch c.state { // play - case clientStateStarting, clientStatePrePlay: - pconf := c.findConfForPath(path) + case clientStateInitial, clientStatePrePlay: + pconf := c.p.findConfForPath(path) if pconf == nil { c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("unable to find a valid configuration for path '%s'", path)) @@ -626,7 +593,7 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool { return false } - if len(c.streamTracks) >= len(c.streamSdpParsed.MediaDescriptions) { + if len(c.streamTracks) >= len(c.p.paths[c.path].publisherSdpParsed.MediaDescriptions) { c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("all the tracks have already been setup")) return false } @@ -678,7 +645,7 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool { return false } - if len(c.streamTracks) >= len(c.streamSdpParsed.MediaDescriptions) { + if len(c.streamTracks) >= len(c.p.paths[c.path].publisherSdpParsed.MediaDescriptions) { c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("all the tracks have already been setup")) return false } @@ -762,7 +729,7 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool { return false } - if len(c.streamTracks) != len(c.streamSdpParsed.MediaDescriptions) { + if len(c.streamTracks) != len(c.p.paths[c.path].publisherSdpParsed.MediaDescriptions) { c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("not all tracks have been setup")) return false } @@ -788,12 +755,12 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool { } } -func (c *serverClient) runPlay(path string) { - pconf := c.findConfForPath(path) +func (c *client) runPlay(path string) { + pconf := c.p.findConfForPath(path) if c.streamProtocol == gortsplib.StreamProtocolTcp { c.writeBuf = newDoubleBuffer(clientTcpWriteBufferSize) - c.events = make(chan serverClientEvent) + c.events = make(chan clientEvent) } done := make(chan struct{}) @@ -863,7 +830,7 @@ func (c *serverClient) runPlay(path string) { case rawEvt := <-c.events: switch evt := rawEvt.(type) { - case serverClientEventFrameTcp: + case clientEventFrameTcp: c.conn.WriteFrame(evt.frame) } } @@ -887,8 +854,8 @@ func (c *serverClient) runPlay(path string) { } } -func (c *serverClient) runRecord(path string) { - pconf := c.findConfForPath(path) +func (c *client) runRecord(path string) { + pconf := c.p.findConfForPath(path) c.rtcpReceivers = make([]*gortsplib.RtcpReceiver, len(c.streamTracks)) for trackId := range c.streamTracks { diff --git a/conf.go b/conf.go index 69487897..6ae3fdc5 100644 --- a/conf.go +++ b/conf.go @@ -12,11 +12,12 @@ import ( "gopkg.in/yaml.v2" ) -type ConfPath struct { +type confPath struct { Source string `yaml:"source"` sourceUrl *url.URL SourceProtocol string `yaml:"sourceProtocol"` sourceProtocolParsed gortsplib.StreamProtocol + SourceOnDemand bool `yaml:"sourceOnDemand"` PublishUser string `yaml:"publishUser"` PublishPass string `yaml:"publishPass"` PublishIps []string `yaml:"publishIps"` @@ -41,7 +42,7 @@ type conf struct { AuthMethods []string `yaml:"authMethods"` authMethodsParsed []gortsplib.AuthMethod Pprof bool `yaml:"pprof"` - Paths map[string]*ConfPath `yaml:"paths"` + Paths map[string]*confPath `yaml:"paths"` } func loadConf(fpath string, stdin io.Reader) (*conf, error) { @@ -142,14 +143,14 @@ func loadConf(fpath string, stdin io.Reader) (*conf, error) { } if len(conf.Paths) == 0 { - conf.Paths = map[string]*ConfPath{ + conf.Paths = map[string]*confPath{ "all": {}, } } for path, pconf := range conf.Paths { if pconf == nil { - conf.Paths[path] = &ConfPath{} + conf.Paths[path] = &confPath{} pconf = conf.Paths[path] } diff --git a/main.go b/main.go index 86dd0fe5..6c390f27 100644 --- a/main.go +++ b/main.go @@ -8,6 +8,7 @@ import ( "net/http" _ "net/http/pprof" "os" + "time" "github.com/aler9/gortsplib" "github.com/aler9/sdp/v3" @@ -28,29 +29,31 @@ func (programEventClientNew) isProgramEvent() {} type programEventClientClose struct { done chan struct{} - client *serverClient + client *client } func (programEventClientClose) isProgramEvent() {} type programEventClientDescribe struct { - path string - res chan []byte + client *client + path string } func (programEventClientDescribe) isProgramEvent() {} type programEventClientAnnounce struct { - res chan error - client *serverClient - path string + res chan error + client *client + path string + sdpText []byte + sdpParsed *sdp.SessionDescription } func (programEventClientAnnounce) isProgramEvent() {} type programEventClientSetupPlay struct { res chan error - client *serverClient + client *client path string protocol gortsplib.StreamProtocol rtpPort int @@ -61,7 +64,7 @@ func (programEventClientSetupPlay) isProgramEvent() {} type programEventClientSetupRecord struct { res chan error - client *serverClient + client *client protocol gortsplib.StreamProtocol rtpPort int rtcpPort int @@ -71,35 +74,35 @@ func (programEventClientSetupRecord) isProgramEvent() {} type programEventClientPlay1 struct { res chan error - client *serverClient + client *client } func (programEventClientPlay1) isProgramEvent() {} type programEventClientPlay2 struct { done chan struct{} - client *serverClient + client *client } func (programEventClientPlay2) isProgramEvent() {} type programEventClientPlayStop struct { done chan struct{} - client *serverClient + client *client } func (programEventClientPlayStop) isProgramEvent() {} type programEventClientRecord struct { done chan struct{} - client *serverClient + client *client } func (programEventClientRecord) isProgramEvent() {} type programEventClientRecordStop struct { done chan struct{} - client *serverClient + client *client } func (programEventClientRecordStop) isProgramEvent() {} @@ -121,46 +124,45 @@ type programEventClientFrameTcp struct { func (programEventClientFrameTcp) isProgramEvent() {} -type programEventStreamerReady struct { +type programEventSourceReady struct { source *source } -func (programEventStreamerReady) isProgramEvent() {} +func (programEventSourceReady) isProgramEvent() {} -type programEventStreamerNotReady struct { +type programEventSourceNotReady struct { source *source } -func (programEventStreamerNotReady) isProgramEvent() {} +func (programEventSourceNotReady) isProgramEvent() {} -type programEventStreamerFrame struct { +type programEventSourceFrame struct { source *source trackId int streamType gortsplib.StreamType buf []byte } -func (programEventStreamerFrame) isProgramEvent() {} +func (programEventSourceFrame) isProgramEvent() {} + +type programEventSourceReset struct { + source *source +} + +func (programEventSourceReset) isProgramEvent() {} type programEventTerminate struct{} func (programEventTerminate) isProgramEvent() {} -// a publisher can be either a serverClient or a source -type publisher interface { - publisherIsReady() bool - publisherSdpText() []byte - publisherSdpParsed() *sdp.SessionDescription -} - type program struct { conf *conf - rtspl *serverTcpListener - rtpl *serverUdpListener - rtcpl *serverUdpListener - clients map[*serverClient]struct{} + rtspl *serverTcp + rtpl *serverUdp + rtcpl *serverUdp sources []*source - publishers map[string]publisher + clients map[*client]struct{} + paths map[string]*path publisherCount int readerCount int @@ -188,18 +190,18 @@ func newProgram(args []string, stdin io.Reader) (*program, error) { } p := &program{ - conf: conf, - clients: make(map[*serverClient]struct{}), - publishers: make(map[string]publisher), - events: make(chan programEvent), - done: make(chan struct{}), + conf: conf, + clients: make(map[*client]struct{}), + paths: make(map[string]*path), + events: make(chan programEvent), + done: make(chan struct{}), } for path, pconf := range conf.Paths { if pconf.Source != "record" { - s := newSource(p, path, pconf.sourceUrl, pconf.sourceProtocolParsed) + s := newSource(p, path, pconf) p.sources = append(p.sources, s) - p.publishers[path] = s + p.paths[path] = newPath(p, path, s) } } @@ -217,17 +219,17 @@ func newProgram(args []string, stdin io.Reader) (*program, error) { http.DefaultServeMux = http.NewServeMux() } - p.rtpl, err = newServerUdpListener(p, conf.RtpPort, gortsplib.StreamTypeRtp) + p.rtpl, err = newServerUdp(p, conf.RtpPort, gortsplib.StreamTypeRtp) if err != nil { return nil, err } - p.rtcpl, err = newServerUdpListener(p, conf.RtcpPort, gortsplib.StreamTypeRtcp) + p.rtcpl, err = newServerUdp(p, conf.RtcpPort, gortsplib.StreamTypeRtcp) if err != nil { return nil, err } - p.rtspl, err = newServerTcpListener(p) + p.rtspl, err = newServerTcp(p) if err != nil { return nil, err } @@ -249,163 +251,170 @@ func (p *program) log(format string, args ...interface{}) { } func (p *program) run() { + checkPathsTicker := time.NewTicker(5 * time.Second) + defer checkPathsTicker.Stop() + outer: - for rawEvt := range p.events { - switch evt := rawEvt.(type) { - case programEventClientNew: - c := newServerClient(p, evt.nconn) - p.clients[c] = struct{}{} - c.log("connected") + for { + select { + case <-checkPathsTicker.C: + for _, path := range p.paths { + path.check() + } - case programEventClientClose: - // already deleted - if _, ok := p.clients[evt.client]; !ok { + case rawEvt := <-p.events: + switch evt := rawEvt.(type) { + case programEventClientNew: + c := newServerClient(p, evt.nconn) + p.clients[c] = struct{}{} + c.log("connected") + + case programEventClientClose: + delete(p.clients, evt.client) + + if evt.client.path != "" { + if path, ok := p.paths[evt.client.path]; ok { + // if this is a publisher + if path.publisher == evt.client { + path.publisherReset() + + // delete the path + delete(p.paths, evt.client.path) + } + } + } + + evt.client.log("disconnected") close(evt.done) - continue - } - delete(p.clients, evt.client) + case programEventClientDescribe: + path, ok := p.paths[evt.path] - if evt.client.path != "" { - if pub, ok := p.publishers[evt.client.path]; ok && pub == evt.client { - delete(p.publishers, evt.client.path) + // no path: return 404 + if !ok { + evt.client.describeRes <- nil + continue } - } - evt.client.log("disconnected") - close(evt.done) + sdpText, wait := path.describe() - case programEventClientDescribe: - pub, ok := p.publishers[evt.path] - if !ok || !pub.publisherIsReady() { + if wait { + evt.client.path = evt.path + evt.client.state = clientStateWaitingDescription + continue + } + + evt.client.describeRes <- sdpText + + case programEventClientAnnounce: + _, ok := p.paths[evt.path] + if ok { + evt.res <- fmt.Errorf("someone is already publishing on path '%s'", evt.path) + continue + } + + evt.client.path = evt.path + evt.client.state = clientStateAnnounce + p.paths[evt.path] = newPath(p, evt.path, evt.client) + p.paths[evt.path].publisherSdpText = evt.sdpText + p.paths[evt.path].publisherSdpParsed = evt.sdpParsed evt.res <- nil - continue - } - evt.res <- pub.publisherSdpText() - - case programEventClientAnnounce: - _, ok := p.publishers[evt.path] - if ok { - evt.res <- fmt.Errorf("someone is already publishing on path '%s'", evt.path) - continue - } - - evt.client.path = evt.path - evt.client.state = clientStateAnnounce - p.publishers[evt.path] = evt.client - evt.res <- nil - - case programEventClientSetupPlay: - pub, ok := p.publishers[evt.path] - if !ok || !pub.publisherIsReady() { - evt.res <- fmt.Errorf("no one is streaming on path '%s'", evt.path) - continue - } - - sdpParsed := pub.publisherSdpParsed() - - if len(evt.client.streamTracks) >= len(sdpParsed.MediaDescriptions) { - evt.res <- fmt.Errorf("all the tracks have already been setup") - continue - } - - evt.client.path = evt.path - evt.client.streamProtocol = evt.protocol - evt.client.streamTracks = append(evt.client.streamTracks, &serverClientTrack{ - rtpPort: evt.rtpPort, - rtcpPort: evt.rtcpPort, - }) - evt.client.state = clientStatePrePlay - evt.res <- nil - - case programEventClientSetupRecord: - evt.client.streamProtocol = evt.protocol - evt.client.streamTracks = append(evt.client.streamTracks, &serverClientTrack{ - rtpPort: evt.rtpPort, - rtcpPort: evt.rtcpPort, - }) - evt.client.state = clientStatePreRecord - evt.res <- nil - - case programEventClientPlay1: - pub, ok := p.publishers[evt.client.path] - if !ok || !pub.publisherIsReady() { - evt.res <- fmt.Errorf("no one is streaming on path '%s'", evt.client.path) - continue - } - - sdpParsed := pub.publisherSdpParsed() - - if len(evt.client.streamTracks) != len(sdpParsed.MediaDescriptions) { - evt.res <- fmt.Errorf("not all tracks have been setup") - continue - } - - evt.res <- nil - - case programEventClientPlay2: - p.readerCount += 1 - evt.client.state = clientStatePlay - close(evt.done) - - case programEventClientPlayStop: - p.readerCount -= 1 - evt.client.state = clientStatePrePlay - close(evt.done) - - case programEventClientRecord: - p.publisherCount += 1 - evt.client.state = clientStateRecord - close(evt.done) - - case programEventClientRecordStop: - p.publisherCount -= 1 - evt.client.state = clientStatePreRecord - - // close all other clients that share the same path - for oc := range p.clients { - if oc != evt.client && oc.path == evt.client.path { - go oc.close() + case programEventClientSetupPlay: + path, ok := p.paths[evt.path] + if !ok || !path.publisherReady { + evt.res <- fmt.Errorf("no one is publishing on path '%s'", evt.path) + continue } - } - close(evt.done) - - case programEventClientFrameUdp: - client, trackId := p.findPublisher(evt.addr, evt.streamType) - if client == nil { - continue - } - - client.rtcpReceivers[trackId].OnFrame(evt.streamType, evt.buf) - p.forwardFrame(client.path, trackId, evt.streamType, evt.buf) - - case programEventClientFrameTcp: - p.forwardFrame(evt.path, evt.trackId, evt.streamType, evt.buf) - - case programEventStreamerReady: - evt.source.ready = true - p.publisherCount += 1 - evt.source.log("ready") - - case programEventStreamerNotReady: - evt.source.ready = false - p.publisherCount -= 1 - evt.source.log("not ready") - - // close all clients that share the same path - for oc := range p.clients { - if oc.path == evt.source.path { - go oc.close() + if len(evt.client.streamTracks) >= len(path.publisherSdpParsed.MediaDescriptions) { + evt.res <- fmt.Errorf("all the tracks have already been setup") + continue } + + evt.client.path = evt.path + evt.client.streamProtocol = evt.protocol + evt.client.streamTracks = append(evt.client.streamTracks, &clientTrack{ + rtpPort: evt.rtpPort, + rtcpPort: evt.rtcpPort, + }) + evt.client.state = clientStatePrePlay + evt.res <- nil + + case programEventClientSetupRecord: + evt.client.streamProtocol = evt.protocol + evt.client.streamTracks = append(evt.client.streamTracks, &clientTrack{ + rtpPort: evt.rtpPort, + rtcpPort: evt.rtcpPort, + }) + evt.client.state = clientStatePreRecord + evt.res <- nil + + case programEventClientPlay1: + path, ok := p.paths[evt.client.path] + if !ok || !path.publisherReady { + evt.res <- fmt.Errorf("no one is publishing on path '%s'", evt.client.path) + continue + } + + if len(evt.client.streamTracks) != len(path.publisherSdpParsed.MediaDescriptions) { + evt.res <- fmt.Errorf("not all tracks have been setup") + continue + } + + evt.res <- nil + + case programEventClientPlay2: + p.readerCount += 1 + evt.client.state = clientStatePlay + close(evt.done) + + case programEventClientPlayStop: + p.readerCount -= 1 + evt.client.state = clientStatePrePlay + close(evt.done) + + case programEventClientRecord: + p.publisherCount += 1 + evt.client.state = clientStateRecord + p.paths[evt.client.path].publisherSetReady() + close(evt.done) + + case programEventClientRecordStop: + p.publisherCount -= 1 + evt.client.state = clientStatePreRecord + p.paths[evt.client.path].publisherSetNotReady() + close(evt.done) + + case programEventClientFrameUdp: + client, trackId := p.findClientPublisher(evt.addr, evt.streamType) + if client == nil { + continue + } + + client.rtcpReceivers[trackId].OnFrame(evt.streamType, evt.buf) + p.forwardFrame(client.path, trackId, evt.streamType, evt.buf) + + case programEventClientFrameTcp: + p.forwardFrame(evt.path, evt.trackId, evt.streamType, evt.buf) + + case programEventSourceReady: + evt.source.log("ready") + p.paths[evt.source.path].publisherSetReady() + + case programEventSourceNotReady: + evt.source.log("not ready") + p.paths[evt.source.path].publisherSetNotReady() + + case programEventSourceFrame: + p.forwardFrame(evt.source.path, evt.trackId, evt.streamType, evt.buf) + + case programEventSourceReset: + p.paths[evt.source.path].publisherReset() + + case programEventTerminate: + break outer } - - case programEventStreamerFrame: - p.forwardFrame(evt.source.path, evt.trackId, evt.streamType, evt.buf) - - case programEventTerminate: - break outer } } @@ -416,7 +425,7 @@ outer: close(evt.done) case programEventClientDescribe: - evt.res <- nil + evt.client.describeRes <- nil case programEventClientAnnounce: evt.res <- fmt.Errorf("terminated") @@ -446,7 +455,8 @@ outer: }() for _, s := range p.sources { - s.close() + s.events <- sourceEventTerminate{} + <-s.done } p.rtspl.close() @@ -454,7 +464,8 @@ outer: p.rtpl.close() for c := range p.clients { - c.close() + c.conn.NetConn().Close() + <-c.done } close(p.events) @@ -466,9 +477,21 @@ func (p *program) close() { <-p.done } -func (p *program) findPublisher(addr *net.UDPAddr, streamType gortsplib.StreamType) (*serverClient, int) { - for _, pub := range p.publishers { - cl, ok := pub.(*serverClient) +func (p *program) findConfForPath(path string) *confPath { + if pconf, ok := p.conf.Paths[path]; ok { + return pconf + } + + if pconf, ok := p.conf.Paths["all"]; ok { + return pconf + } + + return nil +} + +func (p *program) findClientPublisher(addr *net.UDPAddr, streamType gortsplib.StreamType) (*client, int) { + for _, path := range p.paths { + cl, ok := path.publisher.(*client) if !ok { continue } @@ -523,7 +546,7 @@ func (p *program) forwardFrame(path string, trackId int, streamType gortsplib.St buf = buf[:len(frame)] copy(buf, frame) - client.events <- serverClientEventFrameTcp{ + client.events <- clientEventFrameTcp{ frame: &gortsplib.InterleavedFrame{ TrackId: trackId, StreamType: streamType, diff --git a/path.go b/path.go new file mode 100644 index 00000000..862ebd93 --- /dev/null +++ b/path.go @@ -0,0 +1,110 @@ +package main + +import ( + "time" + + "github.com/aler9/sdp/v3" +) + +// a publisher is either a client or a source +type publisher interface { + isPublisher() +} + +type path struct { + p *program + id string + publisher publisher + publisherReady bool + publisherSdpText []byte + publisherSdpParsed *sdp.SessionDescription + lastRequested time.Time +} + +func newPath(p *program, id string, publisher publisher) *path { + return &path{ + p: p, + id: id, + publisher: publisher, + } +} + +func (p *path) check() { + hasClients := func() bool { + for c := range p.p.clients { + if c.path == p.id { + return true + } + } + return false + }() + source, publisherIsSource := p.publisher.(*source) + + // stop source if needed + if !hasClients && + publisherIsSource && + source.state == sourceStateRunning && + time.Since(p.lastRequested) >= 10*time.Second { + source.log("stopping due to inactivity") + source.state = sourceStateStopped + source.events <- sourceEventApplyState{source.state} + } +} + +func (p *path) describe() ([]byte, bool) { + p.lastRequested = time.Now() + + // publisher was found but is not ready: wait + if !p.publisherReady { + // start source if needed + if source, ok := p.publisher.(*source); ok && source.state == sourceStateStopped { + source.log("starting on demand") + source.state = sourceStateRunning + source.events <- sourceEventApplyState{source.state} + } + + return nil, true + } + + // publisher was found and is ready + return p.publisherSdpText, false +} + +func (p *path) publisherSetReady() { + p.publisherReady = true + + // reply to all clients that are waiting for a description + for c := range p.p.clients { + if c.state == clientStateWaitingDescription && + c.path == p.id { + c.path = "" + c.state = clientStateInitial + c.describeRes <- p.publisherSdpText + } + } +} + +func (p *path) publisherSetNotReady() { + p.publisherReady = false + + // close all clients that are reading + for c := range p.p.clients { + if c.state != clientStateWaitingDescription && + c != p.publisher && + c.path == p.id { + c.conn.NetConn().Close() + } + } +} + +func (p *path) publisherReset() { + // reply to all clients that were waiting for a description + for oc := range p.p.clients { + if oc.state == clientStateWaitingDescription && + oc.path == p.id { + oc.path = "" + oc.state = clientStateInitial + oc.describeRes <- nil + } + } +} diff --git a/rtsp-simple-server.yml b/rtsp-simple-server.yml index d05da04f..9c093934 100644 --- a/rtsp-simple-server.yml +++ b/rtsp-simple-server.yml @@ -15,6 +15,7 @@ readTimeout: 10s # timeout of write operations writeTimeout: 5s # supported authentication methods +# WARNING: both methods are insecure, use RTSP inside a VPN to enforce security. authMethods: [basic, digest] # enable pprof on port 9999 to monitor performances pprof: false @@ -29,6 +30,9 @@ paths: source: record # if the source is an RTSP url, this is the protocol that will be used to pull the stream sourceProtocol: udp + # if the source is an RTSP url, it will be pulled only when at least one reader + # is connected, saving bandwidth + sourceOnDemand: no # username required to publish publishUser: diff --git a/server-tcpl.go b/server-tcp.go similarity index 67% rename from server-tcpl.go rename to server-tcp.go index 34f59fc9..8ad4f369 100644 --- a/server-tcpl.go +++ b/server-tcp.go @@ -4,14 +4,14 @@ import ( "net" ) -type serverTcpListener struct { +type serverTcp struct { p *program nconn *net.TCPListener done chan struct{} } -func newServerTcpListener(p *program) (*serverTcpListener, error) { +func newServerTcp(p *program) (*serverTcp, error) { nconn, err := net.ListenTCP("tcp", &net.TCPAddr{ Port: p.conf.RtspPort, }) @@ -19,7 +19,7 @@ func newServerTcpListener(p *program) (*serverTcpListener, error) { return nil, err } - l := &serverTcpListener{ + l := &serverTcp{ p: p, nconn: nconn, done: make(chan struct{}), @@ -29,11 +29,11 @@ func newServerTcpListener(p *program) (*serverTcpListener, error) { return l, nil } -func (l *serverTcpListener) log(format string, args ...interface{}) { +func (l *serverTcp) log(format string, args ...interface{}) { l.p.log("[TCP listener] "+format, args...) } -func (l *serverTcpListener) run() { +func (l *serverTcp) run() { for { nconn, err := l.nconn.AcceptTCP() if err != nil { @@ -46,7 +46,7 @@ func (l *serverTcpListener) run() { close(l.done) } -func (l *serverTcpListener) close() { +func (l *serverTcp) close() { l.nconn.Close() <-l.done } diff --git a/server-udpl.go b/server-udp.go similarity index 80% rename from server-udpl.go rename to server-udp.go index eecacf92..1c296255 100644 --- a/server-udpl.go +++ b/server-udp.go @@ -12,7 +12,7 @@ type udpAddrBufPair struct { buf []byte } -type serverUdpListener struct { +type serverUdp struct { p *program nconn *net.UDPConn streamType gortsplib.StreamType @@ -23,7 +23,7 @@ type serverUdpListener struct { done chan struct{} } -func newServerUdpListener(p *program, port int, streamType gortsplib.StreamType) (*serverUdpListener, error) { +func newServerUdp(p *program, port int, streamType gortsplib.StreamType) (*serverUdp, error) { nconn, err := net.ListenUDP("udp", &net.UDPAddr{ Port: port, }) @@ -31,7 +31,7 @@ func newServerUdpListener(p *program, port int, streamType gortsplib.StreamType) return nil, err } - l := &serverUdpListener{ + l := &serverUdp{ p: p, nconn: nconn, streamType: streamType, @@ -45,7 +45,7 @@ func newServerUdpListener(p *program, port int, streamType gortsplib.StreamType) return l, nil } -func (l *serverUdpListener) log(format string, args ...interface{}) { +func (l *serverUdp) log(format string, args ...interface{}) { var label string if l.streamType == gortsplib.StreamTypeRtp { label = "RTP" @@ -55,7 +55,7 @@ func (l *serverUdpListener) log(format string, args ...interface{}) { l.p.log("[UDP/"+label+" listener] "+format, args...) } -func (l *serverUdpListener) run() { +func (l *serverUdp) run() { writeDone := make(chan struct{}) go func() { defer close(writeDone) @@ -85,12 +85,12 @@ func (l *serverUdpListener) run() { close(l.done) } -func (l *serverUdpListener) close() { +func (l *serverUdp) close() { l.nconn.Close() <-l.done } -func (l *serverUdpListener) write(pair *udpAddrBufPair) { +func (l *serverUdp) write(pair *udpAddrBufPair) { // replace input buffer with write buffer buf := l.writeBuf.swap() buf = buf[:len(pair.buf)] diff --git a/source.go b/source.go index 110a0af6..79a0e474 100644 --- a/source.go +++ b/source.go @@ -3,13 +3,11 @@ package main import ( "math/rand" "net" - "net/url" "os" "sync" "time" "github.com/aler9/gortsplib" - "github.com/aler9/sdp/v3" ) const ( @@ -18,28 +16,51 @@ const ( sourceTcpReadBufferSize = 128 * 1024 ) -type source struct { - p *program - path string - u *url.URL - proto gortsplib.StreamProtocol - ready bool - tracks []*gortsplib.Track - serverSdpText []byte - serverSdpParsed *sdp.SessionDescription +type sourceState int - terminate chan struct{} - done chan struct{} +const ( + sourceStateStopped sourceState = iota + sourceStateRunning +) + +type sourceEvent interface { + isSourceEvent() } -func newSource(p *program, path string, u *url.URL, proto gortsplib.StreamProtocol) *source { +type sourceEventApplyState struct { + state sourceState +} + +func (sourceEventApplyState) isSourceEvent() {} + +type sourceEventTerminate struct{} + +func (sourceEventTerminate) isSourceEvent() {} + +type source struct { + p *program + path string + pconf *confPath + state sourceState + tracks []*gortsplib.Track + + events chan sourceEvent + done chan struct{} +} + +func newSource(p *program, path string, pconf *confPath) *source { s := &source{ - p: p, - path: path, - u: u, - proto: proto, - terminate: make(chan struct{}), - done: make(chan struct{}), + p: p, + path: path, + pconf: pconf, + events: make(chan sourceEvent), + done: make(chan struct{}), + } + + if pconf.SourceOnDemand { + s.state = sourceStateStopped + } else { + s.state = sourceStateRunning } return s @@ -49,45 +70,88 @@ func (s *source) log(format string, args ...interface{}) { s.p.log("[source "+s.path+"] "+format, args...) } -func (s *source) publisherIsReady() bool { - return s.ready -} - -func (s *source) publisherSdpText() []byte { - return s.serverSdpText -} - -func (s *source) publisherSdpParsed() *sdp.SessionDescription { - return s.serverSdpParsed -} +func (s *source) isPublisher() {} func (s *source) run() { - for { - ok := s.do() - if !ok { - break - } + running := false + var doTerminate chan struct{} + var doDone chan struct{} - t := time.NewTimer(sourceRetryInterval) - select { - case <-s.terminate: - break - case <-t.C: + applyState := func(state sourceState) { + if state == sourceStateRunning { + if !running { + s.log("started") + running = true + doTerminate = make(chan struct{}) + doDone = make(chan struct{}) + go s.do(doTerminate, doDone) + } + } else { + if running { + close(doTerminate) + <-doDone + running = false + s.log("stopped") + } } } + applyState(s.state) + +outer: + for rawEvt := range s.events { + switch evt := rawEvt.(type) { + case sourceEventApplyState: + applyState(evt.state) + + case sourceEventTerminate: + break outer + } + } + + if running { + close(doTerminate) + <-doDone + } + close(s.done) } -func (s *source) do() bool { - s.log("initializing with protocol %s", s.proto) +func (s *source) do(terminate chan struct{}, done chan struct{}) { + defer close(done) + + for { + ok := s.doInner(terminate) + if !ok { + break + } + + s.p.events <- programEventSourceReset{s} + + if !func() bool { + t := time.NewTimer(sourceRetryInterval) + defer t.Stop() + select { + case <-terminate: + return false + case <-t.C: + return true + } + }() { + break + } + } +} + +func (s *source) doInner(terminate chan struct{}) bool { + s.log("connecting") var conn *gortsplib.ConnClient var err error dialDone := make(chan struct{}) go func() { conn, err = gortsplib.NewConnClient(gortsplib.ConnClientConf{ - Host: s.u.Host, + Host: s.pconf.sourceUrl.Host, ReadTimeout: s.p.conf.ReadTimeout, WriteTimeout: s.p.conf.WriteTimeout, }) @@ -95,7 +159,7 @@ func (s *source) do() bool { }() select { - case <-s.terminate: + case <-terminate: return false case <-dialDone: } @@ -107,13 +171,13 @@ func (s *source) do() bool { defer conn.Close() - _, err = conn.Options(s.u) + _, err = conn.Options(s.pconf.sourceUrl) if err != nil { s.log("ERR: %s", err) return true } - tracks, _, err := conn.Describe(s.u) + tracks, _, err := conn.Describe(s.pconf.sourceUrl) if err != nil { s.log("ERR: %s", err) return true @@ -123,17 +187,17 @@ func (s *source) do() bool { serverSdpParsed, serverSdpText := sdpForServer(tracks) s.tracks = tracks - s.serverSdpText = serverSdpText - s.serverSdpParsed = serverSdpParsed + s.p.paths[s.path].publisherSdpText = serverSdpText + s.p.paths[s.path].publisherSdpParsed = serverSdpParsed - if s.proto == gortsplib.StreamProtocolUdp { - return s.runUdp(conn) + if s.pconf.sourceProtocolParsed == gortsplib.StreamProtocolUdp { + return s.runUdp(terminate, conn) } else { - return s.runTcp(conn) + return s.runTcp(terminate, conn) } } -func (s *source) runUdp(conn *gortsplib.ConnClient) bool { +func (s *source) runUdp(terminate chan struct{}, conn *gortsplib.ConnClient) bool { type trackListenerPair struct { rtpl *gortsplib.ConnClientUdpListener rtcpl *gortsplib.ConnClientUdpListener @@ -151,7 +215,7 @@ func (s *source) runUdp(conn *gortsplib.ConnClient) bool { rtpPort := (rand.Intn((65535-10000)/2) * 2) + 10000 rtcpPort := rtpPort + 1 - rtpl, rtcpl, _, err = conn.SetupUdp(s.u, track, rtpPort, rtcpPort) + rtpl, rtcpl, _, err = conn.SetupUdp(s.pconf.sourceUrl, track, rtpPort, rtcpPort) if err != nil { // retry if it's a bind error if nerr, ok := err.(*net.OpError); ok { @@ -175,13 +239,13 @@ func (s *source) runUdp(conn *gortsplib.ConnClient) bool { }) } - _, err := conn.Play(s.u) + _, err := conn.Play(s.pconf.sourceUrl) if err != nil { s.log("ERR: %s", err) return true } - s.p.events <- programEventStreamerReady{s} + s.p.events <- programEventSourceReady{s} var wg sync.WaitGroup @@ -201,7 +265,7 @@ func (s *source) runUdp(conn *gortsplib.ConnClient) bool { break } - s.p.events <- programEventStreamerFrame{s, trackId, gortsplib.StreamTypeRtp, buf[:n]} + s.p.events <- programEventSourceFrame{s, trackId, gortsplib.StreamTypeRtp, buf[:n]} } }(trackId, lp.rtpl) @@ -218,14 +282,14 @@ func (s *source) runUdp(conn *gortsplib.ConnClient) bool { break } - s.p.events <- programEventStreamerFrame{s, trackId, gortsplib.StreamTypeRtcp, buf[:n]} + s.p.events <- programEventSourceFrame{s, trackId, gortsplib.StreamTypeRtcp, buf[:n]} } }(trackId, lp.rtcpl) } tcpConnDone := make(chan error) go func() { - tcpConnDone <- conn.LoopUDP(s.u) + tcpConnDone <- conn.LoopUDP(s.pconf.sourceUrl) }() var ret bool @@ -233,7 +297,7 @@ func (s *source) runUdp(conn *gortsplib.ConnClient) bool { outer: for { select { - case <-s.terminate: + case <-terminate: conn.NetConn().Close() <-tcpConnDone ret = false @@ -246,7 +310,7 @@ outer: } } - s.p.events <- programEventStreamerNotReady{s} + s.p.events <- programEventSourceNotReady{s} for _, lp := range listeners { lp.rtpl.Close() @@ -257,22 +321,22 @@ outer: return ret } -func (s *source) runTcp(conn *gortsplib.ConnClient) bool { +func (s *source) runTcp(terminate chan struct{}, conn *gortsplib.ConnClient) bool { for _, track := range s.tracks { - _, err := conn.SetupTcp(s.u, track) + _, err := conn.SetupTcp(s.pconf.sourceUrl, track) if err != nil { s.log("ERR: %s", err) return true } } - _, err := conn.Play(s.u) + _, err := conn.Play(s.pconf.sourceUrl) if err != nil { s.log("ERR: %s", err) return true } - s.p.events <- programEventStreamerReady{s} + s.p.events <- programEventSourceReady{s} frame := &gortsplib.InterleavedFrame{} doubleBuf := newDoubleBuffer(sourceTcpReadBufferSize) @@ -289,7 +353,7 @@ func (s *source) runTcp(conn *gortsplib.ConnClient) bool { return } - s.p.events <- programEventStreamerFrame{s, frame.TrackId, frame.StreamType, frame.Content} + s.p.events <- programEventSourceFrame{s, frame.TrackId, frame.StreamType, frame.Content} } }() @@ -298,7 +362,7 @@ func (s *source) runTcp(conn *gortsplib.ConnClient) bool { outer: for { select { - case <-s.terminate: + case <-terminate: conn.NetConn().Close() <-tcpConnDone ret = false @@ -311,12 +375,7 @@ outer: } } - s.p.events <- programEventStreamerNotReady{s} + s.p.events <- programEventSourceNotReady{s} return ret } - -func (s *source) close() { - close(s.terminate) - <-s.done -}