This commit is contained in:
aler9
2021-04-04 19:40:21 +02:00
parent 5847b507d1
commit fc3d84be56
5 changed files with 44 additions and 27 deletions

View File

@@ -28,11 +28,10 @@ import (
) )
const ( const (
clientConnReadBufferSize = 4096 clientConnReadBufferSize = 4096
clientConnWriteBufferSize = 4096 clientConnWriteBufferSize = 4096
clientConnUDPCheckStreamPeriod = 1 * time.Second clientConnCheckStreamPeriod = 1 * time.Second
clientConnUDPKeepalivePeriod = 30 * time.Second clientConnUDPKeepalivePeriod = 30 * time.Second
clientConnTCPSetDeadlinePeriod = 1 * time.Second
) )
type clientConnState int type clientConnState int

View File

@@ -159,19 +159,19 @@ func (cc *ClientConn) backgroundPlayUDP() error {
checkStreamInitial = false checkStreamInitial = false
checkStreamTicker.Stop() checkStreamTicker.Stop()
checkStreamTicker = time.NewTicker(clientConnUDPCheckStreamPeriod) checkStreamTicker = time.NewTicker(clientConnCheckStreamPeriod)
} else { } else {
inTimeout := func() bool { inTimeout := func() bool {
now := time.Now() now := time.Now()
for _, cct := range cc.tracks { for _, cct := range cc.tracks {
lft := atomic.LoadInt64(cct.udpRTPListener.lastFrameTime) lft := time.Unix(atomic.LoadInt64(cct.udpRTPListener.lastFrameTime), 0)
if now.Sub(time.Unix(lft, 0)) < cc.conf.ReadTimeout { if now.Sub(lft) < cc.conf.ReadTimeout {
return false return false
} }
lft = atomic.LoadInt64(cct.udpRTCPListener.lastFrameTime) lft = time.Unix(atomic.LoadInt64(cct.udpRTCPListener.lastFrameTime), 0)
if now.Sub(time.Unix(lft, 0)) < cc.conf.ReadTimeout { if now.Sub(lft) < cc.conf.ReadTimeout {
return false return false
} }
} }
@@ -191,6 +191,13 @@ func (cc *ClientConn) backgroundPlayUDP() error {
} }
func (cc *ClientConn) backgroundPlayTCP() error { func (cc *ClientConn) backgroundPlayTCP() error {
// for some reason, SetReadDeadline() must always be called in the same
// goroutine, otherwise Read() freezes.
// therefore, we disable the deadline and perform check with a ticker.
cc.nconn.SetReadDeadline(time.Time{})
var lastFrameTime int64
readerDone := make(chan error) readerDone := make(chan error)
go func() { go func() {
for { for {
@@ -203,7 +210,9 @@ func (cc *ClientConn) backgroundPlayTCP() error {
return return
} }
cc.tracks[frame.TrackID].rtcpReceiver.ProcessFrame(time.Now(), frame.StreamType, frame.Payload) now := time.Now()
atomic.StoreInt64(&lastFrameTime, now.Unix())
cc.tracks[frame.TrackID].rtcpReceiver.ProcessFrame(now, frame.StreamType, frame.Payload)
cc.readCB(frame.TrackID, frame.StreamType, frame.Payload) cc.readCB(frame.TrackID, frame.StreamType, frame.Payload)
} }
}() }()
@@ -211,17 +220,11 @@ func (cc *ClientConn) backgroundPlayTCP() error {
reportTicker := time.NewTicker(cc.conf.receiverReportPeriod) reportTicker := time.NewTicker(cc.conf.receiverReportPeriod)
defer reportTicker.Stop() defer reportTicker.Stop()
// for some reason, SetReadDeadline() must always be called in the same checkStreamTicker := time.NewTicker(clientConnCheckStreamPeriod)
// goroutine, otherwise Read() freezes. defer checkStreamTicker.Stop()
// therefore, we call it with a ticker.
deadlineTicker := time.NewTicker(clientConnTCPSetDeadlinePeriod)
defer deadlineTicker.Stop()
for { for {
select { select {
case <-deadlineTicker.C:
cc.nconn.SetReadDeadline(time.Now().Add(cc.conf.ReadTimeout))
case <-cc.backgroundTerminate: case <-cc.backgroundTerminate:
cc.nconn.SetReadDeadline(time.Now()) cc.nconn.SetReadDeadline(time.Now())
<-readerDone <-readerDone
@@ -240,6 +243,18 @@ func (cc *ClientConn) backgroundPlayTCP() error {
frame.Write(cc.bw) frame.Write(cc.bw)
} }
case <-checkStreamTicker.C:
inTimeout := func() bool {
now := time.Now()
lft := time.Unix(atomic.LoadInt64(&lastFrameTime), 0)
return now.Sub(lft) >= cc.conf.ReadTimeout
}()
if inTimeout {
cc.nconn.SetReadDeadline(time.Now())
<-readerDone
return liberrors.ErrClientTCPTimeout{}
}
case err := <-readerDone: case err := <-readerDone:
return err return err
} }

View File

@@ -1435,7 +1435,7 @@ func TestClientReadErrorTimeout(t *testing.T) {
require.Equal(t, "UDP timeout", err.Error()) require.Equal(t, "UDP timeout", err.Error())
case "tcp": case "tcp":
require.True(t, strings.HasSuffix(err.Error(), "i/o timeout")) require.Equal(t, "TCP timeout", err.Error())
} }
}) })
} }

View File

@@ -142,7 +142,7 @@ func (e ErrClientNoUDPPacketsRecently) Error() string {
return "no UDP packets received (maybe there's a firewall/NAT in between)" return "no UDP packets received (maybe there's a firewall/NAT in between)"
} }
// ErrClientUDPTimeout is returned when UDP packets have been received previously // ErrClientUDPTimeout is returned when timeout has exceeded but UDP packets have been received previously
// but now nothing is being received. // but now nothing is being received.
type ErrClientUDPTimeout struct{} type ErrClientUDPTimeout struct{}
@@ -151,6 +151,14 @@ func (e ErrClientUDPTimeout) Error() string {
return "UDP timeout" return "UDP timeout"
} }
// ErrClientTCPTimeout is returned when timeout has exceeded.
type ErrClientTCPTimeout struct{}
// Error implements the error interface.
func (e ErrClientTCPTimeout) Error() string {
return "TCP timeout"
}
// ErrClientRTPInfoInvalid is returned in case of an invalid RTP-Info. // ErrClientRTPInfoInvalid is returned in case of an invalid RTP-Info.
type ErrClientRTPInfoInvalid struct { type ErrClientRTPInfoInvalid struct {
Err error Err error

View File

@@ -195,7 +195,7 @@ func (ts *testServ) handleConn(conn *ServerConn) {
} }
} }
err := <-conn.Read(ServerConnReadHandlers{ <-conn.Read(ServerConnReadHandlers{
OnDescribe: onDescribe, OnDescribe: onDescribe,
OnAnnounce: onAnnounce, OnAnnounce: onAnnounce,
OnSetup: onSetup, OnSetup: onSetup,
@@ -203,11 +203,6 @@ func (ts *testServ) handleConn(conn *ServerConn) {
OnRecord: onRecord, OnRecord: onRecord,
OnFrame: onFrame, OnFrame: onFrame,
}) })
if err != io.EOF {
if _, ok := err.(liberrors.ErrServerTeardown); !ok {
fmt.Println("ERR", err)
}
}
ts.mutex.Lock() ts.mutex.Lock()
defer ts.mutex.Unlock() defer ts.mutex.Unlock()