diff --git a/.vscode/c_cpp_properties.json b/.vscode/c_cpp_properties.json new file mode 100644 index 0000000..f6d7322 --- /dev/null +++ b/.vscode/c_cpp_properties.json @@ -0,0 +1,17 @@ +{ + "configurations": [ + { + "name": "Win32", + "includePath": [ + "${workspaceFolder}/**" + ], + "defines": [ + "_DEBUG", + "UNICODE", + "_UNICODE" + ], + "intelliSenseMode": "windows-msvc-x64" + } + ], + "version": 4 +} \ No newline at end of file diff --git a/cmd/tcpclient/main.go b/cmd/tcpclient/main.go index 0e78301..fe3ccab 100644 --- a/cmd/tcpclient/main.go +++ b/cmd/tcpclient/main.go @@ -3,12 +3,22 @@ package main import ( "fmt" "net" + "time" ) func main() { - _, err := net.Dial("tcp", "192.168.1.1:9999") - if err != nil { - fmt.Println("err : ", err) + go func() { + _, err := net.Dial("tcp", "192.168.1.1:9999") + if err != nil { + fmt.Println("err : ", err) + return + } + }() + + t := time.NewTimer(500 * time.Millisecond) + select { + case <-t.C: return } + } diff --git a/sleep/sleep_unsafe.go b/sleep/sleep_unsafe.go index dabbcce..0aca751 100644 --- a/sleep/sleep_unsafe.go +++ b/sleep/sleep_unsafe.go @@ -149,10 +149,14 @@ func (s *Sleeper) AddWaker(w *Waker, id int) { // nextWaker returns the next waker in the notification list, blocking if // needed. +// listenLoop的Sleeper尝试改变自己的调度 如果没有人等着催 就睡 +// 设置了block的话 尝试去获取本地列表中的唤醒者 并取出最前面的唤醒者 +// 要是本地列表为空 就去遍历共享列表 func (s *Sleeper) nextWaker(block bool) *Waker { // Attempt to replenish the local list if it's currently empty. if s.localList == nil { - for atomic.LoadPointer(&s.sharedList) == nil { + // 首次进入循环 检查共享链表为空 可能需要睡 + for atomic.LoadPointer(&s.sharedList) == nil { // 被唤醒 再次检查共享链表情况 很有可能有人催了 // Fail request if caller requested that we // don't block. if !block { @@ -163,13 +167,13 @@ func (s *Sleeper) nextWaker(block bool) *Waker { // this allows them to abort the wait by setting // waitingG back to zero (which we'll notice // before committing the sleep). - atomic.StoreUintptr(&s.waitingG, preparingG) + atomic.StoreUintptr(&s.waitingG, preparingG) // 准备睡 // Check if something was queued while we were // preparing to sleep. We need this interleaving // to avoid missing wake ups. - if atomic.LoadPointer(&s.sharedList) != nil { - atomic.StoreUintptr(&s.waitingG, 0) + if atomic.LoadPointer(&s.sharedList) != nil { // 有人催了 不睡了 + atomic.StoreUintptr(&s.waitingG, 0) // 放弃睡眠 break } @@ -180,16 +184,20 @@ func (s *Sleeper) nextWaker(block bool) *Waker { // commitSleep to decide whether to immediately // wake the caller up or to leave it sleeping. const traceEvGoBlockSelect = 24 + log.Println(atomic.LoadUintptr(&s.waitingG), "进入睡眠") + // 没人催 进入睡眠 gopark(commitSleep, &s.waitingG, "sleeper", traceEvGoBlockSelect, 0) } + log.Println(atomic.LoadPointer(&s.sharedList), "唤醒了", atomic.LoadUintptr(&s.waitingG)) // Pull the shared list out and reverse it in the local // list. Given that wakers push themselves in reverse // order, we fix things here. - v := (*Waker)(atomic.SwapPointer(&s.sharedList, nil)) - for v != nil { + // 将共享列表拉出来,在本地列表中反转。鉴于唤醒者以相反的顺序推动自己,我们在这里解决问题 + v := (*Waker)(atomic.SwapPointer(&s.sharedList, nil)) // 这里将唤醒者置空 + for v != nil { // 共享链表有东西 找到最后一位 cur := v - v = v.next + v = v.next // 向后遍历共享链表 cur.next = s.localList s.localList = cur @@ -216,6 +224,7 @@ func (s *Sleeper) nextWaker(block bool) *Waker { // allowed to call this method. func (s *Sleeper) Fetch(block bool) (id int, ok bool) { for { + // 这个s是ListenLoop的Sleeper w := s.nextWaker(block) // 如果没有 将暂停调度 call gopark if w == nil { return -1, false @@ -283,11 +292,12 @@ func (s *Sleeper) Done() { // enqueueAssertedWaker enqueues an asserted waker to the "ready" circular list // of wakers that want to notify the sleeper. -func (s *Sleeper) enqueueAssertedWaker(w *Waker) { +func (s *Sleeper) enqueueAssertedWaker(w *Waker) { // w.s 是 assertSleeper // Add the new waker to the front of the list. for { v := (*Waker)(atomic.LoadPointer(&s.sharedList)) w.next = v + // 每个新连接的 newSegmentWaker.s.sharedList 初始化为 正在睡眠的 listenLoop 的waker if atomic.CompareAndSwapPointer(&s.sharedList, uwaker(v), uwaker(w)) { break } @@ -296,7 +306,8 @@ func (s *Sleeper) enqueueAssertedWaker(w *Waker) { for { // Nothing to do if there isn't a G waiting. g := atomic.LoadUintptr(&s.waitingG) - if g == 0 { + log.Println("tcp端 所在的G: ", g) + if g == 0 { // 0 表示 醒着 return } @@ -304,6 +315,7 @@ func (s *Sleeper) enqueueAssertedWaker(w *Waker) { if atomic.CompareAndSwapUintptr(&s.waitingG, g, 0) { if g != preparingG { // We managed to get a G. Wake it up. + log.Println(g, "去唤醒 0") goready(g, 0) } } @@ -347,21 +359,23 @@ type Waker struct { // Assert moves the waker to an asserted state, if it isn't asserted yet. When // asserted, the waker will cause its matching sleeper to wake up. func (w *Waker) Assert() { - log.Println("Assert...") + log.Println(unsafe.Pointer(w), "Assert", &assertedSleeper) // Nothing to do if the waker is already asserted. This check allows us // to complete this case (already asserted) without any interlocked // operations on x86. if atomic.LoadPointer(&w.s) == usleeper(&assertedSleeper) { + log.Println("已经叫过了") return } + log.Println("Asserting ...") // Mark the waker as asserted, and wake up a sleeper if there is one. switch s := (*Sleeper)(atomic.SwapPointer(&w.s, usleeper(&assertedSleeper))); s { case nil: case &assertedSleeper: - default: - log.Println("唤醒", s) - s.enqueueAssertedWaker(w) // call goready + default: // 初始态 + log.Println("唤醒", s.waitingG) + s.enqueueAssertedWaker(w) // s 是tcp端的newSegmentWaker call goready } } diff --git a/tcpip/header/tcp.go b/tcpip/header/tcp.go index 90b4f3b..73569fa 100644 --- a/tcpip/header/tcp.go +++ b/tcpip/header/tcp.go @@ -261,11 +261,43 @@ func (b TCP) SetChecksum(checksum uint16) { binary.BigEndian.PutUint16(b[tcpChecksum:], checksum) } +// CalculateChecksum calculates the checksum of the tcp segment given +// the totalLen and partialChecksum(descriptions below) +// totalLen is the total length of the segment +// partialChecksum is the checksum of the network-layer pseudo-header +// (excluding the total length) and the checksum of the segment data. +func (b TCP) CalculateChecksum(partialChecksum uint16, totalLen uint16) uint16 { + // Add the length portion of the checksum to the pseudo-checksum. + tmp := make([]byte, 2) + binary.BigEndian.PutUint16(tmp, totalLen) + checksum := Checksum(tmp, partialChecksum) + + // Calculate the rest of the checksum. + return Checksum(b[:b.DataOffset()], checksum) +} + // Options returns a slice that holds the unparsed TCP options in the segment. func (b TCP) Options() []byte { return b[TCPMinimumSize:b.DataOffset()] } +func (b TCP) encodeSubset(seq, ack uint32, flags uint8, rcvwnd uint16) { + binary.BigEndian.PutUint32(b[seqNum:], seq) + binary.BigEndian.PutUint32(b[ackNum:], ack) + b[tcpFlags] = flags + binary.BigEndian.PutUint16(b[winSize:], rcvwnd) +} + +// Encode encodes all the fields of the tcp header. +func (b TCP) Encode(t *TCPFields) { + b.encodeSubset(t.SeqNum, t.AckNum, t.Flags, t.WindowSize) + binary.BigEndian.PutUint16(b[srcPort:], t.SrcPort) + binary.BigEndian.PutUint16(b[dstPort:], t.DstPort) + b[dataOffset] = (t.DataOffset / 4) << 4 + binary.BigEndian.PutUint16(b[tcpChecksum:], t.Checksum) + binary.BigEndian.PutUint16(b[urgentPtr:], t.UrgentPointer) +} + // ParseTCPOptions extracts and stores all known options in the provided byte // slice in a TCPOptions structure. func ParseTCPOptions(b []byte) TCPOptions { @@ -432,6 +464,112 @@ func (opts TCPSynOptions) String() string { return fmt.Sprintf("|%d|%d|%v|%d|%d|%v|", opts.MSS, opts.WS, opts.TS, opts.TSVal, opts.TSEcr, opts.SACKPermitted) } +// EncodeMSSOption encodes the MSS TCP option with the provided MSS values in +// the supplied buffer. If the provided buffer is not large enough then it just +// returns without encoding anything. It returns the number of bytes written to +// the provided buffer. +func EncodeMSSOption(mss uint32, b []byte) int { + // mssOptionSize is the number of bytes in a valid MSS option. + const mssOptionSize = 4 + + if len(b) < mssOptionSize { + return 0 + } + b[0], b[1], b[2], b[3] = TCPOptionMSS, mssOptionSize, byte(mss>>8), byte(mss) + return mssOptionSize +} + +// EncodeWSOption encodes the WS TCP option with the WS value in the +// provided buffer. If the provided buffer is not large enough then it just +// returns without encoding anything. It returns the number of bytes written to +// the provided buffer. +func EncodeWSOption(ws int, b []byte) int { + if len(b) < 3 { + return 0 + } + b[0], b[1], b[2] = TCPOptionWS, 3, uint8(ws) + return int(b[1]) +} + +// EncodeTSOption encodes the provided tsVal and tsEcr values as a TCP timestamp +// option into the provided buffer. If the buffer is smaller than expected it +// just returns without encoding anything. It returns the number of bytes +// written to the provided buffer. +func EncodeTSOption(tsVal, tsEcr uint32, b []byte) int { + if len(b) < 10 { + return 0 + } + b[0], b[1] = TCPOptionTS, 10 + binary.BigEndian.PutUint32(b[2:], tsVal) + binary.BigEndian.PutUint32(b[6:], tsEcr) + return int(b[1]) +} + +// EncodeSACKPermittedOption encodes a SACKPermitted option into the provided +// buffer. If the buffer is smaller than required it just returns without +// encoding anything. It returns the number of bytes written to the provided +// buffer. +func EncodeSACKPermittedOption(b []byte) int { + if len(b) < 2 { + return 0 + } + + b[0], b[1] = TCPOptionSACKPermitted, 2 + return int(b[1]) +} + +// EncodeSACKBlocks encodes the provided SACK blocks as a TCP SACK option block +// in the provided slice. It tries to fit in as many blocks as possible based on +// number of bytes available in the provided buffer. It returns the number of +// bytes written to the provided buffer. +func EncodeSACKBlocks(sackBlocks []SACKBlock, b []byte) int { + if len(sackBlocks) == 0 { + return 0 + } + l := len(sackBlocks) + if l > TCPMaxSACKBlocks { + l = TCPMaxSACKBlocks + } + if ll := (len(b) - 2) / 8; ll < l { + l = ll + } + if l == 0 { + // There is not enough space in the provided buffer to add + // any SACK blocks. + return 0 + } + b[0] = TCPOptionSACK + b[1] = byte(l*8 + 2) + for i := 0; i < l; i++ { + binary.BigEndian.PutUint32(b[i*8+2:], uint32(sackBlocks[i].Start)) + binary.BigEndian.PutUint32(b[i*8+6:], uint32(sackBlocks[i].End)) + } + return int(b[1]) +} + +// EncodeNOP adds an explicit NOP to the option list. +func EncodeNOP(b []byte) int { + if len(b) == 0 { + return 0 + } + b[0] = TCPOptionNOP + return 1 +} + +// AddTCPOptionPadding adds the required number of TCPOptionNOP to quad align +// the option buffer. It adds padding bytes after the offset specified and +// returns the number of padding bytes added. The passed in options slice +// must have space for the padding bytes. +func AddTCPOptionPadding(options []byte, offset int) int { + paddingToAdd := -offset & 3 + // Now add any padding bytes that might be required to quad align the + // options. + for i := offset; i < offset+paddingToAdd; i++ { + options[i] = TCPOptionNOP + } + return paddingToAdd +} + /* 0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 diff --git a/tcpip/transport/tcp/accept.go b/tcpip/transport/tcp/accept.go index 659a67d..4de6e2d 100644 --- a/tcpip/transport/tcp/accept.go +++ b/tcpip/transport/tcp/accept.go @@ -306,7 +306,6 @@ func (e *endpoint) protocolListenLoop(rcvWnd seqnum.Size) *tcpip.Error { for { switch index, _ := s.Fetch(true); index { // Fetch(true) 阻塞获取 case wakerForNewSegment: - log.Println("处理处理") mayRequeue := true // 接收和处理tcp报文 for i := 0; i < maxSegmentsPerWake; i++ { @@ -325,6 +324,7 @@ func (e *endpoint) protocolListenLoop(rcvWnd seqnum.Size) *tcpip.Error { } case wakerForNotification: // TODO 触发其他事件 + log.Println("其他事件?") } } } diff --git a/tcpip/transport/tcp/connect.go b/tcpip/transport/tcp/connect.go index 7161dba..81933cd 100644 --- a/tcpip/transport/tcp/connect.go +++ b/tcpip/transport/tcp/connect.go @@ -2,12 +2,16 @@ package tcp import ( "crypto/rand" + "fmt" "log" + "netstack/sleep" "netstack/tcpip" "netstack/tcpip/buffer" "netstack/tcpip/header" "netstack/tcpip/seqnum" "netstack/tcpip/stack" + "sync" + "time" ) const maxSegmentsPerWake = 100 @@ -108,11 +112,50 @@ func (h *handshake) resetToSynRcvd(iss seqnum.Value, irs seqnum.Value, opts *hea log.Println("发送 syn|ack 确认报文 设置tcp状态为 [rcvd]") h.flags = flagSyn | flagAck h.iss = iss - h.ackNum = irs + 1 + h.ackNum = irs + 1 // NOTE ACK = synNum + 1 h.mss = opts.MSS h.sndWndScale = opts.WS } +func (h *handshake) resolveRoute() *tcpip.Error { + log.Printf("tcp resolveRoute") + // Set up the wakers. + s := sleep.Sleeper{} + resolutionWaker := &sleep.Waker{} + s.AddWaker(resolutionWaker, wakerForResolution) + s.AddWaker(&h.ep.notificationWaker, wakerForNotification) + defer s.Done() + + // Initial action is to resolve route. + index := wakerForResolution + for { + log.Println(index) + switch index { + case wakerForResolution: + if _, err := h.ep.route.Resolve(resolutionWaker); err != tcpip.ErrWouldBlock { + // Either success (err == nil) or failure. + return err + } + // Resolution not completed. Keep trying... + + case wakerForNotification: + // TODO + //n := h.ep.fetchNotifications() + //if n¬ifyClose != 0 { + // h.ep.route.RemoveWaker(resolutionWaker) + // return tcpip.ErrAborted + //} + //if n¬ifyDrain != 0 { + // close(h.ep.drainDone) + // <-h.ep.undrain + //} + } + + // Wait for notification. + index, _ = s.Fetch(true) + } +} + // execute executes the TCP 3-way handshake. // 执行tcp 3次握手,客户端和服务端都是调用该函数来实现三次握手 /* @@ -127,20 +170,136 @@ func (h *handshake) resetToSynRcvd(iss seqnum.Value, irs seqnum.Value, opts *hea |------ack----->|established */ func (h *handshake) execute() *tcpip.Error { + // 是否需要拿到下一条地址 + if h.ep.route.IsResolutionRequired() { + if err := h.resolveRoute(); err != nil { + return err + } + } + // Initialize the resend timer. + // 初始化重传定时器 + resendWaker := sleep.Waker{} + // 设置1s超时 + timeOut := time.Duration(time.Second) + rt := time.AfterFunc(timeOut, func() { + resendWaker.Assert() + }) + defer rt.Stop() + + // Set up the wakers. + s := sleep.Sleeper{} + s.AddWaker(&resendWaker, wakerForResend) + s.AddWaker(&h.ep.notificationWaker, wakerForNotification) + s.AddWaker(&h.ep.newSegmentWaker, wakerForNewSegment) + defer s.Done() + // sync报文的选项参数 synOpts := header.TCPSynOptions{} // 如果是客户端发送 syn 报文,如果是服务端发送 syn+ack 报文 sendSynTCP(&h.ep.route, h.ep.id, h.flags, h.iss, h.ackNum, h.rcvWnd, synOpts) + + for h.state != handshakeCompleted { + // 获取事件id + switch index, _ := s.Fetch(true); index { + case wakerForResend: // NOTE tcp超时重传机制 + // 如果是客户端当发送 syn 报文,超过一定的时间未收到回包,触发超时重传 + // 如果是服务端当发送 syn+ack 报文,超过一定的时间未收到 ack 回包,触发超时重传 + // 超时时间变为上次的2倍 + timeOut *= 2 + if timeOut > 60*time.Second { + return tcpip.ErrTimeout + } + rt.Reset(timeOut) + // 重新发送syn报文 + sendSynTCP(&h.ep.route, h.ep.id, h.flags, h.iss, h.ackNum, h.rcvWnd, synOpts) + + case wakerForNotification: + + case wakerForNewSegment: + // 处理握手报文 + } + } return nil } +var optionPool = sync.Pool{ + New: func() interface{} { + return make([]byte, maxOptionSize) + }, +} + +// 减少资源浪费 +func getOptions() []byte { + return optionPool.Get().([]byte) +} + +func putOptions(options []byte) { + // Reslice to full capacity. + optionPool.Put(options[0:cap(options)]) +} + +// tcp选项的编码 将一个TCPSyncOptions编码到 []byte 中 +func makeSynOptions(opts header.TCPSynOptions) []byte { + // Emulate linux option order. This is as follows: + // + // if md5: NOP NOP MD5SIG 18 md5sig(16) + // if mss: MSS 4 mss(2) + // if ts and sack_advertise: + // SACK 2 TIMESTAMP 2 timestamp(8) + // elif ts: NOP NOP TIMESTAMP 10 timestamp(8) + // elif sack: NOP NOP SACK 2 + // if wscale: NOP WINDOW 3 ws(1) + // if sack_blocks: NOP NOP SACK ((2 + (#blocks * 8)) + // [for each block] start_seq(4) end_seq(4) + // if fastopen_cookie: + // if exp: EXP (4 + len(cookie)) FASTOPEN_MAGIC(2) + // else: FASTOPEN (2 + len(cookie)) + // cookie(variable) [padding to four bytes] + // + options := getOptions() + + // Always encode the mss. + offset := header.EncodeMSSOption(uint32(opts.MSS), options) + + // Special ordering is required here. If both TS and SACK are enabled, + // then the SACK option precedes TS, with no padding. If they are + // enabled individually, then we see padding before the option. + if opts.TS && opts.SACKPermitted { + offset += header.EncodeSACKPermittedOption(options[offset:]) + offset += header.EncodeTSOption(opts.TSVal, opts.TSEcr, options[offset:]) + } else if opts.TS { + offset += header.EncodeNOP(options[offset:]) + offset += header.EncodeNOP(options[offset:]) + offset += header.EncodeTSOption(opts.TSVal, opts.TSEcr, options[offset:]) + } else if opts.SACKPermitted { + offset += header.EncodeNOP(options[offset:]) + offset += header.EncodeNOP(options[offset:]) + offset += header.EncodeSACKPermittedOption(options[offset:]) + } + + // Initialize the WS option. + if opts.WS >= 0 { + offset += header.EncodeNOP(options[offset:]) + offset += header.EncodeWSOption(opts.WS, options[offset:]) + } + + // Padding to the end; note that this never apply unless we add a + // fastopen option, we always expect the offset to remain the same. + if delta := header.AddTCPOptionPadding(options, offset); delta != 0 { + panic("unexpected option encoding") + } + + return options[:offset] +} + // 封装 sendTCP ,发送 syn 报文 func sendSynTCP(r *stack.Route, id stack.TransportEndpointID, flags byte, seq, ack seqnum.Value, rcvWnd seqnum.Size, opts header.TCPSynOptions) *tcpip.Error { - - options := []byte{} - err := sendTCP(r, id, buffer.VectorisedView{}, 0, flags, seq, ack, rcvWnd, options) - + if opts.MSS == 0 { + opts.MSS = uint16(r.MTU() - header.TCPMinimumSize) + } + options := makeSynOptions(opts) + err := sendTCP(r, id, buffer.VectorisedView{}, r.DefaultTTL(), flags, seq, ack, rcvWnd, options) return err } @@ -150,7 +309,50 @@ func sendSynTCP(r *stack.Route, id stack.TransportEndpointID, flags byte, func sendTCP(r *stack.Route, id stack.TransportEndpointID, data buffer.VectorisedView, ttl uint8, flags byte, seq, ack seqnum.Value, rcvWnd seqnum.Size, opts []byte) *tcpip.Error { log.Println("进行一个报文的发送") - return nil + optLen := len(opts) + // Allocate a buffer for the TCP header. + hdr := buffer.NewPrependable(header.TCPMinimumSize + int(r.MaxHeaderLength()) + optLen) + + if rcvWnd > 0xffff { + rcvWnd = 0xffff + } + + // Initialize the header. + tcp := header.TCP(hdr.Prepend(header.TCPMinimumSize + optLen)) + tcp.Encode(&header.TCPFields{ + SrcPort: id.LocalPort, + DstPort: id.RemotePort, + SeqNum: uint32(seq), + AckNum: uint32(ack), + DataOffset: uint8(header.TCPMinimumSize + optLen), + Flags: flags, + WindowSize: uint16(rcvWnd), + }) + copy(tcp[header.TCPMinimumSize:], opts) + + // Only calculate the checksum if offloading isn't supported. + if r.Capabilities()&stack.CapabilityChecksumOffload == 0 { + length := uint16(hdr.UsedLength() + data.Size()) + // tcp伪首部校验和的计算 + xsum := r.PseudoHeaderChecksum(ProtocolNumber) + for _, v := range data.Views() { + xsum = header.Checksum(v, xsum) + } + + // tcp的可靠性:校验和的计算,用于检测损伤的报文段 + tcp.SetChecksum(^tcp.CalculateChecksum(xsum, length)) + } + + r.Stats().TCP.SegmentsSent.Increment() + if (flags & flagRst) != 0 { + r.Stats().TCP.ResetsSent.Increment() + } + + log.Printf("send tcp %s segment to %s, seq: %d, ack: %d, rcvWnd: %d", + flagString(flags), fmt.Sprintf("%s:%d", id.RemoteAddress, id.RemotePort), + seq, ack, rcvWnd) + + return r.WritePacket(hdr, data, ProtocolNumber, ttl) } // protocolMainLoop 是TCP协议的主循环。它在自己的goroutine中运行,负责握手、发送段和处理收到的段