controlplane: refactor directories for servers

This commit is contained in:
singchia
2024-03-08 19:48:04 +08:00
parent 3241672e0a
commit ab59ece219
16 changed files with 465 additions and 108 deletions

3
.gitignore vendored
View File

@@ -21,3 +21,6 @@
go.work
frontier
.DS_Store
docs/.DS_Store

View File

@@ -31,7 +31,7 @@ install:
.PHONY: image
image:
docker buildx build -t frontier:${VERSION} .
docker buildx build -t frontier:${VERSION} -f images/Dockerfile.frontier .
.PHONY: container
container:
@@ -50,6 +50,14 @@ frontier-gen-api:
api:
docker run --rm -v ${PWD}/api/controlplane/v1:/api/controlplane/v1 frontier-gen-api:${VERSION}
.PHONY: frontier-gen-swagger
frontier-gen-swagger:
docker buildx build -t frontier-gen-swagger:${VERSION} -f images/Dockerfile.controlplane-swagger .
.PHONY: swagger
swagger:
docker run --rm -v ${PWD}:/frontier frontier-gen-swagger:${VERSION}
.PHONY: output
output: build

View File

@@ -7,14 +7,12 @@ import (
"runtime"
"github.com/jumboframes/armorigo/sigaction"
"github.com/singchia/frontier/pkg/apis"
"github.com/singchia/frontier/pkg/config"
"github.com/singchia/frontier/pkg/edgebound"
"github.com/singchia/frontier/pkg/exchange"
"github.com/singchia/frontier/pkg/mq"
"github.com/singchia/frontier/pkg/repo/dao"
"github.com/singchia/frontier/pkg/servicebound"
"github.com/singchia/frontier/pkg/repo"
"github.com/singchia/frontier/pkg/server"
"github.com/singchia/frontier/pkg/utils"
"github.com/singchia/go-timer/v2"
"k8s.io/klog/v2"
)
@@ -46,53 +44,49 @@ func main() {
klog.Flush()
}()
// dao
dao, err := dao.NewDao(conf)
// new repo and mqm
repo, mqm, err := newMidwares(conf)
if err != nil {
klog.Errorf("new dao err: %s", err)
klog.Errorf("new midwares err: %s", err)
return
}
klog.V(2).Infof("new dao succeed")
defer func() {
repo.Close()
mqm.Close()
}()
// servers
srvs, err := server.NewServer(conf, repo, mqm)
if err != nil {
klog.Errorf("new server failed")
return
}
klog.V(2).Infof("new servers succeed")
srvs.Serve()
defer func() {
srvs.Close()
}()
sig := sigaction.NewSignal()
sig.Wait(context.TODO())
}
func newMidwares(conf *config.Configuration) (apis.Repo, apis.MQM, error) {
// repo
repo, err := repo.NewRepo(conf)
if err != nil {
klog.Errorf("new repo err: %s", err)
return nil, nil, err
}
klog.V(2).Infof("new repo succeed")
// mqm
mqm, err := mq.NewMQM(conf)
if err != nil {
klog.Errorf("new mq manager err: %s", err)
return
return nil, nil, err
}
klog.V(2).Infof("new mq manager succeed")
// exchange
exchange, err := exchange.NewExchange(conf, mqm)
if err != nil {
klog.Errorf("new exchange err: %s", err)
return
}
klog.V(2).Infof("new exchange succeed")
tmr := timer.NewTimer()
// servicebound
servicebound, err := servicebound.NewServicebound(conf, dao, nil, exchange, mqm, tmr)
if err != nil {
klog.Errorf("new servicebound err: %s", err)
return
}
go servicebound.Serve()
klog.V(2).Infof("new servicebound succeed")
// edgebound
edgebound, err := edgebound.NewEdgebound(conf, dao, nil, exchange, tmr)
if err != nil {
klog.Errorf("new edgebound err: %s", err)
return
}
go edgebound.Serve()
klog.V(2).Infof("new edgebound succeed")
sig := sigaction.NewSignal()
sig.Wait(context.TODO())
edgebound.Close()
servicebound.Close()
tmr.Close()
return repo, mqm, nil
}

13
go.mod
View File

