diff --git a/pkg/log.go b/pkg/log.go index b62924f..c578cca 100644 --- a/pkg/log.go +++ b/pkg/log.go @@ -3,8 +3,10 @@ package pkg import ( "context" "log/slog" - "m7s.live/pro/pkg/task" "slices" + "sync" + + "m7s.live/pro/pkg/task" ) var _ slog.Handler = (*MultiLogHandler)(nil) @@ -21,16 +23,18 @@ func ParseLevel(level string) slog.Level { type MultiLogHandler struct { handlers []slog.Handler - attrChildren map[*MultiLogHandler][]slog.Attr + attrChildren sync.Map parentLevel *slog.Level level *slog.Level } func (m *MultiLogHandler) Add(h slog.Handler) { m.handlers = append(m.handlers, h) - for child, attrs := range m.attrChildren { - child.Add(h.WithAttrs(attrs)) - } + m.attrChildren.Range(func(key, value any) bool { + child := key.(*MultiLogHandler) + child.Add(h.WithAttrs(value.([]slog.Attr))) + return true + }) } func (m *MultiLogHandler) Remove(h slog.Handler) { @@ -71,10 +75,7 @@ func (m *MultiLogHandler) WithAttrs(attrs []slog.Attr) slog.Handler { handlers: make([]slog.Handler, len(m.handlers)), parentLevel: m.parentLevel, } - if m.attrChildren == nil { - m.attrChildren = make(map[*MultiLogHandler][]slog.Attr) - } - m.attrChildren[result] = attrs + m.attrChildren.Store(result, attrs) if m.level != nil { result.parentLevel = m.level } diff --git a/publisher.go b/publisher.go index b0890c8..37f917f 100644 --- a/publisher.go +++ b/publisher.go @@ -589,24 +589,27 @@ func (p *Publisher) Dispose() { if p.Paused != nil { p.Paused.Reject(p.StopReason()) } + var relatedAlias []*AliasStream for alias := range s.AliasStreams.Range { if alias.StreamPath == p.StreamPath { if alias.AutoRemove { - s.AliasStreams.Remove(alias) - } - for subscriber := range p.SubscriberRange { - if subscriber.StreamPath == alias.Alias { - if originStream, ok := s.Streams.Get(alias.Alias); ok { - p.Subscribers.Remove(subscriber) - originStream.AddSubscriber(subscriber) - } - } + defer s.AliasStreams.Remove(alias) } + relatedAlias = append(relatedAlias, alias) } } if p.Subscribers.Length > 0 { + SUBSCRIBER: for subscriber := range p.SubscriberRange { + for _, alias := range relatedAlias { + if subscriber.StreamPath == alias.Alias { + if originStream, ok := s.Streams.Get(alias.Alias); ok { + originStream.AddSubscriber(subscriber) + continue SUBSCRIBER + } + } + } s.Waiting.Wait(subscriber) } p.Subscribers.Clear()