readme: barely finished

This commit is contained in:
singchia
2024-05-31 00:16:55 +08:00
parent 496cd7dbb3
commit 9b15862266
10 changed files with 671 additions and 157 deletions

331
README.md
View File

@@ -681,39 +681,199 @@ curl -X GET http://127.0.0.1:30010/v1/services/rpcs?service_id={service_id}
## Frontier配置
如果需要更近一步定制你的Frontier实例可以在这一节了解各个配置是如何工作的。
如果需要更近一步定制你的Frontier实例可以在这一节了解各个配置是如何工作的。定制完你的配置,保存为```frontier.yaml```,挂载到容器```/usr/conf/frontier.yaml```位置生效。
### 最小化配置
简单起,你可以仅配置面向微服务和边缘节点的服务监听地址:
```
# 微服务端配置
servicebound:
# 监听网络
listen:
network: tcp
# 监听地址
addr: 0.0.0.0:30011
# 边缘节点端配置
edgebound:
# 监听网络
listen:
network: tcp
# 监听地址
addr: 0.0.0.0:30012
# 找不到注册的GetEdgeID时是否允许Frontier分配edgeID
edgeid_alloc_when_no_idservice_on: true
```
### TLS
对于用户来说比较重要的TLS配置在微服务、边缘节点和控制面都是支持的另支持mTLSFrontier由此校验客户端携带的证书。
```
tls:
servicebound:
listen:
addr: 0.0.0.0:30011
network: tcp
tls:
# 是否开启TLS默认不开启
enable: false
# 证书和私钥,允许配置多对证书,由客户端协商确定
certs:
- cert: servicebound.cert
key: servicebound.key
# 是否启用mtls启动会校验客户端携带的证书是否由下面的CA签发
mtls: false
# CA证书用于校验客户端证书
ca_certs:
- ca1.cert
edgebound:
listen:
addr: 0.0.0.0:30012
network: tcp
tls:
# 是否开启TLS默认不开启
enable: false
# 证书和私钥,允许配置多对证书,由客户端协商确定
certs:
- cert: edgebound.cert
key: edgebound.key
insecure_skip_verify: false
# 是否启用mtls启动会校验客户端携带的证书是否由下面的CA签发
mtls: false
# CA证书用于校验客户端证书
ca_certs:
- ca1.cert
```
### 外部MQ
如果你需要配置外部MQFrontier也支持将相应的Topic转Publish到这些MQ。
**AMQP**
```
mqm:
amqp:
# 是否允许
enable: false
# AMQP地址
addrs: null
# 生产者
producer:
# exchange名
exchange: ""
# 等于Frontier内Topic的概念数组值
routing_keys: null
```
对于AMQP来说以上是最小配置边缘节点Publish的消息Topic如果在routing_keys内Frontier会Publish到exchange中如果还有微服务或其他外部MQ也声明了该TopicFrontier仍然会按照hashby来选择一个Publish。
**Kafka**
```
mqm:
kafka:
# 是否允许
enable: false
# kafka地址
addrs: null
# 生产者
producer:
# 数组值
topics: null
```
对于Kafka来说以上是最小配置边缘节点Publish的消息Topic如果在上面数组中Frontier会Publish过来。如果还有微服务或其他外部MQ也声明了该TopicFrontier仍然会按照hashby来选择一个Publish。
**NATS**
```
mqm:
nats:
# 是否允许
enable: false
# NATS地址
addrs: null
producer:
# 等于Frontier内Topic的概念数组值
subjects: null
# 如果允许jetstream会优先Publish到jetstream
jetstream:
enable: false
# jetstream名
name: ""
producer:
# 等于Frontier内Topic的概念数组值
subjects: null
```
NATS配置里如果允许Jetstream会优先使用Publish到Jetstream。如果还有微服务或其他外部MQ也声明了该TopicFrontier仍然会按照hashby来选择一个Publish。
**NSQ**
```
mqm:
nsq:
# 是否允许
enable: false
# NSQ地址
addrs: null
producer:
# 数组值
topics: null
```
NSQ的Topic里如果还有微服务或其他外部MQ也声明了该TopicFrontier仍然会按照hashby来选择一个Publish。
**Redis**
```
mqm:
redis:
# 是否允许
enable: false
# Redis地址
addrs: null
# Redis DB
db: 0
# 密码
password: ""
producer:
# 等于Frontier内Topic的概念数组值
channels: null
```
如果还有微服务或其他外部MQ也声明了该TopicFrontier仍然会按照hashby来选择一个Publish。
### 其他配置
```
daemon:
# 是否开启PProf
pprof:
addr: 0.0.0.0:6060
cpu_profile_rate: 0
enable: true
# 资源限制
rlimit:
enable: true
nofile: 102400
# 控制面开启
controlplane:
enable: false
mtls: false
ca_certs:
- ca1.cert
- ca2.cert
certs:
- cert: edgebound.cert
key: edgebound.key
insecure_skip_verify: false
listen:
network: tcp
addr: 0.0.0.0:30010
dao:
# 支持buntdb和sqlite3都使用的in-memory模式保持无状态
backend: buntdb
# sqlite debug开启
debug: false
exchange:
# Frontier根据edgeid srcip或random的哈希策略转发边缘节点的消息、RPC和打开流到微服务默认edgeid
# 即相同的边缘节点总是会请求到相同的微服务。
hashby: edgeid
```
更多详细配置见 [frontier_all.yaml](./etc/frontier_all.yaml)
## Frontier部署
在单Frontier实例下可以根据环境选择以下方式部署你的Frontier实例。
@@ -752,27 +912,168 @@ helm install frontier ./ -f values.yaml
新增Frontlas组件用于构建集群Frontlas同样也是无状态组件并不在内存里留存其他信息因此需要额外依赖Redis你需要提供一个Redis连接信息给到Frontlas支持 ```redis``` ```sentinel```和```redis-cluster```。
- _Frontier_微服务和边缘数据面通信组件
- _Frontlas_集群管理组件将微服务和边缘的元信息、活跃信息记录在Redis里
- _Frontlas_命名取自Frontier Atlas集群管理组件将微服务和边缘的元信息、活跃信息记录在Redis里
Frontier需要主动连接Frontlas以上报自己、微服务和边缘的活跃和状态默认Frontlas的端口是
- ```:40011``` 提供给微服务连接代替微服务在单Frontier实例下连接的30011端口
- ```:40012``` 提供给Frontier连接上报状态
你可以根据需要部署任意多个Frontier实例而对于Frontlas分开部署两个即可保障HA高可用因为不存储状态没有一致性问题。
### 配置
**Frontier**的frontier.yaml需要添加如下配置
```
frontlas:
enable: true
dial:
network: tcp
addr:
- 127.0.0.1:40012
tls:
metrics:
enable: false
interval: 0
daemon:
# Frontier集群内的唯一ID
frontier_id: frontier01
```
Frontier需要连接Frontlas用来上报自己、微服务和边缘的活跃和状态。
**Frontlas**的frontlas.yaml最小化配置
```
control_plane:
listen:
# 微服务改连接这个地址用来发现集群的边缘节点所在的Frontier
network: tcp
addr: 0.0.0.0:40011
frontier_plane:
# Frontier连接这个地址
listen:
network: tcp
addr: 0.0.0.0:40012
expiration:
# 微服务在redis内元信息的过期时间
service_meta: 30
# 边缘节点在redis内元信息的过期时间
edge_meta: 30
redis:
# 支持连接standalone、sentinel和cluster
mode: standalone
standalone:
network: tcp
addr: redis:6379
db: 0
```
更多详细配置见 [frontlas_all.yaml](./etc/frontlas_all.yaml)
### 使用
### 分布式
由于使用Frontlas来发现可用的Frontier因此微服务需要做出调整如下
当部署多个Frontier实例时跨实例的微服务和边缘节点寻址一定需要分布式存储如果没有Frontlas这部分的存储工作
**微服务获取Service**
### 高可用
```
package main
import (
"net"
"github.com/singchia/frontier/api/dataplane/v1/service"
)
func main() {
// 改使用NewClusterService来获取Service
svc, err := service.NewClusterService("127.0.0.1:40011")
// 开始使用service其他一切保持不变
}
```
**边缘节点获取连接地址**
对于边缘节点来说依然连接Frontier不过可以从Frontlas来获取可用的Frontier地址Frontlas提供了列举Frontier实例接口
```
curl -X http://127.0.0.1:40011/cluster/v1/frontiers
```
你可以在这个接口上封装一下提供给边缘节点做负载均衡或者高可用或加上mTLS直接提供给边缘节点不建议
**控制面GRPC** 详见[Protobuf定义](./api/controlplane/frontlas/v1/cluster.proto)
Frontlas控制面与Frontier不同是面向集群的控制面目前只提供了读取集群的接口
```protobuf
service ClusterService {
rpc GetFrontierByEdge(GetFrontierByEdgeIDRequest) returns (GetFrontierByEdgeIDResponse);
rpc ListFrontiers(ListFrontiersRequest) returns (ListFrontiersResponse);
rpc ListEdges(ListEdgesRequest) returns (ListEdgesResponse);
rpc GetEdgeByID(GetEdgeByIDRequest) returns (GetEdgeByIDResponse);
rpc GetEdgesCount(GetEdgesCountRequest) returns (GetEdgesCountResponse);
rpc ListServices(ListServicesRequest) returns (ListServicesResponse) ;
rpc GetServiceByID(GetServiceByIDRequest) returns (GetServiceByIDResponse) ;
rpc GetServicesCount(GetServicesCountRequest) returns (GetServicesCountResponse) ;
}
```
### 水平扩展
## k8s
### Operator
**安装CRD和Operator**
按照以下步骤安装和部署Operator到你的.kubeconfig环境中
```
git clone https://github.com/singchia/frontier.git
cd pkg/operator
make install && make deploy
```
**CR**
```
apiVersion: frontier.singchia.io/v1alpha1
kind: FrontierCluster
metadata:
labels:
app.kubernetes.io/name: frontiercluster
app.kubernetes.io/managed-by: kustomize
name: frontiercluster
spec:
frontier:
# 单实例Frontier
replicas: 1
# 微服务侧端口
servicebound:
port: 30011
# 边缘节点侧端口
edgebound:
port: 30012
frontlas:
# 单实例Frontlas
replicas: 1
# 控制面端口
controlplane:
port: 40011
# 依赖的Redis配置
redis:
addrs:
- rfs-redisfailover:26379
password: your-password
masterName: mymaster
redisType: sentinel
```
1分钟你即可拥有一个Frontier+Frontlas的集群。
## 开发
### 路线图

