fix: bugs bewteen frontlas and frontier on offline

This commit is contained in:
singchia
2024-05-06 23:13:35 +08:00
parent 28f9af4434
commit 3529d88a78
15 changed files with 275 additions and 447 deletions

View File

@@ -1,16 +1,25 @@
include ./Makefile.defs
REGISTRY?=registry.hub.docker.com/singchia
CC?=cc
all: frontier examples
all: frontier frontlas examples
.PHONY: frontier
frontier:
CC=${CC} CGO_ENABLED=1 go build -trimpath -ldflags "-s -w" -o ./frontier cmd/frontier/main.go
CC=${CC} CGO_ENABLED=1 go build -trimpath -ldflags "-s -w" -o ./bin/frontier cmd/frontier/main.go
.PHONY: frontier-linux
frontier-linux:
CC=${CC} GOOS=linux GOARCH=amd64 CGO_ENABLED=1 go build -trimpath -ldflags "-s -w" -o ./frontier cmd/frontier/main.go
CC=${CC} GOOS=linux GOARCH=amd64 CGO_ENABLED=1 go build -trimpath -ldflags "-s -w" -o ./bin/frontier cmd/frontier/main.go
.PHONY: frontlas
frontlas:
CC=${CC} CGO_ENABLED=1 go build -trimpath -ldflags "-s -w" -o ./bin/frontlas cmd/frontlas/main.go
.PHONY: frontlas-linux
frontlas-linux:
CC=${CC} GOOS=linux GOARCH=amd64 CGO_ENABLED=1 go build -trimpath -ldflags "-s -w" -o ./bin/frontlas cmd/frontlas/main.go
.PHONY: examples
examples:
@@ -18,20 +27,28 @@ examples:
.PHONY: clean
clean:
rm ./frontier || true
rm ./bin/frontier || true
rm ./bin/frontlas || true
make clean -C examples
make clean -C test/bench
.PHONY: install
install:
.PHONY: install-frontier
install-frontier:
install -m 0755 -d $(DESTDIR)$(BINDIR)
install -m 0755 -d $(DESTDIR)$(CONFDIR)
install -m 0755 ./frontier $(DESTDIR)$(BINDIR)
install -m 0755 ./pkg/frontier/config/frontier.yaml $(DESTDIR)$(CONFDIR)
install -m 0755 ./bin/frontier $(DESTDIR)$(BINDIR)
install -m 0755 ./etc/frontier.yaml $(DESTDIR)$(CONFDIR)
.PHONY: install-frontlas
install-frontlas:
install -m 0755 -d $(DESTDIR)$(BINDIR)
install -m 0755 -d $(DESTDIR)$(CONFDIR)
install -m 0755 ./bin/frontier $(DESTDIR)$(BINDIR)
install -m 0755 ./etc/frontier.yaml $(DESTDIR)$(CONFDIR)
.PHONY: image-frontier
image-frontier:
docker buildx build -t frontier:${VERSION} -f images/Dockerfile.frontier .
docker buildx build -t ${REGISTRY}/frontier:${VERSION} -f images/Dockerfile.frontier .
.PHONY: image-gen-api
image-gen-api:
@@ -46,16 +63,16 @@ container-frontier:
docker rm -f frontier
docker run -d --name frontier -p 2431:2431 -p 2432:2432 frontier:${VERSION} --config /usr/conf/frontier.yaml -v 5
.PHONY: frontier-api
frontier-api:
.PHONY: api-frontier
api-frontier:
docker run --rm -v ${PWD}/api/controlplane/frontier/v1:/api/controlplane/frontier/v1 image-gen-api:${VERSION}
.PHONY: frontlas-api
frontlas-api:
.PHONY: api-frontlas
api-frontlas:
docker run --rm -v ${PWD}/api/controlplane/frontlas/v1:/api/controlplane/frontlas/v1 image-gen-api:${VERSION}
.PHONY: bench
bench: container
bench: container-frontier
make bench -C test/bench
.PHONY: swagger

View File

@@ -6,6 +6,25 @@ daemon:
enable: true
addr: 0.0.0.0:6060
cpu_profile_rate: 0
controlplane:
enable: false
listen:
network: tcp
addr: 0.0.0.0:30010
servicebound:
listen:
network: tcp
addr: 0.0.0.0:30011
tls:
enable: false
mtls: false
ca_certs:
- ca1.cert
- ca2.cert
certs:
- cert: servicebound.cert
key: servicebound.key
insecure_skip_verify: false
edgebound:
listen:
network: tcp
@@ -34,25 +53,6 @@ edgebound:
insecure_skip_verify: false
bypass_enable: false
edgeid_alloc_when_no_idservice_on: true
servicebound:
listen:
network: tcp
addr: 0.0.0.0:30011
tls:
enable: false
mtls: false
ca_certs:
- ca1.cert
- ca2.cert
certs:
- cert: servicebound.cert
key: servicebound.key
insecure_skip_verify: false
controlplane:
enable: false
listen:
network: tcp
addr: 0.0.0.0:30010
dao:
debug: false
frontlas:
@@ -63,72 +63,4 @@ frontlas:
metrics:
enable: false
interval: 0
mqm:
kafka:
enable: false
addrs: []
producer:
topics: []
async: false
maxmessagebytes: 0
requiredacks: 0
timeout: 0
compression: none
compressionlevel: 0
idempotent: false
flush:
bytes: 0
messages: 0
frequency: 0
maxmessages: 0
retry:
max: 0
backoff: 0
amqp:
enable: false
addrs: []
vhost: ""
channelmax: 0
framesize: 0
heartbeat: 0
locale: ""
exchanges: []
queues: []
queuebindings: []
producer:
routingkeys: []
exchange: ""
mandatory: false
immediate: false
headers: {}
contenttype: ""
contentencoding: ""
deliverymode: 0
priority: 0
replyto: ""
expiration: ""
type: ""
userid: ""
appid: ""
nats:
enable: false
addrs: []
producer:
subjects: []
jetstream:
enable: false
name: ""
producer:
subjects: []
nsq:
enable: false
addrs: []
producer:
topics: []
redis:
enable: false
addrs: []
db: 0
password: ""
producer:
channels: []
mqm: {}

