Introduce basic support of an P2P-network

This commit is contained in:
Dmitrii Okunev
2024-11-03 20:04:36 +00:00
parent 4284a1d9f7
commit 5d37d9ea90
29 changed files with 2147 additions and 97 deletions

View File

@@ -18,10 +18,12 @@ import (
"github.com/facebookincubator/go-belt/tool/logger"
xlogrus "github.com/facebookincubator/go-belt/tool/logger/implementation/logrus"
"github.com/getsentry/sentry-go"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/sirupsen/logrus"
"github.com/xaionaro-go/streamctl/pkg/consts"
"github.com/xaionaro-go/streamctl/pkg/observability"
"github.com/xaionaro-go/streamctl/pkg/secret"
"github.com/xaionaro-go/streamctl/pkg/streampanel"
"github.com/xaionaro-go/streamctl/pkg/xpath"
)
@@ -159,11 +161,11 @@ func getContext(
ctx = observability.CtxWithLogstash(
ctx,
flags.LogstashAddr,
strings.ToLower(streampanel.AppName),
strings.ToLower(consts.AppName),
)
}
ctx = belt.WithField(ctx, "program", strings.ToLower(streampanel.AppName))
ctx = belt.WithField(ctx, "program", strings.ToLower(consts.AppName))
if hostname, err := os.Hostname(); err == nil {
ctx = belt.WithField(ctx, "hostname", strings.ToLower(hostname))
@@ -184,6 +186,7 @@ func getContext(
logger.Default = func() logger.Logger {
return l
}
log.Logger = &zerolog.Logger{Logger: logger.FromCtx(ctx)}
return ctx
}

View File

@@ -14,7 +14,7 @@ import (
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/goccy/go-yaml"
"github.com/xaionaro-go/streamctl/pkg/streampanel"
"github.com/xaionaro-go/streamctl/pkg/consts"
"github.com/xaionaro-go/streamctl/pkg/xpath"
)
@@ -39,7 +39,7 @@ func getFlagsAndroidFromSysprop(flags *Flags) {
for i := 0; i < sv.NumField(); i++ {
f := sv.Field(i)
t := st.Field(i)
sysPropName := "streaming." + strings.ToLower(streampanel.AppName) + ".flags." + t.Name
sysPropName := "streaming." + strings.ToLower(consts.AppName) + ".flags." + t.Name
value, err := getSystemProperty(sysPropName)
if err != nil {
logger.Errorf(ctx, "unable to get sysprop '%s': %v", sysPropName, err)

68
go.mod
View File

@@ -13,19 +13,19 @@ replace fyne.io/fyne/v2 v2.5.0 => github.com/xaionaro-go/fyne/v2 v2.0.0-20241020
replace code.cloudfoundry.org/bytefmt => github.com/cloudfoundry/bytefmt v0.0.0-20211005130812-5bb3c17173e5
replace github.com/pion/ice/v2 => github.com/aler9/ice/v2 v2.0.0-20241006110309-c973995af023
replace github.com/pion/webrtc/v3 => github.com/aler9/webrtc/v3 v3.0.0-20240610104456-eaec24056d06
replace github.com/jfreymuth/pulse v0.1.1 => github.com/xaionaro-go/pulse v0.0.0-20241023202712-7151fa00d4bb
replace github.com/rs/zerolog v1.33.0 => github.com/xaionaro-go/zerolog2belt v0.0.0-20241103164018-a3bc1ea487e5
replace github.com/pojntfx/weron v0.2.7 => ../../pojntfx/weron
require (
github.com/facebookincubator/go-belt v0.0.0-20240804203001-846c4409d41c
github.com/go-git/go-billy/v5 v5.5.0
github.com/goccy/go-yaml v1.11.3
github.com/hashicorp/go-multierror v1.1.1
github.com/nicklaw5/helix/v2 v2.30.1-0.20240715193454-0151ccccf980
github.com/spf13/cobra v1.8.0
github.com/spf13/cobra v1.8.1
github.com/xaionaro-go/logrustash v0.0.0-20240804141650-d48034780a5f
golang.org/x/oauth2 v0.22.0
google.golang.org/api v0.192.0
@@ -48,6 +48,7 @@ require (
github.com/ProtonMail/go-crypto v1.0.0 // indirect
github.com/RomainMichau/cloudscraper_go v0.4.1 // indirect
github.com/abema/go-mp4 v1.2.0 // indirect
github.com/alecthomas/kong v1.2.1 // indirect
github.com/andybalholm/brotli v1.1.0 // indirect
github.com/asticode/go-astits v1.13.0 // indirect
github.com/benburkert/openpgp v0.0.0-20160410205803-c2471f86866c // indirect
@@ -61,25 +62,31 @@ require (
github.com/cloudflare/circl v1.3.7 // indirect
github.com/cloudwego/base64x v0.1.4 // indirect
github.com/cloudwego/iasm v0.2.0 // indirect
github.com/coreos/go-oidc/v3 v3.11.0 // indirect
github.com/cyphar/filepath-securejoin v0.2.4 // indirect
github.com/datarhei/gosrt v0.7.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/dsnet/compress v0.0.1 // indirect
github.com/ebitengine/purego v0.8.0 // indirect
github.com/emirpasic/gods v1.18.1 // indirect
github.com/fatih/color v1.15.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/fredbi/uri v1.1.0 // indirect
github.com/friendsofgo/errors v0.9.2 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/fyne-io/gl-js v0.0.0-20230506162202-1fdaa286a934 // indirect
github.com/fyne-io/glfw-js v0.0.0-20240101223322-6e1efdc71b7a // indirect
github.com/fyne-io/image v0.0.0-20240417123036-dc0ee9e7c964 // indirect
github.com/gabriel-vasile/mimetype v1.4.3 // indirect
github.com/gen2brain/shm v0.0.0-20230802011745-f2460f5984f7 // indirect
github.com/gin-contrib/pprof v1.5.0 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/gin-gonic/gin v1.10.0 // indirect
github.com/go-git/gcfg v1.5.1-0.20230307220236-3a3c6141e376 // indirect
github.com/go-gl/gl v0.0.0-20231021071112-07e5d0ea2e71 // indirect
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20240506104042-037f3cc74f2a // indirect
github.com/go-gorp/gorp/v3 v3.1.0 // indirect
github.com/go-jose/go-jose/v4 v4.0.4 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-ng/slices v0.0.0-20230703171042-6195d35636a2 // indirect
@@ -90,14 +97,17 @@ require (
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-playground/validator/v10 v10.20.0 // indirect
github.com/go-redis/redis/v8 v8.11.5 // indirect
github.com/go-text/render v0.1.1-0.20240418202334-dd62631dae9b // indirect
github.com/go-text/typesetting v0.1.1 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/godbus/dbus/v5 v5.1.0 // indirect
github.com/golang-jwt/jwt/v4 v4.0.0 // indirect
github.com/gofrs/uuid v4.4.0+incompatible // indirect
github.com/golang-jwt/jwt/v4 v4.2.0 // indirect
github.com/golang-jwt/jwt/v5 v5.2.1 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/google/go-querystring v1.1.0 // indirect
github.com/google/gopacket v1.1.19 // indirect
github.com/google/pprof v0.0.0-20240207164012-fb44976bdcd5 // indirect
github.com/google/s2a-go v0.1.8 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
@@ -119,6 +129,7 @@ require (
github.com/kevinburke/ssh_config v1.2.0 // indirect
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
github.com/lib/pq v1.10.9 // indirect
github.com/lxn/win v0.0.0-20210218163916-a377121e959e // indirect
github.com/matthewhartstonge/argon2 v1.0.1 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
@@ -129,27 +140,40 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/nicksnyder/go-i18n/v2 v2.4.0 // indirect
github.com/nu7hatch/gouuid v0.0.0-20131221200532-179d4d0c4d8d // indirect
github.com/pelletier/go-toml/v2 v2.2.2 // indirect
github.com/pion/ice/v2 v2.3.24 // indirect
github.com/pelletier/go-toml/v2 v2.2.3 // indirect
github.com/pion/ice/v2 v2.3.34 // indirect
github.com/pion/interceptor v0.1.37 // indirect
github.com/pion/logging v0.2.2 // indirect
github.com/pion/rtcp v1.2.14 // indirect
github.com/pion/rtp v1.8.9 // indirect
github.com/pion/sdp/v3 v3.0.9 // indirect
github.com/pjbgf/sha1cd v0.3.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/pojntfx/go-auth-utils v0.1.0 // indirect
github.com/prometheus/client_model v0.6.0 // indirect
github.com/prometheus/common v0.47.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/rubenv/sql-migrate v1.7.0 // indirect
github.com/rymdport/portal v0.2.6 // indirect
github.com/sergi/go-diff v1.3.2-0.20230802210424-5b0b94c5c0d3 // indirect
github.com/skeema/knownhosts v1.2.2 // indirect
github.com/songgao/water v0.0.0-20200317203138-2b4b6d7c09d8 // indirect
github.com/spf13/cast v1.7.0 // indirect
github.com/srwiley/oksvg v0.0.0-20221011165216-be6e8873101c // indirect
github.com/srwiley/rasterx v0.0.0-20220730225603-2ab79fcdd4ef // indirect
github.com/teivah/broadcast v0.1.0 // indirect
github.com/tklauser/go-sysconf v0.3.14 // indirect
github.com/tklauser/numcpus v0.8.0 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.2.12 // indirect
github.com/vishvananda/netlink v1.3.0 // indirect
github.com/vishvananda/netns v0.0.4 // indirect
github.com/volatiletech/inflect v0.0.1 // indirect
github.com/volatiletech/null/v8 v8.1.2 // indirect
github.com/volatiletech/randomize v0.0.1 // indirect
github.com/volatiletech/sqlboiler/v4 v4.16.2 // indirect
github.com/volatiletech/strmangle v0.0.6 // indirect
github.com/wlynxg/anet v0.0.4 // indirect
github.com/xaionaro-go/spinlock v0.0.0-20200518175509-30e6d1ce68a1 // indirect
github.com/xanzy/ssh-agent v0.3.3 // indirect
github.com/xo/terminfo v0.0.0-20210125001918-ca9a967f8778 // indirect
@@ -164,17 +188,17 @@ require (
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/arch v0.8.0 // indirect
golang.org/x/exp v0.0.0-20240213143201-ec583247a57a // indirect
golang.org/x/exp v0.0.0-20240823005443-9b4947da3948 // indirect
golang.org/x/image v0.18.0 // indirect
golang.org/x/mobile v0.0.0-20240404231514-09dbf07665ed // indirect
golang.org/x/mod v0.18.0 // indirect
golang.org/x/mod v0.20.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.26.0 // indirect
golang.org/x/term v0.25.0 // indirect
golang.org/x/text v0.19.0 // indirect
golang.org/x/time v0.6.0 // indirect
golang.org/x/tools v0.22.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
golang.org/x/tools v0.24.0 // indirect
golang.org/x/xerrors v0.0.0-20240716161551-93cc26a95ae9 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142 // indirect
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect
gopkg.in/warnings.v0 v0.1.2 // indirect
@@ -199,7 +223,7 @@ require (
github.com/blang/mpv v0.0.0-20160810175505-d56d7352e068
github.com/bluenviron/gortsplib/v4 v4.11.0
github.com/chai2010/webp v1.1.1
github.com/davecgh/go-spew v1.1.1
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc
github.com/dustin/go-humanize v1.0.1
github.com/ebitengine/oto/v3 v3.3.1
github.com/getsentry/sentry-go v0.28.1
@@ -218,7 +242,9 @@ require (
github.com/klauspost/compress v1.17.7
github.com/lusingander/colorpicker v0.7.3
github.com/pkg/errors v0.9.1
github.com/pojntfx/weron v0.2.7
github.com/prometheus/client_golang v1.18.0
github.com/rs/zerolog v1.33.0
github.com/sethvargo/go-password v0.3.1
github.com/shirou/gopsutil v3.21.11+incompatible
github.com/sirupsen/logrus v1.9.3
@@ -230,7 +256,7 @@ require (
github.com/xaionaro-go/grpcproxy v0.0.0-20241030215807-d4204b934e10
github.com/xaionaro-go/kickcom v0.0.0-20241022142825-25a234cc8628
github.com/xaionaro-go/lockmap v0.0.0-20240901172806-e17aea364748
github.com/xaionaro-go/mediamtx v0.0.0-20241009124606-94c22c603970
github.com/xaionaro-go/mediamtx v0.0.0-20241103200202-882a99e8df73
github.com/xaionaro-go/object v0.0.0-20241026212449-753ce10ec94c
github.com/xaionaro-go/obs-grpc-proxy v0.0.0-20241018162120-5faf4e7a684a
github.com/xaionaro-go/timeapiio v0.0.0-20240915203246-b907cf699af3
@@ -249,14 +275,14 @@ require (
github.com/BurntSushi/xgbutil v0.0.0-20190907113008-ad855c713046
github.com/onsi/gomega v1.30.0 // indirect
github.com/phuslu/goid v1.0.1 // indirect
github.com/pion/datachannel v1.5.6 // indirect
github.com/pion/dtls/v2 v2.2.11 // indirect
github.com/pion/datachannel v1.5.9 // indirect
github.com/pion/dtls/v2 v2.2.12 // indirect
github.com/pion/mdns v0.0.12 // indirect
github.com/pion/randutil v0.1.0 // indirect
github.com/pion/sctp v1.8.16 // indirect
github.com/pion/srtp/v2 v2.0.18 // indirect
github.com/pion/sctp v1.8.33 // indirect
github.com/pion/srtp/v2 v2.0.20 // indirect
github.com/pion/stun v0.6.1 // indirect
github.com/pion/transport/v2 v2.2.5 // indirect
github.com/pion/transport/v2 v2.2.10 // indirect
github.com/pion/turn/v2 v2.1.6 // indirect
github.com/pion/webrtc/v3 v3.2.23 // indirect
github.com/pion/webrtc/v3 v3.3.0 // indirect
)

528
go.sum

File diff suppressed because it is too large Load Diff

3
pkg/consts/consts.go Normal file
View File

@@ -0,0 +1,3 @@
package consts
const AppName = "StreamPanel"

View File

@@ -0,0 +1,50 @@
package main
import (
"context"
"crypto/ed25519"
"crypto/rand"
"github.com/facebookincubator/go-belt"
"github.com/facebookincubator/go-belt/tool/logger"
xlogrus "github.com/facebookincubator/go-belt/tool/logger/implementation/logrus"
"github.com/facebookincubator/go-belt/tool/logger/types"
"github.com/spf13/pflag"
"github.com/xaionaro-go/streamctl/pkg/p2p"
)
func main() {
pflag.Parse()
ctx := logger.CtxWithLogger(context.Background(), xlogrus.Default().WithLevel(logger.LevelTrace))
ctx, cancelFn := context.WithCancel(ctx)
logger.Default = func() types.Logger {
return logger.FromCtx(ctx)
}
defer belt.Flush(ctx)
defer cancelFn()
_, peer0PrivKey, err := ed25519.GenerateKey(rand.Reader)
assertNoError(err)
p2p, err := p2p.NewP2P(
ctx,
peer0PrivKey,
"test",
"xaionaro-void-test",
[]byte("test"),
"",
)
assertNoError(err)
err = p2p.Start(ctx)
assertNoError(err)
<-context.Background().Done()
}
func assertNoError(err error) {
if err != nil {
panic(err)
}
}

View File

@@ -0,0 +1,3 @@
package consts
var SignalerURL = "wss://weron.dx.center/"

View File

@@ -0,0 +1,48 @@
package weron
import (
"fmt"
"io"
"net"
"time"
)
type dummyConn struct {
wrc io.ReadWriteCloser
lAddr net.Addr
}
func connFromReadWriter(
wrc io.ReadWriteCloser,
lAddr net.Addr,
) net.Conn {
return &dummyConn{
wrc: wrc,
lAddr: lAddr,
}
}
func (c *dummyConn) Read(b []byte) (n int, err error) {
return c.wrc.Read(b)
}
func (c *dummyConn) Write(b []byte) (n int, err error) {
return c.wrc.Write(b)
}
func (c *dummyConn) Close() error {
return c.wrc.Close()
}
func (c *dummyConn) LocalAddr() net.Addr {
return c.lAddr
}
func (c *dummyConn) RemoteAddr() net.Addr {
return nil
}
func (c *dummyConn) SetDeadline(t time.Time) error {
return fmt.Errorf("SetDeadline: not implemented")
}
func (c *dummyConn) SetReadDeadline(t time.Time) error {
return fmt.Errorf("SetReadDeadline: not implemented")
}
func (c *dummyConn) SetWriteDeadline(t time.Time) error {
return fmt.Errorf("SetWriteDeadline: not implemented")
}

View File

@@ -0,0 +1,38 @@
package weron
import (
"context"
"net"
"sync/atomic"
)
type dummyListener struct {
conn net.Conn
ctx context.Context
acceptCount atomic.Uint32
}
func newDummyListener(ctx context.Context, conn net.Conn) net.Listener {
return &dummyListener{conn: conn, ctx: ctx}
}
func (l *dummyListener) Accept() (net.Conn, error) {
if l.acceptCount.Add(1) > 1 {
<-l.ctx.Done()
return nil, l.ctx.Err()
}
return l.conn, nil
}
func (l *dummyListener) Close() error {
return nil
}
func (l *dummyListener) Addr() net.Addr {
lAddr := l.conn.LocalAddr()
if lAddr == nil {
panic("l.conn.LocalAddr() is nil")
}
return lAddr
}

View File

@@ -0,0 +1,385 @@
package weron
import (
"context"
"crypto/aes"
"crypto/ed25519"
"crypto/sha512"
"errors"
"fmt"
"net/url"
"runtime"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/pojntfx/weron/pkg/services"
"github.com/pojntfx/weron/pkg/wrtcconn"
"github.com/pojntfx/weron/pkg/wrtcip"
"github.com/xaionaro-go/streamctl/pkg/consts"
"github.com/xaionaro-go/streamctl/pkg/observability"
p2pconsts "github.com/xaionaro-go/streamctl/pkg/p2p/implementations/weron/consts"
"github.com/xaionaro-go/streamctl/pkg/p2p/types"
"github.com/xaionaro-go/streamctl/pkg/secret"
)
const (
salt = "this is not important what is here, important is that this is not an empty (and pretty unique) string"
)
type P2P struct {
startCount atomic.Uint32
locker sync.Mutex
waitGroup sync.WaitGroup
networkID string
peerName string
vpnAdapter *wrtcip.Adapter
conn0Adapter *wrtcconn.Adapter
conn1Adapter *wrtcconn.Adapter
privKey secret.Any[ed25519.PrivateKey]
psk secret.Any[[]byte]
peers map[string]*Peer
cancelFn context.CancelFunc
}
var _ types.P2P = (*P2P)(nil)
func getSignalerURL(
networkID string,
serviceID string,
signallerPassword string,
) (*url.URL, error) {
u, err := url.Parse(p2pconsts.SignalerURL)
if err != nil {
return nil, fmt.Errorf("unable to parse the initial URL: %w", err)
}
q := u.Query()
q.Set("community", networkID+"-"+serviceID)
q.Set("password", signallerPassword)
u.RawQuery = q.Encode()
return u, nil
}
func getICEServers() []string {
// TODO: implement me
return []string{"stun:stun.l.google.com:19302"}
}
var p2pCount atomic.Uint32
func NewP2P(
ctx context.Context,
privKey ed25519.PrivateKey,
peerName string,
networkID string,
psk []byte,
networkCIDR string,
) (*P2P, error) {
if len(psk) != aes.BlockSize {
return nil, fmt.Errorf("expected a Pre-Shared-Key of size 16, received %d", len(psk))
}
h := sha512.New512_256()
h.Write(psk)
h.Write([]byte(salt))
signalerPassword := h.Sum(nil)
p := &P2P{
networkID: networkID,
privKey: secret.New(privKey),
peerName: peerName,
peers: map[string]*Peer{},
}
signalerURLVPN, err := getSignalerURL(
networkID,
"vpn",
string(signalerPassword),
)
if err != nil {
return nil, fmt.Errorf("unable to get a signaler URL for VPN: %w", err)
}
logger.Debugf(ctx, "VPN signaler URL: %s", signalerURLVPN)
signalerURLConn0, err := getSignalerURL(
networkID,
"conn0",
string(signalerPassword),
)
if err != nil {
return nil, fmt.Errorf("unable to get a signaler URL for Conn0: %w", err)
}
logger.Debugf(ctx, "Conn0 signaler URL: %s", signalerURLConn0)
signalerURLConn1, err := getSignalerURL(
networkID,
"conn1",
string(signalerPassword),
)
if err != nil {
return nil, fmt.Errorf("unable to get a signaler URL for Conn1: %w", err)
}
logger.Debugf(ctx, "Conn1 signaler URL: %s", signalerURLConn0)
iceServers := getICEServers()
vpnAdapter := wrtcip.NewAdapter(
signalerURLVPN.String(), // TODO: replace the server-based signaler with a DHT-based signaler
string(psk), // TODO: use ECDH to generate an ephemeral encryption key
iceServers,
&wrtcip.AdapterConfig{
Device: fmt.Sprintf("%s%d", strings.ToLower(consts.AppName), p2pCount.Add(1)),
OnSignalerConnect: p.onVPNSignalerConnect,
OnPeerConnect: p.onVPNPeerConnect,
OnPeerDisconnected: p.onVPNPeerDisconnected,
CIDRs: []string{networkCIDR},
MaxRetries: 255, // TODO: pass this as an option
Parallel: runtime.GOMAXPROCS(0), // TODO: pass this as an option
NamedAdapterConfig: &wrtcconn.NamedAdapterConfig{
AdapterConfig: &wrtcconn.AdapterConfig{
Timeout: 10 * time.Second, // TODO: pass this as an option
ForceRelay: false, // TODO: pass this as an option
OnSignalerReconnect: p.onVPNSignalerReconnect,
},
IDChannel: services.IPID,
Kicks: 5 * time.Second, // TODO: pass this as an option
},
Static: true, // TODO: pass this as an option
},
ctx,
)
p.vpnAdapter = vpnAdapter
conn0Adapter := wrtcconn.NewAdapter(
signalerURLConn0.String(), // TODO: replace the server-based signaler with a DHT-based signaler
string("test"), // TODO: use ECDH to generate an ephemeral encryption key
iceServers,
[]string{"streampanel/connection_bus_0"},
&wrtcconn.AdapterConfig{
Timeout: 10 * time.Second, // TODO: pass this as an option
ID: p.GetPeerID().String(),
ForceRelay: false, // TODO: pass this as an option
OnSignalerReconnect: p.onConn0SignalerReconnect,
},
ctx,
)
p.conn0Adapter = conn0Adapter
conn1Adapter := wrtcconn.NewAdapter(
signalerURLConn1.String(), // TODO: replace the server-based signaler with a DHT-based signaler
string(psk), // TODO: use ECDH to generate an ephemeral encryption key
iceServers,
[]string{"streampanel/connection_bus_1"},
&wrtcconn.AdapterConfig{
Timeout: 10 * time.Second, // TODO: pass this as an option
ID: p.GetPeerID().String(),
ForceRelay: false, // TODO: pass this as an option
OnSignalerReconnect: p.onConn1SignalerReconnect,
},
ctx,
)
p.conn1Adapter = conn1Adapter
return p, nil
}
func (p *P2P) Start(
ctx context.Context,
) (_err error) {
logger.Debugf(ctx, "Start")
defer func() { logger.Debugf(ctx, "/Start: %v", _err) }()
if p.startCount.Add(1) > 1 {
return fmt.Errorf("Start could be called only once")
}
ctx, p.cancelFn = context.WithCancel(ctx)
if err := p.vpnAdapter.Open(); err != nil {
logger.Errorf(ctx, "unable to initialize the VPN: %v", err)
}
myIDChan0, err := p.conn0Adapter.Open()
if err != nil {
return fmt.Errorf("unable to initialize the P2P connection 0 adapter: %w", err)
}
myIDChan1, err := p.conn1Adapter.Open()
if err != nil {
return fmt.Errorf("unable to initialize the P2P connection 1 adapter: %w", err)
}
p.waitGroup.Add(1)
observability.Go(ctx, func() {
defer p.waitGroup.Done()
err := p.acceptPeersLoop(ctx, 0, myIDChan0)
if err != nil && !errors.Is(err, context.Canceled) {
logger.Errorf(ctx, "the peers accepting loop for conn 0 returned an error: %v", err)
}
})
p.waitGroup.Add(1)
observability.Go(ctx, func() {
defer p.waitGroup.Done()
err := p.acceptPeersLoop(ctx, 1, myIDChan1)
if err != nil && !errors.Is(err, context.Canceled) {
logger.Errorf(ctx, "the peers accepting loop for conn 1 returned an error: %v", err)
}
})
return nil
}
func (p *P2P) acceptPeersLoop(
ctx context.Context,
connID int,
myIDChan <-chan string,
) (_err error) {
logger.Debugf(ctx, "acceptPeersLoop")
defer func() { logger.Debugf(ctx, "/acceptPeersLoop: %v", _err) }()
var acceptChan chan *wrtcconn.Peer
switch connID {
case 0:
acceptChan = p.conn0Adapter.Accept()
case 1:
acceptChan = p.conn1Adapter.Accept()
default:
return fmt.Errorf("unexpected conn ID: %d", connID)
}
for {
select {
case <-ctx.Done():
return ctx.Err()
case receivedID := <-myIDChan:
logger.Debugf(ctx, "connected to the signaller with ID %v", receivedID)
receivedID = strings.SplitN(receivedID, "-", 2)[0]
if receivedID != p.GetPeerID().String() {
return fmt.Errorf("received unexpected PeerID as my peer ID: %s != %s", receivedID, p.GetPeerID())
}
case wrtcPeer := <-acceptChan:
_, err := p.newPeer(ctx, connID, wrtcPeer)
if err != nil {
logger.Errorf(ctx, "unable to initialize the peer %s: %v", wrtcPeer.PeerID, err)
}
}
}
}
func (p *P2P) newPeer(
ctx context.Context,
connID int,
wrtcPeer *wrtcconn.Peer,
) (_ret *Peer, _err error) {
logger.Debugf(ctx, "newPeer(ctx, %d, Peer:%s)", connID, wrtcPeer.PeerID)
defer func() { logger.Debugf(ctx, "/newPeer(ctx, %d, Peer:%s): %v", connID, wrtcPeer.PeerID, _err) }()
peerID, err := types.ParsePeerID(wrtcPeer.PeerID)
if err != nil {
return nil, fmt.Errorf("unable to parse the peerID: %w", err)
}
p.locker.Lock()
defer p.locker.Unlock()
peer := p.peers[peerID.String()]
if peer == nil {
peer = newPeer(peerID, p)
p.peers[peerID.String()] = peer
}
isServer := peerID.Less(p.GetPeerID()) == (connID == 0)
if isServer {
if err := peer.initServer(ctx, wrtcPeer); err != nil {
return nil, fmt.Errorf("unable to initialize the peer server handler: %w", err)
}
} else {
if err := peer.initClient(ctx, wrtcPeer); err != nil {
return nil, fmt.Errorf("unable to initialize the peer client handler: %w", err)
}
}
return peer, nil
}
func (p *P2P) removePeer(
peerID types.PeerID,
) (_err error) {
ctx := context.TODO()
logger.Debugf(ctx, "removePeer")
defer func() { logger.Debugf(ctx, "/removePeer: %v", _err) }()
p.locker.Lock()
defer p.locker.Unlock()
peerIDString := peerID.String()
if _, ok := p.peers[peerIDString]; !ok {
return fmt.Errorf("peer %s not found", peerIDString)
}
delete(p.peers, peerIDString)
return nil
}
func (p *P2P) Close() (_err error) {
ctx := context.TODO()
logger.Debugf(ctx, "Close")
defer func() { logger.Debugf(ctx, "/Close: %v", _err) }()
return p.vpnAdapter.Close()
}
func (p *P2P) onVPNSignalerConnect(myPeerID string) {
ctx := context.TODO()
logger.Debugf(ctx, "Close")
defer func() { logger.Debugf(ctx, "/Close") }()
}
func (p *P2P) onVPNSignalerReconnect() {
ctx := context.TODO()
logger.Debugf(ctx, "onVPNSignalerReconnect")
defer func() { logger.Debugf(ctx, "/onVPNSignalerReconnect") }()
}
func (p *P2P) onConn0SignalerReconnect() {
ctx := context.TODO()
logger.Debugf(ctx, "onConn0SignalerReconnect")
defer func() { logger.Debugf(ctx, "/onConn0SignalerReconnect") }()
}
func (p *P2P) onConn1SignalerReconnect() {
ctx := context.TODO()
logger.Debugf(ctx, "onConn1SignalerReconnect")
defer func() { logger.Debugf(ctx, "/onConn1SignalerReconnect") }()
}
func (p *P2P) onVPNPeerConnect(peerIDString string) {
ctx := context.TODO()
logger.Debugf(ctx, "onVPNPeerConnect")
defer func() { logger.Debugf(ctx, "/onVPNPeerConnect") }()
}
func (p *P2P) onVPNPeerDisconnected(peerID string) {
ctx := context.TODO()
logger.Debugf(ctx, "onVPNPeerDisconnected")
defer func() { logger.Debugf(ctx, "/onVPNPeerDisconnected") }()
}
func (p *P2P) GetPeerID() types.PeerID {
return types.PeerID(p.privKey.Get().Public().(ed25519.PublicKey))
}
func (p *P2P) GetNetworkID() string {
return p.networkID
}
func (p *P2P) GetPSK() []byte {
return p.psk.Get()
}
func (p *P2P) GetPeers() ([]types.Peer, error) {
result := make([]types.Peer, 0, len(p.peers))
for _, peer := range p.peers {
result = append(result, peer)
}
return result, nil
}

View File

@@ -0,0 +1,70 @@
package weron
import (
"context"
"fmt"
"net"
"github.com/pojntfx/weron/pkg/wrtcconn"
"github.com/xaionaro-go/streamctl/pkg/p2p/types"
)
type Peer struct {
id types.PeerID
name string
network *P2P
*peerServer
*peerClient
}
var _ types.Peer = (*Peer)(nil)
func newPeer(id types.PeerID, network *P2P) *Peer {
return &Peer{
id: id,
network: network,
}
}
func (p *Peer) initServer(
ctx context.Context,
wrtcPeer *wrtcconn.Peer,
) error {
peerServer := newPeerServer(p, wrtcPeer)
if err := peerServer.init(ctx); err != nil {
return err
}
p.peerServer = peerServer
return nil
}
func (p *Peer) initClient(
ctx context.Context,
wrtcPeer *wrtcconn.Peer,
) error {
peerClient := newPeerClient(p, wrtcPeer)
if err := peerClient.init(ctx); err != nil {
return err
}
p.peerClient = peerClient
return nil
}
func (p *Peer) GetID() types.PeerID {
return p.id
}
func (p *Peer) GetName() string {
return p.name
}
func (p *Peer) DialContext(
ctx context.Context,
network string,
addr string,
) (net.Conn, error) {
if p.peerClient == nil {
return nil, fmt.Errorf("the client connection is not initialized yet")
}
return p.peerClient.DialContext(ctx, network, addr)
}

View File

@@ -0,0 +1,114 @@
package weron
import (
"context"
"fmt"
"net"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/pojntfx/weron/pkg/wrtcconn"
"github.com/xaionaro-go/grpcproxy/grpchttpproxy"
"github.com/xaionaro-go/grpcproxy/protobuf/go/proxy_grpc"
"github.com/xaionaro-go/streamctl/pkg/observability"
"github.com/xaionaro-go/streamctl/pkg/p2p/implementations/weron/protobuf/go/p2p_grpc"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
type peerClient struct {
peer *Peer
wrtcPeer *wrtcconn.Peer
grpcConn *grpc.ClientConn
peerClient p2p_grpc.PeerClient
proxyClient proxy_grpc.NetworkProxyClient
cancelFn context.CancelFunc
}
func newPeerClient(
peer *Peer,
wrtcPeer *wrtcconn.Peer,
) *peerClient {
return &peerClient{
peer: peer,
wrtcPeer: wrtcPeer,
}
}
func (p *peerClient) init(
ctx context.Context,
) error {
ctx, cancelFn := context.WithCancel(ctx)
p.cancelFn = cancelFn
observability.Go(ctx, func() {
<-ctx.Done()
if err := p.peer.network.removePeer(p.peer.id); err != nil {
logger.Errorf(ctx, "unable to remove peer '%s': %v", p.peer.id, err)
}
if err := p.wrtcPeer.Conn.Close(); err != nil {
logger.Errorf(ctx, "unable to close peer '%s': %v", p.peer.id, err)
}
})
grpcConn, err := grpc.NewClient(
"0.0.0.0:0",
grpc.WithContextDialer(func(ctx context.Context, s string) (net.Conn, error) {
return connFromReadWriter(p.wrtcPeer.Conn, &net.IPAddr{IP: net.ParseIP("0.0.0.0")}), nil
}),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithStatsHandler(&peerClientStatsHandler{p}),
)
if err != nil {
return fmt.Errorf("unable to create a gRPC client: %w", err)
}
p.grpcConn = grpcConn
p.proxyClient = proxy_grpc.NewNetworkProxyClient(grpcConn)
p.peerClient = p2p_grpc.NewPeerClient(grpcConn)
nameReply, err := p.peerClient.GetName(ctx, &p2p_grpc.GetNameRequest{})
if err != nil {
return fmt.Errorf("unable to request the peer name: %w", err)
}
p.peer.name = nameReply.GetName()
logger.Debugf(ctx, "peer name: %s", p.peer.name)
err = p.Ping(ctx, "")
if err != nil {
return fmt.Errorf("ping failed: %w", err)
}
return nil
}
func (p *peerClient) Ping(
ctx context.Context,
payload string,
) error {
reply, err := p.peerClient.Ping(ctx, &p2p_grpc.PingRequest{
Payload: payload,
})
if err != nil {
return err
}
if reply.GetPayload() != payload {
return fmt.Errorf("the payload did not match: '%s' != '%s'", reply.GetPayload(), payload)
}
return nil
}
func (p *peerClient) Close() error {
p.cancelFn()
return nil
}
func (p *peerClient) DialContext(
ctx context.Context,
network string,
addr string,
) (net.Conn, error) {
return grpchttpproxy.NewDialer(p.proxyClient).DialContext(ctx, "tcp", addr)
}
func (p *peerClient) GRPCConn() *grpc.ClientConn {
return p.grpcConn
}

View File

@@ -0,0 +1,27 @@
package weron
import (
"context"
"github.com/facebookincubator/go-belt/tool/logger"
"google.golang.org/grpc/stats"
)
type peerClientStatsHandler struct {
*peerClient
}
func (p *peerClientStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
logger.Tracef(ctx, "TagRPC(ctx, %#+v)", info)
return ctx
}
func (p *peerClientStatsHandler) HandleRPC(ctx context.Context, stats stats.RPCStats) {
logger.Tracef(ctx, "HandleRPC(ctx, %#+v)", stats)
}
func (p *peerClientStatsHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
logger.Debugf(ctx, "TagConn(ctx, %#+v)", info)
return ctx
}
func (p *peerClientStatsHandler) HandleConn(ctx context.Context, stats stats.ConnStats) {
logger.Debugf(ctx, "HandleConn(ctx, %#+v)", stats)
}

View File

@@ -0,0 +1,77 @@
package weron
import (
"context"
"net"
"sync"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/pojntfx/weron/pkg/wrtcconn"
"github.com/xaionaro-go/grpcproxy/grpcproxyserver"
"github.com/xaionaro-go/grpcproxy/protobuf/go/proxy_grpc"
"github.com/xaionaro-go/streamctl/pkg/observability"
"github.com/xaionaro-go/streamctl/pkg/p2p/implementations/weron/protobuf/go/p2p_grpc"
"google.golang.org/grpc"
)
type peerServer struct {
peer *Peer
waitGroup sync.WaitGroup
wrtcPeer *wrtcconn.Peer
p2p_grpc.UnimplementedPeerServer
}
func newPeerServer(
peer *Peer,
wrtcPeer *wrtcconn.Peer,
) *peerServer {
return &peerServer{
peer: peer,
wrtcPeer: wrtcPeer,
}
}
func (p *peerServer) init(
ctx context.Context,
) error {
ctx, cancelFn := context.WithCancel(ctx)
observability.Go(ctx, func() {
<-ctx.Done()
if err := p.peer.network.removePeer(p.peer.id); err != nil {
logger.Errorf(ctx, "unable to remove peer '%s': %v", p.peer.id, err)
}
if err := p.wrtcPeer.Conn.Close(); err != nil {
logger.Errorf(ctx, "unable to close peer '%s': %v", p.peer.id, err)
}
})
grpcServerListener := newDummyListener(ctx, connFromReadWriter(p.wrtcPeer.Conn, &net.IPAddr{IP: net.ParseIP("0.0.0.0")}))
grpcServer := grpc.NewServer()
proxyServer := grpcproxyserver.New()
proxy_grpc.RegisterNetworkProxyServer(grpcServer, proxyServer)
p2p_grpc.RegisterPeerServer(grpcServer, p)
p.waitGroup.Add(1)
observability.Go(ctx, func() {
defer cancelFn()
defer p.waitGroup.Done()
logger.Infof(ctx, "started the gRPC server at '%s'", grpcServerListener.Addr())
err := grpcServer.Serve(grpcServerListener)
if err != nil {
logger.Errorf(ctx, "unable to serve the gRPC server: %v", err)
}
})
return nil
}
func (p *peerServer) GetName(context.Context, *p2p_grpc.GetNameRequest) (*p2p_grpc.GetNameReply, error) {
return &p2p_grpc.GetNameReply{
Name: p.peer.network.peerName,
}, nil
}
func (p *peerServer) Ping(ctx context.Context, req *p2p_grpc.PingRequest) (*p2p_grpc.PingReply, error) {
return &p2p_grpc.PingReply{
Payload: req.GetPayload(),
}, nil
}

View File

@@ -0,0 +1,7 @@
all: go
go:
protoc --go_out=. --go-grpc_out=. p2p.proto
mv github.com/xaionaro-go/streamctl/pkg/p2p/implementations/weron/protobuf/go .
rm -rf github.com

View File

@@ -0,0 +1,333 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.32.0
// protoc v3.21.12
// source: p2p.proto
package p2p_grpc
import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type PingRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Payload string `protobuf:"bytes,1,opt,name=payload,proto3" json:"payload,omitempty"`
}
func (x *PingRequest) Reset() {
*x = PingRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_p2p_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *PingRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*PingRequest) ProtoMessage() {}
func (x *PingRequest) ProtoReflect() protoreflect.Message {
mi := &file_p2p_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use PingRequest.ProtoReflect.Descriptor instead.
func (*PingRequest) Descriptor() ([]byte, []int) {
return file_p2p_proto_rawDescGZIP(), []int{0}
}
func (x *PingRequest) GetPayload() string {
if x != nil {
return x.Payload
}
return ""
}
type PingReply struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Payload string `protobuf:"bytes,1,opt,name=payload,proto3" json:"payload,omitempty"`
}
func (x *PingReply) Reset() {
*x = PingReply{}
if protoimpl.UnsafeEnabled {
mi := &file_p2p_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *PingReply) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*PingReply) ProtoMessage() {}
func (x *PingReply) ProtoReflect() protoreflect.Message {
mi := &file_p2p_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use PingReply.ProtoReflect.Descriptor instead.
func (*PingReply) Descriptor() ([]byte, []int) {
return file_p2p_proto_rawDescGZIP(), []int{1}
}
func (x *PingReply) GetPayload() string {
if x != nil {
return x.Payload
}
return ""
}
type GetNameRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
}
func (x *GetNameRequest) Reset() {
*x = GetNameRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_p2p_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *GetNameRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*GetNameRequest) ProtoMessage() {}
func (x *GetNameRequest) ProtoReflect() protoreflect.Message {
mi := &file_p2p_proto_msgTypes[2]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use GetNameRequest.ProtoReflect.Descriptor instead.
func (*GetNameRequest) Descriptor() ([]byte, []int) {
return file_p2p_proto_rawDescGZIP(), []int{2}
}
type GetNameReply struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
}
func (x *GetNameReply) Reset() {
*x = GetNameReply{}
if protoimpl.UnsafeEnabled {
mi := &file_p2p_proto_msgTypes[3]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *GetNameReply) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*GetNameReply) ProtoMessage() {}
func (x *GetNameReply) ProtoReflect() protoreflect.Message {
mi := &file_p2p_proto_msgTypes[3]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use GetNameReply.ProtoReflect.Descriptor instead.
func (*GetNameReply) Descriptor() ([]byte, []int) {
return file_p2p_proto_rawDescGZIP(), []int{3}
}
func (x *GetNameReply) GetName() string {
if x != nil {
return x.Name
}
return ""
}
var File_p2p_proto protoreflect.FileDescriptor
var file_p2p_proto_rawDesc = []byte{
0x0a, 0x09, 0x70, 0x32, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x03, 0x70, 0x32, 0x70,
0x22, 0x27, 0x0a, 0x0b, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12,
0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09,
0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x25, 0x0a, 0x09, 0x50, 0x69, 0x6e,
0x67, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61,
0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64,
0x22, 0x10, 0x0a, 0x0e, 0x47, 0x65, 0x74, 0x4e, 0x61, 0x6d, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65,
0x73, 0x74, 0x22, 0x22, 0x0a, 0x0c, 0x47, 0x65, 0x74, 0x4e, 0x61, 0x6d, 0x65, 0x52, 0x65, 0x70,
0x6c, 0x79, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09,
0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x32, 0x67, 0x0a, 0x04, 0x50, 0x65, 0x65, 0x72, 0x12, 0x33,
0x0a, 0x07, 0x47, 0x65, 0x74, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x13, 0x2e, 0x70, 0x32, 0x70, 0x2e,
0x47, 0x65, 0x74, 0x4e, 0x61, 0x6d, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x11,
0x2e, 0x70, 0x32, 0x70, 0x2e, 0x47, 0x65, 0x74, 0x4e, 0x61, 0x6d, 0x65, 0x52, 0x65, 0x70, 0x6c,
0x79, 0x22, 0x00, 0x12, 0x2a, 0x0a, 0x04, 0x50, 0x69, 0x6e, 0x67, 0x12, 0x10, 0x2e, 0x70, 0x32,
0x70, 0x2e, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0e, 0x2e,
0x70, 0x32, 0x70, 0x2e, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x42,
0x55, 0x5a, 0x53, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x78, 0x61,
0x69, 0x6f, 0x6e, 0x61, 0x72, 0x6f, 0x2d, 0x67, 0x6f, 0x2f, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d,
0x63, 0x74, 0x6c, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x70, 0x32, 0x70, 0x2f, 0x69, 0x6d, 0x70, 0x6c,
0x65, 0x6d, 0x65, 0x6e, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2f, 0x77, 0x65, 0x72, 0x6f,
0x6e, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x67, 0x6f, 0x2f, 0x70, 0x32,
0x70, 0x5f, 0x67, 0x72, 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
file_p2p_proto_rawDescOnce sync.Once
file_p2p_proto_rawDescData = file_p2p_proto_rawDesc
)
func file_p2p_proto_rawDescGZIP() []byte {
file_p2p_proto_rawDescOnce.Do(func() {
file_p2p_proto_rawDescData = protoimpl.X.CompressGZIP(file_p2p_proto_rawDescData)
})
return file_p2p_proto_rawDescData
}
var file_p2p_proto_msgTypes = make([]protoimpl.MessageInfo, 4)
var file_p2p_proto_goTypes = []interface{}{
(*PingRequest)(nil), // 0: p2p.PingRequest
(*PingReply)(nil), // 1: p2p.PingReply
(*GetNameRequest)(nil), // 2: p2p.GetNameRequest
(*GetNameReply)(nil), // 3: p2p.GetNameReply
}
var file_p2p_proto_depIdxs = []int32{
2, // 0: p2p.Peer.GetName:input_type -> p2p.GetNameRequest
0, // 1: p2p.Peer.Ping:input_type -> p2p.PingRequest
3, // 2: p2p.Peer.GetName:output_type -> p2p.GetNameReply
1, // 3: p2p.Peer.Ping:output_type -> p2p.PingReply
2, // [2:4] is the sub-list for method output_type
0, // [0:2] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
}
func init() { file_p2p_proto_init() }
func file_p2p_proto_init() {
if File_p2p_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_p2p_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*PingRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_p2p_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*PingReply); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_p2p_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*GetNameRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_p2p_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*GetNameReply); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_p2p_proto_rawDesc,
NumEnums: 0,
NumMessages: 4,
NumExtensions: 0,
NumServices: 1,
},
GoTypes: file_p2p_proto_goTypes,
DependencyIndexes: file_p2p_proto_depIdxs,
MessageInfos: file_p2p_proto_msgTypes,
}.Build()
File_p2p_proto = out.File
file_p2p_proto_rawDesc = nil
file_p2p_proto_goTypes = nil
file_p2p_proto_depIdxs = nil
}

View File

@@ -0,0 +1,133 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
package p2p_grpc
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion7
// PeerClient is the client API for Peer service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type PeerClient interface {
GetName(ctx context.Context, in *GetNameRequest, opts ...grpc.CallOption) (*GetNameReply, error)
Ping(ctx context.Context, in *PingRequest, opts ...grpc.CallOption) (*PingReply, error)
}
type peerClient struct {
cc grpc.ClientConnInterface
}
func NewPeerClient(cc grpc.ClientConnInterface) PeerClient {
return &peerClient{cc}
}
func (c *peerClient) GetName(ctx context.Context, in *GetNameRequest, opts ...grpc.CallOption) (*GetNameReply, error) {
out := new(GetNameReply)
err := c.cc.Invoke(ctx, "/p2p.Peer/GetName", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *peerClient) Ping(ctx context.Context, in *PingRequest, opts ...grpc.CallOption) (*PingReply, error) {
out := new(PingReply)
err := c.cc.Invoke(ctx, "/p2p.Peer/Ping", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// PeerServer is the server API for Peer service.
// All implementations must embed UnimplementedPeerServer
// for forward compatibility
type PeerServer interface {
GetName(context.Context, *GetNameRequest) (*GetNameReply, error)
Ping(context.Context, *PingRequest) (*PingReply, error)
mustEmbedUnimplementedPeerServer()
}
// UnimplementedPeerServer must be embedded to have forward compatible implementations.
type UnimplementedPeerServer struct {
}
func (UnimplementedPeerServer) GetName(context.Context, *GetNameRequest) (*GetNameReply, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetName not implemented")
}
func (UnimplementedPeerServer) Ping(context.Context, *PingRequest) (*PingReply, error) {
return nil, status.Errorf(codes.Unimplemented, "method Ping not implemented")
}
func (UnimplementedPeerServer) mustEmbedUnimplementedPeerServer() {}
// UnsafePeerServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to PeerServer will
// result in compilation errors.
type UnsafePeerServer interface {
mustEmbedUnimplementedPeerServer()
}
func RegisterPeerServer(s *grpc.Server, srv PeerServer) {
s.RegisterService(&_Peer_serviceDesc, srv)
}
func _Peer_GetName_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(GetNameRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(PeerServer).GetName(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/p2p.Peer/GetName",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(PeerServer).GetName(ctx, req.(*GetNameRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Peer_Ping_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(PingRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(PeerServer).Ping(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/p2p.Peer/Ping",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(PeerServer).Ping(ctx, req.(*PingRequest))
}
return interceptor(ctx, in, info, handler)
}
var _Peer_serviceDesc = grpc.ServiceDesc{
ServiceName: "p2p.Peer",
HandlerType: (*PeerServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "GetName",
Handler: _Peer_GetName_Handler,
},
{
MethodName: "Ping",
Handler: _Peer_Ping_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "p2p.proto",
}

View File

@@ -0,0 +1,20 @@
syntax = "proto3";
package p2p;
option go_package = "github.com/xaionaro-go/streamctl/pkg/p2p/implementations/weron/protobuf/go/p2p_grpc";
service Peer {
rpc GetName(GetNameRequest) returns (GetNameReply) {}
rpc Ping(PingRequest) returns (PingReply) {}
}
message PingRequest {
string payload = 1;
}
message PingReply {
string payload = 1;
}
message GetNameRequest {}
message GetNameReply {
string name = 1;
}

29
pkg/p2p/p2p.go Normal file
View File

@@ -0,0 +1,29 @@
package p2p
import (
"context"
"crypto/ed25519"
"crypto/sha1"
"github.com/xaionaro-go/streamctl/pkg/p2p/implementations/weron"
"github.com/xaionaro-go/streamctl/pkg/p2p/types"
)
func NewP2P(
ctx context.Context,
privKey ed25519.PrivateKey,
peerName string,
networkID string,
psk []byte,
networkCIDR string,
) (types.P2P, error) {
pskHash := sha1.Sum(psk)
return weron.NewP2P(
ctx,
privKey,
peerName,
networkID,
pskHash[:16], // TODO; fix this
networkCIDR,
)
}

View File

@@ -0,0 +1,196 @@
//go:build e2e_tests
// +build e2e_tests
package e2e
import (
"context"
"crypto/ed25519"
"crypto/rand"
"fmt"
"io"
"net"
"net/http"
"net/url"
"sync"
"testing"
"time"
"github.com/facebookincubator/go-belt"
"github.com/facebookincubator/go-belt/tool/logger"
xlogrus "github.com/facebookincubator/go-belt/tool/logger/implementation/logrus"
"github.com/pojntfx/weron/pkg/wrtcsgl"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/stretchr/testify/require"
"github.com/xaionaro-go/streamctl/pkg/p2p"
p2pconsts "github.com/xaionaro-go/streamctl/pkg/p2p/implementations/weron/consts"
"github.com/xaionaro-go/streamctl/pkg/p2p/types"
)
const (
signalerAddr = "127.0.0.1:28573"
)
func newSignaler(ctx context.Context) (*wrtcsgl.Signaler, error) {
signaler := wrtcsgl.NewSignaler(
signalerAddr,
"",
"",
&wrtcsgl.SignalerConfig{
Heartbeat: 10 * time.Second,
Cleanup: false,
EphemeralCommunities: true,
APIUsername: "",
APIPassword: "",
OIDCIssuer: "",
OIDCClientID: "",
OnConnect: func(raddr, community string) {
logger.FromCtx(ctx).
WithField("address", raddr).
WithField("community", community).
Infof("connected to a client")
},
OnDisconnect: func(raddr, community string, err any) {
logger.FromCtx(ctx).
WithField("address", raddr).
WithField("community", community).
WithField("error", err).
Infof("disconnected from the client")
},
},
ctx,
)
if err := signaler.Open(); err != nil {
return nil, err
}
return signaler, nil
}
func TestE2E(t *testing.T) {
var wg sync.WaitGroup
ctx := logger.CtxWithLogger(context.Background(), xlogrus.Default().WithLevel(logger.LevelTrace))
ctx, cancelFn := context.WithCancel(ctx)
logger.Default = func() logger.Logger {
return logger.FromCtx(ctx)
}
defer belt.Flush(ctx)
log.Logger = &zerolog.Logger{Logger: logger.FromCtx(ctx)}
signaler, err := newSignaler(ctx)
require.NoError(t, err)
defer signaler.Close()
p2pconsts.SignalerURL = "ws://" + signalerAddr
peer0PubKey, peer0PrivKey, err := ed25519.GenerateKey(rand.Reader)
peer1PubKey, peer1PrivKey, err := ed25519.GenerateKey(rand.Reader)
require.NoError(t, err)
const (
peer0Name = "peer0"
peer1Name = "peer1"
networkID = "network"
psk = "psk"
networkCIDR = "fd51:2eaf:7a4e::/64"
)
p2pPeer0, err := p2p.NewP2P(
ctx,
peer0PrivKey,
peer0Name,
networkID,
[]byte(psk),
networkCIDR,
)
require.NoError(t, err)
err = p2pPeer0.Start(ctx)
require.NoError(t, err)
p2pPeer1, err := p2p.NewP2P(
ctx,
peer1PrivKey,
peer1Name,
networkID,
[]byte(psk),
networkCIDR,
)
require.NoError(t, err)
time.Sleep(1 * time.Second)
err = p2pPeer1.Start(ctx)
require.NoError(t, err)
//p2pPeer0.WaitForPeer(peer1PubKey)
//p2pPeer1.WaitForPeer(peer0PubKey)
time.Sleep(5 * time.Second)
peer0Peers, getPeers0Err := p2pPeer0.GetPeers()
peer1Peers, getPeers1Err := p2pPeer1.GetPeers()
require.NoError(t, getPeers0Err)
require.Len(t, peer0Peers, 1)
require.Equal(t, peer1Name, peer0Peers[0].GetName())
require.Equal(t, types.PeerID(peer1PubKey), peer0Peers[0].GetID())
require.NoError(t, getPeers1Err)
require.Len(t, peer1Peers, 1)
require.Equal(t, peer0Name, peer1Peers[0].GetName())
require.Equal(t, types.PeerID(peer0PubKey), peer1Peers[0].GetID())
t.Run("http", func(t *testing.T) {
var wg sync.WaitGroup
mux := http.NewServeMux()
mux.HandleFunc("GET /somePath/", func(w http.ResponseWriter, r *http.Request) {
fmt.Fprint(w, "OK\n")
})
finalEndpointListener, err := net.ListenTCP("tcp", &net.TCPAddr{
IP: net.ParseIP("127.0.0.1"),
Port: 0,
})
require.NoError(t, err)
wg.Add(1)
go func() {
defer wg.Done()
logger.Debugf(ctx, "the final endpoint server start")
defer logger.Debugf(ctx, "the final endpoint server ended")
logger.Infof(ctx, "started the final endpoint server at '%s'", finalEndpointListener.Addr())
err := http.Serve(finalEndpointListener, mux)
require.Contains(t, err.Error(), "closed network")
}()
httpClient := &http.Client{
Transport: &http.Transport{
DialContext: peer0Peers[0].DialContext,
},
}
u := &url.URL{
Scheme: "http",
Host: finalEndpointListener.Addr().String(),
Path: "/somePath",
}
for i := 0; i < 2; i++ {
resp, err := httpClient.Get(u.String())
require.NoError(t, err)
body, err := io.ReadAll(resp.Body)
require.NoError(t, err)
require.Equal(t, "OK\n", string(body))
}
err = finalEndpointListener.Close()
require.NoError(t, err)
wg.Wait()
})
cancelFn()
wg.Wait()
}

14
pkg/p2p/types/p2p.go Normal file
View File

@@ -0,0 +1,14 @@
package types
import (
"context"
"io"
)
type P2P interface {
io.Closer
Start(context.Context) error
GetNetworkID() string
GetPSK() []byte
GetPeers() ([]Peer, error)
}

12
pkg/p2p/types/peer.go Normal file
View File

@@ -0,0 +1,12 @@
package types
import (
"context"
"net"
)
type Peer interface {
GetID() PeerID
GetName() string
DialContext(ctx context.Context, network string, addr string) (net.Conn, error)
}

31
pkg/p2p/types/peer_id.go Normal file
View File

@@ -0,0 +1,31 @@
package types
import (
"crypto/ed25519"
"encoding/base64"
"fmt"
)
type PeerID ed25519.PublicKey
func (id PeerID) String() string {
return base64.StdEncoding.EncodeToString((ed25519.PublicKey)(id))
}
func (id PeerID) Less(cmp PeerID) bool {
return id.String() < cmp.String()
}
func ParsePeerID(s string) (PeerID, error) {
peerIDBytes, err := base64.StdEncoding.DecodeString(s)
if err != nil {
return nil, fmt.Errorf("unable to base64-decode the peerID: %w", err)
}
peerID := ed25519.PublicKey(peerIDBytes)
if len(peerID) != ed25519.PublicKeySize {
return nil, fmt.Errorf("expected to see an ED25519 public key (size %d), but received a sequence of size %d", ed25519.PublicKeySize, len(peerID))
}
return PeerID(peerID), nil
}

View File

@@ -586,7 +586,7 @@ func (d *StreamD) StartStream(
*profile,
customArgs...)
if err != nil {
return fmt.Errorf("unable to start the stream on Twitch: %w", err)
return fmt.Errorf("unable to start the stream on Kick: %w", err)
}
return nil
case youtube.ID:

View File

@@ -12,6 +12,7 @@ import (
"fyne.io/fyne/v2/container"
"fyne.io/fyne/v2/widget"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/xaionaro-go/streamctl/pkg/consts"
"github.com/xaionaro-go/streamctl/pkg/xsync"
)
@@ -140,7 +141,7 @@ func (p *Panel) DisplayError(err error) {
p.displayErrorWindow.SetContent(textWidget)
return
}
w := p.app.NewWindow(AppName + ": Got an error: " + err.Error())
w := p.app.NewWindow(consts.AppName + ": Got an error: " + err.Error())
resizeWindow(w, fyne.NewSize(400, 300))
w.SetContent(textWidget)

View File

@@ -32,6 +32,7 @@ import (
"github.com/xaionaro-go/obs-grpc-proxy/protobuf/go/obs_grpc"
"github.com/xaionaro-go/streamctl/pkg/autoupdater"
"github.com/xaionaro-go/streamctl/pkg/buildvars"
gconsts "github.com/xaionaro-go/streamctl/pkg/consts"
"github.com/xaionaro-go/streamctl/pkg/oauthhandler"
"github.com/xaionaro-go/streamctl/pkg/observability"
"github.com/xaionaro-go/streamctl/pkg/screenshot"
@@ -54,7 +55,6 @@ import (
"google.golang.org/grpc"
)
const AppName = "StreamPanel"
const youtubeTitleLength = 90
type Profile struct {
@@ -624,7 +624,7 @@ func (p *Panel) newLoadingWindow(ctx context.Context) fyne.Window {
logger.FromCtx(ctx).Debugf("newLoadingWindow")
defer logger.FromCtx(ctx).Debugf("endof newLoadingWindow")
w := p.app.NewWindow(AppName + ": Loading...")
w := p.app.NewWindow(gconsts.AppName + ": Loading...")
w.Show()
return w
@@ -634,7 +634,7 @@ func (p *Panel) newConnectingWindow(ctx context.Context) fyne.Window {
logger.FromCtx(ctx).Debugf("newConnectingWindow")
defer logger.FromCtx(ctx).Debugf("endof newConnectingWindow")
w := p.app.NewWindow(AppName + ": Connecting...")
w := p.app.NewWindow(gconsts.AppName + ": Connecting...")
w.Show()
return w
@@ -734,7 +734,7 @@ func (p *Panel) InputOBSConnectInfo(
ctx context.Context,
cfg *streamcontrol.PlatformConfig[obs.PlatformSpecificConfig, obs.StreamProfile],
) BackendStatusCode {
w := p.app.NewWindow(AppName + ": Input OBS connection info")
w := p.app.NewWindow(gconsts.AppName + ": Input OBS connection info")
resizeWindow(w, fyne.NewSize(600, 200))
hostField := widget.NewEntry()
@@ -930,7 +930,7 @@ func (p *Panel) openBrowser(
waitCh := make(chan struct{})
w := p.app.NewWindow(AppName + ": Browser selection window")
w := p.app.NewWindow(gconsts.AppName + ": Browser selection window")
resizeWindow(w, fyne.NewSize(600, 400))
if reason != "" {
reason += ". "
@@ -980,7 +980,7 @@ func (p *Panel) InputTwitchUserInfo(
ctx context.Context,
cfg *streamcontrol.PlatformConfig[twitch.PlatformSpecificConfig, twitch.StreamProfile],
) BackendStatusCode {
w := p.app.NewWindow(AppName + ": Input Twitch user info")
w := p.app.NewWindow(gconsts.AppName + ": Input Twitch user info")
resizeWindow(w, fyne.NewSize(600, 200))
clientSecretIsBuiltin := buildvars.TwitchClientID != "" && buildvars.TwitchClientSecret != ""
@@ -1063,7 +1063,7 @@ func (p *Panel) InputKickUserInfo(
ctx context.Context,
cfg *streamcontrol.PlatformConfig[kick.PlatformSpecificConfig, kick.StreamProfile],
) BackendStatusCode {
w := p.app.NewWindow(AppName + ": Input Kick user info")
w := p.app.NewWindow(gconsts.AppName + ": Input Kick user info")
resizeWindow(w, fyne.NewSize(600, 200))
channelField := widget.NewEntry()
@@ -1123,7 +1123,7 @@ func (p *Panel) InputYouTubeUserInfo(
ctx context.Context,
cfg *streamcontrol.PlatformConfig[youtube.PlatformSpecificConfig, youtube.StreamProfile],
) BackendStatusCode {
w := p.app.NewWindow(AppName + ": Input YouTube user info")
w := p.app.NewWindow(gconsts.AppName + ": Input YouTube user info")
resizeWindow(w, fyne.NewSize(600, 200))
clientIDField := widget.NewEntry()
@@ -1510,7 +1510,7 @@ func (p *Panel) openSettingsWindowNoLock(
backendEnabled[backendID] = isEnabled
}
w := p.app.NewWindow(AppName + ": Settings")
w := p.app.NewWindow(gconsts.AppName + ": Settings")
resizeWindow(w, fyne.NewSize(400, 900))
if obsCfg, ok := streamDCfg.Backends[obs.ID]; ok {
@@ -2055,7 +2055,7 @@ func (p *Panel) initMainWindow(
logger.Debugf(ctx, "initMainWindow")
defer logger.Debugf(ctx, "/initMainWindow")
w := p.app.NewWindow(AppName)
w := p.app.NewWindow(gconsts.AppName)
p.mainWindow = w
w.SetMaster()
resizeWindow(w, fyne.NewSize(400, 600))
@@ -2880,7 +2880,7 @@ func (p *Panel) cloneProfileWindow(ctx context.Context) fyne.Window {
}
func (p *Panel) deleteProfileWindow(ctx context.Context) fyne.Window {
w := p.app.NewWindow(AppName + ": Delete the profile?")
w := p.app.NewWindow(gconsts.AppName + ": Delete the profile?")
yesButton := widget.NewButton("YES", func() {
err := p.profileDelete(ctx, *p.selectedProfileName)

View File

@@ -16,6 +16,7 @@ import (
"fyne.io/fyne/v2/widget"
"github.com/dustin/go-humanize"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/xaionaro-go/streamctl/pkg/consts"
"github.com/xaionaro-go/streamctl/pkg/observability"
"github.com/xaionaro-go/streamctl/pkg/player"
"github.com/xaionaro-go/streamctl/pkg/streamd/api"
@@ -154,7 +155,7 @@ func (p *Panel) initRestreamPage(
}
func (p *Panel) openAddStreamServerWindow(ctx context.Context) {
w := p.app.NewWindow(AppName + ": Add Stream Server")
w := p.app.NewWindow(consts.AppName + ": Add Stream Server")
resizeWindow(w, fyne.NewSize(400, 300))
currentProtocol := streamtypes.ServerTypeRTMP
@@ -353,7 +354,7 @@ func bwString(
}
func (p *Panel) openAddStreamWindow(ctx context.Context) {
w := p.app.NewWindow(AppName + ": Add incoming stream")
w := p.app.NewWindow(consts.AppName + ": Add incoming stream")
resizeWindow(w, fyne.NewSize(400, 300))
streamIDEntry := widget.NewEntry()
@@ -470,7 +471,7 @@ func (p *Panel) openAddOrEditDestinationWindow(
streamKey string,
) error,
) {
w := p.app.NewWindow(AppName + ": " + title)
w := p.app.NewWindow(consts.AppName + ": " + title)
resizeWindow(w, fyne.NewSize(400, 300))
destinationIDEntry := widget.NewEntry()
@@ -637,7 +638,7 @@ func (p *Panel) openAddOrEditPlayerWindow(
streamPlaybackConfig sptypes.Config,
) error,
) {
w := p.app.NewWindow(AppName + ": " + title)
w := p.app.NewWindow(consts.AppName + ": " + title)
resizeWindow(w, fyne.NewSize(400, 300))
var playerStrs []string
@@ -1008,7 +1009,7 @@ func (p *Panel) openAddOrEditRestreamWindow(
dstID,
fwd,
)
w := p.app.NewWindow(AppName + ": " + title)
w := p.app.NewWindow(consts.AppName + ": " + title)
resizeWindow(w, fyne.NewSize(400, 300))
enabledCheck := widget.NewCheck("Enable", func(b bool) {})

View File

@@ -10,6 +10,7 @@ import (
"fyne.io/fyne/v2/theme"
"fyne.io/fyne/v2/widget"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/xaionaro-go/streamctl/pkg/consts"
"github.com/xaionaro-go/streamctl/pkg/observability"
"github.com/xaionaro-go/streamctl/pkg/serializable"
"github.com/xaionaro-go/streamctl/pkg/serializable/registry"
@@ -45,7 +46,7 @@ func NewTriggerRulesUI(
}
func (ui *triggerRulesUI) openSetupWindow(ctx context.Context) {
w := ui.panel.app.NewWindow(AppName + ": Setup trigger rules")
w := ui.panel.app.NewWindow(consts.AppName + ": Setup trigger rules")
resizeWindow(w, fyne.NewSize(1000, 1000))
var refreshContent func() bool
@@ -142,7 +143,7 @@ func (ui *triggerRulesUI) openAddOrEditSceneRuleWindow(
triggerRule config.TriggerRule,
commitFn func(context.Context, *config.TriggerRule) error,
) {
w := ui.panel.app.NewWindow(AppName + ": " + title)
w := ui.panel.app.NewWindow(consts.AppName + ": " + title)
resizeWindow(w, fyne.NewSize(1000, 1000))
var triggerQueryTypeList []string

View File

@@ -56,7 +56,7 @@ func (wmh *XWindowManagerHandler) WindowFocusChangeChan(ctx context.Context) <-c
clientID, err := ewmh.ActiveWindowGet(wmh.XUtil)
if err != nil {
logger.Errorf(ctx, "unable to get active window: %w", err)
logger.Errorf(ctx, "unable to get active window: %v", err)
continue
}