Create publisher for remote rtmp stream

This commit is contained in:
Ingo Oppermann
2022-08-15 08:47:33 +03:00
parent b51a38c99e
commit 02fec74457
3 changed files with 368 additions and 215 deletions

164
rtmp/channel.go Normal file
View File

@@ -0,0 +1,164 @@
package rtmp
import (
"context"
"net"
"net/url"
"path/filepath"
"strings"
"sync"
"time"
"github.com/datarhei/core/v16/session"
"github.com/datarhei/joy4/av"
"github.com/datarhei/joy4/av/pubsub"
"github.com/datarhei/joy4/format/rtmp"
)
type client struct {
conn connection
id string
createdAt time.Time
txbytes uint64
rxbytes uint64
collector session.Collector
cancel context.CancelFunc
}
func newClient(conn connection, id string, collector session.Collector) *client {
c := &client{
conn: conn,
id: id,
createdAt: time.Now(),
collector: collector,
}
var ctx context.Context
ctx, c.cancel = context.WithCancel(context.Background())
go c.ticker(ctx)
return c
}
func (c *client) ticker(ctx context.Context) {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
txbytes := c.conn.TxBytes()
rxbytes := c.conn.RxBytes()
c.collector.Ingress(c.id, int64(rxbytes-c.rxbytes))
c.collector.Egress(c.id, int64(txbytes-c.txbytes))
c.txbytes = txbytes
c.rxbytes = rxbytes
}
}
}
func (c *client) Close() {
c.cancel()
c.conn.Close()
}
// channel represents a stream that is sent to the server
type channel struct {
// The packet queue for the stream
queue *pubsub.Queue
// The metadata of the stream
streams []av.CodecData
// Whether the stream has an audio track
hasAudio bool
// Whether the stream has a video track
hasVideo bool
collector session.Collector
path string
publisher *client
subscriber map[string]*client
lock sync.RWMutex
isProxy bool
}
func newChannel(conn connection, u *url.URL, remote net.Addr, streams []av.CodecData, isProxy bool, collector session.Collector) *channel {
ch := &channel{
path: u.Path,
publisher: newClient(conn, u.Path, collector),
subscriber: make(map[string]*client),
collector: collector,
streams: streams,
queue: pubsub.NewQueue(),
isProxy: isProxy,
}
addr := remote.String()
ip, _, _ := net.SplitHostPort(addr)
if collector.IsCollectableIP(ip) {
reference := strings.TrimSuffix(filepath.Base(u.Path), filepath.Ext(u.Path))
collector.RegisterAndActivate(u.Path, reference, "publish:"+u.Path, addr)
}
return ch
}
func (ch *channel) Close() {
if ch.publisher == nil {
return
}
ch.publisher.Close()
ch.publisher = nil
ch.queue.Close()
}
func (ch *channel) AddSubscriber(conn *rtmp.Conn) string {
addr := conn.NetConn().RemoteAddr().String()
ip, _, _ := net.SplitHostPort(addr)
client := newClient(conn, addr, ch.collector)
if ch.collector.IsCollectableIP(ip) {
reference := strings.TrimSuffix(filepath.Base(conn.URL.Path), filepath.Ext(conn.URL.Path))
ch.collector.RegisterAndActivate(addr, reference, "play:"+conn.URL.Path, addr)
}
ch.lock.Lock()
ch.subscriber[addr] = client
ch.lock.Unlock()
return addr
}
func (ch *channel) RemoveSubscriber(id string) {
ch.lock.Lock()
defer ch.lock.Unlock()
client := ch.subscriber[id]
if client != nil {
delete(ch.subscriber, id)
client.Close()
}
// If this is a proxied channel and the last subscriber leaves,
// close the channel.
if len(ch.subscriber) == 0 && ch.isProxy {
ch.Close()
}
}

104
rtmp/connection.go Normal file
View File

