dao: add buntdb memory mode and dummy implemention

This commit is contained in:
Auztin Zhai
2024-05-19 10:45:25 -04:00
parent 09a37dd6d1
commit be3f1aac8a
4 changed files with 499 additions and 0 deletions

View File

@@ -0,0 +1,118 @@
package membuntdb
import (
"errors"
"github.com/singchia/frontier/pkg/frontier/config"
"github.com/tidwall/buntdb"
"k8s.io/klog/v2"
)
const (
IdxEdge_Meta = "idx_edge_meta"
IdxEdge_Addr = "idx_edge_addr"
IdxEdge_CreateTime = "idx_create_time"
IdxEdgeRPC_RPC = "idx_edgerpc_rpc"
IdxEdgeRPC_EdgeID = "idx_edgerpc_edge_id"
IdxEdgeRPC_CreateTime = "idx_edgerpc_create_time"
IdxService_Service = "idx_service_service"
IdxService_Addr = "idx_service_addr"
IdxService_CreateTime = "index_service_create_time"
IdxServiceRPC_RPC = "idx_servicerpc_rpc"
IdxServiceRPC_ServiceID = "idx_servicerpc_service_id"
IdxServiceRPC_CreateTime = "idx_servicerpc_create_time"
IdxServiceTopic_Topic = "idx_servicetopic_topic"
IdxServiceTopic_ServiceID = "idx_servicetopic_service_id"
IdxServiceTopic_CreateTime = "idx_servicetopic_create_time"
)
var (
ErrUnimplemented = errors.New("unimplemented")
)
type dao struct {
db *buntdb.DB
config *config.Configuration
}
func NewDao(config *config.Configuration) (*dao, error) {
db, err := buntdb.Open(":memory:")
if err != nil {
klog.Errorf("dao open buntdb err: %s", err)
return nil, err
}
db.SetConfig(buntdb.Config{})
// edge's indexes
err = db.CreateIndex(IdxEdge_Meta, "edges*", buntdb.IndexJSON("meta"))
if err != nil {
return nil, err
}
err = db.CreateIndex(IdxEdge_Addr, "edges*", buntdb.IndexJSON("addr"))
if err != nil {
return nil, err
}
err = db.CreateIndex(IdxEdge_CreateTime, "edges*", buntdb.IndexJSON("create_time"))
if err != nil {
return nil, err
}
// edgeRPC's indexes
err = db.CreateIndex(IdxEdgeRPC_RPC, "edge_rpcs*", buntdb.IndexJSON("rpc"))
if err != nil {
return nil, err
}
err = db.CreateIndex(IdxEdgeRPC_EdgeID, "edge_rpcs*", buntdb.IndexJSON("edge_id"))
if err != nil {
return nil, err
}
err = db.CreateIndex(IdxEdgeRPC_CreateTime, "edge_rpcs*", buntdb.IndexJSON("create_time"))
if err != nil {
return nil, err
}
// service's indexes
err = db.CreateIndex(IdxService_Service, "services*", buntdb.IndexJSON("service"))
if err != nil {
return nil, err
}
err = db.CreateIndex(IdxService_Addr, "services*", buntdb.IndexJSON("addr"))
if err != nil {
return nil, err
}
err = db.CreateIndex(IdxService_CreateTime, "services*", buntdb.IndexJSON("create_time"))
if err != nil {
return nil, err
}
// serviceRPC's indexes
err = db.CreateIndex(IdxServiceRPC_RPC, "service_rpcs*", buntdb.IndexJSON("rpc"))
if err != nil {
return nil, err
}
err = db.CreateIndex(IdxServiceRPC_ServiceID, "service_rpcs*", buntdb.IndexJSON("service_id"))
if err != nil {
return nil, err
}
err = db.CreateIndex(IdxServiceRPC_CreateTime, "service_rpcs*", buntdb.IndexJSON("create_time"))
if err != nil {
return nil, err
}
// serviceTopics's indexes
err = db.CreateIndex(IdxServiceTopic_Topic, "service_topics*", buntdb.IndexJSON("topic"))
if err != nil {
return nil, err
}
err = db.CreateIndex(IdxServiceTopic_ServiceID, "service_topics*", buntdb.IndexJSON("service_id"))
if err != nil {
return nil, err
}
err = db.CreateIndex(IdxServiceTopic_CreateTime, "service_topics*", buntdb.IndexJSON("create_time"))
if err != nil {
return nil, err
}
return &dao{
db: db,
config: config,
}, nil
}
func (dao *dao) Close() error {
return dao.db.Close()
}

View File

