diff --git a/tcpip/transport/tcp/connect.go b/tcpip/transport/tcp/connect.go index 1eb46a7..6864608 100644 --- a/tcpip/transport/tcp/connect.go +++ b/tcpip/transport/tcp/connect.go @@ -772,9 +772,48 @@ func (e *endpoint) handleSegments() *tcpip.Error { e.snd.sendAck() } + // TODO keepalive 解决 ZWP 问题 + e.resetKeepaliveTimer(true) + return nil } +func (e *endpoint) keepaliveTimerExpired() *tcpip.Error { + return nil +} + +// NOTE 这里是 Nagle algorithm 的实现 也就是用来解决 发送端的 ZWP 问题 +func (e *endpoint) resetKeepaliveTimer(receivedData bool) *tcpip.Error { + e.keepalive.Lock() + defer e.keepalive.Unlock() + if receivedData { + e.keepalive.unacked = 0 + logger.NOTICE("测试 keepalive 有数据进入") + } else { + logger.NOTICE("测试 keepalive 无数据进入") + } + + if e.keepalive.unacked >= e.keepalive.count { + e.keepalive.Unlock() + return tcpip.ErrConnectionReset + } + + // RFC1122 4.2.3.6: TCP keepalive is a dataless ACK with + // seg.seq = snd.nxt-1. + e.keepalive.unacked++ + e.keepalive.Unlock() + e.snd.sendSegment(buffer.VectorisedView{}, flagAck, e.snd.sndNxt-1) + e.resetKeepaliveTimer(false) + return nil +} + +// disableKeepaliveTimer stops the keepalive timer. +func (e *endpoint) disableKeepaliveTimer() { + e.keepalive.Lock() + e.keepalive.timer.disable() + e.keepalive.Unlock() +} + // protocolMainLoop 是TCP协议的主循环。它在自己的goroutine中运行,负责握手、发送段和处理收到的段 func (e *endpoint) protocolMainLoop(handshake bool) *tcpip.Error { @@ -826,6 +865,9 @@ func (e *endpoint) protocolMainLoop(handshake bool) *tcpip.Error { e.rcvListMu.Unlock() } + e.keepalive.timer.init(&e.keepalive.waker) + defer e.keepalive.timer.cleanup() + e.mu.Lock() e.state = stateConnected // TODO drained @@ -863,6 +905,10 @@ func (e *endpoint) protocolMainLoop(handshake bool) *tcpip.Error { return nil }, }, + { + w: &e.keepalive.waker, + f: e.keepaliveTimerExpired, + }, { // NOTE 这段代码的设计值得借鉴 w: &e.notificationWaker, diff --git a/tcpip/transport/tcp/endpoint.go b/tcpip/transport/tcp/endpoint.go index 170e0a5..e6a04a1 100644 --- a/tcpip/transport/tcp/endpoint.go +++ b/tcpip/transport/tcp/endpoint.go @@ -57,6 +57,21 @@ type SACKInfo struct { NumBlocks int } +// keepalive is a synchronization wrapper used to appease stateify. See the +// comment in endpoint, where it is used. +// +// +stateify savable +type keepalive struct { + sync.Mutex + enabled bool + idle time.Duration + interval time.Duration + count int + unacked int + timer timer + waker sleep.Waker +} + // endpoint 表示TCP端点。该结构用作端点用户和协议实现之间的接口;让并发goroutine调用端点是合法的, // 它们是正确同步的。然而,协议实现在单个goroutine中运行。 type endpoint struct { @@ -170,6 +185,8 @@ type endpoint struct { // read by Accept() calls. acceptedChan chan *endpoint + keepalive keepalive + // The following are only used from the protocol goroutine, and // therefore don't need locks to protect them. rcv *receiver @@ -192,6 +209,12 @@ func newEndpoint(stack *stack.Stack, netProto tcpip.NetworkProtocolNumber, waite rcvBufSize: DefaultBufferSize, sndBufSize: DefaultBufferSize, sndMTU: int(math.MaxInt32), + keepalive: keepalive{ + // Linux defaults. + idle: 2 * time.Hour, + interval: 75 * time.Second, + count: 9, + }, } var ss SendBufferSizeOption diff --git a/tcpip/transport/tcp/rcv.go b/tcpip/transport/tcp/rcv.go index c46d96c..c2c89ca 100644 --- a/tcpip/transport/tcp/rcv.go +++ b/tcpip/transport/tcp/rcv.go @@ -71,6 +71,7 @@ func (r *receiver) nonZeroWindow() { return } logger.NOTICE("探测到 0 窗口") + // FIXME 揭开注释 //time.Sleep(100 * time.Second) //r.ep.snd.sendAck() } diff --git a/tcpip/transport/tcp/snd.go b/tcpip/transport/tcp/snd.go index e15e43b..70c6473 100644 --- a/tcpip/transport/tcp/snd.go +++ b/tcpip/transport/tcp/snd.go @@ -537,12 +537,11 @@ func (s *sender) sendData() { // NOTE 开启计时器 如果在RTO后没有回信(snd.handleRecvdSegment 中有数据可以处理) 那么将会重发 // 在 s.resendTimer.init() 中 将会调用 Assert() 唤醒重发函数 retransmitTimerExpired() s.resendTimer.enable(s.rto) - logger.NOTICE("没数据 所以开启一个定时器") } // TODO KEEPALIVE if s.sndUna == s.sndNxt { - //log.Fatal("注意测试", s.sndWnd) + s.ep.resetKeepaliveTimer(false) } time.Sleep(20 * time.Millisecond)