Continue the implementation of trigger rules (ex scene rules)

This commit is contained in:
Dmitrii Okunev
2024-10-17 21:52:28 +01:00
parent fb33738f1c
commit d17944c0dd
45 changed files with 5077 additions and 1748 deletions

2
go.mod
View File

@@ -39,6 +39,7 @@ require (
dario.cat/mergo v1.0.0 // indirect dario.cat/mergo v1.0.0 // indirect
fyne.io/systray v1.11.0 // indirect fyne.io/systray v1.11.0 // indirect
github.com/BurntSushi/toml v1.4.0 // indirect github.com/BurntSushi/toml v1.4.0 // indirect
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802 // indirect
github.com/MicahParks/jwkset v0.5.20 // indirect github.com/MicahParks/jwkset v0.5.20 // indirect
github.com/MicahParks/keyfunc/v3 v3.3.5 // indirect github.com/MicahParks/keyfunc/v3 v3.3.5 // indirect
github.com/Microsoft/go-winio v0.6.1 // indirect github.com/Microsoft/go-winio v0.6.1 // indirect
@@ -219,6 +220,7 @@ require (
) )
require ( require (
github.com/BurntSushi/xgbutil v0.0.0-20190907113008-ad855c713046
github.com/onsi/gomega v1.30.0 // indirect github.com/onsi/gomega v1.30.0 // indirect
github.com/phuslu/goid v1.0.1 // indirect github.com/phuslu/goid v1.0.1 // indirect
github.com/pion/datachannel v1.5.6 // indirect github.com/pion/datachannel v1.5.6 // indirect

3
go.sum
View File

@@ -52,7 +52,10 @@ github.com/AgustinSRG/go-child-process-manager v1.0.1/go.mod h1:JgXUSAhyOo1awWbB
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/toml v1.4.0 h1:kuoIxZQy2WRRk1pttg9asf+WVv6tWQuBNVmK8+nqPr0= github.com/BurntSushi/toml v1.4.0 h1:kuoIxZQy2WRRk1pttg9asf+WVv6tWQuBNVmK8+nqPr0=
github.com/BurntSushi/toml v1.4.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho= github.com/BurntSushi/toml v1.4.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802 h1:1BDTz0u9nC3//pOCMdNH+CiXJVYJh5UQNCOBG7jbELc=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/BurntSushi/xgbutil v0.0.0-20190907113008-ad855c713046 h1:O/r2Sj+8QcMF7V5IcmiE2sMFV2q3J47BEirxbXJAdzA=
github.com/BurntSushi/xgbutil v0.0.0-20190907113008-ad855c713046/go.mod h1:uw9h2sd4WWHOPdJ13MQpwK5qYWKYDumDqxWWIknEQ+k=
github.com/DataDog/gostackparse v0.6.0 h1:egCGQviIabPwsyoWpGvIBGrEnNWez35aEO7OJ1vBI4o= github.com/DataDog/gostackparse v0.6.0 h1:egCGQviIabPwsyoWpGvIBGrEnNWez35aEO7OJ1vBI4o=
github.com/DataDog/gostackparse v0.6.0/go.mod h1:lTfqcJKqS9KnXQGnyQMCugq3u1FP6UZMfWR0aitKFMM= github.com/DataDog/gostackparse v0.6.0/go.mod h1:lTfqcJKqS9KnXQGnyQMCugq3u1FP6UZMfWR0aitKFMM=
github.com/DexterLB/mpvipc v0.0.0-20230829142118-145d6eabdc37 h1:/oQBAuySCcme0DLhicWkr7FaAT5nh1XbbbnCMR2WdPA= github.com/DexterLB/mpvipc v0.0.0-20230829142118-145d6eabdc37 h1:/oQBAuySCcme0DLhicWkr7FaAT5nh1XbbbnCMR2WdPA=

33
pkg/expression/eval.go Normal file
View File

@@ -0,0 +1,33 @@
package expression
import (
"bytes"
"fmt"
"text/template"
)
func Eval[T any](
expr Expression,
evalCtx any,
) (T, error) {
var result T
tmpl, err := template.New("").Funcs(funcMap).Parse(string(expr))
if err != nil {
return result, fmt.Errorf("unable to parse the template: %w", err)
}
var buf bytes.Buffer
err = tmpl.Execute(&buf, evalCtx)
if err != nil {
return result, fmt.Errorf("unable to execute the template: %w", err)
}
value := buf.String()
_, err = fmt.Sscanf(value, "%v", &result)
if err != nil {
return result, fmt.Errorf("unable to scan value '%v' into %T: %w", value, result, err)
}
return result, nil
}

View File

@@ -0,0 +1,3 @@
package expression
type Expression string

View File

@@ -0,0 +1,29 @@
package serializable
import (
registrylib "github.com/xaionaro-go/streamctl/pkg/serializable/registry"
)
var localRegistry = registrylib.New[any]()
func RegisterType[T any]() {
var sample T
localRegistry.RegisterType(sample)
}
func NewByTypeName[T any](typeName string) (T, bool) {
result := localRegistry.NewByTypeName(typeName)
casted, ok := result.(T)
return casted, ok
}
func ListTypeNames[T any]() []string {
names := localRegistry.ListTypeNames()
var result []string
for _, name := range names {
if _, ok := localRegistry.NewByTypeName(name).(T); ok {
result = append(result, name)
}
}
return result
}

View File

@@ -1,8 +1,10 @@
package registry package registry
import ( import (
"fmt"
"reflect" "reflect"
"sort" "sort"
"strings"
"github.com/iancoleman/strcase" "github.com/iancoleman/strcase"
) )
@@ -18,11 +20,19 @@ func New[T any]() *Registry[T] {
} }
func typeOf(v any) reflect.Type { func typeOf(v any) reflect.Type {
return reflect.ValueOf(v).Type().Elem() return reflect.Indirect(reflect.ValueOf(v)).Type()
} }
func ToTypeName[T any](sample T) string { func ToTypeName[T any](sample T) string {
return strcase.ToSnake(typeOf(sample).Name()) name := typeOf(sample).Name()
name = strings.ReplaceAll(name, "github.com/xaionaro-go/streamctl/pkg/streamd/config/", "")
name = strings.ReplaceAll(name, "eventquery/", "")
name = strings.ReplaceAll(name, "eventquery.", "")
name = strings.ReplaceAll(name, "event/", "")
name = strings.ReplaceAll(name, "event.", "")
name = strings.ReplaceAll(name, "action/", "")
name = strings.ReplaceAll(name, "action.", "")
return strcase.ToSnake(name)
} }
func (r *Registry[T]) RegisterType(sample T) { func (r *Registry[T]) RegisterType(sample T) {
@@ -30,7 +40,11 @@ func (r *Registry[T]) RegisterType(sample T) {
} }
func (r *Registry[T]) NewByTypeName(typeName string) T { func (r *Registry[T]) NewByTypeName(typeName string) T {
return reflect.New(r.Types[typeName]).Interface().(T) t := r.Types[typeName]
if t == nil {
panic(fmt.Errorf("type '%s' is not registered", typeName))
}
return reflect.New(t).Interface().(T)
} }
func (r *Registry[T]) ListTypeNames() []string { func (r *Registry[T]) ListTypeNames() []string {

View File

@@ -0,0 +1,69 @@
package serializable
import (
"fmt"
"runtime/debug"
"github.com/goccy/go-yaml"
"github.com/xaionaro-go/streamctl/pkg/serializable/registry"
)
type serializableInterface interface {
yaml.BytesMarshaler
yaml.BytesUnmarshaler
}
type Serializable[T any] struct {
Value T
}
type serializable[T any] struct {
Type string `yaml:"type"`
Value T `yaml:",inline"`
}
var _ serializableInterface = (*Serializable[struct{}])(nil)
func (s *Serializable[T]) UnmarshalYAML(b []byte) (_err error) {
defer func() {
if r := recover(); r != nil {
_err = fmt.Errorf("got a panic: %v\n%s", r, debug.Stack())
}
}()
intermediate := serializable[struct{}]{}
err := yaml.Unmarshal(b, &intermediate)
if err != nil {
return fmt.Errorf("unable to unmarshal the intermediate structure for %T: %w", s.Value, err)
}
if intermediate.Type == "" {
return fmt.Errorf("'type' is not set")
}
value, ok := NewByTypeName[T](intermediate.Type)
if !ok {
return fmt.Errorf("unknown/unregistered value type name: '%v'", intermediate.Type)
}
err = yaml.Unmarshal(b, value)
if err != nil {
return fmt.Errorf("unable to unmarshal the %T structure: %w", s.Value, err)
}
s.Value = value
return nil
}
func (s Serializable[T]) MarshalYAML() (_ []byte, _err error) {
defer func() {
if r := recover(); r != nil {
_err = fmt.Errorf("got a panic: %v\n%s", r, debug.Stack())
}
}()
return yaml.Marshal(serializable[T]{
Type: registry.ToTypeName(s.Value),
Value: s.Value,
})
}

View File

@@ -8,9 +8,6 @@ import (
const ID = types.ID const ID = types.ID
type Config = types.Config type Config = types.Config
type SceneName = types.SceneName
type SceneRule = types.SceneRule
type SceneRules = types.SceneRules
type StreamProfile = types.StreamProfile type StreamProfile = types.StreamProfile
type PlatformSpecificConfig = types.PlatformSpecificConfig type PlatformSpecificConfig = types.PlatformSpecificConfig

View File

@@ -1,28 +0,0 @@
package action
func init() {
registry.RegisterType((*ElementShowHide)(nil))
registry.RegisterType((*WindowCaptureSetSource)(nil))
}
type Action interface {
isAction()
}
type ValueExpression string
type ElementShowHide struct {
ElementName *string `yaml:"element_name,omitempty" json:"element_name,omitempty"`
ElementUUID *string `yaml:"element_uuid,omitempty" json:"element_uuid,omitempty"`
ValueExpression ValueExpression `yaml:"value_expression,omitempty" json:"value_expression,omitempty"`
}
func (ElementShowHide) isAction() {}
type WindowCaptureSetSource struct {
ElementName *string `yaml:"element_name,omitempty" json:"element_name,omitempty"`
ElementUUID *string `yaml:"element_uuid,omitempty" json:"element_uuid,omitempty"`
ValueExpression ValueExpression `yaml:"value_expression,omitempty" json:"value_expression,omitempty"`
}
func (WindowCaptureSetSource) isAction() {}

View File

@@ -1,15 +0,0 @@
package action
import (
registrylib "github.com/xaionaro-go/streamctl/pkg/streamcontrol/obs/types/registry"
)
var registry = registrylib.New[Action]()
func NewByTypeName(typeName string) Action {
return registry.NewByTypeName(typeName)
}
func ListTypeNames() []string {
return registry.ListTypeNames()
}

View File

@@ -6,14 +6,10 @@ import (
const ID = streamctl.PlatformName("obs") const ID = streamctl.PlatformName("obs")
type SceneName string
type PlatformSpecificConfig struct { type PlatformSpecificConfig struct {
Host string Host string
Port uint16 Port uint16
Password string `yaml:"pass" json:"pass"` Password string `yaml:"pass" json:"pass"`
SceneRulesByScene map[SceneName]SceneRules `yaml:"scene_rules" json:"scene_rules"`
} }
type Config = streamctl.PlatformConfig[PlatformSpecificConfig, StreamProfile] type Config = streamctl.PlatformConfig[PlatformSpecificConfig, StreamProfile]

View File

@@ -1,128 +0,0 @@
package types
import (
"fmt"
"runtime/debug"
"github.com/xaionaro-go/streamctl/pkg/streamcontrol/obs/types/action"
"github.com/xaionaro-go/streamctl/pkg/streamcontrol/obs/types/registry"
"github.com/xaionaro-go/streamctl/pkg/streamcontrol/obs/types/trigger"
"gopkg.in/yaml.v2"
)
type SceneRules []SceneRule
type SceneRule struct {
Description string `yaml:"description,omitempty" json:"description,omitempty"`
TriggerQuery trigger.Query `yaml:"trigger" json:"trigger"`
Action action.Action `yaml:"action" json:"action"`
}
func (sr *SceneRule) UnmarshalYAML(b []byte) (_err error) {
defer func() {
if r := recover(); r != nil {
_err = fmt.Errorf("got a panic: %v\n%s", r, debug.Stack())
}
}()
if sr == nil {
return fmt.Errorf("nil SceneRule")
}
intermediate := serializableSceneRule{}
err := yaml.Unmarshal(b, &intermediate)
if err != nil {
return fmt.Errorf("unable to unmarshal the MonitorElementConfig: %w", err)
}
triggerQueryType, _ := intermediate.Trigger["type"].(string)
if triggerQueryType == "" {
return fmt.Errorf("trigger type is not set")
}
actionType, _ := intermediate.Action["type"].(string)
if actionType == "" {
return fmt.Errorf("action type is not set")
}
triggerQuery := trigger.NewByTypeName(triggerQueryType)
if triggerQuery == nil {
return fmt.Errorf("unknown trigger type name: '%v'", triggerQueryType)
}
action := action.NewByTypeName(actionType)
if action == nil {
return fmt.Errorf("unknown action type name: '%v'", actionType)
}
*sr = SceneRule{
Description: intermediate.Description,
TriggerQuery: triggerQuery,
Action: action,
}
return nil
}
func (sr SceneRule) MarshalYAML() (b []byte, _err error) {
defer func() {
if r := recover(); r != nil {
_err = fmt.Errorf("got a panic: %v\n%s", r, debug.Stack())
}
}()
triggerBytes, err := yaml.Marshal(sr.TriggerQuery)
if err != nil {
return nil, fmt.Errorf(
"unable to serialize the trigger %T:%#+v: %w",
sr.TriggerQuery,
sr.TriggerQuery,
err,
)
}
triggerMap := map[string]any{}
err = yaml.Unmarshal(triggerBytes, &triggerMap)
if err != nil {
return nil, fmt.Errorf(
"unable to unserialize the trigger '%s' into a map: %w",
triggerBytes,
err,
)
}
triggerMap["type"] = registry.ToTypeName(sr.TriggerQuery)
actionBytes, err := yaml.Marshal(sr.Action)
if err != nil {
return nil, fmt.Errorf(
"unable to serialize the action %T:%#+v: %w",
sr.Action,
sr.Action,
err,
)
}
actionMap := map[string]any{}
err = yaml.Unmarshal(actionBytes, &actionMap)
if err != nil {
return nil, fmt.Errorf(
"unable to unserialize the action '%s' into a map: %w",
actionBytes,
err,
)
}
actionMap["type"] = registry.ToTypeName(sr.Action)
intermediate := serializableSceneRule{
Description: sr.Description,
Trigger: triggerMap,
Action: actionMap,
}
return yaml.Marshal(intermediate)
}
type serializableSceneRule struct {
Description string `yaml:"description,omitempty" json:"description,omitempty"`
Trigger map[string]any `yaml:"trigger" json:"trigger"`
Action map[string]any `yaml:"action" json:"action"`
}

View File

@@ -1,15 +0,0 @@
package trigger
import (
registrylib "github.com/xaionaro-go/streamctl/pkg/streamcontrol/obs/types/registry"
)
var registry = registrylib.New[Query]()
func NewByTypeName(typeName string) Query {
return registry.NewByTypeName(typeName)
}
func ListQueryTypeNames() []string {
return registry.ListTypeNames()
}

View File

@@ -9,9 +9,10 @@ import (
"github.com/xaionaro-go/obs-grpc-proxy/protobuf/go/obs_grpc" "github.com/xaionaro-go/obs-grpc-proxy/protobuf/go/obs_grpc"
"github.com/xaionaro-go/streamctl/pkg/player" "github.com/xaionaro-go/streamctl/pkg/player"
"github.com/xaionaro-go/streamctl/pkg/streamcontrol" "github.com/xaionaro-go/streamctl/pkg/streamcontrol"
"github.com/xaionaro-go/streamctl/pkg/streamcontrol/obs"
"github.com/xaionaro-go/streamctl/pkg/streamd/cache" "github.com/xaionaro-go/streamctl/pkg/streamd/cache"
"github.com/xaionaro-go/streamctl/pkg/streamd/config" "github.com/xaionaro-go/streamctl/pkg/streamd/config"
"github.com/xaionaro-go/streamctl/pkg/streamd/config/action"
"github.com/xaionaro-go/streamctl/pkg/streamd/config/event"
"github.com/xaionaro-go/streamctl/pkg/streamd/grpc/go/streamd_grpc" "github.com/xaionaro-go/streamctl/pkg/streamd/grpc/go/streamd_grpc"
"github.com/xaionaro-go/streamctl/pkg/streampanel/consts" "github.com/xaionaro-go/streamctl/pkg/streampanel/consts"
sptypes "github.com/xaionaro-go/streamctl/pkg/streamplayer/types" sptypes "github.com/xaionaro-go/streamctl/pkg/streamplayer/types"
@@ -29,7 +30,10 @@ type StreamD interface {
SaveConfig(ctx context.Context) error SaveConfig(ctx context.Context) error
GetConfig(ctx context.Context) (*config.Config, error) GetConfig(ctx context.Context) (*config.Config, error)
SetConfig(ctx context.Context, cfg *config.Config) error SetConfig(ctx context.Context, cfg *config.Config) error
IsBackendEnabled(ctx context.Context, id streamcontrol.PlatformName) (bool, error) IsBackendEnabled(
ctx context.Context,
id streamcontrol.PlatformName,
) (bool, error)
OBSOLETE_IsGITInitialized(ctx context.Context) (bool, error) OBSOLETE_IsGITInitialized(ctx context.Context) (bool, error)
StartStream( StartStream(
ctx context.Context, ctx context.Context,
@@ -46,8 +50,16 @@ type StreamD interface {
profile streamcontrol.AbstractStreamProfile, profile streamcontrol.AbstractStreamProfile,
customArgs ...any, customArgs ...any,
) error ) error
SetTitle(ctx context.Context, platID streamcontrol.PlatformName, title string) error SetTitle(
SetDescription(ctx context.Context, platID streamcontrol.PlatformName, description string) error ctx context.Context,
platID streamcontrol.PlatformName,
title string,
) error
SetDescription(
ctx context.Context,
platID streamcontrol.PlatformName,
description string,
) error
ApplyProfile( ApplyProfile(
ctx context.Context, ctx context.Context,
platID streamcontrol.PlatformName, platID streamcontrol.PlatformName,
@@ -55,7 +67,10 @@ type StreamD interface {
customArgs ...any, customArgs ...any,
) error ) error
OBSOLETE_GitRelogin(ctx context.Context) error OBSOLETE_GitRelogin(ctx context.Context) error
GetBackendData(ctx context.Context, platID streamcontrol.PlatformName) (any, error) GetBackendData(
ctx context.Context,
platID streamcontrol.PlatformName,
) (any, error)
Restart(ctx context.Context) error Restart(ctx context.Context) error
EXPERIMENTAL_ReinitStreamControllers(ctx context.Context) error EXPERIMENTAL_ReinitStreamControllers(ctx context.Context) error
GetStreamStatus( GetStreamStatus(
@@ -63,7 +78,11 @@ type StreamD interface {
platID streamcontrol.PlatformName, platID streamcontrol.PlatformName,
) (*streamcontrol.StreamStatus, error) ) (*streamcontrol.StreamStatus, error)
GetVariable(ctx context.Context, key consts.VarKey) ([]byte, error) GetVariable(ctx context.Context, key consts.VarKey) ([]byte, error)
GetVariableHash(ctx context.Context, key consts.VarKey, hashType crypto.Hash) ([]byte, error) GetVariableHash(
ctx context.Context,
key consts.VarKey,
hashType crypto.Hash,
) ([]byte, error)
SetVariable(ctx context.Context, key consts.VarKey, value []byte) error SetVariable(ctx context.Context, key consts.VarKey, value []byte) error
OBS(ctx context.Context) (obs_grpc.OBSServer, context.CancelFunc, error) OBS(ctx context.Context) (obs_grpc.OBSServer, context.CancelFunc, error)
@@ -169,39 +188,89 @@ type StreamD interface {
streamID streamtypes.StreamID, streamID streamtypes.StreamID,
) (*StreamPlayer, error) ) (*StreamPlayer, error)
StreamPlayerProcessTitle(ctx context.Context, streamID StreamID) (string, error) StreamPlayerProcessTitle(
StreamPlayerOpenURL(ctx context.Context, streamID StreamID, link string) error ctx context.Context,
streamID StreamID,
) (string, error)
StreamPlayerOpenURL(
ctx context.Context,
streamID StreamID,
link string,
) error
StreamPlayerGetLink(ctx context.Context, streamID StreamID) (string, error) StreamPlayerGetLink(ctx context.Context, streamID StreamID) (string, error)
StreamPlayerEndChan(ctx context.Context, streamID StreamID) (<-chan struct{}, error) StreamPlayerEndChan(
ctx context.Context,
streamID StreamID,
) (<-chan struct{}, error)
StreamPlayerIsEnded(ctx context.Context, streamID StreamID) (bool, error) StreamPlayerIsEnded(ctx context.Context, streamID StreamID) (bool, error)
StreamPlayerGetPosition(ctx context.Context, streamID StreamID) (time.Duration, error) StreamPlayerGetPosition(
StreamPlayerGetLength(ctx context.Context, streamID StreamID) (time.Duration, error) ctx context.Context,
StreamPlayerSetSpeed(ctx context.Context, streamID StreamID, speed float64) error streamID StreamID,
StreamPlayerSetPause(ctx context.Context, streamID StreamID, pause bool) error ) (time.Duration, error)
StreamPlayerGetLength(
ctx context.Context,
streamID StreamID,
) (time.Duration, error)
StreamPlayerSetSpeed(
ctx context.Context,
streamID StreamID,
speed float64,
) error
StreamPlayerSetPause(
ctx context.Context,
streamID StreamID,
pause bool,
) error
StreamPlayerStop(ctx context.Context, streamID StreamID) error StreamPlayerStop(ctx context.Context, streamID StreamID) error
StreamPlayerClose(ctx context.Context, streamID StreamID) error StreamPlayerClose(ctx context.Context, streamID StreamID) error
SubscribeToConfigChanges(ctx context.Context) (<-chan DiffConfig, error) SubscribeToConfigChanges(ctx context.Context) (<-chan DiffConfig, error)
SubscribeToStreamsChanges(ctx context.Context) (<-chan DiffStreams, error) SubscribeToStreamsChanges(ctx context.Context) (<-chan DiffStreams, error)
SubscribeToStreamServersChanges(ctx context.Context) (<-chan DiffStreamServers, error) SubscribeToStreamServersChanges(
SubscribeToStreamDestinationsChanges(ctx context.Context) (<-chan DiffStreamDestinations, error) ctx context.Context,
SubscribeToIncomingStreamsChanges(ctx context.Context) (<-chan DiffIncomingStreams, error) ) (<-chan DiffStreamServers, error)
SubscribeToStreamForwardsChanges(ctx context.Context) (<-chan DiffStreamForwards, error) SubscribeToStreamDestinationsChanges(
SubscribeToStreamPlayersChanges(ctx context.Context) (<-chan DiffStreamPlayers, error) ctx context.Context,
) (<-chan DiffStreamDestinations, error)
SubscribeToIncomingStreamsChanges(
ctx context.Context,
) (<-chan DiffIncomingStreams, error)
SubscribeToStreamForwardsChanges(
ctx context.Context,
) (<-chan DiffStreamForwards, error)
SubscribeToStreamPlayersChanges(
ctx context.Context,
) (<-chan DiffStreamPlayers, error)
AddTimer(ctx context.Context, triggerAt time.Time, action TimerAction) (TimerID, error) AddTimer(
ctx context.Context,
triggerAt time.Time,
action Action,
) (TimerID, error)
RemoveTimer(ctx context.Context, timerID TimerID) error RemoveTimer(ctx context.Context, timerID TimerID) error
ListTimers(ctx context.Context) ([]Timer, error) ListTimers(ctx context.Context) ([]Timer, error)
AddOBSSceneRule(ctx context.Context, sceneName SceneName, sceneRule SceneRule) error AddTriggerRule(
UpdateOBSSceneRule(
ctx context.Context, ctx context.Context,
sceneName SceneName, triggerRule *config.TriggerRule,
idx uint64, ) (TriggerRuleID, error)
sceneRule SceneRule, UpdateTriggerRule(
ctx context.Context,
ruleID TriggerRuleID,
triggerRule *config.TriggerRule,
) error
RemoveTriggerRule(
ctx context.Context,
ruleID TriggerRuleID,
) error
ListTriggerRules(
ctx context.Context,
) (TriggerRules, error)
SubmitEvent(
ctx context.Context,
event event.Event,
) error ) error
RemoveOBSSceneRule(ctx context.Context, sceneName SceneName, idx uint64) error
ListOBSSceneRules(ctx context.Context, sceneName SceneName) (SceneRules, error)
} }
type StreamPlayer = sstypes.StreamPlayer type StreamPlayer = sstypes.StreamPlayer
@@ -252,7 +321,9 @@ type DestinationID = streamtypes.DestinationID
type OBSInstanceID = streamtypes.OBSInstanceID type OBSInstanceID = streamtypes.OBSInstanceID
type StreamForwardingQuirks = sstypes.ForwardingQuirks type StreamForwardingQuirks = sstypes.ForwardingQuirks
type RestartUntilYoutubeRecognizesStream = sstypes.RestartUntilYoutubeRecognizesStream type RestartUntilYoutubeRecognizesStream = sstypes.RestartUntilYoutubeRecognizesStream
type StartAfterYoutubeRecognizedStream = sstypes.StartAfterYoutubeRecognizedStream type StartAfterYoutubeRecognizedStream = sstypes.StartAfterYoutubeRecognizedStream
type DiffConfig struct{} type DiffConfig struct{}
@@ -263,51 +334,16 @@ type DiffIncomingStreams struct{}
type DiffStreamForwards struct{} type DiffStreamForwards struct{}
type DiffStreamPlayers struct{} type DiffStreamPlayers struct{}
/*
AddTimer(ctx context.Context, triggerAt time.Time, action TimerAction) (TimerID, error)
RemoveTimer(ctx context.Context, timerID TimerID) error
ListTimers(ctx context.Context) ([]Timer, error)
*/
type TimerAction interface {
timerAction() // just to enable build-time type checks
}
type TimerID uint64 type TimerID uint64
type Timer struct { type Timer struct {
ID TimerID ID TimerID
TriggerAt time.Time TriggerAt time.Time
Action TimerAction Action Action
} }
type TimerActionNoop struct{} type Action = action.Action
var _ TimerAction = (*TimerActionNoop)(nil) type TriggerRuleID uint64
type TriggerRule = config.TriggerRule
func (*TimerActionNoop) timerAction() {} type TriggerRules = config.TriggerRules
type TimerActionStartStream struct {
PlatID streamcontrol.PlatformName
Title string
Description string
Profile streamcontrol.AbstractStreamProfile
CustomArgs []any
}
var _ TimerAction = (*TimerActionStartStream)(nil)
func (*TimerActionStartStream) timerAction() {}
type TimerActionEndStream struct {
PlatID streamcontrol.PlatformName
}
var _ TimerAction = (*TimerActionEndStream)(nil)
func (*TimerActionEndStream) timerAction() {}
type SceneName = obs.SceneName
type SceneRule = obs.SceneRule
type SceneRules = obs.SceneRules

View File

@@ -28,6 +28,7 @@ import (
youtube "github.com/xaionaro-go/streamctl/pkg/streamcontrol/youtube/types" youtube "github.com/xaionaro-go/streamctl/pkg/streamcontrol/youtube/types"
"github.com/xaionaro-go/streamctl/pkg/streamd/api" "github.com/xaionaro-go/streamctl/pkg/streamd/api"
streamdconfig "github.com/xaionaro-go/streamctl/pkg/streamd/config" streamdconfig "github.com/xaionaro-go/streamctl/pkg/streamd/config"
"github.com/xaionaro-go/streamctl/pkg/streamd/config/event"
"github.com/xaionaro-go/streamctl/pkg/streamd/grpc/go/streamd_grpc" "github.com/xaionaro-go/streamctl/pkg/streamd/grpc/go/streamd_grpc"
"github.com/xaionaro-go/streamctl/pkg/streamd/grpc/goconv" "github.com/xaionaro-go/streamctl/pkg/streamd/grpc/goconv"
"github.com/xaionaro-go/streamctl/pkg/streampanel/consts" "github.com/xaionaro-go/streamctl/pkg/streampanel/consts"
@@ -2422,7 +2423,7 @@ func (c *Client) GetLoggingLevel(
func (c *Client) AddTimer( func (c *Client) AddTimer(
ctx context.Context, ctx context.Context,
triggerAt time.Time, triggerAt time.Time,
action api.TimerAction, action api.Action,
) (api.TimerID, error) { ) (api.TimerID, error) {
actionGRPC, err := goconv.ActionGo2GRPC(action) actionGRPC, err := goconv.ActionGo2GRPC(action)
if err != nil { if err != nil {
@@ -2529,33 +2530,142 @@ func (c *Client) ListTimers(
return result, nil return result, nil
} }
func (c *Client) AddOBSSceneRule( func (c *Client) AddTriggerRule(
ctx context.Context, ctx context.Context,
sceneName obs.SceneName, triggerRule *api.TriggerRule,
sceneRule obs.SceneRule, ) (api.TriggerRuleID, error) {
triggerRuleGRPC, err := goconv.TriggerRuleGo2GRPC(triggerRule)
if err != nil {
return 0, fmt.Errorf("unable to convert the trigger rule %#+v: %w", triggerRule, err)
}
resp, err := withStreamDClient(ctx, c, func(
ctx context.Context,
client streamd_grpc.StreamDClient,
conn io.Closer,
) (*streamd_grpc.AddTriggerRuleReply, error) {
return callWrapper(
ctx,
c,
client.AddTriggerRule,
&streamd_grpc.AddTriggerRuleRequest{
Rule: triggerRuleGRPC,
},
)
})
if err != nil {
return 0, fmt.Errorf("unable to add the trigger rule %#+v: %w", triggerRule, err)
}
return api.TriggerRuleID(resp.GetRuleID()), nil
}
func (c *Client) UpdateTriggerRule(
ctx context.Context,
ruleID api.TriggerRuleID,
triggerRule *api.TriggerRule,
) error { ) error {
triggerRuleGRPC, err := goconv.TriggerRuleGo2GRPC(triggerRule)
if err != nil {
return fmt.Errorf("unable to convert the trigger rule %#+v: %w", ruleID, triggerRule, err)
} }
_, err = withStreamDClient(ctx, c, func(
func (c *Client) UpdateOBSSceneRule(
ctx context.Context, ctx context.Context,
sceneName obs.SceneName, client streamd_grpc.StreamDClient,
idx uint64, conn io.Closer,
sceneRule obs.SceneRule, ) (*streamd_grpc.UpdateTriggerRuleReply, error) {
return callWrapper(
ctx,
c,
client.UpdateTriggerRule,
&streamd_grpc.UpdateTriggerRuleRequest{
RuleID: uint64(ruleID),
Rule: triggerRuleGRPC,
},
)
})
if err != nil {
return fmt.Errorf("unable to update the trigger rule %d to %#+v: %w", ruleID, triggerRule, err)
}
return nil
}
func (c *Client) RemoveTriggerRule(
ctx context.Context,
ruleID api.TriggerRuleID,
) error { ) error {
_, err := withStreamDClient(ctx, c, func(
}
func (c *Client) RemoveOBSSceneRule(
ctx context.Context, ctx context.Context,
sceneName obs.SceneName, client streamd_grpc.StreamDClient,
idx uint64, conn io.Closer,
) (*streamd_grpc.RemoveTriggerRuleReply, error) {
return callWrapper(
ctx,
c,
client.RemoveTriggerRule,
&streamd_grpc.RemoveTriggerRuleRequest{
RuleID: uint64(ruleID),
},
)
})
if err != nil {
return fmt.Errorf("unable to remove the rule %d: %w", ruleID, err)
}
return nil
}
func (c *Client) ListTriggerRules(
ctx context.Context,
) (api.TriggerRules, error) {
response, err := withStreamDClient(ctx, c, func(
ctx context.Context,
client streamd_grpc.StreamDClient,
conn io.Closer,
) (*streamd_grpc.ListTriggerRulesReply, error) {
return callWrapper(
ctx,
c,
client.ListTriggerRules,
&streamd_grpc.ListTriggerRulesRequest{},
)
})
if err != nil {
return nil, fmt.Errorf("unable to list the rules: %w", err)
}
rules := response.GetRules()
result := make(api.TriggerRules, 0, len(rules))
for _, ruleGRPC := range rules {
rule, err := goconv.TriggerRuleGRPC2Go(ruleGRPC)
if err != nil {
return nil, fmt.Errorf("unable to convert the trigger rule %#+v: %w", rule, err)
}
result = append(result, rule)
}
return result, nil
}
func (c *Client) SubmitEvent(
ctx context.Context,
event event.Event,
) error { ) error {
eventGRPC, err := goconv.EventGo2GRPC(event)
if err != nil {
return fmt.Errorf("unable to convert the event: %w", err)
} }
_, err = withStreamDClient(ctx, c, func(
func (c *Client) ListOBSSceneRules(
ctx context.Context, ctx context.Context,
sceneName obs.SceneName, client streamd_grpc.StreamDClient,
) (obs.SceneRules, error) { conn io.Closer,
) (*streamd_grpc.SubmitEventReply, error) {
return callWrapper(
ctx,
c,
client.SubmitEvent,
&streamd_grpc.SubmitEventRequest{
Event: eventGRPC,
},
)
})
if err != nil {
return fmt.Errorf("unable to submit the event: %w", err)
}
return nil
} }

View File

@@ -0,0 +1,70 @@
package action
import (
"github.com/xaionaro-go/streamctl/pkg/expression"
"github.com/xaionaro-go/streamctl/pkg/serializable"
"github.com/xaionaro-go/streamctl/pkg/streamcontrol"
)
func init() {
serializable.RegisterType[Noop]()
serializable.RegisterType[OBSElementShowHide]()
serializable.RegisterType[OBSWindowCaptureSetSource]()
serializable.RegisterType[StartStream]()
serializable.RegisterType[EndStream]()
}
type Action interface {
isAction() // just to enable build-time type checks
}
type ValueExpression = expression.Expression
type OBSElementShowHide struct {
ElementName *string `yaml:"element_name,omitempty" json:"element_name,omitempty"`
ElementUUID *string `yaml:"element_uuid,omitempty" json:"element_uuid,omitempty"`
ValueExpression ValueExpression `yaml:"value_expression,omitempty" json:"value_expression,omitempty"`
}
var _ Action = (*OBSElementShowHide)(nil)
func (OBSElementShowHide) isAction() {}
type OBSWindowCaptureSetSource struct {
ElementName *string `yaml:"element_name,omitempty" json:"element_name,omitempty"`
ElementUUID *string `yaml:"element_uuid,omitempty" json:"element_uuid,omitempty"`
ValueExpression ValueExpression `yaml:"value_expression,omitempty" json:"value_expression,omitempty"`
}
var _ Action = (*OBSWindowCaptureSetSource)(nil)
func (OBSWindowCaptureSetSource) isAction() {}
type Noop struct{}
var _ Action = (*Noop)(nil)
func (*Noop) isAction() {}
type StartStream struct {
PlatID streamcontrol.PlatformName
Title string
Description string
Profile streamcontrol.AbstractStreamProfile
CustomArgs []any
//lint:ignore U1000 this field is used by reflection
uiDisable struct{} // currently out current reflect-y generator of fyne-Entry-ies does not support interfaces like field 'Profile' here, so we just forbid using this action.
}
var _ Action = (*StartStream)(nil)
func (*StartStream) isAction() {}
type EndStream struct {
PlatID streamcontrol.PlatformName
}
var _ Action = (*EndStream)(nil)
func (*EndStream) isAction() {}

View File

@@ -26,6 +26,7 @@ type config struct {
ProfileMetadata map[streamcontrol.ProfileName]ProfileMetadata ProfileMetadata map[streamcontrol.ProfileName]ProfileMetadata
StreamServer streamserver.Config `yaml:"stream_server"` StreamServer streamserver.Config `yaml:"stream_server"`
Monitor MonitorConfig Monitor MonitorConfig
TriggerRules TriggerRules `yaml:"trigger_rules"`
} }
type Config config type Config config

View File

@@ -1,16 +1,13 @@
package trigger package event
import "github.com/xaionaro-go/streamctl/pkg/serializable"
func init() { func init() {
//registry.RegisterType((*Not)(nil)) serializable.RegisterType[WindowFocusChange]()
registry.RegisterType((*WindowFocusChange)(nil))
} }
type Query interface { type Event interface {
isTriggerQuery() isEvent() // just to enable build-time type checks
}
type Not struct {
Query `yaml:"query"`
} }
type WindowFocusChange struct { type WindowFocusChange struct {
@@ -23,4 +20,4 @@ type WindowFocusChange struct {
uiComment struct{} `uicomment:"This action will also add field .IsFocused to the event."` uiComment struct{} `uicomment:"This action will also add field .IsFocused to the event."`
} }
func (WindowFocusChange) isTriggerQuery() {} func (WindowFocusChange) isEvent() {}

View File

@@ -0,0 +1,27 @@
package eventquery
import (
"github.com/xaionaro-go/streamctl/pkg/serializable"
"github.com/xaionaro-go/streamctl/pkg/streamd/config/event"
)
func init() {
serializable.RegisterType[EventType[event.WindowFocusChange]]()
}
type EventQuery interface {
Match(event.Event) bool
}
type Event serializable.Serializable[event.Event]
func (ev Event) Match(cmp event.Event) bool {
return ev.Value == cmp
}
type EventType[T event.Event] struct{}
func (EventType[T]) Match(ev event.Event) bool {
_, ok := ev.(T)
return ok
}

View File

@@ -0,0 +1,94 @@
package config
import (
"encoding/json"
"fmt"
"github.com/goccy/go-yaml"
"github.com/xaionaro-go/streamctl/pkg/serializable"
"github.com/xaionaro-go/streamctl/pkg/serializable/registry"
"github.com/xaionaro-go/streamctl/pkg/streamd/config/action"
"github.com/xaionaro-go/streamctl/pkg/streamd/config/event"
"github.com/xaionaro-go/streamctl/pkg/streamd/config/event/eventquery"
)
type TriggerRules []*TriggerRule
type Event = event.Event
type EventQuery = eventquery.EventQuery
type TriggerRule struct {
Description string `yaml:"description,omitempty" json:"description,omitempty"`
EventQuery eventquery.EventQuery `yaml:"trigger" json:"trigger"`
Action action.Action `yaml:"action" json:"action"`
}
var _ yaml.BytesMarshaler = (*TriggerRule)(nil)
var _ yaml.BytesUnmarshaler = (*TriggerRule)(nil)
func (tr *TriggerRule) UnmarshalYAML(b []byte) (_err error) {
if tr == nil {
return fmt.Errorf("nil TriggerRule")
}
intermediate := serializableTriggerRule{}
err := yaml.Unmarshal(b, &intermediate)
if err != nil {
return fmt.Errorf("unable to unmarshal the TriggerRule: %w", err)
}
*tr = TriggerRule{
Description: intermediate.Description,
EventQuery: intermediate.EventQuery.Value,
Action: intermediate.Action.Value,
}
return nil
}
func (tr TriggerRule) MarshalYAML() (b []byte, _err error) {
return yaml.Marshal(serializableTriggerRule{
Description: "",
EventQuery: serializable.Serializable[eventquery.EventQuery]{Value: tr.EventQuery},
Action: serializable.Serializable[action.Action]{Value: tr.Action},
})
}
type serializableTriggerRule struct {
Description string `yaml:"description,omitempty" json:"description,omitempty"`
EventQuery serializable.Serializable[eventquery.EventQuery] `yaml:"trigger" json:"trigger"`
Action serializable.Serializable[action.Action] `yaml:"action" json:"action"`
}
func (tr TriggerRule) String() string {
descr := tr.Description
if descr != "" {
descr += ": "
}
eventQueryJSON := string(tryJSON(tr.EventQuery))
if eventQueryJSON != "{}" {
eventQueryJSON = ":" + eventQueryJSON
} else {
eventQueryJSON = ""
}
actionJSON := string(tryJSON(tr.Action))
if actionJSON != "{}" {
actionJSON = ":" + actionJSON
} else {
actionJSON = ""
}
return fmt.Sprintf(
"%s%s%s -> %s%s",
descr,
typeName(tr.EventQuery), eventQueryJSON,
typeName(tr.Action), actionJSON,
)
}
func typeName(value any) string {
return registry.ToTypeName(value)
}
func tryJSON(value any) []byte {
b, _ := json.Marshal(value)
return b
}

82
pkg/streamd/events.go Normal file
View File

@@ -0,0 +1,82 @@
package streamd
import (
"context"
"encoding/json"
"fmt"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/xaionaro-go/streamctl/pkg/expression"
"github.com/xaionaro-go/streamctl/pkg/observability"
"github.com/xaionaro-go/streamctl/pkg/streamd/config/action"
"github.com/xaionaro-go/streamctl/pkg/streamd/config/event"
"github.com/xaionaro-go/streamctl/pkg/xsync"
)
func (d *StreamD) SubmitEvent(
ctx context.Context,
ev event.Event,
) error {
return xsync.DoA2R1(ctx, &d.ConfigLock, d.submitEvent, ctx, ev)
}
func objToMap(obj any) map[string]any {
b, err := json.Marshal(obj)
if err != nil {
panic(err)
}
m := map[string]any{}
err = json.Unmarshal(b, &m)
if err != nil {
panic(err)
}
return m
}
func (d *StreamD) submitEvent(
ctx context.Context,
ev event.Event,
) error {
exprCtx := objToMap(ev)
for _, rule := range d.Config.TriggerRules {
if rule.EventQuery.Match(ev) {
observability.Go(ctx, func() {
err := d.doAction(ctx, rule.Action, exprCtx)
if err != nil {
logger.Errorf(ctx, "unable to perform action %#+v: %w", rule.Action, err)
}
})
}
}
return nil
}
func (d *StreamD) doAction(
ctx context.Context,
a action.Action,
exprCtx any,
) error {
switch a := a.(type) {
case *action.Noop:
return nil
case *action.StartStream:
return d.StartStream(ctx, a.PlatID, a.Title, a.Description, a.Profile, a.CustomArgs...)
case *action.EndStream:
return d.EndStream(ctx, a.PlatID)
case *action.OBSElementShowHide:
value, err := expression.Eval[bool](a.ValueExpression, exprCtx)
if err != nil {
return fmt.Errorf("unable to Eval() the expression '%s': %w", a.ValueExpression, err)
}
return d.OBSElementSetShow(
ctx,
SceneElementIdentifier{
Name: a.ElementName,
UUID: a.ElementUUID,
},
value,
)
default:
return fmt.Errorf("unknown action type: %T", a)
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -90,6 +90,11 @@ type StreamDClient interface {
AddTimer(ctx context.Context, in *AddTimerRequest, opts ...grpc.CallOption) (*AddTimerReply, error) AddTimer(ctx context.Context, in *AddTimerRequest, opts ...grpc.CallOption) (*AddTimerReply, error)
RemoveTimer(ctx context.Context, in *RemoveTimerRequest, opts ...grpc.CallOption) (*RemoveTimerReply, error) RemoveTimer(ctx context.Context, in *RemoveTimerRequest, opts ...grpc.CallOption) (*RemoveTimerReply, error)
ListTimers(ctx context.Context, in *ListTimersRequest, opts ...grpc.CallOption) (*ListTimersReply, error) ListTimers(ctx context.Context, in *ListTimersRequest, opts ...grpc.CallOption) (*ListTimersReply, error)
ListTriggerRules(ctx context.Context, in *ListTriggerRulesRequest, opts ...grpc.CallOption) (*ListTriggerRulesReply, error)
AddTriggerRule(ctx context.Context, in *AddTriggerRuleRequest, opts ...grpc.CallOption) (*AddTriggerRuleReply, error)
RemoveTriggerRule(ctx context.Context, in *RemoveTriggerRuleRequest, opts ...grpc.CallOption) (*RemoveTriggerRuleReply, error)
UpdateTriggerRule(ctx context.Context, in *UpdateTriggerRuleRequest, opts ...grpc.CallOption) (*UpdateTriggerRuleReply, error)
SubmitEvent(ctx context.Context, in *SubmitEventRequest, opts ...grpc.CallOption) (*SubmitEventReply, error)
} }
type streamDClient struct { type streamDClient struct {
@@ -942,6 +947,51 @@ func (c *streamDClient) ListTimers(ctx context.Context, in *ListTimersRequest, o
return out, nil return out, nil
} }
func (c *streamDClient) ListTriggerRules(ctx context.Context, in *ListTriggerRulesRequest, opts ...grpc.CallOption) (*ListTriggerRulesReply, error) {
out := new(ListTriggerRulesReply)
err := c.cc.Invoke(ctx, "/streamd.StreamD/ListTriggerRules", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *streamDClient) AddTriggerRule(ctx context.Context, in *AddTriggerRuleRequest, opts ...grpc.CallOption) (*AddTriggerRuleReply, error) {
out := new(AddTriggerRuleReply)
err := c.cc.Invoke(ctx, "/streamd.StreamD/AddTriggerRule", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *streamDClient) RemoveTriggerRule(ctx context.Context, in *RemoveTriggerRuleRequest, opts ...grpc.CallOption) (*RemoveTriggerRuleReply, error) {
out := new(RemoveTriggerRuleReply)
err := c.cc.Invoke(ctx, "/streamd.StreamD/RemoveTriggerRule", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *streamDClient) UpdateTriggerRule(ctx context.Context, in *UpdateTriggerRuleRequest, opts ...grpc.CallOption) (*UpdateTriggerRuleReply, error) {
out := new(UpdateTriggerRuleReply)
err := c.cc.Invoke(ctx, "/streamd.StreamD/UpdateTriggerRule", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *streamDClient) SubmitEvent(ctx context.Context, in *SubmitEventRequest, opts ...grpc.CallOption) (*SubmitEventReply, error) {
out := new(SubmitEventReply)
err := c.cc.Invoke(ctx, "/streamd.StreamD/SubmitEvent", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// StreamDServer is the server API for StreamD service. // StreamDServer is the server API for StreamD service.
// All implementations must embed UnimplementedStreamDServer // All implementations must embed UnimplementedStreamDServer
// for forward compatibility // for forward compatibility
@@ -1014,6 +1064,11 @@ type StreamDServer interface {
AddTimer(context.Context, *AddTimerRequest) (*AddTimerReply, error) AddTimer(context.Context, *AddTimerRequest) (*AddTimerReply, error)
RemoveTimer(context.Context, *RemoveTimerRequest) (*RemoveTimerReply, error) RemoveTimer(context.Context, *RemoveTimerRequest) (*RemoveTimerReply, error)
ListTimers(context.Context, *ListTimersRequest) (*ListTimersReply, error) ListTimers(context.Context, *ListTimersRequest) (*ListTimersReply, error)
ListTriggerRules(context.Context, *ListTriggerRulesRequest) (*ListTriggerRulesReply, error)
AddTriggerRule(context.Context, *AddTriggerRuleRequest) (*AddTriggerRuleReply, error)
RemoveTriggerRule(context.Context, *RemoveTriggerRuleRequest) (*RemoveTriggerRuleReply, error)
UpdateTriggerRule(context.Context, *UpdateTriggerRuleRequest) (*UpdateTriggerRuleReply, error)
SubmitEvent(context.Context, *SubmitEventRequest) (*SubmitEventReply, error)
mustEmbedUnimplementedStreamDServer() mustEmbedUnimplementedStreamDServer()
} }
@@ -1225,6 +1280,21 @@ func (UnimplementedStreamDServer) RemoveTimer(context.Context, *RemoveTimerReque
func (UnimplementedStreamDServer) ListTimers(context.Context, *ListTimersRequest) (*ListTimersReply, error) { func (UnimplementedStreamDServer) ListTimers(context.Context, *ListTimersRequest) (*ListTimersReply, error) {
return nil, status.Errorf(codes.Unimplemented, "method ListTimers not implemented") return nil, status.Errorf(codes.Unimplemented, "method ListTimers not implemented")
} }
func (UnimplementedStreamDServer) ListTriggerRules(context.Context, *ListTriggerRulesRequest) (*ListTriggerRulesReply, error) {
return nil, status.Errorf(codes.Unimplemented, "method ListTriggerRules not implemented")
}
func (UnimplementedStreamDServer) AddTriggerRule(context.Context, *AddTriggerRuleRequest) (*AddTriggerRuleReply, error) {
return nil, status.Errorf(codes.Unimplemented, "method AddTriggerRule not implemented")
}
func (UnimplementedStreamDServer) RemoveTriggerRule(context.Context, *RemoveTriggerRuleRequest) (*RemoveTriggerRuleReply, error) {
return nil, status.Errorf(codes.Unimplemented, "method RemoveTriggerRule not implemented")
}
func (UnimplementedStreamDServer) UpdateTriggerRule(context.Context, *UpdateTriggerRuleRequest) (*UpdateTriggerRuleReply, error) {
return nil, status.Errorf(codes.Unimplemented, "method UpdateTriggerRule not implemented")
}
func (UnimplementedStreamDServer) SubmitEvent(context.Context, *SubmitEventRequest) (*SubmitEventReply, error) {
return nil, status.Errorf(codes.Unimplemented, "method SubmitEvent not implemented")
}
func (UnimplementedStreamDServer) mustEmbedUnimplementedStreamDServer() {} func (UnimplementedStreamDServer) mustEmbedUnimplementedStreamDServer() {}
// UnsafeStreamDServer may be embedded to opt out of forward compatibility for this service. // UnsafeStreamDServer may be embedded to opt out of forward compatibility for this service.
@@ -2492,6 +2562,96 @@ func _StreamD_ListTimers_Handler(srv interface{}, ctx context.Context, dec func(
return interceptor(ctx, in, info, handler) return interceptor(ctx, in, info, handler)
} }
func _StreamD_ListTriggerRules_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ListTriggerRulesRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(StreamDServer).ListTriggerRules(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/streamd.StreamD/ListTriggerRules",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(StreamDServer).ListTriggerRules(ctx, req.(*ListTriggerRulesRequest))
}
return interceptor(ctx, in, info, handler)
}
func _StreamD_AddTriggerRule_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(AddTriggerRuleRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(StreamDServer).AddTriggerRule(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/streamd.StreamD/AddTriggerRule",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(StreamDServer).AddTriggerRule(ctx, req.(*AddTriggerRuleRequest))
}
return interceptor(ctx, in, info, handler)
}
func _StreamD_RemoveTriggerRule_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(RemoveTriggerRuleRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(StreamDServer).RemoveTriggerRule(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/streamd.StreamD/RemoveTriggerRule",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(StreamDServer).RemoveTriggerRule(ctx, req.(*RemoveTriggerRuleRequest))
}
return interceptor(ctx, in, info, handler)
}
func _StreamD_UpdateTriggerRule_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(UpdateTriggerRuleRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(StreamDServer).UpdateTriggerRule(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/streamd.StreamD/UpdateTriggerRule",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(StreamDServer).UpdateTriggerRule(ctx, req.(*UpdateTriggerRuleRequest))
}
return interceptor(ctx, in, info, handler)
}
func _StreamD_SubmitEvent_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(SubmitEventRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(StreamDServer).SubmitEvent(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/streamd.StreamD/SubmitEvent",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(StreamDServer).SubmitEvent(ctx, req.(*SubmitEventRequest))
}
return interceptor(ctx, in, info, handler)
}
// StreamD_ServiceDesc is the grpc.ServiceDesc for StreamD service. // StreamD_ServiceDesc is the grpc.ServiceDesc for StreamD service.
// It's only intended for direct use with grpc.RegisterService, // It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy) // and not to be introspected or modified (even as a copy)
@@ -2731,6 +2891,26 @@ var StreamD_ServiceDesc = grpc.ServiceDesc{
MethodName: "ListTimers", MethodName: "ListTimers",
Handler: _StreamD_ListTimers_Handler, Handler: _StreamD_ListTimers_Handler,
}, },
{
MethodName: "ListTriggerRules",
Handler: _StreamD_ListTriggerRules_Handler,
},
{
MethodName: "AddTriggerRule",
Handler: _StreamD_AddTriggerRule_Handler,
},
{
MethodName: "RemoveTriggerRule",
Handler: _StreamD_RemoveTriggerRule_Handler,
},
{
MethodName: "UpdateTriggerRule",
Handler: _StreamD_UpdateTriggerRule_Handler,
},
{
MethodName: "SubmitEvent",
Handler: _StreamD_SubmitEvent_Handler,
},
}, },
Streams: []grpc.StreamDesc{ Streams: []grpc.StreamDesc{
{ {

View File

@@ -0,0 +1,113 @@
package goconv
import (
"fmt"
"github.com/xaionaro-go/streamctl/pkg/expression"
"github.com/xaionaro-go/streamctl/pkg/streamcontrol"
"github.com/xaionaro-go/streamctl/pkg/streamd/config/action"
"github.com/xaionaro-go/streamctl/pkg/streamd/grpc/go/streamd_grpc"
)
func ActionGRPC2Go(
actionGRPC *streamd_grpc.Action,
) (action.Action, error) {
var result action.Action
switch a := actionGRPC.ActionOneof.(type) {
case *streamd_grpc.Action_NoopRequest:
result = &action.Noop{}
case *streamd_grpc.Action_StartStreamRequest:
platID := streamcontrol.PlatformName(a.StartStreamRequest.PlatID)
profile, err := ProfileGRPC2Go(platID, a.StartStreamRequest.GetProfile())
if err != nil {
return nil, err
}
result = &action.StartStream{
PlatID: platID,
Title: a.StartStreamRequest.Title,
Description: a.StartStreamRequest.Description,
Profile: profile,
CustomArgs: nil,
}
case *streamd_grpc.Action_EndStreamRequest:
result = &action.EndStream{
PlatID: streamcontrol.PlatformName(a.EndStreamRequest.PlatID),
}
case *streamd_grpc.Action_ObsAction:
switch o := a.ObsAction.OBSActionOneOf.(type) {
case *streamd_grpc.OBSAction_ElementShowHide:
result = &action.OBSElementShowHide{
ElementName: o.ElementShowHide.ElementName,
ElementUUID: o.ElementShowHide.ElementUUID,
ValueExpression: expression.Expression(o.ElementShowHide.ValueExpression),
}
case *streamd_grpc.OBSAction_WindowCaptureSetSource:
result = &action.OBSWindowCaptureSetSource{
ElementName: o.WindowCaptureSetSource.ElementName,
ElementUUID: o.WindowCaptureSetSource.ElementUUID,
ValueExpression: expression.Expression(o.WindowCaptureSetSource.ValueExpression),
}
default:
return nil, fmt.Errorf("unexpected OBS action type: %T", o)
}
default:
return nil, fmt.Errorf("unexpected action type: %T", a)
}
return result, nil
}
func ActionGo2GRPC(
input action.Action,
) (*streamd_grpc.Action, error) {
result := streamd_grpc.Action{}
switch a := input.(type) {
case *action.Noop:
result.ActionOneof = &streamd_grpc.Action_NoopRequest{}
case *action.StartStream:
profileString, err := ProfileGo2GRPC(a.Profile)
if err != nil {
return nil, fmt.Errorf("unable to serialize the profile: %w", err)
}
result.ActionOneof = &streamd_grpc.Action_StartStreamRequest{
StartStreamRequest: &streamd_grpc.StartStreamRequest{
PlatID: string(a.PlatID),
Title: a.Title,
Description: a.Description,
Profile: profileString,
},
}
case *action.EndStream:
result.ActionOneof = &streamd_grpc.Action_EndStreamRequest{
EndStreamRequest: &streamd_grpc.EndStreamRequest{
PlatID: string(a.PlatID),
},
}
case *action.OBSElementShowHide:
result.ActionOneof = &streamd_grpc.Action_ObsAction{
ObsAction: &streamd_grpc.OBSAction{
OBSActionOneOf: &streamd_grpc.OBSAction_ElementShowHide{
ElementShowHide: &streamd_grpc.OBSActionElementShowHide{
ElementName: a.ElementName,
ElementUUID: a.ElementUUID,
ValueExpression: string(a.ValueExpression),
},
},
},
}
case *action.OBSWindowCaptureSetSource:
result.ActionOneof = &streamd_grpc.Action_ObsAction{
ObsAction: &streamd_grpc.OBSAction{
OBSActionOneOf: &streamd_grpc.OBSAction_WindowCaptureSetSource{
WindowCaptureSetSource: &streamd_grpc.OBSActionWindowCaptureSetSource{
ElementName: a.ElementName,
ElementUUID: a.ElementUUID,
ValueExpression: string(a.ValueExpression),
},
},
},
}
default:
return nil, fmt.Errorf("unknown action type: %T", a)
}
return &result, nil
}

View File

@@ -0,0 +1,51 @@
package goconv
import (
"fmt"
"github.com/xaionaro-go/streamctl/pkg/streamd/config"
"github.com/xaionaro-go/streamctl/pkg/streamd/config/event"
"github.com/xaionaro-go/streamctl/pkg/streamd/grpc/go/streamd_grpc"
)
func EventGo2GRPC(in event.Event) (*streamd_grpc.Event, error) {
switch q := in.(type) {
case *event.WindowFocusChange:
return &streamd_grpc.Event{
EventOneOf: &streamd_grpc.Event_WindowFocusChange{
WindowFocusChange: triggerGo2GRPCWindowFocusChange(q),
},
}, nil
default:
return nil, fmt.Errorf("conversion of type %T is not implemented, yet", q)
}
}
func triggerGo2GRPCWindowFocusChange(q *event.WindowFocusChange) *streamd_grpc.EventWindowFocusChange {
return &streamd_grpc.EventWindowFocusChange{
WindowID: q.WindowID,
WindowTitle: q.WindowTitle,
WindowTitlePartial: q.WindowTitlePartial,
UserID: q.UserID,
}
}
func EventGRPC2Go(in *streamd_grpc.Event) (config.Event, error) {
switch q := in.EventOneOf.(type) {
case *streamd_grpc.Event_WindowFocusChange:
return triggerGRPC2GoWindowFocusChange(q.WindowFocusChange), nil
default:
return nil, fmt.Errorf("conversion of type %T is not implemented, yet", q)
}
}
func triggerGRPC2GoWindowFocusChange(
q *streamd_grpc.EventWindowFocusChange,
) config.Event {
return &event.WindowFocusChange{
WindowID: q.WindowID,
WindowTitle: q.WindowTitle,
WindowTitlePartial: q.WindowTitlePartial,
UserID: q.UserID,
}
}

View File

@@ -0,0 +1,55 @@
package goconv
import (
"fmt"
"github.com/xaionaro-go/streamctl/pkg/streamd/config"
"github.com/xaionaro-go/streamctl/pkg/streamd/config/event"
"github.com/xaionaro-go/streamctl/pkg/streamd/config/event/eventquery"
"github.com/xaionaro-go/streamctl/pkg/streamd/grpc/go/streamd_grpc"
)
func EventQueryGo2GRPC(in eventquery.EventQuery) (*streamd_grpc.EventQuery, error) {
switch q := in.(type) {
case *eventquery.Event:
ev, err := EventGo2GRPC(q.Value)
if err != nil {
return nil, fmt.Errorf("unable to convert event: %w", err)
}
return &streamd_grpc.EventQuery{
EventQueryOneOf: &streamd_grpc.EventQuery_Event{
Event: ev,
},
}, nil
case *eventquery.EventType[event.WindowFocusChange]:
return &streamd_grpc.EventQuery{
EventQueryOneOf: &streamd_grpc.EventQuery_EventType{
EventType: streamd_grpc.EventType_eventWindowFocusChange,
},
}, nil
default:
return nil, fmt.Errorf("conversion of type %T is not implemented, yet", q)
}
}
func EventQueryGRPC2Go(in *streamd_grpc.EventQuery) (config.EventQuery, error) {
switch q := in.GetEventQueryOneOf().(type) {
case *streamd_grpc.EventQuery_Event:
ev, err := EventGRPC2Go(q.Event)
if err != nil {
return nil, fmt.Errorf("unable to convert event: %w", err)
}
return &eventquery.Event{
Value: ev,
}, nil
case *streamd_grpc.EventQuery_EventType:
switch q.EventType {
case streamd_grpc.EventType_eventWindowFocusChange:
return &eventquery.EventType[event.WindowFocusChange]{}, nil
default:
return nil, fmt.Errorf("unable to convert event type %v", q.EventType)
}
default:
return nil, fmt.Errorf("conversion of type %T is not implemented, yet", q)
}
}

View File

@@ -1,67 +0,0 @@
package goconv
import (
"fmt"
"github.com/xaionaro-go/streamctl/pkg/streamcontrol"
"github.com/xaionaro-go/streamctl/pkg/streamd/api"
"github.com/xaionaro-go/streamctl/pkg/streamd/grpc/go/streamd_grpc"
)
func ActionGRPC2Go(actionGRPC *streamd_grpc.Action) (api.TimerAction, error) {
var action api.TimerAction
switch actionRaw := actionGRPC.ActionOneof.(type) {
case *streamd_grpc.Action_NoopRequest:
action = &api.TimerActionNoop{}
case *streamd_grpc.Action_StartStreamRequest:
platID := streamcontrol.PlatformName(actionRaw.StartStreamRequest.PlatID)
profile, err := ProfileGRPC2Go(platID, actionRaw.StartStreamRequest.GetProfile())
if err != nil {
return nil, err
}
action = &api.TimerActionStartStream{
PlatID: platID,
Title: actionRaw.StartStreamRequest.Title,
Description: actionRaw.StartStreamRequest.Description,
Profile: profile,
CustomArgs: nil,
}
case *streamd_grpc.Action_EndStreamRequest:
action = &api.TimerActionEndStream{
PlatID: streamcontrol.PlatformName(actionRaw.EndStreamRequest.PlatID),
}
default:
return nil, fmt.Errorf("unexpected timer action: %T", actionRaw)
}
return action, nil
}
func ActionGo2GRPC(action api.TimerAction) (*streamd_grpc.Action, error) {
resultAction := streamd_grpc.Action{}
switch action := action.(type) {
case *api.TimerActionNoop:
resultAction.ActionOneof = &streamd_grpc.Action_NoopRequest{}
case *api.TimerActionStartStream:
profileString, err := ProfileGo2GRPC(action.Profile)
if err != nil {
return nil, fmt.Errorf("unable to serialize the profile: %w", err)
}
resultAction.ActionOneof = &streamd_grpc.Action_StartStreamRequest{
StartStreamRequest: &streamd_grpc.StartStreamRequest{
PlatID: string(action.PlatID),
Title: action.Title,
Description: action.Description,
Profile: profileString,
},
}
case *api.TimerActionEndStream:
resultAction.ActionOneof = &streamd_grpc.Action_EndStreamRequest{
EndStreamRequest: &streamd_grpc.EndStreamRequest{
PlatID: string(action.PlatID),
},
}
default:
return nil, fmt.Errorf("unknown action type: %T", action)
}
return &resultAction, nil
}

View File

@@ -0,0 +1,50 @@
package goconv
import (
"fmt"
"github.com/xaionaro-go/streamctl/pkg/streamd/api"
"github.com/xaionaro-go/streamctl/pkg/streamd/grpc/go/streamd_grpc"
)
func TriggerRuleGo2GRPC(
rule *api.TriggerRule,
) (*streamd_grpc.TriggerRule, error) {
resultEventQuery, err := EventQueryGo2GRPC(
rule.EventQuery,
)
if err != nil {
return nil, fmt.Errorf("unable to convert the trigger query: %w", err)
}
resultAction, err := ActionGo2GRPC(rule.Action)
if err != nil {
return nil, fmt.Errorf("unable to convert the action: %w", err)
}
return &streamd_grpc.TriggerRule{
Description: rule.Description,
EventQuery: resultEventQuery,
Action: resultAction,
}, nil
}
func TriggerRuleGRPC2Go(
rule *streamd_grpc.TriggerRule,
) (*api.TriggerRule, error) {
resultEventQuery, err := EventQueryGRPC2Go(rule.GetEventQuery())
if err != nil {
return nil, fmt.Errorf("unable to convert the trigger query: %w", err)
}
resultAction, err := ActionGRPC2Go(rule.GetAction())
if err != nil {
return nil, fmt.Errorf("unable to convert the action: %w", err)
}
return &api.TriggerRule{
Description: rule.GetDescription(),
EventQuery: resultEventQuery,
Action: resultAction,
}, nil
}

View File

@@ -79,6 +79,13 @@ service StreamD {
rpc AddTimer(AddTimerRequest) returns (AddTimerReply) {} rpc AddTimer(AddTimerRequest) returns (AddTimerReply) {}
rpc RemoveTimer(RemoveTimerRequest) returns (RemoveTimerReply) {} rpc RemoveTimer(RemoveTimerRequest) returns (RemoveTimerReply) {}
rpc ListTimers(ListTimersRequest) returns (ListTimersReply) {} rpc ListTimers(ListTimersRequest) returns (ListTimersReply) {}
rpc ListTriggerRules(ListTriggerRulesRequest) returns (ListTriggerRulesReply) {}
rpc AddTriggerRule(AddTriggerRuleRequest) returns (AddTriggerRuleReply) {}
rpc RemoveTriggerRule(RemoveTriggerRuleRequest) returns (RemoveTriggerRuleReply) {}
rpc UpdateTriggerRule(UpdateTriggerRuleRequest) returns (UpdateTriggerRuleReply) {}
rpc SubmitEvent(SubmitEventRequest) returns (SubmitEventReply) {}
} }
message PingRequest { message PingRequest {
@@ -534,11 +541,31 @@ message StreamPlayersChange {}
message NoopRequest {} message NoopRequest {}
message OBSActionElementShowHide {
optional string elementName = 1;
optional string elementUUID = 2;
string valueExpression = 3;
}
message OBSActionWindowCaptureSetSource {
optional string elementName = 1;
optional string elementUUID = 2;
string valueExpression = 3;
}
message OBSAction {
oneof OBSActionOneOf {
OBSActionElementShowHide elementShowHide = 1;
OBSActionWindowCaptureSetSource windowCaptureSetSource = 2;
}
}
message Action { message Action {
oneof ActionOneof { oneof ActionOneof {
NoopRequest noopRequest = 1; NoopRequest noopRequest = 1;
StartStreamRequest startStreamRequest = 2; StartStreamRequest startStreamRequest = 2;
EndStreamRequest endStreamRequest = 3; EndStreamRequest endStreamRequest = 3;
OBSAction obsAction = 4;
} }
} }
@@ -565,3 +592,79 @@ message ListTimersRequest {}
message ListTimersReply { message ListTimersReply {
repeated Timer timers = 1; repeated Timer timers = 1;
} }
message EventQueryAnd {
repeated Event queries = 1;
}
message EventQueryOr {
repeated Event queries = 1;
}
message EventQueryNot {
Event query = 1;
}
message EventOBSSceneChange {
string sceneName = 1;
}
message EventWindowFocusChange {
optional uint64 windowID = 1;
optional string windowTitle = 2;
optional string windowTitlePartial = 3;
optional uint64 userID = 4;
}
message EventQuery {
oneof EventQueryOneOf {
EventQueryAnd and = 1;
EventQueryOr or = 2;
EventQueryNot not = 3;
EventType eventType = 4;
Event event = 5;
}
}
enum EventType {
eventWindowFocusChange = 0;
eventOBSSceneChange = 1;
}
message Event {
oneof EventOneOf {
EventOBSSceneChange obsSceneChange = 1;
EventWindowFocusChange windowFocusChange = 2;
}
}
message TriggerRule {
string Description = 1;
EventQuery eventQuery = 2;
Action action = 3;
}
message ListTriggerRulesRequest {}
message ListTriggerRulesReply {
repeated TriggerRule rules = 1;
}
message AddTriggerRuleRequest {
TriggerRule rule = 1;
}
message AddTriggerRuleReply {
uint64 ruleID = 1;
}
message RemoveTriggerRuleRequest {
uint64 ruleID = 1;
}
message RemoveTriggerRuleReply {}
message UpdateTriggerRuleRequest {
uint64 ruleID = 1;
TriggerRule rule = 2;
}
message UpdateTriggerRuleReply {}
message SubmitEventRequest {
Event event = 1;
}
message SubmitEventReply {}

174
pkg/streamd/obs.go Normal file
View File

@@ -0,0 +1,174 @@
package streamd
import (
"bytes"
"context"
"fmt"
"time"
"github.com/andreykaipov/goobs"
"github.com/chai2010/webp"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/xaionaro-go/obs-grpc-proxy/pkg/obsgrpcproxy"
"github.com/xaionaro-go/obs-grpc-proxy/protobuf/go/obs_grpc"
"github.com/xaionaro-go/streamctl/pkg/observability"
"github.com/xaionaro-go/streamctl/pkg/streamcontrol/obs"
"github.com/xaionaro-go/streamctl/pkg/streamd/config"
"github.com/xaionaro-go/streamctl/pkg/streamd/consts"
"github.com/xaionaro-go/streamctl/pkg/streamtypes"
"github.com/xaionaro-go/streamctl/pkg/xsync"
)
func (d *StreamD) OBS(
ctx context.Context,
) (obs_grpc.OBSServer, context.CancelFunc, error) {
logger.Tracef(ctx, "OBS()")
defer logger.Tracef(ctx, "/OBS()")
proxy := obsgrpcproxy.New(
ctx,
func(ctx context.Context) (*goobs.Client, context.CancelFunc, error) {
logger.Tracef(ctx, "OBS proxy getting client")
defer logger.Tracef(ctx, "/OBS proxy getting client")
obs := xsync.RDoR1(ctx, &d.ControllersLocker, func() *obs.OBS {
return d.StreamControllers.OBS
})
if obs == nil {
return nil, nil, fmt.Errorf("connection to OBS is not initialized")
}
client, err := obs.GetClient()
logger.Tracef(ctx, "getting OBS client result: %v %v", client, err)
if err != nil {
return nil, nil, err
}
return client, func() {
err := client.Disconnect()
if err != nil {
logger.Errorf(ctx, "unable to disconnect from OBS: %w", err)
} else {
logger.Tracef(ctx, "disconnected from OBS")
}
}, nil
},
)
return proxy, func() {}, nil
}
func getOBSImageBytes(
ctx context.Context,
obsServer obs_grpc.OBSServer,
el config.MonitorElementConfig,
obsState *streamtypes.OBSState,
) ([]byte, time.Time, error) {
img, nextUpdateAt, err := el.Source.GetImage(ctx, obsServer, el, obsState)
if err != nil {
return nil, time.Now().
Add(time.Second),
fmt.Errorf(
"unable to get the image from the source: %w",
err,
)
}
for _, filter := range el.Filters {
img = filter.Filter(ctx, img)
}
var out bytes.Buffer
err = webp.Encode(&out, img, &webp.Options{
Lossless: el.ImageLossless,
Quality: float32(el.ImageQuality),
Exact: false,
})
if err != nil {
return nil, time.Now().Add(time.Second), fmt.Errorf("unable to encode the image: %w", err)
}
return out.Bytes(), nextUpdateAt, nil
}
func (d *StreamD) initImageTaker(ctx context.Context) error {
for elName, el := range d.Config.Monitor.Elements {
if el.Source == nil {
continue
}
if _, ok := el.Source.(*config.MonitorSourceDummy); ok {
continue
}
{
elName, el := elName, el
_ = el
observability.Go(ctx, func() {
logger.Debugf(ctx, "taker of image '%s'", elName)
defer logger.Debugf(ctx, "/taker of image '%s'", elName)
obsServer, obsServerClose, err := d.OBS(ctx)
if obsServerClose != nil {
defer obsServerClose()
}
if err != nil {
logger.Errorf(ctx, "unable to init connection with OBS: %w", err)
return
}
for {
var (
imgBytes []byte
nextUpdateAt time.Time
err error
)
waitUntilNextIteration := func() bool {
if nextUpdateAt.IsZero() {
return false
}
select {
case <-ctx.Done():
return false
case <-time.After(time.Until(nextUpdateAt)):
return true
}
}
imgBytes, nextUpdateAt, err = getOBSImageBytes(ctx, obsServer, el, &d.OBSState)
if err != nil {
logger.Errorf(ctx, "unable to get the image of '%s': %v", elName, err)
if !waitUntilNextIteration() {
return
}
continue
}
err = d.SetVariable(ctx, consts.VarKeyImage(consts.ImageID(elName)), imgBytes)
if err != nil {
logger.Errorf(ctx, "unable to save the image of '%s': %w", elName, err)
if !waitUntilNextIteration() {
return
}
continue
}
if !waitUntilNextIteration() {
return
}
}
})
}
}
return nil
}
type SceneElementIdentifier struct {
Name *string
UUID *string
}
func (d *StreamD) OBSElementSetShow(
ctx context.Context,
elID SceneElementIdentifier,
shouldShow bool,
) error {
return fmt.Errorf("not implemented, yet")
}

File diff suppressed because it is too large Load Diff

View File

@@ -1,7 +1,6 @@
package streamd package streamd
import ( import (
"bytes"
"context" "context"
"crypto" "crypto"
"fmt" "fmt"
@@ -11,15 +10,11 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/andreykaipov/goobs"
eventbus "github.com/asaskevich/EventBus" eventbus "github.com/asaskevich/EventBus"
"github.com/chai2010/webp"
"github.com/facebookincubator/go-belt" "github.com/facebookincubator/go-belt"
"github.com/facebookincubator/go-belt/tool/experimental/errmon" "github.com/facebookincubator/go-belt/tool/experimental/errmon"
"github.com/facebookincubator/go-belt/tool/logger" "github.com/facebookincubator/go-belt/tool/logger"
"github.com/hashicorp/go-multierror" "github.com/hashicorp/go-multierror"
"github.com/xaionaro-go/obs-grpc-proxy/pkg/obsgrpcproxy"
"github.com/xaionaro-go/obs-grpc-proxy/protobuf/go/obs_grpc"
"github.com/xaionaro-go/streamctl/pkg/observability" "github.com/xaionaro-go/streamctl/pkg/observability"
"github.com/xaionaro-go/streamctl/pkg/player" "github.com/xaionaro-go/streamctl/pkg/player"
"github.com/xaionaro-go/streamctl/pkg/repository" "github.com/xaionaro-go/streamctl/pkg/repository"
@@ -175,110 +170,6 @@ func (d *StreamD) Run(ctx context.Context) (_ret error) { // TODO: delete the fe
return nil return nil
} }
func getOBSImageBytes(
ctx context.Context,
obsServer obs_grpc.OBSServer,
el config.MonitorElementConfig,
obsState *streamtypes.OBSState,
) ([]byte, time.Time, error) {
img, nextUpdateAt, err := el.Source.GetImage(ctx, obsServer, el, obsState)
if err != nil {
return nil, time.Now().
Add(time.Second),
fmt.Errorf(
"unable to get the image from the source: %w",
err,
)
}
for _, filter := range el.Filters {
img = filter.Filter(ctx, img)
}
var out bytes.Buffer
err = webp.Encode(&out, img, &webp.Options{
Lossless: el.ImageLossless,
Quality: float32(el.ImageQuality),
Exact: false,
})
if err != nil {
return nil, time.Now().Add(time.Second), fmt.Errorf("unable to encode the image: %w", err)
}
return out.Bytes(), nextUpdateAt, nil
}
func (d *StreamD) initImageTaker(ctx context.Context) error {
for elName, el := range d.Config.Monitor.Elements {
if el.Source == nil {
continue
}
if _, ok := el.Source.(*config.MonitorSourceDummy); ok {
continue
}
{
elName, el := elName, el
_ = el
observability.Go(ctx, func() {
logger.Debugf(ctx, "taker of image '%s'", elName)
defer logger.Debugf(ctx, "/taker of image '%s'", elName)
obsServer, obsServerClose, err := d.OBS(ctx)
if obsServerClose != nil {
defer obsServerClose()
}
if err != nil {
logger.Errorf(ctx, "unable to init connection with OBS: %w", err)
return
}
for {
var (
imgBytes []byte
nextUpdateAt time.Time
err error
)
waitUntilNextIteration := func() bool {
if nextUpdateAt.IsZero() {
return false
}
select {
case <-ctx.Done():
return false
case <-time.After(time.Until(nextUpdateAt)):
return true
}
}
imgBytes, nextUpdateAt, err = getOBSImageBytes(ctx, obsServer, el, &d.OBSState)
if err != nil {
logger.Errorf(ctx, "unable to get the image of '%s': %v", elName, err)
if !waitUntilNextIteration() {
return
}
continue
}
err = d.SetVariable(ctx, consts.VarKeyImage(consts.ImageID(elName)), imgBytes)
if err != nil {
logger.Errorf(ctx, "unable to save the image of '%s': %w", elName, err)
if !waitUntilNextIteration() {
return
}
continue
}
if !waitUntilNextIteration() {
return
}
}
})
}
}
return nil
}
func (d *StreamD) InitStreamServer(ctx context.Context) (_err error) { func (d *StreamD) InitStreamServer(ctx context.Context) (_err error) {
logger.Debugf(ctx, "InitStreamServer") logger.Debugf(ctx, "InitStreamServer")
defer logger.Debugf(ctx, "/InitStreamServer: %v", _err) defer logger.Debugf(ctx, "/InitStreamServer: %v", _err)
@@ -973,43 +864,6 @@ func (d *StreamD) SetVariable(
return nil return nil
} }
func (d *StreamD) OBS(
ctx context.Context,
) (obs_grpc.OBSServer, context.CancelFunc, error) {
logger.Tracef(ctx, "OBS()")
defer logger.Tracef(ctx, "/OBS()")
proxy := obsgrpcproxy.New(
ctx,
func(ctx context.Context) (*goobs.Client, context.CancelFunc, error) {
logger.Tracef(ctx, "OBS proxy getting client")
defer logger.Tracef(ctx, "/OBS proxy getting client")
obs := xsync.RDoR1(ctx, &d.ControllersLocker, func() *obs.OBS {
return d.StreamControllers.OBS
})
if obs == nil {
return nil, nil, fmt.Errorf("connection to OBS is not initialized")
}
client, err := obs.GetClient()
logger.Tracef(ctx, "getting OBS client result: %v %v", client, err)
if err != nil {
return nil, nil, err
}
return client, func() {
err := client.Disconnect()
if err != nil {
logger.Errorf(ctx, "unable to disconnect from OBS: %w", err)
} else {
logger.Tracef(ctx, "disconnected from OBS")
}
}, nil
},
)
return proxy, func() {}, nil
}
func (d *StreamD) SubmitOAuthCode( func (d *StreamD) SubmitOAuthCode(
ctx context.Context, ctx context.Context,
req *streamd_grpc.SubmitOAuthCodeRequest, req *streamd_grpc.SubmitOAuthCodeRequest,
@@ -1816,7 +1670,7 @@ func (d *StreamD) GetLoggingLevel(ctx context.Context) (logger.Level, error) {
func (d *StreamD) AddTimer( func (d *StreamD) AddTimer(
ctx context.Context, ctx context.Context,
triggerAt time.Time, triggerAt time.Time,
action api.TimerAction, action api.Action,
) (api.TimerID, error) { ) (api.TimerID, error) {
return xsync.DoA3R2(ctx, &d.TimersLocker, d.addTimer, ctx, triggerAt, action) return xsync.DoA3R2(ctx, &d.TimersLocker, d.addTimer, ctx, triggerAt, action)
} }
@@ -1824,7 +1678,7 @@ func (d *StreamD) AddTimer(
func (d *StreamD) addTimer( func (d *StreamD) addTimer(
ctx context.Context, ctx context.Context,
triggerAt time.Time, triggerAt time.Time,
action api.TimerAction, action api.Action,
) (api.TimerID, error) { ) (api.TimerID, error) {
logger.Debugf(ctx, "addTimer(ctx, %v, %v)", triggerAt, action) logger.Debugf(ctx, "addTimer(ctx, %v, %v)", triggerAt, action)
defer logger.Debugf(ctx, "/addTimer(ctx, %v, %v)", triggerAt, action) defer logger.Debugf(ctx, "/addTimer(ctx, %v, %v)", triggerAt, action)

View File

@@ -21,7 +21,7 @@ func NewTimer(
streamD *StreamD, streamD *StreamD,
timerID api.TimerID, timerID api.TimerID,
triggerAt time.Time, triggerAt time.Time,
action api.TimerAction, action api.Action,
) *Timer { ) *Timer {
return &Timer{ return &Timer{
StreamD: streamD, StreamD: streamD,
@@ -108,29 +108,8 @@ func (t *Timer) trigger(ctx context.Context) {
} }
}) })
switch action := t.Timer.Action.(type) { err := t.StreamD.doAction(ctx, t.Timer.Action, nil)
case *api.TimerActionNoop:
return
case *api.TimerActionStartStream:
err := t.StreamD.StartStream(
ctx,
action.PlatID,
action.Title,
action.Description,
action.Profile,
)
if err != nil { if err != nil {
logger.Errorf(ctx, "unable to start stream by timer %d (%#+v): %v", t.Timer.ID, t.Timer, err) logger.Errorf(ctx, "unable to perform action %#+v: %w", t.Timer.Action, err)
}
case *api.TimerActionEndStream:
err := t.StreamD.EndStream(
ctx,
action.PlatID,
)
if err != nil {
logger.Errorf(ctx, "unable to end stream by timer %d (%#+v): %v", t.Timer.ID, t.Timer, err)
}
default:
logger.Error(ctx, "unknown action type: %t", action)
} }
} }

View File

@@ -0,0 +1,107 @@
package streamd
import (
"context"
"fmt"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/xaionaro-go/streamctl/pkg/streamd/api"
"github.com/xaionaro-go/streamctl/pkg/xsync"
)
func (d *StreamD) AddTriggerRule(
ctx context.Context,
triggerRule *api.TriggerRule,
) (api.TriggerRuleID, error) {
logger.Debugf(ctx, "AddTriggerRule(ctx, %#+v)", triggerRule)
defer logger.Debugf(ctx, "/AddTriggerRule(ctx, %#+v)", triggerRule)
ruleID, err := d.addTriggerRuleToConfig(ctx, triggerRule)
if err != nil {
return 0, fmt.Errorf("unable to add the trigger rule to config: %w", err)
}
return ruleID, nil
}
func (d *StreamD) addTriggerRuleToConfig(
ctx context.Context,
triggerRule *api.TriggerRule,
) (api.TriggerRuleID, error) {
return xsync.DoR2(ctx, &d.ConfigLock, func() (api.TriggerRuleID, error) {
ruleID := api.TriggerRuleID(len(d.Config.TriggerRules))
d.Config.TriggerRules = append(d.Config.TriggerRules, triggerRule)
if err := d.SaveConfig(ctx); err != nil {
return ruleID, fmt.Errorf("unable to save the config: %w", err)
}
return ruleID, nil
})
}
func (d *StreamD) UpdateTriggerRule(
ctx context.Context,
ruleID api.TriggerRuleID,
triggerRule *api.TriggerRule,
) error {
logger.Debugf(ctx, "UpdateTriggerRule(ctx, %v, %#+v)", ruleID, triggerRule)
defer logger.Debugf(ctx, "/UpdateTriggerRule(ctx, %v, %#+v)", ruleID, triggerRule)
if err := d.updateTriggerRuleInConfig(ctx, ruleID, triggerRule); err != nil {
return fmt.Errorf("unable to update the trigger rule %d in the config: %w", ruleID, err)
}
return nil
}
func (d *StreamD) updateTriggerRuleInConfig(
ctx context.Context,
ruleID api.TriggerRuleID,
triggerRule *api.TriggerRule,
) error {
return xsync.DoR1(ctx, &d.ConfigLock, func() error {
if ruleID >= api.TriggerRuleID(len(d.Config.TriggerRules)) {
return fmt.Errorf("rule %d does not exist", ruleID)
}
d.Config.TriggerRules[ruleID] = triggerRule
if err := d.SaveConfig(ctx); err != nil {
return fmt.Errorf("unable to save the config: %w", err)
}
return nil
})
}
func (d *StreamD) RemoveTriggerRule(
ctx context.Context,
ruleID api.TriggerRuleID,
) error {
logger.Debugf(ctx, "RemoveTriggerRule(ctx, %v)", ruleID)
defer logger.Debugf(ctx, "/RemoveTriggerRule(ctx, %v)", ruleID)
if err := d.removeTriggerRuleFromConfig(ctx, ruleID); err != nil {
return fmt.Errorf("unable to remove the trigger rule %d from the config: %w", ruleID, err)
}
return nil
}
func (d *StreamD) removeTriggerRuleFromConfig(
ctx context.Context,
ruleID api.TriggerRuleID,
) error {
return xsync.DoR1(ctx, &d.ConfigLock, func() error {
if ruleID >= api.TriggerRuleID(len(d.Config.TriggerRules)) {
return fmt.Errorf("rule %d does not exist", ruleID)
}
d.Config.TriggerRules = append(d.Config.TriggerRules[:ruleID], d.Config.TriggerRules[ruleID+1:]...)
if err := d.SaveConfig(ctx); err != nil {
return fmt.Errorf("unable to save the config: %w", err)
}
return nil
})
}
func (d *StreamD) ListTriggerRules(
ctx context.Context,
) (api.TriggerRules, error) {
logger.Debugf(ctx, "ListTriggerRules(ctx)")
defer logger.Debugf(ctx, "/ListTriggerRules(ctx)")
return xsync.DoR2(ctx, &d.ConfigLock, func() (api.TriggerRules, error) {
rules := make(api.TriggerRules, len(d.Config.TriggerRules))
copy(rules, d.Config.TriggerRules)
return rules, nil
})
}

View File

@@ -4,16 +4,6 @@ import (
"context" "context"
"fmt" "fmt"
"fyne.io/fyne/v2"
"fyne.io/fyne/v2/container"
"fyne.io/fyne/v2/dialog"
"fyne.io/fyne/v2/theme"
"fyne.io/fyne/v2/widget"
"github.com/xaionaro-go/streamctl/pkg/observability"
"github.com/xaionaro-go/streamctl/pkg/streamcontrol/obs"
"github.com/xaionaro-go/streamctl/pkg/streamcontrol/obs/types/action"
"github.com/xaionaro-go/streamctl/pkg/streamcontrol/obs/types/registry"
"github.com/xaionaro-go/streamctl/pkg/streamcontrol/obs/types/trigger"
"github.com/xaionaro-go/streamctl/pkg/streamd/config" "github.com/xaionaro-go/streamctl/pkg/streamd/config"
) )
@@ -29,159 +19,3 @@ func (p *Panel) setStreamDConfig(
} }
return nil return nil
} }
func (p *Panel) openSetupOBSSceneRulesWindow(
ctx context.Context,
sceneName obs.SceneName,
) {
w := p.app.NewWindow(AppName + ": Setup scene rules")
resizeWindow(w, fyne.NewSize(1000, 1000))
var refreshContent func()
refreshContent = func() {
sceneRules, err := p.StreamD.ListOBSSceneRules(ctx, sceneName)
if err != nil {
p.DisplayError(err)
return
}
var objs []fyne.CanvasObject
for idx, sceneRule := range sceneRules {
objs = append(objs, container.NewHBox(
widget.NewButtonWithIcon("", theme.SettingsIcon(), func() {
p.openAddOrEditSceneRuleWindow(
ctx,
"Edit scene rule",
sceneRule,
func(
ctx context.Context,
sceneRule obs.SceneRule,
) error {
p.StreamD.UpdateOBSSceneRule(ctx, sceneName, uint64(idx), sceneRule)
observability.Go(ctx, refreshContent)
return nil
},
)
}),
widget.NewButtonWithIcon("", theme.ContentRemoveIcon(), func() {
cw := dialog.NewConfirm(
"Delete scene rule",
"Are you sure you want to delete the stream rule?",
func(b bool) {
if !b {
return
}
p.StreamD.RemoveOBSSceneRule(ctx, sceneName, uint64(idx))
observability.Go(ctx, refreshContent)
},
p.mainWindow,
)
cw.Show()
}),
))
}
w.SetContent(container.NewBorder(
nil,
widget.NewButtonWithIcon("Add rule", theme.ContentAddIcon(), func() {
p.openAddOrEditSceneRuleWindow(
ctx,
"Add scene rule",
obs.SceneRule{},
func(
ctx context.Context,
sceneRule obs.SceneRule,
) error {
p.StreamD.AddOBSSceneRule(ctx, sceneName, sceneRule)
observability.Go(ctx, refreshContent)
return nil
},
)
}),
nil,
nil,
objs...,
))
}
refreshContent()
w.Show()
}
func (p *Panel) openAddOrEditSceneRuleWindow(
ctx context.Context,
title string,
sceneRule obs.SceneRule,
commitFn func(context.Context, obs.SceneRule) error,
) {
w := p.app.NewWindow(AppName + ": " + title)
resizeWindow(w, fyne.NewSize(1000, 1000))
triggerQueryTypeList := trigger.ListQueryTypeNames()
triggerQueryValues := map[string]trigger.Query{}
for _, typeName := range triggerQueryTypeList {
triggerQueryValues[typeName] = trigger.NewByTypeName(typeName)
}
if sceneRule.TriggerQuery == nil {
sceneRule.TriggerQuery = triggerQueryValues[triggerQueryTypeList[0]]
}
triggerQueryValues[registry.ToTypeName(sceneRule.TriggerQuery)] = sceneRule.TriggerQuery
actionTypeList := action.ListTypeNames()
actionValues := map[string]action.Action{}
for _, typeName := range actionTypeList {
actionValues[typeName] = action.NewByTypeName(typeName)
}
if sceneRule.Action == nil {
sceneRule.Action = actionValues[actionTypeList[0]]
}
actionValues[registry.ToTypeName(sceneRule.Action)] = sceneRule.Action
var refreshContent func()
refreshContent = func() {
triggerSelector := widget.NewSelect(triggerQueryTypeList, func(s string) {
if s == registry.ToTypeName(sceneRule.TriggerQuery) {
return
}
sceneRule.TriggerQuery = triggerQueryValues[s]
refreshContent()
})
triggerSelector.SetSelected(registry.ToTypeName(sceneRule.TriggerQuery))
triggerFields := makeFieldsFor(sceneRule.TriggerQuery)
actionSelector := widget.NewSelect(actionTypeList, func(s string) {
if s == registry.ToTypeName(sceneRule.Action) {
return
}
sceneRule.Action = actionValues[s]
refreshContent()
})
actionSelector.SetSelected(registry.ToTypeName(sceneRule.Action))
actionFields := makeFieldsFor(sceneRule.Action)
w.SetContent(container.NewBorder(
nil,
widget.NewButton("Save", func() {
err := commitFn(ctx, sceneRule)
if err != nil {
p.DisplayError(err)
return
}
w.Close()
}),
nil,
nil,
container.NewVBox(
widget.NewLabel("Trigger:"),
triggerSelector,
container.NewVBox(triggerFields...),
widget.NewLabel("Action:"),
actionSelector,
container.NewVBox(actionFields...),
),
))
}
refreshContent()
w.Show()
}

View File

@@ -1924,10 +1924,6 @@ func (p *Panel) initMainWindow(
streamInfoContainer, streamInfoContainer,
) )
setupSceneRulesButton := widget.NewButton("Setup scene rules", func() {
p.openSetupOBSSceneRulesWindow(ctx, obs.SceneName(p.obsSelectScene.Selected))
})
p.obsSelectScene = widget.NewSelect(nil, func(s string) { p.obsSelectScene = widget.NewSelect(nil, func(s string) {
logger.Debugf(ctx, "OBS scene is changed to '%s'", s) logger.Debugf(ctx, "OBS scene is changed to '%s'", s)
obsServer, obsServerClose, err := p.StreamD.OBS(ctx) obsServer, obsServerClose, err := p.StreamD.OBS(ctx)
@@ -1944,13 +1940,7 @@ func (p *Panel) initMainWindow(
if err != nil { if err != nil {
p.DisplayError(fmt.Errorf("unable to set the OBS scene: %w", err)) p.DisplayError(fmt.Errorf("unable to set the OBS scene: %w", err))
} }
setupSceneRulesButton.Enable()
}) })
if p.obsSelectScene.Selected == "" {
setupSceneRulesButton.Disable()
}
obsPage := container.NewBorder( obsPage := container.NewBorder(
nil, nil,
nil, nil,
@@ -1958,7 +1948,6 @@ func (p *Panel) initMainWindow(
nil, nil,
container.NewVBox( container.NewVBox(
container.NewHBox(widget.NewLabel("Scene:"), p.obsSelectScene), container.NewHBox(widget.NewLabel("Scene:"), p.obsSelectScene),
setupSceneRulesButton,
), ),
) )
@@ -2014,6 +2003,8 @@ func (p *Panel) initMainWindow(
)) ))
timersUI := NewTimersUI(ctx, p) timersUI := NewTimersUI(ctx, p)
triggersUI := NewTriggerRulesUI(ctx, p)
moreControlPage := container.NewBorder( moreControlPage := container.NewBorder(
nil, nil,
nil, nil,
@@ -2022,6 +2013,8 @@ func (p *Panel) initMainWindow(
container.NewVBox( container.NewVBox(
timersUI.CanvasObject, timersUI.CanvasObject,
widget.NewSeparator(), widget.NewSeparator(),
triggersUI.CanvasObject,
widget.NewSeparator(),
), ),
) )

View File

@@ -14,6 +14,25 @@ func makeFieldsFor(
return reflectMakeFieldsFor(reflect.ValueOf(obj), reflect.ValueOf(obj).Type(), nil, "") return reflectMakeFieldsFor(reflect.ValueOf(obj), reflect.ValueOf(obj).Type(), nil, "")
} }
func isUIDisabled(
obj any,
) bool {
return reflectIsUIDisabled(reflect.ValueOf(obj))
}
func reflectIsUIDisabled(
v reflect.Value,
) bool {
v = reflect.Indirect(v)
t := v.Type()
for i := 0; i < t.NumField(); i++ {
if t.Field(i).Name == "uiDisable" {
return true
}
}
return false
}
func reflectMakeFieldsFor( func reflectMakeFieldsFor(
v reflect.Value, v reflect.Value,
t reflect.Type, t reflect.Type,
@@ -53,6 +72,9 @@ func reflectMakeFieldsFor(
fv := v.Field(i) fv := v.Field(i)
ft := t.Field(i) ft := t.Field(i)
if ft.PkgPath != "" { if ft.PkgPath != "" {
if ft.Name == "uiDisable" {
return nil
}
tag := ft.Tag.Get("uicomment") tag := ft.Tag.Get("uicomment")
if tag != "" { if tag != "" {
result = append(result, widget.NewLabel(tag)) result = append(result, widget.NewLabel(tag))
@@ -67,7 +89,7 @@ func reflectMakeFieldsFor(
} }
return result return result
default: default:
panic(fmt.Errorf("internal error: support of %v is not implemented, yet", t.Kind())) panic(fmt.Errorf("internal error: %s: support of %v is not implemented, yet", namePrefix, t.Kind()))
} }
} }

View File

@@ -1,62 +1,11 @@
package streampanel package streampanel
import ( import (
"bytes"
"fmt"
"io"
"net/http"
"strings" "strings"
"text/template"
"github.com/xaionaro-go/streamctl/pkg/expression"
) )
var funcMap = map[string]interface{}{
"devnull": func(args ...any) string {
return ""
},
"httpGET": func(urlString string) string {
resp, err := http.Get(urlString)
if err != nil {
panic(err)
}
defer resp.Body.Close()
b, err := io.ReadAll(resp.Body)
if err != nil {
panic(err)
}
return string(b)
},
"httpGETIgnoreErrors": func(urlString string) string {
resp, err := http.Get(urlString)
if err != nil {
return ""
}
defer resp.Body.Close()
b, err := io.ReadAll(resp.Body)
if err != nil {
return ""
}
return string(b)
},
}
func expandTemplate(tpl string) (string, error) {
parsed, err := template.New("").Funcs(funcMap).Parse(tpl)
if err != nil {
return "", fmt.Errorf("unable to parse the template: %w", err)
}
var buf bytes.Buffer
if err = parsed.Execute(&buf, nil); err != nil {
return "", fmt.Errorf("unable to execute the template: %w", err)
}
return buf.String(), nil
}
func splitWithQuotes(s string) []string { func splitWithQuotes(s string) []string {
var result []string var result []string
var current string var current string
@@ -97,7 +46,7 @@ func splitWithQuotes(s string) []string {
} }
func expandCommand(cmdString string) ([]string, error) { func expandCommand(cmdString string) ([]string, error) {
cmdStringExpanded, err := expandTemplate(cmdString) cmdStringExpanded, err := expression.Eval[string](expression.Expression(cmdString), nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@@ -18,18 +18,12 @@ import (
obs "github.com/xaionaro-go/streamctl/pkg/streamcontrol/obs/types" obs "github.com/xaionaro-go/streamctl/pkg/streamcontrol/obs/types"
twitch "github.com/xaionaro-go/streamctl/pkg/streamcontrol/twitch/types" twitch "github.com/xaionaro-go/streamctl/pkg/streamcontrol/twitch/types"
youtube "github.com/xaionaro-go/streamctl/pkg/streamcontrol/youtube/types" youtube "github.com/xaionaro-go/streamctl/pkg/streamcontrol/youtube/types"
"github.com/xaionaro-go/streamctl/pkg/streamd/api" "github.com/xaionaro-go/streamctl/pkg/streamd/config/action"
"github.com/xaionaro-go/streamctl/pkg/xcontext" "github.com/xaionaro-go/streamctl/pkg/xcontext"
"github.com/xaionaro-go/streamctl/pkg/xfyne" "github.com/xaionaro-go/streamctl/pkg/xfyne"
"github.com/xaionaro-go/streamctl/pkg/xsync" "github.com/xaionaro-go/streamctl/pkg/xsync"
) )
var closedChan = make(chan struct{})
func init() {
close(closedChan)
}
type timersUI struct { type timersUI struct {
locker xsync.Mutex locker xsync.Mutex
CanvasObject fyne.CanvasObject CanvasObject fyne.CanvasObject
@@ -137,11 +131,11 @@ func (ui *timersUI) refreshFromRemote(
var triggerAt time.Time var triggerAt time.Time
for _, timer := range timers { for _, timer := range timers {
switch timer.Action.(type) { switch timer.Action.(type) {
case *api.TimerActionNoop: case *action.Noop:
continue continue
case *api.TimerActionStartStream: case *action.StartStream:
continue continue
case *api.TimerActionEndStream: case *action.EndStream:
triggerAt = timer.TriggerAt triggerAt = timer.TriggerAt
default: default:
continue continue
@@ -258,7 +252,7 @@ func (ui *timersUI) kickOffRemotely(
twitch.ID, twitch.ID,
obs.ID, obs.ID,
} { } {
_, err := streamD.AddTimer(ctx, deadline, &api.TimerActionEndStream{ _, err := streamD.AddTimer(ctx, deadline, &action.EndStream{
PlatID: platID, PlatID: platID,
}) })
if err != nil { if err != nil {

View File

@@ -0,0 +1,231 @@
package streampanel
import (
"context"
"fmt"
"fyne.io/fyne/v2"
"fyne.io/fyne/v2/container"
"fyne.io/fyne/v2/dialog"
"fyne.io/fyne/v2/theme"
"fyne.io/fyne/v2/widget"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/xaionaro-go/streamctl/pkg/observability"
"github.com/xaionaro-go/streamctl/pkg/serializable"
"github.com/xaionaro-go/streamctl/pkg/serializable/registry"
"github.com/xaionaro-go/streamctl/pkg/streamd/api"
"github.com/xaionaro-go/streamctl/pkg/streamd/config"
"github.com/xaionaro-go/streamctl/pkg/streamd/config/action"
"github.com/xaionaro-go/streamctl/pkg/streamd/config/event/eventquery"
)
type triggerRulesUI struct {
CanvasObject fyne.CanvasObject
panel *Panel
}
func NewTriggerRulesUI(
ctx context.Context,
panel *Panel,
) *triggerRulesUI {
ui := &triggerRulesUI{
panel: panel,
}
button := widget.NewButtonWithIcon(
"Setup trigger rules",
theme.SettingsIcon(), func() {
ui.openSetupWindow(ctx)
},
)
ui.CanvasObject = container.NewVBox(
button,
)
return ui
}
func (ui *triggerRulesUI) openSetupWindow(ctx context.Context) {
w := ui.panel.app.NewWindow(AppName + ": Setup trigger rules")
resizeWindow(w, fyne.NewSize(1000, 1000))
var refreshContent func() bool
refreshContent = func() bool {
triggerRules, err := ui.panel.StreamD.ListTriggerRules(ctx)
if err != nil {
ui.panel.DisplayError(err)
return false
}
var objs []fyne.CanvasObject
for idx, triggerRule := range triggerRules {
objs = append(objs, container.NewHBox(
widget.NewButtonWithIcon("", theme.SettingsIcon(), func() {
ui.openAddOrEditSceneRuleWindow(
ctx,
"Edit trigger rule",
*triggerRule,
func(
ctx context.Context,
triggerRule *config.TriggerRule,
) error {
err := ui.panel.StreamD.UpdateTriggerRule(ctx, api.TriggerRuleID(idx), triggerRule)
if err != nil {
return err
}
observability.Go(ctx, func() { refreshContent() })
return nil
},
)
}),
widget.NewButtonWithIcon("", theme.ContentRemoveIcon(), func() {
cw := dialog.NewConfirm(
"Delete scene rule",
"Are you sure you want to delete the stream rule?",
func(b bool) {
if !b {
return
}
err := ui.panel.StreamD.RemoveTriggerRule(ctx, api.TriggerRuleID(idx))
if err != nil {
ui.panel.DisplayError(err)
return
}
observability.Go(ctx, func() { refreshContent() })
},
ui.panel.mainWindow,
)
cw.Show()
}),
widget.NewLabel(fmt.Sprintf("%v", triggerRule)),
))
}
w.SetContent(container.NewBorder(
nil,
widget.NewButtonWithIcon("Add rule", theme.ContentAddIcon(), func() {
ui.openAddOrEditSceneRuleWindow(
ctx,
"Add scene rule",
config.TriggerRule{},
func(
ctx context.Context,
triggerRule *config.TriggerRule,
) error {
_, err := ui.panel.StreamD.AddTriggerRule(ctx, triggerRule)
if err != nil {
return err
}
observability.Go(ctx, func() { refreshContent() })
return nil
},
)
}),
nil,
nil,
container.NewVBox(
objs...,
),
))
return true
}
if !refreshContent() {
w.Close()
return
}
w.Show()
}
func (ui *triggerRulesUI) openAddOrEditSceneRuleWindow(
ctx context.Context,
title string,
triggerRule config.TriggerRule,
commitFn func(context.Context, *config.TriggerRule) error,
) {
w := ui.panel.app.NewWindow(AppName + ": " + title)
resizeWindow(w, fyne.NewSize(1000, 1000))
var triggerQueryTypeList []string
_triggerQueryTypeList := serializable.ListTypeNames[eventquery.EventQuery]()
triggerQueryValues := map[string]eventquery.EventQuery{}
for _, typeName := range _triggerQueryTypeList {
value, _ := serializable.NewByTypeName[eventquery.EventQuery](typeName)
if isUIDisabled(value) {
continue
}
triggerQueryValues[typeName] = value
triggerQueryTypeList = append(triggerQueryTypeList, typeName)
}
if triggerRule.EventQuery == nil {
triggerRule.EventQuery = triggerQueryValues[triggerQueryTypeList[0]]
}
triggerQueryValues[registry.ToTypeName(triggerRule.EventQuery)] = triggerRule.EventQuery
var actionTypeList []string
_actionTypeList := serializable.ListTypeNames[action.Action]()
actionValues := map[string]action.Action{}
for _, typeName := range _actionTypeList {
value, _ := serializable.NewByTypeName[action.Action](typeName)
if isUIDisabled(value) {
continue
}
actionValues[typeName] = value
actionTypeList = append(actionTypeList, typeName)
}
if triggerRule.Action == nil {
triggerRule.Action = actionValues[actionTypeList[0]]
}
actionValues[registry.ToTypeName(triggerRule.Action)] = triggerRule.Action
var refreshContent func()
refreshContent = func() {
triggerSelector := widget.NewSelect(triggerQueryTypeList, func(s string) {
if s == registry.ToTypeName(triggerRule.EventQuery) {
return
}
triggerRule.EventQuery = triggerQueryValues[s]
refreshContent()
})
curEventQueryType := registry.ToTypeName(triggerRule.EventQuery)
triggerSelector.SetSelected(curEventQueryType)
logger.Debugf(ctx, "trigger: selector %v: cur: %v (%T)", triggerQueryTypeList, curEventQueryType, triggerRule.EventQuery)
triggerFields := makeFieldsFor(triggerRule.EventQuery)
actionSelector := widget.NewSelect(actionTypeList, func(s string) {
if s == registry.ToTypeName(triggerRule.Action) {
return
}
triggerRule.Action = actionValues[s]
refreshContent()
})
curActionType := registry.ToTypeName(triggerRule.Action)
actionSelector.SetSelected(curActionType)
logger.Debugf(ctx, "action: selector %v: cur: %v (%T)", actionTypeList, curActionType, triggerRule.Action)
actionFields := makeFieldsFor(triggerRule.Action)
w.SetContent(container.NewBorder(
nil,
widget.NewButton("Save", func() {
err := commitFn(ctx, &triggerRule)
if err != nil {
ui.panel.DisplayError(err)
return
}
w.Close()
}),
nil,
nil,
container.NewVBox(
widget.NewLabel("Trigger:"),
triggerSelector,
container.NewVBox(triggerFields...),
widget.NewLabel("Action:"),
actionSelector,
container.NewVBox(actionFields...),
),
))
}
refreshContent()
w.Show()
}

View File

@@ -0,0 +1,27 @@
package windowmanagerhandler
import (
"context"
"fmt"
)
type WindowManagerHandler struct {
*PlatformSpecificWindowManagerHandler
}
func New() (*WindowManagerHandler, error) {
wmh := &WindowManagerHandler{}
if err := wmh.init(); err != nil {
return nil, fmt.Errorf("unable to initialize a window manager handler: %w", err)
}
return wmh, nil
}
func (wmh *WindowManagerHandler) WindowFocusChangeChan(ctx context.Context) <-chan WindowFocusChange {
return wmh.PlatformSpecificWindowManagerHandler.WindowFocusChangeChan(ctx)
}
type WindowFocusChange struct {
WindowID WindowID
WindowTitle string
}

View File

@@ -0,0 +1,27 @@
//go:build linux
// +build linux
package windowmanagerhandler
import (
"context"
"os"
)
type WindowID uint64
type XWMOrWaylandWM interface {
WindowFocusChangeChan(ctx context.Context) <-chan WindowFocusChange
}
type PlatformSpecificWindowManagerHandler struct {
XWMOrWaylandWM
}
func (wmh *WindowManagerHandler) init() error {
if os.Getenv("DISPLAY") != "" {
return wmh.initUsingXServer()
} else {
return wmh.initUsingWayland()
}
}

View File

@@ -0,0 +1,9 @@
package windowmanagerhandler
import (
"fmt"
)
func (wmh *WindowManagerHandler) initUsingWayland() error {
return fmt.Errorf("support of Wayland is not implemented, yet")
}

View File

@@ -0,0 +1,80 @@
//go:build linux
// +build linux
package windowmanagerhandler
import (
"context"
"fmt"
"os"
"time"
"github.com/BurntSushi/xgb/xproto"
"github.com/BurntSushi/xgbutil"
"github.com/BurntSushi/xgbutil/ewmh"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/xaionaro-go/streamctl/pkg/observability"
)
type XWindowManagerHandler struct {
*xgbutil.XUtil
}
func (wmh *WindowManagerHandler) initUsingXServer() error {
x, err := xgbutil.NewConn()
if err != nil {
return fmt.Errorf("unable to connect to X-server using DISPLAY '%s': %w", os.Getenv("DISPLAY"), err)
}
wmh.XWMOrWaylandWM = &XWindowManagerHandler{
XUtil: x,
}
return nil
}
func (wmh *XWindowManagerHandler) WindowFocusChangeChan(ctx context.Context) <-chan WindowFocusChange {
logger.Debugf(ctx, "WindowFocusChangeChan")
ch := make(chan WindowFocusChange)
observability.Go(ctx, func() {
defer logger.Debugf(ctx, "/WindowFocusChangeChan")
defer func() {
close(ch)
}()
prevClientID := xproto.Window(0)
t := time.NewTicker(200 * time.Millisecond)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
}
clientID, err := ewmh.ActiveWindowGet(wmh.XUtil)
if err != nil {
logger.Errorf(ctx, "unable to get active window: %w", err)
continue
}
if clientID == prevClientID {
continue
}
prevClientID = clientID
name, err := ewmh.WmNameGet(wmh.XUtil, clientID)
if err != nil {
logger.Errorf(ctx, "unable to get the name of the active window (%d): %w", clientID, err)
continue
}
ch <- WindowFocusChange{
WindowID: WindowID(clientID),
WindowTitle: name,
}
}
})
return ch
}

View File

@@ -0,0 +1,16 @@
//go:build !linux
// +build !linux
package windowmanagerhandler
import (
"context"
"fmt"
)
type PlatformSpecificWindowManagerHandler struct{}
type WindowID struct{}
func (wmh *WindowManagerHandler) init(context.Context) error {
return fmt.Errorf("the support of window manager handler for this platform is not implemented, yet")
}