From 00128ce20a78a40f0a01423debafba73a2b9bb6f Mon Sep 17 00:00:00 2001 From: langhuihui <178529795@qq.com> Date: Sun, 15 Mar 2020 19:57:57 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8A=A0=E5=85=A5=E6=98=AF=E5=90=A6=E5=85=81?= =?UTF-8?q?=E8=AE=B8=E7=A9=BA=E6=88=BF=E9=97=B4=E7=9A=84=E9=85=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config.go | 4 ++++ go.mod | 1 + go.sum | 2 ++ hook.go | 26 ++++++++++++++++++++++++++ index.go => main.go | 6 ++++++ room.go | 2 ++ subscriber.go | 6 ++++++ 7 files changed, 47 insertions(+) rename index.go => main.go (90%) diff --git a/config.go b/config.go index d2ac115..924d445 100644 --- a/config.go +++ b/config.go @@ -32,3 +32,7 @@ func InstallPlugin(opt *PluginConfig) { type ListenerConfig struct { ListenAddr string } + +var config = &struct { + EnableWaitRoom bool +}{true} diff --git a/go.mod b/go.mod index 5e354f2..416a3e8 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect github.com/funny/slab v0.0.0-20180511031532-b1fad5e5d478 github.com/go-ole/go-ole v1.2.4 // indirect + github.com/pkg/errors v0.9.1 github.com/shirou/gopsutil v2.20.1+incompatible github.com/stretchr/testify v1.5.1 // indirect golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae // indirect diff --git a/go.sum b/go.sum index 61cfe30..9b82b4d 100644 --- a/go.sum +++ b/go.sum @@ -16,6 +16,8 @@ github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6Wezm github.com/gobwas/ws v1.0.2/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM= github.com/langhuihui/monibuca v0.4.1 h1:hR5xiVtYJM272ChQUrKdNd+AQyY98SNxVZEx2WAuNmA= github.com/langhuihui/monibuca v0.4.1/go.mod h1:S4rqYUQ+bCB3WdwuXTJ92FqVRZz5Sh7zAXOJc94JqMI= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/quangngotan95/go-m3u8 v0.1.0/go.mod h1:smzfWHlYpBATVNu1GapKLYiCtEo5JxridIgvvudZ+Wc= diff --git a/hook.go b/hook.go index 61f0dd8..ab15b62 100644 --- a/hook.go +++ b/hook.go @@ -42,6 +42,19 @@ func (h OnSubscribeHook) Trigger(s *OutputStream) { } } +var OnUnSubscribeHooks = make(OnUnSubscribeHook, 0) + +type OnUnSubscribeHook []func(s *OutputStream) + +func (h OnUnSubscribeHook) AddHook(hook func(s *OutputStream)) { + OnUnSubscribeHooks = append(h, hook) +} +func (h OnUnSubscribeHook) Trigger(s *OutputStream) { + for _, f := range h { + f(s) + } +} + var OnDropHooks = make(OnDropHook, 0) type OnDropHook []func(s *OutputStream) @@ -67,3 +80,16 @@ func (h OnSummaryHook) Trigger(v bool) { f(v) } } + +var OnRoomClosedHooks = make(OnRoomClosedHook, 0) + +type OnRoomClosedHook []func(*Room) + +func (h OnRoomClosedHook) AddHook(hook func(*Room)) { + OnRoomClosedHooks = append(h, hook) +} +func (h OnRoomClosedHook) Trigger(v *Room) { + for _, f := range h { + f(v) + } +} diff --git a/index.go b/main.go similarity index 90% rename from index.go rename to main.go index 97e3adf..80de00d 100644 --- a/index.go +++ b/main.go @@ -39,6 +39,12 @@ func Run(configFile string) (err error) { go Summary.StartSummary() var cg map[string]interface{} if _, err = toml.Decode(string(ConfigRaw), &cg); err == nil { + if cfg, ok := cg["Monibuca"]; ok { + b, _ := json.Marshal(cfg) + if err = json.Unmarshal(b, config); err != nil { + log.Println(err) + } + } for name, config := range Plugins { if cfg, ok := cg[name]; ok { b, _ := json.Marshal(cfg) diff --git a/room.go b/room.go index ae7f20a..b787554 100644 --- a/room.go +++ b/room.go @@ -91,6 +91,7 @@ type ChangeRoomCmd struct { func (r *Room) onClosed() { log.Printf("room destoryed :%s", r.StreamPath) AllRoom.Delete(r.StreamPath) + OnRoomClosedHooks.Trigger(r) if r.Publisher != nil { r.OnClosed() } @@ -138,6 +139,7 @@ func (r *Room) Run() { switch v := s.(type) { case *UnSubscribeCmd: delete(r.Subscribers, v.ID) + OnUnSubscribeHooks.Trigger(v.OutputStream) log.Printf("%s subscriber %s removed remains:%d", r.StreamPath, v.ID, len(r.Subscribers)) if len(r.Subscribers) == 0 && r.Publisher == nil { r.Cancel() diff --git a/subscriber.go b/subscriber.go index 1ade342..c322f5c 100644 --- a/subscriber.go +++ b/subscriber.go @@ -6,6 +6,7 @@ import ( "time" "github.com/Monibuca/engine/avformat" + "github.com/pkg/errors" ) // Subscriber 订阅者 @@ -55,6 +56,11 @@ func (s *OutputStream) Close() { //Play 开始订阅 func (s *OutputStream) Play(streamPath string) (err error) { + if !config.EnableWaitRoom { + if _, ok := AllRoom.Load(streamPath); !ok { + return errors.New(fmt.Sprintf("Stream not found:%s", streamPath)) + } + } AllRoom.Get(streamPath).Subscribe(s) defer s.UnSubscribe(s) for {