large test (+goselect)

This commit is contained in:
Juan Batiz-Benet
2015-01-22 22:12:35 -08:00
parent 6924153ade
commit 3b2661f810
4 changed files with 112 additions and 22 deletions

View File

@@ -8,8 +8,3 @@ import (
var soReusePort = syscall.SO_REUSEPORT
var soReuseAddr = syscall.SO_REUSEADDR
func Select(nfd int, r *syscall.FdSet, w *syscall.FdSet, e *syscall.FdSet, timeout *syscall.Timeval) (n int, err error) {
err = syscall.Select(nfd, r, w, e, timeout)
return 0, err
}

View File

@@ -8,7 +8,3 @@ import (
var soReusePort = 15 // this is not defined in unix go pkg.
var soReuseAddr = syscall.SO_REUSEADDR
func Select(nfd int, r *syscall.FdSet, w *syscall.FdSet, e *syscall.FdSet, timeout *syscall.Timeval) (n int, err error) {
return syscall.Select(nfd, r, w, e, timeout)
}

View File

@@ -3,7 +3,6 @@
package reuseport
import (
"fmt"
"net"
"os"
"strconv"
@@ -11,6 +10,7 @@ import (
"time"
sockaddrnet "github.com/jbenet/go-sockaddr/net"
goselect "github.com/jbenet/goselect"
)
const (
@@ -324,18 +324,18 @@ func connect(fd int, ra syscall.Sockaddr, deadline time.Time) error {
}
var err error
var to *syscall.Timeval
var pw syscall.FdSet
FD_SET(uintptr(fd), &pw)
var timeout time.Duration
var pw goselect.FDSet
pw.Set(uintptr(fd))
for {
// wait until the fd is ready to read or write.
if !deadline.IsZero() {
to2 := syscall.NsecToTimeval(deadline.Sub(time.Now()).Nanoseconds())
to = &to2
timeout = deadline.Sub(time.Now())
} else {
timeout = -1
}
if _, err = Select(fd+1, nil, &pw, nil, to); err != nil {
fmt.Println(err)
if err = goselect.Select(fd+1, nil, &pw, nil, timeout); err != nil {
return err
}
@@ -373,8 +373,3 @@ type timeoutError struct{}
func (e *timeoutError) Error() string { return "i/o timeout" }
func (e *timeoutError) Timeout() bool { return true }
func (e *timeoutError) Temporary() bool { return true }
func FD_SET(fd uintptr, p *syscall.FdSet) {
n, k := fd/32, fd%32
p.Bits[n] |= (1 << uint32(k))
}

View File

@@ -3,10 +3,12 @@ package reuseport
import (
"bytes"
"errors"
"fmt"
"io"
"net"
"os"
"strings"
"sync"
"testing"
"time"
)
@@ -262,6 +264,108 @@ func TestStreamListenDialSamePort(t *testing.T) {
}
}
func TestStreamListenDialSamePortStressManyMsgs(t *testing.T) {
testCases := [][]string{
[]string{"tcp", "127.0.0.1:0"},
[]string{"tcp4", "127.0.0.1:0"},
[]string{"tcp6", "[::]:0"},
}
for _, tcase := range testCases {
subestStreamListenDialSamePortStress(t, tcase[0], tcase[1], 2, 100)
}
}
func TestStreamListenDialSamePortStressManyNodes(t *testing.T) {
testCases := [][]string{
[]string{"tcp", "127.0.0.1:0"},
[]string{"tcp4", "127.0.0.1:0"},
[]string{"tcp6", "[::]:0"},
}
for _, tcase := range testCases {
subestStreamListenDialSamePortStress(t, tcase[0], tcase[1], 100, 1)
}
}
func subestStreamListenDialSamePortStress(t *testing.T, network, addr string, nodes int, msgs int) {
t.Logf("testing %s:%s %d nodes %d msgs", network, addr, nodes, msgs)
var ls []net.Listener
for i := 0; i < nodes; i++ {
l, err := Listen(network, addr)
if err != nil {
t.Fatal(err)
}
defer l.Close()
go acceptAndEcho(l)
ls = append(ls, l)
t.Logf("listening %s", l.Addr())
}
// connect them all
var cs []net.Conn
for i := 0; i < nodes; i++ {
for j := 0; j < i; j++ {
if i == j {
continue // cannot do self.
}
ia := ls[i].Addr().String()
ja := ls[j].Addr().String()
c, err := Dial(network, ia, ja)
if err != nil {
t.Fatal(network, ia, ja, err)
}
defer c.Close()
cs = append(cs, c)
t.Logf("dialed %s --> %s", c.LocalAddr(), c.RemoteAddr())
}
}
errs := make(chan error)
send := func(c net.Conn, buf []byte) {
if _, err := c.Write(buf); err != nil {
errs <- err
}
}
recv := func(c net.Conn, buf []byte) {
buf2 := make([]byte, len(buf))
if _, err := c.Read(buf2); err != nil {
errs <- err
}
if !bytes.Equal(buf, buf2) {
errs <- fmt.Errorf("recv failure: %s <--> %s -- %s %s", c.RemoteAddr(), c.LocalAddr(), buf, buf2)
}
}
t.Logf("sending %d msgs per conn", msgs)
go func() {
var wg sync.WaitGroup
for _, c := range cs {
wg.Add(1)
go func(c net.Conn) {
defer wg.Done()
for i := 0; i < msgs; i++ {
msg := []byte(fmt.Sprintf("message %d", i))
send(c, msg)
recv(c, msg)
}
}(c)
}
wg.Wait()
close(errs)
}()
for err := range errs {
if err != nil {
t.Error(err)
}
}
}
func TestPacketListenDialSamePort(t *testing.T) {
any := [][]string{