This commit is contained in:
langhuihui
2024-03-19 10:04:56 +08:00
parent 9b79fc44f2
commit b2bce81e55
20 changed files with 1391 additions and 28 deletions

View File

@@ -2,11 +2,13 @@ package main
import (
"context"
"time"
m7s "m7s.live/monibuca/v5"
_ "m7s.live/monibuca/v5/example/plugin-demo"
"m7s.live/m7s/v5"
_ "m7s.live/m7s/v5/plugin/demo"
)
func main() {
m7s.Run(context.Background())
ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(time.Second*10))
m7s.Run(ctx)
}

22
go.mod
View File

@@ -1,3 +1,21 @@
module m7s.live/monibuca/v5
module m7s.live/m7s/v5
go 1.22
go 1.22
require github.com/quic-go/quic-go v0.42.0
require (
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 // indirect
github.com/logrusorgru/aurora/v4 v4.0.0
github.com/onsi/ginkgo/v2 v2.9.5 // indirect
go.uber.org/mock v0.4.0 // indirect
golang.org/x/crypto v0.4.0 // indirect
golang.org/x/exp v0.0.0-20221205204356-47842c84f3db // indirect
golang.org/x/mod v0.11.0 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/sync v0.6.0
golang.org/x/sys v0.8.0 // indirect
golang.org/x/tools v0.9.1 // indirect
gopkg.in/yaml.v3 v3.0.1
)

40
go.sum Normal file
View File

@@ -0,0 +1,40 @@
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI=
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls=
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 h1:yAJXTCF9TqKcTiHJAE8dj7HMvPfh66eeA2JYW7eFpSE=
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/logrusorgru/aurora/v4 v4.0.0 h1:sRjfPpun/63iADiSvGGjgA1cAYegEWMPCJdUpJYn9JA=
github.com/logrusorgru/aurora/v4 v4.0.0/go.mod h1:lP0iIa2nrnT/qoFXcOZSrZQpJ1o6n2CUf/hyHi2Q4ZQ=
github.com/onsi/ginkgo/v2 v2.9.5 h1:+6Hr4uxzP4XIUyAkg61dWBw8lb/gc4/X5luuxN/EC+Q=
github.com/onsi/ginkgo/v2 v2.9.5/go.mod h1:tvAoo1QUJwNEU2ITftXTpR7R1RbCzoZUOs3RonqW57k=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/quic-go/quic-go v0.42.0 h1:uSfdap0eveIl8KXnipv9K7nlwZ5IqLlYOpJ58u5utpM=
github.com/quic-go/quic-go v0.42.0/go.mod h1:132kz4kL3F9vxhW3CtQJLDVwcFe5wdWeJXXijhsO57M=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU=
go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc=
golang.org/x/crypto v0.4.0 h1:UVQgzMY87xqpKNgb+kDsll2Igd33HszWHFLmpaRMq/8=
golang.org/x/crypto v0.4.0/go.mod h1:3quD/ATkf6oY+rnes5c3ExXTbLc8mueNue5/DoinL80=
golang.org/x/exp v0.0.0-20221205204356-47842c84f3db h1:D/cFflL63o2KSLJIwjlcIt8PR064j/xsmdEJL/YvY/o=
golang.org/x/exp v0.0.0-20221205204356-47842c84f3db/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
golang.org/x/mod v0.11.0 h1:bUO06HqtnRcc/7l71XBe4WcqTZ+3AH1J59zWDDwLKgU=
golang.org/x/mod v0.11.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/tools v0.9.1 h1:8WMNJAz3zrtPmnYC7ISf5dEn3MT0gY7jBJfw27yrrLo=
golang.org/x/tools v0.9.1/go.mod h1:owI94Op576fPu3cIGQeHs3joujW/2Oc6MtlxbF5dfNc=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

345
pkg/config/config.go Normal file
View File

