diff --git a/README.md b/README.md index 85b2438..f088822 100644 --- a/README.md +++ b/README.md @@ -37,6 +37,7 @@ - 获取所有远端拉流信息 `/api/list/pull` 返回{RemoteURL:"",StreamPath:"",Type:"",StartTime:""} - 获取所有向远端推流信息 `/api/list/push` 返回{RemoteURL:"",StreamPath:"",Type:"",StartTime:""} - 停止推流 `/api/stoppush?url=xxx` 停止向xxx推流 ,成功返回ok +- 插入SEI帧 `/api/insertsei?streamPath=xxx&type=5` 向xxx流内插入SEI帧 ,成功返回ok。type为SEI类型,可选,默认是5 # 引擎默认配置 ```yaml global: @@ -58,12 +59,12 @@ global: pubaudio: true # 是否发布音频流 pubvideo: true # 是否发布视频流 kickexist: false # 剔出已经存在的发布者,用于顶替原有发布者 + insertsei: false # 是否开启插入SEI信息功能 publishtimeout: 10s # 发布流默认过期时间,超过该时间发布者没有恢复流将被删除 delayclosetimeout: 0 # 自动关闭触发后延迟的时间(期间内如果有新的订阅则取消触发关闭),0为关闭该功能,保持连接。 waitclosetimeout: 0 # 发布者断开后等待时间,超过该时间发布者没有恢复流将被删除,0为关闭该功能,由订阅者决定是否删除 buffertime: 0 # 缓存时间,用于时光回溯,0为关闭缓存 idletimeout: 0 # 空闲超时时间,0为不限制 - poll: 20ms # 订阅者轮询时间,伪自选锁等待周期 key: # 发布鉴权key secretargname: secret # 发布鉴权参数名 expireargname: expire # 发布鉴权失效时间参数名 @@ -79,7 +80,6 @@ global: iframeonly: false # 只订阅关键帧 waittimeout: 10s # 等待发布者的超时时间,用于订阅尚未发布的流 writebuffersize: 0 # 订阅者写缓存大小,用于减少io次数,但可能影响实时性 - poll: 20ms # 订阅者轮询时间,伪自选锁等待周期 key: # 订阅鉴权key secretargname: secret # 订阅鉴权参数名 expireargname: expire # 订阅鉴权失效时间参数名 diff --git a/codec/h264.go b/codec/h264.go index d78b781..15102ee 100644 --- a/codec/h264.go +++ b/codec/h264.go @@ -103,7 +103,6 @@ var ( RTMP_KEYFRAME_HEAD = []byte{0x17, 0x01, 0x00, 0x00, 0x00} RTMP_NORMALFRAME_HEAD = []byte{0x27, 0x01, 0x00, 0x00, 0x00} ) -var NALU_SEI_BYTE []byte // H.264/AVC视频编码标准中,整个系统框架被分为了两个层面:视频编码层面(VCL)和网络抽象层面(NAL) // NAL - Network Abstract Layer diff --git a/common/frame.go b/common/frame.go index 6444e22..efa33c7 100644 --- a/common/frame.go +++ b/common/frame.go @@ -44,23 +44,24 @@ func (r *RTPFrame) Unmarshal(raw []byte) *RTPFrame { return r } -type BaseFrame struct { +type DataFrame[T any] struct { DeltaTime uint32 // 相对上一帧时间戳,毫秒 WriteTime time.Time // 写入时间,可用于比较两个帧的先后 Sequence uint32 // 在一个Track中的序号 BytesIn int // 输入字节数用于计算BPS + CanRead bool `json:"-" yaml:"-"` + Data T `json:"-" yaml:"-"` + sync.Cond `json:"-" yaml:"-"` } -type DataFrame[T any] struct { - BaseFrame - Value T `json:"-" yaml:"-"` +func (df *DataFrame[T]) Reset() { + df.BytesIn = 0 + df.DeltaTime = 0 } type AVFrame struct { - BaseFrame + DataFrame[any] IFrame bool - CanRead bool `json:"-" yaml:"-"` - sync.Cond `json:"-" yaml:"-"` PTS time.Duration DTS time.Duration Timestamp time.Duration // 绝对时间戳 @@ -68,7 +69,6 @@ type AVFrame struct { AVCC util.BLL `json:"-" yaml:"-"` // 打包好的AVCC格式(MPEG-4格式、Byte-Stream Format) RTP util.List[RTPFrame] `json:"-" yaml:"-"` AUList util.BLLs `json:"-" yaml:"-"` // 裸数据 - Extras any `json:"-" yaml:"-"` // 任意扩展数据 } func (av *AVFrame) WriteAVCC(ts uint32, frame *util.BLL) { @@ -97,9 +97,8 @@ func (av *AVFrame) Reset() { av.ADTS.Recycle() av.ADTS = nil } - av.BytesIn = 0 av.Timestamp = 0 - av.DeltaTime = 0 + av.DataFrame.Reset() } type ParamaterSets [][]byte diff --git a/common/index.go b/common/index.go index 67f1264..0b970e4 100644 --- a/common/index.go +++ b/common/index.go @@ -22,24 +22,25 @@ const ( ) // Base 基础Track类 -type Base struct { - Name string - log.Zap `json:"-" yaml:"-"` - Stream IStream `json:"-" yaml:"-"` - Attached atomic.Bool `json:"-" yaml:"-"` - State TrackState - ts time.Time - bytes int - frames int - DropCount int `json:"-" yaml:"-"` //丢帧数 - BPS int - FPS int - Drops int // 丢帧率 - RawSize int // 裸数据长度 - RawPart []int // 裸数据片段用于UI上显示 +type Base[T any] struct { + RingBuffer[T] `json:"-" yaml:"-"` + Name string + log.Zap `json:"-" yaml:"-"` + Stream IStream `json:"-" yaml:"-"` + Attached atomic.Bool `json:"-" yaml:"-"` + State TrackState + ts time.Time + bytes int + frames int + DropCount int `json:"-" yaml:"-"` //丢帧数 + BPS int + FPS int + Drops int // 丢帧率 + RawSize int // 裸数据长度 + RawPart []int // 裸数据片段用于UI上显示 } -func (bt *Base) ComputeBPS(bytes int) { +func (bt *Base[T]) ComputeBPS(bytes int) { bt.bytes += bytes bt.frames++ if elapse := time.Since(bt.ts).Seconds(); elapse > 1 { @@ -53,22 +54,31 @@ func (bt *Base) ComputeBPS(bytes int) { } } -func (bt *Base) GetBase() *Base { - return bt +func (bt *Base[T]) GetName() string { + return bt.Name +} + +func (bt *Base[T]) GetBPS() int { + return bt.BPS +} + +func (bt *Base[T]) GetFPS() int { + return bt.FPS +} + +func (bt *Base[T]) GetDrops() int { + return bt.Drops } // GetRBSize 获取缓冲区大小 -func (bt *Base) GetRBSize() int { - return 0 +func (bt *Base[T]) GetRBSize() int { + return bt.RingBuffer.Size } -func (bt *Base) SnapForJson() { +func (bt *Base[T]) SnapForJson() { } -func (bt *Base) Flush(bf *BaseFrame) { - bt.ComputeBPS(bf.BytesIn) - bf.WriteTime = time.Now() -} -func (bt *Base) SetStuff(stuff ...any) { + +func (bt *Base[T]) SetStuff(stuff ...any) { for _, s := range stuff { switch v := s.(type) { case IStream: @@ -83,11 +93,15 @@ func (bt *Base) SetStuff(stuff ...any) { } type Track interface { - GetBase() *Base + GetName() string + GetBPS() int + GetFPS() int + GetDrops() int LastWriteTime() time.Time SnapForJson() SetStuff(stuff ...any) GetRBSize() int + Dispose() } type AVTrack interface { diff --git a/common/ring.go b/common/ring.go index e11b465..8aa7589 100644 --- a/common/ring.go +++ b/common/ring.go @@ -34,8 +34,11 @@ func (rb *RingBuffer[T]) Glow(size int) (newItem *util.Ring[T]) { return } -func (rb *RingBuffer[T]) Reduce(size int) (newItem *util.Ring[T]) { - newItem = rb.Unlink(size) +func (rb *RingBuffer[T]) Reduce(size int) (newItem *RingBuffer[T]) { + newItem = &RingBuffer[T]{ + Ring: rb.Unlink(size), + Size: size, + } rb.Size -= size return } @@ -45,7 +48,7 @@ func (rb *RingBuffer[T]) Reduce(size int) (newItem *util.Ring[T]) { func (rb *RingBuffer[T]) Do(f func(*T)) { if rb != nil { f(&rb.Value) - for p := rb.Next(); p != rb.Ring; p = rb.Next() { + for p := rb.Next(); p != rb.Ring; p = p.Next() { f(&p.Value) } } diff --git a/common/ring_lock.go b/common/ring_lock.go deleted file mode 100644 index 33f76e7..0000000 --- a/common/ring_lock.go +++ /dev/null @@ -1,77 +0,0 @@ -package common - -import ( - "sync" - "sync/atomic" -) - -type LockFrame[T any] struct { - DataFrame[T] - sync.RWMutex -} - -type LockRing[T any] struct { - RingBuffer[LockFrame[T]] - Reset func(*DataFrame[T]) `json:"-" yaml:"-"` - Flag *int32 -} - -func (lr *LockRing[T]) Init(n int) *LockRing[T] { - var flag int32 - if lr == nil { - lr = &LockRing[T]{} - } - lr.Reset = func(*DataFrame[T]) { - } - lr.RingBuffer.Init(n) - lr.Flag = &flag - lr.RingBuffer.Value.Lock() - return lr -} - -func (rb *LockRing[T]) Read() *DataFrame[T] { - current := &rb.RingBuffer.Value - current.RLock() - defer current.RUnlock() - return ¤t.DataFrame -} - -// func (rb *LockRing[T]) Step() { -// if atomic.CompareAndSwapInt32(rb.Flag, 0, 1) { -// current := rb.RingBuffer.MoveNext() -// current.Lock() -// rb.RingBuffer.LastValue.Unlock() -// //Flag不为1代表被Dispose了,但尚未处理Done -// if !atomic.CompareAndSwapInt32(rb.Flag, 1, 0) { -// current.Unlock() -// } -// } -// } - -func (rb *LockRing[T]) Write(value T) { - rb.Value.Value = value - if atomic.CompareAndSwapInt32(rb.Flag, 0, 1) { - current := rb.RingBuffer.MoveNext() - current.Lock() - if current.Sequence != 0 { - rb.Reset(¤t.DataFrame) - } - current.Sequence = rb.RingBuffer.MoveCount - rb.LastValue.Unlock() - //Flag不为1代表被Dispose了,但尚未处理Done - if !atomic.CompareAndSwapInt32(rb.Flag, 1, 0) { - current.Unlock() - } - } -} - -func (rb *LockRing[T]) Dispose() { - current := &rb.RingBuffer.Value - if atomic.CompareAndSwapInt32(rb.Flag, 0, 2) { - current.Unlock() - } else if atomic.CompareAndSwapInt32(rb.Flag, 1, 2) { - //当前是1代表正在写入,此时变成2,但是Done的任务得交给NextW来处理 - } else if atomic.CompareAndSwapInt32(rb.Flag, 0, 2) { - current.Unlock() - } -} diff --git a/config/http.go b/config/http.go index 3396991..9be1562 100644 --- a/config/http.go +++ b/config/http.go @@ -73,6 +73,7 @@ func (config *HTTP) Listen(ctx context.Context) error { } else { log.Info("🌐 https listen at ", Blink(config.ListenAddrTLS)) } + cer, _ := tls.X509KeyPair(LocalCert, LocalKey) var server = http.Server{ Addr: config.ListenAddrTLS, ReadTimeout: config.ReadTimeout, @@ -80,6 +81,7 @@ func (config *HTTP) Listen(ctx context.Context) error { IdleTimeout: config.IdleTimeout, Handler: config.mux, TLSConfig: &tls.Config{ + Certificates: []tls.Certificate{cer}, CipherSuites: []uint16{ tls.TLS_AES_128_GCM_SHA256, tls.TLS_CHACHA20_POLY1305_SHA256, diff --git a/config/remote.go b/config/remote.go index 7126ae6..15ee064 100644 --- a/config/remote.go +++ b/config/remote.go @@ -39,7 +39,29 @@ func (w *myResponseWriter3) Hijack() (net.Conn, *bufio.ReadWriter, error) { return net.Conn(w), bufio.NewReadWriter(bufio.NewReader(w), bufio.NewWriter(w)), nil } -func (cfg *Engine) Remote(ctx context.Context) error { +func (cfg *Engine) WtRemote(ctx context.Context) { + retryDelay := [...]int{2, 3, 5, 8, 13} + for i := 0; ctx.Err() == nil; i++ { + connected, err := cfg.Remote(ctx) + if err == nil { + //不需要重试了,服务器返回了错误 + return + } + if Global.LogLang == "zh" { + log.Error("连接到控制台服务器", cfg.Server, "失败", err) + } else { + log.Error("connect to console server ", cfg.Server, " ", err) + } + if connected { + i = 0 + } else if i >= 5 { + i = 4 + } + time.Sleep(time.Second * time.Duration(retryDelay[i])) + } +} + +func (cfg *Engine) Remote(ctx context.Context) (wasConnected bool, err error) { tlsConf := &tls.Config{ InsecureSkipVerify: true, NextProtos: []string{"monibuca"}, @@ -49,7 +71,7 @@ func (cfg *Engine) Remote(ctx context.Context) error { KeepAlivePeriod: time.Second * 10, EnableDatagrams: true, }) - wasConnected := err == nil + wasConnected = err == nil if stream := quic.Stream(nil); err == nil { if stream, err = conn.OpenStreamSync(ctx); err == nil { _, err = stream.Write(append([]byte{1}, (cfg.Secret + "\n")...)) @@ -63,7 +85,7 @@ func (cfg *Engine) Remote(ctx context.Context) error { } else { log.Error("response from console server ", cfg.Server, " ", rMessage["msg"]) } - return nil + return false, nil } else { cfg.reportStream = stream if Global.LogLang == "zh" { @@ -90,21 +112,7 @@ func (cfg *Engine) Remote(ctx context.Context) error { go cfg.ReceiveRequest(s, conn) } } - - if err != nil { - if wasConnected { - if Global.LogLang == "zh" { - log.Error("连接到控制台服务器", cfg.Server, "失败", err) - } else { - log.Error("connect to console server ", cfg.Server, " ", err) - } - } - if ctx.Err() == nil { - go cfg.Remote(ctx) - } - } - - return err + return wasConnected, err } func (cfg *Engine) ReceiveRequest(s quic.Stream, conn quic.Connection) error { diff --git a/config/types.go b/config/types.go index c28618d..a276789 100755 --- a/config/types.go +++ b/config/types.go @@ -30,6 +30,7 @@ type PushConfig interface { type Publish struct { PubAudio bool `default:"true"` PubVideo bool `default:"true"` + InsertSEI bool // 是否启用SEI插入 KickExist bool // 是否踢掉已经存在的发布者 PublishTimeout time.Duration `default:"10s"` // 发布无数据超时 WaitCloseTimeout time.Duration // 延迟自动关闭(等待重连) @@ -60,7 +61,7 @@ type Subscribe struct { IFrameOnly bool // 只要关键帧 WaitTimeout time.Duration `default:"10s"` // 等待流超时 WriteBufferSize int `default:"0"` // 写缓冲大小 - Poll time.Duration `default:"20ms"` // 读取Ring时的轮询间隔,单位毫秒 + // Poll time.Duration `default:"20ms"` // 读取Ring时的轮询间隔,单位毫秒 Key string // 订阅鉴权key SecretArgName string `default:"secret"` // 订阅鉴权参数名 ExpireArgName string `default:"expire"` // 订阅鉴权失效时间参数名 @@ -114,7 +115,7 @@ func (p *Push) AddPush(url string, streamPath string) { } type Console struct { - Server string `default:"console.monibuca.com:4242"` //远程控制台地址 + Server string `default:"console.monibuca.com:44944"` //远程控制台地址 Secret string //远程控制台密钥 PublicAddr string //公网地址,提供远程控制台访问的地址,不配置的话使用自动识别的地址 PublicAddrTLS string @@ -154,6 +155,7 @@ var Global *Engine func (cfg *Engine) InitDefaultHttp() { Global = cfg cfg.HTTP.mux = http.DefaultServeMux + cfg.HTTP.ListenAddrTLS = ":8443" cfg.HTTP.ListenAddr = ":8080" } @@ -244,7 +246,7 @@ func (cfg *Engine) OnEvent(event any) { if strings.HasPrefix(cfg.Console.Server, "wss") { go cfg.WsRemote() } else { - go cfg.Remote(v) + go cfg.WtRemote(v) } } } diff --git a/http.go b/http.go index 4d6cbc5..0169b03 100644 --- a/http.go +++ b/http.go @@ -2,6 +2,7 @@ package engine import ( "encoding/json" + "io" "net/http" "os" "strconv" @@ -341,3 +342,33 @@ func (conf *GlobalConfig) API_replay_mp4(w http.ResponseWriter, r *http.Request) go pub.ReadMP4Data(f) } } + +func (conf *GlobalConfig) API_insertSEI(w http.ResponseWriter, r *http.Request) { + q := r.URL.Query() + streamPath := q.Get("streamPath") + s := Streams.Get(streamPath) + if s == nil { + http.Error(w, NO_SUCH_STREAM, http.StatusNotFound) + return + } + t := q.Get("type") + tb, err := strconv.ParseInt(t, 10, 8) + if err != nil { + if t == "" { + tb = 5 + } else { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + } + sei, err := io.ReadAll(r.Body) + if err == nil { + if s.Tracks.AddSEI(byte(tb), sei) { + w.Write([]byte("ok")) + } else { + http.Error(w, "no sei track", http.StatusBadRequest) + } + } else { + http.Error(w, err.Error(), http.StatusBadRequest) + } +} diff --git a/io.go b/io.go index f6fed29..8056050 100644 --- a/io.go +++ b/io.go @@ -165,9 +165,6 @@ func (io *IO) receive(streamPath string, specific IIO) error { io.Context, io.CancelFunc = context.WithCancel(Engine) } s, create := findOrCreateStream(u.Path, wt) - if s == nil { - return ErrBadStreamName - } io.Stream = s io.Spesific = specific io.StartTime = time.Now() @@ -182,6 +179,8 @@ func (io *IO) receive(streamPath string, specific IIO) error { conf := v.GetPublisher().Config io.Type = strings.TrimSuffix(io.Type, "Publisher") io.Info("publish") + s.pubLocker.Lock() + defer s.pubLocker.Unlock() oldPublisher := s.Publisher if oldPublisher != nil && !oldPublisher.IsClosed() { // 根据配置是否剔出原来的发布者 diff --git a/log/log.go b/log/log.go index 1e2f6c4..0a98f63 100644 --- a/log/log.go +++ b/log/log.go @@ -95,10 +95,8 @@ func (l *Logger) formatLang(msg *string, fields []zapcore.Field) { } func (l *Logger) Trace(msg string, fields ...zap.Field) { - if Trace { - l.formatLang(&msg, fields) - l.Logger.Debug(msg, fields...) - } + l.formatLang(&msg, fields) + l.Logger.Debug(msg, fields...) } func (l *Logger) Debug(msg string, fields ...zap.Field) { diff --git a/main.go b/main.go index 331d25d..c0aca77 100755 --- a/main.go +++ b/main.go @@ -144,16 +144,16 @@ func Run(ctx context.Context, configFile string) (err error) { version = ver } if EngineConfig.LogLang == "zh" { - log.Info("monibuca 引擎版本:", version, Green(" 启动成功")) + log.Info("monibuca ", version, Green(" 启动成功")) } else { - log.Info("monibuca", version, Green(" start success")) + log.Info("monibuca ", version, Green(" start success")) } - var enabledPlugins, disabledPlugins []string + var enabledPlugins, disabledPlugins []*Plugin for _, plugin := range plugins { if plugin.Disabled { - disabledPlugins = append(disabledPlugins, plugin.Name) + disabledPlugins = append(disabledPlugins, plugin) } else { - enabledPlugins = append(enabledPlugins, plugin.Name) + enabledPlugins = append(enabledPlugins, plugin) } } if EngineConfig.LogLang == "zh" { @@ -162,7 +162,7 @@ func Run(ctx context.Context, configFile string) (err error) { fmt.Print("enabled plugins:") } for _, plugin := range enabledPlugins { - fmt.Print(Colorize(" "+plugin+" ", BlackFg|GreenBg|BoldFm), " ") + fmt.Print(Colorize(" "+plugin.Name+" ", BlackFg|GreenBg|BoldFm), " ") } fmt.Println() if EngineConfig.LogLang == "zh" { @@ -171,7 +171,7 @@ func Run(ctx context.Context, configFile string) (err error) { fmt.Print("disabled plugins:") } for _, plugin := range disabledPlugins { - fmt.Print(Colorize(" "+plugin+" ", BlackFg|RedBg|CrossedOutFm), " ") + fmt.Print(Colorize(" "+plugin.Name+" ", BlackFg|RedBg|CrossedOutFm), " ") } fmt.Println() fmt.Println(Bold(Cyan("官网地址: ")), Yellow("https://m7s.live")) @@ -179,6 +179,7 @@ func Run(ctx context.Context, configFile string) (err error) { fmt.Println(Bold(Cyan("文档地址: ")), Yellow("https://docs.m7s.live")) fmt.Println(Bold(Cyan("视频教程: ")), Yellow("https://space.bilibili.com/328443019/channel/collectiondetail?sid=514619")) fmt.Println(Bold(Cyan("远程界面: ")), Yellow("https://console.monibuca.com")) + fmt.Println(Yellow("关注公众号:不卡科技,获取更多信息")) rp := struct { UUID string `json:"uuid"` Machine string `json:"machine"` @@ -194,17 +195,18 @@ func Run(ctx context.Context, configFile string) (err error) { } var c http.Client c.Do(req) + for _, plugin := range enabledPlugins { + plugin.Config.OnEvent(EngineConfig) //引擎初始化完成后,通知插件 + } for { select { case event := <-EventBus: ts := time.Now() - for _, plugin := range Plugins { - if !plugin.Disabled { - ts := time.Now() - plugin.Config.OnEvent(event) - if cost := time.Since(ts); cost > time.Millisecond*100 { - plugin.Warn("event cost too much time", zap.String("event", fmt.Sprintf("%v", event)), zap.Duration("cost", cost)) - } + for _, plugin := range enabledPlugins { + ts := time.Now() + plugin.Config.OnEvent(event) + if cost := time.Since(ts); cost > time.Millisecond*100 { + plugin.Warn("event cost too much time", zap.String("event", fmt.Sprintf("%v", event)), zap.Duration("cost", cost)) } } EngineConfig.OnEvent(event) diff --git a/stream.go b/stream.go index 5e8f671..c35b20c 100644 --- a/stream.go +++ b/stream.go @@ -10,6 +10,8 @@ import ( . "github.com/logrusorgru/aurora" "go.uber.org/zap" + "m7s.live/engine/v4/codec" + "m7s.live/engine/v4/common" . "m7s.live/engine/v4/common" "m7s.live/engine/v4/config" "m7s.live/engine/v4/log" @@ -130,6 +132,7 @@ type StreamTimeoutConfig struct { type Tracks struct { sync.Map MainVideo *track.Video + SEI *track.Data[[]byte] } func (tracks *Tracks) Range(f func(name string, t Track)) { @@ -146,6 +149,11 @@ func (tracks *Tracks) Add(name string, t Track) bool { tracks.MainVideo = v tracks.SetIDR(v) } + if tracks.SEI != nil { + v.SEIReader = &track.DataReader[[]byte]{ + Ring: tracks.SEI.Ring, + } + } case *track.Audio: if tracks.MainVideo != nil { v.Narrow() @@ -165,6 +173,24 @@ func (tracks *Tracks) SetIDR(video Track) { } } +func (tracks *Tracks) AddSEI(t byte, data []byte) bool { + if tracks.SEI != nil { + l := len(data) + var buffer util.Buffer + buffer.WriteByte(byte(codec.NALU_SEI)) + buffer.WriteByte(t) + for l > 255 { + buffer.WriteByte(255) + l -= 255 + } + buffer.WriteByte(byte(l)) + buffer.Write(data) + tracks.SEI.Push(buffer) + return true + } + return false +} + func (tracks *Tracks) MarshalJSON() ([]byte, error) { var trackList []Track tracks.Range(func(_ string, t Track) { @@ -190,6 +216,7 @@ type Stream struct { AppName string StreamName string IsPause bool // 是否处于暂停状态 + pubLocker sync.Mutex } type StreamSummay struct { Path string @@ -222,8 +249,7 @@ func (s *Stream) Summary() (r StreamSummay) { r.Type = s.Publisher.GetPublisher().Type } s.Tracks.Range(func(name string, t Track) { - b := t.GetBase() - r.BPS += b.BPS + r.BPS += t.GetBPS() r.Tracks = append(r.Tracks, name) }) r.Path = s.Path @@ -245,22 +271,20 @@ func findOrCreateStream(streamPath string, waitTimeout time.Duration) (s *Stream log.Warn(Red("Stream Path Format Error:"), streamPath) return nil, false } - if s := Streams.Get(streamPath); s != nil { + actual, loaded := Streams.LoadOrStore(streamPath, &Stream{ + Path: streamPath, + AppName: p[0], + StreamName: strings.Join(p[1:], "/"), + StartTime: time.Now(), + }) + if s := actual.(*Stream); loaded { s.Debug("Stream Found") return s, false } else { - p := strings.Split(streamPath, "/") - s = &Stream{ - Path: streamPath, - AppName: p[0], - StreamName: strings.Join(p[1:], "/"), - StartTime: time.Now(), - timeout: time.NewTimer(waitTimeout), - } + s.timeout = time.NewTimer(waitTimeout) s.Subscribers.Init() s.Logger = log.LocaleLogger.With(zap.String("stream", streamPath)) s.Info("created") - Streams.Set(streamPath, s) s.actionChan.Init(1) go s.run() return s, true @@ -430,13 +454,13 @@ func (s *Stream) run() { } s.Tracks.Range(func(name string, t Track) { trackCount++ - if _, ok := t.(track.Custom); ok { - return - } - // track 超过一定时间没有更新数据了 - if lastWriteTime := t.LastWriteTime(); !lastWriteTime.IsZero() && time.Since(lastWriteTime) > timeout { - s.Warn("track timeout", zap.String("name", name), zap.Time("last writetime", lastWriteTime), zap.Duration("timeout", timeout)) - hasTrackTimeout = true + switch t.(type) { + case *track.Video, *track.Audio: + // track 超过一定时间没有更新数据了 + if lastWriteTime := t.LastWriteTime(); !lastWriteTime.IsZero() && time.Since(lastWriteTime) > timeout { + s.Warn("track timeout", zap.String("name", name), zap.Time("last writetime", lastWriteTime), zap.Duration("timeout", timeout)) + hasTrackTimeout = true + } } }) if trackCount == 0 || hasTrackTimeout || (s.Publisher != nil && s.Publisher.IsClosed()) { @@ -470,6 +494,16 @@ func (s *Stream) run() { } if s.action(ACTION_PUBLISH) || republish || kicked { v.Resolve() + if s.Publisher.GetPublisher().Config.InsertSEI { + if s.Tracks.SEI == nil { + s.Tracks.SEI = track.NewDataTrack[[]byte]("sei") + s.Tracks.SEI.Locker = &sync.Mutex{} + s.Tracks.SEI.SetStuff(s) + if s.Tracks.Add("sei", s.Tracks.SEI) { + s.Info("sei track added") + } + } + } } else { v.Reject(ErrBadStreamName) } @@ -526,13 +560,11 @@ func (s *Stream) run() { s.onSuberClose(v) case TrackRemoved: timeOutInfo = zap.String("action", "TrackRemoved") - name := v.GetBase().Name + name := v.GetName() if t, ok := s.Tracks.LoadAndDelete(name); ok { s.Info("track -1", zap.String("name", name)) s.Subscribers.Broadcast(t) - if dt, ok := t.(track.Custom); ok { - dt.Dispose() - } + t.(common.Track).Dispose() } case *util.Promise[Track]: timeOutInfo = zap.String("action", "Track") @@ -540,7 +572,7 @@ func (s *Stream) run() { s.action(ACTION_PUBLISH) } pubConfig := s.GetPublisherConfig() - name := v.Value.GetBase().Name + name := v.Value.GetName() if _, ok := v.Value.(*track.Video); ok && !pubConfig.PubVideo { v.Reject(ErrTrackMute) continue @@ -575,9 +607,7 @@ func (s *Stream) run() { } else { s.Subscribers.Dispose() s.Tracks.Range(func(_ string, t Track) { - if dt, ok := t.(track.Custom); ok { - dt.Dispose() - } + t.Dispose() }) return } diff --git a/subscriber.go b/subscriber.go index e56ef9c..536e082 100644 --- a/subscriber.go +++ b/subscriber.go @@ -148,7 +148,7 @@ func (s *Subscriber) OnEvent(event any) { } func (s *Subscriber) CreateTrackReader(t *track.Media) (result *track.AVRingReader) { - result = track.NewAVRingReader(t, s.Config.Poll) + result = track.NewAVRingReader(t) result.Logger = s.With(zap.String("track", t.Name)) return } @@ -170,7 +170,7 @@ func (s *Subscriber) AddTrack(t Track) bool { default: return false } - s.Info("track+1", zap.String("name", t.GetBase().Name)) + s.Info("track+1", zap.String("name", t.GetName())) return true } @@ -236,11 +236,12 @@ func (s *Subscriber) PlayBlock(subType byte) { var videoSeq, audioSeq uint16 sendVideoFrame = func(frame *AVFrame) { // fmt.Println("v", frame.Sequence, frame.AbsTime, s.VideoReader.AbsTime, frame.IFrame) + delta := uint32(s.VideoReader.SkipTs * 90 / time.Millisecond) frame.RTP.Range(func(vp RTPFrame) bool { videoSeq++ copy := *vp.Packet vp.Packet = © - vp.Header.Timestamp = vp.Header.Timestamp - uint32(s.VideoReader.SkipTs*90/time.Millisecond) + vp.Header.Timestamp = vp.Header.Timestamp - delta vp.Header.SequenceNumber = videoSeq spesic.OnEvent((VideoRTP)(vp)) return true @@ -249,12 +250,13 @@ func (s *Subscriber) PlayBlock(subType byte) { sendAudioFrame = func(frame *AVFrame) { // fmt.Println("a", frame.Sequence, frame.AbsTime, s.AudioReader.AbsTime) + delta := uint32(s.AudioReader.SkipTs / time.Millisecond * time.Duration(s.AudioReader.Track.SampleRate) / 1000) frame.RTP.Range(func(ap RTPFrame) bool { audioSeq++ copy := *ap.Packet ap.Packet = © ap.Header.SequenceNumber = audioSeq - ap.Header.Timestamp = ap.Header.Timestamp - uint32(s.AudioReader.SkipTs/time.Millisecond*time.Duration(s.AudioReader.Track.SampleRate)/1000) + ap.Header.Timestamp = ap.Header.Timestamp - delta spesic.OnEvent((AudioRTP)(ap)) return true }) diff --git a/track/base.go b/track/base.go index 193a617..c166967 100644 --- a/track/base.go +++ b/track/base.go @@ -8,13 +8,16 @@ import ( "go.uber.org/zap" . "m7s.live/engine/v4/common" "m7s.live/engine/v4/config" + "m7s.live/engine/v4/log" "m7s.live/engine/v4/util" ) -type EmptyLocker struct{} +type emptyLocker struct{} -func (EmptyLocker) Lock() {} -func (EmptyLocker) Unlock() {} +func (emptyLocker) Lock() {} +func (emptyLocker) Unlock() {} + +var EmptyLocker emptyLocker type 流速控制 struct { 起始时间戳 time.Duration @@ -70,25 +73,24 @@ type SpesificTrack interface { } type IDRingList struct { - util.List[*util.Ring[AVFrame]] + IDRList util.List[*util.Ring[AVFrame]] IDRing *util.Ring[AVFrame] HistoryRing *util.Ring[AVFrame] } func (p *IDRingList) AddIDR(IDRing *util.Ring[AVFrame]) { - p.PushValue(IDRing) + p.IDRList.PushValue(IDRing) p.IDRing = IDRing } func (p *IDRingList) ShiftIDR() { - p.Shift() - p.HistoryRing = p.Next.Value + p.IDRList.Shift() + p.HistoryRing = p.IDRList.Next.Value } // Media 基础媒体Track类 type Media struct { - Base - RingBuffer[AVFrame] + Base[AVFrame] PayloadType byte IDRingList `json:"-" yaml:"-"` //最近的关键帧位置,首屏渲染 SSRC uint32 @@ -104,6 +106,10 @@ type Media struct { 流速控制 } +func (av *Media) Dispose() { + av.Value.Broadcast() +} + func (av *Media) GetFromPool(b util.IBytes) (item *util.ListItem[util.Buffer]) { if b.Reuse() { item = av.BytesPool.Get(b.Len()) @@ -114,10 +120,6 @@ func (av *Media) GetFromPool(b util.IBytes) (item *util.ListItem[util.Buffer]) { return } -func (av *Media) GetRBSize() int { - return av.RingBuffer.Size -} - func (av *Media) GetRTPFromPool() (result *util.ListItem[RTPFrame]) { result = av.RtpPool.Get() if result.Value.Packet == nil { @@ -158,7 +160,7 @@ func (av *Media) SetStuff(stuff ...any) { switch v := s.(type) { case int: av.Init(v) - av.Value.L = EmptyLocker{} + av.Value.L = EmptyLocker av.SSRC = uint32(uintptr(unsafe.Pointer(av))) av.等待上限 = config.Global.SpeedLimit case uint32: @@ -179,17 +181,6 @@ func (av *Media) LastWriteTime() time.Time { return av.LastValue.WriteTime } -// func (av *Media) Play(ctx context.Context, onMedia func(*AVFrame) error) error { -// for ar := av.ReadRing(); ctx.Err() == nil; ar.MoveNext() { -// ap := ar.Read(ctx) -// if err := onMedia(ap); err != nil { -// // TODO: log err -// return err -// } -// } -// return ctx.Err() -// } - func (av *Media) CurrentFrame() *AVFrame { return &av.Value } @@ -216,9 +207,11 @@ func (av *Media) AppendAuBytes(b ...[]byte) { func (av *Media) narrow(gop int) { if l := av.Size - gop; l > 12 { - // av.Debug("resize", zap.Int("before", av.Size), zap.Int("after", av.Size-5)) + if log.Trace { + av.Trace("resize", zap.Int("before", av.Size), zap.Int("after", av.Size-5)) + } //缩小缓冲环节省内存 - av.Reduce(5).Do(func(v AVFrame) { + av.Reduce(5).Do(func(v *AVFrame) { v.Reset() }) } @@ -283,16 +276,20 @@ func (av *Media) Flush() { curValue.DeltaTime = uint32(deltaTS(curValue.Timestamp, preValue.Timestamp) / time.Millisecond) } - av.Trace("write", zap.Uint32("seq", curValue.Sequence), zap.Duration("dts", curValue.DTS), zap.Duration("dts delta", curValue.DTS-preValue.DTS), zap.Uint32("delta", curValue.DeltaTime), zap.Duration("timestamp", curValue.Timestamp)) + if log.Trace { + av.Trace("write", zap.Uint32("seq", curValue.Sequence), zap.Duration("dts", curValue.DTS), zap.Duration("dts delta", curValue.DTS-preValue.DTS), zap.Uint32("delta", curValue.DeltaTime), zap.Duration("timestamp", curValue.Timestamp), zap.Int("au", curValue.AUList.Length), zap.Int("rtp", curValue.RTP.Length), zap.Int("avcc", curValue.AVCC.ByteLength), zap.Int("raw", curValue.AUList.ByteLength), zap.Int("bps", av.BPS)) + } bufferTime := av.Stream.GetPublisherConfig().BufferTime - if bufferTime > 0 && av.IDRingList.Length > 1 && deltaTS(curValue.Timestamp, av.IDRingList.Next.Next.Value.Value.Timestamp) > bufferTime { + if bufferTime > 0 && av.IDRingList.IDRList.Length > 1 && deltaTS(curValue.Timestamp, av.IDRingList.IDRList.Next.Next.Value.Value.Timestamp) > bufferTime { av.ShiftIDR() av.narrow(int(curValue.Sequence - av.HistoryRing.Value.Sequence)) } // 下一帧为订阅起始帧,即将覆盖,需要扩环 if nextValue == av.IDRing || nextValue == av.HistoryRing { // if av.AVRing.Size < 512 { - // av.Stream.Debug("resize", zap.Int("before", av.Size), zap.Int("after", av.Size+5), zap.String("name", av.Name)) + if log.Trace { + av.Stream.Trace("resize", zap.Int("before", av.Size), zap.Int("after", av.Size+5), zap.String("name", av.Name)) + } av.Glow(5) // } else { // av.Stream.Error("sub ring overflow", zap.Int("size", av.AVRing.Size), zap.String("name", av.Name)) @@ -309,7 +306,8 @@ func (av *Media) Flush() { av.CompleteAVCC(curValue) } } - av.Base.Flush(&curValue.BaseFrame) + av.ComputeBPS(curValue.BytesIn) + curValue.WriteTime = time.Now() if av.等待上限 > 0 { av.控制流速(curValue.Timestamp, curValue.DTS) } @@ -318,7 +316,7 @@ func (av *Media) Flush() { curValue.CanRead = false curValue.Reset() if curValue.L == nil { - curValue.L = EmptyLocker{} + curValue.L = EmptyLocker } curValue.Sequence = av.MoveCount preValue.CanRead = true diff --git a/track/data.go b/track/data.go index c051e99..47d1ea5 100644 --- a/track/data.go +++ b/track/data.go @@ -7,30 +7,13 @@ import ( "go.uber.org/zap" . "m7s.live/engine/v4/common" + "m7s.live/engine/v4/log" "m7s.live/engine/v4/util" ) -type Custom interface { - Track - Dispose() -} - type Data[T any] struct { - Base - LockRing[T] - sync.Locker // 写入锁,可选,单一协程写入可以不加锁 -} - -func (d *Data[T]) GetRBSize() int { - return d.LockRing.RingBuffer.Size -} - -func (d *Data[T]) ReadRing() *LockRing[T] { - return util.Clone(d.LockRing) -} - -func (d *Data[T]) LastWriteTime() time.Time { - return d.LockRing.RingBuffer.LastValue.WriteTime + Base[DataFrame[T]] + sync.Locker `json:"-" yaml:"-"` // 写入锁,可选,单一协程写入可以不加锁 } func (dt *Data[T]) Push(data T) { @@ -38,21 +21,43 @@ func (dt *Data[T]) Push(data T) { dt.Lock() defer dt.Unlock() } - dt.Value.WriteTime = time.Now() - dt.Write(data) + curValue := &dt.Value + if log.Trace { + dt.Trace("push data", zap.Uint32("sequence", curValue.Sequence)) + } + curValue.WriteTime = time.Now() + curValue.Data = data + preValue := curValue + curValue = dt.MoveNext() + curValue.CanRead = false + curValue.Reset() + if curValue.L == nil { + curValue.L = EmptyLocker + } + curValue.Sequence = dt.MoveCount + preValue.CanRead = true + preValue.Broadcast() } -func (d *Data[T]) Play(ctx context.Context, onData func(*DataFrame[T]) error) error { - for r := d.ReadRing(); ctx.Err() == nil; r.MoveNext() { - p := r.Read() - if *r.Flag == 2 { - break - } - if err := onData(p); err != nil { - return err - } +func (d *Data[T]) Play(ctx context.Context, onData func(*DataFrame[T]) error) (err error) { + d.Debug("play data track") + reader := DataReader[T]{ + Ctx: ctx, + Ring: d.Ring, + } + for { + curValue := reader.Read() + if err = ctx.Err(); err != nil { + return + } + if log.Trace { + d.Trace("read data", zap.Uint32("sequence", curValue.Sequence)) + } + if err = onData(curValue); err == nil { + err = ctx.Err() + } + reader.MoveNext() } - return ctx.Err() } func (d *Data[T]) Attach(s IStream) { @@ -64,9 +69,70 @@ func (d *Data[T]) Attach(s IStream) { } } +func (d *Data[T]) Dispose() { + d.Value.Broadcast() +} + +func (d *Data[T]) LastWriteTime() time.Time { + return d.LastValue.WriteTime +} + func NewDataTrack[T any](name string) (dt *Data[T]) { dt = &Data[T]{} dt.Init(10) + dt.Value.L = EmptyLocker + dt.SetStuff(name) + return +} + +type RecycleData[T util.Recyclable] struct { + Data[T] +} + +func (dt *RecycleData[T]) Push(data T) { + if dt.Locker != nil { + dt.Lock() + defer dt.Unlock() + } + curValue := &dt.Value + if log.Trace { + dt.Trace("push data", zap.Uint32("sequence", curValue.Sequence)) + } + curValue.WriteTime = time.Now() + curValue.Data = data + preValue := curValue + curValue = dt.MoveNext() + curValue.CanRead = false + curValue.Reset() + if curValue.L == nil { + curValue.L = EmptyLocker + } else { + curValue.Data.Recycle() + } + curValue.Sequence = dt.MoveCount + preValue.CanRead = true + preValue.Broadcast() +} + +func NewRecycleDataTrack[T util.Recyclable](name string) (dt *RecycleData[T]) { + dt = &RecycleData[T]{} + dt.Init(10) + dt.Value.L = EmptyLocker + dt.SetStuff(name) + return +} + +type BytesData struct { + RecycleData[*util.ListItem[util.Buffer]] + Pool util.BytesPool +} + +func NewBytesDataTrack(name string) (dt *BytesData) { + dt = &BytesData{ + Pool: make(util.BytesPool, 17), + } + dt.Init(10) + dt.Value.L = EmptyLocker dt.SetStuff(name) return } diff --git a/track/h264.go b/track/h264.go index 5bdcbc5..35459bb 100644 --- a/track/h264.go +++ b/track/h264.go @@ -7,6 +7,7 @@ import ( "go.uber.org/zap" "m7s.live/engine/v4/codec" . "m7s.live/engine/v4/common" + "m7s.live/engine/v4/log" "m7s.live/engine/v4/util" ) @@ -32,7 +33,9 @@ func NewH264(stream IStream, stuff ...any) (vt *H264) { func (vt *H264) WriteSliceBytes(slice []byte) { naluType := codec.ParseH264NALUType(slice[0]) - vt.Trace("naluType", zap.Uint8("naluType", naluType.Byte())) + if log.Trace { + vt.Trace("naluType", zap.Uint8("naluType", naluType.Byte())) + } switch naluType { case codec.NALU_SPS: spsInfo, _ := codec.ParseSPS(slice) diff --git a/track/h265.go b/track/h265.go index 138a230..aee72d8 100644 --- a/track/h265.go +++ b/track/h265.go @@ -79,7 +79,7 @@ func (vt *H265) WriteSliceBytes(slice []byte) { func (vt *H265) writeSequenceHead(head []byte) (err error) { vt.WriteSequenceHead(head) if vt.VPS, vt.SPS, vt.PPS, err = codec.ParseVpsSpsPpsFromSeqHeaderWithoutMalloc(vt.SequenceHead); err == nil { - vt.SPSInfo, _ = codec.ParseHevcSPS(vt.SequenceHead) + vt.SPSInfo, _ = codec.ParseHevcSPS(vt.SPS) vt.nalulenSize = (int(vt.SequenceHead[26]) & 0x03) + 1 } else { vt.Error("H265 ParseVpsSpsPps Error") @@ -94,6 +94,7 @@ func (vt *H265) WriteAVCC(ts uint32, frame *util.BLL) (err error) { } b0 := frame.GetByte(0) if isExtHeader := (b0 >> 4) & 0b1000; isExtHeader != 0 { + firstBuffer := frame.Next.Value packetType := b0 & 0b1111 switch packetType { case codec.PacketTypeSequenceStart: @@ -107,18 +108,18 @@ func (vt *H265) WriteAVCC(ts uint32, frame *util.BLL) (err error) { frame.Recycle() return case codec.PacketTypeCodedFrames: - frame.Next.Value[0] = b0 & 0b0111_1111 & 0xFC - frame.Next.Value[1] = 0x01 - copy(frame.Next.Value[2:], frame.Next.Value[5:]) - frame.Next.Value = frame.Next.Value[:frame.Next.Value.Len()-3] + firstBuffer[0] = b0 & 0b0111_1111 & 0xFC + firstBuffer[1] = 0x01 + copy(firstBuffer[2:], firstBuffer[5:]) + frame.Next.Value = firstBuffer[:firstBuffer.Len()-3] frame.ByteLength -= 3 return vt.Video.WriteAVCC(ts, frame) case codec.PacketTypeCodedFramesX: - frame.Next.Value[0] = b0 & 0b0111_1111 & 0xFC - frame.Next.Value[1] = 0x01 - frame.Next.Value[2] = 0 - frame.Next.Value[3] = 0 - frame.Next.Value[4] = 0 + firstBuffer[0] = b0 & 0b0111_1111 & 0xFC + firstBuffer[1] = 0x01 + firstBuffer[2] = 0 + firstBuffer[3] = 0 + firstBuffer[4] = 0 return vt.Video.WriteAVCC(ts, frame) } } else { diff --git a/track/reader-av.go b/track/reader-av.go index a0bc450..39a426a 100644 --- a/track/reader-av.go +++ b/track/reader-av.go @@ -2,7 +2,6 @@ package track import ( "context" - "runtime" "time" "go.uber.org/zap" @@ -24,9 +23,10 @@ const ( type AVRingReader struct { ctx context.Context + mode int Track *Media *util.Ring[common.AVFrame] - wait func() + // wait func() State byte FirstSeq uint32 FirstTs time.Duration @@ -44,17 +44,17 @@ func (r *AVRingReader) DecConfChanged() bool { return r.ConfSeq != r.Track.SequenceHeadSeq } -func NewAVRingReader(t *Media, poll time.Duration) *AVRingReader { +func NewAVRingReader(t *Media) *AVRingReader { r := &AVRingReader{ Track: t, } - if poll == 0 { - r.wait = runtime.Gosched - } else { - r.wait = func() { - time.Sleep(poll) - } - } + // if poll == 0 { + // r.wait = runtime.Gosched + // } else { + // r.wait = func() { + // time.Sleep(poll) + // } + // } return r } @@ -63,7 +63,7 @@ func (r *AVRingReader) ReadFrame() *common.AVFrame { r.Frame.Wait() } // 超过一半的缓冲区大小,说明Reader太慢,需要丢帧 - if r.State == READSTATE_NORMAL && r.Track.LastValue.Sequence-r.Frame.Sequence > uint32(r.Track.Size/2) && r.Track.IDRing != nil && r.Track.IDRing.Value.Sequence > r.Frame.Sequence { + if r.mode != SUBMODE_BUFFER && r.State == READSTATE_NORMAL && r.Track.LastValue.Sequence-r.Frame.Sequence > uint32(r.Track.Size/2) && r.Track.IDRing != nil && r.Track.IDRing.Value.Sequence > r.Frame.Sequence { r.Warn("reader too slow", zap.Uint32("lastSeq", r.Track.LastValue.Sequence), zap.Uint32("seq", r.Frame.Sequence)) r.Ring = r.Track.IDRing return r.ReadFrame() @@ -84,6 +84,7 @@ func (r *AVRingReader) MoveNext() { func (r *AVRingReader) Read(ctx context.Context, mode int) (err error) { r.ctx = ctx + r.mode = mode switch r.State { case READSTATE_INIT: r.Info("start read", zap.Int("mode", mode)) diff --git a/track/reader-data.go b/track/reader-data.go new file mode 100644 index 0000000..ddc7eec --- /dev/null +++ b/track/reader-data.go @@ -0,0 +1,37 @@ +package track + +import ( + "context" + + "m7s.live/engine/v4/common" + "m7s.live/engine/v4/util" +) + +type DataReader[T any] struct { + Ctx context.Context + // common.Track + *util.Ring[common.DataFrame[T]] + // startTime time.Time + // Frame *common.DataFrame[T] + // Delay uint32 + // *log.Logger +} + +func (r *DataReader[T]) Read() (item *common.DataFrame[T]) { + item = &r.Value + if r.Ctx.Err() == nil && !item.CanRead { + item.Wait() + } + return +} + +func (r *DataReader[T]) TryRead() (item *common.DataFrame[T]) { + if item = &r.Value; item.CanRead { + return + } + return nil +} + +func (r *DataReader[T]) MoveNext() { + r.Ring = r.Next() +} diff --git a/track/rtp.go b/track/rtp.go index b956a6b..b894bbd 100644 --- a/track/rtp.go +++ b/track/rtp.go @@ -39,7 +39,7 @@ func (av *Media) WriteRTP(raw *util.ListItem[RTPFrame]) { av.WriteRTPFrame(&frame.Value) // av.Info("rtp", zap.Uint32("ts", (frame.Value.Timestamp)), zap.Int("len", len(frame.Value.Payload)), zap.Bool("marker", frame.Value.Marker), zap.Uint16("seq", frame.Value.SequenceNumber)) } else { - av.Warn("rtp payload is empty", zap.Uint32("ts", (frame.Value.Timestamp)), zap.Any("ext", frame.Value.GetExtensionIDs()), zap.Uint16("seq", frame.Value.SequenceNumber)) + av.Debug("rtp payload is empty", zap.Uint32("ts", (frame.Value.Timestamp)), zap.Any("ext", frame.Value.GetExtensionIDs()), zap.Uint16("seq", frame.Value.SequenceNumber)) frame.Recycle() } } diff --git a/track/video.go b/track/video.go index 2e29a9b..4c913a5 100644 --- a/track/video.go +++ b/track/video.go @@ -24,8 +24,9 @@ type Video struct { lostFlag bool // 是否丢帧 codec.SPSInfo ParamaterSets `json:"-" yaml:"-"` - SPS []byte `json:"-" yaml:"-"` - PPS []byte `json:"-" yaml:"-"` + SPS []byte `json:"-" yaml:"-"` + PPS []byte `json:"-" yaml:"-"` + SEIReader *DataReader[[]byte] `json:"-" yaml:"-"` } func (v *Video) Attach() { @@ -196,7 +197,7 @@ func (vt *Video) insertDCRtp() { packet.SSRC = vt.SSRC packet.Timestamp = uint32(vt.Value.PTS) packet.Marker = false - head.InsertBeforeValue(RTPFrame{&packet, nil}) + head.InsertBeforeValue(RTPFrame{Packet: &packet}) } } @@ -247,6 +248,15 @@ func (vt *Video) CompleteAVCC(rv *AVFrame) { func (vt *Video) Flush() { rv := &vt.Value + if vt.SEIReader != nil { + if seiFrame := vt.SEIReader.TryRead(); seiFrame != nil { + var au util.BLL + content := vt.BytesPool.GetShell(seiFrame.Data) + au.Push(content) + vt.Value.AUList.UnshiftValue(&au) + vt.SEIReader.MoveNext() + } + } if rv.IFrame { vt.computeGOP() vt.Stream.SetIDR(vt) diff --git a/wait-tracks.go b/wait-tracks.go index e71211e..301de12 100644 --- a/wait-tracks.go +++ b/wait-tracks.go @@ -1,6 +1,7 @@ package engine import ( + "m7s.live/engine/v4/common" . "m7s.live/engine/v4/common" "m7s.live/engine/v4/track" "m7s.live/engine/v4/util" @@ -68,15 +69,15 @@ func (w *waitTracks) Accept(t Track) bool { suber := w.Promise.Value switch t.(type) { case *track.Audio: - if w.audio.Accept(t.GetBase().Name) { + if w.audio.Accept(t.GetName()) { suber.OnEvent(t) } case *track.Video: - if w.video.Accept(t.GetBase().Name) { + if w.video.Accept(t.GetName()) { suber.OnEvent(t) } - case track.Custom: - w.data.Accept(t.GetBase().Name) + case common.Track: + w.data.Accept(t.GetName()) suber.OnEvent(t) } if w.NeedWait() {