Initial commit, pt. 88

This commit is contained in:
Dmitrii Okunev
2024-08-11 20:37:59 +01:00
parent 37add3aea0
commit 4087e7785e
9 changed files with 85 additions and 39 deletions

View File

@@ -5,4 +5,4 @@ Website = "https://github.com/xaionaro/streamctl"
Name = "streampanel" Name = "streampanel"
ID = "center.dx.streampanel" ID = "center.dx.streampanel"
Version = "0.1.0" Version = "0.1.0"
Build = 95 Build = 96

View File

@@ -8,7 +8,9 @@ import (
"net/http" "net/http"
"os/exec" "os/exec"
"runtime" "runtime"
"strings"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/xaionaro-go/streamctl/pkg/observability" "github.com/xaionaro-go/streamctl/pkg/observability"
) )
@@ -42,7 +44,7 @@ func OAuth2HandlerViaBrowser(ctx context.Context, arg OAuthHandlerArgument) erro
return err return err
} }
err = LaunchBrowser(arg.AuthURL) err = LaunchBrowser(ctx, arg.AuthURL)
if err != nil { if err != nil {
return err return err
} }
@@ -91,14 +93,23 @@ func NewCodeReceiver(
return codeCh, uint16(listener.Addr().(*net.TCPAddr).Port), nil return codeCh, uint16(listener.Addr().(*net.TCPAddr).Port), nil
} }
func LaunchBrowser(url string) error { func LaunchBrowser(
ctx context.Context,
url string,
) error {
var args []string
switch runtime.GOOS { switch runtime.GOOS {
case "darwin": case "darwin":
return exec.Command("open", url).Start() args = []string{"open"}
case "linux": case "linux":
return exec.Command("xdg-open", url).Start() args = []string{"xdg-open"}
case "windows": case "windows":
return exec.Command("rundll32", "url.dll,FileProtocolHandler", url).Start() args = []string{"rundll32", "url.dll,FileProtocolHandler"}
default:
return fmt.Errorf("unsupported platform: <%s>", runtime.GOOS)
} }
return fmt.Errorf("unsupported platform: <%s>", runtime.GOOS)
args = append(args, url)
logger.Debugf(ctx, "launching a browser using command '%s'", strings.Join(args, " "))
return exec.Command(args[0], args[1:]...).Start()
} }

View File

