重构订阅者集合,减少轨道等待

This commit is contained in:
dexter
2022-12-29 17:51:59 +08:00
parent 99126c44f3
commit 95fff02162
12 changed files with 205 additions and 178 deletions

View File

@@ -273,8 +273,13 @@ func (opt *Plugin) Pull(streamPath string, url string, puller IPuller, save bool
go func() {
Pullers.Store(puller, url)
defer Pullers.Delete(puller)
for opt.Info("start pull", zurl); ; opt.Warn("restart pull", zurl) {
for opt.Info("start pull", zurl); puller.Reconnect(); opt.Warn("restart pull", zurl) {
if err = puller.Connect(); err != nil {
if err == io.EOF {
puller.GetPublisher().Stream.Close()
opt.Info("pull complete", zurl)
return
}
opt.Error("pull connect", zurl, zap.Error(err))
time.Sleep(time.Second * 5)
} else {
@@ -286,18 +291,16 @@ func (opt *Plugin) Pull(streamPath string, url string, puller IPuller, save bool
}
opt.Error("pull publish", zurl, zap.Error(err))
}
if err = puller.Pull(); err != nil {
if err = puller.Pull(); err != nil && !puller.IsShutdown() {
opt.Error("pull", zurl, zap.Error(err))
}
}
if !puller.Reconnect() {
opt.Warn("stop pull stop reconnect", zurl)
break
} else if puller.IsShutdown() {
opt.Warn("stop pull shutdown", zurl)
break
if puller.IsShutdown() {
opt.Info("stop pull shutdown", zurl)
return
}
}
opt.Warn("stop pull stop reconnect", zurl)
}()
if save {
@@ -335,7 +338,7 @@ func (opt *Plugin) Push(streamPath string, url string, pusher IPusher, save bool
go func() {
Pushers.Store(url, pusher)
defer Pushers.Delete(url)
for opt.Info("start push", zp, zu); ; opt.Warn("restart push", zp, zu) {
for opt.Info("start push", zp, zu); pusher.Reconnect(); opt.Warn("restart push", zp, zu) {
if err = opt.Subscribe(streamPath, pusher); err != nil {
opt.Error("push subscribe", zp, zu, zap.Error(err))
time.Sleep(time.Second * 5)
@@ -344,19 +347,17 @@ func (opt *Plugin) Push(streamPath string, url string, pusher IPusher, save bool
opt.Error("push connect", zp, zu, zap.Error(err))
time.Sleep(time.Second * 5)
} else {
if err = pusher.Push(); err != nil {
if err = pusher.Push(); err != nil && !pusher.IsShutdown() {
opt.Error("push", zp, zu, zap.Error(err))
}
}
}
if !pusher.Reconnect() {
opt.Warn("stop push stop reconnect", zp, zu)
break
} else if pusher.IsShutdown() {
opt.Warn("stop push shutdown", zp, zu)
break
if pusher.IsShutdown() {
opt.Info("stop push shutdown", zp, zu)
return
}
}
opt.Warn("stop push stop reconnect", zp, zu)
}()
if save {