diff --git a/common/frame.go b/common/frame.go index 238678f..d2af598 100644 --- a/common/frame.go +++ b/common/frame.go @@ -67,6 +67,7 @@ type AVFrame struct { AVCC util.BLL `json:"-"` // 打包好的AVCC格式(MPEG-4格式、Byte-Stream Format) RTP util.List[RTPFrame] `json:"-"` AUList util.BLLs `json:"-"` // 裸数据 + Extras any `json:"-"` // 任意扩展数据 } func (av *AVFrame) WriteAVCC(ts uint32, frame *util.BLL) { diff --git a/common/index.go b/common/index.go index acebfeb..27fdd40 100644 --- a/common/index.go +++ b/common/index.go @@ -31,39 +31,37 @@ type Base struct { ts time.Time bytes int frames int + drops int //丢帧数 BPS int FPS int - RawSize int // 裸数据长度 - RawPart []int // 裸数据片段用于UI上显示 - BPSs []TimelineData[int] // 10s码率统计 - FPSs []TimelineData[int] // 10s帧率统计 + Drops int // 丢帧率 + RawSize int // 裸数据长度 + RawPart []int // 裸数据片段用于UI上显示 } func (bt *Base) ComputeBPS(bytes int) { bt.bytes += bytes bt.frames++ if elapse := time.Since(bt.ts).Seconds(); elapse > 1 { - bt.BPS = bt.bytes / int(elapse) - bt.FPS = bt.frames / int(elapse) + bt.BPS = int(float64(bt.bytes) / elapse) + bt.FPS = int(float64(bt.frames) / elapse) + bt.Drops = int(float64(bt.drops) / elapse) bt.bytes = 0 bt.frames = 0 + bt.drops = 0 bt.ts = time.Now() - bt.BPSs = append(bt.BPSs, TimelineData[int]{Timestamp: bt.ts, Value: bt.BPS}) - if len(bt.BPSs) > 9 { - copy(bt.BPSs, bt.BPSs[1:]) - bt.BPSs = bt.BPSs[:10] - } - bt.FPSs = append(bt.FPSs, TimelineData[int]{Timestamp: bt.ts, Value: bt.FPS}) - if len(bt.FPSs) > 10 { - copy(bt.FPSs, bt.FPSs[1:]) - bt.FPSs = bt.FPSs[:10] - } } } func (bt *Base) GetBase() *Base { return bt } + +// GetRBSize 获取缓冲区大小 +func (bt *Base) GetRBSize() int { + return 0 +} + func (bt *Base) SnapForJson() { } func (bt *Base) Flush(bf *BaseFrame) { @@ -89,6 +87,7 @@ type Track interface { LastWriteTime() time.Time SnapForJson() SetStuff(stuff ...any) + GetRBSize() int } type AVTrack interface { diff --git a/common/stream.go b/common/stream.go index 2f9f7f9..54ec5fe 100644 --- a/common/stream.go +++ b/common/stream.go @@ -9,7 +9,7 @@ import ( ) type IStream interface { - AddTrack(*util.Promise[Track]) + AddTrack(Track) *util.Promise[Track] RemoveTrack(Track) Close() IsClosed() bool diff --git a/config/http.go b/config/http.go index 586606a..90abab6 100644 --- a/config/http.go +++ b/config/http.go @@ -19,7 +19,7 @@ type HTTP struct { ListenAddrTLS string CertFile string KeyFile string - CORS bool //是否自动添加CORS头 + CORS bool `default:"true"` //是否自动添加CORS头 UserName string Password string ReadTimeout time.Duration diff --git a/config/remote.go b/config/remote.go index 754205f..6e98098 100644 --- a/config/remote.go +++ b/config/remote.go @@ -47,20 +47,28 @@ func (cfg *Engine) Remote(ctx context.Context) error { conn, err := quic.DialAddr(cfg.Server, tlsConf, &quic.Config{ KeepAlivePeriod: time.Second * 10, + EnableDatagrams: true, }) wasConnected := err == nil if stream := quic.Stream(nil); err == nil { if stream, err = conn.OpenStreamSync(ctx); err == nil { - _, err = stream.Write([]byte(cfg.Secret + "\n")) + _, err = stream.Write(append([]byte{1}, (cfg.Secret + "\n")...)) if msg := []byte(nil); err == nil { - if msg, err = io.ReadAll(stream); err == nil { - var rMessage map[string]interface{} - if err = json.Unmarshal(msg, &rMessage); err == nil { + if msg, err = bufio.NewReader(stream).ReadSlice(0); err == nil { + var rMessage map[string]any + if err = json.Unmarshal(msg[:len(msg)-1], &rMessage); err == nil { if rMessage["code"].(float64) != 0 { log.Error("response from console server ", cfg.Server, " ", rMessage["msg"]) return nil } else { - log.Info("response from console server ", cfg.Server, " success") + cfg.reportStream = stream + log.Info("response from console server ", cfg.Server, " success ", rMessage) + if v, ok := rMessage["enableReport"]; ok { + cfg.enableReport = v.(bool) + } + if v, ok := rMessage["instanceId"]; ok { + cfg.instanceId = v.(string) + } } } } diff --git a/config/types.go b/config/types.go index c7da92e..70383c6 100755 --- a/config/types.go +++ b/config/types.go @@ -6,7 +6,7 @@ import ( "strings" "time" - "github.com/mcuadros/go-defaults" + "github.com/quic-go/quic-go" "golang.org/x/net/websocket" "m7s.live/engine/v4/log" "m7s.live/engine/v4/util" @@ -121,39 +121,25 @@ type Engine struct { RTPReorderBufferLen int `default:"50"` //RTP重排序缓冲长度 SpeedLimit time.Duration `default:"500ms"` //速度限制最大等待时间 EventBusSize int `default:"10"` //事件总线大小 + enableReport bool `default:"false"` //启用报告,用于统计和监控 + reportStream quic.Stream // console server connection + instanceId string // instance id 来自console } -var Global = &Engine{ - // Publish: Publish{true, true, false, 10, 0, 0, 0}, - // Subscribe: Subscribe{ - // SubAudio: true, - // SubVideo: true, - // SubVideoArgName: "vts", - // SubAudioArgName: "ats", - // SubDataArgName: "dts", - // SubAudioTracks: nil, - // SubVideoTracks: nil, - // SubMode: 0, - // IFrameOnly: false, - // WaitTimeout: 10, - // }, - HTTP: HTTP{ListenAddr: ":8080", CORS: true, mux: http.DefaultServeMux}, - // RTPReorder: true, - // EnableAVCC: true, - // EnableRTP: true, - // EnableSubEvent: true, - // EnableAuth: true, - // Console: Console{ - // "console.monibuca.com:4242", "", "", "", - // }, - // LogLevel: "info", - // RTPReorderBufferLen: 50, - // SpeedLimit: 500, - // EventBusSize: 10, +func (cfg *Engine) GetEnableReport() bool { + return cfg.enableReport } -func init() { - defaults.SetDefaults(Global) +func (cfg *Engine) GetInstanceId() string { + return cfg.instanceId +} + +var Global *Engine + +func (cfg *Engine) InitDefaultHttp() { + Global = cfg + cfg.HTTP.mux = http.DefaultServeMux + cfg.HTTP.ListenAddr = ":8080" } type myResponseWriter struct { @@ -233,6 +219,11 @@ func (cfg *Engine) WsRemote() { func (cfg *Engine) OnEvent(event any) { switch v := event.(type) { + case []byte: + if cfg.reportStream != nil { + cfg.reportStream.Write(v) + cfg.reportStream.Write([]byte{0}) + } case context.Context: util.RTPReorderBufferLen = uint16(cfg.RTPReorderBufferLen) if strings.HasPrefix(cfg.Console.Server, "wss") { diff --git a/events.go b/events.go new file mode 100644 index 0000000..d8e5db4 --- /dev/null +++ b/events.go @@ -0,0 +1,84 @@ +package engine + +import ( + "time" + + "m7s.live/engine/v4/common" +) + +type Event[T any] struct { + Time time.Time + Target T `json:"-"` +} + +func CreateEvent[T any](target T) (event Event[T]) { + event.Time = time.Now() + event.Target = target + return +} + +// PulseEvent 心跳事件 +type PulseEvent struct { + Event[struct{}] +} + +type StreamEvent struct { + Event[*Stream] +} + +// StateEvent 状态机事件 +type StateEvent struct { + StreamEvent + Action StreamAction + From StreamState +} + +// ErrorEvent 错误事件 +type ErrorEvent struct { + Event[any] + Error error +} + +func (se StateEvent) Next() (next StreamState, ok bool) { + next, ok = StreamFSM[se.From][se.Action] + return +} + +type SEwaitPublish struct { + StateEvent + Publisher IPublisher +} + +type SEpublish struct { + StateEvent +} + +type SErepublish struct { + StateEvent +} + +type SEwaitClose struct { + StateEvent +} +type SEclose struct { + StateEvent +} +type SEcreate struct { + StreamEvent +} + +type SEKick struct { + Event[struct{}] +} + +type UnsubscribeEvent struct { + Event[ISubscriber] +} + +type AddTrackEvent struct { + Event[common.Track] +} + +type TrackTimeoutEvent struct { + Event[common.Track] +} diff --git a/go.mod b/go.mod index 09ee048..e7d57f5 100644 --- a/go.mod +++ b/go.mod @@ -24,6 +24,7 @@ require ( ) require ( + github.com/denisbrodbeck/machineid v1.0.1 github.com/go-ole/go-ole v1.2.6 // indirect github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect github.com/golang/mock v1.6.0 // indirect diff --git a/go.sum b/go.sum index cb2b20d..13992e4 100644 --- a/go.sum +++ b/go.sum @@ -15,6 +15,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/denisbrodbeck/machineid v1.0.1 h1:geKr9qtkB876mXguW2X6TU4ZynleN6ezuMSRhl4D7AQ= +github.com/denisbrodbeck/machineid v1.0.1/go.mod h1:dJUwb7PTidGDeYyUBmXZ2GphQBbjJCrnectwCyxcUSI= github.com/emitter-io/address v1.0.0/go.mod h1:GfZb5+S/o8694B1GMGK2imUYQyn2skszMvGNA5D84Ug= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= diff --git a/http.go b/http.go index 1ca78b8..7c1a51a 100644 --- a/http.go +++ b/http.go @@ -19,7 +19,7 @@ const ( ) type GlobalConfig struct { - *config.Engine + config.Engine } func (conf *GlobalConfig) ServeHTTP(rw http.ResponseWriter, r *http.Request) { diff --git a/io.go b/io.go index cdc49d0..6f2fd42 100644 --- a/io.go +++ b/io.go @@ -103,6 +103,10 @@ type IIO interface { // Stop 停止订阅或者发布,由订阅者或者发布者调用 func (io *IO) Stop() { + if io.IsClosed() { + return + } + io.Debug("stop", zap.Stack("stack")) if io.CancelFunc != nil { io.CancelFunc() } diff --git a/main.go b/main.go index 3437b6f..c0f98c7 100755 --- a/main.go +++ b/main.go @@ -3,6 +3,7 @@ package engine // import "m7s.live/engine/v4" import ( "bytes" "context" + "encoding/json" "fmt" "io/ioutil" "net" @@ -14,6 +15,7 @@ import ( "strings" "time" + "github.com/denisbrodbeck/machineid" "github.com/google/uuid" . "github.com/logrusorgru/aurora" "go.uber.org/zap" @@ -36,11 +38,9 @@ var ( ConfigRaw []byte Plugins = make(map[string]*Plugin) // Plugins 所有的插件配置 plugins []*Plugin //插件列表 - EngineConfig = &GlobalConfig{ - Engine: config.Global, - } + EngineConfig = &GlobalConfig{} + Engine = InstallPlugin(EngineConfig) settingDir = filepath.Join(ExecDir, ".m7s") //配置缓存目录,该目录按照插件名称作为文件名存储修改过的配置 - Engine = InstallPlugin(EngineConfig) //复用安装插件逻辑,将全局配置信息注入,并启动server MergeConfigs = []string{"Publish", "Subscribe", "HTTP"} //需要合并配置的属性项,插件若没有配置则使用全局配置 EventBus chan any apiList []string //注册到引擎的API接口列表 @@ -54,13 +54,14 @@ func init() { // Run 启动Monibuca引擎,传入总的Context,可用于关闭所有 func Run(ctx context.Context, configFile string) (err error) { + id, _ := machineid.ProtectedID("monibuca") SysInfo.StartTime = time.Now() SysInfo.Version = Engine.Version Engine.Context = ctx - if _, err := os.Stat(configFile); err != nil { + if _, err = os.Stat(configFile); err != nil { configFile = filepath.Join(ExecDir, configFile) } - if err := util.CreateShutdownScript(); err != nil { + if err = util.CreateShutdownScript(); err != nil { log.Error("create shutdown script error:", err) } if ConfigRaw, err = ioutil.ReadFile(configFile); err != nil { @@ -70,7 +71,7 @@ func Run(ctx context.Context, configFile string) (err error) { log.Error("create dir .m7s error:", err) return } - log.Info(Blink("Ⓜ starting m7s v4")) + log.Info("Ⓜ starting engine:", Blink(Engine.Version)) var cg config.Config if ConfigRaw != nil { if err = yaml.Unmarshal(ConfigRaw, &cg); err == nil { @@ -79,7 +80,7 @@ func Run(ctx context.Context, configFile string) (err error) { Engine.Yaml = string(b) } //将配置信息同步到结构体 - Engine.RawConfig.Unmarshal(config.Global) + Engine.RawConfig.Unmarshal(&EngineConfig.Engine) } else { log.Error("parsing yml error:", err) } @@ -123,35 +124,63 @@ func Run(ctx context.Context, configFile string) (err error) { if ver, ok := ctx.Value("version").(string); ok && ver != "" && ver != "dev" { version = ver } - log.Info(Blink("m7s@"+version), " start success") - content := fmt.Sprintf(`{"uuid":"%s","version":"%s","os":"%s","arch":"%s"}`, UUID, version, runtime.GOOS, runtime.GOARCH) + log.Info("monibuca", version, Green(" start success")) + var enabledPlugins, disabledPlugins []string + for _, plugin := range plugins { + if plugin.RawConfig["enable"] == false { + plugin.Disabled = true + disabledPlugins = append(disabledPlugins, plugin.Name) + } else { + enabledPlugins = append(enabledPlugins, plugin.Name) + } + } + fmt.Print("已运行的插件:") + for _, plugin := range enabledPlugins { + fmt.Print(Colorize(" "+plugin+" ", BlackFg|GreenBg|BoldFm), " ") + } + fmt.Println() + fmt.Print("已禁用的插件:") + for _, plugin := range disabledPlugins { + fmt.Print(Colorize(" "+plugin+" ", BlackFg|RedBg|CrossedOutFm), " ") + } + fmt.Println() + fmt.Println(Bold(Cyan("官网地址: ")), Yellow("https://m7s.live")) + fmt.Println(Bold(Cyan("启动工程: ")), Yellow("https://github.com/langhuihui/monibuca")) + fmt.Println(Bold(Cyan("使用文档: ")), Yellow("https://m7s.live/guide/introduction.html")) + fmt.Println(Bold(Cyan("开发文档: ")), Yellow("https://m7s.live/devel/startup.html")) + fmt.Println(Bold(Cyan("视频教程: ")), Yellow("https://space.bilibili.com/328443019/channel/collectiondetail?sid=514619")) + fmt.Println(Bold(Cyan("远程界面: ")), Yellow("https://console.monibuca.com")) + rp := struct { + UUID string `json:"uuid"` + Machine string `json:"machine"` + Instance string `json:"instance"` + Version string `json:"version"` + OS string `json:"os"` + Arch string `json:"arch"` + }{UUID, id, EngineConfig.GetInstanceId(), version, runtime.GOOS, runtime.GOARCH} + json.NewEncoder(contentBuf).Encode(&rp) + req.Body = ioutil.NopCloser(contentBuf) if EngineConfig.Secret != "" { EngineConfig.OnEvent(ctx) } var c http.Client - var firstReport = false + c.Do(req) for { select { case event := <-EventBus: for _, plugin := range Plugins { - if plugin.RawConfig["enable"] != false { + if !plugin.Disabled { plugin.Config.OnEvent(event) } } + EngineConfig.OnEvent(event) case <-ctx.Done(): return case <-reportTimer.C: contentBuf.Reset() - if firstReport { - contentBuf.WriteString(fmt.Sprintf(`{"uuid":"`+UUID+`","streams":%d}`, len(Streams.Map))) - } else { - contentBuf.WriteString(content) - } + contentBuf.WriteString(fmt.Sprintf(`{"uuid":"`+UUID+`","streams":%d}`, len(Streams.Map))) req.Body = ioutil.NopCloser(contentBuf) - _, err := c.Do(req) - if err == nil && !firstReport { - firstReport = true - } + c.Do(req) } } } diff --git a/plugin.go b/plugin.go index 3cdfc60..9c7c279 100644 --- a/plugin.go +++ b/plugin.go @@ -41,7 +41,10 @@ func InstallPlugin(config config.Plugin) *Plugin { if _, ok := Plugins[name]; ok { return nil } - if config != EngineConfig { + switch v := config.(type) { + case *GlobalConfig: + v.InitDefaultHttp() + default: plugin.Logger = log.With(zap.String("plugin", name)) Plugins[name] = plugin plugins = append(plugins, plugin) @@ -66,6 +69,7 @@ type Plugin struct { Modified config.Config //修改过的配置项 *zap.Logger `json:"-"` saveTimer *time.Timer //用于保存的时候的延迟,防抖 + Disabled bool } func (opt *Plugin) logHandler(pattern string, handler http.Handler) http.Handler { diff --git a/report.go b/report.go new file mode 100644 index 0000000..abe5459 --- /dev/null +++ b/report.go @@ -0,0 +1,118 @@ +package engine + +import ( + "time" + + "gopkg.in/yaml.v3" + "m7s.live/engine/v4/common" +) + +type ReportCreateStream struct { + StreamPath string + Time int64 +} + +type ReportCloseStream struct { + StreamPath string + Time int64 +} + +type ReportAddTrack struct { + Name string + StreamPath string + Time int64 +} +type ReportTrackInfo struct { + BPS int + FPS int + Drops int + RBSize int +} +type ReportPulse struct { + StreamPath string + Tracks map[string]ReportTrackInfo + Subscribers map[string]struct { + Type string + Readers map[string]struct { + Delay uint32 + } + } + Time int64 +} + +type Reportor struct { + Subscriber + pulse ReportPulse +} + +func (r *Reportor) OnEvent(event any) { + switch v := event.(type) { + case PulseEvent: + r.pulse.Tracks = make(map[string]ReportTrackInfo) + r.pulse.Subscribers = make(map[string]struct { + Type string + Readers map[string]struct { + Delay uint32 + } + }) + r.Stream.Tracks.Range(func(k string, t common.Track) { + track := t.GetBase() + r.pulse.Tracks[k] = ReportTrackInfo{ + BPS: track.BPS, + FPS: track.FPS, + Drops: track.Drops, + RBSize: t.GetRBSize(), + } + }) + r.Stream.Subscribers.RangeAll(func(sub ISubscriber, wait *waitTracks) { + suber := sub.GetSubscriber() + r.pulse.Subscribers[suber.ID] = struct { + Type string + Readers map[string]struct { + Delay uint32 + } + }{Type: suber.Type, Readers: map[string]struct { + Delay uint32 + }{suber.Audio.Name: {Delay: suber.AudioReader.Delay}, suber.Video.Name: {Delay: suber.VideoReader.Delay}}} + }) + r.pulse.Time = time.Now().Unix() + EngineConfig.Report("pulse", r.pulse) + case common.Track: + EngineConfig.Report("addtrack", &ReportAddTrack{v.GetBase().Name, r.Stream.Path, time.Now().Unix()}) + } +} + +func (conf *GlobalConfig) OnEvent(event any) { + if !conf.GetEnableReport() { + conf.Engine.OnEvent(event) + return + } + switch v := event.(type) { + case SEcreate: + conf.Report("create", &ReportCreateStream{v.Target.Path, time.Now().Unix()}) + var reportor Reportor + reportor.IsInternal = true + reportor.pulse.StreamPath = v.Target.Path + if Engine.Subscribe(v.Target.Path, &reportor) == nil { + reportor.SubPulse() + } + case SEpublish: + case SErepublish: + case SEKick: + case SEclose: + conf.Report("close", &ReportCloseStream{v.Target.Path, time.Now().Unix()}) + case SEwaitClose: + case SEwaitPublish: + case ISubscriber: + case UnsubscribeEvent: + default: + conf.Engine.OnEvent(event) + } +} + +func (conf *GlobalConfig) Report(t string, v any) { + out, err := yaml.Marshal(v) + if err == nil { + conf.Engine.OnEvent(append([]byte("type: "+t+"\n"), out...)) + } +} diff --git a/stream.go b/stream.go index 4afa417..6f92e4a 100644 --- a/stream.go +++ b/stream.go @@ -20,42 +20,6 @@ import ( type StreamState byte type StreamAction byte -type StateEvent struct { - Action StreamAction - From StreamState - Stream *Stream `json:"-"` -} - -func (se StateEvent) Next() (next StreamState, ok bool) { - next, ok = StreamFSM[se.From][se.Action] - return -} - -type SEwaitPublish struct { - StateEvent - Publisher IPublisher -} -type SEpublish struct { - StateEvent -} - -type SErepublish struct { - StateEvent -} - -type SEwaitClose struct { - StateEvent -} -type SEclose struct { - StateEvent -} - -type SEKick struct { -} -type UnsubscribeEvent struct { - Subscriber ISubscriber -} - // 四状态机 const ( STATE_WAITPUBLISH StreamState = iota // 等待发布者状态 @@ -286,7 +250,11 @@ func findOrCreateStream(streamPath string, waitTimeout time.Duration) (s *Stream } func (r *Stream) action(action StreamAction) (ok bool) { - event := StateEvent{action, r.State, r} + var event StateEvent + event.Target = r + event.Action = action + event.From = r.State + event.Time = time.Now() var next StreamState if next, ok = event.Next(); ok { r.State = next @@ -314,6 +282,7 @@ func (r *Stream) action(action StreamAction) (ok bool) { waitTime = time.Millisecond * 10 //没有订阅者也没有配置发布者等待重连时间,默认10ms后关闭流 } r.timeout.Reset(waitTime) + r.Debug("wait publish", zap.Duration("wait", waitTime)) case STATE_PUBLISHING: if len(r.SEHistory) > 1 { stateEvent = SErepublish{event} @@ -392,8 +361,16 @@ func (s *Stream) onSuberClose(sub ISubscriber) { // 流状态处理中枢,包括接收订阅发布指令等 func (s *Stream) run() { + EventBus <- SEcreate{StreamEvent{Event[*Stream]{Target: s, Time: time.Now()}}} + pulseTicker := time.NewTicker(time.Second * 5) + defer pulseTicker.Stop() + pulseSuber := make(map[ISubscriber]struct{}) for { select { + case <-pulseTicker.C: + for sub := range pulseSuber { + sub.OnEvent(PulseEvent{CreateEvent(struct{}{})}) + } case <-s.timeout.C: if s.State == STATE_PUBLISHING { for sub := range s.Subscribers.internal { @@ -412,7 +389,10 @@ func (s *Stream) run() { if lastWriteTime := t.LastWriteTime(); !lastWriteTime.IsZero() && time.Since(lastWriteTime) > s.PublishTimeout { s.Warn("track timeout", zap.String("name", name), zap.Time("lastWriteTime", lastWriteTime), zap.Duration("timeout", s.PublishTimeout)) delete(s.Tracks.Map.Map, name) - s.Subscribers.Broadcast(TrackRemoved{t}) + var event TrackTimeoutEvent + event.Target = t + event.Time = time.Now() + s.Subscribers.Broadcast(event) } }) if s.State != STATE_PUBLISHING { @@ -432,6 +412,8 @@ func (s *Stream) run() { case action, ok := <-s.actionChan.C: if ok { switch v := action.(type) { + case SubPulse: + pulseSuber[v] = struct{}{} case *util.Promise[IPublisher]: if s.IsClosed() { v.Reject(ErrStreamIsClosed) @@ -491,7 +473,8 @@ func (s *Stream) run() { if s.Subscribers.Len() == 1 && s.State == STATE_WAITCLOSE { s.action(ACTION_FIRSTENTER) } - case ISubscriber: + case Unsubscribe: + delete(pulseSuber, v) s.onSuberClose(v) case TrackRemoved: name := v.GetBase().Name @@ -534,14 +517,22 @@ func (s *Stream) run() { } } -func (s *Stream) AddTrack(t *util.Promise[Track]) { - s.Receive(t) +func (s *Stream) AddTrack(t Track) (promise *util.Promise[Track]) { + promise = util.NewPromise(t) + s.Receive(promise) + return } type TrackRemoved struct { Track } +type SubPulse struct { + ISubscriber +} + +type Unsubscribe ISubscriber + func (s *Stream) RemoveTrack(t Track) { s.Receive(TrackRemoved{t}) } diff --git a/subscriber.go b/subscriber.go index a6e378d..1a1f298 100644 --- a/subscriber.go +++ b/subscriber.go @@ -2,6 +2,7 @@ package engine import ( "context" + "fmt" "io" "net" "strconv" @@ -164,6 +165,10 @@ func (s *Subscriber) IsPlaying() bool { return s.TrackPlayer.Context != nil && s.TrackPlayer.Err() == nil } +func (s *Subscriber) SubPulse() { + s.Stream.Receive(SubPulse{s}) +} + func (s *Subscriber) PlayRaw() { s.PlayBlock(SUBTYPE_RAW) } @@ -206,11 +211,11 @@ func (s *Subscriber) PlayBlock(subType byte) { switch subType { case SUBTYPE_RAW: sendVideoFrame = func(frame *AVFrame) { - // println("v", frame.Sequence, frame.AbsTime, s.VideoReader.AbsTime, frame.IFrame) + // fmt.Println("v", frame.Sequence, s.VideoReader.AbsTime, frame.IFrame) spesic.OnEvent(VideoFrame{frame, s.Video, s.VideoReader.AbsTime, s.VideoReader.GetPTS32(), s.VideoReader.GetDTS32()}) } sendAudioFrame = func(frame *AVFrame) { - // println("a", frame.Sequence, frame.AbsTime, s.AudioReader.AbsTime) + // fmt.Println("a", frame.Sequence, s.AudioReader.AbsTime) spesic.OnEvent(AudioFrame{frame, s.Audio, s.AudioReader.AbsTime, s.AudioReader.GetPTS32(), s.AudioReader.GetDTS32()}) } case SUBTYPE_RTP: @@ -243,7 +248,8 @@ func (s *Subscriber) PlayBlock(subType byte) { case SUBTYPE_FLV: flvHeadCache := make([]byte, 15) //内存复用 sendFlvFrame := func(t byte, ts uint32, avcc ...[]byte) { - // fmt.Println(ts) + // println(t, ts) + fmt.Printf("%d %X %X %d\n",t, avcc[0][0], avcc[0][1], ts) flvHeadCache[0] = t result := append(FLVFrame{flvHeadCache[:11]}, avcc...) dataSize := uint32(util.SizeOfBuffers(avcc)) @@ -273,7 +279,7 @@ func (s *Subscriber) PlayBlock(subType byte) { sendFlvFrame(codec.FLV_TAG_TYPE_VIDEO, s.VideoReader.AbsTime, frame.AVCC.ToBuffers()...) } sendAudioFrame = func(frame *AVFrame) { - // println(frame.Sequence, s.AudioReader.AbsTime, frame.DeltaTime) + // fmt.Println(frame.Sequence, s.AudioReader.AbsTime, frame.DeltaTime) sendFlvFrame(codec.FLV_TAG_TYPE_AUDIO, s.AudioReader.AbsTime, frame.AVCC.ToBuffers()...) } } @@ -282,43 +288,38 @@ func (s *Subscriber) PlayBlock(subType byte) { if s.Args.Has(conf.SubModeArgName) { subMode, _ = strconv.Atoi(s.Args.Get(conf.SubModeArgName)) } + var initState = 0 var videoFrame, audioFrame *AVFrame - var lastAbsTime time.Duration - for ctx.Err() == nil { if hasVideo { for ctx.Err() == nil { s.VideoReader.Read(ctx, subMode) - frame := s.VideoReader.Frame - if frame == nil || ctx.Err() != nil { + videoFrame = s.VideoReader.Frame + if videoFrame == nil || ctx.Err() != nil { return } // fmt.Println("video", s.VideoReader.Track.PreFrame().Sequence-frame.Sequence) - if frame.IFrame && s.VideoReader.DecConfChanged() { + if videoFrame.IFrame && s.VideoReader.DecConfChanged() { s.VideoReader.ConfSeq = s.VideoReader.Track.SequenceHeadSeq sendVideoDecConf() } if hasAudio { if audioFrame != nil { - if frame.Timestamp > lastAbsTime { + if videoFrame.Timestamp > audioFrame.Timestamp { // fmt.Println("switch audio", audioFrame.CanRead) if audioFrame.CanRead { sendAudioFrame(audioFrame) } audioFrame = nil - videoFrame = frame - lastAbsTime = frame.Timestamp - break - } - } else if lastAbsTime == 0 { - if lastAbsTime = frame.Timestamp; lastAbsTime != 0 { - videoFrame = frame break } + } else if initState++; initState >= 2 { + break } } - if !conf.IFrameOnly || frame.IFrame { - sendVideoFrame(frame) + + if !conf.IFrameOnly || videoFrame.IFrame { + sendVideoFrame(videoFrame) } else { // fmt.Println("skip video", frame.Sequence) } @@ -339,8 +340,8 @@ func (s *Subscriber) PlayBlock(subType byte) { } } s.AudioReader.Read(ctx, subMode) - frame := s.AudioReader.Frame - if frame == nil || ctx.Err() != nil { + audioFrame = s.AudioReader.Frame + if audioFrame == nil || ctx.Err() != nil { return } // fmt.Println("audio", s.AudioReader.Track.PreFrame().Sequence-frame.Sequence) @@ -349,19 +350,17 @@ func (s *Subscriber) PlayBlock(subType byte) { sendAudioDecConf() } if hasVideo && videoFrame != nil { - if frame.Timestamp > lastAbsTime { + if audioFrame.Timestamp > videoFrame.Timestamp { // fmt.Println("switch video", videoFrame.CanRead) if videoFrame.CanRead { sendVideoFrame(videoFrame) } videoFrame = nil - audioFrame = frame - lastAbsTime = frame.Timestamp break } } - if frame.Timestamp >= s.AudioReader.SkipTs { - sendAudioFrame(frame) + if audioFrame.Timestamp >= s.AudioReader.SkipTs { + sendAudioFrame(audioFrame) } else { // fmt.Println("skip audio", frame.AbsTime, s.AudioReader.SkipTs) } diff --git a/subscribers.go b/subscribers.go index 5e27441..4086efc 100644 --- a/subscribers.go +++ b/subscribers.go @@ -93,7 +93,7 @@ func (s *Subscribers) Delete(suber ISubscriber) { io := suber.GetSubscriber() io.Info("suber -1", zap.Int("remains", s.Len())) if config.Global.EnableSubEvent { - EventBus <- UnsubscribeEvent{suber} + EventBus <- UnsubscribeEvent{CreateEvent(suber)} } } diff --git a/track/audio.go b/track/audio.go index cc6736d..bdcfa24 100644 --- a/track/audio.go +++ b/track/audio.go @@ -3,7 +3,6 @@ package track import ( "go.uber.org/zap" "m7s.live/engine/v4/codec" - "m7s.live/engine/v4/common" . "m7s.live/engine/v4/common" "m7s.live/engine/v4/util" ) @@ -22,9 +21,7 @@ type Audio struct { func (a *Audio) Attach() { if a.Attached.CompareAndSwap(false, true) { - promise := util.NewPromise(common.Track(a)) - a.Stream.AddTrack(promise) - if err := promise.Await(); err != nil { + if err := a.Stream.AddTrack(a).Await(); err != nil { a.Error("attach audio track failed", zap.Error(err)) } else { a.Info("audio track attached", zap.Uint32("sample rate", a.SampleRate)) diff --git a/track/base.go b/track/base.go index 7f7701a..a729043 100644 --- a/track/base.go +++ b/track/base.go @@ -5,6 +5,7 @@ import ( "unsafe" "github.com/pion/rtp" + "go.uber.org/zap" . "m7s.live/engine/v4/common" "m7s.live/engine/v4/config" "m7s.live/engine/v4/util" @@ -12,19 +13,27 @@ import ( type 流速控制 struct { 起始时间戳 time.Duration + 起始dts time.Duration 等待上限 time.Duration 起始时间 time.Time } -func (p *流速控制) 重置(绝对时间戳 time.Duration) { +func (p *流速控制) 重置(绝对时间戳 time.Duration, dts time.Duration) { p.起始时间 = time.Now() p.起始时间戳 = 绝对时间戳 + p.起始dts = dts // println("重置", p.起始时间.Format("2006-01-02 15:04:05"), p.起始时间戳) } +func (p *流速控制) 根据起始DTS计算绝对时间戳(dts time.Duration) time.Duration { + if dts < p.起始dts { + dts += 0xFFFFFFFFF + } + return ((dts-p.起始dts)*time.Millisecond + p.起始时间戳*90) / 90 +} func (p *流速控制) 时间戳差(绝对时间戳 time.Duration) time.Duration { return 绝对时间戳 - p.起始时间戳 } -func (p *流速控制) 控制流速(绝对时间戳 time.Duration) { +func (p *流速控制) 控制流速(绝对时间戳 time.Duration, dts time.Duration) { 数据时间差, 实际时间差 := p.时间戳差(绝对时间戳), time.Since(p.起始时间) // println("数据时间差", 数据时间差, "实际时间差", 实际时间差, "绝对时间戳", 绝对时间戳, "起始时间戳", p.起始时间戳, "起始时间", p.起始时间.Format("2006-01-02 15:04:05")) // if 实际时间差 > 数据时间差 { @@ -41,6 +50,8 @@ func (p *流速控制) 控制流速(绝对时间戳 time.Duration) { time.Sleep(过快) } } else if 过快 < -100*time.Millisecond { + // fmt.Println("过慢毫秒", 过快.Milliseconds()) + // p.重置(绝对时间戳, dts) // println("过慢毫秒", p.name, 过快.Milliseconds()) } } @@ -89,6 +100,10 @@ type Media struct { 流速控制 } +func (av *Media) GetRBSize() int { + return av.RingBuffer.Size +} + func (av *Media) GetRTPFromPool() (result *util.ListItem[RTPFrame]) { result = av.RtpPool.Get() if result.Value.Packet == nil { @@ -204,16 +219,29 @@ func (av *Media) AddIDR() { func (av *Media) Flush() { curValue, preValue, nextValue := &av.Value, av.LastValue, av.Next() + useDts := curValue.Timestamp == 0 if av.State == TrackStateOffline { av.State = TrackStateOnline - av.deltaTs = preValue.Timestamp - curValue.Timestamp + time.Duration(preValue.DeltaTime)*time.Millisecond - av.Info("track back online") - } - if av.deltaTs != 0 { - rtpts := av.deltaTs * 90 / 1000 - curValue.DTS = curValue.DTS + rtpts - curValue.PTS = curValue.PTS + rtpts - curValue.Timestamp = 0 + if useDts { + av.deltaTs = curValue.DTS - preValue.DTS + curValue.Timestamp = preValue.Timestamp + time.Millisecond + } else { + av.deltaTs = curValue.Timestamp - preValue.Timestamp + } + curValue.DTS += 90 + curValue.PTS += 90 + curValue.Timestamp += time.Millisecond + av.Info("track back online", zap.Duration("delta", av.deltaTs)) + } else if av.deltaTs != 0 { + if useDts { + curValue.DTS -= av.deltaTs + curValue.PTS -= av.deltaTs + } else { + rtpts := av.deltaTs * 90 / time.Millisecond + curValue.DTS -= rtpts + curValue.PTS -= rtpts + curValue.Timestamp -= av.deltaTs + } } bufferTime := av.Stream.GetPublisherConfig().BufferTime if bufferTime > 0 && av.IDRingList.Length > 1 && curValue.Timestamp-av.IDRingList.Next.Next.Value.Value.Timestamp > bufferTime { @@ -232,17 +260,17 @@ func (av *Media) Flush() { if av.起始时间.IsZero() { curValue.DeltaTime = 0 - if curValue.Timestamp == 0 { + if useDts { curValue.Timestamp = time.Since(av.Stream.GetStartTime()) } - av.重置(curValue.Timestamp) + av.重置(curValue.Timestamp, curValue.DTS) } else { - if curValue.Timestamp == 0 { - curValue.Timestamp = (preValue.Timestamp*90 + (curValue.DTS-preValue.DTS)*time.Millisecond) / 90 + if useDts { + curValue.Timestamp = av.根据起始DTS计算绝对时间戳(curValue.DTS) } curValue.DeltaTime = uint32((curValue.Timestamp - preValue.Timestamp) / time.Millisecond) } - // fmt.Println(av.Name, curValue.DTS, curValue.Timestamp, curValue.DeltaTime) + // fmt.Println(av.Name, curValue.Timestamp, curValue.DeltaTime) if curValue.AUList.Length > 0 { // 补完RTP if config.Global.EnableRTP && curValue.RTP.Length == 0 { @@ -255,7 +283,7 @@ func (av *Media) Flush() { } av.Base.Flush(&curValue.BaseFrame) if av.等待上限 > 0 { - av.控制流速(curValue.Timestamp) + av.控制流速(curValue.Timestamp, curValue.DTS) } preValue = curValue curValue = av.MoveNext() diff --git a/track/data.go b/track/data.go index 3bf7e50..def696e 100644 --- a/track/data.go +++ b/track/data.go @@ -6,7 +6,6 @@ import ( "time" "go.uber.org/zap" - "m7s.live/engine/v4/common" . "m7s.live/engine/v4/common" "m7s.live/engine/v4/util" ) @@ -17,6 +16,10 @@ type Data struct { sync.Locker // 写入锁,可选,单一协程写入可以不加锁 } +func (d *Data) GetRBSize() int { + return d.LockRing.RingBuffer.Size +} + func (d *Data) ReadRing() *LockRing[any] { return util.Clone(d.LockRing) } @@ -47,9 +50,7 @@ func (d *Data) Play(ctx context.Context, onData func(any) error) error { } func (d *Data) Attach() { - promise := util.NewPromise(common.Track(d)) - d.Stream.AddTrack(promise) - if err := promise.Await(); err != nil { + if err := d.Stream.AddTrack(d).Await(); err != nil { d.Error("attach data track failed", zap.Error(err)) } else { d.Info("data track attached") diff --git a/track/reader-av.go b/track/reader-av.go index ba98bcb..fe31b3e 100644 --- a/track/reader-av.go +++ b/track/reader-av.go @@ -125,17 +125,21 @@ func (r *AVRingReader) Read(ctx context.Context, mode int) (err error) { r.ReadFrame() } r.AbsTime = uint32((r.Frame.Timestamp - r.SkipTs).Milliseconds()) + if r.AbsTime == 0 { + r.AbsTime = 1 + } r.Delay = uint32((r.Track.LastValue.Timestamp - r.Frame.Timestamp).Milliseconds()) + // fmt.Println(r.Track.Name, r.Delay) // println(r.Track.Name, r.State, r.Frame.AbsTime, r.SkipTs, r.AbsTime) return } func (r *AVRingReader) GetPTS32() uint32 { - return uint32((r.Frame.PTS - r.SkipTs * 90 / time.Millisecond)) + return uint32((r.Frame.PTS - r.SkipTs*90/time.Millisecond)) } func (r *AVRingReader) GetDTS32() uint32 { - return uint32((r.Frame.DTS - r.SkipTs * 90 / time.Millisecond)) + return uint32((r.Frame.DTS - r.SkipTs*90/time.Millisecond)) } func (r *AVRingReader) ResetAbsTime() { r.SkipTs = r.Frame.Timestamp - r.AbsTime = 0 + r.AbsTime = 1 } diff --git a/track/video.go b/track/video.go index 08a91fa..e4def03 100644 --- a/track/video.go +++ b/track/video.go @@ -30,9 +30,7 @@ type Video struct { func (v *Video) Attach() { if v.Attached.CompareAndSwap(false, true) { - promise := util.NewPromise(common.Track(v)) - v.Stream.AddTrack(promise) - if err := promise.Await(); err != nil { + if err := v.Stream.AddTrack(v).Await(); err != nil { v.Error("attach video track failed", zap.Error(err)) } else { v.Info("video track attached", zap.Uint("width", v.Width), zap.Uint("height", v.Height)) diff --git a/util/buffer.go b/util/buffer.go index 8a94f01..0b9d7c4 100644 --- a/util/buffer.go +++ b/util/buffer.go @@ -83,6 +83,7 @@ func (b Buffer) SubBuf(start int, length int) Buffer { return b[start : start+length] } +// Malloc 扩大原来的buffer的长度,返回新增的buffer func (b *Buffer) Malloc(count int) Buffer { l := b.Len() newL := l + count @@ -96,14 +97,14 @@ func (b *Buffer) Malloc(count int) Buffer { return b.SubBuf(l, count) } -func (b *Buffer) Reset() { - *b = b.SubBuf(0, 0) +// Relloc 改变 buffer 到指定大小 +func (b *Buffer) Relloc(count int) { + b.Reset() + b.Malloc(count) } -func (b *Buffer) Glow(n int) { - l := b.Len() - b.Malloc(n) - *b = b.SubBuf(0, l) +func (b *Buffer) Reset() { + *b = b.SubBuf(0, 0) } func (b *Buffer) Split(n int) (result net.Buffers) {