Add extra session info for SRT

This commit is contained in:
Ingo Oppermann
2023-09-21 16:06:27 +02:00
parent 8653ceeba9
commit d2016fff70
2 changed files with 16 additions and 6 deletions

View File

@@ -84,7 +84,7 @@ type channel struct {
isProxy bool isProxy bool
} }
func newChannel(conn srt.Conn, resource string, isProxy bool, collector session.Collector) *channel { func newChannel(conn srt.Conn, resource string, isProxy bool, identity string, collector session.Collector) *channel {
ch := &channel{ ch := &channel{
pubsub: srt.NewPubSub(srt.PubSubConfig{}), pubsub: srt.NewPubSub(srt.PubSubConfig{}),
path: resource, path: resource,
@@ -98,7 +98,12 @@ func newChannel(conn srt.Conn, resource string, isProxy bool, collector session.
ip, _, _ := net.SplitHostPort(addr) ip, _, _ := net.SplitHostPort(addr)
if collector.IsCollectableIP(ip) { if collector.IsCollectableIP(ip) {
collector.RegisterAndActivate(resource, resource, "publish:"+resource, addr) extra := map[string]interface{}{
"name": identity,
"method": "publish",
}
collector.RegisterAndActivate(resource, resource, resource, addr)
collector.Extra(resource, extra)
} }
return ch return ch
@@ -113,14 +118,19 @@ func (ch *channel) Close() {
ch.publisher = nil ch.publisher = nil
} }
func (ch *channel) AddSubscriber(conn srt.Conn, resource string) string { func (ch *channel) AddSubscriber(conn srt.Conn, resource, identity string) string {
addr := conn.RemoteAddr().String() addr := conn.RemoteAddr().String()
ip, _, _ := net.SplitHostPort(addr) ip, _, _ := net.SplitHostPort(addr)
client := newClient(conn, addr, ch.collector) client := newClient(conn, addr, ch.collector)
if ch.collector.IsCollectableIP(ip) { if ch.collector.IsCollectableIP(ip) {
ch.collector.RegisterAndActivate(addr, resource, "play:"+resource, addr) extra := map[string]interface{}{
"name": identity,
"method": "play",
}
ch.collector.RegisterAndActivate(addr, resource, resource, addr)
ch.collector.Extra(addr, extra)
} }
ch.lock.Lock() ch.lock.Lock()

View File

@@ -359,7 +359,7 @@ func (s *server) publish(conn srt.Conn, isProxy bool) error {
s.lock.Lock() s.lock.Lock()
ch := s.channels[si.Resource] ch := s.channels[si.Resource]
if ch == nil { if ch == nil {
ch = newChannel(conn, si.Resource, isProxy, s.collector) ch = newChannel(conn, si.Resource, isProxy, identity, s.collector)
s.channels[si.Resource] = ch s.channels[si.Resource] = ch
} else { } else {
ch = nil ch = nil
@@ -471,7 +471,7 @@ func (s *server) handleSubscribe(conn srt.Conn) {
if ch != nil { if ch != nil {
s.log(identity, "PLAY", "START", si.Resource, "", client) s.log(identity, "PLAY", "START", si.Resource, "", client)
id := ch.AddSubscriber(conn, si.Resource) id := ch.AddSubscriber(conn, si.Resource, identity)
// Blocks until connection closes // Blocks until connection closes
err := ch.pubsub.Subscribe(conn) err := ch.pubsub.Subscribe(conn)