fix #113 , 使用smux时会出现无法加载的情况

这是由两个问题造成的

问题1:

同时并发请求多个请求时,会出现同时建立两个mux的情况,导致先建立的mux被覆盖;

问题2:

一旦某个stream的连接失败后,代码 会关闭整个session。这是由于 iics无法分辨simplesocks和普通协议造成的。

加一个 isInner 标签即可分辨。
This commit is contained in:
e1732a364fed
2000-01-01 00:00:00 +00:00
parent 7e69f17927
commit 8fcf747479
5 changed files with 115 additions and 84 deletions

View File

@@ -14,6 +14,9 @@ insecure = true
utls = true
#lazy = true
# 备注: trojan 也是一样可以应用 ws/grpc/quic 的,具体你只要参考对应示例文件即可,然后把 vlesss 改成 trojans 即可.
# 备注verysimple 的trojan实现 完全兼容 trojan-go, 包括mux, 因为一样用的是 smux+ simplesocks
# use_mux = true
# verysimple 的trojan实现 完全兼容 trojan-go, 包括mux, 因为一样用的是 smux+ simplesocks

View File

@@ -64,6 +64,8 @@ type incomingInserverConnState struct {
inServer proxy.Server //可为 nil
defaultClient proxy.Client
isInner bool
inTag string //在inServer为nil时可用此项确定 inTag
useSniffing bool //在inServer为nil时可用此项确定 是否使用sniffing

182
main.go
View File

@@ -169,7 +169,7 @@ func ListenSer(inServer proxy.Server, defaultOutClient proxy.Client, env *proxy.
// handleNewIncomeConnection 会处理 网络层至高级层的数据,
// 然后将代理层的处理发往 handshakeInserver_and_passToOutClient 函数。
//
// 在 listenSer 中被调用。
// 在 ListenSer 中被调用。
func handleNewIncomeConnection(inServer proxy.Server, defaultClientForThis proxy.Client, thisLocalConnectionInstance net.Conn, env *proxy.RoutingEnv) {
iics := incomingInserverConnState{
@@ -380,111 +380,117 @@ func handshakeInserver(iics *incomingInserverConnState) (wlc net.Conn, udp_wlc n
wlc, udp_wlc, targetAddr, err = inServer.Handshake(iics.wrappedConn)
if err == nil {
if udp_wlc != nil && inServer.Name() == "socks5" {
// socks5的 udp associate返回的是 clientFutureAddr, 而不是实际客户的第一个请求.
//所以我们要读一次才能进行下一步。
if err != nil {
return
}
if udp_wlc != nil && inServer.Name() == "socks5" {
// socks5的 udp associate返回的是 clientFutureAddr, 而不是实际客户的第一个请求.
//所以我们要读一次才能进行下一步。
firstSocks5RequestData, firstSocks5RequestAddr, err2 := udp_wlc.ReadMsgFrom()
if err2 != nil {
if ce := iics.CanLogWarn("failed in socks5 read"); ce != nil {
ce.Write(
zap.String("handler", inServer.AddrStr()),
zap.Error(err2),
)
}
err = err2
return
firstSocks5RequestData, firstSocks5RequestAddr, err2 := udp_wlc.ReadMsgFrom()
if err2 != nil {
if ce := iics.CanLogWarn("failed in socks5 read"); ce != nil {
ce.Write(
zap.String("handler", inServer.AddrStr()),
zap.Error(err2),
)
}
iics.fallbackFirstBuffer = bytes.NewBuffer(firstSocks5RequestData)
targetAddr = firstSocks5RequestAddr
err = err2
return
}
////////////////////////////// 内层mux阶段 /////////////////////////////////////
iics.fallbackFirstBuffer = bytes.NewBuffer(firstSocks5RequestData)
if muxInt, innerProxyName := inServer.HasInnerMux(); muxInt > 0 {
if mh, ok := wlc.(proxy.MuxMarker); ok {
targetAddr = firstSocks5RequestAddr
}
innerSerConf := proxy.ListenConf{
CommonConf: proxy.CommonConf{
Protocol: innerProxyName,
},
////////////////////////////// 内层mux阶段 /////////////////////////////////////
if muxInt, innerProxyName := inServer.HasInnerMux(); muxInt > 0 {
mh, ok := wlc.(proxy.MuxMarker)
if !ok {
return
}
innerSerConf := proxy.ListenConf{
CommonConf: proxy.CommonConf{
Protocol: innerProxyName,
},
}
innerSer, err2 := proxy.NewServer(&innerSerConf)
if err2 != nil {
if ce := iics.CanLogDebug("mux inner proxy server creation failed"); ce != nil {
ce.Write(zap.Error(err))
}
err = err2
return
}
session := inServer.GetServerInnerMuxSession(mh)
if session == nil {
err = utils.ErrFailed
return
}
//内层mux要对每一个子连接单独进行 子代理协议握手 以及 outClient的拨号。
go func() {
for {
if ce := iics.CanLogDebug("inServer try accept smux stream "); ce != nil {
ce.Write()
}
innerSer, err2 := proxy.NewServer(&innerSerConf)
if err2 != nil {
if ce := iics.CanLogDebug("mux inner proxy server creation failed"); ce != nil {
stream, err := session.AcceptStream()
if err != nil {
if ce := iics.CanLogDebug("mux inServer try accept stream failed"); ce != nil {
ce.Write(zap.Error(err))
}
err = err2
session.Close()
return
}
session := inServer.GetServerInnerMuxSession(mh)
if session == nil {
err = utils.ErrFailed
return
if ce := iics.CanLogDebug("inServer got inner mux stream"); ce != nil {
ce.Write(zap.String("innerProxyName", innerProxyName))
}
//内层mux要对每一个子连接单独进行 子代理协议握手 以及 outClient的拨号。
go func() {
for {
if ce := iics.CanLogDebug("inServer try accept smux stream "); ce != nil {
ce.Write()
wlc1, udp_wlc1, targetAddr1, err1 := innerSer.Handshake(stream)
if err1 != nil {
if ce := iics.CanLogDebug("inServer mux inner proxy handshake failed"); ce != nil {
ce.Write(zap.Error(err1))
}
newiics := *iics
stream, err := session.AcceptStream()
if err != nil {
if ce := iics.CanLogDebug("mux inServer try accept stream failed"); ce != nil {
ce.Write(zap.Error(err))
}
session.Close()
if !newiics.extractFirstBufFromErr(err1) {
return
}
if ce := iics.CanLogDebug("inServer got inner mux stream"); ce != nil {
ce.Write(zap.String("innerProxyName", innerProxyName))
passToOutClient(newiics, true, wlc1, udp_wlc1, targetAddr1)
} else {
if ce := iics.CanLogDebug("inServer mux stream handshake ok"); ce != nil {
ce.Write(zap.String("targetAddr", targetAddr1.String()))
}
go func() {
newiics := *iics
newiics.isInner = true
wlc1, udp_wlc1, targetAddr1, err1 := innerSer.Handshake(stream)
if err1 != nil {
if ce := iics.CanLogDebug("inServer mux inner proxy handshake failed"); ce != nil {
ce.Write(zap.Error(err1))
}
newiics := *iics
if !newiics.extractFirstBufFromErr(err1) {
return
}
passToOutClient(newiics, true, wlc1, udp_wlc1, targetAddr1)
} else {
if ce := iics.CanLogDebug("inServer mux stream handshake ok"); ce != nil {
ce.Write(zap.String("targetAddr1", targetAddr1.String()))
}
passToOutClient(*iics, false, wlc1, udp_wlc1, targetAddr1)
}
}()
passToOutClient(newiics, false, wlc1, udp_wlc1, targetAddr1)
}
}()
err = utils.ErrHandled
return
}
}
}()
err = utils.ErrHandled
return
}
@@ -494,7 +500,7 @@ func handshakeInserver(iics *incomingInserverConnState) (wlc net.Conn, udp_wlc n
// 本函数 处理inServer的代理层数据并在试图处理 分流和回落后将流量导向目标并开始Copy。
// iics 不使用指针, 因为iics不能公用因为 在多路复用时 iics.wrappedConn 是会变化的。
//
//被 handleNewIncomeConnection 调用。
//被 handleNewIncomeConnection 和 ListenSer 调用。
func handshakeInserver_and_passToOutClient(iics incomingInserverConnState) {
wlc, udp_wlc, targetAddr, err := handshakeInserver(&iics)
@@ -953,7 +959,12 @@ func dialClient(iics incomingInserverConnState, targetAddr netLayer.Addr,
hasInnerMux = true
//先过滤掉 innermux 通道已经建立的情况, 此时我们不必再次外部拨号,而是直接进行内层拨号.
client.Lock()
if client.InnerMuxEstablished() {
client.Unlock()
wrc1, realudp_wrc, result1 := dialInnerProxy(client, wlc, nil, iics, innerProxyName, targetAddr, isudp)
if result1 == 0 {
@@ -966,11 +977,20 @@ func dialClient(iics incomingInserverConnState, targetAddr netLayer.Addr,
result = result1
return
} else {
if ce := iics.CanLogDebug("mux failed, will redial"); ce != nil {
if ce := iics.CanLogDebug("client inner mux dial innerProxy failed, will redial"); ce != nil {
ce.Write()
}
}
} else {
//在实测时 发现,可能出现并发问题,比如在加载图多的网页时,很容易碰到
//此时如果是两个连接同时 发出,而且 尚未 建立 innerMux
// 则如果不加锁的话 ,两个连接 会同时 获取到 client.InnerMuxEstablished() 为 false
// 这会导致同时试图拨号 innerMux而这是错误的
//我们只允许有一个 innerMux 连接存在如果有多个的话那么最新拨号的innerMux 会覆盖以前的拨号,
// 导致 以前的 innerMux 成为了 悬垂连接,而且会导致 相关联的请求卡住。
defer client.Unlock()
}
}
}
@@ -1372,7 +1392,7 @@ func dialClient_andRelay(iics incomingInserverConnState, targetAddr netLayer.Add
//在内层mux时, 不能因为单个传输完毕就关闭整个连接
if innerMuxResult, _ := client.HasInnerMux(); innerMuxResult == 0 {
if iics.shouldCloseInSerBaseConnWhenFinish {
if iics.shouldCloseInSerBaseConnWhenFinish && !iics.isInner {
if iics.baseLocalConn != nil {
defer iics.baseLocalConn.Close()
}

View File

@@ -4,6 +4,7 @@ import (
"crypto/tls"
"io"
"strings"
"sync"
"github.com/e1732a364fed/v2ray_simple/advLayer"
"github.com/e1732a364fed/v2ray_simple/httpLayer"
@@ -110,6 +111,7 @@ type Base struct {
Innermux *smux.Session //用于存储 client的已拨号的mux连接
sync.Mutex
}
func (b *Base) GetBase() *Base {
@@ -171,6 +173,7 @@ func (b *Base) Sniffing() bool {
}
func (b *Base) InnerMuxEstablished() bool {
return b.Innermux != nil && !b.Innermux.IsClosed()
}
@@ -194,14 +197,14 @@ func (*Base) GetServerInnerMuxSession(wlc io.ReadWriteCloser) *smux.Session {
}
func (b *Base) CloseInnerMuxSession() {
if b.Innermux != nil && !b.Innermux.IsClosed() {
if b.InnerMuxEstablished() {
b.Innermux.Close()
b.Innermux = nil
}
}
func (b *Base) GetClientInnerMuxSession(wrc io.ReadWriteCloser) *smux.Session {
if b.Innermux != nil && !b.Innermux.IsClosed() {
if b.InnerMuxEstablished() {
return b.Innermux
} else {
smuxConfig := smux.DefaultConfig()

View File

@@ -4,6 +4,7 @@ import (
"io"
"net"
"strings"
"sync"
"time"
"github.com/e1732a364fed/v2ray_simple/netLayer"
@@ -58,6 +59,8 @@ type Client interface {
GetClientInnerMuxSession(wrc io.ReadWriteCloser) *smux.Session
InnerMuxEstablished() bool
CloseInnerMuxSession()
sync.Locker //用于锁定 innerMux
}
type UserClient interface {