处理sleep的链接问题

This commit is contained in:
impact-eintr
2022-11-24 19:35:26 +08:00
parent 91ec6f0433
commit e51d8ea721
15 changed files with 744 additions and 38 deletions

View File

@@ -0,0 +1,308 @@
// Copyright 2018 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package stack
import (
"fmt"
"log"
"sync"
"time"
"netstack/sleep"
"netstack/tcpip"
)
const linkAddrCacheSize = 512 // max cache entries
// linkAddrCache is a fixed-sized cache mapping IP addresses to link addresses.
//
// The entries are stored in a ring buffer, oldest entry replaced first.
//
// This struct is safe for concurrent use.
type linkAddrCache struct {
// ageLimit is how long a cache entry is valid for.
ageLimit time.Duration
// resolutionTimeout is the amount of time to wait for a link request to
// resolve an address.
resolutionTimeout time.Duration
// resolutionAttempts is the number of times an address is attempted to be
// resolved before failing.
resolutionAttempts int
mu sync.Mutex
cache map[tcpip.FullAddress]*linkAddrEntry
next int // array index of next available entry
entries [linkAddrCacheSize]linkAddrEntry
}
// entryState controls the state of a single entry in the cache.
type entryState int
const (
// incomplete means that there is an outstanding request to resolve the
// address. This is the initial state.
incomplete entryState = iota
// ready means that the address has been resolved and can be used.
ready
// failed means that address resolution timed out and the address
// could not be resolved.
failed
// expired means that the cache entry has expired and the address must be
// resolved again.
expired
)
// String implements Stringer.
func (s entryState) String() string {
switch s {
case incomplete:
return "incomplete"
case ready:
return "ready"
case failed:
return "failed"
case expired:
return "expired"
default:
return fmt.Sprintf("unknown(%d)", s)
}
}
// A linkAddrEntry is an entry in the linkAddrCache.
// This struct is thread-compatible.
type linkAddrEntry struct {
addr tcpip.FullAddress
linkAddr tcpip.LinkAddress
expiration time.Time
s entryState
// wakers is a set of waiters for address resolution result. Anytime
// state transitions out of 'incomplete' these waiters are notified.
wakers map[*sleep.Waker]struct{}
done chan struct{}
}
func (e *linkAddrEntry) state() entryState {
if e.s != expired && time.Now().After(e.expiration) {
// Force the transition to ensure waiters are notified.
e.changeState(expired)
}
return e.s
}
func (e *linkAddrEntry) changeState(ns entryState) {
if e.s == ns {
return
}
// Validate state transition.
switch e.s {
case incomplete:
// All transitions are valid.
case ready, failed:
if ns != expired {
panic(fmt.Sprintf("invalid state transition from %s to %s", e.s, ns))
}
case expired:
// Terminal state.
panic(fmt.Sprintf("invalid state transition from %s to %s", e.s, ns))
default:
panic(fmt.Sprintf("invalid state: %s", e.s))
}
// Notify whoever is waiting on address resolution when transitioning
// out of 'incomplete'.
if e.s == incomplete {
for w := range e.wakers {
w.Assert()
}
e.wakers = nil
if e.done != nil {
close(e.done)
}
}
e.s = ns
}
func (e *linkAddrEntry) addWaker(w *sleep.Waker) {
e.wakers[w] = struct{}{}
}
func (e *linkAddrEntry) removeWaker(w *sleep.Waker) {
delete(e.wakers, w)
}
// add adds a k -> v mapping to the cache.
func (c *linkAddrCache) add(k tcpip.FullAddress, v tcpip.LinkAddress) {
log.Printf("add link cache: %v-%v", k, v)
c.mu.Lock()
defer c.mu.Unlock()
entry, ok := c.cache[k]
if ok {
s := entry.state()
if s != expired && entry.linkAddr == v {
// Disregard repeated calls.
return
}
// Check if entry is waiting for address resolution.
if s == incomplete {
entry.linkAddr = v
} else {
// Otherwise create a new entry to replace it.
entry = c.makeAndAddEntry(k, v)
}
} else {
entry = c.makeAndAddEntry(k, v)
}
entry.changeState(ready)
}
// makeAndAddEntry is a helper function to create and add a new
// entry to the cache map and evict older entry as needed.
func (c *linkAddrCache) makeAndAddEntry(k tcpip.FullAddress, v tcpip.LinkAddress) *linkAddrEntry {
// Take over the next entry.
entry := &c.entries[c.next]
if c.cache[entry.addr] == entry {
delete(c.cache, entry.addr)
}
// Mark the soon-to-be-replaced entry as expired, just in case there is
// someone waiting for address resolution on it.
entry.changeState(expired)
*entry = linkAddrEntry{
addr: k,
linkAddr: v,
expiration: time.Now().Add(c.ageLimit),
wakers: make(map[*sleep.Waker]struct{}),
done: make(chan struct{}),
}
c.cache[k] = entry
c.next = (c.next + 1) % len(c.entries)
return entry
}
// get reports any known link address for k.
func (c *linkAddrCache) get(k tcpip.FullAddress, linkRes LinkAddressResolver, localAddr tcpip.Address, linkEP LinkEndpoint, waker *sleep.Waker) (tcpip.LinkAddress, <-chan struct{}, *tcpip.Error) {
log.Printf("link addr get linkRes: %#v, addr: %+v", linkRes, k)
if linkRes != nil {
if addr, ok := linkRes.ResolveStaticAddress(k.Addr); ok {
return addr, nil, nil
}
}
c.mu.Lock()
defer c.mu.Unlock()
// 尝试从缓存中得到MAC地址
if entry, ok := c.cache[k]; ok {
switch s := entry.state(); s {
case expired:
case ready:
return entry.linkAddr, nil, nil
case failed:
return "", nil, tcpip.ErrNoLinkAddress
case incomplete:
// Address resolution is still in progress.
entry.addWaker(waker)
return "", entry.done, tcpip.ErrWouldBlock
default:
panic(fmt.Sprintf("invalid cache entry state: %s", s))
}
}
if linkRes == nil {
return "", nil, tcpip.ErrNoLinkAddress
}
// Add 'incomplete' entry in the cache to mark that resolution is in progress.
e := c.makeAndAddEntry(k, "")
e.addWaker(waker)
go c.startAddressResolution(k, linkRes, localAddr, linkEP, e.done)
return "", e.done, tcpip.ErrWouldBlock
}
// removeWaker removes a waker previously added through get().
func (c *linkAddrCache) removeWaker(k tcpip.FullAddress, waker *sleep.Waker) {
c.mu.Lock()
defer c.mu.Unlock()
if entry, ok := c.cache[k]; ok {
entry.removeWaker(waker)
}
}
func (c *linkAddrCache) startAddressResolution(k tcpip.FullAddress, linkRes LinkAddressResolver, localAddr tcpip.Address, linkEP LinkEndpoint, done <-chan struct{}) {
for i := 0; ; i++ {
// Send link request, then wait for the timeout limit and check
// whether the request succeeded.
linkRes.LinkAddressRequest(k.Addr, localAddr, linkEP)
select {
case <-time.After(c.resolutionTimeout):
if stop := c.checkLinkRequest(k, i); stop {
return
}
case <-done:
return
}
}
}
// checkLinkRequest checks whether previous attempt to resolve address has succeeded
// and mark the entry accordingly, e.g. ready, failed, etc. Return true if request
// can stop, false if another request should be sent.
func (c *linkAddrCache) checkLinkRequest(k tcpip.FullAddress, attempt int) bool {
c.mu.Lock()
defer c.mu.Unlock()
entry, ok := c.cache[k]
if !ok {
// Entry was evicted from the cache.
return true
}
switch s := entry.state(); s {
case ready, failed, expired:
// Entry was made ready by resolver or failed. Either way we're done.
return true
case incomplete:
if attempt+1 >= c.resolutionAttempts {
// Max number of retries reached, mark entry as failed.
entry.changeState(failed)
return true
}
// No response yet, need to send another ARP request.
return false
default:
panic(fmt.Sprintf("invalid cache entry state: %s", s))
}
}
func newLinkAddrCache(ageLimit, resolutionTimeout time.Duration, resolutionAttempts int) *linkAddrCache {
return &linkAddrCache{
ageLimit: ageLimit,
resolutionTimeout: resolutionTimeout,
resolutionAttempts: resolutionAttempts,
cache: make(map[tcpip.FullAddress]*linkAddrEntry, linkAddrCacheSize),
}
}

