diff --git a/client.go b/client.go index 8691d9c9..304ab9c1 100644 --- a/client.go +++ b/client.go @@ -213,8 +213,6 @@ type Client struct { tracks map[int]clientTrack tracksByChannel map[int]int lastRange *headers.Range - tcpReadBuffer *multibuffer.MultiBuffer - tcpRTPPacketBuffer *rtpPacketMultiBuffer writeMutex sync.RWMutex // publish writeFrameAllowed bool // publish udpReportTimer *time.Timer @@ -563,7 +561,7 @@ func (c *Client) run() { }(), // use the stream base URL, otherwise some cameras do not reply URL: c.streamBaseURL, - }, true) + }, true, false) if err != nil { return err } @@ -599,7 +597,7 @@ func (c *Client) doClose() { c.do(&base.Request{ Method: base.Teardown, URL: c.streamBaseURL, - }, true) + }, true, false) c.conn.Close() c.conn = nil @@ -629,7 +627,6 @@ func (c *Client) reset() { c.protocol = nil c.tracks = nil c.tracksByChannel = nil - c.tcpReadBuffer = nil } func (c *Client) checkState(allowed map[clientState]struct{}) error { @@ -756,12 +753,14 @@ func (c *Client) runReader() { var processFunc func(int, bool, []byte) if c.state == clientStatePlay { + tcpRTPPacketBuffer := newRTPPacketMultiBuffer(uint64(c.ReadBufferCount)) + processFunc = func(trackID int, isRTP bool, payload []byte) { now := time.Now() atomic.StoreInt64(c.tcpLastFrameTime, now.Unix()) if isRTP { - pkt := c.tcpRTPPacketBuffer.next() + pkt := tcpRTPPacketBuffer.next() err := pkt.Unmarshal(payload) if err != nil { return @@ -796,11 +795,12 @@ func (c *Client) runReader() { } } + tcpReadBuffer := multibuffer.New(uint64(c.ReadBufferCount), uint64(c.ReadBufferSize)) var frame base.InterleavedFrame var res base.Response for { - frame.Payload = c.tcpReadBuffer.Next() + frame.Payload = tcpReadBuffer.Next() what, err := base.ReadInterleavedFrameOrResponse(&frame, &res, c.br) if err != nil { return err @@ -918,7 +918,7 @@ func (c *Client) connCloserStop() { c.connCloserDone = nil } -func (c *Client) do(req *base.Request, skipResponse bool) (*base.Response, error) { +func (c *Client) do(req *base.Request, skipResponse bool, allowFrames bool) (*base.Response, error) { if c.conn == nil { err := c.connOpen() if err != nil { @@ -965,12 +965,13 @@ func (c *Client) do(req *base.Request, skipResponse bool) (*base.Response, error c.conn.SetReadDeadline(time.Now().Add(c.ReadTimeout)) - if c.tcpReadBuffer != nil { + if allowFrames { // read the response and ignore interleaved frames in between; // interleaved frames are sent in two scenarios: // * when the server is v4lrtspserver, before the PLAY response // * when the stream is already playing - err = res.ReadIgnoreFrames(c.br, c.tcpReadBuffer.Next()) + buf := make([]byte, c.ReadBufferSize) + err = res.ReadIgnoreFrames(c.br, buf) if err != nil { return err } @@ -1016,7 +1017,7 @@ func (c *Client) do(req *base.Request, skipResponse bool) (*base.Response, error } c.sender = sender - return c.do(req, false) + return c.do(req, skipResponse, allowFrames) } return &res, nil @@ -1035,7 +1036,7 @@ func (c *Client) doOptions(u *base.URL) (*base.Response, error) { res, err := c.do(&base.Request{ Method: base.Options, URL: u, - }, false) + }, false, false) if err != nil { return nil, err } @@ -1095,7 +1096,7 @@ func (c *Client) doDescribe(u *base.URL) (Tracks, *base.URL, *base.Response, err Header: base.Header{ "Accept": base.HeaderValue{"application/sdp"}, }, - }, false) + }, false, false) if err != nil { return nil, nil, nil, err } @@ -1208,7 +1209,7 @@ func (c *Client) doAnnounce(u *base.URL, tracks Tracks) (*base.Response, error) "Content-Type": base.HeaderValue{"application/sdp"}, }, Body: tracks.Write(false), - }, false) + }, false, false) if err != nil { return nil, err } @@ -1360,7 +1361,7 @@ func (c *Client) doSetup( Header: base.Header{ "Transport": th.Write(), }, - }, false) + }, false, false) if err != nil { if proto == TransportUDP { rtpListener.close() @@ -1535,11 +1536,6 @@ func (c *Client) doSetup( } case TransportTCP: - if c.tcpReadBuffer == nil { - c.tcpReadBuffer = multibuffer.New(uint64(c.ReadBufferCount), uint64(c.ReadBufferSize)) - c.tcpRTPPacketBuffer = newRTPPacketMultiBuffer(uint64(c.ReadBufferCount)) - } - if c.tracksByChannel == nil { c.tracksByChannel = make(map[int]int) } @@ -1633,7 +1629,7 @@ func (c *Client) doPlay(ra *headers.Range, isSwitchingProtocol bool) (*base.Resp Header: base.Header{ "Range": ra.Write(), }, - }, false) + }, false, *c.protocol == TransportTCP) if err != nil { if *c.protocol == TransportUDP || *c.protocol == TransportUDPMulticast { for _, cct := range c.tracks { @@ -1710,7 +1706,7 @@ func (c *Client) doRecord() (*base.Response, error) { res, err := c.do(&base.Request{ Method: base.Record, URL: c.streamBaseURL, - }, false) + }, false, false) if err != nil { if *c.protocol == TransportUDP { for _, cct := range c.tracks { @@ -1784,7 +1780,7 @@ func (c *Client) doPause() (*base.Response, error) { res, err := c.do(&base.Request{ Method: base.Pause, URL: c.streamBaseURL, - }, false) + }, false, *c.protocol == TransportTCP) if err != nil { return nil, err }