diff --git a/writer.go b/async_processor.go similarity index 72% rename from writer.go rename to async_processor.go index ade274f4..1bab2a29 100644 --- a/writer.go +++ b/async_processor.go @@ -6,24 +6,24 @@ import ( // this struct contains a queue that allows to detach the routine that is reading a stream // from the routine that is writing a stream. -type writer struct { +type asyncProcessor struct { running bool buffer *ringbuffer.RingBuffer done chan struct{} } -func (w *writer) allocateBuffer(size int) { +func (w *asyncProcessor) allocateBuffer(size int) { w.buffer, _ = ringbuffer.New(uint64(size)) } -func (w *writer) start() { +func (w *asyncProcessor) start() { w.running = true w.done = make(chan struct{}) go w.run() } -func (w *writer) stop() { +func (w *asyncProcessor) stop() { if w.running { w.buffer.Close() <-w.done @@ -31,7 +31,7 @@ func (w *writer) stop() { } } -func (w *writer) run() { +func (w *asyncProcessor) run() { defer close(w.done) for { @@ -44,6 +44,6 @@ func (w *writer) run() { } } -func (w *writer) queue(cb func()) { +func (w *asyncProcessor) queue(cb func()) { w.buffer.Push(cb) } diff --git a/client.go b/client.go index 6b4c1840..576aec67 100644 --- a/client.go +++ b/client.go @@ -232,49 +232,45 @@ type Client struct { senderReportPeriod time.Duration udpReceiverReportPeriod time.Duration - checkStreamPeriod time.Duration + checkTimeoutPeriod time.Duration keepalivePeriod time.Duration - scheme string - host string - ctx context.Context - ctxCancel func() - state clientState - nconn net.Conn - conn *conn.Conn - session string - sender *auth.Sender - cseq int - optionsSent bool - useGetParameter bool - lastDescribeURL *url.URL - baseURL *url.URL - effectiveTransport *Transport - medias map[*media.Media]*clientMedia - tcpMediasByChannel map[int]*clientMedia - lastRange *headers.Range - checkStreamTimer *time.Timer - checkStreamInitial bool - tcpLastFrameTime *int64 - keepaliveTimer *time.Timer - closeError error - writer writer - - // connCloser channels - connCloserTerminate chan struct{} - connCloserDone chan struct{} - - // reader channels - readerErr chan error + scheme string + host string + ctx context.Context + ctxCancel func() + state clientState + nconn net.Conn + conn *conn.Conn + session string + sender *auth.Sender + cseq int + optionsSent bool + useGetParameter bool + lastDescribeURL *url.URL + baseURL *url.URL + effectiveTransport *Transport + medias map[*media.Media]*clientMedia + tcpMediasByChannel map[int]*clientMedia + lastRange *headers.Range + checkTimeoutTimer *time.Timer + checkTimeoutInitial bool + tcpLastFrameTime *int64 + keepaliveTimer *time.Timer + closeError error + writer asyncProcessor + reader *clientReader + connCloser *clientConnCloser // in - options chan optionsReq - describe chan describeReq - announce chan announceReq - setup chan setupReq - play chan playReq - record chan recordReq - pause chan pauseReq + options chan optionsReq + describe chan describeReq + announce chan announceReq + setup chan setupReq + play chan playReq + record chan recordReq + pause chan pauseReq + readError chan error // out done chan struct{} @@ -364,8 +360,8 @@ func (c *Client) Start(scheme string, host string) error { // some cameras require a maximum of 5secs between keepalives c.udpReceiverReportPeriod = 5 * time.Second } - if c.checkStreamPeriod == 0 { - c.checkStreamPeriod = 1 * time.Second + if c.checkTimeoutPeriod == 0 { + c.checkTimeoutPeriod = 1 * time.Second } if c.keepalivePeriod == 0 { c.keepalivePeriod = 30 * time.Second @@ -377,7 +373,7 @@ func (c *Client) Start(scheme string, host string) error { c.host = host c.ctx = ctx c.ctxCancel = ctxCancel - c.checkStreamTimer = emptyTimer() + c.checkTimeoutTimer = emptyTimer() c.keepaliveTimer = emptyTimer() c.options = make(chan optionsReq) c.describe = make(chan describeReq) @@ -386,6 +382,7 @@ func (c *Client) Start(scheme string, host string) error { c.play = make(chan playReq) c.record = make(chan recordReq) c.pause = make(chan pauseReq) + c.readError = make(chan error) c.done = make(chan struct{}) go c.run() @@ -481,86 +478,22 @@ func (c *Client) runInner() error { res, err := c.doPause() req.res <- clientRes{res: res, err: err} - case <-c.checkStreamTimer.C: - if *c.effectiveTransport == TransportUDP || - *c.effectiveTransport == TransportUDPMulticast { - if c.checkStreamInitial { - c.checkStreamInitial = false - - // check that at least one packet has been received - inTimeout := func() bool { - for _, ct := range c.medias { - lft := atomic.LoadInt64(ct.udpRTPListener.lastPacketTime) - if lft != 0 { - return false - } - - lft = atomic.LoadInt64(ct.udpRTCPListener.lastPacketTime) - if lft != 0 { - return false - } - } - return true - }() - if inTimeout { - err := c.trySwitchingProtocol() - if err != nil { - return err - } - } - } else { - inTimeout := func() bool { - now := time.Now() - for _, ct := range c.medias { - lft := time.Unix(atomic.LoadInt64(ct.udpRTPListener.lastPacketTime), 0) - if now.Sub(lft) < c.ReadTimeout { - return false - } - - lft = time.Unix(atomic.LoadInt64(ct.udpRTCPListener.lastPacketTime), 0) - if now.Sub(lft) < c.ReadTimeout { - return false - } - } - return true - }() - if inTimeout { - return liberrors.ErrClientUDPTimeout{} - } - } - } else { // TCP - inTimeout := func() bool { - now := time.Now() - lft := time.Unix(atomic.LoadInt64(c.tcpLastFrameTime), 0) - return now.Sub(lft) >= c.ReadTimeout - }() - if inTimeout { - return liberrors.ErrClientTCPTimeout{} - } - } - - c.checkStreamTimer = time.NewTimer(c.checkStreamPeriod) - - case <-c.keepaliveTimer.C: - _, err := c.do(&base.Request{ - Method: func() base.Method { - // the VLC integrated rtsp server requires GET_PARAMETER - if c.useGetParameter { - return base.GetParameter - } - return base.Options - }(), - // use the stream base URL, otherwise some cameras do not reply - URL: c.baseURL, - }, true, false) + case <-c.checkTimeoutTimer.C: + err := c.checkTimeout() if err != nil { return err } + c.checkTimeoutTimer = time.NewTimer(c.checkTimeoutPeriod) + case <-c.keepaliveTimer.C: + err := c.doKeepalive() + if err != nil { + return err + } c.keepaliveTimer = time.NewTimer(c.keepalivePeriod) - case err := <-c.readerErr: - c.readerErr = nil + case err := <-c.readError: + c.reader = nil return err case <-c.ctx.Done(): @@ -571,7 +504,8 @@ func (c *Client) runInner() error { func (c *Client) doClose() { if c.state != clientStatePlay && c.state != clientStateRecord && c.conn != nil { - c.connCloserStop() + c.connCloser.close() + c.connCloser = nil } if c.state == clientStatePlay || c.state == clientStateRecord { @@ -690,22 +624,22 @@ func (c *Client) trySwitchingProtocol2(medi *media.Media, baseURL *url.URL) (*ba } func (c *Client) playRecordStart() { - // stop connCloser - c.connCloserStop() + c.connCloser.close() + c.connCloser = nil if c.state == clientStatePlay { c.keepaliveTimer = time.NewTimer(c.keepalivePeriod) switch *c.effectiveTransport { case TransportUDP: - c.checkStreamTimer = time.NewTimer(c.InitialUDPReadTimeout) - c.checkStreamInitial = true + c.checkTimeoutTimer = time.NewTimer(c.InitialUDPReadTimeout) + c.checkTimeoutInitial = true case TransportUDPMulticast: - c.checkStreamTimer = time.NewTimer(c.checkStreamPeriod) + c.checkTimeoutTimer = time.NewTimer(c.checkTimeoutPeriod) default: // TCP - c.checkStreamTimer = time.NewTimer(c.checkStreamPeriod) + c.checkTimeoutTimer = time.NewTimer(c.checkTimeoutPeriod) v := time.Now().Unix() c.tcpLastFrameTime = &v } @@ -726,68 +660,18 @@ func (c *Client) playRecordStart() { cm.start() } - // for some reason, SetReadDeadline() must always be called in the same - // goroutine, otherwise Read() freezes. - // therefore, we disable the deadline and perform a check with a ticker. - c.nconn.SetReadDeadline(time.Time{}) - - // start reader - c.readerErr = make(chan error) - go c.runReader() -} - -func (c *Client) runReader() { - c.readerErr <- func() error { - if *c.effectiveTransport == TransportUDP || *c.effectiveTransport == TransportUDPMulticast { - for { - _, err := c.conn.ReadResponse() - if err != nil { - return err - } - } - } else { - for { - what, err := c.conn.ReadInterleavedFrameOrResponse() - if err != nil { - return err - } - - if fr, ok := what.(*base.InterleavedFrame); ok { - channel := fr.Channel - isRTP := true - if (channel % 2) != 0 { - channel-- - isRTP = false - } - - media, ok := c.tcpMediasByChannel[channel] - if !ok { - continue - } - - if isRTP { - err = media.readRTP(fr.Payload) - } else { - err = media.readRTCP(fr.Payload) - } - if err != nil { - return err - } - } - } - } - }() + c.reader = newClientReader(c) } func (c *Client) playRecordStop(isClosing bool) { - // stop reader - if c.readerErr != nil { - c.nconn.SetReadDeadline(time.Now()) - <-c.readerErr + if c.reader != nil { + c.reader.close() + <-c.readError + c.reader = nil } // stop timers - c.checkStreamTimer = emptyTimer() + c.checkTimeoutTimer = emptyTimer() c.keepaliveTimer = emptyTimer() c.writer.stop() @@ -796,9 +680,8 @@ func (c *Client) playRecordStop(isClosing bool) { cm.stop() } - // start connCloser if !isClosing { - c.connCloserStart() + c.connCloser = newClientConnCloser(c.ctx, c.nconn) } } @@ -821,10 +704,10 @@ func (c *Client) connOpen() error { } } - ctx, cancel := context.WithTimeout(c.ctx, c.ReadTimeout) - defer cancel() + dialCtx, dialCtxCancel := context.WithTimeout(c.ctx, c.ReadTimeout) + defer dialCtxCancel() - nconn, err := c.DialContext(ctx, "tcp", c.host) + nconn, err := c.DialContext(dialCtx, "tcp", c.host) if err != nil { return err } @@ -845,33 +728,11 @@ func (c *Client) connOpen() error { c.nconn = nconn bc := bytecounter.New(c.nconn, c.BytesReceived, c.BytesSent) c.conn = conn.NewConn(bc) + c.connCloser = newClientConnCloser(c.ctx, c.nconn) - c.connCloserStart() return nil } -func (c *Client) connCloserStart() { - c.connCloserTerminate = make(chan struct{}) - c.connCloserDone = make(chan struct{}) - - go func() { - defer close(c.connCloserDone) - - select { - case <-c.ctx.Done(): - c.nconn.Close() - - case <-c.connCloserTerminate: - } - }() -} - -func (c *Client) connCloserStop() { - close(c.connCloserTerminate) - <-c.connCloserDone - c.connCloserDone = nil -} - func (c *Client) do(req *base.Request, skipResponse bool, allowFrames bool) (*base.Response, error) { if c.nconn == nil { err := c.connOpen() @@ -964,6 +825,82 @@ func (c *Client) do(req *base.Request, skipResponse bool, allowFrames bool) (*ba return res, nil } +func (c *Client) checkTimeout() error { + if *c.effectiveTransport == TransportUDP || + *c.effectiveTransport == TransportUDPMulticast { + if c.checkTimeoutInitial { + c.checkTimeoutInitial = false + + // check that at least one packet has been received + inTimeout := func() bool { + for _, ct := range c.medias { + lft := atomic.LoadInt64(ct.udpRTPListener.lastPacketTime) + if lft != 0 { + return false + } + + lft = atomic.LoadInt64(ct.udpRTCPListener.lastPacketTime) + if lft != 0 { + return false + } + } + return true + }() + if inTimeout { + err := c.trySwitchingProtocol() + if err != nil { + return err + } + } + } else { + inTimeout := func() bool { + now := time.Now() + for _, ct := range c.medias { + lft := time.Unix(atomic.LoadInt64(ct.udpRTPListener.lastPacketTime), 0) + if now.Sub(lft) < c.ReadTimeout { + return false + } + + lft = time.Unix(atomic.LoadInt64(ct.udpRTCPListener.lastPacketTime), 0) + if now.Sub(lft) < c.ReadTimeout { + return false + } + } + return true + }() + if inTimeout { + return liberrors.ErrClientUDPTimeout{} + } + } + } else { // TCP + inTimeout := func() bool { + now := time.Now() + lft := time.Unix(atomic.LoadInt64(c.tcpLastFrameTime), 0) + return now.Sub(lft) >= c.ReadTimeout + }() + if inTimeout { + return liberrors.ErrClientTCPTimeout{} + } + } + + return nil +} + +func (c *Client) doKeepalive() error { + _, err := c.do(&base.Request{ + Method: func() base.Method { + // the VLC integrated rtsp server requires GET_PARAMETER + if c.useGetParameter { + return base.GetParameter + } + return base.Options + }(), + // use the stream base URL, otherwise some cameras do not reply + URL: c.baseURL, + }, true, false) + return err +} + func (c *Client) doOptions(u *url.URL) (*base.Response, error) { err := c.checkState(map[clientState]struct{}{ clientStateInitial: {}, diff --git a/client_conn_closer.go b/client_conn_closer.go new file mode 100644 index 00000000..3623109b --- /dev/null +++ b/client_conn_closer.go @@ -0,0 +1,43 @@ +package gortsplib + +import ( + "context" + "net" +) + +type clientConnCloser struct { + ctx context.Context + nconn net.Conn + + terminate chan struct{} + done chan struct{} +} + +func newClientConnCloser(ctx context.Context, nconn net.Conn) *clientConnCloser { + cc := &clientConnCloser{ + ctx: ctx, + nconn: nconn, + terminate: make(chan struct{}), + done: make(chan struct{}), + } + + go cc.run() + + return cc +} + +func (cc *clientConnCloser) close() { + close(cc.terminate) + <-cc.done +} + +func (cc *clientConnCloser) run() { + defer close(cc.done) + + select { + case <-cc.ctx.Done(): + cc.nconn.Close() + + case <-cc.terminate: + } +} diff --git a/client_reader.go b/client_reader.go new file mode 100644 index 00000000..1218b6ba --- /dev/null +++ b/client_reader.go @@ -0,0 +1,83 @@ +package gortsplib + +import ( + "time" + + "github.com/bluenviron/gortsplib/v3/pkg/base" +) + +type clientReader struct { + c *Client + closeErr chan error +} + +func newClientReader(c *Client) *clientReader { + r := &clientReader{ + c: c, + closeErr: make(chan error), + } + + // for some reason, SetReadDeadline() must always be called in the same + // goroutine, otherwise Read() freezes. + // therefore, we disable the deadline and perform a check with a ticker. + r.c.nconn.SetReadDeadline(time.Time{}) + + go r.run() + + return r +} + +func (r *clientReader) close() { + r.c.nconn.SetReadDeadline(time.Now()) +} + +func (r *clientReader) run() { + r.c.readError <- r.runInner() +} + +func (r *clientReader) runInner() error { + if *r.c.effectiveTransport == TransportUDP || *r.c.effectiveTransport == TransportUDPMulticast { + for { + res, err := r.c.conn.ReadResponse() + if err != nil { + return err + } + + r.c.OnResponse(res) + } + } else { + for { + what, err := r.c.conn.ReadInterleavedFrameOrResponse() + if err != nil { + return err + } + + switch what := what.(type) { + case *base.Response: + r.c.OnResponse(what) + + case *base.InterleavedFrame: + channel := what.Channel + isRTP := true + if (channel % 2) != 0 { + channel-- + isRTP = false + } + + media, ok := r.c.tcpMediasByChannel[channel] + if !ok { + continue + } + + if isRTP { + err = media.readRTP(what.Payload) + } else { + err = media.readRTCP(what.Payload) + } + if err != nil { + return err + } + } + } + } +} diff --git a/server_session.go b/server_session.go index 424e9ad2..641d7179 100644 --- a/server_session.go +++ b/server_session.go @@ -189,7 +189,7 @@ type ServerSession struct { announcedMedias media.Medias // publish udpLastPacketTime *int64 // publish udpCheckStreamTimer *time.Timer - writer writer + writer asyncProcessor // in chHandleRequest chan sessionRequestReq