Random bugfixes
Some checks failed
rolling-release / build (push) Has been cancelled
rolling-release / rolling-release (push) Has been cancelled

This commit is contained in:
Dmitrii Okunev
2025-08-05 19:43:04 +01:00
parent ee80a08343
commit f943aea609
7 changed files with 126 additions and 39 deletions

6
go.mod
View File

@@ -216,12 +216,13 @@ require (
github.com/volatiletech/sqlboiler/v4 v4.16.2 // indirect
github.com/volatiletech/strmangle v0.0.6 // indirect
github.com/wlynxg/anet v0.0.6-0.20250109065809-5501d401a269 // indirect
github.com/xaionaro-go/avcommon v0.0.0-20250510235605-840f8210b727 // indirect
github.com/xaionaro-go/avcommon v0.0.0-20250629220425-ad846e62efb6 // indirect
github.com/xaionaro-go/avmediacodec v0.0.0-20250505012527-c819676502d8 // indirect
github.com/xaionaro-go/gorex v0.0.0-20241010205749-bcd59d639c4d // indirect
github.com/xaionaro-go/libsrt v0.0.0-20250505013920-61d894a3b7e9 // indirect
github.com/xaionaro-go/ndk v0.0.0-20250420195304-361bb98583bf // indirect
github.com/xaionaro-go/proxy v0.0.0-20250525144747-579f5a891c15 // indirect
github.com/xaionaro-go/sockopt v0.0.0-20250526214909-e15e1e4bc738 // indirect
github.com/xaionaro-go/spinlock v0.0.0-20200518175509-30e6d1ce68a1 // indirect
github.com/xanzy/ssh-agent v0.3.3 // indirect
github.com/xo/terminfo v0.0.0-20210125001918-ca9a967f8778 // indirect
@@ -236,6 +237,7 @@ require (
go.opentelemetry.io/otel/trace v1.36.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
gocv.io/x/gocv v0.41.0 // indirect
golang.org/x/arch v0.12.0 // indirect
golang.org/x/exp v0.0.0-20250408133849-7e4ce0ab07d0 // indirect
golang.org/x/image v0.24.0 // indirect
@@ -296,7 +298,7 @@ require (
github.com/spf13/pflag v1.0.6
github.com/stretchr/testify v1.10.0
github.com/xaionaro-go/audio v0.0.0-20250210102901-abfced9d5ef3
github.com/xaionaro-go/avpipeline v0.0.0-20250525204026-17104bc4baca
github.com/xaionaro-go/avpipeline v0.0.0-20250714012253-e93f3585e4b2
github.com/xaionaro-go/datacounter v1.0.4
github.com/xaionaro-go/go-rtmp v0.0.0-20241009130244-1e3160f27f42
github.com/xaionaro-go/grpcproxy v0.0.0-20241103205849-a8fef42e72f9

12
go.sum
View File

@@ -1072,12 +1072,12 @@ github.com/xaionaro-go/astiav v0.0.0-20250521203320-7402f3e25a7c h1:MTrVWO9fpyua
github.com/xaionaro-go/astiav v0.0.0-20250521203320-7402f3e25a7c/go.mod h1:GI0pHw6K2/pl/o8upCtT49P/q4KCwhv/8nGLlCsZLdA=
github.com/xaionaro-go/audio v0.0.0-20250210102901-abfced9d5ef3 h1:LRIpqqC7Gsz5+/EsIWRtdPZZPMpx9yykUVFyUnRaKbE=
github.com/xaionaro-go/audio v0.0.0-20250210102901-abfced9d5ef3/go.mod h1:i4CntPlryh9HLmA3p3M0CNr1usRkEkuh3N2Ui3HeXQA=
github.com/xaionaro-go/avcommon v0.0.0-20250510235605-840f8210b727 h1:uQ8V1T3Oeru15gEIthts9GWkMS3eQ0Eo7vOKvE4G2/k=
github.com/xaionaro-go/avcommon v0.0.0-20250510235605-840f8210b727/go.mod h1:kjLo1LasgdDJqbTGD5bbEM+D6RiZSbf5ZT8yiPFF1BA=
github.com/xaionaro-go/avcommon v0.0.0-20250629220425-ad846e62efb6 h1:Z/krIvEqIU9VVogzrtRR6/AWJt12qIhFQ7GoZVC+gSg=
github.com/xaionaro-go/avcommon v0.0.0-20250629220425-ad846e62efb6/go.mod h1:kjLo1LasgdDJqbTGD5bbEM+D6RiZSbf5ZT8yiPFF1BA=
github.com/xaionaro-go/avmediacodec v0.0.0-20250505012527-c819676502d8 h1:FZn9+TN3uHhohfpanWkR9lFNHApizznZbML6XjvEgTU=
github.com/xaionaro-go/avmediacodec v0.0.0-20250505012527-c819676502d8/go.mod h1:2W2Kp/HJFXcFBppQ4YytgDy/ydFL3hGc23xSB1U/Luc=
github.com/xaionaro-go/avpipeline v0.0.0-20250525204026-17104bc4baca h1:Cls4rEimemZicWzhVlzQm1otU/RTpfNVpgdfwvGEJrQ=
github.com/xaionaro-go/avpipeline v0.0.0-20250525204026-17104bc4baca/go.mod h1:LMh5Qi7cuntcktUezfA9toVCUCCsx9pjyGDWe9GLt9A=
github.com/xaionaro-go/avpipeline v0.0.0-20250714012253-e93f3585e4b2 h1:5mPJWmZhHgA+RykZdAyT9+6D1SJlaoYl16zfgxKDP+U=
github.com/xaionaro-go/avpipeline v0.0.0-20250714012253-e93f3585e4b2/go.mod h1:eFxxNA50Pyp1B+snK0TlmpsnrGUtzJohbY/+J3BEvqA=
github.com/xaionaro-go/datacounter v1.0.4 h1:+QMZLmu73R5WGkQfUPwlXF/JFN+Weo4iuDZkiL2wVm8=
github.com/xaionaro-go/datacounter v1.0.4/go.mod h1:Sf9vBevuV6w5iE6K3qJ9pWVKcyS60clWBUSQLjt5++c=
github.com/xaionaro-go/eventbus v0.0.0-20250720144534-4670758005d9 h1:ZAm8ueMw5D85LDeV1Kboc3ANqXr3LK/eXIl9hj1BJyM=
@@ -1134,6 +1134,8 @@ github.com/xaionaro-go/secret v0.0.0-20250111141743-ced12e1082c2 h1:QHpTWfyfmz65
github.com/xaionaro-go/secret v0.0.0-20250111141743-ced12e1082c2/go.mod h1:XKoHGZ4VKMbVBl8VotLIoWQdrB6Q7jnR++RbkiegZFU=
github.com/xaionaro-go/serializable v0.0.0-20250412140540-5ac572306599 h1:CzcQd6wLiqgjd8K/6UzR5uyt6sg4ut/kVxi6+FJMbdI=
github.com/xaionaro-go/serializable v0.0.0-20250412140540-5ac572306599/go.mod h1:U3CCHI+1qYVegcvMrg95mUYU2Q7qC5deapVHbAHmSZI=
github.com/xaionaro-go/sockopt v0.0.0-20250526214909-e15e1e4bc738 h1:aGQY6gS+Y8GBwgt+Dg33f7JiUTU3p3OGrVKxqGzU7MM=
github.com/xaionaro-go/sockopt v0.0.0-20250526214909-e15e1e4bc738/go.mod h1:CqilA8ZWT91g8OUBwHE0JTIklhtB77lBrgTM7HlKYo4=
github.com/xaionaro-go/spinlock v0.0.0-20190309154744-55278e21e817/go.mod h1:Nb/15eS0BMty6TMuWgRQM8WCDIUlyPZagcpchHT6c9Y=
github.com/xaionaro-go/spinlock v0.0.0-20200518175509-30e6d1ce68a1 h1:1Kqw9dv2LnznIhJoMt3dNzc/ctSj6VHjyGh4YZHjpE4=
github.com/xaionaro-go/spinlock v0.0.0-20200518175509-30e6d1ce68a1/go.mod h1:UwmTXX+EpoEYHuy0rSys1Rp5PW+eVTgZSjgMVLJENKg=
@@ -1218,6 +1220,8 @@ go.uber.org/zap v1.18.1/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI=
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
go4.org v0.0.0-20180809161055-417644f6feb5/go.mod h1:MkTOUMDaeVYJUOUsaDXIhWPZYa1yOyC1qaOBpL57BhE=
gocv.io/x/gocv v0.41.0 h1:KM+zRXUP28b6dHfhy+4JxDODbCNQNtLg8kio+YE7TqA=
gocv.io/x/gocv v0.41.0/go.mod h1:zYdWMj29WAEznM3Y8NsU3A0TRq/wR/cy75jeUypThqU=
golang.org/x/arch v0.12.0 h1:UsYJhbzPYGsT0HbEdmYcqtCv8UNGvnaL561NnIUvaKg=
golang.org/x/arch v0.12.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys=
golang.org/x/build v0.0.0-20190111050920-041ab4dc3f9d/go.mod h1:OWs+y06UdEOHN4y+MfF/py+xQ/tYqIWW03b70/CG9Rw=

View File

@@ -9,6 +9,7 @@ import (
type Client interface {
GetAuthorize(redirectURI, state, codeChallenge string, scope []gokick.Scope) (string, error)
GetToken(ctx context.Context, redirectURI, code, codeVerifier string) (gokick.TokenResponse, error)
RefreshToken(ctx context.Context, refreshToken string) (gokick.TokenResponse, error)
OnUserAccessTokenRefreshed(callback func(accessToken, refreshToken string))
UpdateStreamTitle(ctx context.Context, title string) (gokick.EmptyResponse, error)
UpdateStreamCategory(ctx context.Context, categoryID int) (gokick.EmptyResponse, error)

View File

@@ -18,6 +18,10 @@ func (clientMock) GetAuthorize(redirectURI, state, codeChallenge string, scope [
return "", nil
}
func (clientMock) RefreshToken(ctx context.Context, refreshToken string) (gokick.TokenResponse, error) {
return gokick.TokenResponse{}, nil
}
func (clientMock) GetToken(ctx context.Context, redirectURI, code, codeVerifier string) (gokick.TokenResponse, error) {
return gokick.TokenResponse{}, nil
}

View File

@@ -45,9 +45,8 @@ type Kick struct {
SaveCfgFn func(Config) error
PrepareLocker xsync.Mutex
lazyInitOnce sync.Once
getAccessTokenLocker xsync.Mutex
lastGetMutexSuccessAt time.Time
lazyInitOnce sync.Once
getAccessTokenLocker xsync.Mutex
}
var _ streamcontrol.StreamController[StreamProfile] = (*Kick)(nil)
@@ -140,13 +139,10 @@ func (k *Kick) keepAliveLoop(
time.Sleep(time.Second)
continue
}
client := k.GetClient()
if client == nil {
logger.Errorf(ctx, "client is not initialized")
time.Sleep(time.Second)
continue
}
_, err := client.GetLivestreams(k.CloseCtx, gokick.NewLivestreamListFilter().SetBroadcasterUserIDs(int(k.Channel.UserID)))
_, err := k.getLivestreams(
k.CloseCtx,
gokick.NewLivestreamListFilter().SetBroadcasterUserIDs(int(k.Channel.UserID)),
)
if err != nil {
logger.Errorf(ctx, "unable to get my stream status: %v", err)
time.Sleep(time.Second)
@@ -245,13 +241,8 @@ func (k *Kick) getAccessToken(
) (_err error) {
logger.Tracef(ctx, "getAccessToken")
defer func() { logger.Tracef(ctx, "/getAccessToken: %v", _err) }()
now := time.Now()
return xsync.DoR1(ctx, &k.getAccessTokenLocker, func() error {
if k.lastGetMutexSuccessAt.After(now) {
return nil
}
err := k.getAccessTokenNoLock(ctx)
return err
return k.getAccessTokenNoLock(ctx)
})
}
@@ -261,12 +252,6 @@ func (k *Kick) getAccessTokenNoLock(
logger.Tracef(ctx, "getAccessTokenNoLock")
defer func() { logger.Tracef(ctx, "/getAccessTokenNoLock: %v", _err) }()
defer func() {
if _err == nil {
k.lastGetMutexSuccessAt = time.Now()
}
}()
getPortsFn := k.CurrentConfig.Config.GetOAuthListenPorts
if getPortsFn == nil {
// TODO: find a way to adjust the OAuth ports dynamically without re-creating the Kick client.
@@ -391,10 +376,34 @@ func (k *Kick) Flush(ctx context.Context) error {
}
func (k *Kick) EndStream(ctx context.Context) error {
logger.Warnf(ctx, "not implemented yet")
// Kick ends a stream automatically, nothing to do:
return nil
}
func (k *Kick) getLivestreams(
ctx context.Context,
filter gokick.LivestreamListFilter,
) (_ret *gokick.LivestreamsResponseWrapper, _err error) {
logger.Debugf(ctx, "getLivestreams")
defer func() { logger.Debugf(ctx, "/getLivestreams: %v, %v", _ret, _err) }()
if err := k.prepare(ctx); err != nil {
return nil, fmt.Errorf("unable to get a prepared client: %w", err)
}
client := k.GetClient()
if client == nil {
return nil, fmt.Errorf("client is not initialized")
}
resp, err := client.GetLivestreams(k.CloseCtx, filter)
if err != nil {
return nil, fmt.Errorf("unable to get my stream status: %w", err)
}
return &resp, nil
}
func (k *Kick) GetStreamStatus(
ctx context.Context,
) (_ret *streamcontrol.StreamStatus, _err error) {
@@ -765,6 +774,14 @@ func (k *Kick) getAccessTokenIfNeeded(
logger.Tracef(ctx, "getAccessTokenIfNeeded")
defer func() { logger.Tracef(ctx, "/getAccessTokenIfNeeded: %v", _err) }()
if time.Now().After(k.CurrentConfig.Config.UserAccessTokenExpiresAt.Add(-30 * time.Second)) {
if k.CurrentConfig.Config.RefreshToken.Get() != "" {
if err := k.refreshAccessToken(ctx); err != nil {
return fmt.Errorf("unable to refresh the access token: %w", err)
}
}
}
if k.CurrentConfig.Config.UserAccessToken.Get() != "" {
return nil
}
@@ -777,6 +794,29 @@ func (k *Kick) getAccessTokenIfNeeded(
return nil
}
func (k *Kick) refreshAccessToken(
ctx context.Context,
) (_err error) {
logger.Tracef(ctx, "refreshAccessToken")
defer func() { logger.Tracef(ctx, "/refreshAccessToken: %v", _err) }()
resp, err := k.GetClient().RefreshToken(ctx, k.CurrentConfig.Config.RefreshToken.Get())
if err != nil {
logger.Errorf(ctx, "unable to refresh the token: %v", err)
if getErr := k.getAccessToken(ctx); getErr != nil {
return fmt.Errorf("unable to refresh access token (%w); and unable to get a new access token (%w)", err, getErr)
}
return nil
}
err = k.setToken(ctx, resp, time.Now())
if err != nil {
return fmt.Errorf("unable to set access token: %w")
}
return nil
}
func (k *Kick) IsCapable(
ctx context.Context,
cap streamcontrol.Capability,

View File

@@ -824,8 +824,11 @@ func (d *StreamD) EndStream(ctx context.Context, platID streamcontrol.PlatformNa
continue
}
logger.Debugf(ctx, "sleeping for 2 seconds, just in case")
time.Sleep(2 * time.Second)
switch platID {
case twitch.ID:
logger.Debugf(ctx, "sleeping for 20 seconds, to wait until Raid happens")
time.Sleep(20 * time.Second)
}
break
}
}

View File

@@ -2250,6 +2250,7 @@ func (p *Panel) doStopStream(ctx context.Context) {
obs.ID,
youtube.ID,
twitch.ID,
kick.ID,
} {
isEnabled, err := p.StreamD.IsBackendEnabled(ctx, backendID)
if err != nil {
@@ -2273,6 +2274,46 @@ func (p *Panel) doStopStream(ctx context.Context) {
p.updateStreamClockHandler = nil
}
var wg sync.WaitGroup
if p.youtubeCheck.Checked && backendEnabled[youtube.ID] {
wg.Add(1)
observability.Go(ctx, func(ctx context.Context) {
defer wg.Done()
p.startStopButton.SetText("Stopping YouTube...")
err := p.StreamD.EndStream(ctx, youtube.ID)
if err != nil {
p.DisplayError(fmt.Errorf("unable to stop the stream on YouTube: %w", err))
}
})
}
if p.twitchCheck.Checked && backendEnabled[twitch.ID] {
wg.Add(1)
observability.Go(ctx, func(ctx context.Context) {
defer wg.Done()
p.startStopButton.SetText("Stopping Twitch...")
err := p.StreamD.EndStream(ctx, twitch.ID)
if err != nil {
p.DisplayError(fmt.Errorf("unable to stop the stream on Twitch: %w", err))
}
})
}
if p.kickCheck.Checked && backendEnabled[kick.ID] {
wg.Add(1)
observability.Go(ctx, func(ctx context.Context) {
defer wg.Done()
p.startStopButton.SetText("Stopping Kick...")
err := p.StreamD.EndStream(ctx, kick.ID)
if err != nil {
p.DisplayError(fmt.Errorf("unable to stop the stream on Kick: %w", err))
}
})
}
wg.Wait()
if backendEnabled[obs.ID] {
if streamDCfg != nil {
obsCfg := streamcontrol.GetPlatformConfig[obs.PlatformSpecificConfig, obs.StreamProfile](ctx, streamDCfg.Backends, obs.ID)
@@ -2296,14 +2337,6 @@ func (p *Panel) doStopStream(ctx context.Context) {
}
}
if p.youtubeCheck.Checked && backendEnabled[youtube.ID] {
p.startStopButton.SetText("Stopping YouTube...")
err := p.StreamD.EndStream(ctx, youtube.ID)
if err != nil {
p.DisplayError(fmt.Errorf("unable to stop the stream on YouTube: %w", err))
}
}
if backendEnabled[twitch.ID] {
p.twitchCheck.Enable()
}