diff --git a/cmd/streamd/main.go b/cmd/streamd/main.go index 1458d5d..1637127 100644 --- a/cmd/streamd/main.go +++ b/cmd/streamd/main.go @@ -69,14 +69,17 @@ func main() { } } + ctx := context.Background() + ctx = logger.CtxWithLogger(ctx, l) + if *netPprofAddr != "" || (forceNetPProfOnAndroid && runtime.GOOS == "android") { - go func() { + observability.Go(ctx, func() { if *netPprofAddr == "" { *netPprofAddr = "localhost:0" } l.Infof("starting to listen for net/pprof requests at '%s'", *netPprofAddr) l.Error(http.ListenAndServe(*netPprofAddr, nil)) - }() + }) } if oldValue := runtime.GOMAXPROCS(0); oldValue < 16 { @@ -84,9 +87,6 @@ func main() { runtime.GOMAXPROCS(16) } - ctx := context.Background() - ctx = logger.CtxWithLogger(ctx, l) - if *sentryDSN != "" { l.Infof("setting up Sentry at DSN '%s'", *sentryDSN) sentryClient, err := sentry.NewClient(sentry.ClientOptions{ @@ -154,21 +154,21 @@ func main() { l.Fatalf("unable to initialize the streamd instance: %v", err) } - go func() { + observability.Go(ctx, func() { if err = streamD.Run(ctx); err != nil { l.Errorf("streamd returned an error: %v", err) } - }() + }) listener, err := net.Listen("tcp", *listenAddr) if err != nil { log.Fatalf("failed to listen: %v", err) } - go func() { + observability.Go(ctx, func() { <-ctx.Done() listener.Close() - }() + }) grpcServer := grpc.NewServer() streamdGRPC = server.NewGRPCServer(streamD) diff --git a/cmd/streampanel/FyneApp.toml b/cmd/streampanel/FyneApp.toml index f94f738..86e5571 100644 --- a/cmd/streampanel/FyneApp.toml +++ b/cmd/streampanel/FyneApp.toml @@ -5,4 +5,4 @@ Website = "https://github.com/xaionaro/streamctl" Name = "streampanel" ID = "center.dx.streampanel" Version = "0.1.0" - Build = 84 + Build = 85 diff --git a/cmd/streampanel/fork.go b/cmd/streampanel/fork.go index 780dab8..9711436 100644 --- a/cmd/streampanel/fork.go +++ b/cmd/streampanel/fork.go @@ -13,6 +13,7 @@ import ( "github.com/facebookincubator/go-belt" "github.com/facebookincubator/go-belt/tool/logger" "github.com/xaionaro-go/streamctl/pkg/mainprocess" + "github.com/xaionaro-go/streamctl/pkg/observability" ) type ProcessName = mainprocess.ProcessName @@ -140,7 +141,7 @@ func runSplitProcesses( } return nil }) - go func() { + observability.Go(ctx, func() { select { case <-ctx.Done(): return @@ -151,7 +152,7 @@ func runSplitProcesses( if err != nil { logger.Fatalf(ctx, "%s", err) } - }() + }) <-ctx.Done() } @@ -187,12 +188,12 @@ func runFork( if err != nil { return fmt.Errorf("unable to start '%s %s': %w", args[0], strings.Join(args[1:], " "), err) } - go func() { + observability.Go(ctx, func() { err := cmd.Wait() if err != nil { logger.Errorf(ctx, "error running '%s %s': %v", args[0], strings.Join(args[1:], " "), err) } - }() + }) return nil } diff --git a/cmd/streampanel/main.go b/cmd/streampanel/main.go index c4b0060..7850b00 100644 --- a/cmd/streampanel/main.go +++ b/cmd/streampanel/main.go @@ -11,6 +11,7 @@ import ( "github.com/facebookincubator/go-belt" "github.com/facebookincubator/go-belt/tool/logger" "github.com/xaionaro-go/streamctl/pkg/mainprocess" + "github.com/xaionaro-go/streamctl/pkg/observability" "github.com/xaionaro-go/streamctl/pkg/streamcontrol" "github.com/xaionaro-go/streamctl/pkg/streamd/grpc/go/streamd_grpc" "github.com/xaionaro-go/streamctl/pkg/streampanel" @@ -98,7 +99,7 @@ func runPanel( if mainProcess != nil { setReadyFor(ctx, mainProcess, StreamDDied{}, UpdateStreamDConfig{}) - go func() { + observability.Go(ctx, func() { err := mainProcess.Serve( ctx, func(ctx context.Context, source mainprocess.ProcessName, content any) error { @@ -124,7 +125,7 @@ func runPanel( }, ) logger.Fatalf(ctx, "communication (with the main process) error: %v", err) - }() + }) } var loopOpts []streampanel.LoopOption diff --git a/cmd/streampanel/runtime.go b/cmd/streampanel/runtime.go index b748fad..a322f8c 100644 --- a/cmd/streampanel/runtime.go +++ b/cmd/streampanel/runtime.go @@ -11,6 +11,7 @@ import ( "time" "github.com/facebookincubator/go-belt/tool/logger" + "github.com/xaionaro-go/streamctl/pkg/observability" ) func initRuntime(ctx context.Context, flags Flags, _procName ProcessName) context.CancelFunc { @@ -20,7 +21,7 @@ func initRuntime(ctx context.Context, flags Flags, _procName ProcessName) contex l := logger.FromCtx(ctx) if ForceDebug { - go func() { + observability.Go(ctx, func() { t := time.NewTicker(time.Second) defer t.Stop() for { @@ -33,7 +34,7 @@ func initRuntime(ctx context.Context, flags Flags, _procName ProcessName) contex l.Tracef("stacktraces:\n%s", buf.String()) <-t.C } - }() + }) } if flags.CPUProfile != "" { @@ -78,10 +79,10 @@ func initRuntime(ctx context.Context, flags Flags, _procName ProcessName) contex } if netPprofAddr != "" { - go func() { + observability.Go(ctx, func() { l.Infof("starting to listen for net/pprof requests at '%s'", netPprofAddr) l.Error(http.ListenAndServe(netPprofAddr, nil)) - }() + }) } if oldValue := runtime.GOMAXPROCS(0); oldValue < 16 { diff --git a/cmd/streampanel/signal_handler.go b/cmd/streampanel/signal_handler.go index 16764ee..71811d5 100644 --- a/cmd/streampanel/signal_handler.go +++ b/cmd/streampanel/signal_handler.go @@ -6,6 +6,7 @@ import ( "os/signal" "github.com/facebookincubator/go-belt/tool/logger" + "github.com/xaionaro-go/streamctl/pkg/observability" ) func signalHandler( @@ -13,7 +14,7 @@ func signalHandler( ) chan<- os.Signal { c := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt) - go func() { + observability.Go(ctx, func() { for range c { forkLocker.Lock() for name, f := range forkMap { @@ -26,6 +27,6 @@ func signalHandler( forkLocker.Unlock() os.Exit(1) } - }() + }) return c } diff --git a/cmd/streampanel/streamd.go b/cmd/streampanel/streamd.go index 94e9ab7..6e8b668 100644 --- a/cmd/streampanel/streamd.go +++ b/cmd/streampanel/streamd.go @@ -14,6 +14,7 @@ import ( "github.com/facebookincubator/go-belt/tool/logger" "github.com/xaionaro-go/streamctl/cmd/streamd/ui" "github.com/xaionaro-go/streamctl/pkg/mainprocess" + "github.com/xaionaro-go/streamctl/pkg/observability" "github.com/xaionaro-go/streamctl/pkg/streamcontrol" "github.com/xaionaro-go/streamctl/pkg/streamd" "github.com/xaionaro-go/streamctl/pkg/streamd/api" @@ -139,7 +140,7 @@ func runStreamd( if mainProcess != nil { logger.Debugf(ctx, "starting the IPC server") setReadyFor(ctx, mainProcess, GetStreamdAddress{}, RequestStreamDConfig{}) - go func() { + observability.Go(ctx, func() { err := mainProcess.Serve( ctx, func( @@ -169,7 +170,7 @@ func runStreamd( }, ) logger.Fatalf(ctx, "communication (with the main process) error: %v", err) - }() + }) } err = streamD.Run(ctx) @@ -198,23 +199,23 @@ func initGRPCServer( if err != nil { logger.Fatalf(ctx, "failed to listen: %v", err) } - go func() { + observability.Go(ctx, func() { <-ctx.Done() listener.Close() - }() + }) grpcServer := grpc.NewServer() streamdGRPC := server.NewGRPCServer(streamD) streamd_grpc.RegisterStreamDServer(grpcServer, streamdGRPC) // start the server: - go func() { + observability.Go(ctx, func() { logger.Infof(ctx, "started server at %s", listener.Addr().String()) err = grpcServer.Serve(listener) if err != nil { logger.Fatal(ctx, err) } - }() + }) return listener, grpcServer, streamdGRPC } diff --git a/pkg/hyprspace/up.go b/pkg/hyprspace/up.go index b394cb5..45288c7 100644 --- a/pkg/hyprspace/up.go +++ b/pkg/hyprspace/up.go @@ -33,6 +33,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multiaddr" "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/xaionaro-go/streamctl/pkg/observability" "github.com/yl2chen/cidranger" ) @@ -160,9 +161,9 @@ func UpRun( if ok { metricsTuple := fmt.Sprintf("127.0.0.1:%s", metricsPort) http.Handle("/metrics", promhttp.Handler()) - go func() { + observability.Go(ctx, func() { http.ListenAndServe(metricsTuple, nil) - }() + }) fmt.Printf("[+] Listening for metrics scrape requests on http://%s/metrics\n", metricsTuple) } diff --git a/pkg/mainprocess/client.go b/pkg/mainprocess/client.go index e2a2aa2..8ec6672 100644 --- a/pkg/mainprocess/client.go +++ b/pkg/mainprocess/client.go @@ -8,6 +8,7 @@ import ( "sync" "github.com/facebookincubator/go-belt/tool/logger" + "github.com/xaionaro-go/streamctl/pkg/observability" ) type Client struct { @@ -86,13 +87,13 @@ func (c *Client) Serve( ) error { ctx, cancelFn := context.WithCancel(ctx) defer cancelFn() - go func() { + observability.Go(ctx, func() { <-ctx.Done() err := c.Close() if err != nil { logger.Error(ctx, err) } - }() + }) for { select { diff --git a/pkg/mainprocess/main_process.go b/pkg/mainprocess/main_process.go index 948ccb9..ad8a577 100644 --- a/pkg/mainprocess/main_process.go +++ b/pkg/mainprocess/main_process.go @@ -14,6 +14,7 @@ import ( "github.com/hashicorp/go-multierror" "github.com/immune-gmbh/attestation-sdk/pkg/lockmap" "github.com/sethvargo/go-password/password" + "github.com/xaionaro-go/streamctl/pkg/observability" ) func init() { @@ -119,13 +120,13 @@ func (m *Manager) Serve( ctx, cancelFn := context.WithCancel(ctx) defer cancelFn() - go func() { + observability.Go(ctx, func() { <-ctx.Done() err := m.Close() if err != nil { logger.Error(ctx, err) } - }() + }) if m.LaunchClient != nil { for _, name := range m.allClientProcesses { @@ -159,9 +160,9 @@ func (m *Manager) addNewConnection( conn net.Conn, onReceivedMessage OnReceivedMessageFunc, ) { - go func() { + observability.Go(ctx, func() { m.handleConnection(ctx, conn, onReceivedMessage) - }() + }) } func (m *Manager) handleConnection( @@ -174,10 +175,10 @@ func (m *Manager) handleConnection( defer func() { logger.Tracef(ctx, "/handleConnection from %s (%s)", conn.RemoteAddr(), regMessage.Source) }() ctx, cancelFn := context.WithCancel(ctx) - go func() { + observability.Go(ctx, func() { <-ctx.Done() conn.Close() - }() + }) defer cancelFn() encoder := gob.NewEncoder(conn) @@ -276,11 +277,11 @@ func (m *Manager) processMessage( err = multierror.Append(err, onReceivedMessage(ctx, source, message.Content)) errCh := make(chan error) - go func() { + observability.Go(ctx, func() { for e := range errCh { err = multierror.Append(err, e) } - }() + }) for _, dst := range m.allClientProcesses { if dst == source { continue @@ -346,7 +347,7 @@ func (m *Manager) sendMessage( return fmt.Errorf("process '%s' is not ever expected", destination) } - go func() { + observability.Go(ctx, func() { conn, err := m.waitForReadyProcess(ctx, destination, reflect.TypeOf(content)) if err != nil { logger.Errorf(ctx, "%v", fmt.Errorf("unable to wait for process '%s': %w", destination, err)) @@ -368,7 +369,7 @@ func (m *Manager) sendMessage( logger.Errorf(ctx, "%v", fmt.Errorf("unable to encode&send message: %w", err)) return } - }() + }) return nil } diff --git a/pkg/oauthhandler/oauth2_handler.go b/pkg/oauthhandler/oauth2_handler.go index f9158f8..fa92f5e 100644 --- a/pkg/oauthhandler/oauth2_handler.go +++ b/pkg/oauthhandler/oauth2_handler.go @@ -8,6 +8,8 @@ import ( "net/http" "os/exec" "runtime" + + "github.com/xaionaro-go/streamctl/pkg/observability" ) type OAuthHandlerArgument struct { @@ -74,12 +76,12 @@ func NewCodeReceiver( }), } - go func() { + observability.Go(ctx, func() { <-ctx.Done() listener.Close() srv.Close() close(codeCh) - }() + }) go srv.Serve(listener) diff --git a/pkg/player/cmd/player/main.go b/pkg/player/cmd/player/main.go index 4db7378..a33b4f6 100644 --- a/pkg/player/cmd/player/main.go +++ b/pkg/player/cmd/player/main.go @@ -15,6 +15,7 @@ import ( "github.com/facebookincubator/go-belt/tool/logger" "github.com/facebookincubator/go-belt/tool/logger/implementation/logrus" "github.com/spf13/pflag" + "github.com/xaionaro-go/streamctl/pkg/observability" "github.com/xaionaro-go/streamctl/pkg/player" "github.com/xaionaro-go/streamctl/pkg/player/types" "github.com/xaionaro-go/streamctl/pkg/xfyne" @@ -63,7 +64,7 @@ func main() { app := fyneapp.New() - go func() { + observability.Go(ctx, func() { for { ch, err := p.EndChan(ctx) if err != nil { @@ -77,7 +78,7 @@ func main() { w.SetContent(container.NewStack(b)) w.Show() } - }() + }) errorMessage := widget.NewLabel("") @@ -128,7 +129,7 @@ func main() { }) posLabel := widget.NewLabel("") - go func() { + observability.Go(ctx, func() { t := time.NewTicker(time.Millisecond * 100) for { <-t.C @@ -146,7 +147,7 @@ func main() { posLabel.SetText(pos.String() + " / " + l.String()) } - }() + }) w := app.NewWindow("player controls") w.SetContent(container.NewBorder( diff --git a/pkg/player/vlcserver/client/client.go b/pkg/player/vlcserver/client/client.go index 0df6a08..73967b1 100644 --- a/pkg/player/vlcserver/client/client.go +++ b/pkg/player/vlcserver/client/client.go @@ -10,6 +10,7 @@ import ( "time" "github.com/facebookincubator/go-belt/tool/logger" + "github.com/xaionaro-go/streamctl/pkg/observability" "github.com/xaionaro-go/streamctl/pkg/player/protobuf/go/player_grpc" "github.com/xaionaro-go/streamctl/pkg/player/types" "google.golang.org/grpc" @@ -127,7 +128,7 @@ func (c *Client) EndChan(ctx context.Context) (<-chan struct{}, error) { result := make(chan struct{}) waiter.CloseSend() - go func() { + observability.Go(ctx, func() { defer conn.Close() defer func() { close(result) @@ -142,7 +143,7 @@ func (c *Client) EndChan(ctx context.Context) (<-chan struct{}, error) { logger.Errorf(ctx, "unable to read data: %v", err) return } - }() + }) return result, nil } diff --git a/pkg/streamcontrol/stream_control.go b/pkg/streamcontrol/stream_control.go index 8444198..badfd2d 100644 --- a/pkg/streamcontrol/stream_control.go +++ b/pkg/streamcontrol/stream_control.go @@ -11,6 +11,7 @@ import ( "github.com/facebookincubator/go-belt/tool/logger" "github.com/hashicorp/go-multierror" + "github.com/xaionaro-go/streamctl/pkg/observability" ) type StreamProfileBase struct { @@ -242,10 +243,10 @@ func (s StreamControllers) ApplyProfiles( } }(p) } - go func() { + observability.Go(ctx, func() { wg.Wait() close(errCh) - }() + }) var result error for err := range errCh { result = multierror.Append(result, err) @@ -257,7 +258,7 @@ func (s StreamControllers) SetTitle( ctx context.Context, title string, ) error { - return s.concurrently(func(c AbstractStreamController) error { + return s.concurrently(ctx, func(c AbstractStreamController) error { err := c.SetTitle(ctx, title) logger.Debugf(ctx, "SetTitle: %T: <%s>: %v", c.GetImplementation(), title, err) if err != nil { @@ -271,7 +272,7 @@ func (s StreamControllers) SetDescription( ctx context.Context, description string, ) error { - return s.concurrently(func(c AbstractStreamController) error { + return s.concurrently(ctx, func(c AbstractStreamController) error { logger.Debugf(ctx, "SetDescription: %T: <%s>", c.GetImplementation(), description) if err := c.SetDescription(ctx, description); err != nil { return fmt.Errorf("StreamController %T return error: %w", c.GetImplementation(), err) @@ -285,7 +286,7 @@ func (s StreamControllers) InsertAdsCuePoint( ts time.Time, duration time.Duration, ) error { - return s.concurrently(func(c AbstractStreamController) error { + return s.concurrently(ctx, func(c AbstractStreamController) error { if err := c.InsertAdsCuePoint(ctx, ts, duration); err != nil { return fmt.Errorf("StreamController %T return error: %w", c.GetImplementation(), err) } @@ -304,7 +305,7 @@ func (s StreamControllers) StartStream( for _, p := range profiles { m[reflect.TypeOf(p)] = p } - return s.concurrently(func(c AbstractStreamController) error { + return s.concurrently(ctx, func(c AbstractStreamController) error { profile := m[c.StreamProfileType()] logger.Debugf(ctx, "profile == %#+v", profile) if err := c.StartStream(ctx, title, description, profile, customArgs...); err != nil { @@ -317,7 +318,7 @@ func (s StreamControllers) StartStream( func (s StreamControllers) EndStream( ctx context.Context, ) error { - return s.concurrently(func(c AbstractStreamController) error { + return s.concurrently(ctx, func(c AbstractStreamController) error { if err := c.EndStream(ctx); err != nil { return fmt.Errorf("StreamController %T return error: %w", c.GetImplementation(), err) } @@ -328,7 +329,7 @@ func (s StreamControllers) EndStream( func (s StreamControllers) Flush( ctx context.Context, ) error { - return s.concurrently(func(c AbstractStreamController) error { + return s.concurrently(ctx, func(c AbstractStreamController) error { if err := c.Flush(ctx); err != nil { return fmt.Errorf("StreamController %T return error: %w", c.GetImplementation(), err) } @@ -336,7 +337,10 @@ func (s StreamControllers) Flush( }) } -func (s StreamControllers) concurrently(callback func(c AbstractStreamController) error) error { +func (s StreamControllers) concurrently( + ctx context.Context, + callback func(c AbstractStreamController) error, +) error { var wg sync.WaitGroup errCh := make(chan error) for _, c := range s { @@ -348,10 +352,10 @@ func (s StreamControllers) concurrently(callback func(c AbstractStreamController } }(c) } - go func() { + observability.Go(ctx, func() { wg.Wait() close(errCh) - }() + }) var result error for err := range errCh { diff --git a/pkg/streamcontrol/twitch/twitch.go b/pkg/streamcontrol/twitch/twitch.go index 3181cc7..0af2547 100644 --- a/pkg/streamcontrol/twitch/twitch.go +++ b/pkg/streamcontrol/twitch/twitch.go @@ -14,6 +14,7 @@ import ( "github.com/hashicorp/go-multierror" "github.com/nicklaw5/helix/v2" "github.com/xaionaro-go/streamctl/pkg/oauthhandler" + "github.com/xaionaro-go/streamctl/pkg/observability" "github.com/xaionaro-go/streamctl/pkg/streamcontrol" ) @@ -358,13 +359,13 @@ func (t *Twitch) getNewToken( var resultErr error errCh := make(chan error) errWg.Add(1) - go func() { + observability.Go(ctx, func() { errWg.Done() for err := range errCh { errmon.ObserveErrorCtx(ctx, err) resultErr = multierror.Append(resultErr, err) } - }() + }) alreadyListening := map[uint16]struct{}{} var wg sync.WaitGroup @@ -412,7 +413,7 @@ func (t *Twitch) getNewToken( } wg.Add(1) - go func() { + observability.Go(ctx, func() { defer wg.Done() t := time.NewTicker(time.Second) for { @@ -431,12 +432,12 @@ func (t *Twitch) getNewToken( } alreadyListening = alreadyListeningNext } - }() + }) - go func() { + observability.Go(ctx, func() { wg.Wait() close(errCh) - }() + }) <-ctx.Done() if !success { errWg.Wait() diff --git a/pkg/streamcontrol/youtube/youtube.go b/pkg/streamcontrol/youtube/youtube.go index 35a1e78..e3c8d16 100644 --- a/pkg/streamcontrol/youtube/youtube.go +++ b/pkg/streamcontrol/youtube/youtube.go @@ -20,6 +20,7 @@ import ( "github.com/go-yaml/yaml" "github.com/hashicorp/go-multierror" "github.com/xaionaro-go/streamctl/pkg/oauthhandler" + "github.com/xaionaro-go/streamctl/pkg/observability" "github.com/xaionaro-go/streamctl/pkg/streamcontrol" "golang.org/x/oauth2" "golang.org/x/oauth2/google" @@ -67,7 +68,7 @@ func New( return nil, fmt.Errorf("connection verification failed: %w", err) } - go func() { + observability.Go(ctx, func() { ticker := time.NewTicker(time.Minute) for { select { @@ -82,7 +83,7 @@ func New( } } } - }() + }) return yt, nil } @@ -211,13 +212,13 @@ func getToken(ctx context.Context, cfg Config) (*oauth2.Token, error) { var resultErr error errCh := make(chan error) errWg.Add(1) - go func() { + observability.Go(ctx, func() { errWg.Done() for err := range errCh { errmon.ObserveErrorCtx(ctx, err) resultErr = multierror.Append(resultErr, err) } - }() + }) alreadyListening := map[uint16]struct{}{} @@ -268,7 +269,7 @@ func getToken(ctx context.Context, cfg Config) (*oauth2.Token, error) { } wg.Add(1) - go func() { + observability.Go(ctx, func() { defer wg.Done() t := time.NewTicker(time.Second) for { @@ -287,12 +288,12 @@ func getToken(ctx context.Context, cfg Config) (*oauth2.Token, error) { } alreadyListening = alreadyListeningNext } - }() + }) - go func() { + observability.Go(ctx, func() { wg.Wait() close(errCh) - }() + }) <-ctx.Done() if tok == nil { diff --git a/pkg/streamd/client/client.go b/pkg/streamd/client/client.go index 95a3d62..2512e99 100644 --- a/pkg/streamd/client/client.go +++ b/pkg/streamd/client/client.go @@ -17,6 +17,7 @@ import ( "github.com/facebookincubator/go-belt" "github.com/facebookincubator/go-belt/tool/logger" "github.com/goccy/go-yaml" + "github.com/xaionaro-go/streamctl/pkg/observability" "github.com/xaionaro-go/streamctl/pkg/player" "github.com/xaionaro-go/streamctl/pkg/player/protobuf/go/player_grpc" "github.com/xaionaro-go/streamctl/pkg/streamcontrol" @@ -463,7 +464,7 @@ func (c *Client) SubscriberToOAuthURLs( return nil, fmt.Errorf("unable to subscribe to oauth URLs: %w", err) } subClient.CloseSend() - go func() { + observability.Go(ctx, func() { defer conn.Close() defer func() { close(result) @@ -482,7 +483,7 @@ func (c *Client) SubscriberToOAuthURLs( result <- res } - }() + }) return result, nil } @@ -973,14 +974,14 @@ func (c *Client) WaitForStreamPublisher( } ctx, cancelFn := context.WithCancel(ctx) - go func() { + observability.Go(ctx, func() { <-ctx.Done() conn.Close() - }() + }) result := make(chan struct{}) waiter.CloseSend() - go func() { + observability.Go(ctx, func() { defer cancelFn() defer conn.Close() defer func() { @@ -996,7 +997,7 @@ func (c *Client) WaitForStreamPublisher( logger.Errorf(ctx, "unable to read data: %v", err) return } - }() + }) return result, nil } @@ -1197,10 +1198,10 @@ func (c *Client) StreamPlayerEndChan( defer conn.Close() ctx, cancelFn := context.WithCancel(ctx) - go func() { + observability.Go(ctx, func() { <-ctx.Done() conn.Close() - }() + }) waiter, err := client.StreamPlayerEndChan(ctx, &streamd_grpc.StreamPlayerEndChanRequest{ StreamID: string(streamID), @@ -1212,7 +1213,7 @@ func (c *Client) StreamPlayerEndChan( } result := make(chan struct{}) waiter.CloseSend() - go func() { + observability.Go(ctx, func() { defer cancelFn() defer func() { close(result) @@ -1227,7 +1228,7 @@ func (c *Client) StreamPlayerEndChan( logger.Errorf(ctx, "unable to read data: %v", err) return } - }() + }) return result, nil } @@ -1406,7 +1407,7 @@ func unwrapChan[E any, R any, S receiver[R]]( } r := make(chan E) - go func() { + observability.Go(ctx, func() { defer conn.Close() defer cancelFn() for { @@ -1432,7 +1433,7 @@ func unwrapChan[E any, R any, S receiver[R]]( var eventParsed E r <- eventParsed } - }() + }) return r, nil } diff --git a/pkg/streamd/git_storage.go b/pkg/streamd/git_storage.go index 3a45744..9bf9d38 100644 --- a/pkg/streamd/git_storage.go +++ b/pkg/streamd/git_storage.go @@ -9,6 +9,7 @@ import ( "github.com/facebookincubator/go-belt/tool/logger" "github.com/go-git/go-git/v5/plumbing" + "github.com/xaionaro-go/streamctl/pkg/observability" "github.com/xaionaro-go/streamctl/pkg/repository" "github.com/xaionaro-go/streamctl/pkg/streamd/config" ) @@ -200,7 +201,7 @@ func (d *StreamD) startPeriodicGitSyncer(ctx context.Context) { d.GitSyncerMutex.Unlock() d.gitSync(ctx) - go func() { + observability.Go(ctx, func() { err := d.sendConfigViaGIT(ctx) if err != nil { d.UI.DisplayError(fmt.Errorf("unable to send the config to the remote git repository: %w", err)) @@ -220,7 +221,7 @@ func (d *StreamD) startPeriodicGitSyncer(ctx context.Context) { d.gitSync(ctx) } - }() + }) } func (d *StreamD) OBSOLETE_GitRelogin(ctx context.Context) error { diff --git a/pkg/streamd/streamd.go b/pkg/streamd/streamd.go index a74e483..567970c 100644 --- a/pkg/streamd/streamd.go +++ b/pkg/streamd/streamd.go @@ -15,6 +15,7 @@ import ( "github.com/facebookincubator/go-belt/tool/experimental/errmon" "github.com/facebookincubator/go-belt/tool/logger" "github.com/hashicorp/go-multierror" + "github.com/xaionaro-go/streamctl/pkg/observability" "github.com/xaionaro-go/streamctl/pkg/player" "github.com/xaionaro-go/streamctl/pkg/repository" "github.com/xaionaro-go/streamctl/pkg/streamcontrol" @@ -256,24 +257,24 @@ func (d *StreamD) InitCache(ctx context.Context) error { var wg sync.WaitGroup wg.Add(1) - go func() { + observability.Go(ctx, func() { defer wg.Done() _changedCache := d.initTwitchData(ctx) d.normalizeTwitchData() if _changedCache { changedCache = true } - }() + }) wg.Add(1) - go func() { + observability.Go(ctx, func() { defer wg.Done() _changedCache := d.initYoutubeData(ctx) d.normalizeYoutubeData() if _changedCache { changedCache = true } - }() + }) wg.Wait() if changedCache { @@ -387,14 +388,14 @@ func (d *StreamD) SaveConfig(ctx context.Context) error { return err } - go func() { + observability.Go(ctx, func() { if d.GitStorage != nil { err = d.sendConfigViaGIT(ctx) if err != nil { d.UI.DisplayError(fmt.Errorf("unable to send the config to the remote git repository: %w", err)) } } - }() + }) return nil } @@ -450,14 +451,14 @@ func (d *StreamD) StartStream( defer func() { d.StreamStatusCache.InvalidateCache(ctx) if platID == youtube.ID { - go func() { + observability.Go(ctx, func() { now := time.Now() time.Sleep(10 * time.Second) for time.Since(now) < 5*time.Minute { d.StreamStatusCache.InvalidateCache(ctx) time.Sleep(20 * time.Second) } - }() + }) } }() switch platID { @@ -1501,12 +1502,12 @@ func eventSubToChan[T any]( return nil, fmt.Errorf("unable to subscribe: %w", err) } - go func() { + observability.Go(ctx, func() { <-ctx.Done() d.EventBus.Unsubscribe(topic, callback) d.EventBus.WaitAsync() close(r) - }() + }) return r, nil } diff --git a/pkg/streampanel/monitor.go b/pkg/streampanel/monitor.go index 72285ab..2fc0ef5 100644 --- a/pkg/streampanel/monitor.go +++ b/pkg/streampanel/monitor.go @@ -13,6 +13,7 @@ import ( "fyne.io/fyne/v2/widget" "github.com/anthonynsimon/bild/adjust" "github.com/facebookincubator/go-belt/tool/logger" + "github.com/xaionaro-go/streamctl/pkg/observability" "github.com/xaionaro-go/streamctl/pkg/streamcontrol" "github.com/xaionaro-go/streamctl/pkg/streamcontrol/obs" "github.com/xaionaro-go/streamctl/pkg/streamcontrol/twitch" @@ -30,7 +31,7 @@ func (p *Panel) startMonitorPage( p.updateMonitorPageImages(ctx) p.updateMonitorPageStreamStatus(ctx) - go func() { + observability.Go(ctx, func() { t := time.NewTicker(200 * time.Millisecond) for { select { @@ -41,9 +42,9 @@ func (p *Panel) startMonitorPage( p.updateMonitorPageImages(ctx) } - }() + }) - go func() { + observability.Go(ctx, func() { t := time.NewTicker(2 * time.Second) for { select { @@ -54,7 +55,7 @@ func (p *Panel) startMonitorPage( p.updateMonitorPageStreamStatus(ctx) } - }() + }) }(ctx) } @@ -79,7 +80,7 @@ func (p *Panel) updateMonitorPageImages( var wg sync.WaitGroup wg.Add(1) - go func() { + observability.Go(ctx, func() { defer wg.Done() img, changed, err := p.getImage(ctx, consts.ImageScreenshot) @@ -101,10 +102,10 @@ func (p *Panel) updateMonitorPageImages( p.screenshotContainer.Objects = append(p.screenshotContainer.Objects, imgFyne) p.screenshotContainer.Refresh() } - }() + }) wg.Add(1) - go func() { + observability.Go(ctx, func() { defer wg.Done() img, changed, err := p.getImage(ctx, consts.ImageChat) if err != nil { @@ -124,7 +125,7 @@ func (p *Panel) updateMonitorPageImages( p.chatContainer.Objects = append(p.chatContainer.Objects, imgFyne) p.chatContainer.Refresh() } - }() + }) } @@ -141,7 +142,7 @@ func (p *Panel) updateMonitorPageStreamStatus( twitch.ID, } { wg.Add(1) - go func() { + observability.Go(ctx, func() { defer wg.Done() dst := p.streamStatus[platID] @@ -178,7 +179,7 @@ func (p *Panel) updateMonitorPageStreamStatus( } else { dst.SetText("started") } - }() + }) } wg.Wait() diff --git a/pkg/streampanel/panel.go b/pkg/streampanel/panel.go index 6804964..3614002 100644 --- a/pkg/streampanel/panel.go +++ b/pkg/streampanel/panel.go @@ -31,6 +31,7 @@ import ( "github.com/facebookincubator/go-belt/tool/logger" "github.com/go-ng/xmath" "github.com/xaionaro-go/streamctl/pkg/oauthhandler" + "github.com/xaionaro-go/streamctl/pkg/observability" "github.com/xaionaro-go/streamctl/pkg/screenshot" "github.com/xaionaro-go/streamctl/pkg/screenshoter" "github.com/xaionaro-go/streamctl/pkg/streamcontrol" @@ -264,17 +265,17 @@ func (p *Panel) Loop(ctx context.Context, opts ...LoopOption) error { closeLoadingWindow := func() { logger.Tracef(ctx, "closing the loading window") loadingWindow.Hide() - go func() { + observability.Go(ctx, func() { time.Sleep(10 * time.Millisecond) loadingWindow.Hide() time.Sleep(100 * time.Millisecond) loadingWindow.Hide() time.Sleep(time.Second) loadingWindow.Close() - }() + }) } - go func() { + observability.Go(ctx, func() { if streamD, ok := p.StreamD.(*client.Client); ok { p.setStatusFunc("Connecting...") err := p.startOAuthListenerForRemoteStreamD(ctx, streamD) @@ -290,10 +291,10 @@ func (p *Panel) Loop(ctx context.Context, opts ...LoopOption) error { defer closeLoadingWindow() streamD := p.StreamD.(*streamd.StreamD) streamD.AddOAuthListenPort(8091) - go func() { + observability.Go(ctx, func() { <-ctx.Done() streamD.RemoveOAuthListenPort(8091) - }() + }) logger.Tracef(ctx, "started oauth listener for the local streamd") } @@ -323,7 +324,7 @@ func (p *Panel) Loop(ctx context.Context, opts ...LoopOption) error { }*/ logger.Tracef(ctx, "ended stream controllers initialization") - }() + }) p.app.Run() return nil @@ -347,7 +348,7 @@ func (p *Panel) startOAuthListenerForRemoteStreamD( } logger.Tracef(ctx, "started oauth listener for the remote streamd") - go func() { + observability.Go(ctx, func() { defer cancelFn() defer p.DisplayError(fmt.Errorf("oauth handler was closed")) for { @@ -383,7 +384,7 @@ func (p *Panel) startOAuthListenerForRemoteStreamD( } } } - }() + }) return nil } @@ -1789,7 +1790,7 @@ func (p *Panel) subscribeUpdateControlPage(ctx context.Context) { p.getUpdatedStatus(ctx) - go func() { + observability.Go(ctx, func() { for { select { case <-ctx.Done(): @@ -1799,7 +1800,7 @@ func (p *Panel) subscribeUpdateControlPage(ctx context.Context) { } p.getUpdatedStatus(ctx) } - }() + }) } func (p *Panel) getSelectedProfile() Profile { @@ -1821,7 +1822,7 @@ func (p *Panel) execCommand(ctx context.Context, cmdString string) { var stdout, stderr bytes.Buffer cmd.Stdout = &stdout cmd.Stderr = &stderr - go func() { + observability.Go(ctx, func() { err := cmd.Run() if err != nil { p.DisplayError(err) @@ -1829,7 +1830,7 @@ func (p *Panel) execCommand(ctx context.Context, cmdString string) { logger.Debugf(ctx, "stdout: %s", stdout.Bytes()) logger.Debugf(ctx, "stderr: %s", stderr.Bytes()) - }() + }) } func (p *Panel) streamIsRunning( @@ -2441,10 +2442,10 @@ func (p *Panel) profileWindow( for _, cat := range dataTwitch.Cache.Categories { if cleanTwitchCategoryName(cat.Name) == text { setSelectedTwitchCategory(cat.Name) - go func() { + observability.Go(ctx, func() { time.Sleep(100 * time.Millisecond) twitchCategory.SetText("") - }() + }) return } } @@ -2535,10 +2536,10 @@ func (p *Panel) profileWindow( for _, bc := range dataYouTube.Cache.Broadcasts { if cleanYoutubeRecordingName(bc.Snippet.Title) == text { setSelectedYoutubeBroadcast(bc) - go func() { + observability.Go(ctx, func() { time.Sleep(100 * time.Millisecond) youtubeTemplate.SetText("") - }() + }) return } } diff --git a/pkg/streampanel/restream.go b/pkg/streampanel/restream.go index c49a2f3..c4e627c 100644 --- a/pkg/streampanel/restream.go +++ b/pkg/streampanel/restream.go @@ -15,6 +15,7 @@ import ( "fyne.io/fyne/v2/widget" "github.com/dustin/go-humanize" "github.com/facebookincubator/go-belt/tool/logger" + "github.com/xaionaro-go/streamctl/pkg/observability" "github.com/xaionaro-go/streamctl/pkg/player" "github.com/xaionaro-go/streamctl/pkg/streamd/api" sptypes "github.com/xaionaro-go/streamctl/pkg/streamplayer/types" @@ -38,7 +39,7 @@ func (p *Panel) initRestreamPage( logger.Debugf(ctx, "initRestreamPage") defer logger.Debugf(ctx, "/initRestreamPage") - go func() { + observability.Go(ctx, func() { updateData := func() { inStreams, err := p.StreamD.ListIncomingStreams(ctx) if err != nil { @@ -57,9 +58,9 @@ func (p *Panel) initRestreamPage( for range ch { updateData() } - }() + }) - go func() { + observability.Go(ctx, func() { updateData := func() { streamServers, err := p.StreamD.ListStreamServers(ctx) if err != nil { @@ -78,9 +79,9 @@ func (p *Panel) initRestreamPage( for range ch { updateData() } - }() + }) - go func() { + observability.Go(ctx, func() { updateData := func() { dsts, err := p.StreamD.ListStreamDestinations(ctx) if err != nil { @@ -99,9 +100,9 @@ func (p *Panel) initRestreamPage( for range ch { updateData() } - }() + }) - go func() { + observability.Go(ctx, func() { updateData := func() { streamFwds, err := p.StreamD.ListStreamForwards(ctx) if err != nil { @@ -120,9 +121,9 @@ func (p *Panel) initRestreamPage( for range ch { updateData() } - }() + }) - go func() { + observability.Go(ctx, func() { updateData := func() { streamPlayers, err := p.StreamD.ListStreamPlayers(ctx) if err != nil { @@ -141,7 +142,7 @@ func (p *Panel) initRestreamPage( for range ch { updateData() } - }() + }) } func (p *Panel) openAddStreamServerWindow(ctx context.Context) { @@ -1032,7 +1033,7 @@ func (p *Panel) streamServersUpdater( ctx context.Context, ) context.CancelFunc { ctx, cancelFn := context.WithCancel(ctx) - go func() { + observability.Go(ctx, func() { updateData := func() { streamServers, err := p.StreamD.ListStreamServers(ctx) if err != nil { @@ -1061,7 +1062,7 @@ func (p *Panel) streamServersUpdater( } updateData() } - }() + }) return cancelFn } @@ -1069,7 +1070,7 @@ func (p *Panel) startStreamPlayersUpdater( ctx context.Context, ) context.CancelFunc { ctx, cancelFn := context.WithCancel(ctx) - go func() { + observability.Go(ctx, func() { updateData := func() { streamPlayers, err := p.StreamD.ListStreamPlayers(ctx) if err != nil { @@ -1098,7 +1099,7 @@ func (p *Panel) startStreamPlayersUpdater( } updateData() } - }() + }) return cancelFn } @@ -1106,7 +1107,7 @@ func (p *Panel) startStreamForwardersUpdater( ctx context.Context, ) context.CancelFunc { ctx, cancelFn := context.WithCancel(ctx) - go func() { + observability.Go(ctx, func() { updateData := func() { streamFwds, err := p.StreamD.ListStreamForwards(ctx) if err != nil { @@ -1135,6 +1136,6 @@ func (p *Panel) startStreamForwardersUpdater( } updateData() } - }() + }) return cancelFn } diff --git a/pkg/streampanel/widget_hint.go b/pkg/streampanel/widget_hint.go index 8ccf4eb..1107a3e 100644 --- a/pkg/streampanel/widget_hint.go +++ b/pkg/streampanel/widget_hint.go @@ -9,6 +9,7 @@ import ( "fyne.io/fyne/v2/driver/desktop" "fyne.io/fyne/v2/widget" + "github.com/xaionaro-go/streamctl/pkg/observability" "github.com/xaionaro-go/unsafetools" ) @@ -60,7 +61,7 @@ func (w *HintWidget) MouseIn(ev *desktop.MouseEvent) { panic("should not have happened") } w.RecheckerCancelFn = cancelFn - go func() { + observability.Go(ctx, func() { for { select { case <-ctx.Done(): @@ -80,7 +81,7 @@ func (w *HintWidget) MouseIn(ev *desktop.MouseEvent) { pos.Y += 5 w.Hint.Move(pos) } - }() + }) } func (w *HintWidget) MouseMoved(*desktop.MouseEvent) { } diff --git a/pkg/streamplayer/cmd/streamplayer/main.go b/pkg/streamplayer/cmd/streamplayer/main.go index b0ac896..fa499e7 100644 --- a/pkg/streamplayer/cmd/streamplayer/main.go +++ b/pkg/streamplayer/cmd/streamplayer/main.go @@ -18,6 +18,7 @@ import ( "github.com/facebookincubator/go-belt/tool/logger/implementation/logrus" "github.com/spf13/pflag" + "github.com/xaionaro-go/streamctl/pkg/observability" "github.com/xaionaro-go/streamctl/pkg/player" ptypes "github.com/xaionaro-go/streamctl/pkg/player/types" "github.com/xaionaro-go/streamctl/pkg/streamd/api" @@ -210,10 +211,10 @@ func (s *StreamPlayerStreamServer) WaitPublisher( } ch := make(chan struct{}) - go func() { + observability.Go(ctx, func() { s.StreamServer.RelayServer.WaitPubsub(ctx, localAppName) close(ch) - }() + }) return ch, nil } diff --git a/pkg/streamplayer/stream_player.go b/pkg/streamplayer/stream_player.go index 4f33897..ca78780 100644 --- a/pkg/streamplayer/stream_player.go +++ b/pkg/streamplayer/stream_player.go @@ -11,6 +11,7 @@ import ( "github.com/facebookincubator/go-belt/tool/experimental/errmon" "github.com/facebookincubator/go-belt/tool/logger" "github.com/hashicorp/go-multierror" + "github.com/xaionaro-go/streamctl/pkg/observability" "github.com/xaionaro-go/streamctl/pkg/player" "github.com/xaionaro-go/streamctl/pkg/streamd/api" ) @@ -302,10 +303,10 @@ func (p *StreamPlayer) controllerLoop(ctx context.Context) { } } - go func() { + observability.Go(ctx, func() { time.Sleep(time.Second) // TODO: delete this ugly racy hack p.notifyStart(context.WithValue(ctx, CtxKeyStreamPlayer, p)) - }() + }) logger.Debugf(ctx, "finished waiting for a publisher at '%s'", p.StreamID) diff --git a/pkg/streamserver/implementations/go2rtc/streamserver/server/rtmp/rtmp_server.go b/pkg/streamserver/implementations/go2rtc/streamserver/server/rtmp/rtmp_server.go index 16acd18..e233d66 100644 --- a/pkg/streamserver/implementations/go2rtc/streamserver/server/rtmp/rtmp_server.go +++ b/pkg/streamserver/implementations/go2rtc/streamserver/server/rtmp/rtmp_server.go @@ -15,6 +15,7 @@ import ( "github.com/facebookincubator/go-belt/tool/logger" "github.com/rs/zerolog/log" "github.com/xaionaro-go/datacounter" + "github.com/xaionaro-go/streamctl/pkg/observability" "github.com/xaionaro-go/streamctl/pkg/streamserver/consts" "github.com/xaionaro-go/streamctl/pkg/streamserver/implementations/go2rtc/streamserver/streams" "github.com/xaionaro-go/streamctl/pkg/streamserver/types" @@ -55,15 +56,15 @@ func New( Listener: ln, } - go func() { + observability.Go(ctx, func() { <-ctx.Done() logger.Infof(ctx, "closing %s", cfg.Listen) err := ln.Close() errmon.ObserveErrorCtx(ctx, err) - }() + }) logger.Infof(ctx, "started RTMP server at %s", cfg.Listen) - go func() { + observability.Go(ctx, func() { for { if ctx.Err() != nil { return @@ -75,13 +76,13 @@ func New( return } - go func() { + observability.Go(ctx, func() { if err = s.tcpHandle(conn); err != nil { errmon.ObserveErrorCtx(ctx, err) } - }() + }) } - }() + }) return s, nil } @@ -197,12 +198,12 @@ func StreamsConsumerHandle(url string) (core.Consumer, types.NumBytesReaderWrote ctx, cancelFn := context.WithCancel(ctx) defer cancelFn() - go func() { + observability.Go(ctx, func() { <-ctx.Done() cancelFn() err := wr.(io.Closer).Close() errmon.ObserveErrorCtx(ctx, err) - }() + }) _, err = cons.WriteTo(wrc) if err != nil { diff --git a/pkg/streamserver/implementations/go2rtc/streamserver/server/rtsp/rtsp_server.go b/pkg/streamserver/implementations/go2rtc/streamserver/server/rtsp/rtsp_server.go index 42a0b6f..277f553 100644 --- a/pkg/streamserver/implementations/go2rtc/streamserver/server/rtsp/rtsp_server.go +++ b/pkg/streamserver/implementations/go2rtc/streamserver/server/rtsp/rtsp_server.go @@ -13,6 +13,7 @@ import ( "github.com/AlexxIT/go2rtc/pkg/tcp" "github.com/facebookincubator/go-belt/tool/experimental/errmon" "github.com/facebookincubator/go-belt/tool/logger" + "github.com/xaionaro-go/streamctl/pkg/observability" "github.com/xaionaro-go/streamctl/pkg/streamserver/consts" "github.com/xaionaro-go/streamctl/pkg/streamserver/implementations/go2rtc/streamserver/streams" "github.com/xaionaro-go/streamctl/pkg/streamserver/types" @@ -68,15 +69,15 @@ func New( s.DefaultMedias = ParseQuery(query) } - go func() { + observability.Go(ctx, func() { <-ctx.Done() logger.Infof(ctx, "closing %s", cfg.ListenAddr) err := ln.Close() errmon.ObserveErrorCtx(ctx, err) - }() + }) logger.Infof(ctx, "started RTSP server at %s", cfg.ListenAddr) - go func() { + observability.Go(ctx, func() { for { if ctx.Err() != nil { return @@ -95,7 +96,7 @@ func New( } go s.tcpHandler(c) } - }() + }) return s, nil } diff --git a/pkg/streamserver/implementations/go2rtc/streamserver/streams/play.go b/pkg/streamserver/implementations/go2rtc/streamserver/streams/play.go index d6ebab5..6de480b 100644 --- a/pkg/streamserver/implementations/go2rtc/streamserver/streams/play.go +++ b/pkg/streamserver/implementations/go2rtc/streamserver/streams/play.go @@ -1,13 +1,16 @@ package streams import ( + "context" "errors" "time" "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/xaionaro-go/streamctl/pkg/observability" ) func (s *Stream) Play(source string) error { + ctx := context.TODO() s.mu.Lock() for _, producer := range s.producers { if producer.state == stateInternal && producer.conn != nil { @@ -45,10 +48,10 @@ func (s *Stream) Play(source string) error { s.AddInternalProducer(src) - go func() { + observability.Go(ctx, func() { _ = src.Start() s.RemoveProducer(src) - }() + }) return nil } @@ -82,19 +85,19 @@ func (s *Stream) Play(source string) error { s.AddInternalProducer(src) s.AddInternalConsumer(cons) - go func() { + observability.Go(ctx, func() { _ = dst.Start() _ = src.Stop() s.RemoveInternalConsumer(cons) - }() + }) - go func() { + observability.Go(ctx, func() { _ = src.Start() // little timeout before stop dst, so the buffer can be transferred time.Sleep(time.Second) _ = dst.Stop() s.RemoveProducer(src) - }() + }) return nil } diff --git a/pkg/streamserver/implementations/go2rtc/streamserver/streams/stream_forwarding.go b/pkg/streamserver/implementations/go2rtc/streamserver/streams/stream_forwarding.go index 0236d33..aad0d08 100644 --- a/pkg/streamserver/implementations/go2rtc/streamserver/streams/stream_forwarding.go +++ b/pkg/streamserver/implementations/go2rtc/streamserver/streams/stream_forwarding.go @@ -11,6 +11,7 @@ import ( "github.com/facebookincubator/go-belt/tool/experimental/errmon" "github.com/facebookincubator/go-belt/tool/logger" "github.com/hashicorp/go-multierror" + "github.com/xaionaro-go/streamctl/pkg/observability" "github.com/xaionaro-go/streamctl/pkg/streamserver/types" ) @@ -48,7 +49,7 @@ func (sf *StreamForwarding) Start( ctx, cancelFn := context.WithCancel(ctx) sf.CancelFunc = cancelFn - go func() { + observability.Go(ctx, func() { for { select { case <-ctx.Done(): @@ -81,7 +82,7 @@ func (sf *StreamForwarding) Start( err := sf.Close() errmon.ObserveErrorCtx(ctx, err) } - }() + }) return nil } diff --git a/pkg/streamserver/implementations/livego/streamserver/stream_server.go b/pkg/streamserver/implementations/livego/streamserver/stream_server.go index 1596a0a..6b32ab6 100644 --- a/pkg/streamserver/implementations/livego/streamserver/stream_server.go +++ b/pkg/streamserver/implementations/livego/streamserver/stream_server.go @@ -14,6 +14,7 @@ import ( "github.com/gwuhaolin/livego/configure" "github.com/gwuhaolin/livego/protocol/rtmp" "github.com/spf13/viper" + "github.com/xaionaro-go/streamctl/pkg/observability" "github.com/xaionaro-go/streamctl/pkg/streamserver/types" "github.com/xaionaro-go/streamctl/pkg/streamtypes" ) @@ -128,13 +129,13 @@ func (s *StreamServer) startServer( Listener: listener, } portServer.Server = rtmp.NewRtmpServer(portServer.Stream, nil) - go func() { + observability.Go(ctx, func() { err = portServer.Server.Serve(listener) if err != nil { err = fmt.Errorf("unable to start serving RTMP at '%s': %w", listener.Addr().String(), err) logger.Error(ctx, err) } - }() + }) srv = portServer case streamtypes.ServerTypeRTSP: return fmt.Errorf("RTSP is not supported, yet") diff --git a/pkg/streamserver/implementations/yutopp-go-rtmp/streamserver/pubsub.go b/pkg/streamserver/implementations/yutopp-go-rtmp/streamserver/pubsub.go index 46e6e84..c4932d1 100644 --- a/pkg/streamserver/implementations/yutopp-go-rtmp/streamserver/pubsub.go +++ b/pkg/streamserver/implementations/yutopp-go-rtmp/streamserver/pubsub.go @@ -2,9 +2,11 @@ package streamserver import ( "bytes" + "context" "sync" "github.com/facebookincubator/go-belt/tool/logger" + "github.com/xaionaro-go/streamctl/pkg/observability" flvtag "github.com/yutopp/go-flv/tag" ) @@ -37,11 +39,11 @@ func (pb *Pubsub) Deregister() error { pb.m.Lock() defer pb.m.Unlock() - go func() { + observability.Go(context.TODO(), func() { for _, sub := range pb.subs { _ = sub.Close() } - }() + }) return pb.srv.removePubsub(pb.name) } diff --git a/pkg/streamserver/implementations/yutopp-go-rtmp/streamserver/stream_forward.go b/pkg/streamserver/implementations/yutopp-go-rtmp/streamserver/stream_forward.go index 9dd8908..eb91e1b 100644 --- a/pkg/streamserver/implementations/yutopp-go-rtmp/streamserver/stream_forward.go +++ b/pkg/streamserver/implementations/yutopp-go-rtmp/streamserver/stream_forward.go @@ -13,6 +13,7 @@ import ( "github.com/facebookincubator/go-belt" "github.com/facebookincubator/go-belt/tool/logger" "github.com/hashicorp/go-multierror" + "github.com/xaionaro-go/streamctl/pkg/observability" "github.com/xaionaro-go/streamctl/pkg/streamserver/types" "github.com/xaionaro-go/streamctl/pkg/xlogger" flvtag "github.com/yutopp/go-flv/tag" @@ -71,7 +72,7 @@ func (fwd *ActiveStreamForwarding) Start(ctx context.Context) error { } ctx, cancelFn := context.WithCancel(ctx) fwd.CancelFunc = cancelFn - go func() { + observability.Go(ctx, func() { for { err := fwd.waitForPublisherAndStart( ctx, @@ -86,7 +87,7 @@ func (fwd *ActiveStreamForwarding) Start(ctx context.Context) error { logger.Errorf(ctx, "%s", err) } } - }() + }) return nil } diff --git a/pkg/streamserver/implementations/yutopp-go-rtmp/streamserver/stream_player.go b/pkg/streamserver/implementations/yutopp-go-rtmp/streamserver/stream_player.go index 1d260fa..af53355 100644 --- a/pkg/streamserver/implementations/yutopp-go-rtmp/streamserver/stream_player.go +++ b/pkg/streamserver/implementations/yutopp-go-rtmp/streamserver/stream_player.go @@ -8,6 +8,7 @@ import ( "github.com/facebookincubator/go-belt" "github.com/facebookincubator/go-belt/tool/logger" "github.com/hashicorp/go-multierror" + "github.com/xaionaro-go/streamctl/pkg/observability" "github.com/xaionaro-go/streamctl/pkg/player" "github.com/xaionaro-go/streamctl/pkg/streamd/api" "github.com/xaionaro-go/streamctl/pkg/streamplayer" @@ -39,10 +40,10 @@ func (s *StreamPlayerStreamServer) WaitPublisher( } ch := make(chan struct{}) - go func() { + observability.Go(ctx, func() { s.RelayServer.WaitPubsub(ctx, localAppName) close(ch) - }() + }) return ch, nil } diff --git a/pkg/streamserver/implementations/yutopp-go-rtmp/streamserver/stream_server.go b/pkg/streamserver/implementations/yutopp-go-rtmp/streamserver/stream_server.go index 0c404de..f050757 100644 --- a/pkg/streamserver/implementations/yutopp-go-rtmp/streamserver/stream_server.go +++ b/pkg/streamserver/implementations/yutopp-go-rtmp/streamserver/stream_server.go @@ -11,6 +11,7 @@ import ( "github.com/facebookincubator/go-belt" "github.com/facebookincubator/go-belt/tool/logger" + "github.com/xaionaro-go/streamctl/pkg/observability" "github.com/xaionaro-go/streamctl/pkg/player" playertypes "github.com/xaionaro-go/streamctl/pkg/player/types" "github.com/xaionaro-go/streamctl/pkg/streamplayer" @@ -120,7 +121,7 @@ func (s *StreamServer) Init( } } - go func() { + observability.Go(ctx, func() { var opts setupStreamPlayersOptions if initCfg.DefaultStreamPlayerOptions != nil { opts = append(opts, setupStreamPlayersOptionDefaultStreamPlayerOptions(initCfg.DefaultStreamPlayerOptions)) @@ -129,7 +130,7 @@ func (s *StreamServer) Init( if err != nil { logger.Error(ctx, err) } - }() + }) return nil } @@ -202,13 +203,13 @@ func (s *StreamServer) startServer( } }, }) - go func() { + observability.Go(ctx, func() { err = portSrv.Serve(listener) if err != nil { err = fmt.Errorf("unable to start serving RTMP at '%s': %w", listener.Addr().String(), err) logger.Error(ctx, err) } - }() + }) srv = portSrv case streamtypes.ServerTypeRTSP: return fmt.Errorf("RTSP is not supported, yet")