diff --git a/common/frame.go b/common/frame.go index efa33c7..4c9dd79 100644 --- a/common/frame.go +++ b/common/frame.go @@ -5,6 +5,7 @@ import ( "io" "net" "sync" + "sync/atomic" "time" "github.com/pion/rtp" @@ -44,14 +45,88 @@ func (r *RTPFrame) Unmarshal(raw []byte) *RTPFrame { return r } +type IDataFrame[T any] interface { + Init() // 初始化 + Reset() // 重置数据,复用内存 + Ready() // 标记为可读取 + ReaderEnter() int32 // 读取者数量+1 + ReaderLeave() int32 // 读取者数量-1 + StartWrite() bool // 开始写入 + SetSequence(uint32) // 设置序号 + GetSequence() uint32 // 获取序号 + ReaderCount() int32 // 读取者数量 + Discard() int32 // 如果写入时还有读取者没有离开则废弃该帧,剥离RingBuffer,防止并发读写 + IsDiscarded() bool // 是否已废弃 + IsWriting() bool // 是否正在写入 + Wait() // 阻塞等待可读取 + Broadcast() // 广播可读取 +} + type DataFrame[T any] struct { - DeltaTime uint32 // 相对上一帧时间戳,毫秒 - WriteTime time.Time // 写入时间,可用于比较两个帧的先后 - Sequence uint32 // 在一个Track中的序号 - BytesIn int // 输入字节数用于计算BPS - CanRead bool `json:"-" yaml:"-"` - Data T `json:"-" yaml:"-"` - sync.Cond `json:"-" yaml:"-"` + DeltaTime uint32 // 相对上一帧时间戳,毫秒 + WriteTime time.Time // 写入时间,可用于比较两个帧的先后 + Sequence uint32 // 在一个Track中的序号 + BytesIn int // 输入字节数用于计算BPS + CanRead bool `json:"-" yaml:"-"` // 是否可读取 + readerCount atomic.Int32 `json:"-" yaml:"-"` // 读取者数量 + Data T `json:"-" yaml:"-"` + sync.Cond `json:"-" yaml:"-"` +} + +func NewDataFrame[T any]() *DataFrame[T] { + return &DataFrame[T]{} +} +func (df *DataFrame[T]) IsWriting() bool { + return !df.CanRead +} + +func (df *DataFrame[T]) IsDiscarded() bool { + return df.L == nil +} + +func (df *DataFrame[T]) Discard() int32 { + df.L = nil //标记为废弃 + return df.readerCount.Load() +} + +func (df *DataFrame[T]) SetSequence(sequence uint32) { + df.Sequence = sequence +} + +func (df *DataFrame[T]) GetSequence() uint32 { + return df.Sequence +} + +func (df *DataFrame[T]) ReaderEnter() int32 { + return df.readerCount.Add(1) +} + +func (df *DataFrame[T]) ReaderCount() int32 { + return df.readerCount.Load() +} + +func (df *DataFrame[T]) ReaderLeave() int32 { + return df.readerCount.Add(-1) +} + +func (df *DataFrame[T]) StartWrite() bool { + if df.readerCount.Load() > 0 { + df.Discard() //标记为废弃 + return false + } else { + df.CanRead = false //标记为正在写入 + return true + } +} + +func (df *DataFrame[T]) Ready() { + df.WriteTime = time.Now() + df.CanRead = true //标记为可读取 + df.Broadcast() +} + +func (df *DataFrame[T]) Init() { + df.L = EmptyLocker } func (df *DataFrame[T]) Reset() { @@ -71,6 +146,10 @@ type AVFrame struct { AUList util.BLLs `json:"-" yaml:"-"` // 裸数据 } +func NewAVFrame() *AVFrame { + return &AVFrame{} +} + func (av *AVFrame) WriteAVCC(ts uint32, frame *util.BLL) { if ts == 0 { ts = 1 diff --git a/common/index.go b/common/index.go index 0b970e4..bf75a34 100644 --- a/common/index.go +++ b/common/index.go @@ -22,25 +22,25 @@ const ( ) // Base 基础Track类 -type Base[T any] struct { - RingBuffer[T] `json:"-" yaml:"-"` - Name string - log.Zap `json:"-" yaml:"-"` - Stream IStream `json:"-" yaml:"-"` - Attached atomic.Bool `json:"-" yaml:"-"` - State TrackState - ts time.Time - bytes int - frames int - DropCount int `json:"-" yaml:"-"` //丢帧数 - BPS int - FPS int - Drops int // 丢帧率 - RawSize int // 裸数据长度 - RawPart []int // 裸数据片段用于UI上显示 +type Base[T any, F IDataFrame[T]] struct { + RingWriter[T, F] `json:"-" yaml:"-"` + Name string + log.Zap `json:"-" yaml:"-"` + Stream IStream `json:"-" yaml:"-"` + Attached atomic.Bool `json:"-" yaml:"-"` + State TrackState + ts time.Time + bytes int + frames int + DropCount int `json:"-" yaml:"-"` //丢帧数 + BPS int + FPS int + Drops int // 丢帧率 + RawSize int // 裸数据长度 + RawPart []int // 裸数据片段用于UI上显示 } -func (bt *Base[T]) ComputeBPS(bytes int) { +func (bt *Base[T, F]) ComputeBPS(bytes int) { bt.bytes += bytes bt.frames++ if elapse := time.Since(bt.ts).Seconds(); elapse > 1 { @@ -54,31 +54,31 @@ func (bt *Base[T]) ComputeBPS(bytes int) { } } -func (bt *Base[T]) GetName() string { +func (bt *Base[T, F]) GetName() string { return bt.Name } -func (bt *Base[T]) GetBPS() int { +func (bt *Base[T, F]) GetBPS() int { return bt.BPS } -func (bt *Base[T]) GetFPS() int { +func (bt *Base[T, F]) GetFPS() int { return bt.FPS } -func (bt *Base[T]) GetDrops() int { +func (bt *Base[T, F]) GetDrops() int { return bt.Drops } // GetRBSize 获取缓冲区大小 -func (bt *Base[T]) GetRBSize() int { - return bt.RingBuffer.Size +func (bt *Base[T, F]) GetRBSize() int { + return bt.RingWriter.Size } -func (bt *Base[T]) SnapForJson() { +func (bt *Base[T, F]) SnapForJson() { } -func (bt *Base[T]) SetStuff(stuff ...any) { +func (bt *Base[T, F]) SetStuff(stuff ...any) { for _, s := range stuff { switch v := s.(type) { case IStream: diff --git a/common/ring-writer.go b/common/ring-writer.go new file mode 100644 index 0000000..3414a65 --- /dev/null +++ b/common/ring-writer.go @@ -0,0 +1,111 @@ +package common + +import ( + "m7s.live/engine/v4/util" +) + +type emptyLocker struct{} + +func (emptyLocker) Lock() {} +func (emptyLocker) Unlock() {} + +var EmptyLocker emptyLocker + +type RingWriter[T any, F IDataFrame[T]] struct { + *util.Ring[F] `json:"-" yaml:"-"` + pool *util.Ring[F] + poolSize int + Size int + LastValue F + constructor func() F +} + +func (rb *RingWriter[T, F]) create(n int) (ring *util.Ring[F]) { + ring = util.NewRing[F](n) + for p, i := ring, n; i > 0; p, i = p.Next(), i-1 { + p.Value = rb.constructor() + p.Value.Init() + } + return +} + +func (rb *RingWriter[T, F]) Init(n int, constructor func() F) *RingWriter[T, F] { + rb.constructor = constructor + rb.Ring = rb.create(n) + rb.Size = n + rb.LastValue = rb.Value + return rb +} + +// func (rb *RingBuffer[T, F]) MoveNext() F { +// rb.LastValue = rb.Value +// rb.Ring = rb.Next() +// return rb.Value +// } + +func (rb *RingWriter[T, F]) Glow(size int) (newItem *util.Ring[F]) { + if size < rb.poolSize { + newItem = rb.pool.Unlink(size) + rb.poolSize -= size + } else if size == rb.poolSize { + newItem = rb.pool + rb.poolSize = 0 + rb.pool = nil + } else { + newItem = rb.create(size - rb.poolSize).Link(rb.pool) + rb.poolSize = 0 + rb.pool = nil + } + rb.Link(newItem) + rb.Size += size + return +} + +func (rb *RingWriter[T, F]) Recycle(r *util.Ring[F]) { + rb.poolSize++ + r.Value.Init() + r.Value.Reset() + if rb.pool == nil { + rb.pool = r + } else { + rb.pool.Link(r) + } +} + +func (rb *RingWriter[T, F]) Reduce(size int) { + r := rb.Unlink(size) + if size > 1 { + for p := r.Next(); p != r; { + next := p.Next() //先保存下一个节点 + if p.Value.Discard() == 0 { + rb.Recycle(p.Prev().Unlink(1)) + } else { + // fmt.Println("Reduce", p.Value.ReaderCount()) + } + p = next + } + } + if r.Value.Discard() == 0 { + rb.Recycle(r) + } + rb.Size -= size + return +} + +func (rb *RingWriter[T, F]) Step() (normal bool) { + rb.LastValue.Broadcast() // 防止订阅者还在等待 + rb.LastValue = rb.Value + nextSeq := rb.LastValue.GetSequence() + 1 + next := rb.Next() + if normal = next.Value.StartWrite(); normal { + next.Value.Reset() + rb.Ring = next + } else { + rb.Reduce(1) //抛弃还有订阅者的节点 + rb.Ring = rb.Glow(1) //补充一个新节点 + rb.Value.StartWrite() + } + rb.Value.SetSequence(nextSeq) + rb.LastValue.Ready() + return +} diff --git a/common/ring.go b/common/ring.go deleted file mode 100644 index 8aa7589..0000000 --- a/common/ring.go +++ /dev/null @@ -1,55 +0,0 @@ -package common - -import ( - "m7s.live/engine/v4/util" -) - -type RingBuffer[T any] struct { - *util.Ring[T] `json:"-" yaml:"-"` - Size int - MoveCount uint32 - LastValue *T -} - -func (rb *RingBuffer[T]) Init(n int) *RingBuffer[T] { - if rb == nil { - rb = new(RingBuffer[T]) - } - rb.Ring = util.NewRing[T](n) - rb.Size = n - rb.LastValue = &rb.Value - return rb -} - -func (rb *RingBuffer[T]) MoveNext() *T { - rb.LastValue = &rb.Value - rb.Ring = rb.Next() - rb.MoveCount++ - return &rb.Value -} - -func (rb *RingBuffer[T]) Glow(size int) (newItem *util.Ring[T]) { - newItem = rb.Link(util.NewRing[T](size)) - rb.Size += size - return -} - -func (rb *RingBuffer[T]) Reduce(size int) (newItem *RingBuffer[T]) { - newItem = &RingBuffer[T]{ - Ring: rb.Unlink(size), - Size: size, - } - rb.Size -= size - return -} - -// Do calls function f on each element of the ring, in forward order. -// The behavior of Do is undefined if f changes *r. -func (rb *RingBuffer[T]) Do(f func(*T)) { - if rb != nil { - f(&rb.Value) - for p := rb.Next(); p != rb.Ring; p = p.Next() { - f(&p.Value) - } - } -} diff --git a/config/types.go b/config/types.go index a276789..36c3933 100755 --- a/config/types.go +++ b/config/types.go @@ -38,9 +38,12 @@ type Publish struct { IdleTimeout time.Duration // 空闲(无订阅)超时 PauseTimeout time.Duration `default:"30s"` // 暂停超时 BufferTime time.Duration // 缓冲长度(单位:秒),0代表取最近关键帧 + SpeedLimit time.Duration `default:"500ms"` //速度限制最大等待时间 Key string // 发布鉴权key SecretArgName string `default:"secret"` // 发布鉴权参数名 ExpireArgName string `default:"expire"` // 发布鉴权失效时间参数名 + RingSize int `default:"256"` // 初始缓冲区大小 + RingSizeMax int `default:"1024"` // 最大缓冲区大小 } func (c Publish) GetPublishConfig() Publish { @@ -59,9 +62,8 @@ type Subscribe struct { SubDataTracks []string // 指定订阅的数据轨道 SubMode int // 0,实时模式:追赶发布者进度,在播放首屏后等待发布者的下一个关键帧,然后跳到该帧。1、首屏后不进行追赶。2、从缓冲最大的关键帧开始播放,也不追赶,需要发布者配置缓存长度 IFrameOnly bool // 只要关键帧 - WaitTimeout time.Duration `default:"10s"` // 等待流超时 - WriteBufferSize int `default:"0"` // 写缓冲大小 - // Poll time.Duration `default:"20ms"` // 读取Ring时的轮询间隔,单位毫秒 + WaitTimeout time.Duration `default:"10s"` // 等待流超时 + WriteBufferSize int `default:"0"` // 写缓冲大小 Key string // 订阅鉴权key SecretArgName string `default:"secret"` // 订阅鉴权参数名 ExpireArgName string `default:"expire"` // 订阅鉴权失效时间参数名 diff --git a/stream.go b/stream.go index 14a697e..0bdfd6f 100644 --- a/stream.go +++ b/stream.go @@ -149,9 +149,8 @@ func (tracks *Tracks) Add(name string, t Track) bool { tracks.SetIDR(v) } if tracks.SEI != nil { - v.SEIReader = &track.DataReader[[]byte]{ - Ring: tracks.SEI.Ring, - } + v.SEIReader = &track.DataReader[[]byte]{} + v.SEIReader.Ring = tracks.SEI.Ring } case *track.Audio: if tracks.MainVideo != nil { @@ -177,12 +176,13 @@ func (tracks *Tracks) AddSEI(t byte, data []byte) bool { l := len(data) var buffer util.Buffer buffer.WriteByte(t) - for l > 255 { + for l >= 255 { buffer.WriteByte(255) l -= 255 } buffer.WriteByte(byte(l)) buffer.Write(data) + buffer.WriteByte(0x80) tracks.SEI.Push(buffer) return true } @@ -274,16 +274,16 @@ func findOrCreateStream(streamPath string, waitTimeout time.Duration) (s *Stream AppName: p[0], StreamName: strings.Join(p[1:], "/"), StartTime: time.Now(), + Logger: log.LocaleLogger.With(zap.String("stream", streamPath)), + timeout: time.NewTimer(waitTimeout), }) if s := actual.(*Stream); loaded { s.Debug("Stream Found") return s, false } else { - s.timeout = time.NewTimer(waitTimeout) s.Subscribers.Init() - s.Logger = log.LocaleLogger.With(zap.String("stream", streamPath)) - s.Info("created") s.actionChan.Init(1) + s.Info("created") go s.run() return s, true } @@ -330,11 +330,11 @@ func (r *Stream) action(action StreamAction) (ok bool) { stateEvent = SEpublish{event} } r.Subscribers.Broadcast(stateEvent) - if r.IdleTimeout > 0 && r.Subscribers.Len() == 0 { - return r.action(ACTION_LASTLEAVE) - } else { - r.timeout.Reset(r.PublishTimeout) // 5秒心跳,检测track的存活度 - } + // if r.IdleTimeout > 0 && r.Subscribers.Len() == 0 { + // return r.action(ACTION_LASTLEAVE) + // } else { + r.timeout.Reset(r.PublishTimeout) // 5秒心跳,检测track的存活度 + // } case STATE_WAITCLOSE: stateEvent = SEwaitClose{event} if r.IdleTimeout > 0 { @@ -465,6 +465,10 @@ func (s *Stream) run() { s.action(ACTION_PUBLISHLOST) continue } + if s.IdleTimeout > 0 && s.Subscribers.Len() == 0 && time.Since(s.StartTime) > s.IdleTimeout { + s.action(ACTION_LASTLEAVE) + continue + } } s.timeout.Reset(time.Second * 5) //订阅者等待音视频轨道超时了,放弃等待,订阅成功 diff --git a/subscriber.go b/subscriber.go index 536e082..bfc66b2 100644 --- a/subscriber.go +++ b/subscriber.go @@ -9,6 +9,7 @@ import ( "time" "go.uber.org/zap" + "go.uber.org/zap/zapcore" "m7s.live/engine/v4/codec" . "m7s.live/engine/v4/common" "m7s.live/engine/v4/config" @@ -206,9 +207,10 @@ func (s *Subscriber) PlayBlock(subType byte) { ctx := s.TrackPlayer.Context conf := s.Config hasVideo, hasAudio := s.Video != nil && conf.SubVideo, s.Audio != nil && conf.SubAudio - defer s.onStop() + stopReason := zap.String("reason", "stop") + defer s.onStop(&stopReason) if !hasAudio && !hasVideo { - s.Error("play neither video nor audio") + stopReason = zap.String("reason", "play neither video nor audio") return } sendVideoDecConf := func() { @@ -309,11 +311,12 @@ func (s *Subscriber) PlayBlock(subType byte) { for ctx.Err() == nil { if hasVideo { for ctx.Err() == nil { - s.VideoReader.Read(ctx, subMode) - videoFrame = s.VideoReader.Frame - if videoFrame == nil || ctx.Err() != nil { + err := s.VideoReader.ReadFrame(subMode) + if err != nil { + stopReason = zap.Error(err) return } + videoFrame = s.VideoReader.Value // fmt.Println("video", s.VideoReader.Track.PreFrame().Sequence-frame.Sequence) if videoFrame.IFrame && s.VideoReader.DecConfChanged() { s.VideoReader.ConfSeq = s.VideoReader.Track.SequenceHeadSeq @@ -323,9 +326,7 @@ func (s *Subscriber) PlayBlock(subType byte) { if audioFrame != nil { if videoFrame.Timestamp > audioFrame.Timestamp { // fmt.Println("switch audio", audioFrame.CanRead) - if audioFrame.CanRead { - sendAudioFrame(audioFrame) - } + sendAudioFrame(audioFrame) audioFrame = nil break } @@ -355,11 +356,12 @@ func (s *Subscriber) PlayBlock(subType byte) { s.AudioReader.SkipTs = s.VideoReader.SkipTs } } - s.AudioReader.Read(ctx, subMode) - audioFrame = s.AudioReader.Frame - if audioFrame == nil || ctx.Err() != nil { + err := s.AudioReader.ReadFrame(subMode) + if err != nil { + stopReason = zap.Error(err) return } + audioFrame = s.AudioReader.Value // fmt.Println("audio", s.AudioReader.Track.PreFrame().Sequence-frame.Sequence) if s.AudioReader.DecConfChanged() { s.AudioReader.ConfSeq = s.AudioReader.Track.SequenceHeadSeq @@ -368,9 +370,7 @@ func (s *Subscriber) PlayBlock(subType byte) { if hasVideo && videoFrame != nil { if audioFrame.Timestamp > videoFrame.Timestamp { // fmt.Println("switch video", videoFrame.CanRead) - if videoFrame.CanRead { - sendVideoFrame(videoFrame) - } + sendVideoFrame(videoFrame) videoFrame = nil break } @@ -383,11 +383,18 @@ func (s *Subscriber) PlayBlock(subType byte) { } } } + if videoFrame != nil { + videoFrame.ReaderLeave() + } + if audioFrame != nil { + audioFrame.ReaderLeave() + } + stopReason = zap.Error(ctx.Err()) } -func (s *Subscriber) onStop() { +func (s *Subscriber) onStop(reason *zapcore.Field) { if !s.Stream.IsClosed() { - s.Info("stop") + s.Info("stop", *reason) if !s.Config.Internal { s.Stream.Receive(s.Spesific) } diff --git a/track/aac.go b/track/aac.go index 458619f..78802cd 100644 --- a/track/aac.go +++ b/track/aac.go @@ -4,7 +4,6 @@ import ( "fmt" "io" "net" - "time" "github.com/aler9/gortsplib/v2/pkg/bits" "go.uber.org/zap" @@ -25,7 +24,7 @@ func NewAAC(stream IStream, stuff ...any) (aac *AAC) { aac.CodecID = codec.CodecID_AAC aac.Channels = 2 aac.SampleSize = 16 - aac.SetStuff("aac", stream, int(256+128), byte(97), aac, time.Millisecond*10) + aac.SetStuff("aac", stream, byte(97), aac) aac.SetStuff(stuff...) if aac.BytesPool == nil { aac.BytesPool = make(util.BytesPool, 17) diff --git a/track/audio.go b/track/audio.go index 0fab951..ffc3945 100644 --- a/track/audio.go +++ b/track/audio.go @@ -57,7 +57,7 @@ func (av *Audio) Flush() { } func (av *Audio) WriteRawBytes(pts uint32, raw util.IBytes) { - curValue := &av.Value + curValue := av.Value curValue.BytesIn += raw.Len() av.Value.AUList.Push(av.GetFromPool(raw)) av.generateTimestamp(pts) diff --git a/track/base.go b/track/base.go index bf49425..d84ae91 100644 --- a/track/base.go +++ b/track/base.go @@ -12,13 +12,6 @@ import ( "m7s.live/engine/v4/util" ) -type emptyLocker struct{} - -func (emptyLocker) Lock() {} -func (emptyLocker) Unlock() {} - -var EmptyLocker emptyLocker - type 流速控制 struct { 起始时间戳 time.Duration 起始dts time.Duration @@ -74,12 +67,12 @@ type SpesificTrack interface { } type IDRingList struct { - IDRList util.List[*util.Ring[AVFrame]] - IDRing *util.Ring[AVFrame] - HistoryRing *util.Ring[AVFrame] + IDRList util.List[*util.Ring[*AVFrame]] + IDRing *util.Ring[*AVFrame] + HistoryRing *util.Ring[*AVFrame] } -func (p *IDRingList) AddIDR(IDRing *util.Ring[AVFrame]) { +func (p *IDRingList) AddIDR(IDRing *util.Ring[*AVFrame]) { p.IDRList.PushValue(IDRing) p.IDRing = IDRing } @@ -91,7 +84,7 @@ func (p *IDRingList) ShiftIDR() { // Media 基础媒体Track类 type Media struct { - Base[AVFrame] + Base[any, *AVFrame] PayloadType byte IDRingList `json:"-" yaml:"-"` //最近的关键帧位置,首屏渲染 SSRC uint32 @@ -159,11 +152,12 @@ func (av *Media) SetStuff(stuff ...any) { // 代表发布者已经离线,该Track成为遗留Track,等待下一任发布者接续发布 for _, s := range stuff { switch v := s.(type) { - case int: - av.Init(v) - av.Value.L = EmptyLocker + case IStream: + pubConf := v.GetPublisherConfig() + av.Base.SetStuff(v) + av.Init(pubConf.RingSize, NewAVFrame) av.SSRC = uint32(uintptr(unsafe.Pointer(av))) - av.等待上限 = config.Global.SpeedLimit + av.等待上限 = pubConf.SpeedLimit case uint32: av.SampleRate = v case byte: @@ -183,7 +177,7 @@ func (av *Media) LastWriteTime() time.Time { } func (av *Media) CurrentFrame() *AVFrame { - return &av.Value + return av.Value } func (av *Media) PreFrame() *AVFrame { return av.LastValue @@ -212,9 +206,7 @@ func (av *Media) narrow(gop int) { av.Trace("resize", zap.Int("before", av.Size), zap.Int("after", av.Size-5)) } //缩小缓冲环节省内存 - av.Reduce(5).Do(func(v *AVFrame) { - v.Reset() - }) + av.Reduce(5) } } @@ -230,7 +222,7 @@ func (av *Media) AddIDR() { } func (av *Media) Flush() { - curValue, preValue, nextValue := &av.Value, av.LastValue, av.Next() + curValue, preValue, nextValue := av.Value, av.LastValue, av.Next() useDts := curValue.Timestamp == 0 if av.State == TrackStateOffline { av.State = TrackStateOnline @@ -308,20 +300,10 @@ func (av *Media) Flush() { } } av.ComputeBPS(curValue.BytesIn) - curValue.WriteTime = time.Now() + av.Step() if av.等待上限 > 0 { av.控制流速(curValue.Timestamp, curValue.DTS) } - preValue = curValue - curValue = av.MoveNext() - curValue.CanRead = false - curValue.Reset() - if curValue.L == nil { - curValue.L = EmptyLocker - } - curValue.Sequence = av.MoveCount - preValue.CanRead = true - preValue.Broadcast() } func deltaTS(curTs time.Duration, preTs time.Duration) time.Duration { diff --git a/track/data.go b/track/data.go index 47d1ea5..23e3bfe 100644 --- a/track/data.go +++ b/track/data.go @@ -12,52 +12,43 @@ import ( ) type Data[T any] struct { - Base[DataFrame[T]] + Base[T, *DataFrame[T]] sync.Locker `json:"-" yaml:"-"` // 写入锁,可选,单一协程写入可以不加锁 } +func (dt *Data[T]) Init(n int) { + dt.Base.Init(n, NewDataFrame[T]) +} + func (dt *Data[T]) Push(data T) { if dt.Locker != nil { dt.Lock() defer dt.Unlock() } - curValue := &dt.Value + curValue := dt.Value if log.Trace { dt.Trace("push data", zap.Uint32("sequence", curValue.Sequence)) } - curValue.WriteTime = time.Now() curValue.Data = data - preValue := curValue - curValue = dt.MoveNext() - curValue.CanRead = false - curValue.Reset() - if curValue.L == nil { - curValue.L = EmptyLocker - } - curValue.Sequence = dt.MoveCount - preValue.CanRead = true - preValue.Broadcast() + dt.Step() } func (d *Data[T]) Play(ctx context.Context, onData func(*DataFrame[T]) error) (err error) { d.Debug("play data track") - reader := DataReader[T]{ - Ctx: ctx, - Ring: d.Ring, - } - for { - curValue := reader.Read() - if err = ctx.Err(); err != nil { - return - } + reader := DataReader[T]{} + for err = reader.Read(d.Ring); err == nil; err = reader.ReadNext() { if log.Trace { - d.Trace("read data", zap.Uint32("sequence", curValue.Sequence)) + d.Trace("read data", zap.Uint32("sequence", reader.Value.Sequence)) } - if err = onData(curValue); err == nil { + if err = onData(reader.Value); err == nil { err = ctx.Err() } - reader.MoveNext() + if err != nil { + reader.Value.ReaderLeave() + return + } } + return } func (d *Data[T]) Attach(s IStream) { @@ -80,7 +71,6 @@ func (d *Data[T]) LastWriteTime() time.Time { func NewDataTrack[T any](name string) (dt *Data[T]) { dt = &Data[T]{} dt.Init(10) - dt.Value.L = EmptyLocker dt.SetStuff(name) return } @@ -94,30 +84,18 @@ func (dt *RecycleData[T]) Push(data T) { dt.Lock() defer dt.Unlock() } - curValue := &dt.Value + curValue := dt.Value if log.Trace { dt.Trace("push data", zap.Uint32("sequence", curValue.Sequence)) } - curValue.WriteTime = time.Now() curValue.Data = data - preValue := curValue - curValue = dt.MoveNext() - curValue.CanRead = false - curValue.Reset() - if curValue.L == nil { - curValue.L = EmptyLocker - } else { - curValue.Data.Recycle() - } - curValue.Sequence = dt.MoveCount - preValue.CanRead = true - preValue.Broadcast() + dt.Step() + dt.Value.Data.Recycle() } func NewRecycleDataTrack[T util.Recyclable](name string) (dt *RecycleData[T]) { dt = &RecycleData[T]{} dt.Init(10) - dt.Value.L = EmptyLocker dt.SetStuff(name) return } @@ -132,7 +110,6 @@ func NewBytesDataTrack(name string) (dt *BytesData) { Pool: make(util.BytesPool, 17), } dt.Init(10) - dt.Value.L = EmptyLocker dt.SetStuff(name) return } diff --git a/track/g711.go b/track/g711.go index 338b1fd..59f046a 100644 --- a/track/g711.go +++ b/track/g711.go @@ -2,7 +2,6 @@ package track import ( "io" - "time" "go.uber.org/zap" "m7s.live/engine/v4/codec" @@ -29,7 +28,7 @@ func NewG711(stream IStream, alaw bool, stuff ...any) (g711 *G711) { g711.SampleSize = 8 g711.Channels = 1 g711.AVCCHead = []byte{(byte(g711.CodecID) << 4) | (1 << 1)} - g711.SetStuff(stream, int(32), uint32(8000), g711, time.Millisecond*10) + g711.SetStuff(stream, uint32(8000), g711) g711.SetStuff(stuff...) if g711.BytesPool == nil { g711.BytesPool = make(util.BytesPool, 17) diff --git a/track/h264.go b/track/h264.go index 149e341..ee5e87c 100644 --- a/track/h264.go +++ b/track/h264.go @@ -2,7 +2,6 @@ package track import ( "io" - "time" "go.uber.org/zap" "m7s.live/engine/v4/codec" @@ -20,7 +19,7 @@ type H264 struct { func NewH264(stream IStream, stuff ...any) (vt *H264) { vt = &H264{} vt.Video.CodecID = codec.CodecID_H264 - vt.SetStuff("h264", int(256), byte(96), uint32(90000), stream, vt, time.Millisecond*10) + vt.SetStuff("h264", byte(96), uint32(90000), stream, vt) vt.SetStuff(stuff...) if vt.BytesPool == nil { vt.BytesPool = make(util.BytesPool, 17) @@ -114,7 +113,7 @@ func (vt *H264) WriteRTPFrame(frame *RTPFrame) { vt.lostFlag = true vt.Warn("lost rtp packet", zap.Uint16("lastSeq", vt.lastSeq), zap.Uint16("lastSeq2", vt.lastSeq2)) } - rv := &vt.Value + rv := vt.Value if naluType := frame.H264Type(); naluType < 24 { vt.WriteSliceBytes(frame.Payload) } else { diff --git a/track/h265.go b/track/h265.go index 45ba61a..f93d486 100644 --- a/track/h265.go +++ b/track/h265.go @@ -2,7 +2,6 @@ package track import ( "io" - "time" "go.uber.org/zap" "m7s.live/engine/v4/codec" @@ -20,7 +19,7 @@ type H265 struct { func NewH265(stream IStream, stuff ...any) (vt *H265) { vt = &H265{} vt.Video.CodecID = codec.CodecID_H265 - vt.SetStuff("h265", int(256), byte(96), uint32(90000), stream, vt, time.Millisecond*10) + vt.SetStuff("h265", byte(96), uint32(90000), stream, vt) vt.SetStuff(stuff...) if vt.BytesPool == nil { vt.BytesPool = make(util.BytesPool, 17) @@ -135,7 +134,7 @@ func (vt *H265) WriteAVCC(ts uint32, frame *util.BLL) (err error) { } func (vt *H265) WriteRTPFrame(frame *RTPFrame) { - rv := &vt.Value + rv := vt.Value // TODO: DONL may need to be parsed if `sprop-max-don-diff` is greater than 0 on the RTP stream. var usingDonlField bool var buffer = util.Buffer(frame.Payload) diff --git a/track/reader-av.go b/track/reader-av.go index 39a426a..69e5c5d 100644 --- a/track/reader-av.go +++ b/track/reader-av.go @@ -1,13 +1,12 @@ package track import ( - "context" + "errors" "time" "go.uber.org/zap" "m7s.live/engine/v4/common" "m7s.live/engine/v4/log" - "m7s.live/engine/v4/util" ) const ( @@ -21,12 +20,12 @@ const ( SUBMODE_BUFFER ) +var ErrDiscard = errors.New("dsicard") + type AVRingReader struct { - ctx context.Context - mode int - Track *Media - *util.Ring[common.AVFrame] - // wait func() + RingReader[any, *common.AVFrame] + mode int + Track *Media State byte FirstSeq uint32 FirstTs time.Duration @@ -34,7 +33,6 @@ type AVRingReader struct { beforeJump time.Duration ConfSeq int startTime time.Time - Frame *common.AVFrame AbsTime uint32 Delay uint32 *log.Logger @@ -45,45 +43,25 @@ func (r *AVRingReader) DecConfChanged() bool { } func NewAVRingReader(t *Media) *AVRingReader { - r := &AVRingReader{ + return &AVRingReader{ Track: t, } - // if poll == 0 { - // r.wait = runtime.Gosched - // } else { - // r.wait = func() { - // time.Sleep(poll) - // } - // } - return r } -func (r *AVRingReader) ReadFrame() *common.AVFrame { - if r.Frame = &r.Value; r.ctx.Err() == nil && !r.Frame.CanRead { - r.Frame.Wait() +func (r *AVRingReader) readFrame() (err error) { + err = r.ReadNext() + if err != nil { + return err } // 超过一半的缓冲区大小,说明Reader太慢,需要丢帧 - if r.mode != SUBMODE_BUFFER && r.State == READSTATE_NORMAL && r.Track.LastValue.Sequence-r.Frame.Sequence > uint32(r.Track.Size/2) && r.Track.IDRing != nil && r.Track.IDRing.Value.Sequence > r.Frame.Sequence { - r.Warn("reader too slow", zap.Uint32("lastSeq", r.Track.LastValue.Sequence), zap.Uint32("seq", r.Frame.Sequence)) - r.Ring = r.Track.IDRing - return r.ReadFrame() + if r.mode != SUBMODE_BUFFER && r.State == READSTATE_NORMAL && r.Track.LastValue.Sequence-r.Value.Sequence > uint32(r.Track.Size/2) && r.Track.IDRing != nil && r.Track.IDRing.Value.Sequence > r.Value.Sequence { + r.Warn("reader too slow", zap.Uint32("lastSeq", r.Track.LastValue.Sequence), zap.Uint32("seq", r.Value.Sequence)) + return r.Read(r.Track.IDRing) } - return r.Frame + return } -func (r *AVRingReader) TryRead() (item *common.AVFrame) { - if item = &r.Value; item.CanRead { - return - } - return nil -} - -func (r *AVRingReader) MoveNext() { - r.Ring = r.Next() -} - -func (r *AVRingReader) Read(ctx context.Context, mode int) (err error) { - r.ctx = ctx +func (r *AVRingReader) ReadFrame(mode int) (err error) { r.mode = mode switch r.State { case READSTATE_INIT: @@ -109,57 +87,56 @@ func (r *AVRingReader) Read(ctx context.Context, mode int) (err error) { } r.State = READSTATE_NORMAL } - r.Ring = startRing - r.ReadFrame() - if err = r.ctx.Err(); err != nil { + if err = r.StartRead(startRing); err != nil { return } r.startTime = time.Now() if r.FirstTs == 0 { - r.FirstTs = r.Frame.Timestamp + r.FirstTs = r.Value.Timestamp } r.SkipTs = r.FirstTs - r.FirstSeq = r.Frame.Sequence + r.FirstSeq = r.Value.Sequence r.Info("first frame read", zap.Duration("firstTs", r.FirstTs), zap.Uint32("firstSeq", r.FirstSeq)) case READSTATE_FIRST: if r.Track.IDRing.Value.Sequence != r.FirstSeq { - r.Ring = r.Track.IDRing - frame := r.ReadFrame() // 直接跳到最近的关键帧 - if err = r.ctx.Err(); err != nil { + if err = r.Read(r.Track.IDRing); err != nil { return } - r.SkipTs = frame.Timestamp - r.beforeJump + r.SkipTs = r.Value.Timestamp - r.beforeJump r.Info("jump", zap.Uint32("skipSeq", r.Track.IDRing.Value.Sequence-r.FirstSeq), zap.Duration("skipTs", r.SkipTs)) r.State = READSTATE_NORMAL } else { - r.MoveNext() - frame := r.ReadFrame() - r.beforeJump = frame.Timestamp - r.FirstTs + if err = r.readFrame(); err != nil { + return + } + r.beforeJump = r.Value.Timestamp - r.FirstTs // 防止过快消费 if fast := r.beforeJump - time.Since(r.startTime); fast > 0 && fast < time.Second { time.Sleep(fast) } } case READSTATE_NORMAL: - r.MoveNext() - r.ReadFrame() + if err = r.readFrame(); err != nil { + return + } } - r.AbsTime = uint32((r.Frame.Timestamp - r.SkipTs).Milliseconds()) + r.AbsTime = uint32((r.Value.Timestamp - r.SkipTs).Milliseconds()) if r.AbsTime == 0 { r.AbsTime = 1 } - r.Delay = uint32((r.Track.LastValue.Timestamp - r.Frame.Timestamp).Milliseconds()) + // r.Delay = uint32((r.Track.LastValue.Timestamp - r.Value.Timestamp).Milliseconds()) + r.Delay = uint32(r.Track.LastValue.Sequence - r.Value.Sequence) // fmt.Println(r.Track.Name, r.Delay) // println(r.Track.Name, r.State, r.Frame.AbsTime, r.SkipTs, r.AbsTime) return } func (r *AVRingReader) GetPTS32() uint32 { - return uint32((r.Frame.PTS - r.SkipTs*90/time.Millisecond)) + return uint32((r.Value.PTS - r.SkipTs*90/time.Millisecond)) } func (r *AVRingReader) GetDTS32() uint32 { - return uint32((r.Frame.DTS - r.SkipTs*90/time.Millisecond)) + return uint32((r.Value.DTS - r.SkipTs*90/time.Millisecond)) } func (r *AVRingReader) ResetAbsTime() { - r.SkipTs = r.Frame.Timestamp + r.SkipTs = r.Value.Timestamp r.AbsTime = 1 } diff --git a/track/reader-data.go b/track/reader-data.go index ddc7eec..2ce2ed2 100644 --- a/track/reader-data.go +++ b/track/reader-data.go @@ -1,37 +1,71 @@ package track import ( - "context" - "m7s.live/engine/v4/common" "m7s.live/engine/v4/util" ) -type DataReader[T any] struct { - Ctx context.Context - // common.Track - *util.Ring[common.DataFrame[T]] - // startTime time.Time - // Frame *common.DataFrame[T] - // Delay uint32 - // *log.Logger +type RingReader[T any, F common.IDataFrame[T]] struct { + *util.Ring[F] + Count int // 读取的帧数 } -func (r *DataReader[T]) Read() (item *common.DataFrame[T]) { - item = &r.Value - if r.Ctx.Err() == nil && !item.CanRead { - item.Wait() +func (r *RingReader[T, F]) StartRead(ring *util.Ring[F]) (err error) { + r.Ring = ring + if r.Value.IsDiscarded() { + return ErrDiscard } + if r.Value.IsWriting() { + // t := time.Now() + r.Value.Wait() + // log.Info("wait", time.Since(t)) + } + r.Count++ + r.Value.ReaderEnter() return } -func (r *DataReader[T]) TryRead() (item *common.DataFrame[T]) { - if item = &r.Value; item.CanRead { +func (r *RingReader[T, F]) TryRead() (f F, err error) { + if r.Count > 0 { + preValue := r.Value + if preValue.IsDiscarded() { + preValue.ReaderLeave() + err = ErrDiscard + return + } + if r.Next().Value.IsWriting() { + return + } + defer preValue.ReaderLeave() + r.Ring = r.Next() + } else { + if r.Value.IsWriting() { + return + } + } + if r.Value.IsDiscarded() { + err = ErrDiscard return } - return nil + r.Count++ + f = r.Value + r.Value.ReaderEnter() + return } -func (r *DataReader[T]) MoveNext() { - r.Ring = r.Next() +func (r *RingReader[T, F]) ReadNext() (err error) { + return r.Read(r.Next()) +} + +func (r *RingReader[T, F]) Read(ring *util.Ring[F]) (err error) { + preValue := r.Value + defer preValue.ReaderLeave() + if preValue.IsDiscarded() { + return ErrDiscard + } + return r.StartRead(ring) +} + +type DataReader[T any] struct { + RingReader[T, *common.DataFrame[T]] } diff --git a/track/video.go b/track/video.go index 6fef8d4..195558b 100644 --- a/track/video.go +++ b/track/video.go @@ -247,14 +247,15 @@ func (vt *Video) CompleteAVCC(rv *AVFrame) { } func (vt *Video) Flush() { - rv := &vt.Value + rv := vt.Value if vt.SEIReader != nil { - if seiFrame := vt.SEIReader.TryRead(); seiFrame != nil { + if seiFrame, err := vt.SEIReader.TryRead(); seiFrame != nil { var au util.BLL au.Push(vt.SpesificTrack.GetNALU_SEI()) au.Push(vt.BytesPool.GetShell(seiFrame.Data)) vt.Value.AUList.UnshiftValue(&au) - vt.SEIReader.MoveNext() + } else if err != nil { + vt.SEIReader = nil } } if rv.IFrame {