diff --git a/api/dataplane/v1/service/cluster_service_end.go b/api/dataplane/v1/service/cluster_service_end.go index 0098a5b..c6dad70 100644 --- a/api/dataplane/v1/service/cluster_service_end.go +++ b/api/dataplane/v1/service/cluster_service_end.go @@ -58,8 +58,11 @@ func newclusterServiceEnd(addr string, opts ...ServiceOption) (*clusterServiceEn cc := clusterv1.NewClusterServiceClient(conn) end := &clusterServiceEnd{ - cc: cc, - serviceOption: &serviceOption{}, + cc: cc, + serviceOption: &serviceOption{ + readBufferSize: 1024, + writeBufferSize: 1024, + }, rpcs: map[string]geminio.RPC{}, topics: mapset.NewSet[string](), edgefrontiers: mapmap.NewBiMap(), diff --git a/api/dataplane/v1/service/service_end.go b/api/dataplane/v1/service/service_end.go index e97e071..a8756d3 100644 --- a/api/dataplane/v1/service/service_end.go +++ b/api/dataplane/v1/service/service_end.go @@ -67,8 +67,8 @@ func newServiceEnd(dialer client.Dialer, opts ...ServiceOption) (*serviceEnd, er func newRetryServiceEnd(dialer client.Dialer, opts ...ServiceOption) (*serviceEnd, error) { // options sopt := &serviceOption{ - readBufferSize: -1, - writeBufferSize: -1, + readBufferSize: 1024, + writeBufferSize: 1024, } for _, opt := range opts { opt(sopt) diff --git a/examples/iclm/edge/edge.go b/examples/iclm/edge/edge.go index 1447e3f..cc610ef 100644 --- a/examples/iclm/edge/edge.go +++ b/examples/iclm/edge/edge.go @@ -59,7 +59,8 @@ func main() { // get edge cli, err := edge.NewEdge(dialer, - edge.OptionEdgeLog(armlog.DefaultLog), edge.OptionEdgeMeta([]byte(*meta))) + edge.OptionEdgeLog(armlog.DefaultLog), + edge.OptionEdgeMeta([]byte(*meta))) if err != nil { armlog.Info("new edge err:", err) return diff --git a/examples/iclm/service/service.go b/examples/iclm/service/service.go index 9ab8336..6845e14 100644 --- a/examples/iclm/service/service.go +++ b/examples/iclm/service/service.go @@ -102,6 +102,7 @@ func main() { methods := pflag.String("methods", "", "method name, support echo") printmessage = pflag.Bool("printmessage", false, "whether print message out") nostdin = pflag.Bool("nostdin", false, "nostdin mode, no stdin will be accepted") + buffersize := pflag.Int("buffer", 8192, "buffer size set for service") stats := pflag.Bool("stats", false, "print statistics or not") pflag.Parse() @@ -121,7 +122,7 @@ func main() { opt := []service.ServiceOption{ service.OptionServiceLog(armlog.DefaultLog), service.OptionServiceName(*serviceName), - service.OptionServiceBufferSize(8192, 8192)} + service.OptionServiceBufferSize(*buffersize, *buffersize)} if *topics != "" { topicSlice = strings.Split(*topics, ",") opt = append(opt, service.OptionServiceReceiveTopics(topicSlice)) diff --git a/go.mod b/go.mod index abb6c99..ed7c28a 100644 --- a/go.mod +++ b/go.mod @@ -1,5 +1,7 @@ module github.com/singchia/frontier +replace "github.com/singchia/geminio" => ../geminio + go 1.22 require ( diff --git a/pkg/frontier/controlplane/service/edge_service.go b/pkg/frontier/controlplane/service/edge_service.go index 4536440..46cb40a 100644 --- a/pkg/frontier/controlplane/service/edge_service.go +++ b/pkg/frontier/controlplane/service/edge_service.go @@ -4,6 +4,7 @@ import ( "context" v1 "github.com/singchia/frontier/api/controlplane/frontier/v1" + "github.com/singchia/frontier/pkg/frontier/repo/dao/membuntdb" "github.com/singchia/frontier/pkg/frontier/repo/model" "github.com/singchia/frontier/pkg/frontier/repo/query" ) @@ -50,7 +51,11 @@ func (cps *ControlPlaneService) listEdges(_ context.Context, req *v1.ListEdgesRe } count, err := cps.repo.CountEdges(query) if err != nil { - return nil, err + if err != membuntdb.ErrUnimplemented { + return nil, err + } else { + count = -1 + } } retEdges := transferEdges(edges) return &v1.ListEdgesResponse{ @@ -112,7 +117,11 @@ func (cps *ControlPlaneService) listEdgeRPCs(_ context.Context, req *v1.ListEdge } count, err := cps.repo.CountEdgeRPCs(query) if err != nil { - return nil, err + if err != membuntdb.ErrUnimplemented { + return nil, err + } else { + count = -1 + } } return &v1.ListEdgeRPCsResponse{ Rpcs: rpcs, diff --git a/pkg/frontier/edgebound/edge_manager.go b/pkg/frontier/edgebound/edge_manager.go index e4dad4a..8e746b7 100644 --- a/pkg/frontier/edgebound/edge_manager.go +++ b/pkg/frontier/edgebound/edge_manager.go @@ -187,7 +187,6 @@ func (em *edgeManager) ListStreams(edgeID uint64) []geminio.Stream { } func (em *edgeManager) DelEdgeByID(edgeID uint64) error { - // TODO test it em.mtx.RLock() defer em.mtx.RUnlock() diff --git a/pkg/frontier/edgebound/edge_onoff.go b/pkg/frontier/edgebound/edge_onoff.go index 3592967..6ea0545 100644 --- a/pkg/frontier/edgebound/edge_onoff.go +++ b/pkg/frontier/edgebound/edge_onoff.go @@ -68,11 +68,11 @@ func (em *edgeManager) online(end geminio.End) error { // inform others if em.informer != nil { - em.informer.EdgeOnline(end.ClientID(), end.Meta(), end.Addr()) + em.informer.EdgeOnline(end.ClientID(), end.Meta(), end.RemoteAddr()) } // exchange to service if em.exchange != nil { - err := em.exchange.EdgeOnline(end.ClientID(), end.Meta(), end.Addr()) + err := em.exchange.EdgeOnline(end.ClientID(), end.Meta(), end.RemoteAddr()) if err == apis.ErrServiceNotOnline { return nil } diff --git a/test/batch/edges/edges.go b/test/batch/edges/edges.go index f281d2d..97fd6e1 100644 --- a/test/batch/edges/edges.go +++ b/test/batch/edges/edges.go @@ -8,9 +8,9 @@ import ( "net/http" _ "net/http/pprof" "os" - "strconv" "strings" "sync" + "sync/atomic" "time" armlog "github.com/jumboframes/armorigo/log" @@ -32,7 +32,8 @@ func main() { count := pflag.Int("count", 10000, "edges to dial") topic := pflag.String("topic", "test", "topic to specific") nseconds := pflag.Int("nseconds", 10, "publish message every n seconds for every edge") - sourceIPs := pflag.String("source_ips", "", "source ips to dial, if your ") + msg := pflag.String("msg", "testtesttest", "the message content to publish") + sourceIPs := pflag.String("source_ips", "", "source ips to dial, if you want dial more than \"65535\" source ports") pprof := pflag.String("pprof", "", "pprof addr to listen") pflag.Parse() @@ -43,7 +44,8 @@ func main() { } ips := []string{} - idx := 0 + idx := int64(0) + if *sourceIPs != "" { ips = strings.Split(*sourceIPs, ",") idx = 0 @@ -52,9 +54,11 @@ func main() { dialer := func() (net.Conn, error) { if len(ips) != 0 { - for retry := 0; retry < 2; retry++ { + for retry := 0; retry < 3; retry++ { + thisidx := atomic.LoadInt64(&idx) + ip := ips[int(thisidx)%len(ips)] localAddr := &net.TCPAddr{ - IP: net.ParseIP(ips[idx]), + IP: net.ParseIP(ip), } dialer := &net.Dialer{ LocalAddr: localAddr, @@ -66,14 +70,10 @@ func main() { } if strings.Contains(err.Error(), "cannot assign requested address") || strings.Contains(err.Error(), "address already in use") { - fmt.Println("source ip:", localAddr.IP.String(), localAddr.Port, err) - idx += 1 - if idx >= len(ips) { - return nil, err - } + atomic.AddInt64(&idx, 1) continue } - fmt.Println("source ip:", localAddr.IP.String(), localAddr.Port) + fmt.Printf("unknown dial err: %s, source ip: %s:%d \n", err, localAddr.IP.String(), localAddr.Port) return nil, err } } @@ -96,7 +96,11 @@ func main() { go func(i int) { defer wg.Done() // avoid congestion of connection - random := rand.Intn(*count/100) + 1 + n := *count / 100 + if n == 0 { + n = 1 + } + random := rand.Intn(n) + 1 time.Sleep(time.Second * time.Duration(random)) // new edge connection cli, err := edge.NewEdge(dialer, @@ -112,9 +116,8 @@ func main() { mtx.Unlock() // publish message in loop for { - str := strconv.FormatInt(int64(i), 10) - msg := cli.NewMessage([]byte(str)) - err := cli.Publish(context.TODO(), *topic, msg) + gmsg := cli.NewMessage([]byte(*msg)) + err := cli.Publish(context.TODO(), *topic, gmsg) if err != nil { fmt.Println("publish err", err) break