From 79d8309b342b3428cae328fc1e88cadb710b92f9 Mon Sep 17 00:00:00 2001 From: Jason Date: Sat, 10 Aug 2019 20:39:04 +0800 Subject: [PATCH] update completed sessions --- common/stats/session/session.go | 22 +++++++++++----------- proxy/socks/tcp.go | 6 +++--- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/common/stats/session/session.go b/common/stats/session/session.go index 9f9f1da..71532fa 100644 --- a/common/stats/session/session.go +++ b/common/stats/session/session.go @@ -10,6 +10,7 @@ import ( "sync/atomic" "time" + "github.com/hashicorp/golang-lru" "golang.org/x/text/language" "golang.org/x/text/message" @@ -30,13 +31,14 @@ var ( type simpleSessionStater struct { activeSessions sync.Map - completedSessions chan *stats.Session + completedSessions *lru.Cache server *http.Server } func NewSimpleSessionStater() stats.SessionStater { + cache, _ := lru.New(maxCompletedSessions) return &simpleSessionStater{ - completedSessions: make(chan *stats.Session, maxCompletedSessions), + completedSessions: cache, } } @@ -52,8 +54,11 @@ func (s *simpleSessionStater) Start() error { }) var completedSessions []stats.Session - for sess := range s.completedSessions { - completedSessions = append(completedSessions, *sess) + keys := s.completedSessions.Keys() + for _, key := range keys { + if item, ok := s.completedSessions.Peek(key); ok { + completedSessions = append(completedSessions, *(item.(*stats.Session))) + } } p := message.NewPrinter(language.English) @@ -95,7 +100,7 @@ func (s *simpleSessionStater) Start() error { _, _ = fmt.Fprintf(w, "

Active sessions %d (%d)

", len(activeSessions), atomic.LoadInt64(ActiveTCPConnections)) tablePrint(w, activeSessions) _, _ = fmt.Fprintf(w, "

") - _, _ = fmt.Fprintf(w, "

Recently completed sessions %d

", len(s.completedSessions)) + _, _ = fmt.Fprintf(w, "

Recently completed sessions %d

", len(completedSessions)) tablePrint(w, completedSessions) _, _ = fmt.Fprintf(w, "") _ = w.Flush() @@ -129,12 +134,7 @@ func (s *simpleSessionStater) GetSession(key interface{}) *stats.Session { func (s *simpleSessionStater) RemoveSession(key interface{}) { if sess, ok := s.activeSessions.Load(key); ok { // move to completed sessions - select { - case s.completedSessions <- sess.(*stats.Session): - default: - <-s.completedSessions - s.completedSessions <- sess.(*stats.Session) - } + s.completedSessions.ContainsOrAdd(key, sess) // delete s.activeSessions.Delete(key) } diff --git a/proxy/socks/tcp.go b/proxy/socks/tcp.go index 441d00f..9d6cd26 100644 --- a/proxy/socks/tcp.go +++ b/proxy/socks/tcp.go @@ -115,12 +115,12 @@ func (h *tcpHandler) relay(localConn, remoteConn net.Conn, sess *stats.Session) <-upCh // Wait for UpLink done. + // add -1 + atomic.AddInt64(&activeTCPConnections, -1) + if h.sessionStater != nil { h.sessionStater.RemoveSession(localConn) } - - // add -1 - atomic.AddInt64(&activeTCPConnections, -1) } func (h *tcpHandler) Handle(localConn net.Conn, target *net.TCPAddr) error {