mirror of
				https://github.com/datarhei/core.git
				synced 2025-11-01 03:42:51 +08:00 
			
		
		
		
	
		
			
				
	
	
		
			164 lines
		
	
	
		
			3.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			164 lines
		
	
	
		
			3.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package rtmp
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"net"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/datarhei/core/v16/session"
 | |
| 	"github.com/datarhei/joy4/av"
 | |
| 	"github.com/datarhei/joy4/av/pubsub"
 | |
| 	"github.com/datarhei/joy4/format/rtmp"
 | |
| )
 | |
| 
 | |
| type client struct {
 | |
| 	conn      connection
 | |
| 	id        string
 | |
| 	createdAt time.Time
 | |
| 
 | |
| 	txbytes uint64
 | |
| 	rxbytes uint64
 | |
| 
 | |
| 	collector session.Collector
 | |
| 
 | |
| 	cancel context.CancelFunc
 | |
| }
 | |
| 
 | |
| func newClient(conn connection, id string, collector session.Collector) *client {
 | |
| 	c := &client{
 | |
| 		conn:      conn,
 | |
| 		id:        id,
 | |
| 		createdAt: time.Now(),
 | |
| 
 | |
| 		collector: collector,
 | |
| 	}
 | |
| 
 | |
| 	var ctx context.Context
 | |
| 	ctx, c.cancel = context.WithCancel(context.Background())
 | |
| 
 | |
| 	go c.ticker(ctx)
 | |
| 
 | |
| 	return c
 | |
| }
 | |
| 
 | |
| func (c *client) ticker(ctx context.Context) {
 | |
| 	ticker := time.NewTicker(1 * time.Second)
 | |
| 	defer ticker.Stop()
 | |
| 
 | |
| 	for {
 | |
| 		select {
 | |
| 		case <-ctx.Done():
 | |
| 			return
 | |
| 		case <-ticker.C:
 | |
| 			txbytes := c.conn.TxBytes()
 | |
| 			rxbytes := c.conn.RxBytes()
 | |
| 
 | |
| 			c.collector.Ingress(c.id, int64(rxbytes-c.rxbytes))
 | |
| 			c.collector.Egress(c.id, int64(txbytes-c.txbytes))
 | |
| 
 | |
| 			c.txbytes = txbytes
 | |
| 			c.rxbytes = rxbytes
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (c *client) Close() {
 | |
| 	c.cancel()
 | |
| 	c.conn.Close()
 | |
| }
 | |
| 
 | |
| // channel represents a stream that is sent to the server
 | |
| type channel struct {
 | |
| 	// The packet queue for the stream
 | |
| 	queue *pubsub.Queue
 | |
| 
 | |
| 	// The metadata of the stream
 | |
| 	streams []av.CodecData
 | |
| 
 | |
| 	// Whether the stream has an audio track
 | |
| 	hasAudio bool
 | |
| 
 | |
| 	// Whether the stream has a video track
 | |
| 	hasVideo bool
 | |
| 
 | |
| 	collector session.Collector
 | |
| 	path      string
 | |
| 	reference string
 | |
| 
 | |
| 	publisher  *client
 | |
| 	subscriber map[string]*client
 | |
| 	lock       sync.RWMutex
 | |
| 
 | |
| 	isProxy bool
 | |
| }
 | |
| 
 | |
| func newChannel(conn connection, playPath, reference string, remote net.Addr, streams []av.CodecData, isProxy bool, collector session.Collector) *channel {
 | |
| 	ch := &channel{
 | |
| 		path:       playPath,
 | |
| 		reference:  reference,
 | |
| 		publisher:  newClient(conn, playPath, collector),
 | |
| 		subscriber: make(map[string]*client),
 | |
| 		collector:  collector,
 | |
| 		streams:    streams,
 | |
| 		queue:      pubsub.NewQueue(),
 | |
| 		isProxy:    isProxy,
 | |
| 	}
 | |
| 
 | |
| 	ch.queue.WriteHeader(streams)
 | |
| 
 | |
| 	addr := remote.String()
 | |
| 	ip, _, _ := net.SplitHostPort(addr)
 | |
| 
 | |
| 	if collector.IsCollectableIP(ip) {
 | |
| 		collector.RegisterAndActivate(ch.path, ch.reference, "publish:"+ch.path, addr)
 | |
| 	}
 | |
| 
 | |
| 	return ch
 | |
| }
 | |
| 
 | |
| func (ch *channel) Close() {
 | |
| 	if ch.publisher == nil {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	ch.publisher.Close()
 | |
| 	ch.publisher = nil
 | |
| 
 | |
| 	ch.queue.Close()
 | |
| }
 | |
| 
 | |
| func (ch *channel) AddSubscriber(conn *rtmp.Conn) string {
 | |
| 	addr := conn.NetConn().RemoteAddr().String()
 | |
| 	ip, _, _ := net.SplitHostPort(addr)
 | |
| 
 | |
| 	client := newClient(conn, addr, ch.collector)
 | |
| 
 | |
| 	if ch.collector.IsCollectableIP(ip) {
 | |
| 		ch.collector.RegisterAndActivate(addr, ch.reference, "play:"+conn.URL.Path, addr)
 | |
| 	}
 | |
| 
 | |
| 	ch.lock.Lock()
 | |
| 	ch.subscriber[addr] = client
 | |
| 	ch.lock.Unlock()
 | |
| 
 | |
| 	return addr
 | |
| }
 | |
| 
 | |
| func (ch *channel) RemoveSubscriber(id string) {
 | |
| 	ch.lock.Lock()
 | |
| 	defer ch.lock.Unlock()
 | |
| 
 | |
| 	client := ch.subscriber[id]
 | |
| 	if client != nil {
 | |
| 		delete(ch.subscriber, id)
 | |
| 		client.Close()
 | |
| 	}
 | |
| 
 | |
| 	// If this is a proxied channel and the last subscriber leaves,
 | |
| 	// close the channel.
 | |
| 	if len(ch.subscriber) == 0 && ch.isProxy {
 | |
| 		ch.Close()
 | |
| 	}
 | |
| }
 | 
