diff --git a/app/api/api.go b/app/api/api.go index d677e31f..e62c2f3c 100644 --- a/app/api/api.go +++ b/app/api/api.go @@ -757,6 +757,7 @@ func (a *api) start() error { Token: cfg.SRT.Token, Logger: a.log.logger.core.WithComponent("SRT").WithField("address", cfg.SRT.Address), Collector: a.sessions.Collector("srt"), + Cluster: a.cluster, } if cfg.SRT.Log.Enable { diff --git a/client/client.go b/client/client.go index f1cd3bf1..0ddaa013 100644 --- a/client/client.go +++ b/client/client.go @@ -68,6 +68,7 @@ type RestClient interface { ProcessMetadataSet(id, key string, metadata api.Metadata) error // PUT /process/{id}/metadata/{key} RTMPChannels() ([]api.RTMPChannel, error) // GET /rtmp + SRTChannels() ([]api.SRTChannel, error) // GET /srt Sessions(collectors []string) (api.SessionsSummary, error) // GET /session SessionsActive(collectors []string) (api.SessionsActive, error) // GET /session/active diff --git a/client/srt.go b/client/srt.go new file mode 100644 index 00000000..bbf1c3e9 --- /dev/null +++ b/client/srt.go @@ -0,0 +1,20 @@ +package client + +import ( + "encoding/json" + + "github.com/datarhei/core/v16/http/api" +) + +func (r *restclient) SRTChannels() ([]api.SRTChannel, error) { + var m []api.SRTChannel + + data, err := r.call("GET", "/srt", "", nil) + if err != nil { + return m, err + } + + err = json.Unmarshal(data, &m) + + return m, err +} diff --git a/cluster/cluster.go b/cluster/cluster.go index 6358bcf2..7883d00a 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -9,13 +9,27 @@ import ( "github.com/datarhei/core/v16/log" ) +type ClusterReader interface { + GetURL(path string) (string, error) +} + +type dummyClusterReader struct{} + +func NewDummyClusterReader() ClusterReader { + return &dummyClusterReader{} +} + +func (r *dummyClusterReader) GetURL(path string) (string, error) { + return "", fmt.Errorf("not implemented in dummy cluster") +} + type Cluster interface { AddNode(address, username, password string) (string, error) RemoveNode(id string) error ListNodes() []NodeReader GetNode(id string) (NodeReader, error) Stop() - GetURL(path string) (string, error) + ClusterReader } type ClusterConfig struct { @@ -64,7 +78,7 @@ func New(config ClusterConfig) (Cluster, error) { "node": state.ID, "state": state.State, "files": len(state.Files), - }).Log("got update") + }).Log("Got update") c.lock.Lock() @@ -125,6 +139,11 @@ func (c *cluster) AddNode(address, username, password string) (string, error) { c.nodes[id] = node + c.logger.Info().WithFields(log.Fields{ + "address": address, + "id": id, + }).Log("Added node") + return id, nil } @@ -141,6 +160,10 @@ func (c *cluster) RemoveNode(id string) error { delete(c.nodes, id) + c.logger.Info().WithFields(log.Fields{ + "id": id, + }).Log("Removed node") + return nil } @@ -173,38 +196,36 @@ func (c *cluster) GetURL(path string) (string, error) { c.lock.RLock() defer c.lock.RUnlock() - c.logger.Debug().WithField("path", path).Log("opening") - id, ok := c.fileid[path] if !ok { - c.logger.Debug().WithField("path", path).Log("not found") + c.logger.Debug().WithField("path", path).Log("Not found") return "", fmt.Errorf("file not found") } ts, ok := c.idupdate[id] if !ok { - c.logger.Debug().WithField("path", path).Log("no age information found") + c.logger.Debug().WithField("path", path).Log("No age information found") return "", fmt.Errorf("file not found") } if time.Since(ts) > 2*time.Second { - c.logger.Debug().WithField("path", path).Log("file too old") + c.logger.Debug().WithField("path", path).Log("File too old") return "", fmt.Errorf("file not found") } node, ok := c.nodes[id] if !ok { - c.logger.Debug().WithField("path", path).Log("unknown node") + c.logger.Debug().WithField("path", path).Log("Unknown node") return "", fmt.Errorf("file not found") } url, err := node.GetURL(path) if err != nil { - c.logger.Debug().WithField("path", path).Log("invalid path") + c.logger.Debug().WithField("path", path).Log("Invalid path") return "", fmt.Errorf("file not found") } - c.logger.Debug().WithField("url", url).Log("file cluster url") + c.logger.Debug().WithField("url", url).Log("File cluster url") return url, nil } diff --git a/cluster/node.go b/cluster/node.go index 9725df56..7ce6836b 100644 --- a/cluster/node.go +++ b/cluster/node.go @@ -57,7 +57,7 @@ type node struct { rtmpAddress string rtmpToken string hasSRT bool - srtPort string + srtAddress string srtPassphrase string srtToken string @@ -127,12 +127,13 @@ func newNode(address, username, password string, updates chan<- NodeState) (*nod if config.Config.SRT.Enable { n.hasSRT = true + n.srtAddress = "srt://" _, port, err := net.SplitHostPort(config.Config.SRT.Address) if err != nil { n.hasSRT = false } else { - n.srtPort = port + n.srtAddress += host + ":" + port n.srtPassphrase = config.Config.SRT.Passphrase n.srtToken = config.Config.SRT.Token } @@ -203,10 +204,11 @@ func (n *node) files() { memfsfiles, errMemfs := n.peer.MemFSList("name", "asc") diskfsfiles, errDiskfs := n.peer.DiskFSList("name", "asc") rtmpfiles, errRTMP := n.peer.RTMPChannels() + srtfiles, errSRT := n.peer.SRTChannels() n.lastUpdate = time.Now() - if errMemfs != nil || errDiskfs != nil || errRTMP != nil { + if errMemfs != nil || errDiskfs != nil || errRTMP != nil || errSRT != nil { n.fileList = nil n.state = stateDisconnected return @@ -214,7 +216,7 @@ func (n *node) files() { n.state = stateConnected - n.fileList = make([]string, len(memfsfiles)+len(diskfsfiles)+len(rtmpfiles)) + n.fileList = make([]string, len(memfsfiles)+len(diskfsfiles)+len(rtmpfiles)+len(srtfiles)) nfiles := 0 @@ -233,6 +235,11 @@ func (n *node) files() { nfiles++ } + for _, file := range srtfiles { + n.fileList[nfiles] = "srt:" + file.Name + nfiles++ + } + return } @@ -252,6 +259,16 @@ func (n *node) GetURL(path string) (string, error) { if len(n.rtmpToken) != 0 { u += "?token=" + url.QueryEscape(n.rtmpToken) } + } else if prefix == "srt:" { + u = n.srtAddress + "?mode=caller" + if len(n.srtPassphrase) != 0 { + u += "&passphrase=" + url.QueryEscape(n.srtPassphrase) + } + streamid := "#!:m=request,r=" + path + if len(n.srtToken) != 0 { + streamid += ",token=" + n.srtToken + } + u += "&streamid=" + url.QueryEscape(streamid) } else { return "", fmt.Errorf("unknown prefix") } diff --git a/http/api/srt.go b/http/api/srt.go index 76194d52..149d3109 100644 --- a/http/api/srt.go +++ b/http/api/srt.go @@ -1,8 +1,6 @@ package api import ( - "github.com/datarhei/core/v16/srt" - gosrt "github.com/datarhei/gosrt" ) @@ -109,56 +107,11 @@ type SRTConnection struct { Stats SRTStatistics `json:"stats"` } -// Unmarshal converts the SRT connection into API representation -func (s *SRTConnection) Unmarshal(ss *srt.Connection) { - s.Log = make(map[string][]SRTLog) - s.Stats.Unmarshal(&ss.Stats) - - for k, v := range ss.Log { - s.Log[k] = make([]SRTLog, len(v)) - for i, l := range v { - s.Log[k][i].Timestamp = l.Timestamp.UnixMilli() - s.Log[k][i].Message = l.Message - } - } -} - -// SRTChannels represents all current SRT connections -type SRTChannels struct { - Publisher map[string]uint32 `json:"publisher"` - Subscriber map[string][]uint32 `json:"subscriber"` +// SRTChannel represents a SRT publishing connection with its subscribers +type SRTChannel struct { + Name string `json:"name"` + SocketId uint32 `json:"socketid"` + Subscriber []uint32 `json:"subscriber"` Connections map[uint32]SRTConnection `json:"connections"` Log map[string][]SRTLog `json:"log"` } - -// Unmarshal converts the SRT channels into API representation -func (s *SRTChannels) Unmarshal(ss *srt.Channels) { - s.Publisher = make(map[string]uint32) - s.Subscriber = make(map[string][]uint32) - s.Connections = make(map[uint32]SRTConnection) - s.Log = make(map[string][]SRTLog) - - for k, v := range ss.Publisher { - s.Publisher[k] = v - } - - for k, v := range ss.Subscriber { - vv := make([]uint32, len(v)) - copy(vv, v) - s.Subscriber[k] = vv - } - - for k, v := range ss.Connections { - c := s.Connections[k] - c.Unmarshal(&v) - s.Connections[k] = c - } - - for k, v := range ss.Log { - s.Log[k] = make([]SRTLog, len(v)) - for i, l := range v { - s.Log[k][i].Timestamp = l.Timestamp.UnixMilli() - s.Log[k][i].Message = l.Message - } - } -} diff --git a/http/fs/cluster.go b/http/fs/cluster.go index 8c713000..6848d5f6 100644 --- a/http/fs/cluster.go +++ b/http/fs/cluster.go @@ -17,7 +17,7 @@ type filesystem struct { fs.Filesystem what string - cluster cluster.Cluster + cluster cluster.ClusterReader } func NewClusterFS(what string, fs fs.Filesystem, cluster cluster.Cluster) Filesystem { diff --git a/http/handler/api/srt.go b/http/handler/api/srt.go index 1d98cc18..859820f9 100644 --- a/http/handler/api/srt.go +++ b/http/handler/api/srt.go @@ -26,14 +26,56 @@ func NewSRT(srt srt.Server) *SRTHandler { // @Description List all currently publishing SRT streams. This endpoint is EXPERIMENTAL and may change in future. // @ID srt-3-list-channels // @Produce json -// @Success 200 {array} api.SRTChannels +// @Success 200 {array} []api.SRTChannel // @Security ApiKeyAuth // @Router /api/v3/srt [get] func (srth *SRTHandler) ListChannels(c echo.Context) error { channels := srth.srt.Channels() - srtchannels := api.SRTChannels{} - srtchannels.Unmarshal(&channels) + srtchannels := []api.SRTChannel{} + + for _, channel := range channels { + srtchannels = append(srtchannels, srth.unmarshalChannel(channel)) + } return c.JSON(http.StatusOK, srtchannels) } + +// Unmarshal converts the SRT channels into API representation +func (srth *SRTHandler) unmarshalChannel(ss srt.Channel) api.SRTChannel { + s := api.SRTChannel{ + Name: ss.Name, + SocketId: ss.SocketId, + Connections: map[uint32]api.SRTConnection{}, + Log: make(map[string][]api.SRTLog), + } + + s.Subscriber = make([]uint32, len(ss.Subscriber)) + copy(s.Subscriber, ss.Subscriber) + + for k, v := range ss.Connections { + c := s.Connections[k] + c.Log = make(map[string][]api.SRTLog) + c.Stats.Unmarshal(&v.Stats) + + for lk, lv := range ss.Log { + s.Log[lk] = make([]api.SRTLog, len(lv)) + for i, l := range lv { + s.Log[lk][i].Timestamp = l.Timestamp.UnixMilli() + s.Log[lk][i].Message = l.Message + } + } + + s.Connections[k] = c + } + + for k, v := range ss.Log { + s.Log[k] = make([]api.SRTLog, len(v)) + for i, l := range v { + s.Log[k][i].Timestamp = l.Timestamp.UnixMilli() + s.Log[k][i].Message = l.Message + } + } + + return s +} diff --git a/rtmp/rtmp.go b/rtmp/rtmp.go index 4cea0e13..e8fff1d2 100644 --- a/rtmp/rtmp.go +++ b/rtmp/rtmp.go @@ -196,7 +196,7 @@ type Config struct { // with methods like tls.Config.SetSessionTicketKeys. TLSConfig *tls.Config - Cluster cluster.Cluster + Cluster cluster.ClusterReader } // Server represents a RTMP server @@ -231,7 +231,7 @@ type server struct { channels map[string]*channel lock sync.RWMutex - cluster cluster.Cluster + cluster cluster.ClusterReader } // New creates a new RTMP server according to the given config @@ -256,6 +256,10 @@ func New(config Config) (Server, error) { s.collector = session.NewNullCollector() } + if s.cluster == nil { + s.cluster = cluster.NewDummyClusterReader() + } + s.server = &rtmp.Server{ Addr: config.Addr, HandlePlay: s.handlePlay, @@ -404,16 +408,15 @@ func (s *server) handlePlay(conn *rtmp.Conn) { s.log("PLAY", "STOP", conn.URL.Path, "", client) } else { // Check in the cluster for that stream - if s.cluster != nil { - url, err := s.cluster.GetURL("rtmp:" + conn.URL.Path) + url, err := s.cluster.GetURL("rtmp:" + conn.URL.Path) + if err == nil { + src, err := avutil.Open(url) if err != nil { + s.logger.Error().WithField("address", url).WithError(err).Log("Proxying address failed") s.log("PLAY", "NOTFOUND", conn.URL.Path, "", client) } else { s.log("PLAY", "PROXYSTART", url, "", client) - - src, _ := avutil.Open(url) avutil.CopyFile(conn, src) - s.log("PLAY", "PROXYSTOP", url, "", client) } } else { diff --git a/srt/srt.go b/srt/srt.go index 78c5bbca..9cbe1860 100644 --- a/srt/srt.go +++ b/srt/srt.go @@ -4,11 +4,14 @@ import ( "container/ring" "context" "fmt" + "io" "net" + "net/url" "strings" "sync" "time" + "github.com/datarhei/core/v16/cluster" "github.com/datarhei/core/v16/log" "github.com/datarhei/core/v16/session" srt "github.com/datarhei/gosrt" @@ -161,6 +164,8 @@ type Config struct { Collector session.Collector SRTLogTopics []string + + Cluster cluster.ClusterReader } // Server represents a SRT server @@ -172,7 +177,7 @@ type Server interface { Close() // Channels return a list of currently publishing streams - Channels() Channels + Channels() []Channel } // server implements the Server interface @@ -185,8 +190,8 @@ type server struct { server srt.Server - // Map of publishing channels and a lock to serialize - // access to the map. + // Map of publishing channels and a lock to serialize access to the map. The map + // index is the name of the resource. channels map[string]*channel lock sync.RWMutex @@ -194,8 +199,10 @@ type server struct { srtlogger srt.Logger srtloggerCancel context.CancelFunc - srtlog map[string]*ring.Ring + srtlog map[string]*ring.Ring // Per logtopic a dedicated ring buffer srtlogLock sync.RWMutex + + cluster cluster.ClusterReader } func New(config Config) (Server, error) { @@ -205,12 +212,17 @@ func New(config Config) (Server, error) { passphrase: config.Passphrase, collector: config.Collector, logger: config.Logger, + cluster: config.Cluster, } if s.collector == nil { s.collector = session.NewNullCollector() } + if s.cluster == nil { + s.cluster = cluster.NewDummyClusterReader() + } + if s.logger == nil { s.logger = log.New("") } @@ -264,70 +276,74 @@ type Connection struct { Stats srt.Statistics } -type Channels struct { - Publisher map[string]uint32 - Subscriber map[string][]uint32 - Connections map[uint32]Connection - Log map[string][]Log +type Channel struct { + Name string // Resource + SocketId uint32 // Socketid + Subscriber []uint32 // List of subscribed sockedids + Connections map[uint32]Connection // Map from socketid to connection + Log map[string][]Log // Map of topic to log entries } -func (s *server) Channels() Channels { - st := Channels{ - Publisher: map[string]uint32{}, - Subscriber: map[string][]uint32{}, - Connections: map[uint32]Connection{}, - Log: map[string][]Log{}, - } +func (s *server) Channels() []Channel { + channels := []Channel{} s.lock.RLock() for id, ch := range s.channels { socketId := ch.publisher.conn.SocketId() - st.Publisher[id] = socketId + channel := Channel{ + Name: id, + SocketId: socketId, + Subscriber: []uint32{}, + Connections: map[uint32]Connection{}, + Log: map[string][]Log{}, + } - st.Connections[socketId] = Connection{ + channel.Connections[socketId] = Connection{ Stats: ch.publisher.conn.Stats(), Log: map[string][]Log{}, } for _, c := range ch.subscriber { socketId := c.conn.SocketId() - st.Subscriber[id] = append(st.Subscriber[id], socketId) + channel.Subscriber = append(channel.Subscriber, socketId) - st.Connections[socketId] = Connection{ + channel.Connections[socketId] = Connection{ Stats: c.conn.Stats(), Log: map[string][]Log{}, } } + + channels = append(channels, channel) } s.lock.RUnlock() - - s.srtlogLock.RLock() - for topic, buf := range s.srtlog { - - buf.Do(func(l interface{}) { - if l == nil { - return - } - - ll := l.(srt.Log) - - log := Log{ - Timestamp: ll.Time, - Message: strings.Split(ll.Message, "\n"), - } - - if ll.SocketId != 0 { - if _, ok := st.Connections[ll.SocketId]; ok { - st.Connections[ll.SocketId].Log[topic] = append(st.Connections[ll.SocketId].Log[topic], log) + /* + s.srtlogLock.RLock() + for topic, buf := range s.srtlog { + buf.Do(func(l interface{}) { + if l == nil { + return } - } else { - st.Log[topic] = append(st.Log[topic], log) - } - }) - } - s.srtlogLock.RUnlock() - return st + ll := l.(srt.Log) + + log := Log{ + Timestamp: ll.Time, + Message: strings.Split(ll.Message, "\n"), + } + + if ll.SocketId != 0 { + if _, ok := st.Connections[ll.SocketId]; ok { + st.Connections[ll.SocketId].Log[topic] = append(st.Connections[ll.SocketId].Log[topic], log) + } + } else { + st.Log[topic] = append(st.Log[topic], log) + } + }) + } + s.srtlogLock.RUnlock() + */ + + return channels } func (s *server) srtlogListener(ctx context.Context) { @@ -362,6 +378,8 @@ type streamInfo struct { token string } +// parseStreamId parses a streamid of the form "#!:key=value,key=value,..." and +// returns a streamInfo. In case the stream couldn't be parsed, an error is returned. func parseStreamId(streamid string) (streamInfo, error) { si := streamInfo{} @@ -451,20 +469,6 @@ func (s *server) handleConnect(req srt.ConnRequest) srt.ConnType { return srt.REJECT } - s.lock.RLock() - ch := s.channels[si.resource] - s.lock.RUnlock() - - if mode == srt.PUBLISH && ch != nil { - s.log("CONNECT", "CONFLICT", si.resource, "already publishing", client) - return srt.REJECT - } - - if mode == srt.SUBSCRIBE && ch == nil { - s.log("CONNECT", "NOTFOUND", si.resource, "no publisher for this resource found", client) - return srt.REJECT - } - return mode } @@ -507,6 +511,8 @@ func (s *server) handlePublish(conn srt.Conn) { } func (s *server) handleSubscribe(conn srt.Conn) { + defer conn.Close() + streamId := conn.StreamId() client := conn.RemoteAddr() @@ -518,20 +524,52 @@ func (s *server) handleSubscribe(conn srt.Conn) { s.lock.RUnlock() if ch == nil { - s.log("SUBSCRIBE", "NOTFOUND", si.resource, "no publisher for this resource found", client) - conn.Close() - return + srturl, err := s.cluster.GetURL("srt:" + si.resource) + if err == nil { + u, err := url.Parse(srturl) + if err != nil { + s.logger.Error().WithField("address", srturl).WithError(err).Log("Parsing proxy address failed") + s.log("SUBSCRIBE", "NOTFOUND", si.resource, "no publisher for this resource found", client) + return + } + config := srt.DefaultConfig() + config.Latency = 200 * time.Millisecond + if err := config.UnmarshalURL(srturl); err != nil { + s.logger.Error().WithField("address", srturl).WithError(err).Log("Parsing proxy address failed") + s.log("SUBSCRIBE", "NOTFOUND", si.resource, "no publisher for this resource found", client) + return + } + src, err := srt.Dial("srt", u.Host, config) + if err != nil { + s.logger.Error().WithField("address", srturl).WithError(err).Log("Proxying address failed") + s.log("SUBSCRIBE", "NOTFOUND", si.resource, "no publisher for this resource found", client) + } else { + s.log("SUBSCRIBE", "PROXYSTART", srturl, "", client) + buffer := make([]byte, srt.MAX_MSS_SIZE) + for { + n, err := src.Read(buffer) + if err != nil { + if err != io.EOF { + s.logger.Error().WithField("address", srturl).WithError(err).Log("Proxying address aborted") + } + break + } + conn.Write(buffer[:n]) + } + s.log("SUBSCRIBE", "PROXYSTOP", srturl, "", client) + } + } else { + s.log("SUBSCRIBE", "NOTFOUND", si.resource, "no publisher for this resource found", client) + } + } else { + s.log("SUBSCRIBE", "START", si.resource, "", client) + + id := ch.AddSubscriber(conn, si.resource) + + ch.pubsub.Subscribe(conn) + + s.log("SUBSCRIBE", "STOP", si.resource, "", client) + + ch.RemoveSubscriber(id) } - - s.log("SUBSCRIBE", "START", si.resource, "", client) - - id := ch.AddSubscriber(conn, si.resource) - - ch.pubsub.Subscribe(conn) - - s.log("SUBSCRIBE", "STOP", si.resource, "", client) - - ch.RemoveSubscriber(id) - - conn.Close() }