keepalive 成功解决 ZWP 问题 当接收端不理发送端的时候 每 idleTime 就会重发一个询问报文

This commit is contained in:
impact-eintr
2022-12-17 19:02:24 +08:00
parent cca8ccd914
commit 682299e895
7 changed files with 315 additions and 30 deletions

View File

@@ -97,6 +97,16 @@ func (conn *TcpConn) Close() {
conn.ep.Close()
}
// SetSockOpt 设置socket属性 暂时只测试keepalive
func (conn *TcpConn) SetSockOpt(opt interface{}) error {
err := conn.ep.SetSockOpt(opt)
if err != nil {
return fmt.Errorf("%s", err.String())
}
return nil
}
// Listener tcp连接监听器
type Listener struct {
raddr tcpip.FullAddress

View File

@@ -142,7 +142,7 @@ func main() {
log.Println(err)
}
log.Println("服务端 建立连接")
_ = conn
go func() {
time.Sleep(3 * time.Second)
for {
@@ -171,12 +171,22 @@ func main() {
}
log.Printf("客户端 建立连接\n")
conn.SetSockOpt(tcpip.KeepaliveEnabledOption(1))
conn.SetSockOpt(tcpip.KeepaliveIntervalOption(75 * time.Second))
conn.SetSockOpt(tcpip.KeepaliveIdleOption(30 * time.Second)) // 30s的探活心跳
conn.SetSockOpt(tcpip.KeepaliveCountOption(9))
time.Sleep(time.Second)
log.Printf("\n\n客户端 写入数据")
buf := make([]byte, 1<<20)
conn.Write(buf)
time.Sleep(5 * time.Second)
buf = make([]byte, 1<<22)
conn.Write(buf)
time.Sleep(500 * time.Minute)
conn.Close()
}()

View File

@@ -8,6 +8,7 @@ import (
"reflect"
"strings"
"sync/atomic"
"time"
)
type Error struct {
@@ -383,6 +384,32 @@ type ReceiveQueueSizeOption int
// SO_TIMESTAMP socket control messages are enabled.
type TimestampOption int
// TCPInfoOption is used by GetSockOpt to expose TCP statistics.
//
// TODO: Add and populate stat fields.
type TCPInfoOption struct {
RTT time.Duration
RTTVar time.Duration
}
// KeepaliveEnabledOption is used by SetSockOpt/GetSockOpt to specify whether
// TCP keepalive is enabled for this socket.
type KeepaliveEnabledOption int
// KeepaliveIdleOption is used by SetSockOpt/GetSockOpt to specify the time a
// connection must remain idle before the first TCP keepalive packet is sent.
// Once this time is reached, KeepaliveIntervalOption is used instead.
type KeepaliveIdleOption time.Duration
// KeepaliveIntervalOption is used by SetSockOpt/GetSockOpt to specify the
// interval between sending TCP keepalive packets.
type KeepaliveIntervalOption time.Duration
// KeepaliveCountOption is used by SetSockOpt/GetSockOpt to specify the number
// of un-ACKed TCP keepalives that will be sent before the connection is
// closed.
type KeepaliveCountOption int
// MulticastTTLOption is used by SetSockOpt/GetSockOpt to control the default
// TTL value for multicast messages. The default is 1.
type MulticastTTLOption uint8

View File

@@ -247,7 +247,6 @@ func (h *handshake) synSentState(s *segment) *tcpip.Error {
h.flags |= flagAck
h.mss = rcvSynOpts.MSS
h.sndWndScale = rcvSynOpts.WS
logger.NOTICE(atoi(h.sndWnd), atoi(h.sndWndScale), atoi(h.rcvWnd))
// If this is a SYN ACK response, we only need to acknowledge the SYN
// and the handshake is completed.
@@ -351,8 +350,6 @@ func (h *handshake) handleSegment(s *segment) *tcpip.Error {
if !s.flagIsSet(flagSyn) && h.sndWndScale > 0 {
h.sndWnd <<= uint8(h.sndWndScale) // 收紧发送窗口
logger.NOTICE("扩张发送窗口到", atoi(h.sndWnd))
} else {
logger.NOTICE("原有发送窗口与服务端的接收窗口大小相同", atoi(h.sndWnd))
}
switch h.state {
@@ -778,19 +775,12 @@ func (e *endpoint) handleSegments() *tcpip.Error {
return nil
}
func (e *endpoint) keepaliveTimerExpired() *tcpip.Error {
return nil
}
// NOTE 这里是 Nagle algorithm 的实现 也就是用来解决 发送端的 ZWP 问题
func (e *endpoint) resetKeepaliveTimer(receivedData bool) *tcpip.Error {
func (e *endpoint) keepaliveTimerExpired() *tcpip.Error {
e.keepalive.Lock()
defer e.keepalive.Unlock()
if receivedData {
e.keepalive.unacked = 0
logger.NOTICE("测试 keepalive 有数据进入")
} else {
logger.NOTICE("测试 keepalive 无数据进入")
if !e.keepalive.enabled || !e.keepalive.timer.checkExpiration() {
e.keepalive.Unlock()
return nil
}
if e.keepalive.unacked >= e.keepalive.count {
@@ -807,6 +797,28 @@ func (e *endpoint) resetKeepaliveTimer(receivedData bool) *tcpip.Error {
return nil
}
// 充值 keepalive 的时间
func (e *endpoint) resetKeepaliveTimer(receivedData bool) {
e.keepalive.Lock()
defer e.keepalive.Unlock()
if receivedData {
e.keepalive.unacked = 0
}
// Start the keepalive timer IFF it's enabled and there is no pending
// data to send.
if !e.keepalive.enabled || e.snd == nil || e.snd.sndUna != e.snd.sndNxt {
e.keepalive.timer.disable()
return
}
if e.keepalive.unacked > 0 {
e.keepalive.timer.enable(e.keepalive.interval)
} else {
e.keepalive.timer.enable(e.keepalive.idle)
}
}
// disableKeepaliveTimer stops the keepalive timer.
func (e *endpoint) disableKeepaliveTimer() {
e.keepalive.Lock()
@@ -937,6 +949,7 @@ func (e *endpoint) protocolMainLoop(handshake bool) *tcpip.Error {
}
if n&notifyKeepaliveChanged != 0 {
e.resetKeepaliveTimer(true)
}
return nil

View File

@@ -59,12 +59,15 @@ type SACKInfo struct {
// keepalive is a synchronization wrapper used to appease stateify. See the
// comment in endpoint, where it is used.
//
// KeepAlive默认情况下是关闭的可以被上层应用开启和关闭
// tcp_keepalive_probes: 在tcp_keepalive_time之后没有接收到对方确认继续发送保活探测包次数默认值为9
// +stateify savable
type keepalive struct {
sync.Mutex
enabled bool
// KeepAlive的空闲时长或者说每次正常发送心跳的周期默认值为7200s2小时
idle time.Duration
// KeepAlive探测包的发送间隔默认值为75s
interval time.Duration
count int
unacked int
@@ -842,12 +845,241 @@ func (e *endpoint) zeroReceiveWindow(scale uint8) bool {
return ((e.rcvBufSize - e.rcvBufUsed) >> scale) == 0 // 接收方接收空间告急
}
// SetSockOpt sets a socket option
func (e *endpoint) SetSockOpt(opt interface{}) *tcpip.Error {
switch v := opt.(type) {
case tcpip.KeepaliveEnabledOption:
e.keepalive.Lock()
e.keepalive.enabled = v != 0
e.keepalive.Unlock()
e.notifyProtocolGoroutine(notifyKeepaliveChanged)
case tcpip.KeepaliveIdleOption:
e.keepalive.Lock()
e.keepalive.idle = time.Duration(v)
e.keepalive.Unlock()
e.notifyProtocolGoroutine(notifyKeepaliveChanged)
case tcpip.KeepaliveIntervalOption:
e.keepalive.Lock()
e.keepalive.interval = time.Duration(v)
e.keepalive.Unlock()
e.notifyProtocolGoroutine(notifyKeepaliveChanged)
case tcpip.KeepaliveCountOption:
e.keepalive.Lock()
e.keepalive.count = int(v)
e.keepalive.Unlock()
e.notifyProtocolGoroutine(notifyKeepaliveChanged)
//case tcpip.NoDelayOption:
// e.mu.Lock()
// e.noDelay = v != 0
// e.mu.Unlock()
// return nil
//case tcpip.ReuseAddressOption:
// e.mu.Lock()
// e.reuseAddr = v != 0
// e.mu.Unlock()
// return nil
//case tcpip.ReceiveBufferSizeOption:
// // Make sure the receive buffer size is within the min and max
// // allowed.
// var rs ReceiveBufferSizeOption
// size := int(v)
// if err := e.stack.TransportProtocolOption(ProtocolNumber, &rs); err == nil {
// if size < rs.Min {
// size = rs.Min
// }
// if size > rs.Max {
// size = rs.Max
// }
// }
// mask := uint32(notifyReceiveWindowChanged)
// e.rcvListMu.Lock()
// // Make sure the receive buffer size allows us to send a
// // non-zero window size.
// scale := uint8(0)
// if e.rcv != nil {
// scale = e.rcv.rcvWndScale
// }
// if size>>scale == 0 {
// size = 1 << scale
// }
// // Make sure 2*size doesn't overflow.
// if size > math.MaxInt32/2 {
// size = math.MaxInt32 / 2
// }
// wasZero := e.zeroReceiveWindow(scale)
// e.rcvBufSize = size
// if wasZero && !e.zeroReceiveWindow(scale) {
// mask |= notifyNonZeroReceiveWindow
// }
// e.rcvListMu.Unlock()
// e.segmentQueue.setLimit(2 * size)
// e.notifyProtocolGoroutine(mask)
// return nil
//case tcpip.SendBufferSizeOption:
// // Make sure the send buffer size is within the min and max
// // allowed.
// size := int(v)
// var ss SendBufferSizeOption
// if err := e.stack.TransportProtocolOption(ProtocolNumber, &ss); err == nil {
// if size < ss.Min {
// size = ss.Min
// }
// if size > ss.Max {
// size = ss.Max
// }
// }
// e.sndBufMu.Lock()
// e.sndBufSize = size
// e.sndBufMu.Unlock()
// return nil
//case tcpip.V6OnlyOption:
// // We only recognize this option on v6 endpoints.
// if e.netProto != header.IPv6ProtocolNumber {
// return tcpip.ErrInvalidEndpointState
// }
// e.mu.Lock()
// defer e.mu.Unlock()
// // We only allow this to be set when we're in the initial state.
// if e.state != stateInitial {
// return tcpip.ErrInvalidEndpointState
// }
// e.v6only = v != 0
}
return nil
}
func (e *endpoint) GetSockOpt(opt interface{}) *tcpip.Error {
switch o := opt.(type) {
case tcpip.ErrorOption:
e.lastErrorMu.Lock()
err := e.lastError
e.lastError = nil
e.lastErrorMu.Unlock()
return err
case *tcpip.SendBufferSizeOption:
e.sndBufMu.Lock()
*o = tcpip.SendBufferSizeOption(e.sndBufSize)
e.sndBufMu.Unlock()
return nil
case *tcpip.ReceiveBufferSizeOption:
e.rcvListMu.Lock()
*o = tcpip.ReceiveBufferSizeOption(e.rcvBufSize)
e.rcvListMu.Unlock()
return nil
//case *tcpip.ReceiveQueueSizeOption:
// v, err := e.readyReceiveSize()
// if err != nil {
// return err
// }
// *o = tcpip.ReceiveQueueSizeOption(v)
// return nil
//case *tcpip.NoDelayOption:
// e.mu.RLock()
// v := e.noDelay
// e.mu.RUnlock()
// *o = 0
// if v {
// *o = 1
// }
// return nil
//case *tcpip.ReuseAddressOption:
// e.mu.RLock()
// v := e.reuseAddr
// e.mu.RUnlock()
// *o = 0
// if v {
// *o = 1
// }
// return nil
case *tcpip.V6OnlyOption:
// We only recognize this option on v6 endpoints.
if e.netProto != header.IPv6ProtocolNumber {
return tcpip.ErrUnknownProtocolOption
}
e.mu.Lock()
v := e.v6only
e.mu.Unlock()
*o = 0
if v {
*o = 1
}
return nil
case *tcpip.TCPInfoOption:
*o = tcpip.TCPInfoOption{}
e.mu.RLock()
snd := e.snd
e.mu.RUnlock()
if snd != nil {
snd.rtt.Lock()
o.RTT = snd.rtt.srtt
o.RTTVar = snd.rtt.rttvar
snd.rtt.Unlock()
}
return nil
case *tcpip.KeepaliveEnabledOption:
e.keepalive.Lock()
v := e.keepalive.enabled
e.keepalive.Unlock()
*o = 0
if v {
*o = 1
}
case *tcpip.KeepaliveIdleOption:
e.keepalive.Lock()
*o = tcpip.KeepaliveIdleOption(e.keepalive.idle)
e.keepalive.Unlock()
case *tcpip.KeepaliveIntervalOption:
e.keepalive.Lock()
*o = tcpip.KeepaliveIntervalOption(e.keepalive.interval)
e.keepalive.Unlock()
case *tcpip.KeepaliveCountOption:
e.keepalive.Lock()
*o = tcpip.KeepaliveCountOption(e.keepalive.count)
e.keepalive.Unlock()
}
return tcpip.ErrUnknownProtocolOption
}
func (e *endpoint) HandlePacket(r *stack.Route, id stack.TransportEndpointID, vv buffer.VectorisedView) {

View File

@@ -70,10 +70,7 @@ func (r *receiver) nonZeroWindow() {
// don't need to immediately announce a nonzero one.
return
}
logger.NOTICE("探测到 0 窗口")
// FIXME 揭开注释
//time.Sleep(100 * time.Second)
//r.ep.snd.sendAck()
r.ep.snd.sendAck()
}
func (r *receiver) consumeSegment(s *segment, segSeq seqnum.Value, segLen seqnum.Size) bool {

View File

@@ -263,9 +263,6 @@ func (s *sender) updateMaxPayloadSize(mtu, count int) {
}
func (s *sender) sendAck() {
if s.ep.id.LocalPort == 9999 {
logger.NOTICE("之前的数据已经确认过了 服务端更新自己的确认边界", atoi(s.ep.rcv.rcvNxt))
}
s.sendSegment(buffer.VectorisedView{}, flagAck, s.sndNxt) // seq = cookies+1 ack ack|fin.seq+1
}
@@ -347,13 +344,12 @@ func (s *sender) sendSegment(data buffer.VectorisedView, flags byte, seq seqnum.
rcvNxt, rcvWnd := s.ep.rcv.getSendParams()
// Remember the max sent ack.
old := s.maxSentAck
s.maxSentAck = rcvNxt
if s.ep.id.LocalPort == 9999 {
fmt.Println()
log.Println("服务端要求客户端扩展窗口到", rcvWnd, "更新发送端的边缘", old, " TO ", s.maxSentAck)
fmt.Println()
//fmt.Println()
//log.Println("服务端要求客户端扩展窗口到", rcvWnd, "更新发送端的边缘", old, " TO ", s.maxSentAck)
//fmt.Println()
}
return s.ep.sendRaw(data, flags, seq, rcvNxt, rcvWnd)
}