diff --git a/client.go b/client.go index 2682ad89..1cb25d0a 100644 --- a/client.go +++ b/client.go @@ -38,9 +38,10 @@ const ( clientUDPKeepalivePeriod = 30 * time.Second ) -func isErrNOUDPPacketsReceivedRecently(err error) bool { - _, ok := err.(liberrors.ErrClientNoUDPPacketsRecently) - return ok +func emptyTimer() *time.Timer { + t := time.NewTimer(0) + <-t.C + return t } func isAnyPort(p int) bool { @@ -197,44 +198,47 @@ type Client struct { senderReportPeriod time.Duration receiverReportPeriod time.Duration - scheme string - host string - ctx context.Context - ctxCancel func() - state clientState - nconn net.Conn - br *bufio.Reader - bw *bufio.Writer - session string - sender *auth.Sender - cseq int - useGetParameter bool - streamBaseURL *base.URL - protocol *Transport - tracks map[int]clientTrack - tracksByChannel map[int]int - lastRange *headers.Range - backgroundRunning bool - backgroundErr error - tcpFrameBuffer *multibuffer.MultiBuffer // tcp - tcpWriteMutex sync.Mutex // tcp - writeMutex sync.RWMutex // write - writeFrameAllowed bool // write + scheme string + host string + ctx context.Context + ctxCancel func() + state clientState + nconn net.Conn + br *bufio.Reader + bw *bufio.Writer + session string + sender *auth.Sender + cseq int + useGetParameter bool + streamBaseURL *base.URL + protocol *Transport + tracks map[int]clientTrack + tracksByChannel map[int]int + lastRange *headers.Range + tcpFrameBuffer *multibuffer.MultiBuffer // tcp + tcpWriteMutex sync.Mutex // tcp + writeMutex sync.RWMutex // write + writeFrameAllowed bool // write + reportTimer *time.Timer + checkStreamTimer *time.Timer + checkStreamInitial bool + tcpLastFrameTime int64 + keepaliveTimer *time.Timer + readerRunning bool + finalErr error // in - options chan optionsReq - describe chan describeReq - announce chan announceReq - setup chan setupReq - play chan playReq - record chan recordReq - pause chan pauseReq - backgroundTerminate chan struct{} + options chan optionsReq + describe chan describeReq + announce chan announceReq + setup chan setupReq + play chan playReq + record chan recordReq + pause chan pauseReq // out - backgroundInnerDone chan error - backgroundDone chan struct{} - done chan struct{} + readerErr chan error + done chan struct{} } // Dial connects to a server. @@ -291,6 +295,9 @@ func (c *Client) Dial(scheme string, host string) error { c.host = host c.ctx = ctx c.ctxCancel = ctxCancel + c.reportTimer = emptyTimer() + c.checkStreamTimer = emptyTimer() + c.keepaliveTimer = emptyTimer() c.options = make(chan optionsReq) c.describe = make(chan describeReq) c.announce = make(chan announceReq) @@ -416,65 +423,159 @@ func (c *Client) Tracks() Tracks { func (c *Client) run() { defer close(c.done) -outer: - for { - select { - case req := <-c.options: - res, err := c.doOptions(req.url) - req.res <- clientRes{res: res, err: err} + c.finalErr = func() error { + for { + select { + case req := <-c.options: + res, err := c.doOptions(req.url) + req.res <- clientRes{res: res, err: err} - case req := <-c.describe: - tracks, baseURL, res, err := c.doDescribe(req.url) - req.res <- clientRes{tracks: tracks, baseURL: baseURL, res: res, err: err} + case req := <-c.describe: + tracks, baseURL, res, err := c.doDescribe(req.url) + req.res <- clientRes{tracks: tracks, baseURL: baseURL, res: res, err: err} - case req := <-c.announce: - res, err := c.doAnnounce(req.url, req.tracks) - req.res <- clientRes{res: res, err: err} + case req := <-c.announce: + res, err := c.doAnnounce(req.url, req.tracks) + req.res <- clientRes{res: res, err: err} - case req := <-c.setup: - res, err := c.doSetup(req.mode, req.baseURL, req.track, req.rtpPort, req.rtcpPort) - req.res <- clientRes{res: res, err: err} + case req := <-c.setup: + res, err := c.doSetup(req.mode, req.baseURL, req.track, req.rtpPort, req.rtcpPort) + req.res <- clientRes{res: res, err: err} - case req := <-c.play: - res, err := c.doPlay(req.ra, false) - req.res <- clientRes{res: res, err: err} + case req := <-c.play: + res, err := c.doPlay(req.ra, false) + req.res <- clientRes{res: res, err: err} - case req := <-c.record: - res, err := c.doRecord() - req.res <- clientRes{res: res, err: err} + case req := <-c.record: + res, err := c.doRecord() + req.res <- clientRes{res: res, err: err} - case req := <-c.pause: - res, err := c.doPause() - req.res <- clientRes{res: res, err: err} + case req := <-c.pause: + res, err := c.doPause() + req.res <- clientRes{res: res, err: err} - case err := <-c.backgroundInnerDone: - c.backgroundRunning = false - err = c.switchProtocolIfTimeout(err) - if err != nil { - c.backgroundErr = err - close(c.backgroundDone) + case <-c.reportTimer.C: + if c.state == clientStatePlay { + now := time.Now() + for trackID, cct := range c.tracks { + rr := cct.rtcpReceiver.Report(now) + c.WritePacketRTCP(trackID, rr) + } - c.writeMutex.Lock() - c.writeFrameAllowed = false - c.writeMutex.Unlock() + c.reportTimer = time.NewTimer(c.receiverReportPeriod) + } else { // Record + now := time.Now() + for trackID, cct := range c.tracks { + sr := cct.rtcpSender.Report(now) + if sr != nil { + c.WritePacketRTCP(trackID, sr) + } + } + + c.reportTimer = time.NewTimer(c.senderReportPeriod) + } + + case <-c.checkStreamTimer.C: + if *c.protocol == TransportUDP || + *c.protocol == TransportUDPMulticast { + if c.checkStreamInitial { + c.checkStreamInitial = false + + // check that at least one packet has been received + inTimeout := func() bool { + for _, cct := range c.tracks { + lft := atomic.LoadInt64(cct.udpRTPListener.lastFrameTime) + if lft != 0 { + return false + } + + lft = atomic.LoadInt64(cct.udpRTCPListener.lastFrameTime) + if lft != 0 { + return false + } + } + return true + }() + if inTimeout { + err := c.trySwitchingProtocol() + if err != nil { + return err + } + } + + c.checkStreamTimer = time.NewTimer(clientCheckStreamPeriod) + } else { + inTimeout := func() bool { + now := time.Now() + for _, cct := range c.tracks { + lft := time.Unix(atomic.LoadInt64(cct.udpRTPListener.lastFrameTime), 0) + if now.Sub(lft) < c.ReadTimeout { + return false + } + + lft = time.Unix(atomic.LoadInt64(cct.udpRTCPListener.lastFrameTime), 0) + if now.Sub(lft) < c.ReadTimeout { + return false + } + } + return true + }() + if inTimeout { + return liberrors.ErrClientUDPTimeout{} + } + + c.checkStreamTimer = time.NewTimer(clientCheckStreamPeriod) + } + } else { // TCP + inTimeout := func() bool { + now := time.Now() + lft := time.Unix(atomic.LoadInt64(&c.tcpLastFrameTime), 0) + return now.Sub(lft) >= c.ReadTimeout + }() + if inTimeout { + return liberrors.ErrClientTCPTimeout{} + } + + c.checkStreamTimer = time.NewTimer(clientCheckStreamPeriod) + } + + case <-c.keepaliveTimer.C: + _, err := c.do(&base.Request{ + Method: func() base.Method { + // the vlc integrated rtsp server requires GET_PARAMETER + if c.useGetParameter { + return base.GetParameter + } + return base.Options + }(), + // use the stream base URL, otherwise some cameras do not reply + URL: c.streamBaseURL, + }, true) + if err != nil { + return err + } + + c.keepaliveTimer = time.NewTimer(clientUDPKeepalivePeriod) + + case err := <-c.readerErr: + c.readerRunning = false + return err + + case <-c.ctx.Done(): + return liberrors.ErrClientTerminated{} } - - case <-c.ctx.Done(): - break outer } - } + }() c.ctxCancel() - c.doClose(false) + c.doClose() } -func (c *Client) doClose(isSwitchingProtocol bool) { - if c.backgroundRunning { - c.backgroundClose(isSwitchingProtocol) - } - +func (c *Client) doClose() { if c.state == clientStatePlay || c.state == clientStateRecord { + c.playRecordClose() + c.do(&base.Request{ Method: base.Teardown, URL: c.streamBaseURL, @@ -494,8 +595,8 @@ func (c *Client) doClose(isSwitchingProtocol bool) { } } -func (c *Client) reset(isSwitchingProtocol bool) { - c.doClose(isSwitchingProtocol) +func (c *Client) reset() { + c.doClose() c.state = clientStateInitial c.session = "" @@ -524,19 +625,12 @@ func (c *Client) checkState(allowed map[clientState]struct{}) error { return liberrors.ErrClientInvalidState{AllowedList: allowedList, State: c.state} } -func (c *Client) switchProtocolIfTimeout(err error) error { - if *c.protocol != TransportUDP || - c.state != clientStatePlay || - !isErrNOUDPPacketsReceivedRecently(err) || - c.Transport != nil { - return err - } - +func (c *Client) trySwitchingProtocol() error { prevBaseURL := c.streamBaseURL oldUseGetParameter := c.useGetParameter prevTracks := c.tracks - c.reset(true) + c.reset() v := TransportTCP c.protocol = &v @@ -551,7 +645,7 @@ func (c *Client) switchProtocolIfTimeout(err error) error { } } - _, err = c.doPlay(c.lastRange, true) + _, err := c.doPlay(c.lastRange, true) if err != nil { return err } @@ -559,373 +653,153 @@ func (c *Client) switchProtocolIfTimeout(err error) error { return nil } -func (c *Client) backgroundStart(isSwitchingProtocol bool) { +func (c *Client) playRecordStart() { + // start timers + if c.state == clientStatePlay { + c.reportTimer = time.NewTimer(c.receiverReportPeriod) + + switch *c.protocol { + case TransportUDP: + c.checkStreamTimer = time.NewTimer(c.InitialUDPReadTimeout) + c.checkStreamInitial = true + c.keepaliveTimer = time.NewTimer(clientUDPKeepalivePeriod) + + case TransportUDPMulticast: + c.checkStreamTimer = time.NewTimer(clientCheckStreamPeriod) + c.keepaliveTimer = time.NewTimer(clientUDPKeepalivePeriod) + + default: // TCP + c.checkStreamTimer = time.NewTimer(clientCheckStreamPeriod) + c.tcpLastFrameTime = time.Now().Unix() + } + + } else { + c.reportTimer = time.NewTimer(c.senderReportPeriod) + } + + // allow writing c.writeMutex.Lock() c.writeFrameAllowed = true c.writeMutex.Unlock() - c.backgroundRunning = true - c.backgroundTerminate = make(chan struct{}) - c.backgroundInnerDone = make(chan error) - - if !isSwitchingProtocol { - c.backgroundDone = make(chan struct{}) + // start UDP listeners + if *c.protocol == TransportUDP || *c.protocol == TransportUDPMulticast { + for _, cct := range c.tracks { + cct.udpRTPListener.start() + cct.udpRTCPListener.start() + } } - go c.runBackground() + // for some reason, SetReadDeadline() must always be called in the same + // goroutine, otherwise Read() freezes. + // therefore, we disable the deadline and perform a check with a ticker. + c.nconn.SetReadDeadline(time.Time{}) + + // start reader + c.readerRunning = true + c.readerErr = make(chan error) + go func() { + c.readerErr <- c.runReader() + }() } -func (c *Client) backgroundClose(isSwitchingProtocol bool) { - close(c.backgroundTerminate) - err := <-c.backgroundInnerDone - c.backgroundRunning = false +func (c *Client) runReader() error { + if *c.protocol == TransportUDP || *c.protocol == TransportUDPMulticast { + for { + var res base.Response + err := res.Read(c.br) + if err != nil { + return err + } + } + } else { + if c.state == clientStatePlay { + for { + frame := base.InterleavedFrame{ + Payload: c.tcpFrameBuffer.Next(), + } + err := frame.Read(c.br) + if err != nil { + return err + } - if !isSwitchingProtocol { - c.backgroundErr = err - close(c.backgroundDone) + channel := frame.Channel + isRTP := true + if (channel % 2) != 0 { + channel-- + isRTP = false + } + + trackID, ok := c.tracksByChannel[channel] + if !ok { + continue + } + + now := time.Now() + atomic.StoreInt64(&c.tcpLastFrameTime, now.Unix()) + + if isRTP { + c.tracks[trackID].rtcpReceiver.ProcessPacketRTP(now, frame.Payload) + c.OnPacketRTP(c, trackID, frame.Payload) + } else { + c.tracks[trackID].rtcpReceiver.ProcessPacketRTCP(now, frame.Payload) + c.OnPacketRTCP(c, trackID, frame.Payload) + } + } + } else { // Record + for { + frame := base.InterleavedFrame{ + Payload: c.tcpFrameBuffer.Next(), + } + err := frame.Read(c.br) + if err != nil { + return err + } + + channel := frame.Channel + isRTP := true + if (channel % 2) != 0 { + channel-- + isRTP = false + } + + trackID, ok := c.tracksByChannel[channel] + if !ok { + continue + } + + if !isRTP { + c.OnPacketRTCP(c, trackID, frame.Payload) + } + } + } + } +} + +func (c *Client) playRecordClose() { + // stop reader + if c.readerRunning { + c.nconn.SetReadDeadline(time.Now()) + <-c.readerErr } + // stop UDP listeners + if *c.protocol == TransportUDP || *c.protocol == TransportUDPMulticast { + for _, cct := range c.tracks { + cct.udpRTPListener.stop() + cct.udpRTCPListener.stop() + } + } + + // forbid writing c.writeMutex.Lock() c.writeFrameAllowed = false c.writeMutex.Unlock() -} -func (c *Client) runBackground() { - c.backgroundInnerDone <- func() error { - if c.state == clientStatePlay { - if *c.protocol == TransportUDP || *c.protocol == TransportUDPMulticast { - return c.runBackgroundPlayUDP() - } - return c.runBackgroundPlayTCP() - } - - if *c.protocol == TransportUDP { - return c.runBackgroundRecordUDP() - } - return c.runBackgroundRecordTCP() - }() -} - -func (c *Client) runBackgroundPlayUDP() error { - for _, cct := range c.tracks { - cct.udpRTPListener.start() - cct.udpRTCPListener.start() - } - - defer func() { - for _, cct := range c.tracks { - cct.udpRTPListener.stop() - cct.udpRTCPListener.stop() - } - }() - - // disable deadline - c.nconn.SetReadDeadline(time.Time{}) - - readerDone := make(chan error) - go func() { - for { - var res base.Response - err := res.Read(c.br) - if err != nil { - readerDone <- err - return - } - } - }() - - reportTicker := time.NewTicker(c.receiverReportPeriod) - defer reportTicker.Stop() - - keepaliveTicker := time.NewTicker(clientUDPKeepalivePeriod) - defer keepaliveTicker.Stop() - - checkStreamInitial := true - checkStreamTicker := time.NewTicker(c.InitialUDPReadTimeout) - defer func() { - checkStreamTicker.Stop() - }() - - for { - select { - case <-c.backgroundTerminate: - c.nconn.SetReadDeadline(time.Now()) - <-readerDone - return fmt.Errorf("terminated") - - case <-reportTicker.C: - now := time.Now() - for trackID, cct := range c.tracks { - rr := cct.rtcpReceiver.Report(now) - c.WritePacketRTCP(trackID, rr) - } - - case <-keepaliveTicker.C: - _, err := c.do(&base.Request{ - Method: func() base.Method { - // the vlc integrated rtsp server requires GET_PARAMETER - if c.useGetParameter { - return base.GetParameter - } - return base.Options - }(), - // use the stream base URL, otherwise some cameras do not reply - URL: c.streamBaseURL, - }, true) - if err != nil { - c.nconn.SetReadDeadline(time.Now()) - <-readerDone - return err - } - - case <-checkStreamTicker.C: - if checkStreamInitial { - // check that at least one packet has been received - inTimeout := func() bool { - for _, cct := range c.tracks { - lft := atomic.LoadInt64(cct.udpRTPListener.lastFrameTime) - if lft != 0 { - return false - } - - lft = atomic.LoadInt64(cct.udpRTCPListener.lastFrameTime) - if lft != 0 { - return false - } - } - return true - }() - if inTimeout { - c.nconn.SetReadDeadline(time.Now()) - <-readerDone - return liberrors.ErrClientNoUDPPacketsRecently{} - } - - checkStreamInitial = false - checkStreamTicker.Stop() - checkStreamTicker = time.NewTicker(clientCheckStreamPeriod) - } else { - inTimeout := func() bool { - now := time.Now() - for _, cct := range c.tracks { - lft := time.Unix(atomic.LoadInt64(cct.udpRTPListener.lastFrameTime), 0) - if now.Sub(lft) < c.ReadTimeout { - return false - } - - lft = time.Unix(atomic.LoadInt64(cct.udpRTCPListener.lastFrameTime), 0) - if now.Sub(lft) < c.ReadTimeout { - return false - } - } - return true - }() - if inTimeout { - c.nconn.SetReadDeadline(time.Now()) - <-readerDone - return liberrors.ErrClientUDPTimeout{} - } - } - - case err := <-readerDone: - return err - } - } -} - -func (c *Client) runBackgroundPlayTCP() error { - // for some reason, SetReadDeadline() must always be called in the same - // goroutine, otherwise Read() freezes. - // therefore, we disable the deadline and perform check with a ticker. - c.nconn.SetReadDeadline(time.Time{}) - - lastFrameTime := time.Now().Unix() - - readerDone := make(chan error) - go func() { - for { - frame := base.InterleavedFrame{ - Payload: c.tcpFrameBuffer.Next(), - } - err := frame.Read(c.br) - if err != nil { - readerDone <- err - return - } - - channel := frame.Channel - isRTP := true - if (channel % 2) != 0 { - channel-- - isRTP = false - } - - trackID, ok := c.tracksByChannel[channel] - if !ok { - continue - } - - now := time.Now() - atomic.StoreInt64(&lastFrameTime, now.Unix()) - - if isRTP { - c.tracks[trackID].rtcpReceiver.ProcessPacketRTP(now, frame.Payload) - c.OnPacketRTP(c, trackID, frame.Payload) - } else { - c.tracks[trackID].rtcpReceiver.ProcessPacketRTCP(now, frame.Payload) - c.OnPacketRTCP(c, trackID, frame.Payload) - } - } - }() - - reportTicker := time.NewTicker(c.receiverReportPeriod) - defer reportTicker.Stop() - - checkStreamTicker := time.NewTicker(clientCheckStreamPeriod) - defer checkStreamTicker.Stop() - - for { - select { - case <-c.backgroundTerminate: - c.nconn.SetReadDeadline(time.Now()) - <-readerDone - return fmt.Errorf("terminated") - - case <-reportTicker.C: - now := time.Now() - for trackID, cct := range c.tracks { - rr := cct.rtcpReceiver.Report(now) - c.WritePacketRTCP(trackID, rr) - } - - case <-checkStreamTicker.C: - inTimeout := func() bool { - now := time.Now() - lft := time.Unix(atomic.LoadInt64(&lastFrameTime), 0) - return now.Sub(lft) >= c.ReadTimeout - }() - if inTimeout { - c.nconn.SetReadDeadline(time.Now()) - <-readerDone - return liberrors.ErrClientTCPTimeout{} - } - - case err := <-readerDone: - return err - } - } -} - -func (c *Client) runBackgroundRecordUDP() error { - for _, cct := range c.tracks { - cct.udpRTPListener.start() - cct.udpRTCPListener.start() - } - - defer func() { - for _, cct := range c.tracks { - cct.udpRTPListener.stop() - cct.udpRTCPListener.stop() - } - }() - - // disable deadline - c.nconn.SetReadDeadline(time.Time{}) - - readerDone := make(chan error) - go func() { - for { - var res base.Response - err := res.Read(c.br) - if err != nil { - readerDone <- err - return - } - } - }() - - reportTicker := time.NewTicker(c.senderReportPeriod) - defer reportTicker.Stop() - - for { - select { - case <-c.backgroundTerminate: - c.nconn.SetReadDeadline(time.Now()) - <-readerDone - return fmt.Errorf("terminated") - - case <-reportTicker.C: - now := time.Now() - for trackID, cct := range c.tracks { - sr := cct.rtcpSender.Report(now) - if sr != nil { - c.WritePacketRTCP(trackID, sr) - } - } - - case err := <-readerDone: - return err - } - } -} - -func (c *Client) runBackgroundRecordTCP() error { - // disable deadline - c.nconn.SetReadDeadline(time.Time{}) - - readerDone := make(chan error) - go func() { - for { - frame := base.InterleavedFrame{ - Payload: c.tcpFrameBuffer.Next(), - } - err := frame.Read(c.br) - if err != nil { - readerDone <- err - return - } - - channel := frame.Channel - isRTP := true - if (channel % 2) != 0 { - channel-- - isRTP = false - } - - trackID, ok := c.tracksByChannel[channel] - if !ok { - continue - } - - if !isRTP { - c.OnPacketRTCP(c, trackID, frame.Payload) - } - } - }() - - reportTicker := time.NewTicker(c.senderReportPeriod) - defer reportTicker.Stop() - - for { - select { - case <-c.backgroundTerminate: - c.nconn.SetReadDeadline(time.Now()) - <-readerDone - return fmt.Errorf("terminated") - - case <-reportTicker.C: - now := time.Now() - for trackID, cct := range c.tracks { - sr := cct.rtcpSender.Report(now) - if sr != nil { - c.WritePacketRTCP(trackID, sr) - } - } - - case err := <-readerDone: - return err - } - } + // stop timers + c.reportTimer = emptyTimer() + c.checkStreamTimer = emptyTimer() + c.keepaliveTimer = emptyTimer() } func (c *Client) connOpen() error { @@ -1164,7 +1038,7 @@ func (c *Client) doDescribe(u *base.URL) (Tracks, *base.URL, *base.Response, err res.StatusCode >= base.StatusMovedPermanently && res.StatusCode <= base.StatusUseProxy && len(res.Header["Location"]) == 1 { - c.reset(false) + c.reset() u, err := base.ParseURL(res.Header["Location"][0]) if err != nil { @@ -1336,7 +1210,7 @@ func (c *Client) doSetup( } proto := func() Transport { - // protocol set by previous Setup() or switchProtocolIfTimeout() + // protocol set by previous Setup() or trySwitchingProtocol() if c.protocol != nil { return *c.protocol } @@ -1676,7 +1550,7 @@ func (c *Client) doPlay(ra *headers.Range, isSwitchingProtocol bool) (*base.Resp c.state = clientStatePlay c.lastRange = ra - c.backgroundStart(isSwitchingProtocol) + c.playRecordStart() return res, nil } @@ -1719,7 +1593,7 @@ func (c *Client) doRecord() (*base.Response, error) { c.state = clientStateRecord - c.backgroundStart(false) + c.playRecordStart() return nil, nil } @@ -1747,7 +1621,7 @@ func (c *Client) doPause() (*base.Response, error) { return nil, err } - c.backgroundClose(false) + c.playRecordClose() res, err := c.do(&base.Request{ Method: base.Pause, @@ -1799,21 +1673,27 @@ func (c *Client) Seek(ra *headers.Range) (*base.Response, error) { // ReadFrames starts reading frames. func (c *Client) ReadFrames() error { - <-c.backgroundDone - return c.backgroundErr + <-c.done + return c.finalErr } // WritePacketRTP writes a RTP packet. func (c *Client) WritePacketRTP(trackID int, payload []byte) error { - now := time.Now() + select { + case <-c.done: + return c.finalErr + default: + } c.writeMutex.RLock() defer c.writeMutex.RUnlock() if !c.writeFrameAllowed { - return c.backgroundErr + return liberrors.ErrClientWriteNotAllowed{} } + now := time.Now() + if c.tracks[trackID].rtcpSender != nil { c.tracks[trackID].rtcpSender.ProcessPacketRTP(now, payload) } @@ -1838,15 +1718,21 @@ func (c *Client) WritePacketRTP(trackID int, payload []byte) error { // WritePacketRTCP writes a RTCP packet. func (c *Client) WritePacketRTCP(trackID int, payload []byte) error { - now := time.Now() + select { + case <-c.done: + return c.finalErr + default: + } c.writeMutex.RLock() defer c.writeMutex.RUnlock() if !c.writeFrameAllowed { - return c.backgroundErr + return liberrors.ErrClientWriteNotAllowed{} } + now := time.Now() + if c.tracks[trackID].rtcpSender != nil { c.tracks[trackID].rtcpSender.ProcessPacketRTCP(now, payload) } diff --git a/client_read_test.go b/client_read_test.go index 60e2f8ad..d7793202 100644 --- a/client_read_test.go +++ b/client_read_test.go @@ -1768,9 +1768,6 @@ func TestClientReadPause(t *testing.T) { <-frameRecv _, err = c.Pause() require.NoError(t, err) - <-done - - c.ReadFrames() firstFrame = int32(0) frameRecv = make(chan struct{}) @@ -1778,12 +1775,6 @@ func TestClientReadPause(t *testing.T) { _, err = c.Play(nil) require.NoError(t, err) - done = make(chan struct{}) - go func() { - defer close(done) - c.ReadFrames() - }() - <-frameRecv c.Close() <-done diff --git a/pkg/liberrors/client.go b/pkg/liberrors/client.go index c6ba0ac2..58904a02 100644 --- a/pkg/liberrors/client.go +++ b/pkg/liberrors/client.go @@ -163,14 +163,6 @@ func (e ErrClientTransportHeaderInterleavedIDsAlreadyUsed) Error() string { return "interleaved IDs already used" } -// ErrClientNoUDPPacketsRecently is an error that can be returned by a client. -type ErrClientNoUDPPacketsRecently struct{} - -// Error implements the error interface. -func (e ErrClientNoUDPPacketsRecently) Error() string { - return "no UDP packets received (maybe there's a firewall/NAT in between)" -} - // ErrClientUDPTimeout is an error that can be returned by a client. type ErrClientUDPTimeout struct{} @@ -196,3 +188,11 @@ type ErrClientRTPInfoInvalid struct { func (e ErrClientRTPInfoInvalid) Error() string { return fmt.Sprintf("invalid RTP-Info: %v", e.Err) } + +// ErrClientWriteNotAllowed is an error that can be returned by a client. +type ErrClientWriteNotAllowed struct{} + +// Error implements the error interface. +func (e ErrClientWriteNotAllowed) Error() string { + return "writing is not allowed at the moment" +}