From 85204b1788377ed01b5fd02e9bbd00ded43c861d Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Thu, 2 Nov 2023 20:23:25 +0100 Subject: [PATCH] Fix session log loss if session closes before latest log file split --- session/registry.go | 8 ++- session/registry_test.go | 116 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 122 insertions(+), 2 deletions(-) diff --git a/session/registry.go b/session/registry.go index 5ee74686..03353e07 100644 --- a/session/registry.go +++ b/session/registry.go @@ -210,6 +210,8 @@ func (r *registry) sessionPersister(pattern *strftime.Strftime, bufferDuration t ticker := time.NewTicker(bufferDuration) defer ticker.Stop() + splitTime := time.Time{} + loop: for { select { @@ -218,7 +220,7 @@ loop: break loop } currentPath := pattern.FormatString(session.ClosedAt) - if currentPath != path { + if currentPath != path && session.ClosedAt.After(splitTime) { if buffer.Len() > 0 { _, _, err := r.persist.fs.WriteFileSafe(path, buffer.Bytes()) if err != nil { @@ -231,6 +233,7 @@ loop: "current": currentPath, }).Log("Creating new session log file") path = currentPath + splitTime = session.ClosedAt } enc.Encode(&session) @@ -244,13 +247,14 @@ loop: } } currentPath := pattern.FormatString(t) - if currentPath != path { + if currentPath != path && t.After(splitTime) { buffer.Reset() r.logger.Info().WithFields(log.Fields{ "previous": path, "current": currentPath, }).Log("Creating new session log file") path = currentPath + splitTime = t } } } diff --git a/session/registry_test.go b/session/registry_test.go index fbe9a089..7b1d987d 100644 --- a/session/registry_test.go +++ b/session/registry_test.go @@ -1,7 +1,9 @@ package session import ( + "io" "strconv" + "strings" "testing" "time" @@ -325,6 +327,120 @@ func TestPersistSession(t *testing.T) { require.Greater(t, info.Size(), int64(0)) } +func TestPersistSessionDelayed(t *testing.T) { + memfs, err := fs.NewMemFilesystem(fs.MemConfig{}) + require.NoError(t, err) + + pattern := "/log/%Y-%m-%d-%H.log" + + r, err := New(Config{ + PersistFS: memfs, + LogPattern: pattern, + LogBufferDuration: 5 * time.Second, + }) + require.NoError(t, err) + t.Cleanup(func() { + r.Close() + }) + + c, err := r.Register("foobar", CollectorConfig{ + SessionTimeout: 3 * time.Second, + }) + require.NoError(t, err) + + ce, ok := c.(*collector) + require.True(t, ok) + + start, err := time.Parse(time.RFC3339, "2023-11-01T15:04:00Z") + require.NoError(t, err) + + ce.sessionsCh <- Session{ + Collector: "foobar", + CreatedAt: start, + ClosedAt: start.Add(5 * time.Minute), + } + + ce.sessionsCh <- Session{ + Collector: "foobar", + CreatedAt: start, + ClosedAt: start.Add(6 * time.Minute), + } + + ce.sessionsCh <- Session{ + Collector: "foobar", + CreatedAt: start, + ClosedAt: start.Add(7 * time.Minute), + } + + ce.sessionsCh <- Session{ + Collector: "foobar", + CreatedAt: start, + ClosedAt: start.Add(1 * time.Hour), + } + + // late entry + ce.sessionsCh <- Session{ + Collector: "foobar", + CreatedAt: start, + ClosedAt: start.Add(8 * time.Minute), + } + + ce.sessionsCh <- Session{ + Collector: "foobar", + CreatedAt: start, + ClosedAt: start.Add(61 * time.Minute), + } + + require.Eventually(t, func() bool { + path, err := strftime.Format(pattern, start) + if err != nil { + return false + } + _, err = memfs.Stat(path) + return err == nil + }, 10*time.Second, time.Second) + + path, err := strftime.Format(pattern, start) + require.NoError(t, err) + + file := memfs.Open(path) + require.NotNil(t, file) + + data, err := io.ReadAll(file) + require.NoError(t, err) + + file.Close() + + lines := strings.Split(string(data), "\n") + require.Equal(t, 3, len(lines)-1) + + require.Eventually(t, func() bool { + path, err := strftime.Format(pattern, start.Add(1*time.Hour)) + if err != nil { + return false + } + _, err = memfs.Stat(path) + return err == nil + }, 10*time.Second, time.Second) + + path, err = strftime.Format(pattern, start.Add(1*time.Hour)) + require.NoError(t, err) + + file = memfs.Open(path) + require.NotNil(t, file) + + data, err = io.ReadAll(file) + require.NoError(t, err) + + file.Close() + + lines = strings.Split(string(data), "\n") + require.Equal(t, 3, len(lines)-1) + + err = r.Unregister("foobar") + require.NoError(t, err) +} + func TestPersistSessionSlpit(t *testing.T) { memfs, err := fs.NewMemFilesystem(fs.MemConfig{}) require.NoError(t, err)