frontlas: add basic directory of frontier atlas

This commit is contained in:
singchia
2024-03-18 22:45:34 +08:00
parent cae584ed5a
commit 5c5cabd6c5
10 changed files with 380 additions and 77 deletions

View File

@@ -10,7 +10,7 @@ frontier:
.PHONY: frontier-linux
frontier-linux:
CC=${CC} GOOS=linux GOARCH=amd64 go build -trimpath -ldflags "-s -w"-o ./frontier cmd/frontier/main.go
CC=${CC} GOOS=linux GOARCH=amd64 CGO_ENABLED=1 go build -trimpath -ldflags "-s -w" -o ./frontier cmd/frontier/main.go
.PHONY: examples
examples:

View File

@@ -2,91 +2,23 @@ package main
import (
"context"
"net/http"
_ "net/http/pprof"
"runtime"
"github.com/jumboframes/armorigo/sigaction"
"github.com/singchia/frontier/pkg/frontier/apis"
"github.com/singchia/frontier/pkg/frontier/config"
"github.com/singchia/frontier/pkg/frontier/mq"
"github.com/singchia/frontier/pkg/frontier/repo"
"github.com/singchia/frontier/pkg/frontier/server"
"github.com/singchia/frontier/pkg/utils"
"github.com/singchia/frontier/pkg/frontier"
"k8s.io/klog/v2"
)
func main() {
conf, err := config.Parse()
frontier, err := frontier.NewFrontier()
if err != nil {
klog.Errorf("parse flags err: %s", err)
klog.Errorf("new frontier err: %s", err)
return
}
// pprof
if conf.Daemon.PProf.Enable {
runtime.SetCPUProfileRate(conf.Daemon.PProf.CPUProfileRate)
go func() {
http.ListenAndServe(conf.Daemon.PProf.Addr, nil)
}()
}
// rlimit
if conf.Daemon.RLimit.Enable {
err = utils.SetRLimit(uint64(conf.Daemon.RLimit.NumFile))
if err != nil {
klog.Errorf("set rlimit err: %s", err)
return
}
}
klog.Infof("frontier starts")
defer func() {
klog.Infof("frontier ends")
klog.Flush()
}()
// new repo and mqm
repo, mqm, err := newMidwares(conf)
if err != nil {
klog.Errorf("new midwares err: %s", err)
return
}
defer func() {
repo.Close()
mqm.Close()
}()
// servers
srvs, err := server.NewServer(conf, repo, mqm)
if err != nil {
klog.Errorf("new server failed")
return
}
klog.V(2).Infof("new servers succeed")
srvs.Serve()
defer func() {
srvs.Close()
}()
frontier.Run()
sig := sigaction.NewSignal()
sig.Wait(context.TODO())
}
func newMidwares(conf *config.Configuration) (apis.Repo, apis.MQM, error) {
// repo
repo, err := repo.NewRepo(conf)
if err != nil {
klog.Errorf("new repo err: %s", err)
return nil, nil, err
}
klog.V(2).Infof("new repo succeed")
// mqm
mqm, err := mq.NewMQM(conf)
if err != nil {
klog.Errorf("new mq manager err: %s", err)
return nil, nil, err
}
klog.V(2).Infof("new mq manager succeed")
return repo, mqm, nil
frontier.Close()
}

2
go.mod
View File

