mirror of
https://github.com/gravitl/netmaker.git
synced 2025-10-06 17:29:15 +08:00
resub on server ping timeout; may not work in multi server enviornment
This commit is contained in:
@@ -22,7 +22,7 @@ import (
|
||||
)
|
||||
|
||||
// ServerKeepalive - stores time of last server keepalive message
|
||||
var KeepaliveReceived time.Time
|
||||
var keepalive chan string = make(chan string)
|
||||
var messageCache = make(map[string]string, 20)
|
||||
|
||||
const lastNodeUpdate = "lnu"
|
||||
@@ -83,7 +83,6 @@ func MessageQueue(ctx context.Context, network string) {
|
||||
cfg.Network = network
|
||||
cfg.ReadConfig()
|
||||
ncutils.Log("daemon started for network:" + network)
|
||||
KeepaliveReceived = time.Now()
|
||||
client := SetupMQTT(&cfg)
|
||||
if cfg.DebugOn {
|
||||
if token := client.Subscribe("#", 0, nil); token.Wait() && token.Error() != nil {
|
||||
@@ -110,6 +109,7 @@ func MessageQueue(ctx context.Context, network string) {
|
||||
ncutils.Log("subscribed to server keepalives")
|
||||
}
|
||||
defer client.Disconnect(250)
|
||||
go MonitorKeepalive(ctx, client, &cfg)
|
||||
go Checkin(ctx, &cfg, network)
|
||||
<-ctx.Done()
|
||||
ncutils.Log("shutting down daemon")
|
||||
@@ -227,56 +227,67 @@ func UpdatePeers(client mqtt.Client, msg mqtt.Message) {
|
||||
cfg.ReadConfig()
|
||||
var shouldReSub = shouldResub(cfg.Node.NetworkSettings.DefaultServerAddrs, peerUpdate.ServerAddrs)
|
||||
if shouldReSub {
|
||||
Resubscribe(client, &cfg)
|
||||
cfg.Node.NetworkSettings.DefaultServerAddrs = peerUpdate.ServerAddrs
|
||||
}
|
||||
file := ncutils.GetNetclientPathSpecific() + cfg.Node.Interface + ".conf"
|
||||
err = wireguard.UpdateWgPeers(file, peerUpdate.Peers)
|
||||
if err != nil {
|
||||
ncutils.Log("error updating wireguard peers" + err.Error())
|
||||
return
|
||||
}
|
||||
ncutils.Log("applyWGQuickConf to " + file)
|
||||
err = wireguard.ApplyWGQuickConf(file)
|
||||
if err != nil {
|
||||
ncutils.Log("error restarting wg after peer update " + err.Error())
|
||||
return
|
||||
if err := config.ModConfig(&cfg.Node); err == nil {
|
||||
Resubscribe(client, &cfg)
|
||||
cfg.Node.NetworkSettings.DefaultServerAddrs = peerUpdate.ServerAddrs
|
||||
} else {
|
||||
ncutils.Log("resub required but mod config failed")
|
||||
}
|
||||
file := ncutils.GetNetclientPathSpecific() + cfg.Node.Interface + ".conf"
|
||||
err = wireguard.UpdateWgPeers(file, peerUpdate.Peers)
|
||||
if err != nil {
|
||||
ncutils.Log("error updating wireguard peers" + err.Error())
|
||||
return
|
||||
}
|
||||
ncutils.Log("applyWGQuickConf to " + file)
|
||||
err = wireguard.ApplyWGQuickConf(file)
|
||||
if err != nil {
|
||||
ncutils.Log("error restarting wg after peer update " + err.Error())
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// MonitorKeepalive - checks time last server keepalive received. If more than 3+ minutes, notify and resubscribe
|
||||
func MonitorKeepalive(ctx context.Context, client mqtt.Client, cfg *config.ClientConfig) {
|
||||
lastUpdate := time.Now()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-time.After(time.Second * 150):
|
||||
if time.Since(lastUpdate) < time.Second*200 { // more than 3+ minutes
|
||||
Resubscribe(client, cfg)
|
||||
}
|
||||
case <-keepalive:
|
||||
lastUpdate = time.Now()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ServerKeepAlive -- handler to react to keepalive messages published by server
|
||||
func ServerKeepAlive(client mqtt.Client, msg mqtt.Message) {
|
||||
if time.Now().Sub(KeepaliveReceived) < time.Second*200 { // more than 3+ minutes
|
||||
|
||||
KeepaliveReceived = time.Now()
|
||||
return
|
||||
}
|
||||
ncutils.Log("server keepalive not recieved in last 3 minutes")
|
||||
///do other stuff
|
||||
serverid := string(msg.Payload())
|
||||
keepalive <- serverid
|
||||
}
|
||||
|
||||
// Resubscribe --- handles resubscribing if needed
|
||||
func Resubscribe(client mqtt.Client, cfg *config.ClientConfig) error {
|
||||
if err := config.ModConfig(&cfg.Node); err == nil {
|
||||
ncutils.Log("resubbing on network " + cfg.Node.Network)
|
||||
client.Disconnect(250)
|
||||
client = SetupMQTT(cfg)
|
||||
if token := client.Subscribe("update/"+cfg.Node.ID, 0, NodeUpdate); token.Wait() && token.Error() != nil {
|
||||
log.Fatal(token.Error())
|
||||
}
|
||||
if cfg.DebugOn {
|
||||
ncutils.Log("subscribed to node updates for node " + cfg.Node.Name + " update/" + cfg.Node.ID)
|
||||
}
|
||||
if token := client.Subscribe("update/peers/"+cfg.Node.ID, 0, UpdatePeers); token.Wait() && token.Error() != nil {
|
||||
log.Fatal(token.Error())
|
||||
}
|
||||
ncutils.Log("finished re subbing")
|
||||
return nil
|
||||
} else {
|
||||
ncutils.Log("could not mod config when re-subbing")
|
||||
return err
|
||||
ncutils.Log("resubbing on network " + cfg.Node.Network)
|
||||
client.Disconnect(250)
|
||||
client = SetupMQTT(cfg)
|
||||
if token := client.Subscribe("update/"+cfg.Node.ID, 0, NodeUpdate); token.Wait() && token.Error() != nil {
|
||||
log.Fatal(token.Error())
|
||||
}
|
||||
if cfg.DebugOn {
|
||||
ncutils.Log("subscribed to node updates for node " + cfg.Node.Name + " update/" + cfg.Node.ID)
|
||||
}
|
||||
if token := client.Subscribe("update/peers/"+cfg.Node.ID, 0, UpdatePeers); token.Wait() && token.Error() != nil {
|
||||
log.Fatal(token.Error())
|
||||
}
|
||||
ncutils.Log("finished re subbing")
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpdateKeys -- updates private key and returns new publickey
|
||||
|
Reference in New Issue
Block a user