diff --git a/cmd/frontier/main.go b/cmd/frontier/main.go index 713ff90..5ac3a25 100644 --- a/cmd/frontier/main.go +++ b/cmd/frontier/main.go @@ -2,6 +2,9 @@ package main import ( "context" + "net/http" + _ "net/http/pprof" + "runtime" "github.com/jumboframes/armorigo/sigaction" "github.com/singchia/frontier/pkg/config" @@ -10,6 +13,7 @@ import ( "github.com/singchia/frontier/pkg/mq" "github.com/singchia/frontier/pkg/repo/dao" "github.com/singchia/frontier/pkg/servicebound" + "github.com/singchia/frontier/pkg/utils" "github.com/singchia/go-timer/v2" "k8s.io/klog/v2" ) @@ -20,6 +24,22 @@ func main() { klog.Errorf("parse flags err: %s", err) return } + // pprof + if conf.Daemon.PProf.Enable { + runtime.SetCPUProfileRate(conf.Daemon.PProf.CPUProfileRate) + go func() { + http.ListenAndServe(conf.Daemon.PProf.Addr, nil) + }() + } + // rlimit + if conf.Daemon.RLimit.Enable { + err = utils.SetRLimit(uint64(conf.Daemon.RLimit.NumFile)) + if err != nil { + klog.Errorf("set rlimit err: %s", err) + return + } + } + klog.Infof("frontier starts") defer func() { klog.Infof("frontier ends") @@ -32,21 +52,23 @@ func main() { klog.Errorf("new dao err: %s", err) return } - klog.V(5).Infof("new dao succeed") + klog.V(2).Infof("new dao succeed") + // mqm mqm, err := mq.NewMQM(conf) if err != nil { klog.Errorf("new mq manager err: %s", err) return } - klog.V(5).Infof("new mq manager succeed") + klog.V(2).Infof("new mq manager succeed") + // exchange exchange, err := exchange.NewExchange(conf, mqm) if err != nil { klog.Errorf("new exchange err: %s", err) return } - klog.V(5).Infof("new exchange succeed") + klog.V(2).Infof("new exchange succeed") tmr := timer.NewTimer() // servicebound @@ -56,7 +78,7 @@ func main() { return } go servicebound.Serve() - klog.V(5).Infof("new servicebound succeed") + klog.V(2).Infof("new servicebound succeed") // edgebound edgebound, err := edgebound.NewEdgebound(conf, dao, nil, exchange, tmr) @@ -65,11 +87,12 @@ func main() { return } go edgebound.Serve() - klog.V(5).Infof("new edgebound succeed") + klog.V(2).Infof("new edgebound succeed") sig := sigaction.NewSignal() sig.Wait(context.TODO()) edgebound.Close() servicebound.Close() + tmr.Close() } diff --git a/examples/iclm/edge/edge.go b/examples/iclm/edge/edge.go index 26eed5c..3b2e540 100644 --- a/examples/iclm/edge/edge.go +++ b/examples/iclm/edge/edge.go @@ -78,7 +78,7 @@ func main() { continue } msg.Done() - fmt.Println("> receive msg:", msg.ClientID(), msg.StreamID(), string(msg.Data())) + fmt.Printf("\n> receive msg, edgeID: %d streamID: %d data: %s\n", msg.ClientID(), msg.StreamID(), string(msg.Data())) fmt.Print(">>> ") } }() @@ -293,7 +293,7 @@ func handleStream(stream geminio.Stream) { return } msg.Done() - fmt.Println("> receive msg:", msg.ClientID(), msg.StreamID(), string(msg.Data())) + fmt.Printf("\n> receive msg, edgeID: %d streamID: %d data: %s\n", msg.ClientID(), msg.StreamID(), string(msg.Data())) fmt.Print(">>> ") } }() @@ -327,7 +327,8 @@ func handleStream(stream geminio.Stream) { } func echo(ctx context.Context, req geminio.Request, rsp geminio.Response) { - fmt.Println("\ncall > ", req.ClientID(), req.StreamID(), string(req.Data())) + edgeID := req.ClientID() + fmt.Printf("\n> call rpc, method: %s edgeID: %d streamID: %d data: %s\n", "echo", edgeID, req.StreamID(), string(req.Data())) fmt.Print(">>> ") rsp.SetData(req.Data()) } diff --git a/go.mod b/go.mod index 0c544cc..18cd23f 100644 --- a/go.mod +++ b/go.mod @@ -2,10 +2,8 @@ module github.com/singchia/frontier go 1.20 -replace github.com/singchia/geminio => ../../moresec/singchia/geminio - require ( - github.com/jumboframes/armorigo v0.3.0 + github.com/jumboframes/armorigo v0.4.0-rc.1 github.com/singchia/geminio v1.1.5-rc.1 github.com/singchia/go-timer/v2 v2.2.1 github.com/soheilhy/cmux v0.1.5 @@ -13,11 +11,11 @@ require ( gopkg.in/yaml.v2 v2.4.0 gorm.io/driver/sqlite v1.5.4 gorm.io/gorm v1.25.5 - k8s.io/klog/v2 v2.110.1 + k8s.io/klog/v2 v2.120.1 ) require ( - github.com/go-logr/logr v1.3.0 // indirect + github.com/go-logr/logr v1.4.1 // indirect github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect github.com/mattn/go-sqlite3 v1.14.17 // indirect diff --git a/go.sum b/go.sum index 2a9f957..84882c9 100644 --- a/go.sum +++ b/go.sum @@ -1,20 +1,22 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY= -github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= +github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= github.com/jumboframes/armorigo v0.2.3/go.mod h1:sXe0R32y6V3oJD2eXcPzMlimvZx0xIDiLedpQOy06t4= -github.com/jumboframes/armorigo v0.3.0 h1:oNAO7i+Kqm4i7x6G8pNcrCJYyNjyLvNrw9Fs0gRnYA8= -github.com/jumboframes/armorigo v0.3.0/go.mod h1:iwGCR/uQt36CSFfkPqIDMGdMQm/jGb3OZzPsL3Dbw6E= +github.com/jumboframes/armorigo v0.4.0-rc.1 h1:+9AM5ZLM/KdF0ldLvhbaSRFLIWbcynIrCJZ2G9FJrnk= +github.com/jumboframes/armorigo v0.4.0-rc.1/go.mod h1:H4OlF0Jj8e+8LkAqDjeLtapNNnUuUXR/h4Q32Lqgf9o= github.com/mattn/go-sqlite3 v1.14.17 h1:mCRHCLDUBXgpKAqIKsaAaAsrAlbkeomtRFKXh2L6YIM= github.com/mattn/go-sqlite3 v1.14.17/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/singchia/geminio v1.1.5-rc.1 h1:zUh+ilbLif23dgx6c/b8FGBKFoR9vLnjdoKUUG7o4QU= +github.com/singchia/geminio v1.1.5-rc.1/go.mod h1:CmDttmY18CGhpmeiVeGEBLXYKHIOoN3MSaEDMzfSOXA= github.com/singchia/go-timer/v2 v2.0.3/go.mod h1:PgkEQc6io8slCUiT5rHzWKU4/P2HXHWk3WWfijZXAf4= github.com/singchia/go-timer/v2 v2.2.1 h1:gJucmL99fkuNzGk2AfNPFpa1X3/4+aGO21KkjFAG624= github.com/singchia/go-timer/v2 v2.2.1/go.mod h1:PgkEQc6io8slCUiT5rHzWKU4/P2HXHWk3WWfijZXAf4= @@ -54,5 +56,5 @@ gorm.io/driver/sqlite v1.5.4 h1:IqXwXi8M/ZlPzH/947tn5uik3aYQslP9BVveoax0nV0= gorm.io/driver/sqlite v1.5.4/go.mod h1:qxAuCol+2r6PannQDpOP1FP6ag3mKi4esLnB/jHed+4= gorm.io/gorm v1.25.5 h1:zR9lOiiYf09VNh5Q1gphfyia1JpiClIWG9hQaxB/mls= gorm.io/gorm v1.25.5/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8= -k8s.io/klog/v2 v2.110.1 h1:U/Af64HJf7FcwMcXyKm2RPM22WZzyR7OSpYj5tg3cL0= -k8s.io/klog/v2 v2.110.1/go.mod h1:YGtd1984u+GgbuZ7e08/yBuAfKLSO0+uR1Fhi6ExXjo= +k8s.io/klog/v2 v2.120.1 h1:QXU6cPEOIslTGvZaXvFWiP9VKyeet3sawzTOvdXb4Vw= +k8s.io/klog/v2 v2.120.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= diff --git a/pkg/config/config.go b/pkg/config/config.go index fed0efa..698ef18 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -15,11 +15,14 @@ import ( // daemon related type RLimit struct { - NumFile int `yaml:"nofile"` + Enable bool `yaml:"enable"` + NumFile int `yaml:"nofile"` } type PProf struct { - Addr string `yaml:"addr"` + Enable bool `yaml:"enable"` + Addr string `yaml:"addr"` + CPUProfileRate int `yaml:"cpu_profile_rate"` } type Daemon struct { @@ -75,18 +78,8 @@ type MQ struct { // exchange type Exchange struct{} -type Log struct { - LogDir string `yaml:"log_dir"` - LogFile string `yaml:"log_file"` - LogFileMaxSizeMB uint64 `yaml:"log_file_max_size"` - ToStderr bool `yaml:"logtostderr"` - AlsoToStderr bool `yaml:"alsologtostderr"` - Verbosity int32 `yaml:"verbosity"` - AddDirHeader bool `yaml:"add_dir_header"` - SkipHeaders bool `yaml:"skip_headers"` - OneOutput bool `yaml:"one_output"` - SkipLogHeaders bool `yaml:"skip_log_headers"` - StderrThreshold int32 `yaml:"stderrthreshold"` +type Dao struct { + Debug bool `yaml:"debug"` } type Configuration struct { @@ -96,7 +89,7 @@ type Configuration struct { Servicebound Servicebound `yaml:"servicebound"` - Log Log `yaml:"log"` + Dao Dao `yaml:"dao"` } // Configuration accepts config file and command-line, and command-line is more privileged. @@ -155,7 +148,11 @@ func Parse() (*Configuration, error) { if config == nil { config = &Configuration{} } + // daemon config.Daemon.RLimit.NumFile = *argDaemonRLimitNofile + if config.Daemon.PProf.CPUProfileRate == 0 { + config.Daemon.PProf.CPUProfileRate = 10000 + } return config, nil } @@ -166,7 +163,8 @@ func genDefaultConfig(writer io.Writer) error { NumFile: 1024, }, PProf: PProf{ - Addr: "0.0.0.0:6060", + Enable: true, + Addr: "0.0.0.0:6060", }, }, Edgebound: Edgebound{ @@ -228,18 +226,9 @@ func genDefaultConfig(writer io.Writer) error { }, }, }, - Log: Log{ - LogDir: "/app/log", - LogFile: "frontier.log", - LogFileMaxSizeMB: 100, - ToStderr: false, - AlsoToStderr: false, - Verbosity: 4, - AddDirHeader: true, - SkipHeaders: true, - OneOutput: true, - SkipLogHeaders: true, - StderrThreshold: 1024, + + Dao: Dao{ + Debug: false, }, } data, err := yaml.Marshal(conf) diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 8955a26..bea213d 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -52,7 +52,6 @@ func TestParseFile(t *testing.T) { }, Edgebound: Edgebound{}, Servicebound: Servicebound{}, - Log: Log{}, } _, err := yaml.Marshal(conf) if err != nil { diff --git a/pkg/edgebound/edge_dataplane.go b/pkg/edgebound/edge_dataplane.go index 3a6159d..6374a08 100644 --- a/pkg/edgebound/edge_dataplane.go +++ b/pkg/edgebound/edge_dataplane.go @@ -9,7 +9,7 @@ func (em *edgeManager) acceptStream(stream geminio.Stream) { edgeID := stream.ClientID() streamID := stream.StreamID() meta := stream.Meta() - klog.V(5).Infof("edge accept stream, edgeID: %d, streamID: %d, meta: %s", edgeID, streamID, meta) + klog.V(2).Infof("edge accept stream, edgeID: %d, streamID: %d, meta: %s", edgeID, streamID, meta) // cache em.streams.MSet(edgeID, streamID, stream) @@ -23,7 +23,7 @@ func (em *edgeManager) closedStream(stream geminio.Stream) { edgeID := stream.ClientID() streamID := stream.StreamID() meta := stream.Meta() - klog.V(5).Infof("edge closed stream, edgeID: %d, streamID: %d, meta: %s", edgeID, streamID, meta) + klog.V(2).Infof("edge closed stream, edgeID: %d, streamID: %d, meta: %s", edgeID, streamID, meta) // cache em.streams.MDel(edgeID, streamID) // when the stream ends, the exchange can be noticed by functional error, so we don't update exchange @@ -33,7 +33,7 @@ func (em *edgeManager) closedStream(stream geminio.Stream) { func (em *edgeManager) forward(end geminio.End) { edgeID := end.ClientID() meta := end.Meta() - klog.V(5).Infof("edge forward raw message and rpc, edgeID: %d, meta: %s", edgeID, meta) + klog.V(2).Infof("edge forward raw message and rpc, edgeID: %d, meta: %s", edgeID, meta) if em.exchange != nil { em.exchange.ForwardToService(end) } diff --git a/pkg/edgebound/edge_manager.go b/pkg/edgebound/edge_manager.go index c8aff19..b191e36 100644 --- a/pkg/edgebound/edge_manager.go +++ b/pkg/edgebound/edge_manager.go @@ -9,6 +9,7 @@ import ( "strings" "sync" + "github.com/jumboframes/armorigo/log" "github.com/jumboframes/armorigo/rproxy" "github.com/jumboframes/armorigo/synchub" "github.com/singchia/frontier/pkg/api" @@ -244,7 +245,7 @@ func (em *edgeManager) Serve() { conn, err := em.geminioLn.Accept() if err != nil { if !strings.Contains(err.Error(), api.ErrStrUseOfClosedConnection) { - klog.V(4).Infof("edge manager listener accept err: %s", err) + klog.V(1).Infof("edge manager listener accept err: %s", err) } return } @@ -260,6 +261,7 @@ func (em *edgeManager) handleConn(conn net.Conn) error { // stream handler opt.SetAcceptStreamFunc(em.acceptStream) opt.SetClosedStreamFunc(em.closedStream) + opt.SetLog(log.NewKLog()) end, err := server.NewEndWithConn(conn, opt) if err != nil { klog.Errorf("edge manager geminio server new end err: %s", err) diff --git a/pkg/edgebound/edge_onoff.go b/pkg/edgebound/edge_onoff.go index 88d1f71..ec2fac2 100644 --- a/pkg/edgebound/edge_onoff.go +++ b/pkg/edgebound/edge_onoff.go @@ -63,7 +63,7 @@ func (em *edgeManager) offline(edgeID uint64, addr net.Addr) error { if end != nil && end.RemoteAddr().String() == addr.String() { legacy = true delete(em.edges, edgeID) - klog.V(5).Infof("edge offline, edgeID: %d, remote addr: %s", edgeID, end.RemoteAddr().String()) + klog.V(2).Infof("edge offline, edgeID: %d, remote addr: %s", edgeID, end.RemoteAddr().String()) } } else { klog.Warningf("edge offline, edgeID: %d not found in cache", edgeID) @@ -98,7 +98,7 @@ func (em *edgeManager) ConnOnline(d delegate.ConnDescriber) error { meta := d.Meta() addr := d.RemoteAddr() - klog.V(4).Infof("edge online, edgeID: %d, meta: %s, addr: %s", edgeID, string(meta), addr) + klog.V(1).Infof("edge online, edgeID: %d, meta: %s, addr: %s", edgeID, string(meta), addr) // inform others if em.informer != nil { em.informer.EdgeOnline(edgeID, d.Meta(), addr) @@ -118,7 +118,7 @@ func (em *edgeManager) ConnOffline(d delegate.ConnDescriber) error { meta := d.Meta() addr := d.RemoteAddr() - klog.V(4).Infof("edge offline, edgeID: %d, meta: %s, addr: %s", edgeID, string(meta), addr) + klog.V(1).Infof("edge offline, edgeID: %d, meta: %s, addr: %s", edgeID, string(meta), addr) // offline the cache err := em.offline(edgeID, addr) if err != nil { @@ -141,7 +141,7 @@ func (em *edgeManager) Heartbeat(d delegate.ConnDescriber) error { edgeID := d.ClientID() meta := string(d.Meta()) addr := d.RemoteAddr() - klog.V(6).Infof("edge heartbeat, edgeID: %d, meta: %s, addr: %s", edgeID, string(meta), addr) + klog.V(3).Infof("edge heartbeat, edgeID: %d, meta: %s, addr: %s", edgeID, string(meta), addr) if em.informer != nil { em.informer.EdgeHeartbeat(edgeID, d.Meta(), addr) } @@ -149,7 +149,7 @@ func (em *edgeManager) Heartbeat(d delegate.ConnDescriber) error { } func (em *edgeManager) RemoteRegistration(rpc string, edgeID, streamID uint64) { - klog.V(5).Infof("edge remote rpc registration, rpc: %s, edgeID: %d, streamID: %d", rpc, edgeID, streamID) + klog.V(3).Infof("edge remote rpc registration, rpc: %s, edgeID: %d, streamID: %d", rpc, edgeID, streamID) // memdb er := &model.EdgeRPC{ @@ -171,14 +171,14 @@ func (em *edgeManager) GetClientID(meta []byte) (uint64, error) { if em.exchange != nil { edgeID, err = em.exchange.GetEdgeID(meta) if err == nil { - klog.V(5).Infof("edge get edgeID: %d from exchange, meta: %s", edgeID, string(meta)) + klog.V(2).Infof("edge get edgeID: %d from exchange, meta: %s", edgeID, string(meta)) return edgeID, nil } } if err == api.ErrServiceNotOnline && em.conf.Edgebound.EdgeIDAllocWhenNoIDServiceOn { edgeID = em.idFactory.GetID() - klog.V(5).Infof("edge get edgeID: %d, meta: %s, after no ID acquired from exchange", edgeID, string(meta)) + klog.V(2).Infof("edge get edgeID: %d, meta: %s, after no ID acquired from exchange", edgeID, string(meta)) return em.idFactory.GetID(), nil } return 0, err diff --git a/pkg/exchange/forward.go b/pkg/exchange/forward.go index 4468843..eef84c3 100644 --- a/pkg/exchange/forward.go +++ b/pkg/exchange/forward.go @@ -22,7 +22,7 @@ func (ex *exchange) ForwardToEdge(meta *api.Meta, end geminio.End) { func (ex *exchange) forwardRawToEdge(end geminio.End) { //drop the io, actually we won't be here go func() { - klog.V(6).Infof("exchange forward raw to edge, discard for now, serviceID: %d", end.ClientID()) + klog.V(3).Infof("exchange forward raw to edge, discard for now, serviceID: %d", end.ClientID()) io.Copy(io.Discard, end) }() } @@ -39,14 +39,15 @@ func (ex *exchange) forwardRPCToEdge(end geminio.End) { // get edge edge := ex.Edgebound.GetEdgeByID(edgeID) if edge == nil { - klog.V(4).Infof("service forward rpc, serviceID: %d, call edgeID: %d, is not online", serviceID, edgeID) + klog.V(1).Infof("service forward rpc, serviceID: %d, call edgeID: %d, is not online", serviceID, edgeID) r2.SetError(api.ErrEdgeNotOnline) return } // call edge + r1.SetClientID(edge.ClientID()) r3, err := edge.Call(ctx, method, r1) if err != nil { - klog.V(5).Infof("service forward rpc, serviceID: %d, call edgeID: %d, err: %s", serviceID, edgeID, err) + klog.V(2).Infof("service forward rpc, serviceID: %d, call edgeID: %d, err: %s", serviceID, edgeID, err) r2.SetError(err) return } @@ -73,13 +74,13 @@ func (ex *exchange) forwardMessageToEdge(end geminio.End) { msg, err := end.Receive(context.TODO()) if err != nil { if err == io.EOF { - klog.V(5).Infof("service forward message, serviceID: %d, receive EOF", serviceID) + klog.V(2).Infof("service forward message, serviceID: %d, receive EOF", serviceID) return } klog.Errorf("service forward message, serviceID: %d, receive err: %s", serviceID, err) continue } - klog.V(7).Infof("service forward message, receive msg: %s from: %d", string(msg.Data()), end.ClientID()) + klog.V(2).Infof("service forward message, receive msg: %s from: %d", string(msg.Data()), end.ClientID()) // get target edgeID custom := msg.Custom() edgeID := binary.BigEndian.Uint64(custom[len(custom)-8:]) @@ -88,7 +89,7 @@ func (ex *exchange) forwardMessageToEdge(end geminio.End) { // get edge edge := ex.Edgebound.GetEdgeByID(edgeID) if edge == nil { - klog.V(4).Infof("service forward message, serviceID: %d, the edge: %d is not online", serviceID, edgeID) + klog.V(1).Infof("service forward message, serviceID: %d, the edge: %d is not online", serviceID, edgeID) msg.Error(api.ErrEdgeNotOnline) return } @@ -96,7 +97,7 @@ func (ex *exchange) forwardMessageToEdge(end geminio.End) { msg.SetClientID(edgeID) err = edge.Publish(context.TODO(), msg) if err != nil { - klog.V(5).Infof("service forward message, serviceID: %d, publish edge: %d err: %s", serviceID, edgeID, err) + klog.V(2).Infof("service forward message, serviceID: %d, publish edge: %d err: %s", serviceID, edgeID, err) msg.Error(err) return } @@ -118,7 +119,7 @@ func (ex *exchange) ForwardToService(end geminio.End) { func (ex *exchange) forwardRawToService(end geminio.End) { //drop the io, actually we won't be here go func() { - klog.V(6).Infof("exchange forward raw to service, discard for now, edgeID: %d", end.ClientID()) + klog.V(3).Infof("exchange forward raw to service, discard for now, edgeID: %d", end.ClientID()) io.Copy(io.Discard, end) }() } @@ -131,7 +132,7 @@ func (ex *exchange) forwardRPCToService(end geminio.End) { // get service svc, err := ex.Servicebound.GetServiceByRPC(method) if err != nil { - klog.Errorf("exchange forward rpc to service, get service by rpc err: %s, edgeID: %d", err, edgeID) + klog.V(2).Infof("exchange forward rpc to service, get service by rpc err: %s, edgeID: %d", err, edgeID) r2.SetError(err) return } @@ -154,7 +155,7 @@ func (ex *exchange) forwardRPCToService(end geminio.End) { r2.SetError(err) return } - klog.V(6).Infof("edge forward rpc to service, call service: %d rpc: %s success, edgeID: %d", serviceID, method, edgeID) + klog.V(3).Infof("edge forward rpc to service, call service: %d rpc: %s success, edgeID: %d", serviceID, method, edgeID) r2.SetData(r3.Data()) }) } @@ -167,7 +168,7 @@ func (ex *exchange) forwardMessageToService(end geminio.End) { msg, err := end.Receive(context.TODO()) if err != nil { if err == io.EOF { - klog.V(5).Infof("edge forward message, edgeID: %d, receive EOF", edgeID) + klog.V(2).Infof("edge forward message, edgeID: %d, receive EOF", edgeID) return } klog.Errorf("edge forward message, receive err: %s, edgeID: %d, ", err, edgeID) diff --git a/pkg/exchange/oob.go b/pkg/exchange/oob.go index d5494a5..bd38253 100644 --- a/pkg/exchange/oob.go +++ b/pkg/exchange/oob.go @@ -13,7 +13,7 @@ import ( func (ex *exchange) GetEdgeID(meta []byte) (uint64, error) { svc, err := ex.Servicebound.GetServiceByRPC(api.RPCGetEdgeID) if err != nil { - klog.Errorf("exchange get edgeID, get service err: %s, meta: %s", err, string(meta)) + klog.V(2).Infof("exchange get edgeID, get service err: %s, meta: %s", err, string(meta)) if err == api.ErrRecordNotFound { return 0, api.ErrServiceNotOnline } @@ -23,7 +23,7 @@ func (ex *exchange) GetEdgeID(meta []byte) (uint64, error) { req := svc.NewRequest(meta) rsp, err := svc.Call(context.TODO(), api.RPCGetEdgeID, req) if err != nil { - klog.V(5).Infof("exchange call service: %d, get edgeID err: %s, meta: %s", svc.ClientID(), err, meta) + klog.V(2).Infof("exchange call service: %d, get edgeID err: %s, meta: %s", svc.ClientID(), err, meta) return 0, err } data := rsp.Data() @@ -36,7 +36,7 @@ func (ex *exchange) GetEdgeID(meta []byte) (uint64, error) { func (ex *exchange) EdgeOnline(edgeID uint64, meta []byte, addr net.Addr) error { svc, err := ex.Servicebound.GetServiceByRPC(api.RPCEdgeOnline) if err != nil { - klog.Errorf("exchange edge online, get service err: %s, edgeID: %d, meta: %s, addr: %s", err, edgeID, string(meta), addr) + klog.V(2).Infof("exchange edge online, get service err: %s, edgeID: %d, meta: %s, addr: %s", err, edgeID, string(meta), addr) if err == api.ErrRecordNotFound { return api.ErrServiceNotOnline } @@ -58,7 +58,7 @@ func (ex *exchange) EdgeOnline(edgeID uint64, meta []byte, addr net.Addr) error req := svc.NewRequest(data) _, err = svc.Call(context.TODO(), api.RPCEdgeOnline, req) if err != nil { - klog.V(5).Infof("exchange call service: %d, edge online err: %s, meta: %s, addr: %s", svc.ClientID(), err, meta, addr) + klog.V(2).Infof("exchange call service: %d, edge online err: %s, meta: %s, addr: %s", svc.ClientID(), err, meta, addr) return err } return nil @@ -67,7 +67,7 @@ func (ex *exchange) EdgeOnline(edgeID uint64, meta []byte, addr net.Addr) error func (ex *exchange) EdgeOffline(edgeID uint64, meta []byte, addr net.Addr) error { svc, err := ex.Servicebound.GetServiceByRPC(api.RPCEdgeOffline) if err != nil { - klog.Errorf("exchange edge offline, get service err: %s, edgeID: %d, meta: %s, addr: %s", err, edgeID, string(meta), addr) + klog.V(2).Infof("exchange edge offline, get service err: %s, edgeID: %d, meta: %s, addr: %s", err, edgeID, string(meta), addr) if err == api.ErrRecordNotFound { return api.ErrServiceNotOnline } @@ -89,7 +89,7 @@ func (ex *exchange) EdgeOffline(edgeID uint64, meta []byte, addr net.Addr) error req := svc.NewRequest(data) _, err = svc.Call(context.TODO(), api.RPCEdgeOffline, req) if err != nil { - klog.V(5).Infof("exchange call service: %d, edge offline err: %s, meta: %s, addr: %s", svc.ClientID(), err, meta, addr) + klog.V(2).Infof("exchange call service: %d, edge offline err: %s, meta: %s, addr: %s", svc.ClientID(), err, meta, addr) return err } return nil diff --git a/pkg/mq/mq_manager.go b/pkg/mq/mq_manager.go index 649ad48..ad009e6 100644 --- a/pkg/mq/mq_manager.go +++ b/pkg/mq/mq_manager.go @@ -38,14 +38,14 @@ func (mqm *mqManager) AddMQ(topics []string, mq api.MQ) { for _, topic := range topics { mqs, ok := mqm.mqs[topic] if !ok { - klog.V(5).Infof("mq manager, add topic: %s mq succeed", topic) + klog.V(2).Infof("mq manager, add topic: %s mq succeed", topic) mqm.mqs[topic] = []api.MQ{mq} mqm.mqindex[topic] = new(uint64) continue } for _, exist := range mqs { if exist == mq { - klog.V(5).Infof("mq manager, add topic: %s mq existed", topic) + klog.V(2).Infof("mq manager, add topic: %s mq existed", topic) continue } // special handle for service, a deep comparison @@ -53,14 +53,14 @@ func (mqm *mqManager) AddMQ(topics []string, mq api.MQ) { if ok { right, ok := mq.(*mqService) if ok && left.end == right.end { - klog.V(5).Infof("mq manager, add topic: %s service mq existed", topic) + klog.V(2).Infof("mq manager, add topic: %s service mq existed", topic) continue } } } mqs = append(mqs, mq) mqm.mqs[topic] = mqs - klog.V(5).Infof("mq mqnager, add topic: %s mq succeed", topic) + klog.V(2).Infof("mq mqnager, add topic: %s mq succeed", topic) } } @@ -77,7 +77,7 @@ func (mqm *mqManager) DelMQ(mq api.MQ) { news := []api.MQ{} for _, exist := range mqs { if exist == mq { - klog.V(6).Infof("mq manager, del topic: %s mq succeed", topic) + klog.V(3).Infof("mq manager, del topic: %s mq succeed", topic) continue } news = append(news, exist) @@ -103,7 +103,7 @@ func (mqm *mqManager) DelMQByEnd(end geminio.End) { left, ok := exist.(*mqService) if ok { if ok && left.end == end { - klog.V(6).Infof("mq manager, del topic: %s service mq succeed", topic) + klog.V(3).Infof("mq manager, del topic: %s service mq succeed", topic) continue } } @@ -154,7 +154,7 @@ func (mqm *mqManager) Produce(topic string, data []byte, opts ...api.OptionProdu klog.Errorf("mq manager, produce topic: %s message err: %s", topic, err) return err } - klog.V(6).Infof("mq manager, produce topic: %s message succeed", topic) + klog.V(3).Infof("mq manager, produce topic: %s message succeed", topic) return nil } diff --git a/pkg/repo/dao/dao_edge.go b/pkg/repo/dao/dao_edge.go index f11b956..485012d 100644 --- a/pkg/repo/dao/dao_edge.go +++ b/pkg/repo/dao/dao_edge.go @@ -17,7 +17,7 @@ type EdgeQuery struct { func (dao *Dao) ListEdges(query *EdgeQuery) ([]*model.Edge, error) { tx := dao.dbEdge.Model(&model.Edge{}) - if dao.config.Log.Verbosity >= 4 { + if dao.config.Dao.Debug { tx = tx.Debug() } tx = buildEdgeQuery(tx, query) @@ -48,7 +48,7 @@ func (dao *Dao) ListEdges(query *EdgeQuery) ([]*model.Edge, error) { func (dao *Dao) CountEdges(query *EdgeQuery) (int64, error) { tx := dao.dbEdge.Model(&model.Edge{}) - if dao.config.Log.Verbosity >= 4 { + if dao.config.Dao.Debug { tx = tx.Debug() } tx = buildEdgeQuery(tx, query) @@ -60,7 +60,7 @@ func (dao *Dao) CountEdges(query *EdgeQuery) (int64, error) { func (dao *Dao) GetEdge(edgeID uint64) (*model.Edge, error) { tx := dao.dbEdge.Model(&model.Edge{}) - if dao.config.Log.Verbosity >= 4 { + if dao.config.Dao.Debug { tx = tx.Debug() } tx = tx.Where("edge_id = ?", edgeID) @@ -77,7 +77,7 @@ type EdgeDelete struct { func (dao *Dao) DeleteEdge(delete *EdgeDelete) error { tx := dao.dbEdge - if dao.config.Log.Verbosity >= 4 { + if dao.config.Dao.Debug { tx = tx.Debug() } tx = buildEdgeDelete(tx, delete) @@ -86,7 +86,7 @@ func (dao *Dao) DeleteEdge(delete *EdgeDelete) error { func (dao *Dao) CreateEdge(edge *model.Edge) error { tx := dao.dbEdge - if dao.config.Log.Verbosity >= 4 { + if dao.config.Dao.Debug { tx = tx.Debug() } return tx.Create(edge).Error @@ -134,7 +134,7 @@ type EdgeRPCQuery struct { func (dao *Dao) ListEdgeRPCs(query *EdgeRPCQuery) ([]string, error) { tx := dao.dbEdge.Model(&model.EdgeRPC{}) - if dao.config.Log.Verbosity >= 4 { + if dao.config.Dao.Debug { tx = tx.Debug() } tx = buildEdgeRPCQuery(tx, query) @@ -165,7 +165,7 @@ func (dao *Dao) ListEdgeRPCs(query *EdgeRPCQuery) ([]string, error) { func (dao *Dao) CountEdgeRPCs(query *EdgeRPCQuery) (int64, error) { tx := dao.dbEdge.Model(&model.EdgeRPC{}) - if dao.config.Log.Verbosity >= 4 { + if dao.config.Dao.Debug { tx = tx.Debug() } tx = buildEdgeRPCQuery(tx, query) @@ -178,7 +178,7 @@ func (dao *Dao) CountEdgeRPCs(query *EdgeRPCQuery) (int64, error) { func (dao *Dao) DeleteEdgeRPCs(edgeID uint64) error { tx := dao.dbEdge.Where("edge_id = ?", edgeID) - if dao.config.Log.Verbosity >= 4 { + if dao.config.Dao.Debug { tx = tx.Debug() } return tx.Delete(&model.EdgeRPC{}).Error @@ -186,7 +186,7 @@ func (dao *Dao) DeleteEdgeRPCs(edgeID uint64) error { func (dao *Dao) CreateEdgeRPC(rpc *model.EdgeRPC) error { tx := dao.dbEdge - if dao.config.Log.Verbosity >= 4 { + if dao.config.Dao.Debug { tx = tx.Debug() } return tx.Create(rpc).Error diff --git a/pkg/repo/dao/dao_service.go b/pkg/repo/dao/dao_service.go index 75bfc03..43d5cc8 100644 --- a/pkg/repo/dao/dao_service.go +++ b/pkg/repo/dao/dao_service.go @@ -19,7 +19,7 @@ type ServiceQuery struct { // service func (dao *Dao) ListServices(query *ServiceQuery) ([]*model.Service, error) { tx := dao.dbService.Model(&model.Service{}) - if dao.config.Log.Verbosity >= 4 { + if dao.config.Dao.Debug { tx = tx.Debug() } tx = buildServiceQuery(tx, query) @@ -50,7 +50,7 @@ func (dao *Dao) ListServices(query *ServiceQuery) ([]*model.Service, error) { func (dao *Dao) CountServices(query *ServiceQuery) (int64, error) { tx := dao.dbService.Model(&model.Edge{}) - if dao.config.Log.Verbosity >= 4 { + if dao.config.Dao.Debug { tx = tx.Debug() } tx = buildServiceQuery(tx, query) @@ -62,7 +62,7 @@ func (dao *Dao) CountServices(query *ServiceQuery) (int64, error) { func (dao *Dao) GetService(serviceID uint64) (*model.Service, error) { tx := dao.dbService.Model(&model.Service{}) - if dao.config.Log.Verbosity >= 4 { + if dao.config.Dao.Debug { tx = tx.Debug() } tx = tx.Where("service_id = ?", serviceID).Limit(1) @@ -82,7 +82,7 @@ type ServiceDelete struct { func (dao *Dao) DeleteService(delete *ServiceDelete) error { tx := dao.dbService - if dao.config.Log.Verbosity >= 4 { + if dao.config.Dao.Debug { tx = tx.Debug() } tx = buildServiceDelete(tx, delete) @@ -91,7 +91,7 @@ func (dao *Dao) DeleteService(delete *ServiceDelete) error { func (dao *Dao) CreateService(service *model.Service) error { var tx *gorm.DB - if dao.config.Log.Verbosity >= 4 { + if dao.config.Dao.Debug { tx = tx.Debug() } tx = dao.dbService.Create(service) @@ -145,7 +145,7 @@ type ServiceRPCQuery struct { func (dao *Dao) GetServiceRPC(rpc string) (*model.ServiceRPC, error) { tx := dao.dbService.Model(&model.ServiceRPC{}) - if dao.config.Log.Verbosity >= 4 { + if dao.config.Dao.Debug { tx = tx.Debug() } tx = tx.Where("rpc = ?", rpc).Limit(1) @@ -162,7 +162,7 @@ func (dao *Dao) GetServiceRPC(rpc string) (*model.ServiceRPC, error) { func (dao *Dao) ListServiceRPCs(query *ServiceRPCQuery) ([]string, error) { tx := dao.dbService.Model(&model.ServiceRPC{}) - if dao.config.Log.Verbosity >= 4 { + if dao.config.Dao.Debug { tx = tx.Debug() } tx = buildServiceRPCQuery(tx, query) @@ -192,7 +192,7 @@ func (dao *Dao) ListServiceRPCs(query *ServiceRPCQuery) ([]string, error) { func (dao *Dao) CountServiceRPCs(query *ServiceRPCQuery) (int64, error) { tx := dao.dbService.Model(&model.ServiceRPC{}) - if dao.config.Log.Verbosity >= 4 { + if dao.config.Dao.Debug { tx = tx.Debug() } tx = buildServiceRPCQuery(tx, query) @@ -204,7 +204,7 @@ func (dao *Dao) CountServiceRPCs(query *ServiceRPCQuery) (int64, error) { func (dao *Dao) DeleteServiceRPCs(serviceID uint64) error { tx := dao.dbService.Where("service_id = ?", serviceID) - if dao.config.Log.Verbosity >= 4 { + if dao.config.Dao.Debug { tx = tx.Debug() } return tx.Delete(&model.ServiceRPC{}).Error @@ -212,7 +212,7 @@ func (dao *Dao) DeleteServiceRPCs(serviceID uint64) error { func (dao *Dao) CreateServiceRPC(rpc *model.ServiceRPC) error { tx := dao.dbService - if dao.config.Log.Verbosity >= 4 { + if dao.config.Dao.Debug { tx = tx.Debug() } return tx.Create(rpc).Error @@ -244,7 +244,7 @@ type ServiceTopicQuery struct { func (dao *Dao) GetServiceTopic(topic string) (*model.ServiceTopic, error) { tx := dao.dbService.Model(&model.ServiceTopic{}) - if dao.config.Log.Verbosity >= 4 { + if dao.config.Dao.Debug { tx = tx.Debug() } tx = tx.Where("topic = ?", topic).Limit(1) @@ -259,7 +259,7 @@ func (dao *Dao) GetServiceTopic(topic string) (*model.ServiceTopic, error) { func (dao *Dao) ListServiceTopics(query *ServiceTopicQuery) ([]string, error) { tx := dao.dbService.Model(&model.ServiceTopic{}) - if dao.config.Log.Verbosity >= 4 { + if dao.config.Dao.Debug { tx = tx.Debug() } tx = buildServiceTopicQuery(tx, query) @@ -290,7 +290,7 @@ func (dao *Dao) ListServiceTopics(query *ServiceTopicQuery) ([]string, error) { func (dao *Dao) CountServiceTopics(query *ServiceTopicQuery) (int64, error) { tx := dao.dbService.Model(&model.ServiceTopic{}) - if dao.config.Log.Verbosity >= 4 { + if dao.config.Dao.Debug { tx = tx.Debug() } tx = buildServiceTopicQuery(tx, query) @@ -302,7 +302,7 @@ func (dao *Dao) CountServiceTopics(query *ServiceTopicQuery) (int64, error) { func (dao *Dao) DeleteServiceTopics(serviceID uint64) error { tx := dao.dbService.Where("service_id = ?", serviceID) - if dao.config.Log.Verbosity >= 4 { + if dao.config.Dao.Debug { tx = tx.Debug() } return tx.Delete(&model.ServiceTopic{}).Error @@ -310,7 +310,7 @@ func (dao *Dao) DeleteServiceTopics(serviceID uint64) error { func (dao *Dao) CreateServiceTopic(topic *model.ServiceTopic) error { tx := dao.dbService - if dao.config.Log.Verbosity >= 4 { + if dao.config.Dao.Debug { tx = tx.Debug() } return tx.Create(topic).Error diff --git a/pkg/servicebound/service_dataplane.go b/pkg/servicebound/service_dataplane.go index 8698969..9e26ef8 100644 --- a/pkg/servicebound/service_dataplane.go +++ b/pkg/servicebound/service_dataplane.go @@ -10,7 +10,7 @@ func (sm *serviceManager) acceptStream(stream geminio.Stream) { serviceID := stream.ClientID() streamID := stream.StreamID() service := stream.Meta() - klog.V(5).Infof("service accept stream, serviceID: %d, streamID: %d, service: %s", serviceID, streamID, service) + klog.V(2).Infof("service accept stream, serviceID: %d, streamID: %d, service: %s", serviceID, streamID, service) // cache sm.streams.MSet(serviceID, streamID, stream) @@ -24,7 +24,7 @@ func (sm *serviceManager) closedStream(stream geminio.Stream) { serviceID := stream.ClientID() streamID := stream.StreamID() service := stream.Meta() - klog.V(5).Infof("service closed stream, serviceID: %d, streamID: %d, service: %s", serviceID, streamID, service) + klog.V(2).Infof("service closed stream, serviceID: %d, streamID: %d, service: %s", serviceID, streamID, service) // cache sm.streams.MDel(serviceID, streamID) // when the stream ends, the exchange can be noticed by functional error, so we don't update exchange @@ -34,7 +34,7 @@ func (sm *serviceManager) closedStream(stream geminio.Stream) { func (sm *serviceManager) forward(meta *api.Meta, end geminio.End) { serviceID := end.ClientID() service := meta.Service - klog.V(5).Infof("service forward raw message and rpc, serviceID: %d, service: %s", serviceID, service) + klog.V(2).Infof("service forward raw message and rpc, serviceID: %d, service: %s", serviceID, service) if sm.exchange != nil { sm.exchange.ForwardToEdge(meta, end) } diff --git a/pkg/servicebound/service_manager.go b/pkg/servicebound/service_manager.go index 0263056..77da0f4 100644 --- a/pkg/servicebound/service_manager.go +++ b/pkg/servicebound/service_manager.go @@ -10,6 +10,7 @@ import ( "sync" "time" + "github.com/jumboframes/armorigo/log" "github.com/jumboframes/armorigo/synchub" "github.com/singchia/frontier/pkg/api" "github.com/singchia/frontier/pkg/config" @@ -153,7 +154,7 @@ func (sm *serviceManager) Serve() { conn, err := sm.ln.Accept() if err != nil { if !strings.Contains(err.Error(), api.ErrStrUseOfClosedConnection) { - klog.V(4).Infof("service manager listener accept err: %s", err) + klog.V(1).Infof("service manager listener accept err: %s", err) } return } @@ -169,6 +170,7 @@ func (sm *serviceManager) handleConn(conn net.Conn) error { // stream handler opt.SetAcceptStreamFunc(sm.acceptStream) opt.SetClosedStreamFunc(sm.closedStream) + opt.SetLog(log.NewKLog()) end, err := server.NewEndWithConn(conn, opt) if err != nil { klog.Errorf("service manager geminio server new end err: %s", err) @@ -199,7 +201,7 @@ func (sm *serviceManager) handleConn(conn net.Conn) error { // topics func (sm *serviceManager) remoteReceiveClaim(serviceID uint64, topics []string) error { - klog.V(5).Infof("service remote receive claim, topics: %v, serviceID: %d", topics, serviceID) + klog.V(2).Infof("service remote receive claim, topics: %v, serviceID: %d", topics, serviceID) var err error // memdb for _, topic := range topics { @@ -219,7 +221,7 @@ func (sm *serviceManager) remoteReceiveClaim(serviceID uint64, topics []string) // rpc, RemoteRegistration is called from underlayer func (sm *serviceManager) RemoteRegistration(rpc string, serviceID, streamID uint64) { // TODO return error - klog.V(5).Infof("service remote rpc registration, rpc: %s, serviceID: %d, streamID: %d", rpc, serviceID, streamID) + klog.V(2).Infof("service remote rpc registration, rpc: %s, serviceID: %d, streamID: %d", rpc, serviceID, streamID) // memdb sr := &model.ServiceRPC{ @@ -246,7 +248,7 @@ func (sm *serviceManager) GetServiceByRPC(rpc string) (geminio.End, error) { mrpc, err := sm.dao.GetServiceRPC(rpc) if err != nil { - klog.Errorf("get service by rpc: %s, err: %s", rpc, err) + klog.V(2).Infof("get service by rpc: %s, err: %s", rpc, err) return nil, err } @@ -259,7 +261,7 @@ func (sm *serviceManager) GetServiceByTopic(topic string) (geminio.End, error) { mtopic, err := sm.dao.GetServiceTopic(topic) if err != nil { - klog.Errorf("get service by topic: %s, err: %s", topic, err) + klog.V(2).Infof("get service by topic: %s, err: %s", topic, err) return nil, err } diff --git a/pkg/servicebound/service_onoff.go b/pkg/servicebound/service_onoff.go index 8aa3d28..ff3774d 100644 --- a/pkg/servicebound/service_onoff.go +++ b/pkg/servicebound/service_onoff.go @@ -91,7 +91,7 @@ func (sm *serviceManager) offline(serviceID uint64, addr net.Addr) error { klog.Errorf("service offline, dao delete service rpcs err: %s, serviceID: %d", err, serviceID) return err } - klog.V(5).Infof("service offline, remote rpc de-register succeed, serviceID: %d", serviceID) + klog.V(2).Infof("service offline, remote rpc de-register succeed, serviceID: %d", serviceID) if err := sm.dao.DeleteServiceTopics(serviceID); err != nil { klog.Errorf("service offline, dao delete service topics err: %s, serviceID: %d", err, serviceID) @@ -103,7 +103,7 @@ func (sm *serviceManager) offline(serviceID uint64, addr net.Addr) error { sm.mqm.DelMQByEnd(value) } - klog.V(5).Infof("service offline, remote topics declaim succeed, serviceID: %d", serviceID) + klog.V(2).Infof("service offline, remote topics declaim succeed, serviceID: %d", serviceID) return nil } @@ -112,7 +112,7 @@ func (sm *serviceManager) ConnOnline(d delegate.ConnDescriber) error { serviceID := d.ClientID() meta := string(d.Meta()) addr := d.RemoteAddr() - klog.V(4).Infof("service online, serviceID: %d, service: %s, addr: %s", serviceID, meta, addr) + klog.V(1).Infof("service online, serviceID: %d, service: %s, addr: %s", serviceID, meta, addr) // notification for others if sm.informer != nil { sm.informer.ServiceOnline(serviceID, meta, addr) @@ -124,7 +124,7 @@ func (sm *serviceManager) ConnOffline(d delegate.ConnDescriber) error { serviceID := d.ClientID() meta := string(d.Meta()) addr := d.RemoteAddr() - klog.V(5).Infof("service offline, serviceID: %d, service: %s, remote addr: %s", serviceID, meta, addr) + klog.V(2).Infof("service offline, serviceID: %d, service: %s, remote addr: %s", serviceID, meta, addr) // offline the cache err := sm.offline(serviceID, addr) if err != nil { @@ -143,7 +143,7 @@ func (sm *serviceManager) Heartbeat(d delegate.ConnDescriber) error { serviceID := d.ClientID() meta := string(d.Meta()) addr := d.RemoteAddr() - klog.V(6).Infof("service heartbeat, serviceID: %d, meta: %s, addr: %s", serviceID, string(meta), addr) + klog.V(3).Infof("service heartbeat, serviceID: %d, meta: %s, addr: %s", serviceID, string(meta), addr) if sm.informer != nil { sm.informer.ServiceHeartbeat(serviceID, meta, addr) } diff --git a/test/bench/Makefile b/test/bench/Makefile index 605979c..e62b68a 100644 --- a/test/bench/Makefile +++ b/test/bench/Makefile @@ -5,10 +5,13 @@ all: publish .PHONY: publish publish: make -C publish + make -C call .PHONY: clean clean: make clean -C publish + make clean -C call bench: - make bench -C publish \ No newline at end of file + make bench -C publish + make bench -C call \ No newline at end of file diff --git a/test/bench/publish/edge/publish_edge.go b/test/bench/publish/edge/publish_edge.go index 568f5b6..2e362e0 100644 --- a/test/bench/publish/edge/publish_edge.go +++ b/test/bench/publish/edge/publish_edge.go @@ -81,9 +81,9 @@ func benchPublish(topic string, count int64) { err := edge.Publish(context.TODO(), topic, msg) if err != nil { atomic.AddInt64(&failed, 1) - } else { - atomic.AddInt64(&success, 1) + continue } + atomic.AddInt64(&success, 1) } }(e) } diff --git a/test/bench/publish/service/publish_service.go b/test/bench/publish/service/publish_service.go index 2c86d98..77de7d8 100644 --- a/test/bench/publish/service/publish_service.go +++ b/test/bench/publish/service/publish_service.go @@ -44,13 +44,13 @@ func main() { if *topic != "" { opt = append(opt, service.OptionServiceReceiveTopics([]string{*topic})) } - srv, err := service.NewService(dialer, opt...) + svc, err := service.NewService(dialer, opt...) if err != nil { log.Println("new end err:", err) return } for { - msg, err := srv.Receive(context.TODO()) + msg, err := svc.Receive(context.TODO()) if err == io.EOF { return }