add mpeg-ts prober

This commit is contained in:
Leandro Moreira
2024-02-09 09:49:17 -03:00
parent 4ad3d0920a
commit 33710100a5
15 changed files with 242 additions and 61 deletions

1
.gitignore vendored
View File

@@ -8,6 +8,7 @@ donut
# Test binary, built with `go test -c`
*.test
coverage.out
# Output of the go coverage tool, specifically when used with LiteIDE
*.out

View File

@@ -1,9 +1,15 @@
run:
docker-compose stop && docker-compose down && docker-compose build && docker-compose up
docker-compose stop && docker-compose down && docker-compose build && docker-compose up origin srt app
test:
docker compose stop test && docker compose down test && docker compose run --rm test
test-local:
./scripts/local_run_test.sh
html-local-coverage:
go tool cover -html=coverage.out
lint:
docker compose stop lint && docker compose down lint && docker compose run --rm lint

View File

@@ -23,6 +23,7 @@ refs:
* mpegts example https://github.com/wakabayashik/mpegts-to-webrtc/blob/main/main.go
* binding go https://github.com/asticode/go-astiav
* network use https://github.com/asticode/go-astiav/issues/7
* srt live https://github.com/Haivision/srt/blob/master/docs/features/live-streaming.md
# Moving player to static

View File

@@ -20,7 +20,7 @@ services:
working_dir: "/app"
volumes:
- "./:/app/"
command: "go test ./..."
command: "go test -v ./..."
lint:
build:

2
go.mod
View File

