Add SRT proxying

This commit is contained in:
Ingo Oppermann
2022-08-08 16:53:37 +02:00
parent f4acc0457f
commit c04ab1e82f
10 changed files with 248 additions and 152 deletions

View File

@@ -4,11 +4,14 @@ import (
"container/ring"
"context"
"fmt"
"io"
"net"
"net/url"
"strings"
"sync"
"time"
"github.com/datarhei/core/v16/cluster"
"github.com/datarhei/core/v16/log"
"github.com/datarhei/core/v16/session"
srt "github.com/datarhei/gosrt"
@@ -161,6 +164,8 @@ type Config struct {
Collector session.Collector
SRTLogTopics []string
Cluster cluster.ClusterReader
}
// Server represents a SRT server
@@ -172,7 +177,7 @@ type Server interface {
Close()
// Channels return a list of currently publishing streams
Channels() Channels
Channels() []Channel
}
// server implements the Server interface
@@ -185,8 +190,8 @@ type server struct {
server srt.Server
// Map of publishing channels and a lock to serialize
// access to the map.
// Map of publishing channels and a lock to serialize access to the map. The map
// index is the name of the resource.
channels map[string]*channel
lock sync.RWMutex
@@ -194,8 +199,10 @@ type server struct {
srtlogger srt.Logger
srtloggerCancel context.CancelFunc
srtlog map[string]*ring.Ring
srtlog map[string]*ring.Ring // Per logtopic a dedicated ring buffer
srtlogLock sync.RWMutex
cluster cluster.ClusterReader
}
func New(config Config) (Server, error) {
@@ -205,12 +212,17 @@ func New(config Config) (Server, error) {
passphrase: config.Passphrase,
collector: config.Collector,
logger: config.Logger,
cluster: config.Cluster,
}
if s.collector == nil {
s.collector = session.NewNullCollector()
}
if s.cluster == nil {
s.cluster = cluster.NewDummyClusterReader()
}
if s.logger == nil {
s.logger = log.New("")
}
@@ -264,70 +276,74 @@ type Connection struct {
Stats srt.Statistics
}
type Channels struct {
Publisher map[string]uint32
Subscriber map[string][]uint32
Connections map[uint32]Connection
Log map[string][]Log
type Channel struct {
Name string // Resource
SocketId uint32 // Socketid
Subscriber []uint32 // List of subscribed sockedids
Connections map[uint32]Connection // Map from socketid to connection
Log map[string][]Log // Map of topic to log entries
}
func (s *server) Channels() Channels {
st := Channels{
Publisher: map[string]uint32{},
Subscriber: map[string][]uint32{},
Connections: map[uint32]Connection{},
Log: map[string][]Log{},
}
func (s *server) Channels() []Channel {
channels := []Channel{}
s.lock.RLock()
for id, ch := range s.channels {
socketId := ch.publisher.conn.SocketId()
st.Publisher[id] = socketId
channel := Channel{
Name: id,
SocketId: socketId,
Subscriber: []uint32{},
Connections: map[uint32]Connection{},
Log: map[string][]Log{},
}
st.Connections[socketId] = Connection{
channel.Connections[socketId] = Connection{
Stats: ch.publisher.conn.Stats(),
Log: map[string][]Log{},
}
for _, c := range ch.subscriber {
socketId := c.conn.SocketId()
st.Subscriber[id] = append(st.Subscriber[id], socketId)
channel.Subscriber = append(channel.Subscriber, socketId)
st.Connections[socketId] = Connection{
channel.Connections[socketId] = Connection{
Stats: c.conn.Stats(),
Log: map[string][]Log{},
}
}
channels = append(channels, channel)
}
s.lock.RUnlock()
s.srtlogLock.RLock()
for topic, buf := range s.srtlog {
buf.Do(func(l interface{}) {
if l == nil {
return
}
ll := l.(srt.Log)
log := Log{
Timestamp: ll.Time,
Message: strings.Split(ll.Message, "\n"),
}
if ll.SocketId != 0 {
if _, ok := st.Connections[ll.SocketId]; ok {
st.Connections[ll.SocketId].Log[topic] = append(st.Connections[ll.SocketId].Log[topic], log)
/*
s.srtlogLock.RLock()
for topic, buf := range s.srtlog {
buf.Do(func(l interface{}) {
if l == nil {
return
}
} else {
st.Log[topic] = append(st.Log[topic], log)
}
})
}
s.srtlogLock.RUnlock()
return st
ll := l.(srt.Log)
log := Log{
Timestamp: ll.Time,
Message: strings.Split(ll.Message, "\n"),
}
if ll.SocketId != 0 {
if _, ok := st.Connections[ll.SocketId]; ok {
st.Connections[ll.SocketId].Log[topic] = append(st.Connections[ll.SocketId].Log[topic], log)
}
} else {
st.Log[topic] = append(st.Log[topic], log)
}
})
}
s.srtlogLock.RUnlock()
*/
return channels
}
func (s *server) srtlogListener(ctx context.Context) {
@@ -362,6 +378,8 @@ type streamInfo struct {
token string
}
// parseStreamId parses a streamid of the form "#!:key=value,key=value,..." and
// returns a streamInfo. In case the stream couldn't be parsed, an error is returned.
func parseStreamId(streamid string) (streamInfo, error) {
si := streamInfo{}
@@ -451,20 +469,6 @@ func (s *server) handleConnect(req srt.ConnRequest) srt.ConnType {
return srt.REJECT
}
s.lock.RLock()
ch := s.channels[si.resource]
s.lock.RUnlock()
if mode == srt.PUBLISH && ch != nil {
s.log("CONNECT", "CONFLICT", si.resource, "already publishing", client)
return srt.REJECT
}
if mode == srt.SUBSCRIBE && ch == nil {
s.log("CONNECT", "NOTFOUND", si.resource, "no publisher for this resource found", client)
return srt.REJECT
}
return mode
}
@@ -507,6 +511,8 @@ func (s *server) handlePublish(conn srt.Conn) {
}
func (s *server) handleSubscribe(conn srt.Conn) {
defer conn.Close()
streamId := conn.StreamId()
client := conn.RemoteAddr()
@@ -518,20 +524,52 @@ func (s *server) handleSubscribe(conn srt.Conn) {
s.lock.RUnlock()
if ch == nil {
s.log("SUBSCRIBE", "NOTFOUND", si.resource, "no publisher for this resource found", client)
conn.Close()
return
srturl, err := s.cluster.GetURL("srt:" + si.resource)
if err == nil {
u, err := url.Parse(srturl)
if err != nil {
s.logger.Error().WithField("address", srturl).WithError(err).Log("Parsing proxy address failed")
s.log("SUBSCRIBE", "NOTFOUND", si.resource, "no publisher for this resource found", client)
return
}
config := srt.DefaultConfig()
config.Latency = 200 * time.Millisecond
if err := config.UnmarshalURL(srturl); err != nil {
s.logger.Error().WithField("address", srturl).WithError(err).Log("Parsing proxy address failed")
s.log("SUBSCRIBE", "NOTFOUND", si.resource, "no publisher for this resource found", client)
return
}
src, err := srt.Dial("srt", u.Host, config)
if err != nil {
s.logger.Error().WithField("address", srturl).WithError(err).Log("Proxying address failed")
s.log("SUBSCRIBE", "NOTFOUND", si.resource, "no publisher for this resource found", client)
} else {
s.log("SUBSCRIBE", "PROXYSTART", srturl, "", client)
buffer := make([]byte, srt.MAX_MSS_SIZE)
for {
n, err := src.Read(buffer)
if err != nil {
if err != io.EOF {
s.logger.Error().WithField("address", srturl).WithError(err).Log("Proxying address aborted")
}
break
}
conn.Write(buffer[:n])
}
s.log("SUBSCRIBE", "PROXYSTOP", srturl, "", client)
}
} else {
s.log("SUBSCRIBE", "NOTFOUND", si.resource, "no publisher for this resource found", client)
}
} else {
s.log("SUBSCRIBE", "START", si.resource, "", client)
id := ch.AddSubscriber(conn, si.resource)
ch.pubsub.Subscribe(conn)
s.log("SUBSCRIBE", "STOP", si.resource, "", client)
ch.RemoveSubscriber(id)
}
s.log("SUBSCRIBE", "START", si.resource, "", client)
id := ch.AddSubscriber(conn, si.resource)
ch.pubsub.Subscribe(conn)
s.log("SUBSCRIBE", "STOP", si.resource, "", client)
ch.RemoveSubscriber(id)
conn.Close()
}