support assigning paths to different configurations without closing stream (#4576)

Co-authored-by: aler9 <46489434+aler9@users.noreply.github.com>
This commit is contained in:
Jeremías Robles
2025-06-03 22:03:30 +03:00
committed by GitHub
parent c21c969a8c
commit f9ce572691
2 changed files with 135 additions and 18 deletions

View File

@@ -19,6 +19,9 @@ import (
func pathConfCanBeUpdated(oldPathConf *conf.Path, newPathConf *conf.Path) bool {
clone := oldPathConf.Clone()
clone.Name = newPathConf.Name
clone.Regexp = newPathConf.Regexp
clone.Record = newPathConf.Record
clone.RPICameraBrightness = newPathConf.RPICameraBrightness
@@ -51,8 +54,9 @@ type pathSetHLSServerReq struct {
}
type pathData struct {
path *path
ready bool
path *path
ready bool
confName string
}
type pathManagerParent interface {
@@ -209,34 +213,37 @@ func (pm *pathManager) doReloadConf(newPaths map[string]*conf.Path) {
// process existing paths
for pathName, pathData := range pm.paths {
path := pathData.path
pathConf, _, err := conf.FindPathConf(newPaths, pathName)
newPathConf, _, err := conf.FindPathConf(newPaths, pathName)
// path does not have a config anymore: delete it
if err != nil {
pm.removePath(path)
path.close()
path.wait() // avoid conflicts between sources
pm.removeAndClosePath(path)
continue
}
// path now belongs to a different config: delete it
if pathConf.Name != path.conf.Name {
pm.removePath(path)
path.close()
path.wait() // avoid conflicts between sources
// path now belongs to a different config
if newPathConf.Name != pathData.confName {
// path config can be hot reloaded
oldPathConf := pm.pathConfs[pathData.confName]
if pathConfCanBeUpdated(oldPathConf, newPathConf) {
pm.paths[path.name].confName = newPathConf.Name
go path.reloadConf(newPathConf)
continue
}
// Configuration cannot be hot reloaded: delete the path
pm.removeAndClosePath(path)
continue
}
// path configuration has changed and cannot be hot reloaded: delete path
if _, ok := confsToRecreate[pathConf.Name]; ok {
pm.removePath(path)
path.close()
path.wait() // avoid conflicts between sources
if _, ok := confsToRecreate[newPathConf.Name]; ok {
pm.removeAndClosePath(path)
continue
}
// path configuration has changed but can be hot reloaded: reload it
if _, ok := confsToReload[pathConf.Name]; ok {
go path.reloadConf(pathConf)
if _, ok := confsToReload[newPathConf.Name]; ok {
go path.reloadConf(newPathConf)
}
}
@@ -252,6 +259,12 @@ func (pm *pathManager) doReloadConf(newPaths map[string]*conf.Path) {
}
}
func (pm *pathManager) removeAndClosePath(path *path) {
pm.removePath(path)
path.close()
path.wait() // avoid conflicts between sources
}
func (pm *pathManager) doSetHLSServer(m *hls.Server) []defs.Path {
pm.hlsServer = m
@@ -425,7 +438,10 @@ func (pm *pathManager) createPath(
}
pa.initialize()
pm.paths[name] = &pathData{path: pa}
pm.paths[name] = &pathData{
path: pa,
confName: pathConf.Name,
}
}
func (pm *pathManager) removePath(pa *path) {

View File

@@ -3,11 +3,18 @@ package core
import (
"bufio"
"net"
"net/http"
"testing"
"time"
"github.com/bluenviron/gortsplib/v4"
"github.com/bluenviron/gortsplib/v4/pkg/base"
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/headers"
"github.com/pion/rtp"
"github.com/stretchr/testify/require"
"github.com/bluenviron/mediamtx/internal/test"
)
func TestPathAutoDeletion(t *testing.T) {
@@ -82,3 +89,97 @@ func TestPathAutoDeletion(t *testing.T) {
})
}
}
func TestPathConfigurationHotReload(t *testing.T) {
// Start MediaMTX with basic configuration
p, ok := newInstance("api: yes\n" +
"paths:\n" +
" all:\n" +
" record: no\n")
require.Equal(t, true, ok)
defer p.Close()
// Set up HTTP client for API calls
tr := &http.Transport{}
defer tr.CloseIdleConnections()
hc := &http.Client{Transport: tr}
// Create a publisher that will use the "all" configuration
media0 := test.UniqueMediaH264()
source := gortsplib.Client{}
err := source.StartRecording(
"rtsp://localhost:8554/undefined_stream",
&description.Session{Medias: []*description.Media{media0}})
require.NoError(t, err)
defer source.Close()
// Send some data to establish the stream
err = source.WritePacketRTP(media0, &rtp.Packet{
Header: rtp.Header{
Version: 2,
PayloadType: 96,
},
Payload: []byte{5, 1, 2, 3, 4},
})
require.NoError(t, err)
time.Sleep(100 * time.Millisecond)
// Verify the path exists and is using the "all" configuration
pathData, err := p.pathManager.APIPathsGet("undefined_stream")
require.NoError(t, err)
require.Equal(t, "undefined_stream", pathData.Name)
require.Equal(t, "all", pathData.ConfName)
// Check the current configuration via API
var allConfig map[string]interface{}
httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/config/paths/get/all", nil, &allConfig)
require.Equal(t, false, allConfig["record"]) // Should be false from "all" config
// Add a new specific configuration for "undefined_stream" with record enabled
httpRequest(t, hc, http.MethodPost, "http://localhost:9997/v3/config/paths/add/undefined_stream",
map[string]interface{}{
"record": true,
}, nil)
// Give the system time to process the configuration change
time.Sleep(200 * time.Millisecond)
// Verify the path now uses the new specific configuration
pathData, err = p.pathManager.APIPathsGet("undefined_stream")
require.NoError(t, err)
require.Equal(t, "undefined_stream", pathData.Name)
require.Equal(t, "undefined_stream", pathData.ConfName) // Should now use the specific config
// Check the new configuration via API
var newConfig map[string]interface{}
httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/config/paths/get/undefined_stream", nil, &newConfig)
require.Equal(t, true, newConfig["record"]) // Should be true from new config
// Verify the stream is still active and working
err = source.WritePacketRTP(media0, &rtp.Packet{
Header: rtp.Header{
Version: 2,
PayloadType: 96,
SequenceNumber: 2,
},
Payload: []byte{5, 1, 2, 3, 4},
})
require.NoError(t, err)
// Verify the path is still ready and functional
require.Equal(t, true, pathData.Ready)
// revert configuration
httpRequest(t, hc, http.MethodDelete, "http://localhost:9997/v3/config/paths/delete/undefined_stream",
nil, nil)
// Give the system time to process the configuration change
time.Sleep(200 * time.Millisecond)
// Verify the path now uses the old configuration
pathData, err = p.pathManager.APIPathsGet("undefined_stream")
require.NoError(t, err)
require.Equal(t, "undefined_stream", pathData.Name)
require.Equal(t, "all", pathData.ConfName)
}