From 080624d9e283982d68e727a5e458ce285afd88f8 Mon Sep 17 00:00:00 2001 From: Andrey Melnikov Date: Thu, 31 Dec 2020 11:07:39 -0800 Subject: [PATCH] fix: issues with finished logs. Content was repeating, resulting in a large data load - making the stream fail. No timestamps were being returned as timestamps for time 0. Occasionally lines would cut off across multiple lines. --- pkg/workflow_execution.go | 35 ++++++++++++++++++++++++++++++++--- server/workflow_server.go | 9 ++++++--- 2 files changed, 38 insertions(+), 6 deletions(-) diff --git a/pkg/workflow_execution.go b/pkg/workflow_execution.go index 289ad02..1d8f546 100644 --- a/pkg/workflow_execution.go +++ b/pkg/workflow_execution.go @@ -1342,16 +1342,22 @@ func (c *Client) GetWorkflowExecutionLogs(namespace, uid, podName, containerName buffer := make([]byte, 4096) reader := bufio.NewReader(stream) + lastLine := "" for { bytesRead, err := reader.Read(buffer) if err != nil && err.Error() != "EOF" { break } - content := string(buffer[:bytesRead]) + content := lastLine + string(buffer[:bytesRead]) + lastLine = "" chunk := make([]*LogEntry, 0) lines := strings.Split(content, "\n") - for _, line := range lines { + for lineIndex, line := range lines { + if lineIndex == len(lines)-1 { + lastLine = line + continue + } if line == "" { continue } @@ -1361,7 +1367,7 @@ func (c *Client) GetWorkflowExecutionLogs(namespace, uid, podName, containerName } timestamp, err := time.Parse(time.RFC3339, parts[0]) if err != nil { - chunk = append(chunk, &LogEntry{Content: content}) + chunk = append(chunk, &LogEntry{Content: line}) } else { chunk = append(chunk, &LogEntry{ Timestamp: timestamp, @@ -1377,6 +1383,29 @@ func (c *Client) GetWorkflowExecutionLogs(namespace, uid, podName, containerName } } + if lastLine != "" { + logWatcher <- []*LogEntry{ + { + Content: lastLine, + }, + } + + parts := strings.Split(lastLine, " ") + if len(parts) != 0 { + timestamp, err := time.Parse(time.RFC3339, parts[0]) + if err != nil { + logWatcher <- []*LogEntry{{Content: lastLine}} + } else { + logWatcher <- []*LogEntry{ + { + Timestamp: timestamp, + Content: strings.Join(parts[1:], " "), + }, + } + } + } + } + close(logWatcher) }() diff --git a/server/workflow_server.go b/server/workflow_server.go index b1b4a2a..62d4ff0 100644 --- a/server/workflow_server.go +++ b/server/workflow_server.go @@ -242,7 +242,7 @@ func (s *WorkflowServer) GetWorkflowExecutionLogs(req *api.GetWorkflowExecutionL return err } - le := []*v1.LogEntry{} + le := make([]*v1.LogEntry, 0) for { le = <-watcher if le == nil { @@ -252,8 +252,11 @@ func (s *WorkflowServer) GetWorkflowExecutionLogs(req *api.GetWorkflowExecutionL apiLogEntries := make([]*api.LogEntry, len(le)) for i, item := range le { apiLogEntries[i] = &api.LogEntry{ - Timestamp: item.Timestamp.Format(time.RFC3339), - Content: item.Content, + Content: item.Content, + } + + if item.Timestamp.After(time.Time{}) { + apiLogEntries[i].Timestamp = item.Timestamp.Format(time.RFC3339) } }