mirror of
https://github.com/onepanelio/onepanel.git
synced 2025-09-27 01:56:03 +08:00
537 lines
16 KiB
Go
537 lines
16 KiB
Go
package server
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
|
|
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
|
|
"github.com/onepanelio/core/pkg/util"
|
|
"github.com/onepanelio/core/pkg/util/collection"
|
|
"github.com/onepanelio/core/pkg/util/request"
|
|
"github.com/onepanelio/core/pkg/util/request/pagination"
|
|
"github.com/onepanelio/core/pkg/util/router"
|
|
"github.com/onepanelio/core/server/converter"
|
|
log "github.com/sirupsen/logrus"
|
|
"google.golang.org/grpc/codes"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"time"
|
|
|
|
"github.com/golang/protobuf/ptypes/empty"
|
|
api "github.com/onepanelio/core/api/gen"
|
|
v1 "github.com/onepanelio/core/pkg"
|
|
"github.com/onepanelio/core/pkg/util/ptr"
|
|
"github.com/onepanelio/core/server/auth"
|
|
|
|
requestSort "github.com/onepanelio/core/pkg/util/request/sort"
|
|
)
|
|
|
|
// WorkflowServer is an implementation of the grpc WorkflowServer
|
|
type WorkflowServer struct {
|
|
api.UnimplementedWorkflowServiceServer
|
|
}
|
|
|
|
// NewWorkflowServer creates a new WorkflowServer
|
|
func NewWorkflowServer() *WorkflowServer {
|
|
return &WorkflowServer{}
|
|
}
|
|
|
|
// removedUnusedManifestFields removes any fields not necessary in a Workflow Manfiest.
|
|
// this is used to cut down the size for more efficient data transmission
|
|
func removedUnusedManifestFields(manifest string) (string, error) {
|
|
if manifest == "" {
|
|
return "", nil
|
|
}
|
|
|
|
result := make(map[string]map[string]interface{})
|
|
if err := json.Unmarshal([]byte(manifest), &result); err != nil {
|
|
return "", err
|
|
}
|
|
|
|
for key := range result {
|
|
collection.RemoveBlanks(result[key])
|
|
}
|
|
|
|
delete(result["metadata"], "managedFields")
|
|
delete(result["status"], "resourcesDuration")
|
|
|
|
templatesRaw, ok := result["spec"]["templates"]
|
|
if ok {
|
|
templatesArray := templatesRaw.([]interface{})
|
|
|
|
for _, template := range templatesArray {
|
|
templateMap := template.(map[string]interface{})
|
|
delete(templateMap, "metadata")
|
|
}
|
|
}
|
|
|
|
nodeStatusRaw, ok := result["status"]["nodes"]
|
|
if ok {
|
|
nodeStatusArray := nodeStatusRaw.(map[string]interface{})
|
|
|
|
for _, nodeStatus := range nodeStatusArray {
|
|
nodeStatusMap := nodeStatus.(map[string]interface{})
|
|
delete(nodeStatusMap, "resourcesDuration")
|
|
}
|
|
}
|
|
|
|
finalManifestBytes, err := json.Marshal(result)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
return string(finalManifestBytes), nil
|
|
}
|
|
|
|
// apiWorkflowExecution converts a package workflow execution to the api version
|
|
// router is optional
|
|
func apiWorkflowExecution(wf *v1.WorkflowExecution, router router.Web) (workflow *api.WorkflowExecution) {
|
|
manifest, err := removedUnusedManifestFields(wf.Manifest)
|
|
if err != nil {
|
|
log.Printf("error trimming manifest %v", err)
|
|
return nil
|
|
}
|
|
|
|
workflow = &api.WorkflowExecution{
|
|
CreatedAt: wf.CreatedAt.Format(time.RFC3339),
|
|
Uid: wf.UID,
|
|
Name: wf.Name,
|
|
Phase: string(wf.Phase),
|
|
Manifest: manifest,
|
|
Labels: converter.MappingToKeyValue(wf.Labels),
|
|
Metrics: converter.MetricsToAPI(wf.Metrics),
|
|
}
|
|
|
|
if wf.StartedAt != nil && !wf.StartedAt.IsZero() {
|
|
workflow.StartedAt = wf.StartedAt.Format(time.RFC3339)
|
|
}
|
|
if wf.FinishedAt != nil && !wf.FinishedAt.IsZero() {
|
|
workflow.FinishedAt = wf.FinishedAt.Format(time.RFC3339)
|
|
}
|
|
if wf.WorkflowTemplate != nil {
|
|
workflow.WorkflowTemplate = apiWorkflowTemplate(wf.WorkflowTemplate)
|
|
}
|
|
if wf.ParametersBytes != nil {
|
|
parameters, err := wf.LoadParametersFromBytes()
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
|
|
workflow.Parameters = converter.ParametersToAPI(parameters)
|
|
}
|
|
|
|
if router != nil {
|
|
workflow.Metadata = &api.WorkflowExecutionMetadata{
|
|
Url: router.WorkflowExecution(wf.Namespace, wf.UID),
|
|
}
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (s *WorkflowServer) CreateWorkflowExecution(ctx context.Context, req *api.CreateWorkflowExecutionRequest) (*api.WorkflowExecution, error) {
|
|
client := getClient(ctx)
|
|
allowed, err := auth.IsAuthorized(client, req.Namespace, "create", "argoproj.io", "workflows", "")
|
|
if err != nil || !allowed {
|
|
return nil, err
|
|
}
|
|
|
|
workflow := &v1.WorkflowExecution{
|
|
Labels: converter.APIKeyValueToLabel(req.Body.Labels),
|
|
WorkflowTemplate: &v1.WorkflowTemplate{
|
|
UID: req.Body.WorkflowTemplateUid,
|
|
Version: req.Body.WorkflowTemplateVersion,
|
|
},
|
|
}
|
|
for _, param := range req.Body.Parameters {
|
|
workflow.Parameters = append(workflow.Parameters, v1.Parameter{
|
|
Name: param.Name,
|
|
Value: ptr.String(param.Value),
|
|
})
|
|
}
|
|
|
|
workflowTemplate, err := client.GetWorkflowTemplate(req.Namespace, req.Body.WorkflowTemplateUid, req.Body.WorkflowTemplateVersion)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
wf, err := client.CreateWorkflowExecution(req.Namespace, workflow, workflowTemplate)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
wf.Namespace = req.Namespace
|
|
|
|
webRouter, err := client.GetWebRouter()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return apiWorkflowExecution(wf, webRouter), nil
|
|
}
|
|
|
|
func (s *WorkflowServer) CloneWorkflowExecution(ctx context.Context, req *api.CloneWorkflowExecutionRequest) (*api.WorkflowExecution, error) {
|
|
client := getClient(ctx)
|
|
allowed, err := auth.IsAuthorized(client, req.Namespace, "create", "argoproj.io", "workflows", "")
|
|
if err != nil || !allowed {
|
|
return nil, err
|
|
}
|
|
|
|
wf, err := client.CloneWorkflowExecution(req.Namespace, req.Uid)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
wf.Namespace = req.Namespace
|
|
|
|
webRouter, err := client.GetWebRouter()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return apiWorkflowExecution(wf, webRouter), nil
|
|
}
|
|
|
|
func (s *WorkflowServer) AddWorkflowExecutionStatistics(ctx context.Context, req *api.AddWorkflowExecutionStatisticRequest) (*empty.Empty, error) {
|
|
client := getClient(ctx)
|
|
phase := v1alpha1.NodeFailed
|
|
if req.Statistics.WorkflowStatus == "Succeeded" {
|
|
phase = v1alpha1.NodeSucceeded
|
|
}
|
|
|
|
// TODO: This needs to be moved to pkg
|
|
workflow, err := client.ArgoprojV1alpha1().Workflows(req.Namespace).Get(req.Uid, metav1.GetOptions{})
|
|
if err != nil {
|
|
return &empty.Empty{}, err
|
|
}
|
|
|
|
err = client.FinishWorkflowExecutionStatisticViaExitHandler(req.Namespace, req.Uid, phase, workflow.Status.StartedAt.UTC())
|
|
|
|
if err != nil {
|
|
return &empty.Empty{}, err
|
|
}
|
|
return &empty.Empty{}, nil
|
|
}
|
|
|
|
// @todo we should not pass in an id into the request.
|
|
// instead pass in the cron workflow uid, we can load the cron workflow from db that way and get
|
|
// all required data.
|
|
func (s *WorkflowServer) CronStartWorkflowExecutionStatistic(ctx context.Context, req *api.CronStartWorkflowExecutionStatisticRequest) (*empty.Empty, error) {
|
|
client := getClient(ctx)
|
|
allowed, err := auth.IsAuthorized(client, req.Namespace, "get", "argoproj.io", "workflows", req.Uid)
|
|
if err != nil || !allowed {
|
|
return &empty.Empty{}, err
|
|
}
|
|
|
|
err = client.CronStartWorkflowExecutionStatisticInsert(req.Namespace, req.Uid, req.Statistics.WorkflowTemplateId)
|
|
if err != nil {
|
|
return &empty.Empty{}, err
|
|
}
|
|
|
|
return &empty.Empty{}, nil
|
|
}
|
|
|
|
func (s *WorkflowServer) GetWorkflowExecution(ctx context.Context, req *api.GetWorkflowExecutionRequest) (*api.WorkflowExecution, error) {
|
|
client := getClient(ctx)
|
|
allowed, err := auth.IsAuthorized(client, req.Namespace, "get", "argoproj.io", "workflows", req.Uid)
|
|
if err != nil || !allowed {
|
|
return nil, err
|
|
}
|
|
|
|
wf, err := client.GetWorkflowExecution(req.Namespace, req.Uid)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if wf == nil {
|
|
return nil, util.NewUserError(codes.NotFound, "Workflow not found")
|
|
}
|
|
|
|
wf.Namespace = req.Namespace
|
|
|
|
webRouter, err := client.GetWebRouter()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return apiWorkflowExecution(wf, webRouter), nil
|
|
}
|
|
|
|
func (s *WorkflowServer) WatchWorkflowExecution(req *api.WatchWorkflowExecutionRequest, stream api.WorkflowService_WatchWorkflowExecutionServer) error {
|
|
client := getClient(stream.Context())
|
|
allowed, err := auth.IsAuthorized(client, req.Namespace, "get", "argoproj.io", "workflows", req.Uid)
|
|
if err != nil || !allowed {
|
|
return err
|
|
}
|
|
|
|
watcher, err := client.WatchWorkflowExecution(req.Namespace, req.Uid)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
webRouter, err := client.GetWebRouter()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for wf := range watcher {
|
|
if wf == nil {
|
|
break
|
|
}
|
|
wf.Namespace = req.Namespace
|
|
if err := stream.Send(apiWorkflowExecution(wf, webRouter)); err != nil {
|
|
log.Printf("Stream Send failed: %v\n", err)
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *WorkflowServer) GetWorkflowExecutionLogs(req *api.GetWorkflowExecutionLogsRequest, stream api.WorkflowService_GetWorkflowExecutionLogsServer) error {
|
|
client := getClient(stream.Context())
|
|
allowed, err := auth.IsAuthorized(client, req.Namespace, "get", "argoproj.io", "workflows", req.Uid)
|
|
if err != nil || !allowed {
|
|
return err
|
|
}
|
|
|
|
watcher, err := client.GetWorkflowExecutionLogs(req.Namespace, req.Uid, req.PodName, req.ContainerName)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
le := make([]*v1.LogEntry, 0)
|
|
for {
|
|
le = <-watcher
|
|
if le == nil {
|
|
break
|
|
}
|
|
|
|
apiLogEntries := make([]*api.LogEntry, len(le))
|
|
for i, item := range le {
|
|
apiLogEntries[i] = &api.LogEntry{
|
|
Content: item.Content,
|
|
}
|
|
|
|
if item.Timestamp.After(time.Time{}) {
|
|
apiLogEntries[i].Timestamp = item.Timestamp.Format(time.RFC3339)
|
|
}
|
|
}
|
|
|
|
if err := stream.Send(&api.LogStreamResponse{
|
|
LogEntries: apiLogEntries,
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *WorkflowServer) GetWorkflowExecutionMetrics(ctx context.Context, req *api.GetWorkflowExecutionMetricsRequest) (*api.GetWorkflowExecutionMetricsResponse, error) {
|
|
client := getClient(ctx)
|
|
allowed, err := auth.IsAuthorized(client, req.Namespace, "get", "argoproj.io", "workflows", req.Uid)
|
|
if err != nil || !allowed {
|
|
return nil, err
|
|
}
|
|
|
|
metrics, err := client.GetWorkflowExecutionMetrics(req.Namespace, req.Uid, req.PodName)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var apiMetrics []*api.Metric
|
|
for _, m := range metrics {
|
|
apiMetrics = append(apiMetrics, &api.Metric{
|
|
Name: m.Name,
|
|
Value: m.Value,
|
|
Format: m.Format,
|
|
})
|
|
}
|
|
if len(apiMetrics) == 0 {
|
|
return nil, util.NewUserError(codes.NotFound, "Metrics were not found.")
|
|
}
|
|
|
|
return &api.GetWorkflowExecutionMetricsResponse{Metrics: apiMetrics}, nil
|
|
}
|
|
|
|
// ListWorkflowExecutions returns a list of workflow executions that are specified by the criteria in the ListWorkflowExecutionsRequest
|
|
func (s *WorkflowServer) ListWorkflowExecutions(ctx context.Context, req *api.ListWorkflowExecutionsRequest) (*api.ListWorkflowExecutionsResponse, error) {
|
|
client := getClient(ctx)
|
|
allowed, err := auth.IsAuthorized(client, req.Namespace, "list", "argoproj.io", "workflows", "")
|
|
if err != nil || !allowed {
|
|
return nil, err
|
|
}
|
|
|
|
labelFilter, err := v1.LabelsFromString(req.Labels)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
reqSort, err := requestSort.New(req.Order)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
resourceRequest := &request.Request{
|
|
Pagination: pagination.New(req.Page, req.PageSize),
|
|
Filter: v1.WorkflowExecutionFilter{
|
|
Labels: labelFilter,
|
|
Phase: req.Phase,
|
|
},
|
|
Sort: reqSort,
|
|
}
|
|
|
|
workflows, err := client.ListWorkflowExecutions(req.Namespace, req.WorkflowTemplateUid, req.WorkflowTemplateVersion, req.IncludeSystem, resourceRequest)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
webRouter, err := client.GetWebRouter()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var apiWorkflowExecutions []*api.WorkflowExecution
|
|
for _, wf := range workflows {
|
|
wf.Namespace = req.Namespace
|
|
apiWorkflowExecutions = append(apiWorkflowExecutions, apiWorkflowExecution(wf, webRouter))
|
|
}
|
|
|
|
count, err := client.CountWorkflowExecutions(req.Namespace, req.WorkflowTemplateUid, req.WorkflowTemplateVersion, req.IncludeSystem, resourceRequest)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
totalCount, err := client.CountWorkflowExecutions(req.Namespace, "", "", req.IncludeSystem, nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
paginator := resourceRequest.Pagination
|
|
return &api.ListWorkflowExecutionsResponse{
|
|
Count: int32(len(apiWorkflowExecutions)),
|
|
WorkflowExecutions: apiWorkflowExecutions,
|
|
Page: int32(paginator.Page),
|
|
Pages: paginator.CalculatePages(count),
|
|
TotalCount: int32(count),
|
|
TotalAvailableCount: int32(totalCount),
|
|
}, nil
|
|
}
|
|
|
|
func (s *WorkflowServer) ResubmitWorkflowExecution(ctx context.Context, req *api.ResubmitWorkflowExecutionRequest) (*api.WorkflowExecution, error) {
|
|
client := getClient(ctx)
|
|
allowed, err := auth.IsAuthorized(client, req.Namespace, "create", "argoproj.io", "workflows", req.Uid)
|
|
if err != nil || !allowed {
|
|
return nil, err
|
|
}
|
|
|
|
wf, err := client.ResubmitWorkflowExecution(req.Namespace, req.Uid)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
wf.Namespace = req.Namespace
|
|
webRouter, err := client.GetWebRouter()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return apiWorkflowExecution(wf, webRouter), nil
|
|
}
|
|
|
|
func (s *WorkflowServer) TerminateWorkflowExecution(ctx context.Context, req *api.TerminateWorkflowExecutionRequest) (*empty.Empty, error) {
|
|
client := getClient(ctx)
|
|
allowed, err := auth.IsAuthorized(client, req.Namespace, "update", "argoproj.io", "workflows", "")
|
|
if err != nil || !allowed {
|
|
return nil, err
|
|
}
|
|
|
|
err = client.TerminateWorkflowExecution(req.Namespace, req.Uid)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &empty.Empty{}, nil
|
|
}
|
|
|
|
func (s *WorkflowServer) UpdateWorkflowExecutionStatus(ctx context.Context, req *api.UpdateWorkflowExecutionStatusRequest) (*empty.Empty, error) {
|
|
client := getClient(ctx)
|
|
allowed, err := auth.IsAuthorized(client, req.Namespace, "update", "argoproj.io", "workflows", req.Uid)
|
|
if err != nil || !allowed {
|
|
return &empty.Empty{}, err
|
|
}
|
|
|
|
status := &v1.WorkflowExecutionStatus{
|
|
Phase: wfv1.NodePhase(req.Status.Phase),
|
|
}
|
|
err = client.UpdateWorkflowExecutionStatus(req.Namespace, req.Uid, status)
|
|
|
|
return &empty.Empty{}, err
|
|
}
|
|
|
|
// GetWorkflowExecutionStatisticsForNamespace returns statistics on workflow executions for a given namespace
|
|
func (s *WorkflowServer) GetWorkflowExecutionStatisticsForNamespace(ctx context.Context, req *api.GetWorkflowExecutionStatisticsForNamespaceRequest) (*api.GetWorkflowExecutionStatisticsForNamespaceResponse, error) {
|
|
client := getClient(ctx)
|
|
|
|
allowed, err := auth.IsAuthorized(client, req.Namespace, "list", "argoproj.io", "workflows", "")
|
|
if err != nil || !allowed {
|
|
return nil, err
|
|
}
|
|
|
|
report, err := client.GetWorkflowExecutionStatisticsForNamespace(req.Namespace)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &api.GetWorkflowExecutionStatisticsForNamespaceResponse{
|
|
Stats: converter.WorkflowExecutionStatisticsReportToAPI(report),
|
|
}, nil
|
|
}
|
|
|
|
// AddWorkflowExecutionMetrics merges the input metrics for the workflow execution identified by (namespace,uid)
|
|
func (s *WorkflowServer) AddWorkflowExecutionMetrics(ctx context.Context, req *api.AddWorkflowExecutionsMetricsRequest) (*api.WorkflowExecutionsMetricsResponse, error) {
|
|
client := getClient(ctx)
|
|
|
|
metrics := converter.APIMetricsToCore(req.Metrics)
|
|
workflowExecution, err := client.AddWorkflowExecutionMetrics(req.Namespace, req.Uid, metrics, req.Override)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
resp := &api.WorkflowExecutionsMetricsResponse{
|
|
Metrics: converter.MetricsToAPI(workflowExecution.Metrics),
|
|
}
|
|
|
|
return resp, nil
|
|
}
|
|
|
|
// UpdateWorkflowExecutionMetrics replaces the metrics with the input metrics for the workflow identified by (namespace, uid)
|
|
func (s *WorkflowServer) UpdateWorkflowExecutionMetrics(ctx context.Context, req *api.UpdateWorkflowExecutionsMetricsRequest) (*api.WorkflowExecutionsMetricsResponse, error) {
|
|
client := getClient(ctx)
|
|
|
|
metrics := converter.APIMetricsToCore(req.Metrics)
|
|
workflowExecution, err := client.UpdateWorkflowExecutionMetrics(req.Namespace, req.Uid, metrics)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
resp := &api.WorkflowExecutionsMetricsResponse{
|
|
Metrics: converter.MetricsToAPI(workflowExecution.Metrics),
|
|
}
|
|
|
|
return resp, nil
|
|
}
|
|
|
|
// ListWorkflowExecutionsField returns a list of all the distinct values of a field from WorkflowExecutions
|
|
func (s *WorkflowServer) ListWorkflowExecutionsField(ctx context.Context, req *api.ListWorkflowExecutionsFieldRequest) (*api.ListWorkflowExecutionsFieldResponse, error) {
|
|
client := getClient(ctx)
|
|
allowed, err := auth.IsAuthorized(client, req.Namespace, "list", "onepanel.io", "workspaces", "")
|
|
if err != nil || !allowed {
|
|
return nil, err
|
|
}
|
|
|
|
values, err := client.ListWorkflowExecutionsField(req.Namespace, req.FieldName)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &api.ListWorkflowExecutionsFieldResponse{
|
|
Values: values,
|
|
}, nil
|
|
}
|