From bda93571035b95cbc4ac284b09d0f4fe5fe39ad0 Mon Sep 17 00:00:00 2001 From: impact-eintr Date: Wed, 4 Jan 2023 11:31:11 +0800 Subject: [PATCH] =?UTF-8?q?=E8=BF=9E=E6=8E=A5=E7=9A=84=E5=BB=BA=E7=AB=8B?= =?UTF-8?q?=20=E6=95=B0=E6=8D=AE=E7=9A=84=E6=94=B6=E5=8F=91=20=E6=96=BD?= =?UTF-8?q?=E5=B7=A5=E4=B8=AD=E3=80=82=E3=80=82=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/netstack/main.go | 4 +- note/README.md | 1048 ++++++++++++++++++++++++++++++ tcpip/stack/nic.go | 2 + tcpip/stack/transport_demuxer.go | 2 + tcpip/transport/tcp/accept.go | 47 +- tcpip/transport/tcp/connect.go | 34 +- tcpip/transport/tcp/endpoint.go | 6 +- tcpip/transport/tcp/snd.go | 15 +- 8 files changed, 1127 insertions(+), 31 deletions(-) diff --git a/cmd/netstack/main.go b/cmd/netstack/main.go index 1a94356..0f989be 100644 --- a/cmd/netstack/main.go +++ b/cmd/netstack/main.go @@ -193,8 +193,8 @@ func main() { log.Printf("客户端 建立连接\n\n客户端 写入数据\n") - size := 1 << 11 - for i := 0; i < 100; i++ { + size := 1 << 10 + for i := 0; i < 3; i++ { //conn.Write([]byte("Hello Netstack")) conn.Write(make([]byte, size)) } diff --git a/note/README.md b/note/README.md index 0a260e6..fc2804e 100644 --- a/note/README.md +++ b/note/README.md @@ -847,6 +847,1054 @@ func (e *endpoint) WritePacket(r *stack.Route, hdr buffer.Prependable, ``` ``` go +// DeliverNetworkPacket 当 NIC 从物理接口接收数据包时,将调用函数 DeliverNetworkPacket,用来分发网络层数据包。 +// 比如 protocol 是 arp 协议号,那么会找到arp.HandlePacket来处理数据报。 +// 简单来说就是根据网络层协议和目的地址来找到相应的网络层端,将网络层数据发给它, +// 当前实现的网络层协议有 arp、ipv4 和 ipv6。 +func (n *NIC) DeliverNetworkPacket(linkEP LinkEndpoint, remoteLinkAddr, localLinkAddr tcpip.LinkAddress, + protocol tcpip.NetworkProtocolNumber, vv buffer.VectorisedView) { + // 检查本协议栈是否注册过该网络协议 + netProto, ok := n.stack.networkProtocols[protocol] + if !ok { + n.stack.stats.UnknownProtocolRcvdPackets.Increment() + return + } + + // 网络层协议状态统计 + if netProto.Number() == header.IPv4ProtocolNumber || netProto.Number() == header.IPv6ProtocolNumber { + n.stack.stats.IP.PacketsReceived.Increment() + } + + // 报文内容过小 判断为受损报文 丢弃 + if len(vv.First()) < netProto.MinimumPacketSize() { + n.stack.stats.MalformedRcvdPackets.Increment() + return + } + // 解析源 IP 和目标IP + src, dst := netProto.ParseAddresses(vv.First()) + // 根据网络协议和数据包的目的地址,找到绑定该目标地址的网络端 + if ref := n.getRef(protocol, dst); ref != nil { + // 路由 源 与 目标 反转 + r := makeRoute(protocol, dst, src, linkEP.LinkAddress(), ref) + r.RemoteLinkAddress = remoteLinkAddr + // 将数据包分发给网络层 + ref.ep.HandlePacket(&r, vv) + ref.decRef() + return + } + + // 如果配置了允许转发 什么意思呢 + // 就是说当前网卡并没有找到目标IP 我们来试试本机的其他网卡 + // 其他网卡-其他网卡上的一个可用地址-目标地址 + if n.stack.Forwarding() { + r, err := n.stack.FindRoute(0, dst, src, protocol) // FIXME 将dst和src调转? + if err != nil { + n.stack.stats.IP.InvalidAddressesReceived.Increment() + return + } + defer r.Release() + + r.LocalLinkAddress = n.linkEP.LinkAddress() + r.RemoteLinkAddress = remoteLinkAddr + + // Found a NIC. + n := r.ref.nic + n.mu.RLock() + ref, ok := n.endpoints[NetworkEndpointID{dst}] // 检查这张网卡是否绑定了目标IP + n.mu.RUnlock() + + if ok && ref.tryIncRef() { + ref.ep.HandlePacket(&r, vv) + logger.NOTICE("转发数据") + ref.decRef() + } else { + // n doesn't have a destination endpoint. + // Send the packet out of n. + hdr := buffer.NewPrependableFromView(vv.First()) + vv.RemoveFirst() + n.linkEP.WritePacket(&r, hdr, vv, protocol) + } + return + } + + n.stack.stats.IP.InvalidAddressesReceived.Increment() +} + ``` +转发模式我并不能确保理解正确。 + +可以看到,网卡找到对方要求处理的IP所绑定的网络端后,调用其HandlePacket 进行网络层数据的分发。 + +看一下IP报文如何处理: + + +``` go +// HandlePacket is called by the link layer when new ipv4 packets arrive for +// this endpoint. +// 收到ip包的处理 +func (e *endpoint) HandlePacket(r *stack.Route, vv buffer.VectorisedView) { + // 得到ip报文 + h := header.IPv4(vv.First()) + // 检查报文是否有效 + if !h.IsValid(vv.Size()) { + return + } + logger.GetInstance().Info(logger.IP, func() { + log.Println(h) + }) + + hlen := int(h.HeaderLength()) + tlen := int(h.TotalLength()) + vv.TrimFront(hlen) + vv.CapLength(tlen - hlen) + + // 报文重组 + more := (h.Flags() & header.IPv4FlagMoreFragments) != 0 + // 是否需要ip重组 + if more || h.FragmentOffset() != 0 { + // The packet is a fragment, let's try to reassemble it. + last := h.FragmentOffset() + uint16(vv.Size()) - 1 + var ready bool + // ip分片重组 + vv, ready = e.fragmentation.Process(hash.IPv4FragmentHash(h), h.FragmentOffset(), last, more, vv) + if !ready { + return + } + } + + // 得到传输层的协议 + p := h.TransportProtocol() + // 如果时ICMP协议,则进入ICMP处理函数 + if p == header.ICMPv4ProtocolNumber { + e.handleICMP(r, vv) + return + } + r.Stats().IP.PacketsDelivered.Increment() + // 根据协议分发到不同处理函数,比如协议时TCP,会进入tcp.HandlePacket + logger.GetInstance().Info(logger.IP, func() { + log.Printf("准备前往 UDP/TCP recv ipv4 packet %d bytes, proto: 0x%x", tlen, p) + }) + e.dispatcher.DeliverTransportPacket(r, p, vv) +} +``` + +可以发现,IP报文有一个分片重组的机制,IP报文最大可以总长65535,但是以太网可承载布料这么多数据,所以需要分片发送,给同一IP报文的不同分片编号,接受者收到片段后缓存并进行堆排序,当所有分片均收到以后,将排好序的数据一次性分发给传输层。 + + +``` go +// 缓存并排序 + if r.updateHoles(first, last, more) { + // We store the incoming packet only if it filled some holes. + heap.Push(&r.heap, fragment{offset: first, vv: vv.Clone(nil)}) + consumed = vv.Size() + r.size += consumed + } + +// 全部收集后组合数据 + for h.Len() > 0 { + curr := heap.Pop(h).(fragment) + if int(curr.offset) < size { + curr.vv.TrimFront(size - int(curr.offset)) // 截取重复的部分 + } else if int(curr.offset) > size { + return buffer.VectorisedView{}, fmt.Errorf("packet has a hole, expected offset %d, got %d", size, curr.offset) + } + // curr.offset == size 没有空洞 紧密排布 + size += curr.vv.Size() + views = append(views, curr.vv.Views()...) + } + return buffer.NewVectorisedView(size, views), nil +``` + +我们的连接在这一步需要进一步地分发,这仍将是网卡来实现的。 + +``` go +// DeliverTransportPacket delivers packets to the appropriate +// transport protocol endpoint. +func (n *NIC) DeliverTransportPacket(r *Route, protocol tcpip.TransportProtocolNumber, vv buffer.VectorisedView) { + // 先查找协议栈是否注册了该传输层协议 + state, ok := n.stack.transportProtocols[protocol] + if !ok { + n.stack.stats.UnknownProtocolRcvdPackets.Increment() + return + } + transProto := state.proto + // 如果报文长度比该协议最小报文长度还小,那么丢弃它 + if len(vv.First()) < transProto.MinimumPacketSize() { + n.stack.stats.MalformedRcvdPackets.Increment() + return + } + // 解析报文得到源端口和目的端口 + srcPort, dstPort, err := transProto.ParsePorts(vv.First()) + if err != nil { + n.stack.stats.MalformedRcvdPackets.Increment() + return + } + id := TransportEndpointID{dstPort, r.LocalAddress, srcPort, r.RemoteAddress} + // 调用分流器,根据传输层协议和传输层id分发数据报文 + // 现在本网卡中尝试分发 + if n.demux.deliverPacket(r, protocol, vv, id) { + return + } + // 本网卡中没有目标六元组 在整个协议栈尝试分发 + if n.stack.demux.deliverPacket(r, protocol, vv, id) { + return + } + // ... +} + +``` + + +``` go +// 根据传输层的id来找到对应的传输端,再将数据包交给这个传输端处理 +func (d *transportDemuxer) deliverPacket(r *Route, protocol tcpip.TransportProtocolNumber, vv buffer.VectorisedView, id TransportEndpointID) bool { + // 先看看分流器里有没有注册相关协议端,如果没有则返回false + eps, ok := d.protocol[protocolIDs{r.NetProto, protocol}] + if !ok { + return false + } + // 从 eps 中找符合 id 的传输端 + eps.mu.RLock() + ep := d.findEndpointLocked(eps, vv, id) + eps.mu.RUnlock() + + if ep == nil { + return false + } + + // Deliver the packet + ep.HandlePacket(r, id, vv) + + return true +} + +// 根据传输层id来找到相应的传输层端 +// 当本地没有存在连接的时候 只有 LocalAddr:LocalPort 监听的传输端 也就是客户端来建立新连接 +// 当本地存在连接的时候 就有可能找到 LAddr:LPort+RAddr:RPort 的传输端 +func (d *transportDemuxer) findEndpointLocked(eps *transportEndpoints, + vv buffer.VectorisedView, id TransportEndpointID) TransportEndpoint { + if ep := eps.endpoints[id]; ep != nil { // IPv4:udp + return ep + } + // Try to find a match with the id minus the local address. + nid := id + // 如果上面的 endpoints 没有找到,那么去掉本地ip地址,看看有没有相应的传输层端 + // 因为有时候传输层监听的时候没有绑定本地ip,也就是 any address,此时的 LocalAddress + // 为空。 + nid.LocalAddress = "" + if ep := eps.endpoints[nid]; ep != nil { + return ep + } + + // Try to find a match with the id minus the remote part. + // listener 的情况 本地没有这个 dstIP+dstPort:srcIP+srcPort 的连接交由 + // ""+0:srcIP+srcPort的Listener来处理 + nid.LocalAddress = id.LocalAddress + nid.RemoteAddress = "" + nid.RemotePort = 0 + if ep := eps.endpoints[nid]; ep != nil { + return ep + } + + // Try to find a match with only the local port. + nid.LocalAddress = "" + return eps.endpoints[nid] +} +``` + + +这里需要解释一下,我们提到过对于任意一个传输层数据流,它应当唯一标识为 `网络层协议-传输层协议-目标IP-目标端口-本地IP-本地端口`的一个六元组,协议栈负责保存这个六元组。 + +这里会讲的比较乱,因为前面没有铺垫,所以我们需要了解tcp的一部分通信过程。 + +``` go + // 首先我们需要绑定一个端口 + if err := ep.Bind(tcpip.FullAddress{NIC: 1, Addr: addr, Port: uint16(localPort)}, nil); err != nil { + log.Fatal("Bind failed: ", err) + } + + // 然后我们开始监听这个tcp端点 + if err := ep.Listen(10); err != nil { + log.Fatal("Listen failed: ", err) + } + + // 使用Accept从listen的tcp端点中可以获取一个新的tcp端点 + conn, err := listener.Accept() + if err != nil { + log.Println(err) + } + log.Println("服务端 建立连接") + + // 我们使用这个新的tcp端点可以与客户端进行通信 + +``` + +在我们Listen之后,listener这个tcp端点就对应着处理所有 ""+0:srcIP+srcPort的情况,也就是一个客户端创建新连接,服务端没有任何此连接的信息(dstIP+dstPort:srcIP+srcPort),所以就有了下面的逻辑: + +``` go +func (e *endpoint) HandlePacket(r *stack.Route, id stack.TransportEndpointID, vv buffer.VectorisedView) { + // ... + if e.segmentQueue.enqueue(s) { + // 对于 端口监听者 listener 而言这里唤醒的是 protocolListenLoop + // 对于普通tcp连接 conn 而言这里唤醒的是 protocolMainLoop + e.newSegmentWaker.Assert() + } +} +``` + +这样我们就解析完了传输层以外的主机网络栈,并将传输层数据分发到了正确的端点。 + +那么传输层协议是如何实现的呢,首先,传输层是建立在上面的主机网络栈之上的,无需关注底层的细节。 + +#### 连接的建立 + +``` go + c flag s +生成ISN1 | | + sync_sent|------sync---->|sync_rcvd + | | + | |生成ISN2 + established|<--sync|ack----| + | | + | | + |------ack----->|established +``` + +一个经典的三次握手,我们使用一个handshake对象对其进行管理 + + +``` go + +// protocolMainLoop 是TCP协议的主循环。它在自己的goroutine中运行,负责握手、发送段和处理收到的段 +func (e *endpoint) protocolMainLoop(handshake bool) *tcpip.Error { + // ... + + // 处理三次握手 + if handshake { + h, err := newHandshake(e, seqnum.Size(e.receiveBufferAvailable())) + logger.GetInstance().Info(logger.HANDSHAKE, func() { + log.Println("TCP STATE SENT") + }) + if err == nil { + // 执行握手 + err = h.execute() + } + // 处理握手有错 + if err != nil { + // ... + return err + } + + // 到这里就表示三次握手已经成功了,那么初始化发送器和接收器 + e.snd = newSender(e, h.iss, h.ackNum-1, h.sndWnd, h.mss, h.sndWndScale) + logger.GetInstance().Info(logger.HANDSHAKE, func() { + log.Println("客户端握手成功 客户端的sender", e.snd) + }) + + e.rcvListMu.Lock() + e.rcv = newReceiver(e, h.ackNum-1, h.rcvWnd, h.effectiveRcvWndScale()) + e.rcvListMu.Unlock() + } + + // ... +} +``` + +``` go + +func (h *handshake) execute() *tcpip.Error { + // 是否需要拿到下一条地址 + if h.ep.route.IsResolutionRequired() { + if err := h.resolveRoute(); err != nil { + return err + } + } + + // Initialize the resend timer. + // 初始化重传定时器 + resendWaker := sleep.Waker{} + // 设置1s超时 + timeOut := time.Duration(time.Second) + rt := time.AfterFunc(timeOut, func() { + resendWaker.Assert() + }) + defer rt.Stop() + + // Set up the wakers. + s := sleep.Sleeper{} + s.AddWaker(&resendWaker, wakerForResend) + s.AddWaker(&h.ep.notificationWaker, wakerForNotification) + s.AddWaker(&h.ep.newSegmentWaker, wakerForNewSegment) + defer s.Done() + + // 开启SCAK .... + + // 表示服务端收到了syn报文 + if h.state == handshakeSynRcvd { + synOpts.TS = h.ep.sendTSOk + synOpts.SACKPermitted = h.ep.sackPermitted && bool(sackEnabled) + } + + // 如果是客户端发送 syn 报文,如果是服务端发送 syn+ack 报文 + sendSynTCP(&h.ep.route, h.ep.id, h.flags, h.iss, h.ackNum, h.rcvWnd, synOpts) + + for h.state != handshakeCompleted { + // 获取事件id + switch index, _ := s.Fetch(true); index { + case wakerForResend: // NOTE tcp超时重传机制 + // 如果是客户端当发送 syn 报文,超过一定的时间未收到回包,触发超时重传 + // 如果是服务端当发送 syn+ack 报文,超过一定的时间未收到 ack 回包,触发超时重传 + // 超时时间变为上次的2倍 如果重传周期超过 1 分钟,返回错误,不再尝试重连 + timeOut *= 2 + if timeOut > 60*time.Second { + return tcpip.ErrTimeout + } + rt.Reset(timeOut) + // 重新发送syn|ack报文 + sendSynTCP(&h.ep.route, h.ep.id, h.flags, h.iss, h.ackNum, h.rcvWnd, synOpts) + case wakerForNotification: + + case wakerForNewSegment: // 受到了回复 + // 对方主机的 TCP 收到 syn+ack 报文段后,还要向 本机 回复确认和上面一样, + // tcp 的控制报文需要消耗一个字节的序列号,所以回复的 ack 序列号为 ISN2+1,发送 ack 报文给本机。 + // 处理握手报文 + if err := h.processSegments(); err != nil { + return err + } + } + } + return nil +} +``` + + +1. 第一次握手 + +首先是创建一个handshake对象,然后随机生成一个32位数字,作为同步序号。 + +``` go + + h := handshake{ + ep: ep, + active: true, // 激活这个管理器 + rcvWnd: rcvWnd, // 初始接收窗口 + rcvWndScale: FindWndScale(rcvWnd), // 接收窗口扩展因子 + } + + // 随机一个iss(对方将收到的序号) 防止黑客搞事 + b := make([]byte, 4) + if _, err := rand.Read(b); err != nil { + panic(err) + } + // 初始化状态为 SynSent + h.state = handshakeSynSent + h.flags = flagSyn + h.ackNum = 0 + h.mss = 0 + h.iss = seqnum.Value(uint32(b[0]) | uint32(b[1])<<8 | uint32(b[2])<<16 | uint32(b[3])<<24) // 随机生成ISN2 + + // 如果是客户端发送 syn 报文,如果是服务端发送 syn+ack 报文 + sendSynTCP(&h.ep.route, h.ep.id, h.flags, h.iss, h.ackNum, h.rcvWnd, synOpts) +``` + + +2. 第二次握手 + +首先,我们开启了一个后台协程,这个协程会轮询acceptedChan + +``` go +// protocolListenLoop 是侦听TCP端点的主循环。它在自己的goroutine中运行,负责处理连接请求 +// 什么叫处理连接请求呢 其实就是 ep.Listen()时在协议栈中注册了一个Laddr+LPort的组合 +// 当有客户端给服务端发送 syn 报文时 由于是新连接 所以服务端并没有相关信息 +// 服务端会把这个报文交给 LAddr:LPort 的ep 去处理 也就是以下Loop +// 在验证通过后 会新建并注册一个 LAddr:LPort+RAddr:RPort的新ep 由它来处理后续的请求 +func (e *endpoint) protocolListenLoop(rcvWnd seqnum.Size) *tcpip.Error { + + // 收尾处理 ... + + e.mu.Lock() + v6only := e.v6only + e.mu.Unlock() + // 创建一个新的tcp连接 + ctx := newListenContext(e.stack, rcvWnd, v6only, e.netProto) + // 初始化事件触发器 并添加事件 + + for { + var index int + switch index, _ = s.Fetch(true); index { // Fetch(true) 阻塞获取 + case wakerForNewSegment: + mayRequeue := true + // 接收和处理tcp报文 ... + default: + panic((nil)) + } + } +} + +``` + + +为了避免直面窗口滑动,我们只看服务端资源不足时,关闭窗口滑动后的连接建立。 + +``` go +// handleListenSegment is called when a listening endpoint receives a segment +// and needs to handle it. +func (e *endpoint) handleListenSegment(ctx *listenContext, s *segment) { + switch s.flags { + case flagSyn: // syn报文处理 + // 分析tcp选项 + opts := parseSynSegmentOptions(s) + if !incSynRcvdCount() { + s.incRef() + go e.handleSynSegment(ctx, s, &opts) + } else { + // 防止半连接池攻击 我们使用cookie + cookie := ctx.createCookie(s.id, s.sequenceNumber, encodeMSS(opts.MSS)) + synOpts := header.TCPSynOptions{ + WS: -1, // 告知对方关闭窗口滑动 + TS: opts.TS, + TSVal: tcpTimeStamp(timeStampOffset()), + TSEcr: opts.TSVal, + } + // 返回 syn+ack 报文 ack+1 表明我们确认了这个syn报文 占用一个字节 + sendSynTCP(&s.route, s.id, flagSyn|flagAck, cookie, s.sequenceNumber+1, ctx.rcvWnd, synOpts) + } + + + } +} +``` + +3. 第三次握手 + +客户端发送回复 ACK + +``` go + case wakerForNewSegment: + // 对方主机的 TCP 收到 syn+ack 报文段后,还要向 本机 回复确认和上面一样, + // tcp 的控制报文需要消耗一个字节的序列号,所以回复的 ack 序列号为 ISN2+1,发送 ack 报文给本机。 + // 处理握手报文 + if err := h.processSegments(); err != nil { + return err + } + } + + // 报文处理 + + if s.flagIsSet(flagAck) { + // 客户端握手完成,发送 ack 报文给服务端 + h.state = handshakeCompleted + // 最后依次 ack 报文丢了也没关系,因为后面一但发送任何数据包都是带ack的 + // 这里要求对端缩减窗口 + // cookie不变 seq+1 表示确认了服务端的 ack|syn 报文 + h.ep.sendRaw(buffer.VectorisedView{}, flagAck, h.iss+1, h.ackNum, h.rcvWnd>>h.effectiveRcvWndScale()) + return nil + } +``` + +服务端处理ACK 新建一个tcp连接 并加入到全连接队列 + +``` go + case flagAck: + // NOTICE 对应处理后台协程过多的情况 三次握手最后一次 ack 报文 + // 当我们的后台写协程不足以处理新的连接的时候 + // 我们认为协议栈目前没有能力处理大规模数据 + // 所以我们限制后面新成立的连接的窗口尺寸 + + // 验证cookie seq-1 和 ack-1 表明 还原两次握手增加的计数 + if data, ok := ctx.isCookieValid(s.id, s.ackNumber-1, + s.sequenceNumber-1); ok && int(data) < len(mssTable) { + // Create newly accepted endpoint and deliver it. + rcvdSynOptions := &header.TCPSynOptions{ + MSS: mssTable[data], + // 关闭我们的窗口滑动 + WS: -1, + } + if s.parsedOptions.TS { + rcvdSynOptions.TS = true + rcvdSynOptions.TSVal = s.parsedOptions.TSVal + rcvdSynOptions.TSEcr = s.parsedOptions.TSEcr + } + + // 三次握手已经完成,新建一个tcp连接 + n, err := ctx.createConnectedEndpoint(s, s.ackNumber-1, + s.sequenceNumber-1, rcvdSynOptions) + if err == nil { + n.tsOffset = 0 + e.deliverAccepted(n) // 分发这个新连接到全连接队列 + } + } +``` + + +``` go + c flag s +生成ISN1 | | + sync_sent|------ isn1 0 ---->|sync_rcvd + | | + | |生成ISN2 + established|<--- isn2 isn+1 ---| + | | + | | + |---isn1+1 isn2+1-->|established +` +``` + + +#### 数据的发送 + +当我们的连接成功建立之后,我们可以直接进行全双工的通信,我们选取一个最简单的场景来演示一下。 + +客户端单方面发送,服务端单方面接收。 + +``` go + go func() { // echo server + + listener := tcpListen(s, proto, addr, localPort) + done <- struct{}{} + for { + conn, err := listener.Accept() + if err != nil { + log.Println(err) + } + log.Println("服务端 建立连接") + + go TestServerEcho(conn) + } + + }() + + go func() { + <-done + port := localPort + conn, err := Dial(s, header.IPv4ProtocolNumber, addr, port) + if err != nil { + log.Fatal(err) + } + + log.Printf("客户端 建立连接\n\n客户端 写入数据\n") + + size := 1 << 10 + for i := 0; i < 3; i++ { + conn.Write(make([]byte, size)) + } + + conn.Close() + }() + +func TestServerEcho(conn *TcpConn) { + for { + buf := make([]byte, 1024) + n, err := conn.Read(buf) + if err != nil { + log.Println(err) + break + } + _ = n + logger.NOTICE("服务端读取数据", string(buf[:])) + } + + conn.ep.Close() +} + +``` + + +连接建立后,客户端写三次,然后关闭连接,服务端循环读取客户端的数据。 + +首先我们需要知道,tcp作为一个内核模块,是位于应用层之下的,应用层无法知悉其细节。它与应用层交流的唯一办法就是调用特定的API。那么我们从API看起。 + +首先是客户端写数据: + +``` go +func (conn *TcpConn) Write(snd []byte) error { + conn.wq.EventRegister(conn.we, waiter.EventOut) + defer conn.wq.EventUnregister(conn.we) + for { + // 调用tcp端点的Write() + n, _, err := conn.ep.Write(tcpip.SlicePayload(snd), tcpip.WriteOptions{To: &conn.raddr}) + if err != nil { + // 如果返回阻塞错误 需要等待 说明底层暂时不支持继续写入 + if err == tcpip.ErrWouldBlock { + <-conn.notifyCh // 不再阻塞 可以接续写 + if int(n) < len(snd) && n > 0 { + snd = snd[n:] + } + continue + } + return fmt.Errorf("%s", err.String()) + } + return nil + } +} + +``` + +可以发现,这个写是会阻塞的,通过一个channle进行控制。 + +那么我们是如何进行写数据的呢,其实我们的数据接收发送分别有两个结构来控制,负责发送的是`sender`。 + +``` go + +type endpoint struct { + // ... + + rcv *receiver // 接收器 + snd *sender // 发送器 +} + + +func (e *endpoint) Write(p tcpip.Payload, + opts tcpip.WriteOptions) (uintptr, <-chan struct{}, *tcpip.Error) { + + // 状态校验 ... + + // tcp流量控制:未被占用发送缓存还剩多少,如果发送缓存已经被用光了,返回 ErrWouldBlock + avail := e.sndBufSize - e.sndBufUsed // sndBufSize 初始化为20m + if avail <= 0 { + e.sndBufMu.Unlock() + return 0, nil, tcpip.ErrWouldBlock + } + + v, perr := p.Get(avail) + if perr != nil { + e.sndBufMu.Unlock() + return 0, nil, perr + } + var err *tcpip.Error + if p.Size() > avail { // 给的数据 缓存不足以容纳 + err = tcpip.ErrWouldBlock + } + l := len(v) + s := newSegmentFromView(&e.route, e.id, v) // 分段 + // 插入发送队列 + e.sndBufUsed += l // 发送队列中段+1 + e.sndBufInQueue += seqnum.Size(l) // 发送队列长度+length + e.sndQueue.PushBack(s) // 将段压入发送队列 + + e.sndBufMu.Unlock() + + // 发送数据,最终会调用 sender sendData 来发送数据 + if e.workMu.TryLock() { + // Do the work inline. + e.handleWrite() // 消费发送队列中的数据 + e.workMu.Unlock() + } else { + // Let the protocol goroutine do the work. + e.sndWaker.Assert() + } + + return uintptr(l), nil, err + +} +``` + +我们来直观地展示一下sender的结构 + +``` go + +数据从左到右进行发送 + + +-------> sndWnd <-------+ + | | +---------------------+-------------+----------+-------------------- +| acked | * * * * * * | # # # # #| unable send +---------------------+-------------+----------+-------------------- + ^ ^ + | | + sndUna sndNxt +*** in flight data +### able send date +``` + + + +``` go +// 从发送队列中取出数据并发送出去 +func (e *endpoint) handleWrite() *tcpip.Error { + e.sndBufMu.Lock() + + // 得到第一个tcp段 注意并不是取出只是查看 + first := e.sndQueue.Front() + if first != nil { + // 向发送链表添加元素 + e.snd.writeList.PushBackList(&e.sndQueue) + // NOTE 更新发送队列下一个发送字节的序号 一次性将链表全部取用 + // 当有新的数据需要发送时会有相逻辑更新这个数值 + e.snd.sndNxtList.UpdateForward(e.sndBufInQueue) + e.sndBufInQueue = 0 + } + + e.sndBufMu.Unlock() + + // Initialize the next segment to write if it's currently nil. + // 初始化snder的发送列表头 + if e.snd.writeNext == nil { + e.snd.writeNext = first + } + + // Push out any new packets. + // 将数据发送出去 + e.snd.sendData() + + return nil +} +``` + + +``` go + +ep.sndQueue: ...->seg3->seg2->seg1 => + +当发送队列中有数据的时候 将这个队列压入写队列 队列的队列 + + writeNext + V +ep.snd.writeList:...->seglist3->seglist2->seglist1 => + ^ ^ ^ ^ ^ ^ + |_s->s_| |_s->s_| |_s->s_| + +我们消费数据的时候找到写队列的队列头,然后遍历它 + +``` + + +``` go + +func (s *sender) sendData() { + limit := s.maxPayloadSize + + // 如果TCP在超过重新传输超时的时间间隔内没有发送数据,TCP应该在开始传输之前将cwnd设置为不超过RW。 + if !s.fr.active && time.Now().Sub(s.lastSendTime) > s.rto { + if s.sndCwnd > InitialCwnd { + s.sndCwnd = InitialCwnd + } + } + + var seg *segment + end := s.sndUna.Add(s.sndWnd) + var dataSent bool + // 遍历发送链表,发送数据 + // tcp拥塞控制:s.outstanding < s.sndCwnd 判断正在发送的数据量不能超过拥塞窗口。 + for seg = s.writeNext; seg != nil && s.outstanding < s.sndCwnd; + seg = seg.Next() { + // 如果seg的flags是0,将flags改为psh|ack + if seg.flags == 0 { + seg.sequenceNumber = s.sndNxt + seg.flags = flagAck | flagPsh + } + + var segEnd seqnum.Value + if seg.data.Size() == 0 { // 数据段没有负载,表示要结束连接 + if s.writeList.Back() != seg { + panic("FIN segments must be the final segment in the write list.") + } + // 发送 fin 报文 + seg.flags = flagAck | flagFin + // fin 报文需要确认,且消耗一个字节序列号 + segEnd = seg.sequenceNumber.Add(1) + } else { // 普通报文 + // We're sending a non-FIN segment. + if seg.flags&flagFin != 0 { + panic("Netstack queues FIN segments without data.") + } + if !seg.sequenceNumber.LessThan(end) { // 超过了发送窗口限制 + break + } + + // tcp流量控制:计算最多一次发送多大数据, + available := int(seg.sequenceNumber.Size(end)) + if available > limit { + available = limit + } + + // 如果seg的payload字节数大于available + // 将seg进行分段,并且插入到该seg的后面 + // ...->[seg3->seg2->seg1]->[seg3->seg2->seg1(2048)] + // ...->[seg3->seg2->seg1]->[seg4->seg3->seg2(1024)->seg1(1024)] + if seg.data.Size() > available { + nSeg := seg.clone() + nSeg.data.TrimFront(available) // NOTE 删掉用过的 + nSeg.sequenceNumber.UpdateForward(seqnum.Size(available)) + s.writeList.InsertAfter(seg, nSeg) + seg.data.CapLength(available) + } + + s.outstanding++ + segEnd = seg.sequenceNumber.Add(seqnum.Size(seg.data.Size())) + } + + if !dataSent { // 没有成功发送任何数据 + dataSent = true + s.ep.disableKeepaliveTimer() + } + + // 发送包 开始计算RTT + s.sendSegment(seg.data, seg.flags, seg.sequenceNumber) + // 发送一个数据段后,更新sndNxt + // 旧的 sndNxt V + // ...->[seg3->seg2->seg1]->[seg3->seg2->seg1] + // 新的 sndNxt^ + if s.sndNxt.LessThan(segEnd) { + s.sndNxt = segEnd + } + } + // Remember the next segment we'll write. + s.writeNext = seg + + // 如果重传定时器没有启动 且 sndUna != sndNxt 启动定时器 + if !s.resendTimer.enabled() && s.sndUna != s.sndNxt { + // NOTE 开启计时器 如果在RTO后没有回信(snd.handleRecvdSegment 中有数据可以处理) 那么将会重发 + // 在 s.resendTimer.init() 中 将会调用 Assert() 唤醒重发函数 retransmitTimerExpired() + s.resendTimer.enable(s.rto) + } + + // NOTE 如果我们的发送窗口被缩到0 我们需要定时去问一下对端消费完了没 + if s.sndUna == s.sndNxt { + s.ep.resetKeepaliveTimer(false) + } +} +``` + + +``` go +c seq ack s +| | +|-----isn1+101 isn2+1-->| 发送100个字节 请确认 没有收到你的数据 +| | +| | +|<--- isn2+1 isn+101 ---| 收到100个字节 确认了 不发送给你数据 +| | +| | +|-----isn1+201 isn2+1-->| 发送100个字节 请确认 没有受到你的数据 +| | +| | +|<---isn2+101 isn1+201--| 收到100个字节 确认了 发送给你100个字节 +| | +| | +|---isn1+201 isn2+101-->| 收到100个字节 确认了 不发送数据给你 + +``` + + +处理对应的ACK报文 + +``` go + +// 收到段时调用 handleRcvdSegment 它负责更新与发送相关的状态 +func (s *sender) handleRcvdSegment(seg *segment) { + // 如果rtt测量seq小于ack num,更新rto + if !s.ep.sendTSOk && s.rttMeasureSeqNum.LessThan(seg.ackNumber) { + s.updateRTO(time.Now().Sub(s.rttMeasureTime)) + s.rttMeasureSeqNum = s.sndNxt + } + + s.ep.updateRecentTimestamp(seg.parsedOptions.TSVal, s.maxSentAck, seg.sequenceNumber) + + // tcp的拥塞控制:检查是否有重复的ack,是否进入快速重传和快速恢复状态 + rtx := s.checkDuplicateAck(seg) + + // 存放当前窗口大小。 + s.sndWnd = seg.window + // 获取确认号 + ack := seg.ackNumber + // 如果ack在最小未确认的seq和segNext之间 + if (ack - 1).InRange(s.sndUna, s.sndNxt) { + // 收到了东西 就暂停计时 + s.resendTimer.disable() + + // NOTE 一个RTT 结束 + if s.ep.sendTSOk && seg.parsedOptions.TSEcr != 0 { + // TSVal/Ecr values sent by Netstack are at a millisecond + // granularity. + elapsed := time.Duration(s.ep.timestamp()-seg.parsedOptions.TSEcr) * time.Millisecond + s.updateRTO(elapsed) + } + // 获取这次确认的字节数,即 ack - snaUna + acked := s.sndUna.Size(ack) + // 更新下一个未确认的序列号 + s.sndUna = ack + + ackLeft := acked + originalOutstanding := s.outstanding + // 从发送链表中删除已经确认的数据,发送窗口的滑动。 + for ackLeft > 0 { // 有成功确认的数据 丢弃它们 有剩余数据的话继续发送(根据拥塞策略控制) + seg := s.writeList.Front() + datalen := seg.logicalLen() + + if datalen > ackLeft { + seg.data.TrimFront(int(ackLeft)) + break + } + + if s.writeNext == seg { + s.writeNext = seg.Next() + } + // 从发送链表中删除已确认的tcp段。 + s.writeList.Remove(seg) + // 因为有一个tcp段确认了,所以 outstanding 减1 + s.outstanding-- + seg.decRef() + ackLeft -= datalen + } + // 当收到ack确认时,需要更新发送缓冲占用 + s.ep.updateSndBufferUsage(int(acked)) + + // tcp拥塞控制:如果没有进入快速恢复状态,那么根据确认的数据包的数量更新拥塞窗口。 + if !s.fr.active { + // 调用相应拥塞控制算法的 Update + s.cc.Update(originalOutstanding - s.outstanding) + } + + // 如果发生超时重传时,s.outstanding可能会降到零以下, + // 重置为零但后来得到一个覆盖先前发送数据的确认。 + if s.outstanding < 0 { + s.outstanding = 0 + } + } + + // tcp拥塞控制 快速重传 + if rtx { + s.resendSegment() + } + + //if s.ep.id.LocalPort != 9999 { + // log.Println(s) + //} + + // 现在某些待处理数据已被确认,或者窗口打开,或者由于快速恢复期间出现重复的ack而导致拥塞窗口膨胀, + // 因此发送更多数据。如果需要,这也将重新启用重传计时器。 + s.sendData() +} +``` + + +#### 连接的断开 + +``` go + c flag s + | | + 1 |------fin----->| + | | + |<-----ack------| 2 + | | + | | + |<-----ack------| + | | + |-----ack------>| + | | + | | + |<------fin-----| 3 + | | + 4 |------ack----->| + | | + | | + +``` + + + + + diff --git a/tcpip/stack/nic.go b/tcpip/stack/nic.go index fe69e5d..fc2ddb6 100644 --- a/tcpip/stack/nic.go +++ b/tcpip/stack/nic.go @@ -488,9 +488,11 @@ func (n *NIC) DeliverTransportPacket(r *Route, protocol tcpip.TransportProtocolN }) id := TransportEndpointID{dstPort, r.LocalAddress, srcPort, r.RemoteAddress} // 调用分流器,根据传输层协议和传输层id分发数据报文 + // 现在本网卡中尝试分发 if n.demux.deliverPacket(r, protocol, vv, id) { return } + // 在整个协议栈尝试分发 if n.stack.demux.deliverPacket(r, protocol, vv, id) { return } diff --git a/tcpip/stack/transport_demuxer.go b/tcpip/stack/transport_demuxer.go index b290e33..889ba95 100644 --- a/tcpip/stack/transport_demuxer.go +++ b/tcpip/stack/transport_demuxer.go @@ -126,6 +126,8 @@ func (d *transportDemuxer) findEndpointLocked(eps *transportEndpoints, } // Try to find a match with the id minus the remote part. + // listener 的情况 本地没有这个 dstIP+dstPort:srcIP+srcPort 的连接交由 + // ""+0:srcIP+srcPort的Listener来处理 nid.LocalAddress = id.LocalAddress nid.RemoteAddress = "" nid.RemotePort = 0 diff --git a/tcpip/transport/tcp/accept.go b/tcpip/transport/tcp/accept.go index f3963af..89dc006 100644 --- a/tcpip/transport/tcp/accept.go +++ b/tcpip/transport/tcp/accept.go @@ -280,7 +280,7 @@ func (e *endpoint) handleSynSegment(ctx *listenContext, s *segment, opts *header return } // 到这里,三次握手已经完成,那么分发一个新的连接 - e.deliverAccepted(n) + e.deliverAccepted(n) // 分发这个新连接到全连接队列 } // handleListenSegment is called when a listening endpoint receives a segment @@ -290,28 +290,51 @@ func (e *endpoint) handleListenSegment(ctx *listenContext, s *segment) { case flagSyn: // syn报文处理 // 分析tcp选项 opts := parseSynSegmentOptions(s) - if incSynRcvdCount() { + if !incSynRcvdCount() { s.incRef() go e.handleSynSegment(ctx, s, &opts) } else { + // 防止半连接池攻击 我们使用cookie cookie := ctx.createCookie(s.id, s.sequenceNumber, encodeMSS(opts.MSS)) - // Send SYN with window scaling because we currently - // dont't encode this information in the cookie. - // - // Enable Timestamp option if the original syn did have - // the timestamp option specified. synOpts := header.TCPSynOptions{ - WS: -1, + WS: -1, // 告知对方关闭窗口滑动 TS: opts.TS, TSVal: tcpTimeStamp(timeStampOffset()), TSEcr: opts.TSVal, } - // 返回 syn+ack 报文 + // 返回 syn+ack 报文 ack+1 表明我们确认了这个syn报文 占用一个字节 sendSynTCP(&s.route, s.id, flagSyn|flagAck, cookie, s.sequenceNumber+1, ctx.rcvWnd, synOpts) } - // 返回一个syn+ack报文 - case flagFin: // fin报文处理 - // 三次握手最后一次 ack 报文 + + case flagAck: + // NOTICE 对应处理后台协程过多的情况 三次握手最后一次 ack 报文 + // 当我们的后台写协程不足以处理新的连接的时候 + // 我们认为协议栈目前没有能力处理大规模数据 + // 所以我们限制后面新成立的连接的窗口尺寸 + + // 验证cookie seq-1 和 ack-1 表明 还原两次握手增加的计数 + if data, ok := ctx.isCookieValid(s.id, s.ackNumber-1, + s.sequenceNumber-1); ok && int(data) < len(mssTable) { + // Create newly accepted endpoint and deliver it. + rcvdSynOptions := &header.TCPSynOptions{ + MSS: mssTable[data], + // 关闭我们的窗口滑动 + WS: -1, + } + if s.parsedOptions.TS { + rcvdSynOptions.TS = true + rcvdSynOptions.TSVal = s.parsedOptions.TSVal + rcvdSynOptions.TSEcr = s.parsedOptions.TSEcr + } + + // 三次握手已经完成,新建一个tcp连接 + n, err := ctx.createConnectedEndpoint(s, s.ackNumber-1, + s.sequenceNumber-1, rcvdSynOptions) + if err == nil { + n.tsOffset = 0 + e.deliverAccepted(n) // 分发这个新连接到全连接队列 + } + } } } diff --git a/tcpip/transport/tcp/connect.go b/tcpip/transport/tcp/connect.go index be21b71..49ed65f 100644 --- a/tcpip/transport/tcp/connect.go +++ b/tcpip/transport/tcp/connect.go @@ -388,15 +388,15 @@ func (h *handshake) processSegments() *tcpip.Error { // execute executes the TCP 3-way handshake. // 执行tcp 3次握手,客户端和服务端都是调用该函数来实现三次握手 /* - c flag s -生成ISN1 | |生成ISN2 - sync_sent|------sync---->|sync_rcvd - | | - | | - established|<--sync|ack----| - | | - | | - |------ack----->|established + c flag s + 生成ISN1 | | 生成ISN2 + sync_sent |------sync----->| sync_rcvd + | | + | | + established |<---sync|ack----| + | | + | | + |------ack------>| established */ func (h *handshake) execute() *tcpip.Error { // 是否需要拿到下一条地址 @@ -663,7 +663,23 @@ func (e *endpoint) sendRaw(data buffer.VectorisedView, flags byte, seq, ack seqn return err } + + // 从发送队列中取出数据并发送出去 +/* +ep.sndQueue: ...->seg3->seg2->seg1 => + +当发送队列中有数据的时候 将这个队列压入写队列 队列的队列 + + writeNext + V +ep.snd.writeList:...->seglist3->seglist2->seglist1 => + ^ ^ ^ ^ ^ ^ + |_s->s_| |_s->s_| |_s->s_| + +我们消费数据的时候找到写队列的队列头,然后遍历它 + + */ func (e *endpoint) handleWrite() *tcpip.Error { e.sndBufMu.Lock() diff --git a/tcpip/transport/tcp/endpoint.go b/tcpip/transport/tcp/endpoint.go index 144f85f..d3ef1ec 100644 --- a/tcpip/transport/tcp/endpoint.go +++ b/tcpip/transport/tcp/endpoint.go @@ -425,9 +425,9 @@ func (e *endpoint) Write(p tcpip.Payload, opts tcpip.WriteOptions) (uintptr, <-c l := len(v) s := newSegmentFromView(&e.route, e.id, v) // 分段 // 插入发送队列 - e.sndBufUsed += l - e.sndBufInQueue += seqnum.Size(l) - e.sndQueue.PushBack(s) + e.sndBufUsed += l // 发送队列中段+1 + e.sndBufInQueue += seqnum.Size(l) // 发送队列长度+length + e.sndQueue.PushBack(s) // 将段压入发送队列 e.sndBufMu.Unlock() diff --git a/tcpip/transport/tcp/snd.go b/tcpip/transport/tcp/snd.go index 5272442..f672962 100644 --- a/tcpip/transport/tcp/snd.go +++ b/tcpip/transport/tcp/snd.go @@ -525,7 +525,7 @@ func (s *sender) retransmitTimerExpired() bool { // 发送数据段,最终调用 sendSegment 来发送 func (s *sender) sendData() { - limit := s.maxPayloadSize //最开始是65483 + limit := s.maxPayloadSize // 如果TCP在超过重新传输超时的时间间隔内没有发送数据,TCP应该在开始传输之前将cwnd设置为不超过RW。 if !s.fr.active && time.Now().Sub(s.lastSendTime) > s.rto { @@ -539,7 +539,9 @@ func (s *sender) sendData() { var dataSent bool // 遍历发送链表,发送数据 // tcp拥塞控制:s.outstanding < s.sndCwnd 判断正在发送的数据量不能超过拥塞窗口。 - for seg = s.writeNext; seg != nil && s.outstanding < s.sndCwnd; seg = seg.Next() { // 首次发送不会超过两个包 + for seg = s.writeNext; + seg != nil && s.outstanding < s.sndCwnd; + seg = seg.Next() { // 如果seg的flags是0,将flags改为psh|ack if seg.flags == 0 { seg.sequenceNumber = s.sndNxt @@ -560,8 +562,7 @@ func (s *sender) sendData() { if seg.flags&flagFin != 0 { panic("Netstack queues FIN segments without data.") } - if !seg.sequenceNumber.LessThan(end) { - log.Println("暂停数据发送 等待确认标号", seg.sequenceNumber, " 已收到 。。。。") + if !seg.sequenceNumber.LessThan(end) { // 超过了发送窗口限制 break } @@ -573,6 +574,8 @@ func (s *sender) sendData() { // 如果seg的payload字节数大于available // 将seg进行分段,并且插入到该seg的后面 + // ...->[seg3->seg2->seg1]->[seg3->seg2->seg1(2048)] + // ...->[seg3->seg2->seg1]->[seg4->seg3->seg2(1024)->seg1(1024)] if seg.data.Size() > available { nSeg := seg.clone() nSeg.data.TrimFront(available) // NOTE 删掉用过的 @@ -599,8 +602,10 @@ func (s *sender) sendData() { // 发送包 开始计算RTT s.sendSegment(seg.data, seg.flags, seg.sequenceNumber) // 发送一个数据段后,更新sndNxt + // 旧的 sndNxt V + // ...->[seg3->seg2->seg1]->[seg3->seg2->seg1] + // 新的 sndNxt^ if s.sndNxt.LessThan(segEnd) { - log.Println(s.ep.id.LocalPort, " 更新sndNxt", s.sndNxt, " 为 ", segEnd, "下一次发送的数据头为", segEnd) s.sndNxt = segEnd } }