Improve test parallelization w/ nodes > 8

Parallelizable tests (`ginkgo -r -p .`) were failing on my 20-core
(`-nodes=20`) Mac Studio. We narrowed this down to two causes:

1. The servers sometimes took longer than the hard-coded 3-second delay
   to become ready to answer queries.
2. The blocklist was downloaded asynchronously, and sometimes wasn't
   ready by the time the queries were run.

To address these, we did the following:

1. Rather than hard-code a 3-second delay, we modified the server to
   signal that it's ready to answer queries (by printing "Ready to
   answer queries" to the log). We now wait for that string to appear
   before we begin testing the server. IMHO, this is a much better
   solution than a hard-coded delay.
2. The initial download of the blocklist now occurs synchronously;
   subsequent downloads occur asynchronously.

Drive-bys:
- If the server can't bind to any address at all, it exits.
- Refactored the blocklist code; the nested if-then-else blocks were
  too deep.

Fixes:
```
  Expected
      <string>: 43.134.66.67

  to match regular expression
      <string>: \A52.0.56.137\n\z
  In [It] at: /Users/cunnie/workspace/sslip.io/src/sslip.io-dns-server/integration_test.go:421
```
This commit is contained in:
Brian Cunnie
2022-08-07 07:31:22 -07:00
parent 56924923d3
commit 369ac1140d
3 changed files with 39 additions and 41 deletions

View File

@@ -29,8 +29,8 @@ var _ = BeforeSuite(func() {
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
// takes 0.455s to start up on macOS Big Sur 3.7 GHz Quad Core 22-nm Xeon E5-1620v2 processor (2013 Mac Pro) // takes 0.455s to start up on macOS Big Sur 3.7 GHz Quad Core 22-nm Xeon E5-1620v2 processor (2013 Mac Pro)
// takes 1.312s to start up on macOS Big Sur 2.0GHz quad-core 10th-generation Intel Core i5 processor (2020 13" MacBook Pro) // takes 1.312s to start up on macOS Big Sur 2.0GHz quad-core 10th-generation Intel Core i5 processor (2020 13" MacBook Pro)
// round up to 3 seconds to account for slow container-on-a-VM-with-shared-core // 10 seconds should be long enough for slow container-on-a-VM-with-shared-core
time.Sleep(3 * time.Second) // takes 0.455s to start up on macOS Big Sur 4-core Xeon Eventually(serverSession.Err, 10).Should(Say("Ready to answer queries"))
}) })
var _ = AfterSuite(func() { var _ = AfterSuite(func() {

View File

@@ -29,9 +29,7 @@ func main() {
// common err hierarchy: net.OpError → os.SyscallError → syscall.Errno // common err hierarchy: net.OpError → os.SyscallError → syscall.Errno
switch { switch {
case err == nil: case err == nil:
log.Printf("Successfully bound to all interfaces, port %d.\n", *bindPort) log.Printf("Successfully bound to all IPs, port %d.\n", *bindPort)
wg.Add(1)
readFrom(conn, &wg, x)
case isErrorPermissionsError(err): case isErrorPermissionsError(err):
log.Printf("Try invoking me with `sudo` because I don't have permission to bind to port %d.\n", *bindPort) log.Printf("Try invoking me with `sudo` because I don't have permission to bind to port %d.\n", *bindPort)
log.Fatal(err.Error()) log.Fatal(err.Error())
@@ -58,15 +56,19 @@ func main() {
go readFrom(conn, &wg, x) go readFrom(conn, &wg, x)
} }
} }
if len(boundIPsPorts) > 0 { if len(boundIPsPorts) == 0 {
log.Printf(`I bound to the following: "%s"`, strings.Join(boundIPsPorts, `", "`)) log.Fatalf("I couldn't bind to any IPs on port %d, so I'm exiting", *bindPort)
} }
log.Printf(`I bound to the following IPs: "%s"`, strings.Join(boundIPsPorts, `", "`))
if len(unboundIPs) > 0 { if len(unboundIPs) > 0 {
log.Printf(`I couldn't bind to the following IPs: "%s"`, strings.Join(unboundIPs, `", "`)) log.Printf(`I couldn't bind to the following IPs: "%s"`, strings.Join(unboundIPs, `", "`))
} }
default: default:
log.Fatal(err.Error()) log.Fatal(err.Error())
} }
log.Printf("Ready to answer queries")
wg.Add(1)
readFrom(conn, &wg, x)
wg.Wait() wg.Wait()
} }