@@ -2,8 +2,6 @@ package observability
import ( import (
"context" "context"
"github.com/facebookincubator/go-belt/tool/experimental/errmon"
) )
func Go(ctx context.Context, fn func()) { func Go(ctx context.Context, fn func()) {
@@ -15,7 +13,7 @@ func Go(ctx context.Context, fn func()) {
func GoSafe(ctx context.Context, fn func()) { func GoSafe(ctx context.Context, fn func()) {
go func() { go func() {
defer func() { errmon.ObserveRecoverCtx(ctx, recover()) }() defer func() { ReportPanicIfNotNil(ctx, recover()) }()
fn() fn()
}() }()
} }

View File

@@ -3,18 +3,30 @@ package observability
import ( import (
"context" "context"
"fmt" "fmt"
"runtime/debug"
"time" "time"
"github.com/facebookincubator/go-belt" "github.com/facebookincubator/go-belt"
"github.com/facebookincubator/go-belt/tool/experimental/errmon" "github.com/facebookincubator/go-belt/tool/experimental/errmon"
"github.com/facebookincubator/go-belt/tool/logger"
) )
func PanicIfNotNil(ctx context.Context, r any) { func PanicIfNotNil(ctx context.Context, r any) {
if r == nil { if r == nil {
return return
} }
errmon.ObserveRecoverCtx(ctx, r) ReportPanicIfNotNil(ctx, r)
belt.Flush(ctx)
time.Sleep(time.Second) time.Sleep(time.Second)
panic(fmt.Sprintf("%#+v", r)) panic(fmt.Sprintf("%#+v", r))
} }
func ReportPanicIfNotNil(ctx context.Context, r any) {
if r == nil {
return
}
logger.FromCtx(ctx).
WithField("error_event_exception_stack_trace", string(debug.Stack())).
Errorf("got panic: %v", r)
errmon.ObserveRecoverCtx(ctx, r)
belt.Flush(ctx)
}

View File

@@ -650,7 +650,10 @@ func (p *Panel) openBrowser(
ctx context.Context, ctx context.Context,
url string, url string,
reason string, reason string,
) error { ) (_err error) {
logger.Debugf(ctx, "openBrowser(ctx, '%s', '%s')", url, reason)
defer func() { logger.Debugf(ctx, "/openBrowser(ctx, '%s', '%s'): %3", url, reason, _err) }()
if p.Config.Browser.Command != "" { if p.Config.Browser.Command != "" {
logger.Debugf(ctx, "the browser command is configured to be : '%s'", p.Config.Browser.Command) logger.Debugf(ctx, "the browser command is configured to be : '%s'", p.Config.Browser.Command)
return exec.Command(p.Config.Browser.Command, url).Start() return exec.Command(p.Config.Browser.Command, url).Start()
@@ -667,7 +670,7 @@ func (p *Panel) openBrowser(
browserCmd = "xdg-open" browserCmd = "xdg-open"
} }
default: default:
return oauthhandler.LaunchBrowser(url) return oauthhandler.LaunchBrowser(ctx, url)
} }
waitCh := make(chan struct{}) waitCh := make(chan struct{})
@@ -712,6 +715,7 @@ func (p *Panel) openBrowser(
errmon.ObserveErrorCtx(ctx, err) errmon.ObserveErrorCtx(ctx, err)
} }
logger.Debugf(ctx, "openBrowser(ctx, '%s', '%s'): resulting command '%s %s'", browserCmd, url)
return exec.Command(browserCmd, url).Start() return exec.Command(browserCmd, url).Start()
} }
@@ -2052,7 +2056,7 @@ func (p *Panel) setupStream(ctx context.Context) {
if timeDiff < 0 { if timeDiff < 0 {
return return
} }
p.startStopButton.SetText(timeDiff.String()) p.startStopButton.SetText(fmt.Sprintf("%.1fs", timeDiff.Seconds()))
} }
}) })
} }
@@ -2832,6 +2836,7 @@ func (p *Panel) showWaitStreamDCallWindow(ctx context.Context) {
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
p.waitStreamDCallWindow.Close() p.waitStreamDCallWindow.Close()
p.waitStreamDCallWindow = nil p.waitStreamDCallWindow = nil
logger.Debugf(ctx, "closed the 'network operation is in progress' window")
}() }()
select { select {
@@ -2872,6 +2877,7 @@ func (p *Panel) showWaitStreamDConnectWindow(ctx context.Context) {
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
p.waitStreamDConnectWindow.Close() p.waitStreamDConnectWindow.Close()
p.waitStreamDConnectWindow = nil p.waitStreamDConnectWindow = nil
logger.Debugf(ctx, "closed the 'connecting is in progress' window")
}() }()
select { select {
@@ -2882,8 +2888,8 @@ func (p *Panel) showWaitStreamDConnectWindow(ctx context.Context) {
p.waitStreamDConnectWindowLocker.Lock() p.waitStreamDConnectWindowLocker.Lock()
defer p.waitStreamDConnectWindowLocker.Unlock() defer p.waitStreamDConnectWindowLocker.Unlock()
logger.Debugf(ctx, "making a 'network operation is in progress' window") logger.Debugf(ctx, "making a 'connecting is in progress' window")
defer logger.Debugf(ctx, "made a 'network operation is in progress' window") defer logger.Debugf(ctx, "made a 'connecting is in progress' window")
if p.waitStreamDConnectWindow != nil { if p.waitStreamDConnectWindow != nil {
return return
} }
@@ -2891,7 +2897,7 @@ func (p *Panel) showWaitStreamDConnectWindow(ctx context.Context) {
return return
} }
waitStreamDConnectWindow := p.app.NewWindow(AppName + ": Please wait...") waitStreamDConnectWindow := p.app.NewWindow(AppName + ": Please wait...")
textWidget := widget.NewRichTextFromMarkdown("Network operation is in process, please wait...") textWidget := widget.NewRichTextFromMarkdown("Connecting is in process, please wait...")
waitStreamDConnectWindow.SetContent(textWidget) waitStreamDConnectWindow.SetContent(textWidget)
waitStreamDConnectWindow.Show() waitStreamDConnectWindow.Show()
p.waitStreamDConnectWindow = waitStreamDConnectWindow p.waitStreamDConnectWindow = waitStreamDConnectWindow

View File

@@ -58,15 +58,13 @@ func (h *Handler) OnPublish(_ *rtmp.StreamContext, timestamp uint32, cmd *rtmpms
return errors.New("PublishingName is empty") return errors.New("PublishingName is empty")
} }
pubsub, err := h.relayService.NewPubsub(cmd.PublishingName) pubsub, err := h.relayService.NewPubsub(cmd.PublishingName, h)
if err != nil { if err != nil {
return errors.Wrap(err, "Failed to create pubsub") return errors.Wrap(err, "Failed to create pubsub")
} }
pub := pubsub.Pub() pub := pubsub.Pub()
h.pub = pub h.pub = pub
return nil return nil
} }

View File

