From 07e2e09adb60a33d374332cce0aff36fb52d85dd Mon Sep 17 00:00:00 2001 From: impact-eintr Date: Sun, 18 Dec 2022 18:52:33 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=80=E4=B8=AA=E7=AE=80=E5=8D=95=E7=9A=84?= =?UTF-8?q?=E6=B3=A8=E5=86=8C=E5=BC=8Fnetstack?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/netstack/TcpConn.go | 32 ++++----- cmd/netstack/main.go | 114 ++++++++++++++++++++++-------- cmd/netstack/tcp_server.go | 121 +++++++++++++++++++++++++------- tcpip/link/loopback/loopback.go | 10 ++- tcpip/transport/tcp/README.md | 24 +++++++ 5 files changed, 221 insertions(+), 80 deletions(-) diff --git a/cmd/netstack/TcpConn.go b/cmd/netstack/TcpConn.go index 6984be7..60ed576 100644 --- a/cmd/netstack/TcpConn.go +++ b/cmd/netstack/TcpConn.go @@ -107,24 +107,15 @@ func (conn *TcpConn) SetSockOpt(opt interface{}) error { return nil } -// Listener tcp连接监听器 -type Listener struct { - raddr tcpip.FullAddress - ep tcpip.Endpoint - wq *waiter.Queue - we *waiter.Entry - notifyCh chan struct{} -} - // Accept 封装tcp的accept操作 -func (l *Listener) Accept() (*TcpConn, error) { - l.wq.EventRegister(l.we, waiter.EventIn|waiter.EventOut) - defer l.wq.EventUnregister(l.we) +func (conn *TcpConn) Accept() (*TcpConn, error) { + conn.wq.EventRegister(conn.we, waiter.EventIn|waiter.EventOut) + defer conn.wq.EventUnregister(conn.we) for { - ep, wq, err := l.ep.Accept() + ep, wq, err := conn.ep.Accept() if err != nil { if err == tcpip.ErrWouldBlock { - <-l.notifyCh + <-conn.notifyCh continue } return nil, fmt.Errorf("%s", err.String()) @@ -137,7 +128,7 @@ func (l *Listener) Accept() (*TcpConn, error) { } } -func tcpListen(s *stack.Stack, proto tcpip.NetworkProtocolNumber, addr tcpip.Address, localPort int) *Listener { +func tcpListen(s *stack.Stack, proto tcpip.NetworkProtocolNumber, addr tcpip.Address, localPort int) *TcpConn { var wq waiter.Queue // 新建一个tcp端 ep, err := s.NewEndpoint(tcp.ProtocolNumber, proto, &wq) @@ -157,9 +148,18 @@ func 
tcpListen(s *stack.Stack, proto tcpip.NetworkProtocolNumber, addr tcpip.Add } waitEntry, notifyCh := waiter.NewChannelEntry(nil) - return &Listener{ + return &TcpConn{ ep: ep, wq: &wq, we: &waitEntry, notifyCh: notifyCh} } + +// Listener tcp连接监听器 +type Listener struct { + raddr tcpip.FullAddress + ep tcpip.Endpoint + wq *waiter.Queue + we *waiter.Entry + notifyCh chan struct{} +} diff --git a/cmd/netstack/main.go b/cmd/netstack/main.go index 9c26d3e..2aed549 100644 --- a/cmd/netstack/main.go +++ b/cmd/netstack/main.go @@ -4,7 +4,6 @@ import ( "encoding/binary" "flag" "fmt" - "io" "log" "net" "netstack/logger" @@ -136,38 +135,35 @@ func main() { //logger.SetFlags(logger.TCP) go func() { // echo server - //time.Sleep(1 * time.Second) - //pid := Register() - //log.Fatal(pid) + pid := Register() - listener := tcpListen(s, proto, addr, localPort) + lfd := Listen(pid, addr, localPort) done <- struct{}{} + for { - conn, err := listener.Accept() + cfd := Accept(pid, lfd) if err != nil { log.Println(err) } - log.Println("服务端 建立连接") - go func() { for { time.Sleep(50 * time.Millisecond) buf := make([]byte, 1024) - n, err := conn.Read(buf) + n, err := Read(pid, cfd, buf) if err != nil { log.Println(err) break } logger.NOTICE(string(buf[:n])) - //conn.Write([]byte("Hello Client")) + Write(pid, cfd, []byte("Hello Client")) } }() } }() - <-done - go func() { + <-done + logger.NOTICE("客户端上线") port := localPort conn, err := Dial(s, header.IPv4ProtocolNumber, addr, port) if err != nil { @@ -185,13 +181,13 @@ func main() { for i := 0; i < 1; i++ { conn.Write([]byte("Hello Server!")) - //buf := make([]byte, 1024) - //n, err := conn.Read(buf) - //if err != nil { - // log.Println(err) - // break - //} - //logger.NOTICE(string(buf[:n])) + buf := make([]byte, 1024) + n, err := conn.Read(buf) + if err != nil { + log.Println(err) + break + } + logger.NOTICE(string(buf[:n])) time.Sleep(1 * time.Second) } @@ -199,8 +195,6 @@ func main() { conn.Close() }() - close(done) - l, err := 
net.Listen("tcp", "127.0.0.1:9999") if err != nil { fmt.Println("Error listening:", err) @@ -213,6 +207,7 @@ func main() { TCPServer(l, rcv) + defer close(done) c := make(chan os.Signal) signal.Notify(c, os.Interrupt, os.Kill, syscall.SIGUSR1, syscall.SIGUSR2) <-c @@ -221,6 +216,7 @@ func main() { const ( REGISTER byte = iota LISTEN + ACCEPT CONNECT READ WRITE @@ -237,25 +233,81 @@ func Register() PID { } defer conn.Close() - _, err = conn.Write(make([]byte, 1<<20)) - buf, _ := io.ReadAll(conn) + _, err = conn.Write([]byte{0}) + buf := make([]byte, 2) + conn.Read(buf) - log.Fatal(buf) - return 3 + return PID(binary.BigEndian.Uint16(buf)) } // Listen 传递 pid addr port 监听+绑定地址 func Listen(pid PID, addr tcpip.Address, localPort int) FD { - // 连接本地netstack服务 conn, err := net.Dial("tcp", "127.0.0.1:9999") if err != nil { fmt.Println("err : ", err) return 0 } - buf := make([]byte, 1024) - binary.BigEndian.PutUint16(buf[:4], uint16(pid)) + // 1 pid port + buf := make([]byte, 5) + buf[0] = LISTEN + binary.BigEndian.PutUint16(buf[1:3], uint16(pid)) + binary.BigEndian.PutUint16(buf[3:5], uint16(localPort)) conn.Write(buf) - buf, _ = io.ReadAll(conn) - log.Fatal(buf) - return 0 + + buf = make([]byte, 2) + conn.Read(buf) + return FD(binary.BigEndian.Uint16(buf)) +} + +// Accept 传递 pid + listenerfd 返回 connfd +func Accept(pid PID, lfd FD) FD { + conn, err := net.Dial("tcp", "127.0.0.1:9999") + if err != nil { + fmt.Println("err : ", err) + return 0 + } + // 2 pid lfd + buf := make([]byte, 5) + buf[0] = ACCEPT + binary.BigEndian.PutUint16(buf[1:3], uint16(pid)) + binary.BigEndian.PutUint16(buf[3:5], uint16(lfd)) + conn.Write(buf) + + buf = make([]byte, 2) + conn.Read(buf) + return FD(binary.BigEndian.Uint16(buf)) +} + +func Read(pid PID, cfd FD, rcv []byte) (int, error) { + conn, err := net.Dial("tcp", "127.0.0.1:9999") + if err != nil { + fmt.Println("err : ", err) + return 0, err + } + // 2 pid cfd + buf := make([]byte, 5) + buf[0] = READ + binary.BigEndian.PutUint16(buf[1:3], 
uint16(pid)) + binary.BigEndian.PutUint16(buf[3:5], uint16(cfd)) + conn.Write(buf) + + return conn.Read(rcv) +} + +func Write(pid PID, cfd FD, snd []byte) (int, error) { + conn, err := net.Dial("tcp", "127.0.0.1:9999") + if err != nil { + fmt.Println("err : ", err) + return 0, err + } + // 2 pid cfd + buf := make([]byte, 9) + buf[0] = WRITE + binary.BigEndian.PutUint16(buf[1:3], uint16(pid)) + binary.BigEndian.PutUint16(buf[3:5], uint16(cfd)) + binary.BigEndian.PutUint32(buf[5:9], uint32(len(snd))) + buf = append(buf, snd...) + conn.Write(buf) + + return conn.Read(nil) } diff --git a/cmd/netstack/tcp_server.go b/cmd/netstack/tcp_server.go index 2660bf8..be692cd 100644 --- a/cmd/netstack/tcp_server.go +++ b/cmd/netstack/tcp_server.go @@ -3,11 +3,11 @@ package main import ( "encoding/binary" "fmt" - "io" "log" "net" "netstack/logger" "netstack/tcpip" + "netstack/tcpip/header" "netstack/tcpip/stack" "runtime" "strings" @@ -17,11 +17,17 @@ import ( // PID netstack PID type PID uint16 -var currPID uint32 = 2 // 0 1 2 用过了 +var currPID uint32 = 1 +// Socket in memory +type Socket struct { // 0 1 2 用过了 + socket *TcpConn +} + +// FD file descriptor type FD uint16 -var fds = make(map[PID][1024]FD, 8) +var fds = make(map[PID][]Socket, 8) type TCPHandler interface { Handle(net.Conn) @@ -54,32 +60,34 @@ var transportPool = make(map[uint64]tcpip.Endpoint) type RCV struct { *stack.Stack - ep tcpip.Endpoint - addr tcpip.FullAddress rcvBuf []byte } func (r *RCV) Handle(conn net.Conn) { - logger.NOTICE("RCV handle") var err error - r.rcvBuf, err = io.ReadAll(conn) + _, err = conn.Read(r.rcvBuf) if err != nil && len(r.rcvBuf) < 1 { // 操作码 panic(err) } - logger.NOTICE("注意测试") - switch r.rcvBuf[0] { case REGISTER: - conn.Write(r.Register()) + conn.Write(r.register()) + return case LISTEN: - goto FAULT + conn.Write(r.listen()) + return + case ACCEPT: + conn.Write(r.accept()) + return case CONNECT: goto FAULT case READ: - goto FAULT + conn.Write(r.read()) + return case WRITE: - goto 
FAULT + conn.Write(r.write()) + return case CLOSE: goto FAULT default: @@ -90,30 +98,89 @@ FAULT: logger.NOTICE("FAULT") } -func (r *RCV) Listen() { - if len(r.rcvBuf) < 9 { // udp ip port +func (r *RCV) listen() []byte { + if len(r.rcvBuf) < 5 { // udp ip port log.Println("Error: too few arg") - return + return nil } - port := binary.BigEndian.Uint16(r.rcvBuf[7:9]) - r.addr = tcpip.FullAddress{ - NIC: 1, - Addr: tcpip.Address(r.rcvBuf[3:7]), - Port: port, + pid := binary.BigEndian.Uint16(r.rcvBuf[1:3]) + port := binary.BigEndian.Uint16(r.rcvBuf[3:5]) + + listener := tcpListen(r.Stack, header.IPv4ProtocolNumber, "", int(port)) + + for i, v := range fds[PID(pid)] { + if i > 2 && v.socket == nil { + fds[PID(pid)][i] = Socket{listener} + b := make([]byte, 2) + binary.BigEndian.PutUint16(b[:2], uint16(i)) + return b + } } - r.ep.Bind(r.addr, nil) + panic("No Idle Space") } -func (r *RCV) Connect() { - r.ep.Connect(tcpip.FullAddress{NIC: 1, Addr: "\xc0\xa8\x01\x02", Port: 8888}) +func (r *RCV) accept() []byte { + if len(r.rcvBuf) < 5 { // udp ip port + log.Println("Error: too few arg") + return nil + } + pid := binary.BigEndian.Uint16(r.rcvBuf[1:3]) + lfd := binary.BigEndian.Uint16(r.rcvBuf[3:5]) + + l := fds[PID(pid)][lfd] + conn, err := l.socket.Accept() + if err != nil { + log.Println(err) + } + for i, v := range fds[PID(pid)] { + if i > 2 && v.socket == nil { + fds[PID(pid)][i] = Socket{conn} + b := make([]byte, 2) + binary.BigEndian.PutUint16(b[:2], uint16(i)) + return b + } + } + panic("No Idle Space") } -func (r *RCV) Close() { - r.ep.Close() +func (r *RCV) connect() { } -func (r *RCV) Register() []byte { +func (r *RCV) read() []byte { + if len(r.rcvBuf) < 5 { // opc pid cfd + log.Println("Error: too few arg") + return nil + } + pid := binary.BigEndian.Uint16(r.rcvBuf[1:3]) + cfd := binary.BigEndian.Uint16(r.rcvBuf[3:5]) + + c := fds[PID(pid)][cfd] + buf := make([]byte, 1024) + c.socket.Read(buf) + return buf +} + +func (r *RCV) write() []byte { + if 
len(r.rcvBuf) < 9 { // opc pid cfd length + log.Println("Error: too few arg") + return nil + } + pid := binary.BigEndian.Uint16(r.rcvBuf[1:3]) + cfd := binary.BigEndian.Uint16(r.rcvBuf[3:5]) + length := binary.BigEndian.Uint32(r.rcvBuf[5:9]) + + c := fds[PID(pid)][cfd] + c.socket.Write(r.rcvBuf[9 : 9+length]) + return nil +} + +func (r *RCV) close() { +} + +// Register 注册pid +func (r *RCV) register() []byte { pid := uint16(atomic.AddUint32(&currPID, 1)) + fds[PID(pid)] = make([]Socket, 1024) b := make([]byte, 2) binary.BigEndian.PutUint16(b[:2], pid) return b diff --git a/tcpip/link/loopback/loopback.go b/tcpip/link/loopback/loopback.go index bb9be52..3dd5815 100644 --- a/tcpip/link/loopback/loopback.go +++ b/tcpip/link/loopback/loopback.go @@ -1,8 +1,6 @@ package loopback import ( - "fmt" - "netstack/logger" "netstack/tcpip" "netstack/tcpip/buffer" "netstack/tcpip/stack" @@ -49,10 +47,10 @@ func (e *endpoint) WritePacket(r *stack.Route, hdr buffer.Prependable, payload b // TODO 这里整点活 在特定的情况下丢掉数据报 模拟网络阻塞 e.count++ - if e.count == 5 || e.count == 7 || e.count == 9 { - logger.NOTICE(fmt.Sprintf("统计 %d 丢掉这个报文", e.count)) - return nil - } + //if e.count == 5 || e.count == 7 || e.count == 9 { + // logger.NOTICE(fmt.Sprintf("统计 %d 丢掉这个报文", e.count)) + // return nil + //} // Because we're immediately turning around and writing the packet back to the // rx path, we intentionally don't preserve the remote and local link // addresses from the stack.Route we're passed. diff --git a/tcpip/transport/tcp/README.md b/tcpip/transport/tcp/README.md index 5da8d75..3e30dfd 100644 --- a/tcpip/transport/tcp/README.md +++ b/tcpip/transport/tcp/README.md @@ -305,3 +305,27 @@ Silly Window Syndrome 翻译成中文就是“糊涂窗口综合症”。正如 ## tcp的拥塞控制 节介绍 tcp 的拥塞控制,拥塞控制控制是 tcp 协议中最复杂问题之一,主要是如何探测链路已经拥塞?探测到拥塞后如何处理? 
+ +事实上,早期 TCP 实现是没有拥塞控制的,拥塞控制是网络出现问题后才提出的,在 1986 年,互联网首次出现了一系列“拥堵事故”,从伯克利实验室到加州大学伯克利分校的数据吞吐量从 32 Kbps 降至 40 bps。当时在伯克利实验室工作的 Van Jacobson 很好奇为何网络会拥堵,并开始调查网络为何变得如此糟糕,2 年后,他提出了拥塞控制算法。 + +我们知道 TCP 通过采样 RTT 来计算 RTO,但是,如果网络上的延时突然增加超过了 RTO,那么 TCP 对这个事做出的应对只有重传数据,但是,重传会导致网络的负担更重,于是会导致更大的延迟以及更多的丢包,这样就会进入恶性循环并被不断地放大。试想一下,如果一个网络内有成千上万的 TCP 连接都这么行事,那么马上就会形成“网络风暴”,TCP 这个协议就会拖垮整个网络。这是一个灾难。 + +所以,TCP 不能忽略网络上发生的事情,而一个劲地重发数据,对网络造成更大的伤害。于是就提出了拥塞控制,当拥塞发生的时候,要做自我牺牲,降低发送速率。就像交通阻塞一样,每个车都应该把路让出来,而不要再去抢路了。 + +要注意流量控制和拥塞控制的区别,流量控制只控制两个端的速度,它抑制发送端的速度,以便接收端能接收,但它并不关心中间链路的网络情况。而拥塞控制是关心中间链路的网络情况,防止过多的数据注入到网络中,以防止中间的链路或路由过载。 + +## 拥塞控制的算法 + +TCP 通过维护一个拥塞窗口(cwnd 全称 Congestion Window)来进行拥塞控制,拥塞控制的原则是,只要网络中没有出现拥塞,拥塞窗口的值就可以再增大一些,以便把更多的数据包发送出去,但只要网络出现拥塞,拥塞窗口的值就应该减小一些,以减少注入到网络中的数据包数。 + +拥塞控制主要是四个算法:1)慢启动,2)拥塞避免,3)快速重传,4)快速恢复。 + +这四个算法不是一天就搞出来的,它们的发展经历了很长时间,至今都还在优化中,仅实现这四个算法的拥塞控制算法叫 Reno 算法。 + +### 慢启动(slow start) + +### 拥塞避免(congestion avoidance) + +### 快速重传(Fast Retransmit) + +### 快速恢复(Fast Recovery)