Fix the selection of the local endpoint for stream forwarding

This commit is contained in:
Dmitrii Okunev
2024-10-19 17:21:55 +01:00
parent aa22de6eff
commit b084ea8e1b
22 changed files with 338 additions and 295 deletions

View File

@@ -22,7 +22,7 @@ type Output struct {
func formatFromScheme(scheme string) string {
switch scheme {
case "rtmp":
case "rtmp", "rtmps":
return "flv"
default:
return scheme

View File

@@ -61,13 +61,16 @@ func (c *Client) NewInputFromURL(
url string,
authKey string,
config InputConfig,
) (InputID, error) {
) (_ InputID, _err error) {
client, conn, err := c.grpcClient()
if err != nil {
return 0, err
}
defer conn.Close()
logger.Debugf(ctx, "NewInputFromURL(ctx, '%s', authKey, %#+v)", url, authKey)
defer func() { logger.Debugf(ctx, "/NewInputFromURL(ctx, '%s', authKey, %#+v): %v", url, authKey, _err) }()
resp, err := client.NewInput(ctx, &recoder_grpc.NewInputRequest{
Path: &recoder_grpc.ResourcePath{
ResourcePath: &recoder_grpc.ResourcePath_Url{

View File

@@ -17,6 +17,7 @@ import (
"github.com/xaionaro-go/streamctl/pkg/streampanel/consts"
sptypes "github.com/xaionaro-go/streamctl/pkg/streamplayer/types"
sstypes "github.com/xaionaro-go/streamctl/pkg/streamserver/types"
"github.com/xaionaro-go/streamctl/pkg/streamserver/types/streamportserver"
"github.com/xaionaro-go/streamctl/pkg/streamtypes"
)
@@ -99,7 +100,7 @@ type StreamD interface {
ctx context.Context,
serverType StreamServerType,
listenAddr string,
opts ...sstypes.ServerOption,
opts ...streamportserver.Option,
) error
StopStreamServer(
ctx context.Context,
@@ -288,10 +289,7 @@ type BackendDataYouTube struct {
type StreamServerType = streamtypes.ServerType
type StreamServer struct {
sstypes.ServerConfig
Type StreamServerType
ListenAddr string
streamportserver.Config
NumBytesConsumerWrote uint64
NumBytesProducerRead uint64

View File

@@ -34,6 +34,7 @@ import (
"github.com/xaionaro-go/streamctl/pkg/streampanel/consts"
sptypes "github.com/xaionaro-go/streamctl/pkg/streamplayer/types"
"github.com/xaionaro-go/streamctl/pkg/streamserver/types"
"github.com/xaionaro-go/streamctl/pkg/streamserver/types/streamportserver"
"github.com/xaionaro-go/streamctl/pkg/streamtypes"
"github.com/xaionaro-go/streamctl/pkg/xsync"
"google.golang.org/grpc"
@@ -1259,9 +1260,11 @@ func (c *Client) ListStreamServers(
)
}
result = append(result, api.StreamServer{
ServerConfig: opts.Config(ctx),
Type: srvType,
ListenAddr: listenAddr,
Config: streamportserver.Config{
ProtocolSpecificConfig: opts.ProtocolSpecificConfig(ctx),
Type: srvType,
ListenAddr: listenAddr,
},
NumBytesConsumerWrote: uint64(
server.GetStatistics().
GetNumBytesConsumerWrote(),
@@ -1279,7 +1282,7 @@ func (c *Client) StartStreamServer(
ctx context.Context,
serverType api.StreamServerType,
listenAddr string,
opts ...types.ServerOption,
opts ...streamportserver.Option,
) error {
cfg, err := goconv.StreamServerConfigGo2GRPC(
ctx,

View File

@@ -9,7 +9,7 @@ import (
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/xaionaro-go/streamctl/pkg/streamd/api"
"github.com/xaionaro-go/streamctl/pkg/streamd/grpc/go/streamd_grpc"
"github.com/xaionaro-go/streamctl/pkg/streamserver/types"
"github.com/xaionaro-go/streamctl/pkg/streamserver/types/streamportserver"
"github.com/xaionaro-go/streamctl/pkg/streamtypes"
)
@@ -41,14 +41,14 @@ func StreamServerConfigGo2GRPC(
ctx context.Context,
serverType api.StreamServerType,
listenAddr string,
opts ...types.ServerOption,
opts ...streamportserver.Option,
) (*streamd_grpc.StreamServer, error) {
t, err := StreamServerTypeGo2GRPC(serverType)
if err != nil {
return nil, fmt.Errorf("unable to convert the server type: %w", err)
}
cfg := types.ServerOptions(opts).Config(ctx)
cfg := streamportserver.Options(opts).ProtocolSpecificConfig(ctx)
var serverCert *streamd_grpc.TLSCertificate
if cfg.ServerCert != nil {
logger.Debugf(ctx, "cfg.ServerCert != nil: %#+v", cfg.ServerKey)
@@ -88,7 +88,7 @@ func StreamServerConfigGo2GRPC(
func StreamServerConfigGRPC2Go(
ctx context.Context,
srv *streamd_grpc.StreamServer,
) (api.StreamServerType, string, types.ServerOptions, error) {
) (api.StreamServerType, string, streamportserver.Options, error) {
srvType, err := StreamServerTypeGRPC2Go(srv.GetServerType())
if err != nil {
return 0, "", nil, fmt.Errorf("unable to convert the server type value: %w", err)
@@ -104,7 +104,7 @@ func StreamServerConfigGRPC2Go(
return 0, "", nil, fmt.Errorf("unable to convert the private key: %w", err)
}
cfg := &types.ServerConfig{
psCfg := &streamportserver.ProtocolSpecificConfig{
IsTLS: srv.GetIsTLS(),
WriteQueueSize: srv.GetWriteQueueSize(),
WriteTimeout: time.Nanosecond * time.Duration(srv.GetWriteTimeoutNano()),
@@ -113,5 +113,5 @@ func StreamServerConfigGRPC2Go(
ServerKey: serverKey,
}
return srvType, srv.GetListenAddr(), cfg.Options(), nil
return srvType, srv.GetListenAddr(), psCfg.Options(), nil
}

View File

@@ -30,10 +30,10 @@ import (
"github.com/xaionaro-go/streamctl/pkg/streamd/memoize"
"github.com/xaionaro-go/streamctl/pkg/streamd/ui"
"github.com/xaionaro-go/streamctl/pkg/streampanel/consts"
"github.com/xaionaro-go/streamctl/pkg/streamplayer"
sptypes "github.com/xaionaro-go/streamctl/pkg/streamplayer/types"
"github.com/xaionaro-go/streamctl/pkg/streamserver"
sstypes "github.com/xaionaro-go/streamctl/pkg/streamserver/types"
"github.com/xaionaro-go/streamctl/pkg/streamserver/types/streamportserver"
"github.com/xaionaro-go/streamctl/pkg/streamtypes"
"github.com/xaionaro-go/streamctl/pkg/xcontext"
"github.com/xaionaro-go/streamctl/pkg/xpath"
@@ -936,10 +936,12 @@ func (d *StreamD) ListStreamServers(
var result []api.StreamServer
for idx, portSrv := range servers {
srv := api.StreamServer{
ServerConfig: portSrv.Config(),
Config: streamportserver.Config{
ProtocolSpecificConfig: portSrv.ProtocolSpecificConfig(),
Type: portSrv.Type(),
ListenAddr: portSrv.ListenAddr(),
Type: portSrv.Type(),
ListenAddr: portSrv.ListenAddr(),
},
NumBytesConsumerWrote: portSrv.NumBytesConsumerWrote(),
NumBytesProducerRead: portSrv.NumBytesProducerRead(),
@@ -957,7 +959,7 @@ func (d *StreamD) StartStreamServer(
ctx context.Context,
serverType api.StreamServerType,
listenAddr string,
opts ...sstypes.ServerOption,
opts ...streamportserver.Option,
) error {
logger.Debugf(ctx, "StartStreamServer")
defer logger.Debugf(ctx, "/StartStreamServer")
@@ -974,7 +976,7 @@ func (d *StreamD) StartStreamServer(
return fmt.Errorf("unable to start stream server: %w", err)
}
logger.Tracef(ctx, "new StreamServer.Servers config == %#+v", d.Config.StreamServer.Servers)
logger.Tracef(ctx, "new StreamServer.Servers config == %#+v", d.Config.StreamServer.PortServers)
err = d.SaveConfig(ctx)
if err != nil {
return fmt.Errorf("unable to save config: %w", err)
@@ -987,10 +989,10 @@ func (d *StreamD) StartStreamServer(
func (d *StreamD) getStreamServerByListenAddr(
ctx context.Context,
listenAddr string,
) *sstypes.PortServer {
) streamportserver.Server {
for _, server := range d.StreamServer.ListServers(ctx) {
if server.ListenAddr() == listenAddr {
return &server
return server
}
}
return nil
@@ -1010,9 +1012,9 @@ func (d *StreamD) StopStreamServer(
return fmt.Errorf("have not found any stream listeners at %s", listenAddr)
}
err := d.StreamServer.StopServer(ctx, *srv)
err := d.StreamServer.StopServer(ctx, srv)
if err != nil {
return fmt.Errorf("unable to stop server %#+v: %w", *srv, err)
return fmt.Errorf("unable to stop server %#+v: %w", srv, err)
}
err = d.SaveConfig(ctx)
@@ -1276,7 +1278,7 @@ func (d *StreamD) UpdateStreamForward(
quirks,
)
if err != nil {
return fmt.Errorf("unable to add the stream forwarding: %w", err)
return fmt.Errorf("unable to update the stream forwarding: %w", err)
}
err = d.SaveConfig(ctx)
@@ -1340,12 +1342,6 @@ func (d *StreamD) WaitForStreamPublisher(
return ch, nil
}
func (d *StreamD) GetStreamPortServers(
ctx context.Context,
) ([]streamplayer.StreamPortServer, error) {
return d.StreamServer.GetPortServers(ctx)
}
func (d *StreamD) AddStreamPlayer(
ctx context.Context,
streamID streamtypes.StreamID,

View File

@@ -21,6 +21,7 @@ import (
"github.com/xaionaro-go/streamctl/pkg/streamd/api"
sptypes "github.com/xaionaro-go/streamctl/pkg/streamplayer/types"
sstypes "github.com/xaionaro-go/streamctl/pkg/streamserver/types"
"github.com/xaionaro-go/streamctl/pkg/streamserver/types/streamportserver"
"github.com/xaionaro-go/streamctl/pkg/streamtypes"
"github.com/xaionaro-go/streamctl/pkg/xcontext"
"github.com/xaionaro-go/streamctl/pkg/xfyne"
@@ -228,8 +229,8 @@ func (p *Panel) addStreamServer(
) error {
logger.Debugf(ctx, "addStreamServer")
defer logger.Debugf(ctx, "/addStreamServer")
var opts sstypes.ServerOptions
opts = append(opts, sstypes.ServerOptionIsTLS(enableTLS))
var opts streamportserver.Options
opts = append(opts, streamportserver.OptionIsTLS(enableTLS))
return p.StreamD.StartStreamServer(
ctx,
proto,

View File

@@ -23,6 +23,7 @@ import (
"github.com/xaionaro-go/streamctl/pkg/streamplayer/types"
"github.com/xaionaro-go/streamctl/pkg/streamserver"
sstypes "github.com/xaionaro-go/streamctl/pkg/streamserver/types"
"github.com/xaionaro-go/streamctl/pkg/streamserver/types/streamportserver"
"github.com/xaionaro-go/streamctl/pkg/streamtypes"
"github.com/xaionaro-go/streamctl/pkg/xfyne"
)
@@ -64,9 +65,9 @@ func main() {
m := player.NewManager(ptypes.OptionPathToMPV(*mpvPath))
ss := streamserver.New(
&sstypes.Config{
Servers: []sstypes.Server{{
Type: streamtypes.ServerTypeRTMP,
Listen: *rtmpListenAddr,
PortServers: []streamportserver.Config{{
Type: streamtypes.ServerTypeRTMP,
ListenAddr: *rtmpListenAddr,
}},
Streams: map[sstypes.StreamID]*sstypes.StreamConfig{
sstypes.StreamID(*streamID): {},

View File

@@ -16,15 +16,11 @@ import (
"github.com/xaionaro-go/streamctl/pkg/observability"
"github.com/xaionaro-go/streamctl/pkg/player"
"github.com/xaionaro-go/streamctl/pkg/player/types"
"github.com/xaionaro-go/streamctl/pkg/streamserver/types/streamportserver"
"github.com/xaionaro-go/streamctl/pkg/streamtypes"
"github.com/xaionaro-go/streamctl/pkg/xsync"
)
type StreamPortServer struct {
Addr string
Type streamtypes.ServerType
}
type Publisher interface {
ClosedChan() <-chan struct{}
}
@@ -37,13 +33,9 @@ type WaitPublisherChaner interface {
) (<-chan Publisher, error)
}
type GetPortServerser interface {
GetPortServers(context.Context) ([]StreamPortServer, error)
}
type StreamServer interface {
WaitPublisherChaner
GetPortServerser
streamportserver.GetPortServerser
}
type StreamPlayers struct {
@@ -274,7 +266,7 @@ func (p *StreamPlayerHandler) getInternalURL(ctx context.Context) (*url.URL, err
var u url.URL
u.Scheme = portSrv.Type.String()
u.Host = portSrv.Addr
u.Host = portSrv.ListenAddr
u.Path = string(p.StreamID)
return &u, nil
}

View File

@@ -8,6 +8,7 @@ import (
sptypes "github.com/xaionaro-go/streamctl/pkg/streamplayer/types"
"github.com/xaionaro-go/streamctl/pkg/streamserver/streamforward"
"github.com/xaionaro-go/streamctl/pkg/streamserver/types"
"github.com/xaionaro-go/streamctl/pkg/streamserver/types/streamportserver"
"github.com/xaionaro-go/streamctl/pkg/streamtypes"
)
@@ -24,11 +25,11 @@ type StreamServer interface {
ctx context.Context,
serverType streamtypes.ServerType,
listenAddr string,
opts ...types.ServerOption,
) (types.PortServer, error)
opts ...streamportserver.Option,
) (streamportserver.Server, error)
StopServer(
ctx context.Context,
server types.PortServer,
server streamportserver.Server,
) error
AddIncomingStream(
@@ -118,5 +119,5 @@ type StreamServer interface {
streamID types.StreamID,
) (player.Player, error)
ListServers(ctx context.Context) []types.PortServer
ListServers(ctx context.Context) []streamportserver.Server
}

View File

@@ -11,7 +11,7 @@ import (
"github.com/xaionaro-go/mediamtx/pkg/servers/hls"
"github.com/xaionaro-go/mediamtx/pkg/servers/rtsp"
"github.com/xaionaro-go/mediamtx/pkg/servers/srt"
"github.com/xaionaro-go/streamctl/pkg/streamserver/types"
"github.com/xaionaro-go/streamctl/pkg/streamserver/types/streamportserver"
)
func newRTSPServer() *rtsp.Server {
@@ -86,30 +86,30 @@ func newHLSServer() *hls.Server {
func (s *StreamServer) newServerRTSP(
ctx context.Context,
listenAddr string,
opts ...types.ServerOption,
) (_ types.PortServer, _ret error) {
opts ...streamportserver.Option,
) (_ streamportserver.Server, _ret error) {
return nil, fmt.Errorf("support of RTSP is not implemented, yet")
}
func (s *StreamServer) newServerSRT(
ctx context.Context,
listenAddr string,
opts ...types.ServerOption,
) (_ types.PortServer, _ret error) {
opts ...streamportserver.Option,
) (_ streamportserver.Server, _ret error) {
return nil, fmt.Errorf("support of SRT is not implemented, yet")
}
func (s *StreamServer) newServerHLS(
ctx context.Context,
listenAddr string,
opts ...types.ServerOption,
) (_ types.PortServer, _ret error) {
opts ...streamportserver.Option,
) (_ streamportserver.Server, _ret error) {
return nil, fmt.Errorf("support of HLS is not implemented, yet")
}
func (s *StreamServer) newServerWebRTC(
ctx context.Context,
listenAddr string,
opts ...types.ServerOption,
) (_ types.PortServer, _ret error) {
opts ...streamportserver.Option,
) (_ streamportserver.Server, _ret error) {
return nil, fmt.Errorf("support of WebRTC is not implemented, yet")
}

View File

@@ -20,7 +20,7 @@ import (
mediamtxlogger "github.com/xaionaro-go/mediamtx/pkg/logger"
"github.com/xaionaro-go/mediamtx/pkg/pathmanager"
"github.com/xaionaro-go/mediamtx/pkg/servers/rtmp"
"github.com/xaionaro-go/streamctl/pkg/streamserver/types"
"github.com/xaionaro-go/streamctl/pkg/streamserver/types/streamportserver"
"github.com/xaionaro-go/streamctl/pkg/streamtypes"
"github.com/xaionaro-go/streamctl/pkg/xsync"
)
@@ -29,7 +29,7 @@ type RTMPServer struct {
*rtmp.Server
locker xsync.Mutex
originalConfig types.ServerConfig
originalConfig streamportserver.Config
isInitialized bool
}
@@ -37,16 +37,20 @@ func newRTMPServer(
pathManager *pathmanager.PathManager,
listenAddr string,
logger mediamtxlogger.Writer,
opts ...types.ServerOption,
opts ...streamportserver.Option,
) (*RTMPServer, error) {
cfg := types.ServerOptions(opts).Config(context.Background())
psCfg := streamportserver.Options(opts).ProtocolSpecificConfig(context.Background())
srv := &RTMPServer{
originalConfig: cfg,
originalConfig: streamportserver.Config{
ProtocolSpecificConfig: psCfg,
Type: streamtypes.ServerTypeRTMP,
ListenAddr: listenAddr,
},
Server: &rtmp.Server{
Address: listenAddr,
ReadTimeout: conf.StringDuration(cfg.ReadTimeout),
WriteTimeout: conf.StringDuration(cfg.WriteTimeout),
IsTLS: cfg.IsTLS,
ReadTimeout: conf.StringDuration(psCfg.ReadTimeout),
WriteTimeout: conf.StringDuration(psCfg.WriteTimeout),
IsTLS: psCfg.IsTLS,
ServerCert: "",
ServerKey: "",
RTSPAddress: "",
@@ -58,14 +62,14 @@ func newRTMPServer(
Parent: logger,
},
}
if err := srv.init(cfg); err != nil {
if err := srv.init(srv.originalConfig); err != nil {
return nil, err
}
return srv, nil
}
func (srv *RTMPServer) init(
cfg types.ServerConfig,
cfg streamportserver.Config,
) (_err error) {
defer func() {
if _err != nil {
@@ -215,10 +219,10 @@ func (srv *RTMPServer) setServerCertificate(
return nil
}
var _ types.PortServer = (*RTMPServer)(nil)
var _ streamportserver.Server = (*RTMPServer)(nil)
func (srv *RTMPServer) Config() types.ServerConfig {
return srv.originalConfig
func (srv *RTMPServer) ProtocolSpecificConfig() streamportserver.ProtocolSpecificConfig {
return srv.originalConfig.ProtocolSpecificConfig
}
func (srv *RTMPServer) Close() error {

View File

@@ -21,6 +21,7 @@ import (
"github.com/xaionaro-go/streamctl/pkg/streamserver/implementations/libav/streamforward"
"github.com/xaionaro-go/streamctl/pkg/streamserver/streamplayers"
"github.com/xaionaro-go/streamctl/pkg/streamserver/types"
"github.com/xaionaro-go/streamctl/pkg/streamserver/types/streamportserver"
"github.com/xaionaro-go/streamctl/pkg/streamtypes"
"github.com/xaionaro-go/streamctl/pkg/xsync"
)
@@ -36,7 +37,7 @@ type StreamServer struct {
mutex xsync.Gorex
config *types.Config
pathManager *pathmanager.PathManager
serverHandlers []types.PortServer
serverHandlers []streamportserver.Server
isInitialized bool
streamsStatusLocker xsync.Mutex
@@ -110,18 +111,18 @@ func (s *StreamServer) init(
s.reloadPathConfs(ctx)
for _, srv := range cfg.Servers {
for _, srv := range cfg.PortServers {
{
srv := srv
observability.Go(ctx, func() {
s.mutex.Do(ctx, func() {
_, err := s.startServer(ctx, srv.Type, srv.Listen, srv.Options()...)
_, err := s.startServer(ctx, srv.Type, srv.ListenAddr, srv.Options()...)
if err != nil {
logger.Errorf(
ctx,
"unable to initialize %s server at %s: %w",
srv.Type,
srv.Listen,
srv.ListenAddr,
err,
)
}
@@ -215,13 +216,13 @@ func (s *StreamServer) PubsubNames() (types.AppKeys, error) {
func (s *StreamServer) ListServers(
ctx context.Context,
) (_ret []types.PortServer) {
) (_ret []streamportserver.Server) {
ctx = belt.WithField(ctx, "module", "StreamServer")
logger.Tracef(ctx, "ListServers")
defer func() { logger.Tracef(ctx, "/ListServers: %d servers", len(_ret)) }()
return xsync.DoR1(ctx, &s.mutex, func() []types.PortServer {
c := make([]types.PortServer, len(s.serverHandlers))
return xsync.DoR1(ctx, &s.mutex, func() []streamportserver.Server {
c := make([]streamportserver.Server, len(s.serverHandlers))
copy(c, s.serverHandlers)
return c
})
@@ -231,18 +232,18 @@ func (s *StreamServer) StartServer(
ctx context.Context,
serverType streamtypes.ServerType,
listenAddr string,
opts ...types.ServerOption,
) (types.PortServer, error) {
opts ...streamportserver.Option,
) (streamportserver.Server, error) {
ctx = belt.WithField(ctx, "module", "StreamServer")
return xsync.DoR2(ctx, &s.mutex, func() (types.PortServer, error) {
return xsync.DoR2(ctx, &s.mutex, func() (streamportserver.Server, error) {
portSrv, err := s.startServer(ctx, serverType, listenAddr, opts...)
if err != nil {
return nil, err
}
s.config.Servers = append(s.config.Servers, types.Server{
ServerConfig: types.ServerOptions(opts).Config(ctx),
Type: serverType,
Listen: listenAddr,
s.config.PortServers = append(s.config.PortServers, streamportserver.Config{
ProtocolSpecificConfig: streamportserver.Options(opts).ProtocolSpecificConfig(ctx),
Type: serverType,
ListenAddr: listenAddr,
})
return portSrv, nil
})
@@ -250,7 +251,7 @@ func (s *StreamServer) StartServer(
func (s *StreamServer) findServer(
_ context.Context,
server types.PortServer,
server streamportserver.Server,
) (int, error) {
for i := range s.serverHandlers {
if s.serverHandlers[i] == server {
@@ -262,13 +263,13 @@ func (s *StreamServer) findServer(
func (s *StreamServer) StopServer(
ctx context.Context,
server types.PortServer,
server streamportserver.Server,
) error {
ctx = belt.WithField(ctx, "module", "StreamServer")
return xsync.DoR1(ctx, &s.mutex, func() error {
for idx, srv := range s.config.Servers {
if srv.Listen == server.ListenAddr() {
s.config.Servers = append(s.config.Servers[:idx], s.config.Servers[idx+1:]...)
for idx, srv := range s.config.PortServers {
if srv.ListenAddr == server.ListenAddr() {
s.config.PortServers = append(s.config.PortServers[:idx], s.config.PortServers[idx+1:]...)
break
}
}
@@ -278,7 +279,7 @@ func (s *StreamServer) StopServer(
func (s *StreamServer) stopServer(
ctx context.Context,
server types.PortServer,
server streamportserver.Server,
) error {
idx, err := s.findServer(ctx, server)
if err != nil {
@@ -429,14 +430,15 @@ func (s *StreamServer) WaitPublisherChan(
func (s *StreamServer) GetPortServers(
ctx context.Context,
) ([]streamplayer.StreamPortServer, error) {
) ([]streamportserver.Config, error) {
srvs := s.ListServers(ctx)
result := make([]streamplayer.StreamPortServer, 0, len(srvs))
result := make([]streamportserver.Config, 0, len(srvs))
for _, srv := range srvs {
result = append(result, streamplayer.StreamPortServer{
Addr: srv.ListenAddr(),
Type: srv.Type(),
result = append(result, streamportserver.Config{
ProtocolSpecificConfig: srv.ProtocolSpecificConfig(),
Type: srv.Type(),
ListenAddr: srv.ListenAddr(),
})
}
@@ -447,8 +449,8 @@ func (s *StreamServer) startServer(
ctx context.Context,
serverType streamtypes.ServerType,
listenAddr string,
opts ...types.ServerOption,
) (_ types.PortServer, _ret error) {
opts ...streamportserver.Option,
) (_ streamportserver.Server, _ret error) {
logger.Tracef(ctx, "startServer(%s, '%s')", serverType, listenAddr)
defer func() { logger.Tracef(ctx, "/startServer(%s, '%s'): %v", serverType, listenAddr, _ret) }()
@@ -473,7 +475,7 @@ func (s *StreamServer) startServer(
)
}
logger.Tracef(ctx, "adding serverHandler %#+v %#+v", portSrv, portSrv.Config())
logger.Tracef(ctx, "adding serverHandler %#+v %#+v", portSrv, portSrv.ProtocolSpecificConfig())
s.serverHandlers = append(s.serverHandlers, portSrv)
return nil, err
}
@@ -482,8 +484,8 @@ func (s *StreamServer) newServer(
ctx context.Context,
serverType streamtypes.ServerType,
listenAddr string,
opts ...types.ServerOption,
) (_ types.PortServer, _ret error) {
opts ...streamportserver.Option,
) (_ streamportserver.Server, _ret error) {
switch serverType {
case streamtypes.ServerTypeRTSP:
return s.newServerRTSP(ctx, listenAddr, opts...)
@@ -503,8 +505,8 @@ func (s *StreamServer) newServer(
func (s *StreamServer) newServerRTMP(
ctx context.Context,
listenAddr string,
opts ...types.ServerOption,
) (_ types.PortServer, _ret error) {
opts ...streamportserver.Option,
) (_ streamportserver.Server, _ret error) {
logger.Tracef(ctx, "newServerRTMP(ctx, '%s', %#+v)", listenAddr, opts)
rtmpSrv, err := newRTMPServer(
s.pathManager,

View File

@@ -5,7 +5,7 @@ import (
"sync/atomic"
"github.com/xaionaro-go/go-rtmp"
"github.com/xaionaro-go/streamctl/pkg/streamserver/types"
"github.com/xaionaro-go/streamctl/pkg/streamserver/types/streamportserver"
"github.com/xaionaro-go/streamctl/pkg/streamtypes"
)
@@ -16,7 +16,7 @@ type PortServer struct {
WriteCount uint64
}
var _ types.PortServer = (*PortServer)(nil)
var _ streamportserver.Server = (*PortServer)(nil)
func (srv *PortServer) Close() error {
return srv.Server.Close()
@@ -33,6 +33,6 @@ func (srv *PortServer) NumBytesConsumerWrote() uint64 {
func (srv *PortServer) NumBytesProducerRead() uint64 {
return atomic.LoadUint64(&srv.ReadCount)
}
func (srv *PortServer) Config() types.ServerConfig {
return types.ServerConfig{}
func (srv *PortServer) ProtocolSpecificConfig() streamportserver.ProtocolSpecificConfig {
return streamportserver.ProtocolSpecificConfig{}
}

View File

@@ -21,6 +21,7 @@ import (
"github.com/xaionaro-go/streamctl/pkg/streamserver/implementations/xaionaro-go-rtmp/streamforward"
"github.com/xaionaro-go/streamctl/pkg/streamserver/streamplayers"
"github.com/xaionaro-go/streamctl/pkg/streamserver/types"
"github.com/xaionaro-go/streamctl/pkg/streamserver/types/streamportserver"
"github.com/xaionaro-go/streamctl/pkg/streamtypes"
"github.com/xaionaro-go/streamctl/pkg/xlogger"
"github.com/xaionaro-go/streamctl/pkg/xsync"
@@ -37,7 +38,7 @@ type StreamServer struct {
*streamforward.StreamForwards
Config *types.Config
RelayService *yutoppgortmp.RelayService
ServerHandlers []types.PortServer
ServerHandlers []streamportserver.Server
}
var _ streamforward.StreamServer = (*StreamServer)(nil)
@@ -82,17 +83,17 @@ func (s *StreamServer) init(
cfg := s.Config
logger.Debugf(ctx, "config == %#+v", *cfg)
for _, srv := range cfg.Servers {
for _, srv := range cfg.PortServers {
{
srv := srv
observability.Go(ctx, func() {
_, err := s.startServer(ctx, srv.Type, srv.Listen)
_, err := s.startServer(ctx, srv.Type, srv.ListenAddr)
if err != nil {
logger.Errorf(
ctx,
"unable to initialize %s server at %s: %w",
srv.Type,
srv.Listen,
srv.ListenAddr,
err,
)
}
@@ -153,13 +154,13 @@ func (s *StreamServer) PubsubNames() (types.AppKeys, error) {
func (s *StreamServer) ListServers(
ctx context.Context,
) (_ret []types.PortServer) {
) (_ret []streamportserver.Server) {
ctx = belt.WithField(ctx, "module", "StreamServer")
logger.Tracef(ctx, "ListServers")
defer func() { logger.Tracef(ctx, "/ListServers: %d servers", len(_ret)) }()
return xsync.DoR1(ctx, &s.Mutex, func() []types.PortServer {
c := make([]types.PortServer, len(s.ServerHandlers))
return xsync.DoR1(ctx, &s.Mutex, func() []streamportserver.Server {
c := make([]streamportserver.Server, len(s.ServerHandlers))
copy(c, s.ServerHandlers)
return c
})
@@ -169,17 +170,18 @@ func (s *StreamServer) StartServer(
ctx context.Context,
serverType streamtypes.ServerType,
listenAddr string,
opts ...types.ServerOption,
) (types.PortServer, error) {
opts ...streamportserver.Option,
) (streamportserver.Server, error) {
ctx = belt.WithField(ctx, "module", "StreamServer")
return xsync.DoR2(ctx, &s.Mutex, func() (types.PortServer, error) {
return xsync.DoR2(ctx, &s.Mutex, func() (streamportserver.Server, error) {
srv, err := s.startServer(ctx, serverType, listenAddr, opts...)
if err != nil {
return nil, err
}
s.Config.Servers = append(s.Config.Servers, types.Server{
Type: serverType,
Listen: listenAddr,
s.Config.PortServers = append(s.Config.PortServers, streamportserver.Config{
ProtocolSpecificConfig: srv.ProtocolSpecificConfig().Options().ProtocolSpecificConfig(ctx),
Type: serverType,
ListenAddr: listenAddr,
})
return srv, nil
})
@@ -189,17 +191,17 @@ func (s *StreamServer) startServer(
ctx context.Context,
serverType streamtypes.ServerType,
listenAddr string,
opts ...types.ServerOption,
) (_ types.PortServer, _ret error) {
opts ...streamportserver.Option,
) (_ streamportserver.Server, _ret error) {
logger.Tracef(ctx, "startServer(%s, '%s')", serverType, listenAddr)
defer func() { logger.Tracef(ctx, "/startServer(%s, '%s'): %v", serverType, listenAddr, _ret) }()
cfg := types.ServerOptions(opts).Config(ctx)
cfg := streamportserver.Options(opts).ProtocolSpecificConfig(ctx)
if cfg.IsTLS {
return nil, fmt.Errorf("this implementation of the stream server does not support TLS")
}
var srv types.PortServer
var srv streamportserver.Server
var err error
switch serverType {
case streamtypes.ServerTypeRTMP:
@@ -257,7 +259,7 @@ func (s *StreamServer) startServer(
func (s *StreamServer) findServer(
_ context.Context,
server types.PortServer,
server streamportserver.Server,
) (int, error) {
for i := range s.ServerHandlers {
if s.ServerHandlers[i] == server {
@@ -269,13 +271,13 @@ func (s *StreamServer) findServer(
func (s *StreamServer) StopServer(
ctx context.Context,
server types.PortServer,
server streamportserver.Server,
) error {
ctx = belt.WithField(ctx, "module", "StreamServer")
return xsync.DoR1(ctx, &s.Mutex, func() error {
for idx, srv := range s.Config.Servers {
if srv.Listen == server.ListenAddr() {
s.Config.Servers = append(s.Config.Servers[:idx], s.Config.Servers[idx+1:]...)
for idx, srv := range s.Config.PortServers {
if srv.ListenAddr == server.ListenAddr() {
s.Config.PortServers = append(s.Config.PortServers[:idx], s.Config.PortServers[idx+1:]...)
break
}
}
@@ -285,7 +287,7 @@ func (s *StreamServer) StopServer(
func (s *StreamServer) stopServer(
ctx context.Context,
server types.PortServer,
server streamportserver.Server,
) error {
idx, err := s.findServer(ctx, server)
if err != nil {
@@ -375,14 +377,15 @@ func (s *StreamServer) WaitPublisherChan(
func (s *StreamServer) GetPortServers(
ctx context.Context,
) ([]streamplayer.StreamPortServer, error) {
) ([]streamportserver.Config, error) {
srvs := s.ListServers(ctx)
result := make([]streamplayer.StreamPortServer, 0, len(srvs))
result := make([]streamportserver.Config, 0, len(srvs))
for _, srv := range srvs {
result = append(result, streamplayer.StreamPortServer{
Addr: srv.ListenAddr(),
Type: srv.Type(),
result = append(result, streamportserver.Config{
ListenAddr: srv.ListenAddr(),
Type: srv.Type(),
ProtocolSpecificConfig: srv.ProtocolSpecificConfig(),
})
}

View File

@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net/url"
"sort"
"time"
"github.com/facebookincubator/go-belt"
@@ -175,7 +176,7 @@ func (s *StreamForwards) getLocalhostRTMP(ctx context.Context) (*url.URL, error)
}
portSrv := portSrvs[0]
urlString := fmt.Sprintf("%s://%s", portSrv.Type, portSrv.Addr)
urlString := fmt.Sprintf("%s://%s", portSrv.Type, portSrv.ListenAddr)
urlParsed, err := url.Parse(urlString)
if err != nil {
return nil, fmt.Errorf("unable to parse '%s': %w", urlString, err)
@@ -848,14 +849,30 @@ func (s *StreamForwards) findStreamDestinationByID(
)
}
func (s *StreamForwards) getLocalhostEndpoint(ctx context.Context) (*url.URL, error) {
func (s *StreamForwards) getLocalhostEndpoint(ctx context.Context) (_ret *url.URL, _err error) {
defer func() { logger.Debugf(ctx, "getLocalhostEndpoint result: %v %v", _ret, _err) }()
portSrvs, err := s.StreamServer.GetPortServers(ctx)
if err != nil {
return nil, fmt.Errorf("unable to get port servers info: %w", err)
}
portSrv := portSrvs[0]
urlString := fmt.Sprintf("%s://%s", portSrv.Type, portSrv.Addr)
sort.Slice(portSrvs, func(i, j int) bool {
a := &portSrvs[i]
b := &portSrvs[j]
if a.IsTLS != b.IsTLS {
return b.IsTLS
}
return false
})
portSrv := portSrvs[0]
logger.Debugf(ctx, "getLocalhostEndpoint: chosen portSrv == %#+v", portSrv)
protoString := portSrv.Type.String()
if portSrv.IsTLS {
protoString += "s"
}
urlString := fmt.Sprintf("%s://%s", protoString, portSrv.ListenAddr)
urlParsed, err := url.Parse(urlString)
if err != nil {
return nil, fmt.Errorf("unable to parse '%s': %w", urlString, err)

View File

@@ -5,13 +5,41 @@ import (
"github.com/xaionaro-go/streamctl/pkg/player"
sptypes "github.com/xaionaro-go/streamctl/pkg/streamplayer/types"
"github.com/xaionaro-go/streamctl/pkg/streamserver/types/streamportserver"
"github.com/xaionaro-go/streamctl/pkg/streamtypes"
)
type Server struct {
ServerConfig `yaml:"config"`
Type streamtypes.ServerType `yaml:"protocol"`
Listen string `yaml:"listen"`
type Config struct {
PortServers []streamportserver.Config `yaml:"servers"`
Streams map[StreamID]*StreamConfig `yaml:"streams"`
Destinations map[DestinationID]*DestinationConfig `yaml:"destinations"`
VideoPlayer struct {
MPV struct {
Path string `yaml:"path"`
} `yaml:"mpv"`
} `yaml:"video_player"`
}
type ForwardingConfig struct {
Disabled bool `yaml:"disabled,omitempty"`
Quirks ForwardingQuirks `yaml:"quirks,omitempty"`
Convert VideoConvertConfig `yaml:"convert,omitempty"`
}
type PlayerConfig struct {
Player player.Backend `yaml:"player,omitempty"`
Disabled bool `yaml:"disabled,omitempty"`
StreamPlayback sptypes.Config `yaml:"stream_playback,omitempty"`
}
type StreamConfig struct {
Forwardings map[DestinationID]ForwardingConfig `yaml:"forwardings"`
Player *PlayerConfig `yaml:"player,omitempty"`
}
type DestinationConfig struct {
URL string `yaml:"url"`
StreamKey string `yaml:"stream_key"`
}
type RestartUntilYoutubeRecognizesStream struct {
@@ -44,36 +72,3 @@ type ForwardingQuirks struct {
}
type VideoConvertConfig = streamtypes.VideoConvertConfig
type ForwardingConfig struct {
Disabled bool `yaml:"disabled,omitempty"`
Quirks ForwardingQuirks `yaml:"quirks,omitempty"`
Convert VideoConvertConfig `yaml:"convert,omitempty"`
}
type PlayerConfig struct {
Player player.Backend `yaml:"player,omitempty"`
Disabled bool `yaml:"disabled,omitempty"`
StreamPlayback sptypes.Config `yaml:"stream_playback,omitempty"`
}
type StreamConfig struct {
Forwardings map[DestinationID]ForwardingConfig `yaml:"forwardings"`
Player *PlayerConfig `yaml:"player,omitempty"`
}
type DestinationConfig struct {
URL string `yaml:"url"`
StreamKey string `yaml:"stream_key"`
}
type Config struct {
Servers []Server `yaml:"servers"`
Streams map[StreamID]*StreamConfig `yaml:"streams"`
Destinations map[DestinationID]*DestinationConfig `yaml:"destinations"`
VideoPlayer struct {
MPV struct {
Path string `yaml:"path"`
} `yaml:"mpv"`
} `yaml:"video_player"`
}

View File

@@ -1,14 +1,11 @@
package types
import (
"context"
"crypto"
"crypto/x509"
"io"
"strings"
"time"
"github.com/xaionaro-go/streamctl/pkg/streamplayer"
"github.com/xaionaro-go/streamctl/pkg/streamserver/types/streamportserver"
)
type PubsubNameser interface {
@@ -17,7 +14,7 @@ type PubsubNameser interface {
type Publisher = streamplayer.Publisher
type WaitPublisherChaner = streamplayer.WaitPublisherChaner
type GetPortServerser = streamplayer.GetPortServerser
type GetPortServerser = streamportserver.GetPortServerser
type InitConfig struct {
DefaultStreamPlayerOptions streamplayer.Options
@@ -65,98 +62,3 @@ type IncomingStream struct {
NumBytesWrote uint64
NumBytesRead uint64
}
type ServerOption interface {
apply(*ServerConfig)
}
type ServerOptions []ServerOption
func (s ServerOptions) apply(cfg *ServerConfig) {
for _, opt := range s {
opt.apply(cfg)
}
}
func (s ServerOptions) Config(
ctx context.Context,
) ServerConfig {
cfg := DefaultServerConfig(ctx)
for _, opt := range s {
opt.apply(&cfg)
}
return cfg
}
type ServerConfig struct {
IsTLS bool
WriteQueueSize uint64
WriteTimeout time.Duration
ReadTimeout time.Duration
ServerCert *x509.Certificate
ServerKey crypto.PrivateKey
}
var DefaultServerConfig = func(
ctx context.Context,
) ServerConfig {
return ServerConfig{
IsTLS: false,
WriteQueueSize: 60 * 10, // 60 FPS * 10 secs
WriteTimeout: 10 * time.Second,
ReadTimeout: 10 * time.Second,
ServerCert: nil,
ServerKey: nil,
}
}
func (cfg ServerConfig) Options() ServerOptions {
return ServerOptions{
ServerOptionIsTLS((bool)(cfg.IsTLS)),
ServerOptionWriteQueueSize((uint64)(cfg.WriteQueueSize)),
ServerOptionWriteTimeout((time.Duration)(cfg.WriteTimeout)),
ServerOptionReadTimeout((time.Duration)(cfg.ReadTimeout)),
ServerOptionServerCert{cfg.ServerCert},
ServerOptionServerKey{cfg.ServerKey},
}
}
type ServerOptionIsTLS bool
func (opt ServerOptionIsTLS) apply(cfg *ServerConfig) {
cfg.IsTLS = (bool)(opt)
}
type ServerOptionWriteQueueSize uint64
func (opt ServerOptionWriteQueueSize) apply(cfg *ServerConfig) {
cfg.WriteQueueSize = (uint64)(opt)
}
type ServerOptionWriteTimeout time.Duration
func (opt ServerOptionWriteTimeout) apply(cfg *ServerConfig) {
cfg.WriteTimeout = (time.Duration)(opt)
}
type ServerOptionReadTimeout time.Duration
func (opt ServerOptionReadTimeout) apply(cfg *ServerConfig) {
cfg.ReadTimeout = (time.Duration)(opt)
}
type ServerOptionServerCert struct{ *x509.Certificate }
func (opt ServerOptionServerCert) apply(cfg *ServerConfig) {
cfg.ServerCert = opt.Certificate
}
type ServerOptionServerKey struct{ crypto.PrivateKey }
func (opt ServerOptionServerKey) apply(cfg *ServerConfig) {
if opt.PrivateKey == nil {
cfg.ServerKey = nil
return
}
cfg.ServerKey = opt.PrivateKey
}

View File

@@ -0,0 +1,9 @@
package streamportserver
import (
"context"
)
type GetPortServerser interface {
GetPortServers(context.Context) ([]Config, error)
}

View File

@@ -0,0 +1,111 @@
package streamportserver
import (
"context"
"crypto"
"crypto/x509"
"time"
"github.com/xaionaro-go/streamctl/pkg/streamtypes"
)
type Option interface {
apply(*ProtocolSpecificConfig)
}
type Options []Option
func (s Options) apply(cfg *ProtocolSpecificConfig) {
for _, opt := range s {
opt.apply(cfg)
}
}
func (s Options) ProtocolSpecificConfig(
ctx context.Context,
) ProtocolSpecificConfig {
cfg := DefaultConfig(ctx)
for _, opt := range s {
opt.apply(&cfg)
}
return cfg
}
type ProtocolSpecificConfig struct {
IsTLS bool
WriteQueueSize uint64
WriteTimeout time.Duration
ReadTimeout time.Duration
ServerCert *x509.Certificate
ServerKey crypto.PrivateKey
}
var DefaultConfig = func(
ctx context.Context,
) ProtocolSpecificConfig {
return ProtocolSpecificConfig{
IsTLS: false,
WriteQueueSize: 60 * 10, // 60 FPS * 10 secs
WriteTimeout: 10 * time.Second,
ReadTimeout: 10 * time.Second,
ServerCert: nil,
ServerKey: nil,
}
}
func (cfg ProtocolSpecificConfig) Options() Options {
return Options{
OptionIsTLS((bool)(cfg.IsTLS)),
OptionWriteQueueSize((uint64)(cfg.WriteQueueSize)),
OptionWriteTimeout((time.Duration)(cfg.WriteTimeout)),
OptionReadTimeout((time.Duration)(cfg.ReadTimeout)),
OptionServerCert{cfg.ServerCert},
OptionServerKey{cfg.ServerKey},
}
}
type OptionIsTLS bool
func (opt OptionIsTLS) apply(cfg *ProtocolSpecificConfig) {
cfg.IsTLS = (bool)(opt)
}
type OptionWriteQueueSize uint64
func (opt OptionWriteQueueSize) apply(cfg *ProtocolSpecificConfig) {
cfg.WriteQueueSize = (uint64)(opt)
}
type OptionWriteTimeout time.Duration
func (opt OptionWriteTimeout) apply(cfg *ProtocolSpecificConfig) {
cfg.WriteTimeout = (time.Duration)(opt)
}
type OptionReadTimeout time.Duration
func (opt OptionReadTimeout) apply(cfg *ProtocolSpecificConfig) {
cfg.ReadTimeout = (time.Duration)(opt)
}
type OptionServerCert struct{ *x509.Certificate }
func (opt OptionServerCert) apply(cfg *ProtocolSpecificConfig) {
cfg.ServerCert = opt.Certificate
}
type OptionServerKey struct{ crypto.PrivateKey }
func (opt OptionServerKey) apply(cfg *ProtocolSpecificConfig) {
if opt.PrivateKey == nil {
cfg.ServerKey = nil
return
}
cfg.ServerKey = opt.PrivateKey
}
type Config struct {
ProtocolSpecificConfig `yaml:"config"`
Type streamtypes.ServerType `yaml:"protocol"`
ListenAddr string `yaml:"listen"`
}

View File

@@ -0,0 +1,19 @@
package streamportserver
import (
"io"
"github.com/xaionaro-go/streamctl/pkg/streamtypes"
)
type Server interface {
io.Closer
ProtocolSpecificConfig() ProtocolSpecificConfig
Type() streamtypes.ServerType
ListenAddr() string
NumBytesConsumerWrote() uint64
NumBytesProducerRead() uint64
}

View File

@@ -1,23 +1,9 @@
package types
import (
"io"
"github.com/xaionaro-go/streamctl/pkg/streamtypes"
)
type PortServer interface {
io.Closer
Config() ServerConfig
Type() streamtypes.ServerType
ListenAddr() string
NumBytesConsumerWrote() uint64
NumBytesProducerRead() uint64
}
type StreamDestination struct {
ID DestinationID
URL string