@@ -0,0 +1,345 @@
package config
import (
"encoding/json"
"fmt"
"log/slog"
"net"
"net/http"
"os"
"reflect"
"regexp"
"strings"
"time"
"github.com/quic-go/quic-go"
"gopkg.in/yaml.v3"
)
type Config struct {
Ptr reflect.Value //指向配置结构体值,优先级:动态修改值>环境变量>配置文件>defaultYaml>全局配置>默认值
Modify any //动态修改的值
Env any //环境变量中的值
File any //配置文件中的值
Global *Config //全局配置中的值,指针类型
Default any //默认值
Enum []struct {
Label string `json:"label"`
Value any `json:"value"`
}
name string // 小写
propsMap map[string]*Config
props []*Config
tag reflect.StructTag
}
var durationType = reflect.TypeOf(time.Duration(0))
var regexpType = reflect.TypeOf(Regexp{})
type Plugin interface {
// 可能的入参类型FirstConfig 第一次初始化配置Config 后续配置更新SE系列StateEvent流状态变化事件
OnEvent(any)
}
type TCPPlugin interface {
Plugin
ServeTCP(net.Conn)
}
type HTTPPlugin interface {
Plugin
http.Handler
}
type QuicPlugin interface {
Plugin
ServeQuic(quic.Connection)
}
func (config *Config) Range(f func(key string, value Config)) {
if m, ok := config.GetValue().(map[string]Config); ok {
for k, v := range m {
f(k, v)
}
}
}
func (config *Config) IsMap() bool {
_, ok := config.GetValue().(map[string]Config)
return ok
}
func (config *Config) Get(key string) (v *Config) {
if config.propsMap == nil {
config.propsMap = make(map[string]*Config)
}
if v, ok := config.propsMap[key]; ok {
return v
} else {
v = &Config{
name: key,
}
config.propsMap[key] = v
config.props = append(config.props, v)
return v
}
}
func (config Config) Has(key string) (ok bool) {
if config.propsMap == nil {
return false
}
_, ok = config.propsMap[strings.ToLower(key)]
return ok
}
func (config *Config) MarshalJSON() ([]byte, error) {
if config.propsMap == nil {
return json.Marshal(config.GetValue())
}
return json.Marshal(config.propsMap)
}
func (config *Config) GetValue() any {
return config.Ptr.Interface()
}
// Parse 第一步读取配置结构体的默认值
func (config *Config) Parse(s any, prefix ...string) {
var t reflect.Type
var v reflect.Value
if vv, ok := s.(reflect.Value); ok {
t, v = vv.Type(), vv
} else {
t, v = reflect.TypeOf(s), reflect.ValueOf(s)
}
if t.Kind() == reflect.Pointer {
t, v = t.Elem(), v.Elem()
}
config.Ptr = v
config.Default = v.Interface()
if len(prefix) > 0 { // 读取环境变量
envKey := strings.Join(prefix, "_")
if envValue := os.Getenv(envKey); envValue != "" {
envv := config.assign(strings.ToLower(prefix[0]), envValue)
config.Env = envv.Interface()
config.Ptr.Set(envv)
}
}
if t.Kind() == reflect.Struct && t != regexpType {
for i, j := 0, t.NumField(); i < j; i++ {
ft, fv := t.Field(i), v.Field(i)
if !ft.IsExported() {
continue
}
name := strings.ToLower(ft.Name)
if tag := ft.Tag.Get("yaml"); tag != "" {
if tag == "-" {
continue
}
name, _, _ = strings.Cut(tag, ",")
}
prop := config.Get(name)
prop.Parse(fv, append(prefix, strings.ToUpper(ft.Name))...)
prop.tag = ft.Tag
for _, kv := range strings.Split(ft.Tag.Get("enum"), ",") {
kvs := strings.Split(kv, ":")
if len(kvs) != 2 {
continue
}
var tmp struct {
Value any
}
yaml.Unmarshal([]byte(fmt.Sprintf("value: %s", strings.TrimSpace(kvs[0]))), &tmp)
prop.Enum = append(prop.Enum, struct {
Label string `json:"label"`
Value any `json:"value"`
}{
Label: strings.TrimSpace(kvs[1]),
Value: tmp.Value,
})
}
}
}
}
// ParseDefaultYaml 第二步读取全局配置
func (config *Config) ParseGlobal(g *Config) {
config.Global = g
if config.propsMap != nil {
for k, v := range config.propsMap {
v.ParseGlobal(g.Get(k))
}
} else {
config.Ptr.Set(g.Ptr)
}
}
// ParseDefaultYaml 第三步读取内嵌默认配置
func (config *Config) ParseDefaultYaml(defaultYaml map[string]any) {
if defaultYaml == nil {
return
}
for k, v := range defaultYaml {
if config.Has(k) {
if prop := config.Get(k); prop.props != nil {
if v != nil {
prop.ParseDefaultYaml(v.(map[string]any))
}
} else {
dv := prop.assign(k, v)
prop.Default = dv.Interface()
if prop.Env == nil {
prop.Ptr.Set(dv)
}
}
}
}
}
// ParseFile 第四步读取用户配置文件
func (config *Config) ParseUserFile(conf map[string]any) {
if conf == nil {
return
}
config.File = conf
for k, v := range conf {
if config.Has(k) {
if prop := config.Get(k); prop.props != nil {
if v != nil {
prop.ParseUserFile(v.(map[string]any))
}
} else {
fv := prop.assign(k, v)
prop.File = fv.Interface()
if prop.Env == nil {
prop.Ptr.Set(fv)
}
}
}
}
}
// ParseModifyFile 第五步读取动态修改配置文件
func (config *Config) ParseModifyFile(conf map[string]any) {
if conf == nil {
return
}
config.Modify = conf
for k, v := range conf {
if config.Has(k) {
if prop := config.Get(k); prop.props != nil {
if v != nil {
vmap := v.(map[string]any)
prop.ParseModifyFile(vmap)
if len(vmap) == 0 {
delete(conf, k)
}
}
} else {
mv := prop.assign(k, v)
v = mv.Interface()
vwm := prop.valueWithoutModify()
if equal(vwm, v) {
delete(conf, k)
if prop.Modify != nil {
prop.Modify = nil
prop.Ptr.Set(reflect.ValueOf(vwm))
}
continue
}
prop.Modify = v
prop.Ptr.Set(mv)
}
}
}
if len(conf) == 0 {
config.Modify = nil
}
}
func (config *Config) valueWithoutModify() any {
if config.Env != nil {
return config.Env
}
if config.File != nil {
return config.File
}
if config.Global != nil {
return config.Global.GetValue()
}
return config.Default
}
func equal(vwm, v any) bool {
ft := reflect.TypeOf(vwm)
switch ft {
case regexpType:
return vwm.(Regexp).String() == v.(Regexp).String()
default:
switch ft.Kind() {
case reflect.Slice, reflect.Array, reflect.Map:
return reflect.DeepEqual(vwm, v)
}
return vwm == v
}
}
func (config *Config) GetMap() map[string]any {
m := make(map[string]any)
for k, v := range config.propsMap {
if v.props != nil {
if vv := v.GetMap(); vv != nil {
m[k] = vv
}
} else if v.GetValue() != nil {
m[k] = v.GetValue()
}
}
if len(m) > 0 {
return m
}
return nil
}
var regexPureNumber = regexp.MustCompile(`^\d+$`)
func (config *Config) assign(k string, v any) (target reflect.Value) {
ft := config.Ptr.Type()
source := reflect.ValueOf(v)
switch ft {
case durationType:
target = reflect.New(ft).Elem()
if source.Type() == durationType {
target.Set(source)
} else if source.IsZero() || !source.IsValid() {
target.SetInt(0)
} else {
timeStr := source.String()
if d, err := time.ParseDuration(timeStr); err == nil && !regexPureNumber.MatchString(timeStr) {
target.SetInt(int64(d))
} else {
slog.Error("invalid duration value please add unit (s,m,h,d)eg: 100ms, 10s, 4m, 1h", "key", k, "value", source)
os.Exit(1)
}
}
case regexpType:
target = reflect.New(ft).Elem()
regexpStr := source.String()
target.Set(reflect.ValueOf(Regexp{regexp.MustCompile(regexpStr)}))
default:
tmpStruct := reflect.StructOf([]reflect.StructField{
{
Name: strings.ToUpper(k),
Type: ft,
},
})
tmpValue := reflect.New(tmpStruct)
tmpByte, _ := yaml.Marshal(map[string]any{k: v})
yaml.Unmarshal(tmpByte, tmpValue.Interface())
target = tmpValue.Elem().Field(0)
}
return
}

53
pkg/config/config_test.go Normal file
View File

@@ -0,0 +1,53 @@
package config
import (
"testing"
)
// TestModify 测试动态修改配置文件比较值是否修改修改后是否有Modify属性
func TestModify(t *testing.T) {
t.Run(t.Name(), func(t *testing.T) {
var defaultValue struct{
Subscribe
}
defaultValue.SubAudio = false
var conf Config
conf.Parse(&defaultValue)
conf.ParseModifyFile(map[string]any{
"subscribe": map[string]any{
"subaudio": false,
},
})
if conf.Modify != nil {
t.Fail()
}
conf.ParseModifyFile(map[string]any{
"subscribe": map[string]any{
"subaudio": true,
},
})
if conf.Modify == nil {
t.Fail()
}
})
}
// TestGlobal 测试全局配置
func TestGlobal(t *testing.T) {
t.Run(t.Name(), func(t *testing.T) {
var defaultValue struct{
Publish
}
var globalValue struct {
Publish
}
globalValue.Publish.KickExist = true
var conf Config
var globalConf Config
globalConf.Parse(&globalValue)
conf.Parse(&defaultValue)
conf.ParseGlobal(&globalConf)
if defaultValue.Publish.KickExist != true {
t.Fail()
}
})
}

243
pkg/config/formily.go Normal file
View File

