From e34b7d932695c85304d12cc1daf330b75c4728e9 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Sun, 20 Sep 2020 14:48:37 +0200 Subject: [PATCH] improve performance --- connclient.go | 12 ++++-------- connserver.go | 20 +++++++++----------- multibuffer.go | 44 +++++++++++++++++++++++++++++++++++++++----- rtcpreceiver.go | 2 +- 4 files changed, 53 insertions(+), 25 deletions(-) diff --git a/connclient.go b/connclient.go index a42b78af..e976226d 100644 --- a/connclient.go +++ b/connclient.go @@ -72,7 +72,7 @@ type ConnClient struct { udpLastFrameTimes map[int]*int64 udpRtpListeners map[int]*connClientUDPListener udpRtcpListeners map[int]*connClientUDPListener - tcpFrameReadBuf *MultiBuffer + tcpFrames *multiFrame playing bool receiverReportTerminate chan struct{} @@ -150,9 +150,7 @@ func (c *ConnClient) NetConn() net.Conn { // ReadFrame reads an InterleavedFrame. func (c *ConnClient) ReadFrame() (*InterleavedFrame, error) { c.nconn.SetReadDeadline(time.Now().Add(c.conf.ReadTimeout)) - frame := &InterleavedFrame{ - Content: c.tcpFrameReadBuf.Next(), - } + frame := c.tcpFrames.next() err := frame.Read(c.br) if err != nil { return nil, err @@ -172,9 +170,7 @@ func (c *ConnClient) readFrameOrResponse() (interface{}, error) { c.br.UnreadByte() if b == interleavedFrameMagicByte { - frame := &InterleavedFrame{ - Content: c.tcpFrameReadBuf.Next(), - } + frame := c.tcpFrames.next() err := frame.Read(c.br) if err != nil { return nil, err @@ -610,7 +606,7 @@ func (c *ConnClient) Play(u *url.URL) (*Response, error) { return nil, err } - c.tcpFrameReadBuf = NewMultiBuffer(c.conf.ReadBufferCount, clientTCPFrameReadBufferSize) + c.tcpFrames = newMultiFrame(c.conf.ReadBufferCount, clientTCPFrameReadBufferSize) // v4lrtspserver sends frames before the response. // ignore them and wait for the response. diff --git a/connserver.go b/connserver.go index 45f812df..4c985f49 100644 --- a/connserver.go +++ b/connserver.go @@ -33,10 +33,10 @@ type ConnServerConf struct { // ConnServer is a server-side RTSP connection. type ConnServer struct { - conf ConnServerConf - br *bufio.Reader - bw *bufio.Writer - tcpFrameReadBuf *MultiBuffer + conf ConnServerConf + br *bufio.Reader + bw *bufio.Writer + tcpFrames *multiFrame } // NewConnServer allocates a ConnServer. @@ -52,10 +52,10 @@ func NewConnServer(conf ConnServerConf) *ConnServer { } return &ConnServer{ - conf: conf, - br: bufio.NewReaderSize(conf.Conn, serverReadBufferSize), - bw: bufio.NewWriterSize(conf.Conn, serverWriteBufferSize), - tcpFrameReadBuf: NewMultiBuffer(conf.ReadBufferCount, clientTCPFrameReadBufferSize), + conf: conf, + br: bufio.NewReaderSize(conf.Conn, serverReadBufferSize), + bw: bufio.NewWriterSize(conf.Conn, serverWriteBufferSize), + tcpFrames: newMultiFrame(conf.ReadBufferCount, clientTCPFrameReadBufferSize), } } @@ -88,9 +88,7 @@ func (s *ConnServer) ReadFrameOrRequest(timeout bool) (interface{}, error) { s.br.UnreadByte() if b == interleavedFrameMagicByte { - frame := &InterleavedFrame{ - Content: s.tcpFrameReadBuf.Next(), - } + frame := s.tcpFrames.next() err := frame.Read(s.br) if err != nil { return nil, err diff --git a/multibuffer.go b/multibuffer.go index 31ce13d1..7a332caf 100644 --- a/multibuffer.go +++ b/multibuffer.go @@ -3,8 +3,9 @@ package gortsplib // MultiBuffer implements software multi buffering, that allows to reuse // existing buffers without creating new ones, increasing performance. type MultiBuffer struct { + count int buffers [][]byte - curBuf int + cur int } // NewMultiBuffer allocates a MultiBuffer. @@ -15,16 +16,49 @@ func NewMultiBuffer(count int, size int) *MultiBuffer { } return &MultiBuffer{ + count: count, buffers: buffers, } } // Next gets the current buffer and sets the next buffer as the current one. func (mb *MultiBuffer) Next() []byte { - ret := mb.buffers[mb.curBuf] - mb.curBuf += 1 - if mb.curBuf >= len(mb.buffers) { - mb.curBuf = 0 + ret := mb.buffers[mb.cur] + mb.cur += 1 + if mb.cur >= mb.count { + mb.cur = 0 } return ret } + +type multiFrame struct { + count int + frames []*InterleavedFrame + cur int +} + +func newMultiFrame(count int, bufsize int) *multiFrame { + frames := make([]*InterleavedFrame, count) + for i := 0; i < count; i++ { + frames[i] = &InterleavedFrame{ + Content: make([]byte, 0, bufsize), + } + } + + return &multiFrame{ + count: count, + frames: frames, + } +} + +func (mf *multiFrame) next() *InterleavedFrame { + ret := mf.frames[mf.cur] + mf.cur += 1 + if mf.cur >= mf.count { + mf.cur = 0 + } + + ret.Content = ret.Content[:cap(ret.Content)] + + return ret +} diff --git a/rtcpreceiver.go b/rtcpreceiver.go index f2333fb9..b5281d8d 100644 --- a/rtcpreceiver.go +++ b/rtcpreceiver.go @@ -56,7 +56,7 @@ func (rr *RtcpReceiver) OnFrame(streamType StreamType, buf []byte) { } else { // we can afford to unmarshal all RTCP frames - // since they are sent with a frequency much lower than the one of the RTP frames + // since they are sent with a frequency much lower than the one of RTP frames frames, err := rtcp.Unmarshal(buf) if err == nil { for _, frame := range frames {