好难好难好难 窗口控制失效 主要表现为 segment.data.Size() 在 cap操作后进入下个循环时恢复初始值 截取失败

This commit is contained in:
impact-eintr
2022-12-14 21:45:51 +08:00
parent 0b6ffaa995
commit 57fcf28f83
7 changed files with 139 additions and 26 deletions

View File

@@ -147,11 +147,12 @@ func main() {
go func() { go func() {
for { for {
buf := make([]byte, 1024) buf := make([]byte, 1024)
if _, err := conn.Read(buf); err != nil { n, err := conn.Read(buf)
if err != nil {
log.Println(err) log.Println(err)
break break
} }
fmt.Println("data: ", len(buf), string(buf)) fmt.Println("data: ", n, len(buf), string(buf))
// conn.Write([]byte("Server echo")) // conn.Write([]byte("Server echo"))
//} //}
} }
@@ -160,16 +161,20 @@ func main() {
}() }()
<-done <-done
go func() { go func() {
port := localPort port := localPort
conn, err := Dial(s, header.IPv4ProtocolNumber, addr, port) conn, err := Dial(s, header.IPv4ProtocolNumber, addr, port)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
log.Println("客户端 建立连接") log.Printf("客户端 建立连接\n")
buf := make([]byte, 1<<21)
time.Sleep(time.Second)
log.Printf("\n\n客户端 写入数据")
buf := make([]byte, 1<<17)
conn.Write(buf) conn.Write(buf)
time.Sleep(3 * time.Second) time.Sleep(1 * time.Minute)
conn.Close() conn.Close()
}() }()
@@ -237,7 +242,7 @@ func (conn *TcpConn) Read(rcv []byte) (int, error) {
n = cap(rcv) n = cap(rcv)
} }
rcv = append(rcv[:0], buf[:n]...) rcv = append(rcv[:0], buf[:n]...)
return n, nil return len(buf), nil
} }
} }

View File

