From df8f6affa3195fade8f4dcfc25d0c02305b4473b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=BA=90=E6=96=87=E9=9B=A8?= <41315874+fumiama@users.noreply.github.com> Date: Tue, 13 May 2025 00:59:05 +0900 Subject: [PATCH] optimize: memory consumption --- go.mod | 4 +- go.sum | 8 +- gold/head/box.go | 22 ++--- gold/head/builder.go | 10 +-- gold/head/pool.go | 6 +- gold/head/unbox.go | 13 ++- gold/link/crypto.go | 9 +- gold/link/event.go | 3 +- gold/link/link.go | 16 ++-- gold/link/listen.go | 33 ++++---- gold/link/me.go | 12 +-- gold/link/recv.go | 2 +- gold/link/send.go | 26 +++--- gold/proto/data/data.go | 4 +- gold/proto/hello/hello.go | 23 +++--- gold/proto/nat/nat.go | 154 +++++++++++++++++------------------ internal/algo/crypto.go | 85 +++++++++---------- internal/algo/crypto_test.go | 6 +- internal/algo/zstd.go | 10 +-- 19 files changed, 204 insertions(+), 242 deletions(-) diff --git a/go.mod b/go.mod index 3323345..8b3d5ee 100644 --- a/go.mod +++ b/go.mod @@ -5,10 +5,10 @@ go 1.21 require ( github.com/FloatTech/ttl v0.0.0-20250224045156-012b1463287d github.com/RomiChan/syncx v0.0.0-20240418144900-b7402ffdebc7 - github.com/fumiama/blake2b-simd v0.0.0-20220412110131-4481822068bb + github.com/fumiama/blake2b-simd v0.0.0-20250228045919-a5dcaba5419a github.com/fumiama/go-base16384 v1.7.0 github.com/fumiama/go-x25519 v1.0.0 - github.com/fumiama/orbyte v0.0.0-20250414141219-63dd01e81ea2 + github.com/fumiama/orbyte v0.0.0-20250512155242-23a2b7120589 github.com/fumiama/water v0.0.0-20211231134027-da391938d6ac github.com/klauspost/compress v1.17.9 github.com/sirupsen/logrus v1.9.3 diff --git a/go.sum b/go.sum index 96190d7..b253f65 100644 --- a/go.sum +++ b/go.sum @@ -5,14 +5,14 @@ github.com/RomiChan/syncx v0.0.0-20240418144900-b7402ffdebc7/go.mod h1:vD7Ra3Q9o github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/fumiama/blake2b-simd v0.0.0-20220412110131-4481822068bb h1:Gd7gIiR68ErTWtxnl/PCycMog6IRdK1ApxrvoAy0lFY= -github.com/fumiama/blake2b-simd v0.0.0-20220412110131-4481822068bb/go.mod h1:Olmv2uLdFllRsvwhzOvG/O/Nvgzg0ViokUL4+hiaRSE= +github.com/fumiama/blake2b-simd v0.0.0-20250228045919-a5dcaba5419a h1:hhCWoHNGDATjUitm8rKwrf5xRnuxO7P6UHIusLXuqag= +github.com/fumiama/blake2b-simd v0.0.0-20250228045919-a5dcaba5419a/go.mod h1:Olmv2uLdFllRsvwhzOvG/O/Nvgzg0ViokUL4+hiaRSE= github.com/fumiama/go-base16384 v1.7.0 h1:6fep7XPQWxRlh4Hu+KsdH+6+YdUp+w6CwRXtMWSsXCA= github.com/fumiama/go-base16384 v1.7.0/go.mod h1:OEn+947GV5gsbTAnyuUW/SrfxJYUdYupSIQXOuGOcXM= github.com/fumiama/go-x25519 v1.0.0 h1:hiGg9EhseVmGCc8T1jECVkj8Keu/aJ1ZK05RM8Vuavo= github.com/fumiama/go-x25519 v1.0.0/go.mod h1:8VOhfyGZzw4IUs4nCjQFqW9cA3V/QpSCtP3fo2dLNg4= -github.com/fumiama/orbyte v0.0.0-20250414141219-63dd01e81ea2 h1:A0jvi8f+FHNtPJBKujPYshazTmujSE/trf5xkmrJvSQ= -github.com/fumiama/orbyte v0.0.0-20250414141219-63dd01e81ea2/go.mod h1:FOjdw7KdCbK2eH3gRPhwFNCoXKpu9sN5vPH4El/8e0c= +github.com/fumiama/orbyte v0.0.0-20250512155242-23a2b7120589 h1:ifo33HcA8HFYWaTe4fsJjozTUaOBlA0dFa1BcTZDEHs= +github.com/fumiama/orbyte v0.0.0-20250512155242-23a2b7120589/go.mod h1:FOjdw7KdCbK2eH3gRPhwFNCoXKpu9sN5vPH4El/8e0c= github.com/fumiama/water v0.0.0-20211231134027-da391938d6ac h1:A/5A0rODsg+EQHH61Ew5mMUtDpRXaSNqHhPvW+fN4C4= github.com/fumiama/water v0.0.0-20211231134027-da391938d6ac/go.mod h1:BBnNY9PwK+UUn4trAU+H0qsMEypm7+3Bj1bVFuJItlo= github.com/fumiama/wintun v0.0.0-20211229152851-8bc97c8034c0 h1:WfrSFlIlCAtg6Rt2IGna0HhJYSDE45YVHiYqO4wwsEw= diff --git a/gold/head/box.go b/gold/head/box.go index c718cbe..6b49579 100644 --- a/gold/head/box.go +++ b/gold/head/box.go @@ -59,13 +59,10 @@ func (p *Packet) WriteHeaderTo(buf *bytes.Buffer) { buf.Write((*[PacketHeadNoCRCLen]byte)( (unsafe.Pointer)(p), )[:]) - b := pbuf.NewBytes(buf.Len()) - b.V(func(b []byte) { - copy(b, buf.Bytes()) - ClearTTL(b) - p.md5h8 = algo.MD5Hash8(b) - }) - b.ManualDestroy() + b := make([]byte, buf.Len()) + copy(b, buf.Bytes()) + ClearTTL(b) + p.md5h8 = algo.MD5Hash8(b) _ = binary.Write(buf, binary.LittleEndian, p.md5h8) return } @@ -79,13 +76,10 @@ func (p *Packet) WriteHeaderTo(buf *bytes.Buffer) { w.Write(p.src[:]) w.Write(p.dst[:]) w.P(func(buf *pbuf.Buffer) { - b := pbuf.NewBytes(buf.Len()) - b.V(func(b []byte) { - copy(b, buf.Bytes()) - ClearTTL(b) - p.md5h8 = algo.MD5Hash8(b) - }) - b.ManualDestroy() + b := make([]byte, buf.Len()) + copy(b, buf.Bytes()) + ClearTTL(b) + p.md5h8 = algo.MD5Hash8(b) }) w.WriteUInt64(p.md5h8) w.P(func(b *pbuf.Buffer) { diff --git a/gold/head/builder.go b/gold/head/builder.go index 5a6d376..9e88fed 100644 --- a/gold/head/builder.go +++ b/gold/head/builder.go @@ -87,11 +87,10 @@ func (pb *DataBuilder) Zstd() *DataBuilder { return pb.p(func(ub *PacketBuf) { data := algo.EncodeZstd(ub.Bytes()) ub.Reset() - data.V(func(b []byte) { ub.Write(b) }) + ub.Write(data) if config.ShowDebugLog { logrus.Debugln(file.Header(), strconv.FormatUint(ub.DAT.md5h8, 16), "data after zstd", file.ToLimitHexString(ub.Bytes(), 64)) } - data.ManualDestroy() }) } @@ -125,8 +124,7 @@ func (pb *DataBuilder) Seal(aead cipher.AEAD, teatyp uint8, additional uint16) * w.P(func(b *pbuf.Buffer) { data := algo.EncodeAEAD(aead, additional, b.Bytes()) ub.Reset() - data.V(func(b []byte) { ub.Write(b) }) - data.ManualDestroy() + ub.Write(data) }) w.Destroy() })) @@ -204,7 +202,7 @@ func (pb *PacketBuilder) Split(mtu int, nofrag bool) (pbs []PacketBytes) { pbs = []PacketBytes{ pbuf.BufferItemToBytes((*PacketItem)( pb.copy().noFrag(nofrag).hasMore(false).offset(0), - )), + )).Ignore(), } return } @@ -226,7 +224,7 @@ func (pb *PacketBuilder) Split(mtu int, nofrag bool) (pbs []PacketBytes) { } pbs[i] = pbuf.BufferItemToBytes((*PacketItem)( pb.copy().offset(uint16(i*datalim)), - )).Slice(a, b) + )).Ignore().Slice(a, b) } }) return diff --git a/gold/head/pool.go b/gold/head/pool.go index b0880c7..c7909cd 100644 --- a/gold/head/pool.go +++ b/gold/head/pool.go @@ -12,6 +12,10 @@ import ( var packetPool = pbuf.NewBufferPool[Packet]() func init() { + packetPool.LimitInput(256) + packetPool.LimitOutput(256) + pbuf.LimitInput(256) + pbuf.LimitOutput(256) if config.ShowDebugLog { go status() } @@ -23,7 +27,7 @@ func selectPacket(buf ...byte) *PacketItem { } func status() { - for range time.NewTicker(time.Minute).C { + for range time.NewTicker(time.Second).C { out, in := packetPool.CountItems() logrus.Infoln(file.Header(), "packet outside:", out, "inside:", in) out, in = pbuf.CountItems() diff --git a/gold/head/unbox.go b/gold/head/unbox.go index a01807f..746f360 100644 --- a/gold/head/unbox.go +++ b/gold/head/unbox.go @@ -55,13 +55,10 @@ func ParsePacketHeader(data []byte) (pbytes PacketBytes, err error) { return } var crc uint64 - b := pbuf.NewBytes(int(PacketHeadNoCRCLen)) - b.V(func(b []byte) { - copy(b, data[:PacketHeadNoCRCLen]) - ClearTTL(b) - crc = algo.MD5Hash8(b) - }) - b.ManualDestroy() + var b [PacketHeadNoCRCLen]byte + copy(b[:], data[:PacketHeadNoCRCLen]) + ClearTTL(b[:]) + crc = algo.MD5Hash8(b[:]) if crc != pb.DAT.md5h8 { err = ErrBadCRCChecksum if config.ShowDebugLog { @@ -85,7 +82,7 @@ func ParsePacketHeader(data []byte) (pbytes PacketBytes, err error) { p.ManualDestroy() return } - pbytes = pbuf.BufferItemToBytes(p) + pbytes = pbuf.BufferItemToBytes(p).Ignore() return } diff --git a/gold/link/crypto.go b/gold/link/crypto.go index 401a650..4e77982 100644 --- a/gold/link/crypto.go +++ b/gold/link/crypto.go @@ -3,7 +3,6 @@ package link import ( "encoding/hex" - "github.com/fumiama/orbyte/pbuf" "github.com/sirupsen/logrus" "github.com/fumiama/WireGold/config" @@ -19,7 +18,7 @@ func (l *Link) randkeyidx() uint8 { } // decode by aead and put b into pool -func (l *Link) decode(teatype uint8, additional uint16, b []byte) (db pbuf.Bytes, err error) { +func (l *Link) decode(teatype uint8, additional uint16, b []byte) (db []byte, err error) { if len(b) == 0 || teatype >= 32 { return } @@ -33,7 +32,9 @@ func (l *Link) decode(teatype uint8, additional uint16, b []byte) (db pbuf.Bytes } logrus.Debugln(file.Header(), "copy plain text", hex.EncodeToString(b[:n]), endl) } - return pbuf.ParseBytes(b...).Copy(), nil + db = make([]byte, len(b)) + copy(db, b) + return } aead := l.keys[teatype] if aead == nil { @@ -43,7 +44,7 @@ func (l *Link) decode(teatype uint8, additional uint16, b []byte) (db pbuf.Bytes } // xorenc 按 8 字节, 以初始 m.mask 循环异或编码 data -func (m *Me) xorenc(data []byte, seq uint32) pbuf.Bytes { +func (m *Me) xorenc(data []byte, seq uint32) []byte { return algo.EncodeXOR(data, m.mask, seq) } diff --git a/gold/link/event.go b/gold/link/event.go index deb4f07..9c263f5 100644 --- a/gold/link/event.go +++ b/gold/link/event.go @@ -3,13 +3,12 @@ package link import ( "github.com/RomiChan/syncx" "github.com/fumiama/WireGold/gold/head" - "github.com/fumiama/orbyte/pbuf" ) // 事件分发器 var dispachers syncx.Map[uint8, Dispacher] -type Dispacher func(header *head.Packet, peer *Link, data pbuf.Bytes) +type Dispacher func(header *head.Packet, peer *Link, data []byte) // RegisterDispacher of proto func RegisterDispacher(p uint8, d Dispacher) (actual Dispacher, hasexist bool) { diff --git a/gold/link/link.go b/gold/link/link.go index df0b61a..d804d9e 100644 --- a/gold/link/link.go +++ b/gold/link/link.go @@ -12,7 +12,6 @@ import ( "github.com/fumiama/WireGold/gold/p2p" "github.com/fumiama/WireGold/internal/bin" base14 "github.com/fumiama/go-base16384" - "github.com/fumiama/orbyte/pbuf" "github.com/sirupsen/logrus" ) @@ -70,25 +69,24 @@ func (m *Me) Connect(peer string) (*Link, error) { return nil, ErrPerrNotExist } -func (l *Link) ToLower(header *head.Packet, data pbuf.Bytes) { +func (l *Link) ToLower(header *head.Packet, data []byte) { if l.pipe != nil { + d := make([]byte, len(data)) + copy(d, data) l.pipe <- LinkData{ H: *header, - D: data.Copy().Trans(), + D: d, } if config.ShowDebugLog { logrus.Debugln("[listen] deliver to pipe of", l.peerip) } return } - var err error - data.V(func(b []byte) { - _, err = l.me.nic.Write(b) - }) + _, err := l.me.nic.Write(data) if err != nil { - logrus.Errorln("[listen] deliver", data.Len(), "bytes data to nic err:", err) + logrus.Errorln("[listen] deliver", len(data), "bytes data to nic err:", err) } else if config.ShowDebugLog { - logrus.Debugln("[listen] deliver", data.Len(), "bytes data to nic") + logrus.Debugln("[listen] deliver", len(data), "bytes data to nic") } } diff --git a/gold/link/listen.go b/gold/link/listen.go index c5130e6..a47a19f 100644 --- a/gold/link/listen.go +++ b/gold/link/listen.go @@ -32,11 +32,17 @@ func (m *Me) runworkers() { m.jobs = make([]chan job, ncpu) for i := 0; i < ncpu; i++ { m.jobs[i] = make(chan job, 4096) - go func(jobs <-chan job) { + go func(i int, jobs <-chan job) { for jb := range jobs { + if config.ShowDebugLog { + logrus.Debugln("[listen] job thread", i, "call waitordispatch") + } m.waitordispatch(jb.addr, jb.buf, jb.n, jb.fil) + if config.ShowDebugLog { + logrus.Debugln("[listen] job thread", i, "fin waitordispatch") + } } - }(m.jobs[i]) + }(i, m.jobs[i]) } } @@ -73,9 +79,9 @@ func (m *Me) listen() (conn p2p.Conn, err error) { fil *uintptr ) if idx < 0 { - lbf = pbuf.NewBytes(lstnbufgragsz) + lbf = pbuf.NewLargeBytes(lstnbufgragsz) } else { - lbf = pbuf.ParseBytes(bufs[idx*lstnbufgragsz : (idx+1)*lstnbufgragsz]...) + lbf = pbuf.ParseBytes(bufs[idx*lstnbufgragsz : (idx+1)*lstnbufgragsz : (idx+1)*lstnbufgragsz]...).Ignore() fil = &fils[idx] } @@ -194,29 +200,23 @@ func (m *Me) dispatch(header *head.Packet, body []byte, addr p2p.EndPoint) { } return } - if data.Len() < 8 { + if len(data) < 8 { if config.ShowDebugLog { - logrus.Debugln("[listen] drop invalid data len packet key idx:", header.CipherIndex(), "addt:", addt, "len", data.Len()) + logrus.Debugln("[listen] drop invalid data len packet key idx:", header.CipherIndex(), "addt:", addt, "len", len(data)) } return } ok := false - data.V(func(b []byte) { - ok = algo.IsVaildBlake2bHash8(header.PreCRC64(), b) - }) + ok = algo.IsVaildBlake2bHash8(header.PreCRC64(), data) if !ok { if config.ShowDebugLog { logrus.Debugln("[listen] drop invalid hash packet") } return } - data = data.SliceFrom(8) + data = data[8:] if p.usezstd { - data.V(func(b []byte) { - old := data - data, err = algo.DecodeZstd(b) // skip hash - old.ManualDestroy() - }) + data, err = algo.DecodeZstd(data) // skip hash if err != nil { if config.ShowDebugLog { logrus.Debugln("[listen] drop invalid zstd packet:", err) @@ -224,7 +224,7 @@ func (m *Me) dispatch(header *head.Packet, body []byte, addr p2p.EndPoint) { return } if config.ShowDebugLog { - logrus.Debugln("[listen] zstd decoded len:", data.Len()) + logrus.Debugln("[listen] zstd decoded len:", len(data)) } } fn, ok := GetDispacher(header.Proto.Proto()) @@ -233,5 +233,4 @@ func (m *Me) dispatch(header *head.Packet, body []byte, addr p2p.EndPoint) { return } fn(header, p, data) - data.ManualDestroy() } diff --git a/gold/link/me.go b/gold/link/me.go index 2652d79..1d5fdda 100644 --- a/gold/link/me.go +++ b/gold/link/me.go @@ -10,7 +10,6 @@ import ( "time" "github.com/FloatTech/ttl" - "github.com/fumiama/orbyte/pbuf" "github.com/fumiama/water/waterutil" "github.com/sirupsen/logrus" @@ -316,15 +315,6 @@ func (m *Me) sendAllSameDst(packet []byte) (n int) { logrus.Warnln("[me] drop packet to", dst.String()+":"+strconv.Itoa(int(m.DstPort())), ": nil nexthop") return } - pcp := pbuf.NewBytes(len(packet)) - pcp.V(func(b []byte) { - copy(b, packet) - }) - go func() { - pcp.V(func(b []byte) { - lnk.WritePacket(head.ProtoData, b, lnk.me.ttl) - }) - pcp.ManualDestroy() - }() + lnk.WritePacket(head.ProtoData, packet, lnk.me.ttl) return } diff --git a/gold/link/recv.go b/gold/link/recv.go index 93f67ab..457fe11 100644 --- a/gold/link/recv.go +++ b/gold/link/recv.go @@ -123,7 +123,7 @@ func (m *Me) wait(data []byte, addr p2p.EndPoint) (h head.PacketBytes) { logrus.Warnln("[recv] transfer drop packet: zero ttl") return } - go lnk.write2peer(pbuf.ParseBytes(data...).Copy(), seq) + go lnk.write2peer(pbuf.ParseBytes(data...).Ignore().Copy(), seq) if config.ShowDebugLog { logrus.Debugln("[listen] trans", len(data), "bytes packet to", p.Dst().String()+":"+strconv.Itoa(int(p.DstPort))) } diff --git a/gold/link/send.go b/gold/link/send.go index 149d07f..c683121 100644 --- a/gold/link/send.go +++ b/gold/link/send.go @@ -110,24 +110,24 @@ func (l *Link) write2peer1(b pbuf.Bytes, seq uint32) (err error) { } logrus.Debugln("[send] crc seq", fmt.Sprintf("%08x", seq), "raw data bytes", hex.EncodeToString(data[:bound]), endl) } - b = l.me.xorenc(data, seq) + b = pbuf.ParseBytes(l.me.xorenc(data, seq)...).Ignore() isnewb = true - if config.ShowDebugLog { - bound := 64 - endl := "..." - if b.Len() < bound { - bound = b.Len() - endl = "." - } - b.V(func(b []byte) { - logrus.Debugln("[send] crc seq", fmt.Sprintf("%08x", seq), "xored data bytes", hex.EncodeToString(b[:bound]), endl) - }) - } }) + if config.ShowDebugLog { + bound := 64 + endl := "..." + if b.Len() < bound { + bound = b.Len() + endl = "." + } + b.V(func(b []byte) { + logrus.Debugln("[send] crc seq", fmt.Sprintf("%08x", seq), "xored data bytes", hex.EncodeToString(b[:bound]), endl) + }) + } if l.me.base14 { b.V(func(data []byte) { old := b - b = pbuf.ParseBytes(base14.Encode(data)...) + b = pbuf.ParseBytes(base14.Encode(data)...).Ignore() if isnewb { old.ManualDestroy() } diff --git a/gold/proto/data/data.go b/gold/proto/data/data.go index e7f17e9..ca94d77 100644 --- a/gold/proto/data/data.go +++ b/gold/proto/data/data.go @@ -1,14 +1,12 @@ package data import ( - "github.com/fumiama/orbyte/pbuf" - "github.com/fumiama/WireGold/gold/head" "github.com/fumiama/WireGold/gold/link" ) func init() { - link.RegisterDispacher(head.ProtoData, func(header *head.Packet, peer *link.Link, data pbuf.Bytes) { + link.RegisterDispacher(head.ProtoData, func(header *head.Packet, peer *link.Link, data []byte) { peer.ToLower(header, data) }) } diff --git a/gold/proto/hello/hello.go b/gold/proto/hello/hello.go index 1e1cd10..2679e6b 100644 --- a/gold/proto/hello/hello.go +++ b/gold/proto/hello/hello.go @@ -1,7 +1,6 @@ package hello import ( - "github.com/fumiama/orbyte/pbuf" "github.com/sirupsen/logrus" "github.com/fumiama/WireGold/gold/head" @@ -10,17 +9,15 @@ import ( ) func init() { - link.RegisterDispacher(head.ProtoHello, func(_ *head.Packet, peer *link.Link, data pbuf.Bytes) { - data.V(func(b []byte) { - switch { - case len(b) == 0: - logrus.Warnln(file.Header(), "recv old packet, do nothing") - case b[0] == byte(head.HelloPing): - go peer.WritePacket(head.ProtoHello, []byte{byte(head.HelloPong)}, peer.Me().TTL()) - logrus.Infoln(file.Header(), "recv, send ack") - default: - logrus.Infoln(file.Header(), "recv ack, do nothing") - } - }) + link.RegisterDispacher(head.ProtoHello, func(_ *head.Packet, peer *link.Link, data []byte) { + switch { + case len(data) == 0: + logrus.Warnln(file.Header(), "recv old packet, do nothing") + case data[0] == byte(head.HelloPing): + go peer.WritePacket(head.ProtoHello, []byte{byte(head.HelloPong)}, peer.Me().TTL()) + logrus.Infoln(file.Header(), "recv, send ack") + default: + logrus.Infoln(file.Header(), "recv ack, do nothing") + } }) } diff --git a/gold/proto/nat/nat.go b/gold/proto/nat/nat.go index 8996763..27fd805 100644 --- a/gold/proto/nat/nat.go +++ b/gold/proto/nat/nat.go @@ -17,97 +17,93 @@ import ( func init() { // 收到通告包的处理 - link.RegisterDispacher(head.ProtoNotify, func(_ *head.Packet, peer *link.Link, data pbuf.Bytes) { - data.V(func(b []byte) { - // 1. Data 解包 - // ---- 使用 head.Notify 解释 packet - notify := make(head.Notify, 32) - err := json.Unmarshal(b, ¬ify) - if err != nil { - logrus.Errorln(file.Header(), "notify json unmarshal err:", err) - return + link.RegisterDispacher(head.ProtoNotify, func(_ *head.Packet, peer *link.Link, data []byte) { + // 1. Data 解包 + // ---- 使用 head.Notify 解释 packet + notify := make(head.Notify, 32) + err := json.Unmarshal(data, ¬ify) + if err != nil { + logrus.Errorln(file.Header(), "notify json unmarshal err:", err) + return + } + // 2. endpoint注册 + // ---- 遍历 Notify,注册对方的 endpoint 到 + // ---- connections,注意使用读写锁connmapmu + for ps, ep := range notify { + nw, epstr := ep[0], ep[1] + if nw != peer.Me().EndPoint().Network() { + logrus.Warnln(file.Header(), "ignore different network notify", nw, "addr", epstr) + continue } - // 2. endpoint注册 - // ---- 遍历 Notify,注册对方的 endpoint 到 - // ---- connections,注意使用读写锁connmapmu - for ps, ep := range notify { - nw, epstr := ep[0], ep[1] - if nw != peer.Me().EndPoint().Network() { - logrus.Warnln(file.Header(), "ignore different network notify", nw, "addr", epstr) + addr, err := p2p.NewEndPoint(nw, epstr, peer.Me().NetworkConfigs()...) + if err == nil { + p, ok := peer.Me().IsInPeer(ps) + if ok { + if bin.IsNilInterface(p.EndPoint()) || !p.EndPoint().Euqal(addr) { + p.SetEndPoint(addr) + logrus.Infoln(file.Header(), "notify set ep of peer", ps, "to", ep) + } continue } - addr, err := p2p.NewEndPoint(nw, epstr, peer.Me().NetworkConfigs()...) - if err == nil { - p, ok := peer.Me().IsInPeer(ps) - if ok { - if bin.IsNilInterface(p.EndPoint()) || !p.EndPoint().Euqal(addr) { - p.SetEndPoint(addr) - logrus.Infoln(file.Header(), "notify set ep of peer", ps, "to", ep) - } - continue - } - } - if config.ShowDebugLog { - logrus.Debugln(file.Header(), "notify drop invalid peer:", ps, "ep:", ep) - } } - }) + if config.ShowDebugLog { + logrus.Debugln(file.Header(), "notify drop invalid peer:", ps, "ep:", ep) + } + } }) // 收到询问包的处理 - link.RegisterDispacher(head.ProtoQuery, func(_ *head.Packet, peer *link.Link, data pbuf.Bytes) { - data.V(func(b []byte) { - // 完成data解包与notify分发 + link.RegisterDispacher(head.ProtoQuery, func(_ *head.Packet, peer *link.Link, data []byte) { + // 完成data解包与notify分发 - // 1. Data 解包 - // ---- 使用 head.Query 解释 packet - // ---- 根据 Query 确定需要封装的 Notify - var peers head.Query - err := json.Unmarshal(b, &peers) - if err != nil { - logrus.Errorln(file.Header(), "query json unmarshal err:", err) - return - } + // 1. Data 解包 + // ---- 使用 head.Query 解释 packet + // ---- 根据 Query 确定需要封装的 Notify + var peers head.Query + err := json.Unmarshal(data, &peers) + if err != nil { + logrus.Errorln(file.Header(), "query json unmarshal err:", err) + return + } - if peer == nil || peer.Me() == nil { - logrus.Errorln(file.Header(), "nil link/me") - return - } + if peer == nil || peer.Me() == nil { + logrus.Errorln(file.Header(), "nil link/me") + return + } - // 2. notify分发 - // ---- 封装 Notify 到 新的 packet - // ---- 发送到对方 - notify := make(head.Notify, len(peers)) - for _, p := range peers { - lnk, ok := peer.Me().IsInPeer(p) - eps := "" - if peer.Me().EndPoint().Network() == "udp" { // udp has real p2p - if bin.IsNilInterface(lnk.EndPoint()) { - continue - } - eps = lnk.EndPoint().String() - } - if eps == "" { - eps = peer.RawEndPoint() // use registered ep only - } - if eps == "" { + // 2. notify分发 + // ---- 封装 Notify 到 新的 packet + // ---- 发送到对方 + notify := make(head.Notify, len(peers)) + for _, p := range peers { + lnk, ok := peer.Me().IsInPeer(p) + eps := "" + if peer.Me().EndPoint().Network() == "udp" { // udp has real p2p + if bin.IsNilInterface(lnk.EndPoint()) { continue } - if ok && bin.IsNonNilInterface(lnk.EndPoint()) { - notify[p] = [2]string{ - lnk.EndPoint().Network(), - eps, - } + eps = lnk.EndPoint().String() + } + if eps == "" { + eps = peer.RawEndPoint() // use registered ep only + } + if eps == "" { + continue + } + if ok && bin.IsNonNilInterface(lnk.EndPoint()) { + notify[p] = [2]string{ + lnk.EndPoint().Network(), + eps, } } - if len(notify) > 0 { - logrus.Infoln(file.Header(), "query wrap", len(notify), "notify") - w := bin.SelectWriter() - _ = json.NewEncoder(w).Encode(¬ify) - w.P(func(b *pbuf.Buffer) { - peer.WritePacket(head.ProtoNotify, b.Bytes(), peer.Me().TTL()) - }) - w.Destroy() - } - }) + } + if len(notify) > 0 { + logrus.Infoln(file.Header(), "query wrap", len(notify), "notify") + w := bin.SelectWriter() + _ = json.NewEncoder(w).Encode(¬ify) + w.P(func(b *pbuf.Buffer) { + peer.WritePacket(head.ProtoNotify, b.Bytes(), peer.Me().TTL()) + }) + w.Destroy() + } }) } diff --git a/internal/algo/crypto.go b/internal/algo/crypto.go index 300ff75..1593b02 100644 --- a/internal/algo/crypto.go +++ b/internal/algo/crypto.go @@ -5,58 +5,51 @@ import ( "crypto/rand" "encoding/binary" "errors" - - "github.com/fumiama/orbyte/pbuf" ) var ( ErrCipherTextTooShort = errors.New("ciphertext too short") ) -func EncodeAEAD(aead cipher.AEAD, additional uint16, b []byte) pbuf.Bytes { +func EncodeAEAD(aead cipher.AEAD, additional uint16, b []byte) []byte { nsz := aead.NonceSize() // Accocate capacity for all the stuffs. - buf := pbuf.NewBytes(2 + nsz + len(b) + aead.Overhead()) + buf := make([]byte, 2+nsz+len(b)+aead.Overhead()) n := 0 - buf.V(func(buf []byte) { - binary.LittleEndian.PutUint16(buf[:2], additional) - nonce := buf[2 : 2+nsz] - // Select a random nonce - _, err := rand.Read(nonce) - if err != nil { - panic(err) - } - // Encrypt the message and append the ciphertext to the nonce. - eb := aead.Seal(nonce[nsz:nsz], nonce, b, buf[:2]) - n = len(eb) - }) - return buf.Slice(2, 2+nsz+n) + binary.LittleEndian.PutUint16(buf[:2], additional) + nonce := buf[2 : 2+nsz] + // Select a random nonce + _, err := rand.Read(nonce) + if err != nil { + panic(err) + } + // Encrypt the message and append the ciphertext to the nonce. + eb := aead.Seal(nonce[nsz:nsz], nonce, b, buf[:2]) + n = len(eb) + return buf[2 : 2+nsz+n] } -func DecodeAEAD(aead cipher.AEAD, additional uint16, b []byte) (data pbuf.Bytes, err error) { +func DecodeAEAD(aead cipher.AEAD, additional uint16, b []byte) (data []byte, err error) { nsz := aead.NonceSize() if len(b) < nsz { - return pbuf.Bytes{}, ErrCipherTextTooShort + return nil, ErrCipherTextTooShort } // Split nonce and ciphertext. nonce, ciphertext := b[:nsz], b[nsz:] if len(ciphertext) == 0 { - return pbuf.Bytes{}, nil + return nil, nil } // Decrypt the message and check it wasn't tampered with. var buf [2]byte binary.LittleEndian.PutUint16(buf[:], additional) - data = pbuf.NewBytes(len(ciphertext)) + data = make([]byte, len(ciphertext)) n := 0 - data.V(func(b []byte) { - var d []byte - d, err = aead.Open(b[:0], nonce, ciphertext, buf[:]) - n = len(d) - }) + d, err := aead.Open(data[:0], nonce, ciphertext, buf[:]) + n = len(d) if err != nil { return } - return data.SliceTo(n), nil + return data[:n], nil } func EncodeXORLen(datalen int) int { @@ -65,29 +58,27 @@ func EncodeXORLen(datalen int) int { } // EncodeXOR 按 8 字节, 以初始 mask 循环异或编码 data -func EncodeXOR(data []byte, mask uint64, seq uint32) pbuf.Bytes { +func EncodeXOR(data []byte, mask uint64, seq uint32) []byte { batchsz := len(data) / 8 remain := len(data) % 8 sum := mask - newdat := pbuf.NewBytes(EncodeXORLen(len(data))) - newdat.V(func(buf []byte) { - binary.LittleEndian.PutUint32(buf[:4], seq) - _, _ = rand.Read(buf[4:8]) // seqrand - sum ^= binary.LittleEndian.Uint64(buf[:8]) // init from seqrand - binary.LittleEndian.PutUint64(buf[:8], sum) - for i := 0; i < batchsz; i++ { // range on batch data - a := i * 8 - b := (i + 1) * 8 - sum ^= binary.LittleEndian.Uint64(data[a:b]) - binary.LittleEndian.PutUint64(buf[a+8:b+8], sum) - } - p := batchsz * 8 - copy(buf[8+p:], data[p:]) - buf[newdat.Len()-1] = byte(remain) - sum ^= binary.LittleEndian.Uint64(buf[8+p:]) - binary.LittleEndian.PutUint64(buf[8+p:], sum) - }) - return newdat + buf := make([]byte, EncodeXORLen(len(data))) + binary.LittleEndian.PutUint32(buf[:4], seq) + _, _ = rand.Read(buf[4:8]) // seqrand + sum ^= binary.LittleEndian.Uint64(buf[:8]) // init from seqrand + binary.LittleEndian.PutUint64(buf[:8], sum) + for i := 0; i < batchsz; i++ { // range on batch data + a := i * 8 + b := (i + 1) * 8 + sum ^= binary.LittleEndian.Uint64(data[a:b]) + binary.LittleEndian.PutUint64(buf[a+8:b+8], sum) + } + p := batchsz * 8 + copy(buf[8+p:], data[p:]) + buf[len(buf)-1] = byte(remain) + sum ^= binary.LittleEndian.Uint64(buf[8+p:]) + binary.LittleEndian.PutUint64(buf[8+p:], sum) + return buf } // DecodeXOR 按 8 字节, 以初始 mask 循环异或解码 data, diff --git a/internal/algo/crypto_test.go b/internal/algo/crypto_test.go index 317a9ae..ef5c26d 100644 --- a/internal/algo/crypto_test.go +++ b/internal/algo/crypto_test.go @@ -25,7 +25,7 @@ func TestXOR(t *testing.T) { if err != nil { t.Fatal(err) } - seq, dec := DecodeXOR(EncodeXOR(r1.Bytes(), mask, uint32(i)).Trans(), mask) + seq, dec := DecodeXOR(EncodeXOR(r1.Bytes(), mask, uint32(i)), mask) if !bytes.Equal(dec, r2.Bytes()) { t.Fatal("unexpected xor at", i, "except", hex.EncodeToString(r2.Bytes()), "got", hex.EncodeToString(dec)) } @@ -51,11 +51,11 @@ func TestXChacha20(t *testing.T) { t.Fatal(err) } for i := 0; i < 4096; i++ { - db, err := DecodeAEAD(aead, uint16(i), EncodeAEAD(aead, uint16(i), data[:i]).Trans()) + db, err := DecodeAEAD(aead, uint16(i), EncodeAEAD(aead, uint16(i), data[:i])) if err != nil { t.Fatal(err) } - if !bytes.Equal(db.Trans(), data[:i]) { + if !bytes.Equal(db, data[:i]) { t.Fatal("unexpected preshared at idx(len)", i, "addt", uint16(i)) } } diff --git a/internal/algo/zstd.go b/internal/algo/zstd.go index 47ceea7..42eaebf 100644 --- a/internal/algo/zstd.go +++ b/internal/algo/zstd.go @@ -9,7 +9,7 @@ import ( "github.com/klauspost/compress/zstd" ) -func EncodeZstd(data []byte) pbuf.Bytes { +func EncodeZstd(data []byte) []byte { return bin.SelectWriter().P(func(w *pbuf.Buffer) { enc, err := zstd.NewWriter(w, zstd.WithEncoderLevel(zstd.SpeedFastest)) if err != nil { @@ -23,19 +23,19 @@ func EncodeZstd(data []byte) pbuf.Bytes { if err != nil { panic(err) } - }).ToBytes() + }).ToBytes().Copy().Ignore().Trans() } -func DecodeZstd(data []byte) (b pbuf.Bytes, err error) { +func DecodeZstd(data []byte) (b []byte, err error) { dec, err := zstd.NewReader(bytes.NewReader(data)) if err != nil { - return pbuf.Bytes{}, err + return } b = bin.SelectWriter().P(func(w *pbuf.Buffer) { _, err = io.Copy(w, dec) dec.Close() - }).ToBytes() + }).ToBytes().Copy().Ignore().Trans() return }