diff --git a/app/api/api.go b/app/api/api.go index f2f8d30e..cf7e7169 100644 --- a/app/api/api.go +++ b/app/api/api.go @@ -339,7 +339,10 @@ func (a *api) start() error { if cfg.Sessions.Enable { sessionConfig := session.Config{ - Logger: a.log.logger.core.WithComponent("Session"), + PersistInterval: time.Duration(cfg.Sessions.PersistInterval) * time.Second, + LogPattern: cfg.Sessions.SessionLogPathPattern, + LogBufferDuration: time.Duration(cfg.Sessions.SessionLogBuffer) * time.Second, + Logger: a.log.logger.core.WithComponent("Session"), } if cfg.Sessions.Persist { @@ -371,7 +374,6 @@ func (a *api) start() error { MaxSessions: cfg.Sessions.MaxSessions, InactiveTimeout: 5 * time.Second, SessionTimeout: time.Duration(cfg.Sessions.SessionTimeout) * time.Second, - PersistInterval: time.Duration(cfg.Sessions.PersistInterval) * time.Second, Limiter: iplimiter, } @@ -1823,6 +1825,7 @@ func (a *api) stop() { // Stop the session tracker if a.sessions != nil { a.sessions.UnregisterAll() + a.sessions.Close() a.sessions = nil } diff --git a/config/config.go b/config/config.go index a899276a..e2f72a40 100644 --- a/config/config.go +++ b/config/config.go @@ -260,10 +260,12 @@ func (d *Config) init() { d.vars.Register(value.NewInt64(&d.Metrics.Interval, 2), "metrics.interval_seconds", "CORE_METRICS_INTERVAL_SECONDS", nil, "Interval for collecting metrics", false, false) // Sessions - d.vars.Register(value.NewBool(&d.Sessions.Enable, true), "sessions.enable", "CORE_SESSIONS_ENABLE", nil, "Enable collecting HLS session stats for /memfs", false, false) + d.vars.Register(value.NewBool(&d.Sessions.Enable, true), "sessions.enable", "CORE_SESSIONS_ENABLE", nil, "Enable collecting session stats", false, false) d.vars.Register(value.NewCIDRList(&d.Sessions.IPIgnoreList, []string{"127.0.0.1/32", "::1/128"}, ","), "sessions.ip_ignorelist", "CORE_SESSIONS_IP_IGNORELIST", nil, "List of IP ranges in CIDR notation to ignore", false, false) d.vars.Register(value.NewInt(&d.Sessions.SessionTimeout, 30), "sessions.session_timeout_sec", "CORE_SESSIONS_SESSION_TIMEOUT_SEC", nil, "Timeout for an idle session", false, false) - d.vars.Register(value.NewBool(&d.Sessions.Persist, false), "sessions.persist", "CORE_SESSIONS_PERSIST", nil, "Whether to persist session history. Will be stored as sessions.json in db.dir", false, false) + d.vars.Register(value.NewStrftime(&d.Sessions.SessionLogPathPattern, ""), "sessions.session_log_path_pattern", "CORE_SESSIONS_LOG_PATH_PATTERN", nil, "Path to where the sessions will be logged, may contain strftime-patterns, leave empty for no session logging, persist must be enabled", false, false) + d.vars.Register(value.NewInt(&d.Sessions.SessionLogBuffer, 15), "sessions.session_log_buffer_sec", "CORE_SESSIONS_SESSION_LOG_BUFFER_SEC", nil, "Maximum duration to buffer session logs in memory before persisting on disk", false, false) + d.vars.Register(value.NewBool(&d.Sessions.Persist, false), "sessions.persist", "CORE_SESSIONS_PERSIST", nil, "Whether to persist session history. Will be stored in /sessions/[collector].json in db.dir", false, false) d.vars.Register(value.NewInt(&d.Sessions.PersistInterval, 300), "sessions.persist_interval_sec", "CORE_SESSIONS_PERSIST_INTERVAL_SEC", nil, "Interval in seconds in which to persist the current session history", false, false) d.vars.Register(value.NewUint64(&d.Sessions.MaxBitrate, 0), "sessions.max_bitrate_mbit", "CORE_SESSIONS_MAXBITRATE_MBIT", nil, "Max. allowed outgoing bitrate in mbit/s, 0 for unlimited", false, false) d.vars.Register(value.NewUint64(&d.Sessions.MaxSessions, 0), "sessions.max_sessions", "CORE_SESSIONS_MAX_SESSIONS", []string{"CORE_SESSIONS_MAXSESSIONS"}, "Max. allowed number of simultaneous sessions, 0 for unlimited", false, false) diff --git a/config/data.go b/config/data.go index e1d2393e..c1320372 100644 --- a/config/data.go +++ b/config/data.go @@ -150,13 +150,15 @@ type Data struct { Interval int64 `json:"interval_sec" format:"int64"` // seconds } `json:"metrics"` Sessions struct { - Enable bool `json:"enable"` - IPIgnoreList []string `json:"ip_ignorelist"` - SessionTimeout int `json:"session_timeout_sec" format:"int"` - Persist bool `json:"persist"` - PersistInterval int `json:"persist_interval_sec" format:"int"` - MaxBitrate uint64 `json:"max_bitrate_mbit" format:"uint64"` - MaxSessions uint64 `json:"max_sessions" format:"uint64"` + Enable bool `json:"enable"` + IPIgnoreList []string `json:"ip_ignorelist"` + Persist bool `json:"persist"` + PersistInterval int `json:"persist_interval_sec" format:"int"` + SessionTimeout int `json:"session_timeout_sec" format:"int"` + SessionLogPathPattern string `json:"session_log_path_pattern"` + SessionLogBuffer int `json:"session_log_buffer_sec" format:"int"` + MaxBitrate uint64 `json:"max_bitrate_mbit" format:"uint64"` + MaxSessions uint64 `json:"max_sessions" format:"uint64"` } `json:"sessions"` Service struct { Enable bool `json:"enable"` @@ -207,7 +209,6 @@ func MergeV2toV3(data *Data, d *v2.Data) (*Data, error) { data.SRT = d.SRT data.Playout = d.Playout data.Metrics = d.Metrics - data.Sessions = d.Sessions data.Service = d.Service data.Router = d.Router @@ -233,7 +234,13 @@ func MergeV2toV3(data *Data, d *v2.Data) (*Data, error) { data.FFmpeg.Log.MaxLines = d.FFmpeg.Log.MaxLines data.FFmpeg.Log.MaxHistory = d.FFmpeg.Log.MaxHistory + data.Sessions.Enable = d.Sessions.Enable data.Sessions.IPIgnoreList = copy.Slice(d.Sessions.IPIgnoreList) + data.Sessions.SessionTimeout = d.Sessions.SessionTimeout + data.Sessions.Persist = d.Sessions.Persist + data.Sessions.PersistInterval = d.Sessions.PersistInterval + data.Sessions.MaxBitrate = d.Sessions.MaxBitrate + data.Sessions.MaxSessions = d.Sessions.MaxSessions data.SRT.Log.Topics = copy.Slice(d.SRT.Log.Topics) @@ -295,7 +302,6 @@ func DowngradeV3toV2(d *Data) (*v2.Data, error) { data.SRT = d.SRT data.Playout = d.Playout data.Metrics = d.Metrics - data.Sessions = d.Sessions data.Service = d.Service data.Router = d.Router @@ -321,7 +327,13 @@ func DowngradeV3toV2(d *Data) (*v2.Data, error) { data.FFmpeg.Log.MaxLines = d.FFmpeg.Log.MaxLines data.FFmpeg.Log.MaxHistory = d.FFmpeg.Log.MaxHistory + data.Sessions.Enable = d.Sessions.Enable data.Sessions.IPIgnoreList = copy.Slice(d.Sessions.IPIgnoreList) + data.Sessions.SessionTimeout = d.Sessions.SessionTimeout + data.Sessions.Persist = d.Sessions.Persist + data.Sessions.PersistInterval = d.Sessions.PersistInterval + data.Sessions.MaxBitrate = d.Sessions.MaxBitrate + data.Sessions.MaxSessions = d.Sessions.MaxSessions data.SRT.Log.Topics = copy.Slice(d.SRT.Log.Topics) diff --git a/config/value/time.go b/config/value/time.go index 7fe3fa71..88eb627b 100644 --- a/config/value/time.go +++ b/config/value/time.go @@ -1,6 +1,10 @@ package value -import "time" +import ( + "time" + + "github.com/lestrrat-go/strftime" +) // time @@ -34,3 +38,31 @@ func (u *Time) IsEmpty() bool { v := time.Time(*u) return v.IsZero() } + +// strftime + +type Strftime string + +func NewStrftime(p *string, val string) *Strftime { + *p = val + + return (*Strftime)(p) +} + +func (s *Strftime) Set(val string) error { + *s = Strftime(val) + return nil +} + +func (s *Strftime) String() string { + return string(*s) +} + +func (s *Strftime) Validate() error { + _, err := strftime.New(string(*s)) + return err +} + +func (s *Strftime) IsEmpty() bool { + return len(string(*s)) == 0 +} diff --git a/config/value/time_test.go b/config/value/time_test.go index 3259d7d2..c6f6cf1a 100644 --- a/config/value/time_test.go +++ b/config/value/time_test.go @@ -28,3 +28,23 @@ func TestTimeValue(t *testing.T) { require.Equal(t, time.Time(time.Date(2009, time.November, 11, 23, 0, 0, 0, time.UTC)), x) } + +func TestStrftimeValue(t *testing.T) { + var x string + + val := NewStrftime(&x, "%Y-%m-%d.log") + + require.Equal(t, "%Y-%m-%d.log", val.String()) + require.Equal(t, nil, val.Validate()) + require.Equal(t, false, val.IsEmpty()) + + x = "%Y-%m-%d-%H:%M:%S.log" + + require.Equal(t, "%Y-%m-%d-%H:%M:%S.log", val.String()) + require.Equal(t, nil, val.Validate()) + require.Equal(t, false, val.IsEmpty()) + + val.Set("bla.log") + + require.Equal(t, "bla.log", x) +} diff --git a/go.mod b/go.mod index 6a132c80..a7d284a1 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/atrox/haikunatorgo/v2 v2.0.1 github.com/caddyserver/certmagic v0.18.0 github.com/casbin/casbin/v2 v2.71.1 - github.com/datarhei/core-client-go/v16 v16.11.1-0.20230614141756-a25a5fc3c60e + github.com/datarhei/core-client-go/v16 v16.11.1-0.20230620131644-140b3a61d4c8 github.com/datarhei/gosrt v0.5.2 github.com/datarhei/joy4 v0.0.0-20230505074825-fde05957445a github.com/fujiwara/shapeio v1.0.0 diff --git a/go.sum b/go.sum index 1548a01b..6829c35a 100644 --- a/go.sum +++ b/go.sum @@ -8,6 +8,8 @@ github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE= github.com/Masterminds/semver/v3 v3.2.1 h1:RN9w6+7QoMeJVGyfmbcgs28Br8cvmnucEXnY0rYXWg0= github.com/Masterminds/semver/v3 v3.2.1/go.mod h1:qvl/7zhW3nngYb5+80sSMF+FG2BjYrf8m9wsX0PNOMQ= +github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= +github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= github.com/adhocore/gronx v1.6.3 h1:bnm5vieTrY3QQPpsfB0hrAaeaHDpuZTUC2LLCVMLe9c= github.com/adhocore/gronx v1.6.3/go.mod h1:7oUY1WAU8rEJWmAxXR2DN0JaO4gi9khSgKjiRypqteg= github.com/agnivade/levenshtein v1.1.1 h1:QY8M92nrzkmr798gCo3kmMyqXFzdQVpxLlGPRBij0P8= @@ -48,6 +50,8 @@ github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46t github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/datarhei/core-client-go/v16 v16.11.1-0.20230614141756-a25a5fc3c60e h1:iQKqGTyIdCyO7kY/G5MCKhzt3xZ5YPRubbJskVp5EvQ= github.com/datarhei/core-client-go/v16 v16.11.1-0.20230614141756-a25a5fc3c60e/go.mod h1:6L0zr/NUwvaPsCTK/IL17m8JUEtgLp3BDtlsBREwacg= +github.com/datarhei/core-client-go/v16 v16.11.1-0.20230620131644-140b3a61d4c8 h1:CILOzUB7CJGHtZHOxMJn+dN6rKzH29TOOOOep0AnFWM= +github.com/datarhei/core-client-go/v16 v16.11.1-0.20230620131644-140b3a61d4c8/go.mod h1:6L0zr/NUwvaPsCTK/IL17m8JUEtgLp3BDtlsBREwacg= github.com/datarhei/gosrt v0.5.2 h1:eagqZwEIiGPNJW0rLep3gwceObyaZ17+iKRc+l4VEpc= github.com/datarhei/gosrt v0.5.2/go.mod h1:0308GQhAu5hxe2KYdbss901aKceSSKXnwCr8Vs++eiw= github.com/datarhei/joy4 v0.0.0-20230505074825-fde05957445a h1:Tf4DSHY1xruBglr+yYP5Wct7czM86GKMYgbXH8a7OFo= @@ -71,6 +75,7 @@ github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2 github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= +github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/go-openapi/jsonpointer v0.19.3/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg= @@ -154,11 +159,13 @@ github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= +github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= +github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/klauspost/compress v1.16.6 h1:91SKEy4K37vkp255cJ8QesJhjyRO0hn9i9G0GoUwLsk= github.com/klauspost/compress v1.16.6/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= @@ -228,6 +235,7 @@ github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3Rllmb github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= @@ -279,6 +287,7 @@ github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFt github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ= github.com/shoenig/test v0.6.4 h1:kVTaSd7WLz5WZ2IaoM0RSzRsUD+m8wRR+5qvntpn4LU= github.com/shoenig/test v0.6.4/go.mod h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k= +github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= diff --git a/session/collector.go b/session/collector.go index 5598007d..a3ac6de1 100644 --- a/session/collector.go +++ b/session/collector.go @@ -1,13 +1,12 @@ package session import ( - "context" "encoding/json" + "io" "sort" "sync" "time" - "github.com/datarhei/core/v16/io/fs" "github.com/datarhei/core/v16/log" "github.com/datarhei/core/v16/net" @@ -16,10 +15,11 @@ import ( // Session represents an active session type Session struct { + Collector string ID string Reference string CreatedAt time.Time - ClosesAt time.Time + ClosedAt time.Time Location string Peer string Extra map[string]interface{} @@ -167,6 +167,12 @@ type Collector interface { // Stop stops the collector to calculate rates Stop() + + // Snapshot returns the current snapshot of the history + Snapshot() (Snapshot, error) + + // Restore restores a previously made snapshot + Restore(snapshot io.ReadCloser) error } // CollectorConfig is the configuration for registering a new collector @@ -195,11 +201,6 @@ type CollectorConfig struct { // SessionTimeout is the duration of how long an idle active session is kept. A // session is idle if there are no ingress or egress bytes. SessionTimeout time.Duration - - // PersistInterval is the duration between persisting the - // history. Can be 0. Then the history will only be persisted - // at stopping the collector. - PersistInterval time.Duration } type totals struct { @@ -212,7 +213,7 @@ type totals struct { } type history struct { - Sessions map[string]totals `json:"sessions"` // key = `${session.location}:${session.peer}` + Sessions map[string]totals `json:"sessions"` // key = `${session.location}:${session.peer}:${session.reference}` } type collector struct { @@ -222,6 +223,7 @@ type collector struct { sessions map[string]*session sessionPool sync.Pool sessionsWG sync.WaitGroup + sessionsCh chan<- Session staleCallback func(*session) @@ -241,14 +243,6 @@ type collector struct { history history - persist struct { - enable bool - fs fs.Filesystem - path string - interval time.Duration - done context.CancelFunc - } - inactiveTimeout time.Duration sessionTimeout time.Duration @@ -259,12 +253,11 @@ type collector struct { lock struct { session sync.RWMutex history sync.RWMutex - persist sync.Mutex + run sync.Mutex companion sync.RWMutex } - startOnce sync.Once - stopOnce sync.Once + running bool } const ( @@ -285,20 +278,21 @@ func NewCollector(config CollectorConfig) Collector { return collector } -func newCollector(id string, persistFS fs.Filesystem, logger log.Logger, config CollectorConfig) (*collector, error) { +func newCollector(id string, sessionsCh chan<- Session, logger log.Logger, config CollectorConfig) (*collector, error) { c := &collector{ + id: id, + logger: logger, + sessionsCh: sessionsCh, maxRxBitrate: float64(config.MaxRxBitrate), maxTxBitrate: float64(config.MaxTxBitrate), maxSessions: config.MaxSessions, inactiveTimeout: config.InactiveTimeout, sessionTimeout: config.SessionTimeout, limiter: config.Limiter, - logger: logger, - id: id, } if c.logger == nil { - c.logger = log.New("Session") + c.logger = log.New("") } if c.limiter == nil { @@ -370,6 +364,25 @@ func newCollector(id string, persistFS fs.Filesystem, logger log.Logger, config c.lock.history.Unlock() + if c.sessionsCh != nil { + c.sessionsCh <- Session{ + Collector: c.id, + ID: sess.id, + Reference: sess.reference, + CreatedAt: sess.createdAt, + ClosedAt: sess.closedAt, + Location: sess.location, + Peer: sess.peer, + Extra: sess.extra, + RxBytes: sess.rxBytes, + RxBitrate: sess.RxBitrate(), + TopRxBitrate: sess.TopRxBitrate(), + TxBytes: sess.txBytes, + TxBitrate: sess.TxBitrate(), + TopTxBitrate: sess.TopTxBitrate(), + } + } + c.sessionPool.Put(sess) c.currentActiveSessions-- @@ -379,123 +392,107 @@ func newCollector(id string, persistFS fs.Filesystem, logger log.Logger, config c.history.Sessions = make(map[string]totals) - c.persist.enable = persistFS != nil - c.persist.fs = persistFS - c.persist.path = "/" + id + ".json" - c.persist.interval = config.PersistInterval - - c.loadHistory(c.persist.fs, c.persist.path, &c.history) - - c.stopOnce.Do(func() {}) - c.start() return c, nil } func (c *collector) start() { - c.startOnce.Do(func() { - if c.persist.enable && c.persist.interval != 0 { - ctx, cancel := context.WithCancel(context.Background()) - c.persist.done = cancel - go c.persister(ctx, c.persist.interval) - } + c.lock.run.Lock() + defer c.lock.run.Unlock() - c.rxBitrate, _ = average.New(averageWindow, averageGranularity) - c.txBitrate, _ = average.New(averageWindow, averageGranularity) + if c.running { + return + } - c.stopOnce = sync.Once{} - }) + c.running = true + + c.rxBitrate, _ = average.New(averageWindow, averageGranularity) + c.txBitrate, _ = average.New(averageWindow, averageGranularity) } func (c *collector) Stop() { - c.stopOnce.Do(func() { - if c.persist.enable && c.persist.interval != 0 { - c.persist.done() - } + c.lock.run.Lock() + defer c.lock.run.Unlock() - c.lock.session.RLock() - for _, sess := range c.sessions { - // Cancel all current sessions - sess.Cancel() - } - c.lock.session.RUnlock() - - // Wait for all current sessions to finish - c.sessionsWG.Wait() - - c.Persist() - - c.startOnce = sync.Once{} - }) -} - -func (c *collector) Persist() { - c.lock.history.RLock() - defer c.lock.history.RUnlock() - - c.saveHistory(c.persist.fs, c.persist.path, &c.history) -} - -func (c *collector) persister(ctx context.Context, interval time.Duration) { - ticker := time.NewTicker(interval) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - c.Persist() - } - } -} - -func (c *collector) loadHistory(fs fs.Filesystem, path string, data *history) { - if fs == nil { + if !c.running { return } - c.logger.WithComponent("SessionStore").WithFields(log.Fields{ - "base": fs.Metadata("base"), - "path": path, - }).Debug().Log("Loading history") + c.running = false - c.lock.persist.Lock() - defer c.lock.persist.Unlock() + c.lock.session.RLock() + for _, sess := range c.sessions { + // Cancel all current sessions + sess.Cancel() + } + c.lock.session.RUnlock() - jsondata, err := fs.ReadFile(path) + // Wait for all current sessions to finish + c.sessionsWG.Wait() +} + +type historySnapshot struct { + data []byte +} + +func (s *historySnapshot) Persist(sink SnapshotSink) error { + if _, err := sink.Write(s.data); err != nil { + sink.Cancel() + return err + } + + return sink.Close() +} + +func (s *historySnapshot) Release() { + s.data = nil +} + +func (c *collector) Snapshot() (Snapshot, error) { + c.logger.Debug().Log("Creating history snapshot") + + c.lock.history.Lock() + defer c.lock.history.Unlock() + + jsondata, err := json.MarshalIndent(&c.history, "", " ") if err != nil { - return + return nil, err } - if err = json.Unmarshal(jsondata, data); err != nil { - return + s := &historySnapshot{ + data: jsondata, } + + return s, nil } -func (c *collector) saveHistory(fs fs.Filesystem, path string, data *history) { - if fs == nil { - return +func (c *collector) Restore(snapshot io.ReadCloser) error { + if snapshot == nil { + return nil } - c.logger.WithComponent("SessionStore").WithFields(log.Fields{ - "base": fs.Metadata("base"), - "path": path, - }).Debug().Log("Storing history") + defer snapshot.Close() - c.lock.persist.Lock() - defer c.lock.persist.Unlock() + c.logger.Debug().Log("Restoring history snapshot") - jsondata, err := json.MarshalIndent(data, "", " ") + jsondata, err := io.ReadAll(snapshot) if err != nil { - return + return err } - _, _, err = fs.WriteFileSafe(path, jsondata) - if err != nil { - return + data := history{} + + if err = json.Unmarshal(jsondata, &data); err != nil { + return err } + + c.lock.history.Lock() + defer c.lock.history.Unlock() + + c.history = data + + return nil } func (c *collector) IsCollectableIP(ip string) bool { @@ -838,6 +835,7 @@ func (c *collector) Active() []Session { } session := Session{ + Collector: c.id, ID: sess.id, Reference: sess.reference, CreatedAt: sess.createdAt, @@ -953,3 +951,5 @@ func (n *nullCollector) CompanionEgressBitrate() float64 func (n *nullCollector) CompanionTopIngressBitrate() float64 { return 0.0 } func (n *nullCollector) CompanionTopEgressBitrate() float64 { return 0.0 } func (n *nullCollector) Stop() {} +func (n *nullCollector) Snapshot() (Snapshot, error) { return nil, nil } +func (n *nullCollector) Restore(snapshot io.ReadCloser) error { return snapshot.Close() } diff --git a/session/collector_test.go b/session/collector_test.go index 4e9a0d52..abff46b4 100644 --- a/session/collector_test.go +++ b/session/collector_test.go @@ -7,11 +7,15 @@ import ( "github.com/stretchr/testify/require" ) -func TestRegisterSession(t *testing.T) { - c, err := newCollector("", nil, nil, CollectorConfig{ - InactiveTimeout: time.Hour, - SessionTimeout: time.Hour, +func createCollector(inactive, session time.Duration) (*collector, error) { + return newCollector("", nil, nil, CollectorConfig{ + InactiveTimeout: inactive, + SessionTimeout: session, }) +} + +func TestRegisterSession(t *testing.T) { + c, err := createCollector(time.Hour, time.Hour) require.Equal(t, nil, err) b := c.IsKnownSession("foobar") @@ -31,10 +35,7 @@ func TestRegisterSession(t *testing.T) { } func TestInactiveSession(t *testing.T) { - c, err := newCollector("", nil, nil, CollectorConfig{ - InactiveTimeout: time.Second, - SessionTimeout: time.Hour, - }) + c, err := createCollector(time.Second, time.Hour) require.Equal(t, nil, err) b := c.IsKnownSession("foobar") @@ -52,10 +53,7 @@ func TestInactiveSession(t *testing.T) { } func TestActivateSession(t *testing.T) { - c, err := newCollector("", nil, nil, CollectorConfig{ - InactiveTimeout: time.Second, - SessionTimeout: time.Second, - }) + c, err := createCollector(time.Second, time.Second) require.Equal(t, nil, err) b := c.IsKnownSession("foobar") @@ -73,10 +71,7 @@ func TestActivateSession(t *testing.T) { } func TestIngress(t *testing.T) { - c, err := newCollector("", nil, nil, CollectorConfig{ - InactiveTimeout: time.Second, - SessionTimeout: time.Hour, - }) + c, err := createCollector(time.Second, time.Hour) require.Equal(t, nil, err) c.RegisterAndActivate("foobar", "", "", "") @@ -92,10 +87,7 @@ func TestIngress(t *testing.T) { } func TestEgress(t *testing.T) { - c, err := newCollector("", nil, nil, CollectorConfig{ - InactiveTimeout: time.Second, - SessionTimeout: time.Hour, - }) + c, err := createCollector(time.Second, time.Hour) require.Equal(t, nil, err) c.RegisterAndActivate("foobar", "", "", "") @@ -111,10 +103,7 @@ func TestEgress(t *testing.T) { } func TestNbSessions(t *testing.T) { - c, err := newCollector("", nil, nil, CollectorConfig{ - InactiveTimeout: time.Hour, - SessionTimeout: time.Hour, - }) + c, err := createCollector(time.Hour, time.Hour) require.Equal(t, nil, err) nsessions := c.Sessions() diff --git a/session/registry.go b/session/registry.go index 405f010a..bf7ad014 100644 --- a/session/registry.go +++ b/session/registry.go @@ -1,12 +1,17 @@ package session import ( + "bytes" + "context" + "encoding/json" "fmt" "regexp" "sync" + "time" "github.com/datarhei/core/v16/io/fs" "github.com/datarhei/core/v16/log" + "github.com/lestrrat-go/strftime" ) // Config is the configuration for creating a new registry @@ -15,6 +20,19 @@ type Config struct { // history will not be persisted. PersistFS fs.Filesystem + // PersistInterval is the duration between persisting the history. Can be 0. Then the history will + // only be persisted at stopping the collector. + PersistInterval time.Duration + + // SessionLogPattern is a path inside the PersistFS where the individual sessions will + // be logged. The path can contain strftime-plateholders in order to split the log files. + // If this string is empty or PersistFS is nil, the sessions will not be logged. + LogPattern string + + // SessionLogBufferDuration is the maximum duration session logs should be buffered before written + // to the filesystem. If not provided, the default of 15 seconds will be used. + LogBufferDuration time.Duration + // Logger is an instance of a logger. If it is nil, no logs // will be written. Logger log.Logger @@ -47,32 +65,212 @@ type Registry interface { UnregisterAll() RegistryReader + + Close() error } type registry struct { collector map[string]*collector - persistFS fs.Filesystem - logger log.Logger + + persist struct { + fs fs.Filesystem + interval time.Duration + cancel context.CancelFunc + sessionsCh chan Session + sessionsWg sync.WaitGroup + logPattern *strftime.Strftime + logBufferDuration time.Duration + lock sync.Mutex + } + + logger log.Logger lock sync.Mutex } // New returns a new registry for collectors that implement the Registry interface. The error -// is non-nil if the PersistDir from the config can't be created. -func New(conf Config) (Registry, error) { +// is non-nil if the registry can't be created. +func New(config Config) (Registry, error) { r := ®istry{ collector: make(map[string]*collector), - persistFS: conf.PersistFS, - logger: conf.Logger, + logger: config.Logger, } + r.persist.fs = config.PersistFS + r.persist.interval = config.PersistInterval + if r.logger == nil { r.logger = log.New("Session") } + pattern, err := strftime.New(config.LogPattern) + if err != nil { + return nil, err + } + + r.persist.logPattern = pattern + r.persist.logBufferDuration = config.LogBufferDuration + if r.persist.logBufferDuration <= 0 { + r.persist.logBufferDuration = 15 * time.Second + } + + r.startPersister() + return r, nil } +func (r *registry) Close() error { + r.UnregisterAll() + r.stopPersister() + + return nil +} + +func (r *registry) startPersister() { + if r.persist.fs == nil { + return + } + + r.persist.lock.Lock() + defer r.persist.lock.Unlock() + + if r.persist.interval > 0 { + if r.persist.cancel == nil { + ctx, cancel := context.WithCancel(context.Background()) + r.persist.cancel = cancel + + go r.historyPersister(ctx, r.persist.interval) + } + } + + if r.persist.logPattern != nil { + r.persist.sessionsCh = make(chan Session, 128) + r.persist.sessionsWg.Add(1) + } + + go r.sessionPersister(r.persist.logPattern, r.persist.logBufferDuration, r.persist.sessionsCh) +} + +func (r *registry) stopPersister() { + if r.persist.fs == nil { + return + } + + r.persist.lock.Lock() + defer r.persist.lock.Unlock() + + if r.persist.cancel != nil { + r.persist.cancel() + r.persist.cancel = nil + } + + if r.persist.sessionsCh != nil { + close(r.persist.sessionsCh) + r.persist.sessionsWg.Wait() + r.persist.sessionsCh = nil + } +} + +func (r *registry) historyPersister(ctx context.Context, interval time.Duration) { + ticker := time.NewTicker(interval) + defer ticker.Stop() + + r.logger.Debug().WithField("interval", interval).Log("History persister started") + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + r.persistAllCollectors() + } + } +} + +func (r *registry) sessionPersister(pattern *strftime.Strftime, bufferDuration time.Duration, ch <-chan Session) { + defer r.persist.sessionsWg.Done() + + r.logger.Debug().WithFields(log.Fields{ + "pattern": pattern.Pattern(), + "buffer": bufferDuration, + }).Log("Session persister started") + + buffer := &bytes.Buffer{} + path := pattern.FormatString(time.Now()) + + enc := json.NewEncoder(buffer) + + ticker := time.NewTicker(bufferDuration) + defer ticker.Stop() + +loop: + for { + select { + case session, ok := <-ch: + if !ok { + break loop + } + currentPath := pattern.FormatString(session.ClosedAt) + if currentPath != path { + if buffer.Len() > 0 { + _, _, err := r.persist.fs.WriteFileSafe(path, buffer.Bytes()) + r.logger.Error().WithError(err).WithField("path", path).Log("") + } + buffer.Reset() + path = currentPath + } + + enc.Encode(&session) + case t := <-ticker.C: + if buffer.Len() > 0 { + _, _, err := r.persist.fs.WriteFileSafe(path, buffer.Bytes()) + r.logger.Error().WithError(err).WithField("path", path).Log("") + } + currentPath := pattern.FormatString(t) + if currentPath != path { + buffer.Reset() + path = currentPath + } + } + } + + if buffer.Len() > 0 { + _, _, err := r.persist.fs.WriteFileSafe(path, buffer.Bytes()) + r.logger.Error().WithError(err).WithField("path", path).Log("") + } + + buffer = nil +} + +func (r *registry) persistAllCollectors() { + wg := sync.WaitGroup{} + + r.lock.Lock() + for id, m := range r.collector { + wg.Add(1) + go func(id string, m Collector) { + defer wg.Done() + + s, err := m.Snapshot() + if err != nil { + return + } + + sink, err := NewHistorySink(r.persist.fs, "/"+id+".json") + if err != nil { + return + } + + if err := s.Persist(sink); err != nil { + return + } + }(id, m) + } + r.lock.Unlock() + + wg.Wait() +} + func (r *registry) Register(id string, conf CollectorConfig) (Collector, error) { if len(id) == 0 { return nil, fmt.Errorf("invalid ID. empty IDs are not allowed") @@ -91,11 +289,22 @@ func (r *registry) Register(id string, conf CollectorConfig) (Collector, error) return nil, fmt.Errorf("a collector with the ID '%s' already exists", id) } - m, err := newCollector(id, r.persistFS, r.logger, conf) + m, err := newCollector(id, r.persist.sessionsCh, r.logger.WithComponent(id), conf) if err != nil { return nil, err } + if r.persist.fs != nil { + s, err := NewHistorySource(r.persist.fs, "/"+id+".json") + if err != nil { + return nil, err + } + + if err := m.Restore(s); err != nil { + return nil, err + } + } + m.start() r.collector[id] = m @@ -107,6 +316,10 @@ func (r *registry) Unregister(id string) error { r.lock.Lock() defer r.lock.Unlock() + return r.unregister(id) +} + +func (r *registry) unregister(id string) error { m, ok := r.collector[id] if !ok { return fmt.Errorf("a collector with the ID '%s' doesn't exist", id) @@ -116,6 +329,24 @@ func (r *registry) Unregister(id string) error { delete(r.collector, id) + if r.persist.fs != nil { + s, err := m.Snapshot() + if err != nil { + return err + } + + if s != nil { + sink, err := NewHistorySink(r.persist.fs, "/"+id+".json") + if err != nil { + return err + } + + if err := s.Persist(sink); err != nil { + return err + } + } + } + return nil } @@ -148,8 +379,8 @@ func (r *registry) UnregisterAll() { r.lock.Lock() defer r.lock.Unlock() - for _, m := range r.collector { - m.Stop() + for id := range r.collector { + r.unregister(id) } r.collector = make(map[string]*collector) diff --git a/session/registry_test.go b/session/registry_test.go index 792a9585..cf9f46b3 100644 --- a/session/registry_test.go +++ b/session/registry_test.go @@ -2,56 +2,68 @@ package session import ( "testing" + "time" + "github.com/datarhei/core/v16/io/fs" + "github.com/lestrrat-go/strftime" "github.com/stretchr/testify/require" ) func TestRegister(t *testing.T) { r, err := New(Config{}) - require.Equal(t, nil, err) + require.NoError(t, err) + t.Cleanup(func() { + r.Close() + }) _, err = r.Register("", CollectorConfig{}) - require.NotEqual(t, nil, err) + require.Error(t, err) _, err = r.Register("../foo/bar", CollectorConfig{}) - require.NotEqual(t, nil, err) + require.Error(t, err) _, err = r.Register("foobar", CollectorConfig{}) - require.Equal(t, nil, err) + require.NoError(t, err) _, err = r.Register("foobar", CollectorConfig{}) - require.NotEqual(t, nil, err) + require.Error(t, err) } func TestUnregister(t *testing.T) { r, err := New(Config{}) - require.Equal(t, nil, err) + require.NoError(t, err) + t.Cleanup(func() { + r.Close() + }) _, err = r.Register("foobar", CollectorConfig{}) - require.Equal(t, nil, err) + require.NoError(t, err) err = r.Unregister("foobar") - require.Equal(t, nil, err) + require.NoError(t, err) err = r.Unregister("foobar") - require.NotEqual(t, nil, err) + require.Error(t, err) } func TestCollectors(t *testing.T) { r, err := New(Config{}) - require.Equal(t, nil, err) + require.NoError(t, err) + t.Cleanup(func() { + r.Close() + }) c := r.Collectors() require.Equal(t, []string{}, c) _, err = r.Register("foobar", CollectorConfig{}) - require.Equal(t, nil, err) + require.NoError(t, err) c = r.Collectors() require.Equal(t, []string{"foobar"}, c) err = r.Unregister("foobar") - require.Equal(t, nil, err) + require.NoError(t, err) c = r.Collectors() require.Equal(t, []string{}, c) @@ -59,33 +71,39 @@ func TestCollectors(t *testing.T) { func TestGetCollector(t *testing.T) { r, err := New(Config{}) - require.Equal(t, nil, err) + require.NoError(t, err) + t.Cleanup(func() { + r.Close() + }) c := r.Collector("foobar") - require.Equal(t, nil, c) + require.Nil(t, c) _, err = r.Register("foobar", CollectorConfig{}) - require.Equal(t, nil, err) + require.NoError(t, err) c = r.Collector("foobar") - require.NotEqual(t, nil, c) + require.NotNil(t, c) err = r.Unregister("foobar") - require.Equal(t, nil, err) + require.NoError(t, err) c = r.Collector("foobar") - require.Equal(t, nil, c) + require.Nil(t, c) } func TestUnregisterAll(t *testing.T) { r, err := New(Config{}) - require.Equal(t, nil, err) + require.NoError(t, err) + t.Cleanup(func() { + r.Close() + }) _, err = r.Register("foo", CollectorConfig{}) - require.Equal(t, nil, err) + require.NoError(t, err) _, err = r.Register("bar", CollectorConfig{}) - require.Equal(t, nil, err) + require.NoError(t, err) c := r.Collectors() require.ElementsMatch(t, []string{"foo", "bar"}, c) @@ -95,3 +113,190 @@ func TestUnregisterAll(t *testing.T) { c = r.Collectors() require.Equal(t, []string{}, c) } + +func TestPersistHistory(t *testing.T) { + memfs, err := fs.NewMemFilesystem(fs.MemConfig{}) + require.NoError(t, err) + + r, err := New(Config{ + PersistFS: memfs, + }) + require.NoError(t, err) + t.Cleanup(func() { + r.Close() + }) + + c, err := r.Register("foobar", CollectorConfig{}) + require.NoError(t, err) + + c.RegisterAndActivate("foo", "ref", "location", "peer") + c.Egress("foo", 42) + + err = r.Unregister("foobar") + require.NoError(t, err) + + _, err = memfs.Stat("/foobar.json") + require.NoError(t, err) + + c, err = r.Register("foobar", CollectorConfig{}) + require.NoError(t, err) + + cc := c.(*collector) + totals, ok := cc.history.Sessions["location:peer:ref"] + require.True(t, ok) + require.Equal(t, "location", totals.Location) + require.Equal(t, "ref", totals.Reference) + require.Equal(t, "peer", totals.Peer) + require.Equal(t, uint64(42), totals.TotalTxBytes) +} + +func TestPeriodicPersistHistory(t *testing.T) { + memfs, err := fs.NewMemFilesystem(fs.MemConfig{}) + require.NoError(t, err) + + r, err := New(Config{ + PersistFS: memfs, + PersistInterval: 5 * time.Second, + }) + require.NoError(t, err) + t.Cleanup(func() { + r.Close() + }) + + c, err := r.Register("foobar", CollectorConfig{ + SessionTimeout: time.Second, + }) + require.NoError(t, err) + + c.RegisterAndActivate("foo", "ref", "location", "peer") + c.Egress("foo", 42) + + require.Eventually(t, func() bool { + _, err = memfs.Stat("/foobar.json") + return err == nil + }, 10*time.Second, time.Second) + + err = r.Unregister("foobar") + require.NoError(t, err) + + _, err = memfs.Stat("/foobar.json") + require.NoError(t, err) +} + +func TestPersistSession(t *testing.T) { + memfs, err := fs.NewMemFilesystem(fs.MemConfig{}) + require.NoError(t, err) + + pattern := "/log/%Y-%m-%d.log" + + r, err := New(Config{ + PersistFS: memfs, + LogPattern: pattern, + }) + require.NoError(t, err) + t.Cleanup(func() { + r.Close() + }) + + c, err := r.Register("foobar", CollectorConfig{ + SessionTimeout: 3 * time.Second, + }) + require.NoError(t, err) + + c.RegisterAndActivate("foo", "ref", "location", "peer") + c.Egress("foo", 42) + + err = r.Unregister("foobar") + require.NoError(t, err) + + r.Close() + + path, err := strftime.Format(pattern, time.Now()) + require.NoError(t, err) + + info, err := memfs.Stat(path) + require.NoError(t, err) + require.Greater(t, info.Size(), int64(0)) +} + +func TestPersistSessionSlpit(t *testing.T) { + memfs, err := fs.NewMemFilesystem(fs.MemConfig{}) + require.NoError(t, err) + + pattern := "/log/%Y-%m-%d-%H:%M:%S.log" + + r, err := New(Config{ + PersistFS: memfs, + LogPattern: pattern, + }) + require.NoError(t, err) + t.Cleanup(func() { + r.Close() + }) + + c, err := r.Register("foobar", CollectorConfig{ + SessionTimeout: 3 * time.Second, + }) + require.NoError(t, err) + + c.RegisterAndActivate("foo", "ref", "location", "peer") + c.Egress("foo", 42) + + time.Sleep(3 * time.Second) + + c.RegisterAndActivate("bar", "ref", "location", "peer") + c.Egress("bar", 24) + + err = r.Unregister("foobar") + require.NoError(t, err) + + r.Close() + + require.Equal(t, int64(2), memfs.Files()) +} + +func TestPersistSessionBuffer(t *testing.T) { + memfs, err := fs.NewMemFilesystem(fs.MemConfig{}) + require.NoError(t, err) + + pattern := "/log/%Y-%m-%d.log" + + r, err := New(Config{ + PersistFS: memfs, + LogPattern: pattern, + LogBufferDuration: 5 * time.Second, + }) + require.NoError(t, err) + t.Cleanup(func() { + r.Close() + }) + + c, err := r.Register("foobar", CollectorConfig{ + SessionTimeout: 3 * time.Second, + }) + require.NoError(t, err) + + c.RegisterAndActivate("foo", "ref", "location", "peer") + c.Egress("foo", 42) + + require.Eventually(t, func() bool { + path, err := strftime.Format(pattern, time.Now()) + if err != nil { + return false + } + _, err = memfs.Stat(path) + return err == nil + }, 10*time.Second, time.Second) + + err = r.Unregister("foobar") + require.NoError(t, err) + + r.Close() + + path, err := strftime.Format(pattern, time.Now()) + require.NoError(t, err) + + info, err := memfs.Stat(path) + require.NoError(t, err) + require.Greater(t, info.Size(), int64(0)) +} diff --git a/session/snapshot.go b/session/snapshot.go new file mode 100644 index 00000000..ef303ef4 --- /dev/null +++ b/session/snapshot.go @@ -0,0 +1,94 @@ +package session + +import ( + "bytes" + "io" + "os" + + "github.com/datarhei/core/v16/io/fs" +) + +type SnapshotSource interface { + io.ReadCloser +} + +type SnapshotSink interface { + io.WriteCloser + + Cancel() error +} + +type Snapshot interface { + Persist(sink SnapshotSink) error + Release() +} + +type historySource struct { + fs fs.Filesystem + path string + data *bytes.Reader +} + +// NewHistorySource returns a new SnapshotSource which reads the previously stored +// session history. If there's no data, a nil source with a nil error will be returned. +// If there's data, a non-nil source with a nil error will be returned. Otherwise +// the source will be nil and the error non-nil. +func NewHistorySource(fs fs.Filesystem, path string) (SnapshotSource, error) { + s := &historySource{ + fs: fs, + path: path, + } + + if _, err := s.fs.Stat(s.path); err == os.ErrNotExist { + return nil, nil + } + + data, err := s.fs.ReadFile(s.path) + if err != nil { + return nil, err + } + + s.data = bytes.NewReader(data) + + return s, nil +} + +func (s *historySource) Read(p []byte) (int, error) { + return s.data.Read(p) +} + +func (s *historySource) Close() error { + s.data = nil + return nil +} + +type historySink struct { + fs fs.Filesystem + path string + data *bytes.Buffer +} + +func NewHistorySink(fs fs.Filesystem, path string) (SnapshotSink, error) { + s := &historySink{ + fs: fs, + path: path, + data: &bytes.Buffer{}, + } + + return s, nil +} + +func (s *historySink) Write(p []byte) (int, error) { + return s.data.Write(p) +} + +func (s *historySink) Close() error { + _, _, err := s.fs.WriteFileSafe(s.path, s.data.Bytes()) + s.data = nil + return err +} + +func (s *historySink) Cancel() error { + s.data = nil + return nil +} diff --git a/vendor/github.com/datarhei/core-client-go/v16/api/config.go b/vendor/github.com/datarhei/core-client-go/v16/api/config.go index 9f98ea3e..400a419e 100644 --- a/vendor/github.com/datarhei/core-client-go/v16/api/config.go +++ b/vendor/github.com/datarhei/core-client-go/v16/api/config.go @@ -430,13 +430,15 @@ type ConfigV3 struct { Interval int64 `json:"interval_sec" format:"int64"` // seconds } `json:"metrics"` Sessions struct { - Enable bool `json:"enable"` - IPIgnoreList []string `json:"ip_ignorelist"` - SessionTimeout int `json:"session_timeout_sec" format:"int"` - Persist bool `json:"persist"` - PersistInterval int `json:"persist_interval_sec" format:"int"` - MaxBitrate uint64 `json:"max_bitrate_mbit" format:"uint64"` - MaxSessions uint64 `json:"max_sessions" format:"uint64"` + Enable bool `json:"enable"` + IPIgnoreList []string `json:"ip_ignorelist"` + Persist bool `json:"persist"` + PersistInterval int `json:"persist_interval_sec" format:"int"` + SessionTimeout int `json:"session_timeout_sec" format:"int"` + SessionLogPathPattern string `json:"session_log_path_pattern"` + SessionLogBuffer int `json:"session_log_buffer_sec" format:"int"` + MaxBitrate uint64 `json:"max_bitrate_mbit" format:"uint64"` + MaxSessions uint64 `json:"max_sessions" format:"uint64"` } `json:"sessions"` Service struct { Enable bool `json:"enable"` diff --git a/vendor/modules.txt b/vendor/modules.txt index 0e152598..69e3212a 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -78,7 +78,7 @@ github.com/cespare/xxhash/v2 # github.com/cpuguy83/go-md2man/v2 v2.0.2 ## explicit; go 1.11 github.com/cpuguy83/go-md2man/v2/md2man -# github.com/datarhei/core-client-go/v16 v16.11.1-0.20230614141756-a25a5fc3c60e +# github.com/datarhei/core-client-go/v16 v16.11.1-0.20230620131644-140b3a61d4c8 ## explicit; go 1.18 github.com/datarhei/core-client-go/v16 github.com/datarhei/core-client-go/v16/api