diff --git a/cmd/streamcli/commands/commands.go b/cmd/streamcli/commands/commands.go index 6b9059d..e3adfb8 100644 --- a/cmd/streamcli/commands/commands.go +++ b/cmd/streamcli/commands/commands.go @@ -42,7 +42,7 @@ var ( l.Error("unable to get the value of the flag 'go-net-pprof-addr': %v", err) } if netPprofAddr != "" { - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { if netPprofAddr == "" { netPprofAddr = "localhost:0" } diff --git a/cmd/streamd/main.go b/cmd/streamd/main.go index ebc2f3a..20254a5 100644 --- a/cmd/streamd/main.go +++ b/cmd/streamd/main.go @@ -91,7 +91,7 @@ func main() { ctx = logger.CtxWithLogger(ctx, l) if *netPprofAddr != "" || (forceNetPProfOnAndroid && runtime.GOOS == "android") { - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { if *netPprofAddr == "" { *netPprofAddr = "localhost:0" } @@ -175,7 +175,7 @@ func main() { l.Fatalf("unable to initialize the streamd instance: %v", err) } - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { if err = streamD.Run(ctx); err != nil { l.Errorf("streamd returned an error: %v", err) } @@ -186,7 +186,7 @@ func main() { log.Fatalf("failed to listen: %v", err) } - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { <-ctx.Done() listener.Close() }) diff --git a/cmd/streampanel/context.go b/cmd/streampanel/context.go index e2f9629..a1ec934 100644 --- a/cmd/streampanel/context.go +++ b/cmd/streampanel/context.go @@ -100,7 +100,7 @@ func getContext( closeFile = func() { f.Close() } } rotateFunc() - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { defer func() { logger.Debugf(ctx, "log rotator is closed") }() diff --git a/cmd/streampanel/fork.go b/cmd/streampanel/fork.go index bfe4cec..e3ab373 100644 --- a/cmd/streampanel/fork.go +++ b/cmd/streampanel/fork.go @@ -147,7 +147,7 @@ func runSplitProcesses( logger.Fatalf(ctx, "failed to start process manager: %v", err) } defer m.Close() - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { m.Serve(ctx, func(ctx context.Context, source ProcessName, content any) error { switch content.(type) { case GetFlags: @@ -170,7 +170,7 @@ func runSplitProcesses( return nil }) }) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { select { case <-ctx.Done(): return @@ -289,7 +289,7 @@ func runFork( logger.Errorf(ctx, "unable to register the command %v to be auto-killed: %v (GOOS: %v)", args, err, runtime.GOOS) } } - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { err := cmd.Wait() stderrLogger.Flush() stdoutLogger.Flush() @@ -307,10 +307,10 @@ func runFork( func fakeFork(ctx context.Context, procName ProcessName, addr, password string) error { switch procName { case ProcessNameStreamd: - observability.Go(ctx, func() { forkUI(ctx, addr, password) }) + observability.Go(ctx, func(ctx context.Context) { forkUI(ctx, addr, password) }) return nil case ProcessNameUI: - observability.Go(ctx, func() { forkStreamd(ctx, addr, password) }) + observability.Go(ctx, func(ctx context.Context) { forkStreamd(ctx, addr, password) }) return nil } return fmt.Errorf("unexpected process name: %s", procName) diff --git a/cmd/streampanel/main.go b/cmd/streampanel/main.go index ce67795..f56af58 100644 --- a/cmd/streampanel/main.go +++ b/cmd/streampanel/main.go @@ -49,7 +49,7 @@ func main() { } ctx = belt.WithField(ctx, "process", ProcessNameMain) defer func() { observability.PanicIfNotNil(ctx, recover()) }() - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { <-ctx.Done() logger.Debugf(ctx, "context is cancelled") }) @@ -125,7 +125,7 @@ func runPanel( return err } - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { err = grpcServer.Serve(listener) if err != nil { logger.Panicf(ctx, "unable to server the gRPC server: %v", err) @@ -135,7 +135,7 @@ func runPanel( if mainProcess != nil { setReadyFor(ctx, mainProcess, StreamDDied{}, UpdateStreamDConfig{}) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { err := mainProcess.Serve( ctx, func(ctx context.Context, source mainprocess.ProcessName, content any) error { diff --git a/cmd/streampanel/runtime.go b/cmd/streampanel/runtime.go index 7e692d1..0017de3 100644 --- a/cmd/streampanel/runtime.go +++ b/cmd/streampanel/runtime.go @@ -30,7 +30,7 @@ func initRuntime( l := logger.FromCtx(ctx) if ForceDebug { - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { t := time.NewTicker(time.Second) defer t.Stop() for { @@ -88,7 +88,7 @@ func initRuntime( } if netPprofAddr != "" { - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { http.Handle( "/metrics", promhttp.Handler(), @@ -104,7 +104,7 @@ func initRuntime( runtime.GOMAXPROCS(4) } - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { t := time.NewTicker(time.Second) defer t.Stop() for { diff --git a/cmd/streampanel/signal_handler.go b/cmd/streampanel/signal_handler.go index c9be39e..1e1800e 100644 --- a/cmd/streampanel/signal_handler.go +++ b/cmd/streampanel/signal_handler.go @@ -17,7 +17,7 @@ func mainProcessSignalHandler( ) chan<- os.Signal { c := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { for range c { cancelFn() forkLocker.Do(ctx, func() { @@ -26,7 +26,7 @@ func mainProcessSignalHandler( wg.Add(1) { name, f := name, f - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { defer wg.Done() logger.Debugf(ctx, "interrupting '%s'", name) err := f.Process.Signal(os.Interrupt) @@ -37,7 +37,7 @@ func mainProcessSignalHandler( return } - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { time.Sleep(5 * time.Second) logger.Debugf(ctx, "killing '%s'", name) err := f.Process.Kill() @@ -67,7 +67,7 @@ func childProcessSignalHandler( ) chan<- os.Signal { c := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { for range c { logger.Infof(ctx, "received an interruption signal") cancelFunc() diff --git a/cmd/streampanel/streamd.go b/cmd/streampanel/streamd.go index 59d6b26..cc25f4c 100644 --- a/cmd/streampanel/streamd.go +++ b/cmd/streampanel/streamd.go @@ -57,7 +57,7 @@ func forkStreamd(preCtx context.Context, mainProcessAddr, password string) { logger.Debugf(ctx, "flags == %#+v", flags) ctx, cancelFunc := initRuntime(ctx, flags, procName) defer cancelFunc() - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { <-ctx.Done() logger.Debugf(ctx, "context is cancelled") }) @@ -187,7 +187,7 @@ func runStreamd( if mainProcess != nil { logger.Debugf(ctx, "starting the IPC server") setReadyFor(ctx, mainProcess, GetStreamdAddress{}, RequestStreamDConfig{}) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { err := mainProcess.Serve( ctx, func( @@ -267,7 +267,7 @@ func initGRPCServers( } obsGRPC, obsGRPCClose, err := streamD.OBS(ctx) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { <-ctx.Done() listener.Close() if obsGRPCClose != nil { @@ -298,7 +298,7 @@ func initGRPCServers( registerGRPCServices(grpcServer, streamdGRPC, obsGRPC, proxyGRPC) // start the server: - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { logger.Infof(ctx, "started server at %s", listener.Addr().String()) err = grpcServer.Serve(listener) select { diff --git a/cmd/streampanel/ui.go b/cmd/streampanel/ui.go index a466429..7382214 100644 --- a/cmd/streampanel/ui.go +++ b/cmd/streampanel/ui.go @@ -29,7 +29,7 @@ func forkUI(preCtx context.Context, mainProcessAddr, password string) { ctx, cancelFunc := initRuntime(ctx, flags, procName) defer cancelFunc() defer belt.Flush(ctx) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { <-ctx.Done() logger.Debugf(ctx, "context is cancelled") }) diff --git a/go.mod b/go.mod index 2da8e67..81cf720 100755 --- a/go.mod +++ b/go.mod @@ -23,7 +23,7 @@ replace github.com/wlynxg/anet => github.com/BieHDC/anet v0.0.6-0.20241226223613 replace github.com/nicklaw5/helix/v2 v2.30.1-0.20240715193454-0151ccccf980 => github.com/xaionaro-go/helix/v2 v2.0.0-20250309182928-f54c9d4c8a29 -replace github.com/asticode/go-astiav v0.35.1 => github.com/xaionaro-go/astiav v0.0.0-20250406220418-87d14d2908f9 +replace github.com/asticode/go-astiav v0.36.0 => github.com/xaionaro-go/astiav v0.0.0-20250521203320-7402f3e25a7c replace github.com/bluenviron/mediacommon/v2 v2.0.1-0.20250324151931-b8ce69d15d3d => github.com/xaionaro-go/mediacommon/v2 v2.0.0-20250420012906-03d6d69ac3b7 @@ -207,11 +207,12 @@ require ( github.com/volatiletech/sqlboiler/v4 v4.16.2 // indirect github.com/volatiletech/strmangle v0.0.6 // indirect github.com/wlynxg/anet v0.0.6-0.20250109065809-5501d401a269 // indirect - github.com/xaionaro-go/avmediacodec v0.0.0-20250421150856-ddd390422c21 // indirect + github.com/xaionaro-go/avcommon v0.0.0-20250510235605-840f8210b727 // indirect + github.com/xaionaro-go/avmediacodec v0.0.0-20250505012527-c819676502d8 // indirect github.com/xaionaro-go/gorex v0.0.0-20241010205749-bcd59d639c4d // indirect - github.com/xaionaro-go/libsrt v0.0.0-20250105232601-e760c79b2bc3 // indirect + github.com/xaionaro-go/libsrt v0.0.0-20250505013920-61d894a3b7e9 // indirect github.com/xaionaro-go/ndk v0.0.0-20250420195304-361bb98583bf // indirect - github.com/xaionaro-go/proxy v0.0.0-20250111150848-1f0e7b262638 // indirect + github.com/xaionaro-go/proxy v0.0.0-20250525144747-579f5a891c15 // indirect github.com/xaionaro-go/spinlock v0.0.0-20200518175509-30e6d1ce68a1 // indirect github.com/xanzy/ssh-agent v0.3.3 // indirect github.com/xo/terminfo v0.0.0-20210125001918-ca9a967f8778 // indirect @@ -255,7 +256,7 @@ require ( github.com/andreykaipov/goobs v1.4.1 github.com/anthonynsimon/bild v0.14.0 github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef - github.com/asticode/go-astiav v0.35.1 + github.com/asticode/go-astiav v0.36.0 github.com/bamiaux/rez v0.0.0-20170731184118-29f4463c688b github.com/bluenviron/gortsplib/v4 v4.12.4-0.20250324174248-61372cfa6800 github.com/chai2010/webp v1.1.1 @@ -289,7 +290,7 @@ require ( github.com/spf13/pflag v1.0.6 github.com/stretchr/testify v1.10.0 github.com/xaionaro-go/audio v0.0.0-20250210102901-abfced9d5ef3 - github.com/xaionaro-go/avpipeline v0.0.0-20250428012319-401ac2ebe66c + github.com/xaionaro-go/avpipeline v0.0.0-20250525204026-17104bc4baca github.com/xaionaro-go/datacounter v1.0.4 github.com/xaionaro-go/go-rtmp v0.0.0-20241009130244-1e3160f27f42 github.com/xaionaro-go/grpcproxy v0.0.0-20241103205849-a8fef42e72f9 @@ -300,9 +301,9 @@ require ( github.com/xaionaro-go/mediamtx v0.0.0-20250406132618-79ecbc3e138f github.com/xaionaro-go/object v0.0.0-20241026212449-753ce10ec94c github.com/xaionaro-go/obs-grpc-proxy v0.0.0-20241018162120-5faf4e7a684a - github.com/xaionaro-go/observability v0.0.0-20250420133500-5c4d2e045932 - github.com/xaionaro-go/player v0.0.0-20250427220051-e366ad8a1fb5 - github.com/xaionaro-go/recoder v0.0.0-20250503155018-6f353978d332 + github.com/xaionaro-go/observability v0.0.0-20250622130956-24b7017284e4 + github.com/xaionaro-go/player v0.0.0-20250622133132-5473824ef0d0 + github.com/xaionaro-go/recoder v0.0.0-20250622133456-7bd1af83fda5 github.com/xaionaro-go/secret v0.0.0-20250111141743-ced12e1082c2 github.com/xaionaro-go/serializable v0.0.0-20250412140540-5ac572306599 github.com/xaionaro-go/timeapiio v0.0.0-20240915203246-b907cf699af3 @@ -315,7 +316,7 @@ require ( github.com/xaionaro-go/xsync v0.0.0-20250614210231-b74f647f859f github.com/yutopp/go-flv v0.3.1 golang.org/x/crypto v0.38.0 - google.golang.org/grpc v1.71.1 + google.golang.org/grpc v1.72.1 google.golang.org/protobuf v1.36.6 gopkg.in/yaml.v2 v2.4.0 gopkg.in/yaml.v3 v3.0.1 diff --git a/go.sum b/go.sum index 5ff9174..7f51f57 100755 --- a/go.sum +++ b/go.sum @@ -1076,14 +1076,16 @@ github.com/volatiletech/strmangle v0.0.6 h1:AdOYE3B2ygRDq4rXDij/MMwq6KVK/pWAYxpC github.com/volatiletech/strmangle v0.0.6/go.mod h1:ycDvbDkjDvhC0NUU8w3fWwl5JEMTV56vTKXzR3GeR+0= github.com/x-cray/logrus-prefixed-formatter v0.5.2 h1:00txxvfBM9muc0jiLIEAkAcIMJzfthRT6usrui8uGmg= github.com/x-cray/logrus-prefixed-formatter v0.5.2/go.mod h1:2duySbKsL6M18s5GU7VPsoEPHyzalCE06qoARUCeBBE= -github.com/xaionaro-go/astiav v0.0.0-20250406220418-87d14d2908f9 h1:6thYU3Iykk876UV8zTlN9rut3BEhLv/k28fiHDcx+uE= -github.com/xaionaro-go/astiav v0.0.0-20250406220418-87d14d2908f9/go.mod h1:K7D8UC6GeQt85FUxk2KVwYxHnotrxuEnp5evkkudc2s= +github.com/xaionaro-go/astiav v0.0.0-20250521203320-7402f3e25a7c h1:MTrVWO9fpyuavKvXCitikDCHRSOorjD7/OOejhAXz+A= +github.com/xaionaro-go/astiav v0.0.0-20250521203320-7402f3e25a7c/go.mod h1:GI0pHw6K2/pl/o8upCtT49P/q4KCwhv/8nGLlCsZLdA= github.com/xaionaro-go/audio v0.0.0-20250210102901-abfced9d5ef3 h1:LRIpqqC7Gsz5+/EsIWRtdPZZPMpx9yykUVFyUnRaKbE= github.com/xaionaro-go/audio v0.0.0-20250210102901-abfced9d5ef3/go.mod h1:i4CntPlryh9HLmA3p3M0CNr1usRkEkuh3N2Ui3HeXQA= -github.com/xaionaro-go/avmediacodec v0.0.0-20250421150856-ddd390422c21 h1:afsJFvZaf4z87n/V9LgXgD6u8MZBesZj8mnHvlfRi2k= -github.com/xaionaro-go/avmediacodec v0.0.0-20250421150856-ddd390422c21/go.mod h1:obX+/HoeX6lzM/Zuzgtm0JdG8CQ58I1isp5xSQEm+kQ= -github.com/xaionaro-go/avpipeline v0.0.0-20250428012319-401ac2ebe66c h1:KTQYCAJAofdHnjxYC8VV+oMW2WRnxa8wy2o0fNlfueU= -github.com/xaionaro-go/avpipeline v0.0.0-20250428012319-401ac2ebe66c/go.mod h1:Czl84biYuPZeILYiRiziR0yeuXKvrqQfzJ1ItjSq/o0= +github.com/xaionaro-go/avcommon v0.0.0-20250510235605-840f8210b727 h1:uQ8V1T3Oeru15gEIthts9GWkMS3eQ0Eo7vOKvE4G2/k= +github.com/xaionaro-go/avcommon v0.0.0-20250510235605-840f8210b727/go.mod h1:kjLo1LasgdDJqbTGD5bbEM+D6RiZSbf5ZT8yiPFF1BA= +github.com/xaionaro-go/avmediacodec v0.0.0-20250505012527-c819676502d8 h1:FZn9+TN3uHhohfpanWkR9lFNHApizznZbML6XjvEgTU= +github.com/xaionaro-go/avmediacodec v0.0.0-20250505012527-c819676502d8/go.mod h1:2W2Kp/HJFXcFBppQ4YytgDy/ydFL3hGc23xSB1U/Luc= +github.com/xaionaro-go/avpipeline v0.0.0-20250525204026-17104bc4baca h1:Cls4rEimemZicWzhVlzQm1otU/RTpfNVpgdfwvGEJrQ= +github.com/xaionaro-go/avpipeline v0.0.0-20250525204026-17104bc4baca/go.mod h1:LMh5Qi7cuntcktUezfA9toVCUCCsx9pjyGDWe9GLt9A= github.com/xaionaro-go/datacounter v1.0.4 h1:+QMZLmu73R5WGkQfUPwlXF/JFN+Weo4iuDZkiL2wVm8= github.com/xaionaro-go/datacounter v1.0.4/go.mod h1:Sf9vBevuV6w5iE6K3qJ9pWVKcyS60clWBUSQLjt5++c= github.com/xaionaro-go/fyne/v2 v2.0.0-20250622004601-3a26ee69528a h1:awMQXlaweeiSZB4rSNfMmJGJriyn1ca/m/lglBi9uyA= @@ -1102,8 +1104,8 @@ github.com/xaionaro-go/iterate v0.0.0-20250406123757-7802d56b52ce h1:4a0vM4EOq7c github.com/xaionaro-go/iterate v0.0.0-20250406123757-7802d56b52ce/go.mod h1:Dx52o1WH1xUoV8jjpiIVTRZ/rNoPNPcnLdMrQ5D2FH0= github.com/xaionaro-go/kickcom v0.0.0-20250316223447-a1fe3d153d96 h1:SZlH+zdXjJ4QYjFZURNvJUF3cH0ToBJhACqXdxuN+50= github.com/xaionaro-go/kickcom v0.0.0-20250316223447-a1fe3d153d96/go.mod h1:Y8KwAUd/RrqYtRiJZWVZsGClOXF+vK5jOjZWMIvhYTE= -github.com/xaionaro-go/libsrt v0.0.0-20250105232601-e760c79b2bc3 h1:OeqZZ9i3KHaTKOxLtrF7nBkao299F5QK4acl9Nd/GrI= -github.com/xaionaro-go/libsrt v0.0.0-20250105232601-e760c79b2bc3/go.mod h1:7k3ZDcOLrS8ZKOu3P30tWMSno2blKqmH+9pKfDCZ2mE= +github.com/xaionaro-go/libsrt v0.0.0-20250505013920-61d894a3b7e9 h1:z5K1pa9cJZ2oqFNWyyxrxO/50Hv4Gn+1kkfXTHkoKNM= +github.com/xaionaro-go/libsrt v0.0.0-20250505013920-61d894a3b7e9/go.mod h1:yH5w7hdIyhbR4p0WfABH9fLIIK2ZwPd5CKk2K2ND/vo= github.com/xaionaro-go/lockmap v0.0.0-20240901172806-e17aea364748 h1:SlB3zLAuLgRxdOo250gMUG/7hSiEU2NzEUNYbJDuI2A= github.com/xaionaro-go/lockmap v0.0.0-20240901172806-e17aea364748/go.mod h1:UO+SYZ5JAJGOnNkDycFrFwkaaPeSqAEQUM0TUp9Vb24= github.com/xaionaro-go/logrustash v0.0.0-20240804141650-d48034780a5f h1:mMrVrYtH9MyCUzBwPvuEntvqdCJ0zifCfqV6bHU6z1M= @@ -1120,16 +1122,16 @@ github.com/xaionaro-go/object v0.0.0-20241026212449-753ce10ec94c h1:2CIIxTRox9au github.com/xaionaro-go/object v0.0.0-20241026212449-753ce10ec94c/go.mod h1:vRcA12NWsR0IrS75eqnPBs5aVfCYmJy4bR+6DbJpBCg= github.com/xaionaro-go/obs-grpc-proxy v0.0.0-20241018162120-5faf4e7a684a h1:PyX7XpLkj+eAwrPMFMGpvZIG4zBfzAfwNhwTtbORqN0= github.com/xaionaro-go/obs-grpc-proxy v0.0.0-20241018162120-5faf4e7a684a/go.mod h1:exSKIlCibB0ww+ABDwH+YG/iNdqVfdzXBBg5LYxkxGw= -github.com/xaionaro-go/observability v0.0.0-20250420133500-5c4d2e045932 h1:uDPBczg4UmPPig0l7DLlIj5XCCVIlW+7KQ4THVtqOU8= -github.com/xaionaro-go/observability v0.0.0-20250420133500-5c4d2e045932/go.mod h1:j5y9LVYd0v8sJa9Ks7ZyuwFxAUpaNFHBNKBkiYipxPM= -github.com/xaionaro-go/player v0.0.0-20250427220051-e366ad8a1fb5 h1:6/7G8t/Ua31JjttkW/qJlhqpL+wAl6Ei1rFIeS9xvSw= -github.com/xaionaro-go/player v0.0.0-20250427220051-e366ad8a1fb5/go.mod h1:ZBT/KpMvYbHDcm8H7jo54C4xQ1gAgbebu3MHRu6iaqM= -github.com/xaionaro-go/proxy v0.0.0-20250111150848-1f0e7b262638 h1:w7Dt6Mpj36S2cWm0PkT2+D4kxrQbfCwjXZs1HqiILpE= -github.com/xaionaro-go/proxy v0.0.0-20250111150848-1f0e7b262638/go.mod h1:hOkJBFoMsnCDoZgpSPTHYbnevPgtpD16d9Xga91U+Eo= +github.com/xaionaro-go/observability v0.0.0-20250622130956-24b7017284e4 h1:dlsJ1l9WI+LTpB5z4f7FCLLrxJ7uV4tj9jWNqpibhA0= +github.com/xaionaro-go/observability v0.0.0-20250622130956-24b7017284e4/go.mod h1:GkNC0+nPJMOzotTzJVlH4+DvDg9cKrV9qqOe+8/kVw4= +github.com/xaionaro-go/player v0.0.0-20250622133132-5473824ef0d0 h1:XB5P4Gh8QKpC11xd8xMS+29xOdRtYb1Geflvs5vL1tw= +github.com/xaionaro-go/player v0.0.0-20250622133132-5473824ef0d0/go.mod h1:jQ00KD3KmELFS+VsQwkLYUOQ63i0lJA3RZSSP37V720= +github.com/xaionaro-go/proxy v0.0.0-20250525144747-579f5a891c15 h1:Qqoy9MDWq2Yh6uazAqQDzqU0doalTL3tRjNCo7X7GXA= +github.com/xaionaro-go/proxy v0.0.0-20250525144747-579f5a891c15/go.mod h1:6kxHtLmOImv/zwXSvaI1CW9Q8Pw+m5b891ZoejMKHPA= github.com/xaionaro-go/pulse v0.0.0-20241023202712-7151fa00d4bb h1:9iHPI27CYbmJDhzEuCABQthE/DGVNvT60ybWvv3BV8w= github.com/xaionaro-go/pulse v0.0.0-20241023202712-7151fa00d4bb/go.mod h1:cpYspI6YljhkUf1WLXLLDmeaaPFc3CnGLjDZf9dZ4no= -github.com/xaionaro-go/recoder v0.0.0-20250503155018-6f353978d332 h1:jB5I8UE9UL6g7qQKaZ/g9wt3lIXxgkDDJX1cV37D5go= -github.com/xaionaro-go/recoder v0.0.0-20250503155018-6f353978d332/go.mod h1:Twc+NcQQ+afg4RHxwqqo9pRGIaY7+QwpuAiYa7ClSLw= +github.com/xaionaro-go/recoder v0.0.0-20250622133456-7bd1af83fda5 h1:RHlEvVGWUCdOxbRNL1b3qpv0LaNBZsf+n+wl9mIG3c4= +github.com/xaionaro-go/recoder v0.0.0-20250622133456-7bd1af83fda5/go.mod h1:RpW+gwH/6WZSZfu2zV6ffEU4G2rUHzFCyJ3Ps3K3wuo= github.com/xaionaro-go/secret v0.0.0-20250111141743-ced12e1082c2 h1:QHpTWfyfmz65cE0MtFXe9fScdi+X0VIYR2wgolSYEUk= github.com/xaionaro-go/secret v0.0.0-20250111141743-ced12e1082c2/go.mod h1:XKoHGZ4VKMbVBl8VotLIoWQdrB6Q7jnR++RbkiegZFU= github.com/xaionaro-go/serializable v0.0.0-20250412140540-5ac572306599 h1:CzcQd6wLiqgjd8K/6UzR5uyt6sg4ut/kVxi6+FJMbdI= @@ -1773,8 +1775,8 @@ google.golang.org/genproto v0.0.0-20220429170224-98d788798c3e/go.mod h1:8w6bsBMX google.golang.org/genproto v0.0.0-20220505152158-f39f71e6c8f3/go.mod h1:RAyBrSAP7Fh3Nc84ghnVLDPuV51xc9agzmm4Ph6i0Q4= google.golang.org/genproto v0.0.0-20220519153652-3a47de7e79bd/go.mod h1:RAyBrSAP7Fh3Nc84ghnVLDPuV51xc9agzmm4Ph6i0Q4= google.golang.org/genproto v0.0.0-20241015192408-796eee8c2d53 h1:Df6WuGvthPzc+JiQ/G+m+sNX24kc0aTBqoDN/0yyykE= -google.golang.org/genproto/googleapis/api v0.0.0-20250106144421-5f5ef82da422 h1:GVIKPyP/kLIyVOgOnTwFOrvQaQUzOzGMCxgFUOEmm24= -google.golang.org/genproto/googleapis/api v0.0.0-20250106144421-5f5ef82da422/go.mod h1:b6h1vNKhxaSoEI+5jc3PJUCustfli/mRab7295pY7rw= +google.golang.org/genproto/googleapis/api v0.0.0-20250218202821-56aae31c358a h1:nwKuGPlUAt+aR+pcrkfFRrTU1BVrSmYyYMxYbUIVHr0= +google.golang.org/genproto/googleapis/api v0.0.0-20250218202821-56aae31c358a/go.mod h1:3kWAYMk1I75K4vykHtKt2ycnOgpA6974V7bREqbsenU= google.golang.org/genproto/googleapis/rpc v0.0.0-20250409194420-de1ac958c67a h1:GIqLhp/cYUkuGuiT+vJk8vhOP86L4+SP5j8yXgeVpvI= google.golang.org/genproto/googleapis/rpc v0.0.0-20250409194420-de1ac958c67a/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A= google.golang.org/grpc v1.14.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= @@ -1810,8 +1812,8 @@ google.golang.org/grpc v1.44.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ5 google.golang.org/grpc v1.45.0/go.mod h1:lN7owxKUQEqMfSyQikvvk5tf/6zMPsrK+ONuO11+0rQ= google.golang.org/grpc v1.46.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk= google.golang.org/grpc v1.46.2/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk= -google.golang.org/grpc v1.71.1 h1:ffsFWr7ygTUscGPI0KKK6TLrGz0476KUvvsbqWK0rPI= -google.golang.org/grpc v1.71.1/go.mod h1:H0GRtasmQOh9LkFoCPDu3ZrwUtD1YGE+b2vYBYd/8Ec= +google.golang.org/grpc v1.72.1 h1:HR03wO6eyZ7lknl75XlxABNVLLFc2PAb6mHlYh756mA= +google.golang.org/grpc v1.72.1/go.mod h1:wH5Aktxcg25y1I3w7H69nHfXdOG3UiadoBtjh3izSDM= google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= diff --git a/pkg/mainprocess/client.go b/pkg/mainprocess/client.go index 533462b..dd236fd 100644 --- a/pkg/mainprocess/client.go +++ b/pkg/mainprocess/client.go @@ -87,7 +87,7 @@ func (c *Client) Serve( ) error { ctx, cancelFn := context.WithCancel(ctx) defer cancelFn() - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { <-ctx.Done() err := c.Close() if err != nil { diff --git a/pkg/mainprocess/main_process.go b/pkg/mainprocess/main_process.go index d789c40..cac2788 100644 --- a/pkg/mainprocess/main_process.go +++ b/pkg/mainprocess/main_process.go @@ -120,7 +120,7 @@ func (m *Manager) Serve( ctx, cancelFn := context.WithCancel(ctx) defer cancelFn() - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { <-ctx.Done() err := m.Close() if err != nil { @@ -160,7 +160,7 @@ func (m *Manager) addNewConnection( conn net.Conn, onReceivedMessage OnReceivedMessageFunc, ) { - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { m.handleConnection(ctx, conn, onReceivedMessage) }) } @@ -175,7 +175,7 @@ func (m *Manager) handleConnection( defer func() { logger.Tracef(ctx, "/handleConnection from %s (%s)", conn.RemoteAddr(), regMessage.Source) }() ctx, cancelFn := context.WithCancel(ctx) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { <-ctx.Done() conn.Close() }) @@ -291,7 +291,7 @@ func (m *Manager) processMessage( err = multierror.Append(err, onReceivedMessage(ctx, source, message.Content)) errCh := make(chan error) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { for e := range errCh { err = multierror.Append(err, e) } @@ -303,7 +303,7 @@ func (m *Manager) processMessage( wg.Add(1) { dst := dst - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { defer wg.Done() errCh <- m.sendMessage(ctx, source, dst, message.Content) }) @@ -382,7 +382,7 @@ func (m *Manager) sendMessage( return fmt.Errorf("process '%s' is not ever expected", destination) } - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { conn, err := m.waitForReadyProcess(ctx, destination, reflect.TypeOf(content)) if err != nil { logger.Errorf( @@ -400,7 +400,7 @@ func (m *Manager) sendMessage( Content: content, } - h := m.connLocker.Lock(context.Background(), destination) + h := m.connLocker.Lock(context.Background(), destination) // TODO: should we use the provided ctx? defer h.Unlock() defer time.Sleep( 100 * time.Millisecond, diff --git a/pkg/oauthhandler/oauth2_handler.go b/pkg/oauthhandler/oauth2_handler.go index 3b4e077..5bd96c1 100644 --- a/pkg/oauthhandler/oauth2_handler.go +++ b/pkg/oauthhandler/oauth2_handler.go @@ -86,14 +86,14 @@ func NewCodeReceiver( }), } - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { <-ctx.Done() listener.Close() srv.Close() close(codeCh) }) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { srv.Serve(listener) }) diff --git a/pkg/p2p/implementations/weron/p2p.go b/pkg/p2p/implementations/weron/p2p.go index a9ee1be..873d3fa 100644 --- a/pkg/p2p/implementations/weron/p2p.go +++ b/pkg/p2p/implementations/weron/p2p.go @@ -223,7 +223,7 @@ func (p *P2P) Start( } p.waitGroup.Add(1) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { defer p.waitGroup.Done() err := p.acceptPeersLoop(ctx, 0, myIDChan0) if err != nil && !errors.Is(err, context.Canceled) { @@ -232,7 +232,7 @@ func (p *P2P) Start( }) p.waitGroup.Add(1) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { defer p.waitGroup.Done() err := p.acceptPeersLoop(ctx, 1, myIDChan1) if err != nil && !errors.Is(err, context.Canceled) { diff --git a/pkg/p2p/implementations/weron/peer_client.go b/pkg/p2p/implementations/weron/peer_client.go index 02ea0be..803a1a7 100644 --- a/pkg/p2p/implementations/weron/peer_client.go +++ b/pkg/p2p/implementations/weron/peer_client.go @@ -39,7 +39,7 @@ func (p *peerClient) init( ) error { ctx, cancelFn := context.WithCancel(ctx) p.cancelFn = cancelFn - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { <-ctx.Done() if err := p.peer.network.removePeer(p.peer.id); err != nil { logger.Errorf(ctx, "unable to remove peer '%s': %v", p.peer.id, err) diff --git a/pkg/p2p/implementations/weron/peer_server.go b/pkg/p2p/implementations/weron/peer_server.go index 2fc7222..760fdc2 100644 --- a/pkg/p2p/implementations/weron/peer_server.go +++ b/pkg/p2p/implementations/weron/peer_server.go @@ -36,7 +36,7 @@ func (p *peerServer) init( ctx context.Context, ) error { ctx, cancelFn := context.WithCancel(ctx) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { <-ctx.Done() if err := p.peer.network.removePeer(p.peer.id); err != nil { logger.Errorf(ctx, "unable to remove peer '%s': %v", p.peer.id, err) @@ -59,7 +59,7 @@ func (p *peerServer) init( } p.waitGroup.Add(1) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { defer cancelFn() defer p.waitGroup.Done() logger.Infof(ctx, "started the gRPC server at '%s'", grpcServerListener.Addr()) diff --git a/pkg/streamcontrol/kick/chat_handler.go b/pkg/streamcontrol/kick/chat_handler.go index 6626f4b..e369b01 100644 --- a/pkg/streamcontrol/kick/chat_handler.go +++ b/pkg/streamcontrol/kick/chat_handler.go @@ -52,7 +52,7 @@ func NewChatHandler( messagesOutChan: make(chan streamcontrol.ChatMessage, 100), } - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { defer func() { close(h.messagesOutChan) }() diff --git a/pkg/streamcontrol/kick/kick.go b/pkg/streamcontrol/kick/kick.go index a11e7b0..5ad3dff 100644 --- a/pkg/streamcontrol/kick/kick.go +++ b/pkg/streamcontrol/kick/kick.go @@ -402,7 +402,7 @@ func (k *Kick) GetChatMessagesChan( defer func() { logger.Debugf(ctx, "/GetChatMessagesChan") }() outCh := make(chan streamcontrol.ChatMessage) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { defer func() { logger.Debugf(ctx, "closing the messages channel") close(outCh) diff --git a/pkg/streamcontrol/stream_control.go b/pkg/streamcontrol/stream_control.go index b3c763e..08876b7 100644 --- a/pkg/streamcontrol/stream_control.go +++ b/pkg/streamcontrol/stream_control.go @@ -289,7 +289,7 @@ func (s StreamControllers) ApplyProfiles( wg.Add(1) { p := p - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { defer wg.Done() profileType := reflect.TypeOf(p) c, ok := m[profileType] @@ -304,7 +304,7 @@ func (s StreamControllers) ApplyProfiles( }) } } - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { wg.Wait() close(errCh) }) @@ -408,7 +408,7 @@ func (s StreamControllers) concurrently( wg.Add(1) { c := c - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { defer wg.Done() if err := callback(c); err != nil { errCh <- err @@ -416,7 +416,7 @@ func (s StreamControllers) concurrently( }) } } - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { wg.Wait() close(errCh) }) diff --git a/pkg/streamcontrol/twitch/chat_handler.go b/pkg/streamcontrol/twitch/chat_handler.go index c6e5eac..e5e0563 100644 --- a/pkg/streamcontrol/twitch/chat_handler.go +++ b/pkg/streamcontrol/twitch/chat_handler.go @@ -64,7 +64,7 @@ func newChatHandler( } h.waitGroup.Add(1) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { defer h.waitGroup.Done() defer func() { h.client.Close() diff --git a/pkg/streamcontrol/twitch/twitch.go b/pkg/streamcontrol/twitch/twitch.go index 05e171f..b4e157f 100644 --- a/pkg/streamcontrol/twitch/twitch.go +++ b/pkg/streamcontrol/twitch/twitch.go @@ -553,7 +553,7 @@ func (t *Twitch) getNewClientCode( var resultErr error errCh := make(chan error) errWg.Add(1) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { errWg.Done() for err := range errCh { errmon.ObserveErrorCtx(ctx, err) @@ -575,7 +575,7 @@ func (t *Twitch) getNewClientCode( wg.Add(1) { listenPort := listenPort - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { defer func() { logger.Debugf(ctx, "ended the oauth handler at port %d", listenPort) }() defer wg.Done() authURL := GetAuthorizationURL( @@ -630,7 +630,7 @@ func (t *Twitch) getNewClientCode( } wg.Add(1) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { defer wg.Done() t := time.NewTicker(time.Second) defer t.Stop() @@ -649,7 +649,7 @@ func (t *Twitch) getNewClientCode( } }) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { wg.Wait() close(errCh) }) @@ -838,7 +838,7 @@ func (t *Twitch) GetChatMessagesChan( defer func() { logger.Debugf(ctx, "/GetChatMessagesChan") }() outCh := make(chan streamcontrol.ChatMessage) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { defer func() { logger.Debugf(ctx, "closing the messages channel") close(outCh) diff --git a/pkg/streamcontrol/youtube/chat_listener.go b/pkg/streamcontrol/youtube/chat_listener.go index 4523eee..e4b499d 100644 --- a/pkg/streamcontrol/youtube/chat_listener.go +++ b/pkg/streamcontrol/youtube/chat_listener.go @@ -83,7 +83,7 @@ func NewChatListener( messagesOutChan: make(chan streamcontrol.ChatMessage, 100), } l.wg.Add(1) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { defer l.wg.Done() defer func() { logger.Debugf(ctx, "the listener loop is finished") diff --git a/pkg/streamcontrol/youtube/youtube.go b/pkg/streamcontrol/youtube/youtube.go index d9c43a3..81aa865 100644 --- a/pkg/streamcontrol/youtube/youtube.go +++ b/pkg/streamcontrol/youtube/youtube.go @@ -86,7 +86,7 @@ func New( return nil, fmt.Errorf("connection verification failed: %w", err) } - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { ticker := time.NewTicker(time.Minute) defer ticker.Stop() for { @@ -232,7 +232,7 @@ func getToken(ctx context.Context, cfg Config) (*oauth2.Token, error) { var resultErr error errCh := make(chan error) errWg.Add(1) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { errWg.Done() for err := range errCh { errmon.ObserveErrorCtx(ctx, err) @@ -256,7 +256,7 @@ func getToken(ctx context.Context, cfg Config) (*oauth2.Token, error) { wg.Add(1) { oauthCfg := oauthCfg - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { defer wg.Done() oauthHandlerArg := oauthhandler.OAuthHandlerArgument{ AuthURL: oauthCfg.AuthCodeURL("state-token", oauth2.AccessTypeOffline), @@ -295,7 +295,7 @@ func getToken(ctx context.Context, cfg Config) (*oauth2.Token, error) { } wg.Add(1) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { defer wg.Done() t := time.NewTicker(time.Second) defer t.Stop() @@ -317,7 +317,7 @@ func getToken(ctx context.Context, cfg Config) (*oauth2.Token, error) { } }) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { wg.Wait() close(errCh) }) @@ -1039,7 +1039,7 @@ func (yt *YouTube) startChatListener( return fmt.Errorf("unable to initialize the chat listener instance: %w", err) } - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { err := yt.processChatListener(ctx, chatListener) if err != nil && !errors.Is(err, context.Canceled) { logger.Errorf(ctx, "unable to process the chat listener for '%s': %v", videoID, err) @@ -1392,7 +1392,7 @@ func (yt *YouTube) GetChatMessagesChan( defer logger.Debugf(ctx, "/GetChatMessagesChan") outCh := make(chan streamcontrol.ChatMessage) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { defer func() { logger.Debugf(ctx, "closing the messages channel") close(outCh) diff --git a/pkg/streamd/chat.go b/pkg/streamd/chat.go index ff39100..7e06c1a 100644 --- a/pkg/streamd/chat.go +++ b/pkg/streamd/chat.go @@ -32,7 +32,7 @@ func (d *StreamD) startListeningForChatMessages( if err != nil { return fmt.Errorf("unable to get the channel for chat messages of '%s': %w", platName, err) } - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { defer logger.Debugf(ctx, "/startListeningForChatMessages(ctx, '%s')", platName) for { select { diff --git a/pkg/streamd/client/client.go b/pkg/streamd/client/client.go index 7d7f4b8..64da814 100644 --- a/pkg/streamd/client/client.go +++ b/pkg/streamd/client/client.go @@ -421,7 +421,7 @@ func unwrapStreamDChan[E any, R any, S receiver[R]]( } r := make(chan E) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { defer closer.Close() defer cancelFn() for { diff --git a/pkg/streamd/events.go b/pkg/streamd/events.go index de4c3f9..0c77114 100644 --- a/pkg/streamd/events.go +++ b/pkg/streamd/events.go @@ -47,7 +47,7 @@ func (d *StreamD) submitEvent( exprCtx := objToMap(ev) for _, rule := range d.Config.TriggerRules { if rule.EventQuery.Match(ev) { - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { err := d.doAction(ctx, rule.Action, exprCtx) if err != nil { logger.Errorf(ctx, "unable to perform action %s: %v", rule.Action, err) @@ -131,13 +131,13 @@ func eventSubToChan[T any]( } if onReady != nil { - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { defer mutex.Unlock() onReady(ctx, r) }) } - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { <-ctx.Done() mutex.Lock() diff --git a/pkg/streamd/image_taker.go b/pkg/streamd/image_taker.go index 3f31862..529df31 100644 --- a/pkg/streamd/image_taker.go +++ b/pkg/streamd/image_taker.go @@ -61,7 +61,7 @@ func (d *StreamD) getImageBytes( } func (d *StreamD) initImageTaker(ctx context.Context) error { - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { defer logger.Debugf(ctx, "/imageTaker") ch, err := d.SubscribeToDashboardChanges(ctx) if err != nil { @@ -106,7 +106,7 @@ func (d *StreamD) restartImageTakerNoLock(ctx context.Context) error { elName, el := elName, el _ = el d.imageTakerWG.Add(1) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { defer d.imageTakerWG.Done() logger.Debugf(ctx, "taker of image '%s'", elName) defer logger.Debugf(ctx, "/taker of image '%s'", elName) diff --git a/pkg/streamd/obs_restarter.go b/pkg/streamd/obs_restarter.go index f3be69d..74092e9 100644 --- a/pkg/streamd/obs_restarter.go +++ b/pkg/streamd/obs_restarter.go @@ -73,7 +73,7 @@ func (r *obsRestarter) updateConfigNoLock( ctx, cancelFn := context.WithCancel(ctx) r.cancelFunc = cancelFn - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { d.obsRestarter.loop(ctx, execCmd) }) return nil diff --git a/pkg/streamd/streamd.go b/pkg/streamd/streamd.go index 4b469c8..265e0b8 100644 --- a/pkg/streamd/streamd.go +++ b/pkg/streamd/streamd.go @@ -236,7 +236,7 @@ func (d *StreamD) initChatMessagesStorage(ctx context.Context) (_err error) { logger.Debugf(ctx, "initChatMessagesStorage") defer logger.Debugf(ctx, "/initChatMessagesStorage: %v", _err) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { logger.Debugf(ctx, "initChatMessagesStorage-refresherLoop") defer logger.Debugf(ctx, "/initChatMessagesStorage-refresherLoop") t := time.NewTicker(5 * time.Second) @@ -271,7 +271,7 @@ func (d *StreamD) secretsProviderUpdater(ctx context.Context) (_err error) { }) logger.Debugf(ctx, "updated the secrets") - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { for { select { case <-ctx.Done(): @@ -392,7 +392,7 @@ func (d *StreamD) InitCache(ctx context.Context) error { var wg sync.WaitGroup wg.Add(1) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { defer wg.Done() _changedCache := d.initTwitchData(ctx) d.normalizeTwitchData() @@ -402,7 +402,7 @@ func (d *StreamD) InitCache(ctx context.Context) error { }) wg.Add(1) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { defer wg.Done() _changedCache := d.initKickData(ctx) d.normalizeKickData() @@ -412,7 +412,7 @@ func (d *StreamD) InitCache(ctx context.Context) error { }) wg.Add(1) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { defer wg.Done() _changedCache := d.initYoutubeData(ctx) d.normalizeYoutubeData() @@ -593,12 +593,12 @@ func (d *StreamD) onUpdateConfig( errCh := make(chan error, 1) wg.Add(1) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { defer wg.Done() errCh <- d.updateOBSRestarterConfig(ctx) }) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { wg.Wait() close(errCh) }) @@ -655,7 +655,7 @@ func (d *StreamD) StartStream( defer func() { d.StreamStatusCache.InvalidateCache(ctx) if platID == youtube.ID { - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { now := time.Now() time.Sleep(10 * time.Second) for time.Since(now) < 5*time.Minute { @@ -1639,7 +1639,7 @@ func (d *StreamD) WaitForStreamPublisher( } ch := make(chan struct{}) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { select { case <-pubCh: close(ch) diff --git a/pkg/streamd/timer.go b/pkg/streamd/timer.go index fad8753..44f60ae 100644 --- a/pkg/streamd/timer.go +++ b/pkg/streamd/timer.go @@ -62,7 +62,7 @@ func (t *Timer) start(ctx context.Context) { runningTimer := time.NewTimer(time.Until(t.TriggerAt)) t.RunningTimer = runningTimer - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { select { case <-ctx.Done(): t.stop(ctx) @@ -101,7 +101,7 @@ func (t *Timer) trigger(ctx context.Context) { logger.Debugf(ctx, "trigger (%T)", t.Timer.Action) defer logger.Debugf(ctx, "/trigger (%T)", t.Timer.Action) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { err := t.StreamD.RemoveTimer(ctx, t.Timer.ID) if err != nil { logger.Error(ctx, "unable to remove timer %d: %v", t.Timer.ID, err) diff --git a/pkg/streampanel/chat.go b/pkg/streampanel/chat.go index b9b1325..0a0959a 100644 --- a/pkg/streampanel/chat.go +++ b/pkg/streampanel/chat.go @@ -50,7 +50,7 @@ func (p *Panel) addChatUI(ctx context.Context, ui chatUIInterface) { p.chatUIs = append(p.chatUIs, ui) logger.Debugf(ctx, "len(p.chatUI) == %d", len(p.chatUIs)) }) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { <-ctx.Done() p.chatUIsLocker.Do(ctx, func() { p.chatUIs = slices.DeleteFunc(p.chatUIs, func(cmp chatUIInterface) bool { @@ -72,7 +72,7 @@ func (p *Panel) initChatMessagesHandler(ctx context.Context) error { return fmt.Errorf("unable to subscribe to chat messages: %w", err) } - observability.GoSafe(ctx, func() { + observability.GoSafeRestartable(ctx, func(ctx context.Context) { p.messageReceiverLoop(ctx, msgCh) }) return nil @@ -134,7 +134,7 @@ func (p *Panel) onReceiveMessage( if !notificationsEnabled { return } - observability.GoSafe(ctx, func() { + observability.GoSafe(ctx, func(ctx context.Context) { commandTemplate := xsync.DoR1(ctx, &p.configLocker, func() string { return p.Config.Chat.CommandOnReceiveMessage }) @@ -146,7 +146,7 @@ func (p *Panel) onReceiveMessage( p.execCommand(ctx, commandTemplate, msg) }) - observability.GoSafe(ctx, func() { + observability.GoSafe(ctx, func(ctx context.Context) { logger.Debugf(ctx, "SendNotification") defer logger.Debugf(ctx, "/SendNotification") p.app.SendNotification(&fyne.Notification{ @@ -154,7 +154,7 @@ func (p *Panel) onReceiveMessage( Content: msg.Username + ": " + msg.Message, }) }) - observability.GoSafe(ctx, func() { + observability.GoSafe(ctx, func(ctx context.Context) { soundEnabled := xsync.DoR1(ctx, &p.configLocker, func() bool { return p.Config.Chat.ReceiveMessageSoundAlarmEnabled() }) diff --git a/pkg/streampanel/chat_as_list.go b/pkg/streampanel/chat_as_list.go index 2991822..d00e725 100644 --- a/pkg/streampanel/chat_as_list.go +++ b/pkg/streampanel/chat_as_list.go @@ -141,7 +141,7 @@ func (ui *chatUIAsList) Remove( delete(ui.ItemsByMessageID, msg.MessageID) delete(ui.ItemsByCanvasObject, item.Container) }) - observability.Go(ctx, func() { ui.CanvasObject.Refresh() }) // TODO: remove the observability.Go + observability.Go(ctx, func(context.Context) { ui.CanvasObject.Refresh() }) // TODO: remove the observability.Go } func (ui *chatUIAsList) GetTotalHeight( diff --git a/pkg/streampanel/chat_as_text.go b/pkg/streampanel/chat_as_text.go index 88c9f8f..8663a29 100644 --- a/pkg/streampanel/chat_as_text.go +++ b/pkg/streampanel/chat_as_text.go @@ -101,7 +101,7 @@ func (ui *chatUIAsText) init( nil, ui.Text, ) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { ui.Rebuild(ctx) }) ui.Panel.addChatUI(ctx, ui) @@ -166,7 +166,7 @@ func (ui *chatUIAsText) Remove( ui.ItemLocker.Do(ctx, func() { delete(ui.ItemsByMessageID, msg.MessageID) }) - observability.Go(ctx, func() { ui.CanvasObject.Refresh() }) // TODO: remove the observability.Go + observability.Go(ctx, func(context.Context) { ui.CanvasObject.Refresh() }) // TODO: remove the observability.Go } func (ui *chatUIAsText) GetTotalHeight( diff --git a/pkg/streampanel/dashboard.go b/pkg/streampanel/dashboard.go index 75c4f2b..ccf8f04 100644 --- a/pkg/streampanel/dashboard.go +++ b/pkg/streampanel/dashboard.go @@ -189,7 +189,7 @@ func (w *dashboardWindow) renderStreamStatus(ctx context.Context) { bwIn := float64(bytesInDiff) * 8 / tsDiff.Seconds() / 1000 bwOut := float64(bytesOutDiff) * 8 / tsDiff.Seconds() / 1000 newAppStatusText := fmt.Sprintf("%4.0fKb/s | %4.0fKb/s", bwIn, bwOut) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { w.appStatus.SetText(newAppStatusText) }) } @@ -201,7 +201,7 @@ func (w *dashboardWindow) renderStreamStatus(ctx context.Context) { w.Panel.streamStatusLocker.Do(ctx, func() { w.streamStatusLocker.Do(ctx, func() { for platID, dst := range w.streamStatus { - observability.CallSafe(ctx, func() { + observability.CallSafe(ctx, func(ctx context.Context) { src := w.Panel.streamStatus[platID] if src == nil { logger.Debugf(ctx, "status for '%s' is not set", platID) @@ -210,7 +210,7 @@ func (w *dashboardWindow) renderStreamStatus(ctx context.Context) { defer dst.Refresh() if !src.BackendIsEnabled { - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { dst.SetText("disabled") }) return @@ -218,7 +218,7 @@ func (w *dashboardWindow) renderStreamStatus(ctx context.Context) { if src.BackendError != nil { dst.Importance = widget.LowImportance - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { dst.SetText("error") }) return @@ -226,7 +226,7 @@ func (w *dashboardWindow) renderStreamStatus(ctx context.Context) { if !src.IsActive { dst.Importance = widget.DangerImportance - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { dst.SetText("stopped") }) return @@ -234,7 +234,7 @@ func (w *dashboardWindow) renderStreamStatus(ctx context.Context) { dst.Importance = widget.SuccessImportance if src.StartedAt == nil { - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { dst.SetText("started") }) return @@ -247,7 +247,7 @@ func (w *dashboardWindow) renderStreamStatus(ctx context.Context) { viewerCountString = fmt.Sprintf(" (%d)", *src.ViewersCount) } - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { dst.SetText(fmt.Sprintf("%s%s", duration.Truncate(time.Second).String(), viewerCountString)) }) }) @@ -356,7 +356,7 @@ func (p *Panel) newDashboardWindow( c.Move(pos) } w.chat.ScrollToBottom(ctx) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { time.Sleep(time.Second) w.chat.ScrollToBottom(ctx) }) @@ -758,7 +758,7 @@ func (w *dashboardWindow) startUpdatingNoLock( ctx, cancelFunc := context.WithCancel(ctx) w.stopUpdatingFunc = cancelFunc - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { t := time.NewTicker(time.Second) defer t.Stop() oldSize := w.Window.Canvas().Size() @@ -777,7 +777,7 @@ func (w *dashboardWindow) startUpdatingNoLock( } }) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { t := time.NewTicker(time.Second) defer t.Stop() oldSize := w.chat.ScrollingContainer.Content.MinSize() @@ -797,7 +797,7 @@ func (w *dashboardWindow) startUpdatingNoLock( }) w.renderLocalStatus(ctx) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { t := time.NewTicker(2 * time.Second) defer t.Stop() for { @@ -817,12 +817,12 @@ func (w *dashboardWindow) startUpdatingNoLock( return } - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { w.updateImages(ctx, cfg.Dashboard) w.updateStreamStatus(ctx) w.renderStreamStatus(ctx) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { t := time.NewTicker(250 * time.Millisecond) defer t.Stop() for { @@ -836,7 +836,7 @@ func (w *dashboardWindow) startUpdatingNoLock( } }) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { t := time.NewTicker(2 * time.Second) defer t.Stop() for { @@ -995,7 +995,7 @@ func (w *dashboardWindow) updateImagesNoLock( continue } wg.Add(1) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { defer wg.Done() img, changed, err := w.getImage(ctx, streamdconsts.ImageID(el.ElementName)) if err != nil { @@ -1025,7 +1025,7 @@ func (w *dashboardWindow) updateImagesNoLock( } wg.Add(1) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { defer wg.Done() img, changed, err := w.getImage(ctx, consts.ImageScreenshot) if err != nil { @@ -1111,7 +1111,7 @@ func (p *Panel) newDashboardSettingsWindow(ctx context.Context) { p.dashboardLocker.Do(ctx, func() { if p.dashboardWindow != nil { p.dashboardWindow.Window.Close() - observability.Go(ctx, func() { p.focusDashboardWindow(ctx) }) + observability.Go(ctx, func(ctx context.Context) { p.focusDashboardWindow(ctx) }) } }) diff --git a/pkg/streampanel/error.go b/pkg/streampanel/error.go index ac4bade..522c230 100644 --- a/pkg/streampanel/error.go +++ b/pkg/streampanel/error.go @@ -70,7 +70,7 @@ func (p *Panel) statusPanelSet(text string) { newText = "status: " + text }) if panel != nil { - observability.CallSafe(ctx, func() { + observability.CallSafe(ctx, func(ctx context.Context) { panel.SetText(newText) }) } diff --git a/pkg/streampanel/events.go b/pkg/streampanel/events.go index a174936..a456f84 100644 --- a/pkg/streampanel/events.go +++ b/pkg/streampanel/events.go @@ -30,7 +30,7 @@ func (p *Panel) initEventSensor(ctx context.Context) { p.eventSensor = es - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { logger.Debugf(ctx, "eventSensor") defer logger.Debugf(ctx, "/eventSensor") es.Loop(ctx, p.StreamD) diff --git a/pkg/streampanel/fyne_hacks.go b/pkg/streampanel/fyne_hacks.go index 5984d3c..313e13a 100644 --- a/pkg/streampanel/fyne_hacks.go +++ b/pkg/streampanel/fyne_hacks.go @@ -15,7 +15,7 @@ func (p *Panel) initFyneHacks(ctx context.Context) { } func (p *Panel) initWindowsHealthChecker(ctx context.Context) { - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { logger.Debugf(ctx, "initWindowsHealthChecker") defer logger.Debugf(ctx, "/initWindowsHealthChecker") @@ -30,7 +30,6 @@ func (p *Panel) initWindowsHealthChecker(ctx context.Context) { p.checkAndFixWindowsHealth(ctx) } } - }) } @@ -81,7 +80,7 @@ func checkAndFixPermanentWindowHealth( logger.Warnf(ctx, "window %v has a broken event queue, fixing") window.InitEventQueue() - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { window.RunEventQueue() }) } diff --git a/pkg/streampanel/image.go b/pkg/streampanel/image.go index b76c8fc..f5ea991 100644 --- a/pkg/streampanel/image.go +++ b/pkg/streampanel/image.go @@ -312,7 +312,7 @@ func (p *Panel) reinitScreenshoter(ctx context.Context) { ctx, cancelFunc := context.WithCancel(ctx) p.screenshoterClose = cancelFunc - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { p.Screenshoter.Loop( ctx, 200*time.Millisecond, diff --git a/pkg/streampanel/monitor.go b/pkg/streampanel/monitor.go index 294abb6..0989746 100644 --- a/pkg/streampanel/monitor.go +++ b/pkg/streampanel/monitor.go @@ -79,7 +79,7 @@ func (p *Panel) startMonitorPage( p.monitorPageLocker.Do(ctx, func() { if p.monitorPage == nil { - observability.Go(ctx, func() { // TODO: get rid of this ugliness + observability.Go(ctx, func(ctx context.Context) { // TODO: get rid of this ugliness t := time.NewTicker(100 * time.Millisecond) defer t.Stop() for { @@ -220,7 +220,7 @@ func (p *monitorPage) startUpdatingNoLock( return } - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { updateData := func() { inStreams, err := streamD.ListIncomingStreams(ctx) if err != nil { @@ -534,7 +534,7 @@ func (w streamDAsStreamPlayersServerType) WaitPublisherChan( } result := make(chan streamplayer.Publisher) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { defer close(result) select { case <-ctx.Done(): diff --git a/pkg/streampanel/panel.go b/pkg/streampanel/panel.go index e337213..05e409c 100644 --- a/pkg/streampanel/panel.go +++ b/pkg/streampanel/panel.go @@ -282,7 +282,7 @@ func (p *Panel) Loop(ctx context.Context, opts ...LoopOption) (_err error) { closeLoadingWindow := func() { logger.Tracef(ctx, "closing the loading window") loadingWindow.Hide() - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { time.Sleep(10 * time.Millisecond) loadingWindow.Hide() time.Sleep(100 * time.Millisecond) @@ -292,7 +292,7 @@ func (p *Panel) Loop(ctx context.Context, opts ...LoopOption) (_err error) { }) } - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { if streamD, ok := p.StreamD.(*client.Client); ok { p.setStatusFunc("Connecting...") err := p.startOAuthListenerForRemoteStreamD(ctx, streamD) @@ -314,7 +314,7 @@ func (p *Panel) Loop(ctx context.Context, opts ...LoopOption) (_err error) { streamD := p.StreamD.(*streamd.StreamD) streamD.AddOAuthListenPort(cfg.OAuth.ListenPorts.Twitch) streamD.AddOAuthListenPort(cfg.OAuth.ListenPorts.Kick) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { <-ctx.Done() streamD.RemoveOAuthListenPort(cfg.OAuth.ListenPorts.Twitch) streamD.RemoveOAuthListenPort(cfg.OAuth.ListenPorts.Kick) @@ -355,7 +355,7 @@ func (p *Panel) Loop(ctx context.Context, opts ...LoopOption) (_err error) { } if initCfg.AutoUpdater != nil { - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { p.checkForUpdates(ctx, initCfg.AutoUpdater) }) } @@ -532,7 +532,7 @@ func (p *Panel) startOAuthListenerForRemoteStreamD( } logger.Debugf(ctx, "started oauth listener for the remote streamd") - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { logger.Debugf(ctx, "oauthListenerForRemoteStreamD") defer logger.Debugf(ctx, "/oauthListenerForRemoteStreamD") defer cancelFn() @@ -1153,7 +1153,7 @@ func (p *Panel) getUpdatedStatus_backends_noLock(ctx context.Context) { } if backendEnabled[obs.ID] { - observability.Call(ctx, func() { + observability.Call(ctx, func(ctx context.Context) { obsServer, obsServerClose, err := p.StreamD.OBS(ctx) if obsServerClose != nil { defer obsServerClose() @@ -1685,7 +1685,7 @@ func (p *Panel) initMainWindow( p.dashboardShowHideButton = widget.NewButtonWithIcon("Open", theme.ComputerIcon(), func() { p.dashboardLocker.Do(ctx, func() { if p.dashboardWindow == nil { - observability.Go(ctx, func() { p.focusDashboardWindow(ctx) }) + observability.Go(ctx, func(ctx context.Context) { p.focusDashboardWindow(ctx) }) } else { p.dashboardWindow.Window.Close() } @@ -1879,7 +1879,7 @@ func (p *Panel) subscribeUpdateControlPage(ctx context.Context) { p.getUpdatedStatus(ctx) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { t := time.NewTicker(time.Second * 5) defer t.Stop() for { @@ -1918,7 +1918,7 @@ func (p *Panel) execCommand( var stdout, stderr bytes.Buffer cmd.Stdout = &stdout cmd.Stderr = &stderr - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { err := cmd.Run() if err == nil { err = child_process_manager.AddChildProcess(cmd.Process) @@ -2050,7 +2050,7 @@ func (p *Panel) setupStreamNoLock(ctx context.Context) { // in the browser, then the stream does not want to start. // // And here we wait until the hack with opening the page will complete. - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { waitFor := 15 * time.Second deadline := time.Now().Add(waitFor) @@ -2077,7 +2077,7 @@ func (p *Panel) setupStreamNoLock(ctx context.Context) { func (p *Panel) startStream(ctx context.Context) { p.streamMutex.ManualLock(ctx) defer func() { - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { time.Sleep(10 * time.Second) // TODO: remove this p.streamMutex.ManualUnlock(ctx) }) @@ -2133,11 +2133,11 @@ func (p *Panel) afterStreamStart(ctx context.Context) { p.execCommand(ctx, onStreamStart, nil) } - observability.Go(ctx, func() { p.openStreamStartedWindow(ctx) }) + observability.Go(ctx, func(ctx context.Context) { p.openStreamStartedWindow(ctx) }) } func (p *Panel) stopStream(ctx context.Context) { - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { p.streamStartedLocker.Do(ctx, func() { if p.streamStartedWindow != nil { p.streamStartedWindow.Close() @@ -2297,7 +2297,7 @@ const aggregationDelayBeforeNotificationEnd = 100 * time.Millisecond func (p *Panel) showWaitStreamDCallWindow(ctx context.Context) { atomic.AddInt32(&p.waitStreamDCallWindowCounter, 1) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { defer func() { <-ctx.Done() p.waitStreamDCallWindowLocker.Do(ctx, func() { @@ -2326,7 +2326,7 @@ func (p *Panel) showWaitStreamDCallWindow(ctx context.Context) { func (p *Panel) showWaitStreamDConnectWindow(ctx context.Context) { atomic.AddInt32(&p.waitStreamDConnectWindowCounter, 1) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { defer func() { <-ctx.Done() p.waitStreamDConnectWindowLocker.Do(ctx, func() { @@ -2361,7 +2361,9 @@ func (p *Panel) Close() (_err error) { err = multierror.Append(err, p.eventSensor.Close()) // TODO: remove observability.Go, Quit should be executed synchronously, // but there is a bug in fyne and it hangs - observability.Go(context.TODO(), p.app.Quit) + observability.Go(context.TODO(), func(ctx context.Context) { + p.app.Quit() + }) return err.ErrorOrNil() } @@ -2424,7 +2426,7 @@ func (p *Panel) localConfigCacheUpdater(ctx context.Context) (_err error) { p.configCache = newCfg }) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { logger.Debugf(ctx, "localConfigUpdaterLoop") defer logger.Debugf(ctx, "/localConfigUpdaterLoop") diff --git a/pkg/streampanel/profile.go b/pkg/streampanel/profile.go index 53d6e6f..fb43e2c 100644 --- a/pkg/streampanel/profile.go +++ b/pkg/streampanel/profile.go @@ -307,7 +307,7 @@ func (p *Panel) profileWindow( for _, cat := range dataTwitch.Cache.Categories { if cleanTwitchCategoryName(cat.Name) == text { setSelectedTwitchCategory(cat.Name) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { time.Sleep(100 * time.Millisecond) twitchCategory.SetText("") }) @@ -409,7 +409,7 @@ func (p *Panel) profileWindow( text = cleanKickCategoryName(text) cat := catN[text] setSelectedKickCategory(cat.ID) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { time.Sleep(100 * time.Millisecond) kickCategory.SetText("") }) @@ -516,7 +516,7 @@ func (p *Panel) profileWindow( for _, bc := range dataYouTube.Cache.Broadcasts { if cleanYoutubeRecordingName(bc.Snippet.Title) == text { setSelectedYoutubeBroadcast(bc) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { time.Sleep(100 * time.Millisecond) youtubeTemplate.SetText("") }) diff --git a/pkg/streampanel/restream.go b/pkg/streampanel/restream.go index 9bb34c1..a051eb9 100644 --- a/pkg/streampanel/restream.go +++ b/pkg/streampanel/restream.go @@ -48,7 +48,7 @@ func (p *Panel) initRestreamPage( logger.Debugf(ctx, "initRestreamPage") defer logger.Debugf(ctx, "/initRestreamPage") - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { updateData := func() { inStreams, err := p.StreamD.ListIncomingStreams(ctx) if err != nil { @@ -70,7 +70,7 @@ func (p *Panel) initRestreamPage( } }) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { updateData := func() { streamServers, err := p.StreamD.ListStreamServers(ctx) if err != nil { @@ -92,7 +92,7 @@ func (p *Panel) initRestreamPage( } }) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { updateData := func() { dsts, err := p.StreamD.ListStreamDestinations(ctx) if err != nil { @@ -114,7 +114,7 @@ func (p *Panel) initRestreamPage( } }) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { updateData := func() { streamFwds, err := p.StreamD.ListStreamForwards(ctx) if err != nil { @@ -136,7 +136,7 @@ func (p *Panel) initRestreamPage( } }) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { updateData := func() { streamPlayers, err := p.StreamD.ListStreamPlayers(ctx) if err != nil { @@ -1548,11 +1548,11 @@ func (p *Panel) streamServersUpdater( ctx context.Context, ) context.CancelFunc { ctx, cancelFn := context.WithCancel(ctx) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { logger.Debugf(ctx, "streamServersUpdater: updater") defer logger.Debugf(ctx, "streamServersUpdater: /updater") - updateData := func() { + updateData := func(ctx context.Context) { streamServers, err := p.StreamD.ListStreamServers(ctx) if err != nil { p.DisplayError(err) @@ -1579,7 +1579,7 @@ func (p *Panel) streamServersUpdater( return case <-t.C: } - updateData() + updateData(ctx) } }) return cancelFn @@ -1589,11 +1589,11 @@ func (p *Panel) startStreamPlayersUpdater( ctx context.Context, ) context.CancelFunc { ctx, cancelFn := context.WithCancel(ctx) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { logger.Debugf(ctx, "startStreamPlayersUpdater: updater") defer logger.Debugf(ctx, "startStreamPlayersUpdater: /updater") - updateData := func() { + updateData := func(ctx context.Context) { streamPlayers, err := p.StreamD.ListStreamPlayers(ctx) if err != nil { p.DisplayError(err) @@ -1620,7 +1620,7 @@ func (p *Panel) startStreamPlayersUpdater( return case <-t.C: } - updateData() + updateData(ctx) } }) return cancelFn @@ -1630,11 +1630,11 @@ func (p *Panel) startStreamForwardersUpdater( ctx context.Context, ) context.CancelFunc { ctx, cancelFn := context.WithCancel(ctx) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { logger.Debugf(ctx, "startStreamForwardersUpdater: updater") defer logger.Debugf(ctx, "startStreamForwardersUpdater: /updater") - updateData := func() { + updateData := func(ctx context.Context) { streamFwds, err := p.StreamD.ListStreamForwards(ctx) if err != nil { p.DisplayError(err) @@ -1661,7 +1661,7 @@ func (p *Panel) startStreamForwardersUpdater( return case <-t.C: } - updateData() + updateData(ctx) } }) return cancelFn diff --git a/pkg/streampanel/stream_started_window.go b/pkg/streampanel/stream_started_window.go index 2cae7eb..ef9f833 100644 --- a/pkg/streampanel/stream_started_window.go +++ b/pkg/streampanel/stream_started_window.go @@ -135,7 +135,7 @@ func (w *streamStartedWindow) updateStreamStatusLoop( var wg sync.WaitGroup wg.Add(1) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { defer wg.Done() t := time.NewTicker(time.Second) defer t.Stop() @@ -150,7 +150,7 @@ func (w *streamStartedWindow) updateStreamStatusLoop( }) wg.Add(1) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { defer wg.Done() t := time.NewTicker(time.Second) defer t.Stop() @@ -198,7 +198,7 @@ func (w *streamStartedWindow) open( } ctx, cancelFn := context.WithCancel(ctx) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { w.updateStreamStatusLoop(ctx) }) diff --git a/pkg/streampanel/stream_status.go b/pkg/streampanel/stream_status.go index 199af41..b8ffdaf 100644 --- a/pkg/streampanel/stream_status.go +++ b/pkg/streampanel/stream_status.go @@ -37,7 +37,7 @@ func (p *Panel) updateStreamStatus( kick.ID, } { wg.Add(1) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { defer wg.Done() ok, err := p.StreamD.IsBackendEnabled(ctx, platID) diff --git a/pkg/streampanel/streamd.go b/pkg/streampanel/streamd.go index 230d02e..7c09f34 100644 --- a/pkg/streampanel/streamd.go +++ b/pkg/streampanel/streamd.go @@ -145,7 +145,7 @@ func initGRPCServers( } obsGRPC, obsGRPCClose, err := streamD.OBS(ctx) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { <-ctx.Done() if obsGRPCClose != nil { obsGRPCClose() diff --git a/pkg/streampanel/timers.go b/pkg/streampanel/timers.go index 1520723..a7d6280 100644 --- a/pkg/streampanel/timers.go +++ b/pkg/streampanel/timers.go @@ -89,7 +89,7 @@ func (ui *timersUI) StartRefreshingFromRemote( ui.refresherCancelFunc = cancelFn ui.refreshFromRemote(ctx) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { t := time.NewTicker(time.Second * 5) defer t.Stop() for { @@ -310,7 +310,7 @@ func (ui *timersUI) kickOff( ctx, ui.timerCancelFunc = context.WithCancel(ctx) ui.timer = time.NewTimer(time.Until(deadline)) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { defer func() { ui.doStop(xcontext.DetachDone(ctx)) }() diff --git a/pkg/streampanel/trigger_rules.go b/pkg/streampanel/trigger_rules.go index 013e892..409212f 100644 --- a/pkg/streampanel/trigger_rules.go +++ b/pkg/streampanel/trigger_rules.go @@ -74,7 +74,7 @@ func (ui *triggerRulesUI) openSetupWindow(ctx context.Context) { if err != nil { return err } - observability.Go(ctx, func() { refreshContent() }) + observability.Go(ctx, func(ctx context.Context) { refreshContent() }) return nil }, ) @@ -92,7 +92,7 @@ func (ui *triggerRulesUI) openSetupWindow(ctx context.Context) { ui.panel.DisplayError(err) return } - observability.Go(ctx, func() { refreshContent() }) + observability.Go(ctx, func(ctx context.Context) { refreshContent() }) }, ui.panel.mainWindow, ) @@ -117,7 +117,7 @@ func (ui *triggerRulesUI) openSetupWindow(ctx context.Context) { if err != nil { return err } - observability.Go(ctx, func() { refreshContent() }) + observability.Go(ctx, func(ctx context.Context) { refreshContent() }) return nil }, ) diff --git a/pkg/streampanel/update_timer_handler.go b/pkg/streampanel/update_timer_handler.go index 0dc261c..7333ba8 100644 --- a/pkg/streampanel/update_timer_handler.go +++ b/pkg/streampanel/update_timer_handler.go @@ -9,7 +9,6 @@ import ( ) type updateTimerHandler struct { - ctx context.Context cancelFn context.CancelFunc startStopButton *widget.Button startTS time.Time @@ -21,7 +20,6 @@ func newUpdateTimerHandler( ) *updateTimerHandler { ctx, cancelFn := context.WithCancel(context.Background()) h := &updateTimerHandler{ - ctx: ctx, cancelFn: cancelFn, startStopButton: startStopButton, startTS: startedAt, @@ -39,12 +37,12 @@ func (h *updateTimerHandler) GetStartTS() time.Time { return h.startTS } -func (h *updateTimerHandler) loop() { +func (h *updateTimerHandler) loop(ctx context.Context) { t := time.NewTicker(time.Second) defer t.Stop() for { select { - case <-h.ctx.Done(): + case <-ctx.Done(): return case <-t.C: timePassed := time.Since(h.startTS).Truncate(time.Second) diff --git a/pkg/streampanel/widget_hint.go b/pkg/streampanel/widget_hint.go index d133502..916bc03 100644 --- a/pkg/streampanel/widget_hint.go +++ b/pkg/streampanel/widget_hint.go @@ -66,7 +66,7 @@ func (w *HintWidget) mouseIn(ev *desktop.MouseEvent) { panic("should not have happened") } w.RecheckerCancelFn = cancelFn - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { for { select { case <-ctx.Done(): diff --git a/pkg/streamplayer/stream_player.go b/pkg/streamplayer/stream_player.go index c734551..3d50423 100644 --- a/pkg/streamplayer/stream_player.go +++ b/pkg/streamplayer/stream_player.go @@ -240,7 +240,7 @@ func (p *StreamPlayerHandler) startU(ctx context.Context) error { p.Player = player logger.Debugf(ctx, "initialized player %#+v", player) - observability.Go(ctx, func() { p.controllerLoop(ctx, cancelFn) }) + observability.Go(ctx, func(ctx context.Context) { p.controllerLoop(ctx, cancelFn) }) return nil } @@ -351,7 +351,7 @@ func (p *StreamPlayerHandler) startObserver( url *url.URL, restartFn context.CancelFunc, ) { - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { defer restartFn() logger.Debugf(ctx, "observer started") defer func() { logger.Debugf(ctx, "observer ended") }() @@ -468,7 +468,7 @@ func (p *StreamPlayerHandler) openStream( ctx, cancelFn := context.WithTimeout(ctx, openURLTimeout) defer cancelFn() var once sync.Once - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { <-ctx.Done() once.Do(func() { logger.Errorf(ctx, "timed out, unable to open the URL '%s' within the timeout of %s", u, openURLTimeout) @@ -535,7 +535,7 @@ func (p *StreamPlayerHandler) controllerLoop( return } isClosed = true - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { p.PlayerLocker.Do(ctx, func() { err := p.restartU(ctx) errmon.ObserveErrorCtx(ctx, err) @@ -646,7 +646,7 @@ func (p *StreamPlayerHandler) controllerLoop( if !triedToFixEmptyLinkViaReopen { if link, err := player.GetLink(ctx); link == "" { logger.Debugf(ctx, "the link is empty for some reason, reopening the link (BTW, err if any is: %v)", err) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { if err := p.openStream(ctx, restart); err != nil { logger.Errorf(ctx, "unable to open link '%s': %v", link, err) } @@ -704,7 +704,7 @@ func (p *StreamPlayerHandler) controllerLoop( restart() return false } - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { if err := p.openStream(ctx, restart); err != nil { logger.Error(ctx, "unable to re-open the stream: %v", err) restart() @@ -748,7 +748,7 @@ func (p *StreamPlayerHandler) controllerLoop( cancelFn() return } else { - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { select { case <-closeChan: logger.Warnf(ctx, "the player is apparently closed, restarting it") @@ -773,7 +773,7 @@ func (p *StreamPlayerHandler) controllerLoop( logger.Error(ctx, "unable to access the player for setting it up for streaming: %v", err) } - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { time.Sleep(time.Second) // TODO: delete this ugly racy hack p.notifyStart(context.WithValue(ctx, CtxKeyStreamPlayer, p)) }) diff --git a/pkg/streamserver/implementations/mediamtx/streamserver/stream_server.go b/pkg/streamserver/implementations/mediamtx/streamserver/stream_server.go index 24e15f8..db45738 100644 --- a/pkg/streamserver/implementations/mediamtx/streamserver/stream_server.go +++ b/pkg/streamserver/implementations/mediamtx/streamserver/stream_server.go @@ -115,7 +115,7 @@ func (s *StreamServer) init( for _, srv := range cfg.PortServers { { srv := srv - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { s.mutex.Do(ctx, func() { _, err := s.startServer(ctx, srv.Type, srv.ListenAddr, srv.Options()...) if err != nil { @@ -392,7 +392,7 @@ func (s *StreamServer) WaitPublisherChan( appKey := types.AppKey(streamID) ch := make(chan types.Publisher, 1) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { var curPublisher *PublisherClosedNotifier if waitForNext { curPublisher = xsync.DoR1( diff --git a/pkg/streamserver/implementations/xaionaro-go-rtmp/streamserver/stream_server.go b/pkg/streamserver/implementations/xaionaro-go-rtmp/streamserver/stream_server.go index 2d171a8..8f8d8e6 100644 --- a/pkg/streamserver/implementations/xaionaro-go-rtmp/streamserver/stream_server.go +++ b/pkg/streamserver/implementations/xaionaro-go-rtmp/streamserver/stream_server.go @@ -87,7 +87,7 @@ func (s *StreamServer) init( for _, srv := range cfg.PortServers { { srv := srv - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { _, err := s.startServer(ctx, srv.Type, srv.ListenAddr) if err != nil { logger.Errorf( @@ -238,7 +238,7 @@ func (s *StreamServer) startServer( } }, }) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { err = portSrv.Serve(listener) if err != nil { err = fmt.Errorf( @@ -374,7 +374,7 @@ func (s *StreamServer) WaitPublisherChan( ) (<-chan types.Publisher, error) { ch := make(chan types.Publisher, 1) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { ch <- s.RelayService.WaitPubsub(ctx, types.StreamID2LocalAppName(streamID), waitForNext) close(ch) }) diff --git a/pkg/streamserver/streamforward/stream_forward.go b/pkg/streamserver/streamforward/stream_forward.go index bca87c8..88aab10 100644 --- a/pkg/streamserver/streamforward/stream_forward.go +++ b/pkg/streamserver/streamforward/stream_forward.go @@ -108,7 +108,7 @@ func (fwd *ActiveStreamForwarding) start(ctx context.Context) (_err error) { } ctx, cancelFn := context.WithCancel(ctx) fwd.cancelFunc = cancelFn - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { for { err := fwd.waitForPublisherAndStart( ctx, @@ -182,7 +182,7 @@ func (fwd *ActiveStreamForwarding) waitForPublisherAndStart( ctx, cancelFn := context.WithCancel(ctx) defer cancelFn() - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { defer cancelFn() select { case <-ctx.Done(): @@ -322,7 +322,7 @@ func (fwd *ActiveStreamForwarding) waitForPublisherAndStart( return err } - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { t := time.NewTicker(time.Second) defer t.Stop() for { @@ -429,7 +429,7 @@ func (fwd *ActiveStreamForwarding) killRecodingProcess( } resultCh := make(chan error, 1) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { defer func() { close(resultCh) }() diff --git a/pkg/streamserver/streamforward/stream_forwards.go b/pkg/streamserver/streamforward/stream_forwards.go index 79c98c4..7ccf3d3 100644 --- a/pkg/streamserver/streamforward/stream_forwards.go +++ b/pkg/streamserver/streamforward/stream_forwards.go @@ -315,7 +315,7 @@ func (s *StreamForwards) newActiveStreamForward( result.ActiveForwarding = fwd if quirks.RestartUntilYoutubeRecognizesStream.Enabled { - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { s.restartUntilYoutubeRecognizesStream( ctx, result, diff --git a/pkg/streamserver/streamplayers/stream_players.go b/pkg/streamserver/streamplayers/stream_players.go index 3c5f3f3..ff659d9 100644 --- a/pkg/streamserver/streamplayers/stream_players.go +++ b/pkg/streamserver/streamplayers/stream_players.go @@ -45,7 +45,7 @@ func (s *StreamPlayers) Init( ) error { initCfg := types.InitOptions(opts).Config() s.WithConfig(ctx, func(ctx context.Context, cfg *types.Config) { - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { var opts SetupOptions if initCfg.DefaultStreamPlayerOptions != nil { opts = append( @@ -175,7 +175,7 @@ func setupStreamPlayers( } ch := make(chan struct{}) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { select { case <-ctx.Done(): return diff --git a/pkg/windowmanagerhandler/window_manager_handler_linux_xserver.go b/pkg/windowmanagerhandler/window_manager_handler_linux_xserver.go index a1c927a..dd73052 100644 --- a/pkg/windowmanagerhandler/window_manager_handler_linux_xserver.go +++ b/pkg/windowmanagerhandler/window_manager_handler_linux_xserver.go @@ -37,7 +37,7 @@ func (wmh *XWindowManagerHandler) WindowFocusChangeChan(ctx context.Context) <-c logger.Debugf(ctx, "WindowFocusChangeChan") ch := make(chan WindowFocusChange) - observability.Go(ctx, func() { + observability.Go(ctx, func(ctx context.Context) { defer logger.Debugf(ctx, "/WindowFocusChangeChan") defer func() { close(ch)