diff --git a/go.mod b/go.mod index 45ac6c6..7e9baac 100644 --- a/go.mod +++ b/go.mod @@ -5,8 +5,10 @@ go 1.19 require ( github.com/asticode/go-astisrt v0.3.0 github.com/asticode/go-astits v1.11.0 + github.com/kelseyhightower/envconfig v1.4.0 github.com/pion/webrtc/v3 v3.1.47 github.com/szatmary/gocaption v0.0.0-20220607192049-fdd59655f0c3 + go.uber.org/fx v1.20.1 ) require ( @@ -28,6 +30,10 @@ require ( github.com/pion/transport v0.13.1 // indirect github.com/pion/turn/v2 v2.0.8 // indirect github.com/pion/udp v0.1.1 // indirect + go.uber.org/atomic v1.7.0 // indirect + go.uber.org/dig v1.17.0 // indirect + go.uber.org/multierr v1.6.0 // indirect + go.uber.org/zap v1.23.0 // indirect golang.org/x/crypto v0.2.0 // indirect golang.org/x/net v0.2.0 // indirect golang.org/x/sys v0.2.0 // indirect diff --git a/go.sum b/go.sum index 611583d..1bfc9ae 100644 --- a/go.sum +++ b/go.sum @@ -1,13 +1,11 @@ -github.com/asticode/go-astikit v0.30.0 h1:DkBkRQRIxYcknlaU7W7ksNfn4gMFsB0tqMJflxkRsZA= github.com/asticode/go-astikit v0.30.0/go.mod h1:h4ly7idim1tNhaVkdVBeXQZEE3L0xblP7fCWbgwipF0= github.com/asticode/go-astikit v0.36.0 h1:WHSY88YT76D/XRbdp0lMLwfjyUGw8dygnbKKtbGNIG8= github.com/asticode/go-astikit v0.36.0/go.mod h1:h4ly7idim1tNhaVkdVBeXQZEE3L0xblP7fCWbgwipF0= -github.com/asticode/go-astisrt v0.2.0 h1:pfaSfW2BfW2O2NFtzlktzIf5gHc4Pue1q1scTuj7uhc= -github.com/asticode/go-astisrt v0.2.0/go.mod h1:tP5Dx+MXyaICUeF0gz4nwyav3RDI609e0en3QQkrxKE= github.com/asticode/go-astisrt v0.3.0 h1:LpvqOc17qfMr2suLZPzMs9wYLozxXYu/PE9CA1tH88c= github.com/asticode/go-astisrt v0.3.0/go.mod h1:tP5Dx+MXyaICUeF0gz4nwyav3RDI609e0en3QQkrxKE= github.com/asticode/go-astits v1.11.0 h1:GTHUXht0ZXAJXsVbsLIcyfHr1Bchi4QQwMARw2ZWAng= github.com/asticode/go-astits v1.11.0/go.mod h1:QSHmknZ51pf6KJdHKZHJTLlMegIrhega3LPWz3ND/iI= +github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -30,6 +28,8 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8= +github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= @@ -48,7 +48,6 @@ github.com/pion/dtls/v2 v2.1.5 h1:jlh2vtIyUBShchoTDqpCCqiYCyRFJ/lvf/gQ8TALs+c= github.com/pion/dtls/v2 v2.1.5/go.mod h1:BqCE7xPZbPSubGasRoDFJeTsyJtdD1FanJYL0JGheqY= github.com/pion/ice/v2 v2.2.11 h1:wiAy7TSrVZ4KdyjC0CcNTkwltz9ywetbe4wbHLKUbIg= github.com/pion/ice/v2 v2.2.11/go.mod h1:NqUDUao6SjSs1+4jrqpexDmFlptlVhGxQjcymXLaVvE= -github.com/pion/interceptor v0.1.11 h1:00U6OlqxA3FFB50HSg25J/8cWi7P6FbSzw4eFn24Bvs= github.com/pion/interceptor v0.1.11/go.mod h1:tbtKjZY14awXd7Bq0mmWvgtHB5MDaRN7HV3OZ/uy7s8= github.com/pion/interceptor v0.1.12 h1:CslaNriCFUItiXS5o+hh5lpL0t0ytQkFnUcbbCs2Zq8= github.com/pion/interceptor v0.1.12/go.mod h1:bDtgAD9dRkBZpWHGKaoKb42FhDHTG2rX8Ii9LRALLVA= @@ -64,7 +63,6 @@ github.com/pion/rtcp v1.2.10/go.mod h1:ztfEwXZNLGyF1oQDttz/ZKIBaeeg/oWbRYqzBM9TL github.com/pion/rtp v1.7.13 h1:qcHwlmtiI50t1XivvoawdCGTP4Uiypzfrsap+bijcoA= github.com/pion/rtp v1.7.13/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko= github.com/pion/sctp v1.8.0/go.mod h1:xFe9cLMZ5Vj6eOzpyiKjT9SwGM4KpK/8Jbw5//jc+0s= -github.com/pion/sctp v1.8.2 h1:yBBCIrUMJ4yFICL3RIvR4eh/H2BTTvlligmSTy+3kiA= github.com/pion/sctp v1.8.2/go.mod h1:xFe9cLMZ5Vj6eOzpyiKjT9SwGM4KpK/8Jbw5//jc+0s= github.com/pion/sctp v1.8.3 h1:LWcciN2ptLkw9Ugp/Ks2E76fiWy7yk3Wm79D6oFbFNo= github.com/pion/sctp v1.8.3/go.mod h1:OHbDjdk7kg+L+7TJim9q/qGVefdEJohuA2SZyihccgI= @@ -85,11 +83,13 @@ github.com/pion/udp v0.1.1 h1:8UAPvyqmsxK8oOjloDk4wUt63TzFe9WEJkg5lChlj7o= github.com/pion/udp v0.1.1/go.mod h1:6AFo+CMdKQm7UiA0eUPA8/eVCTx8jBIITLZHc9DWX5M= github.com/pion/webrtc/v3 v3.1.47 h1:2dFEKRI1rzFvehXDq43hK9OGGyTGJSusUi3j6QKHC5s= github.com/pion/webrtc/v3 v3.1.47/go.mod h1:8U39MYZCLVV4sIBn01htASVNkWQN2zDa/rx5xisEXWs= +github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/profile v1.4.0/go.mod h1:NWz/XGvpEW1FyYQ7fCx4dqYBLlfTcE+A9FLAkNKqjFE= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/sclevine/agouti v3.0.0+incompatible/go.mod h1:b4WX9W9L1sfQKXeJf1mUTLZKJ48R1S7H23Ji7oFO5Bw= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= @@ -99,11 +99,21 @@ github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PK github.com/szatmary/gocaption v0.0.0-20220607192049-fdd59655f0c3 h1:j8SVIV6YZreqjOPGjxM48tB4XgS8oUZdgy0cyN7YrBg= github.com/szatmary/gocaption v0.0.0-20220607192049-fdd59655f0c3/go.mod h1:l9r7RYKHGLuHbXpKJhJgASvi8xT+Uqxnz9B26uVU73c= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= +go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/dig v1.17.0 h1:5Chju+tUvcC+N7N6EV08BJz41UZuO3BmHcN4A287ZLI= +go.uber.org/dig v1.17.0/go.mod h1:rTxpf7l5I0eBTlE6/9RL+lDybC7WFwY2QH55ZSjy1mU= +go.uber.org/fx v1.20.1 h1:zVwVQGS8zYvhh9Xxcu4w1M6ESyeMzebzj2NbSayZ4Mk= +go.uber.org/fx v1.20.1/go.mod h1:iSYNbHf2y55acNCwCXKx7LbWb5WG1Bnue5RDXz1OREg= +go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= +go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4= +go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= +go.uber.org/zap v1.23.0 h1:OjGQ5KQDEUawVHxNwQgPpiypGHOxo2mNZsOqTak4fFY= +go.uber.org/zap v1.23.0/go.mod h1:D+nX8jyLsMHMYrln8A0rJjFt/T/9/bGgIhAqxv5URuY= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20220427172511-eb4f295cb31f/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= -golang.org/x/crypto v0.0.0-20221010152910-d6f0a8c073c2 h1:x8vtB3zMecnlqZIwJNUUpwYKYSqCz5jXbiyv0ZJJZeI= golang.org/x/crypto v0.0.0-20221010152910-d6f0a8c073c2/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.2.0 h1:BRXPfhNivWL5Yq0BGQ39a2sW6t44aODpfxkWjYdzewE= golang.org/x/crypto v0.2.0/go.mod h1:hebNnKkNXi2UzZN1eVRvBB7co0a+JxK6XbPiWVs/3J4= @@ -121,7 +131,6 @@ golang.org/x/net v0.0.0-20211201190559-0a0e4e1bb54c/go.mod h1:9nx3DQGgdP8bBQD5qx golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220531201128-c960675eff93/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.0.0-20221002022538-bcab6841153b/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk= -golang.org/x/net v0.0.0-20221004154528-8021a29435af h1:wv66FM3rLZGPdxpYL+ApnDe2HzHcTFta3z5nsc13wI4= golang.org/x/net v0.0.0-20221004154528-8021a29435af/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk= golang.org/x/net v0.2.0 h1:sZfSu1wtKLGlWI4ZZayP0ck9Y73K1ynO6gqzTdBVdPU= golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY= @@ -145,7 +154,6 @@ golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220608164250-635b8c9b7f68/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220622161953-175b2fd9d664/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20221010170243-090e33056c14 h1:k5II8e6QD8mITdi+okbbmR/cIyEbeXLBhy5Ha4nevyc= golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.2.0 h1:ljd4t30dBnAvMZaQCevtY0xLLD0A+bRZXbgLMLU1F/A= golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/internal/controller/srt/controller.go b/internal/controller/srt/controller.go new file mode 100644 index 0000000..90ed0bc --- /dev/null +++ b/internal/controller/srt/controller.go @@ -0,0 +1,49 @@ +package srt + +import ( + "log" + + astisrt "github.com/asticode/go-astisrt/pkg" + "github.com/flavioribeiro/donut/internal/entity" +) + +type SRTController struct { + c *entity.Config +} + +func NewSRTController(c *entity.Config) *SRTController { + return &SRTController{ + c: c, + } +} + +func (c *SRTController) Connect(offer *entity.ParamsOffer) error { + if err := offer.Valid(); err != nil { + return err + } + + // conn, err := c.srtConnect(offer) + // if err != nil { + // return err + // } + + return nil +} + +func (c *SRTController) srtConnect(offer *entity.ParamsOffer) (*astisrt.Connection, error) { + srtConnection, err := astisrt.Dial(astisrt.DialOptions{ + ConnectionOptions: []astisrt.ConnectionOption{ + astisrt.WithLatency(c.c.SRTConnectionLatencyMS), + astisrt.WithStreamid(offer.SRTStreamID), + }, + OnDisconnect: func(c *astisrt.Connection, err error) { + log.Fatal("Disconnected from SRT") + }, + Host: offer.SRTHost, + Port: offer.SRTPort, + }) + if err != nil { + return nil, err + } + return srtConnection, nil +} diff --git a/internal/entity/entitities.go b/internal/entity/entitities.go index 60bfa35..d37be65 100644 --- a/internal/entity/entitities.go +++ b/internal/entity/entitities.go @@ -1,7 +1,6 @@ package entity import ( - "errors" "fmt" "github.com/pion/webrtc/v3" @@ -18,19 +17,19 @@ type ParamsOffer struct { func (p *ParamsOffer) Valid() error { if p == nil { - return errors.New("ParamsOffer must not be nil") + return ErrMissingParamsOffer } if p.SRTHost == "" { - return errors.New("SRTHost must not be nil") + return ErrMissingSRTHost } if p.SRTPort == 0 { - return errors.New("SRTPort must be valid") + return ErrMissingSRTPort } if p.SRTStreamID == "" { - return errors.New("SRTStreamID must not be empty") + return ErrMissingSRTStreamID } return nil @@ -53,3 +52,18 @@ type Message struct { Type MessageType Message string } + +type Config struct { + HTTPPort int32 `required:"true" default:"8080"` + HTTPHost string `required:"true" default:"0.0.0.0"` + + TCPICEPort int `required:"true" default:"8081"` + UDPICEPort int `required:"true" default:"8081"` + ICEReadBufferSize int `required:"true" default:"8"` + ICEExternalIPsDNAT []string `required:"true" default:"127.0.0.1"` + EnableICEMux bool `require:"true" default:"false"` + StunServers []string `required:"true" default:"stun:stun4.l.google.com:19302"` + + SRTConnectionLatencyMS int32 `required:"true" default:"300"` + SRTReadBufferSizeBytes int `required:"true" default:"1316"` +} diff --git a/internal/entity/errors.go b/internal/entity/errors.go new file mode 100644 index 0000000..f9577a3 --- /dev/null +++ b/internal/entity/errors.go @@ -0,0 +1,10 @@ +package entity + +import "errors" + +var ErrHTTPGetOnly = errors.New("you must use http GET verb") +var ErrHTTPPostOnly = errors.New("you must use http POST verb") +var ErrMissingParamsOffer = errors.New("ParamsOffer must not be nil") +var ErrMissingSRTHost = errors.New("SRTHost must not be nil") +var ErrMissingSRTPort = errors.New("SRTPort must be valid") +var ErrMissingSRTStreamID = errors.New("SRTStreamID must not be empty") diff --git a/internal/web/handlers.go b/internal/web/handlers.go deleted file mode 100644 index 813278a..0000000 --- a/internal/web/handlers.go +++ /dev/null @@ -1,7 +0,0 @@ -package handlers - -import "net/http" - -func MediaInfo(w http.ResponseWriter, r *http.Request) { - -} diff --git a/internal/web/http.go b/internal/web/http.go new file mode 100644 index 0000000..f4bcbc7 --- /dev/null +++ b/internal/web/http.go @@ -0,0 +1,41 @@ +package handlers + +import ( + "context" + "fmt" + "net" + "net/http" + + "github.com/flavioribeiro/donut/internal/entity" + "go.uber.org/fx" + "go.uber.org/zap" +) + +func NewHTTPServer( + c *entity.Config, + mux *http.ServeMux, + log *zap.Logger, + lc fx.Lifecycle, +) *http.Server { + srv := &http.Server{ + Addr: fmt.Sprintf("%s:%d", c.HTTPHost, c.HTTPPort), + Handler: mux, + } + lc.Append(fx.Hook{ + OnStart: func(ctx context.Context) error { + ln, err := net.Listen("tcp", srv.Addr) + if err != nil { + return err + } + log.Sugar().Infow(fmt.Sprintf("Starting HTTP server. Open http://%s to access the demo", srv.Addr), + "addr", srv.Addr, + ) + go srv.Serve(ln) + return nil + }, + OnStop: func(ctx context.Context) error { + return srv.Shutdown(ctx) + }, + }) + return srv +} diff --git a/internal/web/ice.go b/internal/web/ice.go new file mode 100644 index 0000000..6b918ea --- /dev/null +++ b/internal/web/ice.go @@ -0,0 +1,50 @@ +package handlers + +import ( + "net" + + "github.com/flavioribeiro/donut/internal/entity" + "github.com/pion/webrtc/v3" +) + +func NewTCPICEServer(c *entity.Config) (*net.TCPListener, error) { + tcpListener, err := net.ListenTCP("tcp", &net.TCPAddr{ + IP: net.IP{0, 0, 0, 0}, + Port: c.TCPICEPort, + }) + if err != nil { + return nil, err + } + return tcpListener, nil +} + +func NewUDPICEServer(c *entity.Config) (*net.UDPConn, error) { + + udpListener, err := net.ListenUDP("udp", &net.UDPAddr{ + IP: net.IP{0, 0, 0, 0}, + Port: c.UDPICEPort, + }) + if err != nil { + return nil, err + } + + return udpListener, nil +} + +func NewWebRTCSettingsEngine(c *entity.Config, tcpListener *net.TCPListener, udpListener *net.UDPConn) *webrtc.SettingEngine { + settingEngine := &webrtc.SettingEngine{} + + settingEngine.SetNAT1To1IPs(c.ICEExternalIPsDNAT, webrtc.ICECandidateTypeHost) + settingEngine.SetICETCPMux(webrtc.NewICETCPMux(nil, tcpListener, c.ICEReadBufferSize)) + settingEngine.SetICEUDPMux(webrtc.NewICEUDPMux(nil, udpListener)) + + return settingEngine +} + +func NewWebRTCMediaEngine() (*webrtc.MediaEngine, error) { + mediaEngine := &webrtc.MediaEngine{} + if err := mediaEngine.RegisterDefaultCodecs(); err != nil { + return nil, err + } + return mediaEngine, nil +} diff --git a/internal/web/index.go b/internal/web/index.go new file mode 100644 index 0000000..1c828e0 --- /dev/null +++ b/internal/web/index.go @@ -0,0 +1,20 @@ +package handlers + +import ( + _ "embed" + "net/http" +) + +//go:embed index.html +var indexHTML string + +type IndexHandler struct{} + +func NewIndexHandler() *IndexHandler { + return &IndexHandler{} +} + +func (h *IndexHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + w.Write([]byte(indexHTML)) +} diff --git a/index.html b/internal/web/index.html similarity index 100% rename from index.html rename to internal/web/index.html diff --git a/internal/web/media.go b/internal/web/media.go new file mode 100644 index 0000000..b9b972d --- /dev/null +++ b/internal/web/media.go @@ -0,0 +1,49 @@ +package handlers + +import ( + "encoding/json" + "log" + "net/http" + + astisrt "github.com/asticode/go-astisrt/pkg" + "github.com/flavioribeiro/donut/internal/entity" +) + +type MediaHandler struct{} + +func NewMediaHandler() *MediaHandler { + return &MediaHandler{} +} + +func (m *MediaHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + SetCORS(w, r) + if r.Method != http.MethodGet { + ErrorToHTTP(w, entity.ErrHTTPGetOnly) + return + } + + offer := entity.ParamsOffer{} + if err := json.NewDecoder(r.Body).Decode(&offer); err != nil { + ErrorToHTTP(w, err) + return + } + + log.Println("Connecting to SRT ", offer) + _, err := astisrt.Dial(astisrt.DialOptions{ + ConnectionOptions: []astisrt.ConnectionOption{ + astisrt.WithLatency(300), + astisrt.WithStreamid(offer.SRTStreamID), + }, + + // Callback when the connection is disconnected + OnDisconnect: func(c *astisrt.Connection, err error) { log.Fatal("Disconnected from SRT") }, + + Host: offer.SRTHost, + Port: offer.SRTPort, + }) + if err != nil { + ErrorToHTTP(w, err) + return + } + log.Println("Connected to SRT") +} diff --git a/internal/web/router.go b/internal/web/router.go new file mode 100644 index 0000000..32e7fa2 --- /dev/null +++ b/internal/web/router.go @@ -0,0 +1,16 @@ +package handlers + +import "net/http" + +func NewServeMux( + index *IndexHandler, + signaling *SignalingHandler, +) *http.ServeMux { + + mux := http.NewServeMux() + + mux.Handle("/", index) + mux.Handle("/doSignaling", signaling) + + return mux +} diff --git a/internal/web/signaling.go b/internal/web/signaling.go new file mode 100644 index 0000000..4455ea0 --- /dev/null +++ b/internal/web/signaling.go @@ -0,0 +1,233 @@ +package handlers + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "time" + + astisrt "github.com/asticode/go-astisrt/pkg" + "github.com/asticode/go-astits" + "github.com/flavioribeiro/donut/eia608" + "github.com/flavioribeiro/donut/internal/entity" + "github.com/pion/webrtc/v3" + "github.com/pion/webrtc/v3/pkg/media" + "go.uber.org/zap" +) + +type SignalingHandler struct { + c *entity.Config + l *zap.Logger + webrtcSetting *webrtc.SettingEngine + mediaEngine *webrtc.MediaEngine +} + +func NewSignalingHandler(c *entity.Config, log *zap.Logger, webrtcSetting *webrtc.SettingEngine, mediaEngine *webrtc.MediaEngine) *SignalingHandler { + return &SignalingHandler{ + c: c, + l: log, + webrtcSetting: webrtcSetting, + mediaEngine: mediaEngine, + } +} + +func (h *SignalingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + SetCORS(w, r) + if r.Method != http.MethodPost { + ErrorToHTTP(w, entity.ErrHTTPPostOnly) + return + } + + peerConnectionConfiguration := webrtc.Configuration{} + if !h.c.EnableICEMux { + peerConnectionConfiguration.ICEServers = []webrtc.ICEServer{ + { + URLs: h.c.StunServers, + }, + } + } + + api := webrtc.NewAPI( + webrtc.WithSettingEngine(*h.webrtcSetting), + webrtc.WithMediaEngine(h.mediaEngine), + ) + + peerConnection, err := api.NewPeerConnection(peerConnectionConfiguration) + if err != nil { + ErrorToHTTP(w, err) + return + } + + offer := entity.ParamsOffer{} + if err := json.NewDecoder(r.Body).Decode(&offer); err != nil { + ErrorToHTTP(w, err) + return + } + + // Create a video track + videoTrack, err := webrtc.NewTrackLocalStaticSample( + webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264}, + "video", offer.SRTStreamID, + ) + if err != nil { + ErrorToHTTP(w, err) + return + } + if _, err := peerConnection.AddTrack(videoTrack); err != nil { + ErrorToHTTP(w, err) + return + } + + // Create data channel for metadata transmission + metadataSender, err := peerConnection.CreateDataChannel("metadata", nil) + if err != nil { + ErrorToHTTP(w, err) + } + + // Set the handler for ICE connection state + // This will notify you when the peer has connected/disconnected + peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) { + h.l.Sugar().Infow("ICE Connection State has changed", + "status", connectionState.String(), + ) + }) + + err = offer.Valid() + if err != nil { + ErrorToHTTP(w, err) + return + } + if err = peerConnection.SetRemoteDescription(offer.Offer); err != nil { + ErrorToHTTP(w, err) + return + } + + h.l.Sugar().Infow("Gathering WebRTC Candidates") + gatherComplete := webrtc.GatheringCompletePromise(peerConnection) + answer, err := peerConnection.CreateAnswer(nil) + if err != nil { + ErrorToHTTP(w, err) + return + } else if err = peerConnection.SetLocalDescription(answer); err != nil { + ErrorToHTTP(w, err) + return + } + <-gatherComplete + + h.l.Sugar().Infow("Gathering WebRTC Candidates Complete") + + response, err := json.Marshal(*peerConnection.LocalDescription()) + if err != nil { + ErrorToHTTP(w, err) + return + } + + h.l.Sugar().Infow("Connecting to SRT ", + "offer", offer, + ) + srtConnection, err := astisrt.Dial(astisrt.DialOptions{ + ConnectionOptions: []astisrt.ConnectionOption{ + astisrt.WithLatency(h.c.SRTConnectionLatencyMS), + astisrt.WithStreamid(offer.SRTStreamID), + }, + + OnDisconnect: func(c *astisrt.Connection, err error) { + h.l.Sugar().Fatalw("Disconnected from SRT", + "error", err, + ) + }, + + Host: offer.SRTHost, + Port: offer.SRTPort, + }) + if err != nil { + ErrorToHTTP(w, err) + return + } + h.l.Sugar().Infow("Connected to SRT") + + go srtToWebRTC(srtConnection, videoTrack, metadataSender) + + w.Header().Set("Content-Type", "application/json") + if _, err := w.Write(response); err != nil { + ErrorToHTTP(w, err) + return + } +} + +func srtToWebRTC(srtConnection *astisrt.Connection, videoTrack *webrtc.TrackLocalStaticSample, metadataTrack *webrtc.DataChannel) { + r, w := io.Pipe() + defer r.Close() + defer w.Close() + defer srtConnection.Close() + + go func() { + defer srtConnection.Close() + inboundMpegTsPacket := make([]byte, 1316) // SRT Read Size + + for { + n, err := srtConnection.Read(inboundMpegTsPacket) + if err != nil { + break + } + + if _, err := w.Write(inboundMpegTsPacket[:n]); err != nil { + break + } + } + }() + + dmx := astits.NewDemuxer(context.Background(), r) + eia608Reader := eia608.NewEIA608Reader() + h264PID := uint16(0) + for { + d, err := dmx.NextData() + if err != nil { + break + } + + if d.PMT != nil { + for _, es := range d.PMT.ElementaryStreams { + msg, _ := json.Marshal(entity.Message{ + Type: entity.MessageTypeMetadata, + Message: es.StreamType.String(), + }) + metadataTrack.SendText(string(msg)) + if es.StreamType == astits.StreamTypeH264Video { + h264PID = es.ElementaryPID + } + } + + for _, d := range d.PMT.ProgramDescriptors { + if d.MaximumBitrate != nil { + bitrateInMbitsPerSecond := float32(d.MaximumBitrate.Bitrate) / float32(125000) + msg, _ := json.Marshal(entity.Message{ + Type: entity.MessageTypeMetadata, + Message: fmt.Sprintf("Bitrate %.2fMbps", bitrateInMbitsPerSecond), + }) + metadataTrack.SendText(string(msg)) + } + } + } + + if d.PID == h264PID && d.PES != nil { + if err = videoTrack.WriteSample(media.Sample{Data: d.PES.Data, Duration: time.Second / 30}); err != nil { + break + } + captions, err := eia608Reader.Parse(d.PES) + if err != nil { + break + } + if captions != "" { + captionsMsg, err := eia608.BuildCaptionsMessage(d.PES.Header.OptionalHeader.PTS, captions) + if err != nil { + break + } + metadataTrack.SendText(captionsMsg) + } + } + } + +} diff --git a/helpers.go b/internal/web/util.go similarity index 68% rename from helpers.go rename to internal/web/util.go index 70cb16b..2c767be 100644 --- a/helpers.go +++ b/internal/web/util.go @@ -1,15 +1,12 @@ -package main +package handlers -import ( - "net/http" -) +import "net/http" -func errorToHTTP(w http.ResponseWriter, err error) { - w.WriteHeader(500) - w.Write([]byte(err.Error())) +func ErrorToHTTP(w http.ResponseWriter, err error) { + http.Error(w, err.Error(), http.StatusInternalServerError) } -func setCors(w http.ResponseWriter, r *http.Request) { +func SetCORS(w http.ResponseWriter, r *http.Request) { if origin := r.Header.Get("Origin"); origin != "" { allowedHeaders := "Accept, Content-Type, Content-Length, Accept-Encoding, Authorization,X-CSRF-Token" w.Header().Set("Access-Control-Allow-Origin", "*") diff --git a/main.go b/main.go index 689793f..4821abe 100644 --- a/main.go +++ b/main.go @@ -4,259 +4,49 @@ package main import ( - "context" - _ "embed" - "encoding/json" "flag" - "fmt" - "io" "log" - "net" "net/http" - "time" - "github.com/flavioribeiro/donut/eia608" "github.com/flavioribeiro/donut/internal/entity" + handlers "github.com/flavioribeiro/donut/internal/web" - astisrt "github.com/asticode/go-astisrt/pkg" - "github.com/asticode/go-astits" - "github.com/pion/webrtc/v3" - "github.com/pion/webrtc/v3/pkg/media" + "github.com/kelseyhightower/envconfig" + "go.uber.org/fx" + "go.uber.org/zap" ) -var ( - //go:embed index.html - indexHTML string - - api *webrtc.API //nolint - - enableICEMux = false -) - -func srtToWebRTC(srtConnection *astisrt.Connection, videoTrack *webrtc.TrackLocalStaticSample, metadataTrack *webrtc.DataChannel) { - r, w := io.Pipe() - defer r.Close() - defer w.Close() - defer srtConnection.Close() - - go func() { - defer srtConnection.Close() - inboundMpegTsPacket := make([]byte, 1316) // SRT Read Size - - for { - n, err := srtConnection.Read(inboundMpegTsPacket) - if err != nil { - break - } - - if _, err := w.Write(inboundMpegTsPacket[:n]); err != nil { - break - } - } - }() - - dmx := astits.NewDemuxer(context.Background(), r) - eia608Reader := eia608.NewEIA608Reader() - h264PID := uint16(0) - for { - d, err := dmx.NextData() - if err != nil { - break - } - - if d.PMT != nil { - for _, es := range d.PMT.ElementaryStreams { - msg, _ := json.Marshal(entity.Message{ - Type: entity.MessageTypeMetadata, - Message: es.StreamType.String(), - }) - metadataTrack.SendText(string(msg)) - if es.StreamType == astits.StreamTypeH264Video { - h264PID = es.ElementaryPID - } - } - - for _, d := range d.PMT.ProgramDescriptors { - if d.MaximumBitrate != nil { - bitrateInMbitsPerSecond := float32(d.MaximumBitrate.Bitrate) / float32(125000) - msg, _ := json.Marshal(entity.Message{ - Type: entity.MessageTypeMetadata, - Message: fmt.Sprintf("Bitrate %.2fMbps", bitrateInMbitsPerSecond), - }) - metadataTrack.SendText(string(msg)) - } - } - } - - if d.PID == h264PID && d.PES != nil { - if err = videoTrack.WriteSample(media.Sample{Data: d.PES.Data, Duration: time.Second / 30}); err != nil { - break - } - captions, err := eia608Reader.Parse(d.PES) - if err != nil { - break - } - if captions != "" { - captionsMsg, err := eia608.BuildCaptionsMessage(d.PES.Header.OptionalHeader.PTS, captions) - if err != nil { - break - } - metadataTrack.SendText(captionsMsg) - } - } - } - -} - -func doSignaling(w http.ResponseWriter, r *http.Request) { - setCors(w, r) - if r.Method != http.MethodPost { - return - } - - peerConnectionConfiguration := webrtc.Configuration{} - if !enableICEMux { - peerConnectionConfiguration.ICEServers = []webrtc.ICEServer{ - { - URLs: []string{ - "stun:stun4.l.google.com:19302", - }, - }, - } - } - - peerConnection, err := api.NewPeerConnection(peerConnectionConfiguration) - if err != nil { - errorToHTTP(w, err) - return - } - - offer := entity.ParamsOffer{} - - if err = json.NewDecoder(r.Body).Decode(&offer); err != nil { - errorToHTTP(w, err) - return - } - - // Create a video track - videoTrack, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264}, "video", offer.SRTStreamID) - if err != nil { - errorToHTTP(w, err) - return - } - if _, err := peerConnection.AddTrack(videoTrack); err != nil { - errorToHTTP(w, err) - return - } - - // Create data channel for metadata transmission - metadataSender, err := peerConnection.CreateDataChannel("metadata", nil) - if err != nil { - errorToHTTP(w, err) - } - - // Set the handler for ICE connection state - // This will notify you when the peer has connected/disconnected - peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) { - log.Printf("ICE Connection State has changed: %s\n", connectionState.String()) - }) - - err = offer.Valid() - if err != nil { - errorToHTTP(w, err) - return - } - - if err = peerConnection.SetRemoteDescription(offer.Offer); err != nil { - errorToHTTP(w, err) - return - } - - log.Println("Gathering WebRTC Candidates") - gatherComplete := webrtc.GatheringCompletePromise(peerConnection) - answer, err := peerConnection.CreateAnswer(nil) - if err != nil { - errorToHTTP(w, err) - return - } else if err = peerConnection.SetLocalDescription(answer); err != nil { - errorToHTTP(w, err) - return - } - <-gatherComplete - log.Println("Gathering WebRTC Candidates Complete") - - response, err := json.Marshal(*peerConnection.LocalDescription()) - if err != nil { - return - } - - log.Println("Connecting to SRT ", offer) - srtConnection, err := astisrt.Dial(astisrt.DialOptions{ - ConnectionOptions: []astisrt.ConnectionOption{ - astisrt.WithLatency(300), - astisrt.WithStreamid(offer.SRTStreamID), - }, - - // Callback when the connection is disconnected - OnDisconnect: func(c *astisrt.Connection, err error) { log.Fatal("Disconnected from SRT") }, - - Host: offer.SRTHost, - Port: offer.SRTPort, - }) - if err != nil { - errorToHTTP(w, err) - return - } - log.Println("Connected to SRT") - - go srtToWebRTC(srtConnection, videoTrack, metadataSender) - - w.Header().Set("Content-Type", "application/json") - if _, err := w.Write(response); err != nil { - errorToHTTP(w, err) - return - } -} - func main() { + enableICEMux := false flag.BoolVar(&enableICEMux, "enable-ice-mux", false, "Enable ICE Mux on :8081") flag.Parse() - mediaEngine := &webrtc.MediaEngine{} - settingEngine := webrtc.SettingEngine{} - if err := mediaEngine.RegisterDefaultCodecs(); err != nil { - log.Fatal(err) + var c entity.Config + err := envconfig.Process("donut", &c) + if err != nil { + log.Fatal(err.Error()) } + c.EnableICEMux = enableICEMux - if enableICEMux { - tcpListener, err := net.ListenTCP("tcp", &net.TCPAddr{ - IP: net.IP{0, 0, 0, 0}, - Port: 8081, - }) - if err != nil { - log.Fatal(err) - } + fx.New( + fx.Provide(func() *entity.Config { + return &c + }), + fx.Provide(handlers.NewHTTPServer), - udpListener, err := net.ListenUDP("udp", &net.UDPAddr{ - IP: net.IP{0, 0, 0, 0}, - Port: 8081, - }) - if err != nil { - log.Fatal(err) - } + fx.Provide(handlers.NewSignalingHandler), + fx.Provide(handlers.NewIndexHandler), - settingEngine.SetNAT1To1IPs([]string{"127.0.0.1"}, webrtc.ICECandidateTypeHost) - settingEngine.SetICETCPMux(webrtc.NewICETCPMux(nil, tcpListener, 8)) - settingEngine.SetICEUDPMux(webrtc.NewICEUDPMux(nil, udpListener)) - } - api = webrtc.NewAPI(webrtc.WithSettingEngine(settingEngine), webrtc.WithMediaEngine(mediaEngine)) + fx.Provide(handlers.NewServeMux), - http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(200) - w.Write([]byte(indexHTML)) - }) - http.HandleFunc("/doSignaling", doSignaling) + fx.Provide(handlers.NewTCPICEServer), + fx.Provide(handlers.NewUDPICEServer), + fx.Provide(handlers.NewWebRTCSettingsEngine), + fx.Provide(handlers.NewWebRTCMediaEngine), - log.Println("Open http://localhost:8080 to access this demo") - log.Fatal(http.ListenAndServe(":8080", nil)) + fx.Provide(zap.NewProduction), + + // just to enforce the lifecycle by using NewHTTPServer + fx.Invoke(func(*http.Server) {}), + ).Run() }