mirror of
				https://github.com/datarhei/core.git
				synced 2025-10-31 11:26:52 +08:00 
			
		
		
		
	
		
			
				
	
	
		
			966 lines
		
	
	
		
			23 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			966 lines
		
	
	
		
			23 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package session
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"encoding/json"
 | |
| 	"os"
 | |
| 	"path/filepath"
 | |
| 	"sort"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/datarhei/core/v16/io/file"
 | |
| 	"github.com/datarhei/core/v16/log"
 | |
| 	"github.com/datarhei/core/v16/net"
 | |
| 
 | |
| 	"github.com/prep/average"
 | |
| )
 | |
| 
 | |
| // Session represents an active session
 | |
| type Session struct {
 | |
| 	ID           string
 | |
| 	Reference    string
 | |
| 	CreatedAt    time.Time
 | |
| 	Location     string
 | |
| 	Peer         string
 | |
| 	Extra        string
 | |
| 	RxBytes      uint64
 | |
| 	RxBitrate    float64 // bit/s
 | |
| 	TopRxBitrate float64 // bit/s
 | |
| 	TxBytes      uint64
 | |
| 	TxBitrate    float64 // bit/s
 | |
| 	TopTxBitrate float64 // bit/s
 | |
| }
 | |
| 
 | |
| // Summary is a summary over all current and past sessions.
 | |
| // The past sessions are grouped over the Peers/Locations and
 | |
| // the Locations.
 | |
