add UDPReadBufferSize option (#857)

This commit is contained in:
Alessandro Ros
2025-08-11 10:49:54 +02:00
committed by GitHub
parent a560c2c24a
commit 7a5df14b2c
11 changed files with 110 additions and 12 deletions

View File

@@ -430,6 +430,10 @@ type Client struct {
// at least a packet within this timeout, otherwise it switches to TCP. // at least a packet within this timeout, otherwise it switches to TCP.
// It defaults to 3 seconds. // It defaults to 3 seconds.
InitialUDPReadTimeout time.Duration InitialUDPReadTimeout time.Duration
// Size of the UDP read buffer.
// This can be increased to reduce packet losses.
// It defaults to the operating system default value.
UDPReadBufferSize int
// Size of the queue of outgoing packets. // Size of the queue of outgoing packets.
// It defaults to 256. // It defaults to 256.
WriteQueueSize int WriteQueueSize int

View File

@@ -2,9 +2,11 @@ package gortsplib
import ( import (
"crypto/rand" "crypto/rand"
"fmt"
"math/big" "math/big"
"net" "net"
"sync/atomic" "sync/atomic"
"syscall"
"time" "time"
"github.com/bluenviron/gortsplib/v4/pkg/multicast" "github.com/bluenviron/gortsplib/v4/pkg/multicast"
@@ -25,7 +27,43 @@ func randInRange(maxVal int) (int, error) {
type packetConn interface { type packetConn interface {
net.PacketConn net.PacketConn
SetReadBuffer(int) error SyscallConn() (syscall.RawConn, error)
}
func setAndVerifyReadBufferSize(pc packetConn, v int) error {
rawConn, err := pc.SyscallConn()
if err != nil {
panic(err)
}
var err2 error
err = rawConn.Control(func(fd uintptr) {
err2 = syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_RCVBUF, v)
if err2 != nil {
return
}
var v2 int
v2, err2 = syscall.GetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_RCVBUF)
if err2 != nil {
return
}
if v2 != (v * 2) {
err2 = fmt.Errorf("unable to set read buffer size to %v - check that net.core.rmem_max is greater than %v", v, v)
return
}
})
if err != nil {
return err
}
if err2 != nil {
return err2
}
return nil
} }
type clientUDPListener struct { type clientUDPListener struct {
@@ -61,10 +99,12 @@ func (u *clientUDPListener) initialize() error {
u.pc = tmp.(*net.UDPConn) u.pc = tmp.(*net.UDPConn)
} }
err := u.pc.SetReadBuffer(udpKernelReadBufferSize) if u.c.UDPReadBufferSize != 0 {
if err != nil { err := setAndVerifyReadBufferSize(u.pc, u.c.UDPReadBufferSize)
u.pc.Close() if err != nil {
return err u.pc.Close()
return err
}
} }
u.lastPacketTime = int64Ptr(0) u.lastPacketTime = int64Ptr(0)

View File

@@ -1,9 +1,6 @@
package gortsplib package gortsplib
const ( const (
// same size as GStreamer's rtspsrc
udpKernelReadBufferSize = 0x80000
// 1500 (UDP MTU) - 20 (IP header) - 8 (UDP header) // 1500 (UDP MTU) - 20 (IP header) - 8 (UDP header)
udpMaxPayloadSize = 1472 udpMaxPayloadSize = 1472

View File

@@ -6,6 +6,7 @@ import (
"fmt" "fmt"
"net" "net"
"strconv" "strconv"
"syscall"
"time" "time"
"golang.org/x/net/ipv4" "golang.org/x/net/ipv4"
@@ -133,6 +134,11 @@ func (c *multiConn) SetReadBuffer(bytes int) error {
return c.readConn.SetReadBuffer(bytes) return c.readConn.SetReadBuffer(bytes)
} }
// SyscallConn implements Conn.
func (c *multiConn) SyscallConn() (syscall.RawConn, error) {
return c.readConn.SyscallConn()
}
// LocalAddr implements Conn. // LocalAddr implements Conn.
func (c *multiConn) LocalAddr() net.Addr { func (c *multiConn) LocalAddr() net.Addr {
return c.readConn.LocalAddr() return c.readConn.LocalAddr()

View File

@@ -187,6 +187,11 @@ func (c *multiConn) SetReadBuffer(bytes int) error {
return syscall.SetsockoptInt(int(c.readFile.Fd()), syscall.SOL_SOCKET, syscall.SO_RCVBUF, bytes) return syscall.SetsockoptInt(int(c.readFile.Fd()), syscall.SOL_SOCKET, syscall.SO_RCVBUF, bytes)
} }
// SyscallConn implements Conn.
func (c *multiConn) SyscallConn() (syscall.RawConn, error) {
return &rawConn{fd: c.readFile.Fd()}, nil
}
// LocalAddr implements Conn. // LocalAddr implements Conn.
func (c *multiConn) LocalAddr() net.Addr { func (c *multiConn) LocalAddr() net.Addr {
return c.readConn.LocalAddr() return c.readConn.LocalAddr()

View File

@@ -4,12 +4,14 @@ package multicast
import ( import (
"fmt" "fmt"
"net" "net"
"syscall"
) )
// Conn is a Multicast connection. // Conn is a Multicast connection.
type Conn interface { type Conn interface {
net.PacketConn net.PacketConn
SetReadBuffer(int) error SetReadBuffer(int) error
SyscallConn() (syscall.RawConn, error)
} }
// InterfaceForSource returns a multicast-capable interface that can communicate with given IP. // InterfaceForSource returns a multicast-capable interface that can communicate with given IP.

View File

@@ -5,6 +5,7 @@ package multicast
import ( import (
"net" "net"
"strconv" "strconv"
"syscall"
"time" "time"
"golang.org/x/net/ipv4" "golang.org/x/net/ipv4"
@@ -77,6 +78,11 @@ func (c *singleConn) SetReadBuffer(bytes int) error {
return c.conn.SetReadBuffer(bytes) return c.conn.SetReadBuffer(bytes)
} }
// SyscallConn implements Conn.
func (c *singleConn) SyscallConn() (syscall.RawConn, error) {
return c.conn.SyscallConn()
}
// LocalAddr implements Conn. // LocalAddr implements Conn.
func (c *singleConn) LocalAddr() net.Addr { func (c *singleConn) LocalAddr() net.Addr {
return c.conn.LocalAddr() return c.conn.LocalAddr()

View File

@@ -41,6 +41,26 @@ func setIPMreqInterface(mreq *syscall.IPMreq, ifi *net.Interface) error {
return fmt.Errorf("no such interface") return fmt.Errorf("no such interface")
} }
type rawConn struct {
fd uintptr
}
// Control implements syscall.RawConn.
func (c *rawConn) Control(f func(fd uintptr)) error {
f(c.fd)
return nil
}
// Read implements syscall.RawConn.
func (*rawConn) Read(_ func(fd uintptr) (done bool)) error {
panic("unimplemented")
}
// Write implements syscall.RawConn.
func (*rawConn) Write(_ func(fd uintptr) (done bool)) error {
panic("unimplemented")
}
// singleConn is a multicast connection // singleConn is a multicast connection
// that works on a single interface. // that works on a single interface.
type singleConn struct { type singleConn struct {
@@ -141,6 +161,11 @@ func (c *singleConn) SetReadBuffer(bytes int) error {
return syscall.SetsockoptInt(int(c.file.Fd()), syscall.SOL_SOCKET, syscall.SO_RCVBUF, bytes) return syscall.SetsockoptInt(int(c.file.Fd()), syscall.SOL_SOCKET, syscall.SO_RCVBUF, bytes)
} }
// SyscallConn implements Conn.
func (c *singleConn) SyscallConn() (syscall.RawConn, error) {
return &rawConn{fd: c.file.Fd()}, nil
}
// LocalAddr implements Conn. // LocalAddr implements Conn.
func (c *singleConn) LocalAddr() net.Addr { func (c *singleConn) LocalAddr() net.Addr {
return c.conn.LocalAddr() return c.conn.LocalAddr()

View File

@@ -85,6 +85,10 @@ type Server struct {
WriteTimeout time.Duration WriteTimeout time.Duration
// a TLS configuration to accept TLS (RTSPS) connections. // a TLS configuration to accept TLS (RTSPS) connections.
TLSConfig *tls.Config TLSConfig *tls.Config
// Size of the UDP read buffer.
// This can be increased to reduce packet losses.
// It defaults to the operating system default value.
UDPReadBufferSize int
// Size of the queue of outgoing packets. // Size of the queue of outgoing packets.
// It defaults to 256. // It defaults to 256.
WriteQueueSize int WriteQueueSize int
@@ -225,6 +229,7 @@ func (s *Server) Start() error {
} }
s.udpRTPListener = &serverUDPListener{ s.udpRTPListener = &serverUDPListener{
readBufferSize: s.UDPReadBufferSize,
listenPacket: s.ListenPacket, listenPacket: s.ListenPacket,
writeTimeout: s.WriteTimeout, writeTimeout: s.WriteTimeout,
multicastEnable: false, multicastEnable: false,
@@ -236,6 +241,7 @@ func (s *Server) Start() error {
} }
s.udpRTCPListener = &serverUDPListener{ s.udpRTCPListener = &serverUDPListener{
readBufferSize: s.UDPReadBufferSize,
listenPacket: s.ListenPacket, listenPacket: s.ListenPacket,
writeTimeout: s.WriteTimeout, writeTimeout: s.WriteTimeout,
multicastEnable: false, multicastEnable: false,

View File

@@ -23,6 +23,7 @@ func (h *serverMulticastWriter) initialize() error {
} }
rtpl, rtcpl, err := createUDPListenerMulticastPair( rtpl, rtcpl, err := createUDPListenerMulticastPair(
h.s.UDPReadBufferSize,
h.s.ListenPacket, h.s.ListenPacket,
h.s.WriteTimeout, h.s.WriteTimeout,
h.s.MulticastRTPPort, h.s.MulticastRTPPort,

View File

@@ -26,6 +26,7 @@ func (p *clientAddr) fill(ip net.IP, port int) {
} }
func createUDPListenerMulticastPair( func createUDPListenerMulticastPair(
readBufferSize int,
listenPacket func(network, address string) (net.PacketConn, error), listenPacket func(network, address string) (net.PacketConn, error),
writeTimeout time.Duration, writeTimeout time.Duration,
multicastRTPPort int, multicastRTPPort int,
@@ -33,6 +34,7 @@ func createUDPListenerMulticastPair(
ip net.IP, ip net.IP,
) (*serverUDPListener, *serverUDPListener, error) { ) (*serverUDPListener, *serverUDPListener, error) {
rtpl := &serverUDPListener{ rtpl := &serverUDPListener{
readBufferSize: readBufferSize,
listenPacket: listenPacket, listenPacket: listenPacket,
writeTimeout: writeTimeout, writeTimeout: writeTimeout,
multicastEnable: true, multicastEnable: true,
@@ -44,6 +46,7 @@ func createUDPListenerMulticastPair(
} }
rtcpl := &serverUDPListener{ rtcpl := &serverUDPListener{
readBufferSize: readBufferSize,
listenPacket: listenPacket, listenPacket: listenPacket,
writeTimeout: writeTimeout, writeTimeout: writeTimeout,
multicastEnable: true, multicastEnable: true,
@@ -59,6 +62,7 @@ func createUDPListenerMulticastPair(
} }
type serverUDPListener struct { type serverUDPListener struct {
readBufferSize int
listenPacket func(network, address string) (net.PacketConn, error) listenPacket func(network, address string) (net.PacketConn, error)
writeTimeout time.Duration writeTimeout time.Duration
multicastEnable bool multicastEnable bool
@@ -94,10 +98,12 @@ func (u *serverUDPListener) initialize() error {
u.listenIP = tmp.LocalAddr().(*net.UDPAddr).IP u.listenIP = tmp.LocalAddr().(*net.UDPAddr).IP
} }
err := u.pc.SetReadBuffer(udpKernelReadBufferSize) if u.readBufferSize != 0 {
if err != nil { err := setAndVerifyReadBufferSize(u.pc, u.readBufferSize)
u.pc.Close() if err != nil {
return err u.pc.Close()
return err
}
} }
u.clients = make(map[clientAddr]readFunc) u.clients = make(map[clientAddr]readFunc)