Start implementing OBS restarter

This commit is contained in:
Dmitrii Okunev
2024-12-07 16:20:22 +00:00
parent 428de13870
commit 07540eea2f
10 changed files with 240 additions and 4 deletions

View File

@@ -1,4 +1,4 @@
package streampanel
package command
import (
"context"
@@ -22,7 +22,7 @@ func init() {
)
}
func expandCommand(
func Expand(
ctx context.Context,
cmdString string,
context any,

View File

@@ -1,6 +1,9 @@
package kick
import (
"context"
"github.com/xaionaro-go/streamctl/pkg/streamcontrol"
streamctl "github.com/xaionaro-go/streamctl/pkg/streamcontrol"
kick "github.com/xaionaro-go/streamctl/pkg/streamcontrol/kick/types"
)
@@ -19,3 +22,10 @@ func init() {
func InitConfig(cfg streamctl.Config) {
kick.InitConfig(cfg)
}
func GetConfig(
ctx context.Context,
cfg streamcontrol.Config,
) *Config {
return streamcontrol.GetPlatformConfig[PlatformSpecificConfig, StreamProfile](ctx, cfg, ID)
}

View File

@@ -1,6 +1,9 @@
package obs
import (
"context"
"github.com/xaionaro-go/streamctl/pkg/streamcontrol"
streamctl "github.com/xaionaro-go/streamctl/pkg/streamcontrol"
"github.com/xaionaro-go/streamctl/pkg/streamcontrol/obs/types"
)
@@ -18,3 +21,10 @@ func init() {
func InitConfig(cfg streamctl.Config) {
types.InitConfig(cfg)
}
func GetConfig(
ctx context.Context,
cfg streamcontrol.Config,
) *Config {
return streamcontrol.GetPlatformConfig[PlatformSpecificConfig, StreamProfile](ctx, cfg, ID)
}

View File

@@ -17,6 +17,10 @@ type PlatformSpecificConfig struct {
Name string `yaml:"name" json:"name"`
Duration time.Duration `yaml:"duration" json:"duration"`
} `yaml:"scene_after_stream" json:"scene_after_stream"`
RestartOnUnavailable struct {
Enable bool `yaml:"bool" json:"bool"`
ExecCommand string `yaml:"exec_command" json:"exec_command"`
} `yaml:"restart_on_unavailable" json:"restart_on_unavailable"`
}
type Config = streamctl.PlatformConfig[PlatformSpecificConfig, StreamProfile]

View File

@@ -1,6 +1,9 @@
package twitch
import (
"context"
"github.com/xaionaro-go/streamctl/pkg/streamcontrol"
streamctl "github.com/xaionaro-go/streamctl/pkg/streamcontrol"
twitch "github.com/xaionaro-go/streamctl/pkg/streamcontrol/twitch/types"
)
@@ -19,3 +22,10 @@ func init() {
func InitConfig(cfg streamctl.Config) {
twitch.InitConfig(cfg)
}
func GetConfig(
ctx context.Context,
cfg streamcontrol.Config,
) *Config {
return streamcontrol.GetPlatformConfig[PlatformSpecificConfig, StreamProfile](ctx, cfg, ID)
}

View File

@@ -1,6 +1,9 @@
package youtube
import (
"context"
"github.com/xaionaro-go/streamctl/pkg/streamcontrol"
streamctl "github.com/xaionaro-go/streamctl/pkg/streamcontrol"
youtube "github.com/xaionaro-go/streamctl/pkg/streamcontrol/youtube/types"
)
@@ -20,6 +23,13 @@ func InitConfig(cfg streamctl.Config) {
youtube.InitConfig(cfg)
}
func GetConfig(
ctx context.Context,
cfg streamcontrol.Config,
) *Config {
return streamcontrol.GetPlatformConfig[PlatformSpecificConfig, StreamProfile](ctx, cfg, ID)
}
type TemplateTags = youtube.TemplateTags
const (

View File

@@ -0,0 +1,131 @@
package streamd
import (
"context"
"fmt"
"time"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/xaionaro-go/obs-grpc-proxy/protobuf/go/obs_grpc"
"github.com/xaionaro-go/streamctl/pkg/command"
"github.com/xaionaro-go/streamctl/pkg/observability"
"github.com/xaionaro-go/streamctl/pkg/streamcontrol/obs"
"github.com/xaionaro-go/streamctl/pkg/xsync"
)
type obsRestarter struct {
locker xsync.Mutex
streamD *StreamD
cancelFunc context.CancelFunc
}
func newOBSRestarter(d *StreamD) *obsRestarter {
return &obsRestarter{streamD: d}
}
func (d *StreamD) initOBSRestarter(
ctx context.Context,
) error {
d.obsRestarter = newOBSRestarter(d)
d.obsRestarter.updateConfig(ctx)
return nil
}
func (d *StreamD) updateOBSRestarterConfig(
ctx context.Context,
) error {
return d.obsRestarter.updateConfig(ctx)
}
func (r *obsRestarter) updateConfig(
ctx context.Context,
) error {
return xsync.DoA1R1(ctx, &r.locker, r.updateConfigNoLock, ctx)
}
func (r *obsRestarter) updateConfigNoLock(
ctx context.Context,
) error {
if r.cancelFunc != nil {
r.cancelFunc()
r.cancelFunc = nil
}
d := r.streamD
cfg, err := d.GetConfig(ctx)
if err != nil {
return fmt.Errorf("unable to get config: %w", err)
}
obsCfg := obs.GetConfig(ctx, cfg.Backends)
if obsCfg == nil {
return fmt.Errorf("OBS config is not set")
}
if !obsCfg.Config.RestartOnUnavailable.Enable {
return nil
}
execCmd, err := command.Expand(ctx, obsCfg.Config.RestartOnUnavailable.ExecCommand, nil)
if err != nil {
return fmt.Errorf("unable to expand the command '%s': %w", obsCfg.Config.RestartOnUnavailable.ExecCommand, err)
}
ctx, cancelFn := context.WithCancel(ctx)
r.cancelFunc = cancelFn
observability.Go(ctx, func() {
d.obsRestarter.loop(ctx, execCmd)
})
return nil
}
func (r *obsRestarter) loop(
ctx context.Context,
execCmd []string,
) {
logger.Debugf(ctx, "OBS-restarter: loop: %#+v", execCmd)
defer logger.Debugf(ctx, "/OBS-restarter: loop: %#+v", execCmd)
t := time.NewTicker(time.Second)
defer t.Stop()
for {
select {
case <-t.C:
r.checkOBSAndRestartIfNeeded(
ctx,
execCmd,
)
case <-ctx.Done():
return
}
}
}
func (r *obsRestarter) checkOBSAndRestartIfNeeded(
ctx context.Context,
execCmd []string,
) {
obsServer, closeFn, err := r.streamD.OBS(ctx)
if closeFn != nil {
defer closeFn()
}
if err != nil {
logger.Error(ctx, "unable to connect to OBS server: %v", err)
r.restartOBS(ctx, execCmd)
return
}
_, err = obsServer.GetStats(ctx, &obs_grpc.GetStatsRequest{})
if err != nil {
logger.Error(ctx, "unable to get stats from the OBS server: %v", err)
r.restartOBS(ctx, execCmd)
return
}
}
func (r *obsRestarter) restartOBS(
ctx context.Context,
execCmd []string,
) {
logger.Errorf(ctx, "TODO: implement me, I was supposed to restart OBS here with command %v", execCmd)
}

View File

@@ -104,6 +104,8 @@ type StreamD struct {
imageTakerLocker xsync.Mutex
imageTakerCancel context.CancelFunc
imageTakerWG sync.WaitGroup
obsRestarter *obsRestarter
}
type imageHash uint64
@@ -195,6 +197,11 @@ func (d *StreamD) Run(ctx context.Context) (_ret error) { // TODO: delete the fe
d.UI.DisplayError(fmt.Errorf("unable to initialize the P2P network: %w", err))
}
d.UI.SetStatus("OBS restarter...")
if err := d.initOBSRestarter(ctx); err != nil {
d.UI.DisplayError(fmt.Errorf("unable to initialize the OBS restarter: %w", err))
}
d.UI.SetStatus("Initializing UI...")
return nil
}
@@ -465,9 +472,40 @@ func (d *StreamD) setConfig(ctx context.Context, cfg *config.Config) (_ret error
return fmt.Errorf("unable to convert the config: %w", err)
}
d.Config = *cfg
if err := d.onUpdateConfig(ctx); err != nil {
logger.Errorf(ctx, "onUpdateConfig: %v", err)
}
return nil
}
func (d *StreamD) onUpdateConfig(
ctx context.Context,
) error {
var wg sync.WaitGroup
errCh := make(chan error, 1)
wg.Add(1)
observability.Go(ctx, func() {
defer wg.Done()
errCh <- d.updateOBSRestarterConfig(ctx)
})
observability.Go(ctx, func() {
wg.Wait()
close(errCh)
})
var mErr *multierror.Error
for err := range errCh {
if err == nil {
continue
}
mErr = multierror.Append(mErr, err)
}
return mErr.ErrorOrNil()
}
func (d *StreamD) IsBackendEnabled(
ctx context.Context,
id streamcontrol.PlatformName,

View File

@@ -28,6 +28,7 @@ import (
"github.com/xaionaro-go/obs-grpc-proxy/protobuf/go/obs_grpc"
"github.com/xaionaro-go/streamctl/pkg/autoupdater"
"github.com/xaionaro-go/streamctl/pkg/buildvars"
"github.com/xaionaro-go/streamctl/pkg/command"
gconsts "github.com/xaionaro-go/streamctl/pkg/consts"
"github.com/xaionaro-go/streamctl/pkg/oauthhandler"
"github.com/xaionaro-go/streamctl/pkg/observability"
@@ -1739,7 +1740,7 @@ func (p *Panel) execCommand(
cmdString string,
execContext any,
) {
cmdExpanded, err := expandCommand(ctx, cmdString, execContext)
cmdExpanded, err := command.Expand(ctx, cmdString, execContext)
if err != nil {
p.DisplayError(err)
}

View File

@@ -286,14 +286,36 @@ func (p *Panel) openSettingsWindowNoLock(
}
obsCfg.Config.SceneAfterStream.Duration = time.Duration(float64(time.Second) * v)
}
obsExecCommand := widget.NewEntry()
obsExecCommand.SetPlaceHolder("command to exec OBS")
obsExecCommand.SetText(obsCfg.Config.RestartOnUnavailable.ExecCommand)
if !obsCfg.Config.RestartOnUnavailable.Enable {
obsExecCommand.Hide()
}
obsExecCommand.OnChanged = func(s string) {
obsCfg.Config.RestartOnUnavailable.ExecCommand = s
}
autoRestartEnable := widget.NewCheck("Auto-restart (if OBS is hanging or not started)", func(b bool) {
obsCfg.Config.RestartOnUnavailable.Enable = b
if obsCfg.Config.RestartOnUnavailable.Enable {
obsExecCommand.Show()
} else {
obsExecCommand.Hide()
}
})
autoRestartEnable.SetChecked(obsCfg.Config.RestartOnUnavailable.Enable)
obsSettings.Add(container.NewVBox(
widget.NewRichTextFromMarkdown(`# OBS`),
widget.NewLabel("Switch to scene after streaming:"),
sceneAfterStreamingSelector,
widget.NewLabel("Hold the scene for (seconds):"),
sceneAfterStreamingDuration,
autoRestartEnable,
obsExecCommand,
))
}
}