Initial commit, pt. 75

This commit is contained in:
Dmitrii Okunev
2024-08-05 23:08:57 +01:00
parent 71ed15ab14
commit f558df2d77
10 changed files with 2985 additions and 2620 deletions

View File

@@ -118,7 +118,8 @@ func streamSetup(cmd *cobra.Command, args []string) {
remoteAddr, err := cmd.Flags().GetString("remote-addr")
assertNoError(ctx, err)
streamD := client.New(remoteAddr)
streamD, err := client.New(remoteAddr)
assertNoError(ctx, err)
title, err := cmd.Flags().GetString("title")
assertNoError(ctx, err)
description, err := cmd.Flags().GetString("description")
@@ -160,7 +161,8 @@ func streamStatus(cmd *cobra.Command, args []string) {
remoteAddr, err := cmd.Flags().GetString("remote-addr")
assertNoError(ctx, err)
streamD := client.New(remoteAddr)
streamD, err := client.New(remoteAddr)
assertNoError(ctx, err)
for _, platID := range []streamcontrol.PlatformName{
obs.ID, twitch.ID, youtube.ID,
@@ -188,7 +190,8 @@ func variablesGet(cmd *cobra.Command, args []string) {
remoteAddr, err := cmd.Flags().GetString("remote-addr")
assertNoError(ctx, err)
streamD := client.New(remoteAddr)
streamD, err := client.New(remoteAddr)
assertNoError(ctx, err)
b, err := streamD.GetVariable(ctx, consts.VarKey(variableKey))
assertNoError(ctx, err)
@@ -203,7 +206,8 @@ func variablesGetHash(cmd *cobra.Command, args []string) {
remoteAddr, err := cmd.Flags().GetString("remote-addr")
assertNoError(ctx, err)
streamD := client.New(remoteAddr)
streamD, err := client.New(remoteAddr)
assertNoError(ctx, err)
b, err := streamD.GetVariableHash(ctx, consts.VarKey(variableKey), crypto.SHA1)
assertNoError(ctx, err)
@@ -217,7 +221,8 @@ func variablesSet(cmd *cobra.Command, args []string) {
remoteAddr, err := cmd.Flags().GetString("remote-addr")
assertNoError(ctx, err)
streamD := client.New(remoteAddr)
streamD, err := client.New(remoteAddr)
assertNoError(ctx, err)
value, err := io.ReadAll(os.Stdin)
assertNoError(ctx, err)
@@ -231,7 +236,8 @@ func configGet(cmd *cobra.Command, args []string) {
remoteAddr, err := cmd.Flags().GetString("remote-addr")
assertNoError(ctx, err)
streamD := client.New(remoteAddr)
streamD, err := client.New(remoteAddr)
assertNoError(ctx, err)
cfg, err := streamD.GetConfig(ctx)
assertNoError(ctx, err)

View File

@@ -5,4 +5,4 @@ Website = "https://github.com/xaionaro/streamctl"
Name = "streampanel"
ID = "center.dx.streampanel"
Version = "0.1.0"
Build = 88
Build = 91

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,11 @@
package client
import "io"
type dummyCloser struct{}
var _ io.Closer = dummyCloser{}
func (dummyCloser) Close() error {
return nil
}

View File

@@ -0,0 +1,98 @@
package client
import (
"context"
"time"
"google.golang.org/grpc"
)
type ReconnectConfig struct {
InitialInterval time.Duration
MaximalInterval time.Duration
IntervalMultiplier float64
}
type CallWrapperFunc func(
ctx context.Context,
req any,
callFunc func(ctx context.Context, opts ...grpc.CallOption) error,
opts ...grpc.CallOption,
) error
type ConnectWrapperFunc func(
ctx context.Context,
connectFunc func(ctx context.Context) error,
) error
type Config struct {
UsePersistentConnection bool
CallWrapper CallWrapperFunc
ConnectWrapper ConnectWrapperFunc
Reconnect ReconnectConfig
}
var DefaultConfig = func() Config {
return Config{
UsePersistentConnection: false,
Reconnect: ReconnectConfig{
InitialInterval: 10 * time.Millisecond,
MaximalInterval: 5 * time.Second,
IntervalMultiplier: 1.1,
},
}
}()
type Option interface {
Apply(*Config)
}
type Options []Option
func (s Options) Apply(cfg *Config) {
for _, opt := range s {
opt.Apply(cfg)
}
}
func (s Options) Config() Config {
cfg := Config{}
s.Apply(&cfg)
return cfg
}
type OptionUsePersistentConnection bool
func (opt OptionUsePersistentConnection) Apply(cfg *Config) {
cfg.UsePersistentConnection = bool(opt)
}
type OptionCallWrapper CallWrapperFunc
func (opt OptionCallWrapper) Apply(cfg *Config) {
cfg.CallWrapper = CallWrapperFunc(opt)
}
type OptionConnectWrapper ConnectWrapperFunc
func (opt OptionConnectWrapper) Apply(cfg *Config) {
cfg.ConnectWrapper = ConnectWrapperFunc(opt)
}
type OptionReconnectInitialInterval time.Duration
func (opt OptionReconnectInitialInterval) Apply(cfg *Config) {
cfg.Reconnect.InitialInterval = time.Duration(opt)
}
type OptionReconnectMaximalInterval time.Duration
func (opt OptionReconnectMaximalInterval) Apply(cfg *Config) {
cfg.Reconnect.MaximalInterval = time.Duration(opt)
}
type OptionReconnectIntervalMultiplier float64
func (opt OptionReconnectIntervalMultiplier) Apply(cfg *Config) {
cfg.Reconnect.IntervalMultiplier = float64(opt)
}

File diff suppressed because it is too large Load Diff

View File

@@ -22,6 +22,7 @@ const _ = grpc.SupportPackageIsVersion7
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type StreamDClient interface {
Ping(ctx context.Context, in *PingRequest, opts ...grpc.CallOption) (*PingReply, error)
SetLoggingLevel(ctx context.Context, in *SetLoggingLevelRequest, opts ...grpc.CallOption) (*SetLoggingLevelReply, error)
GetLoggingLevel(ctx context.Context, in *GetLoggingLevelRequest, opts ...grpc.CallOption) (*GetLoggingLevelReply, error)
GetConfig(ctx context.Context, in *GetConfigRequest, opts ...grpc.CallOption) (*GetConfigReply, error)
@@ -97,6 +98,15 @@ func NewStreamDClient(cc grpc.ClientConnInterface) StreamDClient {
return &streamDClient{cc}
}
func (c *streamDClient) Ping(ctx context.Context, in *PingRequest, opts ...grpc.CallOption) (*PingReply, error) {
out := new(PingReply)
err := c.cc.Invoke(ctx, "/streamd.StreamD/Ping", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *streamDClient) SetLoggingLevel(ctx context.Context, in *SetLoggingLevelRequest, opts ...grpc.CallOption) (*SetLoggingLevelReply, error) {
out := new(SetLoggingLevelReply)
err := c.cc.Invoke(ctx, "/streamd.StreamD/SetLoggingLevel", in, out, opts...)
@@ -916,6 +926,7 @@ func (c *streamDClient) StreamPlayerClose(ctx context.Context, in *StreamPlayerC
// All implementations must embed UnimplementedStreamDServer
// for forward compatibility
type StreamDServer interface {
Ping(context.Context, *PingRequest) (*PingReply, error)
SetLoggingLevel(context.Context, *SetLoggingLevelRequest) (*SetLoggingLevelReply, error)
GetLoggingLevel(context.Context, *GetLoggingLevelRequest) (*GetLoggingLevelReply, error)
GetConfig(context.Context, *GetConfigRequest) (*GetConfigReply, error)
@@ -988,6 +999,9 @@ type StreamDServer interface {
type UnimplementedStreamDServer struct {
}
func (UnimplementedStreamDServer) Ping(context.Context, *PingRequest) (*PingReply, error) {
return nil, status.Errorf(codes.Unimplemented, "method Ping not implemented")
}
func (UnimplementedStreamDServer) SetLoggingLevel(context.Context, *SetLoggingLevelRequest) (*SetLoggingLevelReply, error) {
return nil, status.Errorf(codes.Unimplemented, "method SetLoggingLevel not implemented")
}
@@ -1196,6 +1210,24 @@ func RegisterStreamDServer(s grpc.ServiceRegistrar, srv StreamDServer) {
s.RegisterService(&StreamD_ServiceDesc, srv)
}
func _StreamD_Ping_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(PingRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(StreamDServer).Ping(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/streamd.StreamD/Ping",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(StreamDServer).Ping(ctx, req.(*PingRequest))
}
return interceptor(ctx, in, info, handler)
}
func _StreamD_SetLoggingLevel_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(SetLoggingLevelRequest)
if err := dec(in); err != nil {
@@ -2403,6 +2435,10 @@ var StreamD_ServiceDesc = grpc.ServiceDesc{
ServiceName: "streamd.StreamD",
HandlerType: (*StreamDServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "Ping",
Handler: _StreamD_Ping_Handler,
},
{
MethodName: "SetLoggingLevel",
Handler: _StreamD_SetLoggingLevel_Handler,

View File

@@ -5,6 +5,7 @@ option go_package = "go/streamd_grpc";
import "player/player.proto";
service StreamD {
rpc Ping(PingRequest) returns (PingReply) {}
rpc SetLoggingLevel(SetLoggingLevelRequest) returns (SetLoggingLevelReply) {}
rpc GetLoggingLevel(GetLoggingLevelRequest) returns (GetLoggingLevelReply) {}
rpc GetConfig(GetConfigRequest) returns (GetConfigReply) {}
@@ -78,6 +79,16 @@ service StreamD {
rpc StreamPlayerClose(StreamPlayerCloseRequest) returns (StreamPlayerCloseReply) {}
}
message PingRequest {
string payloadToReturn = 1;
string payloadToIgnore = 2;
int32 requestExtraPayloadSize = 3;
}
message PingReply {
string payload = 1;
}
enum LoggingLevel {
none = 0;
fatal = 1;

View File

@@ -6,6 +6,7 @@ import (
"crypto"
"encoding/json"
"fmt"
"strings"
"sync"
"github.com/andreykaipov/goobs/api/requests/scenes"
@@ -65,6 +66,23 @@ func (grpc *GRPCServer) MemoizeData() *memoize.MemoizeData {
return grpc.MemoizeDataValue
}
func (grpc *GRPCServer) Ping(
ctx context.Context,
req *streamd_grpc.PingRequest,
) (*streamd_grpc.PingReply, error) {
var payload strings.Builder
extraSize := req.GetRequestExtraPayloadSize()
totalSize := len(req.GetPayloadToReturn()) + int(extraSize)
if totalSize > 65535 {
return nil, fmt.Errorf("requested a too big payload")
}
payload.WriteString(req.GetPayloadToReturn())
payload.WriteString(strings.Repeat("0", int(extraSize)))
return &streamd_grpc.PingReply{
Payload: payload.String(),
}, nil
}
func (grpc *GRPCServer) Close() error {
err := &multierror.Error{}
grpc.OAuthURLHandlerLocker.Lock()

View File

@@ -341,7 +341,7 @@ func (p *Panel) startOAuthListenerForRemoteStreamD(
return fmt.Errorf("unable to start listener for OAuth responses: %w", err)
}
oauthURLChan, err := streamD.SubscriberToOAuthURLs(ctx, listenPort)
oauthURLChan, err := streamD.SubscribeToOAuthURLs(ctx, listenPort)
if err != nil {
cancelFn()
return fmt.Errorf("unable to subscribe to OAuth requests of streamd: %w", err)
@@ -446,8 +446,9 @@ func (p *Panel) SetLoggingLevel(ctx context.Context, level logger.Level) {
}
func (p *Panel) initRemoteStreamD(context.Context) error {
p.StreamD = client.New(p.Config.RemoteStreamDAddr)
return nil
var err error
p.StreamD, err = client.New(p.Config.RemoteStreamDAddr)
return err
}
func removeNonDigits(input string) string {
@@ -1909,7 +1910,7 @@ func (p *Panel) setupStream(ctx context.Context) {
if p.youtubeCheck.Checked && backendEnabled[youtube.ID] {
if p.streamIsRunning(ctx, youtube.ID) {
logger.Infof(ctx, "updating the stream info at YouTube")
logger.Debugf(ctx, "updating the stream info at YouTube")
err := p.StreamD.UpdateStream(
ctx,
youtube.ID,
@@ -1917,11 +1918,12 @@ func (p *Panel) setupStream(ctx context.Context) {
p.streamDescriptionField.Text,
profile.PerPlatform[youtube.ID],
)
logger.Infof(ctx, "updated the stream info at YouTube")
if err != nil {
p.DisplayError(fmt.Errorf("unable to start the stream on YouTube: %w", err))
}
} else {
logger.Infof(ctx, "creating the stream at YouTube")
logger.Debugf(ctx, "creating the stream at YouTube")
err := p.StreamD.StartStream(
ctx,
youtube.ID,
@@ -1929,6 +1931,7 @@ func (p *Panel) setupStream(ctx context.Context) {
p.streamDescriptionField.Text,
profile.PerPlatform[youtube.ID],
)
logger.Infof(ctx, "created the stream at YouTube")
if err != nil {
p.DisplayError(fmt.Errorf("unable to start the stream on YouTube: %w", err))
}