diff --git a/config/config.go b/config/config.go index 1fe9312..c7ca9dc 100644 --- a/config/config.go +++ b/config/config.go @@ -1,11 +1,13 @@ package config import ( + "encoding/json" "fmt" "net" "net/http" "os" "reflect" + "regexp" "strings" "time" @@ -14,7 +16,23 @@ import ( "m7s.live/engine/v4/log" ) -type Config map[string]any +type Config struct { + Ptr reflect.Value //指向配置结构体值 + Value any //当前值,优先级:动态修改值>环境变量>配置文件>defaultYaml>全局配置>默认值 + Modify any //动态修改的值 + Env any //环境变量中的值 + File any //配置文件中的值 + Global *Config //全局配置中的值,指针类型 + Default any //默认值 + Enum []struct { + Label string `json:"label"` + Value string `json:"value"` + } + name string // 小写 + propsMap map[string]*Config + props []*Config + tag reflect.StructTag +} var durationType = reflect.TypeOf(time.Duration(0)) @@ -38,252 +56,478 @@ type QuicPlugin interface { ServeQuic(quic.Connection) } -// CreateElem 创建Map或者Slice中的元素 -func (config Config) CreateElem(eleType reflect.Type) reflect.Value { - if eleType.Kind() == reflect.Pointer { - newv := reflect.New(eleType.Elem()) - config.Unmarshal(newv) - return newv - } else { - newv := reflect.New(eleType) - config.Unmarshal(newv) - return newv.Elem() - } -} - -func (config Config) Unmarshal(s any) { - // defer func() { - // if err := recover(); err != nil { - // log.Error("Unmarshal error:", err) - // } - // }() - if s == nil { - return - } - var el reflect.Value - if v, ok := s.(reflect.Value); ok { - el = v - } else { - el = reflect.ValueOf(s) - } - if el.Kind() == reflect.Pointer { - el = el.Elem() - } - t := el.Type() - if t.Kind() == reflect.Map { - tt := t.Elem() - for k, v := range config { - if child, ok := v.(Config); ok { - //复杂类型 - el.SetMapIndex(reflect.ValueOf(k), child.CreateElem(tt)) - } else { - //基本类型 - el.SetMapIndex(reflect.ValueOf(k), reflect.ValueOf(v).Convert(tt)) - } - } - return - } - //字段映射,小写对应的大写 - nameMap := make(map[string]string) - for i, j := 0, t.NumField(); i < j; i++ { - field := t.Field(i) - name := field.Name - if tag := field.Tag.Get("yaml"); tag != "" { - name, _, _ = strings.Cut(tag, ",") - } else { - name = strings.ToLower(name) - } - nameMap[name] = field.Name - } - for k, v := range config { - name, ok := nameMap[k] - if !ok { - log.Warn("no config named:", k) - continue - } - // 需要被写入的字段 - fv := el.FieldByName(name) - if child, ok := v.(Config); ok { //处理值是递归情况(map) - if fv.Kind() == reflect.Map { - if fv.IsNil() { - fv.Set(reflect.MakeMap(fv.Type())) - } - } - child.Unmarshal(fv) - } else { - assign(name, fv, reflect.ValueOf(v)) +func (config *Config) Range(f func(key string, value Config)) { + if m, ok := config.Value.(map[string]Config); ok { + for k, v := range m { + f(k, v) } } } -// 覆盖配置 -func (config Config) Assign(source Config) { - for k, v := range source { - switch m := config[k].(type) { - case Config: - switch vv := v.(type) { - case Config: - m.Assign(vv) - case map[string]any: - m.Assign(Config(vv)) - } - default: - config[k] = v - } - } -} - -// 合并配置,不覆盖 -func (config Config) Merge(source Config) { - for k, v := range source { - if _, ok := config[k]; !ok { - switch m := config[k].(type) { - case Config: - m.Merge(v.(Config)) - default: - if Global.LogLang == "zh" { - log.Debug("合并配置", k, ":", v) - } else { - log.Debug("merge", k, ":", v) - } - config[k] = v - } - } else { - log.Debug("exist", k) - } - } -} - -func (config *Config) Set(key string, value any) { - if *config == nil { - *config = Config{strings.ToLower(key): value} - } else { - (*config)[strings.ToLower(key)] = value - } -} - -func (config Config) Get(key string) (v any) { - v = config[strings.ToLower(key)] - return -} - -func (config Config) Has(key string) (ok bool) { - _, ok = config[strings.ToLower(key)] - return -} - -func (config Config) HasChild(key string) (ok bool) { - _, ok = config[strings.ToLower(key)].(Config) +func (config *Config) IsMap() bool { + _, ok := config.Value.(map[string]Config) return ok } -func (config Config) GetChild(key string) Config { - if v, ok := config[strings.ToLower(key)]; ok && v != nil { - return v.(Config) +func (config *Config) Get(key string) (v *Config) { + if config.propsMap == nil { + config.propsMap = make(map[string]*Config) + } + if v, ok := config.propsMap[key]; ok { + return v + } else { + v = &Config{ + name: key, + } + config.propsMap[key] = v + config.props = append(config.props, v) + return v } - return nil } -func Struct2Config(s any, prefix ...string) (config Config) { - config = make(Config) +func (config Config) Has(key string) (ok bool) { + if config.propsMap == nil { + return false + } + _, ok = config.propsMap[strings.ToLower(key)] + return ok +} + +func (config *Config) MarshalJSON() ([]byte, error) { + if config.props == nil { + return json.Marshal(config.Value) + } + return json.Marshal(config.props) +} + +// Parse 第一步读取配置结构体的默认值 +func (config *Config) Parse(s any, prefix ...string) { var t reflect.Type var v reflect.Value if vv, ok := s.(reflect.Value); ok { t, v = vv.Type(), vv } else { t, v = reflect.TypeOf(s), reflect.ValueOf(s) - if t.Kind() == reflect.Pointer { - t, v = t.Elem(), v.Elem() + } + if t.Kind() == reflect.Pointer { + t, v = t.Elem(), v.Elem() + } + config.Ptr = v + config.Default = v.Interface() + config.Value = v.Interface() + if len(prefix) > 0 { // 读取环境变量 + envKey := strings.Join(prefix, "_") + if envValue := os.Getenv(envKey); envValue != "" { + yaml.Unmarshal([]byte(fmt.Sprintf("env: %s", envValue)), config) + config.Value = config.Env + config.Ptr.Set(reflect.ValueOf(config.Env)) } } - for i, j := 0, t.NumField(); i < j; i++ { - ft, fv := t.Field(i), v.Field(i) - if !ft.IsExported() { - continue - } - name := strings.ToLower(ft.Name) - if tag := ft.Tag.Get("yaml"); tag != "" { - if tag == "-" { + if t.Kind() == reflect.Struct { + for i, j := 0, t.NumField(); i < j; i++ { + ft, fv := t.Field(i), v.Field(i) + if !ft.IsExported() { continue } - name, _, _ = strings.Cut(tag, ",") - } - var envPath []string - if len(prefix) > 0 { - envPath = append(prefix, strings.ToUpper(ft.Name)) - envKey := strings.Join(envPath, "_") - if envValue := os.Getenv(envKey); envValue != "" { - yaml.Unmarshal([]byte(fmt.Sprintf("%s: %s", name, envValue)), config) - assign(envKey, fv, reflect.ValueOf(config[name])) - config[name] = fv.Interface() - return + name := strings.ToLower(ft.Name) + if tag := ft.Tag.Get("yaml"); tag != "" { + if tag == "-" { + continue + } + name, _, _ = strings.Cut(tag, ",") + } + prop := config.Get(name) + prop.Parse(fv, append(prefix, strings.ToUpper(ft.Name))...) + prop.tag = ft.Tag + for _, kv := range strings.Split(ft.Tag.Get("enum"), ",") { + kvs := strings.Split(kv, ":") + if len(kvs) != 2 { + continue + } + prop.Enum = append(prop.Enum, struct { + Label string `json:"label"` + Value string `json:"value"` + }{ + Label: strings.TrimSpace(kvs[1]), + Value: strings.TrimSpace(kvs[0]), + }) } } - switch ft.Type.Kind() { - case reflect.Struct: - config[name] = Struct2Config(fv, envPath...) - default: - reflect.ValueOf(config).SetMapIndex(reflect.ValueOf(name), fv) + } +} + +// ParseDefaultYaml 第二步读取全局配置 +func (config *Config) ParseGlobal(g *Config) { + config.Global = g + if config.propsMap != nil { + for k, v := range config.propsMap { + v.ParseGlobal(g.Get(k)) } + } else { + config.Value = g.Value + } +} + +// ParseDefaultYaml 第三步读取内嵌默认配置 +func (config *Config) ParseDefaultYaml(defaultYaml map[string]any) { + if defaultYaml == nil { + return + } + for k, v := range defaultYaml { + if config.Has(k) { + if prop := config.Get(k); prop.props != nil { + if v != nil { + prop.ParseDefaultYaml(v.(map[string]any)) + } + } else { + dv := prop.assign(k, v) + prop.Default = dv.Interface() + if prop.Env == nil { + prop.Value = dv.Interface() + prop.Ptr.Set(dv) + } + } + } + } +} + +// ParseFile 第四步读取用户配置文件 +func (config *Config) ParseUserFile(conf map[string]any) { + if conf == nil { + return + } + config.File = conf + for k, v := range conf { + if config.Has(k) { + if prop := config.Get(k); prop.props != nil { + if v != nil { + prop.ParseUserFile(v.(map[string]any)) + } + } else { + fv := prop.assign(k, v) + prop.File = fv.Interface() + if prop.Env == nil { + prop.Value = fv.Interface() + prop.Ptr.Set(fv) + } + } + } + } +} + +// ParseModifyFile 第五步读取动态修改配置文件 +func (config *Config) ParseModifyFile(conf map[string]any) { + if conf == nil { + return + } + config.Modify = conf + for k, v := range conf { + if config.Has(k) { + if prop := config.Get(k); prop.props != nil { + if v != nil { + prop.ParseModifyFile(v.(map[string]any)) + } + } else { + mv := prop.assign(k, v) + prop.Modify = mv.Interface() + prop.Value = mv.Interface() + prop.Ptr.Set(mv) + } + } + } +} + +func (config *Config) GetMap() map[string]any { + m := make(map[string]any) + for k, v := range config.propsMap { + if v.props != nil { + if vv := v.GetMap(); vv != nil { + m[k] = vv + } + } else if v.Value != nil { + m[k] = v.Value + } + } + if len(m) > 0 { + return m + } + return nil +} + +func (config *Config) schema(index int) (r any) { + defer func() { + err := recover() + if err != nil { + log.Error(err) + } + }() + if config.props != nil { + r := Card{ + Type: "void", + Component: "Card", + Properties: make(map[string]any), + Index: index, + } + r.ComponentProps = map[string]any{ + "title": config.name, + } + for i, v := range config.props { + r.Properties[v.name] = v.schema(i) + } + return r + } else { + p := Property{ + Title: config.name, + Default: config.Value, + DecoratorProps: map[string]any{ + "tooltip": config.tag.Get("desc"), + }, + ComponentProps: map[string]any{}, + Decorator: "FormItem", + Index: index, + } + if config.Modify != nil { + p.Description = "已动态修改" + } else if config.Env != nil { + p.Description = "使用环境变量中的值" + } else if config.File != nil { + p.Description = "使用配置文件中的值" + } else if config.Global != nil { + p.Description = "已使用全局配置中的值" + } + p.Enum = config.Enum + if config.Ptr.Type() == durationType { + p.Type = "string" + p.Component = "Input" + str := config.Value.(time.Duration).String() + p.ComponentProps = map[string]any{ + "placeholder": str, + } + p.Default = str + p.DecoratorProps["addonAfter"] = "时间,单位:s,m,h,d,例如:100ms, 10s, 4m, 1h" + } else { + switch config.Ptr.Kind() { + case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64, reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64, reflect.Float32, reflect.Float64: + p.Type = "number" + p.Component = "NumberPicker" + p.ComponentProps = map[string]any{ + "placeholder": config.Value, + } + case reflect.Bool: + p.Type = "boolean" + p.Component = "Switch" + case reflect.String: + p.Type = "string" + p.Component = "Input" + p.ComponentProps = map[string]any{ + "placeholder": config.Value, + } + case reflect.Slice: + p.Type = "array" + p.Component = "Input" + p.ComponentProps = map[string]any{ + "placeholder": config.Value, + } + p.DecoratorProps["addonAfter"] = "数组,每个元素用逗号分隔" + case reflect.Map: + var children []struct { + Key string `json:"mkey"` + Value any `json:"mvalue"` + } + p := Property{ + Type: "array", + Component: "ArrayTable", + Decorator: "FormItem", + Properties: map[string]any{ + "addition": map[string]string{ + "type": "void", + "title": "添加", + "x-component": "ArrayTable.Addition", + }, + }, + Index: index, + Title: config.name, + Items: &Object{ + Type: "object", + Properties: map[string]any{ + "c1": Card{ + Type: "void", + Component: "ArrayTable.Column", + ComponentProps: map[string]any{ + "title": config.tag.Get("key"), + "width": 300, + }, + Properties: map[string]any{ + "mkey": Property{ + Type: "string", + Decorator: "FormItem", + Component: "Input", + }, + }, + Index: 0, + }, + "c2": Card{ + Type: "void", + Component: "ArrayTable.Column", + ComponentProps: map[string]any{ + "title": config.tag.Get("value"), + }, + Properties: map[string]any{ + "mvalue": Property{ + Type: "string", + Decorator: "FormItem", + Component: "Input", + }, + }, + Index: 1, + }, + "operator": Card{ + Type: "void", + Component: "ArrayTable.Column", + ComponentProps: map[string]any{ + "title": "操作", + }, + Properties: map[string]any{ + "remove": Card{ + Type: "void", + Component: "ArrayTable.Remove", + }, + }, + Index: 2, + }, + }, + }, + } + iter := config.Ptr.MapRange() + for iter.Next() { + children = append(children, struct { + Key string `json:"mkey"` + Value any `json:"mvalue"` + }{ + Key: iter.Key().String(), + Value: iter.Value().Interface(), + }) + } + p.Default = children + return p + } + } + if len(p.Enum) > 0 { + p.Component = "Radio.Group" + } + return p + } +} + +func (config *Config) GetFormily() (r Formily) { + r.Form.LabelCol = 4 + r.Form.WrapperCol = 20 + r.Schema = Object{ + Type: "object", + Properties: make(map[string]any), + } + for i, v := range config.props { + r.Schema.Properties[v.name] = v.schema(i) } return } -func assign(k string, target reflect.Value, source reflect.Value) { - ft := target.Type() - if ft == durationType && target.CanSet() { +// func (config *Config) GetModify() map[string]any { +// m := make(map[string]any) +// for k, v := range config.props { +// if v.props != nil { +// if vv := v.GetModify(); vv != nil { +// m[k] = vv +// } +// } else if v.Modify != nil { +// m[k] = v.Modify +// } +// } +// if len(m) > 0 { +// return m +// } +// return nil +// } + +var regexPureNumber = regexp.MustCompile(`^\d+$`) + +func (config *Config) assign(k string, v any) (target reflect.Value) { + ft := config.Ptr.Type() + + source := reflect.ValueOf(v) + + if ft == durationType { + target = reflect.New(ft).Elem() if source.Type() == durationType { target.Set(source) } else if source.IsZero() || !source.IsValid() { target.SetInt(0) - } else if d, err := time.ParseDuration(source.String()); err == nil { - target.SetInt(int64(d)) } else { - if Global.LogLang == "zh" { - log.Errorf("%s 无效的时间值: %v 请添加单位(s,m,h,d),例如:100ms, 10s, 4m, 1h", k, source) + timeStr := source.String() + if d, err := time.ParseDuration(timeStr); err == nil && !regexPureNumber.MatchString(timeStr) { + target.SetInt(int64(d)) } else { - log.Errorf("%s invalid duration value: %v please add unit (s,m,h,d),eg: 100ms, 10s, 4m, 1h", k, source) + if Global.LogLang == "zh" { + log.Errorf("%s 无效的时间值: %v 请添加单位(s,m,h,d),例如:100ms, 10s, 4m, 1h", k, source) + } else { + log.Errorf("%s invalid duration value: %v please add unit (s,m,h,d),eg: 100ms, 10s, 4m, 1h", k, source) + } + os.Exit(1) } - os.Exit(1) } return } - switch target.Kind() { - case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: - target.SetUint(uint64(source.Int())) - case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: - target.SetInt(source.Int()) - case reflect.Float32, reflect.Float64: - if source.CanFloat() { - target.SetFloat(source.Float()) - } else { - target.SetFloat(float64(source.Int())) - } - case reflect.Slice: - var s reflect.Value - if source.Kind() == reflect.Slice { - l := source.Len() - s = reflect.MakeSlice(ft, l, source.Cap()) - for i := 0; i < l; i++ { - fv := source.Index(i) - item := s.Index(i) - if child, ok := fv.Interface().(Config); ok { - item.Set(child.CreateElem(ft.Elem())) - } else if fv.Kind() == reflect.Interface { - item.Set(reflect.ValueOf(fv.Interface()).Convert(item.Type())) - } else { - item.Set(fv) - } - } - } else { - //值是单值,但类型是数组,默认解析为一个元素的数组 - s = reflect.MakeSlice(ft, 1, 1) - s.Index(0).Set(source) - } - target.Set(s) - default: - if source.IsValid() { - target.Set(source.Convert(ft)) - } - } + + tmpStruct := reflect.StructOf([]reflect.StructField{ + { + Name: strings.ToUpper(k), + Type: ft, + }, + }) + tmpValue := reflect.New(tmpStruct) + tmpByte, _ := yaml.Marshal(map[string]any{k: v}) + yaml.Unmarshal(tmpByte, tmpValue.Interface()) + return tmpValue.Elem().Field(0) + // switch target.Kind() { + // case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: + // target.SetUint(uint64(source.Int())) + // case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: + // target.SetInt(source.Int()) + // case reflect.Float32, reflect.Float64: + // if source.CanFloat() { + // target.SetFloat(source.Float()) + // } else { + // target.SetFloat(float64(source.Int())) + // } + // case reflect.Map: + + // case reflect.Slice: + // var s reflect.Value + // if source.Kind() == reflect.Slice { + // l := source.Len() + // s = reflect.MakeSlice(ft, l, source.Cap()) + // for i := 0; i < l; i++ { + // fv := source.Index(i) + // item := s.Index(i) + // if child, ok := fv.Interface().(map[string]any); ok { + // panic(child) + // // item.Set(child.CreateElem(ft.Elem())) + // } else if fv.Kind() == reflect.Interface { + // item.Set(reflect.ValueOf(fv.Interface()).Convert(item.Type())) + // } else { + // item.Set(fv) + // } + // } + // } else { + // //值是单值,但类型是数组,默认解析为一个元素的数组 + // s = reflect.MakeSlice(ft, 1, 1) + // s.Index(0).Set(source) + // } + // target.Set(s) + // default: + // if source.IsValid() { + // target.Set(source.Convert(ft)) + // } + // } + return } diff --git a/config/formily.go b/config/formily.go new file mode 100644 index 0000000..1a24ee7 --- /dev/null +++ b/config/formily.go @@ -0,0 +1,40 @@ +package config + +type Property struct { + Type string `json:"type"` + Title string `json:"title"` + Description string `json:"description"` + Enum []struct { + Label string `json:"label"` + Value string `json:"value"` + } `json:"enum,omitempty"` + Items *Object `json:"items,omitempty"` + Properties map[string]any `json:"properties,omitempty"` + Default any `json:"default,omitempty"` + Decorator string `json:"x-decorator"` + DecoratorProps map[string]any `json:"x-decorator-props,omitempty"` + Component string `json:"x-component"` + ComponentProps map[string]any `json:"x-component-props,omitempty"` + Index int `json:"x-index"` +} + +type Card struct { + Type string `json:"type"` + Properties map[string]any `json:"properties,omitempty"` + Component string `json:"x-component"` + ComponentProps map[string]any `json:"x-component-props,omitempty"` + Index int `json:"x-index"` +} + +type Object struct { + Type string `json:"type"` + Properties map[string]any `json:"properties"` +} + +type Formily struct { + Form struct { + LabelCol int `json:"labelCol"` + WrapperCol int `json:"wrapperCol"` + } `json:"form"` + Schema Object `json:"schema"` +} diff --git a/config/http.go b/config/http.go index d9f7e17..451dd99 100644 --- a/config/http.go +++ b/config/http.go @@ -16,16 +16,16 @@ var _ HTTPConfig = (*HTTP)(nil) type Middleware func(string, http.Handler) http.Handler type HTTP struct { - ListenAddr string - ListenAddrTLS string - CertFile string - KeyFile string - CORS bool `default:"true"` //是否自动添加CORS头 - UserName string - Password string - ReadTimeout time.Duration - WriteTimeout time.Duration - IdleTimeout time.Duration + ListenAddr string `desc:"监听地址"` + ListenAddrTLS string `desc:"监听地址HTTPS"` + CertFile string `desc:"HTTPS证书文件"` + KeyFile string `desc:"HTTPS密钥文件"` + CORS bool `default:"true" desc:"是否自动添加CORS头"` //是否自动添加CORS头 + UserName string `desc:"基本身份认证用户名"` + Password string `desc:"基本身份认证密码"` + ReadTimeout time.Duration `desc:"读取超时"` + WriteTimeout time.Duration `desc:"写入超时"` + IdleTimeout time.Duration `desc:"空闲超时"` mux *http.ServeMux middlewares []Middleware } diff --git a/config/types.go b/config/types.go index 96e147a..1808ece 100755 --- a/config/types.go +++ b/config/types.go @@ -30,21 +30,21 @@ type PushConfig interface { } type Publish struct { - PubAudio bool `default:"true"` - PubVideo bool `default:"true"` - InsertSEI bool // 是否启用SEI插入 - KickExist bool // 是否踢掉已经存在的发布者 - PublishTimeout time.Duration `default:"10s"` // 发布无数据超时 - WaitCloseTimeout time.Duration // 延迟自动关闭(等待重连) - DelayCloseTimeout time.Duration // 延迟自动关闭(无订阅时) - IdleTimeout time.Duration // 空闲(无订阅)超时 - PauseTimeout time.Duration `default:"30s"` // 暂停超时 - BufferTime time.Duration // 缓冲长度(单位:秒),0代表取最近关键帧 - SpeedLimit time.Duration `default:"500ms"` //速度限制最大等待时间 - Key string // 发布鉴权key - SecretArgName string `default:"secret"` // 发布鉴权参数名 - ExpireArgName string `default:"expire"` // 发布鉴权失效时间参数名 - RingSize string `default:"256-1024"` // 初始缓冲区大小 + PubAudio bool `default:"true" desc:"是否发布音频"` + PubVideo bool `default:"true" desc:"是否发布视频"` + InsertSEI bool `desc:"是否启用SEI插入"` // 是否启用SEI插入 + KickExist bool `desc:"是否踢掉已经存在的发布者"` // 是否踢掉已经存在的发布者 + PublishTimeout time.Duration `default:"10s" desc:"发布无数据超时"` // 发布无数据超时 + WaitCloseTimeout time.Duration `desc:"延迟自动关闭(等待重连)"` // 延迟自动关闭(等待重连) + DelayCloseTimeout time.Duration `desc:"延迟自动关闭(无订阅时)"` // 延迟自动关闭(无订阅时) + IdleTimeout time.Duration `desc:"空闲(无订阅)超时"` // 空闲(无订阅)超时 + PauseTimeout time.Duration `default:"30s" desc:"暂停超时时间"` // 暂停超时 + BufferTime time.Duration `desc:"缓冲长度(单位:秒),0代表取最近关键帧"` // 缓冲长度(单位:秒),0代表取最近关键帧 + SpeedLimit time.Duration `default:"500ms" desc:"速度限制最大等待时间,0则不等待"` //速度限制最大等待时间 + Key string `desc:"发布鉴权key"` // 发布鉴权key + SecretArgName string `default:"secret" desc:"发布鉴权参数名"` // 发布鉴权参数名 + ExpireArgName string `default:"expire" desc:"发布鉴权失效时间参数名"` // 发布鉴权失效时间参数名 + RingSize string `default:"256-1024" desc:"缓冲范围"` // 初始缓冲区大小 } func (c Publish) GetPublishConfig() Publish { @@ -52,24 +52,24 @@ func (c Publish) GetPublishConfig() Publish { } type Subscribe struct { - SubAudio bool `default:"true"` - SubVideo bool `default:"true"` - SubVideoArgName string `default:"vts"` // 指定订阅的视频轨道参数名 - SubAudioArgName string `default:"ats"` // 指定订阅的音频轨道参数名 - SubDataArgName string `default:"dts"` // 指定订阅的数据轨道参数名 - SubModeArgName string `default:"mode"` // 指定订阅的模式参数名 - SubAudioTracks []string // 指定订阅的音频轨道 - SubVideoTracks []string // 指定订阅的视频轨道 - SubDataTracks []string // 指定订阅的数据轨道 - SubMode int // 0,实时模式:追赶发布者进度,在播放首屏后等待发布者的下一个关键帧,然后跳到该帧。1、首屏后不进行追赶。2、从缓冲最大的关键帧开始播放,也不追赶,需要发布者配置缓存长度 - SyncMode int // 0,采用时间戳同步,1,采用写入时间同步 - IFrameOnly bool // 只要关键帧 - WaitTimeout time.Duration `default:"10s"` // 等待流超时 - WriteBufferSize int `default:"0"` // 写缓冲大小 - Key string // 订阅鉴权key - SecretArgName string `default:"secret"` // 订阅鉴权参数名 - ExpireArgName string `default:"expire"` // 订阅鉴权失效时间参数名 - Internal bool `default:"false"` // 是否内部订阅 + SubAudio bool `default:"true" desc:"是否订阅音频"` + SubVideo bool `default:"true" desc:"是否订阅视频"` + SubVideoArgName string `default:"vts" desc:"定订阅的视频轨道参数名"` // 指定订阅的视频轨道参数名 + SubAudioArgName string `default:"ats" desc:"指定订阅的音频轨道参数名"` // 指定订阅的音频轨道参数名 + SubDataArgName string `default:"dts" desc:"指定订阅的数据轨道参数名"` // 指定订阅的数据轨道参数名 + SubModeArgName string `desc:"指定订阅的模式参数名"` // 指定订阅的模式参数名 + SubAudioTracks []string `desc:"指定订阅的音频轨道"` // 指定订阅的音频轨道 + SubVideoTracks []string `desc:"指定订阅的视频轨道"` // 指定订阅的视频轨道 + SubDataTracks []string `desc:"指定订阅的数据轨道"` // 指定订阅的数据轨道 + SubMode int `desc:"订阅模式" enum:"0:实时模式,1:首屏后不进行追赶,2:从缓冲最大的关键帧开始播放"` // 0,实时模式:追赶发布者进度,在播放首屏后等待发布者的下一个关键帧,然后跳到该帧。1、首屏后不进行追赶。2、从缓冲最大的关键帧开始播放,也不追赶,需要发布者配置缓存长度 + SyncMode int `desc:"同步模式" enum:"0:采用时间戳同步,1:采用写入时间同步"` // 0,采用时间戳同步,1,采用写入时间同步 + IFrameOnly bool `desc:"只要关键帧"` // 只要关键帧 + WaitTimeout time.Duration `default:"10s" desc:"等待流超时时间"` // 等待流超时 + WriteBufferSize int `desc:"写缓冲大小"` // 写缓冲大小 + Key string `desc:"订阅鉴权key"` // 订阅鉴权key + SecretArgName string `default:"secret" desc:"订阅鉴权参数名"` // 订阅鉴权参数名 + ExpireArgName string `default:"expire" desc:"订阅鉴权失效时间参数名"` // 订阅鉴权失效时间参数名 + Internal bool `default:"false" desc:"是否内部订阅"` // 是否内部订阅 } func (c *Subscribe) GetSubscribeConfig() *Subscribe { @@ -181,10 +181,10 @@ func (p *Push) CheckPush(streamPath string) string { } type Console struct { - Server string `default:"console.monibuca.com:44944"` //远程控制台地址 - Secret string //远程控制台密钥 - PublicAddr string //公网地址,提供远程控制台访问的地址,不配置的话使用自动识别的地址 - PublicAddrTLS string + Server string `default:"console.monibuca.com:44944" desc:"远程控制台地址"` //远程控制台地址 + Secret string `desc:"远程控制台密钥"` //远程控制台密钥 + PublicAddr string `desc:"远程控制台公网地址"` //公网地址,提供远程控制台访问的地址,不配置的话使用自动识别的地址 + PublicAddrTLS string `desc:"远程控制台公网TLS地址"` } type Engine struct { @@ -192,18 +192,18 @@ type Engine struct { Subscribe HTTP Console - EnableAVCC bool `default:"true"` //启用AVCC格式,rtmp、http-flv协议使用 - EnableRTP bool `default:"true"` //启用RTP格式,rtsp、webrtc等协议使用,已废弃,在 rtp 下面配置 - EnableSubEvent bool `default:"true"` //启用订阅事件,禁用可以提高性能 - EnableAuth bool `default:"true"` //启用鉴权 - LogLang string `default:"zh"` //日志语言 - LogLevel string `default:"info"` //日志级别 - EventBusSize int `default:"10"` //事件总线大小 - PulseInterval time.Duration `default:"5s"` //心跳事件间隔 - DisableAll bool `default:"false"` //禁用所有插件 - RTPReorderBufferLen int `default:"50"` //RTP重排序缓冲区长度 - PoolSize int //内存池大小 - enableReport bool `default:"false"` //启用报告,用于统计和监控 + EnableAVCC bool `default:"true" desc:"启用AVCC格式,rtmp、http-flv协议使用"` //启用AVCC格式,rtmp、http-flv协议使用 + EnableRTP bool `default:"true" desc:"启用RTP格式,rtsp、webrtc等协议使用"` //启用RTP格式,rtsp、webrtc等协议使用 + EnableSubEvent bool `default:"true" desc:"启用订阅事件,禁用可以提高性能"` //启用订阅事件,禁用可以提高性能 + EnableAuth bool `default:"true" desc:"启用鉴权"` //启用鉴权 + LogLang string `default:"zh" desc:"日志语言" enum:"zh:中文,en:英文"` //日志语言 + LogLevel string `default:"info" enum:"trace:跟踪,debug:调试,info:信息,warn:警告,error:错误"` //日志级别 + EventBusSize int `default:"10" desc:"事件总线大小"` //事件总线大小 + PulseInterval time.Duration `default:"5s" desc:"心跳事件间隔"` //心跳事件间隔 + DisableAll bool `default:"false" desc:"禁用所有插件"` //禁用所有插件 + RTPReorderBufferLen int `default:"50" desc:"RTP重排序缓冲区长度"` //RTP重排序缓冲区长度 + PoolSize int `desc:"内存池大小"` //内存池大小 + enableReport bool `default:"false"` //启用报告,用于统计和监控 reportStream quic.Stream // console server connection instanceId string // instance id 来自console } diff --git a/http.go b/http.go index 891a22b..c5d6703 100644 --- a/http.go +++ b/http.go @@ -88,18 +88,20 @@ func (conf *GlobalConfig) API_getConfig(w http.ResponseWriter, r *http.Request) } var data any if q.Get("yaml") != "" { - mm, err := yaml.Marshal(p.RawConfig) + data = struct { + File any + Modified any + Merged any + }{ + p.RawConfig.File, p.RawConfig.Modify, p.RawConfig.GetMap(), + } + mm, err := yaml.Marshal(data) if err != nil { mm = []byte("") } - data = struct { - File string - Modified string - Merged string - }{ - p.Yaml, p.modifiedYaml, string(mm), - } - + data = mm + } else if q.Get("formily") != "" { + data = p.RawConfig.GetFormily() } else { data = p.RawConfig } @@ -121,19 +123,16 @@ func (conf *GlobalConfig) API_modifyConfig(w http.ResponseWriter, r *http.Reques } else { p = Engine } + var modified map[string]any if q.Get("yaml") != "" { - err = yaml.NewDecoder(r.Body).Decode(&p.Modified) + err = yaml.NewDecoder(r.Body).Decode(&modified) } else { - err = json.NewDecoder(r.Body).Decode(&p.Modified) + err = json.NewDecoder(r.Body).Decode(&modified) } if err != nil { util.ReturnError(util.APIErrorDecode, err.Error(), w, r) } else if err = p.Save(); err == nil { - p.RawConfig.Assign(p.Modified) - out, err := yaml.Marshal(p.Modified) - if err == nil { - p.modifiedYaml = string(out) - } + p.RawConfig.ParseModifyFile(modified) util.ReturnOK(w, r) } else { util.ReturnError(util.APIErrorSave, err.Error(), w, r) @@ -154,7 +153,7 @@ func (conf *GlobalConfig) API_updateConfig(w http.ResponseWriter, r *http.Reques } else { p = Engine } - p.Update(p.Modified) + p.Update(&p.RawConfig) util.ReturnOK(w, r) } diff --git a/main.go b/main.go index 9260917..5762774 100755 --- a/main.go +++ b/main.go @@ -21,7 +21,6 @@ import ( "go.uber.org/zap" "go.uber.org/zap/zapcore" "gopkg.in/yaml.v3" - "m7s.live/engine/v4/config" "m7s.live/engine/v4/lang" "m7s.live/engine/v4/log" "m7s.live/engine/v4/util" @@ -62,7 +61,7 @@ func Run(ctx context.Context, conf any) (err error) { SysInfo.StartTime = time.Now() SysInfo.Version = Engine.Version Engine.Context = ctx - var cg config.Config + var cg map[string]map[string]any switch v := conf.(type) { case string: if _, err = os.Stat(v); err != nil { @@ -73,7 +72,7 @@ func Run(ctx context.Context, conf any) (err error) { } case []byte: ConfigRaw = v - case config.Config: + case map[string]map[string]any: cg = v } @@ -91,13 +90,9 @@ func Run(ctx context.Context, conf any) (err error) { log.Error("parsing yml error:", err) } } + Engine.RawConfig.Parse(&EngineConfig.Engine, "GLOBAL") if cg != nil { - Engine.RawConfig = cg.GetChild("global") - if b, err := yaml.Marshal(Engine.RawConfig); err == nil { - Engine.Yaml = string(b) - } - //将配置信息同步到结构体 - Engine.RawConfig.Unmarshal(&EngineConfig.Engine) + Engine.RawConfig.ParseUserFile(cg["global"]) } var logger log.Logger log.LocaleLogger = logger.Lang(lang.Get(EngineConfig.LogLang)) @@ -114,8 +109,7 @@ func Run(ctx context.Context, conf any) (err error) { } Engine.Logger = log.LocaleLogger.Named("engine") - // 使得RawConfig具备全量配置信息,用于合并到插件配置中 - Engine.RawConfig = config.Struct2Config(&EngineConfig.Engine, "GLOBAL") + Engine.assign() Engine.Logger.Debug("", zap.Any("config", EngineConfig)) util.PoolSize = EngineConfig.PoolSize @@ -129,21 +123,36 @@ func Run(ctx context.Context, conf any) (err error) { continue } plugin.Info("initialize", zap.String("version", plugin.Version)) - userConfig := cg.GetChild(plugin.Name) - if userConfig != nil { - if b, err := yaml.Marshal(userConfig); err == nil { - plugin.Yaml = string(b) + + plugin.RawConfig.Parse(plugin.Config, strings.ToUpper(plugin.Name)) + for _, fname := range MergeConfigs { + if name := strings.ToLower(fname); plugin.RawConfig.Has(name) { + plugin.RawConfig.Get(name).ParseGlobal(Engine.RawConfig.Get(name)) } } + var userConfig map[string]any if defaultYaml := reflect.ValueOf(plugin.Config).Elem().FieldByName("DefaultYaml"); defaultYaml.IsValid() { - if err := yaml.Unmarshal([]byte(defaultYaml.String()), &plugin.RawConfig); err != nil { + if err := yaml.Unmarshal([]byte(defaultYaml.String()), &userConfig); err != nil { log.Error("parsing default config error:", err) + } else { + plugin.RawConfig.ParseDefaultYaml(userConfig) } } - if plugin.Yaml != "" { - yaml.Unmarshal([]byte(plugin.Yaml), &plugin.RawConfig) + userConfig = cg[strings.ToLower(plugin.Name)] + plugin.RawConfig.ParseUserFile(userConfig) + if EngineConfig.DisableAll { + plugin.Disabled = true + } + if userConfig["enable"] == false { + plugin.Disabled = true + } else if userConfig["enable"] == true { + plugin.Disabled = false + } + if plugin.Disabled { + plugin.Warn("plugin disabled") + } else { + plugin.assign() } - plugin.assign() } UUID := uuid.NewString() diff --git a/plugin.go b/plugin.go index 4a81a7f..7a67daf 100644 --- a/plugin.go +++ b/plugin.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "io" "net/http" "os" "path/filepath" @@ -52,7 +51,7 @@ func InstallPlugin(config config.Plugin) *Plugin { return plugin } -type FirstConfig config.Config +type FirstConfig *config.Config type DefaultYaml string // Plugin 插件信息 @@ -62,10 +61,7 @@ type Plugin struct { Name string //插件名称 Config config.Plugin `json:"-" yaml:"-"` //类型化的插件配置 Version string //插件版本 - Yaml string //配置文件中的配置项 - modifiedYaml string //修改过的配置的yaml文件内容 RawConfig config.Config //最终合并后的配置的map形式方便查询 - Modified config.Config //修改过的配置项 *log.Logger `json:"-" yaml:"-"` saveTimer *time.Timer //用于保存的时候的延迟,防抖 Disabled bool @@ -104,73 +100,20 @@ func (opt *Plugin) assign() { f, err := os.Open(opt.settingPath()) defer f.Close() if err == nil { - var b []byte - b, err = io.ReadAll(f) - if err == nil { - opt.modifiedYaml = string(b) - if err = yaml.Unmarshal(b, &opt.Modified); err == nil { - err = yaml.Unmarshal(b, &opt.RawConfig) - } - } - if err != nil { - opt.Warn("assign config failed", zap.Error(err)) - } - } - - if opt == Engine { - opt.registerHandler() - return - } - if EngineConfig.DisableAll { - opt.Disabled = true - } - if opt.RawConfig == nil { - opt.RawConfig = config.Config{} - } else if opt.RawConfig["enable"] == false { - opt.Disabled = true - } else if opt.RawConfig["enable"] == true { - opt.Disabled = false - //移除这个属性防止反序列化报错 - delete(opt.RawConfig, "enable") - } - if opt.Disabled { - opt.Warn("plugin disabled") - return - } - t := reflect.TypeOf(opt.Config).Elem() - // 用全局配置覆盖没有设置的配置 - for _, fname := range MergeConfigs { - if _, ok := t.FieldByName(fname); ok { - if v, ok := Engine.RawConfig[strings.ToLower(fname)]; ok { - if !opt.RawConfig.Has(fname) { - opt.RawConfig.Set(fname, v) - } else if opt.RawConfig.HasChild(fname) { - opt.RawConfig.GetChild(fname).Merge(Engine.RawConfig.GetChild(fname)) - } - } - } + var modifyConfig map[string]any + err = yaml.NewDecoder(f).Decode(&modifyConfig) + opt.RawConfig.ParseModifyFile(modifyConfig) } opt.registerHandler() - opt.run() + if opt != Engine { + opt.run() + } } func (opt *Plugin) run() { opt.Context, opt.CancelFunc = context.WithCancel(Engine) - opt.RawConfig.Unmarshal(opt.Config) - opt.RawConfig = config.Struct2Config(opt.Config, strings.ToUpper(opt.Name)) - // var buffer bytes.Buffer - // err := yaml.NewEncoder(&buffer).Encode(opt.Config) - // if err != nil { - // panic(err) - // } - // err = yaml.NewDecoder(&buffer).Decode(&opt.RawConfig) - // if err != nil { - // panic(err) - // } - opt.Config.OnEvent(FirstConfig(opt.RawConfig)) - delete(opt.RawConfig, "defaultyaml") + opt.Config.OnEvent(FirstConfig(&opt.RawConfig)) opt.Debug("config", zap.Any("config", opt.Config)) - // opt.RawConfig = config.Struct2Config(opt.Config) if conf, ok := opt.Config.(config.HTTPConfig); ok { go conf.Listen(opt) } @@ -180,8 +123,7 @@ func (opt *Plugin) run() { } // Update 热更新配置 -func (opt *Plugin) Update(conf config.Config) { - conf.Unmarshal(opt.Config) +func (opt *Plugin) Update(conf *config.Config) { opt.Config.OnEvent(conf) } @@ -198,22 +140,6 @@ func (opt *Plugin) registerHandler() { switch handler := v.Method(i).Interface().(type) { case func(http.ResponseWriter, *http.Request): opt.handle(patten, http.HandlerFunc(handler)) - // case func(*http.Request) (int, string, any): - // opt.handle(patten, http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { - // code, msg, data := handler(r) - // switch returnMode { - // case "json": - // rw.Header().Set("Content-Type", "application/json") - // rw.WriteHeader(http.StatusOK) - // json.NewEncoder(rw).Encode(map[string]interface{}{ - // "code": code, - // "msg": msg, - // "data": data, - // }) - // default: - // http.Error(rw, msg, code) - // } - // })) } } } @@ -231,7 +157,7 @@ func (opt *Plugin) Save() error { file, err := os.OpenFile(opt.settingPath(), os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666) if err == nil { defer file.Close() - err = yaml.NewEncoder(file).Encode(opt.Modified) + err = yaml.NewEncoder(file).Encode(opt.RawConfig.Modify) } if err == nil { opt.Info("config saved") @@ -279,7 +205,7 @@ func (opt *Plugin) SubscribeExist(streamPath string, sub ISubscriber) error { } return opt.Subscribe(streamPath, sub) } -func (opt *Plugin) AssignSubConfig(suber *Subscriber) { +func (opt *Plugin) AssignSubConfig(suber *Subscriber) { if suber.Config == nil { conf, ok := opt.Config.(config.SubscribeConfig) if !ok { @@ -292,6 +218,7 @@ func (opt *Plugin) AssignSubConfig(suber *Subscriber) { suber.ID = fmt.Sprintf("%d", uintptr(unsafe.Pointer(suber))) } } + // Subscribe 订阅一个流,如果流不存在则创建一个等待流 func (opt *Plugin) Subscribe(streamPath string, sub ISubscriber) error { suber := sub.GetSubscriber() @@ -335,14 +262,12 @@ func (opt *Plugin) Pull(streamPath string, url string, puller IPuller, save int) switch save { case 1: pullConf.AddPullOnStart(streamPath, url) + opt.RawConfig.Get("pull").Get("pullonstart").Modify = pullConf.PullOnStart case 2: pullConf.AddPullOnSub(streamPath, url) + opt.RawConfig.Get("pull").Get("pullonsub").Modify = pullConf.PullOnSub } if save > 0 { - if opt.Modified == nil { - opt.Modified = make(config.Config) - } - opt.Modified["pull"] = config.Struct2Config(pullConf) if err = opt.Save(); err != nil { opt.Error("save faild", zap.Error(err)) } @@ -372,10 +297,7 @@ func (opt *Plugin) Push(streamPath string, url string, pusher IPusher, save bool go pusher.startPush(pusher) if save { pushConfig.AddPush(url, streamPath) - if opt.Modified == nil { - opt.Modified = make(config.Config) - } - opt.Modified["push"] = config.Struct2Config(pushConfig) + opt.RawConfig.Get("push").Get("pushlist").Modify = pushConfig.PushList if err = opt.Save(); err != nil { opt.Error("save faild", zap.Error(err)) }