diff --git a/pkg/workflow_execution.go b/pkg/workflow_execution.go index 0e1b0ed..ba9c9f5 100644 --- a/pkg/workflow_execution.go +++ b/pkg/workflow_execution.go @@ -467,7 +467,7 @@ func (c *Client) FinishWorkflowExecutionStatisticViaExitHandler(namespace, name return err } -func (c *Client) CronStartWorkflowExecutionStatisticInsert(namespace, name string, workflowTemplateID int64) (err error) { +func (c *Client) CronStartWorkflowExecutionStatisticInsert(namespace, uid string, workflowTemplateID int64) (err error) { query, args, err := c.workflowTemplatesSelectBuilder(namespace). Where(sq.Eq{ "wt.id": workflowTemplateID, @@ -515,7 +515,7 @@ func (c *Client) CronStartWorkflowExecutionStatisticInsert(namespace, name strin insertMap := sq.Eq{ "uid": cronUid, "workflow_template_version_id": cronWorkflow.WorkflowTemplateVersionId, - "name": name, + "name": uid, "namespace": namespace, "phase": wfv1.NodePending, "cron_workflow_id": cronWorkflow.ID, @@ -737,12 +737,12 @@ func (c *Client) WatchWorkflowExecution(namespace, uid string) (<-chan *Workflow return workflowWatcher, nil } -func (c *Client) GetWorkflowExecutionLogs(namespace, name, podName, containerName string) (<-chan *LogEntry, error) { - wf, err := c.ArgoprojV1alpha1().Workflows(namespace).Get(name, metav1.GetOptions{}) +func (c *Client) GetWorkflowExecutionLogs(namespace, uid, podName, containerName string) (<-chan *LogEntry, error) { + wf, err := c.ArgoprojV1alpha1().Workflows(namespace).Get(uid, metav1.GetOptions{}) if err != nil { log.WithFields(log.Fields{ "Namespace": namespace, - "Name": name, + "UID": uid, "PodName": podName, "ContainerName": containerName, "Error": err.Error(), @@ -762,7 +762,7 @@ func (c *Client) GetWorkflowExecutionLogs(namespace, name, podName, containerNam if err != nil { log.WithFields(log.Fields{ "Namespace": namespace, - "Name": name, + "UID": uid, "PodName": podName, "ContainerName": containerName, "Error": err.Error(), @@ -774,7 +774,7 @@ func (c *Client) GetWorkflowExecutionLogs(namespace, name, podName, containerNam if err != nil { log.WithFields(log.Fields{ "Namespace": namespace, - "Name": name, + "UID": uid, "PodName": podName, "ContainerName": containerName, "Error": err.Error(), @@ -788,7 +788,8 @@ func (c *Client) GetWorkflowExecutionLogs(namespace, name, podName, containerNam return nil, util.NewUserError(codes.InvalidArgument, "Invaild range.") } opts.SetRange(0, int64(endOffset)) - stream, err = s3Client.GetObject(config[ArtifactRepositoryBucketKey], "artifacts/"+namespace+"/"+name+"/"+podName+"/"+containerName+".log", opts) + + stream, err = s3Client.GetObject(config[ArtifactRepositoryBucketKey], "artifacts/"+namespace+"/"+uid+"/"+podName+"/"+containerName+".log", opts) } else { stream, err = c.CoreV1().Pods(namespace).GetLogs(podName, &corev1.PodLogOptions{ Container: containerName, @@ -801,7 +802,7 @@ func (c *Client) GetWorkflowExecutionLogs(namespace, name, podName, containerNam if err != nil { log.WithFields(log.Fields{ "Namespace": namespace, - "Name": name, + "UID": uid, "PodName": podName, "ContainerName": containerName, "Error": err.Error(), @@ -832,8 +833,8 @@ func (c *Client) GetWorkflowExecutionLogs(namespace, name, podName, containerNam return logWatcher, err } -func (c *Client) GetWorkflowExecutionMetrics(namespace, name, podName string) (metrics []*Metric, err error) { - _, err = c.GetWorkflowExecution(namespace, name) +func (c *Client) GetWorkflowExecutionMetrics(namespace, uid, podName string) (metrics []*Metric, err error) { + _, err = c.GetWorkflowExecution(namespace, uid) if err != nil { return nil, util.NewUserError(codes.NotFound, "Workflow not found.") } @@ -848,7 +849,7 @@ func (c *Client) GetWorkflowExecutionMetrics(namespace, name, podName string) (m if err != nil { log.WithFields(log.Fields{ "Namespace": namespace, - "Name": name, + "UID": uid, "PodName": podName, "Error": err.Error(), }).Error("Can't get configuration.") @@ -859,7 +860,7 @@ func (c *Client) GetWorkflowExecutionMetrics(namespace, name, podName string) (m if err != nil { log.WithFields(log.Fields{ "Namespace": namespace, - "Name": name, + "UID": uid, "PodName": podName, "Error": err.Error(), }).Error("Can't connect to S3 storage.") @@ -867,11 +868,12 @@ func (c *Client) GetWorkflowExecutionMetrics(namespace, name, podName string) (m } opts := s3.GetObjectOptions{} - stream, err = s3Client.GetObject(config[ArtifactRepositoryBucketKey], "artifacts/"+namespace+"/"+name+"/"+podName+"/sys-metrics.json", opts) + + stream, err = s3Client.GetObject(config[ArtifactRepositoryBucketKey], "artifacts/"+namespace+"/"+uid+"/"+podName+"/sys-metrics.json", opts) if err != nil { log.WithFields(log.Fields{ "Namespace": namespace, - "Name": name, + "UID": uid, "PodName": podName, "Error": err.Error(), }).Error("Metrics do not exist.") @@ -881,7 +883,7 @@ func (c *Client) GetWorkflowExecutionMetrics(namespace, name, podName string) (m if err != nil { log.WithFields(log.Fields{ "Namespace": namespace, - "Name": name, + "UID": uid, "PodName": podName, "Error": err.Error(), }).Error("Unknown.") @@ -894,7 +896,7 @@ func (c *Client) GetWorkflowExecutionMetrics(namespace, name, podName string) (m if err = json.Unmarshal(content, &metrics); err != nil { log.WithFields(log.Fields{ "Namespace": namespace, - "Name": name, + "UID": uid, "PodName": podName, "Error": err.Error(), }).Error("Error parsing metrics.") @@ -904,8 +906,8 @@ func (c *Client) GetWorkflowExecutionMetrics(namespace, name, podName string) (m return } -func (c *Client) RetryWorkflowExecution(namespace, name string) (workflow *WorkflowExecution, err error) { - wf, err := c.ArgoprojV1alpha1().Workflows(namespace).Get(name, metav1.GetOptions{}) +func (c *Client) RetryWorkflowExecution(namespace, uid string) (workflow *WorkflowExecution, err error) { + wf, err := c.ArgoprojV1alpha1().Workflows(namespace).Get(uid, metav1.GetOptions{}) if err != nil { return } @@ -917,8 +919,8 @@ func (c *Client) RetryWorkflowExecution(namespace, name string) (workflow *Workf return } -func (c *Client) ResubmitWorkflowExecution(namespace, name string) (workflow *WorkflowExecution, err error) { - wf, err := c.ArgoprojV1alpha1().Workflows(namespace).Get(name, metav1.GetOptions{}) +func (c *Client) ResubmitWorkflowExecution(namespace, uid string) (workflow *WorkflowExecution, err error) { + wf, err := c.ArgoprojV1alpha1().Workflows(namespace).Get(uid, metav1.GetOptions{}) if err != nil { return } @@ -938,26 +940,26 @@ func (c *Client) ResubmitWorkflowExecution(namespace, name string) (workflow *Wo return } -func (c *Client) ResumeWorkflowExecution(namespace, name string) (workflow *WorkflowExecution, err error) { - err = argoutil.ResumeWorkflow(c.ArgoprojV1alpha1().Workflows(namespace), name, "") +func (c *Client) ResumeWorkflowExecution(namespace, uid string) (workflow *WorkflowExecution, err error) { + err = argoutil.ResumeWorkflow(c.ArgoprojV1alpha1().Workflows(namespace), uid, "") if err != nil { return } - wf, err := c.ArgoprojV1alpha1().Workflows(namespace).Get(name, metav1.GetOptions{}) + wf, err := c.ArgoprojV1alpha1().Workflows(namespace).Get(uid, metav1.GetOptions{}) workflow = typeWorkflow(wf) return } -func (c *Client) SuspendWorkflowExecution(namespace, name string) (err error) { - err = argoutil.SuspendWorkflow(c.ArgoprojV1alpha1().Workflows(namespace), name) +func (c *Client) SuspendWorkflowExecution(namespace, uid string) (err error) { + err = argoutil.SuspendWorkflow(c.ArgoprojV1alpha1().Workflows(namespace), uid) return } -func (c *Client) TerminateWorkflowExecution(namespace, name string) (err error) { +func (c *Client) TerminateWorkflowExecution(namespace, uid string) (err error) { query := `DELETE FROM workflow_executions USING workflow_template_versions, workflow_templates WHERE workflow_executions.workflow_template_version_id = workflow_template_versions.id @@ -965,16 +967,16 @@ func (c *Client) TerminateWorkflowExecution(namespace, name string) (err error) AND workflow_templates.namespace = $1 AND workflow_executions.name = $2` - if _, err := c.DB.Exec(query, namespace, name); err != nil { + if _, err := c.DB.Exec(query, namespace, uid); err != nil { return err } - err = argoutil.TerminateWorkflow(c.ArgoprojV1alpha1().Workflows(namespace), name) + err = argoutil.TerminateWorkflow(c.ArgoprojV1alpha1().Workflows(namespace), uid) return } -func (c *Client) GetArtifact(namespace, name, key string) (data []byte, err error) { +func (c *Client) GetArtifact(namespace, uid, key string) (data []byte, err error) { config, err := c.GetNamespaceConfig(namespace) if err != nil { return @@ -990,7 +992,7 @@ func (c *Client) GetArtifact(namespace, name, key string) (data []byte, err erro if err != nil { log.WithFields(log.Fields{ "Namespace": namespace, - "Name": name, + "UID": uid, "Key": key, "Error": err.Error(), }).Error("Metrics do not exist.") @@ -1120,12 +1122,12 @@ func filterOutCustomTypesFromManifest(manifest []byte) (result []byte, err error // prefix is the label prefix. // e.g. prefix/my-label-key: my-label-value -func (c *Client) GetWorkflowExecutionLabels(namespace, name, prefix string) (labels map[string]string, err error) { - wf, err := c.ArgoprojV1alpha1().Workflows(namespace).Get(name, metav1.GetOptions{}) +func (c *Client) GetWorkflowExecutionLabels(namespace, uid, prefix string) (labels map[string]string, err error) { + wf, err := c.ArgoprojV1alpha1().Workflows(namespace).Get(uid, metav1.GetOptions{}) if err != nil { log.WithFields(log.Fields{ "Namespace": namespace, - "Name": name, + "UID": uid, "Error": err.Error(), }).Error("Workflow not found.") return nil, util.NewUserError(codes.NotFound, "Workflow not found.") @@ -1137,12 +1139,12 @@ func (c *Client) GetWorkflowExecutionLabels(namespace, name, prefix string) (lab return } -func (c *Client) DeleteWorkflowExecutionLabel(namespace, name string, keysToDelete ...string) (labels map[string]string, err error) { - wf, err := c.ArgoprojV1alpha1().Workflows(namespace).Get(name, metav1.GetOptions{}) +func (c *Client) DeleteWorkflowExecutionLabel(namespace, uid string, keysToDelete ...string) (labels map[string]string, err error) { + wf, err := c.ArgoprojV1alpha1().Workflows(namespace).Get(uid, metav1.GetOptions{}) if err != nil { log.WithFields(log.Fields{ "Namespace": namespace, - "Name": name, + "UID": uid, "Error": err.Error(), }).Error("Workflow not found.") return nil, util.NewUserError(codes.NotFound, "Workflow not found.") @@ -1153,12 +1155,12 @@ func (c *Client) DeleteWorkflowExecutionLabel(namespace, name string, keysToDele return wf.Labels, nil } -func (c *Client) DeleteWorkflowTemplateLabel(namespace, name string, keysToDelete ...string) (labels map[string]string, err error) { - wf, err := c.ArgoprojV1alpha1().WorkflowTemplates(namespace).Get(name, metav1.GetOptions{}) +func (c *Client) DeleteWorkflowTemplateLabel(namespace, uid string, keysToDelete ...string) (labels map[string]string, err error) { + wf, err := c.ArgoprojV1alpha1().WorkflowTemplates(namespace).Get(uid, metav1.GetOptions{}) if err != nil { log.WithFields(log.Fields{ "Namespace": namespace, - "Name": name, + "UID": uid, "Error": err.Error(), }).Error("Workflow Template not found.") return nil, util.NewUserError(codes.NotFound, "Workflow Template not found.") @@ -1172,12 +1174,12 @@ func (c *Client) DeleteWorkflowTemplateLabel(namespace, name string, keysToDelet // prefix is the label prefix. // we delete all labels with that prefix and set the new ones // e.g. prefix/my-label-key: my-label-value -func (c *Client) SetWorkflowExecutionLabels(namespace, name, prefix string, keyValues map[string]string, deleteOld bool) (workflowLabels map[string]string, err error) { - wf, err := c.ArgoprojV1alpha1().Workflows(namespace).Get(name, metav1.GetOptions{}) +func (c *Client) SetWorkflowExecutionLabels(namespace, uid, prefix string, keyValues map[string]string, deleteOld bool) (workflowLabels map[string]string, err error) { + wf, err := c.ArgoprojV1alpha1().Workflows(namespace).Get(uid, metav1.GetOptions{}) if err != nil { log.WithFields(log.Fields{ "Namespace": namespace, - "Name": name, + "UID": uid, "Error": err.Error(), }).Error("Workflow not found.") return nil, util.NewUserError(codes.NotFound, "Workflow not found.") @@ -1203,12 +1205,12 @@ func (c *Client) SetWorkflowExecutionLabels(namespace, name, prefix string, keyV // prefix is the label prefix. // we delete all labels with that prefix and set the new ones // e.g. prefix/my-label-key: my-label-value -func (c *Client) SetWorkflowTemplateLabels(namespace, name, prefix string, keyValues map[string]string, deleteOld bool) (workflowLabels map[string]string, err error) { - wf, err := c.getArgoWorkflowTemplate(namespace, name, "latest") +func (c *Client) SetWorkflowTemplateLabels(namespace, uid, prefix string, keyValues map[string]string, deleteOld bool) (workflowLabels map[string]string, err error) { + wf, err := c.getArgoWorkflowTemplate(namespace, uid, "latest") if err != nil { log.WithFields(log.Fields{ "Namespace": namespace, - "Name": name, + "UID": uid, "Error": err.Error(), }).Error("Workflow Template not found.") return nil, util.NewUserError(codes.NotFound, "Workflow Template not found.") diff --git a/server/workflow_server.go b/server/workflow_server.go index e1b9abe..fb1a860 100644 --- a/server/workflow_server.go +++ b/server/workflow_server.go @@ -112,12 +112,12 @@ func (s *WorkflowServer) AddWorkflowExecutionStatistics(ctx context.Context, req phase = v1alpha1.NodeSucceeded } - workflow, err := client.ArgoprojV1alpha1().Workflows(req.Namespace).Get(req.Name, argov1.GetOptions{}) + workflow, err := client.ArgoprojV1alpha1().Workflows(req.Namespace).Get(req.Uid, argov1.GetOptions{}) if err != nil { return &empty.Empty{}, err } - err = client.FinishWorkflowExecutionStatisticViaExitHandler(req.Namespace, req.Name, + err = client.FinishWorkflowExecutionStatisticViaExitHandler(req.Namespace, req.Uid, req.Statistics.WorkflowTemplateId, phase, workflow.Status.StartedAt.UTC()) if err != nil { @@ -131,12 +131,12 @@ func (s *WorkflowServer) AddWorkflowExecutionStatistics(ctx context.Context, req // all required data. func (s *WorkflowServer) CronStartWorkflowExecutionStatistic(ctx context.Context, req *api.CronStartWorkflowExecutionStatisticRequest) (*empty.Empty, error) { client := ctx.Value("kubeClient").(*v1.Client) - allowed, err := auth.IsAuthorized(client, req.Namespace, "get", "argoproj.io", "workflows", req.Name) + allowed, err := auth.IsAuthorized(client, req.Namespace, "get", "argoproj.io", "workflows", req.Uid) if err != nil || !allowed { return &empty.Empty{}, err } - err = client.CronStartWorkflowExecutionStatisticInsert(req.Namespace, req.Name, req.Statistics.WorkflowTemplateId) + err = client.CronStartWorkflowExecutionStatisticInsert(req.Namespace, req.Uid, req.Statistics.WorkflowTemplateId) if err != nil { return &empty.Empty{}, err } @@ -192,12 +192,12 @@ func (s *WorkflowServer) WatchWorkflowExecution(req *api.WatchWorkflowExecutionR func (s *WorkflowServer) GetWorkflowExecutionLogs(req *api.GetWorkflowExecutionLogsRequest, stream api.WorkflowService_GetWorkflowExecutionLogsServer) error { client := stream.Context().Value("kubeClient").(*v1.Client) - allowed, err := auth.IsAuthorized(client, req.Namespace, "get", "argoproj.io", "workflows", req.Name) + allowed, err := auth.IsAuthorized(client, req.Namespace, "get", "argoproj.io", "workflows", req.Uid) if err != nil || !allowed { return err } - watcher, err := client.GetWorkflowExecutionLogs(req.Namespace, req.Name, req.PodName, req.ContainerName) + watcher, err := client.GetWorkflowExecutionLogs(req.Namespace, req.Uid, req.PodName, req.ContainerName) if err != nil { return err } @@ -222,12 +222,12 @@ func (s *WorkflowServer) GetWorkflowExecutionLogs(req *api.GetWorkflowExecutionL func (s *WorkflowServer) GetWorkflowExecutionMetrics(ctx context.Context, req *api.GetWorkflowExecutionMetricsRequest) (*api.GetWorkflowExecutionMetricsResponse, error) { client := ctx.Value("kubeClient").(*v1.Client) - allowed, err := auth.IsAuthorized(client, req.Namespace, "get", "argoproj.io", "workflows", req.Name) + allowed, err := auth.IsAuthorized(client, req.Namespace, "get", "argoproj.io", "workflows", req.Uid) if err != nil || !allowed { return nil, err } - metrics, err := client.GetWorkflowExecutionMetrics(req.Namespace, req.Name, req.PodName) + metrics, err := client.GetWorkflowExecutionMetrics(req.Namespace, req.Uid, req.PodName) if err != nil { return nil, err } @@ -281,12 +281,12 @@ func (s *WorkflowServer) ListWorkflowExecutions(ctx context.Context, req *api.Li func (s *WorkflowServer) ResubmitWorkflowExecution(ctx context.Context, req *api.ResubmitWorkflowExecutionRequest) (*api.WorkflowExecution, error) { client := ctx.Value("kubeClient").(*v1.Client) - allowed, err := auth.IsAuthorized(client, req.Namespace, "create", "argoproj.io", "workflows", req.Name) + allowed, err := auth.IsAuthorized(client, req.Namespace, "create", "argoproj.io", "workflows", req.Uid) if err != nil || !allowed { return nil, err } - wf, err := client.ResubmitWorkflowExecution(req.Namespace, req.Name) + wf, err := client.ResubmitWorkflowExecution(req.Namespace, req.Uid) if err != nil { return nil, err } @@ -301,7 +301,7 @@ func (s *WorkflowServer) TerminateWorkflowExecution(ctx context.Context, req *ap return nil, err } - err = client.TerminateWorkflowExecution(req.Namespace, req.Name) + err = client.TerminateWorkflowExecution(req.Namespace, req.Uid) if err != nil { return nil, err } @@ -311,12 +311,12 @@ func (s *WorkflowServer) TerminateWorkflowExecution(ctx context.Context, req *ap func (s *WorkflowServer) GetArtifact(ctx context.Context, req *api.GetArtifactRequest) (*api.ArtifactResponse, error) { client := ctx.Value("kubeClient").(*v1.Client) - allowed, err := auth.IsAuthorized(client, req.Namespace, "get", "argoproj.io", "workflows", req.Name) + allowed, err := auth.IsAuthorized(client, req.Namespace, "get", "argoproj.io", "workflows", req.Uid) if err != nil || !allowed { return nil, err } - data, err := client.GetArtifact(req.Namespace, req.Name, req.Key) + data, err := client.GetArtifact(req.Namespace, req.Uid, req.Key) if err != nil { return nil, err } @@ -328,7 +328,7 @@ func (s *WorkflowServer) GetArtifact(ctx context.Context, req *api.GetArtifactRe func (s *WorkflowServer) ListFiles(ctx context.Context, req *api.ListFilesRequest) (*api.ListFilesResponse, error) { client := ctx.Value("kubeClient").(*v1.Client) - allowed, err := auth.IsAuthorized(client, req.Namespace, "get", "argoproj.io", "workflows", req.Name) + allowed, err := auth.IsAuthorized(client, req.Namespace, "get", "argoproj.io", "workflows", req.Uid) if err != nil || !allowed { return nil, err }