Updated rate enforcer

This commit is contained in:
Quentin Renard
2022-10-10 11:01:30 +02:00
parent a59b8134ad
commit 2f844182bc
2 changed files with 180 additions and 225 deletions

View File

@@ -3,8 +3,6 @@ package astilibav
// Event names
const (
EventNameLog = "astilibav.log"
// First frame of new node has been received by the rate enforcer
EventNameRateEnforcerSwitchedIn = "astilibav.rate.enforcer.switched.in"
// First frame of new node has been dispatched by the rate enforcer
EventNameRateEnforcerSwitchedOut = "astilibav.rate.enforcer.switched.out"
)

View File

@@ -3,7 +3,6 @@ package astilibav
import (
"context"
"fmt"
"math"
"sync"
"sync/atomic"
"time"
@@ -18,44 +17,52 @@ var countRateEnforcer uint64
// RateEnforcer represents an object capable of enforcing rate based on PTS
type RateEnforcer struct {
*astiencoder.BaseNode
adaptSlotsToIncomingFrames bool
buf []*rateEnforcerItem
c *astikit.Chan
d *frameDispatcher
descriptor Descriptor
eh *astiencoder.EventHandler
f RateEnforcerFiller
m *sync.Mutex
n astiencoder.Node
outputCtx Context
p *framePool
period time.Duration
restamper FrameRestamper
slotsCount int
slots []*rateEnforcerSlot
statDelayAvg *astikit.CounterAvgStat
statFilledRate *astikit.CounterRateStat
statIncomingRate *astikit.CounterRateStat
statProcessedRate *astikit.CounterRateStat
c *astikit.Chan
currentNode astiencoder.Node
d *frameDispatcher
delay time.Duration
descriptor Descriptor
desiredNode astiencoder.Node
eh *astiencoder.EventHandler
f RateEnforcerFiller
frames map[astiencoder.Node][]*astiav.Frame
m *sync.Mutex
outputCtx Context
p *framePool
period time.Duration
ptsReferences map[astiencoder.Node]*rateEnforcerPTSReference
restamper FrameRestamper
statDelayAvg *astikit.CounterAvgStat
statFilledRate *astikit.CounterRateStat
statIncomingRate *astikit.CounterRateStat
statProcessedRate *astikit.CounterRateStat
}
type rateEnforcerSlot struct {
i *rateEnforcerItem
n astiencoder.Node
ptsMax int64
ptsMin int64
type rateEnforcerPTSReference struct {
pts int64
t time.Time
timeBase astiav.Rational
}
type rateEnforcerItem struct {
f *astiav.Frame
n astiencoder.Node
func newRateEnforcerPTSReference(pts int64, t time.Time, timeBase astiav.Rational) *rateEnforcerPTSReference {
return &rateEnforcerPTSReference{
pts: pts,
t: t,
timeBase: timeBase,
}
}
func (r *rateEnforcerPTSReference) timeFromPTS(pts int64) time.Time {
return r.t.Add(time.Duration(astiav.RescaleQ(pts-r.pts, r.timeBase, nanosecondRational)))
}
func (r *rateEnforcerPTSReference) ptsFromTime(t time.Time) int64 {
return astiav.RescaleQ(int64(t.Sub(r.t)), nanosecondRational, r.timeBase) + r.pts
}
// RateEnforcerOptions represents rate enforcer options
type RateEnforcerOptions struct {
AdaptSlotsToIncomingFrames bool
// Expressed in number of frames in the output framerate
Delay uint
Delay time.Duration
Filler RateEnforcerFiller
Node astiencoder.NodeOptions
// Both FrameRate and TimeBase are mandatory
@@ -71,21 +78,21 @@ func NewRateEnforcer(o RateEnforcerOptions, eh *astiencoder.EventHandler, c *ast
// Create rate enforcer
r = &RateEnforcer{
adaptSlotsToIncomingFrames: o.AdaptSlotsToIncomingFrames,
c: astikit.NewChan(astikit.ChanOptions{ProcessAll: true}),
descriptor: o.OutputCtx.Descriptor(),
eh: eh,
f: o.Filler,
m: &sync.Mutex{},
outputCtx: o.OutputCtx,
period: time.Duration(float64(1e9) / o.OutputCtx.FrameRate.ToDouble()),
restamper: o.Restamper,
slots: []*rateEnforcerSlot{nil},
slotsCount: int(math.Max(float64(o.Delay), 1)),
statDelayAvg: astikit.NewCounterAvgStat(),
statFilledRate: astikit.NewCounterRateStat(),
statIncomingRate: astikit.NewCounterRateStat(),
statProcessedRate: astikit.NewCounterRateStat(),
c: astikit.NewChan(astikit.ChanOptions{ProcessAll: true}),
delay: o.Delay,
descriptor: o.OutputCtx.Descriptor(),
frames: make(map[astiencoder.Node][]*astiav.Frame),
eh: eh,
f: o.Filler,
m: &sync.Mutex{},
outputCtx: o.OutputCtx,
period: time.Duration(float64(1e9) / o.OutputCtx.FrameRate.ToDouble()),
ptsReferences: map[astiencoder.Node]*rateEnforcerPTSReference{},
restamper: o.Restamper,
statDelayAvg: astikit.NewCounterAvgStat(),
statFilledRate: astikit.NewCounterRateStat(),
statIncomingRate: astikit.NewCounterRateStat(),
statProcessedRate: astikit.NewCounterRateStat(),
}
// Create base node
@@ -163,7 +170,7 @@ func (r *RateEnforcer) OutputCtx() Context {
func (r *RateEnforcer) Switch(n astiencoder.Node) {
r.m.Lock()
defer r.m.Unlock()
r.n = n
r.desiredNode = n
}
// Connect implements the FrameHandlerConnector interface
@@ -190,15 +197,22 @@ func (r *RateEnforcer) Start(ctx context.Context, t astiencoder.CreateTaskFunc)
// Make sure to stop the chan properly
defer r.c.Stop()
// Start tick
startTickCtx := r.startTick(r.Context())
// Start tick in a goroutine
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
// Make sure to update the waiting group
defer wg.Done()
// Start tick
r.startTick(r.Context())
}()
// Start chan
r.c.Start(r.Context())
// Wait for start tick to be really over since it's not the blocking pattern
// and is executed in a goroutine
<-startTickCtx.Done()
// Wait for start tick to be really over
wg.Wait()
})
}
@@ -209,6 +223,11 @@ func (r *RateEnforcer) HandleFrame(p FrameHandlerPayload) {
// Increment incoming rate
r.statIncomingRate.Add(1)
// Invalid pts
if p.Frame.Pts() == astiav.NoPtsValue {
return
}
// Copy frame
f := r.p.get()
if err := f.Ref(p.Frame); err != nil {
@@ -216,6 +235,9 @@ func (r *RateEnforcer) HandleFrame(p FrameHandlerPayload) {
return
}
// Get time
t := time.Now().Add(r.delay)
// Restamp
f.SetPts(astiav.RescaleQ(f.Pts(), p.Descriptor.TimeBase(), r.outputCtx.TimeBase))
@@ -226,9 +248,6 @@ func (r *RateEnforcer) HandleFrame(p FrameHandlerPayload) {
// Handle pause
defer r.HandlePause()
// Make sure to close frame
defer r.p.put(f)
// Increment processed rate
r.statProcessedRate.Add(1)
@@ -236,99 +255,53 @@ func (r *RateEnforcer) HandleFrame(p FrameHandlerPayload) {
r.m.Lock()
defer r.m.Unlock()
// Get last slot
l := r.slots[len(r.slots)-1]
// We update the last slot if:
// - it's empty
// - its node is different from the desired node AND the payload's node is the desired
// node. That way, if the desired node doesn't dispatch frames for some time, we fallback to the previous
// node instead of the previous item
// - it's in the past compared to current frame and developer wants to adapt slots to incoming frames
if c1, c2 := l == nil || (r.n != l.n && r.n == p.Node), l != nil && l.n == p.Node && l.ptsMax < f.Pts(); c1 || c2 {
// Update last slot
if c1 || (c2 && r.adaptSlotsToIncomingFrames) {
r.slots[len(r.slots)-1] = r.newRateEnforcerSlot(f)
// Insert frame
var inserted bool
for idx, v := range r.frames[p.Node] {
if f.Pts() > v.Pts() {
continue
}
// Emit event
if c1 {
r.eh.Emit(astiencoder.Event{
Name: EventNameRateEnforcerSwitchedIn,
Payload: p.Node,
Target: r,
})
if f.Pts() == v.Pts() {
r.p.put(f)
return
} else {
r.frames[p.Node] = append(r.frames[p.Node][:idx], append([]*astiav.Frame{f}, r.frames[p.Node][idx:]...)...)
}
inserted = true
break
}
// Create item
i := newRateEnforcerItem(nil, p.Node)
// Copy frame
i.f = r.p.get()
if err := i.f.Ref(f); err != nil {
emitError(r, r.eh, err, "refing frame")
return
// Frame was not inserted, we need to append it
if !inserted {
r.frames[p.Node] = append(r.frames[p.Node], f)
}
// Append item
r.buf = append(r.buf, i)
// Update pts reference
ptsReference, ok := r.ptsReferences[p.Node]
if !ok || ptsReference == nil || ptsReference.timeFromPTS(f.Pts()).After(t) {
r.ptsReferences[p.Node] = newRateEnforcerPTSReference(f.Pts(), t, r.outputCtx.TimeBase)
ptsReference = r.ptsReferences[p.Node]
}
// Process delay stat
if l != nil && l.n == i.n {
r.statDelayAvg.Add(float64(time.Duration(astiav.RescaleQ(l.ptsMax-i.f.Pts(), r.outputCtx.TimeBase, nanosecondRational))))
if r.currentNode == p.Node {
r.statDelayAvg.Add(float64(t.Sub(ptsReference.timeFromPTS(f.Pts()))))
}
})
})
})
}
func (r *RateEnforcer) newRateEnforcerSlot(f *astiav.Frame) *rateEnforcerSlot {
return &rateEnforcerSlot{
n: r.n,
ptsMax: f.Pts() + int64(1/(r.outputCtx.TimeBase.ToDouble()*r.outputCtx.FrameRate.ToDouble())),
ptsMin: f.Pts(),
}
}
func (s *rateEnforcerSlot) next() *rateEnforcerSlot {
return &rateEnforcerSlot{
n: s.n,
ptsMin: s.ptsMax,
ptsMax: s.ptsMax - s.ptsMin + s.ptsMax,
}
}
func newRateEnforcerItem(f *astiav.Frame, n astiencoder.Node) *rateEnforcerItem {
return &rateEnforcerItem{
f: f,
n: n,
}
}
func (r *RateEnforcer) startTick(parentCtx context.Context) (ctx context.Context) {
// Create independant context that only captures when the following goroutine ends
var cancel context.CancelFunc
ctx, cancel = context.WithCancel(context.Background())
// Execute the rest in a go routine
go func() {
// Make sure to cancel local context
defer cancel()
// Loop
nextAt := time.Now()
var previousNode astiencoder.Node
for {
if stop := r.tickFunc(parentCtx, &nextAt, &previousNode); stop {
return
}
func (r *RateEnforcer) startTick(ctx context.Context) {
nextAt := time.Now()
for {
if stop := r.tickFunc(ctx, &nextAt); stop {
return
}
}()
return
}
}
func (r *RateEnforcer) tickFunc(ctx context.Context, nextAt *time.Time, previousNode *astiencoder.Node) (stop bool) {
func (r *RateEnforcer) tickFunc(ctx context.Context, nextAt *time.Time) (stop bool) {
// Compute next at
*nextAt = nextAt.Add(r.period)
@@ -346,57 +319,32 @@ func (r *RateEnforcer) tickFunc(ctx context.Context, nextAt *time.Time, previous
r.m.Lock()
defer r.m.Unlock()
// Make sure to remove first slot AFTER adding next slot, so that when there's only
// one slot, we still can get the .next() slot
removeFirstSlot := true
defer func(b *bool) {
if *b {
r.slots = r.slots[1:]
}
}(&removeFirstSlot)
// Get frame
f, n, filled := r.frame(*nextAt)
// Make sure to add next slot
defer func() {
var s *rateEnforcerSlot
if ps := r.slots[len(r.slots)-1]; ps != nil {
s = ps.next()
}
r.slots = append(r.slots, s)
}()
// Not enough slots
if len(r.slots) < r.slotsCount {
removeFirstSlot = false
return
}
// Distribute
r.distribute()
// Dispatch
i, filled := r.current()
if i != nil {
// Process frame
if f != nil {
// Restamp frame
if r.restamper != nil {
r.restamper.Restamp(i.f)
r.restamper.Restamp(f)
}
// Dispatch frame
r.d.dispatch(i.f, r.descriptor)
r.d.dispatch(f, r.descriptor)
// Frame is coming from an actual node
if i.n != nil {
if n != nil {
// New node has been dispatched
if *previousNode != i.n {
if r.currentNode != n {
// Emit event
r.eh.Emit(astiencoder.Event{
Name: EventNameRateEnforcerSwitchedOut,
Payload: i.n,
Payload: n,
Target: r,
})
// Update previous node
*previousNode = i.n
// Update current node
r.currentNode = n
}
}
}
@@ -405,79 +353,88 @@ func (r *RateEnforcer) tickFunc(ctx context.Context, nextAt *time.Time, previous
if filled {
r.statFilledRate.Add(1)
} else {
r.p.put(i.f)
r.p.put(f)
}
return
}
func (r *RateEnforcer) distribute() {
// Get useful nodes
ns := make(map[astiencoder.Node]bool)
for _, s := range r.slots {
if s != nil && s.n != nil {
ns[s.n] = true
func (r *RateEnforcer) frame(from time.Time) (f *astiav.Frame, n astiencoder.Node, filled bool) {
// Get to
to := from.Add(r.period)
// If desired node is different from the current node, we check it first
if r.desiredNode != nil && r.desiredNode != r.currentNode {
if f = r.frameForNode(r.desiredNode, from, to); f != nil {
n = r.desiredNode
}
}
// Loop through slots
for _, s := range r.slots {
// Slot is empty or already has an item
if s == nil || s.i != nil {
// No frame, we need to check the current node if any
if f == nil && r.currentNode != nil {
if f = r.frameForNode(r.currentNode, from, to); f != nil {
n = r.currentNode
}
}
// Cleanup
r.cleanup(to)
// Fill
if f == nil {
f, n = r.f.Fill()
filled = true
} else {
r.f.NoFill(f, n)
}
return
}
func (r *RateEnforcer) frameForNode(n astiencoder.Node, from, to time.Time) (f *astiav.Frame) {
// Get pts reference
ptsReference, ok := r.ptsReferences[n]
if !ok {
return
}
// Get pts boundaries
ptsMin := ptsReference.ptsFromTime(from)
ptsMax := ptsReference.ptsFromTime(to)
// Loop through frames
for idx := range r.frames[n] {
if r.frames[n][idx].Pts() >= ptsMin && r.frames[n][idx].Pts() < ptsMax {
f = r.frames[n][idx]
r.frames[n] = append(r.frames[n][:idx], r.frames[n][idx+1:]...)
break
}
}
return
}
func (r *RateEnforcer) cleanup(to time.Time) {
// Loop through nodes
for n := range r.frames {
// Get pts reference
ptsReference, ok := r.ptsReferences[n]
if !ok {
continue
}
// Loop through buffer
for idx := 0; idx < len(r.buf); idx++ {
// Not the same node
if r.buf[idx].n != s.n {
// Node is useless
if _, ok := ns[r.buf[idx].n]; !ok {
r.p.put(r.buf[idx].f)
r.buf = append(r.buf[:idx], r.buf[idx+1:]...)
idx--
}
continue
}
// Get max pts
ptsMax := ptsReference.ptsFromTime(to)
// Add to slot or remove if pts is older
if s.ptsMin <= r.buf[idx].f.Pts() && s.ptsMax > r.buf[idx].f.Pts() {
if s.i == nil {
s.i = r.buf[idx]
} else {
r.p.put(r.buf[idx].f)
}
r.buf = append(r.buf[:idx], r.buf[idx+1:]...)
// Loop through frames
for idx := 0; idx < len(r.frames[n]); idx++ {
// PTS is too old
if r.frames[n][idx].Pts() < ptsMax {
r.p.put(r.frames[n][idx])
r.frames[n] = append(r.frames[n][:idx], r.frames[n][idx+1:]...)
idx--
continue
} else if s.ptsMin > r.buf[idx].f.Pts() {
r.p.put(r.buf[idx].f)
r.buf = append(r.buf[:idx], r.buf[idx+1:]...)
idx--
continue
}
}
}
}
func (r *RateEnforcer) current() (i *rateEnforcerItem, filled bool) {
if r.slots[0] != nil && r.slots[0].i != nil {
// Get item
i = r.slots[0].i
// No fill
r.f.NoFill(i.f, i.n)
} else {
// Fill
if f, n := r.f.Fill(); f != nil {
i = newRateEnforcerItem(f, n)
}
// Update filled
filled = true
}
return
}
type RateEnforcerFiller interface {
Fill() (*astiav.Frame, astiencoder.Node)
NoFill(*astiav.Frame, astiencoder.Node)