我又来更新啦,没有想的那么难,维护好状态机是关键。这次提交还有不少bug:有几处可能的数据竞争;状态机中的bolt事务应该合并到一个函数执行,否则会导致一些数据被篡改。

This commit is contained in:
impact-eintr
2022-02-14 18:12:51 +08:00
parent 9fadd11a78
commit 4be0ca9a21
4 changed files with 324 additions and 110 deletions

3
.gitignore vendored
View File

@@ -13,4 +13,5 @@
# Dependency directories (remove the comment below to include it)
# vendor/
raftd
cmd/raftd/raftd
cmd/client/client

View File

@@ -20,10 +20,11 @@ type Store interface {
Get(key string, lvl store.ConsistencyLevel) ([]byte, error)
Set(key string, value []byte) error
Delete(key string) error
LeaseGrant(ttl int) (uint64, error)
LeaseGrant(name string, ttl int) (uint64, error)
LeaseKeepAlive(leaseId, key string, value []byte) error
LeaseRevoke() error
LeaseTimeToAlive() error
GetLeaseKV(key string) ([]string, error)
Join(nodeID string, httpAddress string, addr string) error
LeaderAPIAddr() string
SetMeta(key, value string) error
@@ -99,6 +100,7 @@ func (s *Service) newRouter() (r *gin.Engine) {
leaseGroup.POST("/keepalive/:id", s.LeaseKeepAliveHandler())
leaseGroup.POST("/revoke/:id")
leaseGroup.POST("/timetolive/:id")
leaseGroup.GET("/kv/:name", s.GetKeyByLeaseHandler())
}
r.POST("/join", s.JoinHandler())
@@ -157,6 +159,33 @@ func (s *Service) GetKeyHandler() gin.HandlerFunc {
}
}
// GetKeyByLeaseHandler returns the gin handler registered on the lease
// group's "/kv/:name" route. It hands the "name" path parameter to the store's
// GetLeaseKV and writes the resulting values back as JSON.
//
// Responses:
//   - 400 when the "name" parameter is empty;
//   - 307 redirect to the leader when the store reports ErrNotLeader and a
//     leader API address is known, 503 when no leader address is available;
//   - 500 for any other store error;
//   - 200 with the values on success.
func (s *Service) GetKeyByLeaseHandler() gin.HandlerFunc {
	return func(ctx *gin.Context) {
		name := ctx.Param("name")
		if name == "" {
			ctx.JSON(http.StatusBadRequest, nil)
			return
		}

		values, err := s.store.GetLeaseKV(name)
		switch {
		case err == nil:
			ctx.JSON(http.StatusOK, values)
		case err != store.ErrNotLeader:
			ctx.JSON(http.StatusInternalServerError, err)
		default:
			// This node is not the leader: forward the client to the leader
			// when its API address is known.
			leader := s.store.LeaderAPIAddr()
			if leader == "" {
				ctx.JSON(http.StatusServiceUnavailable, err)
				return
			}
			ctx.Redirect(http.StatusTemporaryRedirect, s.FormRedirect(ctx.Request, leader))
		}
	}
}
func (s *Service) SetKeyHandler() gin.HandlerFunc {
return func(ctx *gin.Context) {
k := ctx.Param("key")
@@ -260,7 +289,7 @@ func (s *Service) JoinHandler() gin.HandlerFunc {
}
}
/* 租约机制 TODO 暂时是独立的*/
/* 租约机制 NOTICE 是独立的*/
// LeaseGrant 创建一个租约
func (s *Service) LeaseGrantHandler() gin.HandlerFunc {
@@ -277,7 +306,13 @@ func (s *Service) LeaseGrantHandler() gin.HandlerFunc {
return
}
v, err := s.store.LeaseGrant(ttlnum)
name := strings.TrimSpace(q.Get("name"))
if name == "" {
ctx.JSON(http.StatusBadRequest, nil)
return
}
v, err := s.store.LeaseGrant(name, ttlnum) // TODO 这里还可以承载更多的数据
if err != nil {
if err == store.ErrNotLeader {
leader := s.store.LeaderAPIAddr()

View File

@@ -11,6 +11,7 @@ import (
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
@@ -21,7 +22,8 @@ import (
)
const (
DefaultBucketName = "impact-eintr"
DefaultKVBucketName = "impact-eintr"
DefaultLeaseBucketName = "raftd-lease-system"
)
var (
@@ -44,10 +46,12 @@ const (
)
type command struct {
Op string `json:"op,omitempty"`
Key string `json:"key,omitempty"`
Value []byte `json:"value,omitempty"`
Op string `json:"op,omitempty"`
Key string `json:"key,omitempty"`
Value []byte `json:"value,omitempty"`
LeaseId uint64 `json:"leaseId,omitempty"`
Name string `json:"name,omitempty"`
TTL int `json:"ttl,omitempty"`
}
@@ -82,8 +86,10 @@ type Store struct {
raft *raft.Raft // 一致性机制
// 租约系统相关实现
snowflake *utils.Snowflake
wg WaitGroupWrapper
leases map[uint64][]string // LeaseId:[]key
logger *log.Logger
}
@@ -95,6 +101,7 @@ func New(bindID int64) *Store {
}
return &Store{
m: make(map[string][]byte),
leases: make(map[uint64][]string),
snowflake: sn,
logger: log.New(os.Stderr, "[store] ", log.LstdFlags),
}
@@ -143,7 +150,7 @@ func (s *Store) Open(enableSingle bool, localID string) error {
// 新建一个桶
err = db.Update(func(tx *bolt.Tx) error {
_, err := tx.CreateBucketIfNotExists([]byte(DefaultBucketName))
_, err := tx.CreateBucketIfNotExists([]byte(DefaultKVBucketName))
if err != nil {
log.Fatalf("CreateBucketIfNotExists err:%s", err.Error())
return err
@@ -309,7 +316,7 @@ func (s *Store) Get(key string, lvl ConsistencyLevel) ([]byte, error) {
val := make([]byte, 0)
s.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(DefaultBucketName))
bucket := tx.Bucket([]byte(DefaultKVBucketName))
val = bucket.Get([]byte(key))
return nil
})
@@ -351,19 +358,31 @@ func (s *Store) Delete(key string) error {
return f.Error()
}
// Lease lifecycle states stored in LeaseMeta.Status.
//
// The numeric values are spelled out because the status is serialized as a
// plain integer together with the rest of the lease metadata.
const (
	CREATE = 0 // written when the lease metadata is first created by a grant
	ALIVE  = 1 // written whenever a keep-alive or loop check rewrites the metadata
	DEAD   = 2 // written when the lease is revoked
)
type LeaseMeta struct {
TTL int `json:"ttl"`
Name string `json:"name"`
TTL int `json:"ttl"`
Status int `json:"status"`
Count int `json:"count"`
}
// 创建新的租约
func (s *Store) LeaseGrant(ttl int) (uint64, error) {
func (s *Store) LeaseGrant(name string, ttl int) (uint64, error) {
if s.raft.State() != raft.Leader {
return 0, ErrNotLeader
}
leaseId := s.snowflake.Generate()
// 向状态机写入create 新建一个Lease
c := &command{
Op: "grant",
Name: name,
LeaseId: leaseId,
TTL: ttl,
}
@@ -372,27 +391,102 @@ func (s *Store) LeaseGrant(ttl int) (uint64, error) {
if err != nil {
return 0, err
}
// Submit the grant and wait for its outcome before starting the watchdog, so
// that a failed grant does not leave a goroutine ticking for a lease that was
// never created.
f := s.raft.Apply(b, raftTimeout)
if err := f.Error(); err != nil {
	return 0, err
}

// Per-lease watchdog. Once a second it
//  1. scans the lease bucket and submits a "loopcheck" command carrying the
//     decremented TTL for every stored key,
//  2. reads the alive-key count from the lease metadata, and
//  3. submits a "revoke" command and exits once the lease has been used and
//     no key is alive any more.
//
// Stored values have the layout [4-byte big-endian TTL][raw value].
//
// NOTE(review): there is no exit path other than revocation or the lease
// bucket disappearing (the exitCh case is still commented out), so the loop
// keeps running if this node stops being the leader — confirm.
s.wg.Wrap(func() {
	ticker := time.NewTicker(time.Second)
	defer log.Println("退出LoopCheck")
	defer ticker.Stop()

	rootName := []byte(DefaultLeaseBucketName)
	leaseName := []byte(fmt.Sprintf("%d", leaseId))

	var start bool // true once the lease holds at least one key besides "meta"
	for {
		var count int
		var gone bool // set when the lease bucket no longer exists
		select {
		case <-ticker.C:
			s.db.Update(func(tx *bolt.Tx) error {
				// Guard both lookups: calling a method on a missing bucket
				// would dereference a nil pointer.
				root := tx.Bucket(rootName)
				if root == nil {
					gone = true
					return nil
				}
				bucket := root.Bucket(leaseName)
				if bucket == nil {
					gone = true
					return nil
				}

				// The bucket always holds "meta"; any additional key means
				// the lease has tenants.
				if n := bucket.Stats().KeyN; n == 1 {
					start = false
				} else if n > 1 {
					start = true
				}
				if !start {
					return nil
				}

				cur := bucket.Cursor()
				for k, v := cur.First(); k != nil; k, v = cur.Next() {
					// Skip the metadata entry (and anything too short to
					// carry a TTL header) but keep scanning: returning here
					// would leave every key sorted after "meta" un-aged.
					if string(k) == "meta" || len(v) < 4 {
						continue
					}

					// Age the entry by one second. The TTL is unsigned, so
					// never step below zero or it would wrap to a huge value.
					ttl := binary.BigEndian.Uint32(v[:4])
					if ttl > 0 {
						ttl--
					}

					// bolt values are only valid for reading during the
					// transaction, so work on a private copy.
					next := make([]byte, len(v))
					copy(next, v)
					binary.BigEndian.PutUint32(next[:4], ttl)

					c := &command{
						Op:      "loopcheck",
						Key:     string(k),
						Value:   next,
						LeaseId: leaseId,
					}
					payload, err := json.Marshal(c)
					if err != nil {
						return err
					}
					s.raft.Apply(payload, raftTimeout)
				}
				return nil
			})
			if gone {
				// The lease was removed elsewhere; nothing left to watch.
				log.Println("没有找到对应的Lease", leaseId)
				return
			}

			// Read how many keys the lease metadata reports as alive.
			s.db.View(func(tx *bolt.Tx) error {
				root := tx.Bucket(rootName)
				if root == nil {
					log.Println("没有找到对应的Lease", leaseId)
					return fmt.Errorf("Lease[%d] hash been removed", leaseId)
				}
				bucket := root.Bucket(leaseName)
				if bucket == nil {
					log.Println("没有找到对应的Lease", leaseId)
					return fmt.Errorf("Lease[%d] hash been removed", leaseId)
				}

				meta := new(LeaseMeta)
				if err := json.Unmarshal(bucket.Get([]byte("meta")), meta); err != nil {
					return err
				}
				count = meta.Count
				return nil
			})

			// The lease has been in use and no key is alive: revoke it.
			if count == 0 && start {
				log.Printf("%d 租约已经失效 count=%d", leaseId, count)
				c := &command{
					Op:      "revoke",
					LeaseId: leaseId,
				}
				payload, err := json.Marshal(c)
				if err != nil {
					continue
				}
				s.raft.Apply(payload, raftTimeout)
				return
			}
			// case exitCh:
		}
	}
})

return leaseId, nil
}
// LeaseRevoke submits a "revoke" command through raft and returns the error
// reported by the apply future. It returns ErrNotLeader when this node is not
// the raft leader.
//
// NOTE(review): the command carries no lease id, so the state machine receives
// a zero LeaseId — confirm this is intended.
func (s *Store) LeaseRevoke() error {
	if s.raft.State() != raft.Leader {
		return ErrNotLeader
	}

	payload, err := json.Marshal(&command{Op: "revoke"})
	if err != nil {
		return err
	}

	return s.raft.Apply(payload, raftTimeout).Error()
}
func (s *Store) LeaseKeepAlive(leaseId, key string, value []byte) error {
if s.raft.State() != raft.Leader {
return ErrNotLeader
@@ -419,6 +513,22 @@ func (s *Store) LeaseKeepAlive(leaseId, key string, value []byte) error {
return f.Error()
}
// LeaseRevoke submits a "revoke" command through raft and returns the error
// reported by the apply future. It returns ErrNotLeader when this node is not
// the raft leader.
//
// NOTE(review): the command carries no lease id, so the state machine receives
// a zero LeaseId — confirm this is intended.
func (s *Store) LeaseRevoke() error {
	if s.raft.State() != raft.Leader {
		return ErrNotLeader
	}

	payload, err := json.Marshal(&command{Op: "revoke"})
	if err != nil {
		return err
	}

	return s.raft.Apply(payload, raftTimeout).Error()
}
func (s *Store) LeaseTimeToAlive() error {
if s.raft.State() != raft.Leader {
return ErrNotLeader
@@ -435,6 +545,34 @@ func (s *Store) LeaseTimeToAlive() error {
return f.Error()
}
// GetLeaseKV collects the values stored under every lease whose name starts
// with key.
//
// Each lease lives in its own sub-bucket of the lease bucket; the sub-bucket's
// "meta" entry holds the JSON-encoded LeaseMeta, and every other entry is a
// value laid out as [4-byte TTL][raw value]. Only the raw part is returned.
//
// When no lease bucket exists yet the result is empty and the error is nil.
// An error is returned only if the read transaction itself fails.
func (s *Store) GetLeaseKV(key string) ([]string, error) {
	// Deliberately a non-nil empty slice so that "no match" is encoded as []
	// rather than null when the result is written out as JSON.
	res := []string{}

	err := s.db.View(func(tx *bolt.Tx) error {
		root := tx.Bucket([]byte(DefaultLeaseBucketName))
		if root == nil {
			// No lease has been granted so far: nothing to report.
			return nil
		}

		return root.ForEach(func(id, _ []byte) error {
			lease := root.Bucket(id)
			if lease == nil {
				// Not a sub-bucket; dereferencing it would panic.
				return nil
			}

			metabytes := lease.Get([]byte("meta"))
			log.Println(string(metabytes))

			meta := &LeaseMeta{}
			if err := json.Unmarshal(metabytes, meta); err != nil {
				// Without readable metadata the lease name is unknown, so
				// the lease cannot be matched against key; skip it.
				return nil
			}
			if !strings.HasPrefix(meta.Name, key) {
				return nil
			}

			return lease.ForEach(func(k, v []byte) error {
				// Skip the metadata entry and anything too short to carry
				// the 4-byte TTL header (this includes nested buckets, whose
				// value is nil).
				if string(k) == "meta" || len(v) < 4 {
					return nil
				}
				res = append(res, string(v[4:]))
				return nil
			})
		})
	})
	if err != nil {
		return nil, err
	}
	return res, nil
}
// SetMeta stores value under key by delegating to Set with the value
// converted to a byte slice; it returns whatever Set returns.
func (s *Store) SetMeta(key, value string) error {
	return s.Set(key, []byte(value))
}
@@ -506,11 +644,13 @@ func (f *fsm) Apply(l *raft.Log) interface{} {
case "delete":
return f.applyDelete(c.Key)
case "grant":
return f.applyLeaseGrant(c.LeaseId, c.TTL)
return f.applyLeaseGrant(c.Name, c.LeaseId, c.TTL)
case "loopcheck":
return f.applyLeaseLoopCheck(c.LeaseId, c.Key, c.Value)
case "keepalive":
return f.applyLeaseKeepAlive(c.LeaseId, c.Key, c.Value)
case "revoke":
return f.applyLeaseRevoke()
return f.applyLeaseRevoke(c.LeaseId)
case "timetolive":
return f.applyLeaseTimeToLive()
default:
@@ -522,7 +662,7 @@ func (f *fsm) applySet(key string, value []byte) interface{} {
f.mu.Lock()
defer f.mu.Unlock()
f.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(DefaultBucketName))
bucket := tx.Bucket([]byte(DefaultKVBucketName))
if err := bucket.Put([]byte(key), value); err != nil {
log.Fatalln(err)
return err
@@ -538,7 +678,7 @@ func (f *fsm) applyDelete(key string) interface{} {
f.mu.Lock()
defer f.mu.Unlock()
f.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(DefaultBucketName))
bucket := tx.Bucket([]byte(DefaultKVBucketName))
if err := bucket.Delete([]byte(key)); err != nil {
log.Fatalln(err)
return err
@@ -550,96 +690,44 @@ func (f *fsm) applyDelete(key string) interface{} {
return nil
}
func (f *fsm) applyLeaseGrant(leaseId uint64, ttl int) interface{} {
func (f *fsm) applyLeaseGrant(name string, leaseId uint64, ttl int) interface{} {
key := fmt.Sprintf("%d", leaseId)
// 创建子桶
if err := f.db.Update(func(tx *bolt.Tx) error {
bucket, err := tx.CreateBucketIfNotExists([]byte(key))
bucket, err := tx.CreateBucketIfNotExists([]byte(DefaultLeaseBucketName))
if err != nil {
return err
}
b, err := json.Marshal(LeaseMeta{TTL: ttl})
bucket, err = bucket.CreateBucketIfNotExists([]byte(key))
if err != nil {
return err
}
b, err := json.Marshal(LeaseMeta{Name: name, TTL: ttl, Status: CREATE, Count: 0})
if err != nil {
return err
}
// 保存元数据
return bucket.Put([]byte("meta"), b)
}); err != nil {
log.Fatalln(err)
log.Println(err)
return err
}
f.wg.Wrap(func() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
var start bool
for {
var count int
select {
case <-ticker.C:
// 每秒遍历当前桶 所有值TTL--
f.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(key))
// 判断是否有租客
if bucket.Stats().KeyN == 1 {
start = false
} else if bucket.Stats().KeyN > 1 {
start = true
}
cur := bucket.Cursor()
for k, v := cur.First(); k != nil; k, v = cur.Next() {
// 跳过元数据
if string(k) == "meta" {
return nil
}
// key:value => key:[ttl(int32 4bytes)][raw value]
ttl := binary.BigEndian.Uint32(v[:4])
if ttl > 0 {
count = count + 1
ttl = ttl - 1
// 注意由于bolt的实现机制 遍历中的value原值只可以读 不可以修改 需要先复制再Put
b := make([]byte, len(v))
copy(b, v)
binary.BigEndian.PutUint32(b[:4], uint32(ttl))
bucket.Put([]byte(k), b)
} else if ttl == 0 {
// 删除这个键值对
cur.Delete()
}
}
return nil
})
// 租约已经生效 并且 没有存活的键值 退出该租约 并删除对应的桶
if count == 0 && start {
log.Printf("%d 租约已经失效", leaseId)
f.db.Update(func(tx *bolt.Tx) error {
return tx.DeleteBucket([]byte(key))
})
return
}
// case exitCh:
}
}
})
return leaseId
}
// applyLeaseRevoke is a placeholder: it takes no arguments, changes no state
// and always returns nil.
func (f *fsm) applyLeaseRevoke() interface{} {
	return nil
}
func (f *fsm) applyLeaseKeepAlive(leaseId uint64, key string, value []byte) interface{} {
var ttl int
Id := fmt.Sprintf("%d", leaseId)
// value是带有ttl头部的值
func (f *fsm) applyLeaseLoopCheck(leaseId uint64, key string, value []byte) interface{} {
var ttl, status, count int
var name string
err := f.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(Id))
bucket := tx.Bucket([]byte(DefaultLeaseBucketName)).Bucket([]byte(fmt.Sprintf("%d", leaseId)))
if bucket == nil {
return fmt.Errorf("Lease[%s] hash been removed", Id)
log.Println("没有找到对应的Lease", leaseId)
return fmt.Errorf("Lease[%d] hash been removed", leaseId)
}
b := bucket.Get([]byte("meta"))
@@ -648,28 +736,118 @@ func (f *fsm) applyLeaseKeepAlive(leaseId uint64, key string, value []byte) inte
if err != nil {
return err
}
name = meta.Name
ttl = meta.TTL
status = meta.Status
count = meta.Count
return nil
})
if err != nil {
return err
}
err = f.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(Id))
if status == DEAD {
log.Println("一个已经被撤销的Lease")
return nil
}
// Lease仍然有效 改变对应键值对的状态
return f.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(DefaultLeaseBucketName)).Bucket([]byte(fmt.Sprintf("%d", leaseId)))
if bucket == nil {
return fmt.Errorf("Lease[%s] hash been removed", Id)
return fmt.Errorf("Lease[%d] hash been removed", leaseId)
}
b := append(make([]byte, 4), value...)
binary.BigEndian.PutUint32(b[:4], uint32(ttl))
return bucket.Put([]byte(key), b)
newttl := binary.BigEndian.Uint32(value[:4])
log.Printf("[%d]状态检测: TTL:%d Now:%d AliveKeys:%d", leaseId, ttl, newttl, count)
if newttl > 0 {
b := make([]byte, len(value))
copy(b, value)
binary.BigEndian.PutUint32(b[:4], uint32(newttl))
return bucket.Put([]byte(key), b)
} else {
// 这个键值对已经失效
count--
b, _ := json.Marshal(LeaseMeta{Name: name, TTL: ttl, Status: ALIVE, Count: count}) // 更新Lease状态
bucket.Put([]byte("meta"), b)
return bucket.Delete([]byte(fmt.Sprintf("%d", leaseId)))
}
})
}
// value是不带有ttl头部的值
func (f *fsm) applyLeaseKeepAlive(leaseId uint64, key string, value []byte) interface{} {
var ttl, status, count int
var name string
err := f.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(DefaultLeaseBucketName)).Bucket([]byte(fmt.Sprintf("%d", leaseId)))
if bucket == nil {
log.Println("没有找到对应的Lease", leaseId)
return fmt.Errorf("Lease[%d] hash been removed", leaseId)
}
b := bucket.Get([]byte("meta"))
meta := new(LeaseMeta)
err := json.Unmarshal(b, meta)
if err != nil {
return err
}
name = meta.Name
ttl = meta.TTL
status = meta.Status
count = meta.Count
return nil
})
if err != nil {
return err
}
return nil
// A lease whose metadata is marked DEAD has been revoked: log it and do
// nothing.
if status == DEAD {
	log.Println("一个已经被撤销的Lease")
	return nil
}
// The lease is still valid: update the state of the key/value pair.
// NOTE(review): the bucket lookup below chains two Bucket calls without
// checking the first result — confirm the lease root bucket always exists here.
return f.db.Update(func(tx *bolt.Tx) error {
	bucket := tx.Bucket([]byte(DefaultLeaseBucketName)).Bucket([]byte(fmt.Sprintf("%d", leaseId)))
	if bucket == nil {
		return fmt.Errorf("Lease[%d] hash been removed", leaseId)
	}
	// Check whether this key has been seen before. TODO: there is a data race here.
	// NOTE(review): f.leases is only ever appended to in this block — confirm
	// the count stays correct when a key expires and is later added again.
	var flag bool
	for _, v := range f.leases[leaseId] {
		if v == key {
			flag = true
			break
		}
	}
	if !flag {
		count++ // not seen before: this is a new key, so count it
		f.leases[leaseId] = append(f.leases[leaseId], key)
	}
	// Rewrite the lease metadata with status ALIVE and the current count.
	// The errors from Marshal and from this Put are not checked.
	b, _ := json.Marshal(LeaseMeta{Name: name, TTL: ttl, Status: ALIVE, Count: count}) // update the lease state
	bucket.Put([]byte("meta"), b)
	// Store the new value. value is the raw data without a TTL prefix, so a
	// 4-byte big-endian header holding the lease TTL is prepended here.
	b = make([]byte, 4+len(value))
	copy(b[4:], value)
	binary.BigEndian.PutUint32(b[:4], uint32(ttl))
	return bucket.Put([]byte(key), b)
})
}
// applyLeaseRevoke removes the lease identified by leaseId from the lease
// bucket in a single write transaction: the lease metadata is first rewritten
// with status DEAD, then the lease's sub-bucket is deleted.
//
// It returns nil on success, or an error when the lease (or the lease bucket
// as a whole) does not exist or the transaction fails. A missing root bucket
// is reported as an error rather than dereferenced, because a revoke can be
// applied before any lease has ever been granted.
func (f *fsm) applyLeaseRevoke(leaseId uint64) interface{} {
	leaseName := []byte(fmt.Sprintf("%d", leaseId))

	return f.db.Update(func(tx *bolt.Tx) error {
		root := tx.Bucket([]byte(DefaultLeaseBucketName))
		if root == nil {
			return fmt.Errorf("Lease[%d] hash been removed", leaseId)
		}
		bucket := root.Bucket(leaseName)
		if bucket == nil {
			return fmt.Errorf("Lease[%d] hash been removed", leaseId)
		}

		// Mark the lease as dead before dropping it.
		b, err := json.Marshal(LeaseMeta{Status: DEAD})
		if err != nil {
			return err
		}
		if err := bucket.Put([]byte("meta"), b); err != nil {
			return err
		}
		return root.DeleteBucket(leaseName)
	})
}
func (f *fsm) applyLeaseTimeToLive() interface{} {
@@ -711,7 +889,7 @@ func (f *fsm) Restore(rc io.ReadCloser) error {
f.db = db
err = db.Update(func(tx *bolt.Tx) error {
_, err := tx.CreateBucketIfNotExists([]byte(DefaultBucketName))
_, err := tx.CreateBucketIfNotExists([]byte(DefaultKVBucketName))
if err != nil {
log.Fatalf("CreateBucketIfNotExists err:%s", err.Error())
return err
@@ -725,7 +903,7 @@ func (f *fsm) Restore(rc io.ReadCloser) error {
// 用快照中的数据更新状态
for k, v := range o {
err = db.Batch(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(DefaultBucketName))
bucket := tx.Bucket([]byte(DefaultKVBucketName))
return bucket.Put([]byte(k), v)
})
if err != nil {