From c2dccc5c49be3d5edc34f618f98a21c7537f427f Mon Sep 17 00:00:00 2001 From: impact-eintr Date: Mon, 19 Dec 2022 18:40:12 +0800 Subject: [PATCH] =?UTF-8?q?=E6=85=A2=E5=90=AF=E5=8A=A8=20=E5=80=8D?= =?UTF-8?q?=E9=80=9F=E5=A2=9E=E9=95=BF=E5=9C=A8=E5=93=AA=E9=87=8C=E5=AE=9E?= =?UTF-8?q?=E7=8E=B0=E7=9A=84=3F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tcpip/link/loopback/loopback.go | 2 +- tcpip/transport/tcp/connect.go | 10 ++++---- tcpip/transport/tcp/reno.go | 8 +++--- tcpip/transport/tcp/snd.go | 44 ++++++++++++++++++++++++--------- 4 files changed, 43 insertions(+), 21 deletions(-) diff --git a/tcpip/link/loopback/loopback.go b/tcpip/link/loopback/loopback.go index 79efaba..e98a4e3 100644 --- a/tcpip/link/loopback/loopback.go +++ b/tcpip/link/loopback/loopback.go @@ -49,7 +49,7 @@ func (e *endpoint) WritePacket(r *stack.Route, hdr buffer.Prependable, payload b // TODO 这里整点活 在特定的情况下丢掉数据报 模拟网络阻塞 e.count++ - if e.count == 6 { // 丢掉客户端写入的第二个包 + if e.count == -1 { // 丢掉客户端写入的第二个包 logger.NOTICE(fmt.Sprintf("统计 %d 丢掉这个报文", e.count)) return nil } diff --git a/tcpip/transport/tcp/connect.go b/tcpip/transport/tcp/connect.go index b40219d..8eff144 100644 --- a/tcpip/transport/tcp/connect.go +++ b/tcpip/transport/tcp/connect.go @@ -199,8 +199,8 @@ func (h *handshake) checkAck(s *segment) bool { // incoming segment acknowledges something not yet sent. The // connection remains in the same state. // TODO 返回一个RST报文 - //ack := s.sequenceNumber.Add(s.logicalLen()) - //h.ep.sendRaw(buffer.VectorisedView{}, flagRst|flagAck, s.ackNumber, ack, 0) + ack := s.sequenceNumber.Add(s.logicalLen()) + h.ep.sendRaw(buffer.VectorisedView{}, flagRst|flagAck, s.ackNumber, ack, 0) return false } @@ -657,9 +657,9 @@ func (e *endpoint) makeOptions(sackBlocks []header.SACKBlock) []byte { func (e *endpoint) sendRaw(data buffer.VectorisedView, flags byte, seq, ack seqnum.Value, rcvWnd seqnum.Size) *tcpip.Error { var sackBlocks []header.SACKBlock // TODO 填充配置 - //if e.state == stateConnected && e.rcv.pendingBufSize > 0 && (flags&flagAck != 0) { - // sackBlocks = e.sack.Blocks[:e.sack.NumBlocks] - //} + if e.state == stateConnected && e.rcv.pendingBufSize > 0 && (flags&flagAck != 0) { + sackBlocks = e.sack.Blocks[:e.sack.NumBlocks] + } options := e.makeOptions(sackBlocks) err := sendTCP(&e.route, e.id, data, e.route.DefaultTTL(), flags, seq, ack, rcvWnd, options) putOptions(options) diff --git a/tcpip/transport/tcp/reno.go b/tcpip/transport/tcp/reno.go index 740bde1..d65d3ea 100644 --- a/tcpip/transport/tcp/reno.go +++ b/tcpip/transport/tcp/reno.go @@ -26,14 +26,14 @@ func (r *renoState) updateSlowStart(packetsAcked int) int { packetsAcked -= newcwnd - r.s.sndCwnd // 更新拥塞窗口 r.s.sndCwnd = newcwnd - logger.NOTICE("慢启动 reno Update 新的拥塞窗口大小: ", atoi(r.s.sndCwnd)) + logger.NOTICE("慢启动中。。。 reno Update 新的拥塞窗口大小: ", atoi(r.s.sndCwnd)) return packetsAcked } // updateCongestionAvoidance 将在拥塞避免模式下更新拥塞窗口, // 如RFC5681第3.1节所述 func (r *renoState) updateCongestionAvoidance(packetsAcked int) { - + logger.FIXME("超过阈值后调整拥塞窗口 拥塞避免阶段") } // 当检测到网络拥塞时,调用 reduceSlowStartThreshold。 @@ -69,8 +69,8 @@ func (r *renoState) Update(packetsAcked int) { return } } - // TODO - logger.FIXME("超过阈值后调整拥塞窗口") + // 进入拥塞避免阶段 + r.updateCongestionAvoidance(packetsAcked) } func (r *renoState) PostRecovery() { diff --git a/tcpip/transport/tcp/snd.go b/tcpip/transport/tcp/snd.go index e0c129c..76f6d53 100644 --- a/tcpip/transport/tcp/snd.go +++ b/tcpip/transport/tcp/snd.go @@ -185,7 +185,11 @@ type rtt struct { // fastRecovery 保存与数据包丢失快速恢复相关的信息 type fastRecovery struct { active bool - // TODO 需要添加 + // TODO 需要解释 + first seqnum.Value + last seqnum.Value + + maxCwnd int } // 新建并初始化发送器 irs是cookies @@ -307,7 +311,6 @@ func (s *sender) updateRTO(rtt time.Duration) { s.rtt.rttvar = rtt / 2 s.srttInited = true } else { - log.Println("之后的计算") // |rtt-srtt| 标准差 diff := s.rtt.srtt - rtt if diff < 0 { @@ -349,6 +352,7 @@ func (s *sender) updateRTO(rtt time.Duration) { if s.rto < minRTO { s.rto = minRTO } + logger.NOTICE("更新RTO RTT", s.rto.String(), rtt.String()) } // resendSegment resends the first unacknowledged segment. @@ -390,7 +394,6 @@ func (s *sender) sendSegment(data buffer.VectorisedView, flags byte, seq seqnum. func (s *sender) handleRcvdSegment(seg *segment) { // 如果rtt测量seq小于ack num,更新rto if !s.ep.sendTSOk && s.rttMeasureSeqNum.LessThan(seg.ackNumber) { - log.Fatal("测试") s.updateRTO(time.Now().Sub(s.rttMeasureTime)) s.rttMeasureSeqNum = s.sndNxt } @@ -410,8 +413,8 @@ func (s *sender) handleRcvdSegment(seg *segment) { if s.ep.sendTSOk && seg.parsedOptions.TSEcr != 0 { // TSVal/Ecr values sent by Netstack are at a millisecond // granularity. - //elapsed := time.Duration(s.ep.timestamp()-seg.parsedOptions.TSEcr) * time.Millisecond - //s.updateRTO(elapsed) + elapsed := time.Duration(s.ep.timestamp()-seg.parsedOptions.TSEcr) * time.Millisecond + s.updateRTO(elapsed) } // 获取这次确认的字节数,即 ack - snaUna acked := s.sndUna.Size(ack) @@ -507,6 +510,7 @@ func (s *sender) sendData() { // 如果TCP在超过重新传输超时的时间间隔内没有发送数据,TCP应该在开始传输之前将cwnd设置为不超过RW。 if !s.fr.active && time.Now().Sub(s.lastSendTime) > s.rto { + log.Fatal("重置sndCwnd") if s.sndCwnd > InitialCwnd { s.sndCwnd = InitialCwnd } @@ -588,8 +592,6 @@ func (s *sender) sendData() { // NOTE 开启计时器 如果在RTO后没有回信(snd.handleRecvdSegment 中有数据可以处理) 那么将会重发 // 在 s.resendTimer.init() 中 将会调用 Assert() 唤醒重发函数 retransmitTimerExpired() s.resendTimer.enable(s.rto) - logger.NOTICE("注意测试 RTO") - log.Println("RTO: ", s.rto) } // NOTE 如果我们的发送窗口被缩到0 我们需要定时去问一下对端消费完了没 @@ -598,15 +600,35 @@ func (s *sender) sendData() { } } -// 进入快速恢复和相应的处理 +// 进入快速恢复和相应的处理 快速重传和快速恢复算法一般同时使用。 +// 快速恢复算法是认为,你还有 3 个Duplicated Acks回来,说明网络也不那么糟糕,所以没有必要像 RTO 超时那么强烈 func (s *sender) enterFastRecovery() { s.fr.active = true + // 注意,正如前面所说,进入快速重传之前,sshthresh 已被更新ssthresh = max (cwnd/2, 2)然后,真正的Fast Recovery算法如下: + // 1. cwnd = sshthresh + 3(3 的意思是确认有 3 个数据包被收到了) + // 2. 重传重复 ACKs 指定的数据包 + // 3. 如果再收到重复 Acks,那么cwnd = cwnd + 1;如果收到了新的 Ack,那么,cwnd = sshthresh,然后就进入了拥塞避免的算法了。 s.sndCwnd = s.sndSsthresh + 3 - //s.fr.first = s.sndUna - //s.fr.last = s.sndNxt - 1 - //s.fr.maxCwnd = s.sndCwnd + s.outstanding + s.fr.first = s.sndUna + s.fr.last = s.sndNxt - 1 + logger.NOTICE("快速恢复的范围: ", atoi(s.fr.first), atoi(s.fr.last), atoi(s.fr.last-s.fr.first)) // 一般是4个报文的长度 + s.fr.maxCwnd = s.sndCwnd + s.outstanding } +// tcp拥塞控制:退出快速恢复状态和相应的处理 +func (s *sender) leaveFastRecovery() { + s.fr.active = false + s.fr.first = 0 + s.fr.last = s.sndNxt - 1 + s.fr.maxCwnd = 0 + s.dupAckCount = 0 + + // Deflate cwnd. It had been artificially inflated when new dups arrived. + s.sndCwnd = s.sndSsthresh + s.cc.PostRecovery() +} + + // tcp拥塞控制:收到确认时调用 checkDuplicateAck。它管理与重复确认相关的状态, // 并根据RFC 6582(NewReno)中的规则确定是否需要重新传输 func (s *sender) checkDuplicateAck(seg *segment) (rtx bool) {