diff --git a/cmd/tcpclient/main.go b/cmd/tcpclient/main.go index 9e59899..cfa6a1d 100644 --- a/cmd/tcpclient/main.go +++ b/cmd/tcpclient/main.go @@ -4,28 +4,17 @@ import ( "fmt" "log" "net" - "time" ) func main() { - go func() { - conn, err := net.Dial("tcp", "192.168.1.1:9999") - if err != nil { - fmt.Println("err : ", err) - return - } - log.Println("连接建立") - conn.Write([]byte("helloworld")) - log.Println("发送了数据") - //buf := make([]byte, 1024) - //conn.Read(buf) - conn.Close() - }() - - t := time.NewTimer(1000 * time.Millisecond) - select { - case <-t.C: + conn, err := net.Dial("tcp", "192.168.1.1:9999") + if err != nil { + fmt.Println("err : ", err) return } - + //buf := make([]byte, 1024) + //conn.Read(buf) + if err = conn.Close(); err != nil { + log.Fatal(err) + } } diff --git a/tcpip/transport/tcp/accept.go b/tcpip/transport/tcp/accept.go index 7af7d70..4c2866e 100644 --- a/tcpip/transport/tcp/accept.go +++ b/tcpip/transport/tcp/accept.go @@ -222,7 +222,7 @@ func (l *listenContext) createEndpointAndPerformHandshake(s *segment, opts *head irs := s.sequenceNumber cookie := l.createCookie(s.id, irs, encodeMSS(opts.MSS)) logger.GetInstance().Info(logger.HANDSHAKE, func() { - log.Println("收到一个远端握手申请", irs, "客户端请携带 标记 iss", cookie) + log.Println("收到一个远端握手申请 SYN seq=", irs, "客户端请携带 标记 iss ", cookie, "+1") }) ep, err := l.createConnectedEndpoint(s, cookie, irs, opts) if err != nil { diff --git a/tcpip/transport/tcp/connect.go b/tcpip/transport/tcp/connect.go index fa33a2f..165899b 100644 --- a/tcpip/transport/tcp/connect.go +++ b/tcpip/transport/tcp/connect.go @@ -95,11 +95,11 @@ func (h *handshake) resetState() *tcpip.Error { } // 初始化状态为 SynSent h.state = handshakeSynSent - log.Println("收到 syn 同步报文 设置tcp状态为 [sent]") h.flags = flagSyn h.ackNum = 0 h.mss = 0 h.iss = seqnum.Value(uint32(b[0]) | uint32(b[1])<<8 | uint32(b[2])<<16 | uint32(b[3])<<24) + log.Println("收到 syn 同步报文 设置tcp状态为 [sent]") return nil } @@ -109,7 +109,6 @@ func (h *handshake) resetState() *tcpip.Error { func (h *handshake) resetToSynRcvd(iss seqnum.Value, irs seqnum.Value, opts *header.TCPSynOptions) { h.active = false h.state = handshakeSynRcvd - log.Println("发送 syn|ack 确认报文 设置tcp状态为 [rcvd]") h.flags = flagSyn | flagAck h.iss = iss h.ackNum = irs + 1 // NOTE ACK = synNum + 1 @@ -448,7 +447,7 @@ func sendTCP(r *stack.Route, id stack.TransportEndpointID, data buffer.Vectorise r.Stats().TCP.ResetsSent.Increment() } - log.Printf("send tcp %s segment to %s, seq: %d, ack: %d, rcvWnd: %d", + log.Printf("send tcp %s segment to %s, seq: |%d|, ack: %d, rcvWnd: %d", flagString(flags), fmt.Sprintf("%s:%d", id.RemoteAddress, id.RemotePort), seq, ack, rcvWnd) @@ -538,7 +537,7 @@ func (e *endpoint) handleSegments() *tcpip.Error { // 处理tcp数据段,同时给接收器和发送器 // 为何要给发送器传接收到的数据段呢?主要是为了滑动窗口的滑动和拥塞控制处理 e.rcv.handleRcvdSegment(s) - //e.snd.handleRcvdSegment(s) + e.snd.handleRcvdSegment(s) } s.decRef() // 该segment处理完成 } diff --git a/tcpip/transport/tcp/endpoint.go b/tcpip/transport/tcp/endpoint.go index ddecdf7..35256ac 100644 --- a/tcpip/transport/tcp/endpoint.go +++ b/tcpip/transport/tcp/endpoint.go @@ -599,7 +599,7 @@ func (e *endpoint) HandlePacket(r *stack.Route, id stack.TransportEndpointID, vv if _, err := e.GetRemoteAddress(); err != nil { prifix = "监听者" } - log.Printf(prifix+"收到 tcp [%s] 报文片段 from %s, seq: %d, ack: %d", + log.Printf(prifix+"收到 tcp [%s] 报文片段 from %s, seq: %d, ack: |%d|", flagString(s.flags), fmt.Sprintf("%s:%d", s.id.RemoteAddress, s.id.RemotePort), s.sequenceNumber, s.ackNumber) diff --git a/tcpip/transport/tcp/rcv.go b/tcpip/transport/tcp/rcv.go index 137b872..09a6d22 100644 --- a/tcpip/transport/tcp/rcv.go +++ b/tcpip/transport/tcp/rcv.go @@ -24,7 +24,7 @@ type receiver struct { func newReceiver(ep *endpoint, irs seqnum.Value, rcvWnd seqnum.Size, rcvWndScale uint8) *receiver { r := &receiver{ ep: ep, - rcvNxt: irs + 1, + rcvNxt: irs + 1, // 成功建立连接后期望读取的第一个字节序号 rcvAcc: irs.Add(rcvWnd + 1), rcvWndScale: rcvWndScale, } @@ -78,7 +78,6 @@ func (r *receiver) consumeSegment(s *segment, segSeq seqnum.Value, segLen seqnum r.rcvNxt = segSeq.Add(segLen) logger.GetInstance().Info(logger.TCP, func() { }) - log.Println("下一个期望接收的字节序列号", r.rcvNxt) // 如果收到 fin 报文 if s.flagIsSet(flagFin) { @@ -86,7 +85,7 @@ func (r *receiver) consumeSegment(s *segment, segSeq seqnum.Value, segLen seqnum r.rcvNxt++ // 收到 fin,立即回复 ack - r.ep.snd.sendAck() + r.ep.snd.sendAck() // FIXME 不应该是 seq+2 捏 } return true diff --git a/tcpip/transport/tcp/snd.go b/tcpip/transport/tcp/snd.go index e8e7543..4c264b0 100644 --- a/tcpip/transport/tcp/snd.go +++ b/tcpip/transport/tcp/snd.go @@ -1,6 +1,7 @@ package tcp import ( + "log" "netstack/tcpip" "netstack/tcpip/buffer" "netstack/tcpip/seqnum" @@ -66,7 +67,9 @@ func newSender(ep *endpoint, iss, irs seqnum.Value, sndWnd seqnum.Size, mss uint } func (s *sender) sendAck() { - s.sendSegment(buffer.VectorisedView{}, flagAck, s.sndNxt) + log.Println("发送字节序", s.sndNxt) + s.sendSegment(buffer.VectorisedView{}, flagAck, s.sndNxt) // seq = cookies+1 ack ack|fin.seq+1 + s.sendSegment(buffer.VectorisedView{}, flagFin, 0) } // sendSegment sends a new segment containing the given payload, flags and @@ -85,3 +88,15 @@ func (s *sender) sendSegment(data buffer.VectorisedView, flags byte, seq seqnum. return s.ep.sendRaw(data, flags, seq, rcvNxt, rcvWnd) } + +// 收到段时调用 handleRcvdSegment 它负责更新与发送相关的状态 +func (s *sender) handleRcvdSegment(seg *segment) { + // 现在某些待处理数据已被确认,或者窗口打开,或者由于快速恢复期间出现重复的ack而导致拥塞窗口膨胀, + // 因此发送更多数据。如果需要,这也将重新启用重传计时器。 + s.sendData() +} + +// 发送数据段,最终调用 sendSegment 来发送 +func (s *sender) sendData() { + +}