Files
onepanel/pkg/labels.go
2020-10-26 15:38:03 -07:00

400 lines
10 KiB
Go

package v1
import (
"fmt"
sq "github.com/Masterminds/squirrel"
"github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/onepanelio/core/pkg/util/label"
"github.com/onepanelio/core/pkg/util/mapping"
"github.com/onepanelio/core/pkg/util/types"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"strings"
)
// SelectLabelsQuery represents the options available to filter a select labels query
type SelectLabelsQuery struct {
Table string
Alias string
Namespace string
KeyLike string
Skip []string
}
// SkipKeysFromString parses keys encoded in a string and returns an array of keys
// The separator is ";"
func SkipKeysFromString(keys string) []string {
results := make([]string, 0)
for _, key := range strings.Split(keys, ";") {
if key == "" {
continue
}
results = append(results, key)
}
return results
}
// SelectLabels returns a SelectBuilder that selects key, value columns from the criteria specified in query
func SelectLabels(query *SelectLabelsQuery) sq.SelectBuilder {
// Sample query
// SELECT DISTINCT labels.*
// FROM workflow_executions w,
// jsonb_each_text(w.labels) labels
// WHERE labels.key LIKE 'ca%'
// AND labels.key NOT IN ('catdog')
// AND namespace = 'onepanel'
fromTable := fmt.Sprintf("%s %s", query.Table, query.Alias)
fromJsonb := fmt.Sprintf("jsonb_each_text(%s.labels) labels", query.Alias)
bld := sb.Select("key", "value").
Distinct().
From(fromTable + ", " + fromJsonb)
if query.Namespace != "" {
bld = bld.Where(sq.Eq{query.Alias + ".namespace": query.Namespace})
}
if query.KeyLike != "" {
bld = bld.Where(sq.Like{"labels.key": query.KeyLike})
}
if len(query.Skip) != 0 {
bld = bld.Where(sq.NotEq{"labels.key": query.Skip})
}
return bld
}
func (c *Client) ListLabels(resource string, uid string) (labels []*Label, err error) {
sb := sb.Select("labels").
From(TypeToTableName(resource))
switch resource {
case TypeWorkflowTemplate:
sb = sb.Where(sq.Eq{"uid": uid})
case TypeWorkflowExecution:
sb = sb.Where(sq.Eq{"uid": uid})
case TypeCronWorkflow:
sb = sb.Where(sq.Eq{"uid": uid})
case TypeWorkspace:
sb = sb.Where(sq.And{
sq.Eq{"uid": uid},
sq.NotEq{"phase": "Terminated"},
})
default:
return nil, fmt.Errorf("unsupported label resource %v", resource)
}
result := types.JSONLabels{}
err = c.DB.Getx(&result, sb)
if err != nil {
return
}
for key, value := range result {
newLabel := &Label{
Key: key,
Value: value,
Resource: resource,
}
labels = append(labels, newLabel)
}
return
}
// ListAvailableLabels lists the labels available for the resource specified by the query
func (c *Client) ListAvailableLabels(query *SelectLabelsQuery) (result []*Label, err error) {
selectLabelsBuilder := SelectLabels(query)
err = c.Selectx(&result, selectLabelsBuilder)
return
}
func (c *Client) AddLabels(namespace, resource, uid string, keyValues map[string]string) error {
source, meta, err := c.GetK8sLabelResource(namespace, resource, uid)
if err != nil {
return err
}
if meta.Labels == nil {
meta.Labels = make(map[string]string)
}
label.MergeLabelsPrefix(meta.Labels, keyValues, label.TagPrefix)
if err := c.UpdateK8sLabelResource(namespace, resource, source); err != nil {
return err
}
return nil
}
func (c *Client) ReplaceLabels(namespace, resource, uid string, keyValues map[string]string) error {
tx, err := c.DB.Begin()
if err != nil {
return err
}
defer tx.Rollback()
tableName := TypeToTableName(resource)
if tableName == "" {
return fmt.Errorf("unknown resources '%v'", resource)
}
var whereCondition interface{} = nil
if resource == TypeWorkspace {
whereCondition = sq.And{
sq.Eq{"uid": uid},
sq.NotEq{"phase": "Terminated"},
}
} else if resource == TypeWorkspaceTemplate || resource == TypeWorkflowExecution {
whereCondition =
sq.Eq{
"uid": uid,
"is_archived": false,
}
}
_, err = sb.Update(tableName).
SetMap(sq.Eq{
"labels": types.JSONLabels(keyValues),
}).
Where(whereCondition).
RunWith(tx).
Exec()
if err != nil {
return err
}
if err := tx.Commit(); err != nil {
return err
}
return c.ReplaceLabelsUsingKnownID(namespace, resource, uid, keyValues)
}
// ReplaceLabelsUsingKnownID updates the k8s resource labels for the given resource/uid
// deprecated
func (c *Client) ReplaceLabelsUsingKnownID(namespace, resource string, uid string, keyValues map[string]string) error {
source, meta, err := c.GetK8sLabelResource(namespace, resource, uid)
if err != nil {
return err
}
if meta != nil {
if meta.Labels == nil {
meta.Labels = make(map[string]string)
}
label.MergeLabelsPrefix(meta.Labels, keyValues, label.TagPrefix)
if err := c.UpdateK8sLabelResource(namespace, resource, source); err != nil {
return err
}
}
return nil
}
func (c *Client) DeleteLabels(namespace, resource, uid string, keyValues map[string]string) error {
tx, err := c.DB.Begin()
if err != nil {
return err
}
defer tx.Rollback()
tableName := TypeToTableName(resource)
if tableName == "" {
return fmt.Errorf("unknown resources '%v'", resource)
}
resourceId := uint64(0)
err = sb.Select("id").
From(tableName).
Where(sq.Eq{
"uid": uid,
}).
RunWith(tx).
QueryRow().
Scan(&resourceId)
if err != nil {
return err
}
_, err = sb.Delete("labels").
Where(sq.Eq{
"key": mapping.PluckKeysStr(keyValues),
"resource": resource,
"resource_id": resourceId,
}).RunWith(tx).
Exec()
if err != nil {
return err
}
if err := tx.Commit(); err != nil {
return err
}
source, meta, err := c.GetK8sLabelResource(namespace, resource, uid)
if err != nil {
return err
}
if meta.Labels == nil {
meta.Labels = make(map[string]string)
}
toDelete := make([]string, 0)
for key := range keyValues {
toDelete = append(toDelete, key)
}
label.Delete(meta.Labels, toDelete...)
if err := c.UpdateK8sLabelResource(namespace, resource, source); err != nil {
return err
}
return nil
}
// DeleteResourceLabels deletes all of the labels for a specific resource, like workflow templates.
// NOTE: this does NOT delete k8s labels, and is only meant to be used for special cases.
func (c *Client) DeleteResourceLabels(runner sq.BaseRunner, resource string) error {
tableName := TypeToTableName(resource)
if tableName == "" {
return fmt.Errorf("unknown resources '%v'", resource)
}
_, err := sb.Delete("labels").
Where(sq.Eq{
"resource": resource,
}).
RunWith(runner).
Exec()
return err
}
func (c *Client) GetK8sLabelResource(namespace, resource, uid string) (source interface{}, result *v1.ObjectMeta, err error) {
switch resource {
case TypeWorkflowTemplateVersion:
return c.getK8sLabelResourceWorkflowTemplateVersion(namespace, uid)
case TypeWorkflowExecution:
return c.getK8sLabelResourceWorkflowExecution(namespace, uid)
case TypeCronWorkflow:
return c.getK8sLabelResourceCronWorkflow(namespace, uid)
case TypeWorkspaceTemplateVersion:
return c.getK8sLabelResourceWorkspaceTemplate(namespace, uid)
}
return nil, nil, nil
}
func (c *Client) getK8sLabelResourceWorkflowTemplateVersion(namespace, uid string) (source interface{}, result *v1.ObjectMeta, err error) {
labelSelect := fmt.Sprintf("%v=%v", label.WorkflowTemplateVersionUid, uid)
workflowTemplates, err := c.ArgoprojV1alpha1().WorkflowTemplates(namespace).List(v1.ListOptions{
LabelSelector: labelSelect,
})
if err != nil {
return nil, nil, err
}
if workflowTemplates.Items.Len() != 1 {
return nil, nil, fmt.Errorf("no argo resource found")
}
item := workflowTemplates.Items[0]
return item, &item.ObjectMeta, nil
}
func (c *Client) getK8sLabelResourceWorkflowExecution(namespace, uid string) (source interface{}, result *v1.ObjectMeta, err error) {
workflow, err := c.ArgoprojV1alpha1().Workflows(namespace).Get(uid, v1.GetOptions{})
if err != nil {
return nil, nil, err
}
return workflow, &workflow.ObjectMeta, nil
}
func (c *Client) getK8sLabelResourceCronWorkflow(namespace, uid string) (source interface{}, result *v1.ObjectMeta, err error) {
labelSelect := fmt.Sprintf("%v=%v", label.CronWorkflowUid, uid)
cronWorkflows, err := c.ArgoprojV1alpha1().CronWorkflows(namespace).List(v1.ListOptions{
LabelSelector: labelSelect,
})
if err != nil {
return nil, nil, err
}
if len(cronWorkflows.Items) != 1 {
return nil, nil, fmt.Errorf("no argo resource found")
}
item := cronWorkflows.Items[0]
return item, &item.ObjectMeta, nil
}
func (c *Client) getK8sLabelResourceWorkspaceTemplate(namespace, uid string) (source interface{}, result *v1.ObjectMeta, err error) {
labelSelect := fmt.Sprintf("%v=%v", label.WorkspaceTemplateVersionUid, uid)
workflowTemplates, err := c.ArgoprojV1alpha1().WorkflowTemplates(namespace).List(v1.ListOptions{
LabelSelector: labelSelect,
})
if err != nil {
return nil, nil, err
}
if workflowTemplates.Items.Len() != 1 {
return nil, nil, fmt.Errorf("no argo resource found")
}
item := workflowTemplates.Items[0]
return item, &item.ObjectMeta, nil
}
func (c *Client) UpdateK8sLabelResource(namespace, resource string, obj interface{}) error {
if resource == TypeWorkflowTemplateVersion {
workflowTemplate, ok := obj.(v1alpha1.WorkflowTemplate)
if !ok {
return fmt.Errorf("unable to convert object to WorkflowTemplate")
}
if _, err := c.ArgoprojV1alpha1().WorkflowTemplates(namespace).Update(&workflowTemplate); err != nil {
return err
}
} else if resource == TypeWorkflowExecution {
workflowExecution, ok := obj.(*v1alpha1.Workflow)
if !ok {
return fmt.Errorf("unable to convert object to workflow")
}
if _, err := c.ArgoprojV1alpha1().Workflows(namespace).Update(workflowExecution); err != nil {
return err
}
} else if resource == TypeCronWorkflow {
cronWorkflow, ok := obj.(v1alpha1.CronWorkflow)
if !ok {
return fmt.Errorf("unable to convert object to cron workflow")
}
if _, err := c.ArgoprojV1alpha1().CronWorkflows(namespace).Update(&cronWorkflow); err != nil {
return err
}
} else if resource == TypeWorkspaceTemplateVersion {
workflowTemplate, ok := obj.(v1alpha1.WorkflowTemplate)
if !ok {
return fmt.Errorf("unable to convert object to WorkflowTemplate")
}
if _, err := c.ArgoprojV1alpha1().WorkflowTemplates(namespace).Update(&workflowTemplate); err != nil {
return err
}
}
return nil
}