diff --git a/cmd/netstack/TcpConn.go b/cmd/netstack/TcpConn.go index 87677e8..6984be7 100644 --- a/cmd/netstack/TcpConn.go +++ b/cmd/netstack/TcpConn.go @@ -97,6 +97,16 @@ func (conn *TcpConn) Close() { conn.ep.Close() } +// SetSockOpt 设置socket属性 暂时只测试keepalive +func (conn *TcpConn) SetSockOpt(opt interface{}) error { + err := conn.ep.SetSockOpt(opt) + if err != nil { + return fmt.Errorf("%s", err.String()) + } + + return nil +} + // Listener tcp连接监听器 type Listener struct { raddr tcpip.FullAddress diff --git a/cmd/netstack/main.go b/cmd/netstack/main.go index 54d1019..a0b547a 100644 --- a/cmd/netstack/main.go +++ b/cmd/netstack/main.go @@ -142,7 +142,7 @@ func main() { log.Println(err) } log.Println("服务端 建立连接") - _ = conn + go func() { time.Sleep(3 * time.Second) for { @@ -171,12 +171,22 @@ func main() { } log.Printf("客户端 建立连接\n") + conn.SetSockOpt(tcpip.KeepaliveEnabledOption(1)) + conn.SetSockOpt(tcpip.KeepaliveIntervalOption(75 * time.Second)) + conn.SetSockOpt(tcpip.KeepaliveIdleOption(30 * time.Second)) // 30s的探活心跳 + conn.SetSockOpt(tcpip.KeepaliveCountOption(9)) + time.Sleep(time.Second) + log.Printf("\n\n客户端 写入数据") buf := make([]byte, 1<<20) conn.Write(buf) + time.Sleep(5 * time.Second) + + buf = make([]byte, 1<<22) conn.Write(buf) + time.Sleep(500 * time.Minute) conn.Close() }() diff --git a/tcpip/tcpip.go b/tcpip/tcpip.go index d7b51a9..692e3f0 100644 --- a/tcpip/tcpip.go +++ b/tcpip/tcpip.go @@ -8,6 +8,7 @@ import ( "reflect" "strings" "sync/atomic" + "time" ) type Error struct { @@ -383,6 +384,32 @@ type ReceiveQueueSizeOption int // SO_TIMESTAMP socket control messages are enabled. type TimestampOption int +// TCPInfoOption is used by GetSockOpt to expose TCP statistics. +// +// TODO: Add and populate stat fields. +type TCPInfoOption struct { + RTT time.Duration + RTTVar time.Duration +} + +// KeepaliveEnabledOption is used by SetSockOpt/GetSockOpt to specify whether +// TCP keepalive is enabled for this socket. +type KeepaliveEnabledOption int + +// KeepaliveIdleOption is used by SetSockOpt/GetSockOpt to specify the time a +// connection must remain idle before the first TCP keepalive packet is sent. +// Once this time is reached, KeepaliveIntervalOption is used instead. +type KeepaliveIdleOption time.Duration + +// KeepaliveIntervalOption is used by SetSockOpt/GetSockOpt to specify the +// interval between sending TCP keepalive packets. +type KeepaliveIntervalOption time.Duration + +// KeepaliveCountOption is used by SetSockOpt/GetSockOpt to specify the number +// of un-ACKed TCP keepalives that will be sent before the connection is +// closed. +type KeepaliveCountOption int + // MulticastTTLOption is used by SetSockOpt/GetSockOpt to control the default // TTL value for multicast messages. The default is 1. type MulticastTTLOption uint8 diff --git a/tcpip/transport/tcp/connect.go b/tcpip/transport/tcp/connect.go index 6864608..700ee1b 100644 --- a/tcpip/transport/tcp/connect.go +++ b/tcpip/transport/tcp/connect.go @@ -247,7 +247,6 @@ func (h *handshake) synSentState(s *segment) *tcpip.Error { h.flags |= flagAck h.mss = rcvSynOpts.MSS h.sndWndScale = rcvSynOpts.WS - logger.NOTICE(atoi(h.sndWnd), atoi(h.sndWndScale), atoi(h.rcvWnd)) // If this is a SYN ACK response, we only need to acknowledge the SYN // and the handshake is completed. @@ -351,8 +350,6 @@ func (h *handshake) handleSegment(s *segment) *tcpip.Error { if !s.flagIsSet(flagSyn) && h.sndWndScale > 0 { h.sndWnd <<= uint8(h.sndWndScale) // 收紧发送窗口 logger.NOTICE("扩张发送窗口到", atoi(h.sndWnd)) - } else { - logger.NOTICE("原有发送窗口与服务端的接收窗口大小相同", atoi(h.sndWnd)) } switch h.state { @@ -778,19 +775,12 @@ func (e *endpoint) handleSegments() *tcpip.Error { return nil } -func (e *endpoint) keepaliveTimerExpired() *tcpip.Error { - return nil -} - // NOTE 这里是 Nagle algorithm 的实现 也就是用来解决 发送端的 ZWP 问题 -func (e *endpoint) resetKeepaliveTimer(receivedData bool) *tcpip.Error { +func (e *endpoint) keepaliveTimerExpired() *tcpip.Error { e.keepalive.Lock() - defer e.keepalive.Unlock() - if receivedData { - e.keepalive.unacked = 0 - logger.NOTICE("测试 keepalive 有数据进入") - } else { - logger.NOTICE("测试 keepalive 无数据进入") + if !e.keepalive.enabled || !e.keepalive.timer.checkExpiration() { + e.keepalive.Unlock() + return nil } if e.keepalive.unacked >= e.keepalive.count { @@ -807,6 +797,28 @@ func (e *endpoint) resetKeepaliveTimer(receivedData bool) *tcpip.Error { return nil } +// 充值 keepalive 的时间 +func (e *endpoint) resetKeepaliveTimer(receivedData bool) { + e.keepalive.Lock() + defer e.keepalive.Unlock() + if receivedData { + e.keepalive.unacked = 0 + } + + // Start the keepalive timer IFF it's enabled and there is no pending + // data to send. + if !e.keepalive.enabled || e.snd == nil || e.snd.sndUna != e.snd.sndNxt { + e.keepalive.timer.disable() + return + } + + if e.keepalive.unacked > 0 { + e.keepalive.timer.enable(e.keepalive.interval) + } else { + e.keepalive.timer.enable(e.keepalive.idle) + } +} + // disableKeepaliveTimer stops the keepalive timer. func (e *endpoint) disableKeepaliveTimer() { e.keepalive.Lock() @@ -937,6 +949,7 @@ func (e *endpoint) protocolMainLoop(handshake bool) *tcpip.Error { } if n¬ifyKeepaliveChanged != 0 { + e.resetKeepaliveTimer(true) } return nil diff --git a/tcpip/transport/tcp/endpoint.go b/tcpip/transport/tcp/endpoint.go index e6a04a1..d2f5c1b 100644 --- a/tcpip/transport/tcp/endpoint.go +++ b/tcpip/transport/tcp/endpoint.go @@ -59,12 +59,15 @@ type SACKInfo struct { // keepalive is a synchronization wrapper used to appease stateify. See the // comment in endpoint, where it is used. -// +// KeepAlive默认情况下是关闭的,可以被上层应用开启和关闭 +// tcp_keepalive_probes: 在tcp_keepalive_time之后,没有接收到对方确认,继续发送保活探测包次数,默认值为9(次) // +stateify savable type keepalive struct { sync.Mutex - enabled bool - idle time.Duration + enabled bool + // KeepAlive的空闲时长,或者说每次正常发送心跳的周期,默认值为7200s(2小时) + idle time.Duration + // KeepAlive探测包的发送间隔,默认值为75s interval time.Duration count int unacked int @@ -842,12 +845,241 @@ func (e *endpoint) zeroReceiveWindow(scale uint8) bool { return ((e.rcvBufSize - e.rcvBufUsed) >> scale) == 0 // 接收方接收空间告急 } +// SetSockOpt sets a socket option func (e *endpoint) SetSockOpt(opt interface{}) *tcpip.Error { + switch v := opt.(type) { + case tcpip.KeepaliveEnabledOption: + e.keepalive.Lock() + e.keepalive.enabled = v != 0 + e.keepalive.Unlock() + e.notifyProtocolGoroutine(notifyKeepaliveChanged) + + case tcpip.KeepaliveIdleOption: + e.keepalive.Lock() + e.keepalive.idle = time.Duration(v) + e.keepalive.Unlock() + e.notifyProtocolGoroutine(notifyKeepaliveChanged) + + case tcpip.KeepaliveIntervalOption: + e.keepalive.Lock() + e.keepalive.interval = time.Duration(v) + e.keepalive.Unlock() + e.notifyProtocolGoroutine(notifyKeepaliveChanged) + + case tcpip.KeepaliveCountOption: + e.keepalive.Lock() + e.keepalive.count = int(v) + e.keepalive.Unlock() + e.notifyProtocolGoroutine(notifyKeepaliveChanged) + + //case tcpip.NoDelayOption: + // e.mu.Lock() + // e.noDelay = v != 0 + // e.mu.Unlock() + // return nil + + //case tcpip.ReuseAddressOption: + // e.mu.Lock() + // e.reuseAddr = v != 0 + // e.mu.Unlock() + // return nil + + //case tcpip.ReceiveBufferSizeOption: + // // Make sure the receive buffer size is within the min and max + // // allowed. + // var rs ReceiveBufferSizeOption + // size := int(v) + // if err := e.stack.TransportProtocolOption(ProtocolNumber, &rs); err == nil { + // if size < rs.Min { + // size = rs.Min + // } + // if size > rs.Max { + // size = rs.Max + // } + // } + + // mask := uint32(notifyReceiveWindowChanged) + + // e.rcvListMu.Lock() + + // // Make sure the receive buffer size allows us to send a + // // non-zero window size. + // scale := uint8(0) + // if e.rcv != nil { + // scale = e.rcv.rcvWndScale + // } + // if size>>scale == 0 { + // size = 1 << scale + // } + + // // Make sure 2*size doesn't overflow. + // if size > math.MaxInt32/2 { + // size = math.MaxInt32 / 2 + // } + + // wasZero := e.zeroReceiveWindow(scale) + // e.rcvBufSize = size + // if wasZero && !e.zeroReceiveWindow(scale) { + // mask |= notifyNonZeroReceiveWindow + // } + // e.rcvListMu.Unlock() + + // e.segmentQueue.setLimit(2 * size) + + // e.notifyProtocolGoroutine(mask) + // return nil + + //case tcpip.SendBufferSizeOption: + // // Make sure the send buffer size is within the min and max + // // allowed. + // size := int(v) + // var ss SendBufferSizeOption + // if err := e.stack.TransportProtocolOption(ProtocolNumber, &ss); err == nil { + // if size < ss.Min { + // size = ss.Min + // } + // if size > ss.Max { + // size = ss.Max + // } + // } + + // e.sndBufMu.Lock() + // e.sndBufSize = size + // e.sndBufMu.Unlock() + + // return nil + + //case tcpip.V6OnlyOption: + // // We only recognize this option on v6 endpoints. + // if e.netProto != header.IPv6ProtocolNumber { + // return tcpip.ErrInvalidEndpointState + // } + + // e.mu.Lock() + // defer e.mu.Unlock() + + // // We only allow this to be set when we're in the initial state. + // if e.state != stateInitial { + // return tcpip.ErrInvalidEndpointState + // } + + // e.v6only = v != 0 + + } + return nil } func (e *endpoint) GetSockOpt(opt interface{}) *tcpip.Error { - return nil + switch o := opt.(type) { + case tcpip.ErrorOption: + e.lastErrorMu.Lock() + err := e.lastError + e.lastError = nil + e.lastErrorMu.Unlock() + return err + + case *tcpip.SendBufferSizeOption: + e.sndBufMu.Lock() + *o = tcpip.SendBufferSizeOption(e.sndBufSize) + e.sndBufMu.Unlock() + return nil + + case *tcpip.ReceiveBufferSizeOption: + e.rcvListMu.Lock() + *o = tcpip.ReceiveBufferSizeOption(e.rcvBufSize) + e.rcvListMu.Unlock() + return nil + + //case *tcpip.ReceiveQueueSizeOption: + // v, err := e.readyReceiveSize() + // if err != nil { + // return err + // } + + // *o = tcpip.ReceiveQueueSizeOption(v) + // return nil + + //case *tcpip.NoDelayOption: + // e.mu.RLock() + // v := e.noDelay + // e.mu.RUnlock() + + // *o = 0 + // if v { + // *o = 1 + // } + // return nil + + //case *tcpip.ReuseAddressOption: + // e.mu.RLock() + // v := e.reuseAddr + // e.mu.RUnlock() + + // *o = 0 + // if v { + // *o = 1 + // } + // return nil + + case *tcpip.V6OnlyOption: + // We only recognize this option on v6 endpoints. + if e.netProto != header.IPv6ProtocolNumber { + return tcpip.ErrUnknownProtocolOption + } + + e.mu.Lock() + v := e.v6only + e.mu.Unlock() + + *o = 0 + if v { + *o = 1 + } + return nil + + case *tcpip.TCPInfoOption: + *o = tcpip.TCPInfoOption{} + e.mu.RLock() + snd := e.snd + e.mu.RUnlock() + if snd != nil { + snd.rtt.Lock() + o.RTT = snd.rtt.srtt + o.RTTVar = snd.rtt.rttvar + snd.rtt.Unlock() + } + + return nil + + case *tcpip.KeepaliveEnabledOption: + e.keepalive.Lock() + v := e.keepalive.enabled + e.keepalive.Unlock() + + *o = 0 + if v { + *o = 1 + } + + case *tcpip.KeepaliveIdleOption: + e.keepalive.Lock() + *o = tcpip.KeepaliveIdleOption(e.keepalive.idle) + e.keepalive.Unlock() + + case *tcpip.KeepaliveIntervalOption: + e.keepalive.Lock() + *o = tcpip.KeepaliveIntervalOption(e.keepalive.interval) + e.keepalive.Unlock() + + case *tcpip.KeepaliveCountOption: + e.keepalive.Lock() + *o = tcpip.KeepaliveCountOption(e.keepalive.count) + e.keepalive.Unlock() + + } + + return tcpip.ErrUnknownProtocolOption } func (e *endpoint) HandlePacket(r *stack.Route, id stack.TransportEndpointID, vv buffer.VectorisedView) { diff --git a/tcpip/transport/tcp/rcv.go b/tcpip/transport/tcp/rcv.go index c2c89ca..be66732 100644 --- a/tcpip/transport/tcp/rcv.go +++ b/tcpip/transport/tcp/rcv.go @@ -70,10 +70,7 @@ func (r *receiver) nonZeroWindow() { // don't need to immediately announce a nonzero one. return } - logger.NOTICE("探测到 0 窗口") - // FIXME 揭开注释 - //time.Sleep(100 * time.Second) - //r.ep.snd.sendAck() + r.ep.snd.sendAck() } func (r *receiver) consumeSegment(s *segment, segSeq seqnum.Value, segLen seqnum.Size) bool { diff --git a/tcpip/transport/tcp/snd.go b/tcpip/transport/tcp/snd.go index 70c6473..99a9ca1 100644 --- a/tcpip/transport/tcp/snd.go +++ b/tcpip/transport/tcp/snd.go @@ -263,9 +263,6 @@ func (s *sender) updateMaxPayloadSize(mtu, count int) { } func (s *sender) sendAck() { - if s.ep.id.LocalPort == 9999 { - logger.NOTICE("之前的数据已经确认过了 服务端更新自己的确认边界", atoi(s.ep.rcv.rcvNxt)) - } s.sendSegment(buffer.VectorisedView{}, flagAck, s.sndNxt) // seq = cookies+1 ack ack|fin.seq+1 } @@ -347,13 +344,12 @@ func (s *sender) sendSegment(data buffer.VectorisedView, flags byte, seq seqnum. rcvNxt, rcvWnd := s.ep.rcv.getSendParams() // Remember the max sent ack. - old := s.maxSentAck s.maxSentAck = rcvNxt if s.ep.id.LocalPort == 9999 { - fmt.Println() - log.Println("服务端要求客户端扩展窗口到", rcvWnd, "更新发送端的边缘", old, " TO ", s.maxSentAck) - fmt.Println() + //fmt.Println() + //log.Println("服务端要求客户端扩展窗口到", rcvWnd, "更新发送端的边缘", old, " TO ", s.maxSentAck) + //fmt.Println() } return s.ep.sendRaw(data, flags, seq, rcvNxt, rcvWnd) }