30
tcpip/stack/nic.go Normal file
View File

@@ -0,0 +1,30 @@
package stack
import (
"netstack/ilist"
"netstack/tcpip"
"sync"
)
// 代表一个网卡对象 当我们创建好tap网卡对象后 我们使用NIC来代表它在我们自己的协议栈中的网卡对象
type NIC struct {
stack *Stack
// 每个网卡的惟一标识号
id tcpip.NICID
// 网卡名,可有可无
name string
// 链路层端
linkEP LinkEndpoint // 在链路层 这就是 fdbased.endpoint
// 传输层的解复用
demux *transportDemuxer
mu sync.RWMutex
spoofing bool
promiscuous bool // 混杂模式
primary map[tcpip.NetworkProtocolNumber]*ilist.List
// 网络层端的记录
endpoints map[NetworkEndpoingID]*referencedNetworkEndpoint
// 子网的记录
subnets []tcpip.Subnet
}

View File

@@ -1,6 +1,8 @@
package stack
import (
"netstack/ilist"
"netstack/sleep"
"netstack/tcpip"
"netstack/tcpip/buffer"
"sync"
@@ -14,6 +16,8 @@ const (
CapabilityLoopback
)
// ====================链路层相关==============================
// 所谓 io 就是数据的输入输出,对于网卡来说就是接收或发送数据,
// 接收意味着对以太网帧解封装和提交给网络层,发送意味着对上层数据的封装和写入网卡
@@ -45,6 +49,38 @@ type LinkEndpoint interface {
IsAttached() bool
}
// LinkAddressResolver 是对可以解析链接地址的 NetworkProtocol 的扩展 TODO 需要解读
type LinkAddressResolver interface {
LinkAddressRequest(addr, localAddr tcpip.Address, linkEP LinkEndpoint) *tcpip.Error
ResolveStaticAddress(addr tcpip.Address) (tcpip.LinkAddress, bool)
LinkAddressProtocol() tcpip.NetworkProtocolNumber
}
// A LinkAddressCache caches link addresses.
type LinkAddressCache interface {
// CheckLocalAddress determines if the given local address exists, and if it
// does not exist.
CheckLocalAddress(nicid tcpip.NICID, protocol tcpip.NetworkProtocolNumber, addr tcpip.Address) tcpip.NICID
// AddLinkAddress adds a link address to the cache.
AddLinkAddress(nicid tcpip.NICID, addr tcpip.Address, linkAddr tcpip.LinkAddress)
// GetLinkAddress looks up the cache to translate address to link address (e.g. IP -> MAC).
// If the LinkEndpoint requests address resolution and there is a LinkAddressResolver
// registered with the network protocol, the cache attempts to resolve the address
// and returns ErrWouldBlock. Waker is notified when address resolution is
// complete (success or not).
//
// If address resolution is required, ErrNoLinkAddress and a notification channel is
// returned for the top level caller to block. Channel is closed once address resolution
// is complete (success or not).
GetLinkAddress(nicid tcpip.NICID, addr, localAddr tcpip.Address, protocol tcpip.NetworkProtocolNumber, w *sleep.Waker) (tcpip.LinkAddress, <-chan struct{}, *tcpip.Error)
// RemoveWaker removes a waker that has been added in GetLinkAddress().
RemoveWaker(nicid tcpip.NICID, addr tcpip.Address, waker *sleep.Waker)
}
type NetworkDispatcher interface {
DeliverNetworkPacket(linkEP LinkEndpoint, dstLinkAddr, srcLinkAddr tcpip.LinkAddress,
@@ -53,16 +89,64 @@ type NetworkDispatcher interface {
type LinkEndpointCapabilities uint
var (
// 传输层协议的注册存储结构 TODO
// 网络层协议的注册存储结构 TODO
linkEPMu sync.RWMutex
linkEPMu sync.RWMutex
nextLinkEndpointID tcpip.LinkEndpointID = 1
linkEndpoints = make(map[tcpip.LinkEndpointID]LinkEndpoint) // 设备注册表 设备号:设备实现
linkEndpoints = make(map[tcpip.LinkEndpointID]LinkEndpoint) // 设备注册表 设备号:设备实现
)
// ==============================网络层相关==============================
type NetworkProtocol interface {
// TODO 需要添加
}
// NetworkEndpoint是需要由网络层协议例如ipv4ipv6的端点实现的接口
type NetworkEndpoint interface {
// TODO 需要添加
}
type NetworkEndpoingID struct {
LocalAddress tcpip.Address
}
// ==============================传输层相关==============================
type TransportEndpointID struct {
// TODO
}
// ControlType 是网络层控制消息的类型
type ControlType int
// TODO 需要解读
type TransportEndpoint interface {
HandlePacket(r *Route, id TransportEndpointID, vv buffer.VectorisedView)
HandleControlPacker(id TransportEndpointID, typ ControlType, extra uint32, vv buffer.VectorisedView)
}
// TODO 需要解读
type referencedNetworkEndpoint struct {
ilist.Entry
refs int32
ep NetworkEndpoint
nic *NIC
protocol tcpip.NetworkProtocolNumber
// linkCache is set if link address resolution is enabled for this
// protocol. Set to nil otherwise.
linkCache LinkAddressCache
linkAddrCache
// holdsInsertRef is protected by the NIC's mutex. It indicates whether
// the reference count is biased by 1 due to the insertion of the
// endpoint. It is reset to false when RemoveAddress is called on the
// NIC.
holdsInsertRef bool
}
// 注册一个链路层设备
func RegisterLinkEndpoint(linkEP LinkEndpoint) tcpip.LinkEndpointID {
linkEPMu.Lock()

View File

@@ -11,7 +11,6 @@ type Route struct {
// 远端网卡MAC地址
RemoteLinkAddress tcpip.LinkAddress
// 本地网络层地址 ipv4 or ipv6 地址
LocalAddress tcpip.Address
// 本地网卡MAC地址
@@ -24,5 +23,5 @@ type Route struct {
NetProto tcpip.NetworkProtocolNumber
// 相关的网络终端
//ref *referenceNetworkEndpoint
ref *referencedNetworkEndpoint
}

75
tcpip/stack/stack.go Normal file
View File

@@ -0,0 +1,75 @@
package stack
import (
"netstack/tcpip"
"netstack/tcpip/ports"
"sync"
)
// TODO 需要解读
type TCPProbeFunc func(s TcpEndpointState)
// TODO 需要解读
type TcpEndpointState struct {
// TODO 需要添加
}
type transportProtocolState struct {
}
// Stack 是一个网络堆栈具有所有支持的协议、NIC 和路由表。
type Stack struct {
transportProtocols map[tcpip.TransportProtocolNumber]*transportProtocolState // 各种传输层协议
networkProtocols map[tcpip.NetworkProtocolNumber]NetworkProtocol // 各种网络层协议
linkAddrResolvers map[tcpip.NetworkProtocolNumber]LinkAddressResolver // 各种传输层协议
demux *transportDemuxer // 传输层的复用器
stats tcpip.Stats // 网络栈的状态监测器
linkAddrCache *linkAddrCache // 链路层地址的缓存
mu sync.RWMutex
nics map[tcpip.NICID]*NIC // 所有的网卡设备
forwarding bool // 是否正在转发
// route is the route table passed in by the user via SetRouteTable(),
// it is used by FindRoute() to build a route for a specific
// destination.
routeTable []tcpip.Route // 路由表
*ports.PortManager // 端口管理器
// If not nil, then any new endpoints will have this probe function
// invoked everytime they receive a TCP segment.
tcpProbeFunc TCPProbeFunc
// clock is used to generate user-visible times.
clock tcpip.Clock
}
func (s *Stack) CreateNIC(id tcpip.NICID, linkEP tcpip.LinkEndpointID) *tcpip.Error {
return s.createNIC(id, "", linkEP, true)
}
// 新建一个网卡对象,并且激活它 激活就是准备好熊网卡中读取和写入数据
func (s *Stack) createNIC(id tcpip.NICID, name string, linkEP tcpip.LinkEndpointID, enable bool) *tcpip.Error {
ep := FindLinkEndpoint(linkEP)
if ep == nil {
return tcpip.ErrBadLinkEndpoint
}
s.mu.Lock()
defer s.mu.Unlock()
// Make sure id is unique
if _, ok := s.nics[id]; ok {
return tcpip.ErrDuplicateNICID
}
n := newIC(s, id, name, ep)
s.nics[id] = n
if enable {
n.attachLinkEndpoint()
}
}

21
tcpip/stack/stack_test.go Normal file
View File

@@ -0,0 +1,21 @@
package stack_test
import (
"netstack/tcpip/link/channel"
"netstack/tcpip/stack"
"testing"
)
const (
defaultMTU = 65536
)
func TestStackBase(t *testing.T) {
myStack := &stack.Stack{}
id, _ := channel.New(10, defaultMTU, "")
if err := myStack.CreateNIC(1, id); err != nil {
panic(err)
}
}

View File

@@ -0,0 +1,23 @@
package stack
import (
"netstack/tcpip"
"sync"
)
// 网络层协议号和传输层协议号的组合 当作分流器的key值
type protocolIDs struct {
network tcpip.NetworkProtocolNumber
transport tcpip.TransportProtocolNumber
}
type transportEndpoints struct {
mu sync.RWMutex
endpoints map[TransportEndpointID]TransportEndpoint
}
// transportDemuxer 解复用战队传输端点的数据包
// 他执行两级解复用:首先基于网络层和传输协议 然后基于端点ID
type transportDemuxer struct {
protocol map[protocolIDs]*transportEndpoints
}