View File

@@ -15,81 +15,8 @@ frontier_plane:
network: tcp
addr: 0.0.0.0:30021
redis:
mode: ""
mode: standalone
standalone:
network: ""
addr: ""
protocol: 0
username: ""
password: ""
network: tcp
addr: 127.0.0.1:6379
db: 0
clientname: ""
maxretries: 0
minretrybackoff: 0
maxretrybackoff: 0
dialtimeout: 0
readtimeout: 0
writetimeout: 0
poolfifo: false
poolsize: 0
pooltimeout: 0
minidleconns: 0
maxidleconns: 0
maxactiveconns: 0
connmaxidletime: 0
connmaxlifetime: 0
disableindentity: false
identitysuffix: ""
sentinel:
addrs: []
master_name: ""
protocol: 0
username: ""
password: ""
db: 0
clientname: ""
routebylatency: false
routerandomly: false
replicaonly: false
usedisconnectedreplicas: false
maxretries: 0
minretrybackoff: 0
maxretrybackoff: 0
dialtimeout: 0
readtimeout: 0
writetimeout: 0
poolfifo: false
poolsize: 0
pooltimeout: 0
minidleconns: 0
maxidleconns: 0
maxactiveconns: 0
connmaxidletime: 0
connmaxlifetime: 0
disableindentity: false
identitysuffix: ""
cluster:
addrs: []
protocol: 0
username: ""
password: ""
clientname: ""
maxredirects: 0
routebylatency: false
routerandomly: false
maxretries: 0
minretrybackoff: 0
maxretrybackoff: 0
dialtimeout: 0
readtimeout: 0
writetimeout: 0
poolfifo: false
poolsize: 0
pooltimeout: 0
minidleconns: 0
maxidleconns: 0
maxactiveconns: 0
connmaxidletime: 0
connmaxlifetime: 0
disableindentity: false
identitysuffix: ""

2
go.mod
View File

