diff --git a/.gitignore b/.gitignore index 182494f..3115721 100644 --- a/.gitignore +++ b/.gitignore @@ -20,4 +20,7 @@ # Go workspace file go.work -frontier \ No newline at end of file +frontier + +.DS_Store +docs/.DS_Store \ No newline at end of file diff --git a/Makefile b/Makefile index 052054b..880b555 100644 --- a/Makefile +++ b/Makefile @@ -31,7 +31,7 @@ install: .PHONY: image image: - docker buildx build -t frontier:${VERSION} . + docker buildx build -t frontier:${VERSION} -f images/Dockerfile.frontier . .PHONY: container container: @@ -50,6 +50,14 @@ frontier-gen-api: api: docker run --rm -v ${PWD}/api/controlplane/v1:/api/controlplane/v1 frontier-gen-api:${VERSION} +.PHONY: frontier-gen-swagger +frontier-gen-swagger: + docker buildx build -t frontier-gen-swagger:${VERSION} -f images/Dockerfile.controlplane-swagger . + +.PHONY: swagger +swagger: + docker run --rm -v ${PWD}:/frontier frontier-gen-swagger:${VERSION} + .PHONY: output output: build diff --git a/cmd/frontier/main.go b/cmd/frontier/main.go index 5ac3a25..d06761f 100644 --- a/cmd/frontier/main.go +++ b/cmd/frontier/main.go @@ -7,14 +7,12 @@ import ( "runtime" "github.com/jumboframes/armorigo/sigaction" + "github.com/singchia/frontier/pkg/apis" "github.com/singchia/frontier/pkg/config" - "github.com/singchia/frontier/pkg/edgebound" - "github.com/singchia/frontier/pkg/exchange" "github.com/singchia/frontier/pkg/mq" - "github.com/singchia/frontier/pkg/repo/dao" - "github.com/singchia/frontier/pkg/servicebound" + "github.com/singchia/frontier/pkg/repo" + "github.com/singchia/frontier/pkg/server" "github.com/singchia/frontier/pkg/utils" - "github.com/singchia/go-timer/v2" "k8s.io/klog/v2" ) @@ -46,53 +44,49 @@ func main() { klog.Flush() }() - // dao - dao, err := dao.NewDao(conf) + // new repo and mqm + repo, mqm, err := newMidwares(conf) if err != nil { - klog.Errorf("new dao err: %s", err) + klog.Errorf("new midwares err: %s", err) return } - klog.V(2).Infof("new dao succeed") + defer func() { + repo.Close() + mqm.Close() + }() + + // servers + srvs, err := server.NewServer(conf, repo, mqm) + if err != nil { + klog.Errorf("new server failed") + return + } + klog.V(2).Infof("new servers succeed") + srvs.Serve() + defer func() { + srvs.Close() + }() + + sig := sigaction.NewSignal() + sig.Wait(context.TODO()) +} + +func newMidwares(conf *config.Configuration) (apis.Repo, apis.MQM, error) { + // repo + repo, err := repo.NewRepo(conf) + if err != nil { + klog.Errorf("new repo err: %s", err) + return nil, nil, err + } + klog.V(2).Infof("new repo succeed") // mqm mqm, err := mq.NewMQM(conf) if err != nil { klog.Errorf("new mq manager err: %s", err) - return + return nil, nil, err } klog.V(2).Infof("new mq manager succeed") - // exchange - exchange, err := exchange.NewExchange(conf, mqm) - if err != nil { - klog.Errorf("new exchange err: %s", err) - return - } - klog.V(2).Infof("new exchange succeed") - - tmr := timer.NewTimer() - // servicebound - servicebound, err := servicebound.NewServicebound(conf, dao, nil, exchange, mqm, tmr) - if err != nil { - klog.Errorf("new servicebound err: %s", err) - return - } - go servicebound.Serve() - klog.V(2).Infof("new servicebound succeed") - - // edgebound - edgebound, err := edgebound.NewEdgebound(conf, dao, nil, exchange, tmr) - if err != nil { - klog.Errorf("new edgebound err: %s", err) - return - } - go edgebound.Serve() - klog.V(2).Infof("new edgebound succeed") - - sig := sigaction.NewSignal() - sig.Wait(context.TODO()) - - edgebound.Close() - servicebound.Close() - tmr.Close() + return repo, mqm, nil } diff --git a/go.mod b/go.mod index 21dc3ae..a35211d 100644 --- a/go.mod +++ b/go.mod @@ -3,12 +3,14 @@ module github.com/singchia/frontier go 1.20 require ( + github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 github.com/go-kratos/kratos/v2 v2.7.2 github.com/jumboframes/armorigo v0.4.0-rc.1 github.com/singchia/geminio v1.1.5-rc.2 github.com/singchia/go-timer/v2 v2.2.1 github.com/soheilhy/cmux v0.1.5 github.com/spf13/pflag v1.0.5 + github.com/swaggo/swag v1.16.3 google.golang.org/genproto/googleapis/api v0.0.0-20240304212257-790db918fca8 google.golang.org/grpc v1.62.1 google.golang.org/protobuf v1.33.0 @@ -19,21 +21,30 @@ require ( ) require ( + github.com/KyleBanks/depth v1.2.1 // indirect + github.com/PuerkitoBio/purell v1.1.1 // indirect + github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect github.com/go-kratos/aegis v0.2.0 // indirect github.com/go-logr/logr v1.4.1 // indirect + github.com/go-openapi/jsonpointer v0.19.5 // indirect + github.com/go-openapi/jsonreference v0.19.6 // indirect + github.com/go-openapi/spec v0.20.4 // indirect + github.com/go-openapi/swag v0.19.15 // indirect github.com/go-playground/form/v4 v4.2.0 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/google/uuid v1.6.0 // indirect github.com/gorilla/mux v1.8.1 // indirect github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect - github.com/kr/text v0.2.0 // indirect + github.com/josharian/intern v1.0.0 // indirect + github.com/mailru/easyjson v0.7.6 // indirect github.com/mattn/go-sqlite3 v1.14.17 // indirect github.com/singchia/yafsm v1.0.1 // indirect golang.org/x/net v0.20.0 // indirect golang.org/x/sync v0.6.0 // indirect golang.org/x/sys v0.16.0 // indirect golang.org/x/text v0.14.0 // indirect + golang.org/x/tools v0.7.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240228224816-df926f6c8641 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 0b43616..7fa49cb 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,11 @@ +github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc= +github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE= +github.com/PuerkitoBio/purell v1.1.1 h1:WEQqlqaGbrPkxLJWfBwQmfEAE1Z7ONdDLqrN38tNFfI= +github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= +github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 h1:d+Bc7a5rLufV/sSk/8dngufqelfh6jnri85riMAaF/M= +github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= +github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM= +github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/census-instrumentation/opencensus-proto v0.4.1 h1:iKLQ0xPNFxR/2hzXZMrBo8f1j86j5WHzznCCQxV/b8g= github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa h1:jQCWAUqqlij9Pgj2i/PB79y4KOPYVyFYdROxgaCwdTQ= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= @@ -12,6 +20,16 @@ github.com/go-kratos/kratos/v2 v2.7.2 h1:WVPGFNLKpv+0odMnCPxM4ZHa2hy9I5FOnwpG3Vv github.com/go-kratos/kratos/v2 v2.7.2/go.mod h1:rppuc8+pGL2UtXA29bgFHWKqaaF6b6GB2XIYiDvFBRk= github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-openapi/jsonpointer v0.19.3/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg= +github.com/go-openapi/jsonpointer v0.19.5 h1:gZr+CIYByUqjcgeLXnQu2gHYQC9o73G2XUeOFYEICuY= +github.com/go-openapi/jsonpointer v0.19.5/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg= +github.com/go-openapi/jsonreference v0.19.6 h1:UBIxjkht+AWIgYzCDSv2GN+E/togfwXUJFRTWhl2Jjs= +github.com/go-openapi/jsonreference v0.19.6/go.mod h1:diGHMEHg2IqXZGKxqyvWdfWU/aim5Dprw5bqpKkTvns= +github.com/go-openapi/spec v0.20.4 h1:O8hJrt0UMnhHcluhIdUgCLRWyM2x7QkBXRvOs7m+O1M= +github.com/go-openapi/spec v0.20.4/go.mod h1:faYFR1CvsJZ0mNsmsphTMSoRrNV3TEDoAM7FOEWeq8I= +github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh66Z9tfKk= +github.com/go-openapi/swag v0.19.15 h1:D2NRCBzS9/pEY3gP9Nl8aDqGUcPFrwG2p+CNFrLyrCM= +github.com/go-openapi/swag v0.19.15/go.mod h1:QYRuS/SOXUCsnplDa677K7+DxSOj6IPNl/eQntq43wQ= github.com/go-playground/assert/v2 v2.0.1 h1:MsBgLAaY856+nPRTKrp3/OZK38U/wa0CcBYNjji3q3A= github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= github.com/go-playground/form/v4 v4.2.0 h1:N1wh+Goz61e6w66vo8vJkQt+uwZSoLz50kZPJWR8eic= @@ -30,14 +48,24 @@ github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= +github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= +github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/jumboframes/armorigo v0.2.3/go.mod h1:sXe0R32y6V3oJD2eXcPzMlimvZx0xIDiLedpQOy06t4= github.com/jumboframes/armorigo v0.4.0-rc.1 h1:+9AM5ZLM/KdF0ldLvhbaSRFLIWbcynIrCJZ2G9FJrnk= github.com/jumboframes/armorigo v0.4.0-rc.1/go.mod h1:H4OlF0Jj8e+8LkAqDjeLtapNNnUuUXR/h4Q32Lqgf9o= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= +github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= +github.com/mailru/easyjson v0.7.6 h1:8yTIVnZgCoiM1TgqoeTl+LfU5Jg6/xL3QhGQnimLYnA= +github.com/mailru/easyjson v0.7.6/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/mattn/go-sqlite3 v1.14.17 h1:mCRHCLDUBXgpKAqIKsaAaAsrAlbkeomtRFKXh2L6YIM= github.com/mattn/go-sqlite3 v1.14.17/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= +github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= @@ -55,14 +83,20 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/swaggo/swag v1.16.3 h1:PnCYjPCah8FK4I26l2F/KQ4yz3sILcVUN3cTlBFA9Pg= +github.com/swaggo/swag v1.16.3/go.mod h1:DImHIuOFXKpMFAQjcC7FG4m3Dg4+QuUgUzJmKjI/gRk= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/mod v0.9.0 h1:KENHtAZL2y3NLMYZeHY9DW8HW8V+kQyJsY/V9JlKvCs= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20210421230115-4e50805a0758/go.mod h1:72T/g9IO56b78aLF+1Kcs5dz7/ng1VjMUvfKvpfy+jM= golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo= golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY= golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= @@ -70,13 +104,20 @@ golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210420072515-93ed5bcd2bfe/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.7.0 h1:W4OVu8VVOaIO0yzWMNdepAulS7YfoS3Zabrm8DOXXU4= +golang.org/x/tools v0.7.0/go.mod h1:4pg6aUX35JBAogB10C9AtvVL+qowtN4pT3CGSQex14s= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/genproto/googleapis/api v0.0.0-20240304212257-790db918fca8 h1:8eadJkXbwDEMNwcB5O0s5Y5eCfyuCLdvaiOIaGTrWmQ= google.golang.org/genproto/googleapis/api v0.0.0-20240304212257-790db918fca8/go.mod h1:O1cOfN1Cy6QEYr7VxtjOyP5AdAuR0aJ/MYZaaof623Y= @@ -89,10 +130,14 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gorm.io/driver/sqlite v1.5.4 h1:IqXwXi8M/ZlPzH/947tn5uik3aYQslP9BVveoax0nV0= diff --git a/images/Dockerfile.controlplane-api b/images/Dockerfile.controlplane-api index 7b91edd..f5f999e 100644 --- a/images/Dockerfile.controlplane-api +++ b/images/Dockerfile.controlplane-api @@ -3,7 +3,6 @@ FROM golang:1.18-alpine # Install curl and unzip, which are required to add protoc. RUN apk add --no-cache curl unzip protoc protobuf-dev -# Add protoc extends RUN go install google.golang.org/protobuf/cmd/protoc-gen-go@latest \ && go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest \ && go install github.com/go-kratos/kratos/cmd/kratos/v2@latest \ diff --git a/Dockerfile b/images/Dockerfile.frontier similarity index 100% rename from Dockerfile rename to images/Dockerfile.frontier diff --git a/pkg/apis/interface.go b/pkg/apis/interface.go index 0e46f6c..74f92cd 100644 --- a/pkg/apis/interface.go +++ b/pkg/apis/interface.go @@ -3,6 +3,8 @@ package apis import ( "net" + "github.com/singchia/frontier/pkg/repo/dao" + "github.com/singchia/frontier/pkg/repo/model" "github.com/singchia/geminio" ) @@ -45,6 +47,7 @@ type Servicebound interface { GetServiceByName(service string) (geminio.End, error) GetServiceByRPC(rpc string) (geminio.End, error) GetServiceByTopic(topic string) (geminio.End, error) + DelServiceByID(serviceID uint64) error DelSerivces(service string) error Serve() error @@ -63,6 +66,36 @@ type ServiceInformer interface { ServiceHeartbeat(serviceID uint64, service string, addr net.Addr) } +// repo +type Repo interface { + Close() error + CountEdgeRPCs(query *dao.EdgeRPCQuery) (int64, error) + CountEdges(query *dao.EdgeQuery) (int64, error) + CountServiceRPCs(query *dao.ServiceRPCQuery) (int64, error) + CountServiceTopics(query *dao.ServiceTopicQuery) (int64, error) + CountServices(query *dao.ServiceQuery) (int64, error) + CreateEdge(edge *model.Edge) error + CreateEdgeRPC(rpc *model.EdgeRPC) error + CreateService(service *model.Service) error + CreateServiceRPC(rpc *model.ServiceRPC) error + CreateServiceTopic(topic *model.ServiceTopic) error + DeleteEdge(delete *dao.EdgeDelete) error + DeleteEdgeRPCs(edgeID uint64) error + DeleteService(delete *dao.ServiceDelete) error + DeleteServiceRPCs(serviceID uint64) error + DeleteServiceTopics(serviceID uint64) error + GetEdge(edgeID uint64) (*model.Edge, error) + GetService(serviceID uint64) (*model.Service, error) + GetServiceByName(name string) (*model.Service, error) + GetServiceRPC(rpc string) (*model.ServiceRPC, error) + GetServiceTopic(topic string) (*model.ServiceTopic, error) + ListEdgeRPCs(query *dao.EdgeRPCQuery) ([]string, error) + ListEdges(query *dao.EdgeQuery) ([]*model.Edge, error) + ListServiceRPCs(query *dao.ServiceRPCQuery) ([]string, error) + ListServiceTopics(query *dao.ServiceTopicQuery) ([]string, error) + ListServices(query *dao.ServiceQuery) ([]*model.Service, error) +} + // mq manager and mq related type MQM interface { // MQM is a MQ wrapper diff --git a/pkg/controlplane/service/cp_service.go b/pkg/controlplane/service/cp_service.go index a0c9bc8..51e5fec 100644 --- a/pkg/controlplane/service/cp_service.go +++ b/pkg/controlplane/service/cp_service.go @@ -1,11 +1,20 @@ package service import ( + "context" + v1 "github.com/singchia/frontier/api/controlplane/v1" "github.com/singchia/frontier/pkg/apis" "github.com/singchia/frontier/pkg/repo/dao" ) +// @title Frontier Swagger API +// @version 1.0 +// @contact.name Austin Zhai +// @contact.email singchia@163.com +// @license.name Apache 2.0 +// @license.url http://www.apache.org/licenses/LICENSE-2.0.html + type ControlPlaneService struct { v1.UnimplementedControlPlaneServer @@ -23,3 +32,84 @@ func NewControlPlaneService(dao *dao.Dao, servicebound apis.Servicebound, edgebo } return cp } + +// @Summary ListEdges +// @Tags 1.0 +// @Param params query v1.ListEdgesRequest true "queries" +// @Success 200 {object} v1.ListEdgesResponse "result" +// @Router /v1/edges [get] +func (cps *ControlPlaneService) ListEdges(ctx context.Context, req *v1.ListEdgesRequest) (*v1.ListEdgesResponse, error) { + return cps.listEdges(ctx, req) +} + +// @Summary Get Edge +// @Tags 1.0 +// @Param params query v1.GetEdgeRequest true "queries" +// @Success 200 {object} v1.Edge "result" +// @Router /v1/edges/{edge_id} [get] +func (cps *ControlPlaneService) GetEdge(ctx context.Context, req *v1.GetEdgeRequest) (*v1.Edge, error) { + return cps.getEdge(ctx, req) +} + +// @Summary Kick Edge +// @Tags 1.0 +// @Param params query v1.KickEdgeRequest true "queries" +// @Success 200 {object} v1.KickEdgeResponse "result" +// @Router /v1/edges/{edge_id} [delete] +func (cps *ControlPlaneService) KickEdge(ctx context.Context, req *v1.KickEdgeRequest) (*v1.KickEdgeResponse, error) { + return cps.kickEdge(ctx, req) +} + +// @Summary List Edges RPCs +// @Tags 1.0 +// @Param params query v1.ListEdgeRPCsRequest true "queries" +// @Success 200 {object} v1.ListEdgeRPCsResponse "result" +// @Router /v1/edges/rpcs [get] +func (cps *ControlPlaneService) ListEdgeRPCs(ctx context.Context, req *v1.ListEdgeRPCsRequest) (*v1.ListEdgeRPCsResponse, error) { + return cps.listEdgeRPCs(ctx, req) +} + +// @Summary List Services +// @Tags 1.0 +// @Param params query v1.ListServicesRequest true "queries" +// @Success 200 {object} v1.ListServicesResponse "result" +// @Router /v1/services [get] +func (cps *ControlPlaneService) ListServices(ctx context.Context, req *v1.ListServicesRequest) (*v1.ListServicesResponse, error) { + return cps.listServices(ctx, req) +} + +// @Summary Get Service +// @Tags 1.0 +// @Param params query v1.GetServiceRequest true "queries" +// @Success 200 {object} v1.Service "result" +// @Router /v1/services/{service_id} [get] +func (cps *ControlPlaneService) GetService(ctx context.Context, req *v1.GetServiceRequest) (*v1.Service, error) { + return cps.getService(ctx, req) +} + +// @Summary Kick Service +// @Tags 1.0 +// @Param params query v1.KickServiceRequest true "queries" +// @Success 200 {object} v1.KickServiceResponse "result" +// @Router /v1/services/{service_id} [delete] +func (cps *ControlPlaneService) KickService(ctx context.Context, req *v1.KickServiceRequest) (*v1.KickServiceResponse, error) { + return cps.kickService(ctx, req) +} + +// @Summary List Services RPCs +// @Tags 1.0 +// @Param params query v1.ListServiceRPCsRequest true "queries" +// @Success 200 {object} v1.ListServiceRPCsResponse "result" +// @Router /v1/services/rpcs [get] +func (cps *ControlPlaneService) ListServiceRPCs(ctx context.Context, req *v1.ListServiceRPCsRequest) (*v1.ListServiceRPCsResponse, error) { + return cps.listServiceRPCs(ctx, req) +} + +// @Summary List Services Topics +// @Tags 1.0 +// @Param params query v1.ListServiceTopicsRequest true "queries" +// @Success 200 {object} v1.ListServiceTopicsResponse "result" +// @Router /v1/services/topics [get] +func (cps *ControlPlaneService) ListServiceTopics(ctx context.Context, req *v1.ListServiceTopicsRequest) (*v1.ListServiceTopicsResponse, error) { + return cps.listServiceTopics(ctx, req) +} diff --git a/pkg/controlplane/service/edge_service.go b/pkg/controlplane/service/edge_service.go index 71c7303..83ce874 100644 --- a/pkg/controlplane/service/edge_service.go +++ b/pkg/controlplane/service/edge_service.go @@ -8,30 +8,21 @@ import ( "github.com/singchia/frontier/pkg/repo/model" ) -func (cps *ControlPlaneService) ListEdges(ctx context.Context, req *v1.ListEdgesRequest) (*v1.ListEdgesResponse, error) { - return cps.listEdges(ctx, req) -} - -func (cps *ControlPlaneService) GetEdge(ctx context.Context, req *v1.GetEdgeRequest) (*v1.Edge, error) { - return cps.getEdge(ctx, req) -} - -func (cps *ControlPlaneService) KickEdge(ctx context.Context, req *v1.KickEdgeRequest) (*v1.KickEdgeResponse, error) { - return cps.kickEdge(ctx, req) -} - -func (cps *ControlPlaneService) ListEdgeRPCs(ctx context.Context, req *v1.ListEdgeRPCsRequest) (*v1.ListEdgeRPCsResponse, error) { - return cps.listEdgeRPCs(ctx, req) -} - func (cps *ControlPlaneService) listEdges(_ context.Context, req *v1.ListEdgesRequest) (*v1.ListEdgesResponse, error) { query := &dao.EdgeQuery{} + // conditions if req.Meta != nil { query.Meta = *req.Meta } if req.Addr != nil { query.Addr = *req.Addr } + if req.Rpc != nil { + query.RPC = *req.Rpc + } + if req.EdgeId != nil { + query.EdgeID = *req.EdgeId + } // order if req.Order != nil && len(*req.Order) != 0 { order := *req.Order @@ -47,15 +38,10 @@ func (cps *ControlPlaneService) listEdges(_ context.Context, req *v1.ListEdgesRe query.Desc = false } } - if req.Rpc != nil { - query.RPC = *req.Rpc - } - if req.EdgeId != nil { - query.EdgeID = *req.EdgeId - } // pagination query.Page = int(req.Page) query.PageSize = int(req.PageSize) + // time range query.StartTime = *req.StartTime query.EndTime = *req.EndTime @@ -92,6 +78,7 @@ func (cps *ControlPlaneService) kickEdge(_ context.Context, req *v1.KickEdgeRequ func (cps *ControlPlaneService) listEdgeRPCs(_ context.Context, req *v1.ListEdgeRPCsRequest) (*v1.ListEdgeRPCsResponse, error) { query := &dao.EdgeRPCQuery{} + // conditions if req.EdgeId != nil { query.EdgeID = *req.EdgeId } @@ -116,6 +103,7 @@ func (cps *ControlPlaneService) listEdgeRPCs(_ context.Context, req *v1.ListEdge // pagination query.Page = int(req.Page) query.PageSize = int(req.PageSize) + // time range query.StartTime = *req.StartTime query.EndTime = *req.EndTime diff --git a/pkg/controlplane/service/service_service.go b/pkg/controlplane/service/service_service.go index 6d43c33..7969fef 100644 --- a/pkg/controlplane/service/service_service.go +++ b/pkg/controlplane/service/service_service.go @@ -1 +1,189 @@ package service + +import ( + "context" + + v1 "github.com/singchia/frontier/api/controlplane/v1" + "github.com/singchia/frontier/pkg/repo/dao" + "github.com/singchia/frontier/pkg/repo/model" +) + +func (cps *ControlPlaneService) listServices(_ context.Context, req *v1.ListServicesRequest) (*v1.ListServicesResponse, error) { + query := &dao.ServiceQuery{} + // conditions + if req.Service != nil { + query.Service = *req.Service + } + if req.Addr != nil { + query.Addr = *req.Addr + } + if req.Rpc != nil { + query.RPC = *req.Rpc + } + if req.Topic != nil { + query.Topic = *req.Topic + } + if req.ServiceId != nil { + query.ServiceID = *req.ServiceId + } + // order + if req.Order != nil && len(*req.Order) != 0 { + order := *req.Order + switch order[0] { + case '-': + query.Order = order[1:] + query.Desc = true + case '+': + query.Order = order[1:] + query.Desc = false + default: + query.Order = order + query.Desc = false + } + } + // pagination + query.Page = int(req.Page) + query.PageSize = int(req.PageSize) + // time range + query.StartTime = *req.StartTime + query.EndTime = *req.EndTime + + services, err := cps.dao.ListServices(query) + if err != nil { + return nil, err + } + count, err := cps.dao.CountServices(query) + if err != nil { + return nil, err + } + retServices := transferServices(services) + return &v1.ListServicesResponse{ + Services: retServices, + Count: uint32(count), + }, nil +} + +func (cps *ControlPlaneService) getService(_ context.Context, req *v1.GetServiceRequest) (*v1.Service, error) { + service, err := cps.dao.GetService(req.ServiceId) + if err != nil { + return nil, err + } + return transferService(service), nil +} + +func (cps *ControlPlaneService) kickService(_ context.Context, req *v1.KickServiceRequest) (*v1.KickServiceResponse, error) { + err := cps.servicebound.DelServiceByID(req.ServiceId) + if err != nil { + return nil, err + } + return &v1.KickServiceResponse{}, nil +} + +func (cps *ControlPlaneService) listServiceRPCs(_ context.Context, req *v1.ListServiceRPCsRequest) (*v1.ListServiceRPCsResponse, error) { + query := &dao.ServiceRPCQuery{} + // conditions + if req.ServiceId != nil { + query.ServiceID = *req.ServiceId + } + if req.Service != nil { + query.Service = *req.Service + } + // order + if req.Order != nil && len(*req.Order) != 0 { + order := *req.Order + switch order[0] { + case '-': + query.Order = order[1:] + query.Desc = true + case '+': + query.Order = order[1:] + query.Desc = false + default: + query.Order = order + query.Desc = false + } + } + // pagination + query.Page = int(req.Page) + query.PageSize = int(req.PageSize) + // time range + query.StartTime = *req.StartTime + query.EndTime = *req.EndTime + + rpcs, err := cps.dao.ListServiceRPCs(query) + if err != nil { + return nil, err + } + count, err := cps.dao.CountServiceRPCs(query) + if err != nil { + return nil, err + } + return &v1.ListServiceRPCsResponse{ + Rpcs: rpcs, + Count: uint32(count), + }, nil +} + +func (cps *ControlPlaneService) listServiceTopics(_ context.Context, req *v1.ListServiceTopicsRequest) (*v1.ListServiceTopicsResponse, error) { + query := &dao.ServiceTopicQuery{} + // conditions + if req.ServiceId != nil { + query.ServiceID = *req.ServiceId + } + if req.Service != nil { + query.Service = *req.Service + } + // order + if req.Order != nil && len(*req.Order) != 0 { + order := *req.Order + switch order[0] { + case '-': + query.Order = order[1:] + query.Desc = true + case '+': + query.Order = order[1:] + query.Desc = false + default: + query.Order = order + query.Desc = false + } + } + // pagination + query.Page = int(req.Page) + query.PageSize = int(req.PageSize) + // time range + query.StartTime = *req.StartTime + query.EndTime = *req.EndTime + + topics, err := cps.dao.ListServiceTopics(query) + if err != nil { + return nil, err + } + count, err := cps.dao.CountServiceTopics(query) + if err != nil { + return nil, err + } + return &v1.ListServiceTopicsResponse{ + Topics: topics, + Count: uint32(count), + }, nil +} + +func transferServices(services []*model.Service) []*v1.Service { + retServices := make([]*v1.Service, len(services)) + for i, service := range services { + retService := transferService(service) + retServices[i] = retService + } + return retServices +} + +func transferService(service *model.Service) *v1.Service { + retService := &v1.Service{ + ServiceId: service.ServiceID, + Service: service.Service, + Addr: service.Addr, + CreateTime: service.CreateTime, + } + return retService +} diff --git a/pkg/edgebound/edge_manager.go b/pkg/edgebound/edge_manager.go index 788f31e..dc761b5 100644 --- a/pkg/edgebound/edge_manager.go +++ b/pkg/edgebound/edge_manager.go @@ -15,7 +15,6 @@ import ( "github.com/singchia/frontier/pkg/apis" "github.com/singchia/frontier/pkg/config" "github.com/singchia/frontier/pkg/mapmap" - "github.com/singchia/frontier/pkg/repo/dao" "github.com/singchia/frontier/pkg/security" "github.com/singchia/frontier/pkg/utils" "github.com/singchia/geminio" @@ -27,9 +26,9 @@ import ( "k8s.io/klog/v2" ) -func NewEdgebound(conf *config.Configuration, dao *dao.Dao, informer apis.EdgeInformer, +func NewEdgebound(conf *config.Configuration, repo apis.Repo, informer apis.EdgeInformer, exchange apis.Exchange, tmr timer.Timer) (apis.Edgebound, error) { - return newEdgeManager(conf, dao, informer, exchange, tmr) + return newEdgeManager(conf, repo, informer, exchange, tmr) } type edgeManager struct { @@ -48,11 +47,11 @@ type edgeManager struct { edges map[uint64]geminio.End mtx sync.RWMutex // key: edgeID; subkey: streamID; value: geminio.Stream - // we don't store stream info to dao, because they may will be too much. + // we don't store stream info to repo, because they may will be too much. streams *mapmap.MapMap - // dao and repo for edges - dao *dao.Dao + // repo and repo for edges + repo apis.Repo // listener for edges cm cmux.CMux geminioLn net.Listener @@ -63,7 +62,7 @@ type edgeManager struct { } // support for tls, mtls and tcp listening -func newEdgeManager(conf *config.Configuration, dao *dao.Dao, informer apis.EdgeInformer, +func newEdgeManager(conf *config.Configuration, repo apis.Repo, informer apis.EdgeInformer, exchange apis.Exchange, tmr timer.Timer) (*edgeManager, error) { listen := &conf.Edgebound.Listen var ( @@ -77,7 +76,7 @@ func newEdgeManager(conf *config.Configuration, dao *dao.Dao, informer apis.Edge conf: conf, tmr: tmr, streams: mapmap.NewMapMap(), - dao: dao, + repo: repo, shub: synchub.NewSyncHub(synchub.OptionTimer(tmr)), edges: make(map[uint64]geminio.End), UnimplementedDelegate: &delegate.UnimplementedDelegate{}, diff --git a/pkg/edgebound/edge_onoff.go b/pkg/edgebound/edge_onoff.go index 12da981..8455d84 100644 --- a/pkg/edgebound/edge_onoff.go +++ b/pkg/edgebound/edge_onoff.go @@ -45,8 +45,8 @@ func (em *edgeManager) online(end geminio.End) error { Addr: end.RemoteAddr().String(), CreateTime: time.Now().Unix(), } - if err := em.dao.CreateEdge(edge); err != nil { - klog.Errorf("edge online, dao create err: %s, edgeID: %d", err, end.ClientID()) + if err := em.repo.CreateEdge(edge); err != nil { + klog.Errorf("edge online, repo create err: %s, edgeID: %d", err, end.ClientID()) return err } return nil @@ -78,15 +78,15 @@ func (em *edgeManager) offline(edgeID uint64, addr net.Addr) error { }() // memdb - if err := em.dao.DeleteEdge(&dao.EdgeDelete{ + if err := em.repo.DeleteEdge(&dao.EdgeDelete{ EdgeID: edgeID, Addr: addr.String(), }); err != nil { - klog.Errorf("edge offline, dao delete edge err: %s, edgeID: %d", err, edgeID) + klog.Errorf("edge offline, repo delete edge err: %s, edgeID: %d", err, edgeID) return err } - if err := em.dao.DeleteEdgeRPCs(edgeID); err != nil { - klog.Errorf("edge offline, dao delete edge rpcs err: %s, edgeID: %d", err, edgeID) + if err := em.repo.DeleteEdgeRPCs(edgeID); err != nil { + klog.Errorf("edge offline, repo delete edge rpcs err: %s, edgeID: %d", err, edgeID) return err } return nil @@ -157,7 +157,7 @@ func (em *edgeManager) RemoteRegistration(rpc string, edgeID, streamID uint64) { EdgeID: edgeID, CreateTime: time.Now().Unix(), } - err := em.dao.CreateEdgeRPC(er) + err := em.repo.CreateEdgeRPC(er) if err != nil { klog.Errorf("edge remote registration, create edge rpc err: %s, rpc: %s, edgeID: %d, streamID: %d", err, rpc, edgeID, streamID) } diff --git a/pkg/exchange/exchange.go b/pkg/exchange/exchange.go index 5dc4bdd..f853409 100644 --- a/pkg/exchange/exchange.go +++ b/pkg/exchange/exchange.go @@ -13,16 +13,16 @@ type exchange struct { MQM apis.MQM } -func NewExchange(conf *config.Configuration, mqm apis.MQM) (apis.Exchange, error) { +func NewExchange(conf *config.Configuration, mqm apis.MQM) apis.Exchange { return newExchange(conf, mqm) } -func newExchange(conf *config.Configuration, mqm apis.MQM) (*exchange, error) { +func newExchange(conf *config.Configuration, mqm apis.MQM) *exchange { exchange := &exchange{ conf: conf, MQM: mqm, } - return exchange, nil + return exchange } func (ex *exchange) AddEdgebound(edgebound apis.Edgebound) { diff --git a/pkg/servicebound/service_manager.go b/pkg/servicebound/service_manager.go index 5ec4c16..64203c9 100644 --- a/pkg/servicebound/service_manager.go +++ b/pkg/servicebound/service_manager.go @@ -15,7 +15,6 @@ import ( "github.com/singchia/frontier/pkg/apis" "github.com/singchia/frontier/pkg/config" "github.com/singchia/frontier/pkg/mapmap" - "github.com/singchia/frontier/pkg/repo/dao" "github.com/singchia/frontier/pkg/repo/model" "github.com/singchia/frontier/pkg/security" "github.com/singchia/frontier/pkg/utils" @@ -27,9 +26,9 @@ import ( "k8s.io/klog/v2" ) -func NewServicebound(conf *config.Configuration, dao *dao.Dao, informer apis.ServiceInformer, +func NewServicebound(conf *config.Configuration, repo apis.Repo, informer apis.ServiceInformer, exchange apis.Exchange, mqm apis.MQM, tmr timer.Timer) (apis.Servicebound, error) { - return newServiceManager(conf, dao, informer, exchange, mqm, tmr) + return newServiceManager(conf, repo, informer, exchange, mqm, tmr) } type end struct { @@ -53,11 +52,11 @@ type serviceManager struct { services map[uint64]geminio.End mtx sync.RWMutex // key: serviceID; subkey: streamID; value: geminio.Stream - // we don't store stream info to dao, because they may will be too much. + // we don't store stream info to repo, because they may will be too much. streams *mapmap.MapMap - // dao and repo for services - dao *dao.Dao + // repo and repo for services + repo apis.Repo // listener for geminio ln net.Listener @@ -65,7 +64,7 @@ type serviceManager struct { tmr timer.Timer } -func newServiceManager(conf *config.Configuration, dao *dao.Dao, informer apis.ServiceInformer, +func newServiceManager(conf *config.Configuration, repo apis.Repo, informer apis.ServiceInformer, exchange apis.Exchange, mqm apis.MQM, tmr timer.Timer) (*serviceManager, error) { listen := &conf.Servicebound.Listen var ( @@ -79,7 +78,7 @@ func newServiceManager(conf *config.Configuration, dao *dao.Dao, informer apis.S conf: conf, tmr: tmr, streams: mapmap.NewMapMap(), - dao: dao, + repo: repo, shub: synchub.NewSyncHub(synchub.OptionTimer(tmr)), services: make(map[uint64]geminio.End), UnimplementedDelegate: &delegate.UnimplementedDelegate{}, @@ -212,7 +211,7 @@ func (sm *serviceManager) remoteReceiveClaim(serviceID uint64, topics []string) Topic: topic, ServiceID: serviceID, } - err = sm.dao.CreateServiceTopic(st) + err = sm.repo.CreateServiceTopic(st) if err != nil { klog.Errorf("service remote receive claim, create service topic: %s, err: %s", topic, err) return err @@ -232,7 +231,7 @@ func (sm *serviceManager) RemoteRegistration(rpc string, serviceID, streamID uin ServiceID: serviceID, CreateTime: time.Now().Unix(), } - err := sm.dao.CreateServiceRPC(sr) + err := sm.repo.CreateServiceRPC(sr) if err != nil { klog.Errorf("service remote registration, create service rpc: %s, err: %s, serviceID: %d, streamID: %d", err, rpc, serviceID, streamID) } @@ -249,7 +248,7 @@ func (sm *serviceManager) GetServiceByName(name string) (geminio.End, error) { sm.mtx.RLock() defer sm.mtx.RUnlock() - mservice, err := sm.dao.GetServiceByName(name) + mservice, err := sm.repo.GetServiceByName(name) if err != nil { klog.V(2).Infof("get service by name: %s, err: %s", name, err) return nil, err @@ -262,7 +261,7 @@ func (sm *serviceManager) GetServiceByRPC(rpc string) (geminio.End, error) { sm.mtx.RLock() defer sm.mtx.RUnlock() - mrpc, err := sm.dao.GetServiceRPC(rpc) + mrpc, err := sm.repo.GetServiceRPC(rpc) if err != nil { klog.V(2).Infof("get service by rpc: %s, err: %s", rpc, err) return nil, err @@ -275,7 +274,7 @@ func (sm *serviceManager) GetServiceByTopic(topic string) (geminio.End, error) { sm.mtx.RLock() defer sm.mtx.RUnlock() - mtopic, err := sm.dao.GetServiceTopic(topic) + mtopic, err := sm.repo.GetServiceTopic(topic) if err != nil { klog.V(2).Infof("get service by topic: %s, err: %s", topic, err) return nil, err diff --git a/pkg/servicebound/service_onoff.go b/pkg/servicebound/service_onoff.go index f3e5ebd..22948bc 100644 --- a/pkg/servicebound/service_onoff.go +++ b/pkg/servicebound/service_onoff.go @@ -46,8 +46,8 @@ func (sm *serviceManager) online(end geminio.End, meta *apis.Meta) error { Addr: end.RemoteAddr().String(), CreateTime: time.Now().Unix(), } - if err := sm.dao.CreateService(service); err != nil { - klog.Errorf("service online, dao create err: %s, serviceID: %d", err, end.ClientID()) + if err := sm.repo.CreateService(service); err != nil { + klog.Errorf("service online, repo create err: %s, serviceID: %d", err, end.ClientID()) return err } return nil @@ -79,22 +79,22 @@ func (sm *serviceManager) offline(serviceID uint64, addr net.Addr) error { }() // clear memdb - if err := sm.dao.DeleteService(&dao.ServiceDelete{ + if err := sm.repo.DeleteService(&dao.ServiceDelete{ ServiceID: serviceID, Addr: addr.String(), }); err != nil { - klog.Errorf("service offline, dao delete service err: %s, serviceID: %d", err, serviceID) + klog.Errorf("service offline, repo delete service err: %s, serviceID: %d", err, serviceID) return err } - if err := sm.dao.DeleteServiceRPCs(serviceID); err != nil { - klog.Errorf("service offline, dao delete service rpcs err: %s, serviceID: %d", err, serviceID) + if err := sm.repo.DeleteServiceRPCs(serviceID); err != nil { + klog.Errorf("service offline, repo delete service rpcs err: %s, serviceID: %d", err, serviceID) return err } klog.V(2).Infof("service offline, remote rpc de-register succeed, serviceID: %d", serviceID) - if err := sm.dao.DeleteServiceTopics(serviceID); err != nil { - klog.Errorf("service offline, dao delete service topics err: %s, serviceID: %d", err, serviceID) + if err := sm.repo.DeleteServiceTopics(serviceID); err != nil { + klog.Errorf("service offline, repo delete service topics err: %s, serviceID: %d", err, serviceID) return err }