diff --git a/client.go b/client.go index ae605217..d75c6839 100644 --- a/client.go +++ b/client.go @@ -430,6 +430,10 @@ type Client struct { // at least a packet within this timeout, otherwise it switches to TCP. // It defaults to 3 seconds. InitialUDPReadTimeout time.Duration + // Size of the UDP read buffer. + // This can be increased to reduce packet losses. + // It defaults to the operating system default value. + UDPReadBufferSize int // Size of the queue of outgoing packets. // It defaults to 256. WriteQueueSize int diff --git a/client_udp_listener.go b/client_udp_listener.go index c28bc138..c5faf19c 100644 --- a/client_udp_listener.go +++ b/client_udp_listener.go @@ -2,9 +2,11 @@ package gortsplib import ( "crypto/rand" + "fmt" "math/big" "net" "sync/atomic" + "syscall" "time" "github.com/bluenviron/gortsplib/v4/pkg/multicast" @@ -25,7 +27,43 @@ func randInRange(maxVal int) (int, error) { type packetConn interface { net.PacketConn - SetReadBuffer(int) error + SyscallConn() (syscall.RawConn, error) +} + +func setAndVerifyReadBufferSize(pc packetConn, v int) error { + rawConn, err := pc.SyscallConn() + if err != nil { + panic(err) + } + + var err2 error + + err = rawConn.Control(func(fd uintptr) { + err2 = syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_RCVBUF, v) + if err2 != nil { + return + } + + var v2 int + v2, err2 = syscall.GetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_RCVBUF) + if err2 != nil { + return + } + + if v2 != (v * 2) { + err2 = fmt.Errorf("unable to set read buffer size to %v - check that net.core.rmem_max is greater than %v", v, v) + return + } + }) + if err != nil { + return err + } + + if err2 != nil { + return err2 + } + + return nil } type clientUDPListener struct { @@ -61,10 +99,12 @@ func (u *clientUDPListener) initialize() error { u.pc = tmp.(*net.UDPConn) } - err := u.pc.SetReadBuffer(udpKernelReadBufferSize) - if err != nil { - u.pc.Close() - return err + if u.c.UDPReadBufferSize != 0 { + err := setAndVerifyReadBufferSize(u.pc, u.c.UDPReadBufferSize) + if err != nil { + u.pc.Close() + return err + } } u.lastPacketTime = int64Ptr(0) diff --git a/constants.go b/constants.go index 44165676..6d460154 100644 --- a/constants.go +++ b/constants.go @@ -1,9 +1,6 @@ package gortsplib const ( - // same size as GStreamer's rtspsrc - udpKernelReadBufferSize = 0x80000 - // 1500 (UDP MTU) - 20 (IP header) - 8 (UDP header) udpMaxPayloadSize = 1472 diff --git a/pkg/multicast/multi_conn.go b/pkg/multicast/multi_conn.go index f7e43db1..23547a11 100644 --- a/pkg/multicast/multi_conn.go +++ b/pkg/multicast/multi_conn.go @@ -6,6 +6,7 @@ import ( "fmt" "net" "strconv" + "syscall" "time" "golang.org/x/net/ipv4" @@ -133,6 +134,11 @@ func (c *multiConn) SetReadBuffer(bytes int) error { return c.readConn.SetReadBuffer(bytes) } +// SyscallConn implements Conn. +func (c *multiConn) SyscallConn() (syscall.RawConn, error) { + return c.readConn.SyscallConn() +} + // LocalAddr implements Conn. func (c *multiConn) LocalAddr() net.Addr { return c.readConn.LocalAddr() diff --git a/pkg/multicast/multi_conn_lin.go b/pkg/multicast/multi_conn_lin.go index e27d382b..1529d56b 100644 --- a/pkg/multicast/multi_conn_lin.go +++ b/pkg/multicast/multi_conn_lin.go @@ -187,6 +187,11 @@ func (c *multiConn) SetReadBuffer(bytes int) error { return syscall.SetsockoptInt(int(c.readFile.Fd()), syscall.SOL_SOCKET, syscall.SO_RCVBUF, bytes) } +// SyscallConn implements Conn. +func (c *multiConn) SyscallConn() (syscall.RawConn, error) { + return &rawConn{fd: c.readFile.Fd()}, nil +} + // LocalAddr implements Conn. func (c *multiConn) LocalAddr() net.Addr { return c.readConn.LocalAddr() diff --git a/pkg/multicast/multicast.go b/pkg/multicast/multicast.go index a332ae05..be1fc1f8 100644 --- a/pkg/multicast/multicast.go +++ b/pkg/multicast/multicast.go @@ -4,12 +4,14 @@ package multicast import ( "fmt" "net" + "syscall" ) // Conn is a Multicast connection. type Conn interface { net.PacketConn SetReadBuffer(int) error + SyscallConn() (syscall.RawConn, error) } // InterfaceForSource returns a multicast-capable interface that can communicate with given IP. diff --git a/pkg/multicast/single_conn.go b/pkg/multicast/single_conn.go index f22ce157..242fb6c5 100644 --- a/pkg/multicast/single_conn.go +++ b/pkg/multicast/single_conn.go @@ -5,6 +5,7 @@ package multicast import ( "net" "strconv" + "syscall" "time" "golang.org/x/net/ipv4" @@ -77,6 +78,11 @@ func (c *singleConn) SetReadBuffer(bytes int) error { return c.conn.SetReadBuffer(bytes) } +// SyscallConn implements Conn. +func (c *singleConn) SyscallConn() (syscall.RawConn, error) { + return c.conn.SyscallConn() +} + // LocalAddr implements Conn. func (c *singleConn) LocalAddr() net.Addr { return c.conn.LocalAddr() diff --git a/pkg/multicast/single_conn_lin.go b/pkg/multicast/single_conn_lin.go index 5aa00ff0..6c338a6e 100644 --- a/pkg/multicast/single_conn_lin.go +++ b/pkg/multicast/single_conn_lin.go @@ -41,6 +41,26 @@ func setIPMreqInterface(mreq *syscall.IPMreq, ifi *net.Interface) error { return fmt.Errorf("no such interface") } +type rawConn struct { + fd uintptr +} + +// Control implements syscall.RawConn. +func (c *rawConn) Control(f func(fd uintptr)) error { + f(c.fd) + return nil +} + +// Read implements syscall.RawConn. +func (*rawConn) Read(_ func(fd uintptr) (done bool)) error { + panic("unimplemented") +} + +// Write implements syscall.RawConn. +func (*rawConn) Write(_ func(fd uintptr) (done bool)) error { + panic("unimplemented") +} + // singleConn is a multicast connection // that works on a single interface. type singleConn struct { @@ -141,6 +161,11 @@ func (c *singleConn) SetReadBuffer(bytes int) error { return syscall.SetsockoptInt(int(c.file.Fd()), syscall.SOL_SOCKET, syscall.SO_RCVBUF, bytes) } +// SyscallConn implements Conn. +func (c *singleConn) SyscallConn() (syscall.RawConn, error) { + return &rawConn{fd: c.file.Fd()}, nil +} + // LocalAddr implements Conn. func (c *singleConn) LocalAddr() net.Addr { return c.conn.LocalAddr() diff --git a/server.go b/server.go index d739856c..da7c2c7b 100644 --- a/server.go +++ b/server.go @@ -85,6 +85,10 @@ type Server struct { WriteTimeout time.Duration // a TLS configuration to accept TLS (RTSPS) connections. TLSConfig *tls.Config + // Size of the UDP read buffer. + // This can be increased to reduce packet losses. + // It defaults to the operating system default value. + UDPReadBufferSize int // Size of the queue of outgoing packets. // It defaults to 256. WriteQueueSize int @@ -225,6 +229,7 @@ func (s *Server) Start() error { } s.udpRTPListener = &serverUDPListener{ + readBufferSize: s.UDPReadBufferSize, listenPacket: s.ListenPacket, writeTimeout: s.WriteTimeout, multicastEnable: false, @@ -236,6 +241,7 @@ func (s *Server) Start() error { } s.udpRTCPListener = &serverUDPListener{ + readBufferSize: s.UDPReadBufferSize, listenPacket: s.ListenPacket, writeTimeout: s.WriteTimeout, multicastEnable: false, diff --git a/server_multicast_writer.go b/server_multicast_writer.go index e8bb1d8c..52645aa3 100644 --- a/server_multicast_writer.go +++ b/server_multicast_writer.go @@ -23,6 +23,7 @@ func (h *serverMulticastWriter) initialize() error { } rtpl, rtcpl, err := createUDPListenerMulticastPair( + h.s.UDPReadBufferSize, h.s.ListenPacket, h.s.WriteTimeout, h.s.MulticastRTPPort, diff --git a/server_udp_listener.go b/server_udp_listener.go index 96dd756b..b26b2e1d 100644 --- a/server_udp_listener.go +++ b/server_udp_listener.go @@ -26,6 +26,7 @@ func (p *clientAddr) fill(ip net.IP, port int) { } func createUDPListenerMulticastPair( + readBufferSize int, listenPacket func(network, address string) (net.PacketConn, error), writeTimeout time.Duration, multicastRTPPort int, @@ -33,6 +34,7 @@ func createUDPListenerMulticastPair( ip net.IP, ) (*serverUDPListener, *serverUDPListener, error) { rtpl := &serverUDPListener{ + readBufferSize: readBufferSize, listenPacket: listenPacket, writeTimeout: writeTimeout, multicastEnable: true, @@ -44,6 +46,7 @@ func createUDPListenerMulticastPair( } rtcpl := &serverUDPListener{ + readBufferSize: readBufferSize, listenPacket: listenPacket, writeTimeout: writeTimeout, multicastEnable: true, @@ -59,6 +62,7 @@ func createUDPListenerMulticastPair( } type serverUDPListener struct { + readBufferSize int listenPacket func(network, address string) (net.PacketConn, error) writeTimeout time.Duration multicastEnable bool @@ -94,10 +98,12 @@ func (u *serverUDPListener) initialize() error { u.listenIP = tmp.LocalAddr().(*net.UDPAddr).IP } - err := u.pc.SetReadBuffer(udpKernelReadBufferSize) - if err != nil { - u.pc.Close() - return err + if u.readBufferSize != 0 { + err := setAndVerifyReadBufferSize(u.pc, u.readBufferSize) + if err != nil { + u.pc.Close() + return err + } } u.clients = make(map[clientAddr]readFunc)