mirror of
https://github.com/gravitl/netmaker.git
synced 2025-10-07 09:41:37 +08:00
initial commit
This commit is contained in:
@@ -5,7 +5,9 @@ import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"strconv"
|
||||
"sync"
|
||||
@@ -13,6 +15,7 @@ import (
|
||||
|
||||
"github.com/cloverstd/tcping/ping"
|
||||
"github.com/gravitl/netmaker/logger"
|
||||
"github.com/gravitl/netmaker/logic/pro/metrics"
|
||||
"github.com/gravitl/netmaker/models"
|
||||
"github.com/gravitl/netmaker/netclient/auth"
|
||||
"github.com/gravitl/netmaker/netclient/config"
|
||||
@@ -20,13 +23,16 @@ import (
|
||||
"github.com/gravitl/netmaker/tls"
|
||||
)
|
||||
|
||||
var metricsCache = new(sync.Map)
|
||||
|
||||
// Checkin -- go routine that checks for public or local ip changes, publishes changes
|
||||
//
|
||||
// if there are no updates, simply "pings" the server as a checkin
|
||||
func Checkin(ctx context.Context, wg *sync.WaitGroup) {
|
||||
logger.Log(2, "starting checkin goroutine")
|
||||
defer wg.Done()
|
||||
checkin()
|
||||
currentRun := 0
|
||||
checkin(currentRun)
|
||||
ticker := time.NewTicker(time.Second * 60)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
@@ -36,12 +42,16 @@ func Checkin(ctx context.Context, wg *sync.WaitGroup) {
|
||||
return
|
||||
//delay should be configuraable -> use cfg.Node.NetworkSettings.DefaultCheckInInterval ??
|
||||
case <-ticker.C:
|
||||
checkin()
|
||||
currentRun++
|
||||
checkin(currentRun)
|
||||
if currentRun >= 5 {
|
||||
currentRun = 0
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func checkin() {
|
||||
func checkin(currentRun int) {
|
||||
networks, _ := ncutils.GetSystemNetworks()
|
||||
logger.Log(3, "checkin with server(s) for all networks")
|
||||
for _, network := range networks {
|
||||
@@ -104,6 +114,10 @@ func checkin() {
|
||||
}
|
||||
Hello(&nodeCfg)
|
||||
checkCertExpiry(&nodeCfg)
|
||||
if currentRun >= 5 {
|
||||
logger.Log(0, "collecting metrics for node", nodeCfg.Node.Name)
|
||||
publishMetrics(&nodeCfg)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -146,6 +160,78 @@ func Hello(nodeCfg *config.ClientConfig) {
|
||||
}
|
||||
}
|
||||
|
||||
// publishMetrics - publishes the metrics of a given nodecfg
|
||||
func publishMetrics(nodeCfg *config.ClientConfig) {
|
||||
token, err := Authenticate(nodeCfg)
|
||||
if err != nil {
|
||||
logger.Log(1, "failed to authenticate when publishing metrics", err.Error())
|
||||
return
|
||||
}
|
||||
url := "https://" + nodeCfg.Server.API + "/api/nodes/" + nodeCfg.Network + "/" + nodeCfg.Node.ID
|
||||
response, err := API("", http.MethodGet, url, token)
|
||||
if err != nil {
|
||||
logger.Log(1, "failed to read from server during metrics publish", err.Error())
|
||||
return
|
||||
}
|
||||
if response.StatusCode != http.StatusOK {
|
||||
bytes, err := io.ReadAll(response.Body)
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
}
|
||||
logger.Log(0, fmt.Sprintf("%s %s", string(bytes), err.Error()))
|
||||
return
|
||||
}
|
||||
defer response.Body.Close()
|
||||
var nodeGET models.NodeGet
|
||||
if err := json.NewDecoder(response.Body).Decode(&nodeGET); err != nil {
|
||||
logger.Log(0, "failed to decode node when running metrics update", err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
metrics, err := metrics.Collect(nodeCfg.Node.Interface, nodeGET.PeerIDs)
|
||||
if err != nil {
|
||||
logger.Log(0, "failed metric collection for node", nodeCfg.Node.Name, err.Error())
|
||||
}
|
||||
metrics.Network = nodeCfg.Node.Network
|
||||
metrics.NodeName = nodeCfg.Node.Name
|
||||
metrics.NodeID = nodeCfg.Node.ID
|
||||
metrics.IsServer = "no"
|
||||
data, err := json.Marshal(metrics)
|
||||
if err != nil {
|
||||
logger.Log(0, "something went wrong when marshalling metrics data for node", nodeCfg.Node.Name, err.Error())
|
||||
}
|
||||
|
||||
if err = publish(nodeCfg, fmt.Sprintf("metrics/%s", nodeCfg.Node.ID), data, 1); err != nil {
|
||||
logger.Log(0, "error occurred during publishing of metrics on node", nodeCfg.Node.Name, err.Error())
|
||||
logger.Log(0, "aggregating metrics locally until broker connection re-established")
|
||||
val, ok := metricsCache.Load(nodeCfg.Node.ID)
|
||||
if !ok {
|
||||
metricsCache.Store(nodeCfg.Node.ID, data)
|
||||
} else {
|
||||
var oldMetrics models.Metrics
|
||||
err = json.Unmarshal(val.([]byte), &oldMetrics)
|
||||
if err == nil {
|
||||
for k := range oldMetrics.Connectivity {
|
||||
currentMetric := metrics.Connectivity[k]
|
||||
if currentMetric.Latency == 0 {
|
||||
currentMetric.Latency = oldMetrics.Connectivity[k].Latency
|
||||
}
|
||||
currentMetric.Uptime += oldMetrics.Connectivity[k].Uptime
|
||||
currentMetric.TotalTime += oldMetrics.Connectivity[k].TotalTime
|
||||
metrics.Connectivity[k] = currentMetric
|
||||
}
|
||||
newData, err := json.Marshal(metrics)
|
||||
if err == nil {
|
||||
metricsCache.Store(nodeCfg.Node.ID, newData)
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
metricsCache.Delete(nodeCfg.Node.ID)
|
||||
logger.Log(0, "published metrics for node", nodeCfg.Node.Name)
|
||||
}
|
||||
}
|
||||
|
||||
// node cfg is required in order to fetch the traffic keys of that node for encryption
|
||||
func publish(nodeCfg *config.ClientConfig, dest string, msg []byte, qos byte) error {
|
||||
// setup the keys
|
||||
|
Reference in New Issue
Block a user