Initial commit, pt. 67

This commit is contained in:
Dmitrii Okunev
2024-08-04 16:03:30 +01:00
parent 40adda392b
commit 58bfcb04cf
22 changed files with 253 additions and 139 deletions

View File

@@ -63,6 +63,10 @@ func runSubprocess(
} }
procName := ProcessName(parts[0]) procName := ProcessName(parts[0])
addr := parts[1] addr := parts[1]
ctx = belt.WithField(ctx, "process", procName)
childProcessSignalHandler(ctx)
switch procName { switch procName {
case ProcessNameStreamd: case ProcessNameStreamd:
forkStreamd(ctx, addr, os.Getenv(EnvPassword)) forkStreamd(ctx, addr, os.Getenv(EnvPassword))
@@ -73,10 +77,11 @@ func runSubprocess(
func runSplitProcesses( func runSplitProcesses(
ctx context.Context, ctx context.Context,
cancelFunc context.CancelFunc,
flags Flags, flags Flags,
) { ) {
ctx = belt.WithField(ctx, "process", "main") ctx = belt.WithField(ctx, "process", ProcessNameMain)
signalsChan := signalHandler(ctx) signalsChan := mainProcessSignalHandler(ctx, cancelFunc)
procList := []ProcessName{ procList := []ProcessName{
ProcessNameUI, ProcessNameUI,
@@ -126,20 +131,22 @@ func runSplitProcesses(
logger.Fatalf(ctx, "failed to start process manager: %v", err) logger.Fatalf(ctx, "failed to start process manager: %v", err)
} }
defer m.Close() defer m.Close()
go m.Serve(ctx, func(ctx context.Context, source ProcessName, content any) error { observability.Go(ctx, func() {
switch content.(type) { m.Serve(ctx, func(ctx context.Context, source ProcessName, content any) error {
case GetFlags: switch content.(type) {
msg := GetFlagsResult{ case GetFlags:
Flags: flags, msg := GetFlagsResult{
Flags: flags,
}
err := m.SendMessagePreReady(ctx, source, msg)
if err != nil {
logger.Errorf(ctx, "failed to send message %#+v to '%s': %v", msg, source, err)
}
case MessageQuit:
signalsChan <- os.Interrupt
} }
err := m.SendMessagePreReady(ctx, source, msg) return nil
if err != nil { })
logger.Errorf(ctx, "failed to send message %#+v to '%s': %v", msg, source, err)
}
case MessageQuit:
signalsChan <- os.Interrupt
}
return nil
}) })
observability.Go(ctx, func() { observability.Go(ctx, func() {
select { select {
@@ -200,10 +207,10 @@ func runFork(
func fakeFork(ctx context.Context, procName ProcessName, addr, password string) error { func fakeFork(ctx context.Context, procName ProcessName, addr, password string) error {
switch procName { switch procName {
case ProcessNameStreamd: case ProcessNameStreamd:
go forkUI(ctx, addr, password) observability.Go(ctx, func() { forkUI(ctx, addr, password) })
return nil return nil
case ProcessNameUI: case ProcessNameUI:
go forkStreamd(ctx, addr, password) observability.Go(ctx, func() { forkStreamd(ctx, addr, password) })
return nil return nil
} }
return fmt.Errorf("unexpected process name: %s", procName) return fmt.Errorf("unexpected process name: %s", procName)

View File

@@ -29,7 +29,6 @@ func main() {
flags := parseFlags() flags := parseFlags()
ctx := getContext(flags) ctx := getContext(flags)
defer belt.Flush(ctx)
{ {
// rerunning flag parsing just for logs of parsing the flags (after initializing the logger in `getContext` above) // rerunning flag parsing just for logs of parsing the flags (after initializing the logger in `getContext` above)
for _, platformGetFlagsFunc := range platformGetFlagsFuncs { for _, platformGetFlagsFunc := range platformGetFlagsFuncs {
@@ -37,16 +36,17 @@ func main() {
} }
logger.Debugf(ctx, "flags == %#+v", flags) logger.Debugf(ctx, "flags == %#+v", flags)
} }
cancelFunc := initRuntime(ctx, flags, ProcessNameMain) ctx, cancelFunc := initRuntime(ctx, flags, ProcessNameMain)
defer cancelFunc() defer cancelFunc()
if flags.Subprocess != "" { if flags.Subprocess != "" {
runSubprocess(ctx, flags.Subprocess) runSubprocess(ctx, flags.Subprocess)
return return
} }
ctx = belt.WithField(ctx, "process", ProcessNameMain)
if flags.SplitProcess && flags.RemoteAddr == "" { if flags.SplitProcess && flags.RemoteAddr == "" {
runSplitProcesses(ctx, flags) runSplitProcesses(ctx, cancelFunc, flags)
return return
} }
@@ -124,6 +124,11 @@ func runPanel(
return nil return nil
}, },
) )
select {
case <-ctx.Done():
return
default:
}
logger.Fatalf(ctx, "communication (with the main process) error: %v", err) logger.Fatalf(ctx, "communication (with the main process) error: %v", err)
}) })
} }

View File

@@ -10,11 +10,16 @@ import (
"runtime/pprof" "runtime/pprof"
"time" "time"
"github.com/facebookincubator/go-belt"
"github.com/facebookincubator/go-belt/tool/logger" "github.com/facebookincubator/go-belt/tool/logger"
"github.com/xaionaro-go/streamctl/pkg/observability" "github.com/xaionaro-go/streamctl/pkg/observability"
) )
func initRuntime(ctx context.Context, flags Flags, _procName ProcessName) context.CancelFunc { func initRuntime(
ctx context.Context,
flags Flags,
_procName ProcessName,
) (context.Context, context.CancelFunc) {
procName := string(_procName) procName := string(_procName)
var closeFuncs []func() var closeFuncs []func()
@@ -90,7 +95,11 @@ func initRuntime(ctx context.Context, flags Flags, _procName ProcessName) contex
runtime.GOMAXPROCS(16) runtime.GOMAXPROCS(16)
} }
return func() { defer belt.Flush(ctx)
ctx, cancelFn := context.WithCancel(ctx)
return ctx, func() {
cancelFn()
for i := len(closeFuncs) - 1; i >= 0; i-- { for i := len(closeFuncs) - 1; i >= 0; i-- {
closeFuncs[i]() closeFuncs[i]()
} }

View File

@@ -4,28 +4,74 @@ import (
"context" "context"
"os" "os"
"os/signal" "os/signal"
"sync"
"time"
"github.com/facebookincubator/go-belt"
"github.com/facebookincubator/go-belt/tool/logger" "github.com/facebookincubator/go-belt/tool/logger"
"github.com/xaionaro-go/streamctl/pkg/observability" "github.com/xaionaro-go/streamctl/pkg/observability"
) )
func signalHandler( func mainProcessSignalHandler(
ctx context.Context,
cancelFn context.CancelFunc,
) chan<- os.Signal {
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
observability.Go(ctx, func() {
for range c {
cancelFn()
forkLocker.Lock()
var wg sync.WaitGroup
for name, f := range forkMap {
wg.Add(1)
{
name, f := name, f
observability.Go(ctx, func() {
defer wg.Done()
logger.Debugf(ctx, "interrupting '%s'", name)
err := f.Process.Signal(os.Interrupt)
if err != nil {
logger.Errorf(ctx, "unable to send Interrupt to '%s': %v", name, err)
logger.Debugf(ctx, "killing '%s'", name)
f.Process.Kill()
return
}
observability.Go(ctx, func() {
time.Sleep(5 * time.Second)
logger.Debugf(ctx, "killing '%s'", name)
err := f.Process.Kill()
if err != nil {
logger.Errorf(ctx, "unable to kill '%s': %v", name, err)
}
})
err = f.Wait()
if err != nil {
logger.Errorf(ctx, "unable to wait for '%s': %v", name, err)
}
})
}
}
wg.Wait()
forkLocker.Unlock()
belt.Flush(ctx)
os.Exit(0)
}
})
return c
}
func childProcessSignalHandler(
ctx context.Context, ctx context.Context,
) chan<- os.Signal { ) chan<- os.Signal {
c := make(chan os.Signal, 1) c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt) signal.Notify(c, os.Interrupt)
observability.Go(ctx, func() { observability.Go(ctx, func() {
for range c { for range c {
forkLocker.Lock() logger.Infof(ctx, "received an interruption signal")
for name, f := range forkMap { belt.Flush(ctx)
logger.Debugf(ctx, "killing '%s'", name) os.Exit(0)
err := f.Process.Kill()
if err != nil {
logger.Errorf(ctx, "unable to kill '%s': %v", name, err)
}
}
forkLocker.Unlock()
os.Exit(1)
} }
}) })
return c return c

View File

@@ -35,7 +35,6 @@ func init() {
func forkStreamd(ctx context.Context, mainProcessAddr, password string) { func forkStreamd(ctx context.Context, mainProcessAddr, password string) {
procName := ProcessNameStreamd procName := ProcessNameStreamd
ctx = belt.WithField(ctx, "process", procName)
mainProcess, err := mainprocess.NewClient( mainProcess, err := mainprocess.NewClient(
procName, procName,
@@ -47,10 +46,8 @@ func forkStreamd(ctx context.Context, mainProcessAddr, password string) {
} }
flags := getFlags(ctx, mainProcess) flags := getFlags(ctx, mainProcess)
ctx = getContext(flags) ctx = getContext(flags)
ctx = belt.WithField(ctx, "process", procName)
defer belt.Flush(ctx)
logger.Debugf(ctx, "flags == %#+v", flags) logger.Debugf(ctx, "flags == %#+v", flags)
cancelFunc := initRuntime(ctx, flags, procName) ctx, cancelFunc := initRuntime(ctx, flags, procName)
defer cancelFunc() defer cancelFunc()
runStreamd(ctx, flags, mainProcess) runStreamd(ctx, flags, mainProcess)
@@ -169,6 +166,11 @@ func runStreamd(
} }
}, },
) )
select {
case <-ctx.Done():
return
default:
}
logger.Fatalf(ctx, "communication (with the main process) error: %v", err) logger.Fatalf(ctx, "communication (with the main process) error: %v", err)
}) })
} }

View File

@@ -11,7 +11,6 @@ import (
func forkUI(ctx context.Context, mainProcessAddr, password string) { func forkUI(ctx context.Context, mainProcessAddr, password string) {
procName := ProcessNameUI procName := ProcessNameUI
ctx = belt.WithField(ctx, "process", procName)
mainProcess, err := mainprocess.NewClient( mainProcess, err := mainprocess.NewClient(
procName, procName,
@@ -23,10 +22,8 @@ func forkUI(ctx context.Context, mainProcessAddr, password string) {
} }
flags := getFlags(ctx, mainProcess) flags := getFlags(ctx, mainProcess)
ctx = getContext(flags) ctx = getContext(flags)
ctx = belt.WithField(ctx, "process", procName)
defer belt.Flush(ctx)
logger.Debugf(ctx, "flags == %#+v", flags) logger.Debugf(ctx, "flags == %#+v", flags)
cancelFunc := initRuntime(ctx, flags, procName) ctx, cancelFunc := initRuntime(ctx, flags, procName)
defer cancelFunc() defer cancelFunc()
streamdAddr := getStreamDAddress(ctx, mainProcess) streamdAddr := getStreamDAddress(ctx, mainProcess)

View File

@@ -124,7 +124,7 @@ func (m *Manager) Serve(
<-ctx.Done() <-ctx.Done()
err := m.Close() err := m.Close()
if err != nil { if err != nil {
logger.Error(ctx, err) logger.Debug(ctx, err)
} }
}) })
@@ -238,6 +238,12 @@ func (m *Manager) handleConnection(
decoder := gob.NewDecoder(conn) decoder := gob.NewDecoder(conn)
err := decoder.Decode(&message) err := decoder.Decode(&message)
logger.Tracef(ctx, "getting a message from '%s': %#+v %#+v", regMessage.Source, message, err) logger.Tracef(ctx, "getting a message from '%s': %#+v %#+v", regMessage.Source, message, err)
select {
case <-ctx.Done():
logger.Tracef(ctx, "context was closed")
return
default:
}
if err != nil { if err != nil {
err = fmt.Errorf( err = fmt.Errorf(
"unable to parse the message from %s (%s): %w", "unable to parse the message from %s (%s): %w",
@@ -287,10 +293,13 @@ func (m *Manager) processMessage(
continue continue
} }
wg.Add(1) wg.Add(1)
go func(dst ProcessName) { {
defer wg.Done() dst := dst
errCh <- m.sendMessage(ctx, source, dst, message.Content) observability.Go(ctx, func() {
}(dst) defer wg.Done()
errCh <- m.sendMessage(ctx, source, dst, message.Content)
})
}
} }
wg.Wait() wg.Wait()
close(errCh) close(errCh)

View File

@@ -56,7 +56,7 @@ func Test(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
defer m.Close() defer m.Close()
go m.Serve( go m.Serve(
belt.WithField(ctx, "process", "main"), belt.WithField(ctx, "process", ProcessNameMain),
func(ctx context.Context, source ProcessName, content any) error { func(ctx context.Context, source ProcessName, content any) error {
handleCall("main", content) handleCall("main", content)
return nil return nil

View File

@@ -83,7 +83,9 @@ func NewCodeReceiver(
close(codeCh) close(codeCh)
}) })
go srv.Serve(listener) observability.Go(ctx, func() {
srv.Serve(listener)
})
return codeCh, uint16(listener.Addr().(*net.TCPAddr).Port), nil return codeCh, uint16(listener.Addr().(*net.TCPAddr).Port), nil
} }

View File

@@ -2,6 +2,7 @@ package observability
import ( import (
"bytes" "bytes"
"context"
"fmt" "fmt"
"runtime" "runtime"
@@ -68,7 +69,7 @@ func NewErrorMonitorLoggerHook(
ErrorMonitor: errorMonitor, ErrorMonitor: errorMonitor,
SendChan: make(chan ErrorMonitorMessage, 10), SendChan: make(chan ErrorMonitorMessage, 10),
} }
go result.senderLoop() GoSafe(context.TODO(), result.senderLoop)
return result return result
} }

View File

@@ -2,6 +2,8 @@ package observability
import ( import (
"context" "context"
"github.com/facebookincubator/go-belt/tool/experimental/errmon"
) )
func Go(ctx context.Context, fn func()) { func Go(ctx context.Context, fn func()) {
@@ -10,3 +12,10 @@ func Go(ctx context.Context, fn func()) {
fn() fn()
}() }()
} }
func GoSafe(ctx context.Context, fn func()) {
go func() {
defer func() { errmon.ObserveRecoverCtx(ctx, recover()) }()
fn()
}()
}

View File

@@ -229,19 +229,22 @@ func (s StreamControllers) ApplyProfiles(
errCh := make(chan error) errCh := make(chan error)
for _, p := range profiles { for _, p := range profiles {
wg.Add(1) wg.Add(1)
go func(p AbstractStreamProfile) { {
defer wg.Done() p := p
profileType := reflect.TypeOf(p) observability.Go(ctx, func() {
c, ok := m[profileType] defer wg.Done()
if !ok { profileType := reflect.TypeOf(p)
errCh <- ErrNoStreamControllerForProfile{StreamProfile: p} c, ok := m[profileType]
return if !ok {
} errCh <- ErrNoStreamControllerForProfile{StreamProfile: p}
if err := c.ApplyProfile(ctx, p); err != nil { return
errCh <- fmt.Errorf("StreamController %T return error: %w", c.GetImplementation(), err) }
return if err := c.ApplyProfile(ctx, p); err != nil {
} errCh <- fmt.Errorf("StreamController %T return error: %w", c.GetImplementation(), err)
}(p) return
}
})
}
} }
observability.Go(ctx, func() { observability.Go(ctx, func() {
wg.Wait() wg.Wait()
@@ -345,12 +348,15 @@ func (s StreamControllers) concurrently(
errCh := make(chan error) errCh := make(chan error)
for _, c := range s { for _, c := range s {
wg.Add(1) wg.Add(1)
go func(c AbstractStreamController) { {
defer wg.Done() c := c
if err := callback(c); err != nil { observability.Go(ctx, func() {
errCh <- err defer wg.Done()
} if err := callback(c); err != nil {
}(c) errCh <- err
}
})
}
} }
observability.Go(ctx, func() { observability.Go(ctx, func() {
wg.Wait() wg.Wait()

View File

@@ -377,35 +377,38 @@ func (t *Twitch) getNewToken(
} }
logger.Tracef(ctx, "starting the oauth handler at port %d", listenPort) logger.Tracef(ctx, "starting the oauth handler at port %d", listenPort)
wg.Add(1) wg.Add(1)
go func(listenPort uint16) { {
defer wg.Done() listenPort := listenPort
authURL := GetAuthorizationURL( observability.Go(ctx, func() {
&helix.AuthorizationURLParams{ defer wg.Done()
ResponseType: "code", // or "token" authURL := GetAuthorizationURL(
Scopes: []string{"channel:manage:broadcast"}, &helix.AuthorizationURLParams{
}, ResponseType: "code", // or "token"
t.config.Config.ClientID, Scopes: []string{"channel:manage:broadcast"},
fmt.Sprintf("127.0.0.1:%d", listenPort), },
) t.config.Config.ClientID,
fmt.Sprintf("127.0.0.1:%d", listenPort),
)
arg := oauthhandler.OAuthHandlerArgument{ arg := oauthhandler.OAuthHandlerArgument{
AuthURL: authURL, AuthURL: authURL,
ExchangeFn: func(code string) error { ExchangeFn: func(code string) error {
t.config.Config.ClientCode = code t.config.Config.ClientCode = code
err := t.saveCfgFn(t.config) err := t.saveCfgFn(t.config)
errmon.ObserveErrorCtx(ctx, err) errmon.ObserveErrorCtx(ctx, err)
return nil return nil
}, },
} }
err := oauthHandler(ctx, arg) err := oauthHandler(ctx, arg)
if err != nil { if err != nil {
errCh <- fmt.Errorf("unable to get or exchange the oauth code to a token: %w", err) errCh <- fmt.Errorf("unable to get or exchange the oauth code to a token: %w", err)
return return
} }
cancelFunc() cancelFunc()
success = true success = true
}(listenPort) })
}
} }
for _, listenPort := range getPortsFn() { for _, listenPort := range getPortsFn() {

View File

@@ -234,34 +234,37 @@ func getToken(ctx context.Context, cfg Config) (*oauth2.Token, error) {
oauthCfg := getAuthCfgBase(cfg) oauthCfg := getAuthCfgBase(cfg)
oauthCfg.RedirectURL = fmt.Sprintf("http://127.0.0.1:%d", listenPort) oauthCfg.RedirectURL = fmt.Sprintf("http://127.0.0.1:%d", listenPort)
wg.Add(1) wg.Add(1)
go func(oauthCfg *oauth2.Config) { {
defer wg.Done() oauthCfg := oauthCfg
oauthHandlerArg := oauthhandler.OAuthHandlerArgument{ observability.Go(ctx, func() {
AuthURL: oauthCfg.AuthCodeURL("state-token", oauth2.AccessTypeOffline), defer wg.Done()
ListenPort: listenPort, oauthHandlerArg := oauthhandler.OAuthHandlerArgument{
ExchangeFn: func(code string) error { AuthURL: oauthCfg.AuthCodeURL("state-token", oauth2.AccessTypeOffline),
_tok, err := oauthCfg.Exchange(ctx, code) ListenPort: listenPort,
if err != nil { ExchangeFn: func(code string) error {
return fmt.Errorf("unable to get a token: %w", err) _tok, err := oauthCfg.Exchange(ctx, code)
} if err != nil {
tok = _tok return fmt.Errorf("unable to get a token: %w", err)
cancelFn() }
return nil tok = _tok
}, cancelFn()
} return nil
},
}
oauthHandler := cfg.Config.CustomOAuthHandler oauthHandler := cfg.Config.CustomOAuthHandler
if oauthHandler == nil { if oauthHandler == nil {
oauthHandler = oauthhandler.OAuth2HandlerViaCLI oauthHandler = oauthhandler.OAuth2HandlerViaCLI
} }
logger.Tracef(ctx, "calling oauthHandler for %d", listenPort) logger.Tracef(ctx, "calling oauthHandler for %d", listenPort)
err := oauthHandler(ctx, oauthHandlerArg) err := oauthHandler(ctx, oauthHandlerArg)
logger.Tracef(ctx, "called oauthHandler for %d: %v", listenPort, err) logger.Tracef(ctx, "called oauthHandler for %d: %v", listenPort, err)
if err != nil { if err != nil {
errCh <- err errCh <- err
return return
} }
}(oauthCfg) })
}
} }
for _, listenPort := range cfg.Config.GetOAuthListenPorts() { for _, listenPort := range cfg.Config.GetOAuthListenPorts() {

View File

@@ -14,6 +14,7 @@ import (
"github.com/chai2010/webp" "github.com/chai2010/webp"
"github.com/facebookincubator/go-belt/tool/logger" "github.com/facebookincubator/go-belt/tool/logger"
"github.com/nfnt/resize" "github.com/nfnt/resize"
"github.com/xaionaro-go/streamctl/pkg/observability"
"github.com/xaionaro-go/streamctl/pkg/screenshot" "github.com/xaionaro-go/streamctl/pkg/screenshot"
"github.com/xaionaro-go/streamctl/pkg/screenshoter" "github.com/xaionaro-go/streamctl/pkg/screenshoter"
"github.com/xaionaro-go/streamctl/pkg/streampanel/consts" "github.com/xaionaro-go/streamctl/pkg/streampanel/consts"
@@ -258,10 +259,12 @@ func (p *Panel) reinitScreenshoter(ctx context.Context) {
ctx, cancelFunc := context.WithCancel(ctx) ctx, cancelFunc := context.WithCancel(ctx)
p.screenshoterClose = cancelFunc p.screenshoterClose = cancelFunc
go p.Screenshoter.Loop( observability.Go(ctx, func() {
ctx, p.Screenshoter.Loop(
200*time.Millisecond, ctx,
p.Config.Screenshot.Config, 200*time.Millisecond,
func(ctx context.Context, img *image.RGBA) { p.setScreenshot(ctx, img) }, p.Config.Screenshot.Config,
) func(ctx context.Context, img *image.RGBA) { p.setScreenshot(ctx, img) },
)
})
} }

View File

@@ -27,7 +27,7 @@ func (p *Panel) startMonitorPage(
logger.Debugf(ctx, "startMonitorPage") logger.Debugf(ctx, "startMonitorPage")
defer logger.Debugf(ctx, "/startMonitorPage") defer logger.Debugf(ctx, "/startMonitorPage")
go func(ctx context.Context) { observability.Go(ctx, func() {
p.updateMonitorPageImages(ctx) p.updateMonitorPageImages(ctx)
p.updateMonitorPageStreamStatus(ctx) p.updateMonitorPageStreamStatus(ctx)
@@ -56,7 +56,7 @@ func (p *Panel) startMonitorPage(
p.updateMonitorPageStreamStatus(ctx) p.updateMonitorPageStreamStatus(ctx)
} }
}) })
}(ctx) })
} }
func (p *Panel) updateMonitorPageImages( func (p *Panel) updateMonitorPageImages(

View File

@@ -1047,7 +1047,7 @@ func (p *Panel) streamServersUpdater(
p.streamServersLocker.Lock() p.streamServersLocker.Lock()
defer p.streamServersLocker.Unlock() defer p.streamServersLocker.Unlock()
p.streamServersUpdaterCanceller = nil p.streamServersUpdaterCanceller = nil
go updateData() observability.Go(ctx, updateData)
}() }()
logger.Debugf(ctx, "streamServersUpdater") logger.Debugf(ctx, "streamServersUpdater")
@@ -1084,7 +1084,7 @@ func (p *Panel) startStreamPlayersUpdater(
p.streamPlayersLocker.Lock() p.streamPlayersLocker.Lock()
defer p.streamPlayersLocker.Unlock() defer p.streamPlayersLocker.Unlock()
p.streamPlayersUpdaterCanceller = nil p.streamPlayersUpdaterCanceller = nil
go updateData() observability.Go(ctx, updateData)
}() }()
logger.Debugf(ctx, "streamPlayersUpdater") logger.Debugf(ctx, "streamPlayersUpdater")
@@ -1121,7 +1121,7 @@ func (p *Panel) startStreamForwardersUpdater(
p.streamForwardersLocker.Lock() p.streamForwardersLocker.Lock()
defer p.streamForwardersLocker.Unlock() defer p.streamForwardersLocker.Unlock()
p.streamForwardersUpdaterCanceller = nil p.streamForwardersUpdaterCanceller = nil
go updateData() observability.Go(ctx, updateData)
}() }()
logger.Debugf(ctx, "streamForwardersUpdater") logger.Debugf(ctx, "streamForwardersUpdater")

View File

@@ -5,6 +5,7 @@ import (
"time" "time"
"fyne.io/fyne/v2/widget" "fyne.io/fyne/v2/widget"
"github.com/xaionaro-go/streamctl/pkg/observability"
) )
type updateTimerHandler struct { type updateTimerHandler struct {
@@ -23,7 +24,7 @@ func newUpdateTimerHandler(startStopButton *widget.Button, startedAt time.Time)
startTS: startedAt, startTS: startedAt,
} }
h.startStopButton.Text = "..." h.startStopButton.Text = "..."
go h.loop() observability.Go(ctx, h.loop)
return h return h
} }

View File

@@ -177,7 +177,7 @@ func (p *StreamPlayer) start(ctx context.Context) error {
} }
logger.Tracef(ctx, "the player #%+v opened the stream", player) logger.Tracef(ctx, "the player #%+v opened the stream", player)
go p.controllerLoop(ctx) observability.Go(ctx, func() { p.controllerLoop(ctx) })
return nil return nil
} }

