From e841d258b14aabf50ce3cc7eebbcfb673a891807 Mon Sep 17 00:00:00 2001 From: e1732a364fed <75717694+e1732a364fed@users.noreply.github.com> Date: Sat, 1 Jan 2000 00:00:00 +0000 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E8=AE=A2shadowTls=E4=BB=A3=E7=A0=81;?= =?UTF-8?q?=E5=8F=96=E6=B6=88shadowTls=E7=9A=84readv,=20=E6=B7=BB=E5=8A=A0?= =?UTF-8?q?writev?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 经过思考,readv应该不会有什么性能提升,因为需要解析数据并重新分包; 通过实现writev,可以将从tcp流readv得到的数据写入,配合readv得到性能提升 --- main.go | 76 +++++++++---------- tlsLayer/client.go | 26 ++----- tlsLayer/fake.go | 180 +++------------------------------------------ tlsLayer/io.go | 44 +++++++++++ tlsLayer/shadow.go | 25 ++++++- utils/buffers.go | 42 ++++++++++- 6 files changed, 164 insertions(+), 229 deletions(-) diff --git a/main.go b/main.go index a2fb608..af59c8d 100644 --- a/main.go +++ b/main.go @@ -418,47 +418,49 @@ func handleNewIncomeConnection(inServer proxy.Server, defaultClientForThis proxy wsConn, err := singleSer.Handshake(wrappedConn) - if errors.Is(err, httpLayer.ErrShouldFallback) { + if err != nil { + if errors.Is(err, httpLayer.ErrShouldFallback) { - meta := wsConn.(httpLayer.FallbackMeta) + meta := wsConn.(httpLayer.FallbackMeta) - iics.fallbackRequestPath = meta.Path - iics.fallbackFirstBuffer = meta.H1RequestBuf - iics.wrappedConn = meta.Conn - if meta.XFF != nil { - iics.cachedRemoteAddr = meta.XFF.String() + iics.fallbackRequestPath = meta.Path + iics.fallbackFirstBuffer = meta.H1RequestBuf + iics.wrappedConn = meta.Conn + if meta.XFF != nil { + iics.cachedRemoteAddr = meta.XFF.String() + + } + + if ce := iics.CanLogDebug("Single AdvLayer Check failed, will fallback."); ce != nil { + + ce.Write( + zap.String("handler", inServer.AddrStr()), + zap.String("advLayer", adv), + zap.String("Reason", meta.Reason), + zap.String("validPath", advSer.GetPath()), + zap.String("gotMethod", meta.Method), + zap.String("gotPath", meta.Path), + zap.String("from", iics.cachedRemoteAddr), + ) + } + + passToOutClient(iics, true, nil, nil, netLayer.Addr{}) + return + + } else { + if ce := iics.CanLogErr("InServer Single AdvLayer handshake failed"); ce != nil { + + ce.Write( + zap.String("handler", inServer.AddrStr()), + zap.String("advLayer", adv), + zap.Error(err), + ) + } + + wrappedConn.Close() + return } - - if ce := iics.CanLogDebug("Single AdvLayer Check failed, will fallback."); ce != nil { - - ce.Write( - zap.String("handler", inServer.AddrStr()), - zap.String("advLayer", adv), - zap.String("Reason", meta.Reason), - zap.String("validPath", advSer.GetPath()), - zap.String("gotMethod", meta.Method), - zap.String("gotPath", meta.Path), - zap.String("from", iics.cachedRemoteAddr), - ) - } - - passToOutClient(iics, true, nil, nil, netLayer.Addr{}) - return - - } else if err != nil { - if ce := iics.CanLogErr("InServer Single AdvLayer handshake failed"); ce != nil { - - ce.Write( - zap.String("handler", inServer.AddrStr()), - zap.String("advLayer", adv), - zap.Error(err), - ) - } - - wrappedConn.Close() - return - } else { wrappedConn = wsConn diff --git a/tlsLayer/client.go b/tlsLayer/client.go index f378236..9c5226d 100644 --- a/tlsLayer/client.go +++ b/tlsLayer/client.go @@ -53,7 +53,8 @@ func NewClient(conf Conf) *Client { return c } -func (c *Client) Handshake(underlay net.Conn) (tlsConn Conn, err error) { +// utls和tls时返回tlsLayer.Conn, shadowTls1时返回underlay, shadowTls2时返回 普通 net.Conn +func (c *Client) Handshake(underlay net.Conn) (result net.Conn, err error) { switch c.tlsType { case UTls_t: @@ -65,7 +66,7 @@ func (c *Client) Handshake(underlay net.Conn) (tlsConn Conn, err error) { if err != nil { return } - tlsConn = &conn{ + result = &conn{ Conn: utlsConn, ptr: unsafe.Pointer(utlsConn.Conn), tlsType: UTls_t, @@ -77,7 +78,7 @@ func (c *Client) Handshake(underlay net.Conn) (tlsConn Conn, err error) { return } - tlsConn = &conn{ + result = &conn{ Conn: officialConn, ptr: unsafe.Pointer(officialConn), tlsType: Tls_t, @@ -89,10 +90,7 @@ func (c *Client) Handshake(underlay net.Conn) (tlsConn Conn, err error) { return } - tlsConn = &conn{ - Conn: underlay, - tlsType: ShadowTls_t, - } + result = underlay case ShadowTls2_t: @@ -110,17 +108,9 @@ func (c *Client) Handshake(underlay net.Conn) (tlsConn Conn, err error) { return } - // err = tls.Client(rw, c.tlsConfig).Handshake() - // if err != nil { - // return - // } - - tlsConn = &conn{ - Conn: &shadowClientConn{ - FakeAppDataConn: &FakeAppDataConn{Conn: rw}, - sum: hashR.Sum(), - }, - tlsType: ShadowTls2_t, + result = &shadowClientConn{ + FakeAppDataConn: &FakeAppDataConn{Conn: rw}, + sum: hashR.Sum(), } } diff --git a/tlsLayer/fake.go b/tlsLayer/fake.go index 34149d3..a727415 100644 --- a/tlsLayer/fake.go +++ b/tlsLayer/fake.go @@ -1,28 +1,20 @@ package tlsLayer import ( - "bytes" - "crypto/tls" "encoding/binary" - "errors" "io" "net" - "syscall" - "github.com/e1732a364fed/v2ray_simple/netLayer" "github.com/e1732a364fed/v2ray_simple/utils" ) -// 读写都按tls application data 的格式走,其实就是加个包头 +// 读写都按tls application data 的格式走,其实就是加个包头. 实现 utils.MultiWriter type FakeAppDataConn struct { net.Conn readRemaining int OptionalReader io.Reader OptionalReaderRemainLen int - - //for readv - rr syscall.RawConn } func (c *FakeAppDataConn) Read(p []byte) (n int, err error) { @@ -63,46 +55,6 @@ func (c *FakeAppDataConn) Read(p []byte) (n int, err error) { return } -func WriteAppData(conn io.Writer, buf *bytes.Buffer, d []byte) (n int, err error) { - var h [5]byte - h[0] = 23 - binary.BigEndian.PutUint16(h[1:3], tls.VersionTLS12) - binary.BigEndian.PutUint16(h[3:], uint16(len(d))) - - shouldPut := false - - if buf == nil { - buf = utils.GetBuf() - shouldPut = true - } - buf.Write(h[:]) - buf.Write(d) - - n, err = conn.Write(buf.Bytes()) - - if shouldPut { - utils.PutBuf(buf) - - } - return -} - -// 一般conn直接为tcp连接,而它是有系统缓存的,因此我们一般不需要特地创建一个缓存 -// 写两遍之后在发出 -func WriteAppDataNoBuf(conn io.Writer, d []byte) (n int, err error) { - var h [5]byte - h[0] = 23 - binary.BigEndian.PutUint16(h[1:3], tls.VersionTLS12) - binary.BigEndian.PutUint16(h[3:], uint16(len(d))) - - _, err = conn.Write(h[:]) - if err != nil { - return - } - return conn.Write(d) - -} - func (c *FakeAppDataConn) Write(p []byte) (n int, err error) { const maxlen = 1 << 14 @@ -131,127 +83,13 @@ func (c *FakeAppDataConn) Upstream() any { return c.Conn } -func (c *FakeAppDataConn) WillReadBuffersBenifit() int { - //一般而言底层连接就是tcp - if r, rr := netLayer.IsConnGoodForReadv(c.Conn); r != 0 { - c.rr = rr - } - - return 1 -} - -// ReadBuffers一次性尽可能多读几个buf, 每个buf内部都是一个完整的appdata -func (c *FakeAppDataConn) ReadBuffers() (buffers [][]byte, err error) { - - //最理想的情况是,一次性读到一个大包,我们根据tls包头分割出多个appdata,然后放入buffers中 - - //但是有可能读不完整 - - var reader io.Reader - - wholeReadLen := 0 - //一次性读取 - if c.rr != nil { - readv_mem := utils.Get_readvMem() - buffers, err = utils.ReadvFrom(c.rr, readv_mem) - if err != nil { - return nil, err - } - - reader = utils.BuffersToMultiReader(buffers) - wholeReadLen = utils.BuffersLen(buffers) - } else { - bs := utils.GetPacket() - wholeReadLen, err = c.Conn.Read(bs) - if err != nil { - return nil, err - } - reader = bytes.NewBuffer(bs[:wholeReadLen]) - } - if wholeReadLen < 5 { - return nil, errors.New("FakeAppDataConn ReadBuffers too short") - } - wholeLeftLen := wholeReadLen - - if c.readRemaining > 0 { - bs := utils.GetPacket() - var n int - n, err = c.Conn.Read(bs[:c.readRemaining]) - c.readRemaining -= n - buffers = append(buffers, bs[:n]) - return - } - for { - if wholeLeftLen < 5 { - reader = io.MultiReader(reader, c.Conn) - var tlsHeader [5]byte - _, err = io.ReadFull(reader, tlsHeader[:]) - if err != nil { - return - } - thisAppDataLen := int(binary.BigEndian.Uint16(tlsHeader[3:])) - if tlsHeader[0] != 23 { - return nil, utils.ErrInErr{ErrDesc: "unexpected TLS record type: ", Data: tlsHeader[0]} - } - thisBs := utils.GetPacket() - - var n int - n, err = reader.Read(thisBs[:thisAppDataLen]) - if err != nil { - return - } - buffers = append(buffers, thisBs[:n]) - - if diff := thisAppDataLen - n; diff > 0 { - c.readRemaining = diff - - } else if diff < 0 { - return nil, utils.ErrInErr{ErrDesc: "FakeAppDataConn ReadBuffers read n>thisAppDataLen ", Data: []int{n, thisAppDataLen}} - } - return - - } - - var tlsHeader [5]byte - _, err = io.ReadFull(reader, tlsHeader[:]) - if err != nil { - return - } - wholeLeftLen -= 5 - - thisAppDataLen := int(binary.BigEndian.Uint16(tlsHeader[3:])) - if tlsHeader[0] != 23 { - return nil, utils.ErrInErr{ErrDesc: "unexpected TLS record type: ", Data: tlsHeader[0]} - } - thisBs := utils.GetPacket() - - var n int - n, err = reader.Read(thisBs[:thisAppDataLen]) - if err != nil { - return - } - buffers = append(buffers, thisBs[:n]) - - if diff := thisAppDataLen - n; diff > 0 { - c.readRemaining = diff - return - - } else if diff < 0 { - return nil, utils.ErrInErr{ErrDesc: "FakeAppDataConn ReadBuffers read n>thisAppDataLen ", Data: []int{n, thisAppDataLen}} - } - - wholeLeftLen -= thisAppDataLen - - if wholeLeftLen == 0 { - return - } - - } - -} -func (c *FakeAppDataConn) PutBuffers(bss [][]byte) { - for _, v := range bss { - utils.PutPacket(v) - } +func (c *FakeAppDataConn) WriteBuffers(bss [][]byte) (int64, error) { + // 在server端,从direct用readv读到的数据可以用 WriteBuffers写回,可以加速 + allLen := utils.BuffersLen(bss) + err := WriteAppDataHeader(c.Conn, allLen) + if err != nil { + return 0, err + } + return utils.BuffersWriteTo(bss, c.Conn) } diff --git a/tlsLayer/io.go b/tlsLayer/io.go index 1db3e6f..aa50efa 100644 --- a/tlsLayer/io.go +++ b/tlsLayer/io.go @@ -2,6 +2,7 @@ package tlsLayer import ( "bytes" + "crypto/tls" "encoding/binary" "errors" "io" @@ -84,3 +85,46 @@ func CopyTls12Handshake(isSrcClient bool, dst, src net.Conn) error { } return nil } + +func WriteAppData(conn io.Writer, buf *bytes.Buffer, d []byte) (n int, err error) { + + shouldPut := false + + if buf == nil { + buf = utils.GetBuf() + shouldPut = true + } + WriteAppDataHeader(buf, len(d)) + + buf.Write(d) + + n, err = conn.Write(buf.Bytes()) + + if shouldPut { + utils.PutBuf(buf) + + } + return +} + +// 一般conn直接为tcp连接,而它是有系统缓存的,因此我们一般不需要特地创建一个缓存 +// 写两遍之后在发出 +func WriteAppDataNoBuf(w io.Writer, d []byte) (n int, err error) { + + err = WriteAppDataHeader(w, len(d)) + if err != nil { + return + } + return w.Write(d) + +} + +func WriteAppDataHeader(w io.Writer, len int) (err error) { + var h [5]byte + h[0] = 23 + binary.BigEndian.PutUint16(h[1:3], tls.VersionTLS12) + binary.BigEndian.PutUint16(h[3:], uint16(len)) + + _, err = w.Write(h[:]) + return +} diff --git a/tlsLayer/shadow.go b/tlsLayer/shadow.go index e408de6..8472315 100644 --- a/tlsLayer/shadow.go +++ b/tlsLayer/shadow.go @@ -251,7 +251,7 @@ func shadowCopyHandshakeClientToFake(fakeConn, clientConn net.Conn, hashW *utils } -// 第一次写时写入一个hash,其余直接使用 FakeAppDataConn +// 第一次写时写入一个hash,其余直接使用 FakeAppDataConn. 实现 utils.MultiWriter type shadowClientConn struct { *FakeAppDataConn sum []byte @@ -278,3 +278,26 @@ func (c *shadowClientConn) Write(p []byte) (n int, err error) { } return c.FakeAppDataConn.Write(p) } + +func (c *shadowClientConn) WriteBuffers(bss [][]byte) (int64, error) { + if c.sum != nil { + sum := c.sum + c.sum = nil + + result, dup := utils.MergeBuffersWithPrefix(sum, bss) + + allDataLen := len(result) - len(sum) + + _, err := c.FakeAppDataConn.Write(result) + + if dup { + utils.PutPacket(result) + } + if err != nil { + return 0, err + } + return int64(allDataLen), nil + + } + return c.FakeAppDataConn.WriteBuffers(bss) +} diff --git a/utils/buffers.go b/utils/buffers.go index 053327b..6a97e59 100644 --- a/utils/buffers.go +++ b/utils/buffers.go @@ -71,7 +71,9 @@ type SystemReadver interface { 所以websocket很有必要实现 WriteBuffers 方法. - 目前实现 的有 vless/trojan 的 UserTCPConn , ws.Conn 以及 grpc 的 multiConn + 目前实现 的有 vless/trojan 的 UserTCPConn , ws.Conn + + WriteV本身几乎没什么加速,但是因为ReadV有加速,所以通过WriteV写入ReadV读到的数组才能配合ReadV加速 */ type MultiWriter interface { WriteBuffers([][]byte) (int64, error) @@ -90,7 +92,7 @@ type MultiReader interface { type BuffersReader interface { ReadBuffers() ([][]byte, error) - //将 ReadBuffers 放回缓存,以便重复利用内存 + //将 ReadBuffers 放回缓存,以便重复利用内存. 因为这里不确定这个buffers是如何获取到的, 所以由实现者自行确定 PutBuffers([][]byte) } @@ -205,3 +207,39 @@ func BuffersToMultiReader(bs [][]byte) io.Reader { } return io.MultiReader(bufs...) } + +// similar to MergeBuffers. prefix must has content +func MergeBuffersWithPrefix(prefix []byte, bs [][]byte) (result []byte, duplicate bool) { + + b0 := prefix + if len(bs) == 0 { + return prefix, false + } + allLen := BuffersLen(bs) + len(prefix) + + if allLen <= cap(b0) { + b0 = b0[:allLen] + cursor := len(b0) + for i := 0; i < len(bs); i++ { + cursor += copy(b0[cursor:], bs[i]) + } + return b0, false + } + + if allLen <= MaxPacketLen { + result = GetPacket() + + } else { + result = make([]byte, allLen) //实际目前的readv实现也很难出现这种情况 + // 一定要尽量避免这种情况,如果 MaxBufLen小于readv buf总长度,会导致严重的内存泄漏问题, + // 见github issue #24 + } + + cursor := copy(result, prefix) + + for i := 0; i < len(bs); i++ { + cursor += copy(result[cursor:], bs[i]) + } + + return result[:allLen], true +}