哇...tcp真的好复杂

目前网络层分发了数据包到tcp端,tcp的handlepacket把数据存到一个队列中并提醒事件驱动机制来取数据
取到数据后先进行一个解析 确认他是一个SYN包 然后解析SYN的相关选项
对于合法的数据包 开启一个goroutine去执行三次握手的第二步:返回确认包 TODO 返回确认包的实现
This commit is contained in:
impact-eintr
2022-12-06 18:02:18 +08:00
parent 5aa21b7820
commit 5ca7a1858b
10 changed files with 654 additions and 24 deletions

View File

@@ -211,10 +211,11 @@ func (s *Sleeper) nextWaker(block bool) *Waker {
// the waker; when 'ok' is false, 'id' is undefined.
//
// N.B. This method is *not* thread-safe. Only one goroutine at a time is
//
// allowed to call this method.
func (s *Sleeper) Fetch(block bool) (id int, ok bool) {
for {
w := s.nextWaker(block)
w := s.nextWaker(block) // 如果没有 将暂停调度 call gopark
if w == nil {
return -1, false
}
@@ -357,7 +358,7 @@ func (w *Waker) Assert() {
case nil:
case &assertedSleeper:
default:
s.enqueueAssertedWaker(w)
s.enqueueAssertedWaker(w) // call goready
}
}

View File

@@ -65,6 +65,31 @@ type TCPFields struct {
UrgentPointer uint16
}
// TCPSynOptions is used to return the parsed TCP Options in a syn
// segment.
// syn 报文的选项
type TCPSynOptions struct {
// MSS is the maximum segment size provided by the peer in the SYN.
MSS uint16
// WS is the window scale option provided by the peer in the SYN.
//
// Set to -1 if no window scale option was provided.
WS int
// TS is true if the timestamp option was provided in the syn/syn-ack.
TS bool
// TSVal is the value of the TSVal field in the timestamp option.
TSVal uint32
// TSEcr is the value of the TSEcr field in the timestamp option.
TSEcr uint32
// SACKPermitted is true if the SACK option was provided in the SYN/SYN-ACK.
SACKPermitted bool
}
const (
srcPort = 0
dstPort = 2
@@ -79,15 +104,32 @@ const (
// Options that may be present in a TCP segment.
const (
// 选项表结束选项
TCPOptionEOL = 0
// 空操作nop选项
TCPOptionNOP = 1
// 最大报文段长度选项
TCPOptionMSS = 2
// 窗口扩大因子选项
TCPOptionWS = 3
// 时间戳选项
TCPOptionTS = 8
// 选择性确认Selective AcknowledgmentSACK选项
TCPOptionSACKPermitted = 4
// SACK 实际工作的选项
TCPOptionSACK = 5
)
const (
// MaxWndScale is maximum allowed window scaling, as described in
// RFC 1323, section 2.3, page 11.
MaxWndScale = 14
// TCPMaxSACKBlocks is the maximum number of SACK blocks that can
// be encoded in a TCP option field.
TCPMaxSACKBlocks = 4
)
// SACKBlock 表示 sack 块的结构体
type SACKBlock struct {
// Start indicates the lowest sequence number in the block.
@@ -98,6 +140,13 @@ type SACKBlock struct {
End seqnum.Value
}
/*
1byte 1byte nbytes
+--------+--------+------------------+
| Kind | Length | Info |
+--------+--------+------------------+
*/
// TCPOptions tcp选项结构这个结构不表示 syn/syn-ack 报文
type TCPOptions struct {
// TS is true if the TimeStamp option is enabled.
@@ -111,6 +160,19 @@ type TCPOptions struct {
// SACKBlocks are the SACK blocks specified in the segment.
SACKBlocks []SACKBlock
// 以下仅供测试之用 不对外暴露
// MSS is the maximum segment size provided by the peer in the SYN.
mss uint16
// WS is the window scale option provided by the peer in the SYN.
//
// Set to -1 if no window scale option was provided.
ws int
// SACKPermitted is true if the SACK option was provided in the SYN/SYN-ACK.
sackPermitted bool
}
// TCP represents a TCP header stored in a byte array.
@@ -215,6 +277,26 @@ func ParseTCPOptions(b []byte) TCPOptions {
i = limit
case TCPOptionNOP: // 空值
i++
case TCPOptionMSS:
if i+4 > limit || b[i+1] != 4 {
return opts
}
mss := uint16(b[i+2])<<8 | uint16(b[i+3])
if mss == 0 {
return opts
}
opts.mss = mss
i += 4
case TCPOptionWS:
if i+3 > limit || b[i+1] != 3 {
return opts
}
ws := int(b[i+2])
if ws > MaxWndScale {
ws = MaxWndScale
}
opts.ws = ws
i += 3
case TCPOptionTS: // 计时
if i+10 > limit || (b[i+1] != 10) {
return opts
@@ -223,17 +305,32 @@ func ParseTCPOptions(b []byte) TCPOptions {
opts.TSVal = binary.BigEndian.Uint32(b[i+2:])
opts.TSEcr = binary.BigEndian.Uint32(b[i+6:])
i += 10
case TCPOptionSACKPermitted:
if i+2 > limit || b[i+1] != 2 {
return opts
}
opts.sackPermitted = true
i += 2
case TCPOptionSACK:
if i+2 > limit {
// Malformed SACK block, just return and stop parsing.
return opts
}
sackOptionLen := int(b[i+1])
// TODO 需要添加
numBlocks := (sackOptionLen - 2) / 8 // 去头 每个block长为8
opts.SACKBlocks = []SACKBlock{}
for j := 0; j < numBlocks; j++ {
start := binary.BigEndian.Uint32(b[i+2+j*8:])
end := binary.BigEndian.Uint32(b[i+2+j*8+4:])
opts.SACKBlocks = append(opts.SACKBlocks, SACKBlock{
Start: seqnum.Value(start),
End: seqnum.Value(end),
})
}
i += sackOptionLen
default:
// We don't recognize this option, just skip over it.
// 这里不做进一步解析 留到后面进行
if i+2 > limit {
return opts
}
@@ -243,12 +340,98 @@ func ParseTCPOptions(b []byte) TCPOptions {
if l < 2 || i+l > limit {
return opts
}
i++
i += l
}
}
return opts
}
func (opts TCPOptions) String() string {
return fmt.Sprintf("|MSS|% 29d|\n|WS |% 29d|\n|TS |% 29v|\n|TSV|% 29d|\n|TSE|% 29d|\n|SP |% 29v|\n|SBS|%v|",
opts.mss, opts.ws, opts.TS, opts.TSVal, opts.TSEcr, opts.sackPermitted, opts.SACKBlocks)
}
// ParseSynOptions parses the options received in a SYN segment and returns the
// relevant ones. opts should point to the option part of the TCP Header.
func ParseSynOptions(opts []byte, isAck bool) TCPSynOptions {
synOpts := TCPSynOptions{
// Per RFC 1122, page 85: "If an MSS option is not received at
// connection setup, TCP MUST assume a default send MSS of 536."
MSS: 536,
// If no window scale option is specified, WS in options is
// returned as -1; this is because the absence of the option
// indicates that the we cannot use window scaling on the
// receive end either.
WS: -1,
}
limit := len(opts)
for i := 0; i < limit; {
switch opts[i] {
case TCPOptionEOL:
i = limit
case TCPOptionNOP:
i++
case TCPOptionMSS:
if i+4 > limit || opts[i+1] != 4 {
return synOpts
}
mss := uint16(opts[i+2])<<8 | uint16(opts[i+3])
if mss == 0 {
return synOpts
}
synOpts.MSS = mss
i += 4
case TCPOptionWS:
if i+3 > limit || opts[i+1] != 3 {
return synOpts
}
ws := int(opts[i+2])
if ws > MaxWndScale {
ws = MaxWndScale
}
synOpts.WS = ws
i += 3
case TCPOptionTS:
if i+10 > limit || opts[i+1] != 10 {
return synOpts
}
synOpts.TSVal = binary.BigEndian.Uint32(opts[i+2:])
if isAck { // ACK报文需要记录时间间隔
// If the segment is a SYN-ACK then store the Timestamp Echo Reply
// in the segment.
synOpts.TSEcr = binary.BigEndian.Uint32(opts[i+6:])
}
synOpts.TS = true
i += 10
case TCPOptionSACKPermitted:
if i+2 > limit || opts[i+1] != 2 {
return synOpts
}
synOpts.SACKPermitted = true
i += 2
default:
// We don't recognize this option, just skip over it.
if i+2 > limit {
return synOpts
}
l := int(opts[i+1])
// If the length is incorrect or if l+i overflows the
// total options length then return false.
if l < 2 || i+l > limit {
return synOpts
}
i += l
}
}
return synOpts
}
func (opts TCPSynOptions) String() string {
return fmt.Sprintf("|%d|%d|%v|%d|%d|%v|", opts.MSS, opts.WS, opts.TS, opts.TSVal, opts.TSEcr, opts.SACKPermitted)
}
/*
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
@@ -277,16 +460,12 @@ var tcpFmt string = `
|% 32s |
|% 4s|% 4s|%06b|% 16s|
|% 16s|% 16s|
|% 8v|
| Padding |
%v`
---------------------------------`
func (b TCP) String() string {
return fmt.Sprintf(tcpFmt, atoi(b.SourcePort()), atoi(b.DestinationPort()),
atoi(b.SequenceNumber()),
atoi(b.AckNumber()),
atoi(b.DataOffset()), "0", b.Flags(), atoi(b.WindowSize()),
atoi(b.Checksum()), atoi(b.UrgentPtr()),
ParseTCPOptions(b.Options()),
b.viewPayload())
atoi(b.Checksum()), atoi(b.UrgentPtr()))
}

View File

@@ -49,3 +49,29 @@
9. 紧急指针 占 2 字节,紧急指针仅在 URG=1 时才有意义,它指出本报文段中的紧急数据的字节数(紧急数据结束后就是普通数据) 。因此在紧急指针指出了紧急数据的末尾在报文段中的位置。当所有紧急数据都处理完时TCP 就告诉应用程序恢复到正常操作。值得注意的是,即使窗口为 0 时也可以发送紧急数据。
10. 选项 选项长度可变,最长可达 40 字节。当没有使用“选项”时TCP 的首部长度是 20 字节。TCP 首部总长度由 TCP 头中的“数据偏移”字段决定,前面说了,最长偏移为 60 字节。那么“tcp 选项”的长度最大为 60-20=40 字节。
## tcp选项
TCP 最初只规定了一种选项,即最大报文段长度 MSSMaximum Segment Szie。后来又增加了几个选项如窗口扩大选项、时间戳选项等下面说明常用的选项。
1. kind=0 是选项表结束选项。
2. kind=1 是空操作nop选项
没有特殊含义,一般用于将 TCP 选项的总长度填充为 4 字节的整数倍,为啥需要 4 字节整数倍?因为前面讲了数据偏移字段的单位是 4 个字节。
3. kind=2 是最大报文段长度选项
TCP 连接初始化时通信双方使用该选项来协商最大报文段长度Max Segment SizeMSS。TCP 模块通常将 MSS 设置为MTU-40字节减掉的这 40 字节包括 20 字节的 TCP 头部和 20 字节的 IP 头部)。这样携带 TCP 报文段的 IP 数据报的长度就不会超过 MTU假设 TCP 头部和 IP 头部都不包含选项字段,并且这也是一般情况),从而避免本机发生 IP 分片。对以太网而言MSS 值是 14601500-40字节。
4. kind=3 是窗口扩大因子选项
TCP 连接初始化时,通信双方使用该选项来协商接收通告窗口的扩大因子。在 TCP 的头部中,接收通告窗口大小是用 16 位表示的,故最大为 65535 字节,但实际上 TCP 模块允许的接收通告窗口大小远不止这个数(为了提高 TCP 通信的吞吐量)。窗口扩大因子解决了这个问题。假设 TCP 头部中的接收通告窗口大小是 N窗口扩大因子移位数是 M那么 TCP 报文段的实际接收通告窗口大小是 N 乘 2M或者说 N 左移 M 位。注意M 的取值范围是 0 14。
和 MSS 选项一样,窗口扩大因子选项只能出现在同步报文段中,否则将被忽略。但同步报文段本身不执行窗口扩大操作,即同步报文段头部的接收通告窗口大小就是该 TCP 报文段的实际接收通告窗口大小。当连接建立好之后,每个数据传输方向的窗口扩大因子就固定不变了。关于窗口扩大因子选项的细节,可参考标准文档 RFC 1323。
5. kind=4 是选择性确认Selective AcknowledgmentSACK选项
TCP 通信时,如果某个 TCP 报文段丢失,则 TCP 模块会重传最后被确认的 TCP 报文段后续的所有报文段,这样原先已经正确传输的 TCP 报文段也可能重复发送,从而降低了 TCP 性能。SACK 技术正是为改善这种情况而产生的,它使 TCP 模块只重新发送丢失的 TCP 报文段,不用发送所有未被确认的 TCP 报文段。选择性确认选项用在连接初始化时,表示是否支持 SACK 技术。
6. kind=5 是 SACK 实际工作的选项
该选项的参数告诉发送方本端已经收到并缓存的不连续的数据块从而让发送端可以据此检查并重发丢失的数据块。每个块边沿edge of block参数包含一个 4 字节的序号。其中块左边沿表示不连续块的第一个数据的序号,而块右边沿则表示不连续块的最后一个数据的序号的下一个序号。这样一对参数(块左边沿和块右边沿)之间的数据是没有收到的。因为一个块信息占用 8 字节,所以 TCP 头部选项中实际上最多可以包含 4 个这样的不连续数据块(考虑选项类型和长度占用的 2 字节)。
7. kind=8 是时间戳选项
该选项提供了较为准确的计算通信双方之间的回路时间Round Trip TimeRTT的方法从而为 TCP 流量控制提供重要信息。

View File

@@ -1,11 +1,313 @@
package tcp
import (
"crypto/rand"
"crypto/sha1"
"encoding/binary"
"hash"
"io"
"log"
"netstack/sleep"
"netstack/tcpip"
"netstack/tcpip/header"
"netstack/tcpip/seqnum"
"netstack/tcpip/stack"
"sync"
"time"
)
const (
// tsLen is the length, in bits, of the timestamp in the SYN cookie.
tsLen = 8
// tsMask is a mask for timestamp values (i.e., tsLen bits).
tsMask = (1 << tsLen) - 1
// tsOffset is the offset, in bits, of the timestamp in the SYN cookie.
tsOffset = 24
// hashMask is the mask for hash values (i.e., tsOffset bits).
hashMask = (1 << tsOffset) - 1
// maxTSDiff is the maximum allowed difference between a received cookie
// timestamp and the current timestamp. If the difference is greater
// than maxTSDiff, the cookie is expired.
maxTSDiff = 2
)
var (
// SynRcvdCountThreshold is the global maximum number of connections
// that are allowed to be in SYN-RCVD state before TCP starts using SYN
// cookies to accept connections.
//
// It is an exported variable only for testing, and should not otherwise
// be used by importers of this package.
SynRcvdCountThreshold uint64 = 1000
// mssTable is a slice containing the possible MSS values that we
// encode in the SYN cookie with two bits.
mssTable = []uint16{536, 1300, 1440, 1460}
)
func encodeMSS(mss uint16) uint32 {
for i := len(mssTable) - 1; i > 0; i-- {
if mss >= mssTable[i] {
return uint32(i)
}
}
return 0
}
// syncRcvdCount is the number of endpoints in the SYN-RCVD state. The value is
// protected by a mutex so that we can increment only when it's guaranteed not
// to go above a threshold.
var synRcvdCount struct {
sync.Mutex
value uint64
pending sync.WaitGroup
}
// listenContext is used by a listening endpoint to store state used while
// listening for connections. This struct is allocated by the listen goroutine
// and must not be accessed or have its methods called concurrently as they
// may mutate the stored objects.
type listenContext struct {
stack *stack.Stack
rcvWnd seqnum.Size
nonce [2][sha1.BlockSize]byte // nonce 随机数
hasherMu sync.Mutex
hasher hash.Hash // 散列实现
v6only bool
netProto tcpip.NetworkProtocolNumber
}
// timeStamp returns an 8-bit timestamp with a granularity of 64 seconds.
func timeStamp() uint32 {
return uint32(time.Now().Unix()>>6) & tsMask // 00 00 00 FF
}
// 增加一个任务 最多1000个
func incSynRcvdCount() bool {
synRcvdCount.Mutex.Lock()
defer synRcvdCount.Unlock()
if synRcvdCount.value >= SynRcvdCountThreshold {
return false
}
synRcvdCount.pending.Add(1)
synRcvdCount.value++
return true
}
// 结束一个任务
func decSynRcvdCount() {
synRcvdCount.Mutex.Lock()
defer synRcvdCount.Unlock()
synRcvdCount.value--
synRcvdCount.pending.Done()
}
// newListenContext creates a new listen context.
func newListenContext(stack *stack.Stack, rcvWnd seqnum.Size, v6only bool, netProto tcpip.NetworkProtocolNumber) *listenContext {
l := &listenContext{
stack: stack,
rcvWnd: rcvWnd,
hasher: sha1.New(),
v6only: v6only,
netProto: netProto,
}
rand.Read(l.nonce[0][:])
rand.Read(l.nonce[1][:])
return l
}
// cookieHash calculates the cookieHash for the given id, timestamp and nonce
// index. The hash is used to create and validate cookies.
func (l *listenContext) cookieHash(id stack.TransportEndpointID, ts uint32, nonceIndex int) uint32 {
// Initialize block with fixed-size data: local ports and v.
var payload [8]byte
binary.BigEndian.PutUint16(payload[0:], id.LocalPort)
binary.BigEndian.PutUint16(payload[2:], id.RemotePort)
binary.BigEndian.PutUint32(payload[4:], ts)
// Feed everything to the hasher.
l.hasherMu.Lock()
l.hasher.Reset()
l.hasher.Write(payload[:])
l.hasher.Write(l.nonce[nonceIndex][:])
io.WriteString(l.hasher, string(id.LocalAddress))
io.WriteString(l.hasher, string(id.RemoteAddress))
// Finalize the calculation of the hash and return the first 4 bytes.
h := make([]byte, 0, sha1.Size)
h = l.hasher.Sum(h)
l.hasherMu.Unlock()
return binary.BigEndian.Uint32(h[:])
}
// createCookie creates a SYN cookie for the given id and incoming sequence
// number.
func (l *listenContext) createCookie(id stack.TransportEndpointID,
seq seqnum.Value, data uint32) seqnum.Value {
ts := timeStamp()
v := l.cookieHash(id, 0, 0) + uint32(seq) + (ts << tsOffset)
v += (l.cookieHash(id, ts, 1) + data) & hashMask
return seqnum.Value(v)
}
// isCookieValid checks if the supplied cookie is valid for the given id and
// sequence number. If it is, it also returns the data originally encoded in the
// cookie when createCookie was called.
func (l *listenContext) isCookieValid(id stack.TransportEndpointID,
cookie seqnum.Value, seq seqnum.Value) (uint32, bool) {
ts := timeStamp()
v := uint32(cookie) - l.cookieHash(id, 0, 0) - uint32(seq)
cookieTS := v >> tsOffset
if ((ts - cookieTS) & tsMask) > maxTSDiff {
return 0, false
}
return (v - l.cookieHash(id, cookieTS, 1)) & hashMask, true
}
// 新建一个tcp端 这个tcp端与segment同属一个tcp连接 但属于不同阶段 用于写回远端
func (l *listenContext) createConnectedEndpoint(s *segment, iss seqnum.Value,
irs seqnum.Value, rcvdSynOpts *header.TCPSynOptions) (*endpoint, *tcpip.Error) {
// Create a new endpoint.
netProto := l.netProto
if netProto == 0 {
netProto = s.route.NetProto
}
n := newEndpoint(l.stack, netProto, nil)
n.v6only = l.v6only
n.id = s.id
n.boundNICID = s.route.NICID()
n.route = s.route.Clone()
n.effectiveNetProtos = []tcpip.NetworkProtocolNumber{s.route.NetProto}
n.rcvBufSize = int(l.rcvWnd)
n.maybeEnableTimestamp(rcvdSynOpts)
n.maybeEnableSACKPermitted(rcvdSynOpts)
// Register new endpoint so that packets are routed to it.
// 在网络协议栈中去注册这个tcp端
if err := n.stack.RegisterTransportEndpoint(n.boundNICID,
n.effectiveNetProtos, ProtocolNumber, n.id, n); err != nil {
n.Close()
return nil, err
}
n.isRegistered = true
n.state = stateConnected
// Create sender and receiver.
// The receiver at least temporarily has a zero receive window scale,
// but the caller may change it (before starting the protocol loop).
n.snd = newSender(n, iss, irs, s.window, rcvdSynOpts.MSS, rcvdSynOpts.WS)
n.rcv = newReceiver(n, irs, l.rcvWnd, 0)
return n, nil
}
func (l *listenContext) createEndpointAndPerformHandshake(s *segment, opts *header.TCPSynOptions) (*endpoint, *tcpip.Error) {
// create new endpoint
irs := s.sequenceNumber
cookie := l.createCookie(s.id, irs, encodeMSS(opts.MSS))
log.Println("收到一个远端握手申请", irs, "标记cookie", cookie)
ep, err := l.createConnectedEndpoint(s, cookie, irs, opts)
if err != nil {
return nil, err
}
// 执行三次握手
h, err := newHandshake(ep, l.rcvWnd)
return ep, nil
}
// 一旦侦听端点收到SYN段handleSynSegment就会在其自己的goroutine中调用。它负责完成握手并将新端点排队以进行接受。
// 在TCP开始使用SYN cookie接受连接之前允许使用有限数量的这些goroutine。
func (e *endpoint) handleSynSegment(ctx *listenContext, s *segment, opts *header.TCPSynOptions) {
defer decSynRcvdCount()
defer s.decRef()
_, err := ctx.createEndpointAndPerformHandshake(s, opts)
if err != nil {
return
}
// 到这里,三次握手已经完成,那么分发一个新的连接
//e.deliverAccepted(n)
}
// handleListenSegment is called when a listening endpoint receives a segment
// and needs to handle it.
func (e *endpoint) handleListenSegment(ctx *listenContext, s *segment) {
switch s.flags {
case flagSyn: // syn报文处理
// 分析tcp选项
opts := parseSynSegmentOptions(s)
if incSynRcvdCount() {
s.incRef()
go e.handleSynSegment(ctx, s, &opts)
} else {
log.Println("暂时不处理")
}
// 返回一个syn+ack报文
case flagFin: // fin报文处理
// 三次握手最后一次 ack 报文
}
}
func parseSynSegmentOptions(s *segment) header.TCPSynOptions {
synOpts := header.ParseSynOptions(s.options, s.flagIsSet(flagAck))
if synOpts.TS {
s.parsedOptions.TSVal = synOpts.TSVal
s.parsedOptions.TSEcr = synOpts.TSEcr
}
return synOpts
}
// protocolListenLoop 是侦听TCP端点的主循环。它在自己的goroutine中运行负责处理连接请求
func (e *endpoint) protocolListenLoop(rcvWnd seqnum.Size) *tcpip.Error {
select {}
defer func() {
// TODO 后置处理
}()
e.mu.Lock()
v6only := e.v6only
e.mu.Unlock()
ctx := newListenContext(e.stack, rcvWnd, v6only, e.netProto)
// 初始化事件触发器 并添加事件
s := sleep.Sleeper{}
s.AddWaker(&e.newSegmentWaker, wakerForNewSegment)
s.AddWaker(&e.notificationWaker, wakerForNotification)
for {
switch index, _ := s.Fetch(true); index { // Fetch(true) 阻塞获取
case wakerForNewSegment:
mayRequeue := true
// 接收和处理tcp报文
for i := 0; i < maxSegmentsPerWake; i++ {
s := e.segmentQueue.dequeue()
if s == nil {
mayRequeue = false
break
}
e.handleListenSegment(ctx, s)
s.decRef()
}
// If the queue is not empty, make sure we'll wake up
// in the next iteration.
if mayRequeue && !e.segmentQueue.empty() { // 主协程又添加了新数据
e.newSegmentWaker.Assert() // 重新尝试获取数据
}
case wakerForNotification:
// TODO 触发其他事件
}
}
}

View File

@@ -5,6 +5,16 @@ import (
"netstack/tcpip"
)
// The following are used to set up sleepers.
const (
wakerForNotification = iota
wakerForNewSegment
wakerForResend
wakerForResolution
)
const maxSegmentsPerWake = 100
// protocolMainLoop 是TCP协议的主循环。它在自己的goroutine中运行负责握手、发送段和处理收到的段
func (e *endpoint) protocolMainLoop(handshake bool) *tcpip.Error {
for {

View File

@@ -65,6 +65,20 @@ type endpoint struct {
// workerRunning specifies if a worker goroutine is running.
workerRunning bool
// sendTSOk is used to indicate when the TS Option has been negotiated.
// When sendTSOk is true every non-RST segment should carry a TS as per
// RFC7323#section-1.1
sendTSOk bool
// recentTS is the timestamp that should be sent in the TSEcr field of
// the timestamp for future segments sent by the endpoint. This field is
// updated if required when a new segment is received by this endpoint.
recentTS uint32
// sackPermitted is set to true if the peer sends the TCPSACKPermitted
// option in the SYN/SYN-ACK.
sackPermitted bool
segmentQueue segmentQueue
// When the send side is closed, the protocol goroutine is notified via
@@ -78,11 +92,25 @@ type endpoint struct {
sndWaker sleep.Waker
sndCloseWaker sleep.Waker
// notificationWaker is used to indicate to the protocol goroutine that
// it needs to wake up and check for notifications.
notificationWaker sleep.Waker
// newSegmentWaker is used to indicate to the protocol goroutine that
// it needs to wake up and handle new segments queued to it.
// HandlePacket收到segment后通知处理的事件驱动器
newSegmentWaker sleep.Waker
// acceptedChan is used by a listening endpoint protocol goroutine to
// send newly accepted connections to the endpoint so that they can be
// read by Accept() calls.
acceptedChan chan *endpoint
// The following are only used from the protocol goroutine, and
// therefore don't need locks to protect them.
rcv *receiver
snd *sender
// The following are only used to assist the restore run to re-connect.
bindAddress tcpip.Address
connectingAddress tcpip.Address
@@ -97,7 +125,6 @@ func newEndpoint(stack *stack.Stack, netProto tcpip.NetworkProtocolNumber, waite
sndBufSize: DefaultBufferSize,
}
// TODO 需要添加
log.Println("新建tcp端")
e.segmentQueue.setLimit(2 * e.rcvBufSize)
return e
}
@@ -339,7 +366,7 @@ func (e *endpoint) HandlePacket(r *stack.Route, id stack.TransportEndpointID, vv
log.Printf("收到 tcp [%s] 报文片段 from %s, seq: %d, ack: %d",
flagString(s.flags), fmt.Sprintf("%s:%d", s.id.RemoteAddress, s.id.RemotePort),
s.sequenceNumber, s.ackNumber)
//e.newSegmentWaker.Assert()
e.newSegmentWaker.Assert()
} else {
// The queue is full, so we drop the segment.
e.stack.Stats().DroppedPackets.Increment()
@@ -350,3 +377,27 @@ func (e *endpoint) HandlePacket(r *stack.Route, id stack.TransportEndpointID, vv
func (e *endpoint) HandleControlPacket(id stack.TransportEndpointID, typ stack.ControlType, extra uint32, vv buffer.VectorisedView) {
}
// maybeEnableTimestamp marks the timestamp option enabled for this endpoint if
// the SYN options indicate that timestamp option was negotiated. It also
// initializes the recentTS with the value provided in synOpts.TSval.
func (e *endpoint) maybeEnableTimestamp(synOpts *header.TCPSynOptions) {
if synOpts.TS {
e.sendTSOk = true
e.recentTS = synOpts.TSVal
}
}
// maybeEnableSACKPermitted marks the SACKPermitted option enabled for this endpoint
// if the SYN options indicate that the SACK option was negotiated and the TCP
// stack is configured to enable TCP SACK option.
func (e *endpoint) maybeEnableSACKPermitted(synOpts *header.TCPSynOptions) {
var v SACKEnabled
if err := e.stack.TransportProtocolOption(ProtocolNumber, &v); err != nil {
// Stack doesn't support SACK. So just return.
return
}
if bool(v) && synOpts.SACKPermitted {
e.sackPermitted = true
}
}

View File

@@ -24,6 +24,10 @@ const (
maxBufferSize = 4 << 20 // 4MB
)
// SACKEnabled option can be used to enable SACK support in the TCP
// protocol. See: https://tools.ietf.org/html/rfc2018.
type SACKEnabled bool
type protocol struct{}
// Number returns the tcp protocol number.

View File

@@ -0,0 +1,11 @@
package tcp
import "netstack/tcpip/seqnum"
type receiver struct{}
// 新建并初始化接收器
func newReceiver(ep *endpoint, irs seqnum.Value, rcvWnd seqnum.Size, rcvWndScale uint8) *receiver {
r := &receiver{}
return r
}

View File

@@ -1,6 +1,7 @@
package tcp
import (
"fmt"
"log"
"netstack/tcpip/buffer"
"netstack/tcpip/header"
@@ -48,7 +49,7 @@ func flagString(flags uint8) string {
// segment 表示一个 TCP 段。它保存有效负载和解析的 TCP 段信息,并且可以添加到侵入列表中
type segment struct {
segmentEntry
refCnt int32
refCnt int32 // 引用计数
id stack.TransportEndpointID
route stack.Route
data buffer.VectorisedView
@@ -72,6 +73,36 @@ func newSegment(r *stack.Route, id stack.TransportEndpointID, vv buffer.Vectoris
return s
}
func newSegmentFromView(r *stack.Route, id stack.TransportEndpointID, v buffer.View) *segment {
s := &segment{
refCnt: 1,
id: id,
route: r.Clone(),
}
s.views[0] = v
s.data = buffer.NewVectorisedView(len(v), s.views[:1]) // TODO 为什么只复制1?
return s
}
func (s *segment) clone() *segment {
t := &segment{
refCnt: 1,
id: s.id,
sequenceNumber: s.sequenceNumber,
ackNumber: s.ackNumber,
flags: s.flags,
window: s.window,
route: s.route.Clone(),
viewToDeliver: s.viewToDeliver,
}
t.data = s.data.Clone(t.views[:])
return t
}
func (s *segment) flagIsSet(flag uint8) bool {
return (s.flags & flag) != 0
}
func (s *segment) decRef() {
if atomic.AddInt32(&s.refCnt, -1) == 0 {
s.route.Release()
@@ -83,14 +114,17 @@ func (s *segment) incRef() {
}
func (s *segment) parse() bool {
log.Println(header.TCP(s.data.First()))
h := header.TCP(s.data.First())
offset := int(h.DataOffset())
if offset < header.TCPMinimumSize || offset > len(h) {
return false
}
s.options = h.Options()
//s.parsedOptions = header.ParseTCPOptions(s.options)
s.parsedOptions = header.ParseTCPOptions(s.options)
log.Println(h)
fmt.Println(s.parsedOptions)
s.data.TrimFront(offset)
s.sequenceNumber = seqnum.Value(h.SequenceNumber())

View File

@@ -0,0 +1,12 @@
package tcp
import "netstack/tcpip/seqnum"
type sender struct {
}
// 新建并初始化发送器
func newSender(ep *endpoint, iss, irs seqnum.Value, sndWnd seqnum.Size, mss uint16, sndWndScale int) *sender {
s := &sender{}
return s
}