@@ -0,0 +1,118 @@
package membuntdb
import (
"encoding/json"
"fmt"
"strconv"
"strings"
"github.com/singchia/frontier/pkg/frontier/repo/model"
"github.com/singchia/frontier/pkg/frontier/repo/query"
"github.com/tidwall/buntdb"
)
func (dao *dao) ListEdges(query *query.EdgeQuery) ([]*model.Edge, error) {
edges := []*model.Edge{}
err := dao.db.View(func(tx *buntdb.Tx) error {
var (
err error
finderr error
)
// query on addr
if query.Addr != "" {
pivot := fmt.Sprintf(`{"addr":%s}`, query.Addr)
finderr = tx.AscendGreaterOrEqual(IdxEdge_Addr, pivot, func(key, value string) bool {
edge := &model.Edge{}
err = json.Unmarshal([]byte(value), edge)
if err != nil {
return false
}
if !strings.HasPrefix(edge.Addr, query.Addr) {
return false
}
edges = append(edges, edge)
return true
})
}
if err != nil {
return err
}
if finderr != nil {
return finderr
}
return nil
})
return edges, err
}
func (dao *dao) CountEdges(query *query.EdgeQuery) (int64, error) {
return 0, ErrUnimplemented
}
func (dao *dao) GetEdge(edgeID uint64) (*model.Edge, error) {
edge := &model.Edge{}
err := dao.db.View(func(tx *buntdb.Tx) error {
value, err := tx.Get(getEdgeKey(edgeID))
if err != nil {
return err
}
err = json.Unmarshal([]byte(value), edge)
return err
})
return edge, err
}
func (dao *dao) DeleteEdge(delete *query.EdgeDelete) error {
err := dao.db.Update(func(tx *buntdb.Tx) error {
if delete.EdgeID != 0 {
_, err := tx.Delete(getEdgeKey(delete.EdgeID))
return err
}
return nil
})
return err
}
func (dao *dao) CreateEdge(edge *model.Edge) error {
err := dao.db.Update(func(tx *buntdb.Tx) error {
data, err := json.Marshal(edge)
if err != nil {
return err
}
_, _, err = tx.Set(getEdgeKey(edge.EdgeID), string(data), nil)
return err
})
return err
}
func getEdgeKey(edgeID uint64) string {
return "edges:" + strconv.FormatUint(edgeID, 10)
}
func (dao *dao) ListEdgeRPCs(query *query.EdgeRPCQuery) ([]string, error) {
return nil, ErrUnimplemented
}
func (dao *dao) CountEdgeRPCs(query *query.EdgeRPCQuery) (int64, error) {
return 0, ErrUnimplemented
}
func (dao *dao) DeleteEdgeRPCs(edgeID uint64) error {
return nil
}
func (dao *dao) CreateEdgeRPC(rpc *model.EdgeRPC) error {
err := dao.db.Update(func(tx *buntdb.Tx) error {
data, err := json.Marshal(rpc)
if err != nil {
return err
}
_, _, err = tx.Set(getEdgeRPCKey(rpc.EdgeID, rpc.RPC), string(data), nil)
return err
})
return err
}
func getEdgeRPCKey(edgeID uint64, rpc string) string {
return "edge_rpcs:" + strconv.FormatUint(edgeID, 10) + "-" + rpc
}

View File

@@ -0,0 +1,144 @@
package membuntdb
import (
"encoding/json"
"fmt"
"strconv"
"github.com/singchia/frontier/pkg/frontier/repo/model"
"github.com/singchia/frontier/pkg/frontier/repo/query"
"github.com/tidwall/buntdb"
)
func (dao *dao) ListServices(query *query.ServiceQuery) ([]*model.Service, error) {
return nil, ErrUnimplemented
}
func (dao *dao) CountServices(query *query.ServiceQuery) (int64, error) {
return 0, ErrUnimplemented
}
func (dao *dao) GetService(serviceID uint64) (*model.Service, error) {
return nil, ErrUnimplemented
}
func (dao *dao) GetServiceByName(name string) (*model.Service, error) {
return nil, ErrUnimplemented
}
func (dao *dao) DeleteService(delete *query.ServiceDelete) error {
err := dao.db.Update(func(tx *buntdb.Tx) error {
if delete.ServiceID != 0 {
_, err := tx.Delete(getServiceKey(delete.ServiceID))
return err
}
return nil
})
return err
}
func (dao *dao) CreateService(service *model.Service) error {
err := dao.db.Update(func(tx *buntdb.Tx) error {
data, err := json.Marshal(service)
if err != nil {
return err
}
_, _, err = tx.Set(getServiceKey(service.ServiceID), string(data), nil)
return err
})
return err
}
func getServiceKey(serviceID uint64) string {
return "services:" + strconv.FormatUint(serviceID, 10)
}
// service rpc
func (dao *dao) GetServiceRPC(rpc string) (*model.ServiceRPC, error) {
serviceRPC := &model.ServiceRPC{}
err := dao.db.View(func(tx *buntdb.Tx) error {
var (
err error
finderr error
)
pivot := fmt.Sprintf(`{"rpc":"%s"}`, rpc)
finderr = tx.AscendEqual(IdxServiceRPC_RPC, pivot, func(key, value string) bool {
// TODO return random one
err = json.Unmarshal([]byte(value), serviceRPC)
if err != nil {
return false
}
return false
})
if err != nil {
return err
}
if finderr != nil {
return finderr
}
return nil
})
return serviceRPC, err
}
func (dao *dao) ListServiceRPCs(query *query.ServiceRPCQuery) ([]string, error) {
return nil, ErrUnimplemented
}
func (dao *dao) CountServiceRPCs(query *query.ServiceRPCQuery) (int64, error) {
return 0, ErrUnimplemented
}
func (dao *dao) DeleteServiceRPCs(serviceID uint64) error {
err := dao.db.Update(func(tx *buntdb.Tx) error {
var delkeys []string
pivot := fmt.Sprintf(`{"service_id":%d}`, serviceID)
tx.AscendEqual(IdxServiceRPC_ServiceID, pivot, func(key, value string) bool {
delkeys = append(delkeys, key)
return true
})
for _, key := range delkeys {
if _, err := tx.Delete(key); err != nil {
return err
}
}
return nil
})
return err
}
func (dao *dao) CreateServiceRPC(rpc *model.ServiceRPC) error {
err := dao.db.Update(func(tx *buntdb.Tx) error {
data, err := json.Marshal(rpc)
if err != nil {
return err
}
_, _, err = tx.Set(getServiceRPCKey(rpc.ServiceID, rpc.RPC), string(data), nil)
return err
})
return err
}
func getServiceRPCKey(serviceID uint64, rpc string) string {
return "service_rpcs:" + strconv.FormatUint(serviceID, 10) + "-" + rpc
}
func (dao *dao) GetServiceTopic(topic string) (*model.ServiceTopic, error) {
return nil, ErrUnimplemented
}
func (dao *dao) ListServiceTopics(query *query.ServiceTopicQuery) ([]string, error) {
return nil, ErrUnimplemented
}
func (dao *dao) CountServiceTopics(query *query.ServiceTopicQuery) (int64, error) {
return 0, ErrUnimplemented
}
func (dao *dao) DeleteServiceTopics(serviceID uint64) error {
return ErrUnimplemented
}
func (dao *dao) CreateServiceTopic(topic *model.ServiceTopic) error {
return ErrUnimplemented
}

