disco: add ControllerManager

This commit is contained in:
rkonfj
2024-07-13 08:57:44 +08:00
parent 1795187493
commit 3056f2152c
4 changed files with 64 additions and 22 deletions

12
disco/control.go Normal file
View File

@@ -0,0 +1,12 @@
package disco
type Controller interface {
Handle(b []byte)
Name() string
Type() uint8
}
type ControllerManager interface {
Register(Controller)
Unregister(Controller)
}

View File

@@ -75,6 +75,7 @@ func (d *Disco) magic() []byte {
type PeerStore interface {
FindPeer(peer.ID) (*PeerContext, bool)
Peers() []PeerState
}
type PeerContext struct {

View File

@@ -22,24 +22,27 @@ import (
)
var (
_ io.ReadWriter = (*WSConn)(nil)
_ io.ReadWriter = (*WSConn)(nil)
_ ControllerManager = (*WSConn)(nil)
)
type WSConn struct {
*websocket.Conn
server *peer.Peermap
connectedServer string
peerID peer.ID
metadata url.Values
closedSig chan int
datagrams chan *Datagram
peers chan *PeerFindEvent
peersUDPAddrs chan *PeerUDPAddrEvent
nonce byte
stuns []string
activeTime atomic.Int64
writeMutex sync.Mutex
rateLimiter *rate.Limiter
server *peer.Peermap
connectedServer string
peerID peer.ID
metadata url.Values
closedSig chan int
datagrams chan *Datagram
peers chan *PeerFindEvent
peersUDPAddrs chan *PeerUDPAddrEvent
nonce byte
stuns []string
activeTime atomic.Int64
writeMutex sync.Mutex
rateLimiter *rate.Limiter
controllersMutex sync.RWMutex
controllers map[uint8][]Controller
connData chan []byte
connBuf []byte
@@ -135,6 +138,24 @@ func (c *WSConn) ServerURL() string {
return c.connectedServer
}
func (c *WSConn) Register(ctr Controller) {
c.controllersMutex.Lock()
defer c.controllersMutex.Unlock()
c.controllers[ctr.Type()] = append(c.controllers[ctr.Type()], ctr)
}
func (c *WSConn) Unregister(ctr Controller) {
c.controllersMutex.Lock()
defer c.controllersMutex.Unlock()
var filterd []Controller
for _, ct := range c.controllers[ctr.Type()] {
if ct.Name() != ctr.Name() {
filterd = append(filterd, ct)
}
}
c.controllers[ctr.Type()] = filterd
}
func (c *WSConn) dial(ctx context.Context, server string) error {
networkSecret, err := c.server.SecretStore().NetworkSecret()
if err != nil {
@@ -343,6 +364,13 @@ func (c *WSConn) handleEvents(b []byte) {
go c.updateNetworkSecret(secret)
case peer.CONTROL_CONN:
c.connData <- b[1:]
default:
c.controllersMutex.RLock()
ctrs := c.controllers[b[0]]
c.controllersMutex.RUnlock()
for _, ctr := range ctrs {
ctr.Handle(b)
}
}
}
@@ -384,6 +412,7 @@ func DialPeermap(ctx context.Context, server *peer.Peermap, peerID peer.ID, meta
peers: make(chan *PeerFindEvent, 20),
peersUDPAddrs: make(chan *PeerUDPAddrEvent, 20),
connData: make(chan []byte, 128),
controllers: make(map[uint8][]Controller),
}
if err := wsConn.dial(ctx, ""); err != nil {
return nil, err

View File

@@ -185,11 +185,6 @@ func (c *PeerPacketConn) TryLeadDisco(peerID peer.ID) {
}
}
// UDPConn return the os udp socket
func (c *PeerPacketConn) UDPConn() net.PacketConn {
return c.udpConn
}
// ServerStream is the connection stream to the peermap server
func (c *PeerPacketConn) ServerStream() io.ReadWriter {
return c.wsConn
@@ -200,9 +195,14 @@ func (c *PeerPacketConn) ServerURL() string {
return c.wsConn.ServerURL()
}
// Peers return the found peers
func (c *PeerPacketConn) Peers() []disco.PeerState {
return c.udpConn.Peers()
// ControllerManager makes changes attempting to move the current state towards the desired state
func (c *PeerPacketConn) ControllerManager() disco.ControllerManager {
return c.wsConn
}
// PeerStore stores the found peers
func (c *PeerPacketConn) PeerStore() disco.PeerStore {
return c.udpConn
}
// runControlEventLoop events control loop