detect when the connection with the server is closed when publishing with udp (#6)

This commit is contained in:
aler9
2020-10-16 23:13:24 +02:00
parent b647e5ee31
commit c792387d79
5 changed files with 64 additions and 50 deletions

View File

@@ -732,75 +732,82 @@ func (c *ConnClient) Play(u *url.URL) (*base.Response, error) {
return res, nil return res, nil
} }
// LoopUDP must be called after SetupUDP() and Play(); it keeps // LoopUDP must be called after Play() or Record(); it keeps
// the TCP connection open with keepalives, and returns when the TCP // the TCP connection open with keepalives, and returns when the TCP
// connection closes. // connection closes.
func (c *ConnClient) LoopUDP() error { func (c *ConnClient) LoopUDP() error {
if c.state != connClientStateReading { if c.state != connClientStateReading && c.state != connClientStatePublishing {
return fmt.Errorf("can be called only after a successful Play()") return fmt.Errorf("can be called only after a successful Play() or Record()")
} }
if *c.streamProtocol != StreamProtocolUDP { if *c.streamProtocol != StreamProtocolUDP {
return fmt.Errorf("stream protocol is not UDP") return fmt.Errorf("stream protocol is not UDP")
} }
readDone := make(chan error) if c.state == connClientStateReading {
go func() { readDone := make(chan error)
go func() {
for {
c.conf.Conn.SetReadDeadline(time.Now().Add(clientUDPKeepalivePeriod + c.conf.ReadTimeout))
var res base.Response
err := res.Read(c.br)
if err != nil {
readDone <- err
return
}
}
}()
keepaliveTicker := time.NewTicker(clientUDPKeepalivePeriod)
defer keepaliveTicker.Stop()
checkStreamTicker := time.NewTicker(clientUDPCheckStreamPeriod)
defer checkStreamTicker.Stop()
for { for {
c.conf.Conn.SetReadDeadline(time.Now().Add(clientUDPKeepalivePeriod + c.conf.ReadTimeout)) select {
var res base.Response case err := <-readDone:
err := res.Read(c.br)
if err != nil {
readDone <- err
return
}
}
}()
keepaliveTicker := time.NewTicker(clientUDPKeepalivePeriod)
defer keepaliveTicker.Stop()
checkStreamTicker := time.NewTicker(clientUDPCheckStreamPeriod)
defer checkStreamTicker.Stop()
for {
select {
case err := <-readDone:
c.conf.Conn.Close()
return err
case <-keepaliveTicker.C:
_, err := c.Do(&base.Request{
Method: base.OPTIONS,
Url: &url.URL{
Scheme: "rtsp",
Host: c.streamUrl.Host,
User: c.streamUrl.User,
// use the stream path, otherwise some cameras do not reply
Path: c.streamUrl.Path,
},
SkipResponse: true,
})
if err != nil {
c.conf.Conn.Close() c.conf.Conn.Close()
<-readDone
return err return err
}
case <-checkStreamTicker.C: case <-keepaliveTicker.C:
now := time.Now() _, err := c.Do(&base.Request{
Method: base.OPTIONS,
for _, lastUnix := range c.udpLastFrameTimes { Url: &url.URL{
last := time.Unix(atomic.LoadInt64(lastUnix), 0) Scheme: "rtsp",
Host: c.streamUrl.Host,
if now.Sub(last) >= c.conf.ReadTimeout { User: c.streamUrl.User,
// use the stream path, otherwise some cameras do not reply
Path: c.streamUrl.Path,
},
SkipResponse: true,
})
if err != nil {
c.conf.Conn.Close() c.conf.Conn.Close()
<-readDone <-readDone
return fmt.Errorf("no packets received recently (maybe there's a firewall/NAT in between)") return err
}
case <-checkStreamTicker.C:
now := time.Now()
for _, lastUnix := range c.udpLastFrameTimes {
last := time.Unix(atomic.LoadInt64(lastUnix), 0)
if now.Sub(last) >= c.conf.ReadTimeout {
c.conf.Conn.Close()
<-readDone
return fmt.Errorf("no packets received recently (maybe there's a firewall/NAT in between)")
}
} }
} }
} }
} }
// connClientStatePublishing
c.conf.Conn.SetReadDeadline(time.Time{}) // disable deadline
var res base.Response
return res.Read(c.br)
} }
// Announce writes an ANNOUNCE request and reads a Response. // Announce writes an ANNOUNCE request and reads a Response.

View File

@@ -161,6 +161,7 @@ func TestConnClientDialPublishUDP(t *testing.T) {
var conn *ConnClient var conn *ConnClient
defer func() { defer func() {
conn.Close() conn.Close()
conn.LoopUDP()
}() }()
go func() { go func() {

View File

@@ -59,6 +59,7 @@ func main() {
// write frames to the server // write frames to the server
err = conn.WriteFrameTCP(track.Id, gortsplib.StreamTypeRtp, buf[:n]) err = conn.WriteFrameTCP(track.Id, gortsplib.StreamTypeRtp, buf[:n])
if err != nil { if err != nil {
fmt.Println("connection is closed (%s)", err)
break break
} }
} }

View File

@@ -62,4 +62,8 @@ func main() {
break break
} }
} }
// wait until the connection is closed
err = conn.LoopUDP()
fmt.Println("connection is closed (%s)", err)
} }

View File

@@ -56,6 +56,7 @@ func main() {
}(trackId) }(trackId)
} }
// wait until the connection is closed
err = conn.LoopUDP() err = conn.LoopUDP()
fmt.Println("connection is closed (%s)", err) fmt.Println("connection is closed (%s)", err)
} }