diff --git a/go.mod b/go.mod index 5dde13d..f77fdc4 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/nats-io/nats.go v1.33.1 github.com/nsqio/go-nsq v1.1.0 github.com/rabbitmq/amqp091-go v1.9.0 - github.com/singchia/geminio v1.1.6-rc.1 + github.com/singchia/geminio v1.1.6-rc.2 github.com/singchia/go-timer/v2 v2.2.1 github.com/soheilhy/cmux v0.1.5 github.com/spf13/pflag v1.0.5 diff --git a/go.sum b/go.sum index 4037ba6..a9b1dac 100644 --- a/go.sum +++ b/go.sum @@ -133,8 +133,8 @@ github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqn github.com/redis/go-redis/v9 v9.5.1 h1:H1X4D3yHPaYrkL5X06Wh6xNVM/pX0Ft4RV0vMGvLBh8= github.com/redis/go-redis/v9 v9.5.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= -github.com/singchia/geminio v1.1.6-rc.1 h1:CJzpReZM01ePnRxtzEn5Y87QQcx079Mbyj50rojK24A= -github.com/singchia/geminio v1.1.6-rc.1/go.mod h1:CmDttmY18CGhpmeiVeGEBLXYKHIOoN3MSaEDMzfSOXA= +github.com/singchia/geminio v1.1.6-rc.2 h1:MElZfy8nTCDG6a4nlhJYd7vbeuUUn3+0hCif5iwXfO8= +github.com/singchia/geminio v1.1.6-rc.2/go.mod h1:CmDttmY18CGhpmeiVeGEBLXYKHIOoN3MSaEDMzfSOXA= github.com/singchia/go-timer/v2 v2.0.3/go.mod h1:PgkEQc6io8slCUiT5rHzWKU4/P2HXHWk3WWfijZXAf4= github.com/singchia/go-timer/v2 v2.2.1 h1:gJucmL99fkuNzGk2AfNPFpa1X3/4+aGO21KkjFAG624= github.com/singchia/go-timer/v2 v2.2.1/go.mod h1:PgkEQc6io8slCUiT5rHzWKU4/P2HXHWk3WWfijZXAf4= diff --git a/pkg/frontier/apis/interface.go b/pkg/frontier/apis/interface.go index 575f87c..31743e4 100644 --- a/pkg/frontier/apis/interface.go +++ b/pkg/frontier/apis/interface.go @@ -3,8 +3,8 @@ package apis import ( "net" - "github.com/singchia/frontier/pkg/frontier/repo/dao" "github.com/singchia/frontier/pkg/frontier/repo/model" + "github.com/singchia/frontier/pkg/frontier/repo/query" "github.com/singchia/geminio" ) @@ -71,19 +71,19 @@ type ServiceInformer interface { // repo type Repo interface { Close() error - CountEdgeRPCs(query *dao.EdgeRPCQuery) (int64, error) - CountEdges(query *dao.EdgeQuery) (int64, error) - CountServiceRPCs(query *dao.ServiceRPCQuery) (int64, error) - CountServiceTopics(query *dao.ServiceTopicQuery) (int64, error) - CountServices(query *dao.ServiceQuery) (int64, error) + CountEdgeRPCs(query *query.EdgeRPCQuery) (int64, error) + CountEdges(query *query.EdgeQuery) (int64, error) + CountServiceRPCs(query *query.ServiceRPCQuery) (int64, error) + CountServiceTopics(query *query.ServiceTopicQuery) (int64, error) + CountServices(query *query.ServiceQuery) (int64, error) CreateEdge(edge *model.Edge) error CreateEdgeRPC(rpc *model.EdgeRPC) error CreateService(service *model.Service) error CreateServiceRPC(rpc *model.ServiceRPC) error CreateServiceTopic(topic *model.ServiceTopic) error - DeleteEdge(delete *dao.EdgeDelete) error + DeleteEdge(delete *query.EdgeDelete) error DeleteEdgeRPCs(edgeID uint64) error - DeleteService(delete *dao.ServiceDelete) error + DeleteService(delete *query.ServiceDelete) error DeleteServiceRPCs(serviceID uint64) error DeleteServiceTopics(serviceID uint64) error GetEdge(edgeID uint64) (*model.Edge, error) @@ -91,11 +91,11 @@ type Repo interface { GetServiceByName(name string) (*model.Service, error) GetServiceRPC(rpc string) (*model.ServiceRPC, error) GetServiceTopic(topic string) (*model.ServiceTopic, error) - ListEdgeRPCs(query *dao.EdgeRPCQuery) ([]string, error) - ListEdges(query *dao.EdgeQuery) ([]*model.Edge, error) - ListServiceRPCs(query *dao.ServiceRPCQuery) ([]string, error) - ListServiceTopics(query *dao.ServiceTopicQuery) ([]string, error) - ListServices(query *dao.ServiceQuery) ([]*model.Service, error) + ListEdgeRPCs(query *query.EdgeRPCQuery) ([]string, error) + ListEdges(query *query.EdgeQuery) ([]*model.Edge, error) + ListServiceRPCs(query *query.ServiceRPCQuery) ([]string, error) + ListServiceTopics(query *query.ServiceTopicQuery) ([]string, error) + ListServices(query *query.ServiceQuery) ([]*model.Service, error) } // mq manager and mq related diff --git a/pkg/frontier/controlplane/service/edge_service.go b/pkg/frontier/controlplane/service/edge_service.go index 241d0b0..7302203 100644 --- a/pkg/frontier/controlplane/service/edge_service.go +++ b/pkg/frontier/controlplane/service/edge_service.go @@ -4,12 +4,12 @@ import ( "context" v1 "github.com/singchia/frontier/api/controlplane/frontier/v1" - "github.com/singchia/frontier/pkg/frontier/repo/dao" "github.com/singchia/frontier/pkg/frontier/repo/model" + "github.com/singchia/frontier/pkg/frontier/repo/query" ) func (cps *ControlPlaneService) listEdges(_ context.Context, req *v1.ListEdgesRequest) (*v1.ListEdgesResponse, error) { - query := &dao.EdgeQuery{} + query := &query.EdgeQuery{} // conditions if req.Meta != nil { query.Meta = *req.Meta @@ -79,7 +79,7 @@ func (cps *ControlPlaneService) kickEdge(_ context.Context, req *v1.KickEdgeRequ } func (cps *ControlPlaneService) listEdgeRPCs(_ context.Context, req *v1.ListEdgeRPCsRequest) (*v1.ListEdgeRPCsResponse, error) { - query := &dao.EdgeRPCQuery{} + query := &query.EdgeRPCQuery{} // conditions if req.EdgeId != nil { query.EdgeID = *req.EdgeId diff --git a/pkg/frontier/controlplane/service/service_service.go b/pkg/frontier/controlplane/service/service_service.go index b52e21f..a7646b7 100644 --- a/pkg/frontier/controlplane/service/service_service.go +++ b/pkg/frontier/controlplane/service/service_service.go @@ -4,12 +4,12 @@ import ( "context" v1 "github.com/singchia/frontier/api/controlplane/frontier/v1" - "github.com/singchia/frontier/pkg/frontier/repo/dao" "github.com/singchia/frontier/pkg/frontier/repo/model" + "github.com/singchia/frontier/pkg/frontier/repo/query" ) func (cps *ControlPlaneService) listServices(_ context.Context, req *v1.ListServicesRequest) (*v1.ListServicesResponse, error) { - query := &dao.ServiceQuery{} + query := &query.ServiceQuery{} // conditions if req.Service != nil { query.Service = *req.Service @@ -82,7 +82,7 @@ func (cps *ControlPlaneService) kickService(_ context.Context, req *v1.KickServi } func (cps *ControlPlaneService) listServiceRPCs(_ context.Context, req *v1.ListServiceRPCsRequest) (*v1.ListServiceRPCsResponse, error) { - query := &dao.ServiceRPCQuery{} + query := &query.ServiceRPCQuery{} // conditions if req.ServiceId != nil { query.ServiceID = *req.ServiceId @@ -129,7 +129,7 @@ func (cps *ControlPlaneService) listServiceRPCs(_ context.Context, req *v1.ListS } func (cps *ControlPlaneService) listServiceTopics(_ context.Context, req *v1.ListServiceTopicsRequest) (*v1.ListServiceTopicsResponse, error) { - query := &dao.ServiceTopicQuery{} + query := &query.ServiceTopicQuery{} // conditions if req.ServiceId != nil { query.ServiceID = *req.ServiceId diff --git a/pkg/frontier/edgebound/edge_manager.go b/pkg/frontier/edgebound/edge_manager.go index f223a70..4937300 100644 --- a/pkg/frontier/edgebound/edge_manager.go +++ b/pkg/frontier/edgebound/edge_manager.go @@ -11,6 +11,7 @@ import ( "github.com/jumboframes/armorigo/synchub" "github.com/singchia/frontier/pkg/frontier/apis" "github.com/singchia/frontier/pkg/frontier/config" + "github.com/singchia/frontier/pkg/frontier/misc" "github.com/singchia/frontier/pkg/mapmap" "github.com/singchia/frontier/pkg/utils" "github.com/singchia/geminio" @@ -75,6 +76,9 @@ func newEdgeManager(conf *config.Configuration, repo apis.Repo, informer apis.Ed informer: informer, exchange: exchange, } + if misc.IsNil(informer) { + em.informer = nil + } exchange.AddEdgebound(em) ln, err := utils.Listen(listen) diff --git a/pkg/frontier/edgebound/edge_onoff.go b/pkg/frontier/edgebound/edge_onoff.go index c3a797d..b6d008d 100644 --- a/pkg/frontier/edgebound/edge_onoff.go +++ b/pkg/frontier/edgebound/edge_onoff.go @@ -7,8 +7,8 @@ import ( "github.com/jumboframes/armorigo/synchub" "github.com/singchia/frontier/pkg/frontier/apis" - "github.com/singchia/frontier/pkg/frontier/repo/dao" "github.com/singchia/frontier/pkg/frontier/repo/model" + "github.com/singchia/frontier/pkg/frontier/repo/query" "github.com/singchia/geminio" "github.com/singchia/geminio/delegate" "k8s.io/klog/v2" @@ -28,6 +28,7 @@ func (em *edgeManager) online(end geminio.End) error { sync = em.shub.Add(syncKey) if err := oldend.Close(); err != nil { klog.Warningf("edge online, kick off old end err: %s, edgeID: %d", err, end.ClientID()) + em.shub.Cancel(syncKey, false) } } em.edges[end.ClientID()] = end @@ -84,7 +85,7 @@ func (em *edgeManager) offline(edgeID uint64, addr net.Addr) error { }() // memdb - if err := em.repo.DeleteEdge(&dao.EdgeDelete{ + if err := em.repo.DeleteEdge(&query.EdgeDelete{ EdgeID: edgeID, Addr: addr.String(), }); err != nil { diff --git a/pkg/frontier/exchange/oob.go b/pkg/frontier/exchange/oob.go index 15b6dc8..3feb3a0 100644 --- a/pkg/frontier/exchange/oob.go +++ b/pkg/frontier/exchange/oob.go @@ -5,8 +5,10 @@ import ( "encoding/binary" "encoding/json" "net" + "time" "github.com/singchia/frontier/pkg/frontier/apis" + "github.com/singchia/geminio/options" "k8s.io/klog/v2" ) @@ -22,7 +24,9 @@ func (ex *exchange) GetEdgeID(meta []byte) (uint64, error) { } // call service req := svc.NewRequest(meta) - rsp, err := svc.Call(context.TODO(), apis.RPCGetEdgeID, req) + opt := options.Call() + opt.SetTimeout(30 * time.Second) + rsp, err := svc.Call(context.TODO(), apis.RPCGetEdgeID, req, opt) if err != nil { klog.V(2).Infof("exchange call service: %d, get edgeID err: %s, meta: %s", svc.ClientID(), err, meta) return 0, err @@ -57,7 +61,9 @@ func (ex *exchange) EdgeOnline(edgeID uint64, meta []byte, addr net.Addr) error } // call service req := svc.NewRequest(data) - _, err = svc.Call(context.TODO(), apis.RPCEdgeOnline, req) + opt := options.Call() + opt.SetTimeout(30 * time.Second) + _, err = svc.Call(context.TODO(), apis.RPCEdgeOnline, req, opt) if err != nil { klog.V(2).Infof("exchange call service: %d, edge online err: %s, meta: %s, addr: %s", svc.ClientID(), err, meta, addr) return err @@ -88,7 +94,9 @@ func (ex *exchange) EdgeOffline(edgeID uint64, meta []byte, addr net.Addr) error } // call service req := svc.NewRequest(data) - _, err = svc.Call(context.TODO(), apis.RPCEdgeOffline, req) + opt := options.Call() + opt.SetTimeout(30 * time.Second) + _, err = svc.Call(context.TODO(), apis.RPCEdgeOffline, req, opt) if err != nil { klog.V(2).Infof("exchange call service: %d, edge offline err: %s, meta: %s, addr: %s", svc.ClientID(), err, meta, addr) return err diff --git a/pkg/frontier/repo/dao/dao.go b/pkg/frontier/repo/dao/dao.go index 1aa315d..a2911d8 100644 --- a/pkg/frontier/repo/dao/dao.go +++ b/pkg/frontier/repo/dao/dao.go @@ -3,63 +3,46 @@ package dao import ( "github.com/singchia/frontier/pkg/frontier/config" "github.com/singchia/frontier/pkg/frontier/repo/model" - "gorm.io/driver/sqlite" - "gorm.io/gorm" - "k8s.io/klog/v2" + "github.com/singchia/frontier/pkg/frontier/repo/query" ) -type Dao struct { - dbEdge, dbService *gorm.DB - - // config - config config.Configuration +type Dao interface { + Close() error + CountEdgeRPCs(query *query.EdgeRPCQuery) (int64, error) + CountEdges(query *query.EdgeQuery) (int64, error) + CountServiceRPCs(query *query.ServiceRPCQuery) (int64, error) + CountServiceTopics(query *query.ServiceTopicQuery) (int64, error) + CountServices(query *query.ServiceQuery) (int64, error) + CreateEdge(edge *model.Edge) error + CreateEdgeRPC(rpc *model.EdgeRPC) error + CreateService(service *model.Service) error + CreateServiceRPC(rpc *model.ServiceRPC) error + CreateServiceTopic(topic *model.ServiceTopic) error + DeleteEdge(delete *query.EdgeDelete) error + DeleteEdgeRPCs(edgeID uint64) error + DeleteService(delete *query.ServiceDelete) error + DeleteServiceRPCs(serviceID uint64) error + DeleteServiceTopics(serviceID uint64) error + GetEdge(edgeID uint64) (*model.Edge, error) + GetService(serviceID uint64) (*model.Service, error) + GetServiceByName(name string) (*model.Service, error) + GetServiceRPC(rpc string) (*model.ServiceRPC, error) + GetServiceTopic(topic string) (*model.ServiceTopic, error) + ListEdgeRPCs(query *query.EdgeRPCQuery) ([]string, error) + ListEdges(query *query.EdgeQuery) ([]*model.Edge, error) + ListServiceRPCs(query *query.ServiceRPCQuery) ([]string, error) + ListServiceTopics(query *query.ServiceTopicQuery) ([]string, error) + ListServices(query *query.ServiceQuery) ([]*model.Service, error) } -func NewDao(config *config.Configuration) (*Dao, error) { - // we split edge and service sqlite3 memory databases, since the concurrent - // writes perform bad, see https://github.com/mattn/go-sqlite3/issues/274 - - // edget bound models - dbEdge, err := gorm.Open(sqlite.Open("file:edge?mode=memory&cache=shared")) - if err != nil { - klog.Errorf("dao open edge sqlite3 err: %s", err) - return nil, err - } - sqlDB, err := dbEdge.DB() - if err != nil { - klog.Errorf("get edge DB err: %s", err) - return nil, err - } - sqlDB.SetMaxOpenConns(1) - if err = dbEdge.AutoMigrate(&model.Edge{}, &model.EdgeRPC{}); err != nil { - return nil, err - } - - // service bound models - dbService, err := gorm.Open(sqlite.Open("file:service?mode=memory&cache=shared")) - if err != nil { - klog.Errorf("dao open service sqlite3 err: %s", err) - return nil, err - } - sqlDB, err = dbService.DB() - if err != nil { - klog.Errorf("get service DB err: %s", err) - return nil, err - } - sqlDB.SetMaxOpenConns(1) - if err = dbService.AutoMigrate(&model.Service{}, &model.ServiceRPC{}, &model.ServiceTopic{}); err != nil { - return nil, err - } - return &Dao{ - dbEdge: dbEdge, - dbService: dbService, - }, nil +type dao struct { + config *config.Configuration } -func (dao *Dao) Close() error { - sqlDB, err := dao.dbEdge.DB() - if err != nil { - return err - } - return sqlDB.Close() +func NewDao(config *config.Configuration) (Dao, error) { + return nil, nil +} + +func (dao *dao) Close() error { + return nil } diff --git a/pkg/frontier/repo/dao/memsqlite/dao.go b/pkg/frontier/repo/dao/memsqlite/dao.go new file mode 100644 index 0000000..8d19bce --- /dev/null +++ b/pkg/frontier/repo/dao/memsqlite/dao.go @@ -0,0 +1,79 @@ +package memsqlite + +import ( + "github.com/singchia/frontier/pkg/frontier/config" + "github.com/singchia/frontier/pkg/frontier/repo/model" + "gorm.io/driver/sqlite" + "gorm.io/gorm" + "k8s.io/klog/v2" +) + +type dao struct { + dbEdge, dbService *gorm.DB + + // config + config config.Configuration +} + +func NewDao(config *config.Configuration) (*dao, error) { + // we split edge and service sqlite3 memory databases, since the concurrent + // writes perform bad, see https://github.com/mattn/go-sqlite3/issues/274 + + // edget bound models + dbEdge, err := gorm.Open(sqlite.Open("file:edge?mode=memory&cache=shared")) + if err != nil { + klog.Errorf("dao open edge sqlite3 err: %s", err) + return nil, err + } + sqlDB, err := dbEdge.DB() + if err != nil { + klog.Errorf("get edge DB err: %s", err) + return nil, err + } + sqlDB.Exec("PRAGMA synchronous = OFF;") + sqlDB.Exec("PRAGMA journal_mode = DELETE;") + sqlDB.Exec("PRAGMA cache_size = -2000;") // 2MB cache + sqlDB.Exec("PRAGMA temp_store = MEMORY;") + sqlDB.Exec("PRAGMA locking_mode = EXCLUSIVE;") + sqlDB.Exec("PRAGMA mmap_size = 268435456;") // 256MB memory map size + sqlDB.SetMaxOpenConns(0) + if err = dbEdge.AutoMigrate(&model.Edge{}, &model.EdgeRPC{}); err != nil { + return nil, err + } + + // service bound models + dbService, err := gorm.Open(sqlite.Open("file:service?mode=memory&cache=shared"), &gorm.Config{ + PrepareStmt: true, + }) + if err != nil { + klog.Errorf("dao open service sqlite3 err: %s", err) + return nil, err + } + sqlDB, err = dbService.DB() + if err != nil { + klog.Errorf("get service DB err: %s", err) + return nil, err + } + sqlDB.Exec("PRAGMA synchronous = OFF;") + sqlDB.Exec("PRAGMA journal_mode = DELETE;") + sqlDB.Exec("PRAGMA cache_size = -2000;") // 2MB cache + sqlDB.Exec("PRAGMA temp_store = MEMORY;") + sqlDB.Exec("PRAGMA locking_mode = EXCLUSIVE;") + sqlDB.Exec("PRAGMA mmap_size = 268435456;") // 256MB memory map size + sqlDB.SetMaxOpenConns(0) + if err = dbService.AutoMigrate(&model.Service{}, &model.ServiceRPC{}, &model.ServiceTopic{}); err != nil { + return nil, err + } + return &dao{ + dbEdge: dbEdge, + dbService: dbService, + }, nil +} + +func (dao *dao) Close() error { + sqlDB, err := dao.dbEdge.DB() + if err != nil { + return err + } + return sqlDB.Close() +} diff --git a/pkg/frontier/repo/dao/dao_edge.go b/pkg/frontier/repo/dao/memsqlite/dao_edge.go similarity index 78% rename from pkg/frontier/repo/dao/dao_edge.go rename to pkg/frontier/repo/dao/memsqlite/dao_edge.go index 8504e31..f8be86f 100644 --- a/pkg/frontier/repo/dao/dao_edge.go +++ b/pkg/frontier/repo/dao/memsqlite/dao_edge.go @@ -1,21 +1,13 @@ -package dao +package memsqlite import ( "github.com/singchia/frontier/pkg/frontier/repo/model" + "github.com/singchia/frontier/pkg/frontier/repo/query" "gorm.io/gorm" "gorm.io/gorm/clause" ) -type EdgeQuery struct { - Query - // Condition fields - Meta string - Addr string - RPC string - EdgeID uint64 -} - -func (dao *Dao) ListEdges(query *EdgeQuery) ([]*model.Edge, error) { +func (dao *dao) ListEdges(query *query.EdgeQuery) ([]*model.Edge, error) { tx := dao.dbEdge.Model(&model.Edge{}) if dao.config.Dao.Debug { tx = tx.Debug() @@ -46,7 +38,7 @@ func (dao *Dao) ListEdges(query *EdgeQuery) ([]*model.Edge, error) { return edges, tx.Error } -func (dao *Dao) CountEdges(query *EdgeQuery) (int64, error) { +func (dao *dao) CountEdges(query *query.EdgeQuery) (int64, error) { tx := dao.dbEdge.Model(&model.Edge{}) if dao.config.Dao.Debug { tx = tx.Debug() @@ -58,7 +50,7 @@ func (dao *Dao) CountEdges(query *EdgeQuery) (int64, error) { return count, tx.Error } -func (dao *Dao) GetEdge(edgeID uint64) (*model.Edge, error) { +func (dao *dao) GetEdge(edgeID uint64) (*model.Edge, error) { tx := dao.dbEdge.Model(&model.Edge{}) if dao.config.Dao.Debug { tx = tx.Debug() @@ -70,12 +62,7 @@ func (dao *Dao) GetEdge(edgeID uint64) (*model.Edge, error) { return &edge, tx.Error } -type EdgeDelete struct { - EdgeID uint64 - Addr string -} - -func (dao *Dao) DeleteEdge(delete *EdgeDelete) error { +func (dao *dao) DeleteEdge(delete *query.EdgeDelete) error { tx := dao.dbEdge if dao.config.Dao.Debug { tx = tx.Debug() @@ -84,7 +71,7 @@ func (dao *Dao) DeleteEdge(delete *EdgeDelete) error { return tx.Delete(&model.Edge{}).Error } -func (dao *Dao) CreateEdge(edge *model.Edge) error { +func (dao *dao) CreateEdge(edge *model.Edge) error { tx := dao.dbEdge if dao.config.Dao.Debug { tx = tx.Debug() @@ -92,7 +79,7 @@ func (dao *Dao) CreateEdge(edge *model.Edge) error { return tx.Create(edge).Error } -func buildEdgeQuery(tx *gorm.DB, query *EdgeQuery) *gorm.DB { +func buildEdgeQuery(tx *gorm.DB, query *query.EdgeQuery) *gorm.DB { // join if query.RPC != "" { tx = tx.InnerJoins("INNER JOIN edge_rpcs ON edges.edge_id = edge_rpcs.edge_id AND service_rpcs.rpc = ?", query.RPC) @@ -115,7 +102,7 @@ func buildEdgeQuery(tx *gorm.DB, query *EdgeQuery) *gorm.DB { return tx } -func buildEdgeDelete(tx *gorm.DB, delete *EdgeDelete) *gorm.DB { +func buildEdgeDelete(tx *gorm.DB, delete *query.EdgeDelete) *gorm.DB { if delete.EdgeID != 0 { tx = tx.Where("edge_id = ?", delete.EdgeID) } @@ -125,14 +112,7 @@ func buildEdgeDelete(tx *gorm.DB, delete *EdgeDelete) *gorm.DB { return tx } -type EdgeRPCQuery struct { - Query - // Condition fields - Meta string - EdgeID uint64 -} - -func (dao *Dao) ListEdgeRPCs(query *EdgeRPCQuery) ([]string, error) { +func (dao *dao) ListEdgeRPCs(query *query.EdgeRPCQuery) ([]string, error) { tx := dao.dbEdge.Model(&model.EdgeRPC{}) if dao.config.Dao.Debug { tx = tx.Debug() @@ -163,7 +143,7 @@ func (dao *Dao) ListEdgeRPCs(query *EdgeRPCQuery) ([]string, error) { return rpcs, tx.Error } -func (dao *Dao) CountEdgeRPCs(query *EdgeRPCQuery) (int64, error) { +func (dao *dao) CountEdgeRPCs(query *query.EdgeRPCQuery) (int64, error) { tx := dao.dbEdge.Model(&model.EdgeRPC{}) if dao.config.Dao.Debug { tx = tx.Debug() @@ -176,7 +156,7 @@ func (dao *Dao) CountEdgeRPCs(query *EdgeRPCQuery) (int64, error) { return count, tx.Error } -func (dao *Dao) DeleteEdgeRPCs(edgeID uint64) error { +func (dao *dao) DeleteEdgeRPCs(edgeID uint64) error { tx := dao.dbEdge.Where("edge_id = ?", edgeID) if dao.config.Dao.Debug { tx = tx.Debug() @@ -184,7 +164,7 @@ func (dao *Dao) DeleteEdgeRPCs(edgeID uint64) error { return tx.Delete(&model.EdgeRPC{}).Error } -func (dao *Dao) CreateEdgeRPC(rpc *model.EdgeRPC) error { +func (dao *dao) CreateEdgeRPC(rpc *model.EdgeRPC) error { tx := dao.dbEdge if dao.config.Dao.Debug { tx = tx.Debug() @@ -192,7 +172,7 @@ func (dao *Dao) CreateEdgeRPC(rpc *model.EdgeRPC) error { return tx.Create(rpc).Error } -func buildEdgeRPCQuery(tx *gorm.DB, query *EdgeRPCQuery) *gorm.DB { +func buildEdgeRPCQuery(tx *gorm.DB, query *query.EdgeRPCQuery) *gorm.DB { // join if query.Meta != "" { tx = tx.InnerJoins("INNER JOIN edges ON edges.edge_id = edge_rpcs.edge_id AND meta LIKE ?", "%"+query.Meta+"%") diff --git a/pkg/frontier/repo/dao/dao_edge_test.go b/pkg/frontier/repo/dao/memsqlite/dao_edge_test.go similarity index 94% rename from pkg/frontier/repo/dao/dao_edge_test.go rename to pkg/frontier/repo/dao/memsqlite/dao_edge_test.go index 549190a..bc5a501 100644 --- a/pkg/frontier/repo/dao/dao_edge_test.go +++ b/pkg/frontier/repo/dao/memsqlite/dao_edge_test.go @@ -1,4 +1,4 @@ -package dao +package memsqlite import ( "encoding/json" @@ -9,6 +9,7 @@ import ( "github.com/singchia/frontier/pkg/frontier/config" "github.com/singchia/frontier/pkg/frontier/repo/model" + "github.com/singchia/frontier/pkg/frontier/repo/query" ) func TestCreateEdge(t *testing.T) { @@ -56,7 +57,7 @@ func TestCountEdges(t *testing.T) { } } - c, err := dao.CountEdges(&EdgeQuery{ + c, err := dao.CountEdges(&query.EdgeQuery{ Meta: "test", }) if err != nil { @@ -182,9 +183,9 @@ func BenchmarkListEdges(b *testing.B) { for p.Next() { pageSize := 10 page := rand.Intn(count/pageSize) + 1 - clients, err := dao.ListEdges(&EdgeQuery{ + clients, err := dao.ListEdges(&query.EdgeQuery{ Meta: "test", - Query: Query{ + Query: query.Query{ PageSize: pageSize, Page: page, }, @@ -231,7 +232,7 @@ func BenchmarkDeleteEdge(b *testing.B) { b.RunParallel(func(p *testing.PB) { for p.Next() { new := atomic.AddUint64(&index, 1) - err := dao.DeleteEdge(&EdgeDelete{EdgeID: new}) + err := dao.DeleteEdge(&query.EdgeDelete{EdgeID: new}) if err != nil { b.Error(err) return diff --git a/pkg/frontier/repo/dao/dao_service.go b/pkg/frontier/repo/dao/memsqlite/dao_service.go similarity index 80% rename from pkg/frontier/repo/dao/dao_service.go rename to pkg/frontier/repo/dao/memsqlite/dao_service.go index 9aacb2b..a659e9c 100644 --- a/pkg/frontier/repo/dao/dao_service.go +++ b/pkg/frontier/repo/dao/memsqlite/dao_service.go @@ -1,23 +1,14 @@ -package dao +package memsqlite import ( "github.com/singchia/frontier/pkg/frontier/repo/model" + "github.com/singchia/frontier/pkg/frontier/repo/query" "gorm.io/gorm" "gorm.io/gorm/clause" ) -type ServiceQuery struct { - Query - // Condition fields - Service string - Addr string - RPC string - Topic string - ServiceID uint64 -} - // service -func (dao *Dao) ListServices(query *ServiceQuery) ([]*model.Service, error) { +func (dao *dao) ListServices(query *query.ServiceQuery) ([]*model.Service, error) { tx := dao.dbService.Model(&model.Service{}) if dao.config.Dao.Debug { tx = tx.Debug() @@ -48,7 +39,7 @@ func (dao *Dao) ListServices(query *ServiceQuery) ([]*model.Service, error) { return services, tx.Error } -func (dao *Dao) CountServices(query *ServiceQuery) (int64, error) { +func (dao *dao) CountServices(query *query.ServiceQuery) (int64, error) { tx := dao.dbService.Model(&model.Service{}) if dao.config.Dao.Debug { tx = tx.Debug() @@ -60,7 +51,7 @@ func (dao *Dao) CountServices(query *ServiceQuery) (int64, error) { return count, tx.Error } -func (dao *Dao) GetService(serviceID uint64) (*model.Service, error) { +func (dao *dao) GetService(serviceID uint64) (*model.Service, error) { tx := dao.dbService.Model(&model.Service{}) if dao.config.Dao.Debug { tx = tx.Debug() @@ -75,7 +66,7 @@ func (dao *Dao) GetService(serviceID uint64) (*model.Service, error) { return &service, tx.Error } -func (dao *Dao) GetServiceByName(name string) (*model.Service, error) { +func (dao *dao) GetServiceByName(name string) (*model.Service, error) { tx := dao.dbService.Model(&model.Service{}) if dao.config.Dao.Debug { tx = tx.Debug() @@ -95,7 +86,7 @@ type ServiceDelete struct { Addr string } -func (dao *Dao) DeleteService(delete *ServiceDelete) error { +func (dao *dao) DeleteService(delete *query.ServiceDelete) error { tx := dao.dbService if dao.config.Dao.Debug { tx = tx.Debug() @@ -104,7 +95,7 @@ func (dao *Dao) DeleteService(delete *ServiceDelete) error { return tx.Delete(&model.Service{}).Error } -func (dao *Dao) CreateService(service *model.Service) error { +func (dao *dao) CreateService(service *model.Service) error { var tx *gorm.DB if dao.config.Dao.Debug { tx = tx.Debug() @@ -113,7 +104,7 @@ func (dao *Dao) CreateService(service *model.Service) error { return tx.Error } -func buildServiceQuery(tx *gorm.DB, query *ServiceQuery) *gorm.DB { +func buildServiceQuery(tx *gorm.DB, query *query.ServiceQuery) *gorm.DB { // join if query.RPC != "" { tx = tx.InnerJoins("INNER JOIN service_rpcs ON services.service_id = service_rpcs.service_id AND service_rpcs.rpc = ?", query.RPC) @@ -140,7 +131,7 @@ func buildServiceQuery(tx *gorm.DB, query *ServiceQuery) *gorm.DB { return tx } -func buildServiceDelete(tx *gorm.DB, delete *ServiceDelete) *gorm.DB { +func buildServiceDelete(tx *gorm.DB, delete *query.ServiceDelete) *gorm.DB { if delete.ServiceID != 0 { tx = tx.Where("service_id = ?", delete.ServiceID) } @@ -151,14 +142,8 @@ func buildServiceDelete(tx *gorm.DB, delete *ServiceDelete) *gorm.DB { } // service rpc -type ServiceRPCQuery struct { - Query - // Condition fields - Service string - ServiceID uint64 -} -func (dao *Dao) GetServiceRPC(rpc string) (*model.ServiceRPC, error) { +func (dao *dao) GetServiceRPC(rpc string) (*model.ServiceRPC, error) { tx := dao.dbService.Model(&model.ServiceRPC{}) if dao.config.Dao.Debug { tx = tx.Debug() @@ -175,7 +160,7 @@ func (dao *Dao) GetServiceRPC(rpc string) (*model.ServiceRPC, error) { return &mrpc, tx.Error } -func (dao *Dao) ListServiceRPCs(query *ServiceRPCQuery) ([]string, error) { +func (dao *dao) ListServiceRPCs(query *query.ServiceRPCQuery) ([]string, error) { tx := dao.dbService.Model(&model.ServiceRPC{}) if dao.config.Dao.Debug { tx = tx.Debug() @@ -205,7 +190,7 @@ func (dao *Dao) ListServiceRPCs(query *ServiceRPCQuery) ([]string, error) { return rpcs, tx.Error } -func (dao *Dao) CountServiceRPCs(query *ServiceRPCQuery) (int64, error) { +func (dao *dao) CountServiceRPCs(query *query.ServiceRPCQuery) (int64, error) { tx := dao.dbService.Model(&model.ServiceRPC{}) if dao.config.Dao.Debug { tx = tx.Debug() @@ -217,7 +202,7 @@ func (dao *Dao) CountServiceRPCs(query *ServiceRPCQuery) (int64, error) { return count, tx.Error } -func (dao *Dao) DeleteServiceRPCs(serviceID uint64) error { +func (dao *dao) DeleteServiceRPCs(serviceID uint64) error { tx := dao.dbService.Where("service_id = ?", serviceID) if dao.config.Dao.Debug { tx = tx.Debug() @@ -225,7 +210,7 @@ func (dao *Dao) DeleteServiceRPCs(serviceID uint64) error { return tx.Delete(&model.ServiceRPC{}).Error } -func (dao *Dao) CreateServiceRPC(rpc *model.ServiceRPC) error { +func (dao *dao) CreateServiceRPC(rpc *model.ServiceRPC) error { tx := dao.dbService if dao.config.Dao.Debug { tx = tx.Debug() @@ -233,7 +218,7 @@ func (dao *Dao) CreateServiceRPC(rpc *model.ServiceRPC) error { return tx.Create(rpc).Error } -func buildServiceRPCQuery(tx *gorm.DB, query *ServiceRPCQuery) *gorm.DB { +func buildServiceRPCQuery(tx *gorm.DB, query *query.ServiceRPCQuery) *gorm.DB { // join and search if query.Service != "" { tx = tx.InnerJoins("INNER JOIN services ON services.service_id = service_rpcs.service_id AND service LIKE ?", "%"+query.Service+"%") @@ -250,14 +235,8 @@ func buildServiceRPCQuery(tx *gorm.DB, query *ServiceRPCQuery) *gorm.DB { } // service topic -type ServiceTopicQuery struct { - Query - // Condition fields - Service string - ServiceID uint64 -} -func (dao *Dao) GetServiceTopic(topic string) (*model.ServiceTopic, error) { +func (dao *dao) GetServiceTopic(topic string) (*model.ServiceTopic, error) { tx := dao.dbService.Model(&model.ServiceTopic{}) if dao.config.Dao.Debug { tx = tx.Debug() @@ -272,7 +251,7 @@ func (dao *Dao) GetServiceTopic(topic string) (*model.ServiceTopic, error) { return &mtopic, tx.Error } -func (dao *Dao) ListServiceTopics(query *ServiceTopicQuery) ([]string, error) { +func (dao *dao) ListServiceTopics(query *query.ServiceTopicQuery) ([]string, error) { tx := dao.dbService.Model(&model.ServiceTopic{}) if dao.config.Dao.Debug { tx = tx.Debug() @@ -303,7 +282,7 @@ func (dao *Dao) ListServiceTopics(query *ServiceTopicQuery) ([]string, error) { return topics, tx.Error } -func (dao *Dao) CountServiceTopics(query *ServiceTopicQuery) (int64, error) { +func (dao *dao) CountServiceTopics(query *query.ServiceTopicQuery) (int64, error) { tx := dao.dbService.Model(&model.ServiceTopic{}) if dao.config.Dao.Debug { tx = tx.Debug() @@ -315,7 +294,7 @@ func (dao *Dao) CountServiceTopics(query *ServiceTopicQuery) (int64, error) { return count, tx.Error } -func (dao *Dao) DeleteServiceTopics(serviceID uint64) error { +func (dao *dao) DeleteServiceTopics(serviceID uint64) error { tx := dao.dbService.Where("service_id = ?", serviceID) if dao.config.Dao.Debug { tx = tx.Debug() @@ -323,7 +302,7 @@ func (dao *Dao) DeleteServiceTopics(serviceID uint64) error { return tx.Delete(&model.ServiceTopic{}).Error } -func (dao *Dao) CreateServiceTopic(topic *model.ServiceTopic) error { +func (dao *dao) CreateServiceTopic(topic *model.ServiceTopic) error { tx := dao.dbService if dao.config.Dao.Debug { tx = tx.Debug() @@ -331,7 +310,7 @@ func (dao *Dao) CreateServiceTopic(topic *model.ServiceTopic) error { return tx.Create(topic).Error } -func buildServiceTopicQuery(tx *gorm.DB, query *ServiceTopicQuery) *gorm.DB { +func buildServiceTopicQuery(tx *gorm.DB, query *query.ServiceTopicQuery) *gorm.DB { // join and search if query.Service != "" { tx = tx.InnerJoins("INNER JOIN services ON services.service_id = service_topics.service_id AND service LIKE ?", "%"+query.Service+"%") diff --git a/pkg/frontier/repo/dao/dao_service_test.go b/pkg/frontier/repo/dao/memsqlite/dao_service_test.go similarity index 91% rename from pkg/frontier/repo/dao/dao_service_test.go rename to pkg/frontier/repo/dao/memsqlite/dao_service_test.go index fff4b50..60d9237 100644 --- a/pkg/frontier/repo/dao/dao_service_test.go +++ b/pkg/frontier/repo/dao/memsqlite/dao_service_test.go @@ -1,4 +1,4 @@ -package dao +package memsqlite import ( "sync/atomic" @@ -7,6 +7,7 @@ import ( "github.com/singchia/frontier/pkg/frontier/config" "github.com/singchia/frontier/pkg/frontier/repo/model" + "github.com/singchia/frontier/pkg/frontier/repo/query" ) func TestListServices(t *testing.T) { @@ -64,7 +65,7 @@ func TestListServices(t *testing.T) { } // list - services, err := dao.ListServices(&ServiceQuery{ + services, err := dao.ListServices(&query.ServiceQuery{ RPC: "foo", Topic: "bar", }) diff --git a/pkg/frontier/repo/dao/utils.go b/pkg/frontier/repo/dao/utils.go deleted file mode 100644 index 7d420d1..0000000 --- a/pkg/frontier/repo/dao/utils.go +++ /dev/null @@ -1,11 +0,0 @@ -package dao - -type Query struct { - // Pagination - Page, PageSize int - // Time range - StartTime, EndTime int64 - // Order - Order string - Desc bool -} diff --git a/pkg/frontier/repo/query/query.go b/pkg/frontier/repo/query/query.go new file mode 100644 index 0000000..079b212 --- /dev/null +++ b/pkg/frontier/repo/query/query.go @@ -0,0 +1,61 @@ +package query + +type Query struct { + // Pagination + Page, PageSize int + // Time range + StartTime, EndTime int64 + // Order + Order string + Desc bool +} + +type EdgeQuery struct { + Query + // Condition fields + Meta string + Addr string + RPC string + EdgeID uint64 +} + +type EdgeRPCQuery struct { + Query + // Condition fields + Meta string + EdgeID uint64 +} + +type EdgeDelete struct { + EdgeID uint64 + Addr string +} + +type ServiceQuery struct { + Query + // Condition fields + Service string + Addr string + RPC string + Topic string + ServiceID uint64 +} + +type ServiceDelete struct { + ServiceID uint64 + Addr string +} + +type ServiceRPCQuery struct { + Query + // Condition fields + Service string + ServiceID uint64 +} + +type ServiceTopicQuery struct { + Query + // Condition fields + Service string + ServiceID uint64 +} diff --git a/pkg/frontier/servicebound/service_manager.go b/pkg/frontier/servicebound/service_manager.go index 1ac6181..8f16ea9 100644 --- a/pkg/frontier/servicebound/service_manager.go +++ b/pkg/frontier/servicebound/service_manager.go @@ -11,6 +11,7 @@ import ( "github.com/jumboframes/armorigo/synchub" "github.com/singchia/frontier/pkg/frontier/apis" "github.com/singchia/frontier/pkg/frontier/config" + "github.com/singchia/frontier/pkg/frontier/misc" "github.com/singchia/frontier/pkg/frontier/repo/model" "github.com/singchia/frontier/pkg/mapmap" "github.com/singchia/frontier/pkg/utils" @@ -78,6 +79,9 @@ func newServiceManager(conf *config.Configuration, repo apis.Repo, informer apis exchange: exchange, mqm: mqm, } + if misc.IsNil(informer) { + sm.informer = nil + } exchange.AddServicebound(sm) ln, err := utils.Listen(listen) if err != nil { diff --git a/pkg/frontier/servicebound/service_onoff.go b/pkg/frontier/servicebound/service_onoff.go index 768e455..a371918 100644 --- a/pkg/frontier/servicebound/service_onoff.go +++ b/pkg/frontier/servicebound/service_onoff.go @@ -7,8 +7,8 @@ import ( "github.com/jumboframes/armorigo/synchub" "github.com/singchia/frontier/pkg/frontier/apis" - "github.com/singchia/frontier/pkg/frontier/repo/dao" "github.com/singchia/frontier/pkg/frontier/repo/model" + "github.com/singchia/frontier/pkg/frontier/repo/query" "github.com/singchia/geminio" "github.com/singchia/geminio/delegate" "k8s.io/klog/v2" @@ -85,7 +85,7 @@ func (sm *serviceManager) offline(serviceID uint64, addr net.Addr) error { }() // clear memdb - if err := sm.repo.DeleteService(&dao.ServiceDelete{ + if err := sm.repo.DeleteService(&query.ServiceDelete{ ServiceID: serviceID, Addr: addr.String(), }); err != nil { diff --git a/test/batch/edges/edges.go b/test/batch/edges/edges.go index 517a9f5..a1b931e 100644 --- a/test/batch/edges/edges.go +++ b/test/batch/edges/edges.go @@ -25,7 +25,7 @@ func main() { network := pflag.String("network", "tcp", "network to dial") address := pflag.String("address", "127.0.0.1:30012", "address to dial") loglevel := pflag.String("loglevel", "info", "log level, trace debug info warn error") - count := pflag.Int("count", 10000, "messages to publish") + count := pflag.Int("count", 10000, "edges to dial") topic := pflag.String("topic", "test", "topic to specific") nseconds := pflag.Int("nseconds", 10, "publish message every n seconds for every edge") @@ -74,6 +74,7 @@ func main() { mtx.Lock() delete(edges, i) mtx.Unlock() + cli.Close() }(i) } diff --git a/test/bench/call/service/call_service.go b/test/bench/call/service/call_service.go index f7f3535..e022ae2 100644 --- a/test/bench/call/service/call_service.go +++ b/test/bench/call/service/call_service.go @@ -6,20 +6,31 @@ import ( "log" "net" "os" + "sync" + "time" armlog "github.com/jumboframes/armorigo/log" "github.com/jumboframes/armorigo/sigaction" "github.com/singchia/frontier/api/dataplane/v1/service" + "github.com/singchia/frontier/test/misc" "github.com/singchia/geminio" + "github.com/singchia/go-timer/v2" "github.com/spf13/pflag" ) +var ( + stats = map[string]uint64{} + updated bool + mtx = sync.RWMutex{} +) + func main() { network := pflag.String("network", "tcp", "network to dial") address := pflag.String("address", "127.0.0.1:30011", "address to dial") serviceName := pflag.String("service", "foo", "service name") loglevel := pflag.String("loglevel", "info", "log level, trace debug info warn error") printmessage := pflag.Bool("printmessage", false, "whether print message out") + printstats := pflag.Bool("printstats", false, "whether print topic stats") pflag.Parse() dialer := func() (net.Conn, error) { @@ -34,6 +45,19 @@ func main() { armlog.SetLevel(level) armlog.SetOutput(os.Stdout) + if *printstats { + t := timer.NewTimer() + t.Add(10*time.Second, timer.WithCyclically(), timer.WithHandler(func(e *timer.Event) { + mtx.Lock() + defer mtx.Unlock() + if !updated { + return + } + misc.PrintMap(stats) + updated = false + })) + } + // get service opt := []service.ServiceOption{service.OptionServiceLog(armlog.DefaultLog), service.OptionServiceName(*serviceName)} svc, err := service.NewService(dialer, opt...) @@ -44,6 +68,15 @@ func main() { // register svc.Register(context.TODO(), "echo", func(ctx context.Context, req geminio.Request, rsp geminio.Response) { value := req.Data() + mtx.Lock() + count, ok := stats[req.Method()] + if !ok { + stats[req.Method()] = 1 + } else { + stats[req.Method()] = count + 1 + } + updated = true + mtx.Unlock() if *printmessage { edgeID := req.ClientID() fmt.Printf("\n> call rpc, method: %s edgeID: %d streamID: %d data: %s\n", "echo", edgeID, req.StreamID(), string(value)) diff --git a/test/bench/open/service/open_service.go b/test/bench/open/service/open_service.go index 62996ee..be1799c 100644 --- a/test/bench/open/service/open_service.go +++ b/test/bench/open/service/open_service.go @@ -6,17 +6,23 @@ import ( "log" "net" "os" + "strconv" "sync" + "time" armlog "github.com/jumboframes/armorigo/log" "github.com/singchia/frontier/api/dataplane/v1/service" + "github.com/singchia/frontier/test/misc" "github.com/singchia/geminio" + "github.com/singchia/go-timer/v2" "github.com/spf13/pflag" ) var ( - sts = map[uint64]geminio.Stream{} - mtx sync.RWMutex + sts = map[uint64]geminio.Stream{} // streamID stream + stats = map[string]uint64{} // clientID count + updated bool + mtx sync.RWMutex ) func main() { @@ -24,6 +30,7 @@ func main() { address := pflag.String("address", "127.0.0.1:30011", "address to dial") serviceName := pflag.String("service", "foo", "service name") loglevel := pflag.String("loglevel", "info", "log level, trace debug info warn error") + printstats := pflag.Bool("printstats", false, "whether print topic stats") pflag.Parse() dialer := func() (net.Conn, error) { @@ -38,6 +45,20 @@ func main() { armlog.SetLevel(level) armlog.SetOutput(os.Stdout) + if *printstats { + t := timer.NewTimer() + t.Add(10*time.Second, timer.WithCyclically(), timer.WithHandler(func(e *timer.Event) { + mtx.Lock() + defer mtx.Unlock() + if !updated { + return + } + fmt.Printf("\033[2K\r stream number now: %d\n", len(sts)) + misc.PrintMap(stats) + updated = false + })) + } + // get service opt := []service.ServiceOption{service.OptionServiceLog(armlog.DefaultLog), service.OptionServiceName(*serviceName)} svc, err := service.NewService(dialer, opt...) @@ -56,9 +77,16 @@ func main() { fmt.Printf("> accept stream err: %s", err) continue } + clientID := strconv.FormatUint(st.ClientID(), 10) mtx.Lock() sts[st.StreamID()] = st - fmt.Print("\033[2K\r stream number:", len(sts)) + count, ok := stats[clientID] + if !ok { + stats[clientID] = 1 + } else { + stats[clientID] = count + 1 + } + updated = true mtx.Unlock() go func(st geminio.Stream) { @@ -67,8 +95,8 @@ func main() { if err == io.EOF { mtx.Lock() delete(sts, st.StreamID()) + updated = true mtx.Unlock() - fmt.Printf("\033[2K\r stream number: %d", len(sts)) } }(st) } diff --git a/test/bench/publish/service/publish_service.go b/test/bench/publish/service/publish_service.go index a193712..4b58fc3 100644 --- a/test/bench/publish/service/publish_service.go +++ b/test/bench/publish/service/publish_service.go @@ -7,12 +7,22 @@ import ( "log" "net" "os" + "sync" + "time" armlog "github.com/jumboframes/armorigo/log" "github.com/singchia/frontier/api/dataplane/v1/service" + "github.com/singchia/frontier/test/misc" + "github.com/singchia/go-timer/v2" "github.com/spf13/pflag" ) +var ( + stats = map[string]uint64{} + updated bool + mtx = sync.RWMutex{} +) + func main() { network := pflag.String("network", "tcp", "network to dial") address := pflag.String("address", "127.0.0.1:30011", "address to dial") @@ -20,6 +30,7 @@ func main() { topic := pflag.String("topic", "bench", "topic to specific") loglevel := pflag.String("loglevel", "info", "log level, trace debug info warn error") printmessage := pflag.Bool("printmessage", false, "whether print message out") + printstats := pflag.Bool("printstats", false, "whether print topic stats") pflag.Parse() dialer := func() (net.Conn, error) { @@ -34,6 +45,19 @@ func main() { armlog.SetLevel(level) armlog.SetOutput(os.Stdout) + if *printstats { + t := timer.NewTimer() + t.Add(10*time.Second, timer.WithCyclically(), timer.WithHandler(func(e *timer.Event) { + mtx.Lock() + defer mtx.Unlock() + if !updated { + return + } + misc.PrintMap(stats) + updated = false + })) + } + // get service opt := []service.ServiceOption{service.OptionServiceLog(armlog.DefaultLog), service.OptionServiceName(*serviceName)} if *topic != "" { @@ -54,6 +78,16 @@ func main() { fmt.Print(">>> ") continue } + mtx.Lock() + count, ok := stats[msg.Topic()] + if !ok { + stats[msg.Topic()] = 1 + } else { + stats[msg.Topic()] = count + 1 + } + updated = true + mtx.Unlock() + msg.Done() if *printmessage { value := msg.Data()