diff --git a/tcpip/header/checksum.go b/tcpip/header/checksum.go index 6a2a00c..a6b059d 100644 --- a/tcpip/header/checksum.go +++ b/tcpip/header/checksum.go @@ -33,6 +33,7 @@ func ChecksumCombine(a, b uint16) uint16 { // given destination protocol and network address, ignoring the length // field. Pseudo-headers are needed by transport layers when calculating // their own checksum. +// hash(protocol, hash(dst, hash(src, 0))) func PseudoHeaderChecksum(protocol tcpip.TransportProtocolNumber, srcAddr tcpip.Address, dstAddr tcpip.Address) uint16 { xsum := Checksum([]byte(srcAddr), 0) xsum = Checksum([]byte(dstAddr), xsum) diff --git a/tcpip/header/tcp.go b/tcpip/header/tcp.go index 25c5377..3ebe13a 100644 --- a/tcpip/header/tcp.go +++ b/tcpip/header/tcp.go @@ -131,6 +131,23 @@ const ( ) // SACKBlock 表示 sack 块的结构体 +/* + +--------+--------+ + | Kind=5 | Length | ++--------+--------+--------+---------+ +| Start of 1st Block | ++--------+--------+--------+---------+ +| End of 1st Block | ++--------+--------+--------+---------+ +| | +/ . . . . . . / +| | ++--------+--------+--------+---------+ +| Start of nth Block | ++--------+--------+--------+---------+ +| End of nth Block | ++--------+--------+--------+---------+ +*/ type SACKBlock struct { // Start indicates the lowest sequence number in the block. Start seqnum.Value diff --git a/tcpip/seqnum/seqnum.go b/tcpip/seqnum/seqnum.go index 17b1521..53d33ae 100644 --- a/tcpip/seqnum/seqnum.go +++ b/tcpip/seqnum/seqnum.go @@ -46,6 +46,7 @@ func (v *Value) UpdateForward(s Size) { } // Overlap checks if the window [a,a+b) overlaps with the window [x, x+y). +// [a,x+y)&&[x, a+b) [a, x, a+b, x+y) [a, x, x+y, a+b) [x, a, a+b, x+y) [x, a, x+y, a+b) func Overlap(a Value, b Size, x Value, y Size) bool { return a.LessThan(x.Add(y)) && x.LessThan(a.Add(b)) } diff --git a/tcpip/stack/stack.go b/tcpip/stack/stack.go index 53876b6..9e4d358 100644 --- a/tcpip/stack/stack.go +++ b/tcpip/stack/stack.go @@ -6,7 +6,9 @@ import ( "netstack/sleep" "netstack/tcpip" "netstack/tcpip/buffer" + "netstack/tcpip/header" "netstack/tcpip/ports" + "netstack/tcpip/seqnum" "netstack/waiter" "sync" "time" @@ -21,12 +23,22 @@ const ( resolutionAttempts = 3 ) -// TODO 需要解读 -type TCPProbeFunc func(s TcpEndpointState) +// TCPProbeFunc is the expected function type for a TCP probe function to be +// passed to stack.AddTCPProbe. +type TCPProbeFunc func(s TCPEndpointState) -// TODO 需要解读 -type TcpEndpointState struct { - // TODO 需要添加 +// TCPCubicState is used to hold a copy of the internal cubic state when the +// TCPProbeFunc is invoked. +type TCPCubicState struct { + WLastMax float64 + WMax float64 + T time.Time + TimeSinceLastCongestion time.Duration + C float64 + K float64 + Beta float64 + WC float64 + WEst float64 } // 传输层协议状态机 包含传输层协议以及默认处理方法 @@ -35,6 +47,207 @@ type transportProtocolState struct { defaultHandler func(*Route, TransportEndpointID, buffer.VectorisedView) bool } +// TCPEndpointID is the unique 4 tuple that identifies a given endpoint. +type TCPEndpointID struct { + // LocalPort is the local port associated with the endpoint. + LocalPort uint16 + + // LocalAddress is the local [network layer] address associated with + // the endpoint. + LocalAddress tcpip.Address + + // RemotePort is the remote port associated with the endpoint. + RemotePort uint16 + + // RemoteAddress it the remote [network layer] address associated with + // the endpoint. + RemoteAddress tcpip.Address +} + +// TCPFastRecoveryState holds a copy of the internal fast recovery state of a +// TCP endpoint. +type TCPFastRecoveryState struct { + // Active if true indicates the endpoint is in fast recovery. + Active bool + + // First is the first unacknowledged sequence number being recovered. + First seqnum.Value + + // Last is the 'recover' sequence number that indicates the point at + // which we should exit recovery barring any timeouts etc. + Last seqnum.Value + + // MaxCwnd is the maximum value we are permitted to grow the congestion + // window during recovery. This is set at the time we enter recovery. + MaxCwnd int +} + +// TCPReceiverState holds a copy of the internal state of the receiver for +// a given TCP endpoint. +type TCPReceiverState struct { + // RcvNxt is the TCP variable RCV.NXT. + RcvNxt seqnum.Value + + // RcvAcc is the TCP variable RCV.ACC. + RcvAcc seqnum.Value + + // RcvWndScale is the window scaling to use for inbound segments. + RcvWndScale uint8 + + // PendingBufUsed is the number of bytes pending in the receive + // queue. + PendingBufUsed seqnum.Size + + // PendingBufSize is the size of the socket receive buffer. + PendingBufSize seqnum.Size +} + +// TCPSenderState holds a copy of the internal state of the sender for +// a given TCP Endpoint. +type TCPSenderState struct { + // LastSendTime is the time at which we sent the last segment. + LastSendTime time.Time + + // DupAckCount is the number of Duplicate ACK's received. + DupAckCount int + + // SndCwnd is the size of the sending congestion window in packets. + SndCwnd int + + // Ssthresh is the slow start threshold in packets. + Ssthresh int + + // SndCAAckCount is the number of packets consumed in congestion + // avoidance mode. + SndCAAckCount int + + // Outstanding is the number of packets in flight. + Outstanding int + + // SndWnd is the send window size in bytes. + SndWnd seqnum.Size + + // SndUna is the next unacknowledged sequence number. + SndUna seqnum.Value + + // SndNxt is the sequence number of the next segment to be sent. + SndNxt seqnum.Value + + // RTTMeasureSeqNum is the sequence number being used for the latest RTT + // measurement. + RTTMeasureSeqNum seqnum.Value + + // RTTMeasureTime is the time when the RTTMeasureSeqNum was sent. + RTTMeasureTime time.Time + + // Closed indicates that the caller has closed the endpoint for sending. + Closed bool + + // SRTT is the smoothed round-trip time as defined in section 2 of + // RFC 6298. + SRTT time.Duration + + // RTO is the retransmit timeout as defined in section of 2 of RFC 6298. + RTO time.Duration + + // RTTVar is the round-trip time variation as defined in section 2 of + // RFC 6298. + RTTVar time.Duration + + // SRTTInited if true indicates take a valid RTT measurement has been + // completed. + SRTTInited bool + + // MaxPayloadSize is the maximum size of the payload of a given segment. + // It is initialized on demand. + MaxPayloadSize int + + // SndWndScale is the number of bits to shift left when reading the send + // window size from a segment. + SndWndScale uint8 + + // MaxSentAck is the highest acknowledgement number sent till now. + MaxSentAck seqnum.Value + + // FastRecovery holds the fast recovery state for the endpoint. + FastRecovery TCPFastRecoveryState + + // Cubic holds the state related to CUBIC congestion control. + Cubic TCPCubicState +} + +// TCPSACKInfo holds TCP SACK related information for a given TCP endpoint. +type TCPSACKInfo struct { + // Blocks is the list of SACK block currently received by the + // TCP endpoint. + Blocks []header.SACKBlock +} + +// TCPEndpointState is a copy of the internal state of a TCP endpoint. +type TCPEndpointState struct { + // ID is a copy of the TransportEndpointID for the endpoint. + ID TCPEndpointID + + // SegTime denotes the absolute time when this segment was received. + SegTime time.Time + + // RcvBufSize is the size of the receive socket buffer for the endpoint. + RcvBufSize int + + // RcvBufUsed is the amount of bytes actually held in the receive socket + // buffer for the endpoint. + RcvBufUsed int + + // RcvClosed if true, indicates the endpoint has been closed for reading. + RcvClosed bool + + // SendTSOk is used to indicate when the TS Option has been negotiated. + // When sendTSOk is true every non-RST segment should carry a TS as per + // RFC7323#section-1.1. + SendTSOk bool + + // RecentTS is the timestamp that should be sent in the TSEcr field of + // the timestamp for future segments sent by the endpoint. This field is + // updated if required when a new segment is received by this endpoint. + RecentTS uint32 + + // TSOffset is a randomized offset added to the value of the TSVal field + // in the timestamp option. + TSOffset uint32 + + // SACKPermitted is set to true if the peer sends the TCPSACKPermitted + // option in the SYN/SYN-ACK. + SACKPermitted bool + + // SACK holds TCP SACK related information for this endpoint. + SACK TCPSACKInfo + + // SndBufSize is the size of the socket send buffer. + SndBufSize int + + // SndBufUsed is the number of bytes held in the socket send buffer. + SndBufUsed int + + // SndClosed indicates that the endpoint has been closed for sends. + SndClosed bool + + // SndBufInQueue is the number of bytes in the send queue. + SndBufInQueue seqnum.Size + + // PacketTooBigCount is used to notify the main protocol routine how + // many times a "packet too big" control packet is received. + PacketTooBigCount int + + // SndMTU is the smallest MTU seen in the control packets received. + SndMTU int + + // Receiver holds variables related to the TCP receiver for the endpoint. + Receiver TCPReceiverState + + // Sender holds state related to the TCP Sender for the endpoint. + Sender TCPSenderState +} + // Stack 是一个网络堆栈,具有所有支持的协议、NIC 和路由表。 type Stack struct { transportProtocols map[tcpip.TransportProtocolNumber]*transportProtocolState // 各种传输层协议 diff --git a/tcpip/transport/tcp/README.md b/tcpip/transport/tcp/README.md index 4146a5f..b7de03f 100644 --- a/tcpip/transport/tcp/README.md +++ b/tcpip/transport/tcp/README.md @@ -115,4 +115,110 @@ b. 如果此时 A 有数据发送,主机 B 收到主机 A 的 Data + ACK,自 4)主机 A 对主机 B 的连接释放报文段发出确认,将 ACK 置为 1,ACK=seq2+1, seq=seq1+1。这样才把从 B 到 A 的反方向连接释放掉,主机 A 的 TCP 再向其应用进程报告,整个连接已经全部释放。 -还有一个要注意的是,fin 包和数据包一样,如果丢失了,会进行重传,实际上可能是是 fin 丢失或 ack 丢失。重传的周期由 rto 决定。 \ No newline at end of file +还有一个要注意的是,fin 包和数据包一样,如果丢失了,会进行重传,实际上可能是是 fin 丢失或 ack 丢失。重传的周期由 rto 决定。 + +## tcp的可靠性机制 + +本小节讨论 tcp 可靠性的实现,首先得知道可靠性指的是什么。可靠性指的是网络层能通信的前提下,保证数据包正确且按序到达对端。 + +比如发送端发送了“12345678”,那么接收端一定能收到“12345678”,不会乱序“12456783”,也不会少或多数据。 + +实现 TCP 的可靠传输有以下机制: + +1. 校验和机制(检测和重传受到损伤的报文段) +2. 确认应答机制(保存失序到达的报文段直至缺失的报文到期,以及检测和丢弃重复的报文段) +3. 超时重传机制(重传丢失的报文段) + +正因为 tcp 实现了可靠性,那么基于 tcp 的应用就可以不用担心发送的数据包丢失、乱序、不正确等,减轻了上层开发的负担。 + +### 检验和 + +每个 tcp 段都包含了一个检验和字段,用来检查报文段是否收到损伤。如果某个报文段因检验和无效而被检查出受到损伤,就由终点 TCP 将其丢弃,并被认为是丢失了。TCP 规定每个报文段都必须使用 16 位的检验和。 + +### 确认机制 + +控制报文段不携带数据,但需要消耗一个序号,它也需要被确认,而 ACK 报文段永远不需要确认,ACK 报文段不消耗序号,也不需要被确认。在以前,TCP 只使用一种类型的确认,叫积累确认,目前 TCP 实现还实现了选择确认。 + +#### 累积确认 ACK + +接收方通告它期望接收的下一个字节的序号,并忽略所有失序到达并被保存的报文段。有时这被称为肯定累积确认。在 TCP 首部的 32 位 ACK 字段用于积累确认,而它的值仅在 ACK 标志为 1 时才有效。举个例子来说,这里先不考虑 tcp 的序列号,如果发送方发了数据包 p1,p2,p3,p4;接受方成功收到 p1,p2,p4。那么接收方需要发回一个确认包,序号为 3(3 表示期望下一个收到的包的序号),那么发送方就知道 p1 到 p2 都发送接收成功,必要时重发 p3。一个确认包确认了累积到某一序号的所有包,而不是对每个序号都发确认包。实际的 tcp 确认的都是序列号,而不是包的序号,但原理是一样的。 + +累积确认是快速重传的基础,这个后面讲拥塞控制的时候会详细说明。 + +#### 选择确认 SACK + +选择确认 SACK 要报告失序的数据块以及重复的报文段块,是为了更准确的告诉发送方需要重传哪些数据块。SACK 并没有取代 ACK,而是向发送方报告了更多的信息。SACK 是作为 TCP 首部末尾的选项来实现的。 首先是否要启动 sack,应该在握手的时候告诉对方自己是否开启了 sack,这个是通过 kind=4 是选择性确认(Selective Acknowledgment,SACK)选项来实现的。 实际传送 sack 信息的是 kind=5 的选项,其格式如下: + +``` sh + +--------+--------+ + | Kind=5 | Length | ++--------+--------+--------+---------+ +| Start of 1st Block | ++--------+--------+--------+---------+ +| End of 1st Block | ++--------+--------+--------+---------+ +| | +/ . . . . . . / +| | ++--------+--------+--------+---------+ +| Start of nth Block | ++--------+--------+--------+---------+ +| End of nth Block | ++--------+--------+--------+---------+ + +``` + +sack 的每个块是由两个参数构成的 { Start, End } Start 不连续块的第一个数据的序列号。End 不连续块的最后一个数据的序列号之后的序列号。 + +该选项参数告诉对方已经接收到并缓存的不连续的数据块,注意都是已经接收的,发送方可根据此信息检查究竟是哪个块丢失,从而发送相应的数据块。 + +![inmg](https://doc.shiyanlou.com/document-uid949121labid10418timestamp1555574090941.png) + +如图所示,tcp 接收方在接收到不连续的 tcp 段,可以看出,序号 1 ~ 1000,1501 ~ 3000,3501 ~ 4500 接收到了,但却少了序号 1001 ~ 1500,3001 ~ 3500 。 + +前面说了,sack 报告的是已接收的不连续的块,在这个例子中,sack 块的内容为 {Start:1501, End:3001},{Start:3501, End:4501}。 + +注意:这里的 End 不是接收到数据段最后的序列号,而是最后的序列号加 1。 + +## 产生确认的情况 + +1. 当接收方收到了按序到达(序号是所期望的)的报文段,那么接收方就累积发送确认报文段。 +2. 当具有所期望的序号的报文段到达,而前一个按序到达的报文段还没有被确认,那么接收方就要立即发送 ACK 报文段。 +3. 当序号比期望的序号还大的失序报文段到达时,接收方立即发送 ACK 报文段,并宣布下一个期望的报文段序号。这将导致对丢失报文段的快重传。 +4. 当一个丢失的报文段到达时,接收方要发送 ACK 报文段,并宣布下一个所期望的序号。 +5. 如果到达一个重复的报文段,接收方丢弃该报文段,但是应当立即发送确认,指出下一个期望的报文段。 +6. 收到 fin 报文的时候,立即回复确认。 + +### 重传机制 + +- **RTO 即超时重传时间** 30s后重发一次 +- **RTT 数据包往返时间** 从发送到收到确认的时间 +- 平均偏差 |∑Pi-P均值|/N N为测定的次数 + +![img](https://doc.shiyanlou.com/document-uid949121labid10418timestamp1555574119125.png) + +可靠性的核心就是报文段的重传。在一个报文段发送时,它会被保存到一个队列中,直至被确认为止。当重传计时器超时,或者发送方收到该队列中第一个报文段的三个重复的 ACK 时,该报文段被重传。 + +超时重传的概念很简单,就是一定时间内未收到确认,进行再次发送,但是如何计算重传的时间确实 tcp 最复杂的问题之一,毕竟要适应各种网络情况。TCP 一个连接期间只有一个 RTO 计时器,目前大部分实现都是采用Jacobaon/Karels 算法,详细可以看 RFC6298,其计算公式如下: + +``` sh +第一次rtt计算: +SRTT = R +RTTVAR = R/2 +RTO = SRTT + max (G, K*RTTVAR) = R + max(G, 2 * R) +K = 4 + +之后: +RTTVAR = (1 - beta) * RTTVAR + beta * |SRTT - R'| = 0.75 * RTTVAR + 0.25 * |SRTT - R'| +SRTT = (1 - alpha) * SRTT + alpha * R' = 0.875 * SRTT + 0.125 * R' +RTO = SRTT + max (G, K*RTTVAR) = SRTT + max(G, 4 * RTTVAR) +K = 4 + +``` + +- SRTT(smoothed round-trip time)平滑 RTT 时间 +- RTTVAR(round-trip time variation)RTT 变量,其实就是 rtt 平均偏差 +- G 表示系统时钟的粒度,一般很小,us 级别。beta = 1/4, alpha = 1/8 + +发送方 TCP 的计时器时间到,TCP 发送队列中最前面的报文段(即序列号最小的报文段),并重启计时器。 + diff --git a/tcpip/transport/tcp/connect.go b/tcpip/transport/tcp/connect.go index 68ff0eb..ad2a92d 100644 --- a/tcpip/transport/tcp/connect.go +++ b/tcpip/transport/tcp/connect.go @@ -14,7 +14,6 @@ import ( "netstack/waiter" "sync" "time" - "unsafe" ) const maxSegmentsPerWake = 100 @@ -502,7 +501,7 @@ func sendTCP(r *stack.Route, id stack.TransportEndpointID, data buffer.Vectorise // Allocate a buffer for the TCP header. hdr := buffer.NewPrependable(header.TCPMinimumSize + int(r.MaxHeaderLength()) + optLen) - if rcvWnd > 0xffff { + if rcvWnd > 0xffff { // 65535 rcvWnd = 0xffff } @@ -521,7 +520,7 @@ func sendTCP(r *stack.Route, id stack.TransportEndpointID, data buffer.Vectorise // Only calculate the checksum if offloading isn't supported. if r.Capabilities()&stack.CapabilityChecksumOffload == 0 { - length := uint16(hdr.UsedLength() + data.Size()) + length := uint16(hdr.UsedLength() + data.Size()) // 报文头+数据长度 // tcp伪首部校验和的计算 xsum := r.PseudoHeaderChecksum(ProtocolNumber) for _, v := range data.Views() { @@ -640,7 +639,6 @@ func (e *endpoint) handleClose() *tcpip.Error { // handleSegments 从队列中取出 tcp 段数据,然后处理它们。 func (e *endpoint) handleSegments() *tcpip.Error { - log.Println(unsafe.Pointer(e), "处理报文") checkRequeue := true for i := 0; i < maxSegmentsPerWake; i++ { s := e.segmentQueue.dequeue() @@ -648,10 +646,18 @@ func (e *endpoint) handleSegments() *tcpip.Error { checkRequeue = false break } + + // Invoke the tcp probe if installed. + if e.probe != nil { + e.probe(e.completeState()) + } + if s.flagIsSet(flagRst) { // TODO 如果收到 rst 报文 - s.decRef() - return tcpip.ErrConnectionReset + if e.rcv.acceptable(s.sequenceNumber, 0) { + s.decRef() + return tcpip.ErrConnectionReset + } } else if s.flagIsSet(flagAck) { // 处理正常报文 // Patch the window size in the segment according to the diff --git a/tcpip/transport/tcp/endpoint.go b/tcpip/transport/tcp/endpoint.go index 925bb49..eace208 100644 --- a/tcpip/transport/tcp/endpoint.go +++ b/tcpip/transport/tcp/endpoint.go @@ -32,6 +32,19 @@ const ( stateError ) +// SACKInfo holds TCP SACK related information for a given endpoint. +// +// +stateify savable +type SACKInfo struct { + // Blocks is the maximum number of SACK blocks we track + // per endpoint. + Blocks [MaxSACKBlocks]header.SACKBlock + + // NumBlocks is the number of valid SACK blocks stored in the + // blocks array above. + NumBlocks int +} + // endpoint 表示TCP端点。该结构用作端点用户和协议实现之间的接口;让并发goroutine调用端点是合法的, // 它们是正确同步的。然而,协议实现在单个goroutine中运行。 type endpoint struct { @@ -100,6 +113,8 @@ type endpoint struct { // option in the SYN/SYN-ACK. sackPermitted bool + sack SACKInfo + segmentQueue segmentQueue // When the send side is closed, the protocol goroutine is notified via @@ -113,6 +128,14 @@ type endpoint struct { sndWaker sleep.Waker sndCloseWaker sleep.Waker + // The following are used when a "packet too big" control packet is + // received. They are protected by sndBufMu. They are used to + // communicate to the main protocol goroutine how many such control + // messages have been received since the last notification was processed + // and what was the smallest MTU seen + packetTooBigCount int + sndMTU int + // notificationWaker is used to indicate to the protocol goroutine that // it needs to wake up and check for notifications. notificationWaker sleep.Waker @@ -132,6 +155,10 @@ type endpoint struct { rcv *receiver snd *sender + // probe if not nil is invoked on every received segment. It is passed + // a copy of the current state of the endpoint. + probe stack.TCPProbeFunc + // The following are only used to assist the restore run to re-connect. bindAddress tcpip.Address connectingAddress tcpip.Address @@ -817,3 +844,95 @@ func (e *endpoint) maybeEnableSACKPermitted(synOpts *header.TCPSynOptions) { e.sackPermitted = true } } + +// completeState makes a full copy of the endpoint and returns it. This is used +// before invoking the probe. The state returned may not be fully consistent if +// there are intervening syscalls when the state is being copied. +func (e *endpoint) completeState() stack.TCPEndpointState { + var s stack.TCPEndpointState + s.SegTime = time.Now() + + // Copy EndpointID. + e.mu.Lock() + s.ID = stack.TCPEndpointID(e.id) + e.mu.Unlock() + + // Copy endpoint rcv state. + e.rcvListMu.Lock() + s.RcvBufSize = e.rcvBufSize + s.RcvBufUsed = e.rcvBufUsed + s.RcvClosed = e.rcvClosed + e.rcvListMu.Unlock() + + // Endpoint TCP Option state. + s.SendTSOk = e.sendTSOk + s.RecentTS = e.recentTS + s.TSOffset = e.tsOffset + s.SACKPermitted = e.sackPermitted + s.SACK.Blocks = make([]header.SACKBlock, e.sack.NumBlocks) + copy(s.SACK.Blocks, e.sack.Blocks[:e.sack.NumBlocks]) + + // Copy endpoint send state. + e.sndBufMu.Lock() + s.SndBufSize = e.sndBufSize + s.SndBufUsed = e.sndBufUsed + s.SndClosed = e.sndClosed + s.SndBufInQueue = e.sndBufInQueue + s.PacketTooBigCount = e.packetTooBigCount + s.SndMTU = e.sndMTU + e.sndBufMu.Unlock() + + // Copy receiver state. + s.Receiver = stack.TCPReceiverState{ + RcvNxt: e.rcv.rcvNxt, + RcvAcc: e.rcv.rcvAcc, + RcvWndScale: e.rcv.rcvWndScale, + PendingBufUsed: e.rcv.pendingBufUsed, + PendingBufSize: e.rcv.pendingBufSize, + } + + // Copy sender state. + s.Sender = stack.TCPSenderState{ + LastSendTime: e.snd.lastSendTime, + DupAckCount: e.snd.dupAckCount, + //FastRecovery: stack.TCPFastRecoveryState{ + // Active: e.snd.fr.active, + // First: e.snd.fr.first, + // Last: e.snd.fr.last, + // MaxCwnd: e.snd.fr.maxCwnd, + //}, + SndCwnd: e.snd.sndCwnd, + Ssthresh: e.snd.sndSsthresh, + SndCAAckCount: e.snd.sndCAAckCount, + Outstanding: e.snd.outstanding, + SndWnd: e.snd.sndWnd, + SndUna: e.snd.sndUna, + SndNxt: e.snd.sndNxt, + RTTMeasureSeqNum: e.snd.rttMeasureSeqNum, + RTTMeasureTime: e.snd.rttMeasureTime, + Closed: e.snd.closed, + RTO: e.snd.rto, + SRTTInited: e.snd.srttInited, + MaxPayloadSize: e.snd.maxPayloadSize, + SndWndScale: e.snd.sndWndScale, + MaxSentAck: e.snd.maxSentAck, + } + e.snd.rtt.Lock() + s.Sender.SRTT = e.snd.rtt.srtt + e.snd.rtt.Unlock() + + //if cubic, ok := e.snd.cc.(*cubicState); ok { + // s.Sender.Cubic = stack.TCPCubicState{ + // WMax: cubic.wMax, + // WLastMax: cubic.wLastMax, + // T: cubic.t, + // TimeSinceLastCongestion: time.Since(cubic.t), + // C: cubic.c, + // K: cubic.k, + // Beta: cubic.beta, + // WC: cubic.wC, + // WEst: cubic.wEst, + // } + //} + return s +} diff --git a/tcpip/transport/tcp/rcv.go b/tcpip/transport/tcp/rcv.go index 4587e85..eac765a 100644 --- a/tcpip/transport/tcp/rcv.go +++ b/tcpip/transport/tcp/rcv.go @@ -17,24 +17,32 @@ type receiver struct { closed bool - // TODO 需要添加 + pendingRcvdSegments segmentHeap + pendingBufUsed seqnum.Size + pendingBufSize seqnum.Size } // 新建并初始化接收器 func newReceiver(ep *endpoint, irs seqnum.Value, rcvWnd seqnum.Size, rcvWndScale uint8) *receiver { r := &receiver{ - ep: ep, - rcvNxt: irs + 1, // 成功建立连接后期望读取的第一个字节序号 - rcvAcc: irs.Add(rcvWnd + 1), - rcvWndScale: rcvWndScale, + ep: ep, + rcvNxt: irs + 1, // 成功建立连接后期望读取的第一个字节序号 + rcvAcc: irs.Add(rcvWnd + 1), + rcvWndScale: rcvWndScale, + pendingBufSize: rcvWnd, } return r } // tcp流量控制:判断 segSeq 在窗口內 func (r *receiver) acceptable(segSeq seqnum.Value, segLen seqnum.Size) bool { - // TODO 流量控制 - return true + rcvWnd := r.rcvNxt.Size(r.rcvAcc) + if rcvWnd == 0 { + return segLen == 0 && segSeq == r.rcvNxt // 是否卡在边上 + } + + return segSeq.InWindow(r.rcvNxt, rcvWnd) || // 在窗口内部 + seqnum.Overlap(r.rcvNxt, rcvWnd, segSeq, segLen) // 范围有重叠 } // getSendParams returns the parameters needed by the sender when building @@ -85,7 +93,7 @@ func (r *receiver) consumeSegment(s *segment, segSeq seqnum.Value, segLen seqnum r.rcvNxt++ // 收到 fin,立即回复 ack - r.ep.snd.sendAck() // FIXME 不应该是 seq+2 捏 + r.ep.snd.sendAck() // 标记接收器关闭 // 触发上层应用可以读取 diff --git a/tcpip/transport/tcp/sack.go b/tcpip/transport/tcp/sack.go new file mode 100644 index 0000000..74b31fe --- /dev/null +++ b/tcpip/transport/tcp/sack.go @@ -0,0 +1,6 @@ +package tcp + +const ( + // MaxSACKBlocks 是接收端存储的最大SACK数 + MaxSACKBlocks = 6 +) diff --git a/tcpip/transport/tcp/segment_heap.go b/tcpip/transport/tcp/segment_heap.go new file mode 100644 index 0000000..a288990 --- /dev/null +++ b/tcpip/transport/tcp/segment_heap.go @@ -0,0 +1,34 @@ +package tcp + +// tcp段的堆,用来暂存失序的tcp段 +// 实现了堆排序 +type segmentHeap []*segment + +// Len returns the length of h. +func (h segmentHeap) Len() int { + return len(h) +} + +// Less determines whether the i-th element of h is less than the j-th element. +func (h segmentHeap) Less(i, j int) bool { + return h[i].sequenceNumber.LessThan(h[j].sequenceNumber) +} + +// Swap swaps the i-th and j-th elements of h. +func (h segmentHeap) Swap(i, j int) { + h[i], h[j] = h[j], h[i] +} + +// Push adds x as the last element of h. +func (h *segmentHeap) Push(x interface{}) { + *h = append(*h, x.(*segment)) +} + +// Pop removes the last element of h and returns it. +func (h *segmentHeap) Pop() interface{} { + old := *h + n := len(old) + x := old[n-1] + *h = old[:n-1] + return x +} diff --git a/tcpip/transport/tcp/snd.go b/tcpip/transport/tcp/snd.go index 59628dd..ae4daa0 100644 --- a/tcpip/transport/tcp/snd.go +++ b/tcpip/transport/tcp/snd.go @@ -2,9 +2,11 @@ package tcp import ( "netstack/logger" + "netstack/sleep" "netstack/tcpip" "netstack/tcpip/buffer" "netstack/tcpip/seqnum" + "sync" "time" ) @@ -45,6 +47,43 @@ type sender struct { // lastSendTime 是发送最后一个数据包的时间戳。 lastSendTime time.Time + // dupAckCount is the number of duplicated acks received. It is used for + // fast retransmit. + // dupAckCount 是收到的重复ack数。它用于快速重传。 + dupAckCount int + + // fr holds state related to fast recovery. + // fr 持有与快速恢复有关的状态。 + fr fastRecovery + + // sndCwnd is the congestion window, in packets. + // sndCwnd 是拥塞窗口,单位是包 + sndCwnd int + + // sndSsthresh is the threshold between slow start and congestion + // avoidance. + // sndSsthresh 是慢启动和拥塞避免之间的阈值。 + sndSsthresh int + + // sndCAAckCount is the number of packets acknowledged during congestion + // avoidance. When enough packets have been ack'd (typically cwnd + // packets), the congestion window is incremented by one. + // sndCAAckCount 是拥塞避免期间确认的数据包数。当已经确认了足够的分组(通常是cwnd分组)时,拥塞窗口增加1。 + sndCAAckCount int + + // outstanding is the number of outstanding packets, that is, packets + // that have been sent but not yet acknowledged. + // outstanding 是正在发送的数据包的数量,即已发送但尚未确认的数据包。 + outstanding int + + // sndWnd is the send window size. + // 发送窗口大小,单位是字节 + sndWnd seqnum.Size + + // sndUna is the next unacknowledged sequence number. + // sndUna 是下一个未确认的序列号 + sndUna seqnum.Value + // sndNxt 是要发送的下一个段的序列号。 sndNxt seqnum.Value @@ -53,19 +92,55 @@ type sender struct { // sndNxtList 是要添加到发送列表的下一个段的序列号。 sndNxtList seqnum.Value - // maxSentAck is the maxium acknowledgement actually sent. - maxSentAck seqnum.Value + // rttMeasureSeqNum is the sequence number being used for the latest RTT + // measurement. + rttMeasureSeqNum seqnum.Value + + // rttMeasureTime is the time when the rttMeasureSeqNum was sent. + rttMeasureTime time.Time closed bool writeNext *segment // 发送链表 - writeList segmentList + writeList segmentList + resendTimer timer + resendWaker sleep.Waker + + rtt rtt // 往返时间 + rto time.Duration // 超时重发时间 + srttInited bool + + // maxPayloadSize is the maximum size of the payload of a given segment. + // It is initialized on demand. + maxPayloadSize int + + // sndWndScale is the number of bits to shift left when reading the send + // window size from a segment. + sndWndScale uint8 + + // maxSentAck is the maxium acknowledgement actually sent. + maxSentAck seqnum.Value // cc is the congestion control algorithm in use for this sender. // cc 是实现拥塞控制算法的接口 cc congestionControl } +type rtt struct { + sync.Mutex + srtt time.Duration // 平滑 RTT 时间 + rttvar time.Duration // rtt 平均偏差 ∑|x-xbar|/n +} + +// fastRecovery holds information related to fast recovery from a packet loss. +// +// +stateify savable +// fastRecovery 保存与数据包丢失快速恢复相关的信息 +type fastRecovery struct { + active bool + // TODO 需要添加 +} + // 新建并初始化发送器 irs是cookies func newSender(ep *endpoint, iss, irs seqnum.Value, sndWnd seqnum.Size, mss uint16, sndWndScale int) *sender { s := &sender{ diff --git a/tcpip/transport/tcp/timer.go b/tcpip/transport/tcp/timer.go new file mode 100644 index 0000000..e7e7ca3 --- /dev/null +++ b/tcpip/transport/tcp/timer.go @@ -0,0 +1,91 @@ +package tcp + +import ( + "netstack/sleep" + "time" +) + +type timerState int + +const ( + timerStateDisabled timerState = iota + timerStateEnabled + timerStateOrphaned +) + +// 定时器的实现 +type timer struct { + state timerState + + // target is the expiration time of the current timer. It is only + // meaningful in the enabled state. + target time.Time + + // runtimeTarget is the expiration time of the runtime timer. It is + // meaningful in the enabled and orphaned states. + runtimeTarget time.Time + + // timer is the runtime timer used to wait on. + timer *time.Timer +} + +// init initializes the timer. Once it expires, it the given waker will be +// asserted. +func (t *timer) init(w *sleep.Waker) { + t.state = timerStateDisabled + + // Initialize a runtime timer that will assert the waker, then + // immediately stop it. + t.timer = time.AfterFunc(time.Hour, func() { + w.Assert() + }) + t.timer.Stop() +} + +// cleanup frees all resources associated with the timer. +func (t *timer) cleanup() { + t.timer.Stop() +} + +// 检查是否过期 +func (t *timer) checkExpiration() bool { + if t.state == timerStateOrphaned { + t.state = timerStateDisabled + return false + } + + now := time.Now() + if now.Before(t.target) { + t.runtimeTarget = t.target + t.timer.Reset(t.target.Sub(now)) // ??这一步是为了什么 + return false + } + + t.state = timerStateDisabled + return true +} + +// 关闭计时器 设置其状态为一个孤儿 +func (t *timer) disable() { + if t.state != timerStateDisabled { + t.state = timerStateOrphaned + } +} + +// 开启计时器 +func (t *timer) enable(d time.Duration) { + t.target = time.Now().Add(d) + + // Check if we need to set the runtime timer. + if t.state == timerStateDisabled || t.target.Before(t.runtimeTarget) { + t.runtimeTarget = t.target + t.timer.Reset(d) + } + + t.state = timerStateEnabled +} + +// 检验计时器是否已经启动 +func (t *timer) enabled() bool { + return t.state == timerStateEnabled +}