diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..9e7f782 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,19 @@ +# The .dockerignore file excludes files from the container build process. +# +# https://docs.docker.com/engine/reference/builder/#dockerignore-file + +# Exclude locally vendored dependencies. +vendor/ + +# Exclude "build-time" ignore files. +.dockerignore +.gcloudignore +.github +.vscode +.git + +# Exclude git history and configuration. +.gitignore +tags +probers.test +coverage.out \ No newline at end of file diff --git a/doc/DEV_LOG.md b/doc/DEV_LOG.md index 74e0ae7..e4a5dec 100644 --- a/doc/DEV_LOG.md +++ b/doc/DEV_LOG.md @@ -27,6 +27,8 @@ go donutEngine.Stream( ) ``` +ref https://wiki.xiph.org/Opus_Recommended_Settings 48000 webrtc + ## Date: 2/4/24 ### Summary: Adding audio track diff --git a/internal/controllers/streamers/interface.go b/internal/controllers/streamers/interface.go index 7d4dd39..4212f4c 100644 --- a/internal/controllers/streamers/interface.go +++ b/internal/controllers/streamers/interface.go @@ -3,6 +3,6 @@ package streamers import "github.com/flavioribeiro/donut/internal/entities" type DonutStreamer interface { - Stream(sp *entities.StreamParameters) + Stream(p *entities.DonutParameters) Match(req *entities.RequestParams) bool } diff --git a/internal/controllers/streamers/libav_ffmpeg.go b/internal/controllers/streamers/libav_ffmpeg.go index 8dfad1f..e66857f 100644 --- a/internal/controllers/streamers/libav_ffmpeg.go +++ b/internal/controllers/streamers/libav_ffmpeg.go @@ -10,7 +10,6 @@ import ( "github.com/asticode/go-astiav" "github.com/asticode/go-astikit" "github.com/flavioribeiro/donut/internal/entities" - "github.com/pion/webrtc/v3/pkg/media" "go.uber.org/fx" "go.uber.org/zap" ) @@ -19,15 +18,14 @@ type LibAVFFmpegStreamer struct { c *entities.Config l *zap.SugaredLogger - middlewares []entities.StreamMiddleware + lastAudioFrameDTS float64 + currentAudioFrameSize float64 } type LibAVFFmpegStreamerParams struct { fx.In C *entities.Config L *zap.SugaredLogger - - Middlewares []entities.StreamMiddleware `group:"middlewares"` } type ResultLibAVFFmpegStreamer struct { @@ -38,18 +36,14 @@ type ResultLibAVFFmpegStreamer struct { func NewLibAVFFmpegStreamer(p LibAVFFmpegStreamerParams) ResultLibAVFFmpegStreamer { return ResultLibAVFFmpegStreamer{ LibAVFFmpegStreamer: &LibAVFFmpegStreamer{ - c: p.C, - l: p.L, - middlewares: p.Middlewares, + c: p.C, + l: p.L, }, } } func (c *LibAVFFmpegStreamer) Match(req *entities.RequestParams) bool { - if req.SRTHost != "" { - return true - } - return false + return req.SRTHost != "" } type streamContext struct { @@ -64,20 +58,18 @@ type params struct { streams map[int]*streamContext } -func (c *LibAVFFmpegStreamer) Stream(sp *entities.StreamParameters) { +func (c *LibAVFFmpegStreamer) Stream(donut *entities.DonutParameters) { c.l.Infow("streaming has started") closer := astikit.NewCloser() defer closer.Close() - defer sp.WebRTCConn.Close() - defer sp.Cancel() p := ¶ms{ streams: make(map[int]*streamContext), } - if err := c.prepareInput(p, closer, sp); err != nil { - c.l.Errorf("ffmpeg/libav: failed at prepareInput %s", err.Error()) + if err := c.prepareInput(p, closer, donut); err != nil { + c.onError(err, donut) return } @@ -86,14 +78,12 @@ func (c *LibAVFFmpegStreamer) Stream(sp *entities.StreamParameters) { for { select { - case <-sp.Ctx.Done(): - if errors.Is(sp.Ctx.Err(), context.Canceled) { + case <-donut.Ctx.Done(): + if errors.Is(donut.Ctx.Err(), context.Canceled) { c.l.Infow("streaming has stopped due cancellation") return } - c.l.Errorw("streaming has stopped due errors", - "error", sp.Ctx.Err(), - ) + c.onError(donut.Ctx.Err(), donut) return default: @@ -101,7 +91,7 @@ func (c *LibAVFFmpegStreamer) Stream(sp *entities.StreamParameters) { if errors.Is(err, astiav.ErrEof) { break } - c.l.Fatalf("ffmpeg/libav: reading frame failed %s", err.Error()) + c.onError(err, donut) } s, ok := p.streams[pkt.StreamIndex()] @@ -110,33 +100,43 @@ func (c *LibAVFFmpegStreamer) Stream(sp *entities.StreamParameters) { } pkt.RescaleTs(s.inputStream.TimeBase(), s.decCodecContext.TimeBase()) + audioDuration := c.defineAudioDuration(s, pkt) + videoDuration := c.defineVideoDuration(s, pkt) + if s.inputStream.CodecParameters().MediaType() == astiav.MediaTypeVideo { - if err := sp.VideoTrack.WriteSample(media.Sample{Data: pkt.Data(), Duration: time.Second / 30}); err != nil { - c.l.Errorw("ffmpeg/libav: failed to write video to web rtc", - "error", err, - ) - return + if donut.OnVideoFrame != nil { + if err := donut.OnVideoFrame(pkt.Data(), entities.MediaFrameContext{ + PTS: int(pkt.Pts()), + DTS: int(pkt.Dts()), + Duration: videoDuration, + }); err != nil { + c.onError(err, donut) + return + } } } - // if err := s.decCodecContext.SendPacket(pkt); err != nil { - // c.l.Fatalf("ffmpeg/libav: sending packet failed %s", err.Error()) - // } - - // for { - // if err := s.decCodecContext.ReceiveFrame(s.decFrame); err != nil { - // if errors.Is(err, astiav.ErrEof) || errors.Is(err, astiav.ErrEagain) { - // break - // } - // c.l.Fatalf("ffmpeg/libav: receiving frame failed %s", err.Error()) - // } - // } - + if s.inputStream.CodecParameters().MediaType() == astiav.MediaTypeAudio { + if donut.OnAudioFrame != nil { + donut.OnAudioFrame(pkt.Data(), entities.MediaFrameContext{ + PTS: int(pkt.Pts()), + DTS: int(pkt.Dts()), + Duration: audioDuration, + }) + } + } } } } -func (c *LibAVFFmpegStreamer) prepareInput(p *params, closer *astikit.Closer, sp *entities.StreamParameters) error { +func (c *LibAVFFmpegStreamer) onError(err error, p *entities.DonutParameters) { + if p.OnError != nil { + p.OnError(err) + } +} + +func (c *LibAVFFmpegStreamer) prepareInput(p *params, closer *astikit.Closer, donut *entities.DonutParameters) error { + // good for debugging astiav.SetLogLevel(astiav.LogLevelDebug) astiav.SetLogCallback(func(l astiav.LogLevel, fmt, msg, parent string) { c.l.Infof("ffmpeg log: %s (level: %d)", strings.TrimSpace(msg), l) @@ -147,27 +147,15 @@ func (c *LibAVFFmpegStreamer) prepareInput(p *params, closer *astikit.Closer, sp } closer.Add(p.inputFormatContext.Free) - // TODO: add an UI element for sub-type (format) when input is srt:// (defaulting to mpeg-ts) - // We're assuming that SRT is carrying mpegts. - userProvidedInputFormat := "mpegts" - - inputFormat := astiav.FindInputFormat(userProvidedInputFormat) - if inputFormat == nil { - return errors.New(fmt.Sprintf("ffmpeg/libav: could not find %s", userProvidedInputFormat)) + inputFormat, err := c.defineInputFormat(donut.StreamFormat) + if err != nil { + return err } - - d := &astiav.Dictionary{} - // ref https://ffmpeg.org/ffmpeg-all.html#srt - // flags (the zeroed 3rd value) https://github.com/FFmpeg/FFmpeg/blob/n5.0/libavutil/dict.h#L67C9-L77 - d.Set("srt_streamid", sp.RequestParams.SRTStreamID, 0) - d.Set("smoother", "live", 0) - d.Set("transtype", "live", 0) - - inputURL := fmt.Sprintf("srt://%s:%d", sp.RequestParams.SRTHost, sp.RequestParams.SRTPort) - - if err := p.inputFormatContext.OpenInput(inputURL, inputFormat, d); err != nil { + inputOptions := c.defineInputOptions(donut, closer) + if err := p.inputFormatContext.OpenInput(donut.StreamURL, inputFormat, inputOptions); err != nil { return errors.New(fmt.Sprintf("ffmpeg/libav: opening input failed %s", err.Error())) } + closer.Add(p.inputFormatContext.CloseInput) if err := p.inputFormatContext.FindStreamInfo(nil); err != nil { @@ -211,3 +199,64 @@ func (c *LibAVFFmpegStreamer) prepareInput(p *params, closer *astikit.Closer, sp } return nil } + +func (c *LibAVFFmpegStreamer) defineInputFormat(streamFormat string) (*astiav.InputFormat, error) { + if streamFormat != "" { + inputFormat := astiav.FindInputFormat(streamFormat) + if inputFormat == nil { + return nil, errors.New(fmt.Sprintf("ffmpeg/libav: could not find %s input format", streamFormat)) + } + } + return nil, nil +} + +func (c *LibAVFFmpegStreamer) defineInputOptions(p *entities.DonutParameters, closer *astikit.Closer) *astiav.Dictionary { + if strings.Contains(strings.ToLower(p.StreamURL), "srt:") { + d := &astiav.Dictionary{} + closer.Add(d.Free) + + // ref https://ffmpeg.org/ffmpeg-all.html#srt + // flags (the zeroed 3rd value) https://github.com/FFmpeg/FFmpeg/blob/n5.0/libavutil/dict.h#L67C9-L77 + d.Set("srt_streamid", p.StreamID, 0) + d.Set("smoother", "live", 0) + d.Set("transtype", "live", 0) + return d + } + return nil +} + +func (c *LibAVFFmpegStreamer) defineAudioDuration(s *streamContext, pkt *astiav.Packet) time.Duration { + audioDuration := time.Duration(0) + if s.inputStream.CodecParameters().MediaType() == astiav.MediaTypeAudio { + // Audio + // + // dur = 0,023219954648526078 + // sample = 44100 + // frameSize = 1024 (or 960 for aac, but it could be variable for opus) + // 1s = dur * (sample/frameSize) + // ref https://developer.apple.com/documentation/coreaudiotypes/audiostreambasicdescription/1423257-mframesperpacket + + // TODO: handle wraparound + c.currentAudioFrameSize = float64(pkt.Dts()) - c.lastAudioFrameDTS + c.lastAudioFrameDTS = float64(pkt.Dts()) + sampleRate := float64(s.inputStream.CodecParameters().SampleRate()) + audioDuration = time.Duration((c.currentAudioFrameSize / sampleRate) * float64(time.Second)) + } + return audioDuration +} + +func (c *LibAVFFmpegStreamer) defineVideoDuration(s *streamContext, pkt *astiav.Packet) time.Duration { + videoDuration := time.Duration(0) + if s.inputStream.CodecParameters().MediaType() == astiav.MediaTypeVideo { + // Video + // + // dur = 0,033333 + // sample = 30 + // frameSize = 1 + // 1s = dur * (sample/frameSize) + + // we're assuming fixed video frame rate + videoDuration = time.Duration((float64(1) / float64(s.inputStream.AvgFrameRate().Num())) * float64(time.Second)) + } + return videoDuration +} diff --git a/internal/controllers/streamers/srt_mpegts.go b/internal/controllers/streamers/srt_mpegts.go deleted file mode 100644 index cf4da7d..0000000 --- a/internal/controllers/streamers/srt_mpegts.go +++ /dev/null @@ -1,187 +0,0 @@ -package streamers - -import ( - "context" - "errors" - "io" - "time" - - astisrt "github.com/asticode/go-astisrt/pkg" - "github.com/asticode/go-astits" - "github.com/flavioribeiro/donut/internal/entities" - "github.com/pion/webrtc/v3/pkg/media" - "go.uber.org/fx" - "go.uber.org/zap" -) - -type SRTMpegTSStreamer struct { - c *entities.Config - l *zap.SugaredLogger - - middlewares []entities.StreamMiddleware -} - -type SRTMpegTSStreamerParams struct { - fx.In - C *entities.Config - L *zap.SugaredLogger - - Middlewares []entities.StreamMiddleware `group:"middlewares"` -} - -type ResultSRTMpegTSStreamer struct { - fx.Out - SRTMpegTSStreamer DonutStreamer `group:"streamers"` -} - -func NewSRTMpegTSStreamer(p SRTMpegTSStreamerParams) ResultSRTMpegTSStreamer { - return ResultSRTMpegTSStreamer{ - SRTMpegTSStreamer: &SRTMpegTSStreamer{ - c: p.C, - l: p.L, - middlewares: p.Middlewares, - }, - } -} - -func (c *SRTMpegTSStreamer) Match(req *entities.RequestParams) bool { - if req.SRTHost != "" { - return false - } - return false -} - -func (c *SRTMpegTSStreamer) Stream(sp *entities.StreamParameters) { - srtConnection, err := c.connect(sp.Cancel, sp.RequestParams) - if err != nil { - c.l.Errorw("streaming has stopped due errors", - "error", sp.Ctx.Err(), - ) - return - } - r, w := io.Pipe() - - defer r.Close() - defer w.Close() - defer srtConnection.Close() - defer sp.WebRTCConn.Close() - defer sp.Cancel() - - go c.readFromSRTIntoWriterPipe(srtConnection, w) - - // reading from reader pipe to the mpeg-ts demuxer - mpegTSDemuxer := astits.NewDemuxer(sp.Ctx, r) - - c.l.Infow("streaming has started") - - for { - select { - case <-sp.Ctx.Done(): - if errors.Is(sp.Ctx.Err(), context.Canceled) { - c.l.Infow("streaming has stopped due cancellation") - return - } - c.l.Errorw("streaming has stopped due errors", - "error", sp.Ctx.Err(), - ) - return - default: - // fetching mpeg-ts data - // ref https://tsduck.io/download/docs/mpegts-introduction.pdf - mpegTSDemuxData, err := mpegTSDemuxer.NextData() - if err != nil { - c.l.Errorw("failed to demux mpeg-ts", - "error", err, - ) - return - } - - // writing mpeg-ts video to webrtc channels - for _, v := range sp.ServerStreamInfo.VideoStreams() { - if v.Id != mpegTSDemuxData.PID { - continue - } - - if err := sp.VideoTrack.WriteSample(media.Sample{Data: mpegTSDemuxData.PES.Data, Duration: time.Second / 30}); err != nil { - c.l.Errorw("failed to write an mpeg-ts to web rtc", - "error", err, - ) - return - } - } - - // calling all registered middlewares - for _, m := range c.middlewares { - err = m.Act(mpegTSDemuxData, sp) - if err != nil { - c.l.Errorw("middleware error", - "error", err, - ) - } - } - } - } -} - -func (c *SRTMpegTSStreamer) readFromSRTIntoWriterPipe(srtConnection *astisrt.Connection, w *io.PipeWriter) { - defer srtConnection.Close() - - inboundMpegTsPacket := make([]byte, c.c.SRTReadBufferSizeBytes) - - for { - n, err := srtConnection.Read(inboundMpegTsPacket) - if err != nil { - c.l.Errorw("str conn failed to write data to buffer", - "error", err, - ) - break - } - - if _, err := w.Write(inboundMpegTsPacket[:n]); err != nil { - c.l.Errorw("failed to write mpeg-ts into the pipe", - "error", err, - ) - break - } - } -} - -// TODO: move to its own component later dup streamer.srt_mpegts, prober.srt_mpegts -func (c *SRTMpegTSStreamer) connect(cancel context.CancelFunc, params *entities.RequestParams) (*astisrt.Connection, error) { - c.l.Info("trying to connect srt") - - if err := params.Valid(); err != nil { - return nil, err - } - - c.l.Infow("Connecting to SRT ", - "offer", params.String(), - ) - - conn, err := astisrt.Dial(astisrt.DialOptions{ - ConnectionOptions: []astisrt.ConnectionOption{ - astisrt.WithLatency(c.c.SRTConnectionLatencyMS), - astisrt.WithStreamid(params.SRTStreamID), - astisrt.WithCongestion("live"), - astisrt.WithTranstype(astisrt.Transtype(astisrt.TranstypeLive)), - }, - - OnDisconnect: func(conn *astisrt.Connection, err error) { - c.l.Infow("Canceling SRT", - "error", err, - ) - cancel() - }, - - Host: params.SRTHost, - Port: params.SRTPort, - }) - if err != nil { - c.l.Errorw("failed to connect srt", - "error", err, - ) - return nil, err - } - c.l.Infow("Connected to SRT") - return conn, nil -} diff --git a/internal/controllers/streammiddlewares/mpegts.go b/internal/controllers/streammiddlewares/mpegts.go index 5c1fa00..90d368d 100644 --- a/internal/controllers/streammiddlewares/mpegts.go +++ b/internal/controllers/streammiddlewares/mpegts.go @@ -1,86 +1,87 @@ package streammiddlewares -import ( - "encoding/json" +// import ( +// "encoding/json" - "github.com/asticode/go-astits" - "github.com/flavioribeiro/donut/internal/entities" - "github.com/flavioribeiro/donut/internal/mapper" - "go.uber.org/fx" -) +// "github.com/asticode/go-astits" +// "github.com/flavioribeiro/donut/internal/entities" +// "github.com/flavioribeiro/donut/internal/mapper" +// "go.uber.org/fx" +// ) -type eia608Middleware struct{} +// type eia608Middleware struct{} -type EIA608Response struct { - fx.Out - EIA608Middleware entities.StreamMiddleware `group:"middlewares"` -} +// type EIA608Response struct { +// fx.Out +// EIA608Middleware entities.StreamMiddleware `group:"middlewares"` +// } -// NewEIA608 creates a new EIA608 middleware -func NewEIA608() EIA608Response { - return EIA608Response{ - EIA608Middleware: &eia608Middleware{}, - } -} +// // TODO: migrate to donutparameters.onEvent api +// // NewEIA608 creates a new EIA608 middleware +// func NewEIA608() EIA608Response { +// return EIA608Response{ +// EIA608Middleware: &eia608Middleware{}, +// } +// } -// Act parses and send eia608 data from mpeg-ts to metadata channel -func (*eia608Middleware) Act(mpegTSDemuxData *astits.DemuxerData, sp *entities.StreamParameters) error { - vs := sp.ServerStreamInfo.VideoStreams() - eia608Reader := newEIA608Reader() +// // Act parses and send eia608 data from mpeg-ts to metadata channel +// func (*eia608Middleware) Act(mpegTSDemuxData *astits.DemuxerData, sp *entities.StreamParameters) error { +// vs := sp.ServerStreamInfo.VideoStreams() +// eia608Reader := newEIA608Reader() - for _, v := range vs { - if mpegTSDemuxData.PES != nil && v.Codec == entities.H264 { - captions, err := eia608Reader.parse(mpegTSDemuxData.PES) - if err != nil { - return err - } +// for _, v := range vs { +// if mpegTSDemuxData.PES != nil && v.Codec == entities.H264 { +// captions, err := eia608Reader.parse(mpegTSDemuxData.PES) +// if err != nil { +// return err +// } - if captions != "" { - captionsMsg, err := eia608Reader.buildCaptionsMessage(mpegTSDemuxData.PES.Header.OptionalHeader.PTS, captions) - if err != nil { - return err - } - sp.MetadataTrack.SendText(captionsMsg) - } - } - } - return nil -} +// if captions != "" { +// captionsMsg, err := eia608Reader.buildCaptionsMessage(mpegTSDemuxData.PES.Header.OptionalHeader.PTS, captions) +// if err != nil { +// return err +// } +// sp.MetadataTrack.SendText(captionsMsg) +// } +// } +// } +// return nil +// } -type streamInfoMiddleware struct { - m *mapper.Mapper -} +// type streamInfoMiddleware struct { +// m *mapper.Mapper +// } -type StreamInfoResponse struct { - fx.Out - StreamInfoMiddleware entities.StreamMiddleware `group:"middlewares"` -} +// type StreamInfoResponse struct { +// fx.Out +// StreamInfoMiddleware entities.StreamMiddleware `group:"middlewares"` +// } -// NewStreamInfo creates a new StreamInfo middleware -func NewStreamInfo(m *mapper.Mapper) StreamInfoResponse { - return StreamInfoResponse{ - StreamInfoMiddleware: &streamInfoMiddleware{m: m}, - } -} +// // NewStreamInfo creates a new StreamInfo middleware +// func NewStreamInfo(m *mapper.Mapper) StreamInfoResponse { +// return StreamInfoResponse{ +// StreamInfoMiddleware: &streamInfoMiddleware{m: m}, +// } +// } -// Act parses and send StreamInfo data from mpeg-ts to metadata channel -func (s *streamInfoMiddleware) Act(mpegTSDemuxData *astits.DemuxerData, sp *entities.StreamParameters) error { - var streams []entities.Stream - // TODO: check if it makes sense to move this code to a mapper - if mpegTSDemuxData.PMT != nil { - for _, es := range mpegTSDemuxData.PMT.ElementaryStreams { - streams = append(streams, s.m.FromStreamTypeToEntityStream(es)) - } - } +// // Act parses and send StreamInfo data from mpeg-ts to metadata channel +// func (s *streamInfoMiddleware) Act(mpegTSDemuxData *astits.DemuxerData, sp *entities.StreamParameters) error { +// var streams []entities.Stream +// // TODO: check if it makes sense to move this code to a mapper +// if mpegTSDemuxData.PMT != nil { +// for _, es := range mpegTSDemuxData.PMT.ElementaryStreams { +// streams = append(streams, s.m.FromStreamTypeToEntityStream(es)) +// } +// } - msgs := s.m.FromStreamInfoToEntityMessages(&entities.StreamInfo{Streams: streams}) - for _, m := range msgs { - msg, err := json.Marshal(m) - if err != nil { - return err - } - sp.MetadataTrack.SendText(string(msg)) - } +// msgs := s.m.FromStreamInfoToEntityMessages(&entities.StreamInfo{Streams: streams}) +// for _, m := range msgs { +// msg, err := json.Marshal(m) +// if err != nil { +// return err +// } +// sp.MetadataTrack.SendText(string(msg)) +// } - return nil -} +// return nil +// } diff --git a/internal/controllers/webrtc_controller.go b/internal/controllers/webrtc_controller.go index 2fa1386..8c5c0ba 100644 --- a/internal/controllers/webrtc_controller.go +++ b/internal/controllers/webrtc_controller.go @@ -7,6 +7,7 @@ import ( "github.com/flavioribeiro/donut/internal/entities" "github.com/flavioribeiro/donut/internal/mapper" "github.com/pion/webrtc/v3" + "github.com/pion/webrtc/v3/pkg/media" "go.uber.org/zap" ) @@ -118,6 +119,13 @@ func (c *WebRTCController) GatheringWebRTC(peer *webrtc.PeerConnection) (*webrtc return peer.LocalDescription(), nil } +func (c *WebRTCController) SendVideoSample(videoTrack *webrtc.TrackLocalStaticSample, data []byte, mediaCtx entities.MediaFrameContext) error { + if err := videoTrack.WriteSample(media.Sample{Data: data, Duration: mediaCtx.Duration}); err != nil { + return err + } + return nil +} + func NewWebRTCSettingsEngine(c *entities.Config, tcpListener net.Listener, udpListener net.PacketConn) webrtc.SettingEngine { settingEngine := webrtc.SettingEngine{} diff --git a/internal/entities/entities.go b/internal/entities/entities.go index ec9e343..66aadce 100644 --- a/internal/entities/entities.go +++ b/internal/entities/entities.go @@ -3,8 +3,8 @@ package entities import ( "context" "fmt" + "time" - "github.com/asticode/go-astits" "github.com/pion/webrtc/v3" ) @@ -83,6 +83,15 @@ type Stream struct { Index uint16 } +type MediaFrameContext struct { + // DTS decoding timestamp + DTS int + // PTS presentation timestamp + PTS int + // Media frame duration + Duration time.Duration +} + type StreamInfo struct { Streams []Stream } @@ -113,22 +122,22 @@ type Cue struct { Text string } -type StreamParameters struct { - WebRTCConn *webrtc.PeerConnection - Cancel context.CancelFunc - Ctx context.Context - RequestParams *RequestParams - VideoTrack *webrtc.TrackLocalStaticSample - MetadataTrack *webrtc.DataChannel - ServerStreamInfo *StreamInfo - ClientStreamInfo *StreamInfo - StreamMiddlewares []StreamMiddleware -} +type DonutParameters struct { + Cancel context.CancelFunc + Ctx context.Context -// StreamMiddleware is a component to act while streaming. -// Most implementations are at /internal/controllers/streammiddlewares/ -type StreamMiddleware interface { - Act(mpegTSDemuxData *astits.DemuxerData, sp *StreamParameters) error + StreamID string // ie: live001, channel01 + StreamFormat string // ie: flv, mpegts + StreamURL string // ie: srt://host:9080, rtmp://host:4991 + + TranscodeVideoCodec Codec // ie: vp8 + TranscodeAudioCodec Codec // ie: opus + + OnClose func() + OnError func(err error) + OnStream func(st *Stream) + OnVideoFrame func(data []byte, c MediaFrameContext) error + OnAudioFrame func(data []byte, c MediaFrameContext) error } type Config struct { diff --git a/internal/web/dependencies.go b/internal/web/dependencies.go index f9096d2..3a4bf24 100644 --- a/internal/web/dependencies.go +++ b/internal/web/dependencies.go @@ -7,7 +7,6 @@ import ( "github.com/flavioribeiro/donut/internal/controllers/engine" "github.com/flavioribeiro/donut/internal/controllers/probers" "github.com/flavioribeiro/donut/internal/controllers/streamers" - "github.com/flavioribeiro/donut/internal/controllers/streammiddlewares" "github.com/flavioribeiro/donut/internal/entities" "github.com/flavioribeiro/donut/internal/mapper" "github.com/flavioribeiro/donut/internal/web/handlers" @@ -44,15 +43,14 @@ func Dependencies(enableICEMux bool) fx.Option { fx.Provide(controllers.NewWebRTCSettingsEngine), fx.Provide(controllers.NewWebRTCMediaEngine), fx.Provide(controllers.NewWebRTCAPI), - fx.Provide(streamers.NewSRTMpegTSStreamer), fx.Provide(streamers.NewLibAVFFmpegStreamer), fx.Provide(probers.NewLibAVFFmpeg), fx.Provide(engine.NewDonutEngineController), - // Stream middlewares - fx.Provide(streammiddlewares.NewStreamInfo), - fx.Provide(streammiddlewares.NewEIA608), + // // Stream middlewares + // fx.Provide(streammiddlewares.NewStreamInfo), + // fx.Provide(streammiddlewares.NewEIA608), // Mappers fx.Provide(mapper.NewMapper), diff --git a/internal/web/handlers/signaling.go b/internal/web/handlers/signaling.go index 9d7d86e..1408fce 100644 --- a/internal/web/handlers/signaling.go +++ b/internal/web/handlers/signaling.go @@ -3,6 +3,7 @@ package handlers import ( "context" "encoding/json" + "fmt" "net/http" "github.com/flavioribeiro/donut/internal/controllers" @@ -80,7 +81,7 @@ func (h *SignalingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) err h.l.Info("we must transcode") } - if compatibleStreams == nil || len(compatibleStreams) == 0 { + if len(compatibleStreams) == 0 { return entities.ErrMissingCompatibleStreams } @@ -118,15 +119,33 @@ func (h *SignalingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) err return err } - go donutEngine.Streamer().Stream(&entities.StreamParameters{ - Cancel: cancel, - Ctx: ctx, - WebRTCConn: peer, - RequestParams: ¶ms, - VideoTrack: videoTrack, - MetadataTrack: metadataSender, - ServerStreamInfo: serverStreamInfo, - ClientStreamInfo: clientStreamInfo, + go donutEngine.Streamer().Stream(&entities.DonutParameters{ + Cancel: cancel, + Ctx: ctx, + + // TODO: add an UI element for the sub-type (format) when input is srt:// + // We're assuming that SRT is carrying mpegts. + StreamFormat: "mpegts", + StreamID: params.SRTStreamID, + StreamURL: fmt.Sprintf("srt://%s:%d", params.SRTHost, params.SRTPort), + + OnClose: func() { + cancel() + peer.Close() + }, + OnError: func(err error) { + h.l.Errorw("error while streaming", "error", err) + }, + OnStream: func(st *entities.Stream) { + h.sendStreamInfoToMetadata(st, metadataSender) + }, + OnVideoFrame: func(data []byte, c entities.MediaFrameContext) error { + return h.webRTCController.SendVideoSample(videoTrack, data, c) + }, + OnAudioFrame: func(data []byte, c entities.MediaFrameContext) error { + // TODO: implement + return nil + }, }) w.Header().Set("Content-Type", "application/json") @@ -140,6 +159,16 @@ func (h *SignalingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) err return nil } +func (h *SignalingHandler) sendStreamInfoToMetadata(st *entities.Stream, md *webrtc.DataChannel) { + msg := h.mapper.FromStreamToEntityMessage(*st) + msgBytes, err := json.Marshal(msg) + if err != nil { + h.l.Errorw("error marshalling stream message", "error", err) + return + } + md.SendText(string(msgBytes)) +} + func (h *SignalingHandler) createAndValidateParams(w http.ResponseWriter, r *http.Request) (entities.RequestParams, error) { if r.Method != http.MethodPost { return entities.RequestParams{}, entities.ErrHTTPPostOnly