fix: issues with finished logs. Content was repeating, resulting in a large data load - making the stream fail.

No timestamps were being returned as timestamps for time 0.
Occasionally lines would cut off across multiple lines.
This commit is contained in:
Andrey Melnikov
2020-12-31 11:07:39 -08:00
parent 69c523ee23
commit 080624d9e2
2 changed files with 38 additions and 6 deletions

View File

@@ -1342,16 +1342,22 @@ func (c *Client) GetWorkflowExecutionLogs(namespace, uid, podName, containerName
buffer := make([]byte, 4096) buffer := make([]byte, 4096)
reader := bufio.NewReader(stream) reader := bufio.NewReader(stream)
lastLine := ""
for { for {
bytesRead, err := reader.Read(buffer) bytesRead, err := reader.Read(buffer)
if err != nil && err.Error() != "EOF" { if err != nil && err.Error() != "EOF" {
break break
} }
content := string(buffer[:bytesRead]) content := lastLine + string(buffer[:bytesRead])
lastLine = ""
chunk := make([]*LogEntry, 0) chunk := make([]*LogEntry, 0)
lines := strings.Split(content, "\n") lines := strings.Split(content, "\n")
for _, line := range lines { for lineIndex, line := range lines {
if lineIndex == len(lines)-1 {
lastLine = line
continue
}
if line == "" { if line == "" {
continue continue
} }
@@ -1361,7 +1367,7 @@ func (c *Client) GetWorkflowExecutionLogs(namespace, uid, podName, containerName
} }
timestamp, err := time.Parse(time.RFC3339, parts[0]) timestamp, err := time.Parse(time.RFC3339, parts[0])
if err != nil { if err != nil {
chunk = append(chunk, &LogEntry{Content: content}) chunk = append(chunk, &LogEntry{Content: line})
} else { } else {
chunk = append(chunk, &LogEntry{ chunk = append(chunk, &LogEntry{
Timestamp: timestamp, Timestamp: timestamp,
@@ -1377,6 +1383,29 @@ func (c *Client) GetWorkflowExecutionLogs(namespace, uid, podName, containerName
} }
} }
if lastLine != "" {
logWatcher <- []*LogEntry{
{
Content: lastLine,
},
}
parts := strings.Split(lastLine, " ")
if len(parts) != 0 {
timestamp, err := time.Parse(time.RFC3339, parts[0])
if err != nil {
logWatcher <- []*LogEntry{{Content: lastLine}}
} else {
logWatcher <- []*LogEntry{
{
Timestamp: timestamp,
Content: strings.Join(parts[1:], " "),
},
}
}
}
}
close(logWatcher) close(logWatcher)
}() }()

View File

@@ -242,7 +242,7 @@ func (s *WorkflowServer) GetWorkflowExecutionLogs(req *api.GetWorkflowExecutionL
return err return err
} }
le := []*v1.LogEntry{} le := make([]*v1.LogEntry, 0)
for { for {
le = <-watcher le = <-watcher
if le == nil { if le == nil {
@@ -252,8 +252,11 @@ func (s *WorkflowServer) GetWorkflowExecutionLogs(req *api.GetWorkflowExecutionL
apiLogEntries := make([]*api.LogEntry, len(le)) apiLogEntries := make([]*api.LogEntry, len(le))
for i, item := range le { for i, item := range le {
apiLogEntries[i] = &api.LogEntry{ apiLogEntries[i] = &api.LogEntry{
Timestamp: item.Timestamp.Format(time.RFC3339), Content: item.Content,
Content: item.Content, }
if item.Timestamp.After(time.Time{}) {
apiLogEntries[i].Timestamp = item.Timestamp.Format(time.RFC3339)
} }
} }