move components to plural named packages

This commit is contained in:
Leandro Moreira
2024-01-28 07:05:31 -03:00
parent 852ad6c1ad
commit 74d84b13f2
15 changed files with 157 additions and 156 deletions

View File

@@ -1,4 +1,4 @@
package streaming
package controllers
import (
"context"
@@ -10,18 +10,18 @@ import (
astisrt "github.com/asticode/go-astisrt/pkg"
"github.com/asticode/go-astits"
"github.com/flavioribeiro/donut/eia608"
"github.com/flavioribeiro/donut/internal/entity"
"github.com/flavioribeiro/donut/internal/entities"
"github.com/pion/webrtc/v3"
"github.com/pion/webrtc/v3/pkg/media"
"go.uber.org/zap"
)
type StreamingController struct {
c *entity.Config
c *entities.Config
l *zap.Logger
}
func NewStreamingController(c *entity.Config, l *zap.Logger) *StreamingController {
func NewStreamingController(c *entities.Config, l *zap.Logger) *StreamingController {
return &StreamingController{
c: c,
l: l,
@@ -90,8 +90,8 @@ func (*StreamingController) captureBitrateAndSendToWebRTC(d *astits.DemuxerData,
for _, d := range d.PMT.ProgramDescriptors {
if d.MaximumBitrate != nil {
bitrateInMbitsPerSecond := float32(d.MaximumBitrate.Bitrate) / float32(125000)
msg, _ := json.Marshal(entity.Message{
Type: entity.MessageTypeMetadata,
msg, _ := json.Marshal(entities.Message{
Type: entities.MessageTypeMetadata,
Message: fmt.Sprintf("Bitrate %.2fMbps", bitrateInMbitsPerSecond),
})
metadataTrack.SendText(string(msg))
@@ -102,8 +102,8 @@ func (*StreamingController) captureBitrateAndSendToWebRTC(d *astits.DemuxerData,
func (*StreamingController) captureMediaInfoAndSendToWebRTC(d *astits.DemuxerData, metadataTrack *webrtc.DataChannel, h264PID uint16) uint16 {
for _, es := range d.PMT.ElementaryStreams {
msg, _ := json.Marshal(entity.Message{
Type: entity.MessageTypeMetadata,
msg, _ := json.Marshal(entities.Message{
Type: entities.MessageTypeMetadata,
Message: es.StreamType.String(),
})
metadataTrack.SendText(string(msg))

View File

@@ -1,26 +1,26 @@
package srt
package controllers
import (
astisrt "github.com/asticode/go-astisrt/pkg"
"github.com/flavioribeiro/donut/internal/entity"
"github.com/flavioribeiro/donut/internal/entities"
"go.uber.org/zap"
)
type SRTController struct {
c *entity.Config
c *entities.Config
l *zap.Logger
}
func NewSRTController(c *entity.Config, l *zap.Logger) *SRTController {
func NewSRTController(c *entities.Config, l *zap.Logger) *SRTController {
return &SRTController{
c: c,
l: l,
}
}
func (c *SRTController) Connect(params *entity.RequestParams) (*astisrt.Connection, error) {
func (c *SRTController) Connect(params *entities.RequestParams) (*astisrt.Connection, error) {
if params == nil {
return nil, entity.ErrMissingRemoteOffer
return nil, entities.ErrMissingRemoteOffer
}
if err := params.Valid(); err != nil {

View File

@@ -1,16 +1,16 @@
package webrtc
package controllers
import (
"net"
"github.com/flavioribeiro/donut/internal/entity"
"github.com/flavioribeiro/donut/internal/entities"
"github.com/flavioribeiro/donut/internal/mapper"
"github.com/pion/webrtc/v3"
"go.uber.org/zap"
)
type WebRTCController struct {
c *entity.Config
c *entities.Config
l *zap.Logger
iceTcp net.Listener
iceUdp net.PacketConn
@@ -18,7 +18,7 @@ type WebRTCController struct {
}
func NewWebRTCController(
c *entity.Config,
c *entities.Config,
l *zap.Logger,
iceTcp net.Listener,
iceUdp net.PacketConn,
@@ -73,7 +73,7 @@ func (c *WebRTCController) SetupPeerConnection() error {
return nil
}
func (c *WebRTCController) CreateTrack(track entity.Track, id string, streamId string) (*webrtc.TrackLocalStaticSample, error) {
func (c *WebRTCController) CreateTrack(track entities.Track, id string, streamId string) (*webrtc.TrackLocalStaticSample, error) {
codecCapability := mapper.FromTrackToRTPCodecCapability(track)
webRTCtrack, err := webrtc.NewTrackLocalStaticSample(codecCapability, id, streamId)
if err != nil {
@@ -89,7 +89,7 @@ func (c *WebRTCController) CreateTrack(track entity.Track, id string, streamId s
func (c *WebRTCController) CreateDataChannel(channelID string) (*webrtc.DataChannel, error) {
if c.peer == nil {
// TODO: or call SetupPeerConnection?
return nil, entity.ErrMissingWebRTCSetup
return nil, entities.ErrMissingWebRTCSetup
}
metadataSender, err := c.peer.CreateDataChannel(channelID, nil)
@@ -102,7 +102,7 @@ func (c *WebRTCController) CreateDataChannel(channelID string) (*webrtc.DataChan
func (c *WebRTCController) SetRemoteDescription(desc webrtc.SessionDescription) error {
if c.peer == nil {
// TODO: or call SetupPeerConnection?
return entity.ErrMissingWebRTCSetup
return entities.ErrMissingWebRTCSetup
}
err := c.peer.SetRemoteDescription(desc)
@@ -115,7 +115,7 @@ func (c *WebRTCController) SetRemoteDescription(desc webrtc.SessionDescription)
func (c *WebRTCController) GatheringWebRTC() (*webrtc.SessionDescription, error) {
if c.peer == nil {
// TODO: or call SetupPeerConnection?
return nil, entity.ErrMissingWebRTCSetup
return nil, entities.ErrMissingWebRTCSetup
}
c.l.Sugar().Infow("Gathering WebRTC Candidates")
@@ -132,7 +132,7 @@ func (c *WebRTCController) GatheringWebRTC() (*webrtc.SessionDescription, error)
return c.peer.LocalDescription(), nil
}
func NewWebRTCSettingsEngine(c *entity.Config, tcpListener net.Listener, udpListener net.PacketConn) webrtc.SettingEngine {
func NewWebRTCSettingsEngine(c *entities.Config, tcpListener net.Listener, udpListener net.PacketConn) webrtc.SettingEngine {
settingEngine := webrtc.SettingEngine{}
settingEngine.SetNAT1To1IPs(c.ICEExternalIPsDNAT, webrtc.ICECandidateTypeHost)
@@ -150,7 +150,7 @@ func NewWebRTCMediaEngine() (*webrtc.MediaEngine, error) {
return mediaEngine, nil
}
func NewTCPICEServer(c *entity.Config) (net.Listener, error) {
func NewTCPICEServer(c *entities.Config) (net.Listener, error) {
tcpListener, err := net.ListenTCP("tcp", &net.TCPAddr{
IP: net.IP{0, 0, 0, 0},
Port: c.TCPICEPort,
@@ -161,8 +161,7 @@ func NewTCPICEServer(c *entity.Config) (net.Listener, error) {
return tcpListener, nil
}
func NewUDPICEServer(c *entity.Config) (net.PacketConn, error) {
func NewUDPICEServer(c *entities.Config) (net.PacketConn, error) {
udpListener, err := net.ListenUDP("udp", &net.UDPAddr{
IP: net.IP{0, 0, 0, 0},
Port: c.UDPICEPort,
@@ -170,6 +169,5 @@ func NewUDPICEServer(c *entity.Config) (net.PacketConn, error) {
if err != nil {
return nil, err
}
return udpListener, nil
}

View File

@@ -1,4 +1,4 @@
package entity
package entities
import (
"fmt"

View File

@@ -1,4 +1,4 @@
package entity
package entities
import "errors"

View File

@@ -1,14 +1,14 @@
package mapper
import (
"github.com/flavioribeiro/donut/internal/entity"
"github.com/flavioribeiro/donut/internal/entities"
"github.com/pion/webrtc/v3"
)
func FromTrackToRTPCodecCapability(track entity.Track) webrtc.RTPCodecCapability {
func FromTrackToRTPCodecCapability(track entities.Track) webrtc.RTPCodecCapability {
response := webrtc.RTPCodecCapability{}
if track.Type == entity.H264 {
if track.Type == entities.H264 {
response.MimeType = webrtc.MimeTypeH264
}

View File

@@ -0,0 +1,12 @@
package handlers
import "net/http"
func SetError(w http.ResponseWriter, err error) {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
func SetSuccessJson(w http.ResponseWriter) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
}

View File

@@ -6,7 +6,7 @@ import (
"net/http"
astisrt "github.com/asticode/go-astisrt/pkg"
"github.com/flavioribeiro/donut/internal/entity"
"github.com/flavioribeiro/donut/internal/entities"
)
type MediaHandler struct{}
@@ -16,33 +16,32 @@ func NewMediaHandler() *MediaHandler {
}
func (m *MediaHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
SetCORS(w, r)
if r.Method != http.MethodGet {
ErrorToHTTP(w, entity.ErrHTTPGetOnly)
SetError(w, entities.ErrHTTPGetOnly)
return
}
offer := entity.RequestParams{}
if err := json.NewDecoder(r.Body).Decode(&offer); err != nil {
ErrorToHTTP(w, err)
params := entities.RequestParams{}
if err := json.NewDecoder(r.Body).Decode(&params); err != nil {
SetError(w, err)
return
}
log.Println("Connecting to SRT ", offer)
log.Println("Connecting to SRT ", params)
_, err := astisrt.Dial(astisrt.DialOptions{
ConnectionOptions: []astisrt.ConnectionOption{
astisrt.WithLatency(300),
astisrt.WithStreamid(offer.SRTStreamID),
astisrt.WithStreamid(params.SRTStreamID),
},
// Callback when the connection is disconnected
OnDisconnect: func(c *astisrt.Connection, err error) { log.Fatal("Disconnected from SRT") },
Host: offer.SRTHost,
Port: offer.SRTPort,
Host: params.SRTHost,
Port: params.SRTPort,
})
if err != nil {
ErrorToHTTP(w, err)
SetError(w, err)
return
}
log.Println("Connected to SRT")

View File

@@ -4,27 +4,25 @@ import (
"encoding/json"
"net/http"
donutsrt "github.com/flavioribeiro/donut/internal/controller/srt"
donutstreaming "github.com/flavioribeiro/donut/internal/controller/streaming"
donutwebrtc "github.com/flavioribeiro/donut/internal/controller/webrtc"
"github.com/flavioribeiro/donut/internal/entity"
"github.com/flavioribeiro/donut/internal/controllers"
"github.com/flavioribeiro/donut/internal/entities"
"go.uber.org/zap"
)
type SignalingHandler struct {
c *entity.Config
c *entities.Config
l *zap.Logger
webRTCController *donutwebrtc.WebRTCController
srtController *donutsrt.SRTController
streamingController *donutstreaming.StreamingController
webRTCController *controllers.WebRTCController
srtController *controllers.SRTController
streamingController *controllers.StreamingController
}
func NewSignalingHandler(
c *entity.Config,
c *entities.Config,
log *zap.Logger,
webRTCController *donutwebrtc.WebRTCController,
srtController *donutsrt.SRTController,
streamingController *donutstreaming.StreamingController,
webRTCController *controllers.WebRTCController,
srtController *controllers.SRTController,
streamingController *controllers.StreamingController,
) *SignalingHandler {
return &SignalingHandler{
c: c,
@@ -36,71 +34,70 @@ func NewSignalingHandler(
}
func (h *SignalingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
SetCORS(w, r)
if r.Method != http.MethodPost {
ErrorToHTTP(w, entity.ErrHTTPPostOnly)
SetError(w, entities.ErrHTTPPostOnly)
return
}
params := entity.RequestParams{}
params := entities.RequestParams{}
if err := json.NewDecoder(r.Body).Decode(&params); err != nil {
ErrorToHTTP(w, err)
SetError(w, err)
return
}
if err := params.Valid(); err != nil {
ErrorToHTTP(w, err)
SetError(w, err)
return
}
if err := h.webRTCController.SetupPeerConnection(); err != nil {
ErrorToHTTP(w, err)
SetError(w, err)
return
}
// TODO: create tracks according with SRT available streams
// Create a video track
videoTrack, err := h.webRTCController.CreateTrack(
entity.Track{
Type: entity.H264,
entities.Track{
Type: entities.H264,
}, "video", params.SRTStreamID,
)
if err != nil {
ErrorToHTTP(w, err)
SetError(w, err)
return
}
metadataSender, err := h.webRTCController.CreateDataChannel(entity.MetadataChannelID)
metadataSender, err := h.webRTCController.CreateDataChannel(entities.MetadataChannelID)
if err != nil {
ErrorToHTTP(w, err)
SetError(w, err)
}
if err = h.webRTCController.SetRemoteDescription(params.Offer); err != nil {
ErrorToHTTP(w, err)
SetError(w, err)
return
}
localDescription, err := h.webRTCController.GatheringWebRTC()
if err != nil {
ErrorToHTTP(w, err)
SetError(w, err)
return
}
localOfferDescription, err := json.Marshal(*localDescription)
if err != nil {
ErrorToHTTP(w, err)
SetError(w, err)
return
}
srtConnection, err := h.srtController.Connect(&params)
if err != nil {
ErrorToHTTP(w, err)
SetError(w, err)
return
}
go h.streamingController.Stream(srtConnection, videoTrack, metadataSender)
if _, err := w.Write(localOfferDescription); err != nil {
ErrorToHTTP(w, err)
SetError(w, err)
return
}
SetSuccessJson(w)

View File

@@ -1,60 +0,0 @@
package handlers
import (
"context"
"fmt"
"net"
"net/http"
"github.com/flavioribeiro/donut/internal/entity"
"go.uber.org/fx"
"go.uber.org/zap"
)
func NewHTTPServer(
c *entity.Config,
mux *http.ServeMux,
log *zap.Logger,
lc fx.Lifecycle,
) *http.Server {
srv := &http.Server{
Addr: fmt.Sprintf("%s:%d", c.HTTPHost, c.HTTPPort),
Handler: mux,
}
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
ln, err := net.Listen("tcp", srv.Addr)
if err != nil {
return err
}
log.Sugar().Infow(fmt.Sprintf("Starting HTTP server. Open http://%s to access the demo", srv.Addr),
"addr", srv.Addr,
)
go srv.Serve(ln)
return nil
},
OnStop: func(ctx context.Context) error {
return srv.Shutdown(ctx)
},
})
return srv
}
func ErrorToHTTP(w http.ResponseWriter, err error) {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
func SetCORS(w http.ResponseWriter, r *http.Request) {
if origin := r.Header.Get("Origin"); origin != "" {
allowedHeaders := "Accept, Content-Type, Content-Length, Accept-Encoding, Authorization,X-CSRF-Token"
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE")
w.Header().Set("Access-Control-Allow-Headers", allowedHeaders)
w.Header().Set("Access-Control-Expose-Headers", "Authorization")
}
}
func SetSuccessJson(w http.ResponseWriter) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
}

View File

@@ -1,16 +1,33 @@
package handlers
package web
import "net/http"
import (
"net/http"
"github.com/flavioribeiro/donut/internal/web/handlers"
)
func NewServeMux(
index *IndexHandler,
signaling *SignalingHandler,
index *handlers.IndexHandler,
signaling *handlers.SignalingHandler,
) *http.ServeMux {
mux := http.NewServeMux()
mux.Handle("/", index)
mux.Handle("/doSignaling", signaling)
mux.Handle("/doSignaling", setCors(signaling))
return mux
}
func setCors(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if origin := r.Header.Get("Origin"); origin != "" {
allowedHeaders := "Accept, Content-Type, Content-Length, Accept-Encoding, Authorization,X-CSRF-Token"
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE")
w.Header().Set("Access-Control-Allow-Headers", allowedHeaders)
w.Header().Set("Access-Control-Expose-Headers", "Authorization")
}
next.ServeHTTP(w, r)
})
}

41
internal/web/server.go Normal file
View File

@@ -0,0 +1,41 @@
package web
import (
"context"
"fmt"
"net"
"net/http"
"github.com/flavioribeiro/donut/internal/entities"
"go.uber.org/fx"
"go.uber.org/zap"
)
func NewHTTPServer(
c *entities.Config,
mux *http.ServeMux,
log *zap.Logger,
lc fx.Lifecycle,
) *http.Server {
srv := &http.Server{
Addr: fmt.Sprintf("%s:%d", c.HTTPHost, c.HTTPPort),
Handler: mux,
}
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
ln, err := net.Listen("tcp", srv.Addr)
if err != nil {
return err
}
log.Sugar().Infow(fmt.Sprintf("Starting HTTP server. Open http://%s to access the demo", srv.Addr),
"addr", srv.Addr,
)
go srv.Serve(ln)
return nil
},
OnStop: func(ctx context.Context) error {
return srv.Shutdown(ctx)
},
})
return srv
}

37
main.go
View File

@@ -8,11 +8,10 @@ import (
"log"
"net/http"
donutsrt "github.com/flavioribeiro/donut/internal/controller/srt"
donutstreaming "github.com/flavioribeiro/donut/internal/controller/streaming"
donutwebrtc "github.com/flavioribeiro/donut/internal/controller/webrtc"
"github.com/flavioribeiro/donut/internal/entity"
handlers "github.com/flavioribeiro/donut/internal/web"
"github.com/flavioribeiro/donut/internal/controllers"
"github.com/flavioribeiro/donut/internal/entities"
"github.com/flavioribeiro/donut/internal/web"
"github.com/flavioribeiro/donut/internal/web/handlers"
"github.com/kelseyhightower/envconfig"
"go.uber.org/fx"
@@ -24,7 +23,7 @@ func main() {
flag.BoolVar(&enableICEMux, "enable-ice-mux", false, "Enable ICE Mux on :8081")
flag.Parse()
var c entity.Config
var c entities.Config
err := envconfig.Process("donut", &c)
if err != nil {
log.Fatal(err.Error())
@@ -32,30 +31,28 @@ func main() {
c.EnableICEMux = enableICEMux
fx.New(
// Server entry point
fx.Provide(handlers.NewHTTPServer),
// HTTP Server
fx.Provide(web.NewHTTPServer),
// HTTP router
fx.Provide(web.NewServeMux),
// HTTP handlers
fx.Provide(handlers.NewSignalingHandler),
fx.Provide(handlers.NewIndexHandler),
// HTTP router
fx.Provide(handlers.NewServeMux),
// ICE mux servers
fx.Provide(donutwebrtc.NewTCPICEServer),
fx.Provide(donutwebrtc.NewUDPICEServer),
fx.Provide(controllers.NewTCPICEServer),
fx.Provide(controllers.NewUDPICEServer),
// WebRTC controller
fx.Provide(donutwebrtc.NewWebRTCController),
// SRT controller
fx.Provide(donutsrt.NewSRTController),
// Streaming controller
fx.Provide(donutstreaming.NewStreamingController),
// Controllers
fx.Provide(controllers.NewWebRTCController),
fx.Provide(controllers.NewSRTController),
fx.Provide(controllers.NewStreamingController),
// Logging, Config
fx.Provide(zap.NewProduction),
fx.Provide(func() *entity.Config {
fx.Provide(func() *entities.Config {
return &c
}),