Multiple updates
Some checks failed
rolling-release / build (push) Has been cancelled
rolling-release / rolling-release (push) Has been cancelled

This commit is contained in:
Dmitrii Okunev
2025-07-12 22:21:28 +01:00
parent 2ae50b98db
commit 1004082fe4
48 changed files with 1930 additions and 659 deletions

View File

@@ -2,8 +2,8 @@ package main
import (
"context"
"crypto/tls"
"log"
"net"
"net/http"
_ "net/http/pprof"
"os"
@@ -19,11 +19,13 @@ import (
"github.com/getsentry/sentry-go"
grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
"github.com/spf13/pflag"
"github.com/xaionaro-go/eventbus"
"github.com/xaionaro-go/grpcproxy/grpcproxyserver"
"github.com/xaionaro-go/grpcproxy/protobuf/go/proxy_grpc"
"github.com/xaionaro-go/obs-grpc-proxy/protobuf/go/obs_grpc"
"github.com/xaionaro-go/observability"
"github.com/xaionaro-go/streamctl/cmd/streamd/ui"
"github.com/xaionaro-go/streamctl/pkg/cert"
"github.com/xaionaro-go/streamctl/pkg/streamcontrol"
"github.com/xaionaro-go/streamctl/pkg/streamd"
"github.com/xaionaro-go/streamctl/pkg/streamd/config"
@@ -127,6 +129,8 @@ func main() {
}
defer belt.Flush(ctx)
eventbus.LoggingEnabled = true
configPathExpanded, err := xpath.Expand(*configPath)
if err != nil {
l.Fatalf("unable to get the path to the data file: %v", err)
@@ -181,7 +185,16 @@ func main() {
}
})
listener, err := net.Listen("tcp", *listenAddr)
cert, err := cert.GenerateSelfSignedForServer()
if err != nil {
logger.Panicf(ctx, "unable to generate the certificate: %v", err)
}
listener, err := tls.Listen("tcp", *listenAddr, &tls.Config{
Certificates: []tls.Certificate{cert},
NextProtos: []string{"h2"},
})
//listener, err := net.Listen("tcp", *listenAddr)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}

View File

@@ -5,4 +5,4 @@ Website = "https://github.com/xaionaro/streamctl"
Name = "streampanel"
ID = "center.dx.streampanel"
Version = "0.1.0"
Build = 432
Build = 433

View File

@@ -16,6 +16,7 @@ import (
"github.com/facebookincubator/go-belt"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/xaionaro-go/eventbus"
"github.com/xaionaro-go/observability"
)
@@ -115,6 +116,8 @@ func initRuntime(
seppukuIfMemHugeLeak(ctx)
eventbus.LoggingEnabled = true
ctx, cancelFn := context.WithCancel(ctx)
return ctx, func() {
defer belt.Flush(ctx)

View File

@@ -3,6 +3,7 @@ package main
import (
"bytes"
"context"
"crypto/tls"
"encoding/gob"
"fmt"
"net"
@@ -20,6 +21,7 @@ import (
"github.com/xaionaro-go/obs-grpc-proxy/protobuf/go/obs_grpc"
"github.com/xaionaro-go/observability"
"github.com/xaionaro-go/streamctl/cmd/streamd/ui"
"github.com/xaionaro-go/streamctl/pkg/cert"
"github.com/xaionaro-go/streamctl/pkg/mainprocess"
"github.com/xaionaro-go/streamctl/pkg/streamcontrol"
"github.com/xaionaro-go/streamctl/pkg/streamd"
@@ -257,7 +259,17 @@ func initGRPCServers(
) (net.Listener, *grpc.Server, *server.GRPCServer, obs_grpc.OBSServer, proxy_grpc.NetworkProxyServer) {
logger.Debugf(ctx, "initGRPCServers")
defer logger.Debugf(ctx, "/initGRPCServers")
listener, err := net.Listen("tcp", listenAddr)
cert, err := cert.GenerateSelfSignedForServer()
if err != nil {
logger.Panicf(ctx, "unable to generate the certificate: %v", err)
}
logger.Debugf(ctx, "generated certificate %#+v", cert)
listener, err := tls.Listen("tcp", listenAddr, &tls.Config{
Certificates: []tls.Certificate{cert},
NextProtos: []string{"h2"},
})
if err != nil {
logger.Panicf(ctx, "failed to listen: %v", err)
}

6
go.mod
View File

@@ -27,8 +27,6 @@ replace github.com/asticode/go-astiav v0.36.0 => github.com/xaionaro-go/astiav v
replace github.com/bluenviron/mediacommon/v2 v2.0.1-0.20250324151931-b8ce69d15d3d => github.com/xaionaro-go/mediacommon/v2 v2.0.0-20250420012906-03d6d69ac3b7
replace github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef => github.com/mysteriumnetwork/EventBus v0.0.0-20220414214953-84469ec2b111
require (
github.com/facebookincubator/go-belt v0.0.0-20250308011339-62fb7027b11f
github.com/go-git/go-billy/v5 v5.6.2
@@ -36,6 +34,7 @@ require (
github.com/hashicorp/go-multierror v1.1.1
github.com/nicklaw5/helix/v2 v2.30.1-0.20240715193454-0151ccccf980
github.com/spf13/cobra v1.8.1
github.com/xaionaro-go/eventbus v0.0.0-20250712221024-f0986ef769fa
github.com/xaionaro-go/logrustash v0.0.0-20240804141650-d48034780a5f // indirect
golang.org/x/oauth2 v0.30.0
google.golang.org/api v0.239.0
@@ -256,7 +255,6 @@ require (
github.com/adeithe/go-twitch v0.3.1
github.com/andreykaipov/goobs v1.4.1
github.com/anthonynsimon/bild v0.14.0
github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef
github.com/asticode/go-astiav v0.36.0
github.com/bamiaux/rez v0.0.0-20170731184118-29f4463c688b
github.com/bluenviron/gortsplib/v4 v4.12.4-0.20250324174248-61372cfa6800
@@ -326,6 +324,8 @@ require (
require (
github.com/BurntSushi/xgbutil v0.0.0-20190907113008-ad855c713046
github.com/cloudwego/eino-ext/components/model/openai v0.0.0-20250424061409-ccd60fbc7c1c
github.com/coder/websocket v1.8.13
github.com/joeyak/go-twitch-eventsub/v3 v3.0.0
github.com/phuslu/goid v1.0.2 // indirect
github.com/pion/datachannel v1.5.10 // indirect
github.com/pion/dtls/v2 v2.2.12 // indirect

8
go.sum
View File

@@ -216,6 +216,8 @@ github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWH
github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ=
github.com/coder/websocket v1.8.13 h1:f3QZdXy7uGVz+4uCJy2nTZyM0yTBj8yANEHhqlXZ9FE=
github.com/coder/websocket v1.8.13/go.mod h1:LNVeNrXQZfe5qhS9ALED3uA+l5pPqvwXg3CKoDBB2gs=
github.com/coreos/go-oidc/v3 v3.11.0 h1:Ia3MxdwpSw702YW0xgfmP1GVCMA9aEFWu12XUZ3/OtI=
github.com/coreos/go-oidc/v3 v3.11.0/go.mod h1:gE3LgjOgFoHi9a4ce4/tJczr0Ai2/BoDhf0r5lltWI0=
github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
@@ -613,6 +615,8 @@ github.com/jfreymuth/oggvorbis v1.0.5 h1:u+Ck+R0eLSRhgq8WTmffYnrVtSztJcYrl588DM4
github.com/jfreymuth/oggvorbis v1.0.5/go.mod h1:1U4pqWmghcoVsCJJ4fRBKv9peUJMBHixthRlBeD6uII=
github.com/jfreymuth/vorbis v1.0.2 h1:m1xH6+ZI4thH927pgKD8JOH4eaGRm18rEE9/0WKjvNE=
github.com/jfreymuth/vorbis v1.0.2/go.mod h1:DoftRo4AznKnShRl1GxiTFCseHr4zR9BN3TWXyuzrqQ=
github.com/joeyak/go-twitch-eventsub/v3 v3.0.0 h1:6BDgmYJynNDyCP7P+wM9jPQnE3leJAi58nohDnzliJ4=
github.com/joeyak/go-twitch-eventsub/v3 v3.0.0/go.mod h1:rpqOjYP1ftWDj3H4D8fA58AdOpkvK9YvODoduDpPCQU=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
@@ -741,8 +745,6 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mysteriumnetwork/EventBus v0.0.0-20220414214953-84469ec2b111 h1:7s+VqlctjdVjy1z0slV2giUawTnv1A6vWj9oKKfgPhI=
github.com/mysteriumnetwork/EventBus v0.0.0-20220414214953-84469ec2b111/go.mod h1:ef8wV5ITJhXSTG1sUkcHPAQF7lh83c7l875IvrYU7H0=
github.com/neelance/astrewrite v0.0.0-20160511093645-99348263ae86/go.mod h1:kHJEU3ofeGjhHklVoIGuVj85JJwZ6kWPaJwCIxgnFmo=
github.com/neelance/sourcemap v0.0.0-20151028013722-8c68805598ab/go.mod h1:Qr6/a/Q4r9LP1IltGz7tA7iOK1WonHEYhu1HRBA7ZiM=
github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646 h1:zYyBkD/k9seD2A7fsi6Oo2LfFZAehjjQMERAvZLEDnQ=
@@ -1088,6 +1090,8 @@ github.com/xaionaro-go/avpipeline v0.0.0-20250525204026-17104bc4baca h1:Cls4rEim
github.com/xaionaro-go/avpipeline v0.0.0-20250525204026-17104bc4baca/go.mod h1:LMh5Qi7cuntcktUezfA9toVCUCCsx9pjyGDWe9GLt9A=
github.com/xaionaro-go/datacounter v1.0.4 h1:+QMZLmu73R5WGkQfUPwlXF/JFN+Weo4iuDZkiL2wVm8=
github.com/xaionaro-go/datacounter v1.0.4/go.mod h1:Sf9vBevuV6w5iE6K3qJ9pWVKcyS60clWBUSQLjt5++c=
github.com/xaionaro-go/eventbus v0.0.0-20250712221024-f0986ef769fa h1:p4rzmAuKfYTgoHp4yL3qIL3Js4P3I91yhe654L4gnFo=
github.com/xaionaro-go/eventbus v0.0.0-20250712221024-f0986ef769fa/go.mod h1:zSbWHZpDvsRhjD3Sr3bruqqsWotjXvsIKmx6/THwXFw=
github.com/xaionaro-go/fyne/v2 v2.0.0-20250622004601-3a26ee69528a h1:awMQXlaweeiSZB4rSNfMmJGJriyn1ca/m/lglBi9uyA=
github.com/xaionaro-go/fyne/v2 v2.0.0-20250622004601-3a26ee69528a/go.mod h1:0GOXKqyvNwk3DLmsFu9v0oYM0ZcD1ysGnlHCerKoAmo=
github.com/xaionaro-go/go-rtmp v0.0.0-20241009130244-1e3160f27f42 h1:izCjREd+62HDF9FRYqUI7dgJNdUxAIysEuqed8lBcDY=

View File

@@ -0,0 +1,44 @@
package cert
import (
"crypto/ed25519"
"crypto/rand"
"crypto/tls"
"crypto/x509"
"crypto/x509/pkix"
"encoding/pem"
"math/big"
"time"
)
func GenerateSelfSignedForServer() (tls.Certificate, error) {
pub, priv, err := ed25519.GenerateKey(rand.Reader)
if err != nil {
return tls.Certificate{}, err
}
tmpl := x509.Certificate{
SerialNumber: big.NewInt(1),
Subject: pkix.Name{
Organization: []string{"DX.center"},
},
NotBefore: time.Now(),
NotAfter: time.Now().Add(10 * 365 * 24 * time.Hour),
KeyUsage: x509.KeyUsageDigitalSignature,
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
DNSNames: []string{"wingout.dx.center"},
}
certDER, err := x509.CreateCertificate(rand.Reader, &tmpl, &tmpl, pub, priv)
if err != nil {
return tls.Certificate{}, err
}
keyBytes, err := x509.MarshalPKCS8PrivateKey(priv)
certPEM := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: certDER})
keyPEM := pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: keyBytes})
return tls.X509KeyPair(certPEM, keyPEM)
}

View File

@@ -33,6 +33,7 @@ func (s *ChatMessagesStorage) loadLocked(ctx context.Context) (_err error) {
if err != nil {
return fmt.Errorf("unable to parse file '%s': %w", s.FilePath, err)
}
logger.Debugf(ctx, "loaded %d messages", len(s.Messages))
s.sortAndDeduplicateAndTruncate(ctx)
return nil
}

View File

@@ -0,0 +1,39 @@
package ringbuffer
import (
"context"
"slices"
"github.com/xaionaro-go/xsync"
)
type RingBuffer[T comparable] struct {
Storage []T
CurrentWriteIndex uint
Locker xsync.Mutex
}
func New[T comparable](size uint) *RingBuffer[T] {
return &RingBuffer[T]{
Storage: make([]T, 0, size),
}
}
func (r *RingBuffer[T]) Add(item T) {
r.Locker.Do(context.TODO(), func() {
if r.CurrentWriteIndex >= uint(len(r.Storage)) {
r.Storage = r.Storage[:len(r.Storage)+1]
}
r.Storage[r.CurrentWriteIndex] = item
r.CurrentWriteIndex++
if r.CurrentWriteIndex >= uint(cap(r.Storage)) {
r.CurrentWriteIndex = 0
}
})
}
func (r *RingBuffer[T]) Contains(item T) bool {
return xsync.DoR1(context.TODO(), &r.Locker, func() bool {
return slices.Contains(r.Storage, item)
})
}

View File

@@ -11,6 +11,7 @@ import (
http "github.com/Danny-Dasilva/fhttp"
"github.com/davecgh/go-spew/spew"
"github.com/facebookincubator/go-belt"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/google/uuid"
"github.com/scorfly/gokick"
@@ -64,6 +65,7 @@ func New(
cfg Config,
saveCfgFn func(Config) error,
) (*Kick, error) {
ctx = belt.WithField(ctx, "controller", ID)
if cfg.Config.Channel == "" {
return nil, fmt.Errorf("channel is not set")
}

View File

@@ -115,6 +115,20 @@ type ChatMessage struct {
Username string
MessageID ChatMessageID
Message string
Paid Money
}
type Currency int
const (
CurrencyNone = Currency(iota)
CurrencyUSD
CurrencyOther
)
type Money struct {
Currency Currency
Amount float64
}
type StreamControllerCommons interface {

View File

@@ -0,0 +1,34 @@
package auth
import (
"context"
"fmt"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/nicklaw5/helix/v2"
"github.com/xaionaro-go/streamctl/pkg/secret"
)
func NewTokenByApp(
ctx context.Context,
client *helix.Client,
) (secret.String, error) {
logger.Debugf(ctx, "getNewTokenByApp")
defer func() { logger.Debugf(ctx, "/getNewTokenByApp") }()
resp, err := client.RequestAppAccessToken(nil)
if err != nil {
return secret.New(""), fmt.Errorf("unable to get app access token: %w", err)
}
if resp.ErrorStatus != 0 {
return secret.New(""), fmt.Errorf(
"unable to get app access token (the response contains an error): %d %v: %v",
resp.ErrorStatus,
resp.Error,
resp.ErrorMessage,
)
}
return secret.New(resp.Data.AccessToken), nil
}

View File

@@ -0,0 +1,43 @@
package auth
import (
"context"
"fmt"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/nicklaw5/helix/v2"
"github.com/xaionaro-go/observability"
"github.com/xaionaro-go/streamctl/pkg/secret"
)
func NewTokenByUser(
ctx context.Context,
client *helix.Client,
clientCode secret.String,
) (secret.String, secret.String, error) {
logger.Debugf(ctx, "getNewTokenByUser")
defer func() { logger.Debugf(ctx, "/getNewTokenByUser") }()
if clientCode.Get() == "" {
return secret.New(""), secret.New(""), fmt.Errorf("internal error: ClientCode is empty")
}
logger.Debugf(ctx, "requesting user access token...")
resp, err := client.RequestUserAccessToken(clientCode.Get())
if observability.IsOnInsecureDebug(ctx) {
logger.Debugf(ctx, "requesting user access token result: %#+v %v", resp, err)
}
if err != nil {
return secret.New(""), secret.New(""), fmt.Errorf("unable to get user access token: %w", err)
}
if resp.ErrorStatus != 0 {
return secret.New(""), secret.New(""), fmt.Errorf(
"unable to query: %d %v: %v",
resp.ErrorStatus,
resp.Error,
resp.ErrorMessage,
)
}
return secret.New(resp.Data.AccessToken), secret.New(resp.Data.RefreshToken), nil
}

View File

@@ -0,0 +1,181 @@
package auth
import (
"context"
"fmt"
"strings"
"sync"
"time"
"github.com/facebookincubator/go-belt/tool/experimental/errmon"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/hashicorp/go-multierror"
"github.com/nicklaw5/helix/v2"
"github.com/xaionaro-go/observability"
"github.com/xaionaro-go/streamctl/pkg/oauthhandler"
)
type OAuthHandler func(context.Context, oauthhandler.OAuthHandlerArgument) error
func NewClientCode(
ctx context.Context,
clientID string,
oauthHandler OAuthHandler,
getOAuthListenPortsFn func() []uint16,
onNewClientCode func(string),
) (_err error) {
logger.Debugf(ctx, "getNewClientCode")
defer func() { logger.Debugf(ctx, "/getNewClientCode: %v", _err) }()
if oauthHandler == nil {
oauthHandler = oauthhandler.OAuth2HandlerViaCLI
}
ctx, ctxCancelFunc := context.WithCancel(ctx)
cancelFunc := func() {
logger.Debugf(ctx, "cancelling the context")
ctxCancelFunc()
}
var errWg sync.WaitGroup
var resultErr error
errCh := make(chan error)
errWg.Add(1)
observability.Go(ctx, func(ctx context.Context) {
errWg.Done()
for err := range errCh {
errmon.ObserveErrorCtx(ctx, err)
resultErr = multierror.Append(resultErr, err)
}
})
alreadyListening := map[uint16]struct{}{}
var wg sync.WaitGroup
success := false
startHandlerForPort := func(listenPort uint16) {
if _, ok := alreadyListening[listenPort]; ok {
return
}
alreadyListening[listenPort] = struct{}{}
logger.Debugf(ctx, "starting the oauth handler at port %d", listenPort)
wg.Add(1)
{
listenPort := listenPort
observability.Go(ctx, func(ctx context.Context) {
defer func() { logger.Debugf(ctx, "ended the oauth handler at port %d", listenPort) }()
defer wg.Done()
authURL := GetAuthorizationURL(
&helix.AuthorizationURLParams{
ResponseType: "code", // or "token"
Scopes: []string{
"user:read:chat",
"chat:read",
"chat:edit",
"channel:manage:broadcast",
"moderator:manage:chat_messages",
"moderator:manage:banned_users",
},
},
clientID,
RedirectURI(listenPort),
)
arg := oauthhandler.OAuthHandlerArgument{
AuthURL: authURL,
ListenPort: listenPort,
ExchangeFn: func(ctx context.Context, code string) (_err error) {
logger.Debugf(ctx, "ExchangeFn()")
defer func() { logger.Debugf(ctx, "/ExchangeFn(): %v", _err) }()
if code == "" {
return fmt.Errorf("code is empty")
}
onNewClientCode(code)
return nil
},
}
err := oauthHandler(ctx, arg)
if err != nil {
errCh <- fmt.Errorf("unable to get or exchange the oauth code to a token: %w", err)
return
}
cancelFunc()
success = true
})
}
}
// TODO: either support only one port as in New, or support multiple
// ports as we do below
getPortsFn := getOAuthListenPortsFn
if getPortsFn == nil {
return fmt.Errorf("the function GetOAuthListenPorts is not set")
}
for _, listenPort := range getPortsFn() {
startHandlerForPort(listenPort)
}
wg.Add(1)
observability.Go(ctx, func(ctx context.Context) {
defer wg.Done()
t := time.NewTicker(time.Second)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
}
ports := getPortsFn()
logger.Tracef(ctx, "oauth listener ports: %#+v", ports)
for _, listenPort := range ports {
startHandlerForPort(listenPort)
}
}
})
observability.Go(ctx, func(ctx context.Context) {
wg.Wait()
close(errCh)
})
<-ctx.Done()
logger.Debugf(ctx, "did successfully took a new client code? -- %v", success)
if !success {
errWg.Wait()
return resultErr
}
return nil
}
func RedirectURI(listenPort uint16) string {
return fmt.Sprintf("http://localhost:%d/", listenPort)
}
func GetAuthorizationURL(
params *helix.AuthorizationURLParams,
clientID string,
redirectURI string,
) string {
url := helix.AuthBaseURL + "/authorize"
url += "?response_type=" + params.ResponseType
url += "&client_id=" + clientID
url += "&redirect_uri=" + redirectURI
if params.State != "" {
url += "&state=" + params.State
}
if params.ForceVerify {
url += "&force_verify=true"
}
if len(params.Scopes) != 0 {
url += "&scope=" + strings.Join(params.Scopes, "%20")
}
return url
}

View File

@@ -1,25 +0,0 @@
package twitch
import (
"github.com/adeithe/go-twitch"
"github.com/adeithe/go-twitch/irc"
)
type chatClientImpl struct {
*irc.Client
}
var _ ChatClient = (*chatClientImpl)(nil)
func (c *chatClientImpl) Join(channelIDs ...string) error {
return c.Client.Join(channelIDs...)
}
func (c *chatClientImpl) OnShardMessage(callback func(shard int, msg irc.ChatMessage)) {
c.Client.OnShardMessage(callback)
}
func newChatClient() *chatClientImpl {
return &chatClientImpl{
Client: twitch.IRC(),
}
}

View File

@@ -0,0 +1,31 @@
package twitch
import (
"context"
"github.com/adeithe/go-twitch"
"github.com/adeithe/go-twitch/irc"
)
type chatClientIRC struct {
*irc.Client
}
var _ ChatClientIRC = (*chatClientIRC)(nil)
func (c *chatClientIRC) Join(channelIDs ...string) error {
return c.Client.Join(channelIDs...)
}
func (c *chatClientIRC) OnShardMessage(callback func(shard int, msg irc.ChatMessage)) {
c.Client.OnShardMessage(callback)
}
func (c *chatClientIRC) Close(ctx context.Context) error {
c.Client.Close()
return nil
}
func newChatClientIRC() *chatClientIRC {
return &chatClientIRC{
Client: twitch.IRC(),
}
}

View File

@@ -2,110 +2,11 @@ package twitch
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/adeithe/go-twitch/irc"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/xaionaro-go/observability"
"github.com/xaionaro-go/streamctl/pkg/streamcontrol"
)
type ChatClient interface {
Join(channelIDs ...string) error
OnShardMessage(func(shard int, msg irc.ChatMessage))
Close()
}
type ChatHandler struct {
client ChatClient
cancelFunc context.CancelFunc
waitGroup sync.WaitGroup
messagesInChan chan irc.ChatMessage
messagesOutChan chan streamcontrol.ChatMessage
}
func NewChatHandler(
ctx context.Context,
channelID string,
) (*ChatHandler, error) {
var errs []error
for attempt := 0; attempt < 3; attempt++ {
h, err := newChatHandler(ctx, newChatClient(), channelID)
if err == nil {
return h, nil
}
err = fmt.Errorf("attempt #%d failed: %w", attempt, err)
logger.Errorf(ctx, "%v", err)
errs = append(errs, err)
time.Sleep(time.Second)
}
return nil, errors.Join(errs...)
}
func newChatHandler(
ctx context.Context,
chatClient ChatClient,
channelID string,
) (*ChatHandler, error) {
err := chatClient.Join(channelID)
if err != nil {
return nil, fmt.Errorf("unable to join channel '%s': %w", channelID, err)
}
ctx, cancelFn := context.WithCancel(ctx)
h := &ChatHandler{
client: chatClient,
cancelFunc: cancelFn,
messagesInChan: make(chan irc.ChatMessage),
messagesOutChan: make(chan streamcontrol.ChatMessage, 100),
}
h.waitGroup.Add(1)
observability.Go(ctx, func(ctx context.Context) {
defer h.waitGroup.Done()
defer func() {
h.client.Close()
// h.Client.Close above waits inside for everything to finish,
// so we can safely close the channel here:
close(h.messagesInChan)
close(h.messagesOutChan)
}()
for {
select {
case <-ctx.Done():
return
case ev := <-h.messagesInChan:
select {
case h.messagesOutChan <- streamcontrol.ChatMessage{
CreatedAt: ev.CreatedAt,
UserID: streamcontrol.ChatUserID(ev.Sender.Username),
Username: ev.Sender.Username,
MessageID: streamcontrol.ChatMessageID(ev.ID),
Message: ev.Text, // TODO: investigate if we need ev.IRCMessage.Text
}:
default:
}
}
}
})
chatClient.OnShardMessage(h.onShardMessage)
return h, nil
}
func (h *ChatHandler) onShardMessage(shard int, msg irc.ChatMessage) {
h.messagesInChan <- msg
}
func (h *ChatHandler) Close() error {
h.cancelFunc()
h.waitGroup.Wait()
return nil
}
func (h *ChatHandler) MessagesChan() <-chan streamcontrol.ChatMessage {
return h.messagesOutChan
type ChatHandler interface {
Close(ctx context.Context) error
MessagesChan() <-chan streamcontrol.ChatMessage
}

View File

@@ -0,0 +1,126 @@
package twitch
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/adeithe/go-twitch/irc"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/xaionaro-go/observability"
"github.com/xaionaro-go/streamctl/pkg/streamcontrol"
)
type ChatClientIRC interface {
Join(channelIDs ...string) error
OnShardMessage(func(shard int, msg irc.ChatMessage))
Close(context.Context) error
}
type ChatHandlerIRC struct {
client ChatClientIRC
cancelFunc context.CancelFunc
waitGroup sync.WaitGroup
messagesInChan chan irc.ChatMessage
messagesOutChan chan streamcontrol.ChatMessage
}
var _ ChatHandler = (*ChatHandlerIRC)(nil)
func NewChatHandlerIRC(
ctx context.Context,
channelID string,
) (_ret *ChatHandlerIRC, _err error) {
logger.Debugf(ctx, "NewChatHandlerIRC")
defer func() { logger.Debugf(ctx, "/NewChatHandlerIRC: %v", _err) }()
var errs []error
for attempt := 0; attempt < 3; attempt++ {
h, err := newChatHandlerIRC(ctx, newChatClientIRC(), channelID)
if err == nil {
return h, nil
}
err = fmt.Errorf("attempt #%d failed: %w", attempt, err)
logger.Errorf(ctx, "%v", err)
errs = append(errs, err)
time.Sleep(time.Second)
}
return nil, errors.Join(errs...)
}
func newChatHandlerIRC(
ctx context.Context,
chatClient ChatClientIRC,
channelID string,
) (_ret *ChatHandlerIRC, _err error) {
logger.Debugf(ctx, "newChatHandlerIRC")
defer func() { logger.Debugf(ctx, "/newChatHandlerIRC: %v", _err) }()
err := chatClient.Join(channelID)
if err != nil {
return nil, fmt.Errorf("unable to join channel '%s': %w", channelID, err)
}
ctx, cancelFn := context.WithCancel(ctx)
h := &ChatHandlerIRC{
client: chatClient,
cancelFunc: cancelFn,
messagesInChan: make(chan irc.ChatMessage),
messagesOutChan: make(chan streamcontrol.ChatMessage, 100),
}
h.waitGroup.Add(1)
observability.Go(ctx, func(ctx context.Context) {
defer logger.Debugf(ctx, "newChatHandlerIRC: closed")
defer h.waitGroup.Done()
defer func() {
h.client.Close(ctx)
// h.Client.Close above waits inside for everything to finish,
// so we can safely close the channel here:
close(h.messagesInChan)
close(h.messagesOutChan)
}()
for {
select {
case <-ctx.Done():
logger.Debugf(ctx, "newChatHandlerIRC: closing: %v", ctx.Err())
return
case ev, ok := <-h.messagesInChan:
if !ok {
logger.Debugf(ctx, "newChatHandlerIRC: input channel closed")
return
}
select {
case h.messagesOutChan <- streamcontrol.ChatMessage{
CreatedAt: ev.CreatedAt,
UserID: streamcontrol.ChatUserID(ev.Sender.Username),
Username: ev.Sender.Username,
MessageID: streamcontrol.ChatMessageID(ev.ID),
Message: ev.Text, // TODO: investigate if we need ev.IRCMessage.Text
}:
default:
logger.Warnf(ctx, "the queue is full, skipping the message")
}
}
}
})
chatClient.OnShardMessage(h.onShardMessage)
return h, nil
}
func (h *ChatHandlerIRC) onShardMessage(shard int, msg irc.ChatMessage) {
ctx := context.TODO()
logger.Debugf(ctx, "newChatHandlerIRC: onShardMessage")
h.messagesInChan <- msg
}
func (h *ChatHandlerIRC) Close(ctx context.Context) error {
h.cancelFunc()
h.waitGroup.Wait()
return nil
}
func (h *ChatHandlerIRC) MessagesChan() <-chan streamcontrol.ChatMessage {
return h.messagesOutChan
}

View File

@@ -11,25 +11,25 @@ import (
"github.com/xaionaro-go/streamctl/pkg/streamcontrol"
)
type chatClientMock struct {
type chatClientIRCMock struct {
join func(channelIDs ...string) error
onShardMessage func(func(shard int, msg irc.ChatMessage))
close func()
close func(ctx context.Context) error
}
var _ ChatClient = (*chatClientMock)(nil)
var _ ChatClientIRC = (*chatClientIRCMock)(nil)
func (c *chatClientMock) Join(channelIDs ...string) error {
func (c *chatClientIRCMock) Join(channelIDs ...string) error {
return c.join(channelIDs...)
}
func (c *chatClientMock) OnShardMessage(callback func(shard int, msg irc.ChatMessage)) {
func (c *chatClientIRCMock) OnShardMessage(callback func(shard int, msg irc.ChatMessage)) {
c.onShardMessage(callback)
}
func (c *chatClientMock) Close() {
c.close()
func (c *chatClientIRCMock) Close(ctx context.Context) error {
return c.close(ctx)
}
func TestChatHandler(t *testing.T) {
func TestChatHandlerIRC(t *testing.T) {
ctx := context.TODO()
const channelID = "test-channel-id"
@@ -38,7 +38,7 @@ func TestChatHandler(t *testing.T) {
callback func(shard int, msg irc.ChatMessage)
closeCount = 0
)
h, err := newChatHandler(ctx, &chatClientMock{
h, err := newChatHandlerIRC(ctx, &chatClientIRCMock{
join: func(channelIDs ...string) error {
joinedChannelIDs = append(joinedChannelIDs, channelIDs...)
return nil
@@ -46,8 +46,9 @@ func TestChatHandler(t *testing.T) {
onShardMessage: func(_callback func(shard int, msg irc.ChatMessage)) {
callback = _callback
},
close: func() {
close: func(ctx context.Context) error {
closeCount++
return nil
},
}, channelID)
require.NoError(t, err)
@@ -84,7 +85,7 @@ func TestChatHandler(t *testing.T) {
})
require.Equal(t, 0, closeCount)
h.Close()
h.Close(ctx)
require.Equal(t, 1, closeCount)
wg.Wait()
}

View File

@@ -0,0 +1,305 @@
package twitch
import (
"context"
"encoding/json"
"fmt"
"sync"
"time"
"github.com/coder/websocket"
"github.com/facebookincubator/go-belt/tool/logger"
twitcheventsub "github.com/joeyak/go-twitch-eventsub/v3"
"github.com/nicklaw5/helix/v2"
"github.com/xaionaro-go/observability"
"github.com/xaionaro-go/streamctl/pkg/streamcontrol"
)
type ChatHandlerSub struct {
client TwitchSubscriptionClient
wsConn *websocket.Conn
broadcasterID string
cancelFunc context.CancelFunc
waitGroup sync.WaitGroup
messagesOutChan chan streamcontrol.ChatMessage
}
var _ ChatHandler = (*ChatHandlerSub)(nil)
type TwitchSubscriptionClient interface {
CreateEventSubSubscription(payload *helix.EventSubSubscription) (*helix.EventSubSubscriptionsResponse, error)
GetUsers(params *helix.UsersParams) (*helix.UsersResponse, error)
}
func NewChatHandlerSub(
ctx context.Context,
client TwitchSubscriptionClient,
broadcasterID string,
onClose func(context.Context),
) (_ret *ChatHandlerSub, _err error) {
logger.Debugf(ctx, "NewChatHandlerSub")
defer func() { logger.Debugf(ctx, "/NewChatHandlerSub: %v", _err) }()
const urlString = "wss://eventsub.wss.twitch.tv/ws"
c, _, err := websocket.Dial(ctx, urlString, nil)
if err != nil {
return nil, fmt.Errorf("unable to initiate a websocket connection to '%s': %w", urlString, err)
}
var myUserID string
{
resp, err := client.GetUsers(&helix.UsersParams{})
if err != nil {
return nil, fmt.Errorf("unable to get my user info: %w", err)
}
if len(resp.Data.Users) != 1 {
return nil, fmt.Errorf("expected to get one user info, but received %d", len(resp.Data.Users))
}
myUserID = resp.Data.Users[0].ID
logger.Debugf(ctx, "my user ID: %v", myUserID)
}
ctx, cancelFn := context.WithCancel(ctx)
h := &ChatHandlerSub{
client: client,
wsConn: c,
broadcasterID: broadcasterID,
cancelFunc: cancelFn,
messagesOutChan: make(chan streamcontrol.ChatMessage, 100),
}
defer func() {
if _err != nil {
_ = h.Close(ctx)
}
}()
_, sessMsgBytes, err := c.Read(ctx)
if err != nil {
return nil, fmt.Errorf("unable to get my session ID: %w", err)
}
var sessMsg SessionWelcomeMessage
err = json.Unmarshal(sessMsgBytes, &sessMsg)
if err != nil {
return nil, fmt.Errorf("unable to deserialize the session ID message '%s': %w", sessMsgBytes, err)
}
sessID := sessMsg.Payload.Session.ID
logger.Debugf(ctx, "session ID: '%s' (%s)", sessID, sessMsgBytes)
params := &helix.EventSubSubscription{
Type: string(twitcheventsub.SubChannelChatMessage),
Version: "1",
Condition: helix.EventSubCondition{
BroadcasterUserID: broadcasterID,
UserID: myUserID,
},
Transport: helix.EventSubTransport{
Method: "websocket",
SessionID: sessID,
},
}
resp, err := client.CreateEventSubSubscription(params)
if err != nil {
return nil, fmt.Errorf("unable to create a subscription (%#+v): %w", params, err)
}
if resp.ErrorMessage != "" {
return nil, fmt.Errorf("got an error during subscription (%#+v): %s", params, resp.ErrorMessage)
}
h.waitGroup.Add(1)
observability.Go(ctx, func(ctx context.Context) {
defer logger.Debugf(ctx, "NewChatHandlerSub: closed")
if onClose != nil {
defer onClose(ctx)
}
defer h.waitGroup.Done()
defer func() {
close(h.messagesOutChan)
}()
for {
select {
case <-ctx.Done():
logger.Debugf(ctx, "NewChatHandlerSub: context closed: %v", ctx.Err())
return
default:
}
_, messageSerialized, err := c.Read(ctx)
if err != nil {
logger.Errorf(ctx, "unable to read the message: %v", err)
return
}
var header struct {
Metadata MessageMetadata `json:"metadata"`
}
if err := json.Unmarshal(messageSerialized, &header); err != nil {
logger.Errorf(ctx, "unable to un-JSON-ize message '%s': %v", messageSerialized, err)
return
}
t, err := time.Parse(time.RFC3339Nano, header.Metadata.MessageTimestamp)
if err != nil {
logger.Errorf(ctx, "unable to parse timestamp '%s': %v", header.Metadata.MessageTimestamp, err)
t = time.Now()
}
var msgAbstract any
switch header.Metadata.MessageType {
case "session_welcome":
msgAbstract = &SessionWelcomeMessage{}
case "notification":
var msg NotificationMessage
err := json.Unmarshal(messageSerialized, &msg)
if err != nil {
logger.Errorf(ctx, "unable to unserialize the notification message '%s': %v", messageSerialized, err)
continue
}
logger.Tracef(ctx, "notification: %#+v", msg)
switch twitcheventsub.EventSubscription(msg.Payload.Subscription.Type) {
case twitcheventsub.SubChannelChatMessage:
var chatEvent ChatMessageEvent
err := json.Unmarshal(msg.Payload.Event, &chatEvent)
if err != nil {
logger.Errorf(ctx, "unable to unserialize the chat message '%s': %v", msg.Payload.Event, err)
continue
}
logger.Tracef(ctx, "chat message: %#+v", msg)
msg := streamcontrol.ChatMessage{
CreatedAt: t,
UserID: streamcontrol.ChatUserID(chatEvent.ChatterUserID),
Username: chatEvent.ChatterUserName,
MessageID: streamcontrol.ChatMessageID(chatEvent.MessageID),
Message: chatEvent.Message.Text,
}
logger.Tracef(ctx, "resulting chat: %#+v", msg)
select {
case h.messagesOutChan <- msg:
default:
logger.Errorf(ctx, "the queue is full, have to drop %#+v", msg)
}
default:
logger.Warnf(ctx, "got an event on a channel I haven't subscribed to: '%s': %s", msg.Payload.Subscription.Type, messageSerialized)
}
continue
case "session_keepalive":
msgAbstract = &KeepaliveMessage{}
case "reconnect":
msgAbstract = &ReconnectMessage{}
case "revocation":
msgAbstract = &RevocationMessage{}
default:
logger.Debugf(ctx, "unknown message type: '%s': '%s'", header.Metadata.MessageType, messageSerialized)
continue
}
err = json.Unmarshal(messageSerialized, &msgAbstract)
if err != nil {
logger.Errorf(ctx, "unable to unserialize the %T message '%s': %v", msgAbstract, messageSerialized, err)
continue
}
logger.Tracef(ctx, "received %T: %#+v", msgAbstract, msgAbstract)
}
})
return h, nil
}
func (h *ChatHandlerSub) Close(ctx context.Context) error {
h.cancelFunc()
return nil
}
func (h *ChatHandlerSub) MessagesChan() <-chan streamcontrol.ChatMessage {
return h.messagesOutChan
}
// Common metadata for all messages
type MessageMetadata struct {
MessageID string `json:"message_id"`
MessageType string `json:"message_type"`
MessageTimestamp string `json:"message_timestamp"`
}
// Session (used in session_welcome and reconnect)
type Session struct {
ID string `json:"id"`
Status string `json:"status"`
KeepaliveTimeoutSeconds int `json:"keepalive_timeout_seconds"`
ReconnectURL *string `json:"reconnect_url"` // can be null
ConnectedAt string `json:"connected_at"`
}
// Payload for session_welcome
type WelcomePayload struct {
Session Session `json:"session"`
}
// Session Welcome message
type SessionWelcomeMessage struct {
Metadata MessageMetadata `json:"metadata"`
Payload WelcomePayload `json:"payload"`
}
// Subscription info (used in notifications)
type Subscription struct {
ID string `json:"id"`
Type string `json:"type"`
Version string `json:"version"`
Status string `json:"status"`
Cost int `json:"cost"`
Condition map[string]string `json:"condition"`
CreatedAt string `json:"created_at"`
Transport struct {
Method string `json:"method"`
SessionID string `json:"session_id"`
} `json:"transport"`
}
// Example: Channel Chat Message Event (payload.event for chat messages)
type ChatMessageEvent struct {
BroadcasterUserID string `json:"broadcaster_user_id"`
BroadcasterUserLogin string `json:"broadcaster_user_login"`
BroadcasterUserName string `json:"broadcaster_user_name"`
ChatterUserID string `json:"chatter_user_id"`
ChatterUserLogin string `json:"chatter_user_login"`
ChatterUserName string `json:"chatter_user_name"`
MessageID string `json:"message_id"`
Message struct {
Text string `json:"text"`
} `json:"message"`
Color string `json:"color"`
Badges []struct {
SetID string `json:"set_id"`
ID string `json:"id"`
Info string `json:"info"`
} `json:"badges"`
}
type NotificationPayload struct {
Subscription Subscription `json:"subscription"`
Event json.RawMessage `json:"event"`
}
type NotificationMessage struct {
Metadata MessageMetadata `json:"metadata"`
Payload NotificationPayload `json:"payload"`
}
type KeepaliveMessage struct {
Metadata MessageMetadata `json:"metadata"`
}
type ReconnectPayload struct {
Session Session `json:"session"`
}
type ReconnectMessage struct {
Metadata MessageMetadata `json:"metadata"`
Payload ReconnectPayload `json:"payload"`
}
type RevocationPayload struct {
Subscription Subscription `json:"subscription"`
}
type RevocationMessage struct {
Metadata MessageMetadata `json:"metadata"`
Payload RevocationPayload `json:"payload"`
}

View File

@@ -7,7 +7,15 @@ import (
"log"
"os"
"github.com/facebookincubator/go-belt"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/facebookincubator/go-belt/tool/logger/implementation/zap"
"github.com/nicklaw5/helix/v2"
"github.com/xaionaro-go/observability"
"github.com/xaionaro-go/streamctl/pkg/oauthhandler"
"github.com/xaionaro-go/streamctl/pkg/secret"
"github.com/xaionaro-go/streamctl/pkg/streamcontrol/twitch"
"github.com/xaionaro-go/streamctl/pkg/streamcontrol/twitch/auth"
)
func assertNoError(err error) {
@@ -18,10 +26,21 @@ func assertNoError(err error) {
}
func main() {
ctx := context.TODO()
flag.Usage = func() {
fmt.Fprintf(os.Stderr, "syntax: chatlistener <channel_id>\n")
l := zap.Default().WithLevel(logger.LevelTrace)
ctx := context.Background()
ctx = logger.CtxWithLogger(ctx, l)
ctx = observability.OnInsecureDebug(ctx)
logger.Default = func() logger.Logger {
return l
}
defer belt.Flush(ctx)
oldUsage := flag.Usage
flag.Usage = func() {
fmt.Fprintf(os.Stderr, "syntax: chatlistener [options] <channel_id>\n")
oldUsage()
}
clientID := flag.String("client-id", "", "client ID for a WebSockets subscription (if not provided IRC will be used, instead)")
clientSecret := flag.String("client-secret", "", "client secret for a WebSockets subscription (if not provided IRC will be used, instead)")
flag.Parse()
if flag.NArg() != 1 {
flag.Usage()
@@ -29,7 +48,15 @@ func main() {
}
channelID := flag.Arg(0)
h, err := twitch.NewChatHandler(ctx, channelID)
var (
h twitch.ChatHandler
err error
)
if *clientID != "" && *clientSecret != "" {
h, err = newChatHandlerWebsockets(ctx, *clientID, *clientSecret, channelID)
} else {
h, err = twitch.NewChatHandlerIRC(ctx, channelID)
}
assertNoError(err)
fmt.Println("started")
@@ -37,3 +64,51 @@ func main() {
fmt.Printf("%#+v\n", ev)
}
}
const oauthListenerPort = 8091
func newChatHandlerWebsockets(
ctx context.Context,
clientID string,
clientSecret string,
channelID string,
) (*twitch.ChatHandlerSub, error) {
options := &helix.Options{
ClientID: clientID,
ClientSecret: clientSecret,
RedirectURI: auth.RedirectURI(oauthListenerPort),
}
client, err := helix.NewClientWithContext(ctx, options)
if err != nil {
return nil, fmt.Errorf("unable to create a helix client object: %w", err)
}
var clientCode secret.String
err = auth.NewClientCode(
ctx,
clientID,
oauthhandler.OAuth2HandlerViaBrowser,
func() []uint16 {
return []uint16{oauthListenerPort}
}, func(code string) {
clientCode.Set(code)
},
)
if err != nil {
return nil, fmt.Errorf("unable to get a client code: %w", err)
}
accessToken, refreshToken, err := auth.NewTokenByUser(ctx, client, clientCode)
client.SetUserAccessToken(accessToken.Get())
client.SetRefreshToken(refreshToken.Get())
userID, err := twitch.GetUserID(ctx, client, channelID)
if err != nil {
return nil, fmt.Errorf("unable to get the user ID for login '%s': %w", channelID, err)
}
if observability.IsOnInsecureDebug(ctx) {
logger.Tracef(ctx, "user access token: %v", accessToken.Get())
}
h, err := twitch.NewChatHandlerSub(ctx, client, userID, nil)
if err != nil {
return nil, fmt.Errorf("unable to get a chat handler: %w", err)
}
return h, nil
}

View File

@@ -9,22 +9,25 @@ import (
"unicode"
"unicode/utf8"
"github.com/facebookincubator/go-belt"
"github.com/facebookincubator/go-belt/tool/experimental/errmon"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/hashicorp/go-multierror"
"github.com/nicklaw5/helix/v2"
"github.com/xaionaro-go/observability"
"github.com/xaionaro-go/streamctl/pkg/buildvars"
"github.com/xaionaro-go/streamctl/pkg/oauthhandler"
"github.com/xaionaro-go/streamctl/pkg/ringbuffer"
"github.com/xaionaro-go/streamctl/pkg/secret"
"github.com/xaionaro-go/streamctl/pkg/streamcontrol"
"github.com/xaionaro-go/streamctl/pkg/streamcontrol/twitch/auth"
"github.com/xaionaro-go/xsync"
)
type Twitch struct {
closeCtx context.Context
closeFn context.CancelFunc
chatHandler *ChatHandler
chatHandlerSub *ChatHandlerSub
chatHandlerIRC *ChatHandlerIRC
client *helix.Client
config Config
broadcasterID string
@@ -45,6 +48,7 @@ func New(
cfg Config,
saveCfgFn func(Config) error,
) (*Twitch, error) {
ctx = belt.WithField(ctx, "controller", ID)
if cfg.Config.Channel == "" {
return nil, fmt.Errorf("'channel' is not set")
}
@@ -84,11 +88,11 @@ func New(
clientSecret: secret.New(clientSecret),
}
h, err := NewChatHandler(ctx, cfg.Config.Channel)
h, err := NewChatHandlerIRC(ctx, cfg.Config.Channel)
if err != nil {
return nil, fmt.Errorf("unable to initialize a chat handler for channel '%s': %w", cfg.Config.Channel, err)
}
t.chatHandler = h
t.chatHandlerIRC = h
client, err := t.getClient(ctx, oauthPorts[0])
if err != nil {
@@ -135,7 +139,7 @@ func New(
return t, nil
}
func getUserID(
func GetUserID(
_ context.Context,
client *helix.Client,
login string,
@@ -171,7 +175,7 @@ func (t *Twitch) prepareNoLock(ctx context.Context) error {
if t.broadcasterID != "" {
return
}
t.broadcasterID, err = getUserID(ctx, t.client, t.config.Config.Channel)
t.broadcasterID, err = GetUserID(ctx, t.client, t.config.Config.Channel)
if err != nil {
logger.Errorf(ctx, "unable to get broadcaster ID: %v", err)
return
@@ -183,6 +187,20 @@ func (t *Twitch) prepareNoLock(ctx context.Context) error {
t.config.Config.Channel,
)
})
if t.chatHandlerSub == nil {
t.chatHandlerSub, err = NewChatHandlerSub(
t.closeCtx, t.client, t.broadcasterID,
func(ctx context.Context) {
t.prepareLocker.Do(ctx, func() {
t.chatHandlerSub = nil
})
},
)
if err != nil {
logger.Errorf(ctx, "unable to initialize websockets based chat listener: %v", err)
}
}
return err
}
@@ -528,146 +546,25 @@ func (t *Twitch) getNewToken(
})
}
func authRedirectURI(listenPort uint16) string {
return fmt.Sprintf("http://localhost:%d/", listenPort)
}
func (t *Twitch) getNewClientCode(
ctx context.Context,
) (_err error) {
logger.Debugf(ctx, "getNewClientCode")
defer func() { logger.Debugf(ctx, "/getNewClientCode: %v", _err) }()
oauthHandler := t.config.Config.CustomOAuthHandler
if oauthHandler == nil {
oauthHandler = oauthhandler.OAuth2HandlerViaCLI
}
ctx, ctxCancelFunc := context.WithCancel(ctx)
cancelFunc := func() {
logger.Debugf(ctx, "cancelling the context")
ctxCancelFunc()
}
var errWg sync.WaitGroup
var resultErr error
errCh := make(chan error)
errWg.Add(1)
observability.Go(ctx, func(ctx context.Context) {
errWg.Done()
for err := range errCh {
errmon.ObserveErrorCtx(ctx, err)
resultErr = multierror.Append(resultErr, err)
}
})
alreadyListening := map[uint16]struct{}{}
var wg sync.WaitGroup
success := false
startHandlerForPort := func(listenPort uint16) {
if _, ok := alreadyListening[listenPort]; ok {
return
}
alreadyListening[listenPort] = struct{}{}
logger.Debugf(ctx, "starting the oauth handler at port %d", listenPort)
wg.Add(1)
{
listenPort := listenPort
observability.Go(ctx, func(ctx context.Context) {
defer func() { logger.Debugf(ctx, "ended the oauth handler at port %d", listenPort) }()
defer wg.Done()
authURL := GetAuthorizationURL(
&helix.AuthorizationURLParams{
ResponseType: "code", // or "token"
Scopes: []string{
"channel:manage:broadcast",
"moderator:manage:chat_messages",
"moderator:manage:banned_users",
},
},
return auth.NewClientCode(
ctx,
t.clientID,
authRedirectURI(listenPort),
)
arg := oauthhandler.OAuthHandlerArgument{
AuthURL: authURL,
ListenPort: listenPort,
ExchangeFn: func(ctx context.Context, code string) (_err error) {
logger.Debugf(ctx, "ExchangeFn()")
defer func() { logger.Debugf(ctx, "/ExchangeFn(): %v", _err) }()
if code == "" {
return fmt.Errorf("code is empty")
}
t.config.Config.CustomOAuthHandler,
t.config.Config.GetOAuthListenPorts,
func(code string) {
t.config.Config.ClientCode.Set(code)
err := t.saveCfgFn(t.config)
errmon.ObserveErrorCtx(ctx, err)
return nil
},
}
err := oauthHandler(ctx, arg)
if err != nil {
errCh <- fmt.Errorf("unable to get or exchange the oauth code to a token: %w", err)
return
}
cancelFunc()
success = true
})
}
}
// TODO: either support only one port as in New, or support multiple
// ports as we do below
getPortsFn := t.config.Config.GetOAuthListenPorts
if getPortsFn == nil {
return fmt.Errorf("the function GetOAuthListenPorts is not set")
}
for _, listenPort := range getPortsFn() {
startHandlerForPort(listenPort)
}
wg.Add(1)
observability.Go(ctx, func(ctx context.Context) {
defer wg.Done()
t := time.NewTicker(time.Second)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
}
ports := getPortsFn()
logger.Tracef(ctx, "oauth listener ports: %#+v", ports)
for _, listenPort := range ports {
startHandlerForPort(listenPort)
}
}
})
observability.Go(ctx, func(ctx context.Context) {
wg.Wait()
close(errCh)
})
<-ctx.Done()
logger.Debugf(ctx, "did successfully took a new client code? -- %v", success)
if !success {
errWg.Wait()
return resultErr
}
return nil
)
}
func (t *Twitch) getNewTokenByUser(
ctx context.Context,
) error {
logger.Debugf(ctx, "getNewTokenByUser")
defer func() { logger.Debugf(ctx, "/getNewTokenByUser") }()
if t.config.Config.ClientCode.Get() == "" {
err := t.getNewClientCode(ctx)
if err != nil {
@@ -675,29 +572,17 @@ func (t *Twitch) getNewTokenByUser(
}
}
if t.config.Config.ClientCode.Get() == "" {
return fmt.Errorf("internal error: ClientCode is empty")
accessToken, refreshToken, err := auth.NewTokenByUser(ctx, t.client, t.config.Config.ClientCode)
if err != nil {
return fmt.Errorf("unable to get an access token: %w", err)
}
logger.Debugf(ctx, "requesting user access token...")
resp, err := t.client.RequestUserAccessToken(t.config.Config.ClientCode.Get())
logger.Debugf(ctx, "requesting user access token result: %#+v %v", resp, err)
if err != nil {
return fmt.Errorf("unable to get user access token: %w", err)
}
if resp.ErrorStatus != 0 {
return fmt.Errorf(
"unable to query: %d %v: %v",
resp.ErrorStatus,
resp.Error,
resp.ErrorMessage,
)
}
t.client.SetUserAccessToken(resp.Data.AccessToken)
t.client.SetRefreshToken(resp.Data.RefreshToken)
logger.Debugf(ctx, "setting the user access token")
t.client.SetUserAccessToken(accessToken.Get())
t.client.SetRefreshToken(refreshToken.Get())
t.config.Config.ClientCode.Set("")
t.config.Config.UserAccessToken.Set(resp.Data.AccessToken)
t.config.Config.RefreshToken.Set(resp.Data.RefreshToken)
t.config.Config.UserAccessToken = accessToken
t.config.Config.RefreshToken = refreshToken
err = t.saveCfgFn(t.config)
errmon.ObserveErrorCtx(ctx, err)
return nil
@@ -706,24 +591,13 @@ func (t *Twitch) getNewTokenByUser(
func (t *Twitch) getNewTokenByApp(
ctx context.Context,
) error {
logger.Debugf(ctx, "getNewTokenByApp")
defer func() { logger.Debugf(ctx, "/getNewTokenByApp") }()
resp, err := t.client.RequestAppAccessToken(nil)
accessToken, err := auth.NewTokenByApp(ctx, t.client)
if err != nil {
return fmt.Errorf("unable to get app access token: %w", err)
}
if resp.ErrorStatus != 0 {
return fmt.Errorf(
"unable to get app access token (the response contains an error): %d %v: %v",
resp.ErrorStatus,
resp.Error,
resp.ErrorMessage,
)
return err
}
logger.Debugf(ctx, "setting the app access token")
t.client.SetAppAccessToken(resp.Data.AccessToken)
t.config.Config.AppAccessToken.Set(resp.Data.AccessToken)
t.client.SetAppAccessToken(accessToken.Get())
t.config.Config.AppAccessToken = accessToken
err = t.saveCfgFn(t.config)
errmon.ObserveErrorCtx(ctx, err)
return nil
@@ -739,7 +613,7 @@ func (t *Twitch) getClient(
options := &helix.Options{
ClientID: t.clientID,
ClientSecret: t.clientSecret.Get(),
RedirectURI: authRedirectURI(oauthListenPort), // TODO: delete this hardcode
RedirectURI: auth.RedirectURI(oauthListenPort), // TODO: delete this hardcode
}
client, err := helix.NewClientWithContext(ctx, options)
if err != nil {
@@ -748,31 +622,6 @@ func (t *Twitch) getClient(
return client, nil
}
func GetAuthorizationURL(
params *helix.AuthorizationURLParams,
clientID string,
redirectURI string,
) string {
url := helix.AuthBaseURL + "/authorize"
url += "?response_type=" + params.ResponseType
url += "&client_id=" + clientID
url += "&redirect_uri=" + redirectURI
if params.State != "" {
url += "&state=" + params.State
}
if params.ForceVerify {
url += "&force_verify=true"
}
if len(params.Scopes) != 0 {
url += "&scope=" + strings.Join(params.Scopes, "%20")
}
return url
}
func (t *Twitch) GetAllCategories(
ctx context.Context,
) ([]helix.Game, error) {
@@ -837,22 +686,100 @@ func (t *Twitch) GetChatMessagesChan(
logger.Debugf(ctx, "GetChatMessagesChan")
defer func() { logger.Debugf(ctx, "/GetChatMessagesChan") }()
if err := t.prepare(ctx); err != nil {
logger.Errorf(ctx, "unable to prepare the client: %v", err)
}
outCh := make(chan streamcontrol.ChatMessage)
recentMsgIDs := ringbuffer.New[streamcontrol.ChatMessageID](10)
sendEvent := func(ev streamcontrol.ChatMessage) {
recentMsgIDs.Add(ev.MessageID)
select {
case outCh <- ev:
default:
logger.Warnf(ctx, "the queue is full, dropping message %#+v", ev)
}
}
alreadySeen := func(msgID streamcontrol.ChatMessageID) bool {
return recentMsgIDs.Contains(msgID)
}
observability.Go(ctx, func(ctx context.Context) {
defer func() {
logger.Debugf(ctx, "closing the messages channel")
close(outCh)
}()
var (
chSub <-chan streamcontrol.ChatMessage
chIRC <-chan streamcontrol.ChatMessage
)
t.prepareLocker.Do(ctx, func() {
if t.chatHandlerSub != nil {
chSub = t.chatHandlerSub.MessagesChan()
}
if t.chatHandlerIRC != nil {
chIRC = t.chatHandlerIRC.MessagesChan()
}
})
logger.Debugf(ctx, "chSub == %p; chIRC == %p", chSub, chIRC)
for {
if chSub == nil {
t.prepareLocker.Do(ctx, func() {
if t.chatHandlerSub != nil {
chSub = t.chatHandlerSub.MessagesChan()
}
})
}
if chSub == nil && chIRC == nil {
logger.Debugf(ctx, "both channels are closed")
return
}
select {
case <-ctx.Done():
return
case ev, ok := <-t.chatHandler.MessagesChan():
case ev, ok := <-chSub:
if !ok {
logger.Debugf(ctx, "the input channel is closed")
return
chSub = nil
logger.Debugf(ctx, "the API receiver channel closed")
continue
}
outCh <- ev
logger.Tracef(ctx, "received a message from API: %#+v", ev)
if alreadySeen(ev.MessageID) {
logger.Tracef(ctx, "already seen message %s", ev.MessageID)
continue
}
sendEvent(ev)
case evIRC, ok := <-chIRC:
if !ok {
chIRC = nil
logger.Debugf(ctx, "the IRC receiver channel closed")
continue
}
logger.Tracef(ctx, "received a message from IRC: %#+v", evIRC)
if alreadySeen(evIRC.MessageID) {
logger.Tracef(ctx, "already seen message %s", evIRC.MessageID)
continue
}
// not previously seen message:
select {
case evSub, ok := <-chSub:
if !ok {
chSub = nil
break
}
logger.Tracef(ctx, "received a message from API: %#+v", evIRC)
sendEvent(evSub)
if alreadySeen(evIRC.MessageID) {
logger.Tracef(ctx, "the same message")
continue
}
case <-time.After(time.Second):
logger.Warnf(ctx, "received a message from IRC, but not from API")
}
sendEvent(evIRC)
}
}
})

View File

@@ -1,17 +1,15 @@
package twitch
import (
"context"
"github.com/xaionaro-go/streamctl/pkg/buildvars"
"github.com/xaionaro-go/streamctl/pkg/oauthhandler"
"github.com/xaionaro-go/streamctl/pkg/secret"
streamctl "github.com/xaionaro-go/streamctl/pkg/streamcontrol"
"github.com/xaionaro-go/streamctl/pkg/streamcontrol/twitch/auth"
)
const ID = streamctl.PlatformName("twitch")
type OAuthHandler func(context.Context, oauthhandler.OAuthHandlerArgument) error
type OAuthHandler = auth.OAuthHandler
type PlatformSpecificConfig struct {
Channel string

View File

@@ -2,83 +2,43 @@ package youtube
import (
"context"
"encoding/json"
"errors"
"fmt"
"math/rand"
"net/http"
"net/url"
"sync"
"time"
ytchat "github.com/abhinavxd/youtube-live-chat-downloader/v2"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/xaionaro-go/observability"
"github.com/xaionaro-go/streamctl/pkg/streamcontrol"
"google.golang.org/api/googleapi"
"google.golang.org/api/youtube/v3"
)
const youtubeWatchURLString = `https://www.youtube.com/watch`
func chatCustomCookies() []*http.Cookie {
// borrowed from: https://github.com/abhinavxd/youtube-live-chat-downloader/blob/main/example/main.go
return []*http.Cookie{
{Name: "PREF",
Value: "tz=Europe.Rome",
MaxAge: 300},
{Name: "CONSENT",
Value: fmt.Sprintf("YES+yt.432048971.it+FX+%d", 100+rand.Intn(999-100+1)),
MaxAge: 300},
}
}
var youtubeWatchURL *url.URL
func init() {
var err error
youtubeWatchURL, err = url.Parse(youtubeWatchURLString)
if err != nil {
panic(err)
}
ytchat.AddCookies(chatCustomCookies())
}
func ytWatchURL(videoID string) *url.URL {
result := ptr(*youtubeWatchURL)
query := result.Query()
query.Add("v", videoID)
result.RawQuery = query.Encode()
return result
}
type ChatListener struct {
videoID string
continuationCode string
clientConfig ytchat.YtCfg
liveChatID string
client YouTubeChatClient
wg sync.WaitGroup
cancelFunc context.CancelFunc
messagesOutChan chan streamcontrol.ChatMessage
}
type YouTubeChatClient interface {
GetLiveChatMessages(ctx context.Context, chatID string, pageToken string, parts []string) (*youtube.LiveChatMessageListResponse, error)
}
func NewChatListener(
ctx context.Context,
ytClient YouTubeChatClient,
videoID string,
liveChatID string,
) (*ChatListener, error) {
if videoID == "" {
return nil, fmt.Errorf("video ID is empty")
}
watchURL := ytWatchURL(videoID)
continuationCode, cfg, err := ytchat.ParseInitialData(watchURL.String())
if err != nil {
return nil, fmt.Errorf("unable to fetch the initial data for chat messages retrieval (URL: %s): %w", watchURL, err)
}
ctx, cancelFunc := context.WithCancel(ctx)
l := &ChatListener{
videoID: videoID,
continuationCode: continuationCode,
clientConfig: cfg,
liveChatID: liveChatID,
client: ytClient,
cancelFunc: cancelFunc,
messagesOutChan: make(chan streamcontrol.ChatMessage, 100),
}
@@ -90,52 +50,81 @@ func NewChatListener(
close(l.messagesOutChan)
}()
err := l.listenLoop(ctx)
if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, ytchat.ErrLiveStreamOver) {
if err != nil && !errors.Is(err, context.Canceled) && !errors.As(err, &ErrChatEnded{}) {
logger.Errorf(ctx, "the listener loop returned an error: %v", err)
}
})
return l, nil
}
const chatFetchRetryInterval = time.Second
func (l *ChatListener) listenLoop(ctx context.Context) (_err error) {
logger.Debugf(ctx, "listenLoop")
defer func() { logger.Debugf(ctx, "/listenLoop: %v", _err) }()
var pageToken string
for {
msgs, newContinuation, err := ytchat.FetchContinuationChat(l.continuationCode, l.clientConfig)
switch err {
case nil:
case ytchat.ErrLiveStreamOver:
return err
select {
case <-ctx.Done():
return ctx.Err()
default:
logger.Errorf(
}
response, err := l.client.GetLiveChatMessages(
ctx,
"unable to get a continuation for %v: %v; retrying in %v",
l.videoID,
chatFetchRetryInterval,
err,
l.liveChatID,
pageToken,
[]string{"snippet", "authorDetails"},
)
time.Sleep(chatFetchRetryInterval)
if err != nil {
gErr := &googleapi.Error{}
if !errors.As(err, &gErr) {
logger.Warnf(ctx, "unable to get chat messages: %v", err)
continue
}
for _, e := range gErr.Errors {
switch e.Reason {
case "liveChatEnded":
return ErrChatEnded{ChatID: l.liveChatID}
case "liveChatDisabled":
return ErrChatDisabled{ChatID: l.liveChatID}
case "liveChatNotFound":
return ErrChatNotFound{ChatID: l.liveChatID}
}
}
b, _ := json.Marshal(err)
logger.Warnf(ctx, "unable to get chat messages: %v (%s)", err, b)
continue
}
l.continuationCode = newContinuation
for _, msg := range msgs {
l.messagesOutChan <- streamcontrol.ChatMessage{
CreatedAt: msg.Timestamp,
UserID: streamcontrol.ChatUserID(msg.AuthorName),
Username: msg.AuthorName,
// TODO: find a way to extract the message ID,
// in the mean while we we use a soft key for that:
MessageID: streamcontrol.ChatMessageID(fmt.Sprintf("%s/%s", msg.AuthorName, msg.Message)),
Message: msg.Message,
for _, item := range response.Items {
publishedAt, err := ParseTimestamp(item.Snippet.PublishedAt)
if err != nil {
logger.Errorf(ctx, "unable to parse the timestamp '%s': %v", item.Snippet.PublishedAt, err)
}
msg := streamcontrol.ChatMessage{
CreatedAt: publishedAt,
UserID: streamcontrol.ChatUserID(item.AuthorDetails.ChannelId),
Username: item.AuthorDetails.DisplayName,
MessageID: streamcontrol.ChatMessageID(item.Id),
Message: item.Snippet.DisplayMessage,
}
if item.Snippet.SuperChatDetails != nil {
msg.Paid.Currency = streamcontrol.CurrencyOther
msg.Paid.Amount = float64(item.Snippet.SuperChatDetails.AmountMicros) / 1000000
}
select {
case l.messagesOutChan <- msg:
default:
logger.Errorf(ctx, "the queue is full, have to drop %#+v", msg)
}
}
pageToken = response.NextPageToken
time.Sleep(time.Millisecond * time.Duration(response.PollingIntervalMillis))
}
}
func (h *ChatListener) Close() error {
func (h *ChatListener) Close(ctx context.Context) error {
h.cancelFunc()
return nil
}
@@ -143,3 +132,7 @@ func (h *ChatListener) Close() error {
func (h *ChatListener) MessagesChan() <-chan streamcontrol.ChatMessage {
return h.messagesOutChan
}
func (h *ChatListener) GetVideoID() string {
return h.videoID
}

View File

@@ -0,0 +1,158 @@
package youtube
import (
"context"
"errors"
"fmt"
"math/rand"
"net/http"
"net/url"
"sync"
"time"
ytchat "github.com/abhinavxd/youtube-live-chat-downloader/v2"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/xaionaro-go/observability"
"github.com/xaionaro-go/streamctl/pkg/streamcontrol"
)
const youtubeWatchURLString = `https://www.youtube.com/watch`
func chatCustomCookies() []*http.Cookie {
// borrowed from: https://github.com/abhinavxd/youtube-live-chat-downloader/blob/main/example/main.go
return []*http.Cookie{
{Name: "PREF",
Value: "tz=Europe.Rome",
MaxAge: 300},
{Name: "CONSENT",
Value: fmt.Sprintf("YES+yt.432048971.it+FX+%d", 100+rand.Intn(999-100+1)),
MaxAge: 300},
}
}
var youtubeWatchURL *url.URL
func init() {
var err error
youtubeWatchURL, err = url.Parse(youtubeWatchURLString)
if err != nil {
panic(err)
}
ytchat.AddCookies(chatCustomCookies())
}
func ytWatchURL(videoID string) *url.URL {
result := ptr(*youtubeWatchURL)
query := result.Query()
query.Add("v", videoID)
result.RawQuery = query.Encode()
return result
}
type ChatListenerOBSOLETE struct {
videoID string
continuationCode string
clientConfig ytchat.YtCfg
wg sync.WaitGroup
cancelFunc context.CancelFunc
messagesOutChan chan streamcontrol.ChatMessage
}
func NewChatListenerOBSOLETE(
ctx context.Context,
videoID string,
onClose func(context.Context, *chatListener),
) (*ChatListenerOBSOLETE, error) {
if videoID == "" {
return nil, fmt.Errorf("video ID is empty")
}
watchURL := ytWatchURL(videoID)
continuationCode, cfg, err := ytchat.ParseInitialData(watchURL.String())
if err != nil {
return nil, fmt.Errorf("unable to fetch the initial data for chat messages retrieval (URL: %s): %w", watchURL, err)
}
ctx, cancelFunc := context.WithCancel(ctx)
l := &ChatListenerOBSOLETE{
videoID: videoID,
continuationCode: continuationCode,
clientConfig: cfg,
cancelFunc: cancelFunc,
messagesOutChan: make(chan streamcontrol.ChatMessage, 100),
}
l.wg.Add(1)
observability.Go(ctx, func(ctx context.Context) {
defer l.wg.Done()
if onClose != nil {
defer onClose(ctx, l)
}
defer func() {
logger.Debugf(ctx, "the listener loop is finished")
close(l.messagesOutChan)
}()
err := l.listenLoop(ctx)
if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, ytchat.ErrLiveStreamOver) {
logger.Errorf(ctx, "the listener loop returned an error: %v", err)
}
})
return l, nil
}
const chatFetchRetryInterval = time.Second
func (l *ChatListenerOBSOLETE) listenLoop(ctx context.Context) (_err error) {
logger.Debugf(ctx, "listenLoop")
defer func() { logger.Debugf(ctx, "/listenLoop: %v", _err) }()
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
msgs, newContinuation, err := ytchat.FetchContinuationChat(l.continuationCode, l.clientConfig)
switch err {
case nil:
case ytchat.ErrLiveStreamOver:
return err
default:
logger.Errorf(
ctx,
"unable to get a continuation for %v: %v; retrying in %v",
l.videoID,
chatFetchRetryInterval,
err,
)
time.Sleep(chatFetchRetryInterval)
continue
}
l.continuationCode = newContinuation
for _, msg := range msgs {
l.messagesOutChan <- streamcontrol.ChatMessage{
CreatedAt: msg.Timestamp,
UserID: streamcontrol.ChatUserID(msg.AuthorName),
Username: msg.AuthorName,
// TODO: find a way to extract the message ID,
// in the mean while we we use a soft key for that:
MessageID: streamcontrol.ChatMessageID(fmt.Sprintf("%s/%s", msg.AuthorName, msg.Message)),
Message: msg.Message,
}
}
}
}
func (h *ChatListenerOBSOLETE) Close(ctx context.Context) error {
h.cancelFunc()
return nil
}
func (h *ChatListenerOBSOLETE) MessagesChan() <-chan streamcontrol.ChatMessage {
return h.messagesOutChan
}
func (h *ChatListenerOBSOLETE) GetVideoID() string {
return h.videoID
}

View File

@@ -0,0 +1,29 @@
package youtube
import (
"fmt"
)
type ErrChatNotFound struct {
ChatID string
}
func (e ErrChatNotFound) Error() string {
return fmt.Sprintf("chat '%s' not found", e.ChatID)
}
type ErrChatDisabled struct {
ChatID string
}
func (e ErrChatDisabled) Error() string {
return fmt.Sprintf("chat '%s' is disabled", e.ChatID)
}
type ErrChatEnded struct {
ChatID string
}
func (e ErrChatEnded) Error() string {
return fmt.Sprintf("chat '%s' ended", e.ChatID)
}

View File

@@ -48,6 +48,8 @@ type YouTube struct {
currentLiveBroadcastsLocker xsync.Mutex
currentLiveBroadcasts []*youtube.LiveBroadcast
chatListeners map[string]*chatListener
messagesOutChan chan streamcontrol.ChatMessage
}
@@ -58,11 +60,14 @@ const (
debugUseMockClient = false
)
type chatListener = ChatListenerOBSOLETE
func New(
ctx context.Context,
cfg Config,
saveCfgFn func(Config) error,
) (*YouTube, error) {
ctx = belt.WithField(ctx, "controller", ID)
if cfg.Config.ClientID == "" || cfg.Config.ClientSecret.Get() == "" {
return nil, fmt.Errorf(
"'clientid' or/and 'clientsecret' is/are not set; go to https://console.cloud.google.com/apis/credentials and create an app if it not created, yet",
@@ -76,6 +81,8 @@ func New(
SaveConfigFunc: saveCfgFn,
CancelFunc: cancelFn,
chatListeners: map[string]*chatListener{},
messagesOutChan: make(chan streamcontrol.ChatMessage, 100),
}
@@ -955,7 +962,7 @@ func (yt *YouTube) StartStream(
}
}
yt.currentLiveBroadcasts = append(yt.currentLiveBroadcasts, newBroadcast)
err = yt.startChatListener(ctx, newBroadcast.Id)
err = yt.startChatListener(ctx, newBroadcast)
if err != nil {
logger.Errorf(ctx, "unable to start a chat listener for video '%s': %v", newBroadcast.Id, err)
}
@@ -979,21 +986,38 @@ func setProfile(broadcast *youtube.LiveBroadcast, profile StreamProfile) {
func (yt *YouTube) startChatListener(
ctx context.Context,
videoID string,
broadcast *youtube.LiveBroadcast,
) (_err error) {
videoID := broadcast.Id
chatID := broadcast.Snippet.LiveChatId
ctx = belt.WithField(ctx, "video_id", videoID)
ctx = belt.WithField(ctx, "chat_id", chatID)
ctx = xcontext.DetachDone(ctx)
logger.Debugf(ctx, "startChatListener(ctx, '%s')", videoID)
defer func() { logger.Debugf(ctx, "/startChatListener(ctx, '%s'): %v", videoID, _err) }()
logger.Debugf(ctx, "startChatListener(ctx, '%s':'%s')", videoID, chatID)
defer func() { logger.Debugf(ctx, "/startChatListener(ctx, '%s':'%s'): %v", videoID, chatID, _err) }()
chatListener, err := NewChatListener(ctx, videoID)
_chatListener, err := NewChatListenerOBSOLETE(ctx, videoID, func(
ctx context.Context,
_chatListener *chatListener,
) {
yt.deleteChatListener(ctx, _chatListener)
})
if err != nil {
return fmt.Errorf("unable to initialize the chat listener instance: %w", err)
}
oldListener := xsync.DoR1(ctx, &yt.locker, func() *chatListener {
oldListener := yt.chatListeners[broadcast.Id]
yt.chatListeners[broadcast.Id] = _chatListener
return oldListener
})
if err := oldListener.Close(ctx); err != nil {
logger.Debugf(ctx, "unable to close the old chat listener: %v", err)
}
observability.Go(ctx, func(ctx context.Context) {
err := yt.processChatListener(ctx, chatListener)
err := yt.processChatListener(ctx, _chatListener)
if err != nil && !errors.Is(err, context.Canceled) {
logger.Errorf(ctx, "unable to process the chat listener for '%s': %v", videoID, err)
}
@@ -1001,18 +1025,45 @@ func (yt *YouTube) startChatListener(
return nil
}
func (yt *YouTube) deleteChatListenerByBroadcast(
ctx context.Context,
broadcast *youtube.LiveBroadcast,
) error {
chatListener := yt.getChatListener(ctx, broadcast)
if chatListener == nil {
return nil
}
return yt.deleteChatListener(ctx, chatListener)
}
func (yt *YouTube) deleteChatListener(
ctx context.Context,
chatListener *chatListener,
) error {
err := chatListener.Close(ctx)
if err != nil {
logger.Warnf(ctx, "unable to close the chat listener for %s: %v", chatListener.GetVideoID(), err)
}
yt.locker.Do(ctx, func() {
if yt.chatListeners[chatListener.GetVideoID()] == chatListener {
delete(yt.chatListeners, chatListener.GetVideoID())
}
})
return nil
}
func (yt *YouTube) processChatListener(
ctx context.Context,
chatListener *ChatListener,
chatListener *chatListener,
) (_err error) {
defer func() {
err := chatListener.Close()
err := yt.deleteChatListener(ctx, chatListener)
if err != nil {
logger.Errorf(ctx, "unable to close the chat listener for '%s': %v", chatListener.videoID, err)
logger.Errorf(ctx, "unable to delete the chat listener for '%s': %v", chatListener.GetVideoID(), err)
}
}()
defer func() {
logger.Debugf(ctx, "stopped listening for chat messages in '%s': %v", chatListener.videoID, _err)
logger.Debugf(ctx, "stopped listening for chat messages in '%s': %v", chatListener.GetVideoID(), _err)
}()
inChan := chatListener.MessagesChan()
for {
@@ -1031,6 +1082,15 @@ func (yt *YouTube) processChatListener(
}
}
func (yt *YouTube) getChatListener(
ctx context.Context,
broadcast *youtube.LiveBroadcast,
) *chatListener {
return xsync.DoR1(ctx, &yt.locker, func() *chatListener {
return yt.chatListeners[broadcast.Id]
})
}
func (yt *YouTube) EndStream(
ctx context.Context,
) error {
@@ -1043,6 +1103,9 @@ func (yt *YouTube) EndStream(
})
return yt.updateActiveBroadcasts(ctx, func(broadcast *youtube.LiveBroadcast) error {
if err := yt.deleteChatListenerByBroadcast(ctx, broadcast); err != nil {
logger.Warnf(ctx, "unable to delete the chat listener for %s: %v", broadcast.Id, err)
}
broadcast.ContentDetails.EnableAutoStop = true
broadcast.ContentDetails.MonitorStream.ForceSendFields = []string{"BroadcastStreamDelayMs"}
if _, ok := expectedVideoIDs[broadcast.Id]; !ok {
@@ -1055,6 +1118,18 @@ func (yt *YouTube) EndStream(
const timeLayout = "2006-01-02T15:04:05-0700"
const timeLayoutFallback = time.RFC3339
func ParseTimestamp(s string) (time.Time, error) {
ts, err0 := time.Parse(timeLayout, s)
if err0 == nil {
return ts, nil
}
ts, err1 := time.Parse(timeLayoutFallback, s)
if err1 == nil {
return ts, nil
}
return time.Now(), errors.Join(err0, err1)
}
func (yt *YouTube) GetStreamStatus(
ctx context.Context,
) (_ret *streamcontrol.StreamStatus, _err error) {
@@ -1074,18 +1149,9 @@ func (yt *YouTube) GetStreamStatus(
var requestStatsVideoIDs []string
err := yt.IterateActiveBroadcasts(ctx, func(broadcast *youtube.LiveBroadcast) error {
ts := broadcast.Snippet.ActualStartTime
_startedAt, err := time.Parse(timeLayout, ts)
_startedAt, err := ParseTimestamp(ts)
if err != nil {
_startedAt, err = time.Parse(timeLayoutFallback, ts)
if err != nil {
return fmt.Errorf(
"unable to parse '%s' with layouts '%s' and '%s': %w",
ts,
timeLayout,
timeLayoutFallback,
err,
)
}
return fmt.Errorf("unable to parse '%s': %w", ts, err)
}
startedAt = &_startedAt
if broadcast.Statistics != nil {
@@ -1124,7 +1190,7 @@ func (yt *YouTube) GetStreamStatus(
if _, ok := ids[newBroadcast.Id]; ok {
continue
}
err = yt.startChatListener(ctx, newBroadcast.Id)
err = yt.startChatListener(ctx, newBroadcast)
if err != nil {
logger.Errorf(ctx, "unable to start a chat listener for video '%s': %v", newBroadcast.Id, err)
}

View File

@@ -37,6 +37,7 @@ func (t BroadcastType) String() string {
}
type YouTubeClient interface {
YouTubeChatClient
Ping(context.Context) error
GetBroadcasts(ctx context.Context, t BroadcastType, ids []string, parts []string, pageToken string) (*youtube.LiveBroadcastListResponse, error)
UpdateBroadcast(context.Context, *youtube.LiveBroadcast, []string) error
@@ -309,3 +310,18 @@ func (c *YouTubeClientV3) DeleteChatMessage(
do := c.Service.LiveChatMessages.Delete(messageID).Context(ctx).Do
return wrapRequest(ctx, c.RequestWrapper, do)
}
func (c *YouTubeClientV3) GetLiveChatMessages(
ctx context.Context,
chatID string,
pageToken string,
parts []string,
) (_ret *youtube.LiveChatMessageListResponse, _err error) {
logger.Tracef(ctx, "GetLiveChatMessages")
defer func() { logger.Tracef(ctx, "/GetLiveChatMessages: %v", _err) }()
q := c.Service.LiveChatMessages.List(chatID, parts).Context(ctx)
if pageToken != "" {
q = q.PageToken(pageToken)
}
return q.Do()
}

View File

@@ -208,3 +208,13 @@ func (c *YouTubeClientCalcPoints) DeleteChatMessage(
defer func() { c.addUsedPointsIfNoError(ctx, 1, _err) }()
return c.Client.DeleteChatMessage(ctx, messageID)
}
func (c *YouTubeClientCalcPoints) GetLiveChatMessages(
ctx context.Context,
chatID string,
pageToken string,
parts []string,
) (_ret *youtube.LiveChatMessageListResponse, _err error) {
defer func() { c.addUsedPointsIfNoError(ctx, 1, _err) }()
return c.Client.GetLiveChatMessages(ctx, chatID, pageToken, parts)
}

View File

@@ -177,3 +177,14 @@ func (c *YouTubeClientMock) DeleteChatMessage(
defer func() { logger.Tracef(ctx, "/DeleteChatMessage: %v", _err) }()
return fmt.Errorf("not implemented")
}
func (c *YouTubeClientMock) GetLiveChatMessages(
ctx context.Context,
chatID string,
pageToken string,
parts []string,
) (_ret *youtube.LiveChatMessageListResponse, _err error) {
logger.Tracef(ctx, "GetLiveChatMessages")
defer func() { logger.Tracef(ctx, "/GetLiveChatMessages: %v", _err) }()
return nil, fmt.Errorf("not implemented")
}

View File

@@ -0,0 +1,25 @@
package youtube
import (
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestParseTimestamp(t *testing.T) {
type testCase struct {
input string
output time.Time
}
for _, testCase := range []testCase{
{input: "2025-07-09T11:49:53Z", output: time.Date(2025, 7, 9, 11, 49, 53, 0, time.UTC)},
} {
t.Run(testCase.input, func(t *testing.T) {
r, err := ParseTimestamp(testCase.input)
require.NoError(t, err)
require.Equal(t, testCase.output, r.UTC())
})
}
}

View File

@@ -55,7 +55,7 @@ func (d *StreamD) startListeningForChatMessages(
if err := d.ChatMessagesStorage.AddMessage(ctx, msg); err != nil {
logger.Errorf(ctx, "unable to add the message %#+v to the chat messages storage: %v", msg, err)
}
d.publishEvent(ctx, msg)
publishEvent(ctx, d.EventBus, msg)
}
}
})
@@ -113,7 +113,7 @@ func (d *StreamD) SubscribeToChatMessages(
defer func() { logger.Tracef(ctx, "/SubscribeToChatMessages(ctx, %v, %v): %p %v", since, limit, _ret, _err) }()
return eventSubToChan(
ctx, d,
ctx, d.EventBus, 1000,
func(ctx context.Context, outCh chan api.ChatMessage) {
logger.Tracef(ctx, "backfilling the channel")
defer func() { logger.Tracef(ctx, "/backfilling the channel") }()

View File

@@ -4,6 +4,7 @@ import (
"bytes"
"context"
"crypto"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
@@ -42,7 +43,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
)
@@ -163,7 +164,9 @@ func (c *Client) connect(
) (*grpc.ClientConn, error) {
opts := []grpc.DialOption{
grpc.WithTransportCredentials(
insecure.NewCredentials(),
credentials.NewTLS(&tls.Config{
InsecureSkipVerify: true,
}),
),
grpc.WithConnectParams(grpc.ConnectParams{
Backoff: backoff.Config{

View File

@@ -4,11 +4,11 @@ import (
"context"
"encoding/json"
"fmt"
"sync"
"time"
"github.com/davecgh/go-spew/spew"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/xaionaro-go/eventbus"
"github.com/xaionaro-go/observability"
"github.com/xaionaro-go/streamctl/pkg/expression"
"github.com/xaionaro-go/streamctl/pkg/streamd/api"
@@ -58,6 +58,19 @@ func (d *StreamD) submitEvent(
return nil
}
func publishEvent[E any](
ctx context.Context,
bus *eventbus.EventBus,
event E,
) {
logger.Debugf(ctx, "publishEvent[%T](ctx, %#+v)", event, event)
defer logger.Debugf(ctx, "/publishEvent[%T](ctx, %#+v)", event, event)
result := eventbus.SendEvent(ctx, bus, event)
if result.DropCountImmediate != 0 || result.DropCountDeferred != 0 {
logger.Warnf(ctx, "unable to deliver the event to some of the subscriptions: %d + %d", result.DropCountImmediate, result.DropCountDeferred)
}
}
func (d *StreamD) doAction(
ctx context.Context,
a action.Action,
@@ -92,118 +105,95 @@ func (d *StreamD) doAction(
func eventSubToChan[T any](
ctx context.Context,
d *StreamD,
eventBus *eventbus.EventBus,
queueSize uint,
onReady func(ctx context.Context, outCh chan T),
) (<-chan T, error) {
var sample T
logger.Debugf(ctx, "eventSubToChan[%T]", sample)
defer func() { logger.Debugf(ctx, "/eventSubToChan[%T]", sample) }()
topic := eventTopic(sample)
return eventSubToChanUsingTopic(ctx, d, onReady, topic)
var topic T
logger.Debugf(ctx, "eventSubToChan[%T]", topic)
defer func() { logger.Debugf(ctx, "/eventSubToChan[%T]", topic) }()
return eventSubToChanUsingTopic(ctx, eventBus, queueSize, onReady, topic)
}
func eventSubToChanUsingTopic[T any](
func eventSubToChanUsingTopic[T, E any](
ctx context.Context,
d *StreamD,
onReady func(ctx context.Context, outCh chan T),
topic string,
) (<-chan T, error) {
var mutex sync.Mutex
r := make(chan T)
callback := func(in T) {
mutex.Lock()
defer mutex.Unlock()
logger.Tracef(ctx, "eventSubToChanUsingTopic(%T): received %#+v", topic, in)
eventBus *eventbus.EventBus,
queueSize uint,
onReady func(ctx context.Context, outCh chan E),
topic T,
) (<-chan E, error) {
var sample E
logger.Debugf(ctx, "eventSubToChanUsingTopic[%T, %T]", topic, sample)
defer func() { logger.Debugf(ctx, "/eventSubToChanUsingTopic[%T, %T]", topic, sample) }()
select {
case <-ctx.Done():
return
default:
opts := eventbus.Options{
eventbus.OptionQueueSize(1),
eventbus.OptionOnOverflow(eventbus.OnOverflowPileUpOrClose(queueSize, 10*time.Second)),
eventbus.OptionOnUnsubscribe[E](func(_ context.Context, sub *eventbus.Subscription[E]) {
logger.Debugf(ctx, "eventSubToChanUsingTopic[%T, %T]: unsubscribed", topic, sample)
}),
}
select {
case r <- in:
case <-time.After(time.Minute):
logger.Errorf(ctx, "unable to notify about '%s': timeout", topic)
}
}
if onReady != nil {
observability.Go(ctx, func(ctx context.Context) {
mutex.Lock()
defer mutex.Unlock()
err := d.EventBus.SubscribeAsync(topic, callback, true)
if err != nil {
logger.Errorf(ctx, "unable to subscribe: %v", err)
return
}
onReady(ctx, r)
})
} else {
err := d.EventBus.SubscribeAsync(topic, callback, true)
if err != nil {
return nil, fmt.Errorf("unable to subscribe: %w", err)
}
opts = append(opts,
eventbus.OptionOnSubscribed[E](func(
ctx context.Context,
sub *eventbus.Subscription[E],
) {
onReady(ctx, sub.EventChan())
}),
)
}
observability.Go(ctx, func(ctx context.Context) {
<-ctx.Done()
d.EventBus.Unsubscribe(topic, callback)
d.EventBus.WaitAsync()
close(r)
})
return r, nil
sub := eventbus.SubscribeWithCustomTopic[T, E](ctx, eventBus, topic, opts...)
return sub.EventChan(), nil
}
func (d *StreamD) SubscribeToDashboardChanges(
ctx context.Context,
) (<-chan api.DiffDashboard, error) {
return eventSubToChan[api.DiffDashboard](ctx, d, nil)
return eventSubToChan[api.DiffDashboard](ctx, d.EventBus, 1000, nil)
}
func (d *StreamD) SubscribeToConfigChanges(
ctx context.Context,
) (<-chan api.DiffConfig, error) {
return eventSubToChan[api.DiffConfig](ctx, d, nil)
return eventSubToChan[api.DiffConfig](ctx, d.EventBus, 1000, nil)
}
func (d *StreamD) SubscribeToStreamsChanges(
ctx context.Context,
) (<-chan api.DiffStreams, error) {
return eventSubToChan[api.DiffStreams](ctx, d, nil)
return eventSubToChan[api.DiffStreams](ctx, d.EventBus, 1000, nil)
}
func (d *StreamD) SubscribeToStreamServersChanges(
ctx context.Context,
) (<-chan api.DiffStreamServers, error) {
return eventSubToChan[api.DiffStreamServers](ctx, d, nil)
return eventSubToChan[api.DiffStreamServers](ctx, d.EventBus, 1000, nil)
}
func (d *StreamD) SubscribeToStreamDestinationsChanges(
ctx context.Context,
) (<-chan api.DiffStreamDestinations, error) {
return eventSubToChan[api.DiffStreamDestinations](ctx, d, nil)
return eventSubToChan[api.DiffStreamDestinations](ctx, d.EventBus, 1000, nil)
}
func (d *StreamD) SubscribeToIncomingStreamsChanges(
ctx context.Context,
) (<-chan api.DiffIncomingStreams, error) {
return eventSubToChan[api.DiffIncomingStreams](ctx, d, nil)
return eventSubToChan[api.DiffIncomingStreams](ctx, d.EventBus, 1000, nil)
}
func (d *StreamD) SubscribeToStreamForwardsChanges(
ctx context.Context,
) (<-chan api.DiffStreamForwards, error) {
return eventSubToChan[api.DiffStreamForwards](ctx, d, nil)
return eventSubToChan[api.DiffStreamForwards](ctx, d.EventBus, 1000, nil)
}
func (d *StreamD) SubscribeToStreamPlayersChanges(
ctx context.Context,
) (<-chan api.DiffStreamPlayers, error) {
return eventSubToChan[api.DiffStreamPlayers](ctx, d, nil)
return eventSubToChan[api.DiffStreamPlayers](ctx, d.EventBus, 1000, nil)
}
func (d *StreamD) notifyStreamPlayerStart(
@@ -213,5 +203,5 @@ func (d *StreamD) notifyStreamPlayerStart(
logger.Debugf(ctx, "notifyStreamPlayerStart")
defer logger.Debugf(ctx, "/notifyStreamPlayerStart")
d.publishEvent(ctx, api.DiffStreamPlayers{})
publishEvent(ctx, d.EventBus, api.DiffStreamPlayers{})
}

View File

@@ -1,31 +0,0 @@
package streamd
import (
"context"
"fmt"
"reflect"
"github.com/facebookincubator/go-belt/tool/logger"
)
type Event interface{}
func eventTopic(
event Event,
) string {
t := reflect.ValueOf(event).Type()
if t.Kind() == reflect.Pointer {
t = t.Elem()
}
return fmt.Sprintf("type:", t.Name())
}
func (d *StreamD) publishEvent(
ctx context.Context,
event Event,
) {
topic := eventTopic(event)
logger.Debugf(ctx, "publishEvent(ctx, %#+v): %s", event, topic)
defer logger.Debugf(ctx, "/publishEvent(ctx, %#+v): %s", event, topic)
d.EventBus.Publish(topic, event)
}

View File

@@ -62,8 +62,10 @@ func (d *StreamD) getImageBytes(
func (d *StreamD) initImageTaker(ctx context.Context) error {
observability.Go(ctx, func(ctx context.Context) {
ctxCh, cancelFn := context.WithCancel(ctx)
defer cancelFn()
defer logger.Debugf(ctx, "/imageTaker")
ch, err := d.SubscribeToDashboardChanges(ctx)
ch, restartCh, err := autoResubscribe(ctxCh, d.SubscribeToDashboardChanges)
if err != nil {
logger.Errorf(ctx, "unable to subscribe to dashboard changes: %v", err)
return
@@ -72,6 +74,8 @@ func (d *StreamD) initImageTaker(ctx context.Context) error {
select {
case <-ctx.Done():
return
case <-restartCh:
d.restartImageTaker(ctx)
case <-ch:
d.restartImageTaker(ctx)
}
@@ -85,7 +89,9 @@ func (d *StreamD) restartImageTaker(ctx context.Context) error {
return xsync.DoA1R1(ctx, &d.imageTakerLocker, d.restartImageTakerNoLock, ctx)
}
func (d *StreamD) restartImageTakerNoLock(ctx context.Context) error {
func (d *StreamD) restartImageTakerNoLock(ctx context.Context) (_err error) {
logger.Debugf(ctx, "restartImageTakerNoLock")
defer func() { logger.Debugf(ctx, "/restartImageTakerNoLock: %v", _err) }()
if d.imageTakerCancel != nil {
d.imageTakerCancel()
d.imageTakerCancel = nil

View File

@@ -14,6 +14,7 @@ import (
"github.com/goccy/go-yaml"
"github.com/google/uuid"
"github.com/hashicorp/go-multierror"
"github.com/xaionaro-go/observability"
"github.com/xaionaro-go/player/pkg/player/protobuf/go/player_grpc"
"github.com/xaionaro-go/streamctl/pkg/streamcontrol"
"github.com/xaionaro-go/streamctl/pkg/streamd"
@@ -1649,15 +1650,30 @@ func wrapChan[T any, E any](
if err != nil {
return err
}
errCh := make(chan error, 1)
for {
var input E
var ok bool
select {
case <-ctx.Done():
return ctx.Err()
case input = <-ch:
case input, ok = <-ch:
}
if !ok {
return fmt.Errorf("channel is closed")
}
result := parse(input)
err := sender.Send(&result)
sendCtx, cancelFn := context.WithTimeout(ctx, time.Minute)
observability.Go(ctx, func(ctx context.Context) {
errCh <- sender.Send(&result)
})
select {
case <-sendCtx.Done():
logger.Warnf(ctx, "sending timed out")
cancelFn()
return sendCtx.Err()
case err := <-errCh:
cancelFn()
if err != nil {
return fmt.Errorf(
"unable to send %#+v: %w",
@@ -1667,6 +1683,7 @@ func wrapChan[T any, E any](
}
}
}
}
func (grpc *GRPCServer) SubscribeToConfigChanges(
req *streamd_grpc.SubscribeToConfigChangesRequest,

View File

@@ -10,6 +10,7 @@ import (
"github.com/andreykaipov/goobs"
"github.com/andreykaipov/goobs/api/events"
"github.com/andreykaipov/goobs/api/events/subscriptions"
"github.com/facebookincubator/go-belt"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/hashicorp/go-multierror"
"github.com/xaionaro-go/object"
@@ -331,6 +332,7 @@ func (d *StreamD) processOBSEvent(
}
func (d *StreamD) initTwitchBackend(ctx context.Context) error {
ctx = belt.WithField(ctx, "controller", twitch.ID)
twitch, err := newTwitch(
ctx,
d.Config.Backends[twitch.ID],
@@ -348,6 +350,7 @@ func (d *StreamD) initTwitchBackend(ctx context.Context) error {
}
func (d *StreamD) initKickBackend(ctx context.Context) error {
ctx = belt.WithField(ctx, "controller", kick.ID)
cacheHashBeforeInit, _ := object.CalcCryptoHash(d.Cache.Kick)
kick, err := newKick(
kick.CtxWithCache(ctx, &d.Cache.Kick),
@@ -373,6 +376,7 @@ func (d *StreamD) initKickBackend(ctx context.Context) error {
}
func (d *StreamD) initYouTubeBackend(ctx context.Context) error {
ctx = belt.WithField(ctx, "controller", youtube.ID)
youTube, err := newYouTube(
ctx,
d.Config.Backends[youtube.ID],

View File

@@ -11,11 +11,11 @@ import (
"sync/atomic"
"time"
eventbus "github.com/asaskevich/EventBus"
"github.com/facebookincubator/go-belt"
"github.com/facebookincubator/go-belt/tool/experimental/errmon"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/hashicorp/go-multierror"
"github.com/xaionaro-go/eventbus"
"github.com/xaionaro-go/observability"
"github.com/xaionaro-go/player/pkg/player"
"github.com/xaionaro-go/streamctl/pkg/chatmessagesstorage"
@@ -92,7 +92,7 @@ type StreamD struct {
StreamStatusCache *memoize.MemoizeData
OBSState OBSState
EventBus eventbus.Bus
EventBus *eventbus.EventBus
TimersLocker xsync.Mutex
NextTimerID uint64
@@ -259,7 +259,7 @@ func (d *StreamD) secretsProviderUpdater(ctx context.Context) (_err error) {
logger.Debugf(ctx, "secretsProviderUpdater")
defer logger.Debugf(ctx, "/secretsProviderUpdater: %v", _err)
cfgChangeCh, err := eventSubToChan[api.DiffConfig](ctx, d, nil)
cfgChangeCh, err := eventSubToChan[api.DiffConfig](ctx, d.EventBus, 1000, nil)
if err != nil {
return fmt.Errorf("unable to subscribe to config changes: %w", err)
}
@@ -302,7 +302,7 @@ func (d *StreamD) initStreamServer(ctx context.Context) (_err error) {
//newBrowserOpenerAdapter(d),
)
assert(d.StreamServer != nil)
defer d.publishEvent(ctx, api.DiffStreamServers{})
defer publishEvent(ctx, d.EventBus, api.DiffStreamServers{})
return d.StreamServer.Init(
ctx,
sstypes.InitOptionDefaultStreamPlayerOptions(d.streamPlayerOptions()),
@@ -566,9 +566,9 @@ func (d *StreamD) setConfig(ctx context.Context, cfg *config.Config) (_ret error
}
if !dashboardCfgEqual {
logger.Debugf(ctx, "dashboard config changed")
d.publishEvent(ctx, api.DiffDashboard{})
publishEvent(ctx, d.EventBus, api.DiffDashboard{})
}
d.publishEvent(ctx, api.DiffConfig{})
publishEvent(ctx, d.EventBus, api.DiffConfig{})
}()
logger.Debugf(ctx, "SetConfig: %#+v", *cfg)
@@ -648,7 +648,7 @@ func (d *StreamD) StartStream(
logger.Debugf(ctx, "StartStream(%s)", platID)
return xsync.RDoR1(ctx, &d.ControllersLocker, func() error {
defer func() { logger.Debugf(ctx, "/StartStream(%s): %v", platID, _err) }()
defer d.publishEvent(ctx, api.DiffStreams{})
defer publishEvent(ctx, d.EventBus, api.DiffStreams{})
defer func() {
d.StreamStatusCache.InvalidateCache(ctx)
@@ -776,7 +776,7 @@ func (d *StreamD) EndStream(ctx context.Context, platID streamcontrol.PlatformNa
logger.Debugf(ctx, "EndStream(ctx, '%s')", platID)
defer logger.Debugf(ctx, "/EndStream(ctx, '%s')", platID)
defer d.publishEvent(ctx, api.DiffStreams{})
defer publishEvent(ctx, d.EventBus, api.DiffStreams{})
return xsync.RDoR1(ctx, &d.ControllersLocker, func() error {
defer d.StreamStatusCache.InvalidateCache(ctx)
@@ -902,6 +902,7 @@ func (d *StreamD) streamController(
ctx context.Context,
platID streamcontrol.PlatformName,
) (streamcontrol.AbstractStreamController, error) {
ctx = belt.WithField(ctx, "controller", platID)
var result streamcontrol.AbstractStreamController
switch platID {
case obs.ID:
@@ -978,7 +979,7 @@ func (d *StreamD) SetTitle(
platID streamcontrol.PlatformName,
title string,
) error {
defer d.publishEvent(ctx, api.DiffStreams{})
defer publishEvent(ctx, d.EventBus, api.DiffStreams{})
return xsync.RDoR1(ctx, &d.ControllersLocker, func() error {
c, err := d.streamController(ctx, platID)
@@ -995,7 +996,7 @@ func (d *StreamD) SetDescription(
platID streamcontrol.PlatformName,
description string,
) error {
defer d.publishEvent(ctx, api.DiffStreams{})
defer publishEvent(ctx, d.EventBus, api.DiffStreams{})
return xsync.RDoR1(ctx, &d.ControllersLocker, func() error {
c, err := d.streamController(ctx, platID)
@@ -1018,7 +1019,7 @@ func (d *StreamD) ApplyProfile(
profile streamcontrol.AbstractStreamProfile,
customArgs ...any,
) error {
defer d.publishEvent(ctx, api.DiffStreams{})
defer publishEvent(ctx, d.EventBus, api.DiffStreams{})
return xsync.RDoR1(ctx, &d.ControllersLocker, func() error {
c, err := d.streamController(d.ctxForController(ctx), platID)
@@ -1037,7 +1038,7 @@ func (d *StreamD) UpdateStream(
profile streamcontrol.AbstractStreamProfile,
customArgs ...any,
) error {
defer d.publishEvent(ctx, api.DiffStreams{})
defer publishEvent(ctx, d.EventBus, api.DiffStreams{})
return xsync.RDoR1(ctx, &d.ControllersLocker, func() error {
err := d.SetTitle(d.ctxForController(ctx), platID, title)
@@ -1160,7 +1161,7 @@ func (d *StreamD) StartStreamServer(
) error {
logger.Debugf(ctx, "StartStreamServer")
defer logger.Debugf(ctx, "/StartStreamServer")
defer d.publishEvent(ctx, api.DiffStreamServers{})
defer publishEvent(ctx, d.EventBus, api.DiffStreamServers{})
return xsync.DoR1(ctx, &d.StreamServerLocker, func() error {
if d.StreamServer == nil {
@@ -1204,7 +1205,7 @@ func (d *StreamD) StopStreamServer(
) error {
logger.Debugf(ctx, "StopStreamServer")
defer logger.Debugf(ctx, "/StopStreamServer")
defer d.publishEvent(ctx, api.DiffStreamServers{})
defer publishEvent(ctx, d.EventBus, api.DiffStreamServers{})
return xsync.DoR1(ctx, &d.StreamServerLocker, func() error {
if d.StreamServer == nil {
@@ -1235,7 +1236,7 @@ func (d *StreamD) AddIncomingStream(
) error {
logger.Debugf(ctx, "AddIncomingStream")
defer logger.Debugf(ctx, "/AddIncomingStream")
defer d.publishEvent(ctx, api.DiffIncomingStreams{})
defer publishEvent(ctx, d.EventBus, api.DiffIncomingStreams{})
return xsync.DoR1(ctx, &d.StreamServerLocker, func() error {
if d.StreamServer == nil {
@@ -1261,7 +1262,7 @@ func (d *StreamD) RemoveIncomingStream(
) error {
logger.Debugf(ctx, "RemoveIncomingStream")
defer logger.Debugf(ctx, "/RemoveIncomingStream")
defer d.publishEvent(ctx, api.DiffIncomingStreams{})
defer publishEvent(ctx, d.EventBus, api.DiffIncomingStreams{})
return xsync.DoR1(ctx, &d.StreamServerLocker, func() error {
if d.StreamServer == nil {
@@ -1357,7 +1358,7 @@ func (d *StreamD) AddStreamDestination(
) error {
logger.Debugf(ctx, "AddStreamDestination")
defer logger.Debugf(ctx, "/AddStreamDestination")
defer d.publishEvent(ctx, api.DiffStreamDestinations{})
defer publishEvent(ctx, d.EventBus, api.DiffStreamDestinations{})
return xsync.DoR1(ctx, &d.StreamServerLocker, func() error {
if d.StreamServer == nil {
@@ -1390,7 +1391,7 @@ func (d *StreamD) UpdateStreamDestination(
) error {
logger.Debugf(ctx, "UpdateStreamDestination")
defer logger.Debugf(ctx, "/UpdateStreamDestination")
defer d.publishEvent(ctx, api.DiffStreamDestinations{})
defer publishEvent(ctx, d.EventBus, api.DiffStreamDestinations{})
return xsync.DoR1(ctx, &d.StreamServerLocker, func() error {
if d.StreamServer == nil {
@@ -1421,7 +1422,7 @@ func (d *StreamD) RemoveStreamDestination(
) error {
logger.Debugf(ctx, "RemoveStreamDestination")
defer logger.Debugf(ctx, "/RemoveStreamDestination")
defer d.publishEvent(ctx, api.DiffStreamDestinations{})
defer publishEvent(ctx, d.EventBus, api.DiffStreamDestinations{})
return xsync.DoR1(ctx, &d.StreamServerLocker, func() error {
if d.StreamServer == nil {
@@ -1483,7 +1484,7 @@ func (d *StreamD) AddStreamForward(
) error {
logger.Debugf(ctx, "AddStreamForward")
defer logger.Debugf(ctx, "/AddStreamForward")
defer d.publishEvent(ctx, api.DiffStreamForwards{})
defer publishEvent(ctx, d.EventBus, api.DiffStreamForwards{})
return xsync.DoR1(ctx, &d.StreamServerLocker, func() error {
if d.StreamServer == nil {
@@ -1520,7 +1521,7 @@ func (d *StreamD) UpdateStreamForward(
) (_err error) {
logger.Debugf(ctx, "UpdateStreamForward")
defer func() { logger.Debugf(ctx, "/UpdateStreamForward: %v", _err) }()
defer d.publishEvent(ctx, api.DiffStreamForwards{})
defer publishEvent(ctx, d.EventBus, api.DiffStreamForwards{})
return xsync.DoR1(ctx, &d.StreamServerLocker, func() error {
if d.StreamServer == nil {
@@ -1554,7 +1555,7 @@ func (d *StreamD) RemoveStreamForward(
) error {
logger.Debugf(ctx, "RemoveStreamForward")
defer logger.Debugf(ctx, "/RemoveStreamForward")
defer d.publishEvent(ctx, api.DiffStreamForwards{})
defer publishEvent(ctx, d.EventBus, api.DiffStreamForwards{})
return xsync.DoR1(ctx, &d.StreamServerLocker, func() error {
if d.StreamServer == nil {
@@ -1619,7 +1620,7 @@ func (d *StreamD) AddStreamPlayer(
if d.StreamServer == nil {
return fmt.Errorf("stream server is not initialized")
}
defer d.publishEvent(ctx, api.DiffStreamPlayers{})
defer publishEvent(ctx, d.EventBus, api.DiffStreamPlayers{})
var result *multierror.Error
result = multierror.Append(result, d.StreamServer.AddStreamPlayer(
ctx,
@@ -1664,7 +1665,7 @@ func (d *StreamD) UpdateStreamPlayer(
if d.StreamServer == nil {
return fmt.Errorf("stream server is not initialized")
}
defer d.publishEvent(ctx, api.DiffStreamPlayers{})
defer publishEvent(ctx, d.EventBus, api.DiffStreamPlayers{})
var result *multierror.Error
result = multierror.Append(result, d.StreamServer.UpdateStreamPlayer(
ctx,
@@ -1687,7 +1688,7 @@ func (d *StreamD) RemoveStreamPlayer(
if d.StreamServer == nil {
return fmt.Errorf("stream server is not initialized")
}
defer d.publishEvent(ctx, api.DiffStreamPlayers{})
defer publishEvent(ctx, d.EventBus, api.DiffStreamPlayers{})
var result *multierror.Error
result = multierror.Append(result, d.StreamServer.RemoveStreamPlayer(
ctx,

61
pkg/streamd/subscribe.go Normal file
View File

@@ -0,0 +1,61 @@
package streamd
import (
"context"
"time"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/xaionaro-go/observability"
)
func autoResubscribe[T any](
ctx context.Context,
fn func(context.Context) (<-chan T, error),
) (<-chan T, <-chan struct{}, error) {
input, err := fn(ctx)
if err != nil {
return nil, nil, err
}
result := make(chan T, 1)
restartCh := make(chan struct{}, 1)
observability.Go(ctx, func(ctx context.Context) {
defer func() {
var sample T
logger.Debugf(ctx, "autoResubscribe[%T] handler is closed", sample)
}()
defer close(result)
defer close(restartCh)
for {
for {
var (
ev T
ok bool
)
select {
case <-ctx.Done():
return
case ev, ok = <-input:
}
if !ok {
logger.Debugf(ctx, "the input channel is closed; reconnect")
break
}
select {
case <-ctx.Done():
return
case result <- ev:
}
}
for {
input, err = fn(ctx)
if err == nil {
break
}
logger.Warnf(ctx, "unable to reconnect: %w")
time.Sleep(time.Second)
}
restartCh <- struct{}{}
}
})
return result, restartCh, nil
}

View File

@@ -6,6 +6,7 @@ import (
"fmt"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/xaionaro-go/eventbus"
"github.com/xaionaro-go/streamctl/pkg/streamd/api"
"github.com/xaionaro-go/streamctl/pkg/streamd/consts"
)
@@ -43,8 +44,10 @@ func (d *StreamD) GetVariableHash(
return hash, nil
}
func topicForVariable(key consts.VarKey) string {
return fmt.Sprintf("var:%s", key)
type subscriptionTopic string
func topicForVariable(key consts.VarKey) subscriptionTopic {
return subscriptionTopic(fmt.Sprintf("var:%s", key))
}
func (d *StreamD) SetVariable(
@@ -55,7 +58,12 @@ func (d *StreamD) SetVariable(
logger.Tracef(ctx, "SetVariable(ctx, '%s', value [len == %d])", key, len(value))
defer logger.Tracef(ctx, "/SetVariable(ctx, '%s', value [len == %d])", key, len(value))
d.Variables.Store(key, value)
d.EventBus.Publish(topicForVariable(key), value)
eventbus.SendEventWithCustomTopic(
ctx,
d.EventBus,
topicForVariable(key),
value,
)
return nil
}
@@ -63,5 +71,10 @@ func (d *StreamD) SubscribeToVariable(
ctx context.Context,
varKey consts.VarKey,
) (<-chan api.VariableValue, error) {
return eventSubToChanUsingTopic[api.VariableValue](ctx, d, nil, topicForVariable(varKey))
return eventSubToChanUsingTopic[subscriptionTopic, api.VariableValue](
ctx,
d.EventBus, 10,
nil,
topicForVariable(varKey),
)
}

View File

@@ -17,8 +17,8 @@ import (
"github.com/xaionaro-go/xsync"
)
const (
ChatLogSize = 20
var (
ChatLogSize = 35
)
type chatUIInterface interface {
@@ -67,7 +67,9 @@ func (p *Panel) getChatUIs(ctx context.Context) []chatUIInterface {
}
func (p *Panel) initChatMessagesHandler(ctx context.Context) error {
msgCh, err := p.StreamD.SubscribeToChatMessages(ctx, time.Now().Add(-7*24*time.Hour), ChatLogSize)
msgCh, _, err := autoResubscribe(ctx, func(ctx context.Context) (<-chan api.ChatMessage, error) {
return p.StreamD.SubscribeToChatMessages(ctx, time.Now().Add(-60*24*time.Hour), uint64(ChatLogSize))
})
if err != nil {
return fmt.Errorf("unable to subscribe to chat messages: %w", err)
}

View File

@@ -0,0 +1,8 @@
//go:build android
// +build android
package streampanel
func init() {
ChatLogSize = 20
}

View File

@@ -127,6 +127,7 @@ func (ui *chatUIAsText) Rebuild(
ui.newItem(ctx, itemIdx, msg)
}
})
ui.Text.Refresh()
if ui.OnAdd != nil {
ui.OnAdd(ctx, api.ChatMessage{})
}
@@ -153,6 +154,10 @@ func (ui *chatUIAsText) Append(
return
}
ui.newItem(ctx, itemIdx, *msg)
ui.Text.Refresh()
if ui.OnAdd != nil {
ui.OnAdd(ctx, *msg)
}
})
}

View File

@@ -221,6 +221,7 @@ func (p *monitorPage) startUpdatingNoLock(
}
observability.Go(ctx, func(ctx context.Context) {
defer logger.Debugf(ctx, "startUpdatingNoLock: the handler closed")
updateData := func() {
inStreams, err := streamD.ListIncomingStreams(ctx)
if err != nil {
@@ -232,12 +233,20 @@ func (p *monitorPage) startUpdatingNoLock(
}
updateData()
ch, err := streamD.SubscribeToIncomingStreamsChanges(ctx)
ch, restartCh, err := autoResubscribe(ctx, streamD.SubscribeToIncomingStreamsChanges)
if err != nil {
p.parent().DisplayError(err)
return
}
for range ch {
for {
var ok bool
select {
case _, ok = <-restartCh:
case _, ok = <-ch:
}
if !ok {
break
}
logger.Debugf(ctx, "got event IncomingStreamsChange")
updateData()
}

View File

@@ -525,7 +525,9 @@ func (p *Panel) startOAuthListenerForRemoteStreamD(
return fmt.Errorf("unable to start listener for OAuth responses: %w", err)
}
oauthURLChan, err := streamD.SubscribeToOAuthURLs(ctx, listenPort)
oauthURLChan, restartOAuthURLChan, err := autoResubscribe(ctx, func(ctx context.Context) (<-chan *streamd_grpc.OAuthRequest, error) {
return streamD.SubscribeToOAuthURLs(ctx, listenPort)
})
if err != nil {
cancelFn()
return fmt.Errorf("unable to subscribe to OAuth requests of streamd: %w", err)
@@ -541,6 +543,8 @@ func (p *Panel) startOAuthListenerForRemoteStreamD(
select {
case <-ctx.Done():
return
case <-restartOAuthURLChan:
continue
case req, ok := <-oauthURLChan:
logger.Debugf(ctx, "<-oauthURLChan")
if !ok {
@@ -1864,14 +1868,14 @@ func (p *Panel) subscribeUpdateControlPage(ctx context.Context) {
logger.Debugf(ctx, "subscribe to streams and config changes")
defer logger.Debugf(ctx, "/subscribe to streams and config changes")
chStreams, err := p.StreamD.SubscribeToStreamsChanges(ctx)
chStreams, restartChStreams, err := autoResubscribe(ctx, p.StreamD.SubscribeToStreamsChanges)
if err != nil {
p.DisplayError(err)
//return
}
// TODO: deduplicate with localConfigCacheUpdater
chConfigs, err := p.StreamD.SubscribeToConfigChanges(ctx)
chConfigs, restartChConfigs, err := autoResubscribe(ctx, p.StreamD.SubscribeToConfigChanges)
if err != nil {
p.DisplayError(err)
//return
@@ -1880,16 +1884,23 @@ func (p *Panel) subscribeUpdateControlPage(ctx context.Context) {
p.getUpdatedStatus(ctx)
observability.Go(ctx, func(ctx context.Context) {
defer logger.Debugf(ctx, "subscribeUpdateControlPage: the handler closed")
t := time.NewTicker(time.Second * 5)
defer t.Stop()
for {
var ok bool
select {
case <-ctx.Done():
return
case <-chStreams:
case <-chConfigs:
case _, ok = <-chStreams:
case _, ok = <-restartChStreams:
case _, ok = <-chConfigs:
case _, ok = <-restartChConfigs:
case <-t.C:
}
if !ok {
return
}
p.getUpdatedStatus(ctx)
}
})
@@ -2404,7 +2415,7 @@ func (p *Panel) localConfigCacheUpdater(ctx context.Context) (_err error) {
logger.Debugf(ctx, "localConfigCacheUpdater")
defer logger.Debugf(ctx, "/localConfigCacheUpdater: %v", _err)
cfgChangeCh, err := p.StreamD.SubscribeToConfigChanges(ctx)
cfgChangeCh, restartCfgChangeCh, err := autoResubscribe(ctx, p.StreamD.SubscribeToConfigChanges)
if err != nil {
return fmt.Errorf("unable to subscribe to config changes: %w", err)
}
@@ -2431,10 +2442,17 @@ func (p *Panel) localConfigCacheUpdater(ctx context.Context) (_err error) {
defer logger.Debugf(ctx, "/localConfigUpdaterLoop")
for {
var ok bool
select {
case <-ctx.Done():
return
case <-cfgChangeCh:
case _, ok = <-cfgChangeCh:
case _, ok = <-restartCfgChangeCh:
}
if !ok {
logger.Errorf(ctx, "the channel is closed")
return
}
newCfg, err := p.StreamD.GetConfig(ctx)
if err != nil {
logger.Errorf(ctx, "unable to get the new config: %v", err)
@@ -2452,7 +2470,6 @@ func (p *Panel) localConfigCacheUpdater(ctx context.Context) (_err error) {
observability.SecretsProviderFromCtx(ctx).(*observability.SecretsStaticProvider).ParseSecretsFrom(newCfg)
logger.Debugf(ctx, "updated the secrets")
}
}
})
return nil

View File

@@ -59,13 +59,26 @@ func (p *Panel) initRestreamPage(
}
updateData()
ch, err := p.StreamD.SubscribeToIncomingStreamsChanges(ctx)
ch, restartCh, err := autoResubscribe(ctx, p.StreamD.SubscribeToIncomingStreamsChanges)
if err != nil {
p.DisplayError(err)
return
}
for range ch {
var ok bool
select {
case _, ok = <-ch:
if ok {
logger.Debugf(ctx, "got event IncomingStreamsChange")
}
case _, ok = <-restartCh:
if ok {
logger.Debugf(ctx, "restarted SubscribeToIncomingStreamsChanges")
}
}
if !ok {
break
}
updateData()
}
})
@@ -81,18 +94,32 @@ func (p *Panel) initRestreamPage(
}
updateData()
ch, err := p.StreamD.SubscribeToStreamServersChanges(ctx)
ch, restartCh, err := autoResubscribe(ctx, p.StreamD.SubscribeToStreamServersChanges)
if err != nil {
p.DisplayError(err)
return
}
for range ch {
for {
var ok bool
select {
case _, ok = <-ch:
if ok {
logger.Debugf(ctx, "got event StreamServersChange")
}
case _, ok = <-restartCh:
if ok {
logger.Debugf(ctx, "restarted SubscribeToStreamServersChanges")
}
}
if !ok {
break
}
updateData()
}
})
observability.Go(ctx, func(ctx context.Context) {
defer logger.Debugf(ctx, "/SubscribeToStreamDestinationsChanges")
updateData := func() {
dsts, err := p.StreamD.ListStreamDestinations(ctx)
if err != nil {
@@ -103,18 +130,32 @@ func (p *Panel) initRestreamPage(
}
updateData()
ch, err := p.StreamD.SubscribeToStreamDestinationsChanges(ctx)
ch, restartCh, err := autoResubscribe(ctx, p.StreamD.SubscribeToStreamDestinationsChanges)
if err != nil {
p.DisplayError(err)
return
}
for range ch {
for {
var ok bool
select {
case _, ok = <-ch:
if ok {
logger.Debugf(ctx, "got event StreamDestinationsChange")
}
case _, ok = <-restartCh:
if ok {
logger.Debugf(ctx, "restarted SubscribeToStreamDestinationsChanges")
}
}
if !ok {
break
}
updateData()
}
})
observability.Go(ctx, func(ctx context.Context) {
defer logger.Debugf(ctx, "/SubscribeToStreamForwardsChanges")
updateData := func() {
streamFwds, err := p.StreamD.ListStreamForwards(ctx)
if err != nil {
@@ -125,18 +166,32 @@ func (p *Panel) initRestreamPage(
}
updateData()
ch, err := p.StreamD.SubscribeToStreamForwardsChanges(ctx)
ch, restartCh, err := autoResubscribe(ctx, p.StreamD.SubscribeToStreamForwardsChanges)
if err != nil {
p.DisplayError(err)
return
}
for range ch {
for {
var ok bool
select {
case _, ok = <-ch:
if ok {
logger.Debugf(ctx, "got event StreamForwardsChange")
}
case _, ok = <-restartCh:
if ok {
logger.Debugf(ctx, "restarted SubscribeToStreamForwardsChanges")
}
}
if !ok {
break
}
updateData()
}
})
observability.Go(ctx, func(ctx context.Context) {
defer logger.Debugf(ctx, "/SubscribeToStreamPlayersChanges")
updateData := func() {
streamPlayers, err := p.StreamD.ListStreamPlayers(ctx)
if err != nil {
@@ -147,13 +202,26 @@ func (p *Panel) initRestreamPage(
}
updateData()
ch, err := p.StreamD.SubscribeToStreamPlayersChanges(ctx)
ch, restartCh, err := autoResubscribe(ctx, p.StreamD.SubscribeToStreamPlayersChanges)
if err != nil {
p.DisplayError(err)
return
}
for range ch {
for {
var ok bool
select {
case _, ok = <-ch:
if ok {
logger.Debugf(ctx, "got event StreamPlayersChange")
}
case _, ok = <-restartCh:
if ok {
logger.Debugf(ctx, "restarted SubscribeToStreamPlayersChanges")
}
}
if !ok {
break
}
updateData()
}
})

View File

@@ -0,0 +1,61 @@
package streampanel
import (
"context"
"time"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/xaionaro-go/observability"
)
func autoResubscribe[T any](
ctx context.Context,
fn func(context.Context) (<-chan T, error),
) (<-chan T, <-chan struct{}, error) {
input, err := fn(ctx)
if err != nil {
return nil, nil, err
}
result := make(chan T, 1)
restartCh := make(chan struct{}, 1)
observability.Go(ctx, func(ctx context.Context) {
defer func() {
var sample T
logger.Debugf(ctx, "autoResubscribe[%T] handler is closed", sample)
}()
defer close(result)
defer close(restartCh)
for {
for {
var (
ev T
ok bool
)
select {
case <-ctx.Done():
return
case ev, ok = <-input:
}
if !ok {
logger.Debugf(ctx, "the input channel is closed; reconnect")
break
}
select {
case <-ctx.Done():
return
case result <- ev:
}
}
for {
input, err = fn(ctx)
if err == nil {
break
}
logger.Warnf(ctx, "unable to reconnect: %w")
time.Sleep(time.Second)
}
restartCh <- struct{}{}
}
})
return result, restartCh, nil
}