diff --git a/Makefile b/Makefile index c4277cb..13c9401 100644 --- a/Makefile +++ b/Makefile @@ -5,6 +5,7 @@ CC?=cc all: frontier frontlas +# binary .PHONY: frontier frontier: CC=${CC} CGO_ENABLED=1 go build -trimpath -ldflags "-s -w" -o ./bin/frontier cmd/frontier/main.go @@ -21,10 +22,12 @@ frontlas: frontlas-linux: CC=${CC} GOOS=linux GOARCH=amd64 CGO_ENABLED=1 go build -trimpath -ldflags "-s -w" -o ./bin/frontlas cmd/frontlas/main.go +# example .PHONY: examples examples: make -C examples +# clean .PHONY: clean clean: rm ./bin/frontier || true @@ -32,6 +35,7 @@ clean: make clean -C examples make clean -C test/bench +# install .PHONY: install-frontier install-frontier: install -m 0755 -d $(DESTDIR)$(BINDIR) @@ -46,10 +50,15 @@ install-frontlas: install -m 0755 ./bin/frontier $(DESTDIR)$(BINDIR) install -m 0755 ./etc/frontier.yaml $(DESTDIR)$(CONFDIR) +# image .PHONY: image-frontier image-frontier: docker buildx build -t ${REGISTRY}/frontier:${VERSION} -f images/Dockerfile.frontier . +.PHONY: image-frontlas +image-frontlas: + docker buildx build -t ${REGISTRY}/frontlas:${VERSION} -f images/Dockerfile.frontlas . + .PHONY: image-gen-api image-gen-api: docker buildx build -t image-gen-api:${VERSION} -f images/Dockerfile.controlplane-api . @@ -58,11 +67,13 @@ image-gen-api: image-gen-swagger: docker buildx build -t frontier-gen-swagger:${VERSION} -f images/Dockerfile.controlplane-swagger . +# container .PHONY: container-frontier container-frontier: docker rm -f frontier - docker run -d --name frontier -p 2431:2431 -p 2432:2432 frontier:${VERSION} --config /usr/conf/frontier.yaml -v 5 + docker run -d --name frontier -p 2431:2431 -p 2432:2432 frontier:${VERSION} --config /usr/conf/frontier.yaml -v 1 +# api .PHONY: api-frontier api-frontier: docker run --rm -v ${PWD}/api/controlplane/frontier/v1:/api/controlplane/frontier/v1 image-gen-api:${VERSION} @@ -71,6 +82,7 @@ api-frontier: api-frontlas: docker run --rm -v ${PWD}/api/controlplane/frontlas/v1:/api/controlplane/frontlas/v1 image-gen-api:${VERSION} +# bench .PHONY: bench bench: container-frontier make bench -C test/bench diff --git a/cmd/frontctl/main.go b/cmd/frontctl/main.go deleted file mode 100644 index 38dd16d..0000000 --- a/cmd/frontctl/main.go +++ /dev/null @@ -1,3 +0,0 @@ -package main - -func main() {} diff --git a/etc/frontlas.yaml b/etc/frontlas.yaml index 586dbe8..3fe0f22 100644 --- a/etc/frontlas.yaml +++ b/etc/frontlas.yaml @@ -14,6 +14,9 @@ frontier_plane: listen: network: tcp addr: 0.0.0.0:30021 + expiration: + service_meta: 30 + edge_meta: 30 redis: mode: standalone standalone: diff --git a/examples/iclm/edge/edge.go b/examples/iclm/edge/edge.go index d207413..f501a5f 100644 --- a/examples/iclm/edge/edge.go +++ b/examples/iclm/edge/edge.go @@ -36,7 +36,7 @@ func main() { address := pflag.String("address", "127.0.0.1:30012", "address to dial") loglevel := pflag.String("loglevel", "info", "log level, trace debug info warn error") meta := pflag.String("meta", "test", "meta to set on connection") - methods := pflag.String("methods", "", "method name, support echo, calculate") + methods := pflag.String("methods", "", "method name, support echo") label := pflag.String("label", "label-01", "label to message or rpc") pflag.Parse() @@ -64,6 +64,20 @@ func main() { if *methods != "" { methodSlice = strings.Split(*methods, ",") } + go func() { + time.Sleep(200 * time.Millisecond) + // register + for _, method := range methodSlice { + switch method { + case "echo": + err = cli.Register(context.TODO(), "echo", echo) + if err != nil { + fmt.Printf("\n> register echo err: %s\n", err) + return + } + } + } + }() // receive on edge go func() { @@ -73,7 +87,7 @@ func main() { return } if err != nil { - fmt.Println("> receive err:", err) + fmt.Println("\n> receive err:", err) fmt.Println(">>> ") continue } @@ -89,32 +103,17 @@ func main() { if err == io.EOF { return } else if err != nil { - fmt.Println("> accept stream err:", err) + fmt.Println("\n> accept stream err:", err) fmt.Print(">>> ") continue } - fmt.Println("> accept stream", st.StreamID()) + fmt.Println("\n> accept stream", st.StreamID()) fmt.Print(">>> ") sns.Store(strconv.FormatUint(st.StreamID(), 10), st) go handleStream(st) } }() - go func() { - time.Sleep(200 * time.Millisecond) - // register - for _, method := range methodSlice { - switch method { - case "echo": - err = cli.Register(context.TODO(), "echo", echo) - if err != nil { - armlog.Info("> register echo err:", err) - return - } - } - } - }() - cursor := "1" fmt.Print(">>> ") @@ -291,7 +290,7 @@ func handleStream(stream geminio.Stream) { for { msg, err := stream.Receive(context.TODO()) if err != nil { - fmt.Println("\n> stream receive err:", err) + fmt.Printf("\n> streamID: %d receive err: %s\n", stream.StreamID(), err) fmt.Print(">>> ") return } @@ -305,11 +304,11 @@ func handleStream(stream geminio.Stream) { data := make([]byte, 1024) _, err := stream.Read(data) if err != nil { - fmt.Println("\n> stream read err:", err) + fmt.Printf("\n> streamID: %d read err: %s\n", stream.StreamID(), err) fmt.Print(">>> ") return } - fmt.Println("> read data:", stream.ClientID(), + fmt.Println("\n> read data:", stream.ClientID(), string(data)) fmt.Print(">>> ") } @@ -331,7 +330,7 @@ func handleStream(stream geminio.Stream) { func echo(ctx context.Context, req geminio.Request, rsp geminio.Response) { edgeID := req.ClientID() - fmt.Printf("\n> call rpc, method: %s edgeID: %d streamID: %d data: %s\n", "echo", edgeID, req.StreamID(), string(req.Data())) + fmt.Printf("\n> rpc called, method: %s edgeID: %d streamID: %d data: %s\n", "echo", edgeID, req.StreamID(), string(req.Data())) fmt.Print(">>> ") rsp.SetData(req.Data()) } diff --git a/examples/iclm/service/service.go b/examples/iclm/service/service.go index bf067d5..f16419c 100644 --- a/examples/iclm/service/service.go +++ b/examples/iclm/service/service.go @@ -77,7 +77,7 @@ func main() { loglevel := pflag.String("loglevel", "info", "log level, trace debug info warn error") serviceName := pflag.String("service", "foo", "service name") topics := pflag.String("topics", "", "topics to receive message, empty means without consuming") - methods := pflag.String("methods", "", "method name, support echo, calculate") + methods := pflag.String("methods", "", "method name, support echo") printmessage = pflag.Bool("printmessage", false, "whether print message out") stats := pflag.Bool("stats", false, "print statistics or not") @@ -159,7 +159,7 @@ func main() { return } if err != nil { - fmt.Println("> receive err:", err) + fmt.Println("\n> receive err:", err) fmt.Print(">>> ") continue } @@ -185,10 +185,11 @@ func main() { if err == io.EOF { return } else if err != nil { - fmt.Println("> accept stream err:", err) + fmt.Println("\n> accept stream err:", err) continue } - fmt.Println("> accept stream", st.ClientID(), st.StreamID()) + fmt.Println("\n> accept stream", st.ClientID(), st.StreamID()) + fmt.Print(">>> ") sns.Store(strconv.FormatUint(st.StreamID(), 10), st) go handleStream(st) } @@ -381,7 +382,7 @@ func handleStream(stream geminio.Stream) { for { msg, err := stream.Receive(context.TODO()) if err != nil { - log.Println("\n> stream receive err:", err) + fmt.Printf("\n> streamID: %d receive err: %s\n", stream.StreamID(), err) fmt.Print(">>> ") return } @@ -404,7 +405,7 @@ func handleStream(stream geminio.Stream) { data := make([]byte, 1024) _, err := stream.Read(data) if err != nil { - log.Println("\n> read err:", err) + fmt.Printf("\n> streamID: %d read err: %s\n", stream.StreamID(), err) fmt.Print(">>> ") return } @@ -472,7 +473,7 @@ func echo(ctx context.Context, req geminio.Request, rsp geminio.Response) { value = ld.Data } if *printmessage { - fmt.Printf("\n> call rpc, method: %s edgeID: %d streamID: %d data: %s\n", "echo", edgeID, req.StreamID(), string(value)) + fmt.Printf("\n> rpc called, method: %s edgeID: %d streamID: %d data: %s\n", "echo", req.ClientID(), req.StreamID(), string(value)) fmt.Print(">>> ") } rsp.SetData(value) diff --git a/pkg/frontlas/config/config.go b/pkg/frontlas/config/config.go index 7d5613a..7a809a1 100644 --- a/pkg/frontlas/config/config.go +++ b/pkg/frontlas/config/config.go @@ -121,7 +121,11 @@ type Redis struct { } type FrontierManager struct { - Listen config.Listen `yaml:"listen"` + Listen config.Listen `yaml:"listen"` + Expiration struct { + ServiceMeta int `yaml:"service_meta"` // service meta expiration in redis, in seconds, default 86400s + EdgeMeta int `yaml:"edge_meta"` // edge meta expiration in redis, in seconds, default 86400s + } `yaml:"expiration,omitempty"` } type Configuration struct { @@ -218,6 +222,8 @@ func genDefaultConfig(writer io.Writer) error { conf.Redis.Standalone.Network = "tcp" conf.Redis.Standalone.Addr = "127.0.0.1:6379" conf.Redis.Standalone.DB = 0 + conf.FrontierManager.Expiration.EdgeMeta = 30 + conf.FrontierManager.Expiration.ServiceMeta = 30 data, err := yaml.Marshal(conf) if err != nil { diff --git a/pkg/frontlas/frontierbound/frontier_event.go b/pkg/frontlas/frontierbound/frontier_event.go index 6e5cebd..1110477 100644 --- a/pkg/frontlas/frontierbound/frontier_event.go +++ b/pkg/frontlas/frontierbound/frontier_event.go @@ -100,7 +100,8 @@ func (fm *FrontierManager) EdgeOnline(ctx context.Context, req geminio.Request, UpdateTime: time.Now().Unix(), }, edgeHeartbeatInterval) if err != nil { - klog.Errorf("frontier manager edge online, set edge and alive err: %s", err) + klog.Errorf("frontier manager edge online, set edge and alive err: %s, frontierID: %s, edgeID: %d", + err, edgeOnline.FrontierID, edgeOnline.EdgeID) rsp.SetError(err) return } @@ -116,7 +117,7 @@ func (fm *FrontierManager) EdgeOffline(ctx context.Context, req geminio.Request, } err = fm.repo.DeleteEdge(edgeOffline.EdgeID) if err != nil { - klog.Errorf("frontier manager edge offline, delete edge err: %s", err) + klog.Errorf("frontier manager edge offline, delete edge err: %s, edgeID: %d", err, edgeOffline.EdgeID) rsp.SetError(err) return } @@ -132,7 +133,7 @@ func (fm *FrontierManager) EdgeHeartbeat(ctx context.Context, req geminio.Reques } err = fm.repo.ExpireEdge(edgeHB.EdgeID, edgeHeartbeatInterval) if err != nil { - klog.Errorf("frontier manager edge heartbeat, expire edge err: %s", err) + klog.Errorf("frontier manager edge heartbeat, expire edge err: %s, edgeID: %d", err, edgeHB.EdgeID) rsp.SetError(err) return } @@ -154,7 +155,8 @@ func (fm *FrontierManager) ServiceOnline(ctx context.Context, req geminio.Reques UpdateTime: time.Now().Unix(), }, serviceHeartbeatInterval) if err != nil { - klog.Errorf("frontier manager service online, set service and alive err: %s", err) + klog.Errorf("frontier manager service online, set service and alive err: %s, frontierID: %s, serviceID: %d", + err, serviceOnline.FrontierID, serviceOnline.ServiceID) rsp.SetError(err) return } @@ -170,7 +172,7 @@ func (fm *FrontierManager) ServiceOffline(ctx context.Context, req geminio.Reque } err = fm.repo.DeleteService(serviceOffline.ServiceID) if err != nil { - klog.Errorf("frontier manager service offline, delete service err: %s", err) + klog.Errorf("frontier manager service offline, delete service err: %s, serviceID: %d", err, serviceOffline.ServiceID) rsp.SetError(err) return } @@ -186,7 +188,7 @@ func (fm *FrontierManager) ServiceHeartbeat(ctx context.Context, req geminio.Req } err = fm.repo.ExpireService(serviceHB.ServiceID, serviceHeartbeatInterval) if err != nil { - klog.Errorf("frontier manager service heartbeat, expire service err: %s", err) + klog.Errorf("frontier manager service heartbeat, expire service err: %s, serviceID: %d", err, serviceHB.ServiceID) rsp.SetError(err) return } @@ -203,7 +205,7 @@ func (fm *FrontierManager) FrontierStats(ctx context.Context, req geminio.Reques } err = fm.repo.SetFrontierCount(stats.FrontierID, stats.EdgeCount, stats.ServiceCount) if err != nil { - klog.Errorf("frontier manager frontier stats, set frontier count err: %s", err) + klog.Errorf("frontier manager frontier stats, set frontier count err: %s, frontierID: %s", err, stats.FrontierID) rsp.SetError(err) return } diff --git a/pkg/frontlas/repo/dao.go b/pkg/frontlas/repo/dao.go index 8f86348..9ff7cc0 100644 --- a/pkg/frontlas/repo/dao.go +++ b/pkg/frontlas/repo/dao.go @@ -35,42 +35,43 @@ type RDS interface { } type Dao struct { + conf *config.Configuration mode int rds RDS } -func NewDao(config *config.Configuration) (*Dao, error) { +func NewDao(conf *config.Configuration) (*Dao, error) { var ( rds RDS mode int ) - conf := config.Redis - switch conf.Mode { + redisconf := conf.Redis + switch redisconf.Mode { case "standalone": - sconf := conf.Standalone + sconf := redisconf.Standalone opt := &redis.Options{ Network: sconf.Network, Addr: sconf.Addr, - ClientName: conf.ClientName, - Protocol: conf.Protocol, - Username: conf.Username, - Password: conf.Password, + ClientName: redisconf.ClientName, + Protocol: redisconf.Protocol, + Username: redisconf.Username, + Password: redisconf.Password, DB: sconf.DB, - MaxRetries: conf.MaxRetries, - MinRetryBackoff: time.Duration(conf.MinRetryBackoff) * time.Second, - MaxRetryBackoff: time.Duration(conf.MaxRetryBackoff) * time.Second, - DialTimeout: time.Duration(conf.DialTimeout) * time.Second, - ReadTimeout: time.Duration(conf.ReadTimeout) * time.Second, - WriteTimeout: time.Duration(conf.WriteTimeout) * time.Second, - PoolFIFO: conf.PoolFIFO, - PoolSize: conf.PoolSize, - PoolTimeout: time.Duration(conf.PoolTimeout) * time.Second, - MinIdleConns: conf.MinIdleConns, - MaxIdleConns: conf.MaxIdleConns, - ConnMaxIdleTime: time.Duration(conf.ConnMaxIdleTime) * time.Second, - ConnMaxLifetime: time.Duration(conf.ConnMaxLifetime) * time.Second, - DisableIndentity: conf.DisableIndentity, - IdentitySuffix: conf.IdentitySuffix, + MaxRetries: redisconf.MaxRetries, + MinRetryBackoff: time.Duration(redisconf.MinRetryBackoff) * time.Second, + MaxRetryBackoff: time.Duration(redisconf.MaxRetryBackoff) * time.Second, + DialTimeout: time.Duration(redisconf.DialTimeout) * time.Second, + ReadTimeout: time.Duration(redisconf.ReadTimeout) * time.Second, + WriteTimeout: time.Duration(redisconf.WriteTimeout) * time.Second, + PoolFIFO: redisconf.PoolFIFO, + PoolSize: redisconf.PoolSize, + PoolTimeout: time.Duration(redisconf.PoolTimeout) * time.Second, + MinIdleConns: redisconf.MinIdleConns, + MaxIdleConns: redisconf.MaxIdleConns, + ConnMaxIdleTime: time.Duration(redisconf.ConnMaxIdleTime) * time.Second, + ConnMaxLifetime: time.Duration(redisconf.ConnMaxLifetime) * time.Second, + DisableIndentity: redisconf.DisableIndentity, + IdentitySuffix: redisconf.IdentitySuffix, } rds = redis.NewClient(opt) _, err := rds.Ping(context.TODO()).Result() @@ -81,33 +82,33 @@ func NewDao(config *config.Configuration) (*Dao, error) { mode = modeStandalone case "sentinel": - sconf := conf.Sentinel + sconf := redisconf.Sentinel opt := &redis.FailoverOptions{ MasterName: sconf.MasterName, SentinelAddrs: sconf.Addrs, - Protocol: conf.Protocol, - Username: conf.Username, - Password: conf.Password, + Protocol: redisconf.Protocol, + Username: redisconf.Username, + Password: redisconf.Password, DB: sconf.DB, - ClientName: conf.ClientName, + ClientName: redisconf.ClientName, RouteByLatency: sconf.RouteByLatency, RouteRandomly: sconf.RouteRandomly, ReplicaOnly: sconf.ReplicaOnly, UseDisconnectedReplicas: sconf.UseDisconnectedReplicas, - MinRetryBackoff: time.Duration(conf.MinRetryBackoff) * time.Second, - MaxRetryBackoff: time.Duration(conf.MaxRetryBackoff) * time.Second, - DialTimeout: time.Duration(conf.DialTimeout) * time.Second, - ReadTimeout: time.Duration(conf.ReadTimeout) * time.Second, - WriteTimeout: time.Duration(conf.WriteTimeout) * time.Second, - PoolFIFO: conf.PoolFIFO, - PoolSize: conf.PoolSize, - PoolTimeout: time.Duration(conf.PoolTimeout) * time.Second, - MinIdleConns: conf.MinIdleConns, - MaxIdleConns: conf.MaxIdleConns, - ConnMaxIdleTime: time.Duration(conf.ConnMaxIdleTime) * time.Second, - ConnMaxLifetime: time.Duration(conf.ConnMaxLifetime) * time.Second, - DisableIndentity: conf.DisableIndentity, - IdentitySuffix: conf.IdentitySuffix, + MinRetryBackoff: time.Duration(redisconf.MinRetryBackoff) * time.Second, + MaxRetryBackoff: time.Duration(redisconf.MaxRetryBackoff) * time.Second, + DialTimeout: time.Duration(redisconf.DialTimeout) * time.Second, + ReadTimeout: time.Duration(redisconf.ReadTimeout) * time.Second, + WriteTimeout: time.Duration(redisconf.WriteTimeout) * time.Second, + PoolFIFO: redisconf.PoolFIFO, + PoolSize: redisconf.PoolSize, + PoolTimeout: time.Duration(redisconf.PoolTimeout) * time.Second, + MinIdleConns: redisconf.MinIdleConns, + MaxIdleConns: redisconf.MaxIdleConns, + ConnMaxIdleTime: time.Duration(redisconf.ConnMaxIdleTime) * time.Second, + ConnMaxLifetime: time.Duration(redisconf.ConnMaxLifetime) * time.Second, + DisableIndentity: redisconf.DisableIndentity, + IdentitySuffix: redisconf.IdentitySuffix, } rds = redis.NewFailoverClient(opt) _, err := rds.Ping(context.TODO()).Result() @@ -118,30 +119,30 @@ func NewDao(config *config.Configuration) (*Dao, error) { mode = modeSentinel case "cluster": - cconf := conf.Cluster + cconf := redisconf.Cluster opt := &redis.ClusterOptions{ Addrs: cconf.Addrs, - Protocol: conf.Protocol, - Username: conf.Username, - Password: conf.Password, - ClientName: conf.ClientName, + Protocol: redisconf.Protocol, + Username: redisconf.Username, + Password: redisconf.Password, + ClientName: redisconf.ClientName, MaxRedirects: cconf.MaxRedirects, RouteByLatency: cconf.RouteByLatency, RouteRandomly: cconf.RouteRandomly, - MinRetryBackoff: time.Duration(conf.MinRetryBackoff) * time.Second, - MaxRetryBackoff: time.Duration(conf.MaxRetryBackoff) * time.Second, - DialTimeout: time.Duration(conf.DialTimeout) * time.Second, - ReadTimeout: time.Duration(conf.ReadTimeout) * time.Second, - WriteTimeout: time.Duration(conf.WriteTimeout) * time.Second, - PoolFIFO: conf.PoolFIFO, - PoolSize: conf.PoolSize, - PoolTimeout: time.Duration(conf.PoolTimeout) * time.Second, - MinIdleConns: conf.MinIdleConns, - MaxIdleConns: conf.MaxIdleConns, - ConnMaxIdleTime: time.Duration(conf.ConnMaxIdleTime) * time.Second, - ConnMaxLifetime: time.Duration(conf.ConnMaxLifetime) * time.Second, - DisableIndentity: conf.DisableIndentity, - IdentitySuffix: conf.IdentitySuffix, + MinRetryBackoff: time.Duration(redisconf.MinRetryBackoff) * time.Second, + MaxRetryBackoff: time.Duration(redisconf.MaxRetryBackoff) * time.Second, + DialTimeout: time.Duration(redisconf.DialTimeout) * time.Second, + ReadTimeout: time.Duration(redisconf.ReadTimeout) * time.Second, + WriteTimeout: time.Duration(redisconf.WriteTimeout) * time.Second, + PoolFIFO: redisconf.PoolFIFO, + PoolSize: redisconf.PoolSize, + PoolTimeout: time.Duration(redisconf.PoolTimeout) * time.Second, + MinIdleConns: redisconf.MinIdleConns, + MaxIdleConns: redisconf.MaxIdleConns, + ConnMaxIdleTime: time.Duration(redisconf.ConnMaxIdleTime) * time.Second, + ConnMaxLifetime: time.Duration(redisconf.ConnMaxLifetime) * time.Second, + DisableIndentity: redisconf.DisableIndentity, + IdentitySuffix: redisconf.IdentitySuffix, } rds = redis.NewClusterClient(opt) _, err := rds.Ping(context.TODO()).Result() @@ -155,6 +156,7 @@ func NewDao(config *config.Configuration) (*Dao, error) { return nil, apis.ErrUnsupportRedisServerMode } return &Dao{ + conf: conf, rds: rds, mode: mode, }, nil diff --git a/pkg/frontlas/repo/dao_edge.go b/pkg/frontlas/repo/dao_edge.go index 9777c00..7eee90f 100644 --- a/pkg/frontlas/repo/dao_edge.go +++ b/pkg/frontlas/repo/dao_edge.go @@ -8,7 +8,6 @@ import ( "strings" "time" - "github.com/singchia/frontier/pkg/frontlas/apis" "k8s.io/klog/v2" ) @@ -138,8 +137,9 @@ func (dao *Dao) SetEdgeAndAlive(edgeID uint64, edge *Edge, expiration time.Durat } pipeliner := dao.rds.TxPipeline() - // edge - pipeliner.Set(context.TODO(), getEdgeKey(edgeID), edgeData, 24*time.Hour) + // edge meta TODO expiration to custom + pipeliner.Set(context.TODO(), getEdgeKey(edgeID), edgeData, + time.Duration(dao.conf.FrontierManager.Expiration.ServiceMeta)*time.Second) // alive pipeliner.Set(context.TODO(), getAliveEdgeKey(edgeID), 1, expiration) // frontier edge_count @@ -154,13 +154,22 @@ func (dao *Dao) SetEdgeAndAlive(edgeID uint64, edge *Edge, expiration time.Durat } func (dao *Dao) ExpireEdge(edgeID uint64, expiration time.Duration) error { - ok, err := dao.rds.Expire(context.TODO(), getAliveEdgeKey(edgeID), expiration).Result() + pipeliner := dao.rds.TxPipeline() + // edge meta TODO expiration to custom + pipeliner.Expire(context.TODO(), getEdgeKey(edgeID), + time.Duration(dao.conf.FrontierManager.Expiration.ServiceMeta)*time.Second) + // edge alive + pipeliner.Expire(context.TODO(), getAliveEdgeKey(edgeID), expiration) + + cmds, err := pipeliner.Exec(context.TODO()) if err != nil { - klog.Errorf("dao expire edge, expire err: %s", err) + klog.Errorf("dao expire edge, pipeliner err: %s", err) return err } - if !ok { - return apis.ErrExpireFailed + for _, cmd := range cmds { + if cmd.Err() != nil { + return cmd.Err() + } } return nil } diff --git a/pkg/frontlas/repo/dao_service.go b/pkg/frontlas/repo/dao_service.go index 5c26465..b531dd5 100644 --- a/pkg/frontlas/repo/dao_service.go +++ b/pkg/frontlas/repo/dao_service.go @@ -8,7 +8,6 @@ import ( "strings" "time" - "github.com/singchia/frontier/pkg/frontlas/apis" "k8s.io/klog/v2" ) @@ -136,8 +135,9 @@ func (dao *Dao) SetServiceAndAlive(serviceID uint64, service *Service, expiratio } pipeliner := dao.rds.TxPipeline() - // service, // TODO set expiration - pipeliner.Set(context.TODO(), getServiceKey(serviceID), serviceData, 24*time.Hour) + // service meta TODO expiration to custom + pipeliner.Set(context.TODO(), getServiceKey(serviceID), serviceData, + time.Duration(dao.conf.FrontierManager.Expiration.ServiceMeta)*time.Second) // alive pipeliner.Set(context.TODO(), getAliveServiceKey(serviceID), 1, expiration) // frontier service_count @@ -152,20 +152,29 @@ func (dao *Dao) SetServiceAndAlive(serviceID uint64, service *Service, expiratio } func (dao *Dao) ExpireService(serviceID uint64, expiration time.Duration) error { - ok, err := dao.rds.Expire(context.TODO(), getAliveServiceKey(serviceID), expiration).Result() + pipeliner := dao.rds.TxPipeline() + // service meta TODO expiration to custom + pipeliner.Expire(context.TODO(), getServiceKey(serviceID), + time.Duration(dao.conf.FrontierManager.Expiration.ServiceMeta)*time.Second) + // service alive + pipeliner.Expire(context.TODO(), getAliveServiceKey(serviceID), expiration) + + cmds, err := pipeliner.Exec(context.TODO()) if err != nil { - klog.Errorf("dao expire service, expire err: %s", err) + klog.Errorf("dao expire service, pipeliner err: %s", err) return err } - if !ok { - return apis.ErrExpireFailed + for _, cmd := range cmds { + if cmd.Err() != nil { + return cmd.Err() + } } return nil } func (dao *Dao) DeleteService(serviceID uint64) error { _, err := dao.rds.Eval(context.TODO(), deleteFrontierScript, - []string{getServiceKey(serviceID), getAliveServiceKey(serviceID), servicesKeyPrefix}).Result() + []string{getServiceKey(serviceID), getAliveServiceKey(serviceID), frontiersKeyPrefix}).Result() if err != nil { klog.Errorf("dao delete service, eval err: %s", err) return err diff --git a/pkg/frontlas/repo/lua/service_delete.lua b/pkg/frontlas/repo/lua/service_delete.lua index 81c7116..7f41bd9 100644 --- a/pkg/frontlas/repo/lua/service_delete.lua +++ b/pkg/frontlas/repo/lua/service_delete.lua @@ -1,19 +1,19 @@ -local frontier_key = KEYS[1] -local frontier_alive_key = KEYS[2] +local service_key = KEYS[1] +local service_alive_key = KEYS[2] local frontier_key_prefix = KEYS[3] -- get frontier and it's frontier_id -local frontier = redis.call("GET", frontier_key) +local frontier = redis.call("GET", service_key) if frontier then local value = cjson.decode(frontier) local frontier_id = value['frontier_id'] if frontier_id then -- decrement the frontier_count in frontier local frontier_key = frontier_key_prefix .. tostring(frontier_id) - redis.call("HINCRBY", frontier_key, "frontier_count", -1) + redis.call("HINCRBY", frontier_key, "service_count", -1) end end -- remove frontier alive -local ret = redis.call("DEL", frontier_alive_key) +local ret = redis.call("DEL", service_alive_key) return ret \ No newline at end of file diff --git a/pkg/mapmap/bimap.go b/pkg/mapmap/bimap.go index 9927288..88307f2 100644 --- a/pkg/mapmap/bimap.go +++ b/pkg/mapmap/bimap.go @@ -23,7 +23,7 @@ func (bm *BiMap) Set(key, value any) { bm.kv[key] = value ks, ok := bm.vk[value] - if ok { + if !ok { ks = map[any]struct{}{} } ks[key] = struct{}{}