From 6f69d9dabd44eacdf190d70784c496568f4dc966 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Sun, 20 Sep 2020 13:27:46 +0200 Subject: [PATCH] temp --- connclient.go | 49 ++++++++++++++++++++++++++---------------- connclientudpl.go | 13 +++++++---- examples/client-tcp.go | 6 +----- examples/client-udp.go | 10 ++++----- multibuffer.go | 30 ++++++++++++++++++++++++++ 5 files changed, 74 insertions(+), 34 deletions(-) create mode 100644 multibuffer.go diff --git a/connclient.go b/connclient.go index 248d17d7..b733b5c7 100644 --- a/connclient.go +++ b/connclient.go @@ -19,12 +19,13 @@ import ( ) const ( - clientReadBufferSize = 4096 - clientWriteBufferSize = 4096 - clientReceiverReportPeriod = 10 * time.Second - clientUDPCheckStreamPeriod = 5 * time.Second - clientUDPKeepalivePeriod = 30 * time.Second - clientTCPReadBufferSize = 128 * 1024 + clientReadBufferSize = 4096 + clientWriteBufferSize = 4096 + clientReceiverReportPeriod = 10 * time.Second + clientUDPCheckStreamPeriod = 5 * time.Second + clientUDPKeepalivePeriod = 30 * time.Second + clientTCPFrameReadBufferSize = 128 * 1024 + clientUDPFrameReadBufferSize = 2048 ) // ConnClientConf allows to configure a ConnClient. @@ -47,6 +48,12 @@ type ConnClientConf struct { // (optional) function used to initialize UDP listeners. // It defaults to net.ListenPacket ListenPacket func(network, address string) (net.PacketConn, error) + + // (optional) read buffer count. + // If greater than 1, allows to pass frames to other routines than the one + // that is reading frames. + // It defaults to 1 + ReadBufferCount int } // ConnClient is a client-side RTSP connection. @@ -64,6 +71,7 @@ type ConnClient struct { udpLastFrameTimes map[int]*int64 udpRtpListeners map[int]*connClientUDPListener udpRtcpListeners map[int]*connClientUDPListener + tcpFrameReadBuf *MultiBuffer receiverReportTerminate chan struct{} receiverReportDone chan struct{} @@ -83,6 +91,9 @@ func NewConnClient(conf ConnClientConf) (*ConnClient, error) { if conf.ListenPacket == nil { conf.ListenPacket = net.ListenPacket } + if conf.ReadBufferCount == 0 { + conf.ReadBufferCount = 1 + } nconn, err := conf.DialTimeout("tcp", conf.Host, conf.ReadTimeout) if err != nil { @@ -98,6 +109,7 @@ func NewConnClient(conf ConnClientConf) (*ConnClient, error) { udpLastFrameTimes: make(map[int]*int64), udpRtpListeners: make(map[int]*connClientUDPListener), udpRtcpListeners: make(map[int]*connClientUDPListener), + tcpFrameReadBuf: NewMultiBuffer(conf.ReadBufferCount, clientTCPFrameReadBufferSize), }, nil } @@ -135,18 +147,22 @@ func (c *ConnClient) NetConn() net.Conn { } // ReadFrame reads an InterleavedFrame. -func (c *ConnClient) ReadFrame(frame *InterleavedFrame) error { +func (c *ConnClient) ReadFrame() (*InterleavedFrame, error) { c.nconn.SetReadDeadline(time.Now().Add(c.conf.ReadTimeout)) + frame := &InterleavedFrame{ + Content: c.tcpFrameReadBuf.Next(), + } err := frame.Read(c.br) if err != nil { - return err + return nil, err } c.rtcpReceivers[frame.TrackId].OnFrame(frame.StreamType, frame.Content) - return nil + + return frame, nil } -func (c *ConnClient) readFrameOrResponse(frame *InterleavedFrame) (interface{}, error) { +func (c *ConnClient) readFrameOrResponse() (interface{}, error) { c.nconn.SetReadDeadline(time.Now().Add(c.conf.ReadTimeout)) b, err := c.br.ReadByte() if err != nil { @@ -155,6 +171,9 @@ func (c *ConnClient) readFrameOrResponse(frame *InterleavedFrame) (interface{}, c.br.UnreadByte() if b == interleavedFrameMagicByte { + frame := &InterleavedFrame{ + Content: c.tcpFrameReadBuf.Next(), + } err := frame.Read(c.br) if err != nil { return nil, err @@ -376,9 +395,6 @@ func (c *ConnClient) setup(u *url.URL, track *Track, ht *HeaderTransport) (*Resp return res, nil } -// UDPReadFunc is a function used to read UDP packets. -type UDPReadFunc func([]byte) (int, error) - // SetupUDP writes a SETUP request, that means that we want to read // a given track with the UDP transport. It then reads a Response. func (c *ConnClient) SetupUDP(u *url.URL, track *Track, rtpPort int, @@ -533,15 +549,10 @@ func (c *ConnClient) Play(u *url.URL) (*Response, error) { return nil, err } - frame := &InterleavedFrame{ - Content: make([]byte, 0, clientTCPReadBufferSize), - } - // v4lrtspserver sends frames before the response. // ignore them and wait for the response. for { - frame.Content = frame.Content[:cap(frame.Content)] - recv, err := c.readFrameOrResponse(frame) + recv, err := c.readFrameOrResponse() if err != nil { return nil, err } diff --git a/connclientudpl.go b/connclientudpl.go index 6dc9b47f..b9e8570f 100644 --- a/connclientudpl.go +++ b/connclientudpl.go @@ -7,7 +7,9 @@ import ( "time" ) -// connClientUDPListener is a UDP listener created by SetupUDP() to receive UDP frames. +// UDPReadFunc is a function used to read UDP packets. +type UDPReadFunc func() ([]byte, error) + type connClientUDPListener struct { c *ConnClient pc net.PacketConn @@ -15,6 +17,7 @@ type connClientUDPListener struct { streamType StreamType publisherIp net.IP publisherPort int + udpFrameReadBuf *MultiBuffer } func newConnClientUDPListener(c *ConnClient, port int, trackId int, streamType StreamType) (*connClientUDPListener, error) { @@ -28,6 +31,7 @@ func newConnClientUDPListener(c *ConnClient, port int, trackId int, streamType S pc: pc, trackId: trackId, streamType: streamType, + udpFrameReadBuf: NewMultiBuffer(c.conf.ReadBufferCount, 2048), }, nil } @@ -35,11 +39,12 @@ func (l *connClientUDPListener) close() { l.pc.Close() } -func (l *connClientUDPListener) read(buf []byte) (int, error) { +func (l *connClientUDPListener) read() ([]byte, error) { for { + buf := l.udpFrameReadBuf.Next() n, addr, err := l.pc.ReadFrom(buf) if err != nil { - return 0, err + return nil, err } uaddr := addr.(*net.UDPAddr) @@ -52,6 +57,6 @@ func (l *connClientUDPListener) read(buf []byte) (int, error) { l.c.rtcpReceivers[l.trackId].OnFrame(l.streamType, buf[:n]) - return n, nil + return buf[:n], nil } } diff --git a/examples/client-tcp.go b/examples/client-tcp.go index 9196c5d9..5e22fc2e 100644 --- a/examples/client-tcp.go +++ b/examples/client-tcp.go @@ -43,12 +43,8 @@ func main() { panic(err) } - frame := &gortsplib.InterleavedFrame{Content: make([]byte, 0, 128*1024)} - for { - frame.Content = frame.Content[:cap(frame.Content)] - - err := conn.ReadFrame(frame) + frame, err := conn.ReadFrame() if err != nil { conn.Close() fmt.Println("connection is closed (%s)", err) diff --git a/examples/client-udp.go b/examples/client-udp.go index 673c53cb..bdbd28ff 100644 --- a/examples/client-udp.go +++ b/examples/client-udp.go @@ -59,14 +59,13 @@ func main() { go func(trackId int, rtpRead gortsplib.UDPReadFunc) { defer wg.Done() - buf := make([]byte, 2048) for { - n, err := rtpRead(buf) + buf, err := rtpRead() if err != nil { break } - fmt.Printf("frame from track %d, type RTP: %v\n", trackId, buf[:n]) + fmt.Printf("frame from track %d, type RTP: %v\n", trackId, buf) } }(trackId, rtpRead) } @@ -78,14 +77,13 @@ func main() { go func(trackId int, rtcpRead gortsplib.UDPReadFunc) { defer wg.Done() - buf := make([]byte, 2048) for { - n, err := rtcpRead(buf) + buf, err := rtcpRead() if err != nil { break } - fmt.Printf("frame from track %d, type RTCP: %v\n", trackId, buf[:n]) + fmt.Printf("frame from track %d, type RTCP: %v\n", trackId, buf) } }(trackId, rtcpRead) } diff --git a/multibuffer.go b/multibuffer.go new file mode 100644 index 00000000..31ce13d1 --- /dev/null +++ b/multibuffer.go @@ -0,0 +1,30 @@ +package gortsplib + +// MultiBuffer implements software multi buffering, that allows to reuse +// existing buffers without creating new ones, increasing performance. +type MultiBuffer struct { + buffers [][]byte + curBuf int +} + +// NewMultiBuffer allocates a MultiBuffer. +func NewMultiBuffer(count int, size int) *MultiBuffer { + buffers := make([][]byte, count) + for i := 0; i < count; i++ { + buffers[i] = make([]byte, size) + } + + return &MultiBuffer{ + buffers: buffers, + } +} + +// Next gets the current buffer and sets the next buffer as the current one. +func (mb *MultiBuffer) Next() []byte { + ret := mb.buffers[mb.curBuf] + mb.curBuf += 1 + if mb.curBuf >= len(mb.buffers) { + mb.curBuf = 0 + } + return ret +}