修订shadowTls代码;取消shadowTls的readv, 添加writev

经过思考,readv应该不会有什么性能提升,因为需要解析数据并重新分包;
通过实现writev,可以将从tcp流readv得到的数据写入,配合readv得到性能提升
This commit is contained in:
e1732a364fed
2000-01-01 00:00:00 +00:00
parent 6e9808926b
commit e841d258b1
6 changed files with 164 additions and 229 deletions

76
main.go
View File

@@ -418,47 +418,49 @@ func handleNewIncomeConnection(inServer proxy.Server, defaultClientForThis proxy
wsConn, err := singleSer.Handshake(wrappedConn)
if errors.Is(err, httpLayer.ErrShouldFallback) {
if err != nil {
if errors.Is(err, httpLayer.ErrShouldFallback) {
meta := wsConn.(httpLayer.FallbackMeta)
meta := wsConn.(httpLayer.FallbackMeta)
iics.fallbackRequestPath = meta.Path
iics.fallbackFirstBuffer = meta.H1RequestBuf
iics.wrappedConn = meta.Conn
if meta.XFF != nil {
iics.cachedRemoteAddr = meta.XFF.String()
iics.fallbackRequestPath = meta.Path
iics.fallbackFirstBuffer = meta.H1RequestBuf
iics.wrappedConn = meta.Conn
if meta.XFF != nil {
iics.cachedRemoteAddr = meta.XFF.String()
}
if ce := iics.CanLogDebug("Single AdvLayer Check failed, will fallback."); ce != nil {
ce.Write(
zap.String("handler", inServer.AddrStr()),
zap.String("advLayer", adv),
zap.String("Reason", meta.Reason),
zap.String("validPath", advSer.GetPath()),
zap.String("gotMethod", meta.Method),
zap.String("gotPath", meta.Path),
zap.String("from", iics.cachedRemoteAddr),
)
}
passToOutClient(iics, true, nil, nil, netLayer.Addr{})
return
} else {
if ce := iics.CanLogErr("InServer Single AdvLayer handshake failed"); ce != nil {
ce.Write(
zap.String("handler", inServer.AddrStr()),
zap.String("advLayer", adv),
zap.Error(err),
)
}
wrappedConn.Close()
return
}
if ce := iics.CanLogDebug("Single AdvLayer Check failed, will fallback."); ce != nil {
ce.Write(
zap.String("handler", inServer.AddrStr()),
zap.String("advLayer", adv),
zap.String("Reason", meta.Reason),
zap.String("validPath", advSer.GetPath()),
zap.String("gotMethod", meta.Method),
zap.String("gotPath", meta.Path),
zap.String("from", iics.cachedRemoteAddr),
)
}
passToOutClient(iics, true, nil, nil, netLayer.Addr{})
return
} else if err != nil {
if ce := iics.CanLogErr("InServer Single AdvLayer handshake failed"); ce != nil {
ce.Write(
zap.String("handler", inServer.AddrStr()),
zap.String("advLayer", adv),
zap.Error(err),
)
}
wrappedConn.Close()
return
} else {
wrappedConn = wsConn

View File

@@ -53,7 +53,8 @@ func NewClient(conf Conf) *Client {
return c
}
func (c *Client) Handshake(underlay net.Conn) (tlsConn Conn, err error) {
// utls和tls时返回tlsLayer.Conn, shadowTls1时返回underlay, shadowTls2时返回 普通 net.Conn
func (c *Client) Handshake(underlay net.Conn) (result net.Conn, err error) {
switch c.tlsType {
case UTls_t:
@@ -65,7 +66,7 @@ func (c *Client) Handshake(underlay net.Conn) (tlsConn Conn, err error) {
if err != nil {
return
}
tlsConn = &conn{
result = &conn{
Conn: utlsConn,
ptr: unsafe.Pointer(utlsConn.Conn),
tlsType: UTls_t,
@@ -77,7 +78,7 @@ func (c *Client) Handshake(underlay net.Conn) (tlsConn Conn, err error) {
return
}
tlsConn = &conn{
result = &conn{
Conn: officialConn,
ptr: unsafe.Pointer(officialConn),
tlsType: Tls_t,
@@ -89,10 +90,7 @@ func (c *Client) Handshake(underlay net.Conn) (tlsConn Conn, err error) {
return
}
tlsConn = &conn{
Conn: underlay,
tlsType: ShadowTls_t,
}
result = underlay
case ShadowTls2_t:
@@ -110,17 +108,9 @@ func (c *Client) Handshake(underlay net.Conn) (tlsConn Conn, err error) {
return
}
// err = tls.Client(rw, c.tlsConfig).Handshake()
// if err != nil {
// return
// }
tlsConn = &conn{
Conn: &shadowClientConn{
FakeAppDataConn: &FakeAppDataConn{Conn: rw},
sum: hashR.Sum(),
},
tlsType: ShadowTls2_t,
result = &shadowClientConn{
FakeAppDataConn: &FakeAppDataConn{Conn: rw},
sum: hashR.Sum(),
}
}

View File

@@ -1,28 +1,20 @@
package tlsLayer
import (
"bytes"
"crypto/tls"
"encoding/binary"
"errors"
"io"
"net"
"syscall"
"github.com/e1732a364fed/v2ray_simple/netLayer"
"github.com/e1732a364fed/v2ray_simple/utils"
)
// 读写都按tls application data 的格式走,其实就是加个包头
// 读写都按tls application data 的格式走,其实就是加个包头. 实现 utils.MultiWriter
type FakeAppDataConn struct {
net.Conn
readRemaining int
OptionalReader io.Reader
OptionalReaderRemainLen int
//for readv
rr syscall.RawConn
}
func (c *FakeAppDataConn) Read(p []byte) (n int, err error) {
@@ -63,46 +55,6 @@ func (c *FakeAppDataConn) Read(p []byte) (n int, err error) {
return
}
func WriteAppData(conn io.Writer, buf *bytes.Buffer, d []byte) (n int, err error) {
var h [5]byte
h[0] = 23
binary.BigEndian.PutUint16(h[1:3], tls.VersionTLS12)
binary.BigEndian.PutUint16(h[3:], uint16(len(d)))
shouldPut := false
if buf == nil {
buf = utils.GetBuf()
shouldPut = true
}
buf.Write(h[:])
buf.Write(d)
n, err = conn.Write(buf.Bytes())
if shouldPut {
utils.PutBuf(buf)
}
return
}
// 一般conn直接为tcp连接而它是有系统缓存的因此我们一般不需要特地创建一个缓存
// 写两遍之后在发出
func WriteAppDataNoBuf(conn io.Writer, d []byte) (n int, err error) {
var h [5]byte
h[0] = 23
binary.BigEndian.PutUint16(h[1:3], tls.VersionTLS12)
binary.BigEndian.PutUint16(h[3:], uint16(len(d)))
_, err = conn.Write(h[:])
if err != nil {
return
}
return conn.Write(d)
}
func (c *FakeAppDataConn) Write(p []byte) (n int, err error) {
const maxlen = 1 << 14
@@ -131,127 +83,13 @@ func (c *FakeAppDataConn) Upstream() any {
return c.Conn
}
func (c *FakeAppDataConn) WillReadBuffersBenifit() int {
//一般而言底层连接就是tcp
if r, rr := netLayer.IsConnGoodForReadv(c.Conn); r != 0 {
c.rr = rr
}
return 1
}
// ReadBuffers一次性尽可能多读几个buf, 每个buf内部都是一个完整的appdata
func (c *FakeAppDataConn) ReadBuffers() (buffers [][]byte, err error) {
//最理想的情况是一次性读到一个大包我们根据tls包头分割出多个appdata然后放入buffers中
//但是有可能读不完整
var reader io.Reader
wholeReadLen := 0
//一次性读取
if c.rr != nil {
readv_mem := utils.Get_readvMem()
buffers, err = utils.ReadvFrom(c.rr, readv_mem)
if err != nil {
return nil, err
}
reader = utils.BuffersToMultiReader(buffers)
wholeReadLen = utils.BuffersLen(buffers)
} else {
bs := utils.GetPacket()
wholeReadLen, err = c.Conn.Read(bs)
if err != nil {
return nil, err
}
reader = bytes.NewBuffer(bs[:wholeReadLen])
}
if wholeReadLen < 5 {
return nil, errors.New("FakeAppDataConn ReadBuffers too short")
}
wholeLeftLen := wholeReadLen
if c.readRemaining > 0 {
bs := utils.GetPacket()
var n int
n, err = c.Conn.Read(bs[:c.readRemaining])
c.readRemaining -= n
buffers = append(buffers, bs[:n])
return
}
for {
if wholeLeftLen < 5 {
reader = io.MultiReader(reader, c.Conn)
var tlsHeader [5]byte
_, err = io.ReadFull(reader, tlsHeader[:])
if err != nil {
return
}
thisAppDataLen := int(binary.BigEndian.Uint16(tlsHeader[3:]))
if tlsHeader[0] != 23 {
return nil, utils.ErrInErr{ErrDesc: "unexpected TLS record type: ", Data: tlsHeader[0]}
}
thisBs := utils.GetPacket()
var n int
n, err = reader.Read(thisBs[:thisAppDataLen])
if err != nil {
return
}
buffers = append(buffers, thisBs[:n])
if diff := thisAppDataLen - n; diff > 0 {
c.readRemaining = diff
} else if diff < 0 {
return nil, utils.ErrInErr{ErrDesc: "FakeAppDataConn ReadBuffers read n>thisAppDataLen ", Data: []int{n, thisAppDataLen}}
}
return
}
var tlsHeader [5]byte
_, err = io.ReadFull(reader, tlsHeader[:])
if err != nil {
return
}
wholeLeftLen -= 5
thisAppDataLen := int(binary.BigEndian.Uint16(tlsHeader[3:]))
if tlsHeader[0] != 23 {
return nil, utils.ErrInErr{ErrDesc: "unexpected TLS record type: ", Data: tlsHeader[0]}
}
thisBs := utils.GetPacket()
var n int
n, err = reader.Read(thisBs[:thisAppDataLen])
if err != nil {
return
}
buffers = append(buffers, thisBs[:n])
if diff := thisAppDataLen - n; diff > 0 {
c.readRemaining = diff
return
} else if diff < 0 {
return nil, utils.ErrInErr{ErrDesc: "FakeAppDataConn ReadBuffers read n>thisAppDataLen ", Data: []int{n, thisAppDataLen}}
}
wholeLeftLen -= thisAppDataLen
if wholeLeftLen == 0 {
return
}
}
}
func (c *FakeAppDataConn) PutBuffers(bss [][]byte) {
for _, v := range bss {
utils.PutPacket(v)
}
func (c *FakeAppDataConn) WriteBuffers(bss [][]byte) (int64, error) {
// 在server端从direct用readv读到的数据可以用 WriteBuffers写回可以加速
allLen := utils.BuffersLen(bss)
err := WriteAppDataHeader(c.Conn, allLen)
if err != nil {
return 0, err
}
return utils.BuffersWriteTo(bss, c.Conn)
}

View File

@@ -2,6 +2,7 @@ package tlsLayer
import (
"bytes"
"crypto/tls"
"encoding/binary"
"errors"
"io"
@@ -84,3 +85,46 @@ func CopyTls12Handshake(isSrcClient bool, dst, src net.Conn) error {
}
return nil
}
func WriteAppData(conn io.Writer, buf *bytes.Buffer, d []byte) (n int, err error) {
shouldPut := false
if buf == nil {
buf = utils.GetBuf()
shouldPut = true
}
WriteAppDataHeader(buf, len(d))
buf.Write(d)
n, err = conn.Write(buf.Bytes())
if shouldPut {
utils.PutBuf(buf)
}
return
}
// 一般conn直接为tcp连接而它是有系统缓存的因此我们一般不需要特地创建一个缓存
// 写两遍之后在发出
func WriteAppDataNoBuf(w io.Writer, d []byte) (n int, err error) {
err = WriteAppDataHeader(w, len(d))
if err != nil {
return
}
return w.Write(d)
}
func WriteAppDataHeader(w io.Writer, len int) (err error) {
var h [5]byte
h[0] = 23
binary.BigEndian.PutUint16(h[1:3], tls.VersionTLS12)
binary.BigEndian.PutUint16(h[3:], uint16(len))
_, err = w.Write(h[:])
return
}

View File

@@ -251,7 +251,7 @@ func shadowCopyHandshakeClientToFake(fakeConn, clientConn net.Conn, hashW *utils
}
// 第一次写时写入一个hash其余直接使用 FakeAppDataConn
// 第一次写时写入一个hash其余直接使用 FakeAppDataConn. 实现 utils.MultiWriter
type shadowClientConn struct {
*FakeAppDataConn
sum []byte
@@ -278,3 +278,26 @@ func (c *shadowClientConn) Write(p []byte) (n int, err error) {
}
return c.FakeAppDataConn.Write(p)
}
func (c *shadowClientConn) WriteBuffers(bss [][]byte) (int64, error) {
if c.sum != nil {
sum := c.sum
c.sum = nil
result, dup := utils.MergeBuffersWithPrefix(sum, bss)
allDataLen := len(result) - len(sum)
_, err := c.FakeAppDataConn.Write(result)
if dup {
utils.PutPacket(result)
}
if err != nil {
return 0, err
}
return int64(allDataLen), nil
}
return c.FakeAppDataConn.WriteBuffers(bss)
}

View File

@@ -71,7 +71,9 @@ type SystemReadver interface {
所以websocket很有必要实现 WriteBuffers 方法.
目前实现 的有 vless/trojan 的 UserTCPConn , ws.Conn 以及 grpc 的 multiConn
目前实现 的有 vless/trojan 的 UserTCPConn , ws.Conn
WriteV本身几乎没什么加速但是因为ReadV有加速所以通过WriteV写入ReadV读到的数组才能配合ReadV加速
*/
type MultiWriter interface {
WriteBuffers([][]byte) (int64, error)
@@ -90,7 +92,7 @@ type MultiReader interface {
type BuffersReader interface {
ReadBuffers() ([][]byte, error)
//将 ReadBuffers 放回缓存,以便重复利用内存
//将 ReadBuffers 放回缓存,以便重复利用内存. 因为这里不确定这个buffers是如何获取到的, 所以由实现者自行确定
PutBuffers([][]byte)
}
@@ -205,3 +207,39 @@ func BuffersToMultiReader(bs [][]byte) io.Reader {
}
return io.MultiReader(bufs...)
}
// similar to MergeBuffers. prefix must has content
func MergeBuffersWithPrefix(prefix []byte, bs [][]byte) (result []byte, duplicate bool) {
b0 := prefix
if len(bs) == 0 {
return prefix, false
}
allLen := BuffersLen(bs) + len(prefix)
if allLen <= cap(b0) {
b0 = b0[:allLen]
cursor := len(b0)
for i := 0; i < len(bs); i++ {
cursor += copy(b0[cursor:], bs[i])
}
return b0, false
}
if allLen <= MaxPacketLen {
result = GetPacket()
} else {
result = make([]byte, allLen) //实际目前的readv实现也很难出现这种情况
// 一定要尽量避免这种情况,如果 MaxBufLen小于readv buf总长度会导致严重的内存泄漏问题,
// 见github issue #24
}
cursor := copy(result, prefix)
for i := 0; i < len(bs); i++ {
cursor += copy(result[cursor:], bs[i])
}
return result[:allLen], true
}