feat: Prevent concurrent read and write issues when publishers catch up with subscribers after blocking them

desc: 防止订阅者阻塞后,发布者追上订阅者产生并发读写问题
This commit is contained in:
langhuihui
2023-06-30 09:57:52 +08:00
parent 700aef98c5
commit 6a4610df1d
17 changed files with 397 additions and 282 deletions

View File

@@ -5,6 +5,7 @@ import (
"io" "io"
"net" "net"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/pion/rtp" "github.com/pion/rtp"
@@ -44,16 +45,90 @@ func (r *RTPFrame) Unmarshal(raw []byte) *RTPFrame {
return r return r
} }
type IDataFrame[T any] interface {
Init() // 初始化
Reset() // 重置数据,复用内存
Ready() // 标记为可读取
ReaderEnter() int32 // 读取者数量+1
ReaderLeave() int32 // 读取者数量-1
StartWrite() bool // 开始写入
SetSequence(uint32) // 设置序号
GetSequence() uint32 // 获取序号
ReaderCount() int32 // 读取者数量
Discard() int32 // 如果写入时还有读取者没有离开则废弃该帧剥离RingBuffer防止并发读写
IsDiscarded() bool // 是否已废弃
IsWriting() bool // 是否正在写入
Wait() // 阻塞等待可读取
Broadcast() // 广播可读取
}
type DataFrame[T any] struct { type DataFrame[T any] struct {
DeltaTime uint32 // 相对上一帧时间戳,毫秒 DeltaTime uint32 // 相对上一帧时间戳,毫秒
WriteTime time.Time // 写入时间,可用于比较两个帧的先后 WriteTime time.Time // 写入时间,可用于比较两个帧的先后
Sequence uint32 // 在一个Track中的序号 Sequence uint32 // 在一个Track中的序号
BytesIn int // 输入字节数用于计算BPS BytesIn int // 输入字节数用于计算BPS
CanRead bool `json:"-" yaml:"-"` CanRead bool `json:"-" yaml:"-"` // 是否可读取
readerCount atomic.Int32 `json:"-" yaml:"-"` // 读取者数量
Data T `json:"-" yaml:"-"` Data T `json:"-" yaml:"-"`
sync.Cond `json:"-" yaml:"-"` sync.Cond `json:"-" yaml:"-"`
} }
func NewDataFrame[T any]() *DataFrame[T] {
return &DataFrame[T]{}
}
func (df *DataFrame[T]) IsWriting() bool {
return !df.CanRead
}
func (df *DataFrame[T]) IsDiscarded() bool {
return df.L == nil
}
func (df *DataFrame[T]) Discard() int32 {
df.L = nil //标记为废弃
return df.readerCount.Load()
}
func (df *DataFrame[T]) SetSequence(sequence uint32) {
df.Sequence = sequence
}
func (df *DataFrame[T]) GetSequence() uint32 {
return df.Sequence
}
func (df *DataFrame[T]) ReaderEnter() int32 {
return df.readerCount.Add(1)
}
func (df *DataFrame[T]) ReaderCount() int32 {
return df.readerCount.Load()
}
func (df *DataFrame[T]) ReaderLeave() int32 {
return df.readerCount.Add(-1)
}
func (df *DataFrame[T]) StartWrite() bool {
if df.readerCount.Load() > 0 {
df.Discard() //标记为废弃
return false
} else {
df.CanRead = false //标记为正在写入
return true
}
}
func (df *DataFrame[T]) Ready() {
df.WriteTime = time.Now()
df.CanRead = true //标记为可读取
df.Broadcast()
}
func (df *DataFrame[T]) Init() {
df.L = EmptyLocker
}
func (df *DataFrame[T]) Reset() { func (df *DataFrame[T]) Reset() {
df.BytesIn = 0 df.BytesIn = 0
df.DeltaTime = 0 df.DeltaTime = 0
@@ -71,6 +146,10 @@ type AVFrame struct {
AUList util.BLLs `json:"-" yaml:"-"` // 裸数据 AUList util.BLLs `json:"-" yaml:"-"` // 裸数据
} }
func NewAVFrame() *AVFrame {
return &AVFrame{}
}
func (av *AVFrame) WriteAVCC(ts uint32, frame *util.BLL) { func (av *AVFrame) WriteAVCC(ts uint32, frame *util.BLL) {
if ts == 0 { if ts == 0 {
ts = 1 ts = 1

View File

@@ -22,8 +22,8 @@ const (
) )
// Base 基础Track类 // Base 基础Track类
type Base[T any] struct { type Base[T any, F IDataFrame[T]] struct {
RingBuffer[T] `json:"-" yaml:"-"` RingWriter[T, F] `json:"-" yaml:"-"`
Name string Name string
log.Zap `json:"-" yaml:"-"` log.Zap `json:"-" yaml:"-"`
Stream IStream `json:"-" yaml:"-"` Stream IStream `json:"-" yaml:"-"`
@@ -40,7 +40,7 @@ type Base[T any] struct {
RawPart []int // 裸数据片段用于UI上显示 RawPart []int // 裸数据片段用于UI上显示
} }
func (bt *Base[T]) ComputeBPS(bytes int) { func (bt *Base[T, F]) ComputeBPS(bytes int) {
bt.bytes += bytes bt.bytes += bytes
bt.frames++ bt.frames++
if elapse := time.Since(bt.ts).Seconds(); elapse > 1 { if elapse := time.Since(bt.ts).Seconds(); elapse > 1 {
@@ -54,31 +54,31 @@ func (bt *Base[T]) ComputeBPS(bytes int) {
} }
} }
func (bt *Base[T]) GetName() string { func (bt *Base[T, F]) GetName() string {
return bt.Name return bt.Name
} }
func (bt *Base[T]) GetBPS() int { func (bt *Base[T, F]) GetBPS() int {
return bt.BPS return bt.BPS
} }
func (bt *Base[T]) GetFPS() int { func (bt *Base[T, F]) GetFPS() int {
return bt.FPS return bt.FPS
} }
func (bt *Base[T]) GetDrops() int { func (bt *Base[T, F]) GetDrops() int {
return bt.Drops return bt.Drops
} }
// GetRBSize 获取缓冲区大小 // GetRBSize 获取缓冲区大小
func (bt *Base[T]) GetRBSize() int { func (bt *Base[T, F]) GetRBSize() int {
return bt.RingBuffer.Size return bt.RingWriter.Size
} }
func (bt *Base[T]) SnapForJson() { func (bt *Base[T, F]) SnapForJson() {
} }
func (bt *Base[T]) SetStuff(stuff ...any) { func (bt *Base[T, F]) SetStuff(stuff ...any) {
for _, s := range stuff { for _, s := range stuff {
switch v := s.(type) { switch v := s.(type) {
case IStream: case IStream:

111
common/ring-writer.go Normal file
View File

@@ -0,0 +1,111 @@
package common
import (
"m7s.live/engine/v4/util"
)
type emptyLocker struct{}
func (emptyLocker) Lock() {}
func (emptyLocker) Unlock() {}
var EmptyLocker emptyLocker
type RingWriter[T any, F IDataFrame[T]] struct {
*util.Ring[F] `json:"-" yaml:"-"`
pool *util.Ring[F]
poolSize int
Size int
LastValue F
constructor func() F
}
func (rb *RingWriter[T, F]) create(n int) (ring *util.Ring[F]) {
ring = util.NewRing[F](n)
for p, i := ring, n; i > 0; p, i = p.Next(), i-1 {
p.Value = rb.constructor()
p.Value.Init()
}
return
}
func (rb *RingWriter[T, F]) Init(n int, constructor func() F) *RingWriter[T, F] {
rb.constructor = constructor
rb.Ring = rb.create(n)
rb.Size = n
rb.LastValue = rb.Value
return rb
}
// func (rb *RingBuffer[T, F]) MoveNext() F {
// rb.LastValue = rb.Value
// rb.Ring = rb.Next()
// return rb.Value
// }
func (rb *RingWriter[T, F]) Glow(size int) (newItem *util.Ring[F]) {
if size < rb.poolSize {
newItem = rb.pool.Unlink(size)
rb.poolSize -= size
} else if size == rb.poolSize {
newItem = rb.pool
rb.poolSize = 0
rb.pool = nil
} else {
newItem = rb.create(size - rb.poolSize).Link(rb.pool)
rb.poolSize = 0
rb.pool = nil
}
rb.Link(newItem)
rb.Size += size
return
}
func (rb *RingWriter[T, F]) Recycle(r *util.Ring[F]) {
rb.poolSize++
r.Value.Init()
r.Value.Reset()
if rb.pool == nil {
rb.pool = r
} else {
rb.pool.Link(r)
}
}
func (rb *RingWriter[T, F]) Reduce(size int) {
r := rb.Unlink(size)
if size > 1 {
for p := r.Next(); p != r; {
next := p.Next() //先保存下一个节点
if p.Value.Discard() == 0 {
rb.Recycle(p.Prev().Unlink(1))
} else {
// fmt.Println("Reduce", p.Value.ReaderCount())
}
p = next
}
}
if r.Value.Discard() == 0 {
rb.Recycle(r)
}
rb.Size -= size
return
}
func (rb *RingWriter[T, F]) Step() (normal bool) {
rb.LastValue.Broadcast() // 防止订阅者还在等待
rb.LastValue = rb.Value
nextSeq := rb.LastValue.GetSequence() + 1
next := rb.Next()
if normal = next.Value.StartWrite(); normal {
next.Value.Reset()
rb.Ring = next
} else {
rb.Reduce(1) //抛弃还有订阅者的节点
rb.Ring = rb.Glow(1) //补充一个新节点
rb.Value.StartWrite()
}
rb.Value.SetSequence(nextSeq)
rb.LastValue.Ready()
return
}

View File

@@ -1,55 +0,0 @@
package common
import (
"m7s.live/engine/v4/util"
)
type RingBuffer[T any] struct {
*util.Ring[T] `json:"-" yaml:"-"`
Size int
MoveCount uint32
LastValue *T
}
func (rb *RingBuffer[T]) Init(n int) *RingBuffer[T] {
if rb == nil {
rb = new(RingBuffer[T])
}
rb.Ring = util.NewRing[T](n)
rb.Size = n
rb.LastValue = &rb.Value
return rb
}
func (rb *RingBuffer[T]) MoveNext() *T {
rb.LastValue = &rb.Value
rb.Ring = rb.Next()
rb.MoveCount++
return &rb.Value
}
func (rb *RingBuffer[T]) Glow(size int) (newItem *util.Ring[T]) {
newItem = rb.Link(util.NewRing[T](size))
rb.Size += size
return
}
func (rb *RingBuffer[T]) Reduce(size int) (newItem *RingBuffer[T]) {
newItem = &RingBuffer[T]{
Ring: rb.Unlink(size),
Size: size,
}
rb.Size -= size
return
}
// Do calls function f on each element of the ring, in forward order.
// The behavior of Do is undefined if f changes *r.
func (rb *RingBuffer[T]) Do(f func(*T)) {
if rb != nil {
f(&rb.Value)
for p := rb.Next(); p != rb.Ring; p = p.Next() {
f(&p.Value)
}
}
}

View File

@@ -38,9 +38,12 @@ type Publish struct {
IdleTimeout time.Duration // 空闲(无订阅)超时 IdleTimeout time.Duration // 空闲(无订阅)超时
PauseTimeout time.Duration `default:"30s"` // 暂停超时 PauseTimeout time.Duration `default:"30s"` // 暂停超时
BufferTime time.Duration // 缓冲长度(单位:秒)0代表取最近关键帧 BufferTime time.Duration // 缓冲长度(单位:秒)0代表取最近关键帧
SpeedLimit time.Duration `default:"500ms"` //速度限制最大等待时间
Key string // 发布鉴权key Key string // 发布鉴权key
SecretArgName string `default:"secret"` // 发布鉴权参数名 SecretArgName string `default:"secret"` // 发布鉴权参数名
ExpireArgName string `default:"expire"` // 发布鉴权失效时间参数名 ExpireArgName string `default:"expire"` // 发布鉴权失效时间参数名
RingSize int `default:"256"` // 初始缓冲区大小
RingSizeMax int `default:"1024"` // 最大缓冲区大小
} }
func (c Publish) GetPublishConfig() Publish { func (c Publish) GetPublishConfig() Publish {
@@ -61,7 +64,6 @@ type Subscribe struct {
IFrameOnly bool // 只要关键帧 IFrameOnly bool // 只要关键帧
WaitTimeout time.Duration `default:"10s"` // 等待流超时 WaitTimeout time.Duration `default:"10s"` // 等待流超时
WriteBufferSize int `default:"0"` // 写缓冲大小 WriteBufferSize int `default:"0"` // 写缓冲大小
// Poll time.Duration `default:"20ms"` // 读取Ring时的轮询间隔,单位毫秒
Key string // 订阅鉴权key Key string // 订阅鉴权key
SecretArgName string `default:"secret"` // 订阅鉴权参数名 SecretArgName string `default:"secret"` // 订阅鉴权参数名
ExpireArgName string `default:"expire"` // 订阅鉴权失效时间参数名 ExpireArgName string `default:"expire"` // 订阅鉴权失效时间参数名

View File

@@ -149,9 +149,8 @@ func (tracks *Tracks) Add(name string, t Track) bool {
tracks.SetIDR(v) tracks.SetIDR(v)
} }
if tracks.SEI != nil { if tracks.SEI != nil {
v.SEIReader = &track.DataReader[[]byte]{ v.SEIReader = &track.DataReader[[]byte]{}
Ring: tracks.SEI.Ring, v.SEIReader.Ring = tracks.SEI.Ring
}
} }
case *track.Audio: case *track.Audio:
if tracks.MainVideo != nil { if tracks.MainVideo != nil {
@@ -177,12 +176,13 @@ func (tracks *Tracks) AddSEI(t byte, data []byte) bool {
l := len(data) l := len(data)
var buffer util.Buffer var buffer util.Buffer
buffer.WriteByte(t) buffer.WriteByte(t)
for l > 255 { for l >= 255 {
buffer.WriteByte(255) buffer.WriteByte(255)
l -= 255 l -= 255
} }
buffer.WriteByte(byte(l)) buffer.WriteByte(byte(l))
buffer.Write(data) buffer.Write(data)
buffer.WriteByte(0x80)
tracks.SEI.Push(buffer) tracks.SEI.Push(buffer)
return true return true
} }
@@ -274,16 +274,16 @@ func findOrCreateStream(streamPath string, waitTimeout time.Duration) (s *Stream
AppName: p[0], AppName: p[0],
StreamName: strings.Join(p[1:], "/"), StreamName: strings.Join(p[1:], "/"),
StartTime: time.Now(), StartTime: time.Now(),
Logger: log.LocaleLogger.With(zap.String("stream", streamPath)),
timeout: time.NewTimer(waitTimeout),
}) })
if s := actual.(*Stream); loaded { if s := actual.(*Stream); loaded {
s.Debug("Stream Found") s.Debug("Stream Found")
return s, false return s, false
} else { } else {
s.timeout = time.NewTimer(waitTimeout)
s.Subscribers.Init() s.Subscribers.Init()
s.Logger = log.LocaleLogger.With(zap.String("stream", streamPath))
s.Info("created")
s.actionChan.Init(1) s.actionChan.Init(1)
s.Info("created")
go s.run() go s.run()
return s, true return s, true
} }
@@ -330,11 +330,11 @@ func (r *Stream) action(action StreamAction) (ok bool) {
stateEvent = SEpublish{event} stateEvent = SEpublish{event}
} }
r.Subscribers.Broadcast(stateEvent) r.Subscribers.Broadcast(stateEvent)
if r.IdleTimeout > 0 && r.Subscribers.Len() == 0 { // if r.IdleTimeout > 0 && r.Subscribers.Len() == 0 {
return r.action(ACTION_LASTLEAVE) // return r.action(ACTION_LASTLEAVE)
} else { // } else {
r.timeout.Reset(r.PublishTimeout) // 5秒心跳检测track的存活度 r.timeout.Reset(r.PublishTimeout) // 5秒心跳检测track的存活度
} // }
case STATE_WAITCLOSE: case STATE_WAITCLOSE:
stateEvent = SEwaitClose{event} stateEvent = SEwaitClose{event}
if r.IdleTimeout > 0 { if r.IdleTimeout > 0 {
@@ -465,6 +465,10 @@ func (s *Stream) run() {
s.action(ACTION_PUBLISHLOST) s.action(ACTION_PUBLISHLOST)
continue continue
} }
if s.IdleTimeout > 0 && s.Subscribers.Len() == 0 && time.Since(s.StartTime) > s.IdleTimeout {
s.action(ACTION_LASTLEAVE)
continue
}
} }
s.timeout.Reset(time.Second * 5) s.timeout.Reset(time.Second * 5)
//订阅者等待音视频轨道超时了,放弃等待,订阅成功 //订阅者等待音视频轨道超时了,放弃等待,订阅成功

View File

@@ -9,6 +9,7 @@ import (
"time" "time"
"go.uber.org/zap" "go.uber.org/zap"
"go.uber.org/zap/zapcore"
"m7s.live/engine/v4/codec" "m7s.live/engine/v4/codec"
. "m7s.live/engine/v4/common" . "m7s.live/engine/v4/common"
"m7s.live/engine/v4/config" "m7s.live/engine/v4/config"
@@ -206,9 +207,10 @@ func (s *Subscriber) PlayBlock(subType byte) {
ctx := s.TrackPlayer.Context ctx := s.TrackPlayer.Context
conf := s.Config conf := s.Config
hasVideo, hasAudio := s.Video != nil && conf.SubVideo, s.Audio != nil && conf.SubAudio hasVideo, hasAudio := s.Video != nil && conf.SubVideo, s.Audio != nil && conf.SubAudio
defer s.onStop() stopReason := zap.String("reason", "stop")
defer s.onStop(&stopReason)
if !hasAudio && !hasVideo { if !hasAudio && !hasVideo {
s.Error("play neither video nor audio") stopReason = zap.String("reason", "play neither video nor audio")
return return
} }
sendVideoDecConf := func() { sendVideoDecConf := func() {
@@ -309,11 +311,12 @@ func (s *Subscriber) PlayBlock(subType byte) {
for ctx.Err() == nil { for ctx.Err() == nil {
if hasVideo { if hasVideo {
for ctx.Err() == nil { for ctx.Err() == nil {
s.VideoReader.Read(ctx, subMode) err := s.VideoReader.ReadFrame(subMode)
videoFrame = s.VideoReader.Frame if err != nil {
if videoFrame == nil || ctx.Err() != nil { stopReason = zap.Error(err)
return return
} }
videoFrame = s.VideoReader.Value
// fmt.Println("video", s.VideoReader.Track.PreFrame().Sequence-frame.Sequence) // fmt.Println("video", s.VideoReader.Track.PreFrame().Sequence-frame.Sequence)
if videoFrame.IFrame && s.VideoReader.DecConfChanged() { if videoFrame.IFrame && s.VideoReader.DecConfChanged() {
s.VideoReader.ConfSeq = s.VideoReader.Track.SequenceHeadSeq s.VideoReader.ConfSeq = s.VideoReader.Track.SequenceHeadSeq
@@ -323,9 +326,7 @@ func (s *Subscriber) PlayBlock(subType byte) {
if audioFrame != nil { if audioFrame != nil {
if videoFrame.Timestamp > audioFrame.Timestamp { if videoFrame.Timestamp > audioFrame.Timestamp {
// fmt.Println("switch audio", audioFrame.CanRead) // fmt.Println("switch audio", audioFrame.CanRead)
if audioFrame.CanRead {
sendAudioFrame(audioFrame) sendAudioFrame(audioFrame)
}
audioFrame = nil audioFrame = nil
break break
} }
@@ -355,11 +356,12 @@ func (s *Subscriber) PlayBlock(subType byte) {
s.AudioReader.SkipTs = s.VideoReader.SkipTs s.AudioReader.SkipTs = s.VideoReader.SkipTs
} }
} }
s.AudioReader.Read(ctx, subMode) err := s.AudioReader.ReadFrame(subMode)
audioFrame = s.AudioReader.Frame if err != nil {
if audioFrame == nil || ctx.Err() != nil { stopReason = zap.Error(err)
return return
} }
audioFrame = s.AudioReader.Value
// fmt.Println("audio", s.AudioReader.Track.PreFrame().Sequence-frame.Sequence) // fmt.Println("audio", s.AudioReader.Track.PreFrame().Sequence-frame.Sequence)
if s.AudioReader.DecConfChanged() { if s.AudioReader.DecConfChanged() {
s.AudioReader.ConfSeq = s.AudioReader.Track.SequenceHeadSeq s.AudioReader.ConfSeq = s.AudioReader.Track.SequenceHeadSeq
@@ -368,9 +370,7 @@ func (s *Subscriber) PlayBlock(subType byte) {
if hasVideo && videoFrame != nil { if hasVideo && videoFrame != nil {
if audioFrame.Timestamp > videoFrame.Timestamp { if audioFrame.Timestamp > videoFrame.Timestamp {
// fmt.Println("switch video", videoFrame.CanRead) // fmt.Println("switch video", videoFrame.CanRead)
if videoFrame.CanRead {
sendVideoFrame(videoFrame) sendVideoFrame(videoFrame)
}
videoFrame = nil videoFrame = nil
break break
} }
@@ -383,11 +383,18 @@ func (s *Subscriber) PlayBlock(subType byte) {
} }
} }
} }
if videoFrame != nil {
videoFrame.ReaderLeave()
}
if audioFrame != nil {
audioFrame.ReaderLeave()
}
stopReason = zap.Error(ctx.Err())
} }
func (s *Subscriber) onStop() { func (s *Subscriber) onStop(reason *zapcore.Field) {
if !s.Stream.IsClosed() { if !s.Stream.IsClosed() {
s.Info("stop") s.Info("stop", *reason)
if !s.Config.Internal { if !s.Config.Internal {
s.Stream.Receive(s.Spesific) s.Stream.Receive(s.Spesific)
} }

View File

@@ -4,7 +4,6 @@ import (
"fmt" "fmt"
"io" "io"
"net" "net"
"time"
"github.com/aler9/gortsplib/v2/pkg/bits" "github.com/aler9/gortsplib/v2/pkg/bits"
"go.uber.org/zap" "go.uber.org/zap"
@@ -25,7 +24,7 @@ func NewAAC(stream IStream, stuff ...any) (aac *AAC) {
aac.CodecID = codec.CodecID_AAC aac.CodecID = codec.CodecID_AAC
aac.Channels = 2 aac.Channels = 2
aac.SampleSize = 16 aac.SampleSize = 16
aac.SetStuff("aac", stream, int(256+128), byte(97), aac, time.Millisecond*10) aac.SetStuff("aac", stream, byte(97), aac)
aac.SetStuff(stuff...) aac.SetStuff(stuff...)
if aac.BytesPool == nil { if aac.BytesPool == nil {
aac.BytesPool = make(util.BytesPool, 17) aac.BytesPool = make(util.BytesPool, 17)

View File

@@ -57,7 +57,7 @@ func (av *Audio) Flush() {
} }
func (av *Audio) WriteRawBytes(pts uint32, raw util.IBytes) { func (av *Audio) WriteRawBytes(pts uint32, raw util.IBytes) {
curValue := &av.Value curValue := av.Value
curValue.BytesIn += raw.Len() curValue.BytesIn += raw.Len()
av.Value.AUList.Push(av.GetFromPool(raw)) av.Value.AUList.Push(av.GetFromPool(raw))
av.generateTimestamp(pts) av.generateTimestamp(pts)

View File

@@ -12,13 +12,6 @@ import (
"m7s.live/engine/v4/util" "m7s.live/engine/v4/util"
) )
type emptyLocker struct{}
func (emptyLocker) Lock() {}
func (emptyLocker) Unlock() {}
var EmptyLocker emptyLocker
type 流速控制 struct { type 流速控制 struct {
起始时间戳 time.Duration 起始时间戳 time.Duration
起始dts time.Duration 起始dts time.Duration
@@ -74,12 +67,12 @@ type SpesificTrack interface {
} }
type IDRingList struct { type IDRingList struct {
IDRList util.List[*util.Ring[AVFrame]] IDRList util.List[*util.Ring[*AVFrame]]
IDRing *util.Ring[AVFrame] IDRing *util.Ring[*AVFrame]
HistoryRing *util.Ring[AVFrame] HistoryRing *util.Ring[*AVFrame]
} }
func (p *IDRingList) AddIDR(IDRing *util.Ring[AVFrame]) { func (p *IDRingList) AddIDR(IDRing *util.Ring[*AVFrame]) {
p.IDRList.PushValue(IDRing) p.IDRList.PushValue(IDRing)
p.IDRing = IDRing p.IDRing = IDRing
} }
@@ -91,7 +84,7 @@ func (p *IDRingList) ShiftIDR() {
// Media 基础媒体Track类 // Media 基础媒体Track类
type Media struct { type Media struct {
Base[AVFrame] Base[any, *AVFrame]
PayloadType byte PayloadType byte
IDRingList `json:"-" yaml:"-"` //最近的关键帧位置,首屏渲染 IDRingList `json:"-" yaml:"-"` //最近的关键帧位置,首屏渲染
SSRC uint32 SSRC uint32
@@ -159,11 +152,12 @@ func (av *Media) SetStuff(stuff ...any) {
// 代表发布者已经离线该Track成为遗留Track等待下一任发布者接续发布 // 代表发布者已经离线该Track成为遗留Track等待下一任发布者接续发布
for _, s := range stuff { for _, s := range stuff {
switch v := s.(type) { switch v := s.(type) {
case int: case IStream:
av.Init(v) pubConf := v.GetPublisherConfig()
av.Value.L = EmptyLocker av.Base.SetStuff(v)
av.Init(pubConf.RingSize, NewAVFrame)
av.SSRC = uint32(uintptr(unsafe.Pointer(av))) av.SSRC = uint32(uintptr(unsafe.Pointer(av)))
av.等待上限 = config.Global.SpeedLimit av.等待上限 = pubConf.SpeedLimit
case uint32: case uint32:
av.SampleRate = v av.SampleRate = v
case byte: case byte:
@@ -183,7 +177,7 @@ func (av *Media) LastWriteTime() time.Time {
} }
func (av *Media) CurrentFrame() *AVFrame { func (av *Media) CurrentFrame() *AVFrame {
return &av.Value return av.Value
} }
func (av *Media) PreFrame() *AVFrame { func (av *Media) PreFrame() *AVFrame {
return av.LastValue return av.LastValue
@@ -212,9 +206,7 @@ func (av *Media) narrow(gop int) {
av.Trace("resize", zap.Int("before", av.Size), zap.Int("after", av.Size-5)) av.Trace("resize", zap.Int("before", av.Size), zap.Int("after", av.Size-5))
} }
//缩小缓冲环节省内存 //缩小缓冲环节省内存
av.Reduce(5).Do(func(v *AVFrame) { av.Reduce(5)
v.Reset()
})
} }
} }
@@ -230,7 +222,7 @@ func (av *Media) AddIDR() {
} }
func (av *Media) Flush() { func (av *Media) Flush() {
curValue, preValue, nextValue := &av.Value, av.LastValue, av.Next() curValue, preValue, nextValue := av.Value, av.LastValue, av.Next()
useDts := curValue.Timestamp == 0 useDts := curValue.Timestamp == 0
if av.State == TrackStateOffline { if av.State == TrackStateOffline {
av.State = TrackStateOnline av.State = TrackStateOnline
@@ -308,20 +300,10 @@ func (av *Media) Flush() {
} }
} }
av.ComputeBPS(curValue.BytesIn) av.ComputeBPS(curValue.BytesIn)
curValue.WriteTime = time.Now() av.Step()
if av.等待上限 > 0 { if av.等待上限 > 0 {
av.控制流速(curValue.Timestamp, curValue.DTS) av.控制流速(curValue.Timestamp, curValue.DTS)
} }
preValue = curValue
curValue = av.MoveNext()
curValue.CanRead = false
curValue.Reset()
if curValue.L == nil {
curValue.L = EmptyLocker
}
curValue.Sequence = av.MoveCount
preValue.CanRead = true
preValue.Broadcast()
} }
func deltaTS(curTs time.Duration, preTs time.Duration) time.Duration { func deltaTS(curTs time.Duration, preTs time.Duration) time.Duration {

View File

@@ -12,53 +12,44 @@ import (
) )
type Data[T any] struct { type Data[T any] struct {
Base[DataFrame[T]] Base[T, *DataFrame[T]]
sync.Locker `json:"-" yaml:"-"` // 写入锁,可选,单一协程写入可以不加锁 sync.Locker `json:"-" yaml:"-"` // 写入锁,可选,单一协程写入可以不加锁
} }
func (dt *Data[T]) Init(n int) {
dt.Base.Init(n, NewDataFrame[T])
}
func (dt *Data[T]) Push(data T) { func (dt *Data[T]) Push(data T) {
if dt.Locker != nil { if dt.Locker != nil {
dt.Lock() dt.Lock()
defer dt.Unlock() defer dt.Unlock()
} }
curValue := &dt.Value curValue := dt.Value
if log.Trace { if log.Trace {
dt.Trace("push data", zap.Uint32("sequence", curValue.Sequence)) dt.Trace("push data", zap.Uint32("sequence", curValue.Sequence))
} }
curValue.WriteTime = time.Now()
curValue.Data = data curValue.Data = data
preValue := curValue dt.Step()
curValue = dt.MoveNext()
curValue.CanRead = false
curValue.Reset()
if curValue.L == nil {
curValue.L = EmptyLocker
}
curValue.Sequence = dt.MoveCount
preValue.CanRead = true
preValue.Broadcast()
} }
func (d *Data[T]) Play(ctx context.Context, onData func(*DataFrame[T]) error) (err error) { func (d *Data[T]) Play(ctx context.Context, onData func(*DataFrame[T]) error) (err error) {
d.Debug("play data track") d.Debug("play data track")
reader := DataReader[T]{ reader := DataReader[T]{}
Ctx: ctx, for err = reader.Read(d.Ring); err == nil; err = reader.ReadNext() {
Ring: d.Ring,
}
for {
curValue := reader.Read()
if err = ctx.Err(); err != nil {
return
}
if log.Trace { if log.Trace {
d.Trace("read data", zap.Uint32("sequence", curValue.Sequence)) d.Trace("read data", zap.Uint32("sequence", reader.Value.Sequence))
} }
if err = onData(curValue); err == nil { if err = onData(reader.Value); err == nil {
err = ctx.Err() err = ctx.Err()
} }
reader.MoveNext() if err != nil {
reader.Value.ReaderLeave()
return
} }
} }
return
}
func (d *Data[T]) Attach(s IStream) { func (d *Data[T]) Attach(s IStream) {
d.SetStuff(s) d.SetStuff(s)
@@ -80,7 +71,6 @@ func (d *Data[T]) LastWriteTime() time.Time {
func NewDataTrack[T any](name string) (dt *Data[T]) { func NewDataTrack[T any](name string) (dt *Data[T]) {
dt = &Data[T]{} dt = &Data[T]{}
dt.Init(10) dt.Init(10)
dt.Value.L = EmptyLocker
dt.SetStuff(name) dt.SetStuff(name)
return return
} }
@@ -94,30 +84,18 @@ func (dt *RecycleData[T]) Push(data T) {
dt.Lock() dt.Lock()
defer dt.Unlock() defer dt.Unlock()
} }
curValue := &dt.Value curValue := dt.Value
if log.Trace { if log.Trace {
dt.Trace("push data", zap.Uint32("sequence", curValue.Sequence)) dt.Trace("push data", zap.Uint32("sequence", curValue.Sequence))
} }
curValue.WriteTime = time.Now()
curValue.Data = data curValue.Data = data
preValue := curValue dt.Step()
curValue = dt.MoveNext() dt.Value.Data.Recycle()
curValue.CanRead = false
curValue.Reset()
if curValue.L == nil {
curValue.L = EmptyLocker
} else {
curValue.Data.Recycle()
}
curValue.Sequence = dt.MoveCount
preValue.CanRead = true
preValue.Broadcast()
} }
func NewRecycleDataTrack[T util.Recyclable](name string) (dt *RecycleData[T]) { func NewRecycleDataTrack[T util.Recyclable](name string) (dt *RecycleData[T]) {
dt = &RecycleData[T]{} dt = &RecycleData[T]{}
dt.Init(10) dt.Init(10)
dt.Value.L = EmptyLocker
dt.SetStuff(name) dt.SetStuff(name)
return return
} }
@@ -132,7 +110,6 @@ func NewBytesDataTrack(name string) (dt *BytesData) {
Pool: make(util.BytesPool, 17), Pool: make(util.BytesPool, 17),
} }
dt.Init(10) dt.Init(10)
dt.Value.L = EmptyLocker
dt.SetStuff(name) dt.SetStuff(name)
return return
} }

View File

@@ -2,7 +2,6 @@ package track
import ( import (
"io" "io"
"time"
"go.uber.org/zap" "go.uber.org/zap"
"m7s.live/engine/v4/codec" "m7s.live/engine/v4/codec"
@@ -29,7 +28,7 @@ func NewG711(stream IStream, alaw bool, stuff ...any) (g711 *G711) {
g711.SampleSize = 8 g711.SampleSize = 8
g711.Channels = 1 g711.Channels = 1
g711.AVCCHead = []byte{(byte(g711.CodecID) << 4) | (1 << 1)} g711.AVCCHead = []byte{(byte(g711.CodecID) << 4) | (1 << 1)}
g711.SetStuff(stream, int(32), uint32(8000), g711, time.Millisecond*10) g711.SetStuff(stream, uint32(8000), g711)
g711.SetStuff(stuff...) g711.SetStuff(stuff...)
if g711.BytesPool == nil { if g711.BytesPool == nil {
g711.BytesPool = make(util.BytesPool, 17) g711.BytesPool = make(util.BytesPool, 17)

View File

@@ -2,7 +2,6 @@ package track
import ( import (
"io" "io"
"time"
"go.uber.org/zap" "go.uber.org/zap"
"m7s.live/engine/v4/codec" "m7s.live/engine/v4/codec"
@@ -20,7 +19,7 @@ type H264 struct {
func NewH264(stream IStream, stuff ...any) (vt *H264) { func NewH264(stream IStream, stuff ...any) (vt *H264) {
vt = &H264{} vt = &H264{}
vt.Video.CodecID = codec.CodecID_H264 vt.Video.CodecID = codec.CodecID_H264
vt.SetStuff("h264", int(256), byte(96), uint32(90000), stream, vt, time.Millisecond*10) vt.SetStuff("h264", byte(96), uint32(90000), stream, vt)
vt.SetStuff(stuff...) vt.SetStuff(stuff...)
if vt.BytesPool == nil { if vt.BytesPool == nil {
vt.BytesPool = make(util.BytesPool, 17) vt.BytesPool = make(util.BytesPool, 17)
@@ -114,7 +113,7 @@ func (vt *H264) WriteRTPFrame(frame *RTPFrame) {
vt.lostFlag = true vt.lostFlag = true
vt.Warn("lost rtp packet", zap.Uint16("lastSeq", vt.lastSeq), zap.Uint16("lastSeq2", vt.lastSeq2)) vt.Warn("lost rtp packet", zap.Uint16("lastSeq", vt.lastSeq), zap.Uint16("lastSeq2", vt.lastSeq2))
} }
rv := &vt.Value rv := vt.Value
if naluType := frame.H264Type(); naluType < 24 { if naluType := frame.H264Type(); naluType < 24 {
vt.WriteSliceBytes(frame.Payload) vt.WriteSliceBytes(frame.Payload)
} else { } else {

View File

@@ -2,7 +2,6 @@ package track
import ( import (
"io" "io"
"time"
"go.uber.org/zap" "go.uber.org/zap"
"m7s.live/engine/v4/codec" "m7s.live/engine/v4/codec"
@@ -20,7 +19,7 @@ type H265 struct {
func NewH265(stream IStream, stuff ...any) (vt *H265) { func NewH265(stream IStream, stuff ...any) (vt *H265) {
vt = &H265{} vt = &H265{}
vt.Video.CodecID = codec.CodecID_H265 vt.Video.CodecID = codec.CodecID_H265
vt.SetStuff("h265", int(256), byte(96), uint32(90000), stream, vt, time.Millisecond*10) vt.SetStuff("h265", byte(96), uint32(90000), stream, vt)
vt.SetStuff(stuff...) vt.SetStuff(stuff...)
if vt.BytesPool == nil { if vt.BytesPool == nil {
vt.BytesPool = make(util.BytesPool, 17) vt.BytesPool = make(util.BytesPool, 17)
@@ -135,7 +134,7 @@ func (vt *H265) WriteAVCC(ts uint32, frame *util.BLL) (err error) {
} }
func (vt *H265) WriteRTPFrame(frame *RTPFrame) { func (vt *H265) WriteRTPFrame(frame *RTPFrame) {
rv := &vt.Value rv := vt.Value
// TODO: DONL may need to be parsed if `sprop-max-don-diff` is greater than 0 on the RTP stream. // TODO: DONL may need to be parsed if `sprop-max-don-diff` is greater than 0 on the RTP stream.
var usingDonlField bool var usingDonlField bool
var buffer = util.Buffer(frame.Payload) var buffer = util.Buffer(frame.Payload)

View File

@@ -1,13 +1,12 @@
package track package track
import ( import (
"context" "errors"
"time" "time"
"go.uber.org/zap" "go.uber.org/zap"
"m7s.live/engine/v4/common" "m7s.live/engine/v4/common"
"m7s.live/engine/v4/log" "m7s.live/engine/v4/log"
"m7s.live/engine/v4/util"
) )
const ( const (
@@ -21,12 +20,12 @@ const (
SUBMODE_BUFFER SUBMODE_BUFFER
) )
var ErrDiscard = errors.New("dsicard")
type AVRingReader struct { type AVRingReader struct {
ctx context.Context RingReader[any, *common.AVFrame]
mode int mode int
Track *Media Track *Media
*util.Ring[common.AVFrame]
// wait func()
State byte State byte
FirstSeq uint32 FirstSeq uint32
FirstTs time.Duration FirstTs time.Duration
@@ -34,7 +33,6 @@ type AVRingReader struct {
beforeJump time.Duration beforeJump time.Duration
ConfSeq int ConfSeq int
startTime time.Time startTime time.Time
Frame *common.AVFrame
AbsTime uint32 AbsTime uint32
Delay uint32 Delay uint32
*log.Logger *log.Logger
@@ -45,45 +43,25 @@ func (r *AVRingReader) DecConfChanged() bool {
} }
func NewAVRingReader(t *Media) *AVRingReader { func NewAVRingReader(t *Media) *AVRingReader {
r := &AVRingReader{ return &AVRingReader{
Track: t, Track: t,
} }
// if poll == 0 {
// r.wait = runtime.Gosched
// } else {
// r.wait = func() {
// time.Sleep(poll)
// }
// }
return r
} }
func (r *AVRingReader) ReadFrame() *common.AVFrame { func (r *AVRingReader) readFrame() (err error) {
if r.Frame = &r.Value; r.ctx.Err() == nil && !r.Frame.CanRead { err = r.ReadNext()
r.Frame.Wait() if err != nil {
return err
} }
// 超过一半的缓冲区大小说明Reader太慢需要丢帧 // 超过一半的缓冲区大小说明Reader太慢需要丢帧
if r.mode != SUBMODE_BUFFER && r.State == READSTATE_NORMAL && r.Track.LastValue.Sequence-r.Frame.Sequence > uint32(r.Track.Size/2) && r.Track.IDRing != nil && r.Track.IDRing.Value.Sequence > r.Frame.Sequence { if r.mode != SUBMODE_BUFFER && r.State == READSTATE_NORMAL && r.Track.LastValue.Sequence-r.Value.Sequence > uint32(r.Track.Size/2) && r.Track.IDRing != nil && r.Track.IDRing.Value.Sequence > r.Value.Sequence {
r.Warn("reader too slow", zap.Uint32("lastSeq", r.Track.LastValue.Sequence), zap.Uint32("seq", r.Frame.Sequence)) r.Warn("reader too slow", zap.Uint32("lastSeq", r.Track.LastValue.Sequence), zap.Uint32("seq", r.Value.Sequence))
r.Ring = r.Track.IDRing return r.Read(r.Track.IDRing)
return r.ReadFrame()
} }
return r.Frame
}
func (r *AVRingReader) TryRead() (item *common.AVFrame) {
if item = &r.Value; item.CanRead {
return return
} }
return nil
}
func (r *AVRingReader) MoveNext() { func (r *AVRingReader) ReadFrame(mode int) (err error) {
r.Ring = r.Next()
}
func (r *AVRingReader) Read(ctx context.Context, mode int) (err error) {
r.ctx = ctx
r.mode = mode r.mode = mode
switch r.State { switch r.State {
case READSTATE_INIT: case READSTATE_INIT:
@@ -109,57 +87,56 @@ func (r *AVRingReader) Read(ctx context.Context, mode int) (err error) {
} }
r.State = READSTATE_NORMAL r.State = READSTATE_NORMAL
} }
r.Ring = startRing if err = r.StartRead(startRing); err != nil {
r.ReadFrame()
if err = r.ctx.Err(); err != nil {
return return
} }
r.startTime = time.Now() r.startTime = time.Now()
if r.FirstTs == 0 { if r.FirstTs == 0 {
r.FirstTs = r.Frame.Timestamp r.FirstTs = r.Value.Timestamp
} }
r.SkipTs = r.FirstTs r.SkipTs = r.FirstTs
r.FirstSeq = r.Frame.Sequence r.FirstSeq = r.Value.Sequence
r.Info("first frame read", zap.Duration("firstTs", r.FirstTs), zap.Uint32("firstSeq", r.FirstSeq)) r.Info("first frame read", zap.Duration("firstTs", r.FirstTs), zap.Uint32("firstSeq", r.FirstSeq))
case READSTATE_FIRST: case READSTATE_FIRST:
if r.Track.IDRing.Value.Sequence != r.FirstSeq { if r.Track.IDRing.Value.Sequence != r.FirstSeq {
r.Ring = r.Track.IDRing if err = r.Read(r.Track.IDRing); err != nil {
frame := r.ReadFrame() // 直接跳到最近的关键帧
if err = r.ctx.Err(); err != nil {
return return
} }
r.SkipTs = frame.Timestamp - r.beforeJump r.SkipTs = r.Value.Timestamp - r.beforeJump
r.Info("jump", zap.Uint32("skipSeq", r.Track.IDRing.Value.Sequence-r.FirstSeq), zap.Duration("skipTs", r.SkipTs)) r.Info("jump", zap.Uint32("skipSeq", r.Track.IDRing.Value.Sequence-r.FirstSeq), zap.Duration("skipTs", r.SkipTs))
r.State = READSTATE_NORMAL r.State = READSTATE_NORMAL
} else { } else {
r.MoveNext() if err = r.readFrame(); err != nil {
frame := r.ReadFrame() return
r.beforeJump = frame.Timestamp - r.FirstTs }
r.beforeJump = r.Value.Timestamp - r.FirstTs
// 防止过快消费 // 防止过快消费
if fast := r.beforeJump - time.Since(r.startTime); fast > 0 && fast < time.Second { if fast := r.beforeJump - time.Since(r.startTime); fast > 0 && fast < time.Second {
time.Sleep(fast) time.Sleep(fast)
} }
} }
case READSTATE_NORMAL: case READSTATE_NORMAL:
r.MoveNext() if err = r.readFrame(); err != nil {
r.ReadFrame() return
} }
r.AbsTime = uint32((r.Frame.Timestamp - r.SkipTs).Milliseconds()) }
r.AbsTime = uint32((r.Value.Timestamp - r.SkipTs).Milliseconds())
if r.AbsTime == 0 { if r.AbsTime == 0 {
r.AbsTime = 1 r.AbsTime = 1
} }
r.Delay = uint32((r.Track.LastValue.Timestamp - r.Frame.Timestamp).Milliseconds()) // r.Delay = uint32((r.Track.LastValue.Timestamp - r.Value.Timestamp).Milliseconds())
r.Delay = uint32(r.Track.LastValue.Sequence - r.Value.Sequence)
// fmt.Println(r.Track.Name, r.Delay) // fmt.Println(r.Track.Name, r.Delay)
// println(r.Track.Name, r.State, r.Frame.AbsTime, r.SkipTs, r.AbsTime) // println(r.Track.Name, r.State, r.Frame.AbsTime, r.SkipTs, r.AbsTime)
return return
} }
func (r *AVRingReader) GetPTS32() uint32 { func (r *AVRingReader) GetPTS32() uint32 {
return uint32((r.Frame.PTS - r.SkipTs*90/time.Millisecond)) return uint32((r.Value.PTS - r.SkipTs*90/time.Millisecond))
} }
func (r *AVRingReader) GetDTS32() uint32 { func (r *AVRingReader) GetDTS32() uint32 {
return uint32((r.Frame.DTS - r.SkipTs*90/time.Millisecond)) return uint32((r.Value.DTS - r.SkipTs*90/time.Millisecond))
} }
func (r *AVRingReader) ResetAbsTime() { func (r *AVRingReader) ResetAbsTime() {
r.SkipTs = r.Frame.Timestamp r.SkipTs = r.Value.Timestamp
r.AbsTime = 1 r.AbsTime = 1
} }

View File

@@ -1,37 +1,71 @@
package track package track
import ( import (
"context"
"m7s.live/engine/v4/common" "m7s.live/engine/v4/common"
"m7s.live/engine/v4/util" "m7s.live/engine/v4/util"
) )
type DataReader[T any] struct { type RingReader[T any, F common.IDataFrame[T]] struct {
Ctx context.Context *util.Ring[F]
// common.Track Count int // 读取的帧数
*util.Ring[common.DataFrame[T]]
// startTime time.Time
// Frame *common.DataFrame[T]
// Delay uint32
// *log.Logger
} }
func (r *DataReader[T]) Read() (item *common.DataFrame[T]) { func (r *RingReader[T, F]) StartRead(ring *util.Ring[F]) (err error) {
item = &r.Value r.Ring = ring
if r.Ctx.Err() == nil && !item.CanRead { if r.Value.IsDiscarded() {
item.Wait() return ErrDiscard
} }
if r.Value.IsWriting() {
// t := time.Now()
r.Value.Wait()
// log.Info("wait", time.Since(t))
}
r.Count++
r.Value.ReaderEnter()
return return
} }
func (r *DataReader[T]) TryRead() (item *common.DataFrame[T]) { func (r *RingReader[T, F]) TryRead() (f F, err error) {
if item = &r.Value; item.CanRead { if r.Count > 0 {
preValue := r.Value
if preValue.IsDiscarded() {
preValue.ReaderLeave()
err = ErrDiscard
return return
} }
return nil if r.Next().Value.IsWriting() {
return
} }
defer preValue.ReaderLeave()
func (r *DataReader[T]) MoveNext() {
r.Ring = r.Next() r.Ring = r.Next()
} else {
if r.Value.IsWriting() {
return
}
}
if r.Value.IsDiscarded() {
err = ErrDiscard
return
}
r.Count++
f = r.Value
r.Value.ReaderEnter()
return
}
func (r *RingReader[T, F]) ReadNext() (err error) {
return r.Read(r.Next())
}
func (r *RingReader[T, F]) Read(ring *util.Ring[F]) (err error) {
preValue := r.Value
defer preValue.ReaderLeave()
if preValue.IsDiscarded() {
return ErrDiscard
}
return r.StartRead(ring)
}
type DataReader[T any] struct {
RingReader[T, *common.DataFrame[T]]
} }

View File

@@ -247,14 +247,15 @@ func (vt *Video) CompleteAVCC(rv *AVFrame) {
} }
func (vt *Video) Flush() { func (vt *Video) Flush() {
rv := &vt.Value rv := vt.Value
if vt.SEIReader != nil { if vt.SEIReader != nil {
if seiFrame := vt.SEIReader.TryRead(); seiFrame != nil { if seiFrame, err := vt.SEIReader.TryRead(); seiFrame != nil {
var au util.BLL var au util.BLL
au.Push(vt.SpesificTrack.GetNALU_SEI()) au.Push(vt.SpesificTrack.GetNALU_SEI())
au.Push(vt.BytesPool.GetShell(seiFrame.Data)) au.Push(vt.BytesPool.GetShell(seiFrame.Data))
vt.Value.AUList.UnshiftValue(&au) vt.Value.AUList.UnshiftValue(&au)
vt.SEIReader.MoveNext() } else if err != nil {
vt.SEIReader = nil
} }
} }
if rv.IFrame { if rv.IFrame {