View File

@@ -10,60 +10,60 @@ daemon:
controlplane:
enable: false
listen:
addr: 0.0.0.0:30010
advertised_addr: ""
network: tcp
tls:
ca_certs: null
certs: null
enable: false
insecure_skip_verify: false
mtls: false
addr: 0.0.0.0:30010
# advertised_addr: ""
# tls:
# ca_certs: null
# certs: null
# enable: false
# insecure_skip_verify: false
# mtls: false
servicebound:
listen:
network: tcp
addr: 0.0.0.0:30011
advertised_addr: ""
network: tcp
tls:
ca_certs:
- ca1.cert
- ca2.cert
certs:
- cert: servicebound.cert
key: servicebound.key
enable: false
insecure_skip_verify: false
mtls: false
# advertised_addr: ""
# tls:
# ca_certs:
# - ca1.cert
# - ca2.cert
# certs:
# - cert: servicebound.cert
# key: servicebound.key
# enable: false
# insecure_skip_verify: false
# mtls: false
edgebound:
bypass:
addr: 192.168.1.10:8443
advertised_addr: ""
network: tcp
tls:
ca_certs:
- ca1.cert
certs:
- cert: frontier.cert
key: frontier.key
enable: true
insecure_skip_verify: false
mtls: true
bypass_enable: false
edgeid_alloc_when_no_idservice_on: true
listen:
addr: 0.0.0.0:30012
advertised_addr: ""
network: tcp
tls:
ca_certs:
- ca1.cert
- ca2.cert
certs:
- cert: edgebound.cert
key: edgebound.key
enable: false
insecure_skip_verify: false
mtls: false
addr: 0.0.0.0:30012
# advertised_addr: ""
# tls:
# ca_certs:
# - ca1.cert
# - ca2.cert
# certs:
# - cert: edgebound.cert
# key: edgebound.key
# enable: false
# insecure_skip_verify: false
# mtls: false
edgeid_alloc_when_no_idservice_on: true
# bypass:
# addr: 192.168.1.10:8443
# advertised_addr: ""
# network: tcp
# tls:
# ca_certs:
# - ca1.cert
# certs:
# - cert: frontier.cert
# key: frontier.key
# enable: true
# insecure_skip_verify: false
# mtls: true
# bypass_enable: false
dao:
backend: buntdb
debug: false
@@ -75,7 +75,8 @@ frontlas:
enable: false
interval: 0
dial:
addr: 127.0.0.1:40012
addrs:
- 127.0.0.1:40012
network: tcp
tls:
ca_certs: null
@@ -87,59 +88,119 @@ mqm:
amqp:
enable: false
addrs: null
# 0 max channels means 2^16 - 1
channel_max: 0
# exchange to declare
exchanges: null
# 0 max bytes means unlimited
frame_size: 0
# less than 1s uses the server's interval
heartbeat: 0
# Connection locale that we expect to always be en_US
# Even though servers must return it as per the AMQP 0-9-1 spec,
# we are not aware of it being used other than to satisfy the spec requirements
locale: ""
producer:
app_id: ""
content_encoding: ""
content_type: ""
delivery_mode: 0
# exchange name to produce
exchange: ""
routing_keys: null
# creating application id
app_id: ""
# MIME content encoding
content_encoding: ""
# MIME content type
content_type: ""
# Transient (0 or 1) or Persistent (2)
delivery_mode: 0
expiration: ""
# message related headers
headers: null
immediate: false
mandatory: false
# 0 to 9
priority: 0
# address to to reply to (ex: RPC)
reply_to: ""
routing_keys: null
# message type name
type: ""
# creating user id - ex: "guest"
user_id: ""
queueBindings: null
queues: null
# Vhost specifies the namespace of permissions, exchanges, queues and
# bindings on the server. Dial sets this to the path parsed from the URL.
vhost: ""
kafka:
enable: false
addrs: null
producer:
async: false
compression: none
compression_level: 0
flush:
bytes: 0
frequency: 0
max_messages: 0
messages: 0
idempotent: false
max_message_bytes: 0
required_acks: 0
retry:
backoff: 0
max: 0
timeout: 0
# topics to notify frontier which topics to allow to publish
topics: null
# The type of compression to use on messages (defaults to no compression).
# Similar to `compression.codec` setting of the JVM producer.
compression: none
# The level of compression to use on messages. The meaning depends
# on the actual compression type used and defaults to default compression
# level for the codec.
compression_level: 0
# If enabled, the producer will ensure that exactly one copy of each message is
# written.
idempotent: false
# The maximum permitted size of a message (defaults to 1000000). Should be
# set equal to or smaller than the broker's `message.max.bytes`.
max_message_bytes: 0
# The level of acknowledgement reliability needed from the broker (defaults
# to WaitForLocal). Equivalent to the `request.required.acks` setting of the
# JVM producer.
required_acks: 0
# The maximum duration the broker will wait the receipt of the number of
# RequiredAcks (defaults to 10 seconds). This is only relevant when
# RequiredAcks is set to WaitForAll or a number > 1. Only supports
# millisecond resolution, nanoseconds will be truncated. Equivalent to
# the JVM producer's `request.timeout.ms` setting.
timeout: 0
# The following config options control how often messages are batched up and
# sent to the broker. By default, messages are sent as fast as possible, and
# all messages received while the current batch is in-flight are placed
# into the subsequent batch.
flush:
# The best-effort number of bytes needed to trigger a flush. Use the
# global sarama.MaxRequestSize to set a hard upper limit.
bytes: 0
# The best-effort frequency of flushes. Equivalent to
# `queue.buffering.max.ms` setting of JVM producer.
frequency: 0
# The maximum number of messages the producer will send in a single
# broker request. Defaults to 0 for unlimited. Similar to
# `queue.buffering.max.messages` in the JVM producer.
max_messages: 0
# The best-effort number of messages needed to trigger a flush. Use
# `MaxMessages` to set a hard upper limit.
messages: 0
retry:
#How long to wait for the cluster to settle between retries
# (default 100ms). Similar to the `retry.backoff.ms` setting of the
# JVM producer.
backoff: 0
# The total number of times to retry sending a message (default 3).
# Similar to the `message.send.max.retries` setting of the JVM producer.
max: 0
nats:
enable: false
addrs: null
jetStream:
enable: false
name: ""
producer:
subjects: null
producer:
# topics to specific
subjects: null
# jetstream will replace upper producer.
jetstream:
enable: false
# jetstream name to publish
name: ""
# jetstream producer
producer:
# topics to specific
subjects: null
nsq:
enable: false
addrs: null
@@ -151,5 +212,6 @@ mqm:
db: 0
password: ""
producer:
# topics to specific
channels: null

