From bb66a03f3d79a2398b406c12976d03e7d4db34ea Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Fri, 18 Sep 2020 22:43:39 +0200 Subject: [PATCH] improve performance when publishing to the server or receiving streams in proxy mode --- client.go | 49 +++++++++++++++++++++++++++++-------------------- go.mod | 2 +- go.sum | 4 ++-- main.go | 3 +++ 4 files changed, 35 insertions(+), 23 deletions(-) diff --git a/client.go b/client.go index 90592f64..957220d7 100644 --- a/client.go +++ b/client.go @@ -133,19 +133,20 @@ func (cs clientState) String() string { } type client struct { - p *program - conn *gortsplib.ConnServer - state clientState - path *path - authUser string - authPass string - authHelper *gortsplib.AuthServer - authFailures int - streamProtocol gortsplib.StreamProtocol - streamTracks map[int]*clientTrack - rtcpReceivers []*gortsplib.RtcpReceiver - describeCSeq gortsplib.HeaderValue - describeUrl string + p *program + conn *gortsplib.ConnServer + state clientState + path *path + authUser string + authPass string + authHelper *gortsplib.AuthServer + authFailures int + streamProtocol gortsplib.StreamProtocol + streamTracks map[int]*clientTrack + rtcpReceivers []*gortsplib.RtcpReceiver + udpLastFrameTimes []*int64 + describeCSeq gortsplib.HeaderValue + describeUrl string describe chan describeRes tcpFrame chan *gortsplib.InterleavedFrame @@ -1069,6 +1070,14 @@ func (c *client) runRecord() bool { c.rtcpReceivers[trackId] = gortsplib.NewRtcpReceiver() } + if c.streamProtocol == gortsplib.StreamProtocolUDP { + c.udpLastFrameTimes = make([]*int64, len(c.streamTracks)) + for trackId := range c.streamTracks { + v := time.Now().Unix() + c.udpLastFrameTimes[trackId] = &v + } + } + c.p.clientRecord <- c c.log("is publishing on path '%s', %d %s via %s", c.path.name, len(c.streamTracks), func() string { @@ -1098,10 +1107,6 @@ func (c *client) runRecord() bool { onPublishCmd.Wait() } - for trackId := range c.streamTracks { - c.rtcpReceivers[trackId].Close() - } - return false } @@ -1160,9 +1165,13 @@ func (c *client) runRecordUDP() { return case <-checkStreamTicker.C: - for trackId := range c.streamTracks { - if time.Since(c.rtcpReceivers[trackId].LastFrameTime()) >= c.p.conf.ReadTimeout { - c.log("ERR: no packets received recently (maybe there's a firewall/NAT)") + now := time.Now() + + for _, lastUnix := range c.udpLastFrameTimes { + last := time.Unix(atomic.LoadInt64(lastUnix), 0) + + if now.Sub(last) >= c.p.conf.ReadTimeout { + c.log("ERR: no packets received recently (maybe there's a firewall/NAT in between)") c.conn.Close() <-readDone c.p.clientClose <- c diff --git a/go.mod b/go.mod index de0ff2ab..80e4e341 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.12 require ( github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect - github.com/aler9/gortsplib v0.0.0-20200913144012-45cf5562dede + github.com/aler9/gortsplib v0.0.0-20200918204140-dd671c35ba0e github.com/davecgh/go-spew v1.1.1 // indirect github.com/stretchr/testify v1.6.1 gopkg.in/alecthomas/kingpin.v2 v2.2.6 diff --git a/go.sum b/go.sum index 185411a9..1ab91064 100644 --- a/go.sum +++ b/go.sum @@ -2,8 +2,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= -github.com/aler9/gortsplib v0.0.0-20200913144012-45cf5562dede h1:J1fnRadD+GLXwZTnn/tzr8qJnAR9liQJ6+h1eRqoxq0= -github.com/aler9/gortsplib v0.0.0-20200913144012-45cf5562dede/go.mod h1:XybE/Zt1yFtnNEjNhAyg2w6VjD8aJ79GFfpSjkfLNd8= +github.com/aler9/gortsplib v0.0.0-20200918204140-dd671c35ba0e h1:6QuNm+AZxHDtfvti5Fj8LXLMpsayJ8rY6RAiLAmahPg= +github.com/aler9/gortsplib v0.0.0-20200918204140-dd671c35ba0e/go.mod h1:XybE/Zt1yFtnNEjNhAyg2w6VjD8aJ79GFfpSjkfLNd8= github.com/aler9/sdp-dirty/v3 v3.0.0-20200905103724-214b7cc25cfd h1:s/l20rPNGiyjggMdkhsLu0aQ0K0OFcROUMBDu7fGT+I= github.com/aler9/sdp-dirty/v3 v3.0.0-20200905103724-214b7cc25cfd/go.mod h1:5bO/aUQr9m3OasDatNNcVqKAgs7r5hgGXmszWHaC6mI= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= diff --git a/main.go b/main.go index d36a778e..2cdb8184 100644 --- a/main.go +++ b/main.go @@ -316,7 +316,10 @@ outer: continue } + atomic.StoreInt64(pub.client.udpLastFrameTimes[pub.trackId], time.Now().Unix()) + pub.client.rtcpReceivers[pub.trackId].OnFrame(req.streamType, req.buf) + p.forwardFrame(pub.client.path, pub.trackId, req.streamType, req.buf) case req := <-p.clientFrameTCP: