diff --git a/config/config.go b/config/config.go index 36ce1cbf..06e8ee39 100644 --- a/config/config.go +++ b/config/config.go @@ -37,7 +37,7 @@ type ServerConfig struct { APIConnString string `yaml:"apiconn"` APIHost string `yaml:"apihost"` APIPort string `yaml:"apiport"` - Broker string `yam:"broker"` + Broker string `yaml:"broker"` ServerBrokerEndpoint string `yaml:"serverbrokerendpoint"` BrokerType string `yaml:"brokertype"` EmqxRestEndpoint string `yaml:"emqxrestendpoint"` diff --git a/controllers/hosts.go b/controllers/hosts.go index 1346b654..5458a179 100644 --- a/controllers/hosts.go +++ b/controllers/hosts.go @@ -24,6 +24,10 @@ func hostHandlers(r *mux.Router) { Methods(http.MethodGet) r.HandleFunc("/api/hosts/keys", logic.SecurityCheck(true, http.HandlerFunc(updateAllKeys))). Methods(http.MethodPut) + r.HandleFunc("/api/hosts/sync", logic.SecurityCheck(true, http.HandlerFunc(syncHosts))). + Methods(http.MethodPost) + r.HandleFunc("/api/hosts/upgrade", logic.SecurityCheck(true, http.HandlerFunc(upgradeHosts))). + Methods(http.MethodPost) r.HandleFunc("/api/hosts/{hostid}/keys", logic.SecurityCheck(true, http.HandlerFunc(updateKeys))). Methods(http.MethodPut) r.HandleFunc("/api/hosts/{hostid}/sync", logic.SecurityCheck(true, http.HandlerFunc(syncHost))). @@ -50,11 +54,57 @@ func hostHandlers(r *mux.Router) { r.HandleFunc("/api/v1/auth-register/host", socketHandler) } +// @Summary Requests all the hosts to upgrade their version +// @Router /api/hosts/upgrade [post] +// @Tags Hosts +// @Security oauth +// @Param force query bool false "Force upgrade" +// @Success 200 {string} string "upgrade all hosts request received" +func upgradeHosts(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + + action := models.Upgrade + + if r.URL.Query().Get("force") == "true" { + action = models.ForceUpgrade + } + + user := r.Header.Get("user") + + go func() { + slog.Info("requesting all hosts to upgrade", "user", user) + + hosts, err := logic.GetAllHosts() + if err != nil { + slog.Error("failed to retrieve all hosts", "user", user, "error", err) + return + } + + for _, host := range hosts { + go func(host models.Host) { + hostUpdate := models.HostUpdate{ + Action: action, + Host: host, + } + if err = mq.HostUpdate(&hostUpdate); err != nil { + slog.Error("failed to request host to upgrade", "user", user, "host", host.ID.String(), "error", err) + } else { + slog.Info("host upgrade requested", "user", user, "host", host.ID.String()) + } + }(host) + } + }() + + slog.Info("upgrade all hosts request received", "user", user) + logic.ReturnSuccessResponse(w, r, "upgrade all hosts request received") +} + // @Summary Upgrade a host // @Router /api/hosts/{hostid}/upgrade [put] // @Tags Hosts // @Security oauth // @Param hostid path string true "Host ID" +// @Param force query bool false "Force upgrade" // @Success 200 {string} string "passed message to upgrade host" // @Failure 500 {object} models.ErrorResponse // upgrade host is a handler to send upgrade message to a host @@ -65,7 +115,14 @@ func upgradeHost(w http.ResponseWriter, r *http.Request) { logic.ReturnErrorResponse(w, r, logic.FormatError(err, "notfound")) return } - if err := mq.HostUpdate(&models.HostUpdate{Action: models.Upgrade, Host: *host}); err != nil { + + action := models.Upgrade + + if r.URL.Query().Get("force") == "true" { + action = models.ForceUpgrade + } + + if err := mq.HostUpdate(&models.HostUpdate{Action: action, Host: *host}); err != nil { slog.Error("failed to upgrade host", "error", err) logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal")) return @@ -860,6 +917,44 @@ func updateKeys(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) } +// @Summary Requests all the hosts to pull +// @Router /api/hosts/sync [post] +// @Tags Hosts +// @Security oauth +// @Success 200 {string} string "sync all hosts request received" +func syncHosts(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + + user := r.Header.Get("user") + + go func() { + slog.Info("requesting all hosts to sync", "user", user) + + hosts, err := logic.GetAllHosts() + if err != nil { + slog.Error("failed to retrieve all hosts", "user", user, "error", err) + return + } + + for _, host := range hosts { + go func(host models.Host) { + hostUpdate := models.HostUpdate{ + Action: models.RequestPull, + Host: host, + } + if err = mq.HostUpdate(&hostUpdate); err != nil { + slog.Error("failed to request host to sync", "user", user, "host", host.ID.String(), "error", err) + } else { + slog.Info("host sync requested", "user", user, "host", host.ID.String()) + } + }(host) + } + }() + + slog.Info("sync all hosts request received", "user", user) + logic.ReturnSuccessResponse(w, r, "sync all hosts request received") +} + // @Summary Requests a host to pull // @Router /api/hosts/{hostid}/sync [post] // @Tags Hosts @@ -892,7 +987,7 @@ func syncHost(w http.ResponseWriter, r *http.Request) { } }() - slog.Info("requested host pull", "user", r.Header.Get("user"), "host", host.ID) + slog.Info("requested host pull", "user", r.Header.Get("user"), "host", host.ID.String()) w.WriteHeader(http.StatusOK) } diff --git a/models/host.go b/models/host.go index c6d5eaa3..a1a4c1b9 100644 --- a/models/host.go +++ b/models/host.go @@ -98,6 +98,8 @@ type HostMqAction string const ( // Upgrade - const to request host to update it's client Upgrade HostMqAction = "UPGRADE" + // ForceUpgrade - const for forcing a host to upgrade its client binary + ForceUpgrade HostMqAction = "FORCE_UPGRADE" // SignalHost - const for host signal action SignalHost HostMqAction = "SIGNAL_HOST" // UpdateHost - constant for host update action