mirror of
https://github.com/aler9/gortsplib
synced 2025-10-05 15:16:51 +08:00
add and use MultiBuffer
This commit is contained in:
@@ -20,12 +20,13 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
clientReadBufferSize = 4096
|
||||
clientWriteBufferSize = 4096
|
||||
clientReceiverReportPeriod = 10 * time.Second
|
||||
clientUDPCheckStreamPeriod = 5 * time.Second
|
||||
clientUDPKeepalivePeriod = 30 * time.Second
|
||||
clientTCPReadBufferSize = 128 * 1024
|
||||
clientReadBufferSize = 4096
|
||||
clientWriteBufferSize = 4096
|
||||
clientReceiverReportPeriod = 10 * time.Second
|
||||
clientUDPCheckStreamPeriod = 5 * time.Second
|
||||
clientUDPKeepalivePeriod = 30 * time.Second
|
||||
clientTCPFrameReadBufferSize = 128 * 1024
|
||||
clientUDPFrameReadBufferSize = 2048
|
||||
)
|
||||
|
||||
// ConnClientConf allows to configure a ConnClient.
|
||||
@@ -41,6 +42,12 @@ type ConnClientConf struct {
|
||||
// It defaults to 5 seconds
|
||||
WriteTimeout time.Duration
|
||||
|
||||
// (optional) read buffer count.
|
||||
// If greater than 1, allows to pass frames to other routines than the one
|
||||
// that is reading frames.
|
||||
// It defaults to 1
|
||||
ReadBufferCount int
|
||||
|
||||
// (optional) function used to initialize the TCP client.
|
||||
// It defaults to net.DialTimeout
|
||||
DialTimeout func(network, address string, timeout time.Duration) (net.Conn, error)
|
||||
@@ -65,6 +72,7 @@ type ConnClient struct {
|
||||
udpLastFrameTimes map[int]*int64
|
||||
udpRtpListeners map[int]*connClientUDPListener
|
||||
udpRtcpListeners map[int]*connClientUDPListener
|
||||
tcpFrameReadBuf *MultiBuffer
|
||||
playing bool
|
||||
|
||||
receiverReportTerminate chan struct{}
|
||||
@@ -79,6 +87,9 @@ func NewConnClient(conf ConnClientConf) (*ConnClient, error) {
|
||||
if conf.WriteTimeout == time.Duration(0) {
|
||||
conf.WriteTimeout = 5 * time.Second
|
||||
}
|
||||
if conf.ReadBufferCount == 0 {
|
||||
conf.ReadBufferCount = 1
|
||||
}
|
||||
if conf.DialTimeout == nil {
|
||||
conf.DialTimeout = net.DialTimeout
|
||||
}
|
||||
@@ -137,18 +148,22 @@ func (c *ConnClient) NetConn() net.Conn {
|
||||
}
|
||||
|
||||
// ReadFrame reads an InterleavedFrame.
|
||||
func (c *ConnClient) ReadFrame(frame *InterleavedFrame) error {
|
||||
func (c *ConnClient) ReadFrame() (*InterleavedFrame, error) {
|
||||
c.nconn.SetReadDeadline(time.Now().Add(c.conf.ReadTimeout))
|
||||
frame := &InterleavedFrame{
|
||||
Content: c.tcpFrameReadBuf.Next(),
|
||||
}
|
||||
err := frame.Read(c.br)
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
c.rtcpReceivers[frame.TrackId].OnFrame(frame.StreamType, frame.Content)
|
||||
return nil
|
||||
|
||||
return frame, nil
|
||||
}
|
||||
|
||||
func (c *ConnClient) readFrameOrResponse(frame *InterleavedFrame) (interface{}, error) {
|
||||
func (c *ConnClient) readFrameOrResponse() (interface{}, error) {
|
||||
c.nconn.SetReadDeadline(time.Now().Add(c.conf.ReadTimeout))
|
||||
b, err := c.br.ReadByte()
|
||||
if err != nil {
|
||||
@@ -157,6 +172,9 @@ func (c *ConnClient) readFrameOrResponse(frame *InterleavedFrame) (interface{},
|
||||
c.br.UnreadByte()
|
||||
|
||||
if b == interleavedFrameMagicByte {
|
||||
frame := &InterleavedFrame{
|
||||
Content: c.tcpFrameReadBuf.Next(),
|
||||
}
|
||||
err := frame.Read(c.br)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -386,9 +404,6 @@ func (c *ConnClient) setup(u *url.URL, track *Track, ht *HeaderTransport) (*Resp
|
||||
return res, nil
|
||||
}
|
||||
|
||||
// UDPReadFunc is a function used to read UDP packets.
|
||||
type UDPReadFunc func([]byte) (int, error)
|
||||
|
||||
// SetupUDP writes a SETUP request, that means that we want to read
|
||||
// a given track with the UDP transport. It then reads a Response.
|
||||
// If rtpPort and rtcpPort are zero, they will be chosen automatically.
|
||||
@@ -595,15 +610,12 @@ func (c *ConnClient) Play(u *url.URL) (*Response, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
frame := &InterleavedFrame{
|
||||
Content: make([]byte, 0, clientTCPReadBufferSize),
|
||||
}
|
||||
c.tcpFrameReadBuf = NewMultiBuffer(c.conf.ReadBufferCount, clientTCPFrameReadBufferSize)
|
||||
|
||||
// v4lrtspserver sends frames before the response.
|
||||
// ignore them and wait for the response.
|
||||
for {
|
||||
frame.Content = frame.Content[:cap(frame.Content)]
|
||||
recv, err := c.readFrameOrResponse(frame)
|
||||
recv, err := c.readFrameOrResponse()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@@ -98,10 +98,7 @@ func TestConnClientTCP(t *testing.T) {
|
||||
_, err = conn.Play(u)
|
||||
require.NoError(t, err)
|
||||
|
||||
frame := &InterleavedFrame{Content: make([]byte, 0, 128*1024)}
|
||||
frame.Content = frame.Content[:cap(frame.Content)]
|
||||
|
||||
err = conn.ReadFrame(frame)
|
||||
_, err = conn.ReadFrame()
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
@@ -155,7 +152,6 @@ func TestConnClientUDP(t *testing.T) {
|
||||
|
||||
go conn.LoopUDP(u)
|
||||
|
||||
buf := make([]byte, 2048)
|
||||
_, err = rtpReads[0](buf)
|
||||
_, err = rtpReads[0]()
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
@@ -7,14 +7,17 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
// connClientUDPListener is a UDP listener created by SetupUDP() to receive UDP frames.
|
||||
// UDPReadFunc is a function used to read UDP packets.
|
||||
type UDPReadFunc func() ([]byte, error)
|
||||
|
||||
type connClientUDPListener struct {
|
||||
c *ConnClient
|
||||
pc net.PacketConn
|
||||
trackId int
|
||||
streamType StreamType
|
||||
publisherIp net.IP
|
||||
publisherPort int
|
||||
c *ConnClient
|
||||
pc net.PacketConn
|
||||
trackId int
|
||||
streamType StreamType
|
||||
publisherIp net.IP
|
||||
publisherPort int
|
||||
udpFrameReadBuf *MultiBuffer
|
||||
}
|
||||
|
||||
func newConnClientUDPListener(c *ConnClient, port int, trackId int, streamType StreamType) (*connClientUDPListener, error) {
|
||||
@@ -24,10 +27,11 @@ func newConnClientUDPListener(c *ConnClient, port int, trackId int, streamType S
|
||||
}
|
||||
|
||||
return &connClientUDPListener{
|
||||
c: c,
|
||||
pc: pc,
|
||||
trackId: trackId,
|
||||
streamType: streamType,
|
||||
c: c,
|
||||
pc: pc,
|
||||
trackId: trackId,
|
||||
streamType: streamType,
|
||||
udpFrameReadBuf: NewMultiBuffer(c.conf.ReadBufferCount, 2048),
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -35,11 +39,12 @@ func (l *connClientUDPListener) close() {
|
||||
l.pc.Close()
|
||||
}
|
||||
|
||||
func (l *connClientUDPListener) read(buf []byte) (int, error) {
|
||||
func (l *connClientUDPListener) read() ([]byte, error) {
|
||||
for {
|
||||
buf := l.udpFrameReadBuf.Next()
|
||||
n, addr, err := l.pc.ReadFrom(buf)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
uaddr := addr.(*net.UDPAddr)
|
||||
@@ -52,6 +57,6 @@ func (l *connClientUDPListener) read(buf []byte) (int, error) {
|
||||
|
||||
l.c.rtcpReceivers[l.trackId].OnFrame(l.streamType, buf[:n])
|
||||
|
||||
return n, nil
|
||||
return buf[:n], nil
|
||||
}
|
||||
}
|
||||
|
@@ -23,13 +23,20 @@ type ConnServerConf struct {
|
||||
// (optional) timeout of write operations.
|
||||
// It defaults to 5 seconds
|
||||
WriteTimeout time.Duration
|
||||
|
||||
// (optional) read buffer count.
|
||||
// If greater than 1, allows to pass frames to other routines than the one
|
||||
// that is reading frames.
|
||||
// It defaults to 1
|
||||
ReadBufferCount int
|
||||
}
|
||||
|
||||
// ConnServer is a server-side RTSP connection.
|
||||
type ConnServer struct {
|
||||
conf ConnServerConf
|
||||
br *bufio.Reader
|
||||
bw *bufio.Writer
|
||||
conf ConnServerConf
|
||||
br *bufio.Reader
|
||||
bw *bufio.Writer
|
||||
tcpFrameReadBuf *MultiBuffer
|
||||
}
|
||||
|
||||
// NewConnServer allocates a ConnServer.
|
||||
@@ -40,11 +47,15 @@ func NewConnServer(conf ConnServerConf) *ConnServer {
|
||||
if conf.WriteTimeout == time.Duration(0) {
|
||||
conf.WriteTimeout = 5 * time.Second
|
||||
}
|
||||
if conf.ReadBufferCount == 0 {
|
||||
conf.ReadBufferCount = 1
|
||||
}
|
||||
|
||||
return &ConnServer{
|
||||
conf: conf,
|
||||
br: bufio.NewReaderSize(conf.Conn, serverReadBufferSize),
|
||||
bw: bufio.NewWriterSize(conf.Conn, serverWriteBufferSize),
|
||||
conf: conf,
|
||||
br: bufio.NewReaderSize(conf.Conn, serverReadBufferSize),
|
||||
bw: bufio.NewWriterSize(conf.Conn, serverWriteBufferSize),
|
||||
tcpFrameReadBuf: NewMultiBuffer(conf.ReadBufferCount, clientTCPFrameReadBufferSize),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -65,7 +76,7 @@ func (s *ConnServer) ReadRequest() (*Request, error) {
|
||||
}
|
||||
|
||||
// ReadFrameOrRequest reads an InterleavedFrame or a Request.
|
||||
func (s *ConnServer) ReadFrameOrRequest(frame *InterleavedFrame, timeout bool) (interface{}, error) {
|
||||
func (s *ConnServer) ReadFrameOrRequest(timeout bool) (interface{}, error) {
|
||||
if timeout {
|
||||
s.conf.Conn.SetReadDeadline(time.Now().Add(s.conf.ReadTimeout))
|
||||
}
|
||||
@@ -77,6 +88,9 @@ func (s *ConnServer) ReadFrameOrRequest(frame *InterleavedFrame, timeout bool) (
|
||||
s.br.UnreadByte()
|
||||
|
||||
if b == interleavedFrameMagicByte {
|
||||
frame := &InterleavedFrame{
|
||||
Content: s.tcpFrameReadBuf.Next(),
|
||||
}
|
||||
err := frame.Read(s.br)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@@ -43,12 +43,8 @@ func main() {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
frame := &gortsplib.InterleavedFrame{Content: make([]byte, 0, 128*1024)}
|
||||
|
||||
for {
|
||||
frame.Content = frame.Content[:cap(frame.Content)]
|
||||
|
||||
err := conn.ReadFrame(frame)
|
||||
frame, err := conn.ReadFrame()
|
||||
if err != nil {
|
||||
conn.Close()
|
||||
fmt.Println("connection is closed (%s)", err)
|
||||
|
@@ -59,14 +59,13 @@ func main() {
|
||||
go func(trackId int, rtpRead gortsplib.UDPReadFunc) {
|
||||
defer wg.Done()
|
||||
|
||||
buf := make([]byte, 2048)
|
||||
for {
|
||||
n, err := rtpRead(buf)
|
||||
buf, err := rtpRead()
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
|
||||
fmt.Printf("frame from track %d, type RTP: %v\n", trackId, buf[:n])
|
||||
fmt.Printf("frame from track %d, type RTP: %v\n", trackId, buf)
|
||||
}
|
||||
}(trackId, rtpRead)
|
||||
}
|
||||
@@ -78,14 +77,13 @@ func main() {
|
||||
go func(trackId int, rtcpRead gortsplib.UDPReadFunc) {
|
||||
defer wg.Done()
|
||||
|
||||
buf := make([]byte, 2048)
|
||||
for {
|
||||
n, err := rtcpRead(buf)
|
||||
buf, err := rtcpRead()
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
|
||||
fmt.Printf("frame from track %d, type RTCP: %v\n", trackId, buf[:n])
|
||||
fmt.Printf("frame from track %d, type RTCP: %v\n", trackId, buf)
|
||||
}
|
||||
}(trackId, rtcpRead)
|
||||
}
|
||||
|
30
multibuffer.go
Normal file
30
multibuffer.go
Normal file
@@ -0,0 +1,30 @@
|
||||
package gortsplib
|
||||
|
||||
// MultiBuffer implements software multi buffering, that allows to reuse
|
||||
// existing buffers without creating new ones, increasing performance.
|
||||
type MultiBuffer struct {
|
||||
buffers [][]byte
|
||||
curBuf int
|
||||
}
|
||||
|
||||
// NewMultiBuffer allocates a MultiBuffer.
|
||||
func NewMultiBuffer(count int, size int) *MultiBuffer {
|
||||
buffers := make([][]byte, count)
|
||||
for i := 0; i < count; i++ {
|
||||
buffers[i] = make([]byte, size)
|
||||
}
|
||||
|
||||
return &MultiBuffer{
|
||||
buffers: buffers,
|
||||
}
|
||||
}
|
||||
|
||||
// Next gets the current buffer and sets the next buffer as the current one.
|
||||
func (mb *MultiBuffer) Next() []byte {
|
||||
ret := mb.buffers[mb.curBuf]
|
||||
mb.curBuf += 1
|
||||
if mb.curBuf >= len(mb.buffers) {
|
||||
mb.curBuf = 0
|
||||
}
|
||||
return ret
|
||||
}
|
Reference in New Issue
Block a user