fdbased endpoint

This commit is contained in:
impact-eintr
2022-11-23 11:31:21 +08:00
parent 6011e0f8f3
commit 990a0c2901
7 changed files with 314 additions and 8 deletions

View File

@@ -27,7 +27,7 @@ type Ethernet []byte
const ( const (
// EthernetMinimumSize以太网帧最小的长度 // EthernetMinimumSize以太网帧最小的长度
EthernetMinimumSize = 14 EthernetMinimumSize = 14 // 6 + 6 + 2
// EthernetAddressSize以太网地址的长度 // EthernetAddressSize以太网地址的长度
EthernetAddressSize = 6 EthernetAddressSize = 6

View File

@@ -27,7 +27,7 @@ type endpoint struct {
closed func(*tcpip.Error) closed func(*tcpip.Error)
iovers []syscall.Iovec iovecs []syscall.Iovec
views []buffer.View views []buffer.View
dispatcher stack.NetworkDispatcher dispatcher stack.NetworkDispatcher
@@ -73,7 +73,7 @@ func New(opts *Options) tcpip.LinkEndpointID {
addr: opts.Address, addr: opts.Address,
hdrSize: header.EthernetMinimumSize, hdrSize: header.EthernetMinimumSize,
views: make([]buffer.View, len(BufConfig)), views: make([]buffer.View, len(BufConfig)),
iovers: make([]syscall.Iovec, len(BufConfig)), iovecs: make([]syscall.Iovec, len(BufConfig)),
handleLocal: opts.HandleLocal, handleLocal: opts.HandleLocal,
} }
@@ -128,8 +128,9 @@ func (e *endpoint) WritePacket(r *stack.Route, hdr buffer.Prependable,
eth.Encode(ethHdr) // 将以太帧信息作为报文头编入 eth.Encode(ethHdr) // 将以太帧信息作为报文头编入
// 写入网卡中 // 写入网卡中
if payload.Size() == 0 { if payload.Size() == 0 {
return rawfile return rawfile.NonBlockingWrite(e.fd, hdr.View())
} }
return rawfile.NonBlockingWrite2(e.fd, hdr.View(), payload.ToView())
} }
// Attach 启动从文件描述符中读取数据包的goroutine,并通过提供的分发函数来分发数据报 // Attach 启动从文件描述符中读取数据包的goroutine,并通过提供的分发函数来分发数据报
@@ -142,3 +143,82 @@ func (e *endpoint) Attach(dispatcher stack.NetworkDispatcher) {
func (e *endpoint) IsAttached() bool { func (e *endpoint) IsAttached() bool {
return e.dispatcher != nil return e.dispatcher != nil
} }
// 截取需要的内容
func (e *endpoint) capViews(n int, buffers []int) int {
c := 0
for i, s := range buffers {
c += s
if c >= n {
e.views[i].CapLength(s - (c - n))
return i + 1
}
}
return len(buffers)
}
// 按照bufConfig的长度分配内存大小
// 注意e.views 和 e.iovecs共用相同的内存块
func (e *endpoint) allocateViews(bufConfig []int) {
for i, v := range e.views {
if v != nil {
break
}
b := buffer.NewView(bufConfig[i]) // 分配内存
e.views[i] = b
e.iovecs[i] = syscall.Iovec{
Base: &b[0],
Len: uint64(len(b)),
}
}
}
func (e *endpoint) dispatch() (bool, *tcpip.Error) {
// 读取数据缓存的分配
e.allocateViews(BufConfig)
// 从网卡读取数据
n, err := rawfile.BlockingReadv(e.fd, e.iovecs) // 读到ioves中相当于读到views中
if err != nil {
return false, err
}
if n <= e.hdrSize {
return false, nil // 读到的数据比头部还小 直接丢弃
}
var (
p tcpip.NetworkProtocolNumber
remoteLinkAddr, localLinkAddr tcpip.LinkAddress // 目标MAC 源MAC
)
// 获取以太网头部信息
eth := header.Ethernet(e.views[0])
p = eth.Type()
remoteLinkAddr = eth.SourceAddress()
localLinkAddr = eth.DestinationAddress()
used := e.capViews(n, BufConfig) // 从缓存中截有效的内容
vv := buffer.NewVectorisedView(n, e.views[:used]) // 用这些有效的内容构建vv
vv.TrimFront(e.hdrSize) // 将数据内容删除以太网头部信息 将网络层作为数据头
e.dispatcher.DeliverNetworkPacket(e, remoteLinkAddr, localLinkAddr, p, vv)
// 将分发后的数据无效化(设置nil可以让gc回收这些内存)
for i := 0;i < used;i++ {
e.views[i] = nil
}
return true, nil
}
// 循环地从fd中读取数据 然后就爱那个数据报分发给协议栈
func (e *endpoint) dispatchLoop() *tcpip.Error {
for {
cont, err := e.dispatch()
if err != nil || !cont {
if e.closed != nil {
e.closed(err)
}
return err
}
}
}

View File

@@ -0,0 +1,67 @@
package fdbased
import (
"netstack/tcpip"
"netstack/tcpip/buffer"
"netstack/tcpip/stack"
"syscall"
"testing"
)
const (
mtu = 1500
laddr = tcpip.LinkAddress("\x11\x22\x33\x44\x55\x66")
raddr = tcpip.LinkAddress("\x77\x88\x99\xaa\xbb\xcc")
proto = 10
)
type packetInfo struct {
raddr tcpip.LinkAddress
proto tcpip.NetworkProtocolNumber
contents buffer.View
}
type context struct {
t *testing.T
fds [2]int
ep stack.LinkEndpoint
ch chan packetInfo
done chan struct{}
}
func NewContext(t *testing.T, opt *Options) *context {
fds, err := syscall.Socketpair(syscall.AF_UNIX, syscall.SOCK_SEQPACKET, 0)
if err != nil {
t.Fatalf("Socketpair failed: %v", err)
}
done := make(chan struct{}, 1)
opt.ClosedFunc = func(*tcpip.Error) {
done <- struct{}{}
}
opt.FD = fds[1]
ep := stack.FindLinkEndpoint(New(opt)).(*endpoint)
c := &context{
t: t,
fds: fds,
ep:ep,
ch: make(chan packetInfo, 100),
done: done,
}
ep.Attach(c)
return c
}
func (c *context) DeliverNetworkPacket(linkEP stack.LinkEndpoint,
dstLinkAddr, srcLinkAddr tcpip.LinkAddress,
protocol tcpip.NetworkProtocolNumber, vv buffer.VectorisedView) {
c.ch <- packetInfo{dstLinkAddr, protocol, vv.ToView()}
}
func TestFdbased(t *testing.T) {
}

View File

@@ -41,10 +41,46 @@ func NonBlockingWrite(fd int, buf []byte) *tcpip.Error {
if len(buf) > 0 { if len(buf) > 0 {
ptr = unsafe.Pointer(&buf[0]) ptr = unsafe.Pointer(&buf[0])
} }
_, _, e := syscall.RawSyscall(syscall.SYS_WRITE, uintptr(fd),
uintptr(ptr), uintptr(len(buf)))
if e != 0 {
return TranslateErrno(e)
}
return nil
} }
func NonBlockingWrite2(fd int, b1, b2 []byte) *tcpip.Error { func NonBlockingWrite2(fd int, b1, b2 []byte) *tcpip.Error {
if len(b2) == 0 {
return NonBlockingWrite(fd, b1)
}
/*
#include <sys/uio.h>
struct iovec {
void *iov_base;
size_t iov_len;
};
**/
iovec := [...]syscall.Iovec{
{
Base: &b1[0],
Len: uint64(len(b1)),
},
{
Base: &b2[0],
Len: uint64(len(b2)),
},
}
// ssize_t writev(int fildes, const struct iovec *iov, int iovcnt);
_, _, e := syscall.RawSyscall(syscall.SYS_WRITEV, uintptr(fd),
uintptr(unsafe.Pointer(&iovec[0])), uintptr(len(iovec)))
if e != 0 {
return TranslateErrno(e)
}
return nil
} }
func BlockingRead(fd int, b []byte) (int, *tcpip.Error) { func BlockingRead(fd int, b []byte) (int, *tcpip.Error) {
@@ -69,7 +105,8 @@ func BlockingRead(fd int, b []byte) (int, *tcpip.Error) {
func BlockingReadv(fd int, iovecs []syscall.Iovec) (int, *tcpip.Error) { func BlockingReadv(fd int, iovecs []syscall.Iovec) (int, *tcpip.Error) {
for { for {
n, _, e := syscall.RawSyscall(syscall.SYS_READV, uintptr(fd), uintptr(unsafe. Pointer(&iovecs[0])), uintptr(len(iovecs))) n, _, e := syscall.RawSyscall(syscall.SYS_READV, uintptr(fd),
uintptr(unsafe.Pointer(&iovecs[0])), uintptr(len(iovecs)))
if e == 0 { if e == 0 {
return int(n), nil return int(n), nil
} }

View File

@@ -3,6 +3,15 @@ package stack
import ( import (
"netstack/tcpip" "netstack/tcpip"
"netstack/tcpip/buffer" "netstack/tcpip/buffer"
"sync"
)
const (
CapabilityChecksumOffload LinkEndpointCapabilities = 1 << iota
CapabilityResolutionRequired
CapabilitySaveRestore
CapabilityDisconnectOK
CapabilityLoopback
) )
// 所谓 io 就是数据的输入输出,对于网卡来说就是接收或发送数据, // 所谓 io 就是数据的输入输出,对于网卡来说就是接收或发送数据,
@@ -14,6 +23,9 @@ type LinkEndpoint interface {
// 当这种物理网络不存在时限制通常为64k其中包括IP数据包的最大大小。 // 当这种物理网络不存在时限制通常为64k其中包括IP数据包的最大大小。
MTU() uint32 MTU() uint32
// Capabilities返回链路层端点支持的功能集。
Capabilities() LinkEndpointCapabilities
// MaxHeaderLength 返回数据链接(和较低级别的图层组合)标头可以具有的最大大小。 // MaxHeaderLength 返回数据链接(和较低级别的图层组合)标头可以具有的最大大小。
// 较高级别使用此信息来保留它们正在构建的数据包前面预留空间。 // 较高级别使用此信息来保留它们正在构建的数据包前面预留空间。
MaxHeaderLength() uint16 MaxHeaderLength() uint16
@@ -40,3 +52,33 @@ type NetworkDispatcher interface {
} }
type LinkEndpointCapabilities uint type LinkEndpointCapabilities uint
var (
// 传输层协议的注册存储结构 TODO
// 网络层协议的注册存储结构 TODO
linkEPMu sync.RWMutex
nextLinkEndpointID tcpip.LinkEndpointID = 1
linkEndpoints = make(map[tcpip.LinkEndpointID]LinkEndpoint) // 设备注册表 设备号:设备实现
)
// 注册一个链路层设备
func RegisterLinkEndpoint(linkEP LinkEndpoint) tcpip.LinkEndpointID {
linkEPMu.Lock()
defer linkEPMu.Unlock()
v := nextLinkEndpointID
nextLinkEndpointID++
linkEndpoints[v] = linkEP
return v
}
func FindLinkEndpoint(id tcpip.LinkEndpointID) LinkEndpoint {
linkEPMu.RLock()
defer linkEPMu.RUnlock()
return linkEndpoints[id]
}

View File

@@ -1,11 +1,28 @@
package stack package stack
import ( import "netstack/tcpip"
)
// 贯穿整个协议栈的路由,也就是在链路层和网络层都可以路由 // 贯穿整个协议栈的路由,也就是在链路层和网络层都可以路由
// 如果目标地址是链路层地址,那么在链路层路由, // 如果目标地址是链路层地址,那么在链路层路由,
// 如果目标地址是网络层地址,那么在网络层路由。 // 如果目标地址是网络层地址,那么在网络层路由。
type Route struct { type Route struct {
// TODO // 远端网络层地址 ipv4 or ipv6 地址
RemoteAddress tcpip.Address
// 远端网卡MAC地址
RemoteLinkAddress tcpip.LinkAddress
// 本地网络层地址 ipv4 or ipv6 地址
LocalAddress tcpip.Address
// 本地网卡MAC地址
LocalLinkAddress tcpip.LinkAddress
// 下一跳网络层地址
NextHop tcpip.Address
// 网络层协议号
NetProto tcpip.NetworkProtocolNumber
// 相关的网络终端
ref *referenceNetworkEndpoint
} }

View File

@@ -1,5 +1,10 @@
package tcpip package tcpip
import (
"fmt"
"strings"
)
type Error struct { type Error struct {
msg string msg string
@@ -51,10 +56,68 @@ var (
ErrNoBufferSpace = &Error{msg: "no buffer space available"} ErrNoBufferSpace = &Error{msg: "no buffer space available"}
) )
type Address string
type AddressMask string
func (a AddressMask) String() string {
return Address(a).String()
}
// LinkAddress 是一个字节切片,转换为表示链接地址的字符串。 // LinkAddress 是一个字节切片,转换为表示链接地址的字符串。
// 它通常是一个 6 字节的 MAC 地址。 // 它通常是一个 6 字节的 MAC 地址。
type LinkAddress string // MAC地址 type LinkAddress string // MAC地址
type LinkEndpointID uint64
type TransportProtocolNumber uint32
type NetworkProtocolNumber uint32 type NetworkProtocolNumber uint32
func (a Address) String() string {
switch len(a) {
case 4:
fmt.Println(string(a))
return fmt.Sprintf("%d.%d.%d.%d", int(a[0]), int(a[1]), int(a[2]), int(a[3]))
case 16:
fmt.Println(string(a))
// Find the longest subsequence of hexadecimal zeros.
start, end := -1, -1
for i := 0; i < len(a); i += 2 {
j := i
for j < len(a) && a[j] == 0 && a[j+1] == 0 {
j += 2
}
if j > i+2 && j-i > end-start {
start, end = i, j
}
}
var b strings.Builder
for i := 0; i < len(a); i += 2 {
if i == start {
b.WriteString("::")
i = end
if end >= len(a) {
break
}
} else if i > 0 {
b.WriteByte(':')
}
v := uint16(a[i+0])<<8 | uint16(a[i+1])
if v == 0 {
b.WriteByte('0')
} else {
const digits = "0123456789abcdef"
for i := uint(3); i < 4; i-- {
if v := v >> (i * 4); v != 0 {
b.WriteByte(digits[v&0xf])
}
}
}
}
return b.String()
default:
return fmt.Sprintf("%x", []byte(a))
}
}