Add KV store in cluster DB

This commit is contained in:
Ingo Oppermann
2023-06-22 16:20:09 +02:00
parent db00144cab
commit e5f0b3a57f
6 changed files with 425 additions and 80 deletions

View File

@@ -118,6 +118,12 @@ func NewAPI(config APIConfig) (API, error) {
a.router.PUT("/v1/iam/user/:name/policies", a.SetIdentityPolicies) a.router.PUT("/v1/iam/user/:name/policies", a.SetIdentityPolicies)
a.router.DELETE("/v1/iam/user/:name", a.RemoveIdentity) a.router.DELETE("/v1/iam/user/:name", a.RemoveIdentity)
a.router.POST("/v1/lock", a.Lock)
a.router.DELETE("/v1/lock/:name", a.Unlock)
a.router.POST("/v1/kv", a.SetKV)
a.router.DELETE("/v1/kv/:key", a.UnsetKV)
a.router.GET("/v1/core", a.CoreAPIAddress) a.router.GET("/v1/core", a.CoreAPIAddress)
return a, nil return a, nil
@@ -151,7 +157,7 @@ func (a *api) AddServer(c echo.Context) error {
r := client.JoinRequest{} r := client.JoinRequest{}
if err := util.ShouldBindJSON(c, &r); err != nil { if err := util.ShouldBindJSON(c, &r); err != nil {
return Err(http.StatusBadRequest, "Invalid JSON", "%s", err) return Err(http.StatusBadRequest, "Invalid JSON", "%s", err.Error())
} }
a.logger.Debug().WithFields(log.Fields{ a.logger.Debug().WithFields(log.Fields{
@@ -168,7 +174,7 @@ func (a *api) AddServer(c echo.Context) error {
err := a.cluster.Join(origin, r.ID, r.RaftAddress, "") err := a.cluster.Join(origin, r.ID, r.RaftAddress, "")
if err != nil { if err != nil {
a.logger.Debug().WithError(err).WithField("id", r.ID).Log("Unable to join cluster") a.logger.Debug().WithError(err).WithField("id", r.ID).Log("Unable to join cluster")
return Err(http.StatusInternalServerError, "unable to join cluster", "%s", err) return Err(http.StatusInternalServerError, "unable to join cluster", "%s", err.Error())
} }
return c.JSON(http.StatusOK, "OK") return c.JSON(http.StatusOK, "OK")
@@ -202,7 +208,7 @@ func (a *api) RemoveServer(c echo.Context) error {
err := a.cluster.Leave(origin, id) err := a.cluster.Leave(origin, id)
if err != nil { if err != nil {
a.logger.Debug().WithError(err).WithField("id", id).Log("Unable to leave cluster") a.logger.Debug().WithError(err).WithField("id", id).Log("Unable to leave cluster")
return Err(http.StatusInternalServerError, "unable to leave cluster", "%s", err) return Err(http.StatusInternalServerError, "unable to leave cluster", "%s", err.Error())
} }
return c.JSON(http.StatusOK, "OK") return c.JSON(http.StatusOK, "OK")
@@ -221,7 +227,7 @@ func (a *api) Snapshot(c echo.Context) error {
data, err := a.cluster.Snapshot() data, err := a.cluster.Snapshot()
if err != nil { if err != nil {
a.logger.Debug().WithError(err).Log("Unable to create snaphot") a.logger.Debug().WithError(err).Log("Unable to create snaphot")
return Err(http.StatusInternalServerError, "unable to create snapshot", "%s", err) return Err(http.StatusInternalServerError, "unable to create snapshot", "%s", err.Error())
} }
defer data.Close() defer data.Close()
@@ -247,7 +253,7 @@ func (a *api) AddProcess(c echo.Context) error {
r := client.AddProcessRequest{} r := client.AddProcessRequest{}
if err := util.ShouldBindJSON(c, &r); err != nil { if err := util.ShouldBindJSON(c, &r); err != nil {
return Err(http.StatusBadRequest, "Invalid JSON", "%s", err) return Err(http.StatusBadRequest, "Invalid JSON", "%s", err.Error())
} }
origin := c.Request().Header.Get("X-Cluster-Origin") origin := c.Request().Header.Get("X-Cluster-Origin")
@@ -261,7 +267,7 @@ func (a *api) AddProcess(c echo.Context) error {
err := a.cluster.AddProcess(origin, &r.Config) err := a.cluster.AddProcess(origin, &r.Config)
if err != nil { if err != nil {
a.logger.Debug().WithError(err).WithField("id", r.Config.ID).Log("Unable to add process") a.logger.Debug().WithError(err).WithField("id", r.Config.ID).Log("Unable to add process")
return Err(http.StatusInternalServerError, "unable to add process", "%s", err) return Err(http.StatusInternalServerError, "unable to add process", "%s", err.Error())
} }
return c.JSON(http.StatusOK, "OK") return c.JSON(http.StatusOK, "OK")
@@ -297,7 +303,7 @@ func (a *api) RemoveProcess(c echo.Context) error {
err := a.cluster.RemoveProcess(origin, pid) err := a.cluster.RemoveProcess(origin, pid)
if err != nil { if err != nil {
a.logger.Debug().WithError(err).WithField("id", pid).Log("Unable to remove process") a.logger.Debug().WithError(err).WithField("id", pid).Log("Unable to remove process")
return Err(http.StatusInternalServerError, "unable to remove process", "%s", err) return Err(http.StatusInternalServerError, "unable to remove process", "%s", err.Error())
} }
return c.JSON(http.StatusOK, "OK") return c.JSON(http.StatusOK, "OK")
@@ -324,7 +330,7 @@ func (a *api) UpdateProcess(c echo.Context) error {
r := client.UpdateProcessRequest{} r := client.UpdateProcessRequest{}
if err := util.ShouldBindJSON(c, &r); err != nil { if err := util.ShouldBindJSON(c, &r); err != nil {
return Err(http.StatusBadRequest, "Invalid JSON", "%s", err) return Err(http.StatusBadRequest, "Invalid JSON", "%s", err.Error())
} }
origin := c.Request().Header.Get("X-Cluster-Origin") origin := c.Request().Header.Get("X-Cluster-Origin")
@@ -343,7 +349,7 @@ func (a *api) UpdateProcess(c echo.Context) error {
err := a.cluster.UpdateProcess(origin, pid, &r.Config) err := a.cluster.UpdateProcess(origin, pid, &r.Config)
if err != nil { if err != nil {
a.logger.Debug().WithError(err).WithField("id", pid).Log("Unable to update process") a.logger.Debug().WithError(err).WithField("id", pid).Log("Unable to update process")
return Err(http.StatusInternalServerError, "unable to update process", "%s", err) return Err(http.StatusInternalServerError, "unable to update process", "%s", err.Error())
} }
return c.JSON(http.StatusOK, "OK") return c.JSON(http.StatusOK, "OK")
@@ -371,7 +377,7 @@ func (a *api) SetProcessMetadata(c echo.Context) error {
r := client.SetProcessMetadataRequest{} r := client.SetProcessMetadataRequest{}
if err := util.ShouldBindJSON(c, &r); err != nil { if err := util.ShouldBindJSON(c, &r); err != nil {
return Err(http.StatusBadRequest, "Invalid JSON", "%s", err) return Err(http.StatusBadRequest, "Invalid JSON", "%s", err.Error())
} }
origin := c.Request().Header.Get("X-Cluster-Origin") origin := c.Request().Header.Get("X-Cluster-Origin")
@@ -385,7 +391,7 @@ func (a *api) SetProcessMetadata(c echo.Context) error {
err := a.cluster.SetProcessMetadata(origin, pid, key, r.Metadata) err := a.cluster.SetProcessMetadata(origin, pid, key, r.Metadata)
if err != nil { if err != nil {
a.logger.Debug().WithError(err).WithField("id", pid).Log("Unable to update metadata") a.logger.Debug().WithError(err).WithField("id", pid).Log("Unable to update metadata")
return Err(http.StatusInternalServerError, "unable to update metadata", "%s", err) return Err(http.StatusInternalServerError, "unable to update metadata", "%s", err.Error())
} }
return c.JSON(http.StatusOK, "OK") return c.JSON(http.StatusOK, "OK")
@@ -409,7 +415,7 @@ func (a *api) AddIdentity(c echo.Context) error {
r := client.AddIdentityRequest{} r := client.AddIdentityRequest{}
if err := util.ShouldBindJSON(c, &r); err != nil { if err := util.ShouldBindJSON(c, &r); err != nil {
return Err(http.StatusBadRequest, "Invalid JSON", "%s", err) return Err(http.StatusBadRequest, "Invalid JSON", "%s", err.Error())
} }
origin := c.Request().Header.Get("X-Cluster-Origin") origin := c.Request().Header.Get("X-Cluster-Origin")
@@ -423,7 +429,7 @@ func (a *api) AddIdentity(c echo.Context) error {
err := a.cluster.AddIdentity(origin, r.Identity) err := a.cluster.AddIdentity(origin, r.Identity)
if err != nil { if err != nil {
a.logger.Debug().WithError(err).WithField("identity", r.Identity).Log("Unable to add identity") a.logger.Debug().WithError(err).WithField("identity", r.Identity).Log("Unable to add identity")
return Err(http.StatusInternalServerError, "unable to add identity", "%s", err) return Err(http.StatusInternalServerError, "unable to add identity", "%s", err.Error())
} }
return c.JSON(http.StatusOK, "OK") return c.JSON(http.StatusOK, "OK")
@@ -448,7 +454,7 @@ func (a *api) UpdateIdentity(c echo.Context) error {
r := client.UpdateIdentityRequest{} r := client.UpdateIdentityRequest{}
if err := util.ShouldBindJSON(c, &r); err != nil { if err := util.ShouldBindJSON(c, &r); err != nil {
return Err(http.StatusBadRequest, "Invalid JSON", "%s", err) return Err(http.StatusBadRequest, "Invalid JSON", "%s", err.Error())
} }
origin := c.Request().Header.Get("X-Cluster-Origin") origin := c.Request().Header.Get("X-Cluster-Origin")
@@ -468,7 +474,7 @@ func (a *api) UpdateIdentity(c echo.Context) error {
"name": name, "name": name,
"identity": r.Identity, "identity": r.Identity,
}).Log("Unable to add identity") }).Log("Unable to add identity")
return Err(http.StatusInternalServerError, "unable to update identity", "%s", err) return Err(http.StatusInternalServerError, "unable to update identity", "%s", err.Error())
} }
return c.JSON(http.StatusOK, "OK") return c.JSON(http.StatusOK, "OK")
@@ -493,7 +499,7 @@ func (a *api) SetIdentityPolicies(c echo.Context) error {
r := client.SetPoliciesRequest{} r := client.SetPoliciesRequest{}
if err := util.ShouldBindJSON(c, &r); err != nil { if err := util.ShouldBindJSON(c, &r); err != nil {
return Err(http.StatusBadRequest, "Invalid JSON", "%s", err) return Err(http.StatusBadRequest, "Invalid JSON", "%s", err.Error())
} }
origin := c.Request().Header.Get("X-Cluster-Origin") origin := c.Request().Header.Get("X-Cluster-Origin")
@@ -507,7 +513,7 @@ func (a *api) SetIdentityPolicies(c echo.Context) error {
err := a.cluster.SetPolicies(origin, name, r.Policies) err := a.cluster.SetPolicies(origin, name, r.Policies)
if err != nil { if err != nil {
a.logger.Debug().WithError(err).WithField("policies", r.Policies).Log("Unable to set policies") a.logger.Debug().WithError(err).WithField("policies", r.Policies).Log("Unable to set policies")
return Err(http.StatusInternalServerError, "unable to add identity", "%s", err) return Err(http.StatusInternalServerError, "unable to add identity", "%s", err.Error())
} }
return c.JSON(http.StatusOK, "OK") return c.JSON(http.StatusOK, "OK")
@@ -539,7 +545,7 @@ func (a *api) RemoveIdentity(c echo.Context) error {
err := a.cluster.RemoveIdentity(origin, name) err := a.cluster.RemoveIdentity(origin, name)
if err != nil { if err != nil {
a.logger.Debug().WithError(err).WithField("identity", name).Log("Unable to remove identity") a.logger.Debug().WithError(err).WithField("identity", name).Log("Unable to remove identity")
return Err(http.StatusInternalServerError, "unable to remove identity", "%s", err) return Err(http.StatusInternalServerError, "unable to remove identity", "%s", err.Error())
} }
return c.JSON(http.StatusOK, "OK") return c.JSON(http.StatusOK, "OK")
@@ -575,7 +581,7 @@ func (a *api) Lock(c echo.Context) error {
r := client.LockRequest{} r := client.LockRequest{}
if err := util.ShouldBindJSON(c, &r); err != nil { if err := util.ShouldBindJSON(c, &r); err != nil {
return Err(http.StatusBadRequest, "Invalid JSON", "%s", err) return Err(http.StatusBadRequest, "Invalid JSON", "%s", err.Error())
} }
origin := c.Request().Header.Get("X-Cluster-Origin") origin := c.Request().Header.Get("X-Cluster-Origin")
@@ -627,6 +633,74 @@ func (a *api) Unlock(c echo.Context) error {
return c.JSON(http.StatusOK, "OK") return c.JSON(http.StatusOK, "OK")
} }
// SetKV stores the value under key
// @Summary Store value under key
// @Description Store value under key
// @Tags v1.0.0
// @ID cluster-1-kv-set
// @Produce json
// @Param data body client.SetKVRequest true "Set KV request"
// @Param X-Cluster-Origin header string false "Origin ID of request"
// @Success 200 {string} string
// @Success 500 {object} Error
// @Failure 508 {object} Error
// @Router /v1/kv [post]
func (a *api) SetKV(c echo.Context) error {
r := client.SetKVRequest{}
if err := util.ShouldBindJSON(c, &r); err != nil {
return Err(http.StatusBadRequest, "Invalid JSON", "%s", err.Error())
}
origin := c.Request().Header.Get("X-Cluster-Origin")
if origin == a.id {
return Err(http.StatusLoopDetected, "", "breaking circuit")
}
a.logger.Debug().WithField("key", r.Key).Log("Store value")
err := a.cluster.SetKV(origin, r.Key, r.Value)
if err != nil {
a.logger.Debug().WithError(err).WithField("key", r.Key).Log("Unable to store value")
return Err(http.StatusInternalServerError, "unable to store value", "%s", err.Error())
}
return c.JSON(http.StatusOK, "OK")
}
// UnsetKV removes a key
// @Summary Removes a key
// @Description Removes a key
// @Tags v1.0.0
// @ID cluster-1-kv-unset
// @Produce json
// @Param name path string true "Key name"
// @Param X-Cluster-Origin header string false "Origin ID of request"
// @Success 200 {string} string
// @Failure 404 {object} Error
// @Failure 508 {object} Error
// @Router /v1/kv/{key} [delete]
func (a *api) UnsetKV(c echo.Context) error {
key := util.PathParam(c, "key")
origin := c.Request().Header.Get("X-Cluster-Origin")
if origin == a.id {
return Err(http.StatusLoopDetected, "", "breaking circuit")
}
a.logger.Debug().WithField("key", key).Log("Delete key")
err := a.cluster.UnsetKV(origin, key)
if err != nil {
a.logger.Debug().WithError(err).WithField("key", key).Log("Unable to remove key")
return Err(http.StatusInternalServerError, "unable to remove key", "%s", err.Error())
}
return c.JSON(http.StatusOK, "OK")
}
// Error represents an error response of the API // Error represents an error response of the API
type Error struct { type Error struct {
Code int `json:"code" jsonschema:"required" format:"int"` Code int `json:"code" jsonschema:"required" format:"int"`

View File

@@ -49,6 +49,11 @@ type LockRequest struct {
ValidUntil time.Time `json:"valid_until"` ValidUntil time.Time `json:"valid_until"`
} }
type SetKVRequest struct {
Key string `json:"key"`
Value string `json:"value"`
}
type APIClient struct { type APIClient struct {
Address string Address string
Client *http.Client Client *http.Client
@@ -196,6 +201,23 @@ func (c *APIClient) Unlock(origin string, name string) error {
return err return err
} }
func (c *APIClient) SetKV(origin string, r SetKVRequest) error {
data, err := json.Marshal(r)
if err != nil {
return err
}
_, err = c.call(http.MethodPost, "/v1/kv", "application/json", bytes.NewReader(data), origin)
return err
}
func (c *APIClient) UnsetKV(origin string, key string) error {
_, err := c.call(http.MethodDelete, "/v1/kv/"+url.PathEscape(key), "application/json", nil, origin)
return err
}
func (c *APIClient) Snapshot() (io.ReadCloser, error) { func (c *APIClient) Snapshot() (io.ReadCloser, error) {
return c.stream(http.MethodGet, "/v1/snapshot", "", nil, "") return c.stream(http.MethodGet, "/v1/snapshot", "", nil, "")
} }

View File

@@ -69,6 +69,10 @@ type Cluster interface {
DeleteLock(origin string, name string) error DeleteLock(origin string, name string) error
ListLocks() map[string]time.Time ListLocks() map[string]time.Time
SetKV(origin, key, value string) error
UnsetKV(origin, key string) error
GetKV(key string) (string, time.Time, error)
ProxyReader() proxy.ProxyReader ProxyReader() proxy.ProxyReader
} }
@@ -1063,6 +1067,54 @@ func (c *cluster) ListLocks() map[string]time.Time {
return c.store.ListLocks() return c.store.ListLocks()
} }
func (c *cluster) SetKV(origin, key, value string) error {
if ok, _ := c.IsDegraded(); ok {
return ErrDegraded
}
if !c.IsRaftLeader() {
return c.forwarder.SetKV(origin, key, value)
}
cmd := &store.Command{
Operation: store.OpSetKV,
Data: &store.CommandSetKV{
Key: key,
Value: value,
},
}
return c.applyCommand(cmd)
}
func (c *cluster) UnsetKV(origin, key string) error {
if ok, _ := c.IsDegraded(); ok {
return ErrDegraded
}
if !c.IsRaftLeader() {
return c.forwarder.UnsetKV(origin, key)
}
cmd := &store.Command{
Operation: store.OpUnsetKV,
Data: &store.CommandUnsetKV{
Key: key,
},
}
return c.applyCommand(cmd)
}
func (c *cluster) GetKV(key string) (string, time.Time, error) {
value, err := c.store.GetFromKVS(key)
if err != nil {
return "", time.Time{}, err
}
return value.Value, value.UpdatedAt, nil
}
func (c *cluster) applyCommand(cmd *store.Command) error { func (c *cluster) applyCommand(cmd *store.Command) error {
b, err := json.Marshal(cmd) b, err := json.Marshal(cmd)
if err != nil { if err != nil {

View File

@@ -34,6 +34,9 @@ type Forwarder interface {
CreateLock(origin string, name string, validUntil time.Time) error CreateLock(origin string, name string, validUntil time.Time) error
DeleteLock(origin string, name string) error DeleteLock(origin string, name string) error
SetKV(origin, key, value string) error
UnsetKV(origin, key string) error
} }
type forwarder struct { type forwarder struct {
@@ -290,3 +293,32 @@ func (f *forwarder) DeleteLock(origin string, name string) error {
return client.Unlock(origin, name) return client.Unlock(origin, name)
} }
func (f *forwarder) SetKV(origin, key, value string) error {
if origin == "" {
origin = f.id
}
r := apiclient.SetKVRequest{
Key: key,
Value: value,
}
f.lock.RLock()
client := f.client
f.lock.RUnlock()
return client.SetKV(origin, r)
}
func (f *forwarder) UnsetKV(origin, key string) error {
if origin == "" {
origin = f.id
}
f.lock.RLock()
client := f.client
f.lock.RUnlock()
return client.UnsetKV(origin, key)
}

View File

@@ -4,6 +4,8 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
"io/fs"
"strings"
"sync" "sync"
"time" "time"
@@ -30,6 +32,9 @@ type Store interface {
ListUserPolicies(name string) Policies ListUserPolicies(name string) Policies
ListLocks() map[string]time.Time ListLocks() map[string]time.Time
ListKVS(prefix string) map[string]Value
GetFromKVS(key string) (Value, error)
} }
type Process struct { type Process struct {
@@ -49,6 +54,11 @@ type Policies struct {
Policies []access.Policy Policies []access.Policy
} }
type Value struct {
Value string
UpdatedAt time.Time
}
type Operation string type Operation string
const ( const (
@@ -64,6 +74,8 @@ const (
OpCreateLock Operation = "createLock" OpCreateLock Operation = "createLock"
OpDeleteLock Operation = "deleteLock" OpDeleteLock Operation = "deleteLock"
OpClearLocks Operation = "clearLocks" OpClearLocks Operation = "clearLocks"
OpSetKV Operation = "setKV"
OpUnsetKV Operation = "unsetKV"
) )
type Command struct { type Command struct {
@@ -123,6 +135,15 @@ type CommandDeleteLock struct {
type CommandClearLocks struct{} type CommandClearLocks struct{}
type CommandSetKV struct {
Key string
Value string
}
type CommandUnsetKV struct {
Key string
}
type storeData struct { type storeData struct {
Version uint64 Version uint64
Process map[string]Process Process map[string]Process
@@ -139,6 +160,8 @@ type storeData struct {
} }
Locks map[string]time.Time Locks map[string]time.Time
KVS map[string]Value
} }
func (s *storeData) init() { func (s *storeData) init() {
@@ -152,6 +175,7 @@ func (s *storeData) init() {
s.Policies.UpdatedAt = now s.Policies.UpdatedAt = now
s.Policies.Policies = map[string][]access.Policy{} s.Policies.Policies = map[string][]access.Policy{}
s.Locks = map[string]time.Time{} s.Locks = map[string]time.Time{}
s.KVS = map[string]Value{}
} }
// store implements a raft.FSM // store implements a raft.FSM
@@ -220,155 +244,133 @@ func (s *store) Apply(entry *raft.Log) interface{} {
return nil return nil
} }
func convertCommand[T any](cmd T, data any) error {
b, err := json.Marshal(data)
if err != nil {
return err
}
err = json.Unmarshal(b, cmd)
return err
}
func (s *store) applyCommand(c Command) error { func (s *store) applyCommand(c Command) error {
var b []byte
var err error = nil var err error = nil
switch c.Operation { switch c.Operation {
case OpAddProcess: case OpAddProcess:
b, err = json.Marshal(c.Data)
if err != nil {
break
}
cmd := CommandAddProcess{} cmd := CommandAddProcess{}
err = json.Unmarshal(b, &cmd) err = convertCommand(&cmd, c.Data)
if err != nil { if err != nil {
break break
} }
err = s.addProcess(cmd) err = s.addProcess(cmd)
case OpRemoveProcess: case OpRemoveProcess:
b, err = json.Marshal(c.Data)
if err != nil {
break
}
cmd := CommandRemoveProcess{} cmd := CommandRemoveProcess{}
err = json.Unmarshal(b, &cmd) err = convertCommand(&cmd, c.Data)
if err != nil { if err != nil {
break break
} }
err = s.removeProcess(cmd) err = s.removeProcess(cmd)
case OpUpdateProcess: case OpUpdateProcess:
b, err = json.Marshal(c.Data)
if err != nil {
break
}
cmd := CommandUpdateProcess{} cmd := CommandUpdateProcess{}
err = json.Unmarshal(b, &cmd) err = convertCommand(&cmd, c.Data)
if err != nil { if err != nil {
break break
} }
err = s.updateProcess(cmd) err = s.updateProcess(cmd)
case OpSetProcessMetadata: case OpSetProcessMetadata:
b, err = json.Marshal(c.Data)
if err != nil {
break
}
cmd := CommandSetProcessMetadata{} cmd := CommandSetProcessMetadata{}
err = json.Unmarshal(b, &cmd) err = convertCommand(&cmd, c.Data)
if err != nil { if err != nil {
break break
} }
err = s.setProcessMetadata(cmd) err = s.setProcessMetadata(cmd)
case OpAddIdentity: case OpAddIdentity:
b, err = json.Marshal(c.Data)
if err != nil {
break
}
cmd := CommandAddIdentity{} cmd := CommandAddIdentity{}
err = json.Unmarshal(b, &cmd) err = convertCommand(&cmd, c.Data)
if err != nil { if err != nil {
break break
} }
err = s.addIdentity(cmd) err = s.addIdentity(cmd)
case OpUpdateIdentity: case OpUpdateIdentity:
b, err = json.Marshal(c.Data)
if err != nil {
break
}
cmd := CommandUpdateIdentity{} cmd := CommandUpdateIdentity{}
err = json.Unmarshal(b, &cmd) err = convertCommand(&cmd, c.Data)
if err != nil { if err != nil {
break break
} }
err = s.updateIdentity(cmd) err = s.updateIdentity(cmd)
case OpRemoveIdentity: case OpRemoveIdentity:
b, err = json.Marshal(c.Data)
if err != nil {
break
}
cmd := CommandRemoveIdentity{} cmd := CommandRemoveIdentity{}
err = json.Unmarshal(b, &cmd) err = convertCommand(&cmd, c.Data)
if err != nil { if err != nil {
break break
} }
err = s.removeIdentity(cmd) err = s.removeIdentity(cmd)
case OpSetPolicies: case OpSetPolicies:
b, err = json.Marshal(c.Data)
if err != nil {
break
}
cmd := CommandSetPolicies{} cmd := CommandSetPolicies{}
err = json.Unmarshal(b, &cmd) err = convertCommand(&cmd, c.Data)
if err != nil { if err != nil {
break break
} }
err = s.setPolicies(cmd) err = s.setPolicies(cmd)
case OpSetProcessNodeMap: case OpSetProcessNodeMap:
b, err = json.Marshal(c.Data)
if err != nil {
break
}
cmd := CommandSetProcessNodeMap{} cmd := CommandSetProcessNodeMap{}
err = json.Unmarshal(b, &cmd) err = convertCommand(&cmd, c.Data)
if err != nil { if err != nil {
break break
} }
err = s.setProcessNodeMap(cmd) err = s.setProcessNodeMap(cmd)
case OpCreateLock: case OpCreateLock:
b, err = json.Marshal(c.Data)
if err != nil {
break
}
cmd := CommandCreateLock{} cmd := CommandCreateLock{}
err = json.Unmarshal(b, &cmd) err = convertCommand(&cmd, c.Data)
if err != nil { if err != nil {
break break
} }
err = s.createLock(cmd) err = s.createLock(cmd)
case OpDeleteLock: case OpDeleteLock:
b, err = json.Marshal(c.Data)
if err != nil {
break
}
cmd := CommandDeleteLock{} cmd := CommandDeleteLock{}
err = json.Unmarshal(b, &cmd) err = convertCommand(&cmd, c.Data)
if err != nil { if err != nil {
break break
} }
err = s.deleteLock(cmd) err = s.deleteLock(cmd)
case OpClearLocks: case OpClearLocks:
b, err = json.Marshal(c.Data)
if err != nil {
break
}
cmd := CommandClearLocks{} cmd := CommandClearLocks{}
err = json.Unmarshal(b, &cmd) err = convertCommand(&cmd, c.Data)
if err != nil { if err != nil {
break break
} }
err = s.clearLocks(cmd) err = s.clearLocks(cmd)
case OpSetKV:
cmd := CommandSetKV{}
err = convertCommand(&cmd, c.Data)
if err != nil {
break
}
err = s.setKV(cmd)
case OpUnsetKV:
cmd := CommandUnsetKV{}
err = convertCommand(&cmd, c.Data)
if err != nil {
break
}
err = s.unsetKV(cmd)
default: default:
s.logger.Warn().WithField("operation", c.Operation).Log("Unknown operation") s.logger.Warn().WithField("operation", c.Operation).Log("Unknown operation")
err = fmt.Errorf("unknown operation: %s", c.Operation) err = fmt.Errorf("unknown operation: %s", c.Operation)
@@ -639,6 +641,33 @@ func (s *store) clearLocks(cmd CommandClearLocks) error {
return nil return nil
} }
func (s *store) setKV(cmd CommandSetKV) error {
s.lock.Lock()
defer s.lock.Unlock()
value := s.data.KVS[cmd.Key]
value.Value = cmd.Value
value.UpdatedAt = time.Now()
s.data.KVS[cmd.Key] = value
return nil
}
func (s *store) unsetKV(cmd CommandUnsetKV) error {
s.lock.Lock()
defer s.lock.Unlock()
if _, ok := s.data.KVS[cmd.Key]; !ok {
return fs.ErrNotExist
}
delete(s.data.KVS, cmd.Key)
return nil
}
func (s *store) OnApply(fn func(op Operation)) { func (s *store) OnApply(fn func(op Operation)) {
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock() defer s.lock.Unlock()
@@ -815,6 +844,35 @@ func (s *store) ListLocks() map[string]time.Time {
return m return m
} }
func (s *store) ListKVS(prefix string) map[string]Value {
s.lock.RLock()
defer s.lock.RUnlock()
m := map[string]Value{}
for key, value := range s.data.KVS {
if !strings.HasPrefix(key, prefix) {
continue
}
m[key] = value
}
return m
}
func (s *store) GetFromKVS(key string) (Value, error) {
s.lock.RLock()
defer s.lock.RUnlock()
value, ok := s.data.KVS[key]
if !ok {
return Value{}, fs.ErrNotExist
}
return value, nil
}
type fsmSnapshot struct { type fsmSnapshot struct {
data []byte data []byte
} }

View File

@@ -2,6 +2,7 @@ package store
import ( import (
"encoding/json" "encoding/json"
"io/fs"
"testing" "testing"
"time" "time"
@@ -35,6 +36,7 @@ func TestCreateStore(t *testing.T) {
require.NotNil(t, s.data.Users.Users) require.NotNil(t, s.data.Users.Users)
require.NotNil(t, s.data.Policies.Policies) require.NotNil(t, s.data.Policies.Policies)
require.NotNil(t, s.data.Locks) require.NotNil(t, s.data.Locks)
require.NotNil(t, s.data.KVS)
} }
func TestAddProcessCommand(t *testing.T) { func TestAddProcessCommand(t *testing.T) {
@@ -1014,6 +1016,111 @@ func TestClearLocks(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
} }
func TestSetKVCommand(t *testing.T) {
s, err := createStore()
require.NoError(t, err)
err = s.applyCommand(Command{
Operation: OpSetKV,
Data: CommandSetKV{
Key: "foo",
Value: "bar",
},
})
require.NoError(t, err)
_, ok := s.data.KVS["foo"]
require.True(t, ok)
}
func TestSetKV(t *testing.T) {
s, err := createStore()
require.NoError(t, err)
err = s.setKV(CommandSetKV{
Key: "foo",
Value: "bar",
})
require.NoError(t, err)
value, err := s.GetFromKVS("foo")
require.NoError(t, err)
require.Equal(t, "bar", value.Value)
updatedAt := value.UpdatedAt
err = s.setKV(CommandSetKV{
Key: "foo",
Value: "baz",
})
require.NoError(t, err)
value, err = s.GetFromKVS("foo")
require.NoError(t, err)
require.Equal(t, "baz", value.Value)
require.Greater(t, value.UpdatedAt, updatedAt)
}
func TestUnsetKVCommand(t *testing.T) {
s, err := createStore()
require.NoError(t, err)
err = s.applyCommand(Command{
Operation: OpSetKV,
Data: CommandSetKV{
Key: "foo",
Value: "bar",
},
})
require.NoError(t, err)
_, ok := s.data.KVS["foo"]
require.True(t, ok)
err = s.applyCommand(Command{
Operation: OpUnsetKV,
Data: CommandUnsetKV{
Key: "foo",
},
})
require.NoError(t, err)
_, ok = s.data.KVS["foo"]
require.False(t, ok)
err = s.applyCommand(Command{
Operation: OpUnsetKV,
Data: CommandUnsetKV{
Key: "foo",
},
})
require.Error(t, err)
require.Equal(t, fs.ErrNotExist, err)
}
func TestUnsetKV(t *testing.T) {
s, err := createStore()
require.NoError(t, err)
err = s.setKV(CommandSetKV{
Key: "foo",
Value: "bar",
})
require.NoError(t, err)
_, err = s.GetFromKVS("foo")
require.NoError(t, err)
err = s.unsetKV(CommandUnsetKV{
Key: "foo",
})
require.NoError(t, err)
_, err = s.GetFromKVS("foo")
require.Error(t, err)
require.Equal(t, fs.ErrNotExist, err)
}
func TestApplyCommand(t *testing.T) { func TestApplyCommand(t *testing.T) {
s, err := createStore() s, err := createStore()
require.NoError(t, err) require.NoError(t, err)