mirror of
https://github.com/Monibuca/engine.git
synced 2025-12-24 13:18:07 +08:00
114 lines
2.1 KiB
Go
114 lines
2.1 KiB
Go
package track
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"time"
|
|
|
|
"go.uber.org/zap"
|
|
. "m7s.live/engine/v4/common"
|
|
"m7s.live/engine/v4/log"
|
|
"m7s.live/engine/v4/util"
|
|
)
|
|
|
|
type Data[T any] struct {
|
|
Base[T, *DataFrame[T]]
|
|
sync.Locker `json:"-" yaml:"-"` // 写入锁,可选,单一协程写入可以不加锁
|
|
}
|
|
|
|
func (dt *Data[T]) Init(n int) {
|
|
dt.Base.Init(n, NewDataFrame[T])
|
|
}
|
|
|
|
func (dt *Data[T]) Push(data T) {
|
|
if dt.Locker != nil {
|
|
dt.Lock()
|
|
defer dt.Unlock()
|
|
}
|
|
curValue := dt.Value
|
|
if log.Trace {
|
|
dt.Trace("push data", zap.Uint32("sequence", curValue.Sequence))
|
|
}
|
|
curValue.Data = data
|
|
dt.Step()
|
|
}
|
|
|
|
func (d *Data[T]) Play(ctx context.Context, onData func(*DataFrame[T]) error) (err error) {
|
|
d.Debug("play data track")
|
|
reader := DataReader[T]{}
|
|
for err = reader.StartRead(d.Ring); err == nil; err = reader.ReadNext() {
|
|
if log.Trace {
|
|
d.Trace("read data", zap.Uint32("sequence", reader.Value.Sequence))
|
|
}
|
|
if err = onData(reader.Value); err == nil {
|
|
err = ctx.Err()
|
|
}
|
|
if err != nil {
|
|
reader.Value.ReaderLeave()
|
|
return
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
func (d *Data[T]) Attach(s IStream) {
|
|
d.SetStuff(s)
|
|
if err := s.AddTrack(d).Await(); err != nil {
|
|
d.Error("attach data track failed", zap.Error(err))
|
|
} else {
|
|
d.Info("data track attached")
|
|
}
|
|
}
|
|
|
|
func (d *Data[T]) LastWriteTime() time.Time {
|
|
return d.LastValue.WriteTime
|
|
}
|
|
|
|
func NewDataTrack[T any](name string) (dt *Data[T]) {
|
|
dt = &Data[T]{}
|
|
dt.Init(10)
|
|
dt.SetStuff(name)
|
|
return
|
|
}
|
|
|
|
type RecycleData[T util.Recyclable] struct {
|
|
Data[T]
|
|
}
|
|
|
|
func (dt *RecycleData[T]) Push(data T) {
|
|
if dt.Locker != nil {
|
|
dt.Lock()
|
|
defer dt.Unlock()
|
|
}
|
|
curValue := dt.Value
|
|
if log.Trace {
|
|
dt.Trace("push data", zap.Uint32("sequence", curValue.Sequence))
|
|
}
|
|
curValue.Data = data
|
|
dt.Step()
|
|
if !dt.Value.WriteTime.IsZero() {
|
|
dt.Value.Data.Recycle()
|
|
}
|
|
}
|
|
|
|
func NewRecycleDataTrack[T util.Recyclable](name string) (dt *RecycleData[T]) {
|
|
dt = &RecycleData[T]{}
|
|
dt.Init(10)
|
|
dt.SetStuff(name)
|
|
return
|
|
}
|
|
|
|
type BytesData struct {
|
|
RecycleData[util.LIBP]
|
|
Pool util.BytesPool
|
|
}
|
|
|
|
func NewBytesDataTrack(name string) (dt *BytesData) {
|
|
dt = &BytesData{
|
|
Pool: make(util.BytesPool, 17),
|
|
}
|
|
dt.Init(10)
|
|
dt.SetStuff(name)
|
|
return
|
|
}
|