mirror of
https://codeberg.org/cunicu/cunicu.git
synced 2025-10-31 04:46:19 +08:00
rewrite event system for control socket
Signed-off-by: Steffen Vogel <post@steffenvogel.de>
This commit is contained in:
@@ -1,16 +1,20 @@
|
||||
package socket
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/pion/ice/v2"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc"
|
||||
ginsecure "google.golang.org/grpc/credentials/insecure"
|
||||
"riasc.eu/wice/pkg/crypto"
|
||||
"riasc.eu/wice/pkg/intf"
|
||||
"riasc.eu/wice/pkg/pb"
|
||||
)
|
||||
|
||||
@@ -21,6 +25,10 @@ type Client struct {
|
||||
grpc *grpc.ClientConn
|
||||
logger *zap.Logger
|
||||
|
||||
connectionStates map[crypto.Key]ice.ConnectionState
|
||||
connectionStatesLock sync.Mutex
|
||||
connectionStatesCond *sync.Cond
|
||||
|
||||
Events chan *pb.Event
|
||||
}
|
||||
|
||||
@@ -59,11 +67,15 @@ func Connect(path string) (*Client, error) {
|
||||
logger := zap.L().Named("socket.client").With(zap.String("path", path))
|
||||
|
||||
client := &Client{
|
||||
SocketClient: pb.NewSocketClient(conn),
|
||||
grpc: conn,
|
||||
logger: logger,
|
||||
Events: make(chan *pb.Event, 100),
|
||||
SocketClient: pb.NewSocketClient(conn),
|
||||
grpc: conn,
|
||||
logger: logger,
|
||||
Events: make(chan *pb.Event, 100),
|
||||
connectionStates: make(map[crypto.Key]ice.ConnectionState),
|
||||
}
|
||||
client.connectionStatesCond = sync.NewCond(&client.connectionStatesLock)
|
||||
|
||||
go client.streamEvents()
|
||||
|
||||
rerr, err := client.UnWait(context.Background(), &pb.UnWaitParams{})
|
||||
if err != nil {
|
||||
@@ -72,8 +84,6 @@ func Connect(path string) (*Client, error) {
|
||||
return nil, fmt.Errorf("received RPC error: %w", rerr)
|
||||
}
|
||||
|
||||
go client.streamEvents()
|
||||
|
||||
return client, nil
|
||||
}
|
||||
|
||||
@@ -84,44 +94,82 @@ func (c *Client) Close() error {
|
||||
}
|
||||
|
||||
func (c *Client) streamEvents() {
|
||||
str, err := c.StreamEvents(context.Background(), &pb.StreamEventsParams{})
|
||||
stream, err := c.StreamEvents(context.Background(), &pb.StreamEventsParams{})
|
||||
if err != nil {
|
||||
c.logger.Error("Failed to stream events", zap.Error(err))
|
||||
}
|
||||
|
||||
ok := true
|
||||
for ok {
|
||||
evt, err := str.Recv()
|
||||
e, err := stream.Recv()
|
||||
if err != nil {
|
||||
c.logger.Error("Failed to receive event", zap.Error(err))
|
||||
break
|
||||
}
|
||||
|
||||
evt.Log(c.logger, "Received event")
|
||||
c.Events <- evt
|
||||
if e.Type == pb.Event_PEER_CONNECTION_STATE_CHANGED {
|
||||
if pcs, ok := e.Event.(*pb.Event_PeerConnectionStateChange); ok {
|
||||
pk := *(*crypto.Key)(e.Peer)
|
||||
cs := pcs.PeerConnectionStateChange.NewState.ConnectionState()
|
||||
|
||||
c.connectionStatesLock.Lock()
|
||||
c.connectionStates[pk] = cs
|
||||
c.connectionStatesCond.Broadcast()
|
||||
c.connectionStatesLock.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
e.Log(c.logger, "Received event")
|
||||
c.Events <- e
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) WaitForEvent(flt *pb.Event) *pb.Event {
|
||||
for evt := range c.Events {
|
||||
if evt.Match(flt) {
|
||||
return evt
|
||||
func (c *Client) WaitForEvent(t pb.Event_Type, intf string, peer crypto.Key) *pb.Event {
|
||||
for e := range c.Events {
|
||||
if e.Type != t {
|
||||
continue
|
||||
}
|
||||
|
||||
if intf != "" && intf != e.Interface {
|
||||
continue
|
||||
}
|
||||
|
||||
if peer.IsSet() && !bytes.Equal(peer.Bytes(), e.Peer) {
|
||||
continue
|
||||
}
|
||||
|
||||
return e
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Client) WaitPeerHandshake(peer crypto.Key) {
|
||||
c.WaitForEvent(&pb.Event{
|
||||
Type: "handshake",
|
||||
State: "new",
|
||||
})
|
||||
func (c *Client) WaitForPeerHandshake(peer crypto.Key) {
|
||||
for {
|
||||
e := c.WaitForEvent(pb.Event_PEER_MODIFIED, "", peer)
|
||||
|
||||
ee, ok := e.Event.(*pb.Event_PeerModified)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
mod := intf.PeerModifier(ee.PeerModified.Modified)
|
||||
if mod.Is(intf.PeerModifiedHandshakeTime) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) WaitPeerConnected() {
|
||||
c.WaitForEvent(&pb.Event{
|
||||
Type: "state",
|
||||
State: "connected",
|
||||
})
|
||||
func (c *Client) WaitForPeerConnectionState(peer crypto.Key, csd ice.ConnectionState) {
|
||||
for {
|
||||
c.connectionStatesLock.Lock()
|
||||
for {
|
||||
if cs, ok := c.connectionStates[peer]; ok && cs == csd {
|
||||
c.connectionStatesLock.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
c.connectionStatesCond.Wait()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user