perf: []byte and payload objects reuse

This commit is contained in:
lynx
2024-09-09 17:33:13 +08:00
parent 58a8b1f91b
commit b8dbb0ab97
4 changed files with 67 additions and 23 deletions

View File

@@ -52,7 +52,7 @@ func (e *Engine) addConnByID(id string) (PacketChan, error) {
return conn, nil return conn, nil
} }
peerChan := make(chan Payload, ChanSize) peerChan := make(PacketChan, ChanSize)
e.routeTable.id.Store(id, peerChan) e.routeTable.id.Store(id, peerChan)
go func() { go func() {
@@ -109,10 +109,11 @@ func (e *Engine) addConn(peerChan PacketChan, id string) {
return return
} }
buff := make([]byte, len(msg)) payload := e.payloadPool.Get()
copy(buff, msg) payload.Data = e.bufferPool.Get(len(msg))
copy(payload.Data, msg)
mr.ReleaseMsg(msg) mr.ReleaseMsg(msg)
e.devWriter <- Payload{Data: buff} e.devWriter <- payload
} }
}() }()
@@ -120,6 +121,8 @@ func (e *Engine) addConn(peerChan PacketChan, id string) {
select { select {
case payload := <-peerChan: case payload := <-peerChan:
err := mw.WriteMsg(payload.Data) err := mw.WriteMsg(payload.Data)
e.bufferPool.Put(payload.Data)
e.payloadPool.Put(payload)
if err != nil { if err != nil {
e.log.Errorf("Peer [%s] write msg error: %s", id, err) e.log.Errorf("Peer [%s] write msg error: %s", id, err)
return return

View File

@@ -4,8 +4,10 @@ import (
"context" "context"
"net/netip" "net/netip"
pool "github.com/libp2p/go-buffer-pool"
"github.com/libp2p/go-msgio" "github.com/libp2p/go-msgio"
"github.com/wlynxg/NetHive/core/route" "github.com/wlynxg/NetHive/core/route"
"github.com/wlynxg/NetHive/pkgs/xpool"
"github.com/wlynxg/NetHive/core/config" "github.com/wlynxg/NetHive/core/config"
"github.com/wlynxg/NetHive/core/device" "github.com/wlynxg/NetHive/core/device"
@@ -27,7 +29,7 @@ const (
VPNStreamProtocol = "/NetHive/vpn" VPNStreamProtocol = "/NetHive/vpn"
) )
type PacketChan chan Payload type PacketChan chan *Payload
type Engine struct { type Engine struct {
log *mlog.Logger log *mlog.Logger
@@ -48,6 +50,9 @@ type Engine struct {
devReader PacketChan devReader PacketChan
errChan chan error errChan chan error
bufferPool *pool.BufferPool
payloadPool xpool.Pool[*Payload]
routeTable struct { routeTable struct {
m xsync.Map[string, netip.Prefix] m xsync.Map[string, netip.Prefix]
id xsync.Map[string, PacketChan] id xsync.Map[string, PacketChan]
@@ -69,6 +74,11 @@ func Run(ctx context.Context, cfg *config.Config) (*Engine, error) {
e.devWriter = make(PacketChan, ChanSize) e.devWriter = make(PacketChan, ChanSize)
e.devReader = make(PacketChan, ChanSize) e.devReader = make(PacketChan, ChanSize)
e.bufferPool = &pool.BufferPool{}
e.payloadPool = xpool.New[*Payload](func() *Payload {
return &Payload{}
})
pk, err := cfg.PrivateKey.PrivKey() pk, err := cfg.PrivateKey.PrivKey()
if err != nil { if err != nil {
return nil, err return nil, err
@@ -220,10 +230,11 @@ func (e *Engine) VPNHandler(stream network.Stream) {
return return
} }
buff := make([]byte, len(msg)) payload := e.payloadPool.Get()
copy(buff, msg) payload.Data = e.bufferPool.Get(len(msg))
copy(payload.Data, msg)
mr.ReleaseMsg(msg) mr.ReleaseMsg(msg)
e.devWriter <- Payload{Data: buff} e.devWriter <- payload
} }
}() }()
@@ -231,9 +242,11 @@ func (e *Engine) VPNHandler(stream network.Stream) {
select { select {
case payload := <-peerChan: case payload := <-peerChan:
err := mw.WriteMsg(payload.Data) err := mw.WriteMsg(payload.Data)
e.bufferPool.Put(payload.Data)
e.payloadPool.Put(payload)
if err != nil { if err != nil {
e.log.Errorf("Peer [%s] write msg error: %s", id, err) e.log.Errorf("Peer [%s] write msg error: %s", id, err)
return continue
} }
} }
} }

View File

@@ -1,24 +1,27 @@
package engine package engine
import ( import (
"fmt"
"github.com/wlynxg/NetHive/core/protocol"
"net/netip" "net/netip"
"github.com/wlynxg/NetHive/core/protocol"
) )
// RoutineTUNReader loop to read packets from TUN // RoutineTUNReader loop to read packets from TUN
func (e *Engine) RoutineTUNReader() { func (e *Engine) RoutineTUNReader() {
var ( var (
buff = make([]byte, BuffSize) buff []byte
err error err error
n int n int
) )
for { for {
buff = e.bufferPool.Get(BuffSize)
n, err = e.device.Read(buff) n, err = e.device.Read(buff)
if err != nil { if err != nil {
e.errChan <- fmt.Errorf("[RoutineTUNReader]: %s", err) e.bufferPool.Put(buff)
return e.log.Warnf("[RoutineTUNReader]: %s", err)
continue
} }
ip, err := protocol.ParseIP(buff[:n]) ip, err := protocol.ParseIP(buff[:n])
if err != nil { if err != nil {
e.log.Warnf("[RoutineTUNReader] drop packet, because %s", err) e.log.Warnf("[RoutineTUNReader] drop packet, because %s", err)
@@ -30,16 +33,16 @@ func (e *Engine) RoutineTUNReader() {
continue continue
} }
payload := Payload{ payload := e.payloadPool.Get()
Src: ip.Src(), payload.Src = ip.Src()
Dst: ip.Dst(), payload.Dst = ip.Dst()
Data: make([]byte, n), payload.Data = buff[:n]
}
copy(payload.Data, buff[:n])
select { select {
case e.devReader <- payload: case e.devReader <- payload:
default: default:
e.log.Warnf("[RoutineTUNReader] drop packet: %s, because the sending queue is already full", payload.Dst) e.log.Warnf("[RoutineTUNReader] drop packet: %s, because the sending queue is already full", payload.Dst)
e.bufferPool.Put(payload.Data)
e.payloadPool.Put(payload)
} }
} }
} }
@@ -47,12 +50,15 @@ func (e *Engine) RoutineTUNReader() {
// RoutineTUNWriter loop writing packets to TUN // RoutineTUNWriter loop writing packets to TUN
func (e *Engine) RoutineTUNWriter() { func (e *Engine) RoutineTUNWriter() {
var ( var (
payload Payload payload *Payload
err error err error
) )
for payload = range e.devWriter { for payload = range e.devWriter {
_, err = e.device.Write(payload.Data) _, err = e.device.Write(payload.Data)
e.bufferPool.Put(payload.Data)
e.payloadPool.Put(payload)
if err != nil { if err != nil {
e.log.Errorf("[RoutineTUNWriter]: %s", err) e.log.Errorf("[RoutineTUNWriter]: %s", err)
e.log.Errorf("[err packet]: %v", payload.Data) e.log.Errorf("[err packet]: %v", payload.Data)
@@ -63,7 +69,7 @@ func (e *Engine) RoutineTUNWriter() {
// RoutineRouteTableWriter loop sending the data packet to the corresponding channel according to the routing table // RoutineRouteTableWriter loop sending the data packet to the corresponding channel according to the routing table
func (e *Engine) RoutineRouteTableWriter() { func (e *Engine) RoutineRouteTableWriter() {
var ( var (
payload Payload payload *Payload
ok bool ok bool
conn PacketChan conn PacketChan
) )
@@ -81,7 +87,6 @@ func (e *Engine) RoutineRouteTableWriter() {
defer e.routeTable.addr.Delete(value.Addr()) defer e.routeTable.addr.Delete(value.Addr())
e.addConn(conn, key) e.addConn(conn, key)
}() }()
} }
select { select {
case conn <- payload: case conn <- payload:
@@ -111,6 +116,8 @@ func (e *Engine) RoutineRouteTableWriter() {
case conn <- payload: case conn <- payload:
default: default:
e.log.Warnf("[RoutineRouteTableWriter] drop packet: %s, because the sending queue is already full", payload.Dst) e.log.Warnf("[RoutineRouteTableWriter] drop packet: %s, because the sending queue is already full", payload.Dst)
e.bufferPool.Put(payload.Data)
e.payloadPool.Put(payload)
} }
} }
} }

21
pkgs/xpool/xpool.go Normal file
View File

@@ -0,0 +1,21 @@
package xpool
import (
"sync"
)
type Pool[T any] struct {
pool sync.Pool
}
func New[T any](fn func() T) Pool[T] {
return Pool[T]{
pool: sync.Pool{New: func() interface{} { return fn() }},
}
}
func (p *Pool[T]) Get() T {
return p.pool.Get().(T)
}
func (p *Pool[T]) Put(x T) {
p.pool.Put(x)
}