From 3883696034d27c3aefcb3c63050f3646981a3be7 Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Tue, 11 Jul 2023 15:55:46 +0200 Subject: [PATCH] Write deploy errors to cluster DB --- cluster/api.go | 2 +- cluster/leader.go | 163 +++++++++++++++++++++++++++++++--- cluster/store/store.go | 35 ++++++++ cluster/store/store_test.go | 172 ++++++++++++++++++++++++++++++++++++ http/handler/api/cluster.go | 3 +- 5 files changed, 362 insertions(+), 13 deletions(-) diff --git a/cluster/api.go b/cluster/api.go index ae12f8aa..41eb96d0 100644 --- a/cluster/api.go +++ b/cluster/api.go @@ -968,7 +968,7 @@ func ErrorHandler(err error, c echo.Context) { } else { code = http.StatusInternalServerError message = http.StatusText(http.StatusInternalServerError) - details = strings.Split(fmt.Sprintf("%s", err), "\n") + details = strings.Split(err.Error(), "\n") } // Send response diff --git a/cluster/leader.go b/cluster/leader.go index 992aa472..2250202b 100644 --- a/cluster/leader.go +++ b/cluster/leader.go @@ -443,12 +443,23 @@ type processOpSkip struct { err error } -func (c *cluster) applyOpStack(stack []interface{}) { +type processOpError struct { + processid app.ProcessID + err error +} + +func (c *cluster) applyOpStack(stack []interface{}) []processOpError { + errors := []processOpError{} + for _, op := range stack { switch v := op.(type) { case processOpAdd: err := c.proxy.AddProcess(v.nodeid, v.config, v.metadata) if err != nil { + errors = append(errors, processOpError{ + processid: v.config.ProcessID(), + err: err, + }) c.logger.Info().WithError(err).WithFields(log.Fields{ "processid": v.config.ProcessID(), "nodeid": v.nodeid, @@ -459,34 +470,57 @@ func (c *cluster) applyOpStack(stack []interface{}) { if v.order == "start" { err = c.proxy.CommandProcess(v.nodeid, v.config.ProcessID(), "start") if err != nil { + errors = append(errors, processOpError{ + processid: v.config.ProcessID(), + err: err, + }) c.logger.Info().WithError(err).WithFields(log.Fields{ - "processid": v.config.ID, + "processid": v.config.ProcessID(), "nodeid": v.nodeid, }).Log("Starting process") break } } + + errors = append(errors, processOpError{ + processid: v.config.ProcessID(), + err: nil, + }) + c.logger.Info().WithFields(log.Fields{ - "processid": v.config.ID, + "processid": v.config.ProcessID(), "nodeid": v.nodeid, }).Log("Adding process") case processOpUpdate: err := c.proxy.UpdateProcess(v.nodeid, v.processid, v.config, v.metadata) if err != nil { + errors = append(errors, processOpError{ + processid: v.processid, + err: err, + }) c.logger.Info().WithError(err).WithFields(log.Fields{ - "processid": v.config.ID, + "processid": v.processid, "nodeid": v.nodeid, }).Log("Updating process") break } + errors = append(errors, processOpError{ + processid: v.processid, + err: nil, + }) + c.logger.Info().WithFields(log.Fields{ - "processid": v.config.ID, + "processid": v.config.ProcessID(), "nodeid": v.nodeid, }).Log("Updating process") case processOpDelete: err := c.proxy.DeleteProcess(v.nodeid, v.processid) if err != nil { + errors = append(errors, processOpError{ + processid: v.processid, + err: err, + }) c.logger.Info().WithError(err).WithFields(log.Fields{ "processid": v.processid, "nodeid": v.nodeid, @@ -494,6 +528,11 @@ func (c *cluster) applyOpStack(stack []interface{}) { break } + errors = append(errors, processOpError{ + processid: v.processid, + err: nil, + }) + c.logger.Info().WithFields(log.Fields{ "processid": v.processid, "nodeid": v.nodeid, @@ -501,8 +540,12 @@ func (c *cluster) applyOpStack(stack []interface{}) { case processOpMove: err := c.proxy.AddProcess(v.toNodeid, v.config, v.metadata) if err != nil { + errors = append(errors, processOpError{ + processid: v.config.ProcessID(), + err: err, + }) c.logger.Info().WithError(err).WithFields(log.Fields{ - "processid": v.config.ID, + "processid": v.config.ProcessID(), "fromnodeid": v.fromNodeid, "tonodeid": v.toNodeid, }).Log("Moving process, adding process") @@ -511,8 +554,12 @@ func (c *cluster) applyOpStack(stack []interface{}) { err = c.proxy.DeleteProcess(v.fromNodeid, v.config.ProcessID()) if err != nil { + errors = append(errors, processOpError{ + processid: v.config.ProcessID(), + err: err, + }) c.logger.Info().WithError(err).WithFields(log.Fields{ - "processid": v.config.ID, + "processid": v.config.ProcessID(), "fromnodeid": v.fromNodeid, "tonodeid": v.toNodeid, }).Log("Moving process, removing process") @@ -521,22 +568,35 @@ func (c *cluster) applyOpStack(stack []interface{}) { err = c.proxy.CommandProcess(v.toNodeid, v.config.ProcessID(), "start") if err != nil { + errors = append(errors, processOpError{ + processid: v.config.ProcessID(), + err: err, + }) c.logger.Info().WithError(err).WithFields(log.Fields{ - "processid": v.config.ID, + "processid": v.config.ProcessID(), "fromnodeid": v.fromNodeid, "tonodeid": v.toNodeid, }).Log("Moving process, starting process") break } + errors = append(errors, processOpError{ + processid: v.config.ProcessID(), + err: nil, + }) + c.logger.Info().WithFields(log.Fields{ - "processid": v.config.ID, + "processid": v.config.ProcessID(), "fromnodeid": v.fromNodeid, "tonodeid": v.toNodeid, }).Log("Moving process") case processOpStart: err := c.proxy.CommandProcess(v.nodeid, v.processid, "start") if err != nil { + errors = append(errors, processOpError{ + processid: v.processid, + err: err, + }) c.logger.Info().WithError(err).WithFields(log.Fields{ "processid": v.processid, "nodeid": v.nodeid, @@ -544,6 +604,11 @@ func (c *cluster) applyOpStack(stack []interface{}) { break } + errors = append(errors, processOpError{ + processid: v.processid, + err: nil, + }) + c.logger.Info().WithFields(log.Fields{ "processid": v.processid, "nodeid": v.nodeid, @@ -551,6 +616,10 @@ func (c *cluster) applyOpStack(stack []interface{}) { case processOpStop: err := c.proxy.CommandProcess(v.nodeid, v.processid, "stop") if err != nil { + errors = append(errors, processOpError{ + processid: v.processid, + err: err, + }) c.logger.Info().WithError(err).WithFields(log.Fields{ "processid": v.processid, "nodeid": v.nodeid, @@ -558,21 +627,35 @@ func (c *cluster) applyOpStack(stack []interface{}) { break } + errors = append(errors, processOpError{ + processid: v.processid, + err: nil, + }) + c.logger.Info().WithFields(log.Fields{ "processid": v.processid, "nodeid": v.nodeid, }).Log("Stopping process") case processOpReject: + errors = append(errors, processOpError(v)) c.logger.Warn().WithError(v.err).WithField("processid", v.processid).Log("Process rejected") case processOpSkip: + errors = append(errors, processOpError{ + processid: v.processid, + err: v.err, + }) c.logger.Warn().WithError(v.err).WithFields(log.Fields{ "nodeid": v.nodeid, "processid": v.processid, }).Log("Process skipped") + case processOpError: + errors = append(errors, v) default: c.logger.Warn().Log("Unknown operation on stack: %+v", v) } } + + return errors } func (c *cluster) doSynchronize(emergency bool) { @@ -608,7 +691,37 @@ func (c *cluster) doSynchronize(emergency bool) { c.applyCommand(cmd) } - c.applyOpStack(opStack) + errors := c.applyOpStack(opStack) + + if !emergency { + for _, e := range errors { + // Only apply the command if the error is different. + process, err := c.store.GetProcess(e.processid) + if err != nil { + continue + } + + if e.err != nil { + if process.Error == e.err.Error() { + continue + } + } else { + if len(process.Error) == 0 { + continue + } + } + + cmd := &store.Command{ + Operation: store.OpSetProcessError, + Data: store.CommandSetProcessError{ + ID: e.processid, + Error: e.err.Error(), + }, + } + + c.applyCommand(cmd) + } + } } func (c *cluster) doRebalance(emergency bool) { @@ -634,7 +747,35 @@ func (c *cluster) doRebalance(emergency bool) { opStack, _ := rebalance(have, nodesMap) - c.applyOpStack(opStack) + errors := c.applyOpStack(opStack) + + for _, e := range errors { + // Only apply the command if the error is different. + process, err := c.store.GetProcess(e.processid) + if err != nil { + continue + } + + if e.err != nil { + if process.Error == e.err.Error() { + continue + } + } else { + if len(process.Error) == 0 { + continue + } + } + + cmd := &store.Command{ + Operation: store.OpSetProcessError, + Data: store.CommandSetProcessError{ + ID: e.processid, + Error: e.err.Error(), + }, + } + + c.applyCommand(cmd) + } } // isMetadataUpdateRequired compares two metadata. It relies on the documented property that json.Marshal diff --git a/cluster/store/store.go b/cluster/store/store.go index 5b1fadc4..af4ba102 100644 --- a/cluster/store/store.go +++ b/cluster/store/store.go @@ -44,6 +44,7 @@ type Process struct { Config *app.Config Order string Metadata map[string]interface{} + Error string } type Users struct { @@ -69,6 +70,7 @@ const ( OpUpdateProcess Operation = "updateProcess" OpSetProcessOrder Operation = "setProcessOrder" OpSetProcessMetadata Operation = "setProcessMetadata" + OpSetProcessError Operation = "setProcessError" OpAddIdentity Operation = "addIdentity" OpUpdateIdentity Operation = "updateIdentity" OpRemoveIdentity Operation = "removeIdentity" @@ -110,6 +112,11 @@ type CommandSetProcessMetadata struct { Data interface{} } +type CommandSetProcessError struct { + ID app.ProcessID + Error string +} + type CommandAddIdentity struct { Identity identity.User } @@ -307,6 +314,14 @@ func (s *store) applyCommand(c Command) error { } err = s.setProcessMetadata(cmd) + case OpSetProcessError: + cmd := CommandSetProcessError{} + err = decodeCommand(&cmd, c.Data) + if err != nil { + break + } + + err = s.setProcessError(cmd) case OpAddIdentity: cmd := CommandAddIdentity{} err = decodeCommand(&cmd, c.Data) @@ -536,6 +551,24 @@ func (s *store) setProcessMetadata(cmd CommandSetProcessMetadata) error { return nil } +func (s *store) setProcessError(cmd CommandSetProcessError) error { + s.lock.Lock() + defer s.lock.Unlock() + + id := cmd.ID.String() + + p, ok := s.data.Process[id] + if !ok { + return fmt.Errorf("the process with the ID '%s' doesn't exists", cmd.ID) + } + + p.Error = cmd.Error + + s.data.Process[id] = p + + return nil +} + func (s *store) addIdentity(cmd CommandAddIdentity) error { s.lock.Lock() defer s.lock.Unlock() @@ -783,6 +816,7 @@ func (s *store) ListProcesses() []Process { Config: p.Config.Clone(), Order: p.Order, Metadata: p.Metadata, + Error: p.Error, }) } @@ -804,6 +838,7 @@ func (s *store) GetProcess(id app.ProcessID) (Process, error) { Config: process.Config.Clone(), Order: process.Order, Metadata: process.Metadata, + Error: process.Error, }, nil } diff --git a/cluster/store/store_test.go b/cluster/store/store_test.go index 079c092d..9ff49615 100644 --- a/cluster/store/store_test.go +++ b/cluster/store/store_test.go @@ -343,6 +343,92 @@ func TestUpdateProcess(t *testing.T) { require.NoError(t, err) } +func TestSetProcessOrderCommand(t *testing.T) { + s, err := createStore() + require.NoError(t, err) + + config := &app.Config{ + ID: "foobar", + LimitCPU: 1, + LimitMemory: 1, + } + + err = s.applyCommand(Command{ + Operation: OpAddProcess, + Data: CommandAddProcess{ + Config: config, + }, + }) + require.NoError(t, err) + require.NotEmpty(t, s.data.Process) + + p, err := s.GetProcess(config.ProcessID()) + require.NoError(t, err) + require.Equal(t, "stop", p.Order) + + err = s.applyCommand(Command{ + Operation: OpSetProcessOrder, + Data: CommandSetProcessOrder{ + ID: config.ProcessID(), + Order: "start", + }, + }) + require.NoError(t, err) + + p, err = s.GetProcess(config.ProcessID()) + require.NoError(t, err) + require.Equal(t, "start", p.Order) +} + +func TestSetProcessOrder(t *testing.T) { + s, err := createStore() + require.NoError(t, err) + + config := &app.Config{ + ID: "foobar", + LimitCPU: 1, + LimitMemory: 1, + } + + err = s.setProcessOrder(CommandSetProcessOrder{ + ID: config.ProcessID(), + Order: "start", + }) + require.Error(t, err) + + err = s.addProcess(CommandAddProcess{ + Config: config, + }) + require.NoError(t, err) + require.Equal(t, 1, len(s.data.Process)) + + err = s.setProcessOrder(CommandSetProcessOrder{ + ID: config.ProcessID(), + Order: "start", + }) + require.NoError(t, err) + + err = s.setProcessOrder(CommandSetProcessOrder{ + ID: config.ProcessID(), + Order: "stop", + }) + require.NoError(t, err) + + p, err := s.GetProcess(config.ProcessID()) + require.NoError(t, err) + require.Equal(t, "stop", p.Order) + + err = s.setProcessOrder(CommandSetProcessOrder{ + ID: config.ProcessID(), + Order: "start", + }) + require.NoError(t, err) + + p, err = s.GetProcess(config.ProcessID()) + require.NoError(t, err) + require.Equal(t, "start", p.Order) +} + func TestSetProcessMetadataCommand(t *testing.T) { s, err := createStore() require.NoError(t, err) @@ -460,6 +546,92 @@ func TestSetProcessMetadata(t *testing.T) { require.Equal(t, "bor", p.Metadata["foo"]) } +func TestSetProcessErrorCommand(t *testing.T) { + s, err := createStore() + require.NoError(t, err) + + config := &app.Config{ + ID: "foobar", + LimitCPU: 1, + LimitMemory: 1, + } + + err = s.applyCommand(Command{ + Operation: OpAddProcess, + Data: CommandAddProcess{ + Config: config, + }, + }) + require.NoError(t, err) + require.NotEmpty(t, s.data.Process) + + p, err := s.GetProcess(config.ProcessID()) + require.NoError(t, err) + require.Equal(t, "", p.Error) + + err = s.applyCommand(Command{ + Operation: OpSetProcessError, + Data: CommandSetProcessError{ + ID: config.ProcessID(), + Error: "foobar", + }, + }) + require.NoError(t, err) + + p, err = s.GetProcess(config.ProcessID()) + require.NoError(t, err) + require.Equal(t, "foobar", p.Error) +} + +func TestSetProcessError(t *testing.T) { + s, err := createStore() + require.NoError(t, err) + + config := &app.Config{ + ID: "foobar", + LimitCPU: 1, + LimitMemory: 1, + } + + err = s.setProcessError(CommandSetProcessError{ + ID: config.ProcessID(), + Error: "foobar", + }) + require.Error(t, err) + + err = s.addProcess(CommandAddProcess{ + Config: config, + }) + require.NoError(t, err) + require.Equal(t, 1, len(s.data.Process)) + + err = s.setProcessError(CommandSetProcessError{ + ID: config.ProcessID(), + Error: "foobar", + }) + require.NoError(t, err) + + err = s.setProcessError(CommandSetProcessError{ + ID: config.ProcessID(), + Error: "", + }) + require.NoError(t, err) + + p, err := s.GetProcess(config.ProcessID()) + require.NoError(t, err) + require.Equal(t, "", p.Error) + + err = s.setProcessError(CommandSetProcessError{ + ID: config.ProcessID(), + Error: "foobar", + }) + require.NoError(t, err) + + p, err = s.GetProcess(config.ProcessID()) + require.NoError(t, err) + require.Equal(t, "foobar", p.Error) +} + func TestAddIdentityCommand(t *testing.T) { s, err := createStore() require.NoError(t, err) diff --git a/http/handler/api/cluster.go b/http/handler/api/cluster.go index 1c8b68db..1ca76031 100644 --- a/http/handler/api/cluster.go +++ b/http/handler/api/cluster.go @@ -491,7 +491,8 @@ func (h *ClusterHandler) ListStoreProcesses(c echo.Context) error { process.Config = config process.State = &api.ProcessState{ - Order: p.Order, + Order: p.Order, + LastLog: p.Error, } processes = append(processes, process)