From c0254080415b210741bfdfa29eb246b70f4643bc Mon Sep 17 00:00:00 2001 From: impact-eintr Date: Thu, 8 Dec 2022 12:38:46 +0800 Subject: [PATCH] =?UTF-8?q?=E5=B9=B4=E8=BD=BB=E4=BA=BA=E7=9A=84=E7=AC=AC?= =?UTF-8?q?=E4=B8=80=E6=9D=A1tcp=E6=95=B0=E6=8D=AE=20xdm=E7=BB=88=E4=BA=8E?= =?UTF-8?q?=E6=8B=BF=E5=88=B0=E5=AE=A2=E6=88=B7=E7=AB=AF=E9=80=81=E8=BF=87?= =?UTF-8?q?=E6=9D=A5=E7=9A=84=E6=95=B0=E6=8D=AE=E4=BA=86=20=E6=9E=9C?= =?UTF-8?q?=E7=84=B6=E6=98=AF=E4=B9=8B=E5=89=8D=E6=8C=96=E7=9A=84=E5=9D=91?= =?UTF-8?q?=20=E6=8E=A5=E6=94=B6=E7=AA=97=E5=8F=A3=E5=88=9D=E5=A7=8B?= =?UTF-8?q?=E5=8C=96=E4=BA=86=E4=B8=AA0=20=E8=BF=98=E4=BB=A5=E4=B8=BA?= =?UTF-8?q?=E6=8F=A1=E6=89=8B=E6=B2=A1=E6=88=90=E5=8A=9F=E5=9C=A8=E9=87=8D?= =?UTF-8?q?=E8=AF=95...?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/tcpclient/main.go | 9 +- sleep/sleep_unsafe.go | 5 +- tcpip/transport/tcp/accept.go | 17 +++- tcpip/transport/tcp/connect.go | 158 +++++++++++++++++++++++++++++++- tcpip/transport/tcp/endpoint.go | 22 ++++- tcpip/transport/tcp/rcv.go | 13 ++- 6 files changed, 212 insertions(+), 12 deletions(-) diff --git a/cmd/tcpclient/main.go b/cmd/tcpclient/main.go index fe3ccab..a4f548d 100644 --- a/cmd/tcpclient/main.go +++ b/cmd/tcpclient/main.go @@ -2,20 +2,25 @@ package main import ( "fmt" + "log" "net" "time" ) func main() { go func() { - _, err := net.Dial("tcp", "192.168.1.1:9999") + conn, err := net.Dial("tcp", "192.168.1.1:9999") if err != nil { fmt.Println("err : ", err) return } + log.Println("连接建立") + conn.Write([]byte("helloworld")) + log.Println("发送了数据") + conn.Close() }() - t := time.NewTimer(500 * time.Millisecond) + t := time.NewTimer(1000 * time.Millisecond) select { case <-t.C: return diff --git a/sleep/sleep_unsafe.go b/sleep/sleep_unsafe.go index 1c989fe..5428d4f 100644 --- a/sleep/sleep_unsafe.go +++ b/sleep/sleep_unsafe.go @@ -68,7 +68,6 @@ package sleep import ( - "log" "sync/atomic" "unsafe" ) @@ -233,9 +232,9 @@ func (s *Sleeper) Fetch(block bool) (id int, ok bool) { // Reassociate the waker with the sleeper. If the waker was // still asserted we can return it, otherwise try the next one. old := (*Sleeper)(atomic.SwapPointer(&w.s, usleeper(s))) - log.Println("Sleeper", unsafe.Pointer(s), "old", unsafe.Pointer(old), "&assertSleeper", unsafe.Pointer(&assertedSleeper), "w.id", w.id) + //log.Println("Sleeper", unsafe.Pointer(s), "old", unsafe.Pointer(old), "&assertSleeper", unsafe.Pointer(&assertedSleeper), "w.id", w.id) if old == &assertedSleeper { - log.Println("成功返回 没有阻塞") + //log.Println("成功返回 没有阻塞") return w.id, true } } diff --git a/tcpip/transport/tcp/accept.go b/tcpip/transport/tcp/accept.go index de040cc..78653fc 100644 --- a/tcpip/transport/tcp/accept.go +++ b/tcpip/transport/tcp/accept.go @@ -12,6 +12,7 @@ import ( "netstack/tcpip/header" "netstack/tcpip/seqnum" "netstack/tcpip/stack" + "netstack/waiter" "sync" "time" ) @@ -246,18 +247,30 @@ func (l *listenContext) createEndpointAndPerformHandshake(s *segment, opts *head return ep, nil } +func (e *endpoint) deliverAccepted(n *endpoint) { + e.mu.RLock() + if e.state == stateListen { + e.acceptedChan <- n + e.waiterQueue.Notify(waiter.EventIn) + } else { + n.Close() + } + e.mu.RUnlock() +} + // 一旦侦听端点收到SYN段,handleSynSegment就会在其自己的goroutine中调用。它负责完成握手并将新端点排队以进行接受。 // 在TCP开始使用SYN cookie接受连接之前,允许使用有限数量的这些goroutine。 func (e *endpoint) handleSynSegment(ctx *listenContext, s *segment, opts *header.TCPSynOptions) { defer decSynRcvdCount() defer s.decRef() - _, err := ctx.createEndpointAndPerformHandshake(s, opts) + // 这里返回的 n 是一个新的tcp端: LAddr:Port+RAddr:RPort + n, err := ctx.createEndpointAndPerformHandshake(s, opts) if err != nil { return } // 到这里,三次握手已经完成,那么分发一个新的连接 - //e.deliverAccepted(n) + e.deliverAccepted(n) } // handleListenSegment is called when a listening endpoint receives a segment diff --git a/tcpip/transport/tcp/connect.go b/tcpip/transport/tcp/connect.go index f0b36b2..f346ffc 100644 --- a/tcpip/transport/tcp/connect.go +++ b/tcpip/transport/tcp/connect.go @@ -155,6 +155,107 @@ func (h *handshake) resolveRoute() *tcpip.Error { } } +// checkAck checks if the ACK number, if present, of a segment received during +// a TCP 3-way handshake is valid. If it's not, a RST segment is sent back in +// response. +func (h *handshake) checkAck(s *segment) bool { + if s.flagIsSet(flagAck) && s.ackNumber != h.iss+1 { + // RFC 793, page 36, states that a reset must be generated when + // the connection is in any non-synchronized state and an + // incoming segment acknowledges something not yet sent. The + // connection remains in the same state. + // TODO 返回一个RST报文 + //ack := s.sequenceNumber.Add(s.logicalLen()) + //h.ep.sendRaw(buffer.VectorisedView{}, flagRst|flagAck, s.ackNumber, ack, 0) + return false + } + + return true +} + +// synSentState 是客户端或者服务端接收到第一个握手报文的处理 +// 正常情况下,如果是客户端,此时应该收到 syn+ack 报文,处理后发送 ack 报文给服务端。 +// 如果是服务端,此时接收到syn报文,那么应该回复 syn+ack 报文给客户端,并设置状态为 handshakeSynRcvd。 +func (h *handshake) synSentState(s *segment) *tcpip.Error { + return nil +} + +// synRcvdState handles a segment received when the TCP 3-way handshake is in +// the SYN-RCVD state. +// 正常情况下,会调用该函数来处理第三次 ack 报文 +func (h *handshake) synRcvdState(s *segment) *tcpip.Error { + if s.flagIsSet(flagRst) { + // TODO 需要根据窗口返回 等理解了窗口后再写 + return nil + } + // 校验ack报文 + if !h.checkAck(s) { + return nil + } + + // 如果是syn报文,且序列号对应不上,那么返回 rst + if s.flagIsSet(flagSyn) && s.sequenceNumber != h.ackNum-1 { + // TODO 返回RST报文 + return nil + } + + // 如果时ack报文 表示三次握手已经完成 + if s.flagIsSet(flagAck) { + // TODO 修改时间戳 + h.state = handshakeCompleted + return nil + } + + return nil +} + +// 握手的时候处理tcp段 +func (h *handshake) handleSegment(s *segment) *tcpip.Error { + h.sndWnd = s.window + if !s.flagIsSet(flagSyn) && h.sndWndScale > 0 { + h.sndWnd <<= uint8(h.sndWndScale) + } + log.Println(h.sndWnd) + + switch h.state { + case handshakeSynRcvd: + // 正常情况下,服务端接收客户端第三次 ack 报文 + return h.synRcvdState(s) + case handshakeSynSent: + // 客户端发送了syn报文后的处理 + return h.synSentState(s) + } + return nil +} + +// processSegments goes through the segment queue and processes up to +// maxSegmentsPerWake (if they're available). +func (h *handshake) processSegments() *tcpip.Error { + + log.Println("处理握手报文") + for i := 0; i < maxSegmentsPerWake; i++ { + // 从建立中的连接队列里取一个报文段 + s := h.ep.segmentQueue.dequeue() + if s == nil { + return nil + } + err := h.handleSegment(s) + if err != nil { + return err + } + + if h.state == handshakeCompleted { + break + } + } + // If the queue is not empty, make sure we'll wake up in the next + // iteration. + if !h.ep.segmentQueue.empty() { + h.ep.newSegmentWaker.Assert() + } + return nil +} + // execute executes the TCP 3-way handshake. // 执行tcp 3次握手,客户端和服务端都是调用该函数来实现三次握手 /* @@ -216,6 +317,9 @@ func (h *handshake) execute() *tcpip.Error { case wakerForNewSegment: // 处理握手报文 + if err := h.processSegments(); err != nil { + return err + } } } return nil @@ -353,8 +457,50 @@ func sendTCP(r *stack.Route, id stack.TransportEndpointID, data buffer.Vectorise return r.WritePacket(hdr, data, ProtocolNumber, ttl) } +// 从发送队列中取出数据并发送出去 +func (e *endpoint) handleWrite() *tcpip.Error { + return nil +} + +// 关闭连接的处理,最终会调用 sendData 来发送 fin 包 +func (e *endpoint) handleClose() *tcpip.Error { + return nil +} + // handleSegments 从队列中取出 tcp 段数据,然后处理它们。 func (e *endpoint) handleSegments() *tcpip.Error { + log.Println("年轻人的第一条数据") + checkRequeue := true + for i := 0; i < maxSegmentsPerWake; i++ { + s := e.segmentQueue.dequeue() + if s == nil { + checkRequeue = false + break + } + if s.flagIsSet(flagRst) { + // TODO 如果收到 rst 报文 + s.decRef() + return tcpip.ErrConnectionReset + } else if s.flagIsSet(flagAck) { + // 处理正常报文 + + // RFC 793, page 41 states that "once in the ESTABLISHED + // state all segments must carry current acknowledgment + // information." + // 处理tcp数据段,同时给接收器和发送器 + // 为何要给发送器传接收到的数据段呢?主要是为了滑动窗口的滑动和拥塞控制处理 + e.rcv.handleRcvdSegment(s) + //e.snd.handleRcvdSegment(s) + } + s.decRef() // 该segment处理完成 + } + // If the queue is not empty, make sure we'll wake up in the next + // iteration. + if checkRequeue && !e.segmentQueue.empty() { + e.newSegmentWaker.Assert() + } + + // TODO 需要添加 return nil } @@ -367,8 +513,14 @@ func (e *endpoint) protocolMainLoop(handshake bool) *tcpip.Error { w *sleep.Waker f func() *tcpip.Error }{ - {}, - {}, + { + w: &e.sndWaker, + f: e.handleWrite, + }, + { + w: &e.sndCloseWaker, + f: e.handleClose, + }, { w: &e.newSegmentWaker, f: e.handleSegments, @@ -395,7 +547,7 @@ func (e *endpoint) protocolMainLoop(handshake bool) *tcpip.Error { if err := funcs[v].f(); err != nil { e.mu.Lock() //e.resetConnectionLocked(err) - //// Lock released below. + // Lock released below. //epilogue() log.Println(err) return nil diff --git a/tcpip/transport/tcp/endpoint.go b/tcpip/transport/tcp/endpoint.go index 60cf66d..f5b99e0 100644 --- a/tcpip/transport/tcp/endpoint.go +++ b/tcpip/transport/tcp/endpoint.go @@ -214,7 +214,7 @@ func (e *endpoint) Listen(backlog int) (err *tcpip.Error) { e.stack.Stats().TCP.PassiveConnectionOpenings.Increment() // tcp服务端实现的主循环,这个函数很重要,用一个goroutine来服务 - go e.protocolListenLoop(seqnum.Size(0)) + go e.protocolListenLoop(seqnum.Size(e.receiveBufferAvailable())) return nil } @@ -239,6 +239,7 @@ func (e *endpoint) Accept() (tcpip.Endpoint, *waiter.Queue, *tcpip.Error) { var n *endpoint select { case n = <-e.acceptedChan: + log.Println("监听者进行一个新连接的分发", n.id) default: return nil, nil, tcpip.ErrWouldBlock } @@ -384,6 +385,25 @@ func (e *endpoint) HandleControlPacket(id stack.TransportEndpointID, typ stack.C } +// receiveBufferAvailable calculates how many bytes are still available in the +// receive buffer. +// tcp流量控制:计算未被占用的接收缓存大小 +func (e *endpoint) receiveBufferAvailable() int { + e.rcvListMu.Lock() + size := e.rcvBufSize + used := e.rcvBufUsed + e.rcvListMu.Unlock() + + // We may use more bytes than the buffer size when the receive buffer + // shrinks. + if used >= size { + return 0 + } + + log.Println("Init Recv Windeow Size: ", size-used) + return size - used +} + // maybeEnableTimestamp marks the timestamp option enabled for this endpoint if // the SYN options indicate that timestamp option was negotiated. It also // initializes the recentTS with the value provided in synOpts.TSval. diff --git a/tcpip/transport/tcp/rcv.go b/tcpip/transport/tcp/rcv.go index b151d52..1d67a82 100644 --- a/tcpip/transport/tcp/rcv.go +++ b/tcpip/transport/tcp/rcv.go @@ -1,6 +1,9 @@ package tcp -import "netstack/tcpip/seqnum" +import ( + "log" + "netstack/tcpip/seqnum" +) type receiver struct{} @@ -9,3 +12,11 @@ func newReceiver(ep *endpoint, irs seqnum.Value, rcvWnd seqnum.Size, rcvWndScale r := &receiver{} return r } + +// handleRcvdSegment handles TCP segments directed at the connection managed by +// r as they arrive. It is called by the protocol main loop. +// 从 handleSegments 接收到tcp段,然后进行处理消费,所谓的消费就是将负载内容插入到接收队列中 +func (r *receiver) handleRcvdSegment(s *segment) { + log.Println(s.data) + +}