@@ -21,11 +21,10 @@ func (v Value) LessThanEq(w Value) bool {
// InRange v ∈ [a, b) // InRange v ∈ [a, b)
func (v Value) InRange(a, b Value) bool { func (v Value) InRange(a, b Value) bool {
//return a <= v && v < b return v-a < b-a // 注意 uint32(-1) > uint32(0)
return v-a < b-a
} }
// InWindows check v in [first, first+size) // InWindow check v in [first, first+size)
func (v Value) InWindow(first Value, size Size) bool { func (v Value) InWindow(first Value, size Size) bool {
return v.InRange(first, first.Add(size)) return v.InRange(first, first.Add(size))
} }

View File

@@ -82,10 +82,10 @@ const (
// 根据这些参数生成 syn+ack 报文的选项参数,附在 tcp 选项中,回复给主机 A。 // 根据这些参数生成 syn+ack 报文的选项参数,附在 tcp 选项中,回复给主机 A。
func newHandshake(ep *endpoint, rcvWnd seqnum.Size) (handshake, *tcpip.Error) { func newHandshake(ep *endpoint, rcvWnd seqnum.Size) (handshake, *tcpip.Error) {
h := handshake{ h := handshake{
ep: ep, ep: ep,
active: true, // 激活这个管理器 active: true, // 激活这个管理器
rcvWnd: rcvWnd, // 初始接收窗口 rcvWnd: rcvWnd, // 初始接收窗口
// TODO rcvWndScale: FindWndScale(rcvWnd), // 接收窗口扩展因子
} }
if err := h.resetState(); err != nil { if err := h.resetState(); err != nil {
return handshake{}, err return handshake{}, err
@@ -93,6 +93,25 @@ func newHandshake(ep *endpoint, rcvWnd seqnum.Size) (handshake, *tcpip.Error) {
return h, nil return h, nil
} }
// FindWndScale determines the window scale to use for the given maximum window
// size.
// 因为窗口的大小不能超过序列号范围的一半即窗口最大2^30,
// so (2^16)*(2^maxWnsScale) < 2^30,get maxWnsScale = 14
func FindWndScale(wnd seqnum.Size) int {
if wnd < 0x10000 {
return 0
}
max := seqnum.Size(0xffff)
s := 0
for wnd > max && s < header.MaxWndScale {
s++
max <<= 1
}
return s
}
func (h *handshake) resetState() *tcpip.Error { func (h *handshake) resetState() *tcpip.Error {
// 随机一个iss(对方将收到的序号) 防止黑客搞事 // 随机一个iss(对方将收到的序号) 防止黑客搞事
b := make([]byte, 4) b := make([]byte, 4)
@@ -301,7 +320,7 @@ func (h *handshake) synRcvdState(s *segment) *tcpip.Error {
func (h *handshake) handleSegment(s *segment) *tcpip.Error { func (h *handshake) handleSegment(s *segment) *tcpip.Error {
h.sndWnd = s.window h.sndWnd = s.window
if !s.flagIsSet(flagSyn) && h.sndWndScale > 0 { if !s.flagIsSet(flagSyn) && h.sndWndScale > 0 {
h.sndWnd <<= uint8(h.sndWndScale) h.sndWnd <<= uint8(h.sndWndScale) // 收紧窗口
} }
switch h.state { switch h.state {
@@ -619,13 +638,13 @@ func (e *endpoint) sendRaw(data buffer.VectorisedView, flags byte, seq, ack seqn
func (e *endpoint) handleWrite() *tcpip.Error { func (e *endpoint) handleWrite() *tcpip.Error {
e.sndBufMu.Lock() e.sndBufMu.Lock()
// 得到第一个tcp段 // 得到第一个tcp段 注意并不是取出只是查看
first := e.sndQueue.Front() first := e.sndQueue.Front()
if first != nil { if first != nil {
// 向发送链表添加元素 // 向发送链表添加元素
e.snd.writeList.PushBackList(&e.sndQueue) e.snd.writeList.PushBackList(&e.sndQueue)
// NOTE 更新发送队列下一个发送字节的序号 一次性将链表全部取用 // NOTE 更新发送队列下一个发送字节的序号 一次性将链表全部取用
// 当有新的数据需要发送时会有相逻辑更新这个数值 // 当有新的数据需要发送时会有相逻辑更新这个数值
e.snd.sndNxtList.UpdateForward(e.sndBufInQueue) e.snd.sndNxtList.UpdateForward(e.sndBufInQueue)
e.sndBufInQueue = 0 e.sndBufInQueue = 0
} }
@@ -684,7 +703,6 @@ func (e *endpoint) handleSegments() *tcpip.Error {
// Patch the window size in the segment according to the // Patch the window size in the segment according to the
// send window scale. // send window scale.
s.window <<= e.snd.sndWndScale s.window <<= e.snd.sndWndScale
// If the timestamp option is negotiated and the segment // If the timestamp option is negotiated and the segment
// does not carry a timestamp option then the segment // does not carry a timestamp option then the segment
// must be dropped as per // must be dropped as per
@@ -713,8 +731,8 @@ func (e *endpoint) handleSegments() *tcpip.Error {
// tcp可靠性累积确认 // tcp可靠性累积确认
// 如果发送的最大ack不等于下一个接收的序列号发送ack // 如果发送的最大ack不等于下一个接收的序列号发送ack
log.Println("============", e.rcv.rcvNxt, e.snd.maxSentAck, "=============")
if e.rcv.rcvNxt != e.snd.maxSentAck { if e.rcv.rcvNxt != e.snd.maxSentAck {
fmt.Printf("\n\n=======ACK=======%d=======ACK======\n\n", e.rcv.rcvNxt-e.snd.maxSentAck)
e.snd.sendAck() e.snd.sendAck()
} }

View File

@@ -818,6 +818,19 @@ func (e *endpoint) HandleControlPacket(id stack.TransportEndpointID, typ stack.C
} }
// 当收到ack确认时 需要更新发送确认缓冲占用
func (e *endpoint) updateSndBufferUsage(v int) {
e.sndBufMu.Lock()
notify := e.sndBufUsed >= e.sndBufSize>>1
e.sndBufUsed -= v
notify = notify && e.sndBufUsed < e.sndBufSize>>1
e.sndBufMu.Unlock()
if notify { // 如果缓存中剩余的数据过多是不需要补充的
log.Fatal("缓存中剩余的数据", e.sndBufUsed, notify)
e.waiterQueue.Notify(waiter.EventOut)
}
}
func (e *endpoint) readyToRead(s *segment) { func (e *endpoint) readyToRead(s *segment) {
e.rcvListMu.Lock() e.rcvListMu.Lock()
if s != nil { if s != nil {
@@ -840,14 +853,11 @@ func (e *endpoint) receiveBufferAvailable() int {
size := e.rcvBufSize size := e.rcvBufSize
used := e.rcvBufUsed used := e.rcvBufUsed
e.rcvListMu.Unlock() e.rcvListMu.Unlock()
// We may use more bytes than the buffer size when the receive buffer // We may use more bytes than the buffer size when the receive buffer
// shrinks. // shrinks.
if used >= size { if used >= size {
return 0 return 0
} }
log.Println("Init Recv Windeow Size: ", size-used)
return size - used return size - used
} }

View File

@@ -58,6 +58,7 @@ func (r *receiver) getSendParams() (rcvNxt seqnum.Value, rcvWnd seqnum.Size) {
r.rcvAcc = acc r.rcvAcc = acc
} }
log.Println("-------------", n, acc, r.rcvWndScale)
return r.rcvNxt, r.rcvNxt.Size(r.rcvAcc) >> r.rcvWndScale return r.rcvNxt, r.rcvNxt.Size(r.rcvAcc) >> r.rcvWndScale
} }

View File

@@ -114,6 +114,20 @@ func (s *segment) incRef() {
atomic.AddInt32(&s.refCnt, 1) atomic.AddInt32(&s.refCnt, 1)
} }
// logicalLen is the segment length in the sequence number space. It's defined
// as the data length plus one for each of the SYN and FIN bits set.
// 计算tcp段的逻辑长度包括负载数据的长度如果有控制标记需要加1
func (s *segment) logicalLen() seqnum.Size {
l := seqnum.Size(s.data.Size())
if s.flagIsSet(flagSyn) {
l++
}
if s.flagIsSet(flagFin) {
l++
}
return l
}
func (s *segment) parse() bool { func (s *segment) parse() bool {
h := header.TCP(s.data.First()) h := header.TCP(s.data.First())
offset := int(h.DataOffset()) offset := int(h.DataOffset())

View File

@@ -9,6 +9,7 @@ import (
"netstack/tcpip/seqnum" "netstack/tcpip/seqnum"
"sync" "sync"
"time" "time"
"unsafe"
) )
const ( const (
@@ -253,6 +254,7 @@ func (s *sender) sendSegment(data buffer.VectorisedView, flags byte, seq seqnum.
// Remember the max sent ack. // Remember the max sent ack.
s.maxSentAck = rcvNxt s.maxSentAck = rcvNxt
log.Println(s.ep.id.LocalPort, "要求扩展窗口", s.sndWnd)
return s.ep.sendRaw(data, flags, seq, rcvNxt, rcvWnd) return s.ep.sendRaw(data, flags, seq, rcvNxt, rcvWnd)
} }
@@ -262,6 +264,62 @@ func (s *sender) handleRcvdSegment(seg *segment) {
// 因此发送更多数据。如果需要,这也将重新启用重传计时器。 // 因此发送更多数据。如果需要,这也将重新启用重传计时器。
// 存放当前窗口大小。 // 存放当前窗口大小。
s.sndWnd = seg.window s.sndWnd = seg.window
log.Println(s.ep.id.LocalPort, "移动窗口", s.sndWnd)
// 获取确认号
ack := seg.ackNumber
// 如果ack在最小未确认的seq和segNext之间
if (ack - 1).InRange(s.sndUna, s.sndNxt) {
log.Printf("[...XXXXXX]-[%d|\t%d\t|%d]==>", s.sndNxt, ack-1, s.sndUna)
if s.ep.sendTSOk && seg.parsedOptions.TSEcr != 0 {
// TSVal/Ecr values sent by Netstack are at a millisecond
// granularity.
//elapsed := time.Duration(s.ep.timestamp()-seg.parsedOptions.TSEcr) * time.Millisecond
//s.updateRTO(elapsed)
}
// 获取这次确认的字节数,即 ack - snaUna
acked := s.sndUna.Size(ack)
// 更新下一个未确认的序列号
s.sndUna = ack
ackLeft := acked
//originalOutstanding := s.outstanding
// 从发送链表中删除已经确认的数据,发送窗口的滑动。
//log.Printf("[...XXXXXX]-[%d|\t\t|%d]==>", s.sndNxt, s.sndUna)
for ackLeft > 0 { // 有成功确认的数据 丢弃它们 有剩余数据的话继续发送(根据拥塞策略控制)
seg := s.writeList.Front()
datalen := seg.logicalLen()
if datalen > ackLeft {
seg.data.TrimFront(int(ackLeft))
break
}
log.Println(s.writeNext == seg)
if s.writeNext == seg {
log.Fatal("更新 下一段")
s.writeNext = seg.Next()
}
// 从发送链表中删除已确认的tcp段。
s.writeList.Remove(seg)
// 因为有一个tcp段确认了所以 outstanding 减1
s.outstanding--
seg.decRef()
ackLeft -= datalen
}
// 当收到ack确认时需要更新发送缓冲占用
s.ep.updateSndBufferUsage(int(acked))
// 如果发生超时重传时s.outstanding可能会降到零以下
// 重置为零但后来得到一个覆盖先前发送数据的确认。
if s.outstanding < 0 {
s.outstanding = 0
}
}
// TODO tcp拥塞控制
if s.writeList.Front() != nil {
log.Println("确认成功 继续发送")
}
s.sendData() s.sendData()
} }
@@ -297,6 +355,7 @@ func (s *sender) sendData() {
panic("Netstack queues FIN segments without data.") panic("Netstack queues FIN segments without data.")
} }
if !seg.sequenceNumber.LessThan(end) { if !seg.sequenceNumber.LessThan(end) {
log.Println("暂停数据发送", seg.sequenceNumber, end)
break break
} }
@@ -309,18 +368,19 @@ func (s *sender) sendData() {
// 如果seg的payload字节数大于available // 如果seg的payload字节数大于available
// 将seg进行分段并且插入到该seg的后面 // 将seg进行分段并且插入到该seg的后面
if seg.data.Size() > available { if seg.data.Size() > available {
log.Println("-------------------------------------分段!!!", seg.data.Size(), available, end)
nSeg := seg.clone() nSeg := seg.clone()
nSeg.data.TrimFront(available)
nSeg.sequenceNumber.UpdateForward(seqnum.Size(available)) nSeg.sequenceNumber.UpdateForward(seqnum.Size(available))
s.writeList.InsertAfter(seg, nSeg) s.writeList.InsertAfter(seg, nSeg)
seg.data.CapLength(available) seg.data.CapLength(available)
} }
s.outstanding++ s.outstanding++
log.Println("发送窗口一开始是", s.sndWnd,
"最多发送数据", available, dataSent,
"发送端缓存包数量", s.outstanding)
segEnd = seg.sequenceNumber.Add(seqnum.Size(seg.data.Size())) segEnd = seg.sequenceNumber.Add(seqnum.Size(seg.data.Size()))
log.Println("发送窗口一开始是", s.sndWnd,
"最多发送数据", available,
"缓存数据尾", segEnd,
"发送端缓存包数量", s.outstanding)
} }
if !dataSent { // 上面有个break能跳过这一步 if !dataSent { // 上面有个break能跳过这一步
@@ -331,12 +391,18 @@ func (s *sender) sendData() {
s.sendSegment(seg.data, seg.flags, seg.sequenceNumber) s.sendSegment(seg.data, seg.flags, seg.sequenceNumber)
// 发送一个数据段后更新sndNxt // 发送一个数据段后更新sndNxt
if s.sndNxt.LessThan(segEnd) { if s.sndNxt.LessThan(segEnd) {
log.Println("更新sndNxt", s.sndNxt, segEnd)
s.sndNxt = segEnd s.sndNxt = segEnd
} }
} }
// Remember the next segment we'll write. // Remember the next segment we'll write.
s.writeNext = seg s.writeNext = seg
if seg != nil {
log.Println("-------------------------------------分段!!!", s.writeNext.data.Size())
log.Println(unsafe.Pointer(seg), seg.data.Size())
}
time.Sleep(200 * time.Millisecond)
// TODO 启动定时器 // TODO 启动定时器
} }