batch: add default timer for all edges

This commit is contained in:
Auztin Zhai
2024-05-26 06:02:20 -04:00
parent 9877e65631
commit a5bab194b9
2 changed files with 57 additions and 2 deletions

View File

@@ -5,14 +5,18 @@ import (
"fmt"
"math/rand"
"net"
"net/http"
_ "net/http/pprof"
"os"
"strconv"
"strings"
"sync"
"time"
armlog "github.com/jumboframes/armorigo/log"
"github.com/jumboframes/armorigo/sigaction"
"github.com/singchia/frontier/api/dataplane/v1/edge"
"github.com/singchia/go-timer/v2"
"github.com/spf13/pflag"
)
@@ -28,9 +32,51 @@ func main() {
count := pflag.Int("count", 10000, "edges to dial")
topic := pflag.String("topic", "test", "topic to specific")
nseconds := pflag.Int("nseconds", 10, "publish message every n seconds for every edge")
sourceIPs := pflag.String("source_ips", "", "source ips to dial, if your ")
pprof := pflag.String("pprof", "", "pprof addr to listen")
pflag.Parse()
if *pprof != "" {
go func() {
http.ListenAndServe(*pprof, nil)
}()
}
ips := []string{}
idx := 0
if *sourceIPs != "" {
ips = strings.Split(*sourceIPs, ",")
idx = 0
}
dialer := func() (net.Conn, error) {
if len(ips) != 0 {
for retry := 0; retry < 2; retry++ {
localAddr := &net.TCPAddr{
IP: net.ParseIP(ips[idx]),
}
dialer := &net.Dialer{
LocalAddr: localAddr,
Timeout: 5 * time.Second,
}
conn, err := dialer.Dial(*network, *address)
if err == nil {
return conn, nil
}
if strings.Contains(err.Error(), "cannot assign requested address") ||
strings.Contains(err.Error(), "address already in use") {
fmt.Println("source ip:", localAddr.IP.String(), localAddr.Port, err)
idx += 1
if idx >= len(ips) {
return nil, err
}
continue
}
fmt.Println("source ip:", localAddr.IP.String(), localAddr.Port)
return nil, err
}
}
return net.Dial(*network, *address)
}
// log
@@ -45,6 +91,7 @@ func main() {
wg := sync.WaitGroup{}
wg.Add(*count)
tmr := timer.NewTimer()
for i := 0; i < *count; i++ {
go func(i int) {
defer wg.Done()
@@ -53,7 +100,9 @@ func main() {
time.Sleep(time.Second * time.Duration(random))
// new edge connection
cli, err := edge.NewEdge(dialer,
edge.OptionEdgeLog(armlog.DefaultLog))
edge.OptionEdgeLog(armlog.DefaultLog),
edge.OptionEdgeTimer(tmr),
edge.OptionServiceBufferSize(128, 128))
if err != nil {
armlog.Info("new edge err:", err)
return
@@ -67,6 +116,7 @@ func main() {
msg := cli.NewMessage([]byte(str))
err := cli.Publish(context.TODO(), *topic, msg)
if err != nil {
fmt.Println("publish err", err)
break
}
time.Sleep(time.Duration(*nseconds) * time.Second)

View File

@@ -5,6 +5,8 @@ import (
"io"
"log"
"net"
"net/http"
_ "net/http/pprof"
"os"
"strconv"
"sync"
@@ -33,6 +35,9 @@ func main() {
printstats := pflag.Bool("printstats", false, "whether print topic stats")
pflag.Parse()
go func() {
http.ListenAndServe("0.0.0.0:6062", nil)
}()
dialer := func() (net.Conn, error) {
return net.Dial(*network, *address)
}