| type Summary struct {
 | |
| 	MaxSessions  uint64
 | |
| 	MaxRxBitrate float64 // bit/s
 | |
| 	MaxTxBitrate float64 // bit/s
 | |
| 
 | |
| 	CurrentSessions  uint64
 | |
| 	CurrentRxBitrate float64 // bit/s
 | |
| 	CurrentTxBitrate float64 // bit/s
 | |
| 
 | |
| 	Active []Session
 | |
| 
 | |
| 	Summary struct {
 | |
| 		Peers      map[string]Peers
 | |
| 		Locations  map[string]Stats
 | |
| 		References map[string]Stats
 | |
| 		Stats
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Peers is a group of the same peer grouped of the locations.
 | |
| type Peers struct {
 | |
| 	Stats
 | |
| 
 | |
| 	Locations map[string]Stats
 | |
| }
 | |
| 
 | |
| // Stats holds the basic accumulated values like the number of sessions,
 | |
| // total transmitted and received bytes.
 | |
| type Stats struct {
 | |
| 	TotalSessions uint64
 | |
| 	TotalRxBytes  uint64
 | |
| 	TotalTxBytes  uint64
 | |
| }
 | |
| 
 | |
| // The Collector interface
 | |
| type Collector interface {
 | |
| 	// Register registers a new session. A session has to be activated in order
 | |
| 	// not to be dropped. A different id distinguishes different sessions.
 | |
| 	Register(id, reference, location, peer string)
 | |
| 
 | |
| 	// Activate activates the session with the id. Returns true if the session
 | |
| 	// has been activated, false if the session was already activated.
 | |
| 	Activate(id string) bool
 | |
| 
 | |
| 	// RegisterAndActivate registers and an activates a session.
 | |
| 	RegisterAndActivate(id, reference, location, peer string)
 | |
| 
 | |
| 	// Add arbitrary extra data to a session
 | |
| 	Extra(id, extra string)
 | |
| 
 | |
| 	// Unregister cancels a session prematurely.
 | |
| 	Unregister(id string)
 | |
| 
 | |
| 	// Ingress adds size bytes of ingress traffic to a session.
 | |
| 	Ingress(id string, size int64)
 | |
| 
 | |
| 	// Egress adds size bytes of egress traffic to a session.
 | |
| 	Egress(id string, size int64)
 | |
| 
 | |
| 	// IngressBitrate returns the current bitrate of ingress traffic.
 | |
| 	IngressBitrate() float64
 | |
| 
 | |
| 	// EgressBitrate returns the current bitrate of egress traffic.
 | |
| 	EgressBitrate() float64
 | |
| 
 | |
| 	// MaxIngressBitrate return the defined maximum ingress bitrate. All values <= 0
 | |
| 	// mean no limit.
 | |
| 	MaxIngressBitrate() float64
 | |
| 
 | |
| 	// MaxEgressBitrate return the defined maximum egress bitrate. All values <= 0
 | |
| 	// mean no limit.
 | |
| 	MaxEgressBitrate() float64
 | |
| 
 | |
| 	// TopIngressBitrate returns the summed current top bitrates of all ingress sessions.
 | |
| 	TopIngressBitrate() float64
 | |
| 
 | |
| 	// TopEgressBitrate returns the summed current top bitrates of all egress sessions.
 | |
| 	TopEgressBitrate() float64
 | |
| 
 | |
| 	// IsIngressBitrateExceeded returns whether the defined maximum ingress bitrate has
 | |
| 	// been exceeded.
 | |
| 	IsIngressBitrateExceeded() bool
 | |
| 
 | |
| 	// IsEgressBitrateExceeded returns whether the defined maximum egress bitrate has
 | |
| 	// been exceeded.
 | |
| 	IsEgressBitrateExceeded() bool
 | |
| 
 | |
| 	// IsSessionsExceeded return whether the maximum number of session have been exceeded.
 | |
| 	IsSessionsExceeded() bool
 | |
| 
 | |
| 	// IsKnowsession returns whether a session with the given id exists.
 | |
| 	IsKnownSession(id string) bool
 | |
| 
 | |
| 	// IsAllowedIP returns whether traffic from/to the given IP should be considered.
 | |
| 	IsCollectableIP(ip string) bool
 | |
| 
 | |
| 	// Summary returns the summary of all currently active sessions and the session history.
 | |
| 	Summary() Summary
 | |
| 
 | |
| 	// Active returns a list of currently active sessions.
 | |
| 	Active() []Session
 | |
| 
 | |
| 	// SessionIngressTopBitrate returns the top ingress bitrate of a specific session.
 | |
| 	SessionTopIngressBitrate(id string) float64
 | |
| 
 | |
| 	// SessionIngressTopBitrate returns the top egress bitrate of a specific session.
 | |
| 	SessionTopEgressBitrate(id string) float64
 | |
| 
 | |
| 	// SessionSetIngressTopBitrate sets the current top ingress bitrate of a session.
 | |
| 	SessionSetTopIngressBitrate(id string, bitrate float64)
 | |
| 
 | |
| 	// SessionSetEgressTopBitrate sets the current top egress bitrate of a session.
 | |
| 	SessionSetTopEgressBitrate(id string, bitrate float64)
 | |
| 
 | |
| 	// Sessions returns the number of currently active sessions.
 | |
| 	Sessions() uint64
 | |
| 
 | |
| 	AddCompanion(collector Collector)
 | |
| 
 | |
| 	// IngressBitrate returns the current bitrate of ingress traffic.
 | |
| 	CompanionIngressBitrate() float64
 | |
| 
 | |
| 	// EgressBitrate returns the current bitrate of egress traffic.
 | |
| 	CompanionEgressBitrate() float64
 | |
| 
 | |
| 	// TopIngressBitrate returns the summed current top bitrates of all ingress sessions.
 | |
| 	CompanionTopIngressBitrate() float64
 | |
| 
 | |
| 	// TopEgressBitrate returns the summed current top bitrates of all egress sessions.
 | |
| 	CompanionTopEgressBitrate() float64
 | |
| 
 | |
| 	// Stop stops the collector to calculate rates
 | |
| 	Stop()
 | |
| }
 | |
| 
 | |
| // CollectorConfig is the configuration for registering a new collector
 | |
| type CollectorConfig struct {
 | |
| 	// MaxRxBitrate is the maximum ingress bitrate. It is used to query whether
 | |
| 	// the maximum bitrate is reached, based on the actucal bitrate.
 | |
| 	MaxRxBitrate uint64
 | |
| 
 | |
| 	// MaxTxBitrate is the maximum egress bitrate. It is used to query whether
 | |
| 	// the maximum bitrate is reached, based on the actucal bitrate.
 | |
| 	MaxTxBitrate uint64
 | |
| 
 | |
| 	// MaxSessions is the maximum number of session. It is used to query whether
 | |
| 	// the maximum number of sessions is reached, based on the actual number
 | |
| 	// of pending and active sessions.
 | |
| 	MaxSessions uint64
 | |
| 
 | |
| 	// Limiter is an IPLimiter. It is used to query whether a session for an IP
 | |
| 	// should be created.
 | |
| 	Limiter net.IPLimiter
 | |
| 
 | |
| 	// InactiveTimeout is the duration of how long a not yet activated session is kept.
 | |
| 	// A session gets activated with the first ingress or egress bytes.
 | |
| 	InactiveTimeout time.Duration
 | |
| 
 | |
| 	// SessionTimeout is the duration of how long an idle active session is kept. A
 | |
| 	// session is idle if there are no ingress or egress bytes.
 | |
| 	SessionTimeout time.Duration
 | |
| 
 | |
| 	// PersistInterval is the duration between persisting the
 | |
| 	// history. Can be 0. Then the history will only be persisted
 | |
| 	// at stopping the collector.
 | |
| 	PersistInterval time.Duration
 | |
| }
 | |
| 
 | |
| type totals struct {
 | |
| 	Location      string `json:"location"`
 | |
| 	Peer          string `json:"peer"`
 | |
| 	Reference     string `json:"reference"`
 | |
| 	TotalSessions uint64 `json:"total_sessions"`
 | |
| 	TotalRxBytes  uint64 `json:"total_rxbytes"`
 | |
| 	TotalTxBytes  uint64 `json:"total_txbytes"`
 | |
| }
 | |
| 
 | |
| type history struct {
 | |
| 	Sessions map[string]totals `json:"sessions"` // key = `${session.location}:${session.peer}`
 | |
| }
 | |
| 
 | |
| type collector struct {
 | |
| 	id     string
 | |
| 	logger log.Logger
 | |
| 
 | |
| 	sessions    map[string]*session
 | |
| 	sessionPool sync.Pool
 | |
| 	sessionsWG  sync.WaitGroup
 | |
| 
 | |
| 	staleCallback func(*session)
 | |
| 
 | |
| 	currentPendingSessions uint64
 | |
| 	currentActiveSessions  uint64
 | |
| 
 | |
| 	totalSessions uint64
 | |
| 	rxBytes       uint64
 | |
| 	txBytes       uint64
 | |
| 
 | |
| 	maxRxBitrate float64
 | |
| 	maxTxBitrate float64
 | |
| 	maxSessions  uint64
 | |
| 
 | |
| 	rxBitrate *average.SlidingWindow
 | |
| 	txBitrate *average.SlidingWindow
 | |
| 
 | |
| 	history history
 | |
| 
 | |
| 	persist struct {
 | |
| 		enable   bool
 | |
| 		path     string
 | |
| 		interval time.Duration
 | |
| 		done     context.CancelFunc
 | |
| 	}
 | |
| 
 | |
| 	inactiveTimeout time.Duration
 | |
| 	sessionTimeout  time.Duration
 | |
| 
 | |
| 	limiter net.IPLimiter
 | |
| 
 | |
| 	companions []Collector
 | |
| 
 | |
| 	lock struct {
 | |
| 		session   sync.RWMutex
 | |
| 		history   sync.RWMutex
 | |
| 		persist   sync.Mutex
 | |
| 		companion sync.RWMutex
 | |
| 	}
 | |
| 
 | |
| 	startOnce sync.Once
 | |
| 	stopOnce  sync.Once
 | |
| }
 | |
| 
 | |
| const (
 | |
| 	averageWindow      = 10 * time.Second
 | |
| 	averageGranularity = time.Second
 | |
| )
 | |
| 
 | |
| // NewCollector returns a new collector according to the provided configuration. If such a
 | |
| // collector can't be created, a NullCollector is returned.
 | |
| func NewCollector(config CollectorConfig) Collector {
 | |
| 	collector, err := newCollector("", "", nil, config)
 | |
| 	if err != nil {
 | |
| 		return NewNullCollector()
 | |
| 	}
 | |
| 
 | |
| 	collector.start()
 | |
| 
 | |
| 	return collector
 | |
| }
 | |
| 
 | |
| func newCollector(id, persistPath string, logger log.Logger, config CollectorConfig) (*collector, error) {
 | |
| 	c := &collector{
 | |
| 		maxRxBitrate:    float64(config.MaxRxBitrate),
 | |
| 		maxTxBitrate:    float64(config.MaxTxBitrate),
 | |
| 		maxSessions:     config.MaxSessions,
 | |
| 		inactiveTimeout: config.InactiveTimeout,
 | |
| 		sessionTimeout:  config.SessionTimeout,
 | |
| 		limiter:         config.Limiter,
 | |
| 		logger:          logger,
 | |
| 		id:              id,
 | |
| 	}
 | |
| 
 | |
| 	if c.logger == nil {
 | |
| 		c.logger = log.New("Session")
 | |
| 	}
 | |
| 
 | |
| 	if c.limiter == nil {
 | |
| 		c.limiter, _ = net.NewIPLimiter(nil, nil)
 | |
| 	}
 | |
| 
 | |
| 	if c.sessionTimeout <= 0 {
 | |
| 		c.sessionTimeout = 5 * time.Second
 | |
| 	}
 | |
| 
 | |
| 	if c.inactiveTimeout <= 0 {
 | |
| 		c.inactiveTimeout = c.sessionTimeout
 | |
| 	}
 | |
| 
 | |
| 	c.sessionPool = sync.Pool{
 | |
| 		New: func() interface{} {
 | |
| 			return &session{}
 | |
| 		},
 | |
| 	}
 | |
| 
 | |
| 	c.staleCallback = func(sess *session) {
 | |
| 		defer func() {
 | |
| 			c.sessionsWG.Done()
 | |
| 		}()
 | |
| 
 | |
| 		c.lock.session.Lock()
 | |
| 		defer c.lock.session.Unlock()
 | |
| 
 | |
| 		delete(c.sessions, sess.id)
 | |
| 
 | |
| 		if !sess.active {
 | |
| 			c.currentPendingSessions--
 | |
| 
 | |
| 			sess.logger.Debug().Log("Closed pending")
 | |
| 
 | |
| 			return
 | |
| 		}
 | |
| 
 | |
| 		logger = sess.logger.WithFields(log.Fields{
 | |
| 			"rx_bytes":           sess.rxBytes,
 | |
| 			"rx_bitrate_kbit":    sess.RxBitrate() / 1024,
 | |
| 			"rx_maxbitrate_kbit": sess.MaxRxBitrate() / 1024,
 | |
| 			"tx_bytes":           sess.txBytes,
 | |
| 			"tx_bitrate_kbit":    sess.TxBitrate() / 1024,
 | |
| 			"tx_maxbitrate_kbit": sess.MaxTxBitrate() / 1024,
 | |
| 		})
 | |
| 
 | |
| 		// Only log session that have been active
 | |
| 		logger.Info().Log("Closed")
 | |
| 		logger.Debug().Log("Closed")
 | |
| 
 | |
| 		c.lock.history.Lock()
 | |
| 
 | |
| 		key := sess.location + ":" + sess.peer + ":" + sess.reference
 | |
| 
 | |
| 		// Update history totals per key
 | |
| 		t, ok := c.history.Sessions[key]
 | |
| 		t.TotalSessions++
 | |
| 		t.TotalRxBytes += sess.rxBytes
 | |
| 		t.TotalTxBytes += sess.txBytes
 | |
| 
 | |
| 		if !ok {
 | |
| 			t.Location = sess.location
 | |
| 			t.Peer = sess.peer
 | |
| 			t.Reference = sess.reference
 | |
| 		}
 | |
| 
 | |
| 		c.history.Sessions[key] = t
 | |
| 
 | |
| 		c.lock.history.Unlock()
 | |
| 
 | |
| 		c.sessionPool.Put(sess)
 | |
| 
 | |
| 		c.currentActiveSessions--
 | |
| 	}
 | |
| 
 | |
| 	c.sessions = make(map[string]*session)
 | |
| 
 | |
| 	c.history.Sessions = make(map[string]totals)
 | |
| 
 | |
| 	c.persist.enable = len(persistPath) != 0
 | |
| 	c.persist.path = persistPath
 | |
| 	c.persist.interval = config.PersistInterval
 | |
| 
 | |
| 	c.loadHistory(c.persist.path, &c.history)
 | |
| 
 | |
| 	c.stopOnce.Do(func() {})
 | |
| 
 | |
| 	c.start()
 | |
| 
 | |
| 	return c, nil
 | |
| }
 | |
| 
 | |
| func (c *collector) start() {
 | |
| 	c.startOnce.Do(func() {
 | |
| 		if c.persist.enable && c.persist.interval != 0 {
 | |
| 			ctx, cancel := context.WithCancel(context.Background())
 | |
| 			c.persist.done = cancel
 | |
| 			go c.persister(ctx, c.persist.interval)
 | |
| 		}
 | |
| 
 | |
| 		c.rxBitrate, _ = average.New(averageWindow, averageGranularity)
 | |
| 		c.txBitrate, _ = average.New(averageWindow, averageGranularity)
 | |
| 
 | |
| 		c.stopOnce = sync.Once{}
 | |
| 	})
 | |
| }
 | |
| 
 | |
| func (c *collector) Stop() {
 | |
| 	c.stopOnce.Do(func() {
 | |
| 		if c.persist.enable && c.persist.interval != 0 {
 | |
| 			c.persist.done()
 | |
| 		}
 | |
| 
 | |
| 		c.lock.session.RLock()
 | |
| 		for _, sess := range c.sessions {
 | |
| 			// Cancel all current sessions
 | |
| 			sess.Cancel()
 | |
| 		}
 | |
| 		c.lock.session.RUnlock()
 | |
| 
 | |
| 		// Wait for all current sessions to finish
 | |
| 		c.sessionsWG.Wait()
 | |
| 
 | |
| 		c.Persist()
 | |
| 
 | |
| 		c.startOnce = sync.Once{}
 | |
| 	})
 | |
| }
 | |
| 
 | |
| func (c *collector) Persist() {
 | |
| 	c.lock.history.RLock()
 | |
| 	defer c.lock.history.RUnlock()
 | |
| 
 | |
| 	c.saveHistory(c.persist.path, &c.history)
 | |
| }
 | |
| 
 | |
| func (c *collector) persister(ctx context.Context, interval time.Duration) {
 | |
| 	ticker := time.NewTicker(interval)
 | |
| 	defer ticker.Stop()
 | |
| 
 | |
| 	for {
 | |
| 		select {
 | |
| 		case <-ctx.Done():
 | |
| 			return
 | |
| 		case <-ticker.C:
 | |
| 			c.Persist()
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (c *collector) loadHistory(path string, data *history) {
 | |
| 	c.logger.WithComponent("SessionStore").WithField("path", path).Debug().Log("Loading history")
 | |
| 
 | |
| 	if len(path) == 0 {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	c.lock.persist.Lock()
 | |
| 	defer c.lock.persist.Unlock()
 | |
| 
 | |
| 	jsondata, err := os.ReadFile(path)
 | |
| 	if err != nil {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	if err = json.Unmarshal(jsondata, data); err != nil {
 | |
| 		return
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (c *collector) saveHistory(path string, data *history) {
 | |
| 	if len(path) == 0 {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	c.logger.WithComponent("SessionStore").WithField("path", path).Debug().Log("Storing history")
 | |
| 
 | |
| 	c.lock.persist.Lock()
 | |
| 	defer c.lock.persist.Unlock()
 | |
| 
 | |
| 	jsondata, err := json.MarshalIndent(data, "", "    ")
 | |
| 	if err != nil {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	dir := filepath.Dir(path)
 | |
| 	filename := filepath.Base(path)
 | |
| 
 | |
| 	tmpfile, err := os.CreateTemp(dir, filename)
 | |
| 	if err != nil {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	defer os.Remove(tmpfile.Name())
 | |
| 
 | |
| 	if _, err := tmpfile.Write(jsondata); err != nil {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	if err := tmpfile.Close(); err != nil {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	if err := file.Rename(tmpfile.Name(), path); err != nil {
 | |
| 		return
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (c *collector) IsCollectableIP(ip string) bool {
 | |
| 	return c.limiter.IsAllowed(ip)
 | |
| }
 | |
| 
 | |
| func (c *collector) IsKnownSession(id string) bool {
 | |
| 	c.lock.session.RLock()
 | |
| 	_, ok := c.sessions[id]
 | |
| 	c.lock.session.RUnlock()
 | |
| 
 | |
| 	return ok
 | |
| }
 | |
| 
 | |
| func (c *collector) RegisterAndActivate(id, reference, location, peer string) {
 | |
| 	c.Register(id, reference, location, peer)
 | |
| 	c.Activate(id)
 | |
| }
 | |
| 
 | |
| func (c *collector) Register(id, reference, location, peer string) {
 | |
| 	c.lock.session.Lock()
 | |
| 	defer c.lock.session.Unlock()
 | |
| 
 | |
| 	_, ok := c.sessions[id]
 | |
| 	if ok {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	logger := c.logger.WithFields(log.Fields{
 | |
| 		"id":        id,
 | |
| 		"type":      c.id,
 | |
| 		"location":  location,
 | |
| 		"peer":      peer,
 | |
| 		"reference": reference,
 | |
| 	})
 | |
| 
 | |
| 	c.sessionsWG.Add(1)
 | |
| 	sess := c.sessionPool.Get().(*session)
 | |
| 	sess.Init(id, reference, c.staleCallback, c.inactiveTimeout, c.sessionTimeout, logger)
 | |
| 
 | |
| 	logger.Debug().Log("Pending")
 | |
| 
 | |
| 	c.currentPendingSessions++
 | |
| 
 | |
| 	sess.Register(location, peer)
 | |
| 
 | |
| 	c.sessions[id] = sess
 | |
| }
 | |
| 
 | |
| func (c *collector) Unregister(id string) {
 | |
| 	c.lock.session.RLock()
 | |
| 	defer c.lock.session.RUnlock()
 | |
| 
 | |
| 	sess, ok := c.sessions[id]
 | |
| 	if ok {
 | |
| 		sess.Cancel()
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (c *collector) Activate(id string) bool {
 | |
| 	if len(id) == 0 {
 | |
| 		return false
 | |
| 	}
 | |
| 
 | |
| 	c.lock.session.RLock()
 | |
| 	sess, ok := c.sessions[id]
 | |
| 	c.lock.session.RUnlock()
 | |
| 
 | |
| 	if !ok {
 | |
| 		return false
 | |
| 	}
 | |
| 
 | |
| 	if sess.Activate() {
 | |
| 		c.currentPendingSessions--
 | |
| 		c.currentActiveSessions++
 | |
| 		c.totalSessions++
 | |
| 
 | |
| 		sess.logger.Info().Log("Active")
 | |
| 		sess.logger.Debug().Log("Active")
 | |
| 
 | |
| 		return true
 | |
| 	}
 | |
| 
 | |
| 	return false
 | |
| }
 | |
| 
 | |
| func (c *collector) Extra(id, extra string) {
 | |
| 	c.lock.session.RLock()
 | |
| 	sess, ok := c.sessions[id]
 | |
| 	c.lock.session.RUnlock()
 | |
| 
 | |
| 	if !ok {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	sess.Extra(extra)
 | |
| }
 | |
| 
 | |
| func (c *collector) Ingress(id string, size int64) {
 | |
| 	if len(id) == 0 {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	c.lock.session.RLock()
 | |
| 	sess, ok := c.sessions[id]
 | |
| 	c.lock.session.RUnlock()
 | |
| 
 | |
| 	if !ok {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	if sess.Ingress(size) {
 | |
| 		c.rxBitrate.Add(size * 8)
 | |
| 		c.rxBytes += uint64(size)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (c *collector) Egress(id string, size int64) {
 | |
| 	if len(id) == 0 {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	c.lock.session.RLock()
 | |
| 	sess, ok := c.sessions[id]
 | |
| 	c.lock.session.RUnlock()
 | |
| 
 | |
| 	if !ok {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	if sess.Egress(size) {
 | |
| 		c.txBitrate.Add(size * 8)
 | |
| 		c.txBytes += uint64(size)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (c *collector) IsIngressBitrateExceeded() bool {
 | |
| 	if c.maxRxBitrate <= 0 {
 | |
| 		return false
 | |
| 	}
 | |
| 
 | |
| 	if c.IngressBitrate() > c.maxRxBitrate {
 | |
| 		return true
 | |
| 	}
 | |
| 
 | |
| 	return false
 | |
| }
 | |
| 
 | |
| func (c *collector) IsEgressBitrateExceeded() bool {
 | |
| 	if c.maxTxBitrate <= 0 {
 | |
| 		return false
 | |
| 	}
 | |
| 
 | |
| 	if c.EgressBitrate() > c.maxTxBitrate {
 | |
| 		return true
 | |
| 	}
 | |
| 
 | |
| 	return false
 | |
| }
 | |
| 
 | |
| func (c *collector) IsSessionsExceeded() bool {
 | |
| 	if c.maxSessions <= 0 {
 | |
| 		return false
 | |
| 	}
 | |
| 
 | |
| 	if c.Sessions() >= c.maxSessions {
 | |
| 		return true
 | |
| 	}
 | |
| 
 | |
| 	return false
 | |
| }
 | |
| 
 | |
| func (c *collector) IngressBitrate() float64 {
 | |
| 	return c.rxBitrate.Average(averageWindow)
 | |
| }
 | |
| 
 | |
| func (c *collector) EgressBitrate() float64 {
 | |
| 	return c.txBitrate.Average(averageWindow)
 | |
| }
 | |
| 
 | |
| func (c *collector) MaxIngressBitrate() float64 {
 | |
| 	return c.maxRxBitrate
 | |
| }
 | |
| 
 | |
| func (c *collector) MaxEgressBitrate() float64 {
 | |
| 	return c.maxTxBitrate
 | |
| }
 | |
| 
 | |
| func (c *collector) TopIngressBitrate() float64 {
 | |
| 	var bitrate float64 = 0
 | |
| 
 | |
| 	c.lock.session.RLock()
 | |
| 	defer c.lock.session.RUnlock()
 | |
| 
 | |
| 	for _, sess := range c.sessions {
 | |
| 		if !sess.active {
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		bitrate += sess.TopRxBitrate()
 | |
| 	}
 | |
| 
 | |
| 	return bitrate
 | |
| }
 | |
| 
 | |
| func (c *collector) TopEgressBitrate() float64 {
 | |
| 	var bitrate float64 = 0
 | |
| 
 | |
| 	c.lock.session.RLock()
 | |
| 	defer c.lock.session.RUnlock()
 | |
| 
 | |
| 	for _, sess := range c.sessions {
 | |
| 		if !sess.active {
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		bitrate += sess.TopTxBitrate()
 | |
| 	}
 | |
| 
 | |
| 	return bitrate
 | |
| }
 | |
| 
 | |
| func (c *collector) SessionTopIngressBitrate(id string) float64 {
 | |
| 	c.lock.session.RLock()
 | |
| 	defer c.lock.session.RUnlock()
 | |
| 
 | |
| 	if sess, ok := c.sessions[id]; ok {
 | |
| 		return sess.TopRxBitrate()
 | |
| 	}
 | |
| 
 | |
| 	return 0.0
 | |
| }
 | |
| 
 | |
| func (c *collector) SessionTopEgressBitrate(id string) float64 {
 | |
| 	c.lock.session.RLock()
 | |
| 	defer c.lock.session.RUnlock()
 | |
| 
 | |
| 	if sess, ok := c.sessions[id]; ok {
 | |
| 		return sess.TopTxBitrate()
 | |
| 	}
 | |
| 
 | |
| 	return 0.0
 | |
| }
 | |
| 
 | |
| func (c *collector) SessionSetTopIngressBitrate(id string, bitrate float64) {
 | |
| 	c.lock.session.RLock()
 | |
| 	defer c.lock.session.RUnlock()
 | |
| 
 | |
| 	if sess, ok := c.sessions[id]; ok {
 | |
| 		sess.SetTopRxBitrate(bitrate)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (c *collector) SessionSetTopEgressBitrate(id string, bitrate float64) {
 | |
| 	c.lock.session.RLock()
 | |
| 	defer c.lock.session.RUnlock()
 | |
| 
 | |
| 	if sess, ok := c.sessions[id]; ok {
 | |
| 		sess.SetTopTxBitrate(bitrate)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (c *collector) Sessions() uint64 {
 | |
| 	return c.currentActiveSessions
 | |
| }
 | |
| 
 | |
| func (c *collector) Summary() Summary {
 | |
| 	summary := Summary{
 | |
| 		MaxSessions:  c.maxSessions,
 | |
| 		MaxRxBitrate: c.maxRxBitrate,
 | |
| 		MaxTxBitrate: c.maxTxBitrate,
 | |
| 	}
 | |
| 
 | |
| 	summary.CurrentSessions = c.currentActiveSessions
 | |
| 	summary.CurrentRxBitrate = c.IngressBitrate()
 | |
| 	summary.CurrentTxBitrate = c.EgressBitrate()
 | |
| 
 | |
| 	summary.Summary.Peers = make(map[string]Peers)
 | |
| 	summary.Summary.Locations = make(map[string]Stats)
 | |
| 	summary.Summary.References = make(map[string]Stats)
 | |
| 
 | |
| 	c.lock.history.RLock()
 | |
| 
 | |
| 	for _, v := range c.history.Sessions {
 | |
| 		p := summary.Summary.Peers[v.Peer]
 | |
| 
 | |
| 		p.TotalSessions += v.TotalSessions
 | |
| 		p.TotalRxBytes += v.TotalRxBytes
 | |
| 		p.TotalTxBytes += v.TotalTxBytes
 | |
| 
 | |
| 		if p.Locations == nil {
 | |
| 			p.Locations = make(map[string]Stats)
 | |
| 		}
 | |
| 
 | |
| 		stats := p.Locations[v.Location]
 | |
| 
 | |
| 		stats.TotalSessions += v.TotalSessions
 | |
| 		stats.TotalRxBytes += v.TotalRxBytes
 | |
| 		stats.TotalTxBytes += v.TotalTxBytes
 | |
| 
 | |
| 		p.Locations[v.Location] = stats
 | |
| 
 | |
| 		summary.Summary.Peers[v.Peer] = p
 | |
| 
 | |
| 		l := summary.Summary.Locations[v.Location]
 | |
| 
 | |
| 		l.TotalSessions += v.TotalSessions
 | |
| 		l.TotalRxBytes += v.TotalRxBytes
 | |
| 		l.TotalTxBytes += v.TotalTxBytes
 | |
| 
 | |
| 		summary.Summary.Locations[v.Location] = l
 | |
| 
 | |
| 		r := summary.Summary.References[v.Reference]
 | |
| 
 | |
| 		r.TotalSessions += v.TotalSessions
 | |
| 		r.TotalRxBytes += v.TotalRxBytes
 | |
| 		r.TotalTxBytes += v.TotalTxBytes
 | |
| 
 | |
| 		summary.Summary.References[v.Reference] = r
 | |
| 
 | |
| 		summary.Summary.TotalSessions += v.TotalSessions
 | |
| 		summary.Summary.TotalRxBytes += v.TotalRxBytes
 | |
| 		summary.Summary.TotalTxBytes += v.TotalTxBytes
 | |
| 	}
 | |
| 
 | |
| 	c.lock.history.RUnlock()
 | |
| 
 | |
| 	summary.Active = c.Active()
 | |
| 
 | |
| 	return summary
 | |
| }
 | |
| 
 | |
| func (c *collector) Active() []Session {
 | |
| 	sessions := []Session{}
 | |
| 
 | |
| 	c.lock.session.RLock()
 | |
| 	for _, sess := range c.sessions {
 | |
| 		if !sess.active {
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		session := Session{
 | |
| 			ID:           sess.id,
 | |
| 			Reference:    sess.reference,
 | |
| 			CreatedAt:    sess.createdAt,
 | |
| 			Location:     sess.location,
 | |
| 			Peer:         sess.peer,
 | |
| 			Extra:        sess.extra,
 | |
| 			RxBytes:      sess.rxBytes,
 | |
| 			RxBitrate:    sess.RxBitrate(),
 | |
| 			TopRxBitrate: sess.TopRxBitrate(),
 | |
| 			TxBytes:      sess.txBytes,
 | |
| 			TxBitrate:    sess.TxBitrate(),
 | |
| 			TopTxBitrate: sess.TopTxBitrate(),
 | |
| 		}
 | |
| 
 | |
| 		sessions = append(sessions, session)
 | |
| 	}
 | |
| 	c.lock.session.RUnlock()
 | |
| 
 | |
| 	sort.Slice(sessions, func(i, j int) bool {
 | |
| 		return sessions[i].CreatedAt.Before(sessions[j].CreatedAt)
 | |
| 	})
 | |
| 
 | |
| 	return sessions
 | |
| }
 | |
| 
 | |
| func (c *collector) AddCompanion(collector Collector) {
 | |
| 	c.lock.companion.Lock()
 | |
| 	c.companions = append(c.companions, collector)
 | |
| 	c.lock.companion.Unlock()
 | |
| }
 | |
| 
 | |
| func (c *collector) CompanionIngressBitrate() float64 {
 | |
| 	bitrate := c.IngressBitrate()
 | |
| 
 | |
| 	c.lock.companion.RLock()
 | |
| 	for _, co := range c.companions {
 | |
| 		bitrate += co.IngressBitrate()
 | |
| 	}
 | |
| 	c.lock.companion.RUnlock()
 | |
| 
 | |
| 	return bitrate
 | |
| }
 | |
| 
 | |
| func (c *collector) CompanionEgressBitrate() float64 {
 | |
| 	bitrate := c.EgressBitrate()
 | |
| 
 | |
| 	c.lock.companion.RLock()
 | |
| 	for _, co := range c.companions {
 | |
| 		bitrate += co.EgressBitrate()
 | |
| 	}
 | |
| 	c.lock.companion.RUnlock()
 | |
| 
 | |
| 	return bitrate
 | |
| }
 | |
| 
 | |
| func (c *collector) CompanionTopIngressBitrate() float64 {
 | |
| 	bitrate := c.TopIngressBitrate()
 | |
| 
 | |
| 	c.lock.companion.RLock()
 | |
| 	for _, co := range c.companions {
 | |
| 		bitrate += co.TopIngressBitrate()
 | |
| 	}
 | |
| 	c.lock.companion.RUnlock()
 | |
| 
 | |
| 	return bitrate
 | |
| }
 | |
| 
 | |
| func (c *collector) CompanionTopEgressBitrate() float64 {
 | |
| 	bitrate := c.TopEgressBitrate()
 | |
| 
 | |
| 	c.lock.companion.RLock()
 | |
| 	for _, co := range c.companions {
 | |
| 		bitrate += co.TopEgressBitrate()
 | |
| 	}
 | |
| 	c.lock.companion.RUnlock()
 | |
| 
 | |
| 	return bitrate
 | |
| }
 | |
| 
 | |
| type nullCollector struct{}
 | |
| 
 | |
| // NewNullCollector returns an implementation of the Collector interface that
 | |
| // doesn't collect any metrics at all.
 | |
| func NewNullCollector() Collector                                                 { return &nullCollector{} }
 | |
| func (n *nullCollector) Register(id, reference, location, peer string)            {}
 | |
| func (n *nullCollector) Activate(id string) bool                                  { return false }
 | |
| func (n *nullCollector) RegisterAndActivate(id, reference, location, peer string) {}
 | |
| func (n *nullCollector) Extra(id, extra string)                                   {}
 | |
| func (n *nullCollector) Unregister(id string)                                     {}
 | |
| func (n *nullCollector) Ingress(id string, size int64)                            {}
 | |
| func (n *nullCollector) Egress(id string, size int64)                             {}
 | |
| func (n *nullCollector) IngressBitrate() float64                                  { return 0.0 }
 | |
| func (n *nullCollector) EgressBitrate() float64                                   { return 0.0 }
 | |
| func (n *nullCollector) MaxIngressBitrate() float64                               { return 0.0 }
 | |
| func (n *nullCollector) MaxEgressBitrate() float64                                { return 0.0 }
 | |
| func (n *nullCollector) TopIngressBitrate() float64                               { return 0.0 }
 | |
| func (n *nullCollector) TopEgressBitrate() float64                                { return 0.0 }
 | |
| func (n *nullCollector) IsIngressBitrateExceeded() bool                           { return false }
 | |
| func (n *nullCollector) IsEgressBitrateExceeded() bool                            { return false }
 | |
| func (n *nullCollector) IsSessionsExceeded() bool                                 { return false }
 | |
| func (n *nullCollector) IsKnownSession(id string) bool                            { return false }
 | |
| func (n *nullCollector) IsCollectableIP(ip string) bool                           { return true }
 | |
| func (n *nullCollector) Summary() Summary                                         { return Summary{} }
 | |
| func (n *nullCollector) Active() []Session                                        { return []Session{} }
 | |
| func (n *nullCollector) SessionTopIngressBitrate(id string) float64               { return 0.0 }
 | |
| func (n *nullCollector) SessionTopEgressBitrate(id string) float64                { return 0.0 }
 | |
| func (n *nullCollector) SessionSetTopIngressBitrate(id string, bitrate float64)   {}
 | |
| func (n *nullCollector) SessionSetTopEgressBitrate(id string, bitrate float64)    {}
 | |
| func (n *nullCollector) Sessions() uint64                                         { return 0 }
 | |
| func (n *nullCollector) AddCompanion(collector Collector)                         {}
 | |
| func (n *nullCollector) CompanionIngressBitrate() float64                         { return 0.0 }
 | |
| func (n *nullCollector) CompanionEgressBitrate() float64                          { return 0.0 }
 | |
| func (n *nullCollector) CompanionTopIngressBitrate() float64                      { return 0.0 }
 | |
| func (n *nullCollector) CompanionTopEgressBitrate() float64                       { return 0.0 }
 | |
| func (n *nullCollector) Stop()                                                    {}
 | 
