分片机制开始实现 大致是使用了一个map管理+小根堆自动排序实现的

This commit is contained in:
impact-eintr
2022-11-28 11:31:46 +08:00
parent 553a16f655
commit cd0d9492a0
6 changed files with 87 additions and 19 deletions

View File

@@ -48,7 +48,7 @@ type IPv4Fields struct {
TotalLength uint16
// ID is the "identification" field of an IPv4 packet.
// 标识符
// 标识符 注意这个ID对于每个IP报文来说是唯一的 它的每个分片共享这个ID来标识它们同属一个报文
ID uint16
// Flags is the "flags" field of an IPv4 packet.
@@ -294,6 +294,8 @@ func atoi[T int | int8 | int16 | int32 | int64 | uint | uint8 |uint16 | uint32](
}
func (b IPv4) String() string {
for i := range b.Payload() {
if i != int(b.PayloadLength()-1) && b.Payload()[i]^b.Payload()[i+1] != 0 {
return fmt.Sprintf(ipv4Fmt, atoi(IPVersion(b)), atoi(b.HeaderLength()), atoi(0), atoi(b.TotalLength()),
atoi(b.ID()), atoi(b.Flags()>>2), atoi((b.Flags()&2)>>1), atoi(b.Flags()&1), atoi(b.FragmentOffset()),
atoi(b.TTL()), atoi(b.Protocol()), atoi(b.Checksum()),
@@ -301,3 +303,11 @@ func (b IPv4) String() string {
b.DestinationAddress().String(),
b.Payload())
}
}
return fmt.Sprintf(ipv4Fmt, atoi(IPVersion(b)), atoi(b.HeaderLength()), atoi(0), atoi(b.TotalLength()),
atoi(b.ID()), atoi(b.Flags()>>2), atoi((b.Flags()&2)>>1), atoi(b.Flags()&1), atoi(b.FragmentOffset()),
atoi(b.TTL()), atoi(b.Protocol()), atoi(b.Checksum()),
b.SourceAddress().String(),
b.DestinationAddress().String(),
fmt.Sprintf("%v x %d", b.Payload()[0], b.PayloadLength()))
}

View File

@@ -22,16 +22,18 @@ const HighFragThreshold = 4 << 20 // 4MB
// net.ipv4.ipfrag_low_thresh for more information.
const LowFragThreshold = 3 << 20 // 3MB
// Fragmentation 分片处理器对象
type Fragmentation struct {
mu sync.Mutex
highLimit int
lowLimit int
reassemblers map[uint32]*reassembler
reassemblers map[uint32]*reassembler // IP报文hash:重组器
rList reassemblerList
size int
timeout time.Duration
}
// NewFragmentation 新建一个分片处理器
func NewFragmentation(highMemoryLimit, lowMemoryLimit int, reassemblingTimeout time.Duration) *Fragmentation {
if lowMemoryLimit >= highMemoryLimit {
lowMemoryLimit = highMemoryLimit
@@ -49,20 +51,41 @@ func NewFragmentation(highMemoryLimit, lowMemoryLimit int, reassemblingTimeout t
}
}
// Process 处理ip报文分片
func (f *Fragmentation) Process(id uint32, first, last uint16, more bool, vv buffer.VectorisedView) (buffer.VectorisedView, bool) {
log.Println("分片机制工作中", id, first, last, vv.First())
f.mu.Lock()
r, ok := f.reassemblers[id]
if ok && r.tooOld(f.timeout) {
if ok && r.tooOld(f.timeout) { // 检测一个分片是否存在超过了30s
// This is very likely to be an id-collision or someone performing a slow-rate attack.
//f.release(r)
f.release(r)
ok = false
}
if !ok {
if !ok { // 首次注册该报文的分片
r = newReassembler(id)
f.reassemblers[id] = r
f.rList.PushFront(r)
}
f.mu.Unlock()
//res, done, consumed := r.process(first, last, more, vv)
log.Printf("[%d]的分片 [%d,%d) 合并中\n", id, first, last)
r.process(first, last, more, vv)
return buffer.VectorisedView{}, false
}
func (f *Fragmentation) release(r *reassembler) {
// Before releasing a fragment we need to check if r is already marked as done.
// Otherwise, we would delete it twice.
if r.checkDoneOrMark() {
return
}
delete(f.reassemblers, r.id)
f.rList.Remove(r)
f.size -= r.size
if f.size < 0 {
log.Printf("memory counter < 0 (%d), this is an accounting bug that requires investigation", f.size)
f.size = 0
}
}

View File

@@ -1,6 +1,8 @@
package fragmentation_test
import (
"log"
"math"
"netstack/tcpip"
"netstack/tcpip/buffer"
"netstack/tcpip/header"
@@ -23,6 +25,7 @@ type testContext struct {
t *testing.T
linkEP *channel.Endpoint
s *stack.Stack
id uint16
}
func newTestContext(t *testing.T) *testContext {
@@ -55,6 +58,7 @@ func newTestContext(t *testing.T) *testContext {
t: t,
s: s,
linkEP: linkEP,
id: uint16(time.Now().Unix() % math.MaxUint16),
}
}
@@ -101,23 +105,24 @@ func TestFragmentationBase(t *testing.T) {
t.Fatalf("Case #1 Time Out\n")
}
// 一个纯粹的IP报文
v = make(buffer.View, header.IPv4MinimumSize+256)
// 一个纯粹的IP报文 Part1
pLen := ((1500 - header.EthernetMinimumSize - header.IPv4MinimumSize) >> 3) << 3
v = make(buffer.View, header.IPv4MinimumSize+pLen)
hdr := buffer.NewPrependable(header.IPv4MinimumSize)
ip := header.IPv4(hdr.Prepend(header.IPv4MinimumSize))
buf := make(buffer.View, 256)
buf := make(buffer.View, pLen)
for i := range buf {
buf[i] = 1
}
payload := buffer.NewVectorisedView(256, buf.ToVectorisedView().Views())
payload := buffer.NewVectorisedView(pLen, buf.ToVectorisedView().Views())
length := uint16(hdr.UsedLength() + payload.Size())
// ip首部编码
ip.Encode(&header.IPv4Fields{
IHL: header.IPv4MinimumSize,
TotalLength: length,
ID: uint16(2),
ID: c.id,
Flags: 0x1,
FragmentOffset: 1024,
FragmentOffset: 0,
TTL: 255,
Protocol: uint8(0x6), // tcp 伪装报文
SrcAddr: senderIPv4,
@@ -136,4 +141,29 @@ func TestFragmentationBase(t *testing.T) {
inject(stackAddr1)
// 一个纯粹的IP报文 Part2
pLen = 256
v = make(buffer.View, header.IPv4MinimumSize+pLen)
payload = buffer.NewVectorisedView(pLen, buf.ToVectorisedView().Views())
length = uint16(hdr.UsedLength() + payload.Size())
// ip首部编码
ip.Encode(&header.IPv4Fields{
IHL: header.IPv4MinimumSize,
TotalLength: length,
ID: c.id,
FragmentOffset: 1464,
TTL: 255,
Protocol: uint8(0x6), // tcp 伪装报文
SrcAddr: senderIPv4,
DstAddr: stackAddr1,
})
//ip.SetFlagsFragmentOffset()
// 计算校验和和设置校验和
ip.SetChecksum(^ip.CalculateChecksum())
copy(v, ip)
copy(v[header.IPv4MinimumSize:], payload.First())
log.Println(ip.FragmentOffset())
inject(stackAddr1)
}

View File

@@ -30,6 +30,7 @@ type hole struct {
deleted bool
}
// 重组器对象
type reassembler struct {
reassemblerEntry
id uint32
@@ -37,7 +38,7 @@ type reassembler struct {
mu sync.Mutex
holes []hole
deleted int
heap fragHeap
heap fragHeap // 小根堆用来自动排序
done bool
creationTime time.Time
}

View File

@@ -121,7 +121,6 @@ func (e *endpoint) WritePacket(r *stack.Route, hdr buffer.Prependable, payload b
func (e *endpoint) HandlePacket(r *stack.Route, vv buffer.VectorisedView) {
// 得到ip报文
h := header.IPv4(vv.First())
log.Println(h)
// 检查报文是否有效
if !h.IsValid(vv.Size()) {
return
@@ -133,7 +132,7 @@ func (e *endpoint) HandlePacket(r *stack.Route, vv buffer.VectorisedView) {
vv.TrimFront(hlen)
vv.CapLength(tlen - hlen)
// 报文重组 TODO
// 报文重组
more := (h.Flags() & header.IPv4FlagMoreFragments) != 0
// 是否需要ip重组
if more || h.FragmentOffset() != 0 {

View File

@@ -365,7 +365,12 @@ func (n *NIC) DeliverNetworkPacket(linkEP LinkEndpoint, remoteLinkAddr, localLin
return
}
src, dst := netProto.ParseAddresses(vv.First())
log.Printf("设备[%v]准备从 [%s] 向 [%s] 分发数据: %v\n", linkEP.LinkAddress(), src, dst, vv.ToView())
log.Printf("设备[%v]准备从 [%s] 向 [%s] 分发数据: %v\n", linkEP.LinkAddress(), src, dst, func() []byte {
if len(vv.ToView()) > 64 {
return vv.ToView()[:64]
}
return vv.ToView()
}())
// 根据网络协议和数据包的目的地址,找到网络端
// 然后将数据包分发给网络层
if ref := n.getRef(protocol, dst); ref != nil {