Merge branch 'dev' into cluster

This commit is contained in:
Ingo Oppermann
2023-04-12 15:27:38 +02:00
995 changed files with 130216 additions and 13987 deletions

View File

@@ -198,38 +198,72 @@ func (s *server) log(who, action, path, message string, client net.Addr) {
}).Log(message)
}
// handlePlay is called when a RTMP client wants to play a stream
func (s *server) handlePlay(conn *rtmp.Conn) {
defer conn.Close()
remote := conn.NetConn().RemoteAddr()
// Check the token
q := conn.URL.Query()
// getToken returns the path and the token found in the URL. If the token
// was part of the path, the token is removed from the path. The token in
// the query string takes precedence. The token in the path is assumed to
// be the last path element.
func getToken(u *url.URL) (string, string) {
q := u.Query()
token := q.Get("token")
if len(s.token) != 0 && s.token != token {
s.log("PLAY", "FORBIDDEN", conn.URL.Path, "invalid token ("+token+")", remote)
return
if len(token) != 0 {
// The token was in the query. Return the unmomdified path and the token
return u.Path, token
}
pathElements := strings.Split(u.EscapedPath(), "/")
nPathElements := len(pathElements)
if nPathElements == 0 {
return u.Path, ""
}
// Return the path without the token
return strings.Join(pathElements[:nPathElements-1], "/"), pathElements[nPathElements-1]
}
// handlePlay is called when a RTMP client wants to play a stream
func (s *server) handlePlay(conn *rtmp.Conn) {
client := conn.NetConn().RemoteAddr()
defer conn.Close()
playPath := conn.URL.Path
// Check the token in the URL if one is required
if len(s.token) != 0 {
path, token := getToken(conn.URL)
if len(token) == 0 {
s.log("PLAY", "FORBIDDEN", path, "no streamkey provided", client)
return
}
if s.token != token {
s.log("PLAY", "FORBIDDEN", path, "invalid streamkey ("+token+")", client)
return
}
playPath = path
}
// Look for the stream
s.lock.RLock()
ch := s.channels[conn.URL.Path]
ch := s.channels[playPath]
s.lock.RUnlock()
if ch == nil {
// Check in the cluster for that stream
url, err := s.cluster.GetURL("rtmp:" + conn.URL.Path)
if err != nil {
s.log("PLAY", "NOTFOUND", conn.URL.Path, "", remote)
s.log("PLAY", "NOTFOUND", conn.URL.Path, "", client)
return
}
src, err := avutil.Open(url)
if err != nil {
s.logger.Error().WithField("address", url).WithError(err).Log("Proxying address failed")
s.log("PLAY", "NOTFOUND", conn.URL.Path, "", remote)
s.log("PLAY", "NOTFOUND", conn.URL.Path, "", client)
return
}
@@ -239,13 +273,13 @@ func (s *server) handlePlay(conn *rtmp.Conn) {
wg.Add(1)
go func() {
s.log("PLAY", "PROXYSTART", url, "", remote)
s.log("PLAY", "PROXYSTART", url, "", client)
wg.Done()
err := s.publish(c, conn.URL, remote, true)
err := s.publish(c, playPath, client, true)
if err != nil {
s.logger.Error().WithField("address", url).WithError(err).Log("Proxying address failed")
}
s.log("PLAY", "PROXYSTOP", url, "", remote)
s.log("PLAY", "PROXYSTOP", url, "", client)
}()
// Wait for the goroutine to start
@@ -276,7 +310,7 @@ func (s *server) handlePlay(conn *rtmp.Conn) {
// Send the metadata to the client
conn.WriteHeader(ch.streams)
s.log("PLAY", "START", conn.URL.Path, "", remote)
s.log("PLAY", "START", playPath, "", client)
// Get a cursor and apply filters
cursor := ch.queue.Oldest()
@@ -289,7 +323,7 @@ func (s *server) handlePlay(conn *rtmp.Conn) {
}
// Adjust the timestamp such that the stream starts from 0
filters = append(filters, &pktque.FixTime{StartFromZero: true, MakeIncrement: true})
filters = append(filters, &pktque.FixTime{StartFromZero: true, MakeIncrement: false})
demuxer := &pktque.FilterDemuxer{
Filter: filters,
@@ -303,57 +337,64 @@ func (s *server) handlePlay(conn *rtmp.Conn) {
ch.RemoveSubscriber(id)
s.log("PLAY", "STOP", conn.URL.Path, "", remote)
s.log("PLAY", "STOP", playPath, "", client)
} else {
s.log("PLAY", "NOTFOUND", conn.URL.Path, "", remote)
s.log("PLAY", "NOTFOUND", playPath, "", client)
}
}
// handlePublish is called when a RTMP client wants to publish a stream
func (s *server) handlePublish(conn *rtmp.Conn) {
client := conn.NetConn().RemoteAddr()
defer conn.Close()
remote := conn.NetConn().RemoteAddr()
playPath := conn.URL.Path
if len(s.token) != 0 {
// Check the token
token := conn.URL.Query().Get("token")
path, token := getToken(conn.URL)
if s.token != token {
s.log("PUBLISH", "FORBIDDEN", conn.URL.Path, "invalid token ("+token+")", remote)
if len(token) == 0 {
s.log("PLAY", "FORBIDDEN", path, "no streamkey provided", client)
return
}
if s.token != token {
s.log("PLAY", "FORBIDDEN", path, "invalid streamkey ("+token+")", client)
return
}
playPath = path
}
// Check the app patch
if !strings.HasPrefix(conn.URL.Path, s.app) {
s.log("PUBLISH", "FORBIDDEN", conn.URL.Path, "invalid app", remote)
if !strings.HasPrefix(playPath, s.app) {
s.log("PUBLISH", "FORBIDDEN", playPath, "invalid app", client)
return
}
err := s.publish(conn, conn.URL, remote, false)
err := s.publish(conn, playPath, client, false)
if err != nil {
s.logger.WithField("path", conn.URL.Path).WithError(err).Log("")
s.logger.WithField("path", playPath).WithError(err).Log("")
}
}
func (s *server) publish(src connection, u *url.URL, remote net.Addr, isProxy bool) error {
func (s *server) publish(src connection, playPath string, client net.Addr, isProxy bool) error {
// Check the streams if it contains any valid/known streams
streams, _ := src.Streams()
if len(streams) == 0 {
s.log("PUBLISH", "INVALID", u.Path, "no streams available", remote)
return fmt.Errorf("no streams are available")
s.log("PUBLISH", "INVALID", playPath, "no streams available", client)
}
s.lock.Lock()
ch := s.channels[u.Path]
ch := s.channels[playPath]
if ch == nil {
reference := strings.TrimPrefix(strings.TrimSuffix(u.Path, filepath.Ext(u.Path)), s.app+"/")
reference := strings.TrimPrefix(strings.TrimSuffix(playPath, filepath.Ext(playPath)), s.app+"/")
// Create a new channel
ch = newChannel(src, u, reference, remote, streams, isProxy, s.collector)
ch = newChannel(src, playPath, reference, client, streams, isProxy, s.collector)
for _, stream := range streams {
typ := stream.Type()
@@ -366,7 +407,7 @@ func (s *server) publish(src connection, u *url.URL, remote net.Addr, isProxy bo
}
}
s.channels[u.Path] = ch
s.channels[playPath] = ch
} else {
ch = nil
}
@@ -374,26 +415,26 @@ func (s *server) publish(src connection, u *url.URL, remote net.Addr, isProxy bo
s.lock.Unlock()
if ch == nil {
s.log("PUBLISH", "CONFLICT", u.Path, "already publishing", remote)
s.log("PUBLISH", "CONFLICT", playPath, "already publishing", client)
return fmt.Errorf("already publishing")
}
s.log("PUBLISH", "START", u.Path, "", remote)
s.log("PUBLISH", "START", playPath, "", client)
for _, stream := range streams {
s.log("PUBLISH", "STREAM", u.Path, stream.Type().String(), remote)
s.log("PUBLISH", "STREAM", playPath, stream.Type().String(), client)
}
// Ingest the data, blocks until done
avutil.CopyPackets(ch.queue, src)
s.lock.Lock()
delete(s.channels, u.Path)
delete(s.channels, playPath)
s.lock.Unlock()
ch.Close()
s.log("PUBLISH", "STOP", u.Path, "", remote)
s.log("PUBLISH", "STOP", playPath, "", client)
return nil
}