@@ -0,0 +1,104 @@
package rtmp
import (
"fmt"
"github.com/datarhei/joy4/av"
)
type connection interface {
av.MuxCloser
av.DemuxCloser
TxBytes() uint64
RxBytes() uint64
}
// conn implements the connection interface
type conn struct {
muxer av.MuxCloser
demuxer av.DemuxCloser
txbytes uint64
rxbytes uint64
}
// Make sure that conn implements the connection interface
var _ connection = &conn{}
func newConnectionFromDemuxer(m av.DemuxCloser) connection {
c := &conn{
demuxer: m,
}
return c
}
func (c *conn) TxBytes() uint64 {
return c.txbytes
}
func (c *conn) RxBytes() uint64 {
return c.rxbytes
}
func (c *conn) ReadPacket() (av.Packet, error) {
if c.demuxer != nil {
p, err := c.demuxer.ReadPacket()
if err == nil {
c.rxbytes += uint64(len(p.Data))
}
return p, err
}
return av.Packet{}, fmt.Errorf("no demuxer available")
}
func (c *conn) Streams() ([]av.CodecData, error) {
if c.demuxer != nil {
return c.demuxer.Streams()
}
return nil, fmt.Errorf("no demuxer available")
}
func (c *conn) WritePacket(p av.Packet) error {
if c.muxer != nil {
err := c.muxer.WritePacket(p)
if err == nil {
c.txbytes += uint64(len(p.Data))
}
return err
}
return fmt.Errorf("no muxer available")
}
func (c *conn) WriteHeader(streams []av.CodecData) error {
if c.muxer != nil {
return c.muxer.WriteHeader(streams)
}
return fmt.Errorf("no muxer available")
}
func (c *conn) WriteTrailer() error {
if c.muxer != nil {
return c.muxer.WriteTrailer()
}
return fmt.Errorf("no muxer available")
}
func (c *conn) Close() error {
if c.muxer != nil {
return c.muxer.Close()
}
if c.demuxer != nil {
return c.demuxer.Close()
}
return nil
}

View File

