diff --git a/clientconf.go b/clientconf.go index 0441fc35..842f6a79 100644 --- a/clientconf.go +++ b/clientconf.go @@ -60,7 +60,12 @@ type ClientConf struct { // If greater than 1, allows to pass buffers to routines different than the one // that is reading frames. // It defaults to 1. - ReadBufferCount uint64 + ReadBufferCount int + + // read buffer size. + // This must be touched only when the server reports problems about buffer sizes. + // It defaults to 2048. + ReadBufferSize int // callback called before every request. OnRequest func(req *base.Request) diff --git a/clientconn.go b/clientconn.go index 421563b8..3f72067b 100644 --- a/clientconn.go +++ b/clientconn.go @@ -27,13 +27,12 @@ import ( ) const ( - clientConnReadBufferSize = 4096 - clientConnWriteBufferSize = 4096 - clientConnReceiverReportPeriod = 10 * time.Second - clientConnSenderReportPeriod = 10 * time.Second - clientConnUDPCheckStreamPeriod = 5 * time.Second - clientConnUDPKeepalivePeriod = 30 * time.Second - clientConnTCPFrameReadBufferSize = 2048 + clientConnReadBufferSize = 4096 + clientConnWriteBufferSize = 4096 + clientConnReceiverReportPeriod = 10 * time.Second + clientConnSenderReportPeriod = 10 * time.Second + clientConnUDPCheckStreamPeriod = 5 * time.Second + clientConnUDPKeepalivePeriod = 30 * time.Second ) type clientConnState int @@ -112,6 +111,9 @@ func newClientConn(conf ClientConf, scheme string, host string) (*ClientConn, er if conf.ReadBufferCount == 0 { conf.ReadBufferCount = 1 } + if conf.ReadBufferSize == 0 { + conf.ReadBufferSize = 2048 + } if conf.DialTimeout == nil { conf.DialTimeout = net.DialTimeout } @@ -154,7 +156,7 @@ func newClientConn(conf ClientConf, scheme string, host string) (*ClientConn, er udpRTCPListeners: make(map[int]*clientConnUDPListener), rtcpReceivers: make(map[int]*rtcpreceiver.RTCPReceiver), udpLastFrameTimes: make(map[int]*int64), - tcpFrameBuffer: multibuffer.New(conf.ReadBufferCount, clientConnTCPFrameReadBufferSize), + tcpFrameBuffer: multibuffer.New(uint64(conf.ReadBufferCount), uint64(conf.ReadBufferSize)), rtcpSenders: make(map[int]*rtcpsender.RTCPSender), publishError: fmt.Errorf("not running"), }, nil diff --git a/clientconnudpl.go b/clientconnudpl.go index 84e63fd4..b52a9524 100644 --- a/clientconnudpl.go +++ b/clientconnudpl.go @@ -12,8 +12,6 @@ import ( const ( // use the same buffer size as gstreamer's rtspsrc clientConnUDPKernelReadBufferSize = 0x80000 - - clientConnUDPReadBufferSize = 2048 ) type clientConnUDPListener struct { @@ -44,7 +42,7 @@ func newClientConnUDPListener(c *ClientConn, port int) (*clientConnUDPListener, return &clientConnUDPListener{ c: c, pc: pc, - udpFrameBuffer: multibuffer.New(c.conf.ReadBufferCount, clientConnUDPReadBufferSize), + udpFrameBuffer: multibuffer.New(uint64(c.conf.ReadBufferCount), uint64(c.conf.ReadBufferSize)), }, nil } diff --git a/server.go b/server.go index 452eacf7..4ca83217 100644 --- a/server.go +++ b/server.go @@ -22,6 +22,9 @@ func newServer(conf ServerConf, address string) (*Server, error) { if conf.ReadBufferCount == 0 { conf.ReadBufferCount = 512 } + if conf.ReadBufferSize == 0 { + conf.ReadBufferSize = 2048 + } if conf.Listen == nil { conf.Listen = net.Listen } diff --git a/serverconf.go b/serverconf.go index 53c6029b..3d4a6c11 100644 --- a/serverconf.go +++ b/serverconf.go @@ -42,7 +42,12 @@ type ServerConf struct { // It also allows to buffer routed frames and mitigate network fluctuations // that are particularly high when using UDP. // It defaults to 512 - ReadBufferCount uint64 + ReadBufferCount int + + // read buffer size. + // This must be touched only when the server reports problems about buffer sizes. + // It defaults to 2048. + ReadBufferSize int // function used to initialize the TCP listener. // It defaults to net.Listen diff --git a/serverconn.go b/serverconn.go index ef625a21..7e1edc35 100644 --- a/serverconn.go +++ b/serverconn.go @@ -22,7 +22,6 @@ const ( serverConnWriteBufferSize = 4096 serverConnCheckStreamInterval = 5 * time.Second serverConnReceiverReportInterval = 10 * time.Second - serverConnTCPFrameReadBufferSize = 2048 ) // server errors. @@ -161,7 +160,7 @@ func newServerConn(conf ServerConf, nconn net.Conn) *ServerConn { nconn: nconn, br: bufio.NewReaderSize(conn, serverConnReadBufferSize), bw: bufio.NewWriterSize(conn, serverConnWriteBufferSize), - frameRingBuffer: ringbuffer.New(conf.ReadBufferCount), + frameRingBuffer: ringbuffer.New(uint64(conf.ReadBufferCount)), backgroundWriteDone: make(chan struct{}), terminate: make(chan struct{}), } @@ -791,13 +790,13 @@ func (sc *ServerConn) backgroundRead() error { sc.framesEnabled = true if sc.state == ServerConnStateRecord { - tcpFrameBuffer = multibuffer.New(sc.conf.ReadBufferCount, serverConnTCPFrameReadBufferSize) + tcpFrameBuffer = multibuffer.New(uint64(sc.conf.ReadBufferCount), uint64(sc.conf.ReadBufferSize)) } else { // when playing, tcpFrameBuffer is only used to receive RTCP receiver reports, // that are much smaller than RTP frames and are sent at a fixed interval // (about 2 frames every 10 secs). // decrease RAM consumption by allocating less buffers. - tcpFrameBuffer = multibuffer.New(8, serverConnTCPFrameReadBufferSize) + tcpFrameBuffer = multibuffer.New(8, uint64(sc.conf.ReadBufferSize)) } // write response before frames diff --git a/serverudpl.go b/serverudpl.go index bde94b14..a95f571e 100644 --- a/serverudpl.go +++ b/serverudpl.go @@ -12,7 +12,6 @@ import ( const ( serverConnUDPListenerKernelReadBufferSize = 0x80000 // same as gstreamer's rtspsrc - serverConnUDPListenerReadBufferSize = 2048 ) type bufAddrPair struct { @@ -94,8 +93,8 @@ func (s *ServerUDPListener) initialize(conf ServerConf, streamType StreamType) { s.initialized = true s.streamType = streamType s.writeTimeout = conf.WriteTimeout - s.readBuf = multibuffer.New(conf.ReadBufferCount, serverConnUDPListenerReadBufferSize) - s.ringBuffer = ringbuffer.New(conf.ReadBufferCount) + s.readBuf = multibuffer.New(uint64(conf.ReadBufferCount), uint64(conf.ReadBufferSize)) + s.ringBuffer = ringbuffer.New(uint64(conf.ReadBufferCount)) go s.run() }