fix: logs would sometimes accumulate because the content contained no newline.

This commit is contained in:
Andrey Melnikov
2020-12-31 23:23:13 -08:00
parent 080624d9e2
commit a8f5cde75e
2 changed files with 53 additions and 35 deletions

View File

@@ -4,6 +4,7 @@ import (
"database/sql/driver" "database/sql/driver"
"encoding/json" "encoding/json"
"errors" "errors"
"strings"
"time" "time"
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
@@ -61,6 +62,38 @@ type LogEntry struct {
Content string Content string
} }
// IsEmpty returns true if the content for the log entry is just an empty string
func (l *LogEntry) IsEmpty() bool {
return l.Content == ""
}
// LogEntryFromLine creates a LogEntry given a line of text
// it tries to parse out a timestamp and content
func LogEntryFromLine(line *string) *LogEntry {
if line == nil {
return nil
}
if *line == "" {
return &LogEntry{Content: ""}
}
parts := strings.Split(*line, " ")
if len(parts) == 0 {
return nil
}
timestamp, err := time.Parse(time.RFC3339, parts[0])
if err != nil {
return &LogEntry{Content: *line}
} else {
return &LogEntry{
Timestamp: timestamp,
Content: strings.Join(parts[1:], " "),
}
}
}
type Metric struct { type Metric struct {
Name string Name string
Value float64 Value float64

View File

@@ -1342,6 +1342,7 @@ func (c *Client) GetWorkflowExecutionLogs(namespace, uid, podName, containerName
buffer := make([]byte, 4096) buffer := make([]byte, 4096)
reader := bufio.NewReader(stream) reader := bufio.NewReader(stream)
lastChunkSent := -1
lastLine := "" lastLine := ""
for { for {
bytesRead, err := reader.Read(buffer) bytesRead, err := reader.Read(buffer)
@@ -1358,52 +1359,36 @@ func (c *Client) GetWorkflowExecutionLogs(namespace, uid, podName, containerName
lastLine = line lastLine = line
continue continue
} }
if line == "" {
newLogEntry := LogEntryFromLine(&line)
if newLogEntry == nil {
continue continue
} }
parts := strings.Split(line, " ")
if len(parts) == 0 { chunk = append(chunk, newLogEntry)
continue }
}
timestamp, err := time.Parse(time.RFC3339, parts[0]) if lastChunkSent == 0 && lastLine != "" {
if err != nil { newLogEntry := LogEntryFromLine(&lastLine)
chunk = append(chunk, &LogEntry{Content: line}) if newLogEntry != nil {
} else { chunk = append(chunk, newLogEntry)
chunk = append(chunk, &LogEntry{ lastLine = ""
Timestamp: timestamp,
Content: strings.Join(parts[1:], " "),
})
} }
} }
logWatcher <- chunk if len(chunk) > 0 {
logWatcher <- chunk
}
lastChunkSent = len(chunk)
if err != nil && err.Error() == "EOF" { if err != nil && err.Error() == "EOF" {
break break
} }
} }
if lastLine != "" { newLogEntry := LogEntryFromLine(&lastLine)
logWatcher <- []*LogEntry{ if newLogEntry != nil {
{ logWatcher <- []*LogEntry{newLogEntry}
Content: lastLine,
},
}
parts := strings.Split(lastLine, " ")
if len(parts) != 0 {
timestamp, err := time.Parse(time.RFC3339, parts[0])
if err != nil {
logWatcher <- []*LogEntry{{Content: lastLine}}
} else {
logWatcher <- []*LogEntry{
{
Timestamp: timestamp,
Content: strings.Join(parts[1:], " "),
},
}
}
}
} }
close(logWatcher) close(logWatcher)