diff --git a/config/config.go b/config/config.go index 9091ee6..65ce8ef 100644 --- a/config/config.go +++ b/config/config.go @@ -6,7 +6,6 @@ import ( "reflect" "strings" - "go.uber.org/zap" "m7s.live/engine/v4/log" ) @@ -162,11 +161,11 @@ func (config Config) Merge(source Config) { case Config: m.Merge(v.(Config)) default: - log.Debug("merge", zap.String("k", k), zap.Any("v", v)) + log.Debug("merge", k, v) config[k] = v } } else { - log.Debug("exist", zap.String("k", k)) + log.Debug("exist", k) } } } diff --git a/main.go b/main.go index 22aab09..8d6da33 100755 --- a/main.go +++ b/main.go @@ -9,6 +9,7 @@ import ( "net/http" "os" "path/filepath" + "reflect" "runtime" "strings" "time" @@ -96,12 +97,20 @@ func Run(ctx context.Context, configFile string) (err error) { EventBus = make(chan any, EngineConfig.EventBusSize) go EngineConfig.Listen(Engine) for name, plugin := range Plugins { - plugin.RawConfig = cg.GetChild(name) - if plugin.RawConfig != nil { - if b, err := yaml.Marshal(plugin.RawConfig); err == nil { + userConfig := cg.GetChild(name) + if userConfig != nil { + if b, err := yaml.Marshal(userConfig); err == nil { plugin.Yaml = string(b) } } + if defaultYaml := reflect.ValueOf(plugin.Config).Elem().FieldByName("DefaultYaml"); defaultYaml.IsValid() { + if err := yaml.Unmarshal([]byte(defaultYaml.String()), &plugin.RawConfig); err != nil { + log.Error("parsing default config error:", err) + } + } + if plugin.Yaml != "" { + yaml.Unmarshal([]byte(plugin.Yaml), &plugin.RawConfig) + } plugin.assign() } UUID := uuid.NewString() diff --git a/plugin.go b/plugin.go index e6a85ca..2dad72e 100644 --- a/plugin.go +++ b/plugin.go @@ -48,6 +48,7 @@ func InstallPlugin(config config.Plugin) *Plugin { } type FirstConfig config.Config +type DefaultYaml string // Plugin 插件信息 type Plugin struct { @@ -91,21 +92,21 @@ func (opt *Plugin) handle(pattern string, handler http.Handler) { } // 读取独立配置合并入总配置中 -// TODO: 覆盖逻辑有待商榷 func (opt *Plugin) assign() { f, err := os.Open(opt.settingPath()) defer f.Close() if err == nil { var b []byte b, err = io.ReadAll(f) - opt.modifiedYaml = string(b) - if err = yaml.Unmarshal(b, &opt.Modified); err == nil { - if opt.RawConfig == nil { - opt.RawConfig = opt.Modified - } else { - opt.RawConfig.Assign(opt.Modified) + if err == nil { + opt.modifiedYaml = string(b) + if err = yaml.Unmarshal(b, &opt.Modified); err == nil { + err = yaml.Unmarshal(b, &opt.RawConfig) } } + if err != nil { + opt.Warn("assign config failed", zap.Error(err)) + } } if opt == Engine { opt.registerHandler() @@ -150,6 +151,7 @@ func (opt *Plugin) run() { if err != nil { panic(err) } + delete(opt.RawConfig, "defaultyaml") opt.Debug("config", zap.Any("config", opt.Config)) // opt.RawConfig = config.Struct2Config(opt.Config) if conf, ok := opt.Config.(config.HTTPConfig); ok { diff --git a/publisher-ts.go b/publisher-ts.go index 8991914..635cdce 100644 --- a/publisher-ts.go +++ b/publisher-ts.go @@ -87,8 +87,8 @@ func (t *TSPublisher) OnPES(pes mpegts.MpegTsPESPacket) { t.AudioTrack.WriteSlice(pes.Payload[7:frameLen]) pes.Payload = pes.Payload[frameLen:remainLen] remainLen -= frameLen - t.AudioTrack.Flush() } + t.AudioTrack.Flush() case *track.G711: t.AudioTrack.WriteRaw(uint32(pes.Header.Pts), pes.Payload) } diff --git a/subscriber.go b/subscriber.go index 577e57c..70e7288 100644 --- a/subscriber.go +++ b/subscriber.go @@ -348,11 +348,10 @@ func (s *Subscriber) PlayBlock(subType byte) { time.Sleep(time.Second) } } - println("exit") } func (s *Subscriber) onStop() { - if !s.IsClosed() { + if !s.Stream.IsClosed() { s.Info("stop") if !s.IsInternal { s.Stream.Receive(s.Spesific) diff --git a/track/base.go b/track/base.go index 5fa51a1..e8a10d1 100644 --- a/track/base.go +++ b/track/base.go @@ -26,6 +26,7 @@ func (p *流速控制) 时间戳差(绝对时间戳 uint32) time.Duration { } func (p *流速控制) 控制流速(绝对时间戳 uint32) { 数据时间差, 实际时间差 := p.时间戳差(绝对时间戳), time.Since(p.起始时间) + // println("数据时间差", 数据时间差, "实际时间差", 实际时间差, "绝对时间戳", 绝对时间戳, "起始时间戳", p.起始时间戳, "起始时间", p.起始时间.Format("2006-01-02 15:04:05")) // if 实际时间差 > 数据时间差 { // p.重置(绝对时间戳) // return @@ -136,6 +137,7 @@ func (av *Media[T]) Flush() { av.重置(curValue.AbsTime) } else { curValue.DeltaTime = (curValue.DTS - preValue.DTS) / 90 + println(curValue.DeltaTime ,curValue.DTS , preValue.DTS) curValue.AbsTime = preValue.AbsTime + curValue.DeltaTime } av.Base.Flush(&curValue.BaseFrame) diff --git a/track/h264.go b/track/h264.go index 9742051..3716f99 100644 --- a/track/h264.go +++ b/track/h264.go @@ -36,7 +36,6 @@ func (vt *H264) WriteAnnexB(pts uint32, dts uint32, frame AnnexBFrame) { if len(vt.Value.Raw) > 0 { vt.Flush() } - // println(vt.Value.DTS, vt.Value.PTS, vt.Value.PTS-vt.Value.DTS, len(frame)) // println(vt.FPS) } func (vt *H264) WriteSlice(slice NALUSlice) {