diff --git a/cmd/netstack/main.go b/cmd/netstack/main.go index 12c535d..35c6ea6 100644 --- a/cmd/netstack/main.go +++ b/cmd/netstack/main.go @@ -2,6 +2,7 @@ package main import ( "flag" + "fmt" "log" "net" "netstack/logger" @@ -147,16 +148,18 @@ func main() { log.Println("服务端 建立连接") go func() { + cnt := 0 for { // 一个慢读者 才能体现出网络的情况 - time.Sleep(500 * time.Millisecond) + time.Sleep(10 * time.Millisecond) buf := make([]byte, 1024) n, err := conn.Read(buf) if err != nil { log.Println(n, err) break } - logger.NOTICE("服务端读取了数据", string(buf)) + cnt+=n + logger.NOTICE("服务端读取了数据", fmt.Sprintf("n: %d, cnt: %d", n, cnt), string(buf)) //conn.Write([]byte("Hello Client")) } }() @@ -180,9 +183,10 @@ func main() { log.Printf("\n\n客户端 写入数据") - for i := 0; i < 1; i++ { - conn.Write(make([]byte, 1<<20)) - + cnt := 0 + for i := 0; i < 10; i++ { + conn.Write(make([]byte, 1<<(5))) + cnt += 1<<(5) //buf := make([]byte, 1024) //n, err := conn.Read(buf) //if err != nil { @@ -190,9 +194,11 @@ func main() { // break //} //logger.NOTICE(string(buf[:n])) - time.Sleep(5 * time.Millisecond) + time.Sleep(50 * time.Millisecond) } + logger.NOTICE("写完了", fmt.Sprintf("共计写入: %d", cnt)) + select {} conn.Close() }() diff --git a/tcpip/transport/tcp/endpoint.go b/tcpip/transport/tcp/endpoint.go index d2f5c1b..74ef553 100644 --- a/tcpip/transport/tcp/endpoint.go +++ b/tcpip/transport/tcp/endpoint.go @@ -335,6 +335,7 @@ func (e *endpoint) readLocked() (buffer.View, *tcpip.Error) { e.rcvBufUsed -= len(v) if wasZero && !e.zeroReceiveWindow(scale) { // 之前没空闲 现在有了 告知一下对端 e.notifyProtocolGoroutine(notifyNonZeroReceiveWindow) + logger.NOTICE("通知上层有了空间") } return v, nil @@ -565,17 +566,17 @@ func (e *endpoint) connect(addr tcpip.FullAddress, handshake bool, run bool) (er // Connect in the restore phase does not perform handshake. Restore its // connection setting here. if !handshake { - //e.segmentQueue.mu.Lock() - //for _, l := range []segmentList{e.segmentQueue.list, e.sndQueue, e.snd.writeList} { - // for s := l.Front(); s != nil; s = s.Next() { - // s.id = e.id - // s.route = r.Clone() - // e.sndWaker.Assert() - // } - //} - //e.segmentQueue.mu.Unlock() - //e.snd.updateMaxPayloadSize(int(e.route.MTU()), 0) - //e.state = stateConnected + e.segmentQueue.mu.Lock() + for _, l := range []segmentList{e.segmentQueue.list, e.sndQueue, e.snd.writeList} { + for s := l.Front(); s != nil; s = s.Next() { + s.id = e.id + s.route = r.Clone() + e.sndWaker.Assert() + } + } + e.segmentQueue.mu.Unlock() + e.snd.updateMaxPayloadSize(int(e.route.MTU()), 0) + e.state = stateConnected } if run { diff --git a/tcpip/transport/tcp/reno.go b/tcpip/transport/tcp/reno.go index d65d3ea..a5a9e86 100644 --- a/tcpip/transport/tcp/reno.go +++ b/tcpip/transport/tcp/reno.go @@ -14,7 +14,7 @@ func newRenoCC(s *sender) *renoState { // updateSlowStart 将根据NewReno使用的慢启动算法更新拥塞窗口。 // 如果在调整拥塞窗口后我们越过了 SSthreshold ,那么它将返回在拥塞避免模式下必须消耗的数据包的数量。 func (r *renoState) updateSlowStart(packetsAcked int) int { - // 在慢启动阶段,每次收到ack,sndCwnd加上已确认的段数 + // 在慢启动阶段,每当收到一个 ACK,cwnd++; 呈线性上升 newcwnd := r.s.sndCwnd + packetsAcked // 判断增大过后的拥塞窗口是否超过慢启动阀值 sndSsthresh, // 如果超过 sndSsthresh ,将窗口调整为 sndSsthresh @@ -32,8 +32,16 @@ func (r *renoState) updateSlowStart(packetsAcked int) int { // updateCongestionAvoidance 将在拥塞避免模式下更新拥塞窗口, // 如RFC5681第3.1节所述 +// 每当收到一个 ACK 时,cwnd = cwnd + 1/cwnd +// 每当过一个 RTT 时,cwnd = cwnd + 1 func (r *renoState) updateCongestionAvoidance(packetsAcked int) { - logger.FIXME("超过阈值后调整拥塞窗口 拥塞避免阶段") + // sndCAAckCount 累计收到的tcp段数 + r.s.sndCAAckCount += packetsAcked + // 如果累计的段数超过当前的拥塞窗口,那么 sndCwnd 加上 sndCAAckCount/sndCwnd 的整数倍 + if r.s.sndCAAckCount >= r.s.sndCwnd { + r.s.sndCwnd += r.s.sndCAAckCount / r.s.sndCwnd + r.s.sndCAAckCount = r.s.sndCAAckCount % r.s.sndCwnd + } } // 当检测到网络拥塞时,调用 reduceSlowStartThreshold。 @@ -56,7 +64,15 @@ func (r *renoState) HandleNDupAcks() { } func (r *renoState) HandleRTOExpired() { + // We lost a packet, so reduce ssthresh. + // 减小慢启动阀值 + r.reduceSlowStartThreshold() + // Reduce the congestion window to 1, i.e., enter slow-start. Per + // RFC 5681, page 7, we must use 1 regardless of the value of the + // initial congestion window. + // 更新拥塞窗口为1,这样就会重新进入慢启动 + r.s.sndCwnd = 1 } // packetsAcked 已经确认过的数据段数 @@ -69,7 +85,7 @@ func (r *renoState) Update(packetsAcked int) { return } } - // 进入拥塞避免阶段 + // 当拥塞窗口大于阈值时 进入拥塞避免阶段 r.updateCongestionAvoidance(packetsAcked) } diff --git a/tcpip/transport/tcp/snd.go b/tcpip/transport/tcp/snd.go index 76f6d53..16a494d 100644 --- a/tcpip/transport/tcp/snd.go +++ b/tcpip/transport/tcp/snd.go @@ -352,7 +352,7 @@ func (s *sender) updateRTO(rtt time.Duration) { if s.rto < minRTO { s.rto = minRTO } - logger.NOTICE("更新RTO RTT", s.rto.String(), rtt.String()) + logger.NOTICE("更新RTO", s.rto.String()," RTT:", rtt.String()) } // resendSegment resends the first unacknowledged segment. @@ -465,7 +465,6 @@ func (s *sender) handleRcvdSegment(seg *segment) { logger.NOTICE("重复收到3个ack报文 启动快速重传...") s.resendSegment() } - //log.Fatal(s.sndCwnd, s.sndSsthresh) if s.ep.id.LocalPort != 9999 { log.Println(s) @@ -510,7 +509,6 @@ func (s *sender) sendData() { // 如果TCP在超过重新传输超时的时间间隔内没有发送数据,TCP应该在开始传输之前将cwnd设置为不超过RW。 if !s.fr.active && time.Now().Sub(s.lastSendTime) > s.rto { - log.Fatal("重置sndCwnd") if s.sndCwnd > InitialCwnd { s.sndCwnd = InitialCwnd }