diff --git a/pusher.go b/pusher.go index 5380f52..d449848 100644 --- a/pusher.go +++ b/pusher.go @@ -7,6 +7,7 @@ import ( "go.uber.org/zap" "m7s.live/engine/v4/config" ) + type IPusher interface { ISubscriber Push() error @@ -31,8 +32,15 @@ func (pub *Pusher) Reconnect() (result bool) { func (pub *Pusher) startPush(pusher IPusher) { badPusher := true var err error - Pushers.Store(pub.RemoteURL, pusher) - defer Pushers.Delete(pub.RemoteURL) + key := pub.RemoteURL + + if oldPusher, loaded := Pushers.LoadOrStore(key, pusher); loaded { + sub := oldPusher.(IPusher).GetSubscriber() + pusher.Error("pusher already exists", zap.Time("createAt", sub.StartTime)) + return + } + + defer Pushers.Delete(key) defer pusher.Disconnect() var startTime time.Time for pusher.Info("start push"); pusher.Reconnect(); pusher.Warn("restart push") {