feat: add formiliy support

This commit is contained in:
langhuihui
2023-12-01 17:54:10 +08:00
parent 268b237e35
commit 9e2ca2a670
7 changed files with 614 additions and 400 deletions

View File

@@ -1,11 +1,13 @@
package config package config
import ( import (
"encoding/json"
"fmt" "fmt"
"net" "net"
"net/http" "net/http"
"os" "os"
"reflect" "reflect"
"regexp"
"strings" "strings"
"time" "time"
@@ -14,7 +16,23 @@ import (
"m7s.live/engine/v4/log" "m7s.live/engine/v4/log"
) )
type Config map[string]any type Config struct {
Ptr reflect.Value //指向配置结构体值
Value any //当前值,优先级:动态修改值>环境变量>配置文件>defaultYaml>全局配置>默认值
Modify any //动态修改的值
Env any //环境变量中的值
File any //配置文件中的值
Global *Config //全局配置中的值,指针类型
Default any //默认值
Enum []struct {
Label string `json:"label"`
Value string `json:"value"`
}
name string // 小写
propsMap map[string]*Config
props []*Config
tag reflect.StructTag
}
var durationType = reflect.TypeOf(time.Duration(0)) var durationType = reflect.TypeOf(time.Duration(0))
@@ -38,164 +56,74 @@ type QuicPlugin interface {
ServeQuic(quic.Connection) ServeQuic(quic.Connection)
} }
// CreateElem 创建Map或者Slice中的元素 func (config *Config) Range(f func(key string, value Config)) {
func (config Config) CreateElem(eleType reflect.Type) reflect.Value { if m, ok := config.Value.(map[string]Config); ok {
if eleType.Kind() == reflect.Pointer { for k, v := range m {
newv := reflect.New(eleType.Elem()) f(k, v)
config.Unmarshal(newv)
return newv
} else {
newv := reflect.New(eleType)
config.Unmarshal(newv)
return newv.Elem()
}
}
func (config Config) Unmarshal(s any) {
// defer func() {
// if err := recover(); err != nil {
// log.Error("Unmarshal error:", err)
// }
// }()
if s == nil {
return
}
var el reflect.Value
if v, ok := s.(reflect.Value); ok {
el = v
} else {
el = reflect.ValueOf(s)
}
if el.Kind() == reflect.Pointer {
el = el.Elem()
}
t := el.Type()
if t.Kind() == reflect.Map {
tt := t.Elem()
for k, v := range config {
if child, ok := v.(Config); ok {
//复杂类型
el.SetMapIndex(reflect.ValueOf(k), child.CreateElem(tt))
} else {
//基本类型
el.SetMapIndex(reflect.ValueOf(k), reflect.ValueOf(v).Convert(tt))
}
}
return
}
//字段映射,小写对应的大写
nameMap := make(map[string]string)
for i, j := 0, t.NumField(); i < j; i++ {
field := t.Field(i)
name := field.Name
if tag := field.Tag.Get("yaml"); tag != "" {
name, _, _ = strings.Cut(tag, ",")
} else {
name = strings.ToLower(name)
}
nameMap[name] = field.Name
}
for k, v := range config {
name, ok := nameMap[k]
if !ok {
log.Warn("no config named:", k)
continue
}
// 需要被写入的字段
fv := el.FieldByName(name)
if child, ok := v.(Config); ok { //处理值是递归情况map)
if fv.Kind() == reflect.Map {
if fv.IsNil() {
fv.Set(reflect.MakeMap(fv.Type()))
}
}
child.Unmarshal(fv)
} else {
assign(name, fv, reflect.ValueOf(v))
} }
} }
} }
// 覆盖配置 func (config *Config) IsMap() bool {
func (config Config) Assign(source Config) { _, ok := config.Value.(map[string]Config)
for k, v := range source {
switch m := config[k].(type) {
case Config:
switch vv := v.(type) {
case Config:
m.Assign(vv)
case map[string]any:
m.Assign(Config(vv))
}
default:
config[k] = v
}
}
}
// 合并配置,不覆盖
func (config Config) Merge(source Config) {
for k, v := range source {
if _, ok := config[k]; !ok {
switch m := config[k].(type) {
case Config:
m.Merge(v.(Config))
default:
if Global.LogLang == "zh" {
log.Debug("合并配置", k, ":", v)
} else {
log.Debug("merge", k, ":", v)
}
config[k] = v
}
} else {
log.Debug("exist", k)
}
}
}
func (config *Config) Set(key string, value any) {
if *config == nil {
*config = Config{strings.ToLower(key): value}
} else {
(*config)[strings.ToLower(key)] = value
}
}
func (config Config) Get(key string) (v any) {
v = config[strings.ToLower(key)]
return
}
func (config Config) Has(key string) (ok bool) {
_, ok = config[strings.ToLower(key)]
return
}
func (config Config) HasChild(key string) (ok bool) {
_, ok = config[strings.ToLower(key)].(Config)
return ok return ok
} }
func (config Config) GetChild(key string) Config { func (config *Config) Get(key string) (v *Config) {
if v, ok := config[strings.ToLower(key)]; ok && v != nil { if config.propsMap == nil {
return v.(Config) config.propsMap = make(map[string]*Config)
}
if v, ok := config.propsMap[key]; ok {
return v
} else {
v = &Config{
name: key,
}
config.propsMap[key] = v
config.props = append(config.props, v)
return v
} }
return nil
} }
func Struct2Config(s any, prefix ...string) (config Config) { func (config Config) Has(key string) (ok bool) {
config = make(Config) if config.propsMap == nil {
return false
}
_, ok = config.propsMap[strings.ToLower(key)]
return ok
}
func (config *Config) MarshalJSON() ([]byte, error) {
if config.props == nil {
return json.Marshal(config.Value)
}
return json.Marshal(config.props)
}
// Parse 第一步读取配置结构体的默认值
func (config *Config) Parse(s any, prefix ...string) {
var t reflect.Type var t reflect.Type
var v reflect.Value var v reflect.Value
if vv, ok := s.(reflect.Value); ok { if vv, ok := s.(reflect.Value); ok {
t, v = vv.Type(), vv t, v = vv.Type(), vv
} else { } else {
t, v = reflect.TypeOf(s), reflect.ValueOf(s) t, v = reflect.TypeOf(s), reflect.ValueOf(s)
}
if t.Kind() == reflect.Pointer { if t.Kind() == reflect.Pointer {
t, v = t.Elem(), v.Elem() t, v = t.Elem(), v.Elem()
} }
config.Ptr = v
config.Default = v.Interface()
config.Value = v.Interface()
if len(prefix) > 0 { // 读取环境变量
envKey := strings.Join(prefix, "_")
if envValue := os.Getenv(envKey); envValue != "" {
yaml.Unmarshal([]byte(fmt.Sprintf("env: %s", envValue)), config)
config.Value = config.Env
config.Ptr.Set(reflect.ValueOf(config.Env))
} }
}
if t.Kind() == reflect.Struct {
for i, j := 0, t.NumField(); i < j; i++ { for i, j := 0, t.NumField(); i < j; i++ {
ft, fv := t.Field(i), v.Field(i) ft, fv := t.Field(i), v.Field(i)
if !ft.IsExported() { if !ft.IsExported() {
@@ -208,35 +136,335 @@ func Struct2Config(s any, prefix ...string) (config Config) {
} }
name, _, _ = strings.Cut(tag, ",") name, _, _ = strings.Cut(tag, ",")
} }
var envPath []string prop := config.Get(name)
if len(prefix) > 0 { prop.Parse(fv, append(prefix, strings.ToUpper(ft.Name))...)
envPath = append(prefix, strings.ToUpper(ft.Name)) prop.tag = ft.Tag
envKey := strings.Join(envPath, "_") for _, kv := range strings.Split(ft.Tag.Get("enum"), ",") {
if envValue := os.Getenv(envKey); envValue != "" { kvs := strings.Split(kv, ":")
yaml.Unmarshal([]byte(fmt.Sprintf("%s: %s", name, envValue)), config) if len(kvs) != 2 {
assign(envKey, fv, reflect.ValueOf(config[name])) continue
config[name] = fv.Interface() }
prop.Enum = append(prop.Enum, struct {
Label string `json:"label"`
Value string `json:"value"`
}{
Label: strings.TrimSpace(kvs[1]),
Value: strings.TrimSpace(kvs[0]),
})
}
}
}
}
// ParseDefaultYaml 第二步读取全局配置
func (config *Config) ParseGlobal(g *Config) {
config.Global = g
if config.propsMap != nil {
for k, v := range config.propsMap {
v.ParseGlobal(g.Get(k))
}
} else {
config.Value = g.Value
}
}
// ParseDefaultYaml 第三步读取内嵌默认配置
func (config *Config) ParseDefaultYaml(defaultYaml map[string]any) {
if defaultYaml == nil {
return return
} }
for k, v := range defaultYaml {
if config.Has(k) {
if prop := config.Get(k); prop.props != nil {
if v != nil {
prop.ParseDefaultYaml(v.(map[string]any))
} }
switch ft.Type.Kind() { } else {
case reflect.Struct: dv := prop.assign(k, v)
config[name] = Struct2Config(fv, envPath...) prop.Default = dv.Interface()
default: if prop.Env == nil {
reflect.ValueOf(config).SetMapIndex(reflect.ValueOf(name), fv) prop.Value = dv.Interface()
prop.Ptr.Set(dv)
} }
} }
}
}
}
// ParseFile 第四步读取用户配置文件
func (config *Config) ParseUserFile(conf map[string]any) {
if conf == nil {
return
}
config.File = conf
for k, v := range conf {
if config.Has(k) {
if prop := config.Get(k); prop.props != nil {
if v != nil {
prop.ParseUserFile(v.(map[string]any))
}
} else {
fv := prop.assign(k, v)
prop.File = fv.Interface()
if prop.Env == nil {
prop.Value = fv.Interface()
prop.Ptr.Set(fv)
}
}
}
}
}
// ParseModifyFile 第五步读取动态修改配置文件
func (config *Config) ParseModifyFile(conf map[string]any) {
if conf == nil {
return
}
config.Modify = conf
for k, v := range conf {
if config.Has(k) {
if prop := config.Get(k); prop.props != nil {
if v != nil {
prop.ParseModifyFile(v.(map[string]any))
}
} else {
mv := prop.assign(k, v)
prop.Modify = mv.Interface()
prop.Value = mv.Interface()
prop.Ptr.Set(mv)
}
}
}
}
func (config *Config) GetMap() map[string]any {
m := make(map[string]any)
for k, v := range config.propsMap {
if v.props != nil {
if vv := v.GetMap(); vv != nil {
m[k] = vv
}
} else if v.Value != nil {
m[k] = v.Value
}
}
if len(m) > 0 {
return m
}
return nil
}
func (config *Config) schema(index int) (r any) {
defer func() {
err := recover()
if err != nil {
log.Error(err)
}
}()
if config.props != nil {
r := Card{
Type: "void",
Component: "Card",
Properties: make(map[string]any),
Index: index,
}
r.ComponentProps = map[string]any{
"title": config.name,
}
for i, v := range config.props {
r.Properties[v.name] = v.schema(i)
}
return r
} else {
p := Property{
Title: config.name,
Default: config.Value,
DecoratorProps: map[string]any{
"tooltip": config.tag.Get("desc"),
},
ComponentProps: map[string]any{},
Decorator: "FormItem",
Index: index,
}
if config.Modify != nil {
p.Description = "已动态修改"
} else if config.Env != nil {
p.Description = "使用环境变量中的值"
} else if config.File != nil {
p.Description = "使用配置文件中的值"
} else if config.Global != nil {
p.Description = "已使用全局配置中的值"
}
p.Enum = config.Enum
if config.Ptr.Type() == durationType {
p.Type = "string"
p.Component = "Input"
str := config.Value.(time.Duration).String()
p.ComponentProps = map[string]any{
"placeholder": str,
}
p.Default = str
p.DecoratorProps["addonAfter"] = "时间,单位s,m,h,d例如100ms, 10s, 4m, 1h"
} else {
switch config.Ptr.Kind() {
case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64, reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64, reflect.Float32, reflect.Float64:
p.Type = "number"
p.Component = "NumberPicker"
p.ComponentProps = map[string]any{
"placeholder": config.Value,
}
case reflect.Bool:
p.Type = "boolean"
p.Component = "Switch"
case reflect.String:
p.Type = "string"
p.Component = "Input"
p.ComponentProps = map[string]any{
"placeholder": config.Value,
}
case reflect.Slice:
p.Type = "array"
p.Component = "Input"
p.ComponentProps = map[string]any{
"placeholder": config.Value,
}
p.DecoratorProps["addonAfter"] = "数组,每个元素用逗号分隔"
case reflect.Map:
var children []struct {
Key string `json:"mkey"`
Value any `json:"mvalue"`
}
p := Property{
Type: "array",
Component: "ArrayTable",
Decorator: "FormItem",
Properties: map[string]any{
"addition": map[string]string{
"type": "void",
"title": "添加",
"x-component": "ArrayTable.Addition",
},
},
Index: index,
Title: config.name,
Items: &Object{
Type: "object",
Properties: map[string]any{
"c1": Card{
Type: "void",
Component: "ArrayTable.Column",
ComponentProps: map[string]any{
"title": config.tag.Get("key"),
"width": 300,
},
Properties: map[string]any{
"mkey": Property{
Type: "string",
Decorator: "FormItem",
Component: "Input",
},
},
Index: 0,
},
"c2": Card{
Type: "void",
Component: "ArrayTable.Column",
ComponentProps: map[string]any{
"title": config.tag.Get("value"),
},
Properties: map[string]any{
"mvalue": Property{
Type: "string",
Decorator: "FormItem",
Component: "Input",
},
},
Index: 1,
},
"operator": Card{
Type: "void",
Component: "ArrayTable.Column",
ComponentProps: map[string]any{
"title": "操作",
},
Properties: map[string]any{
"remove": Card{
Type: "void",
Component: "ArrayTable.Remove",
},
},
Index: 2,
},
},
},
}
iter := config.Ptr.MapRange()
for iter.Next() {
children = append(children, struct {
Key string `json:"mkey"`
Value any `json:"mvalue"`
}{
Key: iter.Key().String(),
Value: iter.Value().Interface(),
})
}
p.Default = children
return p
}
}
if len(p.Enum) > 0 {
p.Component = "Radio.Group"
}
return p
}
}
func (config *Config) GetFormily() (r Formily) {
r.Form.LabelCol = 4
r.Form.WrapperCol = 20
r.Schema = Object{
Type: "object",
Properties: make(map[string]any),
}
for i, v := range config.props {
r.Schema.Properties[v.name] = v.schema(i)
}
return return
} }
func assign(k string, target reflect.Value, source reflect.Value) { // func (config *Config) GetModify() map[string]any {
ft := target.Type() // m := make(map[string]any)
if ft == durationType && target.CanSet() { // for k, v := range config.props {
// if v.props != nil {
// if vv := v.GetModify(); vv != nil {
// m[k] = vv
// }
// } else if v.Modify != nil {
// m[k] = v.Modify
// }
// }
// if len(m) > 0 {
// return m
// }
// return nil
// }
var regexPureNumber = regexp.MustCompile(`^\d+$`)
func (config *Config) assign(k string, v any) (target reflect.Value) {
ft := config.Ptr.Type()
source := reflect.ValueOf(v)
if ft == durationType {
target = reflect.New(ft).Elem()
if source.Type() == durationType { if source.Type() == durationType {
target.Set(source) target.Set(source)
} else if source.IsZero() || !source.IsValid() { } else if source.IsZero() || !source.IsValid() {
target.SetInt(0) target.SetInt(0)
} else if d, err := time.ParseDuration(source.String()); err == nil { } else {
timeStr := source.String()
if d, err := time.ParseDuration(timeStr); err == nil && !regexPureNumber.MatchString(timeStr) {
target.SetInt(int64(d)) target.SetInt(int64(d))
} else { } else {
if Global.LogLang == "zh" { if Global.LogLang == "zh" {
@@ -246,44 +474,60 @@ func assign(k string, target reflect.Value, source reflect.Value) {
} }
os.Exit(1) os.Exit(1)
} }
}
return return
} }
switch target.Kind() {
case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: tmpStruct := reflect.StructOf([]reflect.StructField{
target.SetUint(uint64(source.Int())) {
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: Name: strings.ToUpper(k),
target.SetInt(source.Int()) Type: ft,
case reflect.Float32, reflect.Float64: },
if source.CanFloat() { })
target.SetFloat(source.Float()) tmpValue := reflect.New(tmpStruct)
} else { tmpByte, _ := yaml.Marshal(map[string]any{k: v})
target.SetFloat(float64(source.Int())) yaml.Unmarshal(tmpByte, tmpValue.Interface())
} return tmpValue.Elem().Field(0)
case reflect.Slice: // switch target.Kind() {
var s reflect.Value // case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
if source.Kind() == reflect.Slice { // target.SetUint(uint64(source.Int()))
l := source.Len() // case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
s = reflect.MakeSlice(ft, l, source.Cap()) // target.SetInt(source.Int())
for i := 0; i < l; i++ { // case reflect.Float32, reflect.Float64:
fv := source.Index(i) // if source.CanFloat() {
item := s.Index(i) // target.SetFloat(source.Float())
if child, ok := fv.Interface().(Config); ok { // } else {
item.Set(child.CreateElem(ft.Elem())) // target.SetFloat(float64(source.Int()))
} else if fv.Kind() == reflect.Interface { // }
item.Set(reflect.ValueOf(fv.Interface()).Convert(item.Type())) // case reflect.Map:
} else {
item.Set(fv) // case reflect.Slice:
} // var s reflect.Value
} // if source.Kind() == reflect.Slice {
} else { // l := source.Len()
//值是单值,但类型是数组,默认解析为一个元素的数组 // s = reflect.MakeSlice(ft, l, source.Cap())
s = reflect.MakeSlice(ft, 1, 1) // for i := 0; i < l; i++ {
s.Index(0).Set(source) // fv := source.Index(i)
} // item := s.Index(i)
target.Set(s) // if child, ok := fv.Interface().(map[string]any); ok {
default: // panic(child)
if source.IsValid() { // // item.Set(child.CreateElem(ft.Elem()))
target.Set(source.Convert(ft)) // } else if fv.Kind() == reflect.Interface {
} // item.Set(reflect.ValueOf(fv.Interface()).Convert(item.Type()))
} // } else {
// item.Set(fv)
// }
// }
// } else {
// //值是单值,但类型是数组,默认解析为一个元素的数组
// s = reflect.MakeSlice(ft, 1, 1)
// s.Index(0).Set(source)
// }
// target.Set(s)
// default:
// if source.IsValid() {
// target.Set(source.Convert(ft))
// }
// }
return
} }

40
config/formily.go Normal file
View File

@@ -0,0 +1,40 @@
package config
type Property struct {
Type string `json:"type"`
Title string `json:"title"`
Description string `json:"description"`
Enum []struct {
Label string `json:"label"`
Value string `json:"value"`
} `json:"enum,omitempty"`
Items *Object `json:"items,omitempty"`
Properties map[string]any `json:"properties,omitempty"`
Default any `json:"default,omitempty"`
Decorator string `json:"x-decorator"`
DecoratorProps map[string]any `json:"x-decorator-props,omitempty"`
Component string `json:"x-component"`
ComponentProps map[string]any `json:"x-component-props,omitempty"`
Index int `json:"x-index"`
}
type Card struct {
Type string `json:"type"`
Properties map[string]any `json:"properties,omitempty"`
Component string `json:"x-component"`
ComponentProps map[string]any `json:"x-component-props,omitempty"`
Index int `json:"x-index"`
}
type Object struct {
Type string `json:"type"`
Properties map[string]any `json:"properties"`
}
type Formily struct {
Form struct {
LabelCol int `json:"labelCol"`
WrapperCol int `json:"wrapperCol"`
} `json:"form"`
Schema Object `json:"schema"`
}

View File

@@ -16,16 +16,16 @@ var _ HTTPConfig = (*HTTP)(nil)
type Middleware func(string, http.Handler) http.Handler type Middleware func(string, http.Handler) http.Handler
type HTTP struct { type HTTP struct {
ListenAddr string ListenAddr string `desc:"监听地址"`
ListenAddrTLS string ListenAddrTLS string `desc:"监听地址HTTPS"`
CertFile string CertFile string `desc:"HTTPS证书文件"`
KeyFile string KeyFile string `desc:"HTTPS密钥文件"`
CORS bool `default:"true"` //是否自动添加CORS头 CORS bool `default:"true" desc:"是否自动添加CORS头"` //是否自动添加CORS头
UserName string UserName string `desc:"基本身份认证用户名"`
Password string Password string `desc:"基本身份认证密码"`
ReadTimeout time.Duration ReadTimeout time.Duration `desc:"读取超时"`
WriteTimeout time.Duration WriteTimeout time.Duration `desc:"写入超时"`
IdleTimeout time.Duration IdleTimeout time.Duration `desc:"空闲超时"`
mux *http.ServeMux mux *http.ServeMux
middlewares []Middleware middlewares []Middleware
} }

View File

@@ -30,21 +30,21 @@ type PushConfig interface {
} }
type Publish struct { type Publish struct {
PubAudio bool `default:"true"` PubAudio bool `default:"true" desc:"是否发布音频"`
PubVideo bool `default:"true"` PubVideo bool `default:"true" desc:"是否发布视频"`
InsertSEI bool // 是否启用SEI插入 InsertSEI bool `desc:"是否启用SEI插入"` // 是否启用SEI插入
KickExist bool // 是否踢掉已经存在的发布者 KickExist bool `desc:"是否踢掉已经存在的发布者"` // 是否踢掉已经存在的发布者
PublishTimeout time.Duration `default:"10s"` // 发布无数据超时 PublishTimeout time.Duration `default:"10s" desc:"发布无数据超时"` // 发布无数据超时
WaitCloseTimeout time.Duration // 延迟自动关闭(等待重连) WaitCloseTimeout time.Duration `desc:"延迟自动关闭(等待重连)"` // 延迟自动关闭(等待重连)
DelayCloseTimeout time.Duration // 延迟自动关闭(无订阅时) DelayCloseTimeout time.Duration `desc:"延迟自动关闭(无订阅时)"` // 延迟自动关闭(无订阅时)
IdleTimeout time.Duration // 空闲(无订阅)超时 IdleTimeout time.Duration `desc:"空闲(无订阅)超时"` // 空闲(无订阅)超时
PauseTimeout time.Duration `default:"30s"` // 暂停超时 PauseTimeout time.Duration `default:"30s" desc:"暂停超时时间"` // 暂停超时
BufferTime time.Duration // 缓冲长度(单位:秒)0代表取最近关键帧 BufferTime time.Duration `desc:"缓冲长度(单位:秒)0代表取最近关键帧"` // 缓冲长度(单位:秒)0代表取最近关键帧
SpeedLimit time.Duration `default:"500ms"` //速度限制最大等待时间 SpeedLimit time.Duration `default:"500ms" desc:"速度限制最大等待时间,0则不等待"` //速度限制最大等待时间
Key string // 发布鉴权key Key string `desc:"发布鉴权key"` // 发布鉴权key
SecretArgName string `default:"secret"` // 发布鉴权参数名 SecretArgName string `default:"secret" desc:"发布鉴权参数名"` // 发布鉴权参数名
ExpireArgName string `default:"expire"` // 发布鉴权失效时间参数名 ExpireArgName string `default:"expire" desc:"发布鉴权失效时间参数名"` // 发布鉴权失效时间参数名
RingSize string `default:"256-1024"` // 初始缓冲区大小 RingSize string `default:"256-1024" desc:"缓冲范围"` // 初始缓冲区大小
} }
func (c Publish) GetPublishConfig() Publish { func (c Publish) GetPublishConfig() Publish {
@@ -52,24 +52,24 @@ func (c Publish) GetPublishConfig() Publish {
} }
type Subscribe struct { type Subscribe struct {
SubAudio bool `default:"true"` SubAudio bool `default:"true" desc:"是否订阅音频"`
SubVideo bool `default:"true"` SubVideo bool `default:"true" desc:"是否订阅视频"`
SubVideoArgName string `default:"vts"` // 指定订阅的视频轨道参数名 SubVideoArgName string `default:"vts" desc:"定订阅的视频轨道参数名"` // 指定订阅的视频轨道参数名
SubAudioArgName string `default:"ats"` // 指定订阅的音频轨道参数名 SubAudioArgName string `default:"ats" desc:"指定订阅的音频轨道参数名"` // 指定订阅的音频轨道参数名
SubDataArgName string `default:"dts"` // 指定订阅的数据轨道参数名 SubDataArgName string `default:"dts" desc:"指定订阅的数据轨道参数名"` // 指定订阅的数据轨道参数名
SubModeArgName string `default:"mode"` // 指定订阅的模式参数名 SubModeArgName string `desc:"指定订阅的模式参数名"` // 指定订阅的模式参数名
SubAudioTracks []string // 指定订阅的音频轨道 SubAudioTracks []string `desc:"指定订阅的音频轨道"` // 指定订阅的音频轨道
SubVideoTracks []string // 指定订阅的视频轨道 SubVideoTracks []string `desc:"指定订阅的视频轨道"` // 指定订阅的视频轨道
SubDataTracks []string // 指定订阅的数据轨道 SubDataTracks []string `desc:"指定订阅的数据轨道"` // 指定订阅的数据轨道
SubMode int // 0实时模式追赶发布者进度在播放首屏后等待发布者的下一个关键帧然后跳到该帧。1、首屏后不进行追赶。2、从缓冲最大的关键帧开始播放也不追赶需要发布者配置缓存长度 SubMode int `desc:"订阅模式" enum:"0:实时模式,1:首屏后不进行追赶,2:从缓冲最大的关键帧开始播放"` // 0实时模式追赶发布者进度在播放首屏后等待发布者的下一个关键帧然后跳到该帧。1、首屏后不进行追赶。2、从缓冲最大的关键帧开始播放也不追赶需要发布者配置缓存长度
SyncMode int // 0采用时间戳同步1采用写入时间同步 SyncMode int `desc:"同步模式" enum:"0:采用时间戳同步,1:采用写入时间同步"` // 0采用时间戳同步1采用写入时间同步
IFrameOnly bool // 只要关键帧 IFrameOnly bool `desc:"只要关键帧"` // 只要关键帧
WaitTimeout time.Duration `default:"10s"` // 等待流超时 WaitTimeout time.Duration `default:"10s" desc:"等待流超时时间"` // 等待流超时
WriteBufferSize int `default:"0"` // 写缓冲大小 WriteBufferSize int `desc:"写缓冲大小"` // 写缓冲大小
Key string // 订阅鉴权key Key string `desc:"订阅鉴权key"` // 订阅鉴权key
SecretArgName string `default:"secret"` // 订阅鉴权参数名 SecretArgName string `default:"secret" desc:"订阅鉴权参数名"` // 订阅鉴权参数名
ExpireArgName string `default:"expire"` // 订阅鉴权失效时间参数名 ExpireArgName string `default:"expire" desc:"订阅鉴权失效时间参数名"` // 订阅鉴权失效时间参数名
Internal bool `default:"false"` // 是否内部订阅 Internal bool `default:"false" desc:"是否内部订阅"` // 是否内部订阅
} }
func (c *Subscribe) GetSubscribeConfig() *Subscribe { func (c *Subscribe) GetSubscribeConfig() *Subscribe {
@@ -181,10 +181,10 @@ func (p *Push) CheckPush(streamPath string) string {
} }
type Console struct { type Console struct {
Server string `default:"console.monibuca.com:44944"` //远程控制台地址 Server string `default:"console.monibuca.com:44944" desc:"远程控制台地址"` //远程控制台地址
Secret string //远程控制台密钥 Secret string `desc:"远程控制台密钥"` //远程控制台密钥
PublicAddr string //公网地址,提供远程控制台访问的地址,不配置的话使用自动识别的地址 PublicAddr string `desc:"远程控制台公网地址"` //公网地址,提供远程控制台访问的地址,不配置的话使用自动识别的地址
PublicAddrTLS string PublicAddrTLS string `desc:"远程控制台公网TLS地址"`
} }
type Engine struct { type Engine struct {
@@ -192,17 +192,17 @@ type Engine struct {
Subscribe Subscribe
HTTP HTTP
Console Console
EnableAVCC bool `default:"true"` //启用AVCC格式rtmp、http-flv协议使用 EnableAVCC bool `default:"true" desc:"启用AVCC格式rtmp、http-flv协议使用"` //启用AVCC格式rtmp、http-flv协议使用
EnableRTP bool `default:"true"` //启用RTP格式rtsp、webrtc等协议使用,已废弃,在 rtp 下面配置 EnableRTP bool `default:"true" desc:"启用RTP格式rtsp、webrtc等协议使用"` //启用RTP格式rtsp、webrtc等协议使用
EnableSubEvent bool `default:"true"` //启用订阅事件,禁用可以提高性能 EnableSubEvent bool `default:"true" desc:"启用订阅事件,禁用可以提高性能"` //启用订阅事件,禁用可以提高性能
EnableAuth bool `default:"true"` //启用鉴权 EnableAuth bool `default:"true" desc:"启用鉴权"` //启用鉴权
LogLang string `default:"zh"` //日志语言 LogLang string `default:"zh" desc:"日志语言" enum:"zh:中文,en:英文"` //日志语言
LogLevel string `default:"info"` //日志级别 LogLevel string `default:"info" enum:"trace:跟踪,debug:调试,info:信息,warn:警告,error:错误"` //日志级别
EventBusSize int `default:"10"` //事件总线大小 EventBusSize int `default:"10" desc:"事件总线大小"` //事件总线大小
PulseInterval time.Duration `default:"5s"` //心跳事件间隔 PulseInterval time.Duration `default:"5s" desc:"心跳事件间隔"` //心跳事件间隔
DisableAll bool `default:"false"` //禁用所有插件 DisableAll bool `default:"false" desc:"禁用所有插件"` //禁用所有插件
RTPReorderBufferLen int `default:"50"` //RTP重排序缓冲区长度 RTPReorderBufferLen int `default:"50" desc:"RTP重排序缓冲区长度"` //RTP重排序缓冲区长度
PoolSize int //内存池大小 PoolSize int `desc:"内存池大小"` //内存池大小
enableReport bool `default:"false"` //启用报告,用于统计和监控 enableReport bool `default:"false"` //启用报告,用于统计和监控
reportStream quic.Stream // console server connection reportStream quic.Stream // console server connection
instanceId string // instance id 来自console instanceId string // instance id 来自console

33
http.go
View File

@@ -88,18 +88,20 @@ func (conf *GlobalConfig) API_getConfig(w http.ResponseWriter, r *http.Request)
} }
var data any var data any
if q.Get("yaml") != "" { if q.Get("yaml") != "" {
mm, err := yaml.Marshal(p.RawConfig) data = struct {
File any
Modified any
Merged any
}{
p.RawConfig.File, p.RawConfig.Modify, p.RawConfig.GetMap(),
}
mm, err := yaml.Marshal(data)
if err != nil { if err != nil {
mm = []byte("") mm = []byte("")
} }
data = struct { data = mm
File string } else if q.Get("formily") != "" {
Modified string data = p.RawConfig.GetFormily()
Merged string
}{
p.Yaml, p.modifiedYaml, string(mm),
}
} else { } else {
data = p.RawConfig data = p.RawConfig
} }
@@ -121,19 +123,16 @@ func (conf *GlobalConfig) API_modifyConfig(w http.ResponseWriter, r *http.Reques
} else { } else {
p = Engine p = Engine
} }
var modified map[string]any
if q.Get("yaml") != "" { if q.Get("yaml") != "" {
err = yaml.NewDecoder(r.Body).Decode(&p.Modified) err = yaml.NewDecoder(r.Body).Decode(&modified)
} else { } else {
err = json.NewDecoder(r.Body).Decode(&p.Modified) err = json.NewDecoder(r.Body).Decode(&modified)
} }
if err != nil { if err != nil {
util.ReturnError(util.APIErrorDecode, err.Error(), w, r) util.ReturnError(util.APIErrorDecode, err.Error(), w, r)
} else if err = p.Save(); err == nil { } else if err = p.Save(); err == nil {
p.RawConfig.Assign(p.Modified) p.RawConfig.ParseModifyFile(modified)
out, err := yaml.Marshal(p.Modified)
if err == nil {
p.modifiedYaml = string(out)
}
util.ReturnOK(w, r) util.ReturnOK(w, r)
} else { } else {
util.ReturnError(util.APIErrorSave, err.Error(), w, r) util.ReturnError(util.APIErrorSave, err.Error(), w, r)
@@ -154,7 +153,7 @@ func (conf *GlobalConfig) API_updateConfig(w http.ResponseWriter, r *http.Reques
} else { } else {
p = Engine p = Engine
} }
p.Update(p.Modified) p.Update(&p.RawConfig)
util.ReturnOK(w, r) util.ReturnOK(w, r)
} }

45
main.go
View File

@@ -21,7 +21,6 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
"go.uber.org/zap/zapcore" "go.uber.org/zap/zapcore"
"gopkg.in/yaml.v3" "gopkg.in/yaml.v3"
"m7s.live/engine/v4/config"
"m7s.live/engine/v4/lang" "m7s.live/engine/v4/lang"
"m7s.live/engine/v4/log" "m7s.live/engine/v4/log"
"m7s.live/engine/v4/util" "m7s.live/engine/v4/util"
@@ -62,7 +61,7 @@ func Run(ctx context.Context, conf any) (err error) {
SysInfo.StartTime = time.Now() SysInfo.StartTime = time.Now()
SysInfo.Version = Engine.Version SysInfo.Version = Engine.Version
Engine.Context = ctx Engine.Context = ctx
var cg config.Config var cg map[string]map[string]any
switch v := conf.(type) { switch v := conf.(type) {
case string: case string:
if _, err = os.Stat(v); err != nil { if _, err = os.Stat(v); err != nil {
@@ -73,7 +72,7 @@ func Run(ctx context.Context, conf any) (err error) {
} }
case []byte: case []byte:
ConfigRaw = v ConfigRaw = v
case config.Config: case map[string]map[string]any:
cg = v cg = v
} }
@@ -91,13 +90,9 @@ func Run(ctx context.Context, conf any) (err error) {
log.Error("parsing yml error:", err) log.Error("parsing yml error:", err)
} }
} }
Engine.RawConfig.Parse(&EngineConfig.Engine, "GLOBAL")
if cg != nil { if cg != nil {
Engine.RawConfig = cg.GetChild("global") Engine.RawConfig.ParseUserFile(cg["global"])
if b, err := yaml.Marshal(Engine.RawConfig); err == nil {
Engine.Yaml = string(b)
}
//将配置信息同步到结构体
Engine.RawConfig.Unmarshal(&EngineConfig.Engine)
} }
var logger log.Logger var logger log.Logger
log.LocaleLogger = logger.Lang(lang.Get(EngineConfig.LogLang)) log.LocaleLogger = logger.Lang(lang.Get(EngineConfig.LogLang))
@@ -114,8 +109,7 @@ func Run(ctx context.Context, conf any) (err error) {
} }
Engine.Logger = log.LocaleLogger.Named("engine") Engine.Logger = log.LocaleLogger.Named("engine")
// 使得RawConfig具备全量配置信息用于合并到插件配置中
Engine.RawConfig = config.Struct2Config(&EngineConfig.Engine, "GLOBAL")
Engine.assign() Engine.assign()
Engine.Logger.Debug("", zap.Any("config", EngineConfig)) Engine.Logger.Debug("", zap.Any("config", EngineConfig))
util.PoolSize = EngineConfig.PoolSize util.PoolSize = EngineConfig.PoolSize
@@ -129,22 +123,37 @@ func Run(ctx context.Context, conf any) (err error) {
continue continue
} }
plugin.Info("initialize", zap.String("version", plugin.Version)) plugin.Info("initialize", zap.String("version", plugin.Version))
userConfig := cg.GetChild(plugin.Name)
if userConfig != nil { plugin.RawConfig.Parse(plugin.Config, strings.ToUpper(plugin.Name))
if b, err := yaml.Marshal(userConfig); err == nil { for _, fname := range MergeConfigs {
plugin.Yaml = string(b) if name := strings.ToLower(fname); plugin.RawConfig.Has(name) {
plugin.RawConfig.Get(name).ParseGlobal(Engine.RawConfig.Get(name))
} }
} }
var userConfig map[string]any
if defaultYaml := reflect.ValueOf(plugin.Config).Elem().FieldByName("DefaultYaml"); defaultYaml.IsValid() { if defaultYaml := reflect.ValueOf(plugin.Config).Elem().FieldByName("DefaultYaml"); defaultYaml.IsValid() {
if err := yaml.Unmarshal([]byte(defaultYaml.String()), &plugin.RawConfig); err != nil { if err := yaml.Unmarshal([]byte(defaultYaml.String()), &userConfig); err != nil {
log.Error("parsing default config error:", err) log.Error("parsing default config error:", err)
} else {
plugin.RawConfig.ParseDefaultYaml(userConfig)
} }
} }
if plugin.Yaml != "" { userConfig = cg[strings.ToLower(plugin.Name)]
yaml.Unmarshal([]byte(plugin.Yaml), &plugin.RawConfig) plugin.RawConfig.ParseUserFile(userConfig)
if EngineConfig.DisableAll {
plugin.Disabled = true
} }
if userConfig["enable"] == false {
plugin.Disabled = true
} else if userConfig["enable"] == true {
plugin.Disabled = false
}
if plugin.Disabled {
plugin.Warn("plugin disabled")
} else {
plugin.assign() plugin.assign()
} }
}
UUID := uuid.NewString() UUID := uuid.NewString()
contentBuf := bytes.NewBuffer(nil) contentBuf := bytes.NewBuffer(nil)

