diff --git a/.gitignore b/.gitignore index cfb9be3..991e4fb 100644 --- a/.gitignore +++ b/.gitignore @@ -8,6 +8,7 @@ donut # Test binary, built with `go test -c` *.test +coverage.out # Output of the go coverage tool, specifically when used with LiteIDE *.out diff --git a/Makefile b/Makefile index f6b3ce2..b094581 100644 --- a/Makefile +++ b/Makefile @@ -1,9 +1,15 @@ run: - docker-compose stop && docker-compose down && docker-compose build && docker-compose up + docker-compose stop && docker-compose down && docker-compose build && docker-compose up origin srt app test: docker compose stop test && docker compose down test && docker compose run --rm test +test-local: + ./scripts/local_run_test.sh + +html-local-coverage: + go tool cover -html=coverage.out + lint: docker compose stop lint && docker compose down lint && docker compose run --rm lint diff --git a/doc/DEV_LOG.md b/doc/DEV_LOG.md index 78bd2ae..b927b32 100644 --- a/doc/DEV_LOG.md +++ b/doc/DEV_LOG.md @@ -23,6 +23,7 @@ refs: * mpegts example https://github.com/wakabayashik/mpegts-to-webrtc/blob/main/main.go * binding go https://github.com/asticode/go-astiav * network use https://github.com/asticode/go-astiav/issues/7 +* srt live https://github.com/Haivision/srt/blob/master/docs/features/live-streaming.md # Moving player to static diff --git a/docker-compose.yaml b/docker-compose.yaml index 8a0f274..8a79d96 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -20,7 +20,7 @@ services: working_dir: "/app" volumes: - "./:/app/" - command: "go test ./..." + command: "go test -v ./..." lint: build: diff --git a/go.mod b/go.mod index 7e9baac..5ac26ee 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/pion/webrtc/v3 v3.1.47 github.com/szatmary/gocaption v0.0.0-20220607192049-fdd59655f0c3 go.uber.org/fx v1.20.1 + go.uber.org/zap v1.23.0 ) require ( @@ -33,7 +34,6 @@ require ( go.uber.org/atomic v1.7.0 // indirect go.uber.org/dig v1.17.0 // indirect go.uber.org/multierr v1.6.0 // indirect - go.uber.org/zap v1.23.0 // indirect golang.org/x/crypto v0.2.0 // indirect golang.org/x/net v0.2.0 // indirect golang.org/x/sys v0.2.0 // indirect diff --git a/internal/controllers/probers/interface.go b/internal/controllers/probers/interface.go new file mode 100644 index 0000000..5601dc2 --- /dev/null +++ b/internal/controllers/probers/interface.go @@ -0,0 +1,7 @@ +package probers + +import "github.com/flavioribeiro/donut/internal/entities" + +type Prober interface { + StreamInfo(req *entities.RequestParams) (map[entities.Codec]entities.Stream, error) +} diff --git a/internal/controllers/probers/srt_mpets.go b/internal/controllers/probers/srt_mpets.go new file mode 100644 index 0000000..da6073a --- /dev/null +++ b/internal/controllers/probers/srt_mpets.go @@ -0,0 +1,101 @@ +package probers + +import ( + "context" + "errors" + "io" + + "github.com/asticode/go-astits" + "github.com/flavioribeiro/donut/internal/controllers" + "github.com/flavioribeiro/donut/internal/entities" + "github.com/flavioribeiro/donut/internal/mapper" + "go.uber.org/zap" +) + +type SrtMpegTs struct { + c *entities.Config + l *zap.SugaredLogger + srtController *controllers.SRTController +} + +func NewSrtMpegTs(c *entities.Config, l *zap.SugaredLogger, srtController *controllers.SRTController) *SrtMpegTs { + return &SrtMpegTs{ + c: c, + l: l, + srtController: srtController, + } +} + +func (c *SrtMpegTs) StreamInfo(req *entities.RequestParams) (map[entities.Codec]entities.Stream, error) { + r, w := io.Pipe() + defer r.Close() + defer w.Close() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + srtConnection, err := c.srtController.Connect(cancel, req) + if err != nil { + return nil, err + } + defer srtConnection.Close() + + streamInfoMap := map[entities.Codec]entities.Stream{} + inboundMpegTsPacket := make([]byte, c.c.SRTReadBufferSizeBytes) + + probingSize := 120 + // probing mpeg-ts for N packets to find metadata + c.l.Infow("probing has started") + go func() { + for i := 1; i < probingSize; i++ { + n, err := srtConnection.Read(inboundMpegTsPacket) + if err != nil { + c.l.Errorw("str conn failed to write data to buffer", + "error", err, + ) + break + } + + if _, err := w.Write(inboundMpegTsPacket[:n]); err != nil { + c.l.Errorw("failed to write mpeg-ts into the pipe", + "error", err, + ) + break + } + } + c.l.Info("done probing") + cancel() + }() + c.l.Info("probing has starting demuxing") + + mpegTSDemuxer := astits.NewDemuxer(ctx, r) + for { + select { + case <-ctx.Done(): + c.l.Errorw("streaming has stopped") + return streamInfoMap, nil + default: + mpegTSDemuxData, err := mpegTSDemuxer.NextData() + + if err != nil { + if !errors.Is(err, context.Canceled) { + c.l.Errorw("failed to demux mpeg-ts", + "error", err, + ) + return streamInfoMap, err + } + return streamInfoMap, nil + } + + if mpegTSDemuxData.PMT != nil { + + for _, es := range mpegTSDemuxData.PMT.ElementaryStreams { + streamInfoMap[mapper.FromMpegTsStreamTypeToCodec(es.StreamType)] = entities.Stream{ + Codec: mapper.FromMpegTsStreamTypeToCodec(es.StreamType), + Type: mapper.FromMpegTsStreamTypeToType(es.StreamType), + } + } + } + } + } +} diff --git a/internal/controllers/srt_controller.go b/internal/controllers/srt_controller.go index d846f12..2c18ae1 100644 --- a/internal/controllers/srt_controller.go +++ b/internal/controllers/srt_controller.go @@ -51,8 +51,8 @@ func NewSRTController(c *entities.Config, l *zap.SugaredLogger, lc fx.Lifecycle) }, nil } -func (c *SRTController) Connect(cancel context.CancelFunc, params entities.RequestParams) (*astisrt.Connection, error) { - c.l.Infow("trying to connect srt") +func (c *SRTController) Connect(cancel context.CancelFunc, params *entities.RequestParams) (*astisrt.Connection, error) { + c.l.Info("trying to connect srt") if err := params.Valid(); err != nil { return nil, err diff --git a/internal/controllers/webrtc_controller.go b/internal/controllers/webrtc_controller.go index beb0d0e..f85297f 100644 --- a/internal/controllers/webrtc_controller.go +++ b/internal/controllers/webrtc_controller.go @@ -69,7 +69,7 @@ func (c *WebRTCController) CreatePeerConnection(cancel context.CancelFunc) (*web return peerConnection, nil } -func (c *WebRTCController) CreateTrack(peer *webrtc.PeerConnection, track entities.Track, id string, streamId string) (*webrtc.TrackLocalStaticSample, error) { +func (c *WebRTCController) CreateTrack(peer *webrtc.PeerConnection, track entities.Stream, id string, streamId string) (*webrtc.TrackLocalStaticSample, error) { codecCapability := mapper.FromTrackToRTPCodecCapability(track) webRTCtrack, err := webrtc.NewTrackLocalStaticSample(codecCapability, id, streamId) if err != nil { diff --git a/internal/entities/entities.go b/internal/entities/entities.go index de6e58f..df2b274 100644 --- a/internal/entities/entities.go +++ b/internal/entities/entities.go @@ -57,14 +57,25 @@ type Message struct { Message string } -type TrackType string +type Codec string +type MediaType string const ( - H264 TrackType = "h264" + UnknownCodec Codec = "unknownCodec" + H264 Codec = "h264" + AAC Codec = "aac" ) -type Track struct { - Type TrackType +const ( + UnknownType MediaType = "unknownMediaType" + VideoType MediaType = "video" + AudioTyp MediaType = "audio" +) + +type Stream struct { + Codec Codec + Type MediaType + Id uint16 } type Cue struct { @@ -95,5 +106,10 @@ type Config struct { StunServers []string `required:"true" default:"stun:stun.l.google.com:19302"` SRTConnectionLatencyMS int32 `required:"true" default:"300"` - SRTReadBufferSizeBytes int `required:"true" default:"1316"` + // MPEG-TS consists of single units of 188 bytes. Multiplying 188*7 we get 1316, + // which is the maximum product of 188 that is less than MTU 1500 (188*8=1504) + // ref https://github.com/Haivision/srt/blob/master/docs/features/live-streaming.md#transmitting-mpeg-ts-binary-protocol-over-srt + SRTReadBufferSizeBytes int `required:"true" default:"1316"` + + ProbingSize int `required:"true" default:"120"` } diff --git a/internal/entities/errors.go b/internal/entities/errors.go index 6b1512c..cbcf78d 100644 --- a/internal/entities/errors.go +++ b/internal/entities/errors.go @@ -10,3 +10,4 @@ var ErrMissingSRTPort = errors.New("SRTPort must be valid") var ErrMissingSRTStreamID = errors.New("SRTStreamID must not be empty") var ErrMissingWebRTCSetup = errors.New("WebRTCController.SetupPeerConnection must be called first") var ErrMissingRemoteOffer = errors.New("nil offer, in order to connect one must pass a valid offer") +var ErrMissingRequestParams = errors.New("RequestParams must not be nil") diff --git a/internal/mapper/mapper.go b/internal/mapper/mapper.go index 10ee19c..10e8c49 100644 --- a/internal/mapper/mapper.go +++ b/internal/mapper/mapper.go @@ -1,16 +1,37 @@ package mapper import ( + "github.com/asticode/go-astits" "github.com/flavioribeiro/donut/internal/entities" "github.com/pion/webrtc/v3" ) -func FromTrackToRTPCodecCapability(track entities.Track) webrtc.RTPCodecCapability { +func FromTrackToRTPCodecCapability(track entities.Stream) webrtc.RTPCodecCapability { response := webrtc.RTPCodecCapability{} - if track.Type == entities.H264 { + if track.Codec == entities.H264 { response.MimeType = webrtc.MimeTypeH264 } return response } + +func FromMpegTsStreamTypeToCodec(st astits.StreamType) entities.Codec { + if st == astits.StreamTypeH264Video { + return entities.H264 + } + if st == astits.StreamTypeAACAudio { + return entities.AAC + } + return entities.UnknownCodec +} + +func FromMpegTsStreamTypeToType(st astits.StreamType) entities.MediaType { + if st.IsVideo() { + return entities.VideoType + } + if st.IsAudio() { + return entities.AudioTyp + } + return entities.UnknownType +} diff --git a/internal/web/dependencies.go b/internal/web/dependencies.go new file mode 100644 index 0000000..cbe777f --- /dev/null +++ b/internal/web/dependencies.go @@ -0,0 +1,57 @@ +package web + +import ( + "log" + + "github.com/flavioribeiro/donut/internal/controllers" + "github.com/flavioribeiro/donut/internal/controllers/probers" + "github.com/flavioribeiro/donut/internal/entities" + "github.com/flavioribeiro/donut/internal/web/handlers" + "github.com/kelseyhightower/envconfig" + "go.uber.org/fx" + "go.uber.org/zap" +) + +func Dependencies(enableICEMux bool) fx.Option { + var c entities.Config + err := envconfig.Process("donut", &c) + if err != nil { + log.Fatal(err.Error()) + } + c.EnableICEMux = enableICEMux + + return fx.Options( + // HTTP Server + fx.Provide(NewHTTPServer), + + // HTTP router + fx.Provide(NewServeMux), + + // HTTP handlers + fx.Provide(handlers.NewSignalingHandler), + fx.Provide(handlers.NewIndexHandler), + + // ICE mux servers + fx.Provide(controllers.NewTCPICEServer), + fx.Provide(controllers.NewUDPICEServer), + + // Controllers + fx.Provide(controllers.NewSRTController), + fx.Provide(controllers.NewStreamingController), + + fx.Provide(controllers.NewWebRTCController), + fx.Provide(controllers.NewWebRTCSettingsEngine), + fx.Provide(controllers.NewWebRTCMediaEngine), + fx.Provide(controllers.NewWebRTCAPI), + fx.Provide(probers.NewSrtMpegTs), + + // Logging, Config constructors + fx.Provide(func() *zap.SugaredLogger { + logger, _ := zap.NewProduction() + return logger.Sugar() + }), + fx.Provide(func() *entities.Config { + return &c + }), + ) +} diff --git a/internal/web/handlers/signaling.go b/internal/web/handlers/signaling.go index c6dad09..4172eb8 100644 --- a/internal/web/handlers/signaling.go +++ b/internal/web/handlers/signaling.go @@ -6,6 +6,7 @@ import ( "net/http" "github.com/flavioribeiro/donut/internal/controllers" + "github.com/flavioribeiro/donut/internal/controllers/probers" "github.com/flavioribeiro/donut/internal/entities" "go.uber.org/zap" ) @@ -16,6 +17,7 @@ type SignalingHandler struct { webRTCController *controllers.WebRTCController srtController *controllers.SRTController streamingController *controllers.StreamingController + srtMpegTSprober *probers.SrtMpegTs } func NewSignalingHandler( @@ -24,6 +26,7 @@ func NewSignalingHandler( webRTCController *controllers.WebRTCController, srtController *controllers.SRTController, streamingController *controllers.StreamingController, + srtMpegTSprober *probers.SrtMpegTs, ) *SignalingHandler { return &SignalingHandler{ c: c, @@ -31,6 +34,7 @@ func NewSignalingHandler( webRTCController: webRTCController, srtController: srtController, streamingController: streamingController, + srtMpegTSprober: srtMpegTSprober, } } @@ -64,12 +68,22 @@ func (h *SignalingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) err return err } + data, err := h.srtMpegTSprober.StreamInfo(¶ms) + if err != nil { + h.l.Errorw("error while probing", + "error", err, + ) + return err + } + h.l.Infow("stream info", + "data", data, + ) // TODO: create tracks according with SRT available streams // Create a video track videoTrack, err := h.webRTCController.CreateTrack( peer, - entities.Track{ - Type: entities.H264, + entities.Stream{ + Codec: entities.H264, }, "video", params.SRTStreamID, ) if err != nil { @@ -102,7 +116,7 @@ func (h *SignalingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) err return err } - srtConnection, err := h.srtController.Connect(cancel, params) + srtConnection, err := h.srtController.Connect(cancel, ¶ms) if err != nil { h.l.Errorw("error while connecting to an srt server", "error", err, diff --git a/main.go b/main.go index 8f09eac..fee7fdc 100644 --- a/main.go +++ b/main.go @@ -5,17 +5,11 @@ package main import ( "flag" - "log" "net/http" - "github.com/flavioribeiro/donut/internal/controllers" - "github.com/flavioribeiro/donut/internal/entities" "github.com/flavioribeiro/donut/internal/web" - "github.com/flavioribeiro/donut/internal/web/handlers" - "github.com/kelseyhightower/envconfig" "go.uber.org/fx" - "go.uber.org/zap" ) func main() { @@ -23,46 +17,8 @@ func main() { flag.BoolVar(&enableICEMux, "enable-ice-mux", false, "Enable ICE Mux on :8081") flag.Parse() - var c entities.Config - err := envconfig.Process("donut", &c) - if err != nil { - log.Fatal(err.Error()) - } - c.EnableICEMux = enableICEMux - fx.New( - // HTTP Server - fx.Provide(web.NewHTTPServer), - - // HTTP router - fx.Provide(web.NewServeMux), - - // HTTP handlers - fx.Provide(handlers.NewSignalingHandler), - fx.Provide(handlers.NewIndexHandler), - - // ICE mux servers - fx.Provide(controllers.NewTCPICEServer), - fx.Provide(controllers.NewUDPICEServer), - - // Controllers - fx.Provide(controllers.NewSRTController), - fx.Provide(controllers.NewStreamingController), - - fx.Provide(controllers.NewWebRTCController), - fx.Provide(controllers.NewWebRTCSettingsEngine), - fx.Provide(controllers.NewWebRTCMediaEngine), - fx.Provide(controllers.NewWebRTCAPI), - - // Logging, Config constructors - fx.Provide(func() *zap.SugaredLogger { - logger, _ := zap.NewProduction() - return logger.Sugar() - }), - fx.Provide(func() *entities.Config { - return &c - }), - + web.Dependencies(enableICEMux), // Forcing the lifecycle initiation with NewHTTPServer fx.Invoke(func(*http.Server) {}), ).Run()