standardize the input

This commit is contained in:
Leandro Moreira
2024-03-06 17:55:12 -03:00
parent 52312607e1
commit 668bb29f43
7 changed files with 75 additions and 39 deletions

View File

@@ -11,6 +11,7 @@ import (
)
type DonutEngine interface {
Appetizer() entities.DonutAppetizer
ServerIngredients() (*entities.StreamInfo, error)
ClientIngredients() (*entities.StreamInfo, error)
RecipeFor(server, client *entities.StreamInfo) *entities.DonutRecipe
@@ -79,7 +80,7 @@ type donutEngine struct {
}
func (d *donutEngine) ServerIngredients() (*entities.StreamInfo, error) {
return d.prober.StreamInfo(d.req)
return d.prober.StreamInfo(d.Appetizer())
}
func (d *donutEngine) ClientIngredients() (*entities.StreamInfo, error) {
@@ -101,14 +102,7 @@ func (d *donutEngine) RecipeFor(server, client *entities.StreamInfo) *entities.D
// if union(preferable, client.medias)
// transcode, preferable
r := &entities.DonutRecipe{
Input: entities.DonutInput{
Format: "mpegts", // it'll change based on input, i.e. rmtp flv
Options: map[entities.DonutInputOptionKey]string{
entities.DonutSRTStreamID: d.req.SRTStreamID,
entities.DonutSRTTranstype: "live",
entities.DonutSRTsmoother: "live",
},
},
Input: d.Appetizer(),
Video: entities.DonutMediaTask{
Action: entities.DonutBypass,
Codec: entities.H264,
@@ -130,3 +124,16 @@ func (d *donutEngine) RecipeFor(server, client *entities.StreamInfo) *entities.D
return r
}
func (d *donutEngine) Appetizer() entities.DonutAppetizer {
// TODO: implement input based on param
return entities.DonutAppetizer{
URL: fmt.Sprintf("srt://%s:%d", d.req.SRTHost, d.req.SRTPort),
Format: "mpegts", // it'll change based on input, i.e. rmtp flv
Options: map[entities.DonutInputOptionKey]string{
entities.DonutSRTStreamID: d.req.SRTStreamID,
entities.DonutSRTTranstype: "live",
entities.DonutSRTsmoother: "live",
},
}
}

View File

@@ -3,6 +3,6 @@ package probers
import "github.com/flavioribeiro/donut/internal/entities"
type DonutProber interface {
StreamInfo(req *entities.RequestParams) (*entities.StreamInfo, error)
StreamInfo(req entities.DonutAppetizer) (*entities.StreamInfo, error)
Match(req *entities.RequestParams) bool
}

View File

@@ -43,7 +43,7 @@ func (c *LibAVFFmpeg) Match(req *entities.RequestParams) bool {
}
// StreamInfo connects to the SRT stream to discovery media properties.
func (c *LibAVFFmpeg) StreamInfo(req *entities.RequestParams) (*entities.StreamInfo, error) {
func (c *LibAVFFmpeg) StreamInfo(req entities.DonutAppetizer) (*entities.StreamInfo, error) {
closer := astikit.NewCloser()
defer closer.Close()
@@ -53,26 +53,13 @@ func (c *LibAVFFmpeg) StreamInfo(req *entities.RequestParams) (*entities.StreamI
}
closer.Add(inputFormatContext.Free)
// TODO: implement proper handler per req
userProvidedInputFormat := "mpegts"
// We're assuming that SRT is carrying mpegts.
//
// ffmpeg -hide_banner -protocols # shows all protocols (SRT/RTMP)
// ffmpeg -hide_banner -formats # shows all formats (mpegts/webm/mov)
inputFormat := astiav.FindInputFormat(userProvidedInputFormat)
if inputFormat == nil {
return nil, fmt.Errorf("mpegts: %w", entities.ErrFFmpegLibAVNotFound)
inputFormat, err := c.defineInputFormat(req.Format.String())
if err != nil {
return nil, err
}
inputOptions := c.defineInputOptions(req.Options, closer)
// ref https://ffmpeg.org/ffmpeg-all.html#srt
d := &astiav.Dictionary{}
// flags (the zeroed 3rd value) https://github.com/FFmpeg/FFmpeg/blob/n5.0/libavutil/dict.h#L67C9-L77
d.Set("srt_streamid", req.SRTStreamID, 0)
d.Set("smoother", "live", 0)
d.Set("transtype", "live", 0)
inputURL := fmt.Sprintf("srt://%s:%d", req.SRTHost, req.SRTPort)
if err := inputFormatContext.OpenInput(inputURL, inputFormat, d); err != nil {
if err := inputFormatContext.OpenInput(req.URL, inputFormat, inputOptions); err != nil {
return nil, fmt.Errorf("error while inputFormatContext.OpenInput: %w", err)
}
closer.Add(inputFormatContext.CloseInput)
@@ -94,3 +81,28 @@ func (c *LibAVFFmpeg) StreamInfo(req *entities.RequestParams) (*entities.StreamI
return &si, nil
}
// TODO: merge common behavior (streamer / prober)
func (c *LibAVFFmpeg) defineInputFormat(streamFormat string) (*astiav.InputFormat, error) {
var inputFormat *astiav.InputFormat
if streamFormat != "" {
inputFormat = astiav.FindInputFormat(streamFormat)
if inputFormat == nil {
return nil, fmt.Errorf("ffmpeg/libav: could not find %s input format", streamFormat)
}
}
return inputFormat, nil
}
func (c *LibAVFFmpeg) defineInputOptions(opts map[entities.DonutInputOptionKey]string, closer *astikit.Closer) *astiav.Dictionary {
var dic *astiav.Dictionary
if len(opts) > 0 {
dic = &astiav.Dictionary{}
closer.Add(dic.Free)
for k, v := range opts {
dic.Set(k.String(), v, 0)
}
}
return dic
}

View File

@@ -1,6 +1,7 @@
package probers_test
import (
"fmt"
"testing"
"github.com/flavioribeiro/donut/internal/controllers/probers"
@@ -47,9 +48,19 @@ func TestSrtMpegTs_StreamInfo_264(t *testing.T) {
SRTStreamID: "test_id",
}
input := entities.DonutAppetizer{
URL: fmt.Sprintf("srt://%s:%d", ffmpeg.Output().Host, uint16(ffmpeg.Output().Port)),
Format: "mpegts", // it'll change based on input, i.e. rmtp flv
Options: map[entities.DonutInputOptionKey]string{
entities.DonutSRTStreamID: "test_id",
entities.DonutSRTTranstype: "live",
entities.DonutSRTsmoother: "live",
},
}
prober := selectProberFor(t, req)
streamInfo, err := prober.StreamInfo(req)
streamInfo, err := prober.StreamInfo(input)
assert.Nil(t, err)
assert.NotNil(t, streamInfo)
@@ -69,9 +80,19 @@ func TestSrtMpegTs_StreamInfo_265(t *testing.T) {
SRTStreamID: "test_id",
}
input := entities.DonutAppetizer{
URL: fmt.Sprintf("srt://%s:%d", ffmpeg.Output().Host, uint16(ffmpeg.Output().Port)),
Format: "mpegts", // it'll change based on input, i.e. rmtp flv
Options: map[entities.DonutInputOptionKey]string{
entities.DonutSRTStreamID: "test_id",
entities.DonutSRTTranstype: "live",
entities.DonutSRTsmoother: "live",
},
}
prober := selectProberFor(t, req)
streamInfo, err := prober.StreamInfo(req)
streamInfo, err := prober.StreamInfo(input)
assert.Nil(t, err)
assert.NotNil(t, streamInfo)

View File

@@ -209,7 +209,7 @@ func (c *LibAVFFmpegStreamer) prepareInput(p *libAVParams, closer *astikit.Close
return err
}
inputOptions := c.defineInputOptions(donut, closer)
if err := p.inputFormatContext.OpenInput(donut.StreamURL, inputFormat, inputOptions); err != nil {
if err := p.inputFormatContext.OpenInput(donut.Recipe.Input.URL, inputFormat, inputOptions); err != nil {
return fmt.Errorf("ffmpeg/libav: opening input failed %w", err)
}
closer.Add(p.inputFormatContext.CloseInput)

View File

@@ -136,8 +136,6 @@ type DonutParameters struct {
Cancel context.CancelFunc
Ctx context.Context
StreamURL string // ie: srt://host:9080, rtmp://host:4991
Recipe DonutRecipe
OnClose func()
@@ -185,13 +183,14 @@ func (d DonutInputFormat) String() string {
var DonutMpegTSFormat DonutInputFormat = "mpegts"
var DonutFLVFormat DonutInputFormat = "flv"
type DonutInput struct {
type DonutAppetizer struct {
URL string
Format DonutInputFormat
Options map[DonutInputOptionKey]string
}
type DonutRecipe struct {
Input DonutInput
Input DonutAppetizer
Video DonutMediaTask
Audio DonutMediaTask
}

View File

@@ -3,7 +3,6 @@ package handlers
import (
"context"
"encoding/json"
"fmt"
"net/http"
"github.com/flavioribeiro/donut/internal/controllers"
@@ -78,8 +77,6 @@ func (h *SignalingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) err
Recipe: *donutRecipe,
StreamURL: fmt.Sprintf("srt://%s:%d", params.SRTHost, params.SRTPort),
OnClose: func() {
cancel()
webRTCResponse.Connection.Close()