fix: panic

This commit is contained in:
langhuihui
2024-11-11 12:44:34 +08:00
parent c92547dd21
commit ef6e967a77
2 changed files with 22 additions and 18 deletions

View File

@@ -3,8 +3,10 @@ package pkg
import (
"context"
"log/slog"
"m7s.live/pro/pkg/task"
"slices"
"sync"
"m7s.live/pro/pkg/task"
)
var _ slog.Handler = (*MultiLogHandler)(nil)
@@ -21,16 +23,18 @@ func ParseLevel(level string) slog.Level {
type MultiLogHandler struct {
handlers []slog.Handler
attrChildren map[*MultiLogHandler][]slog.Attr
attrChildren sync.Map
parentLevel *slog.Level
level *slog.Level
}
func (m *MultiLogHandler) Add(h slog.Handler) {
m.handlers = append(m.handlers, h)
for child, attrs := range m.attrChildren {
child.Add(h.WithAttrs(attrs))
}
m.attrChildren.Range(func(key, value any) bool {
child := key.(*MultiLogHandler)
child.Add(h.WithAttrs(value.([]slog.Attr)))
return true
})
}
func (m *MultiLogHandler) Remove(h slog.Handler) {
@@ -71,10 +75,7 @@ func (m *MultiLogHandler) WithAttrs(attrs []slog.Attr) slog.Handler {
handlers: make([]slog.Handler, len(m.handlers)),
parentLevel: m.parentLevel,
}
if m.attrChildren == nil {
m.attrChildren = make(map[*MultiLogHandler][]slog.Attr)
}
m.attrChildren[result] = attrs
m.attrChildren.Store(result, attrs)
if m.level != nil {
result.parentLevel = m.level
}

View File

@@ -589,24 +589,27 @@ func (p *Publisher) Dispose() {
if p.Paused != nil {
p.Paused.Reject(p.StopReason())
}
var relatedAlias []*AliasStream
for alias := range s.AliasStreams.Range {
if alias.StreamPath == p.StreamPath {
if alias.AutoRemove {
s.AliasStreams.Remove(alias)
}
for subscriber := range p.SubscriberRange {
if subscriber.StreamPath == alias.Alias {
if originStream, ok := s.Streams.Get(alias.Alias); ok {
p.Subscribers.Remove(subscriber)
originStream.AddSubscriber(subscriber)
}
}
defer s.AliasStreams.Remove(alias)
}
relatedAlias = append(relatedAlias, alias)
}
}
if p.Subscribers.Length > 0 {
SUBSCRIBER:
for subscriber := range p.SubscriberRange {
for _, alias := range relatedAlias {
if subscriber.StreamPath == alias.Alias {
if originStream, ok := s.Streams.Get(alias.Alias); ok {
originStream.AddSubscriber(subscriber)
continue SUBSCRIBER
}
}
}
s.Waiting.Wait(subscriber)
}
p.Subscribers.Clear()