frontlas: update redis model

This commit is contained in:
singchia
2024-05-16 15:13:08 +08:00
parent 8517dff9ac
commit 2f9ca84dbe
13 changed files with 291 additions and 192 deletions

View File

@@ -26,30 +26,36 @@ frontlas-linux:
.PHONY: examples .PHONY: examples
examples: examples:
make -C examples make -C examples
mv examples/iclm/bin/* ./bin/
# clean # clean
.PHONY: clean .PHONY: clean
clean: clean:
rm ./bin/frontier || true rm ./bin/* || true
rm ./bin/frontlas || true
make clean -C examples make clean -C examples
make clean -C test/bench make clean -C test/bench
# install # install
.PHONY: install-frontier .PHONY: install-frontier
install-frontier: install-frontier: frontier
install -m 0755 -d $(DESTDIR)$(BINDIR) install -m 0755 -d $(DESTDIR)$(BINDIR)
install -m 0755 -d $(DESTDIR)$(CONFDIR) install -m 0755 -d $(DESTDIR)$(CONFDIR)
install -m 0755 ./bin/frontier $(DESTDIR)$(BINDIR) install -m 0755 ./bin/frontier $(DESTDIR)$(BINDIR)
install -m 0755 ./etc/frontier.yaml $(DESTDIR)$(CONFDIR) install -m 0755 ./etc/frontier.yaml $(DESTDIR)$(CONFDIR)
.PHONY: install-frontlas .PHONY: install-frontlas
install-frontlas: install-frontlas: frontlas
install -m 0755 -d $(DESTDIR)$(BINDIR) install -m 0755 -d $(DESTDIR)$(BINDIR)
install -m 0755 -d $(DESTDIR)$(CONFDIR) install -m 0755 -d $(DESTDIR)$(CONFDIR)
install -m 0755 ./bin/frontlas $(DESTDIR)$(BINDIR) install -m 0755 ./bin/frontlas $(DESTDIR)$(BINDIR)
install -m 0755 ./etc/frontlas.yaml $(DESTDIR)$(CONFDIR) install -m 0755 ./etc/frontlas.yaml $(DESTDIR)$(CONFDIR)
.PHONY: install-example-iclm
install-example-iclm: examples
install -m 0755 -d $(DESTDIR)$(BINDIR)
install -m 0755 ./bin/iclm_service $(DESTDIR)$(BINDIR)
install -m 0755 ./bin/iclm_edge $(DESTDIR)$(BINDIR)
# image # image
.PHONY: image-frontier .PHONY: image-frontier
image-frontier: image-frontier:
@@ -67,6 +73,10 @@ image-gen-api:
image-gen-swagger: image-gen-swagger:
docker buildx build -t frontier-gen-swagger:${VERSION} -f images/Dockerfile.controlplane-swagger . docker buildx build -t frontier-gen-swagger:${VERSION} -f images/Dockerfile.controlplane-swagger .
.PHONY: image-example-iclm
image-example-iclm:
docker buildx build -t ${REGISTRY}/iclm_service:${VERSION} -f images/Dockerfile.example_iclm_service .
# push # push
.PHONY: push .PHONY: push
push: push-frontier push-frontlas push: push-frontier push-frontlas
@@ -79,6 +89,10 @@ push-frontier:
push-frontlas: push-frontlas:
docker push ${REGISTRY}/frontlas:${VERSION} docker push ${REGISTRY}/frontlas:${VERSION}
.PHONY: push-example-iclm
push-example-iclm:
docker push ${REGISTRY}/iclm_service:${VERSION}
# container # container
.PHONY: container .PHONY: container
container: container-frontier container-frontlas container: container-frontier container-frontlas

View File

@@ -243,10 +243,15 @@ func (service *clusterServiceEnd) newServiceEnd(addr string) (*serviceEnd, error
OptionServiceDelegate(service.serviceOption.delegate), OptionServiceDelegate(service.serviceOption.delegate),
OptionServiceName(service.serviceOption.service), OptionServiceName(service.serviceOption.service),
OptionServiceReceiveTopics(service.serviceOption.topics), OptionServiceReceiveTopics(service.serviceOption.topics),
OptionServiceTimer(service.serviceOption.tmr)) OptionServiceTimer(service.serviceOption.tmr),
OptionServiceID(service.serviceOption.serviceID))
if err != nil { if err != nil {
return nil, err return nil, err
} }
if service.serviceOption.serviceID == 0 {
// record serviceID for later using
service.serviceOption.serviceID = serviceEnd.ClientID()
}
go func() { go func() {
for { for {
st, err := serviceEnd.AcceptStream() st, err := serviceEnd.AcceptStream()

View File

@@ -29,6 +29,9 @@ func newServiceEnd(dialer client.Dialer, opts ...ServiceOption) (*serviceEnd, er
if sopt.logger != nil { if sopt.logger != nil {
sopts.SetLog(sopt.logger) sopts.SetLog(sopt.logger)
} }
if sopt.serviceID != 0 {
sopts.SetClientID(sopt.serviceID)
}
// meta // meta
meta := &apis.Meta{} meta := &apis.Meta{}
if sopt.topics != nil { if sopt.topics != nil {

View File

@@ -21,6 +21,7 @@ type serviceOption struct {
service string service string
// delegate to know online offline stuff // delegate to know online offline stuff
delegate Delegate delegate Delegate
serviceID uint64
} }
type ServiceOption func(*serviceOption) type ServiceOption func(*serviceOption)
@@ -56,3 +57,9 @@ func OptionServiceDelegate(delegate Delegate) ServiceOption {
opt.delegate = delegate opt.delegate = delegate
} }
} }
func OptionServiceID(serviceID uint64) ServiceOption {
return func(opt *serviceOption) {
opt.serviceID = serviceID
}
}

View File

@@ -1,3 +1,6 @@
PREFIX?=/usr
BINDIR?=$(PREFIX)/bin
GOHOSTOS?=$(shell go env GOHOSTOS) GOHOSTOS?=$(shell go env GOHOSTOS)
GOARCH?=$(shell go env GOARCH) GOARCH?=$(shell go env GOARCH)
@@ -8,10 +11,12 @@ all: iclm_service iclm_edge
clean: clean:
rm iclm_service iclm_edge rm iclm_service iclm_edge
.PHONY: iclm_service
iclm_service: service/*.go iclm_service: service/*.go
CGO_ENABLED=0 GOOS=$(GOHOSTOS) GOARCH=$(GOARCH) \ CGO_ENABLED=0 GOOS=$(GOHOSTOS) GOARCH=$(GOARCH) \
go build -trimpath -ldflags "-s -w" -o iclm_service service/*.go go build -trimpath -ldflags "-s -w" -o ./bin/iclm_service service/*.go
.PHONY: iclm_edge
iclm_edge: edge/*.go iclm_edge: edge/*.go
CGO_ENABLED=0 GOOS=$(GOHOSTOS) GOARCH=$(GOARCH) \ CGO_ENABLED=0 GOOS=$(GOHOSTOS) GOARCH=$(GOARCH) \
go build -trimpath -ldflags "-s -w" -o iclm_edge edge/*.go go build -trimpath -ldflags "-s -w" -o ./bin/iclm_edge edge/*.go

View File

@@ -18,6 +18,7 @@ import (
"time" "time"
armlog "github.com/jumboframes/armorigo/log" armlog "github.com/jumboframes/armorigo/log"
"github.com/jumboframes/armorigo/sigaction"
"github.com/singchia/frontier/api/dataplane/v1/service" "github.com/singchia/frontier/api/dataplane/v1/service"
"github.com/singchia/geminio" "github.com/singchia/geminio"
"github.com/spf13/pflag" "github.com/spf13/pflag"
@@ -33,6 +34,8 @@ var (
topicSlice []string topicSlice []string
printmessage *bool printmessage *bool
srv service.Service srv service.Service
sig *sigaction.Signal
nostdin *bool
labels map[string]int64 = map[string]int64{} labels map[string]int64 = map[string]int64{}
labelsMtx sync.RWMutex labelsMtx sync.RWMutex
@@ -79,6 +82,7 @@ func main() {
topics := pflag.String("topics", "", "topics to receive message, empty means without consuming") topics := pflag.String("topics", "", "topics to receive message, empty means without consuming")
methods := pflag.String("methods", "", "method name, support echo") methods := pflag.String("methods", "", "method name, support echo")
printmessage = pflag.Bool("printmessage", false, "whether print message out") printmessage = pflag.Bool("printmessage", false, "whether print message out")
nostdin = pflag.Bool("nostdin", false, "nostdin mode, no stdin will be accepted")
stats := pflag.Bool("stats", false, "print statistics or not") stats := pflag.Bool("stats", false, "print statistics or not")
pflag.Parse() pflag.Parse()
@@ -160,7 +164,7 @@ func main() {
} }
if err != nil { if err != nil {
fmt.Println("\n> receive err:", err) fmt.Println("\n> receive err:", err)
fmt.Print(">>> ") printPreempt()
continue continue
} }
msg.Done() msg.Done()
@@ -173,7 +177,7 @@ func main() {
} }
if *printmessage { if *printmessage {
fmt.Printf("\n> receive msg, edgeID: %d streamID: %d data: %s\n", msg.ClientID(), msg.StreamID(), string(value)) fmt.Printf("\n> receive msg, edgeID: %d streamID: %d data: %s\n", msg.ClientID(), msg.StreamID(), string(value))
fmt.Print(">>> ") printPreempt()
} }
} }
}() }()
@@ -189,14 +193,15 @@ func main() {
continue continue
} }
fmt.Println("\n> accept stream", st.ClientID(), st.StreamID()) fmt.Println("\n> accept stream", st.ClientID(), st.StreamID())
fmt.Print(">>> ") printPreempt()
sns.Store(strconv.FormatUint(st.StreamID(), 10), st) sns.Store(strconv.FormatUint(st.StreamID(), 10), st)
go handleStream(st) go handleStream(st)
} }
}() }()
if !*nostdin {
cursor := "1" cursor := "1"
fmt.Print(">>> ") printPreempt()
// the command-line protocol // the command-line protocol
// 1. close // 1. close
@@ -369,10 +374,13 @@ func main() {
if cursor != "1" { if cursor != "1" {
fmt.Printf("[%20s] >>> ", cursor) fmt.Printf("[%20s] >>> ", cursor)
} else { } else {
fmt.Print(">>> ") printPreempt()
}
} }
} }
sig = sigaction.NewSignal()
sig.Wait(context.TODO())
END: END:
time.Sleep(10 * time.Second) time.Sleep(10 * time.Second)
} }
@@ -383,7 +391,7 @@ func handleStream(stream geminio.Stream) {
msg, err := stream.Receive(context.TODO()) msg, err := stream.Receive(context.TODO())
if err != nil { if err != nil {
fmt.Printf("\n> streamID: %d receive err: %s\n", stream.StreamID(), err) fmt.Printf("\n> streamID: %d receive err: %s\n", stream.StreamID(), err)
fmt.Print(">>> ") printPreempt()
return return
} }
msg.Done() msg.Done()
@@ -396,7 +404,7 @@ func handleStream(stream geminio.Stream) {
} }
if *printmessage { if *printmessage {
fmt.Printf("\n> receive msg, edgeID: %d streamID: %d data: %s\n", msg.ClientID(), msg.StreamID(), string(value)) fmt.Printf("\n> receive msg, edgeID: %d streamID: %d data: %s\n", msg.ClientID(), msg.StreamID(), string(value))
fmt.Print(">>> ") printPreempt()
} }
} }
}() }()
@@ -406,12 +414,12 @@ func handleStream(stream geminio.Stream) {
_, err := stream.Read(data) _, err := stream.Read(data)
if err != nil { if err != nil {
fmt.Printf("\n> streamID: %d read err: %s\n", stream.StreamID(), err) fmt.Printf("\n> streamID: %d read err: %s\n", stream.StreamID(), err)
fmt.Print(">>> ") printPreempt()
return return
} }
fmt.Println("> read data:", stream.ClientID(), fmt.Println("> read data:", stream.ClientID(),
string(data)) string(data))
fmt.Print(">>> ") printPreempt()
} }
}() }()
go func() { go func() {
@@ -452,14 +460,14 @@ func getID(meta []byte) (uint64, error) {
func online(edgeID uint64, meta []byte, addr net.Addr) error { func online(edgeID uint64, meta []byte, addr net.Addr) error {
fmt.Printf("\n> online, edgeID: %d, addr: %s\n", edgeID, addr.String()) fmt.Printf("\n> online, edgeID: %d, addr: %s\n", edgeID, addr.String())
fmt.Print(">>> ") printPreempt()
edges.Store(edgeID, struct{}{}) edges.Store(edgeID, struct{}{})
return nil return nil
} }
func offline(edgeID uint64, meta []byte, addr net.Addr) error { func offline(edgeID uint64, meta []byte, addr net.Addr) error {
fmt.Printf("\n> offline, edgeID: %d, addr: %s\n", edgeID, addr.String()) fmt.Printf("\n> offline, edgeID: %d, addr: %s\n", edgeID, addr.String())
fmt.Print(">>> ") printPreempt()
edges.Delete(edgeID) edges.Delete(edgeID)
return nil return nil
} }
@@ -474,7 +482,13 @@ func echo(ctx context.Context, req geminio.Request, rsp geminio.Response) {
} }
if *printmessage { if *printmessage {
fmt.Printf("\n> rpc called, method: %s edgeID: %d streamID: %d data: %s\n", "echo", req.ClientID(), req.StreamID(), string(value)) fmt.Printf("\n> rpc called, method: %s edgeID: %d streamID: %d data: %s\n", "echo", req.ClientID(), req.StreamID(), string(value))
fmt.Print(">>> ") printPreempt()
} }
rsp.SetData(value) rsp.SetData(value)
} }
func printPreempt() {
if !*nostdin {
printPreempt()
}
}

View File

@@ -170,7 +170,7 @@ func (fm *FrontierManager) ServiceOffline(ctx context.Context, req geminio.Reque
rsp.SetError(err) rsp.SetError(err)
return return
} }
err = fm.repo.DeleteService(serviceOffline.ServiceID) err = fm.repo.DeleteService(serviceOffline.ServiceID, serviceOffline.FrontierID)
if err != nil { if err != nil {
klog.Errorf("frontier manager service offline, delete service err: %s, serviceID: %d", err, serviceOffline.ServiceID) klog.Errorf("frontier manager service offline, delete service err: %s, serviceID: %d", err, serviceOffline.ServiceID)
rsp.SetError(err) rsp.SetError(err)

View File

@@ -19,7 +19,10 @@ const (
) )
//go:embed lua/service_delete.lua //go:embed lua/service_delete.lua
var deleteFrontierScript string var deleteServiceScript string
//go:embed lua/service_create.lua
var createServiceScript string
func (dao *Dao) GetAllServiceIDs() ([]uint64, error) { func (dao *Dao) GetAllServiceIDs() ([]uint64, error) {
results, err := dao.rds.Keys(context.TODO(), frontiersKeyPrefixAll).Result() results, err := dao.rds.Keys(context.TODO(), frontiersKeyPrefixAll).Result()
@@ -133,17 +136,15 @@ func (dao *Dao) SetServiceAndAlive(serviceID uint64, service *Service, expiratio
klog.Errorf("dao set service and alive, json marshal err: %s", err) klog.Errorf("dao set service and alive, json marshal err: %s", err)
return err return err
} }
_, err = dao.rds.Eval(context.TODO(), createServiceScript,
pipeliner := dao.rds.TxPipeline() []string{
// service meta TODO expiration to custom getServiceKey(serviceID),
pipeliner.Set(context.TODO(), getServiceKey(serviceID), serviceData, service.FrontierID,
time.Duration(dao.conf.FrontierManager.Expiration.ServiceMeta)*time.Second) getAliveServiceKey(serviceID),
// alive getFrontierKey(service.FrontierID)},
pipeliner.Set(context.TODO(), getAliveServiceKey(serviceID), 1, expiration) serviceData,
// frontier service_count time.Duration(dao.conf.FrontierManager.Expiration.ServiceMeta),
pipeliner.HIncrBy(context.TODO(), getFrontierKey(service.FrontierID), "service_count", 1) int(expiration.Seconds())).Result()
_, err = pipeliner.Exec(context.TODO())
if err != nil { if err != nil {
klog.Errorf("dao set service and alive, pipeliner exec err: %s", err) klog.Errorf("dao set service and alive, pipeliner exec err: %s", err)
return err return err
@@ -172,9 +173,13 @@ func (dao *Dao) ExpireService(serviceID uint64, expiration time.Duration) error
return nil return nil
} }
func (dao *Dao) DeleteService(serviceID uint64) error { func (dao *Dao) DeleteService(serviceID uint64, frontierID string) error {
_, err := dao.rds.Eval(context.TODO(), deleteFrontierScript, _, err := dao.rds.Eval(context.TODO(), deleteServiceScript,
[]string{getServiceKey(serviceID), getAliveServiceKey(serviceID), frontiersKeyPrefix}).Result() []string{
getServiceKey(serviceID),
frontierID,
getAliveServiceKey(serviceID),
frontiersKeyPrefix}).Result()
if err != nil { if err != nil {
klog.Errorf("dao delete service, eval err: %s", err) klog.Errorf("dao delete service, eval err: %s", err)
return err return err

View File

@@ -1,19 +1,20 @@
local service_key = KEYS[1] local service_key = KEYS[1]
local service_alive_key = KEYS[2] local frontier_id = KEYS[2]
local frontier_key_prefix = KEYS[3] local service_alive_key = KEYS[3]
local frontier_key_prefix = KEYS[4]
-- get frontier and it's frontier_id -- decrement the frontier_count in frontier
local frontier = redis.call("GET", service_key) local frontier_key = frontier_key_prefix .. tostring(frontier_id)
if frontier then redis.call("HINCRBY", frontier_key, "service_count", -1)
local value = cjson.decode(frontier)
local frontier_id = value['frontier_id'] -- remove service side frontier
if frontier_id then redis.call("HDEL", service_key, frontier_id)
-- decrement the frontier_count in frontier
local frontier_key = frontier_key_prefix .. tostring(frontier_id)
redis.call("HINCRBY", frontier_key, "service_count", -1)
end
end
-- remove frontier alive -- remove frontier alive
local ret = redis.call("HLEN", service_key)
if ret ~= 0 then
return 0
end
-- service offline all frontiers
local ret = redis.call("DEL", service_alive_key) local ret = redis.call("DEL", service_alive_key)
return ret return ret

View File

@@ -1,5 +1,10 @@
package repo package repo
import (
"bytes"
"fmt"
)
// key: serviceID; value: Service // key: serviceID; value: Service
type Service struct { type Service struct {
Service string `json:"service"` Service string `json:"service"`
@@ -7,3 +12,25 @@ type Service struct {
Addr string `json:"addr"` Addr string `json:"addr"`
UpdateTime int64 `json:"update_time"` UpdateTime int64 `json:"update_time"`
} }
func (service *Service) MarshalJSON() ([]byte, error) {
buffer := bytes.NewBufferString("{")
_, err := buffer.WriteString(fmt.Sprintf("service: %s, ", service.Service))
if err != nil {
return nil, err
}
_, err = buffer.WriteString(fmt.Sprintf("frontierID: %s, ", service.FrontierID))
if err != nil {
return nil, err
}
_, err = buffer.WriteString(fmt.Sprintf("addr: %s, ", service.Addr))
if err != nil {
return nil, err
}
_, err = buffer.WriteString(fmt.Sprintf("update_time: %d", service.UpdateTime))
if err != nil {
return nil, err
}
buffer.WriteString("}")
return buffer.Bytes(), nil
}

View File

@@ -2,9 +2,23 @@ apiVersion: frontier.singchia.io/v1alpha1
kind: FrontierCluster kind: FrontierCluster
metadata: metadata:
labels: labels:
app.kubernetes.io/name: frontier app.kubernetes.io/name: frontiercluster
app.kubernetes.io/managed-by: kustomize app.kubernetes.io/managed-by: kustomize
name: frontiercluster-sample name: frontiercluster
spec: spec:
# TODO(user): Add fields here frontier:
replica: "1" replicas: 1
servicebound:
port: 30011
edgebound:
port: 30012
frontlas:
replicas: 1
controlplane:
port: 40011
redis:
addrs:
- rfs-redisfailover:26379
password: your-password
masterName: mymaster
redisType: sentinel

View File

@@ -5,12 +5,16 @@ go 1.21
require ( require (
github.com/onsi/ginkgo/v2 v2.14.0 github.com/onsi/ginkgo/v2 v2.14.0
github.com/onsi/gomega v1.30.0 github.com/onsi/gomega v1.30.0
github.com/stretchr/testify v1.8.4
k8s.io/apimachinery v0.29.0 k8s.io/apimachinery v0.29.0
k8s.io/client-go v0.29.0 k8s.io/client-go v0.29.0
sigs.k8s.io/controller-runtime v0.17.2 sigs.k8s.io/controller-runtime v0.17.2
) )
require github.com/hashicorp/errwrap v1.0.0 // indirect require (
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
)
require ( require (
github.com/beorn7/perks v1.0.1 // indirect github.com/beorn7/perks v1.0.1 // indirect

View File

@@ -269,7 +269,7 @@ func (r *FrontierClusterReconciler) ensureFrontlasDeployment(ctx context.Context
SetServiceName(service). SetServiceName(service).
SetLabels(labels). SetLabels(labels).
SetMatchLabels(labels). SetMatchLabels(labels).
SetReplicas(fc.FrontierReplicas()). SetReplicas(fc.FrontlasReplicas()).
SetPodTemplateSpec(podTemplateSpec). SetPodTemplateSpec(podTemplateSpec).
SetOwnerReference(fc.GetOwnerReferences()). SetOwnerReference(fc.GetOwnerReferences()).
Build() Build()