@@ -2,6 +2,8 @@ module github.com/singchia/frontier
go 1.20
replace github.com/singchia/geminio => ../../moresec/singchia/geminio
require (
github.com/IBM/sarama v1.43.0
github.com/go-kratos/kratos/v2 v2.7.2

4
go.sum
View File

@@ -6,6 +6,8 @@ github.com/PuerkitoBio/purell v1.1.1 h1:WEQqlqaGbrPkxLJWfBwQmfEAE1Z7ONdDLqrN38tN
github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0=
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 h1:d+Bc7a5rLufV/sSk/8dngufqelfh6jnri85riMAaF/M=
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE=
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/census-instrumentation/opencensus-proto v0.4.1 h1:iKLQ0xPNFxR/2hzXZMrBo8f1j86j5WHzznCCQxV/b8g=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
@@ -122,8 +124,6 @@ github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqn
github.com/redis/go-redis/v9 v9.5.1 h1:H1X4D3yHPaYrkL5X06Wh6xNVM/pX0Ft4RV0vMGvLBh8=
github.com/redis/go-redis/v9 v9.5.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/singchia/geminio v1.1.5-rc.2 h1:C1k+W7OzKH+G1UfKb/8uLkj78wvlbNr9m4ww2rZ8Jr8=
github.com/singchia/geminio v1.1.5-rc.2/go.mod h1:CmDttmY18CGhpmeiVeGEBLXYKHIOoN3MSaEDMzfSOXA=
github.com/singchia/go-timer/v2 v2.0.3/go.mod h1:PgkEQc6io8slCUiT5rHzWKU4/P2HXHWk3WWfijZXAf4=
github.com/singchia/go-timer/v2 v2.2.1 h1:gJucmL99fkuNzGk2AfNPFpa1X3/4+aGO21KkjFAG624=
github.com/singchia/go-timer/v2 v2.2.1/go.mod h1:PgkEQc6io8slCUiT5rHzWKU4/P2HXHWk3WWfijZXAf4=

View File

@@ -0,0 +1,7 @@
package apis
import "errors"
var (
ErrUnsupportRedisServerMode = errors.New("unsupport redis-server mode")
)

View File

@@ -0,0 +1,175 @@
package config
// daemon related
type RLimit struct {
Enable bool `yaml:"enable"`
NumFile int `yaml:"nofile"`
}
type PProf struct {
Enable bool `yaml:"enable"`
Addr string `yaml:"addr"`
CPUProfileRate int `yaml:"cpu_profile_rate"`
}
type Daemon struct {
RLimit RLimit `yaml:"rlimit"`
PProf PProf `yaml:"pprof"`
}
// for rest grpc and geminio
type Server struct {
Network string `yaml:"network"`
Addr string `yaml:"addr"`
}
// TODO tls support
type Redis struct {
Mode string `yaml:"mode"` // standalone, sentinel or cluster
Standalone struct {
// The network type, either tcp or unix.
// Default is tcp.
Network string
// host:port address.
Addr string
// Protocol 2 or 3. Use the version to negotiate RESP version with redis-server.
// Default is 3.
Protocol int
// Use the specified Username to authenticate the current connection
// with one of the connections defined in the ACL list when connecting
// to a Redis 6.0 instance, or greater, that is using the Redis ACL system.
Username string
// Optional password. Must match the password specified in the
// requirepass server configuration option (if connecting to a Redis 5.0 instance, or lower),
// or the User Password when connecting to a Redis 6.0 instance, or greater,
// that is using the Redis ACL system.
Password string
// CredentialsProvider allows the username and password to be updated
// before reconnecting. It should return the current username and password.
DB int
// ClientName will execute the `CLIENT SETNAME ClientName` command for each conn.
ClientName string
// connection retry settings
MaxRetries int
MinRetryBackoff int
MaxRetryBackoff int
// connection r/w settings
DialTimeout int
ReadTimeout int
WriteTimeout int
// connection pool settings
PoolFIFO bool
PoolSize int // applies per cluster node and not for the whole cluster
PoolTimeout int
MinIdleConns int
MaxIdleConns int
MaxActiveConns int // applies per cluster node and not for the whole cluster
ConnMaxIdleTime int
ConnMaxLifetime int
DisableIndentity bool // Disable set-lib on connect. Default is false.
IdentitySuffix string // Add suffix to client name. Default is empty.
}
Sentinel struct {
Addrs []string `yaml:"addrs"`
MasterName string `yaml:"master_name"`
Protocol int
Username string
Password string
DB int
// ClientName will execute the `CLIENT SETNAME ClientName` command for each conn.
ClientName string
// route settings
// Allows routing read-only commands to the closest master or replica node.
// This option only works with NewFailoverClusterClient.
RouteByLatency bool
// Allows routing read-only commands to the random master or replica node.
// This option only works with NewFailoverClusterClient.
RouteRandomly bool
// Route all commands to replica read-only nodes.
ReplicaOnly bool
// Use replicas disconnected with master when cannot get connected replicas
// Now, this option only works in RandomReplicaAddr function.
UseDisconnectedReplicas bool
// connection retry settings
MaxRetries int
MinRetryBackoff int
MaxRetryBackoff int
// connection r/w settings
DialTimeout int
ReadTimeout int
WriteTimeout int
// connection pool settings
PoolFIFO bool
PoolSize int // applies per cluster node and not for the whole cluster
PoolTimeout int
MinIdleConns int
MaxIdleConns int
MaxActiveConns int // applies per cluster node and not for the whole cluster
ConnMaxIdleTime int
ConnMaxLifetime int
DisableIndentity bool // Disable set-lib on connect. Default is false.
IdentitySuffix string // Add suffix to client name. Default is empty.
}
Cluster struct {
Addrs []string `yaml:"addrs"`
Protocol int
Username string
Password string
// ClientName will execute the `CLIENT SETNAME ClientName` command for each conn.
ClientName string
// The maximum number of retries before giving up. Command is retried
// on network errors and MOVED/ASK redirects.
// Default is 3 retries.
MaxRedirects int
// Allows routing read-only commands to the closest master or slave node.
// It automatically enables ReadOnly.
RouteByLatency bool
// Allows routing read-only commands to the random master or slave node.
// It automatically enables ReadOnly.
RouteRandomly bool
// connection retry settings
MaxRetries int
MinRetryBackoff int
MaxRetryBackoff int
// connection r/w settings
DialTimeout int
ReadTimeout int
WriteTimeout int
// connection pool settings
PoolFIFO bool
PoolSize int // applies per cluster node and not for the whole cluster
PoolTimeout int
MinIdleConns int
MaxIdleConns int
MaxActiveConns int // applies per cluster node and not for the whole cluster
ConnMaxIdleTime int
ConnMaxLifetime int
DisableIndentity bool // Disable set-lib on connect. Default is false.
IdentitySuffix string // Add suffix to client name. Default is empty.
}
}
type Configuration struct {
Daemon Daemon `yaml:"daemon"`
Server Server `yaml:"server"`
Redis Redis `yaml:"redis"`
}

157
pkg/frontlas/repo/dao.go Normal file
View File

@@ -0,0 +1,157 @@
package repo
import (
"context"
"time"
"github.com/redis/go-redis/v9"
"github.com/singchia/frontier/pkg/frontlas/apis"
"github.com/singchia/frontier/pkg/frontlas/config"
"k8s.io/klog/v2"
)
const (
modeStandalone = iota
modeSentinel
modeCluster
)
type Dao struct {
mode int
rds *redis.Client
clusterrds *redis.ClusterClient
}
func newDao(config *config.Configuration) (*Dao, error) {
var (
rds *redis.Client
clusterrds *redis.ClusterClient
mode int
)
conf := config.Redis
switch conf.Mode {
case "standalone":
sconf := conf.Standalone
opt := &redis.Options{
Network: sconf.Network,
Addr: sconf.Addr,
ClientName: sconf.ClientName,
Protocol: sconf.Protocol,
Username: sconf.Username,
Password: sconf.Password,
DB: sconf.DB,
MaxRetries: sconf.MaxRetries,
MinRetryBackoff: time.Duration(sconf.MinRetryBackoff) * time.Second,
MaxRetryBackoff: time.Duration(sconf.MaxRetryBackoff) * time.Second,
DialTimeout: time.Duration(sconf.DialTimeout) * time.Second,
ReadTimeout: time.Duration(sconf.ReadTimeout) * time.Second,
WriteTimeout: time.Duration(sconf.WriteTimeout) * time.Second,
PoolFIFO: sconf.PoolFIFO,
PoolSize: sconf.PoolSize,
PoolTimeout: time.Duration(sconf.PoolTimeout) * time.Second,
MinIdleConns: sconf.MinIdleConns,
MaxIdleConns: sconf.MaxIdleConns,
ConnMaxIdleTime: time.Duration(sconf.ConnMaxIdleTime) * time.Second,
ConnMaxLifetime: time.Duration(sconf.ConnMaxLifetime) * time.Second,
DisableIndentity: sconf.DisableIndentity,
IdentitySuffix: sconf.IdentitySuffix,
}
rds = redis.NewClient(opt)
_, err := rds.Ping(context.TODO()).Result()
if err != nil {
klog.Errorf("redis standalone ping err: %s", err)
return nil, err
}
mode = modeStandalone
case "sentinel":
sconf := conf.Sentinel
opt := &redis.FailoverOptions{
MasterName: sconf.MasterName,
SentinelAddrs: sconf.Addrs,
Protocol: sconf.Protocol,
Username: sconf.Username,
Password: sconf.Password,
DB: sconf.DB,
ClientName: sconf.ClientName,
RouteByLatency: sconf.RouteByLatency,
RouteRandomly: sconf.RouteRandomly,
ReplicaOnly: sconf.ReplicaOnly,
UseDisconnectedReplicas: sconf.UseDisconnectedReplicas,
MinRetryBackoff: time.Duration(sconf.MinRetryBackoff) * time.Second,
MaxRetryBackoff: time.Duration(sconf.MaxRetryBackoff) * time.Second,
DialTimeout: time.Duration(sconf.DialTimeout) * time.Second,
ReadTimeout: time.Duration(sconf.ReadTimeout) * time.Second,
WriteTimeout: time.Duration(sconf.WriteTimeout) * time.Second,
PoolFIFO: sconf.PoolFIFO,
PoolSize: sconf.PoolSize,
PoolTimeout: time.Duration(sconf.PoolTimeout) * time.Second,
MinIdleConns: sconf.MinIdleConns,
MaxIdleConns: sconf.MaxIdleConns,
ConnMaxIdleTime: time.Duration(sconf.ConnMaxIdleTime) * time.Second,
ConnMaxLifetime: time.Duration(sconf.ConnMaxLifetime) * time.Second,
DisableIndentity: sconf.DisableIndentity,
IdentitySuffix: sconf.IdentitySuffix,
}
rds = redis.NewFailoverClient(opt)
_, err := rds.Ping(context.TODO()).Result()
if err != nil {
klog.Errorf("redis sentinel ping err: %s", err)
return nil, err
}
mode = modeSentinel
case "cluster":
cconf := conf.Cluster
opt := &redis.ClusterOptions{
Addrs: cconf.Addrs,
Protocol: cconf.Protocol,
Username: cconf.Username,
Password: cconf.Password,
ClientName: cconf.ClientName,
MaxRedirects: cconf.MaxRedirects,
RouteByLatency: cconf.RouteByLatency,
RouteRandomly: cconf.RouteRandomly,
MinRetryBackoff: time.Duration(cconf.MinRetryBackoff) * time.Second,
MaxRetryBackoff: time.Duration(cconf.MaxRetryBackoff) * time.Second,
DialTimeout: time.Duration(cconf.DialTimeout) * time.Second,
ReadTimeout: time.Duration(cconf.ReadTimeout) * time.Second,
WriteTimeout: time.Duration(cconf.WriteTimeout) * time.Second,
PoolFIFO: cconf.PoolFIFO,
PoolSize: cconf.PoolSize,
PoolTimeout: time.Duration(cconf.PoolTimeout) * time.Second,
MinIdleConns: cconf.MinIdleConns,
MaxIdleConns: cconf.MaxIdleConns,
ConnMaxIdleTime: time.Duration(cconf.ConnMaxIdleTime) * time.Second,
ConnMaxLifetime: time.Duration(cconf.ConnMaxLifetime) * time.Second,
DisableIndentity: cconf.DisableIndentity,
IdentitySuffix: cconf.IdentitySuffix,
}
clusterrds = redis.NewClusterClient(opt)
_, err := clusterrds.Ping(context.TODO()).Result()
if err != nil {
klog.Errorf("redis cluster ping err: %s", err)
return nil, err
}
mode = modeCluster
default:
return nil, apis.ErrUnsupportRedisServerMode
}
return &Dao{
rds: rds,
clusterrds: clusterrds,
mode: mode,
}, nil
}
func (dao *Dao) Close() error {
switch dao.mode {
case modeStandalone, modeSentinel:
return dao.rds.Close()
case modeCluster:
return dao.clusterrds.Close()
}
return nil
}

View File

@@ -0,0 +1,29 @@
package repo
import (
"context"
"strconv"
"k8s.io/klog/v2"
)
const (
edgesKey = "frontier:edges"
)
func (dao *Dao) GetAllEdgeIDs() ([]uint64, error) {
results, err := dao.rds.HGetAll(context.TODO(), edgesKey).Result()
if err != nil {
return nil, err
}
edgeIDs := []uint64{}
for k, _ := range results {
edgeID, err := strconv.ParseUint(k, 10, 64)
if err != nil {
klog.Errorf("dao get all edgeIDs err: %s", err)
return nil, err
}
edgeIDs = append(edgeIDs, edgeID)
}
return edgeIDs, nil
}

View File

@@ -0,0 +1 @@
package repo