👌 IMPROVE: 新增流关闭理由

This commit is contained in:
dexter
2022-09-04 12:51:34 +08:00
parent 51e002621b
commit 0018e194f1
4 changed files with 34 additions and 13 deletions

View File

@@ -122,7 +122,7 @@ func (conf *GlobalConfig) API_updateConfig(w http.ResponseWriter, r *http.Reques
}
func (conf *GlobalConfig) API_list_pull(w http.ResponseWriter, r *http.Request) {
var result []any
result := []any{}
Pullers.Range(func(key, value any) bool {
result = append(result, key)
return true
@@ -133,7 +133,7 @@ func (conf *GlobalConfig) API_list_pull(w http.ResponseWriter, r *http.Request)
}
func (conf *GlobalConfig) API_list_push(w http.ResponseWriter, r *http.Request) {
var result []any
result := []any{}
Pushers.Range(func(key, value any) bool {
result = append(result, key)
return true

5
io.go
View File

@@ -71,7 +71,9 @@ func (i *IO[C]) OnEvent(event any) {
}
}
}
func (io *IO[C]) GetStream() *Stream {
return io.Stream
}
func (io *IO[C]) GetIO() *IO[C] {
return io
}
@@ -86,6 +88,7 @@ type IIO interface {
Stop()
SetIO(any)
SetParentCtx(context.Context)
GetStream() *Stream
}
//Stop 停止订阅或者发布,由订阅者或者发布者调用

View File

@@ -11,6 +11,7 @@ import (
"runtime"
"strings"
"sync"
"time"
"go.uber.org/zap"
"gopkg.in/yaml.v3"
@@ -57,6 +58,7 @@ type Plugin struct {
RawConfig config.Config //配置的map形式方便查询
Modified config.Config //修改过的配置项
*zap.Logger `json:"-"`
saveTimer *time.Timer //用于保存的时候的延迟,防抖
}
func (opt *Plugin) logHandler(pattern string, handler func(http.ResponseWriter, *http.Request)) http.HandlerFunc {
@@ -172,6 +174,11 @@ func (opt *Plugin) settingPath() string {
}
func (opt *Plugin) Save() error {
if opt.saveTimer == nil {
var lock sync.Mutex
opt.saveTimer = time.AfterFunc(time.Second, func() {
lock.Lock()
defer lock.Unlock()
file, err := os.OpenFile(opt.settingPath(), os.O_CREATE|os.O_WRONLY, 0644)
if err == nil {
defer file.Close()
@@ -180,7 +187,11 @@ func (opt *Plugin) Save() error {
if err == nil {
opt.Info("config saved")
}
return err
})
} else {
opt.saveTimer.Reset(time.Second)
}
return nil
}
func (opt *Plugin) Publish(streamPath string, pub IPublisher) error {
@@ -233,7 +244,7 @@ func (opt *Plugin) Pull(streamPath string, url string, puller IPuller, save bool
defer opt.Info("stop pull", zap.String("remoteURL", url), zap.Error(err))
defer Pullers.Delete(puller)
for puller.Reconnect() {
if puller.Pull(); !puller.IsClosed() {
if puller.Pull(); puller.GetStream().IsShutdown() {
if err = puller.Connect(); err != nil {
return
}

View File

@@ -147,6 +147,7 @@ type Stream struct {
Tracks Tracks
AppName string
StreamName string
CloseReason StreamAction //流关闭原因
}
type StreamSummay struct {
Path string
@@ -240,6 +241,7 @@ func (r *Stream) action(action StreamAction) (ok bool) {
stateEvent = SEwaitClose{event}
r.timeout.Reset(r.DelayCloseTimeout)
case STATE_CLOSED:
r.CloseReason = action
for !r.actionChan.Close() {
// 等待channel发送完毕
time.Sleep(time.Millisecond * 100)
@@ -259,6 +261,11 @@ func (r *Stream) action(action StreamAction) (ok bool) {
}
return
}
func (r *Stream) IsShutdown() bool {
return r.CloseReason == ACTION_CLOSE
}
func (r *Stream) IsClosed() bool {
if r == nil {
return true