From 407f4c0b4261617ae78371350e202f11901038d1 Mon Sep 17 00:00:00 2001 From: impact-eintr Date: Fri, 16 Dec 2022 12:23:30 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8F=91=E9=80=81=E6=8E=A5=E6=94=B6=E7=AA=97?= =?UTF-8?q?=E5=8F=AF=E8=A7=86=E5=8C=96Debug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/netstack/main.go | 2 +- logger/logger.go | 28 +++++++++++++++++--- tcpip/transport/tcp/README.md | 11 ++++++++ tcpip/transport/tcp/accept.go | 5 +++- tcpip/transport/tcp/connect.go | 47 ++++++++++++++++++++++++++++++--- tcpip/transport/tcp/endpoint.go | 13 +++++---- tcpip/transport/tcp/rcv.go | 23 +++++++++++++++- tcpip/transport/tcp/segment.go | 2 +- tcpip/transport/tcp/snd.go | 23 +++++++++------- 9 files changed, 125 insertions(+), 29 deletions(-) diff --git a/cmd/netstack/main.go b/cmd/netstack/main.go index b2c7cfa..1be28be 100644 --- a/cmd/netstack/main.go +++ b/cmd/netstack/main.go @@ -171,7 +171,7 @@ func main() { time.Sleep(time.Second) log.Printf("\n\n客户端 写入数据") - buf := make([]byte, 1<<21) + buf := make([]byte, 1<<17) conn.Write(buf) time.Sleep(1 * time.Minute) conn.Close() diff --git a/logger/logger.go b/logger/logger.go index a795673..882487a 100644 --- a/logger/logger.go +++ b/logger/logger.go @@ -1,7 +1,9 @@ package logger import ( + "fmt" "log" + "strings" "sync" ) @@ -56,14 +58,32 @@ func (l *logger) info(f func()) { f() } -func TODO(msg string) { +func TODO(msg string, v ...string) { GetInstance().info(func() { - log.Println("TODO: " + msg) + log.Printf("\033[1;37;41mTODO: %s\033[0m\n", msg+" "+strings.Join(v, " ")) }) } -func FIXME(msg string) { +func FIXME(msg string, v ...string) { GetInstance().info(func() { - log.Fatal("FIXME: " + msg) + log.Fatalf("\033[1;37;41mFIXME: %s\033[0m\n", msg+" "+strings.Join(v, " ")) }) } + +func NOTICE(msg string, v ...string) { + GetInstance().info(func() { + log.Printf("\033[1;37;41mNOTICE: %s\033[0m\n", msg+" "+strings.Join(v, " ")) + }) +} + +func COLORS() { + for b := 40; b <= 47; b++ { // 背景色彩 = 40-47 + for f := 30; f <= 37; f++ { // 前景色彩 = 30-37 + for d := range []int{0, 1, 4, 5, 7, 8} { // 显示方式 = 0,1,4,5,7,8 + fmt.Printf(" %c[%d;%d;%dm%s(f=%d,b=%d,d=%d)%c[0m ", 0x1B, d, b, f, "", f, b, d, 0x1B) + } + fmt.Println("") + } + fmt.Println("") + } +} diff --git a/tcpip/transport/tcp/README.md b/tcpip/transport/tcp/README.md index 8052d3a..ff89e44 100644 --- a/tcpip/transport/tcp/README.md +++ b/tcpip/transport/tcp/README.md @@ -264,6 +264,17 @@ K = 4 ![img](https://doc.shiyanlou.com/document-uid949121labid10418timestamp1555574248321.png) +发送窗口的上限值 = Min [rwnd, cwnd],cwnd 拥塞窗口 + +上图中分成了四个部分,分别是:(其中那个黑模型就是滑动窗口) + +1. 已收到 ack 确认的数据 +2. 已经发送,但还没收到 ack 的数据 +3. 在窗口中还没有发出的(接收方还有空间) +4. 窗口以外的数据(接收方没空间) + +滑动: 当发送端收到数据 ack 确认时,窗口向右滑 + ## Zero Window 如果一个处理缓慢的 Server(接收端)是怎么把 Client(发送端)的 TCP Sliding Window 给降成 0 的。此时,你一定会问,如果 Window 变成 0 了,TCP 会怎么样?是不是发送端就不发数据了?是的,发送端就不发数据了,你可以想像成“Window Closed”,那你一定还会问,如果发送端不发数据了,接收方一会儿 Window size 可用了,怎么通知发送端呢? diff --git a/tcpip/transport/tcp/accept.go b/tcpip/transport/tcp/accept.go index 76c6248..420c33e 100644 --- a/tcpip/transport/tcp/accept.go +++ b/tcpip/transport/tcp/accept.go @@ -212,6 +212,9 @@ func (l *listenContext) createConnectedEndpoint(s *segment, iss seqnum.Value, // The receiver at least temporarily has a zero receive window scale, // but the caller may change it (before starting the protocol loop). n.snd = newSender(n, iss, irs, s.window, rcvdSynOpts.MSS, rcvdSynOpts.WS) + logger.GetInstance().Info(logger.HANDSHAKE, func() { + //log.Println("服务端握手成功 服务端的sender", n.snd) + }) n.rcv = newReceiver(n, irs, l.rcvWnd, 0) return n, nil @@ -251,7 +254,7 @@ func (l *listenContext) createEndpointAndPerformHandshake(s *segment, opts *head // TODO 更新接收窗口扩张因子 ep.rcv.rcvWndScale = h.effectiveRcvWndScale() logger.GetInstance().Info(logger.HANDSHAKE, func() { - log.Println("rp.rcv.rcvWndScale", ep.rcv.rcvWndScale) + log.Println("ep.rcv.rcvWndScale", ep.rcv.rcvWndScale) }) return ep, nil diff --git a/tcpip/transport/tcp/connect.go b/tcpip/transport/tcp/connect.go index f5975fc..f38a149 100644 --- a/tcpip/transport/tcp/connect.go +++ b/tcpip/transport/tcp/connect.go @@ -247,6 +247,7 @@ func (h *handshake) synSentState(s *segment) *tcpip.Error { h.flags |= flagAck h.mss = rcvSynOpts.MSS h.sndWndScale = rcvSynOpts.WS + logger.NOTICE(atoi(h.sndWnd), atoi(h.sndWndScale), atoi(h.rcvWnd)) // If this is a SYN ACK response, we only need to acknowledge the SYN // and the handshake is completed. @@ -255,6 +256,7 @@ func (h *handshake) synSentState(s *segment) *tcpip.Error { // 客户端握手完成,发送 ack 报文给服务端 h.state = handshakeCompleted // 最后依次 ack 报文丢了也没关系,因为后面一但发送任何数据包都是带ack的 + // 这里要求对端缩减窗口 h.ep.sendRaw(buffer.VectorisedView{}, flagAck, h.iss+1, h.ackNum, h.rcvWnd>>h.effectiveRcvWndScale()) return nil } @@ -286,7 +288,6 @@ func (h *handshake) synSentState(s *segment) *tcpip.Error { // 正常情况下,会调用该函数来处理第三次 ack 报文 func (h *handshake) synRcvdState(s *segment) *tcpip.Error { if s.flagIsSet(flagRst) { - // TODO 需要根据窗口返回 等理解了窗口后再写 // RFC 793, page 37, states that in the SYN-RCVD state, a reset // is acceptable if the sequence number is in the window. if s.sequenceNumber.InWindow(h.ackNum, h.rcvWnd) { @@ -302,13 +303,41 @@ func (h *handshake) synRcvdState(s *segment) *tcpip.Error { // 如果是syn报文,且序列号对应不上,那么返回 rst if s.flagIsSet(flagSyn) && s.sequenceNumber != h.ackNum-1 { // TODO 返回RST报文 + // We received two SYN segments with different sequence + // numbers, so we reset this and restart the whole + // process, except that we don't reset the timer. + ack := s.sequenceNumber.Add(s.logicalLen()) + seq := seqnum.Value(0) + if s.flagIsSet(flagAck) { + seq = s.ackNumber + } + h.ep.sendRaw(buffer.VectorisedView{}, flagRst|flagAck, seq, ack, 0) + + if !h.active { + return tcpip.ErrInvalidEndpointState + } + + if err := h.resetState(); err != nil { + return err + } + synOpts := header.TCPSynOptions{ + WS: h.rcvWndScale, + TS: h.ep.sendTSOk, + TSVal: h.ep.timestamp(), + TSEcr: h.ep.recentTS, + SACKPermitted: h.ep.sackPermitted, + } + sendSynTCP(&s.route, h.ep.id, h.flags, h.iss, h.ackNum, h.rcvWnd, synOpts) return nil } // 如果时ack报文 表示三次握手已经完成 if s.flagIsSet(flagAck) { log.Println("TCP STATE ESTABLISHED") - // TODO 修改时间戳 + if h.ep.sendTSOk && !s.parsedOptions.TS { + h.ep.stack.Stats().DroppedPackets.Increment() + return nil + } h.state = handshakeCompleted return nil } @@ -320,7 +349,10 @@ func (h *handshake) synRcvdState(s *segment) *tcpip.Error { func (h *handshake) handleSegment(s *segment) *tcpip.Error { h.sndWnd = s.window if !s.flagIsSet(flagSyn) && h.sndWndScale > 0 { - h.sndWnd <<= uint8(h.sndWndScale) // 收紧窗口 + h.sndWnd <<= uint8(h.sndWndScale) // 收紧发送窗口 + logger.NOTICE("扩张发送窗口到", atoi(h.sndWnd)) + } else { + logger.NOTICE("原有发送窗口与服务端的接收窗口大小相同", atoi(h.sndWnd)) } switch h.state { @@ -543,6 +575,7 @@ func sendTCP(r *stack.Route, id stack.TransportEndpointID, data buffer.Vectorise if rcvWnd > 0xffff { // 65535 rcvWnd = 0xffff + logger.NOTICE("告诉对端 我的接收窗口为", atoi(rcvWnd)) } // Initialize the header. @@ -576,7 +609,9 @@ func sendTCP(r *stack.Route, id stack.TransportEndpointID, data buffer.Vectorise r.Stats().TCP.ResetsSent.Increment() } - log.Printf("TCP 发送 [%s] 报文片段到 %s, seq: |%d|, ack: %d, rcvWnd: %d", + logger.GetInstance().Info(logger.TCP, func() { + }) + log.Printf("TCP 发送 [%s] 报文片段到 %s, seq: %d, ack: %d, 可接收rcvWnd: %d", flagString(flags), fmt.Sprintf("%s:%d", id.RemoteAddress, id.RemotePort), seq, ack, rcvWnd) @@ -703,6 +738,7 @@ func (e *endpoint) handleSegments() *tcpip.Error { // Patch the window size in the segment according to the // send window scale. s.window <<= e.snd.sndWndScale + logger.NOTICE("这里进行了发送窗口的扩张", atoi(s.window)) // If the timestamp option is negotiated and the segment // does not carry a timestamp option then the segment // must be dropped as per @@ -781,6 +817,9 @@ func (e *endpoint) protocolMainLoop(handshake bool) *tcpip.Error { // 到这里就表示三次握手已经成功了,那么初始化发送器和接收器 e.snd = newSender(e, h.iss, h.ackNum-1, h.sndWnd, h.mss, h.sndWndScale) + logger.GetInstance().Info(logger.HANDSHAKE, func() { + //log.Println("客户端握手成功 客户端的sender", e.snd) + }) e.rcvListMu.Lock() e.rcv = newReceiver(e, h.ackNum-1, h.rcvWnd, h.effectiveRcvWndScale()) diff --git a/tcpip/transport/tcp/endpoint.go b/tcpip/transport/tcp/endpoint.go index c69659e..f3bf12d 100644 --- a/tcpip/transport/tcp/endpoint.go +++ b/tcpip/transport/tcp/endpoint.go @@ -194,9 +194,6 @@ func newEndpoint(stack *stack.Stack, netProto tcpip.NetworkProtocolNumber, waite e.cc = cs } - log.Println(e.sndBufSize, e.rcvBufSize, e.cc) - - // TODO 需要添加 e.segmentQueue.setLimit(2 * e.rcvBufSize) e.workMu.Init() e.workMu.Lock() @@ -800,9 +797,11 @@ func (e *endpoint) HandlePacket(r *stack.Route, id stack.TransportEndpointID, vv if _, err := e.GetRemoteAddress(); err != nil { prefix = "监听者" } - log.Printf(prefix+"收到 tcp [%s] 报文片段 from %s, seq: %d, ack: |%d|", - flagString(s.flags), fmt.Sprintf("%s:%d", s.id.RemoteAddress, s.id.RemotePort), - s.sequenceNumber, s.ackNumber) + logger.GetInstance().Info(logger.TCP, func() { + log.Printf(prefix+"收到 tcp [%s] 报文片段 from %s, seq: %d, ack: |%d|", + flagString(s.flags), fmt.Sprintf("%s:%d", s.id.RemoteAddress, s.id.RemotePort), + s.sequenceNumber, s.ackNumber) + }) // 对于 端口监听者 listener 而言这里唤醒的是 protocolListenLoop // 对于普通tcp连接 conn 而言这里唤醒的是 protocolMainLoop @@ -827,7 +826,7 @@ func (e *endpoint) updateSndBufferUsage(v int) { e.sndBufMu.Unlock() if notify { // 如果缓存中剩余的数据过多是不需要补充的 e.waiterQueue.Notify(waiter.EventOut) - //log.Println("提醒 用户层的 Write() 继续写入") + log.Println("提醒 用户层的 Write() 继续写入") } } diff --git a/tcpip/transport/tcp/rcv.go b/tcpip/transport/tcp/rcv.go index c398725..9784b9f 100644 --- a/tcpip/transport/tcp/rcv.go +++ b/tcpip/transport/tcp/rcv.go @@ -2,6 +2,7 @@ package tcp import ( "container/heap" + "fmt" "log" "netstack/logger" "netstack/tcpip/seqnum" @@ -59,7 +60,7 @@ func (r *receiver) getSendParams() (rcvNxt seqnum.Value, rcvWnd seqnum.Size) { r.rcvAcc = acc } - log.Println("-------------", n, acc, r.rcvNxt.Size(r.rcvAcc)>>r.rcvWndScale) + //log.Println("-------------", n, acc, r.rcvNxt.Size(r.rcvAcc)>>r.rcvWndScale) return r.rcvNxt, r.rcvNxt.Size(r.rcvAcc) >> r.rcvWndScale } @@ -90,6 +91,7 @@ func (r *receiver) consumeSegment(s *segment, segSeq seqnum.Value, segLen seqnum r.rcvNxt = segSeq.Add(segLen) logger.GetInstance().Info(logger.TCP, func() { }) + log.Println(r) // 修剪SACK块以删除任何涵盖已消耗序列号的SACK信息。 TrimSACKBlockList(&r.ep.sack, r.rcvNxt) @@ -132,6 +134,7 @@ func (r *receiver) handleRcvdSegment(s *segment) { // tcp流量控制:判断该数据段的序列号是否在接收窗口内,如果不在,立即返回ack给对端。 if !r.acceptable(segSeq, segLen) { + log.Fatal("发了太多") r.ep.snd.sendAck() return } @@ -180,3 +183,21 @@ func (r *receiver) handleRcvdSegment(s *segment) { | | LastByteRead LastByteRecv */ + +var fmtRecver string = `%s + +------>% 10s <-----+ + | | +-----------------+-------------+-------------+------------------------+ +| ANR %10s | not revived | rcvd unack | able rcv | +-----------------+-------------+-------------+------------------------+ +^ ^ ^ +| | | +% 10s % 10s % 10s` + +func (r receiver) String() string { + return fmt.Sprintf(fmtRecver, + atoi(r.ep.id.LocalPort), + atoi(r.rcvNxt.Size(r.rcvAcc)), + atoi(r.ep.rcvBufUsed), + atoi(r.rcvNxt-seqnum.Value(r.ep.rcvBufUsed)), atoi(r.rcvNxt), atoi(r.rcvAcc)) +} diff --git a/tcpip/transport/tcp/segment.go b/tcpip/transport/tcp/segment.go index a6b718b..d166a43 100644 --- a/tcpip/transport/tcp/segment.go +++ b/tcpip/transport/tcp/segment.go @@ -62,7 +62,7 @@ type segment struct { sequenceNumber seqnum.Value // tcp序号 第一个字节在整个报文的位置 ackNumber seqnum.Value // 确认号 希望继续获取的下一个字节序号 flags uint8 - window seqnum.Size + window seqnum.Size // NOTE 这里是本地的接收窗口大小 不是发送窗口 // parsedOptions stores the parsed values from the options in the segment. parsedOptions header.TCPOptions options []byte diff --git a/tcpip/transport/tcp/snd.go b/tcpip/transport/tcp/snd.go index 445333c..9f8d7ca 100644 --- a/tcpip/transport/tcp/snd.go +++ b/tcpip/transport/tcp/snd.go @@ -181,7 +181,7 @@ type fastRecovery struct { func newSender(ep *endpoint, iss, irs seqnum.Value, sndWnd seqnum.Size, mss uint16, sndWndScale int) *sender { s := &sender{ ep: ep, - sndCwnd: InitialCwnd, + sndCwnd: InitialCwnd, // TODO 暂时写死 tcp拥塞窗口 决定了发送窗口的初始大小 sndWnd: sndWnd, sndUna: iss + 1, sndNxt: iss + 1, // 缓存长度为0 @@ -402,9 +402,9 @@ func (s *sender) handleRcvdSegment(seg *segment) { // TODO tcp拥塞控制 if s.writeList.Front() != nil { - log.Println(s) //log.Fatal(s.sndNxt, " 确认成功 继续发送 ", seg.sequenceNumber) } + log.Println(s) s.sendData() } @@ -462,8 +462,9 @@ func (s *sender) sendData() { s.outstanding++ segEnd = seg.sequenceNumber.Add(seqnum.Size(seg.data.Size())) - log.Println("发送窗口一开始是", s.sndWnd, + log.Println("发送窗口是", s.sndWnd, "最多发送数据", available, + "缓存数据头", seg.sequenceNumber, "缓存数据尾", segEnd, "发送端缓存包数量", s.outstanding) } @@ -497,9 +498,9 @@ func (s *sender) sendData() { } -var fmtSender string = ` - +-----> % 10s <------+ - | | +var fmtSender string = `%s + +-----> % 10s <------+ + | Scale % 4s | -----------------+-------------+-------------+------------------ | 已确认 |UAC% 10s|NXT% 10s| 不可发送 -----------------+-------------+-------------+------------------ @@ -508,11 +509,13 @@ var fmtSender string = ` % 10s % 10s` func (s sender) String() string { - return fmt.Sprintf(fmtSender, atoi(uint32(s.sndWnd)), - atoi(uint32(s.sndNxt-s.sndUna)), atoi(s.ep.sndBufUsed), - atoi(uint32(s.sndUna)), atoi(uint32(s.sndNxt))) + return fmt.Sprintf(fmtSender, + atoi(s.ep.id.LocalPort), + atoi(s.sndWnd), atoi(s.sndWndScale), + atoi(s.sndNxt-s.sndUna), atoi(s.ep.sndBufSize-s.ep.sndBufUsed), + atoi(s.sndUna), atoi(s.sndNxt)) } -func atoi[T int | int8 | int16 | int32 | int64 | uint | uint8 | uint16 | uint32](i T) string { +func atoi[T int | int8 | int16 | int32 | int64 | uint | uint8 | uint16 | uint32 | seqnum.Size | seqnum.Value](i T) string { return fmt.Sprintf("%d", i) }