事件上报

This commit is contained in:
dexter
2023-03-28 13:26:54 +08:00
parent 336b0ae759
commit 25a8add7a7
23 changed files with 440 additions and 180 deletions

View File

@@ -67,6 +67,7 @@ type AVFrame struct {
AVCC util.BLL `json:"-"` // 打包好的AVCC格式(MPEG-4格式、Byte-Stream Format)
RTP util.List[RTPFrame] `json:"-"`
AUList util.BLLs `json:"-"` // 裸数据
Extras any `json:"-"` // 任意扩展数据
}
func (av *AVFrame) WriteAVCC(ts uint32, frame *util.BLL) {

View File

@@ -31,39 +31,37 @@ type Base struct {
ts time.Time
bytes int
frames int
drops int //丢帧数
BPS int
FPS int
Drops int // 丢帧率
RawSize int // 裸数据长度
RawPart []int // 裸数据片段用于UI上显示
BPSs []TimelineData[int] // 10s码率统计
FPSs []TimelineData[int] // 10s帧率统计
}
func (bt *Base) ComputeBPS(bytes int) {
bt.bytes += bytes
bt.frames++
if elapse := time.Since(bt.ts).Seconds(); elapse > 1 {
bt.BPS = bt.bytes / int(elapse)
bt.FPS = bt.frames / int(elapse)
bt.BPS = int(float64(bt.bytes) / elapse)
bt.FPS = int(float64(bt.frames) / elapse)
bt.Drops = int(float64(bt.drops) / elapse)
bt.bytes = 0
bt.frames = 0
bt.drops = 0
bt.ts = time.Now()
bt.BPSs = append(bt.BPSs, TimelineData[int]{Timestamp: bt.ts, Value: bt.BPS})
if len(bt.BPSs) > 9 {
copy(bt.BPSs, bt.BPSs[1:])
bt.BPSs = bt.BPSs[:10]
}
bt.FPSs = append(bt.FPSs, TimelineData[int]{Timestamp: bt.ts, Value: bt.FPS})
if len(bt.FPSs) > 10 {
copy(bt.FPSs, bt.FPSs[1:])
bt.FPSs = bt.FPSs[:10]
}
}
}
func (bt *Base) GetBase() *Base {
return bt
}
// GetRBSize 获取缓冲区大小
func (bt *Base) GetRBSize() int {
return 0
}
func (bt *Base) SnapForJson() {
}
func (bt *Base) Flush(bf *BaseFrame) {
@@ -89,6 +87,7 @@ type Track interface {
LastWriteTime() time.Time
SnapForJson()
SetStuff(stuff ...any)
GetRBSize() int
}
type AVTrack interface {

View File

@@ -9,7 +9,7 @@ import (
)
type IStream interface {
AddTrack(*util.Promise[Track])
AddTrack(Track) *util.Promise[Track]
RemoveTrack(Track)
Close()
IsClosed() bool

View File

@@ -19,7 +19,7 @@ type HTTP struct {
ListenAddrTLS string
CertFile string
KeyFile string
CORS bool //是否自动添加CORS头
CORS bool `default:"true"` //是否自动添加CORS头
UserName string
Password string
ReadTimeout time.Duration

View File

@@ -47,20 +47,28 @@ func (cfg *Engine) Remote(ctx context.Context) error {
conn, err := quic.DialAddr(cfg.Server, tlsConf, &quic.Config{
KeepAlivePeriod: time.Second * 10,
EnableDatagrams: true,
})
wasConnected := err == nil
if stream := quic.Stream(nil); err == nil {
if stream, err = conn.OpenStreamSync(ctx); err == nil {
_, err = stream.Write([]byte(cfg.Secret + "\n"))
_, err = stream.Write(append([]byte{1}, (cfg.Secret + "\n")...))
if msg := []byte(nil); err == nil {
if msg, err = io.ReadAll(stream); err == nil {
var rMessage map[string]interface{}
if err = json.Unmarshal(msg, &rMessage); err == nil {
if msg, err = bufio.NewReader(stream).ReadSlice(0); err == nil {
var rMessage map[string]any
if err = json.Unmarshal(msg[:len(msg)-1], &rMessage); err == nil {
if rMessage["code"].(float64) != 0 {
log.Error("response from console server ", cfg.Server, " ", rMessage["msg"])
return nil
} else {
log.Info("response from console server ", cfg.Server, " success")
cfg.reportStream = stream
log.Info("response from console server ", cfg.Server, " success ", rMessage)
if v, ok := rMessage["enableReport"]; ok {
cfg.enableReport = v.(bool)
}
if v, ok := rMessage["instanceId"]; ok {
cfg.instanceId = v.(string)
}
}
}
}

View File

@@ -6,7 +6,7 @@ import (
"strings"
"time"
"github.com/mcuadros/go-defaults"
"github.com/quic-go/quic-go"
"golang.org/x/net/websocket"
"m7s.live/engine/v4/log"
"m7s.live/engine/v4/util"
@@ -121,39 +121,25 @@ type Engine struct {
RTPReorderBufferLen int `default:"50"` //RTP重排序缓冲长度
SpeedLimit time.Duration `default:"500ms"` //速度限制最大等待时间
EventBusSize int `default:"10"` //事件总线大小
enableReport bool `default:"false"` //启用报告,用于统计和监控
reportStream quic.Stream // console server connection
instanceId string // instance id 来自console
}
var Global = &Engine{
// Publish: Publish{true, true, false, 10, 0, 0, 0},
// Subscribe: Subscribe{
// SubAudio: true,
// SubVideo: true,
// SubVideoArgName: "vts",
// SubAudioArgName: "ats",
// SubDataArgName: "dts",
// SubAudioTracks: nil,
// SubVideoTracks: nil,
// SubMode: 0,
// IFrameOnly: false,
// WaitTimeout: 10,
// },
HTTP: HTTP{ListenAddr: ":8080", CORS: true, mux: http.DefaultServeMux},
// RTPReorder: true,
// EnableAVCC: true,
// EnableRTP: true,
// EnableSubEvent: true,
// EnableAuth: true,
// Console: Console{
// "console.monibuca.com:4242", "", "", "",
// },
// LogLevel: "info",
// RTPReorderBufferLen: 50,
// SpeedLimit: 500,
// EventBusSize: 10,
func (cfg *Engine) GetEnableReport() bool {
return cfg.enableReport
}
func init() {
defaults.SetDefaults(Global)
func (cfg *Engine) GetInstanceId() string {
return cfg.instanceId
}
var Global *Engine
func (cfg *Engine) InitDefaultHttp() {
Global = cfg
cfg.HTTP.mux = http.DefaultServeMux
cfg.HTTP.ListenAddr = ":8080"
}
type myResponseWriter struct {
@@ -233,6 +219,11 @@ func (cfg *Engine) WsRemote() {
func (cfg *Engine) OnEvent(event any) {
switch v := event.(type) {
case []byte:
if cfg.reportStream != nil {
cfg.reportStream.Write(v)
cfg.reportStream.Write([]byte{0})
}
case context.Context:
util.RTPReorderBufferLen = uint16(cfg.RTPReorderBufferLen)
if strings.HasPrefix(cfg.Console.Server, "wss") {

84
events.go Normal file
View File

@@ -0,0 +1,84 @@
package engine
import (
"time"
"m7s.live/engine/v4/common"
)
type Event[T any] struct {
Time time.Time
Target T `json:"-"`
}
func CreateEvent[T any](target T) (event Event[T]) {
event.Time = time.Now()
event.Target = target
return
}
// PulseEvent 心跳事件
type PulseEvent struct {
Event[struct{}]
}
type StreamEvent struct {
Event[*Stream]
}
// StateEvent 状态机事件
type StateEvent struct {
StreamEvent
Action StreamAction
From StreamState
}
// ErrorEvent 错误事件
type ErrorEvent struct {
Event[any]
Error error
}
func (se StateEvent) Next() (next StreamState, ok bool) {
next, ok = StreamFSM[se.From][se.Action]
return
}
type SEwaitPublish struct {
StateEvent
Publisher IPublisher
}
type SEpublish struct {
StateEvent
}
type SErepublish struct {
StateEvent
}
type SEwaitClose struct {
StateEvent
}
type SEclose struct {
StateEvent
}
type SEcreate struct {
StreamEvent
}
type SEKick struct {
Event[struct{}]
}
type UnsubscribeEvent struct {
Event[ISubscriber]
}
type AddTrackEvent struct {
Event[common.Track]
}
type TrackTimeoutEvent struct {
Event[common.Track]
}

1
go.mod
View File

@@ -24,6 +24,7 @@ require (
)
require (
github.com/denisbrodbeck/machineid v1.0.1
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect
github.com/golang/mock v1.6.0 // indirect

2
go.sum
View File

@@ -15,6 +15,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/denisbrodbeck/machineid v1.0.1 h1:geKr9qtkB876mXguW2X6TU4ZynleN6ezuMSRhl4D7AQ=
github.com/denisbrodbeck/machineid v1.0.1/go.mod h1:dJUwb7PTidGDeYyUBmXZ2GphQBbjJCrnectwCyxcUSI=
github.com/emitter-io/address v1.0.0/go.mod h1:GfZb5+S/o8694B1GMGK2imUYQyn2skszMvGNA5D84Ug=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=

View File

@@ -19,7 +19,7 @@ const (
)
type GlobalConfig struct {
*config.Engine
config.Engine
}
func (conf *GlobalConfig) ServeHTTP(rw http.ResponseWriter, r *http.Request) {

4
io.go
View File

@@ -103,6 +103,10 @@ type IIO interface {
// Stop 停止订阅或者发布,由订阅者或者发布者调用
func (io *IO) Stop() {
if io.IsClosed() {
return
}
io.Debug("stop", zap.Stack("stack"))
if io.CancelFunc != nil {
io.CancelFunc()
}

69
main.go
View File

@@ -3,6 +3,7 @@ package engine // import "m7s.live/engine/v4"
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net"
@@ -14,6 +15,7 @@ import (
"strings"
"time"
"github.com/denisbrodbeck/machineid"
"github.com/google/uuid"
. "github.com/logrusorgru/aurora"
"go.uber.org/zap"
@@ -36,11 +38,9 @@ var (
ConfigRaw []byte
Plugins = make(map[string]*Plugin) // Plugins 所有的插件配置
plugins []*Plugin //插件列表
EngineConfig = &GlobalConfig{
Engine: config.Global,
}
EngineConfig = &GlobalConfig{}
Engine = InstallPlugin(EngineConfig)
settingDir = filepath.Join(ExecDir, ".m7s") //配置缓存目录,该目录按照插件名称作为文件名存储修改过的配置
Engine = InstallPlugin(EngineConfig) //复用安装插件逻辑将全局配置信息注入并启动server
MergeConfigs = []string{"Publish", "Subscribe", "HTTP"} //需要合并配置的属性项,插件若没有配置则使用全局配置
EventBus chan any
apiList []string //注册到引擎的API接口列表
@@ -54,13 +54,14 @@ func init() {
// Run 启动Monibuca引擎传入总的Context可用于关闭所有
func Run(ctx context.Context, configFile string) (err error) {
id, _ := machineid.ProtectedID("monibuca")
SysInfo.StartTime = time.Now()
SysInfo.Version = Engine.Version
Engine.Context = ctx
if _, err := os.Stat(configFile); err != nil {
if _, err = os.Stat(configFile); err != nil {
configFile = filepath.Join(ExecDir, configFile)
}
if err := util.CreateShutdownScript(); err != nil {
if err = util.CreateShutdownScript(); err != nil {
log.Error("create shutdown script error:", err)
}
if ConfigRaw, err = ioutil.ReadFile(configFile); err != nil {
@@ -70,7 +71,7 @@ func Run(ctx context.Context, configFile string) (err error) {
log.Error("create dir .m7s error:", err)
return
}
log.Info(Blink("Ⓜ starting m7s v4"))
log.Info("Ⓜ starting engine:", Blink(Engine.Version))
var cg config.Config
if ConfigRaw != nil {
if err = yaml.Unmarshal(ConfigRaw, &cg); err == nil {
@@ -79,7 +80,7 @@ func Run(ctx context.Context, configFile string) (err error) {
Engine.Yaml = string(b)
}
//将配置信息同步到结构体
Engine.RawConfig.Unmarshal(config.Global)
Engine.RawConfig.Unmarshal(&EngineConfig.Engine)
} else {
log.Error("parsing yml error:", err)
}
@@ -123,35 +124,63 @@ func Run(ctx context.Context, configFile string) (err error) {
if ver, ok := ctx.Value("version").(string); ok && ver != "" && ver != "dev" {
version = ver
}
log.Info(Blink("m7s@"+version), " start success")
content := fmt.Sprintf(`{"uuid":"%s","version":"%s","os":"%s","arch":"%s"}`, UUID, version, runtime.GOOS, runtime.GOARCH)
log.Info("monibuca", version, Green(" start success"))
var enabledPlugins, disabledPlugins []string
for _, plugin := range plugins {
if plugin.RawConfig["enable"] == false {
plugin.Disabled = true
disabledPlugins = append(disabledPlugins, plugin.Name)
} else {
enabledPlugins = append(enabledPlugins, plugin.Name)
}
}
fmt.Print("已运行的插件:")
for _, plugin := range enabledPlugins {
fmt.Print(Colorize(" "+plugin+" ", BlackFg|GreenBg|BoldFm), " ")
}
fmt.Println()
fmt.Print("已禁用的插件:")
for _, plugin := range disabledPlugins {
fmt.Print(Colorize(" "+plugin+" ", BlackFg|RedBg|CrossedOutFm), " ")
}
fmt.Println()
fmt.Println(Bold(Cyan("官网地址: ")), Yellow("https://m7s.live"))
fmt.Println(Bold(Cyan("启动工程: ")), Yellow("https://github.com/langhuihui/monibuca"))
fmt.Println(Bold(Cyan("使用文档: ")), Yellow("https://m7s.live/guide/introduction.html"))
fmt.Println(Bold(Cyan("开发文档: ")), Yellow("https://m7s.live/devel/startup.html"))
fmt.Println(Bold(Cyan("视频教程: ")), Yellow("https://space.bilibili.com/328443019/channel/collectiondetail?sid=514619"))
fmt.Println(Bold(Cyan("远程界面: ")), Yellow("https://console.monibuca.com"))
rp := struct {
UUID string `json:"uuid"`
Machine string `json:"machine"`
Instance string `json:"instance"`
Version string `json:"version"`
OS string `json:"os"`
Arch string `json:"arch"`
}{UUID, id, EngineConfig.GetInstanceId(), version, runtime.GOOS, runtime.GOARCH}
json.NewEncoder(contentBuf).Encode(&rp)
req.Body = ioutil.NopCloser(contentBuf)
if EngineConfig.Secret != "" {
EngineConfig.OnEvent(ctx)
}
var c http.Client
var firstReport = false
c.Do(req)
for {
select {
case event := <-EventBus:
for _, plugin := range Plugins {
if plugin.RawConfig["enable"] != false {
if !plugin.Disabled {
plugin.Config.OnEvent(event)
}
}
EngineConfig.OnEvent(event)
case <-ctx.Done():
return
case <-reportTimer.C:
contentBuf.Reset()
if firstReport {
contentBuf.WriteString(fmt.Sprintf(`{"uuid":"`+UUID+`","streams":%d}`, len(Streams.Map)))
} else {
contentBuf.WriteString(content)
}
req.Body = ioutil.NopCloser(contentBuf)
_, err := c.Do(req)
if err == nil && !firstReport {
firstReport = true
}
c.Do(req)
}
}
}

View File

@@ -41,7 +41,10 @@ func InstallPlugin(config config.Plugin) *Plugin {
if _, ok := Plugins[name]; ok {
return nil
}
if config != EngineConfig {
switch v := config.(type) {
case *GlobalConfig:
v.InitDefaultHttp()
default:
plugin.Logger = log.With(zap.String("plugin", name))
Plugins[name] = plugin
plugins = append(plugins, plugin)
@@ -66,6 +69,7 @@ type Plugin struct {
Modified config.Config //修改过的配置项
*zap.Logger `json:"-"`
saveTimer *time.Timer //用于保存的时候的延迟,防抖
Disabled bool
}
func (opt *Plugin) logHandler(pattern string, handler http.Handler) http.Handler {

118
report.go Normal file
View File

@@ -0,0 +1,118 @@
package engine
import (
"time"
"gopkg.in/yaml.v3"
"m7s.live/engine/v4/common"
)
type ReportCreateStream struct {
StreamPath string
Time int64
}
type ReportCloseStream struct {
StreamPath string
Time int64
}
type ReportAddTrack struct {
Name string
StreamPath string
Time int64
}
type ReportTrackInfo struct {
BPS int
FPS int
Drops int
RBSize int
}
type ReportPulse struct {
StreamPath string
Tracks map[string]ReportTrackInfo
Subscribers map[string]struct {
Type string
Readers map[string]struct {
Delay uint32
}
}
Time int64
}
type Reportor struct {
Subscriber
pulse ReportPulse
}
func (r *Reportor) OnEvent(event any) {
switch v := event.(type) {
case PulseEvent:
r.pulse.Tracks = make(map[string]ReportTrackInfo)
r.pulse.Subscribers = make(map[string]struct {
Type string
Readers map[string]struct {
Delay uint32
}
})
r.Stream.Tracks.Range(func(k string, t common.Track) {
track := t.GetBase()
r.pulse.Tracks[k] = ReportTrackInfo{
BPS: track.BPS,
FPS: track.FPS,
Drops: track.Drops,
RBSize: t.GetRBSize(),
}
})
r.Stream.Subscribers.RangeAll(func(sub ISubscriber, wait *waitTracks) {
suber := sub.GetSubscriber()
r.pulse.Subscribers[suber.ID] = struct {
Type string
Readers map[string]struct {
Delay uint32
}
}{Type: suber.Type, Readers: map[string]struct {
Delay uint32
}{suber.Audio.Name: {Delay: suber.AudioReader.Delay}, suber.Video.Name: {Delay: suber.VideoReader.Delay}}}
})
r.pulse.Time = time.Now().Unix()
EngineConfig.Report("pulse", r.pulse)
case common.Track:
EngineConfig.Report("addtrack", &ReportAddTrack{v.GetBase().Name, r.Stream.Path, time.Now().Unix()})
}
}
func (conf *GlobalConfig) OnEvent(event any) {
if !conf.GetEnableReport() {
conf.Engine.OnEvent(event)
return
}
switch v := event.(type) {
case SEcreate:
conf.Report("create", &ReportCreateStream{v.Target.Path, time.Now().Unix()})
var reportor Reportor
reportor.IsInternal = true
reportor.pulse.StreamPath = v.Target.Path
if Engine.Subscribe(v.Target.Path, &reportor) == nil {
reportor.SubPulse()
}
case SEpublish:
case SErepublish:
case SEKick:
case SEclose:
conf.Report("close", &ReportCloseStream{v.Target.Path, time.Now().Unix()})
case SEwaitClose:
case SEwaitPublish:
case ISubscriber:
case UnsubscribeEvent:
default:
conf.Engine.OnEvent(event)
}
}
func (conf *GlobalConfig) Report(t string, v any) {
out, err := yaml.Marshal(v)
if err == nil {
conf.Engine.OnEvent(append([]byte("type: "+t+"\n"), out...))
}
}

View File

@@ -20,42 +20,6 @@ import (
type StreamState byte
type StreamAction byte
type StateEvent struct {
Action StreamAction
From StreamState
Stream *Stream `json:"-"`
}
func (se StateEvent) Next() (next StreamState, ok bool) {
next, ok = StreamFSM[se.From][se.Action]
return
}
type SEwaitPublish struct {
StateEvent
Publisher IPublisher
}
type SEpublish struct {
StateEvent
}
type SErepublish struct {
StateEvent
}
type SEwaitClose struct {
StateEvent
}
type SEclose struct {
StateEvent
}
type SEKick struct {
}
type UnsubscribeEvent struct {
Subscriber ISubscriber
}
// 四状态机
const (
STATE_WAITPUBLISH StreamState = iota // 等待发布者状态
@@ -286,7 +250,11 @@ func findOrCreateStream(streamPath string, waitTimeout time.Duration) (s *Stream
}
func (r *Stream) action(action StreamAction) (ok bool) {
event := StateEvent{action, r.State, r}
var event StateEvent
event.Target = r
event.Action = action
event.From = r.State
event.Time = time.Now()
var next StreamState
if next, ok = event.Next(); ok {
r.State = next
@@ -314,6 +282,7 @@ func (r *Stream) action(action StreamAction) (ok bool) {
waitTime = time.Millisecond * 10 //没有订阅者也没有配置发布者等待重连时间默认10ms后关闭流
}
r.timeout.Reset(waitTime)
r.Debug("wait publish", zap.Duration("wait", waitTime))
case STATE_PUBLISHING:
if len(r.SEHistory) > 1 {
stateEvent = SErepublish{event}
@@ -392,8 +361,16 @@ func (s *Stream) onSuberClose(sub ISubscriber) {
// 流状态处理中枢,包括接收订阅发布指令等
func (s *Stream) run() {
EventBus <- SEcreate{StreamEvent{Event[*Stream]{Target: s, Time: time.Now()}}}
pulseTicker := time.NewTicker(time.Second * 5)
defer pulseTicker.Stop()
pulseSuber := make(map[ISubscriber]struct{})
for {
select {
case <-pulseTicker.C:
for sub := range pulseSuber {
sub.OnEvent(PulseEvent{CreateEvent(struct{}{})})
}
case <-s.timeout.C:
if s.State == STATE_PUBLISHING {
for sub := range s.Subscribers.internal {
@@ -412,7 +389,10 @@ func (s *Stream) run() {
if lastWriteTime := t.LastWriteTime(); !lastWriteTime.IsZero() && time.Since(lastWriteTime) > s.PublishTimeout {
s.Warn("track timeout", zap.String("name", name), zap.Time("lastWriteTime", lastWriteTime), zap.Duration("timeout", s.PublishTimeout))
delete(s.Tracks.Map.Map, name)
s.Subscribers.Broadcast(TrackRemoved{t})
var event TrackTimeoutEvent
event.Target = t
event.Time = time.Now()
s.Subscribers.Broadcast(event)
}
})
if s.State != STATE_PUBLISHING {
@@ -432,6 +412,8 @@ func (s *Stream) run() {
case action, ok := <-s.actionChan.C:
if ok {
switch v := action.(type) {
case SubPulse:
pulseSuber[v] = struct{}{}
case *util.Promise[IPublisher]:
if s.IsClosed() {
v.Reject(ErrStreamIsClosed)
@@ -491,7 +473,8 @@ func (s *Stream) run() {
if s.Subscribers.Len() == 1 && s.State == STATE_WAITCLOSE {
s.action(ACTION_FIRSTENTER)
}
case ISubscriber:
case Unsubscribe:
delete(pulseSuber, v)
s.onSuberClose(v)
case TrackRemoved:
name := v.GetBase().Name
@@ -534,14 +517,22 @@ func (s *Stream) run() {
}
}
func (s *Stream) AddTrack(t *util.Promise[Track]) {
s.Receive(t)
func (s *Stream) AddTrack(t Track) (promise *util.Promise[Track]) {
promise = util.NewPromise(t)
s.Receive(promise)
return
}
type TrackRemoved struct {
Track
}
type SubPulse struct {
ISubscriber
}
type Unsubscribe ISubscriber
func (s *Stream) RemoveTrack(t Track) {
s.Receive(TrackRemoved{t})
}

View File

@@ -2,6 +2,7 @@ package engine
import (
"context"
"fmt"
"io"
"net"
"strconv"
@@ -164,6 +165,10 @@ func (s *Subscriber) IsPlaying() bool {
return s.TrackPlayer.Context != nil && s.TrackPlayer.Err() == nil
}
func (s *Subscriber) SubPulse() {
s.Stream.Receive(SubPulse{s})
}
func (s *Subscriber) PlayRaw() {
s.PlayBlock(SUBTYPE_RAW)
}
@@ -206,11 +211,11 @@ func (s *Subscriber) PlayBlock(subType byte) {
switch subType {
case SUBTYPE_RAW:
sendVideoFrame = func(frame *AVFrame) {
// println("v", frame.Sequence, frame.AbsTime, s.VideoReader.AbsTime, frame.IFrame)
// fmt.Println("v", frame.Sequence, s.VideoReader.AbsTime, frame.IFrame)
spesic.OnEvent(VideoFrame{frame, s.Video, s.VideoReader.AbsTime, s.VideoReader.GetPTS32(), s.VideoReader.GetDTS32()})
}
sendAudioFrame = func(frame *AVFrame) {
// println("a", frame.Sequence, frame.AbsTime, s.AudioReader.AbsTime)
// fmt.Println("a", frame.Sequence, s.AudioReader.AbsTime)
spesic.OnEvent(AudioFrame{frame, s.Audio, s.AudioReader.AbsTime, s.AudioReader.GetPTS32(), s.AudioReader.GetDTS32()})
}
case SUBTYPE_RTP:
@@ -243,7 +248,8 @@ func (s *Subscriber) PlayBlock(subType byte) {
case SUBTYPE_FLV:
flvHeadCache := make([]byte, 15) //内存复用
sendFlvFrame := func(t byte, ts uint32, avcc ...[]byte) {
// fmt.Println(ts)
// println(t, ts)
fmt.Printf("%d %X %X %d\n",t, avcc[0][0], avcc[0][1], ts)
flvHeadCache[0] = t
result := append(FLVFrame{flvHeadCache[:11]}, avcc...)
dataSize := uint32(util.SizeOfBuffers(avcc))
@@ -273,7 +279,7 @@ func (s *Subscriber) PlayBlock(subType byte) {
sendFlvFrame(codec.FLV_TAG_TYPE_VIDEO, s.VideoReader.AbsTime, frame.AVCC.ToBuffers()...)
}
sendAudioFrame = func(frame *AVFrame) {
// println(frame.Sequence, s.AudioReader.AbsTime, frame.DeltaTime)
// fmt.Println(frame.Sequence, s.AudioReader.AbsTime, frame.DeltaTime)
sendFlvFrame(codec.FLV_TAG_TYPE_AUDIO, s.AudioReader.AbsTime, frame.AVCC.ToBuffers()...)
}
}
@@ -282,43 +288,38 @@ func (s *Subscriber) PlayBlock(subType byte) {
if s.Args.Has(conf.SubModeArgName) {
subMode, _ = strconv.Atoi(s.Args.Get(conf.SubModeArgName))
}
var initState = 0
var videoFrame, audioFrame *AVFrame
var lastAbsTime time.Duration
for ctx.Err() == nil {
if hasVideo {
for ctx.Err() == nil {
s.VideoReader.Read(ctx, subMode)
frame := s.VideoReader.Frame
if frame == nil || ctx.Err() != nil {
videoFrame = s.VideoReader.Frame
if videoFrame == nil || ctx.Err() != nil {
return
}
// fmt.Println("video", s.VideoReader.Track.PreFrame().Sequence-frame.Sequence)
if frame.IFrame && s.VideoReader.DecConfChanged() {
if videoFrame.IFrame && s.VideoReader.DecConfChanged() {
s.VideoReader.ConfSeq = s.VideoReader.Track.SequenceHeadSeq
sendVideoDecConf()
}
if hasAudio {
if audioFrame != nil {
if frame.Timestamp > lastAbsTime {
if videoFrame.Timestamp > audioFrame.Timestamp {
// fmt.Println("switch audio", audioFrame.CanRead)
if audioFrame.CanRead {
sendAudioFrame(audioFrame)
}
audioFrame = nil
videoFrame = frame
lastAbsTime = frame.Timestamp
break
}
} else if lastAbsTime == 0 {
if lastAbsTime = frame.Timestamp; lastAbsTime != 0 {
videoFrame = frame
} else if initState++; initState >= 2 {
break
}
}
}
if !conf.IFrameOnly || frame.IFrame {
sendVideoFrame(frame)
if !conf.IFrameOnly || videoFrame.IFrame {
sendVideoFrame(videoFrame)
} else {
// fmt.Println("skip video", frame.Sequence)
}
@@ -339,8 +340,8 @@ func (s *Subscriber) PlayBlock(subType byte) {
}
}
s.AudioReader.Read(ctx, subMode)
frame := s.AudioReader.Frame
if frame == nil || ctx.Err() != nil {
audioFrame = s.AudioReader.Frame
if audioFrame == nil || ctx.Err() != nil {
return
}
// fmt.Println("audio", s.AudioReader.Track.PreFrame().Sequence-frame.Sequence)
@@ -349,19 +350,17 @@ func (s *Subscriber) PlayBlock(subType byte) {
sendAudioDecConf()
}
if hasVideo && videoFrame != nil {
if frame.Timestamp > lastAbsTime {
if audioFrame.Timestamp > videoFrame.Timestamp {
// fmt.Println("switch video", videoFrame.CanRead)
if videoFrame.CanRead {
sendVideoFrame(videoFrame)
}
videoFrame = nil
audioFrame = frame
lastAbsTime = frame.Timestamp
break
}
}
if frame.Timestamp >= s.AudioReader.SkipTs {
sendAudioFrame(frame)
if audioFrame.Timestamp >= s.AudioReader.SkipTs {
sendAudioFrame(audioFrame)
} else {
// fmt.Println("skip audio", frame.AbsTime, s.AudioReader.SkipTs)
}

View File

@@ -93,7 +93,7 @@ func (s *Subscribers) Delete(suber ISubscriber) {
io := suber.GetSubscriber()
io.Info("suber -1", zap.Int("remains", s.Len()))
if config.Global.EnableSubEvent {
EventBus <- UnsubscribeEvent{suber}
EventBus <- UnsubscribeEvent{CreateEvent(suber)}
}
}

View File

@@ -3,7 +3,6 @@ package track
import (
"go.uber.org/zap"
"m7s.live/engine/v4/codec"
"m7s.live/engine/v4/common"
. "m7s.live/engine/v4/common"
"m7s.live/engine/v4/util"
)
@@ -22,9 +21,7 @@ type Audio struct {
func (a *Audio) Attach() {
if a.Attached.CompareAndSwap(false, true) {
promise := util.NewPromise(common.Track(a))
a.Stream.AddTrack(promise)
if err := promise.Await(); err != nil {
if err := a.Stream.AddTrack(a).Await(); err != nil {
a.Error("attach audio track failed", zap.Error(err))
} else {
a.Info("audio track attached", zap.Uint32("sample rate", a.SampleRate))

View File

@@ -5,6 +5,7 @@ import (
"unsafe"
"github.com/pion/rtp"
"go.uber.org/zap"
. "m7s.live/engine/v4/common"
"m7s.live/engine/v4/config"
"m7s.live/engine/v4/util"
@@ -12,19 +13,27 @@ import (
type 流速控制 struct {
起始时间戳 time.Duration
起始dts time.Duration
等待上限 time.Duration
起始时间 time.Time
}
func (p *流速控制) 重置(绝对时间戳 time.Duration) {
func (p *流速控制) 重置(绝对时间戳 time.Duration, dts time.Duration) {
p.起始时间 = time.Now()
p.起始时间戳 = 绝对时间戳
p.起始dts = dts
// println("重置", p.起始时间.Format("2006-01-02 15:04:05"), p.起始时间戳)
}
func (p *流速控制) 根据起始DTS计算绝对时间戳(dts time.Duration) time.Duration {
if dts < p.起始dts {
dts += 0xFFFFFFFFF
}
return ((dts-p.起始dts)*time.Millisecond + p.起始时间戳*90) / 90
}
func (p *流速控制) 时间戳差(绝对时间戳 time.Duration) time.Duration {
return 绝对时间戳 - p.起始时间戳
}
func (p *流速控制) 控制流速(绝对时间戳 time.Duration) {
func (p *流速控制) 控制流速(绝对时间戳 time.Duration, dts time.Duration) {
数据时间差, 实际时间差 := p.时间戳差(绝对时间戳), time.Since(p.起始时间)
// println("数据时间差", 数据时间差, "实际时间差", 实际时间差, "绝对时间戳", 绝对时间戳, "起始时间戳", p.起始时间戳, "起始时间", p.起始时间.Format("2006-01-02 15:04:05"))
// if 实际时间差 > 数据时间差 {
@@ -41,6 +50,8 @@ func (p *流速控制) 控制流速(绝对时间戳 time.Duration) {
time.Sleep(过快)
}
} else if 过快 < -100*time.Millisecond {
// fmt.Println("过慢毫秒", 过快.Milliseconds())
// p.重置(绝对时间戳, dts)
// println("过慢毫秒", p.name, 过快.Milliseconds())
}
}
@@ -89,6 +100,10 @@ type Media struct {
流速控制
}
func (av *Media) GetRBSize() int {
return av.RingBuffer.Size
}
func (av *Media) GetRTPFromPool() (result *util.ListItem[RTPFrame]) {
result = av.RtpPool.Get()
if result.Value.Packet == nil {
@@ -204,16 +219,29 @@ func (av *Media) AddIDR() {
func (av *Media) Flush() {
curValue, preValue, nextValue := &av.Value, av.LastValue, av.Next()
useDts := curValue.Timestamp == 0
if av.State == TrackStateOffline {
av.State = TrackStateOnline
av.deltaTs = preValue.Timestamp - curValue.Timestamp + time.Duration(preValue.DeltaTime)*time.Millisecond
av.Info("track back online")
if useDts {
av.deltaTs = curValue.DTS - preValue.DTS
curValue.Timestamp = preValue.Timestamp + time.Millisecond
} else {
av.deltaTs = curValue.Timestamp - preValue.Timestamp
}
curValue.DTS += 90
curValue.PTS += 90
curValue.Timestamp += time.Millisecond
av.Info("track back online", zap.Duration("delta", av.deltaTs))
} else if av.deltaTs != 0 {
if useDts {
curValue.DTS -= av.deltaTs
curValue.PTS -= av.deltaTs
} else {
rtpts := av.deltaTs * 90 / time.Millisecond
curValue.DTS -= rtpts
curValue.PTS -= rtpts
curValue.Timestamp -= av.deltaTs
}
if av.deltaTs != 0 {
rtpts := av.deltaTs * 90 / 1000
curValue.DTS = curValue.DTS + rtpts
curValue.PTS = curValue.PTS + rtpts
curValue.Timestamp = 0
}
bufferTime := av.Stream.GetPublisherConfig().BufferTime
if bufferTime > 0 && av.IDRingList.Length > 1 && curValue.Timestamp-av.IDRingList.Next.Next.Value.Value.Timestamp > bufferTime {
@@ -232,17 +260,17 @@ func (av *Media) Flush() {
if av.起始时间.IsZero() {
curValue.DeltaTime = 0
if curValue.Timestamp == 0 {
if useDts {
curValue.Timestamp = time.Since(av.Stream.GetStartTime())
}
av.重置(curValue.Timestamp)
av.重置(curValue.Timestamp, curValue.DTS)
} else {
if curValue.Timestamp == 0 {
curValue.Timestamp = (preValue.Timestamp*90 + (curValue.DTS-preValue.DTS)*time.Millisecond) / 90
if useDts {
curValue.Timestamp = av.根据起始DTS计算绝对时间戳(curValue.DTS)
}
curValue.DeltaTime = uint32((curValue.Timestamp - preValue.Timestamp) / time.Millisecond)
}
// fmt.Println(av.Name, curValue.DTS, curValue.Timestamp, curValue.DeltaTime)
// fmt.Println(av.Name, curValue.Timestamp, curValue.DeltaTime)
if curValue.AUList.Length > 0 {
// 补完RTP
if config.Global.EnableRTP && curValue.RTP.Length == 0 {
@@ -255,7 +283,7 @@ func (av *Media) Flush() {
}
av.Base.Flush(&curValue.BaseFrame)
if av.等待上限 > 0 {
av.控制流速(curValue.Timestamp)
av.控制流速(curValue.Timestamp, curValue.DTS)
}
preValue = curValue
curValue = av.MoveNext()

View File

@@ -6,7 +6,6 @@ import (
"time"
"go.uber.org/zap"
"m7s.live/engine/v4/common"
. "m7s.live/engine/v4/common"
"m7s.live/engine/v4/util"
)
@@ -17,6 +16,10 @@ type Data struct {
sync.Locker // 写入锁,可选,单一协程写入可以不加锁
}
func (d *Data) GetRBSize() int {
return d.LockRing.RingBuffer.Size
}
func (d *Data) ReadRing() *LockRing[any] {
return util.Clone(d.LockRing)
}
@@ -47,9 +50,7 @@ func (d *Data) Play(ctx context.Context, onData func(any) error) error {
}
func (d *Data) Attach() {
promise := util.NewPromise(common.Track(d))
d.Stream.AddTrack(promise)
if err := promise.Await(); err != nil {
if err := d.Stream.AddTrack(d).Await(); err != nil {
d.Error("attach data track failed", zap.Error(err))
} else {
d.Info("data track attached")

View File

@@ -125,7 +125,11 @@ func (r *AVRingReader) Read(ctx context.Context, mode int) (err error) {
r.ReadFrame()
}
r.AbsTime = uint32((r.Frame.Timestamp - r.SkipTs).Milliseconds())
if r.AbsTime == 0 {
r.AbsTime = 1
}
r.Delay = uint32((r.Track.LastValue.Timestamp - r.Frame.Timestamp).Milliseconds())
// fmt.Println(r.Track.Name, r.Delay)
// println(r.Track.Name, r.State, r.Frame.AbsTime, r.SkipTs, r.AbsTime)
return
}
@@ -137,5 +141,5 @@ func (r *AVRingReader) GetDTS32() uint32 {
}
func (r *AVRingReader) ResetAbsTime() {
r.SkipTs = r.Frame.Timestamp
r.AbsTime = 0
r.AbsTime = 1
}

View File

@@ -30,9 +30,7 @@ type Video struct {
func (v *Video) Attach() {
if v.Attached.CompareAndSwap(false, true) {
promise := util.NewPromise(common.Track(v))
v.Stream.AddTrack(promise)
if err := promise.Await(); err != nil {
if err := v.Stream.AddTrack(v).Await(); err != nil {
v.Error("attach video track failed", zap.Error(err))
} else {
v.Info("video track attached", zap.Uint("width", v.Width), zap.Uint("height", v.Height))

View File

@@ -83,6 +83,7 @@ func (b Buffer) SubBuf(start int, length int) Buffer {
return b[start : start+length]
}
// Malloc 扩大原来的buffer的长度返回新增的buffer
func (b *Buffer) Malloc(count int) Buffer {
l := b.Len()
newL := l + count
@@ -96,14 +97,14 @@ func (b *Buffer) Malloc(count int) Buffer {
return b.SubBuf(l, count)
}
func (b *Buffer) Reset() {
*b = b.SubBuf(0, 0)
// Relloc 改变 buffer 到指定大小
func (b *Buffer) Relloc(count int) {
b.Reset()
b.Malloc(count)
}
func (b *Buffer) Glow(n int) {
l := b.Len()
b.Malloc(n)
*b = b.SubBuf(0, l)
func (b *Buffer) Reset() {
*b = b.SubBuf(0, 0)
}
func (b *Buffer) Split(n int) (result net.Buffers) {