route: add hash by functions with srcip edgeid and random

This commit is contained in:
singchia
2024-05-30 21:02:26 +08:00
parent 661dee0ed1
commit 496cd7dbb3
20 changed files with 1979 additions and 325 deletions

499
README.md
View File

@@ -2,7 +2,7 @@
<img src="./docs/diagram/frontier-logo.png" width="30%" height="30%">
</p>
Frontier是一个go开发的开源长连接网关旨在让微服务直达边缘节点或客户端反之边缘节点或客户端也同样直达微服务。对于两者提供了单双向RPC调用消息发布和接收以及点对点通信的功能。Frontier符合云原生架构可以使用Operator快速部署一个集群具有高可用和弹性轻松支撑百万边缘节点或客户端在线的需求。
Frontier是一个go开发的全双工开源长连接网关,旨在让微服务直达边缘节点或客户端,反之边缘节点或客户端也同样直达微服务。对于两者,提供了全双工的单双向RPC调用消息发布和接收以及点对点的功能。Frontier符合云原生架构可以使用Operator快速部署一个集群具有高可用和弹性轻松支撑百万边缘节点或客户端在线的需求。
## 特性
@@ -10,7 +10,7 @@ Frontier是一个go开发的开源长连接网关旨在让微服务直达边
- **RPC** 微服务和边缘可以Call对方的函数提前注册并且在微服务侧支持负载均衡
- **消息** 微服务和边缘可以Publish对方的Topic边缘可以Publish到外部MQ的Topic微服务侧支持负载均衡
- **多路复用/流** 微服务可以直接在边缘节点打开一个流(连接),可以封装例如文件上传、代理等,天堑变通途
- **上线离线控制** 微服务可以注册边缘节点获取ID、上线离线回调当这些事件发生Frontier会调用这些方法
- **上线离线控制** 微服务可以注册边缘节点获取ID、上线离线回调当这些事件发生Frontier会调用这些函数
- **API简单** 在项目api目录下分别对边缘和微服务提供了封装好的sdk可以非常简单的基于这个sdk做开发
- **部署简单** 支持多种部署方式(docker docker-compose helm以及operator)来部署Frontier实例或集群
- **水平扩展** 提供了Frontiter和Frontlas集群在单实例性能达到瓶颈下可以水平扩展Frontier实例或集群
@@ -19,7 +19,7 @@ Frontier是一个go开发的开源长连接网关旨在让微服务直达边
## 架构
### Frontier架构
### 组件Frontier
<img src="./docs/diagram/frontier.png" width="100%" height="100%">
@@ -29,6 +29,7 @@ Frontier是一个go开发的开源长连接网关旨在让微服务直达边
- _Publish/Receive_发布和接收消息
- _Call/Register_调用和注册函数
- _OpenStream/AcceptStream_打开和接收点到点流连接
- _外部MQ_Frontier支持将从边缘节点Publish的消息根据配置的Topic转发到外部MQ
Frontier需要微服务和边缘节点两方都主动连接到FrontierService和Edge的元信息接收TopicRPCService名等可以在连接的时候携带过来。连接的默认端口是
@@ -43,7 +44,7 @@ Frontier需要微服务和边缘节点两方都主动连接到FrontierService
<tr>
<th>功能</th>
<th>发起方</th>
<th>目标</th>
<th>接收</th>
<th>方法</th>
<th>路由方式</th>
<th>描述</th>
@@ -62,7 +63,7 @@ Frontier需要微服务和边缘节点两方都主动连接到FrontierService
<td>Service或外部MQ</td>
<td>Publish</td>
<td>Topic</td>
<td>必须Publish到Topic由Frontier根据Topic选择具体Service或MQ</td>
<td>必须Publish到Topic由Frontier根据Topic选择某个Service或MQ</td>
</tr>
<tr>
<td rowspan="2">RPCer</td>
@@ -70,14 +71,14 @@ Frontier需要微服务和边缘节点两方都主动连接到FrontierService
<td>Edge</td>
<td>Call</td>
<td>EdgeID+Method</td>
<td>必须Call到具体的EdgeID必须携带Method</td>
<td>必须Call到具体的EdgeID需要携带Method</td>
</tr>
<tr>
<td>Edge</td>
<td>Service</td>
<td>Call</td>
<td>Method</td>
<td>必须Call到Method由Frontier根据Method选择具体的Service</td>
<td>必须Call到Method由Frontier根据Method选择某个的Service</td>
</tr>
<tr>
<td rowspan="2">Multiplexer</td>
@@ -96,6 +97,17 @@ Frontier需要微服务和边缘节点两方都主动连接到FrontierService
</tr>
</tbody></table>
主要遵守以下设计原则:
1. 所有的消息、RPC和Stream都是点到点的传递
- 从微服务到边缘一定要指定边缘节点ID
- 从边缘到微服务Frontier根据Topic和Method路由最终哈希选择一个微服务或外部MQ默认根据```edgeid```哈希,你也可以选择```random```或```srcip```
2. 消息需要接收方明确结束
- 为了保障消息的传达语义接收方一定需要msg.Done()或msg.Error(err),保障传达一致性
3. Multiplexer打开的流在逻辑上是微服务与边缘节点的直接通信
- 对方接收到流后所有在这个流上功能都会直达对方不会经过Frontierd的路由策略
## 使用
### 示例
@@ -106,7 +118,7 @@ Frontier需要微服务和边缘节点两方都主动连接到FrontierService
make examples
```
在bin目录下得到```chatroom_service```和```chatroom_client```可执行程序,运行示例:
在bin目录下得到```chatroom_service```和```chatroom_egent```可执行程序,运行示例:
https://github.com/singchia/frontier/assets/15531166/18b01d96-e30b-450f-9610-917d65259c30
@@ -150,9 +162,9 @@ func main() {
return net.Dial("tcp", "127.0.0.1:30011")
}
svc, _ := service.NewService(dialer)
srv.RegisterGetEdgeID(context.TODO(), getID)
srv.RegisterEdgeOnline(context.TODO(), online)
srv.RegisterEdgeOffline(context.TODO(), offline)
svc.RegisterGetEdgeID(context.TODO(), getID)
svc.RegisterEdgeOnline(context.TODO(), online)
svc.RegisterEdgeOffline(context.TODO(), offline)
}
// service可以根据meta分配id给edge
@@ -189,78 +201,10 @@ func main() {
return net.Dial("tcp", "127.0.0.1:30011")
}
svc, _ := service.NewService(dialer)
msg := srv.NewMessage([]byte("test"))
msg := svc.NewMessage([]byte("test"))
// 发布一条消息到ID为1001的边缘节点
err = srv.Publish(context.TODO(), 1001, msg)
}
```
**微服务调用边缘节点的RPC**
```golang
package main
import (
"context"
"net"
"github.com/singchia/frontier/api/dataplane/v1/service"
)
func main() {
dialer := func() (net.Conn, error) {
return net.Dial("tcp", "127.0.0.1:30011")
}
req := srv.NewRequest([]byte("test"))
// 调用ID为1001边缘节点的foo方法前提是边缘节点需要预注册该方法
rsp, err := srv.Call(context.TODO(), edgeID, "foo", req)
}
```
**微服务打开边缘节点的点到点流**
```golang
package main
import (
"context"
"net"
"github.com/singchia/frontier/api/dataplane/v1/service"
)
func main() {
dialer := func() (net.Conn, error) {
return net.Dial("tcp", "127.0.0.1:30011")
}
svc, _ := service.NewService(dialer)
// 打开ID为1001边缘节点的新流同时st也是一个net.Conn前提是edge需要AcceptStream接收该流
st, err := srv.OpenStream(context.TODO(), 1001)
}
```
基于这个新打开流,你可以用来传递文件、代理流量等。
**微服务注册方法以供边缘节点调用**
```golang
package main
import (
"context"
"net"
"github.com/singchia/frontier/api/dataplane/v1/service"
)
func main() {
dialer := func() (net.Conn, error) {
return net.Dial("tcp", "127.0.0.1:30011")
}
svc, _ := service.NewService(dialer)
// 注册一个"echo"方法
srv.Register(context.TODO(), "echo", echo)
}
func echo(ctx context.Context, req geminio.Request, rsp geminio.Response) {
value := req.Data()
rsp.SetData(value)
err := svc.Publish(context.TODO(), 1001, msg)
// ...
}
```
@@ -284,18 +228,189 @@ func main() {
// 在获取svc时声明需要接收的topic
svc, _ := service.NewService(dialer, service.OptionServiceReceiveTopics([]string{"foo"}))
for {
msg, err := srv.Receive(context.TODO())
// 接收消息
msg, err := svc.Receive(context.TODO())
if err == io.EOF {
// 收到EOF表示svc生命周期已终结不可以再使用
return
}
if err != nil {
fmt.Println("\n> receive err:", err)
fmt.Println("receive err:", err)
continue
}
// 处理完msg后需要通知调用方已完成
msg.Done()
fmt.Printf("> receive msg, edgeID: %d streamID: %d data: %s\n", msg.ClientID(), msg.StreamID(), string(value))
}
}
```
**微服务调用边缘节点的RPC**
```golang
package main
import (
"context"
"net"
"github.com/singchia/frontier/api/dataplane/v1/service"
)
func main() {
dialer := func() (net.Conn, error) {
return net.Dial("tcp", "127.0.0.1:30011")
}
svc, _ := service.NewService(dialer)
req := svc.NewRequest([]byte("test"))
// 调用ID为1001边缘节点的foo方法前提是边缘节点需要预注册该方法
rsp, err := svc.Call(context.TODO(), 1001, "foo", req)
// ...
}
```
**微服务注册方法以供边缘节点调用**
```golang
package main
import (
"context"
"net"
"github.com/singchia/frontier/api/dataplane/v1/service"
"github.com/singchia/geminio"
)
func main() {
dialer := func() (net.Conn, error) {
return net.Dial("tcp", "127.0.0.1:30011")
}
svc, _ := service.NewService(dialer)
// 注册一个"echo"方法
svc.Register(context.TODO(), "echo", echo)
// ...
}
func echo(ctx context.Context, req geminio.Request, rsp geminio.Response) {
value := req.Data()
rsp.SetData(value)
}
```
**微服务打开边缘节点的点到点流**
```golang
package main
import (
"context"
"net"
"github.com/singchia/frontier/api/dataplane/v1/service"
)
func main() {
dialer := func() (net.Conn, error) {
return net.Dial("tcp", "127.0.0.1:30011")
}
svc, _ := service.NewService(dialer)
// 打开ID为1001边缘节点的新流同时st也是一个net.Conn前提是edge需要AcceptStream接收该流
st, err := svc.OpenStream(context.TODO(), 1001)
}
```
基于这个新打开流,你可以用来传递文件、代理流量等。
**微服务接收流**
```golang
package main
import (
"fmt"
"io"
"net"
"github.com/singchia/frontier/api/dataplane/v1/service"
)
func main() {
dialer := func() (net.Conn, error) {
return net.Dial("tcp", "127.0.0.1:30011")
}
// 在获取svc时声明需要微服务名在边缘打开流时需要指定该微服务名
svc, _ := service.NewService(dialer, service.OptionServiceName("service-name"))
for {
st, err := svc.AcceptStream()
if err == io.EOF {
// 收到EOF表示svc生命周期已终结不可以再使用
return
} else if err != nil {
fmt.Println("accept stream err:", err)
continue
}
// 使用stream这个stream同时是个net.Conn你可以Read/Write/Close同时也可以RPC和消息
}
}
```
基于这个新打开流,你可以用来传递文件、代理流量等。
**消息、RPC和流一起来吧**
```golang
package main
import (
"context"
"fmt"
"io"
"net"
"github.com/singchia/frontier/api/dataplane/v1/service"
"github.com/singchia/geminio"
)
func main() {
dialer := func() (net.Conn, error) {
return net.Dial("tcp", "127.0.0.1:30011")
}
// 在获取svc时声明需要微服务名在边缘打开流时需要指定该微服务名
svc, _ := service.NewService(dialer, service.OptionServiceName("service-name"))
// 接收流
go func() {
for {
st, err := svc.AcceptStream()
if err == io.EOF {
// 收到EOF表示svc生命周期已终结不可以再使用
return
} else if err != nil {
fmt.Println("accept stream err:", err)
continue
}
// 使用stream这个stream同时是个net.Conn你可以Read/Write/Close同时也可以RPC和消息
}
}()
// 注册一个"echo"方法
svc.Register(context.TODO(), "echo", echo)
// 接收消息
for {
msg, err := svc.Receive(context.TODO())
if err == io.EOF {
// 收到EOF表示svc生命周期已终结不可以再使用
return
}
if err != nil {
fmt.Println("receive err:", err)
continue
}
// 处理完msg后需要通知调用方已完成
msg.Done()
}
}
func echo(ctx context.Context, req geminio.Request, rsp geminio.Response) {
value := req.Data()
rsp.SetData(value)
}
```
### 边缘节点/客户端如何使用
@@ -314,8 +429,8 @@ func main() {
dialer := func() (net.Conn, error) {
return net.Dial("tcp", "127.0.0.1:30012")
}
eg, err := edge.NewEdge(dialer)
// 开始使用eg
eg, _ := edge.NewEdge(dialer)
// 开始使用eg ...
}
```
@@ -327,6 +442,7 @@ Service需要提前声明接收该Topic或者在配置文件中配置外部MQ
package main
import (
"context"
"net"
"github.com/singchia/frontier/api/dataplane/v1/edge"
)
@@ -335,11 +451,50 @@ func main() {
dialer := func() (net.Conn, error) {
return net.Dial("tcp", "127.0.0.1:30012")
}
eg, err := edge.NewEdge(dialer)
eg, _ := edge.NewEdge(dialer)
// 开始使用eg
msg := cli.NewMessage([]byte("test"))
err = cli.Publish(context.TODO(), "foo", msg)
msg := eg.NewMessage([]byte("test"))
err := eg.Publish(context.TODO(), "foo", msg)
// ...
}
```
**边缘节点接收消息**
```golang
package main
import (
"context"
"fmt"
"io"
"net"
"github.com/singchia/frontier/api/dataplane/v1/edge"
)
func main() {
dialer := func() (net.Conn, error) {
return net.Dial("tcp", "127.0.0.1:30012")
}
eg, _ := edge.NewEdge(dialer)
for {
// 接收消息
msg, err := eg.Receive(context.TODO())
if err == io.EOF {
// 收到EOF表示eg生命周期已终结不可以再使用
return
}
if err != nil {
fmt.Println("receive err:", err)
continue
}
// 处理完msg后需要通知调用方已完成
msg.Done()
}
// ...
}
```
**边缘节点调用微服务RPC**
@@ -347,6 +502,58 @@ func main() {
```golang
package main
import (
"context"
"net"
"github.com/singchia/frontier/api/dataplane/v1/edge"
)
func main() {
dialer := func() (net.Conn, error) {
return net.Dial("tcp", "127.0.0.1:30012")
}
eg, _ := edge.NewEdge(dialer)
// 开始使用eg
req := eg.NewRequest([]byte("test"))
// 调用echo方法Frontier会查找并转发请求到相应的微服务
rsp, err := eg.Call(context.TODO(), "echo", req)
}
```
**边缘节点注册RPC**
```golang
package main
import (
"context"
"net"
"github.com/singchia/frontier/api/dataplane/v1/edge"
"github.com/singchia/geminio"
)
func main() {
dialer := func() (net.Conn, error) {
return net.Dial("tcp", "127.0.0.1:30012")
}
eg, _ := edge.NewEdge(dialer)
// 注册一个"echo"方法
eg.Register(context.TODO(), "echo", echo)
// ...
}
func echo(ctx context.Context, req geminio.Request, rsp geminio.Response) {
value := req.Data()
rsp.SetData(value)
}
```
**边缘节点打开微服务的点到点流**
```golang
package main
import (
"net"
"github.com/singchia/frontier/api/dataplane/v1/edge"
@@ -356,13 +563,84 @@ func main() {
dialer := func() (net.Conn, error) {
return net.Dial("tcp", "127.0.0.1:30012")
}
eg, err := edge.NewEdge(dialer)
// 开始使用eg
msg := cli.NewMessage([]byte("test"))
// 调用echo方法Frontier会查找并转发请求到相应的微服务
rsp, err := cli.Call(context.TODO(), "echo", req)
eg, _ := edge.NewEdge(dialer)
st, err := eg.OpenStream("service-name")
// ...
}
```
基于这个新打开流,你可以用来传递文件、代理流量等。
**边缘节点接收流**
```golang
package main
import (
"net"
"fmt"
"io"
"github.com/singchia/frontier/api/dataplane/v1/edge"
)
func main() {
dialer := func() (net.Conn, error) {
return net.Dial("tcp", "127.0.0.1:30012")
}
eg, _ := edge.NewEdge(dialer)
for {
stream, err := eg.AcceptStream()
if err == io.EOF {
// 收到EOF表示eg生命周期已终结不可以再使用
return
} else if err != nil {
fmt.Println("accept stream err:", err)
continue
}
// 使用stream这个stream同时是个net.Conn你可以Read/Write/Close同时也可以RPC和消息
}
}
```
### 错误处理
<table><thead>
<tr>
<th>错误</th>
<th>描述和处理</th>
</tr></thead>
<tbody>
<tr>
<td>io.EOF</td>
<td>收到EOF表示流或连接已关闭需要退出Receive、AcceptStream等操作</td>
</tr>
<tr>
<td>io.ErrShortBuffer</td>
<td>发送端或者接收端的Buffer已满可以设置OptionServiceBufferSize或OptionEdgeBufferSize来调整</td>
</tr>
<tr>
<td>apis.ErrEdgeNotOnline</td>
<td>表示该边缘节点不在线,需要检查边缘连接</td>
</tr>
<tr>
<td>apis.ServiceNotOnline</td>
<td>表示微服务不在线,需要检查微服务连接信息或连接</td>
</tr>
<tr>
<td>apis.RPCNotOnline</td>
<td>表示Call的RPC不在线</td>
</tr>
<tr>
<td>apis.TopicOnline</td>
<td>表示Publish的Topic不在线</td>
</tr>
<tr>
<td>其他错误</td>
<td>还存在Timeout、BufferFull等</td>
</tr>
</tbody>
</table>
需要注意的是如果关闭流在流上正在阻塞的方法都会立即得到io.EOF如果关闭入口Service和Edge则所有在此之上的流阻塞的方法都会立即得到io.EOF。
### 控制面
@@ -387,7 +665,7 @@ service ControlPlane {
**REST** Swagger详见[Swagger定义](./docs/swagger/swagger.yaml)
例如你可以使用下面请求来踢某个边缘节点下线:
例如你可以使用下面请求来踢某个边缘节点下线:
```
curl -X DELETE http://127.0.0.1:30010/v1/edges/{edge_id}
@@ -398,13 +676,29 @@ curl -X DELETE http://127.0.0.1:30010/v1/edges/{edge_id}
curl -X GET http://127.0.0.1:30010/v1/services/rpcs?service_id={service_id}
```
注意gRPC/Rest依赖dao backend有两个选项```buntdb```和```sqlite```都是使用的in-memory模式为性能考虑默认backend使用buntdb并且列表接口返回字段count永远是-1当你配置backend为sqlite3时会认为你对在Frontier上连接的微服务和边缘节点有强烈的OLTP需求例如在Frontier上封装web此时count才会返回总数。
**注意**gRPC/Rest依赖dao backend有两个选项```buntdb```和```sqlite```都是使用的in-memory模式为性能考虑默认backend使用buntdb并且列表接口返回字段count永远是-1当你配置backend为sqlite3时会认为你对在Frontier上连接的微服务和边缘节点有强烈的OLTP需求例如在Frontier上封装web此时count才会返回总数。
## Frontier配置
如果需要更近一步定制你的Frontier实例可以在这一节了解各个配置是如何工作的。
### 最小化配置
简单起,你可以仅配置面向微服务和边缘节点的服务监听地址:
```
servicebound:
listen:
network: tcp
addr: 0.0.0.0:30011
edgebound:
listen:
network: tcp
addr: 0.0.0.0:30012
edgeid_alloc_when_no_idservice_on: true
```
### TLS
```
@@ -420,7 +714,7 @@ tls:
insecure_skip_verify: false
```
## 部署
## Frontier部署
在单Frontier实例下可以根据环境选择以下方式部署你的Frontier实例。
@@ -441,7 +735,7 @@ docker-compose up -d frontier
### helm
如果你是在k8s环境下可以使用helm快速部署一个实例,默认
如果你是在k8s环境下可以使用helm快速部署一个实例
```
git clone https://github.com/singchia/frontier.git
@@ -449,7 +743,6 @@ cd dist/helm
helm install frontier ./ -f values.yaml
```
## 集群
### Frontier + Frontlas架构
@@ -486,7 +779,7 @@ Frontier需要主动连接Frontlas以上报自己、微服务和边缘的活跃
详见 [ROADMAP](./ROADMAP.md)
### Bug和Feature
### 贡献
如果你发现任何Bug请提出Issue项目Maintainers会及时响应相关问题。
@@ -496,9 +789,11 @@ Frontier需要主动连接Frontlas以上报自己、微服务和边缘的活跃
* 每次提交一个Feature
* 提交的代码都携带单元测试
## 测试
### 流
### 流功能测试
<img src="./docs/diagram/stream.png" width="100%">

4
dist/README.md vendored
View File

@@ -0,0 +1,4 @@
- compose: a docker-compose yaml example
- helm: a helm charts example
- crd: a k8s CRD and a FrontierCluster example

24
dist/crd/frontier-cluster.yaml vendored Normal file
View File

@@ -0,0 +1,24 @@
apiVersion: frontier.singchia.io/v1alpha1
kind: FrontierCluster
metadata:
labels:
app.kubernetes.io/name: frontiercluster
app.kubernetes.io/managed-by: kustomize
name: frontiercluster
spec:
frontier:
replicas: 1
servicebound:
port: 30011
edgebound:
port: 30012
frontlas:
replicas: 1
controlplane:
port: 40011
redis:
addrs:
- rfs-redisfailover:26379
password: your-password
masterName: mymaster
redisType: sentinel

892
dist/crd/install.yaml vendored Normal file
View File

@@ -0,0 +1,892 @@
apiVersion: v1
kind: Namespace
metadata:
labels:
app.kubernetes.io/component: manager
app.kubernetes.io/created-by: frontier
app.kubernetes.io/instance: system
app.kubernetes.io/managed-by: kustomize
app.kubernetes.io/name: namespace
app.kubernetes.io/part-of: frontier
control-plane: operator
name: frontier-system
---
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.14.0
name: frontierclusters.frontier.singchia.io
spec:
group: frontier.singchia.io
names:
kind: FrontierCluster
listKind: FrontierClusterList
plural: frontierclusters
singular: frontiercluster
scope: Namespaced
versions:
- name: v1alpha1
schema:
openAPIV3Schema:
description: FrontierCluster is the Schema for the frontierclusters API
properties:
apiVersion:
description: |-
APIVersion defines the versioned schema of this representation of an object.
Servers should convert recognized schemas to the latest internal value, and
may reject unrecognized values.
More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources
type: string
kind:
description: |-
Kind is a string value representing the REST resource this object represents.
Servers may infer this from the endpoint the client submits requests to.
Cannot be updated.
In CamelCase.
More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds
type: string
metadata:
type: object
spec:
description: FrontierClusterSpec defines the desired state of FrontierCluster
properties:
frontier:
properties:
edgebound:
properties:
port:
type: integer
serviceName:
type: string
serviceType:
description: Service Type string describes ingress methods
for a service
type: string
tls:
description: TLS is the configuration used to set up TLS encryption
properties:
caCertificateSecretRef:
description: |-
CaCertificateSecret is a reference to a Secret containing the certificate for the CA which signed the server certificates
The certificate is expected to be available under the key "ca.crt"
properties:
name:
description: |-
Name of the referent.
More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names
TODO: Add other useful fields. apiVersion, kind, uid?
type: string
type: object
x-kubernetes-map-type: atomic
certificateKeySecretRef:
description: |-
CertificateKeySecret is a reference to a Secret containing a private key and certificate to use for TLS.
The key and cert are expected to be PEM encoded and available at "tls.key" and "tls.crt".
This is the same format used for the standard "kubernetes.io/tls" Secret type, but no specific type is required.
Alternatively, an entry tls.pem, containing the concatenation of cert and key, can be provided.
If all of tls.pem, tls.crt and tls.key are present, the tls.pem one needs to be equal to the concatenation of tls.crt and tls.key
properties:
name:
description: |-
Name of the referent.
More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names
TODO: Add other useful fields. apiVersion, kind, uid?
type: string
type: object
x-kubernetes-map-type: atomic
enabled:
type: boolean
mtls:
description: CaCertificate is needed when mtls is set
type: boolean
optional:
description: Optional configures if TLS should be required
or optional for connections
type: boolean
required:
- enabled
- mtls
type: object
type: object
nodeAffinity:
description: Node affinity is a group of node affinity scheduling
rules.
properties:
preferredDuringSchedulingIgnoredDuringExecution:
description: |-
The scheduler will prefer to schedule pods to nodes that satisfy
the affinity expressions specified by this field, but it may choose
a node that violates one or more of the expressions. The node that is
most preferred is the one with the greatest sum of weights, i.e.
for each node that meets all of the scheduling requirements (resource
request, requiredDuringScheduling affinity expressions, etc.),
compute a sum by iterating through the elements of this field and adding
"weight" to the sum if the node matches the corresponding matchExpressions; the
node(s) with the highest sum are the most preferred.
items:
description: |-
An empty preferred scheduling term matches all objects with implicit weight 0
(i.e. it's a no-op). A null preferred scheduling term matches no objects (i.e. is also a no-op).
properties:
preference:
description: A node selector term, associated with the
corresponding weight.
properties:
matchExpressions:
description: A list of node selector requirements
by node's labels.
items:
description: |-
A node selector requirement is a selector that contains values, a key, and an operator
that relates the key and values.
properties:
key:
description: The label key that the selector
applies to.
type: string
operator:
description: |-
Represents a key's relationship to a set of values.
Valid operators are In, NotIn, Exists, DoesNotExist. Gt, and Lt.
type: string
values:
description: |-
An array of string values. If the operator is In or NotIn,
the values array must be non-empty. If the operator is Exists or DoesNotExist,
the values array must be empty. If the operator is Gt or Lt, the values
array must have a single element, which will be interpreted as an integer.
This array is replaced during a strategic merge patch.
items:
type: string
type: array
required:
- key
- operator
type: object
type: array
matchFields:
description: A list of node selector requirements
by node's fields.
items:
description: |-
A node selector requirement is a selector that contains values, a key, and an operator
that relates the key and values.
properties:
key:
description: The label key that the selector
applies to.
type: string
operator:
description: |-
Represents a key's relationship to a set of values.
Valid operators are In, NotIn, Exists, DoesNotExist. Gt, and Lt.
type: string
values:
description: |-
An array of string values. If the operator is In or NotIn,
the values array must be non-empty. If the operator is Exists or DoesNotExist,
the values array must be empty. If the operator is Gt or Lt, the values
array must have a single element, which will be interpreted as an integer.
This array is replaced during a strategic merge patch.
items:
type: string
type: array
required:
- key
- operator
type: object
type: array
type: object
x-kubernetes-map-type: atomic
weight:
description: Weight associated with matching the corresponding
nodeSelectorTerm, in the range 1-100.
format: int32
type: integer
required:
- preference
- weight
type: object
type: array
requiredDuringSchedulingIgnoredDuringExecution:
description: |-
If the affinity requirements specified by this field are not met at
scheduling time, the pod will not be scheduled onto the node.
If the affinity requirements specified by this field cease to be met
at some point during pod execution (e.g. due to an update), the system
may or may not try to eventually evict the pod from its node.
properties:
nodeSelectorTerms:
description: Required. A list of node selector terms.
The terms are ORed.
items:
description: |-
A null or empty node selector term matches no objects. The requirements of
them are ANDed.
The TopologySelectorTerm type implements a subset of the NodeSelectorTerm.
properties:
matchExpressions:
description: A list of node selector requirements
by node's labels.
items:
description: |-
A node selector requirement is a selector that contains values, a key, and an operator
that relates the key and values.
properties:
key:
description: The label key that the selector
applies to.
type: string
operator:
description: |-
Represents a key's relationship to a set of values.
Valid operators are In, NotIn, Exists, DoesNotExist. Gt, and Lt.
type: string
values:
description: |-
An array of string values. If the operator is In or NotIn,
the values array must be non-empty. If the operator is Exists or DoesNotExist,
the values array must be empty. If the operator is Gt or Lt, the values
array must have a single element, which will be interpreted as an integer.
This array is replaced during a strategic merge patch.
items:
type: string
type: array
required:
- key
- operator
type: object
type: array
matchFields:
description: A list of node selector requirements
by node's fields.
items:
description: |-
A node selector requirement is a selector that contains values, a key, and an operator
that relates the key and values.
properties:
key:
description: The label key that the selector
applies to.
type: string
operator:
description: |-
Represents a key's relationship to a set of values.
Valid operators are In, NotIn, Exists, DoesNotExist. Gt, and Lt.
type: string
values:
description: |-
An array of string values. If the operator is In or NotIn,
the values array must be non-empty. If the operator is Exists or DoesNotExist,
the values array must be empty. If the operator is Gt or Lt, the values
array must have a single element, which will be interpreted as an integer.
This array is replaced during a strategic merge patch.
items:
type: string
type: array
required:
- key
- operator
type: object
type: array
type: object
x-kubernetes-map-type: atomic
type: array
required:
- nodeSelectorTerms
type: object
x-kubernetes-map-type: atomic
type: object
replicas:
type: integer
servicebound:
properties:
port:
type: integer
service:
type: string
serviceType:
description: Service Type string describes ingress methods
for a service
type: string
type: object
required:
- edgebound
- servicebound
type: object
frontlas:
properties:
controlplane:
properties:
port:
type: integer
service:
type: string
serviceType:
description: Service Type string describes ingress methods
for a service
type: string
type: object
nodeAffinity:
description: Node affinity is a group of node affinity scheduling
rules.
properties:
preferredDuringSchedulingIgnoredDuringExecution:
description: |-
The scheduler will prefer to schedule pods to nodes that satisfy
the affinity expressions specified by this field, but it may choose
a node that violates one or more of the expressions. The node that is
most preferred is the one with the greatest sum of weights, i.e.
for each node that meets all of the scheduling requirements (resource
request, requiredDuringScheduling affinity expressions, etc.),
compute a sum by iterating through the elements of this field and adding
"weight" to the sum if the node matches the corresponding matchExpressions; the
node(s) with the highest sum are the most preferred.
items:
description: |-
An empty preferred scheduling term matches all objects with implicit weight 0
(i.e. it's a no-op). A null preferred scheduling term matches no objects (i.e. is also a no-op).
properties:
preference:
description: A node selector term, associated with the
corresponding weight.
properties:
matchExpressions:
description: A list of node selector requirements
by node's labels.
items:
description: |-
A node selector requirement is a selector that contains values, a key, and an operator
that relates the key and values.
properties:
key:
description: The label key that the selector
applies to.
type: string
operator:
description: |-
Represents a key's relationship to a set of values.
Valid operators are In, NotIn, Exists, DoesNotExist. Gt, and Lt.
type: string
values:
description: |-
An array of string values. If the operator is In or NotIn,
the values array must be non-empty. If the operator is Exists or DoesNotExist,
the values array must be empty. If the operator is Gt or Lt, the values
array must have a single element, which will be interpreted as an integer.
This array is replaced during a strategic merge patch.
items:
type: string
type: array
required:
- key
- operator
type: object
type: array
matchFields:
description: A list of node selector requirements
by node's fields.
items:
description: |-
A node selector requirement is a selector that contains values, a key, and an operator
that relates the key and values.
properties:
key:
description: The label key that the selector
applies to.
type: string
operator:
description: |-
Represents a key's relationship to a set of values.
Valid operators are In, NotIn, Exists, DoesNotExist. Gt, and Lt.
type: string
values:
description: |-
An array of string values. If the operator is In or NotIn,
the values array must be non-empty. If the operator is Exists or DoesNotExist,
the values array must be empty. If the operator is Gt or Lt, the values
array must have a single element, which will be interpreted as an integer.
This array is replaced during a strategic merge patch.
items:
type: string
type: array
required:
- key
- operator
type: object
type: array
type: object
x-kubernetes-map-type: atomic
weight:
description: Weight associated with matching the corresponding
nodeSelectorTerm, in the range 1-100.
format: int32
type: integer
required:
- preference
- weight
type: object
type: array
requiredDuringSchedulingIgnoredDuringExecution:
description: |-
If the affinity requirements specified by this field are not met at
scheduling time, the pod will not be scheduled onto the node.
If the affinity requirements specified by this field cease to be met
at some point during pod execution (e.g. due to an update), the system
may or may not try to eventually evict the pod from its node.
properties:
nodeSelectorTerms:
description: Required. A list of node selector terms.
The terms are ORed.
items:
description: |-
A null or empty node selector term matches no objects. The requirements of
them are ANDed.
The TopologySelectorTerm type implements a subset of the NodeSelectorTerm.
properties:
matchExpressions:
description: A list of node selector requirements
by node's labels.
items:
description: |-
A node selector requirement is a selector that contains values, a key, and an operator
that relates the key and values.
properties:
key:
description: The label key that the selector
applies to.
type: string
operator:
description: |-
Represents a key's relationship to a set of values.
Valid operators are In, NotIn, Exists, DoesNotExist. Gt, and Lt.
type: string
values:
description: |-
An array of string values. If the operator is In or NotIn,
the values array must be non-empty. If the operator is Exists or DoesNotExist,
the values array must be empty. If the operator is Gt or Lt, the values
array must have a single element, which will be interpreted as an integer.
This array is replaced during a strategic merge patch.
items:
type: string
type: array
required:
- key
- operator
type: object
type: array
matchFields:
description: A list of node selector requirements
by node's fields.
items:
description: |-
A node selector requirement is a selector that contains values, a key, and an operator
that relates the key and values.
properties:
key:
description: The label key that the selector
applies to.
type: string
operator:
description: |-
Represents a key's relationship to a set of values.
Valid operators are In, NotIn, Exists, DoesNotExist. Gt, and Lt.
type: string
values:
description: |-
An array of string values. If the operator is In or NotIn,
the values array must be non-empty. If the operator is Exists or DoesNotExist,
the values array must be empty. If the operator is Gt or Lt, the values
array must have a single element, which will be interpreted as an integer.
This array is replaced during a strategic merge patch.
items:
type: string
type: array
required:
- key
- operator
type: object
type: array
type: object
x-kubernetes-map-type: atomic
type: array
required:
- nodeSelectorTerms
type: object
x-kubernetes-map-type: atomic
type: object
redis:
properties:
addrs:
items:
type: string
type: array
db:
type: integer
password:
type: string
redisType:
type: string
user:
type: string
required:
- addrs
- redisType
type: object
replicas:
type: integer
required:
- redis
type: object
required:
- frontier
- frontlas
type: object
status:
description: FrontierClusterStatus defines the observed state of FrontierCluster
properties:
message:
type: string
phase:
description: |-
INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
Important: Run "make" to regenerate code after modifying this file
TODO scale 1 a time
CurrentFrontierReplicas int `json:"currentFrontierReplicas"`
CurrentFrontlasReplicass int `json:"currentFrontlasReplicas"`
type: string
required:
- message
- phase
type: object
type: object
served: true
storage: true
subresources:
status: {}
---
apiVersion: v1
kind: ServiceAccount
metadata:
labels:
app.kubernetes.io/managed-by: kustomize
app.kubernetes.io/name: frontier
name: frontiercluster-operator
namespace: frontier-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
labels:
app.kubernetes.io/component: rbac
app.kubernetes.io/created-by: frontier
app.kubernetes.io/instance: leader-election-role
app.kubernetes.io/managed-by: kustomize
app.kubernetes.io/name: role
app.kubernetes.io/part-of: frontier
name: frontiercluster-leader-election-role
namespace: frontier-system
rules:
- apiGroups:
- ""
resources:
- configmaps
verbs:
- get
- list
- watch
- create
- update
- patch
- delete
- apiGroups:
- coordination.k8s.io
resources:
- leases
verbs:
- get
- list
- watch
- create
- update
- patch
- delete
- apiGroups:
- ""
resources:
- events
verbs:
- create
- patch
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: frontiercluster-manager-role
rules:
- apiGroups:
- ""
resources:
- pods
- secrets
- services
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- apps
resources:
- deployments
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- frontier.singchia.io
resources:
- frontierclusters
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- frontier.singchia.io
resources:
- frontierclusters/finalizers
verbs:
- update
- apiGroups:
- frontier.singchia.io
resources:
- frontierclusters/status
verbs:
- get
- patch
- update
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
labels:
app.kubernetes.io/component: kube-rbac-proxy
app.kubernetes.io/created-by: frontier
app.kubernetes.io/instance: metrics-reader
app.kubernetes.io/managed-by: kustomize
app.kubernetes.io/name: clusterrole
app.kubernetes.io/part-of: frontier
name: frontiercluster-metrics-reader
rules:
- nonResourceURLs:
- /metrics
verbs:
- get
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
labels:
app.kubernetes.io/component: kube-rbac-proxy
app.kubernetes.io/created-by: frontier
app.kubernetes.io/instance: proxy-role
app.kubernetes.io/managed-by: kustomize
app.kubernetes.io/name: clusterrole
app.kubernetes.io/part-of: frontier
name: frontiercluster-proxy-role
rules:
- apiGroups:
- authentication.k8s.io
resources:
- tokenreviews
verbs:
- create
- apiGroups:
- authorization.k8s.io
resources:
- subjectaccessreviews
verbs:
- create
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
labels:
app.kubernetes.io/component: rbac
app.kubernetes.io/created-by: frontier
app.kubernetes.io/instance: leader-election-rolebinding
app.kubernetes.io/managed-by: kustomize
app.kubernetes.io/name: rolebinding
app.kubernetes.io/part-of: frontier
name: frontiercluster-leader-election-rolebinding
namespace: frontier-system
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: Role
name: frontiercluster-leader-election-role
subjects:
- kind: ServiceAccount
name: frontiercluster-operator
namespace: frontier-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
labels:
app.kubernetes.io/component: rbac
app.kubernetes.io/created-by: frontier
app.kubernetes.io/instance: manager-rolebinding
app.kubernetes.io/managed-by: kustomize
app.kubernetes.io/name: clusterrolebinding
app.kubernetes.io/part-of: frontier
name: frontiercluster-manager-rolebinding
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: frontiercluster-manager-role
subjects:
- kind: ServiceAccount
name: frontiercluster-operator
namespace: frontier-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
labels:
app.kubernetes.io/component: kube-rbac-proxy
app.kubernetes.io/created-by: frontier
app.kubernetes.io/instance: proxy-rolebinding
app.kubernetes.io/managed-by: kustomize
app.kubernetes.io/name: clusterrolebinding
app.kubernetes.io/part-of: frontier
name: frontiercluster-proxy-rolebinding
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: frontiercluster-proxy-role
subjects:
- kind: ServiceAccount
name: frontiercluster-operator
namespace: frontier-system
---
apiVersion: v1
kind: Service
metadata:
labels:
app.kubernetes.io/component: kube-rbac-proxy
app.kubernetes.io/created-by: frontier
app.kubernetes.io/instance: operator-metrics-service
app.kubernetes.io/managed-by: kustomize
app.kubernetes.io/name: service
app.kubernetes.io/part-of: frontier
control-plane: operator
name: frontiercluster-operator-metrics-service
namespace: frontier-system
spec:
ports:
- name: https
port: 8443
protocol: TCP
targetPort: https
selector:
control-plane: operator
---
apiVersion: apps/v1
kind: Deployment
metadata:
labels:
app.kubernetes.io/component: manager
app.kubernetes.io/created-by: frontier
app.kubernetes.io/instance: operator
app.kubernetes.io/managed-by: kustomize
app.kubernetes.io/name: deployment
app.kubernetes.io/part-of: frontier
control-plane: operator
name: frontiercluster-operator
namespace: frontier-system
spec:
replicas: 1
selector:
matchLabels:
control-plane: operator
template:
metadata:
annotations:
kubectl.kubernetes.io/default-container: manager
labels:
control-plane: operator
spec:
containers:
- args:
- --secure-listen-address=0.0.0.0:8443
- --upstream=http://127.0.0.1:8080/
- --logtostderr=true
- --v=0
image: kubebuilder/kube-rbac-proxy:v0.15.0
name: kube-rbac-proxy
ports:
- containerPort: 8443
name: https
protocol: TCP
resources:
limits:
cpu: 500m
memory: 128Mi
requests:
cpu: 5m
memory: 64Mi
securityContext:
allowPrivilegeEscalation: false
capabilities:
drop:
- ALL
- args:
- --health-probe-bind-address=:8081
- --metrics-bind-address=127.0.0.1:8080
- --leader-elect
command:
- /manager
image: singchia/frontiercluster-controller:1.0.0
imagePullPolicy: Always
livenessProbe:
httpGet:
path: /healthz
port: 8081
initialDelaySeconds: 15
periodSeconds: 20
name: manager
readinessProbe:
httpGet:
path: /readyz
port: 8081
initialDelaySeconds: 5
periodSeconds: 10
resources:
limits:
cpu: 500m
memory: 128Mi
requests:
cpu: 10m
memory: 64Mi
securityContext:
allowPrivilegeEscalation: false
capabilities:
drop:
- ALL
securityContext:
runAsNonRoot: true
serviceAccountName: frontiercluster-operator
terminationGracePeriodSeconds: 10

38
dist/scripts/gen_cert.sh vendored Normal file
View File

@@ -0,0 +1,38 @@
#!/bin/bash
# define some variables
CA_DIR="ca"
SERVER_DIR="server"
CLIENT_DIR="client"
DAYS_VALID=365
COUNTRY="CN"
STATE="Zhejiang"
LOCALITY="Hangzhou"
ORGANIZATION="singchia"
ORGANIZATIONAL_UNIT="frontier"
COMMON_NAME_CA="MyCA"
COMMON_NAME_SERVER="server.frontier.com"
COMMON_NAME_CLIENT="client.frontier.com"
# make directories
mkdir -p ${CA_DIR} ${SERVER_DIR} ${CLIENT_DIR}
# gen ca cert and key
openssl genpkey -algorithm RSA -out ${CA_DIR}/ca.key
openssl req -x509 -new -nodes -key ${CA_DIR}/ca.key -sha256 -days ${DAYS_VALID} -out ${CA_DIR}/ca.crt -subj "/C=${COUNTRY}/ST=${STATE}/L=${LOCALITY}/O=${ORGANIZATION}/OU=${ORGANIZATIONAL_UNIT}/CN=${COMMON_NAME_CA}"
# gen server key and csr
openssl genpkey -algorithm RSA -out ${SERVER_DIR}/server.key
openssl req -new -key ${SERVER_DIR}/server.key -out ${SERVER_DIR}/server.csr -subj "/C=${COUNTRY}/ST=${STATE}/L=${LOCALITY}/O=${ORGANIZATION}/OU=${ORGANIZATIONAL_UNIT}/CN=${COMMON_NAME_SERVER}"
# gen server cert
openssl x509 -req -in ${SERVER_DIR}/server.csr -CA ${CA_DIR}/ca.crt -CAkey ${CA_DIR}/ca.key -CAcreateserial -out ${SERVER_DIR}/server.crt -days ${DAYS_VALID} -sha256
# gen client key and csr
openssl genpkey -algorithm RSA -out ${CLIENT_DIR}/client.key
openssl req -new -key ${CLIENT_DIR}/client.key -out ${CLIENT_DIR}/client.csr -subj "/C=${COUNTRY}/ST=${STATE}/L=${LOCALITY}/O=${ORGANIZATION}/OU=${ORGANIZATIONAL_UNIT}/CN=${COMMON_NAME_CLIENT}"
# gen client cert
openssl x509 -req -in ${CLIENT_DIR}/client.csr -CA ${CA_DIR}/ca.crt -CAkey ${CA_DIR}/ca.key -CAcreateserial -out ${CLIENT_DIR}/client.crt -days ${DAYS_VALID} -sha256
echo "CA, Server, and Client certificates and keys have been generated."

View File

@@ -1,67 +1,9 @@
daemon:
frontier_id: frontier01
rlimit:
enable: false
nofile: 102400
pprof:
enable: true
addr: 0.0.0.0:6060
cpu_profile_rate: 0
controlplane:
enable: false
listen:
network: tcp
addr: 0.0.0.0:30010
servicebound:
listen:
network: tcp
addr: 0.0.0.0:30011
tls:
enable: false
mtls: false
ca_certs:
- ca1.cert
- ca2.cert
certs:
- cert: servicebound.cert
key: servicebound.key
insecure_skip_verify: false
edgebound:
listen:
network: tcp
addr: 0.0.0.0:30012
tls:
enable: false
mtls: false
ca_certs:
- ca1.cert
- ca2.cert
certs:
- cert: edgebound.cert
key: edgebound.key
insecure_skip_verify: false
bypass:
network: tcp
addr: 192.168.1.10:8443
tls:
enable: true
mtls: true
ca_certs:
- ca1.cert
certs:
- cert: frontier.cert
key: frontier.key
insecure_skip_verify: false
bypass_enable: false
edgeid_alloc_when_no_idservice_on: true
dao:
debug: false
frontlas:
enable: false
dial:
servicebound:
listen:
network: tcp
addr: 127.0.0.1:40012
metrics:
enable: false
interval: 0
mqm: {}
addr: 0.0.0.0:30011

155
etc/frontier_all.yaml Normal file
View File

@@ -0,0 +1,155 @@
daemon:
frontier_id: ""
pprof:
addr: 0.0.0.0:6060
cpu_profile_rate: 0
enable: true
rlimit:
enable: true
nofile: 102400
controlplane:
enable: false
listen:
addr: 0.0.0.0:30010
advertised_addr: ""
network: tcp
tls:
ca_certs: null
certs: null
enable: false
insecure_skip_verify: false
mtls: false
servicebound:
listen:
addr: 0.0.0.0:30011
advertised_addr: ""
network: tcp
tls:
ca_certs:
- ca1.cert
- ca2.cert
certs:
- cert: servicebound.cert
key: servicebound.key
enable: false
insecure_skip_verify: false
mtls: false
edgebound:
bypass:
addr: 192.168.1.10:8443
advertised_addr: ""
network: tcp
tls:
ca_certs:
- ca1.cert
certs:
- cert: frontier.cert
key: frontier.key
enable: true
insecure_skip_verify: false
mtls: true
bypass_enable: false
edgeid_alloc_when_no_idservice_on: true
listen:
addr: 0.0.0.0:30012
advertised_addr: ""
network: tcp
tls:
ca_certs:
- ca1.cert
- ca2.cert
certs:
- cert: edgebound.cert
key: edgebound.key
enable: false
insecure_skip_verify: false
mtls: false
dao:
backend: buntdb
debug: false
exchange:
hashby: edgeid
frontlas:
enable: false
metrics:
enable: false
interval: 0
dial:
addr: 127.0.0.1:40012
network: tcp
tls:
ca_certs: null
certs: null
enable: false
insecure_skip_verify: false
mtls: false
mqm:
amqp:
enable: false
addrs: null
channel_max: 0
exchanges: null
frame_size: 0
heartbeat: 0
locale: ""
producer:
app_id: ""
content_encoding: ""
content_type: ""
delivery_mode: 0
exchange: ""
expiration: ""
headers: null
immediate: false
mandatory: false
priority: 0
reply_to: ""
routing_keys: null
type: ""
user_id: ""
queueBindings: null
queues: null
vhost: ""
kafka:
enable: false
addrs: null
producer:
async: false
compression: none
compression_level: 0
flush:
bytes: 0
frequency: 0
max_messages: 0
messages: 0
idempotent: false
max_message_bytes: 0
required_acks: 0
retry:
backoff: 0
max: 0
timeout: 0
topics: null
nats:
enable: false
addrs: null
jetStream:
enable: false
name: ""
producer:
subjects: null
producer:
subjects: null
nsq:
enable: false
addrs: null
producer:
topics: null
redis:
enable: false
addrs: null
db: 0
password: ""
producer:
channels: null

View File

@@ -2,28 +2,28 @@ package config
// listen related
type CertKey struct {
Cert string `yaml:"cert"`
Key string `yaml:"key"`
Cert string `yaml:"cert" json:"cert"`
Key string `yaml:"key" json:"key"`
}
type TLS struct {
Enable bool `yaml:"enable"`
MTLS bool `yaml:"mtls"`
CACerts []string `yaml:"ca_certs"` // ca certs paths
Certs []CertKey `yaml:"certs"` // certs paths
InsecureSkipVerify bool `yaml:"insecure_skip_verify"` // for client use
Enable bool `yaml:"enable" json:"enable"`
MTLS bool `yaml:"mtls" json:"mtls"`
CACerts []string `yaml:"ca_certs" json:"ca_certs"` // ca certs paths
Certs []CertKey `yaml:"certs" json:"certs"` // certs paths
InsecureSkipVerify bool `yaml:"insecure_skip_verify" json:"insecure_skip_verify"` // for client use
}
type Listen struct {
Network string `yaml:"network"`
Addr string `yaml:"addr"`
AdvertisedAddr string `yaml:"advertised_addr,omitempty"`
TLS TLS `yaml:"tls,omitempty"`
Network string `yaml:"network" json:"network"`
Addr string `yaml:"addr" json:"addr"`
AdvertisedAddr string `yaml:"advertised_addr,omitempty" json:"advertised_addr"`
TLS TLS `yaml:"tls,omitempty" json:"tls"`
}
type Dial struct {
Network string `yaml:"network"`
Addr string `yaml:"addr"`
AdvertisedAddr string `yaml:"advertised_addr,omitempty"`
TLS TLS `yaml:"tls,omitempty"`
Network string `yaml:"network" json:"network"`
Addr string `yaml:"addr" json:"addr"`
AdvertisedAddr string `yaml:"advertised_addr,omitempty" json:"advertised_addr"`
TLS TLS `yaml:"tls,omitempty" json:"tls"`
}

View File

@@ -45,8 +45,11 @@ type Servicebound interface {
ListService() []geminio.End
// for management
GetServiceByName(service string) (geminio.End, error)
GetServicesByName(service string) ([]geminio.End, error)
GetServiceByRPC(rpc string) (geminio.End, error)
GetServicesByRPC(rpc string) ([]geminio.End, error)
GetServiceByTopic(topic string) (geminio.End, error)
GetServicesByTopic(topic string) ([]geminio.End, error)
DelServiceByID(serviceID uint64) error
DelSerivces(service string) error
@@ -89,8 +92,11 @@ type Repo interface {
GetEdge(edgeID uint64) (*model.Edge, error)
GetService(serviceID uint64) (*model.Service, error)
GetServiceByName(name string) (*model.Service, error)
GetServicesByName(name string) ([]*model.Service, error)
GetServiceRPC(rpc string) (*model.ServiceRPC, error)
GetServiceRPCs(rpc string) ([]*model.ServiceRPC, error)
GetServiceTopic(topic string) (*model.ServiceTopic, error)
GetServiceTopics(topic string) ([]*model.ServiceTopic, error)
ListEdgeRPCs(query *query.EdgeRPCQuery) ([]string, error)
ListEdges(query *query.EdgeQuery) ([]*model.Edge, error)
ListServiceRPCs(query *query.ServiceRPCQuery) ([]string, error)
@@ -118,6 +124,7 @@ type MQ interface {
type ProduceOption struct {
Origin interface{}
EdgeID uint64
Addr net.Addr
}
type OptionProduce func(*ProduceOption)
@@ -133,3 +140,9 @@ func WithOrigin(origin interface{}) OptionProduce {
po.Origin = origin
}
}
func WithAddr(addr net.Addr) OptionProduce {
return func(po *ProduceOption) {
po.Addr = addr
}
}

View File

@@ -1,6 +1,7 @@
package config
import (
"encoding/json"
"flag"
"fmt"
"io"
@@ -18,86 +19,81 @@ import (
// daemon related
type RLimit struct {
Enable bool `yaml:"enable"`
NumFile int `yaml:"nofile"`
Enable bool `yaml:"enable" json:"enable"`
NumFile int `yaml:"nofile" json:"nofile"`
}
type PProf struct {
Enable bool `yaml:"enable"`
Addr string `yaml:"addr"`
CPUProfileRate int `yaml:"cpu_profile_rate"`
Enable bool `yaml:"enable" json:"enable"`
Addr string `yaml:"addr" json:"addr"`
CPUProfileRate int `yaml:"cpu_profile_rate" json:"cpu_profile_rate"`
}
type Daemon struct {
RLimit RLimit `yaml:"rlimit"`
PProf PProf `yaml:"pprof"`
RLimit RLimit `yaml:"rlimit,omitempty" json:"rlimit"`
PProf PProf `yaml:"pprof,omitempty" json:"pprof"`
// use with frontlas
FrontierID string `yaml:"frontier_id,omitempty"`
FrontierID string `yaml:"frontier_id,omitempty" json:"frontier_id"`
}
// edgebound
// Bypass is for the lagecy gateway, this will split
type Bypass struct {
Enable bool `yaml:"enable"`
Network string `yaml:"network"`
Addr string `yaml:"addr"` // addr to dial
TLS config.TLS `yaml:"tls"` // certs to dial or ca to auth
Enable bool `yaml:"enable" json:"enable"`
Network string `yaml:"network" json:"network"`
Addr string `yaml:"addr" json:"addr"` // addr to dial
TLS config.TLS `yaml:"tls" json:"tls"` // certs to dial or ca to auth
}
type Edgebound struct {
Listen config.Listen `yaml:"listen"`
Bypass config.Dial `yaml:"bypass"`
BypassEnable bool `yaml:"bypass_enable"`
Listen config.Listen `yaml:"listen" json:"listen"`
Bypass config.Dial `yaml:"bypass,omitempty" json:"bypass"`
BypassEnable bool `yaml:"bypass_enable,omitempty" json:"bypass_enable"`
// alloc edgeID when no get_id function online
EdgeIDAllocWhenNoIDServiceOn bool `yaml:"edgeid_alloc_when_no_idservice_on"`
EdgeIDAllocWhenNoIDServiceOn bool `yaml:"edgeid_alloc_when_no_idservice_on" json:"edgeid_alloc_when_no_idservice_on"`
}
// servicebound
type Servicebound struct {
Listen config.Listen `yaml:"listen"`
Listen config.Listen `yaml:"listen" json:"listen"`
}
type ControlPlane struct {
Enable bool `yaml:"enable"`
Listen config.Listen `yaml:"listen"`
}
// message queue
type MQ struct {
BroadCast bool `yaml:"broadcast"`
Enable bool `yaml:"enable" json:"enable"`
Listen config.Listen `yaml:"listen" json:"listen"`
}
type Kafka struct {
Enable bool `yaml:"enable"`
Addrs []string `yaml:"addrs"`
Enable bool `yaml:"enable" json:"enable"`
Addrs []string `yaml:"addrs" json:"addrs"`
// Producer is the namespace for configuration related to producing messages,
// used by the Producer.
Producer struct {
// topics to notify frontier which topics to allow to publish
Topics []string `yaml:"topics"`
Async bool `yaml:"async"`
Topics []string `yaml:"topics" json:"topics"`
Async bool `yaml:"async" json:"async"`
// The maximum permitted size of a message (defaults to 1000000). Should be
// set equal to or smaller than the broker's `message.max.bytes`.
MaxMessageBytes int `yaml:"max_message_bytes,omitempty"`
MaxMessageBytes int `yaml:"max_message_bytes,omitempty" json:"max_message_bytes"`
// The level of acknowledgement reliability needed from the broker (defaults
// to WaitForLocal). Equivalent to the `request.required.acks` setting of the
// JVM producer.
RequiredAcks sarama.RequiredAcks `yaml:"required_acks,omitempty"`
RequiredAcks sarama.RequiredAcks `yaml:"required_acks,omitempty" json:"required_acks"`
// The maximum duration the broker will wait the receipt of the number of
// RequiredAcks (defaults to 10 seconds). This is only relevant when
// RequiredAcks is set to WaitForAll or a number > 1. Only supports
// millisecond resolution, nanoseconds will be truncated. Equivalent to
// the JVM producer's `request.timeout.ms` setting.
Timeout int `yaml:"timeout,omitempty"`
Timeout int `yaml:"timeout,omitempty" json:"timeout"`
// The type of compression to use on messages (defaults to no compression).
// Similar to `compression.codec` setting of the JVM producer.
Compression sarama.CompressionCodec `yaml:"compression,omitempty"`
Compression sarama.CompressionCodec `yaml:"compression,omitempty" json:"compression"`
// The level of compression to use on messages. The meaning depends
// on the actual compression type used and defaults to default compression
// level for the codec.
CompressionLevel int `yaml:"compression_level,omitempty"`
CompressionLevel int `yaml:"compression_level,omitempty" json:"compression_level"`
// If enabled, the producer will ensure that exactly one copy of each message is
// written.
Idempotent bool `yaml:"idepotent,omitempty"`
Idempotent bool `yaml:"idepotent,omitempty" json:"idempotent"`
// The following config options control how often messages are batched up and
// sent to the broker. By default, messages are sent as fast as possible, and
@@ -106,167 +102,171 @@ type Kafka struct {
Flush struct {
// The best-effort number of bytes needed to trigger a flush. Use the
// global sarama.MaxRequestSize to set a hard upper limit.
Bytes int `yaml:"bytes,omitempty"`
Bytes int `yaml:"bytes,omitempty" json:"bytes"`
// The best-effort number of messages needed to trigger a flush. Use
// `MaxMessages` to set a hard upper limit.
Messages int `yaml:"messages,omitempty"`
Messages int `yaml:"messages,omitempty" json:"messages"`
// The best-effort frequency of flushes. Equivalent to
// `queue.buffering.max.ms` setting of JVM producer.
Frequency int `yaml:"frequency,omitempty"`
Frequency int `yaml:"frequency,omitempty" json:"frequency"`
// The maximum number of messages the producer will send in a single
// broker request. Defaults to 0 for unlimited. Similar to
// `queue.buffering.max.messages` in the JVM producer.
MaxMessages int `yaml:"max_messages,omitempty"`
} `yaml:"flush,omitempty"`
MaxMessages int `yaml:"max_messages,omitempty" json:"max_messages"`
} `yaml:"flush,omitempty" json:"flush"`
Retry struct {
// The total number of times to retry sending a message (default 3).
// Similar to the `message.send.max.retries` setting of the JVM producer.
Max int `yaml:"max,omitempty"`
Max int `yaml:"max,omitempty" json:"max"`
// How long to wait for the cluster to settle between retries
// (default 100ms). Similar to the `retry.backoff.ms` setting of the
// JVM producer.
Backoff int `yaml:"back_off,omitempty"`
} `yaml:"retry"`
} `yaml:"producer"`
Backoff int `yaml:"backoff,omitempty" json:"backoff"`
} `yaml:"retry" json:"retry"`
} `yaml:"producer" json:"producer"`
}
type AMQP struct {
Enable bool `yaml:"enable"`
Enable bool `yaml:"enable" json:"enable"`
// TODO we don't support multiple addresses for now
Addrs []string `yaml:"addrs"`
Addrs []string `yaml:"addrs" json:"addrs"`
// Vhost specifies the namespace of permissions, exchanges, queues and
// bindings on the server. Dial sets this to the path parsed from the URL.
Vhost string `yaml:"vhost,omitempty"`
Vhost string `yaml:"vhost,omitempty" json:"vhost"`
// 0 max channels means 2^16 - 1
ChannelMax int `yaml:"channel_max,omitempty"`
ChannelMax int `yaml:"channel_max,omitempty" json:"channel_max"`
// 0 max bytes means unlimited
FrameSize int `yaml:"frame_size,omitempty"`
FrameSize int `yaml:"frame_size,omitempty" json:"frame_size"`
// less than 1s uses the server's interval
Heartbeat int `yaml:"heartbeat,omitempty"`
Heartbeat int `yaml:"heartbeat,omitempty" json:"heartbeat"`
// Connection locale that we expect to always be en_US
// Even though servers must return it as per the AMQP 0-9-1 spec,
// we are not aware of it being used other than to satisfy the spec requirements
Locale string `yaml:"locale,omitempty"`
Locale string `yaml:"locale,omitempty" json:"locale"`
// exchange to declare
Exchanges []struct {
// exchange name to declare
Name string `yaml:"name"`
Name string `yaml:"name" json:"name"`
// direct topic fanout headers, default direct
Kind string `yaml:"kind,omitempty"`
Durable bool `yaml:"durable,omitempty"`
AutoDelete bool `yaml:"auto_delete,omitempty"`
Internal bool `yaml:"internal,omitempty"`
NoWait bool `yaml:"nowait,omitempty"`
} `yaml:"exchanges,omitempty"`
Kind string `yaml:"kind,omitempty" json:"kind"`
Durable bool `yaml:"durable,omitempty" json:"durable"`
AutoDelete bool `yaml:"auto_delete,omitempty" json:"auto_delete"`
Internal bool `yaml:"internal,omitempty" json:"internal"`
NoWait bool `yaml:"nowait,omitempty" json:"noWait"`
} `yaml:"exchanges,omitempty" json:"exchanges"`
// queues to declare, default nil
Queues []struct {
Name string `yaml:"name"`
Durable bool `yaml:"durable,omitempty"`
AutoDelete bool `yaml:"auto_delete,omitempty"`
Exclustive bool `yaml:"exclustive,omitempty"`
NoWait bool `yaml:"nowait,omitempty"`
}
Name string `yaml:"name" json:"name"`
Durable bool `yaml:"durable,omitempty" json:"durable"`
AutoDelete bool `yaml:"auto_delete,omitempty" json:"auto_delete"`
Exclustive bool `yaml:"exclustive,omitempty" json:"exclustive"`
NoWait bool `yaml:"nowait,omitempty" json:"noWait"`
} `json:"queues"`
// queue bindings to exchange, default nil
QueueBindings []struct {
QueueName string `yaml:"queue_name"`
ExchangeName string `yaml:"exchange_name,omitempty"`
BindingKey string `yaml:"binding_key,omitempty"`
NoWait bool `yaml:"nowait,omitempty"`
}
QueueName string `yaml:"queue_name" json:"queue_name"`
ExchangeName string `yaml:"exchange_name,omitempty" json:"exchange_name"`
BindingKey string `yaml:"binding_key,omitempty" json:"binding_key"`
NoWait bool `yaml:"nowait,omitempty" json:"nowait"`
} `json:"queueBindings"`
Producer struct {
RoutingKeys []string `yaml:"routing_keys"` // topics
Exchange string `yaml:"exchange"`
Mandatory bool `yaml:"mandatory,omitempty"`
Immediate bool `yaml:"immediate,omitempty"`
RoutingKeys []string `yaml:"routing_keys" json:"routing_keys"` // topics
Exchange string `yaml:"exchange" json:"exchange"`
Mandatory bool `yaml:"mandatory,omitempty" json:"mandatory"`
Immediate bool `yaml:"immediate,omitempty" json:"immediate"`
// message related
Headers map[string]interface{} `yaml:"headers,omitempty"`
Headers map[string]interface{} `yaml:"headers,omitempty" json:"headers"`
// properties
ContentType string `yaml:"content_type,omitempty"` // MIME content type
ContentEncoding string `yaml:"content_encoding,omitempty"` // MIME content encoding
DeliveryMode uint8 `yaml:"delivery_mode,omitempty"` // Transient (0 or 1) or Persistent (2)
Priority uint8 `yaml:"priority,omitempty"` // 0 to 9
ReplyTo string `yaml:"reply_to,omitempty"` // address to to reply to (ex: RPC)
Expiration string `yaml:"expiration,omitempty"` // message expiration spec
Type string `yaml:"type,omitempty"` // message type name
UserId string `yaml:"user_id,omitempty"` // creating user id - ex: "guest"
AppId string `yaml:"app_id,omitempty"` // creating application id
} `yaml:"producer,omitempty"`
ContentType string `yaml:"content_type,omitempty" json:"content_type"` // MIME content type
ContentEncoding string `yaml:"content_encoding,omitempty" json:"content_encoding"` // MIME content encoding
DeliveryMode uint8 `yaml:"delivery_mode,omitempty" json:"delivery_mode"` // Transient (0 or 1) or Persistent (2)
Priority uint8 `yaml:"priority,omitempty" json:"priority"` // 0 to 9
ReplyTo string `yaml:"reply_to,omitempty" json:"reply_to"` // address to to reply to (ex: RPC)
Expiration string `yaml:"expiration,omitempty" json:"expiration"` // message expiration spec
Type string `yaml:"type,omitempty" json:"type"` // message type name
UserId string `yaml:"user_id,omitempty" json:"user_id"` // creating user id - ex: "guest"
AppId string `yaml:"app_id,omitempty" json:"app_id"` // creating application id
} `yaml:"producer,omitempty" json:"producer"`
}
type Nats struct {
Enable bool `yaml:"enable"`
Addrs []string `yaml:"addrs"`
Enable bool `yaml:"enable" json:"enable"`
Addrs []string `yaml:"addrs" json:"addrs"`
Producer struct {
Subjects []string `yaml:"subjects"` // topics
} `yaml:"producer,omitempty"`
Subjects []string `yaml:"subjects" json:"subjects"` // topics
} `yaml:"producer,omitempty" json:"producer"`
JetStream struct {
// using jetstream instead of nats
Enable bool `yaml:"enable"`
Name string `yaml:"name"`
Enable bool `yaml:"enable" json:"enable"`
Name string `yaml:"name" json:"name"`
Producer struct {
Subjects []string `yaml:"subjects"`
} `yaml:"producer,omitempty"`
} `yaml:"jetstream,omitempty"`
Subjects []string `yaml:"subjects" json:"subjects"`
} `yaml:"producer,omitempty" json:"producer"`
} `yaml:"jetstream,omitempty" json:"jetStream"`
}
type NSQ struct {
Enable bool `yaml:"enable"`
Addrs []string `yaml:"addrs"`
Enable bool `yaml:"enable" json:"enable"`
Addrs []string `yaml:"addrs" json:"addrs"`
Producer struct {
Topics []string `yaml:"topics"`
} `yaml:"producer"`
Topics []string `yaml:"topics" json:"topics"`
} `yaml:"producer" json:"producer"`
}
type Redis struct {
Enable bool `yaml:"enable"`
Addrs []string `yaml:"addrs"`
DB int `yaml:"db"`
Password string `yaml:"password"`
Enable bool `yaml:"enable" json:"enable"`
Addrs []string `yaml:"addrs" json:"addrs"`
DB int `yaml:"db" json:"db"`
Password string `yaml:"password" json:"password"`
Producer struct {
Channels []string `yaml:"channels"`
} `yaml:"producer"`
Channels []string `yaml:"channels" json:"channels"`
} `yaml:"producer" json:"producer"`
}
type MQM struct {
Kafka Kafka `yaml:"kafka,omitempty"`
AMQP AMQP `yaml:"amqp,omitempty"`
Nats Nats `yaml:"nats,omitempty"`
NSQ NSQ `yaml:"nsq,omitempty"`
Redis Redis `yaml:"redis,omitempty"`
Kafka Kafka `yaml:"kafka,omitempty" json:"kafka"`
AMQP AMQP `yaml:"amqp,omitempty" json:"amqp"`
Nats Nats `yaml:"nats,omitempty" json:"nats"`
NSQ NSQ `yaml:"nsq,omitempty" json:"nsq"`
Redis Redis `yaml:"redis,omitempty" json:"redis"`
}
// exchange
type Exchange struct{}
type Exchange struct {
HashBy string `yaml:"hashby" json:"hashby"` // default edgeid, options: srcip random
}
type Dao struct {
Debug bool `yaml:"debug,omitempty"`
Backend string `yaml:"backend,omitempty"` // default buntdb
Debug bool `yaml:"debug,omitempty" json:"debug"`
Backend string `yaml:"backend,omitempty" json:"backend"` // default buntdb
}
// frontlas
type Frontlas struct {
Enable bool `yaml:"enable"`
Dial config.Dial `yaml:"dial"`
Enable bool `yaml:"enable" json:"enable"`
Dial config.Dial `yaml:"dial" json:"dial"`
Metrics struct {
Enable bool `yaml:"enable"`
Interval int `yaml:"interval"` // for stats
}
Enable bool `yaml:"enable" json:"enable"`
Interval int `yaml:"interval" json:"interval"` // for stats
} `yaml:"metrics" json:"metrics"`
}
type Configuration struct {
Daemon Daemon `yaml:"daemon,omitempty"`
Daemon Daemon `yaml:"daemon,omitempty" json:"daemon"`
Edgebound Edgebound `yaml:"edgebound"`
Edgebound Edgebound `yaml:"edgebound" json:"edgebound"`
Servicebound Servicebound `yaml:"servicebound"`
Servicebound Servicebound `yaml:"servicebound" json:"servicebound"`
ControlPlane ControlPlane `yaml:"controlplane,omitempty"`
ControlPlane ControlPlane `yaml:"controlplane,omitempty" json:"controlplane"`
Dao Dao `yaml:"dao,omitempty"`
Exchange Exchange `yaml:"exchange,omitempty" json:"exchange"`
Frontlas Frontlas `yaml:"frontlas"`
Dao Dao `yaml:"dao,omitempty" json:"dao"`
MQM MQM `yaml:"mqm,omitempty"`
Frontlas Frontlas `yaml:"frontlas,omitempty" json:"frontlas"`
MQM MQM `yaml:"mqm,omitempty" json:"mqm"`
}
// Configuration accepts config file and command-line, and command-line is more privileged.
@@ -360,10 +360,11 @@ func Parse() (*Configuration, error) {
return config, nil
}
func genDefaultConfig(writer io.Writer) error {
func genAllConfig(writer io.Writer) error {
conf := &Configuration{
Daemon: Daemon{
RLimit: RLimit{
Enable: true,
NumFile: 102400,
},
PProf: PProf{
@@ -472,6 +473,44 @@ func genDefaultConfig(writer io.Writer) error {
},
},
}
data, err := json.Marshal(conf)
if err != nil {
return err
}
newConf := map[string]interface{}{}
err = yaml.Unmarshal(data, &newConf)
if err != nil {
return err
}
data, err = yaml.Marshal(newConf)
if err != nil {
return err
}
_, err = armio.WriteAll(data, writer)
if err != nil {
return err
}
return nil
}
func genMinConfig(writer io.Writer) error {
conf := &Configuration{
// default listen on 30011
Servicebound: Servicebound{
Listen: config.Listen{
Network: "tcp",
Addr: "0.0.0.0:30011",
},
},
// default listen on 30012
Edgebound: Edgebound{
Listen: config.Listen{
Network: "tcp",
Addr: "0.0.0.0:30012",
},
EdgeIDAllocWhenNoIDServiceOn: true,
},
}
data, err := yaml.Marshal(conf)
if err != nil {
return err

View File

@@ -60,13 +60,25 @@ func TestParseFile(t *testing.T) {
}
}
func TestGenDefaultConfig(t *testing.T) {
func TestGenMinConfig(t *testing.T) {
file, err := os.OpenFile("../../../etc/frontier.yaml", os.O_TRUNC|os.O_CREATE|os.O_RDWR, 0666)
if err != nil {
t.Error(err)
}
defer file.Close()
err = genDefaultConfig(file)
err = genMinConfig(file)
if err != nil {
t.Error(err)
}
}
func TestGenAllConfig(t *testing.T) {
file, err := os.OpenFile("../../../etc/frontier_all.yaml", os.O_TRUNC|os.O_CREATE|os.O_RDWR, 0666)
if err != nil {
t.Error(err)
}
defer file.Close()
err = genAllConfig(file)
if err != nil {
t.Error(err)
}

View File

@@ -7,6 +7,7 @@ import (
"time"
"github.com/singchia/frontier/pkg/frontier/apis"
"github.com/singchia/frontier/pkg/frontier/misc"
"github.com/singchia/geminio"
"github.com/singchia/geminio/options"
"k8s.io/klog/v2"
@@ -143,15 +144,18 @@ func (ex *exchange) forwardRawToService(end geminio.End) {
// rpc from edge, and forward to service
func (ex *exchange) forwardRPCToService(end geminio.End) {
edgeID := end.ClientID()
addr := end.RemoteAddr()
// we hijack all rpcs and forward them to service
end.Hijack(func(ctx context.Context, method string, r1 geminio.Request, r2 geminio.Response) {
// get service
svc, err := ex.Servicebound.GetServiceByRPC(method)
svcs, err := ex.Servicebound.GetServicesByRPC(method)
if err != nil {
klog.V(2).Infof("exchange forward rpc to service, get service by rpc err: %s, edgeID: %d", err, edgeID)
r2.SetError(err)
return
}
index := misc.Hash(ex.conf.Exchange.HashBy, len(svcs), edgeID, addr)
svc := svcs[index]
serviceID := svc.ClientID()
// we record the edgeID to service
tail := make([]byte, 8)
@@ -201,7 +205,10 @@ func (ex *exchange) forwardMessageToService(end geminio.End) {
}
topic := msg.Topic()
// TODO seperate async and sync produce
err = ex.MQM.Produce(topic, msg.Data(), apis.WithOrigin(msg), apis.WithEdgeID(edgeID))
err = ex.MQM.Produce(topic, msg.Data(),
apis.WithOrigin(msg),
apis.WithEdgeID(edgeID),
apis.WithAddr(end.RemoteAddr()))
if err != nil {
if err != apis.ErrTopicNotOnline {
klog.Errorf("edge forward message, produce err: %s, edgeID: %d", err, edgeID)

View File

@@ -8,6 +8,7 @@ import (
"time"
"github.com/singchia/frontier/pkg/frontier/apis"
"github.com/singchia/frontier/pkg/frontier/misc"
"github.com/singchia/geminio/options"
"k8s.io/klog/v2"
@@ -42,7 +43,7 @@ func (ex *exchange) GetEdgeID(meta []byte) (uint64, error) {
}
func (ex *exchange) EdgeOnline(edgeID uint64, meta []byte, addr net.Addr) error {
svc, err := ex.Servicebound.GetServiceByRPC(apis.RPCEdgeOnline)
svcs, err := ex.Servicebound.GetServicesByRPC(apis.RPCEdgeOnline)
if err != nil {
klog.V(2).Infof("exchange edge online, get service err: %s, edgeID: %d, meta: %s, addr: %s", err, edgeID, string(meta), addr)
if err == apis.ErrRecordNotFound {
@@ -62,6 +63,9 @@ func (ex *exchange) EdgeOnline(edgeID uint64, meta []byte, addr net.Addr) error
klog.Errorf("exchange edge online, json marshal err: %s, edgeID: %d, meta: %s, addr: %s", err, edgeID, string(meta), addr)
return err
}
index := misc.Hash(ex.conf.Exchange.HashBy, len(svcs), edgeID, addr)
svc := svcs[index]
// call service
req := svc.NewRequest(data)
opt := options.Call()
@@ -75,7 +79,7 @@ func (ex *exchange) EdgeOnline(edgeID uint64, meta []byte, addr net.Addr) error
}
func (ex *exchange) EdgeOffline(edgeID uint64, meta []byte, addr net.Addr) error {
svc, err := ex.Servicebound.GetServiceByRPC(apis.RPCEdgeOffline)
svcs, err := ex.Servicebound.GetServicesByRPC(apis.RPCEdgeOffline)
if err != nil {
klog.V(2).Infof("exchange edge offline, get service err: %s, edgeID: %d, meta: %s, addr: %s", err, edgeID, string(meta), addr)
if err == apis.ErrRecordNotFound {
@@ -83,6 +87,8 @@ func (ex *exchange) EdgeOffline(edgeID uint64, meta []byte, addr net.Addr) error
}
return err
}
index := misc.Hash(ex.conf.Exchange.HashBy, len(svcs), edgeID, addr)
svc := svcs[index]
// call service the edge offline event
event := &apis.OnEdgeOffline{
EdgeID: edgeID,

View File

@@ -1,6 +1,12 @@
package misc
import "reflect"
import (
"math/rand"
"net"
"reflect"
"github.com/singchia/frontier/pkg/utils"
)
func IsNil(i interface{}) bool {
return i == nil || reflect.ValueOf(i).IsNil()
@@ -13,3 +19,21 @@ func GetKeys(m map[string]struct{}) []string {
}
return keys
}
func Hash(hashby string, count int, edgeID uint64, addr net.Addr) int {
switch hashby {
case "srcip":
tcpaddr, ok := addr.(*net.TCPAddr)
if !ok {
return 0
}
ip32 := utils.IP2Int(tcpaddr.IP)
return int(ip32 % uint32(count))
case "random":
return rand.Intn(count)
default: // "edgeid" or empty
return int(edgeID % uint64(count))
}
}

View File

@@ -6,6 +6,7 @@ import (
"github.com/singchia/frontier/pkg/frontier/apis"
"github.com/singchia/frontier/pkg/frontier/config"
"github.com/singchia/frontier/pkg/frontier/misc"
"github.com/singchia/geminio"
"k8s.io/klog/v2"
)
@@ -181,15 +182,22 @@ func (mqm *mqManager) GetMQs(topic string) []apis.MQ {
}
func (mqm *mqManager) Produce(topic string, data []byte, opts ...apis.OptionProduce) error {
mq := mqm.GetMQ(topic)
if mq == nil {
mq = mqm.GetMQ("*")
mqs := mqm.GetMQs(topic)
if mqs == nil || len(mqs) == 0 {
mq := mqm.GetMQ("*")
if mq == nil {
err := apis.ErrTopicNotOnline
klog.V(2).Infof("mq manager, get mq nil, topic: %s err: %s", topic, err)
return err
}
}
// TODO optimize the logic
opt := &apis.ProduceOption{}
for _, fun := range opts {
fun(opt)
}
index := misc.Hash(mqm.conf.Exchange.HashBy, len(mqs), opt.EdgeID, opt.Addr)
mq := mqs[index]
err := mq.Produce(topic, data, opts...)
if err != nil {
klog.Errorf("mq manager, produce topic: %s message err: %s", topic, err)

View File

@@ -199,6 +199,15 @@ func (dao *dao) GetService(serviceID uint64) (*model.Service, error) {
}
func (dao *dao) GetServiceByName(name string) (*model.Service, error) {
services, err := dao.GetServicesByName(name)
if err != nil {
return nil, err
}
// random one
return services[rand.Intn(len(services))], err
}
func (dao *dao) GetServicesByName(name string) ([]*model.Service, error) {
services := []*model.Service{}
err := dao.db.View(func(tx *buntdb.Tx) error {
pivot := fmt.Sprintf(`{"service": "%s"}`, name)
@@ -217,8 +226,7 @@ func (dao *dao) GetServiceByName(name string) (*model.Service, error) {
if len(services) == 0 {
return nil, apis.ErrRecordNotFound
}
// random one
return services[rand.Intn(len(services))], err
return services, err
}
func (dao *dao) DeleteService(delete *query.ServiceDelete) error {
@@ -250,6 +258,15 @@ func getServiceKey(serviceID uint64) string {
// service rpc
func (dao *dao) GetServiceRPC(rpc string) (*model.ServiceRPC, error) {
serviceRPCs, err := dao.GetServiceRPCs(rpc)
if err != nil {
return nil, err
}
// return random one
return serviceRPCs[rand.Intn(len(serviceRPCs))], err
}
func (dao *dao) GetServiceRPCs(rpc string) ([]*model.ServiceRPC, error) {
serviceRPCs := []*model.ServiceRPC{}
err := dao.db.View(func(tx *buntdb.Tx) error {
pivot := fmt.Sprintf(`{"rpc":"%s"}`, rpc)
@@ -267,7 +284,7 @@ func (dao *dao) GetServiceRPC(rpc string) (*model.ServiceRPC, error) {
if len(serviceRPCs) == 0 {
return nil, apis.ErrRecordNotFound
}
return serviceRPCs[rand.Intn(len(serviceRPCs))], err
return serviceRPCs, err
}
func (dao *dao) ListServiceRPCs(query *query.ServiceRPCQuery) ([]string, error) {
@@ -465,6 +482,14 @@ func getServiceRPCKey(serviceID uint64, rpc string) string {
// service topics
func (dao *dao) GetServiceTopic(topic string) (*model.ServiceTopic, error) {
serviceTopics, err := dao.GetServiceTopics(topic)
if err != nil {
return nil, err
}
return serviceTopics[rand.Intn(len(serviceTopics))], err
}
func (dao *dao) GetServiceTopics(topic string) ([]*model.ServiceTopic, error) {
serviceTopics := []*model.ServiceTopic{}
err := dao.db.View(func(tx *buntdb.Tx) error {
pivot := fmt.Sprintf(`{"topic":"%s"}`, topic)
@@ -482,7 +507,7 @@ func (dao *dao) GetServiceTopic(topic string) (*model.ServiceTopic, error) {
if len(serviceTopics) == 0 {
return nil, apis.ErrRecordNotFound
}
return serviceTopics[rand.Intn(len(serviceTopics))], err
return serviceTopics, err
}
func (dao *dao) ListServiceTopics(query *query.ServiceTopicQuery) ([]string, error) {

View File

@@ -81,6 +81,20 @@ func (dao *dao) GetServiceByName(name string) (*model.Service, error) {
return &service, tx.Error
}
func (dao *dao) GetServicesByName(name string) ([]*model.Service, error) {
tx := dao.dbService.Model(&model.Service{})
if dao.config.Dao.Debug {
tx = tx.Debug()
}
tx = tx.Where("service = ?", name)
services := []*model.Service{}
tx = tx.Find(&services)
if tx.RowsAffected == 0 {
return nil, gorm.ErrRecordNotFound
}
return services, tx.Error
}
type ServiceDelete struct {
ServiceID uint64
Addr string
@@ -156,6 +170,21 @@ func (dao *dao) GetServiceRPC(rpc string) (*model.ServiceRPC, error) {
return &mrpc, tx.Error
}
func (dao *dao) GetServiceRPCs(rpc string) ([]*model.ServiceRPC, error) {
tx := dao.dbService.Model(&model.ServiceRPC{})
if dao.config.Dao.Debug {
tx = tx.Debug()
}
tx = tx.Where("rpc = ?", rpc)
mrpcs := []*model.ServiceRPC{}
tx = tx.Find(&mrpcs)
if tx.RowsAffected == 0 {
return nil, gorm.ErrRecordNotFound
}
return mrpcs, tx.Error
}
func (dao *dao) ListServiceRPCs(query *query.ServiceRPCQuery) ([]string, error) {
tx := dao.dbService.Model(&model.ServiceRPC{})
if dao.config.Dao.Debug {
@@ -231,7 +260,6 @@ func buildServiceRPCQuery(tx *gorm.DB, query *query.ServiceRPCQuery) *gorm.DB {
}
// service topic
func (dao *dao) GetServiceTopic(topic string) (*model.ServiceTopic, error) {
tx := dao.dbService.Model(&model.ServiceTopic{})
if dao.config.Dao.Debug {
@@ -247,6 +275,21 @@ func (dao *dao) GetServiceTopic(topic string) (*model.ServiceTopic, error) {
return &mtopic, tx.Error
}
func (dao *dao) GetServiceTopics(topic string) ([]*model.ServiceTopic, error) {
tx := dao.dbService.Model(&model.ServiceTopic{})
if dao.config.Dao.Debug {
tx = tx.Debug()
}
tx = tx.Where("topic = ?", topic).Limit(1)
mtopics := []*model.ServiceTopic{}
tx = tx.Find(&mtopics)
if tx.RowsAffected == 0 {
return nil, gorm.ErrRecordNotFound
}
return mtopics, tx.Error
}
func (dao *dao) ListServiceTopics(query *query.ServiceTopicQuery) ([]string, error) {
tx := dao.dbService.Model(&model.ServiceTopic{})
if dao.config.Dao.Debug {

View File

@@ -25,6 +25,7 @@ func NewServer(conf *config.Configuration, repo apis.Repo, mqm apis.MQM) (*Serve
// informer
var (
inf *frontlas.Informer
cp *controlplane.ControlPlane
err error
)
if conf.Frontlas.Enable {
@@ -53,17 +54,19 @@ func NewServer(conf *config.Configuration, repo apis.Repo, mqm apis.MQM) (*Serve
}
// controlplane
controlplane, err := controlplane.NewControlPlane(conf, repo, servicebound, edgebound)
if err != nil {
klog.Errorf("new controlplane err: %s", err)
return nil, err
if conf.ControlPlane.Enable {
cp, err = controlplane.NewControlPlane(conf, repo, servicebound, edgebound)
if err != nil {
klog.Errorf("new controlplane err: %s", err)
return nil, err
}
}
return &Server{
tmr: tmr,
servicebound: servicebound,
edgebound: edgebound,
controlplane: controlplane,
controlplane: cp,
}, nil
}
@@ -71,12 +74,16 @@ func NewServer(conf *config.Configuration, repo apis.Repo, mqm apis.MQM) (*Serve
func (s *Server) Serve() {
go s.servicebound.Serve()
go s.edgebound.Serve()
go s.controlplane.Serve()
if s.controlplane != nil {
go s.controlplane.Serve()
}
}
func (s *Server) Close() {
s.servicebound.Close()
s.edgebound.Close()
s.controlplane.Close()
if s.controlplane != nil {
s.controlplane.Close()
}
s.tmr.Close()
}

View File

@@ -199,6 +199,27 @@ func (sm *serviceManager) GetServiceByName(name string) (geminio.End, error) {
return sm.services[mservice.ServiceID], nil
}
func (sm *serviceManager) GetServicesByName(name string) ([]geminio.End, error) {
sm.mtx.RLock()
defer sm.mtx.RUnlock()
mservices, err := sm.repo.GetServicesByName(name)
if err != nil {
klog.V(2).Infof("get services by name: %s, err: %s", name, err)
return nil, err
}
ends := []geminio.End{}
for _, mservice := range mservices {
end, ok := sm.services[mservice.ServiceID]
if ok {
ends = append(ends, end)
continue
}
// TOTO warning the consistency
}
return ends, nil
}
func (sm *serviceManager) GetServiceByRPC(rpc string) (geminio.End, error) {
sm.mtx.RLock()
defer sm.mtx.RUnlock()
@@ -208,10 +229,30 @@ func (sm *serviceManager) GetServiceByRPC(rpc string) (geminio.End, error) {
klog.V(2).Infof("get service by rpc: %s, err: %s", rpc, err)
return nil, err
}
return sm.services[mrpc.ServiceID], nil
}
func (sm *serviceManager) GetServicesByRPC(rpc string) ([]geminio.End, error) {
sm.mtx.RLock()
defer sm.mtx.RUnlock()
mrpcs, err := sm.repo.GetServiceRPCs(rpc)
if err != nil {
klog.V(2).Infof("get service by rpc: %s, err: %s", rpc, err)
return nil, err
}
ends := []geminio.End{}
for _, mrpc := range mrpcs {
end, ok := sm.services[mrpc.ServiceID]
if ok {
ends = append(ends, end)
continue
}
// TODO warning the consistency
}
return ends, nil
}
func (sm *serviceManager) GetServiceByTopic(topic string) (geminio.End, error) {
sm.mtx.RLock()
defer sm.mtx.RUnlock()
@@ -221,10 +262,30 @@ func (sm *serviceManager) GetServiceByTopic(topic string) (geminio.End, error) {
klog.V(2).Infof("get service by topic: %s, err: %s", topic, err)
return nil, err
}
return sm.services[mtopic.ServiceID], nil
}
func (sm *serviceManager) GetServicesByTopic(topic string) ([]geminio.End, error) {
sm.mtx.RLock()
defer sm.mtx.RUnlock()
mtopics, err := sm.repo.GetServiceTopics(topic)
if err != nil {
klog.V(2).Infof("get service by topic: %s, err: %s", topic, err)
return nil, err
}
ends := []geminio.End{}
for _, mtopic := range mtopics {
end, ok := sm.services[mtopic.ServiceID]
if ok {
ends = append(ends, end)
continue
}
// TODO warning the consistency
}
return ends, nil
}
func (sm *serviceManager) ListService() []geminio.End {
ends := []geminio.End{}
sm.mtx.RLock()

59
pkg/utils/misc.go Normal file
View File

@@ -0,0 +1,59 @@
package utils
import (
"encoding/binary"
"fmt"
"net"
"reflect"
"strings"
)
// TODO test it
func RemoveOmitEmptyTag(obj interface{}) interface{} {
v := reflect.ValueOf(obj)
t := reflect.TypeOf(obj)
if t.Kind() != reflect.Struct {
return obj
}
newStruct := reflect.New(t).Elem()
for i := 0; i < v.NumField(); i++ {
field := v.Field(i)
fieldType := t.Field(i)
tag := fieldType.Tag.Get("yaml")
if tag != "" {
tags := strings.Split(tag, ",")
newTags := []string{}
for _, t := range tags {
if t != "omitempty" {
newTags = append(newTags, t)
}
}
newTag := strings.Join(newTags, ",")
if newTag != "" {
fieldType.Tag = reflect.StructTag(fmt.Sprintf(`yaml:"%s"`, newTag))
} else {
fieldType.Tag = ""
}
}
newStruct.Field(i).Set(field)
}
return newStruct.Interface()
}
func IP2Int(ip net.IP) uint32 {
if len(ip) == 16 {
return binary.BigEndian.Uint32(ip[12:16])
}
return binary.BigEndian.Uint32(ip)
}
func Int2IP(nn uint32) net.IP {
ip := make(net.IP, 4)
binary.BigEndian.PutUint32(ip, nn)
return ip
}