@@ -0,0 +1,243 @@
package config
import (
"fmt"
"log/slog"
"reflect"
"strings"
"time"
)
type Property struct {
Type string `json:"type"`
Title string `json:"title"`
Description string `json:"description"`
Enum []struct {
Label string `json:"label"`
Value any `json:"value"`
} `json:"enum,omitempty"`
Items *Object `json:"items,omitempty"`
Properties map[string]any `json:"properties,omitempty"`
Default any `json:"default,omitempty"`
Decorator string `json:"x-decorator"`
DecoratorProps map[string]any `json:"x-decorator-props,omitempty"`
Component string `json:"x-component"`
ComponentProps map[string]any `json:"x-component-props,omitempty"`
Index int `json:"x-index"`
}
type Card struct {
Type string `json:"type"`
Properties map[string]any `json:"properties,omitempty"`
Component string `json:"x-component"`
ComponentProps map[string]any `json:"x-component-props,omitempty"`
Index int `json:"x-index"`
}
type Object struct {
Type string `json:"type"`
Properties map[string]any `json:"properties"`
}
func (config *Config) schema(index int) (r any) {
defer func() {
err := recover()
if err != nil {
slog.Error(err.(error).Error())
}
}()
if config.props != nil {
r := Card{
Type: "void",
Component: "Card",
Properties: make(map[string]any),
Index: index,
}
r.ComponentProps = map[string]any{
"title": config.name,
}
for i, v := range config.props {
if strings.HasPrefix(v.tag.Get("desc"), "废弃") {
continue
}
r.Properties[v.name] = v.schema(i)
}
return r
} else {
p := Property{
Title: config.name,
Default: config.GetValue(),
DecoratorProps: map[string]any{
"tooltip": config.tag.Get("desc"),
},
ComponentProps: map[string]any{},
Decorator: "FormItem",
Index: index,
}
if config.Modify != nil {
p.Description = "已动态修改"
} else if config.Env != nil {
p.Description = "使用环境变量中的值"
} else if config.File != nil {
p.Description = "使用配置文件中的值"
} else if config.Global != nil {
p.Description = "已使用全局配置中的值"
}
p.Enum = config.Enum
switch config.Ptr.Type() {
case regexpType:
p.Type = "string"
p.Component = "Input"
p.DecoratorProps["addonAfter"] = "正则表达式"
str := config.GetValue().(Regexp).String()
p.ComponentProps = map[string]any{
"placeholder": str,
}
p.Default = str
case durationType:
p.Type = "string"
p.Component = "Input"
str := config.GetValue().(time.Duration).String()
p.ComponentProps = map[string]any{
"placeholder": str,
}
p.Default = str
p.DecoratorProps["addonAfter"] = "时间,单位s,m,h,d例如100ms, 10s, 4m, 1h"
default:
switch config.Ptr.Kind() {
case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64, reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64, reflect.Float32, reflect.Float64:
p.Type = "number"
p.Component = "InputNumber"
p.ComponentProps = map[string]any{
"placeholder": fmt.Sprintf("%v", config.GetValue()),
}
case reflect.Bool:
p.Type = "boolean"
p.Component = "Switch"
case reflect.String:
p.Type = "string"
p.Component = "Input"
p.ComponentProps = map[string]any{
"placeholder": config.GetValue(),
}
case reflect.Slice:
p.Type = "array"
p.Component = "Input"
p.ComponentProps = map[string]any{
"placeholder": config.GetValue(),
}
p.DecoratorProps["addonAfter"] = "数组,每个元素用逗号分隔"
case reflect.Map:
var children []struct {
Key string `json:"mkey"`
Value any `json:"mvalue"`
}
p := Property{
Type: "array",
Component: "ArrayTable",
Decorator: "FormItem",
Properties: map[string]any{
"addition": map[string]string{
"type": "void",
"title": "添加",
"x-component": "ArrayTable.Addition",
},
},
Index: index,
Title: config.name,
Items: &Object{
Type: "object",
Properties: map[string]any{
"c1": Card{
Type: "void",
Component: "ArrayTable.Column",
ComponentProps: map[string]any{
"title": config.tag.Get("key"),
"width": 300,
},
Properties: map[string]any{
"mkey": Property{
Type: "string",
Decorator: "FormItem",
Component: "Input",
},
},
Index: 0,
},
"c2": Card{
Type: "void",
Component: "ArrayTable.Column",
ComponentProps: map[string]any{
"title": config.tag.Get("value"),
},
Properties: map[string]any{
"mvalue": Property{
Type: "string",
Decorator: "FormItem",
Component: "Input",
},
},
Index: 1,
},
"operator": Card{
Type: "void",
Component: "ArrayTable.Column",
ComponentProps: map[string]any{
"title": "操作",
},
Properties: map[string]any{
"remove": Card{
Type: "void",
Component: "ArrayTable.Remove",
},
},
Index: 2,
},
},
},
}
iter := config.Ptr.MapRange()
for iter.Next() {
children = append(children, struct {
Key string `json:"mkey"`
Value any `json:"mvalue"`
}{
Key: iter.Key().String(),
Value: iter.Value().Interface(),
})
}
p.Default = children
return p
default:
}
}
if len(p.Enum) > 0 {
p.Component = "Radio.Group"
}
return p
}
}
func (config *Config) GetFormily() (r Object) {
var fromItems = make(map[string]any)
r.Type = "object"
r.Properties = map[string]any{
"layout": Card{
Type: "void",
Component: "FormLayout",
ComponentProps: map[string]any{
"labelCol": 4,
"wrapperCol": 20,
},
Properties: fromItems,
},
}
for i, v := range config.props {
if strings.HasPrefix(v.tag.Get("desc"), "废弃") {
continue
}
fromItems[v.name] = v.schema(i)
}
return
}

129
pkg/config/http.go Normal file
View File