@@ -3,12 +3,14 @@ module github.com/singchia/frontier
go 1.20
require (
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751
github.com/go-kratos/kratos/v2 v2.7.2
github.com/jumboframes/armorigo v0.4.0-rc.1
github.com/singchia/geminio v1.1.5-rc.2
github.com/singchia/go-timer/v2 v2.2.1
github.com/soheilhy/cmux v0.1.5
github.com/spf13/pflag v1.0.5
github.com/swaggo/swag v1.16.3
google.golang.org/genproto/googleapis/api v0.0.0-20240304212257-790db918fca8
google.golang.org/grpc v1.62.1
google.golang.org/protobuf v1.33.0
@@ -19,21 +21,30 @@ require (
)
require (
github.com/KyleBanks/depth v1.2.1 // indirect
github.com/PuerkitoBio/purell v1.1.1 // indirect
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
github.com/go-kratos/aegis v0.2.0 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-openapi/jsonpointer v0.19.5 // indirect
github.com/go-openapi/jsonreference v0.19.6 // indirect
github.com/go-openapi/spec v0.20.4 // indirect
github.com/go-openapi/swag v0.19.15 // indirect
github.com/go-playground/form/v4 v4.2.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/gorilla/mux v1.8.1 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/mailru/easyjson v0.7.6 // indirect
github.com/mattn/go-sqlite3 v1.14.17 // indirect
github.com/singchia/yafsm v1.0.1 // indirect
golang.org/x/net v0.20.0 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/tools v0.7.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240228224816-df926f6c8641 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

45
go.sum
View File

@@ -1,3 +1,11 @@
github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc=
github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE=
github.com/PuerkitoBio/purell v1.1.1 h1:WEQqlqaGbrPkxLJWfBwQmfEAE1Z7ONdDLqrN38tNFfI=
github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0=
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 h1:d+Bc7a5rLufV/sSk/8dngufqelfh6jnri85riMAaF/M=
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/census-instrumentation/opencensus-proto v0.4.1 h1:iKLQ0xPNFxR/2hzXZMrBo8f1j86j5WHzznCCQxV/b8g=
github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa h1:jQCWAUqqlij9Pgj2i/PB79y4KOPYVyFYdROxgaCwdTQ=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
@@ -12,6 +20,16 @@ github.com/go-kratos/kratos/v2 v2.7.2 h1:WVPGFNLKpv+0odMnCPxM4ZHa2hy9I5FOnwpG3Vv
github.com/go-kratos/kratos/v2 v2.7.2/go.mod h1:rppuc8+pGL2UtXA29bgFHWKqaaF6b6GB2XIYiDvFBRk=
github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ=
github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-openapi/jsonpointer v0.19.3/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg=
github.com/go-openapi/jsonpointer v0.19.5 h1:gZr+CIYByUqjcgeLXnQu2gHYQC9o73G2XUeOFYEICuY=
github.com/go-openapi/jsonpointer v0.19.5/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg=
github.com/go-openapi/jsonreference v0.19.6 h1:UBIxjkht+AWIgYzCDSv2GN+E/togfwXUJFRTWhl2Jjs=
github.com/go-openapi/jsonreference v0.19.6/go.mod h1:diGHMEHg2IqXZGKxqyvWdfWU/aim5Dprw5bqpKkTvns=
github.com/go-openapi/spec v0.20.4 h1:O8hJrt0UMnhHcluhIdUgCLRWyM2x7QkBXRvOs7m+O1M=
github.com/go-openapi/spec v0.20.4/go.mod h1:faYFR1CvsJZ0mNsmsphTMSoRrNV3TEDoAM7FOEWeq8I=
github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh66Z9tfKk=
github.com/go-openapi/swag v0.19.15 h1:D2NRCBzS9/pEY3gP9Nl8aDqGUcPFrwG2p+CNFrLyrCM=
github.com/go-openapi/swag v0.19.15/go.mod h1:QYRuS/SOXUCsnplDa677K7+DxSOj6IPNl/eQntq43wQ=
github.com/go-playground/assert/v2 v2.0.1 h1:MsBgLAaY856+nPRTKrp3/OZK38U/wa0CcBYNjji3q3A=
github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
github.com/go-playground/form/v4 v4.2.0 h1:N1wh+Goz61e6w66vo8vJkQt+uwZSoLz50kZPJWR8eic=
@@ -30,14 +48,24 @@ github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/jumboframes/armorigo v0.2.3/go.mod h1:sXe0R32y6V3oJD2eXcPzMlimvZx0xIDiLedpQOy06t4=
github.com/jumboframes/armorigo v0.4.0-rc.1 h1:+9AM5ZLM/KdF0ldLvhbaSRFLIWbcynIrCJZ2G9FJrnk=
github.com/jumboframes/armorigo v0.4.0-rc.1/go.mod h1:H4OlF0Jj8e+8LkAqDjeLtapNNnUuUXR/h4Q32Lqgf9o=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/mailru/easyjson v0.7.6 h1:8yTIVnZgCoiM1TgqoeTl+LfU5Jg6/xL3QhGQnimLYnA=
github.com/mailru/easyjson v0.7.6/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/mattn/go-sqlite3 v1.14.17 h1:mCRHCLDUBXgpKAqIKsaAaAsrAlbkeomtRFKXh2L6YIM=
github.com/mattn/go-sqlite3 v1.14.17/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
@@ -55,14 +83,20 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/swaggo/swag v1.16.3 h1:PnCYjPCah8FK4I26l2F/KQ4yz3sILcVUN3cTlBFA9Pg=
github.com/swaggo/swag v1.16.3/go.mod h1:DImHIuOFXKpMFAQjcC7FG4m3Dg4+QuUgUzJmKjI/gRk=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/mod v0.9.0 h1:KENHtAZL2y3NLMYZeHY9DW8HW8V+kQyJsY/V9JlKvCs=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20210421230115-4e50805a0758/go.mod h1:72T/g9IO56b78aLF+1Kcs5dz7/ng1VjMUvfKvpfy+jM=
golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo=
golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY=
golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
@@ -70,13 +104,20 @@ golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210420072515-93ed5bcd2bfe/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.7.0 h1:W4OVu8VVOaIO0yzWMNdepAulS7YfoS3Zabrm8DOXXU4=
golang.org/x/tools v0.7.0/go.mod h1:4pg6aUX35JBAogB10C9AtvVL+qowtN4pT3CGSQex14s=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto/googleapis/api v0.0.0-20240304212257-790db918fca8 h1:8eadJkXbwDEMNwcB5O0s5Y5eCfyuCLdvaiOIaGTrWmQ=
google.golang.org/genproto/googleapis/api v0.0.0-20240304212257-790db918fca8/go.mod h1:O1cOfN1Cy6QEYr7VxtjOyP5AdAuR0aJ/MYZaaof623Y=
@@ -89,10 +130,14 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gorm.io/driver/sqlite v1.5.4 h1:IqXwXi8M/ZlPzH/947tn5uik3aYQslP9BVveoax0nV0=

View File

@@ -3,7 +3,6 @@ FROM golang:1.18-alpine
# Install curl and unzip, which are required to add protoc.
RUN apk add --no-cache curl unzip protoc protobuf-dev
# Add protoc extends
RUN go install google.golang.org/protobuf/cmd/protoc-gen-go@latest \
&& go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest \
&& go install github.com/go-kratos/kratos/cmd/kratos/v2@latest \

View File

@@ -3,6 +3,8 @@ package apis
import (
"net"
"github.com/singchia/frontier/pkg/repo/dao"
"github.com/singchia/frontier/pkg/repo/model"
"github.com/singchia/geminio"
)
@@ -45,6 +47,7 @@ type Servicebound interface {
GetServiceByName(service string) (geminio.End, error)
GetServiceByRPC(rpc string) (geminio.End, error)
GetServiceByTopic(topic string) (geminio.End, error)
DelServiceByID(serviceID uint64) error
DelSerivces(service string) error
Serve() error
@@ -63,6 +66,36 @@ type ServiceInformer interface {
ServiceHeartbeat(serviceID uint64, service string, addr net.Addr)
}
// repo
type Repo interface {
Close() error
CountEdgeRPCs(query *dao.EdgeRPCQuery) (int64, error)
CountEdges(query *dao.EdgeQuery) (int64, error)
CountServiceRPCs(query *dao.ServiceRPCQuery) (int64, error)
CountServiceTopics(query *dao.ServiceTopicQuery) (int64, error)
CountServices(query *dao.ServiceQuery) (int64, error)
CreateEdge(edge *model.Edge) error
CreateEdgeRPC(rpc *model.EdgeRPC) error
CreateService(service *model.Service) error
CreateServiceRPC(rpc *model.ServiceRPC) error
CreateServiceTopic(topic *model.ServiceTopic) error
DeleteEdge(delete *dao.EdgeDelete) error
DeleteEdgeRPCs(edgeID uint64) error
DeleteService(delete *dao.ServiceDelete) error
DeleteServiceRPCs(serviceID uint64) error
DeleteServiceTopics(serviceID uint64) error
GetEdge(edgeID uint64) (*model.Edge, error)
GetService(serviceID uint64) (*model.Service, error)
GetServiceByName(name string) (*model.Service, error)
GetServiceRPC(rpc string) (*model.ServiceRPC, error)
GetServiceTopic(topic string) (*model.ServiceTopic, error)
ListEdgeRPCs(query *dao.EdgeRPCQuery) ([]string, error)
ListEdges(query *dao.EdgeQuery) ([]*model.Edge, error)
ListServiceRPCs(query *dao.ServiceRPCQuery) ([]string, error)
ListServiceTopics(query *dao.ServiceTopicQuery) ([]string, error)
ListServices(query *dao.ServiceQuery) ([]*model.Service, error)
}
// mq manager and mq related
type MQM interface {
// MQM is a MQ wrapper

View File

@@ -1,11 +1,20 @@
package service
import (
"context"
v1 "github.com/singchia/frontier/api/controlplane/v1"
"github.com/singchia/frontier/pkg/apis"
"github.com/singchia/frontier/pkg/repo/dao"
)
// @title Frontier Swagger API
// @version 1.0
// @contact.name Austin Zhai
// @contact.email singchia@163.com
// @license.name Apache 2.0
// @license.url http://www.apache.org/licenses/LICENSE-2.0.html
type ControlPlaneService struct {
v1.UnimplementedControlPlaneServer
@@ -23,3 +32,84 @@ func NewControlPlaneService(dao *dao.Dao, servicebound apis.Servicebound, edgebo
}
return cp
}
// @Summary ListEdges
// @Tags 1.0
// @Param params query v1.ListEdgesRequest true "queries"
// @Success 200 {object} v1.ListEdgesResponse "result"
// @Router /v1/edges [get]
func (cps *ControlPlaneService) ListEdges(ctx context.Context, req *v1.ListEdgesRequest) (*v1.ListEdgesResponse, error) {
return cps.listEdges(ctx, req)
}
// @Summary Get Edge
// @Tags 1.0
// @Param params query v1.GetEdgeRequest true "queries"
// @Success 200 {object} v1.Edge "result"
// @Router /v1/edges/{edge_id} [get]
func (cps *ControlPlaneService) GetEdge(ctx context.Context, req *v1.GetEdgeRequest) (*v1.Edge, error) {
return cps.getEdge(ctx, req)
}
// @Summary Kick Edge
// @Tags 1.0
// @Param params query v1.KickEdgeRequest true "queries"
// @Success 200 {object} v1.KickEdgeResponse "result"
// @Router /v1/edges/{edge_id} [delete]
func (cps *ControlPlaneService) KickEdge(ctx context.Context, req *v1.KickEdgeRequest) (*v1.KickEdgeResponse, error) {
return cps.kickEdge(ctx, req)
}
// @Summary List Edges RPCs
// @Tags 1.0
// @Param params query v1.ListEdgeRPCsRequest true "queries"
// @Success 200 {object} v1.ListEdgeRPCsResponse "result"
// @Router /v1/edges/rpcs [get]
func (cps *ControlPlaneService) ListEdgeRPCs(ctx context.Context, req *v1.ListEdgeRPCsRequest) (*v1.ListEdgeRPCsResponse, error) {
return cps.listEdgeRPCs(ctx, req)
}
// @Summary List Services
// @Tags 1.0
// @Param params query v1.ListServicesRequest true "queries"
// @Success 200 {object} v1.ListServicesResponse "result"
// @Router /v1/services [get]
func (cps *ControlPlaneService) ListServices(ctx context.Context, req *v1.ListServicesRequest) (*v1.ListServicesResponse, error) {
return cps.listServices(ctx, req)
}
// @Summary Get Service
// @Tags 1.0
// @Param params query v1.GetServiceRequest true "queries"
// @Success 200 {object} v1.Service "result"
// @Router /v1/services/{service_id} [get]
func (cps *ControlPlaneService) GetService(ctx context.Context, req *v1.GetServiceRequest) (*v1.Service, error) {
return cps.getService(ctx, req)
}
// @Summary Kick Service
// @Tags 1.0
// @Param params query v1.KickServiceRequest true "queries"
// @Success 200 {object} v1.KickServiceResponse "result"
// @Router /v1/services/{service_id} [delete]
func (cps *ControlPlaneService) KickService(ctx context.Context, req *v1.KickServiceRequest) (*v1.KickServiceResponse, error) {
return cps.kickService(ctx, req)
}
// @Summary List Services RPCs
// @Tags 1.0
// @Param params query v1.ListServiceRPCsRequest true "queries"
// @Success 200 {object} v1.ListServiceRPCsResponse "result"
// @Router /v1/services/rpcs [get]
func (cps *ControlPlaneService) ListServiceRPCs(ctx context.Context, req *v1.ListServiceRPCsRequest) (*v1.ListServiceRPCsResponse, error) {
return cps.listServiceRPCs(ctx, req)
}
// @Summary List Services Topics
// @Tags 1.0
// @Param params query v1.ListServiceTopicsRequest true "queries"
// @Success 200 {object} v1.ListServiceTopicsResponse "result"
// @Router /v1/services/topics [get]
func (cps *ControlPlaneService) ListServiceTopics(ctx context.Context, req *v1.ListServiceTopicsRequest) (*v1.ListServiceTopicsResponse, error) {
return cps.listServiceTopics(ctx, req)
}

View File

@@ -8,30 +8,21 @@ import (
"github.com/singchia/frontier/pkg/repo/model"
)
func (cps *ControlPlaneService) ListEdges(ctx context.Context, req *v1.ListEdgesRequest) (*v1.ListEdgesResponse, error) {
return cps.listEdges(ctx, req)
}
func (cps *ControlPlaneService) GetEdge(ctx context.Context, req *v1.GetEdgeRequest) (*v1.Edge, error) {
return cps.getEdge(ctx, req)
}
func (cps *ControlPlaneService) KickEdge(ctx context.Context, req *v1.KickEdgeRequest) (*v1.KickEdgeResponse, error) {
return cps.kickEdge(ctx, req)
}
func (cps *ControlPlaneService) ListEdgeRPCs(ctx context.Context, req *v1.ListEdgeRPCsRequest) (*v1.ListEdgeRPCsResponse, error) {
return cps.listEdgeRPCs(ctx, req)
}
func (cps *ControlPlaneService) listEdges(_ context.Context, req *v1.ListEdgesRequest) (*v1.ListEdgesResponse, error) {
query := &dao.EdgeQuery{}
// conditions
if req.Meta != nil {
query.Meta = *req.Meta
}
if req.Addr != nil {
query.Addr = *req.Addr
}
if req.Rpc != nil {
query.RPC = *req.Rpc
}
if req.EdgeId != nil {
query.EdgeID = *req.EdgeId
}
// order
if req.Order != nil && len(*req.Order) != 0 {
order := *req.Order
@@ -47,15 +38,10 @@ func (cps *ControlPlaneService) listEdges(_ context.Context, req *v1.ListEdgesRe
query.Desc = false
}
}
if req.Rpc != nil {
query.RPC = *req.Rpc
}
if req.EdgeId != nil {
query.EdgeID = *req.EdgeId
}
// pagination
query.Page = int(req.Page)
query.PageSize = int(req.PageSize)
// time range
query.StartTime = *req.StartTime
query.EndTime = *req.EndTime
@@ -92,6 +78,7 @@ func (cps *ControlPlaneService) kickEdge(_ context.Context, req *v1.KickEdgeRequ
func (cps *ControlPlaneService) listEdgeRPCs(_ context.Context, req *v1.ListEdgeRPCsRequest) (*v1.ListEdgeRPCsResponse, error) {
query := &dao.EdgeRPCQuery{}
// conditions
if req.EdgeId != nil {
query.EdgeID = *req.EdgeId
}
@@ -116,6 +103,7 @@ func (cps *ControlPlaneService) listEdgeRPCs(_ context.Context, req *v1.ListEdge
// pagination
query.Page = int(req.Page)
query.PageSize = int(req.PageSize)
// time range
query.StartTime = *req.StartTime
query.EndTime = *req.EndTime

View File

@@ -1 +1,189 @@
package service
import (
"context"
v1 "github.com/singchia/frontier/api/controlplane/v1"
"github.com/singchia/frontier/pkg/repo/dao"
"github.com/singchia/frontier/pkg/repo/model"
)
func (cps *ControlPlaneService) listServices(_ context.Context, req *v1.ListServicesRequest) (*v1.ListServicesResponse, error) {
query := &dao.ServiceQuery{}
// conditions
if req.Service != nil {
query.Service = *req.Service
}
if req.Addr != nil {
query.Addr = *req.Addr
}
if req.Rpc != nil {
query.RPC = *req.Rpc
}
if req.Topic != nil {
query.Topic = *req.Topic
}
if req.ServiceId != nil {
query.ServiceID = *req.ServiceId
}
// order
if req.Order != nil && len(*req.Order) != 0 {
order := *req.Order
switch order[0] {
case '-':
query.Order = order[1:]
query.Desc = true
case '+':
query.Order = order[1:]
query.Desc = false
default:
query.Order = order
query.Desc = false
}
}
// pagination
query.Page = int(req.Page)
query.PageSize = int(req.PageSize)
// time range
query.StartTime = *req.StartTime
query.EndTime = *req.EndTime
services, err := cps.dao.ListServices(query)
if err != nil {
return nil, err
}
count, err := cps.dao.CountServices(query)
if err != nil {
return nil, err
}
retServices := transferServices(services)
return &v1.ListServicesResponse{
Services: retServices,
Count: uint32(count),
}, nil
}
func (cps *ControlPlaneService) getService(_ context.Context, req *v1.GetServiceRequest) (*v1.Service, error) {
service, err := cps.dao.GetService(req.ServiceId)
if err != nil {
return nil, err
}
return transferService(service), nil
}
func (cps *ControlPlaneService) kickService(_ context.Context, req *v1.KickServiceRequest) (*v1.KickServiceResponse, error) {
err := cps.servicebound.DelServiceByID(req.ServiceId)
if err != nil {
return nil, err
}
return &v1.KickServiceResponse{}, nil
}
func (cps *ControlPlaneService) listServiceRPCs(_ context.Context, req *v1.ListServiceRPCsRequest) (*v1.ListServiceRPCsResponse, error) {
query := &dao.ServiceRPCQuery{}
// conditions
if req.ServiceId != nil {
query.ServiceID = *req.ServiceId
}
if req.Service != nil {
query.Service = *req.Service
}
// order
if req.Order != nil && len(*req.Order) != 0 {
order := *req.Order
switch order[0] {
case '-':
query.Order = order[1:]
query.Desc = true
case '+':
query.Order = order[1:]
query.Desc = false
default:
query.Order = order
query.Desc = false
}
}
// pagination
query.Page = int(req.Page)
query.PageSize = int(req.PageSize)
// time range
query.StartTime = *req.StartTime
query.EndTime = *req.EndTime
rpcs, err := cps.dao.ListServiceRPCs(query)
if err != nil {
return nil, err
}
count, err := cps.dao.CountServiceRPCs(query)
if err != nil {
return nil, err
}
return &v1.ListServiceRPCsResponse{
Rpcs: rpcs,
Count: uint32(count),
}, nil
}
func (cps *ControlPlaneService) listServiceTopics(_ context.Context, req *v1.ListServiceTopicsRequest) (*v1.ListServiceTopicsResponse, error) {
query := &dao.ServiceTopicQuery{}
// conditions
if req.ServiceId != nil {
query.ServiceID = *req.ServiceId
}
if req.Service != nil {
query.Service = *req.Service
}
// order
if req.Order != nil && len(*req.Order) != 0 {
order := *req.Order
switch order[0] {
case '-':
query.Order = order[1:]
query.Desc = true
case '+':
query.Order = order[1:]
query.Desc = false
default:
query.Order = order
query.Desc = false
}
}
// pagination
query.Page = int(req.Page)
query.PageSize = int(req.PageSize)
// time range
query.StartTime = *req.StartTime
query.EndTime = *req.EndTime
topics, err := cps.dao.ListServiceTopics(query)
if err != nil {
return nil, err
}
count, err := cps.dao.CountServiceTopics(query)
if err != nil {
return nil, err
}
return &v1.ListServiceTopicsResponse{
Topics: topics,
Count: uint32(count),
}, nil
}
func transferServices(services []*model.Service) []*v1.Service {
retServices := make([]*v1.Service, len(services))
for i, service := range services {
retService := transferService(service)
retServices[i] = retService
}
return retServices
}
func transferService(service *model.Service) *v1.Service {
retService := &v1.Service{
ServiceId: service.ServiceID,
Service: service.Service,
Addr: service.Addr,
CreateTime: service.CreateTime,
}
return retService
}

View File

@@ -15,7 +15,6 @@ import (
"github.com/singchia/frontier/pkg/apis"
"github.com/singchia/frontier/pkg/config"
"github.com/singchia/frontier/pkg/mapmap"
"github.com/singchia/frontier/pkg/repo/dao"
"github.com/singchia/frontier/pkg/security"
"github.com/singchia/frontier/pkg/utils"
"github.com/singchia/geminio"
@@ -27,9 +26,9 @@ import (
"k8s.io/klog/v2"
)
func NewEdgebound(conf *config.Configuration, dao *dao.Dao, informer apis.EdgeInformer,
func NewEdgebound(conf *config.Configuration, repo apis.Repo, informer apis.EdgeInformer,
exchange apis.Exchange, tmr timer.Timer) (apis.Edgebound, error) {
return newEdgeManager(conf, dao, informer, exchange, tmr)
return newEdgeManager(conf, repo, informer, exchange, tmr)
}
type edgeManager struct {
@@ -48,11 +47,11 @@ type edgeManager struct {
edges map[uint64]geminio.End
mtx sync.RWMutex
// key: edgeID; subkey: streamID; value: geminio.Stream
// we don't store stream info to dao, because they may will be too much.
// we don't store stream info to repo, because they may will be too much.
streams *mapmap.MapMap
// dao and repo for edges
dao *dao.Dao
// repo and repo for edges
repo apis.Repo
// listener for edges
cm cmux.CMux
geminioLn net.Listener
@@ -63,7 +62,7 @@ type edgeManager struct {
}
// support for tls, mtls and tcp listening
func newEdgeManager(conf *config.Configuration, dao *dao.Dao, informer apis.EdgeInformer,
func newEdgeManager(conf *config.Configuration, repo apis.Repo, informer apis.EdgeInformer,
exchange apis.Exchange, tmr timer.Timer) (*edgeManager, error) {
listen := &conf.Edgebound.Listen
var (
@@ -77,7 +76,7 @@ func newEdgeManager(conf *config.Configuration, dao *dao.Dao, informer apis.Edge
conf: conf,
tmr: tmr,
streams: mapmap.NewMapMap(),
dao: dao,
repo: repo,
shub: synchub.NewSyncHub(synchub.OptionTimer(tmr)),
edges: make(map[uint64]geminio.End),
UnimplementedDelegate: &delegate.UnimplementedDelegate{},

View File

@@ -45,8 +45,8 @@ func (em *edgeManager) online(end geminio.End) error {
Addr: end.RemoteAddr().String(),
CreateTime: time.Now().Unix(),
}
if err := em.dao.CreateEdge(edge); err != nil {
klog.Errorf("edge online, dao create err: %s, edgeID: %d", err, end.ClientID())
if err := em.repo.CreateEdge(edge); err != nil {
klog.Errorf("edge online, repo create err: %s, edgeID: %d", err, end.ClientID())
return err
}
return nil
@@ -78,15 +78,15 @@ func (em *edgeManager) offline(edgeID uint64, addr net.Addr) error {
}()
// memdb
if err := em.dao.DeleteEdge(&dao.EdgeDelete{
if err := em.repo.DeleteEdge(&dao.EdgeDelete{
EdgeID: edgeID,
Addr: addr.String(),
}); err != nil {
klog.Errorf("edge offline, dao delete edge err: %s, edgeID: %d", err, edgeID)
klog.Errorf("edge offline, repo delete edge err: %s, edgeID: %d", err, edgeID)
return err
}
if err := em.dao.DeleteEdgeRPCs(edgeID); err != nil {
klog.Errorf("edge offline, dao delete edge rpcs err: %s, edgeID: %d", err, edgeID)
if err := em.repo.DeleteEdgeRPCs(edgeID); err != nil {
klog.Errorf("edge offline, repo delete edge rpcs err: %s, edgeID: %d", err, edgeID)
return err
}
return nil
@@ -157,7 +157,7 @@ func (em *edgeManager) RemoteRegistration(rpc string, edgeID, streamID uint64) {
EdgeID: edgeID,
CreateTime: time.Now().Unix(),
}
err := em.dao.CreateEdgeRPC(er)
err := em.repo.CreateEdgeRPC(er)
if err != nil {
klog.Errorf("edge remote registration, create edge rpc err: %s, rpc: %s, edgeID: %d, streamID: %d", err, rpc, edgeID, streamID)
}

View File

@@ -13,16 +13,16 @@ type exchange struct {
MQM apis.MQM
}
func NewExchange(conf *config.Configuration, mqm apis.MQM) (apis.Exchange, error) {
func NewExchange(conf *config.Configuration, mqm apis.MQM) apis.Exchange {
return newExchange(conf, mqm)
}
func newExchange(conf *config.Configuration, mqm apis.MQM) (*exchange, error) {
func newExchange(conf *config.Configuration, mqm apis.MQM) *exchange {
exchange := &exchange{
conf: conf,
MQM: mqm,
}
return exchange, nil
return exchange
}
func (ex *exchange) AddEdgebound(edgebound apis.Edgebound) {

View File

@@ -15,7 +15,6 @@ import (
"github.com/singchia/frontier/pkg/apis"
"github.com/singchia/frontier/pkg/config"
"github.com/singchia/frontier/pkg/mapmap"
"github.com/singchia/frontier/pkg/repo/dao"
"github.com/singchia/frontier/pkg/repo/model"
"github.com/singchia/frontier/pkg/security"
"github.com/singchia/frontier/pkg/utils"
@@ -27,9 +26,9 @@ import (
"k8s.io/klog/v2"
)
func NewServicebound(conf *config.Configuration, dao *dao.Dao, informer apis.ServiceInformer,
func NewServicebound(conf *config.Configuration, repo apis.Repo, informer apis.ServiceInformer,
exchange apis.Exchange, mqm apis.MQM, tmr timer.Timer) (apis.Servicebound, error) {
return newServiceManager(conf, dao, informer, exchange, mqm, tmr)
return newServiceManager(conf, repo, informer, exchange, mqm, tmr)
}
type end struct {
@@ -53,11 +52,11 @@ type serviceManager struct {
services map[uint64]geminio.End
mtx sync.RWMutex
// key: serviceID; subkey: streamID; value: geminio.Stream
// we don't store stream info to dao, because they may will be too much.
// we don't store stream info to repo, because they may will be too much.
streams *mapmap.MapMap
// dao and repo for services
dao *dao.Dao
// repo and repo for services
repo apis.Repo
// listener for geminio
ln net.Listener
@@ -65,7 +64,7 @@ type serviceManager struct {
tmr timer.Timer
}
func newServiceManager(conf *config.Configuration, dao *dao.Dao, informer apis.ServiceInformer,
func newServiceManager(conf *config.Configuration, repo apis.Repo, informer apis.ServiceInformer,
exchange apis.Exchange, mqm apis.MQM, tmr timer.Timer) (*serviceManager, error) {
listen := &conf.Servicebound.Listen
var (
@@ -79,7 +78,7 @@ func newServiceManager(conf *config.Configuration, dao *dao.Dao, informer apis.S
conf: conf,
tmr: tmr,
streams: mapmap.NewMapMap(),
dao: dao,
repo: repo,
shub: synchub.NewSyncHub(synchub.OptionTimer(tmr)),
services: make(map[uint64]geminio.End),
UnimplementedDelegate: &delegate.UnimplementedDelegate{},
@@ -212,7 +211,7 @@ func (sm *serviceManager) remoteReceiveClaim(serviceID uint64, topics []string)
Topic: topic,
ServiceID: serviceID,
}
err = sm.dao.CreateServiceTopic(st)
err = sm.repo.CreateServiceTopic(st)
if err != nil {
klog.Errorf("service remote receive claim, create service topic: %s, err: %s", topic, err)
return err
@@ -232,7 +231,7 @@ func (sm *serviceManager) RemoteRegistration(rpc string, serviceID, streamID uin
ServiceID: serviceID,
CreateTime: time.Now().Unix(),
}
err := sm.dao.CreateServiceRPC(sr)
err := sm.repo.CreateServiceRPC(sr)
if err != nil {
klog.Errorf("service remote registration, create service rpc: %s, err: %s, serviceID: %d, streamID: %d", err, rpc, serviceID, streamID)
}
@@ -249,7 +248,7 @@ func (sm *serviceManager) GetServiceByName(name string) (geminio.End, error) {
sm.mtx.RLock()
defer sm.mtx.RUnlock()
mservice, err := sm.dao.GetServiceByName(name)
mservice, err := sm.repo.GetServiceByName(name)
if err != nil {
klog.V(2).Infof("get service by name: %s, err: %s", name, err)
return nil, err
@@ -262,7 +261,7 @@ func (sm *serviceManager) GetServiceByRPC(rpc string) (geminio.End, error) {
sm.mtx.RLock()
defer sm.mtx.RUnlock()
mrpc, err := sm.dao.GetServiceRPC(rpc)
mrpc, err := sm.repo.GetServiceRPC(rpc)
if err != nil {
klog.V(2).Infof("get service by rpc: %s, err: %s", rpc, err)
return nil, err
@@ -275,7 +274,7 @@ func (sm *serviceManager) GetServiceByTopic(topic string) (geminio.End, error) {
sm.mtx.RLock()
defer sm.mtx.RUnlock()
mtopic, err := sm.dao.GetServiceTopic(topic)
mtopic, err := sm.repo.GetServiceTopic(topic)
if err != nil {
klog.V(2).Infof("get service by topic: %s, err: %s", topic, err)
return nil, err

View File

@@ -46,8 +46,8 @@ func (sm *serviceManager) online(end geminio.End, meta *apis.Meta) error {
Addr: end.RemoteAddr().String(),
CreateTime: time.Now().Unix(),
}
if err := sm.dao.CreateService(service); err != nil {
klog.Errorf("service online, dao create err: %s, serviceID: %d", err, end.ClientID())
if err := sm.repo.CreateService(service); err != nil {
klog.Errorf("service online, repo create err: %s, serviceID: %d", err, end.ClientID())
return err
}
return nil
@@ -79,22 +79,22 @@ func (sm *serviceManager) offline(serviceID uint64, addr net.Addr) error {
}()
// clear memdb
if err := sm.dao.DeleteService(&dao.ServiceDelete{
if err := sm.repo.DeleteService(&dao.ServiceDelete{
ServiceID: serviceID,
Addr: addr.String(),
}); err != nil {
klog.Errorf("service offline, dao delete service err: %s, serviceID: %d", err, serviceID)
klog.Errorf("service offline, repo delete service err: %s, serviceID: %d", err, serviceID)
return err
}
if err := sm.dao.DeleteServiceRPCs(serviceID); err != nil {
klog.Errorf("service offline, dao delete service rpcs err: %s, serviceID: %d", err, serviceID)
if err := sm.repo.DeleteServiceRPCs(serviceID); err != nil {
klog.Errorf("service offline, repo delete service rpcs err: %s, serviceID: %d", err, serviceID)
return err
}
klog.V(2).Infof("service offline, remote rpc de-register succeed, serviceID: %d", serviceID)
if err := sm.dao.DeleteServiceTopics(serviceID); err != nil {
klog.Errorf("service offline, dao delete service topics err: %s, serviceID: %d", err, serviceID)
if err := sm.repo.DeleteServiceTopics(serviceID); err != nil {
klog.Errorf("service offline, repo delete service topics err: %s, serviceID: %d", err, serviceID)
return err
}