@@ -9,6 +9,7 @@ require (
github.com/pion/webrtc/v3 v3.1.47
github.com/szatmary/gocaption v0.0.0-20220607192049-fdd59655f0c3
go.uber.org/fx v1.20.1
go.uber.org/zap v1.23.0
)
require (
@@ -33,7 +34,6 @@ require (
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/dig v1.17.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.23.0 // indirect
golang.org/x/crypto v0.2.0 // indirect
golang.org/x/net v0.2.0 // indirect
golang.org/x/sys v0.2.0 // indirect

View File

@@ -0,0 +1,7 @@
package probers
import "github.com/flavioribeiro/donut/internal/entities"
type Prober interface {
StreamInfo(req *entities.RequestParams) (map[entities.Codec]entities.Stream, error)
}

View File

@@ -0,0 +1,101 @@
package probers
import (
"context"
"errors"
"io"
"github.com/asticode/go-astits"
"github.com/flavioribeiro/donut/internal/controllers"
"github.com/flavioribeiro/donut/internal/entities"
"github.com/flavioribeiro/donut/internal/mapper"
"go.uber.org/zap"
)
type SrtMpegTs struct {
c *entities.Config
l *zap.SugaredLogger
srtController *controllers.SRTController
}
func NewSrtMpegTs(c *entities.Config, l *zap.SugaredLogger, srtController *controllers.SRTController) *SrtMpegTs {
return &SrtMpegTs{
c: c,
l: l,
srtController: srtController,
}
}
func (c *SrtMpegTs) StreamInfo(req *entities.RequestParams) (map[entities.Codec]entities.Stream, error) {
r, w := io.Pipe()
defer r.Close()
defer w.Close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
srtConnection, err := c.srtController.Connect(cancel, req)
if err != nil {
return nil, err
}
defer srtConnection.Close()
streamInfoMap := map[entities.Codec]entities.Stream{}
inboundMpegTsPacket := make([]byte, c.c.SRTReadBufferSizeBytes)
probingSize := 120
// probing mpeg-ts for N packets to find metadata
c.l.Infow("probing has started")
go func() {
for i := 1; i < probingSize; i++ {
n, err := srtConnection.Read(inboundMpegTsPacket)
if err != nil {
c.l.Errorw("str conn failed to write data to buffer",
"error", err,
)
break
}
if _, err := w.Write(inboundMpegTsPacket[:n]); err != nil {
c.l.Errorw("failed to write mpeg-ts into the pipe",
"error", err,
)
break
}
}
c.l.Info("done probing")
cancel()
}()
c.l.Info("probing has starting demuxing")
mpegTSDemuxer := astits.NewDemuxer(ctx, r)
for {
select {
case <-ctx.Done():
c.l.Errorw("streaming has stopped")
return streamInfoMap, nil
default:
mpegTSDemuxData, err := mpegTSDemuxer.NextData()
if err != nil {
if !errors.Is(err, context.Canceled) {
c.l.Errorw("failed to demux mpeg-ts",
"error", err,
)
return streamInfoMap, err
}
return streamInfoMap, nil
}
if mpegTSDemuxData.PMT != nil {
for _, es := range mpegTSDemuxData.PMT.ElementaryStreams {
streamInfoMap[mapper.FromMpegTsStreamTypeToCodec(es.StreamType)] = entities.Stream{
Codec: mapper.FromMpegTsStreamTypeToCodec(es.StreamType),
Type: mapper.FromMpegTsStreamTypeToType(es.StreamType),
}
}
}
}
}
}

View File

@@ -51,8 +51,8 @@ func NewSRTController(c *entities.Config, l *zap.SugaredLogger, lc fx.Lifecycle)
}, nil
}
func (c *SRTController) Connect(cancel context.CancelFunc, params entities.RequestParams) (*astisrt.Connection, error) {
c.l.Infow("trying to connect srt")
func (c *SRTController) Connect(cancel context.CancelFunc, params *entities.RequestParams) (*astisrt.Connection, error) {
c.l.Info("trying to connect srt")
if err := params.Valid(); err != nil {
return nil, err

View File

@@ -69,7 +69,7 @@ func (c *WebRTCController) CreatePeerConnection(cancel context.CancelFunc) (*web
return peerConnection, nil
}
func (c *WebRTCController) CreateTrack(peer *webrtc.PeerConnection, track entities.Track, id string, streamId string) (*webrtc.TrackLocalStaticSample, error) {
func (c *WebRTCController) CreateTrack(peer *webrtc.PeerConnection, track entities.Stream, id string, streamId string) (*webrtc.TrackLocalStaticSample, error) {
codecCapability := mapper.FromTrackToRTPCodecCapability(track)
webRTCtrack, err := webrtc.NewTrackLocalStaticSample(codecCapability, id, streamId)
if err != nil {

View File

@@ -57,14 +57,25 @@ type Message struct {
Message string
}
type TrackType string
type Codec string
type MediaType string
const (
H264 TrackType = "h264"
UnknownCodec Codec = "unknownCodec"
H264 Codec = "h264"
AAC Codec = "aac"
)
type Track struct {
Type TrackType
const (
UnknownType MediaType = "unknownMediaType"
VideoType MediaType = "video"
AudioTyp MediaType = "audio"
)
type Stream struct {
Codec Codec
Type MediaType
Id uint16
}
type Cue struct {
@@ -95,5 +106,10 @@ type Config struct {
StunServers []string `required:"true" default:"stun:stun.l.google.com:19302"`
SRTConnectionLatencyMS int32 `required:"true" default:"300"`
SRTReadBufferSizeBytes int `required:"true" default:"1316"`
// MPEG-TS consists of single units of 188 bytes. Multiplying 188*7 we get 1316,
// which is the maximum product of 188 that is less than MTU 1500 (188*8=1504)
// ref https://github.com/Haivision/srt/blob/master/docs/features/live-streaming.md#transmitting-mpeg-ts-binary-protocol-over-srt
SRTReadBufferSizeBytes int `required:"true" default:"1316"`
ProbingSize int `required:"true" default:"120"`
}

View File

@@ -10,3 +10,4 @@ var ErrMissingSRTPort = errors.New("SRTPort must be valid")
var ErrMissingSRTStreamID = errors.New("SRTStreamID must not be empty")
var ErrMissingWebRTCSetup = errors.New("WebRTCController.SetupPeerConnection must be called first")
var ErrMissingRemoteOffer = errors.New("nil offer, in order to connect one must pass a valid offer")
var ErrMissingRequestParams = errors.New("RequestParams must not be nil")

View File

@@ -1,16 +1,37 @@
package mapper
import (
"github.com/asticode/go-astits"
"github.com/flavioribeiro/donut/internal/entities"
"github.com/pion/webrtc/v3"
)
func FromTrackToRTPCodecCapability(track entities.Track) webrtc.RTPCodecCapability {
func FromTrackToRTPCodecCapability(track entities.Stream) webrtc.RTPCodecCapability {
response := webrtc.RTPCodecCapability{}
if track.Type == entities.H264 {
if track.Codec == entities.H264 {
response.MimeType = webrtc.MimeTypeH264
}
return response
}
func FromMpegTsStreamTypeToCodec(st astits.StreamType) entities.Codec {
if st == astits.StreamTypeH264Video {
return entities.H264
}
if st == astits.StreamTypeAACAudio {
return entities.AAC
}
return entities.UnknownCodec
}
func FromMpegTsStreamTypeToType(st astits.StreamType) entities.MediaType {
if st.IsVideo() {
return entities.VideoType
}
if st.IsAudio() {
return entities.AudioTyp
}
return entities.UnknownType
}

View File

@@ -0,0 +1,57 @@
package web
import (
"log"
"github.com/flavioribeiro/donut/internal/controllers"
"github.com/flavioribeiro/donut/internal/controllers/probers"
"github.com/flavioribeiro/donut/internal/entities"
"github.com/flavioribeiro/donut/internal/web/handlers"
"github.com/kelseyhightower/envconfig"
"go.uber.org/fx"
"go.uber.org/zap"
)
func Dependencies(enableICEMux bool) fx.Option {
var c entities.Config
err := envconfig.Process("donut", &c)
if err != nil {
log.Fatal(err.Error())
}
c.EnableICEMux = enableICEMux
return fx.Options(
// HTTP Server
fx.Provide(NewHTTPServer),
// HTTP router
fx.Provide(NewServeMux),
// HTTP handlers
fx.Provide(handlers.NewSignalingHandler),
fx.Provide(handlers.NewIndexHandler),
// ICE mux servers
fx.Provide(controllers.NewTCPICEServer),
fx.Provide(controllers.NewUDPICEServer),
// Controllers
fx.Provide(controllers.NewSRTController),
fx.Provide(controllers.NewStreamingController),
fx.Provide(controllers.NewWebRTCController),
fx.Provide(controllers.NewWebRTCSettingsEngine),
fx.Provide(controllers.NewWebRTCMediaEngine),
fx.Provide(controllers.NewWebRTCAPI),
fx.Provide(probers.NewSrtMpegTs),
// Logging, Config constructors
fx.Provide(func() *zap.SugaredLogger {
logger, _ := zap.NewProduction()
return logger.Sugar()
}),
fx.Provide(func() *entities.Config {
return &c
}),
)
}

View File

@@ -6,6 +6,7 @@ import (
"net/http"
"github.com/flavioribeiro/donut/internal/controllers"
"github.com/flavioribeiro/donut/internal/controllers/probers"
"github.com/flavioribeiro/donut/internal/entities"
"go.uber.org/zap"
)
@@ -16,6 +17,7 @@ type SignalingHandler struct {
webRTCController *controllers.WebRTCController
srtController *controllers.SRTController
streamingController *controllers.StreamingController
srtMpegTSprober *probers.SrtMpegTs
}
func NewSignalingHandler(
@@ -24,6 +26,7 @@ func NewSignalingHandler(
webRTCController *controllers.WebRTCController,
srtController *controllers.SRTController,
streamingController *controllers.StreamingController,
srtMpegTSprober *probers.SrtMpegTs,
) *SignalingHandler {
return &SignalingHandler{
c: c,
@@ -31,6 +34,7 @@ func NewSignalingHandler(
webRTCController: webRTCController,
srtController: srtController,
streamingController: streamingController,
srtMpegTSprober: srtMpegTSprober,
}
}
@@ -64,12 +68,22 @@ func (h *SignalingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) err
return err
}
data, err := h.srtMpegTSprober.StreamInfo(&params)
if err != nil {
h.l.Errorw("error while probing",
"error", err,
)
return err
}
h.l.Infow("stream info",
"data", data,
)
// TODO: create tracks according with SRT available streams
// Create a video track
videoTrack, err := h.webRTCController.CreateTrack(
peer,
entities.Track{
Type: entities.H264,
entities.Stream{
Codec: entities.H264,
}, "video", params.SRTStreamID,
)
if err != nil {
@@ -102,7 +116,7 @@ func (h *SignalingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) err
return err
}
srtConnection, err := h.srtController.Connect(cancel, params)
srtConnection, err := h.srtController.Connect(cancel, &params)
if err != nil {
h.l.Errorw("error while connecting to an srt server",
"error", err,

46
main.go
View File

@@ -5,17 +5,11 @@ package main
import (
"flag"
"log"
"net/http"
"github.com/flavioribeiro/donut/internal/controllers"
"github.com/flavioribeiro/donut/internal/entities"
"github.com/flavioribeiro/donut/internal/web"
"github.com/flavioribeiro/donut/internal/web/handlers"
"github.com/kelseyhightower/envconfig"
"go.uber.org/fx"
"go.uber.org/zap"
)
func main() {
@@ -23,46 +17,8 @@ func main() {
flag.BoolVar(&enableICEMux, "enable-ice-mux", false, "Enable ICE Mux on :8081")
flag.Parse()
var c entities.Config
err := envconfig.Process("donut", &c)
if err != nil {
log.Fatal(err.Error())
}
c.EnableICEMux = enableICEMux
fx.New(
// HTTP Server
fx.Provide(web.NewHTTPServer),
// HTTP router
fx.Provide(web.NewServeMux),
// HTTP handlers
fx.Provide(handlers.NewSignalingHandler),
fx.Provide(handlers.NewIndexHandler),
// ICE mux servers
fx.Provide(controllers.NewTCPICEServer),
fx.Provide(controllers.NewUDPICEServer),
// Controllers
fx.Provide(controllers.NewSRTController),
fx.Provide(controllers.NewStreamingController),
fx.Provide(controllers.NewWebRTCController),
fx.Provide(controllers.NewWebRTCSettingsEngine),
fx.Provide(controllers.NewWebRTCMediaEngine),
fx.Provide(controllers.NewWebRTCAPI),
// Logging, Config constructors
fx.Provide(func() *zap.SugaredLogger {
logger, _ := zap.NewProduction()
return logger.Sugar()
}),
fx.Provide(func() *entities.Config {
return &c
}),
web.Dependencies(enableICEMux),
// Forcing the lifecycle initiation with NewHTTPServer
fx.Invoke(func(*http.Server) {}),
).Run()