tcp可靠性实现 抄了一堆东西 信息量太大了 看不过来

This commit is contained in:
impact-eintr
2022-12-13 16:17:57 +08:00
parent fb578a6f9e
commit 5a9042d890
12 changed files with 700 additions and 23 deletions

View File

@@ -33,6 +33,7 @@ func ChecksumCombine(a, b uint16) uint16 {
// given destination protocol and network address, ignoring the length
// field. Pseudo-headers are needed by transport layers when calculating
// their own checksum.
// hash(protocol, hash(dst, hash(src, 0)))
func PseudoHeaderChecksum(protocol tcpip.TransportProtocolNumber, srcAddr tcpip.Address, dstAddr tcpip.Address) uint16 {
xsum := Checksum([]byte(srcAddr), 0)
xsum = Checksum([]byte(dstAddr), xsum)

View File

@@ -131,6 +131,23 @@ const (
)
// SACKBlock 表示 sack 块的结构体
/*
+--------+--------+
| Kind=5 | Length |
+--------+--------+--------+---------+
| Start of 1st Block |
+--------+--------+--------+---------+
| End of 1st Block |
+--------+--------+--------+---------+
| |
/ . . . . . . /
| |
+--------+--------+--------+---------+
| Start of nth Block |
+--------+--------+--------+---------+
| End of nth Block |
+--------+--------+--------+---------+
*/
type SACKBlock struct {
// Start indicates the lowest sequence number in the block.
Start seqnum.Value

View File

@@ -46,6 +46,7 @@ func (v *Value) UpdateForward(s Size) {
}
// Overlap checks if the window [a,a+b) overlaps with the window [x, x+y).
// [a,x+y)&&[x, a+b) [a, x, a+b, x+y) [a, x, x+y, a+b) [x, a, a+b, x+y) [x, a, x+y, a+b)
func Overlap(a Value, b Size, x Value, y Size) bool {
return a.LessThan(x.Add(y)) && x.LessThan(a.Add(b))
}

View File

@@ -6,7 +6,9 @@ import (
"netstack/sleep"
"netstack/tcpip"
"netstack/tcpip/buffer"
"netstack/tcpip/header"
"netstack/tcpip/ports"
"netstack/tcpip/seqnum"
"netstack/waiter"
"sync"
"time"
@@ -21,12 +23,22 @@ const (
resolutionAttempts = 3
)
// TODO 需要解读
type TCPProbeFunc func(s TcpEndpointState)
// TCPProbeFunc is the expected function type for a TCP probe function to be
// passed to stack.AddTCPProbe.
type TCPProbeFunc func(s TCPEndpointState)
// TODO 需要解读
type TcpEndpointState struct {
// TODO 需要添加
// TCPCubicState is used to hold a copy of the internal cubic state when the
// TCPProbeFunc is invoked.
type TCPCubicState struct {
WLastMax float64
WMax float64
T time.Time
TimeSinceLastCongestion time.Duration
C float64
K float64
Beta float64
WC float64
WEst float64
}
// 传输层协议状态机 包含传输层协议以及默认处理方法
@@ -35,6 +47,207 @@ type transportProtocolState struct {
defaultHandler func(*Route, TransportEndpointID, buffer.VectorisedView) bool
}
// TCPEndpointID is the unique 4 tuple that identifies a given endpoint.
type TCPEndpointID struct {
// LocalPort is the local port associated with the endpoint.
LocalPort uint16
// LocalAddress is the local [network layer] address associated with
// the endpoint.
LocalAddress tcpip.Address
// RemotePort is the remote port associated with the endpoint.
RemotePort uint16
// RemoteAddress it the remote [network layer] address associated with
// the endpoint.
RemoteAddress tcpip.Address
}
// TCPFastRecoveryState holds a copy of the internal fast recovery state of a
// TCP endpoint.
type TCPFastRecoveryState struct {
// Active if true indicates the endpoint is in fast recovery.
Active bool
// First is the first unacknowledged sequence number being recovered.
First seqnum.Value
// Last is the 'recover' sequence number that indicates the point at
// which we should exit recovery barring any timeouts etc.
Last seqnum.Value
// MaxCwnd is the maximum value we are permitted to grow the congestion
// window during recovery. This is set at the time we enter recovery.
MaxCwnd int
}
// TCPReceiverState holds a copy of the internal state of the receiver for
// a given TCP endpoint.
type TCPReceiverState struct {
// RcvNxt is the TCP variable RCV.NXT.
RcvNxt seqnum.Value
// RcvAcc is the TCP variable RCV.ACC.
RcvAcc seqnum.Value
// RcvWndScale is the window scaling to use for inbound segments.
RcvWndScale uint8
// PendingBufUsed is the number of bytes pending in the receive
// queue.
PendingBufUsed seqnum.Size
// PendingBufSize is the size of the socket receive buffer.
PendingBufSize seqnum.Size
}
// TCPSenderState holds a copy of the internal state of the sender for
// a given TCP Endpoint.
type TCPSenderState struct {
// LastSendTime is the time at which we sent the last segment.
LastSendTime time.Time
// DupAckCount is the number of Duplicate ACK's received.
DupAckCount int
// SndCwnd is the size of the sending congestion window in packets.
SndCwnd int
// Ssthresh is the slow start threshold in packets.
Ssthresh int
// SndCAAckCount is the number of packets consumed in congestion
// avoidance mode.
SndCAAckCount int
// Outstanding is the number of packets in flight.
Outstanding int
// SndWnd is the send window size in bytes.
SndWnd seqnum.Size
// SndUna is the next unacknowledged sequence number.
SndUna seqnum.Value
// SndNxt is the sequence number of the next segment to be sent.
SndNxt seqnum.Value
// RTTMeasureSeqNum is the sequence number being used for the latest RTT
// measurement.
RTTMeasureSeqNum seqnum.Value
// RTTMeasureTime is the time when the RTTMeasureSeqNum was sent.
RTTMeasureTime time.Time
// Closed indicates that the caller has closed the endpoint for sending.
Closed bool
// SRTT is the smoothed round-trip time as defined in section 2 of
// RFC 6298.
SRTT time.Duration
// RTO is the retransmit timeout as defined in section of 2 of RFC 6298.
RTO time.Duration
// RTTVar is the round-trip time variation as defined in section 2 of
// RFC 6298.
RTTVar time.Duration
// SRTTInited if true indicates take a valid RTT measurement has been
// completed.
SRTTInited bool
// MaxPayloadSize is the maximum size of the payload of a given segment.
// It is initialized on demand.
MaxPayloadSize int
// SndWndScale is the number of bits to shift left when reading the send
// window size from a segment.
SndWndScale uint8
// MaxSentAck is the highest acknowledgement number sent till now.
MaxSentAck seqnum.Value
// FastRecovery holds the fast recovery state for the endpoint.
FastRecovery TCPFastRecoveryState
// Cubic holds the state related to CUBIC congestion control.
Cubic TCPCubicState
}
// TCPSACKInfo holds TCP SACK related information for a given TCP endpoint.
type TCPSACKInfo struct {
// Blocks is the list of SACK block currently received by the
// TCP endpoint.
Blocks []header.SACKBlock
}
// TCPEndpointState is a copy of the internal state of a TCP endpoint.
type TCPEndpointState struct {
// ID is a copy of the TransportEndpointID for the endpoint.
ID TCPEndpointID
// SegTime denotes the absolute time when this segment was received.
SegTime time.Time
// RcvBufSize is the size of the receive socket buffer for the endpoint.
RcvBufSize int
// RcvBufUsed is the amount of bytes actually held in the receive socket
// buffer for the endpoint.
RcvBufUsed int
// RcvClosed if true, indicates the endpoint has been closed for reading.
RcvClosed bool
// SendTSOk is used to indicate when the TS Option has been negotiated.
// When sendTSOk is true every non-RST segment should carry a TS as per
// RFC7323#section-1.1.
SendTSOk bool
// RecentTS is the timestamp that should be sent in the TSEcr field of
// the timestamp for future segments sent by the endpoint. This field is
// updated if required when a new segment is received by this endpoint.
RecentTS uint32
// TSOffset is a randomized offset added to the value of the TSVal field
// in the timestamp option.
TSOffset uint32
// SACKPermitted is set to true if the peer sends the TCPSACKPermitted
// option in the SYN/SYN-ACK.
SACKPermitted bool
// SACK holds TCP SACK related information for this endpoint.
SACK TCPSACKInfo
// SndBufSize is the size of the socket send buffer.
SndBufSize int
// SndBufUsed is the number of bytes held in the socket send buffer.
SndBufUsed int
// SndClosed indicates that the endpoint has been closed for sends.
SndClosed bool
// SndBufInQueue is the number of bytes in the send queue.
SndBufInQueue seqnum.Size
// PacketTooBigCount is used to notify the main protocol routine how
// many times a "packet too big" control packet is received.
PacketTooBigCount int
// SndMTU is the smallest MTU seen in the control packets received.
SndMTU int
// Receiver holds variables related to the TCP receiver for the endpoint.
Receiver TCPReceiverState
// Sender holds state related to the TCP Sender for the endpoint.
Sender TCPSenderState
}
// Stack 是一个网络堆栈具有所有支持的协议、NIC 和路由表。
type Stack struct {
transportProtocols map[tcpip.TransportProtocolNumber]*transportProtocolState // 各种传输层协议

View File

@@ -116,3 +116,109 @@ b. 如果此时 A 有数据发送,主机 B 收到主机 A 的 Data + ACK
4主机 A 对主机 B 的连接释放报文段发出确认,将 ACK 置为 1ACK=seq2+1, seq=seq1+1。这样才把从 B 到 A 的反方向连接释放掉,主机 A 的 TCP 再向其应用进程报告,整个连接已经全部释放。
还有一个要注意的是fin 包和数据包一样,如果丢失了,会进行重传,实际上可能是是 fin 丢失或 ack 丢失。重传的周期由 rto 决定。
## tcp的可靠性机制
本小节讨论 tcp 可靠性的实现,首先得知道可靠性指的是什么。可靠性指的是网络层能通信的前提下,保证数据包正确且按序到达对端。
比如发送端发送了“12345678”那么接收端一定能收到“12345678”不会乱序“12456783”也不会少或多数据。
实现 TCP 的可靠传输有以下机制:
1. 校验和机制(检测和重传受到损伤的报文段)
2. 确认应答机制(保存失序到达的报文段直至缺失的报文到期,以及检测和丢弃重复的报文段)
3. 超时重传机制(重传丢失的报文段)
正因为 tcp 实现了可靠性,那么基于 tcp 的应用就可以不用担心发送的数据包丢失、乱序、不正确等,减轻了上层开发的负担。
### 检验和
每个 tcp 段都包含了一个检验和字段,用来检查报文段是否收到损伤。如果某个报文段因检验和无效而被检查出受到损伤,就由终点 TCP 将其丢弃并被认为是丢失了。TCP 规定每个报文段都必须使用 16 位的检验和。
### 确认机制
控制报文段不携带数据,但需要消耗一个序号,它也需要被确认,而 ACK 报文段永远不需要确认ACK 报文段不消耗序号也不需要被确认。在以前TCP 只使用一种类型的确认,叫积累确认,目前 TCP 实现还实现了选择确认。
#### 累积确认 ACK
接收方通告它期望接收的下一个字节的序号,并忽略所有失序到达并被保存的报文段。有时这被称为肯定累积确认。在 TCP 首部的 32 位 ACK 字段用于积累确认,而它的值仅在 ACK 标志为 1 时才有效。举个例子来说,这里先不考虑 tcp 的序列号,如果发送方发了数据包 p1p2p3p4接受方成功收到 p1p2p4。那么接收方需要发回一个确认包序号为 3(3 表示期望下一个收到的包的序号),那么发送方就知道 p1 到 p2 都发送接收成功,必要时重发 p3。一个确认包确认了累积到某一序号的所有包而不是对每个序号都发确认包。实际的 tcp 确认的都是序列号,而不是包的序号,但原理是一样的。
累积确认是快速重传的基础,这个后面讲拥塞控制的时候会详细说明。
#### 选择确认 SACK
选择确认 SACK 要报告失序的数据块以及重复的报文段块是为了更准确的告诉发送方需要重传哪些数据块。SACK 并没有取代 ACK而是向发送方报告了更多的信息。SACK 是作为 TCP 首部末尾的选项来实现的。 首先是否要启动 sack应该在握手的时候告诉对方自己是否开启了 sack这个是通过 kind=4 是选择性确认Selective AcknowledgmentSACK选项来实现的。 实际传送 sack 信息的是 kind=5 的选项,其格式如下:
``` sh
+--------+--------+
| Kind=5 | Length |
+--------+--------+--------+---------+
| Start of 1st Block |
+--------+--------+--------+---------+
| End of 1st Block |
+--------+--------+--------+---------+
| |
/ . . . . . . /
| |
+--------+--------+--------+---------+
| Start of nth Block |
+--------+--------+--------+---------+
| End of nth Block |
+--------+--------+--------+---------+
```
sack 的每个块是由两个参数构成的 { Start, End } Start 不连续块的第一个数据的序列号。End 不连续块的最后一个数据的序列号之后的序列号。
该选项参数告诉对方已经接收到并缓存的不连续的数据块,注意都是已经接收的,发送方可根据此信息检查究竟是哪个块丢失,从而发送相应的数据块。
![inmg](https://doc.shiyanlou.com/document-uid949121labid10418timestamp1555574090941.png)
如图所示tcp 接收方在接收到不连续的 tcp 段,可以看出,序号 1 10001501 30003501 4500 接收到了,但却少了序号 1001 15003001 3500 。
前面说了sack 报告的是已接收的不连续的块在这个例子中sack 块的内容为 {Start:1501, End:3001},{Start:3501, End:4501}。
注意:这里的 End 不是接收到数据段最后的序列号,而是最后的序列号加 1。
## 产生确认的情况
1. 当接收方收到了按序到达(序号是所期望的)的报文段,那么接收方就累积发送确认报文段。
2. 当具有所期望的序号的报文段到达,而前一个按序到达的报文段还没有被确认,那么接收方就要立即发送 ACK 报文段。
3. 当序号比期望的序号还大的失序报文段到达时,接收方立即发送 ACK 报文段,并宣布下一个期望的报文段序号。这将导致对丢失报文段的快重传。
4. 当一个丢失的报文段到达时,接收方要发送 ACK 报文段,并宣布下一个所期望的序号。
5. 如果到达一个重复的报文段,接收方丢弃该报文段,但是应当立即发送确认,指出下一个期望的报文段。
6. 收到 fin 报文的时候,立即回复确认。
### 重传机制
- **RTO 即超时重传时间** 30s后重发一次
- **RTT 数据包往返时间** 从发送到收到确认的时间
- 平均偏差 |∑Pi-P均值|/N N为测定的次数
![img](https://doc.shiyanlou.com/document-uid949121labid10418timestamp1555574119125.png)
可靠性的核心就是报文段的重传。在一个报文段发送时,它会被保存到一个队列中,直至被确认为止。当重传计时器超时,或者发送方收到该队列中第一个报文段的三个重复的 ACK 时,该报文段被重传。
超时重传的概念很简单,就是一定时间内未收到确认,进行再次发送,但是如何计算重传的时间确实 tcp 最复杂的问题之一毕竟要适应各种网络情况。TCP 一个连接期间只有一个 RTO 计时器目前大部分实现都是采用Jacobaon/Karels 算法,详细可以看 RFC6298其计算公式如下
``` sh
第一次rtt计算
SRTT = R
RTTVAR = R/2
RTO = SRTT + max (G, K*RTTVAR) = R + max(G, 2 * R)
K = 4
之后:
RTTVAR = (1 - beta) * RTTVAR + beta * |SRTT - R'| = 0.75 * RTTVAR + 0.25 * |SRTT - R'|
SRTT = (1 - alpha) * SRTT + alpha * R' = 0.875 * SRTT + 0.125 * R'
RTO = SRTT + max (G, K*RTTVAR) = SRTT + max(G, 4 * RTTVAR)
K = 4
```
- SRTT(smoothed round-trip time)平滑 RTT 时间
- RTTVAR(round-trip time variation)RTT 变量,其实就是 rtt 平均偏差
- G 表示系统时钟的粒度一般很小us 级别。beta = 1/4, alpha = 1/8
发送方 TCP 的计时器时间到TCP 发送队列中最前面的报文段(即序列号最小的报文段),并重启计时器。

View File

@@ -14,7 +14,6 @@ import (
"netstack/waiter"
"sync"
"time"
"unsafe"
)
const maxSegmentsPerWake = 100
@@ -502,7 +501,7 @@ func sendTCP(r *stack.Route, id stack.TransportEndpointID, data buffer.Vectorise
// Allocate a buffer for the TCP header.
hdr := buffer.NewPrependable(header.TCPMinimumSize + int(r.MaxHeaderLength()) + optLen)
if rcvWnd > 0xffff {
if rcvWnd > 0xffff { // 65535
rcvWnd = 0xffff
}
@@ -521,7 +520,7 @@ func sendTCP(r *stack.Route, id stack.TransportEndpointID, data buffer.Vectorise
// Only calculate the checksum if offloading isn't supported.
if r.Capabilities()&stack.CapabilityChecksumOffload == 0 {
length := uint16(hdr.UsedLength() + data.Size())
length := uint16(hdr.UsedLength() + data.Size()) // 报文头+数据长度
// tcp伪首部校验和的计算
xsum := r.PseudoHeaderChecksum(ProtocolNumber)
for _, v := range data.Views() {
@@ -640,7 +639,6 @@ func (e *endpoint) handleClose() *tcpip.Error {
// handleSegments 从队列中取出 tcp 段数据,然后处理它们。
func (e *endpoint) handleSegments() *tcpip.Error {
log.Println(unsafe.Pointer(e), "处理报文")
checkRequeue := true
for i := 0; i < maxSegmentsPerWake; i++ {
s := e.segmentQueue.dequeue()
@@ -648,10 +646,18 @@ func (e *endpoint) handleSegments() *tcpip.Error {
checkRequeue = false
break
}
// Invoke the tcp probe if installed.
if e.probe != nil {
e.probe(e.completeState())
}
if s.flagIsSet(flagRst) {
// TODO 如果收到 rst 报文
if e.rcv.acceptable(s.sequenceNumber, 0) {
s.decRef()
return tcpip.ErrConnectionReset
}
} else if s.flagIsSet(flagAck) {
// 处理正常报文
// Patch the window size in the segment according to the

View File

@@ -32,6 +32,19 @@ const (
stateError
)
// SACKInfo holds TCP SACK related information for a given endpoint.
//
// +stateify savable
type SACKInfo struct {
// Blocks is the maximum number of SACK blocks we track
// per endpoint.
Blocks [MaxSACKBlocks]header.SACKBlock
// NumBlocks is the number of valid SACK blocks stored in the
// blocks array above.
NumBlocks int
}
// endpoint 表示TCP端点。该结构用作端点用户和协议实现之间的接口;让并发goroutine调用端点是合法的
// 它们是正确同步的。然而协议实现在单个goroutine中运行。
type endpoint struct {
@@ -100,6 +113,8 @@ type endpoint struct {
// option in the SYN/SYN-ACK.
sackPermitted bool
sack SACKInfo
segmentQueue segmentQueue
// When the send side is closed, the protocol goroutine is notified via
@@ -113,6 +128,14 @@ type endpoint struct {
sndWaker sleep.Waker
sndCloseWaker sleep.Waker
// The following are used when a "packet too big" control packet is
// received. They are protected by sndBufMu. They are used to
// communicate to the main protocol goroutine how many such control
// messages have been received since the last notification was processed
// and what was the smallest MTU seen
packetTooBigCount int
sndMTU int
// notificationWaker is used to indicate to the protocol goroutine that
// it needs to wake up and check for notifications.
notificationWaker sleep.Waker
@@ -132,6 +155,10 @@ type endpoint struct {
rcv *receiver
snd *sender
// probe if not nil is invoked on every received segment. It is passed
// a copy of the current state of the endpoint.
probe stack.TCPProbeFunc
// The following are only used to assist the restore run to re-connect.
bindAddress tcpip.Address
connectingAddress tcpip.Address
@@ -817,3 +844,95 @@ func (e *endpoint) maybeEnableSACKPermitted(synOpts *header.TCPSynOptions) {
e.sackPermitted = true
}
}
// completeState makes a full copy of the endpoint and returns it. This is used
// before invoking the probe. The state returned may not be fully consistent if
// there are intervening syscalls when the state is being copied.
func (e *endpoint) completeState() stack.TCPEndpointState {
var s stack.TCPEndpointState
s.SegTime = time.Now()
// Copy EndpointID.
e.mu.Lock()
s.ID = stack.TCPEndpointID(e.id)
e.mu.Unlock()
// Copy endpoint rcv state.
e.rcvListMu.Lock()
s.RcvBufSize = e.rcvBufSize
s.RcvBufUsed = e.rcvBufUsed
s.RcvClosed = e.rcvClosed
e.rcvListMu.Unlock()
// Endpoint TCP Option state.
s.SendTSOk = e.sendTSOk
s.RecentTS = e.recentTS
s.TSOffset = e.tsOffset
s.SACKPermitted = e.sackPermitted
s.SACK.Blocks = make([]header.SACKBlock, e.sack.NumBlocks)
copy(s.SACK.Blocks, e.sack.Blocks[:e.sack.NumBlocks])
// Copy endpoint send state.
e.sndBufMu.Lock()
s.SndBufSize = e.sndBufSize
s.SndBufUsed = e.sndBufUsed
s.SndClosed = e.sndClosed
s.SndBufInQueue = e.sndBufInQueue
s.PacketTooBigCount = e.packetTooBigCount
s.SndMTU = e.sndMTU
e.sndBufMu.Unlock()
// Copy receiver state.
s.Receiver = stack.TCPReceiverState{
RcvNxt: e.rcv.rcvNxt,
RcvAcc: e.rcv.rcvAcc,
RcvWndScale: e.rcv.rcvWndScale,
PendingBufUsed: e.rcv.pendingBufUsed,
PendingBufSize: e.rcv.pendingBufSize,
}
// Copy sender state.
s.Sender = stack.TCPSenderState{
LastSendTime: e.snd.lastSendTime,
DupAckCount: e.snd.dupAckCount,
//FastRecovery: stack.TCPFastRecoveryState{
// Active: e.snd.fr.active,
// First: e.snd.fr.first,
// Last: e.snd.fr.last,
// MaxCwnd: e.snd.fr.maxCwnd,
//},
SndCwnd: e.snd.sndCwnd,
Ssthresh: e.snd.sndSsthresh,
SndCAAckCount: e.snd.sndCAAckCount,
Outstanding: e.snd.outstanding,
SndWnd: e.snd.sndWnd,
SndUna: e.snd.sndUna,
SndNxt: e.snd.sndNxt,
RTTMeasureSeqNum: e.snd.rttMeasureSeqNum,
RTTMeasureTime: e.snd.rttMeasureTime,
Closed: e.snd.closed,
RTO: e.snd.rto,
SRTTInited: e.snd.srttInited,
MaxPayloadSize: e.snd.maxPayloadSize,
SndWndScale: e.snd.sndWndScale,
MaxSentAck: e.snd.maxSentAck,
}
e.snd.rtt.Lock()
s.Sender.SRTT = e.snd.rtt.srtt
e.snd.rtt.Unlock()
//if cubic, ok := e.snd.cc.(*cubicState); ok {
// s.Sender.Cubic = stack.TCPCubicState{
// WMax: cubic.wMax,
// WLastMax: cubic.wLastMax,
// T: cubic.t,
// TimeSinceLastCongestion: time.Since(cubic.t),
// C: cubic.c,
// K: cubic.k,
// Beta: cubic.beta,
// WC: cubic.wC,
// WEst: cubic.wEst,
// }
//}
return s
}

View File

@@ -17,7 +17,9 @@ type receiver struct {
closed bool
// TODO 需要添加
pendingRcvdSegments segmentHeap
pendingBufUsed seqnum.Size
pendingBufSize seqnum.Size
}
// 新建并初始化接收器
@@ -27,14 +29,20 @@ func newReceiver(ep *endpoint, irs seqnum.Value, rcvWnd seqnum.Size, rcvWndScale
rcvNxt: irs + 1, // 成功建立连接后期望读取的第一个字节序号
rcvAcc: irs.Add(rcvWnd + 1),
rcvWndScale: rcvWndScale,
pendingBufSize: rcvWnd,
}
return r
}
// tcp流量控制判断 segSeq 在窗口內
func (r *receiver) acceptable(segSeq seqnum.Value, segLen seqnum.Size) bool {
// TODO 流量控制
return true
rcvWnd := r.rcvNxt.Size(r.rcvAcc)
if rcvWnd == 0 {
return segLen == 0 && segSeq == r.rcvNxt // 是否卡在边上
}
return segSeq.InWindow(r.rcvNxt, rcvWnd) || // 在窗口内部
seqnum.Overlap(r.rcvNxt, rcvWnd, segSeq, segLen) // 范围有重叠
}
// getSendParams returns the parameters needed by the sender when building
@@ -85,7 +93,7 @@ func (r *receiver) consumeSegment(s *segment, segSeq seqnum.Value, segLen seqnum
r.rcvNxt++
// 收到 fin立即回复 ack
r.ep.snd.sendAck() // FIXME 不应该是 seq+2 捏
r.ep.snd.sendAck()
// 标记接收器关闭
// 触发上层应用可以读取

View File

@@ -0,0 +1,6 @@
package tcp
const (
// MaxSACKBlocks 是接收端存储的最大SACK数
MaxSACKBlocks = 6
)

View File

@@ -0,0 +1,34 @@
package tcp
// tcp段的堆用来暂存失序的tcp段
// 实现了堆排序
type segmentHeap []*segment
// Len returns the length of h.
func (h segmentHeap) Len() int {
return len(h)
}
// Less determines whether the i-th element of h is less than the j-th element.
func (h segmentHeap) Less(i, j int) bool {
return h[i].sequenceNumber.LessThan(h[j].sequenceNumber)
}
// Swap swaps the i-th and j-th elements of h.
func (h segmentHeap) Swap(i, j int) {
h[i], h[j] = h[j], h[i]
}
// Push adds x as the last element of h.
func (h *segmentHeap) Push(x interface{}) {
*h = append(*h, x.(*segment))
}
// Pop removes the last element of h and returns it.
func (h *segmentHeap) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[:n-1]
return x
}

View File

@@ -2,9 +2,11 @@ package tcp
import (
"netstack/logger"
"netstack/sleep"
"netstack/tcpip"
"netstack/tcpip/buffer"
"netstack/tcpip/seqnum"
"sync"
"time"
)
@@ -45,6 +47,43 @@ type sender struct {
// lastSendTime 是发送最后一个数据包的时间戳。
lastSendTime time.Time
// dupAckCount is the number of duplicated acks received. It is used for
// fast retransmit.
// dupAckCount 是收到的重复ack数。它用于快速重传。
dupAckCount int
// fr holds state related to fast recovery.
// fr 持有与快速恢复有关的状态。
fr fastRecovery
// sndCwnd is the congestion window, in packets.
// sndCwnd 是拥塞窗口,单位是包
sndCwnd int
// sndSsthresh is the threshold between slow start and congestion
// avoidance.
// sndSsthresh 是慢启动和拥塞避免之间的阈值。
sndSsthresh int
// sndCAAckCount is the number of packets acknowledged during congestion
// avoidance. When enough packets have been ack'd (typically cwnd
// packets), the congestion window is incremented by one.
// sndCAAckCount 是拥塞避免期间确认的数据包数。当已经确认了足够的分组通常是cwnd分组拥塞窗口增加1。
sndCAAckCount int
// outstanding is the number of outstanding packets, that is, packets
// that have been sent but not yet acknowledged.
// outstanding 是正在发送的数据包的数量,即已发送但尚未确认的数据包。
outstanding int
// sndWnd is the send window size.
// 发送窗口大小,单位是字节
sndWnd seqnum.Size
// sndUna is the next unacknowledged sequence number.
// sndUna 是下一个未确认的序列号
sndUna seqnum.Value
// sndNxt 是要发送的下一个段的序列号。
sndNxt seqnum.Value
@@ -53,19 +92,55 @@ type sender struct {
// sndNxtList 是要添加到发送列表的下一个段的序列号。
sndNxtList seqnum.Value
// maxSentAck is the maxium acknowledgement actually sent.
maxSentAck seqnum.Value
// rttMeasureSeqNum is the sequence number being used for the latest RTT
// measurement.
rttMeasureSeqNum seqnum.Value
// rttMeasureTime is the time when the rttMeasureSeqNum was sent.
rttMeasureTime time.Time
closed bool
writeNext *segment
// 发送链表
writeList segmentList
resendTimer timer
resendWaker sleep.Waker
rtt rtt // 往返时间
rto time.Duration // 超时重发时间
srttInited bool
// maxPayloadSize is the maximum size of the payload of a given segment.
// It is initialized on demand.
maxPayloadSize int
// sndWndScale is the number of bits to shift left when reading the send
// window size from a segment.
sndWndScale uint8
// maxSentAck is the maxium acknowledgement actually sent.
maxSentAck seqnum.Value
// cc is the congestion control algorithm in use for this sender.
// cc 是实现拥塞控制算法的接口
cc congestionControl
}
type rtt struct {
sync.Mutex
srtt time.Duration // 平滑 RTT 时间
rttvar time.Duration // rtt 平均偏差 ∑|x-xbar|/n
}
// fastRecovery holds information related to fast recovery from a packet loss.
//
// +stateify savable
// fastRecovery 保存与数据包丢失快速恢复相关的信息
type fastRecovery struct {
active bool
// TODO 需要添加
}
// 新建并初始化发送器 irs是cookies
func newSender(ep *endpoint, iss, irs seqnum.Value, sndWnd seqnum.Size, mss uint16, sndWndScale int) *sender {
s := &sender{

View File

@@ -0,0 +1,91 @@
package tcp
import (
"netstack/sleep"
"time"
)
type timerState int
const (
timerStateDisabled timerState = iota
timerStateEnabled
timerStateOrphaned
)
// 定时器的实现
type timer struct {
state timerState
// target is the expiration time of the current timer. It is only
// meaningful in the enabled state.
target time.Time
// runtimeTarget is the expiration time of the runtime timer. It is
// meaningful in the enabled and orphaned states.
runtimeTarget time.Time
// timer is the runtime timer used to wait on.
timer *time.Timer
}
// init initializes the timer. Once it expires, it the given waker will be
// asserted.
func (t *timer) init(w *sleep.Waker) {
t.state = timerStateDisabled
// Initialize a runtime timer that will assert the waker, then
// immediately stop it.
t.timer = time.AfterFunc(time.Hour, func() {
w.Assert()
})
t.timer.Stop()
}
// cleanup frees all resources associated with the timer.
func (t *timer) cleanup() {
t.timer.Stop()
}
// 检查是否过期
func (t *timer) checkExpiration() bool {
if t.state == timerStateOrphaned {
t.state = timerStateDisabled
return false
}
now := time.Now()
if now.Before(t.target) {
t.runtimeTarget = t.target
t.timer.Reset(t.target.Sub(now)) // ??这一步是为了什么
return false
}
t.state = timerStateDisabled
return true
}
// 关闭计时器 设置其状态为一个孤儿
func (t *timer) disable() {
if t.state != timerStateDisabled {
t.state = timerStateOrphaned
}
}
// 开启计时器
func (t *timer) enable(d time.Duration) {
t.target = time.Now().Add(d)
// Check if we need to set the runtime timer.
if t.state == timerStateDisabled || t.target.Before(t.runtimeTarget) {
t.runtimeTarget = t.target
t.timer.Reset(d)
}
t.state = timerStateEnabled
}
// 检验计时器是否已经启动
func (t *timer) enabled() bool {
return t.state == timerStateEnabled
}