rtmp: fix timeout when publishing with GLive T80 (#4002) (#4583)

This commit is contained in:
Alessandro Ros
2025-06-03 18:26:00 +02:00
committed by GitHub
parent 3c703052f6
commit 500d18b6c6
10 changed files with 287 additions and 79 deletions

View File

@@ -39,6 +39,8 @@ import (
//go:embed VERSION
var version []byte
var timeNow = time.Now
var defaultConfPaths = []string{
"rtsp-simple-server.yml",
"mediamtx.yml",
@@ -462,6 +464,7 @@ func (p *Core) createResources(initial bool) error {
RunOnConnectRestart: p.conf.RunOnConnectRestart,
RunOnDisconnect: p.conf.RunOnDisconnect,
ExternalCmdPool: p.externalCmdPool,
TimeNow: timeNow,
Metrics: p.metrics,
PathManager: p.pathManager,
Parent: p,
@@ -489,6 +492,7 @@ func (p *Core) createResources(initial bool) error {
RunOnConnectRestart: p.conf.RunOnConnectRestart,
RunOnDisconnect: p.conf.RunOnDisconnect,
ExternalCmdPool: p.externalCmdPool,
TimeNow: timeNow,
Metrics: p.metrics,
PathManager: p.pathManager,
Parent: p,

View File

@@ -42,35 +42,45 @@ func httpPullFile(t *testing.T, hc *http.Client, u string) []byte {
}
func TestMetrics(t *testing.T) {
serverCertFpath, err := test.CreateTempFile(test.TLSCertPub)
require.NoError(t, err)
defer os.Remove(serverCertFpath)
serverKeyFpath, err := test.CreateTempFile(test.TLSCertKey)
require.NoError(t, err)
defer os.Remove(serverKeyFpath)
p, ok := newInstance("api: yes\n" +
"hlsAlwaysRemux: yes\n" +
"metrics: yes\n" +
"webrtcServerCert: " + serverCertFpath + "\n" +
"webrtcServerKey: " + serverKeyFpath + "\n" +
"rtspEncryption: optional\n" +
"rtspServerCert: " + serverCertFpath + "\n" +
"rtspServerKey: " + serverKeyFpath + "\n" +
"rtmpEncryption: optional\n" +
"rtmpServerCert: " + serverCertFpath + "\n" +
"rtmpServerKey: " + serverKeyFpath + "\n" +
"paths:\n" +
" all_others:\n")
require.Equal(t, true, ok)
defer p.Close()
tr := &http.Transport{}
defer tr.CloseIdleConnections()
hc := &http.Client{Transport: tr}
t.Run("initial", func(t *testing.T) {
serverCertFpath, err := test.CreateTempFile(test.TLSCertPub)
require.NoError(t, err)
defer os.Remove(serverCertFpath)
serverKeyFpath, err := test.CreateTempFile(test.TLSCertKey)
require.NoError(t, err)
defer os.Remove(serverKeyFpath)
n := 0
timeNow = func() time.Time {
d := time.Date(2009, 5, 20, 22, 15, 25, 427000, time.Local).Add(time.Duration(n) * 2 * time.Second)
n++
return d
}
defer func() {
timeNow = time.Now
}()
p, ok := newInstance("api: yes\n" +
"hlsAlwaysRemux: yes\n" +
"metrics: yes\n" +
"webrtcServerCert: " + serverCertFpath + "\n" +
"webrtcServerKey: " + serverKeyFpath + "\n" +
"rtspEncryption: optional\n" +
"rtspServerCert: " + serverCertFpath + "\n" +
"rtspServerKey: " + serverKeyFpath + "\n" +
"rtmpEncryption: optional\n" +
"rtmpServerCert: " + serverCertFpath + "\n" +
"rtmpServerKey: " + serverKeyFpath + "\n" +
"paths:\n" +
" all_others:\n")
require.Equal(t, true, ok)
defer p.Close()
tr := &http.Transport{}
defer tr.CloseIdleConnections()
hc := &http.Client{Transport: tr}
bo := httpPullFile(t, hc, "http://localhost:9998/metrics")
require.Equal(t, `paths 0
@@ -169,6 +179,44 @@ webrtc_sessions_bytes_sent 0
})
t.Run("with data", func(t *testing.T) {
serverCertFpath, err := test.CreateTempFile(test.TLSCertPub)
require.NoError(t, err)
defer os.Remove(serverCertFpath)
serverKeyFpath, err := test.CreateTempFile(test.TLSCertKey)
require.NoError(t, err)
defer os.Remove(serverKeyFpath)
n := 0
timeNow = func() time.Time {
d := time.Date(2009, 5, 20, 22, 15, 25, 427000, time.Local).Add(time.Duration(n) * 2 * time.Second)
n++
return d
}
defer func() {
timeNow = time.Now
}()
p, ok := newInstance("api: yes\n" +
"hlsAlwaysRemux: yes\n" +
"metrics: yes\n" +
"webrtcServerCert: " + serverCertFpath + "\n" +
"webrtcServerKey: " + serverKeyFpath + "\n" +
"rtspEncryption: optional\n" +
"rtspServerCert: " + serverCertFpath + "\n" +
"rtspServerKey: " + serverKeyFpath + "\n" +
"rtmpEncryption: optional\n" +
"rtmpServerCert: " + serverCertFpath + "\n" +
"rtmpServerKey: " + serverKeyFpath + "\n" +
"paths:\n" +
" all_others:\n")
require.Equal(t, true, ok)
defer p.Close()
tr := &http.Transport{}
defer tr.CloseIdleConnections()
hc := &http.Client{Transport: tr}
terminate := make(chan struct{})
var wg sync.WaitGroup
wg.Add(6)
@@ -193,6 +241,8 @@ webrtc_sessions_bytes_sent 0
<-terminate
}()
rtmpDone := make(chan struct{})
go func() {
defer wg.Done()
@@ -217,12 +267,16 @@ webrtc_sessions_bytes_sent 0
err = w.WriteH264(2*time.Second, 2*time.Second, [][]byte{{5, 2, 3, 4}})
require.NoError(t, err)
close(rtmpDone)
<-terminate
}()
go func() {
defer wg.Done()
<-rtmpDone
u, err := url.Parse("rtmps://localhost:1936/rtmps_path")
require.NoError(t, err)
@@ -248,6 +302,8 @@ webrtc_sessions_bytes_sent 0
<-terminate
}()
webrtcReady := make(chan struct{})
go func() {
defer wg.Done()
@@ -258,7 +314,7 @@ webrtc_sessions_bytes_sent 0
defer tr.CloseIdleConnections()
hc2 := &http.Client{Transport: tr}
track := &webrtc.OutgoingTrack{
track1 := &webrtc.OutgoingTrack{
Caps: pwebrtc.RTPCodecCapability{
MimeType: pwebrtc.MimeTypeH264,
ClockRate: 90000,
@@ -266,30 +322,44 @@ webrtc_sessions_bytes_sent 0
},
}
track2 := &webrtc.OutgoingTrack{
Caps: pwebrtc.RTPCodecCapability{
MimeType: pwebrtc.MimeTypeOpus,
ClockRate: 48000,
Channels: 2,
SDPFmtpLine: "minptime=10;useinbandfec=1;stereo=1;sprop-stereo=1",
},
}
s := &whip.Client{
HTTPClient: hc2,
URL: su,
Log: test.NilLogger,
Publish: true,
OutgoingTracks: []*webrtc.OutgoingTrack{track},
OutgoingTracks: []*webrtc.OutgoingTrack{track1, track2},
}
err = s.Initialize(context.Background())
require.NoError(t, err)
defer checkClose(t, s.Close)
err = track.WriteRTP(&rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 123,
Timestamp: 45343,
SSRC: 563423,
},
Payload: []byte{1},
})
require.NoError(t, err)
for _, track := range s.OutgoingTracks {
err = track.WriteRTP(&rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 123,
Timestamp: 45343,
SSRC: 563423,
},
Payload: []byte{1},
})
require.NoError(t, err)
}
close(webrtcReady)
<-terminate
}()
@@ -328,7 +398,8 @@ webrtc_sessions_bytes_sent 0
<-terminate
}()
time.Sleep(500*time.Millisecond + 2*time.Second)
<-webrtcReady
time.Sleep(1 * time.Second)
bo := httpPullFile(t, hc, "http://localhost:9998/metrics")
@@ -426,12 +497,12 @@ webrtc_sessions_bytes_sent 0
`srt_conns_bytes_send_drop\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`srt_conns_bytes_received_drop\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`srt_conns_bytes_received_undecrypt\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`srt_conns_us_packets_send_period\{id=".*?",state="publish"\} \d+\.\d+`+"\n"+
`srt_conns_us_packets_send_period\{id=".*?",state="publish"\} \d+(\.\d+)?`+"\n"+
`srt_conns_packets_flow_window\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`srt_conns_packets_flight_size\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`srt_conns_ms_rtt\{id=".*?",state="publish"\} \d+\.\d+`+"\n"+
`srt_conns_ms_rtt\{id=".*?",state="publish"\} \d+(\.\d+)?`+"\n"+
`srt_conns_mbps_send_rate\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`srt_conns_mbps_receive_rate\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`srt_conns_mbps_receive_rate\{id=".*?",state="publish"\} \d+(\.\d+)?`+"\n"+
`srt_conns_mbps_link_capacity\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`srt_conns_bytes_avail_send_buf\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`srt_conns_bytes_avail_receive_buf\{id=".*?",state="publish"\} [0-9]+`+"\n"+
@@ -460,6 +531,44 @@ webrtc_sessions_bytes_sent 0
})
t.Run("servers disabled", func(t *testing.T) {
serverCertFpath, err := test.CreateTempFile(test.TLSCertPub)
require.NoError(t, err)
defer os.Remove(serverCertFpath)
serverKeyFpath, err := test.CreateTempFile(test.TLSCertKey)
require.NoError(t, err)
defer os.Remove(serverKeyFpath)
n := 0
timeNow = func() time.Time {
d := time.Date(2009, 5, 20, 22, 15, 25, 427000, time.Local).Add(time.Duration(n) * 2 * time.Second)
n++
return d
}
defer func() {
timeNow = time.Now
}()
p, ok := newInstance("api: yes\n" +
"hlsAlwaysRemux: yes\n" +
"metrics: yes\n" +
"webrtcServerCert: " + serverCertFpath + "\n" +
"webrtcServerKey: " + serverKeyFpath + "\n" +
"rtspEncryption: optional\n" +
"rtspServerCert: " + serverCertFpath + "\n" +
"rtspServerKey: " + serverKeyFpath + "\n" +
"rtmpEncryption: optional\n" +
"rtmpServerCert: " + serverCertFpath + "\n" +
"rtmpServerKey: " + serverKeyFpath + "\n" +
"paths:\n" +
" all_others:\n")
require.Equal(t, true, ok)
defer p.Close()
tr := &http.Transport{}
defer tr.CloseIdleConnections()
hc := &http.Client{Transport: tr}
httpRequest(t, hc, http.MethodPatch, "http://localhost:9997/v3/config/global/patch", map[string]interface{}{
"rtsp": false,
"rtmp": false,

View File

@@ -447,8 +447,16 @@ func TestPathRunOnRead(t *testing.T) {
require.NoError(t, err)
defer conn.Close()
n := 0
timeNow := func() time.Time {
d := time.Date(2009, 5, 20, 22, 15, 25, 427000, time.Local).Add(time.Duration(n) * 2 * time.Second)
n++
return d
}
r := &rtmp.Reader{
Conn: conn,
Conn: conn,
TimeNow: timeNow,
}
err = r.Initialize()
require.NoError(t, err)
@@ -483,8 +491,16 @@ func TestPathRunOnRead(t *testing.T) {
}
}()
n := 0
timeNow := func() time.Time {
d := time.Date(2009, 5, 20, 22, 15, 25, 427000, time.Local).Add(time.Duration(n) * 2 * time.Second)
n++
return d
}
r := &rtmp.Reader{
Conn: conn,
Conn: conn,
TimeNow: timeNow,
}
err = r.Initialize()
require.NoError(t, err)

View File

@@ -272,7 +272,8 @@ func sortedKeys(m map[uint8]format.Format) []int {
// Reader provides functions to read incoming data.
type Reader struct {
Conn Conn
Conn Conn
TimeNow func() time.Time
videoTracks map[uint8]format.Format
audioTracks map[uint8]format.Format
@@ -282,6 +283,10 @@ type Reader struct {
// Initialize initializes Reader.
func (r *Reader) Initialize() error {
if r.TimeNow == nil {
r.TimeNow = time.Now
}
var err error
r.videoTracks, r.audioTracks, err = r.readTracks()
if err != nil {
@@ -296,8 +301,8 @@ func (r *Reader) Initialize() error {
func (r *Reader) readTracks() (map[uint8]format.Format, map[uint8]format.Format, error) {
firstReceived := false
var startTime time.Duration
var curTime time.Duration
var startTime time.Time
var curTime time.Time
videoTracks := make(map[uint8]format.Format)
audioTracks := make(map[uint8]format.Format)
@@ -316,21 +321,23 @@ func (r *Reader) readTracks() (map[uint8]format.Format, map[uint8]format.Format,
return nil
}
handleVideoExCodedFrames := func(_ uint8, msg *message.VideoExCodedFrames) error {
handleVideoExCodedFrames := func(_ uint8, _ *message.VideoExCodedFrames) error {
now := r.TimeNow()
if !firstReceived {
firstReceived = true
startTime = msg.DTS
startTime = now
}
curTime = msg.DTS
curTime = now
return nil
}
handleVideoExFramesX := func(_ uint8, msg *message.VideoExFramesX) error {
handleVideoExFramesX := func(_ uint8, _ *message.VideoExFramesX) error {
now := r.TimeNow()
if !firstReceived {
firstReceived = true
startTime = msg.DTS
startTime = now
}
curTime = msg.DTS
curTime = now
return nil
}
@@ -346,11 +353,12 @@ func (r *Reader) readTracks() (map[uint8]format.Format, map[uint8]format.Format,
}
handleAudioCodedFrames := func(trackID uint8, msg *message.AudioExCodedFrames) error {
now := r.TimeNow()
if !firstReceived {
firstReceived = true
startTime = msg.DTS
startTime = now
}
curTime = msg.DTS
curTime = now
if audioTracks[trackID] != nil {
return nil
@@ -373,11 +381,12 @@ func (r *Reader) readTracks() (map[uint8]format.Format, map[uint8]format.Format,
switch msg := msg.(type) {
case *message.Video:
now := r.TimeNow()
if !firstReceived {
firstReceived = true
startTime = msg.DTS
startTime = now
}
curTime = msg.DTS
curTime = now
if msg.Type == message.VideoTypeConfig && videoTracks[0] == nil {
videoTracks[0], err = h264TrackFromConfig(msg.Payload)
@@ -430,11 +439,12 @@ func (r *Reader) readTracks() (map[uint8]format.Format, map[uint8]format.Format,
}
case *message.Audio:
now := r.TimeNow()
if !firstReceived {
firstReceived = true
startTime = msg.DTS
startTime = now
}
curTime = msg.DTS
curTime = now
if audioTracks[0] == nil && len(msg.Payload) != 0 {
if msg.Codec == message.CodecMPEG4Audio {
@@ -484,7 +494,7 @@ func (r *Reader) readTracks() (map[uint8]format.Format, map[uint8]format.Format,
}
}
if (curTime - startTime) >= analyzePeriod {
if curTime.Sub(startTime) >= analyzePeriod {
break
}
}

View File

@@ -1635,8 +1635,52 @@ func TestReadTracks(t *testing.T) {
}
c.initialize()
n := time.Duration(0)
TimeNow := func() time.Time {
var d time.Time
outer:
for {
msg := ca.messages[n]
n++
switch msg := msg.(type) {
case *message.Video:
d = time.Date(2008, 10, 28, 13, 11, 12, 0, time.UTC).Add(msg.DTS)
break outer
case *message.VideoExCodedFrames:
d = time.Date(2008, 10, 28, 13, 11, 12, 0, time.UTC).Add(msg.DTS)
break outer
case *message.VideoExMultitrack:
msg2 := msg.Wrapped
switch msg2 := msg2.(type) {
case *message.VideoExCodedFrames:
d = time.Date(2008, 10, 28, 13, 11, 12, 0, time.UTC).Add(msg2.DTS)
break outer
}
case *message.Audio:
d = time.Date(2008, 10, 28, 13, 11, 12, 0, time.UTC).Add(msg.DTS)
break outer
case *message.AudioExCodedFrames:
d = time.Date(2008, 10, 28, 13, 11, 12, 0, time.UTC).Add(msg.DTS)
break outer
case *message.AudioExMultitrack:
msg2 := msg.Wrapped
switch msg2 := msg2.(type) {
case *message.AudioExCodedFrames:
d = time.Date(2008, 10, 28, 13, 11, 12, 0, time.UTC).Add(msg2.DTS)
break outer
}
}
}
return d
}
r := &Reader{
Conn: c,
Conn: c,
TimeNow: TimeNow,
}
err = r.Initialize()
require.NoError(t, err)

View File

@@ -41,6 +41,7 @@ type conn struct {
wg *sync.WaitGroup
nconn net.Conn
externalCmdPool *externalcmd.Pool
timeNow func() time.Time
pathManager serverPathManager
parent *Server
@@ -259,7 +260,8 @@ func (c *conn) runPublish() error {
c.mutex.Unlock()
r := &rtmp.Reader{
Conn: c.rconn,
Conn: c.rconn,
TimeNow: c.timeNow,
}
err = r.Initialize()
if err != nil {

View File

@@ -10,6 +10,7 @@ import (
"reflect"
"sort"
"sync"
"time"
"github.com/google/uuid"
@@ -84,6 +85,7 @@ type Server struct {
RunOnConnectRestart bool
RunOnDisconnect string
ExternalCmdPool *externalcmd.Pool
TimeNow func() time.Time
Metrics serverMetrics
PathManager serverPathManager
Parent serverParent
@@ -216,6 +218,7 @@ outer:
wg: &s.wg,
nconn: nconn,
externalCmdPool: s.ExternalCmdPool,
timeNow: s.TimeNow,
pathManager: s.PathManager,
parent: s,
}

View File

@@ -97,6 +97,13 @@ func TestServerPublish(t *testing.T) {
},
}
n := 0
timeNow := func() time.Time {
d := time.Date(2009, 5, 20, 22, 15, 25, 427000, time.Local).Add(time.Duration(n) * 2 * time.Second)
n++
return d
}
s := &Server{
Address: "127.0.0.1:1935",
ReadTimeout: conf.Duration(10 * time.Second),
@@ -109,6 +116,7 @@ func TestServerPublish(t *testing.T) {
RunOnConnectRestart: false,
RunOnDisconnect: "",
ExternalCmdPool: nil,
TimeNow: timeNow,
PathManager: pathManager,
Parent: test.NilLogger,
}
@@ -146,12 +154,6 @@ func TestServerPublish(t *testing.T) {
err = w.Initialize()
require.NoError(t, err)
err = w.WriteH264(
2*time.Second, 2*time.Second, [][]byte{
{5, 2, 3, 4},
})
require.NoError(t, err)
<-path.streamCreated
recv := make(chan struct{})
@@ -166,7 +168,7 @@ func TestServerPublish(t *testing.T) {
require.Equal(t, [][]byte{
test.FormatH264.SPS,
test.FormatH264.PPS,
{5, 2, 3, 4},
{5, 6, 7, 8},
}, u.(*unit.H264).AU)
close(recv)
return nil
@@ -177,7 +179,7 @@ func TestServerPublish(t *testing.T) {
err = w.WriteH264(
3*time.Second, 3*time.Second, [][]byte{
{5, 2, 3, 4},
{5, 6, 7, 8},
})
require.NoError(t, err)
@@ -303,8 +305,16 @@ func TestServerRead(t *testing.T) {
})
}()
n := 0
timeNow := func() time.Time {
d := time.Date(2009, 5, 20, 22, 15, 25, 427000, time.Local).Add(time.Duration(n) * 2 * time.Second)
n++
return d
}
r := &rtmp.Reader{
Conn: conn,
Conn: conn,
TimeNow: timeNow,
}
err = r.Initialize()
require.NoError(t, err)

View File

@@ -18,6 +18,8 @@ import (
"github.com/bluenviron/mediamtx/internal/stream"
)
var timeNow = time.Now
// Source is a RTMP static source.
type Source struct {
ReadTimeout conf.Duration
@@ -86,7 +88,8 @@ func (s *Source) runReader(ctx context.Context, u *url.URL, fingerprint string)
}
r := &rtmp.Reader{
Conn: conn,
Conn: conn,
TimeNow: timeNow,
}
err = r.Initialize()
if err != nil {

View File

@@ -83,9 +83,6 @@ func TestSource(t *testing.T) {
err = w.WriteH264(2*time.Second, 2*time.Second, [][]byte{{5, 2, 3, 4}})
require.NoError(t, err)
err = w.WriteH264(3*time.Second, 3*time.Second, [][]byte{{5, 2, 3, 4}})
require.NoError(t, err)
break
}
}()
@@ -104,6 +101,16 @@ func TestSource(t *testing.T) {
source += "localhost/teststream"
n := 0
timeNow = func() time.Time {
d := time.Date(2009, 5, 20, 22, 15, 25, 427000, time.Local).Add(time.Duration(n) * 2 * time.Second)
n++
return d
}
defer func() {
timeNow = time.Now
}()
te := test.NewSourceTester(
func(p defs.StaticSourceParent) defs.StaticSource {
return &Source{