@@ -0,0 +1,129 @@
package config
import (
"context"
"crypto/tls"
"log/slog"
"net/http"
"time"
"github.com/logrusorgru/aurora/v4"
"golang.org/x/sync/errgroup"
)
var _ HTTPConfig = (*HTTP)(nil)
type Middleware func(string, http.Handler) http.Handler
type HTTP struct {
ListenAddr string `desc:"监听地址"`
ListenAddrTLS string `desc:"监听地址HTTPS"`
CertFile string `desc:"HTTPS证书文件"`
KeyFile string `desc:"HTTPS密钥文件"`
CORS bool `default:"true" desc:"是否自动添加CORS头"` //是否自动添加CORS头
UserName string `desc:"基本身份认证用户名"`
Password string `desc:"基本身份认证密码"`
ReadTimeout time.Duration `desc:"读取超时"`
WriteTimeout time.Duration `desc:"写入超时"`
IdleTimeout time.Duration `desc:"空闲超时"`
mux *http.ServeMux
middlewares []Middleware
}
type HTTPConfig interface {
GetHTTPConfig() *HTTP
Listen(ctx context.Context) error
Handle(string, http.Handler)
Handler(*http.Request) (http.Handler, string)
AddMiddleware(Middleware)
}
func (config *HTTP) AddMiddleware(middleware Middleware) {
config.middlewares = append(config.middlewares, middleware)
}
func (config *HTTP) Handle(path string, f http.Handler) {
if config.mux == nil {
config.mux = http.NewServeMux()
}
if config.CORS {
// f = util.CORS(f)
}
if config.UserName != "" && config.Password != "" {
// f = util.BasicAuth(config.UserName, config.Password, f)
}
for _, middleware := range config.middlewares {
f = middleware(path, f)
}
config.mux.Handle(path, f)
}
func (config *HTTP) GetHTTPConfig() *HTTP {
return config
}
func (config *HTTP) Handler(r *http.Request) (h http.Handler, pattern string) {
return config.mux.Handler(r)
}
// ListenAddrs Listen http and https
func (config *HTTP) Listen(ctx context.Context) error {
if config.mux == nil {
return nil
}
var g errgroup.Group
if config.ListenAddrTLS != "" && (config == &Global.HTTP || config.ListenAddrTLS != Global.ListenAddrTLS) {
g.Go(func() error {
slog.Info("🌐 https listen at ", "addr", aurora.Blink(config.ListenAddrTLS))
cer, _ := tls.X509KeyPair(LocalCert, LocalKey)
var server = http.Server{
Addr: config.ListenAddrTLS,
ReadTimeout: config.ReadTimeout,
WriteTimeout: config.WriteTimeout,
IdleTimeout: config.IdleTimeout,
Handler: config.mux,
TLSConfig: &tls.Config{
Certificates: []tls.Certificate{cer},
CipherSuites: []uint16{
tls.TLS_AES_128_GCM_SHA256,
tls.TLS_CHACHA20_POLY1305_SHA256,
tls.TLS_AES_256_GCM_SHA384,
//tls.TLS_RSA_WITH_AES_128_CBC_SHA,
//tls.TLS_RSA_WITH_AES_256_CBC_SHA,
//tls.TLS_RSA_WITH_AES_128_GCM_SHA256,
//tls.TLS_RSA_WITH_AES_256_GCM_SHA384,
tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305,
tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305,
tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
tls.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256,
tls.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,
tls.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256,
tls.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA,
tls.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA,
tls.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA,
},
},
}
return server.ListenAndServeTLS(config.CertFile, config.KeyFile)
})
}
if config.ListenAddr != "" && (config == &Global.HTTP || config.ListenAddr != Global.ListenAddr) {
g.Go(func() error {
slog.Info("🌐 http listen at ", "addr", aurora.Blink(config.ListenAddr))
var server = http.Server{
Addr: config.ListenAddr,
ReadTimeout: config.ReadTimeout,
WriteTimeout: config.WriteTimeout,
IdleTimeout: config.IdleTimeout,
Handler: config.mux,
}
return server.ListenAndServe()
})
}
g.Go(func() error {
<-ctx.Done()
return ctx.Err()
})
return g.Wait()
}

View File

@@ -0,0 +1,27 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEogIBAAKCAQEAn2zPOFL8i4EqVwoa7fzlGXXPaTxVoUthlGEuWP5jPZCRctAe
JNPxZT7vhj6PZsRi0jGJKD3KNosprSxN335KzZmm8FPxghRXEt/y7DvKQuuTXyst
eaQoTbZtdX3a2cXNC4aPbDeCCYhrthHrCo8GL07a3lV9FlKpcrmn+zep3+BXHDD4
QiG/6k4/AyZImKIi6de25riP7yIjJgENgXzQvEDRYj/Qkth+2uoMzvjb5dZvDzSF
syZ4haPdNv66UDMFVzsIQ7ZPSILVw91vdQn7yI7sCVdlQT1CgXkbXOHvyZGhVBQh
ikPEDBCQpvR8m45kXaterYtXDrW4AXb/VfbILQIDAQABAoIBAAm3jZfOwxLTsBt3
A7YuvF4FZrtw0H1QxWVQWQ1WkADwH1VipvuyqVt07w99H6txW96Y41W/EmSprcQe
165AGdoXO+wZCbbWe4oseTd523Whuy1JSe/ImCZIcLqBDcZJPpqtvG8poPToyjvi
MrPFjOh0Q3XauxGRvz89XqY6udFp1+jzt7pmUSk9AWi/CYNGWzU32iEgZZNAxhgg
T3f6HkitiyoyqQURuxMxUVdohjdavVQwDrRqwgLM4MkfIhT+1B52xNYhCVzxT5Eg
5VE533fumUULxCK0t8/aneSn4rJ/5+CU0WBDex9cVtDsXCncK9oCkw/PQZO+lzzt
LHzBytcCgYEA1tZISKL2gFb24aDaS/OcHfDkHrFRVQDHP+iWL8TMs5HHUGsM675z
JxYNTgKH4tgg19V7G1N6SfF9wluNYd+4m681DO4kHMdzQ5+RcuLM/p2T8o1hLjf6
CDTjMoSntyecouwbuqmibBzQw834+LR/0h7N12eLgV5MEjdKMLaktqcCgYEAvfiU
FAIePAwSlroJcmL1AVCOYIyoVK9Vg5hEukclDtrzy0KkSKoMsyGrM/ggPvMNL/vz
W6rJSFahoRb8XyDautZVj1oxjYrrVBOKSKLTKr8+ckfztDO6+aljxtxHdv9sRQg1
zQ+NSozgXJSW08Y464I6CPaS5GW1DGTLFAjCeQsCgYAh06WYAkjL1mWTCy+0C8yG
Dlrs1kCXIMM+tdGH/fW5RHfcmq5zJA6fleJMaSuaNSuesFds6wzzPZnuk1nEkmRP
5xt0SL7Y5TKp8CMHstxSLt+PrmEh1OCCkElBuA9sUEligciv8GvJmBPq8LCGAG2r
2PvSMdSObxmNOLVuzCNNOwKBgAo+c02454R5ai8yjPvcFjYh7+uI6jLW2ZelCF+7
ImZwrCDT0SQR92lZcW/1+1cpqBZkUbUpunzqHwEeyjEfBmx4zlhSlsV5LkN0YkqU
bSqq8WUcOCoJeBWqarT4f+oMz/vQ+4W5Rvc0LY0QfimhUMRyW0rMcRNb4K1wafsE
legNAoGADESfqKfuuL2C4+lmwaPb/7K3yirOEaZECKgc5F9YFRFEJ6Wic+8U4Yvn
89cvY8ye1RM4ZDe1GtuzlgRw11kXc65QtjEnj1j8EqESu+3EcZnq/wOzFFbeELFn
kTwlRfbRHELksH04OAphQwa/BFTXPni+zv0tFYBkj0RyXAVumBQ=
-----END RSA PRIVATE KEY-----

View File

