feat: added versions to workflow tempaltes.

update: updated logic to get workflow template executions stats to be a single query.
This commit is contained in:
Andrey Melnikov
2020-04-20 13:47:48 -07:00
parent 43f71184a9
commit ae524fbd2e
9 changed files with 200 additions and 101 deletions

View File

@@ -1145,43 +1145,66 @@ func (c *Client) SetWorkflowTemplateLabels(namespace, name, prefix string, keyVa
return filteredMap, nil
}
func (c *Client) GetWorkflowExecutionStatisticsForTemplate(workflowTemplate *WorkflowTemplate) (err error) {
func (c *Client) GetWorkflowExecutionStatisticsForTemplates(workflowTemplates ...*WorkflowTemplate) (err error) {
if len(workflowTemplates) == 0 {
return errors.New("GetWorkflowExecutionStatisticsForTemplates requires at least 1 id")
}
tx, err := c.DB.Begin()
if err != nil {
return err
}
whereIn := "workflow_template_id IN (?"
for i := range workflowTemplates {
if i == 0 {
continue
}
whereIn += ",?"
}
whereIn += ")"
ids := make([]interface{}, len(workflowTemplates))
for i, workflowTemplate := range workflowTemplates {
ids[i] = workflowTemplate.ID
}
defer tx.Rollback()
query, args, err := sb.Select("name, created_at, finished_at, failed_at").
From("workflow_executions").Where(sq.Eq{"workflow_template_id": workflowTemplate.ID}).OrderBy("created_at DESC").ToSql()
if err != nil {
return err
}
var workflowExecStats []WorkflowExecutionStatistic
err = c.DB.Select(&workflowExecStats, query, args...)
statsSelect := `
workflow_template_id,
MAX(created_at) last_executed,
COUNT(*) FILTER (WHERE finished_At IS NULL AND failed_at IS NULL) running,
COUNT(*) FILTER (WHERE finished_at IS NOT NULL) completed,
COUNT(*) FILTER (WHERE failed_at IS NOT NULL) failed,
COUNT(*) total`
query, args, err := sb.Select(statsSelect).
From("workflow_executions").
Where(whereIn, ids...).
GroupBy("workflow_template_id").
ToSql()
if err != nil {
return err
}
workflowTemplate.WorkflowExecutionStatisticReport = &WorkflowExecutionStatisticReport{}
if len(workflowExecStats) == 0 {
return
result := make([]*WorkflowExecutionStatisticReport, 0)
err = c.DB.Select(&result, query, args...)
if err != nil {
return err
}
//Calculate and set the values
workflowTemplate.WorkflowExecutionStatisticReport.Total = int32(len(workflowExecStats))
createdAtTime := workflowExecStats[0].CreatedAt
workflowTemplate.WorkflowExecutionStatisticReport.LastExecuted = *createdAtTime
for _, workflowExecStat := range workflowExecStats {
if workflowExecStat.FailedAt != nil {
workflowTemplate.WorkflowExecutionStatisticReport.Failed++
} else if workflowExecStat.FinishedAt == nil {
workflowTemplate.WorkflowExecutionStatisticReport.Running++
} else if workflowExecStat.FinishedAt != nil {
workflowTemplate.WorkflowExecutionStatisticReport.Completed++
}
resultMapping := make(map[uint64]*WorkflowExecutionStatisticReport)
for i := range result {
report := result[i]
resultMapping[report.WorkflowTemplateId] = report
}
for _, workflowTemplate := range workflowTemplates {
workflowTemplate.WorkflowExecutionStatisticReport = resultMapping[workflowTemplate.ID]
}
return
}