From 4438376b200a60a16b1f3c05ea795e2c2c4dd8bd Mon Sep 17 00:00:00 2001 From: dexter <178529795@qq.com> Date: Wed, 30 Nov 2022 13:18:21 +0800 Subject: [PATCH] =?UTF-8?q?=F0=9F=93=A6=20NEW:=20=E6=96=B0=E5=A2=9ESubscri?= =?UTF-8?q?beExist=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- plugin.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/plugin.go b/plugin.go index 6adf6d5..495cd7a 100644 --- a/plugin.go +++ b/plugin.go @@ -213,6 +213,24 @@ func (opt *Plugin) Publish(streamPath string, pub IPublisher) error { return pub.receive(streamPath, pub, conf.GetPublishConfig()) } +var ErrStreamNotExist = errors.New("stream not exist") + +// SubscribeExist 订阅已经存在的流 +func (opt *Plugin) SubscribeExist(streamPath string, sub ISubscriber) error { + opt.Info("subscribe exit", zap.String("path", streamPath)) + path, _, _ := strings.Cut(streamPath, "?") + if !Streams.Has(path) { + opt.Warn("stream not exist", zap.String("path", streamPath)) + return ErrStreamNotExist + } + conf, ok := opt.Config.(config.SubscribeConfig) + if !ok { + conf = EngineConfig + } + return sub.receive(streamPath, sub, conf.GetSubscribeConfig()) +} + +// Subscribe 订阅一个流,如果流不存在则创建一个等待流 func (opt *Plugin) Subscribe(streamPath string, sub ISubscriber) error { opt.Info("subscribe", zap.String("path", streamPath)) conf, ok := opt.Config.(config.SubscribeConfig) @@ -222,6 +240,7 @@ func (opt *Plugin) Subscribe(streamPath string, sub ISubscriber) error { return sub.receive(streamPath, sub, conf.GetSubscribeConfig()) } +// SubscribeBlock 阻塞订阅一个流,直到订阅结束 func (opt *Plugin) SubscribeBlock(streamPath string, sub ISubscriber, t byte) (err error) { if err = opt.Subscribe(streamPath, sub); err == nil { sub.PlayBlock(t)