慢启动 倍速增长在哪里实现的?

This commit is contained in:
impact-eintr
2022-12-19 18:40:12 +08:00
parent 41ce15ba7b
commit c2dccc5c49
4 changed files with 43 additions and 21 deletions

View File

@@ -49,7 +49,7 @@ func (e *endpoint) WritePacket(r *stack.Route, hdr buffer.Prependable, payload b
// TODO 这里整点活 在特定的情况下丢掉数据报 模拟网络阻塞 // TODO 这里整点活 在特定的情况下丢掉数据报 模拟网络阻塞
e.count++ e.count++
if e.count == 6 { // 丢掉客户端写入的第二个包 if e.count == -1 { // 丢掉客户端写入的第二个包
logger.NOTICE(fmt.Sprintf("统计 %d 丢掉这个报文", e.count)) logger.NOTICE(fmt.Sprintf("统计 %d 丢掉这个报文", e.count))
return nil return nil
} }

View File

@@ -199,8 +199,8 @@ func (h *handshake) checkAck(s *segment) bool {
// incoming segment acknowledges something not yet sent. The // incoming segment acknowledges something not yet sent. The
// connection remains in the same state. // connection remains in the same state.
// TODO 返回一个RST报文 // TODO 返回一个RST报文
//ack := s.sequenceNumber.Add(s.logicalLen()) ack := s.sequenceNumber.Add(s.logicalLen())
//h.ep.sendRaw(buffer.VectorisedView{}, flagRst|flagAck, s.ackNumber, ack, 0) h.ep.sendRaw(buffer.VectorisedView{}, flagRst|flagAck, s.ackNumber, ack, 0)
return false return false
} }
@@ -657,9 +657,9 @@ func (e *endpoint) makeOptions(sackBlocks []header.SACKBlock) []byte {
func (e *endpoint) sendRaw(data buffer.VectorisedView, flags byte, seq, ack seqnum.Value, rcvWnd seqnum.Size) *tcpip.Error { func (e *endpoint) sendRaw(data buffer.VectorisedView, flags byte, seq, ack seqnum.Value, rcvWnd seqnum.Size) *tcpip.Error {
var sackBlocks []header.SACKBlock var sackBlocks []header.SACKBlock
// TODO 填充配置 // TODO 填充配置
//if e.state == stateConnected && e.rcv.pendingBufSize > 0 && (flags&flagAck != 0) { if e.state == stateConnected && e.rcv.pendingBufSize > 0 && (flags&flagAck != 0) {
// sackBlocks = e.sack.Blocks[:e.sack.NumBlocks] sackBlocks = e.sack.Blocks[:e.sack.NumBlocks]
//} }
options := e.makeOptions(sackBlocks) options := e.makeOptions(sackBlocks)
err := sendTCP(&e.route, e.id, data, e.route.DefaultTTL(), flags, seq, ack, rcvWnd, options) err := sendTCP(&e.route, e.id, data, e.route.DefaultTTL(), flags, seq, ack, rcvWnd, options)
putOptions(options) putOptions(options)

View File

@@ -26,14 +26,14 @@ func (r *renoState) updateSlowStart(packetsAcked int) int {
packetsAcked -= newcwnd - r.s.sndCwnd packetsAcked -= newcwnd - r.s.sndCwnd
// 更新拥塞窗口 // 更新拥塞窗口
r.s.sndCwnd = newcwnd r.s.sndCwnd = newcwnd
logger.NOTICE("慢启动 reno Update 新的拥塞窗口大小: ", atoi(r.s.sndCwnd)) logger.NOTICE("慢启动中。。。 reno Update 新的拥塞窗口大小: ", atoi(r.s.sndCwnd))
return packetsAcked return packetsAcked
} }
// updateCongestionAvoidance 将在拥塞避免模式下更新拥塞窗口, // updateCongestionAvoidance 将在拥塞避免模式下更新拥塞窗口,
// 如RFC5681第3.1节所述 // 如RFC5681第3.1节所述
func (r *renoState) updateCongestionAvoidance(packetsAcked int) { func (r *renoState) updateCongestionAvoidance(packetsAcked int) {
logger.FIXME("超过阈值后调整拥塞窗口 拥塞避免阶段")
} }
// 当检测到网络拥塞时,调用 reduceSlowStartThreshold。 // 当检测到网络拥塞时,调用 reduceSlowStartThreshold。
@@ -69,8 +69,8 @@ func (r *renoState) Update(packetsAcked int) {
return return
} }
} }
// TODO // 进入拥塞避免阶段
logger.FIXME("超过阈值后调整拥塞窗口") r.updateCongestionAvoidance(packetsAcked)
} }
func (r *renoState) PostRecovery() { func (r *renoState) PostRecovery() {

View File

@@ -185,7 +185,11 @@ type rtt struct {
// fastRecovery 保存与数据包丢失快速恢复相关的信息 // fastRecovery 保存与数据包丢失快速恢复相关的信息
type fastRecovery struct { type fastRecovery struct {
active bool active bool
// TODO 需要添加 // TODO 需要解释
first seqnum.Value
last seqnum.Value
maxCwnd int
} }
// 新建并初始化发送器 irs是cookies // 新建并初始化发送器 irs是cookies
@@ -307,7 +311,6 @@ func (s *sender) updateRTO(rtt time.Duration) {
s.rtt.rttvar = rtt / 2 s.rtt.rttvar = rtt / 2
s.srttInited = true s.srttInited = true
} else { } else {
log.Println("之后的计算")
// |rtt-srtt| 标准差 // |rtt-srtt| 标准差
diff := s.rtt.srtt - rtt diff := s.rtt.srtt - rtt
if diff < 0 { if diff < 0 {
@@ -349,6 +352,7 @@ func (s *sender) updateRTO(rtt time.Duration) {
if s.rto < minRTO { if s.rto < minRTO {
s.rto = minRTO s.rto = minRTO
} }
logger.NOTICE("更新RTO RTT", s.rto.String(), rtt.String())
} }
// resendSegment resends the first unacknowledged segment. // resendSegment resends the first unacknowledged segment.
@@ -390,7 +394,6 @@ func (s *sender) sendSegment(data buffer.VectorisedView, flags byte, seq seqnum.
func (s *sender) handleRcvdSegment(seg *segment) { func (s *sender) handleRcvdSegment(seg *segment) {
// 如果rtt测量seq小于ack num更新rto // 如果rtt测量seq小于ack num更新rto
if !s.ep.sendTSOk && s.rttMeasureSeqNum.LessThan(seg.ackNumber) { if !s.ep.sendTSOk && s.rttMeasureSeqNum.LessThan(seg.ackNumber) {
log.Fatal("测试")
s.updateRTO(time.Now().Sub(s.rttMeasureTime)) s.updateRTO(time.Now().Sub(s.rttMeasureTime))
s.rttMeasureSeqNum = s.sndNxt s.rttMeasureSeqNum = s.sndNxt
} }
@@ -410,8 +413,8 @@ func (s *sender) handleRcvdSegment(seg *segment) {
if s.ep.sendTSOk && seg.parsedOptions.TSEcr != 0 { if s.ep.sendTSOk && seg.parsedOptions.TSEcr != 0 {
// TSVal/Ecr values sent by Netstack are at a millisecond // TSVal/Ecr values sent by Netstack are at a millisecond
// granularity. // granularity.
//elapsed := time.Duration(s.ep.timestamp()-seg.parsedOptions.TSEcr) * time.Millisecond elapsed := time.Duration(s.ep.timestamp()-seg.parsedOptions.TSEcr) * time.Millisecond
//s.updateRTO(elapsed) s.updateRTO(elapsed)
} }
// 获取这次确认的字节数,即 ack - snaUna // 获取这次确认的字节数,即 ack - snaUna
acked := s.sndUna.Size(ack) acked := s.sndUna.Size(ack)
@@ -507,6 +510,7 @@ func (s *sender) sendData() {
// 如果TCP在超过重新传输超时的时间间隔内没有发送数据TCP应该在开始传输之前将cwnd设置为不超过RW。 // 如果TCP在超过重新传输超时的时间间隔内没有发送数据TCP应该在开始传输之前将cwnd设置为不超过RW。
if !s.fr.active && time.Now().Sub(s.lastSendTime) > s.rto { if !s.fr.active && time.Now().Sub(s.lastSendTime) > s.rto {
log.Fatal("重置sndCwnd")
if s.sndCwnd > InitialCwnd { if s.sndCwnd > InitialCwnd {
s.sndCwnd = InitialCwnd s.sndCwnd = InitialCwnd
} }
@@ -588,8 +592,6 @@ func (s *sender) sendData() {
// NOTE 开启计时器 如果在RTO后没有回信(snd.handleRecvdSegment 中有数据可以处理) 那么将会重发 // NOTE 开启计时器 如果在RTO后没有回信(snd.handleRecvdSegment 中有数据可以处理) 那么将会重发
// 在 s.resendTimer.init() 中 将会调用 Assert() 唤醒重发函数 retransmitTimerExpired() // 在 s.resendTimer.init() 中 将会调用 Assert() 唤醒重发函数 retransmitTimerExpired()
s.resendTimer.enable(s.rto) s.resendTimer.enable(s.rto)
logger.NOTICE("注意测试 RTO")
log.Println("RTO: ", s.rto)
} }
// NOTE 如果我们的发送窗口被缩到0 我们需要定时去问一下对端消费完了没 // NOTE 如果我们的发送窗口被缩到0 我们需要定时去问一下对端消费完了没
@@ -598,15 +600,35 @@ func (s *sender) sendData() {
} }
} }
// 进入快速恢复和相应的处理 // 进入快速恢复和相应的处理 快速重传和快速恢复算法一般同时使用。
// 快速恢复算法是认为,你还有 3 个Duplicated Acks回来说明网络也不那么糟糕所以没有必要像 RTO 超时那么强烈
func (s *sender) enterFastRecovery() { func (s *sender) enterFastRecovery() {
s.fr.active = true s.fr.active = true
// 注意正如前面所说进入快速重传之前sshthresh 已被更新ssthresh = max (cwnd/2, 2)然后真正的Fast Recovery算法如下
// 1. cwnd = sshthresh + 33 的意思是确认有 3 个数据包被收到了)
// 2. 重传重复 ACKs 指定的数据包
// 3. 如果再收到重复 Acks那么cwnd = cwnd + 1如果收到了新的 Ack那么cwnd = sshthresh然后就进入了拥塞避免的算法了。
s.sndCwnd = s.sndSsthresh + 3 s.sndCwnd = s.sndSsthresh + 3
//s.fr.first = s.sndUna s.fr.first = s.sndUna
//s.fr.last = s.sndNxt - 1 s.fr.last = s.sndNxt - 1
//s.fr.maxCwnd = s.sndCwnd + s.outstanding logger.NOTICE("快速恢复的范围: ", atoi(s.fr.first), atoi(s.fr.last), atoi(s.fr.last-s.fr.first)) // 一般是4个报文的长度
s.fr.maxCwnd = s.sndCwnd + s.outstanding
} }
// tcp拥塞控制退出快速恢复状态和相应的处理
func (s *sender) leaveFastRecovery() {
s.fr.active = false
s.fr.first = 0
s.fr.last = s.sndNxt - 1
s.fr.maxCwnd = 0
s.dupAckCount = 0
// Deflate cwnd. It had been artificially inflated when new dups arrived.
s.sndCwnd = s.sndSsthresh
s.cc.PostRecovery()
}
// tcp拥塞控制收到确认时调用 checkDuplicateAck。它管理与重复确认相关的状态 // tcp拥塞控制收到确认时调用 checkDuplicateAck。它管理与重复确认相关的状态
// 并根据RFC 6582NewReno中的规则确定是否需要重新传输 // 并根据RFC 6582NewReno中的规则确定是否需要重新传输
func (s *sender) checkDuplicateAck(seg *segment) (rtx bool) { func (s *sender) checkDuplicateAck(seg *segment) (rtx bool) {