mirror of
https://github.com/onepanelio/onepanel.git
synced 2025-09-27 01:56:03 +08:00
409 lines
10 KiB
Go
409 lines
10 KiB
Go
package v1
|
|
|
|
import (
|
|
"fmt"
|
|
sq "github.com/Masterminds/squirrel"
|
|
"github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
|
|
"github.com/onepanelio/core/pkg/util/label"
|
|
"github.com/onepanelio/core/pkg/util/mapping"
|
|
"github.com/onepanelio/core/pkg/util/types"
|
|
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"strings"
|
|
)
|
|
|
|
// SelectLabelsQuery represents the options available to filter a select labels query
|
|
type SelectLabelsQuery struct {
|
|
Table string
|
|
Alias string
|
|
Namespace string
|
|
KeyLike string
|
|
Skip []string
|
|
}
|
|
|
|
// SkipKeysFromString parses keys encoded in a string and returns an array of keys
|
|
// The separator is ";"
|
|
func SkipKeysFromString(keys string) []string {
|
|
results := make([]string, 0)
|
|
for _, key := range strings.Split(keys, ";") {
|
|
if key == "" {
|
|
continue
|
|
}
|
|
|
|
results = append(results, key)
|
|
}
|
|
|
|
return results
|
|
}
|
|
|
|
// SelectLabels returns a SelectBuilder that selects key, value columns from the criteria specified in query
|
|
func SelectLabels(query *SelectLabelsQuery) sq.SelectBuilder {
|
|
// Sample query
|
|
// SELECT DISTINCT labels.*
|
|
// FROM workflow_executions w,
|
|
// jsonb_each_text(w.labels) labels
|
|
// WHERE labels.key LIKE 'ca%'
|
|
// AND labels.key NOT IN ('catdog')
|
|
// AND namespace = 'onepanel'
|
|
// AND labels != 'null'::jsonb
|
|
|
|
fromTable := fmt.Sprintf("%s %s", query.Table, query.Alias)
|
|
fromJsonb := fmt.Sprintf("jsonb_each_text(%s.labels) labels", query.Alias)
|
|
|
|
bld := sb.Select("key", "value").
|
|
Distinct().
|
|
From(fromTable + ", " + fromJsonb).
|
|
Where("labels != 'null'::jsonb")
|
|
|
|
if query.Namespace != "" {
|
|
bld = bld.Where(sq.Eq{query.Alias + ".namespace": query.Namespace})
|
|
}
|
|
if query.KeyLike != "" {
|
|
bld = bld.Where(sq.Like{"labels.key": query.KeyLike})
|
|
}
|
|
if len(query.Skip) != 0 {
|
|
bld = bld.Where(sq.NotEq{"labels.key": query.Skip})
|
|
}
|
|
|
|
return bld
|
|
}
|
|
|
|
func (c *Client) ListLabels(resource string, uid string) (labels []*Label, err error) {
|
|
sb := sb.Select("labels").
|
|
From(TypeToTableName(resource))
|
|
|
|
switch resource {
|
|
case TypeWorkflowTemplate:
|
|
sb = sb.Where(sq.Eq{"uid": uid})
|
|
case TypeWorkflowExecution:
|
|
sb = sb.Where(sq.Eq{"uid": uid})
|
|
case TypeCronWorkflow:
|
|
sb = sb.Where(sq.Eq{"uid": uid})
|
|
case TypeWorkspace:
|
|
sb = sb.Where(sq.And{
|
|
sq.Eq{"uid": uid},
|
|
sq.NotEq{"phase": "Terminated"},
|
|
})
|
|
default:
|
|
return nil, fmt.Errorf("unsupported label resource %v", resource)
|
|
}
|
|
|
|
result := types.JSONLabels{}
|
|
err = c.DB.Getx(&result, sb)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
for key, value := range result {
|
|
newLabel := &Label{
|
|
Key: key,
|
|
Value: value,
|
|
Resource: resource,
|
|
}
|
|
|
|
labels = append(labels, newLabel)
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// ListAvailableLabels lists the labels available for the resource specified by the query
|
|
func (c *Client) ListAvailableLabels(query *SelectLabelsQuery) (result []*Label, err error) {
|
|
selectLabelsBuilder := SelectLabels(query)
|
|
|
|
// Don't select labels from Terminated workspaces.
|
|
if query.Table == "workspaces" {
|
|
selectLabelsBuilder = selectLabelsBuilder.Where(sq.NotEq{
|
|
"l.phase": "Terminated",
|
|
})
|
|
}
|
|
|
|
err = c.Selectx(&result, selectLabelsBuilder)
|
|
|
|
return
|
|
}
|
|
|
|
func (c *Client) AddLabels(namespace, resource, uid string, keyValues map[string]string) error {
|
|
source, meta, err := c.GetK8sLabelResource(namespace, resource, uid)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if meta.Labels == nil {
|
|
meta.Labels = make(map[string]string)
|
|
}
|
|
label.MergeLabelsPrefix(meta.Labels, keyValues, label.TagPrefix)
|
|
if err := c.UpdateK8sLabelResource(namespace, resource, source); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *Client) ReplaceLabels(namespace, resource, uid string, keyValues map[string]string) error {
|
|
tx, err := c.DB.Begin()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer tx.Rollback()
|
|
|
|
tableName := TypeToTableName(resource)
|
|
if tableName == "" {
|
|
return fmt.Errorf("unknown resources '%v'", resource)
|
|
}
|
|
|
|
var whereCondition interface{} = nil
|
|
if resource == TypeWorkspace {
|
|
whereCondition = sq.And{
|
|
sq.Eq{"uid": uid},
|
|
sq.NotEq{"phase": "Terminated"},
|
|
}
|
|
} else if resource == TypeWorkspaceTemplate || resource == TypeWorkflowExecution {
|
|
whereCondition =
|
|
sq.Eq{
|
|
"uid": uid,
|
|
"is_archived": false,
|
|
}
|
|
}
|
|
|
|
_, err = sb.Update(tableName).
|
|
SetMap(sq.Eq{
|
|
"labels": types.JSONLabels(keyValues),
|
|
}).
|
|
Where(whereCondition).
|
|
RunWith(tx).
|
|
Exec()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := tx.Commit(); err != nil {
|
|
return err
|
|
}
|
|
|
|
return c.ReplaceLabelsUsingKnownID(namespace, resource, uid, keyValues)
|
|
}
|
|
|
|
// ReplaceLabelsUsingKnownID updates the k8s resource labels for the given resource/uid
|
|
// deprecated
|
|
func (c *Client) ReplaceLabelsUsingKnownID(namespace, resource string, uid string, keyValues map[string]string) error {
|
|
source, meta, err := c.GetK8sLabelResource(namespace, resource, uid)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if meta != nil {
|
|
if meta.Labels == nil {
|
|
meta.Labels = make(map[string]string)
|
|
}
|
|
label.MergeLabelsPrefix(meta.Labels, keyValues, label.TagPrefix)
|
|
if err := c.UpdateK8sLabelResource(namespace, resource, source); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *Client) DeleteLabels(namespace, resource, uid string, keyValues map[string]string) error {
|
|
tx, err := c.DB.Begin()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer tx.Rollback()
|
|
|
|
tableName := TypeToTableName(resource)
|
|
if tableName == "" {
|
|
return fmt.Errorf("unknown resources '%v'", resource)
|
|
}
|
|
|
|
resourceId := uint64(0)
|
|
err = sb.Select("id").
|
|
From(tableName).
|
|
Where(sq.Eq{
|
|
"uid": uid,
|
|
}).
|
|
RunWith(tx).
|
|
QueryRow().
|
|
Scan(&resourceId)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
_, err = sb.Delete("labels").
|
|
Where(sq.Eq{
|
|
"key": mapping.PluckKeysStr(keyValues),
|
|
"resource": resource,
|
|
"resource_id": resourceId,
|
|
}).RunWith(tx).
|
|
Exec()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := tx.Commit(); err != nil {
|
|
return err
|
|
}
|
|
|
|
source, meta, err := c.GetK8sLabelResource(namespace, resource, uid)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if meta.Labels == nil {
|
|
meta.Labels = make(map[string]string)
|
|
}
|
|
|
|
toDelete := make([]string, 0)
|
|
for key := range keyValues {
|
|
toDelete = append(toDelete, key)
|
|
}
|
|
|
|
label.Delete(meta.Labels, toDelete...)
|
|
if err := c.UpdateK8sLabelResource(namespace, resource, source); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// DeleteResourceLabels deletes all of the labels for a specific resource, like workflow templates.
|
|
// NOTE: this does NOT delete k8s labels, and is only meant to be used for special cases.
|
|
func (c *Client) DeleteResourceLabels(runner sq.BaseRunner, resource string) error {
|
|
tableName := TypeToTableName(resource)
|
|
if tableName == "" {
|
|
return fmt.Errorf("unknown resources '%v'", resource)
|
|
}
|
|
|
|
_, err := sb.Delete("labels").
|
|
Where(sq.Eq{
|
|
"resource": resource,
|
|
}).
|
|
RunWith(runner).
|
|
Exec()
|
|
|
|
return err
|
|
}
|
|
|
|
func (c *Client) GetK8sLabelResource(namespace, resource, uid string) (source interface{}, result *v1.ObjectMeta, err error) {
|
|
switch resource {
|
|
case TypeWorkflowTemplateVersion:
|
|
return c.getK8sLabelResourceWorkflowTemplateVersion(namespace, uid)
|
|
case TypeWorkflowExecution:
|
|
return c.getK8sLabelResourceWorkflowExecution(namespace, uid)
|
|
case TypeCronWorkflow:
|
|
return c.getK8sLabelResourceCronWorkflow(namespace, uid)
|
|
case TypeWorkspaceTemplateVersion:
|
|
return c.getK8sLabelResourceWorkspaceTemplate(namespace, uid)
|
|
}
|
|
|
|
return nil, nil, nil
|
|
}
|
|
|
|
func (c *Client) getK8sLabelResourceWorkflowTemplateVersion(namespace, uid string) (source interface{}, result *v1.ObjectMeta, err error) {
|
|
labelSelect := fmt.Sprintf("%v=%v", label.WorkflowTemplateVersionUid, uid)
|
|
|
|
workflowTemplates, err := c.ArgoprojV1alpha1().WorkflowTemplates(namespace).List(v1.ListOptions{
|
|
LabelSelector: labelSelect,
|
|
})
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
if workflowTemplates.Items.Len() != 1 {
|
|
return nil, nil, fmt.Errorf("no argo resource found")
|
|
}
|
|
|
|
item := workflowTemplates.Items[0]
|
|
|
|
return item, &item.ObjectMeta, nil
|
|
}
|
|
|
|
func (c *Client) getK8sLabelResourceWorkflowExecution(namespace, uid string) (source interface{}, result *v1.ObjectMeta, err error) {
|
|
workflow, err := c.ArgoprojV1alpha1().Workflows(namespace).Get(uid, v1.GetOptions{})
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
return workflow, &workflow.ObjectMeta, nil
|
|
}
|
|
|
|
func (c *Client) getK8sLabelResourceCronWorkflow(namespace, uid string) (source interface{}, result *v1.ObjectMeta, err error) {
|
|
labelSelect := fmt.Sprintf("%v=%v", label.CronWorkflowUid, uid)
|
|
|
|
cronWorkflows, err := c.ArgoprojV1alpha1().CronWorkflows(namespace).List(v1.ListOptions{
|
|
LabelSelector: labelSelect,
|
|
})
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
if len(cronWorkflows.Items) != 1 {
|
|
return nil, nil, fmt.Errorf("no argo resource found")
|
|
}
|
|
|
|
item := cronWorkflows.Items[0]
|
|
|
|
return item, &item.ObjectMeta, nil
|
|
}
|
|
|
|
func (c *Client) getK8sLabelResourceWorkspaceTemplate(namespace, uid string) (source interface{}, result *v1.ObjectMeta, err error) {
|
|
labelSelect := fmt.Sprintf("%v=%v", label.WorkspaceTemplateVersionUid, uid)
|
|
|
|
workflowTemplates, err := c.ArgoprojV1alpha1().WorkflowTemplates(namespace).List(v1.ListOptions{
|
|
LabelSelector: labelSelect,
|
|
})
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
if workflowTemplates.Items.Len() != 1 {
|
|
return nil, nil, fmt.Errorf("no argo resource found")
|
|
}
|
|
|
|
item := workflowTemplates.Items[0]
|
|
|
|
return item, &item.ObjectMeta, nil
|
|
}
|
|
|
|
func (c *Client) UpdateK8sLabelResource(namespace, resource string, obj interface{}) error {
|
|
if resource == TypeWorkflowTemplateVersion {
|
|
workflowTemplate, ok := obj.(v1alpha1.WorkflowTemplate)
|
|
if !ok {
|
|
return fmt.Errorf("unable to convert object to WorkflowTemplate")
|
|
}
|
|
|
|
if _, err := c.ArgoprojV1alpha1().WorkflowTemplates(namespace).Update(&workflowTemplate); err != nil {
|
|
return err
|
|
}
|
|
} else if resource == TypeWorkflowExecution {
|
|
workflowExecution, ok := obj.(*v1alpha1.Workflow)
|
|
if !ok {
|
|
return fmt.Errorf("unable to convert object to workflow")
|
|
}
|
|
|
|
if _, err := c.ArgoprojV1alpha1().Workflows(namespace).Update(workflowExecution); err != nil {
|
|
return err
|
|
}
|
|
} else if resource == TypeCronWorkflow {
|
|
cronWorkflow, ok := obj.(v1alpha1.CronWorkflow)
|
|
if !ok {
|
|
return fmt.Errorf("unable to convert object to cron workflow")
|
|
}
|
|
|
|
if _, err := c.ArgoprojV1alpha1().CronWorkflows(namespace).Update(&cronWorkflow); err != nil {
|
|
return err
|
|
}
|
|
} else if resource == TypeWorkspaceTemplateVersion {
|
|
workflowTemplate, ok := obj.(v1alpha1.WorkflowTemplate)
|
|
if !ok {
|
|
return fmt.Errorf("unable to convert object to WorkflowTemplate")
|
|
}
|
|
|
|
if _, err := c.ArgoprojV1alpha1().WorkflowTemplates(namespace).Update(&workflowTemplate); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|