View File

@@ -240,26 +240,20 @@ func NewXip(etcdEndpoint, blocklistURL string) (x *Xip, logmessages []string) {
// determine whether to use a local key-value store instead // determine whether to use a local key-value store instead
x.Etcd, err = clientv3New(etcdEndpoint) x.Etcd, err = clientv3New(etcdEndpoint)
if err != nil { if err != nil {
logmessages = append(logmessages, fmt.Sprintf("failed to connect to etcd at %s; using local key-value store instead: %s", etcdEndpoint, err.Error())) logmessages = append(logmessages, fmt.Sprintf("failed to connect to etcd at %s, using local key-value store instead: %s", etcdEndpoint, err.Error()))
} else { } else {
logmessages = append(logmessages, fmt.Sprintf("Successfully connected to etcd at %s", etcdEndpoint)) logmessages = append(logmessages, fmt.Sprintf("Successfully connected to etcd at %s", etcdEndpoint))
} }
// don't `defer etcdCli.Close()`: "The Client has internal state (watchers and leases), so // don't `defer etcdCli.Close()`: "The Client has internal state (watchers and leases), so
// Clients should be reused instead of created as needed" // Clients should be reused instead of created as needed"
// Download the blocklist
logmessages = append(logmessages, x.downloadBlockList(blocklistURL))
// re-download the blocklist every hour so I don't need to restart servers after updating blocklist // re-download the blocklist every hour so I don't need to restart servers after updating blocklist
go func() { go func() {
for { for {
blocklistStrings, blocklistCDIRs, err := readBlocklist(blocklistURL)
if err != nil {
logmessages = append(logmessages, fmt.Sprintf("couldn't get blocklist at %s: %s", blocklistURL, err.Error()))
} else {
logmessages = append(logmessages, fmt.Sprintf("Successfully downloaded blocklist from %s: %v, %v", blocklistURL, blocklistStrings, blocklistCDIRs))
x.BlocklistStrings = blocklistStrings
x.BlocklistCDIRs = blocklistCDIRs
x.BlocklistUpdated = time.Now()
}
time.Sleep(1 * time.Hour) time.Sleep(1 * time.Hour)
_ = x.downloadBlockList(blocklistURL) // uh-oh, I lose the log message.
} }
}() }()
@@ -295,6 +289,7 @@ func NewXip(etcdEndpoint, blocklistURL string) (x *Xip, logmessages []string) {
// QueryResponse are not as hard. // QueryResponse are not as hard.
// //
// Examples of log strings returned: // Examples of log strings returned:
//
// 78.46.204.247.33654: TypeA 127-0-0-1.sslip.io ? 127.0.0.1 // 78.46.204.247.33654: TypeA 127-0-0-1.sslip.io ? 127.0.0.1
// 78.46.204.247.33654: TypeA non-existent.sslip.io ? nil, SOA // 78.46.204.247.33654: TypeA non-existent.sslip.io ? nil, SOA
// 78.46.204.247.33654: TypeNS www.example.com ? NS // 78.46.204.247.33654: TypeNS www.example.com ? NS
@@ -374,7 +369,8 @@ func (x *Xip) processQuestion(q dnsmessage.Question, srcAddr net.IP) (response R
RCode: dnsmessage.RCodeSuccess, // assume success, may be replaced later RCode: dnsmessage.RCodeSuccess, // assume success, may be replaced later
}, },
} }
if IsAcmeChallenge(q.Name.String()) && !x.blocklist(q.Name.String()) { // thanks @NormanR if IsAcmeChallenge(q.Name.String()) && !x.blocklist(q.Name.String()) {
// thanks, @NormanR
// delegate everything to its stripped (remove "_acme-challenge.") address, e.g. // delegate everything to its stripped (remove "_acme-challenge.") address, e.g.
// dig _acme-challenge.127-0-0-1.sslip.io mx → NS 127-0-0-1.sslip.io // dig _acme-challenge.127-0-0-1.sslip.io mx → NS 127-0-0-1.sslip.io
response.Header.Authoritative = false // we're delegating, so we're not authoritative response.Header.Authoritative = false // we're delegating, so we're not authoritative
@@ -610,7 +606,7 @@ func (x *Xip) processQuestion(q dnsmessage.Question, srcAddr net.IP) (response R
} }
} }
// NSResponse sets the Answers/Authorities depending whether we're delegating or authoritative // NSResponse sets the Answers/Authorities depending upon whether we're delegating or authoritative
// (whether it's an "_acme-challenge." domain or not). Either way, it supplies the Additionals // (whether it's an "_acme-challenge." domain or not). Either way, it supplies the Additionals
// (IP addresses of the nameservers). // (IP addresses of the nameservers).
func (x *Xip) NSResponse(name dnsmessage.Name, response Response, logMessage string) (Response, string, error) { func (x *Xip) NSResponse(name dnsmessage.Name, response Response, logMessage string) (Response, string, error) {
@@ -1071,29 +1067,29 @@ func (a Metrics) MostlyEquals(b Metrics) bool {
return false return false
} }
// readBlocklist downloads the blocklist of domains & CIDRs that are forbidden func (x *Xip) downloadBlockList(blocklistURL string) string {
// because they're used for phishing (e.g. "raiffeisen")
func readBlocklist(blocklistURL string) (blocklistStrings []string, blocklistCIDRs []net.IPNet, err error) {
resp, err := http.Get(blocklistURL) resp, err := http.Get(blocklistURL)
if err != nil { if err != nil {
log.Println(fmt.Errorf(`failed to download blocklist "%s": %w`, blocklistURL, err)) return fmt.Sprintf(`failed to download blocklist "%s": %s`, blocklistURL, err.Error())
} else { }
//noinspection GoUnhandledErrorResult //noinspection GoUnhandledErrorResult
defer resp.Body.Close() defer resp.Body.Close()
if resp.StatusCode > 299 { if resp.StatusCode > 299 {
log.Printf(`failed to download blocklist "%s", HTTP status: "%d"`, blocklistURL, resp.StatusCode) return fmt.Sprintf(`failed to download blocklist "%s", HTTP status: "%d"`, blocklistURL, resp.StatusCode)
} else { }
blocklistStrings, blocklistCIDRs, err = ReadBlocklist(resp.Body) blocklistStrings, blocklistCIDRs, err := ReadBlocklist(resp.Body)
if err != nil { if err != nil {
log.Println(fmt.Errorf(`failed to parse blocklist "%s": %w`, blocklistURL, err)) return fmt.Sprintf(`failed to parse blocklist "%s": %s`, blocklistURL, err.Error())
} }
} x.BlocklistStrings = blocklistStrings
} x.BlocklistCDIRs = blocklistCIDRs
return blocklistStrings, blocklistCIDRs, err x.BlocklistUpdated = time.Now()
return fmt.Sprintf("Successfully downloaded blocklist from %s: %v, %v", blocklistURL, x.BlocklistStrings, x.BlocklistCDIRs)
} }
// ReadBlocklist "sanitizes" the block list, removing comments, invalid characters // ReadBlocklist "sanitizes" the block list, removing comments, invalid characters
// and lowercasing the names to be blocked // and lowercasing the names to be blocked.
// public to make testing easier
func ReadBlocklist(blocklist io.Reader) (stringBlocklists []string, cidrBlocklists []net.IPNet, err error) { func ReadBlocklist(blocklist io.Reader) (stringBlocklists []string, cidrBlocklists []net.IPNet, err error) {
scanner := bufio.NewScanner(blocklist) scanner := bufio.NewScanner(blocklist)
comments := regexp.MustCompile(`#.*`) comments := regexp.MustCompile(`#.*`)