FIXME!!! Why does the client repeatedly send fin|ack?

impact-eintr
2022-12-12 20:33:50 +08:00
parent 0eff0e912f
commit 07fb40bb6a
9 changed files with 360 additions and 25 deletions

View File

@@ -136,20 +136,24 @@ func main() {
go func() { // echo server
listener := tcpListen(s, proto, addr, localPort)
done <- struct{}{}
conn, err := listener.Accept()
if err != nil {
log.Println(err)
}
for {
buf := make([]byte, 1024)
if _, err := conn.Read(buf); err != nil {
conn, err := listener.Accept()
if err != nil {
log.Println(err)
break
}
fmt.Println(string(buf))
// conn.Write([]byte("Server echo"))
//}
log.Println("服务端 建立连接")
for {
buf := make([]byte, 1024)
if _, err := conn.Read(buf); err != nil {
log.Println(err)
break
}
fmt.Println(string(buf))
// conn.Write([]byte("Server echo"))
//}
}
}
os.Exit(1)
@@ -159,10 +163,12 @@ func main() {
<-done
go func() {
port := localPort
_, err := Dial(s, header.IPv4ProtocolNumber, addr, port)
conn, err := Dial(s, header.IPv4ProtocolNumber, addr, port)
if err != nil {
log.Fatal(err)
}
log.Println("客户端 建立连接")
conn.Close()
}()
close(done)
@@ -180,6 +186,7 @@ func Dial(s *stack.Stack, proto tcpip.NetworkProtocolNumber, addr tcpip.Address,
}
var wq waiter.Queue
waitEntry, notifyCh := waiter.NewChannelEntry(nil)
wq.EventRegister(&waitEntry, waiter.EventOut)
// Create a new tcp endpoint
ep, err := s.NewEndpoint(tcp.ProtocolNumber, proto, &wq)
if err != nil {
@@ -189,6 +196,7 @@ func Dial(s *stack.Stack, proto tcpip.NetworkProtocolNumber, addr tcpip.Address,
if err != nil {
if err == tcpip.ErrConnectStarted {
<-notifyCh
log.Println("???")
} else {
return nil, fmt.Errorf("%s", err.String())
}
@@ -245,6 +253,11 @@ func (conn *TcpConn) Write(snd []byte) error {
}
}
func (conn *TcpConn) Close() {
log.Println("Close")
conn.ep.Close()
}
// Listener is a tcp connection listener
type Listener struct {
raddr tcpip.FullAddress

View File

@@ -1,6 +1,7 @@
package logger
import (
"log"
"sync"
)
@@ -50,3 +51,19 @@ func (l *logger) Info(mask uint8, f func()) {
f()
}
}
func (l *logger) info(f func()) {
f()
}
func TODO(msg string) {
GetInstance().info(func() {
log.Println("TODO: " + msg)
})
}
func FIXME(msg string) {
GetInstance().info(func() {
log.Fatal("FIXME: " + msg)
})
}
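For orientation, the helpers above are used throughout the rest of this commit. A minimal usage sketch (assuming the netstack/logger import path used elsewhere in this repo); note that FIXME terminates the process because it goes through log.Fatal, while TODO only logs and continues:

```go
package main

import "netstack/logger"

func main() {
	// Prints "TODO: flow control" and keeps running.
	logger.TODO("flow control")

	// Prints "FIXME: server-side close" and then exits,
	// since log.Fatal calls os.Exit(1).
	logger.FIXME("server-side close")
}
```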

View File

@@ -234,7 +234,6 @@ func (s *Sleeper) Fetch(block bool) (id int, ok bool) {
old := (*Sleeper)(atomic.SwapPointer(&w.s, usleeper(s)))
//log.Println("Sleeper", unsafe.Pointer(s), "old", unsafe.Pointer(old), "&assertSleeper", unsafe.Pointer(&assertedSleeper), "w.id", w.id)
if old == &assertedSleeper {
//log.Println("成功返回 没有阻塞")
return w.id, true
}
}

View File

@@ -26,7 +26,7 @@ func (v Value) InRange(a, b Value) bool {
}
// InWindow checks whether v is in [first, first+size)
func (v Value) InWindows(first Value, size Size) bool {
func (v Value) InWindow(first Value, size Size) bool {
return v.InRange(first, first.Add(size))
}
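For reference, a self-contained sketch of the wraparound semantics behind InWindow. The body of InRange is not shown in this hunk, so the modular-arithmetic implementation below is an assumption consistent with the comment "check v in [first, first+size)":

```go
package main

import "fmt"

// Stand-ins for seqnum.Value and seqnum.Size.
type Value uint32
type Size uint32

// Add returns v + s, wrapping around at 2^32.
func (v Value) Add(s Size) Value { return v + Value(s) }

// InRange checks v in [a, b) modulo 2^32 (assumed implementation).
func (v Value) InRange(a, b Value) bool { return v-a < b-a }

// InWindow checks v in [first, first+size), as renamed in this commit.
func (v Value) InWindow(first Value, size Size) bool {
	return v.InRange(first, first.Add(size))
}

func main() {
	first := Value(1<<32 - 6) // window starting just below the 2^32 wraparound
	fmt.Println(Value(3).InWindow(first, 10))  // true: 3 lies inside the wrapped window
	fmt.Println(Value(20).InWindow(first, 10)) // false: outside the window
}
```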

View File

TCP originally defined only one option: the maximum segment size, MSS (Maximum Segment Size).
a. If neither side has data to send at this point, host B retransmits periodically on timeout until it receives A's acknowledgment; once it does, host B's TCP connection is also in the Established state and packets can flow in both directions.
b. If host A does have data to send, host B receives host A's Data + ACK, naturally switches to the Established state, and accepts host A's data.
## Releasing a TCP connection
![img](https://doc.shiyanlou.com/document-uid949121labid10418timestamp1555574060171.png)
(1) When data transfer ends, host A's application calls Close, asking its TCP to release the connection and send no more data. TCP notifies the peer that the connection from host A to host B is being released, setting the FIN bit in the header of the segment sent to host B to 1, with sequence number seq1 equal to the sequence number of the last data byte already sent plus 1.
(2) When host B's TCP receives the release request, it sends an acknowledgment numbered seq1+1 and notifies its application; the connection from host A to host B is now released and the connection is half-closed. Host B no longer accepts data from host A, but host B may still send data to A, and host A must still acknowledge any data it receives correctly.
(3) When host B finishes sending data to host A, its application should call Close to release the TCP connection. The release segment sent by host B must set the FIN bit to 1, use sequence number seq2 equal to the sequence number of the last data byte it has already sent plus 1, and must also carry ACK=seq1+1.
(4) Host A acknowledges host B's release segment, setting the ACK bit to 1 with ack=seq2+1 and seq=seq1+1. Only then is the reverse connection from B to A released, and host A's TCP reports to its application that the whole connection has been released.
Note also that a FIN packet, like a data packet, is retransmitted if it is lost; in practice either the FIN or its ACK may be lost. The retransmission interval is governed by the RTO.
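To make the numbering rules above concrete, a small illustrative sketch with hypothetical byte counts (not code from this repo): each FIN consumes one sequence number, and each acknowledgment is the peer's last consumed sequence number plus 1.

```go
package main

import "fmt"

func main() {
	// Hypothetical: A's last data byte carried sequence number 1000,
	// B's last data byte carried sequence number 2000.
	seq1 := uint32(1000 + 1) // A's FIN: last byte sent + 1
	seq2 := uint32(2000 + 1) // B's FIN: last byte sent + 1

	fmt.Printf("(1) A->B FIN     seq=%d\n", seq1)
	fmt.Printf("(2) B->A ACK     ack=%d\n", seq1+1) // the FIN consumed one sequence number
	fmt.Printf("(3) B->A FIN|ACK seq=%d ack=%d\n", seq2, seq1+1)
	fmt.Printf("(4) A->B ACK     seq=%d ack=%d\n", seq1+1, seq2+1)
}
```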

View File

@@ -11,8 +11,10 @@ import (
"netstack/tcpip/header"
"netstack/tcpip/seqnum"
"netstack/tcpip/stack"
"netstack/waiter"
"sync"
"time"
"unsafe"
)
const maxSegmentsPerWake = 100
@@ -108,6 +110,16 @@ func (h *handshake) resetState() *tcpip.Error {
return nil
}
// effectiveRcvWndScale returns the effective receive window scale to be used.
// If the peer doesn't support window scaling, the effective rcv wnd scale is
// zero; otherwise it's the value calculated based on the initial rcv wnd.
func (h *handshake) effectiveRcvWndScale() uint8 {
if h.sndWndScale < 0 {
return 0
}
return uint8(h.rcvWndScale)
}
// resetToSynRcvd resets the state of the handshake object to the SYN-RCVD
// state.
func (h *handshake) resetToSynRcvd(iss seqnum.Value, irs seqnum.Value, opts *header.TCPSynOptions) {
@@ -180,7 +192,74 @@ func (h *handshake) checkAck(s *segment) bool {
// synSentState handles the first handshake segment received by the client or the server.
// Normally, on the client this is a syn+ack segment, which is processed and answered with an ack segment to the server.
// On the server this is a syn segment, which should be answered with a syn+ack segment to the client, setting the state to handshakeSynRcvd.
// NOTE the reason the server implements sending syn|ack here again is to handle the abnormal case where an ordinary tcp connection receives a syn segment,
// e.g. the listener received a syn and sent a syn|ack the first time but got no reply from the client.
func (h *handshake) synSentState(s *segment) *tcpip.Error {
log.Println("客户端收到了 syn|ack segment")
// RFC 793, page 37, states that in the SYN-SENT state, a reset is
// acceptable if the ack field acknowledges the SYN.
if s.flagIsSet(flagRst) {
if s.flagIsSet(flagAck) && s.ackNumber == h.iss+1 {
return tcpip.ErrConnectionRefused
}
return nil
}
if !h.checkAck(s) {
return nil
}
// We are in the SYN-SENT state. We only care about segments that have
// the SYN flag.
if !s.flagIsSet(flagSyn) {
return nil
}
// Parse the SYN options.
rcvSynOpts := parseSynSegmentOptions(s)
// Remember if the Timestamp option was negotiated.
h.ep.maybeEnableTimestamp(&rcvSynOpts)
// Remember if the SACKPermitted option was negotiated.
h.ep.maybeEnableSACKPermitted(&rcvSynOpts)
// Remember the sequence we'll ack from now on.
h.ackNum = s.sequenceNumber + 1
h.flags |= flagAck
h.mss = rcvSynOpts.MSS
h.sndWndScale = rcvSynOpts.WS
// If this is a SYN ACK response, we only need to acknowledge the SYN
// and the handshake is completed.
// The client received a syn+ack segment.
if s.flagIsSet(flagAck) {
// The client handshake is complete; send an ack segment to the server.
h.state = handshakeCompleted
// It does not matter if this final ack segment is lost, because every later data packet carries an ack anyway.
h.ep.sendRaw(buffer.VectorisedView{}, flagAck, h.iss+1, h.ackNum, h.rcvWnd>>h.effectiveRcvWndScale())
return nil
}
// A SYN segment was received, but no ACK in it. We acknowledge the SYN
// but resend our own SYN and wait for it to be acknowledged in the
// SYN-RCVD state.
// The server received a syn segment; reply to the client with a syn+ack segment and set the state to handshakeSynRcvd.
h.state = handshakeSynRcvd
synOpts := header.TCPSynOptions{
WS: h.rcvWndScale,
TS: rcvSynOpts.TS,
TSVal: h.ep.timestamp(),
TSEcr: h.ep.recentTS,
// We only send SACKPermitted if the other side indicated it
// permits SACK. This is not explicitly defined in the RFC but
// this is the behaviour implemented by Linux.
SACKPermitted: rcvSynOpts.SACKPermitted,
}
// Send the syn+ack segment; if it is lost on the wire that is fine, the client will resend its syn segment.
sendSynTCP(&s.route, h.ep.id, h.flags, h.iss, h.ackNum, h.rcvWnd, synOpts)
return nil
}
@@ -190,6 +269,11 @@ func (h *handshake) synSentState(s *segment) *tcpip.Error {
func (h *handshake) synRcvdState(s *segment) *tcpip.Error {
if s.flagIsSet(flagRst) {
// TODO should return based on the window; write this once the window is understood
// RFC 793, page 37, states that in the SYN-RCVD state, a reset
// is acceptable if the sequence number is in the window.
if s.sequenceNumber.InWindow(h.ackNum, h.rcvWnd) {
return tcpip.ErrConnectionRefused
}
return nil
}
// Validate the ack segment
@@ -506,6 +590,7 @@ func (e *endpoint) sendRaw(data buffer.VectorisedView, flags byte, seq, ack seqn
// sackBlocks = e.sack.Blocks[:e.sack.NumBlocks]
//}
options := e.makeOptions(sackBlocks)
log.Println(unsafe.Pointer(e), "怎么又调用了一次 handleWrite!!!!!!!!!!!!!!!!!!!!!!!!!!!")
err := sendTCP(&e.route, e.id, data, e.route.DefaultTTL(), flags, seq, ack, rcvWnd, options)
putOptions(options)
return err
@@ -513,16 +598,50 @@ func (e *endpoint) sendRaw(data buffer.VectorisedView, flags byte, seq, ack seqn
// Take data from the send queue and send it out
func (e *endpoint) handleWrite() *tcpip.Error {
e.sndBufMu.Lock()
// Get the first tcp segment
first := e.sndQueue.Front()
if first != nil {
// Append the elements to the send list
e.snd.writeList.PushBackList(&e.sndQueue)
// NOTE update the sequence number of the next byte to send on the send queue; the whole list is consumed in one go
// When new data needs to be sent, the relevant logic updates this value
e.snd.sndNxtList.UpdateForward(e.sndBufInQueue)
e.sndBufInQueue = 0
}
e.sndBufMu.Unlock()
// Initialize the next segment to write if it's currently nil.
// Initialize the head of the sender's write list
if e.snd.writeNext == nil {
e.snd.writeNext = first
}
// Push out any new packets.
// Send the data out
e.snd.sendData()
return nil
}
// Handles connection close; ultimately calls sendData to send the fin packet
func (e *endpoint) handleClose() *tcpip.Error {
// Drain the send queue.
e.handleWrite()
// Mark send side as closed.
// Mark the sender as closed
e.snd.closed = true
return nil
}
// handleSegments takes tcp segments off the queue and then processes them.
func (e *endpoint) handleSegments() *tcpip.Error {
log.Println(unsafe.Pointer(e), "处理报文")
checkRequeue := true
for i := 0; i < maxSegmentsPerWake; i++ {
s := e.segmentQueue.dequeue()
@@ -561,6 +680,16 @@ func (e *endpoint) handleSegments() *tcpip.Error {
func (e *endpoint) protocolMainLoop(handshake bool) *tcpip.Error {
// Wrap-up work
// A bit of wrap-up work
epilogue := func() {
// e.mu is expected to be held upon entering this section.
// TODO to be added
e.mu.Unlock()
// When the protocol loop exits we should wake up our waiters.
e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut)
}
// Handle the three-way handshake
if handshake {
@@ -572,8 +701,36 @@ func (e *endpoint) protocolMainLoop(handshake bool) *tcpip.Error {
// Perform the handshake
err = h.execute()
}
// Handle a handshake error
if err != nil {
e.lastErrorMu.Lock()
e.lastError = err
e.lastErrorMu.Unlock()
e.mu.Lock()
e.state = stateError
e.hardError = err
// Lock released below.
epilogue()
return err
}
// Reaching this point means the three-way handshake succeeded, so initialize the sender and receiver
e.snd = newSender(e, h.iss, h.ackNum-1, h.sndWnd, h.mss, h.sndWndScale)
e.rcvListMu.Lock()
e.rcv = newReceiver(e, h.ackNum-1, h.rcvWnd, h.effectiveRcvWndScale())
e.rcvListMu.Unlock()
}
e.mu.Lock()
e.state = stateConnected
// TODO drained
e.mu.Unlock()
// Notify the Dial function that the connection has been established
e.waiterQueue.Notify(waiter.EventOut)
// Set up the functions that will be called when the main protocol loop
// wakes up.
// Events for the wakers; these functions are important
@@ -601,11 +758,26 @@ func (e *endpoint) protocolMainLoop(handshake bool) *tcpip.Error {
s.AddWaker(funcs[i].w, i)
}
// A restored endpoint needs the assertions and notifications below. A newly created endpoint has empty state and should not trigger any of them.
e.segmentQueue.mu.Lock()
if !e.segmentQueue.list.Empty() {
e.newSegmentWaker.Assert()
}
e.segmentQueue.mu.Unlock()
e.rcvListMu.Lock()
if !e.rcvList.Empty() {
e.waiterQueue.Notify(waiter.EventIn)
}
e.rcvListMu.Unlock()
// TODO workerCleanup needs to be added
// Main loop: process tcp segments
// For this main loop to exit, i.e. for the tcp connection to close completely, three conditions must hold at once:
// (1) the receiver is closed; (2) the sender is closed; (3) the next unacknowledged sequence number equals the sequence number of the next segment to be added to the send list
//for !e.rcv.closed || !e.snd.closed || e.snd.sndUna != e.snd.sndNxtList {
for {
for !e.rcv.closed /*TODO other conditions*/ {
e.workMu.Unlock()
// s.Fetch returns the event index; e.g. if v=0,
// funcs[v].f() calls e.handleWrite
@@ -616,9 +788,19 @@ func (e *endpoint) protocolMainLoop(handshake bool) *tcpip.Error {
e.mu.Lock()
//e.resetConnectionLocked(err)
// Lock released below.
//epilogue()
epilogue()
log.Println(err)
return nil
}
}
// Mark endpoint as closed.
e.mu.Lock()
if e.state != stateError {
e.state = stateClosed
}
// Lock released below.
epilogue()
return nil
}

View File

@@ -41,6 +41,11 @@ type endpoint struct {
netProto tcpip.NetworkProtocolNumber // network protocol number: ipv4 or ipv6
waiterQueue *waiter.Queue // event-driven mechanism
// lastError represents the last error that the endpoint reported;
// access to it is protected by the following mutex.
lastErrorMu sync.Mutex
lastError *tcpip.Error
// TODO to be added
// rcvListMu can be taken after the endpoint mu below.
@@ -88,6 +93,9 @@ type endpoint struct {
// TSVal field in the timestamp option.
tsOffset uint32
// shutdownFlags represent the current shutdown state of the endpoint.
shutdownFlags tcpip.ShutdownFlags
// sackPermitted is set to true if the peer sends the TCPSACKPermitted
// option in the SYN/SYN-ACK.
sackPermitted bool
@@ -146,7 +154,25 @@ func newEndpoint(stack *stack.Stack, netProto tcpip.NetworkProtocolNumber, waite
}
func (e *endpoint) Close() {
log.Println("TODO 在写了 在写了")
e.Shutdown(tcpip.ShutdownWrite | tcpip.ShutdownRead)
e.mu.Lock()
// We always release ports inline so that they are immediately available
// for reuse after Close() is called. If also registered, it means this
// is a listening socket, so we must unregister as well otherwise the
// next user would fail in Listen() when trying to register.
if e.isPortReserved {
e.stack.ReleasePort(e.effectiveNetProtos, ProtocolNumber, e.id.LocalAddress, e.id.LocalPort)
e.isPortReserved = false
if e.isRegistered {
e.stack.UnregisterTransportEndpoint(e.boundNICID, e.effectiveNetProtos, ProtocolNumber, e.id)
e.isRegistered = false
}
}
logger.TODO("添加清理资源的逻辑")
e.mu.Unlock()
}
// Read reads data from the tcp receive queue
@@ -407,6 +433,46 @@ func (e *endpoint) connect(addr tcpip.FullAddress, handshake bool, run bool) (er
}
func (e *endpoint) Shutdown(flags tcpip.ShutdownFlags) *tcpip.Error {
e.mu.Lock()
defer e.mu.Unlock()
e.shutdownFlags |= flags
switch e.state {
case stateConnected: // client-side close
// Reads cannot simply be shut down, because the four-way teardown still needs to read segments while the connection closes.
if (e.shutdownFlags & tcpip.ShutdownWrite) != 0 {
e.rcvListMu.Lock()
rcvBufUsed := e.rcvBufUsed
e.rcvListMu.Unlock()
if rcvBufUsed > 0 {
// If the receive queue still holds data, notify the peer with a RESET
logger.TODO("notify the peer with a RESET")
return nil
}
}
e.sndBufMu.Lock()
if e.sndClosed {
// Already closed.
e.sndBufMu.Unlock()
break
}
// Queue fin segment.
s := newSegmentFromView(&e.route, e.id, nil)
e.sndQueue.PushBack(s)
e.sndBufInQueue++ // occupies only a single byte slot
// Mark endpoint as closed.
e.sndClosed = true
e.sndBufMu.Unlock()
// Trigger a call to handleClose
e.sndCloseWaker.Assert()
case stateListen: // server-side close
logger.FIXME("add server-side close logic")
default:
return tcpip.ErrNotConnected
}
return nil
}
@@ -640,11 +706,11 @@ func (e *endpoint) HandlePacket(r *stack.Route, id stack.TransportEndpointID, vv
}
// Send packet to worker goroutine.
if e.segmentQueue.enqueue(s) {
var prifix string = "tcp connection"
var prefix string = "tcp connection"
if _, err := e.GetRemoteAddress(); err != nil {
prifix = "listener"
prefix = "listener"
}
log.Printf(prifix+"received tcp [%s] segment from %s, seq: %d, ack: |%d|",
log.Printf(prefix+"received tcp [%s] segment from %s, seq: %d, ack: |%d|",
flagString(s.flags), fmt.Sprintf("%s:%d", s.id.RemoteAddress, s.id.RemotePort),
s.sequenceNumber, s.ackNumber)

View File

@@ -52,11 +52,12 @@ func (r *receiver) getSendParams() (rcvNxt seqnum.Value, rcvWnd seqnum.Size) {
return r.rcvNxt, r.rcvNxt.Size(r.rcvAcc) >> r.rcvWndScale
}
// FIXME prime suspect: why did the client send another fin|ack ??????????????????????????????????
func (r *receiver) consumeSegment(s *segment, segSeq seqnum.Value, segLen seqnum.Size) bool {
if segLen > 0 {
// The sequence range we expect to receive should satisfy seqStart <= rcvNxt < seqEnd
// If it falls outside this range we are missing segments; return false, meaning it cannot be consumed right away
if !r.rcvNxt.InWindows(segSeq, segLen) {
if !r.rcvNxt.InWindow(segSeq, segLen) {
return false
}
// Try to trim data that has already been acknowledged

View File

@@ -1,7 +1,7 @@
package tcp
import (
"log"
"netstack/logger"
"netstack/tcpip"
"netstack/tcpip/buffer"
"netstack/tcpip/seqnum"
@@ -48,9 +48,19 @@ type sender struct {
// sndNxt is the sequence number of the next segment to be sent.
sndNxt seqnum.Value
// sndNxtList is the sequence number of the next segment to be added to
// the send list.
sndNxtList seqnum.Value
// maxSentAck is the maximum acknowledgement actually sent.
maxSentAck seqnum.Value
closed bool
writeNext *segment
// send list
writeList segmentList
// cc is the congestion control algorithm in use for this sender.
cc congestionControl
@@ -67,9 +77,8 @@ func newSender(ep *endpoint, iss, irs seqnum.Value, sndWnd seqnum.Size, mss uint
}
func (s *sender) sendAck() {
log.Println("发送字节序", s.sndNxt)
s.sendSegment(buffer.VectorisedView{}, flagAck, s.sndNxt) // seq = cookies+1 ack ack|fin.seq+1
s.sendSegment(buffer.VectorisedView{}, flagFin, 0)
logger.TODO("发送字节序")
}
// sendSegment sends a new segment containing the given payload, flags and
@@ -98,5 +107,39 @@ func (s *sender) handleRcvdSegment(seg *segment) {
// Send data segments; ultimately calls sendSegment to do the sending
func (s *sender) sendData() {
//log.Println(unsafe.Pointer(s.ep), "why was this called again")
var seg *segment
// Walk the write list and send the data
// tcp congestion control: s.outstanding < s.sndCwnd ensures the amount of in-flight data does not exceed the congestion window.
for seg = s.writeNext; seg != nil; /*&& s.outstanding < s.sndCwnd*/ seg = seg.Next() {
// If seg's flags are 0, change them to psh|ack
if seg.flags == 0 {
seg.sequenceNumber = s.sndNxt
seg.flags = flagAck | flagPsh
}
var segEnd seqnum.Value
if seg.data.Size() == 0 { // a segment with no payload means the connection is being closed
if s.writeList.Back() != seg {
panic("FIN segments must be the final segment in the write list.")
}
// Send the fin segment
seg.flags = flagAck | flagFin
// A fin segment must be acknowledged and consumes one sequence number
segEnd = seg.sequenceNumber.Add(1)
} else {
// We're sending a non-FIN segment.
if seg.flags&flagFin != 0 {
panic("Netstack queues FIN segments without data.")
}
logger.TODO("发送正常的数据, 需要流量控制")
}
s.sendSegment(seg.data, seg.flags, seg.sequenceNumber)
// After sending a segment, update sndNxt
if s.sndNxt.LessThan(segEnd) {
s.sndNxt = segEnd
}
}
}