diff --git a/api/api.swagger.json b/api/api.swagger.json index a6d5572..5c14ea5 100644 --- a/api/api.swagger.json +++ b/api/api.swagger.json @@ -1296,6 +1296,20 @@ "in": "path", "required": true, "type": "string" + }, + { + "name": "pageSize", + "in": "query", + "required": false, + "type": "integer", + "format": "int32" + }, + { + "name": "page", + "in": "query", + "required": false, + "type": "integer", + "format": "int32" } ], "tags": [ @@ -2106,6 +2120,18 @@ "items": { "$ref": "#/definitions/WorkflowTemplate" } + }, + "page": { + "type": "integer", + "format": "int32" + }, + "pages": { + "type": "integer", + "format": "int32" + }, + "totalCount": { + "type": "integer", + "format": "int32" } } }, diff --git a/api/cron_workflow.proto b/api/cron_workflow.proto index e1c268f..4e18d92 100644 --- a/api/cron_workflow.proto +++ b/api/cron_workflow.proto @@ -107,7 +107,6 @@ message ListCronWorkflowsResponse { int32 totalCount = 5; } - message CronWorkflow { string name = 1; string schedule = 2; diff --git a/api/workflow_template.pb.go b/api/workflow_template.pb.go index 178dbb2..1bef0ff 100644 --- a/api/workflow_template.pb.go +++ b/api/workflow_template.pb.go @@ -333,6 +333,8 @@ func (m *ListWorkflowTemplateVersionsResponse) GetWorkflowTemplates() []*Workflo type ListWorkflowTemplatesRequest struct { Namespace string `protobuf:"bytes,1,opt,name=namespace,proto3" json:"namespace,omitempty"` + PageSize int32 `protobuf:"varint,2,opt,name=pageSize,proto3" json:"pageSize,omitempty"` + Page int32 `protobuf:"varint,3,opt,name=page,proto3" json:"page,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -370,9 +372,26 @@ func (m *ListWorkflowTemplatesRequest) GetNamespace() string { return "" } +func (m *ListWorkflowTemplatesRequest) GetPageSize() int32 { + if m != nil { + return m.PageSize + } + return 0 +} + +func (m *ListWorkflowTemplatesRequest) GetPage() int32 { + if m != nil { + return m.Page + } + return 0 +} + type ListWorkflowTemplatesResponse struct { Count int32 `protobuf:"varint,1,opt,name=count,proto3" json:"count,omitempty"` WorkflowTemplates []*WorkflowTemplate `protobuf:"bytes,2,rep,name=workflowTemplates,proto3" json:"workflowTemplates,omitempty"` + Page int32 `protobuf:"varint,3,opt,name=page,proto3" json:"page,omitempty"` + Pages int32 `protobuf:"varint,4,opt,name=pages,proto3" json:"pages,omitempty"` + TotalCount int32 `protobuf:"varint,5,opt,name=totalCount,proto3" json:"totalCount,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -417,6 +436,27 @@ func (m *ListWorkflowTemplatesResponse) GetWorkflowTemplates() []*WorkflowTempla return nil } +func (m *ListWorkflowTemplatesResponse) GetPage() int32 { + if m != nil { + return m.Page + } + return 0 +} + +func (m *ListWorkflowTemplatesResponse) GetPages() int32 { + if m != nil { + return m.Pages + } + return 0 +} + +func (m *ListWorkflowTemplatesResponse) GetTotalCount() int32 { + if m != nil { + return m.TotalCount + } + return 0 +} + type ArchiveWorkflowTemplateRequest struct { Namespace string `protobuf:"bytes,1,opt,name=namespace,proto3" json:"namespace,omitempty"` Uid string `protobuf:"bytes,2,opt,name=uid,proto3" json:"uid,omitempty"` @@ -767,69 +807,72 @@ func init() { func init() { proto.RegisterFile("workflow_template.proto", fileDescriptor_b9a07547748a96e8) } var fileDescriptor_b9a07547748a96e8 = []byte{ - // 987 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xbc, 0x97, 0x4f, 0x6f, 0x1b, 0x45, - 0x14, 0xc0, 0x35, 0x76, 0x9c, 0xc6, 0xaf, 0x20, 0xa5, 0x0f, 0x92, 0x6c, 0x96, 0xa4, 0x35, 0xd3, - 0x44, 0x72, 0x11, 0xb2, 0xd5, 0xc0, 0x29, 0x2d, 0xa2, 0x6e, 0x5a, 0x0a, 0x34, 0x07, 0xb4, 0xa5, - 0x45, 0xea, 0x05, 0x26, 0xbb, 0xe3, 0xb0, 0xea, 0x66, 0x77, 0xf1, 0x8c, 0x13, 0x4a, 0xb0, 0x44, - 0xe1, 0xc0, 0x8d, 0x0b, 0x7f, 0x04, 0x42, 0xea, 0x19, 0x09, 0x3e, 0x00, 0x67, 0x4e, 0xdc, 0xf3, - 0x15, 0xe0, 0xc0, 0xb7, 0x40, 0xfb, 0x76, 0xd7, 0x89, 0xed, 0x5d, 0xe3, 0x7f, 0xf4, 0xb6, 0x33, - 0xf3, 0xe6, 0xcd, 0xef, 0xfd, 0x99, 0x79, 0x6f, 0x61, 0xe5, 0x28, 0x68, 0x3d, 0x6a, 0x7a, 0xc1, - 0xd1, 0x87, 0x5a, 0x1e, 0x84, 0x9e, 0xd0, 0xb2, 0x16, 0xb6, 0x02, 0x1d, 0x60, 0x51, 0x84, 0xae, - 0xb9, 0xb6, 0x1f, 0x04, 0xfb, 0x9e, 0xac, 0x8b, 0xd0, 0xad, 0x0b, 0xdf, 0x0f, 0xb4, 0xd0, 0x6e, - 0xe0, 0xab, 0x58, 0xc4, 0x3c, 0xef, 0x89, 0x3d, 0xe9, 0xc5, 0x03, 0xfe, 0x05, 0x83, 0xf5, 0x9d, - 0x96, 0x14, 0x5a, 0x7e, 0x90, 0x68, 0x7c, 0x3f, 0x51, 0x68, 0xc9, 0x4f, 0xda, 0x52, 0x69, 0x5c, - 0x83, 0xb2, 0x2f, 0x0e, 0xa4, 0x0a, 0x85, 0x2d, 0x0d, 0x56, 0x61, 0xd5, 0xb2, 0x75, 0x3a, 0x81, - 0x0d, 0x58, 0x3c, 0xea, 0xdb, 0x68, 0x14, 0x2a, 0xac, 0x7a, 0x7e, 0x6b, 0xa9, 0x26, 0x42, 0xb7, - 0x36, 0xa0, 0x75, 0x40, 0x9c, 0x7f, 0xcd, 0x60, 0xe3, 0x7e, 0xe8, 0x64, 0x20, 0x3c, 0x90, 0x2d, - 0xe5, 0x06, 0xfe, 0x33, 0x23, 0x69, 0x82, 0x79, 0x47, 0xea, 0xc9, 0x1c, 0xb1, 0x08, 0xc5, 0xb6, - 0xeb, 0xd0, 0x89, 0x65, 0x2b, 0xfa, 0x44, 0x03, 0xce, 0x1d, 0xc6, 0x06, 0x18, 0xc5, 0x0a, 0xab, - 0x16, 0xad, 0x74, 0xc8, 0x3f, 0x87, 0xb5, 0x1d, 0x2f, 0xf0, 0xe5, 0xac, 0x4e, 0x42, 0x98, 0x8b, - 0x96, 0xe9, 0x98, 0xb2, 0x45, 0xdf, 0x67, 0x4f, 0x9f, 0xeb, 0x3d, 0xfd, 0x3e, 0x5c, 0xde, 0x75, - 0x95, 0xce, 0x71, 0xb6, 0x9a, 0x10, 0x82, 0x3f, 0x61, 0xb0, 0x31, 0x5c, 0xaf, 0x0a, 0x03, 0x5f, - 0x49, 0x7c, 0x11, 0x4a, 0x76, 0xd0, 0xf6, 0x35, 0x29, 0x2d, 0x59, 0xf1, 0x00, 0x77, 0xe0, 0x42, - 0x7f, 0x3c, 0x94, 0x51, 0xa8, 0x14, 0xf3, 0xe3, 0x37, 0x28, 0xcf, 0xaf, 0xc3, 0x5a, 0x16, 0xc2, - 0x68, 0x36, 0xf1, 0xcf, 0x60, 0x3d, 0x67, 0xf7, 0xff, 0x4f, 0xfe, 0x1e, 0x5c, 0x6c, 0xb4, 0xec, - 0x8f, 0xdd, 0xc3, 0x59, 0x25, 0x05, 0x77, 0xe0, 0x52, 0xae, 0xc6, 0xc4, 0x9e, 0xac, 0x2b, 0xc3, - 0xc6, 0xbb, 0x32, 0xbf, 0x31, 0xa8, 0xa4, 0x62, 0xb7, 0x3f, 0x95, 0x76, 0x3b, 0x7a, 0x69, 0xee, - 0x45, 0x0f, 0x8e, 0xd2, 0xae, 0x6d, 0xc9, 0x30, 0x68, 0xe9, 0xc8, 0x6f, 0x3a, 0xd0, 0xc2, 0x4b, - 0xfd, 0x46, 0x03, 0xe4, 0xf0, 0x9c, 0x27, 0x94, 0x8e, 0x77, 0xc9, 0x94, 0xbd, 0x67, 0x2e, 0xca, - 0xe2, 0x56, 0xdb, 0xf7, 0x5d, 0x7f, 0x9f, 0x92, 0xbb, 0x64, 0xa5, 0xc3, 0xc8, 0x1d, 0x76, 0x70, - 0x10, 0x7a, 0x32, 0xda, 0x3a, 0x47, 0x6b, 0xa7, 0x13, 0xb8, 0x0c, 0xf3, 0x4d, 0xe1, 0x7a, 0xd2, - 0x31, 0x4a, 0xb4, 0x94, 0x8c, 0xf8, 0x9f, 0x05, 0x58, 0xec, 0xb7, 0x8a, 0x54, 0xd1, 0x13, 0xe8, - 0x34, 0x74, 0xea, 0xd9, 0xee, 0xc4, 0xb4, 0xd7, 0x0d, 0x4d, 0x58, 0x48, 0x3e, 0x15, 0xc1, 0x14, - 0xad, 0xee, 0x38, 0x5a, 0x3b, 0x10, 0xbe, 0xdb, 0x94, 0x4a, 0x1b, 0xf3, 0xa4, 0xad, 0x3b, 0x8e, - 0xd6, 0x5c, 0xb5, 0x1b, 0x25, 0x87, 0x36, 0xce, 0x55, 0x58, 0x75, 0xc1, 0xea, 0x8e, 0xf1, 0x22, - 0x80, 0xab, 0x92, 0xe8, 0x3a, 0xc6, 0x02, 0xad, 0x9e, 0x99, 0xc1, 0x4d, 0x98, 0xa7, 0x47, 0x5e, - 0x19, 0x65, 0xca, 0xc3, 0xe7, 0x29, 0x9c, 0x77, 0xe5, 0xe3, 0x07, 0xc2, 0x6b, 0x4b, 0x2b, 0x59, - 0xc4, 0x6b, 0x50, 0x52, 0x5a, 0x68, 0x65, 0x00, 0x05, 0x7d, 0xb3, 0x27, 0xe8, 0x79, 0xd1, 0xb4, - 0xe2, 0x3d, 0xfc, 0x57, 0x06, 0x95, 0x8c, 0xd7, 0x72, 0x97, 0x54, 0x8f, 0x96, 0xb4, 0xa9, 0x23, - 0x0b, 0xd9, 0x8e, 0xec, 0x7d, 0x35, 0x4f, 0x69, 0x17, 0xc6, 0xa7, 0xdd, 0xfa, 0xe9, 0x02, 0xac, - 0xf4, 0xa3, 0xde, 0x93, 0xad, 0x43, 0xd7, 0x96, 0xf8, 0x33, 0x83, 0xe5, 0xec, 0x1a, 0x88, 0x9c, - 0x0e, 0x19, 0x5a, 0x20, 0xcd, 0xec, 0xbb, 0xc2, 0xdf, 0xfa, 0xf2, 0xe4, 0xaf, 0x6f, 0x0b, 0x37, - 0xf8, 0xab, 0x51, 0x19, 0x56, 0xf5, 0xc3, 0xab, 0x7b, 0x52, 0x8b, 0xab, 0xf5, 0xe3, 0xae, 0xf9, - 0x9d, 0xfa, 0x40, 0x11, 0x57, 0xdb, 0x03, 0x37, 0x0c, 0xff, 0x66, 0xb0, 0x3e, 0xb4, 0x3c, 0xe2, - 0x15, 0x02, 0x18, 0xa5, 0x84, 0xe6, 0xb1, 0x3e, 0x61, 0x04, 0x7b, 0x6c, 0xee, 0x8f, 0x03, 0x5b, - 0x3f, 0xee, 0x87, 0xad, 0xb5, 0x5d, 0xa7, 0x53, 0x4f, 0xd3, 0x3b, 0x63, 0x3d, 0x59, 0xea, 0x64, - 0xd8, 0xf9, 0x47, 0x6e, 0x27, 0x92, 0xda, 0x39, 0x45, 0x30, 0x9a, 0x64, 0xdf, 0x47, 0x7c, 0x77, - 0x96, 0xf6, 0x65, 0x18, 0x71, 0xc2, 0xe0, 0x85, 0x8c, 0x4b, 0x81, 0x97, 0x08, 0x2b, 0xbf, 0xb9, - 0xc8, 0xe3, 0xfe, 0x2a, 0x0e, 0x4c, 0x07, 0xb7, 0xc6, 0x03, 0x8f, 0x38, 0x1f, 0xde, 0xc1, 0xdb, - 0xe3, 0xef, 0x3a, 0x13, 0xbd, 0x34, 0x58, 0xf8, 0x3b, 0xcb, 0xae, 0xab, 0x69, 0x69, 0xc7, 0x2a, - 0xd1, 0x8f, 0xd0, 0x55, 0x98, 0x57, 0x46, 0x90, 0x8c, 0xab, 0x13, 0xdf, 0x21, 0xd3, 0xdf, 0xc0, - 0x6b, 0x53, 0x18, 0x81, 0xdf, 0x31, 0x58, 0xca, 0x2c, 0xea, 0xf8, 0x72, 0x2e, 0x49, 0x17, 0x96, - 0x0f, 0x13, 0x49, 0x28, 0x5f, 0x27, 0xca, 0x1a, 0x8e, 0x75, 0xcd, 0xf1, 0x1f, 0x06, 0x4b, 0x99, - 0x2d, 0x60, 0x82, 0x35, 0xac, 0x3d, 0xcc, 0xcb, 0x95, 0xef, 0xe3, 0x5c, 0xf9, 0x86, 0xe1, 0x9b, - 0x13, 0x78, 0xcc, 0x8e, 0x4e, 0x8c, 0x45, 0x3b, 0x0f, 0xef, 0xe2, 0x3b, 0x53, 0xaa, 0x38, 0x93, - 0x3d, 0xbf, 0x30, 0x58, 0xc9, 0xe9, 0x44, 0xf0, 0x32, 0x99, 0x32, 0xbc, 0xf3, 0x31, 0x37, 0x86, - 0x0b, 0x25, 0x81, 0xb8, 0x49, 0xd6, 0x5f, 0x37, 0xb7, 0x27, 0x20, 0x17, 0xb1, 0x6e, 0x7c, 0xca, - 0x60, 0x35, 0xb7, 0xa6, 0xe1, 0x66, 0xde, 0x25, 0xee, 0xa9, 0x79, 0xe6, 0x72, 0x2a, 0x96, 0x4e, - 0xf7, 0x02, 0xe2, 0x98, 0x80, 0xb1, 0x3b, 0x93, 0x8a, 0xfd, 0x03, 0x83, 0xd5, 0x86, 0xe3, 0xe4, - 0x00, 0xc6, 0x89, 0xd1, 0x70, 0x9c, 0xd1, 0x80, 0xde, 0x25, 0xa0, 0x5b, 0x7c, 0x0a, 0xa0, 0xed, - 0xb4, 0x95, 0x78, 0xca, 0x60, 0xdd, 0x92, 0xa1, 0x27, 0x6c, 0x99, 0x03, 0xb7, 0x4a, 0x14, 0x89, - 0xcc, 0x58, 0x80, 0xe6, 0x2c, 0x00, 0x7f, 0x64, 0xf0, 0xd2, 0x2d, 0x19, 0x75, 0x87, 0x99, 0x7c, - 0xb8, 0x42, 0x0c, 0xb1, 0x04, 0xcd, 0xfc, 0x17, 0xdc, 0xdb, 0x04, 0x77, 0xf3, 0x95, 0x1b, 0x93, - 0xc3, 0xd5, 0x8f, 0x1f, 0xc9, 0xc7, 0x9d, 0xbd, 0x79, 0xfa, 0x15, 0x7f, 0xed, 0xdf, 0x00, 0x00, - 0x00, 0xff, 0xff, 0xa7, 0xb2, 0x1d, 0x6c, 0xd5, 0x0f, 0x00, 0x00, + // 1031 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xbc, 0x98, 0xcd, 0x6f, 0x1b, 0x45, + 0x14, 0xc0, 0x35, 0x76, 0x9d, 0xc6, 0xaf, 0x20, 0xa5, 0x03, 0x49, 0x36, 0x4b, 0xd2, 0x9a, 0x69, + 0x22, 0xb9, 0x08, 0xd9, 0x6a, 0xe0, 0x94, 0x82, 0xa8, 0xeb, 0x96, 0x02, 0xcd, 0x01, 0x6d, 0x68, + 0x91, 0x7a, 0x81, 0x89, 0x77, 0x6c, 0x56, 0xdd, 0xec, 0x2e, 0x9e, 0x71, 0x42, 0x09, 0x96, 0x28, + 0x1c, 0xb8, 0x71, 0xe1, 0x43, 0x20, 0xa4, 0x9e, 0x91, 0xe0, 0x0f, 0xe0, 0x8c, 0x38, 0x70, 0xcf, + 0xbf, 0x00, 0x07, 0xfe, 0x0b, 0x34, 0x6f, 0x76, 0x1d, 0x7f, 0xec, 0x1a, 0x7f, 0x84, 0x9e, 0xbc, + 0x6f, 0xe6, 0xed, 0x9b, 0xdf, 0xfb, 0x98, 0x7d, 0x4f, 0x86, 0xd5, 0xa3, 0xb0, 0xfd, 0xb0, 0xe9, + 0x87, 0x47, 0x1f, 0x28, 0x71, 0x10, 0xf9, 0x5c, 0x89, 0x4a, 0xd4, 0x0e, 0x55, 0x48, 0xf3, 0x3c, + 0xf2, 0xec, 0xf5, 0x56, 0x18, 0xb6, 0x7c, 0x51, 0xe5, 0x91, 0x57, 0xe5, 0x41, 0x10, 0x2a, 0xae, + 0xbc, 0x30, 0x90, 0x46, 0xc5, 0xbe, 0xe0, 0xf3, 0x7d, 0xe1, 0x1b, 0x81, 0x7d, 0x4e, 0x60, 0xa3, + 0xde, 0x16, 0x5c, 0x89, 0xf7, 0x63, 0x8b, 0xef, 0xc5, 0x06, 0x1d, 0xf1, 0x71, 0x47, 0x48, 0x45, + 0xd7, 0xa1, 0x18, 0xf0, 0x03, 0x21, 0x23, 0xde, 0x10, 0x16, 0x29, 0x91, 0x72, 0xd1, 0x39, 0x5d, + 0xa0, 0x35, 0x58, 0x3a, 0x1a, 0x7a, 0xd1, 0xca, 0x95, 0x48, 0xf9, 0xc2, 0xf6, 0x72, 0x85, 0x47, + 0x5e, 0x65, 0xc4, 0xea, 0x88, 0x3a, 0xfb, 0x8a, 0xc0, 0xe6, 0xbd, 0xc8, 0x4d, 0x41, 0xb8, 0x2f, + 0xda, 0xd2, 0x0b, 0x83, 0xa7, 0x46, 0xd2, 0x04, 0xfb, 0x8e, 0x50, 0xb3, 0x05, 0x62, 0x09, 0xf2, + 0x1d, 0xcf, 0xc5, 0x13, 0x8b, 0x8e, 0x7e, 0xa4, 0x16, 0x9c, 0x3f, 0x34, 0x0e, 0x58, 0xf9, 0x12, + 0x29, 0xe7, 0x9d, 0x44, 0x64, 0x9f, 0xc1, 0x7a, 0xdd, 0x0f, 0x03, 0x71, 0x56, 0x27, 0x51, 0x38, + 0xa7, 0xb7, 0xf1, 0x98, 0xa2, 0x83, 0xcf, 0xfd, 0xa7, 0x9f, 0x1b, 0x3c, 0xfd, 0x1e, 0x5c, 0xd9, + 0xf5, 0xa4, 0xca, 0x08, 0xb6, 0x9c, 0x11, 0x82, 0x3d, 0x26, 0xb0, 0x39, 0xde, 0xae, 0x8c, 0xc2, + 0x40, 0x0a, 0xfa, 0x3c, 0x14, 0x1a, 0x61, 0x27, 0x50, 0x68, 0xb4, 0xe0, 0x18, 0x81, 0xd6, 0xe1, + 0xe2, 0x70, 0x3e, 0xa4, 0x95, 0x2b, 0xe5, 0xb3, 0xf3, 0x37, 0xaa, 0xcf, 0x7c, 0x58, 0x4f, 0x43, + 0x98, 0xd0, 0x27, 0x1b, 0x16, 0x23, 0xde, 0x12, 0x7b, 0xde, 0xa7, 0xa6, 0x72, 0x0a, 0x4e, 0x4f, + 0xd6, 0x21, 0xd6, 0xcf, 0x18, 0xe2, 0x82, 0x83, 0xcf, 0xec, 0x0f, 0x02, 0x1b, 0x19, 0xc7, 0xfd, + 0xef, 0xae, 0xa6, 0x01, 0xe9, 0xe3, 0xf4, 0xaf, 0xc4, 0x8c, 0x17, 0x1c, 0x23, 0xd0, 0x4b, 0x00, + 0x2a, 0x54, 0xdc, 0xaf, 0x23, 0x49, 0x01, 0xb7, 0xfa, 0x56, 0xd8, 0xbb, 0x70, 0xa9, 0xd6, 0x6e, + 0x7c, 0xe4, 0x1d, 0x9e, 0x55, 0x3d, 0x32, 0x17, 0x2e, 0x67, 0x5a, 0x8c, 0x23, 0x93, 0x76, 0x5b, + 0xc9, 0x74, 0xb7, 0xf5, 0x57, 0x02, 0xa5, 0x44, 0xed, 0xf6, 0x27, 0xa2, 0xd1, 0xd1, 0x1f, 0xb9, + 0x3d, 0xfd, 0xad, 0x93, 0xca, 0x6b, 0x38, 0x22, 0x0a, 0xdb, 0x4a, 0x87, 0x04, 0x5d, 0x4d, 0x32, + 0x80, 0x02, 0x65, 0xf0, 0x8c, 0xcf, 0xa5, 0x32, 0x6f, 0x89, 0x84, 0x7d, 0x60, 0x4d, 0x5f, 0xa0, + 0x76, 0x27, 0x08, 0xbc, 0xa0, 0x15, 0xc7, 0x38, 0x11, 0x75, 0x38, 0x1a, 0xe1, 0x41, 0xe4, 0x0b, + 0xfd, 0xaa, 0x09, 0xf5, 0xe9, 0x02, 0x5d, 0x81, 0x85, 0x26, 0xf7, 0x7c, 0xe1, 0xc6, 0xa1, 0x8e, + 0x25, 0xf6, 0x67, 0x0e, 0x96, 0x86, 0xbd, 0x42, 0x53, 0xf8, 0xf5, 0x75, 0x6b, 0x2a, 0x89, 0x6c, + 0x6f, 0x61, 0xde, 0x9b, 0xae, 0x0b, 0x3a, 0x7e, 0x94, 0x08, 0x93, 0x77, 0x7a, 0xb2, 0xde, 0x3b, + 0xe0, 0x81, 0xd7, 0x14, 0x52, 0x59, 0x0b, 0x68, 0xad, 0x27, 0xeb, 0x3d, 0x4f, 0xee, 0xea, 0x32, + 0x53, 0xd6, 0xf9, 0x12, 0x29, 0x2f, 0x3a, 0x3d, 0x59, 0x57, 0x93, 0x27, 0xe3, 0xec, 0xba, 0xd6, + 0x22, 0xee, 0xf6, 0xad, 0xd0, 0x2d, 0x58, 0xc0, 0xfe, 0x22, 0xad, 0x22, 0x56, 0xf4, 0xb3, 0x98, + 0xce, 0xbb, 0xe2, 0xd1, 0x7d, 0xee, 0x77, 0x84, 0x13, 0x6f, 0xd2, 0xeb, 0x50, 0x90, 0x8a, 0x2b, + 0x69, 0x01, 0x26, 0x7d, 0x6b, 0x20, 0xe9, 0x59, 0xd9, 0x74, 0xcc, 0x3b, 0xec, 0x17, 0x02, 0xa5, + 0x94, 0x0f, 0xf5, 0x2e, 0x9a, 0x9e, 0xac, 0x68, 0x93, 0x40, 0xe6, 0xd2, 0x03, 0x39, 0xf8, 0xc1, + 0x3e, 0xa5, 0x5d, 0x9c, 0x9e, 0x76, 0xfb, 0xc7, 0x8b, 0xb0, 0x3a, 0x8c, 0xba, 0x27, 0xda, 0x87, + 0x5e, 0x43, 0xd0, 0x9f, 0x08, 0xac, 0xa4, 0xb7, 0x5f, 0xca, 0xf0, 0x90, 0xb1, 0xbd, 0xd9, 0x4e, + 0xbf, 0x2b, 0xec, 0xcd, 0x2f, 0x4e, 0xfe, 0xfa, 0x26, 0x77, 0x83, 0xbd, 0xac, 0x27, 0x00, 0x59, + 0x3d, 0xbc, 0xb6, 0x2f, 0x14, 0xbf, 0x56, 0x3d, 0xee, 0xb9, 0xdf, 0xad, 0x8e, 0xcc, 0x0f, 0x72, + 0x67, 0xe4, 0x86, 0xd1, 0xbf, 0x09, 0x6c, 0x8c, 0xed, 0xcc, 0xf4, 0x2a, 0x02, 0x4c, 0xd2, 0xbd, + 0xb3, 0x58, 0x1f, 0x13, 0x84, 0x3d, 0xb6, 0x5b, 0xd3, 0xc0, 0x56, 0x8f, 0x87, 0x61, 0x2b, 0x1d, + 0xcf, 0xed, 0x56, 0x93, 0xf2, 0x4e, 0xd9, 0x8f, 0xb7, 0xba, 0x29, 0x7e, 0xfe, 0x9e, 0x39, 0x04, + 0x25, 0x7e, 0xce, 0x91, 0x8c, 0x26, 0xfa, 0xf7, 0x21, 0xdb, 0x3d, 0x4b, 0xff, 0x52, 0x9c, 0x38, + 0x21, 0xf0, 0x5c, 0xca, 0xa5, 0xa0, 0x97, 0x11, 0x2b, 0x7b, 0xae, 0xc9, 0xe2, 0xfe, 0xd2, 0x24, + 0xa6, 0x4b, 0xb7, 0xa7, 0x03, 0xd7, 0x9c, 0x0f, 0xee, 0xd0, 0xdb, 0xd3, 0xbf, 0xd5, 0x97, 0xbd, + 0x24, 0x59, 0xf4, 0x37, 0x92, 0xde, 0xd2, 0x93, 0xa9, 0x82, 0x96, 0x91, 0x7e, 0x82, 0x81, 0xc6, + 0xbe, 0x3a, 0x81, 0xa6, 0xe9, 0x4e, 0xac, 0x8e, 0xae, 0xbf, 0x4e, 0xaf, 0xcf, 0xe1, 0x04, 0xfd, + 0x96, 0xc0, 0x72, 0xea, 0x78, 0x40, 0x5f, 0xcc, 0x24, 0xe9, 0xc1, 0xb2, 0x71, 0x2a, 0x31, 0xe5, + 0xab, 0x48, 0x59, 0xa1, 0x53, 0x5d, 0x73, 0xfa, 0x0f, 0x81, 0xe5, 0xd4, 0xe9, 0x33, 0xc6, 0x1a, + 0x37, 0x99, 0x66, 0xd5, 0xca, 0x77, 0xa6, 0x56, 0xbe, 0x26, 0xf4, 0x8d, 0x19, 0x22, 0xd6, 0xd0, + 0x27, 0x1a, 0xd5, 0xee, 0x83, 0xbb, 0xf4, 0xed, 0x39, 0x4d, 0xf4, 0x55, 0xcf, 0xcf, 0x04, 0x56, + 0x33, 0x26, 0x11, 0x7a, 0x05, 0x5d, 0x19, 0x3f, 0xf9, 0xd8, 0x9b, 0xe3, 0x95, 0xe2, 0x44, 0xdc, + 0x44, 0xef, 0x5f, 0xb3, 0x77, 0x66, 0x20, 0xe7, 0xc6, 0x36, 0x7d, 0x42, 0x60, 0x2d, 0xb3, 0xa7, + 0xd1, 0xad, 0xac, 0x4b, 0x3c, 0xd0, 0xf3, 0xec, 0x95, 0x44, 0x2d, 0x59, 0x1e, 0x04, 0xa4, 0x53, + 0x02, 0x9a, 0x70, 0xc6, 0x1d, 0xfb, 0x7b, 0x02, 0x6b, 0x35, 0xd7, 0xcd, 0x00, 0x34, 0x85, 0x51, + 0x73, 0xdd, 0xc9, 0x80, 0xde, 0x41, 0xa0, 0x5b, 0x6c, 0x0e, 0xa0, 0x9d, 0x64, 0x94, 0x78, 0x42, + 0x60, 0xc3, 0x11, 0x91, 0xcf, 0x1b, 0x22, 0x03, 0x6e, 0x0d, 0x29, 0x62, 0x9d, 0xa9, 0x00, 0xed, + 0xb3, 0x00, 0xfc, 0x81, 0xc0, 0x0b, 0xb7, 0x84, 0x9e, 0x0e, 0x53, 0xf9, 0xe8, 0x2a, 0x32, 0x18, + 0x0d, 0x5c, 0xf9, 0x2f, 0xb8, 0xb7, 0x10, 0xee, 0xe6, 0x4b, 0x37, 0x66, 0x87, 0xab, 0x1e, 0x3f, + 0x14, 0x8f, 0xba, 0xfb, 0x0b, 0xf8, 0x2f, 0xc0, 0x2b, 0xff, 0x06, 0x00, 0x00, 0xff, 0xff, 0x1b, + 0x1a, 0x80, 0x9b, 0x50, 0x10, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. diff --git a/api/workflow_template.pb.gw.go b/api/workflow_template.pb.gw.go index 7169bee..39ce7af 100644 --- a/api/workflow_template.pb.gw.go +++ b/api/workflow_template.pb.gw.go @@ -572,6 +572,10 @@ func local_request_WorkflowTemplateService_ListWorkflowTemplateVersions_0(ctx co } +var ( + filter_WorkflowTemplateService_ListWorkflowTemplates_0 = &utilities.DoubleArray{Encoding: map[string]int{"namespace": 0}, Base: []int{1, 1, 0}, Check: []int{0, 1, 2}} +) + func request_WorkflowTemplateService_ListWorkflowTemplates_0(ctx context.Context, marshaler runtime.Marshaler, client WorkflowTemplateServiceClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { var protoReq ListWorkflowTemplatesRequest var metadata runtime.ServerMetadata @@ -594,6 +598,13 @@ func request_WorkflowTemplateService_ListWorkflowTemplates_0(ctx context.Context return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "namespace", err) } + if err := req.ParseForm(); err != nil { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) + } + if err := runtime.PopulateQueryParameters(&protoReq, req.Form, filter_WorkflowTemplateService_ListWorkflowTemplates_0); err != nil { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) + } + msg, err := client.ListWorkflowTemplates(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD)) return msg, metadata, err @@ -621,6 +632,10 @@ func local_request_WorkflowTemplateService_ListWorkflowTemplates_0(ctx context.C return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "namespace", err) } + if err := runtime.PopulateQueryParameters(&protoReq, req.URL.Query(), filter_WorkflowTemplateService_ListWorkflowTemplates_0); err != nil { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) + } + msg, err := server.ListWorkflowTemplates(ctx, &protoReq) return msg, metadata, err diff --git a/api/workflow_template.proto b/api/workflow_template.proto index cd0fea5..916dd9f 100644 --- a/api/workflow_template.proto +++ b/api/workflow_template.proto @@ -125,11 +125,16 @@ message ListWorkflowTemplateVersionsResponse { message ListWorkflowTemplatesRequest { string namespace = 1; + int32 pageSize = 2; + int32 page = 3; } message ListWorkflowTemplatesResponse { int32 count = 1; repeated WorkflowTemplate workflowTemplates = 2; + int32 page = 3; + int32 pages = 4; + int32 totalCount = 5; } message ArchiveWorkflowTemplateRequest { diff --git a/db/20200422140125_add_cron_workflows.sql b/db/20200422140125_add_cron_workflows.sql new file mode 100644 index 0000000..d675af1 --- /dev/null +++ b/db/20200422140125_add_cron_workflows.sql @@ -0,0 +1,23 @@ +-- +goose Up +CREATE TABLE cron_workflows +( + id serial PRIMARY KEY, + uid varchar(36) UNIQUE NOT NULL CHECK(uid <> ''), + name varchar(255), + workflow_template_version_id INT REFERENCES workflow_template_versions, + schedule varchar(255), + timezone varchar(255), + suspend boolean, + concurrency_policy varchar(255), + starting_deadline_seconds INT, + successful_jobs_history_limit INT, + failed_jobs_history_limit INT, + workflow_spec TEXT, + + -- auditing info + created_at timestamp NOT NULL DEFAULT (NOW() at time zone 'utc'), + modified_at timestamp +); + +-- +goose Down +DROP TABLE cron_workflows; diff --git a/db/20200422160902_add_labels.sql b/db/20200422160902_add_labels.sql new file mode 100644 index 0000000..1a02440 --- /dev/null +++ b/db/20200422160902_add_labels.sql @@ -0,0 +1,15 @@ +-- +goose Up +CREATE TABLE labels +( + id serial PRIMARY KEY, + key varchar(255), + value varchar(255), + resource varchar(255), + resource_id INTEGER, + + -- auditing info + created_at timestamp NOT NULL DEFAULT (NOW() at time zone 'utc') +); + +-- +goose Down +DROP TABLE labels; diff --git a/db/20200424132932_add_started_and_version_to_workflow_executions.sql b/db/20200424132932_add_started_and_version_to_workflow_executions.sql new file mode 100644 index 0000000..1058713 --- /dev/null +++ b/db/20200424132932_add_started_and_version_to_workflow_executions.sql @@ -0,0 +1,24 @@ +-- +goose Up +ALTER TABLE workflow_executions + ADD COLUMN started_at TIMESTAMP, + ADD COLUMN workflow_template_version_id INT REFERENCES workflow_template_versions, + ADD COLUMN phase VARCHAR(50), + ADD COLUMN cron_workflow_id INT REFERENCES cron_workflows, + DROP COLUMN failed_at, + DROP COLUMN workflow_template_id +; + +UPDATE workflow_executions + SET started_at = created_at, + phase = 'Succeeded' +; + +-- +goose Down +ALTER TABLE workflow_executions + DROP COLUMN started_at, + DROP COLUMN workflow_template_version_id, + DROP COLUMN phase, + DROP COLUMN cron_workflow_id, + ADD COLUMN failed_at TIMESTAMP, + ADD COLUMN workflow_template_id INT +; diff --git a/pkg/cron_workflow.go b/pkg/cron_workflow.go index 160f7c6..e321eb6 100644 --- a/pkg/cron_workflow.go +++ b/pkg/cron_workflow.go @@ -2,15 +2,17 @@ package v1 import ( "fmt" + sq "github.com/Masterminds/squirrel" wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" argojson "github.com/argoproj/pkg/json" "github.com/onepanelio/core/pkg/util" "github.com/onepanelio/core/pkg/util/label" + "github.com/onepanelio/core/pkg/util/pagination" log "github.com/sirupsen/logrus" "google.golang.org/grpc/codes" + "gopkg.in/yaml.v2" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "regexp" - "sort" "strings" ) @@ -162,6 +164,32 @@ func (c *Client) CreateCronWorkflow(namespace string, cronWorkflow *CronWorkflow // Manifests could get big, don't return them in this case. cronWorkflow.WorkflowExecution.WorkflowTemplate.Manifest = "" + workflowSpec, err := yaml.Marshal(argoCreatedCronWorkflow.Spec.WorkflowSpec) + if err != nil { + return nil, err + } + _, err = sb.Insert("cron_workflows"). + SetMap(sq.Eq{ + "uid": cronWorkflow.UID, + "name": cronWorkflow.Name, + "workflow_template_version_id": workflowTemplate.WorkflowTemplateVersionId, + "schedule": cronWorkflow.Schedule, + "timezone": cronWorkflow.Timezone, + "suspend": cronWorkflow.Suspend, + "concurrency_policy": cronWorkflow.ConcurrencyPolicy, + "starting_deadline_seconds": cronWorkflow.StartingDeadlineSeconds, + "successful_jobs_history_limit": cronWorkflow.SuccessfulJobsHistoryLimit, + "failed_jobs_history_limit": cronWorkflow.FailedJobsHistoryLimit, + "workflow_spec": workflowSpec, + }). + Suffix("RETURNING id"). + RunWith(c.DB.DB). + Exec() + + if err != nil { + return nil, err + } + return cronWorkflow, nil } return nil, nil @@ -261,53 +289,42 @@ func (c *Client) DeleteCronWorkflowLabel(namespace, name string, keysToDelete .. return wf.Labels, nil } -func (c *Client) ListCronWorkflows(namespace, workflowTemplateUID string) (cronWorkflows []*CronWorkflow, err error) { - listOptions := ListOptions{} - if workflowTemplateUID != "" { - listOptions.LabelSelector = "onepanel.io/workflow-template-uid=" + workflowTemplateUID - } - cronWorkflowList, err := c.ArgoprojV1alpha1().CronWorkflows(namespace).List(listOptions) +func (c *Client) ListCronWorkflows(namespace, workflowTemplateUID string, pagination *pagination.PaginationRequest) (cronWorkflows []*CronWorkflow, err error) { + sb := c.cronWorkflowSelectBuilder(namespace, workflowTemplateUID). + OrderBy("cw.created_at DESC") + + sb = *pagination.ApplyToSelect(&sb) + query, args, err := sb.ToSql() + if err != nil { - log.WithFields(log.Fields{ - "Namespace": namespace, - "Error": err.Error(), - }).Error("CronWorkflows not found.") - return nil, util.NewUserError(codes.NotFound, "CronWorkflows not found.") + return nil, err } - cwfs := cronWorkflowList.Items - sort.Slice(cwfs, func(i, j int) bool { - ith := cwfs[i].CreationTimestamp.Time - jth := cwfs[j].CreationTimestamp.Time - //Most recent first - return ith.After(jth) - }) - for _, cwf := range cwfs { - var parameters []WorkflowExecutionParameter + if err := c.DB.Select(&cronWorkflows, query, args...); err != nil { + return nil, err + } - for _, param := range cwf.Spec.WorkflowSpec.Arguments.Parameters { - parameters = append(parameters, WorkflowExecutionParameter{ - Name: param.Name, - Value: param.Value, - }) + for _, cwf := range cronWorkflows { + parameters, err := cwf.GetParametersFromWorkflowSpec() + if err != nil { + continue } - cronWorkflows = append(cronWorkflows, &CronWorkflow{ - CreatedAt: cwf.CreationTimestamp.UTC(), - UID: string(cwf.ObjectMeta.UID), - Name: cwf.Name, - Schedule: cwf.Spec.Schedule, - Timezone: cwf.Spec.Timezone, - Suspend: cwf.Spec.Suspend, - ConcurrencyPolicy: string(cwf.Spec.ConcurrencyPolicy), - StartingDeadlineSeconds: cwf.Spec.StartingDeadlineSeconds, - SuccessfulJobsHistoryLimit: cwf.Spec.SuccessfulJobsHistoryLimit, - FailedJobsHistoryLimit: cwf.Spec.FailedJobsHistoryLimit, - WorkflowExecution: &WorkflowExecution{ - Parameters: parameters, - }, - }) + cwf.WorkflowExecution = &WorkflowExecution{ + Parameters: parameters, + } } + + return +} + +func (c *Client) CountCronWorkflows(namespace, workflowTemplateUID string) (count int, err error) { + err = c.cronWorkflowSelectBuilderNoColumns(namespace, workflowTemplateUID). + Columns("COUNT(*)"). + RunWith(c.DB.DB). + QueryRow(). + Scan(&count) + return } @@ -434,6 +451,7 @@ func (c *Client) createCronWorkflow(namespace string, workflowTemplateId *uint64 newParams = append(newParams, param) } cwf.Spec.WorkflowSpec.Arguments.Parameters = newParams + wf.Spec.Arguments.Parameters = newParams } if opts.Labels != nil { cwf.ObjectMeta.Labels = *opts.Labels @@ -471,7 +489,46 @@ func (c *Client) createCronWorkflow(namespace string, workflowTemplateId *uint64 } func (c *Client) TerminateCronWorkflow(namespace, name string) (err error) { + query, args, err := sb.Select(). + Columns("cw.id", "cw.created_at", "cw.uid", "cw.name", "cw.workflow_template_version_id"). + Columns("cw.schedule", "cw.timezone", "cw.suspend", "cw.concurrency_policy", "cw.starting_deadline_seconds"). + Columns("cw.successful_jobs_history_limit", "cw.failed_jobs_history_limit", "cw.workflow_spec", "wtv.version"). + From("cron_workflows cw"). + Join("workflow_template_versions wtv ON wtv.id = cw.workflow_template_version_id"). + Join("workflow_templates wt ON wt.id = wtv.workflow_template_id"). + Where(sq.Eq{ + "wt.namespace": namespace, + "cw.name": name, + }). + ToSql() + + cronWorkflow := &CronWorkflow{} + if err := c.DB.Get(cronWorkflow, query, args...); err != nil { + return err + } + + query = `DELETE FROM workflow_executions + WHERE cron_workflow_id = $1` + + if _, err := c.DB.Exec(query, cronWorkflow.ID); err != nil { + return err + } + + query = `DELETE FROM cron_workflows + USING workflow_template_versions, workflow_templates + WHERE cron_workflows.workflow_template_version_id = workflow_template_versions.id + AND workflow_template_versions.workflow_template_id = workflow_templates.id + AND workflow_templates.namespace = $1 + AND cron_workflows.name = $2` + if _, err := c.DB.Exec(query, namespace, name); err != nil { + return err + } + err = c.ArgoprojV1alpha1().CronWorkflows(namespace).Delete(name, nil) + if err != nil && strings.Contains(err.Error(), "not found") { + err = nil + } + return } @@ -487,3 +544,25 @@ func unmarshalCronWorkflows(cwfBytes []byte, strict bool) (cwfs wfv1.CronWorkflo } return } + +func (c *Client) cronWorkflowSelectBuilder(namespace string, workflowTemplateUid string) sq.SelectBuilder { + sb := c.cronWorkflowSelectBuilderNoColumns(namespace, workflowTemplateUid). + Columns("cw.id", "cw.created_at", "cw.uid", "cw.name", "cw.workflow_template_version_id"). + Columns("cw.schedule", "cw.timezone", "cw.suspend", "cw.concurrency_policy", "cw.starting_deadline_seconds"). + Columns("cw.successful_jobs_history_limit", "cw.failed_jobs_history_limit", "cw.workflow_spec", "wtv.version") + + return sb +} + +func (c *Client) cronWorkflowSelectBuilderNoColumns(namespace string, workflowTemplateUid string) sq.SelectBuilder { + sb := sb.Select(). + From("cron_workflows cw"). + Join("workflow_template_versions wtv ON wtv.id = cw.workflow_template_version_id"). + Join("workflow_templates wt ON wt.id = wtv.workflow_template_id"). + Where(sq.Eq{ + "wt.namespace": namespace, + "wt.uid": workflowTemplateUid, + }) + + return sb +} diff --git a/pkg/labels.go b/pkg/labels.go new file mode 100644 index 0000000..1e7de5d --- /dev/null +++ b/pkg/labels.go @@ -0,0 +1,82 @@ +package v1 + +import ( + sq "github.com/Masterminds/squirrel" +) + +func (c *Client) InsertLabelsBuilder(resource string, resourceId uint64, keyValues map[string]string) sq.InsertBuilder { + sb := sb.Insert("labels"). + Columns("resource", "resource_id", "key", "value") + + for key, value := range keyValues { + sb = sb.Values(resource, resourceId, key, value) + } + + return sb +} + +func (c *Client) GetDbLabels(resource string, ids ...uint64) (labels []*Label, err error) { + if len(ids) == 0 { + return make([]*Label, 0), nil + } + + tx, err := c.DB.Begin() + if err != nil { + return nil, err + } + + whereIn := "resource_id IN (?" + for i := range ids { + if i == 0 { + continue + } + + whereIn += ",?" + } + whereIn += ")" + + defer tx.Rollback() + + query, args, err := sb.Select("id", "key", "value", "resource", "resource_id"). + From("labels"). + Where(whereIn, ids). + Where(sq.Eq{ + "resource": resource, + }). + ToSql() + + if err != nil { + return nil, err + } + + allArgs := make([]interface{}, 0) + for _, arg := range args[0].([]uint64) { + allArgs = append(allArgs, arg) + } + allArgs = append(allArgs, args[1]) + + err = c.DB.Select(&labels, query, allArgs...) + if err != nil { + return nil, err + } + + return +} + +func (c *Client) GetDbLabelsMapped(resource string, ids ...uint64) (result map[uint64]map[string]string, err error) { + dbLabels, err := c.GetDbLabels(resource, ids...) + if err != nil { + return + } + + result = make(map[uint64]map[string]string) + for _, dbLabel := range dbLabels { + _, ok := result[dbLabel.ResourceId] + if !ok { + result[dbLabel.ResourceId] = make(map[string]string) + } + result[dbLabel.ResourceId][dbLabel.Key] = dbLabel.Value + } + + return +} diff --git a/pkg/types.go b/pkg/types.go index 3a8f9ee..62e60bb 100644 --- a/pkg/types.go +++ b/pkg/types.go @@ -13,6 +13,13 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +const ( + TypeWorkflowTemplate string = "workflow_template" + TypeWorkflowTemplateVersion string = "workflow_template_version" + TypeWorkflowExecution string = "workflow_execution" + TypeCronWorkflow string = "cron_workflow" +) + type Namespace struct { Name string Labels map[string]string @@ -41,18 +48,64 @@ type Metric struct { type CronWorkflow struct { ID uint64 - CreatedAt time.Time `db:"created_at"` + CreatedAt time.Time `db:"created_at"` + ModifiedAt *time.Time `db:"modified_at"` UID string Name string GenerateName string Schedule string Timezone string Suspend bool - ConcurrencyPolicy string - StartingDeadlineSeconds *int64 - SuccessfulJobsHistoryLimit *int32 - FailedJobsHistoryLimit *int32 + ConcurrencyPolicy string `db:"concurrency_policy"` + StartingDeadlineSeconds *int64 `db:"starting_deadline_seconds"` + SuccessfulJobsHistoryLimit *int32 `db:"successful_jobs_history_limit"` + FailedJobsHistoryLimit *int32 `db:"failed_jobs_history_limit"` WorkflowExecution *WorkflowExecution + WorkflowSpec string `db:"workflow_spec"` + Labels []*Label + Version int64 + WorkflowTemplateVersionId uint64 `db:"workflow_template_version_id"` +} + +func (cw *CronWorkflow) GetParametersFromWorkflowSpec() ([]WorkflowExecutionParameter, error) { + var parameters []WorkflowExecutionParameter + + mappedData := make(map[string]interface{}) + + if err := yaml.Unmarshal([]byte(cw.WorkflowSpec), mappedData); err != nil { + return nil, err + } + + arguments, ok := mappedData["arguments"] + if !ok { + return parameters, nil + } + + argumentsMap := arguments.(map[interface{}]interface{}) + parametersRaw, ok := argumentsMap["parameters"] + if !ok { + return parameters, nil + } + + parametersArray, ok := parametersRaw.([]interface{}) + for _, parameter := range parametersArray { + paramMap, ok := parameter.(map[interface{}]interface{}) + if !ok { + continue + } + + name := paramMap["name"].(string) + value := paramMap["value"].(string) + + workflowParameter := WorkflowExecutionParameter{ + Name: name, + Value: &value, + } + + parameters = append(parameters, workflowParameter) + } + + return parameters, nil } type WorkflowTemplate struct { @@ -62,12 +115,22 @@ type WorkflowTemplate struct { Name string Manifest string Version int64 - Versions int64 `db:"versions"` + Versions int64 `db:"versions"` // How many versions there are of this template total. IsLatest bool IsArchived bool `db:"is_archived"` ArgoWorkflowTemplate *wfv1.WorkflowTemplate Labels map[string]string WorkflowExecutionStatisticReport *WorkflowExecutionStatisticReport + WorkflowTemplateVersionId uint64 `db:"workflow_template_version_id"` // Reference to the associated workflow template version. +} + +type Label struct { + ID uint64 + CreatedAt time.Time `db:"created_at"` + Key string + Value string + Resource string + ResourceId uint64 `db:"resource_id"` } type WorkflowExecutionStatisticReport struct { @@ -293,17 +356,6 @@ func (wt *WorkflowTemplate) AddWorkflowTemplateParametersFromAnnotations(spec ma arguments["parameters"] = parameters } -const ( - WorfklowPending WorkflowExecutionPhase = "Pending" - WorfklowRunning WorkflowExecutionPhase = "Running" - WorfklowSucceeded WorkflowExecutionPhase = "Succeeded" - WorfklowSkipped WorkflowExecutionPhase = "Skipped" - WorfklowFailed WorkflowExecutionPhase = "Failed" - WorfklowError WorkflowExecutionPhase = "Error" -) - -type WorkflowExecutionPhase string - type WorkflowExecution struct { ID uint64 CreatedAt time.Time `db:"created_at"` @@ -312,10 +364,10 @@ type WorkflowExecution struct { GenerateName string Parameters []WorkflowExecutionParameter Manifest string - Phase WorkflowExecutionPhase - StartedAt time.Time - FinishedAt time.Time - WorkflowTemplate *WorkflowTemplate + Phase wfv1.NodePhase + StartedAt *time.Time `db:"started_at"` + FinishedAt *time.Time `db:"finished_at"` + WorkflowTemplate *WorkflowTemplate `db:"workflow_template"` Labels map[string]string } @@ -400,3 +452,33 @@ func FilePathToExtension(path string) string { return path[dotIndex+1:] } + +func WorkflowTemplatesToIds(workflowTemplates []*WorkflowTemplate) (ids []uint64) { + mappedIds := make(map[uint64]bool) + + // This is to make sure we don't have duplicates + for _, workflowTemplate := range workflowTemplates { + mappedIds[workflowTemplate.ID] = true + } + + for id := range mappedIds { + ids = append(ids, id) + } + + return +} + +func WorkflowTemplatesToVersionIds(workflowTemplates []*WorkflowTemplate) (ids []uint64) { + mappedIds := make(map[uint64]bool) + + // This is to make sure we don't have duplicates + for _, workflowTemplate := range workflowTemplates { + mappedIds[workflowTemplate.WorkflowTemplateVersionId] = true + } + + for id := range mappedIds { + ids = append(ids, id) + } + + return +} diff --git a/pkg/util/pagination/pagination.go b/pkg/util/pagination/pagination.go new file mode 100644 index 0000000..8447e21 --- /dev/null +++ b/pkg/util/pagination/pagination.go @@ -0,0 +1,46 @@ +package pagination + +import ( + "github.com/Masterminds/squirrel" + "math" +) + +type PaginationRequest struct { + Page uint64 + PageSize uint64 +} + +func NewRequest(page, pageSize int32) PaginationRequest { + if page == 0 { + page = 1 + } + + if pageSize == 0 { + pageSize = 15 + } + + return PaginationRequest{ + Page: uint64(page), + PageSize: uint64(pageSize), + } +} + +func (pr *PaginationRequest) Offset() uint64 { + // start at page 1. + return (pr.Page - 1) * pr.PageSize +} + +func (pr *PaginationRequest) CalculatePages(count int) int32 { + return int32(math.Ceil(float64(count) / float64(pr.PageSize))) +} + +func (pr *PaginationRequest) ApplyToSelect(sb *squirrel.SelectBuilder) *squirrel.SelectBuilder { + if pr == nil { + return sb + } + + result := sb.Limit(pr.PageSize). + Offset(pr.Offset()) + + return &result +} diff --git a/pkg/util/ptr/ptr.go b/pkg/util/ptr/ptr.go index 0af2120..4ded951 100644 --- a/pkg/util/ptr/ptr.go +++ b/pkg/util/ptr/ptr.go @@ -1,5 +1,7 @@ package ptr +import "time" + func Bool(value bool) *bool { return &value } @@ -15,3 +17,7 @@ func Int64(value int64) *int64 { func String(value string) *string { return &value } + +func Time(value time.Time) *time.Time { + return &value +} diff --git a/pkg/workflow_execution.go b/pkg/workflow_execution.go index a7bb26b..dff88b4 100644 --- a/pkg/workflow_execution.go +++ b/pkg/workflow_execution.go @@ -10,10 +10,11 @@ import ( "github.com/ghodss/yaml" "github.com/onepanelio/core/api" "github.com/onepanelio/core/pkg/util/label" + "github.com/onepanelio/core/pkg/util/pagination" + "github.com/onepanelio/core/pkg/util/ptr" "io" "io/ioutil" "regexp" - "sort" "strconv" "strings" "time" @@ -192,7 +193,7 @@ func addEnvToTemplate(template *wfv1.Template, key string, value string) { } } -func (c *Client) createWorkflow(namespace string, workflowTemplateId *uint64, wf *wfv1.Workflow, opts *WorkflowExecutionOptions) (createdWorkflow *wfv1.Workflow, err error) { +func (c *Client) createWorkflow(namespace string, workflowTemplateId uint64, workflowTemplateVersionId uint64, wf *wfv1.Workflow, opts *WorkflowExecutionOptions) (createdWorkflow *wfv1.Workflow, err error) { if opts == nil { opts = &WorkflowExecutionOptions{} } @@ -237,7 +238,7 @@ func (c *Client) createWorkflow(namespace string, workflowTemplateId *uint64, wf return nil, err } - err = InjectExitHandlerWorkflowExecutionStatistic(wf, namespace, workflowTemplateId) + err = InjectExitHandlerWorkflowExecutionStatistic(wf, namespace, &workflowTemplateId) if err != nil { return nil, err } @@ -249,7 +250,7 @@ func (c *Client) createWorkflow(namespace string, workflowTemplateId *uint64, wf } //Create an entry for workflow_executions statistic //CURL code will hit the API endpoint that will update the db row - err = c.InsertPreWorkflowExecutionStatistic(namespace, createdWorkflow.Name, int64(*workflowTemplateId), time.Now()) + err = c.InsertPreWorkflowExecutionStatistic(namespace, createdWorkflow.Name, int64(workflowTemplateId), workflowTemplateVersionId, createdWorkflow.CreationTimestamp.UTC()) if err != nil { return nil, err } @@ -332,7 +333,7 @@ func (c *Client) CreateWorkflowExecution(namespace string, workflow *WorkflowExe var createdWorkflows []*wfv1.Workflow for _, wf := range workflows { - createdWorkflow, err := c.createWorkflow(namespace, &workflowTemplate.ID, &wf, opts) + createdWorkflow, err := c.createWorkflow(namespace, workflowTemplate.ID, workflowTemplate.WorkflowTemplateVersionId, &wf, opts) if err != nil { log.WithFields(log.Fields{ "Namespace": namespace, @@ -365,7 +366,7 @@ func (c *Client) CreateWorkflowExecution(namespace string, workflow *WorkflowExe return workflow, nil } -func (c *Client) InsertPreWorkflowExecutionStatistic(namespace, name string, workflowTemplateID int64, createdAt time.Time) error { +func (c *Client) InsertPreWorkflowExecutionStatistic(namespace, name string, workflowTemplateID int64, workflowTemplateVersionId uint64, createdAt time.Time) error { tx, err := c.DB.Begin() if err != nil { return err @@ -373,10 +374,11 @@ func (c *Client) InsertPreWorkflowExecutionStatistic(namespace, name string, wor defer tx.Rollback() insertMap := sq.Eq{ - "workflow_template_id": workflowTemplateID, - "name": name, - "namespace": namespace, - "created_at": createdAt.UTC(), + "workflow_template_version_id": workflowTemplateVersionId, + "name": name, + "namespace": namespace, + "created_at": createdAt.UTC(), + "phase": wfv1.NodePending, } _, err = sb.Insert("workflow_executions"). @@ -391,7 +393,7 @@ func (c *Client) InsertPreWorkflowExecutionStatistic(namespace, name string, wor return err } -func (c *Client) FinishWorkflowExecutionStatisticViaExitHandler(namespace, name string, workflowTemplateID int64, workflowOutcomeIsSuccess bool) (err error) { +func (c *Client) FinishWorkflowExecutionStatisticViaExitHandler(namespace, name string, workflowTemplateID int64, phase wfv1.NodePhase, startedAt time.Time) (err error) { tx, err := c.DB.Begin() if err != nil { return err @@ -399,15 +401,11 @@ func (c *Client) FinishWorkflowExecutionStatisticViaExitHandler(namespace, name defer tx.Rollback() updateMap := sq.Eq{ - "workflow_template_id": workflowTemplateID, - "name": name, - "namespace": namespace, - } - - if workflowOutcomeIsSuccess { - updateMap["finished_at"] = time.Now().UTC() - } else { - updateMap["failed_at"] = time.Now().UTC() + "started_at": startedAt.UTC(), + "name": name, + "namespace": namespace, + "finished_at": time.Now().UTC(), + "phase": phase, } _, err = sb.Update("workflow_executions"). @@ -423,6 +421,30 @@ func (c *Client) FinishWorkflowExecutionStatisticViaExitHandler(namespace, name } func (c *Client) CronStartWorkflowExecutionStatisticInsert(namespace, name string, workflowTemplateID int64) (err error) { + query, args, err := c.workflowTemplatesSelectBuilder(namespace). + Where(sq.Eq{ + "wt.id": workflowTemplateID, + }). + ToSql() + if err != nil { + return err + } + + workflowTemplate := &WorkflowTemplate{} + if err := c.DB.Get(workflowTemplate, query, args...); err != nil { + return err + } + + query, args, err = c.cronWorkflowSelectBuilder(namespace, workflowTemplate.UID).ToSql() + if err != nil { + return err + } + + cronWorkflow := &CronWorkflow{} + if err := c.DB.Get(cronWorkflow, query, args...); err != nil { + return err + } + tx, err := c.DB.Begin() if err != nil { return err @@ -430,10 +452,11 @@ func (c *Client) CronStartWorkflowExecutionStatisticInsert(namespace, name strin defer tx.Rollback() insertMap := sq.Eq{ - "workflow_template_id": workflowTemplateID, - "name": name, - "namespace": namespace, - "created_at": time.Now().UTC(), + "workflow_template_version_id": cronWorkflow.WorkflowTemplateVersionId, + "name": name, + "namespace": namespace, + "phase": wfv1.NodePending, + "cron_workflow_id": cronWorkflow.ID, } _, err = sb.Insert("workflow_executions"). @@ -497,9 +520,9 @@ func (c *Client) GetWorkflowExecution(namespace, name string) (workflow *Workflo UID: string(wf.UID), CreatedAt: wf.CreationTimestamp.UTC(), Name: wf.Name, - Phase: WorkflowExecutionPhase(wf.Status.Phase), - StartedAt: wf.Status.StartedAt.UTC(), - FinishedAt: wf.Status.FinishedAt.UTC(), + Phase: wf.Status.Phase, + StartedAt: ptr.Time(wf.Status.StartedAt.UTC()), + FinishedAt: ptr.Time(wf.Status.FinishedAt.UTC()), Manifest: string(manifest), WorkflowTemplate: workflowTemplate, } @@ -507,67 +530,27 @@ func (c *Client) GetWorkflowExecution(namespace, name string) (workflow *Workflo return } -func (c *Client) ListWorkflowExecutions(namespace, workflowTemplateUID, workflowTemplateVersion string) (workflows []*WorkflowExecution, err error) { - opts := &WorkflowExecutionOptions{} - if workflowTemplateUID != "" { - labelSelect := fmt.Sprintf("%s=%s", workflowTemplateUIDLabelKey, workflowTemplateUID) - - if workflowTemplateVersion != "" { - labelSelect = fmt.Sprintf("%s,%s=%s", labelSelect, workflowTemplateVersionLabelKey, workflowTemplateVersion) - } - - opts.ListOptions = &ListOptions{ - LabelSelector: labelSelect, - } - } - workflowList, err := c.ArgoprojV1alpha1().Workflows(namespace).List(*opts.ListOptions) +func (c *Client) ListWorkflowExecutions(namespace, workflowTemplateUID, workflowTemplateVersion string, paginator *pagination.PaginationRequest) (workflows []*WorkflowExecution, err error) { + sb := workflowExecutionsSelectBuilder(namespace, workflowTemplateUID, workflowTemplateVersion). + OrderBy("we.created_at DESC") + sb = *paginator.ApplyToSelect(&sb) + query, args, err := sb.ToSql() if err != nil { - log.WithFields(log.Fields{ - "Namespace": namespace, - "WorkflowTemplateUID": workflowTemplateUID, - "WorkflowTemplateVersion": workflowTemplateVersion, - "Error": err.Error(), - }).Error("Workflows not found.") - return nil, util.NewUserError(codes.NotFound, "Workflows not found.") + return nil, err } - wfs := workflowList.Items - sort.Slice(wfs, func(i, j int) bool { - ith := wfs[i].CreationTimestamp.Time - jth := wfs[j].CreationTimestamp.Time - //Most recent first - return ith.After(jth) - }) - - for _, wf := range wfs { - execution := &WorkflowExecution{ - Name: wf.ObjectMeta.Name, - UID: string(wf.ObjectMeta.UID), - Phase: WorkflowExecutionPhase(wf.Status.Phase), - StartedAt: wf.Status.StartedAt.UTC(), - FinishedAt: wf.Status.FinishedAt.UTC(), - CreatedAt: wf.CreationTimestamp.UTC(), - } - - versionString, ok := wf.Labels[workflowTemplateVersionLabelKey] - if ok { - versionNumber, err := strconv.ParseInt(versionString, 10, 64) - if err == nil { - execution.WorkflowTemplate = &WorkflowTemplate{ - Version: versionNumber, - } - } else { - log.WithFields(log.Fields{ - "Namespace": namespace, - "WorkflowTemplateUID": workflowTemplateUID, - "WorkflowExecutionUID": wf.UID, - "Error": "Unable to get workflow template version", - }).Error("Unable to get workflow template version") - } - } - - workflows = append(workflows, execution) + if err := c.DB.Select(&workflows, query, args...); err != nil { + return nil, err } + return +} + +func (c *Client) CountWorkflowExecutions(namespace, workflowTemplateUID, workflowTemplateVersion string) (count int, err error) { + err = workflowExecutionsSelectBuilderNoColumns(namespace, workflowTemplateUID, workflowTemplateVersion). + Columns("COUNT(*)"). + RunWith(c.DB.DB). + QueryRow(). + Scan(&count) return } @@ -644,8 +627,8 @@ func (c *Client) WatchWorkflowExecution(namespace, name string) (<-chan *Workflo workflowWatcher <- &WorkflowExecution{ CreatedAt: workflow.CreationTimestamp.UTC(), - StartedAt: workflow.Status.StartedAt.UTC(), - FinishedAt: workflow.Status.FinishedAt.UTC(), + StartedAt: ptr.Time(workflow.Status.StartedAt.UTC()), + FinishedAt: ptr.Time(workflow.Status.FinishedAt.UTC()), UID: string(workflow.UID), Name: workflow.Name, Manifest: string(manifest), @@ -1152,7 +1135,7 @@ func (c *Client) SetWorkflowTemplateLabels(namespace, name, prefix string, keyVa func (c *Client) GetWorkflowExecutionStatisticsForTemplates(workflowTemplates ...*WorkflowTemplate) (err error) { if len(workflowTemplates) == 0 { - return errors.New("GetWorkflowExecutionStatisticsForTemplates requires at least 1 id") + return nil } tx, err := c.DB.Begin() @@ -1160,7 +1143,7 @@ func (c *Client) GetWorkflowExecutionStatisticsForTemplates(workflowTemplates .. return err } - whereIn := "workflow_template_id IN (?" + whereIn := "wtv.workflow_template_id IN (?" for i := range workflowTemplates { if i == 0 { continue @@ -1179,16 +1162,17 @@ func (c *Client) GetWorkflowExecutionStatisticsForTemplates(workflowTemplates .. statsSelect := ` workflow_template_id, - MAX(created_at) last_executed, - COUNT(*) FILTER (WHERE finished_At IS NULL AND failed_at IS NULL) running, - COUNT(*) FILTER (WHERE finished_at IS NOT NULL) completed, - COUNT(*) FILTER (WHERE failed_at IS NOT NULL) failed, + MAX(we.created_at) last_executed, + COUNT(*) FILTER (WHERE finished_at IS NULL AND (phase = 'Running' OR phase = 'Pending')) running, + COUNT(*) FILTER (WHERE finished_at IS NOT NULL AND phase = 'Succeeded') completed, + COUNT(*) FILTER (WHERE finished_at IS NOT NULL AND (phase = 'Failed' OR phase = 'Error')) failed, COUNT(*) total` query, args, err := sb.Select(statsSelect). - From("workflow_executions"). + From("workflow_executions we"). + Join("workflow_template_versions wtv ON wtv.id = we.workflow_template_version_id"). Where(whereIn, ids...). - GroupBy("workflow_template_id"). + GroupBy("wtv.workflow_template_id"). ToSql() if err != nil { @@ -1336,3 +1320,28 @@ func InjectInitHandlerWorkflowExecutionStatistic(wf *wfv1.Workflow, namespace st } return nil } + +func workflowExecutionsSelectBuilderNoColumns(namespace, workflowTemplateUID, workflowTemplateVersion string) sq.SelectBuilder { + whereMap := sq.Eq{ + "wt.namespace": namespace, + "wt.uid": workflowTemplateUID, + } + if workflowTemplateVersion != "" { + whereMap["wtv.version"] = workflowTemplateVersion + } + + sb := sb.Select(). + From("workflow_executions we"). + LeftJoin("workflow_template_versions wtv ON wtv.id = we.workflow_template_version_id"). + LeftJoin("workflow_templates wt ON wt.id = wtv.workflow_template_id"). + Where(whereMap) + + return sb +} + +func workflowExecutionsSelectBuilder(namespace, workflowTemplateUID, workflowTemplateVersion string) sq.SelectBuilder { + sb := workflowExecutionsSelectBuilderNoColumns(namespace, workflowTemplateUID, workflowTemplateVersion) + sb = sb.Columns("we.id", "we.created_at", "we.uid", "we.name", "we.phase", "we.started_at", "we.finished_at", `wtv.version "workflow_template.version"`) + + return sb +} diff --git a/pkg/workflow_template.go b/pkg/workflow_template.go index b0d4f5d..7302f79 100644 --- a/pkg/workflow_template.go +++ b/pkg/workflow_template.go @@ -4,6 +4,7 @@ import ( "database/sql" "errors" "fmt" + "github.com/onepanelio/core/pkg/util/pagination" "regexp" "strconv" "strings" @@ -49,19 +50,31 @@ func (c *Client) createWorkflowTemplate(namespace string, workflowTemplate *Work return nil, err } - _, err = sb.Insert("workflow_template_versions"). + workflowTemplateVersion := WorkflowTemplateVersion{} + err = sb.Insert("workflow_template_versions"). SetMap(sq.Eq{ "workflow_template_id": workflowTemplate.ID, "version": versionUnix, "is_latest": true, "manifest": workflowTemplate.Manifest, }). + Suffix("RETURNING id"). RunWith(tx). - Exec() + QueryRow().Scan(&workflowTemplateVersion.ID) if err != nil { return nil, err } + if len(workflowTemplate.Labels) > 0 { + _, err = c.InsertLabelsBuilder(TypeWorkflowTemplateVersion, workflowTemplateVersion.ID, workflowTemplate.Labels). + RunWith(tx). + Exec() + + if err != nil { + return nil, err + } + } + argoWft, err := createArgoWorkflowTemplate(workflowTemplate, versionUnix) argoWft, err = c.ArgoprojV1alpha1().WorkflowTemplates(namespace).Create(argoWft) if err != nil { @@ -99,13 +112,58 @@ func (c *Client) workflowTemplatesVersionSelectBuilder(namespace string) sq.Sele return sb } +func (c *Client) GetWorkflowTemplateDb(namespace, name string) (workflowTemplate *WorkflowTemplate, err error) { + query, args, err := c.workflowTemplatesSelectBuilder(namespace). + Where(sq.Eq{ + "name": name, + }). + ToSql() + + if err != nil { + return nil, err + } + + if err := c.DB.Get(workflowTemplate, query, args...); err != nil { + return nil, err + } + + return +} + +// "latest" will get you the latest version +func (c *Client) GetWorkflowTemplateVersionDb(namespace, name, version string) (workflowTemplateVersion *WorkflowTemplateVersion, err error) { + whereMap := sq.Eq{ + "wt.name": name, + } + + if version == "latest" { + whereMap["wtv.is_latest"] = "true" + } else { + whereMap["wtv.version"] = version + } + + query, args, err := c.workflowTemplatesVersionSelectBuilder(namespace). + Where(whereMap). + ToSql() + + if err != nil { + return nil, err + } + + if err := c.DB.Get(workflowTemplateVersion, query, args...); err != nil { + return nil, err + } + + return +} + func (c *Client) getWorkflowTemplate(namespace, uid string, version int64) (workflowTemplate *WorkflowTemplate, err error) { workflowTemplate = &WorkflowTemplate{ WorkflowExecutionStatisticReport: &WorkflowExecutionStatisticReport{}, } sb := c.workflowTemplatesSelectBuilder(namespace). - Column("wtv.manifest"). + Columns("wtv.manifest", "wtv.id workflow_template_version_id"). Join("workflow_template_versions wtv ON wt.id = wtv.workflow_template_id"). Where(sq.Eq{"wt.uid": uid}) @@ -121,12 +179,7 @@ func (c *Client) getWorkflowTemplate(namespace, uid string, version int64) (work } if err = c.DB.Get(workflowTemplate, query, args...); err == sql.ErrNoRows { - err = nil - workflowTemplate = nil - } - - if workflowTemplate == nil { - return workflowTemplate, nil + return nil, nil } versionAsString := "latest" @@ -147,6 +200,13 @@ func (c *Client) getWorkflowTemplate(namespace, uid string, version int64) (work workflowTemplate.Version = templateVersion + labelsMap, err := c.GetDbLabelsMapped(TypeWorkflowTemplateVersion, workflowTemplate.WorkflowTemplateVersionId) + if err != nil { + return workflowTemplate, err + } + + workflowTemplate.Labels = labelsMap[workflowTemplate.WorkflowTemplateVersionId] + return workflowTemplate, nil } @@ -227,18 +287,20 @@ func (c *Client) listWorkflowTemplateVersions(namespace, uid string) (workflowTe return } -func (c *Client) listWorkflowTemplates(namespace string) (workflowTemplateVersions []*WorkflowTemplate, err error) { +func (c *Client) listWorkflowTemplates(namespace string, paginator *pagination.PaginationRequest) (workflowTemplateVersions []*WorkflowTemplate, err error) { workflowTemplateVersions = []*WorkflowTemplate{} - query, args, err := c.workflowTemplatesSelectBuilder(namespace). - Column("COUNT(wtv.*) versions"). - Options("DISTINCT ON (wt.id)"). + sb := c.workflowTemplatesSelectBuilder(namespace). + Column("COUNT(wtv.*) versions, MAX(wtv.id) workflow_template_version_id"). Join("workflow_template_versions wtv ON wtv.workflow_template_id = wt.id"). GroupBy("wt.id", "wt.created_at", "wt.uid", "wt.name", "wt.is_archived"). Where(sq.Eq{ "wt.is_archived": false, }). - OrderBy("wt.id desc").ToSql() + OrderBy("wt.created_at DESC") + + sb = *paginator.ApplyToSelect(&sb) + query, args, err := sb.ToSql() if err != nil { return } @@ -248,6 +310,21 @@ func (c *Client) listWorkflowTemplates(namespace string) (workflowTemplateVersio return } +func (c *Client) CountWorkflowTemplates(namespace string) (count int, err error) { + err = sb.Select("COUNT( DISTINCT( wt.id ))"). + From("workflow_templates wt"). + Join("workflow_template_versions wtv ON wtv.workflow_template_id = wt.id"). + Where(sq.Eq{ + "wt.namespace": namespace, + "wt.is_archived": false, + }). + RunWith(c.DB.DB). + QueryRow(). + Scan(&count) + + return +} + func (c *Client) archiveWorkflowTemplate(namespace, uid string) (bool, error) { query, args, err := sb.Update("workflow_templates"). Set("is_archived", true). @@ -342,18 +419,28 @@ func (c *Client) CreateWorkflowTemplateVersion(namespace string, workflowTemplat return nil, err } - _, err = sb.Insert("workflow_template_versions"). + workflowTemplateVersionId := uint64(0) + err = sb.Insert("workflow_template_versions"). SetMap(sq.Eq{ "workflow_template_id": workflowTemplateDb.ID, "version": versionUnix, "is_latest": true, "manifest": workflowTemplate.Manifest, }). + Suffix("RETURNING id"). + RunWith(tx). + QueryRow().Scan(&workflowTemplateVersionId) + if err != nil { + return nil, err + } + + _, err = c.InsertLabelsBuilder(TypeWorkflowTemplateVersion, workflowTemplateVersionId, workflowTemplate.Labels). RunWith(tx). Exec() if err != nil { return nil, err } + if err := tx.Commit(); err != nil { return nil, err } @@ -452,8 +539,8 @@ func (c *Client) ListWorkflowTemplateVersions(namespace, uid string) (workflowTe return } -func (c *Client) ListWorkflowTemplates(namespace string) (workflowTemplateVersions []*WorkflowTemplate, err error) { - workflowTemplateVersions, err = c.listWorkflowTemplates(namespace) +func (c *Client) ListWorkflowTemplates(namespace string, paginator *pagination.PaginationRequest) (workflowTemplateVersions []*WorkflowTemplate, err error) { + workflowTemplateVersions, err = c.listWorkflowTemplates(namespace, paginator) if err != nil { log.WithFields(log.Fields{ "Namespace": namespace, @@ -470,17 +557,18 @@ func (c *Client) ListWorkflowTemplates(namespace string) (workflowTemplateVersio }).Error("Unable to get Workflow Execution Statistic for Templates.") return nil, util.NewUserError(codes.NotFound, "Unable to get Workflow Execution Statistic for Templates.") } - for _, workflowTemplate := range workflowTemplateVersions { - labels, err := c.GetWorkflowTemplateLabels(namespace, workflowTemplate.UID, label.TagPrefix, workflowTemplate.Version) - if err != nil { - log.WithFields(log.Fields{ - "Namespace": namespace, - "Error": err.Error(), - }).Error("Unable to get GetWorkflowTemplateLabels for Templates.") - continue - } - workflowTemplate.Labels = labels + labelsMap, err := c.GetDbLabelsMapped(TypeWorkflowTemplateVersion, WorkflowTemplatesToVersionIds(workflowTemplateVersions)...) + if err != nil { + log.WithFields(log.Fields{ + "Namespace": namespace, + "Error": err.Error(), + }).Error("Unable to get Workflow Template Labels") + return nil, err + } + + for _, workflowTemplate := range workflowTemplateVersions { + workflowTemplate.Labels = labelsMap[workflowTemplate.WorkflowTemplateVersionId] } return diff --git a/server/cron_workflow_server.go b/server/cron_workflow_server.go index d548b1c..75b9f86 100644 --- a/server/cron_workflow_server.go +++ b/server/cron_workflow_server.go @@ -5,10 +5,10 @@ import ( "github.com/golang/protobuf/ptypes/empty" "github.com/onepanelio/core/api" v1 "github.com/onepanelio/core/pkg" + "github.com/onepanelio/core/pkg/util/pagination" "github.com/onepanelio/core/pkg/util/ptr" "github.com/onepanelio/core/server/auth" "github.com/onepanelio/core/server/converter" - "math" ) type CronWorkflowServer struct{} @@ -255,11 +255,8 @@ func (c *CronWorkflowServer) ListCronWorkflows(ctx context.Context, req *api.Lis return nil, err } - if req.PageSize <= 0 { - req.PageSize = 15 - } - - cronWorkflows, err := client.ListCronWorkflows(req.Namespace, req.WorkflowTemplateUid) + paginator := pagination.NewRequest(req.Page, req.PageSize) + cronWorkflows, err := client.ListCronWorkflows(req.Namespace, req.WorkflowTemplateUid, &paginator) if err != nil { return nil, err } @@ -268,27 +265,17 @@ func (c *CronWorkflowServer) ListCronWorkflows(ctx context.Context, req *api.Lis apiCronWorkflows = append(apiCronWorkflows, apiCronWorkflow(cwf)) } - pages := int32(math.Ceil(float64(len(apiCronWorkflows)) / float64(req.PageSize))) - if req.Page > pages { - req.Page = pages - } - - if req.Page <= 0 { - req.Page = 1 - } - - start := (req.Page - 1) * req.PageSize - end := start + req.PageSize - if end >= int32(len(apiCronWorkflows)) { - end = int32(len(apiCronWorkflows)) + count, err := client.CountCronWorkflows(req.Namespace, req.WorkflowTemplateUid) + if err != nil { + return nil, err } return &api.ListCronWorkflowsResponse{ - Count: end - start, - CronWorkflows: apiCronWorkflows[start:end], - Page: req.Page, - Pages: pages, - TotalCount: int32(len(apiCronWorkflows)), + Count: int32(len(apiCronWorkflows)), + CronWorkflows: apiCronWorkflows, + Page: int32(paginator.Page), + Pages: paginator.CalculatePages(count), + TotalCount: int32(count), }, nil } diff --git a/server/workflow_server.go b/server/workflow_server.go index 8198b91..fa7df5a 100644 --- a/server/workflow_server.go +++ b/server/workflow_server.go @@ -2,10 +2,12 @@ package server import ( "context" + "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" "github.com/onepanelio/core/pkg/util" + "github.com/onepanelio/core/pkg/util/pagination" "github.com/onepanelio/core/server/converter" "google.golang.org/grpc/codes" - "math" + argov1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sort" "strings" "time" @@ -36,10 +38,10 @@ func apiWorkflowExecution(wf *v1.WorkflowExecution) (workflow *api.WorkflowExecu Manifest: wf.Manifest, } - if !wf.StartedAt.IsZero() { + if wf.StartedAt != nil && !wf.StartedAt.IsZero() { workflow.StartedAt = wf.StartedAt.Format(time.RFC3339) } - if !wf.FinishedAt.IsZero() { + if wf.FinishedAt != nil && !wf.FinishedAt.IsZero() { workflow.FinishedAt = wf.FinishedAt.Format(time.RFC3339) } @@ -81,19 +83,27 @@ func (s *WorkflowServer) CreateWorkflowExecution(ctx context.Context, req *api.C func (s *WorkflowServer) AddWorkflowExecutionStatistics(ctx context.Context, request *api.AddWorkflowExecutionStatisticRequest) (*empty.Empty, error) { client := ctx.Value("kubeClient").(*v1.Client) - workflowOutcomeIsSuccess := false + phase := v1alpha1.NodeFailed if request.Statistics.WorkflowStatus == "Succeeded" { - workflowOutcomeIsSuccess = true + phase = v1alpha1.NodeSucceeded } - err := client.FinishWorkflowExecutionStatisticViaExitHandler(request.Namespace, request.Name, - request.Statistics.WorkflowTemplateId, workflowOutcomeIsSuccess) + workflow, err := client.ArgoprojV1alpha1().Workflows(request.Namespace).Get(request.Name, argov1.GetOptions{}) + if err != nil { + return &empty.Empty{}, err + } + + err = client.FinishWorkflowExecutionStatisticViaExitHandler(request.Namespace, request.Name, + request.Statistics.WorkflowTemplateId, phase, workflow.Status.StartedAt.UTC()) if err != nil { return &empty.Empty{}, err } return &empty.Empty{}, nil } +// @todo we should not pass in an id into the request. +// instead pass in the cron workflow uid, we can load the cron workflow from db that way and get +// all required data. func (s *WorkflowServer) CronStartWorkflowExecutionStatistic(ctx context.Context, request *api.CronStartWorkflowExecutionStatisticRequest) (*empty.Empty, error) { client := ctx.Value("kubeClient").(*v1.Client) err := client.CronStartWorkflowExecutionStatisticInsert(request.Namespace, request.Name, request.WorkflowTemplateId) @@ -213,11 +223,8 @@ func (s *WorkflowServer) ListWorkflowExecutions(ctx context.Context, req *api.Li return nil, err } - if req.PageSize <= 0 { - req.PageSize = 15 - } - - workflows, err := client.ListWorkflowExecutions(req.Namespace, req.WorkflowTemplateUid, req.WorkflowTemplateVersion) + paginator := pagination.NewRequest(req.Page, req.PageSize) + workflows, err := client.ListWorkflowExecutions(req.Namespace, req.WorkflowTemplateUid, req.WorkflowTemplateVersion, &paginator) if err != nil { return nil, err } @@ -227,27 +234,17 @@ func (s *WorkflowServer) ListWorkflowExecutions(ctx context.Context, req *api.Li apiWorkflowExecutions = append(apiWorkflowExecutions, apiWorkflowExecution(wf)) } - pages := int32(math.Ceil(float64(len(apiWorkflowExecutions)) / float64(req.PageSize))) - if req.Page > pages { - req.Page = pages - } - - if req.Page <= 0 { - req.Page = 1 - } - - start := (req.Page - 1) * req.PageSize - end := start + req.PageSize - if end >= int32(len(apiWorkflowExecutions)) { - end = int32(len(apiWorkflowExecutions)) + count, err := client.CountWorkflowExecutions(req.Namespace, req.WorkflowTemplateUid, req.WorkflowTemplateVersion) + if err != nil { + return nil, err } return &api.ListWorkflowExecutionsResponse{ - Count: end - start, - WorkflowExecutions: apiWorkflowExecutions[start:end], - Page: req.Page, - Pages: pages, - TotalCount: int32(len(apiWorkflowExecutions)), + Count: int32(len(apiWorkflowExecutions)), + WorkflowExecutions: apiWorkflowExecutions, + Page: int32(paginator.Page), + Pages: paginator.CalculatePages(count), + TotalCount: int32(count), }, nil } diff --git a/server/workflow_template_server.go b/server/workflow_template_server.go index 2d21eec..f249fd5 100644 --- a/server/workflow_template_server.go +++ b/server/workflow_template_server.go @@ -6,6 +6,7 @@ import ( "github.com/onepanelio/core/api" v1 "github.com/onepanelio/core/pkg" "github.com/onepanelio/core/pkg/util/label" + "github.com/onepanelio/core/pkg/util/pagination" "github.com/onepanelio/core/server/auth" "github.com/onepanelio/core/server/converter" "strings" @@ -208,7 +209,8 @@ func (s *WorkflowTemplateServer) ListWorkflowTemplates(ctx context.Context, req return nil, err } - workflowTemplates, err := client.ListWorkflowTemplates(req.Namespace) + paginator := pagination.NewRequest(req.Page, req.PageSize) + workflowTemplates, err := client.ListWorkflowTemplates(req.Namespace, &paginator) if err != nil { return nil, err } @@ -218,9 +220,17 @@ func (s *WorkflowTemplateServer) ListWorkflowTemplates(ctx context.Context, req apiWorkflowTemplates = append(apiWorkflowTemplates, apiWorkflowTemplate(wtv)) } + count, err := client.CountWorkflowTemplates(req.Namespace) + if err != nil { + return nil, err + } + return &api.ListWorkflowTemplatesResponse{ Count: int32(len(apiWorkflowTemplates)), WorkflowTemplates: apiWorkflowTemplates, + Page: int32(paginator.Page), + Pages: paginator.CalculatePages(count), + TotalCount: int32(count), }, nil } @@ -276,7 +286,19 @@ func (s *WorkflowTemplateServer) AddWorkflowTemplateLabels(ctx context.Context, keyValues[item.Key] = item.Value } - labels, err := client.SetWorkflowTemplateLabels(req.Namespace, req.Name, "tags.onepanel.io/", keyValues, false) + labels, err := client.SetWorkflowTemplateLabels(req.Namespace, req.Name, label.TagPrefix, keyValues, false) + if err != nil { + return nil, err + } + + workflowTemplateVersion, err := client.GetWorkflowTemplateVersionDb(req.Namespace, req.Name, "latest") + if err != nil { + return nil, err + } + + _, err = client.InsertLabelsBuilder(v1.TypeWorkflowTemplateVersion, workflowTemplateVersion.ID, keyValues). + RunWith(client.DB.DB). + Exec() if err != nil { return nil, err }