mirror of
https://github.com/onepanelio/onepanel.git
synced 2025-10-05 13:46:51 +08:00
Updating name to uid in cron_workflow and related code.
This commit is contained in:

committed by
rushtehrani

parent
20151d5108
commit
fc10444656
@@ -8,7 +8,7 @@ import (
|
|||||||
"github.com/onepanelio/core/pkg/util"
|
"github.com/onepanelio/core/pkg/util"
|
||||||
"github.com/onepanelio/core/pkg/util/label"
|
"github.com/onepanelio/core/pkg/util/label"
|
||||||
"github.com/onepanelio/core/pkg/util/pagination"
|
"github.com/onepanelio/core/pkg/util/pagination"
|
||||||
"github.com/onepanelio/core/pkg/util/uid"
|
uid2 "github.com/onepanelio/core/pkg/util/uid"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
"google.golang.org/grpc/codes"
|
"google.golang.org/grpc/codes"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
@@ -16,7 +16,7 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (c *Client) UpdateCronWorkflow(namespace string, name string, cronWorkflow *CronWorkflow) (*CronWorkflow, error) {
|
func (c *Client) UpdateCronWorkflow(namespace string, uid string, cronWorkflow *CronWorkflow) (*CronWorkflow, error) {
|
||||||
err := c.cronWorkflowSelectBuilderNoColumns(namespace, cronWorkflow.WorkflowExecution.WorkflowTemplate.UID).
|
err := c.cronWorkflowSelectBuilderNoColumns(namespace, cronWorkflow.WorkflowExecution.WorkflowTemplate.UID).
|
||||||
Columns("cw.id").
|
Columns("cw.id").
|
||||||
RunWith(c.DB).
|
RunWith(c.DB).
|
||||||
@@ -39,7 +39,7 @@ func (c *Client) UpdateCronWorkflow(namespace string, name string, cronWorkflow
|
|||||||
|
|
||||||
// TODO: Need to pull system parameters from k8s config/secret here, example: HOST
|
// TODO: Need to pull system parameters from k8s config/secret here, example: HOST
|
||||||
opts := &WorkflowExecutionOptions{}
|
opts := &WorkflowExecutionOptions{}
|
||||||
opts.GenerateName, err = uid.GenerateUID(workflowTemplate.Name)
|
opts.GenerateName, err = uid2.GenerateUID(workflowTemplate.Name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -93,7 +93,7 @@ func (c *Client) UpdateCronWorkflow(namespace string, name string, cronWorkflow
|
|||||||
|
|
||||||
wf := workflows[0]
|
wf := workflows[0]
|
||||||
argoCronWorkflow.Spec.WorkflowSpec = wf.Spec
|
argoCronWorkflow.Spec.WorkflowSpec = wf.Spec
|
||||||
_, err = c.updateCronWorkflow(namespace, name, &workflowTemplate.ID, &wf, &argoCronWorkflow, opts)
|
_, err = c.updateCronWorkflow(namespace, uid, &workflowTemplate.ID, &wf, &argoCronWorkflow, opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithFields(log.Fields{
|
log.WithFields(log.Fields{
|
||||||
"Namespace": namespace,
|
"Namespace": namespace,
|
||||||
@@ -233,7 +233,7 @@ func (c *Client) CreateCronWorkflow(namespace string, cronWorkflow *CronWorkflow
|
|||||||
cronWorkflow.Name = argoCreatedCronWorkflow.Name
|
cronWorkflow.Name = argoCreatedCronWorkflow.Name
|
||||||
cronWorkflow.CreatedAt = argoCreatedCronWorkflow.CreationTimestamp.UTC()
|
cronWorkflow.CreatedAt = argoCreatedCronWorkflow.CreationTimestamp.UTC()
|
||||||
|
|
||||||
cronWorkflow.UID, err = uid.GenerateUID(argoCreatedCronWorkflow.Name)
|
cronWorkflow.UID, err = uid2.GenerateUID(argoCreatedCronWorkflow.Name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -278,10 +278,10 @@ func (c *Client) CreateCronWorkflow(namespace string, cronWorkflow *CronWorkflow
|
|||||||
return cronWorkflow, nil
|
return cronWorkflow, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) GetCronWorkflow(namespace, name string) (cronWorkflow *CronWorkflow, err error) {
|
func (c *Client) GetCronWorkflow(namespace, uid string) (cronWorkflow *CronWorkflow, err error) {
|
||||||
cronWorkflow = &CronWorkflow{}
|
cronWorkflow = &CronWorkflow{}
|
||||||
|
|
||||||
err = c.cronWorkflowSelectBuilderNamespaceName(namespace, name).
|
err = c.cronWorkflowSelectBuilderNamespaceName(namespace, uid).
|
||||||
RunWith(c.DB).
|
RunWith(c.DB).
|
||||||
QueryRow().
|
QueryRow().
|
||||||
Scan(cronWorkflow)
|
Scan(cronWorkflow)
|
||||||
@@ -465,13 +465,13 @@ func (c *Client) buildCronWorkflowDefinition(namespace string, workflowTemplateI
|
|||||||
return cwf, nil
|
return cwf, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) updateCronWorkflow(namespace string, name string, workflowTemplateId *uint64, wf *wfv1.Workflow, cwf *wfv1.CronWorkflow, opts *WorkflowExecutionOptions) (updatedCronWorkflow *wfv1.CronWorkflow, err error) {
|
func (c *Client) updateCronWorkflow(namespace string, uid string, workflowTemplateId *uint64, wf *wfv1.Workflow, cwf *wfv1.CronWorkflow, opts *WorkflowExecutionOptions) (updatedCronWorkflow *wfv1.CronWorkflow, err error) {
|
||||||
//Make sure the CronWorkflow exists before we edit it
|
//Make sure the CronWorkflow exists before we edit it
|
||||||
toUpdateCWF, err := c.ArgoprojV1alpha1().CronWorkflows(namespace).Get(name, metav1.GetOptions{})
|
toUpdateCWF, err := c.ArgoprojV1alpha1().CronWorkflows(namespace).Get(uid, metav1.GetOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithFields(log.Fields{
|
log.WithFields(log.Fields{
|
||||||
"Namespace": namespace,
|
"Namespace": namespace,
|
||||||
"Name": name,
|
"UID": uid,
|
||||||
"Error": err.Error(),
|
"Error": err.Error(),
|
||||||
}).Error("CronWorkflow not found.")
|
}).Error("CronWorkflow not found.")
|
||||||
return nil, util.NewUserError(codes.NotFound, "CronWorkflow not found.")
|
return nil, util.NewUserError(codes.NotFound, "CronWorkflow not found.")
|
||||||
@@ -482,7 +482,7 @@ func (c *Client) updateCronWorkflow(namespace string, name string, workflowTempl
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
cwf.Name = name
|
cwf.Name = uid
|
||||||
cwf.ResourceVersion = toUpdateCWF.ResourceVersion
|
cwf.ResourceVersion = toUpdateCWF.ResourceVersion
|
||||||
updatedCronWorkflow, err = c.ArgoprojV1alpha1().CronWorkflows(namespace).Update(cwf)
|
updatedCronWorkflow, err = c.ArgoprojV1alpha1().CronWorkflows(namespace).Update(cwf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -506,14 +506,14 @@ func (c *Client) createCronWorkflow(namespace string, workflowTemplateId *uint64
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) TerminateCronWorkflow(namespace, name string) (err error) {
|
func (c *Client) TerminateCronWorkflow(namespace, uid string) (err error) {
|
||||||
query, args, err := sb.Select(cronWorkflowColumns("wtv.version")...).
|
query, args, err := sb.Select(cronWorkflowColumns("wtv.version")...).
|
||||||
From("cron_workflows cw").
|
From("cron_workflows cw").
|
||||||
Join("workflow_template_versions wtv ON wtv.id = cw.workflow_template_version_id").
|
Join("workflow_template_versions wtv ON wtv.id = cw.workflow_template_version_id").
|
||||||
Join("workflow_templates wt ON wt.id = wtv.workflow_template_id").
|
Join("workflow_templates wt ON wt.id = wtv.workflow_template_id").
|
||||||
Where(sq.Eq{
|
Where(sq.Eq{
|
||||||
"wt.namespace": namespace,
|
"wt.namespace": namespace,
|
||||||
"cw.name": name,
|
"cw.name": uid,
|
||||||
}).
|
}).
|
||||||
ToSql()
|
ToSql()
|
||||||
|
|
||||||
@@ -535,11 +535,11 @@ func (c *Client) TerminateCronWorkflow(namespace, name string) (err error) {
|
|||||||
AND workflow_template_versions.workflow_template_id = workflow_templates.id
|
AND workflow_template_versions.workflow_template_id = workflow_templates.id
|
||||||
AND workflow_templates.namespace = $1
|
AND workflow_templates.namespace = $1
|
||||||
AND cron_workflows.name = $2`
|
AND cron_workflows.name = $2`
|
||||||
if _, err := c.DB.Exec(query, namespace, name); err != nil {
|
if _, err := c.DB.Exec(query, namespace, uid); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = c.ArgoprojV1alpha1().CronWorkflows(namespace).Delete(name, nil)
|
err = c.ArgoprojV1alpha1().CronWorkflows(namespace).Delete(uid, nil)
|
||||||
if err != nil && strings.Contains(err.Error(), "not found") {
|
if err != nil && strings.Contains(err.Error(), "not found") {
|
||||||
err = nil
|
err = nil
|
||||||
}
|
}
|
||||||
@@ -567,7 +567,7 @@ func (c *Client) cronWorkflowSelectBuilder(namespace string, workflowTemplateUid
|
|||||||
return sb
|
return sb
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) cronWorkflowSelectBuilderNamespaceName(namespace string, name string) sq.SelectBuilder {
|
func (c *Client) cronWorkflowSelectBuilderNamespaceName(namespace string, uid string) sq.SelectBuilder {
|
||||||
sb := sb.Select("cw.id", "cw.created_at", "cw.uid", "cw.name", "cw.workflow_template_version_id").
|
sb := sb.Select("cw.id", "cw.created_at", "cw.uid", "cw.name", "cw.workflow_template_version_id").
|
||||||
Columns("cw.manifest", "wtv.version").
|
Columns("cw.manifest", "wtv.version").
|
||||||
From("cron_workflows cw").
|
From("cron_workflows cw").
|
||||||
@@ -575,7 +575,7 @@ func (c *Client) cronWorkflowSelectBuilderNamespaceName(namespace string, name s
|
|||||||
Join("workflow_templates wt ON wt.id = wtv.workflow_template_id").
|
Join("workflow_templates wt ON wt.id = wtv.workflow_template_id").
|
||||||
Where(sq.Eq{
|
Where(sq.Eq{
|
||||||
"wt.namespace": namespace,
|
"wt.namespace": namespace,
|
||||||
"cw.name": name,
|
"cw.name": uid,
|
||||||
})
|
})
|
||||||
|
|
||||||
return sb
|
return sb
|
||||||
|
@@ -127,7 +127,7 @@ func (c *CronWorkflowServer) UpdateCronWorkflow(ctx context.Context, req *api.Up
|
|||||||
Labels: converter.APIKeyValueToLabel(req.CronWorkflow.Labels),
|
Labels: converter.APIKeyValueToLabel(req.CronWorkflow.Labels),
|
||||||
}
|
}
|
||||||
|
|
||||||
cwf, err := client.UpdateCronWorkflow(req.Namespace, req.Name, &cronWorkflow)
|
cwf, err := client.UpdateCronWorkflow(req.Namespace, req.Uid, &cronWorkflow)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -139,11 +139,11 @@ func (c *CronWorkflowServer) UpdateCronWorkflow(ctx context.Context, req *api.Up
|
|||||||
|
|
||||||
func (c *CronWorkflowServer) GetCronWorkflow(ctx context.Context, req *api.GetCronWorkflowRequest) (*api.CronWorkflow, error) {
|
func (c *CronWorkflowServer) GetCronWorkflow(ctx context.Context, req *api.GetCronWorkflowRequest) (*api.CronWorkflow, error) {
|
||||||
client := ctx.Value("kubeClient").(*v1.Client)
|
client := ctx.Value("kubeClient").(*v1.Client)
|
||||||
allowed, err := auth.IsAuthorized(client, req.Namespace, "get", "argoproj.io", "cronworkflows", req.Name)
|
allowed, err := auth.IsAuthorized(client, req.Namespace, "get", "argoproj.io", "cronworkflows", req.Uid)
|
||||||
if err != nil || !allowed {
|
if err != nil || !allowed {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
cwf, err := client.GetCronWorkflow(req.Namespace, req.Name)
|
cwf, err := client.GetCronWorkflow(req.Namespace, req.Uid)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -188,7 +188,7 @@ func (c *CronWorkflowServer) TerminateCronWorkflow(ctx context.Context, req *api
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = client.TerminateCronWorkflow(req.Namespace, req.Name)
|
err = client.TerminateCronWorkflow(req.Namespace, req.Uid)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user