diff --git a/pkg/workflow_execution.go b/pkg/workflow_execution.go index dc1dfe8..cdd9aa3 100644 --- a/pkg/workflow_execution.go +++ b/pkg/workflow_execution.go @@ -378,7 +378,7 @@ func (c *Client) CreateWorkflowExecution(namespace string, workflow *WorkflowExe return workflow, nil } -func (c *Client) InsertPreWorkflowExecutionStatistic(namespace, uid string, workflowTemplateID int64, createdAt time.Time) error { +func (c *Client) InsertPreWorkflowExecutionStatistic(namespace, name string, workflowTemplateID int64, createdAt time.Time) error { tx, err := c.DB.Begin() if err != nil { return err @@ -387,8 +387,7 @@ func (c *Client) InsertPreWorkflowExecutionStatistic(namespace, uid string, work insertMap := sq.Eq{ "workflow_template_id": workflowTemplateID, - "uid": uid, - "name": uid, + "name": name, "namespace": namespace, "created_at": createdAt.UTC(), } @@ -447,7 +446,7 @@ func (c *Client) DeletePreWorkflowExecutionStatistic(uid string) error { return err } -func (c *Client) FinishWorkflowExecutionStatisticViaExitHandler(namespace, name, uid string, workflowTemplateID int64, workflowOutcomeIsSuccess bool) (err error) { +func (c *Client) FinishWorkflowExecutionStatisticViaExitHandler(namespace, name string, workflowTemplateID int64, workflowOutcomeIsSuccess bool) (err error) { tx, err := c.DB.Begin() if err != nil { return err @@ -467,7 +466,7 @@ func (c *Client) FinishWorkflowExecutionStatisticViaExitHandler(namespace, name, } _, err = sb.Update("workflow_executions"). - SetMap(updateMap).Where(sq.Eq{"uid": uid}).RunWith(tx).Exec() + SetMap(updateMap).Where(sq.Eq{"name": name}).RunWith(tx).Exec() if err != nil { return err } @@ -478,7 +477,7 @@ func (c *Client) FinishWorkflowExecutionStatisticViaExitHandler(namespace, name, return err } -func (c *Client) CronStartWorkflowExecutionStatisticInsert(namespace, uid string, workflowTemplateID int64) (err error) { +func (c *Client) CronStartWorkflowExecutionStatisticInsert(namespace, name string, workflowTemplateID int64) (err error) { tx, err := c.DB.Begin() if err != nil { return err @@ -487,8 +486,7 @@ func (c *Client) CronStartWorkflowExecutionStatisticInsert(namespace, uid string insertMap := sq.Eq{ "workflow_template_id": workflowTemplateID, - "uid": uid, - "name": uid, + "name": name, "namespace": namespace, "created_at": time.Now().UTC(), } @@ -1274,7 +1272,7 @@ func (c *Client) GetWorkflowExecutionStatisticsForTemplates(workflowTemplates .. Will build a template that makes a CURL request to the onepanel-core API, with statistics about the workflow that was just executed. */ -func GetExitHandlerWorkflowStatistics(namespace, uid string, workflowTemplateId *uint64) (workflowStepName, workflowStepTemplate, workflowStepWhen string, err error, wfv1Template wfv1.Template) { +func GetExitHandlerWorkflowStatistics(namespace string, workflowTemplateId *uint64) (workflowStepName, workflowStepTemplate, workflowStepWhen string, err error, wfv1Template wfv1.Template) { workflowStepName = "workflow-statistics" workflowStepTemplate = "workflow-statistics-template" host := env.GetEnv("ONEPANEL_CORE_SERVICE_HOST", "") @@ -1291,7 +1289,6 @@ func GetExitHandlerWorkflowStatistics(namespace, uid string, workflowTemplateId jsonRequestStruct := api.Statistics{ WorkflowStatus: "{{workflow.status}}", - Uuid: uid, WorkflowTemplateId: int64(*workflowTemplateId), } jsonRequestBytes, err := json.Marshal(jsonRequestStruct) @@ -1321,7 +1318,7 @@ func GetExitHandlerWorkflowStatistics(namespace, uid string, workflowTemplateId return } -func GetInitContainerForCronWorkflow(templateCorrespondingToEntryPoint *wfv1.Template, namespace, uid string, workflowTemplateId int64) (err error) { +func GetInitContainerForCronWorkflow(templateCorrespondingToEntryPoint *wfv1.Template, namespace string, workflowTemplateId int64) (err error) { host := env.GetEnv("ONEPANEL_CORE_SERVICE_HOST", "") if host == "" { err = errors.New("ONEPANEL_CORE_SERVICE_HOST is empty.") @@ -1332,7 +1329,7 @@ func GetInitContainerForCronWorkflow(templateCorrespondingToEntryPoint *wfv1.Tem err = errors.New("ONEPANEL_CORE_SERVICE_PORT is empty.") return } - curlEndpoint := fmt.Sprintf("http://%s:%s/apis/v1beta1/%s/workflow_executions/%s/cron_start_statistics/%d", host, port, namespace, uid, workflowTemplateId) + curlEndpoint := fmt.Sprintf("http://%s:%s/apis/v1beta1/%s/workflow_executions/{{workflow.name}}/cron_start_statistics/%d", host, port, namespace, workflowTemplateId) token, err := GetBearerToken(namespace) if err != nil { @@ -1353,8 +1350,8 @@ func GetInitContainerForCronWorkflow(templateCorrespondingToEntryPoint *wfv1.Tem return } -func InjectExitHandlerWorkflowExecutionStatistic(wf *wfv1.Workflow, namespace, wfExecUid string, workflowTemplateId *uint64) error { - exitHandlerStepName, exitHandlerStepTemplate, exitHandlerStepWhen, err, exitHandlerTemplate := GetExitHandlerWorkflowStatistics(namespace, wfExecUid, workflowTemplateId) +func InjectExitHandlerWorkflowExecutionStatistic(wf *wfv1.Workflow, namespace string, workflowTemplateId *uint64) error { + exitHandlerStepName, exitHandlerStepTemplate, exitHandlerStepWhen, err, exitHandlerTemplate := GetExitHandlerWorkflowStatistics(namespace, workflowTemplateId) if err != nil { return err } @@ -1375,7 +1372,7 @@ func InjectExitHandlerWorkflowExecutionStatistic(wf *wfv1.Workflow, namespace, w return nil } -func InjectInitHandlerWorkflowExecutionStatistic(wf *wfv1.Workflow, namespace, uid string, workflowTemplateId int64) error { +func InjectInitHandlerWorkflowExecutionStatistic(wf *wfv1.Workflow, namespace string, workflowTemplateId int64) error { //Find the template that matches this entrypoint name //Add the init container to that Template var templateToUse *wfv1.Template @@ -1385,7 +1382,7 @@ func InjectInitHandlerWorkflowExecutionStatistic(wf *wfv1.Workflow, namespace, u break } } - err := GetInitContainerForCronWorkflow(templateToUse, namespace, uid, workflowTemplateId) + err := GetInitContainerForCronWorkflow(templateToUse, namespace, workflowTemplateId) if err != nil { return err } diff --git a/server/workflow_server.go b/server/workflow_server.go index f9abc6e..8198b91 100644 --- a/server/workflow_server.go +++ b/server/workflow_server.go @@ -86,7 +86,7 @@ func (s *WorkflowServer) AddWorkflowExecutionStatistics(ctx context.Context, req workflowOutcomeIsSuccess = true } - err := client.FinishWorkflowExecutionStatisticViaExitHandler(request.Namespace, request.Name, request.Statistics.Uuid, + err := client.FinishWorkflowExecutionStatisticViaExitHandler(request.Namespace, request.Name, request.Statistics.WorkflowTemplateId, workflowOutcomeIsSuccess) if err != nil { return &empty.Empty{}, err @@ -96,7 +96,7 @@ func (s *WorkflowServer) AddWorkflowExecutionStatistics(ctx context.Context, req func (s *WorkflowServer) CronStartWorkflowExecutionStatistic(ctx context.Context, request *api.CronStartWorkflowExecutionStatisticRequest) (*empty.Empty, error) { client := ctx.Value("kubeClient").(*v1.Client) - err := client.CronStartWorkflowExecutionStatisticInsert(request.Namespace, request.Uid, request.WorkflowTemplateId) + err := client.CronStartWorkflowExecutionStatisticInsert(request.Namespace, request.Name, request.WorkflowTemplateId) if err != nil { return &empty.Empty{}, err }