diff --git a/pkg/types.go b/pkg/types.go index 072d33d..8239076 100644 --- a/pkg/types.go +++ b/pkg/types.go @@ -4,6 +4,7 @@ import ( "database/sql/driver" "encoding/json" "errors" + "strings" "time" wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" @@ -61,6 +62,38 @@ type LogEntry struct { Content string } +// IsEmpty returns true if the content for the log entry is just an empty string +func (l *LogEntry) IsEmpty() bool { + return l.Content == "" +} + +// LogEntryFromLine creates a LogEntry given a line of text +// it tries to parse out a timestamp and content +func LogEntryFromLine(line *string) *LogEntry { + if line == nil { + return nil + } + + if *line == "" { + return &LogEntry{Content: ""} + } + + parts := strings.Split(*line, " ") + if len(parts) == 0 { + return nil + } + + timestamp, err := time.Parse(time.RFC3339, parts[0]) + if err != nil { + return &LogEntry{Content: *line} + } else { + return &LogEntry{ + Timestamp: timestamp, + Content: strings.Join(parts[1:], " "), + } + } +} + type Metric struct { Name string Value float64 diff --git a/pkg/workflow_execution.go b/pkg/workflow_execution.go index 1d8f546..9d826fa 100644 --- a/pkg/workflow_execution.go +++ b/pkg/workflow_execution.go @@ -1342,6 +1342,7 @@ func (c *Client) GetWorkflowExecutionLogs(namespace, uid, podName, containerName buffer := make([]byte, 4096) reader := bufio.NewReader(stream) + lastChunkSent := -1 lastLine := "" for { bytesRead, err := reader.Read(buffer) @@ -1358,52 +1359,36 @@ func (c *Client) GetWorkflowExecutionLogs(namespace, uid, podName, containerName lastLine = line continue } - if line == "" { + + newLogEntry := LogEntryFromLine(&line) + if newLogEntry == nil { continue } - parts := strings.Split(line, " ") - if len(parts) == 0 { - continue - } - timestamp, err := time.Parse(time.RFC3339, parts[0]) - if err != nil { - chunk = append(chunk, &LogEntry{Content: line}) - } else { - chunk = append(chunk, &LogEntry{ - Timestamp: timestamp, - Content: strings.Join(parts[1:], " "), - }) + + chunk = append(chunk, newLogEntry) + } + + if lastChunkSent == 0 && lastLine != "" { + newLogEntry := LogEntryFromLine(&lastLine) + if newLogEntry != nil { + chunk = append(chunk, newLogEntry) + lastLine = "" } } - logWatcher <- chunk + if len(chunk) > 0 { + logWatcher <- chunk + } + lastChunkSent = len(chunk) if err != nil && err.Error() == "EOF" { break } } - if lastLine != "" { - logWatcher <- []*LogEntry{ - { - Content: lastLine, - }, - } - - parts := strings.Split(lastLine, " ") - if len(parts) != 0 { - timestamp, err := time.Parse(time.RFC3339, parts[0]) - if err != nil { - logWatcher <- []*LogEntry{{Content: lastLine}} - } else { - logWatcher <- []*LogEntry{ - { - Timestamp: timestamp, - Content: strings.Join(parts[1:], " "), - }, - } - } - } + newLogEntry := LogEntryFromLine(&lastLine) + if newLogEntry != nil { + logWatcher <- []*LogEntry{newLogEntry} } close(logWatcher)