Write deploy errors to cluster DB

This commit is contained in:
Ingo Oppermann
2023-07-11 15:55:46 +02:00
parent 0f06b8b5a0
commit 3883696034
5 changed files with 362 additions and 13 deletions

View File

@@ -968,7 +968,7 @@ func ErrorHandler(err error, c echo.Context) {
} else { } else {
code = http.StatusInternalServerError code = http.StatusInternalServerError
message = http.StatusText(http.StatusInternalServerError) message = http.StatusText(http.StatusInternalServerError)
details = strings.Split(fmt.Sprintf("%s", err), "\n") details = strings.Split(err.Error(), "\n")
} }
// Send response // Send response

View File

@@ -443,12 +443,23 @@ type processOpSkip struct {
err error err error
} }
func (c *cluster) applyOpStack(stack []interface{}) { type processOpError struct {
processid app.ProcessID
err error
}
func (c *cluster) applyOpStack(stack []interface{}) []processOpError {
errors := []processOpError{}
for _, op := range stack { for _, op := range stack {
switch v := op.(type) { switch v := op.(type) {
case processOpAdd: case processOpAdd:
err := c.proxy.AddProcess(v.nodeid, v.config, v.metadata) err := c.proxy.AddProcess(v.nodeid, v.config, v.metadata)
if err != nil { if err != nil {
errors = append(errors, processOpError{
processid: v.config.ProcessID(),
err: err,
})
c.logger.Info().WithError(err).WithFields(log.Fields{ c.logger.Info().WithError(err).WithFields(log.Fields{
"processid": v.config.ProcessID(), "processid": v.config.ProcessID(),
"nodeid": v.nodeid, "nodeid": v.nodeid,
@@ -459,34 +470,57 @@ func (c *cluster) applyOpStack(stack []interface{}) {
if v.order == "start" { if v.order == "start" {
err = c.proxy.CommandProcess(v.nodeid, v.config.ProcessID(), "start") err = c.proxy.CommandProcess(v.nodeid, v.config.ProcessID(), "start")
if err != nil { if err != nil {
errors = append(errors, processOpError{
processid: v.config.ProcessID(),
err: err,
})
c.logger.Info().WithError(err).WithFields(log.Fields{ c.logger.Info().WithError(err).WithFields(log.Fields{
"processid": v.config.ID, "processid": v.config.ProcessID(),
"nodeid": v.nodeid, "nodeid": v.nodeid,
}).Log("Starting process") }).Log("Starting process")
break break
} }
} }
errors = append(errors, processOpError{
processid: v.config.ProcessID(),
err: nil,
})
c.logger.Info().WithFields(log.Fields{ c.logger.Info().WithFields(log.Fields{
"processid": v.config.ID, "processid": v.config.ProcessID(),
"nodeid": v.nodeid, "nodeid": v.nodeid,
}).Log("Adding process") }).Log("Adding process")
case processOpUpdate: case processOpUpdate:
err := c.proxy.UpdateProcess(v.nodeid, v.processid, v.config, v.metadata) err := c.proxy.UpdateProcess(v.nodeid, v.processid, v.config, v.metadata)
if err != nil { if err != nil {
errors = append(errors, processOpError{
processid: v.processid,
err: err,
})
c.logger.Info().WithError(err).WithFields(log.Fields{ c.logger.Info().WithError(err).WithFields(log.Fields{
"processid": v.config.ID, "processid": v.processid,
"nodeid": v.nodeid, "nodeid": v.nodeid,
}).Log("Updating process") }).Log("Updating process")
break break
} }
errors = append(errors, processOpError{
processid: v.processid,
err: nil,
})
c.logger.Info().WithFields(log.Fields{ c.logger.Info().WithFields(log.Fields{
"processid": v.config.ID, "processid": v.config.ProcessID(),
"nodeid": v.nodeid, "nodeid": v.nodeid,
}).Log("Updating process") }).Log("Updating process")
case processOpDelete: case processOpDelete:
err := c.proxy.DeleteProcess(v.nodeid, v.processid) err := c.proxy.DeleteProcess(v.nodeid, v.processid)
if err != nil { if err != nil {
errors = append(errors, processOpError{
processid: v.processid,
err: err,
})
c.logger.Info().WithError(err).WithFields(log.Fields{ c.logger.Info().WithError(err).WithFields(log.Fields{
"processid": v.processid, "processid": v.processid,
"nodeid": v.nodeid, "nodeid": v.nodeid,
@@ -494,6 +528,11 @@ func (c *cluster) applyOpStack(stack []interface{}) {
break break
} }
errors = append(errors, processOpError{
processid: v.processid,
err: nil,
})
c.logger.Info().WithFields(log.Fields{ c.logger.Info().WithFields(log.Fields{
"processid": v.processid, "processid": v.processid,
"nodeid": v.nodeid, "nodeid": v.nodeid,
@@ -501,8 +540,12 @@ func (c *cluster) applyOpStack(stack []interface{}) {
case processOpMove: case processOpMove:
err := c.proxy.AddProcess(v.toNodeid, v.config, v.metadata) err := c.proxy.AddProcess(v.toNodeid, v.config, v.metadata)
if err != nil { if err != nil {
errors = append(errors, processOpError{
processid: v.config.ProcessID(),
err: err,
})
c.logger.Info().WithError(err).WithFields(log.Fields{ c.logger.Info().WithError(err).WithFields(log.Fields{
"processid": v.config.ID, "processid": v.config.ProcessID(),
"fromnodeid": v.fromNodeid, "fromnodeid": v.fromNodeid,
"tonodeid": v.toNodeid, "tonodeid": v.toNodeid,
}).Log("Moving process, adding process") }).Log("Moving process, adding process")
@@ -511,8 +554,12 @@ func (c *cluster) applyOpStack(stack []interface{}) {
err = c.proxy.DeleteProcess(v.fromNodeid, v.config.ProcessID()) err = c.proxy.DeleteProcess(v.fromNodeid, v.config.ProcessID())
if err != nil { if err != nil {
errors = append(errors, processOpError{
processid: v.config.ProcessID(),
err: err,
})
c.logger.Info().WithError(err).WithFields(log.Fields{ c.logger.Info().WithError(err).WithFields(log.Fields{
"processid": v.config.ID, "processid": v.config.ProcessID(),
"fromnodeid": v.fromNodeid, "fromnodeid": v.fromNodeid,
"tonodeid": v.toNodeid, "tonodeid": v.toNodeid,
}).Log("Moving process, removing process") }).Log("Moving process, removing process")
@@ -521,22 +568,35 @@ func (c *cluster) applyOpStack(stack []interface{}) {
err = c.proxy.CommandProcess(v.toNodeid, v.config.ProcessID(), "start") err = c.proxy.CommandProcess(v.toNodeid, v.config.ProcessID(), "start")
if err != nil { if err != nil {
errors = append(errors, processOpError{
processid: v.config.ProcessID(),
err: err,
})
c.logger.Info().WithError(err).WithFields(log.Fields{ c.logger.Info().WithError(err).WithFields(log.Fields{
"processid": v.config.ID, "processid": v.config.ProcessID(),
"fromnodeid": v.fromNodeid, "fromnodeid": v.fromNodeid,
"tonodeid": v.toNodeid, "tonodeid": v.toNodeid,
}).Log("Moving process, starting process") }).Log("Moving process, starting process")
break break
} }
errors = append(errors, processOpError{
processid: v.config.ProcessID(),
err: nil,
})
c.logger.Info().WithFields(log.Fields{ c.logger.Info().WithFields(log.Fields{
"processid": v.config.ID, "processid": v.config.ProcessID(),
"fromnodeid": v.fromNodeid, "fromnodeid": v.fromNodeid,
"tonodeid": v.toNodeid, "tonodeid": v.toNodeid,
}).Log("Moving process") }).Log("Moving process")
case processOpStart: case processOpStart:
err := c.proxy.CommandProcess(v.nodeid, v.processid, "start") err := c.proxy.CommandProcess(v.nodeid, v.processid, "start")
if err != nil { if err != nil {
errors = append(errors, processOpError{
processid: v.processid,
err: err,
})
c.logger.Info().WithError(err).WithFields(log.Fields{ c.logger.Info().WithError(err).WithFields(log.Fields{
"processid": v.processid, "processid": v.processid,
"nodeid": v.nodeid, "nodeid": v.nodeid,
@@ -544,6 +604,11 @@ func (c *cluster) applyOpStack(stack []interface{}) {
break break
} }
errors = append(errors, processOpError{
processid: v.processid,
err: nil,
})
c.logger.Info().WithFields(log.Fields{ c.logger.Info().WithFields(log.Fields{
"processid": v.processid, "processid": v.processid,
"nodeid": v.nodeid, "nodeid": v.nodeid,
@@ -551,6 +616,10 @@ func (c *cluster) applyOpStack(stack []interface{}) {
case processOpStop: case processOpStop:
err := c.proxy.CommandProcess(v.nodeid, v.processid, "stop") err := c.proxy.CommandProcess(v.nodeid, v.processid, "stop")
if err != nil { if err != nil {
errors = append(errors, processOpError{
processid: v.processid,
err: err,
})
c.logger.Info().WithError(err).WithFields(log.Fields{ c.logger.Info().WithError(err).WithFields(log.Fields{
"processid": v.processid, "processid": v.processid,
"nodeid": v.nodeid, "nodeid": v.nodeid,
@@ -558,21 +627,35 @@ func (c *cluster) applyOpStack(stack []interface{}) {
break break
} }
errors = append(errors, processOpError{
processid: v.processid,
err: nil,
})
c.logger.Info().WithFields(log.Fields{ c.logger.Info().WithFields(log.Fields{
"processid": v.processid, "processid": v.processid,
"nodeid": v.nodeid, "nodeid": v.nodeid,
}).Log("Stopping process") }).Log("Stopping process")
case processOpReject: case processOpReject:
errors = append(errors, processOpError(v))
c.logger.Warn().WithError(v.err).WithField("processid", v.processid).Log("Process rejected") c.logger.Warn().WithError(v.err).WithField("processid", v.processid).Log("Process rejected")
case processOpSkip: case processOpSkip:
errors = append(errors, processOpError{
processid: v.processid,
err: v.err,
})
c.logger.Warn().WithError(v.err).WithFields(log.Fields{ c.logger.Warn().WithError(v.err).WithFields(log.Fields{
"nodeid": v.nodeid, "nodeid": v.nodeid,
"processid": v.processid, "processid": v.processid,
}).Log("Process skipped") }).Log("Process skipped")
case processOpError:
errors = append(errors, v)
default: default:
c.logger.Warn().Log("Unknown operation on stack: %+v", v) c.logger.Warn().Log("Unknown operation on stack: %+v", v)
} }
} }
return errors
} }
func (c *cluster) doSynchronize(emergency bool) { func (c *cluster) doSynchronize(emergency bool) {
@@ -608,7 +691,37 @@ func (c *cluster) doSynchronize(emergency bool) {
c.applyCommand(cmd) c.applyCommand(cmd)
} }
c.applyOpStack(opStack) errors := c.applyOpStack(opStack)
if !emergency {
for _, e := range errors {
// Only apply the command if the error is different.
process, err := c.store.GetProcess(e.processid)
if err != nil {
continue
}
if e.err != nil {
if process.Error == e.err.Error() {
continue
}
} else {
if len(process.Error) == 0 {
continue
}
}
cmd := &store.Command{
Operation: store.OpSetProcessError,
Data: store.CommandSetProcessError{
ID: e.processid,
Error: e.err.Error(),
},
}
c.applyCommand(cmd)
}
}
} }
func (c *cluster) doRebalance(emergency bool) { func (c *cluster) doRebalance(emergency bool) {
@@ -634,7 +747,35 @@ func (c *cluster) doRebalance(emergency bool) {
opStack, _ := rebalance(have, nodesMap) opStack, _ := rebalance(have, nodesMap)
c.applyOpStack(opStack) errors := c.applyOpStack(opStack)
for _, e := range errors {
// Only apply the command if the error is different.
process, err := c.store.GetProcess(e.processid)
if err != nil {
continue
}
if e.err != nil {
if process.Error == e.err.Error() {
continue
}
} else {
if len(process.Error) == 0 {
continue
}
}
cmd := &store.Command{
Operation: store.OpSetProcessError,
Data: store.CommandSetProcessError{
ID: e.processid,
Error: e.err.Error(),
},
}
c.applyCommand(cmd)
}
} }
// isMetadataUpdateRequired compares two metadata. It relies on the documented property that json.Marshal // isMetadataUpdateRequired compares two metadata. It relies on the documented property that json.Marshal

View File

@@ -44,6 +44,7 @@ type Process struct {
Config *app.Config Config *app.Config
Order string Order string
Metadata map[string]interface{} Metadata map[string]interface{}
Error string
} }
type Users struct { type Users struct {
@@ -69,6 +70,7 @@ const (
OpUpdateProcess Operation = "updateProcess" OpUpdateProcess Operation = "updateProcess"
OpSetProcessOrder Operation = "setProcessOrder" OpSetProcessOrder Operation = "setProcessOrder"
OpSetProcessMetadata Operation = "setProcessMetadata" OpSetProcessMetadata Operation = "setProcessMetadata"
OpSetProcessError Operation = "setProcessError"
OpAddIdentity Operation = "addIdentity" OpAddIdentity Operation = "addIdentity"
OpUpdateIdentity Operation = "updateIdentity" OpUpdateIdentity Operation = "updateIdentity"
OpRemoveIdentity Operation = "removeIdentity" OpRemoveIdentity Operation = "removeIdentity"
@@ -110,6 +112,11 @@ type CommandSetProcessMetadata struct {
Data interface{} Data interface{}
} }
type CommandSetProcessError struct {
ID app.ProcessID
Error string
}
type CommandAddIdentity struct { type CommandAddIdentity struct {
Identity identity.User Identity identity.User
} }
@@ -307,6 +314,14 @@ func (s *store) applyCommand(c Command) error {
} }
err = s.setProcessMetadata(cmd) err = s.setProcessMetadata(cmd)
case OpSetProcessError:
cmd := CommandSetProcessError{}
err = decodeCommand(&cmd, c.Data)
if err != nil {
break
}
err = s.setProcessError(cmd)
case OpAddIdentity: case OpAddIdentity:
cmd := CommandAddIdentity{} cmd := CommandAddIdentity{}
err = decodeCommand(&cmd, c.Data) err = decodeCommand(&cmd, c.Data)
@@ -536,6 +551,24 @@ func (s *store) setProcessMetadata(cmd CommandSetProcessMetadata) error {
return nil return nil
} }
func (s *store) setProcessError(cmd CommandSetProcessError) error {
s.lock.Lock()
defer s.lock.Unlock()
id := cmd.ID.String()
p, ok := s.data.Process[id]
if !ok {
return fmt.Errorf("the process with the ID '%s' doesn't exists", cmd.ID)
}
p.Error = cmd.Error
s.data.Process[id] = p
return nil
}
func (s *store) addIdentity(cmd CommandAddIdentity) error { func (s *store) addIdentity(cmd CommandAddIdentity) error {
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock() defer s.lock.Unlock()
@@ -783,6 +816,7 @@ func (s *store) ListProcesses() []Process {
Config: p.Config.Clone(), Config: p.Config.Clone(),
Order: p.Order, Order: p.Order,
Metadata: p.Metadata, Metadata: p.Metadata,
Error: p.Error,
}) })
} }
@@ -804,6 +838,7 @@ func (s *store) GetProcess(id app.ProcessID) (Process, error) {
Config: process.Config.Clone(), Config: process.Config.Clone(),
Order: process.Order, Order: process.Order,
Metadata: process.Metadata, Metadata: process.Metadata,
Error: process.Error,
}, nil }, nil
} }

View File

@@ -343,6 +343,92 @@ func TestUpdateProcess(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
} }
func TestSetProcessOrderCommand(t *testing.T) {
s, err := createStore()
require.NoError(t, err)
config := &app.Config{
ID: "foobar",
LimitCPU: 1,
LimitMemory: 1,
}
err = s.applyCommand(Command{
Operation: OpAddProcess,
Data: CommandAddProcess{
Config: config,
},
})
require.NoError(t, err)
require.NotEmpty(t, s.data.Process)
p, err := s.GetProcess(config.ProcessID())
require.NoError(t, err)
require.Equal(t, "stop", p.Order)
err = s.applyCommand(Command{
Operation: OpSetProcessOrder,
Data: CommandSetProcessOrder{
ID: config.ProcessID(),
Order: "start",
},
})
require.NoError(t, err)
p, err = s.GetProcess(config.ProcessID())
require.NoError(t, err)
require.Equal(t, "start", p.Order)
}
func TestSetProcessOrder(t *testing.T) {
s, err := createStore()
require.NoError(t, err)
config := &app.Config{
ID: "foobar",
LimitCPU: 1,
LimitMemory: 1,
}
err = s.setProcessOrder(CommandSetProcessOrder{
ID: config.ProcessID(),
Order: "start",
})
require.Error(t, err)
err = s.addProcess(CommandAddProcess{
Config: config,
})
require.NoError(t, err)
require.Equal(t, 1, len(s.data.Process))
err = s.setProcessOrder(CommandSetProcessOrder{
ID: config.ProcessID(),
Order: "start",
})
require.NoError(t, err)
err = s.setProcessOrder(CommandSetProcessOrder{
ID: config.ProcessID(),
Order: "stop",
})
require.NoError(t, err)
p, err := s.GetProcess(config.ProcessID())
require.NoError(t, err)
require.Equal(t, "stop", p.Order)
err = s.setProcessOrder(CommandSetProcessOrder{
ID: config.ProcessID(),
Order: "start",
})
require.NoError(t, err)
p, err = s.GetProcess(config.ProcessID())
require.NoError(t, err)
require.Equal(t, "start", p.Order)
}
func TestSetProcessMetadataCommand(t *testing.T) { func TestSetProcessMetadataCommand(t *testing.T) {
s, err := createStore() s, err := createStore()
require.NoError(t, err) require.NoError(t, err)
@@ -460,6 +546,92 @@ func TestSetProcessMetadata(t *testing.T) {
require.Equal(t, "bor", p.Metadata["foo"]) require.Equal(t, "bor", p.Metadata["foo"])
} }
func TestSetProcessErrorCommand(t *testing.T) {
s, err := createStore()
require.NoError(t, err)
config := &app.Config{
ID: "foobar",
LimitCPU: 1,
LimitMemory: 1,
}
err = s.applyCommand(Command{
Operation: OpAddProcess,
Data: CommandAddProcess{
Config: config,
},
})
require.NoError(t, err)
require.NotEmpty(t, s.data.Process)
p, err := s.GetProcess(config.ProcessID())
require.NoError(t, err)
require.Equal(t, "", p.Error)
err = s.applyCommand(Command{
Operation: OpSetProcessError,
Data: CommandSetProcessError{
ID: config.ProcessID(),
Error: "foobar",
},
})
require.NoError(t, err)
p, err = s.GetProcess(config.ProcessID())
require.NoError(t, err)
require.Equal(t, "foobar", p.Error)
}
func TestSetProcessError(t *testing.T) {
s, err := createStore()
require.NoError(t, err)
config := &app.Config{
ID: "foobar",
LimitCPU: 1,
LimitMemory: 1,
}
err = s.setProcessError(CommandSetProcessError{
ID: config.ProcessID(),
Error: "foobar",
})
require.Error(t, err)
err = s.addProcess(CommandAddProcess{
Config: config,
})
require.NoError(t, err)
require.Equal(t, 1, len(s.data.Process))
err = s.setProcessError(CommandSetProcessError{
ID: config.ProcessID(),
Error: "foobar",
})
require.NoError(t, err)
err = s.setProcessError(CommandSetProcessError{
ID: config.ProcessID(),
Error: "",
})
require.NoError(t, err)
p, err := s.GetProcess(config.ProcessID())
require.NoError(t, err)
require.Equal(t, "", p.Error)
err = s.setProcessError(CommandSetProcessError{
ID: config.ProcessID(),
Error: "foobar",
})
require.NoError(t, err)
p, err = s.GetProcess(config.ProcessID())
require.NoError(t, err)
require.Equal(t, "foobar", p.Error)
}
func TestAddIdentityCommand(t *testing.T) { func TestAddIdentityCommand(t *testing.T) {
s, err := createStore() s, err := createStore()
require.NoError(t, err) require.NoError(t, err)

View File

@@ -491,7 +491,8 @@ func (h *ClusterHandler) ListStoreProcesses(c echo.Context) error {
process.Config = config process.Config = config
process.State = &api.ProcessState{ process.State = &api.ProcessState{
Order: p.Order, Order: p.Order,
LastLog: p.Error,
} }
processes = append(processes, process) processes = append(processes, process)