update: database records for all resources.

This commit is contained in:
Andrey Melnikov
2020-04-27 09:32:45 -07:00
parent d42bffa0e2
commit 1920eb5ed0
18 changed files with 851 additions and 303 deletions

View File

@@ -10,10 +10,11 @@ import (
"github.com/ghodss/yaml"
"github.com/onepanelio/core/api"
"github.com/onepanelio/core/pkg/util/label"
"github.com/onepanelio/core/pkg/util/pagination"
"github.com/onepanelio/core/pkg/util/ptr"
"io"
"io/ioutil"
"regexp"
"sort"
"strconv"
"strings"
"time"
@@ -192,7 +193,7 @@ func addEnvToTemplate(template *wfv1.Template, key string, value string) {
}
}
func (c *Client) createWorkflow(namespace string, workflowTemplateId *uint64, wf *wfv1.Workflow, opts *WorkflowExecutionOptions) (createdWorkflow *wfv1.Workflow, err error) {
func (c *Client) createWorkflow(namespace string, workflowTemplateId uint64, workflowTemplateVersionId uint64, wf *wfv1.Workflow, opts *WorkflowExecutionOptions) (createdWorkflow *wfv1.Workflow, err error) {
if opts == nil {
opts = &WorkflowExecutionOptions{}
}
@@ -237,7 +238,7 @@ func (c *Client) createWorkflow(namespace string, workflowTemplateId *uint64, wf
return nil, err
}
err = InjectExitHandlerWorkflowExecutionStatistic(wf, namespace, workflowTemplateId)
err = InjectExitHandlerWorkflowExecutionStatistic(wf, namespace, &workflowTemplateId)
if err != nil {
return nil, err
}
@@ -249,7 +250,7 @@ func (c *Client) createWorkflow(namespace string, workflowTemplateId *uint64, wf
}
//Create an entry for workflow_executions statistic
//CURL code will hit the API endpoint that will update the db row
err = c.InsertPreWorkflowExecutionStatistic(namespace, createdWorkflow.Name, int64(*workflowTemplateId), time.Now())
err = c.InsertPreWorkflowExecutionStatistic(namespace, createdWorkflow.Name, int64(workflowTemplateId), workflowTemplateVersionId, createdWorkflow.CreationTimestamp.UTC())
if err != nil {
return nil, err
}
@@ -332,7 +333,7 @@ func (c *Client) CreateWorkflowExecution(namespace string, workflow *WorkflowExe
var createdWorkflows []*wfv1.Workflow
for _, wf := range workflows {
createdWorkflow, err := c.createWorkflow(namespace, &workflowTemplate.ID, &wf, opts)
createdWorkflow, err := c.createWorkflow(namespace, workflowTemplate.ID, workflowTemplate.WorkflowTemplateVersionId, &wf, opts)
if err != nil {
log.WithFields(log.Fields{
"Namespace": namespace,
@@ -365,7 +366,7 @@ func (c *Client) CreateWorkflowExecution(namespace string, workflow *WorkflowExe
return workflow, nil
}
func (c *Client) InsertPreWorkflowExecutionStatistic(namespace, name string, workflowTemplateID int64, createdAt time.Time) error {
func (c *Client) InsertPreWorkflowExecutionStatistic(namespace, name string, workflowTemplateID int64, workflowTemplateVersionId uint64, createdAt time.Time) error {
tx, err := c.DB.Begin()
if err != nil {
return err
@@ -373,10 +374,11 @@ func (c *Client) InsertPreWorkflowExecutionStatistic(namespace, name string, wor
defer tx.Rollback()
insertMap := sq.Eq{
"workflow_template_id": workflowTemplateID,
"name": name,
"namespace": namespace,
"created_at": createdAt.UTC(),
"workflow_template_version_id": workflowTemplateVersionId,
"name": name,
"namespace": namespace,
"created_at": createdAt.UTC(),
"phase": wfv1.NodePending,
}
_, err = sb.Insert("workflow_executions").
@@ -391,7 +393,7 @@ func (c *Client) InsertPreWorkflowExecutionStatistic(namespace, name string, wor
return err
}
func (c *Client) FinishWorkflowExecutionStatisticViaExitHandler(namespace, name string, workflowTemplateID int64, workflowOutcomeIsSuccess bool) (err error) {
func (c *Client) FinishWorkflowExecutionStatisticViaExitHandler(namespace, name string, workflowTemplateID int64, phase wfv1.NodePhase, startedAt time.Time) (err error) {
tx, err := c.DB.Begin()
if err != nil {
return err
@@ -399,15 +401,11 @@ func (c *Client) FinishWorkflowExecutionStatisticViaExitHandler(namespace, name
defer tx.Rollback()
updateMap := sq.Eq{
"workflow_template_id": workflowTemplateID,
"name": name,
"namespace": namespace,
}
if workflowOutcomeIsSuccess {
updateMap["finished_at"] = time.Now().UTC()
} else {
updateMap["failed_at"] = time.Now().UTC()
"started_at": startedAt.UTC(),
"name": name,
"namespace": namespace,
"finished_at": time.Now().UTC(),
"phase": phase,
}
_, err = sb.Update("workflow_executions").
@@ -423,6 +421,30 @@ func (c *Client) FinishWorkflowExecutionStatisticViaExitHandler(namespace, name
}
func (c *Client) CronStartWorkflowExecutionStatisticInsert(namespace, name string, workflowTemplateID int64) (err error) {
query, args, err := c.workflowTemplatesSelectBuilder(namespace).
Where(sq.Eq{
"wt.id": workflowTemplateID,
}).
ToSql()
if err != nil {
return err
}
workflowTemplate := &WorkflowTemplate{}
if err := c.DB.Get(workflowTemplate, query, args...); err != nil {
return err
}
query, args, err = c.cronWorkflowSelectBuilder(namespace, workflowTemplate.UID).ToSql()
if err != nil {
return err
}
cronWorkflow := &CronWorkflow{}
if err := c.DB.Get(cronWorkflow, query, args...); err != nil {
return err
}
tx, err := c.DB.Begin()
if err != nil {
return err
@@ -430,10 +452,11 @@ func (c *Client) CronStartWorkflowExecutionStatisticInsert(namespace, name strin
defer tx.Rollback()
insertMap := sq.Eq{
"workflow_template_id": workflowTemplateID,
"name": name,
"namespace": namespace,
"created_at": time.Now().UTC(),
"workflow_template_version_id": cronWorkflow.WorkflowTemplateVersionId,
"name": name,
"namespace": namespace,
"phase": wfv1.NodePending,
"cron_workflow_id": cronWorkflow.ID,
}
_, err = sb.Insert("workflow_executions").
@@ -497,9 +520,9 @@ func (c *Client) GetWorkflowExecution(namespace, name string) (workflow *Workflo
UID: string(wf.UID),
CreatedAt: wf.CreationTimestamp.UTC(),
Name: wf.Name,
Phase: WorkflowExecutionPhase(wf.Status.Phase),
StartedAt: wf.Status.StartedAt.UTC(),
FinishedAt: wf.Status.FinishedAt.UTC(),
Phase: wf.Status.Phase,
StartedAt: ptr.Time(wf.Status.StartedAt.UTC()),
FinishedAt: ptr.Time(wf.Status.FinishedAt.UTC()),
Manifest: string(manifest),
WorkflowTemplate: workflowTemplate,
}
@@ -507,67 +530,27 @@ func (c *Client) GetWorkflowExecution(namespace, name string) (workflow *Workflo
return
}
func (c *Client) ListWorkflowExecutions(namespace, workflowTemplateUID, workflowTemplateVersion string) (workflows []*WorkflowExecution, err error) {
opts := &WorkflowExecutionOptions{}
if workflowTemplateUID != "" {
labelSelect := fmt.Sprintf("%s=%s", workflowTemplateUIDLabelKey, workflowTemplateUID)
if workflowTemplateVersion != "" {
labelSelect = fmt.Sprintf("%s,%s=%s", labelSelect, workflowTemplateVersionLabelKey, workflowTemplateVersion)
}
opts.ListOptions = &ListOptions{
LabelSelector: labelSelect,
}
}
workflowList, err := c.ArgoprojV1alpha1().Workflows(namespace).List(*opts.ListOptions)
func (c *Client) ListWorkflowExecutions(namespace, workflowTemplateUID, workflowTemplateVersion string, paginator *pagination.PaginationRequest) (workflows []*WorkflowExecution, err error) {
sb := workflowExecutionsSelectBuilder(namespace, workflowTemplateUID, workflowTemplateVersion).
OrderBy("we.created_at DESC")
sb = *paginator.ApplyToSelect(&sb)
query, args, err := sb.ToSql()
if err != nil {
log.WithFields(log.Fields{
"Namespace": namespace,
"WorkflowTemplateUID": workflowTemplateUID,
"WorkflowTemplateVersion": workflowTemplateVersion,
"Error": err.Error(),
}).Error("Workflows not found.")
return nil, util.NewUserError(codes.NotFound, "Workflows not found.")
return nil, err
}
wfs := workflowList.Items
sort.Slice(wfs, func(i, j int) bool {
ith := wfs[i].CreationTimestamp.Time
jth := wfs[j].CreationTimestamp.Time
//Most recent first
return ith.After(jth)
})
for _, wf := range wfs {
execution := &WorkflowExecution{
Name: wf.ObjectMeta.Name,
UID: string(wf.ObjectMeta.UID),
Phase: WorkflowExecutionPhase(wf.Status.Phase),
StartedAt: wf.Status.StartedAt.UTC(),
FinishedAt: wf.Status.FinishedAt.UTC(),
CreatedAt: wf.CreationTimestamp.UTC(),
}
versionString, ok := wf.Labels[workflowTemplateVersionLabelKey]
if ok {
versionNumber, err := strconv.ParseInt(versionString, 10, 64)
if err == nil {
execution.WorkflowTemplate = &WorkflowTemplate{
Version: versionNumber,
}
} else {
log.WithFields(log.Fields{
"Namespace": namespace,
"WorkflowTemplateUID": workflowTemplateUID,
"WorkflowExecutionUID": wf.UID,
"Error": "Unable to get workflow template version",
}).Error("Unable to get workflow template version")
}
}
workflows = append(workflows, execution)
if err := c.DB.Select(&workflows, query, args...); err != nil {
return nil, err
}
return
}
func (c *Client) CountWorkflowExecutions(namespace, workflowTemplateUID, workflowTemplateVersion string) (count int, err error) {
err = workflowExecutionsSelectBuilderNoColumns(namespace, workflowTemplateUID, workflowTemplateVersion).
Columns("COUNT(*)").
RunWith(c.DB.DB).
QueryRow().
Scan(&count)
return
}
@@ -644,8 +627,8 @@ func (c *Client) WatchWorkflowExecution(namespace, name string) (<-chan *Workflo
workflowWatcher <- &WorkflowExecution{
CreatedAt: workflow.CreationTimestamp.UTC(),
StartedAt: workflow.Status.StartedAt.UTC(),
FinishedAt: workflow.Status.FinishedAt.UTC(),
StartedAt: ptr.Time(workflow.Status.StartedAt.UTC()),
FinishedAt: ptr.Time(workflow.Status.FinishedAt.UTC()),
UID: string(workflow.UID),
Name: workflow.Name,
Manifest: string(manifest),
@@ -1152,7 +1135,7 @@ func (c *Client) SetWorkflowTemplateLabels(namespace, name, prefix string, keyVa
func (c *Client) GetWorkflowExecutionStatisticsForTemplates(workflowTemplates ...*WorkflowTemplate) (err error) {
if len(workflowTemplates) == 0 {
return errors.New("GetWorkflowExecutionStatisticsForTemplates requires at least 1 id")
return nil
}
tx, err := c.DB.Begin()
@@ -1160,7 +1143,7 @@ func (c *Client) GetWorkflowExecutionStatisticsForTemplates(workflowTemplates ..
return err
}
whereIn := "workflow_template_id IN (?"
whereIn := "wtv.workflow_template_id IN (?"
for i := range workflowTemplates {
if i == 0 {
continue
@@ -1179,16 +1162,17 @@ func (c *Client) GetWorkflowExecutionStatisticsForTemplates(workflowTemplates ..
statsSelect := `
workflow_template_id,
MAX(created_at) last_executed,
COUNT(*) FILTER (WHERE finished_At IS NULL AND failed_at IS NULL) running,
COUNT(*) FILTER (WHERE finished_at IS NOT NULL) completed,
COUNT(*) FILTER (WHERE failed_at IS NOT NULL) failed,
MAX(we.created_at) last_executed,
COUNT(*) FILTER (WHERE finished_at IS NULL AND (phase = 'Running' OR phase = 'Pending')) running,
COUNT(*) FILTER (WHERE finished_at IS NOT NULL AND phase = 'Succeeded') completed,
COUNT(*) FILTER (WHERE finished_at IS NOT NULL AND (phase = 'Failed' OR phase = 'Error')) failed,
COUNT(*) total`
query, args, err := sb.Select(statsSelect).
From("workflow_executions").
From("workflow_executions we").
Join("workflow_template_versions wtv ON wtv.id = we.workflow_template_version_id").
Where(whereIn, ids...).
GroupBy("workflow_template_id").
GroupBy("wtv.workflow_template_id").
ToSql()
if err != nil {
@@ -1336,3 +1320,28 @@ func InjectInitHandlerWorkflowExecutionStatistic(wf *wfv1.Workflow, namespace st
}
return nil
}
func workflowExecutionsSelectBuilderNoColumns(namespace, workflowTemplateUID, workflowTemplateVersion string) sq.SelectBuilder {
whereMap := sq.Eq{
"wt.namespace": namespace,
"wt.uid": workflowTemplateUID,
}
if workflowTemplateVersion != "" {
whereMap["wtv.version"] = workflowTemplateVersion
}
sb := sb.Select().
From("workflow_executions we").
LeftJoin("workflow_template_versions wtv ON wtv.id = we.workflow_template_version_id").
LeftJoin("workflow_templates wt ON wt.id = wtv.workflow_template_id").
Where(whereMap)
return sb
}
func workflowExecutionsSelectBuilder(namespace, workflowTemplateUID, workflowTemplateVersion string) sq.SelectBuilder {
sb := workflowExecutionsSelectBuilderNoColumns(namespace, workflowTemplateUID, workflowTemplateVersion)
sb = sb.Columns("we.id", "we.created_at", "we.uid", "we.name", "we.phase", "we.started_at", "we.finished_at", `wtv.version "workflow_template.version"`)
return sb
}