diff --git a/plugin.go b/plugin.go index 6adf6d5..495cd7a 100644 --- a/plugin.go +++ b/plugin.go @@ -213,6 +213,24 @@ func (opt *Plugin) Publish(streamPath string, pub IPublisher) error { return pub.receive(streamPath, pub, conf.GetPublishConfig()) } +var ErrStreamNotExist = errors.New("stream not exist") + +// SubscribeExist 订阅已经存在的流 +func (opt *Plugin) SubscribeExist(streamPath string, sub ISubscriber) error { + opt.Info("subscribe exit", zap.String("path", streamPath)) + path, _, _ := strings.Cut(streamPath, "?") + if !Streams.Has(path) { + opt.Warn("stream not exist", zap.String("path", streamPath)) + return ErrStreamNotExist + } + conf, ok := opt.Config.(config.SubscribeConfig) + if !ok { + conf = EngineConfig + } + return sub.receive(streamPath, sub, conf.GetSubscribeConfig()) +} + +// Subscribe 订阅一个流,如果流不存在则创建一个等待流 func (opt *Plugin) Subscribe(streamPath string, sub ISubscriber) error { opt.Info("subscribe", zap.String("path", streamPath)) conf, ok := opt.Config.(config.SubscribeConfig) @@ -222,6 +240,7 @@ func (opt *Plugin) Subscribe(streamPath string, sub ISubscriber) error { return sub.receive(streamPath, sub, conf.GetSubscribeConfig()) } +// SubscribeBlock 阻塞订阅一个流,直到订阅结束 func (opt *Plugin) SubscribeBlock(streamPath string, sub ISubscriber, t byte) (err error) { if err = opt.Subscribe(streamPath, sub); err == nil { sub.PlayBlock(t)