add proper packet handling for bit stream filter

This commit is contained in:
Leandro Moreira
2024-05-15 22:42:15 -03:00
parent 60bff6779f
commit 8e4306c0f7

View File

@@ -150,77 +150,18 @@ func (c *LibAVFFmpegStreamer) Stream(donut *entities.DonutParameters) {
continue
}
isVideo := s.decCodecContext.MediaType() == astiav.MediaTypeVideo
isAudio := s.decCodecContext.MediaType() == astiav.MediaTypeAudio
var currentMedia *entities.DonutMediaTask
if isAudio {
currentMedia = &donut.Recipe.Audio
} else if isVideo {
currentMedia = &donut.Recipe.Video
if s.bsfContext != nil {
if err := c.applyBitStreamFilter(inPkt, s, closer, donut); err != nil {
c.onError(err, donut)
return
}
} else {
c.l.Warnf("ignoring to stream for media type %s", s.decCodecContext.MediaType().String())
continue
}
if currentMedia.DonutBitStreamFilter != nil {
if err := c.applyBitStreamFilter(inPkt, s, closer, currentMedia.DonutBitStreamFilter); err != nil {
c.onError(err, donut)
return
}
}
inPkt.RescaleTs(s.inputStream.TimeBase(), s.decCodecContext.TimeBase())
byPass := currentMedia.Action == entities.DonutBypass
if isVideo && byPass {
if donut.OnVideoFrame != nil {
if err := donut.OnVideoFrame(inPkt.Data(), entities.MediaFrameContext{
PTS: int(inPkt.Pts()),
DTS: int(inPkt.Dts()),
Duration: c.defineVideoDuration(s, inPkt),
}); err != nil {
c.onError(err, donut)
return
}
}
continue
}
if isAudio && byPass {
if donut.OnAudioFrame != nil {
if err := donut.OnAudioFrame(inPkt.Data(), entities.MediaFrameContext{
PTS: int(inPkt.Pts()),
DTS: int(inPkt.Dts()),
Duration: c.defineAudioDuration(s, inPkt),
}); err != nil {
c.onError(err, donut)
return
}
}
continue
}
// if isAudio {
// continue
// }
if err := s.decCodecContext.SendPacket(inPkt); err != nil {
c.onError(err, donut)
return
}
for {
if err := s.decCodecContext.ReceiveFrame(s.decFrame); err != nil {
if errors.Is(err, astiav.ErrEof) || errors.Is(err, astiav.ErrEagain) {
break
}
c.onError(err, donut)
return
}
if err := c.filterAndEncode(s.decFrame, s, donut); err != nil {
if err := c.processPacket(inPkt, s, donut); err != nil {
c.onError(err, donut)
return
}
}
inPkt.Unref()
}
}
}
@@ -558,12 +499,89 @@ func (c *LibAVFFmpegStreamer) prepareBitStreamFilters(p *libAVParams, closer *as
return nil
}
func (c *LibAVFFmpegStreamer) applyBitStreamFilter(p *astiav.Packet, s *streamContext, closer *astikit.Closer, filter *entities.DonutBitStreamFilter) error {
if err := s.bsfContext.SendPacket(p); err != nil {
return fmt.Errorf("error while sending the packet %w", err)
func (c *LibAVFFmpegStreamer) processPacket(pkt *astiav.Packet, s *streamContext, donut *entities.DonutParameters) error {
isVideo := s.decCodecContext.MediaType() == astiav.MediaTypeVideo
isAudio := s.decCodecContext.MediaType() == astiav.MediaTypeAudio
var currentMedia *entities.DonutMediaTask
if isAudio {
currentMedia = &donut.Recipe.Audio
} else if isVideo {
currentMedia = &donut.Recipe.Video
} else {
c.l.Warnf("ignoring to stream for media type %s", s.decCodecContext.MediaType().String())
return nil
}
if err := s.bsfContext.ReceivePacket(p); err != nil {
return fmt.Errorf("error while receiving the packet %w", err)
pkt.RescaleTs(s.inputStream.TimeBase(), s.decCodecContext.TimeBase())
byPass := currentMedia.Action == entities.DonutBypass
if isVideo && byPass {
if donut.OnVideoFrame != nil {
if err := donut.OnVideoFrame(pkt.Data(), entities.MediaFrameContext{
PTS: int(pkt.Pts()),
DTS: int(pkt.Dts()),
Duration: c.defineVideoDuration(s, pkt),
}); err != nil {
return err
}
}
return nil
}
if isAudio && byPass {
if donut.OnAudioFrame != nil {
if err := donut.OnAudioFrame(pkt.Data(), entities.MediaFrameContext{
PTS: int(pkt.Pts()),
DTS: int(pkt.Dts()),
Duration: c.defineAudioDuration(s, pkt),
}); err != nil {
return err
}
}
return nil
}
// if isAudio {
// continue
// }
if err := s.decCodecContext.SendPacket(pkt); err != nil {
return err
}
for {
if err := s.decCodecContext.ReceiveFrame(s.decFrame); err != nil {
if errors.Is(err, astiav.ErrEof) || errors.Is(err, astiav.ErrEagain) {
break
}
return err
}
if err := c.filterAndEncode(s.decFrame, s, donut); err != nil {
return err
}
}
return nil
}
func (c *LibAVFFmpegStreamer) applyBitStreamFilter(pkt *astiav.Packet, s *streamContext, closer *astikit.Closer, donut *entities.DonutParameters) error {
bitStreamPkt := astiav.AllocPacket()
closer.Add(bitStreamPkt.Free)
// TODO: enable bsf per stream
if err := s.bsfContext.SendPacket(pkt); err != nil && !errors.Is(err, astiav.ErrEagain) {
return fmt.Errorf("sending bit stream packet failed: %w", err)
}
for {
if err := s.bsfContext.ReceivePacket(bitStreamPkt); err != nil {
if errors.Is(err, astiav.ErrEof) || errors.Is(err, astiav.ErrEagain) {
break
}
return fmt.Errorf("receiving bit stream packet failed: %w", err)
}
c.processPacket(bitStreamPkt, s, donut)
bitStreamPkt.Unref()
}
return nil
}