diff --git a/client.go b/client.go index a3eb74dc..bba9ce61 100644 --- a/client.go +++ b/client.go @@ -27,6 +27,7 @@ import ( "github.com/aler9/gortsplib/pkg/headers" "github.com/aler9/gortsplib/pkg/liberrors" "github.com/aler9/gortsplib/pkg/multibuffer" + "github.com/aler9/gortsplib/pkg/ringbuffer" "github.com/aler9/gortsplib/pkg/rtcpreceiver" "github.com/aler9/gortsplib/pkg/rtcpsender" ) @@ -221,6 +222,8 @@ type Client struct { tcpLastFrameTime int64 keepaliveTimer *time.Timer closeError error + writerRunning bool + writeBuffer *ringbuffer.RingBuffer // connCloser channels connCloserTerminate chan struct{} @@ -229,6 +232,9 @@ type Client struct { // reader channels readerErr chan error + // writer channels + writerDone chan struct{} + // in options chan optionsReq describe chan describeReq @@ -672,6 +678,19 @@ func (c *Client) playRecordStart() { // stop connCloser c.connCloserStop() + // start writer + if c.state == clientStatePlay { + // when reading, writeBuffer is only used to send RTCP receiver reports, + // that are much smaller than RTP packets and are sent at a fixed interval. + // decrease RAM consumption by allocating less buffers. + c.writeBuffer = ringbuffer.New(8) + } else { + c.writeBuffer = ringbuffer.New(uint64(c.ReadBufferCount)) + } + c.writerRunning = true + c.writerDone = make(chan struct{}) + go c.runWriter() + // allow writing c.writeMutex.Lock() c.writeFrameAllowed = true @@ -712,71 +731,71 @@ func (c *Client) playRecordStart() { // start reader c.readerErr = make(chan error) - go func() { - c.readerErr <- c.runReader() - }() + go c.runReader() } -func (c *Client) runReader() error { - if *c.protocol == TransportUDP || *c.protocol == TransportUDPMulticast { - for { - var res base.Response - err := res.Read(c.br) - if err != nil { - return err - } - } - } else { - var processFunc func(int, bool, []byte) - - if c.state == clientStatePlay { - processFunc = func(trackID int, isRTP bool, payload []byte) { - now := time.Now() - atomic.StoreInt64(&c.tcpLastFrameTime, now.Unix()) - - if isRTP { - c.tracks[trackID].rtcpReceiver.ProcessPacketRTP(now, payload) - c.OnPacketRTP(trackID, payload) - } else { - c.tracks[trackID].rtcpReceiver.ProcessPacketRTCP(now, payload) - c.OnPacketRTCP(trackID, payload) +func (c *Client) runReader() { + c.readerErr <- func() error { + if *c.protocol == TransportUDP || *c.protocol == TransportUDPMulticast { + for { + var res base.Response + err := res.Read(c.br) + if err != nil { + return err } } } else { - processFunc = func(trackID int, isRTP bool, payload []byte) { - if !isRTP { - c.OnPacketRTCP(trackID, payload) + var processFunc func(int, bool, []byte) + + if c.state == clientStatePlay { + processFunc = func(trackID int, isRTP bool, payload []byte) { + now := time.Now() + atomic.StoreInt64(&c.tcpLastFrameTime, now.Unix()) + + if isRTP { + c.tracks[trackID].rtcpReceiver.ProcessPacketRTP(now, payload) + c.OnPacketRTP(trackID, payload) + } else { + c.tracks[trackID].rtcpReceiver.ProcessPacketRTCP(now, payload) + c.OnPacketRTCP(trackID, payload) + } + } + } else { + processFunc = func(trackID int, isRTP bool, payload []byte) { + if !isRTP { + c.OnPacketRTCP(trackID, payload) + } + } + } + + frame := base.InterleavedFrame{} + res := base.Response{} + + for { + frame.Payload = c.tcpReadBuffer.Next() + what, err := base.ReadInterleavedFrameOrResponse(&frame, &res, c.br) + if err != nil { + return err + } + + if _, ok := what.(*base.InterleavedFrame); ok { + channel := frame.Channel + isRTP := true + if (channel % 2) != 0 { + channel-- + isRTP = false + } + + trackID, ok := c.tracksByChannel[channel] + if !ok { + continue + } + + processFunc(trackID, isRTP, frame.Payload) } } } - - frame := base.InterleavedFrame{} - res := base.Response{} - - for { - frame.Payload = c.tcpReadBuffer.Next() - what, err := base.ReadInterleavedFrameOrResponse(&frame, &res, c.br) - if err != nil { - return err - } - - if _, ok := what.(*base.InterleavedFrame); ok { - channel := frame.Channel - isRTP := true - if (channel % 2) != 0 { - channel-- - isRTP = false - } - - trackID, ok := c.tracksByChannel[channel] - if !ok { - continue - } - - processFunc(trackID, isRTP, frame.Payload) - } - } - } + }() } func (c *Client) playRecordStop(isClosing bool) { @@ -796,6 +815,11 @@ func (c *Client) playRecordStop(isClosing bool) { c.writeFrameAllowed = false c.writeMutex.Unlock() + // stop writer + c.writeBuffer.Close() + <-c.writerDone + c.writerRunning = false + // start connCloser if !isClosing { c.connCloserStart() @@ -1768,6 +1792,62 @@ func (c *Client) Seek(ra *headers.Range) (*base.Response, error) { return c.Play(ra) } +func (c *Client) runWriter() { + defer close(c.writerDone) + + var writeFunc func(int, bool, []byte) + + switch *c.protocol { + case TransportUDP, TransportUDPMulticast: + writeFunc = func(trackID int, isRTP bool, payload []byte) { + if isRTP { + if c.tracks[trackID].rtcpSender != nil { + c.tracks[trackID].rtcpSender.ProcessPacketRTP(time.Now(), payload) + } + + c.tracks[trackID].udpRTPListener.write(payload) + } else { + c.tracks[trackID].udpRTCPListener.write(payload) + } + } + + default: //TCP + writeFunc = func(trackID int, isRTP bool, payload []byte) { + if isRTP { + if c.tracks[trackID].rtcpSender != nil { + c.tracks[trackID].rtcpSender.ProcessPacketRTP(time.Now(), payload) + } + + f := c.tracks[trackID].tcpRTPFrame + f.Payload = payload + + c.tcpWriteMutex.Lock() + c.nconn.SetWriteDeadline(time.Now().Add(c.WriteTimeout)) + f.Write(c.bw) + c.tcpWriteMutex.Unlock() + } else { + f := c.tracks[trackID].tcpRTCPFrame + f.Payload = payload + + c.tcpWriteMutex.Lock() + c.nconn.SetWriteDeadline(time.Now().Add(c.WriteTimeout)) + f.Write(c.bw) + c.tcpWriteMutex.Unlock() + } + } + } + + for { + tmp, ok := c.writeBuffer.Pull() + if !ok { + return + } + data := tmp.(trackTypePayload) + + writeFunc(data.trackID, data.isRTP, data.payload) + } +} + // WritePacketRTP writes a RTP packet. func (c *Client) WritePacketRTP(trackID int, payload []byte) error { c.writeMutex.RLock() @@ -1782,27 +1862,12 @@ func (c *Client) WritePacketRTP(trackID int, payload []byte) error { } } - now := time.Now() - - if c.tracks[trackID].rtcpSender != nil { - c.tracks[trackID].rtcpSender.ProcessPacketRTP(now, payload) - } - - switch *c.protocol { - case TransportUDP, TransportUDPMulticast: - return c.tracks[trackID].udpRTPListener.write(payload) - - default: // TCP - f := c.tracks[trackID].tcpRTPFrame - - // a mutex is needed here since bufio.Writer is not thread safe. - c.tcpWriteMutex.Lock() - defer c.tcpWriteMutex.Unlock() - - c.nconn.SetWriteDeadline(now.Add(c.WriteTimeout)) - f.Payload = payload - return f.Write(c.bw) - } + c.writeBuffer.Push(trackTypePayload{ + trackID: trackID, + isRTP: true, + payload: payload, + }) + return nil } // WritePacketRTCP writes a RTCP packet. @@ -1819,21 +1884,10 @@ func (c *Client) WritePacketRTCP(trackID int, payload []byte) error { } } - now := time.Now() - - switch *c.protocol { - case TransportUDP, TransportUDPMulticast: - return c.tracks[trackID].udpRTCPListener.write(payload) - - default: // TCP - f := c.tracks[trackID].tcpRTCPFrame - - // a mutex is needed here since bufio.Writer is not thread safe. - c.tcpWriteMutex.Lock() - defer c.tcpWriteMutex.Unlock() - - c.nconn.SetWriteDeadline(now.Add(c.WriteTimeout)) - f.Payload = payload - return f.Write(c.bw) - } + c.writeBuffer.Push(trackTypePayload{ + trackID: trackID, + isRTP: false, + payload: payload, + }) + return nil }