对于read write的封装还是有问题

This commit is contained in:
impact-eintr
2022-12-30 18:27:17 +08:00
parent 12c26a86f1
commit 6156ee63cd
7 changed files with 269 additions and 21 deletions

View File

@@ -34,7 +34,7 @@ func main() {
log.Fatal("Usage: ", os.Args[0], " <tap-device> <local-address/mask> <ip-address> <local-port>")
}
logger.SetFlags(logger.ETH|logger.IP)
logger.SetFlags(logger.IP)
log.SetFlags(log.Lshortfile | log.LstdFlags)
tapName := flag.Arg(0)
@@ -190,15 +190,27 @@ func main() {
log.Printf("客户端 建立连接\n\n客户端 写入数据\n")
for i := 0; i < 3; i++ {
conn.Write([]byte("Hello Netstack"))
cnt := 0
size := 1 << 10
for i := 0; i < 1; i++ {
//conn.Write([]byte("Hello Netstack"))
conn.Write(make([]byte, size))
buf := make([]byte, 1024)
n, err := conn.Read(buf)
if err != nil {
log.Println(err)
return
for {
n, err := conn.Read(buf)
if err != nil {
log.Println(err)
return
}
cnt+=n
logger.NOTICE("客户端读取", string(buf[:]))
log.Println(cnt)
if cnt == size {
logger.NOTICE("退出")
break
}
}
logger.NOTICE("客户端读取", string(buf[:n]))
}
conn.Close()
@@ -230,7 +242,8 @@ func TestServerEcho(conn *TcpConn) {
log.Println(err)
break
}
logger.NOTICE("服务端读取数据", string(buf[:n]))
_ = n
logger.NOTICE("服务端读取数据", string(buf[:]))
conn.Write(buf)
}

View File

@@ -489,14 +489,17 @@ func (e *endpoint) connect(...) {
// 开启三次握手 写入报文 ...
}
```
这个`FindRoute`就是在写入syn报文前寻找目标mac
这个`FindRoute`就是在写入syn报文前获取本机ip mac 和 目标ip 但依旧没有目标mac
``` go
// FindRoute 路由查找实现比如当tcp建立连接时会用该函数得到路由信息
// 注意仅仅包含 SrcMAC SrcIp DstIp 没有 DstMAC
func (s *Stack) FindRoute(id tcpip.NICID, localAddr, remoteAddr tcpip.Address,
netProto tcpip.NetworkProtocolNumber) (Route, *tcpip.Error) {
s.mu.RLock()
@@ -529,6 +532,7 @@ func (s *Stack) FindRoute(id tcpip.NICID, localAddr, remoteAddr tcpip.Address,
remoteAddr = ref.ep.ID().LocalAddress // 发回自己? TODO
}
// 构建一个路由 包括 目标ip 目标mac 本地ip 本地mac
r := makeRoute(netProto, ref.ep.ID().LocalAddress, remoteAddr, nic.linkEP.LinkAddress(), ref)
r.NextHop = s.routeTable[i].Gateway
logger.GetInstance().Info(logger.IP, func() {
@@ -541,4 +545,238 @@ func (s *Stack) FindRoute(id tcpip.NICID, localAddr, remoteAddr tcpip.Address,
}
```
这部分的实现其实隐藏在三次握手的过程中
``` go
// 开启三次握手
func (h *handshake) execute() *tcpip.Error {
// 是否需要拿到下一条地址
if h.ep.route.IsResolutionRequired() {
if err := h.resolveRoute(); err != nil {
return err
}
}
// ...
}
```
``` go
// 检查是否允许了地址解析 首先检查是否配置了mac缓存 然后检查目标mac是否已经存在
func (r *Route) IsResolutionRequired() bool {
return r.ref.linkCache != nil && r.RemoteLinkAddress == ""
}
```
注意如果我们的链路层设备不支持地址解析比如loopback设备tcp将会把mubiaomac设置为本地mac意为本地环回。
#### 缓存的设计
实际的地址解析逻辑在下面这段代码中
``` go
// Resolve 如有必要解决尝试解析链接地址的问题。如果地址解析需要阻塞则返回ErrWouldBlock
// 例如等待ARP回复。地址解析完成成功与否时通知Waker。
// 如果需要地址解析则返回ErrNoLinkAddress和通知通道以阻止顶级调用者。
// 地址解析完成后,通道关闭(不管成功与否)。
func (r *Route) Resolve(waker *sleep.Waker) (<-chan struct{}, *tcpip.Error) {
if !r.IsResolutionRequired() {
return nil, nil
}
nextAddr := r.NextHop
if nextAddr == "" {
// Local link address is already known.
if r.RemoteAddress == r.LocalAddress { // 发给自己
r.RemoteLinkAddress = r.LocalLinkAddress // MAC 就是自己
return nil, nil
}
nextAddr = r.RemoteAddress // 下一跳是远端机
}
// 调用地址解析协议来解析IP地址
linkAddr, ch, err := r.ref.linkCache.GetLinkAddress(r.ref.nic.ID(), nextAddr, r.LocalAddress, r.NetProto, waker)
if err != nil {
return ch, err
}
r.RemoteLinkAddress = linkAddr
return nil, nil
}
```
我们来看看这个地址解析的缓存设计
``` go
func (s *Stack) GetLinkAddress(nicid tcpip.NICID, addr, localAddr tcpip.Address,
protocol tcpip.NetworkProtocolNumber, w *sleep.Waker) (tcpip.LinkAddress, <-chan struct{}, *tcpip.Error) {
s.mu.RLock()
// 获取网卡对象
nic := s.nics[nicid]
if nic == nil {
s.mu.RUnlock()
return "", nil, tcpip.ErrUnknownNICID
}
s.mu.RUnlock()
fullAddr := tcpip.FullAddress{NIC: nicid, Addr: addr} // addr 可能是Remote IP Address
// 根据网络层协议号找到对应的地址解析协议
linkRes := s.linkAddrResolvers[protocol]
return s.linkAddrCache.get(fullAddr, linkRes, localAddr, nic.linkEP, w)
}
```
``` go
// get reports any known link address for k.
func (c *linkAddrCache) get(k tcpip.FullAddress, linkRes LinkAddressResolver,
localAddr tcpip.Address, linkEP LinkEndpoint, waker *sleep.Waker) (tcpip.LinkAddress, <-chan struct{}, *tcpip.Error) {
logger.GetInstance().Info(logger.ETH, func() {
log.Println("在arp本地缓存中寻找", k)
})
if linkRes != nil {
if addr, ok := linkRes.ResolveStaticAddress(k.Addr); ok {
return addr, nil, nil
}
}
c.mu.Lock()
defer c.mu.Unlock()
// 尝试从缓存中得到MAC地址
if entry, ok := c.cache[k]; ok {
switch s := entry.state(); s {
case expired: // 过期了
case ready:
return entry.linkAddr, nil, nil
case failed:
return "", nil, tcpip.ErrNoLinkAddress
case incomplete:
// Address resolution is still in progress.
entry.addWaker(waker)
return "", entry.done, tcpip.ErrWouldBlock
default:
panic(fmt.Sprintf("invalid cache entry state: %s", s))
}
}
if linkRes == nil {
return "", nil, tcpip.ErrNoLinkAddress
}
// Add 'incomplete' entry in the cache to mark that resolution is in progress.
e := c.makeAndAddEntry(k, "")
e.addWaker(waker)
go c.startAddressResolution(k, linkRes, localAddr, linkEP, e.done)
return "", e.done, tcpip.ErrWouldBlock
}
```
简单来说一个LRU策略的缓存如果失效了就找arp协议发送广播报文。
``` go
// LinkAddressRequest implements stack.LinkAddressResolver.
func (*protocol) LinkAddressRequest(addr, localAddr tcpip.Address, linkEP stack.LinkEndpoint) *tcpip.Error {
r := &stack.Route{
RemoteLinkAddress: broadcastMAC,
}
hdr := buffer.NewPrependable(int(linkEP.MaxHeaderLength()) + header.ARPSize)
h := header.ARP(hdr.Prepend(header.ARPSize))
h.SetIPv4OverEthernet()
h.SetOp(header.ARPRequest)
copy(h.HardwareAddressSender(), linkEP.LinkAddress())
copy(h.ProtocolAddressSender(), localAddr)
copy(h.ProtocolAddressTarget(), addr)
log.Println("arp发起广播 寻找:", addr, r)
return linkEP.WritePacket(r, hdr, buffer.VectorisedView{}, ProtocolNumber)
}
```
绑定了目标ip的主机受到这个广播报文的时候会回复一个报文内容是自己的mac地址同时更新自己的arp缓存。
## 网络层
解析过地址后我们拥有了目标mac、目标ip现在我们可以在网络层写数据了。
还是以tcp的三次握手为例检验第一次发送syn同步报文。
``` go
// tcp三次握手流程
func (h *handshake) execute() *tcpip.Error {
// 地址解析
// ...
// 如果是客户端发送 syn 报文,如果是服务端发送 syn+ack 报文
sendSynTCP(&h.ep.route, h.ep.id, h.flags, h.iss, h.ackNum, h.rcvWnd, synOpts)
}
func sendSynTCP(r *stack.Route, ...) {
err := sendTCP(r, id, buffer.VectorisedView{}, r.DefaultTTL(), flags, seq, ack, rcvWnd, options)
}
func sendTCP(r *stack.Route, ...) {
// tcp报文编码
return r.WritePacket(hdr, data, ProtocolNumber, ttl)
}
```
我们将调用Route.WritePacket来执行网路层的写入
``` go
func (r *Route) WritePacket(hdr buffer.Prependable, payload buffer.VectorisedView,
protocol tcpip.TransportProtocolNumber, ttl uint8) *tcpip.Error {
// 路由对应的IP的WritePacket
err := r.ref.ep.WritePacket(r, hdr, payload, protocol, ttl)
if err == tcpip.ErrNoRoute {
r.Stats().IP.OutgoingPacketErrors.Increment()
}
return err
}
```
我们这里查看最一般的IPV4的实现这条路由将调用ipv4的WritePacket函数
``` go
// WritePacket writes a packet to the given destination address and protocol.
// 将传输层的数据封装加上IP头并调用网卡的写入接口写入IP报文
func (e *endpoint) WritePacket(r *stack.Route, hdr buffer.Prependable, payload buffer.VectorisedView,
protocol tcpip.TransportProtocolNumber, ttl uint8) *tcpip.Error {
// 预留ip报文的空间
ip := header.IPv4(hdr.Prepend(header.IPv4MinimumSize))
length := uint16(hdr.UsedLength() + payload.Size())
id := uint32(0)
// 如果报文长度大于68
if length > header.IPv4MaximumHeaderSize+8 {
// Packets of 68 bytes or less are required by RFC 791 to not be
// fragmented, so we only assign ids to larger packets.
id = atomic.AddUint32(&ids[hashRoute(r, protocol)%buckets], 1)
}
// ip首部编码
ip.Encode(&header.IPv4Fields{
IHL: header.IPv4MinimumSize,
TotalLength: length,
ID: uint16(id),
TTL: ttl,
Protocol: uint8(protocol),
SrcAddr: r.LocalAddress,
DstAddr: r.RemoteAddress,
})
// 计算校验和和设置校验和
ip.SetChecksum(^ip.CalculateChecksum())
r.Stats().IP.PacketsSent.Increment()
// 写入网卡接口
return e.linkEP.WritePacket(r, hdr, payload, ProtocolNumber)
}
```

View File

@@ -114,8 +114,6 @@ func (e *endpoint) WritePacket(r *stack.Route, hdr buffer.Prependable, payload b
logger.GetInstance().Info(logger.IP, func() {
if payload.Size() == 0 {
log.Printf("发送 IP 报文 %d bytes", hdr.UsedLength()+payload.Size())
} else {
log.Println(header.IPv4(append(ip, payload.ToView()...)))
}
})
}

View File

@@ -129,7 +129,7 @@ func (n *NIC) addAddressLocked(protocol tcpip.NetworkProtocolNumber, addr tcpip.
// 如果不存在 就会调用arp协议发广播来定位这个ip对应的设备
if n.linkEP.Capabilities()&CapabilityResolutionRequired != 0 {
if _, ok := n.stack.linkAddrResolvers[protocol]; ok {
ref.linkCache = n.stack
ref.linkCache = n.stack // 对于loopback驱动而言 他的缓存就是nil 不开启
}
}

View File

@@ -75,9 +75,7 @@ func (r *Route) Capabilities() LinkEndpointCapabilities {
// 如果需要地址解析则返回ErrNoLinkAddress和通知通道以阻止顶级调用者。
// 地址解析完成后,通道关闭(不管成功与否)。
func (r *Route) Resolve(waker *sleep.Waker) (<-chan struct{}, *tcpip.Error) {
if !r.IsResolutionRequired() {
// Nothing to do if there is no cache (which does the resolution on cache miss) or
// link address is already known.
if !r.IsResolutionRequired() { // 没有配置地址解析
return nil, nil
}
@@ -109,9 +107,7 @@ func (r *Route) RemoveWaker(waker *sleep.Waker) {
r.ref.linkCache.RemoveWaker(r.ref.nic.ID(), nextAddr, waker)
}
// IsResolutionRequired returns true if Resolve() must be called to resolve
// the link address before the this route can be written to.
// 本地缓存有东西 而且不知道远端的MAC
// 检查是否允许了地址解析 首先检查是否配置了mac缓存 然后检查目标mac是否已经存在
func (r *Route) IsResolutionRequired() bool {
return r.ref.linkCache != nil && r.RemoteLinkAddress == ""
}
@@ -119,6 +115,7 @@ func (r *Route) IsResolutionRequired() bool {
// WritePacket writes the packet through the given route.
func (r *Route) WritePacket(hdr buffer.Prependable, payload buffer.VectorisedView,
protocol tcpip.TransportProtocolNumber, ttl uint8) *tcpip.Error {
// 路由对应的IP的WritePacket
err := r.ref.ep.WritePacket(r, hdr, payload, protocol, ttl)
if err == tcpip.ErrNoRoute {
r.Stats().IP.OutgoingPacketErrors.Increment()

View File

@@ -580,11 +580,13 @@ func (s *Stack) RemoveAddress(id tcpip.NICID, addr tcpip.Address) *tcpip.Error {
}
// FindRoute 路由查找实现比如当tcp建立连接时会用该函数得到路由信息
// 注意仅仅包含 SrcMAC SrcIp DstIp 没有 DstMAC
func (s *Stack) FindRoute(id tcpip.NICID, localAddr, remoteAddr tcpip.Address,
netProto tcpip.NetworkProtocolNumber) (Route, *tcpip.Error) {
s.mu.RLock()
defer s.mu.RUnlock()
// 遍历协议栈的路由表
for i := range s.routeTable {
if (id != 0 && id != s.routeTable[i].NIC) || // 检查是否是对应的网卡
(len(remoteAddr) != 0 && !s.routeTable[i].Match(remoteAddr)) {
@@ -609,9 +611,10 @@ func (s *Stack) FindRoute(id tcpip.NICID, localAddr, remoteAddr tcpip.Address,
if len(remoteAddr) == 0 {
// If no remote address was provided, then the route
// provided will refer to the link local address.
remoteAddr = ref.ep.ID().LocalAddress // 发回自己? TODO
remoteAddr = ref.ep.ID().LocalAddress // 本地环回
}
// 构建一个路由 包括 目标ip 目标mac 本地ip 本地mac
r := makeRoute(netProto, ref.ep.ID().LocalAddress, remoteAddr, nic.linkEP.LinkAddress(), ref)
r.NextHop = s.routeTable[i].Gateway
logger.GetInstance().Info(logger.IP, func() {

View File

@@ -377,7 +377,6 @@ func (e *endpoint) protocolListenLoop(rcvWnd seqnum.Size) *tcpip.Error {
e.newSegmentWaker.Assert() // 重新尝试获取数据
}
case wakerForNotification:
// TODO 触发其他事件
n := e.fetchNotifications()
if n&notifyClose != 0 {
return nil