diff --git a/cmd/streamd/main.go b/cmd/streamd/main.go index e9b9b0e..1458d5d 100644 --- a/cmd/streamd/main.go +++ b/cmd/streamd/main.go @@ -40,7 +40,9 @@ func main() { netPprofAddr := pflag.String("go-net-pprof-addr", "", "address to listen to for net/pprof requests") cpuProfile := pflag.String("go-profile-cpu", "", "file to write cpu profile to") heapProfile := pflag.String("go-profile-heap", "", "file to write memory profile to") + sentryDSN := pflag.String("sentry-dsn", "", "DSN of a Sentry instance to send error reports") pflag.Parse() + l := logrus.Default().WithLevel(loggerLevel) if *cpuProfile != "" { @@ -84,6 +86,24 @@ func main() { ctx := context.Background() ctx = logger.CtxWithLogger(ctx, l) + + if *sentryDSN != "" { + l.Infof("setting up Sentry at DSN '%s'", *sentryDSN) + sentryClient, err := sentry.NewClient(sentry.ClientOptions{ + Dsn: *sentryDSN, + }) + if err != nil { + l.Fatal(err) + } + sentryErrorMonitor := errmonsentry.New(sentryClient) + ctx = errmon.CtxWithErrorMonitor(ctx, sentryErrorMonitor) + + l = l.WithPreHooks(observability.NewErrorMonitorLoggerHook( + sentryErrorMonitor, + )) + ctx = logger.CtxWithLogger(ctx, l) + } + logger.Default = func() logger.Logger { return l } @@ -134,27 +154,6 @@ func main() { l.Fatalf("unable to initialize the streamd instance: %v", err) } - l := l - if streamD.Config.SentryDSN != "" { - l.Infof("setting up Sentry at DSN '%s'", streamD.Config.SentryDSN) - sentryClient, err := sentry.NewClient(sentry.ClientOptions{ - Dsn: streamD.Config.SentryDSN, - }) - if err != nil { - l.Fatal(err) - } - sentryErrorMonitor := errmonsentry.New(sentryClient) - ctx = errmon.CtxWithErrorMonitor(ctx, sentryErrorMonitor) - - l = l.WithPreHooks(observability.NewErrorMonitorLoggerHook( - sentryErrorMonitor, - )) - ctx = logger.CtxWithLogger(ctx, l) - logger.Default = func() logger.Logger { - return l - } - } - go func() { if err = streamD.Run(ctx); err != nil { l.Errorf("streamd returned an error: %v", err) diff --git a/cmd/streampanel/context.go b/cmd/streampanel/context.go new file mode 100644 index 0000000..a88e44b --- /dev/null +++ b/cmd/streampanel/context.go @@ -0,0 +1,50 @@ +package main + +import ( + "context" + "os" + + "github.com/facebookincubator/go-belt/tool/experimental/errmon" + errmonsentry "github.com/facebookincubator/go-belt/tool/experimental/errmon/implementation/sentry" + "github.com/facebookincubator/go-belt/tool/logger" + "github.com/facebookincubator/go-belt/tool/logger/implementation/logrus" + "github.com/getsentry/sentry-go" + "github.com/xaionaro-go/streamctl/pkg/observability" +) + +func getContext(flags Flags) context.Context { + ctx := context.Background() + + ll := logrus.DefaultLogrusLogger() + l := logrus.New(ll).WithLevel(flags.LoggerLevel) + + if flags.LogFile != "" { + f, err := os.OpenFile(flags.LogFile, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0750) + if err != nil { + l.Errorf("failed to open log file '%s': %v", flags.LogFile, err) + } + ll.SetOutput(f) + } + + if flags.SentryDSN != "" { + l.Infof("setting up Sentry at DSN '%s'", flags.SentryDSN) + sentryClient, err := sentry.NewClient(sentry.ClientOptions{ + Dsn: flags.SentryDSN, + }) + if err != nil { + l.Fatal(err) + } + sentryErrorMonitor := errmonsentry.New(sentryClient) + ctx = errmon.CtxWithErrorMonitor(ctx, sentryErrorMonitor) + l = l.WithPreHooks(observability.NewErrorMonitorLoggerHook( + sentryErrorMonitor, + )) + } + + ctx = logger.CtxWithLogger(ctx, l) + logger.Default = func() logger.Logger { + return l + } + + return ctx +} diff --git a/cmd/streampanel/flag.go b/cmd/streampanel/flag.go new file mode 100644 index 0000000..715054a --- /dev/null +++ b/cmd/streampanel/flag.go @@ -0,0 +1,87 @@ +package main + +import ( + "context" + "fmt" + + "github.com/facebookincubator/go-belt/tool/logger" + "github.com/spf13/pflag" + "github.com/xaionaro-go/streamctl/pkg/mainprocess" + "github.com/xaionaro-go/streamctl/pkg/streampanel/consts" +) + +type Flags struct { + LoggerLevel logger.Level + ListenAddr string + RemoteAddr string + ConfigPath string + NetPprofAddr string + CPUProfile string + HeapProfile string + SentryDSN string + Page string + LogFile string + Subprocess string + SplitProcess bool +} + +func parseFlags() Flags { + loggerLevel := logger.LevelWarning + pflag.Var(&loggerLevel, "log-level", "Log level") + listenAddr := pflag.String("listen-addr", "", "the address to listen for incoming connections to") + remoteAddr := pflag.String("remote-addr", "", "the address (for example 127.0.0.1:3594) of streamd to connect to, instead of running the stream controllers locally") + configPath := pflag.String("config-path", "~/.streampanel.yaml", "the path to the config file") + netPprofAddr := pflag.String("go-net-pprof-addr", "", "address to listen to for net/pprof requests") + cpuProfile := pflag.String("go-profile-cpu", "", "file to write cpu profile to") + heapProfile := pflag.String("go-profile-heap", "", "file to write memory profile to") + sentryDSN := pflag.String("sentry-dsn", "", "DSN of a Sentry instance to send error reports") + page := pflag.String("page", string(consts.PageControl), "DSN of a Sentry instance to send error reports") + logFile := pflag.String("log-file", "", "log file to write logs into") + subprocess := pflag.String("subprocess", "", "[internal use flag] run a specific sub-process (format: processName:addressToConnect)") + splitProcess := pflag.Bool("split-process", !isMobile(), "split the process into multiple processes for better stability") + pflag.Parse() + + return Flags{ + LoggerLevel: loggerLevel, + ListenAddr: *listenAddr, + RemoteAddr: *remoteAddr, + ConfigPath: *configPath, + NetPprofAddr: *netPprofAddr, + CPUProfile: *cpuProfile, + HeapProfile: *heapProfile, + SentryDSN: *sentryDSN, + Page: *page, + LogFile: *logFile, + Subprocess: *subprocess, + SplitProcess: *splitProcess, + } +} + +type GetFlags struct{} +type GetFlagsResult struct { + Flags Flags +} + +func getFlags( + ctx context.Context, + mainProcess *mainprocess.Client, +) Flags { + err := mainProcess.SendMessage(ctx, "main", GetFlags{}) + assertNoError(err) + + var flags Flags + err = mainProcess.ReadOne( + ctx, + func(ctx context.Context, source mainprocess.ProcessName, content any) error { + result, ok := content.(GetFlagsResult) + if !ok { + return fmt.Errorf("got unexpected type '%T' instead of %T", content, GetFlagsResult{}) + } + flags = result.Flags + return nil + }, + ) + assertNoError(err) + + return flags +} diff --git a/cmd/streampanel/fork.go b/cmd/streampanel/fork.go new file mode 100644 index 0000000..c0e52bd --- /dev/null +++ b/cmd/streampanel/fork.go @@ -0,0 +1,186 @@ +package main + +import ( + "context" + "encoding/gob" + "fmt" + "os" + "os/exec" + "strings" + "sync" + "time" + + "github.com/facebookincubator/go-belt/tool/logger" + "github.com/xaionaro-go/streamctl/pkg/mainprocess" +) + +type ProcessName = mainprocess.ProcessName + +const ( + ProcessNameMain = ProcessName("main") + ProcessNameStreamd = ProcessName("streamd") + ProcessNameUI = ProcessName("ui") +) + +var forkLocker sync.Mutex +var forkMap = map[ProcessName]*exec.Cmd{} + +func getFork(procName ProcessName) *exec.Cmd { + forkLocker.Lock() + defer forkLocker.Unlock() + return forkMap[procName] +} + +func setFork(procName ProcessName, f *exec.Cmd) { + forkLocker.Lock() + defer forkLocker.Unlock() + forkMap[procName] = f +} + +func init() { + gob.Register(StreamDDied{}) + gob.Register(GetFlags{}) + gob.Register(GetFlagsResult{}) + gob.Register(GetStreamdAddress{}) + gob.Register(GetStreamdAddressResult{}) +} + +const ( + EnvPassword = "STREAMPANEL_PASSWORD" +) + +func runSubprocess( + ctx context.Context, + subprocessFlag string, +) { + parts := strings.SplitN(subprocessFlag, ":", 2) + if len(parts) != 2 { + logger.Fatalf(ctx, "expected 2 parts in --subprocess: name and address, separated via a colon") + } + procName := ProcessName(parts[0]) + addr := parts[1] + switch procName { + case ProcessNameStreamd: + forkStreamd(ctx, addr, os.Getenv(EnvPassword)) + case ProcessNameUI: + forkUI(ctx, addr, os.Getenv(EnvPassword)) + } +} + +func runSplitProcesses( + ctx context.Context, + flags Flags, +) { + procList := []ProcessName{ + ProcessNameUI, + } + if flags.RemoteAddr == "" { + procList = append(procList, ProcessNameStreamd) + } + + var m *mainprocess.Manager + var err error + m, err = mainprocess.NewManager( + func( + ctx context.Context, + procName ProcessName, + addr string, + password string, + isRestart bool, + ) error { + + f := getFork(procName) + if f != nil { + time.Sleep(time.Millisecond * 100) + //f.Process.Kill() + logger.Debugf(ctx, "waiting for process '%s' to die", procName) + f.Wait() + } + + logger.Infof(ctx, "running process '%s'", procName) + err := runFork(ctx, flags, procName, addr, password) + if err != nil { + panic(err) + } + if isRestart && flags.RemoteAddr == "" { + switch procName { + case ProcessNameStreamd: + err := m.SendMessage(ctx, ProcessNameUI, StreamDDied{}) + if err != nil { + logger.Errorf(ctx, "failed to send a StreamDDied message to '%s': %v", ProcessNameUI, err) + } + } + } + return nil + }, + procList..., + ) + if err != nil { + logger.Fatalf(ctx, "failed to start process manager: %v", err) + } + defer m.Close() + go m.Serve(ctx, func(ctx context.Context, source mainprocess.ProcessName, content any) error { + switch content.(type) { + case GetFlags: + msg := GetFlagsResult{ + Flags: flags, + } + err := m.SendMessage(ctx, source, msg) + if err != nil { + logger.Errorf(ctx, "failed to send message %#+v to '%s': %v", msg, source, err) + } + } + return nil + }) + go func() { + select { + case <-ctx.Done(): + return + case <-time.After(10 * time.Second): + } + + err := m.VerifyEverybodyConnected(ctx) + if err != nil { + logger.Fatalf(ctx, "%s", err) + } + }() + + <-ctx.Done() +} + +const debugDontFork = false + +func runFork( + ctx context.Context, + flags Flags, + procName ProcessName, + addr, password string, +) error { + logger.Debugf(ctx, "running fork: '%s' '%s' '%s'", procName, addr, password) + defer logger.Debugf(ctx, "/running fork: '%s' '%s' '%s'", procName, addr, password) + if debugDontFork { + return fakeFork(ctx, procName, addr, password) + } + os.Setenv(EnvPassword, password) + args := []string{os.Args[0], "--sentry-dsn=" + flags.SentryDSN, "--log-level=" + flags.LoggerLevel.String(), "--subprocess=" + string(procName) + ":" + addr} + logger.Infof(ctx, "running '%s %s'", args[0], strings.Join(args[1:], " ")) + cmd := exec.Command(args[0], args[1:]...) + cmd.Stderr = os.Stderr + cmd.Stdout = os.Stdout + cmd.Stdin = os.Stdin + setFork(procName, cmd) + go cmd.Run() + return nil +} + +func fakeFork(ctx context.Context, procName ProcessName, addr, password string) error { + switch procName { + case ProcessNameStreamd: + go forkUI(ctx, addr, password) + return nil + case ProcessNameUI: + go forkStreamd(ctx, addr, password) + return nil + } + return fmt.Errorf("unexpected process name: %s", procName) +} diff --git a/cmd/streampanel/is_mobile.go b/cmd/streampanel/is_mobile.go new file mode 100644 index 0000000..44b709a --- /dev/null +++ b/cmd/streampanel/is_mobile.go @@ -0,0 +1,13 @@ +package main + +import ( + "runtime" +) + +func isMobile() bool { + switch runtime.GOOS { + case "android": + return true + } + return false +} diff --git a/cmd/streampanel/main.go b/cmd/streampanel/main.go index 44f3863..3bb4d28 100644 --- a/cmd/streampanel/main.go +++ b/cmd/streampanel/main.go @@ -2,161 +2,54 @@ package main import ( "context" - "net" - "net/http" _ "net/http/pprof" - "os" - "runtime" - "runtime/pprof" "github.com/facebookincubator/go-belt" - "github.com/facebookincubator/go-belt/tool/experimental/errmon" - errmonsentry "github.com/facebookincubator/go-belt/tool/experimental/errmon/implementation/sentry" "github.com/facebookincubator/go-belt/tool/logger" - "github.com/facebookincubator/go-belt/tool/logger/implementation/logrus" - "github.com/getsentry/sentry-go" - "github.com/kraken-hpc/go-fork" - "github.com/spf13/pflag" - "github.com/xaionaro-go/streamctl/pkg/observability" "github.com/xaionaro-go/streamctl/pkg/streamcontrol" "github.com/xaionaro-go/streamctl/pkg/streamd/grpc/go/streamd_grpc" - "github.com/xaionaro-go/streamctl/pkg/streamd/server" "github.com/xaionaro-go/streamctl/pkg/streampanel" - "github.com/xaionaro-go/streamctl/pkg/streampanel/consts" _ "github.com/xaionaro-go/streamctl/pkg/streamserver" - "google.golang.org/grpc" ) -func init() { - fork.RegisterFunc("streamd", streamd) - fork.Init() -} - -func streamd(remoteAddr string) { - -} - const forceNetPProfOnAndroid = true func main() { - loggerLevel := logger.LevelWarning - pflag.Var(&loggerLevel, "log-level", "Log level") - listenAddr := pflag.String("listen-addr", "", "the address to listen for incoming connections to") - remoteAddr := pflag.String("remote-addr", "", "the address (for example 127.0.0.1:3594) of streamd to connect to, instead of running the stream controllers locally") - configPath := pflag.String("config-path", "~/.streampanel.yaml", "the path to the config file") - netPprofAddr := pflag.String("go-net-pprof-addr", "", "address to listen to for net/pprof requests") - cpuProfile := pflag.String("go-profile-cpu", "", "file to write cpu profile to") - heapProfile := pflag.String("go-profile-heap", "", "file to write memory profile to") - sentryDSN := pflag.String("sentry-dsn", "", "DSN of a Sentry instance to send error reports") - page := pflag.String("page", string(consts.PageControl), "DSN of a Sentry instance to send error reports") - splitProcess := pflag.Bool("split-process", !isMobile(), "split the process into multiple processes for better stability") - pflag.Parse() - - l := logrus.Default().WithLevel(loggerLevel) - logger.Default = func() logger.Logger { - return l - } - - if *cpuProfile != "" { - f, err := os.Create(*cpuProfile) - if err != nil { - l.Fatalf("unable to create file '%s': %v", *cpuProfile, err) - } - defer f.Close() - if err := pprof.StartCPUProfile(f); err != nil { - l.Fatalf("unable to write to file '%s': %v", *cpuProfile, err) - } - defer pprof.StopCPUProfile() - } - - if *heapProfile != "" { - f, err := os.Create(*heapProfile) - if err != nil { - l.Fatalf("unable to create file '%s': %v", *heapProfile, err) - } - defer f.Close() - runtime.GC() - if err := pprof.WriteHeapProfile(f); err != nil { - l.Fatalf("unable to write to file '%s': %v", *heapProfile, err) - } - } - - if *netPprofAddr != "" || (forceNetPProfOnAndroid && runtime.GOOS == "android") { - go func() { - if *netPprofAddr == "" { - *netPprofAddr = "localhost:0" - } - l.Infof("starting to listen for net/pprof requests at '%s'", *netPprofAddr) - l.Error(http.ListenAndServe(*netPprofAddr, nil)) - }() - } - - if oldValue := runtime.GOMAXPROCS(0); oldValue < 16 { - l.Infof("increased GOMAXPROCS from %d to %d", oldValue, 16) - runtime.GOMAXPROCS(16) - } - - if *splitProcess && *listenAddr == "" { - listenAddr = ptr("localhost:0") - } - - listener, err := net.Listen("tcp", *listenAddr) - if err != nil { - l.Fatalf("failed to listen: %v", err) - } - - if *splitProcess { - fork.Fork("streamd", listener.Addr().String()) - } - - ctx := context.Background() - go func() { - <-ctx.Done() - listener.Close() - }() - - if *splitProcess && *remoteAddr == "" { - remoteAddr = ptr(listener.Addr().String()) - } - - var opts []streampanel.Option - if *remoteAddr != "" { - opts = append(opts, streampanel.OptionRemoteStreamDAddr(*remoteAddr)) - } - if *sentryDSN != "" { - opts = append(opts, streampanel.OptionSentryDSN(*sentryDSN)) - } - panel, panelErr := streampanel.New(*configPath, opts...) - - if panel != nil && panel.Config.SentryDSN != "" { - l.Infof("setting up Sentry at DSN '%s'", panel.Config.SentryDSN) - sentryClient, err := sentry.NewClient(sentry.ClientOptions{ - Dsn: panel.Config.SentryDSN, - }) - if err != nil { - l.Fatal(err) - } - sentryErrorMonitor := errmonsentry.New(sentryClient) - ctx = errmon.CtxWithErrorMonitor(ctx, sentryErrorMonitor) - l = l.WithPreHooks(observability.NewErrorMonitorLoggerHook( - sentryErrorMonitor, - )) - } - - ctx = logger.CtxWithLogger(ctx, l) - logger.Default = func() logger.Logger { - return l - } + flags := parseFlags() + ctx := getContext(flags) defer belt.Flush(ctx) + cancelFunc := initRuntime(ctx, flags, "main") + defer cancelFunc() - if panelErr != nil { - l.Fatal(panelErr) + if flags.Subprocess != "" { + runSubprocess(ctx, flags.Subprocess) + return } - if *listenAddr != "" { - grpcServer := grpc.NewServer() - streamdGRPC := server.NewGRPCServer(panel.StreamD) - streamd_grpc.RegisterStreamDServer(grpcServer, streamdGRPC) + if flags.SplitProcess { + runSplitProcesses(ctx, flags) + return + } + + runPanel(ctx, flags) +} + +func runPanel( + ctx context.Context, + flags Flags, +) { + var opts []streampanel.Option + if flags.RemoteAddr != "" { + opts = append(opts, streampanel.OptionRemoteStreamDAddr(flags.RemoteAddr)) + } + + panel, panelErr := streampanel.New(flags.ConfigPath, opts...) + if panelErr != nil { + logger.Fatal(ctx, panelErr) + } + + if flags.ListenAddr != "" { + listener, grpcServer, streamdGRPC := initGRPCServer(ctx, panel.StreamD, flags.ListenAddr) // to erase an oauth request answered locally from "UnansweredOAuthRequests" in the GRPC server: panel.OnInternallySubmittedOAuthCode = func( @@ -171,20 +64,18 @@ func main() { return err } - // start the server: - go func() { - l.Infof("started server at %s", *listenAddr) - err = grpcServer.Serve(listener) - if err != nil { - l.Fatal(err) - } - }() + err := grpcServer.Serve(listener) + if err != nil { + logger.Fatalf(ctx, "unable to server the gRPC server: %v", err) + } } var loopOpts []streampanel.LoopOption - loopOpts = append(loopOpts, streampanel.LoopOptionStartingPage(*page)) - err = panel.Loop(ctx, loopOpts...) + if flags.Page != "" { + loopOpts = append(loopOpts, streampanel.LoopOptionStartingPage(flags.Page)) + } + err := panel.Loop(ctx, loopOpts...) if err != nil { - l.Fatal(err) + logger.Fatal(ctx, err) } } diff --git a/cmd/streampanel/runtime.go b/cmd/streampanel/runtime.go new file mode 100644 index 0000000..de224c7 --- /dev/null +++ b/cmd/streampanel/runtime.go @@ -0,0 +1,63 @@ +package main + +import ( + "context" + "net/http" + "os" + "runtime" + "runtime/pprof" + + "github.com/facebookincubator/go-belt/tool/logger" +) + +func initRuntime(ctx context.Context, flags Flags, _procName ProcessName) context.CancelFunc { + procName := string(_procName) + var closeFuncs []func() + + l := logger.FromCtx(ctx) + + if flags.CPUProfile != "" { + f, err := os.Create(flags.CPUProfile + "-" + procName) + if err != nil { + l.Fatalf("unable to create file '%s': %v", flags.CPUProfile+"-"+procName, err) + } + closeFuncs = append(closeFuncs, func() { f.Close() }) + if err := pprof.StartCPUProfile(f); err != nil { + l.Fatalf("unable to write to file '%s': %v", flags.CPUProfile+"-"+procName, err) + } + closeFuncs = append(closeFuncs, pprof.StopCPUProfile) + } + + if flags.HeapProfile != "" { + f, err := os.Create(flags.HeapProfile + "-" + procName) + if err != nil { + l.Fatalf("unable to create file '%s': %v", flags.HeapProfile+"-"+procName, err) + } + closeFuncs = append(closeFuncs, func() { f.Close() }) + runtime.GC() + if err := pprof.WriteHeapProfile(f); err != nil { + l.Fatalf("unable to write to file '%s': %v", flags.HeapProfile+"-"+procName, err) + } + } + + if flags.NetPprofAddr != "" || (forceNetPProfOnAndroid && runtime.GOOS == "android") { + go func() { + if flags.NetPprofAddr == "" { + flags.NetPprofAddr = "localhost:0" + } + l.Infof("starting to listen for net/pprof requests at '%s'", flags.NetPprofAddr) + l.Error(http.ListenAndServe(flags.NetPprofAddr, nil)) + }() + } + + if oldValue := runtime.GOMAXPROCS(0); oldValue < 16 { + l.Infof("increased GOMAXPROCS from %d to %d", oldValue, 16) + runtime.GOMAXPROCS(16) + } + + return func() { + for i := len(closeFuncs) - 1; i >= 0; i-- { + closeFuncs[i]() + } + } +} diff --git a/cmd/streampanel/streamd.go b/cmd/streampanel/streamd.go new file mode 100644 index 0000000..fe7843c --- /dev/null +++ b/cmd/streampanel/streamd.go @@ -0,0 +1,150 @@ +package main + +import ( + "context" + "net" + "os" + "sync" + + "github.com/facebookincubator/go-belt" + "github.com/facebookincubator/go-belt/tool/experimental/errmon" + "github.com/facebookincubator/go-belt/tool/logger" + "github.com/xaionaro-go/streamctl/cmd/streamd/ui" + "github.com/xaionaro-go/streamctl/pkg/mainprocess" + "github.com/xaionaro-go/streamctl/pkg/streamcontrol" + "github.com/xaionaro-go/streamctl/pkg/streamd" + "github.com/xaionaro-go/streamctl/pkg/streamd/api" + "github.com/xaionaro-go/streamctl/pkg/streamd/config" + "github.com/xaionaro-go/streamctl/pkg/streamd/grpc/go/streamd_grpc" + "github.com/xaionaro-go/streamctl/pkg/streamd/server" + streampanelconfig "github.com/xaionaro-go/streamctl/pkg/streampanel/config" + "github.com/xaionaro-go/streamctl/pkg/xpath" + "google.golang.org/grpc" +) + +func forkStreamd(ctx context.Context, mainProcessAddr, password string) { + procName := ProcessNameStreamd + ctx = belt.WithField(ctx, "process", procName) + + mainProcess, err := mainprocess.NewClient( + procName, + mainProcessAddr, + password, + ) + if err != nil { + panic(err) + } + flags := getFlags(ctx, mainProcess) + ctx = getContext(flags) + ctx = belt.WithField(ctx, "process", procName) + defer belt.Flush(ctx) + logger.Debugf(ctx, "flags == %#+v", flags) + cancelFunc := initRuntime(ctx, flags, procName) + defer cancelFunc() + + runStreamd(ctx, flags) +} + +func runStreamd( + ctx context.Context, + flags Flags, +) { + logger.Debugf(ctx, "runStreamd: %#+v", flags) + defer logger.Debugf(ctx, "/runStreamd") + if flags.RemoteAddr != "" { + logger.Fatal(ctx, "not implemented") + } + + configPath, err := xpath.Expand(flags.ConfigPath) + if err != nil { + logger.Fatal(ctx, err) + } + + var cfg streampanelconfig.Config + err = streampanelconfig.ReadConfigFromPath(configPath, &cfg) + if err != nil { + logger.Fatalf(ctx, "unable to read the config from path '%s': %v", flags.ConfigPath, err) + } + + var streamdGRPCLocker sync.Mutex + + var streamdGRPC *server.GRPCServer + ui := ui.NewUI( + ctx, + func(listenPort uint16, platID streamcontrol.PlatformName, authURL string) bool { + logger.Tracef(ctx, "streamd.UI.OpenOAuthURL(%d, %s, '%s')", listenPort, platID, authURL) + defer logger.Tracef(ctx, "/streamd.UI.OpenOAuthURL(%d, %s, '%s')", listenPort, platID, authURL) + + streamdGRPCLocker.Lock() + logger.Tracef(ctx, "streamdGRPCLocker.Lock()-ed") + defer logger.Tracef(ctx, "streamdGRPCLocker.Lock()-ed") + defer streamdGRPCLocker.Unlock() + + err := streamdGRPC.OpenOAuthURL(ctx, listenPort, platID, authURL) + errmon.ObserveErrorCtx(ctx, err) + return err == nil + }, + func(ctx context.Context, s string) { + logger.Infof(ctx, "restarting streamd") + os.Exit(0) + }, + ) + + streamD, err := streamd.New( + cfg.BuiltinStreamD, + ui, + func(ctx context.Context, cfg config.Config) error { + return nil + }, + belt.CtxBelt(ctx), + ) + if err != nil { + logger.Fatalf(ctx, "unable to initialize streamd: %v", err) + } + + var listener net.Listener + var grpcServer *grpc.Server + listener, grpcServer, streamdGRPC = initGRPCServer(ctx, streamD, flags.ListenAddr) + + err = streamD.Run(ctx) + if err != nil { + logger.Fatalf(ctx, "unable to start streamd: %v", err) + } + + err = grpcServer.Serve(listener) + if err != nil { + logger.Fatalf(ctx, "unable to server the gRPC server: %v", err) + } + + logger.Fatalf(ctx, "internal error: was supposed to never reach this line") +} + +func initGRPCServer( + ctx context.Context, + streamD api.StreamD, + listenAddr string, +) (net.Listener, *grpc.Server, *server.GRPCServer) { + listener, err := net.Listen("tcp", listenAddr) + if err != nil { + logger.Fatalf(ctx, "failed to listen: %v", err) + } + go func() { + <-ctx.Done() + listener.Close() + }() + + grpcServer := grpc.NewServer() + streamdGRPC := server.NewGRPCServer(streamD) + streamd_grpc.RegisterStreamDServer(grpcServer, streamdGRPC) + + // start the server: + go func() { + logger.Infof(ctx, "started server at %s", listener.Addr().String()) + err = grpcServer.Serve(listener) + if err != nil { + logger.Fatal(ctx, err) + } + }() + + return listener, grpcServer, streamdGRPC +} diff --git a/cmd/streampanel/ui.go b/cmd/streampanel/ui.go new file mode 100644 index 0000000..0cec3dd --- /dev/null +++ b/cmd/streampanel/ui.go @@ -0,0 +1,89 @@ +package main + +import ( + "context" + "fmt" + "os" + + "github.com/facebookincubator/go-belt" + "github.com/facebookincubator/go-belt/tool/logger" + "github.com/xaionaro-go/streamctl/pkg/mainprocess" +) + +func forkUI(ctx context.Context, mainProcessAddr, password string) { + procName := ProcessNameUI + ctx = belt.WithField(ctx, "process", procName) + + mainProcess, err := mainprocess.NewClient( + procName, + mainProcessAddr, + password, + ) + if err != nil { + panic(err) + } + flags := getFlags(ctx, mainProcess) + ctx = getContext(flags) + ctx = belt.WithField(ctx, "process", procName) + defer belt.Flush(ctx) + logger.Debugf(ctx, "flags == %#+v", flags) + cancelFunc := initRuntime(ctx, flags, procName) + defer cancelFunc() + + streamdAddr := getStreamDAddress(ctx, mainProcess) + go mainProcess.Serve( + ctx, + func(ctx context.Context, source mainprocess.ProcessName, content any) error { + switch content.(type) { + case StreamDDied: + os.Exit(0) + } + return nil + }, + ) + + flags.RemoteAddr = streamdAddr + runPanel(ctx, flags) +} + +type StreamDDied struct{} + +type GetStreamdAddress struct{} +type GetStreamdAddressResult struct { + Address string +} + +func getStreamDAddress( + ctx context.Context, + mainProcess *mainprocess.Client, +) string { + err := mainProcess.SendMessage(ctx, "streamd", GetStreamdAddress{}) + assertNoError(err) + + var addr string + for { + readOnceMore := false + err = mainProcess.ReadOne( + ctx, + func(ctx context.Context, source mainprocess.ProcessName, content any) error { + switch msg := content.(type) { + case GetStreamdAddressResult: + addr = msg.Address + case StreamDDied: + readOnceMore = true + default: + return fmt.Errorf("got unexpected type '%T' instead of %T", content, GetStreamdAddressResult{}) + } + return nil + }, + ) + assertNoError(err) + + if readOnceMore { + continue + } + break + } + + return addr +} diff --git a/cmd/streampanel/utils.go b/cmd/streampanel/utils.go new file mode 100644 index 0000000..d0292bf --- /dev/null +++ b/cmd/streampanel/utils.go @@ -0,0 +1,11 @@ +package main + +func ptr[T any](in T) *T { + return &in +} + +func assertNoError(err error) { + if err != nil { + panic(err) + } +} diff --git a/go.mod b/go.mod index e561cdb..956caef 100644 --- a/go.mod +++ b/go.mod @@ -164,7 +164,6 @@ require ( github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/srwiley/oksvg v0.0.0-20221011165216-be6e8873101c // indirect github.com/srwiley/rasterx v0.0.0-20220730225603-2ab79fcdd4ef // indirect - github.com/stretchr/testify v1.9.0 // indirect github.com/vishvananda/netlink v1.1.1-0.20211118161826-650dca95af54 // indirect github.com/vishvananda/netns v0.0.4 // indirect github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 // indirect @@ -219,7 +218,6 @@ require ( github.com/hyprspace/hyprspace v0.10.1 github.com/immune-gmbh/attestation-sdk v0.0.0-20230711173209-f44e4502aeca github.com/kbinani/screenshot v0.0.0-20230812210009-b87d31814237 - github.com/kraken-hpc/go-fork v0.1.1 github.com/libp2p/go-libp2p v0.33.2 github.com/libp2p/go-libp2p-kad-dht v0.25.2 github.com/multiformats/go-multiaddr v0.12.3 @@ -228,6 +226,7 @@ require ( github.com/rs/zerolog v1.33.0 github.com/sethvargo/go-password v0.3.1 github.com/spf13/pflag v1.0.5 + github.com/stretchr/testify v1.9.0 github.com/xaionaro-go/datacounter v1.0.4 github.com/xaionaro-go/unsafetools v0.0.0-20210722164218-75ba48cf7b3c github.com/yl2chen/cidranger v1.0.2 diff --git a/go.sum b/go.sum index 3173c30..ed8aca7 100644 --- a/go.sum +++ b/go.sum @@ -454,8 +454,6 @@ github.com/kr/pty v1.1.3/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/kraken-hpc/go-fork v0.1.1 h1:O3X/ynoNy/eS7UIcZYef8ndFq2RXEIOue9kZqyzF0Sk= -github.com/kraken-hpc/go-fork v0.1.1/go.mod h1:uu0e5h+V4ONH5Qk/xuVlyNXJXy/swhqGIEMK7w+9dNc= github.com/leodido/go-urn v1.2.1 h1:BqpAaACuzVSgi/VLzGZIobT2z4v53pjosyNd9Yv6n/w= github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY= github.com/libp2p/go-buffer-pool v0.1.0 h1:oK4mSFcQz7cTQIfqbe4MIj9gLW+mnanjyFtc6cdF0Y8= diff --git a/pkg/mainprocess/client.go b/pkg/mainprocess/client.go index 5451419..ce18dda 100644 --- a/pkg/mainprocess/client.go +++ b/pkg/mainprocess/client.go @@ -10,16 +10,14 @@ import ( ) type Client struct { - Conn net.Conn - Password string - OnReceivedMessage OnReceivedMessageFunc + Conn net.Conn + Password string } func NewClient( - myName string, + myName ProcessName, addr string, password string, - onReceivedMessage OnReceivedMessageFunc, ) (*Client, error) { conn, err := net.Dial("tcp", addr) if err != nil { @@ -47,15 +45,14 @@ func NewClient( logger.Default().Tracef("successfully registered the process '%s'", myName) return &Client{ - Conn: conn, - Password: password, - OnReceivedMessage: onReceivedMessage, + Conn: conn, + Password: password, }, nil } func (c *Client) SendMessage( ctx context.Context, - dst string, + dst ProcessName, content any, ) error { encoder := gob.NewEncoder(c.Conn) @@ -76,7 +73,10 @@ func (c *Client) Close() error { return c.Conn.Close() } -func (c *Client) Serve(ctx context.Context) error { +func (c *Client) Serve( + ctx context.Context, + onReceivedMessage OnReceivedMessageFunc, +) error { ctx, cancelFn := context.WithCancel(ctx) defer cancelFn() go func() { @@ -94,26 +94,26 @@ func (c *Client) Serve(ctx context.Context) error { default: } - var msg MessageFromMain - decoder := gob.NewDecoder(c.Conn) - err := decoder.Decode(&msg) - if err != nil { - return fmt.Errorf("unable to receive&decode message: %w", err) - } - - if err := c.onReceivedMessage(ctx, msg); err != nil { - logger.Error(ctx, err) + if err := c.ReadOne(ctx, onReceivedMessage); err != nil { + return err } } } -func (c *Client) onReceivedMessage( +func (c *Client) ReadOne( ctx context.Context, - msg MessageFromMain, + onReceivedMessage OnReceivedMessageFunc, ) error { - if c.OnReceivedMessage == nil { - return fmt.Errorf("OnReceivedMessage function is not set") + var msg MessageFromMain + decoder := gob.NewDecoder(c.Conn) + err := decoder.Decode(&msg) + if err != nil { + return fmt.Errorf("unable to receive&decode message: %w", err) } - return c.OnReceivedMessage(ctx, msg.Source, msg.Content) + if err := onReceivedMessage(ctx, msg.Source, msg.Content); err != nil { + return fmt.Errorf("unable to process the message '%#+v': %w", msg, err) + } + + return nil } diff --git a/pkg/mainprocess/main_process.go b/pkg/mainprocess/main_process.go index 1f2c043..607b12d 100644 --- a/pkg/mainprocess/main_process.go +++ b/pkg/mainprocess/main_process.go @@ -15,26 +15,34 @@ import ( type OnReceivedMessageFunc func( ctx context.Context, - source string, + source ProcessName, content any, ) error +type LaunchClientFunc func( + ctx context.Context, + procName ProcessName, + addr string, + password string, + isRestart bool, +) error + type Manager struct { listener net.Listener password string connsLocker sync.Mutex - conns map[string]net.Conn + conns map[ProcessName]net.Conn connsChanged chan struct{} - allClientProcesses []string + allClientProcesses []ProcessName - OnReceivedMessage OnReceivedMessageFunc + LaunchClient LaunchClientFunc } func NewManager( - onReceivedMessage OnReceivedMessageFunc, - expectedClients ...string, + launchClient LaunchClientFunc, + expectedClients ...ProcessName, ) (*Manager, error) { listener, err := net.Listen("tcp", "localhost:0") if err != nil { @@ -47,12 +55,12 @@ func NewManager( } return &Manager{ - OnReceivedMessage: onReceivedMessage, + LaunchClient: launchClient, listener: listener, password: password, - conns: map[string]net.Conn{}, + conns: map[ProcessName]net.Conn{}, connsChanged: make(chan struct{}), allClientProcesses: expectedClients, @@ -71,7 +79,24 @@ func (m *Manager) Close() error { return m.listener.Close() } -func (m *Manager) Serve(ctx context.Context) error { +func (m *Manager) VerifyEverybodyConnected( + ctx context.Context, +) error { + m.connsLocker.Lock() + defer m.connsLocker.Unlock() + + for _, name := range m.allClientProcesses { + if _, ok := m.conns[name]; !ok { + return fmt.Errorf("client '%s' is not connected", name) + } + } + return nil +} + +func (m *Manager) Serve( + ctx context.Context, + onReceivedMessage OnReceivedMessageFunc, +) error { logger.Tracef(ctx, "serving listener at %s", m.listener.Addr()) defer logger.Tracef(ctx, "/serving listener at %s", m.listener.Addr()) @@ -86,6 +111,15 @@ func (m *Manager) Serve(ctx context.Context) error { } }() + if m.LaunchClient != nil { + for _, name := range m.allClientProcesses { + err := m.LaunchClient(ctx, name, m.listener.Addr().String(), m.password, false) + if err != nil { + return fmt.Errorf("unable to launch '%s': %w", name, err) + } + } + } + for { select { case <-ctx.Done(): @@ -99,22 +133,24 @@ func (m *Manager) Serve(ctx context.Context) error { } logger.Tracef(ctx, "accepted a connection from '%s'", conn.RemoteAddr()) - m.addNewConnection(ctx, conn) + m.addNewConnection(ctx, conn, onReceivedMessage) } } func (m *Manager) addNewConnection( ctx context.Context, conn net.Conn, + onReceivedMessage OnReceivedMessageFunc, ) { go func() { - m.handleConnection(ctx, conn) + m.handleConnection(ctx, conn, onReceivedMessage) }() } func (m *Manager) handleConnection( ctx context.Context, conn net.Conn, + onReceivedMessage OnReceivedMessageFunc, ) { var regMessage RegistrationMessage logger.Tracef(ctx, "handleConnection from %s", conn.RemoteAddr()) @@ -152,7 +188,7 @@ func (m *Manager) handleConnection( logger.Error(ctx, err) return } - defer func(sourceName string) { + defer func(sourceName ProcessName) { m.unregisterConnection(sourceName) }(regMessage.Source) if err := encoder.Encode(RegistrationResult{}); err != nil { @@ -162,6 +198,16 @@ func (m *Manager) handleConnection( } ctx = belt.WithField(ctx, "client", regMessage.Source) + defer func() { + if m.LaunchClient == nil { + return + } + err := m.LaunchClient(ctx, regMessage.Source, m.listener.Addr().String(), m.password, true) + if err != nil { + logger.Error(ctx, err) + } + }() + for { select { case <-ctx.Done(): @@ -185,7 +231,7 @@ func (m *Manager) handleConnection( return } - if err := m.processMessage(ctx, regMessage.Source, message); err != nil { + if err := m.processMessage(ctx, regMessage.Source, message, onReceivedMessage); err != nil { logger.Errorf( ctx, "unable to process the message %#+v from %s (%s): %w", @@ -198,8 +244,9 @@ func (m *Manager) handleConnection( func (m *Manager) processMessage( ctx context.Context, - source string, + source ProcessName, message MessageToMain, + onReceivedMessage OnReceivedMessageFunc, ) (_ret error) { logger.Tracef(ctx, "processing message from '%s': %#+v", source, message) defer func() { logger.Tracef(ctx, "/processing message from '%s': %#+v: %v", source, message, _ret) }() @@ -209,7 +256,7 @@ func (m *Manager) processMessage( logger.Tracef(ctx, "a broadcast message from '%s': %#+v", source, message.Content) var wg sync.WaitGroup var err *multierror.Error - err = multierror.Append(err, m.onReceivedMessage(ctx, source, message.Content)) + err = multierror.Append(err, onReceivedMessage(ctx, source, message.Content)) errCh := make(chan error) go func() { @@ -222,7 +269,7 @@ func (m *Manager) processMessage( continue } wg.Add(1) - go func(dst string) { + go func(dst ProcessName) { defer wg.Done() errCh <- m.sendMessage(ctx, source, dst, message.Content) }(dst) @@ -232,37 +279,22 @@ func (m *Manager) processMessage( return err.ErrorOrNil() case "main": logger.Tracef(ctx, "a message to the main process from '%s': %#+v", source, message.Content) - return m.onReceivedMessage(ctx, source, message.Content) + return onReceivedMessage(ctx, source, message.Content) default: logger.Tracef(ctx, "a message to '%s' from '%s': %#+v", message.Destination, source, message.Content) return m.sendMessage(ctx, source, message.Destination, message.Content) } } -func (m *Manager) onReceivedMessage( - ctx context.Context, - source string, - content any, -) error { - if m.OnReceivedMessage == nil { - err := fmt.Errorf("OnReceivedMessage is not set") - logger.Tracef(ctx, "%v", err) - return err - } - - logger.Tracef(ctx, "calling the OnReceivedMessage function") - return m.OnReceivedMessage(ctx, source, content) -} - type MessageFromMain struct { - Source string + Source ProcessName Password string - Destination string + Destination ProcessName Content any } func (m *Manager) isExpectedProcess( - name string, + name ProcessName, ) bool { for _, p := range m.allClientProcesses { if name == p { @@ -274,8 +306,8 @@ func (m *Manager) isExpectedProcess( func (m *Manager) sendMessage( ctx context.Context, - source string, - destination string, + source ProcessName, + destination ProcessName, content any, ) (_ret error) { logger.Tracef(ctx, "sending message message %#+v from '%s' to '%s'", content, source, destination) @@ -309,7 +341,7 @@ func (m *Manager) sendMessage( } func (m *Manager) waitForProcess( - name string, + name ProcessName, ) (net.Conn, error) { if !m.isExpectedProcess(name) { return nil, fmt.Errorf("process '%s' is not ever expected", name) @@ -336,7 +368,7 @@ func (m *Manager) checkPassword( } func (m *Manager) registerConnection( - sourceName string, + sourceName ProcessName, conn net.Conn, ) error { if !m.isExpectedProcess(sourceName) { @@ -356,7 +388,7 @@ func (m *Manager) registerConnection( } func (m *Manager) unregisterConnection( - sourceName string, + sourceName ProcessName, ) { m.connsLocker.Lock() defer m.connsLocker.Unlock() @@ -368,7 +400,7 @@ func (m *Manager) unregisterConnection( type RegistrationMessage struct { Password string - Source string + Source ProcessName } type RegistrationResult struct { @@ -377,6 +409,30 @@ type RegistrationResult struct { type MessageToMain struct { Password string - Destination string + Destination ProcessName Content any } + +func (m *Manager) SendMessage( + ctx context.Context, + dst ProcessName, + content any, +) error { + conn, err := m.waitForProcess(dst) + if err != nil { + return fmt.Errorf("unable to wait for process '%s': %w", dst, err) + } + encoder := gob.NewEncoder(conn) + msg := MessageFromMain{ + Source: "main", + Password: m.password, + Destination: dst, + Content: content, + } + err = encoder.Encode(msg) + logger.Tracef(ctx, "sending message %#+v: %v", msg, err) + if err != nil { + return fmt.Errorf("unable to encode&send message %#+v: %w", msg, err) + } + return nil +} diff --git a/pkg/mainprocess/types.go b/pkg/mainprocess/types.go new file mode 100644 index 0000000..201038c --- /dev/null +++ b/pkg/mainprocess/types.go @@ -0,0 +1,3 @@ +package mainprocess + +type ProcessName string diff --git a/pkg/mainprocess/unit_test.go b/pkg/mainprocess/unit_test.go index 834423b..40fe91d 100644 --- a/pkg/mainprocess/unit_test.go +++ b/pkg/mainprocess/unit_test.go @@ -50,35 +50,36 @@ func Test(t *testing.T) { } m, err := NewManager( - func(ctx context.Context, source string, content any) error { - handleCall("main", content) - return nil - }, + nil, "child0", "child1", ) require.NoError(t, err) defer m.Close() - go m.Serve(belt.WithField(ctx, "process", "main")) + go m.Serve( + belt.WithField(ctx, "process", "main"), + func(ctx context.Context, source ProcessName, content any) error { + handleCall("main", content) + return nil + }, + ) - c0, err := NewClient("child0", m.Addr().String(), m.Password(), func(ctx context.Context, source string, content any) error { + c0, err := NewClient("child0", m.Addr().String(), m.Password()) + require.NoError(t, err) + defer c0.Close() + go c0.Serve(belt.WithField(ctx, "process", "child0"), func(ctx context.Context, source ProcessName, content any) error { handleCall("child0", content) return nil }) - require.NoError(t, err) - defer c0.Close() - go c0.Serve(belt.WithField(ctx, "process", "child0")) - c1, err := NewClient("child1", m.Addr().String(), m.Password(), func(ctx context.Context, source string, content any) error { + c1, err := NewClient("child1", m.Addr().String(), m.Password()) + require.NoError(t, err) + defer c1.Close() + go c1.Serve(belt.WithField(ctx, "process", "child1"), func(ctx context.Context, source ProcessName, content any) error { handleCall("child1", content) return nil }) - require.NoError(t, err) - defer c1.Close() - go c1.Serve(belt.WithField(ctx, "process", "child1")) - _, err = NewClient("child2", m.Addr().String(), m.Password(), func(ctx context.Context, source string, content any) error { - return nil - }) + _, err = NewClient("child2", m.Addr().String(), m.Password()) require.Error(t, err) waitCh0 := handleCallHappened["main"] diff --git a/pkg/streamd/config/config.go b/pkg/streamd/config/config.go index 1aacacd..38a8fdb 100644 --- a/pkg/streamd/config/config.go +++ b/pkg/streamd/config/config.go @@ -19,13 +19,11 @@ type ProfileMetadata struct { MaxOrder int } - type config struct { CachePath *string `yaml:"cache_path"` GitRepo GitRepoConfig Backends streamcontrol.Config ProfileMetadata map[streamcontrol.ProfileName]ProfileMetadata - SentryDSN string `yaml:"sentry_dsn"` StreamServer streamserver.Config `yaml:"stream_server"` } diff --git a/pkg/streamd/config/git_repo_config.go b/pkg/streamd/config/git_repo_config.go new file mode 100644 index 0000000..142b26f --- /dev/null +++ b/pkg/streamd/config/git_repo_config.go @@ -0,0 +1,8 @@ +package config + +type GitRepoConfig struct { + Enable *bool + URL string `yaml:"url,omitempty"` + PrivateKey string `yaml:"private_key,omitempty"` + LatestSyncCommit string `yaml:"latest_sync_commit,omitempty"` // TODO: deprecate this field, it's just a non-needed mechanism (better to check against git history). +} diff --git a/pkg/streampanel/config/config.go b/pkg/streampanel/config/config.go index 60a4d24..12b48c3 100644 --- a/pkg/streampanel/config/config.go +++ b/pkg/streampanel/config/config.go @@ -17,7 +17,6 @@ type ScreenshotConfig struct { } type Config struct { - SentryDSN string `yaml:"sentry_dsn"` RemoteStreamDAddr string `yaml:"streamd_remote"` BuiltinStreamD streamd.Config `yaml:"streamd_builtin"` Screenshot ScreenshotConfig `yaml:"screenshot"` diff --git a/pkg/streampanel/config/options.go b/pkg/streampanel/config/options.go index cd15d3b..b078b0f 100644 --- a/pkg/streampanel/config/options.go +++ b/pkg/streampanel/config/options.go @@ -18,9 +18,3 @@ type OptionRemoteStreamDAddr string func (o OptionRemoteStreamDAddr) Apply(cfg *Config) { cfg.RemoteStreamDAddr = string(o) } - -type OptionSentryDSN string - -func (o OptionSentryDSN) Apply(cfg *Config) { - cfg.SentryDSN = string(o) -} diff --git a/pkg/streampanel/fyne.go b/pkg/streampanel/fyne.go new file mode 100644 index 0000000..0829e8b --- /dev/null +++ b/pkg/streampanel/fyne.go @@ -0,0 +1,14 @@ +package streampanel + +import ( + "time" + + "fyne.io/fyne/v2" +) + +func hideWindow(w fyne.Window) { + for i := 0; i < 10; i++ { + time.Sleep(time.Millisecond) + w.Hide() + } +} diff --git a/pkg/streampanel/options.go b/pkg/streampanel/options.go index effd3d6..cb0d973 100644 --- a/pkg/streampanel/options.go +++ b/pkg/streampanel/options.go @@ -6,4 +6,3 @@ type Config = config.Config type Option = config.Option type Options = config.Options type OptionRemoteStreamDAddr = config.OptionRemoteStreamDAddr -type OptionSentryDSN = config.OptionSentryDSN diff --git a/pkg/streampanel/panel.go b/pkg/streampanel/panel.go index 2514f71..6227a7b 100644 --- a/pkg/streampanel/panel.go +++ b/pkg/streampanel/panel.go @@ -200,6 +200,23 @@ func (p *Panel) dumpConfig(ctx context.Context) { logger.Tracef(ctx, "the current config is: %s", buf.String()) } +func (p *Panel) lazyInitStreamD(ctx context.Context) error { + if p.StreamD != nil { + return nil + } + + if p.Config.RemoteStreamDAddr != "" { + if err := p.initRemoteStreamD(ctx); err != nil { + return fmt.Errorf("unable to initialize the remote stream controller '%s': %w", p.Config.RemoteStreamDAddr, err) + } + } else { + if err := p.initBuiltinStreamD(ctx); err != nil { + return fmt.Errorf("unable to initialize the builtin stream controller '%s': %w", p.configPath, err) + } + } + return nil +} + func (p *Panel) Loop(ctx context.Context, opts ...LoopOption) error { if p.defaultContext != nil { return fmt.Errorf("Loop was already used, and cannot be used the second time") @@ -210,14 +227,8 @@ func (p *Panel) Loop(ctx context.Context, opts ...LoopOption) error { p.defaultContext = ctx - if p.Config.RemoteStreamDAddr != "" { - if err := p.initRemoteStreamD(ctx); err != nil { - return fmt.Errorf("unable to initialize the remote stream controller '%s': %w", p.Config.RemoteStreamDAddr, err) - } - } else { - if err := p.initBuiltinStreamD(ctx); err != nil { - return fmt.Errorf("unable to initialize the builtin stream controller '%s': %w", p.configPath, err) - } + if err := p.lazyInitStreamD(ctx); err != nil { + return fmt.Errorf("unable to initialize stream controller: %w", err) } p.app = fyneapp.New() diff --git a/pkg/streampanel/util.go b/pkg/streampanel/util.go new file mode 100644 index 0000000..4c4543a --- /dev/null +++ b/pkg/streampanel/util.go @@ -0,0 +1,7 @@ +package streampanel + +func assert(b bool) { + if !b { + panic("assertion failed") + } +} diff --git a/pkg/streamserver/server/traffic_counter.go b/pkg/streamserver/server/traffic_counter.go new file mode 100644 index 0000000..ab62c10 --- /dev/null +++ b/pkg/streamserver/server/traffic_counter.go @@ -0,0 +1,77 @@ +package server + +import ( + "fmt" + "math" + "sync" + "sync/atomic" + "unsafe" +) + +type NumBytesWroter interface { + NumBytesWrote() uint64 +} + +type NumBytesReader interface { + NumBytesRead() uint64 +} + +type NumBytesReaderWroter interface { + NumBytesReader + NumBytesWroter +} + +type Counter interface { + Count() uint64 +} + +type TrafficCounter struct { + sync.Mutex + ReaderCounter Counter + WriterCounter Counter +} + +func (tc *TrafficCounter) NumBytesWrote() uint64 { + if tc == nil { + return math.MaxUint64 + } + + tc.Lock() + defer tc.Unlock() + if tc.WriterCounter == nil { + return math.MaxUint64 + } + return tc.WriterCounter.Count() +} + +func (tc *TrafficCounter) NumBytesRead() uint64 { + if tc == nil { + return math.MaxUint64 + } + + tc.Lock() + defer tc.Unlock() + if tc.ReaderCounter == nil { + return math.MaxUint64 + } + fmt.Println(tc.ReaderCounter.Count()) + return tc.ReaderCounter.Count() +} + +type IntPtrCounter struct { + Pointer *int +} + +func NewIntPtrCounter(v *int) *IntPtrCounter { + return &IntPtrCounter{ + Pointer: v, + } +} + +func (c *IntPtrCounter) Count() uint64 { + if c == nil || c.Pointer == nil { + return math.MaxUint64 + } + + return uint64(atomic.LoadInt32((*int32)(unsafe.Pointer(c.Pointer)))) +} diff --git a/pkg/streamserver/util.go b/pkg/streamserver/util.go new file mode 100644 index 0000000..ec1d7d0 --- /dev/null +++ b/pkg/streamserver/util.go @@ -0,0 +1,7 @@ +package streamserver + +func assert(b bool) { + if !b { + panic("assertion failed") + } +}