Fix session log loss if session closes before latest log file split

This commit is contained in:
Ingo Oppermann
2023-11-02 20:23:25 +01:00
parent 5b81e6e23f
commit 85204b1788
2 changed files with 122 additions and 2 deletions

View File

@@ -210,6 +210,8 @@ func (r *registry) sessionPersister(pattern *strftime.Strftime, bufferDuration t
ticker := time.NewTicker(bufferDuration) ticker := time.NewTicker(bufferDuration)
defer ticker.Stop() defer ticker.Stop()
splitTime := time.Time{}
loop: loop:
for { for {
select { select {
@@ -218,7 +220,7 @@ loop:
break loop break loop
} }
currentPath := pattern.FormatString(session.ClosedAt) currentPath := pattern.FormatString(session.ClosedAt)
if currentPath != path { if currentPath != path && session.ClosedAt.After(splitTime) {
if buffer.Len() > 0 { if buffer.Len() > 0 {
_, _, err := r.persist.fs.WriteFileSafe(path, buffer.Bytes()) _, _, err := r.persist.fs.WriteFileSafe(path, buffer.Bytes())
if err != nil { if err != nil {
@@ -231,6 +233,7 @@ loop:
"current": currentPath, "current": currentPath,
}).Log("Creating new session log file") }).Log("Creating new session log file")
path = currentPath path = currentPath
splitTime = session.ClosedAt
} }
enc.Encode(&session) enc.Encode(&session)
@@ -244,13 +247,14 @@ loop:
} }
} }
currentPath := pattern.FormatString(t) currentPath := pattern.FormatString(t)
if currentPath != path { if currentPath != path && t.After(splitTime) {
buffer.Reset() buffer.Reset()
r.logger.Info().WithFields(log.Fields{ r.logger.Info().WithFields(log.Fields{
"previous": path, "previous": path,
"current": currentPath, "current": currentPath,
}).Log("Creating new session log file") }).Log("Creating new session log file")
path = currentPath path = currentPath
splitTime = t
} }
} }
} }

View File

@@ -1,7 +1,9 @@
package session package session
import ( import (
"io"
"strconv" "strconv"
"strings"
"testing" "testing"
"time" "time"
@@ -325,6 +327,120 @@ func TestPersistSession(t *testing.T) {
require.Greater(t, info.Size(), int64(0)) require.Greater(t, info.Size(), int64(0))
} }
func TestPersistSessionDelayed(t *testing.T) {
memfs, err := fs.NewMemFilesystem(fs.MemConfig{})
require.NoError(t, err)
pattern := "/log/%Y-%m-%d-%H.log"
r, err := New(Config{
PersistFS: memfs,
LogPattern: pattern,
LogBufferDuration: 5 * time.Second,
})
require.NoError(t, err)
t.Cleanup(func() {
r.Close()
})
c, err := r.Register("foobar", CollectorConfig{
SessionTimeout: 3 * time.Second,
})
require.NoError(t, err)
ce, ok := c.(*collector)
require.True(t, ok)
start, err := time.Parse(time.RFC3339, "2023-11-01T15:04:00Z")
require.NoError(t, err)
ce.sessionsCh <- Session{
Collector: "foobar",
CreatedAt: start,
ClosedAt: start.Add(5 * time.Minute),
}
ce.sessionsCh <- Session{
Collector: "foobar",
CreatedAt: start,
ClosedAt: start.Add(6 * time.Minute),
}
ce.sessionsCh <- Session{
Collector: "foobar",
CreatedAt: start,
ClosedAt: start.Add(7 * time.Minute),
}
ce.sessionsCh <- Session{
Collector: "foobar",
CreatedAt: start,
ClosedAt: start.Add(1 * time.Hour),
}
// late entry
ce.sessionsCh <- Session{
Collector: "foobar",
CreatedAt: start,
ClosedAt: start.Add(8 * time.Minute),
}
ce.sessionsCh <- Session{
Collector: "foobar",
CreatedAt: start,
ClosedAt: start.Add(61 * time.Minute),
}
require.Eventually(t, func() bool {
path, err := strftime.Format(pattern, start)
if err != nil {
return false
}
_, err = memfs.Stat(path)
return err == nil
}, 10*time.Second, time.Second)
path, err := strftime.Format(pattern, start)
require.NoError(t, err)
file := memfs.Open(path)
require.NotNil(t, file)
data, err := io.ReadAll(file)
require.NoError(t, err)
file.Close()
lines := strings.Split(string(data), "\n")
require.Equal(t, 3, len(lines)-1)
require.Eventually(t, func() bool {
path, err := strftime.Format(pattern, start.Add(1*time.Hour))
if err != nil {
return false
}
_, err = memfs.Stat(path)
return err == nil
}, 10*time.Second, time.Second)
path, err = strftime.Format(pattern, start.Add(1*time.Hour))
require.NoError(t, err)
file = memfs.Open(path)
require.NotNil(t, file)
data, err = io.ReadAll(file)
require.NoError(t, err)
file.Close()
lines = strings.Split(string(data), "\n")
require.Equal(t, 3, len(lines)-1)
err = r.Unregister("foobar")
require.NoError(t, err)
}
func TestPersistSessionSlpit(t *testing.T) { func TestPersistSessionSlpit(t *testing.T) {
memfs, err := fs.NewMemFilesystem(fs.MemConfig{}) memfs, err := fs.NewMemFilesystem(fs.MemConfig{})
require.NoError(t, err) require.NoError(t, err)