From 02fec7445760bdf5818d9f2893bdd5baaf463fd9 Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Mon, 15 Aug 2022 08:47:33 +0300 Subject: [PATCH] Create publisher for remote rtmp stream --- rtmp/channel.go | 164 +++++++++++++++++++++++ rtmp/connection.go | 104 +++++++++++++++ rtmp/rtmp.go | 315 ++++++++++++++------------------------------- 3 files changed, 368 insertions(+), 215 deletions(-) create mode 100644 rtmp/channel.go create mode 100644 rtmp/connection.go diff --git a/rtmp/channel.go b/rtmp/channel.go new file mode 100644 index 00000000..52c1db84 --- /dev/null +++ b/rtmp/channel.go @@ -0,0 +1,164 @@ +package rtmp + +import ( + "context" + "net" + "net/url" + "path/filepath" + "strings" + "sync" + "time" + + "github.com/datarhei/core/v16/session" + "github.com/datarhei/joy4/av" + "github.com/datarhei/joy4/av/pubsub" + "github.com/datarhei/joy4/format/rtmp" +) + +type client struct { + conn connection + id string + createdAt time.Time + + txbytes uint64 + rxbytes uint64 + + collector session.Collector + + cancel context.CancelFunc +} + +func newClient(conn connection, id string, collector session.Collector) *client { + c := &client{ + conn: conn, + id: id, + createdAt: time.Now(), + + collector: collector, + } + + var ctx context.Context + ctx, c.cancel = context.WithCancel(context.Background()) + + go c.ticker(ctx) + + return c +} + +func (c *client) ticker(ctx context.Context) { + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + txbytes := c.conn.TxBytes() + rxbytes := c.conn.RxBytes() + + c.collector.Ingress(c.id, int64(rxbytes-c.rxbytes)) + c.collector.Egress(c.id, int64(txbytes-c.txbytes)) + + c.txbytes = txbytes + c.rxbytes = rxbytes + } + } +} + +func (c *client) Close() { + c.cancel() + c.conn.Close() +} + +// channel represents a stream that is sent to the server +type channel struct { + // The packet queue for the stream + queue *pubsub.Queue + + // The metadata of the stream + streams []av.CodecData + + // Whether the stream has an audio track + hasAudio bool + + // Whether the stream has a video track + hasVideo bool + + collector session.Collector + path string + + publisher *client + subscriber map[string]*client + lock sync.RWMutex + + isProxy bool +} + +func newChannel(conn connection, u *url.URL, remote net.Addr, streams []av.CodecData, isProxy bool, collector session.Collector) *channel { + ch := &channel{ + path: u.Path, + publisher: newClient(conn, u.Path, collector), + subscriber: make(map[string]*client), + collector: collector, + streams: streams, + queue: pubsub.NewQueue(), + isProxy: isProxy, + } + + addr := remote.String() + ip, _, _ := net.SplitHostPort(addr) + + if collector.IsCollectableIP(ip) { + reference := strings.TrimSuffix(filepath.Base(u.Path), filepath.Ext(u.Path)) + collector.RegisterAndActivate(u.Path, reference, "publish:"+u.Path, addr) + } + + return ch +} + +func (ch *channel) Close() { + if ch.publisher == nil { + return + } + + ch.publisher.Close() + ch.publisher = nil + + ch.queue.Close() +} + +func (ch *channel) AddSubscriber(conn *rtmp.Conn) string { + addr := conn.NetConn().RemoteAddr().String() + ip, _, _ := net.SplitHostPort(addr) + + client := newClient(conn, addr, ch.collector) + + if ch.collector.IsCollectableIP(ip) { + reference := strings.TrimSuffix(filepath.Base(conn.URL.Path), filepath.Ext(conn.URL.Path)) + ch.collector.RegisterAndActivate(addr, reference, "play:"+conn.URL.Path, addr) + } + + ch.lock.Lock() + ch.subscriber[addr] = client + ch.lock.Unlock() + + return addr +} + +func (ch *channel) RemoveSubscriber(id string) { + ch.lock.Lock() + defer ch.lock.Unlock() + + client := ch.subscriber[id] + if client != nil { + delete(ch.subscriber, id) + client.Close() + } + + // If this is a proxied channel and the last subscriber leaves, + // close the channel. + if len(ch.subscriber) == 0 && ch.isProxy { + ch.Close() + } +} diff --git a/rtmp/connection.go b/rtmp/connection.go new file mode 100644 index 00000000..43cee496 --- /dev/null +++ b/rtmp/connection.go @@ -0,0 +1,104 @@ +package rtmp + +import ( + "fmt" + + "github.com/datarhei/joy4/av" +) + +type connection interface { + av.MuxCloser + av.DemuxCloser + TxBytes() uint64 + RxBytes() uint64 +} + +// conn implements the connection interface +type conn struct { + muxer av.MuxCloser + demuxer av.DemuxCloser + + txbytes uint64 + rxbytes uint64 +} + +// Make sure that conn implements the connection interface +var _ connection = &conn{} + +func newConnectionFromDemuxer(m av.DemuxCloser) connection { + c := &conn{ + demuxer: m, + } + + return c +} + +func (c *conn) TxBytes() uint64 { + return c.txbytes +} + +func (c *conn) RxBytes() uint64 { + return c.rxbytes +} + +func (c *conn) ReadPacket() (av.Packet, error) { + if c.demuxer != nil { + p, err := c.demuxer.ReadPacket() + if err == nil { + c.rxbytes += uint64(len(p.Data)) + } + + return p, err + } + + return av.Packet{}, fmt.Errorf("no demuxer available") +} + +func (c *conn) Streams() ([]av.CodecData, error) { + if c.demuxer != nil { + return c.demuxer.Streams() + } + + return nil, fmt.Errorf("no demuxer available") +} + +func (c *conn) WritePacket(p av.Packet) error { + if c.muxer != nil { + err := c.muxer.WritePacket(p) + if err == nil { + c.txbytes += uint64(len(p.Data)) + } + + return err + } + + return fmt.Errorf("no muxer available") +} + +func (c *conn) WriteHeader(streams []av.CodecData) error { + if c.muxer != nil { + return c.muxer.WriteHeader(streams) + } + + return fmt.Errorf("no muxer available") +} + +func (c *conn) WriteTrailer() error { + if c.muxer != nil { + return c.muxer.WriteTrailer() + } + + return fmt.Errorf("no muxer available") +} + +func (c *conn) Close() error { + if c.muxer != nil { + return c.muxer.Close() + } + + if c.demuxer != nil { + return c.demuxer.Close() + } + + return nil +} diff --git a/rtmp/rtmp.go b/rtmp/rtmp.go index e8fff1d2..6267ace8 100644 --- a/rtmp/rtmp.go +++ b/rtmp/rtmp.go @@ -2,11 +2,10 @@ package rtmp import ( - "context" "crypto/tls" "fmt" "net" - "path/filepath" + "net/url" "strings" "sync" "time" @@ -17,9 +16,7 @@ import ( "github.com/datarhei/joy4/av/avutil" "github.com/datarhei/joy4/av/pktque" - "github.com/datarhei/joy4/av/pubsub" "github.com/datarhei/joy4/format" - "github.com/datarhei/joy4/format/flv/flvio" "github.com/datarhei/joy4/format/rtmp" ) @@ -31,142 +28,6 @@ func init() { format.RegisterAll() } -type client struct { - conn *rtmp.Conn - id string - createdAt time.Time - - txbytes uint64 - rxbytes uint64 - - collector session.Collector - - cancel context.CancelFunc -} - -func newClient(conn *rtmp.Conn, id string, collector session.Collector) *client { - c := &client{ - conn: conn, - id: id, - createdAt: time.Now(), - - collector: collector, - } - - var ctx context.Context - ctx, c.cancel = context.WithCancel(context.Background()) - - go c.ticker(ctx) - - return c -} - -func (c *client) ticker(ctx context.Context) { - ticker := time.NewTicker(1 * time.Second) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - txbytes := c.conn.TxBytes() - rxbytes := c.conn.RxBytes() - - c.collector.Ingress(c.id, int64(rxbytes-c.rxbytes)) - c.collector.Egress(c.id, int64(txbytes-c.txbytes)) - - c.txbytes = txbytes - c.rxbytes = rxbytes - } - } -} - -func (c *client) Close() { - c.cancel() -} - -// channel represents a stream that is sent to the server -type channel struct { - // The packet queue for the stream - queue *pubsub.Queue - - // The metadata of the stream - metadata flvio.AMFMap - - // Whether the stream has an audio track - hasAudio bool - - // Whether the stream has a video track - hasVideo bool - - collector session.Collector - path string - - publisher *client - subscriber map[string]*client - lock sync.RWMutex -} - -func newChannel(conn *rtmp.Conn, collector session.Collector) *channel { - ch := &channel{ - path: conn.URL.Path, - publisher: newClient(conn, conn.URL.Path, collector), - subscriber: make(map[string]*client), - collector: collector, - } - - addr := conn.NetConn().RemoteAddr().String() - ip, _, _ := net.SplitHostPort(addr) - - if collector.IsCollectableIP(ip) { - reference := strings.TrimSuffix(filepath.Base(conn.URL.Path), filepath.Ext(conn.URL.Path)) - collector.RegisterAndActivate(conn.URL.Path, reference, "publish:"+conn.URL.Path, addr) - } - - return ch -} - -func (ch *channel) Close() { - if ch.publisher == nil { - return - } - - ch.publisher.Close() - ch.publisher = nil - - ch.queue.Close() -} - -func (ch *channel) AddSubscriber(conn *rtmp.Conn) string { - addr := conn.NetConn().RemoteAddr().String() - ip, _, _ := net.SplitHostPort(addr) - - client := newClient(conn, addr, ch.collector) - - if ch.collector.IsCollectableIP(ip) { - reference := strings.TrimSuffix(filepath.Base(conn.URL.Path), filepath.Ext(conn.URL.Path)) - ch.collector.RegisterAndActivate(addr, reference, "play:"+conn.URL.Path, addr) - } - - ch.lock.Lock() - ch.subscriber[addr] = client - ch.lock.Unlock() - - return addr -} - -func (ch *channel) RemoveSubscriber(id string) { - ch.lock.Lock() - defer ch.lock.Unlock() - - client := ch.subscriber[id] - if client != nil { - delete(ch.subscriber, id) - client.Close() - } -} - // Config for a new RTMP server type Config struct { // Logger. Optional. @@ -338,47 +199,83 @@ func (s *server) log(who, action, path, message string, client net.Addr) { // handlePlay is called when a RTMP client wants to play a stream func (s *server) handlePlay(conn *rtmp.Conn) { - client := conn.NetConn().RemoteAddr() + defer conn.Close() + + remote := conn.NetConn().RemoteAddr() // Check the token q := conn.URL.Query() token := q.Get("token") if len(s.token) != 0 && s.token != token { - s.log("PLAY", "FORBIDDEN", conn.URL.Path, "invalid token ("+token+")", client) - conn.Close() + s.log("PLAY", "FORBIDDEN", conn.URL.Path, "invalid token ("+token+")", remote) return } - /* - ip, _, _ := net.SplitHostPort(client.String()) - if s.collector.IsCollectableIP(ip) { - maxBitrate := s.collector.MaxEgressBitrate() - if maxBitrate > 0.0 { - streamBitrate := s.collector.SessionTopIngressBitrate(conn.URL.Path) * 2.0 - currentBitrate := s.collector.CompanionEgressBitrate() * 1.15 - - resultingBitrate := currentBitrate + streamBitrate - - if resultingBitrate >= maxBitrate { - s.log("PLAY", "FORBIDDEN", conn.URL.Path, "bandwidth limit exceeded", client) - conn.Close() - return - } - } - } - */ - // Look for the stream s.lock.RLock() ch := s.channels[conn.URL.Path] s.lock.RUnlock() - if ch != nil { - // Set the metadata for the client - conn.SetMetaData(ch.metadata) + if ch == nil { + // Check in the cluster for that stream + url, err := s.cluster.GetURL("rtmp:" + conn.URL.Path) + if err != nil { + s.log("PLAY", "NOTFOUND", conn.URL.Path, "", remote) + return + } - s.log("PLAY", "START", conn.URL.Path, "", client) + src, err := avutil.Open(url) + if err != nil { + s.logger.Error().WithField("address", url).WithError(err).Log("Proxying address failed") + s.log("PLAY", "NOTFOUND", conn.URL.Path, "", remote) + return + } + + c := newConnectionFromDemuxer(src) + + wg := sync.WaitGroup{} + wg.Add(1) + + go func() { + s.log("PLAY", "PROXYSTART", url, "", remote) + wg.Done() + err := s.publish(c, conn.URL, remote, true) + if err != nil { + s.logger.Error().WithField("address", url).WithError(err).Log("Proxying address failed") + } + s.log("PLAY", "PROXYSTOP", url, "", remote) + }() + + // Wait for the goroutine to start + wg.Wait() + + // Wait for channel to become available + ticker := time.NewTicker(200 * time.Millisecond) + tickerStart := time.Now() + + for range ticker.C { + s.lock.RLock() + ch = s.channels[conn.URL.Path] + s.lock.RUnlock() + + if ch != nil { + break + } + + if time.Since(tickerStart) > 2*time.Second { + break + } + } + + ticker.Stop() + } + + if ch != nil { + // Send the metadata to the client + conn.WriteHeader(ch.streams) + + s.log("PLAY", "START", conn.URL.Path, "", remote) // Get a cursor and apply filters cursor := ch.queue.Oldest() @@ -400,71 +297,60 @@ func (s *server) handlePlay(conn *rtmp.Conn) { id := ch.AddSubscriber(conn) - // Transfer the data + // Transfer the data, blocks until done avutil.CopyFile(conn, demuxer) ch.RemoveSubscriber(id) - s.log("PLAY", "STOP", conn.URL.Path, "", client) + s.log("PLAY", "STOP", conn.URL.Path, "", remote) } else { - // Check in the cluster for that stream - url, err := s.cluster.GetURL("rtmp:" + conn.URL.Path) - if err == nil { - src, err := avutil.Open(url) - if err != nil { - s.logger.Error().WithField("address", url).WithError(err).Log("Proxying address failed") - s.log("PLAY", "NOTFOUND", conn.URL.Path, "", client) - } else { - s.log("PLAY", "PROXYSTART", url, "", client) - avutil.CopyFile(conn, src) - s.log("PLAY", "PROXYSTOP", url, "", client) - } - } else { - s.log("PLAY", "NOTFOUND", conn.URL.Path, "", client) - } + s.log("PLAY", "NOTFOUND", conn.URL.Path, "", remote) } - - conn.Close() } // handlePublish is called when a RTMP client wants to publish a stream func (s *server) handlePublish(conn *rtmp.Conn) { - client := conn.NetConn().RemoteAddr() + defer conn.Close() - // Check the token - q := conn.URL.Query() - token := q.Get("token") + remote := conn.NetConn().RemoteAddr() - if len(s.token) != 0 && s.token != token { - s.log("PUBLISH", "FORBIDDEN", conn.URL.Path, "invalid token ("+token+")", client) - conn.Close() - return + if len(s.token) != 0 { + // Check the token + token := conn.URL.Query().Get("token") + + if s.token != token { + s.log("PUBLISH", "FORBIDDEN", conn.URL.Path, "invalid token ("+token+")", remote) + return + } } // Check the app patch if !strings.HasPrefix(conn.URL.Path, s.app) { - s.log("PUBLISH", "FORBIDDEN", conn.URL.Path, "invalid app", client) - conn.Close() + s.log("PUBLISH", "FORBIDDEN", conn.URL.Path, "invalid app", remote) return } - // Check the stream if it contains any valid/known streams - streams, _ := conn.Streams() + err := s.publish(conn, conn.URL, remote, false) + if err != nil { + s.logger.WithField("path", conn.URL.Path).WithError(err).Log("") + } +} + +func (s *server) publish(src connection, u *url.URL, remote net.Addr, isProxy bool) error { + // Check the streams if it contains any valid/known streams + streams, _ := src.Streams() if len(streams) == 0 { - s.log("PUBLISH", "INVALID", conn.URL.Path, "no streams available", client) - conn.Close() - return + s.log("PUBLISH", "INVALID", u.Path, "no streams available", remote) + return fmt.Errorf("no streams are available") } s.lock.Lock() - ch := s.channels[conn.URL.Path] + ch := s.channels[u.Path] if ch == nil { // Create a new channel - ch = newChannel(conn, s.collector) - ch.metadata = conn.GetMetaData() - ch.queue = pubsub.NewQueue() + ch = newChannel(src, u, remote, streams, isProxy, s.collector) ch.queue.WriteHeader(streams) for _, stream := range streams { @@ -478,7 +364,7 @@ func (s *server) handlePublish(conn *rtmp.Conn) { } } - s.channels[conn.URL.Path] = ch + s.channels[u.Path] = ch } else { ch = nil } @@ -486,27 +372,26 @@ func (s *server) handlePublish(conn *rtmp.Conn) { s.lock.Unlock() if ch == nil { - s.log("PUBLISH", "CONFLICT", conn.URL.Path, "already publishing", client) - conn.Close() - return + s.log("PUBLISH", "CONFLICT", u.Path, "already publishing", remote) + return fmt.Errorf("already publishing") } - s.log("PUBLISH", "START", conn.URL.Path, "", client) + s.log("PUBLISH", "START", u.Path, "", remote) for _, stream := range streams { - s.log("PUBLISH", "STREAM", conn.URL.Path, stream.Type().String(), client) + s.log("PUBLISH", "STREAM", u.Path, stream.Type().String(), remote) } - // Ingest the data - avutil.CopyPackets(ch.queue, conn) + // Ingest the data, blocks until done + avutil.CopyPackets(ch.queue, src) s.lock.Lock() - delete(s.channels, conn.URL.Path) + delete(s.channels, u.Path) s.lock.Unlock() ch.Close() - s.log("PUBLISH", "STOP", conn.URL.Path, "", client) + s.log("PUBLISH", "STOP", u.Path, "", remote) - conn.Close() + return nil }