diff --git a/cluster/api.go b/cluster/api.go index 6c082fe2..94dacd56 100644 --- a/cluster/api.go +++ b/cluster/api.go @@ -118,6 +118,12 @@ func NewAPI(config APIConfig) (API, error) { a.router.PUT("/v1/iam/user/:name/policies", a.SetIdentityPolicies) a.router.DELETE("/v1/iam/user/:name", a.RemoveIdentity) + a.router.POST("/v1/lock", a.Lock) + a.router.DELETE("/v1/lock/:name", a.Unlock) + + a.router.POST("/v1/kv", a.SetKV) + a.router.DELETE("/v1/kv/:key", a.UnsetKV) + a.router.GET("/v1/core", a.CoreAPIAddress) return a, nil @@ -151,7 +157,7 @@ func (a *api) AddServer(c echo.Context) error { r := client.JoinRequest{} if err := util.ShouldBindJSON(c, &r); err != nil { - return Err(http.StatusBadRequest, "Invalid JSON", "%s", err) + return Err(http.StatusBadRequest, "Invalid JSON", "%s", err.Error()) } a.logger.Debug().WithFields(log.Fields{ @@ -168,7 +174,7 @@ func (a *api) AddServer(c echo.Context) error { err := a.cluster.Join(origin, r.ID, r.RaftAddress, "") if err != nil { a.logger.Debug().WithError(err).WithField("id", r.ID).Log("Unable to join cluster") - return Err(http.StatusInternalServerError, "unable to join cluster", "%s", err) + return Err(http.StatusInternalServerError, "unable to join cluster", "%s", err.Error()) } return c.JSON(http.StatusOK, "OK") @@ -202,7 +208,7 @@ func (a *api) RemoveServer(c echo.Context) error { err := a.cluster.Leave(origin, id) if err != nil { a.logger.Debug().WithError(err).WithField("id", id).Log("Unable to leave cluster") - return Err(http.StatusInternalServerError, "unable to leave cluster", "%s", err) + return Err(http.StatusInternalServerError, "unable to leave cluster", "%s", err.Error()) } return c.JSON(http.StatusOK, "OK") @@ -221,7 +227,7 @@ func (a *api) Snapshot(c echo.Context) error { data, err := a.cluster.Snapshot() if err != nil { a.logger.Debug().WithError(err).Log("Unable to create snaphot") - return Err(http.StatusInternalServerError, "unable to create snapshot", "%s", err) + return Err(http.StatusInternalServerError, "unable to create snapshot", "%s", err.Error()) } defer data.Close() @@ -247,7 +253,7 @@ func (a *api) AddProcess(c echo.Context) error { r := client.AddProcessRequest{} if err := util.ShouldBindJSON(c, &r); err != nil { - return Err(http.StatusBadRequest, "Invalid JSON", "%s", err) + return Err(http.StatusBadRequest, "Invalid JSON", "%s", err.Error()) } origin := c.Request().Header.Get("X-Cluster-Origin") @@ -261,7 +267,7 @@ func (a *api) AddProcess(c echo.Context) error { err := a.cluster.AddProcess(origin, &r.Config) if err != nil { a.logger.Debug().WithError(err).WithField("id", r.Config.ID).Log("Unable to add process") - return Err(http.StatusInternalServerError, "unable to add process", "%s", err) + return Err(http.StatusInternalServerError, "unable to add process", "%s", err.Error()) } return c.JSON(http.StatusOK, "OK") @@ -297,7 +303,7 @@ func (a *api) RemoveProcess(c echo.Context) error { err := a.cluster.RemoveProcess(origin, pid) if err != nil { a.logger.Debug().WithError(err).WithField("id", pid).Log("Unable to remove process") - return Err(http.StatusInternalServerError, "unable to remove process", "%s", err) + return Err(http.StatusInternalServerError, "unable to remove process", "%s", err.Error()) } return c.JSON(http.StatusOK, "OK") @@ -324,7 +330,7 @@ func (a *api) UpdateProcess(c echo.Context) error { r := client.UpdateProcessRequest{} if err := util.ShouldBindJSON(c, &r); err != nil { - return Err(http.StatusBadRequest, "Invalid JSON", "%s", err) + return Err(http.StatusBadRequest, "Invalid JSON", "%s", err.Error()) } origin := c.Request().Header.Get("X-Cluster-Origin") @@ -343,7 +349,7 @@ func (a *api) UpdateProcess(c echo.Context) error { err := a.cluster.UpdateProcess(origin, pid, &r.Config) if err != nil { a.logger.Debug().WithError(err).WithField("id", pid).Log("Unable to update process") - return Err(http.StatusInternalServerError, "unable to update process", "%s", err) + return Err(http.StatusInternalServerError, "unable to update process", "%s", err.Error()) } return c.JSON(http.StatusOK, "OK") @@ -371,7 +377,7 @@ func (a *api) SetProcessMetadata(c echo.Context) error { r := client.SetProcessMetadataRequest{} if err := util.ShouldBindJSON(c, &r); err != nil { - return Err(http.StatusBadRequest, "Invalid JSON", "%s", err) + return Err(http.StatusBadRequest, "Invalid JSON", "%s", err.Error()) } origin := c.Request().Header.Get("X-Cluster-Origin") @@ -385,7 +391,7 @@ func (a *api) SetProcessMetadata(c echo.Context) error { err := a.cluster.SetProcessMetadata(origin, pid, key, r.Metadata) if err != nil { a.logger.Debug().WithError(err).WithField("id", pid).Log("Unable to update metadata") - return Err(http.StatusInternalServerError, "unable to update metadata", "%s", err) + return Err(http.StatusInternalServerError, "unable to update metadata", "%s", err.Error()) } return c.JSON(http.StatusOK, "OK") @@ -409,7 +415,7 @@ func (a *api) AddIdentity(c echo.Context) error { r := client.AddIdentityRequest{} if err := util.ShouldBindJSON(c, &r); err != nil { - return Err(http.StatusBadRequest, "Invalid JSON", "%s", err) + return Err(http.StatusBadRequest, "Invalid JSON", "%s", err.Error()) } origin := c.Request().Header.Get("X-Cluster-Origin") @@ -423,7 +429,7 @@ func (a *api) AddIdentity(c echo.Context) error { err := a.cluster.AddIdentity(origin, r.Identity) if err != nil { a.logger.Debug().WithError(err).WithField("identity", r.Identity).Log("Unable to add identity") - return Err(http.StatusInternalServerError, "unable to add identity", "%s", err) + return Err(http.StatusInternalServerError, "unable to add identity", "%s", err.Error()) } return c.JSON(http.StatusOK, "OK") @@ -448,7 +454,7 @@ func (a *api) UpdateIdentity(c echo.Context) error { r := client.UpdateIdentityRequest{} if err := util.ShouldBindJSON(c, &r); err != nil { - return Err(http.StatusBadRequest, "Invalid JSON", "%s", err) + return Err(http.StatusBadRequest, "Invalid JSON", "%s", err.Error()) } origin := c.Request().Header.Get("X-Cluster-Origin") @@ -468,7 +474,7 @@ func (a *api) UpdateIdentity(c echo.Context) error { "name": name, "identity": r.Identity, }).Log("Unable to add identity") - return Err(http.StatusInternalServerError, "unable to update identity", "%s", err) + return Err(http.StatusInternalServerError, "unable to update identity", "%s", err.Error()) } return c.JSON(http.StatusOK, "OK") @@ -493,7 +499,7 @@ func (a *api) SetIdentityPolicies(c echo.Context) error { r := client.SetPoliciesRequest{} if err := util.ShouldBindJSON(c, &r); err != nil { - return Err(http.StatusBadRequest, "Invalid JSON", "%s", err) + return Err(http.StatusBadRequest, "Invalid JSON", "%s", err.Error()) } origin := c.Request().Header.Get("X-Cluster-Origin") @@ -507,7 +513,7 @@ func (a *api) SetIdentityPolicies(c echo.Context) error { err := a.cluster.SetPolicies(origin, name, r.Policies) if err != nil { a.logger.Debug().WithError(err).WithField("policies", r.Policies).Log("Unable to set policies") - return Err(http.StatusInternalServerError, "unable to add identity", "%s", err) + return Err(http.StatusInternalServerError, "unable to add identity", "%s", err.Error()) } return c.JSON(http.StatusOK, "OK") @@ -539,7 +545,7 @@ func (a *api) RemoveIdentity(c echo.Context) error { err := a.cluster.RemoveIdentity(origin, name) if err != nil { a.logger.Debug().WithError(err).WithField("identity", name).Log("Unable to remove identity") - return Err(http.StatusInternalServerError, "unable to remove identity", "%s", err) + return Err(http.StatusInternalServerError, "unable to remove identity", "%s", err.Error()) } return c.JSON(http.StatusOK, "OK") @@ -575,7 +581,7 @@ func (a *api) Lock(c echo.Context) error { r := client.LockRequest{} if err := util.ShouldBindJSON(c, &r); err != nil { - return Err(http.StatusBadRequest, "Invalid JSON", "%s", err) + return Err(http.StatusBadRequest, "Invalid JSON", "%s", err.Error()) } origin := c.Request().Header.Get("X-Cluster-Origin") @@ -627,6 +633,74 @@ func (a *api) Unlock(c echo.Context) error { return c.JSON(http.StatusOK, "OK") } +// SetKV stores the value under key +// @Summary Store value under key +// @Description Store value under key +// @Tags v1.0.0 +// @ID cluster-1-kv-set +// @Produce json +// @Param data body client.SetKVRequest true "Set KV request" +// @Param X-Cluster-Origin header string false "Origin ID of request" +// @Success 200 {string} string +// @Success 500 {object} Error +// @Failure 508 {object} Error +// @Router /v1/kv [post] +func (a *api) SetKV(c echo.Context) error { + r := client.SetKVRequest{} + + if err := util.ShouldBindJSON(c, &r); err != nil { + return Err(http.StatusBadRequest, "Invalid JSON", "%s", err.Error()) + } + + origin := c.Request().Header.Get("X-Cluster-Origin") + + if origin == a.id { + return Err(http.StatusLoopDetected, "", "breaking circuit") + } + + a.logger.Debug().WithField("key", r.Key).Log("Store value") + + err := a.cluster.SetKV(origin, r.Key, r.Value) + if err != nil { + a.logger.Debug().WithError(err).WithField("key", r.Key).Log("Unable to store value") + return Err(http.StatusInternalServerError, "unable to store value", "%s", err.Error()) + } + + return c.JSON(http.StatusOK, "OK") +} + +// UnsetKV removes a key +// @Summary Removes a key +// @Description Removes a key +// @Tags v1.0.0 +// @ID cluster-1-kv-unset +// @Produce json +// @Param name path string true "Key name" +// @Param X-Cluster-Origin header string false "Origin ID of request" +// @Success 200 {string} string +// @Failure 404 {object} Error +// @Failure 508 {object} Error +// @Router /v1/kv/{key} [delete] +func (a *api) UnsetKV(c echo.Context) error { + key := util.PathParam(c, "key") + + origin := c.Request().Header.Get("X-Cluster-Origin") + + if origin == a.id { + return Err(http.StatusLoopDetected, "", "breaking circuit") + } + + a.logger.Debug().WithField("key", key).Log("Delete key") + + err := a.cluster.UnsetKV(origin, key) + if err != nil { + a.logger.Debug().WithError(err).WithField("key", key).Log("Unable to remove key") + return Err(http.StatusInternalServerError, "unable to remove key", "%s", err.Error()) + } + + return c.JSON(http.StatusOK, "OK") +} + // Error represents an error response of the API type Error struct { Code int `json:"code" jsonschema:"required" format:"int"` diff --git a/cluster/client/client.go b/cluster/client/client.go index e6866f10..03cbcb96 100644 --- a/cluster/client/client.go +++ b/cluster/client/client.go @@ -49,6 +49,11 @@ type LockRequest struct { ValidUntil time.Time `json:"valid_until"` } +type SetKVRequest struct { + Key string `json:"key"` + Value string `json:"value"` +} + type APIClient struct { Address string Client *http.Client @@ -196,6 +201,23 @@ func (c *APIClient) Unlock(origin string, name string) error { return err } +func (c *APIClient) SetKV(origin string, r SetKVRequest) error { + data, err := json.Marshal(r) + if err != nil { + return err + } + + _, err = c.call(http.MethodPost, "/v1/kv", "application/json", bytes.NewReader(data), origin) + + return err +} + +func (c *APIClient) UnsetKV(origin string, key string) error { + _, err := c.call(http.MethodDelete, "/v1/kv/"+url.PathEscape(key), "application/json", nil, origin) + + return err +} + func (c *APIClient) Snapshot() (io.ReadCloser, error) { return c.stream(http.MethodGet, "/v1/snapshot", "", nil, "") } diff --git a/cluster/cluster.go b/cluster/cluster.go index 3d327626..296b18af 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -69,6 +69,10 @@ type Cluster interface { DeleteLock(origin string, name string) error ListLocks() map[string]time.Time + SetKV(origin, key, value string) error + UnsetKV(origin, key string) error + GetKV(key string) (string, time.Time, error) + ProxyReader() proxy.ProxyReader } @@ -1063,6 +1067,54 @@ func (c *cluster) ListLocks() map[string]time.Time { return c.store.ListLocks() } +func (c *cluster) SetKV(origin, key, value string) error { + if ok, _ := c.IsDegraded(); ok { + return ErrDegraded + } + + if !c.IsRaftLeader() { + return c.forwarder.SetKV(origin, key, value) + } + + cmd := &store.Command{ + Operation: store.OpSetKV, + Data: &store.CommandSetKV{ + Key: key, + Value: value, + }, + } + + return c.applyCommand(cmd) +} + +func (c *cluster) UnsetKV(origin, key string) error { + if ok, _ := c.IsDegraded(); ok { + return ErrDegraded + } + + if !c.IsRaftLeader() { + return c.forwarder.UnsetKV(origin, key) + } + + cmd := &store.Command{ + Operation: store.OpUnsetKV, + Data: &store.CommandUnsetKV{ + Key: key, + }, + } + + return c.applyCommand(cmd) +} + +func (c *cluster) GetKV(key string) (string, time.Time, error) { + value, err := c.store.GetFromKVS(key) + if err != nil { + return "", time.Time{}, err + } + + return value.Value, value.UpdatedAt, nil +} + func (c *cluster) applyCommand(cmd *store.Command) error { b, err := json.Marshal(cmd) if err != nil { diff --git a/cluster/forwarder/forwarder.go b/cluster/forwarder/forwarder.go index c3386cf3..d9a1ff1a 100644 --- a/cluster/forwarder/forwarder.go +++ b/cluster/forwarder/forwarder.go @@ -34,6 +34,9 @@ type Forwarder interface { CreateLock(origin string, name string, validUntil time.Time) error DeleteLock(origin string, name string) error + + SetKV(origin, key, value string) error + UnsetKV(origin, key string) error } type forwarder struct { @@ -290,3 +293,32 @@ func (f *forwarder) DeleteLock(origin string, name string) error { return client.Unlock(origin, name) } + +func (f *forwarder) SetKV(origin, key, value string) error { + if origin == "" { + origin = f.id + } + + r := apiclient.SetKVRequest{ + Key: key, + Value: value, + } + + f.lock.RLock() + client := f.client + f.lock.RUnlock() + + return client.SetKV(origin, r) +} + +func (f *forwarder) UnsetKV(origin, key string) error { + if origin == "" { + origin = f.id + } + + f.lock.RLock() + client := f.client + f.lock.RUnlock() + + return client.UnsetKV(origin, key) +} diff --git a/cluster/store/store.go b/cluster/store/store.go index 57e28173..b0579133 100644 --- a/cluster/store/store.go +++ b/cluster/store/store.go @@ -4,6 +4,8 @@ import ( "encoding/json" "fmt" "io" + "io/fs" + "strings" "sync" "time" @@ -30,6 +32,9 @@ type Store interface { ListUserPolicies(name string) Policies ListLocks() map[string]time.Time + + ListKVS(prefix string) map[string]Value + GetFromKVS(key string) (Value, error) } type Process struct { @@ -49,6 +54,11 @@ type Policies struct { Policies []access.Policy } +type Value struct { + Value string + UpdatedAt time.Time +} + type Operation string const ( @@ -64,6 +74,8 @@ const ( OpCreateLock Operation = "createLock" OpDeleteLock Operation = "deleteLock" OpClearLocks Operation = "clearLocks" + OpSetKV Operation = "setKV" + OpUnsetKV Operation = "unsetKV" ) type Command struct { @@ -123,6 +135,15 @@ type CommandDeleteLock struct { type CommandClearLocks struct{} +type CommandSetKV struct { + Key string + Value string +} + +type CommandUnsetKV struct { + Key string +} + type storeData struct { Version uint64 Process map[string]Process @@ -139,6 +160,8 @@ type storeData struct { } Locks map[string]time.Time + + KVS map[string]Value } func (s *storeData) init() { @@ -152,6 +175,7 @@ func (s *storeData) init() { s.Policies.UpdatedAt = now s.Policies.Policies = map[string][]access.Policy{} s.Locks = map[string]time.Time{} + s.KVS = map[string]Value{} } // store implements a raft.FSM @@ -220,155 +244,133 @@ func (s *store) Apply(entry *raft.Log) interface{} { return nil } +func convertCommand[T any](cmd T, data any) error { + b, err := json.Marshal(data) + if err != nil { + return err + } + + err = json.Unmarshal(b, cmd) + + return err +} + func (s *store) applyCommand(c Command) error { - var b []byte var err error = nil switch c.Operation { case OpAddProcess: - b, err = json.Marshal(c.Data) - if err != nil { - break - } cmd := CommandAddProcess{} - err = json.Unmarshal(b, &cmd) + err = convertCommand(&cmd, c.Data) if err != nil { break } err = s.addProcess(cmd) case OpRemoveProcess: - b, err = json.Marshal(c.Data) - if err != nil { - break - } cmd := CommandRemoveProcess{} - err = json.Unmarshal(b, &cmd) + err = convertCommand(&cmd, c.Data) if err != nil { break } err = s.removeProcess(cmd) case OpUpdateProcess: - b, err = json.Marshal(c.Data) - if err != nil { - break - } cmd := CommandUpdateProcess{} - err = json.Unmarshal(b, &cmd) + err = convertCommand(&cmd, c.Data) if err != nil { break } err = s.updateProcess(cmd) case OpSetProcessMetadata: - b, err = json.Marshal(c.Data) - if err != nil { - break - } cmd := CommandSetProcessMetadata{} - err = json.Unmarshal(b, &cmd) + err = convertCommand(&cmd, c.Data) if err != nil { break } err = s.setProcessMetadata(cmd) case OpAddIdentity: - b, err = json.Marshal(c.Data) - if err != nil { - break - } cmd := CommandAddIdentity{} - err = json.Unmarshal(b, &cmd) + err = convertCommand(&cmd, c.Data) if err != nil { break } err = s.addIdentity(cmd) case OpUpdateIdentity: - b, err = json.Marshal(c.Data) - if err != nil { - break - } cmd := CommandUpdateIdentity{} - err = json.Unmarshal(b, &cmd) + err = convertCommand(&cmd, c.Data) if err != nil { break } err = s.updateIdentity(cmd) case OpRemoveIdentity: - b, err = json.Marshal(c.Data) - if err != nil { - break - } cmd := CommandRemoveIdentity{} - err = json.Unmarshal(b, &cmd) + err = convertCommand(&cmd, c.Data) if err != nil { break } err = s.removeIdentity(cmd) case OpSetPolicies: - b, err = json.Marshal(c.Data) - if err != nil { - break - } cmd := CommandSetPolicies{} - err = json.Unmarshal(b, &cmd) + err = convertCommand(&cmd, c.Data) if err != nil { break } err = s.setPolicies(cmd) case OpSetProcessNodeMap: - b, err = json.Marshal(c.Data) - if err != nil { - break - } cmd := CommandSetProcessNodeMap{} - err = json.Unmarshal(b, &cmd) + err = convertCommand(&cmd, c.Data) if err != nil { break } err = s.setProcessNodeMap(cmd) case OpCreateLock: - b, err = json.Marshal(c.Data) - if err != nil { - break - } cmd := CommandCreateLock{} - err = json.Unmarshal(b, &cmd) + err = convertCommand(&cmd, c.Data) if err != nil { break } err = s.createLock(cmd) case OpDeleteLock: - b, err = json.Marshal(c.Data) - if err != nil { - break - } cmd := CommandDeleteLock{} - err = json.Unmarshal(b, &cmd) + err = convertCommand(&cmd, c.Data) if err != nil { break } err = s.deleteLock(cmd) case OpClearLocks: - b, err = json.Marshal(c.Data) - if err != nil { - break - } cmd := CommandClearLocks{} - err = json.Unmarshal(b, &cmd) + err = convertCommand(&cmd, c.Data) if err != nil { break } err = s.clearLocks(cmd) + case OpSetKV: + cmd := CommandSetKV{} + err = convertCommand(&cmd, c.Data) + if err != nil { + break + } + + err = s.setKV(cmd) + case OpUnsetKV: + cmd := CommandUnsetKV{} + err = convertCommand(&cmd, c.Data) + if err != nil { + break + } + + err = s.unsetKV(cmd) default: s.logger.Warn().WithField("operation", c.Operation).Log("Unknown operation") err = fmt.Errorf("unknown operation: %s", c.Operation) @@ -639,6 +641,33 @@ func (s *store) clearLocks(cmd CommandClearLocks) error { return nil } +func (s *store) setKV(cmd CommandSetKV) error { + s.lock.Lock() + defer s.lock.Unlock() + + value := s.data.KVS[cmd.Key] + + value.Value = cmd.Value + value.UpdatedAt = time.Now() + + s.data.KVS[cmd.Key] = value + + return nil +} + +func (s *store) unsetKV(cmd CommandUnsetKV) error { + s.lock.Lock() + defer s.lock.Unlock() + + if _, ok := s.data.KVS[cmd.Key]; !ok { + return fs.ErrNotExist + } + + delete(s.data.KVS, cmd.Key) + + return nil +} + func (s *store) OnApply(fn func(op Operation)) { s.lock.Lock() defer s.lock.Unlock() @@ -815,6 +844,35 @@ func (s *store) ListLocks() map[string]time.Time { return m } +func (s *store) ListKVS(prefix string) map[string]Value { + s.lock.RLock() + defer s.lock.RUnlock() + + m := map[string]Value{} + + for key, value := range s.data.KVS { + if !strings.HasPrefix(key, prefix) { + continue + } + + m[key] = value + } + + return m +} + +func (s *store) GetFromKVS(key string) (Value, error) { + s.lock.RLock() + defer s.lock.RUnlock() + + value, ok := s.data.KVS[key] + if !ok { + return Value{}, fs.ErrNotExist + } + + return value, nil +} + type fsmSnapshot struct { data []byte } diff --git a/cluster/store/store_test.go b/cluster/store/store_test.go index c08aa9b7..079c092d 100644 --- a/cluster/store/store_test.go +++ b/cluster/store/store_test.go @@ -2,6 +2,7 @@ package store import ( "encoding/json" + "io/fs" "testing" "time" @@ -35,6 +36,7 @@ func TestCreateStore(t *testing.T) { require.NotNil(t, s.data.Users.Users) require.NotNil(t, s.data.Policies.Policies) require.NotNil(t, s.data.Locks) + require.NotNil(t, s.data.KVS) } func TestAddProcessCommand(t *testing.T) { @@ -1014,6 +1016,111 @@ func TestClearLocks(t *testing.T) { require.NoError(t, err) } +func TestSetKVCommand(t *testing.T) { + s, err := createStore() + require.NoError(t, err) + + err = s.applyCommand(Command{ + Operation: OpSetKV, + Data: CommandSetKV{ + Key: "foo", + Value: "bar", + }, + }) + require.NoError(t, err) + + _, ok := s.data.KVS["foo"] + require.True(t, ok) +} + +func TestSetKV(t *testing.T) { + s, err := createStore() + require.NoError(t, err) + + err = s.setKV(CommandSetKV{ + Key: "foo", + Value: "bar", + }) + require.NoError(t, err) + + value, err := s.GetFromKVS("foo") + require.NoError(t, err) + require.Equal(t, "bar", value.Value) + + updatedAt := value.UpdatedAt + + err = s.setKV(CommandSetKV{ + Key: "foo", + Value: "baz", + }) + require.NoError(t, err) + + value, err = s.GetFromKVS("foo") + require.NoError(t, err) + require.Equal(t, "baz", value.Value) + require.Greater(t, value.UpdatedAt, updatedAt) +} + +func TestUnsetKVCommand(t *testing.T) { + s, err := createStore() + require.NoError(t, err) + + err = s.applyCommand(Command{ + Operation: OpSetKV, + Data: CommandSetKV{ + Key: "foo", + Value: "bar", + }, + }) + require.NoError(t, err) + + _, ok := s.data.KVS["foo"] + require.True(t, ok) + + err = s.applyCommand(Command{ + Operation: OpUnsetKV, + Data: CommandUnsetKV{ + Key: "foo", + }, + }) + require.NoError(t, err) + + _, ok = s.data.KVS["foo"] + require.False(t, ok) + + err = s.applyCommand(Command{ + Operation: OpUnsetKV, + Data: CommandUnsetKV{ + Key: "foo", + }, + }) + require.Error(t, err) + require.Equal(t, fs.ErrNotExist, err) +} + +func TestUnsetKV(t *testing.T) { + s, err := createStore() + require.NoError(t, err) + + err = s.setKV(CommandSetKV{ + Key: "foo", + Value: "bar", + }) + require.NoError(t, err) + + _, err = s.GetFromKVS("foo") + require.NoError(t, err) + + err = s.unsetKV(CommandUnsetKV{ + Key: "foo", + }) + require.NoError(t, err) + + _, err = s.GetFromKVS("foo") + require.Error(t, err) + require.Equal(t, fs.ErrNotExist, err) +} + func TestApplyCommand(t *testing.T) { s, err := createStore() require.NoError(t, err)