@@ -0,0 +1,66 @@
-----BEGIN CERTIFICATE-----
MIIGbzCCBNegAwIBAgIQUnjZ1U6EvGqTYJGEoD5X7zANBgkqhkiG9w0BAQwFADBZ
MQswCQYDVQQGEwJDTjElMCMGA1UEChMcVHJ1c3RBc2lhIFRlY2hub2xvZ2llcywg
SW5jLjEjMCEGA1UEAxMaVHJ1c3RBc2lhIFJTQSBEViBUTFMgQ0EgRzIwHhcNMjMw
MjIyMDAwMDAwWhcNMjQwMjIyMjM1OTU5WjAdMRswGQYDVQQDExJsb2NhbC5tb25p
YnVjYS5jb20wggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCfbM84UvyL
gSpXChrt/OUZdc9pPFWhS2GUYS5Y/mM9kJFy0B4k0/FlPu+GPo9mxGLSMYkoPco2
iymtLE3ffkrNmabwU/GCFFcS3/LsO8pC65NfKy15pChNtm11fdrZxc0Lho9sN4IJ
iGu2EesKjwYvTtreVX0WUqlyuaf7N6nf4FccMPhCIb/qTj8DJkiYoiLp17bmuI/v
IiMmAQ2BfNC8QNFiP9CS2H7a6gzO+Nvl1m8PNIWzJniFo902/rpQMwVXOwhDtk9I
gtXD3W91CfvIjuwJV2VBPUKBeRtc4e/JkaFUFCGKQ8QMEJCm9HybjmRdq16ti1cO
tbgBdv9V9sgtAgMBAAGjggLtMIIC6TAfBgNVHSMEGDAWgBRfOnwREH4MZ3Fh3Iuj
tQADZ/VXHDAdBgNVHQ4EFgQUjqZHMBwnZrrlYVLZnNFBaCgfdU0wDgYDVR0PAQH/
BAQDAgWgMAwGA1UdEwEB/wQCMAAwHQYDVR0lBBYwFAYIKwYBBQUHAwEGCCsGAQUF
BwMCMEkGA1UdIARCMEAwNAYLKwYBBAGyMQECAjEwJTAjBggrBgEFBQcCARYXaHR0
cHM6Ly9zZWN0aWdvLmNvbS9DUFMwCAYGZ4EMAQIBMH0GCCsGAQUFBwEBBHEwbzBC
BggrBgEFBQcwAoY2aHR0cDovL2NydC50cnVzdC1wcm92aWRlci5jbi9UcnVzdEFz
aWFSU0FEVlRMU0NBRzIuY3J0MCkGCCsGAQUFBzABhh1odHRwOi8vb2NzcC50cnVz
dC1wcm92aWRlci5jbjAdBgNVHREEFjAUghJsb2NhbC5tb25pYnVjYS5jb20wggF/
BgorBgEEAdZ5AgQCBIIBbwSCAWsBaQB3AHb/iD8KtvuVUcJhzPWHujS0pM27Kdxo
Qgqf5mdMWjp0AAABhnkjvQkAAAQDAEgwRgIhAMBG7q1j0b/vZwGD27eUP0+nBh73
P7nC+CSokfpts8SoAiEA+HCrNNZ4jrgiLyi9035m1Hvf414Bn8ksSjWzxNUEiCgA
dgDatr9rP7W2Ip+bwrtca+hwkXFsu1GEhTS9pD0wSNf7qwAAAYZ5I7zdAAAEAwBH
MEUCIQCdIMs1nf5cheetmz4/9om8/6KDsoej8VFXHzaDt4BkAAIgLePqBifr6zUA
lTpl17CRDG7q09kUPzElK8uTLjnugtkAdgDuzdBk1dsazsVct520zROiModGfLzs
3sNRSFlGcR+1mwAAAYZ5I7yiAAAEAwBHMEUCIAbWcfP71joz+2wBVRU78RD0bV6V
ugIiATVYUh+k9duJAiEA7x0JIdFuLBj3ggGH9QpAWdG03z48kZ9Cy1DjibRn4fYw
DQYJKoZIhvcNAQEMBQADggGBAItE6rr+1vFoVA3R71+23W0ctYrBTWyxCOse8i0x
/BEo13FjERXJkKWGSj1mwmTikO94JPcPqm83PVyZ0eIbEPu4IO/E6xFbOkTSQu7c
o+5i7TdqtPfv6AOApt+yBb3t3NHRXk2WYLV4onvuSnorbFaj9wRS7GNr+rXCIbqJ
HaFKpneHoV9XhKYUwdgDr2w6JkGattyFdee5o60+8EtL068Mf6Qg3OmyMosEZuBw
R/Gs4DPVKwxj/qt7cJPZoUDV7L6LzLCkgxre8nvvLbOBkC34i4KQGF4CkTjRPWw+
OpRKOzWIw9fQ2+m+z7QwWi+fZQ31EAT6KGnHqPLePJNj09qqUSff3e/y9szzKGHc
TpKVSCgEuuCiBNze7PG9D8QgBMYHkOuGkMeP1EO0pZ3mxd3obUn+bPz0tsqvMR2t
gBx56pMeFnVNQR26VqT2YE+Xw7j6AQUwLa6SCMEsfPeotnhl5tiIKxjqWjWf1lLV
/YWY7m7yb5ctZnq9FJU926ZLLA==
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
MIIFBzCCA++gAwIBAgIRALIM7VUuMaC/NDp1KHQ76aswDQYJKoZIhvcNAQELBQAw
ezELMAkGA1UEBhMCR0IxGzAZBgNVBAgMEkdyZWF0ZXIgTWFuY2hlc3RlcjEQMA4G
A1UEBwwHU2FsZm9yZDEaMBgGA1UECgwRQ29tb2RvIENBIExpbWl0ZWQxITAfBgNV
BAMMGEFBQSBDZXJ0aWZpY2F0ZSBTZXJ2aWNlczAeFw0yMjAxMTAwMDAwMDBaFw0y
ODEyMzEyMzU5NTlaMFkxCzAJBgNVBAYTAkNOMSUwIwYDVQQKExxUcnVzdEFzaWEg
VGVjaG5vbG9naWVzLCBJbmMuMSMwIQYDVQQDExpUcnVzdEFzaWEgUlNBIERWIFRM
UyBDQSBHMjCCAaIwDQYJKoZIhvcNAQEBBQADggGPADCCAYoCggGBAKjGDe0GSaBs
Yl/VhMaTM6GhfR1TAt4mrhN8zfAMwEfLZth+N2ie5ULbW8YvSGzhqkDhGgSBlafm
qq05oeESrIJQyz24j7icGeGyIZ/jIChOOvjt4M8EVi3O0Se7E6RAgVYcX+QWVp5c
Sy+l7XrrtL/pDDL9Bngnq/DVfjCzm5ZYUb1PpyvYTP7trsV+yYOCNmmwQvB4yVjf
IIpHC1OcsPBntMUGeH1Eja4D+qJYhGOxX9kpa+2wTCW06L8T6OhkpJWYn5JYiht5
8exjAR7b8Zi3DeG9oZO5o6Qvhl3f8uGU8lK1j9jCUN/18mI/5vZJ76i+hsgdlfZB
Rh5lmAQjD80M9TY+oD4MYUqB5XrigPfFAUwXFGehhlwCVw7y6+5kpbq/NpvM5Ba8
SeQYUUuMA8RXpTtGlrrTPqJryfa55hTuX/ThhX4gcCVkbyujo0CYr+Uuc14IOyNY
1fD0/qORbllbgV41wiy/2ZUWZQUodqHWkjT1CwIMbQOY5jmrSYGBwwIDAQABo4IB
JjCCASIwHwYDVR0jBBgwFoAUoBEKIz6W8Qfs4q8p74Klf9AwpLQwHQYDVR0OBBYE
FF86fBEQfgxncWHci6O1AANn9VccMA4GA1UdDwEB/wQEAwIBhjASBgNVHRMBAf8E
CDAGAQH/AgEAMB0GA1UdJQQWMBQGCCsGAQUFBwMBBggrBgEFBQcDAjAiBgNVHSAE
GzAZMA0GCysGAQQBsjEBAgIxMAgGBmeBDAECATBDBgNVHR8EPDA6MDigNqA0hjJo
dHRwOi8vY3JsLmNvbW9kb2NhLmNvbS9BQUFDZXJ0aWZpY2F0ZVNlcnZpY2VzLmNy
bDA0BggrBgEFBQcBAQQoMCYwJAYIKwYBBQUHMAGGGGh0dHA6Ly9vY3NwLmNvbW9k
b2NhLmNvbTANBgkqhkiG9w0BAQsFAAOCAQEAHMUom5cxIje2IiFU7mOCsBr2F6CY
eU5cyfQ/Aep9kAXYUDuWsaT85721JxeXFYkf4D/cgNd9+hxT8ZeDOJrn+ysqR7NO
2K9AdqTdIY2uZPKmvgHOkvH2gQD6jc05eSPOwdY/10IPvmpgUKaGOa/tyygL8Og4
3tYyoHipMMnS4OiYKakDJny0XVuchIP7ZMKiP07Q3FIuSS4omzR77kmc75/6Q9dP
v4wa90UCOn1j6r7WhMmX3eT3Gsdj3WMe9bYD0AFuqa6MDyjIeXq08mVGraXiw73s
Zale8OMckn/BU3O/3aFNLHLfET2H2hT6Wb3nwxjpLIfXmSVcVd8A58XH0g==
-----END CERTIFICATE-----

