diff --git a/README.md b/README.md index e5c5333..c6f0fa1 100644 --- a/README.md +++ b/README.md @@ -30,9 +30,9 @@ vs的一些亮点是 全协议readv加速,lazy技术,vless v1,hysteria 阻 支持的功能有: -socks5/http/dokodemo/tproxy(透明代理)/trojan/simplesocks/vless/vless_v1, +socks5/http/dokodemo/tproxy(透明代理)/trojan/simplesocks/vless(v0/v1), -ws(以及earlydata)/grpc/quic(以及hy阻控)/smux, +ws(以及earlydata)/grpc(以及multiMode 以及uTls)/quic(以及hy阻控)/smux, dns(udp/tls)/route(geoip/geosite)/fallback(path/sni/alpn), diff --git a/advLayer/quic/client.go b/advLayer/quic/client.go new file mode 100644 index 0000000..a959f76 --- /dev/null +++ b/advLayer/quic/client.go @@ -0,0 +1,168 @@ +package quic + +import ( + "crypto/tls" + "net" + "reflect" + "sync" + "sync/atomic" + + "github.com/hahahrfool/v2ray_simple/netLayer" + "github.com/hahahrfool/v2ray_simple/utils" + "github.com/lucas-clemente/quic-go" + "github.com/lucas-clemente/quic-go/congestion" + "go.uber.org/zap" +) + +type Client struct { + knownServerMaxStreamCount int32 + + serverAddrStr string + + tlsConf tls.Config + useHysteria, hysteria_manual bool + maxbyteCount int + + clientconns map[[16]byte]*sessionState + sessionMapMutex sync.RWMutex +} + +func NewClient(addr *netLayer.Addr, alpnList []string, host string, insecure bool, useHysteria bool, maxbyteCount int, hysteria_manual bool) *Client { + return &Client{ + serverAddrStr: addr.String(), + tlsConf: tls.Config{ + InsecureSkipVerify: insecure, + ServerName: host, + NextProtos: alpnList, + }, + useHysteria: useHysteria, + hysteria_manual: hysteria_manual, + maxbyteCount: maxbyteCount, + } +} + +//trimSessions移除不Active的session, 并试图返回一个 最佳的可用于新stream的session +func (c *Client) trimSessions(ss map[[16]byte]*sessionState) (s *sessionState) { + minSessionNum := 10000 + for id, thisState := range ss { + if isActive(thisState) { + + if c.knownServerMaxStreamCount == 0 { + s = thisState + return + } else { + osc := int(thisState.openedStreamCount) + + if osc < int(c.knownServerMaxStreamCount) { + + if osc < minSessionNum { + s = thisState + minSessionNum = osc + + } + } + } + + } else { + thisState.CloseWithError(0, "") + delete(ss, id) + } + } + + return +} + +//获取已拨号的连接,或者重新从底层拨号。返回一个可作 c.DialSubConn 参数 的值. +func (c *Client) DialCommonConn(openBecausePreviousFull bool, previous any) any { + //返回一个 *sessionState. + + //我们采用预先openStream的策略, 来试出哪些session已经满了, 哪些没满 + // 已知的是, 一个session满了之后, 要等待 0~45秒 或以上的时间, 才能它才可能腾出空位 + + //我们对每一个session所打开过的stream进行计数,这样就可以探知 服务端 的 最大stream数设置. + + if !openBecausePreviousFull { + + c.sessionMapMutex.Lock() + var theSession *sessionState + if len(c.clientconns) > 0 { + theSession = c.trimSessions(c.clientconns) + } + if len(c.clientconns) > 0 { + c.sessionMapMutex.Unlock() + if theSession != nil { + return theSession + + } + } else { + c.clientconns = make(map[[16]byte]*sessionState) + c.sessionMapMutex.Unlock() + } + } else if previous != nil && c.knownServerMaxStreamCount == 0 { + + ps, ok := previous.(*sessionState) + if !ok { + if ce := utils.CanLogDebug("QUIC: 'previous' parameter was given but with wrong type "); ce != nil { + ce.Write(zap.String("type", reflect.TypeOf(previous).String())) + } + return nil + } + + c.knownServerMaxStreamCount = ps.openedStreamCount + + if ce := utils.CanLogDebug("QUIC: knownServerMaxStreamCount"); ce != nil { + ce.Write(zap.Int32("count", c.knownServerMaxStreamCount)) + } + + } + + session, err := quic.DialAddr(c.serverAddrStr, &c.tlsConf, &common_DialConfig) + if err != nil { + if ce := utils.CanLogErr("QUIC: dial failed"); ce != nil { + ce.Write(zap.Error(err)) + } + return nil + } + + if c.useHysteria { + if c.maxbyteCount <= 0 { + c.maxbyteCount = Default_hysteriaMaxByteCount + } + + if c.hysteria_manual { + bs := NewBrutalSender_M(congestion.ByteCount(c.maxbyteCount)) + session.SetCongestionControl(bs) + + } else { + bs := NewBrutalSender(congestion.ByteCount(c.maxbyteCount)) + session.SetCongestionControl(bs) + + } + } + + id := utils.GenerateUUID() + + var result = &sessionState{Connection: session, id: id} + c.sessionMapMutex.Lock() + c.clientconns[id] = result + c.sessionMapMutex.Unlock() + + return result +} + +func (c *Client) DialSubConn(thing any) (net.Conn, error) { + theState, ok := thing.(*sessionState) + if !ok { + return nil, utils.ErrNilOrWrongParameter + } + stream, err := theState.OpenStream() + if err != nil { + + return nil, err + + } + + atomic.AddInt32(&theState.openedStreamCount, 1) + + return StreamConn{Stream: stream, laddr: theState.LocalAddr(), raddr: theState.RemoteAddr(), relatedSessionState: theState}, nil +} diff --git a/advLayer/quic/conn.go b/advLayer/quic/conn.go new file mode 100644 index 0000000..dfa92b9 --- /dev/null +++ b/advLayer/quic/conn.go @@ -0,0 +1,51 @@ +package quic + +import ( + "net" + "sync/atomic" + + "github.com/lucas-clemente/quic-go" +) + +//用于 跟踪 一个 session 中 所开启的 stream的数量 +type sessionState struct { + quic.Connection + id [16]byte + + openedStreamCount int32 +} + +//给 quic.Stream 添加 方法使其满足 net.Conn. +// quic.Stream 唯独不支持 LocalAddr 和 RemoteAddr 方法. +// 因为它是通过 StreamID 来识别连接. 不过session是有的。 +type StreamConn struct { + quic.Stream + laddr, raddr net.Addr + relatedSessionState *sessionState + isclosed bool +} + +func (sc StreamConn) LocalAddr() net.Addr { + return sc.laddr +} +func (sc StreamConn) RemoteAddr() net.Addr { + return sc.raddr +} + +//这里必须要同时调用 CancelRead 和 CancelWrite +// 因为 quic-go这个设计的是双工的,调用Close实际上只是间接调用了 CancelWrite +// 看 quic-go包中的 quic.SendStream 的注释就知道了. +func (sc StreamConn) Close() error { + if sc.isclosed { + return nil + } + sc.isclosed = true + sc.CancelRead(quic.StreamErrorCode(quic.ConnectionRefused)) + sc.CancelWrite(quic.StreamErrorCode(quic.ConnectionRefused)) + if rss := sc.relatedSessionState; rss != nil { + + atomic.AddInt32(&rss.openedStreamCount, -1) + + } + return sc.Stream.Close() +} diff --git a/advLayer/quic/quic.go b/advLayer/quic/quic.go index efa5eaa..7361d9d 100644 --- a/advLayer/quic/quic.go +++ b/advLayer/quic/quic.go @@ -7,14 +7,12 @@ package quic import ( "context" "crypto/tls" + "log" "net" "reflect" - "sync" - "sync/atomic" "time" "github.com/hahahrfool/v2ray_simple/advLayer" - "github.com/hahahrfool/v2ray_simple/netLayer" "github.com/hahahrfool/v2ray_simple/utils" "github.com/lucas-clemente/quic-go" "github.com/lucas-clemente/quic-go/congestion" @@ -36,55 +34,34 @@ func init() { //我们要是以后不使用hysteria的话,只需删掉 useHysteria 里的代码, 删掉 pacer.go/brutal.go, 并删掉 go.mod中的replace部分. // 然后proxy.go里的 相关配置部分也要删掉 在 prepareTLS_for* 函数中 的相关配置 即可. -//100mbps -const Default_hysteriaMaxByteCount = 1024 * 1024 / 8 * 100 - -func CloseSession(baseC any) { - baseC.(quic.Connection).CloseWithError(0, "") -} - -//给 quic.Stream 添加 方法使其满足 net.Conn. -// quic.Stream 唯独不支持 LocalAddr 和 RemoteAddr 方法. -// 因为它是通过 StreamID 来识别连接. 不过session是有的。 -type StreamConn struct { - quic.Stream - laddr, raddr net.Addr - relatedSessionState *sessionState - isclosed bool -} - -func (sc StreamConn) LocalAddr() net.Addr { - return sc.laddr -} -func (sc StreamConn) RemoteAddr() net.Addr { - return sc.raddr -} - -//这里必须要同时调用 CancelRead 和 CancelWrite -// 因为 quic-go这个设计的是双工的,调用Close实际上只是间接调用了 CancelWrite -// 看 quic-go包中的 quic.SendStream 的注释就知道了. -func (sc StreamConn) Close() error { - if sc.isclosed { - return nil - } - sc.isclosed = true - sc.CancelRead(quic.StreamErrorCode(quic.ConnectionRefused)) - sc.CancelWrite(quic.StreamErrorCode(quic.ConnectionRefused)) - if rss := sc.relatedSessionState; rss != nil { - - atomic.AddInt32(&rss.openedStreamCount, -1) - - } - return sc.Stream.Close() -} - const ( + //100mbps + Default_hysteriaMaxByteCount = 1024 * 1024 / 8 * 100 + common_maxidletimeout = time.Second * 45 common_HandshakeIdleTimeout = time.Second * 8 common_ConnectionIDLength = 12 - server_maxStreamCountInOneSession = 4 + server_maxStreamCountInOneSession = 4 //一个session中 stream越多, 性能越低, 因此我们这里限制为4 ) +func isActive(s quic.Connection) bool { + select { + case <-s.Context().Done(): + return false + default: + return true + } +} + +func CloseConn(baseC any) { + qc, ok := baseC.(quic.Connection) + if ok { + qc.CloseWithError(0, "") + } else { + log.Panicln("quic.CloseConn called with illegal parameter", reflect.TypeOf(baseC).String(), baseC) + } +} + var ( AlpnList = []string{"h3"} @@ -129,222 +106,62 @@ func ListenInitialLayers(addr string, tlsConf tls.Config, useHysteria bool, hyst newConnChan = make(chan net.Conn, 10) - go func(theChan chan net.Conn) { + go loopAccept(listener, newConnChan, useHysteria, hysteria_manual, hysteriaMaxByteCount) + return +} + +//阻塞 +func loopAccept(listener quic.Listener, theChan chan net.Conn, useHysteria bool, hysteria_manual bool, hysteriaMaxByteCount int) { + for { + session, err := listener.Accept(context.Background()) + if err != nil { + if ce := utils.CanLogErr("quic session accept"); ce != nil { + ce.Write(zap.Error(err)) + } + //close(theChan) //不应关闭chan,因为listen虽然不好使但是也许现存的stream还是好使的... + return + } + + dealNewSession(session, theChan, useHysteria, hysteria_manual, hysteriaMaxByteCount) + } +} + +//非阻塞 +func dealNewSession(session quic.Connection, theChan chan net.Conn, useHysteria bool, hysteria_manual bool, hysteriaMaxByteCount int) { + + if useHysteria { + + if hysteria_manual { + bs := NewBrutalSender_M(congestion.ByteCount(hysteriaMaxByteCount)) + + session.SetCongestionControl(bs) + } else { + bs := NewBrutalSender(congestion.ByteCount(hysteriaMaxByteCount)) + + session.SetCongestionControl(bs) + } + + } + + go func() { for { - session, err := listener.Accept(context.Background()) + stream, err := session.AcceptStream(context.Background()) if err != nil { - if ce := utils.CanLogErr("quic session accept"); ce != nil { + if ce := utils.CanLogDebug("quic stream accept failed"); ce != nil { + //只要某个连接idle时间一长,超过了idleTimeout,服务端就会出现此错误: + // timeout: no recent network activity,即 IdleTimeoutError + //这不能说是错误, 而是quic的udp特性所致,所以放到debug 输出中. + //这也同时说明, keep alive功能并不会更新 idle的最后期限. + + //我们为了性能,不必将该err转成 net.Error然后判断是否是timeout + //如果要排错那就开启debug日志即可. + ce.Write(zap.Error(err)) } - //close(theChan) //不应关闭chan,因为listen虽然不好使但是也许现存的stream还是好使的... - return + break } - - if useHysteria { - - if hysteria_manual { - bs := NewBrutalSender_M(congestion.ByteCount(hysteriaMaxByteCount)) - - session.SetCongestionControl(bs) - } else { - bs := NewBrutalSender(congestion.ByteCount(hysteriaMaxByteCount)) - - session.SetCongestionControl(bs) - } - - } - - go func() { - for { - stream, err := session.AcceptStream(context.Background()) - if err != nil { - if ce := utils.CanLogDebug("quic stream accept failed"); ce != nil { - //只要某个连接idle时间一长,超过了idleTimeout,服务端就会出现此错误: - // timeout: no recent network activity,即 IdleTimeoutError - //这不能说是错误, 而是quic的udp特性所致,所以放到debug 输出中. - //这也同时说明, keep alive功能并不会更新 idle的最后期限. - - //我们为了性能,不必将该err转成 net.Error然后判断是否是timeout - //如果要排错那就开启debug日志即可. - - ce.Write(zap.Error(err)) - } - break - } - theChan <- StreamConn{stream, session.LocalAddr(), session.RemoteAddr(), nil, false} - } - }() + theChan <- StreamConn{stream, session.LocalAddr(), session.RemoteAddr(), nil, false} } - - }(newConnChan) - - return -} - -func isActive(s quic.Connection) bool { - select { - case <-s.Context().Done(): - return false - default: - return true - } -} - -type Client struct { - knownServerMaxStreamCount int32 - - serverAddrStr string - - tlsConf tls.Config - useHysteria, hysteria_manual bool - maxbyteCount int - - clientconns map[[16]byte]*sessionState - sessionMapMutex sync.RWMutex -} - -type sessionState struct { - quic.Connection - id [16]byte - - openedStreamCount int32 -} - -func NewClient(addr *netLayer.Addr, alpnList []string, host string, insecure bool, useHysteria bool, maxbyteCount int, hysteria_manual bool) *Client { - return &Client{ - serverAddrStr: addr.String(), - tlsConf: tls.Config{ - InsecureSkipVerify: insecure, - ServerName: host, - NextProtos: alpnList, - }, - useHysteria: useHysteria, - hysteria_manual: hysteria_manual, - maxbyteCount: maxbyteCount, - } -} - -//trimSessions移除不Active的session, 并试图返回一个 最佳的可用于新stream的session -func (c *Client) trimSessions(ss map[[16]byte]*sessionState) (s *sessionState) { - minSessionNum := 10000 - for id, thisState := range ss { - if isActive(thisState) { - - if c.knownServerMaxStreamCount == 0 { - s = thisState - return - } else { - osc := int(thisState.openedStreamCount) - - if osc < int(c.knownServerMaxStreamCount) { - - if osc < minSessionNum { - s = thisState - minSessionNum = osc - - } - } - } - - } else { - thisState.CloseWithError(0, "") - delete(ss, id) - } - } - - return -} - -//获取已拨号的连接,或者重新从底层拨号 -func (c *Client) DialCommonConn(openBecausePreviousFull bool, previous any) any { - //我们采用预先openStream的策略, 来试出哪些session已经满了, 哪些没满 - // 已知的是, 一个session满了之后, 要等待 0~45秒 或以上的时间, 才能它才可能腾出空位 - - //我们对每一个session所打开过的stream进行计数,这样就可以探知 服务端 的 最大stream数设置. - - if !openBecausePreviousFull { - - c.sessionMapMutex.Lock() - var theSession *sessionState - if len(c.clientconns) > 0 { - theSession = c.trimSessions(c.clientconns) - } - if len(c.clientconns) > 0 { - c.sessionMapMutex.Unlock() - if theSession != nil { - return theSession - - } - } else { - c.clientconns = make(map[[16]byte]*sessionState) - c.sessionMapMutex.Unlock() - } - } else if previous != nil && c.knownServerMaxStreamCount == 0 { - - ps, ok := previous.(*sessionState) - if !ok { - if ce := utils.CanLogDebug("QUIC: 'previous' parameter was given but with wrong type "); ce != nil { - ce.Write(zap.String("type", reflect.TypeOf(previous).String())) - } - return nil - } - - c.knownServerMaxStreamCount = ps.openedStreamCount - - if ce := utils.CanLogDebug("QUIC: knownServerMaxStreamCount"); ce != nil { - ce.Write(zap.Int32("count", c.knownServerMaxStreamCount)) - } - - } - - session, err := quic.DialAddr(c.serverAddrStr, &c.tlsConf, &common_DialConfig) - if err != nil { - if ce := utils.CanLogErr("QUIC: dial failed"); ce != nil { - ce.Write(zap.Error(err)) - } - return nil - } - - if c.useHysteria { - if c.maxbyteCount <= 0 { - c.maxbyteCount = Default_hysteriaMaxByteCount - } - - if c.hysteria_manual { - bs := NewBrutalSender_M(congestion.ByteCount(c.maxbyteCount)) - session.SetCongestionControl(bs) - - } else { - bs := NewBrutalSender(congestion.ByteCount(c.maxbyteCount)) - session.SetCongestionControl(bs) - - } - } - - id := utils.GenerateUUID() - - var result = &sessionState{Connection: session, id: id} - c.sessionMapMutex.Lock() - c.clientconns[id] = result - c.sessionMapMutex.Unlock() - - return result -} - -func (c *Client) DialSubConn(thing any) (net.Conn, error) { - theState, ok := thing.(*sessionState) - if !ok { - return nil, utils.ErrNilOrWrongParameter - } - stream, err := theState.OpenStream() - if err != nil { - - return nil, err - - } - - atomic.AddInt32(&theState.openedStreamCount, 1) - - return StreamConn{Stream: stream, laddr: theState.LocalAddr(), raddr: theState.RemoteAddr(), relatedSessionState: theState}, nil + }() } diff --git a/examples/trojan_grpc.server.toml b/examples/trojan_grpc.server.toml index 11dfc4b..7ee5e13 100644 --- a/examples/trojan_grpc.server.toml +++ b/examples/trojan_grpc.server.toml @@ -3,7 +3,6 @@ protocol = "trojans" uuid = "a684455c-b14f-11ea-bf0d-42010aaa0003" host = "0.0.0.0" port = 4434 -version = 0 insecure = true fallback = ":80" cert = "cert.pem" diff --git a/examples/trojan_grpc_smux.client.toml b/examples/trojan_grpc_smux.client.toml index 5de4575..e6bb3e5 100644 --- a/examples/trojan_grpc_smux.client.toml +++ b/examples/trojan_grpc_smux.client.toml @@ -9,7 +9,6 @@ protocol = "trojans" uuid = "a684455c-b14f-11ea-bf0d-42010aaa0003" host = "127.0.0.1" port = 4434 -version = 0 insecure = true utls = true advancedLayer = "grpc" diff --git a/main.go b/main.go index 7f7565a..e4ba713 100644 --- a/main.go +++ b/main.go @@ -366,7 +366,7 @@ func listenSer(inServer proxy.Server, defaultOutClientForThis proxy.Client, not_ if !ok { utils.Error("read from SuperProxy not ok") - quic.CloseSession(baseConn) + quic.CloseConn(baseConn) return } diff --git a/tcp_test.go b/tcp_test.go index f29a78f..df6ac94 100644 --- a/tcp_test.go +++ b/tcp_test.go @@ -133,12 +133,11 @@ protocol = "direct" }, } - //tryGetHttp(client, "http://www.baidu.com", t) - //tryGetHttp(client, "https://www.qq.com", t) tryGetHttp(client, "http://captive.apple.com", t) tryGetHttp(client, "http://www.msftconnecttest.com/connecttest.txt", t) //联通性测试 可参考 https://imldy.cn/posts/99d42f85/ + // 用这种 captive 测试 不容易遇到 网站无法在 某些地区 如 github action 所在的地区 访问 或者卡顿等情况. } func tryGetHttp(client *http.Client, path string, t *testing.T) { @@ -149,7 +148,7 @@ func tryGetHttp(client *http.Client, path string, t *testing.T) { t.FailNow() } - t.Log("Got,start read") + t.Log("Got response, start read") bs, err := ioutil.ReadAll(resp.Body) if err != nil { @@ -162,6 +161,9 @@ func tryGetHttp(client *http.Client, path string, t *testing.T) { if len(bs) > 5 { t.Log("first 5:", string(bs[:5])) + } else { + t.Log("all:", bs) + } } diff --git a/udp_test.go b/udp_test.go index f7d0167..bc61102 100644 --- a/udp_test.go +++ b/udp_test.go @@ -12,43 +12,6 @@ import ( "github.com/miekg/dns" ) -/* -nc 模拟dns请求 -https://unix.stackexchange.com/questions/600194/create-dns-query-with-netcat-or-dev-udp - -echo cfc9 0100 0001 0000 0000 0000 0a64 7563 6b64 7563 6b67 6f03 636f 6d00 0001 0001 | - xxd -p -r | nc -u -v 114.114.114.114 53 - -不过为了灵活我们还是引用 miekg/dns 包 - 参考 https://zhengyinyong.com/post/go-dns-library/ - -虽然net.Resolver也能用, -https://stackoverflow.com/questions/59889882/specifying-dns-server-for-lookup-in-go - -但是我还是喜欢 miekg/dns; - -func TestDNSLookup_CN(t *testing.T) { - m := new(dns.Msg) - m.SetQuestion(dns.Fqdn("www.qq.com"), dns.TypeA) - c := new(dns.Client) - - r, _, err := c.Exchange(m, "114.114.114.114:53") - if r == nil { - t.Log("*** error: ", err.Error()) - t.FailNow() - } - - if r.Rcode != dns.RcodeSuccess { - t.Log("*** err2 ", r.Rcode, r) - t.FailNow() - } - - for _, a := range r.Answer { - t.Log(a) - } -} -*/ - func TestUDP_vless(t *testing.T) { testUDP("vless", 0, "tcp", false, false, false, t) }