Added demuxer looper

This commit is contained in:
Quentin Renard
2021-04-26 10:18:33 +02:00
parent 2b58b7c8eb
commit c0c7de7ef5
7 changed files with 225 additions and 63 deletions

View File

@@ -50,7 +50,7 @@ type openedOutput struct {
type buildData struct {
c *astikit.Closer
decoders map[*astilibav.Demuxer]map[*avformat.Stream]*astilibav.Decoder
decoders map[*astilibav.Demuxer]map[*astilibav.Stream]*astilibav.Decoder
eh *astiencoder.EventHandler
inputs map[string]openedInput
outputs map[string]openedOutput
@@ -62,7 +62,7 @@ func newBuildData(w *astiencoder.Workflow, eh *astiencoder.EventHandler, c *asti
return &buildData{
c: c,
eh: eh,
decoders: make(map[*astilibav.Demuxer]map[*avformat.Stream]*astilibav.Decoder),
decoders: make(map[*astilibav.Demuxer]map[*astilibav.Stream]*astilibav.Decoder),
s: s,
w: w,
}
@@ -190,14 +190,14 @@ func (b *builder) addOperationToWorkflow(name string, o JobOperation, bd *buildD
// Loop through inputs
for _, i := range ois {
// Loop through streams
for _, is := range i.o.d.CtxFormat().Streams() {
for _, is := range i.o.d.Streams() {
// Only process a specific PID
if i.c.PID != nil && is.Id() != *i.c.PID {
if i.c.PID != nil && is.ID != *i.c.PID {
continue
}
// Only process a specific media type
if t := avutil.MediaTypeFromString(i.c.MediaType); t > -1 && is.CodecParameters().CodecType() != avcodec.MediaType(t) {
if t := avutil.MediaTypeFromString(i.c.MediaType); t > -1 && is.CodecParameters.CodecType() != avcodec.MediaType(t) {
continue
}
@@ -211,7 +211,7 @@ func (b *builder) addOperationToWorkflow(name string, o JobOperation, bd *buildD
// Clone stream
var os *avformat.Stream
if os, err = astilibav.CloneStream(is, o.o.m.CtxFormat()); err != nil {
err = fmt.Errorf("main: cloning stream 0x%x(%d) of %s failed: %w", is.Id(), is.Id(), i.c.Name, err)
err = fmt.Errorf("main: cloning stream 0x%x(%d) of %s failed: %w", is.ID, is.ID, i.c.Name, err)
return
}
@@ -227,7 +227,7 @@ func (b *builder) addOperationToWorkflow(name string, o JobOperation, bd *buildD
// Create decoder
var d *astilibav.Decoder
if d, err = b.createDecoder(bd, i, is); err != nil {
err = fmt.Errorf("main: creating decoder for stream 0x%x(%d) of input %s failed: %w", is.Id(), is.Id(), i.c.Name, err)
err = fmt.Errorf("main: creating decoder for stream 0x%x(%d) of input %s failed: %w", is.ID, is.ID, i.c.Name, err)
return
}
@@ -237,14 +237,14 @@ func (b *builder) addOperationToWorkflow(name string, o JobOperation, bd *buildD
// Create filterer
var f *astilibav.Filterer
if f, err = b.createFilterer(bd, outCtx, d); err != nil {
err = fmt.Errorf("main: creating filterer for stream 0x%x(%d) of input %s failed: %w", is.Id(), is.Id(), i.c.Name, err)
err = fmt.Errorf("main: creating filterer for stream 0x%x(%d) of input %s failed: %w", is.ID, is.ID, i.c.Name, err)
return
}
// Create encoder
var e *astilibav.Encoder
if e, err = astilibav.NewEncoder(astilibav.EncoderOptions{Ctx: outCtx}, bd.eh, bd.c, bd.s); err != nil {
err = fmt.Errorf("main: creating encoder for stream 0x%x(%d) of input %s failed: %w", is.Id(), is.Id(), i.c.Name, err)
err = fmt.Errorf("main: creating encoder for stream 0x%x(%d) of input %s failed: %w", is.ID, is.ID, i.c.Name, err)
return
}
@@ -275,7 +275,7 @@ func (b *builder) addOperationToWorkflow(name string, o JobOperation, bd *buildD
// Add stream
var os *avformat.Stream
if os, err = e.AddStream(o.o.m.CtxFormat()); err != nil {
err = fmt.Errorf("main: adding stream for stream 0x%x(%d) of %s and output %s failed: %w", is.Id(), is.Id(), i.c.Name, o.c.Name, err)
err = fmt.Errorf("main: adding stream for stream 0x%x(%d) of %s and output %s failed: %w", is.ID, is.ID, i.c.Name, o.c.Name, err)
return
}
@@ -399,23 +399,23 @@ func (b *builder) operationOutputCtx(o JobOperation, inCtx astilibav.Context, oo
return
}
func (b *builder) createDecoder(bd *buildData, i operationInput, is *avformat.Stream) (d *astilibav.Decoder, err error) {
func (b *builder) createDecoder(bd *buildData, i operationInput, is *astilibav.Stream) (d *astilibav.Decoder, err error) {
// Get decoder
var okD, okS bool
if _, okD = bd.decoders[i.o.d]; okD {
d, okS = bd.decoders[i.o.d][is]
} else {
bd.decoders[i.o.d] = make(map[*avformat.Stream]*astilibav.Decoder)
bd.decoders[i.o.d] = make(map[*astilibav.Stream]*astilibav.Decoder)
}
// Decoder doesn't exist
if !okD || !okS {
// Create decoder
if d, err = astilibav.NewDecoder(astilibav.DecoderOptions{
CodecParams: is.CodecParameters(),
OutputCtx: astilibav.NewContextFromStream(is),
CodecParams: is.CodecParameters,
OutputCtx: is.Ctx,
}, bd.eh, bd.c, bd.s); err != nil {
err = fmt.Errorf("main: creating decoder for stream 0x%x(%d) of %s failed: %w", is.Id(), is.Id(), i.c.Name, err)
err = fmt.Errorf("main: creating decoder for stream 0x%x(%d) of %s failed: %w", is.ID, is.ID, i.c.Name, err)
return
}

View File

@@ -39,6 +39,10 @@ type Context struct {
Width int
}
func (ctx Context) Descriptor() Descriptor {
return NewDescriptor(ctx.TimeBase)
}
func (ctx Context) String() string {
// Shared
var ss []string

View File

@@ -3,6 +3,7 @@ package astilibav
import (
"context"
"fmt"
"sort"
"sync"
"sync/atomic"
"time"
@@ -27,10 +28,9 @@ type Demuxer struct {
eh *astiencoder.EventHandler
emulateRate bool
interruptRet *int
loop bool
l *demuxerLooper
p *pktPool
readFrameErrorHandler DemuxerReadFrameErrorHandler
restamper PktRestamper
ss map[int]*demuxerStream
statIncomingRate *astikit.CounterRateStat
}
@@ -39,10 +39,20 @@ type DemuxerReadFrameErrorHandler func(d *Demuxer, ret int) (stop, handled bool)
type demuxerStream struct {
ctx Context
d Descriptor
e *demuxerRateEmulator
s *avformat.Stream
}
func (d *demuxerStream) stream() *Stream {
return &Stream{
CodecParameters: d.s.CodecParameters(),
Ctx: d.ctx,
ID: d.s.Id(),
Index: d.s.Index(),
}
}
// DemuxerOptions represents demuxer options
type DemuxerOptions struct {
// String content of the demuxer as you would use in ffmpeg
@@ -79,7 +89,6 @@ func NewDemuxer(o DemuxerOptions, eh *astiencoder.EventHandler, c *astikit.Close
d = &Demuxer{
eh: eh,
emulateRate: o.EmulateRate,
loop: o.Loop,
p: newPktPool(c),
readFrameErrorHandler: o.ReadFrameErrorHandler,
ss: make(map[int]*demuxerStream),
@@ -95,11 +104,6 @@ func NewDemuxer(o DemuxerOptions, eh *astiencoder.EventHandler, c *astikit.Close
// Add stats
d.addStats()
// If loop is enabled, we need to add a restamper
if d.loop {
d.restamper = NewPktRestamperWithPktDuration()
}
// Dict
var dict *avutil.Dictionary
if o.Dict != nil {
@@ -171,20 +175,26 @@ func NewDemuxer(o DemuxerOptions, eh *astiencoder.EventHandler, c *astikit.Close
return
}
// Index streams
// Loop through streams
for _, s := range d.ctxFormat.Streams() {
// Create stream
// Create demuxer stream
ds := &demuxerStream{
ctx: NewContextFromStream(s),
s: s,
}
ds.d = ds.ctx.Descriptor()
// Create rate emulator
ds.e = newDemuxerRateEmulator(o.EmulateRateBufferDuration, d.d, d.eh, d.p, s, ds.ctx)
ds.e = newDemuxerRateEmulator(o.EmulateRateBufferDuration, d.d, d.eh, d.p, ds)
// Store
// Store stream
d.ss[s.Index()] = ds
}
// Create looper
if o.Loop {
d.l = newDemuxerLooper(d.ss)
}
return
}
@@ -205,9 +215,22 @@ func (d *Demuxer) addStats() {
d.BaseNode.AddStats(ss...)
}
// CtxFormat returns the format ctx
func (d *Demuxer) CtxFormat() *avformat.Context {
return d.ctxFormat
// Streams returns the streams ordered by index
func (d *Demuxer) Streams() (ss []*Stream) {
// Get indexes
var idxs []int
for idx := range d.ss {
idxs = append(idxs, idx)
}
// Sort indexes
sort.Ints(idxs)
// Loop through indexes
for _, idx := range idxs {
ss = append(ss, d.ss[idx].stream())
}
return
}
// Connect implements the PktHandlerConnector interface
@@ -229,7 +252,7 @@ func (d *Demuxer) Disconnect(h PktHandler) {
}
// ConnectForStream connects the demuxer to a PktHandler for a specific stream
func (d *Demuxer) ConnectForStream(h PktHandler, i *avformat.Stream) {
func (d *Demuxer) ConnectForStream(h PktHandler, i *Stream) {
// Add handler
d.d.addHandler(newPktCond(i, h))
@@ -238,7 +261,7 @@ func (d *Demuxer) ConnectForStream(h PktHandler, i *avformat.Stream) {
}
// DisconnectForStream disconnects the demuxer from a PktHandler for a specific stream
func (d *Demuxer) DisconnectForStream(h PktHandler, i *avformat.Stream) {
func (d *Demuxer) DisconnectForStream(h PktHandler, i *Stream) {
// Delete handler
d.d.delHandler(newPktCond(i, h))
@@ -294,6 +317,11 @@ func (d *Demuxer) Start(ctx context.Context, t astiencoder.CreateTaskFunc) {
// Wait for rate emulators
wg.Wait()
// Reset looper
if d.l != nil {
d.l.reset()
}
})
}
@@ -304,7 +332,24 @@ func (d *Demuxer) readFrame(ctx context.Context) (stop bool) {
// Read frame
if ret := d.ctxFormat.AvReadFrame(pkt); ret < 0 {
if d.loop && ret == avutil.AVERROR_EOF {
if d.l != nil && ret == avutil.AVERROR_EOF {
// Let the looper know we're looping
d.l.looping(func(delta int64, idx int) {
// Not emulating rate
if !d.emulateRate {
return
}
// Get stream
s, ok := d.ss[idx]
if !ok {
return
}
// Add duration to rate emulator
s.e.add(delta)
})
// Seek to start
if ret = d.ctxFormat.AvSeekFrame(-1, d.ctxFormat.StartTime(), avformat.AVSEEK_FLAG_BACKWARD); ret < 0 {
emitAvError(d, d.eh, ret, "ctxFormat.AvSeekFrame on %s failed", d.ctxFormat.Filename())
@@ -337,22 +382,117 @@ func (d *Demuxer) readFrame(ctx context.Context) (stop bool) {
return
}
// Restamp
if d.restamper != nil {
d.restamper.Restamp(pkt)
// Handle loop
if d.l != nil {
d.l.handlePkt(pkt)
}
// Emulate rate
if d.emulateRate {
// Add to rate emulator
s.e.add(ctx, pkt)
// Handle rate emulation
s.e.handlePkt(ctx, pkt)
} else {
// Dispatch pkt
d.d.dispatch(pkt, s.s)
d.d.dispatch(pkt, s.d)
}
return
}
type demuxerLooper struct {
r *PktRestamperWithPktDuration
ss map[int]*demuxerLooperStream // Indexed by stream index
}
type demuxerLooperStream struct {
delta time.Duration
duration time.Duration
s *demuxerStream
}
func newDemuxerLooperStream(s *demuxerStream) *demuxerLooperStream {
return &demuxerLooperStream{s: s}
}
func newDemuxerLooper(ss map[int]*demuxerStream) (l *demuxerLooper) {
// Create looper
l = &demuxerLooper{
r: NewPktRestamperWithPktDuration(),
ss: make(map[int]*demuxerLooperStream),
}
// Loop through streams
for idx, s := range ss {
l.ss[idx] = newDemuxerLooperStream(s)
}
return
}
func (l *demuxerLooper) reset() {
for _, s := range l.ss {
s.delta = 0
s.duration = 0
}
}
func (l *demuxerLooper) handlePkt(pkt *avcodec.Packet) {
// Get stream
s, ok := l.ss[pkt.StreamIndex()]
if !ok {
return
}
// Increment duration
s.duration += time.Duration(avutil.AvRescaleQ(pkt.Duration(), s.s.ctx.TimeBase, nanosecondRational))
// Restamp
l.r.Restamp(pkt)
}
func (l *demuxerLooper) looping(fn func(delta int64, idx int)) {
// Get max duration
var maxDuration time.Duration
for _, s := range l.ss {
if s.duration > maxDuration {
maxDuration = s.duration
}
}
// Loop through streams
for idx, s := range l.ss {
// Get delta duration
exactDeltaDuration := maxDuration - s.duration + s.delta
// Reset duration
s.duration = 0
// No delta
if exactDeltaDuration <= 0 {
continue
}
// Get delta expressed in stream timebase
roundedDeltaInt := avutil.AvRescaleQ(exactDeltaDuration.Nanoseconds(), nanosecondRational, s.s.ctx.TimeBase)
// AvRescaleQ rounds to the nearest int, we need to make sure it's rounded to the nearest smaller int
roundedDeltaDuration := time.Duration(avutil.AvRescaleQ(roundedDeltaInt, s.s.ctx.TimeBase, nanosecondRational))
if roundedDeltaDuration > exactDeltaDuration {
roundedDeltaInt--
}
// Use delta
if roundedDeltaInt > 0 {
// Add delta to restamper
l.r.Add(roundedDeltaInt, idx)
// Custom
fn(roundedDeltaInt, idx)
}
// Update delta
s.delta = exactDeltaDuration - time.Duration(avutil.AvRescaleQ(roundedDeltaInt, s.s.ctx.TimeBase, nanosecondRational))
}
}
type demuxerRateEmulator struct {
bufferDuration time.Duration
c *astikit.Chan
@@ -364,11 +504,10 @@ type demuxerRateEmulator struct {
nextAt time.Time
p *pktPool
ps map[*avcodec.Packet]bool
s *avformat.Stream
sc Context
s *demuxerStream
}
func newDemuxerRateEmulator(bufferDuration time.Duration, d *pktDispatcher, eh *astiencoder.EventHandler, p *pktPool, s *avformat.Stream, sc Context) (e *demuxerRateEmulator) {
func newDemuxerRateEmulator(bufferDuration time.Duration, d *pktDispatcher, eh *astiencoder.EventHandler, p *pktPool, s *demuxerStream) (e *demuxerRateEmulator) {
e = &demuxerRateEmulator{
bufferDuration: bufferDuration,
c: astikit.NewChan(astikit.ChanOptions{}),
@@ -378,7 +517,6 @@ func newDemuxerRateEmulator(bufferDuration time.Duration, d *pktDispatcher, eh *
p: p,
ps: make(map[*avcodec.Packet]bool),
s: s,
sc: sc,
}
if e.bufferDuration <= 0 {
e.bufferDuration = time.Second
@@ -416,7 +554,7 @@ func (e *demuxerRateEmulator) stop() {
e.c.Stop()
}
func (e *demuxerRateEmulator) add(ctx context.Context, in *avcodec.Packet) {
func (e *demuxerRateEmulator) handlePkt(ctx context.Context, in *avcodec.Packet) {
// Copy pkt
pkt := e.p.get()
if ret := pkt.AvPacketRef(in); ret < 0 {
@@ -436,7 +574,7 @@ func (e *demuxerRateEmulator) add(ctx context.Context, in *avcodec.Packet) {
}
// Compute next at
e.nextAt = pktAt.Add(time.Duration(avutil.AvRescaleQ(pktDuration(pkt, e.sc), e.s.TimeBase(), nanosecondRational)))
e.nextAt = pktAt.Add(time.Duration(avutil.AvRescaleQ(pktDuration(pkt, e.s.ctx), e.s.ctx.TimeBase, nanosecondRational)))
// Add to chan
e.c.Add(func() {
@@ -457,7 +595,7 @@ func (e *demuxerRateEmulator) add(ctx context.Context, in *avcodec.Packet) {
}
// Dispatch
e.d.dispatch(pkt, e.s)
e.d.dispatch(pkt, e.s.d)
})
// Too many pkts are buffered, demuxer needs to wait
@@ -466,3 +604,8 @@ func (e *demuxerRateEmulator) add(ctx context.Context, in *avcodec.Packet) {
astikit.Sleep(ctx, delta)
}
}
func (e *demuxerRateEmulator) add(delta int64) {
// Compute next at
e.nextAt = e.nextAt.Add(time.Duration(avutil.AvRescaleQ(delta, e.s.ctx.TimeBase, nanosecondRational)))
}

View File

@@ -7,7 +7,6 @@ import (
"github.com/asticode/go-astiencoder"
"github.com/asticode/go-astikit"
"github.com/asticode/goav/avcodec"
"github.com/asticode/goav/avformat"
"github.com/asticode/goav/avutil"
)
@@ -114,10 +113,10 @@ type PktCond interface {
type pktCond struct {
PktHandler
i *avformat.Stream
i *Stream
}
func newPktCond(i *avformat.Stream, h PktHandler) *pktCond {
func newPktCond(i *Stream, h PktHandler) *pktCond {
return &pktCond{
i: i,
PktHandler: h,
@@ -127,13 +126,13 @@ func newPktCond(i *avformat.Stream, h PktHandler) *pktCond {
// Metadata implements the NodeDescriptor interface
func (c *pktCond) Metadata() astiencoder.NodeMetadata {
m := c.PktHandler.Metadata()
m.Name = fmt.Sprintf("%s_%d", c.PktHandler.Metadata().Name, c.i.Index())
m.Name = fmt.Sprintf("%s_%d", c.PktHandler.Metadata().Name, c.i.Index)
return m
}
// UsePkt implements the PktCond interface
func (c *pktCond) UsePkt(pkt *avcodec.Packet) bool {
return pkt.StreamIndex() == c.i.Index()
return pkt.StreamIndex() == c.i.Index
}
type pktPool struct {

View File

@@ -40,23 +40,23 @@ func (r *pktRestamperWithOffset) restamp(pkt *avcodec.Packet, fn func(pkt *avcod
pkt.SetPts(dts + delta)
}
type pktRestamperStartFromZero struct {
type PktRestamperStartFromZero struct {
*pktRestamperWithOffset
}
// NewPktRestamperStartFromZero creates a new pkt restamper that starts timestamps from 0
func NewPktRestamperStartFromZero() PktRestamper {
return &pktRestamperStartFromZero{pktRestamperWithOffset: newPktRestamperWithOffset()}
func NewPktRestamperStartFromZero() *PktRestamperStartFromZero {
return &PktRestamperStartFromZero{pktRestamperWithOffset: newPktRestamperWithOffset()}
}
// Restamp implements the Restamper interface
func (r *pktRestamperStartFromZero) Restamp(pkt *avcodec.Packet) {
func (r *PktRestamperStartFromZero) Restamp(pkt *avcodec.Packet) {
r.restamp(pkt, func(pkt *avcodec.Packet) int64 {
return -pkt.Dts()
})
}
type pktRestamperWithPktDuration struct {
type PktRestamperWithPktDuration struct {
lastItem map[int]*pktRestamperWithPktDurationItem
m *sync.Mutex
}
@@ -68,15 +68,24 @@ type pktRestamperWithPktDurationItem struct {
// NewPktRestamperWithPktDuration creates a new pkt restamper that starts timestamps from 0 and increments them
// of the previous pkt.Duration()
func NewPktRestamperWithPktDuration() PktRestamper {
return &pktRestamperWithPktDuration{
func NewPktRestamperWithPktDuration() *PktRestamperWithPktDuration {
return &PktRestamperWithPktDuration{
lastItem: make(map[int]*pktRestamperWithPktDurationItem),
m: &sync.Mutex{},
}
}
// Add adds delta to the last item duration of the specified stream index
func (r *PktRestamperWithPktDuration) Add(delta int64, idx int) {
r.m.Lock()
defer r.m.Unlock()
if i, ok := r.lastItem[idx]; ok && i != nil {
i.duration += delta
}
}
// Restamp implements the FrameRestamper interface
func (r *pktRestamperWithPktDuration) Restamp(pkt *avcodec.Packet) {
func (r *PktRestamperWithPktDuration) Restamp(pkt *avcodec.Packet) {
// Get last item
r.m.Lock()
lastItem := r.lastItem[pkt.StreamIndex()]

View File

@@ -73,7 +73,7 @@ func NewRateEnforcer(o RateEnforcerOptions, eh *astiencoder.EventHandler, c *ast
r = &RateEnforcer{
adaptSlotsToIncomingFrames: o.AdaptSlotsToIncomingFrames,
c: astikit.NewChan(astikit.ChanOptions{ProcessAll: true}),
descriptor: NewDescriptor(o.OutputCtx.TimeBase),
descriptor: o.OutputCtx.Descriptor(),
eh: eh,
f: o.Filler,
m: &sync.Mutex{},

View File

@@ -7,19 +7,26 @@ import (
"github.com/asticode/goav/avformat"
)
type Stream struct {
CodecParameters *avcodec.CodecParameters
Ctx Context
ID int
Index int
}
// AddStream adds a stream to the format ctx
func AddStream(ctxFormat *avformat.Context) *avformat.Stream {
return ctxFormat.AvformatNewStream(nil)
}
// CloneStream clones a stream and add it to the format ctx
func CloneStream(i *avformat.Stream, ctxFormat *avformat.Context) (o *avformat.Stream, err error) {
func CloneStream(i *Stream, ctxFormat *avformat.Context) (o *avformat.Stream, err error) {
// Add stream
o = AddStream(ctxFormat)
// Copy codec parameters
if ret := avcodec.AvcodecParametersCopy(o.CodecParameters(), i.CodecParameters()); ret < 0 {
err = fmt.Errorf("astilibav: avcodec.AvcodecParametersCopy from %+v to %+v failed: %w", i.CodecParameters(), o.CodecParameters(), NewAvError(ret))
if ret := avcodec.AvcodecParametersCopy(o.CodecParameters(), i.CodecParameters); ret < 0 {
err = fmt.Errorf("astilibav: avcodec.AvcodecParametersCopy from %+v to %+v failed: %w", i.CodecParameters, o.CodecParameters(), NewAvError(ret))
return
}