Upgrade some dependencies

This commit is contained in:
Dmitrii Okunev
2025-06-22 14:59:00 +01:00
parent bf93d16666
commit eea64a701b
57 changed files with 215 additions and 213 deletions

View File

@@ -42,7 +42,7 @@ var (
l.Error("unable to get the value of the flag 'go-net-pprof-addr': %v", err) l.Error("unable to get the value of the flag 'go-net-pprof-addr': %v", err)
} }
if netPprofAddr != "" { if netPprofAddr != "" {
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
if netPprofAddr == "" { if netPprofAddr == "" {
netPprofAddr = "localhost:0" netPprofAddr = "localhost:0"
} }

View File

@@ -91,7 +91,7 @@ func main() {
ctx = logger.CtxWithLogger(ctx, l) ctx = logger.CtxWithLogger(ctx, l)
if *netPprofAddr != "" || (forceNetPProfOnAndroid && runtime.GOOS == "android") { if *netPprofAddr != "" || (forceNetPProfOnAndroid && runtime.GOOS == "android") {
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
if *netPprofAddr == "" { if *netPprofAddr == "" {
*netPprofAddr = "localhost:0" *netPprofAddr = "localhost:0"
} }
@@ -175,7 +175,7 @@ func main() {
l.Fatalf("unable to initialize the streamd instance: %v", err) l.Fatalf("unable to initialize the streamd instance: %v", err)
} }
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
if err = streamD.Run(ctx); err != nil { if err = streamD.Run(ctx); err != nil {
l.Errorf("streamd returned an error: %v", err) l.Errorf("streamd returned an error: %v", err)
} }
@@ -186,7 +186,7 @@ func main() {
log.Fatalf("failed to listen: %v", err) log.Fatalf("failed to listen: %v", err)
} }
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
<-ctx.Done() <-ctx.Done()
listener.Close() listener.Close()
}) })

View File

@@ -100,7 +100,7 @@ func getContext(
closeFile = func() { f.Close() } closeFile = func() { f.Close() }
} }
rotateFunc() rotateFunc()
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
defer func() { defer func() {
logger.Debugf(ctx, "log rotator is closed") logger.Debugf(ctx, "log rotator is closed")
}() }()

View File

