add general refactoring

This commit is contained in:
Leandro Moreira
2024-03-04 20:32:16 -03:00
parent fef3ae17c7
commit d70b35c30c
11 changed files with 106 additions and 204 deletions

2
go.mod generated
View File

@@ -5,8 +5,6 @@ go 1.19
require (
github.com/asticode/go-astiav v0.12.0
github.com/asticode/go-astikit v0.36.0
github.com/asticode/go-astisrt v0.3.0
github.com/asticode/go-astits v1.11.0
github.com/kelseyhightower/envconfig v1.4.0
github.com/pion/webrtc/v3 v3.1.47
github.com/stretchr/testify v1.8.0

7
go.sum generated
View File

@@ -1,12 +1,7 @@
github.com/asticode/go-astiav v0.12.0 h1:tETfPhVpJrSyh3zvUOmDvebFaCoFpeATSaQAA7B50J8=
github.com/asticode/go-astiav v0.12.0/go.mod h1:phvUnSSlV91S/PELeLkDisYiRLOssxWOsj4oDrqM/54=
github.com/asticode/go-astikit v0.30.0/go.mod h1:h4ly7idim1tNhaVkdVBeXQZEE3L0xblP7fCWbgwipF0=
github.com/asticode/go-astikit v0.36.0 h1:WHSY88YT76D/XRbdp0lMLwfjyUGw8dygnbKKtbGNIG8=
github.com/asticode/go-astikit v0.36.0/go.mod h1:h4ly7idim1tNhaVkdVBeXQZEE3L0xblP7fCWbgwipF0=
github.com/asticode/go-astisrt v0.3.0 h1:LpvqOc17qfMr2suLZPzMs9wYLozxXYu/PE9CA1tH88c=
github.com/asticode/go-astisrt v0.3.0/go.mod h1:tP5Dx+MXyaICUeF0gz4nwyav3RDI609e0en3QQkrxKE=
github.com/asticode/go-astits v1.11.0 h1:GTHUXht0ZXAJXsVbsLIcyfHr1Bchi4QQwMARw2ZWAng=
github.com/asticode/go-astits v1.11.0/go.mod h1:QSHmknZ51pf6KJdHKZHJTLlMegIrhega3LPWz3ND/iI=
github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
@@ -88,14 +83,12 @@ github.com/pion/udp v0.1.1/go.mod h1:6AFo+CMdKQm7UiA0eUPA8/eVCTx8jBIITLZHc9DWX5M
github.com/pion/webrtc/v3 v3.1.47 h1:2dFEKRI1rzFvehXDq43hK9OGGyTGJSusUi3j6QKHC5s=
github.com/pion/webrtc/v3 v3.1.47/go.mod h1:8U39MYZCLVV4sIBn01htASVNkWQN2zDa/rx5xisEXWs=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/profile v1.4.0/go.mod h1:NWz/XGvpEW1FyYQ7fCx4dqYBLlfTcE+A9FLAkNKqjFE=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sclevine/agouti v3.0.0+incompatible/go.mod h1:b4WX9W9L1sfQKXeJf1mUTLZKJ48R1S7H23Ji7oFO5Bw=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=

View File

@@ -6,19 +6,22 @@ import (
"github.com/flavioribeiro/donut/internal/controllers/probers"
"github.com/flavioribeiro/donut/internal/controllers/streamers"
"github.com/flavioribeiro/donut/internal/entities"
"github.com/flavioribeiro/donut/internal/mapper"
"go.uber.org/fx"
)
type DonutEngine interface {
Prober() probers.DonutProber
Streamer() streamers.DonutStreamer
ServerIngredients(req *entities.RequestParams) (*entities.StreamInfo, error)
ClientIngredients(req *entities.RequestParams) (*entities.StreamInfo, error)
RecipeFor(req *entities.RequestParams, server, client *entities.StreamInfo) *entities.DonutRecipe
Serve(p *entities.DonutParameters)
}
type DonutEngineParams struct {
fx.In
Streamers []streamers.DonutStreamer `group:"streamers"`
Probers []probers.DonutProber `group:"probers"`
Mapper *mapper.Mapper
}
type DonutEngineController struct {
@@ -36,13 +39,14 @@ func (c *DonutEngineController) EngineFor(req *entities.RequestParams) (DonutEng
}
streamer := c.selectStreamerFor(req)
if prober == nil {
if streamer == nil {
return nil, fmt.Errorf("request %v: not fulfilled error %w", req, entities.ErrMissingStreamer)
}
return &donutEngine{
prober: prober,
streamer: streamer,
mapper: c.p.Mapper,
}, nil
}
@@ -69,18 +73,31 @@ func (c *DonutEngineController) selectStreamerFor(req *entities.RequestParams) s
type donutEngine struct {
prober probers.DonutProber
streamer streamers.DonutStreamer
mapper *mapper.Mapper
}
func (d *donutEngine) Prober() probers.DonutProber {
return d.prober
func (d *donutEngine) ServerIngredients(req *entities.RequestParams) (*entities.StreamInfo, error) {
return d.prober.StreamInfo(req)
}
func (d *donutEngine) Streamer() streamers.DonutStreamer {
return d.streamer
func (d *donutEngine) ClientIngredients(req *entities.RequestParams) (*entities.StreamInfo, error) {
return d.mapper.FromWebRTCSessionDescriptionToStreamInfo(req.Offer)
}
func (d *donutEngine) Serve(p *entities.DonutParameters) {
d.streamer.Stream(p)
}
func (d *donutEngine) RecipeFor(req *entities.RequestParams, server, client *entities.StreamInfo) *entities.DonutRecipe {
// TODO: implement proper matching
//
// suggestions:
// if client.medias.contains(server.media)
// bypass, server.media
// else
// preferable = [vp8, opus]
// if union(preferable, client.medias)
// transcode, preferable
r := &entities.DonutRecipe{
Input: entities.DonutInput{
Format: "mpegts", // it'll change based on input, i.e. rmtp flv

View File

@@ -53,7 +53,7 @@ func (c *LibAVFFmpeg) StreamInfo(req *entities.RequestParams) (*entities.StreamI
}
closer.Add(inputFormatContext.Free)
// TODO: add an UI element for sub-type (format) when input is srt:// (defaulting to mpeg-ts)
// TODO: implement proper handler per req
userProvidedInputFormat := "mpegts"
// We're assuming that SRT is carrying mpegts.
//

View File

@@ -255,7 +255,10 @@ func (c *LibAVFFmpegStreamer) prepareInput(p *libAVParams, closer *astikit.Close
if donut.OnStream != nil {
stream := c.m.FromLibAVStreamToEntityStream(is)
donut.OnStream(&stream)
err := donut.OnStream(&stream)
if err != nil {
return err
}
}
}
return nil
@@ -586,11 +589,12 @@ func (c *LibAVFFmpegStreamer) defineInputOptions(p *entities.DonutParameters, cl
func (c *LibAVFFmpegStreamer) defineAudioDuration(s *streamContext, pkt *astiav.Packet) time.Duration {
audioDuration := time.Duration(0)
if s.inputStream.CodecParameters().MediaType() == astiav.MediaTypeAudio {
// Audio
//
// dur = 0,023219954648526078
// sample = 44100
// frameSize = 1024 (or 960 for aac, but it could be variable for opus)
// dur = 12.416666ms
// sample = 48000
// frameSize = 596 (it can be variable for opus)
// 1s = dur * (sample/frameSize)
// ref https://developer.apple.com/documentation/coreaudiotypes/audiostreambasicdescription/1423257-mframesperpacket
@@ -602,7 +606,7 @@ func (c *LibAVFFmpegStreamer) defineAudioDuration(s *streamContext, pkt *astiav.
}
c.lastAudioFrameDTS = float64(pkt.Dts())
sampleRate := float64(s.inputStream.CodecParameters().SampleRate())
sampleRate := float64(s.encCodecContext.SampleRate())
audioDuration = time.Duration((c.currentAudioFrameSize / sampleRate) * float64(time.Second))
}
return audioDuration

View File

@@ -3,7 +3,6 @@ package streammiddlewares
import (
"encoding/json"
"github.com/asticode/go-astits"
"github.com/flavioribeiro/donut/internal/controllers"
"github.com/flavioribeiro/donut/internal/entities"
gocaption "github.com/szatmary/gocaption"
@@ -17,8 +16,8 @@ func newEIA608Reader() (r *eia608Reader) {
return &eia608Reader{}
}
func (r *eia608Reader) parse(PES *astits.PESData) (string, error) {
nalus, err := controllers.ParseNALUs(PES.Data)
func (r *eia608Reader) parse(data []byte) (string, error) {
nalus, err := controllers.ParseNALUs(data)
if err != nil {
return "", err
}
@@ -49,9 +48,9 @@ func (r *eia608Reader) parse(PES *astits.PESData) (string, error) {
}
// TODO: port to mappers
func (r *eia608Reader) buildCaptionsMessage(pts *astits.ClockReference, captions string) (string, error) {
func (r *eia608Reader) buildCaptionsMessage(pts int64, captions string) (string, error) {
cue := entities.Cue{
StartTime: pts.Base,
StartTime: pts,
Text: captions,
Type: "captions",
}

View File

@@ -1,87 +0,0 @@
package streammiddlewares
// import (
// "encoding/json"
// "github.com/asticode/go-astits"
// "github.com/flavioribeiro/donut/internal/entities"
// "github.com/flavioribeiro/donut/internal/mapper"
// "go.uber.org/fx"
// )
// type eia608Middleware struct{}
// type EIA608Response struct {
// fx.Out
// EIA608Middleware entities.StreamMiddleware `group:"middlewares"`
// }
// // TODO: migrate to donutparameters.onEvent api
// // NewEIA608 creates a new EIA608 middleware
// func NewEIA608() EIA608Response {
// return EIA608Response{
// EIA608Middleware: &eia608Middleware{},
// }
// }
// // Act parses and send eia608 data from mpeg-ts to metadata channel
// func (*eia608Middleware) Act(mpegTSDemuxData *astits.DemuxerData, sp *entities.StreamParameters) error {
// vs := sp.ServerStreamInfo.VideoStreams()
// eia608Reader := newEIA608Reader()
// for _, v := range vs {
// if mpegTSDemuxData.PES != nil && v.Codec == entities.H264 {
// captions, err := eia608Reader.parse(mpegTSDemuxData.PES)
// if err != nil {
// return err
// }
// if captions != "" {
// captionsMsg, err := eia608Reader.buildCaptionsMessage(mpegTSDemuxData.PES.Header.OptionalHeader.PTS, captions)
// if err != nil {
// return err
// }
// sp.MetadataTrack.SendText(captionsMsg)
// }
// }
// }
// return nil
// }
// type streamInfoMiddleware struct {
// m *mapper.Mapper
// }
// type StreamInfoResponse struct {
// fx.Out
// StreamInfoMiddleware entities.StreamMiddleware `group:"middlewares"`
// }
// // NewStreamInfo creates a new StreamInfo middleware
// func NewStreamInfo(m *mapper.Mapper) StreamInfoResponse {
// return StreamInfoResponse{
// StreamInfoMiddleware: &streamInfoMiddleware{m: m},
// }
// }
// // Act parses and send StreamInfo data from mpeg-ts to metadata channel
// func (s *streamInfoMiddleware) Act(mpegTSDemuxData *astits.DemuxerData, sp *entities.StreamParameters) error {
// var streams []entities.Stream
// // TODO: check if it makes sense to move this code to a mapper
// if mpegTSDemuxData.PMT != nil {
// for _, es := range mpegTSDemuxData.PMT.ElementaryStreams {
// streams = append(streams, s.m.FromStreamTypeToEntityStream(es))
// }
// }
// msgs := s.m.FromStreamInfoToEntityMessages(&entities.StreamInfo{Streams: streams})
// for _, m := range msgs {
// msg, err := json.Marshal(m)
// if err != nil {
// return err
// }
// sp.MetadataTrack.SendText(string(msg))
// }
// return nil
// }

View File

@@ -33,6 +33,47 @@ func NewWebRTCController(
}
}
func (c *WebRTCController) Setup(cancel context.CancelFunc, donutRecipe *entities.DonutRecipe, params entities.RequestParams) (*entities.WebRTCSetupResponse, error) {
response := &entities.WebRTCSetupResponse{}
peer, err := c.CreatePeerConnection(cancel)
if err != nil {
return nil, err
}
response.Connection = peer
var videoTrack *webrtc.TrackLocalStaticSample
videoTrack, err = c.CreateTrack(peer, donutRecipe.Video.Codec, string(entities.VideoType), params.SRTStreamID)
if err != nil {
return nil, err
}
response.Video = videoTrack
var audioTrack *webrtc.TrackLocalStaticSample
audioTrack, err = c.CreateTrack(peer, donutRecipe.Audio.Codec, string(entities.AudioType), params.SRTStreamID)
if err != nil {
return nil, err
}
response.Audio = audioTrack
metadataSender, err := c.CreateDataChannel(peer, entities.MetadataChannelID)
if err != nil {
return nil, err
}
response.Data = metadataSender
if err = c.SetRemoteDescription(peer, params.Offer); err != nil {
return nil, err
}
localDescription, err := c.GatheringWebRTC(peer)
if err != nil {
return nil, err
}
response.LocalSDP = localDescription
return response, nil
}
func (c *WebRTCController) CreatePeerConnection(cancel context.CancelFunc) (*webrtc.PeerConnection, error) {
c.l.Infow("trying to set up web rtc conn")
@@ -104,7 +145,6 @@ func (c *WebRTCController) SetRemoteDescription(peer *webrtc.PeerConnection, des
}
func (c *WebRTCController) GatheringWebRTC(peer *webrtc.PeerConnection) (*webrtc.SessionDescription, error) {
c.l.Infow("Gathering WebRTC Candidates")
gatherComplete := webrtc.GatheringCompletePromise(peer)
answer, err := peer.CreateAnswer(nil)
@@ -120,8 +160,8 @@ func (c *WebRTCController) GatheringWebRTC(peer *webrtc.PeerConnection) (*webrtc
return peer.LocalDescription(), nil
}
func (c *WebRTCController) SendVideoSample(videoTrack *webrtc.TrackLocalStaticSample, data []byte, mediaCtx entities.MediaFrameContext) error {
if err := videoTrack.WriteSample(media.Sample{Data: data, Duration: mediaCtx.Duration}); err != nil {
func (c *WebRTCController) SendMediaSample(mediaTrack *webrtc.TrackLocalStaticSample, data []byte, mediaCtx entities.MediaFrameContext) error {
if err := mediaTrack.WriteSample(media.Sample{Data: data, Duration: mediaCtx.Duration}); err != nil {
return err
}
return nil

View File

@@ -13,6 +13,14 @@ const (
MetadataChannelID string = "metadata"
)
type WebRTCSetupResponse struct {
Connection *webrtc.PeerConnection
Video *webrtc.TrackLocalStaticSample
Audio *webrtc.TrackLocalStaticSample
Data *webrtc.DataChannel
LocalSDP *webrtc.SessionDescription
}
type RequestParams struct {
SRTHost string
SRTPort uint16 `json:",string"`
@@ -134,7 +142,7 @@ type DonutParameters struct {
OnClose func()
OnError func(err error)
OnStream func(st *Stream)
OnStream func(st *Stream) error
OnVideoFrame func(data []byte, c MediaFrameContext) error
OnAudioFrame func(data []byte, c MediaFrameContext) error
}

View File

@@ -5,7 +5,6 @@ import (
"strings"
"github.com/asticode/go-astiav"
"github.com/asticode/go-astits"
"github.com/flavioribeiro/donut/internal/entities"
"github.com/pion/webrtc/v3"
"go.uber.org/zap"
@@ -37,43 +36,6 @@ func (m *Mapper) FromTrackToRTPCodecCapability(codec entities.Codec) webrtc.RTPC
return response
}
func (m *Mapper) FromMpegTsStreamTypeToCodec(st astits.StreamType) entities.Codec {
if st == astits.StreamTypeH264Video {
return entities.H264
}
if st == astits.StreamTypeH265Video {
return entities.H265
}
if st == astits.StreamTypeAACAudio {
return entities.AAC
}
m.l.Info("[[[[TODO: mapper not implemented]]]] for ", st)
return entities.UnknownCodec
}
func (m *Mapper) FromMpegTsStreamTypeToType(st astits.StreamType) entities.MediaType {
if st.IsVideo() {
return entities.VideoType
}
if st.IsAudio() {
return entities.AudioType
}
m.l.Info("[[[[TODO: mapper not implemented]]]] for ", st)
return entities.UnknownType
}
func (m *Mapper) FromStreamTypeToEntityStream(es *astits.PMTElementaryStream) entities.Stream {
return entities.Stream{
Codec: m.FromMpegTsStreamTypeToCodec(es.StreamType),
Type: m.FromMpegTsStreamTypeToType(es.StreamType),
Id: m.FromMpegTsStreamTypeToID(es),
}
}
func (m *Mapper) FromMpegTsStreamTypeToID(es *astits.PMTElementaryStream) uint16 {
return es.ElementaryPID
}
func (m *Mapper) FromWebRTCSessionDescriptionToStreamInfo(desc webrtc.SessionDescription) (*entities.StreamInfo, error) {
sdpDesc, err := desc.Unmarshal()
if err != nil {

View File

@@ -10,7 +10,6 @@ import (
"github.com/flavioribeiro/donut/internal/controllers/engine"
"github.com/flavioribeiro/donut/internal/entities"
"github.com/flavioribeiro/donut/internal/mapper"
"github.com/pion/webrtc/v3"
"go.uber.org/zap"
)
@@ -44,28 +43,18 @@ func (h *SignalingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) err
return err
}
// It decides which prober and streamer should be used based on the parameters (server-side protocol).
donutEngine, err := h.donut.EngineFor(&params)
if err != nil {
return err
}
// real stream info from server
serverStreamInfo, err := donutEngine.Prober().StreamInfo(&params)
// server side media info
serverStreamInfo, err := donutEngine.ServerIngredients(&params)
if err != nil {
return err
}
// client stream info support from the client (browser)
// TODO: evaluate to move this code either inside webrtc or to a prober
clientStreamInfo, err := h.mapper.FromWebRTCSessionDescriptionToStreamInfo(params.Offer)
if err != nil {
return err
}
ctx, cancel := context.WithCancel(context.Background())
peer, err := h.webRTCController.CreatePeerConnection(cancel)
// client side media support
clientStreamInfo, err := donutEngine.ClientIngredients(&params)
if err != nil {
return err
}
@@ -75,33 +64,15 @@ func (h *SignalingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) err
return entities.ErrMissingCompatibleStreams
}
var videoTrack *webrtc.TrackLocalStaticSample
videoTrack, err = h.webRTCController.CreateTrack(peer, donutRecipe.Video.Codec, string(entities.VideoType), params.SRTStreamID)
// We can't defer calling cancel here because it'll live alongside the stream.
ctx, cancel := context.WithCancel(context.Background())
webRTCResponse, err := h.webRTCController.Setup(cancel, donutRecipe, params)
if err != nil {
cancel()
return err
}
var audioTrack *webrtc.TrackLocalStaticSample
audioTrack, err = h.webRTCController.CreateTrack(peer, donutRecipe.Audio.Codec, string(entities.AudioType), params.SRTStreamID)
if err != nil {
return err
}
metadataSender, err := h.webRTCController.CreateDataChannel(peer, entities.MetadataChannelID)
if err != nil {
return err
}
if err = h.webRTCController.SetRemoteDescription(peer, params.Offer); err != nil {
return err
}
localDescription, err := h.webRTCController.GatheringWebRTC(peer)
if err != nil {
return err
}
go donutEngine.Streamer().Stream(&entities.DonutParameters{
go donutEngine.Serve(&entities.DonutParameters{
Cancel: cancel,
Ctx: ctx,
@@ -111,31 +82,28 @@ func (h *SignalingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) err
OnClose: func() {
cancel()
peer.Close()
webRTCResponse.Connection.Close()
},
OnError: func(err error) {
h.l.Errorw("error while streaming", "error", err)
},
OnStream: func(st *entities.Stream) {
if err := h.webRTCController.SendMetadata(metadataSender, st); err != nil {
h.l.Errorw("error while sending metadata", "error", err)
}
OnStream: func(st *entities.Stream) error {
return h.webRTCController.SendMetadata(webRTCResponse.Data, st)
},
OnVideoFrame: func(data []byte, c entities.MediaFrameContext) error {
return h.webRTCController.SendVideoSample(videoTrack, data, c)
return h.webRTCController.SendMediaSample(webRTCResponse.Video, data, c)
},
OnAudioFrame: func(data []byte, c entities.MediaFrameContext) error {
// TODO: implement
// audioTrack
return h.webRTCController.SendVideoSample(audioTrack, data, c)
return h.webRTCController.SendMediaSample(webRTCResponse.Audio, data, c)
},
})
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
err = json.NewEncoder(w).Encode(*localDescription)
err = json.NewEncoder(w).Encode(*webRTCResponse.LocalSDP)
if err != nil {
cancel()
return err
}