重构readv代码,将MultiReader分为 BuffersReader和Readver两种

Readver是因为可以暴露出底层连接所以能加速;而BuffersReader是
因为协议对于多buf的读取支持良好所以可以加速,逻辑完全不同
This commit is contained in:
e1732a364fed
2000-01-01 00:00:00 +00:00
parent e92faf7f9f
commit a704f77eb4
7 changed files with 133 additions and 73 deletions

View File

@@ -21,17 +21,18 @@ var (
UseReadv bool
)
// if r!=0, then it means c can be used in readv. 1 means syscall.RawConn, 2 means utils.MultiReader
func IsConnGoodForReadv(c net.Conn) (r int, rawReadConn syscall.RawConn, mr utils.MultiReader) {
// if r!=0, then it means c can be used in readv. -1 means syscall.RawConn,1 means utils.BuffersReader, 2 means utils.Readver
func IsConnGoodForReadv(c net.Conn) (r int, rawReadConn syscall.RawConn) {
rawReadConn = GetRawConn(c)
var ok bool
if rawReadConn != nil {
r = 1
r = -1
return
} else if mr, ok = c.(utils.MultiReader); ok {
r = 2
} else if mr, ok := c.(utils.MultiReader); ok {
r = mr.WillReadBuffersBenifit()
return
}
return
}

View File

@@ -34,9 +34,12 @@ func TryCopy(writeConn io.Writer, readConn io.Reader, identity uint32) (allnum i
var isWriteConn_MultiWriter bool
var mr utils.MultiReader
var br utils.BuffersReader
var readv_withMultiReader bool
readvType := 0
if ce := utils.CanLogDebug("TryCopy"); ce != nil {
ce.Write(
zap.Uint32("id", identity),
@@ -110,8 +113,12 @@ func TryCopy(writeConn io.Writer, readConn io.Reader, identity uint32) (allnum i
if rawReadConn == nil {
var ok bool
mr, ok = readConn.(utils.MultiReader)
if ok && mr.WillReadBuffersBenifit() {
readv_withMultiReader = true
if ok {
readvType = mr.WillReadBuffersBenifit()
if readvType != 0 {
readv_withMultiReader = true
}
} else {
goto classic
@@ -144,6 +151,34 @@ func TryCopy(writeConn io.Writer, readConn io.Reader, identity uint32) (allnum i
if !readv_withMultiReader {
readv_mem = utils.Get_readvMem()
defer utils.Put_readvMem(readv_mem)
} else {
//循环读写直到 CanMultiRead 返回 true
bs := utils.GetPacket()
for {
if mr.CanMultiRead() {
break
}
var n int
n, err = readConn.Read(bs)
if err != nil {
return
}
n, err = writeConn.Write(bs[:n])
allnum += int64(n)
if err != nil {
return
}
}
utils.PutPacket(bs)
if readvType == 2 {
readv_withMultiReader = false
rawReadConn = readConn.(utils.Readver).GetRawForReadv()
} else {
br = readConn.(utils.BuffersReader)
}
}
//这个for循环 只会通过 return 跳出, 不会落到外部
@@ -151,7 +186,7 @@ func TryCopy(writeConn io.Writer, readConn io.Reader, identity uint32) (allnum i
var buffers net.Buffers
if readv_withMultiReader {
buffers, err = mr.ReadBuffers()
buffers, err = br.ReadBuffers()
} else {
buffers, err = utils.ReadvFrom(rawReadConn, readv_mem)

View File

@@ -156,9 +156,12 @@ func (c *Client) Handshake(underlay net.Conn, firstPayload []byte, target netLay
version: c.version,
underlayIsBasic: netLayer.IsBasicConn(underlay),
}
if r, rr, mr := netLayer.IsConnGoodForReadv(underlay); r > 0 {
if r, rr := netLayer.IsConnGoodForReadv(underlay); r != 0 {
uc.rr = rr
uc.mr = mr
uc.readvType = r
if r == 1 {
uc.br = underlay.(utils.BuffersReader)
}
}
return uc, nil

View File

@@ -294,9 +294,17 @@ realPart:
isServerEnd: true,
}
if r, rr, mr := netLayer.IsConnGoodForReadv(underlay); r > 0 {
if r, rr := netLayer.IsConnGoodForReadv(underlay); r != 0 {
//一般而言,裸奔时, readvType == -1
// 使用普通tls时 readvType ==0
//使用 shadowTls 时, readvType == 1
uc.rr = rr
uc.mr = mr
uc.readvType = r
if r == 1 {
uc.br = underlay.(utils.BuffersReader)
}
}
return uc, nil, targetAddr, nil

View File

@@ -28,8 +28,10 @@ type UserTCPConn struct {
isntFirstPacket bool //for v0
rr syscall.RawConn //用于 Readbuffers
mr utils.MultiReader //用于 Readbuffers
rr syscall.RawConn //用于 Readbuffers
readvType int
br utils.BuffersReader
}
func (u *UserTCPConn) GetProtocolVersion() int {
@@ -61,20 +63,27 @@ func (c *UserTCPConn) EverPossibleToSpliceRead() bool {
return false
}
func (c *UserTCPConn) CanSpliceRead() (bool, *net.TCPConn, *net.UnixConn) {
func (c *UserTCPConn) noHandshakeShit() bool {
if c.isServerEnd {
if c.remainFirstBufLen > 0 {
return false, nil, nil
return false
}
} else if c.version == 0 {
if !c.isntFirstPacket {
return false, nil, nil
return false
}
}
return true
}
return netLayer.ReturnSpliceRead(c.Conn)
func (c *UserTCPConn) CanSpliceRead() (bool, *net.TCPConn, *net.UnixConn) {
if c.noHandshakeShit() {
return netLayer.ReturnSpliceRead(c.Conn)
}
return false, nil, nil
}
func (c *UserTCPConn) EverPossibleToSpliceWrite() bool {
@@ -105,10 +114,6 @@ func (c *UserTCPConn) CanSpliceWrite() (r bool, conn *net.TCPConn) {
return
}
func (c *UserTCPConn) WillReadBuffersBenifit() bool {
return c.rr != nil || c.mr != nil
}
func (c *UserTCPConn) WriteBuffers(buffers [][]byte) (int64, error) {
if c.canDirectWrite() {
@@ -252,47 +257,42 @@ func (c *UserTCPConn) Read(p []byte) (int, error) {
}
}
func (c *UserTCPConn) ReadBuffers() (bs [][]byte, err error) {
if !c.isServerEnd {
if c.version == 0 && !c.isntFirstPacket {
c.isntFirstPacket = true
packet := utils.GetPacket()
var n int
n, err = c.Read(packet)
if err != nil {
utils.PutPacket(packet)
return
}
if n < 2 {
utils.PutPacket(packet)
return nil, errors.New("vless response head too short")
}
bs = append(bs, packet[2:n])
return
} else {
return utils.ReadBuffersFrom(c.Conn, c.rr, c.mr)
}
} else {
if c.remainFirstBufLen > 0 { //firstPayload 已经被最开始的main.go 中的 Read读掉了所以 在调用 ReadBuffers 时 c.remainFirstBufLen 一般为 0, 所以一般不会调用这里
bs, err = utils.ReadBuffersFrom(c.optionalReader, nil, nil)
c.remainFirstBufLen -= utils.BuffersLen(bs)
return
} else {
return utils.ReadBuffersFrom(c.Conn, c.rr, c.mr)
}
func (c *UserTCPConn) WillReadBuffersBenifit() int {
if c.readvType == -1 || c.readvType == 2 { //底层连接为原始链接或者Readver时, 我们用Readv
return 2
}
if c.readvType == 1 { //底层连接为 BuffersReader 时,我们用 BuffersReader
return 1
}
return 0
}
func (c *UserTCPConn) CanMultiRead() bool {
switch c.readvType {
case -1:
return c.noHandshakeShit()
case 1, 2:
if c.noHandshakeShit() {
if c.Conn.(utils.MultiReader).CanMultiRead() {
return true
}
}
}
return false
}
func (c *UserTCPConn) GetRawForReadv() syscall.RawConn {
if c.rr != nil {
return c.rr
} else {
return c.Conn.(utils.Readver).GetRawForReadv()
}
}
func (c *UserTCPConn) ReadBuffers() (bs [][]byte, err error) {
return c.br.ReadBuffers()
}

View File

@@ -3,6 +3,7 @@ package utils
import (
"io"
"log"
"syscall"
)
// SystemReadver 是平台相关的 用于 调用readv的 工具.
@@ -45,17 +46,25 @@ type MultiWriter interface {
WriteBuffers([][]byte) (int64, error)
}
// 专门用于 grpc multiMode 的情况.
// 因为其他协议并没有能够加速的情形。
type MultiReader interface {
ReadBuffers() ([][]byte, error)
//在底层没有实现readbuffers时显然调用 ReadBuffers没有什么意义。
//在底层没有实现readbuffers时, 或者协议设计中并没有涉及多buf, 则显然调用 ReadBuffers没有什么意义。
//所以我们通过 WillReadBuffersBenifit 方法 查询 调用是否有益。
WillReadBuffersBenifit() bool
//如果int为1说明它是一个 BuffersReader 如果int为2说明它是一个 Readver
WillReadBuffersBenifit() int
CanMultiRead() bool //一个协议的握手阶段可能需要一些操作后才能真正执行MultiRead
}
//获取所有子[]byte 长度总和
type BuffersReader interface {
ReadBuffers() ([][]byte, error)
}
type Readver interface {
GetRawForReadv() syscall.RawConn
}
// 获取所有子[]byte 长度总和
func BuffersLen(bs [][]byte) (allnum int) {
if len(bs) < 1 {
return 0
@@ -72,7 +81,7 @@ func PrintBuffers(bs [][]byte) {
}
}
//削减buffer内部的子[]byte 到合适的长度;返回削减后 bs应有的长度.
// 削减buffer内部的子[]byte 到合适的长度;返回削减后 bs应有的长度.
func ShrinkBuffers(bs [][]byte, all_len int, SingleBufLen int) int {
curIndex := 0
for curIndex < len(bs) {
@@ -90,7 +99,7 @@ func ShrinkBuffers(bs [][]byte, all_len int, SingleBufLen int) int {
return curIndex
}
//通过reslice 方式将 bs的长度以及 子 []byte 的长度 恢复至指定长度
// 通过reslice 方式将 bs的长度以及 子 []byte 的长度 恢复至指定长度
func RecoverBuffers(bs [][]byte, oldLen, old_sub_len int) [][]byte {
bs = bs[:oldLen]
for i, v := range bs {
@@ -113,10 +122,12 @@ func BuffersWriteTo(bs [][]byte, writeConn io.Writer) (num int64, err2 error) {
}
// 如果 分配了新内存来 包含数据,则 duplicate ==true, 此时可以用PutPacket函数放回;
// 如果利用了原有的第一个[]byte, 则 duplicate==false。
//
// 如果利用了原有的第一个[]byte, 则 duplicate==false。
//
// 如果 duplicate==false, 不要 使用 PutPacket等方法放入Pool
// 因为 在更上级的调用会试图去把 整个bs 放入pool;
//
// 因为 在更上级的调用会试图去把 整个bs 放入pool;
func MergeBuffers(bs [][]byte) (result []byte, duplicate bool) {
if len(bs) < 1 {
return

View File

@@ -111,6 +111,7 @@ func ReadvFrom(rawReadConn syscall.RawConn, rm *ReadvMem) ([][]byte, error) {
return allocedBuffers[:nBuf], nil
}
/*
// 依次试图使用 readv、ReadBuffers 以及 原始 Read 读取数据
func ReadBuffersFrom(c io.Reader, rawReadConn syscall.RawConn, mr MultiReader) (buffers [][]byte, err error) {
@@ -133,3 +134,4 @@ func ReadBuffersFrom(c io.Reader, rawReadConn syscall.RawConn, mr MultiReader) (
}
return
}
*/