From 8ecd0869f3556077da8363304954c30ed6a470b0 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Mon, 6 Dec 2021 13:52:55 +0100 Subject: [PATCH] server: run checkStreamTimer only when transport is udp --- serversession.go | 39 ++++++++++++++++++--------------------- 1 file changed, 18 insertions(+), 21 deletions(-) diff --git a/serversession.go b/serversession.go index d4b2ec17..24099c8a 100644 --- a/serversession.go +++ b/serversession.go @@ -168,8 +168,8 @@ type ServerSession struct { tcpConn *ServerConn // tcp announcedTracks []ServerSessionAnnouncedTrack // publish udpLastFrameTime *int64 // publish, udp - checkStreamTimer *time.Timer - udpReceiverReportTimer *time.Timer + udpCheckStreamTimer *time.Timer // udp + udpReceiverReportTimer *time.Timer // udp // in request chan sessionRequestReq @@ -191,7 +191,7 @@ func newServerSession( ctxCancel: ctxCancel, conns: make(map[*ServerConn]struct{}), lastRequestTime: time.Now(), - checkStreamTimer: emptyTimer(), + udpCheckStreamTimer: emptyTimer(), udpReceiverReportTimer: emptyTimer(), request: make(chan sessionRequestReq), connRemove: make(chan *ServerConn), @@ -303,32 +303,26 @@ func (ss *ServerSession) run() { } } - case <-ss.checkStreamTimer.C: - switch { + case <-ss.udpCheckStreamTimer.C: + now := time.Now() + // in case of RECORD and UDP, timeout happens when no RTP or RTCP packets are being received - case ss.state == ServerSessionStatePublish && (*ss.setuppedTransport == TransportUDP || - *ss.setuppedTransport == TransportUDPMulticast): - now := time.Now() + if ss.state == ServerSessionStatePublish { lft := atomic.LoadInt64(ss.udpLastFrameTime) if now.Sub(time.Unix(lft, 0)) >= ss.s.ReadTimeout { return liberrors.ErrServerNoUDPPacketsInAWhile{} } // in case of PLAY and UDP, timeout happens when no RTSP request arrives - case ss.state == ServerSessionStateRead && (*ss.setuppedTransport == TransportUDP || - *ss.setuppedTransport == TransportUDPMulticast): - now := time.Now() - if now.Sub(ss.lastRequestTime) >= ss.s.closeSessionAfterNoRequestsFor { - return liberrors.ErrServerNoRTSPRequestsInAWhile{} - } - - // in case of TCP, there's no timeout until all associated connections are closed + } else if now.Sub(ss.lastRequestTime) >= ss.s.closeSessionAfterNoRequestsFor { + return liberrors.ErrServerNoRTSPRequestsInAWhile{} } - ss.checkStreamTimer = time.NewTimer(ss.s.checkStreamPeriod) + ss.udpCheckStreamTimer = time.NewTimer(ss.s.checkStreamPeriod) case <-ss.udpReceiverReportTimer.C: now := time.Now() + for trackID, track := range ss.announcedTracks { r := track.rtcpReceiver.Report(now) ss.WritePacketRTCP(trackID, r) @@ -809,7 +803,6 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base } ss.state = ServerSessionStateRead - ss.checkStreamTimer = time.NewTimer(ss.s.checkStreamPeriod) if *ss.setuppedTransport == TransportTCP { ss.tcpConn = sc @@ -856,6 +849,8 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base switch *ss.setuppedTransport { case TransportUDP: + ss.udpCheckStreamTimer = time.NewTimer(ss.s.checkStreamPeriod) + for trackID, track := range ss.setuppedTracks { // readers can send RTCP packets sc.s.udpRTCPListener.addClient(ss.author.ip(), track.udpRTCPPort, ss, trackID, false) @@ -868,6 +863,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base return res, err case TransportUDPMulticast: + ss.udpCheckStreamTimer = time.NewTimer(ss.s.checkStreamPeriod) default: // TCP err = liberrors.ErrServerTCPFramesEnable{} @@ -928,10 +924,10 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base } ss.state = ServerSessionStatePublish - ss.checkStreamTimer = time.NewTimer(ss.s.checkStreamPeriod) switch *ss.setuppedTransport { case TransportUDP: + ss.udpCheckStreamTimer = time.NewTimer(ss.s.checkStreamPeriod) ss.udpReceiverReportTimer = time.NewTimer(ss.s.udpReceiverReportPeriod) for trackID, track := range ss.setuppedTracks { @@ -946,6 +942,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base } case TransportUDPMulticast: + ss.udpCheckStreamTimer = time.NewTimer(ss.s.checkStreamPeriod) ss.udpReceiverReportTimer = time.NewTimer(ss.s.udpReceiverReportPeriod) default: // TCP @@ -996,7 +993,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base ss.setuppedStream.readerSetInactive(ss) ss.state = ServerSessionStatePreRead - ss.checkStreamTimer = emptyTimer() + ss.udpCheckStreamTimer = emptyTimer() ss.tcpConn = nil switch *ss.setuppedTransport { @@ -1011,7 +1008,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base case ServerSessionStatePublish: ss.state = ServerSessionStatePrePublish - ss.checkStreamTimer = emptyTimer() + ss.udpCheckStreamTimer = emptyTimer() ss.udpReceiverReportTimer = emptyTimer() ss.tcpConn = nil