From 2f9ca84dbec438c8966c4ed0741e5744f81ed92d Mon Sep 17 00:00:00 2001 From: singchia Date: Thu, 16 May 2024 15:13:08 +0800 Subject: [PATCH] frontlas: update redis model --- Makefile | 22 +- .../v1/service/cluster_service_end.go | 7 +- api/dataplane/v1/service/service_end.go | 3 + api/dataplane/v1/service/service_option.go | 9 +- examples/iclm/Makefile | 9 +- examples/iclm/service/service.go | 312 +++++++++--------- pkg/frontlas/frontierbound/frontier_event.go | 2 +- pkg/frontlas/repo/dao_service.go | 35 +- pkg/frontlas/repo/lua/service_delete.lua | 27 +- pkg/frontlas/repo/model_service.go | 27 ++ .../frontier_v1alpha1_frontiercluster.yaml | 22 +- pkg/operator/go.mod | 6 +- .../controller/frontiercluster_deployment.go | 2 +- 13 files changed, 291 insertions(+), 192 deletions(-) diff --git a/Makefile b/Makefile index d6c4a84..30de139 100644 --- a/Makefile +++ b/Makefile @@ -26,30 +26,36 @@ frontlas-linux: .PHONY: examples examples: make -C examples + mv examples/iclm/bin/* ./bin/ # clean .PHONY: clean clean: - rm ./bin/frontier || true - rm ./bin/frontlas || true + rm ./bin/* || true make clean -C examples make clean -C test/bench # install .PHONY: install-frontier -install-frontier: +install-frontier: frontier install -m 0755 -d $(DESTDIR)$(BINDIR) install -m 0755 -d $(DESTDIR)$(CONFDIR) install -m 0755 ./bin/frontier $(DESTDIR)$(BINDIR) install -m 0755 ./etc/frontier.yaml $(DESTDIR)$(CONFDIR) .PHONY: install-frontlas -install-frontlas: +install-frontlas: frontlas install -m 0755 -d $(DESTDIR)$(BINDIR) install -m 0755 -d $(DESTDIR)$(CONFDIR) install -m 0755 ./bin/frontlas $(DESTDIR)$(BINDIR) install -m 0755 ./etc/frontlas.yaml $(DESTDIR)$(CONFDIR) +.PHONY: install-example-iclm +install-example-iclm: examples + install -m 0755 -d $(DESTDIR)$(BINDIR) + install -m 0755 ./bin/iclm_service $(DESTDIR)$(BINDIR) + install -m 0755 ./bin/iclm_edge $(DESTDIR)$(BINDIR) + # image .PHONY: image-frontier image-frontier: @@ -67,6 +73,10 @@ image-gen-api: image-gen-swagger: docker buildx build -t frontier-gen-swagger:${VERSION} -f images/Dockerfile.controlplane-swagger . +.PHONY: image-example-iclm +image-example-iclm: + docker buildx build -t ${REGISTRY}/iclm_service:${VERSION} -f images/Dockerfile.example_iclm_service . + # push .PHONY: push push: push-frontier push-frontlas @@ -79,6 +89,10 @@ push-frontier: push-frontlas: docker push ${REGISTRY}/frontlas:${VERSION} +.PHONY: push-example-iclm +push-example-iclm: + docker push ${REGISTRY}/iclm_service:${VERSION} + # container .PHONY: container container: container-frontier container-frontlas diff --git a/api/dataplane/v1/service/cluster_service_end.go b/api/dataplane/v1/service/cluster_service_end.go index a02acbb..da29ae9 100644 --- a/api/dataplane/v1/service/cluster_service_end.go +++ b/api/dataplane/v1/service/cluster_service_end.go @@ -243,10 +243,15 @@ func (service *clusterServiceEnd) newServiceEnd(addr string) (*serviceEnd, error OptionServiceDelegate(service.serviceOption.delegate), OptionServiceName(service.serviceOption.service), OptionServiceReceiveTopics(service.serviceOption.topics), - OptionServiceTimer(service.serviceOption.tmr)) + OptionServiceTimer(service.serviceOption.tmr), + OptionServiceID(service.serviceOption.serviceID)) if err != nil { return nil, err } + if service.serviceOption.serviceID == 0 { + // record serviceID for later using + service.serviceOption.serviceID = serviceEnd.ClientID() + } go func() { for { st, err := serviceEnd.AcceptStream() diff --git a/api/dataplane/v1/service/service_end.go b/api/dataplane/v1/service/service_end.go index ea10490..8ac0542 100644 --- a/api/dataplane/v1/service/service_end.go +++ b/api/dataplane/v1/service/service_end.go @@ -29,6 +29,9 @@ func newServiceEnd(dialer client.Dialer, opts ...ServiceOption) (*serviceEnd, er if sopt.logger != nil { sopts.SetLog(sopt.logger) } + if sopt.serviceID != 0 { + sopts.SetClientID(sopt.serviceID) + } // meta meta := &apis.Meta{} if sopt.topics != nil { diff --git a/api/dataplane/v1/service/service_option.go b/api/dataplane/v1/service/service_option.go index 5184e10..f609372 100644 --- a/api/dataplane/v1/service/service_option.go +++ b/api/dataplane/v1/service/service_option.go @@ -20,7 +20,8 @@ type serviceOption struct { // to tell frontier what service we are service string // delegate to know online offline stuff - delegate Delegate + delegate Delegate + serviceID uint64 } type ServiceOption func(*serviceOption) @@ -56,3 +57,9 @@ func OptionServiceDelegate(delegate Delegate) ServiceOption { opt.delegate = delegate } } + +func OptionServiceID(serviceID uint64) ServiceOption { + return func(opt *serviceOption) { + opt.serviceID = serviceID + } +} diff --git a/examples/iclm/Makefile b/examples/iclm/Makefile index 0ad7001..32a0e5a 100644 --- a/examples/iclm/Makefile +++ b/examples/iclm/Makefile @@ -1,3 +1,6 @@ +PREFIX?=/usr +BINDIR?=$(PREFIX)/bin + GOHOSTOS?=$(shell go env GOHOSTOS) GOARCH?=$(shell go env GOARCH) @@ -8,10 +11,12 @@ all: iclm_service iclm_edge clean: rm iclm_service iclm_edge +.PHONY: iclm_service iclm_service: service/*.go CGO_ENABLED=0 GOOS=$(GOHOSTOS) GOARCH=$(GOARCH) \ - go build -trimpath -ldflags "-s -w" -o iclm_service service/*.go + go build -trimpath -ldflags "-s -w" -o ./bin/iclm_service service/*.go +.PHONY: iclm_edge iclm_edge: edge/*.go CGO_ENABLED=0 GOOS=$(GOHOSTOS) GOARCH=$(GOARCH) \ - go build -trimpath -ldflags "-s -w" -o iclm_edge edge/*.go \ No newline at end of file + go build -trimpath -ldflags "-s -w" -o ./bin/iclm_edge edge/*.go \ No newline at end of file diff --git a/examples/iclm/service/service.go b/examples/iclm/service/service.go index b6b8341..b863b1e 100644 --- a/examples/iclm/service/service.go +++ b/examples/iclm/service/service.go @@ -18,6 +18,7 @@ import ( "time" armlog "github.com/jumboframes/armorigo/log" + "github.com/jumboframes/armorigo/sigaction" "github.com/singchia/frontier/api/dataplane/v1/service" "github.com/singchia/geminio" "github.com/spf13/pflag" @@ -33,6 +34,8 @@ var ( topicSlice []string printmessage *bool srv service.Service + sig *sigaction.Signal + nostdin *bool labels map[string]int64 = map[string]int64{} labelsMtx sync.RWMutex @@ -79,6 +82,7 @@ func main() { topics := pflag.String("topics", "", "topics to receive message, empty means without consuming") methods := pflag.String("methods", "", "method name, support echo") printmessage = pflag.Bool("printmessage", false, "whether print message out") + nostdin = pflag.Bool("nostdin", false, "nostdin mode, no stdin will be accepted") stats := pflag.Bool("stats", false, "print statistics or not") pflag.Parse() @@ -160,7 +164,7 @@ func main() { } if err != nil { fmt.Println("\n> receive err:", err) - fmt.Print(">>> ") + printPreempt() continue } msg.Done() @@ -173,7 +177,7 @@ func main() { } if *printmessage { fmt.Printf("\n> receive msg, edgeID: %d streamID: %d data: %s\n", msg.ClientID(), msg.StreamID(), string(value)) - fmt.Print(">>> ") + printPreempt() } } }() @@ -189,33 +193,34 @@ func main() { continue } fmt.Println("\n> accept stream", st.ClientID(), st.StreamID()) - fmt.Print(">>> ") + printPreempt() sns.Store(strconv.FormatUint(st.StreamID(), 10), st) go handleStream(st) } }() - cursor := "1" - fmt.Print(">>> ") + if !*nostdin { + cursor := "1" + printPreempt() - // the command-line protocol - // 1. close - // 2. quit - // 3. open {edgeID} - // 4. close {streamID} - // 5. switch {streamID} - // 6. publish {msg} #note to switch to stream first - // 7. publish {edgeID} {msg} - // 8. call {method} {req} #note to switch to stream first - // 9. call {edgeID} {method} {req} - scanner := bufio.NewScanner(os.Stdin) - for scanner.Scan() { - text := scanner.Text() - parts := strings.Split(text, " ") - switch len(parts) { - case 1: - if parts[0] == "help" { - fmt.Println(`the command-line protocol + // the command-line protocol + // 1. close + // 2. quit + // 3. open {edgeID} + // 4. close {streamID} + // 5. switch {streamID} + // 6. publish {msg} #note to switch to stream first + // 7. publish {edgeID} {msg} + // 8. call {method} {req} #note to switch to stream first + // 9. call {edgeID} {method} {req} + scanner := bufio.NewScanner(os.Stdin) + for scanner.Scan() { + text := scanner.Text() + parts := strings.Split(text, " ") + switch len(parts) { + case 1: + if parts[0] == "help" { + fmt.Println(`the command-line protocol 1. close 2. quit 3. open {edgeID} @@ -225,154 +230,157 @@ func main() { 7. publish {clientId} {msg} 8. call {method} {req} #note to switch to stream first 9. call {clientId} {method} {req}`) - goto NEXT - } - // 1. close - if parts[0] == "close" || parts[0] == "quit" { - srv.Close() - goto END - } - if parts[0] == "count" { - count := 0 - edges.Range(func(key, value interface{}) bool { - count++ - return true - }) - fmt.Println("> count:", count) - goto NEXT - } - case 2: - // 1. open {edgeID} - // 2. close {streamID} - // 3. switch {streamID} - // 4. publish {msg} - if parts[0] == "open" { - edgeID, err := strconv.ParseUint(parts[1], 10, 64) - if err != nil { - fmt.Println("> illegal edgeID", err, parts[1]) goto NEXT } - // 1. open edgeID - st, err := srv.OpenStream(context.TODO(), edgeID) - if err != nil { - fmt.Println("> open stream err", err) + // 1. close + if parts[0] == "close" || parts[0] == "quit" { + srv.Close() + goto END + } + if parts[0] == "count" { + count := 0 + edges.Range(func(key, value interface{}) bool { + count++ + return true + }) + fmt.Println("> count:", count) goto NEXT } - fmt.Println("> open stream success:", edgeID, st.StreamID()) - sns.Store(strconv.FormatUint(st.StreamID(), 10), st) - go handleStream(st) - goto NEXT - } - if parts[0] == "close" { - stream := parts[1] - sn, ok := sns.LoadAndDelete(stream) - if !ok { - fmt.Printf("> stream id: %s not found\n", stream) + case 2: + // 1. open {edgeID} + // 2. close {streamID} + // 3. switch {streamID} + // 4. publish {msg} + if parts[0] == "open" { + edgeID, err := strconv.ParseUint(parts[1], 10, 64) + if err != nil { + fmt.Println("> illegal edgeID", err, parts[1]) + goto NEXT + } + // 1. open edgeID + st, err := srv.OpenStream(context.TODO(), edgeID) + if err != nil { + fmt.Println("> open stream err", err) + goto NEXT + } + fmt.Println("> open stream success:", edgeID, st.StreamID()) + sns.Store(strconv.FormatUint(st.StreamID(), 10), st) + go handleStream(st) goto NEXT } - sn.(geminio.Stream).Close() - fmt.Println("> close stream success:", stream) - goto NEXT - } - if parts[0] == "switch" { - session := parts[1] - if session == "1" { + if parts[0] == "close" { + stream := parts[1] + sn, ok := sns.LoadAndDelete(stream) + if !ok { + fmt.Printf("> stream id: %s not found\n", stream) + goto NEXT + } + sn.(geminio.Stream).Close() + fmt.Println("> close stream success:", stream) + goto NEXT + } + if parts[0] == "switch" { + session := parts[1] + if session == "1" { + cursor = session + fmt.Println("> swith stream success:", session) + goto NEXT + } + _, ok := sns.Load(session) + if !ok { + fmt.Println("> swith stream failed, not found:", session) + goto NEXT + } cursor = session fmt.Println("> swith stream success:", session) goto NEXT } - _, ok := sns.Load(session) - if !ok { - fmt.Println("> swith stream failed, not found:", session) - goto NEXT - } - cursor = session - fmt.Println("> swith stream success:", session) - goto NEXT - } - if cursor != "1" && (parts[0] == "publish") { - sn, ok := sns.Load(cursor) - if !ok { - fmt.Printf("> stream: %s not found\n", cursor) - goto NEXT - } - stream := sn.(geminio.Stream) + if cursor != "1" && (parts[0] == "publish") { + sn, ok := sns.Load(cursor) + if !ok { + fmt.Printf("> stream: %s not found\n", cursor) + goto NEXT + } + stream := sn.(geminio.Stream) + if parts[0] == "publish" { + msg := stream.NewMessage([]byte(parts[1])) + err := stream.Publish(context.TODO(), msg) + if err != nil { + fmt.Println("> publish err:", err) + goto NEXT + } + fmt.Println("> publish success") + goto NEXT + } + } + case 3: + // 1. publish {edgeID} {msg} + // 2. call {method} {req} if switch to stream + if cursor != "1" { + // in stream + sn, ok := sns.Load(cursor) + if !ok { + fmt.Printf("> stream: %s not found\n", cursor) + goto NEXT + } + stream := sn.(geminio.Stream) + if parts[0] == "call" { + req := stream.NewRequest([]byte(parts[2])) + rsp, err := stream.Call(context.TODO(), string(parts[1]), req) + if err != nil { + fmt.Println("> call err:", err) + goto NEXT + } + fmt.Println("\n> call success, ret:", string(rsp.Data())) + goto NEXT + } + } if parts[0] == "publish" { - msg := stream.NewMessage([]byte(parts[1])) - err := stream.Publish(context.TODO(), msg) + edgeID, err := strconv.ParseUint(parts[1], 10, 64) if err != nil { - fmt.Println("> publish err:", err) + log.Println("> illegal edge id", err, parts[1]) + goto NEXT + } + msg := srv.NewMessage([]byte(parts[2])) + err = srv.Publish(context.TODO(), edgeID, msg) + if err != nil { + log.Println("> publish err:", err) goto NEXT } fmt.Println("> publish success") goto NEXT } - } - case 3: - // 1. publish {edgeID} {msg} - // 2. call {method} {req} if switch to stream - if cursor != "1" { - // in stream - sn, ok := sns.Load(cursor) - if !ok { - fmt.Printf("> stream: %s not found\n", cursor) - goto NEXT - } - stream := sn.(geminio.Stream) + case 4: + // call {edgeID} {method} {req} if parts[0] == "call" { - req := stream.NewRequest([]byte(parts[2])) - rsp, err := stream.Call(context.TODO(), string(parts[1]), req) + edgeID, err := strconv.ParseUint(parts[1], 10, 64) if err != nil { - fmt.Println("> call err:", err) + log.Println("> illegal edge id", err, parts[1]) goto NEXT } - fmt.Println("\n> call success, ret:", string(rsp.Data())) + req := srv.NewRequest([]byte(parts[3])) + rsp, err := srv.Call(context.TODO(), edgeID, parts[2], req) + if err != nil { + log.Println("> call err:", err) + goto NEXT + } + log.Println("> call success, ret:", string(rsp.Data())) goto NEXT } } - if parts[0] == "publish" { - edgeID, err := strconv.ParseUint(parts[1], 10, 64) - if err != nil { - log.Println("> illegal edge id", err, parts[1]) - goto NEXT - } - msg := srv.NewMessage([]byte(parts[2])) - err = srv.Publish(context.TODO(), edgeID, msg) - if err != nil { - log.Println("> publish err:", err) - goto NEXT - } - fmt.Println("> publish success") - goto NEXT + log.Println("illegal operation") + NEXT: + if cursor != "1" { + fmt.Printf("[%20s] >>> ", cursor) + } else { + printPreempt() } - case 4: - // call {edgeID} {method} {req} - if parts[0] == "call" { - edgeID, err := strconv.ParseUint(parts[1], 10, 64) - if err != nil { - log.Println("> illegal edge id", err, parts[1]) - goto NEXT - } - req := srv.NewRequest([]byte(parts[3])) - rsp, err := srv.Call(context.TODO(), edgeID, parts[2], req) - if err != nil { - log.Println("> call err:", err) - goto NEXT - } - log.Println("> call success, ret:", string(rsp.Data())) - goto NEXT - } - } - log.Println("illegal operation") - NEXT: - if cursor != "1" { - fmt.Printf("[%20s] >>> ", cursor) - } else { - fmt.Print(">>> ") } } + sig = sigaction.NewSignal() + sig.Wait(context.TODO()) END: time.Sleep(10 * time.Second) } @@ -383,7 +391,7 @@ func handleStream(stream geminio.Stream) { msg, err := stream.Receive(context.TODO()) if err != nil { fmt.Printf("\n> streamID: %d receive err: %s\n", stream.StreamID(), err) - fmt.Print(">>> ") + printPreempt() return } msg.Done() @@ -396,7 +404,7 @@ func handleStream(stream geminio.Stream) { } if *printmessage { fmt.Printf("\n> receive msg, edgeID: %d streamID: %d data: %s\n", msg.ClientID(), msg.StreamID(), string(value)) - fmt.Print(">>> ") + printPreempt() } } }() @@ -406,12 +414,12 @@ func handleStream(stream geminio.Stream) { _, err := stream.Read(data) if err != nil { fmt.Printf("\n> streamID: %d read err: %s\n", stream.StreamID(), err) - fmt.Print(">>> ") + printPreempt() return } fmt.Println("> read data:", stream.ClientID(), string(data)) - fmt.Print(">>> ") + printPreempt() } }() go func() { @@ -452,14 +460,14 @@ func getID(meta []byte) (uint64, error) { func online(edgeID uint64, meta []byte, addr net.Addr) error { fmt.Printf("\n> online, edgeID: %d, addr: %s\n", edgeID, addr.String()) - fmt.Print(">>> ") + printPreempt() edges.Store(edgeID, struct{}{}) return nil } func offline(edgeID uint64, meta []byte, addr net.Addr) error { fmt.Printf("\n> offline, edgeID: %d, addr: %s\n", edgeID, addr.String()) - fmt.Print(">>> ") + printPreempt() edges.Delete(edgeID) return nil } @@ -474,7 +482,13 @@ func echo(ctx context.Context, req geminio.Request, rsp geminio.Response) { } if *printmessage { fmt.Printf("\n> rpc called, method: %s edgeID: %d streamID: %d data: %s\n", "echo", req.ClientID(), req.StreamID(), string(value)) - fmt.Print(">>> ") + printPreempt() } rsp.SetData(value) } + +func printPreempt() { + if !*nostdin { + printPreempt() + } +} diff --git a/pkg/frontlas/frontierbound/frontier_event.go b/pkg/frontlas/frontierbound/frontier_event.go index 1110477..2fcc993 100644 --- a/pkg/frontlas/frontierbound/frontier_event.go +++ b/pkg/frontlas/frontierbound/frontier_event.go @@ -170,7 +170,7 @@ func (fm *FrontierManager) ServiceOffline(ctx context.Context, req geminio.Reque rsp.SetError(err) return } - err = fm.repo.DeleteService(serviceOffline.ServiceID) + err = fm.repo.DeleteService(serviceOffline.ServiceID, serviceOffline.FrontierID) if err != nil { klog.Errorf("frontier manager service offline, delete service err: %s, serviceID: %d", err, serviceOffline.ServiceID) rsp.SetError(err) diff --git a/pkg/frontlas/repo/dao_service.go b/pkg/frontlas/repo/dao_service.go index b531dd5..1799f6a 100644 --- a/pkg/frontlas/repo/dao_service.go +++ b/pkg/frontlas/repo/dao_service.go @@ -19,7 +19,10 @@ const ( ) //go:embed lua/service_delete.lua -var deleteFrontierScript string +var deleteServiceScript string + +//go:embed lua/service_create.lua +var createServiceScript string func (dao *Dao) GetAllServiceIDs() ([]uint64, error) { results, err := dao.rds.Keys(context.TODO(), frontiersKeyPrefixAll).Result() @@ -133,17 +136,15 @@ func (dao *Dao) SetServiceAndAlive(serviceID uint64, service *Service, expiratio klog.Errorf("dao set service and alive, json marshal err: %s", err) return err } - - pipeliner := dao.rds.TxPipeline() - // service meta TODO expiration to custom - pipeliner.Set(context.TODO(), getServiceKey(serviceID), serviceData, - time.Duration(dao.conf.FrontierManager.Expiration.ServiceMeta)*time.Second) - // alive - pipeliner.Set(context.TODO(), getAliveServiceKey(serviceID), 1, expiration) - // frontier service_count - pipeliner.HIncrBy(context.TODO(), getFrontierKey(service.FrontierID), "service_count", 1) - - _, err = pipeliner.Exec(context.TODO()) + _, err = dao.rds.Eval(context.TODO(), createServiceScript, + []string{ + getServiceKey(serviceID), + service.FrontierID, + getAliveServiceKey(serviceID), + getFrontierKey(service.FrontierID)}, + serviceData, + time.Duration(dao.conf.FrontierManager.Expiration.ServiceMeta), + int(expiration.Seconds())).Result() if err != nil { klog.Errorf("dao set service and alive, pipeliner exec err: %s", err) return err @@ -172,9 +173,13 @@ func (dao *Dao) ExpireService(serviceID uint64, expiration time.Duration) error return nil } -func (dao *Dao) DeleteService(serviceID uint64) error { - _, err := dao.rds.Eval(context.TODO(), deleteFrontierScript, - []string{getServiceKey(serviceID), getAliveServiceKey(serviceID), frontiersKeyPrefix}).Result() +func (dao *Dao) DeleteService(serviceID uint64, frontierID string) error { + _, err := dao.rds.Eval(context.TODO(), deleteServiceScript, + []string{ + getServiceKey(serviceID), + frontierID, + getAliveServiceKey(serviceID), + frontiersKeyPrefix}).Result() if err != nil { klog.Errorf("dao delete service, eval err: %s", err) return err diff --git a/pkg/frontlas/repo/lua/service_delete.lua b/pkg/frontlas/repo/lua/service_delete.lua index 7f41bd9..e97f50e 100644 --- a/pkg/frontlas/repo/lua/service_delete.lua +++ b/pkg/frontlas/repo/lua/service_delete.lua @@ -1,19 +1,20 @@ local service_key = KEYS[1] -local service_alive_key = KEYS[2] -local frontier_key_prefix = KEYS[3] +local frontier_id = KEYS[2] +local service_alive_key = KEYS[3] +local frontier_key_prefix = KEYS[4] --- get frontier and it's frontier_id -local frontier = redis.call("GET", service_key) -if frontier then - local value = cjson.decode(frontier) - local frontier_id = value['frontier_id'] - if frontier_id then - -- decrement the frontier_count in frontier - local frontier_key = frontier_key_prefix .. tostring(frontier_id) - redis.call("HINCRBY", frontier_key, "service_count", -1) - end -end +-- decrement the frontier_count in frontier +local frontier_key = frontier_key_prefix .. tostring(frontier_id) +redis.call("HINCRBY", frontier_key, "service_count", -1) + +-- remove service side frontier +redis.call("HDEL", service_key, frontier_id) -- remove frontier alive +local ret = redis.call("HLEN", service_key) +if ret ~= 0 then + return 0 +end +-- service offline all frontiers local ret = redis.call("DEL", service_alive_key) return ret \ No newline at end of file diff --git a/pkg/frontlas/repo/model_service.go b/pkg/frontlas/repo/model_service.go index b1f9e97..ec65cd5 100644 --- a/pkg/frontlas/repo/model_service.go +++ b/pkg/frontlas/repo/model_service.go @@ -1,5 +1,10 @@ package repo +import ( + "bytes" + "fmt" +) + // key: serviceID; value: Service type Service struct { Service string `json:"service"` @@ -7,3 +12,25 @@ type Service struct { Addr string `json:"addr"` UpdateTime int64 `json:"update_time"` } + +func (service *Service) MarshalJSON() ([]byte, error) { + buffer := bytes.NewBufferString("{") + _, err := buffer.WriteString(fmt.Sprintf("service: %s, ", service.Service)) + if err != nil { + return nil, err + } + _, err = buffer.WriteString(fmt.Sprintf("frontierID: %s, ", service.FrontierID)) + if err != nil { + return nil, err + } + _, err = buffer.WriteString(fmt.Sprintf("addr: %s, ", service.Addr)) + if err != nil { + return nil, err + } + _, err = buffer.WriteString(fmt.Sprintf("update_time: %d", service.UpdateTime)) + if err != nil { + return nil, err + } + buffer.WriteString("}") + return buffer.Bytes(), nil +} diff --git a/pkg/operator/config/samples/frontier_v1alpha1_frontiercluster.yaml b/pkg/operator/config/samples/frontier_v1alpha1_frontiercluster.yaml index f3c0e87..bffa5af 100644 --- a/pkg/operator/config/samples/frontier_v1alpha1_frontiercluster.yaml +++ b/pkg/operator/config/samples/frontier_v1alpha1_frontiercluster.yaml @@ -2,9 +2,23 @@ apiVersion: frontier.singchia.io/v1alpha1 kind: FrontierCluster metadata: labels: - app.kubernetes.io/name: frontier + app.kubernetes.io/name: frontiercluster app.kubernetes.io/managed-by: kustomize - name: frontiercluster-sample + name: frontiercluster spec: - # TODO(user): Add fields here - replica: "1" \ No newline at end of file + frontier: + replicas: 1 + servicebound: + port: 30011 + edgebound: + port: 30012 + frontlas: + replicas: 1 + controlplane: + port: 40011 + redis: + addrs: + - rfs-redisfailover:26379 + password: your-password + masterName: mymaster + redisType: sentinel \ No newline at end of file diff --git a/pkg/operator/go.mod b/pkg/operator/go.mod index 552f803..5b124ea 100644 --- a/pkg/operator/go.mod +++ b/pkg/operator/go.mod @@ -5,12 +5,16 @@ go 1.21 require ( github.com/onsi/ginkgo/v2 v2.14.0 github.com/onsi/gomega v1.30.0 + github.com/stretchr/testify v1.8.4 k8s.io/apimachinery v0.29.0 k8s.io/client-go v0.29.0 sigs.k8s.io/controller-runtime v0.17.2 ) -require github.com/hashicorp/errwrap v1.0.0 // indirect +require ( + github.com/hashicorp/errwrap v1.0.0 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect +) require ( github.com/beorn7/perks v1.0.1 // indirect diff --git a/pkg/operator/internal/controller/frontiercluster_deployment.go b/pkg/operator/internal/controller/frontiercluster_deployment.go index f098b84..2c97e47 100644 --- a/pkg/operator/internal/controller/frontiercluster_deployment.go +++ b/pkg/operator/internal/controller/frontiercluster_deployment.go @@ -269,7 +269,7 @@ func (r *FrontierClusterReconciler) ensureFrontlasDeployment(ctx context.Context SetServiceName(service). SetLabels(labels). SetMatchLabels(labels). - SetReplicas(fc.FrontierReplicas()). + SetReplicas(fc.FrontlasReplicas()). SetPodTemplateSpec(podTemplateSpec). SetOwnerReference(fc.GetOwnerReferences()). Build()