feat: add stop subscribe api, show reasons for subscriber closure

This commit is contained in:
langhuihui
2023-08-06 14:16:06 +08:00
parent 9a352bcbad
commit 1a347b5a0b
24 changed files with 238 additions and 188 deletions

104
plugin.go
View File

@@ -294,7 +294,9 @@ func (opt *Plugin) Subscribe(streamPath string, sub ISubscriber) error {
copyConfig := *conf.GetSubscribeConfig()
suber.Config = &copyConfig
}
suber.ID = fmt.Sprintf("%s_%d", suber.ID, uintptr(unsafe.Pointer(suber)))
if suber.ID == "" {
suber.ID = fmt.Sprintf("%d", uintptr(unsafe.Pointer(suber)))
}
return sub.Subscribe(streamPath, sub)
}
@@ -310,60 +312,66 @@ var ErrNoPullConfig = errors.New("no pull config")
var Pullers sync.Map
func (opt *Plugin) Pull(streamPath string, url string, puller IPuller, save int) (err error) {
zurl := zap.String("url", url)
zpath := zap.String("path", streamPath)
opt.Info("pull", zpath, zurl)
defer func() {
if err != nil {
opt.Error("pull failed", zurl, zap.Error(err))
}
}()
conf, ok := opt.Config.(config.PullConfig)
if !ok {
return ErrNoPullConfig
}
pullConf := conf.GetPullConfig()
puller.init(streamPath, url, pullConf)
puller.SetLogger(opt.Logger.With(zpath, zurl))
go func() {
Pullers.Store(puller, url)
defer Pullers.Delete(puller)
for opt.Info("start pull", zurl); puller.Reconnect(); opt.Warn("restart pull", zurl) {
if err = puller.Connect(); err != nil {
if err == io.EOF {
puller.GetPublisher().Stream.Close()
opt.Info("pull complete", zurl)
return
}
opt.Error("pull connect", zurl, zap.Error(err))
time.Sleep(time.Second * 5)
} else {
if err = opt.Publish(streamPath, puller); err != nil {
if stream := Streams.Get(streamPath); stream != nil {
if stream.Publisher != puller && stream.Publisher != nil {
io := stream.Publisher.GetPublisher()
opt.Error("puller is not publisher", zap.String("ID", io.ID), zap.String("Type", io.Type), zap.Error(err))
return
} else {
opt.Warn("pull publish", zurl, zap.Error(err))
}
} else {
opt.Error("pull publish", zurl, zap.Error(err))
if save < 2 {
zurl := zap.String("url", url)
zpath := zap.String("path", streamPath)
opt.Info("pull", zpath, zurl)
defer func() {
if err != nil {
opt.Error("pull failed", zurl, zap.Error(err))
}
}()
puller.init(streamPath, url, pullConf)
puller.SetLogger(opt.Logger.With(zpath, zurl))
badPuller := true
go func() {
Pullers.Store(puller, url)
defer Pullers.Delete(puller)
for opt.Info("start pull", zurl); puller.Reconnect(); opt.Warn("restart pull", zurl) {
if err = puller.Connect(); err != nil {
if err == io.EOF {
puller.GetPublisher().Stream.Close()
opt.Info("pull complete", zurl)
return
}
opt.Error("pull connect", zurl, zap.Error(err))
if badPuller {
return
}
time.Sleep(time.Second * 5)
} else {
if err = opt.Publish(streamPath, puller); err != nil {
if stream := Streams.Get(streamPath); stream != nil {
if stream.Publisher != puller && stream.Publisher != nil {
io := stream.Publisher.GetPublisher()
opt.Error("puller is not publisher", zap.String("ID", io.ID), zap.String("Type", io.Type), zap.Error(err))
return
} else {
opt.Warn("pull publish", zurl, zap.Error(err))
}
} else {
opt.Error("pull publish", zurl, zap.Error(err))
return
}
}
badPuller = false
if err = puller.Pull(); err != nil && !puller.IsShutdown() {
opt.Error("pull", zurl, zap.Error(err))
}
}
if err = puller.Pull(); err != nil && !puller.IsShutdown() {
opt.Error("pull", zurl, zap.Error(err))
if puller.IsShutdown() {
opt.Info("stop pull shutdown", zurl)
return
}
}
if puller.IsShutdown() {
opt.Info("stop pull shutdown", zurl)
return
}
}
opt.Warn("stop pull stop reconnect", zurl)
}()
opt.Warn("stop pull stop reconnect", zurl)
}()
}
switch save {
case 1:
pullConf.AddPullOnStart(streamPath, url)
@@ -400,7 +408,7 @@ func (opt *Plugin) Push(streamPath string, url string, pusher IPusher, save bool
pushConfig := conf.GetPushConfig()
pusher.init(streamPath, url, pushConfig)
badPusher := true
go func() {
Pushers.Store(url, pusher)
defer Pushers.Delete(url)
@@ -418,10 +426,14 @@ func (opt *Plugin) Push(streamPath string, url string, pusher IPusher, save bool
opt.Error("push connect", zp, zu, zap.Error(err))
time.Sleep(time.Second * 5)
stream.Receive(pusher) // 通知stream移除订阅者
if badPusher {
return
}
} else if err = pusher.Push(); err != nil && !stream.IsClosed() {
opt.Error("push", zp, zu, zap.Error(err))
pusher.Stop()
}
badPusher = false
if stream.IsClosed() {
opt.Info("stop push closed", zp, zu)
return