add draft for rtmp

This commit is contained in:
Leandro Moreira
2024-05-12 15:05:35 -03:00
parent 4cb98061df
commit 199c23e9fd
23 changed files with 420 additions and 98 deletions

View File

@@ -50,7 +50,10 @@ func NewLibAVFFmpegStreamer(p LibAVFFmpegStreamerParams) ResultLibAVFFmpegStream
}
func (c *LibAVFFmpegStreamer) Match(req *entities.RequestParams) bool {
return req.SRTHost != ""
isRTMP := strings.Contains(strings.ToLower(req.StreamURL), "rtmp")
isSRT := strings.Contains(strings.ToLower(req.StreamURL), "srt")
return isRTMP || isSRT
}
type streamContext struct {
@@ -78,7 +81,7 @@ type libAVParams struct {
}
func (c *LibAVFFmpegStreamer) Stream(donut *entities.DonutParameters) {
c.l.Infow("streaming has started")
c.l.Infof("streaming has started for %#v", donut)
closer := astikit.NewCloser()
defer closer.Close()
@@ -89,20 +92,26 @@ func (c *LibAVFFmpegStreamer) Stream(donut *entities.DonutParameters) {
// it's useful for debugging
astiav.SetLogLevel(astiav.LogLevelDebug)
astiav.SetLogCallback(func(l astiav.LogLevel, fmt, msg, parent string) {
astiav.SetLogCallback(func(_ astiav.Classer, l astiav.LogLevel, fmt, msg string) {
c.l.Infof("ffmpeg %s: - %s", c.libAVLogToString(l), strings.TrimSpace(msg))
})
// 138.1 internal/controllers/streamers/libav_ffmpeg.go:95:24:
// cannot use func(l astiav.LogLevel, fmt, msg, parent string) {…}
// (value of type func(l astiav.LogLevel, fmt string, msg string, parent string)) as astiav.LogCallback value in argument to astiav.SetLogCallback
c.l.Infof("preparing input")
if err := c.prepareInput(p, closer, donut); err != nil {
c.onError(err, donut)
return
}
c.l.Infof("preparing output")
if err := c.prepareOutput(p, closer, donut); err != nil {
c.onError(err, donut)
return
}
c.l.Infof("preparing filters")
if err := c.prepareFilters(p, closer, donut); err != nil {
c.onError(err, donut)
return
@@ -121,6 +130,7 @@ func (c *LibAVFFmpegStreamer) Stream(donut *entities.DonutParameters) {
c.onError(donut.Ctx.Err(), donut)
return
default:
c.l.Infof("started reading frame")
if err := p.inputFormatContext.ReadFrame(inPkt); err != nil {
if errors.Is(err, astiav.ErrEof) {
c.l.Info("streaming has ended")
@@ -141,6 +151,63 @@ func (c *LibAVFFmpegStreamer) Stream(donut *entities.DonutParameters) {
isVideoBypass := donut.Recipe.Video.Action == entities.DonutBypass
if isVideo && isVideoBypass {
if donut.OnVideoFrame != nil {
// The SRT(mpegts[h264]) bitstream format is Annex B 0x0, 0x0, 0x0, 0x1 [Start Code]
// [start code]--[NAL]--[start code]--[NAL] etc
//
// The RTMP(flv[h264]) bitstream format is AVCC (mp4) 0xY, 0xZ, 0xK, 0xW [Length]
// [SIZE (4 bytes)]--[NAL]--[SIZE (4 bytes)]--[NAL] etc
//
// ref: https://stackoverflow.com/questions/28421375/usage-of-start-code-for-h264-video/29103276#29103276
//
// To convert from AVCC to AnnexB:
//
// Remove length, insert start code, insert SPS for each I-frame, insert PPS for each frame, insert AU delimiter for each GOP.
//
// https://ffmpeg.org/doxygen/trunk/h264__mp4toannexb__bsf_8c.html#a773e34981d7642d499348d1ae72fd02e
// av_bsf_send_packet(bsfContext, pkt)
// av_bsf_receive_packet(bsfContext, pkt)
// for {
// c.l.Infof("start receiving packet")
// if err := s.decCodecContext.ReceiveFrame(s.decFrame); err != nil {
// if errors.Is(err, astiav.ErrEof) || errors.Is(err, astiav.ErrEagain) {
// break
// }
// c.onError(err, donut)
// return
// }
// c.l.Infof("start filtering")
// if err := c.filterAndEncode(s.decFrame, s, donut); err != nil {
// c.onError(err, donut)
// return
// }
// }
bistreamFilter := astiav.FindBitStreamFilterByName("h264_mp4toannexb")
if bistreamFilter == nil {
c.l.Info("cannot find bit stream filter")
return
}
bsfCtx, err := astiav.AllocBitStreamContext(bistreamFilter)
if err != nil {
c.l.Info("error while AllocBitStreamContext", err)
return
}
if err := bsfCtx.Init(); err != nil {
c.l.Info("error while init", err)
return
}
if err := bsfCtx.SendPacket(inPkt); err != nil {
c.l.Info("error while SendPacket", err)
return
}
if bsfCtx.ReceivePacket(inPkt) != nil {
c.l.Info("error while ReceivePacket", err)
return
}
if err := donut.OnVideoFrame(inPkt.Data(), entities.MediaFrameContext{
PTS: int(inPkt.Pts()),
DTS: int(inPkt.Dts()),
@@ -169,12 +236,19 @@ func (c *LibAVFFmpegStreamer) Stream(donut *entities.DonutParameters) {
continue
}
if isAudio {
continue
}
c.l.Infof("start sending packet")
// c.processPacket(inPkt, s, donut)
if err := s.decCodecContext.SendPacket(inPkt); err != nil {
c.onError(err, donut)
return
}
for {
c.l.Infof("start receiving packet")
if err := s.decCodecContext.ReceiveFrame(s.decFrame); err != nil {
if errors.Is(err, astiav.ErrEof) || errors.Is(err, astiav.ErrEagain) {
break
@@ -182,7 +256,7 @@ func (c *LibAVFFmpegStreamer) Stream(donut *entities.DonutParameters) {
c.onError(err, donut)
return
}
c.l.Infof("start filtering")
if err := c.filterAndEncode(s.decFrame, s, donut); err != nil {
c.onError(err, donut)
return
@@ -414,8 +488,6 @@ func (c *LibAVFFmpegStreamer) prepareFilters(p *libAVParams, closer *astikit.Clo
closer.Add(inputs.Free)
if s.decCodecContext.MediaType() == astiav.MediaTypeAudio {
// TODO: what's the difference between args and content?
// why args are necessary?
args = astiav.FilterArgs{
"channel_layout": s.decCodecContext.ChannelLayout().String(),
"sample_fmt": s.decCodecContext.SampleFormat().Name(),
@@ -483,6 +555,29 @@ func (c *LibAVFFmpegStreamer) prepareFilters(p *libAVParams, closer *astikit.Clo
return nil
}
func (c *LibAVFFmpegStreamer) processPacket(pkt *astiav.Packet, s *streamContext, donut *entities.DonutParameters) {
if err := s.decCodecContext.SendPacket(pkt); err != nil {
c.onError(err, donut)
return
}
for {
c.l.Infof("start receiving packet")
if err := s.decCodecContext.ReceiveFrame(s.decFrame); err != nil {
if errors.Is(err, astiav.ErrEof) || errors.Is(err, astiav.ErrEagain) {
break
}
c.onError(err, donut)
return
}
c.l.Infof("start filtering")
if err := c.filterAndEncode(s.decFrame, s, donut); err != nil {
c.onError(err, donut)
return
}
}
}
func (c *LibAVFFmpegStreamer) filterAndEncode(f *astiav.Frame, s *streamContext, donut *entities.DonutParameters) (err error) {
if err = s.buffersrcContext.BuffersrcAddFrame(f, astiav.NewBuffersrcFlags(astiav.BuffersrcFlagKeepRef)); err != nil {
return fmt.Errorf("adding frame failed: %w", err)
@@ -499,7 +594,7 @@ func (c *LibAVFFmpegStreamer) filterAndEncode(f *astiav.Frame, s *streamContext,
}
// TODO: should we avoid setting the picture type for audio?
s.filterFrame.SetPictureType(astiav.PictureTypeNone)
c.l.Infof("start encoding")
if err = c.encodeFrame(s.filterFrame, s, donut); err != nil {
err = fmt.Errorf("main: encoding and writing frame failed: %w", err)
return
@@ -520,6 +615,7 @@ func (c *LibAVFFmpegStreamer) encodeFrame(f *astiav.Frame, s *streamContext, don
}
for {
c.l.Infof("start receiving packet")
if err = s.encCodecContext.ReceivePacket(s.encPkt); err != nil {
if errors.Is(err, astiav.ErrEof) || errors.Is(err, astiav.ErrEagain) {
err = nil
@@ -535,6 +631,7 @@ func (c *LibAVFFmpegStreamer) encodeFrame(f *astiav.Frame, s *streamContext, don
isVideo := s.decCodecContext.MediaType() == astiav.MediaTypeVideo
if isVideo {
if donut.OnVideoFrame != nil {
c.l.Infof("sending transcoded video")
if err := donut.OnVideoFrame(s.encPkt.Data(), entities.MediaFrameContext{
PTS: int(s.encPkt.Pts()),
DTS: int(s.encPkt.Dts()),
@@ -548,6 +645,7 @@ func (c *LibAVFFmpegStreamer) encodeFrame(f *astiav.Frame, s *streamContext, don
isAudio := s.decCodecContext.MediaType() == astiav.MediaTypeAudio
if isAudio {
if donut.OnAudioFrame != nil {
c.l.Infof("sending transcoded audio")
if err := donut.OnAudioFrame(s.encPkt.Data(), entities.MediaFrameContext{
PTS: int(s.encPkt.Pts()),
DTS: int(s.encPkt.Dts()),