diff --git a/cluster/api.go b/cluster/api.go index b8943255..cb9df036 100644 --- a/cluster/api.go +++ b/cluster/api.go @@ -242,7 +242,13 @@ func (a *api) RemoveServer(c echo.Context) error { // @Success 500 {array} Error // @Router /v1/snapshot [get] func (a *api) Snapshot(c echo.Context) error { - data, err := a.cluster.Snapshot() + origin := c.Request().Header.Get("X-Cluster-Origin") + + if origin == a.id { + return Err(http.StatusLoopDetected, "", "breaking circuit") + } + + data, err := a.cluster.Snapshot(origin) if err != nil { a.logger.Debug().WithError(err).Log("Unable to create snaphot") return Err(http.StatusInternalServerError, "", "unable to create snapshot: %s", err.Error()) diff --git a/cluster/client/client.go b/cluster/client/client.go index ce7f8758..c42c648e 100644 --- a/cluster/client/client.go +++ b/cluster/client/client.go @@ -249,8 +249,8 @@ func (c *APIClient) UnsetKV(origin string, key string) error { return err } -func (c *APIClient) Snapshot() (io.ReadCloser, error) { - return c.stream(http.MethodGet, "/v1/snapshot", "", nil, "") +func (c *APIClient) Snapshot(origin string) (io.ReadCloser, error) { + return c.stream(http.MethodGet, "/v1/snapshot", "", nil, origin) } func (c *APIClient) IsReady(origin string) error { diff --git a/cluster/cluster.go b/cluster/cluster.go index b455b05c..e1bc64c6 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -48,7 +48,7 @@ type Cluster interface { Join(origin, id, raftAddress, peerAddress string) error Leave(origin, id string) error // gracefully remove a node from the cluster - Snapshot() (io.ReadCloser, error) + Snapshot(origin string) (io.ReadCloser, error) Shutdown() error @@ -827,10 +827,10 @@ func (c *cluster) Join(origin, id, raftAddress, peerAddress string) error { return nil } -func (c *cluster) Snapshot() (io.ReadCloser, error) { +func (c *cluster) Snapshot(origin string) (io.ReadCloser, error) { if !c.IsRaftLeader() { c.logger.Debug().Log("Not leader, forwarding to leader") - return c.forwarder.Snapshot() + return c.forwarder.Snapshot(origin) } return c.raft.Snapshot() diff --git a/cluster/forwarder/forwarder.go b/cluster/forwarder/forwarder.go index 9c796ab5..9d12d92c 100644 --- a/cluster/forwarder/forwarder.go +++ b/cluster/forwarder/forwarder.go @@ -20,7 +20,7 @@ type Forwarder interface { Join(origin, id, raftAddress, peerAddress string) error Leave(origin, id string) error - Snapshot() (io.ReadCloser, error) + Snapshot(origin string) (io.ReadCloser, error) AddProcess(origin string, config *app.Config) error UpdateProcess(origin string, id app.ProcessID, config *app.Config) error @@ -140,12 +140,12 @@ func (f *forwarder) Leave(origin, id string) error { return client.Leave(origin, id) } -func (f *forwarder) Snapshot() (io.ReadCloser, error) { +func (f *forwarder) Snapshot(origin string) (io.ReadCloser, error) { f.lock.RLock() client := f.client f.lock.RUnlock() - return client.Snapshot() + return client.Snapshot(origin) } func (f *forwarder) AddProcess(origin string, config *app.Config) error { diff --git a/cluster/raft/raft.go b/cluster/raft/raft.go index 36380f9d..4bfd155c 100644 --- a/cluster/raft/raft.go +++ b/cluster/raft/raft.go @@ -2,7 +2,8 @@ package raft import ( "bytes" - "encoding/gob" + "encoding/base64" + "encoding/json" "fmt" "io" gonet "net" @@ -287,7 +288,7 @@ func (rcw *readCloserWrapper) Close() error { type Snapshot struct { Metadata *hcraft.SnapshotMeta - Data []byte + Data string } func (r *raft) Snapshot() (io.ReadCloser, error) { @@ -311,11 +312,11 @@ func (r *raft) Snapshot() (io.ReadCloser, error) { snapshot := Snapshot{ Metadata: metadata, - Data: data, + Data: base64.StdEncoding.EncodeToString(data), } buffer := bytes.Buffer{} - enc := gob.NewEncoder(&buffer) + enc := json.NewEncoder(&buffer) err = enc.Encode(snapshot) if err != nil { return nil, err diff --git a/docs/docs.go b/docs/docs.go index ddd92fde..5da4011d 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -1441,6 +1441,32 @@ const docTemplate = `{ } } }, + "/api/v3/cluster/snapshot": { + "get": { + "security": [ + { + "ApiKeyAuth": [] + } + ], + "description": "Retrieve snapshot of the cluster DB", + "produces": [ + "application/octet-stream" + ], + "tags": [ + "v16.?.?" + ], + "summary": "Retrieve snapshot of the cluster DB", + "operationId": "cluster-3-snapshot", + "responses": { + "200": { + "description": "OK", + "schema": { + "type": "file" + } + } + } + } + }, "/api/v3/config": { "get": { "security": [ diff --git a/docs/swagger.json b/docs/swagger.json index e3e6b87e..00969ca0 100644 --- a/docs/swagger.json +++ b/docs/swagger.json @@ -1433,6 +1433,32 @@ } } }, + "/api/v3/cluster/snapshot": { + "get": { + "security": [ + { + "ApiKeyAuth": [] + } + ], + "description": "Retrieve snapshot of the cluster DB", + "produces": [ + "application/octet-stream" + ], + "tags": [ + "v16.?.?" + ], + "summary": "Retrieve snapshot of the cluster DB", + "operationId": "cluster-3-snapshot", + "responses": { + "200": { + "description": "OK", + "schema": { + "type": "file" + } + } + } + } + }, "/api/v3/config": { "get": { "security": [ diff --git a/docs/swagger.yaml b/docs/swagger.yaml index a73570be..cf42dc9a 100644 --- a/docs/swagger.yaml +++ b/docs/swagger.yaml @@ -3323,6 +3323,22 @@ paths: summary: Add JSON metadata with a process under the given key tags: - v16.?.? + /api/v3/cluster/snapshot: + get: + description: Retrieve snapshot of the cluster DB + operationId: cluster-3-snapshot + produces: + - application/octet-stream + responses: + "200": + description: OK + schema: + type: file + security: + - ApiKeyAuth: [] + summary: Retrieve snapshot of the cluster DB + tags: + - v16.?.? /api/v3/config: get: description: Retrieve the currently active Restreamer configuration diff --git a/http/handler/api/cluster.go b/http/handler/api/cluster.go index a9c307f0..0f14a2f7 100644 --- a/http/handler/api/cluster.go +++ b/http/handler/api/cluster.go @@ -1291,3 +1291,23 @@ func (h *ClusterHandler) ListStoreKV(c echo.Context) error { return c.JSON(http.StatusOK, kvs) } + +// GetSnapshot returns a current snapshot of the cluster DB +// @Summary Retrieve snapshot of the cluster DB +// @Description Retrieve snapshot of the cluster DB +// @Tags v16.?.? +// @ID cluster-3-snapshot +// @Produce application/octet-stream +// @Success 200 {file} byte +// @Security ApiKeyAuth +// @Router /api/v3/cluster/snapshot [get] +func (h *ClusterHandler) GetSnapshot(c echo.Context) error { + r, err := h.cluster.Snapshot("") + if err != nil { + return api.Err(http.StatusInternalServerError, "", "failed to retrieve snapshot: %w", err) + } + + defer r.Close() + + return c.Stream(http.StatusOK, "application/octet-stream", r) +} diff --git a/http/server.go b/http/server.go index a0f93fcd..2c5a4934 100644 --- a/http/server.go +++ b/http/server.go @@ -675,6 +675,8 @@ func (s *server) setRoutesV3(v3 *echo.Group) { if s.v3handler.cluster != nil { v3.GET("/cluster", s.v3handler.cluster.About) + v3.GET("/cluster/snapshot", s.v3handler.cluster.GetSnapshot) + v3.GET("/cluster/db/process", s.v3handler.cluster.ListStoreProcesses) v3.GET("/cluster/db/user", s.v3handler.cluster.ListStoreIdentities) v3.GET("/cluster/db/user/:name", s.v3handler.cluster.ListStoreIdentity)