快速重传

This commit is contained in:
impact-eintr
2022-12-27 17:44:45 +08:00
parent 823e76e979
commit 5e99462726
5 changed files with 15 additions and 17 deletions

View File

@@ -149,7 +149,7 @@ func main() {
go func() { go func() {
cnt := 0 cnt := 0
time.Sleep(3 * time.Second) time.Sleep(2 * time.Second)
for { for {
// 一个慢读者 才能体现出网络的情况 // 一个慢读者 才能体现出网络的情况
buf := make([]byte, 1024) buf := make([]byte, 1024)

View File

@@ -51,7 +51,7 @@ func (e *endpoint) WritePacket(r *stack.Route, hdr buffer.Prependable, payload b
//time.Sleep(time.Duration(rand.Intn(50)+50) * time.Millisecond) //time.Sleep(time.Duration(rand.Intn(50)+50) * time.Millisecond)
e.count++ e.count++
if e.count == 10 { // 丢掉客户端写入的第二个包 if e.count == 6 { // 丢掉客户端写入的第二个包
logger.NOTICE(fmt.Sprintf("统计 %d 丢掉这个报文", e.count)) logger.NOTICE(fmt.Sprintf("统计 %d 丢掉这个报文", e.count))
return nil return nil
} }

View File

@@ -171,7 +171,7 @@ func (r *receiver) handleRcvdSegment(s *segment) {
// Immediately send an ack so that the peer knows it may // Immediately send an ack so that the peer knows it may
// have to retransmit. // have to retransmit.
logger.NOTICE("统计非顺序到达", atoi(cnt)) //logger.NOTICE("统计非顺序到达", atoi(cnt), " rcv.go 175")
cnt++ cnt++
r.ep.snd.sendAck() r.ep.snd.sendAck()

View File

@@ -53,7 +53,7 @@ func (r *renoState) updateCongestionAvoidance(packetsAcked int) {
// 它将 sndSsthresh 变为 outstanding 的一半。 // 它将 sndSsthresh 变为 outstanding 的一半。
// sndSsthresh 最小为2因为至少要比丢包后的拥塞窗口cwnd=1来的大才会进入慢启动阶段。 // sndSsthresh 最小为2因为至少要比丢包后的拥塞窗口cwnd=1来的大才会进入慢启动阶段。
func (r *renoState) reduceSlowStartThreshold() { func (r *renoState) reduceSlowStartThreshold() {
r.s.sndSsthresh = r.s.sndSsthresh/2 r.s.sndSsthresh = r.s.outstanding/2
if r.s.sndSsthresh < 2 { if r.s.sndSsthresh < 2 {
r.s.sndSsthresh = 2 r.s.sndSsthresh = 2
} }

View File

@@ -16,11 +16,11 @@ import (
const ( const (
// minRTO is the minimum allowed value for the retransmit timeout. // minRTO is the minimum allowed value for the retransmit timeout.
minRTO = 200 * time.Millisecond minRTO = 2000 * time.Millisecond
// InitialCwnd is the initial congestion window. // InitialCwnd is the initial congestion window.
// 初始拥塞窗口大小 // 初始拥塞窗口大小
InitialCwnd = 1 InitialCwnd = 4
// nDupAckThreshold is the number of duplicate ACK's required // nDupAckThreshold is the number of duplicate ACK's required
// before fast-retransmit is entered. // before fast-retransmit is entered.
@@ -353,7 +353,6 @@ func (s *sender) updateRTO(rtt time.Duration) {
if s.rto < minRTO { if s.rto < minRTO {
s.rto = minRTO s.rto = minRTO
} }
logger.NOTICE("更新RTO", s.rto.String()," RTT:", rtt.String())
} }
// resendSegment resends the first unacknowledged segment. // resendSegment resends the first unacknowledged segment.
@@ -369,13 +368,13 @@ func (s *sender) resendSegment() {
// Resend the segment. // Resend the segment.
if seg := s.writeList.Front(); seg != nil { if seg := s.writeList.Front(); seg != nil {
logger.NOTICE("重复发送...") logger.NOTICE("重复收到3个ack报文 启动快速重传... : ", atoi(seg.sequenceNumber))
s.sendSegment(seg.data, seg.flags, seg.sequenceNumber) s.sendSegment(seg.data, seg.flags, seg.sequenceNumber)
} }
} }
// sendSegment sends a new segment containing the given payload, flags and // sendSegment sends a new segment containing the given payload, flags and
// sequence number. // eequence number.
// 根据给定的参数负载数据、flags标记和序列号来发送数据 // 根据给定的参数负载数据、flags标记和序列号来发送数据
func (s *sender) sendSegment(data buffer.VectorisedView, flags byte, seq seqnum.Value) *tcpip.Error { func (s *sender) sendSegment(data buffer.VectorisedView, flags byte, seq seqnum.Value) *tcpip.Error {
s.lastSendTime = time.Now() // 发送时间 s.lastSendTime = time.Now() // 发送时间
@@ -408,20 +407,20 @@ func (s *sender) handleRcvdSegment(seg *segment) {
s.sndWnd = seg.window s.sndWnd = seg.window
// 获取确认号 // 获取确认号
ack := seg.ackNumber ack := seg.ackNumber
if s.ep.id.LocalPort != 9999 { //if s.ep.id.LocalPort != 9999 {
logger.NOTICE("进入处理ack报文", atoi(ack-1), atoi(s.sndUna), atoi(s.sndNxt)) // logger.NOTICE("进入处理ack报文", atoi(ack-1), atoi(s.sndUna), atoi(s.sndNxt))
} //}
// 如果ack在最小未确认的seq和segNext之间 // 如果ack在最小未确认的seq和segNext之间
if (ack - 1).InRange(s.sndUna, s.sndNxt) { if (ack - 1).InRange(s.sndUna, s.sndNxt) {
// 收到了东西 就暂停计时 // 收到了东西 就暂停计时
s.resendTimer.disable() s.resendTimer.disable()
// NOTE 一个RTT 结束 // NOTE 一个RTT 结束
logger.NOTICE(time.Duration(seg.parsedOptions.TSEcr).String())
if s.ep.sendTSOk && seg.parsedOptions.TSEcr != 0 { if s.ep.sendTSOk && seg.parsedOptions.TSEcr != 0 {
// TSVal/Ecr values sent by Netstack are at a millisecond // TSVal/Ecr values sent by Netstack are at a millisecond
// granularity. // granularity.
elapsed := time.Duration(s.ep.timestamp()-seg.parsedOptions.TSEcr) * time.Millisecond elapsed := time.Duration(s.ep.timestamp()-seg.parsedOptions.TSEcr) * time.Millisecond
//logger.NOTICE("snd 424 ", elapsed.String())
s.updateRTO(elapsed) s.updateRTO(elapsed)
} }
// 获取这次确认的字节数,即 ack - snaUna // 获取这次确认的字节数,即 ack - snaUna
@@ -470,7 +469,6 @@ func (s *sender) handleRcvdSegment(seg *segment) {
// tcp拥塞控制 快速重传 // tcp拥塞控制 快速重传
if rtx { if rtx {
logger.NOTICE("重复收到3个ack报文 启动快速重传...")
s.resendSegment() s.resendSegment()
} }
@@ -507,7 +505,8 @@ func (s *sender) retransmitTimerExpired() bool {
s.writeNext = s.writeList.Front() s.writeNext = s.writeList.Front()
// 重新发送数据包 // 重新发送数据包
logger.NOTICE("暂时关闭超时重发", s.rto.String()) logger.NOTICE("暂时关闭超时重发", s.rto.String())
//s.sendData() panic(nil)
s.sendData()
return true return true
} }
@@ -599,6 +598,7 @@ func (s *sender) sendData() {
if !s.resendTimer.enabled() && s.sndUna != s.sndNxt { if !s.resendTimer.enabled() && s.sndUna != s.sndNxt {
// NOTE 开启计时器 如果在RTO后没有回信(snd.handleRecvdSegment 中有数据可以处理) 那么将会重发 // NOTE 开启计时器 如果在RTO后没有回信(snd.handleRecvdSegment 中有数据可以处理) 那么将会重发
// 在 s.resendTimer.init() 中 将会调用 Assert() 唤醒重发函数 retransmitTimerExpired() // 在 s.resendTimer.init() 中 将会调用 Assert() 唤醒重发函数 retransmitTimerExpired()
logger.NOTICE("snd.go 602 ", s.rto.String())
s.resendTimer.enable(s.rto) s.resendTimer.enable(s.rto)
} }
@@ -634,7 +634,6 @@ func (s *sender) leaveFastRecovery() {
// Deflate cwnd. It had been artificially inflated when new dups arrived. // Deflate cwnd. It had been artificially inflated when new dups arrived.
s.sndCwnd = s.sndSsthresh s.sndCwnd = s.sndSsthresh
s.cc.PostRecovery() s.cc.PostRecovery()
logger.NOTICE("退出快速恢复")
} }
@@ -644,7 +643,6 @@ func (s *sender) checkDuplicateAck(seg *segment) (rtx bool) {
ack := seg.ackNumber ack := seg.ackNumber
// 已经启动了快速恢复 // 已经启动了快速恢复
if s.fr.active { if s.fr.active {
log.Fatal("启动了快速恢复")
if !ack.InRange(s.sndUna, s.sndNxt+1) { if !ack.InRange(s.sndUna, s.sndNxt+1) {
return false return false
} }