diff --git a/srt/channel.go b/srt/channel.go index 1f7fc3a6..76c49432 100644 --- a/srt/channel.go +++ b/srt/channel.go @@ -84,7 +84,7 @@ type channel struct { isProxy bool } -func newChannel(conn srt.Conn, resource string, isProxy bool, collector session.Collector) *channel { +func newChannel(conn srt.Conn, resource string, isProxy bool, identity string, collector session.Collector) *channel { ch := &channel{ pubsub: srt.NewPubSub(srt.PubSubConfig{}), path: resource, @@ -98,7 +98,12 @@ func newChannel(conn srt.Conn, resource string, isProxy bool, collector session. ip, _, _ := net.SplitHostPort(addr) if collector.IsCollectableIP(ip) { - collector.RegisterAndActivate(resource, resource, "publish:"+resource, addr) + extra := map[string]interface{}{ + "name": identity, + "method": "publish", + } + collector.RegisterAndActivate(resource, resource, resource, addr) + collector.Extra(resource, extra) } return ch @@ -113,14 +118,19 @@ func (ch *channel) Close() { ch.publisher = nil } -func (ch *channel) AddSubscriber(conn srt.Conn, resource string) string { +func (ch *channel) AddSubscriber(conn srt.Conn, resource, identity string) string { addr := conn.RemoteAddr().String() ip, _, _ := net.SplitHostPort(addr) client := newClient(conn, addr, ch.collector) if ch.collector.IsCollectableIP(ip) { - ch.collector.RegisterAndActivate(addr, resource, "play:"+resource, addr) + extra := map[string]interface{}{ + "name": identity, + "method": "play", + } + ch.collector.RegisterAndActivate(addr, resource, resource, addr) + ch.collector.Extra(addr, extra) } ch.lock.Lock() diff --git a/srt/srt.go b/srt/srt.go index 89b483cb..6240fec5 100644 --- a/srt/srt.go +++ b/srt/srt.go @@ -359,7 +359,7 @@ func (s *server) publish(conn srt.Conn, isProxy bool) error { s.lock.Lock() ch := s.channels[si.Resource] if ch == nil { - ch = newChannel(conn, si.Resource, isProxy, s.collector) + ch = newChannel(conn, si.Resource, isProxy, identity, s.collector) s.channels[si.Resource] = ch } else { ch = nil @@ -471,7 +471,7 @@ func (s *server) handleSubscribe(conn srt.Conn) { if ch != nil { s.log(identity, "PLAY", "START", si.Resource, "", client) - id := ch.AddSubscriber(conn, si.Resource) + id := ch.AddSubscriber(conn, si.Resource, identity) // Blocks until connection closes err := ch.pubsub.Subscribe(conn)