72
etc/frontlas_all.yaml Normal file
View File

@@ -0,0 +1,72 @@
control_plane:
listen:
addr: 0.0.0.0:40011
advertised_addr: ""
network: tcp
tls:
ca_certs: null
certs: null
enable: false
insecure_skip_verify: false
mtls: false
daemon:
p_prof:
addr: 0.0.0.0:6061
cpu_profile_rate: 0
enable: true
r_limit:
enable: false
num_file: 1024
frontier_manager:
expiration:
edge_meta: 0
service_meta: 0
listen:
addr: 0.0.0.0:40012
advertised_addr: ""
network: tcp
tls:
ca_certs: null
certs: null
enable: false
insecure_skip_verify: false
mtls: false
redis:
client_name: ""
cluster:
addrs: null
max_redirects: 0
route_by_latency: false
route_randomly: false
conn_max_idle_time: 0
conn_max_lifetime: 0
dial_timeout: 0
disable_indentity: false
identity_suffix: ""
max_active_conns: 0
max_idle_conns: 0
max_retries: 0
max_retry_backoff: 0
min_idle_conns: 0
min_retry_backoff: 0
mode: standalone
password: ""
pool_fifo: false
pool_size: 0
pool_timeout: 0
protocol: 0
read_timeout: 0
sentinel:
addrs: null
db: 0
master_name: ""
replica_only: false
route_by_latency: false
route_randomly: false
use_disconnected_replicas: false
standalone:
addr: ""
db: 0
network: ""
username: ""
write_timeout: 0

