server: use timers only when needed

This commit is contained in:
aler9
2021-11-15 12:20:04 +01:00
parent b81267e310
commit 1a599b1400
3 changed files with 35 additions and 26 deletions

View File

@@ -38,12 +38,6 @@ const (
clientUDPKeepalivePeriod = 30 * time.Second clientUDPKeepalivePeriod = 30 * time.Second
) )
func emptyTimer() *time.Timer {
t := time.NewTimer(0)
<-t.C
return t
}
func isAnyPort(p int) bool { func isAnyPort(p int) bool {
return p == 0 || p == 1 return p == 0 || p == 1
} }

11
emptytimer.go Normal file
View File

@@ -0,0 +1,11 @@
package gortsplib
import (
"time"
)
func emptyTimer() *time.Timer {
t := time.NewTimer(0)
<-t.C
return t
}

View File

@@ -172,6 +172,8 @@ type ServerSession struct {
tcpConn *ServerConn // tcp tcpConn *ServerConn // tcp
announcedTracks []ServerSessionAnnouncedTrack // publish announcedTracks []ServerSessionAnnouncedTrack // publish
udpLastFrameTime *int64 // publish, udp udpLastFrameTime *int64 // publish, udp
checkStreamTimer *time.Timer
receiverReportTimer *time.Timer
// in // in
request chan sessionRequestReq request chan sessionRequestReq
@@ -186,15 +188,17 @@ func newServerSession(
ctx, ctxCancel := context.WithCancel(s.ctx) ctx, ctxCancel := context.WithCancel(s.ctx)
ss := &ServerSession{ ss := &ServerSession{
s: s, s: s,
secretID: secretID, secretID: secretID,
author: author, author: author,
ctx: ctx, ctx: ctx,
ctxCancel: ctxCancel, ctxCancel: ctxCancel,
conns: make(map[*ServerConn]struct{}), conns: make(map[*ServerConn]struct{}),
lastRequestTime: time.Now(), lastRequestTime: time.Now(),
request: make(chan sessionRequestReq), checkStreamTimer: emptyTimer(),
connRemove: make(chan *ServerConn), receiverReportTimer: emptyTimer(),
request: make(chan sessionRequestReq),
connRemove: make(chan *ServerConn),
} }
s.wg.Add(1) s.wg.Add(1)
@@ -254,12 +258,6 @@ func (ss *ServerSession) run() {
} }
err := func() error { err := func() error {
checkTimeoutTicker := time.NewTicker(serverSessionCheckStreamPeriod)
defer checkTimeoutTicker.Stop()
receiverReportTicker := time.NewTicker(ss.s.receiverReportPeriod)
defer receiverReportTicker.Stop()
for { for {
select { select {
case req := <-ss.request: case req := <-ss.request:
@@ -309,7 +307,7 @@ func (ss *ServerSession) run() {
} }
} }
case <-checkTimeoutTicker.C: case <-ss.checkStreamTimer.C:
switch { switch {
// in case of RECORD and UDP, timeout happens when no RTP or RTCP packets are being received // in case of RECORD and UDP, timeout happens when no RTP or RTCP packets are being received
case ss.state == ServerSessionStatePublish && (*ss.setuppedTransport == TransportUDP || case ss.state == ServerSessionStatePublish && (*ss.setuppedTransport == TransportUDP ||
@@ -331,17 +329,17 @@ func (ss *ServerSession) run() {
// in case of TCP, there's no timeout until all associated connections are closed // in case of TCP, there's no timeout until all associated connections are closed
} }
case <-receiverReportTicker.C: ss.checkStreamTimer = time.NewTimer(serverSessionCheckStreamPeriod)
if ss.state != ServerSessionStatePublish {
continue
}
case <-ss.receiverReportTimer.C:
now := time.Now() now := time.Now()
for trackID, track := range ss.announcedTracks { for trackID, track := range ss.announcedTracks {
r := track.rtcpReceiver.Report(now) r := track.rtcpReceiver.Report(now)
ss.WritePacketRTCP(trackID, r) ss.WritePacketRTCP(trackID, r)
} }
ss.receiverReportTimer = time.NewTimer(ss.s.receiverReportPeriod)
case <-ss.ctx.Done(): case <-ss.ctx.Done():
return liberrors.ErrServerTerminated{} return liberrors.ErrServerTerminated{}
} }
@@ -812,6 +810,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
} }
ss.state = ServerSessionStateRead ss.state = ServerSessionStateRead
ss.checkStreamTimer = time.NewTimer(serverSessionCheckStreamPeriod)
if *ss.setuppedTransport == TransportTCP { if *ss.setuppedTransport == TransportTCP {
ss.tcpConn = sc ss.tcpConn = sc
@@ -930,6 +929,8 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
} }
ss.state = ServerSessionStatePublish ss.state = ServerSessionStatePublish
ss.checkStreamTimer = time.NewTimer(serverSessionCheckStreamPeriod)
ss.receiverReportTimer = time.NewTimer(ss.s.receiverReportPeriod)
switch *ss.setuppedTransport { switch *ss.setuppedTransport {
case TransportUDP: case TransportUDP:
@@ -994,6 +995,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
ss.setuppedStream.readerSetInactive(ss) ss.setuppedStream.readerSetInactive(ss)
ss.state = ServerSessionStatePreRead ss.state = ServerSessionStatePreRead
ss.checkStreamTimer = emptyTimer()
ss.tcpConn = nil ss.tcpConn = nil
switch *ss.setuppedTransport { switch *ss.setuppedTransport {
@@ -1008,6 +1010,8 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
case ServerSessionStatePublish: case ServerSessionStatePublish:
ss.state = ServerSessionStatePrePublish ss.state = ServerSessionStatePrePublish
ss.checkStreamTimer = emptyTimer()
ss.receiverReportTimer = emptyTimer()
ss.tcpConn = nil ss.tcpConn = nil
switch *ss.setuppedTransport { switch *ss.setuppedTransport {