diff --git a/client.go b/client.go index c63beb9b..2ad7a314 100644 --- a/client.go +++ b/client.go @@ -174,6 +174,10 @@ type Client struct { // This must be touched only when the server reports errors about buffer sizes. // It defaults to 2048. ReadBufferSize int + // write buffer count. + // It allows to queue packets before sending them. + // It defaults to 8. + WriteBufferCount int // // system functions @@ -268,11 +272,14 @@ func (c *Client) Start(scheme string, host string) error { c.InitialUDPReadTimeout = 3 * time.Second } if c.ReadBufferCount == 0 { - c.ReadBufferCount = 1 + c.ReadBufferCount = 256 } if c.ReadBufferSize == 0 { c.ReadBufferSize = 2048 } + if c.WriteBufferCount == 0 { + c.WriteBufferCount = 256 + } // system functions if c.DialContext == nil { @@ -687,7 +694,7 @@ func (c *Client) playRecordStart() { // decrease RAM consumption by allocating less buffers. c.writeBuffer = ringbuffer.New(8) } else { - c.writeBuffer = ringbuffer.New(uint64(c.ReadBufferCount)) + c.writeBuffer = ringbuffer.New(uint64(c.WriteBufferCount)) } c.writerRunning = true c.writerDone = make(chan struct{}) @@ -748,9 +755,12 @@ func (c *Client) runReader() { } } } else { + var tcpReadBuffer *multibuffer.MultiBuffer var processFunc func(int, bool, []byte) if c.state == clientStatePlay { + tcpReadBuffer = multibuffer.New(uint64(c.ReadBufferCount), uint64(c.ReadBufferSize)) + tcpRTPPacketBuffer := newRTPPacketMultiBuffer(uint64(c.ReadBufferCount)) processFunc = func(trackID int, isRTP bool, payload []byte) { @@ -779,6 +789,11 @@ func (c *Client) runReader() { } } } else { + // when recording, tcpReadBuffer is only used to receive RTCP receiver reports, + // that are much smaller than RTP packets and are sent at a fixed interval. + // decrease RAM consumption by allocating less buffers. + tcpReadBuffer = multibuffer.New(8, uint64(c.ReadBufferSize)) + processFunc = func(trackID int, isRTP bool, payload []byte) { if !isRTP { packets, err := rtcp.Unmarshal(payload) @@ -793,7 +808,6 @@ func (c *Client) runReader() { } } - tcpReadBuffer := multibuffer.New(uint64(c.ReadBufferCount), uint64(c.ReadBufferSize)) var frame base.InterleavedFrame var res base.Response @@ -845,6 +859,7 @@ func (c *Client) playRecordStop(isClosing bool) { c.writeBuffer.Close() <-c.writerDone c.writerRunning = false + c.writeBuffer = nil // start connCloser if !isClosing { diff --git a/server.go b/server.go index 621a3002..7d311e6c 100644 --- a/server.go +++ b/server.go @@ -112,13 +112,17 @@ type Server struct { // If greater than 1, allows to pass buffers to routines different than the one // that is reading frames. // It also allows to buffer routed frames and mitigate network fluctuations - // that are particularly high when using UDP. - // It defaults to 512 + // that are particularly relevant when using UDP. + // It defaults to 256. ReadBufferCount int // read buffer size. // This must be touched only when the server reports errors about buffer sizes. // It defaults to 2048. ReadBufferSize int + // write buffer count. + // It allows to queue packets before sending them. + // It defaults to 256. + WriteBufferCount int // // system functions @@ -170,11 +174,14 @@ func (s *Server) Start() error { s.WriteTimeout = 10 * time.Second } if s.ReadBufferCount == 0 { - s.ReadBufferCount = 512 + s.ReadBufferCount = 256 } if s.ReadBufferSize == 0 { s.ReadBufferSize = 2048 } + if s.WriteBufferCount == 0 { + s.WriteBufferCount = 256 + } // system functions if s.Listen == nil { diff --git a/servermulticasthandler.go b/servermulticasthandler.go index 5d4b015a..2b44438b 100644 --- a/servermulticasthandler.go +++ b/servermulticasthandler.go @@ -29,7 +29,7 @@ func newServerMulticastHandler(s *Server) (*serverMulticastHandler, error) { h := &serverMulticastHandler{ rtpl: rtpl, rtcpl: rtcpl, - writeBuffer: ringbuffer.New(uint64(s.ReadBufferCount)), + writeBuffer: ringbuffer.New(uint64(s.WriteBufferCount)), writerDone: make(chan struct{}), } diff --git a/serversession.go b/serversession.go index ec0f7f5b..dd34815f 100644 --- a/serversession.go +++ b/serversession.go @@ -831,7 +831,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base // inside the callback. if ss.state != ServerSessionStatePlay && *ss.setuppedTransport != TransportUDPMulticast { - ss.writeBuffer = ringbuffer.New(uint64(ss.s.ReadBufferCount)) + ss.writeBuffer = ringbuffer.New(uint64(ss.s.WriteBufferCount)) } res, err := sc.s.Handler.(ServerHandlerOnPlay).OnPlay(&ServerHandlerOnPlayCtx{