diff --git a/connclient.go b/connclient.go index 505f83d3..a42b78af 100644 --- a/connclient.go +++ b/connclient.go @@ -20,12 +20,13 @@ import ( ) const ( - clientReadBufferSize = 4096 - clientWriteBufferSize = 4096 - clientReceiverReportPeriod = 10 * time.Second - clientUDPCheckStreamPeriod = 5 * time.Second - clientUDPKeepalivePeriod = 30 * time.Second - clientTCPReadBufferSize = 128 * 1024 + clientReadBufferSize = 4096 + clientWriteBufferSize = 4096 + clientReceiverReportPeriod = 10 * time.Second + clientUDPCheckStreamPeriod = 5 * time.Second + clientUDPKeepalivePeriod = 30 * time.Second + clientTCPFrameReadBufferSize = 128 * 1024 + clientUDPFrameReadBufferSize = 2048 ) // ConnClientConf allows to configure a ConnClient. @@ -41,6 +42,12 @@ type ConnClientConf struct { // It defaults to 5 seconds WriteTimeout time.Duration + // (optional) read buffer count. + // If greater than 1, allows to pass frames to other routines than the one + // that is reading frames. + // It defaults to 1 + ReadBufferCount int + // (optional) function used to initialize the TCP client. // It defaults to net.DialTimeout DialTimeout func(network, address string, timeout time.Duration) (net.Conn, error) @@ -65,6 +72,7 @@ type ConnClient struct { udpLastFrameTimes map[int]*int64 udpRtpListeners map[int]*connClientUDPListener udpRtcpListeners map[int]*connClientUDPListener + tcpFrameReadBuf *MultiBuffer playing bool receiverReportTerminate chan struct{} @@ -79,6 +87,9 @@ func NewConnClient(conf ConnClientConf) (*ConnClient, error) { if conf.WriteTimeout == time.Duration(0) { conf.WriteTimeout = 5 * time.Second } + if conf.ReadBufferCount == 0 { + conf.ReadBufferCount = 1 + } if conf.DialTimeout == nil { conf.DialTimeout = net.DialTimeout } @@ -137,18 +148,22 @@ func (c *ConnClient) NetConn() net.Conn { } // ReadFrame reads an InterleavedFrame. -func (c *ConnClient) ReadFrame(frame *InterleavedFrame) error { +func (c *ConnClient) ReadFrame() (*InterleavedFrame, error) { c.nconn.SetReadDeadline(time.Now().Add(c.conf.ReadTimeout)) + frame := &InterleavedFrame{ + Content: c.tcpFrameReadBuf.Next(), + } err := frame.Read(c.br) if err != nil { - return err + return nil, err } c.rtcpReceivers[frame.TrackId].OnFrame(frame.StreamType, frame.Content) - return nil + + return frame, nil } -func (c *ConnClient) readFrameOrResponse(frame *InterleavedFrame) (interface{}, error) { +func (c *ConnClient) readFrameOrResponse() (interface{}, error) { c.nconn.SetReadDeadline(time.Now().Add(c.conf.ReadTimeout)) b, err := c.br.ReadByte() if err != nil { @@ -157,6 +172,9 @@ func (c *ConnClient) readFrameOrResponse(frame *InterleavedFrame) (interface{}, c.br.UnreadByte() if b == interleavedFrameMagicByte { + frame := &InterleavedFrame{ + Content: c.tcpFrameReadBuf.Next(), + } err := frame.Read(c.br) if err != nil { return nil, err @@ -386,9 +404,6 @@ func (c *ConnClient) setup(u *url.URL, track *Track, ht *HeaderTransport) (*Resp return res, nil } -// UDPReadFunc is a function used to read UDP packets. -type UDPReadFunc func([]byte) (int, error) - // SetupUDP writes a SETUP request, that means that we want to read // a given track with the UDP transport. It then reads a Response. // If rtpPort and rtcpPort are zero, they will be chosen automatically. @@ -595,15 +610,12 @@ func (c *ConnClient) Play(u *url.URL) (*Response, error) { return nil, err } - frame := &InterleavedFrame{ - Content: make([]byte, 0, clientTCPReadBufferSize), - } + c.tcpFrameReadBuf = NewMultiBuffer(c.conf.ReadBufferCount, clientTCPFrameReadBufferSize) // v4lrtspserver sends frames before the response. // ignore them and wait for the response. for { - frame.Content = frame.Content[:cap(frame.Content)] - recv, err := c.readFrameOrResponse(frame) + recv, err := c.readFrameOrResponse() if err != nil { return nil, err } diff --git a/connclient_test.go b/connclient_test.go index 42ee44de..85d5ce8e 100644 --- a/connclient_test.go +++ b/connclient_test.go @@ -98,10 +98,7 @@ func TestConnClientTCP(t *testing.T) { _, err = conn.Play(u) require.NoError(t, err) - frame := &InterleavedFrame{Content: make([]byte, 0, 128*1024)} - frame.Content = frame.Content[:cap(frame.Content)] - - err = conn.ReadFrame(frame) + _, err = conn.ReadFrame() require.NoError(t, err) } @@ -155,7 +152,6 @@ func TestConnClientUDP(t *testing.T) { go conn.LoopUDP(u) - buf := make([]byte, 2048) - _, err = rtpReads[0](buf) + _, err = rtpReads[0]() require.NoError(t, err) } diff --git a/connclientudpl.go b/connclientudpl.go index 6dc9b47f..39e325af 100644 --- a/connclientudpl.go +++ b/connclientudpl.go @@ -7,14 +7,17 @@ import ( "time" ) -// connClientUDPListener is a UDP listener created by SetupUDP() to receive UDP frames. +// UDPReadFunc is a function used to read UDP packets. +type UDPReadFunc func() ([]byte, error) + type connClientUDPListener struct { - c *ConnClient - pc net.PacketConn - trackId int - streamType StreamType - publisherIp net.IP - publisherPort int + c *ConnClient + pc net.PacketConn + trackId int + streamType StreamType + publisherIp net.IP + publisherPort int + udpFrameReadBuf *MultiBuffer } func newConnClientUDPListener(c *ConnClient, port int, trackId int, streamType StreamType) (*connClientUDPListener, error) { @@ -24,10 +27,11 @@ func newConnClientUDPListener(c *ConnClient, port int, trackId int, streamType S } return &connClientUDPListener{ - c: c, - pc: pc, - trackId: trackId, - streamType: streamType, + c: c, + pc: pc, + trackId: trackId, + streamType: streamType, + udpFrameReadBuf: NewMultiBuffer(c.conf.ReadBufferCount, 2048), }, nil } @@ -35,11 +39,12 @@ func (l *connClientUDPListener) close() { l.pc.Close() } -func (l *connClientUDPListener) read(buf []byte) (int, error) { +func (l *connClientUDPListener) read() ([]byte, error) { for { + buf := l.udpFrameReadBuf.Next() n, addr, err := l.pc.ReadFrom(buf) if err != nil { - return 0, err + return nil, err } uaddr := addr.(*net.UDPAddr) @@ -52,6 +57,6 @@ func (l *connClientUDPListener) read(buf []byte) (int, error) { l.c.rtcpReceivers[l.trackId].OnFrame(l.streamType, buf[:n]) - return n, nil + return buf[:n], nil } } diff --git a/connserver.go b/connserver.go index a39ec04e..45f812df 100644 --- a/connserver.go +++ b/connserver.go @@ -23,13 +23,20 @@ type ConnServerConf struct { // (optional) timeout of write operations. // It defaults to 5 seconds WriteTimeout time.Duration + + // (optional) read buffer count. + // If greater than 1, allows to pass frames to other routines than the one + // that is reading frames. + // It defaults to 1 + ReadBufferCount int } // ConnServer is a server-side RTSP connection. type ConnServer struct { - conf ConnServerConf - br *bufio.Reader - bw *bufio.Writer + conf ConnServerConf + br *bufio.Reader + bw *bufio.Writer + tcpFrameReadBuf *MultiBuffer } // NewConnServer allocates a ConnServer. @@ -40,11 +47,15 @@ func NewConnServer(conf ConnServerConf) *ConnServer { if conf.WriteTimeout == time.Duration(0) { conf.WriteTimeout = 5 * time.Second } + if conf.ReadBufferCount == 0 { + conf.ReadBufferCount = 1 + } return &ConnServer{ - conf: conf, - br: bufio.NewReaderSize(conf.Conn, serverReadBufferSize), - bw: bufio.NewWriterSize(conf.Conn, serverWriteBufferSize), + conf: conf, + br: bufio.NewReaderSize(conf.Conn, serverReadBufferSize), + bw: bufio.NewWriterSize(conf.Conn, serverWriteBufferSize), + tcpFrameReadBuf: NewMultiBuffer(conf.ReadBufferCount, clientTCPFrameReadBufferSize), } } @@ -65,7 +76,7 @@ func (s *ConnServer) ReadRequest() (*Request, error) { } // ReadFrameOrRequest reads an InterleavedFrame or a Request. -func (s *ConnServer) ReadFrameOrRequest(frame *InterleavedFrame, timeout bool) (interface{}, error) { +func (s *ConnServer) ReadFrameOrRequest(timeout bool) (interface{}, error) { if timeout { s.conf.Conn.SetReadDeadline(time.Now().Add(s.conf.ReadTimeout)) } @@ -77,6 +88,9 @@ func (s *ConnServer) ReadFrameOrRequest(frame *InterleavedFrame, timeout bool) ( s.br.UnreadByte() if b == interleavedFrameMagicByte { + frame := &InterleavedFrame{ + Content: s.tcpFrameReadBuf.Next(), + } err := frame.Read(s.br) if err != nil { return nil, err diff --git a/examples/read-tcp.go b/examples/read-tcp.go index 9196c5d9..5e22fc2e 100644 --- a/examples/read-tcp.go +++ b/examples/read-tcp.go @@ -43,12 +43,8 @@ func main() { panic(err) } - frame := &gortsplib.InterleavedFrame{Content: make([]byte, 0, 128*1024)} - for { - frame.Content = frame.Content[:cap(frame.Content)] - - err := conn.ReadFrame(frame) + frame, err := conn.ReadFrame() if err != nil { conn.Close() fmt.Println("connection is closed (%s)", err) diff --git a/examples/read-udp.go b/examples/read-udp.go index 359d6093..c0799c99 100644 --- a/examples/read-udp.go +++ b/examples/read-udp.go @@ -59,14 +59,13 @@ func main() { go func(trackId int, rtpRead gortsplib.UDPReadFunc) { defer wg.Done() - buf := make([]byte, 2048) for { - n, err := rtpRead(buf) + buf, err := rtpRead() if err != nil { break } - fmt.Printf("frame from track %d, type RTP: %v\n", trackId, buf[:n]) + fmt.Printf("frame from track %d, type RTP: %v\n", trackId, buf) } }(trackId, rtpRead) } @@ -78,14 +77,13 @@ func main() { go func(trackId int, rtcpRead gortsplib.UDPReadFunc) { defer wg.Done() - buf := make([]byte, 2048) for { - n, err := rtcpRead(buf) + buf, err := rtcpRead() if err != nil { break } - fmt.Printf("frame from track %d, type RTCP: %v\n", trackId, buf[:n]) + fmt.Printf("frame from track %d, type RTCP: %v\n", trackId, buf) } }(trackId, rtcpRead) } diff --git a/multibuffer.go b/multibuffer.go new file mode 100644 index 00000000..31ce13d1 --- /dev/null +++ b/multibuffer.go @@ -0,0 +1,30 @@ +package gortsplib + +// MultiBuffer implements software multi buffering, that allows to reuse +// existing buffers without creating new ones, increasing performance. +type MultiBuffer struct { + buffers [][]byte + curBuf int +} + +// NewMultiBuffer allocates a MultiBuffer. +func NewMultiBuffer(count int, size int) *MultiBuffer { + buffers := make([][]byte, count) + for i := 0; i < count; i++ { + buffers[i] = make([]byte, size) + } + + return &MultiBuffer{ + buffers: buffers, + } +} + +// Next gets the current buffer and sets the next buffer as the current one. +func (mb *MultiBuffer) Next() []byte { + ret := mb.buffers[mb.curBuf] + mb.curBuf += 1 + if mb.curBuf >= len(mb.buffers) { + mb.curBuf = 0 + } + return ret +}