mirror of
https://github.com/onepanelio/onepanel.git
synced 2025-10-21 12:39:48 +08:00
Added support for label operations in CronWorkflows.
- Endpoints added are add, list, update, and delete. Note: A bug exists with delete, it doesn't actually delete the label. WIP.
This commit is contained in:
@@ -5,6 +5,7 @@ import (
|
|||||||
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
|
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
|
||||||
argojson "github.com/argoproj/pkg/json"
|
argojson "github.com/argoproj/pkg/json"
|
||||||
"github.com/onepanelio/core/pkg/util"
|
"github.com/onepanelio/core/pkg/util"
|
||||||
|
"github.com/onepanelio/core/pkg/util/label"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
"google.golang.org/grpc/codes"
|
"google.golang.org/grpc/codes"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
@@ -182,6 +183,72 @@ func (c *Client) GetCronWorkflow(namespace, name string) (cronWorkflow *CronWork
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// prefix is the label prefix.
|
||||||
|
// e.g. prefix/my-label-key: my-label-value
|
||||||
|
func (c *Client) GetCronWorkflowLabels(namespace, name, prefix string) (labels map[string]string, err error) {
|
||||||
|
cwf, err := c.ArgoprojV1alpha1().CronWorkflows(namespace).Get(name, metav1.GetOptions{})
|
||||||
|
if err != nil {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"Namespace": namespace,
|
||||||
|
"Name": name,
|
||||||
|
"Error": err.Error(),
|
||||||
|
}).Error("CronWorkflow not found.")
|
||||||
|
return nil, util.NewUserError(codes.NotFound, "CronWorkflow not found.")
|
||||||
|
}
|
||||||
|
|
||||||
|
labels = label.FilterByPrefix(prefix, cwf.Labels)
|
||||||
|
labels = label.RemovePrefix(prefix, labels)
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// prefix is the label prefix.
|
||||||
|
// we delete all labels with that prefix and set the new ones
|
||||||
|
// e.g. prefix/my-label-key: my-label-value
|
||||||
|
func (c *Client) SetCronWorkflowLabels(namespace, name, prefix string, keyValues map[string]string, deleteOld bool) (workflowLabels map[string]string, err error) {
|
||||||
|
cwf, err := c.ArgoprojV1alpha1().CronWorkflows(namespace).Get(name, metav1.GetOptions{})
|
||||||
|
if err != nil {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"Namespace": namespace,
|
||||||
|
"Name": name,
|
||||||
|
"Error": err.Error(),
|
||||||
|
}).Error("CronWorkflow not found.")
|
||||||
|
return nil, util.NewUserError(codes.NotFound, "CronWorkflow not found.")
|
||||||
|
}
|
||||||
|
|
||||||
|
if deleteOld {
|
||||||
|
label.DeleteWithPrefix(cwf.Labels, prefix)
|
||||||
|
}
|
||||||
|
|
||||||
|
label.MergeLabelsPrefix(cwf.Labels, keyValues, prefix+"/")
|
||||||
|
|
||||||
|
cwf, err = c.ArgoprojV1alpha1().CronWorkflows(namespace).Update(cwf)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
filteredMap := label.FilterByPrefix(prefix+"/", cwf.Labels)
|
||||||
|
filteredMap = label.RemovePrefix(prefix+"/", filteredMap)
|
||||||
|
|
||||||
|
return filteredMap, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) DeleteCronWorkflowLabel(namespace, name string, keysToDelete ...string) (labels map[string]string, err error) {
|
||||||
|
wf, err := c.ArgoprojV1alpha1().CronWorkflows(namespace).Get(name, metav1.GetOptions{})
|
||||||
|
if err != nil {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"Namespace": namespace,
|
||||||
|
"Name": name,
|
||||||
|
"Error": err.Error(),
|
||||||
|
}).Error("CronWorkflow not found.")
|
||||||
|
return nil, util.NewUserError(codes.NotFound, "CronWorkflow not found.")
|
||||||
|
}
|
||||||
|
|
||||||
|
label.Delete(wf.Labels, keysToDelete...)
|
||||||
|
|
||||||
|
return wf.Labels, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Client) ListCronWorkflows(namespace, workflowTemplateUID string) (cronWorkflows []*CronWorkflow, err error) {
|
func (c *Client) ListCronWorkflows(namespace, workflowTemplateUID string) (cronWorkflows []*CronWorkflow, err error) {
|
||||||
listOptions := ListOptions{}
|
listOptions := ListOptions{}
|
||||||
if workflowTemplateUID != "" {
|
if workflowTemplateUID != "" {
|
||||||
|
@@ -145,6 +145,106 @@ func (c *CronWorkflowServer) GetCronWorkflow(ctx context.Context, req *api.GetCr
|
|||||||
return apiCronWorkflow(cwf), nil
|
return apiCronWorkflow(cwf), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *CronWorkflowServer) GetCronWorkflowLabels(ctx context.Context, req *api.GetLabelsRequest) (*api.GetLabelsResponse, error) {
|
||||||
|
client := ctx.Value("kubeClient").(*v1.Client)
|
||||||
|
allowed, err := auth.IsAuthorized(client, req.Namespace, "create", "argoproj.io", "cronworkflows", "")
|
||||||
|
if err != nil || !allowed {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
labels, err := client.GetCronWorkflowLabels(req.Namespace, req.Name, "tags.onepanel.io/")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
resp := &api.GetLabelsResponse{
|
||||||
|
Labels: mapToKeyValue(labels),
|
||||||
|
}
|
||||||
|
|
||||||
|
return resp, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Adds any labels that are not yet associated to the workflow execution.
|
||||||
|
// If the label already exists, overwrites it.
|
||||||
|
func (c *CronWorkflowServer) AddCronWorkflowLabels(ctx context.Context, req *api.AddLabelsRequest) (*api.GetLabelsResponse, error) {
|
||||||
|
client := ctx.Value("kubeClient").(*v1.Client)
|
||||||
|
allowed, err := auth.IsAuthorized(client, req.Namespace, "create", "argoproj.io", "cronworkflows", "")
|
||||||
|
if err != nil || !allowed {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
keyValues := make(map[string]string)
|
||||||
|
for _, item := range req.Labels.Items {
|
||||||
|
keyValues[item.Key] = item.Value
|
||||||
|
}
|
||||||
|
|
||||||
|
labels, err := client.SetCronWorkflowLabels(req.Namespace, req.Name, "tags.onepanel.io", keyValues, false)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
resp := &api.GetLabelsResponse{
|
||||||
|
Labels: mapToKeyValue(labels),
|
||||||
|
}
|
||||||
|
|
||||||
|
return resp, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Deletes all of the old labels and adds the new ones.
|
||||||
|
func (c *CronWorkflowServer) ReplaceCronWorkflowLabels(ctx context.Context, req *api.ReplaceLabelsRequest) (*api.GetLabelsResponse, error) {
|
||||||
|
client := ctx.Value("kubeClient").(*v1.Client)
|
||||||
|
allowed, err := auth.IsAuthorized(client, req.Namespace, "create", "argoproj.io", "cronworkflows", "")
|
||||||
|
if err != nil || !allowed {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
keyValues := make(map[string]string)
|
||||||
|
for _, item := range req.Labels.Items {
|
||||||
|
keyValues[item.Key] = item.Value
|
||||||
|
}
|
||||||
|
|
||||||
|
labels, err := client.SetCronWorkflowLabels(req.Namespace, req.Name, "tags.onepanel.io", keyValues, true)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
resp := &api.GetLabelsResponse{
|
||||||
|
Labels: mapToKeyValue(labels),
|
||||||
|
}
|
||||||
|
|
||||||
|
return resp, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *CronWorkflowServer) DeleteCronWorkflowLabel(ctx context.Context, req *api.DeleteLabelRequest) (*api.GetLabelsResponse, error) {
|
||||||
|
client := ctx.Value("kubeClient").(*v1.Client)
|
||||||
|
allowed, err := auth.IsAuthorized(client, req.Namespace, "delete", "argoproj.io", "cronworkflows", "")
|
||||||
|
if err != nil || !allowed {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
keyToDelete := "tags.onepanel.io/" + req.Key
|
||||||
|
labels, err := client.DeleteCronWorkflowLabel(req.Namespace, req.Name, keyToDelete)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
keyValues := make(map[string]string)
|
||||||
|
for key, val := range labels {
|
||||||
|
keyValues[key] = val
|
||||||
|
}
|
||||||
|
|
||||||
|
labels, err = client.SetCronWorkflowLabels(req.Namespace, req.Name, "tags.onepanel.io", keyValues, true)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
resp := &api.GetLabelsResponse{
|
||||||
|
Labels: mapToKeyValue(labels),
|
||||||
|
}
|
||||||
|
|
||||||
|
return resp, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (c *CronWorkflowServer) ListCronWorkflows(ctx context.Context, req *api.ListCronWorkflowRequest) (*api.ListCronWorkflowsResponse, error) {
|
func (c *CronWorkflowServer) ListCronWorkflows(ctx context.Context, req *api.ListCronWorkflowRequest) (*api.ListCronWorkflowsResponse, error) {
|
||||||
client := ctx.Value("kubeClient").(*v1.Client)
|
client := ctx.Value("kubeClient").(*v1.Client)
|
||||||
allowed, err := auth.IsAuthorized(client, req.Namespace, "list", "argoproj.io", "cronworkflows", "")
|
allowed, err := auth.IsAuthorized(client, req.Namespace, "list", "argoproj.io", "cronworkflows", "")
|
||||||
|
Reference in New Issue
Block a user