View File

@@ -1,4 +0,0 @@
package config
type Publish struct {
}

64
pkg/config/quic.go Normal file
View File

@@ -0,0 +1,64 @@
package config
import (
"context"
"crypto/tls"
"log/slog"
"github.com/quic-go/quic-go"
)
type QuicConfig interface {
ListenQuic(context.Context, QuicPlugin) error
}
type Quic struct {
ListenAddr string `desc:"监听地址格式为ip:portip 可省略默认监听所有网卡"`
CertFile string `desc:"证书文件"`
KeyFile string `desc:"私钥文件"`
}
func (q *Quic) ListenQuic(ctx context.Context, plugin QuicPlugin) error {
listener, err := quic.ListenAddr(q.ListenAddr, q.generateTLSConfig(), &quic.Config{
EnableDatagrams: true,
})
if err != nil {
return err
}
slog.Info("quic listen", "addr", q.ListenAddr)
for {
conn, err := listener.Accept(ctx)
if err != nil {
return err
}
go plugin.ServeQuic(conn)
}
}
func (q *Quic) generateTLSConfig() *tls.Config {
// key, err := rsa.GenerateKey(rand.Reader, 1024)
// if err != nil {
// panic(err)
// }
// template := x509.Certificate{SerialNumber: big.NewInt(1)}
// certDER, err := x509.CreateCertificate(rand.Reader, &template, &template, &key.PublicKey, key)
// if err != nil {
// panic(err)
// }
// keyPEM := pem.EncodeToMemory(&pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(key)})
// certPEM := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: certDER})
// tlsCert, err := tls.X509KeyPair(certPEM, keyPEM)
keyPair, err := tls.X509KeyPair(LocalCert, LocalKey)
if q.CertFile != "" || q.KeyFile != "" {
keyPair, err = tls.LoadX509KeyPair(q.CertFile, q.KeyFile)
}
if err != nil {
slog.Error("LoadX509KeyPair", "error", err)
panic(err)
}
return &tls.Config{
Certificates: []tls.Certificate{keyPair},
NextProtos: []string{"monibuca"},
}
}

49
pkg/config/regexp.go Normal file
View File

@@ -0,0 +1,49 @@
package config
import (
"regexp"
"gopkg.in/yaml.v3"
)
type Regexp struct {
*regexp.Regexp
}
func (r *Regexp) Valid() bool {
return r.Regexp != nil
}
func (r Regexp) String() string {
if !r.Valid() {
return ""
}
return r.Regexp.String()
}
func (r *Regexp) UnmarshalYAML(node *yaml.Node) error {
r.Regexp = regexp.MustCompile(node.Value)
return nil
}
func (r Regexp) MarshalYAML() (interface{}, error) {
return r.String(), nil
}
func (r Regexp) MarshalJSON() ([]byte, error) {
return []byte(`"` + r.String() + `"`), nil
}
func (r *Regexp) UnmarshalJSON(b []byte) error {
if len(b) == 0 {
return nil
}
if b[0] == '"' {
b = b[1:]
}
if b[len(b)-1] == '"' {
b = b[:len(b)-1]
}
r.Regexp = regexp.MustCompile(string(b))
return nil
}

112
pkg/config/tcp.go Normal file
View File

@@ -0,0 +1,112 @@
package config
import (
"context"
"crypto/tls"
_ "embed"
"net"
"runtime"
"time"
)
//go:embed local.monibuca.com_bundle.pem
var LocalCert []byte
//go:embed local.monibuca.com.key
var LocalKey []byte
var _ TCPConfig = (*TCP)(nil)
type TCPConfig interface {
ListenTCP(context.Context, TCPPlugin) error
}
type TCP struct {
ListenAddr string `desc:"监听地址格式为ip:portip 可省略默认监听所有网卡"`
ListenAddrTLS string `desc:"监听地址格式为ip:portip 可省略默认监听所有网卡"`
CertFile string `desc:"证书文件"`
KeyFile string `desc:"私钥文件"`
ListenNum int `desc:"同时并行监听数量0为CPU核心数量"` //同时并行监听数量0为CPU核心数量
NoDelay bool `desc:"是否禁用Nagle算法"` //是否禁用Nagle算法
}
func (tcp *TCP) listen(l net.Listener, handler func(net.Conn)) {
var tempDelay time.Duration
for {
conn, err := l.Accept()
if err != nil {
if ne, ok := err.(net.Error); ok && ne.Temporary() {
if tempDelay == 0 {
tempDelay = 5 * time.Millisecond
} else {
tempDelay *= 2
}
if max := 1 * time.Second; tempDelay > max {
tempDelay = max
}
// slog.Warnf("%s: Accept error: %v; retrying in %v", tcp.ListenAddr, err, tempDelay)
time.Sleep(tempDelay)
continue
}
return
}
var tcpConn *net.TCPConn
switch v := conn.(type) {
case *net.TCPConn:
tcpConn = v
case *tls.Conn:
tcpConn = v.NetConn().(*net.TCPConn)
}
if !tcp.NoDelay {
tcpConn.SetNoDelay(false)
}
tempDelay = 0
go handler(conn)
}
}
func (tcp *TCP) ListenTCP(ctx context.Context, plugin TCPPlugin) error {
l, err := net.Listen("tcp", tcp.ListenAddr)
if err != nil {
// if Global.LogLang == "zh" {
// slog.Fatalf("%s: 监听失败: %v", tcp.ListenAddr, err)
// } else {
// slog.Fatalf("%s: Listen error: %v", tcp.ListenAddr, err)
// }
return err
}
count := tcp.ListenNum
if count == 0 {
count = runtime.NumCPU()
}
// slog.Infof("tcp listen %d at %s", count, tcp.ListenAddr)
for i := 0; i < count; i++ {
go tcp.listen(l, plugin.ServeTCP)
}
if tcp.ListenAddrTLS != "" {
keyPair, _ := tls.X509KeyPair(LocalCert, LocalKey)
if tcp.CertFile != "" || tcp.KeyFile != "" {
keyPair, err = tls.LoadX509KeyPair(tcp.CertFile, tcp.KeyFile)
}
if err != nil {
// slog.Error("LoadX509KeyPair", "error", err)
return err
}
l, err = tls.Listen("tcp", tcp.ListenAddrTLS, &tls.Config{
Certificates: []tls.Certificate{keyPair},
})
if err != nil {
// if Global.LogLang == "zh" {
// slog.Fatalf("%s: 监听失败: %v", tcp.ListenAddrTLS, err)
// } else {
// slog.Fatalf("%s: Listen error: %v", tcp.ListenAddrTLS, err)
// }
return err
}
// slog.Infof("tls tcp listen %d at %s", count, tcp.ListenAddrTLS)
for i := 0; i < count; i++ {
go tcp.listen(l, plugin.ServeTCP)
}
}
<-ctx.Done()
return l.Close()
}

