diff --git a/README.md b/README.md index 70e6d32..9c67438 100644 --- a/README.md +++ b/README.md @@ -681,39 +681,199 @@ curl -X GET http://127.0.0.1:30010/v1/services/rpcs?service_id={service_id} ## Frontier配置 -如果需要更近一步定制你的Frontier实例,可以在这一节了解各个配置是如何工作的。 +如果需要更近一步定制你的Frontier实例,可以在这一节了解各个配置是如何工作的。定制完你的配置,保存为```frontier.yaml```,挂载到容器```/usr/conf/frontier.yaml```位置生效。 ### 最小化配置 简单起,你可以仅配置面向微服务和边缘节点的服务监听地址: ``` +# 微服务端配置 servicebound: + # 监听网络 listen: network: tcp + # 监听地址 addr: 0.0.0.0:30011 +# 边缘节点端配置 edgebound: + # 监听网络 listen: network: tcp + # 监听地址 addr: 0.0.0.0:30012 + # 找不到注册的GetEdgeID时,是否允许Frontier分配edgeID edgeid_alloc_when_no_idservice_on: true ``` ### TLS +对于用户来说,比较重要的TLS配置在微服务、边缘节点和控制面都是支持的,另支持mTLS,Frontier由此校验客户端携带的证书。 + ``` -tls: +servicebound: + listen: + addr: 0.0.0.0:30011 + network: tcp + tls: + # 是否开启TLS,默认不开启 + enable: false + # 证书和私钥,允许配置多对证书,由客户端协商确定 + certs: + - cert: servicebound.cert + key: servicebound.key + # 是否启用mtls,启动会校验客户端携带的证书是否由下面的CA签发 + mtls: false + # CA证书,用于校验客户端证书 + ca_certs: + - ca1.cert +edgebound: + listen: + addr: 0.0.0.0:30012 + network: tcp + tls: + # 是否开启TLS,默认不开启 + enable: false + # 证书和私钥,允许配置多对证书,由客户端协商确定 + certs: + - cert: edgebound.cert + key: edgebound.key + insecure_skip_verify: false + # 是否启用mtls,启动会校验客户端携带的证书是否由下面的CA签发 + mtls: false + # CA证书,用于校验客户端证书 + ca_certs: + - ca1.cert +``` + +### 外部MQ + +如果你需要配置外部MQ,Frontier也支持将相应的Topic转Publish到这些MQ。 + +**AMQP** + +``` +mqm: + amqp: + # 是否允许 + enable: false + # AMQP地址 + addrs: null + # 生产者 + producer: + # exchange名 + exchange: "" + # 等于Frontier内Topic的概念,数组值 + routing_keys: null +``` +对于AMQP来说,以上是最小配置,边缘节点Publish的消息Topic如果在routing_keys内,Frontier会Publish到exchange中,如果还有微服务或其他外部MQ也声明了该Topic,Frontier仍然会按照hashby来选择一个Publish。 + +**Kafka** + +``` +mqm: + kafka: + # 是否允许 + enable: false + # kafka地址 + addrs: null + # 生产者 + producer: + # 数组值 + topics: null +``` +对于Kafka来说,以上是最小配置,边缘节点Publish的消息Topic如果在上面数组中,Frontier会Publish过来。如果还有微服务或其他外部MQ也声明了该Topic,Frontier仍然会按照hashby来选择一个Publish。 + +**NATS** + +``` +mqm: + nats: + # 是否允许 + enable: false + # NATS地址 + addrs: null + producer: + # 等于Frontier内Topic的概念,数组值 + subjects: null + # 如果允许jetstream,会优先Publish到jetstream + jetstream: + enable: false + # jetstream名 + name: "" + producer: + # 等于Frontier内Topic的概念,数组值 + subjects: null +``` +NATS配置里,如果允许Jetstream,会优先使用Publish到Jetstream。如果还有微服务或其他外部MQ也声明了该Topic,Frontier仍然会按照hashby来选择一个Publish。 + +**NSQ** + +``` +mqm: + nsq: + # 是否允许 + enable: false + # NSQ地址 + addrs: null + producer: + # 数组值 + topics: null +``` +NSQ的Topic里,如果还有微服务或其他外部MQ也声明了该Topic,Frontier仍然会按照hashby来选择一个Publish。 + +**Redis** + +``` +mqm: + redis: + # 是否允许 + enable: false + # Redis地址 + addrs: null + # Redis DB + db: 0 + # 密码 + password: "" + producer: + # 等于Frontier内Topic的概念,数组值 + channels: null +``` +如果还有微服务或其他外部MQ也声明了该Topic,Frontier仍然会按照hashby来选择一个Publish。 + + +### 其他配置 + +``` +daemon: + # 是否开启PProf + pprof: + addr: 0.0.0.0:6060 + cpu_profile_rate: 0 + enable: true + # 资源限制 + rlimit: + enable: true + nofile: 102400 + # 控制面开启 +controlplane: enable: false - mtls: false - ca_certs: - - ca1.cert - - ca2.cert - certs: - - cert: edgebound.cert - key: edgebound.key - insecure_skip_verify: false + listen: + network: tcp + addr: 0.0.0.0:30010 +dao: + # 支持buntdb和sqlite3,都使用的in-memory模式,保持无状态 + backend: buntdb + # sqlite debug开启 + debug: false +exchange: + # Frontier根据edgeid srcip或random的哈希策略转发边缘节点的消息、RPC和打开流到微服务,默认edgeid + # 即相同的边缘节点总是会请求到相同的微服务。 + hashby: edgeid ``` +更多详细配置见 [frontier_all.yaml](./etc/frontier_all.yaml) + ## Frontier部署 在单Frontier实例下,可以根据环境选择以下方式部署你的Frontier实例。 @@ -752,27 +912,168 @@ helm install frontier ./ -f values.yaml 新增Frontlas组件用于构建集群,Frontlas同样也是无状态组件,并不在内存里留存其他信息,因此需要额外依赖Redis,你需要提供一个Redis连接信息给到Frontlas,支持 ```redis``` ```sentinel```和```redis-cluster```。 - _Frontier_:微服务和边缘数据面通信组件 -- _Frontlas_:集群管理组件,将微服务和边缘的元信息、活跃信息记录在Redis里 +- _Frontlas_:命名取自Frontier Atlas,集群管理组件,将微服务和边缘的元信息、活跃信息记录在Redis里 Frontier需要主动连接Frontlas以上报自己、微服务和边缘的活跃和状态,默认Frontlas的端口是: - ```:40011``` 提供给微服务连接,代替微服务在单Frontier实例下连接的30011端口 - ```:40012``` 提供给Frontier连接,上报状态 +你可以根据需要部署任意多个Frontier实例,而对于Frontlas,分开部署两个即可保障HA(高可用),因为不存储状态没有一致性问题。 + +### 配置 + +**Frontier**的frontier.yaml需要添加如下配置: + +``` +frontlas: + enable: true + dial: + network: tcp + addr: + - 127.0.0.1:40012 + tls: + metrics: + enable: false + interval: 0 +daemon: + # Frontier集群内的唯一ID + frontier_id: frontier01 +``` +Frontier需要连接Frontlas,用来上报自己、微服务和边缘的活跃和状态。 + + +**Frontlas**的frontlas.yaml最小化配置: + +``` +control_plane: + listen: + # 微服务改连接这个地址,用来发现集群的边缘节点所在的Frontier + network: tcp + addr: 0.0.0.0:40011 +frontier_plane: + # Frontier连接这个地址 + listen: + network: tcp + addr: 0.0.0.0:40012 + expiration: + # 微服务在redis内元信息的过期时间 + service_meta: 30 + # 边缘节点在redis内元信息的过期时间 + edge_meta: 30 +redis: + # 支持连接standalone、sentinel和cluster + mode: standalone + standalone: + network: tcp + addr: redis:6379 + db: 0 +``` + +更多详细配置见 [frontlas_all.yaml](./etc/frontlas_all.yaml) + ### 使用 -### 分布式 +由于使用Frontlas来发现可用的Frontier,因此微服务需要做出调整如下: -当部署多个Frontier实例时,跨实例的微服务和边缘节点寻址一定需要分布式存储,如果没有Frontlas,这部分的存储工作 +**微服务获取Service** -### 高可用 +``` +package main + +import ( + "net" + "github.com/singchia/frontier/api/dataplane/v1/service" +) + +func main() { + // 改使用NewClusterService来获取Service + svc, err := service.NewClusterService("127.0.0.1:40011") + // 开始使用service,其他一切保持不变 +} +``` + +**边缘节点获取连接地址** + +对于边缘节点来说,依然连接Frontier,不过可以从Frontlas来获取可用的Frontier地址,Frontlas提供了列举Frontier实例接口: + +``` +curl -X http://127.0.0.1:40011/cluster/v1/frontiers +``` +你可以在这个接口上封装一下,提供给边缘节点做负载均衡或者高可用,或加上mTLS直接提供给边缘节点(不建议)。 + +**控制面GRPC** 详见[Protobuf定义](./api/controlplane/frontlas/v1/cluster.proto) + +Frontlas控制面与Frontier不同,是面向集群的控制面,目前只提供了读取集群的接口 + +```protobuf +service ClusterService { + rpc GetFrontierByEdge(GetFrontierByEdgeIDRequest) returns (GetFrontierByEdgeIDResponse); + rpc ListFrontiers(ListFrontiersRequest) returns (ListFrontiersResponse); + + rpc ListEdges(ListEdgesRequest) returns (ListEdgesResponse); + rpc GetEdgeByID(GetEdgeByIDRequest) returns (GetEdgeByIDResponse); + rpc GetEdgesCount(GetEdgesCountRequest) returns (GetEdgesCountResponse); + + rpc ListServices(ListServicesRequest) returns (ListServicesResponse) ; + rpc GetServiceByID(GetServiceByIDRequest) returns (GetServiceByIDResponse) ; + rpc GetServicesCount(GetServicesCountRequest) returns (GetServicesCountResponse) ; +} +``` -### 水平扩展 ## k8s ### Operator +**安装CRD和Operator** + +按照以下步骤安装和部署Operator到你的.kubeconfig环境中: + +``` +git clone https://github.com/singchia/frontier.git +cd pkg/operator +make install && make deploy +``` + +**CR** + +``` +apiVersion: frontier.singchia.io/v1alpha1 +kind: FrontierCluster +metadata: + labels: + app.kubernetes.io/name: frontiercluster + app.kubernetes.io/managed-by: kustomize + name: frontiercluster +spec: + frontier: + # 单实例Frontier + replicas: 1 + # 微服务侧端口 + servicebound: + port: 30011 + # 边缘节点侧端口 + edgebound: + port: 30012 + frontlas: + # 单实例Frontlas + replicas: 1 + # 控制面端口 + controlplane: + port: 40011 + # 依赖的Redis配置 + redis: + addrs: + - rfs-redisfailover:26379 + password: your-password + masterName: mymaster + redisType: sentinel +``` + +1分钟,你即可拥有一个Frontier+Frontlas的集群。 + + ## 开发 ### 路线图 diff --git a/etc/frontier_all.yaml b/etc/frontier_all.yaml index 84e5249..7e65382 100644 --- a/etc/frontier_all.yaml +++ b/etc/frontier_all.yaml @@ -10,60 +10,60 @@ daemon: controlplane: enable: false listen: - addr: 0.0.0.0:30010 - advertised_addr: "" network: tcp - tls: - ca_certs: null - certs: null - enable: false - insecure_skip_verify: false - mtls: false + addr: 0.0.0.0:30010 + # advertised_addr: "" + # tls: + # ca_certs: null + # certs: null + # enable: false + # insecure_skip_verify: false + # mtls: false servicebound: listen: + network: tcp addr: 0.0.0.0:30011 - advertised_addr: "" - network: tcp - tls: - ca_certs: - - ca1.cert - - ca2.cert - certs: - - cert: servicebound.cert - key: servicebound.key - enable: false - insecure_skip_verify: false - mtls: false + # advertised_addr: "" + # tls: + # ca_certs: + # - ca1.cert + # - ca2.cert + # certs: + # - cert: servicebound.cert + # key: servicebound.key + # enable: false + # insecure_skip_verify: false + # mtls: false edgebound: - bypass: - addr: 192.168.1.10:8443 - advertised_addr: "" - network: tcp - tls: - ca_certs: - - ca1.cert - certs: - - cert: frontier.cert - key: frontier.key - enable: true - insecure_skip_verify: false - mtls: true - bypass_enable: false - edgeid_alloc_when_no_idservice_on: true listen: - addr: 0.0.0.0:30012 - advertised_addr: "" network: tcp - tls: - ca_certs: - - ca1.cert - - ca2.cert - certs: - - cert: edgebound.cert - key: edgebound.key - enable: false - insecure_skip_verify: false - mtls: false + addr: 0.0.0.0:30012 + # advertised_addr: "" + # tls: + # ca_certs: + # - ca1.cert + # - ca2.cert + # certs: + # - cert: edgebound.cert + # key: edgebound.key + # enable: false + # insecure_skip_verify: false + # mtls: false + edgeid_alloc_when_no_idservice_on: true + # bypass: + # addr: 192.168.1.10:8443 + # advertised_addr: "" + # network: tcp + # tls: + # ca_certs: + # - ca1.cert + # certs: + # - cert: frontier.cert + # key: frontier.key + # enable: true + # insecure_skip_verify: false + # mtls: true + # bypass_enable: false dao: backend: buntdb debug: false @@ -75,7 +75,8 @@ frontlas: enable: false interval: 0 dial: - addr: 127.0.0.1:40012 + addrs: + - 127.0.0.1:40012 network: tcp tls: ca_certs: null @@ -87,59 +88,119 @@ mqm: amqp: enable: false addrs: null + # 0 max channels means 2^16 - 1 channel_max: 0 + # exchange to declare exchanges: null + # 0 max bytes means unlimited frame_size: 0 + # less than 1s uses the server's interval heartbeat: 0 + # Connection locale that we expect to always be en_US + # Even though servers must return it as per the AMQP 0-9-1 spec, + # we are not aware of it being used other than to satisfy the spec requirements locale: "" producer: - app_id: "" - content_encoding: "" - content_type: "" - delivery_mode: 0 + # exchange name to produce exchange: "" + routing_keys: null + # creating application id + app_id: "" + # MIME content encoding + content_encoding: "" + # MIME content type + content_type: "" + # Transient (0 or 1) or Persistent (2) + delivery_mode: 0 expiration: "" + # message related headers headers: null immediate: false mandatory: false + # 0 to 9 priority: 0 + # address to to reply to (ex: RPC) reply_to: "" - routing_keys: null + # message type name type: "" + # creating user id - ex: "guest" user_id: "" queueBindings: null queues: null + # Vhost specifies the namespace of permissions, exchanges, queues and + # bindings on the server. Dial sets this to the path parsed from the URL. vhost: "" kafka: enable: false addrs: null producer: - async: false - compression: none - compression_level: 0 - flush: - bytes: 0 - frequency: 0 - max_messages: 0 - messages: 0 - idempotent: false - max_message_bytes: 0 - required_acks: 0 - retry: - backoff: 0 - max: 0 - timeout: 0 + # topics to notify frontier which topics to allow to publish topics: null + # The type of compression to use on messages (defaults to no compression). + # Similar to `compression.codec` setting of the JVM producer. + compression: none + # The level of compression to use on messages. The meaning depends + # on the actual compression type used and defaults to default compression + # level for the codec. + compression_level: 0 + # If enabled, the producer will ensure that exactly one copy of each message is + # written. + idempotent: false + # The maximum permitted size of a message (defaults to 1000000). Should be + # set equal to or smaller than the broker's `message.max.bytes`. + max_message_bytes: 0 + # The level of acknowledgement reliability needed from the broker (defaults + # to WaitForLocal). Equivalent to the `request.required.acks` setting of the + # JVM producer. + required_acks: 0 + # The maximum duration the broker will wait the receipt of the number of + # RequiredAcks (defaults to 10 seconds). This is only relevant when + # RequiredAcks is set to WaitForAll or a number > 1. Only supports + # millisecond resolution, nanoseconds will be truncated. Equivalent to + # the JVM producer's `request.timeout.ms` setting. + timeout: 0 + # The following config options control how often messages are batched up and + # sent to the broker. By default, messages are sent as fast as possible, and + # all messages received while the current batch is in-flight are placed + # into the subsequent batch. + flush: + # The best-effort number of bytes needed to trigger a flush. Use the + # global sarama.MaxRequestSize to set a hard upper limit. + bytes: 0 + # The best-effort frequency of flushes. Equivalent to + # `queue.buffering.max.ms` setting of JVM producer. + frequency: 0 + # The maximum number of messages the producer will send in a single + # broker request. Defaults to 0 for unlimited. Similar to + # `queue.buffering.max.messages` in the JVM producer. + max_messages: 0 + # The best-effort number of messages needed to trigger a flush. Use + # `MaxMessages` to set a hard upper limit. + messages: 0 + retry: + #How long to wait for the cluster to settle between retries + # (default 100ms). Similar to the `retry.backoff.ms` setting of the + # JVM producer. + backoff: 0 + # The total number of times to retry sending a message (default 3). + # Similar to the `message.send.max.retries` setting of the JVM producer. + max: 0 + nats: enable: false addrs: null - jetStream: - enable: false - name: "" - producer: - subjects: null producer: + # topics to specific subjects: null + # jetstream will replace upper producer. + jetstream: + enable: false + # jetstream name to publish + name: "" + # jetstream producer + producer: + # topics to specific + subjects: null nsq: enable: false addrs: null @@ -151,5 +212,6 @@ mqm: db: 0 password: "" producer: + # topics to specific channels: null diff --git a/etc/frontlas_all.yaml b/etc/frontlas_all.yaml new file mode 100644 index 0000000..93df325 --- /dev/null +++ b/etc/frontlas_all.yaml @@ -0,0 +1,72 @@ +control_plane: + listen: + addr: 0.0.0.0:40011 + advertised_addr: "" + network: tcp + tls: + ca_certs: null + certs: null + enable: false + insecure_skip_verify: false + mtls: false +daemon: + p_prof: + addr: 0.0.0.0:6061 + cpu_profile_rate: 0 + enable: true + r_limit: + enable: false + num_file: 1024 +frontier_manager: + expiration: + edge_meta: 0 + service_meta: 0 + listen: + addr: 0.0.0.0:40012 + advertised_addr: "" + network: tcp + tls: + ca_certs: null + certs: null + enable: false + insecure_skip_verify: false + mtls: false +redis: + client_name: "" + cluster: + addrs: null + max_redirects: 0 + route_by_latency: false + route_randomly: false + conn_max_idle_time: 0 + conn_max_lifetime: 0 + dial_timeout: 0 + disable_indentity: false + identity_suffix: "" + max_active_conns: 0 + max_idle_conns: 0 + max_retries: 0 + max_retry_backoff: 0 + min_idle_conns: 0 + min_retry_backoff: 0 + mode: standalone + password: "" + pool_fifo: false + pool_size: 0 + pool_timeout: 0 + protocol: 0 + read_timeout: 0 + sentinel: + addrs: null + db: 0 + master_name: "" + replica_only: false + route_by_latency: false + route_randomly: false + use_disconnected_replicas: false + standalone: + addr: "" + db: 0 + network: "" + username: "" + write_timeout: 0 diff --git a/pkg/config/config.go b/pkg/config/config.go index ef52f07..35ee7bf 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -22,8 +22,8 @@ type Listen struct { } type Dial struct { - Network string `yaml:"network" json:"network"` - Addr string `yaml:"addr" json:"addr"` - AdvertisedAddr string `yaml:"advertised_addr,omitempty" json:"advertised_addr"` - TLS TLS `yaml:"tls,omitempty" json:"tls"` + Network string `yaml:"network" json:"network"` + Addrs []string `yaml:"addrs" json:"addrs"` + AdvertisedAddr string `yaml:"advertised_addr,omitempty" json:"advertised_addr"` + TLS TLS `yaml:"tls,omitempty" json:"tls"` } diff --git a/pkg/frontier/config/config.go b/pkg/frontier/config/config.go index 4cb7538..f48269e 100644 --- a/pkg/frontier/config/config.go +++ b/pkg/frontier/config/config.go @@ -202,7 +202,7 @@ type Nats struct { Producer struct { Subjects []string `yaml:"subjects" json:"subjects"` } `yaml:"producer,omitempty" json:"producer"` - } `yaml:"jetstream,omitempty" json:"jetStream"` + } `yaml:"jetstream,omitempty" json:"jetstream"` } type NSQ struct { @@ -355,7 +355,7 @@ func Parse() (*Configuration, error) { frontlasAddr := os.Getenv("FRONTLAS_ADDR") if frontlasAddr != "" { config.Frontlas.Enable = true - config.Frontlas.Dial.Addr = frontlasAddr + config.Frontlas.Dial.Addrs = []string{frontlasAddr} } return config, nil } @@ -424,7 +424,7 @@ func genAllConfig(writer io.Writer) error { BypassEnable: false, Bypass: config.Dial{ Network: "tcp", - Addr: "192.168.1.10:8443", + Addrs: []string{"192.168.1.10:8443"}, TLS: config.TLS{ Enable: true, MTLS: true, @@ -448,7 +448,7 @@ func genAllConfig(writer io.Writer) error { Enable: false, Dial: config.Dial{ Network: "tcp", - Addr: "127.0.0.1:40012", + Addrs: []string{"127.0.0.1:40012"}, TLS: config.TLS{ Enable: false, MTLS: false, diff --git a/pkg/frontier/edgebound/edge_manager.go b/pkg/frontier/edgebound/edge_manager.go index 8e746b7..048ad35 100644 --- a/pkg/frontier/edgebound/edge_manager.go +++ b/pkg/frontier/edgebound/edge_manager.go @@ -2,6 +2,8 @@ package edgebound import ( "context" + "errors" + "math/rand" "net" "strings" "sync" @@ -108,7 +110,10 @@ func newEdgeManager(conf *config.Configuration, repo apis.Repo, informer apis.Ed } func (em *edgeManager) bypassDial(_ net.Addr, _ interface{}) (net.Conn, error) { - return utils.Dial(&em.conf.Edgebound.Bypass) + if em.conf.Edgebound.Bypass.Addrs == nil || len(em.conf.Edgebound.Bypass.Addrs) == 0 { + return nil, errors.New("illegal bypass addrs") + } + return utils.Dial(&em.conf.Edgebound.Bypass, rand.Intn(len(em.conf.Edgebound.Bypass.Addrs))) } // Serve blocks until the Accept error diff --git a/pkg/frontier/frontlas/frontlas.go b/pkg/frontier/frontlas/frontlas.go index c7f445e..3e3b11f 100644 --- a/pkg/frontier/frontlas/frontlas.go +++ b/pkg/frontier/frontlas/frontlas.go @@ -3,6 +3,8 @@ package frontlas import ( "context" "encoding/json" + "errors" + "math/rand" "net" "sync" "sync/atomic" @@ -28,6 +30,9 @@ type Informer struct { func NewInformer(conf *config.Configuration, tmr timer.Timer) (*Informer, error) { dial := conf.Frontlas.Dial + if dial.Addrs == nil || len(dial.Addrs) == 0 { + return nil, errors.New("illegal dial addrs") + } sbAddr, ebAddr, err := getAdvertisedAddrs(conf.Servicebound.Listen, conf.Edgebound.Listen, dial) // meta @@ -44,7 +49,7 @@ func NewInformer(conf *config.Configuration, tmr timer.Timer) (*Informer, error) opt.SetMeta(data) dialer := func() (net.Conn, error) { - conn, err := utils.Dial(&dial) + conn, err := utils.Dial(&dial, rand.Intn(len(dial.Addrs))) if err != nil { klog.Errorf("frontlas new informer, dial err: %s", err) return nil, err @@ -103,7 +108,7 @@ func getAdvertisedAddrs(sblisten, eblisten gconfig.Listen, dial gconfig.Dial) (s ) getDefaultRouteHost := func() (string, error) { once.Do(func() { - ip, rerr := utils.GetDefaultRouteIP(dial.Network, dial.Addr) + ip, rerr := utils.GetDefaultRouteIP(dial.Network, dial.Addrs[0]) if err != nil { err = rerr return diff --git a/pkg/frontlas/config/config.go b/pkg/frontlas/config/config.go index adcd636..beb5775 100644 --- a/pkg/frontlas/config/config.go +++ b/pkg/frontlas/config/config.go @@ -1,6 +1,7 @@ package config import ( + "encoding/json" "flag" "io" "net" @@ -17,128 +18,128 @@ import ( // daemon related type RLimit struct { - Enable bool `yaml:"enable"` - NumFile int `yaml:"nofile"` + Enable bool `yaml:"enable" json:"enable"` + NumFile int `yaml:"nofile" json:"num_file"` } type PProf struct { - Enable bool `yaml:"enable"` - Addr string `yaml:"addr"` - CPUProfileRate int `yaml:"cpu_profile_rate"` + Enable bool `yaml:"enable" json:"enable"` + Addr string `yaml:"addr" json:"addr"` + CPUProfileRate int `yaml:"cpu_profile_rate" json:"cpu_profile_rate"` } type Daemon struct { - RLimit RLimit `yaml:"rlimit"` - PProf PProf `yaml:"pprof"` + RLimit RLimit `yaml:"rlimit" json:"r_limit"` + PProf PProf `yaml:"pprof" json:"p_prof"` } // for rest and grpc type ControlPlane struct { - Listen config.Listen `yaml:"listen"` + Listen config.Listen `yaml:"listen" json:"listen"` } // TODO tls support type Redis struct { - Mode string `yaml:"mode"` // standalone, sentinel or cluster + Mode string `yaml:"mode" json:"mode"` // standalone, sentinel or cluster // Use the specified Username to authenticate the current connection // with one of the connections defined in the ACL list when connecting // to a Redis 6.0 instance, or greater, that is using the Redis ACL system. - Username string `yaml:"username,omitempty"` + Username string `yaml:"username,omitempty" json:"username"` // Optional password. Must match the password specified in the // requirepass server configuration option (if connecting to a Redis 5.0 instance, or lower), // or the User Password when connecting to a Redis 6.0 instance, or greater, // that is using the Redis ACL system. - Password string `yaml:"password,omitempty"` + Password string `yaml:"password,omitempty" json:"password"` // Protocol 2 or 3. Use the version to negotiate RESP version with redis-server. // Default is 3. - Protocol int `yaml:"protocol,omitempty"` + Protocol int `yaml:"protocol,omitempty" json:"protocol"` // ClientName will execute the `CLIENT SETNAME ClientName` command for each conn. - ClientName string `yaml:"clientname,omitempty"` + ClientName string `yaml:"clientname,omitempty" json:"client_name"` // connection retry settings - MaxRetries int `yaml:"max_retries,omitempty"` - MinRetryBackoff int `yaml:"min_retry_backoff,omitempty"` - MaxRetryBackoff int `yaml:"max_retry_backoff,omitempty"` + MaxRetries int `yaml:"max_retries,omitempty" json:"max_retries"` + MinRetryBackoff int `yaml:"min_retry_backoff,omitempty" json:"min_retry_backoff"` + MaxRetryBackoff int `yaml:"max_retry_backoff,omitempty" json:"max_retry_backoff"` // connection r/w settings - DialTimeout int `yaml:"dial_timeout,omitempty"` - ReadTimeout int `yaml:"read_timeout,omitempty"` - WriteTimeout int `yaml:"write_timeout,omitempty"` + DialTimeout int `yaml:"dial_timeout,omitempty" json:"dial_timeout"` + ReadTimeout int `yaml:"read_timeout,omitempty" json:"read_timeout"` + WriteTimeout int `yaml:"write_timeout,omitempty" json:"write_timeout"` // connection pool settings - PoolFIFO bool `yaml:"pool_fifo,omitempty"` - PoolSize int `yaml:"pool_size,omitempty"` // applies per cluster node and not for the whole cluster - PoolTimeout int `yaml:"pool_timeout,omitempty"` - MinIdleConns int `yaml:"min_idle_conns,omitempty"` - MaxIdleConns int `yaml:"max_idle,omitempty"` - MaxActiveConns int `yaml:"max_active_conns,omitempty"` // applies per cluster node and not for the whole cluster - ConnMaxIdleTime int `yaml:"conn_max_idle_time,omitempty"` - ConnMaxLifetime int `yaml:"conn_max_life_time,omitempty"` - DisableIndentity bool `yaml:"disable_identity,omitempty"` // Disable set-lib on connect. Default is false. - IdentitySuffix string `yaml:"identity_suffix,omitempty"` // Add suffix to client name. Default is empty. + PoolFIFO bool `yaml:"pool_fifo,omitempty" json:"pool_fifo"` + PoolSize int `yaml:"pool_size,omitempty" json:"pool_size"` // applies per cluster node and not for the whole cluster + PoolTimeout int `yaml:"pool_timeout,omitempty" json:"pool_timeout"` + MinIdleConns int `yaml:"min_idle_conns,omitempty" json:"min_idle_conns"` + MaxIdleConns int `yaml:"max_idle,omitempty" json:"max_idle_conns"` + MaxActiveConns int `yaml:"max_active_conns,omitempty" json:"max_active_conns"` // applies per cluster node and not for the whole cluster + ConnMaxIdleTime int `yaml:"conn_max_idle_time,omitempty" json:"conn_max_idle_time"` + ConnMaxLifetime int `yaml:"conn_max_life_time,omitempty" json:"conn_max_lifetime"` + DisableIndentity bool `yaml:"disable_identity,omitempty" json:"disable_indentity"` // Disable set-lib on connect. Default is false. + IdentitySuffix string `yaml:"identity_suffix,omitempty" json:"identity_suffix"` // Add suffix to client name. Default is empty. Standalone struct { // The network type, either tcp or unix. // Default is tcp. - Network string `yaml:"network"` + Network string `yaml:"network" json:"network"` // host:port address. - Addr string `yaml:"addr"` + Addr string `yaml:"addr" json:"addr"` // CredentialsProvider allows the username and password to be updated // before reconnecting. It should return the current username and password. - DB int `yaml:"db"` - } `yaml:"standalone,omitempty"` + DB int `yaml:"db" json:"db"` + } `yaml:"standalone,omitempty" json:"standalone"` Sentinel struct { - Addrs []string `yaml:"addrs"` - MasterName string `yaml:"master_name"` - DB int `yaml:"db"` + Addrs []string `yaml:"addrs" json:"addrs"` + MasterName string `yaml:"master_name" json:"master_name"` + DB int `yaml:"db" json:"db"` // route settings // Allows routing read-only commands to the closest master or replica node. // This option only works with NewFailoverClusterClient. - RouteByLatency bool `yaml:"route_by_latency,omitempty"` + RouteByLatency bool `yaml:"route_by_latency,omitempty" json:"route_by_latency"` // Allows routing read-only commands to the random master or replica node. // This option only works with NewFailoverClusterClient. - RouteRandomly bool `yaml:"route_randomly,omitempty"` + RouteRandomly bool `yaml:"route_randomly,omitempty" json:"route_randomly"` // Route all commands to replica read-only nodes. - ReplicaOnly bool `yaml:"replica_only,omitempty"` + ReplicaOnly bool `yaml:"replica_only,omitempty" json:"replica_only"` // Use replicas disconnected with master when cannot get connected replicas // Now, this option only works in RandomReplicaAddr function. - UseDisconnectedReplicas bool `yaml:"use_disconnected_replicas,omitempty"` - } `yaml:"sentinel,omitempty"` + UseDisconnectedReplicas bool `yaml:"use_disconnected_replicas,omitempty" json:"use_disconnected_replicas"` + } `yaml:"sentinel,omitempty" json:"sentinel"` Cluster struct { - Addrs []string `yaml:"addrs"` + Addrs []string `yaml:"addrs" json:"addrs"` // The maximum number of retries before giving up. Command is retried // on network errors and MOVED/ASK redirects. // Default is 3 retries. - MaxRedirects int `yaml:"max_redirects,omitempty"` + MaxRedirects int `yaml:"max_redirects,omitempty" json:"max_redirects"` // Allows routing read-only commands to the closest master or slave node. // It automatically enables ReadOnly. - RouteByLatency bool `yaml:"route_by_latency,omitempty"` + RouteByLatency bool `yaml:"route_by_latency,omitempty" json:"route_by_latency"` // Allows routing read-only commands to the random master or slave node. // It automatically enables ReadOnly. - RouteRandomly bool `yaml:"route_randomly,omitempty"` - } `yaml:"cluster,omitempty"` + RouteRandomly bool `yaml:"route_randomly,omitempty" json:"route_randomly"` + } `yaml:"cluster,omitempty" json:"cluster"` } type FrontierManager struct { - Listen config.Listen `yaml:"listen"` + Listen config.Listen `yaml:"listen" json:"listen"` Expiration struct { - ServiceMeta int `yaml:"service_meta"` // service meta expiration in redis, in seconds, default 86400s - EdgeMeta int `yaml:"edge_meta"` // edge meta expiration in redis, in seconds, default 86400s - } `yaml:"expiration,omitempty"` + ServiceMeta int `yaml:"service_meta" json:"service_meta"` // service meta expiration in redis, in seconds, default 86400s + EdgeMeta int `yaml:"edge_meta" json:"edge_meta"` // edge meta expiration in redis, in seconds, default 86400s + } `yaml:"expiration,omitempty" json:"expiration"` } type Configuration struct { - Daemon Daemon `yaml:"daemon"` + Daemon Daemon `yaml:"daemon" json:"daemon"` - ControlPlane ControlPlane `yaml:"control_plane"` + ControlPlane ControlPlane `yaml:"control_plane" json:"control_plane"` - FrontierManager FrontierManager `yaml:"frontier_plane"` + FrontierManager FrontierManager `yaml:"frontier_plane" json:"frontier_manager"` - Redis Redis `yaml:"redis"` + Redis Redis `yaml:"redis" json:"redis"` } func Parse() (*Configuration, error) { @@ -236,7 +237,54 @@ func Parse() (*Configuration, error) { return config, nil } -func genDefaultConfig(writer io.Writer) error { +func genAllConfig(writer io.Writer) error { + conf := &Configuration{ + Daemon: Daemon{ + RLimit: RLimit{ + NumFile: 1024, + }, + PProf: PProf{ + Enable: true, + Addr: "0.0.0.0:6061", + }, + }, + ControlPlane: ControlPlane{ + Listen: config.Listen{ + Network: "tcp", + Addr: "0.0.0.0:40011", + }, + }, + FrontierManager: FrontierManager{ + Listen: config.Listen{ + Network: "tcp", + Addr: "0.0.0.0:40012", + }, + }, + Redis: Redis{ + Mode: "standalone", + }, + } + data, err := json.Marshal(conf) + if err != nil { + return err + } + newConf := map[string]interface{}{} + err = yaml.Unmarshal(data, &newConf) + if err != nil { + return err + } + data, err = yaml.Marshal(newConf) + if err != nil { + return err + } + _, err = armio.WriteAll(data, writer) + if err != nil { + return err + } + return nil +} + +func genMinConfig(writer io.Writer) error { conf := &Configuration{ Daemon: Daemon{ RLimit: RLimit{ diff --git a/pkg/frontlas/config/config_test.go b/pkg/frontlas/config/config_test.go index 2db40b5..c4c470c 100644 --- a/pkg/frontlas/config/config_test.go +++ b/pkg/frontlas/config/config_test.go @@ -11,7 +11,19 @@ func TestGenDefaultConfig(t *testing.T) { t.Error(err) } defer file.Close() - err = genDefaultConfig(file) + err = genMinConfig(file) + if err != nil { + t.Error(err) + } +} + +func TestGenAllConfig(t *testing.T) { + file, err := os.OpenFile("../../../etc/frontlas_all.yaml", os.O_TRUNC|os.O_CREATE|os.O_RDWR, 0666) + if err != nil { + t.Error(err) + } + defer file.Close() + err = genAllConfig(file) if err != nil { t.Error(err) } diff --git a/pkg/utils/dial.go b/pkg/utils/dial.go index 1869fae..6fbde0b 100644 --- a/pkg/utils/dial.go +++ b/pkg/utils/dial.go @@ -3,6 +3,7 @@ package utils import ( "crypto/tls" "crypto/x509" + "errors" "net" "os" @@ -10,11 +11,19 @@ import ( "k8s.io/klog/v2" ) -func Dial(dial *config.Dial) (net.Conn, error) { +func Dial(dial *config.Dial, index int) (net.Conn, error) { + if len(dial.Addrs) == 0 { + return nil, errors.New("illegal addrs") + } var ( network string = dial.Network - addr string = dial.Addr + addr string ) + if index < len(dial.Addrs) { + addr = dial.Addrs[index] + } else { + addr = dial.Addrs[0] + } if !dial.TLS.Enable { conn, err := net.Dial(network, addr)