diff --git a/clientconn.go b/clientconn.go index ee2db456..0e9ca530 100644 --- a/clientconn.go +++ b/clientconn.go @@ -28,11 +28,10 @@ import ( ) const ( - clientConnReadBufferSize = 4096 - clientConnWriteBufferSize = 4096 - clientConnUDPCheckStreamPeriod = 1 * time.Second - clientConnUDPKeepalivePeriod = 30 * time.Second - clientConnTCPSetDeadlinePeriod = 1 * time.Second + clientConnReadBufferSize = 4096 + clientConnWriteBufferSize = 4096 + clientConnCheckStreamPeriod = 1 * time.Second + clientConnUDPKeepalivePeriod = 30 * time.Second ) type clientConnState int diff --git a/clientconnread.go b/clientconnread.go index b84be9d7..ed525afb 100644 --- a/clientconnread.go +++ b/clientconnread.go @@ -159,19 +159,19 @@ func (cc *ClientConn) backgroundPlayUDP() error { checkStreamInitial = false checkStreamTicker.Stop() - checkStreamTicker = time.NewTicker(clientConnUDPCheckStreamPeriod) + checkStreamTicker = time.NewTicker(clientConnCheckStreamPeriod) } else { inTimeout := func() bool { now := time.Now() for _, cct := range cc.tracks { - lft := atomic.LoadInt64(cct.udpRTPListener.lastFrameTime) - if now.Sub(time.Unix(lft, 0)) < cc.conf.ReadTimeout { + lft := time.Unix(atomic.LoadInt64(cct.udpRTPListener.lastFrameTime), 0) + if now.Sub(lft) < cc.conf.ReadTimeout { return false } - lft = atomic.LoadInt64(cct.udpRTCPListener.lastFrameTime) - if now.Sub(time.Unix(lft, 0)) < cc.conf.ReadTimeout { + lft = time.Unix(atomic.LoadInt64(cct.udpRTCPListener.lastFrameTime), 0) + if now.Sub(lft) < cc.conf.ReadTimeout { return false } } @@ -191,6 +191,13 @@ func (cc *ClientConn) backgroundPlayUDP() error { } func (cc *ClientConn) backgroundPlayTCP() error { + // for some reason, SetReadDeadline() must always be called in the same + // goroutine, otherwise Read() freezes. + // therefore, we disable the deadline and perform check with a ticker. + cc.nconn.SetReadDeadline(time.Time{}) + + var lastFrameTime int64 + readerDone := make(chan error) go func() { for { @@ -203,7 +210,9 @@ func (cc *ClientConn) backgroundPlayTCP() error { return } - cc.tracks[frame.TrackID].rtcpReceiver.ProcessFrame(time.Now(), frame.StreamType, frame.Payload) + now := time.Now() + atomic.StoreInt64(&lastFrameTime, now.Unix()) + cc.tracks[frame.TrackID].rtcpReceiver.ProcessFrame(now, frame.StreamType, frame.Payload) cc.readCB(frame.TrackID, frame.StreamType, frame.Payload) } }() @@ -211,17 +220,11 @@ func (cc *ClientConn) backgroundPlayTCP() error { reportTicker := time.NewTicker(cc.conf.receiverReportPeriod) defer reportTicker.Stop() - // for some reason, SetReadDeadline() must always be called in the same - // goroutine, otherwise Read() freezes. - // therefore, we call it with a ticker. - deadlineTicker := time.NewTicker(clientConnTCPSetDeadlinePeriod) - defer deadlineTicker.Stop() + checkStreamTicker := time.NewTicker(clientConnCheckStreamPeriod) + defer checkStreamTicker.Stop() for { select { - case <-deadlineTicker.C: - cc.nconn.SetReadDeadline(time.Now().Add(cc.conf.ReadTimeout)) - case <-cc.backgroundTerminate: cc.nconn.SetReadDeadline(time.Now()) <-readerDone @@ -240,6 +243,18 @@ func (cc *ClientConn) backgroundPlayTCP() error { frame.Write(cc.bw) } + case <-checkStreamTicker.C: + inTimeout := func() bool { + now := time.Now() + lft := time.Unix(atomic.LoadInt64(&lastFrameTime), 0) + return now.Sub(lft) >= cc.conf.ReadTimeout + }() + if inTimeout { + cc.nconn.SetReadDeadline(time.Now()) + <-readerDone + return liberrors.ErrClientTCPTimeout{} + } + case err := <-readerDone: return err } diff --git a/clientconnread_test.go b/clientconnread_test.go index 6d241f54..8432678a 100644 --- a/clientconnread_test.go +++ b/clientconnread_test.go @@ -1435,7 +1435,7 @@ func TestClientReadErrorTimeout(t *testing.T) { require.Equal(t, "UDP timeout", err.Error()) case "tcp": - require.True(t, strings.HasSuffix(err.Error(), "i/o timeout")) + require.Equal(t, "TCP timeout", err.Error()) } }) } diff --git a/pkg/liberrors/client.go b/pkg/liberrors/client.go index 9c0a1540..a98c32db 100644 --- a/pkg/liberrors/client.go +++ b/pkg/liberrors/client.go @@ -142,7 +142,7 @@ func (e ErrClientNoUDPPacketsRecently) Error() string { return "no UDP packets received (maybe there's a firewall/NAT in between)" } -// ErrClientUDPTimeout is returned when UDP packets have been received previously +// ErrClientUDPTimeout is returned when timeout has exceeded but UDP packets have been received previously // but now nothing is being received. type ErrClientUDPTimeout struct{} @@ -151,6 +151,14 @@ func (e ErrClientUDPTimeout) Error() string { return "UDP timeout" } +// ErrClientTCPTimeout is returned when timeout has exceeded. +type ErrClientTCPTimeout struct{} + +// Error implements the error interface. +func (e ErrClientTCPTimeout) Error() string { + return "TCP timeout" +} + // ErrClientRTPInfoInvalid is returned in case of an invalid RTP-Info. type ErrClientRTPInfoInvalid struct { Err error diff --git a/serverconn_test.go b/serverconn_test.go index c4cd3a9c..4077e26a 100644 --- a/serverconn_test.go +++ b/serverconn_test.go @@ -195,7 +195,7 @@ func (ts *testServ) handleConn(conn *ServerConn) { } } - err := <-conn.Read(ServerConnReadHandlers{ + <-conn.Read(ServerConnReadHandlers{ OnDescribe: onDescribe, OnAnnounce: onAnnounce, OnSetup: onSetup, @@ -203,11 +203,6 @@ func (ts *testServ) handleConn(conn *ServerConn) { OnRecord: onRecord, OnFrame: onFrame, }) - if err != io.EOF { - if _, ok := err.(liberrors.ErrServerTeardown); !ok { - fmt.Println("ERR", err) - } - } ts.mutex.Lock() defer ts.mutex.Unlock()