Initial commit, pt. 65

This commit is contained in:
Dmitrii Okunev
2024-08-04 14:13:06 +01:00
parent 74ce1285b8
commit d828ed38c2
34 changed files with 218 additions and 179 deletions

View File

@@ -69,14 +69,17 @@ func main() {
}
}
ctx := context.Background()
ctx = logger.CtxWithLogger(ctx, l)
if *netPprofAddr != "" || (forceNetPProfOnAndroid && runtime.GOOS == "android") {
go func() {
observability.Go(ctx, func() {
if *netPprofAddr == "" {
*netPprofAddr = "localhost:0"
}
l.Infof("starting to listen for net/pprof requests at '%s'", *netPprofAddr)
l.Error(http.ListenAndServe(*netPprofAddr, nil))
}()
})
}
if oldValue := runtime.GOMAXPROCS(0); oldValue < 16 {
@@ -84,9 +87,6 @@ func main() {
runtime.GOMAXPROCS(16)
}
ctx := context.Background()
ctx = logger.CtxWithLogger(ctx, l)
if *sentryDSN != "" {
l.Infof("setting up Sentry at DSN '%s'", *sentryDSN)
sentryClient, err := sentry.NewClient(sentry.ClientOptions{
@@ -154,21 +154,21 @@ func main() {
l.Fatalf("unable to initialize the streamd instance: %v", err)
}
go func() {
observability.Go(ctx, func() {
if err = streamD.Run(ctx); err != nil {
l.Errorf("streamd returned an error: %v", err)
}
}()
})
listener, err := net.Listen("tcp", *listenAddr)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
go func() {
observability.Go(ctx, func() {
<-ctx.Done()
listener.Close()
}()
})
grpcServer := grpc.NewServer()
streamdGRPC = server.NewGRPCServer(streamD)

View File

@@ -5,4 +5,4 @@ Website = "https://github.com/xaionaro/streamctl"
Name = "streampanel"
ID = "center.dx.streampanel"
Version = "0.1.0"
Build = 84
Build = 85

View File

@@ -13,6 +13,7 @@ import (
"github.com/facebookincubator/go-belt"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/xaionaro-go/streamctl/pkg/mainprocess"
"github.com/xaionaro-go/streamctl/pkg/observability"
)
type ProcessName = mainprocess.ProcessName
@@ -140,7 +141,7 @@ func runSplitProcesses(
}
return nil
})
go func() {
observability.Go(ctx, func() {
select {
case <-ctx.Done():
return
@@ -151,7 +152,7 @@ func runSplitProcesses(
if err != nil {
logger.Fatalf(ctx, "%s", err)
}
}()
})
<-ctx.Done()
}
@@ -187,12 +188,12 @@ func runFork(
if err != nil {
return fmt.Errorf("unable to start '%s %s': %w", args[0], strings.Join(args[1:], " "), err)
}
go func() {
observability.Go(ctx, func() {
err := cmd.Wait()
if err != nil {
logger.Errorf(ctx, "error running '%s %s': %v", args[0], strings.Join(args[1:], " "), err)
}
}()
})
return nil
}

View File

@@ -11,6 +11,7 @@ import (
"github.com/facebookincubator/go-belt"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/xaionaro-go/streamctl/pkg/mainprocess"
"github.com/xaionaro-go/streamctl/pkg/observability"
"github.com/xaionaro-go/streamctl/pkg/streamcontrol"
"github.com/xaionaro-go/streamctl/pkg/streamd/grpc/go/streamd_grpc"
"github.com/xaionaro-go/streamctl/pkg/streampanel"
@@ -98,7 +99,7 @@ func runPanel(
if mainProcess != nil {
setReadyFor(ctx, mainProcess, StreamDDied{}, UpdateStreamDConfig{})
go func() {
observability.Go(ctx, func() {
err := mainProcess.Serve(
ctx,
func(ctx context.Context, source mainprocess.ProcessName, content any) error {
@@ -124,7 +125,7 @@ func runPanel(
},
)
logger.Fatalf(ctx, "communication (with the main process) error: %v", err)
}()
})
}
var loopOpts []streampanel.LoopOption

View File

@@ -11,6 +11,7 @@ import (
"time"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/xaionaro-go/streamctl/pkg/observability"
)
func initRuntime(ctx context.Context, flags Flags, _procName ProcessName) context.CancelFunc {
@@ -20,7 +21,7 @@ func initRuntime(ctx context.Context, flags Flags, _procName ProcessName) contex
l := logger.FromCtx(ctx)
if ForceDebug {
go func() {
observability.Go(ctx, func() {
t := time.NewTicker(time.Second)
defer t.Stop()
for {
@@ -33,7 +34,7 @@ func initRuntime(ctx context.Context, flags Flags, _procName ProcessName) contex
l.Tracef("stacktraces:\n%s", buf.String())
<-t.C
}
}()
})
}
if flags.CPUProfile != "" {
@@ -78,10 +79,10 @@ func initRuntime(ctx context.Context, flags Flags, _procName ProcessName) contex
}
if netPprofAddr != "" {
go func() {
observability.Go(ctx, func() {
l.Infof("starting to listen for net/pprof requests at '%s'", netPprofAddr)
l.Error(http.ListenAndServe(netPprofAddr, nil))
}()
})
}
if oldValue := runtime.GOMAXPROCS(0); oldValue < 16 {

View File

@@ -6,6 +6,7 @@ import (
"os/signal"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/xaionaro-go/streamctl/pkg/observability"
)
func signalHandler(
@@ -13,7 +14,7 @@ func signalHandler(
) chan<- os.Signal {
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
go func() {
observability.Go(ctx, func() {
for range c {
forkLocker.Lock()
for name, f := range forkMap {
@@ -26,6 +27,6 @@ func signalHandler(
forkLocker.Unlock()
os.Exit(1)
}
}()
})
return c
}

View File

@@ -14,6 +14,7 @@ import (
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/xaionaro-go/streamctl/cmd/streamd/ui"
"github.com/xaionaro-go/streamctl/pkg/mainprocess"
"github.com/xaionaro-go/streamctl/pkg/observability"
"github.com/xaionaro-go/streamctl/pkg/streamcontrol"
"github.com/xaionaro-go/streamctl/pkg/streamd"
"github.com/xaionaro-go/streamctl/pkg/streamd/api"
@@ -139,7 +140,7 @@ func runStreamd(
if mainProcess != nil {
logger.Debugf(ctx, "starting the IPC server")
setReadyFor(ctx, mainProcess, GetStreamdAddress{}, RequestStreamDConfig{})
go func() {
observability.Go(ctx, func() {
err := mainProcess.Serve(
ctx,
func(
@@ -169,7 +170,7 @@ func runStreamd(
},
)
logger.Fatalf(ctx, "communication (with the main process) error: %v", err)
}()
})
}
err = streamD.Run(ctx)
@@ -198,23 +199,23 @@ func initGRPCServer(
if err != nil {
logger.Fatalf(ctx, "failed to listen: %v", err)
}
go func() {
observability.Go(ctx, func() {
<-ctx.Done()
listener.Close()
}()
})
grpcServer := grpc.NewServer()
streamdGRPC := server.NewGRPCServer(streamD)
streamd_grpc.RegisterStreamDServer(grpcServer, streamdGRPC)
// start the server:
go func() {
observability.Go(ctx, func() {
logger.Infof(ctx, "started server at %s", listener.Addr().String())
err = grpcServer.Serve(listener)
if err != nil {
logger.Fatal(ctx, err)
}
}()
})
return listener, grpcServer, streamdGRPC
}

View File

@@ -33,6 +33,7 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/xaionaro-go/streamctl/pkg/observability"
"github.com/yl2chen/cidranger"
)
@@ -160,9 +161,9 @@ func UpRun(
if ok {
metricsTuple := fmt.Sprintf("127.0.0.1:%s", metricsPort)
http.Handle("/metrics", promhttp.Handler())
go func() {
observability.Go(ctx, func() {
http.ListenAndServe(metricsTuple, nil)
}()
})
fmt.Printf("[+] Listening for metrics scrape requests on http://%s/metrics\n", metricsTuple)
}

View File

@@ -8,6 +8,7 @@ import (
"sync"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/xaionaro-go/streamctl/pkg/observability"
)
type Client struct {
@@ -86,13 +87,13 @@ func (c *Client) Serve(
) error {
ctx, cancelFn := context.WithCancel(ctx)
defer cancelFn()
go func() {
observability.Go(ctx, func() {
<-ctx.Done()
err := c.Close()
if err != nil {
logger.Error(ctx, err)
}
}()
})
for {
select {

View File

@@ -14,6 +14,7 @@ import (
"github.com/hashicorp/go-multierror"
"github.com/immune-gmbh/attestation-sdk/pkg/lockmap"
"github.com/sethvargo/go-password/password"
"github.com/xaionaro-go/streamctl/pkg/observability"
)
func init() {
@@ -119,13 +120,13 @@ func (m *Manager) Serve(
ctx, cancelFn := context.WithCancel(ctx)
defer cancelFn()
go func() {
observability.Go(ctx, func() {
<-ctx.Done()
err := m.Close()
if err != nil {
logger.Error(ctx, err)
}
}()
})
if m.LaunchClient != nil {
for _, name := range m.allClientProcesses {
@@ -159,9 +160,9 @@ func (m *Manager) addNewConnection(
conn net.Conn,
onReceivedMessage OnReceivedMessageFunc,
) {
go func() {
observability.Go(ctx, func() {
m.handleConnection(ctx, conn, onReceivedMessage)
}()
})
}
func (m *Manager) handleConnection(
@@ -174,10 +175,10 @@ func (m *Manager) handleConnection(
defer func() { logger.Tracef(ctx, "/handleConnection from %s (%s)", conn.RemoteAddr(), regMessage.Source) }()
ctx, cancelFn := context.WithCancel(ctx)
go func() {
observability.Go(ctx, func() {
<-ctx.Done()
conn.Close()
}()
})
defer cancelFn()
encoder := gob.NewEncoder(conn)
@@ -276,11 +277,11 @@ func (m *Manager) processMessage(
err = multierror.Append(err, onReceivedMessage(ctx, source, message.Content))
errCh := make(chan error)
go func() {
observability.Go(ctx, func() {
for e := range errCh {
err = multierror.Append(err, e)
}
}()
})
for _, dst := range m.allClientProcesses {
if dst == source {
continue
@@ -346,7 +347,7 @@ func (m *Manager) sendMessage(
return fmt.Errorf("process '%s' is not ever expected", destination)
}
go func() {
observability.Go(ctx, func() {
conn, err := m.waitForReadyProcess(ctx, destination, reflect.TypeOf(content))
if err != nil {
logger.Errorf(ctx, "%v", fmt.Errorf("unable to wait for process '%s': %w", destination, err))
@@ -368,7 +369,7 @@ func (m *Manager) sendMessage(
logger.Errorf(ctx, "%v", fmt.Errorf("unable to encode&send message: %w", err))
return
}
}()
})
return nil
}

View File

@@ -8,6 +8,8 @@ import (
"net/http"
"os/exec"
"runtime"
"github.com/xaionaro-go/streamctl/pkg/observability"
)
type OAuthHandlerArgument struct {
@@ -74,12 +76,12 @@ func NewCodeReceiver(
}),
}
go func() {
observability.Go(ctx, func() {
<-ctx.Done()
listener.Close()
srv.Close()
close(codeCh)
}()
})
go srv.Serve(listener)

View File

@@ -15,6 +15,7 @@ import (
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/facebookincubator/go-belt/tool/logger/implementation/logrus"
"github.com/spf13/pflag"
"github.com/xaionaro-go/streamctl/pkg/observability"
"github.com/xaionaro-go/streamctl/pkg/player"
"github.com/xaionaro-go/streamctl/pkg/player/types"
"github.com/xaionaro-go/streamctl/pkg/xfyne"
@@ -63,7 +64,7 @@ func main() {
app := fyneapp.New()
go func() {
observability.Go(ctx, func() {
for {
ch, err := p.EndChan(ctx)
if err != nil {
@@ -77,7 +78,7 @@ func main() {
w.SetContent(container.NewStack(b))
w.Show()
}
}()
})
errorMessage := widget.NewLabel("")
@@ -128,7 +129,7 @@ func main() {
})
posLabel := widget.NewLabel("")
go func() {
observability.Go(ctx, func() {
t := time.NewTicker(time.Millisecond * 100)
for {
<-t.C
@@ -146,7 +147,7 @@ func main() {
posLabel.SetText(pos.String() + " / " + l.String())
}
}()
})
w := app.NewWindow("player controls")
w.SetContent(container.NewBorder(

View File

@@ -10,6 +10,7 @@ import (
"time"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/xaionaro-go/streamctl/pkg/observability"
"github.com/xaionaro-go/streamctl/pkg/player/protobuf/go/player_grpc"
"github.com/xaionaro-go/streamctl/pkg/player/types"
"google.golang.org/grpc"
@@ -127,7 +128,7 @@ func (c *Client) EndChan(ctx context.Context) (<-chan struct{}, error) {
result := make(chan struct{})
waiter.CloseSend()
go func() {
observability.Go(ctx, func() {
defer conn.Close()
defer func() {
close(result)
@@ -142,7 +143,7 @@ func (c *Client) EndChan(ctx context.Context) (<-chan struct{}, error) {
logger.Errorf(ctx, "unable to read data: %v", err)
return
}
}()
})
return result, nil
}

View File

@@ -11,6 +11,7 @@ import (
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/hashicorp/go-multierror"
"github.com/xaionaro-go/streamctl/pkg/observability"
)
type StreamProfileBase struct {
@@ -242,10 +243,10 @@ func (s StreamControllers) ApplyProfiles(
}
}(p)
}
go func() {
observability.Go(ctx, func() {
wg.Wait()
close(errCh)
}()
})
var result error
for err := range errCh {
result = multierror.Append(result, err)
@@ -257,7 +258,7 @@ func (s StreamControllers) SetTitle(
ctx context.Context,
title string,
) error {
return s.concurrently(func(c AbstractStreamController) error {
return s.concurrently(ctx, func(c AbstractStreamController) error {
err := c.SetTitle(ctx, title)
logger.Debugf(ctx, "SetTitle: %T: <%s>: %v", c.GetImplementation(), title, err)
if err != nil {
@@ -271,7 +272,7 @@ func (s StreamControllers) SetDescription(
ctx context.Context,
description string,
) error {
return s.concurrently(func(c AbstractStreamController) error {
return s.concurrently(ctx, func(c AbstractStreamController) error {
logger.Debugf(ctx, "SetDescription: %T: <%s>", c.GetImplementation(), description)
if err := c.SetDescription(ctx, description); err != nil {
return fmt.Errorf("StreamController %T return error: %w", c.GetImplementation(), err)
@@ -285,7 +286,7 @@ func (s StreamControllers) InsertAdsCuePoint(
ts time.Time,
duration time.Duration,
) error {
return s.concurrently(func(c AbstractStreamController) error {
return s.concurrently(ctx, func(c AbstractStreamController) error {
if err := c.InsertAdsCuePoint(ctx, ts, duration); err != nil {
return fmt.Errorf("StreamController %T return error: %w", c.GetImplementation(), err)
}
@@ -304,7 +305,7 @@ func (s StreamControllers) StartStream(
for _, p := range profiles {
m[reflect.TypeOf(p)] = p
}
return s.concurrently(func(c AbstractStreamController) error {
return s.concurrently(ctx, func(c AbstractStreamController) error {
profile := m[c.StreamProfileType()]
logger.Debugf(ctx, "profile == %#+v", profile)
if err := c.StartStream(ctx, title, description, profile, customArgs...); err != nil {
@@ -317,7 +318,7 @@ func (s StreamControllers) StartStream(
func (s StreamControllers) EndStream(
ctx context.Context,
) error {
return s.concurrently(func(c AbstractStreamController) error {
return s.concurrently(ctx, func(c AbstractStreamController) error {
if err := c.EndStream(ctx); err != nil {
return fmt.Errorf("StreamController %T return error: %w", c.GetImplementation(), err)
}
@@ -328,7 +329,7 @@ func (s StreamControllers) EndStream(
func (s StreamControllers) Flush(
ctx context.Context,
) error {
return s.concurrently(func(c AbstractStreamController) error {
return s.concurrently(ctx, func(c AbstractStreamController) error {
if err := c.Flush(ctx); err != nil {
return fmt.Errorf("StreamController %T return error: %w", c.GetImplementation(), err)
}
@@ -336,7 +337,10 @@ func (s StreamControllers) Flush(
})
}
func (s StreamControllers) concurrently(callback func(c AbstractStreamController) error) error {
func (s StreamControllers) concurrently(
ctx context.Context,
callback func(c AbstractStreamController) error,
) error {
var wg sync.WaitGroup
errCh := make(chan error)
for _, c := range s {
@@ -348,10 +352,10 @@ func (s StreamControllers) concurrently(callback func(c AbstractStreamController
}
}(c)
}
go func() {
observability.Go(ctx, func() {
wg.Wait()
close(errCh)
}()
})
var result error
for err := range errCh {

View File

@@ -14,6 +14,7 @@ import (
"github.com/hashicorp/go-multierror"
"github.com/nicklaw5/helix/v2"
"github.com/xaionaro-go/streamctl/pkg/oauthhandler"
"github.com/xaionaro-go/streamctl/pkg/observability"
"github.com/xaionaro-go/streamctl/pkg/streamcontrol"
)
@@ -358,13 +359,13 @@ func (t *Twitch) getNewToken(
var resultErr error
errCh := make(chan error)
errWg.Add(1)
go func() {
observability.Go(ctx, func() {
errWg.Done()
for err := range errCh {
errmon.ObserveErrorCtx(ctx, err)
resultErr = multierror.Append(resultErr, err)
}
}()
})
alreadyListening := map[uint16]struct{}{}
var wg sync.WaitGroup
@@ -412,7 +413,7 @@ func (t *Twitch) getNewToken(
}
wg.Add(1)
go func() {
observability.Go(ctx, func() {
defer wg.Done()
t := time.NewTicker(time.Second)
for {
@@ -431,12 +432,12 @@ func (t *Twitch) getNewToken(
}
alreadyListening = alreadyListeningNext
}
}()
})
go func() {
observability.Go(ctx, func() {
wg.Wait()
close(errCh)
}()
})
<-ctx.Done()
if !success {
errWg.Wait()

View File

@@ -20,6 +20,7 @@ import (
"github.com/go-yaml/yaml"
"github.com/hashicorp/go-multierror"
"github.com/xaionaro-go/streamctl/pkg/oauthhandler"
"github.com/xaionaro-go/streamctl/pkg/observability"
"github.com/xaionaro-go/streamctl/pkg/streamcontrol"
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
@@ -67,7 +68,7 @@ func New(
return nil, fmt.Errorf("connection verification failed: %w", err)
}
go func() {
observability.Go(ctx, func() {
ticker := time.NewTicker(time.Minute)
for {
select {
@@ -82,7 +83,7 @@ func New(
}
}
}
}()
})
return yt, nil
}
@@ -211,13 +212,13 @@ func getToken(ctx context.Context, cfg Config) (*oauth2.Token, error) {
var resultErr error
errCh := make(chan error)
errWg.Add(1)
go func() {
observability.Go(ctx, func() {
errWg.Done()
for err := range errCh {
errmon.ObserveErrorCtx(ctx, err)
resultErr = multierror.Append(resultErr, err)
}
}()
})
alreadyListening := map[uint16]struct{}{}
@@ -268,7 +269,7 @@ func getToken(ctx context.Context, cfg Config) (*oauth2.Token, error) {
}
wg.Add(1)
go func() {
observability.Go(ctx, func() {
defer wg.Done()
t := time.NewTicker(time.Second)
for {
@@ -287,12 +288,12 @@ func getToken(ctx context.Context, cfg Config) (*oauth2.Token, error) {
}
alreadyListening = alreadyListeningNext
}
}()
})
go func() {
observability.Go(ctx, func() {
wg.Wait()
close(errCh)
}()
})
<-ctx.Done()
if tok == nil {

View File

@@ -17,6 +17,7 @@ import (
"github.com/facebookincubator/go-belt"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/goccy/go-yaml"
"github.com/xaionaro-go/streamctl/pkg/observability"
"github.com/xaionaro-go/streamctl/pkg/player"
"github.com/xaionaro-go/streamctl/pkg/player/protobuf/go/player_grpc"
"github.com/xaionaro-go/streamctl/pkg/streamcontrol"
@@ -463,7 +464,7 @@ func (c *Client) SubscriberToOAuthURLs(
return nil, fmt.Errorf("unable to subscribe to oauth URLs: %w", err)
}
subClient.CloseSend()
go func() {
observability.Go(ctx, func() {
defer conn.Close()
defer func() {
close(result)
@@ -482,7 +483,7 @@ func (c *Client) SubscriberToOAuthURLs(
result <- res
}
}()
})
return result, nil
}
@@ -973,14 +974,14 @@ func (c *Client) WaitForStreamPublisher(
}
ctx, cancelFn := context.WithCancel(ctx)
go func() {
observability.Go(ctx, func() {
<-ctx.Done()
conn.Close()
}()
})
result := make(chan struct{})
waiter.CloseSend()
go func() {
observability.Go(ctx, func() {
defer cancelFn()
defer conn.Close()
defer func() {
@@ -996,7 +997,7 @@ func (c *Client) WaitForStreamPublisher(
logger.Errorf(ctx, "unable to read data: %v", err)
return
}
}()
})
return result, nil
}
@@ -1197,10 +1198,10 @@ func (c *Client) StreamPlayerEndChan(
defer conn.Close()
ctx, cancelFn := context.WithCancel(ctx)
go func() {
observability.Go(ctx, func() {
<-ctx.Done()
conn.Close()
}()
})
waiter, err := client.StreamPlayerEndChan(ctx, &streamd_grpc.StreamPlayerEndChanRequest{
StreamID: string(streamID),
@@ -1212,7 +1213,7 @@ func (c *Client) StreamPlayerEndChan(
}
result := make(chan struct{})
waiter.CloseSend()
go func() {
observability.Go(ctx, func() {
defer cancelFn()
defer func() {
close(result)
@@ -1227,7 +1228,7 @@ func (c *Client) StreamPlayerEndChan(
logger.Errorf(ctx, "unable to read data: %v", err)
return
}
}()
})
return result, nil
}
@@ -1406,7 +1407,7 @@ func unwrapChan[E any, R any, S receiver[R]](
}
r := make(chan E)
go func() {
observability.Go(ctx, func() {
defer conn.Close()
defer cancelFn()
for {
@@ -1432,7 +1433,7 @@ func unwrapChan[E any, R any, S receiver[R]](
var eventParsed E
r <- eventParsed
}
}()
})
return r, nil
}

View File

@@ -9,6 +9,7 @@ import (
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/go-git/go-git/v5/plumbing"
"github.com/xaionaro-go/streamctl/pkg/observability"
"github.com/xaionaro-go/streamctl/pkg/repository"
"github.com/xaionaro-go/streamctl/pkg/streamd/config"
)
@@ -200,7 +201,7 @@ func (d *StreamD) startPeriodicGitSyncer(ctx context.Context) {
d.GitSyncerMutex.Unlock()
d.gitSync(ctx)
go func() {
observability.Go(ctx, func() {
err := d.sendConfigViaGIT(ctx)
if err != nil {
d.UI.DisplayError(fmt.Errorf("unable to send the config to the remote git repository: %w", err))
@@ -220,7 +221,7 @@ func (d *StreamD) startPeriodicGitSyncer(ctx context.Context) {
d.gitSync(ctx)
}
}()
})
}
func (d *StreamD) OBSOLETE_GitRelogin(ctx context.Context) error {

View File

@@ -15,6 +15,7 @@ import (
"github.com/facebookincubator/go-belt/tool/experimental/errmon"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/hashicorp/go-multierror"
"github.com/xaionaro-go/streamctl/pkg/observability"
"github.com/xaionaro-go/streamctl/pkg/player"
"github.com/xaionaro-go/streamctl/pkg/repository"
"github.com/xaionaro-go/streamctl/pkg/streamcontrol"
@@ -256,24 +257,24 @@ func (d *StreamD) InitCache(ctx context.Context) error {
var wg sync.WaitGroup
wg.Add(1)
go func() {
observability.Go(ctx, func() {
defer wg.Done()
_changedCache := d.initTwitchData(ctx)
d.normalizeTwitchData()
if _changedCache {
changedCache = true
}
}()
})
wg.Add(1)
go func() {
observability.Go(ctx, func() {
defer wg.Done()
_changedCache := d.initYoutubeData(ctx)
d.normalizeYoutubeData()
if _changedCache {
changedCache = true
}
}()
})
wg.Wait()
if changedCache {
@@ -387,14 +388,14 @@ func (d *StreamD) SaveConfig(ctx context.Context) error {
return err
}
go func() {
observability.Go(ctx, func() {
if d.GitStorage != nil {
err = d.sendConfigViaGIT(ctx)
if err != nil {
d.UI.DisplayError(fmt.Errorf("unable to send the config to the remote git repository: %w", err))
}
}
}()
})
return nil
}
@@ -450,14 +451,14 @@ func (d *StreamD) StartStream(
defer func() {
d.StreamStatusCache.InvalidateCache(ctx)
if platID == youtube.ID {
go func() {
observability.Go(ctx, func() {
now := time.Now()
time.Sleep(10 * time.Second)
for time.Since(now) < 5*time.Minute {
d.StreamStatusCache.InvalidateCache(ctx)
time.Sleep(20 * time.Second)
}
}()
})
}
}()
switch platID {
@@ -1501,12 +1502,12 @@ func eventSubToChan[T any](
return nil, fmt.Errorf("unable to subscribe: %w", err)
}
go func() {
observability.Go(ctx, func() {
<-ctx.Done()
d.EventBus.Unsubscribe(topic, callback)
d.EventBus.WaitAsync()
close(r)
}()
})
return r, nil
}

View File

@@ -13,6 +13,7 @@ import (
"fyne.io/fyne/v2/widget"
"github.com/anthonynsimon/bild/adjust"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/xaionaro-go/streamctl/pkg/observability"
"github.com/xaionaro-go/streamctl/pkg/streamcontrol"
"github.com/xaionaro-go/streamctl/pkg/streamcontrol/obs"
"github.com/xaionaro-go/streamctl/pkg/streamcontrol/twitch"
@@ -30,7 +31,7 @@ func (p *Panel) startMonitorPage(
p.updateMonitorPageImages(ctx)
p.updateMonitorPageStreamStatus(ctx)
go func() {
observability.Go(ctx, func() {
t := time.NewTicker(200 * time.Millisecond)
for {
select {
@@ -41,9 +42,9 @@ func (p *Panel) startMonitorPage(
p.updateMonitorPageImages(ctx)
}
}()
})
go func() {
observability.Go(ctx, func() {
t := time.NewTicker(2 * time.Second)
for {
select {
@@ -54,7 +55,7 @@ func (p *Panel) startMonitorPage(
p.updateMonitorPageStreamStatus(ctx)
}
}()
})
}(ctx)
}
@@ -79,7 +80,7 @@ func (p *Panel) updateMonitorPageImages(
var wg sync.WaitGroup
wg.Add(1)
go func() {
observability.Go(ctx, func() {
defer wg.Done()
img, changed, err := p.getImage(ctx, consts.ImageScreenshot)
@@ -101,10 +102,10 @@ func (p *Panel) updateMonitorPageImages(
p.screenshotContainer.Objects = append(p.screenshotContainer.Objects, imgFyne)
p.screenshotContainer.Refresh()
}
}()
})
wg.Add(1)
go func() {
observability.Go(ctx, func() {
defer wg.Done()
img, changed, err := p.getImage(ctx, consts.ImageChat)
if err != nil {
@@ -124,7 +125,7 @@ func (p *Panel) updateMonitorPageImages(
p.chatContainer.Objects = append(p.chatContainer.Objects, imgFyne)
p.chatContainer.Refresh()
}
}()
})
}
@@ -141,7 +142,7 @@ func (p *Panel) updateMonitorPageStreamStatus(
twitch.ID,
} {
wg.Add(1)
go func() {
observability.Go(ctx, func() {
defer wg.Done()
dst := p.streamStatus[platID]
@@ -178,7 +179,7 @@ func (p *Panel) updateMonitorPageStreamStatus(
} else {
dst.SetText("started")
}
}()
})
}
wg.Wait()

View File

@@ -31,6 +31,7 @@ import (
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/go-ng/xmath"
"github.com/xaionaro-go/streamctl/pkg/oauthhandler"
"github.com/xaionaro-go/streamctl/pkg/observability"
"github.com/xaionaro-go/streamctl/pkg/screenshot"
"github.com/xaionaro-go/streamctl/pkg/screenshoter"
"github.com/xaionaro-go/streamctl/pkg/streamcontrol"
@@ -264,17 +265,17 @@ func (p *Panel) Loop(ctx context.Context, opts ...LoopOption) error {
closeLoadingWindow := func() {
logger.Tracef(ctx, "closing the loading window")
loadingWindow.Hide()
go func() {
observability.Go(ctx, func() {
time.Sleep(10 * time.Millisecond)
loadingWindow.Hide()
time.Sleep(100 * time.Millisecond)
loadingWindow.Hide()
time.Sleep(time.Second)
loadingWindow.Close()
}()
})
}
go func() {
observability.Go(ctx, func() {
if streamD, ok := p.StreamD.(*client.Client); ok {
p.setStatusFunc("Connecting...")
err := p.startOAuthListenerForRemoteStreamD(ctx, streamD)
@@ -290,10 +291,10 @@ func (p *Panel) Loop(ctx context.Context, opts ...LoopOption) error {
defer closeLoadingWindow()
streamD := p.StreamD.(*streamd.StreamD)
streamD.AddOAuthListenPort(8091)
go func() {
observability.Go(ctx, func() {
<-ctx.Done()
streamD.RemoveOAuthListenPort(8091)
}()
})
logger.Tracef(ctx, "started oauth listener for the local streamd")
}
@@ -323,7 +324,7 @@ func (p *Panel) Loop(ctx context.Context, opts ...LoopOption) error {
}*/
logger.Tracef(ctx, "ended stream controllers initialization")
}()
})
p.app.Run()
return nil
@@ -347,7 +348,7 @@ func (p *Panel) startOAuthListenerForRemoteStreamD(
}
logger.Tracef(ctx, "started oauth listener for the remote streamd")
go func() {
observability.Go(ctx, func() {
defer cancelFn()
defer p.DisplayError(fmt.Errorf("oauth handler was closed"))
for {
@@ -383,7 +384,7 @@ func (p *Panel) startOAuthListenerForRemoteStreamD(
}
}
}
}()
})
return nil
}
@@ -1789,7 +1790,7 @@ func (p *Panel) subscribeUpdateControlPage(ctx context.Context) {
p.getUpdatedStatus(ctx)
go func() {
observability.Go(ctx, func() {
for {
select {
case <-ctx.Done():
@@ -1799,7 +1800,7 @@ func (p *Panel) subscribeUpdateControlPage(ctx context.Context) {
}
p.getUpdatedStatus(ctx)
}
}()
})
}
func (p *Panel) getSelectedProfile() Profile {
@@ -1821,7 +1822,7 @@ func (p *Panel) execCommand(ctx context.Context, cmdString string) {
var stdout, stderr bytes.Buffer
cmd.Stdout = &stdout
cmd.Stderr = &stderr
go func() {
observability.Go(ctx, func() {
err := cmd.Run()
if err != nil {
p.DisplayError(err)
@@ -1829,7 +1830,7 @@ func (p *Panel) execCommand(ctx context.Context, cmdString string) {
logger.Debugf(ctx, "stdout: %s", stdout.Bytes())
logger.Debugf(ctx, "stderr: %s", stderr.Bytes())
}()
})
}
func (p *Panel) streamIsRunning(
@@ -2441,10 +2442,10 @@ func (p *Panel) profileWindow(
for _, cat := range dataTwitch.Cache.Categories {
if cleanTwitchCategoryName(cat.Name) == text {
setSelectedTwitchCategory(cat.Name)
go func() {
observability.Go(ctx, func() {
time.Sleep(100 * time.Millisecond)
twitchCategory.SetText("")
}()
})
return
}
}
@@ -2535,10 +2536,10 @@ func (p *Panel) profileWindow(
for _, bc := range dataYouTube.Cache.Broadcasts {
if cleanYoutubeRecordingName(bc.Snippet.Title) == text {
setSelectedYoutubeBroadcast(bc)
go func() {
observability.Go(ctx, func() {
time.Sleep(100 * time.Millisecond)
youtubeTemplate.SetText("")
}()
})
return
}
}

View File

@@ -15,6 +15,7 @@ import (
"fyne.io/fyne/v2/widget"
"github.com/dustin/go-humanize"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/xaionaro-go/streamctl/pkg/observability"
"github.com/xaionaro-go/streamctl/pkg/player"
"github.com/xaionaro-go/streamctl/pkg/streamd/api"
sptypes "github.com/xaionaro-go/streamctl/pkg/streamplayer/types"
@@ -38,7 +39,7 @@ func (p *Panel) initRestreamPage(
logger.Debugf(ctx, "initRestreamPage")
defer logger.Debugf(ctx, "/initRestreamPage")
go func() {
observability.Go(ctx, func() {
updateData := func() {
inStreams, err := p.StreamD.ListIncomingStreams(ctx)
if err != nil {
@@ -57,9 +58,9 @@ func (p *Panel) initRestreamPage(
for range ch {
updateData()
}
}()
})
go func() {
observability.Go(ctx, func() {
updateData := func() {
streamServers, err := p.StreamD.ListStreamServers(ctx)
if err != nil {
@@ -78,9 +79,9 @@ func (p *Panel) initRestreamPage(
for range ch {
updateData()
}
}()
})
go func() {
observability.Go(ctx, func() {
updateData := func() {
dsts, err := p.StreamD.ListStreamDestinations(ctx)
if err != nil {
@@ -99,9 +100,9 @@ func (p *Panel) initRestreamPage(
for range ch {
updateData()
}
}()
})
go func() {
observability.Go(ctx, func() {
updateData := func() {
streamFwds, err := p.StreamD.ListStreamForwards(ctx)
if err != nil {
@@ -120,9 +121,9 @@ func (p *Panel) initRestreamPage(
for range ch {
updateData()
}
}()
})
go func() {
observability.Go(ctx, func() {
updateData := func() {
streamPlayers, err := p.StreamD.ListStreamPlayers(ctx)
if err != nil {
@@ -141,7 +142,7 @@ func (p *Panel) initRestreamPage(
for range ch {
updateData()
}
}()
})
}
func (p *Panel) openAddStreamServerWindow(ctx context.Context) {
@@ -1032,7 +1033,7 @@ func (p *Panel) streamServersUpdater(
ctx context.Context,
) context.CancelFunc {
ctx, cancelFn := context.WithCancel(ctx)
go func() {
observability.Go(ctx, func() {
updateData := func() {
streamServers, err := p.StreamD.ListStreamServers(ctx)
if err != nil {
@@ -1061,7 +1062,7 @@ func (p *Panel) streamServersUpdater(
}
updateData()
}
}()
})
return cancelFn
}
@@ -1069,7 +1070,7 @@ func (p *Panel) startStreamPlayersUpdater(
ctx context.Context,
) context.CancelFunc {
ctx, cancelFn := context.WithCancel(ctx)
go func() {
observability.Go(ctx, func() {
updateData := func() {
streamPlayers, err := p.StreamD.ListStreamPlayers(ctx)
if err != nil {
@@ -1098,7 +1099,7 @@ func (p *Panel) startStreamPlayersUpdater(
}
updateData()
}
}()
})
return cancelFn
}
@@ -1106,7 +1107,7 @@ func (p *Panel) startStreamForwardersUpdater(
ctx context.Context,
) context.CancelFunc {
ctx, cancelFn := context.WithCancel(ctx)
go func() {
observability.Go(ctx, func() {
updateData := func() {
streamFwds, err := p.StreamD.ListStreamForwards(ctx)
if err != nil {
@@ -1135,6 +1136,6 @@ func (p *Panel) startStreamForwardersUpdater(
}
updateData()
}
}()
})
return cancelFn
}

View File

@@ -9,6 +9,7 @@ import (
"fyne.io/fyne/v2/driver/desktop"
"fyne.io/fyne/v2/widget"
"github.com/xaionaro-go/streamctl/pkg/observability"
"github.com/xaionaro-go/unsafetools"
)
@@ -60,7 +61,7 @@ func (w *HintWidget) MouseIn(ev *desktop.MouseEvent) {
panic("should not have happened")
}
w.RecheckerCancelFn = cancelFn
go func() {
observability.Go(ctx, func() {
for {
select {
case <-ctx.Done():
@@ -80,7 +81,7 @@ func (w *HintWidget) MouseIn(ev *desktop.MouseEvent) {
pos.Y += 5
w.Hint.Move(pos)
}
}()
})
}
func (w *HintWidget) MouseMoved(*desktop.MouseEvent) {
}

View File

@@ -18,6 +18,7 @@ import (
"github.com/facebookincubator/go-belt/tool/logger/implementation/logrus"
"github.com/spf13/pflag"
"github.com/xaionaro-go/streamctl/pkg/observability"
"github.com/xaionaro-go/streamctl/pkg/player"
ptypes "github.com/xaionaro-go/streamctl/pkg/player/types"
"github.com/xaionaro-go/streamctl/pkg/streamd/api"
@@ -210,10 +211,10 @@ func (s *StreamPlayerStreamServer) WaitPublisher(
}
ch := make(chan struct{})
go func() {
observability.Go(ctx, func() {
s.StreamServer.RelayServer.WaitPubsub(ctx, localAppName)
close(ch)
}()
})
return ch, nil
}

View File

@@ -11,6 +11,7 @@ import (
"github.com/facebookincubator/go-belt/tool/experimental/errmon"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/hashicorp/go-multierror"
"github.com/xaionaro-go/streamctl/pkg/observability"
"github.com/xaionaro-go/streamctl/pkg/player"
"github.com/xaionaro-go/streamctl/pkg/streamd/api"
)
@@ -302,10 +303,10 @@ func (p *StreamPlayer) controllerLoop(ctx context.Context) {
}
}
go func() {
observability.Go(ctx, func() {
time.Sleep(time.Second) // TODO: delete this ugly racy hack
p.notifyStart(context.WithValue(ctx, CtxKeyStreamPlayer, p))
}()
})
logger.Debugf(ctx, "finished waiting for a publisher at '%s'", p.StreamID)

View File

@@ -15,6 +15,7 @@ import (
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/rs/zerolog/log"
"github.com/xaionaro-go/datacounter"
"github.com/xaionaro-go/streamctl/pkg/observability"
"github.com/xaionaro-go/streamctl/pkg/streamserver/consts"
"github.com/xaionaro-go/streamctl/pkg/streamserver/implementations/go2rtc/streamserver/streams"
"github.com/xaionaro-go/streamctl/pkg/streamserver/types"
@@ -55,15 +56,15 @@ func New(
Listener: ln,
}
go func() {
observability.Go(ctx, func() {
<-ctx.Done()
logger.Infof(ctx, "closing %s", cfg.Listen)
err := ln.Close()
errmon.ObserveErrorCtx(ctx, err)
}()
})
logger.Infof(ctx, "started RTMP server at %s", cfg.Listen)
go func() {
observability.Go(ctx, func() {
for {
if ctx.Err() != nil {
return
@@ -75,13 +76,13 @@ func New(
return
}
go func() {
observability.Go(ctx, func() {
if err = s.tcpHandle(conn); err != nil {
errmon.ObserveErrorCtx(ctx, err)
}
}()
})
}
}()
})
return s, nil
}
@@ -197,12 +198,12 @@ func StreamsConsumerHandle(url string) (core.Consumer, types.NumBytesReaderWrote
ctx, cancelFn := context.WithCancel(ctx)
defer cancelFn()
go func() {
observability.Go(ctx, func() {
<-ctx.Done()
cancelFn()
err := wr.(io.Closer).Close()
errmon.ObserveErrorCtx(ctx, err)
}()
})
_, err = cons.WriteTo(wrc)
if err != nil {

View File

@@ -13,6 +13,7 @@ import (
"github.com/AlexxIT/go2rtc/pkg/tcp"
"github.com/facebookincubator/go-belt/tool/experimental/errmon"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/xaionaro-go/streamctl/pkg/observability"
"github.com/xaionaro-go/streamctl/pkg/streamserver/consts"
"github.com/xaionaro-go/streamctl/pkg/streamserver/implementations/go2rtc/streamserver/streams"
"github.com/xaionaro-go/streamctl/pkg/streamserver/types"
@@ -68,15 +69,15 @@ func New(
s.DefaultMedias = ParseQuery(query)
}
go func() {
observability.Go(ctx, func() {
<-ctx.Done()
logger.Infof(ctx, "closing %s", cfg.ListenAddr)
err := ln.Close()
errmon.ObserveErrorCtx(ctx, err)
}()
})
logger.Infof(ctx, "started RTSP server at %s", cfg.ListenAddr)
go func() {
observability.Go(ctx, func() {
for {
if ctx.Err() != nil {
return
@@ -95,7 +96,7 @@ func New(
}
go s.tcpHandler(c)
}
}()
})
return s, nil
}

View File

@@ -1,13 +1,16 @@
package streams
import (
"context"
"errors"
"time"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/xaionaro-go/streamctl/pkg/observability"
)
func (s *Stream) Play(source string) error {
ctx := context.TODO()
s.mu.Lock()
for _, producer := range s.producers {
if producer.state == stateInternal && producer.conn != nil {
@@ -45,10 +48,10 @@ func (s *Stream) Play(source string) error {
s.AddInternalProducer(src)
go func() {
observability.Go(ctx, func() {
_ = src.Start()
s.RemoveProducer(src)
}()
})
return nil
}
@@ -82,19 +85,19 @@ func (s *Stream) Play(source string) error {
s.AddInternalProducer(src)
s.AddInternalConsumer(cons)
go func() {
observability.Go(ctx, func() {
_ = dst.Start()
_ = src.Stop()
s.RemoveInternalConsumer(cons)
}()
})
go func() {
observability.Go(ctx, func() {
_ = src.Start()
// little timeout before stop dst, so the buffer can be transferred
time.Sleep(time.Second)
_ = dst.Stop()
s.RemoveProducer(src)
}()
})
return nil
}

View File

@@ -11,6 +11,7 @@ import (
"github.com/facebookincubator/go-belt/tool/experimental/errmon"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/hashicorp/go-multierror"
"github.com/xaionaro-go/streamctl/pkg/observability"
"github.com/xaionaro-go/streamctl/pkg/streamserver/types"
)
@@ -48,7 +49,7 @@ func (sf *StreamForwarding) Start(
ctx, cancelFn := context.WithCancel(ctx)
sf.CancelFunc = cancelFn
go func() {
observability.Go(ctx, func() {
for {
select {
case <-ctx.Done():
@@ -81,7 +82,7 @@ func (sf *StreamForwarding) Start(
err := sf.Close()
errmon.ObserveErrorCtx(ctx, err)
}
}()
})
return nil
}

View File

@@ -14,6 +14,7 @@ import (
"github.com/gwuhaolin/livego/configure"
"github.com/gwuhaolin/livego/protocol/rtmp"
"github.com/spf13/viper"
"github.com/xaionaro-go/streamctl/pkg/observability"
"github.com/xaionaro-go/streamctl/pkg/streamserver/types"
"github.com/xaionaro-go/streamctl/pkg/streamtypes"
)
@@ -128,13 +129,13 @@ func (s *StreamServer) startServer(
Listener: listener,
}
portServer.Server = rtmp.NewRtmpServer(portServer.Stream, nil)
go func() {
observability.Go(ctx, func() {
err = portServer.Server.Serve(listener)
if err != nil {
err = fmt.Errorf("unable to start serving RTMP at '%s': %w", listener.Addr().String(), err)
logger.Error(ctx, err)
}
}()
})
srv = portServer
case streamtypes.ServerTypeRTSP:
return fmt.Errorf("RTSP is not supported, yet")

View File

@@ -2,9 +2,11 @@ package streamserver
import (
"bytes"
"context"
"sync"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/xaionaro-go/streamctl/pkg/observability"
flvtag "github.com/yutopp/go-flv/tag"
)
@@ -37,11 +39,11 @@ func (pb *Pubsub) Deregister() error {
pb.m.Lock()
defer pb.m.Unlock()
go func() {
observability.Go(context.TODO(), func() {
for _, sub := range pb.subs {
_ = sub.Close()
}
}()
})
return pb.srv.removePubsub(pb.name)
}

View File

@@ -13,6 +13,7 @@ import (
"github.com/facebookincubator/go-belt"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/hashicorp/go-multierror"
"github.com/xaionaro-go/streamctl/pkg/observability"
"github.com/xaionaro-go/streamctl/pkg/streamserver/types"
"github.com/xaionaro-go/streamctl/pkg/xlogger"
flvtag "github.com/yutopp/go-flv/tag"
@@ -71,7 +72,7 @@ func (fwd *ActiveStreamForwarding) Start(ctx context.Context) error {
}
ctx, cancelFn := context.WithCancel(ctx)
fwd.CancelFunc = cancelFn
go func() {
observability.Go(ctx, func() {
for {
err := fwd.waitForPublisherAndStart(
ctx,
@@ -86,7 +87,7 @@ func (fwd *ActiveStreamForwarding) Start(ctx context.Context) error {
logger.Errorf(ctx, "%s", err)
}
}
}()
})
return nil
}

View File

@@ -8,6 +8,7 @@ import (
"github.com/facebookincubator/go-belt"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/hashicorp/go-multierror"
"github.com/xaionaro-go/streamctl/pkg/observability"
"github.com/xaionaro-go/streamctl/pkg/player"
"github.com/xaionaro-go/streamctl/pkg/streamd/api"
"github.com/xaionaro-go/streamctl/pkg/streamplayer"
@@ -39,10 +40,10 @@ func (s *StreamPlayerStreamServer) WaitPublisher(
}
ch := make(chan struct{})
go func() {
observability.Go(ctx, func() {
s.RelayServer.WaitPubsub(ctx, localAppName)
close(ch)
}()
})
return ch, nil
}

View File

@@ -11,6 +11,7 @@ import (
"github.com/facebookincubator/go-belt"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/xaionaro-go/streamctl/pkg/observability"
"github.com/xaionaro-go/streamctl/pkg/player"
playertypes "github.com/xaionaro-go/streamctl/pkg/player/types"
"github.com/xaionaro-go/streamctl/pkg/streamplayer"
@@ -120,7 +121,7 @@ func (s *StreamServer) Init(
}
}
go func() {
observability.Go(ctx, func() {
var opts setupStreamPlayersOptions
if initCfg.DefaultStreamPlayerOptions != nil {
opts = append(opts, setupStreamPlayersOptionDefaultStreamPlayerOptions(initCfg.DefaultStreamPlayerOptions))
@@ -129,7 +130,7 @@ func (s *StreamServer) Init(
if err != nil {
logger.Error(ctx, err)
}
}()
})
return nil
}
@@ -202,13 +203,13 @@ func (s *StreamServer) startServer(
}
},
})
go func() {
observability.Go(ctx, func() {
err = portSrv.Serve(listener)
if err != nil {
err = fmt.Errorf("unable to start serving RTMP at '%s': %w", listener.Addr().String(), err)
logger.Error(ctx, err)
}
}()
})
srv = portSrv
case streamtypes.ServerTypeRTSP:
return fmt.Errorf("RTSP is not supported, yet")