mirror of
https://github.com/datarhei/core.git
synced 2025-10-08 17:30:52 +08:00
Allow to log each finished session to filesystem
By providing CORE_SESSIONS_SESSION_LOG_PATH_PATTERN (e.g. "/log/%Y-%m-%d.log") all finished sessions will be logged to a file according to the provided strftime-pattern. The actual value is calculated from when the session closed. CORE_SESSIONS_PERSIST must be set. Default: not set. Set CORE_SESSIONS_SESSION_LOG_BUFFER_SEC to the number of seconds the log should be buffered in memory before persisted to disk. Default 15 seconds.
This commit is contained in:
@@ -339,7 +339,10 @@ func (a *api) start() error {
|
||||
|
||||
if cfg.Sessions.Enable {
|
||||
sessionConfig := session.Config{
|
||||
Logger: a.log.logger.core.WithComponent("Session"),
|
||||
PersistInterval: time.Duration(cfg.Sessions.PersistInterval) * time.Second,
|
||||
LogPattern: cfg.Sessions.SessionLogPathPattern,
|
||||
LogBufferDuration: time.Duration(cfg.Sessions.SessionLogBuffer) * time.Second,
|
||||
Logger: a.log.logger.core.WithComponent("Session"),
|
||||
}
|
||||
|
||||
if cfg.Sessions.Persist {
|
||||
@@ -371,7 +374,6 @@ func (a *api) start() error {
|
||||
MaxSessions: cfg.Sessions.MaxSessions,
|
||||
InactiveTimeout: 5 * time.Second,
|
||||
SessionTimeout: time.Duration(cfg.Sessions.SessionTimeout) * time.Second,
|
||||
PersistInterval: time.Duration(cfg.Sessions.PersistInterval) * time.Second,
|
||||
Limiter: iplimiter,
|
||||
}
|
||||
|
||||
@@ -1823,6 +1825,7 @@ func (a *api) stop() {
|
||||
// Stop the session tracker
|
||||
if a.sessions != nil {
|
||||
a.sessions.UnregisterAll()
|
||||
a.sessions.Close()
|
||||
a.sessions = nil
|
||||
}
|
||||
|
||||
|
@@ -260,10 +260,12 @@ func (d *Config) init() {
|
||||
d.vars.Register(value.NewInt64(&d.Metrics.Interval, 2), "metrics.interval_seconds", "CORE_METRICS_INTERVAL_SECONDS", nil, "Interval for collecting metrics", false, false)
|
||||
|
||||
// Sessions
|
||||
d.vars.Register(value.NewBool(&d.Sessions.Enable, true), "sessions.enable", "CORE_SESSIONS_ENABLE", nil, "Enable collecting HLS session stats for /memfs", false, false)
|
||||
d.vars.Register(value.NewBool(&d.Sessions.Enable, true), "sessions.enable", "CORE_SESSIONS_ENABLE", nil, "Enable collecting session stats", false, false)
|
||||
d.vars.Register(value.NewCIDRList(&d.Sessions.IPIgnoreList, []string{"127.0.0.1/32", "::1/128"}, ","), "sessions.ip_ignorelist", "CORE_SESSIONS_IP_IGNORELIST", nil, "List of IP ranges in CIDR notation to ignore", false, false)
|
||||
d.vars.Register(value.NewInt(&d.Sessions.SessionTimeout, 30), "sessions.session_timeout_sec", "CORE_SESSIONS_SESSION_TIMEOUT_SEC", nil, "Timeout for an idle session", false, false)
|
||||
d.vars.Register(value.NewBool(&d.Sessions.Persist, false), "sessions.persist", "CORE_SESSIONS_PERSIST", nil, "Whether to persist session history. Will be stored as sessions.json in db.dir", false, false)
|
||||
d.vars.Register(value.NewStrftime(&d.Sessions.SessionLogPathPattern, ""), "sessions.session_log_path_pattern", "CORE_SESSIONS_LOG_PATH_PATTERN", nil, "Path to where the sessions will be logged, may contain strftime-patterns, leave empty for no session logging, persist must be enabled", false, false)
|
||||
d.vars.Register(value.NewInt(&d.Sessions.SessionLogBuffer, 15), "sessions.session_log_buffer_sec", "CORE_SESSIONS_SESSION_LOG_BUFFER_SEC", nil, "Maximum duration to buffer session logs in memory before persisting on disk", false, false)
|
||||
d.vars.Register(value.NewBool(&d.Sessions.Persist, false), "sessions.persist", "CORE_SESSIONS_PERSIST", nil, "Whether to persist session history. Will be stored in /sessions/[collector].json in db.dir", false, false)
|
||||
d.vars.Register(value.NewInt(&d.Sessions.PersistInterval, 300), "sessions.persist_interval_sec", "CORE_SESSIONS_PERSIST_INTERVAL_SEC", nil, "Interval in seconds in which to persist the current session history", false, false)
|
||||
d.vars.Register(value.NewUint64(&d.Sessions.MaxBitrate, 0), "sessions.max_bitrate_mbit", "CORE_SESSIONS_MAXBITRATE_MBIT", nil, "Max. allowed outgoing bitrate in mbit/s, 0 for unlimited", false, false)
|
||||
d.vars.Register(value.NewUint64(&d.Sessions.MaxSessions, 0), "sessions.max_sessions", "CORE_SESSIONS_MAX_SESSIONS", []string{"CORE_SESSIONS_MAXSESSIONS"}, "Max. allowed number of simultaneous sessions, 0 for unlimited", false, false)
|
||||
|
@@ -150,13 +150,15 @@ type Data struct {
|
||||
Interval int64 `json:"interval_sec" format:"int64"` // seconds
|
||||
} `json:"metrics"`
|
||||
Sessions struct {
|
||||
Enable bool `json:"enable"`
|
||||
IPIgnoreList []string `json:"ip_ignorelist"`
|
||||
SessionTimeout int `json:"session_timeout_sec" format:"int"`
|
||||
Persist bool `json:"persist"`
|
||||
PersistInterval int `json:"persist_interval_sec" format:"int"`
|
||||
MaxBitrate uint64 `json:"max_bitrate_mbit" format:"uint64"`
|
||||
MaxSessions uint64 `json:"max_sessions" format:"uint64"`
|
||||
Enable bool `json:"enable"`
|
||||
IPIgnoreList []string `json:"ip_ignorelist"`
|
||||
Persist bool `json:"persist"`
|
||||
PersistInterval int `json:"persist_interval_sec" format:"int"`
|
||||
SessionTimeout int `json:"session_timeout_sec" format:"int"`
|
||||
SessionLogPathPattern string `json:"session_log_path_pattern"`
|
||||
SessionLogBuffer int `json:"session_log_buffer_sec" format:"int"`
|
||||
MaxBitrate uint64 `json:"max_bitrate_mbit" format:"uint64"`
|
||||
MaxSessions uint64 `json:"max_sessions" format:"uint64"`
|
||||
} `json:"sessions"`
|
||||
Service struct {
|
||||
Enable bool `json:"enable"`
|
||||
@@ -207,7 +209,6 @@ func MergeV2toV3(data *Data, d *v2.Data) (*Data, error) {
|
||||
data.SRT = d.SRT
|
||||
data.Playout = d.Playout
|
||||
data.Metrics = d.Metrics
|
||||
data.Sessions = d.Sessions
|
||||
data.Service = d.Service
|
||||
data.Router = d.Router
|
||||
|
||||
@@ -233,7 +234,13 @@ func MergeV2toV3(data *Data, d *v2.Data) (*Data, error) {
|
||||
data.FFmpeg.Log.MaxLines = d.FFmpeg.Log.MaxLines
|
||||
data.FFmpeg.Log.MaxHistory = d.FFmpeg.Log.MaxHistory
|
||||
|
||||
data.Sessions.Enable = d.Sessions.Enable
|
||||
data.Sessions.IPIgnoreList = copy.Slice(d.Sessions.IPIgnoreList)
|
||||
data.Sessions.SessionTimeout = d.Sessions.SessionTimeout
|
||||
data.Sessions.Persist = d.Sessions.Persist
|
||||
data.Sessions.PersistInterval = d.Sessions.PersistInterval
|
||||
data.Sessions.MaxBitrate = d.Sessions.MaxBitrate
|
||||
data.Sessions.MaxSessions = d.Sessions.MaxSessions
|
||||
|
||||
data.SRT.Log.Topics = copy.Slice(d.SRT.Log.Topics)
|
||||
|
||||
@@ -295,7 +302,6 @@ func DowngradeV3toV2(d *Data) (*v2.Data, error) {
|
||||
data.SRT = d.SRT
|
||||
data.Playout = d.Playout
|
||||
data.Metrics = d.Metrics
|
||||
data.Sessions = d.Sessions
|
||||
data.Service = d.Service
|
||||
data.Router = d.Router
|
||||
|
||||
@@ -321,7 +327,13 @@ func DowngradeV3toV2(d *Data) (*v2.Data, error) {
|
||||
data.FFmpeg.Log.MaxLines = d.FFmpeg.Log.MaxLines
|
||||
data.FFmpeg.Log.MaxHistory = d.FFmpeg.Log.MaxHistory
|
||||
|
||||
data.Sessions.Enable = d.Sessions.Enable
|
||||
data.Sessions.IPIgnoreList = copy.Slice(d.Sessions.IPIgnoreList)
|
||||
data.Sessions.SessionTimeout = d.Sessions.SessionTimeout
|
||||
data.Sessions.Persist = d.Sessions.Persist
|
||||
data.Sessions.PersistInterval = d.Sessions.PersistInterval
|
||||
data.Sessions.MaxBitrate = d.Sessions.MaxBitrate
|
||||
data.Sessions.MaxSessions = d.Sessions.MaxSessions
|
||||
|
||||
data.SRT.Log.Topics = copy.Slice(d.SRT.Log.Topics)
|
||||
|
||||
|
@@ -1,6 +1,10 @@
|
||||
package value
|
||||
|
||||
import "time"
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/lestrrat-go/strftime"
|
||||
)
|
||||
|
||||
// time
|
||||
|
||||
@@ -34,3 +38,31 @@ func (u *Time) IsEmpty() bool {
|
||||
v := time.Time(*u)
|
||||
return v.IsZero()
|
||||
}
|
||||
|
||||
// strftime
|
||||
|
||||
type Strftime string
|
||||
|
||||
func NewStrftime(p *string, val string) *Strftime {
|
||||
*p = val
|
||||
|
||||
return (*Strftime)(p)
|
||||
}
|
||||
|
||||
func (s *Strftime) Set(val string) error {
|
||||
*s = Strftime(val)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Strftime) String() string {
|
||||
return string(*s)
|
||||
}
|
||||
|
||||
func (s *Strftime) Validate() error {
|
||||
_, err := strftime.New(string(*s))
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *Strftime) IsEmpty() bool {
|
||||
return len(string(*s)) == 0
|
||||
}
|
||||
|
@@ -28,3 +28,23 @@ func TestTimeValue(t *testing.T) {
|
||||
|
||||
require.Equal(t, time.Time(time.Date(2009, time.November, 11, 23, 0, 0, 0, time.UTC)), x)
|
||||
}
|
||||
|
||||
func TestStrftimeValue(t *testing.T) {
|
||||
var x string
|
||||
|
||||
val := NewStrftime(&x, "%Y-%m-%d.log")
|
||||
|
||||
require.Equal(t, "%Y-%m-%d.log", val.String())
|
||||
require.Equal(t, nil, val.Validate())
|
||||
require.Equal(t, false, val.IsEmpty())
|
||||
|
||||
x = "%Y-%m-%d-%H:%M:%S.log"
|
||||
|
||||
require.Equal(t, "%Y-%m-%d-%H:%M:%S.log", val.String())
|
||||
require.Equal(t, nil, val.Validate())
|
||||
require.Equal(t, false, val.IsEmpty())
|
||||
|
||||
val.Set("bla.log")
|
||||
|
||||
require.Equal(t, "bla.log", x)
|
||||
}
|
||||
|
2
go.mod
2
go.mod
@@ -9,7 +9,7 @@ require (
|
||||
github.com/atrox/haikunatorgo/v2 v2.0.1
|
||||
github.com/caddyserver/certmagic v0.18.0
|
||||
github.com/casbin/casbin/v2 v2.71.1
|
||||
github.com/datarhei/core-client-go/v16 v16.11.1-0.20230614141756-a25a5fc3c60e
|
||||
github.com/datarhei/core-client-go/v16 v16.11.1-0.20230620131644-140b3a61d4c8
|
||||
github.com/datarhei/gosrt v0.5.2
|
||||
github.com/datarhei/joy4 v0.0.0-20230505074825-fde05957445a
|
||||
github.com/fujiwara/shapeio v1.0.0
|
||||
|
9
go.sum
9
go.sum
@@ -8,6 +8,8 @@ github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc
|
||||
github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE=
|
||||
github.com/Masterminds/semver/v3 v3.2.1 h1:RN9w6+7QoMeJVGyfmbcgs28Br8cvmnucEXnY0rYXWg0=
|
||||
github.com/Masterminds/semver/v3 v3.2.1/go.mod h1:qvl/7zhW3nngYb5+80sSMF+FG2BjYrf8m9wsX0PNOMQ=
|
||||
github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0=
|
||||
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE=
|
||||
github.com/adhocore/gronx v1.6.3 h1:bnm5vieTrY3QQPpsfB0hrAaeaHDpuZTUC2LLCVMLe9c=
|
||||
github.com/adhocore/gronx v1.6.3/go.mod h1:7oUY1WAU8rEJWmAxXR2DN0JaO4gi9khSgKjiRypqteg=
|
||||
github.com/agnivade/levenshtein v1.1.1 h1:QY8M92nrzkmr798gCo3kmMyqXFzdQVpxLlGPRBij0P8=
|
||||
@@ -48,6 +50,8 @@ github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46t
|
||||
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
|
||||
github.com/datarhei/core-client-go/v16 v16.11.1-0.20230614141756-a25a5fc3c60e h1:iQKqGTyIdCyO7kY/G5MCKhzt3xZ5YPRubbJskVp5EvQ=
|
||||
github.com/datarhei/core-client-go/v16 v16.11.1-0.20230614141756-a25a5fc3c60e/go.mod h1:6L0zr/NUwvaPsCTK/IL17m8JUEtgLp3BDtlsBREwacg=
|
||||
github.com/datarhei/core-client-go/v16 v16.11.1-0.20230620131644-140b3a61d4c8 h1:CILOzUB7CJGHtZHOxMJn+dN6rKzH29TOOOOep0AnFWM=
|
||||
github.com/datarhei/core-client-go/v16 v16.11.1-0.20230620131644-140b3a61d4c8/go.mod h1:6L0zr/NUwvaPsCTK/IL17m8JUEtgLp3BDtlsBREwacg=
|
||||
github.com/datarhei/gosrt v0.5.2 h1:eagqZwEIiGPNJW0rLep3gwceObyaZ17+iKRc+l4VEpc=
|
||||
github.com/datarhei/gosrt v0.5.2/go.mod h1:0308GQhAu5hxe2KYdbss901aKceSSKXnwCr8Vs++eiw=
|
||||
github.com/datarhei/joy4 v0.0.0-20230505074825-fde05957445a h1:Tf4DSHY1xruBglr+yYP5Wct7czM86GKMYgbXH8a7OFo=
|
||||
@@ -71,6 +75,7 @@ github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2
|
||||
github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
|
||||
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
|
||||
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
|
||||
github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
|
||||
github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
|
||||
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
|
||||
github.com/go-openapi/jsonpointer v0.19.3/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg=
|
||||
@@ -154,11 +159,13 @@ github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
|
||||
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
|
||||
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
|
||||
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
|
||||
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
|
||||
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
|
||||
github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
|
||||
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
|
||||
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
|
||||
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
|
||||
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
|
||||
github.com/klauspost/compress v1.16.6 h1:91SKEy4K37vkp255cJ8QesJhjyRO0hn9i9G0GoUwLsk=
|
||||
github.com/klauspost/compress v1.16.6/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
|
||||
github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
|
||||
@@ -228,6 +235,7 @@ github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3Rllmb
|
||||
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
|
||||
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
|
||||
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
|
||||
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
|
||||
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
|
||||
github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY=
|
||||
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
|
||||
@@ -279,6 +287,7 @@ github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFt
|
||||
github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ=
|
||||
github.com/shoenig/test v0.6.4 h1:kVTaSd7WLz5WZ2IaoM0RSzRsUD+m8wRR+5qvntpn4LU=
|
||||
github.com/shoenig/test v0.6.4/go.mod h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k=
|
||||
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
|
||||
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
|
||||
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
|
||||
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
|
||||
|
@@ -1,13 +1,12 @@
|
||||
package session
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/datarhei/core/v16/io/fs"
|
||||
"github.com/datarhei/core/v16/log"
|
||||
"github.com/datarhei/core/v16/net"
|
||||
|
||||
@@ -16,10 +15,11 @@ import (
|
||||
|
||||
// Session represents an active session
|
||||
type Session struct {
|
||||
Collector string
|
||||
ID string
|
||||
Reference string
|
||||
CreatedAt time.Time
|
||||
ClosesAt time.Time
|
||||
ClosedAt time.Time
|
||||
Location string
|
||||
Peer string
|
||||
Extra map[string]interface{}
|
||||
@@ -167,6 +167,12 @@ type Collector interface {
|
||||
|
||||
// Stop stops the collector to calculate rates
|
||||
Stop()
|
||||
|
||||
// Snapshot returns the current snapshot of the history
|
||||
Snapshot() (Snapshot, error)
|
||||
|
||||
// Restore restores a previously made snapshot
|
||||
Restore(snapshot io.ReadCloser) error
|
||||
}
|
||||
|
||||
// CollectorConfig is the configuration for registering a new collector
|
||||
@@ -195,11 +201,6 @@ type CollectorConfig struct {
|
||||
// SessionTimeout is the duration of how long an idle active session is kept. A
|
||||
// session is idle if there are no ingress or egress bytes.
|
||||
SessionTimeout time.Duration
|
||||
|
||||
// PersistInterval is the duration between persisting the
|
||||
// history. Can be 0. Then the history will only be persisted
|
||||
// at stopping the collector.
|
||||
PersistInterval time.Duration
|
||||
}
|
||||
|
||||
type totals struct {
|
||||
@@ -212,7 +213,7 @@ type totals struct {
|
||||
}
|
||||
|
||||
type history struct {
|
||||
Sessions map[string]totals `json:"sessions"` // key = `${session.location}:${session.peer}`
|
||||
Sessions map[string]totals `json:"sessions"` // key = `${session.location}:${session.peer}:${session.reference}`
|
||||
}
|
||||
|
||||
type collector struct {
|
||||
@@ -222,6 +223,7 @@ type collector struct {
|
||||
sessions map[string]*session
|
||||
sessionPool sync.Pool
|
||||
sessionsWG sync.WaitGroup
|
||||
sessionsCh chan<- Session
|
||||
|
||||
staleCallback func(*session)
|
||||
|
||||
@@ -241,14 +243,6 @@ type collector struct {
|
||||
|
||||
history history
|
||||
|
||||
persist struct {
|
||||
enable bool
|
||||
fs fs.Filesystem
|
||||
path string
|
||||
interval time.Duration
|
||||
done context.CancelFunc
|
||||
}
|
||||
|
||||
inactiveTimeout time.Duration
|
||||
sessionTimeout time.Duration
|
||||
|
||||
@@ -259,12 +253,11 @@ type collector struct {
|
||||
lock struct {
|
||||
session sync.RWMutex
|
||||
history sync.RWMutex
|
||||
persist sync.Mutex
|
||||
run sync.Mutex
|
||||
companion sync.RWMutex
|
||||
}
|
||||
|
||||
startOnce sync.Once
|
||||
stopOnce sync.Once
|
||||
running bool
|
||||
}
|
||||
|
||||
const (
|
||||
@@ -285,20 +278,21 @@ func NewCollector(config CollectorConfig) Collector {
|
||||
return collector
|
||||
}
|
||||
|
||||
func newCollector(id string, persistFS fs.Filesystem, logger log.Logger, config CollectorConfig) (*collector, error) {
|
||||
func newCollector(id string, sessionsCh chan<- Session, logger log.Logger, config CollectorConfig) (*collector, error) {
|
||||
c := &collector{
|
||||
id: id,
|
||||
logger: logger,
|
||||
sessionsCh: sessionsCh,
|
||||
maxRxBitrate: float64(config.MaxRxBitrate),
|
||||
maxTxBitrate: float64(config.MaxTxBitrate),
|
||||
maxSessions: config.MaxSessions,
|
||||
inactiveTimeout: config.InactiveTimeout,
|
||||
sessionTimeout: config.SessionTimeout,
|
||||
limiter: config.Limiter,
|
||||
logger: logger,
|
||||
id: id,
|
||||
}
|
||||
|
||||
if c.logger == nil {
|
||||
c.logger = log.New("Session")
|
||||
c.logger = log.New("")
|
||||
}
|
||||
|
||||
if c.limiter == nil {
|
||||
@@ -370,6 +364,25 @@ func newCollector(id string, persistFS fs.Filesystem, logger log.Logger, config
|
||||
|
||||
c.lock.history.Unlock()
|
||||
|
||||
if c.sessionsCh != nil {
|
||||
c.sessionsCh <- Session{
|
||||
Collector: c.id,
|
||||
ID: sess.id,
|
||||
Reference: sess.reference,
|
||||
CreatedAt: sess.createdAt,
|
||||
ClosedAt: sess.closedAt,
|
||||
Location: sess.location,
|
||||
Peer: sess.peer,
|
||||
Extra: sess.extra,
|
||||
RxBytes: sess.rxBytes,
|
||||
RxBitrate: sess.RxBitrate(),
|
||||
TopRxBitrate: sess.TopRxBitrate(),
|
||||
TxBytes: sess.txBytes,
|
||||
TxBitrate: sess.TxBitrate(),
|
||||
TopTxBitrate: sess.TopTxBitrate(),
|
||||
}
|
||||
}
|
||||
|
||||
c.sessionPool.Put(sess)
|
||||
|
||||
c.currentActiveSessions--
|
||||
@@ -379,123 +392,107 @@ func newCollector(id string, persistFS fs.Filesystem, logger log.Logger, config
|
||||
|
||||
c.history.Sessions = make(map[string]totals)
|
||||
|
||||
c.persist.enable = persistFS != nil
|
||||
c.persist.fs = persistFS
|
||||
c.persist.path = "/" + id + ".json"
|
||||
c.persist.interval = config.PersistInterval
|
||||
|
||||
c.loadHistory(c.persist.fs, c.persist.path, &c.history)
|
||||
|
||||
c.stopOnce.Do(func() {})
|
||||
|
||||
c.start()
|
||||
|
||||
return c, nil
|
||||
}
|
||||
|
||||
func (c *collector) start() {
|
||||
c.startOnce.Do(func() {
|
||||
if c.persist.enable && c.persist.interval != 0 {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
c.persist.done = cancel
|
||||
go c.persister(ctx, c.persist.interval)
|
||||
}
|
||||
c.lock.run.Lock()
|
||||
defer c.lock.run.Unlock()
|
||||
|
||||
c.rxBitrate, _ = average.New(averageWindow, averageGranularity)
|
||||
c.txBitrate, _ = average.New(averageWindow, averageGranularity)
|
||||
if c.running {
|
||||
return
|
||||
}
|
||||
|
||||
c.stopOnce = sync.Once{}
|
||||
})
|
||||
c.running = true
|
||||
|
||||
c.rxBitrate, _ = average.New(averageWindow, averageGranularity)
|
||||
c.txBitrate, _ = average.New(averageWindow, averageGranularity)
|
||||
}
|
||||
|
||||
func (c *collector) Stop() {
|
||||
c.stopOnce.Do(func() {
|
||||
if c.persist.enable && c.persist.interval != 0 {
|
||||
c.persist.done()
|
||||
}
|
||||
c.lock.run.Lock()
|
||||
defer c.lock.run.Unlock()
|
||||
|
||||
c.lock.session.RLock()
|
||||
for _, sess := range c.sessions {
|
||||
// Cancel all current sessions
|
||||
sess.Cancel()
|
||||
}
|
||||
c.lock.session.RUnlock()
|
||||
|
||||
// Wait for all current sessions to finish
|
||||
c.sessionsWG.Wait()
|
||||
|
||||
c.Persist()
|
||||
|
||||
c.startOnce = sync.Once{}
|
||||
})
|
||||
}
|
||||
|
||||
func (c *collector) Persist() {
|
||||
c.lock.history.RLock()
|
||||
defer c.lock.history.RUnlock()
|
||||
|
||||
c.saveHistory(c.persist.fs, c.persist.path, &c.history)
|
||||
}
|
||||
|
||||
func (c *collector) persister(ctx context.Context, interval time.Duration) {
|
||||
ticker := time.NewTicker(interval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
c.Persist()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *collector) loadHistory(fs fs.Filesystem, path string, data *history) {
|
||||
if fs == nil {
|
||||
if !c.running {
|
||||
return
|
||||
}
|
||||
|
||||
c.logger.WithComponent("SessionStore").WithFields(log.Fields{
|
||||
"base": fs.Metadata("base"),
|
||||
"path": path,
|
||||
}).Debug().Log("Loading history")
|
||||
c.running = false
|
||||
|
||||
c.lock.persist.Lock()
|
||||
defer c.lock.persist.Unlock()
|
||||
c.lock.session.RLock()
|
||||
for _, sess := range c.sessions {
|
||||
// Cancel all current sessions
|
||||
sess.Cancel()
|
||||
}
|
||||
c.lock.session.RUnlock()
|
||||
|
||||
jsondata, err := fs.ReadFile(path)
|
||||
// Wait for all current sessions to finish
|
||||
c.sessionsWG.Wait()
|
||||
}
|
||||
|
||||
type historySnapshot struct {
|
||||
data []byte
|
||||
}
|
||||
|
||||
func (s *historySnapshot) Persist(sink SnapshotSink) error {
|
||||
if _, err := sink.Write(s.data); err != nil {
|
||||
sink.Cancel()
|
||||
return err
|
||||
}
|
||||
|
||||
return sink.Close()
|
||||
}
|
||||
|
||||
func (s *historySnapshot) Release() {
|
||||
s.data = nil
|
||||
}
|
||||
|
||||
func (c *collector) Snapshot() (Snapshot, error) {
|
||||
c.logger.Debug().Log("Creating history snapshot")
|
||||
|
||||
c.lock.history.Lock()
|
||||
defer c.lock.history.Unlock()
|
||||
|
||||
jsondata, err := json.MarshalIndent(&c.history, "", " ")
|
||||
if err != nil {
|
||||
return
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err = json.Unmarshal(jsondata, data); err != nil {
|
||||
return
|
||||
s := &historySnapshot{
|
||||
data: jsondata,
|
||||
}
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (c *collector) saveHistory(fs fs.Filesystem, path string, data *history) {
|
||||
if fs == nil {
|
||||
return
|
||||
func (c *collector) Restore(snapshot io.ReadCloser) error {
|
||||
if snapshot == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
c.logger.WithComponent("SessionStore").WithFields(log.Fields{
|
||||
"base": fs.Metadata("base"),
|
||||
"path": path,
|
||||
}).Debug().Log("Storing history")
|
||||
defer snapshot.Close()
|
||||
|
||||
c.lock.persist.Lock()
|
||||
defer c.lock.persist.Unlock()
|
||||
c.logger.Debug().Log("Restoring history snapshot")
|
||||
|
||||
jsondata, err := json.MarshalIndent(data, "", " ")
|
||||
jsondata, err := io.ReadAll(snapshot)
|
||||
if err != nil {
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
_, _, err = fs.WriteFileSafe(path, jsondata)
|
||||
if err != nil {
|
||||
return
|
||||
data := history{}
|
||||
|
||||
if err = json.Unmarshal(jsondata, &data); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.lock.history.Lock()
|
||||
defer c.lock.history.Unlock()
|
||||
|
||||
c.history = data
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *collector) IsCollectableIP(ip string) bool {
|
||||
@@ -838,6 +835,7 @@ func (c *collector) Active() []Session {
|
||||
}
|
||||
|
||||
session := Session{
|
||||
Collector: c.id,
|
||||
ID: sess.id,
|
||||
Reference: sess.reference,
|
||||
CreatedAt: sess.createdAt,
|
||||
@@ -953,3 +951,5 @@ func (n *nullCollector) CompanionEgressBitrate() float64
|
||||
func (n *nullCollector) CompanionTopIngressBitrate() float64 { return 0.0 }
|
||||
func (n *nullCollector) CompanionTopEgressBitrate() float64 { return 0.0 }
|
||||
func (n *nullCollector) Stop() {}
|
||||
func (n *nullCollector) Snapshot() (Snapshot, error) { return nil, nil }
|
||||
func (n *nullCollector) Restore(snapshot io.ReadCloser) error { return snapshot.Close() }
|
||||
|
@@ -7,11 +7,15 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestRegisterSession(t *testing.T) {
|
||||
c, err := newCollector("", nil, nil, CollectorConfig{
|
||||
InactiveTimeout: time.Hour,
|
||||
SessionTimeout: time.Hour,
|
||||
func createCollector(inactive, session time.Duration) (*collector, error) {
|
||||
return newCollector("", nil, nil, CollectorConfig{
|
||||
InactiveTimeout: inactive,
|
||||
SessionTimeout: session,
|
||||
})
|
||||
}
|
||||
|
||||
func TestRegisterSession(t *testing.T) {
|
||||
c, err := createCollector(time.Hour, time.Hour)
|
||||
require.Equal(t, nil, err)
|
||||
|
||||
b := c.IsKnownSession("foobar")
|
||||
@@ -31,10 +35,7 @@ func TestRegisterSession(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestInactiveSession(t *testing.T) {
|
||||
c, err := newCollector("", nil, nil, CollectorConfig{
|
||||
InactiveTimeout: time.Second,
|
||||
SessionTimeout: time.Hour,
|
||||
})
|
||||
c, err := createCollector(time.Second, time.Hour)
|
||||
require.Equal(t, nil, err)
|
||||
|
||||
b := c.IsKnownSession("foobar")
|
||||
@@ -52,10 +53,7 @@ func TestInactiveSession(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestActivateSession(t *testing.T) {
|
||||
c, err := newCollector("", nil, nil, CollectorConfig{
|
||||
InactiveTimeout: time.Second,
|
||||
SessionTimeout: time.Second,
|
||||
})
|
||||
c, err := createCollector(time.Second, time.Second)
|
||||
require.Equal(t, nil, err)
|
||||
|
||||
b := c.IsKnownSession("foobar")
|
||||
@@ -73,10 +71,7 @@ func TestActivateSession(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestIngress(t *testing.T) {
|
||||
c, err := newCollector("", nil, nil, CollectorConfig{
|
||||
InactiveTimeout: time.Second,
|
||||
SessionTimeout: time.Hour,
|
||||
})
|
||||
c, err := createCollector(time.Second, time.Hour)
|
||||
require.Equal(t, nil, err)
|
||||
|
||||
c.RegisterAndActivate("foobar", "", "", "")
|
||||
@@ -92,10 +87,7 @@ func TestIngress(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestEgress(t *testing.T) {
|
||||
c, err := newCollector("", nil, nil, CollectorConfig{
|
||||
InactiveTimeout: time.Second,
|
||||
SessionTimeout: time.Hour,
|
||||
})
|
||||
c, err := createCollector(time.Second, time.Hour)
|
||||
require.Equal(t, nil, err)
|
||||
|
||||
c.RegisterAndActivate("foobar", "", "", "")
|
||||
@@ -111,10 +103,7 @@ func TestEgress(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestNbSessions(t *testing.T) {
|
||||
c, err := newCollector("", nil, nil, CollectorConfig{
|
||||
InactiveTimeout: time.Hour,
|
||||
SessionTimeout: time.Hour,
|
||||
})
|
||||
c, err := createCollector(time.Hour, time.Hour)
|
||||
require.Equal(t, nil, err)
|
||||
|
||||
nsessions := c.Sessions()
|
||||
|
@@ -1,12 +1,17 @@
|
||||
package session
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"regexp"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/datarhei/core/v16/io/fs"
|
||||
"github.com/datarhei/core/v16/log"
|
||||
"github.com/lestrrat-go/strftime"
|
||||
)
|
||||
|
||||
// Config is the configuration for creating a new registry
|
||||
@@ -15,6 +20,19 @@ type Config struct {
|
||||
// history will not be persisted.
|
||||
PersistFS fs.Filesystem
|
||||
|
||||
// PersistInterval is the duration between persisting the history. Can be 0. Then the history will
|
||||
// only be persisted at stopping the collector.
|
||||
PersistInterval time.Duration
|
||||
|
||||
// SessionLogPattern is a path inside the PersistFS where the individual sessions will
|
||||
// be logged. The path can contain strftime-plateholders in order to split the log files.
|
||||
// If this string is empty or PersistFS is nil, the sessions will not be logged.
|
||||
LogPattern string
|
||||
|
||||
// SessionLogBufferDuration is the maximum duration session logs should be buffered before written
|
||||
// to the filesystem. If not provided, the default of 15 seconds will be used.
|
||||
LogBufferDuration time.Duration
|
||||
|
||||
// Logger is an instance of a logger. If it is nil, no logs
|
||||
// will be written.
|
||||
Logger log.Logger
|
||||
@@ -47,32 +65,212 @@ type Registry interface {
|
||||
UnregisterAll()
|
||||
|
||||
RegistryReader
|
||||
|
||||
Close() error
|
||||
}
|
||||
|
||||
type registry struct {
|
||||
collector map[string]*collector
|
||||
persistFS fs.Filesystem
|
||||
logger log.Logger
|
||||
|
||||
persist struct {
|
||||
fs fs.Filesystem
|
||||
interval time.Duration
|
||||
cancel context.CancelFunc
|
||||
sessionsCh chan Session
|
||||
sessionsWg sync.WaitGroup
|
||||
logPattern *strftime.Strftime
|
||||
logBufferDuration time.Duration
|
||||
lock sync.Mutex
|
||||
}
|
||||
|
||||
logger log.Logger
|
||||
|
||||
lock sync.Mutex
|
||||
}
|
||||
|
||||
// New returns a new registry for collectors that implement the Registry interface. The error
|
||||
// is non-nil if the PersistDir from the config can't be created.
|
||||
func New(conf Config) (Registry, error) {
|
||||
// is non-nil if the registry can't be created.
|
||||
func New(config Config) (Registry, error) {
|
||||
r := ®istry{
|
||||
collector: make(map[string]*collector),
|
||||
persistFS: conf.PersistFS,
|
||||
logger: conf.Logger,
|
||||
logger: config.Logger,
|
||||
}
|
||||
|
||||
r.persist.fs = config.PersistFS
|
||||
r.persist.interval = config.PersistInterval
|
||||
|
||||
if r.logger == nil {
|
||||
r.logger = log.New("Session")
|
||||
}
|
||||
|
||||
pattern, err := strftime.New(config.LogPattern)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
r.persist.logPattern = pattern
|
||||
r.persist.logBufferDuration = config.LogBufferDuration
|
||||
if r.persist.logBufferDuration <= 0 {
|
||||
r.persist.logBufferDuration = 15 * time.Second
|
||||
}
|
||||
|
||||
r.startPersister()
|
||||
|
||||
return r, nil
|
||||
}
|
||||
|
||||
func (r *registry) Close() error {
|
||||
r.UnregisterAll()
|
||||
r.stopPersister()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *registry) startPersister() {
|
||||
if r.persist.fs == nil {
|
||||
return
|
||||
}
|
||||
|
||||
r.persist.lock.Lock()
|
||||
defer r.persist.lock.Unlock()
|
||||
|
||||
if r.persist.interval > 0 {
|
||||
if r.persist.cancel == nil {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
r.persist.cancel = cancel
|
||||
|
||||
go r.historyPersister(ctx, r.persist.interval)
|
||||
}
|
||||
}
|
||||
|
||||
if r.persist.logPattern != nil {
|
||||
r.persist.sessionsCh = make(chan Session, 128)
|
||||
r.persist.sessionsWg.Add(1)
|
||||
}
|
||||
|
||||
go r.sessionPersister(r.persist.logPattern, r.persist.logBufferDuration, r.persist.sessionsCh)
|
||||
}
|
||||
|
||||
func (r *registry) stopPersister() {
|
||||
if r.persist.fs == nil {
|
||||
return
|
||||
}
|
||||
|
||||
r.persist.lock.Lock()
|
||||
defer r.persist.lock.Unlock()
|
||||
|
||||
if r.persist.cancel != nil {
|
||||
r.persist.cancel()
|
||||
r.persist.cancel = nil
|
||||
}
|
||||
|
||||
if r.persist.sessionsCh != nil {
|
||||
close(r.persist.sessionsCh)
|
||||
r.persist.sessionsWg.Wait()
|
||||
r.persist.sessionsCh = nil
|
||||
}
|
||||
}
|
||||
|
||||
func (r *registry) historyPersister(ctx context.Context, interval time.Duration) {
|
||||
ticker := time.NewTicker(interval)
|
||||
defer ticker.Stop()
|
||||
|
||||
r.logger.Debug().WithField("interval", interval).Log("History persister started")
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
r.persistAllCollectors()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *registry) sessionPersister(pattern *strftime.Strftime, bufferDuration time.Duration, ch <-chan Session) {
|
||||
defer r.persist.sessionsWg.Done()
|
||||
|
||||
r.logger.Debug().WithFields(log.Fields{
|
||||
"pattern": pattern.Pattern(),
|
||||
"buffer": bufferDuration,
|
||||
}).Log("Session persister started")
|
||||
|
||||
buffer := &bytes.Buffer{}
|
||||
path := pattern.FormatString(time.Now())
|
||||
|
||||
enc := json.NewEncoder(buffer)
|
||||
|
||||
ticker := time.NewTicker(bufferDuration)
|
||||
defer ticker.Stop()
|
||||
|
||||
loop:
|
||||
for {
|
||||
select {
|
||||
case session, ok := <-ch:
|
||||
if !ok {
|
||||
break loop
|
||||
}
|
||||
currentPath := pattern.FormatString(session.ClosedAt)
|
||||
if currentPath != path {
|
||||
if buffer.Len() > 0 {
|
||||
_, _, err := r.persist.fs.WriteFileSafe(path, buffer.Bytes())
|
||||
r.logger.Error().WithError(err).WithField("path", path).Log("")
|
||||
}
|
||||
buffer.Reset()
|
||||
path = currentPath
|
||||
}
|
||||
|
||||
enc.Encode(&session)
|
||||
case t := <-ticker.C:
|
||||
if buffer.Len() > 0 {
|
||||
_, _, err := r.persist.fs.WriteFileSafe(path, buffer.Bytes())
|
||||
r.logger.Error().WithError(err).WithField("path", path).Log("")
|
||||
}
|
||||
currentPath := pattern.FormatString(t)
|
||||
if currentPath != path {
|
||||
buffer.Reset()
|
||||
path = currentPath
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if buffer.Len() > 0 {
|
||||
_, _, err := r.persist.fs.WriteFileSafe(path, buffer.Bytes())
|
||||
r.logger.Error().WithError(err).WithField("path", path).Log("")
|
||||
}
|
||||
|
||||
buffer = nil
|
||||
}
|
||||
|
||||
func (r *registry) persistAllCollectors() {
|
||||
wg := sync.WaitGroup{}
|
||||
|
||||
r.lock.Lock()
|
||||
for id, m := range r.collector {
|
||||
wg.Add(1)
|
||||
go func(id string, m Collector) {
|
||||
defer wg.Done()
|
||||
|
||||
s, err := m.Snapshot()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
sink, err := NewHistorySink(r.persist.fs, "/"+id+".json")
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if err := s.Persist(sink); err != nil {
|
||||
return
|
||||
}
|
||||
}(id, m)
|
||||
}
|
||||
r.lock.Unlock()
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func (r *registry) Register(id string, conf CollectorConfig) (Collector, error) {
|
||||
if len(id) == 0 {
|
||||
return nil, fmt.Errorf("invalid ID. empty IDs are not allowed")
|
||||
@@ -91,11 +289,22 @@ func (r *registry) Register(id string, conf CollectorConfig) (Collector, error)
|
||||
return nil, fmt.Errorf("a collector with the ID '%s' already exists", id)
|
||||
}
|
||||
|
||||
m, err := newCollector(id, r.persistFS, r.logger, conf)
|
||||
m, err := newCollector(id, r.persist.sessionsCh, r.logger.WithComponent(id), conf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if r.persist.fs != nil {
|
||||
s, err := NewHistorySource(r.persist.fs, "/"+id+".json")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := m.Restore(s); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
m.start()
|
||||
|
||||
r.collector[id] = m
|
||||
@@ -107,6 +316,10 @@ func (r *registry) Unregister(id string) error {
|
||||
r.lock.Lock()
|
||||
defer r.lock.Unlock()
|
||||
|
||||
return r.unregister(id)
|
||||
}
|
||||
|
||||
func (r *registry) unregister(id string) error {
|
||||
m, ok := r.collector[id]
|
||||
if !ok {
|
||||
return fmt.Errorf("a collector with the ID '%s' doesn't exist", id)
|
||||
@@ -116,6 +329,24 @@ func (r *registry) Unregister(id string) error {
|
||||
|
||||
delete(r.collector, id)
|
||||
|
||||
if r.persist.fs != nil {
|
||||
s, err := m.Snapshot()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if s != nil {
|
||||
sink, err := NewHistorySink(r.persist.fs, "/"+id+".json")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := s.Persist(sink); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -148,8 +379,8 @@ func (r *registry) UnregisterAll() {
|
||||
r.lock.Lock()
|
||||
defer r.lock.Unlock()
|
||||
|
||||
for _, m := range r.collector {
|
||||
m.Stop()
|
||||
for id := range r.collector {
|
||||
r.unregister(id)
|
||||
}
|
||||
|
||||
r.collector = make(map[string]*collector)
|
||||
|
@@ -2,56 +2,68 @@ package session
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/datarhei/core/v16/io/fs"
|
||||
"github.com/lestrrat-go/strftime"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestRegister(t *testing.T) {
|
||||
r, err := New(Config{})
|
||||
require.Equal(t, nil, err)
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
r.Close()
|
||||
})
|
||||
|
||||
_, err = r.Register("", CollectorConfig{})
|
||||
require.NotEqual(t, nil, err)
|
||||
require.Error(t, err)
|
||||
|
||||
_, err = r.Register("../foo/bar", CollectorConfig{})
|
||||
require.NotEqual(t, nil, err)
|
||||
require.Error(t, err)
|
||||
|
||||
_, err = r.Register("foobar", CollectorConfig{})
|
||||
require.Equal(t, nil, err)
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = r.Register("foobar", CollectorConfig{})
|
||||
require.NotEqual(t, nil, err)
|
||||
require.Error(t, err)
|
||||
}
|
||||
|
||||
func TestUnregister(t *testing.T) {
|
||||
r, err := New(Config{})
|
||||
require.Equal(t, nil, err)
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
r.Close()
|
||||
})
|
||||
|
||||
_, err = r.Register("foobar", CollectorConfig{})
|
||||
require.Equal(t, nil, err)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = r.Unregister("foobar")
|
||||
require.Equal(t, nil, err)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = r.Unregister("foobar")
|
||||
require.NotEqual(t, nil, err)
|
||||
require.Error(t, err)
|
||||
}
|
||||
|
||||
func TestCollectors(t *testing.T) {
|
||||
r, err := New(Config{})
|
||||
require.Equal(t, nil, err)
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
r.Close()
|
||||
})
|
||||
|
||||
c := r.Collectors()
|
||||
require.Equal(t, []string{}, c)
|
||||
|
||||
_, err = r.Register("foobar", CollectorConfig{})
|
||||
require.Equal(t, nil, err)
|
||||
require.NoError(t, err)
|
||||
|
||||
c = r.Collectors()
|
||||
require.Equal(t, []string{"foobar"}, c)
|
||||
|
||||
err = r.Unregister("foobar")
|
||||
require.Equal(t, nil, err)
|
||||
require.NoError(t, err)
|
||||
|
||||
c = r.Collectors()
|
||||
require.Equal(t, []string{}, c)
|
||||
@@ -59,33 +71,39 @@ func TestCollectors(t *testing.T) {
|
||||
|
||||
func TestGetCollector(t *testing.T) {
|
||||
r, err := New(Config{})
|
||||
require.Equal(t, nil, err)
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
r.Close()
|
||||
})
|
||||
|
||||
c := r.Collector("foobar")
|
||||
require.Equal(t, nil, c)
|
||||
require.Nil(t, c)
|
||||
|
||||
_, err = r.Register("foobar", CollectorConfig{})
|
||||
require.Equal(t, nil, err)
|
||||
require.NoError(t, err)
|
||||
|
||||
c = r.Collector("foobar")
|
||||
require.NotEqual(t, nil, c)
|
||||
require.NotNil(t, c)
|
||||
|
||||
err = r.Unregister("foobar")
|
||||
require.Equal(t, nil, err)
|
||||
require.NoError(t, err)
|
||||
|
||||
c = r.Collector("foobar")
|
||||
require.Equal(t, nil, c)
|
||||
require.Nil(t, c)
|
||||
}
|
||||
|
||||
func TestUnregisterAll(t *testing.T) {
|
||||
r, err := New(Config{})
|
||||
require.Equal(t, nil, err)
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
r.Close()
|
||||
})
|
||||
|
||||
_, err = r.Register("foo", CollectorConfig{})
|
||||
require.Equal(t, nil, err)
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = r.Register("bar", CollectorConfig{})
|
||||
require.Equal(t, nil, err)
|
||||
require.NoError(t, err)
|
||||
|
||||
c := r.Collectors()
|
||||
require.ElementsMatch(t, []string{"foo", "bar"}, c)
|
||||
@@ -95,3 +113,190 @@ func TestUnregisterAll(t *testing.T) {
|
||||
c = r.Collectors()
|
||||
require.Equal(t, []string{}, c)
|
||||
}
|
||||
|
||||
func TestPersistHistory(t *testing.T) {
|
||||
memfs, err := fs.NewMemFilesystem(fs.MemConfig{})
|
||||
require.NoError(t, err)
|
||||
|
||||
r, err := New(Config{
|
||||
PersistFS: memfs,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
r.Close()
|
||||
})
|
||||
|
||||
c, err := r.Register("foobar", CollectorConfig{})
|
||||
require.NoError(t, err)
|
||||
|
||||
c.RegisterAndActivate("foo", "ref", "location", "peer")
|
||||
c.Egress("foo", 42)
|
||||
|
||||
err = r.Unregister("foobar")
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = memfs.Stat("/foobar.json")
|
||||
require.NoError(t, err)
|
||||
|
||||
c, err = r.Register("foobar", CollectorConfig{})
|
||||
require.NoError(t, err)
|
||||
|
||||
cc := c.(*collector)
|
||||
totals, ok := cc.history.Sessions["location:peer:ref"]
|
||||
require.True(t, ok)
|
||||
require.Equal(t, "location", totals.Location)
|
||||
require.Equal(t, "ref", totals.Reference)
|
||||
require.Equal(t, "peer", totals.Peer)
|
||||
require.Equal(t, uint64(42), totals.TotalTxBytes)
|
||||
}
|
||||
|
||||
func TestPeriodicPersistHistory(t *testing.T) {
|
||||
memfs, err := fs.NewMemFilesystem(fs.MemConfig{})
|
||||
require.NoError(t, err)
|
||||
|
||||
r, err := New(Config{
|
||||
PersistFS: memfs,
|
||||
PersistInterval: 5 * time.Second,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
r.Close()
|
||||
})
|
||||
|
||||
c, err := r.Register("foobar", CollectorConfig{
|
||||
SessionTimeout: time.Second,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
c.RegisterAndActivate("foo", "ref", "location", "peer")
|
||||
c.Egress("foo", 42)
|
||||
|
||||
require.Eventually(t, func() bool {
|
||||
_, err = memfs.Stat("/foobar.json")
|
||||
return err == nil
|
||||
}, 10*time.Second, time.Second)
|
||||
|
||||
err = r.Unregister("foobar")
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = memfs.Stat("/foobar.json")
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestPersistSession(t *testing.T) {
|
||||
memfs, err := fs.NewMemFilesystem(fs.MemConfig{})
|
||||
require.NoError(t, err)
|
||||
|
||||
pattern := "/log/%Y-%m-%d.log"
|
||||
|
||||
r, err := New(Config{
|
||||
PersistFS: memfs,
|
||||
LogPattern: pattern,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
r.Close()
|
||||
})
|
||||
|
||||
c, err := r.Register("foobar", CollectorConfig{
|
||||
SessionTimeout: 3 * time.Second,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
c.RegisterAndActivate("foo", "ref", "location", "peer")
|
||||
c.Egress("foo", 42)
|
||||
|
||||
err = r.Unregister("foobar")
|
||||
require.NoError(t, err)
|
||||
|
||||
r.Close()
|
||||
|
||||
path, err := strftime.Format(pattern, time.Now())
|
||||
require.NoError(t, err)
|
||||
|
||||
info, err := memfs.Stat(path)
|
||||
require.NoError(t, err)
|
||||
require.Greater(t, info.Size(), int64(0))
|
||||
}
|
||||
|
||||
func TestPersistSessionSlpit(t *testing.T) {
|
||||
memfs, err := fs.NewMemFilesystem(fs.MemConfig{})
|
||||
require.NoError(t, err)
|
||||
|
||||
pattern := "/log/%Y-%m-%d-%H:%M:%S.log"
|
||||
|
||||
r, err := New(Config{
|
||||
PersistFS: memfs,
|
||||
LogPattern: pattern,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
r.Close()
|
||||
})
|
||||
|
||||
c, err := r.Register("foobar", CollectorConfig{
|
||||
SessionTimeout: 3 * time.Second,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
c.RegisterAndActivate("foo", "ref", "location", "peer")
|
||||
c.Egress("foo", 42)
|
||||
|
||||
time.Sleep(3 * time.Second)
|
||||
|
||||
c.RegisterAndActivate("bar", "ref", "location", "peer")
|
||||
c.Egress("bar", 24)
|
||||
|
||||
err = r.Unregister("foobar")
|
||||
require.NoError(t, err)
|
||||
|
||||
r.Close()
|
||||
|
||||
require.Equal(t, int64(2), memfs.Files())
|
||||
}
|
||||
|
||||
func TestPersistSessionBuffer(t *testing.T) {
|
||||
memfs, err := fs.NewMemFilesystem(fs.MemConfig{})
|
||||
require.NoError(t, err)
|
||||
|
||||
pattern := "/log/%Y-%m-%d.log"
|
||||
|
||||
r, err := New(Config{
|
||||
PersistFS: memfs,
|
||||
LogPattern: pattern,
|
||||
LogBufferDuration: 5 * time.Second,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
r.Close()
|
||||
})
|
||||
|
||||
c, err := r.Register("foobar", CollectorConfig{
|
||||
SessionTimeout: 3 * time.Second,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
c.RegisterAndActivate("foo", "ref", "location", "peer")
|
||||
c.Egress("foo", 42)
|
||||
|
||||
require.Eventually(t, func() bool {
|
||||
path, err := strftime.Format(pattern, time.Now())
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
_, err = memfs.Stat(path)
|
||||
return err == nil
|
||||
}, 10*time.Second, time.Second)
|
||||
|
||||
err = r.Unregister("foobar")
|
||||
require.NoError(t, err)
|
||||
|
||||
r.Close()
|
||||
|
||||
path, err := strftime.Format(pattern, time.Now())
|
||||
require.NoError(t, err)
|
||||
|
||||
info, err := memfs.Stat(path)
|
||||
require.NoError(t, err)
|
||||
require.Greater(t, info.Size(), int64(0))
|
||||
}
|
||||
|
94
session/snapshot.go
Normal file
94
session/snapshot.go
Normal file
@@ -0,0 +1,94 @@
|
||||
package session
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"io"
|
||||
"os"
|
||||
|
||||
"github.com/datarhei/core/v16/io/fs"
|
||||
)
|
||||
|
||||
type SnapshotSource interface {
|
||||
io.ReadCloser
|
||||
}
|
||||
|
||||
type SnapshotSink interface {
|
||||
io.WriteCloser
|
||||
|
||||
Cancel() error
|
||||
}
|
||||
|
||||
type Snapshot interface {
|
||||
Persist(sink SnapshotSink) error
|
||||
Release()
|
||||
}
|
||||
|
||||
type historySource struct {
|
||||
fs fs.Filesystem
|
||||
path string
|
||||
data *bytes.Reader
|
||||
}
|
||||
|
||||
// NewHistorySource returns a new SnapshotSource which reads the previously stored
|
||||
// session history. If there's no data, a nil source with a nil error will be returned.
|
||||
// If there's data, a non-nil source with a nil error will be returned. Otherwise
|
||||
// the source will be nil and the error non-nil.
|
||||
func NewHistorySource(fs fs.Filesystem, path string) (SnapshotSource, error) {
|
||||
s := &historySource{
|
||||
fs: fs,
|
||||
path: path,
|
||||
}
|
||||
|
||||
if _, err := s.fs.Stat(s.path); err == os.ErrNotExist {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
data, err := s.fs.ReadFile(s.path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
s.data = bytes.NewReader(data)
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (s *historySource) Read(p []byte) (int, error) {
|
||||
return s.data.Read(p)
|
||||
}
|
||||
|
||||
func (s *historySource) Close() error {
|
||||
s.data = nil
|
||||
return nil
|
||||
}
|
||||
|
||||
type historySink struct {
|
||||
fs fs.Filesystem
|
||||
path string
|
||||
data *bytes.Buffer
|
||||
}
|
||||
|
||||
func NewHistorySink(fs fs.Filesystem, path string) (SnapshotSink, error) {
|
||||
s := &historySink{
|
||||
fs: fs,
|
||||
path: path,
|
||||
data: &bytes.Buffer{},
|
||||
}
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (s *historySink) Write(p []byte) (int, error) {
|
||||
return s.data.Write(p)
|
||||
}
|
||||
|
||||
func (s *historySink) Close() error {
|
||||
_, _, err := s.fs.WriteFileSafe(s.path, s.data.Bytes())
|
||||
s.data = nil
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *historySink) Cancel() error {
|
||||
s.data = nil
|
||||
return nil
|
||||
}
|
16
vendor/github.com/datarhei/core-client-go/v16/api/config.go
generated
vendored
16
vendor/github.com/datarhei/core-client-go/v16/api/config.go
generated
vendored
@@ -430,13 +430,15 @@ type ConfigV3 struct {
|
||||
Interval int64 `json:"interval_sec" format:"int64"` // seconds
|
||||
} `json:"metrics"`
|
||||
Sessions struct {
|
||||
Enable bool `json:"enable"`
|
||||
IPIgnoreList []string `json:"ip_ignorelist"`
|
||||
SessionTimeout int `json:"session_timeout_sec" format:"int"`
|
||||
Persist bool `json:"persist"`
|
||||
PersistInterval int `json:"persist_interval_sec" format:"int"`
|
||||
MaxBitrate uint64 `json:"max_bitrate_mbit" format:"uint64"`
|
||||
MaxSessions uint64 `json:"max_sessions" format:"uint64"`
|
||||
Enable bool `json:"enable"`
|
||||
IPIgnoreList []string `json:"ip_ignorelist"`
|
||||
Persist bool `json:"persist"`
|
||||
PersistInterval int `json:"persist_interval_sec" format:"int"`
|
||||
SessionTimeout int `json:"session_timeout_sec" format:"int"`
|
||||
SessionLogPathPattern string `json:"session_log_path_pattern"`
|
||||
SessionLogBuffer int `json:"session_log_buffer_sec" format:"int"`
|
||||
MaxBitrate uint64 `json:"max_bitrate_mbit" format:"uint64"`
|
||||
MaxSessions uint64 `json:"max_sessions" format:"uint64"`
|
||||
} `json:"sessions"`
|
||||
Service struct {
|
||||
Enable bool `json:"enable"`
|
||||
|
2
vendor/modules.txt
vendored
2
vendor/modules.txt
vendored
@@ -78,7 +78,7 @@ github.com/cespare/xxhash/v2
|
||||
# github.com/cpuguy83/go-md2man/v2 v2.0.2
|
||||
## explicit; go 1.11
|
||||
github.com/cpuguy83/go-md2man/v2/md2man
|
||||
# github.com/datarhei/core-client-go/v16 v16.11.1-0.20230614141756-a25a5fc3c60e
|
||||
# github.com/datarhei/core-client-go/v16 v16.11.1-0.20230620131644-140b3a61d4c8
|
||||
## explicit; go 1.18
|
||||
github.com/datarhei/core-client-go/v16
|
||||
github.com/datarhei/core-client-go/v16/api
|
||||
|
Reference in New Issue
Block a user