diff --git a/client.go b/client.go index f94d4821..96f61ea9 100644 --- a/client.go +++ b/client.go @@ -38,12 +38,6 @@ const ( clientUDPKeepalivePeriod = 30 * time.Second ) -func emptyTimer() *time.Timer { - t := time.NewTimer(0) - <-t.C - return t -} - func isAnyPort(p int) bool { return p == 0 || p == 1 } diff --git a/emptytimer.go b/emptytimer.go new file mode 100644 index 00000000..a1e495ae --- /dev/null +++ b/emptytimer.go @@ -0,0 +1,11 @@ +package gortsplib + +import ( + "time" +) + +func emptyTimer() *time.Timer { + t := time.NewTimer(0) + <-t.C + return t +} diff --git a/serversession.go b/serversession.go index 806d28b2..b2178e3b 100644 --- a/serversession.go +++ b/serversession.go @@ -172,6 +172,8 @@ type ServerSession struct { tcpConn *ServerConn // tcp announcedTracks []ServerSessionAnnouncedTrack // publish udpLastFrameTime *int64 // publish, udp + checkStreamTimer *time.Timer + receiverReportTimer *time.Timer // in request chan sessionRequestReq @@ -186,15 +188,17 @@ func newServerSession( ctx, ctxCancel := context.WithCancel(s.ctx) ss := &ServerSession{ - s: s, - secretID: secretID, - author: author, - ctx: ctx, - ctxCancel: ctxCancel, - conns: make(map[*ServerConn]struct{}), - lastRequestTime: time.Now(), - request: make(chan sessionRequestReq), - connRemove: make(chan *ServerConn), + s: s, + secretID: secretID, + author: author, + ctx: ctx, + ctxCancel: ctxCancel, + conns: make(map[*ServerConn]struct{}), + lastRequestTime: time.Now(), + checkStreamTimer: emptyTimer(), + receiverReportTimer: emptyTimer(), + request: make(chan sessionRequestReq), + connRemove: make(chan *ServerConn), } s.wg.Add(1) @@ -254,12 +258,6 @@ func (ss *ServerSession) run() { } err := func() error { - checkTimeoutTicker := time.NewTicker(serverSessionCheckStreamPeriod) - defer checkTimeoutTicker.Stop() - - receiverReportTicker := time.NewTicker(ss.s.receiverReportPeriod) - defer receiverReportTicker.Stop() - for { select { case req := <-ss.request: @@ -309,7 +307,7 @@ func (ss *ServerSession) run() { } } - case <-checkTimeoutTicker.C: + case <-ss.checkStreamTimer.C: switch { // in case of RECORD and UDP, timeout happens when no RTP or RTCP packets are being received case ss.state == ServerSessionStatePublish && (*ss.setuppedTransport == TransportUDP || @@ -331,17 +329,17 @@ func (ss *ServerSession) run() { // in case of TCP, there's no timeout until all associated connections are closed } - case <-receiverReportTicker.C: - if ss.state != ServerSessionStatePublish { - continue - } + ss.checkStreamTimer = time.NewTimer(serverSessionCheckStreamPeriod) + case <-ss.receiverReportTimer.C: now := time.Now() for trackID, track := range ss.announcedTracks { r := track.rtcpReceiver.Report(now) ss.WritePacketRTCP(trackID, r) } + ss.receiverReportTimer = time.NewTimer(ss.s.receiverReportPeriod) + case <-ss.ctx.Done(): return liberrors.ErrServerTerminated{} } @@ -812,6 +810,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base } ss.state = ServerSessionStateRead + ss.checkStreamTimer = time.NewTimer(serverSessionCheckStreamPeriod) if *ss.setuppedTransport == TransportTCP { ss.tcpConn = sc @@ -930,6 +929,8 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base } ss.state = ServerSessionStatePublish + ss.checkStreamTimer = time.NewTimer(serverSessionCheckStreamPeriod) + ss.receiverReportTimer = time.NewTimer(ss.s.receiverReportPeriod) switch *ss.setuppedTransport { case TransportUDP: @@ -994,6 +995,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base ss.setuppedStream.readerSetInactive(ss) ss.state = ServerSessionStatePreRead + ss.checkStreamTimer = emptyTimer() ss.tcpConn = nil switch *ss.setuppedTransport { @@ -1008,6 +1010,8 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base case ServerSessionStatePublish: ss.state = ServerSessionStatePrePublish + ss.checkStreamTimer = emptyTimer() + ss.receiverReportTimer = emptyTimer() ss.tcpConn = nil switch *ss.setuppedTransport {