修订代码;为 quic的 0-rtt功能做铺垫.

This commit is contained in:
hahafool
2022-04-24 22:59:47 +08:00
parent 73b14b9a9a
commit 96dbc99e8b
9 changed files with 301 additions and 302 deletions

View File

@@ -30,9 +30,9 @@ vs的一些亮点是 全协议readv加速lazy技术vless v1hysteria 阻
支持的功能有:
socks5/http/dokodemo/tproxy(透明代理)/trojan/simplesocks/vless/vless_v1,
socks5/http/dokodemo/tproxy(透明代理)/trojan/simplesocks/vless(v0/v1),
ws(以及earlydata)/grpc/quic(以及hy阻控)/smux,
ws(以及earlydata)/grpc(以及multiMode 以及uTls)/quic(以及hy阻控)/smux,
dns(udp/tls)/route(geoip/geosite)/fallback(path/sni/alpn),

168
advLayer/quic/client.go Normal file
View File

@@ -0,0 +1,168 @@
package quic
import (
"crypto/tls"
"net"
"reflect"
"sync"
"sync/atomic"
"github.com/hahahrfool/v2ray_simple/netLayer"
"github.com/hahahrfool/v2ray_simple/utils"
"github.com/lucas-clemente/quic-go"
"github.com/lucas-clemente/quic-go/congestion"
"go.uber.org/zap"
)
type Client struct {
knownServerMaxStreamCount int32
serverAddrStr string
tlsConf tls.Config
useHysteria, hysteria_manual bool
maxbyteCount int
clientconns map[[16]byte]*sessionState
sessionMapMutex sync.RWMutex
}
func NewClient(addr *netLayer.Addr, alpnList []string, host string, insecure bool, useHysteria bool, maxbyteCount int, hysteria_manual bool) *Client {
return &Client{
serverAddrStr: addr.String(),
tlsConf: tls.Config{
InsecureSkipVerify: insecure,
ServerName: host,
NextProtos: alpnList,
},
useHysteria: useHysteria,
hysteria_manual: hysteria_manual,
maxbyteCount: maxbyteCount,
}
}
//trimSessions移除不Active的session, 并试图返回一个 最佳的可用于新stream的session
func (c *Client) trimSessions(ss map[[16]byte]*sessionState) (s *sessionState) {
minSessionNum := 10000
for id, thisState := range ss {
if isActive(thisState) {
if c.knownServerMaxStreamCount == 0 {
s = thisState
return
} else {
osc := int(thisState.openedStreamCount)
if osc < int(c.knownServerMaxStreamCount) {
if osc < minSessionNum {
s = thisState
minSessionNum = osc
}
}
}
} else {
thisState.CloseWithError(0, "")
delete(ss, id)
}
}
return
}
//获取已拨号的连接,或者重新从底层拨号。返回一个可作 c.DialSubConn 参数 的值.
func (c *Client) DialCommonConn(openBecausePreviousFull bool, previous any) any {
//返回一个 *sessionState.
//我们采用预先openStream的策略, 来试出哪些session已经满了, 哪些没满
// 已知的是, 一个session满了之后, 要等待 045秒 或以上的时间, 才能它才可能腾出空位
//我们对每一个session所打开过的stream进行计数这样就可以探知 服务端 的 最大stream数设置.
if !openBecausePreviousFull {
c.sessionMapMutex.Lock()
var theSession *sessionState
if len(c.clientconns) > 0 {
theSession = c.trimSessions(c.clientconns)
}
if len(c.clientconns) > 0 {
c.sessionMapMutex.Unlock()
if theSession != nil {
return theSession
}
} else {
c.clientconns = make(map[[16]byte]*sessionState)
c.sessionMapMutex.Unlock()
}
} else if previous != nil && c.knownServerMaxStreamCount == 0 {
ps, ok := previous.(*sessionState)
if !ok {
if ce := utils.CanLogDebug("QUIC: 'previous' parameter was given but with wrong type "); ce != nil {
ce.Write(zap.String("type", reflect.TypeOf(previous).String()))
}
return nil
}
c.knownServerMaxStreamCount = ps.openedStreamCount
if ce := utils.CanLogDebug("QUIC: knownServerMaxStreamCount"); ce != nil {
ce.Write(zap.Int32("count", c.knownServerMaxStreamCount))
}
}
session, err := quic.DialAddr(c.serverAddrStr, &c.tlsConf, &common_DialConfig)
if err != nil {
if ce := utils.CanLogErr("QUIC: dial failed"); ce != nil {
ce.Write(zap.Error(err))
}
return nil
}
if c.useHysteria {
if c.maxbyteCount <= 0 {
c.maxbyteCount = Default_hysteriaMaxByteCount
}
if c.hysteria_manual {
bs := NewBrutalSender_M(congestion.ByteCount(c.maxbyteCount))
session.SetCongestionControl(bs)
} else {
bs := NewBrutalSender(congestion.ByteCount(c.maxbyteCount))
session.SetCongestionControl(bs)
}
}
id := utils.GenerateUUID()
var result = &sessionState{Connection: session, id: id}
c.sessionMapMutex.Lock()
c.clientconns[id] = result
c.sessionMapMutex.Unlock()
return result
}
func (c *Client) DialSubConn(thing any) (net.Conn, error) {
theState, ok := thing.(*sessionState)
if !ok {
return nil, utils.ErrNilOrWrongParameter
}
stream, err := theState.OpenStream()
if err != nil {
return nil, err
}
atomic.AddInt32(&theState.openedStreamCount, 1)
return StreamConn{Stream: stream, laddr: theState.LocalAddr(), raddr: theState.RemoteAddr(), relatedSessionState: theState}, nil
}

