mirror of
https://github.com/datarhei/core.git
synced 2025-09-26 20:11:29 +08:00
Add GoSRT & improvements (repo-merge)
Commits (Ingo Oppermann): - Add experimental SRT connection stats and logs - Hide /config/reload endpoint in reade-only mode - Add SRT server - Create v16 in go.mod - Fix data races, tests, lint, and update dependencies - Add trailing slash for routed directories (datarhei/restreamer#340) - Allow relative URLs in content in static routes Co-Authored-By: Ingo Oppermann <57445+ioppermann@users.noreply.github.com>
This commit is contained in:
519
srt/srt.go
Normal file
519
srt/srt.go
Normal file
@@ -0,0 +1,519 @@
|
||||
package srt
|
||||
|
||||
import (
|
||||
"container/ring"
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/datarhei/core/v16/log"
|
||||
"github.com/datarhei/core/v16/session"
|
||||
srt "github.com/datarhei/gosrt"
|
||||
)
|
||||
|
||||
// ErrServerClosed is returned by ListenAndServe if the server
|
||||
// has been closed regularly with the Close() function.
|
||||
var ErrServerClosed = srt.ErrServerClosed
|
||||
|
||||
type client struct {
|
||||
conn srt.Conn
|
||||
id string
|
||||
createdAt time.Time
|
||||
|
||||
txbytes uint64
|
||||
rxbytes uint64
|
||||
|
||||
collector session.Collector
|
||||
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
func newClient(conn srt.Conn, id string, collector session.Collector) *client {
|
||||
c := &client{
|
||||
conn: conn,
|
||||
id: id,
|
||||
createdAt: time.Now(),
|
||||
|
||||
collector: collector,
|
||||
}
|
||||
|
||||
var ctx context.Context
|
||||
ctx, c.cancel = context.WithCancel(context.Background())
|
||||
|
||||
go c.ticker(ctx)
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
func (c *client) ticker(ctx context.Context) {
|
||||
ticker := time.NewTicker(1 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
stats := c.conn.Stats()
|
||||
|
||||
rxbytes := stats.ByteRecv
|
||||
txbytes := stats.ByteSent
|
||||
|
||||
c.collector.Ingress(c.id, int64(rxbytes-c.rxbytes))
|
||||
c.collector.Egress(c.id, int64(txbytes-c.txbytes))
|
||||
|
||||
c.txbytes = txbytes
|
||||
c.rxbytes = rxbytes
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *client) Close() {
|
||||
c.cancel()
|
||||
}
|
||||
|
||||
// channel represents a stream that is sent to the server
|
||||
type channel struct {
|
||||
pubsub srt.PubSub
|
||||
collector session.Collector
|
||||
path string
|
||||
|
||||
publisher *client
|
||||
subscriber map[string]*client
|
||||
lock sync.RWMutex
|
||||
}
|
||||
|
||||
func newChannel(conn srt.Conn, resource string, collector session.Collector) *channel {
|
||||
ch := &channel{
|
||||
pubsub: srt.NewPubSub(srt.PubSubConfig{}),
|
||||
path: resource,
|
||||
publisher: newClient(conn, resource, collector),
|
||||
subscriber: make(map[string]*client),
|
||||
collector: collector,
|
||||
}
|
||||
|
||||
addr := conn.RemoteAddr().String()
|
||||
ip, _, _ := net.SplitHostPort(addr)
|
||||
|
||||
if collector.IsCollectableIP(ip) {
|
||||
collector.RegisterAndActivate(resource, resource, "publish:"+resource, addr)
|
||||
}
|
||||
|
||||
return ch
|
||||
}
|
||||
|
||||
func (ch *channel) Close() {
|
||||
if ch.publisher == nil {
|
||||
return
|
||||
}
|
||||
|
||||
ch.publisher.Close()
|
||||
ch.publisher = nil
|
||||
}
|
||||
|
||||
func (ch *channel) AddSubscriber(conn srt.Conn, resource string) string {
|
||||
addr := conn.RemoteAddr().String()
|
||||
ip, _, _ := net.SplitHostPort(addr)
|
||||
|
||||
client := newClient(conn, addr, ch.collector)
|
||||
|
||||
if ch.collector.IsCollectableIP(ip) {
|
||||
ch.collector.RegisterAndActivate(addr, resource, "play:"+resource, addr)
|
||||
}
|
||||
|
||||
ch.lock.Lock()
|
||||
ch.subscriber[addr] = client
|
||||
ch.lock.Unlock()
|
||||
|
||||
return addr
|
||||
}
|
||||
|
||||
func (ch *channel) RemoveSubscriber(id string) {
|
||||
ch.lock.Lock()
|
||||
defer ch.lock.Unlock()
|
||||
|
||||
client := ch.subscriber[id]
|
||||
if client != nil {
|
||||
delete(ch.subscriber, id)
|
||||
client.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// Config for a new SRT server
|
||||
type Config struct {
|
||||
// The address the SRT server should listen on, e.g. ":1935"
|
||||
Addr string
|
||||
|
||||
// A token that needs to be added to the URL as query string
|
||||
// in order push or pull a stream. The key for the query
|
||||
// parameter is "token". Optional. By default no token is
|
||||
// required.
|
||||
Token string
|
||||
|
||||
Passphrase string
|
||||
|
||||
// Logger. Optional.
|
||||
Logger log.Logger
|
||||
|
||||
Collector session.Collector
|
||||
|
||||
SRTLogTopics []string
|
||||
}
|
||||
|
||||
// Server represents a SRT server
|
||||
type Server interface {
|
||||
// ListenAndServe starts the SRT server
|
||||
ListenAndServe() error
|
||||
|
||||
// Close stops the RTMP server and closes all connections
|
||||
Close()
|
||||
|
||||
// Channels return a list of currently publishing streams
|
||||
Channels() Channels
|
||||
}
|
||||
|
||||
// server implements the Server interface
|
||||
type server struct {
|
||||
addr string
|
||||
token string
|
||||
passphrase string
|
||||
|
||||
collector session.Collector
|
||||
|
||||
server srt.Server
|
||||
|
||||
// Map of publishing channels and a lock to serialize
|
||||
// access to the map.
|
||||
channels map[string]*channel
|
||||
lock sync.RWMutex
|
||||
|
||||
logger log.Logger
|
||||
|
||||
srtlogger srt.Logger
|
||||
srtloggerCancel context.CancelFunc
|
||||
srtlog map[string]*ring.Ring
|
||||
srtlogLock sync.RWMutex
|
||||
}
|
||||
|
||||
func New(config Config) (Server, error) {
|
||||
s := &server{
|
||||
addr: config.Addr,
|
||||
token: config.Token,
|
||||
passphrase: config.Passphrase,
|
||||
collector: config.Collector,
|
||||
logger: config.Logger,
|
||||
}
|
||||
|
||||
if s.collector == nil {
|
||||
s.collector = session.NewNullCollector()
|
||||
}
|
||||
|
||||
if s.logger == nil {
|
||||
s.logger = log.New("")
|
||||
}
|
||||
|
||||
s.srtlogger = srt.NewLogger(config.SRTLogTopics)
|
||||
|
||||
s.srtlogLock.Lock()
|
||||
s.srtlog = make(map[string]*ring.Ring)
|
||||
s.srtlogLock.Unlock()
|
||||
|
||||
s.channels = make(map[string]*channel)
|
||||
|
||||
srtconfig := srt.DefaultConfig()
|
||||
|
||||
srtconfig.KMPreAnnounce = 200
|
||||
srtconfig.KMRefreshRate = 10000
|
||||
srtconfig.Passphrase = config.Passphrase
|
||||
srtconfig.Logger = s.srtlogger
|
||||
|
||||
s.server.Addr = config.Addr
|
||||
s.server.Config = &srtconfig
|
||||
s.server.HandleConnect = s.handleConnect
|
||||
s.server.HandlePublish = s.handlePublish
|
||||
s.server.HandleSubscribe = s.handleSubscribe
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (s *server) ListenAndServe() error {
|
||||
var ctx context.Context
|
||||
ctx, s.srtloggerCancel = context.WithCancel(context.Background())
|
||||
|
||||
go s.srtlogListener(ctx)
|
||||
|
||||
return s.server.ListenAndServe()
|
||||
}
|
||||
|
||||
func (s *server) Close() {
|
||||
s.server.Shutdown()
|
||||
|
||||
s.srtloggerCancel()
|
||||
}
|
||||
|
||||
type Connection struct {
|
||||
Log map[string][]string
|
||||
Stats srt.Statistics
|
||||
}
|
||||
|
||||
type Channels struct {
|
||||
Publisher map[string]uint32
|
||||
Subscriber map[string][]uint32
|
||||
Connections map[uint32]Connection
|
||||
Log map[string][]string
|
||||
}
|
||||
|
||||
func (s *server) Channels() Channels {
|
||||
st := Channels{
|
||||
Publisher: map[string]uint32{},
|
||||
Subscriber: map[string][]uint32{},
|
||||
Connections: map[uint32]Connection{},
|
||||
Log: map[string][]string{},
|
||||
}
|
||||
|
||||
s.lock.RLock()
|
||||
for id, ch := range s.channels {
|
||||
socketId := ch.publisher.conn.SocketId()
|
||||
st.Publisher[id] = socketId
|
||||
|
||||
st.Connections[socketId] = Connection{
|
||||
Stats: ch.publisher.conn.Stats(),
|
||||
Log: map[string][]string{},
|
||||
}
|
||||
|
||||
for _, c := range ch.subscriber {
|
||||
socketId := c.conn.SocketId()
|
||||
st.Subscriber[id] = append(st.Subscriber[id], socketId)
|
||||
|
||||
st.Connections[socketId] = Connection{
|
||||
Stats: c.conn.Stats(),
|
||||
Log: map[string][]string{},
|
||||
}
|
||||
}
|
||||
}
|
||||
s.lock.RUnlock()
|
||||
|
||||
s.srtlogLock.RLock()
|
||||
for topic, buf := range s.srtlog {
|
||||
|
||||
buf.Do(func(l interface{}) {
|
||||
if l == nil {
|
||||
return
|
||||
}
|
||||
|
||||
ll := l.(srt.Log)
|
||||
|
||||
message := fmt.Sprintf("%s [%d] %s", ll.Time.Format("2006-01-02 15:04:05"), ll.SocketId, ll.Message)
|
||||
|
||||
st.Log[topic] = append(st.Log[topic], message)
|
||||
|
||||
if ll.SocketId != 0 {
|
||||
if _, ok := st.Connections[ll.SocketId]; ok {
|
||||
st.Connections[ll.SocketId].Log[topic] = append(st.Connections[ll.SocketId].Log[topic], message)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
s.srtlogLock.RUnlock()
|
||||
|
||||
return st
|
||||
}
|
||||
|
||||
func (s *server) srtlogListener(ctx context.Context) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
break
|
||||
case l := <-s.srtlogger.Listen():
|
||||
s.srtlogLock.Lock()
|
||||
if buf := s.srtlog[l.Topic]; buf == nil {
|
||||
s.srtlog[l.Topic] = ring.New(100)
|
||||
}
|
||||
s.srtlog[l.Topic].Value = l
|
||||
s.srtlog[l.Topic] = s.srtlog[l.Topic].Next()
|
||||
s.srtlogLock.Unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *server) log(handler, action, resource, message string, client net.Addr) {
|
||||
s.logger.Info().WithFields(log.Fields{
|
||||
"handler": handler,
|
||||
"status": action,
|
||||
"resource": resource,
|
||||
"client": client.String(),
|
||||
}).Log(message)
|
||||
}
|
||||
|
||||
type streamInfo struct {
|
||||
mode string
|
||||
resource string
|
||||
token string
|
||||
}
|
||||
|
||||
func parseStreamId(streamid string) (streamInfo, error) {
|
||||
si := streamInfo{}
|
||||
|
||||
if !strings.HasPrefix(streamid, "#!:") {
|
||||
return si, fmt.Errorf("unknown streamid format")
|
||||
}
|
||||
|
||||
streamid = strings.TrimPrefix(streamid, "#!:")
|
||||
|
||||
kvs := strings.Split(streamid, ",")
|
||||
|
||||
split := func(s, sep string) (string, string, error) {
|
||||
splitted := strings.SplitN(s, sep, 2)
|
||||
|
||||
if len(splitted) != 2 {
|
||||
return "", "", fmt.Errorf("invalid key/value pair")
|
||||
}
|
||||
|
||||
return splitted[0], splitted[1], nil
|
||||
}
|
||||
|
||||
for _, kv := range kvs {
|
||||
key, value, err := split(kv, "=")
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
switch key {
|
||||
case "m":
|
||||
si.mode = value
|
||||
case "r":
|
||||
si.resource = value
|
||||
case "token":
|
||||
si.token = value
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
return si, nil
|
||||
}
|
||||
|
||||
func (s *server) handleConnect(req srt.ConnRequest) srt.ConnType {
|
||||
mode := srt.REJECT
|
||||
client := req.RemoteAddr()
|
||||
streamId := req.StreamId()
|
||||
|
||||
si, err := parseStreamId(streamId)
|
||||
if err != nil {
|
||||
s.log("CONNECT", "INVALID", "", err.Error(), client)
|
||||
return srt.REJECT
|
||||
}
|
||||
|
||||
if len(si.resource) == 0 {
|
||||
s.log("CONNECT", "INVALID", "", "stream resource not provided", client)
|
||||
return srt.REJECT
|
||||
}
|
||||
|
||||
if si.mode == "publish" {
|
||||
mode = srt.PUBLISH
|
||||
} else if si.mode == "request" {
|
||||
mode = srt.SUBSCRIBE
|
||||
} else {
|
||||
s.log("CONNECT", "INVALID", si.resource, "invalid connection mode", client)
|
||||
return srt.REJECT
|
||||
}
|
||||
|
||||
if req.IsEncrypted() {
|
||||
if err := req.SetPassphrase(s.passphrase); err != nil {
|
||||
s.log("CONNECT", "FORBIDDEN", si.resource, err.Error(), client)
|
||||
return srt.REJECT
|
||||
}
|
||||
}
|
||||
|
||||
// Check the token
|
||||
if len(s.token) != 0 && s.token != si.token {
|
||||
s.log("CONNECT", "FORBIDDEN", si.resource, "invalid token ("+si.token+")", client)
|
||||
return srt.REJECT
|
||||
}
|
||||
|
||||
s.lock.RLock()
|
||||
ch := s.channels[si.resource]
|
||||
s.lock.RUnlock()
|
||||
|
||||
if mode == srt.PUBLISH && ch != nil {
|
||||
s.log("CONNECT", "CONFLICT", si.resource, "already publishing", client)
|
||||
return srt.REJECT
|
||||
}
|
||||
|
||||
if mode == srt.SUBSCRIBE && ch == nil {
|
||||
s.log("CONNECT", "NOTFOUND", si.resource, "no publisher for this resource found", client)
|
||||
return srt.REJECT
|
||||
}
|
||||
|
||||
return mode
|
||||
}
|
||||
|
||||
func (s *server) handlePublish(conn srt.Conn) {
|
||||
streamId := conn.StreamId()
|
||||
client := conn.RemoteAddr()
|
||||
|
||||
si, _ := parseStreamId(streamId)
|
||||
|
||||
// Look for the stream
|
||||
s.lock.Lock()
|
||||
ch := s.channels[si.resource]
|
||||
if ch == nil {
|
||||
ch = newChannel(conn, si.resource, s.collector)
|
||||
s.channels[si.resource] = ch
|
||||
} else {
|
||||
ch = nil
|
||||
}
|
||||
s.lock.Unlock()
|
||||
|
||||
if ch == nil {
|
||||
s.log("PUBLISH", "CONFLICT", si.resource, "already publishing", client)
|
||||
conn.Close()
|
||||
return
|
||||
}
|
||||
|
||||
s.log("PUBLISH", "START", si.resource, "", client)
|
||||
|
||||
ch.pubsub.Publish(conn)
|
||||
|
||||
s.lock.Lock()
|
||||
delete(s.channels, si.resource)
|
||||
s.lock.Unlock()
|
||||
|
||||
ch.Close()
|
||||
|
||||
s.log("PUBLISH", "STOP", si.resource, "", client)
|
||||
|
||||
conn.Close()
|
||||
}
|
||||
|
||||
func (s *server) handleSubscribe(conn srt.Conn) {
|
||||
streamId := conn.StreamId()
|
||||
client := conn.RemoteAddr()
|
||||
|
||||
si, _ := parseStreamId(streamId)
|
||||
|
||||
// Look for the stream
|
||||
s.lock.RLock()
|
||||
ch := s.channels[si.resource]
|
||||
s.lock.RUnlock()
|
||||
|
||||
if ch == nil {
|
||||
s.log("SUBSCRIBE", "NOTFOUND", si.resource, "no publisher for this resource found", client)
|
||||
conn.Close()
|
||||
return
|
||||
}
|
||||
|
||||
s.log("SUBSCRIBE", "START", si.resource, "", client)
|
||||
|
||||
id := ch.AddSubscriber(conn, si.resource)
|
||||
|
||||
ch.pubsub.Subscribe(conn)
|
||||
|
||||
s.log("SUBSCRIBE", "STOP", si.resource, "", client)
|
||||
|
||||
ch.RemoveSubscriber(id)
|
||||
|
||||
conn.Close()
|
||||
}
|
26
srt/srt_test.go
Normal file
26
srt/srt_test.go
Normal file
@@ -0,0 +1,26 @@
|
||||
package srt
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestParseStreamId(t *testing.T) {
|
||||
streamids := map[string]streamInfo{
|
||||
"bla": {},
|
||||
"#!:": {},
|
||||
"#!:key=value": {},
|
||||
"#!:m=publish": {mode: "publish"},
|
||||
"#!:r=123456789": {resource: "123456789"},
|
||||
"#!:token=foobar": {token: "foobar"},
|
||||
"#!:token=foo,bar": {token: "foo"},
|
||||
"#!:m=publish,r=123456789,token=foobar": {mode: "publish", resource: "123456789", token: "foobar"},
|
||||
}
|
||||
|
||||
for streamid, wantsi := range streamids {
|
||||
si, _ := parseStreamId(streamid)
|
||||
|
||||
require.Equal(t, wantsi, si)
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user