From 94e12c1f213eb9a105c7e1794344d07bd8f00c65 Mon Sep 17 00:00:00 2001 From: impact-eintr Date: Wed, 21 Dec 2022 17:46:16 +0800 Subject: [PATCH] =?UTF-8?q?RTT=20=E8=BF=98=E6=98=AF=E6=B2=A1=E6=95=B4?= =?UTF-8?q?=E6=98=8E=E7=99=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tcpip/link/loopback/loopback.go | 4 ++++ tcpip/transport/tcp/accept.go | 5 +---- tcpip/transport/tcp/connect.go | 2 -- tcpip/transport/tcp/endpoint.go | 10 ++++++++++ tcpip/transport/tcp/rcv.go | 7 +++---- tcpip/transport/tcp/snd.go | 16 ++++++++++++---- 6 files changed, 30 insertions(+), 14 deletions(-) diff --git a/tcpip/link/loopback/loopback.go b/tcpip/link/loopback/loopback.go index e98a4e3..8f7b463 100644 --- a/tcpip/link/loopback/loopback.go +++ b/tcpip/link/loopback/loopback.go @@ -2,10 +2,12 @@ package loopback import ( "fmt" + "math/rand" "netstack/logger" "netstack/tcpip" "netstack/tcpip/buffer" "netstack/tcpip/stack" + "time" ) type endpoint struct { @@ -47,6 +49,8 @@ func (e *endpoint) WritePacket(r *stack.Route, hdr buffer.Prependable, payload b vv := buffer.NewVectorisedView(len(views[0])+payload.Size(), views) // TODO 这里整点活 在特定的情况下丢掉数据报 模拟网络阻塞 + rand.Seed(time.Now().Unix()) + //time.Sleep(time.Duration(rand.Intn(50)+50) * time.Millisecond) e.count++ if e.count == -1 { // 丢掉客户端写入的第二个包 diff --git a/tcpip/transport/tcp/accept.go b/tcpip/transport/tcp/accept.go index 9c46a1b..b6e5148 100644 --- a/tcpip/transport/tcp/accept.go +++ b/tcpip/transport/tcp/accept.go @@ -251,11 +251,8 @@ func (l *listenContext) createEndpointAndPerformHandshake(s *segment, opts *head return nil, err } - // TODO 更新接收窗口扩张因子 + // 更新接收窗口扩张因子 ep.rcv.rcvWndScale = h.effectiveRcvWndScale() - logger.GetInstance().Info(logger.HANDSHAKE, func() { - log.Println("ep.rcv.rcvWndScale", ep.rcv.rcvWndScale) - }) return ep, nil } diff --git a/tcpip/transport/tcp/connect.go b/tcpip/transport/tcp/connect.go index 3534cb9..6642d4d 100644 --- a/tcpip/transport/tcp/connect.go +++ b/tcpip/transport/tcp/connect.go @@ -712,8 +712,6 @@ func (e *endpoint) handleClose() *tcpip.Error { // handleSegments 从队列中取出 tcp 段数据,然后处理它们。 func (e *endpoint) handleSegments() *tcpip.Error { checkRequeue := true - // FIXME 用于更清楚地DUBUG 之后删掉 - time.Sleep(100 * time.Millisecond) for i := 0; i < maxSegmentsPerWake; i++ { s := e.segmentQueue.dequeue() diff --git a/tcpip/transport/tcp/endpoint.go b/tcpip/transport/tcp/endpoint.go index 74ef553..dfb87a1 100644 --- a/tcpip/transport/tcp/endpoint.go +++ b/tcpip/transport/tcp/endpoint.go @@ -130,6 +130,8 @@ type endpoint struct { // recentTS is the timestamp that should be sent in the TSEcr field of // the timestamp for future segments sent by the endpoint. This field is // updated if required when a new segment is received by this endpoint. + // recentTS 是应该在端点发送的未来段的时间戳的 TSEcr 字段中发送的时间戳。 + // 当此端点接收到新段时,如果需要,此字段会更新。 recentTS uint32 // tsOffset is a randomized offset added to the value of the @@ -1173,6 +1175,14 @@ func (e *endpoint) receiveBufferSize() int { return size } +// updateRecentTimestamp updates the recent timestamp using the algorithm +// described in https://tools.ietf.org/html/rfc7323#section-4.3 +func (e *endpoint) updateRecentTimestamp(tsVal uint32, maxSentAck seqnum.Value, segSeq seqnum.Value) { + if e.sendTSOk && seqnum.Value(e.recentTS).LessThan(seqnum.Value(tsVal)) && segSeq. LessThanEq(maxSentAck) { + e.recentTS = tsVal + } +} + // maybeEnableTimestamp marks the timestamp option enabled for this endpoint if // the SYN options indicate that timestamp option was negotiated. It also // initializes the recentTS with the value provided in synOpts.TSval. diff --git a/tcpip/transport/tcp/rcv.go b/tcpip/transport/tcp/rcv.go index 8488f1c..b2203f2 100644 --- a/tcpip/transport/tcp/rcv.go +++ b/tcpip/transport/tcp/rcv.go @@ -3,7 +3,6 @@ package tcp import ( "container/heap" "fmt" - "log" "netstack/logger" "netstack/tcpip/seqnum" ) @@ -100,9 +99,9 @@ func (r *receiver) consumeSegment(s *segment, segSeq seqnum.Value, segLen seqnum logger.GetInstance().Info(logger.TCP, func() { }) - if r.ep.id.LocalPort == 9999 { - log.Println(r) - } + //if r.ep.id.LocalPort == 9999 { + // log.Println(r) + //} // 修剪SACK块以删除任何涵盖已消耗序列号的SACK信息。 TrimSACKBlockList(&r.ep.sack, r.rcvNxt) diff --git a/tcpip/transport/tcp/snd.go b/tcpip/transport/tcp/snd.go index da10a9e..7146d60 100644 --- a/tcpip/transport/tcp/snd.go +++ b/tcpip/transport/tcp/snd.go @@ -203,6 +203,7 @@ func newSender(ep *endpoint, iss, irs seqnum.Value, sndWnd seqnum.Size, mss uint sndNxt: iss + 1, // 缓存长度为0 sndNxtList: iss + 1, rto: 1 * time.Second, + rttMeasureSeqNum: iss + 1, lastSendTime: time.Now(), maxPayloadSize: int(mss), maxSentAck: irs + 1, @@ -377,7 +378,7 @@ func (s *sender) resendSegment() { // sequence number. // 根据给定的参数,负载数据、flags标记和序列号来发送数据 func (s *sender) sendSegment(data buffer.VectorisedView, flags byte, seq seqnum.Value) *tcpip.Error { - s.lastSendTime = time.Now() + s.lastSendTime = time.Now() // 发送时间 if seq == s.rttMeasureSeqNum { s.rttMeasureTime = s.lastSendTime } @@ -398,6 +399,8 @@ func (s *sender) handleRcvdSegment(seg *segment) { s.rttMeasureSeqNum = s.sndNxt } + s.ep.updateRecentTimestamp(seg.parsedOptions.TSVal, s.maxSentAck, seg.sequenceNumber) + // tcp的拥塞控制:检查是否有重复的ack,是否进入快速重传和快速恢复状态 rtx := s.checkDuplicateAck(seg) @@ -405,12 +408,16 @@ func (s *sender) handleRcvdSegment(seg *segment) { s.sndWnd = seg.window // 获取确认号 ack := seg.ackNumber + if s.ep.id.LocalPort != 9999 { + logger.NOTICE("进入处理ack报文", atoi(ack-1), atoi(s.sndUna), atoi(s.sndNxt)) + } // 如果ack在最小未确认的seq和segNext之间 if (ack - 1).InRange(s.sndUna, s.sndNxt) { // 收到了东西 就暂停计时 s.resendTimer.disable() // NOTE 一个RTT 结束 + logger.NOTICE(time.Duration(seg.parsedOptions.TSEcr).String()) if s.ep.sendTSOk && seg.parsedOptions.TSEcr != 0 { // TSVal/Ecr values sent by Netstack are at a millisecond // granularity. @@ -467,9 +474,9 @@ func (s *sender) handleRcvdSegment(seg *segment) { s.resendSegment() } - if s.ep.id.LocalPort != 9999 { - log.Println(s) - } + //if s.ep.id.LocalPort != 9999 { + // log.Println(s) + //} // 现在某些待处理数据已被确认,或者窗口打开,或者由于快速恢复期间出现重复的ack而导致拥塞窗口膨胀, // 因此发送更多数据。如果需要,这也将重新启用重传计时器。 @@ -577,6 +584,7 @@ func (s *sender) sendData() { // TODO } + // 发送包 开始计算RTT s.sendSegment(seg.data, seg.flags, seg.sequenceNumber) // 发送一个数据段后,更新sndNxt if s.sndNxt.LessThan(segEnd) {