diff --git a/codec/codec.go b/codec/codec.go index 63fb235..5e9072d 100644 --- a/codec/codec.go +++ b/codec/codec.go @@ -2,15 +2,16 @@ package codec import ( "errors" + ) const ( ADTS_HEADER_SIZE = 7 CodecID_AAC = 0xA CodecID_PCMA = 7 - CodecID_PCMU = 8 - CodecID_H264 = 7 - CodecID_H265 = 0xC + CodecID_PCMU = 8 + CodecID_H264 = 7 + CodecID_H265 = 0xC ) // ISO/IEC 14496-3 38(52)/page @@ -134,14 +135,7 @@ type ADTSVariableHeader struct { // NumberOfRawDataBlockInFrame, 表示ADTS帧中有number_of_raw_data_blocks_in_frame + 1个AAC原始帧 // 所以说number_of_raw_data_blocks_in_frame == 0 表示说ADTS帧中有一个AAC数据块并不是说没有。(一个AAC原始帧包含一段时间内1024个采样及相关数据) -func ADTSToAudioSpecificConfig(data []byte) []byte { - profile := ((data[2] & 0xc0) >> 6) + 1 - sampleRate := (data[2] & 0x3c) >> 2 - channel := ((data[2] & 0x1) << 2) | ((data[3] & 0xc0) >> 6) - config1 := (profile << 3) | ((sampleRate & 0xe) >> 1) - config2 := ((sampleRate & 0x1) << 7) | (channel << 3) - return []byte{0xAF, 0x00, config1, config2} -} + func AudioSpecificConfigToADTS(asc AudioSpecificConfig, rawDataLength int) (adts ADTS, adtsByte []byte, err error) { if asc.ChannelConfiguration > 8 || asc.FrameLengthFlag > 13 { err = errors.New("Reserved field.") @@ -218,3 +212,5 @@ func ParseRTPAAC(payload []byte) (result [][]byte) { } return } + + diff --git a/codec/flv.go b/codec/flv.go index 995490e..6bd0f30 100644 --- a/codec/flv.go +++ b/codec/flv.go @@ -108,3 +108,14 @@ func ReadFLVTag(r io.Reader) (t byte, timestamp uint32, payload []byte, err erro } return } + +func VideoAVCC2FLV(avcc net.Buffers, ts uint32) (flv net.Buffers) { + b := util.Buffer(make([]byte, 0, 15)) + b.WriteByte(FLV_TAG_TYPE_VIDEO) + dataSize := util.SizeOfBuffers(avcc) + b.WriteUint24(uint32(dataSize)) + b.WriteUint24(ts) + b.WriteByte(byte(ts >> 24)) + b.WriteUint24(0) + return append(append(append(flv, b), avcc...), util.PutBE(b.Malloc(4), dataSize+11)) +} diff --git a/common/frame.go b/common/frame.go index 3870e4d..825613e 100644 --- a/common/frame.go +++ b/common/frame.go @@ -4,11 +4,12 @@ import ( "net" "time" - "github.com/Monibuca/engine/v4/util" + "github.com/Monibuca/engine/v4/codec" "github.com/pion/rtp" ) type NALUSlice net.Buffers + // type H264Slice NALUSlice // type H265Slice NALUSlice @@ -82,14 +83,8 @@ func (av *AVFrame[T]) AppendRaw(raw ...T) { av.Raw = append(av.Raw, raw...) } func (av *AVFrame[T]) FillFLV(t byte, ts uint32) { - b := util.Buffer(make([]byte, 0, 15)) - b.WriteByte(t) - dataSize := util.SizeOfBuffers(av.AVCC) - b.WriteUint24(uint32(dataSize)) - b.WriteUint24(ts) - b.WriteByte(byte(ts >> 24)) - b.WriteUint24(0) - av.FLV = append(append(append(av.FLV, b), av.AVCC...), util.PutBE(b.Malloc(4), dataSize+11)) + av.FLV = codec.VideoAVCC2FLV(av.AVCC, ts) + av.FLV[0][0] = t } func (av *AVFrame[T]) AppendAVCC(avcc ...[]byte) { av.AVCC = append(av.AVCC, avcc...) @@ -151,3 +146,8 @@ func (avcc AVCCFrame) AudioCodecID() byte { // } // return // } +type DecoderConfiguration[T RawSlice] struct { + AVCC T + Raw T + FLV net.Buffers +} diff --git a/config.go b/config.go deleted file mode 100644 index 221026f..0000000 --- a/config.go +++ /dev/null @@ -1,207 +0,0 @@ -package engine - -import ( - "context" - "log" - "net" - "net/http" - "reflect" - "runtime" - "strings" - "time" - - "golang.org/x/sync/errgroup" -) - -type Second int - -func (s Second) Duration() time.Duration { - return time.Duration(s) * time.Second -} - -type PluginConfig interface { - Update(Config) -} - -type TCPPluginConfig interface { - PluginConfig - context.Context - ServeTCP(*net.TCPConn) -} - -type HTTPPluginConfig interface { - PluginConfig - context.Context - http.Handler -} - -type Config map[string]any - -func (config Config) Unmarshal(s any) { - var el reflect.Value - if v, ok := s.(reflect.Value); ok { - el = v - } else { - el = reflect.ValueOf(s).Elem() - } - t := el.Type() - for k, v := range config { - var fv reflect.Value - value := reflect.ValueOf(v) - if f, ok := t.FieldByName(strings.ToUpper(k[:1]) + k[1:]); ok { - // 兼容首字母大写的属性 - fv = el.FieldByName(f.Name) - } else if f, ok := t.FieldByName(strings.ToUpper(k)); ok { - // 兼容全部大写的属性 - fv = el.FieldByName(f.Name) - } else { - continue - } - if t.Kind() == reflect.Slice { - l := value.Len() - s := reflect.MakeSlice(t.Elem(), l, value.Cap()) - for i := 0; i < l; i++ { - fv := value.Field(i) - if fv.Type() == reflect.TypeOf(config) { - fv.FieldByName("Unmarshal").Call([]reflect.Value{s.Field(i)}) - } else { - s.Field(i).Set(fv) - } - } - fv.Set(s) - } else if child, ok := v.(Config); ok { - child.Unmarshal(fv) - } else { - fv.Set(value) - } - } -} - -func (config Config) Assign(source Config) { - for k, v := range source { - m, isMap := v.(map[string]any) - if _, ok := config[k]; !ok || !isMap { - config[k] = v - } else { - Config(config[k].(map[string]any)).Assign(m) - } - } -} - -func (config Config) Has(key string) (ok bool) { - if config == nil { - return - } - _, ok = config[key] - return -} - -type TCPConfig struct { - ListenAddr string - ListenNum int //同时并行监听数量,0为CPU核心数量 -} - -func (tcp *TCPConfig) listen(l net.Listener, handler func(*net.TCPConn)) { - var tempDelay time.Duration - for { - conn, err := l.Accept() - if err != nil { - if ne, ok := err.(net.Error); ok && ne.Temporary() { - if tempDelay == 0 { - tempDelay = 5 * time.Millisecond - } else { - tempDelay *= 2 - } - if max := 1 * time.Second; tempDelay > max { - tempDelay = max - } - log.Printf("%s: Accept error: %v; retrying in %v", tcp.ListenAddr, err, tempDelay) - time.Sleep(tempDelay) - continue - } - return - } - conn.(*net.TCPConn).SetNoDelay(false) - tempDelay = 0 - go handler(conn.(*net.TCPConn)) - } -} -func (tcp *TCPConfig) Listen(plugin TCPPluginConfig) error { - l, err := net.Listen("tcp", tcp.ListenAddr) - if err != nil { - return err - } - count := tcp.ListenNum - if count == 0 { - count = runtime.NumCPU() - } - for i := 0; i < count; i++ { - go tcp.listen(l, plugin.ServeTCP) - } - <-plugin.Done() - return l.Close() -} - -type HTTPConfig struct { - ListenAddr string - ListenAddrTLS string - CertFile string - KeyFile string - CORS bool //是否自动添加CORS头 -} - -// ListenAddrs Listen http and https -func (config *HTTPConfig) Listen(plugin HTTPPluginConfig) error { - var g errgroup.Group - if config.ListenAddrTLS != "" { - g.Go(func() error { - return http.ListenAndServeTLS(config.ListenAddrTLS, config.CertFile, config.KeyFile, plugin) - }) - } - if config.ListenAddr != "" { - g.Go(func() error { return http.ListenAndServe(config.ListenAddr, plugin) }) - } - g.Go(func() error { - <-plugin.Done() - return plugin.Err() - }) - return g.Wait() -} - -type PublishConfig struct { - PubAudio bool - PubVideo bool - KillExit bool // 是否踢掉已经存在的发布者 - PublishTimeout Second // 发布无数据超时 - WaitCloseTimeout Second // 延迟自动关闭(无订阅时) -} - -type SubscribeConfig struct { - SubAudio bool - SubVideo bool - IFrameOnly bool // 只要关键帧 - WaitTimeout Second // 等待流超时 -} - -type PullConfig struct { - AutoReconnect bool // 自动重连 - PullOnStart bool // 启动时拉流 - PullOnSubscribe bool // 订阅时自动拉流 - AutoPullList map[string]string // 自动拉流列表 -} - -type PushConfig struct { - AutoPushList map[string]string // 自动推流列表 -} - -type EngineConfig struct { - *http.ServeMux - context.Context - Publish PublishConfig - Subscribe SubscribeConfig - HTTP HTTPConfig - RTPReorder bool - EnableAVCC bool //启用AVCC格式,rtmp协议使用 - EnableRTP bool //启用RTP格式,rtsp、gb18181等协议使用 - EnableFLV bool //开启FLV格式,hdl协议使用 -} diff --git a/config/config.go b/config/config.go new file mode 100644 index 0000000..61a41f6 --- /dev/null +++ b/config/config.go @@ -0,0 +1,151 @@ +package config + +import ( + "net" + "net/http" + "reflect" + "strings" + "time" +) + +type Config map[string]any + +type Second int + +func (s Second) Duration() time.Duration { + return time.Duration(s) * time.Second +} + +type Plugin interface { + Update(Config) +} + +type TCPPlugin interface { + Plugin + ServeTCP(*net.TCPConn) +} + +type HTTPPlugin interface { + Plugin + http.Handler +} + +func (config Config) Unmarshal(s any) { + if s == nil { + return + } + var el reflect.Value + if v, ok := s.(reflect.Value); ok { + el = v + } else { + el = reflect.ValueOf(s) + } + if el.Kind() == reflect.Pointer { + el = el.Elem() + } + t := el.Type() + + //字段映射,小写对应的大写 + nameMap := make(map[string]string) + for i, j := 0, t.NumField(); i < j; i++ { + name := t.Field(i).Name + nameMap[strings.ToLower(name)] = name + } + for k, v := range config { + value := reflect.ValueOf(v) + // 需要被写入的字段 + fv := el.FieldByName(nameMap[k]) + if t.Kind() == reflect.Slice { + l := value.Len() + s := reflect.MakeSlice(t.Elem(), l, value.Cap()) + for i := 0; i < l; i++ { + fv := value.Field(i) + if fv.Type() == reflect.TypeOf(config) { + fv.FieldByName("Unmarshal").Call([]reflect.Value{s.Field(i)}) + } else { + s.Field(i).Set(fv) + } + } + fv.Set(s) + } else if child, ok := v.(Config); ok { + child.Unmarshal(fv) + } else { + fv.Set(value) + } + } +} + +// 覆盖配置 +func (config Config) Assign(source Config) { + for k, v := range source { + switch m := config[k].(type) { + case Config: + m.Assign(v.(Config)) + default: + config[k] = v + } + } +} + +// 合并配置,不覆盖 +func (config Config) Merge(source Config) { + for k, v := range source { + if _, ok := config[k]; !ok { + switch m := config[k].(type) { + case Config: + m.Merge(v.(Config)) + default: + config[k] = v + } + } + } +} + +func (config Config) Set(key string, value any) { + config[strings.ToLower(key)] = value +} + +func (config Config) Has(key string) (ok bool) { + _, ok = config[strings.ToLower(key)] + return +} + +func (config Config) HasChild(key string) (ok bool) { + _, ok = config[strings.ToLower(key)].(Config) + return ok +} + +func (config Config) GetChild(key string) Config { + return config[strings.ToLower(key)].(Config) +} + +func Struct2Config(s any) (config Config) { + var t reflect.Type + var v reflect.Value + if vv, ok := s.(reflect.Value); ok { + v = vv + t = vv.Type() + } else { + t = reflect.TypeOf(s) + v = reflect.ValueOf(s) + if t.Kind() == reflect.Pointer { + v = v.Elem() + t = t.Elem() + } + } + for i, j := 0, t.NumField(); i < j; i++ { + ft := t.Field(i) + switch ft.Type.Kind() { + case reflect.Struct: + config[ft.Name] = Struct2Config(v.Field(i)) + case reflect.Slice: + fallthrough + default: + if config == nil { + config = make(Config) + } + reflect.ValueOf(config).SetMapIndex(reflect.ValueOf(strings.ToLower(ft.Name)), v.Field(i)) + } + } + return +} diff --git a/config/http.go b/config/http.go new file mode 100644 index 0000000..9eac7b4 --- /dev/null +++ b/config/http.go @@ -0,0 +1,34 @@ +package config + +import ( + "context" + "net/http" + + "golang.org/x/sync/errgroup" +) + +type HTTP struct { + ListenAddr string + ListenAddrTLS string + CertFile string + KeyFile string + CORS bool //是否自动添加CORS头 +} + +// ListenAddrs Listen http and https +func (config *HTTP) Listen(ctx context.Context, plugin HTTPPlugin) error { + var g errgroup.Group + if config.ListenAddrTLS != "" { + g.Go(func() error { + return http.ListenAndServeTLS(config.ListenAddrTLS, config.CertFile, config.KeyFile, plugin) + }) + } + if config.ListenAddr != "" { + g.Go(func() error { return http.ListenAndServe(config.ListenAddr, plugin) }) + } + g.Go(func() error { + <-ctx.Done() + return ctx.Err() + }) + return g.Wait() +} diff --git a/config/tcp.go b/config/tcp.go new file mode 100644 index 0000000..bc14c83 --- /dev/null +++ b/config/tcp.go @@ -0,0 +1,55 @@ +package config + +import ( + "context" + "log" + "net" + "runtime" + "time" +) + +type TCP struct { + ListenAddr string + ListenNum int //同时并行监听数量,0为CPU核心数量 +} + +func (tcp *TCP) listen(l net.Listener, handler func(*net.TCPConn)) { + var tempDelay time.Duration + for { + conn, err := l.Accept() + if err != nil { + if ne, ok := err.(net.Error); ok && ne.Temporary() { + if tempDelay == 0 { + tempDelay = 5 * time.Millisecond + } else { + tempDelay *= 2 + } + if max := 1 * time.Second; tempDelay > max { + tempDelay = max + } + log.Printf("%s: Accept error: %v; retrying in %v", tcp.ListenAddr, err, tempDelay) + time.Sleep(tempDelay) + continue + } + return + } + conn.(*net.TCPConn).SetNoDelay(false) + tempDelay = 0 + go handler(conn.(*net.TCPConn)) + } +} +func (tcp *TCP) Listen(ctx context.Context, plugin TCPPlugin) error { + l, err := net.Listen("tcp", tcp.ListenAddr) + if err != nil { + return err + } + count := tcp.ListenNum + if count == 0 { + count = runtime.NumCPU() + } + for i := 0; i < count; i++ { + go tcp.listen(l, plugin.ServeTCP) + } + <-ctx.Done() + return l.Close() +} diff --git a/config/types.go b/config/types.go new file mode 100644 index 0000000..d482b79 --- /dev/null +++ b/config/types.go @@ -0,0 +1,48 @@ +package config + +type Publish struct { + PubAudio bool + PubVideo bool + KillExit bool // 是否踢掉已经存在的发布者 + PublishTimeout Second // 发布无数据超时 + WaitCloseTimeout Second // 延迟自动关闭(无订阅时) +} + +type Subscribe struct { + SubAudio bool + SubVideo bool + IFrameOnly bool // 只要关键帧 + WaitTimeout Second // 等待流超时 +} + +type Pull struct { + AutoReconnect bool // 自动重连 + PullOnStart bool // 启动时拉流 + PullOnSubscribe bool // 订阅时自动拉流 + AutoPullList map[string]string // 自动拉流列表 +} + +type Push struct { + AutoPushList map[string]string // 自动推流列表 +} + +type Engine struct { + Publish + Subscribe + HTTP + RTPReorder bool + EnableAVCC bool //启用AVCC格式,rtmp协议使用 + EnableRTP bool //启用RTP格式,rtsp、gb18181等协议使用 + EnableFLV bool //开启FLV格式,hdl协议使用 +} + +func (g *Engine) Update(override Config) { + override.Unmarshal(g) +} + +var Global = &Engine{ + Publish{true, true, false, 10, 10}, + Subscribe{true, true, false, 10}, + HTTP{ListenAddr: ":8080", CORS: true}, + false, true, true, true, +} diff --git a/http.go b/http.go index fb7d16d..297ec70 100644 --- a/http.go +++ b/http.go @@ -4,31 +4,31 @@ import ( "encoding/json" "net/http" + "github.com/Monibuca/engine/v4/config" "github.com/Monibuca/engine/v4/util" . "github.com/logrusorgru/aurora" ) -func (config *EngineConfig) Update(override Config) { - override.Unmarshal(config) - if config.Context == nil { - config.Context = Ctx - handleFunc("/sysInfo", sysInfo) - handleFunc("/closeStream", closeStream) - util.Print(Green("api server start at "), BrightBlue(config.HTTP.ListenAddr), BrightBlue(config.HTTP.ListenAddrTLS)) - config.HTTP.Listen(config) - } +type GlobalConfig struct { + *http.ServeMux + *config.Engine } -func handleFunc(pattern string, handler func(http.ResponseWriter, *http.Request)) { - config.HandleFunc("/api"+pattern, func(rw http.ResponseWriter, r *http.Request) { - if config.HTTP.CORS { - util.CORS(rw, r) - } - handler(rw, r) - }) +func (cfg *GlobalConfig) Update(override config.Config) { + cfg.Engine.Update(override) + Engine.RawConfig = config.Struct2Config(cfg.Engine) + util.Print(Green("api server start at "), BrightBlue(cfg.ListenAddr), BrightBlue(cfg.ListenAddrTLS)) + cfg.Listen(Engine, cfg) } -func closeStream(w http.ResponseWriter, r *http.Request) { +func (config *GlobalConfig) API_sysInfo(rw http.ResponseWriter, r *http.Request) { + json.NewEncoder(rw).Encode(&struct { + Version string + StartTime string + }{Engine.Version, StartTime.Format("2006-01-02 15:04:05")}) +} + +func (config *GlobalConfig) API_closeStream(w http.ResponseWriter, r *http.Request) { if streamPath := r.URL.Query().Get("stream"); streamPath != "" { if s := Streams.Get(streamPath); s != nil { s.Close() @@ -40,9 +40,3 @@ func closeStream(w http.ResponseWriter, r *http.Request) { w.Write([]byte("no query stream")) } } -func sysInfo(w http.ResponseWriter, r *http.Request) { - json.NewEncoder(w).Encode(&struct { - Version string - StartTime string - }{Version, StartTime.Format("2006-01-02 15:04:05")}) -} diff --git a/main.go b/main.go index 0906ff3..5ec4153 100644 --- a/main.go +++ b/main.go @@ -13,6 +13,7 @@ import ( "strings" "time" // colorable + "github.com/Monibuca/engine/v4/config" "github.com/Monibuca/engine/v4/util" "github.com/google/uuid" @@ -20,37 +21,25 @@ import ( "gopkg.in/yaml.v3" ) -var Version = "4.0.0" - var ( - DefaultPublishConfig = PublishConfig{ - true, true, false, 10, 10, - } - DefaultSubscribeConfig = SubscribeConfig{ - true, true, false, 10, - } - config = &EngineConfig{ - http.NewServeMux(), - Ctx, - DefaultPublishConfig, - DefaultSubscribeConfig, - HTTPConfig{ListenAddr: ":8080", CORS: true}, - false, true, true, true, - } // ConfigRaw 配置信息的原始数据 - ConfigRaw []byte - StartTime time.Time //启动时间 - Plugins = make(map[string]*Plugin) // Plugins 所有的插件配置 - Ctx context.Context - settingDir string + ConfigRaw []byte + StartTime time.Time //启动时间 + Plugins = make(map[string]*Plugin) // Plugins 所有的插件配置 + settingDir string + EngineConfig = &GlobalConfig{ + Engine: config.Global, + ServeMux: http.NewServeMux(), + } + Engine = InstallPlugin(EngineConfig) ) -func InstallPlugin(config PluginConfig) *Plugin { - name := strings.TrimSuffix(reflect.TypeOf(config).Elem().Name(), "Config") +func InstallPlugin[T config.Plugin](config T) *Plugin { + t := reflect.TypeOf(config).Elem() + name := strings.TrimSuffix(t.Name(), "Config") plugin := &Plugin{ - Name: name, - Config: config, - Modified: make(Config), + Name: name, + Config: config, } _, pluginFilePath, _, _ := runtime.Caller(1) configDir := filepath.Dir(pluginFilePath) @@ -65,24 +54,20 @@ func InstallPlugin(config PluginConfig) *Plugin { return plugin } -// Plugin 插件配置定义 +// Plugin 插件信息 type Plugin struct { - Name string //插件名称 - Config PluginConfig //插件配置 - Version string //插件版本 - RawConfig Config //配置的map形式方便查询 - Modified Config //修改过的配置项 -} - -func init() { - if parts := strings.Split(util.CurrentDir(), "@"); len(parts) > 1 { - Version = parts[len(parts)-1] - } + context.Context `json:"-"` + context.CancelFunc `json:"-"` + Name string //插件名称 + Config config.Plugin //插件配置 + Version string //插件版本 + RawConfig config.Config //配置的map形式方便查询 + Modified config.Config //修改过的配置项 } // Run 启动Monibuca引擎 func Run(ctx context.Context, configFile string) (err error) { - Ctx = ctx + Engine.Context = ctx if err := util.CreateShutdownScript(); err != nil { log.Print(Red("create shutdown script error:"), err) } @@ -95,29 +80,24 @@ func Run(ctx context.Context, configFile string) (err error) { log.Print(Red("create dir .m7s error:"), err) return } - util.Print(BgGreen(Black("Ⓜ starting m7s ")), BrightBlue(Version)) - var cg Config - var engineCg Config + util.Print(BgGreen(White("Ⓜ starting m7s "))) + var cg config.Config if ConfigRaw != nil { if err = yaml.Unmarshal(ConfigRaw, &cg); err == nil { - if cfg, ok := cg["engine"]; ok { - engineCg = cfg.(Config) - } + Engine.RawConfig = cg.GetChild("global") } } - go config.Update(engineCg) + Engine.registerHandler() + go EngineConfig.Update(Engine.RawConfig) for name, config := range Plugins { - if v, ok := cg[strings.ToLower(name)]; ok { - config.RawConfig = v.(Config) - } - config.merge() + config.RawConfig = cg.GetChild(name) + config.assign() } - UUID := uuid.NewString() reportTimer := time.NewTimer(time.Minute) req, _ := http.NewRequestWithContext(ctx, http.MethodGet, "http://monibuca.com:2022/report/engine", nil) req.Header.Set("os", runtime.GOOS) - req.Header.Set("version", Version) + req.Header.Set("version", Engine.Version) req.Header.Set("uuid", UUID) var c http.Client for { @@ -132,13 +112,19 @@ func Run(ctx context.Context, configFile string) (err error) { } func (opt *Plugin) HandleFunc(pattern string, handler func(http.ResponseWriter, *http.Request)) { + if opt == nil { + return + } var cors bool if v, ok := opt.RawConfig["cors"]; ok { cors = v.(bool) - } else if config.HTTP.CORS { + } else if EngineConfig.CORS { cors = true } - config.HandleFunc("/"+strings.ToLower(opt.Name)+pattern, func(rw http.ResponseWriter, r *http.Request) { + if opt != Engine { + pattern = "/" + strings.ToLower(opt.Name) + pattern + } + Engine.HandleFunc(pattern, func(rw http.ResponseWriter, r *http.Request) { if cors { util.CORS(rw, r) } @@ -147,11 +133,16 @@ func (opt *Plugin) HandleFunc(pattern string, handler func(http.ResponseWriter, } func (opt *Plugin) HandleApi(pattern string, handler func(http.ResponseWriter, *http.Request)) { - opt.HandleFunc("/api"+pattern, handler) + if opt == nil { + return + } + pattern = "/api" + pattern + util.Println("http handle added:", pattern) + opt.HandleFunc(pattern, handler) } // 读取独立配置合并入总配置中 -func (opt *Plugin) merge() { +func (opt *Plugin) assign() { f, err := os.Open(opt.settingPath()) if err == nil { if err = yaml.NewDecoder(f).Decode(&opt.Modified); err == nil { @@ -162,8 +153,45 @@ func (opt *Plugin) merge() { } } } + t := reflect.TypeOf(opt.Config).Elem() + // 用全局配置覆盖没有设置的配置 + for i, j := 0, t.NumField(); i < j; i++ { + fname := t.Field(i).Name + if Engine.RawConfig.Has(fname) { + if !opt.RawConfig.Has(fname) { + opt.RawConfig.Set(fname, Engine.RawConfig[fname]) + } else if opt.RawConfig.HasChild(fname) { + opt.RawConfig.GetChild(fname).Merge(Engine.RawConfig.GetChild(fname)) + } + } + } + opt.registerHandler() + opt.Update() +} + +func (opt *Plugin) Update() { + if opt.CancelFunc != nil { + opt.CancelFunc() + } + opt.Context, opt.CancelFunc = context.WithCancel(Engine) go opt.Config.Update(opt.RawConfig) } + +func (opt *Plugin) registerHandler() { + t := reflect.TypeOf(opt.Config).Elem() + v := reflect.ValueOf(opt.Config).Elem() + // 注册http响应 + for i, j := 0, t.NumMethod(); i < j; i++ { + mt := t.Method(i) + if strings.HasPrefix(mt.Name, "API") { + parts := strings.Split(mt.Name, "_") + parts[0] = "" + patten := reflect.ValueOf(strings.Join(parts, "/")) + reflect.ValueOf(opt.HandleApi).Call([]reflect.Value{patten, v.Method(i)}) + } + } +} + func (opt *Plugin) settingPath() string { return filepath.Join(settingDir, strings.ToLower(opt.Name)+".yaml") } diff --git a/publisher.go b/publisher.go index 79f4a4c..9bbf798 100644 --- a/publisher.go +++ b/publisher.go @@ -1,8 +1,11 @@ package engine import ( + "io" "net/url" "time" + + "github.com/Monibuca/engine/v4/config" ) type IPublisher interface { @@ -12,26 +15,28 @@ type IPublisher interface { type Publisher struct { Type string - PullURL *url.URL *Stream `json:"-"` - Config PublishConfig } -func (pub *Publisher) Publish(streamPath string, realPub IPublisher) bool { +func (pub *Publisher) Publish(streamPath string, realPub IPublisher, config config.Publish) bool { Streams.Lock() defer Streams.Unlock() s, created := findOrCreateStream(streamPath, time.Second) if s.IsClosed() { return false } - if s.Publisher != nil && pub.Config.KillExit { - s.Publisher.Close() + if s.Publisher != nil { + if config.KillExit { + s.Publisher.Close() + } else { + return false + } } pub.Stream = s s.Publisher = realPub if created { - s.PublishTimeout = pub.Config.PublishTimeout.Duration() - s.WaitCloseTimeout = pub.Config.WaitCloseTimeout.Duration() + s.PublishTimeout = config.PublishTimeout.Duration() + s.WaitCloseTimeout = config.WaitCloseTimeout.Duration() go s.run() } s.actionChan <- PublishAction{} @@ -41,3 +46,14 @@ func (pub *Publisher) Publish(streamPath string, realPub IPublisher) bool { func (pub *Publisher) OnStateChange(oldState StreamState, newState StreamState) bool { return true } + +// 用于远程拉流的发布者 +type Puller struct { + Publisher + RemoteURL *url.URL + io.ReadCloser +} + +func (puller *Puller) Close() { + puller.ReadCloser.Close() +} diff --git a/stream.go b/stream.go index f801792..dff679b 100644 --- a/stream.go +++ b/stream.go @@ -65,6 +65,17 @@ var StreamFSM = [STATE_DESTROYED + 1]map[StreamAction]StreamState{ // Streams 所有的流集合 var Streams = util.Map[string, *Stream]{Map: make(map[string]*Stream)} +func FilterStreams[T IPublisher]() (ss []*Stream) { + Streams.RLock() + defer Streams.RUnlock() + for _, s := range Streams.Map { + if _, ok := s.Publisher.(T); ok { + ss = append(ss, s) + } + } + return +} + type UnSubscibeAction *Subscriber type PublishAction struct{} type UnPublishAction struct{} @@ -125,7 +136,7 @@ func findOrCreateStream(streamPath string, waitTimeout time.Duration) (s *Stream s.actionChan = make(chan any, 1) s.StartTime = time.Now() s.timeout = time.NewTimer(waitTimeout) - s.Context, s.cancel = context.WithCancel(Ctx) + s.Context, s.cancel = context.WithCancel(Engine) s.Init(s) return s, true } diff --git a/subscriber.go b/subscriber.go index 23de78d..5c17d43 100644 --- a/subscriber.go +++ b/subscriber.go @@ -6,6 +6,7 @@ import ( "time" . "github.com/Monibuca/engine/v4/common" + "github.com/Monibuca/engine/v4/config" "github.com/Monibuca/engine/v4/track" ) @@ -16,7 +17,7 @@ type VideoFrame AVFrame[NALUSlice] type Subscriber struct { context.Context `json:"-"` cancel context.CancelFunc - Config SubscribeConfig + Config config.Subscribe Stream *Stream `json:"-"` ID string TotalDrop int //总丢帧 @@ -39,7 +40,7 @@ func (s *Subscriber) Close() { } //Subscribe 开始订阅 将Subscriber与Stream关联 -func (sub *Subscriber) Subscribe(streamPath string, config SubscribeConfig) bool { +func (sub *Subscriber) Subscribe(streamPath string, config config.Subscribe) bool { Streams.Lock() defer Streams.Unlock() s, created := findOrCreateStream(streamPath, config.WaitTimeout.Duration()) diff --git a/track/aac.go b/track/aac.go index c28e86b..13bdf63 100644 --- a/track/aac.go +++ b/track/aac.go @@ -1,6 +1,7 @@ package track import ( + "net" "time" "github.com/Monibuca/engine/v4/codec" @@ -21,8 +22,7 @@ type AAC Audio func (aac *AAC) WriteAVCC(ts uint32, frame AVCCFrame) { if frame.IsSequence() { - aac.DecoderConfiguration.Reset() - aac.DecoderConfiguration.AppendAVCC(frame) + aac.DecoderConfiguration.AVCC = AudioSlice(frame) config1, config2 := frame[2], frame[3] //audioObjectType = (config1 & 0xF8) >> 3 // 1 AAC MAIN ISO/IEC 14496-3 subpart 4 @@ -31,8 +31,8 @@ func (aac *AAC) WriteAVCC(ts uint32, frame AVCCFrame) { // 4 AAC LTP ISO/IEC 14496-3 subpart 4 aac.Channels = ((config2 >> 3) & 0x0F) //声道 aac.SampleRate = uint32(codec.SamplingFrequencies[((config1&0x7)<<1)|(config2>>7)]) - aac.DecoderConfiguration.AppendRaw(AudioSlice(frame[2:])) - aac.DecoderConfiguration.FillFLV(codec.FLV_TAG_TYPE_AUDIO, 0) + aac.DecoderConfiguration.Raw = AudioSlice(frame[2:]) + aac.DecoderConfiguration.FLV = net.Buffers{adcflv1, frame, adcflv2} } else { (*Audio)(aac).WriteAVCC(ts, frame) } diff --git a/track/audio.go b/track/audio.go index c3a255f..22e03b8 100644 --- a/track/audio.go +++ b/track/audio.go @@ -1,6 +1,7 @@ package track import ( + "net" "strings" "github.com/Monibuca/engine/v4/codec" @@ -8,6 +9,9 @@ import ( "github.com/Monibuca/engine/v4/util" ) +var adcflv1 = []byte{codec.FLV_TAG_TYPE_AUDIO, 0, 0, 4, 0, 0, 0, 0, 0, 0, 0} +var adcflv2 = []byte{0, 0, 0, 15} + type Audio struct { Media[AudioSlice] Channels byte @@ -36,12 +40,18 @@ func (at *Audio) Play(onAudio func(*AVFrame[AudioSlice]) error) { } } func (at *Audio) WriteADTS(adts []byte) { - at.SampleRate = uint32(codec.SamplingFrequencies[(adts[2]&0x3c)>>2]) - at.Channels = ((adts[2] & 0x1) << 2) | ((adts[3] & 0xc0) >> 6) - at.DecoderConfiguration.AppendAVCC(codec.ADTSToAudioSpecificConfig(adts)) - at.DecoderConfiguration.AppendRaw(at.DecoderConfiguration.AVCC[0][2:]) - at.DecoderConfiguration.FillFLV(codec.FLV_TAG_TYPE_AUDIO, 0) + profile := ((adts[2] & 0xc0) >> 6) + 1 + sampleRate := (adts[2] & 0x3c) >> 2 + channel := ((adts[2] & 0x1) << 2) | ((adts[3] & 0xc0) >> 6) + config1 := (profile << 3) | ((sampleRate & 0xe) >> 1) + config2 := ((sampleRate & 0x1) << 7) | (channel << 3) + at.SampleRate = uint32(codec.SamplingFrequencies[sampleRate]) + at.Channels = channel + at.DecoderConfiguration.AVCC = []byte{0xAF, 0x00, config1, config2} + at.DecoderConfiguration.Raw = at.DecoderConfiguration.AVCC[:2] + at.DecoderConfiguration.FLV = net.Buffers{adcflv1, at.DecoderConfiguration.AVCC, adcflv2} } + func (at *Audio) WriteAVCC(ts uint32, frame AVCCFrame) { at.Media.WriteAVCC(ts, frame) at.Flush() diff --git a/track/base.go b/track/base.go index 8c54574..b1fd60e 100644 --- a/track/base.go +++ b/track/base.go @@ -29,7 +29,7 @@ type Media[T RawSlice] struct { CodecID byte SampleRate uint32 SampleSize byte - DecoderConfiguration AVFrame[T] `json:"-"` //H264(SPS、PPS) H265(VPS、SPS、PPS) AAC(config) + DecoderConfiguration DecoderConfiguration[T] `json:"-"` //H264(SPS、PPS) H265(VPS、SPS、PPS) AAC(config) util.BytesPool //无锁内存池,用于发布者(在同一个协程中)复用小块的内存,通常是解包时需要临时使用 lastAvccTS uint32 //上一个avcc帧的时间戳 } diff --git a/track/h264.go b/track/h264.go index 77c8079..7e6593d 100644 --- a/track/h264.go +++ b/track/h264.go @@ -28,20 +28,29 @@ func (vt *H264) WriteAnnexB(pts uint32, dts uint32, frame AnnexBFrame) { func (vt *H264) WriteSlice(slice NALUSlice) { switch slice.H264Type() { case codec.NALU_SPS: - vt.DecoderConfiguration.Reset() - vt.DecoderConfiguration.AppendRaw(slice) + if len(vt.DecoderConfiguration.Raw) > 0 { + vt.DecoderConfiguration.Raw = vt.DecoderConfiguration.Raw[:0] + } + vt.DecoderConfiguration.Raw = append(vt.DecoderConfiguration.Raw, slice[0]) case codec.NALU_PPS: - vt.DecoderConfiguration.AppendRaw(slice) + vt.DecoderConfiguration.Raw = append(vt.DecoderConfiguration.Raw, slice[0]) vt.SPSInfo, _ = codec.ParseSPS(slice[0]) - lenSPS := util.SizeOfBuffers(net.Buffers(vt.DecoderConfiguration.Raw[0])) - lenPPS := util.SizeOfBuffers(net.Buffers(vt.DecoderConfiguration.Raw[1])) + if len(vt.DecoderConfiguration.Raw) > 0 { + vt.DecoderConfiguration.Raw = vt.DecoderConfiguration.Raw[:0] + } + lenSPS := len(vt.DecoderConfiguration.Raw[0]) + lenPPS := len(vt.DecoderConfiguration.Raw[1]) + if len(vt.DecoderConfiguration.AVCC) > 0 { + vt.DecoderConfiguration.AVCC = vt.DecoderConfiguration.AVCC[:0] + } if lenSPS > 3 { - vt.DecoderConfiguration.AppendAVCC(codec.RTMP_AVC_HEAD[:6], vt.DecoderConfiguration.Raw[0][0][1:4]) + vt.DecoderConfiguration.AVCC = append(vt.DecoderConfiguration.AVCC, codec.RTMP_AVC_HEAD[:6], vt.DecoderConfiguration.Raw[0][1:4]) } else { - vt.DecoderConfiguration.AppendAVCC(codec.RTMP_AVC_HEAD) + vt.DecoderConfiguration.AVCC = append(vt.DecoderConfiguration.AVCC, codec.RTMP_AVC_HEAD) } tmp := []byte{0xE1, 0, 0, 0x01, 0, 0} - vt.DecoderConfiguration.AppendAVCC(tmp[:1], util.PutBE(tmp[1:3], lenSPS), vt.DecoderConfiguration.Raw[0][0], tmp[3:4], util.PutBE(tmp[3:6], lenPPS), vt.DecoderConfiguration.Raw[1][0]) + vt.DecoderConfiguration.AVCC = append(vt.DecoderConfiguration.AVCC, tmp[:1], util.PutBE(tmp[1:3], lenSPS), vt.DecoderConfiguration.Raw[0], tmp[3:4], util.PutBE(tmp[3:6], lenPPS), vt.DecoderConfiguration.Raw[1]) + vt.DecoderConfiguration.FLV = codec.VideoAVCC2FLV(net.Buffers(vt.DecoderConfiguration.AVCC), 0) case codec.NALU_IDR_Picture: vt.Value.IFrame = true fallthrough @@ -53,16 +62,17 @@ func (vt *H264) WriteSlice(slice NALUSlice) { func (vt *H264) WriteAVCC(ts uint32, frame AVCCFrame) { if frame.IsSequence() { - vt.DecoderConfiguration.Reset() - vt.DecoderConfiguration.SeqInTrack = vt.Value.SeqInTrack - vt.DecoderConfiguration.AppendAVCC(frame) + if len(vt.DecoderConfiguration.AVCC) > 0 { + vt.DecoderConfiguration.AVCC = vt.DecoderConfiguration.AVCC[:0] + } + vt.DecoderConfiguration.AVCC = append(vt.DecoderConfiguration.AVCC, frame) var info codec.AVCDecoderConfigurationRecord if _, err := info.Unmarshal(frame[5:]); err == nil { vt.SPSInfo, _ = codec.ParseSPS(info.SequenceParameterSetNALUnit) vt.nalulenSize = int(info.LengthSizeMinusOne&3 + 1) - vt.DecoderConfiguration.AppendRaw(NALUSlice{info.SequenceParameterSetNALUnit}, NALUSlice{info.PictureParameterSetNALUnit}) + vt.DecoderConfiguration.Raw = NALUSlice{info.SequenceParameterSetNALUnit, info.PictureParameterSetNALUnit} } - vt.DecoderConfiguration.FillFLV(codec.FLV_TAG_TYPE_VIDEO, 0) + vt.DecoderConfiguration.FLV = codec.VideoAVCC2FLV(net.Buffers(vt.DecoderConfiguration.AVCC), 0) } else { (*Video)(vt).WriteAVCC(ts, frame) vt.Value.IFrame = frame.IsIDR() diff --git a/track/h265.go b/track/h265.go index 6c67d69..889d4f1 100644 --- a/track/h265.go +++ b/track/h265.go @@ -1,6 +1,7 @@ package track import ( + "net" "time" "github.com/Monibuca/engine/v4/codec" @@ -26,17 +27,23 @@ func (vt *H265) WriteAnnexB(pts uint32, dts uint32, frame AnnexBFrame) { func (vt *H265) WriteSlice(slice NALUSlice) { switch slice.H265Type() { case codec.NAL_UNIT_VPS: - vt.DecoderConfiguration.Reset() - vt.DecoderConfiguration.AppendRaw(slice) + if len(vt.DecoderConfiguration.Raw) > 0 { + vt.DecoderConfiguration.Raw = vt.DecoderConfiguration.Raw[:0] + } + vt.DecoderConfiguration.Raw = append(vt.DecoderConfiguration.Raw, slice[0]) case codec.NAL_UNIT_SPS: - vt.DecoderConfiguration.AppendRaw(slice) + vt.DecoderConfiguration.Raw = append(vt.DecoderConfiguration.Raw, slice[0]) vt.SPSInfo, _ = codec.ParseHevcSPS(slice[0]) case codec.NAL_UNIT_PPS: - vt.DecoderConfiguration.AppendRaw(slice) - extraData, err := codec.BuildH265SeqHeaderFromVpsSpsPps(vt.DecoderConfiguration.Raw[0][0], vt.DecoderConfiguration.Raw[1][0], vt.DecoderConfiguration.Raw[2][0]) + vt.DecoderConfiguration.Raw = append(vt.DecoderConfiguration.Raw, slice[0]) + extraData, err := codec.BuildH265SeqHeaderFromVpsSpsPps(vt.DecoderConfiguration.Raw[0], vt.DecoderConfiguration.Raw[1], vt.DecoderConfiguration.Raw[2]) if err == nil { - vt.DecoderConfiguration.AppendAVCC(extraData) + if len(vt.DecoderConfiguration.AVCC) > 0 { + vt.DecoderConfiguration.AVCC = vt.DecoderConfiguration.AVCC[:0] + } + vt.DecoderConfiguration.AVCC = append(vt.DecoderConfiguration.AVCC, extraData) } + vt.DecoderConfiguration.FLV = codec.VideoAVCC2FLV(net.Buffers(vt.DecoderConfiguration.AVCC), 0) case codec.NAL_UNIT_CODED_SLICE_BLA, codec.NAL_UNIT_CODED_SLICE_BLANT, @@ -52,15 +59,16 @@ func (vt *H265) WriteSlice(slice NALUSlice) { } func (vt *H265) WriteAVCC(ts uint32, frame AVCCFrame) { if frame.IsSequence() { - vt.DecoderConfiguration.Reset() - vt.DecoderConfiguration.SeqInTrack = vt.Value.SeqInTrack - vt.DecoderConfiguration.AppendAVCC(frame) + if len(vt.DecoderConfiguration.AVCC) > 0 { + vt.DecoderConfiguration.AVCC = vt.DecoderConfiguration.AVCC[:0] + } + vt.DecoderConfiguration.AVCC = append(vt.DecoderConfiguration.AVCC, frame) if vps, sps, pps, err := codec.ParseVpsSpsPpsFromSeqHeaderWithoutMalloc(frame); err == nil { vt.SPSInfo, _ = codec.ParseHevcSPS(frame) vt.nalulenSize = int(frame[26]) & 0x03 - vt.DecoderConfiguration.AppendRaw(NALUSlice{vps}, NALUSlice{sps}, NALUSlice{pps}) + vt.DecoderConfiguration.Raw = NALUSlice{vps, sps, pps} } - vt.DecoderConfiguration.FillFLV(codec.FLV_TAG_TYPE_VIDEO, 0) + vt.DecoderConfiguration.FLV = codec.VideoAVCC2FLV(net.Buffers(vt.DecoderConfiguration.AVCC), 0) } else { (*Video)(vt).WriteAVCC(ts, frame) vt.Value.IFrame = frame.IsIDR() diff --git a/util/buffer.go b/util/buffer.go index 9e3817f..6e66399 100644 --- a/util/buffer.go +++ b/util/buffer.go @@ -95,6 +95,7 @@ func SizeOfBuffers(buf net.Buffers) (size int) { func CutBuffers(buf net.Buffers, size int) { } + // SplitBuffers 按照一定大小分割 Buffers func SplitBuffers(buf net.Buffers, size int) (result []net.Buffers) { for total := SizeOfBuffers(buf); total > 0; { @@ -122,3 +123,5 @@ func SplitBuffers(buf net.Buffers, size int) (result []net.Buffers) { } return } + + diff --git a/util/slice.go b/util/slice.go index fbdf52b..10c4255 100644 --- a/util/slice.go +++ b/util/slice.go @@ -19,3 +19,14 @@ func (s *Slice[T]) Delete(v T) bool { } return false } + +func (s *Slice[T]) Reset() { + if len(*s) > 0 { + *s = (*s)[:0] + } +} + +func (s *Slice[T]) ResetAppend(first T) { + s.Reset() + s.Add(first) +}