diff --git a/doc/DEV_LOG.md b/doc/DEV_LOG.md index e4a5dec..b652eef 100644 --- a/doc/DEV_LOG.md +++ b/doc/DEV_LOG.md @@ -28,6 +28,7 @@ go donutEngine.Stream( ``` ref https://wiki.xiph.org/Opus_Recommended_Settings 48000 webrtc +ref https://ffmpeg.org/ffmpeg-codecs.html#libopus-1 opus ## Date: 2/4/24 ### Summary: Adding audio track diff --git a/internal/controllers/engine/donut_engine_controller.go b/internal/controllers/engine/donut_engine_controller.go index 5a8cbb0..5af834f 100644 --- a/internal/controllers/engine/donut_engine_controller.go +++ b/internal/controllers/engine/donut_engine_controller.go @@ -12,7 +12,7 @@ import ( type DonutEngine interface { Prober() probers.DonutProber Streamer() streamers.DonutStreamer - CompatibleStreamsFor(server, client *entities.StreamInfo) ([]entities.Stream, bool) + RecipeFor(req *entities.RequestParams, server, client *entities.StreamInfo) *entities.DonutRecipe } type DonutEngineParams struct { @@ -79,7 +79,35 @@ func (d *donutEngine) Streamer() streamers.DonutStreamer { return d.streamer } -func (d *donutEngine) CompatibleStreamsFor(server, client *entities.StreamInfo) ([]entities.Stream, bool) { +func (d *donutEngine) RecipeFor(req *entities.RequestParams, server, client *entities.StreamInfo) *entities.DonutRecipe { // TODO: implement proper matching - return server.Streams, true + r := &entities.DonutRecipe{ + Input: entities.DonutInput{ + Format: "mpegts", // it'll change based on input, i.e. rmtp flv + Options: map[entities.DonutInputOptionKey]string{ + entities.DonutSRTStreamID: req.SRTStreamID, + entities.DonutSRTTranstype: "live", + entities.DonutSRTsmoother: "live", + }, + }, + Video: entities.DonutMediaTask{ + Action: entities.DonutBypass, + Codec: entities.H264, + }, + Audio: entities.DonutMediaTask{ + Action: entities.DonutTranscode, + Codec: entities.Opus, + // TODO: create method list options per Codec + CodecContextOptions: []entities.LibAVOptionsCodecContext{ + // opus specifically works under 48000 Hz + entities.SetSampleRate(48000), + // once we changed the sample rate we need to update the time base + entities.SetTimeBase(1, 48000), + // for some reason it's setting "s16" + // entities.SetSampleFormat("fltp"), + }, + }, + } + + return r } diff --git a/internal/controllers/streamers/libav_ffmpeg.go b/internal/controllers/streamers/libav_ffmpeg.go index bef7dc7..3832b38 100644 --- a/internal/controllers/streamers/libav_ffmpeg.go +++ b/internal/controllers/streamers/libav_ffmpeg.go @@ -4,6 +4,9 @@ import ( "context" "errors" "fmt" + "reflect" + "runtime" + "strconv" "strings" "time" @@ -51,13 +54,25 @@ func (c *LibAVFFmpegStreamer) Match(req *entities.RequestParams) bool { } type streamContext struct { + // IN inputStream *astiav.Stream decCodec *astiav.Codec decCodecContext *astiav.CodecContext decFrame *astiav.Frame + + // FILTER + filterGraph *astiav.FilterGraph + buffersinkContext *astiav.FilterContext + buffersrcContext *astiav.FilterContext + filterFrame *astiav.Frame + + // OUT + encCodec *astiav.Codec + encCodecContext *astiav.CodecContext + encPkt *astiav.Packet } -type params struct { +type libAVParams struct { inputFormatContext *astiav.FormatContext streams map[int]*streamContext } @@ -68,65 +83,109 @@ func (c *LibAVFFmpegStreamer) Stream(donut *entities.DonutParameters) { closer := astikit.NewCloser() defer closer.Close() - p := ¶ms{ + p := &libAVParams{ streams: make(map[int]*streamContext), } + // it's useful for debugging + astiav.SetLogLevel(astiav.LogLevelDebug) + astiav.SetLogCallback(func(l astiav.LogLevel, fmt, msg, parent string) { + c.l.Infof("ffmpeg %s: - %s", c.libAVLogToString(l), strings.TrimSpace(msg)) + }) + if err := c.prepareInput(p, closer, donut); err != nil { c.onError(err, donut) return } - pkt := astiav.AllocPacket() - closer.Add(pkt.Free) + if err := c.prepareOutput(p, closer, donut); err != nil { + c.onError(err, donut) + return + } + + if err := c.prepareFilters(p, closer, donut); err != nil { + c.onError(err, donut) + return + } + + inPkt := astiav.AllocPacket() + closer.Add(inPkt.Free) for { select { case <-donut.Ctx.Done(): if errors.Is(donut.Ctx.Err(), context.Canceled) { - c.l.Infow("streaming has stopped due cancellation") + c.l.Info("streaming has stopped due cancellation") return } c.onError(donut.Ctx.Err(), donut) return default: - - if err := p.inputFormatContext.ReadFrame(pkt); err != nil { + if err := p.inputFormatContext.ReadFrame(inPkt); err != nil { if errors.Is(err, astiav.ErrEof) { - break + c.l.Info("streaming has ended") + return } c.onError(err, donut) } - s, ok := p.streams[pkt.StreamIndex()] + s, ok := p.streams[inPkt.StreamIndex()] if !ok { + c.l.Warnf("cannot find stream id=%d", inPkt.StreamIndex()) continue } - pkt.RescaleTs(s.inputStream.TimeBase(), s.decCodecContext.TimeBase()) - audioDuration := c.defineAudioDuration(s, pkt) - videoDuration := c.defineVideoDuration(s, pkt) + inPkt.RescaleTs(s.inputStream.TimeBase(), s.decCodecContext.TimeBase()) - if s.inputStream.CodecParameters().MediaType() == astiav.MediaTypeVideo { + isVideo := s.decCodecContext.MediaType() == astiav.MediaTypeVideo + isVideoBypass := donut.Recipe.Video.Action == entities.DonutBypass + if isVideo && isVideoBypass { if donut.OnVideoFrame != nil { - if err := donut.OnVideoFrame(pkt.Data(), entities.MediaFrameContext{ - PTS: int(pkt.Pts()), - DTS: int(pkt.Dts()), - Duration: videoDuration, + if err := donut.OnVideoFrame(inPkt.Data(), entities.MediaFrameContext{ + PTS: int(inPkt.Pts()), + DTS: int(inPkt.Dts()), + Duration: c.defineVideoDuration(s, inPkt), }); err != nil { c.onError(err, donut) return } } + continue } - if s.inputStream.CodecParameters().MediaType() == astiav.MediaTypeAudio { + isAudio := s.decCodecContext.MediaType() == astiav.MediaTypeAudio + isAudioBypass := donut.Recipe.Audio.Action == entities.DonutBypass + if isAudio && isAudioBypass { if donut.OnAudioFrame != nil { - donut.OnAudioFrame(pkt.Data(), entities.MediaFrameContext{ - PTS: int(pkt.Pts()), - DTS: int(pkt.Dts()), - Duration: audioDuration, - }) + if err := donut.OnAudioFrame(inPkt.Data(), entities.MediaFrameContext{ + PTS: int(inPkt.Pts()), + DTS: int(inPkt.Dts()), + Duration: c.defineAudioDuration(s, inPkt), + }); err != nil { + c.onError(err, donut) + return + } + } + continue + } + + if err := s.decCodecContext.SendPacket(inPkt); err != nil { + c.onError(err, donut) + return + } + + for { + if err := s.decCodecContext.ReceiveFrame(s.decFrame); err != nil { + if errors.Is(err, astiav.ErrEof) || errors.Is(err, astiav.ErrEagain) { + break + } + c.onError(err, donut) + return + } + + if err := c.filterAndEncode(s.decFrame, s, donut); err != nil { + c.onError(err, donut) + return } } } @@ -139,19 +198,13 @@ func (c *LibAVFFmpegStreamer) onError(err error, p *entities.DonutParameters) { } } -func (c *LibAVFFmpegStreamer) prepareInput(p *params, closer *astikit.Closer, donut *entities.DonutParameters) error { - // good for debugging - astiav.SetLogLevel(astiav.LogLevelDebug) - astiav.SetLogCallback(func(l astiav.LogLevel, fmt, msg, parent string) { - c.l.Infof("ffmpeg log: %s (level: %d)", strings.TrimSpace(msg), l) - }) - +func (c *LibAVFFmpegStreamer) prepareInput(p *libAVParams, closer *astikit.Closer, donut *entities.DonutParameters) error { if p.inputFormatContext = astiav.AllocFormatContext(); p.inputFormatContext == nil { return errors.New("ffmpeg/libav: input format context is nil") } closer.Add(p.inputFormatContext.Free) - inputFormat, err := c.defineInputFormat(donut.StreamFormat) + inputFormat, err := c.defineInputFormat(donut.Recipe.Input.Format.String()) if err != nil { return err } @@ -159,7 +212,6 @@ func (c *LibAVFFmpegStreamer) prepareInput(p *params, closer *astikit.Closer, do if err := p.inputFormatContext.OpenInput(donut.StreamURL, inputFormat, inputOptions); err != nil { return fmt.Errorf("ffmpeg/libav: opening input failed %w", err) } - closer.Add(p.inputFormatContext.CloseInput) if err := p.inputFormatContext.FindStreamInfo(nil); err != nil { @@ -176,7 +228,7 @@ func (c *LibAVFFmpegStreamer) prepareInput(p *params, closer *astikit.Closer, do s := &streamContext{inputStream: is} if s.decCodec = astiav.FindDecoder(is.CodecParameters().CodecID()); s.decCodec == nil { - return errors.New("ffmpeg/libav: codec is nil") + return errors.New("ffmpeg/libav: codec is missing") } if s.decCodecContext = astiav.AllocCodecContext(s.decCodec); s.decCodecContext == nil { @@ -209,29 +261,326 @@ func (c *LibAVFFmpegStreamer) prepareInput(p *params, closer *astikit.Closer, do return nil } +func functionNameFor(i interface{}) string { + fullName := runtime.FuncForPC(reflect.ValueOf(i).Pointer()).Name() + components := strings.Split(fullName, ".") + return components[len(components)-2] +} + +func (c *LibAVFFmpegStreamer) prepareOutput(p *libAVParams, closer *astikit.Closer, donut *entities.DonutParameters) error { + for _, is := range p.inputFormatContext.Streams() { + s, ok := p.streams[is.Index()] + if !ok { + c.l.Infof("skipping stream index = %d", is.Index()) + continue + } + + isVideo := s.decCodecContext.MediaType() == astiav.MediaTypeVideo + isVideoBypass := donut.Recipe.Video.Action == entities.DonutBypass + if isVideo && isVideoBypass { + c.l.Infof("bypass video for %+v", s.inputStream) + continue + } + + isAudio := s.decCodecContext.MediaType() == astiav.MediaTypeAudio + isAudioBypass := donut.Recipe.Audio.Action == entities.DonutBypass + if isAudio && isAudioBypass { + c.l.Infof("bypass audio for %+v", s.inputStream) + continue + } + + var codecID astiav.CodecID + if isAudio { + audioCodecID, err := c.m.FromStreamCodecToLibAVCodecID(donut.Recipe.Audio.Codec) + if err != nil { + return err + } + codecID = audioCodecID + } + if isVideo { + videoCodecID, err := c.m.FromStreamCodecToLibAVCodecID(donut.Recipe.Video.Codec) + if err != nil { + return err + } + codecID = videoCodecID + } + + if s.encCodec = astiav.FindEncoder(codecID); s.encCodec == nil { + // TODO: migrate error to entity + return fmt.Errorf("cannot find a libav encoder for %+v", codecID) + } + + if s.encCodecContext = astiav.AllocCodecContext(s.encCodec); s.encCodecContext == nil { + return errors.New("ffmpeg/libav: codec context is nil") + } + closer.Add(s.encCodecContext.Free) + + if isAudio { + if v := s.encCodec.ChannelLayouts(); len(v) > 0 { + s.encCodecContext.SetChannelLayout(v[0]) + } else { + s.encCodecContext.SetChannelLayout(s.decCodecContext.ChannelLayout()) + } + s.encCodecContext.SetChannels(s.decCodecContext.Channels()) + s.encCodecContext.SetSampleRate(s.decCodecContext.SampleRate()) + if v := s.encCodec.SampleFormats(); len(v) > 0 { + s.encCodecContext.SetSampleFormat(v[0]) + } else { + s.encCodecContext.SetSampleFormat(s.decCodecContext.SampleFormat()) + } + s.encCodecContext.SetTimeBase(s.decCodecContext.TimeBase()) + + // supplying custom config + if len(donut.Recipe.Audio.CodecContextOptions) > 0 { + for _, opt := range donut.Recipe.Audio.CodecContextOptions { + c.l.Infof("overriding av codec context %s", functionNameFor(opt)) + opt(s.encCodecContext) + } + } + } + + if isVideo { + if v := s.encCodec.PixelFormats(); len(v) > 0 { + s.encCodecContext.SetPixelFormat(v[0]) + } else { + s.encCodecContext.SetPixelFormat(s.decCodecContext.PixelFormat()) + } + s.encCodecContext.SetSampleAspectRatio(s.decCodecContext.SampleAspectRatio()) + s.encCodecContext.SetTimeBase(s.decCodecContext.TimeBase()) + s.encCodecContext.SetHeight(s.decCodecContext.Height()) + s.encCodecContext.SetWidth(s.decCodecContext.Width()) + s.encCodecContext.SetFramerate(s.inputStream.AvgFrameRate()) + + // supplying custom config + if len(donut.Recipe.Video.CodecContextOptions) > 0 { + for _, opt := range donut.Recipe.Video.CodecContextOptions { + c.l.Infof("overriding av codec context %s", functionNameFor(opt)) + opt(s.encCodecContext) + } + } + } + + if s.decCodecContext.Flags().Has(astiav.CodecContextFlagGlobalHeader) { + s.encCodecContext.SetFlags(s.encCodecContext.Flags().Add(astiav.CodecContextFlagGlobalHeader)) + } + + if err := s.encCodecContext.Open(s.encCodec, nil); err != nil { + return fmt.Errorf("opening encoder context failed: %w", err) + } + } + return nil +} + +func (c *LibAVFFmpegStreamer) prepareFilters(p *libAVParams, closer *astikit.Closer, donut *entities.DonutParameters) error { + for _, s := range p.streams { + + isVideo := s.decCodecContext.MediaType() == astiav.MediaTypeVideo + isVideoBypass := donut.Recipe.Video.Action == entities.DonutBypass + if isVideo && isVideoBypass { + c.l.Infof("bypass video for %+v", s.inputStream) + continue + } + + isAudio := s.decCodecContext.MediaType() == astiav.MediaTypeAudio + isAudioBypass := donut.Recipe.Audio.Action == entities.DonutBypass + if isAudio && isAudioBypass { + c.l.Infof("bypass audio for %+v", s.inputStream) + continue + } + + var args astiav.FilterArgs + var buffersrc, buffersink *astiav.Filter + var content string + var err error + + if s.filterGraph = astiav.AllocFilterGraph(); s.filterGraph == nil { + return errors.New("main: graph is nil") + } + closer.Add(s.filterGraph.Free) + + outputs := astiav.AllocFilterInOut() + if outputs == nil { + return errors.New("main: outputs is nil") + } + closer.Add(outputs.Free) + + inputs := astiav.AllocFilterInOut() + if inputs == nil { + return errors.New("main: inputs is nil") + } + closer.Add(inputs.Free) + + if s.decCodecContext.MediaType() == astiav.MediaTypeAudio { + // TODO: what's the difference between args and content? + // why args are necessary? + args = astiav.FilterArgs{ + "channel_layout": s.decCodecContext.ChannelLayout().String(), + "sample_fmt": s.decCodecContext.SampleFormat().Name(), + "sample_rate": strconv.Itoa(s.decCodecContext.SampleRate()), + "time_base": s.decCodecContext.TimeBase().String(), + } + buffersrc = astiav.FindFilterByName("abuffer") + buffersink = astiav.FindFilterByName("abuffersink") + content = fmt.Sprintf( + "aresample=%s", // necessary for opus + strconv.Itoa(s.encCodecContext.SampleRate()), + ) + } + + if s.decCodecContext.MediaType() == astiav.MediaTypeVideo { + args = astiav.FilterArgs{ + "pix_fmt": strconv.Itoa(int(s.decCodecContext.PixelFormat())), + "pixel_aspect": s.decCodecContext.SampleAspectRatio().String(), + "time_base": s.decCodecContext.TimeBase().String(), + "video_size": strconv.Itoa(s.decCodecContext.Width()) + "x" + strconv.Itoa(s.decCodecContext.Height()), + } + buffersrc = astiav.FindFilterByName("buffer") + buffersink = astiav.FindFilterByName("buffersink") + content = fmt.Sprintf("format=pix_fmts=%s", s.encCodecContext.PixelFormat().Name()) + } + + if buffersrc == nil { + return errors.New("main: buffersrc is nil") + } + if buffersink == nil { + return errors.New("main: buffersink is nil") + } + + if s.buffersrcContext, err = s.filterGraph.NewFilterContext(buffersrc, "in", args); err != nil { + return fmt.Errorf("main: creating buffersrc context failed: %w", err) + } + if s.buffersinkContext, err = s.filterGraph.NewFilterContext(buffersink, "out", nil); err != nil { + return fmt.Errorf("main: creating buffersink context failed: %w", err) + } + + outputs.SetName("in") + outputs.SetFilterContext(s.buffersrcContext) + outputs.SetPadIdx(0) + outputs.SetNext(nil) + + inputs.SetName("out") + inputs.SetFilterContext(s.buffersinkContext) + inputs.SetPadIdx(0) + inputs.SetNext(nil) + + if err = s.filterGraph.Parse(content, inputs, outputs); err != nil { + return fmt.Errorf("main: parsing filter failed: %w", err) + } + + if err = s.filterGraph.Configure(); err != nil { + return fmt.Errorf("main: configuring filter failed: %w", err) + } + + s.filterFrame = astiav.AllocFrame() + closer.Add(s.filterFrame.Free) + + s.encPkt = astiav.AllocPacket() + closer.Add(s.encPkt.Free) + } + return nil +} + +func (c *LibAVFFmpegStreamer) filterAndEncode(f *astiav.Frame, s *streamContext, donut *entities.DonutParameters) (err error) { + if err = s.buffersrcContext.BuffersrcAddFrame(f, astiav.NewBuffersrcFlags(astiav.BuffersrcFlagKeepRef)); err != nil { + return fmt.Errorf("adding frame failed: %w", err) + } + for { + s.filterFrame.Unref() + + if err = s.buffersinkContext.BuffersinkGetFrame(s.filterFrame, astiav.NewBuffersinkFlags()); err != nil { + if errors.Is(err, astiav.ErrEof) || errors.Is(err, astiav.ErrEagain) { + err = nil + break + } + return fmt.Errorf("getting frame failed: %w", err) + } + // TODO: should we avoid setting the picture type for audio? + s.filterFrame.SetPictureType(astiav.PictureTypeNone) + + if err = c.encodeFrame(s.filterFrame, s, donut); err != nil { + err = fmt.Errorf("main: encoding and writing frame failed: %w", err) + return + } + } + return nil +} + +func (c *LibAVFFmpegStreamer) encodeFrame(f *astiav.Frame, s *streamContext, donut *entities.DonutParameters) (err error) { + s.encPkt.Unref() + + // when converting from aac to opus using filters, the np samples are bigger than the frame size + // to fix the error "more samples than frame size" + f.SetNbSamples(s.encCodecContext.FrameSize()) + + if err = s.encCodecContext.SendFrame(f); err != nil { + return fmt.Errorf("sending frame failed: %w", err) + } + + for { + if err = s.encCodecContext.ReceivePacket(s.encPkt); err != nil { + if errors.Is(err, astiav.ErrEof) || errors.Is(err, astiav.ErrEagain) { + err = nil + break + } + return fmt.Errorf("receiving packet failed: %w", err) + } + + // TODO: check if we need to swap + // pkt.RescaleTs(s.inputStream.TimeBase(), s.decCodecContext.TimeBase()) + s.encPkt.RescaleTs(s.inputStream.TimeBase(), s.encCodecContext.TimeBase()) + + isVideo := s.decCodecContext.MediaType() == astiav.MediaTypeVideo + if isVideo { + if donut.OnVideoFrame != nil { + if err := donut.OnVideoFrame(s.encPkt.Data(), entities.MediaFrameContext{ + PTS: int(s.encPkt.Pts()), + DTS: int(s.encPkt.Dts()), + Duration: c.defineVideoDuration(s, s.encPkt), + }); err != nil { + return err + } + } + } + + isAudio := s.decCodecContext.MediaType() == astiav.MediaTypeAudio + if isAudio { + if donut.OnAudioFrame != nil { + if err := donut.OnAudioFrame(s.encPkt.Data(), entities.MediaFrameContext{ + PTS: int(s.encPkt.Pts()), + DTS: int(s.encPkt.Dts()), + Duration: c.defineAudioDuration(s, s.encPkt), + }); err != nil { + return err + } + } + } + } + + return nil +} + func (c *LibAVFFmpegStreamer) defineInputFormat(streamFormat string) (*astiav.InputFormat, error) { + var inputFormat *astiav.InputFormat if streamFormat != "" { - inputFormat := astiav.FindInputFormat(streamFormat) + inputFormat = astiav.FindInputFormat(streamFormat) if inputFormat == nil { return nil, fmt.Errorf("ffmpeg/libav: could not find %s input format", streamFormat) } } - return nil, nil + return inputFormat, nil } func (c *LibAVFFmpegStreamer) defineInputOptions(p *entities.DonutParameters, closer *astikit.Closer) *astiav.Dictionary { - if strings.Contains(strings.ToLower(p.StreamURL), "srt:") { - d := &astiav.Dictionary{} - closer.Add(d.Free) + var dic *astiav.Dictionary + if len(p.Recipe.Input.Options) > 0 { + dic = &astiav.Dictionary{} + closer.Add(dic.Free) - // ref https://ffmpeg.org/ffmpeg-all.html#srt - // flags (the zeroed 3rd value) https://github.com/FFmpeg/FFmpeg/blob/n5.0/libavutil/dict.h#L67C9-L77 - d.Set("srt_streamid", p.StreamID, 0) - d.Set("smoother", "live", 0) - d.Set("transtype", "live", 0) - return d + for k, v := range p.Recipe.Input.Options { + dic.Set(k.String(), v, 0) + } } - return nil + return dic } func (c *LibAVFFmpegStreamer) defineAudioDuration(s *streamContext, pkt *astiav.Packet) time.Duration { @@ -246,9 +595,10 @@ func (c *LibAVFFmpegStreamer) defineAudioDuration(s *streamContext, pkt *astiav. // ref https://developer.apple.com/documentation/coreaudiotypes/audiostreambasicdescription/1423257-mframesperpacket // TODO: properly handle wraparound / roll over - c.currentAudioFrameSize = float64(pkt.Dts()) - c.lastAudioFrameDTS - if c.currentAudioFrameSize < 0 { - c.currentAudioFrameSize = c.lastAudioFrameDTS*2 - c.lastAudioFrameDTS + // or explore av frame_size https://ffmpeg.org/doxygen/trunk/structAVCodecContext.html#aec57f0d859a6df8b479cd93ca3a44a33 + // and libAV pts roll over + if float64(pkt.Dts())-c.lastAudioFrameDTS > 0 { + c.currentAudioFrameSize = float64(pkt.Dts()) - c.lastAudioFrameDTS } c.lastAudioFrameDTS = float64(pkt.Dts()) @@ -258,7 +608,7 @@ func (c *LibAVFFmpegStreamer) defineAudioDuration(s *streamContext, pkt *astiav. return audioDuration } -func (c *LibAVFFmpegStreamer) defineVideoDuration(s *streamContext, pkt *astiav.Packet) time.Duration { +func (c *LibAVFFmpegStreamer) defineVideoDuration(s *streamContext, _ *astiav.Packet) time.Duration { videoDuration := time.Duration(0) if s.inputStream.CodecParameters().MediaType() == astiav.MediaTypeVideo { // Video @@ -273,3 +623,35 @@ func (c *LibAVFFmpegStreamer) defineVideoDuration(s *streamContext, pkt *astiav. } return videoDuration } + +// TODO: move this either to a mapper or make a PR for astiav +func (*LibAVFFmpegStreamer) libAVLogToString(l astiav.LogLevel) string { + const _Ciconst_AV_LOG_DEBUG = 0x30 + const _Ciconst_AV_LOG_ERROR = 0x10 + const _Ciconst_AV_LOG_FATAL = 0x8 + const _Ciconst_AV_LOG_INFO = 0x20 + const _Ciconst_AV_LOG_PANIC = 0x0 + const _Ciconst_AV_LOG_QUIET = -0x8 + const _Ciconst_AV_LOG_VERBOSE = 0x28 + const _Ciconst_AV_LOG_WARNING = 0x18 + switch l { + case _Ciconst_AV_LOG_WARNING: + return "WARN" + case _Ciconst_AV_LOG_VERBOSE: + return "VERBOSE" + case _Ciconst_AV_LOG_QUIET: + return "QUIET" + case _Ciconst_AV_LOG_PANIC: + return "PANIC" + case _Ciconst_AV_LOG_INFO: + return "INFO" + case _Ciconst_AV_LOG_FATAL: + return "FATAL" + case _Ciconst_AV_LOG_DEBUG: + return "DEBUG" + case _Ciconst_AV_LOG_ERROR: + return "ERROR" + default: + return "UNKNOWN LEVEL" + } +} diff --git a/internal/controllers/webrtc_controller.go b/internal/controllers/webrtc_controller.go index b1d1010..65cc681 100644 --- a/internal/controllers/webrtc_controller.go +++ b/internal/controllers/webrtc_controller.go @@ -74,8 +74,8 @@ func (c *WebRTCController) CreatePeerConnection(cancel context.CancelFunc) (*web return peerConnection, nil } -func (c *WebRTCController) CreateTrack(peer *webrtc.PeerConnection, track entities.Stream, id string, streamId string) (*webrtc.TrackLocalStaticSample, error) { - codecCapability := c.m.FromTrackToRTPCodecCapability(track) +func (c *WebRTCController) CreateTrack(peer *webrtc.PeerConnection, codec entities.Codec, id string, streamId string) (*webrtc.TrackLocalStaticSample, error) { + codecCapability := c.m.FromTrackToRTPCodecCapability(codec) webRTCtrack, err := webrtc.NewTrackLocalStaticSample(codecCapability, id, streamId) if err != nil { return nil, err diff --git a/internal/entities/entities.go b/internal/entities/entities.go index 85668d1..e9d435b 100644 --- a/internal/entities/entities.go +++ b/internal/entities/entities.go @@ -5,6 +5,7 @@ import ( "fmt" "time" + "github.com/asticode/go-astiav" "github.com/pion/webrtc/v3" ) @@ -127,12 +128,9 @@ type DonutParameters struct { Cancel context.CancelFunc Ctx context.Context - StreamID string // ie: live001, channel01 - StreamFormat string // ie: flv, mpegts - StreamURL string // ie: srt://host:9080, rtmp://host:4991 + StreamURL string // ie: srt://host:9080, rtmp://host:4991 - TranscodeVideoCodec Codec // ie: vp8 - TranscodeAudioCodec Codec // ie: opus + Recipe DonutRecipe OnClose func() OnError func(err error) @@ -141,6 +139,92 @@ type DonutParameters struct { OnAudioFrame func(data []byte, c MediaFrameContext) error } +type DonutMediaTaskAction string + +var DonutTranscode DonutMediaTaskAction = "transcode" +var DonutBypass DonutMediaTaskAction = "bypass" + +// TODO: split entities per domain or files avoiding name collision. + +// DonutMediaTask is a transformation template to apply over a media. +type DonutMediaTask struct { + // Action the action that needs to be performed + Action DonutMediaTaskAction + // Codec is the main codec, it might be used depending on the action. + Codec Codec + // CodecContextOptions is a list of options to be applied on codec context. + // If no value is provided ffmpeg will use defaults. + // For instance, if one does not provide bit rate, it'll fallback to 64000 bps (opus) + CodecContextOptions []LibAVOptionsCodecContext +} + +type DonutInputOptionKey string + +func (d DonutInputOptionKey) String() string { + return string(d) +} + +var DonutSRTStreamID DonutInputOptionKey = "srt_streamid" +var DonutSRTsmoother DonutInputOptionKey = "smoother" +var DonutSRTTranstype DonutInputOptionKey = "transtype" + +type DonutInputFormat string + +func (d DonutInputFormat) String() string { + return string(d) +} + +var DonutMpegTSFormat DonutInputFormat = "mpegts" +var DonutFLVFormat DonutInputFormat = "flv" + +type DonutInput struct { + Format DonutInputFormat + Options map[DonutInputOptionKey]string +} + +type DonutRecipe struct { + Input DonutInput + Video DonutMediaTask + Audio DonutMediaTask +} + +type LibAVOptionsCodecContext func(c *astiav.CodecContext) + +func SetSampleRate(sampleRate int) LibAVOptionsCodecContext { + return func(c *astiav.CodecContext) { + c.SetSampleRate(sampleRate) + } +} + +func SetTimeBase(num, den int) LibAVOptionsCodecContext { + return func(c *astiav.CodecContext) { + c.SetTimeBase(astiav.NewRational(num, den)) + } +} + +// SetSampleFormat sets sample format, +// CAUTION it only contains partial list of fmt +// TODO: move it to mappers +func SetSampleFormat(fmt string) LibAVOptionsCodecContext { + var sf astiav.SampleFormat + if fmt == "fltp" { + sf = astiav.SampleFormatFltp + } else if fmt == "flt" { + sf = astiav.SampleFormatFlt + } else { + // DANGER: assuming a default value + sf = astiav.SampleFormatS16 + } + return func(c *astiav.CodecContext) { + c.SetSampleFormat(sf) + } +} + +// TODO: implement proper matching +// DonutTransformRecipe +// AudioTask: {Action: Transcode, From: AAC, To: Opus} +// VideoTask: {Action: Bypass, From: H264, To: H264} + type Config struct { HTTPPort int32 `required:"true" default:"8080"` HTTPHost string `required:"true" default:"0.0.0.0"` diff --git a/internal/mapper/mapper.go b/internal/mapper/mapper.go index d50854e..2e194db 100644 --- a/internal/mapper/mapper.go +++ b/internal/mapper/mapper.go @@ -1,6 +1,7 @@ package mapper import ( + "fmt" "strings" "github.com/asticode/go-astiav" @@ -19,15 +20,18 @@ func NewMapper(l *zap.SugaredLogger) *Mapper { return &Mapper{l: l} } -func (m *Mapper) FromTrackToRTPCodecCapability(track entities.Stream) webrtc.RTPCodecCapability { +func (m *Mapper) FromTrackToRTPCodecCapability(codec entities.Codec) webrtc.RTPCodecCapability { + // TODO: enrich codec capability, check if it's necessary response := webrtc.RTPCodecCapability{} - if track.Codec == entities.H264 { + if codec == entities.H264 { response.MimeType = webrtc.MimeTypeH264 - } else if track.Codec == entities.H265 { + } else if codec == entities.H265 { response.MimeType = webrtc.MimeTypeH265 + } else if codec == entities.Opus { + response.MimeType = webrtc.MimeTypeOpus } else { - m.l.Info("[[[[TODO: mapper not implemented]]]] for ", track) + m.l.Info("[[[[TODO: mapper not implemented]]]] for ", codec) } return response @@ -203,3 +207,22 @@ func (m *Mapper) FromLibAVStreamToEntityStream(libavStream *astiav.Stream) entit return st } + +func (m *Mapper) FromStreamCodecToLibAVCodecID(codec entities.Codec) (astiav.CodecID, error) { + if codec == entities.H264 { + return astiav.CodecIDH264, nil + } else if codec == entities.H265 { + return astiav.CodecIDHevc, nil + } else if codec == entities.Opus { + return astiav.CodecIDOpus, nil + } else if codec == entities.VP8 { + return astiav.CodecIDVp8, nil + } else if codec == entities.VP9 { + return astiav.CodecIDVp9, nil + } else if codec == entities.AAC { + return astiav.CodecIDAac, nil + } + + // TODO: port error to entities + return astiav.CodecIDH264, fmt.Errorf("cannot find a libav codec id for donut codec id %+v", codec) +} diff --git a/internal/web/handlers/signaling.go b/internal/web/handlers/signaling.go index eecea17..fb31e0f 100644 --- a/internal/web/handlers/signaling.go +++ b/internal/web/handlers/signaling.go @@ -39,7 +39,7 @@ func NewSignalingHandler( } func (h *SignalingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) error { - params, err := h.createAndValidateParams(w, r) + params, err := h.createAndValidateParams(r) if err != nil { return err } @@ -70,39 +70,21 @@ func (h *SignalingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) err return err } - // TODO: introduce a mode to deal with transcoding recipes - // selects proper media that client and server has adverted. - // donutEngine preferable vp8, ogg??? - // From: [] To: [] or Transcode:[], Bypass: [] - // libav_streamer.go, libav_streamer_format.go, libav_streamer_codec.go... - // reads from Server (input) and generates h264 raw, and ogg and send it with timing attributes - compatibleStreams, ok := donutEngine.CompatibleStreamsFor(serverStreamInfo, clientStreamInfo) - if !ok { - h.l.Info("we must transcode") - } - - if len(compatibleStreams) == 0 { + donutRecipe := donutEngine.RecipeFor(¶ms, serverStreamInfo, clientStreamInfo) + if donutRecipe == nil { return entities.ErrMissingCompatibleStreams } var videoTrack *webrtc.TrackLocalStaticSample - // var audioTrack *webrtc.TrackLocalStaticSample + videoTrack, err = h.webRTCController.CreateTrack(peer, donutRecipe.Video.Codec, string(entities.VideoType), params.SRTStreamID) + if err != nil { + return err + } - for _, st := range compatibleStreams { - // TODO: make the mapping less dependent on type - if st.Type == entities.VideoType { - videoTrack, err = h.webRTCController.CreateTrack( - peer, - st, - string(st.Type), // "video" or "audio" - params.SRTStreamID, - ) - if err != nil { - return err - } - - } - // if st.Type == entities.AudioType { + var audioTrack *webrtc.TrackLocalStaticSample + audioTrack, err = h.webRTCController.CreateTrack(peer, donutRecipe.Audio.Codec, string(entities.AudioType), params.SRTStreamID) + if err != nil { + return err } metadataSender, err := h.webRTCController.CreateDataChannel(peer, entities.MetadataChannelID) @@ -123,11 +105,9 @@ func (h *SignalingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) err Cancel: cancel, Ctx: ctx, - // TODO: add an UI element for the sub-type (format) when input is srt:// - // We're assuming that SRT is carrying mpegts. - StreamFormat: "mpegts", - StreamID: params.SRTStreamID, - StreamURL: fmt.Sprintf("srt://%s:%d", params.SRTHost, params.SRTPort), + Recipe: *donutRecipe, + + StreamURL: fmt.Sprintf("srt://%s:%d", params.SRTHost, params.SRTPort), OnClose: func() { cancel() @@ -146,7 +126,8 @@ func (h *SignalingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) err }, OnAudioFrame: func(data []byte, c entities.MediaFrameContext) error { // TODO: implement - return nil + // audioTrack + return h.webRTCController.SendVideoSample(audioTrack, data, c) }, }) @@ -161,7 +142,7 @@ func (h *SignalingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) err return nil } -func (h *SignalingHandler) createAndValidateParams(w http.ResponseWriter, r *http.Request) (entities.RequestParams, error) { +func (h *SignalingHandler) createAndValidateParams(r *http.Request) (entities.RequestParams, error) { if r.Method != http.MethodPost { return entities.RequestParams{}, entities.ErrHTTPPostOnly } diff --git a/static/demo.js b/static/demo.js index c75e3f9..dbc489d 100644 --- a/static/demo.js +++ b/static/demo.js @@ -52,6 +52,10 @@ const setupWebRTC = (setRemoteSDPfn) => { // with auto play. pc.ontrack = function (event) { log("ontrack : " + event.track.kind + " label " + event.track.label); + // it only creates a video tag element + if (event.track.kind !== "video") { + return + } const el = document.createElement(event.track.kind); el.srcObject = event.streams[0];