View File

@@ -94,7 +94,7 @@ func New(
if cfg.Username != "" && !conn.RemoteAddr().(*net.TCPAddr).IP.IsLoopback() { if cfg.Username != "" && !conn.RemoteAddr().(*net.TCPAddr).IP.IsLoopback() {
c.Auth(cfg.Username, cfg.Password) c.Auth(cfg.Username, cfg.Password)
} }
go s.tcpHandler(c) observability.Go(ctx, func() { s.tcpHandler(c) })
} }
}) })

View File

@@ -1,6 +1,7 @@
package streams package streams
import ( import (
"context"
"encoding/json" "encoding/json"
"errors" "errors"
"strings" "strings"
@@ -9,6 +10,7 @@ import (
"github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/core"
"github.com/facebookincubator/go-belt/tool/logger" "github.com/facebookincubator/go-belt/tool/logger"
"github.com/xaionaro-go/streamctl/pkg/observability"
) )
type state byte type state byte
@@ -154,7 +156,10 @@ func (p *Producer) start() {
p.state = stateStart p.state = stateStart
p.workerID++ p.workerID++
go p.worker(p.conn, p.workerID) {
conn, workerID := p.conn, p.workerID
observability.Go(context.TODO(), func() { p.worker(conn, workerID) })
}
} }
func (p *Producer) worker(conn core.Producer, workerID int) { func (p *Producer) worker(conn core.Producer, workerID int) {
@@ -239,7 +244,7 @@ func (p *Producer) reconnect(workerID, retry int) {
// swap connections // swap connections
p.conn = conn p.conn = conn
go p.worker(conn, workerID) observability.Go(context.TODO(), func() { p.worker(conn, workerID) })
} }
func (p *Producer) stop() { func (p *Producer) stop() {

View File

@@ -426,7 +426,13 @@ func (s *StreamServer) addStreamForward(
} }
if quirks.RestartUntilYoutubeRecognizesStream.Enabled { if quirks.RestartUntilYoutubeRecognizesStream.Enabled {
go s.restartUntilYoutubeRecognizesStream(ctx, result, quirks.RestartUntilYoutubeRecognizesStream) observability.Go(ctx, func() {
s.restartUntilYoutubeRecognizesStream(
ctx,
result,
quirks.RestartUntilYoutubeRecognizesStream,
)
})
} }
return result, nil return result, nil