201
pkg/config/types.go Executable file
View File

@@ -0,0 +1,201 @@
package config
import (
"fmt"
"net/http"
"regexp"
"strings"
"sync"
"time"
)
type PublishConfig interface {
GetPublishConfig() Publish
}
type SubscribeConfig interface {
GetSubscribeConfig() *Subscribe
}
type PullConfig interface {
GetPullConfig() *Pull
}
type PushConfig interface {
GetPushConfig() *Push
}
type Publish struct {
PubAudio bool `default:"true" desc:"是否发布音频"`
PubVideo bool `default:"true" desc:"是否发布视频"`
KickExist bool `desc:"是否踢掉已经存在的发布者"` // 是否踢掉已经存在的发布者
PublishTimeout time.Duration `default:"10s" desc:"发布无数据超时"` // 发布无数据超时
WaitCloseTimeout time.Duration `desc:"延迟自动关闭(等待重连)"` // 延迟自动关闭(等待重连)
DelayCloseTimeout time.Duration `desc:"延迟自动关闭(无订阅时)"` // 延迟自动关闭(无订阅时)
IdleTimeout time.Duration `desc:"空闲(无订阅)超时"` // 空闲(无订阅)超时
PauseTimeout time.Duration `default:"30s" desc:"暂停超时时间"` // 暂停超时
BufferTime time.Duration `desc:"缓冲长度(单位:秒)0代表取最近关键帧"` // 缓冲长度(单位:秒)0代表取最近关键帧
SpeedLimit time.Duration `default:"500ms" desc:"速度限制最大等待时间,0则不等待"` //速度限制最大等待时间
Key string `desc:"发布鉴权key"` // 发布鉴权key
SecretArgName string `default:"secret" desc:"发布鉴权参数名"` // 发布鉴权参数名
ExpireArgName string `default:"expire" desc:"发布鉴权失效时间参数名"` // 发布鉴权失效时间参数名
RingSize string `default:"256-1024" desc:"缓冲范围"` // 初始缓冲区大小
}
func (c Publish) GetPublishConfig() Publish {
return c
}
type Subscribe struct {
SubAudio bool `default:"true" desc:"是否订阅音频"`
SubVideo bool `default:"true" desc:"是否订阅视频"`
SubVideoArgName string `default:"vts" desc:"定订阅的视频轨道参数名"` // 指定订阅的视频轨道参数名
SubAudioArgName string `default:"ats" desc:"指定订阅的音频轨道参数名"` // 指定订阅的音频轨道参数名
SubDataArgName string `default:"dts" desc:"指定订阅的数据轨道参数名"` // 指定订阅的数据轨道参数名
SubModeArgName string `desc:"指定订阅的模式参数名"` // 指定订阅的模式参数名
SubAudioTracks []string `desc:"指定订阅的音频轨道"` // 指定订阅的音频轨道
SubVideoTracks []string `desc:"指定订阅的视频轨道"` // 指定订阅的视频轨道
SubDataTracks []string `desc:"指定订阅的数据轨道"` // 指定订阅的数据轨道
SubMode int `desc:"订阅模式" enum:"0:实时模式,1:首屏后不进行追赶,2:从缓冲最大的关键帧开始播放"` // 0实时模式追赶发布者进度在播放首屏后等待发布者的下一个关键帧然后跳到该帧。1、首屏后不进行追赶。2、从缓冲最大的关键帧开始播放也不追赶需要发布者配置缓存长度
SyncMode int `desc:"同步模式" enum:"0:采用时间戳同步,1:采用写入时间同步"` // 0采用时间戳同步1采用写入时间同步
IFrameOnly bool `desc:"只要关键帧"` // 只要关键帧
WaitTimeout time.Duration `default:"10s" desc:"等待流超时时间"` // 等待流超时
WriteBufferSize int `desc:"写缓冲大小"` // 写缓冲大小
Key string `desc:"订阅鉴权key"` // 订阅鉴权key
SecretArgName string `default:"secret" desc:"订阅鉴权参数名"` // 订阅鉴权参数名
ExpireArgName string `default:"expire" desc:"订阅鉴权失效时间参数名"` // 订阅鉴权失效时间参数名
Internal bool `default:"false" desc:"是否内部订阅"` // 是否内部订阅
}
func (c *Subscribe) GetSubscribeConfig() *Subscribe {
return c
}
type Pull struct {
RePull int `desc:"断开后自动重试次数,0:不重试,-1:无限重试"` // 断开后自动重拉,0 表示不自动重拉,-1 表示无限重拉高于0 的数代表最大重拉次数
EnableRegexp bool `desc:"是否启用正则表达式"` // 是否启用正则表达式
PullOnStart map[string]string `desc:"启动时拉流的列表"` // 启动时拉流的列表
PullOnSub map[string]string `desc:"订阅时自动拉流的列表"` // 订阅时自动拉流的列表
Proxy string `desc:"代理地址"` // 代理地址
PullOnSubLocker sync.RWMutex `yaml:"-" json:"-"`
PullOnStartLocker sync.RWMutex `yaml:"-" json:"-"`
}
func (p *Pull) GetPullConfig() *Pull {
return p
}
func (p *Pull) CheckPullOnStart(streamPath string) string {
p.PullOnStartLocker.RLock()
defer p.PullOnStartLocker.RUnlock()
if p.PullOnStart == nil {
return ""
}
url, ok := p.PullOnStart[streamPath]
if !ok && p.EnableRegexp {
for k, url := range p.PullOnStart {
if r, err := regexp.Compile(k); err != nil {
if group := r.FindStringSubmatch(streamPath); group != nil {
for i, value := range group {
url = strings.Replace(url, fmt.Sprintf("$%d", i), value, -1)
}
return url
}
}
return ""
}
}
return url
}
func (p *Pull) CheckPullOnSub(streamPath string) string {
p.PullOnSubLocker.RLock()
defer p.PullOnSubLocker.RUnlock()
if p.PullOnSub == nil {
return ""
}
url, ok := p.PullOnSub[streamPath]
if !ok && p.EnableRegexp {
for k, url := range p.PullOnSub {
if r, err := regexp.Compile(k); err == nil {
if group := r.FindStringSubmatch(streamPath); group != nil {
for i, value := range group {
url = strings.Replace(url, fmt.Sprintf("$%d", i), value, -1)
}
return url
}
}
return ""
}
}
return url
}
type Push struct {
EnableRegexp bool `desc:"是否启用正则表达式"` // 是否启用正则表达式
RePush int `desc:"断开后自动重试次数,0:不重试,-1:无限重试"` // 断开后自动重推,0 表示不自动重推,-1 表示无限重推高于0 的数代表最大重推次数
PushList map[string]string `desc:"自动推流列表"` // 自动推流列表
Proxy string `desc:"代理地址"` // 代理地址
}
func (p *Push) GetPushConfig() *Push {
return p
}
func (p *Push) AddPush(url string, streamPath string) {
if p.PushList == nil {
p.PushList = make(map[string]string)
}
p.PushList[streamPath] = url
}
func (p *Push) CheckPush(streamPath string) string {
url, ok := p.PushList[streamPath]
if !ok && p.EnableRegexp {
for k, url := range p.PushList {
if r, err := regexp.Compile(k); err == nil {
if group := r.FindStringSubmatch(streamPath); group != nil {
for i, value := range group {
url = strings.Replace(url, fmt.Sprintf("$%d", i), value, -1)
}
return url
}
}
return ""
}
}
return url
}
type Console struct {
Server string `default:"console.monibuca.com:44944" desc:"远程控制台地址"` //远程控制台地址
Secret string `desc:"远程控制台密钥"` //远程控制台密钥
PublicAddr string `desc:"远程控制台公网地址"` //公网地址,提供远程控制台访问的地址,不配置的话使用自动识别的地址
PublicAddrTLS string `desc:"远程控制台公网TLS地址"`
}
type Engine struct {
Publish
Subscribe
HTTP
Console
EnableAVCC bool `default:"true" desc:"启用AVCC格式rtmp、http-flv协议使用"` //启用AVCC格式rtmp、http-flv协议使用
EnableRTP bool `default:"true" desc:"启用RTP格式rtsp、webrtc等协议使用"` //启用RTP格式rtsp、webrtc等协议使用
EnableSubEvent bool `default:"true" desc:"启用订阅事件,禁用可以提高性能"` //启用订阅事件,禁用可以提高性能
EnableAuth bool `default:"true" desc:"启用鉴权"` //启用鉴权
LogLang string `default:"zh" desc:"日志语言" enum:"zh:中文,en:英文"` //日志语言
LogLevel string `default:"info" enum:"trace:跟踪,debug:调试,info:信息,warn:警告,error:错误"` //日志级别
EventBusSize int `default:"10" desc:"事件总线大小"` //事件总线大小
PulseInterval time.Duration `default:"5s" desc:"心跳事件间隔"` //心跳事件间隔
DisableAll bool `default:"false" desc:"禁用所有插件"` //禁用所有插件
RTPReorderBufferLen int `default:"50" desc:"RTP重排序缓冲区长度"` //RTP重排序缓冲区长度
PoolSize int `desc:"内存池大小"` //内存池大小
}
var Global *Engine
func (cfg *Engine) InitDefaultHttp() {
Global = cfg
cfg.HTTP.mux = http.NewServeMux()
cfg.HTTP.ListenAddrTLS = ":8443"
cfg.HTTP.ListenAddr = ":8080"
}