51
advLayer/quic/conn.go Normal file
View File

@@ -0,0 +1,51 @@
package quic
import (
"net"
"sync/atomic"
"github.com/lucas-clemente/quic-go"
)
//用于 跟踪 一个 session 中 所开启的 stream的数量
type sessionState struct {
quic.Connection
id [16]byte
openedStreamCount int32
}
//给 quic.Stream 添加 方法使其满足 net.Conn.
// quic.Stream 唯独不支持 LocalAddr 和 RemoteAddr 方法.
// 因为它是通过 StreamID 来识别连接. 不过session是有的。
type StreamConn struct {
quic.Stream
laddr, raddr net.Addr
relatedSessionState *sessionState
isclosed bool
}
func (sc StreamConn) LocalAddr() net.Addr {
return sc.laddr
}
func (sc StreamConn) RemoteAddr() net.Addr {
return sc.raddr
}
//这里必须要同时调用 CancelRead 和 CancelWrite
// 因为 quic-go这个设计的是双工的调用Close实际上只是间接调用了 CancelWrite
// 看 quic-go包中的 quic.SendStream 的注释就知道了.
func (sc StreamConn) Close() error {
if sc.isclosed {
return nil
}
sc.isclosed = true
sc.CancelRead(quic.StreamErrorCode(quic.ConnectionRefused))
sc.CancelWrite(quic.StreamErrorCode(quic.ConnectionRefused))
if rss := sc.relatedSessionState; rss != nil {
atomic.AddInt32(&rss.openedStreamCount, -1)
}
return sc.Stream.Close()
}

View File

