Update On Tue Aug 12 20:40:50 CEST 2025

This commit is contained in:
github-action[bot]
2025-08-12 20:40:51 +02:00
parent 7a3097a4a2
commit 94f030fce0
58 changed files with 1108 additions and 854 deletions

View File

@@ -17,12 +17,10 @@ package testtool
import (
"bytes"
"errors"
"io"
mrand "math/rand"
"net"
"runtime"
"sync"
"sync/atomic"
"time"
"github.com/enfein/mieru/v3/pkg/common"
@@ -32,12 +30,17 @@ import (
func BufPipe() (net.Conn, net.Conn) {
var buf1, buf2 bytes.Buffer
var lock1, lock2 sync.Mutex
cond1 := sync.NewCond(&lock1) // endpoint 1 has data to read
cond2 := sync.NewCond(&lock2) // endpoint 2 has data to read
ep1 := &ioEndpoint{
direction: forward,
buf1: &buf1,
buf2: &buf2,
lock1: &lock1,
lock2: &lock2,
cond1: cond1,
cond2: cond2,
}
ep2 := &ioEndpoint{
direction: backward,
@@ -45,7 +48,11 @@ func BufPipe() (net.Conn, net.Conn) {
buf2: &buf2,
lock1: &lock1,
lock2: &lock2,
cond1: cond1,
cond2: cond2,
}
ep1.peer = ep2
ep2.peer = ep1
return ep1, ep2
}
@@ -62,56 +69,83 @@ type ioEndpoint struct {
buf2 *bytes.Buffer // backward writes to here
lock1 *sync.Mutex // lock of buf1
lock2 *sync.Mutex // lock of buf2
closed bool
cond1 *sync.Cond
cond2 *sync.Cond
closed atomic.Bool
peer *ioEndpoint
}
var _ net.Conn = &ioEndpoint{}
func (e *ioEndpoint) Read(b []byte) (n int, err error) {
if e.closed {
if e.closed.Load() {
return 0, io.EOF
}
var buffer *bytes.Buffer
var lock *sync.Mutex
var cond *sync.Cond
if e.direction == forward {
e.lock2.Lock()
n, err = e.buf2.Read(b)
e.lock2.Unlock()
buffer = e.buf2
lock = e.lock2
cond = e.cond2
} else {
e.lock1.Lock()
n, err = e.buf1.Read(b)
e.lock1.Unlock()
buffer = e.buf1
lock = e.lock1
cond = e.cond1
}
if errors.Is(err, io.EOF) {
// io.ReadFull() with partial result will not fail.
err = nil
action := mrand.Intn(2)
if action == 0 {
// Allow the writer to catch up.
runtime.Gosched()
} else {
time.Sleep(time.Microsecond)
lock.Lock()
defer lock.Unlock()
for buffer.Len() == 0 {
if e.closed.Load() || e.peer.closed.Load() {
return 0, io.EOF
}
cond.Wait()
}
return
return buffer.Read(b)
}
func (e *ioEndpoint) Write(b []byte) (n int, err error) {
if e.closed {
if e.closed.Load() {
return 0, io.ErrClosedPipe
}
var buffer *bytes.Buffer
var lock *sync.Mutex
var cond *sync.Cond
if e.direction == forward {
e.lock1.Lock()
n, err = e.buf1.Write(b)
e.lock1.Unlock()
buffer = e.buf1
lock = e.lock1
cond = e.cond1
} else {
e.lock2.Lock()
n, err = e.buf2.Write(b)
e.lock2.Unlock()
buffer = e.buf2
lock = e.lock2
cond = e.cond2
}
lock.Lock()
defer lock.Unlock()
if e.peer.closed.Load() {
return 0, io.ErrClosedPipe
}
n, err = buffer.Write(b)
cond.Signal()
return
}
func (e *ioEndpoint) Close() error {
e.closed = true
if e.closed.Swap(true) {
return nil
}
e.cond1.Broadcast()
e.cond2.Broadcast()
return nil
}