add WriteBufferCount; set both ReadBufferCount and WriteBufferCount to 256

This commit is contained in:
aler9
2022-02-19 15:09:00 +01:00
parent 86fb4181c7
commit 0515539831
4 changed files with 30 additions and 8 deletions

View File

@@ -174,6 +174,10 @@ type Client struct {
// This must be touched only when the server reports errors about buffer sizes. // This must be touched only when the server reports errors about buffer sizes.
// It defaults to 2048. // It defaults to 2048.
ReadBufferSize int ReadBufferSize int
// write buffer count.
// It allows to queue packets before sending them.
// It defaults to 8.
WriteBufferCount int
// //
// system functions // system functions
@@ -268,11 +272,14 @@ func (c *Client) Start(scheme string, host string) error {
c.InitialUDPReadTimeout = 3 * time.Second c.InitialUDPReadTimeout = 3 * time.Second
} }
if c.ReadBufferCount == 0 { if c.ReadBufferCount == 0 {
c.ReadBufferCount = 1 c.ReadBufferCount = 256
} }
if c.ReadBufferSize == 0 { if c.ReadBufferSize == 0 {
c.ReadBufferSize = 2048 c.ReadBufferSize = 2048
} }
if c.WriteBufferCount == 0 {
c.WriteBufferCount = 256
}
// system functions // system functions
if c.DialContext == nil { if c.DialContext == nil {
@@ -687,7 +694,7 @@ func (c *Client) playRecordStart() {
// decrease RAM consumption by allocating less buffers. // decrease RAM consumption by allocating less buffers.
c.writeBuffer = ringbuffer.New(8) c.writeBuffer = ringbuffer.New(8)
} else { } else {
c.writeBuffer = ringbuffer.New(uint64(c.ReadBufferCount)) c.writeBuffer = ringbuffer.New(uint64(c.WriteBufferCount))
} }
c.writerRunning = true c.writerRunning = true
c.writerDone = make(chan struct{}) c.writerDone = make(chan struct{})
@@ -748,9 +755,12 @@ func (c *Client) runReader() {
} }
} }
} else { } else {
var tcpReadBuffer *multibuffer.MultiBuffer
var processFunc func(int, bool, []byte) var processFunc func(int, bool, []byte)
if c.state == clientStatePlay { if c.state == clientStatePlay {
tcpReadBuffer = multibuffer.New(uint64(c.ReadBufferCount), uint64(c.ReadBufferSize))
tcpRTPPacketBuffer := newRTPPacketMultiBuffer(uint64(c.ReadBufferCount)) tcpRTPPacketBuffer := newRTPPacketMultiBuffer(uint64(c.ReadBufferCount))
processFunc = func(trackID int, isRTP bool, payload []byte) { processFunc = func(trackID int, isRTP bool, payload []byte) {
@@ -779,6 +789,11 @@ func (c *Client) runReader() {
} }
} }
} else { } else {
// when recording, tcpReadBuffer is only used to receive RTCP receiver reports,
// that are much smaller than RTP packets and are sent at a fixed interval.
// decrease RAM consumption by allocating less buffers.
tcpReadBuffer = multibuffer.New(8, uint64(c.ReadBufferSize))
processFunc = func(trackID int, isRTP bool, payload []byte) { processFunc = func(trackID int, isRTP bool, payload []byte) {
if !isRTP { if !isRTP {
packets, err := rtcp.Unmarshal(payload) packets, err := rtcp.Unmarshal(payload)
@@ -793,7 +808,6 @@ func (c *Client) runReader() {
} }
} }
tcpReadBuffer := multibuffer.New(uint64(c.ReadBufferCount), uint64(c.ReadBufferSize))
var frame base.InterleavedFrame var frame base.InterleavedFrame
var res base.Response var res base.Response
@@ -845,6 +859,7 @@ func (c *Client) playRecordStop(isClosing bool) {
c.writeBuffer.Close() c.writeBuffer.Close()
<-c.writerDone <-c.writerDone
c.writerRunning = false c.writerRunning = false
c.writeBuffer = nil
// start connCloser // start connCloser
if !isClosing { if !isClosing {

View File

@@ -112,13 +112,17 @@ type Server struct {
// If greater than 1, allows to pass buffers to routines different than the one // If greater than 1, allows to pass buffers to routines different than the one
// that is reading frames. // that is reading frames.
// It also allows to buffer routed frames and mitigate network fluctuations // It also allows to buffer routed frames and mitigate network fluctuations
// that are particularly high when using UDP. // that are particularly relevant when using UDP.
// It defaults to 512 // It defaults to 256.
ReadBufferCount int ReadBufferCount int
// read buffer size. // read buffer size.
// This must be touched only when the server reports errors about buffer sizes. // This must be touched only when the server reports errors about buffer sizes.
// It defaults to 2048. // It defaults to 2048.
ReadBufferSize int ReadBufferSize int
// write buffer count.
// It allows to queue packets before sending them.
// It defaults to 256.
WriteBufferCount int
// //
// system functions // system functions
@@ -170,11 +174,14 @@ func (s *Server) Start() error {
s.WriteTimeout = 10 * time.Second s.WriteTimeout = 10 * time.Second
} }
if s.ReadBufferCount == 0 { if s.ReadBufferCount == 0 {
s.ReadBufferCount = 512 s.ReadBufferCount = 256
} }
if s.ReadBufferSize == 0 { if s.ReadBufferSize == 0 {
s.ReadBufferSize = 2048 s.ReadBufferSize = 2048
} }
if s.WriteBufferCount == 0 {
s.WriteBufferCount = 256
}
// system functions // system functions
if s.Listen == nil { if s.Listen == nil {

View File

@@ -29,7 +29,7 @@ func newServerMulticastHandler(s *Server) (*serverMulticastHandler, error) {
h := &serverMulticastHandler{ h := &serverMulticastHandler{
rtpl: rtpl, rtpl: rtpl,
rtcpl: rtcpl, rtcpl: rtcpl,
writeBuffer: ringbuffer.New(uint64(s.ReadBufferCount)), writeBuffer: ringbuffer.New(uint64(s.WriteBufferCount)),
writerDone: make(chan struct{}), writerDone: make(chan struct{}),
} }

View File

@@ -831,7 +831,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
// inside the callback. // inside the callback.
if ss.state != ServerSessionStatePlay && if ss.state != ServerSessionStatePlay &&
*ss.setuppedTransport != TransportUDPMulticast { *ss.setuppedTransport != TransportUDPMulticast {
ss.writeBuffer = ringbuffer.New(uint64(ss.s.ReadBufferCount)) ss.writeBuffer = ringbuffer.New(uint64(ss.s.WriteBufferCount))
} }
res, err := sc.s.Handler.(ServerHandlerOnPlay).OnPlay(&ServerHandlerOnPlayCtx{ res, err := sc.s.Handler.(ServerHandlerOnPlay).OnPlay(&ServerHandlerOnPlayCtx{