This commit is contained in:
aler9
2020-09-20 13:27:46 +02:00
parent 0fb858afd7
commit 6f69d9dabd
5 changed files with 74 additions and 34 deletions

View File

@@ -19,12 +19,13 @@ import (
) )
const ( const (
clientReadBufferSize = 4096 clientReadBufferSize = 4096
clientWriteBufferSize = 4096 clientWriteBufferSize = 4096
clientReceiverReportPeriod = 10 * time.Second clientReceiverReportPeriod = 10 * time.Second
clientUDPCheckStreamPeriod = 5 * time.Second clientUDPCheckStreamPeriod = 5 * time.Second
clientUDPKeepalivePeriod = 30 * time.Second clientUDPKeepalivePeriod = 30 * time.Second
clientTCPReadBufferSize = 128 * 1024 clientTCPFrameReadBufferSize = 128 * 1024
clientUDPFrameReadBufferSize = 2048
) )
// ConnClientConf allows to configure a ConnClient. // ConnClientConf allows to configure a ConnClient.
@@ -47,6 +48,12 @@ type ConnClientConf struct {
// (optional) function used to initialize UDP listeners. // (optional) function used to initialize UDP listeners.
// It defaults to net.ListenPacket // It defaults to net.ListenPacket
ListenPacket func(network, address string) (net.PacketConn, error) ListenPacket func(network, address string) (net.PacketConn, error)
// (optional) read buffer count.
// If greater than 1, allows to pass frames to other routines than the one
// that is reading frames.
// It defaults to 1
ReadBufferCount int
} }
// ConnClient is a client-side RTSP connection. // ConnClient is a client-side RTSP connection.
@@ -64,6 +71,7 @@ type ConnClient struct {
udpLastFrameTimes map[int]*int64 udpLastFrameTimes map[int]*int64
udpRtpListeners map[int]*connClientUDPListener udpRtpListeners map[int]*connClientUDPListener
udpRtcpListeners map[int]*connClientUDPListener udpRtcpListeners map[int]*connClientUDPListener
tcpFrameReadBuf *MultiBuffer
receiverReportTerminate chan struct{} receiverReportTerminate chan struct{}
receiverReportDone chan struct{} receiverReportDone chan struct{}
@@ -83,6 +91,9 @@ func NewConnClient(conf ConnClientConf) (*ConnClient, error) {
if conf.ListenPacket == nil { if conf.ListenPacket == nil {
conf.ListenPacket = net.ListenPacket conf.ListenPacket = net.ListenPacket
} }
if conf.ReadBufferCount == 0 {
conf.ReadBufferCount = 1
}
nconn, err := conf.DialTimeout("tcp", conf.Host, conf.ReadTimeout) nconn, err := conf.DialTimeout("tcp", conf.Host, conf.ReadTimeout)
if err != nil { if err != nil {
@@ -98,6 +109,7 @@ func NewConnClient(conf ConnClientConf) (*ConnClient, error) {
udpLastFrameTimes: make(map[int]*int64), udpLastFrameTimes: make(map[int]*int64),
udpRtpListeners: make(map[int]*connClientUDPListener), udpRtpListeners: make(map[int]*connClientUDPListener),
udpRtcpListeners: make(map[int]*connClientUDPListener), udpRtcpListeners: make(map[int]*connClientUDPListener),
tcpFrameReadBuf: NewMultiBuffer(conf.ReadBufferCount, clientTCPFrameReadBufferSize),
}, nil }, nil
} }
@@ -135,18 +147,22 @@ func (c *ConnClient) NetConn() net.Conn {
} }
// ReadFrame reads an InterleavedFrame. // ReadFrame reads an InterleavedFrame.
func (c *ConnClient) ReadFrame(frame *InterleavedFrame) error { func (c *ConnClient) ReadFrame() (*InterleavedFrame, error) {
c.nconn.SetReadDeadline(time.Now().Add(c.conf.ReadTimeout)) c.nconn.SetReadDeadline(time.Now().Add(c.conf.ReadTimeout))
frame := &InterleavedFrame{
Content: c.tcpFrameReadBuf.Next(),
}
err := frame.Read(c.br) err := frame.Read(c.br)
if err != nil { if err != nil {
return err return nil, err
} }
c.rtcpReceivers[frame.TrackId].OnFrame(frame.StreamType, frame.Content) c.rtcpReceivers[frame.TrackId].OnFrame(frame.StreamType, frame.Content)
return nil
return frame, nil
} }
func (c *ConnClient) readFrameOrResponse(frame *InterleavedFrame) (interface{}, error) { func (c *ConnClient) readFrameOrResponse() (interface{}, error) {
c.nconn.SetReadDeadline(time.Now().Add(c.conf.ReadTimeout)) c.nconn.SetReadDeadline(time.Now().Add(c.conf.ReadTimeout))
b, err := c.br.ReadByte() b, err := c.br.ReadByte()
if err != nil { if err != nil {
@@ -155,6 +171,9 @@ func (c *ConnClient) readFrameOrResponse(frame *InterleavedFrame) (interface{},
c.br.UnreadByte() c.br.UnreadByte()
if b == interleavedFrameMagicByte { if b == interleavedFrameMagicByte {
frame := &InterleavedFrame{
Content: c.tcpFrameReadBuf.Next(),
}
err := frame.Read(c.br) err := frame.Read(c.br)
if err != nil { if err != nil {
return nil, err return nil, err
@@ -376,9 +395,6 @@ func (c *ConnClient) setup(u *url.URL, track *Track, ht *HeaderTransport) (*Resp
return res, nil return res, nil
} }
// UDPReadFunc is a function used to read UDP packets.
type UDPReadFunc func([]byte) (int, error)
// SetupUDP writes a SETUP request, that means that we want to read // SetupUDP writes a SETUP request, that means that we want to read
// a given track with the UDP transport. It then reads a Response. // a given track with the UDP transport. It then reads a Response.
func (c *ConnClient) SetupUDP(u *url.URL, track *Track, rtpPort int, func (c *ConnClient) SetupUDP(u *url.URL, track *Track, rtpPort int,
@@ -533,15 +549,10 @@ func (c *ConnClient) Play(u *url.URL) (*Response, error) {
return nil, err return nil, err
} }
frame := &InterleavedFrame{
Content: make([]byte, 0, clientTCPReadBufferSize),
}
// v4lrtspserver sends frames before the response. // v4lrtspserver sends frames before the response.
// ignore them and wait for the response. // ignore them and wait for the response.
for { for {
frame.Content = frame.Content[:cap(frame.Content)] recv, err := c.readFrameOrResponse()
recv, err := c.readFrameOrResponse(frame)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@@ -7,7 +7,9 @@ import (
"time" "time"
) )
// connClientUDPListener is a UDP listener created by SetupUDP() to receive UDP frames. // UDPReadFunc is a function used to read UDP packets.
type UDPReadFunc func() ([]byte, error)
type connClientUDPListener struct { type connClientUDPListener struct {
c *ConnClient c *ConnClient
pc net.PacketConn pc net.PacketConn
@@ -15,6 +17,7 @@ type connClientUDPListener struct {
streamType StreamType streamType StreamType
publisherIp net.IP publisherIp net.IP
publisherPort int publisherPort int
udpFrameReadBuf *MultiBuffer
} }
func newConnClientUDPListener(c *ConnClient, port int, trackId int, streamType StreamType) (*connClientUDPListener, error) { func newConnClientUDPListener(c *ConnClient, port int, trackId int, streamType StreamType) (*connClientUDPListener, error) {
@@ -28,6 +31,7 @@ func newConnClientUDPListener(c *ConnClient, port int, trackId int, streamType S
pc: pc, pc: pc,
trackId: trackId, trackId: trackId,
streamType: streamType, streamType: streamType,
udpFrameReadBuf: NewMultiBuffer(c.conf.ReadBufferCount, 2048),
}, nil }, nil
} }
@@ -35,11 +39,12 @@ func (l *connClientUDPListener) close() {
l.pc.Close() l.pc.Close()
} }
func (l *connClientUDPListener) read(buf []byte) (int, error) { func (l *connClientUDPListener) read() ([]byte, error) {
for { for {
buf := l.udpFrameReadBuf.Next()
n, addr, err := l.pc.ReadFrom(buf) n, addr, err := l.pc.ReadFrom(buf)
if err != nil { if err != nil {
return 0, err return nil, err
} }
uaddr := addr.(*net.UDPAddr) uaddr := addr.(*net.UDPAddr)
@@ -52,6 +57,6 @@ func (l *connClientUDPListener) read(buf []byte) (int, error) {
l.c.rtcpReceivers[l.trackId].OnFrame(l.streamType, buf[:n]) l.c.rtcpReceivers[l.trackId].OnFrame(l.streamType, buf[:n])
return n, nil return buf[:n], nil
} }
} }

View File

@@ -43,12 +43,8 @@ func main() {
panic(err) panic(err)
} }
frame := &gortsplib.InterleavedFrame{Content: make([]byte, 0, 128*1024)}
for { for {
frame.Content = frame.Content[:cap(frame.Content)] frame, err := conn.ReadFrame()
err := conn.ReadFrame(frame)
if err != nil { if err != nil {
conn.Close() conn.Close()
fmt.Println("connection is closed (%s)", err) fmt.Println("connection is closed (%s)", err)

View File

@@ -59,14 +59,13 @@ func main() {
go func(trackId int, rtpRead gortsplib.UDPReadFunc) { go func(trackId int, rtpRead gortsplib.UDPReadFunc) {
defer wg.Done() defer wg.Done()
buf := make([]byte, 2048)
for { for {
n, err := rtpRead(buf) buf, err := rtpRead()
if err != nil { if err != nil {
break break
} }
fmt.Printf("frame from track %d, type RTP: %v\n", trackId, buf[:n]) fmt.Printf("frame from track %d, type RTP: %v\n", trackId, buf)
} }
}(trackId, rtpRead) }(trackId, rtpRead)
} }
@@ -78,14 +77,13 @@ func main() {
go func(trackId int, rtcpRead gortsplib.UDPReadFunc) { go func(trackId int, rtcpRead gortsplib.UDPReadFunc) {
defer wg.Done() defer wg.Done()
buf := make([]byte, 2048)
for { for {
n, err := rtcpRead(buf) buf, err := rtcpRead()
if err != nil { if err != nil {
break break
} }
fmt.Printf("frame from track %d, type RTCP: %v\n", trackId, buf[:n]) fmt.Printf("frame from track %d, type RTCP: %v\n", trackId, buf)
} }
}(trackId, rtcpRead) }(trackId, rtcpRead)
} }

30
multibuffer.go Normal file
View File

@@ -0,0 +1,30 @@
package gortsplib
// MultiBuffer implements software multi buffering, that allows to reuse
// existing buffers without creating new ones, increasing performance.
type MultiBuffer struct {
buffers [][]byte
curBuf int
}
// NewMultiBuffer allocates a MultiBuffer.
func NewMultiBuffer(count int, size int) *MultiBuffer {
buffers := make([][]byte, count)
for i := 0; i < count; i++ {
buffers[i] = make([]byte, size)
}
return &MultiBuffer{
buffers: buffers,
}
}
// Next gets the current buffer and sets the next buffer as the current one.
func (mb *MultiBuffer) Next() []byte {
ret := mb.buffers[mb.curBuf]
mb.curBuf += 1
if mb.curBuf >= len(mb.buffers) {
mb.curBuf = 0
}
return ret
}