mirror of
https://github.com/bolucat/Archive.git
synced 2025-09-26 20:21:35 +08:00
Update On Sun Sep 7 20:32:43 CEST 2025
This commit is contained in:
@@ -38,7 +38,7 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
idleUnderlayTickerInterval = 5 * time.Second
|
||||
underlayCleanInterval = 5 * time.Second
|
||||
)
|
||||
|
||||
// Mux manages the sessions and underlays.
|
||||
@@ -85,7 +85,7 @@ func NewMux(isClinet bool) *Mux {
|
||||
chAccept: make(chan net.Conn, sessionChanCapacity),
|
||||
acceptErr: make(chan error),
|
||||
done: make(chan struct{}),
|
||||
cleaner: time.NewTicker(idleUnderlayTickerInterval),
|
||||
cleaner: time.NewTicker(underlayCleanInterval),
|
||||
}
|
||||
mux.ctx, mux.ctxCancelFunc = context.WithCancel(context.Background())
|
||||
|
||||
@@ -479,10 +479,10 @@ func (m *Mux) acceptUnderlayLoop(ctx context.Context, properties UnderlayPropert
|
||||
}
|
||||
log.Infof("Mux is listening to endpoint %s %s", network, laddr)
|
||||
underlay := &PacketUnderlay{
|
||||
baseUnderlay: *newBaseUnderlay(false, properties.MTU()),
|
||||
conn: conn,
|
||||
idleSessionTicker: time.NewTicker(idleSessionTickerInterval),
|
||||
users: m.users,
|
||||
baseUnderlay: *newBaseUnderlay(false, properties.MTU()),
|
||||
conn: conn,
|
||||
sessionCleanTicker: time.NewTicker(sessionCleanInterval),
|
||||
users: m.users,
|
||||
}
|
||||
log.Infof("Created new server underlay %v", underlay)
|
||||
m.mu.Lock()
|
||||
@@ -564,10 +564,11 @@ func (m *Mux) serverWrapTCPConn(rawConn net.Conn, mtu int, users map[string]*app
|
||||
blocks = append(blocks, blocksFromUser...)
|
||||
}
|
||||
return &StreamUnderlay{
|
||||
baseUnderlay: *newBaseUnderlay(false, mtu),
|
||||
conn: rawConn,
|
||||
candidates: blocks,
|
||||
users: users,
|
||||
baseUnderlay: *newBaseUnderlay(false, mtu),
|
||||
conn: rawConn,
|
||||
candidates: blocks,
|
||||
sessionCleanTicker: time.NewTicker(sessionCleanInterval),
|
||||
users: users,
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -21,6 +21,7 @@ import (
|
||||
"io"
|
||||
"net"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/enfein/mieru/v3/pkg/appctl/appctlpb"
|
||||
"github.com/enfein/mieru/v3/pkg/common"
|
||||
@@ -28,7 +29,10 @@ import (
|
||||
"github.com/enfein/mieru/v3/pkg/stderror"
|
||||
)
|
||||
|
||||
const sessionChanCapacity = 64
|
||||
const (
|
||||
sessionChanCapacity = 64
|
||||
sessionCleanInterval = 5 * time.Second
|
||||
)
|
||||
|
||||
// baseUnderlay contains a partial implementation of underlay.
|
||||
type baseUnderlay struct {
|
||||
|
@@ -38,8 +38,7 @@ const (
|
||||
packetOverhead = cipher.DefaultNonceSize + MetadataLength + cipher.DefaultOverhead*2
|
||||
packetNonHeaderPosition = cipher.DefaultNonceSize + MetadataLength + cipher.DefaultOverhead
|
||||
|
||||
idleSessionTickerInterval = 5 * time.Second
|
||||
idleSessionTimeout = time.Minute
|
||||
idleSessionTimeout = time.Minute
|
||||
|
||||
readOneSegmentTimeout = 5 * time.Second
|
||||
)
|
||||
@@ -51,7 +50,7 @@ type PacketUnderlay struct {
|
||||
baseUnderlay
|
||||
conn net.PacketConn
|
||||
|
||||
idleSessionTicker *time.Ticker
|
||||
sessionCleanTicker *time.Ticker
|
||||
|
||||
// ---- client fields ----
|
||||
serverAddr net.Addr
|
||||
@@ -91,11 +90,11 @@ func NewPacketUnderlay(ctx context.Context, network, addr string, mtu int, block
|
||||
return nil, fmt.Errorf("ApplyUDPControls() failed: %w", err)
|
||||
}
|
||||
u := &PacketUnderlay{
|
||||
baseUnderlay: *newBaseUnderlay(true, mtu),
|
||||
conn: conn,
|
||||
idleSessionTicker: time.NewTicker(idleSessionTickerInterval),
|
||||
serverAddr: remoteAddr,
|
||||
block: block,
|
||||
baseUnderlay: *newBaseUnderlay(true, mtu),
|
||||
conn: conn,
|
||||
sessionCleanTicker: time.NewTicker(sessionCleanInterval),
|
||||
serverAddr: remoteAddr,
|
||||
block: block,
|
||||
}
|
||||
// The block cipher expires after this time.
|
||||
u.scheduler.SetRemainingTime(cipher.KeyRefreshInterval / 2)
|
||||
@@ -123,7 +122,7 @@ func (u *PacketUnderlay) Close() error {
|
||||
}
|
||||
|
||||
log.Debugf("Closing %v", u)
|
||||
u.idleSessionTicker.Stop()
|
||||
u.sessionCleanTicker.Stop()
|
||||
u.baseUnderlay.Close()
|
||||
return u.conn.Close()
|
||||
}
|
||||
@@ -175,13 +174,13 @@ func (u *PacketUnderlay) RunEventLoop(ctx context.Context) error {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
u.closeIdleSessions()
|
||||
u.cleanSessions()
|
||||
return nil
|
||||
case <-u.done:
|
||||
u.closeIdleSessions()
|
||||
u.cleanSessions()
|
||||
return nil
|
||||
case <-u.idleSessionTicker.C:
|
||||
u.closeIdleSessions()
|
||||
case <-u.sessionCleanTicker.C:
|
||||
u.cleanSessions()
|
||||
default:
|
||||
}
|
||||
seg, addr, err := u.readOneSegment()
|
||||
@@ -706,7 +705,7 @@ func (u *PacketUnderlay) writeOneSegment(seg *segment, addr net.Addr) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (u *PacketUnderlay) closeIdleSessions() {
|
||||
func (u *PacketUnderlay) cleanSessions() {
|
||||
u.sessionMap.Range(func(k, v any) bool {
|
||||
session := v.(*Session)
|
||||
select {
|
||||
|
@@ -50,6 +50,8 @@ type StreamUnderlay struct {
|
||||
// When isClient is true, there must be exactly 1 element in the slice.
|
||||
candidates []cipher.BlockCipher
|
||||
|
||||
sessionCleanTicker *time.Ticker
|
||||
|
||||
// ---- server fields ----
|
||||
users map[string]*appctlpb.User
|
||||
}
|
||||
@@ -75,9 +77,10 @@ func NewStreamUnderlay(ctx context.Context, dialer apicommon.Dialer, network, ad
|
||||
return nil, fmt.Errorf("DialContext() failed: %w", err)
|
||||
}
|
||||
t := &StreamUnderlay{
|
||||
baseUnderlay: *newBaseUnderlay(true, mtu),
|
||||
conn: conn,
|
||||
candidates: []cipher.BlockCipher{block},
|
||||
baseUnderlay: *newBaseUnderlay(true, mtu),
|
||||
conn: conn,
|
||||
candidates: []cipher.BlockCipher{block},
|
||||
sessionCleanTicker: time.NewTicker(sessionCleanInterval),
|
||||
}
|
||||
return t, nil
|
||||
}
|
||||
@@ -99,6 +102,7 @@ func (t *StreamUnderlay) Close() error {
|
||||
}
|
||||
|
||||
log.Debugf("Closing %v", t)
|
||||
t.sessionCleanTicker.Stop()
|
||||
t.baseUnderlay.Close()
|
||||
return t.conn.Close()
|
||||
}
|
||||
@@ -157,9 +161,13 @@ func (t *StreamUnderlay) RunEventLoop(ctx context.Context) error {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
t.cleanSessions()
|
||||
return nil
|
||||
case <-t.done:
|
||||
t.cleanSessions()
|
||||
return nil
|
||||
case <-t.sessionCleanTicker.C:
|
||||
t.cleanSessions()
|
||||
default:
|
||||
}
|
||||
seg, err := t.readOneSegment()
|
||||
@@ -636,3 +644,18 @@ func (t *StreamUnderlay) drainAfterError() {
|
||||
log.Debugf("%v read at least %d bytes after stream error", t, n)
|
||||
}
|
||||
}
|
||||
|
||||
func (t *StreamUnderlay) cleanSessions() {
|
||||
t.sessionMap.Range(func(k, v any) bool {
|
||||
session := v.(*Session)
|
||||
select {
|
||||
case <-session.closedChan:
|
||||
log.Debugf("Found closed %v", session)
|
||||
if err := t.RemoveSession(session); err != nil {
|
||||
log.Debugf("%v RemoveSession() failed: %v", t, err)
|
||||
}
|
||||
default:
|
||||
}
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
Reference in New Issue
Block a user