mirror of
https://github.com/flavioribeiro/donut.git
synced 2025-10-17 20:40:44 +08:00
introduce donut engine
This commit is contained in:
85
internal/controllers/engine/donut_engine_controller.go
Normal file
85
internal/controllers/engine/donut_engine_controller.go
Normal file
@@ -0,0 +1,85 @@
|
|||||||
|
package engine
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/flavioribeiro/donut/internal/controllers/probers"
|
||||||
|
"github.com/flavioribeiro/donut/internal/controllers/streamers"
|
||||||
|
"github.com/flavioribeiro/donut/internal/entities"
|
||||||
|
"go.uber.org/fx"
|
||||||
|
)
|
||||||
|
|
||||||
|
type DonutEngine interface {
|
||||||
|
Prober() probers.DonutProber
|
||||||
|
Streamer() streamers.DonutStreamer
|
||||||
|
CompatibleStreamsFor(server, client *entities.StreamInfo) []entities.Stream
|
||||||
|
}
|
||||||
|
|
||||||
|
type DonutEngineParams struct {
|
||||||
|
fx.In
|
||||||
|
Streamers []streamers.DonutStreamer `group:"streamers"`
|
||||||
|
Probers []probers.DonutProber `group:"probers"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type DonutEngineController struct {
|
||||||
|
p DonutEngineParams
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewDonutEngineController(p DonutEngineParams) *DonutEngineController {
|
||||||
|
return &DonutEngineController{p}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *DonutEngineController) EngineFor(req *entities.RequestParams) (DonutEngine, error) {
|
||||||
|
prober := c.selectProberFor(req)
|
||||||
|
if prober == nil {
|
||||||
|
return nil, fmt.Errorf("request %v: not fulfilled error %v", req, entities.ErrMissingProber)
|
||||||
|
}
|
||||||
|
|
||||||
|
streamer := c.selectStreamerFor(req)
|
||||||
|
if prober == nil {
|
||||||
|
return nil, fmt.Errorf("request %v: not fulfilled error %v", req, entities.ErrMissingStreamer)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &donutEngine{
|
||||||
|
prober: prober,
|
||||||
|
streamer: streamer,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: try to use generics
|
||||||
|
func (c *DonutEngineController) selectProberFor(req *entities.RequestParams) probers.DonutProber {
|
||||||
|
for _, p := range c.p.Probers {
|
||||||
|
if p.Match(req) {
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: try to use generics
|
||||||
|
func (c *DonutEngineController) selectStreamerFor(req *entities.RequestParams) streamers.DonutStreamer {
|
||||||
|
for _, p := range c.p.Streamers {
|
||||||
|
if p.Match(req) {
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type donutEngine struct {
|
||||||
|
prober probers.DonutProber
|
||||||
|
streamer streamers.DonutStreamer
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *donutEngine) Prober() probers.DonutProber {
|
||||||
|
return d.prober
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *donutEngine) Streamer() streamers.DonutStreamer {
|
||||||
|
return d.streamer
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *donutEngine) CompatibleStreamsFor(server, client *entities.StreamInfo) []entities.Stream {
|
||||||
|
// TODO: implement proper matching
|
||||||
|
return server.Streams
|
||||||
|
}
|
@@ -2,6 +2,7 @@ package probers
|
|||||||
|
|
||||||
import "github.com/flavioribeiro/donut/internal/entities"
|
import "github.com/flavioribeiro/donut/internal/entities"
|
||||||
|
|
||||||
type Prober interface {
|
type DonutProber interface {
|
||||||
StreamInfo(req *entities.RequestParams) (*entities.StreamInfo, error)
|
StreamInfo(req *entities.RequestParams) (*entities.StreamInfo, error)
|
||||||
|
Match(req *entities.RequestParams) bool
|
||||||
}
|
}
|
||||||
|
@@ -7,28 +7,46 @@ import (
|
|||||||
|
|
||||||
astisrt "github.com/asticode/go-astisrt/pkg"
|
astisrt "github.com/asticode/go-astisrt/pkg"
|
||||||
"github.com/asticode/go-astits"
|
"github.com/asticode/go-astits"
|
||||||
"github.com/flavioribeiro/donut/internal/controllers"
|
|
||||||
"github.com/flavioribeiro/donut/internal/entities"
|
"github.com/flavioribeiro/donut/internal/entities"
|
||||||
"github.com/flavioribeiro/donut/internal/mapper"
|
"github.com/flavioribeiro/donut/internal/mapper"
|
||||||
|
"go.uber.org/fx"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
type SrtMpegTs struct {
|
type SrtMpegTs struct {
|
||||||
c *entities.Config
|
c *entities.Config
|
||||||
l *zap.SugaredLogger
|
l *zap.SugaredLogger
|
||||||
srtController *controllers.SRTController
|
m *mapper.Mapper
|
||||||
m *mapper.Mapper
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSrtMpegTs(c *entities.Config, l *zap.SugaredLogger, srtController *controllers.SRTController, m *mapper.Mapper) *SrtMpegTs {
|
type ResultSrtMpegTs struct {
|
||||||
return &SrtMpegTs{
|
fx.Out
|
||||||
c: c,
|
SrtMpegTsProber DonutProber `group:"probers"`
|
||||||
l: l,
|
}
|
||||||
srtController: srtController,
|
|
||||||
m: m,
|
// NewSrtMpegTs creates a new SrtMpegTs DonutProber
|
||||||
|
func NewSrtMpegTs(
|
||||||
|
c *entities.Config,
|
||||||
|
l *zap.SugaredLogger,
|
||||||
|
m *mapper.Mapper,
|
||||||
|
) ResultSrtMpegTs {
|
||||||
|
return ResultSrtMpegTs{
|
||||||
|
SrtMpegTsProber: &SrtMpegTs{
|
||||||
|
c: c,
|
||||||
|
l: l,
|
||||||
|
m: m,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Match returns true when the request is for an SrtMpegTs prober
|
||||||
|
func (c *SrtMpegTs) Match(req *entities.RequestParams) bool {
|
||||||
|
if req.SRTHost != "" {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
// StreamInfo connects to the SRT stream and probe N packets to discovery the media properties.
|
// StreamInfo connects to the SRT stream and probe N packets to discovery the media properties.
|
||||||
func (c *SrtMpegTs) StreamInfo(req *entities.RequestParams) (*entities.StreamInfo, error) {
|
func (c *SrtMpegTs) StreamInfo(req *entities.RequestParams) (*entities.StreamInfo, error) {
|
||||||
streamInfoMap, err := c.streamInfoMap(req)
|
streamInfoMap, err := c.streamInfoMap(req)
|
||||||
@@ -51,7 +69,7 @@ func (c *SrtMpegTs) streamInfoMap(req *entities.RequestParams) (map[entities.Cod
|
|||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
srtConnection, err := c.srtController.Connect(cancel, req)
|
srtConnection, err := c.connect(cancel, req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -132,3 +150,43 @@ func (c *SrtMpegTs) fillStreamInfoFromMpegTS(streamInfo map[entities.Codec]entit
|
|||||||
}
|
}
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: move to its own component later dup streamer.srt_mpegts, prober.srt_mpegts
|
||||||
|
func (c *SrtMpegTs) connect(cancel context.CancelFunc, params *entities.RequestParams) (*astisrt.Connection, error) {
|
||||||
|
c.l.Info("trying to connect srt")
|
||||||
|
|
||||||
|
if err := params.Valid(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
c.l.Infow("Connecting to SRT ",
|
||||||
|
"offer", params.String(),
|
||||||
|
)
|
||||||
|
|
||||||
|
conn, err := astisrt.Dial(astisrt.DialOptions{
|
||||||
|
ConnectionOptions: []astisrt.ConnectionOption{
|
||||||
|
astisrt.WithLatency(c.c.SRTConnectionLatencyMS),
|
||||||
|
astisrt.WithStreamid(params.SRTStreamID),
|
||||||
|
astisrt.WithCongestion("live"),
|
||||||
|
astisrt.WithTranstype(astisrt.Transtype(astisrt.TranstypeLive)),
|
||||||
|
},
|
||||||
|
|
||||||
|
OnDisconnect: func(conn *astisrt.Connection, err error) {
|
||||||
|
c.l.Infow("Canceling SRT",
|
||||||
|
"error", err,
|
||||||
|
)
|
||||||
|
cancel()
|
||||||
|
},
|
||||||
|
|
||||||
|
Host: params.SRTHost,
|
||||||
|
Port: params.SRTPort,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
c.l.Errorw("failed to connect srt",
|
||||||
|
"error", err,
|
||||||
|
)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
c.l.Infow("Connected to SRT")
|
||||||
|
return conn, nil
|
||||||
|
}
|
||||||
|
@@ -1,91 +0,0 @@
|
|||||||
package controllers
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
|
|
||||||
astisrt "github.com/asticode/go-astisrt/pkg"
|
|
||||||
"github.com/flavioribeiro/donut/internal/entities"
|
|
||||||
"go.uber.org/fx"
|
|
||||||
"go.uber.org/zap"
|
|
||||||
)
|
|
||||||
|
|
||||||
type SRTController struct {
|
|
||||||
c *entities.Config
|
|
||||||
l *zap.SugaredLogger
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewSRTController(c *entities.Config, l *zap.SugaredLogger, lc fx.Lifecycle) (*SRTController, error) {
|
|
||||||
// Handle logs
|
|
||||||
astisrt.SetLogLevel(astisrt.LogLevel(astisrt.LogLevelNotice))
|
|
||||||
astisrt.SetLogHandler(func(ll astisrt.LogLevel, file, area, msg string, line int) {
|
|
||||||
l.Infow("SRT",
|
|
||||||
"ll", ll,
|
|
||||||
"msg", msg,
|
|
||||||
)
|
|
||||||
})
|
|
||||||
|
|
||||||
// Startup srt
|
|
||||||
if err := astisrt.Startup(); err != nil {
|
|
||||||
l.Errorw("failed to start up srt",
|
|
||||||
"error", err,
|
|
||||||
)
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
lc.Append(fx.Hook{
|
|
||||||
OnStop: func(ctx context.Context) error {
|
|
||||||
// Clean up
|
|
||||||
if err := astisrt.CleanUp(); err != nil {
|
|
||||||
l.Errorw("failed to clean up srt",
|
|
||||||
"error", err,
|
|
||||||
)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
},
|
|
||||||
})
|
|
||||||
|
|
||||||
return &SRTController{
|
|
||||||
c: c,
|
|
||||||
l: l,
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *SRTController) Connect(cancel context.CancelFunc, params *entities.RequestParams) (*astisrt.Connection, error) {
|
|
||||||
c.l.Info("trying to connect srt")
|
|
||||||
|
|
||||||
if err := params.Valid(); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
c.l.Infow("Connecting to SRT ",
|
|
||||||
"offer", params.String(),
|
|
||||||
)
|
|
||||||
|
|
||||||
conn, err := astisrt.Dial(astisrt.DialOptions{
|
|
||||||
ConnectionOptions: []astisrt.ConnectionOption{
|
|
||||||
astisrt.WithLatency(c.c.SRTConnectionLatencyMS),
|
|
||||||
astisrt.WithStreamid(params.SRTStreamID),
|
|
||||||
astisrt.WithCongestion("live"),
|
|
||||||
astisrt.WithTranstype(astisrt.Transtype(astisrt.TranstypeLive)),
|
|
||||||
},
|
|
||||||
|
|
||||||
OnDisconnect: func(conn *astisrt.Connection, err error) {
|
|
||||||
c.l.Infow("Canceling SRT",
|
|
||||||
"error", err,
|
|
||||||
)
|
|
||||||
cancel()
|
|
||||||
},
|
|
||||||
|
|
||||||
Host: params.SRTHost,
|
|
||||||
Port: params.SRTPort,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
c.l.Errorw("failed to connect srt",
|
|
||||||
"error", err,
|
|
||||||
)
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
c.l.Infow("Connected to SRT")
|
|
||||||
return conn, nil
|
|
||||||
}
|
|
8
internal/controllers/streamers/interface.go
Normal file
8
internal/controllers/streamers/interface.go
Normal file
@@ -0,0 +1,8 @@
|
|||||||
|
package streamers
|
||||||
|
|
||||||
|
import "github.com/flavioribeiro/donut/internal/entities"
|
||||||
|
|
||||||
|
type DonutStreamer interface {
|
||||||
|
Stream(sp *entities.StreamParameters)
|
||||||
|
Match(req *entities.RequestParams) bool
|
||||||
|
}
|
@@ -1,4 +1,4 @@
|
|||||||
package controllers
|
package streamers
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
@@ -14,14 +14,14 @@ import (
|
|||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
type StreamingController struct {
|
type SRTMpegTSStreamer struct {
|
||||||
c *entities.Config
|
c *entities.Config
|
||||||
l *zap.SugaredLogger
|
l *zap.SugaredLogger
|
||||||
|
|
||||||
middlewares []entities.StreamMiddleware
|
middlewares []entities.StreamMiddleware
|
||||||
}
|
}
|
||||||
|
|
||||||
type StreamingControllerParams struct {
|
type SRTMpegTSStreamerParams struct {
|
||||||
fx.In
|
fx.In
|
||||||
C *entities.Config
|
C *entities.Config
|
||||||
L *zap.SugaredLogger
|
L *zap.SugaredLogger
|
||||||
@@ -29,25 +29,45 @@ type StreamingControllerParams struct {
|
|||||||
Middlewares []entities.StreamMiddleware `group:"middlewares"`
|
Middlewares []entities.StreamMiddleware `group:"middlewares"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewStreamingController(p StreamingControllerParams) *StreamingController {
|
type ResultSRTMpegTSStreamer struct {
|
||||||
return &StreamingController{
|
fx.Out
|
||||||
c: p.C,
|
SRTMpegTSStreamer DonutStreamer `group:"streamers"`
|
||||||
l: p.L,
|
}
|
||||||
middlewares: p.Middlewares,
|
|
||||||
|
func NewSRTMpegTSStreamer(p SRTMpegTSStreamerParams) ResultSRTMpegTSStreamer {
|
||||||
|
return ResultSRTMpegTSStreamer{
|
||||||
|
SRTMpegTSStreamer: &SRTMpegTSStreamer{
|
||||||
|
c: p.C,
|
||||||
|
l: p.L,
|
||||||
|
middlewares: p.Middlewares,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *StreamingController) Stream(sp *entities.StreamParameters) {
|
func (c *SRTMpegTSStreamer) Match(req *entities.RequestParams) bool {
|
||||||
|
if req.SRTHost != "" {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SRTMpegTSStreamer) Stream(sp *entities.StreamParameters) {
|
||||||
|
srtConnection, err := c.connect(sp.Cancel, sp.RequestParams)
|
||||||
|
if err != nil {
|
||||||
|
c.l.Errorw("streaming has stopped due errors",
|
||||||
|
"error", sp.Ctx.Err(),
|
||||||
|
)
|
||||||
|
return
|
||||||
|
}
|
||||||
r, w := io.Pipe()
|
r, w := io.Pipe()
|
||||||
|
|
||||||
defer r.Close()
|
defer r.Close()
|
||||||
defer w.Close()
|
defer w.Close()
|
||||||
defer sp.SRTConnection.Close()
|
defer srtConnection.Close()
|
||||||
defer sp.WebRTCConn.Close()
|
defer sp.WebRTCConn.Close()
|
||||||
defer sp.Cancel()
|
defer sp.Cancel()
|
||||||
|
|
||||||
// TODO: pick the proper transport? is it possible to get rtp instead?
|
go c.readFromSRTIntoWriterPipe(srtConnection, w)
|
||||||
go c.readFromSRTIntoWriterPipe(sp.SRTConnection, w)
|
|
||||||
|
|
||||||
// reading from reader pipe to the mpeg-ts demuxer
|
// reading from reader pipe to the mpeg-ts demuxer
|
||||||
mpegTSDemuxer := astits.NewDemuxer(sp.Ctx, r)
|
mpegTSDemuxer := astits.NewDemuxer(sp.Ctx, r)
|
||||||
@@ -77,7 +97,7 @@ func (c *StreamingController) Stream(sp *entities.StreamParameters) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// writing mpeg-ts video to webrtc channels
|
// writing mpeg-ts video to webrtc channels
|
||||||
for _, v := range sp.StreamInfo.VideoStreams() {
|
for _, v := range sp.ServerStreamInfo.VideoStreams() {
|
||||||
if v.Id != mpegTSDemuxData.PID {
|
if v.Id != mpegTSDemuxData.PID {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@@ -109,7 +129,7 @@ func (c *StreamingController) Stream(sp *entities.StreamParameters) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *StreamingController) readFromSRTIntoWriterPipe(srtConnection *astisrt.Connection, w *io.PipeWriter) {
|
func (c *SRTMpegTSStreamer) readFromSRTIntoWriterPipe(srtConnection *astisrt.Connection, w *io.PipeWriter) {
|
||||||
defer srtConnection.Close()
|
defer srtConnection.Close()
|
||||||
|
|
||||||
inboundMpegTsPacket := make([]byte, c.c.SRTReadBufferSizeBytes)
|
inboundMpegTsPacket := make([]byte, c.c.SRTReadBufferSizeBytes)
|
||||||
@@ -131,3 +151,43 @@ func (c *StreamingController) readFromSRTIntoWriterPipe(srtConnection *astisrt.C
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: move to its own component later dup streamer.srt_mpegts, prober.srt_mpegts
|
||||||
|
func (c *SRTMpegTSStreamer) connect(cancel context.CancelFunc, params *entities.RequestParams) (*astisrt.Connection, error) {
|
||||||
|
c.l.Info("trying to connect srt")
|
||||||
|
|
||||||
|
if err := params.Valid(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
c.l.Infow("Connecting to SRT ",
|
||||||
|
"offer", params.String(),
|
||||||
|
)
|
||||||
|
|
||||||
|
conn, err := astisrt.Dial(astisrt.DialOptions{
|
||||||
|
ConnectionOptions: []astisrt.ConnectionOption{
|
||||||
|
astisrt.WithLatency(c.c.SRTConnectionLatencyMS),
|
||||||
|
astisrt.WithStreamid(params.SRTStreamID),
|
||||||
|
astisrt.WithCongestion("live"),
|
||||||
|
astisrt.WithTranstype(astisrt.Transtype(astisrt.TranstypeLive)),
|
||||||
|
},
|
||||||
|
|
||||||
|
OnDisconnect: func(conn *astisrt.Connection, err error) {
|
||||||
|
c.l.Infow("Canceling SRT",
|
||||||
|
"error", err,
|
||||||
|
)
|
||||||
|
cancel()
|
||||||
|
},
|
||||||
|
|
||||||
|
Host: params.SRTHost,
|
||||||
|
Port: params.SRTPort,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
c.l.Errorw("failed to connect srt",
|
||||||
|
"error", err,
|
||||||
|
)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
c.l.Infow("Connected to SRT")
|
||||||
|
return conn, nil
|
||||||
|
}
|
@@ -25,7 +25,7 @@ func NewEIA608() EIA608Response {
|
|||||||
|
|
||||||
// Act parses and send eia608 data from mpeg-ts to metadata channel
|
// Act parses and send eia608 data from mpeg-ts to metadata channel
|
||||||
func (*eia608Middleware) Act(mpegTSDemuxData *astits.DemuxerData, sp *entities.StreamParameters) error {
|
func (*eia608Middleware) Act(mpegTSDemuxData *astits.DemuxerData, sp *entities.StreamParameters) error {
|
||||||
vs := sp.StreamInfo.VideoStreams()
|
vs := sp.ServerStreamInfo.VideoStreams()
|
||||||
eia608Reader := newEIA608Reader()
|
eia608Reader := newEIA608Reader()
|
||||||
|
|
||||||
for _, v := range vs {
|
for _, v := range vs {
|
||||||
|
@@ -4,7 +4,6 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
astisrt "github.com/asticode/go-astisrt/pkg"
|
|
||||||
"github.com/asticode/go-astits"
|
"github.com/asticode/go-astits"
|
||||||
"github.com/pion/webrtc/v3"
|
"github.com/pion/webrtc/v3"
|
||||||
)
|
)
|
||||||
@@ -117,10 +116,11 @@ type StreamParameters struct {
|
|||||||
WebRTCConn *webrtc.PeerConnection
|
WebRTCConn *webrtc.PeerConnection
|
||||||
Cancel context.CancelFunc
|
Cancel context.CancelFunc
|
||||||
Ctx context.Context
|
Ctx context.Context
|
||||||
SRTConnection *astisrt.Connection
|
RequestParams *RequestParams
|
||||||
VideoTrack *webrtc.TrackLocalStaticSample
|
VideoTrack *webrtc.TrackLocalStaticSample
|
||||||
MetadataTrack *webrtc.DataChannel
|
MetadataTrack *webrtc.DataChannel
|
||||||
StreamInfo *StreamInfo
|
ServerStreamInfo *StreamInfo
|
||||||
|
ClientStreamInfo *StreamInfo
|
||||||
StreamMiddlewares []StreamMiddleware
|
StreamMiddlewares []StreamMiddleware
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -12,3 +12,6 @@ var ErrMissingWebRTCSetup = errors.New("WebRTCController.SetupPeerConnection mus
|
|||||||
var ErrMissingRemoteOffer = errors.New("nil offer, in order to connect one must pass a valid offer")
|
var ErrMissingRemoteOffer = errors.New("nil offer, in order to connect one must pass a valid offer")
|
||||||
var ErrMissingRequestParams = errors.New("RequestParams must not be nil")
|
var ErrMissingRequestParams = errors.New("RequestParams must not be nil")
|
||||||
var ErrMissingProcess = errors.New("there is no process running")
|
var ErrMissingProcess = errors.New("there is no process running")
|
||||||
|
var ErrMissingProber = errors.New("there is no prober")
|
||||||
|
var ErrMissingStreamer = errors.New("there is no streamer")
|
||||||
|
var ErrMissingCompatibleStreams = errors.New("there is no compatible streams")
|
||||||
|
@@ -4,7 +4,9 @@ import (
|
|||||||
"log"
|
"log"
|
||||||
|
|
||||||
"github.com/flavioribeiro/donut/internal/controllers"
|
"github.com/flavioribeiro/donut/internal/controllers"
|
||||||
|
"github.com/flavioribeiro/donut/internal/controllers/engine"
|
||||||
"github.com/flavioribeiro/donut/internal/controllers/probers"
|
"github.com/flavioribeiro/donut/internal/controllers/probers"
|
||||||
|
"github.com/flavioribeiro/donut/internal/controllers/streamers"
|
||||||
"github.com/flavioribeiro/donut/internal/controllers/streammiddlewares"
|
"github.com/flavioribeiro/donut/internal/controllers/streammiddlewares"
|
||||||
"github.com/flavioribeiro/donut/internal/entities"
|
"github.com/flavioribeiro/donut/internal/entities"
|
||||||
"github.com/flavioribeiro/donut/internal/mapper"
|
"github.com/flavioribeiro/donut/internal/mapper"
|
||||||
@@ -38,15 +40,15 @@ func Dependencies(enableICEMux bool) fx.Option {
|
|||||||
fx.Provide(controllers.NewUDPICEServer),
|
fx.Provide(controllers.NewUDPICEServer),
|
||||||
|
|
||||||
// Controllers
|
// Controllers
|
||||||
fx.Provide(controllers.NewSRTController),
|
|
||||||
fx.Provide(controllers.NewStreamingController),
|
|
||||||
|
|
||||||
fx.Provide(controllers.NewWebRTCController),
|
fx.Provide(controllers.NewWebRTCController),
|
||||||
fx.Provide(controllers.NewWebRTCSettingsEngine),
|
fx.Provide(controllers.NewWebRTCSettingsEngine),
|
||||||
fx.Provide(controllers.NewWebRTCMediaEngine),
|
fx.Provide(controllers.NewWebRTCMediaEngine),
|
||||||
fx.Provide(controllers.NewWebRTCAPI),
|
fx.Provide(controllers.NewWebRTCAPI),
|
||||||
|
fx.Provide(streamers.NewSRTMpegTSStreamer),
|
||||||
fx.Provide(probers.NewSrtMpegTs),
|
fx.Provide(probers.NewSrtMpegTs),
|
||||||
|
|
||||||
|
fx.Provide(engine.NewDonutEngineController),
|
||||||
|
|
||||||
// Stream middlewares
|
// Stream middlewares
|
||||||
fx.Provide(streammiddlewares.NewStreamInfo),
|
fx.Provide(streammiddlewares.NewStreamInfo),
|
||||||
fx.Provide(streammiddlewares.NewEIA608),
|
fx.Provide(streammiddlewares.NewEIA608),
|
||||||
|
@@ -6,52 +6,59 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
"github.com/flavioribeiro/donut/internal/controllers"
|
"github.com/flavioribeiro/donut/internal/controllers"
|
||||||
"github.com/flavioribeiro/donut/internal/controllers/probers"
|
"github.com/flavioribeiro/donut/internal/controllers/engine"
|
||||||
"github.com/flavioribeiro/donut/internal/entities"
|
"github.com/flavioribeiro/donut/internal/entities"
|
||||||
"github.com/flavioribeiro/donut/internal/mapper"
|
"github.com/flavioribeiro/donut/internal/mapper"
|
||||||
|
"github.com/pion/webrtc/v3"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
type SignalingHandler struct {
|
type SignalingHandler struct {
|
||||||
c *entities.Config
|
c *entities.Config
|
||||||
l *zap.SugaredLogger
|
l *zap.SugaredLogger
|
||||||
webRTCController *controllers.WebRTCController
|
webRTCController *controllers.WebRTCController
|
||||||
srtController *controllers.SRTController
|
mapper *mapper.Mapper
|
||||||
streamingController *controllers.StreamingController
|
donut *engine.DonutEngineController
|
||||||
srtMpegTSprober *probers.SrtMpegTs
|
|
||||||
mapper *mapper.Mapper
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSignalingHandler(
|
func NewSignalingHandler(
|
||||||
c *entities.Config,
|
c *entities.Config,
|
||||||
log *zap.SugaredLogger,
|
log *zap.SugaredLogger,
|
||||||
webRTCController *controllers.WebRTCController,
|
webRTCController *controllers.WebRTCController,
|
||||||
srtController *controllers.SRTController,
|
|
||||||
streamingController *controllers.StreamingController,
|
|
||||||
srtMpegTSprober *probers.SrtMpegTs,
|
|
||||||
mapper *mapper.Mapper,
|
mapper *mapper.Mapper,
|
||||||
|
donut *engine.DonutEngineController,
|
||||||
) *SignalingHandler {
|
) *SignalingHandler {
|
||||||
return &SignalingHandler{
|
return &SignalingHandler{
|
||||||
c: c,
|
c: c,
|
||||||
l: log,
|
l: log,
|
||||||
webRTCController: webRTCController,
|
webRTCController: webRTCController,
|
||||||
srtController: srtController,
|
mapper: mapper,
|
||||||
streamingController: streamingController,
|
donut: donut,
|
||||||
srtMpegTSprober: srtMpegTSprober,
|
|
||||||
mapper: mapper,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *SignalingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) error {
|
func (h *SignalingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) error {
|
||||||
if r.Method != http.MethodPost {
|
params, err := h.createAndValidateParams(w, r)
|
||||||
return entities.ErrHTTPPostOnly
|
if err != nil {
|
||||||
}
|
|
||||||
|
|
||||||
params := entities.RequestParams{}
|
|
||||||
if err := json.NewDecoder(r.Body).Decode(¶ms); err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := params.Valid(); err != nil {
|
|
||||||
|
// It decides which prober and streamer should be used based on the parameters (server-side protocol).
|
||||||
|
donutEngine, err := h.donut.EngineFor(¶ms)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// real stream info from server
|
||||||
|
serverStreamInfo, err := donutEngine.Prober().StreamInfo(¶ms)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// client stream info support from the client (browser)
|
||||||
|
// TODO: evaluate to move this code either inside webrtc or to a prober
|
||||||
|
clientStreamInfo, err := h.mapper.FromWebRTCSessionDescriptionToStreamInfo(params.Offer)
|
||||||
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -62,47 +69,32 @@ func (h *SignalingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) err
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// real stream info from server
|
// TODO: introduce a mode to deal with transcoding recipes
|
||||||
serverStreamInfo, err := h.srtMpegTSprober.StreamInfo(¶ms)
|
// selects prober media that client and server has adverted.
|
||||||
if err != nil {
|
compatibleStreams := donutEngine.CompatibleStreamsFor(serverStreamInfo, clientStreamInfo)
|
||||||
return err
|
if compatibleStreams == nil || len(compatibleStreams) == 0 {
|
||||||
}
|
return entities.ErrMissingCompatibleStreams
|
||||||
// client stream info support from the client (browser)
|
|
||||||
// clientStreamInfo, err := h.mapper.FromWebRTCSessionDescriptionToStreamInfo(params.Offer)
|
|
||||||
// if err != nil {
|
|
||||||
// h.l.Errorw("error while fetching server stream info",
|
|
||||||
// "error", err,
|
|
||||||
// )
|
|
||||||
// return err
|
|
||||||
// }
|
|
||||||
// TODO: create tracks according with SRT available streams
|
|
||||||
// for st := range serverStreamInfo.Streams {
|
|
||||||
// }
|
|
||||||
|
|
||||||
// Create a video track
|
|
||||||
videoTrack, err := h.webRTCController.CreateTrack(
|
|
||||||
peer,
|
|
||||||
entities.Stream{
|
|
||||||
Codec: entities.H264,
|
|
||||||
}, "video", params.SRTStreamID,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create a audio track
|
var videoTrack *webrtc.TrackLocalStaticSample
|
||||||
// audioTrack, err := h.webRTCController.CreateTrack(
|
// var audioTrack *webrtc.TrackLocalStaticSample
|
||||||
// peer,
|
|
||||||
// entities.Stream{
|
for _, st := range compatibleStreams {
|
||||||
// Codec: entities.AAC,
|
// TODO: make the mapping less dependent on type
|
||||||
// }, "audio", params.SRTStreamID,
|
if st.Type == entities.VideoType {
|
||||||
// )
|
videoTrack, err = h.webRTCController.CreateTrack(
|
||||||
// if err != nil {
|
peer,
|
||||||
// h.l.Errorw("error while creating a web rtc track",
|
st,
|
||||||
// "error", err,
|
string(st.Type), // "video" or "audio"
|
||||||
// )
|
params.SRTStreamID,
|
||||||
// return err
|
)
|
||||||
// }
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
// if st.Type == entities.AudioType {
|
||||||
|
}
|
||||||
|
|
||||||
metadataSender, err := h.webRTCController.CreateDataChannel(peer, entities.MetadataChannelID)
|
metadataSender, err := h.webRTCController.CreateDataChannel(peer, entities.MetadataChannelID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -118,19 +110,15 @@ func (h *SignalingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) err
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
srtConnection, err := h.srtController.Connect(cancel, ¶ms)
|
go donutEngine.Streamer().Stream(&entities.StreamParameters{
|
||||||
if err != nil {
|
Cancel: cancel,
|
||||||
return err
|
Ctx: ctx,
|
||||||
}
|
WebRTCConn: peer,
|
||||||
|
RequestParams: ¶ms,
|
||||||
go h.streamingController.Stream(&entities.StreamParameters{
|
VideoTrack: videoTrack,
|
||||||
Cancel: cancel,
|
MetadataTrack: metadataSender,
|
||||||
Ctx: ctx,
|
ServerStreamInfo: serverStreamInfo,
|
||||||
WebRTCConn: peer,
|
ClientStreamInfo: clientStreamInfo,
|
||||||
SRTConnection: srtConnection,
|
|
||||||
VideoTrack: videoTrack,
|
|
||||||
MetadataTrack: metadataSender,
|
|
||||||
StreamInfo: serverStreamInfo,
|
|
||||||
})
|
})
|
||||||
|
|
||||||
w.Header().Set("Content-Type", "application/json")
|
w.Header().Set("Content-Type", "application/json")
|
||||||
@@ -143,3 +131,19 @@ func (h *SignalingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) err
|
|||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (h *SignalingHandler) createAndValidateParams(w http.ResponseWriter, r *http.Request) (entities.RequestParams, error) {
|
||||||
|
if r.Method != http.MethodPost {
|
||||||
|
return entities.RequestParams{}, entities.ErrHTTPPostOnly
|
||||||
|
}
|
||||||
|
|
||||||
|
params := entities.RequestParams{}
|
||||||
|
if err := json.NewDecoder(r.Body).Decode(¶ms); err != nil {
|
||||||
|
return entities.RequestParams{}, err
|
||||||
|
}
|
||||||
|
if err := params.Valid(); err != nil {
|
||||||
|
return entities.RequestParams{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return params, nil
|
||||||
|
}
|
||||||
|
Reference in New Issue
Block a user