This commit is contained in:
Leandro Moreira
2024-05-12 22:57:52 -03:00
parent 06d749caff
commit c131f10fd4
13 changed files with 76 additions and 229 deletions

View File

@@ -10,9 +10,6 @@ run-dev-total-rebuild:
clean-docker:
docker-compose down -v --rmi all --remove-orphans && docker volume prune -a -f && docker system prune -a -f && docker builder prune -a -f
run-srt:
docker compose stop && docker compose down && docker compose build srt && docker compose up srt
lint:
docker compose stop lint && docker compose down lint && docker compose run --rm lint

View File

@@ -37,12 +37,12 @@ func NewDonutEngineController(p DonutEngineParams) *DonutEngineController {
func (c *DonutEngineController) EngineFor(req *entities.RequestParams) (DonutEngine, error) {
prober := c.selectProberFor(req)
if prober == nil {
return nil, fmt.Errorf("request %v: not fulfilled error %w", req, entities.ErrMissingProber)
return nil, fmt.Errorf("request %v: not fulfilled. error %w", req, entities.ErrMissingProber)
}
streamer := c.selectStreamerFor(req)
if streamer == nil {
return nil, fmt.Errorf("request %v: not fulfilled error %w", req, entities.ErrMissingStreamer)
return nil, fmt.Errorf("request %v: not fulfilled. error %w", req, entities.ErrMissingStreamer)
}
return &donutEngine{
@@ -114,9 +114,9 @@ func (d *donutEngine) RecipeFor(server, client *entities.StreamInfo) (*entities.
r := &entities.DonutRecipe{
Input: appetizer,
Video: entities.DonutMediaTask{
Action: entities.DonutBypass,
// Action: entities.DonutTranscode,
Codec: entities.H264,
Action: entities.DonutBypass,
Codec: entities.H264,
DonutBitStreamFilter: &entities.DonutH264AnnexB,
},
Audio: entities.DonutMediaTask{
Action: entities.DonutTranscode,
@@ -137,15 +137,20 @@ func (d *donutEngine) RecipeFor(server, client *entities.StreamInfo) (*entities.
}
func (d *donutEngine) Appetizer() (entities.DonutAppetizer, error) {
if strings.Contains(strings.ToLower(d.req.StreamURL), "rtmp") {
isRTMP := strings.Contains(strings.ToLower(d.req.StreamURL), "rtmp")
isSRT := strings.Contains(strings.ToLower(d.req.StreamURL), "srt")
if isRTMP {
return entities.DonutAppetizer{
URL: fmt.Sprintf("%s/%s", d.req.StreamURL, d.req.StreamID),
Options: map[entities.DonutInputOptionKey]string{
entities.DonutRTMPLive: "live",
},
// Format: "flv",
Format: "flv",
}, nil
} else if strings.Contains(strings.ToLower(d.req.StreamURL), "srt") {
}
if isSRT {
return entities.DonutAppetizer{
URL: d.req.StreamURL,
Format: "mpegts", // TODO: check how to get format for srt

View File

@@ -95,9 +95,6 @@ func (c *LibAVFFmpegStreamer) Stream(donut *entities.DonutParameters) {
astiav.SetLogCallback(func(_ astiav.Classer, l astiav.LogLevel, fmt, msg string) {
c.l.Infof("ffmpeg %s: - %s", c.libAVLogToString(l), strings.TrimSpace(msg))
})
// 138.1 internal/controllers/streamers/libav_ffmpeg.go:95:24:
// cannot use func(l astiav.LogLevel, fmt, msg, parent string) {…}
// (value of type func(l astiav.LogLevel, fmt string, msg string, parent string)) as astiav.LogCallback value in argument to astiav.SetLogCallback
c.l.Infof("preparing input")
if err := c.prepareInput(p, closer, donut); err != nil {
@@ -130,7 +127,6 @@ func (c *LibAVFFmpegStreamer) Stream(donut *entities.DonutParameters) {
c.onError(donut.Ctx.Err(), donut)
return
default:
c.l.Infof("started reading frame")
if err := p.inputFormatContext.ReadFrame(inPkt); err != nil {
if errors.Is(err, astiav.ErrEof) {
c.l.Info("streaming has ended")
@@ -145,77 +141,22 @@ func (c *LibAVFFmpegStreamer) Stream(donut *entities.DonutParameters) {
continue
}
isVideo := s.decCodecContext.MediaType() == astiav.MediaTypeVideo
isAudio := s.decCodecContext.MediaType() == astiav.MediaTypeAudio
if isVideo && donut.Recipe.Video.DonutBitStreamFilter != nil {
c.applyBitStreamFilter(inPkt, s, donut.Recipe.Video.DonutBitStreamFilter)
}
if isAudio && donut.Recipe.Audio.DonutBitStreamFilter != nil {
c.applyBitStreamFilter(inPkt, s, donut.Recipe.Audio.DonutBitStreamFilter)
}
inPkt.RescaleTs(s.inputStream.TimeBase(), s.decCodecContext.TimeBase())
isVideo := s.decCodecContext.MediaType() == astiav.MediaTypeVideo
isVideoBypass := donut.Recipe.Video.Action == entities.DonutBypass
if isVideo && isVideoBypass {
if donut.OnVideoFrame != nil {
// The SRT(mpegts[h264]) bitstream format is Annex B 0x0, 0x0, 0x0, 0x1 [Start Code]
// [start code]--[NAL]--[start code]--[NAL] etc
//
// The RTMP(flv[h264]) bitstream format is AVCC (mp4) 0xY, 0xZ, 0xK, 0xW [Length]
// [SIZE (4 bytes)]--[NAL]--[SIZE (4 bytes)]--[NAL] etc
//
// ref: https://stackoverflow.com/questions/28421375/usage-of-start-code-for-h264-video/29103276#29103276
//
// To convert from AVCC to AnnexB:
//
// Remove length, insert start code, insert SPS for each I-frame, insert PPS for each frame, insert AU delimiter for each GOP.
//
// https://ffmpeg.org/doxygen/trunk/h264__mp4toannexb__bsf_8c.html#a773e34981d7642d499348d1ae72fd02e
// av_bsf_send_packet(bsfContext, pkt)
// av_bsf_receive_packet(bsfContext, pkt)
// for {
// c.l.Infof("start receiving packet")
// if err := s.decCodecContext.ReceiveFrame(s.decFrame); err != nil {
// if errors.Is(err, astiav.ErrEof) || errors.Is(err, astiav.ErrEagain) {
// break
// }
// c.onError(err, donut)
// return
// }
// c.l.Infof("start filtering")
// if err := c.filterAndEncode(s.decFrame, s, donut); err != nil {
// c.onError(err, donut)
// return
// }
// }
// https://github.com/FFmpeg/FFmpeg/blob/9c6c4f3d476d7a8d423ec3b954254c6a67ebc792/libavformat/mux.c#L1351
bistreamFilter := astiav.FindBitStreamFilterByName("h264_mp4toannexb")
if bistreamFilter == nil {
c.l.Info("cannot find bit stream filter")
return
}
bsfCtx, err := astiav.AllocBitStreamContext(bistreamFilter)
if err != nil {
c.l.Info("error while AllocBitStreamContext", err)
return
}
bsfCtx.SetTimeBaseIn(s.inputStream.TimeBase())
if err := s.inputStream.CodecParameters().Copy(bsfCtx.CodecParametersIn()); err != nil {
c.l.Info("error copying codec parameter", err)
return
}
if err := bsfCtx.Init(); err != nil {
c.l.Info("error while init", err)
return
}
if err := bsfCtx.SendPacket(inPkt); err != nil {
c.l.Info("error while SendPacket", err)
return
}
if bsfCtx.ReceivePacket(inPkt) != nil {
c.l.Info("error while ReceivePacket", err)
return
}
if err := donut.OnVideoFrame(inPkt.Data(), entities.MediaFrameContext{
PTS: int(inPkt.Pts()),
DTS: int(inPkt.Dts()),
@@ -228,7 +169,6 @@ func (c *LibAVFFmpegStreamer) Stream(donut *entities.DonutParameters) {
continue
}
isAudio := s.decCodecContext.MediaType() == astiav.MediaTypeAudio
isAudioBypass := donut.Recipe.Audio.Action == entities.DonutBypass
if isAudio && isAudioBypass {
if donut.OnAudioFrame != nil {
@@ -244,19 +184,16 @@ func (c *LibAVFFmpegStreamer) Stream(donut *entities.DonutParameters) {
continue
}
if isAudio {
continue
}
// if isAudio {
// continue
// }
c.l.Infof("start sending packet")
// c.processPacket(inPkt, s, donut)
if err := s.decCodecContext.SendPacket(inPkt); err != nil {
c.onError(err, donut)
return
}
for {
c.l.Infof("start receiving packet")
if err := s.decCodecContext.ReceiveFrame(s.decFrame); err != nil {
if errors.Is(err, astiav.ErrEof) || errors.Is(err, astiav.ErrEagain) {
break
@@ -264,7 +201,6 @@ func (c *LibAVFFmpegStreamer) Stream(donut *entities.DonutParameters) {
c.onError(err, donut)
return
}
c.l.Infof("start filtering")
if err := c.filterAndEncode(s.decFrame, s, donut); err != nil {
c.onError(err, donut)
return
@@ -563,27 +499,32 @@ func (c *LibAVFFmpegStreamer) prepareFilters(p *libAVParams, closer *astikit.Clo
return nil
}
func (c *LibAVFFmpegStreamer) processPacket(pkt *astiav.Packet, s *streamContext, donut *entities.DonutParameters) {
if err := s.decCodecContext.SendPacket(pkt); err != nil {
c.onError(err, donut)
return
func (c *LibAVFFmpegStreamer) applyBitStreamFilter(p *astiav.Packet, s *streamContext, filter *entities.DonutBitStreamFilter) (*astiav.Packet, error) {
bsf := astiav.FindBitStreamFilterByName(string(*filter))
if bsf == nil {
return nil, fmt.Errorf("can not find the filter %s", string(*filter))
}
for {
c.l.Infof("start receiving packet")
if err := s.decCodecContext.ReceiveFrame(s.decFrame); err != nil {
if errors.Is(err, astiav.ErrEof) || errors.Is(err, astiav.ErrEagain) {
break
}
c.onError(err, donut)
return
}
c.l.Infof("start filtering")
if err := c.filterAndEncode(s.decFrame, s, donut); err != nil {
c.onError(err, donut)
return
}
bsfCtx, err := astiav.AllocBitStreamContext(bsf)
if err != nil {
return nil, fmt.Errorf("error while allocating bit stream context %w", err)
}
bsfCtx.SetTimeBaseIn(s.inputStream.TimeBase())
if err := s.inputStream.CodecParameters().Copy(bsfCtx.CodecParametersIn()); err != nil {
return nil, fmt.Errorf("error while copying codec parameters %w", err)
}
if err := bsfCtx.Init(); err != nil {
return nil, fmt.Errorf("error while initiating %w", err)
}
if err := bsfCtx.SendPacket(p); err != nil {
return nil, fmt.Errorf("error while sending the packet %w", err)
}
if bsfCtx.ReceivePacket(p) != nil {
return nil, fmt.Errorf("error while receiving the packet %w", err)
}
return p, nil
}
func (c *LibAVFFmpegStreamer) filterAndEncode(f *astiav.Frame, s *streamContext, donut *entities.DonutParameters) (err error) {
@@ -602,7 +543,6 @@ func (c *LibAVFFmpegStreamer) filterAndEncode(f *astiav.Frame, s *streamContext,
}
// TODO: should we avoid setting the picture type for audio?
s.filterFrame.SetPictureType(astiav.PictureTypeNone)
c.l.Infof("start encoding")
if err = c.encodeFrame(s.filterFrame, s, donut); err != nil {
err = fmt.Errorf("main: encoding and writing frame failed: %w", err)
return
@@ -623,7 +563,6 @@ func (c *LibAVFFmpegStreamer) encodeFrame(f *astiav.Frame, s *streamContext, don
}
for {
c.l.Infof("start receiving packet")
if err = s.encCodecContext.ReceivePacket(s.encPkt); err != nil {
if errors.Is(err, astiav.ErrEof) || errors.Is(err, astiav.ErrEagain) {
err = nil
@@ -639,7 +578,6 @@ func (c *LibAVFFmpegStreamer) encodeFrame(f *astiav.Frame, s *streamContext, don
isVideo := s.decCodecContext.MediaType() == astiav.MediaTypeVideo
if isVideo {
if donut.OnVideoFrame != nil {
c.l.Infof("sending transcoded video")
if err := donut.OnVideoFrame(s.encPkt.Data(), entities.MediaFrameContext{
PTS: int(s.encPkt.Pts()),
DTS: int(s.encPkt.Dts()),
@@ -653,7 +591,6 @@ func (c *LibAVFFmpegStreamer) encodeFrame(f *astiav.Frame, s *streamContext, don
isAudio := s.decCodecContext.MediaType() == astiav.MediaTypeAudio
if isAudio {
if donut.OnAudioFrame != nil {
c.l.Infof("sending transcoded audio")
if err := donut.OnAudioFrame(s.encPkt.Data(), entities.MediaFrameContext{
PTS: int(s.encPkt.Pts()),
DTS: int(s.encPkt.Dts()),

View File

@@ -22,15 +22,10 @@ type WebRTCSetupResponse struct {
LocalSDP *webrtc.SessionDescription
}
// TODO: make it agnostic from streaming protocol when implementing RTMP
type RequestParams struct {
SRTHost string
SRTPort uint16 `json:",string"`
SRTStreamID string
Offer webrtc.SessionDescription
StreamURL string
StreamID string
Offer webrtc.SessionDescription
}
func (p *RequestParams) Valid() error {
@@ -157,6 +152,10 @@ type DonutMediaTaskAction string
var DonutTranscode DonutMediaTaskAction = "transcode"
var DonutBypass DonutMediaTaskAction = "bypass"
type DonutBitStreamFilter string
var DonutH264AnnexB DonutBitStreamFilter = "h264_mp4toannexb"
// TODO: split entities per domain or files avoiding name collision.
// DonutMediaTask is a transformation template to apply over a media.
@@ -169,6 +168,9 @@ type DonutMediaTask struct {
// If no value is provided ffmpeg will use defaults.
// For instance, if one does not provide bit rate, it'll fallback to 64000 bps (opus)
CodecContextOptions []LibAVOptionsCodecContext
// DonutBitStreamFilter is the bitstream filter
DonutBitStreamFilter *DonutBitStreamFilter
}
type DonutInputOptionKey string

View File

@@ -42,48 +42,48 @@ func (h *SignalingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) err
if err != nil {
return err
}
h.l.Infof("createAndValidateParams %s", params.String())
h.l.Infof("RequestParams %s", params.String())
donutEngine, err := h.donut.EngineFor(&params)
if err != nil {
return err
}
h.l.Infof("EngineFor %#v", donutEngine)
h.l.Infof("DonutEngine %#v", donutEngine)
// server side media info
serverStreamInfo, err := donutEngine.ServerIngredients()
if err != nil {
return err
}
h.l.Infof("ServerIngredients %#v", serverStreamInfo)
// client side media support
clientStreamInfo, err := donutEngine.ClientIngredients()
if err != nil {
return err
}
h.l.Infof("ServerIngredients %#v", serverStreamInfo)
h.l.Infof("ClientIngredients %#v", clientStreamInfo)
donutRecipe, err := donutEngine.RecipeFor(serverStreamInfo, clientStreamInfo)
h.l.Info("after RecipeFor")
h.l.Info("after RecipeFor err", err)
h.l.Info("after RecipeFor donutRecipe", donutRecipe)
if err != nil {
return err
}
h.l.Infof("RecipeFor %#v", donutRecipe)
h.l.Infof("DonutRecipe %#v", donutRecipe)
// We can't defer calling cancel here because it'll live alongside the stream.
ctx, cancel := context.WithCancel(context.Background())
webRTCResponse, err := h.webRTCController.Setup(cancel, donutRecipe, params)
h.l.Infof("webRTCController.Setup %#v, err=%#v", webRTCResponse, err)
if err != nil {
cancel()
return err
}
//tODO: add explan
h.l.Info("before sleeping")
time.Sleep(5 * time.Second)
h.l.Info("after sleeping")
h.l.Infof("WebRTCResponse %#v", webRTCResponse)
//TODO: remove the sleeping
// The simulated RTMP stream (/scripts/ffmpeg_rtmp.sh) goes down every time a client disconnects.
// The prober is forcing the first restart therefore it waits for 4 seconds.
time.Sleep(4 * time.Second)
go donutEngine.Serve(&entities.DonutParameters{
Cancel: cancel,
Ctx: ctx,
@@ -98,18 +98,13 @@ func (h *SignalingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) err
h.l.Errorw("error while streaming", "error", err)
},
OnStream: func(st *entities.Stream) error {
h.l.Infof("onstream %#v", st)
return h.webRTCController.SendMetadata(webRTCResponse.Data, st)
},
OnVideoFrame: func(data []byte, c entities.MediaFrameContext) error {
// sl[len(sl)-1]
h.l.Infof("OnVideoFrame %#v < %d > First %#v Last %#v", c, len(data), data[0:7], data[len(data)-7:])
return h.webRTCController.SendMediaSample(webRTCResponse.Video, data, c)
},
OnAudioFrame: func(data []byte, c entities.MediaFrameContext) error {
h.l.Infof("OnAudioFrame %#v", c)
return nil
// return h.webRTCController.SendMediaSample(webRTCResponse.Audio, data, c)
return h.webRTCController.SendMediaSample(webRTCResponse.Audio, data, c)
},
})
@@ -117,11 +112,11 @@ func (h *SignalingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) err
w.WriteHeader(http.StatusOK)
err = json.NewEncoder(w).Encode(*webRTCResponse.LocalSDP)
h.l.Infof("webRTCResponse %#v", webRTCResponse)
if err != nil {
cancel()
return err
}
h.l.Infof("webRTCResponse %#v", webRTCResponse)
return nil
}

View File

@@ -1,26 +0,0 @@
docker run -it --rm --entrypoint bash jrottenberg/ffmpeg:5.1.4-ubuntu2204
ffmpeg -bsfs | grep annex
# h264_mp4toannexb
# hevc_mp4toannexb
live_flv : In case of live network streams, if you force format, you may use live_flv option instead of flv to survive timestamp discontinuities.
-f live_flv
ff_live_flv_demuxer
ff_flv_demuxer
ffmpeg -h demuxer=live_flv
ffmpeg -h bsf=h264_mp4toannexb
example:
https://github.com/FFmpeg/FFmpeg/blob/9c6c4f3d476d7a8d423ec3b954254c6a67ebc792/libavformat/mux.c#L1351
https://github.com/search?q=repo%3AFFmpeg%2FFFmpeg%20ff_stream_add_bitstream_filter&type=code
Good explanaation
https://stackoverflow.com/questions/24884827/possible-locations-for-sequence-picture-parameter-sets-for-h-264-stream/24890903#24890903
Filter impl https://github.com/FFmpeg/FFmpeg/blob/9c6c4f3d476d7a8d423ec3b954254c6a67ebc792/libavcodec/bsf/h264_mp4toannexb.c#L276

View File

@@ -1,8 +1,8 @@
ffmpeg -hide_banner -loglevel verbose \
-re -f lavfi -i testsrc2=size=1280x720:rate=30,format=yuv420p \
-re -f lavfi -i testsrc2=size=768x432:rate=30,format=yuv420p \
-f lavfi -i sine=frequency=1000:sample_rate=44100 \
-c:v libx264 -preset veryfast -tune zerolatency -profile:v baseline \
-vf "drawtext=text='SRT streaming':box=1:boxborderw=10:x=(w-text_w)/2:y=(h-text_h)/2:fontsize=128:fontcolor=black" \
-vf "drawtext=text='SRT streaming':box=1:boxborderw=10:x=(w-text_w)/2:y=(h-text_h)/2:fontsize=64:fontcolor=black" \
-b:v 1000k -bufsize 2000k -x264opts keyint=30:min-keyint=30:scenecut=-1 \
-c:a aac -b:a 128k \
-f mpegts "udp://${SRT_INPUT_HOST}:${SRT_INPUT_PORT}?pkt_size=${PKT_SIZE}"

View File

@@ -2,10 +2,10 @@
while true
do
ffmpeg -hide_banner -loglevel debug \
-re -f lavfi -i testsrc2=size=1280x720:rate=30,format=yuv420p \
-re -f lavfi -i testsrc2=size=768x432:rate=30,format=yuv420p \
-f lavfi -i sine=frequency=1000:sample_rate=44100 \
-c:v libx264 -preset veryfast -tune zerolatency -profile:v baseline \
-vf "drawtext=text='RTMP streaming':box=1:boxborderw=10:x=(w-text_w)/2:y=(h-text_h)/2:fontsize=128:fontcolor=black" \
-vf "drawtext=text='RTMP streaming':box=1:boxborderw=10:x=(w-text_w)/2:y=(h-text_h)/2:fontsize=64:fontcolor=black" \
-b:v 1000k -bufsize 2000k -x264opts keyint=30:min-keyint=30:scenecut=-1 \
-c:a aac -b:a 128k \
-f flv -listen 1 -rtmp_live live "rtmp://${RTMP_HOST}:${RTMP_PORT}/live/app"

View File

@@ -1,23 +0,0 @@
#!/bin/bash
set -e
PREFIX="/opt/ffmpeg"
# from https://github.com/asticode/go-astiav/blob/master/Makefile
version="n5.1.2"
srcPath="tmp/$(version)/src"
postCheckout=""
rm -rf $(srcPath)
mkdir -p $(srcPath)
git clone --depth 1 --branch $(version) https://github.com/FFmpeg/FFmpeg $(srcPath)
# TODO: install all required libraries (srt, rtmp, aac, x264...) and enable them.
cd $(srcPath) && ./configure --prefix=.. $(configure) \
--disable-htmlpages --disable-doc --disable-txtpages --disable-podpages --disable-manpages \
# --enable-gpl \
# --disable-ffmpeg --disable-ffplay --disable-ffprobe --enable-libopus \
# --enable-libsvtav1 --enable-libfdk-aac --enable-libopus \
# --enable-libfreetype --enable-libsrt --enable-librtmp \
# --enable-libvorbis --enable-libx265 --enable-libx264 --enable-libvpx
cd $(srcPath) && make
cd $(srcPath) && make install

View File

@@ -1,12 +0,0 @@
#!/bin/bash
if ! brew list srt &>/dev/null; then
echo "ERROR you must install srt"
echo "brew install srt"
exit 1
fi
if ! ls tmp &>/dev/null; then
echo "ERROR you must install ffmpeg"
echo "make install-ffmpeg"
exit 1
fi

View File

@@ -1,7 +0,0 @@
#!/bin/bash
source ./scripts/mac_check_deps.sh
# deps
source ./scripts/setup_deps_flags.sh
go run -race main.go

View File

@@ -1,11 +0,0 @@
#!/bin/bash
source ./scripts/mac_check_deps.sh
# deps
source ./scripts/setup_deps_flags.sh
# For debugging:
# go test -v -p 1 ./...
# ref https://github.com/golang/go/issues/46959#issuecomment-1407594935
go test ./...

View File

@@ -1,10 +0,0 @@
#!/bin/bash
# SRT deps
export CGO_LDFLAGS="-L$(brew --prefix srt)/lib"
export CGO_CFLAGS="-I$(brew --prefix srt)/include/"
export PKG_CONFIG_PATH="$(brew --prefix srt)/lib/pkgconfig"
# ffmpeg/libav deps
CGO_LDFLAGS="$CGO_LDFLAGS -L$(pwd)/tmp/n5.1.2/lib/"
CGO_CFLAGS="$CGO_CFLAGS -I$(pwd)/tmp/n5.1.2/include/"
PKG_CONFIG_PATH="$PKG_CONFIG_PATH:$(pwd)/tmp/n5.1.2/lib/pkgconfig"