Add rtmp proxying

This commit is contained in:
Ingo Oppermann
2022-08-05 16:34:08 +02:00
parent 5af85c6a8b
commit f4acc0457f
6 changed files with 131 additions and 25 deletions

View File

@@ -729,6 +729,7 @@ func (a *api) start() error {
Token: cfg.RTMP.Token, Token: cfg.RTMP.Token,
Logger: a.log.logger.rtmp, Logger: a.log.logger.rtmp,
Collector: a.sessions.Collector("rtmp"), Collector: a.sessions.Collector("rtmp"),
Cluster: a.cluster,
} }
if autocertManager != nil && cfg.RTMP.EnableTLS { if autocertManager != nil && cfg.RTMP.EnableTLS {

View File

@@ -67,7 +67,7 @@ type RestClient interface {
ProcessMetadata(id, key string) (api.Metadata, error) // GET /process/{id}/metadata/{key} ProcessMetadata(id, key string) (api.Metadata, error) // GET /process/{id}/metadata/{key}
ProcessMetadataSet(id, key string, metadata api.Metadata) error // PUT /process/{id}/metadata/{key} ProcessMetadataSet(id, key string, metadata api.Metadata) error // PUT /process/{id}/metadata/{key}
RTMPChannels() (api.RTMPChannel, error) // GET /rtmp RTMPChannels() ([]api.RTMPChannel, error) // GET /rtmp
Sessions(collectors []string) (api.SessionsSummary, error) // GET /session Sessions(collectors []string) (api.SessionsSummary, error) // GET /session
SessionsActive(collectors []string) (api.SessionsActive, error) // GET /session/active SessionsActive(collectors []string) (api.SessionsActive, error) // GET /session/active

View File

@@ -6,10 +6,10 @@ import (
"github.com/datarhei/core/v16/http/api" "github.com/datarhei/core/v16/http/api"
) )
func (r *restclient) RTMPChannels() (api.RTMPChannel, error) { func (r *restclient) RTMPChannels() ([]api.RTMPChannel, error) {
var m api.RTMPChannel var m []api.RTMPChannel
data, err := r.call("GET", "rtmp", "", nil) data, err := r.call("GET", "/rtmp", "", nil)
if err != nil { if err != nil {
return m, err return m, err
} }

View File

@@ -3,8 +3,6 @@ package cluster
import ( import (
"context" "context"
"fmt" "fmt"
"path/filepath"
"regexp"
"sync" "sync"
"time" "time"
@@ -36,8 +34,6 @@ type cluster struct {
cancel context.CancelFunc cancel context.CancelFunc
once sync.Once once sync.Once
prefix *regexp.Regexp
logger log.Logger logger log.Logger
} }
@@ -48,7 +44,6 @@ func New(config ClusterConfig) (Cluster, error) {
idupdate: map[string]time.Time{}, idupdate: map[string]time.Time{},
fileid: map[string]string{}, fileid: map[string]string{},
updates: make(chan NodeState, 64), updates: make(chan NodeState, 64),
prefix: regexp.MustCompile(`^[a-z]+:`),
logger: config.Logger, logger: config.Logger,
} }
@@ -203,18 +198,9 @@ func (c *cluster) GetURL(path string) (string, error) {
return "", fmt.Errorf("file not found") return "", fmt.Errorf("file not found")
} }
// Remove prefix from path url, err := node.GetURL(path)
prefix := c.prefix.FindString(path) if err != nil {
path = c.prefix.ReplaceAllString(path, "") c.logger.Debug().WithField("path", path).Log("invalid path")
url := ""
if prefix == "memfs:" {
url = node.Address() + "/" + filepath.Join("memfs", path)
} else if prefix == "diskfs:" {
url = node.Address() + path
} else {
c.logger.Debug().WithField("path", path).WithField("prefix", prefix).Log("unknown prefix")
return "", fmt.Errorf("file not found") return "", fmt.Errorf("file not found")
} }

View File

@@ -2,7 +2,13 @@ package cluster
import ( import (
"context" "context"
"fmt"
"net"
"net/http" "net/http"
"net/url"
"path/filepath"
"regexp"
"strings"
"sync" "sync"
"time" "time"
@@ -44,15 +50,40 @@ type node struct {
lock sync.RWMutex lock sync.RWMutex
cancel context.CancelFunc cancel context.CancelFunc
once sync.Once once sync.Once
host string
secure bool
hasRTMP bool
rtmpAddress string
rtmpToken string
hasSRT bool
srtPort string
srtPassphrase string
srtToken string
prefix *regexp.Regexp
} }
func newNode(address, username, password string, updates chan<- NodeState) (*node, error) { func newNode(address, username, password string, updates chan<- NodeState) (*node, error) {
u, err := url.Parse(address)
if err != nil {
return nil, fmt.Errorf("invalid address: %w", err)
}
host, _, err := net.SplitHostPort(u.Host)
if err != nil {
return nil, fmt.Errorf("invalid address: %w", err)
}
n := &node{ n := &node{
address: address, address: address,
username: username, username: username,
password: password, password: password,
state: stateDisconnected, state: stateDisconnected,
updates: updates, updates: updates,
prefix: regexp.MustCompile(`^[a-z]+:`),
host: host,
secure: strings.HasPrefix(address, "https://"),
} }
peer, err := client.New(client.Config{ peer, err := client.New(client.Config{
@@ -64,11 +95,49 @@ func newNode(address, username, password string, updates chan<- NodeState) (*nod
Timeout: 5 * time.Second, Timeout: 5 * time.Second,
}, },
}) })
if err != nil { if err != nil {
return nil, err return nil, err
} }
config, err := peer.Config()
if err != nil {
return nil, err
}
if config.Config.RTMP.Enable {
n.hasRTMP = true
n.rtmpAddress = "rtmp://"
isHostIP := net.ParseIP(host) != nil
address := config.Config.RTMP.Address
if n.secure && config.Config.RTMP.EnableTLS && !isHostIP {
address = config.Config.RTMP.AddressTLS
n.rtmpAddress = "rtmps://"
}
_, port, err := net.SplitHostPort(address)
if err != nil {
n.hasRTMP = false
} else {
n.rtmpAddress += host + ":" + port
n.rtmpToken = config.Config.RTMP.Token
}
}
if config.Config.SRT.Enable {
n.hasSRT = true
_, port, err := net.SplitHostPort(config.Config.SRT.Address)
if err != nil {
n.hasSRT = false
} else {
n.srtPort = port
n.srtPassphrase = config.Config.SRT.Passphrase
n.srtToken = config.Config.SRT.Token
}
}
n.peer = peer n.peer = peer
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
@@ -133,10 +202,11 @@ func (n *node) stop() {
func (n *node) files() { func (n *node) files() {
memfsfiles, errMemfs := n.peer.MemFSList("name", "asc") memfsfiles, errMemfs := n.peer.MemFSList("name", "asc")
diskfsfiles, errDiskfs := n.peer.DiskFSList("name", "asc") diskfsfiles, errDiskfs := n.peer.DiskFSList("name", "asc")
rtmpfiles, errRTMP := n.peer.RTMPChannels()
n.lastUpdate = time.Now() n.lastUpdate = time.Now()
if errMemfs != nil || errDiskfs != nil { if errMemfs != nil || errDiskfs != nil || errRTMP != nil {
n.fileList = nil n.fileList = nil
n.state = stateDisconnected n.state = stateDisconnected
return return
@@ -144,7 +214,7 @@ func (n *node) files() {
n.state = stateConnected n.state = stateConnected
n.fileList = make([]string, len(memfsfiles)+len(diskfsfiles)) n.fileList = make([]string, len(memfsfiles)+len(diskfsfiles)+len(rtmpfiles))
nfiles := 0 nfiles := 0
@@ -158,5 +228,33 @@ func (n *node) files() {
nfiles++ nfiles++
} }
for _, file := range rtmpfiles {
n.fileList[nfiles] = "rtmp:" + file.Name
nfiles++
}
return return
} }
func (n *node) GetURL(path string) (string, error) {
// Remove prefix from path
prefix := n.prefix.FindString(path)
path = n.prefix.ReplaceAllString(path, "")
u := ""
if prefix == "memfs:" {
u = n.address + "/" + filepath.Join("memfs", path)
} else if prefix == "diskfs:" {
u = n.address + path
} else if prefix == "rtmp:" {
u = n.rtmpAddress + path
if len(n.rtmpToken) != 0 {
u += "?token=" + url.QueryEscape(n.rtmpToken)
}
} else {
return "", fmt.Errorf("unknown prefix")
}
return u, nil
}

View File

@@ -11,6 +11,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/datarhei/core/v16/cluster"
"github.com/datarhei/core/v16/log" "github.com/datarhei/core/v16/log"
"github.com/datarhei/core/v16/session" "github.com/datarhei/core/v16/session"
@@ -194,6 +195,8 @@ type Config struct {
// ListenAndServe, so it's not possible to modify the configuration // ListenAndServe, so it's not possible to modify the configuration
// with methods like tls.Config.SetSessionTicketKeys. // with methods like tls.Config.SetSessionTicketKeys.
TLSConfig *tls.Config TLSConfig *tls.Config
Cluster cluster.Cluster
} }
// Server represents a RTMP server // Server represents a RTMP server
@@ -227,6 +230,8 @@ type server struct {
// access to the map. // access to the map.
channels map[string]*channel channels map[string]*channel
lock sync.RWMutex lock sync.RWMutex
cluster cluster.Cluster
} }
// New creates a new RTMP server according to the given config // New creates a new RTMP server according to the given config
@@ -244,6 +249,7 @@ func New(config Config) (Server, error) {
token: config.Token, token: config.Token,
logger: config.Logger, logger: config.Logger,
collector: config.Collector, collector: config.Collector,
cluster: config.Cluster,
} }
if s.collector == nil { if s.collector == nil {
@@ -397,7 +403,22 @@ func (s *server) handlePlay(conn *rtmp.Conn) {
s.log("PLAY", "STOP", conn.URL.Path, "", client) s.log("PLAY", "STOP", conn.URL.Path, "", client)
} else { } else {
s.log("PLAY", "NOTFOUND", conn.URL.Path, "", client) // Check in the cluster for that stream
if s.cluster != nil {
url, err := s.cluster.GetURL("rtmp:" + conn.URL.Path)
if err != nil {
s.log("PLAY", "NOTFOUND", conn.URL.Path, "", client)
} else {
s.log("PLAY", "PROXYSTART", url, "", client)
src, _ := avutil.Open(url)
avutil.CopyFile(conn, src)
s.log("PLAY", "PROXYSTOP", url, "", client)
}
} else {
s.log("PLAY", "NOTFOUND", conn.URL.Path, "", client)
}
} }
conn.Close() conn.Close()