@@ -2,11 +2,10 @@
package rtmp package rtmp
import ( import (
"context"
"crypto/tls" "crypto/tls"
"fmt" "fmt"
"net" "net"
"path/filepath" "net/url"
"strings" "strings"
"sync" "sync"
"time" "time"
@@ -17,9 +16,7 @@ import (
"github.com/datarhei/joy4/av/avutil" "github.com/datarhei/joy4/av/avutil"
"github.com/datarhei/joy4/av/pktque" "github.com/datarhei/joy4/av/pktque"
"github.com/datarhei/joy4/av/pubsub"
"github.com/datarhei/joy4/format" "github.com/datarhei/joy4/format"
"github.com/datarhei/joy4/format/flv/flvio"
"github.com/datarhei/joy4/format/rtmp" "github.com/datarhei/joy4/format/rtmp"
) )
@@ -31,142 +28,6 @@ func init() {
format.RegisterAll() format.RegisterAll()
} }
type client struct {
conn *rtmp.Conn
id string
createdAt time.Time
txbytes uint64
rxbytes uint64
collector session.Collector
cancel context.CancelFunc
}
func newClient(conn *rtmp.Conn, id string, collector session.Collector) *client {
c := &client{
conn: conn,
id: id,
createdAt: time.Now(),
collector: collector,
}
var ctx context.Context
ctx, c.cancel = context.WithCancel(context.Background())
go c.ticker(ctx)
return c
}
func (c *client) ticker(ctx context.Context) {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
txbytes := c.conn.TxBytes()
rxbytes := c.conn.RxBytes()
c.collector.Ingress(c.id, int64(rxbytes-c.rxbytes))
c.collector.Egress(c.id, int64(txbytes-c.txbytes))
c.txbytes = txbytes
c.rxbytes = rxbytes
}
}
}
func (c *client) Close() {
c.cancel()
}
// channel represents a stream that is sent to the server
type channel struct {
// The packet queue for the stream
queue *pubsub.Queue
// The metadata of the stream
metadata flvio.AMFMap
// Whether the stream has an audio track
hasAudio bool
// Whether the stream has a video track
hasVideo bool
collector session.Collector
path string
publisher *client
subscriber map[string]*client
lock sync.RWMutex
}
func newChannel(conn *rtmp.Conn, collector session.Collector) *channel {
ch := &channel{
path: conn.URL.Path,
publisher: newClient(conn, conn.URL.Path, collector),
subscriber: make(map[string]*client),
collector: collector,
}
addr := conn.NetConn().RemoteAddr().String()
ip, _, _ := net.SplitHostPort(addr)
if collector.IsCollectableIP(ip) {
reference := strings.TrimSuffix(filepath.Base(conn.URL.Path), filepath.Ext(conn.URL.Path))
collector.RegisterAndActivate(conn.URL.Path, reference, "publish:"+conn.URL.Path, addr)
}
return ch
}
func (ch *channel) Close() {
if ch.publisher == nil {
return
}
ch.publisher.Close()
ch.publisher = nil
ch.queue.Close()
}
func (ch *channel) AddSubscriber(conn *rtmp.Conn) string {
addr := conn.NetConn().RemoteAddr().String()
ip, _, _ := net.SplitHostPort(addr)
client := newClient(conn, addr, ch.collector)
if ch.collector.IsCollectableIP(ip) {
reference := strings.TrimSuffix(filepath.Base(conn.URL.Path), filepath.Ext(conn.URL.Path))
ch.collector.RegisterAndActivate(addr, reference, "play:"+conn.URL.Path, addr)
}
ch.lock.Lock()
ch.subscriber[addr] = client
ch.lock.Unlock()
return addr
}
func (ch *channel) RemoveSubscriber(id string) {
ch.lock.Lock()
defer ch.lock.Unlock()
client := ch.subscriber[id]
if client != nil {
delete(ch.subscriber, id)
client.Close()
}
}
// Config for a new RTMP server // Config for a new RTMP server
type Config struct { type Config struct {
// Logger. Optional. // Logger. Optional.
@@ -338,47 +199,83 @@ func (s *server) log(who, action, path, message string, client net.Addr) {
// handlePlay is called when a RTMP client wants to play a stream // handlePlay is called when a RTMP client wants to play a stream
func (s *server) handlePlay(conn *rtmp.Conn) { func (s *server) handlePlay(conn *rtmp.Conn) {
client := conn.NetConn().RemoteAddr() defer conn.Close()
remote := conn.NetConn().RemoteAddr()
// Check the token // Check the token
q := conn.URL.Query() q := conn.URL.Query()
token := q.Get("token") token := q.Get("token")
if len(s.token) != 0 && s.token != token { if len(s.token) != 0 && s.token != token {
s.log("PLAY", "FORBIDDEN", conn.URL.Path, "invalid token ("+token+")", client) s.log("PLAY", "FORBIDDEN", conn.URL.Path, "invalid token ("+token+")", remote)
conn.Close()
return return
} }
/*
ip, _, _ := net.SplitHostPort(client.String())
if s.collector.IsCollectableIP(ip) {
maxBitrate := s.collector.MaxEgressBitrate()
if maxBitrate > 0.0 {
streamBitrate := s.collector.SessionTopIngressBitrate(conn.URL.Path) * 2.0
currentBitrate := s.collector.CompanionEgressBitrate() * 1.15
resultingBitrate := currentBitrate + streamBitrate
if resultingBitrate >= maxBitrate {
s.log("PLAY", "FORBIDDEN", conn.URL.Path, "bandwidth limit exceeded", client)
conn.Close()
return
}
}
}
*/
// Look for the stream // Look for the stream
s.lock.RLock() s.lock.RLock()
ch := s.channels[conn.URL.Path] ch := s.channels[conn.URL.Path]
s.lock.RUnlock() s.lock.RUnlock()
if ch != nil { if ch == nil {
// Set the metadata for the client // Check in the cluster for that stream
conn.SetMetaData(ch.metadata) url, err := s.cluster.GetURL("rtmp:" + conn.URL.Path)
if err != nil {
s.log("PLAY", "NOTFOUND", conn.URL.Path, "", remote)
return
}
s.log("PLAY", "START", conn.URL.Path, "", client) src, err := avutil.Open(url)
if err != nil {
s.logger.Error().WithField("address", url).WithError(err).Log("Proxying address failed")
s.log("PLAY", "NOTFOUND", conn.URL.Path, "", remote)
return
}
c := newConnectionFromDemuxer(src)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
s.log("PLAY", "PROXYSTART", url, "", remote)
wg.Done()
err := s.publish(c, conn.URL, remote, true)
if err != nil {
s.logger.Error().WithField("address", url).WithError(err).Log("Proxying address failed")
}
s.log("PLAY", "PROXYSTOP", url, "", remote)
}()
// Wait for the goroutine to start
wg.Wait()
// Wait for channel to become available
ticker := time.NewTicker(200 * time.Millisecond)
tickerStart := time.Now()
for range ticker.C {
s.lock.RLock()
ch = s.channels[conn.URL.Path]
s.lock.RUnlock()
if ch != nil {
break
}
if time.Since(tickerStart) > 2*time.Second {
break
}
}
ticker.Stop()
}
if ch != nil {
// Send the metadata to the client
conn.WriteHeader(ch.streams)
s.log("PLAY", "START", conn.URL.Path, "", remote)
// Get a cursor and apply filters // Get a cursor and apply filters
cursor := ch.queue.Oldest() cursor := ch.queue.Oldest()
@@ -400,71 +297,60 @@ func (s *server) handlePlay(conn *rtmp.Conn) {
id := ch.AddSubscriber(conn) id := ch.AddSubscriber(conn)
// Transfer the data // Transfer the data, blocks until done
avutil.CopyFile(conn, demuxer) avutil.CopyFile(conn, demuxer)
ch.RemoveSubscriber(id) ch.RemoveSubscriber(id)
s.log("PLAY", "STOP", conn.URL.Path, "", client) s.log("PLAY", "STOP", conn.URL.Path, "", remote)
} else { } else {
// Check in the cluster for that stream s.log("PLAY", "NOTFOUND", conn.URL.Path, "", remote)
url, err := s.cluster.GetURL("rtmp:" + conn.URL.Path)
if err == nil {
src, err := avutil.Open(url)
if err != nil {
s.logger.Error().WithField("address", url).WithError(err).Log("Proxying address failed")
s.log("PLAY", "NOTFOUND", conn.URL.Path, "", client)
} else {
s.log("PLAY", "PROXYSTART", url, "", client)
avutil.CopyFile(conn, src)
s.log("PLAY", "PROXYSTOP", url, "", client)
}
} else {
s.log("PLAY", "NOTFOUND", conn.URL.Path, "", client)
}
} }
conn.Close()
} }
// handlePublish is called when a RTMP client wants to publish a stream // handlePublish is called when a RTMP client wants to publish a stream
func (s *server) handlePublish(conn *rtmp.Conn) { func (s *server) handlePublish(conn *rtmp.Conn) {
client := conn.NetConn().RemoteAddr() defer conn.Close()
// Check the token remote := conn.NetConn().RemoteAddr()
q := conn.URL.Query()
token := q.Get("token")
if len(s.token) != 0 && s.token != token { if len(s.token) != 0 {
s.log("PUBLISH", "FORBIDDEN", conn.URL.Path, "invalid token ("+token+")", client) // Check the token
conn.Close() token := conn.URL.Query().Get("token")
return
if s.token != token {
s.log("PUBLISH", "FORBIDDEN", conn.URL.Path, "invalid token ("+token+")", remote)
return
}
} }
// Check the app patch // Check the app patch
if !strings.HasPrefix(conn.URL.Path, s.app) { if !strings.HasPrefix(conn.URL.Path, s.app) {
s.log("PUBLISH", "FORBIDDEN", conn.URL.Path, "invalid app", client) s.log("PUBLISH", "FORBIDDEN", conn.URL.Path, "invalid app", remote)
conn.Close()
return return
} }
// Check the stream if it contains any valid/known streams err := s.publish(conn, conn.URL, remote, false)
streams, _ := conn.Streams() if err != nil {
s.logger.WithField("path", conn.URL.Path).WithError(err).Log("")
}
}
func (s *server) publish(src connection, u *url.URL, remote net.Addr, isProxy bool) error {
// Check the streams if it contains any valid/known streams
streams, _ := src.Streams()
if len(streams) == 0 { if len(streams) == 0 {
s.log("PUBLISH", "INVALID", conn.URL.Path, "no streams available", client) s.log("PUBLISH", "INVALID", u.Path, "no streams available", remote)
conn.Close() return fmt.Errorf("no streams are available")
return
} }
s.lock.Lock() s.lock.Lock()
ch := s.channels[conn.URL.Path] ch := s.channels[u.Path]
if ch == nil { if ch == nil {
// Create a new channel // Create a new channel
ch = newChannel(conn, s.collector) ch = newChannel(src, u, remote, streams, isProxy, s.collector)
ch.metadata = conn.GetMetaData()
ch.queue = pubsub.NewQueue()
ch.queue.WriteHeader(streams) ch.queue.WriteHeader(streams)
for _, stream := range streams { for _, stream := range streams {
@@ -478,7 +364,7 @@ func (s *server) handlePublish(conn *rtmp.Conn) {
} }
} }
s.channels[conn.URL.Path] = ch s.channels[u.Path] = ch
} else { } else {
ch = nil ch = nil
} }
@@ -486,27 +372,26 @@ func (s *server) handlePublish(conn *rtmp.Conn) {
s.lock.Unlock() s.lock.Unlock()
if ch == nil { if ch == nil {
s.log("PUBLISH", "CONFLICT", conn.URL.Path, "already publishing", client) s.log("PUBLISH", "CONFLICT", u.Path, "already publishing", remote)
conn.Close() return fmt.Errorf("already publishing")
return
} }
s.log("PUBLISH", "START", conn.URL.Path, "", client) s.log("PUBLISH", "START", u.Path, "", remote)
for _, stream := range streams { for _, stream := range streams {
s.log("PUBLISH", "STREAM", conn.URL.Path, stream.Type().String(), client) s.log("PUBLISH", "STREAM", u.Path, stream.Type().String(), remote)
} }
// Ingest the data // Ingest the data, blocks until done
avutil.CopyPackets(ch.queue, conn) avutil.CopyPackets(ch.queue, src)
s.lock.Lock() s.lock.Lock()
delete(s.channels, conn.URL.Path) delete(s.channels, u.Path)
s.lock.Unlock() s.lock.Unlock()
ch.Close() ch.Close()
s.log("PUBLISH", "STOP", conn.URL.Path, "", client) s.log("PUBLISH", "STOP", u.Path, "", remote)
conn.Close() return nil
} }