@@ -10,7 +10,7 @@ require (
github.com/deckarep/golang-set/v2 v2.6.0
github.com/go-kratos/kratos/v2 v2.7.2
github.com/google/uuid v1.6.0
github.com/jumboframes/armorigo v0.4.0-rc.1
github.com/jumboframes/armorigo v0.4.1
github.com/nats-io/nats.go v1.33.1
github.com/nsqio/go-nsq v1.1.0
github.com/rabbitmq/amqp091-go v1.9.0

2
go.sum
View File

@@ -99,6 +99,8 @@ github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFF
github.com/jumboframes/armorigo v0.2.3/go.mod h1:sXe0R32y6V3oJD2eXcPzMlimvZx0xIDiLedpQOy06t4=
github.com/jumboframes/armorigo v0.4.0-rc.1 h1:+9AM5ZLM/KdF0ldLvhbaSRFLIWbcynIrCJZ2G9FJrnk=
github.com/jumboframes/armorigo v0.4.0-rc.1/go.mod h1:H4OlF0Jj8e+8LkAqDjeLtapNNnUuUXR/h4Q32Lqgf9o=
github.com/jumboframes/armorigo v0.4.1 h1:MiT21uAGl21yUaj7SjHg4veGtB5Q79+8d7MRJhVq2rM=
github.com/jumboframes/armorigo v0.4.1/go.mod h1:H4OlF0Jj8e+8LkAqDjeLtapNNnUuUXR/h4Q32Lqgf9o=
github.com/klauspost/compress v1.17.7 h1:ehO88t2UGzQK66LMdE8tibEd1ErmzZjNEqWkjLAKQQg=
github.com/klauspost/compress v1.17.7/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=

View File

@@ -6,11 +6,11 @@ ARG TARGETOS
ARG TARGETARCH
ENV GO111MODULE=on \
GOPROXY=https://goproxy.io
GOPROXY=https://goproxy.io,direct
WORKDIR /go/src/github.com/singchia/frontier
RUN --mount=type=bind,readwrite,target=/go/src/github.com/singchia/frontier \
make DESTDIR=/tmp/install all install
make DESTDIR=/tmp/install all install-frontier
FROM alpine:3.14
@@ -21,8 +21,9 @@ RUN wget -q -O /etc/apk/keys/sgerrand.rsa.pub https://alpine-pkgs.sgerrand.com/s
RUN wget https://github.com/sgerrand/alpine-pkg-glibc/releases/download/2.34-r0/glibc-2.34-r0.apk
RUN apk add glibc-2.34-r0.apk
EXPOSE 2431
EXPOSE 2432
EXPOSE 30010
EXPOSE 30011
EXPOSE 30012
ENTRYPOINT ["/usr/bin/frontier"]
CMD ["--config", "/usr/conf/frontier.yaml"]

View File

@@ -72,31 +72,31 @@ type Kafka struct {
// used by the Producer.
Producer struct {
// topics to notify frontier which topics to allow to publish
Topics []string
Async bool
Topics []string `yaml:"topics"`
Async bool `yaml:"async"`
// The maximum permitted size of a message (defaults to 1000000). Should be
// set equal to or smaller than the broker's `message.max.bytes`.
MaxMessageBytes int
MaxMessageBytes int `yaml:"max_message_bytes,omitempty"`
// The level of acknowledgement reliability needed from the broker (defaults
// to WaitForLocal). Equivalent to the `request.required.acks` setting of the
// JVM producer.
RequiredAcks sarama.RequiredAcks
RequiredAcks sarama.RequiredAcks `yaml:"required_acks,omitempty"`
// The maximum duration the broker will wait the receipt of the number of
// RequiredAcks (defaults to 10 seconds). This is only relevant when
// RequiredAcks is set to WaitForAll or a number > 1. Only supports
// millisecond resolution, nanoseconds will be truncated. Equivalent to
// the JVM producer's `request.timeout.ms` setting.
Timeout int
Timeout int `yaml:"timeout,omitempty"`
// The type of compression to use on messages (defaults to no compression).
// Similar to `compression.codec` setting of the JVM producer.
Compression sarama.CompressionCodec
Compression sarama.CompressionCodec `yaml:"compression,omitempty"`
// The level of compression to use on messages. The meaning depends
// on the actual compression type used and defaults to default compression
// level for the codec.
CompressionLevel int
CompressionLevel int `yaml:"compression_level,omitempty"`
// If enabled, the producer will ensure that exactly one copy of each message is
// written.
Idempotent bool
Idempotent bool `yaml:"idepotent,omitempty"`
// The following config options control how often messages are batched up and
// sent to the broker. By default, messages are sent as fast as possible, and
@@ -105,28 +105,28 @@ type Kafka struct {
Flush struct {
// The best-effort number of bytes needed to trigger a flush. Use the
// global sarama.MaxRequestSize to set a hard upper limit.
Bytes int
Bytes int `yaml:"bytes,omitempty"`
// The best-effort number of messages needed to trigger a flush. Use
// `MaxMessages` to set a hard upper limit.
Messages int
Messages int `yaml:"messages,omitempty"`
// The best-effort frequency of flushes. Equivalent to
// `queue.buffering.max.ms` setting of JVM producer.
Frequency int
Frequency int `yaml:"frequency,omitempty"`
// The maximum number of messages the producer will send in a single
// broker request. Defaults to 0 for unlimited. Similar to
// `queue.buffering.max.messages` in the JVM producer.
MaxMessages int
}
MaxMessages int `yaml:"max_messages,omitempty"`
} `yaml:"flush,omitempty"`
Retry struct {
// The total number of times to retry sending a message (default 3).
// Similar to the `message.send.max.retries` setting of the JVM producer.
Max int
Max int `yaml:"max,omitempty"`
// How long to wait for the cluster to settle between retries
// (default 100ms). Similar to the `retry.backoff.ms` setting of the
// JVM producer.
Backoff int
}
}
Backoff int `yaml:"back_off,omitempty"`
} `yaml:"retry"`
} `yaml:"producer"`
}
type AMQP struct {
@@ -135,87 +135,85 @@ type AMQP struct {
Addrs []string `yaml:"addrs"`
// Vhost specifies the namespace of permissions, exchanges, queues and
// bindings on the server. Dial sets this to the path parsed from the URL.
Vhost string
Vhost string `yaml:"vhost,omitempty"`
// 0 max channels means 2^16 - 1
ChannelMax int
ChannelMax int `yaml:"channel_max,omitempty"`
// 0 max bytes means unlimited
FrameSize int
FrameSize int `yaml:"frame_size,omitempty"`
// less than 1s uses the server's interval
Heartbeat int
Heartbeat int `yaml:"heartbeat,omitempty"`
// Connection locale that we expect to always be en_US
// Even though servers must return it as per the AMQP 0-9-1 spec,
// we are not aware of it being used other than to satisfy the spec requirements
Locale string
Locale string `yaml:"locale,omitempty"`
// exchange to declare
Exchanges []struct {
// exchange name to declare
Name string
Name string `yaml:"name"`
// direct topic fanout headers, default direct
Kind string
Durable bool
AutoDelete bool
Internal bool
NoWait bool
}
Kind string `yaml:"kind,omitempty"`
Durable bool `yaml:"durable,omitempty"`
AutoDelete bool `yaml:"auto_delete,omitempty"`
Internal bool `yaml:"internal,omitempty"`
NoWait bool `yaml:"nowait,omitempty"`
} `yaml:"exchanges,omitempty"`
// queues to declare, default nil
Queues []struct {
Name string
Durable bool
AutoDelete bool
Exclustive bool
NoWait bool
Name string `yaml:"name"`
Durable bool `yaml:"durable,omitempty"`
AutoDelete bool `yaml:"auto_delete,omitempty"`
Exclustive bool `yaml:"exclustive,omitempty"`
NoWait bool `yaml:"nowait,omitempty"`
}
// queue bindings to exchange, default nil
QueueBindings []struct {
QueueName string
ExchangeName string
BindingKey string
NoWait bool
QueueName string `yaml:"queue_name"`
ExchangeName string `yaml:"exchange_name,omitempty"`
BindingKey string `yaml:"binding_key,omitempty"`
NoWait bool `yaml:"nowait,omitempty"`
}
Producer struct {
RoutingKeys []string // topics
Exchange string
Mandatory bool
Immediate bool
RoutingKeys []string `yaml:"routing_keys"` // topics
Exchange string `yaml:"exchange"`
Mandatory bool `yaml:"mandatory,omitempty"`
Immediate bool `yaml:"immediate,omitempty"`
// message related
Headers map[string]interface{}
Headers map[string]interface{} `yaml:"headers,omitempty"`
// properties
ContentType string // MIME content type
ContentEncoding string // MIME content encoding
DeliveryMode uint8 // Transient (0 or 1) or Persistent (2)
Priority uint8 // 0 to 9
ReplyTo string // address to to reply to (ex: RPC)
Expiration string // message expiration spec
Type string // message type name
UserId string // creating user id - ex: "guest"
AppId string // creating application id
}
ContentType string `yaml:"content_type,omitempty"` // MIME content type
ContentEncoding string `yaml:"content_encoding,omitempty"` // MIME content encoding
DeliveryMode uint8 `yaml:"delivery_mode,omitempty"` // Transient (0 or 1) or Persistent (2)
Priority uint8 `yaml:"priority,omitempty"` // 0 to 9
ReplyTo string `yaml:"reply_to,omitempty"` // address to to reply to (ex: RPC)
Expiration string `yaml:"expiration,omitempty"` // message expiration spec
Type string `yaml:"type,omitempty"` // message type name
UserId string `yaml:"user_id,omitempty"` // creating user id - ex: "guest"
AppId string `yaml:"app_id,omitempty"` // creating application id
} `yaml:"producer,omitempty"`
}
type Nats struct {
Enable bool `yaml:"enable"`
Addrs []string `yaml:"addrs"`
Producer struct {
Subjects []string // topics
}
Subjects []string `yaml:"subjects"` // topics
} `yaml:"producer,omitempty"`
JetStream struct {
// using jetstream instead of nats
Enable bool `yaml:"enable"`
Name string `yaml:"name"`
Producer struct {
Subjects []string
}
}
Subjects []string `yaml:"subjects"`
} `yaml:"producer,omitempty"`
} `yaml:"jetstream,omitempty"`
}
type NSQ struct {
Enable bool `yaml:"enable"`
Addrs []string `yaml:"addrs"`
Producer struct {
Topics []string
}
Topics []string `yaml:"topics"`
} `yaml:"producer"`
}
type Redis struct {
@@ -224,16 +222,16 @@ type Redis struct {
DB int `yaml:"db"`
Password string `yaml:"password"`
Producer struct {
Channels []string
}
Channels []string `yaml:"channels"`
} `yaml:"producer"`
}
type MQM struct {
Kafka Kafka `yaml:"kafka"`
AMQP AMQP `yaml:"amqp"`
Nats Nats `yaml:"nats"`
NSQ NSQ `yaml:"nsq"`
Redis Redis `yaml:"redis"`
Kafka Kafka `yaml:"kafka,omitempty"`
AMQP AMQP `yaml:"amqp,omitempty"`
Nats Nats `yaml:"nats,omitempty"`
NSQ NSQ `yaml:"nsq,omitempty"`
Redis Redis `yaml:"redis,omitempty"`
}
// exchange
@@ -245,11 +243,11 @@ type Dao struct {
// frontlas
type Frontlas struct {
Enable bool `yaml:"enable"`
Dial config.Dial
Enable bool `yaml:"enable"`
Dial config.Dial `yaml:"dial"`
Metrics struct {
Enable bool
Interval int // for stats
Enable bool `yaml:"enable"`
Interval int `yaml:"interval"` // for stats
}
}
@@ -264,7 +262,7 @@ type Configuration struct {
Dao Dao `yaml:"dao"`
Frontlas Frontlas
Frontlas Frontlas `yaml:"frontlas"`
MQM MQM `yaml:"mqm"`
}

View File

@@ -61,7 +61,7 @@ func TestParseFile(t *testing.T) {
}
func TestGenDefaultConfig(t *testing.T) {
file, err := os.OpenFile("../../../etc/frontier.yaml", os.O_CREATE|os.O_RDWR, 0666)
file, err := os.OpenFile("../../../etc/frontier.yaml", os.O_TRUNC|os.O_CREATE|os.O_RDWR, 0666)
if err != nil {
t.Error(err)
}

View File

@@ -47,9 +47,9 @@ func NewInformer(conf *config.Configuration, tmr timer.Timer) (*Informer, error)
if err != nil {
return nil, err
}
opt := client.NewEndOptions()
opt := client.NewRetryEndOptions()
opt.SetMeta(data)
end, err := client.NewRetryEndWithDialer(dialer)
end, err := client.NewRetryEndWithDialer(dialer, opt)
if err != nil {
klog.Errorf("frontlas new retry end err: %s", err)
return nil, err

View File

@@ -2,12 +2,10 @@ package config
import (
"flag"
"fmt"
"io"
"os"
armio "github.com/jumboframes/armorigo/io"
"github.com/jumboframes/armorigo/log"
"github.com/singchia/frontier/pkg/config"
"github.com/spf13/pflag"
"gopkg.in/yaml.v2"
@@ -38,145 +36,88 @@ type ControlPlane struct {
// TODO tls support
type Redis struct {
Mode string `yaml:"mode"` // standalone, sentinel or cluster
Mode string `yaml:"mode"` // standalone, sentinel or cluster
// Use the specified Username to authenticate the current connection
// with one of the connections defined in the ACL list when connecting
// to a Redis 6.0 instance, or greater, that is using the Redis ACL system.
Username string `yaml:"username,omitempty"`
// Optional password. Must match the password specified in the
// requirepass server configuration option (if connecting to a Redis 5.0 instance, or lower),
// or the User Password when connecting to a Redis 6.0 instance, or greater,
// that is using the Redis ACL system.
Password string `yaml:"password,omitempty"`
// Protocol 2 or 3. Use the version to negotiate RESP version with redis-server.
// Default is 3.
Protocol int `yaml:"protocol,omitempty"`
// ClientName will execute the `CLIENT SETNAME ClientName` command for each conn.
ClientName string `yaml:"clientname,omitempty"`
// connection retry settings
MaxRetries int `yaml:"max_retries,omitempty"`
MinRetryBackoff int `yaml:"min_retry_backoff,omitempty"`
MaxRetryBackoff int `yaml:"max_retry_backoff,omitempty"`
// connection r/w settings
DialTimeout int `yaml:"dial_timeout,omitempty"`
ReadTimeout int `yaml:"read_timeout,omitempty"`
WriteTimeout int `yaml:"write_timeout,omitempty"`
// connection pool settings
PoolFIFO bool `yaml:"pool_fifo,omitempty"`
PoolSize int `yaml:"pool_size,omitempty"` // applies per cluster node and not for the whole cluster
PoolTimeout int `yaml:"pool_timeout,omitempty"`
MinIdleConns int `yaml:"min_idle_conns,omitempty"`
MaxIdleConns int `yaml:"max_idle,omitempty"`
MaxActiveConns int `yaml:"max_active_conns,omitempty"` // applies per cluster node and not for the whole cluster
ConnMaxIdleTime int `yaml:"conn_max_idle_time,omitempty"`
ConnMaxLifetime int `yaml:"conn_max_life_time,omitempty"`
DisableIndentity bool `yaml:"disable_identity,omitempty"` // Disable set-lib on connect. Default is false.
IdentitySuffix string `yaml:"identity_suffix,omitempty"` // Add suffix to client name. Default is empty.
Standalone struct {
// The network type, either tcp or unix.
// Default is tcp.
Network string
Network string `yaml:"network"`
// host:port address.
Addr string
// Protocol 2 or 3. Use the version to negotiate RESP version with redis-server.
// Default is 3.
Protocol int
// Use the specified Username to authenticate the current connection
// with one of the connections defined in the ACL list when connecting
// to a Redis 6.0 instance, or greater, that is using the Redis ACL system.
Username string
// Optional password. Must match the password specified in the
// requirepass server configuration option (if connecting to a Redis 5.0 instance, or lower),
// or the User Password when connecting to a Redis 6.0 instance, or greater,
// that is using the Redis ACL system.
Password string
Addr string `yaml:"addr"`
// CredentialsProvider allows the username and password to be updated
// before reconnecting. It should return the current username and password.
DB int
// ClientName will execute the `CLIENT SETNAME ClientName` command for each conn.
ClientName string
// connection retry settings
MaxRetries int
MinRetryBackoff int
MaxRetryBackoff int
// connection r/w settings
DialTimeout int
ReadTimeout int
WriteTimeout int
// connection pool settings
PoolFIFO bool
PoolSize int // applies per cluster node and not for the whole cluster
PoolTimeout int
MinIdleConns int
MaxIdleConns int
MaxActiveConns int // applies per cluster node and not for the whole cluster
ConnMaxIdleTime int
ConnMaxLifetime int
DisableIndentity bool // Disable set-lib on connect. Default is false.
IdentitySuffix string // Add suffix to client name. Default is empty.
}
DB int `yaml:"db"`
} `yaml:"standalone,omitempty"`
Sentinel struct {
Addrs []string `yaml:"addrs"`
MasterName string `yaml:"master_name"`
Protocol int
Username string
Password string
DB int
// ClientName will execute the `CLIENT SETNAME ClientName` command for each conn.
ClientName string
DB int `yaml:"db"`
// route settings
// Allows routing read-only commands to the closest master or replica node.
// This option only works with NewFailoverClusterClient.
RouteByLatency bool
RouteByLatency bool `yaml:"route_by_latency,omitempty"`
// Allows routing read-only commands to the random master or replica node.
// This option only works with NewFailoverClusterClient.
RouteRandomly bool
RouteRandomly bool `yaml:"route_randomly,omitempty"`
// Route all commands to replica read-only nodes.
ReplicaOnly bool
ReplicaOnly bool `yaml:"replica_only,omitempty"`
// Use replicas disconnected with master when cannot get connected replicas
// Now, this option only works in RandomReplicaAddr function.
UseDisconnectedReplicas bool
// connection retry settings
MaxRetries int
MinRetryBackoff int
MaxRetryBackoff int
// connection r/w settings
DialTimeout int
ReadTimeout int
WriteTimeout int
// connection pool settings
PoolFIFO bool
PoolSize int // applies per cluster node and not for the whole cluster
PoolTimeout int
MinIdleConns int
MaxIdleConns int
MaxActiveConns int // applies per cluster node and not for the whole cluster
ConnMaxIdleTime int
ConnMaxLifetime int
DisableIndentity bool // Disable set-lib on connect. Default is false.
IdentitySuffix string // Add suffix to client name. Default is empty.
}
UseDisconnectedReplicas bool `yaml:"use_disconnected_replicas,omitempty"`
} `yaml:"sentinel,omitempty"`
Cluster struct {
Addrs []string `yaml:"addrs"`
Protocol int
Username string
Password string
// ClientName will execute the `CLIENT SETNAME ClientName` command for each conn.
ClientName string
Addrs []string `yaml:"addrs"`
// The maximum number of retries before giving up. Command is retried
// on network errors and MOVED/ASK redirects.
// Default is 3 retries.
MaxRedirects int
MaxRedirects int `yaml:"max_redirects,omitempty"`
// Allows routing read-only commands to the closest master or slave node.
// It automatically enables ReadOnly.
RouteByLatency bool
RouteByLatency bool `yaml:"route_by_latency,omitempty"`
// Allows routing read-only commands to the random master or slave node.
// It automatically enables ReadOnly.
RouteRandomly bool
// connection retry settings
MaxRetries int
MinRetryBackoff int
MaxRetryBackoff int
// connection r/w settings
DialTimeout int
ReadTimeout int
WriteTimeout int
// connection pool settings
PoolFIFO bool
PoolSize int // applies per cluster node and not for the whole cluster
PoolTimeout int
MinIdleConns int
MaxIdleConns int
MaxActiveConns int // applies per cluster node and not for the whole cluster
ConnMaxIdleTime int
ConnMaxLifetime int
DisableIndentity bool // Disable set-lib on connect. Default is false.
IdentitySuffix string // Add suffix to client name. Default is empty.
}
RouteRandomly bool `yaml:"route_randomly,omitempty"`
} `yaml:"cluster,omitempty"`
}
type FrontierManager struct {
@@ -196,7 +137,6 @@ type Configuration struct {
func Parse() (*Configuration, error) {
var (
argConfigFile = pflag.String("config", "", "config file, default not configured")
argArmorigoLogLevel = pflag.String("loglevel", "info", "log level for armorigo log")
argDaemonRLimitNofile = pflag.Int("daemon-rlimit-nofile", -1, "SetRLimit for number of file of this daemon, default: -1 means ignore")
// TODO more command-line args
@@ -224,15 +164,6 @@ func Parse() (*Configuration, error) {
pflag.CommandLine.AddGoFlagSet(flag.CommandLine)
pflag.Parse()
// armorigo log
level, err := log.ParseLevel(*argArmorigoLogLevel)
if err != nil {
fmt.Println("parse log level err:", err)
return nil, err
}
log.SetLevel(level)
log.SetOutput(os.Stdout)
// config file
if *argConfigFile != "" {
// TODO the command-line is prior to config file
@@ -280,8 +211,14 @@ func genDefaultConfig(writer io.Writer) error {
Addr: "0.0.0.0:30021",
},
},
Redis: Redis{},
Redis: Redis{
Mode: "standalone",
},
}
conf.Redis.Standalone.Network = "tcp"
conf.Redis.Standalone.Addr = "127.0.0.1:6379"
conf.Redis.Standalone.DB = 0
data, err := yaml.Marshal(conf)
if err != nil {
return err

View File

@@ -6,7 +6,7 @@ import (
)
func TestGenDefaultConfig(t *testing.T) {
file, err := os.OpenFile("../../../etc/frontlas.yaml", os.O_CREATE|os.O_RDWR, 0666)
file, err := os.OpenFile("../../../etc/frontlas.yaml", os.O_TRUNC|os.O_CREATE|os.O_RDWR, 0666)
if err != nil {
t.Error(err)
}

View File

@@ -13,6 +13,7 @@ import (
"github.com/singchia/frontier/pkg/utils"
"github.com/singchia/geminio"
"github.com/singchia/geminio/delegate"
"github.com/singchia/geminio/pkg/id"
"github.com/singchia/geminio/server"
"github.com/singchia/go-timer/v2"
"k8s.io/klog/v2"
@@ -22,8 +23,9 @@ type FrontierManager struct {
*delegate.UnimplementedDelegate
conf *config.Configuration
repo *repo.Dao
tmr timer.Timer
repo *repo.Dao
tmr timer.Timer
idFactory id.IDFactory
ln net.Listener
}
@@ -32,9 +34,11 @@ func NewFrontierManager(conf *config.Configuration, dao *repo.Dao, tmr timer.Tim
listen := &conf.FrontierManager.Listen
fm := &FrontierManager{
conf: conf,
tmr: tmr,
repo: dao,
conf: conf,
tmr: tmr,
repo: dao,
// a simple unix timestamp incemental id factory
idFactory: id.DefaultIncIDCounter,
UnimplementedDelegate: &delegate.UnimplementedDelegate{},
}
ln, err := utils.Listen(listen)
@@ -42,6 +46,7 @@ func NewFrontierManager(conf *config.Configuration, dao *repo.Dao, tmr timer.Tim
klog.Errorf("frontier plane listen err: %s", err)
return nil, err
}
klog.V(1).Infof("server listening on: %v", ln.Addr())
fm.ln = ln
return fm, nil
@@ -68,6 +73,7 @@ func (fm *FrontierManager) handleConn(conn net.Conn) error {
opt.SetTimer(fm.tmr)
opt.SetDelegate(fm)
opt.SetLog(log.NewKLog())
opt.SetDelegate(fm)
end, err := server.NewEndWithConn(conn, opt)
if err != nil {
klog.Errorf("frontier manager handle conn, geminio server new err: %s", err)

View File

@@ -20,6 +20,10 @@ const (
)
// delegates for frontier itself
func (fm *FrontierManager) GetClientID(meta []byte) (uint64, error) {
return fm.idFactory.GetID(), nil
}
func (fm *FrontierManager) ConnOnline(d delegate.ConnDescriber) error {
instance := &gapis.FrontierInstance{}
err := json.Unmarshal(d.Meta(), instance)
@@ -27,6 +31,8 @@ func (fm *FrontierManager) ConnOnline(d delegate.ConnDescriber) error {
klog.Errorf("frontier manager conn online, json unmarshal err: %s", err)
return err
}
klog.V(1).Infof("frontier online, frontierID: %s", instance.FrontierID)
set, err := fm.repo.SetFrontierAndAlive(instance.FrontierID, &repo.Frontier{
FrontierID: instance.FrontierID,
AdvertisedServiceboundAddr: instance.AdvertisedServiceboundAddr,
@@ -52,6 +58,8 @@ func (fm *FrontierManager) ConnOffline(d delegate.ConnDescriber) error {
klog.Errorf("frontier manager conn offline, json unmarshal err: %s", err)
return err
}
klog.V(1).Infof("frontier offline, frontierID: %s", instance.FrontierID)
err = fm.repo.DeleteFrontier(instance.FrontierID)
if err != nil {
klog.Errorf("frontier manager conn offline, delete frontier: %s err: %s", instance.FrontierID, err)

View File

@@ -51,26 +51,26 @@ func NewDao(config *config.Configuration) (*Dao, error) {
opt := &redis.Options{
Network: sconf.Network,
Addr: sconf.Addr,
ClientName: sconf.ClientName,
Protocol: sconf.Protocol,
Username: sconf.Username,
Password: sconf.Password,
ClientName: conf.ClientName,
Protocol: conf.Protocol,
Username: conf.Username,
Password: conf.Password,
DB: sconf.DB,
MaxRetries: sconf.MaxRetries,
MinRetryBackoff: time.Duration(sconf.MinRetryBackoff) * time.Second,
MaxRetryBackoff: time.Duration(sconf.MaxRetryBackoff) * time.Second,
DialTimeout: time.Duration(sconf.DialTimeout) * time.Second,
ReadTimeout: time.Duration(sconf.ReadTimeout) * time.Second,
WriteTimeout: time.Duration(sconf.WriteTimeout) * time.Second,
PoolFIFO: sconf.PoolFIFO,
PoolSize: sconf.PoolSize,
PoolTimeout: time.Duration(sconf.PoolTimeout) * time.Second,
MinIdleConns: sconf.MinIdleConns,
MaxIdleConns: sconf.MaxIdleConns,
ConnMaxIdleTime: time.Duration(sconf.ConnMaxIdleTime) * time.Second,
ConnMaxLifetime: time.Duration(sconf.ConnMaxLifetime) * time.Second,
DisableIndentity: sconf.DisableIndentity,
IdentitySuffix: sconf.IdentitySuffix,
MaxRetries: conf.MaxRetries,
MinRetryBackoff: time.Duration(conf.MinRetryBackoff) * time.Second,
MaxRetryBackoff: time.Duration(conf.MaxRetryBackoff) * time.Second,
DialTimeout: time.Duration(conf.DialTimeout) * time.Second,
ReadTimeout: time.Duration(conf.ReadTimeout) * time.Second,
WriteTimeout: time.Duration(conf.WriteTimeout) * time.Second,
PoolFIFO: conf.PoolFIFO,
PoolSize: conf.PoolSize,
PoolTimeout: time.Duration(conf.PoolTimeout) * time.Second,
MinIdleConns: conf.MinIdleConns,
MaxIdleConns: conf.MaxIdleConns,
ConnMaxIdleTime: time.Duration(conf.ConnMaxIdleTime) * time.Second,
ConnMaxLifetime: time.Duration(conf.ConnMaxLifetime) * time.Second,
DisableIndentity: conf.DisableIndentity,
IdentitySuffix: conf.IdentitySuffix,
}
rds = redis.NewClient(opt)
_, err := rds.Ping(context.TODO()).Result()
@@ -85,29 +85,29 @@ func NewDao(config *config.Configuration) (*Dao, error) {
opt := &redis.FailoverOptions{
MasterName: sconf.MasterName,
SentinelAddrs: sconf.Addrs,
Protocol: sconf.Protocol,
Username: sconf.Username,
Password: sconf.Password,
Protocol: conf.Protocol,
Username: conf.Username,
Password: conf.Password,
DB: sconf.DB,
ClientName: sconf.ClientName,
ClientName: conf.ClientName,
RouteByLatency: sconf.RouteByLatency,
RouteRandomly: sconf.RouteRandomly,
ReplicaOnly: sconf.ReplicaOnly,
UseDisconnectedReplicas: sconf.UseDisconnectedReplicas,
MinRetryBackoff: time.Duration(sconf.MinRetryBackoff) * time.Second,
MaxRetryBackoff: time.Duration(sconf.MaxRetryBackoff) * time.Second,
DialTimeout: time.Duration(sconf.DialTimeout) * time.Second,
ReadTimeout: time.Duration(sconf.ReadTimeout) * time.Second,
WriteTimeout: time.Duration(sconf.WriteTimeout) * time.Second,
PoolFIFO: sconf.PoolFIFO,
PoolSize: sconf.PoolSize,
PoolTimeout: time.Duration(sconf.PoolTimeout) * time.Second,
MinIdleConns: sconf.MinIdleConns,
MaxIdleConns: sconf.MaxIdleConns,
ConnMaxIdleTime: time.Duration(sconf.ConnMaxIdleTime) * time.Second,
ConnMaxLifetime: time.Duration(sconf.ConnMaxLifetime) * time.Second,
DisableIndentity: sconf.DisableIndentity,
IdentitySuffix: sconf.IdentitySuffix,
MinRetryBackoff: time.Duration(conf.MinRetryBackoff) * time.Second,
MaxRetryBackoff: time.Duration(conf.MaxRetryBackoff) * time.Second,
DialTimeout: time.Duration(conf.DialTimeout) * time.Second,
ReadTimeout: time.Duration(conf.ReadTimeout) * time.Second,
WriteTimeout: time.Duration(conf.WriteTimeout) * time.Second,
PoolFIFO: conf.PoolFIFO,
PoolSize: conf.PoolSize,
PoolTimeout: time.Duration(conf.PoolTimeout) * time.Second,
MinIdleConns: conf.MinIdleConns,
MaxIdleConns: conf.MaxIdleConns,
ConnMaxIdleTime: time.Duration(conf.ConnMaxIdleTime) * time.Second,
ConnMaxLifetime: time.Duration(conf.ConnMaxLifetime) * time.Second,
DisableIndentity: conf.DisableIndentity,
IdentitySuffix: conf.IdentitySuffix,
}
rds = redis.NewFailoverClient(opt)
_, err := rds.Ping(context.TODO()).Result()
@@ -121,27 +121,27 @@ func NewDao(config *config.Configuration) (*Dao, error) {
cconf := conf.Cluster
opt := &redis.ClusterOptions{
Addrs: cconf.Addrs,
Protocol: cconf.Protocol,
Username: cconf.Username,
Password: cconf.Password,
ClientName: cconf.ClientName,
Protocol: conf.Protocol,
Username: conf.Username,
Password: conf.Password,
ClientName: conf.ClientName,
MaxRedirects: cconf.MaxRedirects,
RouteByLatency: cconf.RouteByLatency,
RouteRandomly: cconf.RouteRandomly,
MinRetryBackoff: time.Duration(cconf.MinRetryBackoff) * time.Second,
MaxRetryBackoff: time.Duration(cconf.MaxRetryBackoff) * time.Second,
DialTimeout: time.Duration(cconf.DialTimeout) * time.Second,
ReadTimeout: time.Duration(cconf.ReadTimeout) * time.Second,
WriteTimeout: time.Duration(cconf.WriteTimeout) * time.Second,
PoolFIFO: cconf.PoolFIFO,
PoolSize: cconf.PoolSize,
PoolTimeout: time.Duration(cconf.PoolTimeout) * time.Second,
MinIdleConns: cconf.MinIdleConns,
MaxIdleConns: cconf.MaxIdleConns,
ConnMaxIdleTime: time.Duration(cconf.ConnMaxIdleTime) * time.Second,
ConnMaxLifetime: time.Duration(cconf.ConnMaxLifetime) * time.Second,
DisableIndentity: cconf.DisableIndentity,
IdentitySuffix: cconf.IdentitySuffix,
MinRetryBackoff: time.Duration(conf.MinRetryBackoff) * time.Second,
MaxRetryBackoff: time.Duration(conf.MaxRetryBackoff) * time.Second,
DialTimeout: time.Duration(conf.DialTimeout) * time.Second,
ReadTimeout: time.Duration(conf.ReadTimeout) * time.Second,
WriteTimeout: time.Duration(conf.WriteTimeout) * time.Second,
PoolFIFO: conf.PoolFIFO,
PoolSize: conf.PoolSize,
PoolTimeout: time.Duration(conf.PoolTimeout) * time.Second,
MinIdleConns: conf.MinIdleConns,
MaxIdleConns: conf.MaxIdleConns,
ConnMaxIdleTime: time.Duration(conf.ConnMaxIdleTime) * time.Second,
ConnMaxLifetime: time.Duration(conf.ConnMaxLifetime) * time.Second,
DisableIndentity: conf.DisableIndentity,
IdentitySuffix: conf.IdentitySuffix,
}
rds = redis.NewClusterClient(opt)
_, err := rds.Ping(context.TODO()).Result()

View File

@@ -16,7 +16,7 @@ func Dial(dial *config.Dial) (net.Conn, error) {
addr string = dial.Addr
)
if dial.TLS.Enable {
if !dial.TLS.Enable {
conn, err := net.Dial(network, addr)
if err != nil {
return nil, err