mirror of
https://github.com/EchoVault/SugarDB.git
synced 2025-10-13 03:34:07 +08:00
Use memory footprint of sugardb.store to compare against max memory for eviction policies (#133)
Improved memory usage reporting. - @osteensco
This commit is contained in:
11686
coverage/coverage.out
11686
coverage/coverage.out
File diff suppressed because it is too large
Load Diff
@@ -67,3 +67,8 @@ const (
|
||||
AllKeysRandom = "allkeys-random"
|
||||
VolatileRandom = "volatile-random"
|
||||
)
|
||||
|
||||
// CompositeTypes are SugarDB KeyData Value types like set, sorted set, etc.
|
||||
type CompositeType interface {
|
||||
GetMem() int64
|
||||
}
|
||||
|
@@ -392,6 +392,7 @@ func Test_AdminCommands(t *testing.T) {
|
||||
respConn := resp.NewConn(conn)
|
||||
|
||||
for i := 0; i < len(tests); i++ {
|
||||
t.Log(tests[i].name)
|
||||
if len(tests[i].wantExecRes) > 0 {
|
||||
// If the length of execCommand is > 0, write the command to the connection.
|
||||
if err := respConn.WriteArray(tests[i].execCommand); err != nil {
|
||||
|
@@ -3353,7 +3353,7 @@ func Test_LFU_Generic(t *testing.T) {
|
||||
DataDir: "",
|
||||
EvictionPolicy: constants.AllKeysLFU,
|
||||
EvictionInterval: duration,
|
||||
MaxMemory: 4000000,
|
||||
MaxMemory: 550,
|
||||
}),
|
||||
)
|
||||
if err != nil {
|
||||
@@ -3534,7 +3534,7 @@ func Test_LRU_Generic(t *testing.T) {
|
||||
DataDir: "",
|
||||
EvictionPolicy: constants.AllKeysLRU,
|
||||
EvictionInterval: duration,
|
||||
MaxMemory: 4000000,
|
||||
MaxMemory: 550,
|
||||
}),
|
||||
)
|
||||
if err != nil {
|
||||
|
@@ -17,12 +17,13 @@ package pubsub
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/gobwas/glob"
|
||||
"github.com/tidwall/resp"
|
||||
"log"
|
||||
"net"
|
||||
"slices"
|
||||
"sync"
|
||||
|
||||
"github.com/gobwas/glob"
|
||||
"github.com/tidwall/resp"
|
||||
)
|
||||
|
||||
type PubSub struct {
|
||||
|
@@ -15,9 +15,12 @@
|
||||
package set
|
||||
|
||||
import (
|
||||
"github.com/echovault/sugardb/internal"
|
||||
"math/rand"
|
||||
"slices"
|
||||
"unsafe"
|
||||
|
||||
"github.com/echovault/sugardb/internal"
|
||||
"github.com/echovault/sugardb/internal/constants"
|
||||
)
|
||||
|
||||
type Set struct {
|
||||
@@ -25,6 +28,23 @@ type Set struct {
|
||||
length int
|
||||
}
|
||||
|
||||
func (s *Set) GetMem() int64 {
|
||||
var size int64
|
||||
size += int64(unsafe.Sizeof(s))
|
||||
// above only gives us the size of the pointer to the map, so we need to add it's headers and contents
|
||||
size += int64(unsafe.Sizeof(s.members))
|
||||
for k, v := range s.members {
|
||||
size += int64(unsafe.Sizeof(k))
|
||||
size += int64(len(k))
|
||||
size += int64(unsafe.Sizeof(v))
|
||||
}
|
||||
|
||||
return size
|
||||
}
|
||||
|
||||
// compile time interface check
|
||||
var _ constants.CompositeType = (*Set)(nil)
|
||||
|
||||
func NewSet(elems []string) *Set {
|
||||
set := &Set{
|
||||
members: make(map[string]interface{}),
|
||||
|
@@ -17,11 +17,14 @@ package sorted_set
|
||||
import (
|
||||
"cmp"
|
||||
"errors"
|
||||
"github.com/echovault/sugardb/internal"
|
||||
"math"
|
||||
"math/rand"
|
||||
"slices"
|
||||
"strings"
|
||||
"unsafe"
|
||||
|
||||
"github.com/echovault/sugardb/internal"
|
||||
"github.com/echovault/sugardb/internal/constants"
|
||||
)
|
||||
|
||||
type Value string
|
||||
@@ -45,6 +48,29 @@ type SortedSet struct {
|
||||
members map[Value]MemberObject
|
||||
}
|
||||
|
||||
func (s *SortedSet) GetMem() int64 {
|
||||
var size int64
|
||||
// map header
|
||||
size += int64(unsafe.Sizeof(s))
|
||||
// map contents
|
||||
for k, v := range s.members {
|
||||
// string header
|
||||
size += int64(unsafe.Sizeof(k))
|
||||
// string
|
||||
size += int64(len(k))
|
||||
// MemberObject
|
||||
size += int64(unsafe.Sizeof(v))
|
||||
// value field
|
||||
size += int64(unsafe.Sizeof(v.Value))
|
||||
size += int64(len(v.Value))
|
||||
}
|
||||
|
||||
return size
|
||||
}
|
||||
|
||||
// compile time interface check
|
||||
var _ constants.CompositeType = (*SortedSet)(nil)
|
||||
|
||||
func NewSortedSet(members []MemberParam) *SortedSet {
|
||||
s := &SortedSet{
|
||||
members: make(map[Value]MemberObject),
|
||||
|
@@ -16,9 +16,15 @@ package internal
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/echovault/sugardb/internal/clock"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"reflect"
|
||||
"time"
|
||||
"unsafe"
|
||||
|
||||
"github.com/echovault/sugardb/internal/clock"
|
||||
"github.com/echovault/sugardb/internal/constants"
|
||||
)
|
||||
|
||||
type KeyData struct {
|
||||
@@ -26,6 +32,66 @@ type KeyData struct {
|
||||
ExpireAt time.Time
|
||||
}
|
||||
|
||||
func (k *KeyData) GetMem() (int64, error) {
|
||||
var size int64
|
||||
size = int64(unsafe.Sizeof(k.ExpireAt))
|
||||
|
||||
// check type of Value field
|
||||
switch v := k.Value.(type) {
|
||||
case nil:
|
||||
size += 0
|
||||
// AdaptType() will always ensure data type is of string, float64 or int.
|
||||
case int:
|
||||
size += int64(unsafe.Sizeof(v))
|
||||
// int64 data type used with module.SET
|
||||
case float64, int64:
|
||||
size += 8
|
||||
case string:
|
||||
// Add the size of the header and the number of bytes of the string
|
||||
size += int64(unsafe.Sizeof(v))
|
||||
size += int64(len(v))
|
||||
|
||||
// handle hash
|
||||
// AdaptType() will always ensure data type is of string, float64 or int.
|
||||
case map[string]interface{}:
|
||||
// Map headers
|
||||
size += int64(unsafe.Sizeof(v))
|
||||
|
||||
for key, val := range v {
|
||||
size += int64(unsafe.Sizeof(key))
|
||||
size += int64(len(key))
|
||||
switch vt := val.(type) {
|
||||
|
||||
case nil:
|
||||
size += 0
|
||||
case int:
|
||||
size += int64(unsafe.Sizeof(vt))
|
||||
case float64, int64:
|
||||
size += 8
|
||||
case string:
|
||||
size += int64(unsafe.Sizeof(vt))
|
||||
size += int64(len(vt))
|
||||
}
|
||||
}
|
||||
|
||||
// handle list
|
||||
case []string:
|
||||
for _, s := range v {
|
||||
size += int64(unsafe.Sizeof(s))
|
||||
size += int64(len(s))
|
||||
}
|
||||
|
||||
// handle non primitive datatypes like set and sorted set
|
||||
case constants.CompositeType:
|
||||
size += k.Value.(constants.CompositeType).GetMem()
|
||||
|
||||
default:
|
||||
return 0, errors.New(fmt.Sprintf("ERROR: type %v is not supported in method KeyData.GetMem()", reflect.TypeOf(v)))
|
||||
}
|
||||
|
||||
return size, nil
|
||||
}
|
||||
|
||||
type ContextServerID string
|
||||
type ContextConnID string
|
||||
|
||||
@@ -57,6 +123,8 @@ type ServerInfo struct {
|
||||
Mode string
|
||||
Role string
|
||||
Modules []string
|
||||
MemoryUsed int64
|
||||
MaxMemory uint64
|
||||
}
|
||||
|
||||
// ConnectionInfo holds information about the connection
|
||||
|
@@ -21,7 +21,6 @@ import (
|
||||
"crypto/tls"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/echovault/sugardb/internal/constants"
|
||||
"io"
|
||||
"log"
|
||||
"math/big"
|
||||
@@ -34,6 +33,7 @@ import (
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/echovault/sugardb/internal/constants"
|
||||
"github.com/sethvargo/go-retry"
|
||||
"github.com/tidwall/resp"
|
||||
)
|
||||
@@ -187,16 +187,13 @@ func ParseMemory(memory string) (uint64, error) {
|
||||
}
|
||||
|
||||
// IsMaxMemoryExceeded checks whether we have exceeded the current maximum memory limit.
|
||||
func IsMaxMemoryExceeded(maxMemory uint64) bool {
|
||||
func IsMaxMemoryExceeded(memUsed int64, maxMemory uint64) bool {
|
||||
if maxMemory == 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
var memStats runtime.MemStats
|
||||
runtime.ReadMemStats(&memStats)
|
||||
|
||||
// If we're currently using less than the configured max memory, return false.
|
||||
if memStats.HeapInuse < maxMemory {
|
||||
if uint64(memUsed) < maxMemory {
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -204,10 +201,9 @@ func IsMaxMemoryExceeded(maxMemory uint64) bool {
|
||||
// This measure is to prevent deleting keys that may be important when some memory can be reclaimed
|
||||
// by just collecting garbage.
|
||||
runtime.GC()
|
||||
runtime.ReadMemStats(&memStats)
|
||||
|
||||
// Return true when whe are above or equal to max memory.
|
||||
return memStats.HeapInuse >= maxMemory
|
||||
return uint64(memUsed) >= maxMemory
|
||||
}
|
||||
|
||||
// FilterExpiredKeys filters out keys that are already expired, so they are not persisted.
|
||||
|
@@ -48,6 +48,8 @@ func (server *SugarDB) GetServerInfo() internal.ServerInfo {
|
||||
return "replica"
|
||||
}(),
|
||||
Modules: server.ListModules(),
|
||||
MemoryUsed: server.memUsed,
|
||||
MaxMemory: server.config.MaxMemory,
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -19,9 +19,6 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/echovault/sugardb/internal"
|
||||
"github.com/echovault/sugardb/internal/constants"
|
||||
"github.com/echovault/sugardb/internal/eviction"
|
||||
"log"
|
||||
"math/rand"
|
||||
"runtime"
|
||||
@@ -29,6 +26,11 @@ import (
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
"unsafe"
|
||||
|
||||
"github.com/echovault/sugardb/internal"
|
||||
"github.com/echovault/sugardb/internal/constants"
|
||||
"github.com/echovault/sugardb/internal/eviction"
|
||||
)
|
||||
|
||||
// SwapDBs swaps every TCP client connection from database1 over to database2.
|
||||
@@ -199,7 +201,8 @@ func (server *SugarDB) setValues(ctx context.Context, entries map[string]interfa
|
||||
server.storeLock.Lock()
|
||||
defer server.storeLock.Unlock()
|
||||
|
||||
if internal.IsMaxMemoryExceeded(server.config.MaxMemory) && server.config.EvictionPolicy == constants.NoEviction {
|
||||
if internal.IsMaxMemoryExceeded(server.memUsed, server.config.MaxMemory) && server.config.EvictionPolicy == constants.NoEviction {
|
||||
|
||||
return errors.New("max memory reached, key value not set")
|
||||
}
|
||||
|
||||
@@ -219,6 +222,15 @@ func (server *SugarDB) setValues(ctx context.Context, entries map[string]interfa
|
||||
Value: value,
|
||||
ExpireAt: expireAt,
|
||||
}
|
||||
data := server.store[database][key]
|
||||
mem, err := data.GetMem()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
server.memUsed += mem
|
||||
server.memUsed += int64(unsafe.Sizeof(key))
|
||||
server.memUsed += int64(len(key))
|
||||
|
||||
if !server.isInCluster() {
|
||||
server.snapshotEngine.IncrementChangeCount()
|
||||
}
|
||||
@@ -269,6 +281,16 @@ func (server *SugarDB) setExpiry(ctx context.Context, key string, expireAt time.
|
||||
func (server *SugarDB) deleteKey(ctx context.Context, key string) error {
|
||||
database := ctx.Value("Database").(int)
|
||||
|
||||
// Deduct memory usage in tracker.
|
||||
data := server.store[database][key]
|
||||
mem, err := data.GetMem()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
server.memUsed -= mem
|
||||
server.memUsed -= int64(unsafe.Sizeof(key))
|
||||
server.memUsed -= int64(len(key))
|
||||
|
||||
// Delete the key from keyLocks and store.
|
||||
delete(server.store[database], key)
|
||||
|
||||
@@ -421,22 +443,21 @@ func (server *SugarDB) adjustMemoryUsage(ctx context.Context) error {
|
||||
|
||||
// Check if memory usage is above max-memory.
|
||||
// If it is, pop items from the cache until we get under the limit.
|
||||
var memStats runtime.MemStats
|
||||
runtime.ReadMemStats(&memStats)
|
||||
// If we're using less memory than the max-memory, there's no need to evict.
|
||||
if memStats.HeapInuse < server.config.MaxMemory {
|
||||
if uint64(server.memUsed) < server.config.MaxMemory {
|
||||
return nil
|
||||
}
|
||||
// Force a garbage collection first before we start evicting keys.
|
||||
runtime.GC()
|
||||
runtime.ReadMemStats(&memStats)
|
||||
if memStats.HeapInuse < server.config.MaxMemory {
|
||||
if uint64(server.memUsed) < server.config.MaxMemory {
|
||||
return nil
|
||||
}
|
||||
|
||||
// We've done a GC, but we're still at or above the max memory limit.
|
||||
// Start a loop that evicts keys until either the heap is empty or
|
||||
// we're below the max memory limit.
|
||||
|
||||
log.Printf("Memory used: %v, Max Memory: %v", server.GetServerInfo().MemoryUsed, server.GetServerInfo().MaxMemory)
|
||||
switch {
|
||||
case slices.Contains([]string{constants.AllKeysLFU, constants.VolatileLFU}, strings.ToLower(server.config.EvictionPolicy)):
|
||||
// Remove keys from LFU cache until we're below the max memory limit or
|
||||
@@ -453,20 +474,21 @@ func (server *SugarDB) adjustMemoryUsage(ctx context.Context) error {
|
||||
if !server.isInCluster() {
|
||||
// If in standalone mode, directly delete the key
|
||||
if err := server.deleteKey(ctx, key); err != nil {
|
||||
|
||||
log.Printf("Evicting key %v from database %v \n", key, database)
|
||||
return fmt.Errorf("adjustMemoryUsage -> LFU cache eviction: %+v", err)
|
||||
}
|
||||
} else if server.isInCluster() && server.raft.IsRaftLeader() {
|
||||
// If in raft cluster, send command to delete key from cluster
|
||||
if err := server.raftApplyDeleteKey(ctx, key); err != nil {
|
||||
|
||||
return fmt.Errorf("adjustMemoryUsage -> LFU cache eviction: %+v", err)
|
||||
}
|
||||
}
|
||||
// Run garbage collection
|
||||
runtime.GC()
|
||||
// Return if we're below max memory
|
||||
runtime.ReadMemStats(&memStats)
|
||||
if memStats.HeapInuse < server.config.MaxMemory {
|
||||
if uint64(server.memUsed) < server.config.MaxMemory {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
@@ -499,8 +521,7 @@ func (server *SugarDB) adjustMemoryUsage(ctx context.Context) error {
|
||||
// Run garbage collection
|
||||
runtime.GC()
|
||||
// Return if we're below max memory
|
||||
runtime.ReadMemStats(&memStats)
|
||||
if memStats.HeapInuse < server.config.MaxMemory {
|
||||
if uint64(server.memUsed) < server.config.MaxMemory {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
@@ -523,18 +544,19 @@ func (server *SugarDB) adjustMemoryUsage(ctx context.Context) error {
|
||||
// If in standalone mode, directly delete the key
|
||||
if err := server.deleteKey(ctx, key); err != nil {
|
||||
log.Printf("Evicting key %v from database %v \n", key, db)
|
||||
|
||||
return fmt.Errorf("adjustMemoryUsage -> all keys random: %+v", err)
|
||||
}
|
||||
} else if server.isInCluster() && server.raft.IsRaftLeader() {
|
||||
if err := server.raftApplyDeleteKey(ctx, key); err != nil {
|
||||
|
||||
return fmt.Errorf("adjustMemoryUsage -> all keys random: %+v", err)
|
||||
}
|
||||
}
|
||||
// Run garbage collection
|
||||
runtime.GC()
|
||||
// Return if we're below max memory
|
||||
runtime.ReadMemStats(&memStats)
|
||||
if memStats.HeapInuse < server.config.MaxMemory {
|
||||
if uint64(server.memUsed) < server.config.MaxMemory {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
@@ -557,10 +579,12 @@ func (server *SugarDB) adjustMemoryUsage(ctx context.Context) error {
|
||||
// If in standalone mode, directly delete the key
|
||||
if err := server.deleteKey(ctx, key); err != nil {
|
||||
log.Printf("Evicting key %v from database %v \n", key, database)
|
||||
|
||||
return fmt.Errorf("adjustMemoryUsage -> volatile keys random: %+v", err)
|
||||
}
|
||||
} else if server.isInCluster() && server.raft.IsRaftLeader() {
|
||||
if err := server.raftApplyDeleteKey(ctx, key); err != nil {
|
||||
|
||||
return fmt.Errorf("adjustMemoryUsage -> volatile keys randome: %+v", err)
|
||||
}
|
||||
}
|
||||
@@ -568,8 +592,7 @@ func (server *SugarDB) adjustMemoryUsage(ctx context.Context) error {
|
||||
// Run garbage collection
|
||||
runtime.GC()
|
||||
// Return if we're below max memory
|
||||
runtime.ReadMemStats(&memStats)
|
||||
if memStats.HeapInuse < server.config.MaxMemory {
|
||||
if uint64(server.memUsed) < server.config.MaxMemory {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
@@ -70,11 +70,15 @@ type SugarDB struct {
|
||||
|
||||
// Global read-write mutex for entire store.
|
||||
storeLock *sync.RWMutex
|
||||
|
||||
// Data store to hold the keys and their associated data, expiry time, etc.
|
||||
// The int key on the outer map represents the database index.
|
||||
// Each database has a map that has a string key and the key data (value and expiry time).
|
||||
store map[int]map[string]internal.KeyData
|
||||
|
||||
// memUsed tracks the memory usage of the data in the store.
|
||||
memUsed int64
|
||||
|
||||
// Holds all the keys that are currently associated with an expiry.
|
||||
keysWithExpiry struct {
|
||||
// Mutex as only one process should be able to update this list at a time.
|
||||
@@ -163,6 +167,7 @@ func NewSugarDB(options ...func(sugarDB *SugarDB)) (*SugarDB, error) {
|
||||
},
|
||||
storeLock: &sync.RWMutex{},
|
||||
store: make(map[int]map[string]internal.KeyData),
|
||||
memUsed: 0,
|
||||
keysWithExpiry: struct {
|
||||
rwMutex sync.RWMutex
|
||||
keys map[int][]string
|
||||
|
@@ -610,6 +610,8 @@ func Test_Cluster(t *testing.T) {
|
||||
Mode: "cluster",
|
||||
Role: "master",
|
||||
Modules: nodes[0].server.ListModules(),
|
||||
MemoryUsed: nodes[0].server.memUsed,
|
||||
MaxMemory: nodes[0].server.config.MaxMemory,
|
||||
},
|
||||
{
|
||||
Server: "sugardb",
|
||||
@@ -618,6 +620,8 @@ func Test_Cluster(t *testing.T) {
|
||||
Mode: "cluster",
|
||||
Role: "replica",
|
||||
Modules: nodes[1].server.ListModules(),
|
||||
MemoryUsed: nodes[1].server.memUsed,
|
||||
MaxMemory: nodes[1].server.config.MaxMemory,
|
||||
},
|
||||
{
|
||||
Server: "sugardb",
|
||||
@@ -626,6 +630,8 @@ func Test_Cluster(t *testing.T) {
|
||||
Mode: "cluster",
|
||||
Role: "replica",
|
||||
Modules: nodes[2].server.ListModules(),
|
||||
MemoryUsed: nodes[2].server.memUsed,
|
||||
MaxMemory: nodes[2].server.config.MaxMemory,
|
||||
},
|
||||
{
|
||||
Server: "sugardb",
|
||||
@@ -634,6 +640,8 @@ func Test_Cluster(t *testing.T) {
|
||||
Mode: "cluster",
|
||||
Role: "replica",
|
||||
Modules: nodes[3].server.ListModules(),
|
||||
MemoryUsed: nodes[3].server.memUsed,
|
||||
MaxMemory: nodes[3].server.config.MaxMemory,
|
||||
},
|
||||
{
|
||||
Server: "sugardb",
|
||||
@@ -642,11 +650,13 @@ func Test_Cluster(t *testing.T) {
|
||||
Mode: "cluster",
|
||||
Role: "replica",
|
||||
Modules: nodes[4].server.ListModules(),
|
||||
MemoryUsed: nodes[4].server.memUsed,
|
||||
MaxMemory: nodes[4].server.config.MaxMemory,
|
||||
},
|
||||
}
|
||||
for i := 0; i < len(nodes); i++ {
|
||||
if diff := deep.Equal(nodes[i].server.GetServerInfo(), nodeInfo[i]); diff != nil {
|
||||
t.Errorf("GetServerInfo() - node %d: %+v", i, err)
|
||||
t.Errorf("GetServerInfo() - node %d: %+v expected %v got %v", i, err, nodes[i].server.GetServerInfo(), nodeInfo[i])
|
||||
return
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user