Add extra session info for RTMP

This commit is contained in:
Ingo Oppermann
2023-09-21 15:50:35 +02:00
parent c7979efb07
commit 8653ceeba9
2 changed files with 17 additions and 7 deletions

View File

@@ -94,7 +94,7 @@ type channel struct {
isProxy bool isProxy bool
} }
func newChannel(conn connection, playPath, reference string, remote net.Addr, streams []av.CodecData, isProxy bool, collector session.Collector) *channel { func newChannel(conn connection, playPath, reference string, remote net.Addr, streams []av.CodecData, isProxy bool, identity string, collector session.Collector) *channel {
ch := &channel{ ch := &channel{
path: playPath, path: playPath,
reference: reference, reference: reference,
@@ -112,7 +112,12 @@ func newChannel(conn connection, playPath, reference string, remote net.Addr, st
ip, _, _ := net.SplitHostPort(addr) ip, _, _ := net.SplitHostPort(addr)
if collector.IsCollectableIP(ip) { if collector.IsCollectableIP(ip) {
collector.RegisterAndActivate(ch.path, ch.reference, "publish:"+ch.path, addr) extra := map[string]interface{}{
"name": identity,
"method": "publish",
}
collector.RegisterAndActivate(ch.path, ch.reference, ch.path, addr)
collector.Extra(ch.path, extra)
} }
return ch return ch
@@ -129,14 +134,19 @@ func (ch *channel) Close() {
ch.queue.Close() ch.queue.Close()
} }
func (ch *channel) AddSubscriber(conn *rtmp.Conn) string { func (ch *channel) AddSubscriber(conn *rtmp.Conn, playPath, identity string) string {
addr := conn.NetConn().RemoteAddr().String() addr := conn.NetConn().RemoteAddr().String()
ip, _, _ := net.SplitHostPort(addr) ip, _, _ := net.SplitHostPort(addr)
client := newClient(conn, addr, ch.collector) client := newClient(conn, addr, ch.collector)
if ch.collector.IsCollectableIP(ip) { if ch.collector.IsCollectableIP(ip) {
ch.collector.RegisterAndActivate(addr, ch.reference, "play:"+conn.URL.Path, addr) extra := map[string]interface{}{
"name": identity,
"method": "play",
}
ch.collector.RegisterAndActivate(addr, ch.reference, playPath, addr)
ch.collector.Extra(addr, extra)
} }
ch.lock.Lock() ch.lock.Lock()

View File

@@ -346,7 +346,7 @@ func (s *server) handlePlay(conn *rtmp.Conn) {
// Send the metadata to the client // Send the metadata to the client
conn.WriteHeader(ch.streams) conn.WriteHeader(ch.streams)
s.log(identity, "PLAY", "START", conn.URL.Path, "", remote) s.log(identity, "PLAY", "START", playpath, "", remote)
// Get a cursor and apply filters // Get a cursor and apply filters
cursor := ch.queue.Oldest() cursor := ch.queue.Oldest()
@@ -366,7 +366,7 @@ func (s *server) handlePlay(conn *rtmp.Conn) {
Demuxer: cursor, Demuxer: cursor,
} }
id := ch.AddSubscriber(conn) id := ch.AddSubscriber(conn, playpath, identity)
// Transfer the data, blocks until done // Transfer the data, blocks until done
avutil.CopyFile(conn, demuxer) avutil.CopyFile(conn, demuxer)
@@ -431,7 +431,7 @@ func (s *server) publish(src connection, playpath string, remote net.Addr, ident
reference := strings.TrimPrefix(strings.TrimSuffix(playpath, filepath.Ext(playpath)), s.app+"/") reference := strings.TrimPrefix(strings.TrimSuffix(playpath, filepath.Ext(playpath)), s.app+"/")
// Create a new channel // Create a new channel
ch = newChannel(src, playpath, reference, remote, streams, isProxy, s.collector) ch = newChannel(src, playpath, reference, remote, streams, isProxy, identity, s.collector)
for _, stream := range streams { for _, stream := range streams {
typ := stream.Type() typ := stream.Type()