server: run checkStreamTimer only when transport is udp

This commit is contained in:
aler9
2021-12-06 13:52:55 +01:00
parent 1878a72fd9
commit 8ecd0869f3

View File

@@ -168,8 +168,8 @@ type ServerSession struct {
tcpConn *ServerConn // tcp tcpConn *ServerConn // tcp
announcedTracks []ServerSessionAnnouncedTrack // publish announcedTracks []ServerSessionAnnouncedTrack // publish
udpLastFrameTime *int64 // publish, udp udpLastFrameTime *int64 // publish, udp
checkStreamTimer *time.Timer udpCheckStreamTimer *time.Timer // udp
udpReceiverReportTimer *time.Timer udpReceiverReportTimer *time.Timer // udp
// in // in
request chan sessionRequestReq request chan sessionRequestReq
@@ -191,7 +191,7 @@ func newServerSession(
ctxCancel: ctxCancel, ctxCancel: ctxCancel,
conns: make(map[*ServerConn]struct{}), conns: make(map[*ServerConn]struct{}),
lastRequestTime: time.Now(), lastRequestTime: time.Now(),
checkStreamTimer: emptyTimer(), udpCheckStreamTimer: emptyTimer(),
udpReceiverReportTimer: emptyTimer(), udpReceiverReportTimer: emptyTimer(),
request: make(chan sessionRequestReq), request: make(chan sessionRequestReq),
connRemove: make(chan *ServerConn), connRemove: make(chan *ServerConn),
@@ -303,32 +303,26 @@ func (ss *ServerSession) run() {
} }
} }
case <-ss.checkStreamTimer.C: case <-ss.udpCheckStreamTimer.C:
switch { now := time.Now()
// in case of RECORD and UDP, timeout happens when no RTP or RTCP packets are being received // in case of RECORD and UDP, timeout happens when no RTP or RTCP packets are being received
case ss.state == ServerSessionStatePublish && (*ss.setuppedTransport == TransportUDP || if ss.state == ServerSessionStatePublish {
*ss.setuppedTransport == TransportUDPMulticast):
now := time.Now()
lft := atomic.LoadInt64(ss.udpLastFrameTime) lft := atomic.LoadInt64(ss.udpLastFrameTime)
if now.Sub(time.Unix(lft, 0)) >= ss.s.ReadTimeout { if now.Sub(time.Unix(lft, 0)) >= ss.s.ReadTimeout {
return liberrors.ErrServerNoUDPPacketsInAWhile{} return liberrors.ErrServerNoUDPPacketsInAWhile{}
} }
// in case of PLAY and UDP, timeout happens when no RTSP request arrives // in case of PLAY and UDP, timeout happens when no RTSP request arrives
case ss.state == ServerSessionStateRead && (*ss.setuppedTransport == TransportUDP || } else if now.Sub(ss.lastRequestTime) >= ss.s.closeSessionAfterNoRequestsFor {
*ss.setuppedTransport == TransportUDPMulticast): return liberrors.ErrServerNoRTSPRequestsInAWhile{}
now := time.Now()
if now.Sub(ss.lastRequestTime) >= ss.s.closeSessionAfterNoRequestsFor {
return liberrors.ErrServerNoRTSPRequestsInAWhile{}
}
// in case of TCP, there's no timeout until all associated connections are closed
} }
ss.checkStreamTimer = time.NewTimer(ss.s.checkStreamPeriod) ss.udpCheckStreamTimer = time.NewTimer(ss.s.checkStreamPeriod)
case <-ss.udpReceiverReportTimer.C: case <-ss.udpReceiverReportTimer.C:
now := time.Now() now := time.Now()
for trackID, track := range ss.announcedTracks { for trackID, track := range ss.announcedTracks {
r := track.rtcpReceiver.Report(now) r := track.rtcpReceiver.Report(now)
ss.WritePacketRTCP(trackID, r) ss.WritePacketRTCP(trackID, r)
@@ -809,7 +803,6 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
} }
ss.state = ServerSessionStateRead ss.state = ServerSessionStateRead
ss.checkStreamTimer = time.NewTimer(ss.s.checkStreamPeriod)
if *ss.setuppedTransport == TransportTCP { if *ss.setuppedTransport == TransportTCP {
ss.tcpConn = sc ss.tcpConn = sc
@@ -856,6 +849,8 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
switch *ss.setuppedTransport { switch *ss.setuppedTransport {
case TransportUDP: case TransportUDP:
ss.udpCheckStreamTimer = time.NewTimer(ss.s.checkStreamPeriod)
for trackID, track := range ss.setuppedTracks { for trackID, track := range ss.setuppedTracks {
// readers can send RTCP packets // readers can send RTCP packets
sc.s.udpRTCPListener.addClient(ss.author.ip(), track.udpRTCPPort, ss, trackID, false) sc.s.udpRTCPListener.addClient(ss.author.ip(), track.udpRTCPPort, ss, trackID, false)
@@ -868,6 +863,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
return res, err return res, err
case TransportUDPMulticast: case TransportUDPMulticast:
ss.udpCheckStreamTimer = time.NewTimer(ss.s.checkStreamPeriod)
default: // TCP default: // TCP
err = liberrors.ErrServerTCPFramesEnable{} err = liberrors.ErrServerTCPFramesEnable{}
@@ -928,10 +924,10 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
} }
ss.state = ServerSessionStatePublish ss.state = ServerSessionStatePublish
ss.checkStreamTimer = time.NewTimer(ss.s.checkStreamPeriod)
switch *ss.setuppedTransport { switch *ss.setuppedTransport {
case TransportUDP: case TransportUDP:
ss.udpCheckStreamTimer = time.NewTimer(ss.s.checkStreamPeriod)
ss.udpReceiverReportTimer = time.NewTimer(ss.s.udpReceiverReportPeriod) ss.udpReceiverReportTimer = time.NewTimer(ss.s.udpReceiverReportPeriod)
for trackID, track := range ss.setuppedTracks { for trackID, track := range ss.setuppedTracks {
@@ -946,6 +942,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
} }
case TransportUDPMulticast: case TransportUDPMulticast:
ss.udpCheckStreamTimer = time.NewTimer(ss.s.checkStreamPeriod)
ss.udpReceiverReportTimer = time.NewTimer(ss.s.udpReceiverReportPeriod) ss.udpReceiverReportTimer = time.NewTimer(ss.s.udpReceiverReportPeriod)
default: // TCP default: // TCP
@@ -996,7 +993,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
ss.setuppedStream.readerSetInactive(ss) ss.setuppedStream.readerSetInactive(ss)
ss.state = ServerSessionStatePreRead ss.state = ServerSessionStatePreRead
ss.checkStreamTimer = emptyTimer() ss.udpCheckStreamTimer = emptyTimer()
ss.tcpConn = nil ss.tcpConn = nil
switch *ss.setuppedTransport { switch *ss.setuppedTransport {
@@ -1011,7 +1008,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
case ServerSessionStatePublish: case ServerSessionStatePublish:
ss.state = ServerSessionStatePrePublish ss.state = ServerSessionStatePrePublish
ss.checkStreamTimer = emptyTimer() ss.udpCheckStreamTimer = emptyTimer()
ss.udpReceiverReportTimer = emptyTimer() ss.udpReceiverReportTimer = emptyTimer()
ss.tcpConn = nil ss.tcpConn = nil