Compare commits


33 Commits

Author SHA1 Message Date
Rush Tehrani
709871dd3b Merge pull request #834 from rushtehrani/fix/params
fix: Only remove whitespaces in var references
2021-01-11 20:44:37 -08:00
rushtehrani
1d0c898fa4 code cleanup + no param mutation 2021-01-11 20:37:23 -08:00
rushtehrani
1355f8039e only remove whitespaces in var references 2021-01-11 20:05:58 -08:00
Rush Tehrani
2c187ad784 Merge pull request #830 from Vafilor/feat/workflow.template.version.parameters
feat: add parameters to workflow template versions response
2021-01-09 20:36:51 -08:00
Andrey Melnikov
c2a80121b7 fix: issue where opts.params were overwritten because of pointer reference - this caused the params stored in database to be altered 2021-01-09 19:26:33 -08:00
Andrey Melnikov
5275aa2ea7 Merge branch 'feat/workflow.template.version.parameters' of github.com:Vafilor/core into feat/workflow.template.version.parameters 2021-01-09 19:25:50 -08:00
Andrey Melnikov
682994c4c2 feat: add parameters to workflow template versions response 2021-01-09 19:20:25 -08:00
Andrey Melnikov
302731e23a feat: replace parameter namespace values for workflow/workspace with the runtime value 2021-01-09 15:14:38 -08:00
Andrey Melnikov
f9338bd51e feat: add parameters to workflow template versions response 2021-01-08 10:47:06 -08:00
Andrey Melnikov
f424e5e661 Merge pull request #826 from rushtehrani/fix/tfod
fix: Rename train.py => main.py in TFOD workflow template
2021-01-07 12:54:07 -08:00
rushtehrani
d9cc564596 rename train.py => main.py in tfod workflow template 2021-01-07 12:50:37 -08:00
Andrey Melnikov
fbc780ab4d Merge pull request #824 from rushtehrani/feat/update-cvat
feat: Update CVAT and FileSyncer images
2021-01-07 10:05:28 -08:00
rushtehrani
5f531f2ac0 update cvat and filesyncer images 2021-01-07 09:59:26 -08:00
Rush Tehrani
208828fcaf Merge pull request #822 from aleksandrmelnikov/feat/core.804-upg.k8s.arg.support.secrets
feat: Adding support for ImagePullSecrets in workspaces and workflows
2021-01-06 12:14:31 -08:00
Rush Tehrani
855eaf2a40 Merge pull request #823 from rushtehrani/fix/tf-training
fix: Remove used-by: cvat from tf training workflow
2021-01-06 10:00:41 -08:00
rushtehrani
d230357a28 remove used-by: cvat from tf training workflow 2021-01-05 16:48:50 -08:00
Aleksandr Melnikov
6e0131636e go fmt. 2021-01-05 12:28:12 -08:00
Aleksandr Melnikov
70e3d36416 Adding support for ImagePullSecrets in workspaces. 2021-01-05 11:44:10 -08:00
Rush Tehrani
313e6841b1 Merge pull request #821 from rushtehrani/feat/cleanup
chore: Remove archive from templates
2021-01-04 13:51:39 -08:00
rushtehrani
22abceb9df remove archive from templates 2021-01-04 13:47:32 -08:00
Rush Tehrani
e6889ac470 Merge pull request #820 from Vafilor/fix/finished.logs
fix: issue where logs would sometimes crash
2020-12-31 23:36:16 -08:00
Andrey Melnikov
5669d03c5c chore: codacy 2020-12-31 23:33:42 -08:00
Andrey Melnikov
a8f5cde75e fix: issue where logs would sometimes accumulate because there was no newline in the content. 2020-12-31 23:23:13 -08:00
Rush Tehrani
67af745dab Merge pull request #818 from Vafilor/fix/finished.logs
fix: issues with finished logs
2020-12-31 11:12:09 -08:00
Andrey Melnikov
080624d9e2 fix: issues with finished logs. Content was repeating, resulting in a large data load - making the stream fail.
No timestamps were being returned as timestamps for time 0.
Occasionally lines would cut off across multiple lines.
2020-12-31 11:07:39 -08:00
Rush Tehrani
69c523ee23 Merge pull request #815 from Vafilor/feat/default.make.api
feat: make api command use docker by default.
2020-12-30 15:18:46 -08:00
Andrey Melnikov
303cee3e9f feat: make api command use docker by default. 2020-12-30 15:05:16 -08:00
Rush Tehrani
3cb799f6fe Merge pull request #814 from Vafilor/fix/timestamp.logs
fix: workflow execution logs showing timestamps incorrectly
2020-12-30 13:25:35 -08:00
Andrey Melnikov
8d896c03c0 Merge branch 'fix/timestamp.logs' of github.com:Vafilor/core into fix/timestamp.logs 2020-12-30 12:05:37 -08:00
Andrey Melnikov
543367c36e feat: updated workflow logs to return a response that may contain several log entries. 2020-12-30 12:04:21 -08:00
Andrey Melnikov
ba776cddbd fix: fixed issue with logs where the timestamp parsing ignored newlines 2020-12-30 12:04:21 -08:00
Andrey Melnikov
ea89ddf289 feat: updated workflow logs to return a response that may contain several log entries. 2020-12-30 12:02:25 -08:00
Andrey Melnikov
48d66004ec fix: fixed issue with logs where the timestamp parsing ignored newlines 2020-12-30 11:49:37 -08:00
20 changed files with 893 additions and 513 deletions

View File

@@ -35,10 +35,10 @@ protoc:
--openapiv2_opt simple_operation_ids=true \
api/proto/*.proto
api: init protoc jq
api-internal: init protoc jq
api-docker: init
docker run --rm --mount type=bind,source="${PWD}",target=/root onepanel/helper:v1.0.0 make api version=$(version)
api: init
docker run --rm --mount type=bind,source="${PWD}",target=/root onepanel/helper:v1.0.0 make api-internal version=$(version)
docker-build:
docker build -t onepanel-core .

View File

@@ -1289,13 +1289,13 @@
"type": "object",
"properties": {
"result": {
"$ref": "#/definitions/LogEntry"
"$ref": "#/definitions/LogStreamResponse"
},
"error": {
"$ref": "#/definitions/google.rpc.Status"
}
},
"title": "Stream result of LogEntry"
"title": "Stream result of LogStreamResponse"
}
},
"default": {
@@ -3543,6 +3543,17 @@
}
}
},
"LogStreamResponse": {
"type": "object",
"properties": {
"logEntries": {
"type": "array",
"items": {
"$ref": "#/definitions/LogEntry"
}
}
}
},
"Metric": {
"type": "object",
"properties": {

File diff suppressed because it is too large

View File

@@ -140,7 +140,7 @@ func (c *workflowServiceClient) GetWorkflowExecutionLogs(ctx context.Context, in
}
type WorkflowService_GetWorkflowExecutionLogsClient interface {
Recv() (*LogEntry, error)
Recv() (*LogStreamResponse, error)
grpc.ClientStream
}
@@ -148,8 +148,8 @@ type workflowServiceGetWorkflowExecutionLogsClient struct {
grpc.ClientStream
}
func (x *workflowServiceGetWorkflowExecutionLogsClient) Recv() (*LogEntry, error) {
m := new(LogEntry)
func (x *workflowServiceGetWorkflowExecutionLogsClient) Recv() (*LogStreamResponse, error) {
m := new(LogStreamResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
@@ -460,7 +460,7 @@ func _WorkflowService_GetWorkflowExecutionLogs_Handler(srv interface{}, stream g
}
type WorkflowService_GetWorkflowExecutionLogsServer interface {
Send(*LogEntry) error
Send(*LogStreamResponse) error
grpc.ServerStream
}
@@ -468,7 +468,7 @@ type workflowServiceGetWorkflowExecutionLogsServer struct {
grpc.ServerStream
}
func (x *workflowServiceGetWorkflowExecutionLogsServer) Send(m *LogEntry) error {
func (x *workflowServiceGetWorkflowExecutionLogsServer) Send(m *LogStreamResponse) error {
return x.ServerStream.SendMsg(m)
}
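
For context, a minimal sketch of how a generated Go client would consume the stream after this change, receiving LogStreamResponse batches instead of single LogEntry messages. The import path, server address, and request values are assumptions, not taken from the diff.

package main

import (
	"context"
	"fmt"
	"io"
	"log"

	api "github.com/onepanelio/core/api" // assumed import path for the generated package
	"google.golang.org/grpc"
)

func main() {
	conn, err := grpc.Dial("localhost:8888", grpc.WithInsecure()) // assumed address
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	client := api.NewWorkflowServiceClient(conn)
	stream, err := client.GetWorkflowExecutionLogs(context.Background(), &api.GetWorkflowExecutionLogsRequest{
		Namespace:     "default", // illustrative values
		Uid:           "workflow-uid",
		PodName:       "pod-name",
		ContainerName: "main",
	})
	if err != nil {
		log.Fatal(err)
	}

	for {
		resp, err := stream.Recv() // now a *api.LogStreamResponse
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatal(err)
		}
		for _, entry := range resp.LogEntries {
			fmt.Println(entry.Timestamp, entry.Content)
		}
	}
}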

View File

@@ -50,7 +50,7 @@ service WorkflowService {
};
}
rpc GetWorkflowExecutionLogs (GetWorkflowExecutionLogsRequest) returns (stream LogEntry) {
rpc GetWorkflowExecutionLogs (GetWorkflowExecutionLogsRequest) returns (stream LogStreamResponse) {
option (google.api.http) = {
get: "/apis/v1beta1/{namespace}/workflow_executions/{uid}/pods/{podName}/containers/{containerName}/logs"
};
@@ -205,6 +205,10 @@ message ListWorkflowExecutionsResponse {
int32 totalAvailableCount = 6;
}
message LogStreamResponse {
repeated LogEntry logEntries = 1;
}
message LogEntry {
string timestamp = 1;
string content = 2;

View File

@@ -21,7 +21,6 @@ func Up20201223062947(tx *sql.Tx) error {
tensorflowWorkflowTemplateName,
map[string]string{
"created-by": "system",
"used-by": "cvat",
},
)
}

View File

@@ -0,0 +1,30 @@
package migration
import (
"database/sql"
"github.com/pressly/goose"
"path/filepath"
)
func initialize20210107094725() {
if _, ok := initializedMigrations[20210107094725]; !ok {
goose.AddMigration(Up20210107094725, Down20210107094725)
initializedMigrations[20210107094725] = true
}
}
//Up20210107094725 updates CVAT to latest image
func Up20210107094725(tx *sql.Tx) error {
// This code is executed when the migration is applied.
return updateWorkspaceTemplateManifest(
filepath.Join("workspaces", "cvat", "20210107094725.yaml"),
cvatTemplateName)
}
//Down20210107094725 reverts to previous CVAT image
func Down20210107094725(tx *sql.Tx) error {
// This code is executed when the migration is rolled back.
return updateWorkspaceTemplateManifest(
filepath.Join("workspaces", "cvat", "20201211161117"),
cvatTemplateName)
}

View File

@@ -85,6 +85,7 @@ func Initialize() {
initialize20201223202929()
initialize20201225172926()
initialize20201229205644()
initialize20210107094725()
if err := client.DB.Close(); err != nil {
log.Printf("[error] closing db %v", err)

View File

@@ -100,8 +100,6 @@ templates:
- name: output
path: /mnt/output
optional: true
archive:
none: {}
container:
image: onepanel/dl:0.17.0
args:

View File

@@ -24,8 +24,6 @@ templates:
- name: output
path: /mnt/output
optional: true
archive:
none: {}
script:
image: onepanel/dl:0.17.0
command:

View File

@@ -24,8 +24,6 @@ templates:
- name: output
path: /mnt/output
optional: true
archive:
none: {}
script:
image: onepanel/dl:0.17.0
command:

View File

@@ -101,7 +101,7 @@ templates:
cd /mnt/src/tf/research/ && \
/mnt/src/protoc/bin/protoc object_detection/protos/*.proto --python_out=. && \
cd /mnt/src/train/workflows/tf-object-detection-training && \
python train.py \
python main.py \
--extras="{{workflow.parameters.hyperparameters}}" \
--model="{{workflow.parameters.cvat-model}}" \
--num_classes="{{workflow.parameters.cvat-num-classes}}" \

View File

@@ -0,0 +1,163 @@
# Workspace arguments
arguments:
parameters:
- name: sync-directory
displayName: Directory to sync raw input and training output
value: workflow-data
hint: Location (relative to current namespace) to sync raw input, models and checkpoints from default object storage to '/share'.
containers:
- name: cvat-db
image: postgres:10-alpine
env:
- name: POSTGRES_USER
value: root
- name: POSTGRES_DB
value: cvat
- name: POSTGRES_HOST_AUTH_METHOD
value: trust
- name: PGDATA
value: /var/lib/psql/data
ports:
- containerPort: 5432
name: tcp
volumeMounts:
- name: db
mountPath: /var/lib/psql
- name: cvat-redis
image: redis:4.0-alpine
ports:
- containerPort: 6379
name: tcp
- name: cvat
image: onepanel/cvat:0.17.0_cvat.1.0.0
env:
- name: DJANGO_MODWSGI_EXTRA_ARGS
value: ""
- name: ALLOWED_HOSTS
value: '*'
- name: CVAT_REDIS_HOST
value: localhost
- name: CVAT_POSTGRES_HOST
value: localhost
- name: CVAT_SHARE_URL
value: /cvat/data
- name: CVAT_SHARE_DIR
value: /share
- name: CVAT_DATA_DIR
value: /cvat/data
- name: CVAT_MEDIA_DATA_DIR
value: /cvat/data/data
- name: CVAT_KEYS_DIR
value: /cvat/data/keys
- name: CVAT_MODELS_DIR
value: /cvat/data/models
- name: CVAT_LOGS_DIR
value: /cvat/logs
- name: ONEPANEL_SYNC_DIRECTORY
value: '{{workspace.parameters.sync-directory}}'
- name: NVIDIA_VISIBLE_DEVICES
value: all
- name: NVIDIA_DRIVER_CAPABILITIES
value: compute,utility
- name: NVIDIA_REQUIRE_CUDA
value: "cuda>=10.0 brand=tesla,driver>=384,driver<385 brand=tesla,driver>=410,driver<411"
- name: ONEPANEL_MAIN_CONTAINER
value: 'true'
ports:
- containerPort: 8080
name: http
volumeMounts:
- name: cvat-data
mountPath: /cvat
- name: share
mountPath: /share
- name: sys-namespace-config
mountPath: /etc/onepanel
readOnly: true
- name: cvat-ui
image: onepanel/cvat-ui:0.17.0_cvat.1.0.0
ports:
- containerPort: 80
name: http
# You can add multiple FileSyncer sidecar containers if needed
- name: filesyncer
image: onepanel/filesyncer:0.17.0
imagePullPolicy: Always
args:
- download
- -server-prefix=/sys/filesyncer
env:
- name: FS_PATH
value: /mnt/share
- name: FS_PREFIX
value: '{{workflow.namespace}}/{{workspace.parameters.sync-directory}}'
volumeMounts:
- name: share
mountPath: /mnt/share
- name: sys-namespace-config
mountPath: /etc/onepanel
readOnly: true
ports:
- name: cvat-ui
port: 80
protocol: TCP
targetPort: 80
- name: cvat
port: 8080
protocol: TCP
targetPort: 8080
- name: fs
port: 8888
protocol: TCP
targetPort: 8888
routes:
- match:
- uri:
prefix: /sys/filesyncer
route:
- destination:
port:
number: 8888
- match:
- uri:
regex: /api/.*|/git/.*|/tensorflow/.*|/onepanelio/.*|/tracking/.*|/auto_annotation/.*|/analytics/.*|/static/.*|/admin/.*|/documentation/.*|/dextr/.*|/reid/.*
- queryParams:
id:
regex: \d+.*
route:
- destination:
port:
number: 8080
- match:
- uri:
prefix: /
route:
- destination:
port:
number: 80
volumeClaimTemplates:
- metadata:
name: db
spec:
accessModes: ["ReadWriteOnce"]
resources:
requests:
storage: 20Gi
# DAG Workflow to be executed once a Workspace action completes (optional)
# Uncomment the lines below if you want to send Slack notifications
#postExecutionWorkflow:
# entrypoint: main
# templates:
# - name: main
# dag:
# tasks:
# - name: slack-notify
# template: slack-notify
# - name: slack-notify
# container:
# image: technosophos/slack-notify
# args:
# - SLACK_USERNAME=onepanel SLACK_TITLE="Your workspace is ready" SLACK_ICON=https://www.gravatar.com/avatar/5c4478592fe00878f62f0027be59c1bd SLACK_MESSAGE="Your workspace is now running" ./slack-notify
# command:
# - sh
# - -c

View File

@@ -4,6 +4,7 @@ import (
"database/sql/driver"
"encoding/json"
"errors"
"strings"
"time"
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
@@ -61,6 +62,38 @@ type LogEntry struct {
Content string
}
// IsEmpty returns true if the content for the log entry is just an empty string
func (l *LogEntry) IsEmpty() bool {
return l.Content == ""
}
// LogEntryFromLine creates a LogEntry given a line of text
// it tries to parse out a timestamp and content
func LogEntryFromLine(line *string) *LogEntry {
if line == nil {
return nil
}
if *line == "" {
return &LogEntry{Content: ""}
}
parts := strings.Split(*line, " ")
if len(parts) == 0 {
return nil
}
timestamp, err := time.Parse(time.RFC3339, parts[0])
if err != nil {
return &LogEntry{Content: *line}
}
return &LogEntry{
Timestamp: timestamp,
Content: strings.Join(parts[1:], " "),
}
}
type Metric struct {
Name string
Value float64
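
To make the intent of LogEntryFromLine above concrete, a small hypothetical test in the same package (not part of the change): lines that begin with an RFC3339 timestamp are split into Timestamp and Content, everything else is kept verbatim.

package v1

import (
	"testing"
)

func TestLogEntryFromLineSketch(t *testing.T) {
	withTime := "2020-12-31T23:23:13Z starting training epoch 1"
	plain := "no timestamp on this line"

	// Line with a leading RFC3339 timestamp: timestamp is parsed out.
	e1 := LogEntryFromLine(&withTime)
	if e1.Timestamp.IsZero() || e1.Content != "starting training epoch 1" {
		t.Errorf("expected timestamp plus remaining content, got %v %q", e1.Timestamp, e1.Content)
	}

	// Line without a timestamp: whole line is kept as content.
	e2 := LogEntryFromLine(&plain)
	if !e2.Timestamp.IsZero() || e2.Content != plain {
		t.Errorf("expected zero timestamp and the whole line as content, got %v %q", e2.Timestamp, e2.Content)
	}
}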

View File

@@ -23,6 +23,7 @@ import (
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/watch"
"net/http"
"regexp"
yaml2 "sigs.k8s.io/yaml"
"strconv"
"strings"
@@ -423,7 +424,6 @@ func (c *Client) createWorkflow(namespace string, workflowTemplateID uint64, wor
if opts == nil {
opts = &WorkflowExecutionOptions{}
}
if opts.Name != "" {
wf.ObjectMeta.Name = opts.Name
}
@@ -460,6 +460,22 @@ func (c *Client) createWorkflow(namespace string, workflowTemplateID uint64, wor
wf.ObjectMeta.Labels = opts.Labels
}
newParameters := make([]wfv1.Parameter, 0)
for i := range wf.Spec.Arguments.Parameters {
param := wf.Spec.Arguments.Parameters[i]
if param.Value != nil {
re, reErr := regexp.Compile(`{{\s*workflow.namespace\s*}}|{{\s*workspace.namespace\s*}}`)
if reErr != nil {
return nil, reErr
}
value := re.ReplaceAllString(*param.Value, namespace)
param.Value = &value
}
newParameters = append(newParameters, param)
}
wf.Spec.Arguments.Parameters = newParameters
if err = injectWorkflowExecutionStatusCaller(wf, wfv1.NodeRunning); err != nil {
return nil, err
}
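
As an aside on the hunk above (from 302731e23a), a standalone sketch of the placeholder substitution it performs on parameter values; the sample values are made up.

package main

import (
	"fmt"
	"regexp"
)

func main() {
	// Same pattern as in createWorkflow: both spellings, with optional whitespace.
	re := regexp.MustCompile(`{{\s*workflow.namespace\s*}}|{{\s*workspace.namespace\s*}}`)
	namespace := "alice" // the runtime namespace of the execution

	fmt.Println(re.ReplaceAllString("{{workflow.namespace}}/workflow-data", namespace))
	// alice/workflow-data
	fmt.Println(re.ReplaceAllString("s3://bucket/{{ workspace.namespace }}/output", namespace))
	// s3://bucket/alice/output
}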
@@ -1231,7 +1247,7 @@ func (c *Client) WatchWorkflowExecution(namespace, uid string) (<-chan *Workflow
return workflowWatcher, nil
}
func (c *Client) GetWorkflowExecutionLogs(namespace, uid, podName, containerName string) (<-chan *LogEntry, error) {
func (c *Client) GetWorkflowExecutionLogs(namespace, uid, podName, containerName string) (<-chan []*LogEntry, error) {
wf, err := c.ArgoprojV1alpha1().Workflows(namespace).Get(uid, metav1.GetOptions{})
if err != nil {
log.WithFields(log.Fields{
@@ -1337,43 +1353,58 @@ func (c *Client) GetWorkflowExecutionLogs(namespace, uid, podName, containerName
return nil, util.NewUserError(codes.NotFound, "Log not found.")
}
logWatcher := make(chan *LogEntry)
logWatcher := make(chan []*LogEntry)
go func() {
buffer := make([]byte, 4096)
reader := bufio.NewReader(stream)
newLine := true
lastChunkSent := -1
lastLine := ""
for {
bytesRead, err := reader.Read(buffer)
if err != nil && err.Error() != "EOF" {
break
}
content := string(buffer[:bytesRead])
content := lastLine + string(buffer[:bytesRead])
lastLine = ""
if newLine {
parts := strings.Split(content, " ")
if len(parts) == 0 {
logWatcher <- &LogEntry{Content: content}
} else {
timestamp, err := time.Parse(time.RFC3339, parts[0])
if err != nil {
logWatcher <- &LogEntry{Content: content}
} else {
logWatcher <- &LogEntry{
Timestamp: timestamp,
Content: strings.Join(parts[1:], " "),
}
}
chunk := make([]*LogEntry, 0)
lines := strings.Split(content, "\n")
for lineIndex, line := range lines {
if lineIndex == len(lines)-1 {
lastLine = line
continue
}
} else {
logWatcher <- &LogEntry{Content: content}
newLogEntry := LogEntryFromLine(&line)
if newLogEntry == nil {
continue
}
chunk = append(chunk, newLogEntry)
}
if lastChunkSent == 0 && lastLine != "" {
newLogEntry := LogEntryFromLine(&lastLine)
if newLogEntry != nil {
chunk = append(chunk, newLogEntry)
lastLine = ""
}
}
if len(chunk) > 0 {
logWatcher <- chunk
}
lastChunkSent = len(chunk)
if err != nil && err.Error() == "EOF" {
break
}
}
newLine = strings.Contains(content, "\n")
newLogEntry := LogEntryFromLine(&lastLine)
if newLogEntry != nil {
logWatcher <- []*LogEntry{newLogEntry}
}
close(logWatcher)
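
A minimal sketch of draining the reworked channel, which now delivers chunks ([]*LogEntry) instead of single entries; the argument values and import path are assumptions.

package sketch

import (
	"fmt"
	"log"

	v1 "github.com/onepanelio/core/pkg" // assumed import path for package v1
)

func printLogs(c *v1.Client) {
	logWatcher, err := c.GetWorkflowExecutionLogs("default", "workflow-uid", "pod-name", "main")
	if err != nil {
		log.Fatal(err)
	}
	for chunk := range logWatcher { // each receive is a []*v1.LogEntry
		for _, entry := range chunk {
			fmt.Println(entry.Timestamp, entry.Content)
		}
	}
	// The channel is closed by the reader goroutine once the pod stream ends.
}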

View File

@@ -472,6 +472,20 @@ func (c *Client) listWorkflowTemplateVersions(namespace, uid string) (workflowTe
}
for _, version := range dbVersions {
if version.ParametersBytes != nil {
_, err := version.LoadParametersFromBytes()
if err != nil {
return nil, err
}
updatedParameters, err := c.replaceSysNodePoolOptions(version.Parameters)
if err != nil {
return nil, err
}
version.Parameters = updatedParameters
}
newItem := WorkflowTemplate{
ID: version.WorkflowTemplate.ID,
CreatedAt: version.CreatedAt.UTC(),
@@ -482,6 +496,7 @@ func (c *Client) listWorkflowTemplateVersions(namespace, uid string) (workflowTe
IsLatest: version.IsLatest,
IsArchived: version.WorkflowTemplate.IsArchived,
Labels: version.Labels,
Parameters: version.Parameters,
}
workflowTemplateVersions = append(workflowTemplateVersions, &newItem)

View File

@@ -1,6 +1,7 @@
package v1
import (
"encoding/json"
"github.com/onepanelio/core/pkg/util/sql"
"github.com/onepanelio/core/pkg/util/types"
"time"
@@ -38,6 +39,26 @@ func WorkflowTemplateVersionsToIDs(resources []*WorkflowTemplateVersion) (ids []
return
}
// LoadParametersFromBytes loads Parameters from the WorkflowTemplateVersion's ParameterBytes field.
func (wtv *WorkflowTemplateVersion) LoadParametersFromBytes() ([]Parameter, error) {
loadedParameters := make([]Parameter, 0)
err := json.Unmarshal(wtv.ParametersBytes, &loadedParameters)
if err != nil {
return wtv.Parameters, err
}
// It might be nil because the value "null" is stored in db if there are no parameters.
// for consistency, we return an empty array.
if loadedParameters == nil {
loadedParameters = make([]Parameter, 0)
}
wtv.Parameters = loadedParameters
return wtv.Parameters, err
}
// getWorkflowTemplateVersionColumns returns all of the columns for workflow template versions modified by alias, destination.
// see formatColumnSelect
func getWorkflowTemplateVersionColumns(aliasAndDestination ...string) []string {
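
The nil check in LoadParametersFromBytes above exists because encoding/json leaves a slice nil when the stored value is the literal "null"; a small sketch of that behavior (hypothetical test, same package):

package v1

import (
	"encoding/json"
	"testing"
)

func TestNullParametersSketch(t *testing.T) {
	var params []Parameter
	if err := json.Unmarshal([]byte(`null`), &params); err != nil {
		t.Fatal(err)
	}
	if params != nil {
		t.Error("JSON null should leave the slice nil")
	}
	// LoadParametersFromBytes therefore swaps in make([]Parameter, 0)
	// so callers always see an empty array rather than null.
}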

View File

@@ -429,7 +429,8 @@ func createStatefulSetManifest(spec *WorkspaceSpec) (statefulSetManifest string,
NodeSelector: map[string]string{
"{{workflow.parameters.sys-node-pool-label}}": "{{workflow.parameters.sys-node-pool}}",
},
Containers: spec.Containers,
ImagePullSecrets: spec.ImagePullSecrets,
Containers: spec.Containers,
Volumes: []corev1.Volume{
{
Name: "sys-dshm",
@@ -783,10 +784,11 @@ metadata:
}
workflowTemplateSpec := map[string]interface{}{
"arguments": spec.Arguments,
"entrypoint": "workspace",
"onExit": "handleExit",
"templates": templates,
"arguments": spec.Arguments,
"entrypoint": "workspace",
"onExit": "handleExit",
"templates": templates,
"imagePullSecrets": spec.ImagePullSecrets,
}
workflowTemplateSpecManifestBytes, err := yaml.Marshal(workflowTemplateSpec)

View File

@@ -57,6 +57,7 @@ type Workspace struct {
type WorkspaceSpec struct {
Arguments *Arguments `json:"arguments" protobuf:"bytes,1,opt,name=arguments"`
Containers []corev1.Container `json:"containers" protobuf:"bytes,3,opt,name=containers"`
ImagePullSecrets []corev1.LocalObjectReference `json:"imagePullSecrets"`
Ports []corev1.ServicePort `json:"ports" protobuf:"bytes,4,opt,name=ports"`
Routes []*networking.HTTPRoute `json:"routes" protobuf:"bytes,5,opt,name=routes"`
VolumeClaimTemplates []corev1.PersistentVolumeClaim `json:"volumeClaimTemplates" protobuf:"bytes,6,opt,name=volumeClaimTemplates"`
@@ -131,7 +132,7 @@ func getWorkspaceColumnsMap(camelCase bool) map[string]string {
"labels": "labels",
"name": "name",
"uid": "uid",
"namespace": "namespace",
"namespace": "namespace",
"phase": "phase",
"parameters": "parameters",
}
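
A hedged sketch of filling the new field when building a workspace spec in Go; the registry, image, and secret names are hypothetical, and the referenced secret must already exist in the namespace.

package sketch

import (
	corev1 "k8s.io/api/core/v1"

	v1 "github.com/onepanelio/core/pkg" // assumed import path for package v1
)

func privateRegistryWorkspaceSpec() v1.WorkspaceSpec {
	return v1.WorkspaceSpec{
		Containers: []corev1.Container{
			{Name: "main", Image: "registry.example.com/team/workspace:latest"},
		},
		// Passed through to the StatefulSet pod spec and the workflow template spec.
		ImagePullSecrets: []corev1.LocalObjectReference{
			{Name: "my-registry-credentials"},
		},
	}
}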

View File

@@ -242,16 +242,26 @@ func (s *WorkflowServer) GetWorkflowExecutionLogs(req *api.GetWorkflowExecutionL
return err
}
le := &v1.LogEntry{}
le := make([]*v1.LogEntry, 0)
for {
le = <-watcher
if le == nil {
break
}
if err := stream.Send(&api.LogEntry{
Timestamp: le.Timestamp.String(),
Content: le.Content,
apiLogEntries := make([]*api.LogEntry, len(le))
for i, item := range le {
apiLogEntries[i] = &api.LogEntry{
Content: item.Content,
}
if item.Timestamp.After(time.Time{}) {
apiLogEntries[i].Timestamp = item.Timestamp.Format(time.RFC3339)
}
}
if err := stream.Send(&api.LogStreamResponse{
LogEntries: apiLogEntries,
}); err != nil {
return err
}
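
The item.Timestamp guard above ties back to the earlier commit note about timestamps for time 0: a zero time.Time would otherwise be formatted and sent to clients. A minimal illustration:

package main

import (
	"fmt"
	"time"
)

func main() {
	var zero time.Time
	// Formatting an unparsed (zero) timestamp would send this to clients:
	fmt.Println(zero.Format(time.RFC3339)) // 0001-01-01T00:00:00Z
	// The After(time.Time{}) check is false for the zero value,
	// so the Timestamp field is left empty instead.
	fmt.Println(zero.After(time.Time{})) // false
}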