View File

@@ -0,0 +1,119 @@
package memdummy
import (
"errors"
"github.com/singchia/frontier/pkg/frontier/config"
"github.com/singchia/frontier/pkg/frontier/repo/model"
"github.com/singchia/frontier/pkg/frontier/repo/query"
)
type dao struct {
}
func NewDao(config *config.Configuration) (*dao, error) {
return &dao{}, nil
}
func (dao *dao) Close() error {
return nil
}
func (dao *dao) CountEdgeRPCs(query *query.EdgeRPCQuery) (int64, error) {
return 0, nil
}
func (dao *dao) CountEdges(query *query.EdgeQuery) (int64, error) {
return 0, nil
}
func (dao *dao) CountServiceRPCs(query *query.ServiceRPCQuery) (int64, error) {
return 0, nil
}
func (dao *dao) CountServiceTopics(query *query.ServiceTopicQuery) (int64, error) {
return 0, nil
}
func (dao *dao) CountServices(query *query.ServiceQuery) (int64, error) {
return 0, nil
}
func (dao *dao) CreateEdge(edge *model.Edge) error {
return nil
}
func (dao *dao) CreateEdgeRPC(rpc *model.EdgeRPC) error {
return nil
}
func (dao *dao) CreateService(service *model.Service) error {
return nil
}
func (dao *dao) CreateServiceRPC(rpc *model.ServiceRPC) error {
return nil
}
func (dao *dao) CreateServiceTopic(topic *model.ServiceTopic) error {
return nil
}
func (dao *dao) DeleteEdge(delete *query.EdgeDelete) error {
return nil
}
func (dao *dao) DeleteEdgeRPCs(edgeID uint64) error {
return nil
}
func (dao *dao) DeleteService(delete *query.ServiceDelete) error {
return nil
}
func (dao *dao) DeleteServiceRPCs(serviceID uint64) error {
return nil
}
func (dao *dao) DeleteServiceTopics(serviceID uint64) error {
return nil
}
func (dao *dao) GetEdge(edgeID uint64) (*model.Edge, error) {
return nil, errors.New("not found")
}
func (dao *dao) GetService(serviceID uint64) (*model.Service, error) {
return nil, errors.New("not found")
}
func (dao *dao) GetServiceByName(name string) (*model.Service, error) {
return nil, errors.New("not found")
}
func (dao *dao) GetServiceRPC(rpc string) (*model.ServiceRPC, error) {
return nil, errors.New("not found")
}
func (dao *dao) GetServiceTopic(topic string) (*model.ServiceTopic, error) {
return nil, errors.New("not found")
}
func (dao *dao) ListEdgeRPCs(query *query.EdgeRPCQuery) ([]string, error) {
return nil, errors.New("not found")
}
func (dao *dao) ListEdges(query *query.EdgeQuery) ([]*model.Edge, error) {
return nil, errors.New("not found")
}
func (dao *dao) ListServiceRPCs(query *query.ServiceRPCQuery) ([]string, error) {
return nil, errors.New("not found")
}
func (dao *dao) ListServiceTopics(query *query.ServiceTopicQuery) ([]string, error) {
return nil, errors.New("not found")
}
func (dao *dao) ListServices(query *query.ServiceQuery) ([]*model.Service, error) {
return nil, errors.New("not found")
}