allow setting UDP read buffer size (#3308) (#4846)

new parameters: rtspUDPReadBufferSize, rtpUDPReadBufferSize, mpegtsUDPReadBufferSize
This commit is contained in:
Alessandro Ros
2025-08-11 12:21:00 +02:00
committed by GitHub
parent 9e073cd34f
commit 462fb2bd0f
14 changed files with 180 additions and 369 deletions

View File

@@ -2560,13 +2560,34 @@ rtsps://localhost:8322/mystream
In some scenarios, when publishing or reading from the server with RTSP, frames can get corrupted. This can be caused by several reasons:
* the write queue of the server is too small and can't keep up with the stream throughput. A solution consists in increasing its size:
* When the transport protocol is UDP (which is default one), packets sent to the server might get discarded because the UDP read buffer size is too small. This can be noticed in logs through the "RTP packets lost" message. Try increasing the UDP read buffer size:
```yml
rtspUDPReadBufferSize: 1000000
```
If the source of the stream is a camera:
```yml
paths:
test:
source: rtsp://..
rtspUDPReadBufferSize: 1000000
```
Both these options require the `net.core.rmem_max` system parameter to be equal or greater than `rtspUDPReadBufferSize`:
```sh
sudo sysctl net.core.rmem_max=100000000
```
* When the transport protocol is UDP (which is the default one), packets sent from the server to readers might get discarded because the write queue is too small. This can be noticed in logs through the "reader is too slow" message. Try increasing the write queue:
```yml
writeQueueSize: 1024
```
* The stream throughput is too big and the stream can't be transmitted correctly with the UDP transport protocol. UDP is more performant, faster and more efficient than TCP, but doesn't have a retransmission mechanism, that is needed in case of streams that need a large bandwidth. A solution consists in switching to TCP:
* The stream is too big and it can't be transmitted correctly with the UDP transport protocol. UDP is more performant, faster and more efficient than TCP, but doesn't have a retransmission mechanism, that is needed in case of streams that need a large bandwidth. A solution consists in switching to TCP:
```yml
rtspTransports: [tcp]
@@ -2579,7 +2600,7 @@ In some scenarios, when publishing or reading from the server with RTSP, frames
test:
source: rtsp://..
rtspTransport: tcp
```
```
* The stream throughput is too big to be handled by the network between server and readers. Upgrade the network or decrease the stream bitrate by re-encoding it.

View File

@@ -217,6 +217,9 @@ components:
type: array
items:
type: string
rtspUDPReadBufferSize:
type: integer
format: int64
# RTMP server
rtmp:
@@ -382,10 +385,21 @@ components:
type: string
rtspRangeStart:
type: string
rtspUDPReadBufferSize:
type: integer
format: int64
# MPEG-TS source
mpegtsUDPReadBufferSize:
type: integer
format: int64
# RTP source
rtpSDP:
type: string
rtpUDPReadBufferSize:
type: integer
format: int64
# Redirect source
sourceRedirect:

View File

@@ -220,29 +220,30 @@ type Conf struct {
PlaybackTrustedProxies IPNetworks `json:"playbackTrustedProxies"`
// RTSP server
RTSP bool `json:"rtsp"`
RTSPDisable *bool `json:"rtspDisable,omitempty"` // deprecated
Protocols *RTSPTransports `json:"protocols,omitempty"` // deprecated
RTSPTransports RTSPTransports `json:"rtspTransports"`
Encryption *Encryption `json:"encryption,omitempty"` // deprecated
RTSPEncryption Encryption `json:"rtspEncryption"`
RTSPAddress string `json:"rtspAddress"`
RTSPSAddress string `json:"rtspsAddress"`
RTPAddress string `json:"rtpAddress"`
RTCPAddress string `json:"rtcpAddress"`
MulticastIPRange string `json:"multicastIPRange"`
MulticastRTPPort int `json:"multicastRTPPort"`
MulticastRTCPPort int `json:"multicastRTCPPort"`
SRTPAddress string `json:"srtpAddress"`
SRTCPAddress string `json:"srtcpAddress"`
MulticastSRTPPort int `json:"multicastSRTPPort"`
MulticastSRTCPPort int `json:"multicastSRTCPPort"`
ServerKey *string `json:"serverKey,omitempty"`
ServerCert *string `json:"serverCert,omitempty"`
RTSPServerKey string `json:"rtspServerKey"`
RTSPServerCert string `json:"rtspServerCert"`
AuthMethods *RTSPAuthMethods `json:"authMethods,omitempty"` // deprecated
RTSPAuthMethods RTSPAuthMethods `json:"rtspAuthMethods"`
RTSP bool `json:"rtsp"`
RTSPDisable *bool `json:"rtspDisable,omitempty"` // deprecated
Protocols *RTSPTransports `json:"protocols,omitempty"` // deprecated
RTSPTransports RTSPTransports `json:"rtspTransports"`
Encryption *Encryption `json:"encryption,omitempty"` // deprecated
RTSPEncryption Encryption `json:"rtspEncryption"`
RTSPAddress string `json:"rtspAddress"`
RTSPSAddress string `json:"rtspsAddress"`
RTPAddress string `json:"rtpAddress"`
RTCPAddress string `json:"rtcpAddress"`
MulticastIPRange string `json:"multicastIPRange"`
MulticastRTPPort int `json:"multicastRTPPort"`
MulticastRTCPPort int `json:"multicastRTCPPort"`
SRTPAddress string `json:"srtpAddress"`
SRTCPAddress string `json:"srtcpAddress"`
MulticastSRTPPort int `json:"multicastSRTPPort"`
MulticastSRTCPPort int `json:"multicastSRTCPPort"`
ServerKey *string `json:"serverKey,omitempty"`
ServerCert *string `json:"serverCert,omitempty"`
RTSPServerKey string `json:"rtspServerKey"`
RTSPServerCert string `json:"rtspServerCert"`
AuthMethods *RTSPAuthMethods `json:"authMethods,omitempty"` // deprecated
RTSPAuthMethods RTSPAuthMethods `json:"rtspAuthMethods"`
RTSPUDPReadBufferSize uint `json:"rtspUDPReadBufferSize"`
// RTMP server
RTMP bool `json:"rtmp"`

View File

@@ -146,15 +146,20 @@ type Path struct {
SRTPublishPassphrase string `json:"srtPublishPassphrase"`
// RTSP source
RTSPTransport RTSPTransport `json:"rtspTransport"`
RTSPAnyPort bool `json:"rtspAnyPort"`
SourceProtocol *RTSPTransport `json:"sourceProtocol,omitempty"` // deprecated
SourceAnyPortEnable *bool `json:"sourceAnyPortEnable,omitempty"` // deprecated
RTSPRangeType RTSPRangeType `json:"rtspRangeType"`
RTSPRangeStart string `json:"rtspRangeStart"`
RTSPTransport RTSPTransport `json:"rtspTransport"`
RTSPAnyPort bool `json:"rtspAnyPort"`
SourceProtocol *RTSPTransport `json:"sourceProtocol,omitempty"` // deprecated
SourceAnyPortEnable *bool `json:"sourceAnyPortEnable,omitempty"` // deprecated
RTSPRangeType RTSPRangeType `json:"rtspRangeType"`
RTSPRangeStart string `json:"rtspRangeStart"`
RTSPUDPReadBufferSize uint `json:"rtspUDPReadBufferSize"`
// MPEG-TS source
MPEGTSUDPReadBufferSize uint `json:"mpegtsUDPReadBufferSize"`
// RTP source
RTPSDP string `json:"rtpSDP"`
RTPSDP string `json:"rtpSDP"`
RTPUDPReadBufferSize uint `json:"rtpUDPReadBufferSize"`
// Redirect source
SourceRedirect string `json:"sourceRedirect"`

View File

@@ -395,6 +395,7 @@ func (p *Core) createResources(initial bool) error {
i := &rtsp.Server{
Address: p.conf.RTSPAddress,
AuthMethods: p.conf.RTSPAuthMethods,
UDPReadBufferSize: p.conf.RTSPUDPReadBufferSize,
ReadTimeout: p.conf.ReadTimeout,
WriteTimeout: p.conf.WriteTimeout,
WriteQueueSize: p.conf.WriteQueueSize,
@@ -435,6 +436,7 @@ func (p *Core) createResources(initial bool) error {
i := &rtsp.Server{
Address: p.conf.RTSPSAddress,
AuthMethods: p.conf.RTSPAuthMethods,
UDPReadBufferSize: p.conf.RTSPUDPReadBufferSize,
ReadTimeout: p.conf.ReadTimeout,
WriteTimeout: p.conf.WriteTimeout,
WriteQueueSize: p.conf.WriteQueueSize,
@@ -730,6 +732,7 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) {
newConf.RTSPEncryption != p.conf.RTSPEncryption ||
newConf.RTSPAddress != p.conf.RTSPAddress ||
!reflect.DeepEqual(newConf.RTSPAuthMethods, p.conf.RTSPAuthMethods) ||
newConf.RTSPUDPReadBufferSize != p.conf.RTSPUDPReadBufferSize ||
newConf.ReadTimeout != p.conf.ReadTimeout ||
newConf.WriteTimeout != p.conf.WriteTimeout ||
newConf.WriteQueueSize != p.conf.WriteQueueSize ||
@@ -752,6 +755,7 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) {
newConf.RTSPEncryption != p.conf.RTSPEncryption ||
newConf.RTSPSAddress != p.conf.RTSPSAddress ||
!reflect.DeepEqual(newConf.RTSPAuthMethods, p.conf.RTSPAuthMethods) ||
newConf.RTSPUDPReadBufferSize != p.conf.RTSPUDPReadBufferSize ||
newConf.ReadTimeout != p.conf.ReadTimeout ||
newConf.WriteTimeout != p.conf.WriteTimeout ||
newConf.WriteQueueSize != p.conf.WriteQueueSize ||

View File

@@ -1,19 +1,57 @@
package mpegts
// Package udp contains utilities to work with the UDP protocol.
package udp
import (
"fmt"
"net"
"net/url"
"syscall"
"time"
"github.com/bluenviron/gortsplib/v4/pkg/multicast"
"github.com/bluenviron/mediamtx/internal/restrictnetwork"
)
const (
// same size as GStreamer's rtspsrc
udpKernelReadBufferSize = 0x80000
)
type packetConn interface {
net.PacketConn
SyscallConn() (syscall.RawConn, error)
}
func setAndVerifyReadBufferSize(pc packetConn, v int) error {
rawConn, err := pc.SyscallConn()
if err != nil {
panic(err)
}
var err2 error
err = rawConn.Control(func(fd uintptr) {
err2 = syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_RCVBUF, v)
if err2 != nil {
return
}
var v2 int
v2, err2 = syscall.GetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_RCVBUF)
if err2 != nil {
return
}
if v2 != (v * 2) {
err2 = fmt.Errorf("unable to set read buffer size to %v - check that net.core.rmem_max is greater than %v", v, v)
return
}
})
if err != nil {
return err
}
if err2 != nil {
return err2
}
return nil
}
type udpConn struct {
pc net.PacketConn
@@ -98,12 +136,9 @@ func defaultInterfaceForMulticast(multicastAddr *net.UDPAddr) (*net.Interface, e
return nil, fmt.Errorf("could not find any interface for using multicast address %s", multicastAddr)
}
type packetConn interface {
net.PacketConn
SetReadBuffer(int) error
}
func createUDP(host string, q url.Values) (net.Conn, error) {
// CreateConn creates a UDP connection.
func CreateConn(u *url.URL, udpReadBufferSize int) (net.Conn, error) {
q := u.Query()
var sourceIP net.IP
if src := q.Get("source"); src != "" {
@@ -113,7 +148,7 @@ func createUDP(host string, q url.Values) (net.Conn, error) {
}
}
addr, err := net.ResolveUDPAddr("udp", host)
addr, err := net.ResolveUDPAddr("udp", u.Host)
if err != nil {
return nil, err
}
@@ -148,12 +183,12 @@ func createUDP(host string, q url.Values) (net.Conn, error) {
pc = tmp.(*net.UDPConn)
}
// defer pc.Close()
err = pc.SetReadBuffer(udpKernelReadBufferSize)
if err != nil {
pc.Close()
return nil, err
if udpReadBufferSize != 0 {
err = setAndVerifyReadBufferSize(pc, udpReadBufferSize)
if err != nil {
pc.Close()
return nil, err
}
}
return &udpConn{pc: pc, sourceIP: sourceIP}, nil

View File

@@ -1,4 +1,5 @@
package mpegts
// Package unix contains utilities to work with Unix sockets.
package unix
import (
"fmt"
@@ -113,7 +114,8 @@ func (r *unixConn) SetWriteDeadline(_ time.Time) error {
panic("unimplemented")
}
func createUnix(u *url.URL) (net.Conn, error) {
// CreateConn creates a Unix socket connection.
func CreateConn(u *url.URL) (net.Conn, error) {
var pa string
if u.Path != "" {
pa = u.Path

View File

@@ -72,6 +72,7 @@ type serverParent interface {
type Server struct {
Address string
AuthMethods []auth.VerifyMethod
UDPReadBufferSize uint
ReadTimeout conf.Duration
WriteTimeout conf.Duration
WriteQueueSize int
@@ -113,12 +114,13 @@ func (s *Server) Initialize() error {
s.sessions = make(map[*gortsplib.ServerSession]*session)
s.srv = &gortsplib.Server{
Handler: s,
ReadTimeout: time.Duration(s.ReadTimeout),
WriteTimeout: time.Duration(s.WriteTimeout),
WriteQueueSize: s.WriteQueueSize,
RTSPAddress: s.Address,
AuthMethods: s.AuthMethods,
Handler: s,
ReadTimeout: time.Duration(s.ReadTimeout),
WriteTimeout: time.Duration(s.WriteTimeout),
UDPReadBufferSize: int(s.UDPReadBufferSize),
WriteQueueSize: s.WriteQueueSize,
RTSPAddress: s.Address,
AuthMethods: s.AuthMethods,
}
if s.UseUDP {

View File

@@ -14,6 +14,8 @@ import (
"github.com/bluenviron/mediamtx/internal/defs"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/protocols/mpegts"
"github.com/bluenviron/mediamtx/internal/protocols/udp"
"github.com/bluenviron/mediamtx/internal/protocols/unix"
"github.com/bluenviron/mediamtx/internal/stream"
)
@@ -42,19 +44,18 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error {
if err != nil {
return err
}
q := u.Query()
var nc net.Conn
switch u.Scheme {
case "unix+mpegts":
nc, err = createUnix(u)
nc, err = unix.CreateConn(u)
if err != nil {
return err
}
default:
nc, err = createUDP(u.Host, q)
nc, err = udp.CreateConn(u, int(params.Conf.MPEGTSUDPReadBufferSize))
if err != nil {
return err
}

View File

@@ -15,6 +15,8 @@ import (
"github.com/bluenviron/mediamtx/internal/counterdumper"
"github.com/bluenviron/mediamtx/internal/defs"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/protocols/udp"
"github.com/bluenviron/mediamtx/internal/protocols/unix"
"github.com/bluenviron/mediamtx/internal/stream"
"github.com/pion/rtp"
)
@@ -56,19 +58,18 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error {
if err != nil {
return err
}
q := u.Query()
var nc net.Conn
switch u.Scheme {
case "unix+rtp":
nc, err = createUnix(u)
nc, err = unix.CreateConn(u)
if err != nil {
return err
}
default:
nc, err = createUDP(u.Host, q)
nc, err = udp.CreateConn(u, int(params.Conf.RTPUDPReadBufferSize))
if err != nil {
return err
}

View File

@@ -1,160 +0,0 @@
package rtp
import (
"fmt"
"net"
"net/url"
"time"
"github.com/bluenviron/gortsplib/v4/pkg/multicast"
"github.com/bluenviron/mediamtx/internal/restrictnetwork"
)
const (
// same size as GStreamer's rtspsrc
udpKernelReadBufferSize = 0x80000
)
type udpConn struct {
pc net.PacketConn
sourceIP net.IP
}
func (r *udpConn) Close() error {
return r.pc.Close()
}
func (r *udpConn) Read(p []byte) (int, error) {
for {
n, addr, err := r.pc.ReadFrom(p)
if r.sourceIP != nil && addr != nil && !addr.(*net.UDPAddr).IP.Equal(r.sourceIP) {
continue
}
return n, err
}
}
func (r *udpConn) Write(_ []byte) (int, error) {
panic("unimplemented")
}
func (r *udpConn) LocalAddr() net.Addr {
panic("unimplemented")
}
func (r *udpConn) RemoteAddr() net.Addr {
panic("unimplemented")
}
func (r *udpConn) SetDeadline(_ time.Time) error {
panic("unimplemented")
}
func (r *udpConn) SetReadDeadline(t time.Time) error {
return r.pc.SetReadDeadline(t)
}
func (r *udpConn) SetWriteDeadline(_ time.Time) error {
panic("unimplemented")
}
func defaultInterfaceForMulticast(multicastAddr *net.UDPAddr) (*net.Interface, error) {
conn, err := net.Dial("udp4", multicastAddr.String())
if err != nil {
return nil, err
}
localAddr := conn.LocalAddr().(*net.UDPAddr)
conn.Close()
interfaces, err := net.Interfaces()
if err != nil {
return nil, err
}
for _, iface := range interfaces {
var addrs []net.Addr
addrs, err = iface.Addrs()
if err != nil {
continue
}
for _, addr := range addrs {
var ip net.IP
switch v := addr.(type) {
case *net.IPNet:
ip = v.IP
case *net.IPAddr:
ip = v.IP
}
if ip != nil && ip.Equal(localAddr.IP) {
return &iface, nil
}
}
}
return nil, fmt.Errorf("could not find any interface for using multicast address %s", multicastAddr)
}
type packetConn interface {
net.PacketConn
SetReadBuffer(int) error
}
func createUDP(host string, q url.Values) (net.Conn, error) {
var sourceIP net.IP
if src := q.Get("source"); src != "" {
sourceIP = net.ParseIP(src)
if sourceIP == nil {
return nil, fmt.Errorf("invalid source IP")
}
}
addr, err := net.ResolveUDPAddr("udp", host)
if err != nil {
return nil, err
}
var pc packetConn
if ip4 := addr.IP.To4(); ip4 != nil && addr.IP.IsMulticast() {
var intf *net.Interface
if intfName := q.Get("interface"); intfName != "" {
intf, err = net.InterfaceByName(intfName)
if err != nil {
return nil, err
}
} else {
intf, err = defaultInterfaceForMulticast(addr)
if err != nil {
return nil, err
}
}
pc, err = multicast.NewSingleConn(intf, addr.String(), net.ListenPacket)
if err != nil {
return nil, err
}
} else {
var tmp net.PacketConn
tmp, err = net.ListenPacket(restrictnetwork.Restrict("udp", addr.String()))
if err != nil {
return nil, err
}
pc = tmp.(*net.UDPConn)
}
// defer pc.Close()
err = pc.SetReadBuffer(udpKernelReadBufferSize)
if err != nil {
pc.Close()
return nil, err
}
return &udpConn{pc: pc, sourceIP: sourceIP}, nil
}

View File

@@ -1,136 +0,0 @@
package rtp
import (
"fmt"
"net"
"net/url"
"os"
"sync"
"time"
)
type unixConn struct {
l net.Listener
c net.Conn
mutex sync.Mutex
closed bool
deadline time.Time
}
func (r *unixConn) Close() error {
r.mutex.Lock()
defer r.mutex.Unlock()
r.closed = true
r.l.Close()
if r.c != nil {
r.c.Close()
}
return nil
}
func (r *unixConn) acceptWithDeadline() (net.Conn, error) {
done := make(chan struct{})
defer func() { <-done }()
terminate := make(chan struct{})
defer close(terminate)
go func() {
defer close(done)
select {
case <-time.After(time.Until(r.deadline)):
r.l.Close()
case <-terminate:
return
}
}()
c, err := r.l.Accept()
if err != nil {
if time.Now().After(r.deadline) {
return nil, fmt.Errorf("deadline exceeded")
}
return nil, err
}
return c, nil
}
func (r *unixConn) setConn(c net.Conn) error {
r.mutex.Lock()
defer r.mutex.Unlock()
if r.closed {
return fmt.Errorf("closed")
}
r.c = c
return nil
}
func (r *unixConn) Read(p []byte) (int, error) {
if r.c == nil {
c, err := r.acceptWithDeadline()
if err != nil {
return 0, err
}
err = r.setConn(c)
if err != nil {
return 0, err
}
}
r.c.SetReadDeadline(r.deadline)
return r.c.Read(p)
}
func (r *unixConn) Write(_ []byte) (int, error) {
panic("unimplemented")
}
func (r *unixConn) LocalAddr() net.Addr {
panic("unimplemented")
}
func (r *unixConn) RemoteAddr() net.Addr {
panic("unimplemented")
}
func (r *unixConn) SetDeadline(_ time.Time) error {
panic("unimplemented")
}
func (r *unixConn) SetReadDeadline(t time.Time) error {
r.deadline = t
return nil
}
func (r *unixConn) SetWriteDeadline(_ time.Time) error {
panic("unimplemented")
}
func createUnix(u *url.URL) (net.Conn, error) {
var pa string
if u.Path != "" {
pa = u.Path
} else {
pa = u.Host
}
if pa == "" {
return nil, fmt.Errorf("invalid unix path")
}
os.Remove(pa)
socket, err := net.Listen("unix", pa)
if err != nil {
return nil, err
}
return &unixConn{l: socket}, nil
}

View File

@@ -122,14 +122,15 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error {
}
c := &gortsplib.Client{
Scheme: u.Scheme,
Host: u.Host,
Transport: params.Conf.RTSPTransport.Transport,
TLSConfig: tls.ConfigForFingerprint(params.Conf.SourceFingerprint),
ReadTimeout: time.Duration(s.ReadTimeout),
WriteTimeout: time.Duration(s.WriteTimeout),
WriteQueueSize: s.WriteQueueSize,
AnyPortEnable: params.Conf.RTSPAnyPort,
Scheme: u.Scheme,
Host: u.Host,
Transport: params.Conf.RTSPTransport.Transport,
TLSConfig: tls.ConfigForFingerprint(params.Conf.SourceFingerprint),
ReadTimeout: time.Duration(s.ReadTimeout),
WriteTimeout: time.Duration(s.WriteTimeout),
WriteQueueSize: s.WriteQueueSize,
UDPReadBufferSize: int(params.Conf.RTSPUDPReadBufferSize),
AnyPortEnable: params.Conf.RTSPAnyPort,
OnRequest: func(req *base.Request) {
s.Log(logger.Debug, "[c->s] %v", req)
},

View File

@@ -277,6 +277,10 @@ rtspServerCert: server.crt
# Authentication methods. Available are "basic" and "digest".
# "digest" doesn't provide any additional security and is available for compatibility only.
rtspAuthMethods: [basic]
# Size of the UDP buffer of the RTSP server.
# This can be increased to mitigate packet losses.
# It defaults to the default value of the operating system.
rtspUDPReadBufferSize: 0
###############################################
# Global settings -> RTMP server
@@ -530,12 +534,28 @@ pathDefaults:
# * npt: duration such as "300ms", "1.5m" or "2h45m", valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h"
# * smpte: duration such as "300ms", "1.5m" or "2h45m", valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h"
rtspRangeStart:
# Size of the UDP buffer of the RTSP client.
# This can be increased to mitigate packet losses.
# It defaults to the default value of the operating system.
rtspUDPReadBufferSize: 0
###############################################
# Default path settings -> MPEG-TS source (when source is MPEG-TS)
# Size of the UDP buffer of the MPEG-TS client.
# This can be increased to mitigate packet losses.
# It defaults to the default value of the operating system.
mpegtsUDPReadBufferSize: 0
###############################################
# Default path settings -> RTP source (when source is RTP)
# session description protocol (SDP) of the RTP stream.
rtpSDP:
# Size of the UDP buffer of the RTP client.
# This can be increased to mitigate packet losses.
# It defaults to the default value of the operating system.
rtpUDPReadBufferSize: 0
###############################################
# Default path settings -> Redirect source (when source is "redirect")