update completed sessions

This commit is contained in:
Jason
2019-08-10 20:39:04 +08:00
parent 22d83027bb
commit 79d8309b34
2 changed files with 14 additions and 14 deletions

View File

@@ -10,6 +10,7 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/hashicorp/golang-lru"
"golang.org/x/text/language" "golang.org/x/text/language"
"golang.org/x/text/message" "golang.org/x/text/message"
@@ -30,13 +31,14 @@ var (
type simpleSessionStater struct { type simpleSessionStater struct {
activeSessions sync.Map activeSessions sync.Map
completedSessions chan *stats.Session completedSessions *lru.Cache
server *http.Server server *http.Server
} }
func NewSimpleSessionStater() stats.SessionStater { func NewSimpleSessionStater() stats.SessionStater {
cache, _ := lru.New(maxCompletedSessions)
return &simpleSessionStater{ return &simpleSessionStater{
completedSessions: make(chan *stats.Session, maxCompletedSessions), completedSessions: cache,
} }
} }
@@ -52,8 +54,11 @@ func (s *simpleSessionStater) Start() error {
}) })
var completedSessions []stats.Session var completedSessions []stats.Session
for sess := range s.completedSessions { keys := s.completedSessions.Keys()
completedSessions = append(completedSessions, *sess) for _, key := range keys {
if item, ok := s.completedSessions.Peek(key); ok {
completedSessions = append(completedSessions, *(item.(*stats.Session)))
}
} }
p := message.NewPrinter(language.English) p := message.NewPrinter(language.English)
@@ -95,7 +100,7 @@ func (s *simpleSessionStater) Start() error {
_, _ = fmt.Fprintf(w, "<p>Active sessions %d (%d)</p>", len(activeSessions), atomic.LoadInt64(ActiveTCPConnections)) _, _ = fmt.Fprintf(w, "<p>Active sessions %d (%d)</p>", len(activeSessions), atomic.LoadInt64(ActiveTCPConnections))
tablePrint(w, activeSessions) tablePrint(w, activeSessions)
_, _ = fmt.Fprintf(w, "<br/><br/>") _, _ = fmt.Fprintf(w, "<br/><br/>")
_, _ = fmt.Fprintf(w, "<p>Recently completed sessions %d</p>", len(s.completedSessions)) _, _ = fmt.Fprintf(w, "<p>Recently completed sessions %d</p>", len(completedSessions))
tablePrint(w, completedSessions) tablePrint(w, completedSessions)
_, _ = fmt.Fprintf(w, "</html>") _, _ = fmt.Fprintf(w, "</html>")
_ = w.Flush() _ = w.Flush()
@@ -129,12 +134,7 @@ func (s *simpleSessionStater) GetSession(key interface{}) *stats.Session {
func (s *simpleSessionStater) RemoveSession(key interface{}) { func (s *simpleSessionStater) RemoveSession(key interface{}) {
if sess, ok := s.activeSessions.Load(key); ok { if sess, ok := s.activeSessions.Load(key); ok {
// move to completed sessions // move to completed sessions
select { s.completedSessions.ContainsOrAdd(key, sess)
case s.completedSessions <- sess.(*stats.Session):
default:
<-s.completedSessions
s.completedSessions <- sess.(*stats.Session)
}
// delete // delete
s.activeSessions.Delete(key) s.activeSessions.Delete(key)
} }

View File

@@ -115,12 +115,12 @@ func (h *tcpHandler) relay(localConn, remoteConn net.Conn, sess *stats.Session)
<-upCh // Wait for UpLink done. <-upCh // Wait for UpLink done.
// add -1
atomic.AddInt64(&activeTCPConnections, -1)
if h.sessionStater != nil { if h.sessionStater != nil {
h.sessionStater.RemoveSession(localConn) h.sessionStater.RemoveSession(localConn)
} }
// add -1
atomic.AddInt64(&activeTCPConnections, -1)
} }
func (h *tcpHandler) Handle(localConn net.Conn, target *net.TCPAddr) error { func (h *tcpHandler) Handle(localConn net.Conn, target *net.TCPAddr) error {