View File

@@ -8,14 +8,13 @@ import (
"runtime"
"strings"
"m7s.live/monibuca/v5/pkg/config"
"m7s.live/m7s/v5/pkg/config"
)
type PluginMeta struct {
Name string
Version string //插件版本
Disabled bool
Type reflect.Type
Name string
Version string //插件版本
Type reflect.Type
}
type IPlugin interface {
@@ -46,10 +45,18 @@ func InstallPlugin[C IPlugin](options ...any) error {
}
type Plugin struct {
Disabled bool
Meta *PluginMeta
context.Context `json:"-" yaml:"-"`
context.CancelCauseFunc `json:"-" yaml:"-"`
Config struct {
config.Publish
config.Subscribe
config.HTTP
config.Quic
config.TCP
config.Pull
config.Push
}
Publishers []*Publisher
*slog.Logger
@@ -61,7 +68,7 @@ func (p *Plugin) PostMessage(message any) {
p.server.EventBus <- message
}
func (p *Plugin) Publish(streamPath string) *Publisher {
publisher := &Publisher{Plugin: p}
return publisher
func (p *Plugin) Publish(streamPath string) (publisher *Publisher, err error) {
publisher = &Publisher{Plugin: p, Config: p.Config.Publish, Logger: p.With("streamPath", streamPath)}
return
}

View File

@@ -1,8 +1,8 @@
package demo
import (
m7s "m7s.live/monibuca/v5"
. "m7s.live/monibuca/v5/pkg"
"m7s.live/m7s/v5"
. "m7s.live/m7s/v5/pkg"
)
type DemoPlugin struct {
@@ -10,7 +10,10 @@ type DemoPlugin struct {
}
func (p *DemoPlugin) OnInit() {
puber := p.Publish("live/demo")
puber, err := p.Publish("live/demo")
if err != nil {
panic(err)
}
puber.WriteVideo(&H264Nalu{})
}

1
plugin/rtmp/index.go Normal file
View File

@@ -0,0 +1 @@
package rtmp

View File

@@ -1,9 +1,16 @@
package m7s
import . "m7s.live/monibuca/v5/pkg"
import (
"log/slog"
. "m7s.live/m7s/v5/pkg"
"m7s.live/m7s/v5/pkg/config"
)
type Publisher struct {
Config config.Publish
Plugin *Plugin
Logger *slog.Logger
}
func (p *Publisher) WriteVideo(data IVideoData) {

View File

@@ -5,10 +5,9 @@ import (
"errors"
"log/slog"
"reflect"
"time"
"unsafe"
. "m7s.live/monibuca/v5/pkg"
. "m7s.live/m7s/v5/pkg"
)
type Server struct {
@@ -26,7 +25,6 @@ func NewServer() *Server {
}
func Run(ctx context.Context) {
ctx, _ = context.WithDeadline(ctx, time.Now().Add(time.Second*10))
DefaultServer.Run(ctx)
}
@@ -41,6 +39,9 @@ func (s *Server) Run(ctx context.Context) {
s.Warn("Server is done", "reason", context.Cause(s))
case event := <-s.EventBus:
for _, plugin := range s.Plugins {
if plugin.Disabled {
continue
}
plugin.handler.OnEvent(event)
}
}
@@ -55,10 +56,9 @@ func (s *Server) initPlugins() {
instance := reflect.New(plugin.Type).Interface().(IPlugin)
p := reflect.ValueOf(instance).Elem().FieldByName("Plugin").Addr().Interface().(*Plugin)
p.handler = instance
if plugin.Disabled {
continue
}
p.Meta = &plugin
p.server = s
p.Logger = s.Logger.With("plugin", plugin.Name)
s.Plugins = append(s.Plugins, p)
instance.OnInit()
}

View File

@@ -1 +1 @@
dot_import_whitelist = ["m7s.live/engine/v5/pkg"]
dot_import_whitelist = ["m7s.live/m7s/v5/pkg"]