diff --git a/README.md b/README.md index 704b9c78..420c38c6 100644 --- a/README.md +++ b/README.md @@ -895,7 +895,23 @@ paths: The resulting stream is available in path `/mypath`. -Known clients that can publish with WebRTC and WHIP are [FFmpeg](#ffmpeg) and [GStreamer](#gstreamer). +If the listening IP is a multicast IP, _MediaMTX_ listens for incoming multicast packets on all network interfaces. It is possible to listen on a single interface only by using the `interface` parameter: + +```yml +paths: + mypath: + source: udp://238.0.0.1:1234?interface=eth0 +``` + +It is possible to restrict who can send packets by using the `source` parameter: + +```yml +paths: + mypath: + source: udp://0.0.0.0:1234?source=192.168.3.5 +``` + +Known clients that can publish with UDP/MPEG-TS are [FFmpeg](#ffmpeg) and [GStreamer](#gstreamer). ## Read from the server diff --git a/internal/restrictnetwork/restrict_network.go b/internal/restrictnetwork/restrict_network.go index 9c5cdef7..426c52c1 100644 --- a/internal/restrictnetwork/restrict_network.go +++ b/internal/restrictnetwork/restrict_network.go @@ -5,7 +5,7 @@ import ( "net" ) -// Restrict avoids listening on IPv6 when address is 0.0.0.0. +// Restrict prevents listening on IPv6 when address is 0.0.0.0. func Restrict(network string, address string) (string, string) { host, _, err := net.SplitHostPort(address) if err == nil { diff --git a/internal/staticsources/udp/source.go b/internal/staticsources/udp/source.go index 4c36705c..2cf9f4ba 100644 --- a/internal/staticsources/udp/source.go +++ b/internal/staticsources/udp/source.go @@ -4,6 +4,7 @@ package udp import ( "fmt" "net" + "net/url" "time" "github.com/bluenviron/gortsplib/v4/pkg/description" @@ -25,18 +26,20 @@ const ( ) type packetConnReader struct { - net.PacketConn -} - -func newPacketConnReader(pc net.PacketConn) *packetConnReader { - return &packetConnReader{ - PacketConn: pc, - } + pc net.PacketConn + sourceIP net.IP } func (r *packetConnReader) Read(p []byte) (int, error) { - n, _, err := r.PacketConn.ReadFrom(p) - return n, err + for { + n, addr, err := r.pc.ReadFrom(p) + + if r.sourceIP != nil && addr != nil && !addr.(*net.UDPAddr).IP.Equal(r.sourceIP) { + continue + } + + return n, err + } } type packetConn interface { @@ -59,9 +62,22 @@ func (s *Source) Log(level logger.Level, format string, args ...interface{}) { func (s *Source) Run(params defs.StaticSourceRunParams) error { s.Log(logger.Debug, "connecting") - hostPort := params.ResolvedSource[len("udp://"):] + u, err := url.Parse(params.ResolvedSource) + if err != nil { + return err + } + q := u.Query() - addr, err := net.ResolveUDPAddr("udp", hostPort) + var sourceIP net.IP + + if src := q.Get("source"); src != "" { + sourceIP = net.ParseIP(src) + if sourceIP == nil { + return fmt.Errorf("invalid source IP") + } + } + + addr, err := net.ResolveUDPAddr("udp", u.Host) if err != nil { return err } @@ -69,9 +85,22 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error { var pc packetConn if ip4 := addr.IP.To4(); ip4 != nil && addr.IP.IsMulticast() { - pc, err = multicast.NewMultiConn(hostPort, true, net.ListenPacket) - if err != nil { - return err + if intfName := q.Get("interface"); intfName != "" { + var intf *net.Interface + intf, err = net.InterfaceByName(intfName) + if err != nil { + return err + } + + pc, err = multicast.NewSingleConn(intf, addr.String(), net.ListenPacket) + if err != nil { + return err + } + } else { + pc, err = multicast.NewMultiConn(addr.String(), true, net.ListenPacket) + if err != nil { + return err + } } } else { var tmp net.PacketConn @@ -91,7 +120,7 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error { readerErr := make(chan error) go func() { - readerErr <- s.runReader(pc) + readerErr <- s.runReader(pc, sourceIP) }() select { @@ -105,9 +134,10 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error { } } -func (s *Source) runReader(pc net.PacketConn) error { +func (s *Source) runReader(pc net.PacketConn, sourceIP net.IP) error { pc.SetReadDeadline(time.Now().Add(time.Duration(s.ReadTimeout))) - r := &mcmpegts.Reader{R: mcmpegts.NewBufferedReader(newPacketConnReader(pc))} + pcr := &packetConnReader{pc: pc, sourceIP: sourceIP} + r := &mcmpegts.Reader{R: mcmpegts.NewBufferedReader(pcr)} err := r.Initialize() if err != nil { return err diff --git a/internal/staticsources/udp/source_test.go b/internal/staticsources/udp/source_test.go index 37d16ae2..5e9b368a 100644 --- a/internal/staticsources/udp/source_test.go +++ b/internal/staticsources/udp/source_test.go @@ -14,46 +14,110 @@ import ( "github.com/bluenviron/mediamtx/internal/test" ) -func TestSource(t *testing.T) { - te := test.NewSourceTester( - func(p defs.StaticSourceParent) defs.StaticSource { - return &Source{ - ReadTimeout: conf.Duration(10 * time.Second), - Parent: p, - } - }, - "udp://127.0.0.1:9001", - &conf.Path{}, - ) - defer te.Close() - - time.Sleep(50 * time.Millisecond) - - conn, err := net.Dial("udp", "127.0.0.1:9001") +func multicastCapableInterface(t *testing.T) string { + intfs, err := net.Interfaces() require.NoError(t, err) - defer conn.Close() - track := &mpegts.Track{ - Codec: &mpegts.CodecH264{}, + for _, intf := range intfs { + if (intf.Flags & net.FlagMulticast) != 0 { + return intf.Name + } } - bw := bufio.NewWriter(conn) - w := &mpegts.Writer{W: bw, Tracks: []*mpegts.Track{track}} - err = w.Initialize() - require.NoError(t, err) - - err = w.WriteH264(track, 0, 0, [][]byte{{ // IDR - 5, 1, - }}) - require.NoError(t, err) - - err = w.WriteH264(track, 0, 0, [][]byte{{ // non-IDR - 5, 2, - }}) - require.NoError(t, err) - - err = bw.Flush() - require.NoError(t, err) - - <-te.Unit + t.Errorf("unable to find a multicast IP") + return "" +} + +func TestSource(t *testing.T) { + for _, ca := range []string{ + "unicast", + "multicast", + "multicast with interface", + "unicast with source", + } { + t.Run(ca, func(t *testing.T) { + var src string + + switch ca { + case "unicast": + src = "udp://127.0.0.1:9001" + + case "multicast": + src = "udp://238.0.0.1:9001" + + case "multicast with interface": + src = "udp://238.0.0.1:9001?interface=" + multicastCapableInterface(t) + + case "unicast with source": + src = "udp://127.0.0.1:9001?source=127.0.1.1" + } + + te := test.NewSourceTester( + func(p defs.StaticSourceParent) defs.StaticSource { + return &Source{ + ReadTimeout: conf.Duration(10 * time.Second), + Parent: p, + } + }, + src, + &conf.Path{}, + ) + defer te.Close() + + time.Sleep(50 * time.Millisecond) + + var dest string + + switch ca { + case "unicast": + dest = "127.0.0.1:9001" + + case "multicast": + dest = "238.0.0.1:9001" + + case "multicast with interface": + dest = "238.0.0.1:9001" + + case "unicast with source": + dest = "127.0.0.1:9001" + } + + udest, err := net.ResolveUDPAddr("udp", dest) + require.NoError(t, err) + + var usrc *net.UDPAddr + if ca == "unicast with source" { + usrc, err = net.ResolveUDPAddr("udp", "127.0.1.1:9020") + require.NoError(t, err) + } + + conn, err := net.DialUDP("udp", usrc, udest) + require.NoError(t, err) + defer conn.Close() //nolint:errcheck + + track := &mpegts.Track{ + Codec: &mpegts.CodecH264{}, + } + + bw := bufio.NewWriter(conn) + w := &mpegts.Writer{W: bw, Tracks: []*mpegts.Track{track}} + err = w.Initialize() + require.NoError(t, err) + + err = w.WriteH264(track, 0, 0, [][]byte{{ // IDR + 5, 1, + }}) + require.NoError(t, err) + + err = w.WriteH264(track, 0, 0, [][]byte{{ // non-IDR + 5, 2, + }}) + require.NoError(t, err) + + err = bw.Flush() + require.NoError(t, err) + + <-te.Unit + }) + } }