104
plugin.go
View File

@@ -4,7 +4,6 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"io"
"net/http" "net/http"
"os" "os"
"path/filepath" "path/filepath"
@@ -52,7 +51,7 @@ func InstallPlugin(config config.Plugin) *Plugin {
return plugin return plugin
} }
type FirstConfig config.Config type FirstConfig *config.Config
type DefaultYaml string type DefaultYaml string
// Plugin 插件信息 // Plugin 插件信息
@@ -62,10 +61,7 @@ type Plugin struct {
Name string //插件名称 Name string //插件名称
Config config.Plugin `json:"-" yaml:"-"` //类型化的插件配置 Config config.Plugin `json:"-" yaml:"-"` //类型化的插件配置
Version string //插件版本 Version string //插件版本
Yaml string //配置文件中的配置项
modifiedYaml string //修改过的配置的yaml文件内容
RawConfig config.Config //最终合并后的配置的map形式方便查询 RawConfig config.Config //最终合并后的配置的map形式方便查询
Modified config.Config //修改过的配置项
*log.Logger `json:"-" yaml:"-"` *log.Logger `json:"-" yaml:"-"`
saveTimer *time.Timer //用于保存的时候的延迟,防抖 saveTimer *time.Timer //用于保存的时候的延迟,防抖
Disabled bool Disabled bool
@@ -104,73 +100,20 @@ func (opt *Plugin) assign() {
f, err := os.Open(opt.settingPath()) f, err := os.Open(opt.settingPath())
defer f.Close() defer f.Close()
if err == nil { if err == nil {
var b []byte var modifyConfig map[string]any
b, err = io.ReadAll(f) err = yaml.NewDecoder(f).Decode(&modifyConfig)
if err == nil { opt.RawConfig.ParseModifyFile(modifyConfig)
opt.modifiedYaml = string(b)
if err = yaml.Unmarshal(b, &opt.Modified); err == nil {
err = yaml.Unmarshal(b, &opt.RawConfig)
}
}
if err != nil {
opt.Warn("assign config failed", zap.Error(err))
}
}
if opt == Engine {
opt.registerHandler()
return
}
if EngineConfig.DisableAll {
opt.Disabled = true
}
if opt.RawConfig == nil {
opt.RawConfig = config.Config{}
} else if opt.RawConfig["enable"] == false {
opt.Disabled = true
} else if opt.RawConfig["enable"] == true {
opt.Disabled = false
//移除这个属性防止反序列化报错
delete(opt.RawConfig, "enable")
}
if opt.Disabled {
opt.Warn("plugin disabled")
return
}
t := reflect.TypeOf(opt.Config).Elem()
// 用全局配置覆盖没有设置的配置
for _, fname := range MergeConfigs {
if _, ok := t.FieldByName(fname); ok {
if v, ok := Engine.RawConfig[strings.ToLower(fname)]; ok {
if !opt.RawConfig.Has(fname) {
opt.RawConfig.Set(fname, v)
} else if opt.RawConfig.HasChild(fname) {
opt.RawConfig.GetChild(fname).Merge(Engine.RawConfig.GetChild(fname))
}
}
}
} }
opt.registerHandler() opt.registerHandler()
if opt != Engine {
opt.run() opt.run()
} }
}
func (opt *Plugin) run() { func (opt *Plugin) run() {
opt.Context, opt.CancelFunc = context.WithCancel(Engine) opt.Context, opt.CancelFunc = context.WithCancel(Engine)
opt.RawConfig.Unmarshal(opt.Config) opt.Config.OnEvent(FirstConfig(&opt.RawConfig))
opt.RawConfig = config.Struct2Config(opt.Config, strings.ToUpper(opt.Name))
// var buffer bytes.Buffer
// err := yaml.NewEncoder(&buffer).Encode(opt.Config)
// if err != nil {
// panic(err)
// }
// err = yaml.NewDecoder(&buffer).Decode(&opt.RawConfig)
// if err != nil {
// panic(err)
// }
opt.Config.OnEvent(FirstConfig(opt.RawConfig))
delete(opt.RawConfig, "defaultyaml")
opt.Debug("config", zap.Any("config", opt.Config)) opt.Debug("config", zap.Any("config", opt.Config))
// opt.RawConfig = config.Struct2Config(opt.Config)
if conf, ok := opt.Config.(config.HTTPConfig); ok { if conf, ok := opt.Config.(config.HTTPConfig); ok {
go conf.Listen(opt) go conf.Listen(opt)
} }
@@ -180,8 +123,7 @@ func (opt *Plugin) run() {
} }
// Update 热更新配置 // Update 热更新配置
func (opt *Plugin) Update(conf config.Config) { func (opt *Plugin) Update(conf *config.Config) {
conf.Unmarshal(opt.Config)
opt.Config.OnEvent(conf) opt.Config.OnEvent(conf)
} }
@@ -198,22 +140,6 @@ func (opt *Plugin) registerHandler() {
switch handler := v.Method(i).Interface().(type) { switch handler := v.Method(i).Interface().(type) {
case func(http.ResponseWriter, *http.Request): case func(http.ResponseWriter, *http.Request):
opt.handle(patten, http.HandlerFunc(handler)) opt.handle(patten, http.HandlerFunc(handler))
// case func(*http.Request) (int, string, any):
// opt.handle(patten, http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
// code, msg, data := handler(r)
// switch returnMode {
// case "json":
// rw.Header().Set("Content-Type", "application/json")
// rw.WriteHeader(http.StatusOK)
// json.NewEncoder(rw).Encode(map[string]interface{}{
// "code": code,
// "msg": msg,
// "data": data,
// })
// default:
// http.Error(rw, msg, code)
// }
// }))
} }
} }
} }
@@ -231,7 +157,7 @@ func (opt *Plugin) Save() error {
file, err := os.OpenFile(opt.settingPath(), os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666) file, err := os.OpenFile(opt.settingPath(), os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666)
if err == nil { if err == nil {
defer file.Close() defer file.Close()
err = yaml.NewEncoder(file).Encode(opt.Modified) err = yaml.NewEncoder(file).Encode(opt.RawConfig.Modify)
} }
if err == nil { if err == nil {
opt.Info("config saved") opt.Info("config saved")
@@ -292,6 +218,7 @@ func (opt *Plugin) AssignSubConfig(suber *Subscriber) {
suber.ID = fmt.Sprintf("%d", uintptr(unsafe.Pointer(suber))) suber.ID = fmt.Sprintf("%d", uintptr(unsafe.Pointer(suber)))
} }
} }
// Subscribe 订阅一个流,如果流不存在则创建一个等待流 // Subscribe 订阅一个流,如果流不存在则创建一个等待流
func (opt *Plugin) Subscribe(streamPath string, sub ISubscriber) error { func (opt *Plugin) Subscribe(streamPath string, sub ISubscriber) error {
suber := sub.GetSubscriber() suber := sub.GetSubscriber()
@@ -335,14 +262,12 @@ func (opt *Plugin) Pull(streamPath string, url string, puller IPuller, save int)
switch save { switch save {
case 1: case 1:
pullConf.AddPullOnStart(streamPath, url) pullConf.AddPullOnStart(streamPath, url)
opt.RawConfig.Get("pull").Get("pullonstart").Modify = pullConf.PullOnStart
case 2: case 2:
pullConf.AddPullOnSub(streamPath, url) pullConf.AddPullOnSub(streamPath, url)
opt.RawConfig.Get("pull").Get("pullonsub").Modify = pullConf.PullOnSub
} }
if save > 0 { if save > 0 {
if opt.Modified == nil {
opt.Modified = make(config.Config)
}
opt.Modified["pull"] = config.Struct2Config(pullConf)
if err = opt.Save(); err != nil { if err = opt.Save(); err != nil {
opt.Error("save faild", zap.Error(err)) opt.Error("save faild", zap.Error(err))
} }
@@ -372,10 +297,7 @@ func (opt *Plugin) Push(streamPath string, url string, pusher IPusher, save bool
go pusher.startPush(pusher) go pusher.startPush(pusher)
if save { if save {
pushConfig.AddPush(url, streamPath) pushConfig.AddPush(url, streamPath)
if opt.Modified == nil { opt.RawConfig.Get("push").Get("pushlist").Modify = pushConfig.PushList
opt.Modified = make(config.Config)
}
opt.Modified["push"] = config.Struct2Config(pushConfig)
if err = opt.Save(); err != nil { if err = opt.Save(); err != nil {
opt.Error("save faild", zap.Error(err)) opt.Error("save faild", zap.Error(err))
} }