From fe244e5cea1ff1e8e3815833e8973f676b0f3468 Mon Sep 17 00:00:00 2001 From: dexter <178529795@qq.com> Date: Thu, 17 Nov 2022 23:46:29 +0800 Subject: [PATCH] =?UTF-8?q?=F0=9F=91=8C=20IMPROVE:=20=E6=89=93=E5=8D=B0?= =?UTF-8?q?=E5=85=A8=E5=B1=80API=EF=BC=8C=E5=8F=91=E5=B8=83=E9=A6=96?= =?UTF-8?q?=E6=AC=A1=E8=B6=85=E6=97=B6=E6=97=B6=E9=97=B4=E9=9A=8FPublishTi?= =?UTF-8?q?meout=E9=85=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- io.go | 31 +++++++++++++++++++++++++------ plugin.go | 14 ++++++++------ stream.go | 32 ++++++++++++++++---------------- 3 files changed, 49 insertions(+), 28 deletions(-) diff --git a/io.go b/io.go index 48b1056..33366c0 100644 --- a/io.go +++ b/io.go @@ -60,6 +60,25 @@ func (i *IO[C]) SetParentCtx(parent context.Context) { i.Context, i.CancelFunc = context.WithCancel(parent) } +func (i *IO[C]) SetStuff(stuffs ...any) { + for _, stuff := range stuffs { + switch v := stuff.(type) { + case context.Context: + i.Context, i.CancelFunc = context.WithCancel(v) + default: + if v, ok := v.(io.Closer); ok { + i.Closer = v + } + if v, ok := v.(io.Reader); ok { + i.Reader = v + } + if v, ok := v.(io.Writer); ok { + i.Writer = v + } + } + } +} + func (i *IO[C]) OnEvent(event any) { switch event.(type) { case SEclose, SEKick: @@ -91,15 +110,15 @@ type IIO interface { GetStream() *Stream } -//Stop 停止订阅或者发布,由订阅者或者发布者调用 +// Stop 停止订阅或者发布,由订阅者或者发布者调用 func (io *IO[C]) Stop() { if io.CancelFunc != nil { io.CancelFunc() } } -var BadNameErr = errors.New("Bad Name") -var StreamIsClosedErr = errors.New("Stream Is Closed") +var ErrBadName = errors.New("Stream Already Exist") +var ErrStreamIsClosed = errors.New("Stream Is Closed") // receive 用于接收发布或者订阅 func (io *IO[C]) receive(streamPath string, specific IIO, conf *C) error { @@ -122,7 +141,7 @@ func (io *IO[C]) receive(streamPath string, specific IIO, conf *C) error { s, create := findOrCreateStream(u.Path, wt) Streams.Unlock() if s == nil { - return BadNameErr + return ErrBadName } io.Config = conf if io.Type == "" { @@ -137,7 +156,7 @@ func (io *IO[C]) receive(streamPath string, specific IIO, conf *C) error { s.Warn("kick", zap.String("type", oldPublisher.GetIO().Type)) oldPublisher.OnEvent(SEKick{}) } else { - return BadNameErr + return ErrBadName } } s.PublishTimeout = util.Second2Duration(v.PublishTimeout) @@ -169,7 +188,7 @@ func (io *IO[C]) receive(streamPath string, specific IIO, conf *C) error { return promise.Catch() } } - return StreamIsClosedErr + return ErrStreamIsClosed } // ClientIO 作为Client角色(Puller,Pusher)的公共结构体 diff --git a/plugin.go b/plugin.go index f257819..6adf6d5 100644 --- a/plugin.go +++ b/plugin.go @@ -85,9 +85,9 @@ func (opt *Plugin) handleFunc(pattern string, handler func(http.ResponseWriter, if opt != Engine { pattern = "/" + strings.ToLower(opt.Name) + pattern opt.Debug("http handle added to engine:" + pattern) - apiList = append(apiList, pattern) EngineConfig.HandleFunc(pattern, opt.logHandler(pattern, handler)) } + apiList = append(apiList, pattern) } // 读取独立配置合并入总配置中 @@ -229,7 +229,7 @@ func (opt *Plugin) SubscribeBlock(streamPath string, sub ISubscriber, t byte) (e return } -var NoPullConfigErr = errors.New("no pull config") +var ErrNoPullConfig = errors.New("no pull config") var Pullers sync.Map func (opt *Plugin) Pull(streamPath string, url string, puller IPuller, save bool) (err error) { @@ -241,7 +241,7 @@ func (opt *Plugin) Pull(streamPath string, url string, puller IPuller, save bool }() conf, ok := opt.Config.(config.PullConfig) if !ok { - return NoPullConfigErr + return ErrNoPullConfig } pullConf := conf.GetPullConfig() @@ -266,7 +266,9 @@ func (opt *Plugin) Pull(streamPath string, url string, puller IPuller, save bool return } if err = opt.Publish(streamPath, puller); err != nil { - if Streams.Get(streamPath).Publisher != puller { + if puber := Streams.Get(streamPath).Publisher; puber != puller { + io := puber.GetIO() + opt.Warn("puller is not publisher", zap.String("ID", io.ID), zap.String("Type", io.Type), zap.Error(err)) return } } @@ -289,7 +291,7 @@ func (opt *Plugin) Pull(streamPath string, url string, puller IPuller, save bool return } -var NoPushConfigErr = errors.New("no push config") +var ErrNoPushConfig = errors.New("no push config") var Pushers sync.Map func (opt *Plugin) Push(streamPath string, url string, pusher IPusher, save bool) (err error) { @@ -302,7 +304,7 @@ func (opt *Plugin) Push(streamPath string, url string, pusher IPusher, save bool }() conf, ok := opt.Config.(config.PushConfig) if !ok { - return NoPushConfigErr + return ErrNoPushConfig } pushConfig := conf.GetPushConfig() diff --git a/stream.go b/stream.go index 3ed4bc4..a9a4f0e 100644 --- a/stream.go +++ b/stream.go @@ -265,7 +265,7 @@ func (r *Stream) action(action StreamAction) (ok bool) { case STATE_PUBLISHING: stateEvent = SEpublish{event} r.broadcast(stateEvent) - r.timeout.Reset(time.Second * 10) // 5秒心跳,检测track的存活度 + r.timeout.Reset(r.PublishTimeout) // 5秒心跳,检测track的存活度 case STATE_WAITCLOSE: stateEvent = SEwaitClose{event} r.timeout.Reset(r.DelayCloseTimeout) @@ -456,11 +456,11 @@ func (s *Stream) run() { if s.Publisher == v.Value { s.Publisher = nil } - v.Reject(BadNameErr) + v.Reject(ErrBadName) } case *util.Promise[ISubscriber, struct{}]: if s.IsClosed() { - v.Reject(StreamIsClosedErr) + v.Reject(ErrStreamIsClosed) } suber := v.Value io := suber.GetIO() @@ -512,6 +512,18 @@ func (s *Stream) run() { if len(s.Subscribers) == 1 { s.action(ACTION_FIRSTENTER) } + case TrackRemoved: + name := v.GetBase().Name + if t, ok := s.Tracks.Delete(name); ok { + s.Info("track -1", zap.String("name", name)) + s.broadcast(v) + if s.Tracks.Len() == 0 { + s.action(ACTION_PUBLISHLOST) + } + if dt, ok := t.(*track.Data); ok { + dt.Dispose() + } + } case Track: name := v.GetBase().Name if s.Tracks.Add(name, v) { @@ -526,18 +538,6 @@ func (s *Stream) run() { } } } - case TrackRemoved: - name := v.GetBase().Name - if t, ok := s.Tracks.Delete(name); ok { - s.Info("track -1", zap.String("name", name)) - s.broadcast(v) - if s.Tracks.Len() == 0 { - s.action(ACTION_PUBLISHLOST) - } - if dt, ok := t.(*track.Data); ok { - dt.Dispose() - } - } case StreamAction: s.action(v) default: @@ -545,7 +545,7 @@ func (s *Stream) run() { } } else { for sub, w := range waitP { - w.Reject(StreamIsClosedErr) + w.Reject(ErrStreamIsClosed) delete(waitP, sub) } s.Tracks.Range(func(_ string, t Track) {