mirror of
https://github.com/datarhei/core.git
synced 2025-10-05 16:07:07 +08:00
513 lines
12 KiB
Go
513 lines
12 KiB
Go
// Package rtmp provides an RTMP server
|
|
package rtmp
|
|
|
|
import (
|
|
"crypto/tls"
|
|
"fmt"
|
|
"net"
|
|
"net/url"
|
|
"path/filepath"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/datarhei/core/v16/cluster/proxy"
|
|
"github.com/datarhei/core/v16/iam"
|
|
"github.com/datarhei/core/v16/log"
|
|
"github.com/datarhei/core/v16/session"
|
|
|
|
"github.com/datarhei/joy4/av/avutil"
|
|
"github.com/datarhei/joy4/av/pktque"
|
|
"github.com/datarhei/joy4/format"
|
|
"github.com/datarhei/joy4/format/rtmp"
|
|
)
|
|
|
|
// ErrServerClosed is returned by ListenAndServe (or ListenAndServeTLS) if the server
|
|
// has been closed regularly with the Close() function.
|
|
var ErrServerClosed = rtmp.ErrServerClosed
|
|
|
|
func init() {
|
|
format.RegisterAll()
|
|
}
|
|
|
|
// Config for a new RTMP server
|
|
type Config struct {
|
|
// Logger. Optional.
|
|
Logger log.Logger
|
|
|
|
Collector session.Collector
|
|
|
|
// The address the RTMP server should listen on, e.g. ":1935"
|
|
Addr string
|
|
|
|
// The address the RTMPS server should listen on, e.g. ":1936"
|
|
TLSAddr string
|
|
|
|
// The app path for the streams, e.g. "/live". Optional. Defaults
|
|
// to "/".
|
|
App string
|
|
|
|
// A token that needs to be added to the URL as query string
|
|
// in order push or pull a stream. The key for the query
|
|
// parameter is "token". Optional. By default no token is
|
|
// required.
|
|
Token string
|
|
|
|
// TLSConfig optionally provides a TLS configuration for use
|
|
// by ListenAndServe. Note that this value is cloned by
|
|
// ListenAndServe, so it's not possible to modify the configuration
|
|
// with methods like tls.Config.SetSessionTicketKeys.
|
|
TLSConfig *tls.Config
|
|
|
|
Proxy proxy.ProxyReader
|
|
|
|
IAM iam.IAM
|
|
}
|
|
|
|
// Server represents a RTMP server
|
|
type Server interface {
|
|
// ListenAndServe starts the RTMP server
|
|
ListenAndServe() error
|
|
|
|
// ListenAndServe starts the RTMPS server
|
|
ListenAndServeTLS(certFile, keyFile string) error
|
|
|
|
// Close stops the RTMP server and closes all connections
|
|
Close()
|
|
|
|
// Channels return a list of currently publishing streams
|
|
Channels() []string
|
|
}
|
|
|
|
// server is an implementation of the Server interface
|
|
type server struct {
|
|
// Configuration parameter taken from the Config
|
|
app string
|
|
token string
|
|
logger log.Logger
|
|
collector session.Collector
|
|
|
|
// A joy4 RTMP server instance
|
|
server *rtmp.Server
|
|
tlsServer *rtmp.Server
|
|
|
|
// Map of publishing channels and a lock to serialize
|
|
// access to the map.
|
|
channels map[string]*channel
|
|
lock sync.RWMutex
|
|
|
|
proxy proxy.ProxyReader
|
|
|
|
iam iam.IAM
|
|
}
|
|
|
|
// New creates a new RTMP server according to the given config
|
|
func New(config Config) (Server, error) {
|
|
if len(config.App) == 0 {
|
|
config.App = "/"
|
|
}
|
|
|
|
if config.Logger == nil {
|
|
config.Logger = log.New("")
|
|
}
|
|
|
|
s := &server{
|
|
app: filepath.Join("/", config.App),
|
|
token: config.Token,
|
|
logger: config.Logger,
|
|
collector: config.Collector,
|
|
proxy: config.Proxy,
|
|
iam: config.IAM,
|
|
}
|
|
|
|
if s.collector == nil {
|
|
s.collector = session.NewNullCollector()
|
|
}
|
|
|
|
if s.proxy == nil {
|
|
s.proxy = proxy.NewNullProxyReader()
|
|
}
|
|
|
|
s.server = &rtmp.Server{
|
|
Addr: config.Addr,
|
|
HandlePlay: s.handlePlay,
|
|
HandlePublish: s.handlePublish,
|
|
}
|
|
|
|
if len(config.TLSAddr) != 0 {
|
|
s.tlsServer = &rtmp.Server{
|
|
Addr: config.TLSAddr,
|
|
TLSConfig: config.TLSConfig.Clone(),
|
|
HandlePlay: s.handlePlay,
|
|
HandlePublish: s.handlePublish,
|
|
}
|
|
}
|
|
|
|
s.channels = make(map[string]*channel)
|
|
|
|
rtmp.Debug = false
|
|
|
|
return s, nil
|
|
}
|
|
|
|
// ListenAndServe starts the RMTP server
|
|
func (s *server) ListenAndServe() error {
|
|
return s.server.ListenAndServe()
|
|
}
|
|
|
|
func (s *server) ListenAndServeTLS(certFile, keyFile string) error {
|
|
if s.tlsServer == nil {
|
|
return fmt.Errorf("RTMPS server is not configured")
|
|
}
|
|
|
|
return s.tlsServer.ListenAndServeTLS(certFile, keyFile)
|
|
}
|
|
|
|
func (s *server) Close() {
|
|
// Stop listening
|
|
s.server.Close()
|
|
|
|
if s.tlsServer != nil {
|
|
s.tlsServer.Close()
|
|
}
|
|
|
|
s.lock.Lock()
|
|
defer s.lock.Unlock()
|
|
|
|
// Close all channels
|
|
for _, ch := range s.channels {
|
|
ch.Close()
|
|
}
|
|
}
|
|
|
|
// Channels returns the list of streams that are
|
|
// publishing currently
|
|
func (s *server) Channels() []string {
|
|
channels := []string{}
|
|
|
|
s.lock.RLock()
|
|
defer s.lock.RUnlock()
|
|
|
|
for key := range s.channels {
|
|
channels = append(channels, key)
|
|
}
|
|
|
|
return channels
|
|
}
|
|
|
|
func (s *server) log(who, what, action, path, message string, client net.Addr) {
|
|
s.logger.Info().WithFields(log.Fields{
|
|
"who": who,
|
|
"what": what,
|
|
"action": action,
|
|
"path": path,
|
|
"client": client.String(),
|
|
}).Log(message)
|
|
}
|
|
|
|
// GetToken returns the path without the token and the token found in the URL. If the token
|
|
// was part of the path, the token is removed from the path. The token in the query string
|
|
// takes precedence. The token in the path is assumed to be the last path element.
|
|
func GetToken(u *url.URL) (string, string) {
|
|
q := u.Query()
|
|
if q.Has("token") {
|
|
// The token was in the query. Return the unmomdified path and the token
|
|
return u.Path, q.Get("token")
|
|
}
|
|
|
|
pathElements := splitPath(u.EscapedPath())
|
|
nPathElements := len(pathElements)
|
|
|
|
if nPathElements <= 1 {
|
|
return u.Path, ""
|
|
}
|
|
|
|
// Return the path without the token
|
|
return "/" + strings.Join(pathElements[:nPathElements-1], "/"), pathElements[nPathElements-1]
|
|
}
|
|
|
|
func splitPath(path string) []string {
|
|
pathElements := strings.Split(filepath.Clean(path), "/")
|
|
|
|
if len(pathElements) == 0 {
|
|
return pathElements
|
|
}
|
|
|
|
if len(pathElements[0]) == 0 {
|
|
pathElements = pathElements[1:]
|
|
}
|
|
|
|
return pathElements
|
|
}
|
|
|
|
func removePathPrefix(path, prefix string) (string, string) {
|
|
prefix = filepath.Join("/", prefix)
|
|
return filepath.Join("/", strings.TrimPrefix(path, prefix+"/")), prefix
|
|
}
|
|
|
|
// handlePlay is called when a RTMP client wants to play a stream
|
|
func (s *server) handlePlay(conn *rtmp.Conn) {
|
|
defer conn.Close()
|
|
|
|
remote := conn.NetConn().RemoteAddr()
|
|
playpath, token := GetToken(conn.URL)
|
|
|
|
playpath, _ = removePathPrefix(playpath, s.app)
|
|
|
|
identity, err := s.findIdentityFromStreamKey(token)
|
|
if err != nil {
|
|
s.logger.Debug().WithError(err).Log("invalid streamkey")
|
|
s.log(identity, "PLAY", "FORBIDDEN", playpath, "invalid streamkey ("+token+")", remote)
|
|
return
|
|
}
|
|
|
|
domain := s.findDomainFromPlaypath(playpath)
|
|
resource := "rtmp:" + playpath
|
|
|
|
if !s.iam.Enforce(identity, domain, resource, "PLAY") {
|
|
s.log(identity, "PLAY", "FORBIDDEN", playpath, "access denied", remote)
|
|
return
|
|
}
|
|
|
|
// Look for the stream
|
|
s.lock.RLock()
|
|
ch := s.channels[playpath]
|
|
s.lock.RUnlock()
|
|
|
|
if ch == nil {
|
|
// Check in the cluster for that stream
|
|
url, err := s.proxy.GetURL("rtmp:" + conn.URL.Path)
|
|
if err != nil {
|
|
s.log(identity, "PLAY", "NOTFOUND", conn.URL.Path, "", remote)
|
|
return
|
|
}
|
|
|
|
src, err := avutil.Open(url)
|
|
if err != nil {
|
|
s.logger.Error().WithField("address", url).WithError(err).Log("Proxying address failed")
|
|
s.log(identity, "PLAY", "NOTFOUND", conn.URL.Path, "", remote)
|
|
return
|
|
}
|
|
|
|
c := newConnectionFromDemuxer(src)
|
|
|
|
wg := sync.WaitGroup{}
|
|
wg.Add(1)
|
|
|
|
go func() {
|
|
s.log(identity, "PLAY", "PROXYSTART", url, "", remote)
|
|
wg.Done()
|
|
err := s.publish(c, playpath, remote, identity, true)
|
|
if err != nil {
|
|
s.logger.Error().WithField("address", url).WithError(err).Log("Proxying address failed")
|
|
}
|
|
s.log(identity, "PLAY", "PROXYSTOP", url, "", remote)
|
|
}()
|
|
|
|
// Wait for the goroutine to start
|
|
wg.Wait()
|
|
|
|
// Wait for channel to become available
|
|
ticker := time.NewTicker(200 * time.Millisecond)
|
|
tickerStart := time.Now()
|
|
|
|
for range ticker.C {
|
|
s.lock.RLock()
|
|
ch = s.channels[conn.URL.Path]
|
|
s.lock.RUnlock()
|
|
|
|
if ch != nil {
|
|
break
|
|
}
|
|
|
|
if time.Since(tickerStart) > 2*time.Second {
|
|
break
|
|
}
|
|
}
|
|
|
|
ticker.Stop()
|
|
}
|
|
|
|
if ch != nil {
|
|
// Send the metadata to the client
|
|
conn.WriteHeader(ch.streams)
|
|
|
|
s.log(identity, "PLAY", "START", conn.URL.Path, "", remote)
|
|
|
|
// Get a cursor and apply filters
|
|
cursor := ch.queue.Oldest()
|
|
|
|
filters := pktque.Filters{}
|
|
|
|
if ch.hasVideo {
|
|
// The first packet has to be a key frame
|
|
filters = append(filters, &pktque.WaitKeyFrame{})
|
|
}
|
|
|
|
// Adjust the timestamp such that the stream starts from 0
|
|
filters = append(filters, &pktque.FixTime{StartFromZero: true, MakeIncrement: false})
|
|
|
|
demuxer := &pktque.FilterDemuxer{
|
|
Filter: filters,
|
|
Demuxer: cursor,
|
|
}
|
|
|
|
id := ch.AddSubscriber(conn)
|
|
|
|
// Transfer the data, blocks until done
|
|
avutil.CopyFile(conn, demuxer)
|
|
|
|
ch.RemoveSubscriber(id)
|
|
|
|
s.log(identity, "PLAY", "STOP", playpath, "", remote)
|
|
} else {
|
|
s.log(identity, "PLAY", "NOTFOUND", playpath, "", remote)
|
|
}
|
|
}
|
|
|
|
// handlePublish is called when a RTMP client wants to publish a stream
|
|
func (s *server) handlePublish(conn *rtmp.Conn) {
|
|
defer conn.Close()
|
|
|
|
remote := conn.NetConn().RemoteAddr()
|
|
playpath, token := GetToken(conn.URL)
|
|
|
|
playpath, app := removePathPrefix(playpath, s.app)
|
|
|
|
identity, err := s.findIdentityFromStreamKey(token)
|
|
if err != nil {
|
|
s.logger.Debug().WithError(err).Log("invalid streamkey")
|
|
s.log(identity, "PUBLISH", "FORBIDDEN", playpath, "invalid streamkey ("+token+")", remote)
|
|
return
|
|
}
|
|
|
|
// Check the app patch
|
|
if app != s.app {
|
|
s.log(identity, "PUBLISH", "FORBIDDEN", playpath, "invalid app", remote)
|
|
return
|
|
}
|
|
|
|
domain := s.findDomainFromPlaypath(playpath)
|
|
resource := "rtmp:" + playpath
|
|
|
|
if !s.iam.Enforce(identity, domain, resource, "PUBLISH") {
|
|
s.log(identity, "PUBLISH", "FORBIDDEN", playpath, "access denied", remote)
|
|
return
|
|
}
|
|
|
|
err = s.publish(conn, playpath, remote, identity, false)
|
|
if err != nil {
|
|
s.logger.WithField("path", conn.URL.Path).WithError(err).Log("")
|
|
}
|
|
}
|
|
|
|
func (s *server) publish(src connection, playpath string, remote net.Addr, identity string, isProxy bool) error {
|
|
// Check the streams if it contains any valid/known streams
|
|
streams, _ := src.Streams()
|
|
|
|
if len(streams) == 0 {
|
|
s.log(identity, "PUBLISH", "INVALID", playpath, "no streams available", remote)
|
|
return fmt.Errorf("no streams are available")
|
|
}
|
|
|
|
s.lock.Lock()
|
|
|
|
ch := s.channels[playpath]
|
|
if ch == nil {
|
|
reference := strings.TrimPrefix(strings.TrimSuffix(playpath, filepath.Ext(playpath)), s.app+"/")
|
|
|
|
// Create a new channel
|
|
ch = newChannel(src, playpath, reference, remote, streams, isProxy, s.collector)
|
|
|
|
for _, stream := range streams {
|
|
typ := stream.Type()
|
|
|
|
switch {
|
|
case typ.IsAudio():
|
|
ch.hasAudio = true
|
|
case typ.IsVideo():
|
|
ch.hasVideo = true
|
|
}
|
|
}
|
|
|
|
s.channels[playpath] = ch
|
|
} else {
|
|
ch = nil
|
|
}
|
|
|
|
s.lock.Unlock()
|
|
|
|
if ch == nil {
|
|
s.log(identity, "PUBLISH", "CONFLICT", playpath, "already publishing", remote)
|
|
return fmt.Errorf("already publishing")
|
|
}
|
|
|
|
s.log(identity, "PUBLISH", "START", playpath, "", remote)
|
|
|
|
for _, stream := range streams {
|
|
s.log(identity, "PUBLISH", "STREAM", playpath, stream.Type().String(), remote)
|
|
}
|
|
|
|
// Ingest the data, blocks until done
|
|
avutil.CopyPackets(ch.queue, src)
|
|
|
|
s.lock.Lock()
|
|
delete(s.channels, playpath)
|
|
s.lock.Unlock()
|
|
|
|
ch.Close()
|
|
|
|
s.log(identity, "PUBLISH", "STOP", playpath, "", remote)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *server) findIdentityFromStreamKey(key string) (string, error) {
|
|
if len(key) == 0 {
|
|
return "$anon", nil
|
|
}
|
|
|
|
var identity iam.IdentityVerifier
|
|
var err error
|
|
|
|
var token string
|
|
|
|
elements := strings.Split(key, ":")
|
|
if len(elements) == 1 {
|
|
identity = s.iam.GetDefaultVerifier()
|
|
token = elements[0]
|
|
} else {
|
|
identity, err = s.iam.GetVerifier(elements[0])
|
|
token = elements[1]
|
|
}
|
|
|
|
if err != nil {
|
|
return "$anon", nil
|
|
}
|
|
|
|
if ok, err := identity.VerifyServiceToken(token); !ok {
|
|
return "$anon", fmt.Errorf("invalid token: %w", err)
|
|
}
|
|
|
|
return identity.Name(), nil
|
|
}
|
|
|
|
// findDomainFromPlaypath finds the domain in the path. The domain is
|
|
// the first path element. If there's only one path element, it is not
|
|
// considered the domain. It is assumed that the app is not part of
|
|
// the provided path.
|
|
func (s *server) findDomainFromPlaypath(path string) string {
|
|
elements := splitPath(path)
|
|
if len(elements) == 1 {
|
|
return "$none"
|
|
}
|
|
|
|
domain := elements[0]
|
|
|
|
if s.iam.HasDomain(domain) {
|
|
return domain
|
|
}
|
|
|
|
return "$none"
|
|
}
|