mirror of
https://github.com/aler9/gortsplib
synced 2025-10-05 15:16:51 +08:00
client: save RAM by releasing read buffers earlier
This commit is contained in:
42
client.go
42
client.go
@@ -213,8 +213,6 @@ type Client struct {
|
|||||||
tracks map[int]clientTrack
|
tracks map[int]clientTrack
|
||||||
tracksByChannel map[int]int
|
tracksByChannel map[int]int
|
||||||
lastRange *headers.Range
|
lastRange *headers.Range
|
||||||
tcpReadBuffer *multibuffer.MultiBuffer
|
|
||||||
tcpRTPPacketBuffer *rtpPacketMultiBuffer
|
|
||||||
writeMutex sync.RWMutex // publish
|
writeMutex sync.RWMutex // publish
|
||||||
writeFrameAllowed bool // publish
|
writeFrameAllowed bool // publish
|
||||||
udpReportTimer *time.Timer
|
udpReportTimer *time.Timer
|
||||||
@@ -563,7 +561,7 @@ func (c *Client) run() {
|
|||||||
}(),
|
}(),
|
||||||
// use the stream base URL, otherwise some cameras do not reply
|
// use the stream base URL, otherwise some cameras do not reply
|
||||||
URL: c.streamBaseURL,
|
URL: c.streamBaseURL,
|
||||||
}, true)
|
}, true, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -599,7 +597,7 @@ func (c *Client) doClose() {
|
|||||||
c.do(&base.Request{
|
c.do(&base.Request{
|
||||||
Method: base.Teardown,
|
Method: base.Teardown,
|
||||||
URL: c.streamBaseURL,
|
URL: c.streamBaseURL,
|
||||||
}, true)
|
}, true, false)
|
||||||
|
|
||||||
c.conn.Close()
|
c.conn.Close()
|
||||||
c.conn = nil
|
c.conn = nil
|
||||||
@@ -629,7 +627,6 @@ func (c *Client) reset() {
|
|||||||
c.protocol = nil
|
c.protocol = nil
|
||||||
c.tracks = nil
|
c.tracks = nil
|
||||||
c.tracksByChannel = nil
|
c.tracksByChannel = nil
|
||||||
c.tcpReadBuffer = nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) checkState(allowed map[clientState]struct{}) error {
|
func (c *Client) checkState(allowed map[clientState]struct{}) error {
|
||||||
@@ -756,12 +753,14 @@ func (c *Client) runReader() {
|
|||||||
var processFunc func(int, bool, []byte)
|
var processFunc func(int, bool, []byte)
|
||||||
|
|
||||||
if c.state == clientStatePlay {
|
if c.state == clientStatePlay {
|
||||||
|
tcpRTPPacketBuffer := newRTPPacketMultiBuffer(uint64(c.ReadBufferCount))
|
||||||
|
|
||||||
processFunc = func(trackID int, isRTP bool, payload []byte) {
|
processFunc = func(trackID int, isRTP bool, payload []byte) {
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
atomic.StoreInt64(c.tcpLastFrameTime, now.Unix())
|
atomic.StoreInt64(c.tcpLastFrameTime, now.Unix())
|
||||||
|
|
||||||
if isRTP {
|
if isRTP {
|
||||||
pkt := c.tcpRTPPacketBuffer.next()
|
pkt := tcpRTPPacketBuffer.next()
|
||||||
err := pkt.Unmarshal(payload)
|
err := pkt.Unmarshal(payload)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
@@ -796,11 +795,12 @@ func (c *Client) runReader() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tcpReadBuffer := multibuffer.New(uint64(c.ReadBufferCount), uint64(c.ReadBufferSize))
|
||||||
var frame base.InterleavedFrame
|
var frame base.InterleavedFrame
|
||||||
var res base.Response
|
var res base.Response
|
||||||
|
|
||||||
for {
|
for {
|
||||||
frame.Payload = c.tcpReadBuffer.Next()
|
frame.Payload = tcpReadBuffer.Next()
|
||||||
what, err := base.ReadInterleavedFrameOrResponse(&frame, &res, c.br)
|
what, err := base.ReadInterleavedFrameOrResponse(&frame, &res, c.br)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -918,7 +918,7 @@ func (c *Client) connCloserStop() {
|
|||||||
c.connCloserDone = nil
|
c.connCloserDone = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) do(req *base.Request, skipResponse bool) (*base.Response, error) {
|
func (c *Client) do(req *base.Request, skipResponse bool, allowFrames bool) (*base.Response, error) {
|
||||||
if c.conn == nil {
|
if c.conn == nil {
|
||||||
err := c.connOpen()
|
err := c.connOpen()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -965,12 +965,13 @@ func (c *Client) do(req *base.Request, skipResponse bool) (*base.Response, error
|
|||||||
|
|
||||||
c.conn.SetReadDeadline(time.Now().Add(c.ReadTimeout))
|
c.conn.SetReadDeadline(time.Now().Add(c.ReadTimeout))
|
||||||
|
|
||||||
if c.tcpReadBuffer != nil {
|
if allowFrames {
|
||||||
// read the response and ignore interleaved frames in between;
|
// read the response and ignore interleaved frames in between;
|
||||||
// interleaved frames are sent in two scenarios:
|
// interleaved frames are sent in two scenarios:
|
||||||
// * when the server is v4lrtspserver, before the PLAY response
|
// * when the server is v4lrtspserver, before the PLAY response
|
||||||
// * when the stream is already playing
|
// * when the stream is already playing
|
||||||
err = res.ReadIgnoreFrames(c.br, c.tcpReadBuffer.Next())
|
buf := make([]byte, c.ReadBufferSize)
|
||||||
|
err = res.ReadIgnoreFrames(c.br, buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -1016,7 +1017,7 @@ func (c *Client) do(req *base.Request, skipResponse bool) (*base.Response, error
|
|||||||
}
|
}
|
||||||
c.sender = sender
|
c.sender = sender
|
||||||
|
|
||||||
return c.do(req, false)
|
return c.do(req, skipResponse, allowFrames)
|
||||||
}
|
}
|
||||||
|
|
||||||
return &res, nil
|
return &res, nil
|
||||||
@@ -1035,7 +1036,7 @@ func (c *Client) doOptions(u *base.URL) (*base.Response, error) {
|
|||||||
res, err := c.do(&base.Request{
|
res, err := c.do(&base.Request{
|
||||||
Method: base.Options,
|
Method: base.Options,
|
||||||
URL: u,
|
URL: u,
|
||||||
}, false)
|
}, false, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -1095,7 +1096,7 @@ func (c *Client) doDescribe(u *base.URL) (Tracks, *base.URL, *base.Response, err
|
|||||||
Header: base.Header{
|
Header: base.Header{
|
||||||
"Accept": base.HeaderValue{"application/sdp"},
|
"Accept": base.HeaderValue{"application/sdp"},
|
||||||
},
|
},
|
||||||
}, false)
|
}, false, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, nil, err
|
return nil, nil, nil, err
|
||||||
}
|
}
|
||||||
@@ -1208,7 +1209,7 @@ func (c *Client) doAnnounce(u *base.URL, tracks Tracks) (*base.Response, error)
|
|||||||
"Content-Type": base.HeaderValue{"application/sdp"},
|
"Content-Type": base.HeaderValue{"application/sdp"},
|
||||||
},
|
},
|
||||||
Body: tracks.Write(false),
|
Body: tracks.Write(false),
|
||||||
}, false)
|
}, false, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -1360,7 +1361,7 @@ func (c *Client) doSetup(
|
|||||||
Header: base.Header{
|
Header: base.Header{
|
||||||
"Transport": th.Write(),
|
"Transport": th.Write(),
|
||||||
},
|
},
|
||||||
}, false)
|
}, false, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if proto == TransportUDP {
|
if proto == TransportUDP {
|
||||||
rtpListener.close()
|
rtpListener.close()
|
||||||
@@ -1535,11 +1536,6 @@ func (c *Client) doSetup(
|
|||||||
}
|
}
|
||||||
|
|
||||||
case TransportTCP:
|
case TransportTCP:
|
||||||
if c.tcpReadBuffer == nil {
|
|
||||||
c.tcpReadBuffer = multibuffer.New(uint64(c.ReadBufferCount), uint64(c.ReadBufferSize))
|
|
||||||
c.tcpRTPPacketBuffer = newRTPPacketMultiBuffer(uint64(c.ReadBufferCount))
|
|
||||||
}
|
|
||||||
|
|
||||||
if c.tracksByChannel == nil {
|
if c.tracksByChannel == nil {
|
||||||
c.tracksByChannel = make(map[int]int)
|
c.tracksByChannel = make(map[int]int)
|
||||||
}
|
}
|
||||||
@@ -1633,7 +1629,7 @@ func (c *Client) doPlay(ra *headers.Range, isSwitchingProtocol bool) (*base.Resp
|
|||||||
Header: base.Header{
|
Header: base.Header{
|
||||||
"Range": ra.Write(),
|
"Range": ra.Write(),
|
||||||
},
|
},
|
||||||
}, false)
|
}, false, *c.protocol == TransportTCP)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if *c.protocol == TransportUDP || *c.protocol == TransportUDPMulticast {
|
if *c.protocol == TransportUDP || *c.protocol == TransportUDPMulticast {
|
||||||
for _, cct := range c.tracks {
|
for _, cct := range c.tracks {
|
||||||
@@ -1710,7 +1706,7 @@ func (c *Client) doRecord() (*base.Response, error) {
|
|||||||
res, err := c.do(&base.Request{
|
res, err := c.do(&base.Request{
|
||||||
Method: base.Record,
|
Method: base.Record,
|
||||||
URL: c.streamBaseURL,
|
URL: c.streamBaseURL,
|
||||||
}, false)
|
}, false, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if *c.protocol == TransportUDP {
|
if *c.protocol == TransportUDP {
|
||||||
for _, cct := range c.tracks {
|
for _, cct := range c.tracks {
|
||||||
@@ -1784,7 +1780,7 @@ func (c *Client) doPause() (*base.Response, error) {
|
|||||||
res, err := c.do(&base.Request{
|
res, err := c.do(&base.Request{
|
||||||
Method: base.Pause,
|
Method: base.Pause,
|
||||||
URL: c.streamBaseURL,
|
URL: c.streamBaseURL,
|
||||||
}, false)
|
}, false, *c.protocol == TransportTCP)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user