diff --git a/connclient.go b/connclient.go index 5f8dd3c2..1cc1d742 100644 --- a/connclient.go +++ b/connclient.go @@ -732,75 +732,82 @@ func (c *ConnClient) Play(u *url.URL) (*base.Response, error) { return res, nil } -// LoopUDP must be called after SetupUDP() and Play(); it keeps +// LoopUDP must be called after Play() or Record(); it keeps // the TCP connection open with keepalives, and returns when the TCP // connection closes. func (c *ConnClient) LoopUDP() error { - if c.state != connClientStateReading { - return fmt.Errorf("can be called only after a successful Play()") + if c.state != connClientStateReading && c.state != connClientStatePublishing { + return fmt.Errorf("can be called only after a successful Play() or Record()") } if *c.streamProtocol != StreamProtocolUDP { return fmt.Errorf("stream protocol is not UDP") } - readDone := make(chan error) - go func() { + if c.state == connClientStateReading { + readDone := make(chan error) + go func() { + for { + c.conf.Conn.SetReadDeadline(time.Now().Add(clientUDPKeepalivePeriod + c.conf.ReadTimeout)) + var res base.Response + err := res.Read(c.br) + if err != nil { + readDone <- err + return + } + } + }() + + keepaliveTicker := time.NewTicker(clientUDPKeepalivePeriod) + defer keepaliveTicker.Stop() + + checkStreamTicker := time.NewTicker(clientUDPCheckStreamPeriod) + defer checkStreamTicker.Stop() + for { - c.conf.Conn.SetReadDeadline(time.Now().Add(clientUDPKeepalivePeriod + c.conf.ReadTimeout)) - var res base.Response - err := res.Read(c.br) - if err != nil { - readDone <- err - return - } - } - }() - - keepaliveTicker := time.NewTicker(clientUDPKeepalivePeriod) - defer keepaliveTicker.Stop() - - checkStreamTicker := time.NewTicker(clientUDPCheckStreamPeriod) - defer checkStreamTicker.Stop() - - for { - select { - case err := <-readDone: - c.conf.Conn.Close() - return err - - case <-keepaliveTicker.C: - _, err := c.Do(&base.Request{ - Method: base.OPTIONS, - Url: &url.URL{ - Scheme: "rtsp", - Host: c.streamUrl.Host, - User: c.streamUrl.User, - // use the stream path, otherwise some cameras do not reply - Path: c.streamUrl.Path, - }, - SkipResponse: true, - }) - if err != nil { + select { + case err := <-readDone: c.conf.Conn.Close() - <-readDone return err - } - case <-checkStreamTicker.C: - now := time.Now() - - for _, lastUnix := range c.udpLastFrameTimes { - last := time.Unix(atomic.LoadInt64(lastUnix), 0) - - if now.Sub(last) >= c.conf.ReadTimeout { + case <-keepaliveTicker.C: + _, err := c.Do(&base.Request{ + Method: base.OPTIONS, + Url: &url.URL{ + Scheme: "rtsp", + Host: c.streamUrl.Host, + User: c.streamUrl.User, + // use the stream path, otherwise some cameras do not reply + Path: c.streamUrl.Path, + }, + SkipResponse: true, + }) + if err != nil { c.conf.Conn.Close() <-readDone - return fmt.Errorf("no packets received recently (maybe there's a firewall/NAT in between)") + return err + } + + case <-checkStreamTicker.C: + now := time.Now() + + for _, lastUnix := range c.udpLastFrameTimes { + last := time.Unix(atomic.LoadInt64(lastUnix), 0) + + if now.Sub(last) >= c.conf.ReadTimeout { + c.conf.Conn.Close() + <-readDone + return fmt.Errorf("no packets received recently (maybe there's a firewall/NAT in between)") + } } } } } + + // connClientStatePublishing + c.conf.Conn.SetReadDeadline(time.Time{}) // disable deadline + var res base.Response + return res.Read(c.br) } // Announce writes an ANNOUNCE request and reads a Response. diff --git a/connclientdial_test.go b/connclientdial_test.go index 7e5ae567..af506730 100644 --- a/connclientdial_test.go +++ b/connclientdial_test.go @@ -161,6 +161,7 @@ func TestConnClientDialPublishUDP(t *testing.T) { var conn *ConnClient defer func() { conn.Close() + conn.LoopUDP() }() go func() { diff --git a/examples/client-publish-tcp.go b/examples/client-publish-tcp.go index 7213ec59..eda640bd 100644 --- a/examples/client-publish-tcp.go +++ b/examples/client-publish-tcp.go @@ -59,6 +59,7 @@ func main() { // write frames to the server err = conn.WriteFrameTCP(track.Id, gortsplib.StreamTypeRtp, buf[:n]) if err != nil { + fmt.Println("connection is closed (%s)", err) break } } diff --git a/examples/client-publish-udp.go b/examples/client-publish-udp.go index 4d792f1b..781f4a6f 100644 --- a/examples/client-publish-udp.go +++ b/examples/client-publish-udp.go @@ -62,4 +62,8 @@ func main() { break } } + + // wait until the connection is closed + err = conn.LoopUDP() + fmt.Println("connection is closed (%s)", err) } diff --git a/examples/client-read-udp.go b/examples/client-read-udp.go index a6e9fab0..a9e44880 100644 --- a/examples/client-read-udp.go +++ b/examples/client-read-udp.go @@ -56,6 +56,7 @@ func main() { }(trackId) } + // wait until the connection is closed err = conn.LoopUDP() fmt.Println("connection is closed (%s)", err) }