From 0018e194f1ecc869a3687d54eb8e315fa862d0c4 Mon Sep 17 00:00:00 2001 From: dexter <178529795@qq.com> Date: Sun, 4 Sep 2022 12:51:34 +0800 Subject: [PATCH] =?UTF-8?q?=F0=9F=91=8C=20IMPROVE:=20=E6=96=B0=E5=A2=9E?= =?UTF-8?q?=E6=B5=81=E5=85=B3=E9=97=AD=E7=90=86=E7=94=B1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- http.go | 6 +++--- io.go | 5 ++++- plugin.go | 29 ++++++++++++++++++++--------- stream.go | 7 +++++++ 4 files changed, 34 insertions(+), 13 deletions(-) diff --git a/http.go b/http.go index a424a00..6317d0a 100644 --- a/http.go +++ b/http.go @@ -122,18 +122,18 @@ func (conf *GlobalConfig) API_updateConfig(w http.ResponseWriter, r *http.Reques } func (conf *GlobalConfig) API_list_pull(w http.ResponseWriter, r *http.Request) { - var result []any + result := []any{} Pullers.Range(func(key, value any) bool { result = append(result, key) return true - }) + }) if err := json.NewEncoder(w).Encode(result); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) } } func (conf *GlobalConfig) API_list_push(w http.ResponseWriter, r *http.Request) { - var result []any + result := []any{} Pushers.Range(func(key, value any) bool { result = append(result, key) return true diff --git a/io.go b/io.go index 7165aab..31bee49 100644 --- a/io.go +++ b/io.go @@ -71,7 +71,9 @@ func (i *IO[C]) OnEvent(event any) { } } } - +func (io *IO[C]) GetStream() *Stream { + return io.Stream +} func (io *IO[C]) GetIO() *IO[C] { return io } @@ -86,6 +88,7 @@ type IIO interface { Stop() SetIO(any) SetParentCtx(context.Context) + GetStream() *Stream } //Stop 停止订阅或者发布,由订阅者或者发布者调用 diff --git a/plugin.go b/plugin.go index be491f8..6778b5c 100644 --- a/plugin.go +++ b/plugin.go @@ -11,6 +11,7 @@ import ( "runtime" "strings" "sync" + "time" "go.uber.org/zap" "gopkg.in/yaml.v3" @@ -57,6 +58,7 @@ type Plugin struct { RawConfig config.Config //配置的map形式方便查询 Modified config.Config //修改过的配置项 *zap.Logger `json:"-"` + saveTimer *time.Timer //用于保存的时候的延迟,防抖 } func (opt *Plugin) logHandler(pattern string, handler func(http.ResponseWriter, *http.Request)) http.HandlerFunc { @@ -172,15 +174,24 @@ func (opt *Plugin) settingPath() string { } func (opt *Plugin) Save() error { - file, err := os.OpenFile(opt.settingPath(), os.O_CREATE|os.O_WRONLY, 0644) - if err == nil { - defer file.Close() - err = yaml.NewEncoder(file).Encode(opt.Modified) + if opt.saveTimer == nil { + var lock sync.Mutex + opt.saveTimer = time.AfterFunc(time.Second, func() { + lock.Lock() + defer lock.Unlock() + file, err := os.OpenFile(opt.settingPath(), os.O_CREATE|os.O_WRONLY, 0644) + if err == nil { + defer file.Close() + err = yaml.NewEncoder(file).Encode(opt.Modified) + } + if err == nil { + opt.Info("config saved") + } + }) + } else { + opt.saveTimer.Reset(time.Second) } - if err == nil { - opt.Info("config saved") - } - return err + return nil } func (opt *Plugin) Publish(streamPath string, pub IPublisher) error { @@ -233,7 +244,7 @@ func (opt *Plugin) Pull(streamPath string, url string, puller IPuller, save bool defer opt.Info("stop pull", zap.String("remoteURL", url), zap.Error(err)) defer Pullers.Delete(puller) for puller.Reconnect() { - if puller.Pull(); !puller.IsClosed() { + if puller.Pull(); puller.GetStream().IsShutdown() { if err = puller.Connect(); err != nil { return } diff --git a/stream.go b/stream.go index 952435a..3d900ad 100644 --- a/stream.go +++ b/stream.go @@ -147,6 +147,7 @@ type Stream struct { Tracks Tracks AppName string StreamName string + CloseReason StreamAction //流关闭原因 } type StreamSummay struct { Path string @@ -240,6 +241,7 @@ func (r *Stream) action(action StreamAction) (ok bool) { stateEvent = SEwaitClose{event} r.timeout.Reset(r.DelayCloseTimeout) case STATE_CLOSED: + r.CloseReason = action for !r.actionChan.Close() { // 等待channel发送完毕 time.Sleep(time.Millisecond * 100) @@ -259,6 +261,11 @@ func (r *Stream) action(action StreamAction) (ok bool) { } return } + +func (r *Stream) IsShutdown() bool { + return r.CloseReason == ACTION_CLOSE +} + func (r *Stream) IsClosed() bool { if r == nil { return true