support filtering interface and source IP in UDP sources (#4178) (#4464)

This commit is contained in:
Alessandro Ros
2025-04-28 22:22:55 +02:00
committed by GitHub
parent a348007607
commit a0f5315549
4 changed files with 167 additions and 57 deletions

View File

@@ -895,7 +895,23 @@ paths:
The resulting stream is available in path `/mypath`. The resulting stream is available in path `/mypath`.
Known clients that can publish with WebRTC and WHIP are [FFmpeg](#ffmpeg) and [GStreamer](#gstreamer). If the listening IP is a multicast IP, _MediaMTX_ listens for incoming multicast packets on all network interfaces. It is possible to listen on a single interface only by using the `interface` parameter:
```yml
paths:
mypath:
source: udp://238.0.0.1:1234?interface=eth0
```
It is possible to restrict who can send packets by using the `source` parameter:
```yml
paths:
mypath:
source: udp://0.0.0.0:1234?source=192.168.3.5
```
Known clients that can publish with UDP/MPEG-TS are [FFmpeg](#ffmpeg) and [GStreamer](#gstreamer).
## Read from the server ## Read from the server

View File

@@ -5,7 +5,7 @@ import (
"net" "net"
) )
// Restrict avoids listening on IPv6 when address is 0.0.0.0. // Restrict prevents listening on IPv6 when address is 0.0.0.0.
func Restrict(network string, address string) (string, string) { func Restrict(network string, address string) (string, string) {
host, _, err := net.SplitHostPort(address) host, _, err := net.SplitHostPort(address)
if err == nil { if err == nil {

View File

@@ -4,6 +4,7 @@ package udp
import ( import (
"fmt" "fmt"
"net" "net"
"net/url"
"time" "time"
"github.com/bluenviron/gortsplib/v4/pkg/description" "github.com/bluenviron/gortsplib/v4/pkg/description"
@@ -25,18 +26,20 @@ const (
) )
type packetConnReader struct { type packetConnReader struct {
net.PacketConn pc net.PacketConn
} sourceIP net.IP
func newPacketConnReader(pc net.PacketConn) *packetConnReader {
return &packetConnReader{
PacketConn: pc,
}
} }
func (r *packetConnReader) Read(p []byte) (int, error) { func (r *packetConnReader) Read(p []byte) (int, error) {
n, _, err := r.PacketConn.ReadFrom(p) for {
return n, err n, addr, err := r.pc.ReadFrom(p)
if r.sourceIP != nil && addr != nil && !addr.(*net.UDPAddr).IP.Equal(r.sourceIP) {
continue
}
return n, err
}
} }
type packetConn interface { type packetConn interface {
@@ -59,9 +62,22 @@ func (s *Source) Log(level logger.Level, format string, args ...interface{}) {
func (s *Source) Run(params defs.StaticSourceRunParams) error { func (s *Source) Run(params defs.StaticSourceRunParams) error {
s.Log(logger.Debug, "connecting") s.Log(logger.Debug, "connecting")
hostPort := params.ResolvedSource[len("udp://"):] u, err := url.Parse(params.ResolvedSource)
if err != nil {
return err
}
q := u.Query()
addr, err := net.ResolveUDPAddr("udp", hostPort) var sourceIP net.IP
if src := q.Get("source"); src != "" {
sourceIP = net.ParseIP(src)
if sourceIP == nil {
return fmt.Errorf("invalid source IP")
}
}
addr, err := net.ResolveUDPAddr("udp", u.Host)
if err != nil { if err != nil {
return err return err
} }
@@ -69,9 +85,22 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error {
var pc packetConn var pc packetConn
if ip4 := addr.IP.To4(); ip4 != nil && addr.IP.IsMulticast() { if ip4 := addr.IP.To4(); ip4 != nil && addr.IP.IsMulticast() {
pc, err = multicast.NewMultiConn(hostPort, true, net.ListenPacket) if intfName := q.Get("interface"); intfName != "" {
if err != nil { var intf *net.Interface
return err intf, err = net.InterfaceByName(intfName)
if err != nil {
return err
}
pc, err = multicast.NewSingleConn(intf, addr.String(), net.ListenPacket)
if err != nil {
return err
}
} else {
pc, err = multicast.NewMultiConn(addr.String(), true, net.ListenPacket)
if err != nil {
return err
}
} }
} else { } else {
var tmp net.PacketConn var tmp net.PacketConn
@@ -91,7 +120,7 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error {
readerErr := make(chan error) readerErr := make(chan error)
go func() { go func() {
readerErr <- s.runReader(pc) readerErr <- s.runReader(pc, sourceIP)
}() }()
select { select {
@@ -105,9 +134,10 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error {
} }
} }
func (s *Source) runReader(pc net.PacketConn) error { func (s *Source) runReader(pc net.PacketConn, sourceIP net.IP) error {
pc.SetReadDeadline(time.Now().Add(time.Duration(s.ReadTimeout))) pc.SetReadDeadline(time.Now().Add(time.Duration(s.ReadTimeout)))
r := &mcmpegts.Reader{R: mcmpegts.NewBufferedReader(newPacketConnReader(pc))} pcr := &packetConnReader{pc: pc, sourceIP: sourceIP}
r := &mcmpegts.Reader{R: mcmpegts.NewBufferedReader(pcr)}
err := r.Initialize() err := r.Initialize()
if err != nil { if err != nil {
return err return err

View File

@@ -14,46 +14,110 @@ import (
"github.com/bluenviron/mediamtx/internal/test" "github.com/bluenviron/mediamtx/internal/test"
) )
func TestSource(t *testing.T) { func multicastCapableInterface(t *testing.T) string {
te := test.NewSourceTester( intfs, err := net.Interfaces()
func(p defs.StaticSourceParent) defs.StaticSource {
return &Source{
ReadTimeout: conf.Duration(10 * time.Second),
Parent: p,
}
},
"udp://127.0.0.1:9001",
&conf.Path{},
)
defer te.Close()
time.Sleep(50 * time.Millisecond)
conn, err := net.Dial("udp", "127.0.0.1:9001")
require.NoError(t, err) require.NoError(t, err)
defer conn.Close()
track := &mpegts.Track{ for _, intf := range intfs {
Codec: &mpegts.CodecH264{}, if (intf.Flags & net.FlagMulticast) != 0 {
return intf.Name
}
} }
bw := bufio.NewWriter(conn) t.Errorf("unable to find a multicast IP")
w := &mpegts.Writer{W: bw, Tracks: []*mpegts.Track{track}} return ""
err = w.Initialize() }
require.NoError(t, err)
func TestSource(t *testing.T) {
err = w.WriteH264(track, 0, 0, [][]byte{{ // IDR for _, ca := range []string{
5, 1, "unicast",
}}) "multicast",
require.NoError(t, err) "multicast with interface",
"unicast with source",
err = w.WriteH264(track, 0, 0, [][]byte{{ // non-IDR } {
5, 2, t.Run(ca, func(t *testing.T) {
}}) var src string
require.NoError(t, err)
switch ca {
err = bw.Flush() case "unicast":
require.NoError(t, err) src = "udp://127.0.0.1:9001"
<-te.Unit case "multicast":
src = "udp://238.0.0.1:9001"
case "multicast with interface":
src = "udp://238.0.0.1:9001?interface=" + multicastCapableInterface(t)
case "unicast with source":
src = "udp://127.0.0.1:9001?source=127.0.1.1"
}
te := test.NewSourceTester(
func(p defs.StaticSourceParent) defs.StaticSource {
return &Source{
ReadTimeout: conf.Duration(10 * time.Second),
Parent: p,
}
},
src,
&conf.Path{},
)
defer te.Close()
time.Sleep(50 * time.Millisecond)
var dest string
switch ca {
case "unicast":
dest = "127.0.0.1:9001"
case "multicast":
dest = "238.0.0.1:9001"
case "multicast with interface":
dest = "238.0.0.1:9001"
case "unicast with source":
dest = "127.0.0.1:9001"
}
udest, err := net.ResolveUDPAddr("udp", dest)
require.NoError(t, err)
var usrc *net.UDPAddr
if ca == "unicast with source" {
usrc, err = net.ResolveUDPAddr("udp", "127.0.1.1:9020")
require.NoError(t, err)
}
conn, err := net.DialUDP("udp", usrc, udest)
require.NoError(t, err)
defer conn.Close() //nolint:errcheck
track := &mpegts.Track{
Codec: &mpegts.CodecH264{},
}
bw := bufio.NewWriter(conn)
w := &mpegts.Writer{W: bw, Tracks: []*mpegts.Track{track}}
err = w.Initialize()
require.NoError(t, err)
err = w.WriteH264(track, 0, 0, [][]byte{{ // IDR
5, 1,
}})
require.NoError(t, err)
err = w.WriteH264(track, 0, 0, [][]byte{{ // non-IDR
5, 2,
}})
require.NoError(t, err)
err = bw.Flush()
require.NoError(t, err)
<-te.Unit
})
}
} }