@@ -7,14 +7,12 @@ package quic
import (
"context"
"crypto/tls"
"log"
"net"
"reflect"
"sync"
"sync/atomic"
"time"
"github.com/hahahrfool/v2ray_simple/advLayer"
"github.com/hahahrfool/v2ray_simple/netLayer"
"github.com/hahahrfool/v2ray_simple/utils"
"github.com/lucas-clemente/quic-go"
"github.com/lucas-clemente/quic-go/congestion"
@@ -36,55 +34,34 @@ func init() {
//我们要是以后不使用hysteria的话只需删掉 useHysteria 里的代码, 删掉 pacer.go/brutal.go, 并删掉 go.mod中的replace部分.
// 然后proxy.go里的 相关配置部分也要删掉 在 prepareTLS_for* 函数中 的相关配置 即可.
//100mbps
const Default_hysteriaMaxByteCount = 1024 * 1024 / 8 * 100
func CloseSession(baseC any) {
baseC.(quic.Connection).CloseWithError(0, "")
}
//给 quic.Stream 添加 方法使其满足 net.Conn.
// quic.Stream 唯独不支持 LocalAddr 和 RemoteAddr 方法.
// 因为它是通过 StreamID 来识别连接. 不过session是有的。
type StreamConn struct {
quic.Stream
laddr, raddr net.Addr
relatedSessionState *sessionState
isclosed bool
}
func (sc StreamConn) LocalAddr() net.Addr {
return sc.laddr
}
func (sc StreamConn) RemoteAddr() net.Addr {
return sc.raddr
}
//这里必须要同时调用 CancelRead 和 CancelWrite
// 因为 quic-go这个设计的是双工的调用Close实际上只是间接调用了 CancelWrite
// 看 quic-go包中的 quic.SendStream 的注释就知道了.
func (sc StreamConn) Close() error {
if sc.isclosed {
return nil
}
sc.isclosed = true
sc.CancelRead(quic.StreamErrorCode(quic.ConnectionRefused))
sc.CancelWrite(quic.StreamErrorCode(quic.ConnectionRefused))
if rss := sc.relatedSessionState; rss != nil {
atomic.AddInt32(&rss.openedStreamCount, -1)
}
return sc.Stream.Close()
}
const (
//100mbps
Default_hysteriaMaxByteCount = 1024 * 1024 / 8 * 100
common_maxidletimeout = time.Second * 45
common_HandshakeIdleTimeout = time.Second * 8
common_ConnectionIDLength = 12
server_maxStreamCountInOneSession = 4
server_maxStreamCountInOneSession = 4 //一个session中 stream越多, 性能越低, 因此我们这里限制为4
)
func isActive(s quic.Connection) bool {
select {
case <-s.Context().Done():
return false
default:
return true
}
}
func CloseConn(baseC any) {
qc, ok := baseC.(quic.Connection)
if ok {
qc.CloseWithError(0, "")
} else {
log.Panicln("quic.CloseConn called with illegal parameter", reflect.TypeOf(baseC).String(), baseC)
}
}
var (
AlpnList = []string{"h3"}
@@ -129,8 +106,13 @@ func ListenInitialLayers(addr string, tlsConf tls.Config, useHysteria bool, hyst
newConnChan = make(chan net.Conn, 10)
go func(theChan chan net.Conn) {
go loopAccept(listener, newConnChan, useHysteria, hysteria_manual, hysteriaMaxByteCount)
return
}
//阻塞
func loopAccept(listener quic.Listener, theChan chan net.Conn, useHysteria bool, hysteria_manual bool, hysteriaMaxByteCount int) {
for {
session, err := listener.Accept(context.Background())
if err != nil {
@@ -141,6 +123,13 @@ func ListenInitialLayers(addr string, tlsConf tls.Config, useHysteria bool, hyst
return
}
dealNewSession(session, theChan, useHysteria, hysteria_manual, hysteriaMaxByteCount)
}
}
//非阻塞
func dealNewSession(session quic.Connection, theChan chan net.Conn, useHysteria bool, hysteria_manual bool, hysteriaMaxByteCount int) {
if useHysteria {
if hysteria_manual {
@@ -176,175 +165,3 @@ func ListenInitialLayers(addr string, tlsConf tls.Config, useHysteria bool, hyst
}
}()
}
}(newConnChan)
return
}
func isActive(s quic.Connection) bool {
select {
case <-s.Context().Done():
return false
default:
return true
}
}
type Client struct {
knownServerMaxStreamCount int32
serverAddrStr string
tlsConf tls.Config
useHysteria, hysteria_manual bool
maxbyteCount int
clientconns map[[16]byte]*sessionState
sessionMapMutex sync.RWMutex
}
type sessionState struct {
quic.Connection
id [16]byte
openedStreamCount int32
}
func NewClient(addr *netLayer.Addr, alpnList []string, host string, insecure bool, useHysteria bool, maxbyteCount int, hysteria_manual bool) *Client {
return &Client{
serverAddrStr: addr.String(),
tlsConf: tls.Config{
InsecureSkipVerify: insecure,
ServerName: host,
NextProtos: alpnList,
},
useHysteria: useHysteria,
hysteria_manual: hysteria_manual,
maxbyteCount: maxbyteCount,
}
}
//trimSessions移除不Active的session, 并试图返回一个 最佳的可用于新stream的session
func (c *Client) trimSessions(ss map[[16]byte]*sessionState) (s *sessionState) {
minSessionNum := 10000
for id, thisState := range ss {
if isActive(thisState) {
if c.knownServerMaxStreamCount == 0 {
s = thisState
return
} else {
osc := int(thisState.openedStreamCount)
if osc < int(c.knownServerMaxStreamCount) {
if osc < minSessionNum {
s = thisState
minSessionNum = osc
}
}
}
} else {
thisState.CloseWithError(0, "")
delete(ss, id)
}
}
return
}
//获取已拨号的连接,或者重新从底层拨号
func (c *Client) DialCommonConn(openBecausePreviousFull bool, previous any) any {
//我们采用预先openStream的策略, 来试出哪些session已经满了, 哪些没满
// 已知的是, 一个session满了之后, 要等待 045秒 或以上的时间, 才能它才可能腾出空位
//我们对每一个session所打开过的stream进行计数这样就可以探知 服务端 的 最大stream数设置.
if !openBecausePreviousFull {
c.sessionMapMutex.Lock()
var theSession *sessionState
if len(c.clientconns) > 0 {
theSession = c.trimSessions(c.clientconns)
}
if len(c.clientconns) > 0 {
c.sessionMapMutex.Unlock()
if theSession != nil {
return theSession
}
} else {
c.clientconns = make(map[[16]byte]*sessionState)
c.sessionMapMutex.Unlock()
}
} else if previous != nil && c.knownServerMaxStreamCount == 0 {
ps, ok := previous.(*sessionState)
if !ok {
if ce := utils.CanLogDebug("QUIC: 'previous' parameter was given but with wrong type "); ce != nil {
ce.Write(zap.String("type", reflect.TypeOf(previous).String()))
}
return nil
}
c.knownServerMaxStreamCount = ps.openedStreamCount
if ce := utils.CanLogDebug("QUIC: knownServerMaxStreamCount"); ce != nil {
ce.Write(zap.Int32("count", c.knownServerMaxStreamCount))
}
}
session, err := quic.DialAddr(c.serverAddrStr, &c.tlsConf, &common_DialConfig)
if err != nil {
if ce := utils.CanLogErr("QUIC: dial failed"); ce != nil {
ce.Write(zap.Error(err))
}
return nil
}
if c.useHysteria {
if c.maxbyteCount <= 0 {
c.maxbyteCount = Default_hysteriaMaxByteCount
}
if c.hysteria_manual {
bs := NewBrutalSender_M(congestion.ByteCount(c.maxbyteCount))
session.SetCongestionControl(bs)
} else {
bs := NewBrutalSender(congestion.ByteCount(c.maxbyteCount))
session.SetCongestionControl(bs)
}
}
id := utils.GenerateUUID()
var result = &sessionState{Connection: session, id: id}
c.sessionMapMutex.Lock()
c.clientconns[id] = result
c.sessionMapMutex.Unlock()
return result
}
func (c *Client) DialSubConn(thing any) (net.Conn, error) {
theState, ok := thing.(*sessionState)
if !ok {
return nil, utils.ErrNilOrWrongParameter
}
stream, err := theState.OpenStream()
if err != nil {
return nil, err
}
atomic.AddInt32(&theState.openedStreamCount, 1)
return StreamConn{Stream: stream, laddr: theState.LocalAddr(), raddr: theState.RemoteAddr(), relatedSessionState: theState}, nil
}

View File

@@ -3,7 +3,6 @@ protocol = "trojans"
uuid = "a684455c-b14f-11ea-bf0d-42010aaa0003"
host = "0.0.0.0"
port = 4434
version = 0
insecure = true
fallback = ":80"
cert = "cert.pem"

View File

@@ -9,7 +9,6 @@ protocol = "trojans"
uuid = "a684455c-b14f-11ea-bf0d-42010aaa0003"
host = "127.0.0.1"
port = 4434
version = 0
insecure = true
utls = true
advancedLayer = "grpc"

View File

@@ -366,7 +366,7 @@ func listenSer(inServer proxy.Server, defaultOutClientForThis proxy.Client, not_
if !ok {
utils.Error("read from SuperProxy not ok")
quic.CloseSession(baseConn)
quic.CloseConn(baseConn)
return
}

View File

@@ -133,12 +133,11 @@ protocol = "direct"
},
}
//tryGetHttp(client, "http://www.baidu.com", t)
//tryGetHttp(client, "https://www.qq.com", t)
tryGetHttp(client, "http://captive.apple.com", t)
tryGetHttp(client, "http://www.msftconnecttest.com/connecttest.txt", t)
//联通性测试 可参考 https://imldy.cn/posts/99d42f85/
// 用这种 captive 测试 不容易遇到 网站无法在 某些地区 如 github action 所在的地区 访问 或者卡顿等情况.
}
func tryGetHttp(client *http.Client, path string, t *testing.T) {
@@ -149,7 +148,7 @@ func tryGetHttp(client *http.Client, path string, t *testing.T) {
t.FailNow()
}
t.Log("Got,start read")
t.Log("Got response, start read")
bs, err := ioutil.ReadAll(resp.Body)
if err != nil {
@@ -162,6 +161,9 @@ func tryGetHttp(client *http.Client, path string, t *testing.T) {
if len(bs) > 5 {
t.Log("first 5:", string(bs[:5]))
} else {
t.Log("all:", bs)
}
}

View File

@@ -12,43 +12,6 @@ import (
"github.com/miekg/dns"
)
/*
nc 模拟dns请求
https://unix.stackexchange.com/questions/600194/create-dns-query-with-netcat-or-dev-udp
echo cfc9 0100 0001 0000 0000 0000 0a64 7563 6b64 7563 6b67 6f03 636f 6d00 0001 0001 |
xxd -p -r | nc -u -v 114.114.114.114 53
不过为了灵活我们还是引用 miekg/dns 包
参考 https://zhengyinyong.com/post/go-dns-library/
虽然net.Resolver也能用
https://stackoverflow.com/questions/59889882/specifying-dns-server-for-lookup-in-go
但是我还是喜欢 miekg/dns;
func TestDNSLookup_CN(t *testing.T) {
m := new(dns.Msg)
m.SetQuestion(dns.Fqdn("www.qq.com"), dns.TypeA)
c := new(dns.Client)
r, _, err := c.Exchange(m, "114.114.114.114:53")
if r == nil {
t.Log("*** error: ", err.Error())
t.FailNow()
}
if r.Rcode != dns.RcodeSuccess {
t.Log("*** err2 ", r.Rcode, r)
t.FailNow()
}
for _, a := range r.Answer {
t.Log(a)
}
}
*/
func TestUDP_vless(t *testing.T) {
testUDP("vless", 0, "tcp", false, false, false, t)
}