mirror of
https://github.com/onepanelio/onepanel.git
synced 2025-11-02 20:04:01 +08:00
fix: issue where newly created workflow execution did not add new labels.
This commit is contained in:
@@ -194,7 +194,7 @@ func addEnvToTemplate(template *wfv1.Template, key string, value string) {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) createWorkflow(namespace string, workflowTemplateId uint64, workflowTemplateVersionId uint64, wf *wfv1.Workflow, opts *WorkflowExecutionOptions) (createdWorkflow *wfv1.Workflow, err error) {
|
||||
func (c *Client) createWorkflow(namespace string, workflowTemplateId uint64, workflowTemplateVersionId uint64, wf *wfv1.Workflow, opts *WorkflowExecutionOptions) (newDbId uint64, createdWorkflow *wfv1.Workflow, err error) {
|
||||
if opts == nil {
|
||||
opts = &WorkflowExecutionOptions{}
|
||||
}
|
||||
@@ -237,25 +237,24 @@ func (c *Client) createWorkflow(namespace string, workflowTemplateId uint64, wor
|
||||
|
||||
err = injectExitHandlerWorkflowExecutionStatistic(wf, namespace, &workflowTemplateId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return 0, nil, err
|
||||
}
|
||||
|
||||
if err = c.injectAutomatedFields(namespace, wf, opts); err != nil {
|
||||
return nil, err
|
||||
return 0, nil, err
|
||||
}
|
||||
|
||||
createdWorkflow, err = c.ArgoprojV1alpha1().Workflows(namespace).Create(wf)
|
||||
if err != nil {
|
||||
|
||||
return nil, err
|
||||
return 0, nil, err
|
||||
}
|
||||
|
||||
uid := wf.Labels[label.WorkflowUid]
|
||||
//Create an entry for workflow_executions statistic
|
||||
//CURL code will hit the API endpoint that will update the db row
|
||||
err = c.InsertPreWorkflowExecutionStatistic(namespace, createdWorkflow.Name, workflowTemplateVersionId, createdWorkflow.CreationTimestamp.UTC(), uid)
|
||||
newDbId, err = c.InsertPreWorkflowExecutionStatistic(namespace, createdWorkflow.Name, workflowTemplateVersionId, createdWorkflow.CreationTimestamp.UTC(), uid)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return 0, nil, err
|
||||
}
|
||||
|
||||
return
|
||||
@@ -330,6 +329,7 @@ func (c *Client) CreateWorkflowExecution(namespace string, workflow *WorkflowExe
|
||||
(*opts.Labels)[label.WorkflowUid] = workflowUid
|
||||
label.MergeLabelsPrefix(*opts.Labels, workflow.Labels, label.TagPrefix)
|
||||
|
||||
// @todo we need to enforce the below requirement in API.
|
||||
//UX will prevent multiple workflows
|
||||
manifest, err := workflowTemplate.GetWorkflowManifestBytes()
|
||||
if err != nil {
|
||||
@@ -353,7 +353,7 @@ func (c *Client) CreateWorkflowExecution(namespace string, workflow *WorkflowExe
|
||||
|
||||
var createdWorkflows []*wfv1.Workflow
|
||||
for _, wf := range workflows {
|
||||
createdWorkflow, err := c.createWorkflow(namespace, workflowTemplate.ID, workflowTemplate.WorkflowTemplateVersionId, &wf, opts)
|
||||
dbId, createdWorkflow, err := c.createWorkflow(namespace, workflowTemplate.ID, workflowTemplate.WorkflowTemplateVersionId, &wf, opts)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"Namespace": namespace,
|
||||
@@ -362,6 +362,17 @@ func (c *Client) CreateWorkflowExecution(namespace string, workflow *WorkflowExe
|
||||
}).Error("Error parsing workflow.")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(workflow.Labels) > 0 {
|
||||
_, err = c.InsertLabelsBuilder(TypeWorkflowExecution, dbId, workflow.Labels).
|
||||
RunWith(c.DB).
|
||||
Exec()
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
createdWorkflows = append(createdWorkflows, createdWorkflow)
|
||||
}
|
||||
|
||||
@@ -386,10 +397,10 @@ func (c *Client) CreateWorkflowExecution(namespace string, workflow *WorkflowExe
|
||||
return workflow, nil
|
||||
}
|
||||
|
||||
func (c *Client) InsertPreWorkflowExecutionStatistic(namespace, name string, workflowTemplateVersionId uint64, createdAt time.Time, uid string) error {
|
||||
func (c *Client) InsertPreWorkflowExecutionStatistic(namespace, name string, workflowTemplateVersionId uint64, createdAt time.Time, uid string) (newId uint64, err error) {
|
||||
tx, err := c.DB.Begin()
|
||||
if err != nil {
|
||||
return err
|
||||
return 0, err
|
||||
}
|
||||
defer tx.Rollback()
|
||||
|
||||
@@ -402,16 +413,21 @@ func (c *Client) InsertPreWorkflowExecutionStatistic(namespace, name string, wor
|
||||
"phase": wfv1.NodePending,
|
||||
}
|
||||
|
||||
_, err = sb.Insert("workflow_executions").
|
||||
SetMap(insertMap).RunWith(tx).Exec()
|
||||
err = sb.Insert("workflow_executions").
|
||||
SetMap(insertMap).
|
||||
Suffix("RETURNING id").
|
||||
RunWith(tx).
|
||||
QueryRow().
|
||||
Scan(&newId)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
return 0, err
|
||||
}
|
||||
err = tx.Commit()
|
||||
if err != nil {
|
||||
return err
|
||||
return 0, err
|
||||
}
|
||||
return err
|
||||
return newId, err
|
||||
}
|
||||
|
||||
func (c *Client) FinishWorkflowExecutionStatisticViaExitHandler(namespace, name string, workflowTemplateID int64, phase wfv1.NodePhase, startedAt time.Time) (err error) {
|
||||
|
||||
Reference in New Issue
Block a user