Hi 加个好友呀?❤

Fixes #1
This commit is contained in:
impact-eintr
2022-12-07 19:39:09 +08:00
parent 3617794e76
commit d4d5c61a83
6 changed files with 404 additions and 23 deletions

17
.vscode/c_cpp_properties.json vendored Normal file
View File

@@ -0,0 +1,17 @@
{
"configurations": [
{
"name": "Win32",
"includePath": [
"${workspaceFolder}/**"
],
"defines": [
"_DEBUG",
"UNICODE",
"_UNICODE"
],
"intelliSenseMode": "windows-msvc-x64"
}
],
"version": 4
}

View File

@@ -3,12 +3,22 @@ package main
import (
"fmt"
"net"
"time"
)
func main() {
go func() {
_, err := net.Dial("tcp", "192.168.1.1:9999")
if err != nil {
fmt.Println("err : ", err)
return
}
}()
t := time.NewTimer(500 * time.Millisecond)
select {
case <-t.C:
return
}
}

View File

@@ -149,10 +149,14 @@ func (s *Sleeper) AddWaker(w *Waker, id int) {
// nextWaker returns the next waker in the notification list, blocking if
// needed.
// listenLoop的Sleeper尝试改变自己的调度 如果没有人等着催 就睡
// 设置了block的话 尝试去获取本地列表中的唤醒者 并取出最前面的唤醒者
// 要是本地列表为空 就去遍历共享列表
func (s *Sleeper) nextWaker(block bool) *Waker {
// Attempt to replenish the local list if it's currently empty.
if s.localList == nil {
for atomic.LoadPointer(&s.sharedList) == nil {
// 首次进入循环 检查共享链表为空 可能需要睡
for atomic.LoadPointer(&s.sharedList) == nil { // 被唤醒 再次检查共享链表情况 很有可能有人催了
// Fail request if caller requested that we
// don't block.
if !block {
@@ -163,13 +167,13 @@ func (s *Sleeper) nextWaker(block bool) *Waker {
// this allows them to abort the wait by setting
// waitingG back to zero (which we'll notice
// before committing the sleep).
atomic.StoreUintptr(&s.waitingG, preparingG)
atomic.StoreUintptr(&s.waitingG, preparingG) // 准备睡
// Check if something was queued while we were
// preparing to sleep. We need this interleaving
// to avoid missing wake ups.
if atomic.LoadPointer(&s.sharedList) != nil {
atomic.StoreUintptr(&s.waitingG, 0)
if atomic.LoadPointer(&s.sharedList) != nil { // 有人催了 不睡了
atomic.StoreUintptr(&s.waitingG, 0) // 放弃睡眠
break
}
@@ -180,16 +184,20 @@ func (s *Sleeper) nextWaker(block bool) *Waker {
// commitSleep to decide whether to immediately
// wake the caller up or to leave it sleeping.
const traceEvGoBlockSelect = 24
log.Println(atomic.LoadUintptr(&s.waitingG), "进入睡眠")
// 没人催 进入睡眠
gopark(commitSleep, &s.waitingG, "sleeper", traceEvGoBlockSelect, 0)
}
log.Println(atomic.LoadPointer(&s.sharedList), "唤醒了", atomic.LoadUintptr(&s.waitingG))
// Pull the shared list out and reverse it in the local
// list. Given that wakers push themselves in reverse
// order, we fix things here.
v := (*Waker)(atomic.SwapPointer(&s.sharedList, nil))
for v != nil {
// 将共享列表拉出来,在本地列表中反转。鉴于唤醒者以相反的顺序推动自己,我们在这里解决问题
v := (*Waker)(atomic.SwapPointer(&s.sharedList, nil)) // 这里将唤醒者置空
for v != nil { // 共享链表有东西 找到最后一位
cur := v
v = v.next
v = v.next // 向后遍历共享链表
cur.next = s.localList
s.localList = cur
@@ -216,6 +224,7 @@ func (s *Sleeper) nextWaker(block bool) *Waker {
// allowed to call this method.
func (s *Sleeper) Fetch(block bool) (id int, ok bool) {
for {
// 这个s是ListenLoop的Sleeper
w := s.nextWaker(block) // 如果没有 将暂停调度 call gopark
if w == nil {
return -1, false
@@ -283,11 +292,12 @@ func (s *Sleeper) Done() {
// enqueueAssertedWaker enqueues an asserted waker to the "ready" circular list
// of wakers that want to notify the sleeper.
func (s *Sleeper) enqueueAssertedWaker(w *Waker) {
func (s *Sleeper) enqueueAssertedWaker(w *Waker) { // w.s 是 assertSleeper
// Add the new waker to the front of the list.
for {
v := (*Waker)(atomic.LoadPointer(&s.sharedList))
w.next = v
// 每个新连接的 newSegmentWaker.s.sharedList 初始化为 正在睡眠的 listenLoop 的waker
if atomic.CompareAndSwapPointer(&s.sharedList, uwaker(v), uwaker(w)) {
break
}
@@ -296,7 +306,8 @@ func (s *Sleeper) enqueueAssertedWaker(w *Waker) {
for {
// Nothing to do if there isn't a G waiting.
g := atomic.LoadUintptr(&s.waitingG)
if g == 0 {
log.Println("tcp端 所在的G: ", g)
if g == 0 { // 0 表示 醒着
return
}
@@ -304,6 +315,7 @@ func (s *Sleeper) enqueueAssertedWaker(w *Waker) {
if atomic.CompareAndSwapUintptr(&s.waitingG, g, 0) {
if g != preparingG {
// We managed to get a G. Wake it up.
log.Println(g, "去唤醒 0")
goready(g, 0)
}
}
@@ -347,21 +359,23 @@ type Waker struct {
// Assert moves the waker to an asserted state, if it isn't asserted yet. When
// asserted, the waker will cause its matching sleeper to wake up.
func (w *Waker) Assert() {
log.Println("Assert...")
log.Println(unsafe.Pointer(w), "Assert", &assertedSleeper)
// Nothing to do if the waker is already asserted. This check allows us
// to complete this case (already asserted) without any interlocked
// operations on x86.
if atomic.LoadPointer(&w.s) == usleeper(&assertedSleeper) {
log.Println("已经叫过了")
return
}
log.Println("Asserting ...")
// Mark the waker as asserted, and wake up a sleeper if there is one.
switch s := (*Sleeper)(atomic.SwapPointer(&w.s, usleeper(&assertedSleeper))); s {
case nil:
case &assertedSleeper:
default:
log.Println("唤醒", s)
s.enqueueAssertedWaker(w) // call goready
default: // 初始态
log.Println("唤醒", s.waitingG)
s.enqueueAssertedWaker(w) // s 是tcp端的newSegmentWaker call goready
}
}

View File

@@ -261,11 +261,43 @@ func (b TCP) SetChecksum(checksum uint16) {
binary.BigEndian.PutUint16(b[tcpChecksum:], checksum)
}
// CalculateChecksum calculates the checksum of the tcp segment given
// the totalLen and partialChecksum(descriptions below)
// totalLen is the total length of the segment
// partialChecksum is the checksum of the network-layer pseudo-header
// (excluding the total length) and the checksum of the segment data.
func (b TCP) CalculateChecksum(partialChecksum uint16, totalLen uint16) uint16 {
// Add the length portion of the checksum to the pseudo-checksum.
tmp := make([]byte, 2)
binary.BigEndian.PutUint16(tmp, totalLen)
checksum := Checksum(tmp, partialChecksum)
// Calculate the rest of the checksum.
return Checksum(b[:b.DataOffset()], checksum)
}
// Options returns a slice that holds the unparsed TCP options in the segment.
func (b TCP) Options() []byte {
return b[TCPMinimumSize:b.DataOffset()]
}
func (b TCP) encodeSubset(seq, ack uint32, flags uint8, rcvwnd uint16) {
binary.BigEndian.PutUint32(b[seqNum:], seq)
binary.BigEndian.PutUint32(b[ackNum:], ack)
b[tcpFlags] = flags
binary.BigEndian.PutUint16(b[winSize:], rcvwnd)
}
// Encode encodes all the fields of the tcp header.
func (b TCP) Encode(t *TCPFields) {
b.encodeSubset(t.SeqNum, t.AckNum, t.Flags, t.WindowSize)
binary.BigEndian.PutUint16(b[srcPort:], t.SrcPort)
binary.BigEndian.PutUint16(b[dstPort:], t.DstPort)
b[dataOffset] = (t.DataOffset / 4) << 4
binary.BigEndian.PutUint16(b[tcpChecksum:], t.Checksum)
binary.BigEndian.PutUint16(b[urgentPtr:], t.UrgentPointer)
}
// ParseTCPOptions extracts and stores all known options in the provided byte
// slice in a TCPOptions structure.
func ParseTCPOptions(b []byte) TCPOptions {
@@ -432,6 +464,112 @@ func (opts TCPSynOptions) String() string {
return fmt.Sprintf("|%d|%d|%v|%d|%d|%v|", opts.MSS, opts.WS, opts.TS, opts.TSVal, opts.TSEcr, opts.SACKPermitted)
}
// EncodeMSSOption encodes the MSS TCP option with the provided MSS values in
// the supplied buffer. If the provided buffer is not large enough then it just
// returns without encoding anything. It returns the number of bytes written to
// the provided buffer.
func EncodeMSSOption(mss uint32, b []byte) int {
// mssOptionSize is the number of bytes in a valid MSS option.
const mssOptionSize = 4
if len(b) < mssOptionSize {
return 0
}
b[0], b[1], b[2], b[3] = TCPOptionMSS, mssOptionSize, byte(mss>>8), byte(mss)
return mssOptionSize
}
// EncodeWSOption encodes the WS TCP option with the WS value in the
// provided buffer. If the provided buffer is not large enough then it just
// returns without encoding anything. It returns the number of bytes written to
// the provided buffer.
func EncodeWSOption(ws int, b []byte) int {
if len(b) < 3 {
return 0
}
b[0], b[1], b[2] = TCPOptionWS, 3, uint8(ws)
return int(b[1])
}
// EncodeTSOption encodes the provided tsVal and tsEcr values as a TCP timestamp
// option into the provided buffer. If the buffer is smaller than expected it
// just returns without encoding anything. It returns the number of bytes
// written to the provided buffer.
func EncodeTSOption(tsVal, tsEcr uint32, b []byte) int {
if len(b) < 10 {
return 0
}
b[0], b[1] = TCPOptionTS, 10
binary.BigEndian.PutUint32(b[2:], tsVal)
binary.BigEndian.PutUint32(b[6:], tsEcr)
return int(b[1])
}
// EncodeSACKPermittedOption encodes a SACKPermitted option into the provided
// buffer. If the buffer is smaller than required it just returns without
// encoding anything. It returns the number of bytes written to the provided
// buffer.
func EncodeSACKPermittedOption(b []byte) int {
if len(b) < 2 {
return 0
}
b[0], b[1] = TCPOptionSACKPermitted, 2
return int(b[1])
}
// EncodeSACKBlocks encodes the provided SACK blocks as a TCP SACK option block
// in the provided slice. It tries to fit in as many blocks as possible based on
// number of bytes available in the provided buffer. It returns the number of
// bytes written to the provided buffer.
func EncodeSACKBlocks(sackBlocks []SACKBlock, b []byte) int {
if len(sackBlocks) == 0 {
return 0
}
l := len(sackBlocks)
if l > TCPMaxSACKBlocks {
l = TCPMaxSACKBlocks
}
if ll := (len(b) - 2) / 8; ll < l {
l = ll
}
if l == 0 {
// There is not enough space in the provided buffer to add
// any SACK blocks.
return 0
}
b[0] = TCPOptionSACK
b[1] = byte(l*8 + 2)
for i := 0; i < l; i++ {
binary.BigEndian.PutUint32(b[i*8+2:], uint32(sackBlocks[i].Start))
binary.BigEndian.PutUint32(b[i*8+6:], uint32(sackBlocks[i].End))
}
return int(b[1])
}
// EncodeNOP adds an explicit NOP to the option list.
func EncodeNOP(b []byte) int {
if len(b) == 0 {
return 0
}
b[0] = TCPOptionNOP
return 1
}
// AddTCPOptionPadding adds the required number of TCPOptionNOP to quad align
// the option buffer. It adds padding bytes after the offset specified and
// returns the number of padding bytes added. The passed in options slice
// must have space for the padding bytes.
func AddTCPOptionPadding(options []byte, offset int) int {
paddingToAdd := -offset & 3
// Now add any padding bytes that might be required to quad align the
// options.
for i := offset; i < offset+paddingToAdd; i++ {
options[i] = TCPOptionNOP
}
return paddingToAdd
}
/*
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1

View File

@@ -306,7 +306,6 @@ func (e *endpoint) protocolListenLoop(rcvWnd seqnum.Size) *tcpip.Error {
for {
switch index, _ := s.Fetch(true); index { // Fetch(true) 阻塞获取
case wakerForNewSegment:
log.Println("处理处理")
mayRequeue := true
// 接收和处理tcp报文
for i := 0; i < maxSegmentsPerWake; i++ {
@@ -325,6 +324,7 @@ func (e *endpoint) protocolListenLoop(rcvWnd seqnum.Size) *tcpip.Error {
}
case wakerForNotification:
// TODO 触发其他事件
log.Println("其他事件?")
}
}
}

View File

@@ -2,12 +2,16 @@ package tcp
import (
"crypto/rand"
"fmt"
"log"
"netstack/sleep"
"netstack/tcpip"
"netstack/tcpip/buffer"
"netstack/tcpip/header"
"netstack/tcpip/seqnum"
"netstack/tcpip/stack"
"sync"
"time"
)
const maxSegmentsPerWake = 100
@@ -108,11 +112,50 @@ func (h *handshake) resetToSynRcvd(iss seqnum.Value, irs seqnum.Value, opts *hea
log.Println("发送 syn|ack 确认报文 设置tcp状态为 [rcvd]")
h.flags = flagSyn | flagAck
h.iss = iss
h.ackNum = irs + 1
h.ackNum = irs + 1 // NOTE ACK = synNum + 1
h.mss = opts.MSS
h.sndWndScale = opts.WS
}
func (h *handshake) resolveRoute() *tcpip.Error {
log.Printf("tcp resolveRoute")
// Set up the wakers.
s := sleep.Sleeper{}
resolutionWaker := &sleep.Waker{}
s.AddWaker(resolutionWaker, wakerForResolution)
s.AddWaker(&h.ep.notificationWaker, wakerForNotification)
defer s.Done()
// Initial action is to resolve route.
index := wakerForResolution
for {
log.Println(index)
switch index {
case wakerForResolution:
if _, err := h.ep.route.Resolve(resolutionWaker); err != tcpip.ErrWouldBlock {
// Either success (err == nil) or failure.
return err
}
// Resolution not completed. Keep trying...
case wakerForNotification:
// TODO
//n := h.ep.fetchNotifications()
//if n&notifyClose != 0 {
// h.ep.route.RemoveWaker(resolutionWaker)
// return tcpip.ErrAborted
//}
//if n&notifyDrain != 0 {
// close(h.ep.drainDone)
// <-h.ep.undrain
//}
}
// Wait for notification.
index, _ = s.Fetch(true)
}
}
// execute executes the TCP 3-way handshake.
// 执行tcp 3次握手客户端和服务端都是调用该函数来实现三次握手
/*
@@ -127,20 +170,136 @@ func (h *handshake) resetToSynRcvd(iss seqnum.Value, irs seqnum.Value, opts *hea
|------ack----->|established
*/
func (h *handshake) execute() *tcpip.Error {
// 是否需要拿到下一条地址
if h.ep.route.IsResolutionRequired() {
if err := h.resolveRoute(); err != nil {
return err
}
}
// Initialize the resend timer.
// 初始化重传定时器
resendWaker := sleep.Waker{}
// 设置1s超时
timeOut := time.Duration(time.Second)
rt := time.AfterFunc(timeOut, func() {
resendWaker.Assert()
})
defer rt.Stop()
// Set up the wakers.
s := sleep.Sleeper{}
s.AddWaker(&resendWaker, wakerForResend)
s.AddWaker(&h.ep.notificationWaker, wakerForNotification)
s.AddWaker(&h.ep.newSegmentWaker, wakerForNewSegment)
defer s.Done()
// sync报文的选项参数
synOpts := header.TCPSynOptions{}
// 如果是客户端发送 syn 报文,如果是服务端发送 syn+ack 报文
sendSynTCP(&h.ep.route, h.ep.id, h.flags, h.iss, h.ackNum, h.rcvWnd, synOpts)
for h.state != handshakeCompleted {
// 获取事件id
switch index, _ := s.Fetch(true); index {
case wakerForResend: // NOTE tcp超时重传机制
// 如果是客户端当发送 syn 报文,超过一定的时间未收到回包,触发超时重传
// 如果是服务端当发送 syn+ack 报文,超过一定的时间未收到 ack 回包,触发超时重传
// 超时时间变为上次的2倍
timeOut *= 2
if timeOut > 60*time.Second {
return tcpip.ErrTimeout
}
rt.Reset(timeOut)
// 重新发送syn报文
sendSynTCP(&h.ep.route, h.ep.id, h.flags, h.iss, h.ackNum, h.rcvWnd, synOpts)
case wakerForNotification:
case wakerForNewSegment:
// 处理握手报文
}
}
return nil
}
var optionPool = sync.Pool{
New: func() interface{} {
return make([]byte, maxOptionSize)
},
}
// 减少资源浪费
func getOptions() []byte {
return optionPool.Get().([]byte)
}
func putOptions(options []byte) {
// Reslice to full capacity.
optionPool.Put(options[0:cap(options)])
}
// tcp选项的编码 将一个TCPSyncOptions编码到 []byte 中
func makeSynOptions(opts header.TCPSynOptions) []byte {
// Emulate linux option order. This is as follows:
//
// if md5: NOP NOP MD5SIG 18 md5sig(16)
// if mss: MSS 4 mss(2)
// if ts and sack_advertise:
// SACK 2 TIMESTAMP 2 timestamp(8)
// elif ts: NOP NOP TIMESTAMP 10 timestamp(8)
// elif sack: NOP NOP SACK 2
// if wscale: NOP WINDOW 3 ws(1)
// if sack_blocks: NOP NOP SACK ((2 + (#blocks * 8))
// [for each block] start_seq(4) end_seq(4)
// if fastopen_cookie:
// if exp: EXP (4 + len(cookie)) FASTOPEN_MAGIC(2)
// else: FASTOPEN (2 + len(cookie))
// cookie(variable) [padding to four bytes]
//
options := getOptions()
// Always encode the mss.
offset := header.EncodeMSSOption(uint32(opts.MSS), options)
// Special ordering is required here. If both TS and SACK are enabled,
// then the SACK option precedes TS, with no padding. If they are
// enabled individually, then we see padding before the option.
if opts.TS && opts.SACKPermitted {
offset += header.EncodeSACKPermittedOption(options[offset:])
offset += header.EncodeTSOption(opts.TSVal, opts.TSEcr, options[offset:])
} else if opts.TS {
offset += header.EncodeNOP(options[offset:])
offset += header.EncodeNOP(options[offset:])
offset += header.EncodeTSOption(opts.TSVal, opts.TSEcr, options[offset:])
} else if opts.SACKPermitted {
offset += header.EncodeNOP(options[offset:])
offset += header.EncodeNOP(options[offset:])
offset += header.EncodeSACKPermittedOption(options[offset:])
}
// Initialize the WS option.
if opts.WS >= 0 {
offset += header.EncodeNOP(options[offset:])
offset += header.EncodeWSOption(opts.WS, options[offset:])
}
// Padding to the end; note that this never apply unless we add a
// fastopen option, we always expect the offset to remain the same.
if delta := header.AddTCPOptionPadding(options, offset); delta != 0 {
panic("unexpected option encoding")
}
return options[:offset]
}
// 封装 sendTCP ,发送 syn 报文
func sendSynTCP(r *stack.Route, id stack.TransportEndpointID, flags byte,
seq, ack seqnum.Value, rcvWnd seqnum.Size, opts header.TCPSynOptions) *tcpip.Error {
options := []byte{}
err := sendTCP(r, id, buffer.VectorisedView{}, 0, flags, seq, ack, rcvWnd, options)
if opts.MSS == 0 {
opts.MSS = uint16(r.MTU() - header.TCPMinimumSize)
}
options := makeSynOptions(opts)
err := sendTCP(r, id, buffer.VectorisedView{}, r.DefaultTTL(), flags, seq, ack, rcvWnd, options)
return err
}
@@ -150,7 +309,50 @@ func sendSynTCP(r *stack.Route, id stack.TransportEndpointID, flags byte,
func sendTCP(r *stack.Route, id stack.TransportEndpointID, data buffer.VectorisedView, ttl uint8, flags byte,
seq, ack seqnum.Value, rcvWnd seqnum.Size, opts []byte) *tcpip.Error {
log.Println("进行一个报文的发送")
return nil
optLen := len(opts)
// Allocate a buffer for the TCP header.
hdr := buffer.NewPrependable(header.TCPMinimumSize + int(r.MaxHeaderLength()) + optLen)
if rcvWnd > 0xffff {
rcvWnd = 0xffff
}
// Initialize the header.
tcp := header.TCP(hdr.Prepend(header.TCPMinimumSize + optLen))
tcp.Encode(&header.TCPFields{
SrcPort: id.LocalPort,
DstPort: id.RemotePort,
SeqNum: uint32(seq),
AckNum: uint32(ack),
DataOffset: uint8(header.TCPMinimumSize + optLen),
Flags: flags,
WindowSize: uint16(rcvWnd),
})
copy(tcp[header.TCPMinimumSize:], opts)
// Only calculate the checksum if offloading isn't supported.
if r.Capabilities()&stack.CapabilityChecksumOffload == 0 {
length := uint16(hdr.UsedLength() + data.Size())
// tcp伪首部校验和的计算
xsum := r.PseudoHeaderChecksum(ProtocolNumber)
for _, v := range data.Views() {
xsum = header.Checksum(v, xsum)
}
// tcp的可靠性校验和的计算用于检测损伤的报文段
tcp.SetChecksum(^tcp.CalculateChecksum(xsum, length))
}
r.Stats().TCP.SegmentsSent.Increment()
if (flags & flagRst) != 0 {
r.Stats().TCP.ResetsSent.Increment()
}
log.Printf("send tcp %s segment to %s, seq: %d, ack: %d, rcvWnd: %d",
flagString(flags), fmt.Sprintf("%s:%d", id.RemoteAddress, id.RemotePort),
seq, ack, rcvWnd)
return r.WritePacket(hdr, data, ProtocolNumber, ttl)
}
// protocolMainLoop 是TCP协议的主循环。它在自己的goroutine中运行负责握手、发送段和处理收到的段