Fix usages of readmsg_x

This commit is contained in:
世界
2025-07-08 15:44:35 +08:00
committed by wwqgtxx
parent a0881ada32
commit 4c81c8a62a
4 changed files with 73 additions and 82 deletions

View File

@@ -74,12 +74,29 @@ const (
// dispatch options but the one that is supported by all underlying FD // dispatch options but the one that is supported by all underlying FD
// types. // types.
Readv PacketDispatchMode = iota Readv PacketDispatchMode = iota
// RecvMMsg enables use of recvmmsg() syscall instead of readv() to
// read inbound packets. This reduces # of syscalls needed to process
// packets.
//
// NOTE: recvmmsg() is only supported for sockets, so if the underlying
// FD is not a socket then the code will still fall back to the readv()
// path.
RecvMMsg
// PacketMMap enables use of PACKET_RX_RING to receive packets from the
// NIC. PacketMMap requires that the underlying FD be an AF_PACKET. The
// primary use-case for this is runsc which uses an AF_PACKET FD to
// receive packets from the veth device.
PacketMMap
) )
func (p PacketDispatchMode) String() string { func (p PacketDispatchMode) String() string {
switch p { switch p {
case Readv: case Readv:
return "Readv" return "Readv"
case RecvMMsg:
return "RecvMMsg"
case PacketMMap:
return "PacketMMap"
default: default:
return fmt.Sprintf("unknown packet dispatch mode '%d'", p) return fmt.Sprintf("unknown packet dispatch mode '%d'", p)
} }
@@ -283,14 +300,6 @@ func New(opts *Options) (stack.LinkEndpoint, error) {
return e, nil return e, nil
} }
func isSocketFD(fd int) (bool, error) {
var stat unix.Stat_t
if err := unix.Fstat(fd, &stat); err != nil {
return false, fmt.Errorf("unix.Fstat(%v,...) failed: %v", fd, err)
}
return (stat.Mode & unix.S_IFSOCK) == unix.S_IFSOCK, nil
}
// Attach launches the goroutine that reads packets from the file descriptor and // Attach launches the goroutine that reads packets from the file descriptor and
// dispatches them via the provided dispatcher. If one is already attached, // dispatches them via the provided dispatcher. If one is already attached,
// then nothing happens. // then nothing happens.

View File

@@ -25,51 +25,31 @@ import (
"golang.org/x/sys/unix" "golang.org/x/sys/unix"
) )
// BufConfig defines the shape of the buffer used to read packets from the NIC.
var BufConfig = []int{4, 128, 256, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768}
// +stateify savable
type iovecBuffer struct { type iovecBuffer struct {
// buffer is the actual buffer that holds the packet contents. Some contents mtu int
// are reused across calls to pullBuffer if number of requested bytes is
// smaller than the number of bytes allocated in the buffer.
views []*buffer.View views []*buffer.View
// iovecs are initialized with base pointers/len of the corresponding
// entries in the views defined above, except when GSO is enabled
// (skipsVnetHdr) then the first iovec points to a buffer for the vnet header
// which is stripped before the views are passed up the stack for further
// processing.
iovecs []unix.Iovec `state:"nosave"` iovecs []unix.Iovec `state:"nosave"`
// sizes is an array of buffer sizes for the underlying views. sizes is
// immutable.
sizes []int
// pulledIndex is the index of the last []byte buffer pulled from the
// underlying buffer storage during a call to pullBuffers. It is -1
// if no buffer is pulled.
pulledIndex int
} }
func newIovecBuffer(sizes []int) *iovecBuffer { func newIovecBuffer(mtu uint32) *iovecBuffer {
b := &iovecBuffer{ b := &iovecBuffer{
views: make([]*buffer.View, len(sizes)), mtu: int(mtu),
iovecs: make([]unix.Iovec, len(sizes)), views: make([]*buffer.View, 2),
sizes: sizes, iovecs: make([]unix.Iovec, 2),
} }
return b return b
} }
func (b *iovecBuffer) nextIovecs() []unix.Iovec { func (b *iovecBuffer) nextIovecs() []unix.Iovec {
for i := range b.views { if b.views[0] == nil {
if b.views[i] != nil { b.views[0] = buffer.NewViewSize(4)
break b.iovecs[0] = unix.Iovec{Base: b.views[0].BasePtr()}
b.iovecs[0].SetLen(4)
} }
v := buffer.NewViewSize(b.sizes[i]) if b.views[1] == nil {
b.views[i] = v b.views[1] = buffer.NewViewSize(b.mtu)
b.iovecs[i] = unix.Iovec{Base: v.BasePtr()} b.iovecs[1] = unix.Iovec{Base: b.views[1].BasePtr()}
b.iovecs[i].SetLen(v.Size()) b.iovecs[1].SetLen(b.mtu)
} }
return b.iovecs return b.iovecs
} }
@@ -80,25 +60,13 @@ func (b *iovecBuffer) nextIovecs() []unix.Iovec {
// of b.buffer's storage must be reallocated during the next call to // of b.buffer's storage must be reallocated during the next call to
// nextIovecs. // nextIovecs.
func (b *iovecBuffer) pullBuffer(n int) buffer.Buffer { func (b *iovecBuffer) pullBuffer(n int) buffer.Buffer {
var views []*buffer.View
c := 0
// Remove the used views from the buffer.
for i, v := range b.views {
c += v.Size()
if c >= n {
b.views[i].CapLength(v.Size() - (c - n))
views = append(views, b.views[:i+1]...)
break
}
}
for i := range views {
b.views[i] = nil
}
pulled := buffer.Buffer{} pulled := buffer.Buffer{}
for _, v := range views { pulled.Append(b.views[0])
pulled.Append(v) pulled.Append(b.views[1])
}
pulled.Truncate(int64(n)) pulled.Truncate(int64(n))
pulled.TrimFront(4)
b.views[0] = nil
b.views[1] = nil
return pulled return pulled
} }
@@ -147,7 +115,12 @@ func newRecvMMsgDispatcher(fd int, e *endpoint, opts *Options) (linkDispatcher,
if err != nil { if err != nil {
return nil, err return nil, err
} }
batchSize := int((512*1024)/(opts.MTU)) + 1 var batchSize int
if opts.MTU < 49152 {
batchSize = int((512*1024)/(opts.MTU)) + 1
} else {
batchSize = 1
}
d := &recvMMsgDispatcher{ d := &recvMMsgDispatcher{
StopFD: stopFD, StopFD: stopFD,
fd: fd, fd: fd,
@@ -155,9 +128,8 @@ func newRecvMMsgDispatcher(fd int, e *endpoint, opts *Options) (linkDispatcher,
bufs: make([]*iovecBuffer, batchSize), bufs: make([]*iovecBuffer, batchSize),
msgHdrs: make([]rawfile.MsgHdrX, batchSize), msgHdrs: make([]rawfile.MsgHdrX, batchSize),
} }
bufConfig := []int{4, int(opts.MTU)}
for i := range d.bufs { for i := range d.bufs {
d.bufs[i] = newIovecBuffer(bufConfig) d.bufs[i] = newIovecBuffer(opts.MTU)
} }
d.gro.Init(false) d.gro.Init(false)
d.mgr = newProcessorManager(opts, e) d.mgr = newProcessorManager(opts, e)
@@ -178,12 +150,11 @@ func (d *recvMMsgDispatcher) release() {
func (d *recvMMsgDispatcher) dispatch() (bool, tcpip.Error) { func (d *recvMMsgDispatcher) dispatch() (bool, tcpip.Error) {
// Fill message headers. // Fill message headers.
for k := range d.msgHdrs { for k := range d.msgHdrs {
if d.msgHdrs[k].Msg.Iovlen > 0 {
break
}
iovecs := d.bufs[k].nextIovecs() iovecs := d.bufs[k].nextIovecs()
iovLen := len(iovecs) iovLen := len(iovecs)
d.msgHdrs[k].DataLen = 0 // Cannot clear only the length field. Older versions of the darwin kernel will check whether other data is empty.
// https://github.com/Darm64/XNU/blob/xnu-2782.40.9/bsd/kern/uipc_syscalls.c#L2026-L2048
d.msgHdrs[k] = rawfile.MsgHdrX{}
d.msgHdrs[k].Msg.Iov = &iovecs[0] d.msgHdrs[k].Msg.Iov = &iovecs[0]
d.msgHdrs[k].Msg.SetIovlen(iovLen) d.msgHdrs[k].Msg.SetIovlen(iovLen)
} }
@@ -209,7 +180,6 @@ func (d *recvMMsgDispatcher) dispatch() (bool, tcpip.Error) {
for k := 0; k < nMsgs; k++ { for k := 0; k < nMsgs; k++ {
n := int(d.msgHdrs[k].DataLen) n := int(d.msgHdrs[k].DataLen)
payload := d.bufs[k].pullBuffer(n) payload := d.bufs[k].pullBuffer(n)
payload.TrimFront(4)
pkt := stack.NewPacketBuffer(stack.PacketBufferOptions{ pkt := stack.NewPacketBuffer(stack.PacketBufferOptions{
Payload: payload, Payload: payload,
}) })

View File

@@ -111,14 +111,14 @@ func New(options Options) (Tun, error) {
unix.Close(tunFd) unix.Close(tunFd)
return nil, err return nil, err
} }
err = configure(tunFd, batchSize) err = configure(tunFd, options.MTU, batchSize)
if err != nil { if err != nil {
unix.Close(tunFd) unix.Close(tunFd)
return nil, err return nil, err
} }
} else { } else {
tunFd = options.FileDescriptor tunFd = options.FileDescriptor
err := configure(tunFd, batchSize) err := configure(tunFd, options.MTU, batchSize)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -332,16 +332,18 @@ func create(tunFd int, ifIndex int, name string, options Options) error {
return nil return nil
} }
func configure(tunFd int, batchSize int) error { func configure(tunFd int, tunMTU uint32, batchSize int) error {
err := unix.SetNonblock(tunFd, true) err := unix.SetNonblock(tunFd, true)
if err != nil { if err != nil {
return os.NewSyscallError("SetNonblock", err) return os.NewSyscallError("SetNonblock", err)
} }
if tunMTU < 49152 {
const UTUN_OPT_MAX_PENDING_PACKETS = 16 const UTUN_OPT_MAX_PENDING_PACKETS = 16
err = unix.SetsockoptInt(tunFd, 2, UTUN_OPT_MAX_PENDING_PACKETS, batchSize) err = unix.SetsockoptInt(tunFd, 2, UTUN_OPT_MAX_PENDING_PACKETS, batchSize)
if err != nil { if err != nil {
return os.NewSyscallError("SetsockoptInt UTUN_OPT_MAX_PENDING_PACKETS", err) return os.NewSyscallError("SetsockoptInt UTUN_OPT_MAX_PENDING_PACKETS", err)
} }
}
return nil return nil
} }
@@ -352,12 +354,19 @@ func (t *NativeTun) BatchSize() int {
func (t *NativeTun) BatchRead() ([]*buf.Buffer, error) { func (t *NativeTun) BatchRead() ([]*buf.Buffer, error) {
for i := 0; i < t.batchSize; i++ { for i := 0; i < t.batchSize; i++ {
iovecs := t.iovecs[i].nextIovecs() iovecs := t.iovecs[i].nextIovecs()
t.msgHdrs[i].DataLen = 0 // Cannot clear only the length field. Older versions of the darwin kernel will check whether other data is empty.
// https://github.com/Darm64/XNU/blob/xnu-2782.40.9/bsd/kern/uipc_syscalls.c#L2026-L2048
t.msgHdrs[i] = rawfile.MsgHdrX{}
t.msgHdrs[i].Msg.Iov = &iovecs[0] t.msgHdrs[i].Msg.Iov = &iovecs[0]
t.msgHdrs[i].Msg.Iovlen = 2 t.msgHdrs[i].Msg.Iovlen = 2
} }
n, errno := rawfile.BlockingRecvMMsgUntilStopped(t.stopFd.ReadFD, t.tunFd, t.msgHdrs) n, errno := rawfile.BlockingRecvMMsgUntilStopped(t.stopFd.ReadFD, t.tunFd, t.msgHdrs)
if errno != 0 { if errno != 0 {
for k := 0; k < n; k++ {
t.iovecs[k].buffer.Release()
t.iovecs[k].buffer = nil
}
t.buffers = t.buffers[:0]
return nil, errno return nil, errno
} }
if n < 1 { if n < 1 {

View File

@@ -15,11 +15,14 @@ func (t *NativeTun) NewEndpoint() (stack.LinkEndpoint, stack.NICOptions, error)
FDs: []int{t.tunFd}, FDs: []int{t.tunFd},
MTU: t.options.MTU, MTU: t.options.MTU,
RXChecksumOffload: true, RXChecksumOffload: true,
PacketDispatchMode: fdbased.RecvMMsg,
}) })
if err != nil { if err != nil {
return nil, stack.NICOptions{}, err return nil, stack.NICOptions{}, err
} }
return ep, stack.NICOptions{ var nicOptions stack.NICOptions
QDisc: fifo.New(ep, 1, 1000), if t.options.MTU < 49152 {
}, nil nicOptions.QDisc = fifo.New(ep, 1, 1000)
}
return ep, nicOptions, nil
} }