diff --git a/rtmp/channel.go b/rtmp/channel.go index e471c1ca..fa34448e 100644 --- a/rtmp/channel.go +++ b/rtmp/channel.go @@ -94,7 +94,7 @@ type channel struct { isProxy bool } -func newChannel(conn connection, playPath, reference string, remote net.Addr, streams []av.CodecData, isProxy bool, collector session.Collector) *channel { +func newChannel(conn connection, playPath, reference string, remote net.Addr, streams []av.CodecData, isProxy bool, identity string, collector session.Collector) *channel { ch := &channel{ path: playPath, reference: reference, @@ -112,7 +112,12 @@ func newChannel(conn connection, playPath, reference string, remote net.Addr, st ip, _, _ := net.SplitHostPort(addr) if collector.IsCollectableIP(ip) { - collector.RegisterAndActivate(ch.path, ch.reference, "publish:"+ch.path, addr) + extra := map[string]interface{}{ + "name": identity, + "method": "publish", + } + collector.RegisterAndActivate(ch.path, ch.reference, ch.path, addr) + collector.Extra(ch.path, extra) } return ch @@ -129,14 +134,19 @@ func (ch *channel) Close() { ch.queue.Close() } -func (ch *channel) AddSubscriber(conn *rtmp.Conn) string { +func (ch *channel) AddSubscriber(conn *rtmp.Conn, playPath, identity string) string { addr := conn.NetConn().RemoteAddr().String() ip, _, _ := net.SplitHostPort(addr) client := newClient(conn, addr, ch.collector) if ch.collector.IsCollectableIP(ip) { - ch.collector.RegisterAndActivate(addr, ch.reference, "play:"+conn.URL.Path, addr) + extra := map[string]interface{}{ + "name": identity, + "method": "play", + } + ch.collector.RegisterAndActivate(addr, ch.reference, playPath, addr) + ch.collector.Extra(addr, extra) } ch.lock.Lock() diff --git a/rtmp/rtmp.go b/rtmp/rtmp.go index 1369af2e..887597bd 100644 --- a/rtmp/rtmp.go +++ b/rtmp/rtmp.go @@ -346,7 +346,7 @@ func (s *server) handlePlay(conn *rtmp.Conn) { // Send the metadata to the client conn.WriteHeader(ch.streams) - s.log(identity, "PLAY", "START", conn.URL.Path, "", remote) + s.log(identity, "PLAY", "START", playpath, "", remote) // Get a cursor and apply filters cursor := ch.queue.Oldest() @@ -366,7 +366,7 @@ func (s *server) handlePlay(conn *rtmp.Conn) { Demuxer: cursor, } - id := ch.AddSubscriber(conn) + id := ch.AddSubscriber(conn, playpath, identity) // Transfer the data, blocks until done avutil.CopyFile(conn, demuxer) @@ -431,7 +431,7 @@ func (s *server) publish(src connection, playpath string, remote net.Addr, ident reference := strings.TrimPrefix(strings.TrimSuffix(playpath, filepath.Ext(playpath)), s.app+"/") // Create a new channel - ch = newChannel(src, playpath, reference, remote, streams, isProxy, s.collector) + ch = newChannel(src, playpath, reference, remote, streams, isProxy, identity, s.collector) for _, stream := range streams { typ := stream.Type()