From 1004082fe4caebfa1bf7f907d41bbf1b8d056eed Mon Sep 17 00:00:00 2001 From: Dmitrii Okunev Date: Sat, 12 Jul 2025 22:21:28 +0100 Subject: [PATCH] Multiple updates --- cmd/streamd/main.go | 17 +- cmd/streampanel/FyneApp.toml | 2 +- cmd/streampanel/runtime.go | 3 + cmd/streampanel/streamd.go | 14 +- go.mod | 6 +- go.sum | 8 +- pkg/cert/generate_self_signed.go | 44 +++ pkg/chatmessagesstorage/load.go | 1 + pkg/ringbuffer/ring_buffer.go | 39 ++ pkg/streamcontrol/kick/kick.go | 2 + pkg/streamcontrol/stream_control.go | 14 + .../twitch/auth/access_token_app.go | 34 ++ .../twitch/auth/access_token_user.go | 43 +++ pkg/streamcontrol/twitch/auth/client_code.go | 181 +++++++++ pkg/streamcontrol/twitch/chat_client.go | 25 -- pkg/streamcontrol/twitch/chat_client_irc.go | 31 ++ pkg/streamcontrol/twitch/chat_handler.go | 105 +----- pkg/streamcontrol/twitch/chat_handler_irc.go | 126 +++++++ ...ndler_test.go => chat_handler_irc_test.go} | 23 +- pkg/streamcontrol/twitch/chat_handler_sub.go | 305 +++++++++++++++ .../twitch/cmd/chatlistener/main.go | 83 +++- pkg/streamcontrol/twitch/twitch.go | 353 +++++++----------- pkg/streamcontrol/twitch/types/config.go | 6 +- pkg/streamcontrol/youtube/chat_listener.go | 169 ++++----- .../youtube/chat_listener_obsolete.go | 158 ++++++++ pkg/streamcontrol/youtube/error.go | 29 ++ pkg/streamcontrol/youtube/youtube.go | 110 ++++-- pkg/streamcontrol/youtube/youtube_client.go | 16 + .../youtube/youtube_client_calc_points.go | 12 +- .../youtube/youtube_client_mock.go | 11 + pkg/streamcontrol/youtube/youtube_test.go | 25 ++ pkg/streamd/chat.go | 4 +- pkg/streamd/client/client.go | 7 +- pkg/streamd/events.go | 118 +++--- pkg/streamd/events_reflect.go | 31 -- pkg/streamd/image_taker.go | 10 +- pkg/streamd/server/grpc.go | 33 +- pkg/streamd/stream_controller.go | 4 + pkg/streamd/streamd.go | 51 +-- pkg/streamd/subscribe.go | 61 +++ pkg/streamd/variables.go | 21 +- pkg/streampanel/chat.go | 8 +- pkg/streampanel/chat_android.go | 8 + pkg/streampanel/chat_as_text.go | 5 + pkg/streampanel/monitor.go | 13 +- pkg/streampanel/panel.go | 63 ++-- pkg/streampanel/restream.go | 96 ++++- pkg/streampanel/subscribe.go | 61 +++ 48 files changed, 1930 insertions(+), 659 deletions(-) create mode 100644 pkg/cert/generate_self_signed.go create mode 100644 pkg/ringbuffer/ring_buffer.go create mode 100644 pkg/streamcontrol/twitch/auth/access_token_app.go create mode 100644 pkg/streamcontrol/twitch/auth/access_token_user.go create mode 100644 pkg/streamcontrol/twitch/auth/client_code.go delete mode 100644 pkg/streamcontrol/twitch/chat_client.go create mode 100644 pkg/streamcontrol/twitch/chat_client_irc.go create mode 100644 pkg/streamcontrol/twitch/chat_handler_irc.go rename pkg/streamcontrol/twitch/{chat_handler_test.go => chat_handler_irc_test.go} (75%) create mode 100644 pkg/streamcontrol/twitch/chat_handler_sub.go create mode 100644 pkg/streamcontrol/youtube/chat_listener_obsolete.go create mode 100644 pkg/streamcontrol/youtube/error.go create mode 100644 pkg/streamcontrol/youtube/youtube_test.go delete mode 100644 pkg/streamd/events_reflect.go create mode 100644 pkg/streamd/subscribe.go create mode 100644 pkg/streampanel/chat_android.go create mode 100644 pkg/streampanel/subscribe.go diff --git a/cmd/streamd/main.go b/cmd/streamd/main.go index 20254a5..0db2d23 100644 --- a/cmd/streamd/main.go +++ b/cmd/streamd/main.go @@ -2,8 +2,8 @@ package main import ( "context" + "crypto/tls" "log" - "net" "net/http" _ "net/http/pprof" "os" @@ -19,11 +19,13 @@ import ( "github.com/getsentry/sentry-go" grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery" "github.com/spf13/pflag" + "github.com/xaionaro-go/eventbus" "github.com/xaionaro-go/grpcproxy/grpcproxyserver" "github.com/xaionaro-go/grpcproxy/protobuf/go/proxy_grpc" "github.com/xaionaro-go/obs-grpc-proxy/protobuf/go/obs_grpc" "github.com/xaionaro-go/observability" "github.com/xaionaro-go/streamctl/cmd/streamd/ui" + "github.com/xaionaro-go/streamctl/pkg/cert" "github.com/xaionaro-go/streamctl/pkg/streamcontrol" "github.com/xaionaro-go/streamctl/pkg/streamd" "github.com/xaionaro-go/streamctl/pkg/streamd/config" @@ -127,6 +129,8 @@ func main() { } defer belt.Flush(ctx) + eventbus.LoggingEnabled = true + configPathExpanded, err := xpath.Expand(*configPath) if err != nil { l.Fatalf("unable to get the path to the data file: %v", err) @@ -181,7 +185,16 @@ func main() { } }) - listener, err := net.Listen("tcp", *listenAddr) + cert, err := cert.GenerateSelfSignedForServer() + if err != nil { + logger.Panicf(ctx, "unable to generate the certificate: %v", err) + } + + listener, err := tls.Listen("tcp", *listenAddr, &tls.Config{ + Certificates: []tls.Certificate{cert}, + NextProtos: []string{"h2"}, + }) + //listener, err := net.Listen("tcp", *listenAddr) if err != nil { log.Fatalf("failed to listen: %v", err) } diff --git a/cmd/streampanel/FyneApp.toml b/cmd/streampanel/FyneApp.toml index 8774f7e..d9c47e1 100755 --- a/cmd/streampanel/FyneApp.toml +++ b/cmd/streampanel/FyneApp.toml @@ -5,4 +5,4 @@ Website = "https://github.com/xaionaro/streamctl" Name = "streampanel" ID = "center.dx.streampanel" Version = "0.1.0" - Build = 432 + Build = 433 diff --git a/cmd/streampanel/runtime.go b/cmd/streampanel/runtime.go index 0017de3..223c3a4 100644 --- a/cmd/streampanel/runtime.go +++ b/cmd/streampanel/runtime.go @@ -16,6 +16,7 @@ import ( "github.com/facebookincubator/go-belt" "github.com/facebookincubator/go-belt/tool/logger" "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/xaionaro-go/eventbus" "github.com/xaionaro-go/observability" ) @@ -115,6 +116,8 @@ func initRuntime( seppukuIfMemHugeLeak(ctx) + eventbus.LoggingEnabled = true + ctx, cancelFn := context.WithCancel(ctx) return ctx, func() { defer belt.Flush(ctx) diff --git a/cmd/streampanel/streamd.go b/cmd/streampanel/streamd.go index cc25f4c..b7533cf 100644 --- a/cmd/streampanel/streamd.go +++ b/cmd/streampanel/streamd.go @@ -3,6 +3,7 @@ package main import ( "bytes" "context" + "crypto/tls" "encoding/gob" "fmt" "net" @@ -20,6 +21,7 @@ import ( "github.com/xaionaro-go/obs-grpc-proxy/protobuf/go/obs_grpc" "github.com/xaionaro-go/observability" "github.com/xaionaro-go/streamctl/cmd/streamd/ui" + "github.com/xaionaro-go/streamctl/pkg/cert" "github.com/xaionaro-go/streamctl/pkg/mainprocess" "github.com/xaionaro-go/streamctl/pkg/streamcontrol" "github.com/xaionaro-go/streamctl/pkg/streamd" @@ -257,7 +259,17 @@ func initGRPCServers( ) (net.Listener, *grpc.Server, *server.GRPCServer, obs_grpc.OBSServer, proxy_grpc.NetworkProxyServer) { logger.Debugf(ctx, "initGRPCServers") defer logger.Debugf(ctx, "/initGRPCServers") - listener, err := net.Listen("tcp", listenAddr) + + cert, err := cert.GenerateSelfSignedForServer() + if err != nil { + logger.Panicf(ctx, "unable to generate the certificate: %v", err) + } + + logger.Debugf(ctx, "generated certificate %#+v", cert) + listener, err := tls.Listen("tcp", listenAddr, &tls.Config{ + Certificates: []tls.Certificate{cert}, + NextProtos: []string{"h2"}, + }) if err != nil { logger.Panicf(ctx, "failed to listen: %v", err) } diff --git a/go.mod b/go.mod index 1be6146..efeeefc 100755 --- a/go.mod +++ b/go.mod @@ -27,8 +27,6 @@ replace github.com/asticode/go-astiav v0.36.0 => github.com/xaionaro-go/astiav v replace github.com/bluenviron/mediacommon/v2 v2.0.1-0.20250324151931-b8ce69d15d3d => github.com/xaionaro-go/mediacommon/v2 v2.0.0-20250420012906-03d6d69ac3b7 -replace github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef => github.com/mysteriumnetwork/EventBus v0.0.0-20220414214953-84469ec2b111 - require ( github.com/facebookincubator/go-belt v0.0.0-20250308011339-62fb7027b11f github.com/go-git/go-billy/v5 v5.6.2 @@ -36,6 +34,7 @@ require ( github.com/hashicorp/go-multierror v1.1.1 github.com/nicklaw5/helix/v2 v2.30.1-0.20240715193454-0151ccccf980 github.com/spf13/cobra v1.8.1 + github.com/xaionaro-go/eventbus v0.0.0-20250712221024-f0986ef769fa github.com/xaionaro-go/logrustash v0.0.0-20240804141650-d48034780a5f // indirect golang.org/x/oauth2 v0.30.0 google.golang.org/api v0.239.0 @@ -256,7 +255,6 @@ require ( github.com/adeithe/go-twitch v0.3.1 github.com/andreykaipov/goobs v1.4.1 github.com/anthonynsimon/bild v0.14.0 - github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef github.com/asticode/go-astiav v0.36.0 github.com/bamiaux/rez v0.0.0-20170731184118-29f4463c688b github.com/bluenviron/gortsplib/v4 v4.12.4-0.20250324174248-61372cfa6800 @@ -326,6 +324,8 @@ require ( require ( github.com/BurntSushi/xgbutil v0.0.0-20190907113008-ad855c713046 github.com/cloudwego/eino-ext/components/model/openai v0.0.0-20250424061409-ccd60fbc7c1c + github.com/coder/websocket v1.8.13 + github.com/joeyak/go-twitch-eventsub/v3 v3.0.0 github.com/phuslu/goid v1.0.2 // indirect github.com/pion/datachannel v1.5.10 // indirect github.com/pion/dtls/v2 v2.2.12 // indirect diff --git a/go.sum b/go.sum index 95eb250..6020a1e 100755 --- a/go.sum +++ b/go.sum @@ -216,6 +216,8 @@ github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWH github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ= +github.com/coder/websocket v1.8.13 h1:f3QZdXy7uGVz+4uCJy2nTZyM0yTBj8yANEHhqlXZ9FE= +github.com/coder/websocket v1.8.13/go.mod h1:LNVeNrXQZfe5qhS9ALED3uA+l5pPqvwXg3CKoDBB2gs= github.com/coreos/go-oidc/v3 v3.11.0 h1:Ia3MxdwpSw702YW0xgfmP1GVCMA9aEFWu12XUZ3/OtI= github.com/coreos/go-oidc/v3 v3.11.0/go.mod h1:gE3LgjOgFoHi9a4ce4/tJczr0Ai2/BoDhf0r5lltWI0= github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= @@ -613,6 +615,8 @@ github.com/jfreymuth/oggvorbis v1.0.5 h1:u+Ck+R0eLSRhgq8WTmffYnrVtSztJcYrl588DM4 github.com/jfreymuth/oggvorbis v1.0.5/go.mod h1:1U4pqWmghcoVsCJJ4fRBKv9peUJMBHixthRlBeD6uII= github.com/jfreymuth/vorbis v1.0.2 h1:m1xH6+ZI4thH927pgKD8JOH4eaGRm18rEE9/0WKjvNE= github.com/jfreymuth/vorbis v1.0.2/go.mod h1:DoftRo4AznKnShRl1GxiTFCseHr4zR9BN3TWXyuzrqQ= +github.com/joeyak/go-twitch-eventsub/v3 v3.0.0 h1:6BDgmYJynNDyCP7P+wM9jPQnE3leJAi58nohDnzliJ4= +github.com/joeyak/go-twitch-eventsub/v3 v3.0.0/go.mod h1:rpqOjYP1ftWDj3H4D8fA58AdOpkvK9YvODoduDpPCQU= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= @@ -741,8 +745,6 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= -github.com/mysteriumnetwork/EventBus v0.0.0-20220414214953-84469ec2b111 h1:7s+VqlctjdVjy1z0slV2giUawTnv1A6vWj9oKKfgPhI= -github.com/mysteriumnetwork/EventBus v0.0.0-20220414214953-84469ec2b111/go.mod h1:ef8wV5ITJhXSTG1sUkcHPAQF7lh83c7l875IvrYU7H0= github.com/neelance/astrewrite v0.0.0-20160511093645-99348263ae86/go.mod h1:kHJEU3ofeGjhHklVoIGuVj85JJwZ6kWPaJwCIxgnFmo= github.com/neelance/sourcemap v0.0.0-20151028013722-8c68805598ab/go.mod h1:Qr6/a/Q4r9LP1IltGz7tA7iOK1WonHEYhu1HRBA7ZiM= github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646 h1:zYyBkD/k9seD2A7fsi6Oo2LfFZAehjjQMERAvZLEDnQ= @@ -1088,6 +1090,8 @@ github.com/xaionaro-go/avpipeline v0.0.0-20250525204026-17104bc4baca h1:Cls4rEim github.com/xaionaro-go/avpipeline v0.0.0-20250525204026-17104bc4baca/go.mod h1:LMh5Qi7cuntcktUezfA9toVCUCCsx9pjyGDWe9GLt9A= github.com/xaionaro-go/datacounter v1.0.4 h1:+QMZLmu73R5WGkQfUPwlXF/JFN+Weo4iuDZkiL2wVm8= github.com/xaionaro-go/datacounter v1.0.4/go.mod h1:Sf9vBevuV6w5iE6K3qJ9pWVKcyS60clWBUSQLjt5++c= +github.com/xaionaro-go/eventbus v0.0.0-20250712221024-f0986ef769fa h1:p4rzmAuKfYTgoHp4yL3qIL3Js4P3I91yhe654L4gnFo= +github.com/xaionaro-go/eventbus v0.0.0-20250712221024-f0986ef769fa/go.mod h1:zSbWHZpDvsRhjD3Sr3bruqqsWotjXvsIKmx6/THwXFw= github.com/xaionaro-go/fyne/v2 v2.0.0-20250622004601-3a26ee69528a h1:awMQXlaweeiSZB4rSNfMmJGJriyn1ca/m/lglBi9uyA= github.com/xaionaro-go/fyne/v2 v2.0.0-20250622004601-3a26ee69528a/go.mod h1:0GOXKqyvNwk3DLmsFu9v0oYM0ZcD1ysGnlHCerKoAmo= github.com/xaionaro-go/go-rtmp v0.0.0-20241009130244-1e3160f27f42 h1:izCjREd+62HDF9FRYqUI7dgJNdUxAIysEuqed8lBcDY= diff --git a/pkg/cert/generate_self_signed.go b/pkg/cert/generate_self_signed.go new file mode 100644 index 0000000..9ee0a80 --- /dev/null +++ b/pkg/cert/generate_self_signed.go @@ -0,0 +1,44 @@ +package cert + +import ( + "crypto/ed25519" + "crypto/rand" + "crypto/tls" + "crypto/x509" + "crypto/x509/pkix" + "encoding/pem" + "math/big" + "time" +) + +func GenerateSelfSignedForServer() (tls.Certificate, error) { + pub, priv, err := ed25519.GenerateKey(rand.Reader) + if err != nil { + return tls.Certificate{}, err + } + + tmpl := x509.Certificate{ + SerialNumber: big.NewInt(1), + Subject: pkix.Name{ + Organization: []string{"DX.center"}, + }, + NotBefore: time.Now(), + NotAfter: time.Now().Add(10 * 365 * 24 * time.Hour), + + KeyUsage: x509.KeyUsageDigitalSignature, + ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth}, + DNSNames: []string{"wingout.dx.center"}, + } + + certDER, err := x509.CreateCertificate(rand.Reader, &tmpl, &tmpl, pub, priv) + if err != nil { + return tls.Certificate{}, err + } + + keyBytes, err := x509.MarshalPKCS8PrivateKey(priv) + + certPEM := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: certDER}) + keyPEM := pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: keyBytes}) + + return tls.X509KeyPair(certPEM, keyPEM) +} diff --git a/pkg/chatmessagesstorage/load.go b/pkg/chatmessagesstorage/load.go index 8c680d2..7c460ae 100644 --- a/pkg/chatmessagesstorage/load.go +++ b/pkg/chatmessagesstorage/load.go @@ -33,6 +33,7 @@ func (s *ChatMessagesStorage) loadLocked(ctx context.Context) (_err error) { if err != nil { return fmt.Errorf("unable to parse file '%s': %w", s.FilePath, err) } + logger.Debugf(ctx, "loaded %d messages", len(s.Messages)) s.sortAndDeduplicateAndTruncate(ctx) return nil } diff --git a/pkg/ringbuffer/ring_buffer.go b/pkg/ringbuffer/ring_buffer.go new file mode 100644 index 0000000..c774f31 --- /dev/null +++ b/pkg/ringbuffer/ring_buffer.go @@ -0,0 +1,39 @@ +package ringbuffer + +import ( + "context" + "slices" + + "github.com/xaionaro-go/xsync" +) + +type RingBuffer[T comparable] struct { + Storage []T + CurrentWriteIndex uint + Locker xsync.Mutex +} + +func New[T comparable](size uint) *RingBuffer[T] { + return &RingBuffer[T]{ + Storage: make([]T, 0, size), + } +} + +func (r *RingBuffer[T]) Add(item T) { + r.Locker.Do(context.TODO(), func() { + if r.CurrentWriteIndex >= uint(len(r.Storage)) { + r.Storage = r.Storage[:len(r.Storage)+1] + } + r.Storage[r.CurrentWriteIndex] = item + r.CurrentWriteIndex++ + if r.CurrentWriteIndex >= uint(cap(r.Storage)) { + r.CurrentWriteIndex = 0 + } + }) +} + +func (r *RingBuffer[T]) Contains(item T) bool { + return xsync.DoR1(context.TODO(), &r.Locker, func() bool { + return slices.Contains(r.Storage, item) + }) +} diff --git a/pkg/streamcontrol/kick/kick.go b/pkg/streamcontrol/kick/kick.go index 5ad3dff..90b6a2a 100644 --- a/pkg/streamcontrol/kick/kick.go +++ b/pkg/streamcontrol/kick/kick.go @@ -11,6 +11,7 @@ import ( http "github.com/Danny-Dasilva/fhttp" "github.com/davecgh/go-spew/spew" + "github.com/facebookincubator/go-belt" "github.com/facebookincubator/go-belt/tool/logger" "github.com/google/uuid" "github.com/scorfly/gokick" @@ -64,6 +65,7 @@ func New( cfg Config, saveCfgFn func(Config) error, ) (*Kick, error) { + ctx = belt.WithField(ctx, "controller", ID) if cfg.Config.Channel == "" { return nil, fmt.Errorf("channel is not set") } diff --git a/pkg/streamcontrol/stream_control.go b/pkg/streamcontrol/stream_control.go index 08876b7..bb2faae 100644 --- a/pkg/streamcontrol/stream_control.go +++ b/pkg/streamcontrol/stream_control.go @@ -115,6 +115,20 @@ type ChatMessage struct { Username string MessageID ChatMessageID Message string + Paid Money +} + +type Currency int + +const ( + CurrencyNone = Currency(iota) + CurrencyUSD + CurrencyOther +) + +type Money struct { + Currency Currency + Amount float64 } type StreamControllerCommons interface { diff --git a/pkg/streamcontrol/twitch/auth/access_token_app.go b/pkg/streamcontrol/twitch/auth/access_token_app.go new file mode 100644 index 0000000..278ff16 --- /dev/null +++ b/pkg/streamcontrol/twitch/auth/access_token_app.go @@ -0,0 +1,34 @@ +package auth + +import ( + "context" + "fmt" + + "github.com/facebookincubator/go-belt/tool/logger" + "github.com/nicklaw5/helix/v2" + "github.com/xaionaro-go/streamctl/pkg/secret" +) + +func NewTokenByApp( + ctx context.Context, + client *helix.Client, +) (secret.String, error) { + logger.Debugf(ctx, "getNewTokenByApp") + defer func() { logger.Debugf(ctx, "/getNewTokenByApp") }() + + resp, err := client.RequestAppAccessToken(nil) + if err != nil { + return secret.New(""), fmt.Errorf("unable to get app access token: %w", err) + } + + if resp.ErrorStatus != 0 { + return secret.New(""), fmt.Errorf( + "unable to get app access token (the response contains an error): %d %v: %v", + resp.ErrorStatus, + resp.Error, + resp.ErrorMessage, + ) + } + + return secret.New(resp.Data.AccessToken), nil +} diff --git a/pkg/streamcontrol/twitch/auth/access_token_user.go b/pkg/streamcontrol/twitch/auth/access_token_user.go new file mode 100644 index 0000000..6c9bed6 --- /dev/null +++ b/pkg/streamcontrol/twitch/auth/access_token_user.go @@ -0,0 +1,43 @@ +package auth + +import ( + "context" + "fmt" + + "github.com/facebookincubator/go-belt/tool/logger" + "github.com/nicklaw5/helix/v2" + "github.com/xaionaro-go/observability" + "github.com/xaionaro-go/streamctl/pkg/secret" +) + +func NewTokenByUser( + ctx context.Context, + client *helix.Client, + clientCode secret.String, +) (secret.String, secret.String, error) { + logger.Debugf(ctx, "getNewTokenByUser") + defer func() { logger.Debugf(ctx, "/getNewTokenByUser") }() + + if clientCode.Get() == "" { + return secret.New(""), secret.New(""), fmt.Errorf("internal error: ClientCode is empty") + } + + logger.Debugf(ctx, "requesting user access token...") + resp, err := client.RequestUserAccessToken(clientCode.Get()) + if observability.IsOnInsecureDebug(ctx) { + logger.Debugf(ctx, "requesting user access token result: %#+v %v", resp, err) + } + if err != nil { + return secret.New(""), secret.New(""), fmt.Errorf("unable to get user access token: %w", err) + } + if resp.ErrorStatus != 0 { + return secret.New(""), secret.New(""), fmt.Errorf( + "unable to query: %d %v: %v", + resp.ErrorStatus, + resp.Error, + resp.ErrorMessage, + ) + } + + return secret.New(resp.Data.AccessToken), secret.New(resp.Data.RefreshToken), nil +} diff --git a/pkg/streamcontrol/twitch/auth/client_code.go b/pkg/streamcontrol/twitch/auth/client_code.go new file mode 100644 index 0000000..5f44fb8 --- /dev/null +++ b/pkg/streamcontrol/twitch/auth/client_code.go @@ -0,0 +1,181 @@ +package auth + +import ( + "context" + "fmt" + "strings" + "sync" + "time" + + "github.com/facebookincubator/go-belt/tool/experimental/errmon" + "github.com/facebookincubator/go-belt/tool/logger" + "github.com/hashicorp/go-multierror" + "github.com/nicklaw5/helix/v2" + "github.com/xaionaro-go/observability" + "github.com/xaionaro-go/streamctl/pkg/oauthhandler" +) + +type OAuthHandler func(context.Context, oauthhandler.OAuthHandlerArgument) error + +func NewClientCode( + ctx context.Context, + clientID string, + oauthHandler OAuthHandler, + getOAuthListenPortsFn func() []uint16, + onNewClientCode func(string), +) (_err error) { + logger.Debugf(ctx, "getNewClientCode") + defer func() { logger.Debugf(ctx, "/getNewClientCode: %v", _err) }() + + if oauthHandler == nil { + oauthHandler = oauthhandler.OAuth2HandlerViaCLI + } + + ctx, ctxCancelFunc := context.WithCancel(ctx) + cancelFunc := func() { + logger.Debugf(ctx, "cancelling the context") + ctxCancelFunc() + } + + var errWg sync.WaitGroup + var resultErr error + errCh := make(chan error) + errWg.Add(1) + observability.Go(ctx, func(ctx context.Context) { + errWg.Done() + for err := range errCh { + errmon.ObserveErrorCtx(ctx, err) + resultErr = multierror.Append(resultErr, err) + } + }) + + alreadyListening := map[uint16]struct{}{} + var wg sync.WaitGroup + success := false + + startHandlerForPort := func(listenPort uint16) { + if _, ok := alreadyListening[listenPort]; ok { + return + } + alreadyListening[listenPort] = struct{}{} + + logger.Debugf(ctx, "starting the oauth handler at port %d", listenPort) + wg.Add(1) + { + listenPort := listenPort + observability.Go(ctx, func(ctx context.Context) { + defer func() { logger.Debugf(ctx, "ended the oauth handler at port %d", listenPort) }() + defer wg.Done() + authURL := GetAuthorizationURL( + &helix.AuthorizationURLParams{ + ResponseType: "code", // or "token" + Scopes: []string{ + "user:read:chat", + "chat:read", + "chat:edit", + "channel:manage:broadcast", + "moderator:manage:chat_messages", + "moderator:manage:banned_users", + }, + }, + clientID, + RedirectURI(listenPort), + ) + + arg := oauthhandler.OAuthHandlerArgument{ + AuthURL: authURL, + ListenPort: listenPort, + ExchangeFn: func(ctx context.Context, code string) (_err error) { + logger.Debugf(ctx, "ExchangeFn()") + defer func() { logger.Debugf(ctx, "/ExchangeFn(): %v", _err) }() + if code == "" { + return fmt.Errorf("code is empty") + } + onNewClientCode(code) + return nil + }, + } + + err := oauthHandler(ctx, arg) + if err != nil { + errCh <- fmt.Errorf("unable to get or exchange the oauth code to a token: %w", err) + return + } + cancelFunc() + success = true + }) + } + } + + // TODO: either support only one port as in New, or support multiple + // ports as we do below + getPortsFn := getOAuthListenPortsFn + if getPortsFn == nil { + return fmt.Errorf("the function GetOAuthListenPorts is not set") + } + + for _, listenPort := range getPortsFn() { + startHandlerForPort(listenPort) + } + + wg.Add(1) + observability.Go(ctx, func(ctx context.Context) { + defer wg.Done() + t := time.NewTicker(time.Second) + defer t.Stop() + for { + select { + case <-ctx.Done(): + return + case <-t.C: + } + ports := getPortsFn() + logger.Tracef(ctx, "oauth listener ports: %#+v", ports) + + for _, listenPort := range ports { + startHandlerForPort(listenPort) + } + } + }) + + observability.Go(ctx, func(ctx context.Context) { + wg.Wait() + close(errCh) + }) + <-ctx.Done() + logger.Debugf(ctx, "did successfully took a new client code? -- %v", success) + if !success { + errWg.Wait() + return resultErr + } + return nil +} + +func RedirectURI(listenPort uint16) string { + return fmt.Sprintf("http://localhost:%d/", listenPort) +} + +func GetAuthorizationURL( + params *helix.AuthorizationURLParams, + clientID string, + redirectURI string, +) string { + url := helix.AuthBaseURL + "/authorize" + url += "?response_type=" + params.ResponseType + url += "&client_id=" + clientID + url += "&redirect_uri=" + redirectURI + + if params.State != "" { + url += "&state=" + params.State + } + + if params.ForceVerify { + url += "&force_verify=true" + } + + if len(params.Scopes) != 0 { + url += "&scope=" + strings.Join(params.Scopes, "%20") + } + + return url +} diff --git a/pkg/streamcontrol/twitch/chat_client.go b/pkg/streamcontrol/twitch/chat_client.go deleted file mode 100644 index 5145016..0000000 --- a/pkg/streamcontrol/twitch/chat_client.go +++ /dev/null @@ -1,25 +0,0 @@ -package twitch - -import ( - "github.com/adeithe/go-twitch" - "github.com/adeithe/go-twitch/irc" -) - -type chatClientImpl struct { - *irc.Client -} - -var _ ChatClient = (*chatClientImpl)(nil) - -func (c *chatClientImpl) Join(channelIDs ...string) error { - return c.Client.Join(channelIDs...) -} -func (c *chatClientImpl) OnShardMessage(callback func(shard int, msg irc.ChatMessage)) { - c.Client.OnShardMessage(callback) -} - -func newChatClient() *chatClientImpl { - return &chatClientImpl{ - Client: twitch.IRC(), - } -} diff --git a/pkg/streamcontrol/twitch/chat_client_irc.go b/pkg/streamcontrol/twitch/chat_client_irc.go new file mode 100644 index 0000000..df60f22 --- /dev/null +++ b/pkg/streamcontrol/twitch/chat_client_irc.go @@ -0,0 +1,31 @@ +package twitch + +import ( + "context" + + "github.com/adeithe/go-twitch" + "github.com/adeithe/go-twitch/irc" +) + +type chatClientIRC struct { + *irc.Client +} + +var _ ChatClientIRC = (*chatClientIRC)(nil) + +func (c *chatClientIRC) Join(channelIDs ...string) error { + return c.Client.Join(channelIDs...) +} +func (c *chatClientIRC) OnShardMessage(callback func(shard int, msg irc.ChatMessage)) { + c.Client.OnShardMessage(callback) +} +func (c *chatClientIRC) Close(ctx context.Context) error { + c.Client.Close() + return nil +} + +func newChatClientIRC() *chatClientIRC { + return &chatClientIRC{ + Client: twitch.IRC(), + } +} diff --git a/pkg/streamcontrol/twitch/chat_handler.go b/pkg/streamcontrol/twitch/chat_handler.go index e5e0563..81aa416 100644 --- a/pkg/streamcontrol/twitch/chat_handler.go +++ b/pkg/streamcontrol/twitch/chat_handler.go @@ -2,110 +2,11 @@ package twitch import ( "context" - "errors" - "fmt" - "sync" - "time" - "github.com/adeithe/go-twitch/irc" - "github.com/facebookincubator/go-belt/tool/logger" - "github.com/xaionaro-go/observability" "github.com/xaionaro-go/streamctl/pkg/streamcontrol" ) -type ChatClient interface { - Join(channelIDs ...string) error - OnShardMessage(func(shard int, msg irc.ChatMessage)) - Close() -} - -type ChatHandler struct { - client ChatClient - cancelFunc context.CancelFunc - waitGroup sync.WaitGroup - messagesInChan chan irc.ChatMessage - messagesOutChan chan streamcontrol.ChatMessage -} - -func NewChatHandler( - ctx context.Context, - channelID string, -) (*ChatHandler, error) { - var errs []error - for attempt := 0; attempt < 3; attempt++ { - h, err := newChatHandler(ctx, newChatClient(), channelID) - if err == nil { - return h, nil - } - err = fmt.Errorf("attempt #%d failed: %w", attempt, err) - logger.Errorf(ctx, "%v", err) - errs = append(errs, err) - time.Sleep(time.Second) - } - return nil, errors.Join(errs...) -} - -func newChatHandler( - ctx context.Context, - chatClient ChatClient, - channelID string, -) (*ChatHandler, error) { - err := chatClient.Join(channelID) - if err != nil { - return nil, fmt.Errorf("unable to join channel '%s': %w", channelID, err) - } - - ctx, cancelFn := context.WithCancel(ctx) - h := &ChatHandler{ - client: chatClient, - cancelFunc: cancelFn, - messagesInChan: make(chan irc.ChatMessage), - messagesOutChan: make(chan streamcontrol.ChatMessage, 100), - } - - h.waitGroup.Add(1) - observability.Go(ctx, func(ctx context.Context) { - defer h.waitGroup.Done() - defer func() { - h.client.Close() - // h.Client.Close above waits inside for everything to finish, - // so we can safely close the channel here: - close(h.messagesInChan) - close(h.messagesOutChan) - }() - for { - select { - case <-ctx.Done(): - return - case ev := <-h.messagesInChan: - select { - case h.messagesOutChan <- streamcontrol.ChatMessage{ - CreatedAt: ev.CreatedAt, - UserID: streamcontrol.ChatUserID(ev.Sender.Username), - Username: ev.Sender.Username, - MessageID: streamcontrol.ChatMessageID(ev.ID), - Message: ev.Text, // TODO: investigate if we need ev.IRCMessage.Text - }: - default: - } - } - } - }) - - chatClient.OnShardMessage(h.onShardMessage) - return h, nil -} - -func (h *ChatHandler) onShardMessage(shard int, msg irc.ChatMessage) { - h.messagesInChan <- msg -} - -func (h *ChatHandler) Close() error { - h.cancelFunc() - h.waitGroup.Wait() - return nil -} - -func (h *ChatHandler) MessagesChan() <-chan streamcontrol.ChatMessage { - return h.messagesOutChan +type ChatHandler interface { + Close(ctx context.Context) error + MessagesChan() <-chan streamcontrol.ChatMessage } diff --git a/pkg/streamcontrol/twitch/chat_handler_irc.go b/pkg/streamcontrol/twitch/chat_handler_irc.go new file mode 100644 index 0000000..4c2495c --- /dev/null +++ b/pkg/streamcontrol/twitch/chat_handler_irc.go @@ -0,0 +1,126 @@ +package twitch + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + "github.com/adeithe/go-twitch/irc" + "github.com/facebookincubator/go-belt/tool/logger" + "github.com/xaionaro-go/observability" + "github.com/xaionaro-go/streamctl/pkg/streamcontrol" +) + +type ChatClientIRC interface { + Join(channelIDs ...string) error + OnShardMessage(func(shard int, msg irc.ChatMessage)) + Close(context.Context) error +} + +type ChatHandlerIRC struct { + client ChatClientIRC + cancelFunc context.CancelFunc + waitGroup sync.WaitGroup + messagesInChan chan irc.ChatMessage + messagesOutChan chan streamcontrol.ChatMessage +} + +var _ ChatHandler = (*ChatHandlerIRC)(nil) + +func NewChatHandlerIRC( + ctx context.Context, + channelID string, +) (_ret *ChatHandlerIRC, _err error) { + logger.Debugf(ctx, "NewChatHandlerIRC") + defer func() { logger.Debugf(ctx, "/NewChatHandlerIRC: %v", _err) }() + var errs []error + for attempt := 0; attempt < 3; attempt++ { + h, err := newChatHandlerIRC(ctx, newChatClientIRC(), channelID) + if err == nil { + return h, nil + } + err = fmt.Errorf("attempt #%d failed: %w", attempt, err) + logger.Errorf(ctx, "%v", err) + errs = append(errs, err) + time.Sleep(time.Second) + } + return nil, errors.Join(errs...) +} + +func newChatHandlerIRC( + ctx context.Context, + chatClient ChatClientIRC, + channelID string, +) (_ret *ChatHandlerIRC, _err error) { + logger.Debugf(ctx, "newChatHandlerIRC") + defer func() { logger.Debugf(ctx, "/newChatHandlerIRC: %v", _err) }() + err := chatClient.Join(channelID) + if err != nil { + return nil, fmt.Errorf("unable to join channel '%s': %w", channelID, err) + } + + ctx, cancelFn := context.WithCancel(ctx) + h := &ChatHandlerIRC{ + client: chatClient, + cancelFunc: cancelFn, + messagesInChan: make(chan irc.ChatMessage), + messagesOutChan: make(chan streamcontrol.ChatMessage, 100), + } + + h.waitGroup.Add(1) + observability.Go(ctx, func(ctx context.Context) { + defer logger.Debugf(ctx, "newChatHandlerIRC: closed") + defer h.waitGroup.Done() + defer func() { + h.client.Close(ctx) + // h.Client.Close above waits inside for everything to finish, + // so we can safely close the channel here: + close(h.messagesInChan) + close(h.messagesOutChan) + }() + for { + select { + case <-ctx.Done(): + logger.Debugf(ctx, "newChatHandlerIRC: closing: %v", ctx.Err()) + return + case ev, ok := <-h.messagesInChan: + if !ok { + logger.Debugf(ctx, "newChatHandlerIRC: input channel closed") + return + } + select { + case h.messagesOutChan <- streamcontrol.ChatMessage{ + CreatedAt: ev.CreatedAt, + UserID: streamcontrol.ChatUserID(ev.Sender.Username), + Username: ev.Sender.Username, + MessageID: streamcontrol.ChatMessageID(ev.ID), + Message: ev.Text, // TODO: investigate if we need ev.IRCMessage.Text + }: + default: + logger.Warnf(ctx, "the queue is full, skipping the message") + } + } + } + }) + + chatClient.OnShardMessage(h.onShardMessage) + return h, nil +} + +func (h *ChatHandlerIRC) onShardMessage(shard int, msg irc.ChatMessage) { + ctx := context.TODO() + logger.Debugf(ctx, "newChatHandlerIRC: onShardMessage") + h.messagesInChan <- msg +} + +func (h *ChatHandlerIRC) Close(ctx context.Context) error { + h.cancelFunc() + h.waitGroup.Wait() + return nil +} + +func (h *ChatHandlerIRC) MessagesChan() <-chan streamcontrol.ChatMessage { + return h.messagesOutChan +} diff --git a/pkg/streamcontrol/twitch/chat_handler_test.go b/pkg/streamcontrol/twitch/chat_handler_irc_test.go similarity index 75% rename from pkg/streamcontrol/twitch/chat_handler_test.go rename to pkg/streamcontrol/twitch/chat_handler_irc_test.go index a624d57..e59d251 100644 --- a/pkg/streamcontrol/twitch/chat_handler_test.go +++ b/pkg/streamcontrol/twitch/chat_handler_irc_test.go @@ -11,25 +11,25 @@ import ( "github.com/xaionaro-go/streamctl/pkg/streamcontrol" ) -type chatClientMock struct { +type chatClientIRCMock struct { join func(channelIDs ...string) error onShardMessage func(func(shard int, msg irc.ChatMessage)) - close func() + close func(ctx context.Context) error } -var _ ChatClient = (*chatClientMock)(nil) +var _ ChatClientIRC = (*chatClientIRCMock)(nil) -func (c *chatClientMock) Join(channelIDs ...string) error { +func (c *chatClientIRCMock) Join(channelIDs ...string) error { return c.join(channelIDs...) } -func (c *chatClientMock) OnShardMessage(callback func(shard int, msg irc.ChatMessage)) { +func (c *chatClientIRCMock) OnShardMessage(callback func(shard int, msg irc.ChatMessage)) { c.onShardMessage(callback) } -func (c *chatClientMock) Close() { - c.close() +func (c *chatClientIRCMock) Close(ctx context.Context) error { + return c.close(ctx) } -func TestChatHandler(t *testing.T) { +func TestChatHandlerIRC(t *testing.T) { ctx := context.TODO() const channelID = "test-channel-id" @@ -38,7 +38,7 @@ func TestChatHandler(t *testing.T) { callback func(shard int, msg irc.ChatMessage) closeCount = 0 ) - h, err := newChatHandler(ctx, &chatClientMock{ + h, err := newChatHandlerIRC(ctx, &chatClientIRCMock{ join: func(channelIDs ...string) error { joinedChannelIDs = append(joinedChannelIDs, channelIDs...) return nil @@ -46,8 +46,9 @@ func TestChatHandler(t *testing.T) { onShardMessage: func(_callback func(shard int, msg irc.ChatMessage)) { callback = _callback }, - close: func() { + close: func(ctx context.Context) error { closeCount++ + return nil }, }, channelID) require.NoError(t, err) @@ -84,7 +85,7 @@ func TestChatHandler(t *testing.T) { }) require.Equal(t, 0, closeCount) - h.Close() + h.Close(ctx) require.Equal(t, 1, closeCount) wg.Wait() } diff --git a/pkg/streamcontrol/twitch/chat_handler_sub.go b/pkg/streamcontrol/twitch/chat_handler_sub.go new file mode 100644 index 0000000..246a8e0 --- /dev/null +++ b/pkg/streamcontrol/twitch/chat_handler_sub.go @@ -0,0 +1,305 @@ +package twitch + +import ( + "context" + "encoding/json" + "fmt" + "sync" + "time" + + "github.com/coder/websocket" + "github.com/facebookincubator/go-belt/tool/logger" + twitcheventsub "github.com/joeyak/go-twitch-eventsub/v3" + "github.com/nicklaw5/helix/v2" + "github.com/xaionaro-go/observability" + "github.com/xaionaro-go/streamctl/pkg/streamcontrol" +) + +type ChatHandlerSub struct { + client TwitchSubscriptionClient + wsConn *websocket.Conn + broadcasterID string + cancelFunc context.CancelFunc + waitGroup sync.WaitGroup + messagesOutChan chan streamcontrol.ChatMessage +} + +var _ ChatHandler = (*ChatHandlerSub)(nil) + +type TwitchSubscriptionClient interface { + CreateEventSubSubscription(payload *helix.EventSubSubscription) (*helix.EventSubSubscriptionsResponse, error) + GetUsers(params *helix.UsersParams) (*helix.UsersResponse, error) +} + +func NewChatHandlerSub( + ctx context.Context, + client TwitchSubscriptionClient, + broadcasterID string, + onClose func(context.Context), +) (_ret *ChatHandlerSub, _err error) { + logger.Debugf(ctx, "NewChatHandlerSub") + defer func() { logger.Debugf(ctx, "/NewChatHandlerSub: %v", _err) }() + + const urlString = "wss://eventsub.wss.twitch.tv/ws" + c, _, err := websocket.Dial(ctx, urlString, nil) + if err != nil { + return nil, fmt.Errorf("unable to initiate a websocket connection to '%s': %w", urlString, err) + } + + var myUserID string + { + resp, err := client.GetUsers(&helix.UsersParams{}) + if err != nil { + return nil, fmt.Errorf("unable to get my user info: %w", err) + } + if len(resp.Data.Users) != 1 { + return nil, fmt.Errorf("expected to get one user info, but received %d", len(resp.Data.Users)) + } + myUserID = resp.Data.Users[0].ID + logger.Debugf(ctx, "my user ID: %v", myUserID) + } + + ctx, cancelFn := context.WithCancel(ctx) + h := &ChatHandlerSub{ + client: client, + wsConn: c, + broadcasterID: broadcasterID, + cancelFunc: cancelFn, + messagesOutChan: make(chan streamcontrol.ChatMessage, 100), + } + defer func() { + if _err != nil { + _ = h.Close(ctx) + } + }() + + _, sessMsgBytes, err := c.Read(ctx) + if err != nil { + return nil, fmt.Errorf("unable to get my session ID: %w", err) + } + var sessMsg SessionWelcomeMessage + err = json.Unmarshal(sessMsgBytes, &sessMsg) + if err != nil { + return nil, fmt.Errorf("unable to deserialize the session ID message '%s': %w", sessMsgBytes, err) + } + sessID := sessMsg.Payload.Session.ID + logger.Debugf(ctx, "session ID: '%s' (%s)", sessID, sessMsgBytes) + + params := &helix.EventSubSubscription{ + Type: string(twitcheventsub.SubChannelChatMessage), + Version: "1", + Condition: helix.EventSubCondition{ + BroadcasterUserID: broadcasterID, + UserID: myUserID, + }, + Transport: helix.EventSubTransport{ + Method: "websocket", + SessionID: sessID, + }, + } + resp, err := client.CreateEventSubSubscription(params) + if err != nil { + return nil, fmt.Errorf("unable to create a subscription (%#+v): %w", params, err) + } + if resp.ErrorMessage != "" { + return nil, fmt.Errorf("got an error during subscription (%#+v): %s", params, resp.ErrorMessage) + } + + h.waitGroup.Add(1) + observability.Go(ctx, func(ctx context.Context) { + defer logger.Debugf(ctx, "NewChatHandlerSub: closed") + if onClose != nil { + defer onClose(ctx) + } + defer h.waitGroup.Done() + defer func() { + close(h.messagesOutChan) + }() + for { + select { + case <-ctx.Done(): + logger.Debugf(ctx, "NewChatHandlerSub: context closed: %v", ctx.Err()) + return + default: + } + _, messageSerialized, err := c.Read(ctx) + if err != nil { + logger.Errorf(ctx, "unable to read the message: %v", err) + return + } + var header struct { + Metadata MessageMetadata `json:"metadata"` + } + if err := json.Unmarshal(messageSerialized, &header); err != nil { + logger.Errorf(ctx, "unable to un-JSON-ize message '%s': %v", messageSerialized, err) + return + } + + t, err := time.Parse(time.RFC3339Nano, header.Metadata.MessageTimestamp) + if err != nil { + logger.Errorf(ctx, "unable to parse timestamp '%s': %v", header.Metadata.MessageTimestamp, err) + t = time.Now() + } + + var msgAbstract any + switch header.Metadata.MessageType { + case "session_welcome": + msgAbstract = &SessionWelcomeMessage{} + case "notification": + var msg NotificationMessage + err := json.Unmarshal(messageSerialized, &msg) + if err != nil { + logger.Errorf(ctx, "unable to unserialize the notification message '%s': %v", messageSerialized, err) + continue + } + logger.Tracef(ctx, "notification: %#+v", msg) + switch twitcheventsub.EventSubscription(msg.Payload.Subscription.Type) { + case twitcheventsub.SubChannelChatMessage: + var chatEvent ChatMessageEvent + err := json.Unmarshal(msg.Payload.Event, &chatEvent) + if err != nil { + logger.Errorf(ctx, "unable to unserialize the chat message '%s': %v", msg.Payload.Event, err) + continue + } + logger.Tracef(ctx, "chat message: %#+v", msg) + msg := streamcontrol.ChatMessage{ + CreatedAt: t, + UserID: streamcontrol.ChatUserID(chatEvent.ChatterUserID), + Username: chatEvent.ChatterUserName, + MessageID: streamcontrol.ChatMessageID(chatEvent.MessageID), + Message: chatEvent.Message.Text, + } + logger.Tracef(ctx, "resulting chat: %#+v", msg) + select { + case h.messagesOutChan <- msg: + default: + logger.Errorf(ctx, "the queue is full, have to drop %#+v", msg) + } + default: + logger.Warnf(ctx, "got an event on a channel I haven't subscribed to: '%s': %s", msg.Payload.Subscription.Type, messageSerialized) + } + continue + case "session_keepalive": + msgAbstract = &KeepaliveMessage{} + case "reconnect": + msgAbstract = &ReconnectMessage{} + case "revocation": + msgAbstract = &RevocationMessage{} + default: + logger.Debugf(ctx, "unknown message type: '%s': '%s'", header.Metadata.MessageType, messageSerialized) + continue + } + err = json.Unmarshal(messageSerialized, &msgAbstract) + if err != nil { + logger.Errorf(ctx, "unable to unserialize the %T message '%s': %v", msgAbstract, messageSerialized, err) + continue + } + logger.Tracef(ctx, "received %T: %#+v", msgAbstract, msgAbstract) + } + }) + + return h, nil +} + +func (h *ChatHandlerSub) Close(ctx context.Context) error { + h.cancelFunc() + return nil +} + +func (h *ChatHandlerSub) MessagesChan() <-chan streamcontrol.ChatMessage { + return h.messagesOutChan +} + +// Common metadata for all messages +type MessageMetadata struct { + MessageID string `json:"message_id"` + MessageType string `json:"message_type"` + MessageTimestamp string `json:"message_timestamp"` +} + +// Session (used in session_welcome and reconnect) +type Session struct { + ID string `json:"id"` + Status string `json:"status"` + KeepaliveTimeoutSeconds int `json:"keepalive_timeout_seconds"` + ReconnectURL *string `json:"reconnect_url"` // can be null + ConnectedAt string `json:"connected_at"` +} + +// Payload for session_welcome +type WelcomePayload struct { + Session Session `json:"session"` +} + +// Session Welcome message +type SessionWelcomeMessage struct { + Metadata MessageMetadata `json:"metadata"` + Payload WelcomePayload `json:"payload"` +} + +// Subscription info (used in notifications) +type Subscription struct { + ID string `json:"id"` + Type string `json:"type"` + Version string `json:"version"` + Status string `json:"status"` + Cost int `json:"cost"` + Condition map[string]string `json:"condition"` + CreatedAt string `json:"created_at"` + Transport struct { + Method string `json:"method"` + SessionID string `json:"session_id"` + } `json:"transport"` +} + +// Example: Channel Chat Message Event (payload.event for chat messages) +type ChatMessageEvent struct { + BroadcasterUserID string `json:"broadcaster_user_id"` + BroadcasterUserLogin string `json:"broadcaster_user_login"` + BroadcasterUserName string `json:"broadcaster_user_name"` + ChatterUserID string `json:"chatter_user_id"` + ChatterUserLogin string `json:"chatter_user_login"` + ChatterUserName string `json:"chatter_user_name"` + MessageID string `json:"message_id"` + Message struct { + Text string `json:"text"` + } `json:"message"` + Color string `json:"color"` + Badges []struct { + SetID string `json:"set_id"` + ID string `json:"id"` + Info string `json:"info"` + } `json:"badges"` +} + +type NotificationPayload struct { + Subscription Subscription `json:"subscription"` + Event json.RawMessage `json:"event"` +} + +type NotificationMessage struct { + Metadata MessageMetadata `json:"metadata"` + Payload NotificationPayload `json:"payload"` +} + +type KeepaliveMessage struct { + Metadata MessageMetadata `json:"metadata"` +} + +type ReconnectPayload struct { + Session Session `json:"session"` +} + +type ReconnectMessage struct { + Metadata MessageMetadata `json:"metadata"` + Payload ReconnectPayload `json:"payload"` +} + +type RevocationPayload struct { + Subscription Subscription `json:"subscription"` +} + +type RevocationMessage struct { + Metadata MessageMetadata `json:"metadata"` + Payload RevocationPayload `json:"payload"` +} diff --git a/pkg/streamcontrol/twitch/cmd/chatlistener/main.go b/pkg/streamcontrol/twitch/cmd/chatlistener/main.go index 7565c39..967bfcb 100644 --- a/pkg/streamcontrol/twitch/cmd/chatlistener/main.go +++ b/pkg/streamcontrol/twitch/cmd/chatlistener/main.go @@ -7,7 +7,15 @@ import ( "log" "os" + "github.com/facebookincubator/go-belt" + "github.com/facebookincubator/go-belt/tool/logger" + "github.com/facebookincubator/go-belt/tool/logger/implementation/zap" + "github.com/nicklaw5/helix/v2" + "github.com/xaionaro-go/observability" + "github.com/xaionaro-go/streamctl/pkg/oauthhandler" + "github.com/xaionaro-go/streamctl/pkg/secret" "github.com/xaionaro-go/streamctl/pkg/streamcontrol/twitch" + "github.com/xaionaro-go/streamctl/pkg/streamcontrol/twitch/auth" ) func assertNoError(err error) { @@ -18,10 +26,21 @@ func assertNoError(err error) { } func main() { - ctx := context.TODO() - flag.Usage = func() { - fmt.Fprintf(os.Stderr, "syntax: chatlistener \n") + l := zap.Default().WithLevel(logger.LevelTrace) + ctx := context.Background() + ctx = logger.CtxWithLogger(ctx, l) + ctx = observability.OnInsecureDebug(ctx) + logger.Default = func() logger.Logger { + return l } + defer belt.Flush(ctx) + oldUsage := flag.Usage + flag.Usage = func() { + fmt.Fprintf(os.Stderr, "syntax: chatlistener [options] \n") + oldUsage() + } + clientID := flag.String("client-id", "", "client ID for a WebSockets subscription (if not provided IRC will be used, instead)") + clientSecret := flag.String("client-secret", "", "client secret for a WebSockets subscription (if not provided IRC will be used, instead)") flag.Parse() if flag.NArg() != 1 { flag.Usage() @@ -29,7 +48,15 @@ func main() { } channelID := flag.Arg(0) - h, err := twitch.NewChatHandler(ctx, channelID) + var ( + h twitch.ChatHandler + err error + ) + if *clientID != "" && *clientSecret != "" { + h, err = newChatHandlerWebsockets(ctx, *clientID, *clientSecret, channelID) + } else { + h, err = twitch.NewChatHandlerIRC(ctx, channelID) + } assertNoError(err) fmt.Println("started") @@ -37,3 +64,51 @@ func main() { fmt.Printf("%#+v\n", ev) } } + +const oauthListenerPort = 8091 + +func newChatHandlerWebsockets( + ctx context.Context, + clientID string, + clientSecret string, + channelID string, +) (*twitch.ChatHandlerSub, error) { + options := &helix.Options{ + ClientID: clientID, + ClientSecret: clientSecret, + RedirectURI: auth.RedirectURI(oauthListenerPort), + } + client, err := helix.NewClientWithContext(ctx, options) + if err != nil { + return nil, fmt.Errorf("unable to create a helix client object: %w", err) + } + var clientCode secret.String + err = auth.NewClientCode( + ctx, + clientID, + oauthhandler.OAuth2HandlerViaBrowser, + func() []uint16 { + return []uint16{oauthListenerPort} + }, func(code string) { + clientCode.Set(code) + }, + ) + if err != nil { + return nil, fmt.Errorf("unable to get a client code: %w", err) + } + accessToken, refreshToken, err := auth.NewTokenByUser(ctx, client, clientCode) + client.SetUserAccessToken(accessToken.Get()) + client.SetRefreshToken(refreshToken.Get()) + userID, err := twitch.GetUserID(ctx, client, channelID) + if err != nil { + return nil, fmt.Errorf("unable to get the user ID for login '%s': %w", channelID, err) + } + if observability.IsOnInsecureDebug(ctx) { + logger.Tracef(ctx, "user access token: %v", accessToken.Get()) + } + h, err := twitch.NewChatHandlerSub(ctx, client, userID, nil) + if err != nil { + return nil, fmt.Errorf("unable to get a chat handler: %w", err) + } + return h, nil +} diff --git a/pkg/streamcontrol/twitch/twitch.go b/pkg/streamcontrol/twitch/twitch.go index b4e157f..adcddbf 100644 --- a/pkg/streamcontrol/twitch/twitch.go +++ b/pkg/streamcontrol/twitch/twitch.go @@ -9,31 +9,34 @@ import ( "unicode" "unicode/utf8" + "github.com/facebookincubator/go-belt" "github.com/facebookincubator/go-belt/tool/experimental/errmon" "github.com/facebookincubator/go-belt/tool/logger" "github.com/hashicorp/go-multierror" "github.com/nicklaw5/helix/v2" "github.com/xaionaro-go/observability" "github.com/xaionaro-go/streamctl/pkg/buildvars" - "github.com/xaionaro-go/streamctl/pkg/oauthhandler" + "github.com/xaionaro-go/streamctl/pkg/ringbuffer" "github.com/xaionaro-go/streamctl/pkg/secret" "github.com/xaionaro-go/streamctl/pkg/streamcontrol" + "github.com/xaionaro-go/streamctl/pkg/streamcontrol/twitch/auth" "github.com/xaionaro-go/xsync" ) type Twitch struct { - closeCtx context.Context - closeFn context.CancelFunc - chatHandler *ChatHandler - client *helix.Client - config Config - broadcasterID string - lazyInitOnce sync.Once - saveCfgFn func(Config) error - tokenLocker xsync.Mutex - prepareLocker xsync.Mutex - clientID string - clientSecret secret.String + closeCtx context.Context + closeFn context.CancelFunc + chatHandlerSub *ChatHandlerSub + chatHandlerIRC *ChatHandlerIRC + client *helix.Client + config Config + broadcasterID string + lazyInitOnce sync.Once + saveCfgFn func(Config) error + tokenLocker xsync.Mutex + prepareLocker xsync.Mutex + clientID string + clientSecret secret.String } const twitchDebug = false @@ -45,6 +48,7 @@ func New( cfg Config, saveCfgFn func(Config) error, ) (*Twitch, error) { + ctx = belt.WithField(ctx, "controller", ID) if cfg.Config.Channel == "" { return nil, fmt.Errorf("'channel' is not set") } @@ -84,11 +88,11 @@ func New( clientSecret: secret.New(clientSecret), } - h, err := NewChatHandler(ctx, cfg.Config.Channel) + h, err := NewChatHandlerIRC(ctx, cfg.Config.Channel) if err != nil { return nil, fmt.Errorf("unable to initialize a chat handler for channel '%s': %w", cfg.Config.Channel, err) } - t.chatHandler = h + t.chatHandlerIRC = h client, err := t.getClient(ctx, oauthPorts[0]) if err != nil { @@ -135,7 +139,7 @@ func New( return t, nil } -func getUserID( +func GetUserID( _ context.Context, client *helix.Client, login string, @@ -171,7 +175,7 @@ func (t *Twitch) prepareNoLock(ctx context.Context) error { if t.broadcasterID != "" { return } - t.broadcasterID, err = getUserID(ctx, t.client, t.config.Config.Channel) + t.broadcasterID, err = GetUserID(ctx, t.client, t.config.Config.Channel) if err != nil { logger.Errorf(ctx, "unable to get broadcaster ID: %v", err) return @@ -183,6 +187,20 @@ func (t *Twitch) prepareNoLock(ctx context.Context) error { t.config.Config.Channel, ) }) + + if t.chatHandlerSub == nil { + t.chatHandlerSub, err = NewChatHandlerSub( + t.closeCtx, t.client, t.broadcasterID, + func(ctx context.Context) { + t.prepareLocker.Do(ctx, func() { + t.chatHandlerSub = nil + }) + }, + ) + if err != nil { + logger.Errorf(ctx, "unable to initialize websockets based chat listener: %v", err) + } + } return err } @@ -528,146 +546,25 @@ func (t *Twitch) getNewToken( }) } -func authRedirectURI(listenPort uint16) string { - return fmt.Sprintf("http://localhost:%d/", listenPort) -} - func (t *Twitch) getNewClientCode( ctx context.Context, ) (_err error) { - logger.Debugf(ctx, "getNewClientCode") - defer func() { logger.Debugf(ctx, "/getNewClientCode: %v", _err) }() - - oauthHandler := t.config.Config.CustomOAuthHandler - if oauthHandler == nil { - oauthHandler = oauthhandler.OAuth2HandlerViaCLI - } - - ctx, ctxCancelFunc := context.WithCancel(ctx) - cancelFunc := func() { - logger.Debugf(ctx, "cancelling the context") - ctxCancelFunc() - } - - var errWg sync.WaitGroup - var resultErr error - errCh := make(chan error) - errWg.Add(1) - observability.Go(ctx, func(ctx context.Context) { - errWg.Done() - for err := range errCh { + return auth.NewClientCode( + ctx, + t.clientID, + t.config.Config.CustomOAuthHandler, + t.config.Config.GetOAuthListenPorts, + func(code string) { + t.config.Config.ClientCode.Set(code) + err := t.saveCfgFn(t.config) errmon.ObserveErrorCtx(ctx, err) - resultErr = multierror.Append(resultErr, err) - } - }) - - alreadyListening := map[uint16]struct{}{} - var wg sync.WaitGroup - success := false - - startHandlerForPort := func(listenPort uint16) { - if _, ok := alreadyListening[listenPort]; ok { - return - } - alreadyListening[listenPort] = struct{}{} - - logger.Debugf(ctx, "starting the oauth handler at port %d", listenPort) - wg.Add(1) - { - listenPort := listenPort - observability.Go(ctx, func(ctx context.Context) { - defer func() { logger.Debugf(ctx, "ended the oauth handler at port %d", listenPort) }() - defer wg.Done() - authURL := GetAuthorizationURL( - &helix.AuthorizationURLParams{ - ResponseType: "code", // or "token" - Scopes: []string{ - "channel:manage:broadcast", - "moderator:manage:chat_messages", - "moderator:manage:banned_users", - }, - }, - t.clientID, - authRedirectURI(listenPort), - ) - - arg := oauthhandler.OAuthHandlerArgument{ - AuthURL: authURL, - ListenPort: listenPort, - ExchangeFn: func(ctx context.Context, code string) (_err error) { - logger.Debugf(ctx, "ExchangeFn()") - defer func() { logger.Debugf(ctx, "/ExchangeFn(): %v", _err) }() - if code == "" { - return fmt.Errorf("code is empty") - } - t.config.Config.ClientCode.Set(code) - err := t.saveCfgFn(t.config) - errmon.ObserveErrorCtx(ctx, err) - return nil - }, - } - - err := oauthHandler(ctx, arg) - if err != nil { - errCh <- fmt.Errorf("unable to get or exchange the oauth code to a token: %w", err) - return - } - cancelFunc() - success = true - }) - } - } - - // TODO: either support only one port as in New, or support multiple - // ports as we do below - getPortsFn := t.config.Config.GetOAuthListenPorts - if getPortsFn == nil { - return fmt.Errorf("the function GetOAuthListenPorts is not set") - } - - for _, listenPort := range getPortsFn() { - startHandlerForPort(listenPort) - } - - wg.Add(1) - observability.Go(ctx, func(ctx context.Context) { - defer wg.Done() - t := time.NewTicker(time.Second) - defer t.Stop() - for { - select { - case <-ctx.Done(): - return - case <-t.C: - } - ports := getPortsFn() - logger.Tracef(ctx, "oauth listener ports: %#+v", ports) - - for _, listenPort := range ports { - startHandlerForPort(listenPort) - } - } - }) - - observability.Go(ctx, func(ctx context.Context) { - wg.Wait() - close(errCh) - }) - <-ctx.Done() - logger.Debugf(ctx, "did successfully took a new client code? -- %v", success) - if !success { - errWg.Wait() - return resultErr - } - return nil + }, + ) } func (t *Twitch) getNewTokenByUser( ctx context.Context, ) error { - logger.Debugf(ctx, "getNewTokenByUser") - defer func() { logger.Debugf(ctx, "/getNewTokenByUser") }() - if t.config.Config.ClientCode.Get() == "" { err := t.getNewClientCode(ctx) if err != nil { @@ -675,29 +572,17 @@ func (t *Twitch) getNewTokenByUser( } } - if t.config.Config.ClientCode.Get() == "" { - return fmt.Errorf("internal error: ClientCode is empty") + accessToken, refreshToken, err := auth.NewTokenByUser(ctx, t.client, t.config.Config.ClientCode) + if err != nil { + return fmt.Errorf("unable to get an access token: %w", err) } - logger.Debugf(ctx, "requesting user access token...") - resp, err := t.client.RequestUserAccessToken(t.config.Config.ClientCode.Get()) - logger.Debugf(ctx, "requesting user access token result: %#+v %v", resp, err) - if err != nil { - return fmt.Errorf("unable to get user access token: %w", err) - } - if resp.ErrorStatus != 0 { - return fmt.Errorf( - "unable to query: %d %v: %v", - resp.ErrorStatus, - resp.Error, - resp.ErrorMessage, - ) - } - t.client.SetUserAccessToken(resp.Data.AccessToken) - t.client.SetRefreshToken(resp.Data.RefreshToken) + logger.Debugf(ctx, "setting the user access token") + t.client.SetUserAccessToken(accessToken.Get()) + t.client.SetRefreshToken(refreshToken.Get()) t.config.Config.ClientCode.Set("") - t.config.Config.UserAccessToken.Set(resp.Data.AccessToken) - t.config.Config.RefreshToken.Set(resp.Data.RefreshToken) + t.config.Config.UserAccessToken = accessToken + t.config.Config.RefreshToken = refreshToken err = t.saveCfgFn(t.config) errmon.ObserveErrorCtx(ctx, err) return nil @@ -706,24 +591,13 @@ func (t *Twitch) getNewTokenByUser( func (t *Twitch) getNewTokenByApp( ctx context.Context, ) error { - logger.Debugf(ctx, "getNewTokenByApp") - defer func() { logger.Debugf(ctx, "/getNewTokenByApp") }() - - resp, err := t.client.RequestAppAccessToken(nil) + accessToken, err := auth.NewTokenByApp(ctx, t.client) if err != nil { - return fmt.Errorf("unable to get app access token: %w", err) - } - if resp.ErrorStatus != 0 { - return fmt.Errorf( - "unable to get app access token (the response contains an error): %d %v: %v", - resp.ErrorStatus, - resp.Error, - resp.ErrorMessage, - ) + return err } logger.Debugf(ctx, "setting the app access token") - t.client.SetAppAccessToken(resp.Data.AccessToken) - t.config.Config.AppAccessToken.Set(resp.Data.AccessToken) + t.client.SetAppAccessToken(accessToken.Get()) + t.config.Config.AppAccessToken = accessToken err = t.saveCfgFn(t.config) errmon.ObserveErrorCtx(ctx, err) return nil @@ -739,7 +613,7 @@ func (t *Twitch) getClient( options := &helix.Options{ ClientID: t.clientID, ClientSecret: t.clientSecret.Get(), - RedirectURI: authRedirectURI(oauthListenPort), // TODO: delete this hardcode + RedirectURI: auth.RedirectURI(oauthListenPort), // TODO: delete this hardcode } client, err := helix.NewClientWithContext(ctx, options) if err != nil { @@ -748,31 +622,6 @@ func (t *Twitch) getClient( return client, nil } -func GetAuthorizationURL( - params *helix.AuthorizationURLParams, - clientID string, - redirectURI string, -) string { - url := helix.AuthBaseURL + "/authorize" - url += "?response_type=" + params.ResponseType - url += "&client_id=" + clientID - url += "&redirect_uri=" + redirectURI - - if params.State != "" { - url += "&state=" + params.State - } - - if params.ForceVerify { - url += "&force_verify=true" - } - - if len(params.Scopes) != 0 { - url += "&scope=" + strings.Join(params.Scopes, "%20") - } - - return url -} - func (t *Twitch) GetAllCategories( ctx context.Context, ) ([]helix.Game, error) { @@ -837,22 +686,100 @@ func (t *Twitch) GetChatMessagesChan( logger.Debugf(ctx, "GetChatMessagesChan") defer func() { logger.Debugf(ctx, "/GetChatMessagesChan") }() + if err := t.prepare(ctx); err != nil { + logger.Errorf(ctx, "unable to prepare the client: %v", err) + } + outCh := make(chan streamcontrol.ChatMessage) + recentMsgIDs := ringbuffer.New[streamcontrol.ChatMessageID](10) + + sendEvent := func(ev streamcontrol.ChatMessage) { + recentMsgIDs.Add(ev.MessageID) + select { + case outCh <- ev: + default: + logger.Warnf(ctx, "the queue is full, dropping message %#+v", ev) + } + } + + alreadySeen := func(msgID streamcontrol.ChatMessageID) bool { + return recentMsgIDs.Contains(msgID) + } + observability.Go(ctx, func(ctx context.Context) { defer func() { logger.Debugf(ctx, "closing the messages channel") close(outCh) }() + var ( + chSub <-chan streamcontrol.ChatMessage + chIRC <-chan streamcontrol.ChatMessage + ) + t.prepareLocker.Do(ctx, func() { + if t.chatHandlerSub != nil { + chSub = t.chatHandlerSub.MessagesChan() + } + if t.chatHandlerIRC != nil { + chIRC = t.chatHandlerIRC.MessagesChan() + } + }) + logger.Debugf(ctx, "chSub == %p; chIRC == %p", chSub, chIRC) for { + if chSub == nil { + t.prepareLocker.Do(ctx, func() { + if t.chatHandlerSub != nil { + chSub = t.chatHandlerSub.MessagesChan() + } + }) + } + if chSub == nil && chIRC == nil { + logger.Debugf(ctx, "both channels are closed") + return + } select { case <-ctx.Done(): return - case ev, ok := <-t.chatHandler.MessagesChan(): + case ev, ok := <-chSub: if !ok { - logger.Debugf(ctx, "the input channel is closed") - return + chSub = nil + logger.Debugf(ctx, "the API receiver channel closed") + continue } - outCh <- ev + logger.Tracef(ctx, "received a message from API: %#+v", ev) + if alreadySeen(ev.MessageID) { + logger.Tracef(ctx, "already seen message %s", ev.MessageID) + continue + } + sendEvent(ev) + case evIRC, ok := <-chIRC: + if !ok { + chIRC = nil + logger.Debugf(ctx, "the IRC receiver channel closed") + continue + } + logger.Tracef(ctx, "received a message from IRC: %#+v", evIRC) + if alreadySeen(evIRC.MessageID) { + logger.Tracef(ctx, "already seen message %s", evIRC.MessageID) + continue + } + + // not previously seen message: + select { + case evSub, ok := <-chSub: + if !ok { + chSub = nil + break + } + logger.Tracef(ctx, "received a message from API: %#+v", evIRC) + sendEvent(evSub) + if alreadySeen(evIRC.MessageID) { + logger.Tracef(ctx, "the same message") + continue + } + case <-time.After(time.Second): + logger.Warnf(ctx, "received a message from IRC, but not from API") + } + sendEvent(evIRC) } } }) diff --git a/pkg/streamcontrol/twitch/types/config.go b/pkg/streamcontrol/twitch/types/config.go index 9683bc1..bacf818 100644 --- a/pkg/streamcontrol/twitch/types/config.go +++ b/pkg/streamcontrol/twitch/types/config.go @@ -1,17 +1,15 @@ package twitch import ( - "context" - "github.com/xaionaro-go/streamctl/pkg/buildvars" - "github.com/xaionaro-go/streamctl/pkg/oauthhandler" "github.com/xaionaro-go/streamctl/pkg/secret" streamctl "github.com/xaionaro-go/streamctl/pkg/streamcontrol" + "github.com/xaionaro-go/streamctl/pkg/streamcontrol/twitch/auth" ) const ID = streamctl.PlatformName("twitch") -type OAuthHandler func(context.Context, oauthhandler.OAuthHandlerArgument) error +type OAuthHandler = auth.OAuthHandler type PlatformSpecificConfig struct { Channel string diff --git a/pkg/streamcontrol/youtube/chat_listener.go b/pkg/streamcontrol/youtube/chat_listener.go index e4b499d..dbc957d 100644 --- a/pkg/streamcontrol/youtube/chat_listener.go +++ b/pkg/streamcontrol/youtube/chat_listener.go @@ -2,85 +2,45 @@ package youtube import ( "context" + "encoding/json" "errors" - "fmt" - "math/rand" - "net/http" - "net/url" "sync" "time" - ytchat "github.com/abhinavxd/youtube-live-chat-downloader/v2" "github.com/facebookincubator/go-belt/tool/logger" "github.com/xaionaro-go/observability" "github.com/xaionaro-go/streamctl/pkg/streamcontrol" + "google.golang.org/api/googleapi" + "google.golang.org/api/youtube/v3" ) -const youtubeWatchURLString = `https://www.youtube.com/watch` - -func chatCustomCookies() []*http.Cookie { - // borrowed from: https://github.com/abhinavxd/youtube-live-chat-downloader/blob/main/example/main.go - return []*http.Cookie{ - {Name: "PREF", - Value: "tz=Europe.Rome", - MaxAge: 300}, - {Name: "CONSENT", - Value: fmt.Sprintf("YES+yt.432048971.it+FX+%d", 100+rand.Intn(999-100+1)), - MaxAge: 300}, - } -} - -var youtubeWatchURL *url.URL - -func init() { - var err error - youtubeWatchURL, err = url.Parse(youtubeWatchURLString) - if err != nil { - panic(err) - } - - ytchat.AddCookies(chatCustomCookies()) -} - -func ytWatchURL(videoID string) *url.URL { - result := ptr(*youtubeWatchURL) - query := result.Query() - query.Add("v", videoID) - result.RawQuery = query.Encode() - return result -} - type ChatListener struct { - videoID string - continuationCode string - clientConfig ytchat.YtCfg - wg sync.WaitGroup - cancelFunc context.CancelFunc - messagesOutChan chan streamcontrol.ChatMessage + videoID string + liveChatID string + client YouTubeChatClient + + wg sync.WaitGroup + cancelFunc context.CancelFunc + messagesOutChan chan streamcontrol.ChatMessage +} + +type YouTubeChatClient interface { + GetLiveChatMessages(ctx context.Context, chatID string, pageToken string, parts []string) (*youtube.LiveChatMessageListResponse, error) } func NewChatListener( ctx context.Context, + ytClient YouTubeChatClient, videoID string, + liveChatID string, ) (*ChatListener, error) { - if videoID == "" { - return nil, fmt.Errorf("video ID is empty") - } - - watchURL := ytWatchURL(videoID) - - continuationCode, cfg, err := ytchat.ParseInitialData(watchURL.String()) - if err != nil { - return nil, fmt.Errorf("unable to fetch the initial data for chat messages retrieval (URL: %s): %w", watchURL, err) - } - ctx, cancelFunc := context.WithCancel(ctx) l := &ChatListener{ - videoID: videoID, - continuationCode: continuationCode, - clientConfig: cfg, - cancelFunc: cancelFunc, - messagesOutChan: make(chan streamcontrol.ChatMessage, 100), + videoID: videoID, + liveChatID: liveChatID, + client: ytClient, + cancelFunc: cancelFunc, + messagesOutChan: make(chan streamcontrol.ChatMessage, 100), } l.wg.Add(1) observability.Go(ctx, func(ctx context.Context) { @@ -90,52 +50,81 @@ func NewChatListener( close(l.messagesOutChan) }() err := l.listenLoop(ctx) - if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, ytchat.ErrLiveStreamOver) { + if err != nil && !errors.Is(err, context.Canceled) && !errors.As(err, &ErrChatEnded{}) { logger.Errorf(ctx, "the listener loop returned an error: %v", err) } }) + return l, nil } -const chatFetchRetryInterval = time.Second - func (l *ChatListener) listenLoop(ctx context.Context) (_err error) { logger.Debugf(ctx, "listenLoop") defer func() { logger.Debugf(ctx, "/listenLoop: %v", _err) }() + + var pageToken string for { - msgs, newContinuation, err := ytchat.FetchContinuationChat(l.continuationCode, l.clientConfig) - switch err { - case nil: - case ytchat.ErrLiveStreamOver: - return err + select { + case <-ctx.Done(): + return ctx.Err() default: - logger.Errorf( - ctx, - "unable to get a continuation for %v: %v; retrying in %v", - l.videoID, - chatFetchRetryInterval, - err, - ) - time.Sleep(chatFetchRetryInterval) + } + response, err := l.client.GetLiveChatMessages( + ctx, + l.liveChatID, + pageToken, + []string{"snippet", "authorDetails"}, + ) + if err != nil { + gErr := &googleapi.Error{} + if !errors.As(err, &gErr) { + logger.Warnf(ctx, "unable to get chat messages: %v", err) + continue + } + for _, e := range gErr.Errors { + switch e.Reason { + case "liveChatEnded": + return ErrChatEnded{ChatID: l.liveChatID} + case "liveChatDisabled": + return ErrChatDisabled{ChatID: l.liveChatID} + case "liveChatNotFound": + return ErrChatNotFound{ChatID: l.liveChatID} + } + } + b, _ := json.Marshal(err) + logger.Warnf(ctx, "unable to get chat messages: %v (%s)", err, b) continue } - l.continuationCode = newContinuation - for _, msg := range msgs { - l.messagesOutChan <- streamcontrol.ChatMessage{ - CreatedAt: msg.Timestamp, - UserID: streamcontrol.ChatUserID(msg.AuthorName), - Username: msg.AuthorName, - // TODO: find a way to extract the message ID, - // in the mean while we we use a soft key for that: - MessageID: streamcontrol.ChatMessageID(fmt.Sprintf("%s/%s", msg.AuthorName, msg.Message)), - Message: msg.Message, + for _, item := range response.Items { + publishedAt, err := ParseTimestamp(item.Snippet.PublishedAt) + if err != nil { + logger.Errorf(ctx, "unable to parse the timestamp '%s': %v", item.Snippet.PublishedAt, err) + } + msg := streamcontrol.ChatMessage{ + CreatedAt: publishedAt, + UserID: streamcontrol.ChatUserID(item.AuthorDetails.ChannelId), + Username: item.AuthorDetails.DisplayName, + MessageID: streamcontrol.ChatMessageID(item.Id), + Message: item.Snippet.DisplayMessage, + } + if item.Snippet.SuperChatDetails != nil { + msg.Paid.Currency = streamcontrol.CurrencyOther + msg.Paid.Amount = float64(item.Snippet.SuperChatDetails.AmountMicros) / 1000000 + } + select { + case l.messagesOutChan <- msg: + default: + logger.Errorf(ctx, "the queue is full, have to drop %#+v", msg) } } + pageToken = response.NextPageToken + + time.Sleep(time.Millisecond * time.Duration(response.PollingIntervalMillis)) } } -func (h *ChatListener) Close() error { +func (h *ChatListener) Close(ctx context.Context) error { h.cancelFunc() return nil } @@ -143,3 +132,7 @@ func (h *ChatListener) Close() error { func (h *ChatListener) MessagesChan() <-chan streamcontrol.ChatMessage { return h.messagesOutChan } + +func (h *ChatListener) GetVideoID() string { + return h.videoID +} diff --git a/pkg/streamcontrol/youtube/chat_listener_obsolete.go b/pkg/streamcontrol/youtube/chat_listener_obsolete.go new file mode 100644 index 0000000..cc32800 --- /dev/null +++ b/pkg/streamcontrol/youtube/chat_listener_obsolete.go @@ -0,0 +1,158 @@ +package youtube + +import ( + "context" + "errors" + "fmt" + "math/rand" + "net/http" + "net/url" + "sync" + "time" + + ytchat "github.com/abhinavxd/youtube-live-chat-downloader/v2" + "github.com/facebookincubator/go-belt/tool/logger" + "github.com/xaionaro-go/observability" + "github.com/xaionaro-go/streamctl/pkg/streamcontrol" +) + +const youtubeWatchURLString = `https://www.youtube.com/watch` + +func chatCustomCookies() []*http.Cookie { + // borrowed from: https://github.com/abhinavxd/youtube-live-chat-downloader/blob/main/example/main.go + return []*http.Cookie{ + {Name: "PREF", + Value: "tz=Europe.Rome", + MaxAge: 300}, + {Name: "CONSENT", + Value: fmt.Sprintf("YES+yt.432048971.it+FX+%d", 100+rand.Intn(999-100+1)), + MaxAge: 300}, + } +} + +var youtubeWatchURL *url.URL + +func init() { + var err error + youtubeWatchURL, err = url.Parse(youtubeWatchURLString) + if err != nil { + panic(err) + } + + ytchat.AddCookies(chatCustomCookies()) +} + +func ytWatchURL(videoID string) *url.URL { + result := ptr(*youtubeWatchURL) + query := result.Query() + query.Add("v", videoID) + result.RawQuery = query.Encode() + return result +} + +type ChatListenerOBSOLETE struct { + videoID string + continuationCode string + clientConfig ytchat.YtCfg + wg sync.WaitGroup + cancelFunc context.CancelFunc + messagesOutChan chan streamcontrol.ChatMessage +} + +func NewChatListenerOBSOLETE( + ctx context.Context, + videoID string, + onClose func(context.Context, *chatListener), +) (*ChatListenerOBSOLETE, error) { + if videoID == "" { + return nil, fmt.Errorf("video ID is empty") + } + + watchURL := ytWatchURL(videoID) + + continuationCode, cfg, err := ytchat.ParseInitialData(watchURL.String()) + if err != nil { + return nil, fmt.Errorf("unable to fetch the initial data for chat messages retrieval (URL: %s): %w", watchURL, err) + } + + ctx, cancelFunc := context.WithCancel(ctx) + l := &ChatListenerOBSOLETE{ + videoID: videoID, + continuationCode: continuationCode, + clientConfig: cfg, + cancelFunc: cancelFunc, + messagesOutChan: make(chan streamcontrol.ChatMessage, 100), + } + l.wg.Add(1) + observability.Go(ctx, func(ctx context.Context) { + defer l.wg.Done() + if onClose != nil { + defer onClose(ctx, l) + } + defer func() { + logger.Debugf(ctx, "the listener loop is finished") + close(l.messagesOutChan) + }() + err := l.listenLoop(ctx) + if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, ytchat.ErrLiveStreamOver) { + logger.Errorf(ctx, "the listener loop returned an error: %v", err) + } + }) + return l, nil +} + +const chatFetchRetryInterval = time.Second + +func (l *ChatListenerOBSOLETE) listenLoop(ctx context.Context) (_err error) { + logger.Debugf(ctx, "listenLoop") + defer func() { logger.Debugf(ctx, "/listenLoop: %v", _err) }() + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + msgs, newContinuation, err := ytchat.FetchContinuationChat(l.continuationCode, l.clientConfig) + switch err { + case nil: + case ytchat.ErrLiveStreamOver: + return err + default: + logger.Errorf( + ctx, + "unable to get a continuation for %v: %v; retrying in %v", + l.videoID, + chatFetchRetryInterval, + err, + ) + time.Sleep(chatFetchRetryInterval) + continue + } + l.continuationCode = newContinuation + + for _, msg := range msgs { + l.messagesOutChan <- streamcontrol.ChatMessage{ + CreatedAt: msg.Timestamp, + UserID: streamcontrol.ChatUserID(msg.AuthorName), + Username: msg.AuthorName, + // TODO: find a way to extract the message ID, + // in the mean while we we use a soft key for that: + MessageID: streamcontrol.ChatMessageID(fmt.Sprintf("%s/%s", msg.AuthorName, msg.Message)), + Message: msg.Message, + } + } + } +} + +func (h *ChatListenerOBSOLETE) Close(ctx context.Context) error { + h.cancelFunc() + return nil +} + +func (h *ChatListenerOBSOLETE) MessagesChan() <-chan streamcontrol.ChatMessage { + return h.messagesOutChan +} + +func (h *ChatListenerOBSOLETE) GetVideoID() string { + return h.videoID +} diff --git a/pkg/streamcontrol/youtube/error.go b/pkg/streamcontrol/youtube/error.go new file mode 100644 index 0000000..803bb92 --- /dev/null +++ b/pkg/streamcontrol/youtube/error.go @@ -0,0 +1,29 @@ +package youtube + +import ( + "fmt" +) + +type ErrChatNotFound struct { + ChatID string +} + +func (e ErrChatNotFound) Error() string { + return fmt.Sprintf("chat '%s' not found", e.ChatID) +} + +type ErrChatDisabled struct { + ChatID string +} + +func (e ErrChatDisabled) Error() string { + return fmt.Sprintf("chat '%s' is disabled", e.ChatID) +} + +type ErrChatEnded struct { + ChatID string +} + +func (e ErrChatEnded) Error() string { + return fmt.Sprintf("chat '%s' ended", e.ChatID) +} diff --git a/pkg/streamcontrol/youtube/youtube.go b/pkg/streamcontrol/youtube/youtube.go index 58884b6..2ef5bba 100644 --- a/pkg/streamcontrol/youtube/youtube.go +++ b/pkg/streamcontrol/youtube/youtube.go @@ -48,6 +48,8 @@ type YouTube struct { currentLiveBroadcastsLocker xsync.Mutex currentLiveBroadcasts []*youtube.LiveBroadcast + chatListeners map[string]*chatListener + messagesOutChan chan streamcontrol.ChatMessage } @@ -58,11 +60,14 @@ const ( debugUseMockClient = false ) +type chatListener = ChatListenerOBSOLETE + func New( ctx context.Context, cfg Config, saveCfgFn func(Config) error, ) (*YouTube, error) { + ctx = belt.WithField(ctx, "controller", ID) if cfg.Config.ClientID == "" || cfg.Config.ClientSecret.Get() == "" { return nil, fmt.Errorf( "'clientid' or/and 'clientsecret' is/are not set; go to https://console.cloud.google.com/apis/credentials and create an app if it not created, yet", @@ -76,6 +81,8 @@ func New( SaveConfigFunc: saveCfgFn, CancelFunc: cancelFn, + chatListeners: map[string]*chatListener{}, + messagesOutChan: make(chan streamcontrol.ChatMessage, 100), } @@ -955,7 +962,7 @@ func (yt *YouTube) StartStream( } } yt.currentLiveBroadcasts = append(yt.currentLiveBroadcasts, newBroadcast) - err = yt.startChatListener(ctx, newBroadcast.Id) + err = yt.startChatListener(ctx, newBroadcast) if err != nil { logger.Errorf(ctx, "unable to start a chat listener for video '%s': %v", newBroadcast.Id, err) } @@ -979,21 +986,38 @@ func setProfile(broadcast *youtube.LiveBroadcast, profile StreamProfile) { func (yt *YouTube) startChatListener( ctx context.Context, - videoID string, + broadcast *youtube.LiveBroadcast, ) (_err error) { + videoID := broadcast.Id + chatID := broadcast.Snippet.LiveChatId ctx = belt.WithField(ctx, "video_id", videoID) + ctx = belt.WithField(ctx, "chat_id", chatID) ctx = xcontext.DetachDone(ctx) - logger.Debugf(ctx, "startChatListener(ctx, '%s')", videoID) - defer func() { logger.Debugf(ctx, "/startChatListener(ctx, '%s'): %v", videoID, _err) }() + logger.Debugf(ctx, "startChatListener(ctx, '%s':'%s')", videoID, chatID) + defer func() { logger.Debugf(ctx, "/startChatListener(ctx, '%s':'%s'): %v", videoID, chatID, _err) }() - chatListener, err := NewChatListener(ctx, videoID) + _chatListener, err := NewChatListenerOBSOLETE(ctx, videoID, func( + ctx context.Context, + _chatListener *chatListener, + ) { + yt.deleteChatListener(ctx, _chatListener) + }) if err != nil { return fmt.Errorf("unable to initialize the chat listener instance: %w", err) } + oldListener := xsync.DoR1(ctx, &yt.locker, func() *chatListener { + oldListener := yt.chatListeners[broadcast.Id] + yt.chatListeners[broadcast.Id] = _chatListener + return oldListener + }) + if err := oldListener.Close(ctx); err != nil { + logger.Debugf(ctx, "unable to close the old chat listener: %v", err) + } + observability.Go(ctx, func(ctx context.Context) { - err := yt.processChatListener(ctx, chatListener) + err := yt.processChatListener(ctx, _chatListener) if err != nil && !errors.Is(err, context.Canceled) { logger.Errorf(ctx, "unable to process the chat listener for '%s': %v", videoID, err) } @@ -1001,18 +1025,45 @@ func (yt *YouTube) startChatListener( return nil } +func (yt *YouTube) deleteChatListenerByBroadcast( + ctx context.Context, + broadcast *youtube.LiveBroadcast, +) error { + chatListener := yt.getChatListener(ctx, broadcast) + if chatListener == nil { + return nil + } + return yt.deleteChatListener(ctx, chatListener) +} + +func (yt *YouTube) deleteChatListener( + ctx context.Context, + chatListener *chatListener, +) error { + err := chatListener.Close(ctx) + if err != nil { + logger.Warnf(ctx, "unable to close the chat listener for %s: %v", chatListener.GetVideoID(), err) + } + yt.locker.Do(ctx, func() { + if yt.chatListeners[chatListener.GetVideoID()] == chatListener { + delete(yt.chatListeners, chatListener.GetVideoID()) + } + }) + return nil +} + func (yt *YouTube) processChatListener( ctx context.Context, - chatListener *ChatListener, + chatListener *chatListener, ) (_err error) { defer func() { - err := chatListener.Close() + err := yt.deleteChatListener(ctx, chatListener) if err != nil { - logger.Errorf(ctx, "unable to close the chat listener for '%s': %v", chatListener.videoID, err) + logger.Errorf(ctx, "unable to delete the chat listener for '%s': %v", chatListener.GetVideoID(), err) } }() defer func() { - logger.Debugf(ctx, "stopped listening for chat messages in '%s': %v", chatListener.videoID, _err) + logger.Debugf(ctx, "stopped listening for chat messages in '%s': %v", chatListener.GetVideoID(), _err) }() inChan := chatListener.MessagesChan() for { @@ -1031,6 +1082,15 @@ func (yt *YouTube) processChatListener( } } +func (yt *YouTube) getChatListener( + ctx context.Context, + broadcast *youtube.LiveBroadcast, +) *chatListener { + return xsync.DoR1(ctx, &yt.locker, func() *chatListener { + return yt.chatListeners[broadcast.Id] + }) +} + func (yt *YouTube) EndStream( ctx context.Context, ) error { @@ -1043,6 +1103,9 @@ func (yt *YouTube) EndStream( }) return yt.updateActiveBroadcasts(ctx, func(broadcast *youtube.LiveBroadcast) error { + if err := yt.deleteChatListenerByBroadcast(ctx, broadcast); err != nil { + logger.Warnf(ctx, "unable to delete the chat listener for %s: %v", broadcast.Id, err) + } broadcast.ContentDetails.EnableAutoStop = true broadcast.ContentDetails.MonitorStream.ForceSendFields = []string{"BroadcastStreamDelayMs"} if _, ok := expectedVideoIDs[broadcast.Id]; !ok { @@ -1055,6 +1118,18 @@ func (yt *YouTube) EndStream( const timeLayout = "2006-01-02T15:04:05-0700" const timeLayoutFallback = time.RFC3339 +func ParseTimestamp(s string) (time.Time, error) { + ts, err0 := time.Parse(timeLayout, s) + if err0 == nil { + return ts, nil + } + ts, err1 := time.Parse(timeLayoutFallback, s) + if err1 == nil { + return ts, nil + } + return time.Now(), errors.Join(err0, err1) +} + func (yt *YouTube) GetStreamStatus( ctx context.Context, ) (_ret *streamcontrol.StreamStatus, _err error) { @@ -1074,18 +1149,9 @@ func (yt *YouTube) GetStreamStatus( var requestStatsVideoIDs []string err := yt.IterateActiveBroadcasts(ctx, func(broadcast *youtube.LiveBroadcast) error { ts := broadcast.Snippet.ActualStartTime - _startedAt, err := time.Parse(timeLayout, ts) + _startedAt, err := ParseTimestamp(ts) if err != nil { - _startedAt, err = time.Parse(timeLayoutFallback, ts) - if err != nil { - return fmt.Errorf( - "unable to parse '%s' with layouts '%s' and '%s': %w", - ts, - timeLayout, - timeLayoutFallback, - err, - ) - } + return fmt.Errorf("unable to parse '%s': %w", ts, err) } startedAt = &_startedAt if broadcast.Statistics != nil { @@ -1124,7 +1190,7 @@ func (yt *YouTube) GetStreamStatus( if _, ok := ids[newBroadcast.Id]; ok { continue } - err = yt.startChatListener(ctx, newBroadcast.Id) + err = yt.startChatListener(ctx, newBroadcast) if err != nil { logger.Errorf(ctx, "unable to start a chat listener for video '%s': %v", newBroadcast.Id, err) } diff --git a/pkg/streamcontrol/youtube/youtube_client.go b/pkg/streamcontrol/youtube/youtube_client.go index 8fbffc9..2b582c4 100644 --- a/pkg/streamcontrol/youtube/youtube_client.go +++ b/pkg/streamcontrol/youtube/youtube_client.go @@ -37,6 +37,7 @@ func (t BroadcastType) String() string { } type YouTubeClient interface { + YouTubeChatClient Ping(context.Context) error GetBroadcasts(ctx context.Context, t BroadcastType, ids []string, parts []string, pageToken string) (*youtube.LiveBroadcastListResponse, error) UpdateBroadcast(context.Context, *youtube.LiveBroadcast, []string) error @@ -309,3 +310,18 @@ func (c *YouTubeClientV3) DeleteChatMessage( do := c.Service.LiveChatMessages.Delete(messageID).Context(ctx).Do return wrapRequest(ctx, c.RequestWrapper, do) } + +func (c *YouTubeClientV3) GetLiveChatMessages( + ctx context.Context, + chatID string, + pageToken string, + parts []string, +) (_ret *youtube.LiveChatMessageListResponse, _err error) { + logger.Tracef(ctx, "GetLiveChatMessages") + defer func() { logger.Tracef(ctx, "/GetLiveChatMessages: %v", _err) }() + q := c.Service.LiveChatMessages.List(chatID, parts).Context(ctx) + if pageToken != "" { + q = q.PageToken(pageToken) + } + return q.Do() +} diff --git a/pkg/streamcontrol/youtube/youtube_client_calc_points.go b/pkg/streamcontrol/youtube/youtube_client_calc_points.go index 242f43e..71787e2 100644 --- a/pkg/streamcontrol/youtube/youtube_client_calc_points.go +++ b/pkg/streamcontrol/youtube/youtube_client_calc_points.go @@ -21,7 +21,7 @@ func init() { tzLosAngeles, err = time.LoadLocation("America/Los_Angeles") if err != nil { fmt.Fprintf(os.Stderr, "unable to get the timezone of Los_Angeles") - tzLosAngeles = time.FixedZone("America/Los_Angeles", -7 * 3600) + tzLosAngeles = time.FixedZone("America/Los_Angeles", -7*3600) } } @@ -208,3 +208,13 @@ func (c *YouTubeClientCalcPoints) DeleteChatMessage( defer func() { c.addUsedPointsIfNoError(ctx, 1, _err) }() return c.Client.DeleteChatMessage(ctx, messageID) } + +func (c *YouTubeClientCalcPoints) GetLiveChatMessages( + ctx context.Context, + chatID string, + pageToken string, + parts []string, +) (_ret *youtube.LiveChatMessageListResponse, _err error) { + defer func() { c.addUsedPointsIfNoError(ctx, 1, _err) }() + return c.Client.GetLiveChatMessages(ctx, chatID, pageToken, parts) +} diff --git a/pkg/streamcontrol/youtube/youtube_client_mock.go b/pkg/streamcontrol/youtube/youtube_client_mock.go index 9aed3b9..204ac3b 100644 --- a/pkg/streamcontrol/youtube/youtube_client_mock.go +++ b/pkg/streamcontrol/youtube/youtube_client_mock.go @@ -177,3 +177,14 @@ func (c *YouTubeClientMock) DeleteChatMessage( defer func() { logger.Tracef(ctx, "/DeleteChatMessage: %v", _err) }() return fmt.Errorf("not implemented") } + +func (c *YouTubeClientMock) GetLiveChatMessages( + ctx context.Context, + chatID string, + pageToken string, + parts []string, +) (_ret *youtube.LiveChatMessageListResponse, _err error) { + logger.Tracef(ctx, "GetLiveChatMessages") + defer func() { logger.Tracef(ctx, "/GetLiveChatMessages: %v", _err) }() + return nil, fmt.Errorf("not implemented") +} diff --git a/pkg/streamcontrol/youtube/youtube_test.go b/pkg/streamcontrol/youtube/youtube_test.go new file mode 100644 index 0000000..d6c6922 --- /dev/null +++ b/pkg/streamcontrol/youtube/youtube_test.go @@ -0,0 +1,25 @@ +package youtube + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestParseTimestamp(t *testing.T) { + type testCase struct { + input string + output time.Time + } + + for _, testCase := range []testCase{ + {input: "2025-07-09T11:49:53Z", output: time.Date(2025, 7, 9, 11, 49, 53, 0, time.UTC)}, + } { + t.Run(testCase.input, func(t *testing.T) { + r, err := ParseTimestamp(testCase.input) + require.NoError(t, err) + require.Equal(t, testCase.output, r.UTC()) + }) + } +} diff --git a/pkg/streamd/chat.go b/pkg/streamd/chat.go index e36cfe3..02a316a 100644 --- a/pkg/streamd/chat.go +++ b/pkg/streamd/chat.go @@ -55,7 +55,7 @@ func (d *StreamD) startListeningForChatMessages( if err := d.ChatMessagesStorage.AddMessage(ctx, msg); err != nil { logger.Errorf(ctx, "unable to add the message %#+v to the chat messages storage: %v", msg, err) } - d.publishEvent(ctx, msg) + publishEvent(ctx, d.EventBus, msg) } } }) @@ -113,7 +113,7 @@ func (d *StreamD) SubscribeToChatMessages( defer func() { logger.Tracef(ctx, "/SubscribeToChatMessages(ctx, %v, %v): %p %v", since, limit, _ret, _err) }() return eventSubToChan( - ctx, d, + ctx, d.EventBus, 1000, func(ctx context.Context, outCh chan api.ChatMessage) { logger.Tracef(ctx, "backfilling the channel") defer func() { logger.Tracef(ctx, "/backfilling the channel") }() diff --git a/pkg/streamd/client/client.go b/pkg/streamd/client/client.go index 6488ca1..9471068 100644 --- a/pkg/streamd/client/client.go +++ b/pkg/streamd/client/client.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "crypto" + "crypto/tls" "encoding/json" "errors" "fmt" @@ -42,7 +43,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/backoff" "google.golang.org/grpc/codes" - "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/credentials" "google.golang.org/grpc/stats" "google.golang.org/grpc/status" ) @@ -163,7 +164,9 @@ func (c *Client) connect( ) (*grpc.ClientConn, error) { opts := []grpc.DialOption{ grpc.WithTransportCredentials( - insecure.NewCredentials(), + credentials.NewTLS(&tls.Config{ + InsecureSkipVerify: true, + }), ), grpc.WithConnectParams(grpc.ConnectParams{ Backoff: backoff.Config{ diff --git a/pkg/streamd/events.go b/pkg/streamd/events.go index abd8cca..5135389 100644 --- a/pkg/streamd/events.go +++ b/pkg/streamd/events.go @@ -4,11 +4,11 @@ import ( "context" "encoding/json" "fmt" - "sync" "time" "github.com/davecgh/go-spew/spew" "github.com/facebookincubator/go-belt/tool/logger" + "github.com/xaionaro-go/eventbus" "github.com/xaionaro-go/observability" "github.com/xaionaro-go/streamctl/pkg/expression" "github.com/xaionaro-go/streamctl/pkg/streamd/api" @@ -58,6 +58,19 @@ func (d *StreamD) submitEvent( return nil } +func publishEvent[E any]( + ctx context.Context, + bus *eventbus.EventBus, + event E, +) { + logger.Debugf(ctx, "publishEvent[%T](ctx, %#+v)", event, event) + defer logger.Debugf(ctx, "/publishEvent[%T](ctx, %#+v)", event, event) + result := eventbus.SendEvent(ctx, bus, event) + if result.DropCountImmediate != 0 || result.DropCountDeferred != 0 { + logger.Warnf(ctx, "unable to deliver the event to some of the subscriptions: %d + %d", result.DropCountImmediate, result.DropCountDeferred) + } +} + func (d *StreamD) doAction( ctx context.Context, a action.Action, @@ -92,118 +105,95 @@ func (d *StreamD) doAction( func eventSubToChan[T any]( ctx context.Context, - d *StreamD, + eventBus *eventbus.EventBus, + queueSize uint, onReady func(ctx context.Context, outCh chan T), ) (<-chan T, error) { - var sample T - logger.Debugf(ctx, "eventSubToChan[%T]", sample) - defer func() { logger.Debugf(ctx, "/eventSubToChan[%T]", sample) }() - - topic := eventTopic(sample) - return eventSubToChanUsingTopic(ctx, d, onReady, topic) + var topic T + logger.Debugf(ctx, "eventSubToChan[%T]", topic) + defer func() { logger.Debugf(ctx, "/eventSubToChan[%T]", topic) }() + return eventSubToChanUsingTopic(ctx, eventBus, queueSize, onReady, topic) } -func eventSubToChanUsingTopic[T any]( +func eventSubToChanUsingTopic[T, E any]( ctx context.Context, - d *StreamD, - onReady func(ctx context.Context, outCh chan T), - topic string, -) (<-chan T, error) { - var mutex sync.Mutex - r := make(chan T) - callback := func(in T) { - mutex.Lock() - defer mutex.Unlock() - logger.Tracef(ctx, "eventSubToChanUsingTopic(%T): received %#+v", topic, in) + eventBus *eventbus.EventBus, + queueSize uint, + onReady func(ctx context.Context, outCh chan E), + topic T, +) (<-chan E, error) { + var sample E + logger.Debugf(ctx, "eventSubToChanUsingTopic[%T, %T]", topic, sample) + defer func() { logger.Debugf(ctx, "/eventSubToChanUsingTopic[%T, %T]", topic, sample) }() - select { - case <-ctx.Done(): - return - default: - } - - select { - case r <- in: - case <-time.After(time.Minute): - logger.Errorf(ctx, "unable to notify about '%s': timeout", topic) - } + opts := eventbus.Options{ + eventbus.OptionQueueSize(1), + eventbus.OptionOnOverflow(eventbus.OnOverflowPileUpOrClose(queueSize, 10*time.Second)), + eventbus.OptionOnUnsubscribe[E](func(_ context.Context, sub *eventbus.Subscription[E]) { + logger.Debugf(ctx, "eventSubToChanUsingTopic[%T, %T]: unsubscribed", topic, sample) + }), } - if onReady != nil { - observability.Go(ctx, func(ctx context.Context) { - mutex.Lock() - defer mutex.Unlock() - err := d.EventBus.SubscribeAsync(topic, callback, true) - if err != nil { - logger.Errorf(ctx, "unable to subscribe: %v", err) - return - } - onReady(ctx, r) - }) - } else { - err := d.EventBus.SubscribeAsync(topic, callback, true) - if err != nil { - return nil, fmt.Errorf("unable to subscribe: %w", err) - } + opts = append(opts, + eventbus.OptionOnSubscribed[E](func( + ctx context.Context, + sub *eventbus.Subscription[E], + ) { + onReady(ctx, sub.EventChan()) + }), + ) } - observability.Go(ctx, func(ctx context.Context) { - <-ctx.Done() - - d.EventBus.Unsubscribe(topic, callback) - d.EventBus.WaitAsync() - close(r) - }) - - return r, nil + sub := eventbus.SubscribeWithCustomTopic[T, E](ctx, eventBus, topic, opts...) + return sub.EventChan(), nil } func (d *StreamD) SubscribeToDashboardChanges( ctx context.Context, ) (<-chan api.DiffDashboard, error) { - return eventSubToChan[api.DiffDashboard](ctx, d, nil) + return eventSubToChan[api.DiffDashboard](ctx, d.EventBus, 1000, nil) } func (d *StreamD) SubscribeToConfigChanges( ctx context.Context, ) (<-chan api.DiffConfig, error) { - return eventSubToChan[api.DiffConfig](ctx, d, nil) + return eventSubToChan[api.DiffConfig](ctx, d.EventBus, 1000, nil) } func (d *StreamD) SubscribeToStreamsChanges( ctx context.Context, ) (<-chan api.DiffStreams, error) { - return eventSubToChan[api.DiffStreams](ctx, d, nil) + return eventSubToChan[api.DiffStreams](ctx, d.EventBus, 1000, nil) } func (d *StreamD) SubscribeToStreamServersChanges( ctx context.Context, ) (<-chan api.DiffStreamServers, error) { - return eventSubToChan[api.DiffStreamServers](ctx, d, nil) + return eventSubToChan[api.DiffStreamServers](ctx, d.EventBus, 1000, nil) } func (d *StreamD) SubscribeToStreamDestinationsChanges( ctx context.Context, ) (<-chan api.DiffStreamDestinations, error) { - return eventSubToChan[api.DiffStreamDestinations](ctx, d, nil) + return eventSubToChan[api.DiffStreamDestinations](ctx, d.EventBus, 1000, nil) } func (d *StreamD) SubscribeToIncomingStreamsChanges( ctx context.Context, ) (<-chan api.DiffIncomingStreams, error) { - return eventSubToChan[api.DiffIncomingStreams](ctx, d, nil) + return eventSubToChan[api.DiffIncomingStreams](ctx, d.EventBus, 1000, nil) } func (d *StreamD) SubscribeToStreamForwardsChanges( ctx context.Context, ) (<-chan api.DiffStreamForwards, error) { - return eventSubToChan[api.DiffStreamForwards](ctx, d, nil) + return eventSubToChan[api.DiffStreamForwards](ctx, d.EventBus, 1000, nil) } func (d *StreamD) SubscribeToStreamPlayersChanges( ctx context.Context, ) (<-chan api.DiffStreamPlayers, error) { - return eventSubToChan[api.DiffStreamPlayers](ctx, d, nil) + return eventSubToChan[api.DiffStreamPlayers](ctx, d.EventBus, 1000, nil) } func (d *StreamD) notifyStreamPlayerStart( @@ -213,5 +203,5 @@ func (d *StreamD) notifyStreamPlayerStart( logger.Debugf(ctx, "notifyStreamPlayerStart") defer logger.Debugf(ctx, "/notifyStreamPlayerStart") - d.publishEvent(ctx, api.DiffStreamPlayers{}) + publishEvent(ctx, d.EventBus, api.DiffStreamPlayers{}) } diff --git a/pkg/streamd/events_reflect.go b/pkg/streamd/events_reflect.go deleted file mode 100644 index 834fa5e..0000000 --- a/pkg/streamd/events_reflect.go +++ /dev/null @@ -1,31 +0,0 @@ -package streamd - -import ( - "context" - "fmt" - "reflect" - - "github.com/facebookincubator/go-belt/tool/logger" -) - -type Event interface{} - -func eventTopic( - event Event, -) string { - t := reflect.ValueOf(event).Type() - if t.Kind() == reflect.Pointer { - t = t.Elem() - } - return fmt.Sprintf("type:", t.Name()) -} - -func (d *StreamD) publishEvent( - ctx context.Context, - event Event, -) { - topic := eventTopic(event) - logger.Debugf(ctx, "publishEvent(ctx, %#+v): %s", event, topic) - defer logger.Debugf(ctx, "/publishEvent(ctx, %#+v): %s", event, topic) - d.EventBus.Publish(topic, event) -} diff --git a/pkg/streamd/image_taker.go b/pkg/streamd/image_taker.go index 529df31..76dab10 100644 --- a/pkg/streamd/image_taker.go +++ b/pkg/streamd/image_taker.go @@ -62,8 +62,10 @@ func (d *StreamD) getImageBytes( func (d *StreamD) initImageTaker(ctx context.Context) error { observability.Go(ctx, func(ctx context.Context) { + ctxCh, cancelFn := context.WithCancel(ctx) + defer cancelFn() defer logger.Debugf(ctx, "/imageTaker") - ch, err := d.SubscribeToDashboardChanges(ctx) + ch, restartCh, err := autoResubscribe(ctxCh, d.SubscribeToDashboardChanges) if err != nil { logger.Errorf(ctx, "unable to subscribe to dashboard changes: %v", err) return @@ -72,6 +74,8 @@ func (d *StreamD) initImageTaker(ctx context.Context) error { select { case <-ctx.Done(): return + case <-restartCh: + d.restartImageTaker(ctx) case <-ch: d.restartImageTaker(ctx) } @@ -85,7 +89,9 @@ func (d *StreamD) restartImageTaker(ctx context.Context) error { return xsync.DoA1R1(ctx, &d.imageTakerLocker, d.restartImageTakerNoLock, ctx) } -func (d *StreamD) restartImageTakerNoLock(ctx context.Context) error { +func (d *StreamD) restartImageTakerNoLock(ctx context.Context) (_err error) { + logger.Debugf(ctx, "restartImageTakerNoLock") + defer func() { logger.Debugf(ctx, "/restartImageTakerNoLock: %v", _err) }() if d.imageTakerCancel != nil { d.imageTakerCancel() d.imageTakerCancel = nil diff --git a/pkg/streamd/server/grpc.go b/pkg/streamd/server/grpc.go index eaeba0c..486dc05 100644 --- a/pkg/streamd/server/grpc.go +++ b/pkg/streamd/server/grpc.go @@ -14,6 +14,7 @@ import ( "github.com/goccy/go-yaml" "github.com/google/uuid" "github.com/hashicorp/go-multierror" + "github.com/xaionaro-go/observability" "github.com/xaionaro-go/player/pkg/player/protobuf/go/player_grpc" "github.com/xaionaro-go/streamctl/pkg/streamcontrol" "github.com/xaionaro-go/streamctl/pkg/streamd" @@ -1649,21 +1650,37 @@ func wrapChan[T any, E any]( if err != nil { return err } + errCh := make(chan error, 1) for { var input E + var ok bool select { case <-ctx.Done(): return ctx.Err() - case input = <-ch: + case input, ok = <-ch: + } + if !ok { + return fmt.Errorf("channel is closed") } result := parse(input) - err := sender.Send(&result) - if err != nil { - return fmt.Errorf( - "unable to send %#+v: %w", - result, - err, - ) + sendCtx, cancelFn := context.WithTimeout(ctx, time.Minute) + observability.Go(ctx, func(ctx context.Context) { + errCh <- sender.Send(&result) + }) + select { + case <-sendCtx.Done(): + logger.Warnf(ctx, "sending timed out") + cancelFn() + return sendCtx.Err() + case err := <-errCh: + cancelFn() + if err != nil { + return fmt.Errorf( + "unable to send %#+v: %w", + result, + err, + ) + } } } } diff --git a/pkg/streamd/stream_controller.go b/pkg/streamd/stream_controller.go index a8a6b75..a30b0e2 100644 --- a/pkg/streamd/stream_controller.go +++ b/pkg/streamd/stream_controller.go @@ -10,6 +10,7 @@ import ( "github.com/andreykaipov/goobs" "github.com/andreykaipov/goobs/api/events" "github.com/andreykaipov/goobs/api/events/subscriptions" + "github.com/facebookincubator/go-belt" "github.com/facebookincubator/go-belt/tool/logger" "github.com/hashicorp/go-multierror" "github.com/xaionaro-go/object" @@ -331,6 +332,7 @@ func (d *StreamD) processOBSEvent( } func (d *StreamD) initTwitchBackend(ctx context.Context) error { + ctx = belt.WithField(ctx, "controller", twitch.ID) twitch, err := newTwitch( ctx, d.Config.Backends[twitch.ID], @@ -348,6 +350,7 @@ func (d *StreamD) initTwitchBackend(ctx context.Context) error { } func (d *StreamD) initKickBackend(ctx context.Context) error { + ctx = belt.WithField(ctx, "controller", kick.ID) cacheHashBeforeInit, _ := object.CalcCryptoHash(d.Cache.Kick) kick, err := newKick( kick.CtxWithCache(ctx, &d.Cache.Kick), @@ -373,6 +376,7 @@ func (d *StreamD) initKickBackend(ctx context.Context) error { } func (d *StreamD) initYouTubeBackend(ctx context.Context) error { + ctx = belt.WithField(ctx, "controller", youtube.ID) youTube, err := newYouTube( ctx, d.Config.Backends[youtube.ID], diff --git a/pkg/streamd/streamd.go b/pkg/streamd/streamd.go index 024b958..83a8d6b 100644 --- a/pkg/streamd/streamd.go +++ b/pkg/streamd/streamd.go @@ -11,11 +11,11 @@ import ( "sync/atomic" "time" - eventbus "github.com/asaskevich/EventBus" "github.com/facebookincubator/go-belt" "github.com/facebookincubator/go-belt/tool/experimental/errmon" "github.com/facebookincubator/go-belt/tool/logger" "github.com/hashicorp/go-multierror" + "github.com/xaionaro-go/eventbus" "github.com/xaionaro-go/observability" "github.com/xaionaro-go/player/pkg/player" "github.com/xaionaro-go/streamctl/pkg/chatmessagesstorage" @@ -92,7 +92,7 @@ type StreamD struct { StreamStatusCache *memoize.MemoizeData OBSState OBSState - EventBus eventbus.Bus + EventBus *eventbus.EventBus TimersLocker xsync.Mutex NextTimerID uint64 @@ -259,7 +259,7 @@ func (d *StreamD) secretsProviderUpdater(ctx context.Context) (_err error) { logger.Debugf(ctx, "secretsProviderUpdater") defer logger.Debugf(ctx, "/secretsProviderUpdater: %v", _err) - cfgChangeCh, err := eventSubToChan[api.DiffConfig](ctx, d, nil) + cfgChangeCh, err := eventSubToChan[api.DiffConfig](ctx, d.EventBus, 1000, nil) if err != nil { return fmt.Errorf("unable to subscribe to config changes: %w", err) } @@ -302,7 +302,7 @@ func (d *StreamD) initStreamServer(ctx context.Context) (_err error) { //newBrowserOpenerAdapter(d), ) assert(d.StreamServer != nil) - defer d.publishEvent(ctx, api.DiffStreamServers{}) + defer publishEvent(ctx, d.EventBus, api.DiffStreamServers{}) return d.StreamServer.Init( ctx, sstypes.InitOptionDefaultStreamPlayerOptions(d.streamPlayerOptions()), @@ -566,9 +566,9 @@ func (d *StreamD) setConfig(ctx context.Context, cfg *config.Config) (_ret error } if !dashboardCfgEqual { logger.Debugf(ctx, "dashboard config changed") - d.publishEvent(ctx, api.DiffDashboard{}) + publishEvent(ctx, d.EventBus, api.DiffDashboard{}) } - d.publishEvent(ctx, api.DiffConfig{}) + publishEvent(ctx, d.EventBus, api.DiffConfig{}) }() logger.Debugf(ctx, "SetConfig: %#+v", *cfg) @@ -648,7 +648,7 @@ func (d *StreamD) StartStream( logger.Debugf(ctx, "StartStream(%s)", platID) return xsync.RDoR1(ctx, &d.ControllersLocker, func() error { defer func() { logger.Debugf(ctx, "/StartStream(%s): %v", platID, _err) }() - defer d.publishEvent(ctx, api.DiffStreams{}) + defer publishEvent(ctx, d.EventBus, api.DiffStreams{}) defer func() { d.StreamStatusCache.InvalidateCache(ctx) @@ -776,7 +776,7 @@ func (d *StreamD) EndStream(ctx context.Context, platID streamcontrol.PlatformNa logger.Debugf(ctx, "EndStream(ctx, '%s')", platID) defer logger.Debugf(ctx, "/EndStream(ctx, '%s')", platID) - defer d.publishEvent(ctx, api.DiffStreams{}) + defer publishEvent(ctx, d.EventBus, api.DiffStreams{}) return xsync.RDoR1(ctx, &d.ControllersLocker, func() error { defer d.StreamStatusCache.InvalidateCache(ctx) @@ -902,6 +902,7 @@ func (d *StreamD) streamController( ctx context.Context, platID streamcontrol.PlatformName, ) (streamcontrol.AbstractStreamController, error) { + ctx = belt.WithField(ctx, "controller", platID) var result streamcontrol.AbstractStreamController switch platID { case obs.ID: @@ -978,7 +979,7 @@ func (d *StreamD) SetTitle( platID streamcontrol.PlatformName, title string, ) error { - defer d.publishEvent(ctx, api.DiffStreams{}) + defer publishEvent(ctx, d.EventBus, api.DiffStreams{}) return xsync.RDoR1(ctx, &d.ControllersLocker, func() error { c, err := d.streamController(ctx, platID) @@ -995,7 +996,7 @@ func (d *StreamD) SetDescription( platID streamcontrol.PlatformName, description string, ) error { - defer d.publishEvent(ctx, api.DiffStreams{}) + defer publishEvent(ctx, d.EventBus, api.DiffStreams{}) return xsync.RDoR1(ctx, &d.ControllersLocker, func() error { c, err := d.streamController(ctx, platID) @@ -1018,7 +1019,7 @@ func (d *StreamD) ApplyProfile( profile streamcontrol.AbstractStreamProfile, customArgs ...any, ) error { - defer d.publishEvent(ctx, api.DiffStreams{}) + defer publishEvent(ctx, d.EventBus, api.DiffStreams{}) return xsync.RDoR1(ctx, &d.ControllersLocker, func() error { c, err := d.streamController(d.ctxForController(ctx), platID) @@ -1037,7 +1038,7 @@ func (d *StreamD) UpdateStream( profile streamcontrol.AbstractStreamProfile, customArgs ...any, ) error { - defer d.publishEvent(ctx, api.DiffStreams{}) + defer publishEvent(ctx, d.EventBus, api.DiffStreams{}) return xsync.RDoR1(ctx, &d.ControllersLocker, func() error { err := d.SetTitle(d.ctxForController(ctx), platID, title) @@ -1160,7 +1161,7 @@ func (d *StreamD) StartStreamServer( ) error { logger.Debugf(ctx, "StartStreamServer") defer logger.Debugf(ctx, "/StartStreamServer") - defer d.publishEvent(ctx, api.DiffStreamServers{}) + defer publishEvent(ctx, d.EventBus, api.DiffStreamServers{}) return xsync.DoR1(ctx, &d.StreamServerLocker, func() error { if d.StreamServer == nil { @@ -1204,7 +1205,7 @@ func (d *StreamD) StopStreamServer( ) error { logger.Debugf(ctx, "StopStreamServer") defer logger.Debugf(ctx, "/StopStreamServer") - defer d.publishEvent(ctx, api.DiffStreamServers{}) + defer publishEvent(ctx, d.EventBus, api.DiffStreamServers{}) return xsync.DoR1(ctx, &d.StreamServerLocker, func() error { if d.StreamServer == nil { @@ -1235,7 +1236,7 @@ func (d *StreamD) AddIncomingStream( ) error { logger.Debugf(ctx, "AddIncomingStream") defer logger.Debugf(ctx, "/AddIncomingStream") - defer d.publishEvent(ctx, api.DiffIncomingStreams{}) + defer publishEvent(ctx, d.EventBus, api.DiffIncomingStreams{}) return xsync.DoR1(ctx, &d.StreamServerLocker, func() error { if d.StreamServer == nil { @@ -1261,7 +1262,7 @@ func (d *StreamD) RemoveIncomingStream( ) error { logger.Debugf(ctx, "RemoveIncomingStream") defer logger.Debugf(ctx, "/RemoveIncomingStream") - defer d.publishEvent(ctx, api.DiffIncomingStreams{}) + defer publishEvent(ctx, d.EventBus, api.DiffIncomingStreams{}) return xsync.DoR1(ctx, &d.StreamServerLocker, func() error { if d.StreamServer == nil { @@ -1357,7 +1358,7 @@ func (d *StreamD) AddStreamDestination( ) error { logger.Debugf(ctx, "AddStreamDestination") defer logger.Debugf(ctx, "/AddStreamDestination") - defer d.publishEvent(ctx, api.DiffStreamDestinations{}) + defer publishEvent(ctx, d.EventBus, api.DiffStreamDestinations{}) return xsync.DoR1(ctx, &d.StreamServerLocker, func() error { if d.StreamServer == nil { @@ -1390,7 +1391,7 @@ func (d *StreamD) UpdateStreamDestination( ) error { logger.Debugf(ctx, "UpdateStreamDestination") defer logger.Debugf(ctx, "/UpdateStreamDestination") - defer d.publishEvent(ctx, api.DiffStreamDestinations{}) + defer publishEvent(ctx, d.EventBus, api.DiffStreamDestinations{}) return xsync.DoR1(ctx, &d.StreamServerLocker, func() error { if d.StreamServer == nil { @@ -1421,7 +1422,7 @@ func (d *StreamD) RemoveStreamDestination( ) error { logger.Debugf(ctx, "RemoveStreamDestination") defer logger.Debugf(ctx, "/RemoveStreamDestination") - defer d.publishEvent(ctx, api.DiffStreamDestinations{}) + defer publishEvent(ctx, d.EventBus, api.DiffStreamDestinations{}) return xsync.DoR1(ctx, &d.StreamServerLocker, func() error { if d.StreamServer == nil { @@ -1483,7 +1484,7 @@ func (d *StreamD) AddStreamForward( ) error { logger.Debugf(ctx, "AddStreamForward") defer logger.Debugf(ctx, "/AddStreamForward") - defer d.publishEvent(ctx, api.DiffStreamForwards{}) + defer publishEvent(ctx, d.EventBus, api.DiffStreamForwards{}) return xsync.DoR1(ctx, &d.StreamServerLocker, func() error { if d.StreamServer == nil { @@ -1520,7 +1521,7 @@ func (d *StreamD) UpdateStreamForward( ) (_err error) { logger.Debugf(ctx, "UpdateStreamForward") defer func() { logger.Debugf(ctx, "/UpdateStreamForward: %v", _err) }() - defer d.publishEvent(ctx, api.DiffStreamForwards{}) + defer publishEvent(ctx, d.EventBus, api.DiffStreamForwards{}) return xsync.DoR1(ctx, &d.StreamServerLocker, func() error { if d.StreamServer == nil { @@ -1554,7 +1555,7 @@ func (d *StreamD) RemoveStreamForward( ) error { logger.Debugf(ctx, "RemoveStreamForward") defer logger.Debugf(ctx, "/RemoveStreamForward") - defer d.publishEvent(ctx, api.DiffStreamForwards{}) + defer publishEvent(ctx, d.EventBus, api.DiffStreamForwards{}) return xsync.DoR1(ctx, &d.StreamServerLocker, func() error { if d.StreamServer == nil { @@ -1619,7 +1620,7 @@ func (d *StreamD) AddStreamPlayer( if d.StreamServer == nil { return fmt.Errorf("stream server is not initialized") } - defer d.publishEvent(ctx, api.DiffStreamPlayers{}) + defer publishEvent(ctx, d.EventBus, api.DiffStreamPlayers{}) var result *multierror.Error result = multierror.Append(result, d.StreamServer.AddStreamPlayer( ctx, @@ -1664,7 +1665,7 @@ func (d *StreamD) UpdateStreamPlayer( if d.StreamServer == nil { return fmt.Errorf("stream server is not initialized") } - defer d.publishEvent(ctx, api.DiffStreamPlayers{}) + defer publishEvent(ctx, d.EventBus, api.DiffStreamPlayers{}) var result *multierror.Error result = multierror.Append(result, d.StreamServer.UpdateStreamPlayer( ctx, @@ -1687,7 +1688,7 @@ func (d *StreamD) RemoveStreamPlayer( if d.StreamServer == nil { return fmt.Errorf("stream server is not initialized") } - defer d.publishEvent(ctx, api.DiffStreamPlayers{}) + defer publishEvent(ctx, d.EventBus, api.DiffStreamPlayers{}) var result *multierror.Error result = multierror.Append(result, d.StreamServer.RemoveStreamPlayer( ctx, diff --git a/pkg/streamd/subscribe.go b/pkg/streamd/subscribe.go new file mode 100644 index 0000000..b338e5b --- /dev/null +++ b/pkg/streamd/subscribe.go @@ -0,0 +1,61 @@ +package streamd + +import ( + "context" + "time" + + "github.com/facebookincubator/go-belt/tool/logger" + "github.com/xaionaro-go/observability" +) + +func autoResubscribe[T any]( + ctx context.Context, + fn func(context.Context) (<-chan T, error), +) (<-chan T, <-chan struct{}, error) { + input, err := fn(ctx) + if err != nil { + return nil, nil, err + } + result := make(chan T, 1) + restartCh := make(chan struct{}, 1) + observability.Go(ctx, func(ctx context.Context) { + defer func() { + var sample T + logger.Debugf(ctx, "autoResubscribe[%T] handler is closed", sample) + }() + defer close(result) + defer close(restartCh) + for { + for { + var ( + ev T + ok bool + ) + select { + case <-ctx.Done(): + return + case ev, ok = <-input: + } + if !ok { + logger.Debugf(ctx, "the input channel is closed; reconnect") + break + } + select { + case <-ctx.Done(): + return + case result <- ev: + } + } + for { + input, err = fn(ctx) + if err == nil { + break + } + logger.Warnf(ctx, "unable to reconnect: %w") + time.Sleep(time.Second) + } + restartCh <- struct{}{} + } + }) + return result, restartCh, nil +} diff --git a/pkg/streamd/variables.go b/pkg/streamd/variables.go index 8e47b8a..46f6db1 100644 --- a/pkg/streamd/variables.go +++ b/pkg/streamd/variables.go @@ -6,6 +6,7 @@ import ( "fmt" "github.com/facebookincubator/go-belt/tool/logger" + "github.com/xaionaro-go/eventbus" "github.com/xaionaro-go/streamctl/pkg/streamd/api" "github.com/xaionaro-go/streamctl/pkg/streamd/consts" ) @@ -43,8 +44,10 @@ func (d *StreamD) GetVariableHash( return hash, nil } -func topicForVariable(key consts.VarKey) string { - return fmt.Sprintf("var:%s", key) +type subscriptionTopic string + +func topicForVariable(key consts.VarKey) subscriptionTopic { + return subscriptionTopic(fmt.Sprintf("var:%s", key)) } func (d *StreamD) SetVariable( @@ -55,7 +58,12 @@ func (d *StreamD) SetVariable( logger.Tracef(ctx, "SetVariable(ctx, '%s', value [len == %d])", key, len(value)) defer logger.Tracef(ctx, "/SetVariable(ctx, '%s', value [len == %d])", key, len(value)) d.Variables.Store(key, value) - d.EventBus.Publish(topicForVariable(key), value) + eventbus.SendEventWithCustomTopic( + ctx, + d.EventBus, + topicForVariable(key), + value, + ) return nil } @@ -63,5 +71,10 @@ func (d *StreamD) SubscribeToVariable( ctx context.Context, varKey consts.VarKey, ) (<-chan api.VariableValue, error) { - return eventSubToChanUsingTopic[api.VariableValue](ctx, d, nil, topicForVariable(varKey)) + return eventSubToChanUsingTopic[subscriptionTopic, api.VariableValue]( + ctx, + d.EventBus, 10, + nil, + topicForVariable(varKey), + ) } diff --git a/pkg/streampanel/chat.go b/pkg/streampanel/chat.go index 9e228dc..4d71731 100644 --- a/pkg/streampanel/chat.go +++ b/pkg/streampanel/chat.go @@ -17,8 +17,8 @@ import ( "github.com/xaionaro-go/xsync" ) -const ( - ChatLogSize = 20 +var ( + ChatLogSize = 35 ) type chatUIInterface interface { @@ -67,7 +67,9 @@ func (p *Panel) getChatUIs(ctx context.Context) []chatUIInterface { } func (p *Panel) initChatMessagesHandler(ctx context.Context) error { - msgCh, err := p.StreamD.SubscribeToChatMessages(ctx, time.Now().Add(-7*24*time.Hour), ChatLogSize) + msgCh, _, err := autoResubscribe(ctx, func(ctx context.Context) (<-chan api.ChatMessage, error) { + return p.StreamD.SubscribeToChatMessages(ctx, time.Now().Add(-60*24*time.Hour), uint64(ChatLogSize)) + }) if err != nil { return fmt.Errorf("unable to subscribe to chat messages: %w", err) } diff --git a/pkg/streampanel/chat_android.go b/pkg/streampanel/chat_android.go new file mode 100644 index 0000000..c364bd1 --- /dev/null +++ b/pkg/streampanel/chat_android.go @@ -0,0 +1,8 @@ +//go:build android +// +build android + +package streampanel + +func init() { + ChatLogSize = 20 +} diff --git a/pkg/streampanel/chat_as_text.go b/pkg/streampanel/chat_as_text.go index 8663a29..aa16443 100644 --- a/pkg/streampanel/chat_as_text.go +++ b/pkg/streampanel/chat_as_text.go @@ -127,6 +127,7 @@ func (ui *chatUIAsText) Rebuild( ui.newItem(ctx, itemIdx, msg) } }) + ui.Text.Refresh() if ui.OnAdd != nil { ui.OnAdd(ctx, api.ChatMessage{}) } @@ -153,6 +154,10 @@ func (ui *chatUIAsText) Append( return } ui.newItem(ctx, itemIdx, *msg) + ui.Text.Refresh() + if ui.OnAdd != nil { + ui.OnAdd(ctx, *msg) + } }) } diff --git a/pkg/streampanel/monitor.go b/pkg/streampanel/monitor.go index 0989746..f264ef8 100644 --- a/pkg/streampanel/monitor.go +++ b/pkg/streampanel/monitor.go @@ -221,6 +221,7 @@ func (p *monitorPage) startUpdatingNoLock( } observability.Go(ctx, func(ctx context.Context) { + defer logger.Debugf(ctx, "startUpdatingNoLock: the handler closed") updateData := func() { inStreams, err := streamD.ListIncomingStreams(ctx) if err != nil { @@ -232,12 +233,20 @@ func (p *monitorPage) startUpdatingNoLock( } updateData() - ch, err := streamD.SubscribeToIncomingStreamsChanges(ctx) + ch, restartCh, err := autoResubscribe(ctx, streamD.SubscribeToIncomingStreamsChanges) if err != nil { p.parent().DisplayError(err) return } - for range ch { + for { + var ok bool + select { + case _, ok = <-restartCh: + case _, ok = <-ch: + } + if !ok { + break + } logger.Debugf(ctx, "got event IncomingStreamsChange") updateData() } diff --git a/pkg/streampanel/panel.go b/pkg/streampanel/panel.go index 05e409c..3151588 100644 --- a/pkg/streampanel/panel.go +++ b/pkg/streampanel/panel.go @@ -525,7 +525,9 @@ func (p *Panel) startOAuthListenerForRemoteStreamD( return fmt.Errorf("unable to start listener for OAuth responses: %w", err) } - oauthURLChan, err := streamD.SubscribeToOAuthURLs(ctx, listenPort) + oauthURLChan, restartOAuthURLChan, err := autoResubscribe(ctx, func(ctx context.Context) (<-chan *streamd_grpc.OAuthRequest, error) { + return streamD.SubscribeToOAuthURLs(ctx, listenPort) + }) if err != nil { cancelFn() return fmt.Errorf("unable to subscribe to OAuth requests of streamd: %w", err) @@ -541,6 +543,8 @@ func (p *Panel) startOAuthListenerForRemoteStreamD( select { case <-ctx.Done(): return + case <-restartOAuthURLChan: + continue case req, ok := <-oauthURLChan: logger.Debugf(ctx, "<-oauthURLChan") if !ok { @@ -1864,14 +1868,14 @@ func (p *Panel) subscribeUpdateControlPage(ctx context.Context) { logger.Debugf(ctx, "subscribe to streams and config changes") defer logger.Debugf(ctx, "/subscribe to streams and config changes") - chStreams, err := p.StreamD.SubscribeToStreamsChanges(ctx) + chStreams, restartChStreams, err := autoResubscribe(ctx, p.StreamD.SubscribeToStreamsChanges) if err != nil { p.DisplayError(err) //return } // TODO: deduplicate with localConfigCacheUpdater - chConfigs, err := p.StreamD.SubscribeToConfigChanges(ctx) + chConfigs, restartChConfigs, err := autoResubscribe(ctx, p.StreamD.SubscribeToConfigChanges) if err != nil { p.DisplayError(err) //return @@ -1880,16 +1884,23 @@ func (p *Panel) subscribeUpdateControlPage(ctx context.Context) { p.getUpdatedStatus(ctx) observability.Go(ctx, func(ctx context.Context) { + defer logger.Debugf(ctx, "subscribeUpdateControlPage: the handler closed") t := time.NewTicker(time.Second * 5) defer t.Stop() for { + var ok bool select { case <-ctx.Done(): return - case <-chStreams: - case <-chConfigs: + case _, ok = <-chStreams: + case _, ok = <-restartChStreams: + case _, ok = <-chConfigs: + case _, ok = <-restartChConfigs: case <-t.C: } + if !ok { + return + } p.getUpdatedStatus(ctx) } }) @@ -2404,7 +2415,7 @@ func (p *Panel) localConfigCacheUpdater(ctx context.Context) (_err error) { logger.Debugf(ctx, "localConfigCacheUpdater") defer logger.Debugf(ctx, "/localConfigCacheUpdater: %v", _err) - cfgChangeCh, err := p.StreamD.SubscribeToConfigChanges(ctx) + cfgChangeCh, restartCfgChangeCh, err := autoResubscribe(ctx, p.StreamD.SubscribeToConfigChanges) if err != nil { return fmt.Errorf("unable to subscribe to config changes: %w", err) } @@ -2431,27 +2442,33 @@ func (p *Panel) localConfigCacheUpdater(ctx context.Context) (_err error) { defer logger.Debugf(ctx, "/localConfigUpdaterLoop") for { + var ok bool select { case <-ctx.Done(): return - case <-cfgChangeCh: - newCfg, err := p.StreamD.GetConfig(ctx) - if err != nil { - logger.Errorf(ctx, "unable to get the new config: %v", err) - continue - } - err = newCfg.Convert() - if err != nil { - logger.Errorf(ctx, "unable to convert the config: %v", err) - continue - } - p.configCacheLocker.Do(ctx, func() { - p.configCache = newCfg - }) - logger.Debugf(ctx, "updated the config cache") - observability.SecretsProviderFromCtx(ctx).(*observability.SecretsStaticProvider).ParseSecretsFrom(newCfg) - logger.Debugf(ctx, "updated the secrets") + case _, ok = <-cfgChangeCh: + case _, ok = <-restartCfgChangeCh: } + if !ok { + logger.Errorf(ctx, "the channel is closed") + return + } + newCfg, err := p.StreamD.GetConfig(ctx) + if err != nil { + logger.Errorf(ctx, "unable to get the new config: %v", err) + continue + } + err = newCfg.Convert() + if err != nil { + logger.Errorf(ctx, "unable to convert the config: %v", err) + continue + } + p.configCacheLocker.Do(ctx, func() { + p.configCache = newCfg + }) + logger.Debugf(ctx, "updated the config cache") + observability.SecretsProviderFromCtx(ctx).(*observability.SecretsStaticProvider).ParseSecretsFrom(newCfg) + logger.Debugf(ctx, "updated the secrets") } }) diff --git a/pkg/streampanel/restream.go b/pkg/streampanel/restream.go index a051eb9..1f9758d 100644 --- a/pkg/streampanel/restream.go +++ b/pkg/streampanel/restream.go @@ -59,13 +59,26 @@ func (p *Panel) initRestreamPage( } updateData() - ch, err := p.StreamD.SubscribeToIncomingStreamsChanges(ctx) + ch, restartCh, err := autoResubscribe(ctx, p.StreamD.SubscribeToIncomingStreamsChanges) if err != nil { p.DisplayError(err) return } for range ch { - logger.Debugf(ctx, "got event IncomingStreamsChange") + var ok bool + select { + case _, ok = <-ch: + if ok { + logger.Debugf(ctx, "got event IncomingStreamsChange") + } + case _, ok = <-restartCh: + if ok { + logger.Debugf(ctx, "restarted SubscribeToIncomingStreamsChanges") + } + } + if !ok { + break + } updateData() } }) @@ -81,18 +94,32 @@ func (p *Panel) initRestreamPage( } updateData() - ch, err := p.StreamD.SubscribeToStreamServersChanges(ctx) + ch, restartCh, err := autoResubscribe(ctx, p.StreamD.SubscribeToStreamServersChanges) if err != nil { p.DisplayError(err) return } - for range ch { - logger.Debugf(ctx, "got event StreamServersChange") + for { + var ok bool + select { + case _, ok = <-ch: + if ok { + logger.Debugf(ctx, "got event StreamServersChange") + } + case _, ok = <-restartCh: + if ok { + logger.Debugf(ctx, "restarted SubscribeToStreamServersChanges") + } + } + if !ok { + break + } updateData() } }) observability.Go(ctx, func(ctx context.Context) { + defer logger.Debugf(ctx, "/SubscribeToStreamDestinationsChanges") updateData := func() { dsts, err := p.StreamD.ListStreamDestinations(ctx) if err != nil { @@ -103,18 +130,32 @@ func (p *Panel) initRestreamPage( } updateData() - ch, err := p.StreamD.SubscribeToStreamDestinationsChanges(ctx) + ch, restartCh, err := autoResubscribe(ctx, p.StreamD.SubscribeToStreamDestinationsChanges) if err != nil { p.DisplayError(err) return } - for range ch { - logger.Debugf(ctx, "got event StreamDestinationsChange") + for { + var ok bool + select { + case _, ok = <-ch: + if ok { + logger.Debugf(ctx, "got event StreamDestinationsChange") + } + case _, ok = <-restartCh: + if ok { + logger.Debugf(ctx, "restarted SubscribeToStreamDestinationsChanges") + } + } + if !ok { + break + } updateData() } }) observability.Go(ctx, func(ctx context.Context) { + defer logger.Debugf(ctx, "/SubscribeToStreamForwardsChanges") updateData := func() { streamFwds, err := p.StreamD.ListStreamForwards(ctx) if err != nil { @@ -125,18 +166,32 @@ func (p *Panel) initRestreamPage( } updateData() - ch, err := p.StreamD.SubscribeToStreamForwardsChanges(ctx) + ch, restartCh, err := autoResubscribe(ctx, p.StreamD.SubscribeToStreamForwardsChanges) if err != nil { p.DisplayError(err) return } - for range ch { - logger.Debugf(ctx, "got event StreamForwardsChange") + for { + var ok bool + select { + case _, ok = <-ch: + if ok { + logger.Debugf(ctx, "got event StreamForwardsChange") + } + case _, ok = <-restartCh: + if ok { + logger.Debugf(ctx, "restarted SubscribeToStreamForwardsChanges") + } + } + if !ok { + break + } updateData() } }) observability.Go(ctx, func(ctx context.Context) { + defer logger.Debugf(ctx, "/SubscribeToStreamPlayersChanges") updateData := func() { streamPlayers, err := p.StreamD.ListStreamPlayers(ctx) if err != nil { @@ -147,13 +202,26 @@ func (p *Panel) initRestreamPage( } updateData() - ch, err := p.StreamD.SubscribeToStreamPlayersChanges(ctx) + ch, restartCh, err := autoResubscribe(ctx, p.StreamD.SubscribeToStreamPlayersChanges) if err != nil { p.DisplayError(err) return } - for range ch { - logger.Debugf(ctx, "got event StreamPlayersChange") + for { + var ok bool + select { + case _, ok = <-ch: + if ok { + logger.Debugf(ctx, "got event StreamPlayersChange") + } + case _, ok = <-restartCh: + if ok { + logger.Debugf(ctx, "restarted SubscribeToStreamPlayersChanges") + } + } + if !ok { + break + } updateData() } }) diff --git a/pkg/streampanel/subscribe.go b/pkg/streampanel/subscribe.go new file mode 100644 index 0000000..22776fc --- /dev/null +++ b/pkg/streampanel/subscribe.go @@ -0,0 +1,61 @@ +package streampanel + +import ( + "context" + "time" + + "github.com/facebookincubator/go-belt/tool/logger" + "github.com/xaionaro-go/observability" +) + +func autoResubscribe[T any]( + ctx context.Context, + fn func(context.Context) (<-chan T, error), +) (<-chan T, <-chan struct{}, error) { + input, err := fn(ctx) + if err != nil { + return nil, nil, err + } + result := make(chan T, 1) + restartCh := make(chan struct{}, 1) + observability.Go(ctx, func(ctx context.Context) { + defer func() { + var sample T + logger.Debugf(ctx, "autoResubscribe[%T] handler is closed", sample) + }() + defer close(result) + defer close(restartCh) + for { + for { + var ( + ev T + ok bool + ) + select { + case <-ctx.Done(): + return + case ev, ok = <-input: + } + if !ok { + logger.Debugf(ctx, "the input channel is closed; reconnect") + break + } + select { + case <-ctx.Done(): + return + case result <- ev: + } + } + for { + input, err = fn(ctx) + if err == nil { + break + } + logger.Warnf(ctx, "unable to reconnect: %w") + time.Sleep(time.Second) + } + restartCh <- struct{}{} + } + }) + return result, restartCh, nil +}