@@ -147,7 +147,7 @@ func runSplitProcesses(
logger.Fatalf(ctx, "failed to start process manager: %v", err) logger.Fatalf(ctx, "failed to start process manager: %v", err)
} }
defer m.Close() defer m.Close()
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
m.Serve(ctx, func(ctx context.Context, source ProcessName, content any) error { m.Serve(ctx, func(ctx context.Context, source ProcessName, content any) error {
switch content.(type) { switch content.(type) {
case GetFlags: case GetFlags:
@@ -170,7 +170,7 @@ func runSplitProcesses(
return nil return nil
}) })
}) })
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
@@ -289,7 +289,7 @@ func runFork(
logger.Errorf(ctx, "unable to register the command %v to be auto-killed: %v (GOOS: %v)", args, err, runtime.GOOS) logger.Errorf(ctx, "unable to register the command %v to be auto-killed: %v (GOOS: %v)", args, err, runtime.GOOS)
} }
} }
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
err := cmd.Wait() err := cmd.Wait()
stderrLogger.Flush() stderrLogger.Flush()
stdoutLogger.Flush() stdoutLogger.Flush()
@@ -307,10 +307,10 @@ func runFork(
func fakeFork(ctx context.Context, procName ProcessName, addr, password string) error { func fakeFork(ctx context.Context, procName ProcessName, addr, password string) error {
switch procName { switch procName {
case ProcessNameStreamd: case ProcessNameStreamd:
observability.Go(ctx, func() { forkUI(ctx, addr, password) }) observability.Go(ctx, func(ctx context.Context) { forkUI(ctx, addr, password) })
return nil return nil
case ProcessNameUI: case ProcessNameUI:
observability.Go(ctx, func() { forkStreamd(ctx, addr, password) }) observability.Go(ctx, func(ctx context.Context) { forkStreamd(ctx, addr, password) })
return nil return nil
} }
return fmt.Errorf("unexpected process name: %s", procName) return fmt.Errorf("unexpected process name: %s", procName)

View File

@@ -49,7 +49,7 @@ func main() {
} }
ctx = belt.WithField(ctx, "process", ProcessNameMain) ctx = belt.WithField(ctx, "process", ProcessNameMain)
defer func() { observability.PanicIfNotNil(ctx, recover()) }() defer func() { observability.PanicIfNotNil(ctx, recover()) }()
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
<-ctx.Done() <-ctx.Done()
logger.Debugf(ctx, "context is cancelled") logger.Debugf(ctx, "context is cancelled")
}) })
@@ -125,7 +125,7 @@ func runPanel(
return err return err
} }
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
err = grpcServer.Serve(listener) err = grpcServer.Serve(listener)
if err != nil { if err != nil {
logger.Panicf(ctx, "unable to server the gRPC server: %v", err) logger.Panicf(ctx, "unable to server the gRPC server: %v", err)
@@ -135,7 +135,7 @@ func runPanel(
if mainProcess != nil { if mainProcess != nil {
setReadyFor(ctx, mainProcess, StreamDDied{}, UpdateStreamDConfig{}) setReadyFor(ctx, mainProcess, StreamDDied{}, UpdateStreamDConfig{})
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
err := mainProcess.Serve( err := mainProcess.Serve(
ctx, ctx,
func(ctx context.Context, source mainprocess.ProcessName, content any) error { func(ctx context.Context, source mainprocess.ProcessName, content any) error {

View File

@@ -30,7 +30,7 @@ func initRuntime(
l := logger.FromCtx(ctx) l := logger.FromCtx(ctx)
if ForceDebug { if ForceDebug {
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
t := time.NewTicker(time.Second) t := time.NewTicker(time.Second)
defer t.Stop() defer t.Stop()
for { for {
@@ -88,7 +88,7 @@ func initRuntime(
} }
if netPprofAddr != "" { if netPprofAddr != "" {
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
http.Handle( http.Handle(
"/metrics", "/metrics",
promhttp.Handler(), promhttp.Handler(),
@@ -104,7 +104,7 @@ func initRuntime(
runtime.GOMAXPROCS(4) runtime.GOMAXPROCS(4)
} }
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
t := time.NewTicker(time.Second) t := time.NewTicker(time.Second)
defer t.Stop() defer t.Stop()
for { for {

View File

@@ -17,7 +17,7 @@ func mainProcessSignalHandler(
) chan<- os.Signal { ) chan<- os.Signal {
c := make(chan os.Signal, 1) c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt) signal.Notify(c, os.Interrupt)
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
for range c { for range c {
cancelFn() cancelFn()
forkLocker.Do(ctx, func() { forkLocker.Do(ctx, func() {
@@ -26,7 +26,7 @@ func mainProcessSignalHandler(
wg.Add(1) wg.Add(1)
{ {
name, f := name, f name, f := name, f
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
defer wg.Done() defer wg.Done()
logger.Debugf(ctx, "interrupting '%s'", name) logger.Debugf(ctx, "interrupting '%s'", name)
err := f.Process.Signal(os.Interrupt) err := f.Process.Signal(os.Interrupt)
@@ -37,7 +37,7 @@ func mainProcessSignalHandler(
return return
} }
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
logger.Debugf(ctx, "killing '%s'", name) logger.Debugf(ctx, "killing '%s'", name)
err := f.Process.Kill() err := f.Process.Kill()
@@ -67,7 +67,7 @@ func childProcessSignalHandler(
) chan<- os.Signal { ) chan<- os.Signal {
c := make(chan os.Signal, 1) c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt) signal.Notify(c, os.Interrupt)
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
for range c { for range c {
logger.Infof(ctx, "received an interruption signal") logger.Infof(ctx, "received an interruption signal")
cancelFunc() cancelFunc()

View File

@@ -57,7 +57,7 @@ func forkStreamd(preCtx context.Context, mainProcessAddr, password string) {
logger.Debugf(ctx, "flags == %#+v", flags) logger.Debugf(ctx, "flags == %#+v", flags)
ctx, cancelFunc := initRuntime(ctx, flags, procName) ctx, cancelFunc := initRuntime(ctx, flags, procName)
defer cancelFunc() defer cancelFunc()
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
<-ctx.Done() <-ctx.Done()
logger.Debugf(ctx, "context is cancelled") logger.Debugf(ctx, "context is cancelled")
}) })
@@ -187,7 +187,7 @@ func runStreamd(
if mainProcess != nil { if mainProcess != nil {
logger.Debugf(ctx, "starting the IPC server") logger.Debugf(ctx, "starting the IPC server")
setReadyFor(ctx, mainProcess, GetStreamdAddress{}, RequestStreamDConfig{}) setReadyFor(ctx, mainProcess, GetStreamdAddress{}, RequestStreamDConfig{})
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
err := mainProcess.Serve( err := mainProcess.Serve(
ctx, ctx,
func( func(
@@ -267,7 +267,7 @@ func initGRPCServers(
} }
obsGRPC, obsGRPCClose, err := streamD.OBS(ctx) obsGRPC, obsGRPCClose, err := streamD.OBS(ctx)
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
<-ctx.Done() <-ctx.Done()
listener.Close() listener.Close()
if obsGRPCClose != nil { if obsGRPCClose != nil {
@@ -298,7 +298,7 @@ func initGRPCServers(
registerGRPCServices(grpcServer, streamdGRPC, obsGRPC, proxyGRPC) registerGRPCServices(grpcServer, streamdGRPC, obsGRPC, proxyGRPC)
// start the server: // start the server:
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
logger.Infof(ctx, "started server at %s", listener.Addr().String()) logger.Infof(ctx, "started server at %s", listener.Addr().String())
err = grpcServer.Serve(listener) err = grpcServer.Serve(listener)
select { select {

View File

@@ -29,7 +29,7 @@ func forkUI(preCtx context.Context, mainProcessAddr, password string) {
ctx, cancelFunc := initRuntime(ctx, flags, procName) ctx, cancelFunc := initRuntime(ctx, flags, procName)
defer cancelFunc() defer cancelFunc()
defer belt.Flush(ctx) defer belt.Flush(ctx)
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
<-ctx.Done() <-ctx.Done()
logger.Debugf(ctx, "context is cancelled") logger.Debugf(ctx, "context is cancelled")
}) })

21
go.mod
View File

@@ -23,7 +23,7 @@ replace github.com/wlynxg/anet => github.com/BieHDC/anet v0.0.6-0.20241226223613
replace github.com/nicklaw5/helix/v2 v2.30.1-0.20240715193454-0151ccccf980 => github.com/xaionaro-go/helix/v2 v2.0.0-20250309182928-f54c9d4c8a29 replace github.com/nicklaw5/helix/v2 v2.30.1-0.20240715193454-0151ccccf980 => github.com/xaionaro-go/helix/v2 v2.0.0-20250309182928-f54c9d4c8a29
replace github.com/asticode/go-astiav v0.35.1 => github.com/xaionaro-go/astiav v0.0.0-20250406220418-87d14d2908f9 replace github.com/asticode/go-astiav v0.36.0 => github.com/xaionaro-go/astiav v0.0.0-20250521203320-7402f3e25a7c
replace github.com/bluenviron/mediacommon/v2 v2.0.1-0.20250324151931-b8ce69d15d3d => github.com/xaionaro-go/mediacommon/v2 v2.0.0-20250420012906-03d6d69ac3b7 replace github.com/bluenviron/mediacommon/v2 v2.0.1-0.20250324151931-b8ce69d15d3d => github.com/xaionaro-go/mediacommon/v2 v2.0.0-20250420012906-03d6d69ac3b7
@@ -207,11 +207,12 @@ require (
github.com/volatiletech/sqlboiler/v4 v4.16.2 // indirect github.com/volatiletech/sqlboiler/v4 v4.16.2 // indirect
github.com/volatiletech/strmangle v0.0.6 // indirect github.com/volatiletech/strmangle v0.0.6 // indirect
github.com/wlynxg/anet v0.0.6-0.20250109065809-5501d401a269 // indirect github.com/wlynxg/anet v0.0.6-0.20250109065809-5501d401a269 // indirect
github.com/xaionaro-go/avmediacodec v0.0.0-20250421150856-ddd390422c21 // indirect github.com/xaionaro-go/avcommon v0.0.0-20250510235605-840f8210b727 // indirect
github.com/xaionaro-go/avmediacodec v0.0.0-20250505012527-c819676502d8 // indirect
github.com/xaionaro-go/gorex v0.0.0-20241010205749-bcd59d639c4d // indirect github.com/xaionaro-go/gorex v0.0.0-20241010205749-bcd59d639c4d // indirect
github.com/xaionaro-go/libsrt v0.0.0-20250105232601-e760c79b2bc3 // indirect github.com/xaionaro-go/libsrt v0.0.0-20250505013920-61d894a3b7e9 // indirect
github.com/xaionaro-go/ndk v0.0.0-20250420195304-361bb98583bf // indirect github.com/xaionaro-go/ndk v0.0.0-20250420195304-361bb98583bf // indirect
github.com/xaionaro-go/proxy v0.0.0-20250111150848-1f0e7b262638 // indirect github.com/xaionaro-go/proxy v0.0.0-20250525144747-579f5a891c15 // indirect
github.com/xaionaro-go/spinlock v0.0.0-20200518175509-30e6d1ce68a1 // indirect github.com/xaionaro-go/spinlock v0.0.0-20200518175509-30e6d1ce68a1 // indirect
github.com/xanzy/ssh-agent v0.3.3 // indirect github.com/xanzy/ssh-agent v0.3.3 // indirect
github.com/xo/terminfo v0.0.0-20210125001918-ca9a967f8778 // indirect github.com/xo/terminfo v0.0.0-20210125001918-ca9a967f8778 // indirect
@@ -255,7 +256,7 @@ require (
github.com/andreykaipov/goobs v1.4.1 github.com/andreykaipov/goobs v1.4.1
github.com/anthonynsimon/bild v0.14.0 github.com/anthonynsimon/bild v0.14.0
github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef
github.com/asticode/go-astiav v0.35.1 github.com/asticode/go-astiav v0.36.0
github.com/bamiaux/rez v0.0.0-20170731184118-29f4463c688b github.com/bamiaux/rez v0.0.0-20170731184118-29f4463c688b
github.com/bluenviron/gortsplib/v4 v4.12.4-0.20250324174248-61372cfa6800 github.com/bluenviron/gortsplib/v4 v4.12.4-0.20250324174248-61372cfa6800
github.com/chai2010/webp v1.1.1 github.com/chai2010/webp v1.1.1
@@ -289,7 +290,7 @@ require (
github.com/spf13/pflag v1.0.6 github.com/spf13/pflag v1.0.6
github.com/stretchr/testify v1.10.0 github.com/stretchr/testify v1.10.0
github.com/xaionaro-go/audio v0.0.0-20250210102901-abfced9d5ef3 github.com/xaionaro-go/audio v0.0.0-20250210102901-abfced9d5ef3
github.com/xaionaro-go/avpipeline v0.0.0-20250428012319-401ac2ebe66c github.com/xaionaro-go/avpipeline v0.0.0-20250525204026-17104bc4baca
github.com/xaionaro-go/datacounter v1.0.4 github.com/xaionaro-go/datacounter v1.0.4
github.com/xaionaro-go/go-rtmp v0.0.0-20241009130244-1e3160f27f42 github.com/xaionaro-go/go-rtmp v0.0.0-20241009130244-1e3160f27f42
github.com/xaionaro-go/grpcproxy v0.0.0-20241103205849-a8fef42e72f9 github.com/xaionaro-go/grpcproxy v0.0.0-20241103205849-a8fef42e72f9
@@ -300,9 +301,9 @@ require (
github.com/xaionaro-go/mediamtx v0.0.0-20250406132618-79ecbc3e138f github.com/xaionaro-go/mediamtx v0.0.0-20250406132618-79ecbc3e138f
github.com/xaionaro-go/object v0.0.0-20241026212449-753ce10ec94c github.com/xaionaro-go/object v0.0.0-20241026212449-753ce10ec94c
github.com/xaionaro-go/obs-grpc-proxy v0.0.0-20241018162120-5faf4e7a684a github.com/xaionaro-go/obs-grpc-proxy v0.0.0-20241018162120-5faf4e7a684a
github.com/xaionaro-go/observability v0.0.0-20250420133500-5c4d2e045932 github.com/xaionaro-go/observability v0.0.0-20250622130956-24b7017284e4
github.com/xaionaro-go/player v0.0.0-20250427220051-e366ad8a1fb5 github.com/xaionaro-go/player v0.0.0-20250622133132-5473824ef0d0
github.com/xaionaro-go/recoder v0.0.0-20250503155018-6f353978d332 github.com/xaionaro-go/recoder v0.0.0-20250622133456-7bd1af83fda5
github.com/xaionaro-go/secret v0.0.0-20250111141743-ced12e1082c2 github.com/xaionaro-go/secret v0.0.0-20250111141743-ced12e1082c2
github.com/xaionaro-go/serializable v0.0.0-20250412140540-5ac572306599 github.com/xaionaro-go/serializable v0.0.0-20250412140540-5ac572306599
github.com/xaionaro-go/timeapiio v0.0.0-20240915203246-b907cf699af3 github.com/xaionaro-go/timeapiio v0.0.0-20240915203246-b907cf699af3
@@ -315,7 +316,7 @@ require (
github.com/xaionaro-go/xsync v0.0.0-20250614210231-b74f647f859f github.com/xaionaro-go/xsync v0.0.0-20250614210231-b74f647f859f
github.com/yutopp/go-flv v0.3.1 github.com/yutopp/go-flv v0.3.1
golang.org/x/crypto v0.38.0 golang.org/x/crypto v0.38.0
google.golang.org/grpc v1.71.1 google.golang.org/grpc v1.72.1
google.golang.org/protobuf v1.36.6 google.golang.org/protobuf v1.36.6
gopkg.in/yaml.v2 v2.4.0 gopkg.in/yaml.v2 v2.4.0
gopkg.in/yaml.v3 v3.0.1 gopkg.in/yaml.v3 v3.0.1

42
go.sum
View File

@@ -1076,14 +1076,16 @@ github.com/volatiletech/strmangle v0.0.6 h1:AdOYE3B2ygRDq4rXDij/MMwq6KVK/pWAYxpC
github.com/volatiletech/strmangle v0.0.6/go.mod h1:ycDvbDkjDvhC0NUU8w3fWwl5JEMTV56vTKXzR3GeR+0= github.com/volatiletech/strmangle v0.0.6/go.mod h1:ycDvbDkjDvhC0NUU8w3fWwl5JEMTV56vTKXzR3GeR+0=
github.com/x-cray/logrus-prefixed-formatter v0.5.2 h1:00txxvfBM9muc0jiLIEAkAcIMJzfthRT6usrui8uGmg= github.com/x-cray/logrus-prefixed-formatter v0.5.2 h1:00txxvfBM9muc0jiLIEAkAcIMJzfthRT6usrui8uGmg=
github.com/x-cray/logrus-prefixed-formatter v0.5.2/go.mod h1:2duySbKsL6M18s5GU7VPsoEPHyzalCE06qoARUCeBBE= github.com/x-cray/logrus-prefixed-formatter v0.5.2/go.mod h1:2duySbKsL6M18s5GU7VPsoEPHyzalCE06qoARUCeBBE=
github.com/xaionaro-go/astiav v0.0.0-20250406220418-87d14d2908f9 h1:6thYU3Iykk876UV8zTlN9rut3BEhLv/k28fiHDcx+uE= github.com/xaionaro-go/astiav v0.0.0-20250521203320-7402f3e25a7c h1:MTrVWO9fpyuavKvXCitikDCHRSOorjD7/OOejhAXz+A=
github.com/xaionaro-go/astiav v0.0.0-20250406220418-87d14d2908f9/go.mod h1:K7D8UC6GeQt85FUxk2KVwYxHnotrxuEnp5evkkudc2s= github.com/xaionaro-go/astiav v0.0.0-20250521203320-7402f3e25a7c/go.mod h1:GI0pHw6K2/pl/o8upCtT49P/q4KCwhv/8nGLlCsZLdA=
github.com/xaionaro-go/audio v0.0.0-20250210102901-abfced9d5ef3 h1:LRIpqqC7Gsz5+/EsIWRtdPZZPMpx9yykUVFyUnRaKbE= github.com/xaionaro-go/audio v0.0.0-20250210102901-abfced9d5ef3 h1:LRIpqqC7Gsz5+/EsIWRtdPZZPMpx9yykUVFyUnRaKbE=
github.com/xaionaro-go/audio v0.0.0-20250210102901-abfced9d5ef3/go.mod h1:i4CntPlryh9HLmA3p3M0CNr1usRkEkuh3N2Ui3HeXQA= github.com/xaionaro-go/audio v0.0.0-20250210102901-abfced9d5ef3/go.mod h1:i4CntPlryh9HLmA3p3M0CNr1usRkEkuh3N2Ui3HeXQA=
github.com/xaionaro-go/avmediacodec v0.0.0-20250421150856-ddd390422c21 h1:afsJFvZaf4z87n/V9LgXgD6u8MZBesZj8mnHvlfRi2k= github.com/xaionaro-go/avcommon v0.0.0-20250510235605-840f8210b727 h1:uQ8V1T3Oeru15gEIthts9GWkMS3eQ0Eo7vOKvE4G2/k=
github.com/xaionaro-go/avmediacodec v0.0.0-20250421150856-ddd390422c21/go.mod h1:obX+/HoeX6lzM/Zuzgtm0JdG8CQ58I1isp5xSQEm+kQ= github.com/xaionaro-go/avcommon v0.0.0-20250510235605-840f8210b727/go.mod h1:kjLo1LasgdDJqbTGD5bbEM+D6RiZSbf5ZT8yiPFF1BA=
github.com/xaionaro-go/avpipeline v0.0.0-20250428012319-401ac2ebe66c h1:KTQYCAJAofdHnjxYC8VV+oMW2WRnxa8wy2o0fNlfueU= github.com/xaionaro-go/avmediacodec v0.0.0-20250505012527-c819676502d8 h1:FZn9+TN3uHhohfpanWkR9lFNHApizznZbML6XjvEgTU=
github.com/xaionaro-go/avpipeline v0.0.0-20250428012319-401ac2ebe66c/go.mod h1:Czl84biYuPZeILYiRiziR0yeuXKvrqQfzJ1ItjSq/o0= github.com/xaionaro-go/avmediacodec v0.0.0-20250505012527-c819676502d8/go.mod h1:2W2Kp/HJFXcFBppQ4YytgDy/ydFL3hGc23xSB1U/Luc=
github.com/xaionaro-go/avpipeline v0.0.0-20250525204026-17104bc4baca h1:Cls4rEimemZicWzhVlzQm1otU/RTpfNVpgdfwvGEJrQ=
github.com/xaionaro-go/avpipeline v0.0.0-20250525204026-17104bc4baca/go.mod h1:LMh5Qi7cuntcktUezfA9toVCUCCsx9pjyGDWe9GLt9A=
github.com/xaionaro-go/datacounter v1.0.4 h1:+QMZLmu73R5WGkQfUPwlXF/JFN+Weo4iuDZkiL2wVm8= github.com/xaionaro-go/datacounter v1.0.4 h1:+QMZLmu73R5WGkQfUPwlXF/JFN+Weo4iuDZkiL2wVm8=
github.com/xaionaro-go/datacounter v1.0.4/go.mod h1:Sf9vBevuV6w5iE6K3qJ9pWVKcyS60clWBUSQLjt5++c= github.com/xaionaro-go/datacounter v1.0.4/go.mod h1:Sf9vBevuV6w5iE6K3qJ9pWVKcyS60clWBUSQLjt5++c=
github.com/xaionaro-go/fyne/v2 v2.0.0-20250622004601-3a26ee69528a h1:awMQXlaweeiSZB4rSNfMmJGJriyn1ca/m/lglBi9uyA= github.com/xaionaro-go/fyne/v2 v2.0.0-20250622004601-3a26ee69528a h1:awMQXlaweeiSZB4rSNfMmJGJriyn1ca/m/lglBi9uyA=
@@ -1102,8 +1104,8 @@ github.com/xaionaro-go/iterate v0.0.0-20250406123757-7802d56b52ce h1:4a0vM4EOq7c
github.com/xaionaro-go/iterate v0.0.0-20250406123757-7802d56b52ce/go.mod h1:Dx52o1WH1xUoV8jjpiIVTRZ/rNoPNPcnLdMrQ5D2FH0= github.com/xaionaro-go/iterate v0.0.0-20250406123757-7802d56b52ce/go.mod h1:Dx52o1WH1xUoV8jjpiIVTRZ/rNoPNPcnLdMrQ5D2FH0=
github.com/xaionaro-go/kickcom v0.0.0-20250316223447-a1fe3d153d96 h1:SZlH+zdXjJ4QYjFZURNvJUF3cH0ToBJhACqXdxuN+50= github.com/xaionaro-go/kickcom v0.0.0-20250316223447-a1fe3d153d96 h1:SZlH+zdXjJ4QYjFZURNvJUF3cH0ToBJhACqXdxuN+50=
github.com/xaionaro-go/kickcom v0.0.0-20250316223447-a1fe3d153d96/go.mod h1:Y8KwAUd/RrqYtRiJZWVZsGClOXF+vK5jOjZWMIvhYTE= github.com/xaionaro-go/kickcom v0.0.0-20250316223447-a1fe3d153d96/go.mod h1:Y8KwAUd/RrqYtRiJZWVZsGClOXF+vK5jOjZWMIvhYTE=
github.com/xaionaro-go/libsrt v0.0.0-20250105232601-e760c79b2bc3 h1:OeqZZ9i3KHaTKOxLtrF7nBkao299F5QK4acl9Nd/GrI= github.com/xaionaro-go/libsrt v0.0.0-20250505013920-61d894a3b7e9 h1:z5K1pa9cJZ2oqFNWyyxrxO/50Hv4Gn+1kkfXTHkoKNM=
github.com/xaionaro-go/libsrt v0.0.0-20250105232601-e760c79b2bc3/go.mod h1:7k3ZDcOLrS8ZKOu3P30tWMSno2blKqmH+9pKfDCZ2mE= github.com/xaionaro-go/libsrt v0.0.0-20250505013920-61d894a3b7e9/go.mod h1:yH5w7hdIyhbR4p0WfABH9fLIIK2ZwPd5CKk2K2ND/vo=
github.com/xaionaro-go/lockmap v0.0.0-20240901172806-e17aea364748 h1:SlB3zLAuLgRxdOo250gMUG/7hSiEU2NzEUNYbJDuI2A= github.com/xaionaro-go/lockmap v0.0.0-20240901172806-e17aea364748 h1:SlB3zLAuLgRxdOo250gMUG/7hSiEU2NzEUNYbJDuI2A=
github.com/xaionaro-go/lockmap v0.0.0-20240901172806-e17aea364748/go.mod h1:UO+SYZ5JAJGOnNkDycFrFwkaaPeSqAEQUM0TUp9Vb24= github.com/xaionaro-go/lockmap v0.0.0-20240901172806-e17aea364748/go.mod h1:UO+SYZ5JAJGOnNkDycFrFwkaaPeSqAEQUM0TUp9Vb24=
github.com/xaionaro-go/logrustash v0.0.0-20240804141650-d48034780a5f h1:mMrVrYtH9MyCUzBwPvuEntvqdCJ0zifCfqV6bHU6z1M= github.com/xaionaro-go/logrustash v0.0.0-20240804141650-d48034780a5f h1:mMrVrYtH9MyCUzBwPvuEntvqdCJ0zifCfqV6bHU6z1M=
@@ -1120,16 +1122,16 @@ github.com/xaionaro-go/object v0.0.0-20241026212449-753ce10ec94c h1:2CIIxTRox9au
github.com/xaionaro-go/object v0.0.0-20241026212449-753ce10ec94c/go.mod h1:vRcA12NWsR0IrS75eqnPBs5aVfCYmJy4bR+6DbJpBCg= github.com/xaionaro-go/object v0.0.0-20241026212449-753ce10ec94c/go.mod h1:vRcA12NWsR0IrS75eqnPBs5aVfCYmJy4bR+6DbJpBCg=
github.com/xaionaro-go/obs-grpc-proxy v0.0.0-20241018162120-5faf4e7a684a h1:PyX7XpLkj+eAwrPMFMGpvZIG4zBfzAfwNhwTtbORqN0= github.com/xaionaro-go/obs-grpc-proxy v0.0.0-20241018162120-5faf4e7a684a h1:PyX7XpLkj+eAwrPMFMGpvZIG4zBfzAfwNhwTtbORqN0=
github.com/xaionaro-go/obs-grpc-proxy v0.0.0-20241018162120-5faf4e7a684a/go.mod h1:exSKIlCibB0ww+ABDwH+YG/iNdqVfdzXBBg5LYxkxGw= github.com/xaionaro-go/obs-grpc-proxy v0.0.0-20241018162120-5faf4e7a684a/go.mod h1:exSKIlCibB0ww+ABDwH+YG/iNdqVfdzXBBg5LYxkxGw=
github.com/xaionaro-go/observability v0.0.0-20250420133500-5c4d2e045932 h1:uDPBczg4UmPPig0l7DLlIj5XCCVIlW+7KQ4THVtqOU8= github.com/xaionaro-go/observability v0.0.0-20250622130956-24b7017284e4 h1:dlsJ1l9WI+LTpB5z4f7FCLLrxJ7uV4tj9jWNqpibhA0=
github.com/xaionaro-go/observability v0.0.0-20250420133500-5c4d2e045932/go.mod h1:j5y9LVYd0v8sJa9Ks7ZyuwFxAUpaNFHBNKBkiYipxPM= github.com/xaionaro-go/observability v0.0.0-20250622130956-24b7017284e4/go.mod h1:GkNC0+nPJMOzotTzJVlH4+DvDg9cKrV9qqOe+8/kVw4=
github.com/xaionaro-go/player v0.0.0-20250427220051-e366ad8a1fb5 h1:6/7G8t/Ua31JjttkW/qJlhqpL+wAl6Ei1rFIeS9xvSw= github.com/xaionaro-go/player v0.0.0-20250622133132-5473824ef0d0 h1:XB5P4Gh8QKpC11xd8xMS+29xOdRtYb1Geflvs5vL1tw=
github.com/xaionaro-go/player v0.0.0-20250427220051-e366ad8a1fb5/go.mod h1:ZBT/KpMvYbHDcm8H7jo54C4xQ1gAgbebu3MHRu6iaqM= github.com/xaionaro-go/player v0.0.0-20250622133132-5473824ef0d0/go.mod h1:jQ00KD3KmELFS+VsQwkLYUOQ63i0lJA3RZSSP37V720=
github.com/xaionaro-go/proxy v0.0.0-20250111150848-1f0e7b262638 h1:w7Dt6Mpj36S2cWm0PkT2+D4kxrQbfCwjXZs1HqiILpE= github.com/xaionaro-go/proxy v0.0.0-20250525144747-579f5a891c15 h1:Qqoy9MDWq2Yh6uazAqQDzqU0doalTL3tRjNCo7X7GXA=
github.com/xaionaro-go/proxy v0.0.0-20250111150848-1f0e7b262638/go.mod h1:hOkJBFoMsnCDoZgpSPTHYbnevPgtpD16d9Xga91U+Eo= github.com/xaionaro-go/proxy v0.0.0-20250525144747-579f5a891c15/go.mod h1:6kxHtLmOImv/zwXSvaI1CW9Q8Pw+m5b891ZoejMKHPA=
github.com/xaionaro-go/pulse v0.0.0-20241023202712-7151fa00d4bb h1:9iHPI27CYbmJDhzEuCABQthE/DGVNvT60ybWvv3BV8w= github.com/xaionaro-go/pulse v0.0.0-20241023202712-7151fa00d4bb h1:9iHPI27CYbmJDhzEuCABQthE/DGVNvT60ybWvv3BV8w=
github.com/xaionaro-go/pulse v0.0.0-20241023202712-7151fa00d4bb/go.mod h1:cpYspI6YljhkUf1WLXLLDmeaaPFc3CnGLjDZf9dZ4no= github.com/xaionaro-go/pulse v0.0.0-20241023202712-7151fa00d4bb/go.mod h1:cpYspI6YljhkUf1WLXLLDmeaaPFc3CnGLjDZf9dZ4no=
github.com/xaionaro-go/recoder v0.0.0-20250503155018-6f353978d332 h1:jB5I8UE9UL6g7qQKaZ/g9wt3lIXxgkDDJX1cV37D5go= github.com/xaionaro-go/recoder v0.0.0-20250622133456-7bd1af83fda5 h1:RHlEvVGWUCdOxbRNL1b3qpv0LaNBZsf+n+wl9mIG3c4=
github.com/xaionaro-go/recoder v0.0.0-20250503155018-6f353978d332/go.mod h1:Twc+NcQQ+afg4RHxwqqo9pRGIaY7+QwpuAiYa7ClSLw= github.com/xaionaro-go/recoder v0.0.0-20250622133456-7bd1af83fda5/go.mod h1:RpW+gwH/6WZSZfu2zV6ffEU4G2rUHzFCyJ3Ps3K3wuo=
github.com/xaionaro-go/secret v0.0.0-20250111141743-ced12e1082c2 h1:QHpTWfyfmz65cE0MtFXe9fScdi+X0VIYR2wgolSYEUk= github.com/xaionaro-go/secret v0.0.0-20250111141743-ced12e1082c2 h1:QHpTWfyfmz65cE0MtFXe9fScdi+X0VIYR2wgolSYEUk=
github.com/xaionaro-go/secret v0.0.0-20250111141743-ced12e1082c2/go.mod h1:XKoHGZ4VKMbVBl8VotLIoWQdrB6Q7jnR++RbkiegZFU= github.com/xaionaro-go/secret v0.0.0-20250111141743-ced12e1082c2/go.mod h1:XKoHGZ4VKMbVBl8VotLIoWQdrB6Q7jnR++RbkiegZFU=
github.com/xaionaro-go/serializable v0.0.0-20250412140540-5ac572306599 h1:CzcQd6wLiqgjd8K/6UzR5uyt6sg4ut/kVxi6+FJMbdI= github.com/xaionaro-go/serializable v0.0.0-20250412140540-5ac572306599 h1:CzcQd6wLiqgjd8K/6UzR5uyt6sg4ut/kVxi6+FJMbdI=
@@ -1773,8 +1775,8 @@ google.golang.org/genproto v0.0.0-20220429170224-98d788798c3e/go.mod h1:8w6bsBMX
google.golang.org/genproto v0.0.0-20220505152158-f39f71e6c8f3/go.mod h1:RAyBrSAP7Fh3Nc84ghnVLDPuV51xc9agzmm4Ph6i0Q4= google.golang.org/genproto v0.0.0-20220505152158-f39f71e6c8f3/go.mod h1:RAyBrSAP7Fh3Nc84ghnVLDPuV51xc9agzmm4Ph6i0Q4=
google.golang.org/genproto v0.0.0-20220519153652-3a47de7e79bd/go.mod h1:RAyBrSAP7Fh3Nc84ghnVLDPuV51xc9agzmm4Ph6i0Q4= google.golang.org/genproto v0.0.0-20220519153652-3a47de7e79bd/go.mod h1:RAyBrSAP7Fh3Nc84ghnVLDPuV51xc9agzmm4Ph6i0Q4=
google.golang.org/genproto v0.0.0-20241015192408-796eee8c2d53 h1:Df6WuGvthPzc+JiQ/G+m+sNX24kc0aTBqoDN/0yyykE= google.golang.org/genproto v0.0.0-20241015192408-796eee8c2d53 h1:Df6WuGvthPzc+JiQ/G+m+sNX24kc0aTBqoDN/0yyykE=
google.golang.org/genproto/googleapis/api v0.0.0-20250106144421-5f5ef82da422 h1:GVIKPyP/kLIyVOgOnTwFOrvQaQUzOzGMCxgFUOEmm24= google.golang.org/genproto/googleapis/api v0.0.0-20250218202821-56aae31c358a h1:nwKuGPlUAt+aR+pcrkfFRrTU1BVrSmYyYMxYbUIVHr0=
google.golang.org/genproto/googleapis/api v0.0.0-20250106144421-5f5ef82da422/go.mod h1:b6h1vNKhxaSoEI+5jc3PJUCustfli/mRab7295pY7rw= google.golang.org/genproto/googleapis/api v0.0.0-20250218202821-56aae31c358a/go.mod h1:3kWAYMk1I75K4vykHtKt2ycnOgpA6974V7bREqbsenU=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250409194420-de1ac958c67a h1:GIqLhp/cYUkuGuiT+vJk8vhOP86L4+SP5j8yXgeVpvI= google.golang.org/genproto/googleapis/rpc v0.0.0-20250409194420-de1ac958c67a h1:GIqLhp/cYUkuGuiT+vJk8vhOP86L4+SP5j8yXgeVpvI=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250409194420-de1ac958c67a/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A= google.golang.org/genproto/googleapis/rpc v0.0.0-20250409194420-de1ac958c67a/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A=
google.golang.org/grpc v1.14.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.14.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
@@ -1810,8 +1812,8 @@ google.golang.org/grpc v1.44.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ5
google.golang.org/grpc v1.45.0/go.mod h1:lN7owxKUQEqMfSyQikvvk5tf/6zMPsrK+ONuO11+0rQ= google.golang.org/grpc v1.45.0/go.mod h1:lN7owxKUQEqMfSyQikvvk5tf/6zMPsrK+ONuO11+0rQ=
google.golang.org/grpc v1.46.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk= google.golang.org/grpc v1.46.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk=
google.golang.org/grpc v1.46.2/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk= google.golang.org/grpc v1.46.2/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk=
google.golang.org/grpc v1.71.1 h1:ffsFWr7ygTUscGPI0KKK6TLrGz0476KUvvsbqWK0rPI= google.golang.org/grpc v1.72.1 h1:HR03wO6eyZ7lknl75XlxABNVLLFc2PAb6mHlYh756mA=
google.golang.org/grpc v1.71.1/go.mod h1:H0GRtasmQOh9LkFoCPDu3ZrwUtD1YGE+b2vYBYd/8Ec= google.golang.org/grpc v1.72.1/go.mod h1:wH5Aktxcg25y1I3w7H69nHfXdOG3UiadoBtjh3izSDM=
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw= google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=

View File

@@ -87,7 +87,7 @@ func (c *Client) Serve(
) error { ) error {
ctx, cancelFn := context.WithCancel(ctx) ctx, cancelFn := context.WithCancel(ctx)
defer cancelFn() defer cancelFn()
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
<-ctx.Done() <-ctx.Done()
err := c.Close() err := c.Close()
if err != nil { if err != nil {

View File

@@ -120,7 +120,7 @@ func (m *Manager) Serve(
ctx, cancelFn := context.WithCancel(ctx) ctx, cancelFn := context.WithCancel(ctx)
defer cancelFn() defer cancelFn()
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
<-ctx.Done() <-ctx.Done()
err := m.Close() err := m.Close()
if err != nil { if err != nil {
@@ -160,7 +160,7 @@ func (m *Manager) addNewConnection(
conn net.Conn, conn net.Conn,
onReceivedMessage OnReceivedMessageFunc, onReceivedMessage OnReceivedMessageFunc,
) { ) {
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
m.handleConnection(ctx, conn, onReceivedMessage) m.handleConnection(ctx, conn, onReceivedMessage)
}) })
} }
@@ -175,7 +175,7 @@ func (m *Manager) handleConnection(
defer func() { logger.Tracef(ctx, "/handleConnection from %s (%s)", conn.RemoteAddr(), regMessage.Source) }() defer func() { logger.Tracef(ctx, "/handleConnection from %s (%s)", conn.RemoteAddr(), regMessage.Source) }()
ctx, cancelFn := context.WithCancel(ctx) ctx, cancelFn := context.WithCancel(ctx)
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
<-ctx.Done() <-ctx.Done()
conn.Close() conn.Close()
}) })
@@ -291,7 +291,7 @@ func (m *Manager) processMessage(
err = multierror.Append(err, onReceivedMessage(ctx, source, message.Content)) err = multierror.Append(err, onReceivedMessage(ctx, source, message.Content))
errCh := make(chan error) errCh := make(chan error)
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
for e := range errCh { for e := range errCh {
err = multierror.Append(err, e) err = multierror.Append(err, e)
} }
@@ -303,7 +303,7 @@ func (m *Manager) processMessage(
wg.Add(1) wg.Add(1)
{ {
dst := dst dst := dst
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
defer wg.Done() defer wg.Done()
errCh <- m.sendMessage(ctx, source, dst, message.Content) errCh <- m.sendMessage(ctx, source, dst, message.Content)
}) })
@@ -382,7 +382,7 @@ func (m *Manager) sendMessage(
return fmt.Errorf("process '%s' is not ever expected", destination) return fmt.Errorf("process '%s' is not ever expected", destination)
} }
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
conn, err := m.waitForReadyProcess(ctx, destination, reflect.TypeOf(content)) conn, err := m.waitForReadyProcess(ctx, destination, reflect.TypeOf(content))
if err != nil { if err != nil {
logger.Errorf( logger.Errorf(
@@ -400,7 +400,7 @@ func (m *Manager) sendMessage(
Content: content, Content: content,
} }
h := m.connLocker.Lock(context.Background(), destination) h := m.connLocker.Lock(context.Background(), destination) // TODO: should we use the provided ctx?
defer h.Unlock() defer h.Unlock()
defer time.Sleep( defer time.Sleep(
100 * time.Millisecond, 100 * time.Millisecond,

View File

@@ -86,14 +86,14 @@ func NewCodeReceiver(
}), }),
} }
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
<-ctx.Done() <-ctx.Done()
listener.Close() listener.Close()
srv.Close() srv.Close()
close(codeCh) close(codeCh)
}) })
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
srv.Serve(listener) srv.Serve(listener)
}) })

View File

@@ -223,7 +223,7 @@ func (p *P2P) Start(
} }
p.waitGroup.Add(1) p.waitGroup.Add(1)
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
defer p.waitGroup.Done() defer p.waitGroup.Done()
err := p.acceptPeersLoop(ctx, 0, myIDChan0) err := p.acceptPeersLoop(ctx, 0, myIDChan0)
if err != nil && !errors.Is(err, context.Canceled) { if err != nil && !errors.Is(err, context.Canceled) {
@@ -232,7 +232,7 @@ func (p *P2P) Start(
}) })
p.waitGroup.Add(1) p.waitGroup.Add(1)
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
defer p.waitGroup.Done() defer p.waitGroup.Done()
err := p.acceptPeersLoop(ctx, 1, myIDChan1) err := p.acceptPeersLoop(ctx, 1, myIDChan1)
if err != nil && !errors.Is(err, context.Canceled) { if err != nil && !errors.Is(err, context.Canceled) {

View File

@@ -39,7 +39,7 @@ func (p *peerClient) init(
) error { ) error {
ctx, cancelFn := context.WithCancel(ctx) ctx, cancelFn := context.WithCancel(ctx)
p.cancelFn = cancelFn p.cancelFn = cancelFn
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
<-ctx.Done() <-ctx.Done()
if err := p.peer.network.removePeer(p.peer.id); err != nil { if err := p.peer.network.removePeer(p.peer.id); err != nil {
logger.Errorf(ctx, "unable to remove peer '%s': %v", p.peer.id, err) logger.Errorf(ctx, "unable to remove peer '%s': %v", p.peer.id, err)

View File

@@ -36,7 +36,7 @@ func (p *peerServer) init(
ctx context.Context, ctx context.Context,
) error { ) error {
ctx, cancelFn := context.WithCancel(ctx) ctx, cancelFn := context.WithCancel(ctx)
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
<-ctx.Done() <-ctx.Done()
if err := p.peer.network.removePeer(p.peer.id); err != nil { if err := p.peer.network.removePeer(p.peer.id); err != nil {
logger.Errorf(ctx, "unable to remove peer '%s': %v", p.peer.id, err) logger.Errorf(ctx, "unable to remove peer '%s': %v", p.peer.id, err)
@@ -59,7 +59,7 @@ func (p *peerServer) init(
} }
p.waitGroup.Add(1) p.waitGroup.Add(1)
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
defer cancelFn() defer cancelFn()
defer p.waitGroup.Done() defer p.waitGroup.Done()
logger.Infof(ctx, "started the gRPC server at '%s'", grpcServerListener.Addr()) logger.Infof(ctx, "started the gRPC server at '%s'", grpcServerListener.Addr())

View File

@@ -52,7 +52,7 @@ func NewChatHandler(
messagesOutChan: make(chan streamcontrol.ChatMessage, 100), messagesOutChan: make(chan streamcontrol.ChatMessage, 100),
} }
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
defer func() { defer func() {
close(h.messagesOutChan) close(h.messagesOutChan)
}() }()

View File

@@ -402,7 +402,7 @@ func (k *Kick) GetChatMessagesChan(
defer func() { logger.Debugf(ctx, "/GetChatMessagesChan") }() defer func() { logger.Debugf(ctx, "/GetChatMessagesChan") }()
outCh := make(chan streamcontrol.ChatMessage) outCh := make(chan streamcontrol.ChatMessage)
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
defer func() { defer func() {
logger.Debugf(ctx, "closing the messages channel") logger.Debugf(ctx, "closing the messages channel")
close(outCh) close(outCh)

View File

@@ -289,7 +289,7 @@ func (s StreamControllers) ApplyProfiles(
wg.Add(1) wg.Add(1)
{ {
p := p p := p
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
defer wg.Done() defer wg.Done()
profileType := reflect.TypeOf(p) profileType := reflect.TypeOf(p)
c, ok := m[profileType] c, ok := m[profileType]
@@ -304,7 +304,7 @@ func (s StreamControllers) ApplyProfiles(
}) })
} }
} }
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
wg.Wait() wg.Wait()
close(errCh) close(errCh)
}) })
@@ -408,7 +408,7 @@ func (s StreamControllers) concurrently(
wg.Add(1) wg.Add(1)
{ {
c := c c := c
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
defer wg.Done() defer wg.Done()
if err := callback(c); err != nil { if err := callback(c); err != nil {
errCh <- err errCh <- err
@@ -416,7 +416,7 @@ func (s StreamControllers) concurrently(
}) })
} }
} }
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
wg.Wait() wg.Wait()
close(errCh) close(errCh)
}) })

View File

@@ -64,7 +64,7 @@ func newChatHandler(
} }
h.waitGroup.Add(1) h.waitGroup.Add(1)
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
defer h.waitGroup.Done() defer h.waitGroup.Done()
defer func() { defer func() {
h.client.Close() h.client.Close()

View File

@@ -553,7 +553,7 @@ func (t *Twitch) getNewClientCode(
var resultErr error var resultErr error
errCh := make(chan error) errCh := make(chan error)
errWg.Add(1) errWg.Add(1)
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
errWg.Done() errWg.Done()
for err := range errCh { for err := range errCh {
errmon.ObserveErrorCtx(ctx, err) errmon.ObserveErrorCtx(ctx, err)
@@ -575,7 +575,7 @@ func (t *Twitch) getNewClientCode(
wg.Add(1) wg.Add(1)
{ {
listenPort := listenPort listenPort := listenPort
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
defer func() { logger.Debugf(ctx, "ended the oauth handler at port %d", listenPort) }() defer func() { logger.Debugf(ctx, "ended the oauth handler at port %d", listenPort) }()
defer wg.Done() defer wg.Done()
authURL := GetAuthorizationURL( authURL := GetAuthorizationURL(
@@ -630,7 +630,7 @@ func (t *Twitch) getNewClientCode(
} }
wg.Add(1) wg.Add(1)
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
defer wg.Done() defer wg.Done()
t := time.NewTicker(time.Second) t := time.NewTicker(time.Second)
defer t.Stop() defer t.Stop()
@@ -649,7 +649,7 @@ func (t *Twitch) getNewClientCode(
} }
}) })
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
wg.Wait() wg.Wait()
close(errCh) close(errCh)
}) })
@@ -838,7 +838,7 @@ func (t *Twitch) GetChatMessagesChan(
defer func() { logger.Debugf(ctx, "/GetChatMessagesChan") }() defer func() { logger.Debugf(ctx, "/GetChatMessagesChan") }()
outCh := make(chan streamcontrol.ChatMessage) outCh := make(chan streamcontrol.ChatMessage)
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
defer func() { defer func() {
logger.Debugf(ctx, "closing the messages channel") logger.Debugf(ctx, "closing the messages channel")
close(outCh) close(outCh)

View File

@@ -83,7 +83,7 @@ func NewChatListener(
messagesOutChan: make(chan streamcontrol.ChatMessage, 100), messagesOutChan: make(chan streamcontrol.ChatMessage, 100),
} }
l.wg.Add(1) l.wg.Add(1)
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
defer l.wg.Done() defer l.wg.Done()
defer func() { defer func() {
logger.Debugf(ctx, "the listener loop is finished") logger.Debugf(ctx, "the listener loop is finished")

View File

@@ -86,7 +86,7 @@ func New(
return nil, fmt.Errorf("connection verification failed: %w", err) return nil, fmt.Errorf("connection verification failed: %w", err)
} }
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
ticker := time.NewTicker(time.Minute) ticker := time.NewTicker(time.Minute)
defer ticker.Stop() defer ticker.Stop()
for { for {
@@ -232,7 +232,7 @@ func getToken(ctx context.Context, cfg Config) (*oauth2.Token, error) {
var resultErr error var resultErr error
errCh := make(chan error) errCh := make(chan error)
errWg.Add(1) errWg.Add(1)
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
errWg.Done() errWg.Done()
for err := range errCh { for err := range errCh {
errmon.ObserveErrorCtx(ctx, err) errmon.ObserveErrorCtx(ctx, err)
@@ -256,7 +256,7 @@ func getToken(ctx context.Context, cfg Config) (*oauth2.Token, error) {
wg.Add(1) wg.Add(1)
{ {
oauthCfg := oauthCfg oauthCfg := oauthCfg
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
defer wg.Done() defer wg.Done()
oauthHandlerArg := oauthhandler.OAuthHandlerArgument{ oauthHandlerArg := oauthhandler.OAuthHandlerArgument{
AuthURL: oauthCfg.AuthCodeURL("state-token", oauth2.AccessTypeOffline), AuthURL: oauthCfg.AuthCodeURL("state-token", oauth2.AccessTypeOffline),
@@ -295,7 +295,7 @@ func getToken(ctx context.Context, cfg Config) (*oauth2.Token, error) {
} }
wg.Add(1) wg.Add(1)
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
defer wg.Done() defer wg.Done()
t := time.NewTicker(time.Second) t := time.NewTicker(time.Second)
defer t.Stop() defer t.Stop()
@@ -317,7 +317,7 @@ func getToken(ctx context.Context, cfg Config) (*oauth2.Token, error) {
} }
}) })
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
wg.Wait() wg.Wait()
close(errCh) close(errCh)
}) })
@@ -1039,7 +1039,7 @@ func (yt *YouTube) startChatListener(
return fmt.Errorf("unable to initialize the chat listener instance: %w", err) return fmt.Errorf("unable to initialize the chat listener instance: %w", err)
} }
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
err := yt.processChatListener(ctx, chatListener) err := yt.processChatListener(ctx, chatListener)
if err != nil && !errors.Is(err, context.Canceled) { if err != nil && !errors.Is(err, context.Canceled) {
logger.Errorf(ctx, "unable to process the chat listener for '%s': %v", videoID, err) logger.Errorf(ctx, "unable to process the chat listener for '%s': %v", videoID, err)
@@ -1392,7 +1392,7 @@ func (yt *YouTube) GetChatMessagesChan(
defer logger.Debugf(ctx, "/GetChatMessagesChan") defer logger.Debugf(ctx, "/GetChatMessagesChan")
outCh := make(chan streamcontrol.ChatMessage) outCh := make(chan streamcontrol.ChatMessage)
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
defer func() { defer func() {
logger.Debugf(ctx, "closing the messages channel") logger.Debugf(ctx, "closing the messages channel")
close(outCh) close(outCh)

View File

@@ -32,7 +32,7 @@ func (d *StreamD) startListeningForChatMessages(
if err != nil { if err != nil {
return fmt.Errorf("unable to get the channel for chat messages of '%s': %w", platName, err) return fmt.Errorf("unable to get the channel for chat messages of '%s': %w", platName, err)
} }
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
defer logger.Debugf(ctx, "/startListeningForChatMessages(ctx, '%s')", platName) defer logger.Debugf(ctx, "/startListeningForChatMessages(ctx, '%s')", platName)
for { for {
select { select {

View File

@@ -421,7 +421,7 @@ func unwrapStreamDChan[E any, R any, S receiver[R]](
} }
r := make(chan E) r := make(chan E)
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
defer closer.Close() defer closer.Close()
defer cancelFn() defer cancelFn()
for { for {

View File

@@ -47,7 +47,7 @@ func (d *StreamD) submitEvent(
exprCtx := objToMap(ev) exprCtx := objToMap(ev)
for _, rule := range d.Config.TriggerRules { for _, rule := range d.Config.TriggerRules {
if rule.EventQuery.Match(ev) { if rule.EventQuery.Match(ev) {
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
err := d.doAction(ctx, rule.Action, exprCtx) err := d.doAction(ctx, rule.Action, exprCtx)
if err != nil { if err != nil {
logger.Errorf(ctx, "unable to perform action %s: %v", rule.Action, err) logger.Errorf(ctx, "unable to perform action %s: %v", rule.Action, err)
@@ -131,13 +131,13 @@ func eventSubToChan[T any](
} }
if onReady != nil { if onReady != nil {
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
defer mutex.Unlock() defer mutex.Unlock()
onReady(ctx, r) onReady(ctx, r)
}) })
} }
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
<-ctx.Done() <-ctx.Done()
mutex.Lock() mutex.Lock()

View File

@@ -61,7 +61,7 @@ func (d *StreamD) getImageBytes(
} }
func (d *StreamD) initImageTaker(ctx context.Context) error { func (d *StreamD) initImageTaker(ctx context.Context) error {
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
defer logger.Debugf(ctx, "/imageTaker") defer logger.Debugf(ctx, "/imageTaker")
ch, err := d.SubscribeToDashboardChanges(ctx) ch, err := d.SubscribeToDashboardChanges(ctx)
if err != nil { if err != nil {
@@ -106,7 +106,7 @@ func (d *StreamD) restartImageTakerNoLock(ctx context.Context) error {
elName, el := elName, el elName, el := elName, el
_ = el _ = el
d.imageTakerWG.Add(1) d.imageTakerWG.Add(1)
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
defer d.imageTakerWG.Done() defer d.imageTakerWG.Done()
logger.Debugf(ctx, "taker of image '%s'", elName) logger.Debugf(ctx, "taker of image '%s'", elName)
defer logger.Debugf(ctx, "/taker of image '%s'", elName) defer logger.Debugf(ctx, "/taker of image '%s'", elName)

View File

@@ -73,7 +73,7 @@ func (r *obsRestarter) updateConfigNoLock(
ctx, cancelFn := context.WithCancel(ctx) ctx, cancelFn := context.WithCancel(ctx)
r.cancelFunc = cancelFn r.cancelFunc = cancelFn
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
d.obsRestarter.loop(ctx, execCmd) d.obsRestarter.loop(ctx, execCmd)
}) })
return nil return nil

View File

@@ -236,7 +236,7 @@ func (d *StreamD) initChatMessagesStorage(ctx context.Context) (_err error) {
logger.Debugf(ctx, "initChatMessagesStorage") logger.Debugf(ctx, "initChatMessagesStorage")
defer logger.Debugf(ctx, "/initChatMessagesStorage: %v", _err) defer logger.Debugf(ctx, "/initChatMessagesStorage: %v", _err)
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
logger.Debugf(ctx, "initChatMessagesStorage-refresherLoop") logger.Debugf(ctx, "initChatMessagesStorage-refresherLoop")
defer logger.Debugf(ctx, "/initChatMessagesStorage-refresherLoop") defer logger.Debugf(ctx, "/initChatMessagesStorage-refresherLoop")
t := time.NewTicker(5 * time.Second) t := time.NewTicker(5 * time.Second)
@@ -271,7 +271,7 @@ func (d *StreamD) secretsProviderUpdater(ctx context.Context) (_err error) {
}) })
logger.Debugf(ctx, "updated the secrets") logger.Debugf(ctx, "updated the secrets")
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@@ -392,7 +392,7 @@ func (d *StreamD) InitCache(ctx context.Context) error {
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(1) wg.Add(1)
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
defer wg.Done() defer wg.Done()
_changedCache := d.initTwitchData(ctx) _changedCache := d.initTwitchData(ctx)
d.normalizeTwitchData() d.normalizeTwitchData()
@@ -402,7 +402,7 @@ func (d *StreamD) InitCache(ctx context.Context) error {
}) })
wg.Add(1) wg.Add(1)
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
defer wg.Done() defer wg.Done()
_changedCache := d.initKickData(ctx) _changedCache := d.initKickData(ctx)
d.normalizeKickData() d.normalizeKickData()
@@ -412,7 +412,7 @@ func (d *StreamD) InitCache(ctx context.Context) error {
}) })
wg.Add(1) wg.Add(1)
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
defer wg.Done() defer wg.Done()
_changedCache := d.initYoutubeData(ctx) _changedCache := d.initYoutubeData(ctx)
d.normalizeYoutubeData() d.normalizeYoutubeData()
@@ -593,12 +593,12 @@ func (d *StreamD) onUpdateConfig(
errCh := make(chan error, 1) errCh := make(chan error, 1)
wg.Add(1) wg.Add(1)
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
defer wg.Done() defer wg.Done()
errCh <- d.updateOBSRestarterConfig(ctx) errCh <- d.updateOBSRestarterConfig(ctx)
}) })
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
wg.Wait() wg.Wait()
close(errCh) close(errCh)
}) })
@@ -655,7 +655,7 @@ func (d *StreamD) StartStream(
defer func() { defer func() {
d.StreamStatusCache.InvalidateCache(ctx) d.StreamStatusCache.InvalidateCache(ctx)
if platID == youtube.ID { if platID == youtube.ID {
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
now := time.Now() now := time.Now()
time.Sleep(10 * time.Second) time.Sleep(10 * time.Second)
for time.Since(now) < 5*time.Minute { for time.Since(now) < 5*time.Minute {
@@ -1639,7 +1639,7 @@ func (d *StreamD) WaitForStreamPublisher(
} }
ch := make(chan struct{}) ch := make(chan struct{})
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
select { select {
case <-pubCh: case <-pubCh:
close(ch) close(ch)

View File

@@ -62,7 +62,7 @@ func (t *Timer) start(ctx context.Context) {
runningTimer := time.NewTimer(time.Until(t.TriggerAt)) runningTimer := time.NewTimer(time.Until(t.TriggerAt))
t.RunningTimer = runningTimer t.RunningTimer = runningTimer
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
select { select {
case <-ctx.Done(): case <-ctx.Done():
t.stop(ctx) t.stop(ctx)
@@ -101,7 +101,7 @@ func (t *Timer) trigger(ctx context.Context) {
logger.Debugf(ctx, "trigger (%T)", t.Timer.Action) logger.Debugf(ctx, "trigger (%T)", t.Timer.Action)
defer logger.Debugf(ctx, "/trigger (%T)", t.Timer.Action) defer logger.Debugf(ctx, "/trigger (%T)", t.Timer.Action)
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
err := t.StreamD.RemoveTimer(ctx, t.Timer.ID) err := t.StreamD.RemoveTimer(ctx, t.Timer.ID)
if err != nil { if err != nil {
logger.Error(ctx, "unable to remove timer %d: %v", t.Timer.ID, err) logger.Error(ctx, "unable to remove timer %d: %v", t.Timer.ID, err)

View File

@@ -50,7 +50,7 @@ func (p *Panel) addChatUI(ctx context.Context, ui chatUIInterface) {
p.chatUIs = append(p.chatUIs, ui) p.chatUIs = append(p.chatUIs, ui)
logger.Debugf(ctx, "len(p.chatUI) == %d", len(p.chatUIs)) logger.Debugf(ctx, "len(p.chatUI) == %d", len(p.chatUIs))
}) })
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
<-ctx.Done() <-ctx.Done()
p.chatUIsLocker.Do(ctx, func() { p.chatUIsLocker.Do(ctx, func() {
p.chatUIs = slices.DeleteFunc(p.chatUIs, func(cmp chatUIInterface) bool { p.chatUIs = slices.DeleteFunc(p.chatUIs, func(cmp chatUIInterface) bool {
@@ -72,7 +72,7 @@ func (p *Panel) initChatMessagesHandler(ctx context.Context) error {
return fmt.Errorf("unable to subscribe to chat messages: %w", err) return fmt.Errorf("unable to subscribe to chat messages: %w", err)
} }
observability.GoSafe(ctx, func() { observability.GoSafeRestartable(ctx, func(ctx context.Context) {
p.messageReceiverLoop(ctx, msgCh) p.messageReceiverLoop(ctx, msgCh)
}) })
return nil return nil
@@ -134,7 +134,7 @@ func (p *Panel) onReceiveMessage(
if !notificationsEnabled { if !notificationsEnabled {
return return
} }
observability.GoSafe(ctx, func() { observability.GoSafe(ctx, func(ctx context.Context) {
commandTemplate := xsync.DoR1(ctx, &p.configLocker, func() string { commandTemplate := xsync.DoR1(ctx, &p.configLocker, func() string {
return p.Config.Chat.CommandOnReceiveMessage return p.Config.Chat.CommandOnReceiveMessage
}) })
@@ -146,7 +146,7 @@ func (p *Panel) onReceiveMessage(
p.execCommand(ctx, commandTemplate, msg) p.execCommand(ctx, commandTemplate, msg)
}) })
observability.GoSafe(ctx, func() { observability.GoSafe(ctx, func(ctx context.Context) {
logger.Debugf(ctx, "SendNotification") logger.Debugf(ctx, "SendNotification")
defer logger.Debugf(ctx, "/SendNotification") defer logger.Debugf(ctx, "/SendNotification")
p.app.SendNotification(&fyne.Notification{ p.app.SendNotification(&fyne.Notification{
@@ -154,7 +154,7 @@ func (p *Panel) onReceiveMessage(
Content: msg.Username + ": " + msg.Message, Content: msg.Username + ": " + msg.Message,
}) })
}) })
observability.GoSafe(ctx, func() { observability.GoSafe(ctx, func(ctx context.Context) {
soundEnabled := xsync.DoR1(ctx, &p.configLocker, func() bool { soundEnabled := xsync.DoR1(ctx, &p.configLocker, func() bool {
return p.Config.Chat.ReceiveMessageSoundAlarmEnabled() return p.Config.Chat.ReceiveMessageSoundAlarmEnabled()
}) })

View File

@@ -141,7 +141,7 @@ func (ui *chatUIAsList) Remove(
delete(ui.ItemsByMessageID, msg.MessageID) delete(ui.ItemsByMessageID, msg.MessageID)
delete(ui.ItemsByCanvasObject, item.Container) delete(ui.ItemsByCanvasObject, item.Container)
}) })
observability.Go(ctx, func() { ui.CanvasObject.Refresh() }) // TODO: remove the observability.Go observability.Go(ctx, func(context.Context) { ui.CanvasObject.Refresh() }) // TODO: remove the observability.Go
} }
func (ui *chatUIAsList) GetTotalHeight( func (ui *chatUIAsList) GetTotalHeight(

View File

@@ -101,7 +101,7 @@ func (ui *chatUIAsText) init(
nil, nil,
ui.Text, ui.Text,
) )
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
ui.Rebuild(ctx) ui.Rebuild(ctx)
}) })
ui.Panel.addChatUI(ctx, ui) ui.Panel.addChatUI(ctx, ui)
@@ -166,7 +166,7 @@ func (ui *chatUIAsText) Remove(
ui.ItemLocker.Do(ctx, func() { ui.ItemLocker.Do(ctx, func() {
delete(ui.ItemsByMessageID, msg.MessageID) delete(ui.ItemsByMessageID, msg.MessageID)
}) })
observability.Go(ctx, func() { ui.CanvasObject.Refresh() }) // TODO: remove the observability.Go observability.Go(ctx, func(context.Context) { ui.CanvasObject.Refresh() }) // TODO: remove the observability.Go
} }
func (ui *chatUIAsText) GetTotalHeight( func (ui *chatUIAsText) GetTotalHeight(

View File

@@ -189,7 +189,7 @@ func (w *dashboardWindow) renderStreamStatus(ctx context.Context) {
bwIn := float64(bytesInDiff) * 8 / tsDiff.Seconds() / 1000 bwIn := float64(bytesInDiff) * 8 / tsDiff.Seconds() / 1000
bwOut := float64(bytesOutDiff) * 8 / tsDiff.Seconds() / 1000 bwOut := float64(bytesOutDiff) * 8 / tsDiff.Seconds() / 1000
newAppStatusText := fmt.Sprintf("%4.0fKb/s | %4.0fKb/s", bwIn, bwOut) newAppStatusText := fmt.Sprintf("%4.0fKb/s | %4.0fKb/s", bwIn, bwOut)
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
w.appStatus.SetText(newAppStatusText) w.appStatus.SetText(newAppStatusText)
}) })
} }
@@ -201,7 +201,7 @@ func (w *dashboardWindow) renderStreamStatus(ctx context.Context) {
w.Panel.streamStatusLocker.Do(ctx, func() { w.Panel.streamStatusLocker.Do(ctx, func() {
w.streamStatusLocker.Do(ctx, func() { w.streamStatusLocker.Do(ctx, func() {
for platID, dst := range w.streamStatus { for platID, dst := range w.streamStatus {
observability.CallSafe(ctx, func() { observability.CallSafe(ctx, func(ctx context.Context) {
src := w.Panel.streamStatus[platID] src := w.Panel.streamStatus[platID]
if src == nil { if src == nil {
logger.Debugf(ctx, "status for '%s' is not set", platID) logger.Debugf(ctx, "status for '%s' is not set", platID)
@@ -210,7 +210,7 @@ func (w *dashboardWindow) renderStreamStatus(ctx context.Context) {
defer dst.Refresh() defer dst.Refresh()
if !src.BackendIsEnabled { if !src.BackendIsEnabled {
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
dst.SetText("disabled") dst.SetText("disabled")
}) })
return return
@@ -218,7 +218,7 @@ func (w *dashboardWindow) renderStreamStatus(ctx context.Context) {
if src.BackendError != nil { if src.BackendError != nil {
dst.Importance = widget.LowImportance dst.Importance = widget.LowImportance
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
dst.SetText("error") dst.SetText("error")
}) })
return return
@@ -226,7 +226,7 @@ func (w *dashboardWindow) renderStreamStatus(ctx context.Context) {
if !src.IsActive { if !src.IsActive {
dst.Importance = widget.DangerImportance dst.Importance = widget.DangerImportance
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
dst.SetText("stopped") dst.SetText("stopped")
}) })
return return
@@ -234,7 +234,7 @@ func (w *dashboardWindow) renderStreamStatus(ctx context.Context) {
dst.Importance = widget.SuccessImportance dst.Importance = widget.SuccessImportance
if src.StartedAt == nil { if src.StartedAt == nil {
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
dst.SetText("started") dst.SetText("started")
}) })
return return
@@ -247,7 +247,7 @@ func (w *dashboardWindow) renderStreamStatus(ctx context.Context) {
viewerCountString = fmt.Sprintf(" (%d)", *src.ViewersCount) viewerCountString = fmt.Sprintf(" (%d)", *src.ViewersCount)
} }
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
dst.SetText(fmt.Sprintf("%s%s", duration.Truncate(time.Second).String(), viewerCountString)) dst.SetText(fmt.Sprintf("%s%s", duration.Truncate(time.Second).String(), viewerCountString))
}) })
}) })
@@ -356,7 +356,7 @@ func (p *Panel) newDashboardWindow(
c.Move(pos) c.Move(pos)
} }
w.chat.ScrollToBottom(ctx) w.chat.ScrollToBottom(ctx)
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
time.Sleep(time.Second) time.Sleep(time.Second)
w.chat.ScrollToBottom(ctx) w.chat.ScrollToBottom(ctx)
}) })
@@ -758,7 +758,7 @@ func (w *dashboardWindow) startUpdatingNoLock(
ctx, cancelFunc := context.WithCancel(ctx) ctx, cancelFunc := context.WithCancel(ctx)
w.stopUpdatingFunc = cancelFunc w.stopUpdatingFunc = cancelFunc
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
t := time.NewTicker(time.Second) t := time.NewTicker(time.Second)
defer t.Stop() defer t.Stop()
oldSize := w.Window.Canvas().Size() oldSize := w.Window.Canvas().Size()
@@ -777,7 +777,7 @@ func (w *dashboardWindow) startUpdatingNoLock(
} }
}) })
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
t := time.NewTicker(time.Second) t := time.NewTicker(time.Second)
defer t.Stop() defer t.Stop()
oldSize := w.chat.ScrollingContainer.Content.MinSize() oldSize := w.chat.ScrollingContainer.Content.MinSize()
@@ -797,7 +797,7 @@ func (w *dashboardWindow) startUpdatingNoLock(
}) })
w.renderLocalStatus(ctx) w.renderLocalStatus(ctx)
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
t := time.NewTicker(2 * time.Second) t := time.NewTicker(2 * time.Second)
defer t.Stop() defer t.Stop()
for { for {
@@ -817,12 +817,12 @@ func (w *dashboardWindow) startUpdatingNoLock(
return return
} }
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
w.updateImages(ctx, cfg.Dashboard) w.updateImages(ctx, cfg.Dashboard)
w.updateStreamStatus(ctx) w.updateStreamStatus(ctx)
w.renderStreamStatus(ctx) w.renderStreamStatus(ctx)
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
t := time.NewTicker(250 * time.Millisecond) t := time.NewTicker(250 * time.Millisecond)
defer t.Stop() defer t.Stop()
for { for {
@@ -836,7 +836,7 @@ func (w *dashboardWindow) startUpdatingNoLock(
} }
}) })
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
t := time.NewTicker(2 * time.Second) t := time.NewTicker(2 * time.Second)
defer t.Stop() defer t.Stop()
for { for {
@@ -995,7 +995,7 @@ func (w *dashboardWindow) updateImagesNoLock(
continue continue
} }
wg.Add(1) wg.Add(1)
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
defer wg.Done() defer wg.Done()
img, changed, err := w.getImage(ctx, streamdconsts.ImageID(el.ElementName)) img, changed, err := w.getImage(ctx, streamdconsts.ImageID(el.ElementName))
if err != nil { if err != nil {
@@ -1025,7 +1025,7 @@ func (w *dashboardWindow) updateImagesNoLock(
} }
wg.Add(1) wg.Add(1)
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
defer wg.Done() defer wg.Done()
img, changed, err := w.getImage(ctx, consts.ImageScreenshot) img, changed, err := w.getImage(ctx, consts.ImageScreenshot)
if err != nil { if err != nil {
@@ -1111,7 +1111,7 @@ func (p *Panel) newDashboardSettingsWindow(ctx context.Context) {
p.dashboardLocker.Do(ctx, func() { p.dashboardLocker.Do(ctx, func() {
if p.dashboardWindow != nil { if p.dashboardWindow != nil {
p.dashboardWindow.Window.Close() p.dashboardWindow.Window.Close()
observability.Go(ctx, func() { p.focusDashboardWindow(ctx) }) observability.Go(ctx, func(ctx context.Context) { p.focusDashboardWindow(ctx) })
} }
}) })

View File

@@ -70,7 +70,7 @@ func (p *Panel) statusPanelSet(text string) {
newText = "status: " + text newText = "status: " + text
}) })
if panel != nil { if panel != nil {
observability.CallSafe(ctx, func() { observability.CallSafe(ctx, func(ctx context.Context) {
panel.SetText(newText) panel.SetText(newText)
}) })
} }

View File

@@ -30,7 +30,7 @@ func (p *Panel) initEventSensor(ctx context.Context) {
p.eventSensor = es p.eventSensor = es
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
logger.Debugf(ctx, "eventSensor") logger.Debugf(ctx, "eventSensor")
defer logger.Debugf(ctx, "/eventSensor") defer logger.Debugf(ctx, "/eventSensor")
es.Loop(ctx, p.StreamD) es.Loop(ctx, p.StreamD)

View File

@@ -15,7 +15,7 @@ func (p *Panel) initFyneHacks(ctx context.Context) {
} }
func (p *Panel) initWindowsHealthChecker(ctx context.Context) { func (p *Panel) initWindowsHealthChecker(ctx context.Context) {
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
logger.Debugf(ctx, "initWindowsHealthChecker") logger.Debugf(ctx, "initWindowsHealthChecker")
defer logger.Debugf(ctx, "/initWindowsHealthChecker") defer logger.Debugf(ctx, "/initWindowsHealthChecker")
@@ -30,7 +30,6 @@ func (p *Panel) initWindowsHealthChecker(ctx context.Context) {
p.checkAndFixWindowsHealth(ctx) p.checkAndFixWindowsHealth(ctx)
} }
} }
}) })
} }
@@ -81,7 +80,7 @@ func checkAndFixPermanentWindowHealth(
logger.Warnf(ctx, "window %v has a broken event queue, fixing") logger.Warnf(ctx, "window %v has a broken event queue, fixing")
window.InitEventQueue() window.InitEventQueue()
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
window.RunEventQueue() window.RunEventQueue()
}) })
} }

View File

@@ -312,7 +312,7 @@ func (p *Panel) reinitScreenshoter(ctx context.Context) {
ctx, cancelFunc := context.WithCancel(ctx) ctx, cancelFunc := context.WithCancel(ctx)
p.screenshoterClose = cancelFunc p.screenshoterClose = cancelFunc
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
p.Screenshoter.Loop( p.Screenshoter.Loop(
ctx, ctx,
200*time.Millisecond, 200*time.Millisecond,

View File

@@ -79,7 +79,7 @@ func (p *Panel) startMonitorPage(
p.monitorPageLocker.Do(ctx, func() { p.monitorPageLocker.Do(ctx, func() {
if p.monitorPage == nil { if p.monitorPage == nil {
observability.Go(ctx, func() { // TODO: get rid of this ugliness observability.Go(ctx, func(ctx context.Context) { // TODO: get rid of this ugliness
t := time.NewTicker(100 * time.Millisecond) t := time.NewTicker(100 * time.Millisecond)
defer t.Stop() defer t.Stop()
for { for {
@@ -220,7 +220,7 @@ func (p *monitorPage) startUpdatingNoLock(
return return
} }
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
updateData := func() { updateData := func() {
inStreams, err := streamD.ListIncomingStreams(ctx) inStreams, err := streamD.ListIncomingStreams(ctx)
if err != nil { if err != nil {
@@ -534,7 +534,7 @@ func (w streamDAsStreamPlayersServerType) WaitPublisherChan(
} }
result := make(chan streamplayer.Publisher) result := make(chan streamplayer.Publisher)
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
defer close(result) defer close(result)
select { select {
case <-ctx.Done(): case <-ctx.Done():

View File

@@ -282,7 +282,7 @@ func (p *Panel) Loop(ctx context.Context, opts ...LoopOption) (_err error) {
closeLoadingWindow := func() { closeLoadingWindow := func() {
logger.Tracef(ctx, "closing the loading window") logger.Tracef(ctx, "closing the loading window")
loadingWindow.Hide() loadingWindow.Hide()
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
loadingWindow.Hide() loadingWindow.Hide()
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
@@ -292,7 +292,7 @@ func (p *Panel) Loop(ctx context.Context, opts ...LoopOption) (_err error) {
}) })
} }
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
if streamD, ok := p.StreamD.(*client.Client); ok { if streamD, ok := p.StreamD.(*client.Client); ok {
p.setStatusFunc("Connecting...") p.setStatusFunc("Connecting...")
err := p.startOAuthListenerForRemoteStreamD(ctx, streamD) err := p.startOAuthListenerForRemoteStreamD(ctx, streamD)
@@ -314,7 +314,7 @@ func (p *Panel) Loop(ctx context.Context, opts ...LoopOption) (_err error) {
streamD := p.StreamD.(*streamd.StreamD) streamD := p.StreamD.(*streamd.StreamD)
streamD.AddOAuthListenPort(cfg.OAuth.ListenPorts.Twitch) streamD.AddOAuthListenPort(cfg.OAuth.ListenPorts.Twitch)
streamD.AddOAuthListenPort(cfg.OAuth.ListenPorts.Kick) streamD.AddOAuthListenPort(cfg.OAuth.ListenPorts.Kick)
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
<-ctx.Done() <-ctx.Done()
streamD.RemoveOAuthListenPort(cfg.OAuth.ListenPorts.Twitch) streamD.RemoveOAuthListenPort(cfg.OAuth.ListenPorts.Twitch)
streamD.RemoveOAuthListenPort(cfg.OAuth.ListenPorts.Kick) streamD.RemoveOAuthListenPort(cfg.OAuth.ListenPorts.Kick)
@@ -355,7 +355,7 @@ func (p *Panel) Loop(ctx context.Context, opts ...LoopOption) (_err error) {
} }
if initCfg.AutoUpdater != nil { if initCfg.AutoUpdater != nil {
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
p.checkForUpdates(ctx, initCfg.AutoUpdater) p.checkForUpdates(ctx, initCfg.AutoUpdater)
}) })
} }
@@ -532,7 +532,7 @@ func (p *Panel) startOAuthListenerForRemoteStreamD(
} }
logger.Debugf(ctx, "started oauth listener for the remote streamd") logger.Debugf(ctx, "started oauth listener for the remote streamd")
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
logger.Debugf(ctx, "oauthListenerForRemoteStreamD") logger.Debugf(ctx, "oauthListenerForRemoteStreamD")
defer logger.Debugf(ctx, "/oauthListenerForRemoteStreamD") defer logger.Debugf(ctx, "/oauthListenerForRemoteStreamD")
defer cancelFn() defer cancelFn()
@@ -1153,7 +1153,7 @@ func (p *Panel) getUpdatedStatus_backends_noLock(ctx context.Context) {
} }
if backendEnabled[obs.ID] { if backendEnabled[obs.ID] {
observability.Call(ctx, func() { observability.Call(ctx, func(ctx context.Context) {
obsServer, obsServerClose, err := p.StreamD.OBS(ctx) obsServer, obsServerClose, err := p.StreamD.OBS(ctx)
if obsServerClose != nil { if obsServerClose != nil {
defer obsServerClose() defer obsServerClose()
@@ -1685,7 +1685,7 @@ func (p *Panel) initMainWindow(
p.dashboardShowHideButton = widget.NewButtonWithIcon("Open", theme.ComputerIcon(), func() { p.dashboardShowHideButton = widget.NewButtonWithIcon("Open", theme.ComputerIcon(), func() {
p.dashboardLocker.Do(ctx, func() { p.dashboardLocker.Do(ctx, func() {
if p.dashboardWindow == nil { if p.dashboardWindow == nil {
observability.Go(ctx, func() { p.focusDashboardWindow(ctx) }) observability.Go(ctx, func(ctx context.Context) { p.focusDashboardWindow(ctx) })
} else { } else {
p.dashboardWindow.Window.Close() p.dashboardWindow.Window.Close()
} }
@@ -1879,7 +1879,7 @@ func (p *Panel) subscribeUpdateControlPage(ctx context.Context) {
p.getUpdatedStatus(ctx) p.getUpdatedStatus(ctx)
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
t := time.NewTicker(time.Second * 5) t := time.NewTicker(time.Second * 5)
defer t.Stop() defer t.Stop()
for { for {
@@ -1918,7 +1918,7 @@ func (p *Panel) execCommand(
var stdout, stderr bytes.Buffer var stdout, stderr bytes.Buffer
cmd.Stdout = &stdout cmd.Stdout = &stdout
cmd.Stderr = &stderr cmd.Stderr = &stderr
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
err := cmd.Run() err := cmd.Run()
if err == nil { if err == nil {
err = child_process_manager.AddChildProcess(cmd.Process) err = child_process_manager.AddChildProcess(cmd.Process)
@@ -2050,7 +2050,7 @@ func (p *Panel) setupStreamNoLock(ctx context.Context) {
// in the browser, then the stream does not want to start. // in the browser, then the stream does not want to start.
// //
// And here we wait until the hack with opening the page will complete. // And here we wait until the hack with opening the page will complete.
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
waitFor := 15 * time.Second waitFor := 15 * time.Second
deadline := time.Now().Add(waitFor) deadline := time.Now().Add(waitFor)
@@ -2077,7 +2077,7 @@ func (p *Panel) setupStreamNoLock(ctx context.Context) {
func (p *Panel) startStream(ctx context.Context) { func (p *Panel) startStream(ctx context.Context) {
p.streamMutex.ManualLock(ctx) p.streamMutex.ManualLock(ctx)
defer func() { defer func() {
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
time.Sleep(10 * time.Second) // TODO: remove this time.Sleep(10 * time.Second) // TODO: remove this
p.streamMutex.ManualUnlock(ctx) p.streamMutex.ManualUnlock(ctx)
}) })
@@ -2133,11 +2133,11 @@ func (p *Panel) afterStreamStart(ctx context.Context) {
p.execCommand(ctx, onStreamStart, nil) p.execCommand(ctx, onStreamStart, nil)
} }
observability.Go(ctx, func() { p.openStreamStartedWindow(ctx) }) observability.Go(ctx, func(ctx context.Context) { p.openStreamStartedWindow(ctx) })
} }
func (p *Panel) stopStream(ctx context.Context) { func (p *Panel) stopStream(ctx context.Context) {
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
p.streamStartedLocker.Do(ctx, func() { p.streamStartedLocker.Do(ctx, func() {
if p.streamStartedWindow != nil { if p.streamStartedWindow != nil {
p.streamStartedWindow.Close() p.streamStartedWindow.Close()
@@ -2297,7 +2297,7 @@ const aggregationDelayBeforeNotificationEnd = 100 * time.Millisecond
func (p *Panel) showWaitStreamDCallWindow(ctx context.Context) { func (p *Panel) showWaitStreamDCallWindow(ctx context.Context) {
atomic.AddInt32(&p.waitStreamDCallWindowCounter, 1) atomic.AddInt32(&p.waitStreamDCallWindowCounter, 1)
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
defer func() { defer func() {
<-ctx.Done() <-ctx.Done()
p.waitStreamDCallWindowLocker.Do(ctx, func() { p.waitStreamDCallWindowLocker.Do(ctx, func() {
@@ -2326,7 +2326,7 @@ func (p *Panel) showWaitStreamDCallWindow(ctx context.Context) {
func (p *Panel) showWaitStreamDConnectWindow(ctx context.Context) { func (p *Panel) showWaitStreamDConnectWindow(ctx context.Context) {
atomic.AddInt32(&p.waitStreamDConnectWindowCounter, 1) atomic.AddInt32(&p.waitStreamDConnectWindowCounter, 1)
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
defer func() { defer func() {
<-ctx.Done() <-ctx.Done()
p.waitStreamDConnectWindowLocker.Do(ctx, func() { p.waitStreamDConnectWindowLocker.Do(ctx, func() {
@@ -2361,7 +2361,9 @@ func (p *Panel) Close() (_err error) {
err = multierror.Append(err, p.eventSensor.Close()) err = multierror.Append(err, p.eventSensor.Close())
// TODO: remove observability.Go, Quit should be executed synchronously, // TODO: remove observability.Go, Quit should be executed synchronously,
// but there is a bug in fyne and it hangs // but there is a bug in fyne and it hangs
observability.Go(context.TODO(), p.app.Quit) observability.Go(context.TODO(), func(ctx context.Context) {
p.app.Quit()
})
return err.ErrorOrNil() return err.ErrorOrNil()
} }
@@ -2424,7 +2426,7 @@ func (p *Panel) localConfigCacheUpdater(ctx context.Context) (_err error) {
p.configCache = newCfg p.configCache = newCfg
}) })
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
logger.Debugf(ctx, "localConfigUpdaterLoop") logger.Debugf(ctx, "localConfigUpdaterLoop")
defer logger.Debugf(ctx, "/localConfigUpdaterLoop") defer logger.Debugf(ctx, "/localConfigUpdaterLoop")

View File

@@ -307,7 +307,7 @@ func (p *Panel) profileWindow(
for _, cat := range dataTwitch.Cache.Categories { for _, cat := range dataTwitch.Cache.Categories {
if cleanTwitchCategoryName(cat.Name) == text { if cleanTwitchCategoryName(cat.Name) == text {
setSelectedTwitchCategory(cat.Name) setSelectedTwitchCategory(cat.Name)
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
twitchCategory.SetText("") twitchCategory.SetText("")
}) })
@@ -409,7 +409,7 @@ func (p *Panel) profileWindow(
text = cleanKickCategoryName(text) text = cleanKickCategoryName(text)
cat := catN[text] cat := catN[text]
setSelectedKickCategory(cat.ID) setSelectedKickCategory(cat.ID)
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
kickCategory.SetText("") kickCategory.SetText("")
}) })
@@ -516,7 +516,7 @@ func (p *Panel) profileWindow(
for _, bc := range dataYouTube.Cache.Broadcasts { for _, bc := range dataYouTube.Cache.Broadcasts {
if cleanYoutubeRecordingName(bc.Snippet.Title) == text { if cleanYoutubeRecordingName(bc.Snippet.Title) == text {
setSelectedYoutubeBroadcast(bc) setSelectedYoutubeBroadcast(bc)
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
youtubeTemplate.SetText("") youtubeTemplate.SetText("")
}) })

View File

@@ -48,7 +48,7 @@ func (p *Panel) initRestreamPage(
logger.Debugf(ctx, "initRestreamPage") logger.Debugf(ctx, "initRestreamPage")
defer logger.Debugf(ctx, "/initRestreamPage") defer logger.Debugf(ctx, "/initRestreamPage")
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
updateData := func() { updateData := func() {
inStreams, err := p.StreamD.ListIncomingStreams(ctx) inStreams, err := p.StreamD.ListIncomingStreams(ctx)
if err != nil { if err != nil {
@@ -70,7 +70,7 @@ func (p *Panel) initRestreamPage(
} }
}) })
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
updateData := func() { updateData := func() {
streamServers, err := p.StreamD.ListStreamServers(ctx) streamServers, err := p.StreamD.ListStreamServers(ctx)
if err != nil { if err != nil {
@@ -92,7 +92,7 @@ func (p *Panel) initRestreamPage(
} }
}) })
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
updateData := func() { updateData := func() {
dsts, err := p.StreamD.ListStreamDestinations(ctx) dsts, err := p.StreamD.ListStreamDestinations(ctx)
if err != nil { if err != nil {
@@ -114,7 +114,7 @@ func (p *Panel) initRestreamPage(
} }
}) })
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
updateData := func() { updateData := func() {
streamFwds, err := p.StreamD.ListStreamForwards(ctx) streamFwds, err := p.StreamD.ListStreamForwards(ctx)
if err != nil { if err != nil {
@@ -136,7 +136,7 @@ func (p *Panel) initRestreamPage(
} }
}) })
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
updateData := func() { updateData := func() {
streamPlayers, err := p.StreamD.ListStreamPlayers(ctx) streamPlayers, err := p.StreamD.ListStreamPlayers(ctx)
if err != nil { if err != nil {
@@ -1548,11 +1548,11 @@ func (p *Panel) streamServersUpdater(
ctx context.Context, ctx context.Context,
) context.CancelFunc { ) context.CancelFunc {
ctx, cancelFn := context.WithCancel(ctx) ctx, cancelFn := context.WithCancel(ctx)
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
logger.Debugf(ctx, "streamServersUpdater: updater") logger.Debugf(ctx, "streamServersUpdater: updater")
defer logger.Debugf(ctx, "streamServersUpdater: /updater") defer logger.Debugf(ctx, "streamServersUpdater: /updater")
updateData := func() { updateData := func(ctx context.Context) {
streamServers, err := p.StreamD.ListStreamServers(ctx) streamServers, err := p.StreamD.ListStreamServers(ctx)
if err != nil { if err != nil {
p.DisplayError(err) p.DisplayError(err)
@@ -1579,7 +1579,7 @@ func (p *Panel) streamServersUpdater(
return return
case <-t.C: case <-t.C:
} }
updateData() updateData(ctx)
} }
}) })
return cancelFn return cancelFn
@@ -1589,11 +1589,11 @@ func (p *Panel) startStreamPlayersUpdater(
ctx context.Context, ctx context.Context,
) context.CancelFunc { ) context.CancelFunc {
ctx, cancelFn := context.WithCancel(ctx) ctx, cancelFn := context.WithCancel(ctx)
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
logger.Debugf(ctx, "startStreamPlayersUpdater: updater") logger.Debugf(ctx, "startStreamPlayersUpdater: updater")
defer logger.Debugf(ctx, "startStreamPlayersUpdater: /updater") defer logger.Debugf(ctx, "startStreamPlayersUpdater: /updater")
updateData := func() { updateData := func(ctx context.Context) {
streamPlayers, err := p.StreamD.ListStreamPlayers(ctx) streamPlayers, err := p.StreamD.ListStreamPlayers(ctx)
if err != nil { if err != nil {
p.DisplayError(err) p.DisplayError(err)
@@ -1620,7 +1620,7 @@ func (p *Panel) startStreamPlayersUpdater(
return return
case <-t.C: case <-t.C:
} }
updateData() updateData(ctx)
} }
}) })
return cancelFn return cancelFn
@@ -1630,11 +1630,11 @@ func (p *Panel) startStreamForwardersUpdater(
ctx context.Context, ctx context.Context,
) context.CancelFunc { ) context.CancelFunc {
ctx, cancelFn := context.WithCancel(ctx) ctx, cancelFn := context.WithCancel(ctx)
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
logger.Debugf(ctx, "startStreamForwardersUpdater: updater") logger.Debugf(ctx, "startStreamForwardersUpdater: updater")
defer logger.Debugf(ctx, "startStreamForwardersUpdater: /updater") defer logger.Debugf(ctx, "startStreamForwardersUpdater: /updater")
updateData := func() { updateData := func(ctx context.Context) {
streamFwds, err := p.StreamD.ListStreamForwards(ctx) streamFwds, err := p.StreamD.ListStreamForwards(ctx)
if err != nil { if err != nil {
p.DisplayError(err) p.DisplayError(err)
@@ -1661,7 +1661,7 @@ func (p *Panel) startStreamForwardersUpdater(
return return
case <-t.C: case <-t.C:
} }
updateData() updateData(ctx)
} }
}) })
return cancelFn return cancelFn

View File

@@ -135,7 +135,7 @@ func (w *streamStartedWindow) updateStreamStatusLoop(
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(1) wg.Add(1)
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
defer wg.Done() defer wg.Done()
t := time.NewTicker(time.Second) t := time.NewTicker(time.Second)
defer t.Stop() defer t.Stop()
@@ -150,7 +150,7 @@ func (w *streamStartedWindow) updateStreamStatusLoop(
}) })
wg.Add(1) wg.Add(1)
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
defer wg.Done() defer wg.Done()
t := time.NewTicker(time.Second) t := time.NewTicker(time.Second)
defer t.Stop() defer t.Stop()
@@ -198,7 +198,7 @@ func (w *streamStartedWindow) open(
} }
ctx, cancelFn := context.WithCancel(ctx) ctx, cancelFn := context.WithCancel(ctx)
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
w.updateStreamStatusLoop(ctx) w.updateStreamStatusLoop(ctx)
}) })

View File

@@ -37,7 +37,7 @@ func (p *Panel) updateStreamStatus(
kick.ID, kick.ID,
} { } {
wg.Add(1) wg.Add(1)
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
defer wg.Done() defer wg.Done()
ok, err := p.StreamD.IsBackendEnabled(ctx, platID) ok, err := p.StreamD.IsBackendEnabled(ctx, platID)

View File

@@ -145,7 +145,7 @@ func initGRPCServers(
} }
obsGRPC, obsGRPCClose, err := streamD.OBS(ctx) obsGRPC, obsGRPCClose, err := streamD.OBS(ctx)
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
<-ctx.Done() <-ctx.Done()
if obsGRPCClose != nil { if obsGRPCClose != nil {
obsGRPCClose() obsGRPCClose()

View File

@@ -89,7 +89,7 @@ func (ui *timersUI) StartRefreshingFromRemote(
ui.refresherCancelFunc = cancelFn ui.refresherCancelFunc = cancelFn
ui.refreshFromRemote(ctx) ui.refreshFromRemote(ctx)
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
t := time.NewTicker(time.Second * 5) t := time.NewTicker(time.Second * 5)
defer t.Stop() defer t.Stop()
for { for {
@@ -310,7 +310,7 @@ func (ui *timersUI) kickOff(
ctx, ui.timerCancelFunc = context.WithCancel(ctx) ctx, ui.timerCancelFunc = context.WithCancel(ctx)
ui.timer = time.NewTimer(time.Until(deadline)) ui.timer = time.NewTimer(time.Until(deadline))
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
defer func() { defer func() {
ui.doStop(xcontext.DetachDone(ctx)) ui.doStop(xcontext.DetachDone(ctx))
}() }()

View File

@@ -74,7 +74,7 @@ func (ui *triggerRulesUI) openSetupWindow(ctx context.Context) {
if err != nil { if err != nil {
return err return err
} }
observability.Go(ctx, func() { refreshContent() }) observability.Go(ctx, func(ctx context.Context) { refreshContent() })
return nil return nil
}, },
) )
@@ -92,7 +92,7 @@ func (ui *triggerRulesUI) openSetupWindow(ctx context.Context) {
ui.panel.DisplayError(err) ui.panel.DisplayError(err)
return return
} }
observability.Go(ctx, func() { refreshContent() }) observability.Go(ctx, func(ctx context.Context) { refreshContent() })
}, },
ui.panel.mainWindow, ui.panel.mainWindow,
) )
@@ -117,7 +117,7 @@ func (ui *triggerRulesUI) openSetupWindow(ctx context.Context) {
if err != nil { if err != nil {
return err return err
} }
observability.Go(ctx, func() { refreshContent() }) observability.Go(ctx, func(ctx context.Context) { refreshContent() })
return nil return nil
}, },
) )

View File

@@ -9,7 +9,6 @@ import (
) )
type updateTimerHandler struct { type updateTimerHandler struct {
ctx context.Context
cancelFn context.CancelFunc cancelFn context.CancelFunc
startStopButton *widget.Button startStopButton *widget.Button
startTS time.Time startTS time.Time
@@ -21,7 +20,6 @@ func newUpdateTimerHandler(
) *updateTimerHandler { ) *updateTimerHandler {
ctx, cancelFn := context.WithCancel(context.Background()) ctx, cancelFn := context.WithCancel(context.Background())
h := &updateTimerHandler{ h := &updateTimerHandler{
ctx: ctx,
cancelFn: cancelFn, cancelFn: cancelFn,
startStopButton: startStopButton, startStopButton: startStopButton,
startTS: startedAt, startTS: startedAt,
@@ -39,12 +37,12 @@ func (h *updateTimerHandler) GetStartTS() time.Time {
return h.startTS return h.startTS
} }
func (h *updateTimerHandler) loop() { func (h *updateTimerHandler) loop(ctx context.Context) {
t := time.NewTicker(time.Second) t := time.NewTicker(time.Second)
defer t.Stop() defer t.Stop()
for { for {
select { select {
case <-h.ctx.Done(): case <-ctx.Done():
return return
case <-t.C: case <-t.C:
timePassed := time.Since(h.startTS).Truncate(time.Second) timePassed := time.Since(h.startTS).Truncate(time.Second)

View File

@@ -66,7 +66,7 @@ func (w *HintWidget) mouseIn(ev *desktop.MouseEvent) {
panic("should not have happened") panic("should not have happened")
} }
w.RecheckerCancelFn = cancelFn w.RecheckerCancelFn = cancelFn
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():

View File

@@ -240,7 +240,7 @@ func (p *StreamPlayerHandler) startU(ctx context.Context) error {
p.Player = player p.Player = player
logger.Debugf(ctx, "initialized player %#+v", player) logger.Debugf(ctx, "initialized player %#+v", player)
observability.Go(ctx, func() { p.controllerLoop(ctx, cancelFn) }) observability.Go(ctx, func(ctx context.Context) { p.controllerLoop(ctx, cancelFn) })
return nil return nil
} }
@@ -351,7 +351,7 @@ func (p *StreamPlayerHandler) startObserver(
url *url.URL, url *url.URL,
restartFn context.CancelFunc, restartFn context.CancelFunc,
) { ) {
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
defer restartFn() defer restartFn()
logger.Debugf(ctx, "observer started") logger.Debugf(ctx, "observer started")
defer func() { logger.Debugf(ctx, "observer ended") }() defer func() { logger.Debugf(ctx, "observer ended") }()
@@ -468,7 +468,7 @@ func (p *StreamPlayerHandler) openStream(
ctx, cancelFn := context.WithTimeout(ctx, openURLTimeout) ctx, cancelFn := context.WithTimeout(ctx, openURLTimeout)
defer cancelFn() defer cancelFn()
var once sync.Once var once sync.Once
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
<-ctx.Done() <-ctx.Done()
once.Do(func() { once.Do(func() {
logger.Errorf(ctx, "timed out, unable to open the URL '%s' within the timeout of %s", u, openURLTimeout) logger.Errorf(ctx, "timed out, unable to open the URL '%s' within the timeout of %s", u, openURLTimeout)
@@ -535,7 +535,7 @@ func (p *StreamPlayerHandler) controllerLoop(
return return
} }
isClosed = true isClosed = true
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
p.PlayerLocker.Do(ctx, func() { p.PlayerLocker.Do(ctx, func() {
err := p.restartU(ctx) err := p.restartU(ctx)
errmon.ObserveErrorCtx(ctx, err) errmon.ObserveErrorCtx(ctx, err)
@@ -646,7 +646,7 @@ func (p *StreamPlayerHandler) controllerLoop(
if !triedToFixEmptyLinkViaReopen { if !triedToFixEmptyLinkViaReopen {
if link, err := player.GetLink(ctx); link == "" { if link, err := player.GetLink(ctx); link == "" {
logger.Debugf(ctx, "the link is empty for some reason, reopening the link (BTW, err if any is: %v)", err) logger.Debugf(ctx, "the link is empty for some reason, reopening the link (BTW, err if any is: %v)", err)
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
if err := p.openStream(ctx, restart); err != nil { if err := p.openStream(ctx, restart); err != nil {
logger.Errorf(ctx, "unable to open link '%s': %v", link, err) logger.Errorf(ctx, "unable to open link '%s': %v", link, err)
} }
@@ -704,7 +704,7 @@ func (p *StreamPlayerHandler) controllerLoop(
restart() restart()
return false return false
} }
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
if err := p.openStream(ctx, restart); err != nil { if err := p.openStream(ctx, restart); err != nil {
logger.Error(ctx, "unable to re-open the stream: %v", err) logger.Error(ctx, "unable to re-open the stream: %v", err)
restart() restart()
@@ -748,7 +748,7 @@ func (p *StreamPlayerHandler) controllerLoop(
cancelFn() cancelFn()
return return
} else { } else {
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
select { select {
case <-closeChan: case <-closeChan:
logger.Warnf(ctx, "the player is apparently closed, restarting it") logger.Warnf(ctx, "the player is apparently closed, restarting it")
@@ -773,7 +773,7 @@ func (p *StreamPlayerHandler) controllerLoop(
logger.Error(ctx, "unable to access the player for setting it up for streaming: %v", err) logger.Error(ctx, "unable to access the player for setting it up for streaming: %v", err)
} }
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
time.Sleep(time.Second) // TODO: delete this ugly racy hack time.Sleep(time.Second) // TODO: delete this ugly racy hack
p.notifyStart(context.WithValue(ctx, CtxKeyStreamPlayer, p)) p.notifyStart(context.WithValue(ctx, CtxKeyStreamPlayer, p))
}) })

View File

@@ -115,7 +115,7 @@ func (s *StreamServer) init(
for _, srv := range cfg.PortServers { for _, srv := range cfg.PortServers {
{ {
srv := srv srv := srv
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
s.mutex.Do(ctx, func() { s.mutex.Do(ctx, func() {
_, err := s.startServer(ctx, srv.Type, srv.ListenAddr, srv.Options()...) _, err := s.startServer(ctx, srv.Type, srv.ListenAddr, srv.Options()...)
if err != nil { if err != nil {
@@ -392,7 +392,7 @@ func (s *StreamServer) WaitPublisherChan(
appKey := types.AppKey(streamID) appKey := types.AppKey(streamID)
ch := make(chan types.Publisher, 1) ch := make(chan types.Publisher, 1)
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
var curPublisher *PublisherClosedNotifier var curPublisher *PublisherClosedNotifier
if waitForNext { if waitForNext {
curPublisher = xsync.DoR1( curPublisher = xsync.DoR1(

View File

@@ -87,7 +87,7 @@ func (s *StreamServer) init(
for _, srv := range cfg.PortServers { for _, srv := range cfg.PortServers {
{ {
srv := srv srv := srv
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
_, err := s.startServer(ctx, srv.Type, srv.ListenAddr) _, err := s.startServer(ctx, srv.Type, srv.ListenAddr)
if err != nil { if err != nil {
logger.Errorf( logger.Errorf(
@@ -238,7 +238,7 @@ func (s *StreamServer) startServer(
} }
}, },
}) })
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
err = portSrv.Serve(listener) err = portSrv.Serve(listener)
if err != nil { if err != nil {
err = fmt.Errorf( err = fmt.Errorf(
@@ -374,7 +374,7 @@ func (s *StreamServer) WaitPublisherChan(
) (<-chan types.Publisher, error) { ) (<-chan types.Publisher, error) {
ch := make(chan types.Publisher, 1) ch := make(chan types.Publisher, 1)
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
ch <- s.RelayService.WaitPubsub(ctx, types.StreamID2LocalAppName(streamID), waitForNext) ch <- s.RelayService.WaitPubsub(ctx, types.StreamID2LocalAppName(streamID), waitForNext)
close(ch) close(ch)
}) })

View File

@@ -108,7 +108,7 @@ func (fwd *ActiveStreamForwarding) start(ctx context.Context) (_err error) {
} }
ctx, cancelFn := context.WithCancel(ctx) ctx, cancelFn := context.WithCancel(ctx)
fwd.cancelFunc = cancelFn fwd.cancelFunc = cancelFn
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
for { for {
err := fwd.waitForPublisherAndStart( err := fwd.waitForPublisherAndStart(
ctx, ctx,
@@ -182,7 +182,7 @@ func (fwd *ActiveStreamForwarding) waitForPublisherAndStart(
ctx, cancelFn := context.WithCancel(ctx) ctx, cancelFn := context.WithCancel(ctx)
defer cancelFn() defer cancelFn()
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
defer cancelFn() defer cancelFn()
select { select {
case <-ctx.Done(): case <-ctx.Done():
@@ -322,7 +322,7 @@ func (fwd *ActiveStreamForwarding) waitForPublisherAndStart(
return err return err
} }
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
t := time.NewTicker(time.Second) t := time.NewTicker(time.Second)
defer t.Stop() defer t.Stop()
for { for {
@@ -429,7 +429,7 @@ func (fwd *ActiveStreamForwarding) killRecodingProcess(
} }
resultCh := make(chan error, 1) resultCh := make(chan error, 1)
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
defer func() { defer func() {
close(resultCh) close(resultCh)
}() }()

View File

@@ -315,7 +315,7 @@ func (s *StreamForwards) newActiveStreamForward(
result.ActiveForwarding = fwd result.ActiveForwarding = fwd
if quirks.RestartUntilYoutubeRecognizesStream.Enabled { if quirks.RestartUntilYoutubeRecognizesStream.Enabled {
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
s.restartUntilYoutubeRecognizesStream( s.restartUntilYoutubeRecognizesStream(
ctx, ctx,
result, result,

View File

@@ -45,7 +45,7 @@ func (s *StreamPlayers) Init(
) error { ) error {
initCfg := types.InitOptions(opts).Config() initCfg := types.InitOptions(opts).Config()
s.WithConfig(ctx, func(ctx context.Context, cfg *types.Config) { s.WithConfig(ctx, func(ctx context.Context, cfg *types.Config) {
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
var opts SetupOptions var opts SetupOptions
if initCfg.DefaultStreamPlayerOptions != nil { if initCfg.DefaultStreamPlayerOptions != nil {
opts = append( opts = append(
@@ -175,7 +175,7 @@ func setupStreamPlayers(
} }
ch := make(chan struct{}) ch := make(chan struct{})
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return

View File

@@ -37,7 +37,7 @@ func (wmh *XWindowManagerHandler) WindowFocusChangeChan(ctx context.Context) <-c
logger.Debugf(ctx, "WindowFocusChangeChan") logger.Debugf(ctx, "WindowFocusChangeChan")
ch := make(chan WindowFocusChange) ch := make(chan WindowFocusChange)
observability.Go(ctx, func() { observability.Go(ctx, func(ctx context.Context) {
defer logger.Debugf(ctx, "/WindowFocusChangeChan") defer logger.Debugf(ctx, "/WindowFocusChangeChan")
defer func() { defer func() {
close(ch) close(ch)