Update On Wed Sep 10 20:43:05 CEST 2025

This commit is contained in:
github-action[bot]
2025-09-10 20:43:06 +02:00
parent c5f9cfaa06
commit b50b35fed3
65 changed files with 768 additions and 662 deletions

View File

@@ -483,7 +483,7 @@ func (d *DefaultDispatcher) routedDispatch(ctx context.Context, link *transport.
handler = h
} else {
errors.LogWarning(ctx, "non existing outTag: ", outTag)
return
return // DO NOT CHANGE: the traffic shouldn't be processed by default outbound if the specified outbound tag doesn't exist (yet), e.g., VLESS Reverse Proxy
}
} else {
errors.LogInfo(ctx, "default route for ", destination)

View File

@@ -2,6 +2,7 @@ package inbound
import (
"context"
gonet "net"
"sync"
"sync/atomic"
"time"
@@ -76,7 +77,25 @@ func (w *tcpWorker) callback(conn stat.Connection) {
case internet.SocketConfig_TProxy:
dest = net.DestinationFromAddr(conn.LocalAddr())
}
if dest.IsValid() {
// Check if try to connect to this inbound itself (can cause loopback)
var isLoopBack bool
if w.address == net.AnyIP || w.address == net.AnyIPv6 {
if dest.Port.Value() == w.port.Value() && IsLocal(dest.Address.IP()) {
isLoopBack = true
}
} else {
if w.hub.Addr().String() == dest.NetAddr() {
isLoopBack = true
}
}
if isLoopBack {
cancel()
conn.Close()
errors.LogError(ctx, errors.New("loopback connection detected"))
return
}
outbounds[0].Target = dest
}
}
@@ -544,3 +563,18 @@ func (w *dsWorker) Close() error {
return nil
}
func IsLocal(ip net.IP) bool {
addrs, err := gonet.InterfaceAddrs()
if err != nil {
return false
}
for _, addr := range addrs {
if ipnet, ok := addr.(*gonet.IPNet); ok {
if ipnet.IP.Equal(ip) {
return true
}
}
}
return false
}

View File

@@ -9,6 +9,7 @@ import (
"github.com/xtls/xray-core/common/mux"
"github.com/xtls/xray-core/common/net"
"github.com/xtls/xray-core/common/session"
"github.com/xtls/xray-core/common/signal"
"github.com/xtls/xray-core/common/task"
"github.com/xtls/xray-core/features/routing"
"github.com/xtls/xray-core/transport"
@@ -53,6 +54,11 @@ func (b *Bridge) cleanup() {
if w.IsActive() {
activeWorkers = append(activeWorkers, w)
}
if w.Closed() {
if w.Timer != nil {
w.Timer.SetTimeout(0)
}
}
}
if len(activeWorkers) != len(b.workers) {
@@ -98,6 +104,7 @@ type BridgeWorker struct {
Worker *mux.ServerWorker
Dispatcher routing.Dispatcher
State Control_State
Timer *signal.ActivityTimer
}
func NewBridgeWorker(domain string, tag string, d routing.Dispatcher) (*BridgeWorker, error) {
@@ -125,6 +132,10 @@ func NewBridgeWorker(domain string, tag string, d routing.Dispatcher) (*BridgeWo
}
w.Worker = worker
terminate := func() {
worker.Close()
}
w.Timer = signal.CancelAfterInactivity(ctx, terminate, 60*time.Second)
return w, nil
}
@@ -144,6 +155,10 @@ func (w *BridgeWorker) IsActive() bool {
return w.State == Control_ACTIVE && !w.Worker.Closed()
}
func (w *BridgeWorker) Closed() bool {
return w.Worker.Closed()
}
func (w *BridgeWorker) Connections() uint32 {
return w.Worker.ActiveConnections()
}
@@ -153,13 +168,26 @@ func (w *BridgeWorker) handleInternalConn(link *transport.Link) {
for {
mb, err := reader.ReadMultiBuffer()
if err != nil {
break
if w.Timer != nil {
if w.Closed() {
w.Timer.SetTimeout(0)
} else {
w.Timer.SetTimeout(24 * time.Hour)
}
}
return
}
if w.Timer != nil {
w.Timer.Update()
}
for _, b := range mb {
var ctl Control
if err := proto.Unmarshal(b.Bytes(), &ctl); err != nil {
errors.LogInfoInner(context.Background(), err, "failed to parse proto message")
break
if w.Timer != nil {
w.Timer.SetTimeout(0)
}
return
}
if ctl.State != w.State {
w.State = ctl.State

View File

@@ -12,6 +12,7 @@ import (
"github.com/xtls/xray-core/common/net"
"github.com/xtls/xray-core/common/serial"
"github.com/xtls/xray-core/common/session"
"github.com/xtls/xray-core/common/signal"
"github.com/xtls/xray-core/common/task"
"github.com/xtls/xray-core/features/outbound"
"github.com/xtls/xray-core/transport"
@@ -159,6 +160,8 @@ func (p *StaticMuxPicker) cleanup() error {
for _, w := range p.workers {
if !w.Closed() {
activeWorkers = append(activeWorkers, w)
} else {
w.timer.SetTimeout(0)
}
}
@@ -225,6 +228,7 @@ type PortalWorker struct {
reader buf.Reader
draining bool
counter uint32
timer *signal.ActivityTimer
}
func NewPortalWorker(client *mux.ClientWorker) (*PortalWorker, error) {
@@ -244,10 +248,14 @@ func NewPortalWorker(client *mux.ClientWorker) (*PortalWorker, error) {
if !f {
return nil, errors.New("unable to dispatch control connection")
}
terminate := func() {
client.Close()
}
w := &PortalWorker{
client: client,
reader: downlinkReader,
writer: uplinkWriter,
timer: signal.CancelAfterInactivity(ctx, terminate, 24*time.Hour), // // prevent leak
}
w.control = &task.Periodic{
Execute: w.heartbeat,
@@ -274,7 +282,6 @@ func (w *PortalWorker) heartbeat() error {
msg.State = Control_DRAIN
defer func() {
w.client.GetTimer().Reset(time.Second * 16)
common.Close(w.writer)
common.Interrupt(w.reader)
w.writer = nil
@@ -286,6 +293,7 @@ func (w *PortalWorker) heartbeat() error {
b, err := proto.Marshal(msg)
common.Must(err)
mb := buf.MergeBytes(nil, b)
w.timer.Update()
return w.writer.WriteMultiBuffer(mb)
}
return nil

View File

@@ -219,14 +219,16 @@ func (m *ClientWorker) WaitClosed() <-chan struct{} {
return m.done.Wait()
}
func (m *ClientWorker) GetTimer() *time.Ticker {
return m.timer
func (m *ClientWorker) Close() error {
return m.done.Close()
}
func (m *ClientWorker) monitor() {
defer m.timer.Stop()
for {
checkSize := m.sessionManager.Size()
checkCount := m.sessionManager.Count()
select {
case <-m.done.Wait():
m.sessionManager.Close()
@@ -234,8 +236,7 @@ func (m *ClientWorker) monitor() {
common.Interrupt(m.link.Reader)
return
case <-m.timer.C:
size := m.sessionManager.Size()
if size == 0 && m.sessionManager.CloseIfNoSession() {
if m.sessionManager.CloseIfNoSessionAndIdle(checkSize, checkCount) {
common.Must(m.done.Close())
}
}
@@ -255,7 +256,7 @@ func writeFirstPayload(reader buf.Reader, writer *Writer) error {
return nil
}
func fetchInput(ctx context.Context, s *Session, output buf.Writer, timer *time.Ticker) {
func fetchInput(ctx context.Context, s *Session, output buf.Writer) {
outbounds := session.OutboundsFromContext(ctx)
ob := outbounds[len(outbounds)-1]
transferType := protocol.TransferTypeStream
@@ -266,7 +267,6 @@ func fetchInput(ctx context.Context, s *Session, output buf.Writer, timer *time.
writer := NewWriter(s.ID, ob.Target, output, transferType, xudp.GetGlobalID(ctx))
defer s.Close(false)
defer writer.Close()
defer timer.Reset(time.Second * 16)
errors.LogInfo(ctx, "dispatching request to ", ob.Target)
if err := writeFirstPayload(s.input, writer); err != nil {
@@ -316,10 +316,12 @@ func (m *ClientWorker) Dispatch(ctx context.Context, link *transport.Link) bool
}
s.input = link.Reader
s.output = link.Writer
if _, ok := link.Reader.(*pipe.Reader); ok {
go fetchInput(ctx, s, m.link.Writer, m.timer)
} else {
fetchInput(ctx, s, m.link.Writer, m.timer)
go fetchInput(ctx, s, m.link.Writer)
if _, ok := link.Reader.(*pipe.Reader); !ok {
select {
case <-ctx.Done():
case <-s.done.Wait():
}
}
return true
}

View File

@@ -3,6 +3,7 @@ package mux
import (
"context"
"io"
"time"
"github.com/xtls/xray-core/app/dispatcher"
"github.com/xtls/xray-core/common"
@@ -12,6 +13,7 @@ import (
"github.com/xtls/xray-core/common/net"
"github.com/xtls/xray-core/common/protocol"
"github.com/xtls/xray-core/common/session"
"github.com/xtls/xray-core/common/signal/done"
"github.com/xtls/xray-core/core"
"github.com/xtls/xray-core/features/routing"
"github.com/xtls/xray-core/transport"
@@ -63,8 +65,15 @@ func (s *Server) DispatchLink(ctx context.Context, dest net.Destination, link *t
return s.dispatcher.DispatchLink(ctx, dest, link)
}
link = s.dispatcher.(*dispatcher.DefaultDispatcher).WrapLink(ctx, link)
_, err := NewServerWorker(ctx, s.dispatcher, link)
return err
worker, err := NewServerWorker(ctx, s.dispatcher, link)
if err != nil {
return err
}
select {
case <-ctx.Done():
case <-worker.done.Wait():
}
return nil
}
// Start implements common.Runnable.
@@ -81,6 +90,8 @@ type ServerWorker struct {
dispatcher routing.Dispatcher
link *transport.Link
sessionManager *SessionManager
done *done.Instance
timer *time.Ticker
}
func NewServerWorker(ctx context.Context, d routing.Dispatcher, link *transport.Link) (*ServerWorker, error) {
@@ -88,15 +99,14 @@ func NewServerWorker(ctx context.Context, d routing.Dispatcher, link *transport.
dispatcher: d,
link: link,
sessionManager: NewSessionManager(),
done: done.New(),
timer: time.NewTicker(60 * time.Second),
}
if inbound := session.InboundFromContext(ctx); inbound != nil {
inbound.CanSpliceCopy = 3
}
if _, ok := link.Reader.(*pipe.Reader); ok {
go worker.run(ctx)
} else {
worker.run(ctx)
}
go worker.run(ctx)
go worker.monitor()
return worker, nil
}
@@ -111,12 +121,40 @@ func handle(ctx context.Context, s *Session, output buf.Writer) {
s.Close(false)
}
func (w *ServerWorker) monitor() {
defer w.timer.Stop()
for {
checkSize := w.sessionManager.Size()
checkCount := w.sessionManager.Count()
select {
case <-w.done.Wait():
w.sessionManager.Close()
common.Interrupt(w.link.Writer)
common.Interrupt(w.link.Reader)
return
case <-w.timer.C:
if w.sessionManager.CloseIfNoSessionAndIdle(checkSize, checkCount) {
common.Must(w.done.Close())
}
}
}
}
func (w *ServerWorker) ActiveConnections() uint32 {
return uint32(w.sessionManager.Size())
}
func (w *ServerWorker) Closed() bool {
return w.sessionManager.Closed()
return w.done.Done()
}
func (w *ServerWorker) WaitClosed() <-chan struct{} {
return w.done.Wait()
}
func (w *ServerWorker) Close() error {
return w.done.Close()
}
func (w *ServerWorker) handleStatusKeepAlive(meta *FrameMetadata, reader *buf.BufferedReader) error {
@@ -317,11 +355,11 @@ func (w *ServerWorker) handleFrame(ctx context.Context, reader *buf.BufferedRead
}
func (w *ServerWorker) run(ctx context.Context) {
reader := &buf.BufferedReader{Reader: w.link.Reader}
defer func() {
common.Must(w.done.Close())
}()
defer w.sessionManager.Close()
defer common.Interrupt(w.link.Reader)
defer common.Interrupt(w.link.Writer)
reader := &buf.BufferedReader{Reader: w.link.Reader}
for {
select {

View File

@@ -12,6 +12,7 @@ import (
"github.com/xtls/xray-core/common/errors"
"github.com/xtls/xray-core/common/net"
"github.com/xtls/xray-core/common/protocol"
"github.com/xtls/xray-core/common/signal/done"
"github.com/xtls/xray-core/transport/pipe"
)
@@ -53,7 +54,7 @@ func (m *SessionManager) Count() int {
func (m *SessionManager) Allocate(Strategy *ClientStrategy) *Session {
m.Lock()
defer m.Unlock()
MaxConcurrency := int(Strategy.MaxConcurrency)
MaxConnection := uint16(Strategy.MaxConnection)
@@ -65,6 +66,7 @@ func (m *SessionManager) Allocate(Strategy *ClientStrategy) *Session {
s := &Session{
ID: m.count,
parent: m,
done: done.New(),
}
m.sessions[s.ID] = s
return s
@@ -115,7 +117,7 @@ func (m *SessionManager) Get(id uint16) (*Session, bool) {
return s, found
}
func (m *SessionManager) CloseIfNoSession() bool {
func (m *SessionManager) CloseIfNoSessionAndIdle(checkSize int, checkCount int) bool {
m.Lock()
defer m.Unlock()
@@ -123,11 +125,13 @@ func (m *SessionManager) CloseIfNoSession() bool {
return true
}
if len(m.sessions) != 0 {
if len(m.sessions) != 0 || checkSize != 0 || checkCount != int(m.count) {
return false
}
m.closed = true
m.sessions = nil
return true
}
@@ -157,6 +161,7 @@ type Session struct {
ID uint16
transferType protocol.TransferType
closed bool
done *done.Instance
XUDP *XUDP
}
@@ -171,6 +176,9 @@ func (s *Session) Close(locked bool) error {
return nil
}
s.closed = true
if s.done != nil {
s.done.Close()
}
if s.XUDP == nil {
common.Interrupt(s.input)
common.Close(s.output)

View File

@@ -41,11 +41,11 @@ func TestSessionManagerClose(t *testing.T) {
m := NewSessionManager()
s := m.Allocate(&ClientStrategy{})
if m.CloseIfNoSession() {
if m.CloseIfNoSessionAndIdle(m.Size(), m.Count()) {
t.Error("able to close")
}
m.Remove(false, s.ID)
if !m.CloseIfNoSession() {
if !m.CloseIfNoSessionAndIdle(m.Size(), m.Count()) {
t.Error("not able to close")
}
}

View File

@@ -19,7 +19,7 @@ import (
var (
Version_x byte = 25
Version_y byte = 9
Version_z byte = 5
Version_z byte = 11
)
var (

View File

@@ -19,15 +19,15 @@ require (
github.com/stretchr/testify v1.11.1
github.com/v2fly/ss-bloomring v0.0.0-20210312155135-28617310f63e
github.com/vishvananda/netlink v1.3.1
github.com/xtls/reality v0.0.0-20250828044527-046fad5ab64f
github.com/xtls/reality v0.0.0-20250904214705-431b6ff8c67c
go4.org/netipx v0.0.0-20231129151722-fdeea329fbba
golang.org/x/crypto v0.42.0
golang.org/x/net v0.43.0
golang.org/x/net v0.44.0
golang.org/x/sync v0.17.0
golang.org/x/sys v0.36.0
golang.zx2c4.com/wireguard v0.0.0-20231211153847-12269c276173
google.golang.org/grpc v1.75.0
google.golang.org/protobuf v1.36.8
google.golang.org/protobuf v1.36.9
gvisor.dev/gvisor v0.0.0-20250428193742-2d800c3129d5
h12.io/socks v1.0.3
lukechampine.com/blake3 v1.4.1

View File

@@ -75,8 +75,8 @@ github.com/vishvananda/netlink v1.3.1 h1:3AEMt62VKqz90r0tmNhog0r/PpWKmrEShJU0wJW
github.com/vishvananda/netlink v1.3.1/go.mod h1:ARtKouGSTGchR8aMwmkzC0qiNPrrWO5JS/XMVl45+b4=
github.com/vishvananda/netns v0.0.5 h1:DfiHV+j8bA32MFM7bfEunvT8IAqQ/NzSJHtcmW5zdEY=
github.com/vishvananda/netns v0.0.5/go.mod h1:SpkAiCQRtJ6TvvxPnOSyH3BMl6unz3xZlaprSwhNNJM=
github.com/xtls/reality v0.0.0-20250828044527-046fad5ab64f h1:o1Kryl9qEYYzNep9RId9DM1kBn8tBrcK5UJnti/l0NI=
github.com/xtls/reality v0.0.0-20250828044527-046fad5ab64f/go.mod h1:XxvnCCgBee4WWE0bc4E+a7wbk8gkJ/rS0vNVNtC5qp0=
github.com/xtls/reality v0.0.0-20250904214705-431b6ff8c67c h1:LHLhQY3mKXSpTcQAkjFR4/6ar3rXjQryNeM7khK3AHU=
github.com/xtls/reality v0.0.0-20250904214705-431b6ff8c67c/go.mod h1:XxvnCCgBee4WWE0bc4E+a7wbk8gkJ/rS0vNVNtC5qp0=
github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
@@ -104,8 +104,8 @@ golang.org/x/mod v0.27.0/go.mod h1:rWI627Fq0DEoudcK+MBkNkCe0EetEaDSwJJkCcjpazc=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.43.0 h1:lat02VYK2j4aLzMzecihNvTlJNQUq316m2Mr9rnM6YE=
golang.org/x/net v0.43.0/go.mod h1:vhO1fvI4dGsIjh73sWfUVjj3N7CA9WkKJNQm2svM6Jg=
golang.org/x/net v0.44.0 h1:evd8IRDyfNBMBTTY5XRF1vaZlD+EmWx6x8PkhR04H/I=
golang.org/x/net v0.44.0/go.mod h1:ECOoLqd5U3Lhyeyo/QDCEVQ4sNgYsqvCZ722XogGieY=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug=
@@ -145,8 +145,8 @@ google.golang.org/genproto/googleapis/rpc v0.0.0-20250707201910-8d1bb00bc6a7 h1:
google.golang.org/genproto/googleapis/rpc v0.0.0-20250707201910-8d1bb00bc6a7/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A=
google.golang.org/grpc v1.75.0 h1:+TW+dqTd2Biwe6KKfhE5JpiYIBWq865PhKGSXiivqt4=
google.golang.org/grpc v1.75.0/go.mod h1:JtPAzKiq4v1xcAB2hydNlWI2RnF85XXcV0mhKXr2ecQ=
google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc=
google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU=
google.golang.org/protobuf v1.36.9 h1:w2gp2mA27hUeUzj9Ex9FBjsBm40zfaDtEWow293U7Iw=
google.golang.org/protobuf v1.36.9/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=

View File

@@ -678,10 +678,10 @@ func CopyRawConnIfExist(ctx context.Context, readerConn net.Conn, writerConn net
errors.LogInfo(ctx, "CopyRawConn splice")
statWriter, _ := writer.(*dispatcher.SizeStatWriter)
//runtime.Gosched() // necessary
time.Sleep(time.Millisecond) // without this, there will be a rare ssl error for freedom splice
timer.SetTimeout(8 * time.Hour) // prevent leak, just in case
time.Sleep(time.Millisecond) // without this, there will be a rare ssl error for freedom splice
timer.SetTimeout(24 * time.Hour) // prevent leak, just in case
if inTimer != nil {
inTimer.SetTimeout(8 * time.Hour)
inTimer.SetTimeout(24 * time.Hour)
}
w, err := tc.ReadFrom(readerConn)
if readCounter != nil {