diff --git a/cmd/netstack/main.go b/cmd/netstack/main.go index 4764c84..ff61c59 100644 --- a/cmd/netstack/main.go +++ b/cmd/netstack/main.go @@ -136,20 +136,24 @@ func main() { go func() { // echo server listener := tcpListen(s, proto, addr, localPort) done <- struct{}{} - conn, err := listener.Accept() - if err != nil { - log.Println(err) - } - for { - buf := make([]byte, 1024) - if _, err := conn.Read(buf); err != nil { + conn, err := listener.Accept() + if err != nil { log.Println(err) - break } - fmt.Println(string(buf)) - // conn.Write([]byte("Server echo")) - //} + log.Println("服务端 建立连接") + + for { + buf := make([]byte, 1024) + if _, err := conn.Read(buf); err != nil { + log.Println(err) + break + } + fmt.Println(string(buf)) + // conn.Write([]byte("Server echo")) + //} + } + } os.Exit(1) @@ -159,10 +163,12 @@ func main() { <-done go func() { port := localPort - _, err := Dial(s, header.IPv4ProtocolNumber, addr, port) + conn, err := Dial(s, header.IPv4ProtocolNumber, addr, port) if err != nil { log.Fatal(err) } + log.Println("客户端 建立连接") + conn.Close() }() close(done) @@ -180,6 +186,7 @@ func Dial(s *stack.Stack, proto tcpip.NetworkProtocolNumber, addr tcpip.Address, } var wq waiter.Queue waitEntry, notifyCh := waiter.NewChannelEntry(nil) + wq.EventRegister(&waitEntry, waiter.EventOut) // 新建一个tcp端 ep, err := s.NewEndpoint(tcp.ProtocolNumber, proto, &wq) if err != nil { @@ -189,6 +196,7 @@ func Dial(s *stack.Stack, proto tcpip.NetworkProtocolNumber, addr tcpip.Address, if err != nil { if err == tcpip.ErrConnectStarted { <-notifyCh + log.Println("???") } else { return nil, fmt.Errorf("%s", err.String()) } @@ -245,6 +253,11 @@ func (conn *TcpConn) Write(snd []byte) error { } } +func (conn *TcpConn) Close() { + log.Println("Close") + conn.ep.Close() +} + // Listener tcp连接监听器 type Listener struct { raddr tcpip.FullAddress diff --git a/logger/logger.go b/logger/logger.go index b23e9ad..a795673 100644 --- a/logger/logger.go +++ b/logger/logger.go @@ -1,6 +1,7 @@ package logger import ( + "log" "sync" ) @@ -50,3 +51,19 @@ func (l *logger) Info(mask uint8, f func()) { f() } } + +func (l *logger) info(f func()) { + f() +} + +func TODO(msg string) { + GetInstance().info(func() { + log.Println("TODO: " + msg) + }) +} + +func FIXME(msg string) { + GetInstance().info(func() { + log.Fatal("FIXME: " + msg) + }) +} diff --git a/sleep/sleep_unsafe.go b/sleep/sleep_unsafe.go index 5428d4f..e44bb85 100644 --- a/sleep/sleep_unsafe.go +++ b/sleep/sleep_unsafe.go @@ -234,7 +234,6 @@ func (s *Sleeper) Fetch(block bool) (id int, ok bool) { old := (*Sleeper)(atomic.SwapPointer(&w.s, usleeper(s))) //log.Println("Sleeper", unsafe.Pointer(s), "old", unsafe.Pointer(old), "&assertSleeper", unsafe.Pointer(&assertedSleeper), "w.id", w.id) if old == &assertedSleeper { - //log.Println("成功返回 没有阻塞") return w.id, true } } diff --git a/tcpip/seqnum/seqnum.go b/tcpip/seqnum/seqnum.go index 82494fb..17b1521 100644 --- a/tcpip/seqnum/seqnum.go +++ b/tcpip/seqnum/seqnum.go @@ -26,7 +26,7 @@ func (v Value) InRange(a, b Value) bool { } // InWindows check v in [first, first+size) -func (v Value) InWindows(first Value, size Size) bool { +func (v Value) InWindow(first Value, size Size) bool { return v.InRange(first, first.Add(size)) } diff --git a/tcpip/transport/tcp/README.md b/tcpip/transport/tcp/README.md index a844c48..4146a5f 100644 --- a/tcpip/transport/tcp/README.md +++ b/tcpip/transport/tcp/README.md @@ -101,4 +101,18 @@ TCP 最初只规定了一种选项,即最大报文段长度 MSS(Maximum Segm 主机 A 发给主机 B 的 ACK 中途被丢,没有到达主机 B 主机 A 发完 ACK,单方面认为 TCP 为 Established 状态,而 B 显然认为 TCP 为 Active 状态: a. 如果此时双方都没有数据发送,主机 B 会周期性超时重传,直到收到 A 的确认,收到之后主机 B 的 TCP 连接也为 Established 状态,双向可以发包。 -b. 如果此时 A 有数据发送,主机 B 收到主机 A 的 Data + ACK,自然会切换为 established 状态,并接受主机 A 的 Data。 \ No newline at end of file +b. 如果此时 A 有数据发送,主机 B 收到主机 A 的 Data + ACK,自然会切换为 established 状态,并接受主机 A 的 Data。 + +## TCP连接的释放 + +![img](https://doc.shiyanlou.com/document-uid949121labid10418timestamp1555574060171.png) + +1)数据传输结束后,主机 A 的应用进程调用 Close 函数,先向其 TCP 发出释放连接请求,不再发送数据。TCP 通知对方要释放从主机 A 到主机 B 的连接,将发往主机 B 的 TCP 报文段首部的终止比特 FIN 置为 1,序号 seq1 等于已传送数据的最后一个字节的序号加 1。 + +2)主机 B 的 TCP 收到释放连接通知后发出确认,其序号为 seq1+1,同时通知应用进程,这样主机 A 到主机 B 的连接就释放了,连接处于半关闭状态。主机 B 不在接受主机 A 发来的数据;但主机 B 还向 A 发送数据,主机 A 若正确接收数据仍需要发送确认。 + +3)在主机 B 向主机 A 的数据发送结束后,其应用进程应该主动调用 Close 函数,释放 TCP 连接。主机 B 发出的连接释放报文段必须将终止比特置为 1,并使其序号 seq2 等于前面已经传送过的数据的最后一个字节的序号加 1,还必须回复 ACK=seq1+1。 + +4)主机 A 对主机 B 的连接释放报文段发出确认,将 ACK 置为 1,ACK=seq2+1, seq=seq1+1。这样才把从 B 到 A 的反方向连接释放掉,主机 A 的 TCP 再向其应用进程报告,整个连接已经全部释放。 + +还有一个要注意的是,fin 包和数据包一样,如果丢失了,会进行重传,实际上可能是是 fin 丢失或 ack 丢失。重传的周期由 rto 决定。 \ No newline at end of file diff --git a/tcpip/transport/tcp/connect.go b/tcpip/transport/tcp/connect.go index ce9c6de..f677985 100644 --- a/tcpip/transport/tcp/connect.go +++ b/tcpip/transport/tcp/connect.go @@ -11,8 +11,10 @@ import ( "netstack/tcpip/header" "netstack/tcpip/seqnum" "netstack/tcpip/stack" + "netstack/waiter" "sync" "time" + "unsafe" ) const maxSegmentsPerWake = 100 @@ -108,6 +110,16 @@ func (h *handshake) resetState() *tcpip.Error { return nil } +// effectiveRcvWndScale returns the effective receive window scale to be used. +// If the peer doesn't support window scaling, the effective rcv wnd scale is +// zero; otherwise it's the value calculated based on the initial rcv wnd. +func (h *handshake) effectiveRcvWndScale() uint8 { + if h.sndWndScale < 0 { + return 0 + } + return uint8(h.rcvWndScale) +} + // resetToSynRcvd resets the state of the handshake object to the SYN-RCVD // state. func (h *handshake) resetToSynRcvd(iss seqnum.Value, irs seqnum.Value, opts *header.TCPSynOptions) { @@ -180,7 +192,74 @@ func (h *handshake) checkAck(s *segment) bool { // synSentState 是客户端或者服务端接收到第一个握手报文的处理 // 正常情况下,如果是客户端,此时应该收到 syn+ack 报文,处理后发送 ack 报文给服务端。 // 如果是服务端,此时接收到syn报文,那么应该回复 syn+ack 报文给客户端,并设置状态为 handshakeSynRcvd。 +// NOTE 为什么这里服务端又一次实现发送 syn|ack 是为了处理普通tcp连接收到 syn 报文的异常情况 +// 比如第一次监听者收到syn并发送syn|ack后并没有收到返回 客户端 func (h *handshake) synSentState(s *segment) *tcpip.Error { + log.Println("客户端收到了 syn|ack segment") + // RFC 793, page 37, states that in the SYN-SENT state, a reset is + // acceptable if the ack field acknowledges the SYN. + if s.flagIsSet(flagRst) { + if s.flagIsSet(flagAck) && s.ackNumber == h.iss+1 { + return tcpip.ErrConnectionRefused + } + return nil + } + + if !h.checkAck(s) { + return nil + } + + // We are in the SYN-SENT state. We only care about segments that have + // the SYN flag. + if !s.flagIsSet(flagSyn) { + return nil + } + + // Parse the SYN options. + rcvSynOpts := parseSynSegmentOptions(s) + + // Remember if the Timestamp option was negotiated. + h.ep.maybeEnableTimestamp(&rcvSynOpts) + + // Remember if the SACKPermitted option was negotiated. + h.ep.maybeEnableSACKPermitted(&rcvSynOpts) + + // Remember the sequence we'll ack from now on. + h.ackNum = s.sequenceNumber + 1 + h.flags |= flagAck + h.mss = rcvSynOpts.MSS + h.sndWndScale = rcvSynOpts.WS + + // If this is a SYN ACK response, we only need to acknowledge the SYN + // and the handshake is completed. + // 客户端接收到了 syn+ack 报文 + if s.flagIsSet(flagAck) { + // 客户端握手完成,发送 ack 报文给服务端 + h.state = handshakeCompleted + // 最后依次 ack 报文丢了也没关系,因为后面一但发送任何数据包都是带ack的 + h.ep.sendRaw(buffer.VectorisedView{}, flagAck, h.iss+1, h.ackNum, h.rcvWnd>>h.effectiveRcvWndScale()) + return nil + } + + // A SYN segment was received, but no ACK in it. We acknowledge the SYN + // but resend our own SYN and wait for it to be acknowledged in the + // SYN-RCVD state. + // 服务端收到了 syn 报文,应该回复客户端 syn+ack 报文,且设置状态为 handshakeSynRcvd + h.state = handshakeSynRcvd + synOpts := header.TCPSynOptions{ + WS: h.rcvWndScale, + TS: rcvSynOpts.TS, + TSVal: h.ep.timestamp(), + TSEcr: h.ep.recentTS, + + // We only send SACKPermitted if the other side indicated it + // permits SACK. This is not explicitly defined in the RFC but + // this is the behaviour implemented by Linux. + SACKPermitted: rcvSynOpts.SACKPermitted, + } + // 发送 syn+ack 报文,如果该报文在链路中丢了,没有关系,客户端会重新发送 syn 报文 + sendSynTCP(&s.route, h.ep.id, h.flags, h.iss, h.ackNum, h.rcvWnd, synOpts) + return nil } @@ -190,6 +269,11 @@ func (h *handshake) synSentState(s *segment) *tcpip.Error { func (h *handshake) synRcvdState(s *segment) *tcpip.Error { if s.flagIsSet(flagRst) { // TODO 需要根据窗口返回 等理解了窗口后再写 + // RFC 793, page 37, states that in the SYN-RCVD state, a reset + // is acceptable if the sequence number is in the window. + if s.sequenceNumber.InWindow(h.ackNum, h.rcvWnd) { + return tcpip.ErrConnectionRefused + } return nil } // 校验ack报文 @@ -506,6 +590,7 @@ func (e *endpoint) sendRaw(data buffer.VectorisedView, flags byte, seq, ack seqn // sackBlocks = e.sack.Blocks[:e.sack.NumBlocks] //} options := e.makeOptions(sackBlocks) + log.Println(unsafe.Pointer(e), "怎么又调用了一次 handleWrite!!!!!!!!!!!!!!!!!!!!!!!!!!!") err := sendTCP(&e.route, e.id, data, e.route.DefaultTTL(), flags, seq, ack, rcvWnd, options) putOptions(options) return err @@ -513,16 +598,50 @@ func (e *endpoint) sendRaw(data buffer.VectorisedView, flags byte, seq, ack seqn // 从发送队列中取出数据并发送出去 func (e *endpoint) handleWrite() *tcpip.Error { + e.sndBufMu.Lock() + + // 得到第一个tcp段 + first := e.sndQueue.Front() + if first != nil { + // 向发送链表添加元素 + e.snd.writeList.PushBackList(&e.sndQueue) + // NOTE 更新发送队列下一个发送字节的序号 一次性将链表全部取用 + // 当有新的数据需要发送时会有相关逻辑更新这个数值 + e.snd.sndNxtList.UpdateForward(e.sndBufInQueue) + e.sndBufInQueue = 0 + } + + e.sndBufMu.Unlock() + + // Initialize the next segment to write if it's currently nil. + // 初始化snder的发送列表头 + if e.snd.writeNext == nil { + e.snd.writeNext = first + } + + // Push out any new packets. + // 将数据发送出去 + e.snd.sendData() + return nil } // 关闭连接的处理,最终会调用 sendData 来发送 fin 包 func (e *endpoint) handleClose() *tcpip.Error { + + // Drain the send queue. + e.handleWrite() + + // Mark send side as closed. + // 标记发送器关闭 + e.snd.closed = true + return nil } // handleSegments 从队列中取出 tcp 段数据,然后处理它们。 func (e *endpoint) handleSegments() *tcpip.Error { + log.Println(unsafe.Pointer(e), "处理报文") checkRequeue := true for i := 0; i < maxSegmentsPerWake; i++ { s := e.segmentQueue.dequeue() @@ -561,6 +680,16 @@ func (e *endpoint) handleSegments() *tcpip.Error { func (e *endpoint) protocolMainLoop(handshake bool) *tcpip.Error { // 收尾工作 + // 收尾的一些工作 + epilogue := func() { + // e.mu is expected to be hold upon entering this section. + + // TODO 需要添加 + e.mu.Unlock() + + // When the protocol loop exits we should wake up our waiters. + e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut) + } // 处理三次握手 if handshake { @@ -572,8 +701,36 @@ func (e *endpoint) protocolMainLoop(handshake bool) *tcpip.Error { // 执行握手 err = h.execute() } + // 处理握手有错 + if err != nil { + e.lastErrorMu.Lock() + e.lastError = err + e.lastErrorMu.Unlock() + + e.mu.Lock() + e.state = stateError + e.hardError = err + // Lock released below. + epilogue() + + return err + } + + // 到这里就表示三次握手已经成功了,那么初始化发送器和接收器 + e.snd = newSender(e, h.iss, h.ackNum-1, h.sndWnd, h.mss, h.sndWndScale) + + e.rcvListMu.Lock() + e.rcv = newReceiver(e, h.ackNum-1, h.rcvWnd, h.effectiveRcvWndScale()) + e.rcvListMu.Unlock() } + e.mu.Lock() + e.state = stateConnected + // TODO drained + e.mu.Unlock() + // 提醒 Dial 函数 连接已经成功建立 + e.waiterQueue.Notify(waiter.EventOut) + // Set up the functions that will be called when the main protocol loop // wakes up. // 触发器的事件,这些函数很重要 @@ -601,11 +758,26 @@ func (e *endpoint) protocolMainLoop(handshake bool) *tcpip.Error { s.AddWaker(funcs[i].w, i) } + // 恢复的端点需要以下断言和通知。新创建的新端点具有空状态,不应调用任何端点。 + e.segmentQueue.mu.Lock() + if !e.segmentQueue.list.Empty() { + e.newSegmentWaker.Assert() + } + e.segmentQueue.mu.Unlock() + + e.rcvListMu.Lock() + if !e.rcvList.Empty() { + e.waiterQueue.Notify(waiter.EventIn) + } + e.rcvListMu.Unlock() + + // TODO 需要添加 workerCleanup + // 主循环,处理tcp报文 // 要使这个主循环结束,也就是tcp连接完全关闭,得同时满足三个条件: // 1,接收器关闭了 2,发送器关闭了 3,下一个未确认的序列号等于添加到发送列表的下一个段的序列号 //for !e.rcv.closed || !e.snd.closed || e.snd.sndUna != e.snd.sndNxtList { - for { + for !e.rcv.closed /*TODO 其他条件*/ { e.workMu.Unlock() // s.Fetch 会返回事件的index,比如 v=0 的话, // funcs[v].f()就是调用 e.handleWrite @@ -616,9 +788,19 @@ func (e *endpoint) protocolMainLoop(handshake bool) *tcpip.Error { e.mu.Lock() //e.resetConnectionLocked(err) // Lock released below. - //epilogue() + epilogue() log.Println(err) return nil } } + + // Mark endpoint as closed. + e.mu.Lock() + if e.state != stateError { + e.state = stateClosed + } + // Lock released below. + epilogue() + + return nil } diff --git a/tcpip/transport/tcp/endpoint.go b/tcpip/transport/tcp/endpoint.go index 09ac301..925bb49 100644 --- a/tcpip/transport/tcp/endpoint.go +++ b/tcpip/transport/tcp/endpoint.go @@ -41,6 +41,11 @@ type endpoint struct { netProto tcpip.NetworkProtocolNumber // 网络协议号 ipv4 ipv6 waiterQueue *waiter.Queue // 事件驱动机制 + // lastError represents the last error that the endpoint reported; + // access to it is protected by the following mutex. + lastErrorMu sync.Mutex + lastError *tcpip.Error + // TODO 需要添加 // rcvListMu can be taken after the endpoint mu below. @@ -88,6 +93,9 @@ type endpoint struct { // TSVal field in the timestamp option. tsOffset uint32 + // shutdownFlags represent the current shutdown state of the endpoint. + shutdownFlags tcpip.ShutdownFlags + // sackPermitted is set to true if the peer sends the TCPSACKPermitted // option in the SYN/SYN-ACK. sackPermitted bool @@ -146,7 +154,25 @@ func newEndpoint(stack *stack.Stack, netProto tcpip.NetworkProtocolNumber, waite } func (e *endpoint) Close() { - log.Println("TODO 在写了 在写了") + e.Shutdown(tcpip.ShutdownWrite | tcpip.ShutdownRead) + e.mu.Lock() + + // We always release ports inline so that they are immediately available + // for reuse after Close() is called. If also registered, it means this + // is a listening socket, so we must unregister as well otherwise the + // next user would fail in Listen() when trying to register. + if e.isPortReserved { + e.stack.ReleasePort(e.effectiveNetProtos, ProtocolNumber, e.id.LocalAddress, e.id.LocalPort) + e.isPortReserved = false + + if e.isRegistered { + e.stack.UnregisterTransportEndpoint(e.boundNICID, e.effectiveNetProtos, ProtocolNumber, e.id) + e.isRegistered = false + } + } + + logger.TODO("添加清理资源的逻辑") + e.mu.Unlock() } // Read 从tcp的接收队列中读取数据 @@ -407,6 +433,46 @@ func (e *endpoint) connect(addr tcpip.FullAddress, handshake bool, run bool) (er } func (e *endpoint) Shutdown(flags tcpip.ShutdownFlags) *tcpip.Error { + e.mu.Lock() + defer e.mu.Unlock() + e.shutdownFlags |= flags + + switch e.state { + case stateConnected: // 客户端关闭 + // 不能直接关闭读数据包,因为关闭连接的时候四次挥手还需要读取报文。 + if (e.shutdownFlags & tcpip.ShutdownWrite) != 0 { + e.rcvListMu.Lock() + rcvBufUsed := e.rcvBufUsed + e.rcvListMu.Unlock() + if rcvBufUsed > 0 { + // 如果接收队列中还有数据 通知对端RESET + logger.TODO("通知对端RESET") + return nil + } + } + + e.sndBufMu.Lock() + if e.sndClosed { + // Already closed. + e.sndBufMu.Unlock() + break + } + + // Queue fin segment. + s := newSegmentFromView(&e.route, e.id, nil) + e.sndQueue.PushBack(s) + e.sndBufInQueue++ // 仅仅占用一个字节位置 + // Mark endpoint as closed. + e.sndClosed = true + e.sndBufMu.Unlock() + + // 触发调用 handleClose + e.sndCloseWaker.Assert() + case stateListen: // 服务端关闭 + logger.FIXME("添加服务端关闭逻辑") + default: + return tcpip.ErrNotConnected + } return nil } @@ -640,11 +706,11 @@ func (e *endpoint) HandlePacket(r *stack.Route, id stack.TransportEndpointID, vv } // Send packet to worker goroutine. if e.segmentQueue.enqueue(s) { - var prifix string = "tcp连接" + var prefix string = "tcp连接" if _, err := e.GetRemoteAddress(); err != nil { - prifix = "监听者" + prefix = "监听者" } - log.Printf(prifix+"收到 tcp [%s] 报文片段 from %s, seq: %d, ack: |%d|", + log.Printf(prefix+"收到 tcp [%s] 报文片段 from %s, seq: %d, ack: |%d|", flagString(s.flags), fmt.Sprintf("%s:%d", s.id.RemoteAddress, s.id.RemotePort), s.sequenceNumber, s.ackNumber) diff --git a/tcpip/transport/tcp/rcv.go b/tcpip/transport/tcp/rcv.go index 3c3ab4f..8263cf6 100644 --- a/tcpip/transport/tcp/rcv.go +++ b/tcpip/transport/tcp/rcv.go @@ -52,11 +52,12 @@ func (r *receiver) getSendParams() (rcvNxt seqnum.Value, rcvWnd seqnum.Size) { return r.rcvNxt, r.rcvNxt.Size(r.rcvAcc) >> r.rcvWndScale } +// FIXME 重大嫌疑 客户端为什么又发送了一个 fin|ack ?????????????????????????????????? func (r *receiver) consumeSegment(s *segment, segSeq seqnum.Value, segLen seqnum.Size) bool { if segLen > 0 { // 我们期望接收到的序列号范围应该是 seqStart <= rcvNxt < seqEnd, // 如果不在这个范围内说明我们少了数据段,返回false,表示不能立马消费 - if !r.rcvNxt.InWindows(segSeq, segLen) { + if !r.rcvNxt.InWindow(segSeq, segLen) { return false } // 尝试去除已经确认过的数据 diff --git a/tcpip/transport/tcp/snd.go b/tcpip/transport/tcp/snd.go index 4c264b0..fc64f84 100644 --- a/tcpip/transport/tcp/snd.go +++ b/tcpip/transport/tcp/snd.go @@ -1,7 +1,7 @@ package tcp import ( - "log" + "netstack/logger" "netstack/tcpip" "netstack/tcpip/buffer" "netstack/tcpip/seqnum" @@ -48,9 +48,19 @@ type sender struct { // sndNxt 是要发送的下一个段的序列号。 sndNxt seqnum.Value + // sndNxtList is the sequence number of the next segment to be added to + // the send list. + // sndNxtList 是要添加到发送列表的下一个段的序列号。 + sndNxtList seqnum.Value + // maxSentAck is the maxium acknowledgement actually sent. maxSentAck seqnum.Value + closed bool + writeNext *segment + // 发送链表 + writeList segmentList + // cc is the congestion control algorithm in use for this sender. // cc 是实现拥塞控制算法的接口 cc congestionControl @@ -67,9 +77,8 @@ func newSender(ep *endpoint, iss, irs seqnum.Value, sndWnd seqnum.Size, mss uint } func (s *sender) sendAck() { - log.Println("发送字节序", s.sndNxt) s.sendSegment(buffer.VectorisedView{}, flagAck, s.sndNxt) // seq = cookies+1 ack ack|fin.seq+1 - s.sendSegment(buffer.VectorisedView{}, flagFin, 0) + logger.TODO("发送字节序") } // sendSegment sends a new segment containing the given payload, flags and @@ -98,5 +107,39 @@ func (s *sender) handleRcvdSegment(seg *segment) { // 发送数据段,最终调用 sendSegment 来发送 func (s *sender) sendData() { + //log.Println(unsafe.Pointer(s.ep), "怎么又调用了一次") + var seg *segment + // 遍历发送链表,发送数据 + // tcp拥塞控制:s.outstanding < s.sndCwnd 判断正在发送的数据量不能超过拥塞窗口。 + for seg = s.writeNext; seg != nil; /*&& s.outstanding < s.sndCwnd*/ seg = seg.Next() { + // 如果seg的flags是0,将flags改为psh|ack + if seg.flags == 0 { + seg.sequenceNumber = s.sndNxt + seg.flags = flagAck | flagPsh + } + var segEnd seqnum.Value + if seg.data.Size() == 0 { // 数据段没有负载,表示要结束连接 + if s.writeList.Back() != seg { + panic("FIN segments must be the final segment in the write list.") + } + // 发送 fin 报文 + seg.flags = flagAck | flagFin + // fin 报文需要确认,且消耗一个字节序列号 + segEnd = seg.sequenceNumber.Add(1) + } else { + // We're sending a non-FIN segment. + if seg.flags&flagFin != 0 { + panic("Netstack queues FIN segments without data.") + } + logger.TODO("发送正常的数据, 需要流量控制") + + } + + s.sendSegment(seg.data, seg.flags, seg.sequenceNumber) + // 发送一个数据段后,更新sndNxt + if s.sndNxt.LessThan(segEnd) { + s.sndNxt = segEnd + } + } }