mirror of
https://github.com/Monibuca/engine.git
synced 2025-10-05 16:46:58 +08:00
feat: add insert sei
feat: add engine init done event refactor: remove ring_lock fix: retry connect to console fix: h265 sps parse error fix: concurrent publish desc: - 增加插入SEI帧的功能 - 增加engine初始化完成事件 - 删除ring_lock,DataTrack和MediaTrack共用一个ring - 修复console无限重连导致远程服务器崩溃问题 - 修复h265 sps解析错误问题 - 修复并发发布导致的问题
This commit is contained in:
@@ -37,6 +37,7 @@
|
|||||||
- 获取所有远端拉流信息 `/api/list/pull` 返回{RemoteURL:"",StreamPath:"",Type:"",StartTime:""}
|
- 获取所有远端拉流信息 `/api/list/pull` 返回{RemoteURL:"",StreamPath:"",Type:"",StartTime:""}
|
||||||
- 获取所有向远端推流信息 `/api/list/push` 返回{RemoteURL:"",StreamPath:"",Type:"",StartTime:""}
|
- 获取所有向远端推流信息 `/api/list/push` 返回{RemoteURL:"",StreamPath:"",Type:"",StartTime:""}
|
||||||
- 停止推流 `/api/stoppush?url=xxx` 停止向xxx推流 ,成功返回ok
|
- 停止推流 `/api/stoppush?url=xxx` 停止向xxx推流 ,成功返回ok
|
||||||
|
- 插入SEI帧 `/api/insertsei?streamPath=xxx&type=5` 向xxx流内插入SEI帧 ,成功返回ok。type为SEI类型,可选,默认是5
|
||||||
# 引擎默认配置
|
# 引擎默认配置
|
||||||
```yaml
|
```yaml
|
||||||
global:
|
global:
|
||||||
@@ -58,12 +59,12 @@ global:
|
|||||||
pubaudio: true # 是否发布音频流
|
pubaudio: true # 是否发布音频流
|
||||||
pubvideo: true # 是否发布视频流
|
pubvideo: true # 是否发布视频流
|
||||||
kickexist: false # 剔出已经存在的发布者,用于顶替原有发布者
|
kickexist: false # 剔出已经存在的发布者,用于顶替原有发布者
|
||||||
|
insertsei: false # 是否开启插入SEI信息功能
|
||||||
publishtimeout: 10s # 发布流默认过期时间,超过该时间发布者没有恢复流将被删除
|
publishtimeout: 10s # 发布流默认过期时间,超过该时间发布者没有恢复流将被删除
|
||||||
delayclosetimeout: 0 # 自动关闭触发后延迟的时间(期间内如果有新的订阅则取消触发关闭),0为关闭该功能,保持连接。
|
delayclosetimeout: 0 # 自动关闭触发后延迟的时间(期间内如果有新的订阅则取消触发关闭),0为关闭该功能,保持连接。
|
||||||
waitclosetimeout: 0 # 发布者断开后等待时间,超过该时间发布者没有恢复流将被删除,0为关闭该功能,由订阅者决定是否删除
|
waitclosetimeout: 0 # 发布者断开后等待时间,超过该时间发布者没有恢复流将被删除,0为关闭该功能,由订阅者决定是否删除
|
||||||
buffertime: 0 # 缓存时间,用于时光回溯,0为关闭缓存
|
buffertime: 0 # 缓存时间,用于时光回溯,0为关闭缓存
|
||||||
idletimeout: 0 # 空闲超时时间,0为不限制
|
idletimeout: 0 # 空闲超时时间,0为不限制
|
||||||
poll: 20ms # 订阅者轮询时间,伪自选锁等待周期
|
|
||||||
key: # 发布鉴权key
|
key: # 发布鉴权key
|
||||||
secretargname: secret # 发布鉴权参数名
|
secretargname: secret # 发布鉴权参数名
|
||||||
expireargname: expire # 发布鉴权失效时间参数名
|
expireargname: expire # 发布鉴权失效时间参数名
|
||||||
@@ -79,7 +80,6 @@ global:
|
|||||||
iframeonly: false # 只订阅关键帧
|
iframeonly: false # 只订阅关键帧
|
||||||
waittimeout: 10s # 等待发布者的超时时间,用于订阅尚未发布的流
|
waittimeout: 10s # 等待发布者的超时时间,用于订阅尚未发布的流
|
||||||
writebuffersize: 0 # 订阅者写缓存大小,用于减少io次数,但可能影响实时性
|
writebuffersize: 0 # 订阅者写缓存大小,用于减少io次数,但可能影响实时性
|
||||||
poll: 20ms # 订阅者轮询时间,伪自选锁等待周期
|
|
||||||
key: # 订阅鉴权key
|
key: # 订阅鉴权key
|
||||||
secretargname: secret # 订阅鉴权参数名
|
secretargname: secret # 订阅鉴权参数名
|
||||||
expireargname: expire # 订阅鉴权失效时间参数名
|
expireargname: expire # 订阅鉴权失效时间参数名
|
||||||
|
@@ -103,7 +103,6 @@ var (
|
|||||||
RTMP_KEYFRAME_HEAD = []byte{0x17, 0x01, 0x00, 0x00, 0x00}
|
RTMP_KEYFRAME_HEAD = []byte{0x17, 0x01, 0x00, 0x00, 0x00}
|
||||||
RTMP_NORMALFRAME_HEAD = []byte{0x27, 0x01, 0x00, 0x00, 0x00}
|
RTMP_NORMALFRAME_HEAD = []byte{0x27, 0x01, 0x00, 0x00, 0x00}
|
||||||
)
|
)
|
||||||
var NALU_SEI_BYTE []byte
|
|
||||||
|
|
||||||
// H.264/AVC视频编码标准中,整个系统框架被分为了两个层面:视频编码层面(VCL)和网络抽象层面(NAL)
|
// H.264/AVC视频编码标准中,整个系统框架被分为了两个层面:视频编码层面(VCL)和网络抽象层面(NAL)
|
||||||
// NAL - Network Abstract Layer
|
// NAL - Network Abstract Layer
|
||||||
|
@@ -44,23 +44,24 @@ func (r *RTPFrame) Unmarshal(raw []byte) *RTPFrame {
|
|||||||
return r
|
return r
|
||||||
}
|
}
|
||||||
|
|
||||||
type BaseFrame struct {
|
type DataFrame[T any] struct {
|
||||||
DeltaTime uint32 // 相对上一帧时间戳,毫秒
|
DeltaTime uint32 // 相对上一帧时间戳,毫秒
|
||||||
WriteTime time.Time // 写入时间,可用于比较两个帧的先后
|
WriteTime time.Time // 写入时间,可用于比较两个帧的先后
|
||||||
Sequence uint32 // 在一个Track中的序号
|
Sequence uint32 // 在一个Track中的序号
|
||||||
BytesIn int // 输入字节数用于计算BPS
|
BytesIn int // 输入字节数用于计算BPS
|
||||||
|
CanRead bool `json:"-" yaml:"-"`
|
||||||
|
Data T `json:"-" yaml:"-"`
|
||||||
|
sync.Cond `json:"-" yaml:"-"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type DataFrame[T any] struct {
|
func (df *DataFrame[T]) Reset() {
|
||||||
BaseFrame
|
df.BytesIn = 0
|
||||||
Value T `json:"-" yaml:"-"`
|
df.DeltaTime = 0
|
||||||
}
|
}
|
||||||
|
|
||||||
type AVFrame struct {
|
type AVFrame struct {
|
||||||
BaseFrame
|
DataFrame[any]
|
||||||
IFrame bool
|
IFrame bool
|
||||||
CanRead bool `json:"-" yaml:"-"`
|
|
||||||
sync.Cond `json:"-" yaml:"-"`
|
|
||||||
PTS time.Duration
|
PTS time.Duration
|
||||||
DTS time.Duration
|
DTS time.Duration
|
||||||
Timestamp time.Duration // 绝对时间戳
|
Timestamp time.Duration // 绝对时间戳
|
||||||
@@ -68,7 +69,6 @@ type AVFrame struct {
|
|||||||
AVCC util.BLL `json:"-" yaml:"-"` // 打包好的AVCC格式(MPEG-4格式、Byte-Stream Format)
|
AVCC util.BLL `json:"-" yaml:"-"` // 打包好的AVCC格式(MPEG-4格式、Byte-Stream Format)
|
||||||
RTP util.List[RTPFrame] `json:"-" yaml:"-"`
|
RTP util.List[RTPFrame] `json:"-" yaml:"-"`
|
||||||
AUList util.BLLs `json:"-" yaml:"-"` // 裸数据
|
AUList util.BLLs `json:"-" yaml:"-"` // 裸数据
|
||||||
Extras any `json:"-" yaml:"-"` // 任意扩展数据
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (av *AVFrame) WriteAVCC(ts uint32, frame *util.BLL) {
|
func (av *AVFrame) WriteAVCC(ts uint32, frame *util.BLL) {
|
||||||
@@ -97,9 +97,8 @@ func (av *AVFrame) Reset() {
|
|||||||
av.ADTS.Recycle()
|
av.ADTS.Recycle()
|
||||||
av.ADTS = nil
|
av.ADTS = nil
|
||||||
}
|
}
|
||||||
av.BytesIn = 0
|
|
||||||
av.Timestamp = 0
|
av.Timestamp = 0
|
||||||
av.DeltaTime = 0
|
av.DataFrame.Reset()
|
||||||
}
|
}
|
||||||
|
|
||||||
type ParamaterSets [][]byte
|
type ParamaterSets [][]byte
|
||||||
|
@@ -22,24 +22,25 @@ const (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// Base 基础Track类
|
// Base 基础Track类
|
||||||
type Base struct {
|
type Base[T any] struct {
|
||||||
Name string
|
RingBuffer[T] `json:"-" yaml:"-"`
|
||||||
log.Zap `json:"-" yaml:"-"`
|
Name string
|
||||||
Stream IStream `json:"-" yaml:"-"`
|
log.Zap `json:"-" yaml:"-"`
|
||||||
Attached atomic.Bool `json:"-" yaml:"-"`
|
Stream IStream `json:"-" yaml:"-"`
|
||||||
State TrackState
|
Attached atomic.Bool `json:"-" yaml:"-"`
|
||||||
ts time.Time
|
State TrackState
|
||||||
bytes int
|
ts time.Time
|
||||||
frames int
|
bytes int
|
||||||
DropCount int `json:"-" yaml:"-"` //丢帧数
|
frames int
|
||||||
BPS int
|
DropCount int `json:"-" yaml:"-"` //丢帧数
|
||||||
FPS int
|
BPS int
|
||||||
Drops int // 丢帧率
|
FPS int
|
||||||
RawSize int // 裸数据长度
|
Drops int // 丢帧率
|
||||||
RawPart []int // 裸数据片段用于UI上显示
|
RawSize int // 裸数据长度
|
||||||
|
RawPart []int // 裸数据片段用于UI上显示
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bt *Base) ComputeBPS(bytes int) {
|
func (bt *Base[T]) ComputeBPS(bytes int) {
|
||||||
bt.bytes += bytes
|
bt.bytes += bytes
|
||||||
bt.frames++
|
bt.frames++
|
||||||
if elapse := time.Since(bt.ts).Seconds(); elapse > 1 {
|
if elapse := time.Since(bt.ts).Seconds(); elapse > 1 {
|
||||||
@@ -53,22 +54,31 @@ func (bt *Base) ComputeBPS(bytes int) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bt *Base) GetBase() *Base {
|
func (bt *Base[T]) GetName() string {
|
||||||
return bt
|
return bt.Name
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bt *Base[T]) GetBPS() int {
|
||||||
|
return bt.BPS
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bt *Base[T]) GetFPS() int {
|
||||||
|
return bt.FPS
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bt *Base[T]) GetDrops() int {
|
||||||
|
return bt.Drops
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetRBSize 获取缓冲区大小
|
// GetRBSize 获取缓冲区大小
|
||||||
func (bt *Base) GetRBSize() int {
|
func (bt *Base[T]) GetRBSize() int {
|
||||||
return 0
|
return bt.RingBuffer.Size
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bt *Base) SnapForJson() {
|
func (bt *Base[T]) SnapForJson() {
|
||||||
}
|
}
|
||||||
func (bt *Base) Flush(bf *BaseFrame) {
|
|
||||||
bt.ComputeBPS(bf.BytesIn)
|
func (bt *Base[T]) SetStuff(stuff ...any) {
|
||||||
bf.WriteTime = time.Now()
|
|
||||||
}
|
|
||||||
func (bt *Base) SetStuff(stuff ...any) {
|
|
||||||
for _, s := range stuff {
|
for _, s := range stuff {
|
||||||
switch v := s.(type) {
|
switch v := s.(type) {
|
||||||
case IStream:
|
case IStream:
|
||||||
@@ -83,11 +93,15 @@ func (bt *Base) SetStuff(stuff ...any) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type Track interface {
|
type Track interface {
|
||||||
GetBase() *Base
|
GetName() string
|
||||||
|
GetBPS() int
|
||||||
|
GetFPS() int
|
||||||
|
GetDrops() int
|
||||||
LastWriteTime() time.Time
|
LastWriteTime() time.Time
|
||||||
SnapForJson()
|
SnapForJson()
|
||||||
SetStuff(stuff ...any)
|
SetStuff(stuff ...any)
|
||||||
GetRBSize() int
|
GetRBSize() int
|
||||||
|
Dispose()
|
||||||
}
|
}
|
||||||
|
|
||||||
type AVTrack interface {
|
type AVTrack interface {
|
||||||
|
@@ -34,8 +34,11 @@ func (rb *RingBuffer[T]) Glow(size int) (newItem *util.Ring[T]) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rb *RingBuffer[T]) Reduce(size int) (newItem *util.Ring[T]) {
|
func (rb *RingBuffer[T]) Reduce(size int) (newItem *RingBuffer[T]) {
|
||||||
newItem = rb.Unlink(size)
|
newItem = &RingBuffer[T]{
|
||||||
|
Ring: rb.Unlink(size),
|
||||||
|
Size: size,
|
||||||
|
}
|
||||||
rb.Size -= size
|
rb.Size -= size
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -45,7 +48,7 @@ func (rb *RingBuffer[T]) Reduce(size int) (newItem *util.Ring[T]) {
|
|||||||
func (rb *RingBuffer[T]) Do(f func(*T)) {
|
func (rb *RingBuffer[T]) Do(f func(*T)) {
|
||||||
if rb != nil {
|
if rb != nil {
|
||||||
f(&rb.Value)
|
f(&rb.Value)
|
||||||
for p := rb.Next(); p != rb.Ring; p = rb.Next() {
|
for p := rb.Next(); p != rb.Ring; p = p.Next() {
|
||||||
f(&p.Value)
|
f(&p.Value)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -1,77 +0,0 @@
|
|||||||
package common
|
|
||||||
|
|
||||||
import (
|
|
||||||
"sync"
|
|
||||||
"sync/atomic"
|
|
||||||
)
|
|
||||||
|
|
||||||
type LockFrame[T any] struct {
|
|
||||||
DataFrame[T]
|
|
||||||
sync.RWMutex
|
|
||||||
}
|
|
||||||
|
|
||||||
type LockRing[T any] struct {
|
|
||||||
RingBuffer[LockFrame[T]]
|
|
||||||
Reset func(*DataFrame[T]) `json:"-" yaml:"-"`
|
|
||||||
Flag *int32
|
|
||||||
}
|
|
||||||
|
|
||||||
func (lr *LockRing[T]) Init(n int) *LockRing[T] {
|
|
||||||
var flag int32
|
|
||||||
if lr == nil {
|
|
||||||
lr = &LockRing[T]{}
|
|
||||||
}
|
|
||||||
lr.Reset = func(*DataFrame[T]) {
|
|
||||||
}
|
|
||||||
lr.RingBuffer.Init(n)
|
|
||||||
lr.Flag = &flag
|
|
||||||
lr.RingBuffer.Value.Lock()
|
|
||||||
return lr
|
|
||||||
}
|
|
||||||
|
|
||||||
func (rb *LockRing[T]) Read() *DataFrame[T] {
|
|
||||||
current := &rb.RingBuffer.Value
|
|
||||||
current.RLock()
|
|
||||||
defer current.RUnlock()
|
|
||||||
return ¤t.DataFrame
|
|
||||||
}
|
|
||||||
|
|
||||||
// func (rb *LockRing[T]) Step() {
|
|
||||||
// if atomic.CompareAndSwapInt32(rb.Flag, 0, 1) {
|
|
||||||
// current := rb.RingBuffer.MoveNext()
|
|
||||||
// current.Lock()
|
|
||||||
// rb.RingBuffer.LastValue.Unlock()
|
|
||||||
// //Flag不为1代表被Dispose了,但尚未处理Done
|
|
||||||
// if !atomic.CompareAndSwapInt32(rb.Flag, 1, 0) {
|
|
||||||
// current.Unlock()
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
func (rb *LockRing[T]) Write(value T) {
|
|
||||||
rb.Value.Value = value
|
|
||||||
if atomic.CompareAndSwapInt32(rb.Flag, 0, 1) {
|
|
||||||
current := rb.RingBuffer.MoveNext()
|
|
||||||
current.Lock()
|
|
||||||
if current.Sequence != 0 {
|
|
||||||
rb.Reset(¤t.DataFrame)
|
|
||||||
}
|
|
||||||
current.Sequence = rb.RingBuffer.MoveCount
|
|
||||||
rb.LastValue.Unlock()
|
|
||||||
//Flag不为1代表被Dispose了,但尚未处理Done
|
|
||||||
if !atomic.CompareAndSwapInt32(rb.Flag, 1, 0) {
|
|
||||||
current.Unlock()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (rb *LockRing[T]) Dispose() {
|
|
||||||
current := &rb.RingBuffer.Value
|
|
||||||
if atomic.CompareAndSwapInt32(rb.Flag, 0, 2) {
|
|
||||||
current.Unlock()
|
|
||||||
} else if atomic.CompareAndSwapInt32(rb.Flag, 1, 2) {
|
|
||||||
//当前是1代表正在写入,此时变成2,但是Done的任务得交给NextW来处理
|
|
||||||
} else if atomic.CompareAndSwapInt32(rb.Flag, 0, 2) {
|
|
||||||
current.Unlock()
|
|
||||||
}
|
|
||||||
}
|
|
@@ -73,6 +73,7 @@ func (config *HTTP) Listen(ctx context.Context) error {
|
|||||||
} else {
|
} else {
|
||||||
log.Info("🌐 https listen at ", Blink(config.ListenAddrTLS))
|
log.Info("🌐 https listen at ", Blink(config.ListenAddrTLS))
|
||||||
}
|
}
|
||||||
|
cer, _ := tls.X509KeyPair(LocalCert, LocalKey)
|
||||||
var server = http.Server{
|
var server = http.Server{
|
||||||
Addr: config.ListenAddrTLS,
|
Addr: config.ListenAddrTLS,
|
||||||
ReadTimeout: config.ReadTimeout,
|
ReadTimeout: config.ReadTimeout,
|
||||||
@@ -80,6 +81,7 @@ func (config *HTTP) Listen(ctx context.Context) error {
|
|||||||
IdleTimeout: config.IdleTimeout,
|
IdleTimeout: config.IdleTimeout,
|
||||||
Handler: config.mux,
|
Handler: config.mux,
|
||||||
TLSConfig: &tls.Config{
|
TLSConfig: &tls.Config{
|
||||||
|
Certificates: []tls.Certificate{cer},
|
||||||
CipherSuites: []uint16{
|
CipherSuites: []uint16{
|
||||||
tls.TLS_AES_128_GCM_SHA256,
|
tls.TLS_AES_128_GCM_SHA256,
|
||||||
tls.TLS_CHACHA20_POLY1305_SHA256,
|
tls.TLS_CHACHA20_POLY1305_SHA256,
|
||||||
|
@@ -39,7 +39,29 @@ func (w *myResponseWriter3) Hijack() (net.Conn, *bufio.ReadWriter, error) {
|
|||||||
return net.Conn(w), bufio.NewReadWriter(bufio.NewReader(w), bufio.NewWriter(w)), nil
|
return net.Conn(w), bufio.NewReadWriter(bufio.NewReader(w), bufio.NewWriter(w)), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cfg *Engine) Remote(ctx context.Context) error {
|
func (cfg *Engine) WtRemote(ctx context.Context) {
|
||||||
|
retryDelay := [...]int{2, 3, 5, 8, 13}
|
||||||
|
for i := 0; ctx.Err() == nil; i++ {
|
||||||
|
connected, err := cfg.Remote(ctx)
|
||||||
|
if err == nil {
|
||||||
|
//不需要重试了,服务器返回了错误
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if Global.LogLang == "zh" {
|
||||||
|
log.Error("连接到控制台服务器", cfg.Server, "失败", err)
|
||||||
|
} else {
|
||||||
|
log.Error("connect to console server ", cfg.Server, " ", err)
|
||||||
|
}
|
||||||
|
if connected {
|
||||||
|
i = 0
|
||||||
|
} else if i >= 5 {
|
||||||
|
i = 4
|
||||||
|
}
|
||||||
|
time.Sleep(time.Second * time.Duration(retryDelay[i]))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cfg *Engine) Remote(ctx context.Context) (wasConnected bool, err error) {
|
||||||
tlsConf := &tls.Config{
|
tlsConf := &tls.Config{
|
||||||
InsecureSkipVerify: true,
|
InsecureSkipVerify: true,
|
||||||
NextProtos: []string{"monibuca"},
|
NextProtos: []string{"monibuca"},
|
||||||
@@ -49,7 +71,7 @@ func (cfg *Engine) Remote(ctx context.Context) error {
|
|||||||
KeepAlivePeriod: time.Second * 10,
|
KeepAlivePeriod: time.Second * 10,
|
||||||
EnableDatagrams: true,
|
EnableDatagrams: true,
|
||||||
})
|
})
|
||||||
wasConnected := err == nil
|
wasConnected = err == nil
|
||||||
if stream := quic.Stream(nil); err == nil {
|
if stream := quic.Stream(nil); err == nil {
|
||||||
if stream, err = conn.OpenStreamSync(ctx); err == nil {
|
if stream, err = conn.OpenStreamSync(ctx); err == nil {
|
||||||
_, err = stream.Write(append([]byte{1}, (cfg.Secret + "\n")...))
|
_, err = stream.Write(append([]byte{1}, (cfg.Secret + "\n")...))
|
||||||
@@ -63,7 +85,7 @@ func (cfg *Engine) Remote(ctx context.Context) error {
|
|||||||
} else {
|
} else {
|
||||||
log.Error("response from console server ", cfg.Server, " ", rMessage["msg"])
|
log.Error("response from console server ", cfg.Server, " ", rMessage["msg"])
|
||||||
}
|
}
|
||||||
return nil
|
return false, nil
|
||||||
} else {
|
} else {
|
||||||
cfg.reportStream = stream
|
cfg.reportStream = stream
|
||||||
if Global.LogLang == "zh" {
|
if Global.LogLang == "zh" {
|
||||||
@@ -90,21 +112,7 @@ func (cfg *Engine) Remote(ctx context.Context) error {
|
|||||||
go cfg.ReceiveRequest(s, conn)
|
go cfg.ReceiveRequest(s, conn)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return wasConnected, err
|
||||||
if err != nil {
|
|
||||||
if wasConnected {
|
|
||||||
if Global.LogLang == "zh" {
|
|
||||||
log.Error("连接到控制台服务器", cfg.Server, "失败", err)
|
|
||||||
} else {
|
|
||||||
log.Error("connect to console server ", cfg.Server, " ", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if ctx.Err() == nil {
|
|
||||||
go cfg.Remote(ctx)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cfg *Engine) ReceiveRequest(s quic.Stream, conn quic.Connection) error {
|
func (cfg *Engine) ReceiveRequest(s quic.Stream, conn quic.Connection) error {
|
||||||
|
@@ -30,6 +30,7 @@ type PushConfig interface {
|
|||||||
type Publish struct {
|
type Publish struct {
|
||||||
PubAudio bool `default:"true"`
|
PubAudio bool `default:"true"`
|
||||||
PubVideo bool `default:"true"`
|
PubVideo bool `default:"true"`
|
||||||
|
InsertSEI bool // 是否启用SEI插入
|
||||||
KickExist bool // 是否踢掉已经存在的发布者
|
KickExist bool // 是否踢掉已经存在的发布者
|
||||||
PublishTimeout time.Duration `default:"10s"` // 发布无数据超时
|
PublishTimeout time.Duration `default:"10s"` // 发布无数据超时
|
||||||
WaitCloseTimeout time.Duration // 延迟自动关闭(等待重连)
|
WaitCloseTimeout time.Duration // 延迟自动关闭(等待重连)
|
||||||
@@ -60,7 +61,7 @@ type Subscribe struct {
|
|||||||
IFrameOnly bool // 只要关键帧
|
IFrameOnly bool // 只要关键帧
|
||||||
WaitTimeout time.Duration `default:"10s"` // 等待流超时
|
WaitTimeout time.Duration `default:"10s"` // 等待流超时
|
||||||
WriteBufferSize int `default:"0"` // 写缓冲大小
|
WriteBufferSize int `default:"0"` // 写缓冲大小
|
||||||
Poll time.Duration `default:"20ms"` // 读取Ring时的轮询间隔,单位毫秒
|
// Poll time.Duration `default:"20ms"` // 读取Ring时的轮询间隔,单位毫秒
|
||||||
Key string // 订阅鉴权key
|
Key string // 订阅鉴权key
|
||||||
SecretArgName string `default:"secret"` // 订阅鉴权参数名
|
SecretArgName string `default:"secret"` // 订阅鉴权参数名
|
||||||
ExpireArgName string `default:"expire"` // 订阅鉴权失效时间参数名
|
ExpireArgName string `default:"expire"` // 订阅鉴权失效时间参数名
|
||||||
@@ -114,7 +115,7 @@ func (p *Push) AddPush(url string, streamPath string) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type Console struct {
|
type Console struct {
|
||||||
Server string `default:"console.monibuca.com:4242"` //远程控制台地址
|
Server string `default:"console.monibuca.com:44944"` //远程控制台地址
|
||||||
Secret string //远程控制台密钥
|
Secret string //远程控制台密钥
|
||||||
PublicAddr string //公网地址,提供远程控制台访问的地址,不配置的话使用自动识别的地址
|
PublicAddr string //公网地址,提供远程控制台访问的地址,不配置的话使用自动识别的地址
|
||||||
PublicAddrTLS string
|
PublicAddrTLS string
|
||||||
@@ -154,6 +155,7 @@ var Global *Engine
|
|||||||
func (cfg *Engine) InitDefaultHttp() {
|
func (cfg *Engine) InitDefaultHttp() {
|
||||||
Global = cfg
|
Global = cfg
|
||||||
cfg.HTTP.mux = http.DefaultServeMux
|
cfg.HTTP.mux = http.DefaultServeMux
|
||||||
|
cfg.HTTP.ListenAddrTLS = ":8443"
|
||||||
cfg.HTTP.ListenAddr = ":8080"
|
cfg.HTTP.ListenAddr = ":8080"
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -244,7 +246,7 @@ func (cfg *Engine) OnEvent(event any) {
|
|||||||
if strings.HasPrefix(cfg.Console.Server, "wss") {
|
if strings.HasPrefix(cfg.Console.Server, "wss") {
|
||||||
go cfg.WsRemote()
|
go cfg.WsRemote()
|
||||||
} else {
|
} else {
|
||||||
go cfg.Remote(v)
|
go cfg.WtRemote(v)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
31
http.go
31
http.go
@@ -2,6 +2,7 @@ package engine
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"strconv"
|
"strconv"
|
||||||
@@ -341,3 +342,33 @@ func (conf *GlobalConfig) API_replay_mp4(w http.ResponseWriter, r *http.Request)
|
|||||||
go pub.ReadMP4Data(f)
|
go pub.ReadMP4Data(f)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (conf *GlobalConfig) API_insertSEI(w http.ResponseWriter, r *http.Request) {
|
||||||
|
q := r.URL.Query()
|
||||||
|
streamPath := q.Get("streamPath")
|
||||||
|
s := Streams.Get(streamPath)
|
||||||
|
if s == nil {
|
||||||
|
http.Error(w, NO_SUCH_STREAM, http.StatusNotFound)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
t := q.Get("type")
|
||||||
|
tb, err := strconv.ParseInt(t, 10, 8)
|
||||||
|
if err != nil {
|
||||||
|
if t == "" {
|
||||||
|
tb = 5
|
||||||
|
} else {
|
||||||
|
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sei, err := io.ReadAll(r.Body)
|
||||||
|
if err == nil {
|
||||||
|
if s.Tracks.AddSEI(byte(tb), sei) {
|
||||||
|
w.Write([]byte("ok"))
|
||||||
|
} else {
|
||||||
|
http.Error(w, "no sei track", http.StatusBadRequest)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
5
io.go
5
io.go
@@ -165,9 +165,6 @@ func (io *IO) receive(streamPath string, specific IIO) error {
|
|||||||
io.Context, io.CancelFunc = context.WithCancel(Engine)
|
io.Context, io.CancelFunc = context.WithCancel(Engine)
|
||||||
}
|
}
|
||||||
s, create := findOrCreateStream(u.Path, wt)
|
s, create := findOrCreateStream(u.Path, wt)
|
||||||
if s == nil {
|
|
||||||
return ErrBadStreamName
|
|
||||||
}
|
|
||||||
io.Stream = s
|
io.Stream = s
|
||||||
io.Spesific = specific
|
io.Spesific = specific
|
||||||
io.StartTime = time.Now()
|
io.StartTime = time.Now()
|
||||||
@@ -182,6 +179,8 @@ func (io *IO) receive(streamPath string, specific IIO) error {
|
|||||||
conf := v.GetPublisher().Config
|
conf := v.GetPublisher().Config
|
||||||
io.Type = strings.TrimSuffix(io.Type, "Publisher")
|
io.Type = strings.TrimSuffix(io.Type, "Publisher")
|
||||||
io.Info("publish")
|
io.Info("publish")
|
||||||
|
s.pubLocker.Lock()
|
||||||
|
defer s.pubLocker.Unlock()
|
||||||
oldPublisher := s.Publisher
|
oldPublisher := s.Publisher
|
||||||
if oldPublisher != nil && !oldPublisher.IsClosed() {
|
if oldPublisher != nil && !oldPublisher.IsClosed() {
|
||||||
// 根据配置是否剔出原来的发布者
|
// 根据配置是否剔出原来的发布者
|
||||||
|
@@ -95,10 +95,8 @@ func (l *Logger) formatLang(msg *string, fields []zapcore.Field) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (l *Logger) Trace(msg string, fields ...zap.Field) {
|
func (l *Logger) Trace(msg string, fields ...zap.Field) {
|
||||||
if Trace {
|
l.formatLang(&msg, fields)
|
||||||
l.formatLang(&msg, fields)
|
l.Logger.Debug(msg, fields...)
|
||||||
l.Logger.Debug(msg, fields...)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *Logger) Debug(msg string, fields ...zap.Field) {
|
func (l *Logger) Debug(msg string, fields ...zap.Field) {
|
||||||
|
30
main.go
30
main.go
@@ -144,16 +144,16 @@ func Run(ctx context.Context, configFile string) (err error) {
|
|||||||
version = ver
|
version = ver
|
||||||
}
|
}
|
||||||
if EngineConfig.LogLang == "zh" {
|
if EngineConfig.LogLang == "zh" {
|
||||||
log.Info("monibuca 引擎版本:", version, Green(" 启动成功"))
|
log.Info("monibuca ", version, Green(" 启动成功"))
|
||||||
} else {
|
} else {
|
||||||
log.Info("monibuca", version, Green(" start success"))
|
log.Info("monibuca ", version, Green(" start success"))
|
||||||
}
|
}
|
||||||
var enabledPlugins, disabledPlugins []string
|
var enabledPlugins, disabledPlugins []*Plugin
|
||||||
for _, plugin := range plugins {
|
for _, plugin := range plugins {
|
||||||
if plugin.Disabled {
|
if plugin.Disabled {
|
||||||
disabledPlugins = append(disabledPlugins, plugin.Name)
|
disabledPlugins = append(disabledPlugins, plugin)
|
||||||
} else {
|
} else {
|
||||||
enabledPlugins = append(enabledPlugins, plugin.Name)
|
enabledPlugins = append(enabledPlugins, plugin)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if EngineConfig.LogLang == "zh" {
|
if EngineConfig.LogLang == "zh" {
|
||||||
@@ -162,7 +162,7 @@ func Run(ctx context.Context, configFile string) (err error) {
|
|||||||
fmt.Print("enabled plugins:")
|
fmt.Print("enabled plugins:")
|
||||||
}
|
}
|
||||||
for _, plugin := range enabledPlugins {
|
for _, plugin := range enabledPlugins {
|
||||||
fmt.Print(Colorize(" "+plugin+" ", BlackFg|GreenBg|BoldFm), " ")
|
fmt.Print(Colorize(" "+plugin.Name+" ", BlackFg|GreenBg|BoldFm), " ")
|
||||||
}
|
}
|
||||||
fmt.Println()
|
fmt.Println()
|
||||||
if EngineConfig.LogLang == "zh" {
|
if EngineConfig.LogLang == "zh" {
|
||||||
@@ -171,7 +171,7 @@ func Run(ctx context.Context, configFile string) (err error) {
|
|||||||
fmt.Print("disabled plugins:")
|
fmt.Print("disabled plugins:")
|
||||||
}
|
}
|
||||||
for _, plugin := range disabledPlugins {
|
for _, plugin := range disabledPlugins {
|
||||||
fmt.Print(Colorize(" "+plugin+" ", BlackFg|RedBg|CrossedOutFm), " ")
|
fmt.Print(Colorize(" "+plugin.Name+" ", BlackFg|RedBg|CrossedOutFm), " ")
|
||||||
}
|
}
|
||||||
fmt.Println()
|
fmt.Println()
|
||||||
fmt.Println(Bold(Cyan("官网地址: ")), Yellow("https://m7s.live"))
|
fmt.Println(Bold(Cyan("官网地址: ")), Yellow("https://m7s.live"))
|
||||||
@@ -179,6 +179,7 @@ func Run(ctx context.Context, configFile string) (err error) {
|
|||||||
fmt.Println(Bold(Cyan("文档地址: ")), Yellow("https://docs.m7s.live"))
|
fmt.Println(Bold(Cyan("文档地址: ")), Yellow("https://docs.m7s.live"))
|
||||||
fmt.Println(Bold(Cyan("视频教程: ")), Yellow("https://space.bilibili.com/328443019/channel/collectiondetail?sid=514619"))
|
fmt.Println(Bold(Cyan("视频教程: ")), Yellow("https://space.bilibili.com/328443019/channel/collectiondetail?sid=514619"))
|
||||||
fmt.Println(Bold(Cyan("远程界面: ")), Yellow("https://console.monibuca.com"))
|
fmt.Println(Bold(Cyan("远程界面: ")), Yellow("https://console.monibuca.com"))
|
||||||
|
fmt.Println(Yellow("关注公众号:不卡科技,获取更多信息"))
|
||||||
rp := struct {
|
rp := struct {
|
||||||
UUID string `json:"uuid"`
|
UUID string `json:"uuid"`
|
||||||
Machine string `json:"machine"`
|
Machine string `json:"machine"`
|
||||||
@@ -194,17 +195,18 @@ func Run(ctx context.Context, configFile string) (err error) {
|
|||||||
}
|
}
|
||||||
var c http.Client
|
var c http.Client
|
||||||
c.Do(req)
|
c.Do(req)
|
||||||
|
for _, plugin := range enabledPlugins {
|
||||||
|
plugin.Config.OnEvent(EngineConfig) //引擎初始化完成后,通知插件
|
||||||
|
}
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case event := <-EventBus:
|
case event := <-EventBus:
|
||||||
ts := time.Now()
|
ts := time.Now()
|
||||||
for _, plugin := range Plugins {
|
for _, plugin := range enabledPlugins {
|
||||||
if !plugin.Disabled {
|
ts := time.Now()
|
||||||
ts := time.Now()
|
plugin.Config.OnEvent(event)
|
||||||
plugin.Config.OnEvent(event)
|
if cost := time.Since(ts); cost > time.Millisecond*100 {
|
||||||
if cost := time.Since(ts); cost > time.Millisecond*100 {
|
plugin.Warn("event cost too much time", zap.String("event", fmt.Sprintf("%v", event)), zap.Duration("cost", cost))
|
||||||
plugin.Warn("event cost too much time", zap.String("event", fmt.Sprintf("%v", event)), zap.Duration("cost", cost))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
EngineConfig.OnEvent(event)
|
EngineConfig.OnEvent(event)
|
||||||
|
84
stream.go
84
stream.go
@@ -10,6 +10,8 @@ import (
|
|||||||
|
|
||||||
. "github.com/logrusorgru/aurora"
|
. "github.com/logrusorgru/aurora"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
"m7s.live/engine/v4/codec"
|
||||||
|
"m7s.live/engine/v4/common"
|
||||||
. "m7s.live/engine/v4/common"
|
. "m7s.live/engine/v4/common"
|
||||||
"m7s.live/engine/v4/config"
|
"m7s.live/engine/v4/config"
|
||||||
"m7s.live/engine/v4/log"
|
"m7s.live/engine/v4/log"
|
||||||
@@ -130,6 +132,7 @@ type StreamTimeoutConfig struct {
|
|||||||
type Tracks struct {
|
type Tracks struct {
|
||||||
sync.Map
|
sync.Map
|
||||||
MainVideo *track.Video
|
MainVideo *track.Video
|
||||||
|
SEI *track.Data[[]byte]
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tracks *Tracks) Range(f func(name string, t Track)) {
|
func (tracks *Tracks) Range(f func(name string, t Track)) {
|
||||||
@@ -146,6 +149,11 @@ func (tracks *Tracks) Add(name string, t Track) bool {
|
|||||||
tracks.MainVideo = v
|
tracks.MainVideo = v
|
||||||
tracks.SetIDR(v)
|
tracks.SetIDR(v)
|
||||||
}
|
}
|
||||||
|
if tracks.SEI != nil {
|
||||||
|
v.SEIReader = &track.DataReader[[]byte]{
|
||||||
|
Ring: tracks.SEI.Ring,
|
||||||
|
}
|
||||||
|
}
|
||||||
case *track.Audio:
|
case *track.Audio:
|
||||||
if tracks.MainVideo != nil {
|
if tracks.MainVideo != nil {
|
||||||
v.Narrow()
|
v.Narrow()
|
||||||
@@ -165,6 +173,24 @@ func (tracks *Tracks) SetIDR(video Track) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (tracks *Tracks) AddSEI(t byte, data []byte) bool {
|
||||||
|
if tracks.SEI != nil {
|
||||||
|
l := len(data)
|
||||||
|
var buffer util.Buffer
|
||||||
|
buffer.WriteByte(byte(codec.NALU_SEI))
|
||||||
|
buffer.WriteByte(t)
|
||||||
|
for l > 255 {
|
||||||
|
buffer.WriteByte(255)
|
||||||
|
l -= 255
|
||||||
|
}
|
||||||
|
buffer.WriteByte(byte(l))
|
||||||
|
buffer.Write(data)
|
||||||
|
tracks.SEI.Push(buffer)
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
func (tracks *Tracks) MarshalJSON() ([]byte, error) {
|
func (tracks *Tracks) MarshalJSON() ([]byte, error) {
|
||||||
var trackList []Track
|
var trackList []Track
|
||||||
tracks.Range(func(_ string, t Track) {
|
tracks.Range(func(_ string, t Track) {
|
||||||
@@ -190,6 +216,7 @@ type Stream struct {
|
|||||||
AppName string
|
AppName string
|
||||||
StreamName string
|
StreamName string
|
||||||
IsPause bool // 是否处于暂停状态
|
IsPause bool // 是否处于暂停状态
|
||||||
|
pubLocker sync.Mutex
|
||||||
}
|
}
|
||||||
type StreamSummay struct {
|
type StreamSummay struct {
|
||||||
Path string
|
Path string
|
||||||
@@ -222,8 +249,7 @@ func (s *Stream) Summary() (r StreamSummay) {
|
|||||||
r.Type = s.Publisher.GetPublisher().Type
|
r.Type = s.Publisher.GetPublisher().Type
|
||||||
}
|
}
|
||||||
s.Tracks.Range(func(name string, t Track) {
|
s.Tracks.Range(func(name string, t Track) {
|
||||||
b := t.GetBase()
|
r.BPS += t.GetBPS()
|
||||||
r.BPS += b.BPS
|
|
||||||
r.Tracks = append(r.Tracks, name)
|
r.Tracks = append(r.Tracks, name)
|
||||||
})
|
})
|
||||||
r.Path = s.Path
|
r.Path = s.Path
|
||||||
@@ -245,22 +271,20 @@ func findOrCreateStream(streamPath string, waitTimeout time.Duration) (s *Stream
|
|||||||
log.Warn(Red("Stream Path Format Error:"), streamPath)
|
log.Warn(Red("Stream Path Format Error:"), streamPath)
|
||||||
return nil, false
|
return nil, false
|
||||||
}
|
}
|
||||||
if s := Streams.Get(streamPath); s != nil {
|
actual, loaded := Streams.LoadOrStore(streamPath, &Stream{
|
||||||
|
Path: streamPath,
|
||||||
|
AppName: p[0],
|
||||||
|
StreamName: strings.Join(p[1:], "/"),
|
||||||
|
StartTime: time.Now(),
|
||||||
|
})
|
||||||
|
if s := actual.(*Stream); loaded {
|
||||||
s.Debug("Stream Found")
|
s.Debug("Stream Found")
|
||||||
return s, false
|
return s, false
|
||||||
} else {
|
} else {
|
||||||
p := strings.Split(streamPath, "/")
|
s.timeout = time.NewTimer(waitTimeout)
|
||||||
s = &Stream{
|
|
||||||
Path: streamPath,
|
|
||||||
AppName: p[0],
|
|
||||||
StreamName: strings.Join(p[1:], "/"),
|
|
||||||
StartTime: time.Now(),
|
|
||||||
timeout: time.NewTimer(waitTimeout),
|
|
||||||
}
|
|
||||||
s.Subscribers.Init()
|
s.Subscribers.Init()
|
||||||
s.Logger = log.LocaleLogger.With(zap.String("stream", streamPath))
|
s.Logger = log.LocaleLogger.With(zap.String("stream", streamPath))
|
||||||
s.Info("created")
|
s.Info("created")
|
||||||
Streams.Set(streamPath, s)
|
|
||||||
s.actionChan.Init(1)
|
s.actionChan.Init(1)
|
||||||
go s.run()
|
go s.run()
|
||||||
return s, true
|
return s, true
|
||||||
@@ -430,13 +454,13 @@ func (s *Stream) run() {
|
|||||||
}
|
}
|
||||||
s.Tracks.Range(func(name string, t Track) {
|
s.Tracks.Range(func(name string, t Track) {
|
||||||
trackCount++
|
trackCount++
|
||||||
if _, ok := t.(track.Custom); ok {
|
switch t.(type) {
|
||||||
return
|
case *track.Video, *track.Audio:
|
||||||
}
|
// track 超过一定时间没有更新数据了
|
||||||
// track 超过一定时间没有更新数据了
|
if lastWriteTime := t.LastWriteTime(); !lastWriteTime.IsZero() && time.Since(lastWriteTime) > timeout {
|
||||||
if lastWriteTime := t.LastWriteTime(); !lastWriteTime.IsZero() && time.Since(lastWriteTime) > timeout {
|
s.Warn("track timeout", zap.String("name", name), zap.Time("last writetime", lastWriteTime), zap.Duration("timeout", timeout))
|
||||||
s.Warn("track timeout", zap.String("name", name), zap.Time("last writetime", lastWriteTime), zap.Duration("timeout", timeout))
|
hasTrackTimeout = true
|
||||||
hasTrackTimeout = true
|
}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
if trackCount == 0 || hasTrackTimeout || (s.Publisher != nil && s.Publisher.IsClosed()) {
|
if trackCount == 0 || hasTrackTimeout || (s.Publisher != nil && s.Publisher.IsClosed()) {
|
||||||
@@ -470,6 +494,16 @@ func (s *Stream) run() {
|
|||||||
}
|
}
|
||||||
if s.action(ACTION_PUBLISH) || republish || kicked {
|
if s.action(ACTION_PUBLISH) || republish || kicked {
|
||||||
v.Resolve()
|
v.Resolve()
|
||||||
|
if s.Publisher.GetPublisher().Config.InsertSEI {
|
||||||
|
if s.Tracks.SEI == nil {
|
||||||
|
s.Tracks.SEI = track.NewDataTrack[[]byte]("sei")
|
||||||
|
s.Tracks.SEI.Locker = &sync.Mutex{}
|
||||||
|
s.Tracks.SEI.SetStuff(s)
|
||||||
|
if s.Tracks.Add("sei", s.Tracks.SEI) {
|
||||||
|
s.Info("sei track added")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
v.Reject(ErrBadStreamName)
|
v.Reject(ErrBadStreamName)
|
||||||
}
|
}
|
||||||
@@ -526,13 +560,11 @@ func (s *Stream) run() {
|
|||||||
s.onSuberClose(v)
|
s.onSuberClose(v)
|
||||||
case TrackRemoved:
|
case TrackRemoved:
|
||||||
timeOutInfo = zap.String("action", "TrackRemoved")
|
timeOutInfo = zap.String("action", "TrackRemoved")
|
||||||
name := v.GetBase().Name
|
name := v.GetName()
|
||||||
if t, ok := s.Tracks.LoadAndDelete(name); ok {
|
if t, ok := s.Tracks.LoadAndDelete(name); ok {
|
||||||
s.Info("track -1", zap.String("name", name))
|
s.Info("track -1", zap.String("name", name))
|
||||||
s.Subscribers.Broadcast(t)
|
s.Subscribers.Broadcast(t)
|
||||||
if dt, ok := t.(track.Custom); ok {
|
t.(common.Track).Dispose()
|
||||||
dt.Dispose()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
case *util.Promise[Track]:
|
case *util.Promise[Track]:
|
||||||
timeOutInfo = zap.String("action", "Track")
|
timeOutInfo = zap.String("action", "Track")
|
||||||
@@ -540,7 +572,7 @@ func (s *Stream) run() {
|
|||||||
s.action(ACTION_PUBLISH)
|
s.action(ACTION_PUBLISH)
|
||||||
}
|
}
|
||||||
pubConfig := s.GetPublisherConfig()
|
pubConfig := s.GetPublisherConfig()
|
||||||
name := v.Value.GetBase().Name
|
name := v.Value.GetName()
|
||||||
if _, ok := v.Value.(*track.Video); ok && !pubConfig.PubVideo {
|
if _, ok := v.Value.(*track.Video); ok && !pubConfig.PubVideo {
|
||||||
v.Reject(ErrTrackMute)
|
v.Reject(ErrTrackMute)
|
||||||
continue
|
continue
|
||||||
@@ -575,9 +607,7 @@ func (s *Stream) run() {
|
|||||||
} else {
|
} else {
|
||||||
s.Subscribers.Dispose()
|
s.Subscribers.Dispose()
|
||||||
s.Tracks.Range(func(_ string, t Track) {
|
s.Tracks.Range(func(_ string, t Track) {
|
||||||
if dt, ok := t.(track.Custom); ok {
|
t.Dispose()
|
||||||
dt.Dispose()
|
|
||||||
}
|
|
||||||
})
|
})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@@ -148,7 +148,7 @@ func (s *Subscriber) OnEvent(event any) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *Subscriber) CreateTrackReader(t *track.Media) (result *track.AVRingReader) {
|
func (s *Subscriber) CreateTrackReader(t *track.Media) (result *track.AVRingReader) {
|
||||||
result = track.NewAVRingReader(t, s.Config.Poll)
|
result = track.NewAVRingReader(t)
|
||||||
result.Logger = s.With(zap.String("track", t.Name))
|
result.Logger = s.With(zap.String("track", t.Name))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -170,7 +170,7 @@ func (s *Subscriber) AddTrack(t Track) bool {
|
|||||||
default:
|
default:
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
s.Info("track+1", zap.String("name", t.GetBase().Name))
|
s.Info("track+1", zap.String("name", t.GetName()))
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -236,11 +236,12 @@ func (s *Subscriber) PlayBlock(subType byte) {
|
|||||||
var videoSeq, audioSeq uint16
|
var videoSeq, audioSeq uint16
|
||||||
sendVideoFrame = func(frame *AVFrame) {
|
sendVideoFrame = func(frame *AVFrame) {
|
||||||
// fmt.Println("v", frame.Sequence, frame.AbsTime, s.VideoReader.AbsTime, frame.IFrame)
|
// fmt.Println("v", frame.Sequence, frame.AbsTime, s.VideoReader.AbsTime, frame.IFrame)
|
||||||
|
delta := uint32(s.VideoReader.SkipTs * 90 / time.Millisecond)
|
||||||
frame.RTP.Range(func(vp RTPFrame) bool {
|
frame.RTP.Range(func(vp RTPFrame) bool {
|
||||||
videoSeq++
|
videoSeq++
|
||||||
copy := *vp.Packet
|
copy := *vp.Packet
|
||||||
vp.Packet = ©
|
vp.Packet = ©
|
||||||
vp.Header.Timestamp = vp.Header.Timestamp - uint32(s.VideoReader.SkipTs*90/time.Millisecond)
|
vp.Header.Timestamp = vp.Header.Timestamp - delta
|
||||||
vp.Header.SequenceNumber = videoSeq
|
vp.Header.SequenceNumber = videoSeq
|
||||||
spesic.OnEvent((VideoRTP)(vp))
|
spesic.OnEvent((VideoRTP)(vp))
|
||||||
return true
|
return true
|
||||||
@@ -249,12 +250,13 @@ func (s *Subscriber) PlayBlock(subType byte) {
|
|||||||
|
|
||||||
sendAudioFrame = func(frame *AVFrame) {
|
sendAudioFrame = func(frame *AVFrame) {
|
||||||
// fmt.Println("a", frame.Sequence, frame.AbsTime, s.AudioReader.AbsTime)
|
// fmt.Println("a", frame.Sequence, frame.AbsTime, s.AudioReader.AbsTime)
|
||||||
|
delta := uint32(s.AudioReader.SkipTs / time.Millisecond * time.Duration(s.AudioReader.Track.SampleRate) / 1000)
|
||||||
frame.RTP.Range(func(ap RTPFrame) bool {
|
frame.RTP.Range(func(ap RTPFrame) bool {
|
||||||
audioSeq++
|
audioSeq++
|
||||||
copy := *ap.Packet
|
copy := *ap.Packet
|
||||||
ap.Packet = ©
|
ap.Packet = ©
|
||||||
ap.Header.SequenceNumber = audioSeq
|
ap.Header.SequenceNumber = audioSeq
|
||||||
ap.Header.Timestamp = ap.Header.Timestamp - uint32(s.AudioReader.SkipTs/time.Millisecond*time.Duration(s.AudioReader.Track.SampleRate)/1000)
|
ap.Header.Timestamp = ap.Header.Timestamp - delta
|
||||||
spesic.OnEvent((AudioRTP)(ap))
|
spesic.OnEvent((AudioRTP)(ap))
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
|
@@ -8,13 +8,16 @@ import (
|
|||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
. "m7s.live/engine/v4/common"
|
. "m7s.live/engine/v4/common"
|
||||||
"m7s.live/engine/v4/config"
|
"m7s.live/engine/v4/config"
|
||||||
|
"m7s.live/engine/v4/log"
|
||||||
"m7s.live/engine/v4/util"
|
"m7s.live/engine/v4/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
type EmptyLocker struct{}
|
type emptyLocker struct{}
|
||||||
|
|
||||||
func (EmptyLocker) Lock() {}
|
func (emptyLocker) Lock() {}
|
||||||
func (EmptyLocker) Unlock() {}
|
func (emptyLocker) Unlock() {}
|
||||||
|
|
||||||
|
var EmptyLocker emptyLocker
|
||||||
|
|
||||||
type 流速控制 struct {
|
type 流速控制 struct {
|
||||||
起始时间戳 time.Duration
|
起始时间戳 time.Duration
|
||||||
@@ -70,25 +73,24 @@ type SpesificTrack interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type IDRingList struct {
|
type IDRingList struct {
|
||||||
util.List[*util.Ring[AVFrame]]
|
IDRList util.List[*util.Ring[AVFrame]]
|
||||||
IDRing *util.Ring[AVFrame]
|
IDRing *util.Ring[AVFrame]
|
||||||
HistoryRing *util.Ring[AVFrame]
|
HistoryRing *util.Ring[AVFrame]
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *IDRingList) AddIDR(IDRing *util.Ring[AVFrame]) {
|
func (p *IDRingList) AddIDR(IDRing *util.Ring[AVFrame]) {
|
||||||
p.PushValue(IDRing)
|
p.IDRList.PushValue(IDRing)
|
||||||
p.IDRing = IDRing
|
p.IDRing = IDRing
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *IDRingList) ShiftIDR() {
|
func (p *IDRingList) ShiftIDR() {
|
||||||
p.Shift()
|
p.IDRList.Shift()
|
||||||
p.HistoryRing = p.Next.Value
|
p.HistoryRing = p.IDRList.Next.Value
|
||||||
}
|
}
|
||||||
|
|
||||||
// Media 基础媒体Track类
|
// Media 基础媒体Track类
|
||||||
type Media struct {
|
type Media struct {
|
||||||
Base
|
Base[AVFrame]
|
||||||
RingBuffer[AVFrame]
|
|
||||||
PayloadType byte
|
PayloadType byte
|
||||||
IDRingList `json:"-" yaml:"-"` //最近的关键帧位置,首屏渲染
|
IDRingList `json:"-" yaml:"-"` //最近的关键帧位置,首屏渲染
|
||||||
SSRC uint32
|
SSRC uint32
|
||||||
@@ -104,6 +106,10 @@ type Media struct {
|
|||||||
流速控制
|
流速控制
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (av *Media) Dispose() {
|
||||||
|
av.Value.Broadcast()
|
||||||
|
}
|
||||||
|
|
||||||
func (av *Media) GetFromPool(b util.IBytes) (item *util.ListItem[util.Buffer]) {
|
func (av *Media) GetFromPool(b util.IBytes) (item *util.ListItem[util.Buffer]) {
|
||||||
if b.Reuse() {
|
if b.Reuse() {
|
||||||
item = av.BytesPool.Get(b.Len())
|
item = av.BytesPool.Get(b.Len())
|
||||||
@@ -114,10 +120,6 @@ func (av *Media) GetFromPool(b util.IBytes) (item *util.ListItem[util.Buffer]) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (av *Media) GetRBSize() int {
|
|
||||||
return av.RingBuffer.Size
|
|
||||||
}
|
|
||||||
|
|
||||||
func (av *Media) GetRTPFromPool() (result *util.ListItem[RTPFrame]) {
|
func (av *Media) GetRTPFromPool() (result *util.ListItem[RTPFrame]) {
|
||||||
result = av.RtpPool.Get()
|
result = av.RtpPool.Get()
|
||||||
if result.Value.Packet == nil {
|
if result.Value.Packet == nil {
|
||||||
@@ -158,7 +160,7 @@ func (av *Media) SetStuff(stuff ...any) {
|
|||||||
switch v := s.(type) {
|
switch v := s.(type) {
|
||||||
case int:
|
case int:
|
||||||
av.Init(v)
|
av.Init(v)
|
||||||
av.Value.L = EmptyLocker{}
|
av.Value.L = EmptyLocker
|
||||||
av.SSRC = uint32(uintptr(unsafe.Pointer(av)))
|
av.SSRC = uint32(uintptr(unsafe.Pointer(av)))
|
||||||
av.等待上限 = config.Global.SpeedLimit
|
av.等待上限 = config.Global.SpeedLimit
|
||||||
case uint32:
|
case uint32:
|
||||||
@@ -179,17 +181,6 @@ func (av *Media) LastWriteTime() time.Time {
|
|||||||
return av.LastValue.WriteTime
|
return av.LastValue.WriteTime
|
||||||
}
|
}
|
||||||
|
|
||||||
// func (av *Media) Play(ctx context.Context, onMedia func(*AVFrame) error) error {
|
|
||||||
// for ar := av.ReadRing(); ctx.Err() == nil; ar.MoveNext() {
|
|
||||||
// ap := ar.Read(ctx)
|
|
||||||
// if err := onMedia(ap); err != nil {
|
|
||||||
// // TODO: log err
|
|
||||||
// return err
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// return ctx.Err()
|
|
||||||
// }
|
|
||||||
|
|
||||||
func (av *Media) CurrentFrame() *AVFrame {
|
func (av *Media) CurrentFrame() *AVFrame {
|
||||||
return &av.Value
|
return &av.Value
|
||||||
}
|
}
|
||||||
@@ -216,9 +207,11 @@ func (av *Media) AppendAuBytes(b ...[]byte) {
|
|||||||
|
|
||||||
func (av *Media) narrow(gop int) {
|
func (av *Media) narrow(gop int) {
|
||||||
if l := av.Size - gop; l > 12 {
|
if l := av.Size - gop; l > 12 {
|
||||||
// av.Debug("resize", zap.Int("before", av.Size), zap.Int("after", av.Size-5))
|
if log.Trace {
|
||||||
|
av.Trace("resize", zap.Int("before", av.Size), zap.Int("after", av.Size-5))
|
||||||
|
}
|
||||||
//缩小缓冲环节省内存
|
//缩小缓冲环节省内存
|
||||||
av.Reduce(5).Do(func(v AVFrame) {
|
av.Reduce(5).Do(func(v *AVFrame) {
|
||||||
v.Reset()
|
v.Reset()
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@@ -283,16 +276,20 @@ func (av *Media) Flush() {
|
|||||||
|
|
||||||
curValue.DeltaTime = uint32(deltaTS(curValue.Timestamp, preValue.Timestamp) / time.Millisecond)
|
curValue.DeltaTime = uint32(deltaTS(curValue.Timestamp, preValue.Timestamp) / time.Millisecond)
|
||||||
}
|
}
|
||||||
av.Trace("write", zap.Uint32("seq", curValue.Sequence), zap.Duration("dts", curValue.DTS), zap.Duration("dts delta", curValue.DTS-preValue.DTS), zap.Uint32("delta", curValue.DeltaTime), zap.Duration("timestamp", curValue.Timestamp))
|
if log.Trace {
|
||||||
|
av.Trace("write", zap.Uint32("seq", curValue.Sequence), zap.Duration("dts", curValue.DTS), zap.Duration("dts delta", curValue.DTS-preValue.DTS), zap.Uint32("delta", curValue.DeltaTime), zap.Duration("timestamp", curValue.Timestamp), zap.Int("au", curValue.AUList.Length), zap.Int("rtp", curValue.RTP.Length), zap.Int("avcc", curValue.AVCC.ByteLength), zap.Int("raw", curValue.AUList.ByteLength), zap.Int("bps", av.BPS))
|
||||||
|
}
|
||||||
bufferTime := av.Stream.GetPublisherConfig().BufferTime
|
bufferTime := av.Stream.GetPublisherConfig().BufferTime
|
||||||
if bufferTime > 0 && av.IDRingList.Length > 1 && deltaTS(curValue.Timestamp, av.IDRingList.Next.Next.Value.Value.Timestamp) > bufferTime {
|
if bufferTime > 0 && av.IDRingList.IDRList.Length > 1 && deltaTS(curValue.Timestamp, av.IDRingList.IDRList.Next.Next.Value.Value.Timestamp) > bufferTime {
|
||||||
av.ShiftIDR()
|
av.ShiftIDR()
|
||||||
av.narrow(int(curValue.Sequence - av.HistoryRing.Value.Sequence))
|
av.narrow(int(curValue.Sequence - av.HistoryRing.Value.Sequence))
|
||||||
}
|
}
|
||||||
// 下一帧为订阅起始帧,即将覆盖,需要扩环
|
// 下一帧为订阅起始帧,即将覆盖,需要扩环
|
||||||
if nextValue == av.IDRing || nextValue == av.HistoryRing {
|
if nextValue == av.IDRing || nextValue == av.HistoryRing {
|
||||||
// if av.AVRing.Size < 512 {
|
// if av.AVRing.Size < 512 {
|
||||||
// av.Stream.Debug("resize", zap.Int("before", av.Size), zap.Int("after", av.Size+5), zap.String("name", av.Name))
|
if log.Trace {
|
||||||
|
av.Stream.Trace("resize", zap.Int("before", av.Size), zap.Int("after", av.Size+5), zap.String("name", av.Name))
|
||||||
|
}
|
||||||
av.Glow(5)
|
av.Glow(5)
|
||||||
// } else {
|
// } else {
|
||||||
// av.Stream.Error("sub ring overflow", zap.Int("size", av.AVRing.Size), zap.String("name", av.Name))
|
// av.Stream.Error("sub ring overflow", zap.Int("size", av.AVRing.Size), zap.String("name", av.Name))
|
||||||
@@ -309,7 +306,8 @@ func (av *Media) Flush() {
|
|||||||
av.CompleteAVCC(curValue)
|
av.CompleteAVCC(curValue)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
av.Base.Flush(&curValue.BaseFrame)
|
av.ComputeBPS(curValue.BytesIn)
|
||||||
|
curValue.WriteTime = time.Now()
|
||||||
if av.等待上限 > 0 {
|
if av.等待上限 > 0 {
|
||||||
av.控制流速(curValue.Timestamp, curValue.DTS)
|
av.控制流速(curValue.Timestamp, curValue.DTS)
|
||||||
}
|
}
|
||||||
@@ -318,7 +316,7 @@ func (av *Media) Flush() {
|
|||||||
curValue.CanRead = false
|
curValue.CanRead = false
|
||||||
curValue.Reset()
|
curValue.Reset()
|
||||||
if curValue.L == nil {
|
if curValue.L == nil {
|
||||||
curValue.L = EmptyLocker{}
|
curValue.L = EmptyLocker
|
||||||
}
|
}
|
||||||
curValue.Sequence = av.MoveCount
|
curValue.Sequence = av.MoveCount
|
||||||
preValue.CanRead = true
|
preValue.CanRead = true
|
||||||
|
130
track/data.go
130
track/data.go
@@ -7,30 +7,13 @@ import (
|
|||||||
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
. "m7s.live/engine/v4/common"
|
. "m7s.live/engine/v4/common"
|
||||||
|
"m7s.live/engine/v4/log"
|
||||||
"m7s.live/engine/v4/util"
|
"m7s.live/engine/v4/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Custom interface {
|
|
||||||
Track
|
|
||||||
Dispose()
|
|
||||||
}
|
|
||||||
|
|
||||||
type Data[T any] struct {
|
type Data[T any] struct {
|
||||||
Base
|
Base[DataFrame[T]]
|
||||||
LockRing[T]
|
sync.Locker `json:"-" yaml:"-"` // 写入锁,可选,单一协程写入可以不加锁
|
||||||
sync.Locker // 写入锁,可选,单一协程写入可以不加锁
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *Data[T]) GetRBSize() int {
|
|
||||||
return d.LockRing.RingBuffer.Size
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *Data[T]) ReadRing() *LockRing[T] {
|
|
||||||
return util.Clone(d.LockRing)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *Data[T]) LastWriteTime() time.Time {
|
|
||||||
return d.LockRing.RingBuffer.LastValue.WriteTime
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dt *Data[T]) Push(data T) {
|
func (dt *Data[T]) Push(data T) {
|
||||||
@@ -38,21 +21,43 @@ func (dt *Data[T]) Push(data T) {
|
|||||||
dt.Lock()
|
dt.Lock()
|
||||||
defer dt.Unlock()
|
defer dt.Unlock()
|
||||||
}
|
}
|
||||||
dt.Value.WriteTime = time.Now()
|
curValue := &dt.Value
|
||||||
dt.Write(data)
|
if log.Trace {
|
||||||
|
dt.Trace("push data", zap.Uint32("sequence", curValue.Sequence))
|
||||||
|
}
|
||||||
|
curValue.WriteTime = time.Now()
|
||||||
|
curValue.Data = data
|
||||||
|
preValue := curValue
|
||||||
|
curValue = dt.MoveNext()
|
||||||
|
curValue.CanRead = false
|
||||||
|
curValue.Reset()
|
||||||
|
if curValue.L == nil {
|
||||||
|
curValue.L = EmptyLocker
|
||||||
|
}
|
||||||
|
curValue.Sequence = dt.MoveCount
|
||||||
|
preValue.CanRead = true
|
||||||
|
preValue.Broadcast()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Data[T]) Play(ctx context.Context, onData func(*DataFrame[T]) error) error {
|
func (d *Data[T]) Play(ctx context.Context, onData func(*DataFrame[T]) error) (err error) {
|
||||||
for r := d.ReadRing(); ctx.Err() == nil; r.MoveNext() {
|
d.Debug("play data track")
|
||||||
p := r.Read()
|
reader := DataReader[T]{
|
||||||
if *r.Flag == 2 {
|
Ctx: ctx,
|
||||||
break
|
Ring: d.Ring,
|
||||||
}
|
}
|
||||||
if err := onData(p); err != nil {
|
for {
|
||||||
return err
|
curValue := reader.Read()
|
||||||
}
|
if err = ctx.Err(); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if log.Trace {
|
||||||
|
d.Trace("read data", zap.Uint32("sequence", curValue.Sequence))
|
||||||
|
}
|
||||||
|
if err = onData(curValue); err == nil {
|
||||||
|
err = ctx.Err()
|
||||||
|
}
|
||||||
|
reader.MoveNext()
|
||||||
}
|
}
|
||||||
return ctx.Err()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Data[T]) Attach(s IStream) {
|
func (d *Data[T]) Attach(s IStream) {
|
||||||
@@ -64,9 +69,70 @@ func (d *Data[T]) Attach(s IStream) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d *Data[T]) Dispose() {
|
||||||
|
d.Value.Broadcast()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *Data[T]) LastWriteTime() time.Time {
|
||||||
|
return d.LastValue.WriteTime
|
||||||
|
}
|
||||||
|
|
||||||
func NewDataTrack[T any](name string) (dt *Data[T]) {
|
func NewDataTrack[T any](name string) (dt *Data[T]) {
|
||||||
dt = &Data[T]{}
|
dt = &Data[T]{}
|
||||||
dt.Init(10)
|
dt.Init(10)
|
||||||
|
dt.Value.L = EmptyLocker
|
||||||
|
dt.SetStuff(name)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
type RecycleData[T util.Recyclable] struct {
|
||||||
|
Data[T]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (dt *RecycleData[T]) Push(data T) {
|
||||||
|
if dt.Locker != nil {
|
||||||
|
dt.Lock()
|
||||||
|
defer dt.Unlock()
|
||||||
|
}
|
||||||
|
curValue := &dt.Value
|
||||||
|
if log.Trace {
|
||||||
|
dt.Trace("push data", zap.Uint32("sequence", curValue.Sequence))
|
||||||
|
}
|
||||||
|
curValue.WriteTime = time.Now()
|
||||||
|
curValue.Data = data
|
||||||
|
preValue := curValue
|
||||||
|
curValue = dt.MoveNext()
|
||||||
|
curValue.CanRead = false
|
||||||
|
curValue.Reset()
|
||||||
|
if curValue.L == nil {
|
||||||
|
curValue.L = EmptyLocker
|
||||||
|
} else {
|
||||||
|
curValue.Data.Recycle()
|
||||||
|
}
|
||||||
|
curValue.Sequence = dt.MoveCount
|
||||||
|
preValue.CanRead = true
|
||||||
|
preValue.Broadcast()
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewRecycleDataTrack[T util.Recyclable](name string) (dt *RecycleData[T]) {
|
||||||
|
dt = &RecycleData[T]{}
|
||||||
|
dt.Init(10)
|
||||||
|
dt.Value.L = EmptyLocker
|
||||||
|
dt.SetStuff(name)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
type BytesData struct {
|
||||||
|
RecycleData[*util.ListItem[util.Buffer]]
|
||||||
|
Pool util.BytesPool
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewBytesDataTrack(name string) (dt *BytesData) {
|
||||||
|
dt = &BytesData{
|
||||||
|
Pool: make(util.BytesPool, 17),
|
||||||
|
}
|
||||||
|
dt.Init(10)
|
||||||
|
dt.Value.L = EmptyLocker
|
||||||
dt.SetStuff(name)
|
dt.SetStuff(name)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@@ -7,6 +7,7 @@ import (
|
|||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"m7s.live/engine/v4/codec"
|
"m7s.live/engine/v4/codec"
|
||||||
. "m7s.live/engine/v4/common"
|
. "m7s.live/engine/v4/common"
|
||||||
|
"m7s.live/engine/v4/log"
|
||||||
"m7s.live/engine/v4/util"
|
"m7s.live/engine/v4/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -32,7 +33,9 @@ func NewH264(stream IStream, stuff ...any) (vt *H264) {
|
|||||||
|
|
||||||
func (vt *H264) WriteSliceBytes(slice []byte) {
|
func (vt *H264) WriteSliceBytes(slice []byte) {
|
||||||
naluType := codec.ParseH264NALUType(slice[0])
|
naluType := codec.ParseH264NALUType(slice[0])
|
||||||
vt.Trace("naluType", zap.Uint8("naluType", naluType.Byte()))
|
if log.Trace {
|
||||||
|
vt.Trace("naluType", zap.Uint8("naluType", naluType.Byte()))
|
||||||
|
}
|
||||||
switch naluType {
|
switch naluType {
|
||||||
case codec.NALU_SPS:
|
case codec.NALU_SPS:
|
||||||
spsInfo, _ := codec.ParseSPS(slice)
|
spsInfo, _ := codec.ParseSPS(slice)
|
||||||
|
@@ -79,7 +79,7 @@ func (vt *H265) WriteSliceBytes(slice []byte) {
|
|||||||
func (vt *H265) writeSequenceHead(head []byte) (err error) {
|
func (vt *H265) writeSequenceHead(head []byte) (err error) {
|
||||||
vt.WriteSequenceHead(head)
|
vt.WriteSequenceHead(head)
|
||||||
if vt.VPS, vt.SPS, vt.PPS, err = codec.ParseVpsSpsPpsFromSeqHeaderWithoutMalloc(vt.SequenceHead); err == nil {
|
if vt.VPS, vt.SPS, vt.PPS, err = codec.ParseVpsSpsPpsFromSeqHeaderWithoutMalloc(vt.SequenceHead); err == nil {
|
||||||
vt.SPSInfo, _ = codec.ParseHevcSPS(vt.SequenceHead)
|
vt.SPSInfo, _ = codec.ParseHevcSPS(vt.SPS)
|
||||||
vt.nalulenSize = (int(vt.SequenceHead[26]) & 0x03) + 1
|
vt.nalulenSize = (int(vt.SequenceHead[26]) & 0x03) + 1
|
||||||
} else {
|
} else {
|
||||||
vt.Error("H265 ParseVpsSpsPps Error")
|
vt.Error("H265 ParseVpsSpsPps Error")
|
||||||
@@ -94,6 +94,7 @@ func (vt *H265) WriteAVCC(ts uint32, frame *util.BLL) (err error) {
|
|||||||
}
|
}
|
||||||
b0 := frame.GetByte(0)
|
b0 := frame.GetByte(0)
|
||||||
if isExtHeader := (b0 >> 4) & 0b1000; isExtHeader != 0 {
|
if isExtHeader := (b0 >> 4) & 0b1000; isExtHeader != 0 {
|
||||||
|
firstBuffer := frame.Next.Value
|
||||||
packetType := b0 & 0b1111
|
packetType := b0 & 0b1111
|
||||||
switch packetType {
|
switch packetType {
|
||||||
case codec.PacketTypeSequenceStart:
|
case codec.PacketTypeSequenceStart:
|
||||||
@@ -107,18 +108,18 @@ func (vt *H265) WriteAVCC(ts uint32, frame *util.BLL) (err error) {
|
|||||||
frame.Recycle()
|
frame.Recycle()
|
||||||
return
|
return
|
||||||
case codec.PacketTypeCodedFrames:
|
case codec.PacketTypeCodedFrames:
|
||||||
frame.Next.Value[0] = b0 & 0b0111_1111 & 0xFC
|
firstBuffer[0] = b0 & 0b0111_1111 & 0xFC
|
||||||
frame.Next.Value[1] = 0x01
|
firstBuffer[1] = 0x01
|
||||||
copy(frame.Next.Value[2:], frame.Next.Value[5:])
|
copy(firstBuffer[2:], firstBuffer[5:])
|
||||||
frame.Next.Value = frame.Next.Value[:frame.Next.Value.Len()-3]
|
frame.Next.Value = firstBuffer[:firstBuffer.Len()-3]
|
||||||
frame.ByteLength -= 3
|
frame.ByteLength -= 3
|
||||||
return vt.Video.WriteAVCC(ts, frame)
|
return vt.Video.WriteAVCC(ts, frame)
|
||||||
case codec.PacketTypeCodedFramesX:
|
case codec.PacketTypeCodedFramesX:
|
||||||
frame.Next.Value[0] = b0 & 0b0111_1111 & 0xFC
|
firstBuffer[0] = b0 & 0b0111_1111 & 0xFC
|
||||||
frame.Next.Value[1] = 0x01
|
firstBuffer[1] = 0x01
|
||||||
frame.Next.Value[2] = 0
|
firstBuffer[2] = 0
|
||||||
frame.Next.Value[3] = 0
|
firstBuffer[3] = 0
|
||||||
frame.Next.Value[4] = 0
|
firstBuffer[4] = 0
|
||||||
return vt.Video.WriteAVCC(ts, frame)
|
return vt.Video.WriteAVCC(ts, frame)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@@ -2,7 +2,6 @@ package track
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"runtime"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
@@ -24,9 +23,10 @@ const (
|
|||||||
|
|
||||||
type AVRingReader struct {
|
type AVRingReader struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
|
mode int
|
||||||
Track *Media
|
Track *Media
|
||||||
*util.Ring[common.AVFrame]
|
*util.Ring[common.AVFrame]
|
||||||
wait func()
|
// wait func()
|
||||||
State byte
|
State byte
|
||||||
FirstSeq uint32
|
FirstSeq uint32
|
||||||
FirstTs time.Duration
|
FirstTs time.Duration
|
||||||
@@ -44,17 +44,17 @@ func (r *AVRingReader) DecConfChanged() bool {
|
|||||||
return r.ConfSeq != r.Track.SequenceHeadSeq
|
return r.ConfSeq != r.Track.SequenceHeadSeq
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewAVRingReader(t *Media, poll time.Duration) *AVRingReader {
|
func NewAVRingReader(t *Media) *AVRingReader {
|
||||||
r := &AVRingReader{
|
r := &AVRingReader{
|
||||||
Track: t,
|
Track: t,
|
||||||
}
|
}
|
||||||
if poll == 0 {
|
// if poll == 0 {
|
||||||
r.wait = runtime.Gosched
|
// r.wait = runtime.Gosched
|
||||||
} else {
|
// } else {
|
||||||
r.wait = func() {
|
// r.wait = func() {
|
||||||
time.Sleep(poll)
|
// time.Sleep(poll)
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
return r
|
return r
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -63,7 +63,7 @@ func (r *AVRingReader) ReadFrame() *common.AVFrame {
|
|||||||
r.Frame.Wait()
|
r.Frame.Wait()
|
||||||
}
|
}
|
||||||
// 超过一半的缓冲区大小,说明Reader太慢,需要丢帧
|
// 超过一半的缓冲区大小,说明Reader太慢,需要丢帧
|
||||||
if r.State == READSTATE_NORMAL && r.Track.LastValue.Sequence-r.Frame.Sequence > uint32(r.Track.Size/2) && r.Track.IDRing != nil && r.Track.IDRing.Value.Sequence > r.Frame.Sequence {
|
if r.mode != SUBMODE_BUFFER && r.State == READSTATE_NORMAL && r.Track.LastValue.Sequence-r.Frame.Sequence > uint32(r.Track.Size/2) && r.Track.IDRing != nil && r.Track.IDRing.Value.Sequence > r.Frame.Sequence {
|
||||||
r.Warn("reader too slow", zap.Uint32("lastSeq", r.Track.LastValue.Sequence), zap.Uint32("seq", r.Frame.Sequence))
|
r.Warn("reader too slow", zap.Uint32("lastSeq", r.Track.LastValue.Sequence), zap.Uint32("seq", r.Frame.Sequence))
|
||||||
r.Ring = r.Track.IDRing
|
r.Ring = r.Track.IDRing
|
||||||
return r.ReadFrame()
|
return r.ReadFrame()
|
||||||
@@ -84,6 +84,7 @@ func (r *AVRingReader) MoveNext() {
|
|||||||
|
|
||||||
func (r *AVRingReader) Read(ctx context.Context, mode int) (err error) {
|
func (r *AVRingReader) Read(ctx context.Context, mode int) (err error) {
|
||||||
r.ctx = ctx
|
r.ctx = ctx
|
||||||
|
r.mode = mode
|
||||||
switch r.State {
|
switch r.State {
|
||||||
case READSTATE_INIT:
|
case READSTATE_INIT:
|
||||||
r.Info("start read", zap.Int("mode", mode))
|
r.Info("start read", zap.Int("mode", mode))
|
||||||
|
37
track/reader-data.go
Normal file
37
track/reader-data.go
Normal file
@@ -0,0 +1,37 @@
|
|||||||
|
package track
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"m7s.live/engine/v4/common"
|
||||||
|
"m7s.live/engine/v4/util"
|
||||||
|
)
|
||||||
|
|
||||||
|
type DataReader[T any] struct {
|
||||||
|
Ctx context.Context
|
||||||
|
// common.Track
|
||||||
|
*util.Ring[common.DataFrame[T]]
|
||||||
|
// startTime time.Time
|
||||||
|
// Frame *common.DataFrame[T]
|
||||||
|
// Delay uint32
|
||||||
|
// *log.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *DataReader[T]) Read() (item *common.DataFrame[T]) {
|
||||||
|
item = &r.Value
|
||||||
|
if r.Ctx.Err() == nil && !item.CanRead {
|
||||||
|
item.Wait()
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *DataReader[T]) TryRead() (item *common.DataFrame[T]) {
|
||||||
|
if item = &r.Value; item.CanRead {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *DataReader[T]) MoveNext() {
|
||||||
|
r.Ring = r.Next()
|
||||||
|
}
|
@@ -39,7 +39,7 @@ func (av *Media) WriteRTP(raw *util.ListItem[RTPFrame]) {
|
|||||||
av.WriteRTPFrame(&frame.Value)
|
av.WriteRTPFrame(&frame.Value)
|
||||||
// av.Info("rtp", zap.Uint32("ts", (frame.Value.Timestamp)), zap.Int("len", len(frame.Value.Payload)), zap.Bool("marker", frame.Value.Marker), zap.Uint16("seq", frame.Value.SequenceNumber))
|
// av.Info("rtp", zap.Uint32("ts", (frame.Value.Timestamp)), zap.Int("len", len(frame.Value.Payload)), zap.Bool("marker", frame.Value.Marker), zap.Uint16("seq", frame.Value.SequenceNumber))
|
||||||
} else {
|
} else {
|
||||||
av.Warn("rtp payload is empty", zap.Uint32("ts", (frame.Value.Timestamp)), zap.Any("ext", frame.Value.GetExtensionIDs()), zap.Uint16("seq", frame.Value.SequenceNumber))
|
av.Debug("rtp payload is empty", zap.Uint32("ts", (frame.Value.Timestamp)), zap.Any("ext", frame.Value.GetExtensionIDs()), zap.Uint16("seq", frame.Value.SequenceNumber))
|
||||||
frame.Recycle()
|
frame.Recycle()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -24,8 +24,9 @@ type Video struct {
|
|||||||
lostFlag bool // 是否丢帧
|
lostFlag bool // 是否丢帧
|
||||||
codec.SPSInfo
|
codec.SPSInfo
|
||||||
ParamaterSets `json:"-" yaml:"-"`
|
ParamaterSets `json:"-" yaml:"-"`
|
||||||
SPS []byte `json:"-" yaml:"-"`
|
SPS []byte `json:"-" yaml:"-"`
|
||||||
PPS []byte `json:"-" yaml:"-"`
|
PPS []byte `json:"-" yaml:"-"`
|
||||||
|
SEIReader *DataReader[[]byte] `json:"-" yaml:"-"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *Video) Attach() {
|
func (v *Video) Attach() {
|
||||||
@@ -196,7 +197,7 @@ func (vt *Video) insertDCRtp() {
|
|||||||
packet.SSRC = vt.SSRC
|
packet.SSRC = vt.SSRC
|
||||||
packet.Timestamp = uint32(vt.Value.PTS)
|
packet.Timestamp = uint32(vt.Value.PTS)
|
||||||
packet.Marker = false
|
packet.Marker = false
|
||||||
head.InsertBeforeValue(RTPFrame{&packet, nil})
|
head.InsertBeforeValue(RTPFrame{Packet: &packet})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -247,6 +248,15 @@ func (vt *Video) CompleteAVCC(rv *AVFrame) {
|
|||||||
|
|
||||||
func (vt *Video) Flush() {
|
func (vt *Video) Flush() {
|
||||||
rv := &vt.Value
|
rv := &vt.Value
|
||||||
|
if vt.SEIReader != nil {
|
||||||
|
if seiFrame := vt.SEIReader.TryRead(); seiFrame != nil {
|
||||||
|
var au util.BLL
|
||||||
|
content := vt.BytesPool.GetShell(seiFrame.Data)
|
||||||
|
au.Push(content)
|
||||||
|
vt.Value.AUList.UnshiftValue(&au)
|
||||||
|
vt.SEIReader.MoveNext()
|
||||||
|
}
|
||||||
|
}
|
||||||
if rv.IFrame {
|
if rv.IFrame {
|
||||||
vt.computeGOP()
|
vt.computeGOP()
|
||||||
vt.Stream.SetIDR(vt)
|
vt.Stream.SetIDR(vt)
|
||||||
|
@@ -1,6 +1,7 @@
|
|||||||
package engine
|
package engine
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"m7s.live/engine/v4/common"
|
||||||
. "m7s.live/engine/v4/common"
|
. "m7s.live/engine/v4/common"
|
||||||
"m7s.live/engine/v4/track"
|
"m7s.live/engine/v4/track"
|
||||||
"m7s.live/engine/v4/util"
|
"m7s.live/engine/v4/util"
|
||||||
@@ -68,15 +69,15 @@ func (w *waitTracks) Accept(t Track) bool {
|
|||||||
suber := w.Promise.Value
|
suber := w.Promise.Value
|
||||||
switch t.(type) {
|
switch t.(type) {
|
||||||
case *track.Audio:
|
case *track.Audio:
|
||||||
if w.audio.Accept(t.GetBase().Name) {
|
if w.audio.Accept(t.GetName()) {
|
||||||
suber.OnEvent(t)
|
suber.OnEvent(t)
|
||||||
}
|
}
|
||||||
case *track.Video:
|
case *track.Video:
|
||||||
if w.video.Accept(t.GetBase().Name) {
|
if w.video.Accept(t.GetName()) {
|
||||||
suber.OnEvent(t)
|
suber.OnEvent(t)
|
||||||
}
|
}
|
||||||
case track.Custom:
|
case common.Track:
|
||||||
w.data.Accept(t.GetBase().Name)
|
w.data.Accept(t.GetName())
|
||||||
suber.OnEvent(t)
|
suber.OnEvent(t)
|
||||||
}
|
}
|
||||||
if w.NeedWait() {
|
if w.NeedWait() {
|
||||||
|
Reference in New Issue
Block a user