client: add write buffer

This commit is contained in:
aler9
2021-12-08 13:39:11 +01:00
parent ab465820ce
commit 5f3f7ec93a

244
client.go
View File

@@ -27,6 +27,7 @@ import (
"github.com/aler9/gortsplib/pkg/headers"
"github.com/aler9/gortsplib/pkg/liberrors"
"github.com/aler9/gortsplib/pkg/multibuffer"
"github.com/aler9/gortsplib/pkg/ringbuffer"
"github.com/aler9/gortsplib/pkg/rtcpreceiver"
"github.com/aler9/gortsplib/pkg/rtcpsender"
)
@@ -221,6 +222,8 @@ type Client struct {
tcpLastFrameTime int64
keepaliveTimer *time.Timer
closeError error
writerRunning bool
writeBuffer *ringbuffer.RingBuffer
// connCloser channels
connCloserTerminate chan struct{}
@@ -229,6 +232,9 @@ type Client struct {
// reader channels
readerErr chan error
// writer channels
writerDone chan struct{}
// in
options chan optionsReq
describe chan describeReq
@@ -672,6 +678,19 @@ func (c *Client) playRecordStart() {
// stop connCloser
c.connCloserStop()
// start writer
if c.state == clientStatePlay {
// when reading, writeBuffer is only used to send RTCP receiver reports,
// that are much smaller than RTP packets and are sent at a fixed interval.
// decrease RAM consumption by allocating less buffers.
c.writeBuffer = ringbuffer.New(8)
} else {
c.writeBuffer = ringbuffer.New(uint64(c.ReadBufferCount))
}
c.writerRunning = true
c.writerDone = make(chan struct{})
go c.runWriter()
// allow writing
c.writeMutex.Lock()
c.writeFrameAllowed = true
@@ -712,71 +731,71 @@ func (c *Client) playRecordStart() {
// start reader
c.readerErr = make(chan error)
go func() {
c.readerErr <- c.runReader()
}()
go c.runReader()
}
func (c *Client) runReader() error {
if *c.protocol == TransportUDP || *c.protocol == TransportUDPMulticast {
for {
var res base.Response
err := res.Read(c.br)
if err != nil {
return err
}
}
} else {
var processFunc func(int, bool, []byte)
if c.state == clientStatePlay {
processFunc = func(trackID int, isRTP bool, payload []byte) {
now := time.Now()
atomic.StoreInt64(&c.tcpLastFrameTime, now.Unix())
if isRTP {
c.tracks[trackID].rtcpReceiver.ProcessPacketRTP(now, payload)
c.OnPacketRTP(trackID, payload)
} else {
c.tracks[trackID].rtcpReceiver.ProcessPacketRTCP(now, payload)
c.OnPacketRTCP(trackID, payload)
func (c *Client) runReader() {
c.readerErr <- func() error {
if *c.protocol == TransportUDP || *c.protocol == TransportUDPMulticast {
for {
var res base.Response
err := res.Read(c.br)
if err != nil {
return err
}
}
} else {
processFunc = func(trackID int, isRTP bool, payload []byte) {
if !isRTP {
c.OnPacketRTCP(trackID, payload)
var processFunc func(int, bool, []byte)
if c.state == clientStatePlay {
processFunc = func(trackID int, isRTP bool, payload []byte) {
now := time.Now()
atomic.StoreInt64(&c.tcpLastFrameTime, now.Unix())
if isRTP {
c.tracks[trackID].rtcpReceiver.ProcessPacketRTP(now, payload)
c.OnPacketRTP(trackID, payload)
} else {
c.tracks[trackID].rtcpReceiver.ProcessPacketRTCP(now, payload)
c.OnPacketRTCP(trackID, payload)
}
}
} else {
processFunc = func(trackID int, isRTP bool, payload []byte) {
if !isRTP {
c.OnPacketRTCP(trackID, payload)
}
}
}
frame := base.InterleavedFrame{}
res := base.Response{}
for {
frame.Payload = c.tcpReadBuffer.Next()
what, err := base.ReadInterleavedFrameOrResponse(&frame, &res, c.br)
if err != nil {
return err
}
if _, ok := what.(*base.InterleavedFrame); ok {
channel := frame.Channel
isRTP := true
if (channel % 2) != 0 {
channel--
isRTP = false
}
trackID, ok := c.tracksByChannel[channel]
if !ok {
continue
}
processFunc(trackID, isRTP, frame.Payload)
}
}
}
frame := base.InterleavedFrame{}
res := base.Response{}
for {
frame.Payload = c.tcpReadBuffer.Next()
what, err := base.ReadInterleavedFrameOrResponse(&frame, &res, c.br)
if err != nil {
return err
}
if _, ok := what.(*base.InterleavedFrame); ok {
channel := frame.Channel
isRTP := true
if (channel % 2) != 0 {
channel--
isRTP = false
}
trackID, ok := c.tracksByChannel[channel]
if !ok {
continue
}
processFunc(trackID, isRTP, frame.Payload)
}
}
}
}()
}
func (c *Client) playRecordStop(isClosing bool) {
@@ -796,6 +815,11 @@ func (c *Client) playRecordStop(isClosing bool) {
c.writeFrameAllowed = false
c.writeMutex.Unlock()
// stop writer
c.writeBuffer.Close()
<-c.writerDone
c.writerRunning = false
// start connCloser
if !isClosing {
c.connCloserStart()
@@ -1768,6 +1792,62 @@ func (c *Client) Seek(ra *headers.Range) (*base.Response, error) {
return c.Play(ra)
}
func (c *Client) runWriter() {
defer close(c.writerDone)
var writeFunc func(int, bool, []byte)
switch *c.protocol {
case TransportUDP, TransportUDPMulticast:
writeFunc = func(trackID int, isRTP bool, payload []byte) {
if isRTP {
if c.tracks[trackID].rtcpSender != nil {
c.tracks[trackID].rtcpSender.ProcessPacketRTP(time.Now(), payload)
}
c.tracks[trackID].udpRTPListener.write(payload)
} else {
c.tracks[trackID].udpRTCPListener.write(payload)
}
}
default: //TCP
writeFunc = func(trackID int, isRTP bool, payload []byte) {
if isRTP {
if c.tracks[trackID].rtcpSender != nil {
c.tracks[trackID].rtcpSender.ProcessPacketRTP(time.Now(), payload)
}
f := c.tracks[trackID].tcpRTPFrame
f.Payload = payload
c.tcpWriteMutex.Lock()
c.nconn.SetWriteDeadline(time.Now().Add(c.WriteTimeout))
f.Write(c.bw)
c.tcpWriteMutex.Unlock()
} else {
f := c.tracks[trackID].tcpRTCPFrame
f.Payload = payload
c.tcpWriteMutex.Lock()
c.nconn.SetWriteDeadline(time.Now().Add(c.WriteTimeout))
f.Write(c.bw)
c.tcpWriteMutex.Unlock()
}
}
}
for {
tmp, ok := c.writeBuffer.Pull()
if !ok {
return
}
data := tmp.(trackTypePayload)
writeFunc(data.trackID, data.isRTP, data.payload)
}
}
// WritePacketRTP writes a RTP packet.
func (c *Client) WritePacketRTP(trackID int, payload []byte) error {
c.writeMutex.RLock()
@@ -1782,27 +1862,12 @@ func (c *Client) WritePacketRTP(trackID int, payload []byte) error {
}
}
now := time.Now()
if c.tracks[trackID].rtcpSender != nil {
c.tracks[trackID].rtcpSender.ProcessPacketRTP(now, payload)
}
switch *c.protocol {
case TransportUDP, TransportUDPMulticast:
return c.tracks[trackID].udpRTPListener.write(payload)
default: // TCP
f := c.tracks[trackID].tcpRTPFrame
// a mutex is needed here since bufio.Writer is not thread safe.
c.tcpWriteMutex.Lock()
defer c.tcpWriteMutex.Unlock()
c.nconn.SetWriteDeadline(now.Add(c.WriteTimeout))
f.Payload = payload
return f.Write(c.bw)
}
c.writeBuffer.Push(trackTypePayload{
trackID: trackID,
isRTP: true,
payload: payload,
})
return nil
}
// WritePacketRTCP writes a RTCP packet.
@@ -1819,21 +1884,10 @@ func (c *Client) WritePacketRTCP(trackID int, payload []byte) error {
}
}
now := time.Now()
switch *c.protocol {
case TransportUDP, TransportUDPMulticast:
return c.tracks[trackID].udpRTCPListener.write(payload)
default: // TCP
f := c.tracks[trackID].tcpRTCPFrame
// a mutex is needed here since bufio.Writer is not thread safe.
c.tcpWriteMutex.Lock()
defer c.tcpWriteMutex.Unlock()
c.nconn.SetWriteDeadline(now.Add(c.WriteTimeout))
f.Payload = payload
return f.Write(c.bw)
}
c.writeBuffer.Push(trackTypePayload{
trackID: trackID,
isRTP: false,
payload: payload,
})
return nil
}