Using workflow name instead of uuid.

This commit is contained in:
Aleksandr Melnikov
2020-04-22 11:58:52 -07:00
parent 238353ffb9
commit 74b3043db1
2 changed files with 15 additions and 18 deletions

View File

@@ -378,7 +378,7 @@ func (c *Client) CreateWorkflowExecution(namespace string, workflow *WorkflowExe
return workflow, nil return workflow, nil
} }
func (c *Client) InsertPreWorkflowExecutionStatistic(namespace, uid string, workflowTemplateID int64, createdAt time.Time) error { func (c *Client) InsertPreWorkflowExecutionStatistic(namespace, name string, workflowTemplateID int64, createdAt time.Time) error {
tx, err := c.DB.Begin() tx, err := c.DB.Begin()
if err != nil { if err != nil {
return err return err
@@ -387,8 +387,7 @@ func (c *Client) InsertPreWorkflowExecutionStatistic(namespace, uid string, work
insertMap := sq.Eq{ insertMap := sq.Eq{
"workflow_template_id": workflowTemplateID, "workflow_template_id": workflowTemplateID,
"uid": uid, "name": name,
"name": uid,
"namespace": namespace, "namespace": namespace,
"created_at": createdAt.UTC(), "created_at": createdAt.UTC(),
} }
@@ -447,7 +446,7 @@ func (c *Client) DeletePreWorkflowExecutionStatistic(uid string) error {
return err return err
} }
func (c *Client) FinishWorkflowExecutionStatisticViaExitHandler(namespace, name, uid string, workflowTemplateID int64, workflowOutcomeIsSuccess bool) (err error) { func (c *Client) FinishWorkflowExecutionStatisticViaExitHandler(namespace, name string, workflowTemplateID int64, workflowOutcomeIsSuccess bool) (err error) {
tx, err := c.DB.Begin() tx, err := c.DB.Begin()
if err != nil { if err != nil {
return err return err
@@ -467,7 +466,7 @@ func (c *Client) FinishWorkflowExecutionStatisticViaExitHandler(namespace, name,
} }
_, err = sb.Update("workflow_executions"). _, err = sb.Update("workflow_executions").
SetMap(updateMap).Where(sq.Eq{"uid": uid}).RunWith(tx).Exec() SetMap(updateMap).Where(sq.Eq{"name": name}).RunWith(tx).Exec()
if err != nil { if err != nil {
return err return err
} }
@@ -478,7 +477,7 @@ func (c *Client) FinishWorkflowExecutionStatisticViaExitHandler(namespace, name,
return err return err
} }
func (c *Client) CronStartWorkflowExecutionStatisticInsert(namespace, uid string, workflowTemplateID int64) (err error) { func (c *Client) CronStartWorkflowExecutionStatisticInsert(namespace, name string, workflowTemplateID int64) (err error) {
tx, err := c.DB.Begin() tx, err := c.DB.Begin()
if err != nil { if err != nil {
return err return err
@@ -487,8 +486,7 @@ func (c *Client) CronStartWorkflowExecutionStatisticInsert(namespace, uid string
insertMap := sq.Eq{ insertMap := sq.Eq{
"workflow_template_id": workflowTemplateID, "workflow_template_id": workflowTemplateID,
"uid": uid, "name": name,
"name": uid,
"namespace": namespace, "namespace": namespace,
"created_at": time.Now().UTC(), "created_at": time.Now().UTC(),
} }
@@ -1274,7 +1272,7 @@ func (c *Client) GetWorkflowExecutionStatisticsForTemplates(workflowTemplates ..
Will build a template that makes a CURL request to the onepanel-core API, Will build a template that makes a CURL request to the onepanel-core API,
with statistics about the workflow that was just executed. with statistics about the workflow that was just executed.
*/ */
func GetExitHandlerWorkflowStatistics(namespace, uid string, workflowTemplateId *uint64) (workflowStepName, workflowStepTemplate, workflowStepWhen string, err error, wfv1Template wfv1.Template) { func GetExitHandlerWorkflowStatistics(namespace string, workflowTemplateId *uint64) (workflowStepName, workflowStepTemplate, workflowStepWhen string, err error, wfv1Template wfv1.Template) {
workflowStepName = "workflow-statistics" workflowStepName = "workflow-statistics"
workflowStepTemplate = "workflow-statistics-template" workflowStepTemplate = "workflow-statistics-template"
host := env.GetEnv("ONEPANEL_CORE_SERVICE_HOST", "") host := env.GetEnv("ONEPANEL_CORE_SERVICE_HOST", "")
@@ -1291,7 +1289,6 @@ func GetExitHandlerWorkflowStatistics(namespace, uid string, workflowTemplateId
jsonRequestStruct := api.Statistics{ jsonRequestStruct := api.Statistics{
WorkflowStatus: "{{workflow.status}}", WorkflowStatus: "{{workflow.status}}",
Uuid: uid,
WorkflowTemplateId: int64(*workflowTemplateId), WorkflowTemplateId: int64(*workflowTemplateId),
} }
jsonRequestBytes, err := json.Marshal(jsonRequestStruct) jsonRequestBytes, err := json.Marshal(jsonRequestStruct)
@@ -1321,7 +1318,7 @@ func GetExitHandlerWorkflowStatistics(namespace, uid string, workflowTemplateId
return return
} }
func GetInitContainerForCronWorkflow(templateCorrespondingToEntryPoint *wfv1.Template, namespace, uid string, workflowTemplateId int64) (err error) { func GetInitContainerForCronWorkflow(templateCorrespondingToEntryPoint *wfv1.Template, namespace string, workflowTemplateId int64) (err error) {
host := env.GetEnv("ONEPANEL_CORE_SERVICE_HOST", "") host := env.GetEnv("ONEPANEL_CORE_SERVICE_HOST", "")
if host == "" { if host == "" {
err = errors.New("ONEPANEL_CORE_SERVICE_HOST is empty.") err = errors.New("ONEPANEL_CORE_SERVICE_HOST is empty.")
@@ -1332,7 +1329,7 @@ func GetInitContainerForCronWorkflow(templateCorrespondingToEntryPoint *wfv1.Tem
err = errors.New("ONEPANEL_CORE_SERVICE_PORT is empty.") err = errors.New("ONEPANEL_CORE_SERVICE_PORT is empty.")
return return
} }
curlEndpoint := fmt.Sprintf("http://%s:%s/apis/v1beta1/%s/workflow_executions/%s/cron_start_statistics/%d", host, port, namespace, uid, workflowTemplateId) curlEndpoint := fmt.Sprintf("http://%s:%s/apis/v1beta1/%s/workflow_executions/{{workflow.name}}/cron_start_statistics/%d", host, port, namespace, workflowTemplateId)
token, err := GetBearerToken(namespace) token, err := GetBearerToken(namespace)
if err != nil { if err != nil {
@@ -1353,8 +1350,8 @@ func GetInitContainerForCronWorkflow(templateCorrespondingToEntryPoint *wfv1.Tem
return return
} }
func InjectExitHandlerWorkflowExecutionStatistic(wf *wfv1.Workflow, namespace, wfExecUid string, workflowTemplateId *uint64) error { func InjectExitHandlerWorkflowExecutionStatistic(wf *wfv1.Workflow, namespace string, workflowTemplateId *uint64) error {
exitHandlerStepName, exitHandlerStepTemplate, exitHandlerStepWhen, err, exitHandlerTemplate := GetExitHandlerWorkflowStatistics(namespace, wfExecUid, workflowTemplateId) exitHandlerStepName, exitHandlerStepTemplate, exitHandlerStepWhen, err, exitHandlerTemplate := GetExitHandlerWorkflowStatistics(namespace, workflowTemplateId)
if err != nil { if err != nil {
return err return err
} }
@@ -1375,7 +1372,7 @@ func InjectExitHandlerWorkflowExecutionStatistic(wf *wfv1.Workflow, namespace, w
return nil return nil
} }
func InjectInitHandlerWorkflowExecutionStatistic(wf *wfv1.Workflow, namespace, uid string, workflowTemplateId int64) error { func InjectInitHandlerWorkflowExecutionStatistic(wf *wfv1.Workflow, namespace string, workflowTemplateId int64) error {
//Find the template that matches this entrypoint name //Find the template that matches this entrypoint name
//Add the init container to that Template //Add the init container to that Template
var templateToUse *wfv1.Template var templateToUse *wfv1.Template
@@ -1385,7 +1382,7 @@ func InjectInitHandlerWorkflowExecutionStatistic(wf *wfv1.Workflow, namespace, u
break break
} }
} }
err := GetInitContainerForCronWorkflow(templateToUse, namespace, uid, workflowTemplateId) err := GetInitContainerForCronWorkflow(templateToUse, namespace, workflowTemplateId)
if err != nil { if err != nil {
return err return err
} }

View File

@@ -86,7 +86,7 @@ func (s *WorkflowServer) AddWorkflowExecutionStatistics(ctx context.Context, req
workflowOutcomeIsSuccess = true workflowOutcomeIsSuccess = true
} }
err := client.FinishWorkflowExecutionStatisticViaExitHandler(request.Namespace, request.Name, request.Statistics.Uuid, err := client.FinishWorkflowExecutionStatisticViaExitHandler(request.Namespace, request.Name,
request.Statistics.WorkflowTemplateId, workflowOutcomeIsSuccess) request.Statistics.WorkflowTemplateId, workflowOutcomeIsSuccess)
if err != nil { if err != nil {
return &empty.Empty{}, err return &empty.Empty{}, err
@@ -96,7 +96,7 @@ func (s *WorkflowServer) AddWorkflowExecutionStatistics(ctx context.Context, req
func (s *WorkflowServer) CronStartWorkflowExecutionStatistic(ctx context.Context, request *api.CronStartWorkflowExecutionStatisticRequest) (*empty.Empty, error) { func (s *WorkflowServer) CronStartWorkflowExecutionStatistic(ctx context.Context, request *api.CronStartWorkflowExecutionStatisticRequest) (*empty.Empty, error) {
client := ctx.Value("kubeClient").(*v1.Client) client := ctx.Value("kubeClient").(*v1.Client)
err := client.CronStartWorkflowExecutionStatisticInsert(request.Namespace, request.Uid, request.WorkflowTemplateId) err := client.CronStartWorkflowExecutionStatisticInsert(request.Namespace, request.Name, request.WorkflowTemplateId)
if err != nil { if err != nil {
return &empty.Empty{}, err return &empty.Empty{}, err
} }