View File

@@ -22,8 +22,8 @@ type Listen struct {
}
type Dial struct {
Network string `yaml:"network" json:"network"`
Addr string `yaml:"addr" json:"addr"`
AdvertisedAddr string `yaml:"advertised_addr,omitempty" json:"advertised_addr"`
TLS TLS `yaml:"tls,omitempty" json:"tls"`
Network string `yaml:"network" json:"network"`
Addrs []string `yaml:"addrs" json:"addrs"`
AdvertisedAddr string `yaml:"advertised_addr,omitempty" json:"advertised_addr"`
TLS TLS `yaml:"tls,omitempty" json:"tls"`
}

View File

@@ -202,7 +202,7 @@ type Nats struct {
Producer struct {
Subjects []string `yaml:"subjects" json:"subjects"`
} `yaml:"producer,omitempty" json:"producer"`
} `yaml:"jetstream,omitempty" json:"jetStream"`
} `yaml:"jetstream,omitempty" json:"jetstream"`
}
type NSQ struct {
@@ -355,7 +355,7 @@ func Parse() (*Configuration, error) {
frontlasAddr := os.Getenv("FRONTLAS_ADDR")
if frontlasAddr != "" {
config.Frontlas.Enable = true
config.Frontlas.Dial.Addr = frontlasAddr
config.Frontlas.Dial.Addrs = []string{frontlasAddr}
}
return config, nil
}
@@ -424,7 +424,7 @@ func genAllConfig(writer io.Writer) error {
BypassEnable: false,
Bypass: config.Dial{
Network: "tcp",
Addr: "192.168.1.10:8443",
Addrs: []string{"192.168.1.10:8443"},
TLS: config.TLS{
Enable: true,
MTLS: true,
@@ -448,7 +448,7 @@ func genAllConfig(writer io.Writer) error {
Enable: false,
Dial: config.Dial{
Network: "tcp",
Addr: "127.0.0.1:40012",
Addrs: []string{"127.0.0.1:40012"},
TLS: config.TLS{
Enable: false,
MTLS: false,

View File

@@ -2,6 +2,8 @@ package edgebound
import (
"context"
"errors"
"math/rand"
"net"
"strings"
"sync"
@@ -108,7 +110,10 @@ func newEdgeManager(conf *config.Configuration, repo apis.Repo, informer apis.Ed
}
func (em *edgeManager) bypassDial(_ net.Addr, _ interface{}) (net.Conn, error) {
return utils.Dial(&em.conf.Edgebound.Bypass)
if em.conf.Edgebound.Bypass.Addrs == nil || len(em.conf.Edgebound.Bypass.Addrs) == 0 {
return nil, errors.New("illegal bypass addrs")
}
return utils.Dial(&em.conf.Edgebound.Bypass, rand.Intn(len(em.conf.Edgebound.Bypass.Addrs)))
}
// Serve blocks until the Accept error

View File

@@ -3,6 +3,8 @@ package frontlas
import (
"context"
"encoding/json"
"errors"
"math/rand"
"net"
"sync"
"sync/atomic"
@@ -28,6 +30,9 @@ type Informer struct {
func NewInformer(conf *config.Configuration, tmr timer.Timer) (*Informer, error) {
dial := conf.Frontlas.Dial
if dial.Addrs == nil || len(dial.Addrs) == 0 {
return nil, errors.New("illegal dial addrs")
}
sbAddr, ebAddr, err := getAdvertisedAddrs(conf.Servicebound.Listen, conf.Edgebound.Listen, dial)
// meta
@@ -44,7 +49,7 @@ func NewInformer(conf *config.Configuration, tmr timer.Timer) (*Informer, error)
opt.SetMeta(data)
dialer := func() (net.Conn, error) {
conn, err := utils.Dial(&dial)
conn, err := utils.Dial(&dial, rand.Intn(len(dial.Addrs)))
if err != nil {
klog.Errorf("frontlas new informer, dial err: %s", err)
return nil, err
@@ -103,7 +108,7 @@ func getAdvertisedAddrs(sblisten, eblisten gconfig.Listen, dial gconfig.Dial) (s
)
getDefaultRouteHost := func() (string, error) {
once.Do(func() {
ip, rerr := utils.GetDefaultRouteIP(dial.Network, dial.Addr)
ip, rerr := utils.GetDefaultRouteIP(dial.Network, dial.Addrs[0])
if err != nil {
err = rerr
return

View File

@@ -1,6 +1,7 @@
package config
import (
"encoding/json"
"flag"
"io"
"net"
@@ -17,128 +18,128 @@ import (
// daemon related
type RLimit struct {
Enable bool `yaml:"enable"`
NumFile int `yaml:"nofile"`
Enable bool `yaml:"enable" json:"enable"`
NumFile int `yaml:"nofile" json:"num_file"`
}
type PProf struct {
Enable bool `yaml:"enable"`
Addr string `yaml:"addr"`
CPUProfileRate int `yaml:"cpu_profile_rate"`
Enable bool `yaml:"enable" json:"enable"`
Addr string `yaml:"addr" json:"addr"`
CPUProfileRate int `yaml:"cpu_profile_rate" json:"cpu_profile_rate"`
}
type Daemon struct {
RLimit RLimit `yaml:"rlimit"`
PProf PProf `yaml:"pprof"`
RLimit RLimit `yaml:"rlimit" json:"r_limit"`
PProf PProf `yaml:"pprof" json:"p_prof"`
}
// for rest and grpc
type ControlPlane struct {
Listen config.Listen `yaml:"listen"`
Listen config.Listen `yaml:"listen" json:"listen"`
}
// TODO tls support
type Redis struct {
Mode string `yaml:"mode"` // standalone, sentinel or cluster
Mode string `yaml:"mode" json:"mode"` // standalone, sentinel or cluster
// Use the specified Username to authenticate the current connection
// with one of the connections defined in the ACL list when connecting
// to a Redis 6.0 instance, or greater, that is using the Redis ACL system.
Username string `yaml:"username,omitempty"`
Username string `yaml:"username,omitempty" json:"username"`
// Optional password. Must match the password specified in the
// requirepass server configuration option (if connecting to a Redis 5.0 instance, or lower),
// or the User Password when connecting to a Redis 6.0 instance, or greater,
// that is using the Redis ACL system.
Password string `yaml:"password,omitempty"`
Password string `yaml:"password,omitempty" json:"password"`
// Protocol 2 or 3. Use the version to negotiate RESP version with redis-server.
// Default is 3.
Protocol int `yaml:"protocol,omitempty"`
Protocol int `yaml:"protocol,omitempty" json:"protocol"`
// ClientName will execute the `CLIENT SETNAME ClientName` command for each conn.
ClientName string `yaml:"clientname,omitempty"`
ClientName string `yaml:"clientname,omitempty" json:"client_name"`
// connection retry settings
MaxRetries int `yaml:"max_retries,omitempty"`
MinRetryBackoff int `yaml:"min_retry_backoff,omitempty"`
MaxRetryBackoff int `yaml:"max_retry_backoff,omitempty"`
MaxRetries int `yaml:"max_retries,omitempty" json:"max_retries"`
MinRetryBackoff int `yaml:"min_retry_backoff,omitempty" json:"min_retry_backoff"`
MaxRetryBackoff int `yaml:"max_retry_backoff,omitempty" json:"max_retry_backoff"`
// connection r/w settings
DialTimeout int `yaml:"dial_timeout,omitempty"`
ReadTimeout int `yaml:"read_timeout,omitempty"`
WriteTimeout int `yaml:"write_timeout,omitempty"`
DialTimeout int `yaml:"dial_timeout,omitempty" json:"dial_timeout"`
ReadTimeout int `yaml:"read_timeout,omitempty" json:"read_timeout"`
WriteTimeout int `yaml:"write_timeout,omitempty" json:"write_timeout"`
// connection pool settings
PoolFIFO bool `yaml:"pool_fifo,omitempty"`
PoolSize int `yaml:"pool_size,omitempty"` // applies per cluster node and not for the whole cluster
PoolTimeout int `yaml:"pool_timeout,omitempty"`
MinIdleConns int `yaml:"min_idle_conns,omitempty"`
MaxIdleConns int `yaml:"max_idle,omitempty"`
MaxActiveConns int `yaml:"max_active_conns,omitempty"` // applies per cluster node and not for the whole cluster
ConnMaxIdleTime int `yaml:"conn_max_idle_time,omitempty"`
ConnMaxLifetime int `yaml:"conn_max_life_time,omitempty"`
DisableIndentity bool `yaml:"disable_identity,omitempty"` // Disable set-lib on connect. Default is false.
IdentitySuffix string `yaml:"identity_suffix,omitempty"` // Add suffix to client name. Default is empty.
PoolFIFO bool `yaml:"pool_fifo,omitempty" json:"pool_fifo"`
PoolSize int `yaml:"pool_size,omitempty" json:"pool_size"` // applies per cluster node and not for the whole cluster
PoolTimeout int `yaml:"pool_timeout,omitempty" json:"pool_timeout"`
MinIdleConns int `yaml:"min_idle_conns,omitempty" json:"min_idle_conns"`
MaxIdleConns int `yaml:"max_idle,omitempty" json:"max_idle_conns"`
MaxActiveConns int `yaml:"max_active_conns,omitempty" json:"max_active_conns"` // applies per cluster node and not for the whole cluster
ConnMaxIdleTime int `yaml:"conn_max_idle_time,omitempty" json:"conn_max_idle_time"`
ConnMaxLifetime int `yaml:"conn_max_life_time,omitempty" json:"conn_max_lifetime"`
DisableIndentity bool `yaml:"disable_identity,omitempty" json:"disable_indentity"` // Disable set-lib on connect. Default is false.
IdentitySuffix string `yaml:"identity_suffix,omitempty" json:"identity_suffix"` // Add suffix to client name. Default is empty.
Standalone struct {
// The network type, either tcp or unix.
// Default is tcp.
Network string `yaml:"network"`
Network string `yaml:"network" json:"network"`
// host:port address.
Addr string `yaml:"addr"`
Addr string `yaml:"addr" json:"addr"`
// CredentialsProvider allows the username and password to be updated
// before reconnecting. It should return the current username and password.
DB int `yaml:"db"`
} `yaml:"standalone,omitempty"`
DB int `yaml:"db" json:"db"`
} `yaml:"standalone,omitempty" json:"standalone"`
Sentinel struct {
Addrs []string `yaml:"addrs"`
MasterName string `yaml:"master_name"`
DB int `yaml:"db"`
Addrs []string `yaml:"addrs" json:"addrs"`
MasterName string `yaml:"master_name" json:"master_name"`
DB int `yaml:"db" json:"db"`
// route settings
// Allows routing read-only commands to the closest master or replica node.
// This option only works with NewFailoverClusterClient.
RouteByLatency bool `yaml:"route_by_latency,omitempty"`
RouteByLatency bool `yaml:"route_by_latency,omitempty" json:"route_by_latency"`
// Allows routing read-only commands to the random master or replica node.
// This option only works with NewFailoverClusterClient.
RouteRandomly bool `yaml:"route_randomly,omitempty"`
RouteRandomly bool `yaml:"route_randomly,omitempty" json:"route_randomly"`
// Route all commands to replica read-only nodes.
ReplicaOnly bool `yaml:"replica_only,omitempty"`
ReplicaOnly bool `yaml:"replica_only,omitempty" json:"replica_only"`
// Use replicas disconnected with master when cannot get connected replicas
// Now, this option only works in RandomReplicaAddr function.
UseDisconnectedReplicas bool `yaml:"use_disconnected_replicas,omitempty"`
} `yaml:"sentinel,omitempty"`
UseDisconnectedReplicas bool `yaml:"use_disconnected_replicas,omitempty" json:"use_disconnected_replicas"`
} `yaml:"sentinel,omitempty" json:"sentinel"`
Cluster struct {
Addrs []string `yaml:"addrs"`
Addrs []string `yaml:"addrs" json:"addrs"`
// The maximum number of retries before giving up. Command is retried
// on network errors and MOVED/ASK redirects.
// Default is 3 retries.
MaxRedirects int `yaml:"max_redirects,omitempty"`
MaxRedirects int `yaml:"max_redirects,omitempty" json:"max_redirects"`
// Allows routing read-only commands to the closest master or slave node.
// It automatically enables ReadOnly.
RouteByLatency bool `yaml:"route_by_latency,omitempty"`
RouteByLatency bool `yaml:"route_by_latency,omitempty" json:"route_by_latency"`
// Allows routing read-only commands to the random master or slave node.
// It automatically enables ReadOnly.
RouteRandomly bool `yaml:"route_randomly,omitempty"`
} `yaml:"cluster,omitempty"`
RouteRandomly bool `yaml:"route_randomly,omitempty" json:"route_randomly"`
} `yaml:"cluster,omitempty" json:"cluster"`
}
type FrontierManager struct {
Listen config.Listen `yaml:"listen"`
Listen config.Listen `yaml:"listen" json:"listen"`
Expiration struct {
ServiceMeta int `yaml:"service_meta"` // service meta expiration in redis, in seconds, default 86400s
EdgeMeta int `yaml:"edge_meta"` // edge meta expiration in redis, in seconds, default 86400s
} `yaml:"expiration,omitempty"`
ServiceMeta int `yaml:"service_meta" json:"service_meta"` // service meta expiration in redis, in seconds, default 86400s
EdgeMeta int `yaml:"edge_meta" json:"edge_meta"` // edge meta expiration in redis, in seconds, default 86400s
} `yaml:"expiration,omitempty" json:"expiration"`
}
type Configuration struct {
Daemon Daemon `yaml:"daemon"`
Daemon Daemon `yaml:"daemon" json:"daemon"`
ControlPlane ControlPlane `yaml:"control_plane"`
ControlPlane ControlPlane `yaml:"control_plane" json:"control_plane"`
FrontierManager FrontierManager `yaml:"frontier_plane"`
FrontierManager FrontierManager `yaml:"frontier_plane" json:"frontier_manager"`
Redis Redis `yaml:"redis"`
Redis Redis `yaml:"redis" json:"redis"`
}
func Parse() (*Configuration, error) {
@@ -236,7 +237,54 @@ func Parse() (*Configuration, error) {
return config, nil
}
func genDefaultConfig(writer io.Writer) error {
func genAllConfig(writer io.Writer) error {
conf := &Configuration{
Daemon: Daemon{
RLimit: RLimit{
NumFile: 1024,
},
PProf: PProf{
Enable: true,
Addr: "0.0.0.0:6061",
},
},
ControlPlane: ControlPlane{
Listen: config.Listen{
Network: "tcp",
Addr: "0.0.0.0:40011",
},
},
FrontierManager: FrontierManager{
Listen: config.Listen{
Network: "tcp",
Addr: "0.0.0.0:40012",
},
},
Redis: Redis{
Mode: "standalone",
},
}
data, err := json.Marshal(conf)
if err != nil {
return err
}
newConf := map[string]interface{}{}
err = yaml.Unmarshal(data, &newConf)
if err != nil {
return err
}
data, err = yaml.Marshal(newConf)
if err != nil {
return err
}
_, err = armio.WriteAll(data, writer)
if err != nil {
return err
}
return nil
}
func genMinConfig(writer io.Writer) error {
conf := &Configuration{
Daemon: Daemon{
RLimit: RLimit{

View File

@@ -11,7 +11,19 @@ func TestGenDefaultConfig(t *testing.T) {
t.Error(err)
}
defer file.Close()
err = genDefaultConfig(file)
err = genMinConfig(file)
if err != nil {
t.Error(err)
}
}
func TestGenAllConfig(t *testing.T) {
file, err := os.OpenFile("../../../etc/frontlas_all.yaml", os.O_TRUNC|os.O_CREATE|os.O_RDWR, 0666)
if err != nil {
t.Error(err)
}
defer file.Close()
err = genAllConfig(file)
if err != nil {
t.Error(err)
}

View File

@@ -3,6 +3,7 @@ package utils
import (
"crypto/tls"
"crypto/x509"
"errors"
"net"
"os"
@@ -10,11 +11,19 @@ import (
"k8s.io/klog/v2"
)
func Dial(dial *config.Dial) (net.Conn, error) {
func Dial(dial *config.Dial, index int) (net.Conn, error) {
if len(dial.Addrs) == 0 {
return nil, errors.New("illegal addrs")
}
var (
network string = dial.Network
addr string = dial.Addr
addr string
)
if index < len(dial.Addrs) {
addr = dial.Addrs[index]
} else {
addr = dial.Addrs[0]
}
if !dial.TLS.Enable {
conn, err := net.Dial(network, addr)