@@ -6,13 +6,15 @@ import (
"sync" "sync"
"github.com/facebookincubator/go-belt/tool/logger" "github.com/facebookincubator/go-belt/tool/logger"
"github.com/hashicorp/go-multierror"
"github.com/xaionaro-go/streamctl/pkg/observability" "github.com/xaionaro-go/streamctl/pkg/observability"
flvtag "github.com/yutopp/go-flv/tag" flvtag "github.com/yutopp/go-flv/tag"
) )
type Pubsub struct { type Pubsub struct {
srv *RelayService srv *RelayService
name string name string
publisherHandler *Handler
pub *Pub pub *Pub
@@ -22,10 +24,11 @@ type Pubsub struct {
m sync.Mutex m sync.Mutex
} }
func NewPubsub(srv *RelayService, name string) *Pubsub { func NewPubsub(srv *RelayService, name string, publisherHandler *Handler) *Pubsub {
return &Pubsub{ return &Pubsub{
srv: srv, publisherHandler: publisherHandler,
name: name, srv: srv,
name: name,
subs: map[uint64]*Sub{}, subs: map[uint64]*Sub{},
} }
@@ -42,13 +45,24 @@ func (pb *Pubsub) Deregister() error {
pb.m.Lock() pb.m.Lock()
defer pb.m.Unlock() defer pb.m.Unlock()
observability.Go(context.TODO(), func() { return pb.deregister()
}
func (pb *Pubsub) deregister() error {
observability.GoSafe(context.TODO(), func() {
for _, sub := range pb.subs { for _, sub := range pb.subs {
_ = sub.Close() _ = sub.Close()
} }
}) })
return pb.srv.removePubsub(pb.name) var result *multierror.Error
result = multierror.Append(result, pb.srv.removePubsub(pb.name))
h := pb.publisherHandler
pb.publisherHandler = nil
if h != nil {
result = multierror.Append(result, h.conn.Close())
}
return result.ErrorOrNil()
} }
func (pb *Pubsub) Pub() *Pub { func (pb *Pubsub) Pub() *Pub {

View File

@@ -25,17 +25,18 @@ func NewRelayService() *RelayService {
} }
} }
func (s *RelayService) NewPubsub(key string) (*Pubsub, error) { func (s *RelayService) NewPubsub(key string, publisherHandler *Handler) (*Pubsub, error) {
s.m.Lock() s.m.Lock()
defer s.m.Unlock() defer s.m.Unlock()
ctx := context.TODO()
logger.Debugf(ctx, "NewPubsub(%s)", key)
logger.Default().Debugf("NewPubsub(%s)", key) if oldStream, ok := s.streams[key]; ok {
err := oldStream.deregister()
if _, ok := s.streams[key]; ok { logger.Warnf(ctx, "unable to close the old stream: %v", err)
return nil, fmt.Errorf("already published: %s", key)
} }
pubsub := NewPubsub(s, key) pubsub := NewPubsub(s, key, publisherHandler)
s.streams[key] = pubsub s.streams[key] = pubsub

View File

@@ -6,6 +6,7 @@ import (
"fmt" "fmt"
"net/http" "net/http"
"net/url" "net/url"
"runtime/debug"
"strings" "strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
@@ -158,7 +159,8 @@ func (fwd *ActiveStreamForwarding) waitForPublisherAndStart(
if _ret == nil { if _ret == nil {
return return
} }
logger.Errorf(ctx, "%v", _ret) logger.FromCtx(ctx).
WithField("error_event_exception_stack_trace", string(debug.Stack())).Errorf("%v", _ret)
}() }()
fwd.Locker.Lock() fwd.Locker.Lock()
defer fwd.Locker.Unlock() defer fwd.Locker.Unlock()
@@ -321,8 +323,10 @@ func (fwd *ActiveStreamForwarding) waitForPublisherAndStart(
fwd.Locker.Unlock() fwd.Locker.Unlock()
<-fwd.Sub.ClosedChan() <-fwd.Sub.ClosedChan()
fwd.Locker.Lock() fwd.Locker.Lock()
fwd.Client.Close() if fwd.Client != nil {
fwd.Client = nil fwd.Client.Close()
fwd.Client = nil
}
logger.Debugf(ctx, "the source stopped, so stopped also publishing to '%s'", urlParsed.String()) logger.Debugf(ctx, "the source stopped, so stopped also publishing to '%s'", urlParsed.String())
return nil return nil
} }
@@ -335,8 +339,10 @@ func (fwd *ActiveStreamForwarding) Close() error {
} }
var result *multierror.Error var result *multierror.Error
fwd.CancelFunc() if fwd.CancelFunc != nil {
fwd.CancelFunc = nil fwd.CancelFunc()
fwd.CancelFunc = nil
}
if fwd.Sub != nil { if fwd.Sub != nil {
result = multierror.Append(result, fwd.Sub.Close()) result = multierror.Append(result, fwd.Sub.Close())
fwd.Sub = nil fwd.Sub = nil