mirror of
https://git.zx2c4.com/wireguard-go
synced 2025-10-08 10:00:19 +08:00
Improved receive.go
- Fixed configuration listen-port semantics - Improved receive.go code for updating listen port - Updated under load detection, how follows the kernel space implementation - Fixed trie bug accidentally introduced in last commit - Added interface name to log (format still subject to change) - Can now configure the logging level using the LOG_LEVEL variable - Begin porting netsh.sh tests - A number of smaller changes
This commit is contained in:
207
src/receive.go
207
src/receive.go
@@ -72,43 +72,6 @@ func (device *Device) addToHandshakeQueue(
|
||||
}
|
||||
}
|
||||
|
||||
/* Routine determining the busy state of the interface
|
||||
*
|
||||
* TODO: Under load for some time
|
||||
*/
|
||||
func (device *Device) RoutineBusyMonitor() {
|
||||
samples := 0
|
||||
interval := time.Second
|
||||
for timer := time.NewTimer(interval); ; {
|
||||
|
||||
select {
|
||||
case <-device.signal.stop:
|
||||
return
|
||||
case <-timer.C:
|
||||
}
|
||||
|
||||
// compute busy heuristic
|
||||
|
||||
if len(device.queue.handshake) > QueueHandshakeBusySize {
|
||||
samples += 1
|
||||
} else if samples > 0 {
|
||||
samples -= 1
|
||||
}
|
||||
samples %= 30
|
||||
busy := samples > 5
|
||||
|
||||
// update busy state
|
||||
|
||||
if busy {
|
||||
atomic.StoreInt32(&device.underLoad, AtomicTrue)
|
||||
} else {
|
||||
atomic.StoreInt32(&device.underLoad, AtomicFalse)
|
||||
}
|
||||
|
||||
timer.Reset(interval)
|
||||
}
|
||||
}
|
||||
|
||||
func (device *Device) RoutineReceiveIncomming() {
|
||||
|
||||
logDebug := device.log.Debug
|
||||
@@ -118,117 +81,121 @@ func (device *Device) RoutineReceiveIncomming() {
|
||||
|
||||
// wait for new conn
|
||||
|
||||
var conn *net.UDPConn
|
||||
logDebug.Println("Waiting for udp socket")
|
||||
|
||||
select {
|
||||
case <-device.signal.newUDPConn:
|
||||
device.net.mutex.RLock()
|
||||
conn = device.net.conn
|
||||
device.net.mutex.RUnlock()
|
||||
|
||||
case <-device.signal.stop:
|
||||
return
|
||||
}
|
||||
|
||||
if conn == nil {
|
||||
continue
|
||||
}
|
||||
case <-device.signal.newUDPConn:
|
||||
|
||||
// receive datagrams until closed
|
||||
// fetch connection
|
||||
|
||||
buffer := device.GetMessageBuffer()
|
||||
|
||||
for {
|
||||
|
||||
// read next datagram
|
||||
|
||||
size, raddr, err := conn.ReadFromUDP(buffer[:]) // TODO: This is broken
|
||||
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
|
||||
if size < MinMessageSize {
|
||||
device.net.mutex.RLock()
|
||||
conn := device.net.conn
|
||||
device.net.mutex.RUnlock()
|
||||
if conn == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// check size of packet
|
||||
logDebug.Println("Listening for inbound packets")
|
||||
|
||||
packet := buffer[:size]
|
||||
msgType := binary.LittleEndian.Uint32(packet[:4])
|
||||
// receive datagrams until conn is closed
|
||||
|
||||
var okay bool
|
||||
buffer := device.GetMessageBuffer()
|
||||
|
||||
switch msgType {
|
||||
for {
|
||||
|
||||
// check if transport
|
||||
// read next datagram
|
||||
|
||||
case MessageTransportType:
|
||||
size, raddr, err := conn.ReadFromUDP(buffer[:]) // Blocks sometimes
|
||||
|
||||
// check size
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
|
||||
if len(packet) < MessageTransportType {
|
||||
if size < MinMessageSize {
|
||||
continue
|
||||
}
|
||||
|
||||
// lookup key pair
|
||||
// check size of packet
|
||||
|
||||
receiver := binary.LittleEndian.Uint32(
|
||||
packet[MessageTransportOffsetReceiver:MessageTransportOffsetCounter],
|
||||
)
|
||||
value := device.indices.Lookup(receiver)
|
||||
keyPair := value.keyPair
|
||||
if keyPair == nil {
|
||||
continue
|
||||
}
|
||||
packet := buffer[:size]
|
||||
msgType := binary.LittleEndian.Uint32(packet[:4])
|
||||
|
||||
// check key-pair expiry
|
||||
var okay bool
|
||||
|
||||
if keyPair.created.Add(RejectAfterTime).Before(time.Now()) {
|
||||
continue
|
||||
}
|
||||
switch msgType {
|
||||
|
||||
// create work element
|
||||
// check if transport
|
||||
|
||||
peer := value.peer
|
||||
elem := &QueueInboundElement{
|
||||
packet: packet,
|
||||
buffer: buffer,
|
||||
keyPair: keyPair,
|
||||
dropped: AtomicFalse,
|
||||
}
|
||||
elem.mutex.Lock()
|
||||
case MessageTransportType:
|
||||
|
||||
// add to decryption queues
|
||||
// check size
|
||||
|
||||
device.addToInboundQueue(device.queue.decryption, elem)
|
||||
device.addToInboundQueue(peer.queue.inbound, elem)
|
||||
buffer = nil
|
||||
continue
|
||||
if len(packet) < MessageTransportType {
|
||||
continue
|
||||
}
|
||||
|
||||
// otherwise it is a handshake related packet
|
||||
// lookup key pair
|
||||
|
||||
case MessageInitiationType:
|
||||
okay = len(packet) == MessageInitiationSize
|
||||
receiver := binary.LittleEndian.Uint32(
|
||||
packet[MessageTransportOffsetReceiver:MessageTransportOffsetCounter],
|
||||
)
|
||||
value := device.indices.Lookup(receiver)
|
||||
keyPair := value.keyPair
|
||||
if keyPair == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
case MessageResponseType:
|
||||
okay = len(packet) == MessageResponseSize
|
||||
// check key-pair expiry
|
||||
|
||||
case MessageCookieReplyType:
|
||||
okay = len(packet) == MessageCookieReplySize
|
||||
}
|
||||
if keyPair.created.Add(RejectAfterTime).Before(time.Now()) {
|
||||
continue
|
||||
}
|
||||
|
||||
if okay {
|
||||
device.addToHandshakeQueue(
|
||||
device.queue.handshake,
|
||||
QueueHandshakeElement{
|
||||
msgType: msgType,
|
||||
buffer: buffer,
|
||||
// create work element
|
||||
|
||||
peer := value.peer
|
||||
elem := &QueueInboundElement{
|
||||
packet: packet,
|
||||
source: raddr,
|
||||
},
|
||||
)
|
||||
buffer = device.GetMessageBuffer()
|
||||
buffer: buffer,
|
||||
keyPair: keyPair,
|
||||
dropped: AtomicFalse,
|
||||
}
|
||||
elem.mutex.Lock()
|
||||
|
||||
// add to decryption queues
|
||||
|
||||
device.addToInboundQueue(device.queue.decryption, elem)
|
||||
device.addToInboundQueue(peer.queue.inbound, elem)
|
||||
buffer = device.GetMessageBuffer()
|
||||
continue
|
||||
|
||||
// otherwise it is a handshake related packet
|
||||
|
||||
case MessageInitiationType:
|
||||
okay = len(packet) == MessageInitiationSize
|
||||
|
||||
case MessageResponseType:
|
||||
okay = len(packet) == MessageResponseSize
|
||||
|
||||
case MessageCookieReplyType:
|
||||
okay = len(packet) == MessageCookieReplySize
|
||||
}
|
||||
|
||||
if okay {
|
||||
device.addToHandshakeQueue(
|
||||
device.queue.handshake,
|
||||
QueueHandshakeElement{
|
||||
msgType: msgType,
|
||||
buffer: buffer,
|
||||
packet: packet,
|
||||
source: raddr,
|
||||
},
|
||||
)
|
||||
buffer = device.GetMessageBuffer()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -326,10 +293,11 @@ func (device *Device) RoutineHandshake() {
|
||||
return
|
||||
}
|
||||
|
||||
busy := atomic.LoadInt32(&device.underLoad) == AtomicTrue
|
||||
|
||||
if busy {
|
||||
if device.IsUnderLoad() {
|
||||
if !device.mac.CheckMAC2(elem.packet, elem.source) {
|
||||
|
||||
// construct cookie reply
|
||||
|
||||
sender := binary.LittleEndian.Uint32(elem.packet[4:8]) // "sender" always follows "type"
|
||||
reply, err := device.CreateMessageCookieReply(elem.packet, sender, elem.source)
|
||||
if err != nil {
|
||||
@@ -347,6 +315,7 @@ func (device *Device) RoutineHandshake() {
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
if !device.ratelimiter.Allow(elem.source.IP) {
|
||||
continue
|
||||
}
|
||||
@@ -577,7 +546,7 @@ func (peer *Peer) RoutineSequentialReceiver() {
|
||||
// write to tun
|
||||
|
||||
atomic.AddUint64(&peer.stats.rxBytes, uint64(len(elem.packet)))
|
||||
_, err := device.tun.Write(elem.packet)
|
||||
_, err := device.tun.device.Write(elem.packet)
|
||||
device.PutMessageBuffer(elem.buffer)
|
||||
if err != nil {
|
||||
logError.Println("Failed to write packet to TUN device:", err)
|
||||
|
Reference in New Issue
Block a user