Start implementing a stream preview

This commit is contained in:
Dmitrii Okunev
2024-10-30 22:47:51 +00:00
parent 5549cedc47
commit ebd71e92b1
9 changed files with 100 additions and 32 deletions

View File

@@ -19,6 +19,8 @@ import (
"github.com/getsentry/sentry-go"
grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
"github.com/spf13/pflag"
"github.com/xaionaro-go/grpcproxy/grpcproxyserver"
"github.com/xaionaro-go/grpcproxy/protobuf/go/proxy_grpc"
"github.com/xaionaro-go/obs-grpc-proxy/protobuf/go/obs_grpc"
"github.com/xaionaro-go/streamctl/cmd/streamd/ui"
"github.com/xaionaro-go/streamctl/pkg/observability"
@@ -211,6 +213,7 @@ func main() {
log.Fatal(err)
}
obs_grpc.RegisterOBSServer(grpcServer, obsGRPC)
proxy_grpc.RegisterNetworkProxyServer(grpcServer, grpcproxyserver.New())
streamd_grpc.RegisterStreamDServer(grpcServer, streamdGRPC)
l.Infof("started server at %s", *listenAddr)

View File

@@ -14,6 +14,8 @@ import (
"github.com/facebookincubator/go-belt/tool/experimental/errmon"
"github.com/facebookincubator/go-belt/tool/logger"
grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
"github.com/xaionaro-go/grpcproxy/grpcproxyserver"
"github.com/xaionaro-go/grpcproxy/protobuf/go/proxy_grpc"
"github.com/xaionaro-go/obs-grpc-proxy/protobuf/go/obs_grpc"
"github.com/xaionaro-go/streamctl/cmd/streamd/ui"
"github.com/xaionaro-go/streamctl/pkg/mainprocess"
@@ -279,6 +281,7 @@ func initGRPCServers(
streamdGRPC := server.NewGRPCServer(streamD)
streamd_grpc.RegisterStreamDServer(grpcServer, streamdGRPC)
obs_grpc.RegisterOBSServer(grpcServer, obsGRPC)
proxy_grpc.RegisterNetworkProxyServer(grpcServer, grpcproxyserver.New())
// start the server:
observability.Go(ctx, func() {

11
go.mod
View File

@@ -1,8 +1,6 @@
module github.com/xaionaro-go/streamctl
go 1.23
toolchain go1.23.2
go 1.23.2
// The original go-yaml is very slow, using the improved version instead
replace github.com/goccy/go-yaml v1.11.3 => github.com/yoelsusanto/go-yaml v0.0.0-20240324162521-2018c1ab915b
@@ -177,7 +175,7 @@ require (
golang.org/x/time v0.6.0 // indirect
golang.org/x/tools v0.22.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240730163845-b1a4ccb954bf // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142 // indirect
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect
gopkg.in/warnings.v0 v0.1.2 // indirect
lukechampine.com/blake3 v1.2.1 // indirect
@@ -229,6 +227,7 @@ require (
github.com/xaionaro-go/datacounter v1.0.4
github.com/xaionaro-go/go-rtmp v0.0.0-20241009130244-1e3160f27f42
github.com/xaionaro-go/gorex v0.0.0-20241010205749-bcd59d639c4d
github.com/xaionaro-go/grpcproxy v0.0.0-20241030215807-d4204b934e10
github.com/xaionaro-go/kickcom v0.0.0-20241022142825-25a234cc8628
github.com/xaionaro-go/lockmap v0.0.0-20240901172806-e17aea364748
github.com/xaionaro-go/mediamtx v0.0.0-20241009124606-94c22c603970
@@ -240,8 +239,8 @@ require (
github.com/yutopp/go-flv v0.3.1
golang.org/x/crypto v0.28.0
golang.org/x/net v0.30.0
google.golang.org/grpc v1.65.0
google.golang.org/protobuf v1.34.2
google.golang.org/grpc v1.67.1
google.golang.org/protobuf v1.35.1
gopkg.in/yaml.v2 v2.4.0
gopkg.in/yaml.v3 v3.0.1
)

18
go.sum
View File

@@ -683,6 +683,8 @@ github.com/xaionaro-go/goobs v0.0.0-20241025144519-45ebde014c09 h1:NGu2p1ACunAJk
github.com/xaionaro-go/goobs v0.0.0-20241025144519-45ebde014c09/go.mod h1:yE6JutVAl1vLcABwfX7OcPcShTG9eABUUfoUj/y5Xc0=
github.com/xaionaro-go/gorex v0.0.0-20241010205749-bcd59d639c4d h1:9DyH0lboWWzKUwiqGmp9sTZ3bSPhgJHiiWgV+hqY9Uo=
github.com/xaionaro-go/gorex v0.0.0-20241010205749-bcd59d639c4d/go.mod h1:yI0EvHC6Ir5WIZp3tEk7o42/QqeTb9pkII+T8p4FlPo=
github.com/xaionaro-go/grpcproxy v0.0.0-20241030215807-d4204b934e10 h1:Y3ttqM++h0JJqRoiEBciOS9bT1O1xgjvR8deXePJxHs=
github.com/xaionaro-go/grpcproxy v0.0.0-20241030215807-d4204b934e10/go.mod h1:CMAXpustI+zscUtsMhoLXq91uIv+gVnH5Ponk9fvoBI=
github.com/xaionaro-go/kickcom v0.0.0-20241022142825-25a234cc8628 h1:/ohdYrl4nFgEJJTQqP8hSzXqXxyetiB07jnQu5pcvNo=
github.com/xaionaro-go/kickcom v0.0.0-20241022142825-25a234cc8628/go.mod h1:gzKL0qgtR13PXl4woI3nvxVhQ9Z6lHtGL5tk9HmITxA=
github.com/xaionaro-go/libvlc-go/v3 v3.0.0-20241011194409-0fe4e2a9d901 h1:HX0CO6h5oDQfp9NquzQT0xWH4Gn9Z5BZ0IFJrYFl88k=
@@ -1178,10 +1180,10 @@ google.golang.org/genproto v0.0.0-20210319143718-93e7006c17a6/go.mod h1:FWY/as6D
google.golang.org/genproto v0.0.0-20210402141018-6c239bbf2bb1/go.mod h1:9lPAdzaEmUacj36I+k7YKbEc5CXzPIeORRgDAUOu28A=
google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c/go.mod h1:UODoCrxHCcBojKKwX1terBiRUaqAsFqJiF615XL43r0=
google.golang.org/genproto v0.0.0-20240730163845-b1a4ccb954bf h1:OqdXDEakZCVtDiZTjcxfwbHPCT11ycCEsTKesBVKvyY=
google.golang.org/genproto/googleapis/api v0.0.0-20240711142825-46eb208f015d h1:kHjw/5UfflP/L5EbledDrcG4C2597RtymmGRZvHiCuY=
google.golang.org/genproto/googleapis/api v0.0.0-20240711142825-46eb208f015d/go.mod h1:mw8MG/Qz5wfgYr6VqVCiZcHe/GJEfI+oGGDCohaVgB0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240730163845-b1a4ccb954bf h1:liao9UHurZLtiEwBgT9LMOnKYsHze6eA6w1KQCMVN2Q=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240730163845-b1a4ccb954bf/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY=
google.golang.org/genproto/googleapis/api v0.0.0-20240814211410-ddb44dafa142 h1:wKguEg1hsxI2/L3hUYrpo1RVi48K+uTyzKqprwLXsb8=
google.golang.org/genproto/googleapis/api v0.0.0-20240814211410-ddb44dafa142/go.mod h1:d6be+8HhtEtucleCbxpPW9PA9XwISACu8nvpPqF0BVo=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142 h1:e7S5W7MGGLaSu8j3YjdezkZ+m1/Nm0uRVRMEMGk26Xs=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
@@ -1202,8 +1204,8 @@ google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAG
google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
google.golang.org/grpc v1.36.1/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM=
google.golang.org/grpc v1.65.0 h1:bs/cUb4lp1G5iImFFd3u5ixQzweKizoZJAwBNLR42lc=
google.golang.org/grpc v1.65.0/go.mod h1:WgYC2ypjlB0EiQi6wdKixMqukr6lBc0Vo+oOgjrM5ZQ=
google.golang.org/grpc v1.67.1 h1:zWnc1Vrcno+lHZCOofnIMvycFcc0QRGIzm9dhnDX68E=
google.golang.org/grpc v1.67.1/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
@@ -1216,8 +1218,8 @@ google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGj
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA=
google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

View File

@@ -3,6 +3,7 @@ package api
import (
"context"
"crypto"
"net"
"time"
"github.com/facebookincubator/go-belt/tool/logger"
@@ -289,6 +290,12 @@ type StreamD interface {
reason string,
deadline time.Time,
) error
DialContext(
ctx context.Context,
network string,
addr string,
) (net.Conn, error)
}
type StreamPlayer = sstypes.StreamPlayer

View File

@@ -8,6 +8,7 @@ import (
"errors"
"fmt"
"io"
"net"
"runtime"
"strings"
"sync/atomic"
@@ -17,6 +18,8 @@ import (
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/goccy/go-yaml"
"github.com/hashicorp/go-multierror"
"github.com/xaionaro-go/grpcproxy/grpchttpproxy"
"github.com/xaionaro-go/grpcproxy/protobuf/go/proxy_grpc"
"github.com/xaionaro-go/obs-grpc-proxy/pkg/obsgrpcproxy"
"github.com/xaionaro-go/obs-grpc-proxy/protobuf/go/obs_grpc"
"github.com/xaionaro-go/streamctl/pkg/observability"
@@ -2755,3 +2758,38 @@ func (c *Client) SendChatMessage(
}
return nil
}
func (c *Client) DialContext(
ctx context.Context,
network string,
addr string,
) (net.Conn, error) {
conn, err := c.connect(ctx)
if err != nil {
return nil, fmt.Errorf("unable to initialize a gRPC client: %w", err)
}
proxyClient := proxy_grpc.NewNetworkProxyClient(conn)
netConn, err := grpchttpproxy.NewDialer(proxyClient).DialContext(ctx, network, addr)
if err != nil {
conn.Close()
return nil, fmt.Errorf("unable to establish a proxied connection: %w", err)
}
return &proxiedConn{
Conn: netConn,
grpcClientConn: conn,
}, nil
}
type proxiedConn struct {
net.Conn
grpcClientConn *grpc.ClientConn
}
func (conn *proxiedConn) Close() error {
var result *multierror.Error
result = multierror.Append(result, conn.Conn.Close())
result = multierror.Append(result, conn.grpcClientConn.Close())
return result.ErrorOrNil()
}

View File

@@ -596,11 +596,7 @@ func (grpc *GRPCServer) SubscribeToOAuthRequests(
})
for _, req := range unansweredRequests {
logger.Tracef(
ctx,
"re-sending an unanswered request to a new client: %#+v",
*req,
)
logger.Tracef(ctx, "re-sending an unanswered request to a new client: %#+v", req)
err := sender.Send(req)
errmon.ObserveErrorCtx(ctx, err)
}
@@ -669,22 +665,18 @@ func (grpc *GRPCServer) openBrowser(
ctx context.Context,
url string,
) (_ret error) {
req := streamd_grpc.OAuthRequest{
req := &streamd_grpc.OAuthRequest{
PlatID: string("<OpenBrowser>"),
AuthURL: url,
}
count := 0
for _, handlers := range grpc.OAuthURLHandlers {
logger.Debugf(
ctx,
"OpenOAuthURL() sending %#+v",
req,
)
logger.Debugf(ctx, "OpenOAuthURL() sending %#+v", req)
var resultErr *multierror.Error
for _, handler := range handlers {
count++
err := handler.Sender.Send(&req)
err := handler.Sender.Send(req)
if err != nil {
err = multierror.Append(
resultErr,
@@ -737,7 +729,7 @@ func (grpc *GRPCServer) openOAuthURL(
Port: listenPort,
}
}
req := streamd_grpc.OAuthRequest{
req := &streamd_grpc.OAuthRequest{
PlatID: string(platID),
AuthURL: authURL,
}
@@ -745,12 +737,12 @@ func (grpc *GRPCServer) openOAuthURL(
if grpc.UnansweredOAuthRequests[platID] == nil {
grpc.UnansweredOAuthRequests[platID] = map[uint16]*streamd_grpc.OAuthRequest{}
}
grpc.UnansweredOAuthRequests[platID][listenPort] = &req
grpc.UnansweredOAuthRequests[platID][listenPort] = req
})
logger.Debugf(ctx, "OpenOAuthURL() sending %#+v", req)
var resultErr *multierror.Error
for _, handler := range handlers {
err := handler.Sender.Send(&req)
err := handler.Sender.Send(req)
if err != nil {
err = multierror.Append(
resultErr,
@@ -1944,3 +1936,18 @@ func (grpc *GRPCServer) BanUser(
}
return &streamd_grpc.BanUserReply{}, nil
}
/*func (grpc *GRPCServer) ProxyConnect(
req *streamd_grpc.ProxyConnectRequest,
srv streamd_grpc.StreamD_ProxyConnectServer,
) error {
grpc.StreamD.ProxyConnect(
ctx,
)
}
func (grpc *GRPCServer) ProxyPackets(
streamd_grpc.StreamD_ProxyPacketsServer,
) error {
}*/

View File

@@ -31,10 +31,10 @@ func (d *StreamD) EXPERIMENTAL_ReinitStreamControllers(ctx context.Context) (_er
func (d *StreamD) reinitStreamControllers(ctx context.Context) error {
var result *multierror.Error
for _, platName := range []streamcontrol.PlatformName{
youtube.ID,
twitch.ID,
kick.ID,
obs.ID,
youtube.ID,
} {
platCfg := d.Config.Backends[platName]
if platCfg == nil {
@@ -168,8 +168,8 @@ func newKick(
ctx context.Context,
cfg *streamcontrol.AbstractPlatformConfig,
saveCfgFunc func(*streamcontrol.AbstractPlatformConfig) error,
customOAuthHandler kick.OAuthHandler,
getOAuthListenPorts func() []uint16,
_ kick.OAuthHandler,
_ func() []uint16,
) (
*kick.Kick,
error,

View File

@@ -4,6 +4,7 @@ import (
"context"
"crypto"
"fmt"
"net"
"os"
"reflect"
"sort"
@@ -1741,3 +1742,11 @@ func (d *StreamD) SendChatMessage(
return nil
}
func (d *StreamD) DialContext(
ctx context.Context,
network string,
addr string,
) (net.Conn, error) {
return net.Dial(network, addr)
}