diff --git a/app/api/api.go b/app/api/api.go index 47b1d442..d677e31f 100644 --- a/app/api/api.go +++ b/app/api/api.go @@ -729,6 +729,7 @@ func (a *api) start() error { Token: cfg.RTMP.Token, Logger: a.log.logger.rtmp, Collector: a.sessions.Collector("rtmp"), + Cluster: a.cluster, } if autocertManager != nil && cfg.RTMP.EnableTLS { diff --git a/client/client.go b/client/client.go index 8713ba6d..f1cd3bf1 100644 --- a/client/client.go +++ b/client/client.go @@ -67,7 +67,7 @@ type RestClient interface { ProcessMetadata(id, key string) (api.Metadata, error) // GET /process/{id}/metadata/{key} ProcessMetadataSet(id, key string, metadata api.Metadata) error // PUT /process/{id}/metadata/{key} - RTMPChannels() (api.RTMPChannel, error) // GET /rtmp + RTMPChannels() ([]api.RTMPChannel, error) // GET /rtmp Sessions(collectors []string) (api.SessionsSummary, error) // GET /session SessionsActive(collectors []string) (api.SessionsActive, error) // GET /session/active diff --git a/client/rtmp.go b/client/rtmp.go index e86eeccb..6f3e714b 100644 --- a/client/rtmp.go +++ b/client/rtmp.go @@ -6,10 +6,10 @@ import ( "github.com/datarhei/core/v16/http/api" ) -func (r *restclient) RTMPChannels() (api.RTMPChannel, error) { - var m api.RTMPChannel +func (r *restclient) RTMPChannels() ([]api.RTMPChannel, error) { + var m []api.RTMPChannel - data, err := r.call("GET", "rtmp", "", nil) + data, err := r.call("GET", "/rtmp", "", nil) if err != nil { return m, err } diff --git a/cluster/cluster.go b/cluster/cluster.go index d0567ec5..6358bcf2 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -3,8 +3,6 @@ package cluster import ( "context" "fmt" - "path/filepath" - "regexp" "sync" "time" @@ -36,8 +34,6 @@ type cluster struct { cancel context.CancelFunc once sync.Once - prefix *regexp.Regexp - logger log.Logger } @@ -48,7 +44,6 @@ func New(config ClusterConfig) (Cluster, error) { idupdate: map[string]time.Time{}, fileid: map[string]string{}, updates: make(chan NodeState, 64), - prefix: regexp.MustCompile(`^[a-z]+:`), logger: config.Logger, } @@ -203,18 +198,9 @@ func (c *cluster) GetURL(path string) (string, error) { return "", fmt.Errorf("file not found") } - // Remove prefix from path - prefix := c.prefix.FindString(path) - path = c.prefix.ReplaceAllString(path, "") - - url := "" - - if prefix == "memfs:" { - url = node.Address() + "/" + filepath.Join("memfs", path) - } else if prefix == "diskfs:" { - url = node.Address() + path - } else { - c.logger.Debug().WithField("path", path).WithField("prefix", prefix).Log("unknown prefix") + url, err := node.GetURL(path) + if err != nil { + c.logger.Debug().WithField("path", path).Log("invalid path") return "", fmt.Errorf("file not found") } diff --git a/cluster/node.go b/cluster/node.go index 5daea55e..9725df56 100644 --- a/cluster/node.go +++ b/cluster/node.go @@ -2,7 +2,13 @@ package cluster import ( "context" + "fmt" + "net" "net/http" + "net/url" + "path/filepath" + "regexp" + "strings" "sync" "time" @@ -44,15 +50,40 @@ type node struct { lock sync.RWMutex cancel context.CancelFunc once sync.Once + + host string + secure bool + hasRTMP bool + rtmpAddress string + rtmpToken string + hasSRT bool + srtPort string + srtPassphrase string + srtToken string + + prefix *regexp.Regexp } func newNode(address, username, password string, updates chan<- NodeState) (*node, error) { + u, err := url.Parse(address) + if err != nil { + return nil, fmt.Errorf("invalid address: %w", err) + } + + host, _, err := net.SplitHostPort(u.Host) + if err != nil { + return nil, fmt.Errorf("invalid address: %w", err) + } + n := &node{ address: address, username: username, password: password, state: stateDisconnected, updates: updates, + prefix: regexp.MustCompile(`^[a-z]+:`), + host: host, + secure: strings.HasPrefix(address, "https://"), } peer, err := client.New(client.Config{ @@ -64,11 +95,49 @@ func newNode(address, username, password string, updates chan<- NodeState) (*nod Timeout: 5 * time.Second, }, }) - if err != nil { return nil, err } + config, err := peer.Config() + if err != nil { + return nil, err + } + + if config.Config.RTMP.Enable { + n.hasRTMP = true + n.rtmpAddress = "rtmp://" + + isHostIP := net.ParseIP(host) != nil + + address := config.Config.RTMP.Address + if n.secure && config.Config.RTMP.EnableTLS && !isHostIP { + address = config.Config.RTMP.AddressTLS + n.rtmpAddress = "rtmps://" + } + + _, port, err := net.SplitHostPort(address) + if err != nil { + n.hasRTMP = false + } else { + n.rtmpAddress += host + ":" + port + n.rtmpToken = config.Config.RTMP.Token + } + } + + if config.Config.SRT.Enable { + n.hasSRT = true + + _, port, err := net.SplitHostPort(config.Config.SRT.Address) + if err != nil { + n.hasSRT = false + } else { + n.srtPort = port + n.srtPassphrase = config.Config.SRT.Passphrase + n.srtToken = config.Config.SRT.Token + } + } + n.peer = peer ctx, cancel := context.WithCancel(context.Background()) @@ -133,10 +202,11 @@ func (n *node) stop() { func (n *node) files() { memfsfiles, errMemfs := n.peer.MemFSList("name", "asc") diskfsfiles, errDiskfs := n.peer.DiskFSList("name", "asc") + rtmpfiles, errRTMP := n.peer.RTMPChannels() n.lastUpdate = time.Now() - if errMemfs != nil || errDiskfs != nil { + if errMemfs != nil || errDiskfs != nil || errRTMP != nil { n.fileList = nil n.state = stateDisconnected return @@ -144,7 +214,7 @@ func (n *node) files() { n.state = stateConnected - n.fileList = make([]string, len(memfsfiles)+len(diskfsfiles)) + n.fileList = make([]string, len(memfsfiles)+len(diskfsfiles)+len(rtmpfiles)) nfiles := 0 @@ -158,5 +228,33 @@ func (n *node) files() { nfiles++ } + for _, file := range rtmpfiles { + n.fileList[nfiles] = "rtmp:" + file.Name + nfiles++ + } + return } + +func (n *node) GetURL(path string) (string, error) { + // Remove prefix from path + prefix := n.prefix.FindString(path) + path = n.prefix.ReplaceAllString(path, "") + + u := "" + + if prefix == "memfs:" { + u = n.address + "/" + filepath.Join("memfs", path) + } else if prefix == "diskfs:" { + u = n.address + path + } else if prefix == "rtmp:" { + u = n.rtmpAddress + path + if len(n.rtmpToken) != 0 { + u += "?token=" + url.QueryEscape(n.rtmpToken) + } + } else { + return "", fmt.Errorf("unknown prefix") + } + + return u, nil +} diff --git a/rtmp/rtmp.go b/rtmp/rtmp.go index 17809a49..4cea0e13 100644 --- a/rtmp/rtmp.go +++ b/rtmp/rtmp.go @@ -11,6 +11,7 @@ import ( "sync" "time" + "github.com/datarhei/core/v16/cluster" "github.com/datarhei/core/v16/log" "github.com/datarhei/core/v16/session" @@ -194,6 +195,8 @@ type Config struct { // ListenAndServe, so it's not possible to modify the configuration // with methods like tls.Config.SetSessionTicketKeys. TLSConfig *tls.Config + + Cluster cluster.Cluster } // Server represents a RTMP server @@ -227,6 +230,8 @@ type server struct { // access to the map. channels map[string]*channel lock sync.RWMutex + + cluster cluster.Cluster } // New creates a new RTMP server according to the given config @@ -244,6 +249,7 @@ func New(config Config) (Server, error) { token: config.Token, logger: config.Logger, collector: config.Collector, + cluster: config.Cluster, } if s.collector == nil { @@ -397,7 +403,22 @@ func (s *server) handlePlay(conn *rtmp.Conn) { s.log("PLAY", "STOP", conn.URL.Path, "", client) } else { - s.log("PLAY", "NOTFOUND", conn.URL.Path, "", client) + // Check in the cluster for that stream + if s.cluster != nil { + url, err := s.cluster.GetURL("rtmp:" + conn.URL.Path) + if err != nil { + s.log("PLAY", "NOTFOUND", conn.URL.Path, "", client) + } else { + s.log("PLAY", "PROXYSTART", url, "", client) + + src, _ := avutil.Open(url) + avutil.CopyFile(conn, src) + + s.log("PLAY", "PROXYSTOP", url, "", client) + } + } else { + s.log("PLAY", "NOTFOUND", conn.URL.Path, "", client) + } } conn.Close()