diff --git a/.github/workflows/test-leveldb.yml b/.github/workflows/test-leveldb.yml new file mode 100644 index 00000000..cb7383ae --- /dev/null +++ b/.github/workflows/test-leveldb.yml @@ -0,0 +1,28 @@ +on: + push: + branches: + - master + - main + paths: + - 'leveldb/**' + pull_request: + paths: + - 'leveldb/**' +name: "Tests LevelDB" +jobs: + Tests: + strategy: + matrix: + go-version: + - 1.23.x + - 1.24.x + runs-on: ubuntu-latest + steps: + - name: Fetch Repository + uses: actions/checkout@v4 + - name: Install Go + uses: actions/setup-go@v5 + with: + go-version: '${{ matrix.go-version }}' + - name: Test LevelDB + run: cd ./leveldb && go test ./... -v -race diff --git a/.gitignore b/.gitignore index 78e20aa2..e49c0051 100644 --- a/.gitignore +++ b/.gitignore @@ -22,6 +22,7 @@ *.fasthttp.gz *.pprof *.workspace +/tmp/ # Dependencies /vendor/ diff --git a/MIGRATE.md b/MIGRATE.md new file mode 100644 index 00000000..8b2776dd --- /dev/null +++ b/MIGRATE.md @@ -0,0 +1,22 @@ +This document contains instructions for migrating to various storage versions. + +### 0.1 -> 0.2 +v0.2 fixes [a bug](https://github.com/gofiber/fiber/issues/1258) in MYSQL, Postgres and Arangodb in which +inserting non-UTF8 characters would trigger a panic due to the values being saved in a TEXT column instead of a +BYTEA/BLOB column. Migration instructions (note you may need to adjust the table names if you have supplied a custom +config to the storage): + +**Postgres** +```sql +ALTER TABLE fiber_storage +ALTER COLUMN v TYPE BYTEA USING v::bytea; +``` + +**MYSQL** +```sql +ALTER TABLE fiber_storage MODIFY COLUMN v BLOB; +``` + +**Arangodb** + +No migration other then updating the library is necessary. diff --git a/README.md b/README.md index 8ec2fcb8..ddb141b3 100644 --- a/README.md +++ b/README.md @@ -59,6 +59,7 @@ type Storage interface { - [Couchbase](./couchbase/README.md) - [DynamoDB](./dynamodb/README.md) - [Etcd](./etcd/README.md) +- [LevelDB](./leveldb/README.md) LevelDB Tests Status - [Memcache](./memcache/README.md) - [Memory](./memory/README.md) - [Minio](./minio/README.md) @@ -76,4 +77,4 @@ type Storage interface { - [ScyllaDB](./scylladb/README.md) - [SQLite3](./sqlite3/README.md) - [ClickHouse](./clickhouse/README.md) -- [Valkey](./valkey/README.md) +- [Valkey](./valkey/README.md) \ No newline at end of file diff --git a/leveldb/README.md b/leveldb/README.md new file mode 100644 index 00000000..8b5905d9 --- /dev/null +++ b/leveldb/README.md @@ -0,0 +1,174 @@ +--- +id: leveldb +title: LevelDB +--- + +![Release](https://img.shields.io/github/v/tag/gofiber/storage?filter=leveldb*) +[![Discord](https://img.shields.io/discord/704680098577514527?style=flat&label=%F0%9F%92%AC%20discord&color=00ACD7)](https://gofiber.io/discord) +![Test](https://img.shields.io/github/actions/workflow/status/gofiber/storage/test-leveldb.yml?label=Tests) +![Linter](https://img.shields.io/github/actions/workflow/status/gofiber/storage/linter.yml?label=Linter) + +A fast key-value DB using [syndtr/goleveldb](https://github.com/syndtr/goleveldb) + +### Table of Contents + +- [Signatures](#signatures) +- [Installation](#installation) +- [Examples](#examples) +- [Config](#config) +- [Default Config](#default-config) + +### Signatures + +```go +func New(config ...Config) Storage +func (s *Storage) Get(key string) ([]byte, error) +func (s *Storage) Set(key string, val []byte, exp time.Duration) error +func (s *Storage) Delete(key string) error +func (s *Storage) Reset() error +func (s *Storage) Close() error +func (s *Storage) Conn() *leveldb.DB +``` + +### Installation + +LevelDB is tested on the 2 last [Go versions](https://golang.org/dl/) with support for modules. So make sure to initialize one first if you didn't do that yet: + +```bash +go mod init github.com// +``` + +And then install the leveldb implementation: + +```bash +go get github.com/gofiber/storage/leveldb +``` + +### Examples + +Import the storage package. + +```go +import "github.com/gofiber/storage/leveldb" +``` + +You can use the following possibilities to create a storage: + +```go +// Initialize default config +store := leveldb.New() + +// Initialize custom config +store := leveldb.New(leveldb.Config{ + Path: "./testdb", + GCInterval: 10 * time.Second, +}) +``` + +### Config + +```go +type Config struct { + // Path is the filesystem path for the database + // + // Optional. Default is "./fiber.leveldb" + Path string + + // CacheSize is the size of LevelDB's cache (in MB) + // + // Optional. Default is 8MB + CacheSize int + + // BlockSize is the size of data blocks (in KB) + // + // Optional. Default is 4KB + BlockSize int + + // WriteBuffer is the size of write buffer (in MB) + // + // Optional. Default is 4MB + WriteBuffer int + + // CompactionL0Trigger is the number of level-0 tables that triggers compaction + // + // Optional. Default is 4 + CompactionL0Trigger int + + // WriteL0PauseTrigger is the number of level-0 tables that triggers write pause + // + // Optional. Default is 12 + WriteL0PauseTrigger int + + // WriteL0SlowdownTrigger is the number of level-0 tables that triggers write slowdown + // + // Optional. Default is 8 + WriteL0SlowdownTrigger int + + // MaxOpenFiles is the maximum number of open files that can be held + // + // Optional. Default is 200 on MacOS, 500 on others + MaxOpenFiles int + + // CompactionTableSize is the size of compaction table (in MB) + // + // Optional. Default is 2MB + CompactionTableSize int + + // BloomFilterBits is the number of bits used in bloom filter + // + // Optional. Default is 10 bits/key + BloomFilterBits int + + // NoSync completely disables fsync + // + // Optional. Default is false + NoSync bool + + // ReadOnly opens the database in read-only mode + // + // Optional. Default is false + ReadOnly bool + + // ErrorIfMissing returns error if database doesn't exist + // + // Optional. Default is false + ErrorIfMissing bool + + // ErrorIfExist returns error if database exists + // + // Optional. Default is false + ErrorIfExist bool + + // GCInterval is the garbage collection interval + // + // Optional. Default is 10 minutes + GCInterval time.Duration +} +``` + +### Default Config + +```go +var ConfigDefault = Config{ + Path: "./fiber.leveldb", + CacheSize: 8, // 8 MB + BlockSize: 4, // 4 KB + WriteBuffer: 4, // 4 MB + CompactionL0Trigger: 4, + WriteL0PauseTrigger: 12, + WriteL0SlowdownTrigger: 8, + MaxOpenFiles: func() int { + if runtime.GOOS == "darwin" { + return 200 // MacOS + } + return 500 // Unix/Linux + }(), + CompactionTableSize: 2, // 2 MB + BloomFilterBits: 10, // 10 bits per key + NoSync: false, + ReadOnly: false, + ErrorIfMissing: false, + ErrorIfExist: false, + GCInterval: 10 * time.Minute, +} +``` diff --git a/leveldb/config.go b/leveldb/config.go new file mode 100644 index 00000000..6bd9c535 --- /dev/null +++ b/leveldb/config.go @@ -0,0 +1,163 @@ +package leveldb + +import ( + "runtime" + "time" +) + +// Config holds the configuration options for LevelDB database +type Config struct { + // Path is the filesystem path for the database + // + // Optional. Default is "./fiber.leveldb" + Path string + + // CacheSize is the size of LevelDB's cache (in MB) + // + // Optional. Default is 8MB + CacheSize int + + // BlockSize is the size of data blocks (in KB) + // + // Optional. Default is 4KB + BlockSize int + + // WriteBuffer is the size of write buffer (in MB) + // + // Optional. Default is 4MB + WriteBuffer int + + // CompactionL0Trigger is the number of level-0 tables that triggers compaction + // + // Optional. Default is 4 + CompactionL0Trigger int + + // WriteL0PauseTrigger is the number of level-0 tables that triggers write pause + // + // Optional. Default is 12 + WriteL0PauseTrigger int + + // WriteL0SlowdownTrigger is the number of level-0 tables that triggers write slowdown + // + // Optional. Default is 8 + WriteL0SlowdownTrigger int + + // MaxOpenFiles is the maximum number of open files that can be held + // + // Optional. Default is 200 on MacOS, 500 on others + MaxOpenFiles int + + // CompactionTableSize is the size of compaction table (in MB) + // + // Optional. Default is 2MB + CompactionTableSize int + + // BloomFilterBits is the number of bits used in bloom filter + // + // Optional. Default is 10 bits/key + BloomFilterBits int + + // NoSync completely disables fsync + // + // Optional. Default is false + NoSync bool + + // ReadOnly opens the database in read-only mode + // + // Optional. Default is false + ReadOnly bool + + // ErrorIfMissing returns error if database doesn't exist + // + // Optional. Default is false + ErrorIfMissing bool + + // ErrorIfExist returns error if database exists + // + // Optional. Default is false + ErrorIfExist bool + + // GCInterval is the garbage collection interval + // + // Optional. Default is 10 minutes + GCInterval time.Duration +} + +// ConfigDefault is the default config +var ConfigDefault = Config{ + Path: "./fiber.leveldb", + CacheSize: 8, // 8 MB + BlockSize: 4, // 4 KB + WriteBuffer: 4, // 4 MB + CompactionL0Trigger: 4, + WriteL0PauseTrigger: 12, + WriteL0SlowdownTrigger: 8, + MaxOpenFiles: func() int { + if runtime.GOOS == "darwin" { + return 200 // MacOS + } + return 500 // Unix/Linux + }(), + CompactionTableSize: 2, // 2 MB + BloomFilterBits: 10, // 10 bits per key + NoSync: false, + ReadOnly: false, + ErrorIfMissing: false, + ErrorIfExist: false, + GCInterval: 10 * time.Minute, +} + +// configDefault is a helper function to set default values for the config +func configDefault(config ...Config) Config { + if len(config) < 1 { + return ConfigDefault + } + + cfg := config[0] + + if cfg.Path == "" { + cfg.Path = ConfigDefault.Path + } + + if cfg.CacheSize <= 0 { + cfg.CacheSize = ConfigDefault.CacheSize + } + + if cfg.BlockSize <= 0 { + cfg.BlockSize = ConfigDefault.BlockSize + } + + if cfg.WriteBuffer <= 0 { + cfg.WriteBuffer = ConfigDefault.WriteBuffer + } + + if cfg.CompactionL0Trigger <= 0 { + cfg.CompactionL0Trigger = ConfigDefault.CompactionL0Trigger + } + + if cfg.WriteL0PauseTrigger <= 0 { + cfg.WriteL0PauseTrigger = ConfigDefault.WriteL0PauseTrigger + } + + if cfg.WriteL0SlowdownTrigger <= 0 { + cfg.WriteL0SlowdownTrigger = ConfigDefault.WriteL0SlowdownTrigger + } + + if cfg.MaxOpenFiles <= 0 { + cfg.MaxOpenFiles = ConfigDefault.MaxOpenFiles + } + + if cfg.CompactionTableSize <= 0 { + cfg.CompactionTableSize = ConfigDefault.CompactionTableSize + } + + if cfg.BloomFilterBits <= 0 { + cfg.BloomFilterBits = ConfigDefault.BloomFilterBits + } + + if cfg.GCInterval <= 0 { + cfg.GCInterval = ConfigDefault.GCInterval + } + + return cfg +} diff --git a/leveldb/config_test.go b/leveldb/config_test.go new file mode 100644 index 00000000..962831e8 --- /dev/null +++ b/leveldb/config_test.go @@ -0,0 +1,24 @@ +package leveldb + +import ( + "runtime" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestConfigConfigMaxOpenFiles(t *testing.T) { + cfg := Config{ + MaxOpenFiles: 1000, + } + require.Equal(t, 1000, cfg.MaxOpenFiles) +} + +func TestConfigDefaultDarwin(t *testing.T) { // MacOS + cfg := configDefault() + if runtime.GOOS == "darwin" { + require.Equal(t, 200, cfg.MaxOpenFiles) + } else { + require.Equal(t, 500, cfg.MaxOpenFiles) + } +} diff --git a/leveldb/go.mod b/leveldb/go.mod new file mode 100644 index 00000000..a2dd2cf5 --- /dev/null +++ b/leveldb/go.mod @@ -0,0 +1,12 @@ +module github.com/gofiber/storage/leveldb + +go 1.23 + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/stretchr/testify v1.10.0 // indirect + github.com/syndtr/goleveldb v1.0.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/leveldb/go.sum b/leveldb/go.sum new file mode 100644 index 00000000..8495d2a5 --- /dev/null +++ b/leveldb/go.sum @@ -0,0 +1,26 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w= +github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/syndtr/goleveldb v1.0.0 h1:fBdIW9lB4Iz0n9khmH8w27SJ3QEJ7+IgjPEwGSZiFdE= +github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ= +golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/leveldb/leveldb.go b/leveldb/leveldb.go new file mode 100644 index 00000000..cadb41e6 --- /dev/null +++ b/leveldb/leveldb.go @@ -0,0 +1,159 @@ +package leveldb + +import ( + "encoding/json" + "time" + + "github.com/syndtr/goleveldb/leveldb" +) + +// data structure for storing items in the database +type item struct { + Value []byte `json:"value"` + ExpireAt time.Time `json:"expire_at"` +} + +// Storage interface that is implemented by storage providers +type Storage struct { + db *leveldb.DB + gcInterval time.Duration + done chan struct{} +} + +// New creates a new memory storage +func New(config ...Config) *Storage { + cfg := configDefault(config...) + + db, err := leveldb.OpenFile(cfg.Path, nil) + if err != nil { + panic(err) + } + + store := &Storage{ + db: db, + gcInterval: cfg.GCInterval, + done: make(chan struct{}), + } + + go store.gc() + + return store +} + +// Get value by key +func (s *Storage) Get(key []byte) ([]byte, error) { + if len(key) <= 0 { + return nil, nil + } + + data, err := s.db.Get(key, nil) + if err != nil { + return nil, nil + } + + var stored item + if err := json.Unmarshal(data, &stored); err != nil { + return data, nil + } + + if !stored.ExpireAt.IsZero() && time.Now().After(stored.ExpireAt) { + if err := s.Delete(string(key)); err != nil { + return nil, err + } + return nil, nil + } + + return stored.Value, nil +} + +// Set key with value +func (s *Storage) Set(key, value []byte, exp time.Duration) error { + if len(key) <= 0 || len(value) <= 0 { + return nil + } + if exp == 0 { + return s.db.Put(key, value, nil) + } + + data := item{ + Value: value, + ExpireAt: time.Now().Add(exp), + } + + encoded, err := json.Marshal(data) + if err != nil { + return err + } + return s.db.Put(key, encoded, nil) +} + +// Delete key by key +func (s *Storage) Delete(key string) error { + if len(key) <= 0 { + return nil + } + + return s.db.Delete([]byte(key), nil) +} + +// Reset all keys +func (s *Storage) Reset() error { + iter := s.db.NewIterator(nil, nil) + defer iter.Release() + + for iter.Next() { + key := iter.Key() + if err := s.db.Delete(key, nil); err != nil { + return err + } + } + + return iter.Error() +} + +// Close the memory storage +func (s *Storage) Close() error { + s.done <- struct{}{} // GC stop + close(s.done) + return s.db.Close() +} + +// Return database client +func (s *Storage) Conn() *leveldb.DB { + return s.db +} + +// gc is a helper function to clean up expired keys +func (s *Storage) gc() { + ticker := time.NewTicker(s.gcInterval) + defer ticker.Stop() + + for { + select { + case <-s.done: + return + case <-ticker.C: + iter := s.db.NewIterator(nil, nil) + batch := new(leveldb.Batch) + + for iter.Next() { + key := iter.Key() + data := iter.Value() + + var stored item + if err := json.Unmarshal(data, &stored); err != nil { + continue + } + if !stored.ExpireAt.IsZero() && time.Now().After(stored.ExpireAt) { + batch.Delete(key) + } + } + + iter.Release() + + if batch.Len() > 0 { + _ = s.db.Write(batch, nil) + } + } + } +} diff --git a/leveldb/leveldb_test.go b/leveldb/leveldb_test.go new file mode 100644 index 00000000..e4f10db0 --- /dev/null +++ b/leveldb/leveldb_test.go @@ -0,0 +1,284 @@ +package leveldb + +import ( + "os" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func removeAllFiles(dir string) error { + return os.RemoveAll(dir) +} + +func Test_New_EmptyConfig(t *testing.T) { + db := New() + require.NotNil(t, db) + + _, err := os.Stat("./fiber.leveldb") + require.Nil(t, err) + + err = removeAllFiles("./fiber.leveldb") + require.Nil(t, err) +} + +func Test_New_WithConfig(t *testing.T) { + db := New(Config{ + Path: "./testdb", + }) + require.NotNil(t, db) + _, err := os.Stat("./testdb") + require.Nil(t, err) + + err = removeAllFiles("./testdb") + require.Nil(t, err) +} + +func Test_Set_Overwrite(t *testing.T) { + db := New() + + db.Set([]byte("key"), []byte("value"), time.Second*1) + db.Set([]byte("key"), []byte("value2"), time.Second*1) + + value, err := db.Get([]byte("key")) + require.Nil(t, err) + require.Equal(t, []byte("value2"), value) + + err = removeAllFiles("./fiber.leveldb") + require.Nil(t, err) +} + +func Test_Get_For0Second(t *testing.T) { + db := New() + + db.Set([]byte("key"), []byte("value"), 0) + + _, err := db.Get([]byte("key")) + require.Nil(t, err) + + err = removeAllFiles("./fiber.leveldb") + require.Nil(t, err) +} + +func Test_Get_ForExpired100Millisecond(t *testing.T) { + db := New() + + require.NoError(t, db.Set([]byte("key"), []byte("value"), time.Millisecond*100)) + + // Anahtarın silinmesini bekle + deadline := time.Now().Add(time.Second) + for time.Now().Before(deadline) { + value, err := db.Get([]byte("key")) + if err == nil && value == nil { + break + } + time.Sleep(time.Millisecond * 10) + } + + value, err := db.Get([]byte("key")) + require.Nil(t, err) + require.Nil(t, value) + + err = removeAllFiles("./fiber.leveldb") + require.Nil(t, err) +} + +func Test_Delete_WhileThereIsData(t *testing.T) { + db := New() + + db.Set([]byte("key"), []byte("value"), time.Second*1) + + err := db.Delete("key") + require.Nil(t, err) + + value, err := db.Get([]byte("key")) + require.Nil(t, err) + require.Nil(t, value) + + err = removeAllFiles("./fiber.leveldb") + require.Nil(t, err) + +} + +func Test_Reset(t *testing.T) { + db := New() + + db.Set([]byte("key1"), []byte("value1"), time.Second*1) + db.Set([]byte("key2"), []byte("value2"), time.Second*1) + db.Set([]byte("key3"), []byte("value3"), time.Second*1) + + require.NoError(t, db.Reset()) + + value, err := db.Get([]byte("key1")) + require.Nil(t, err) + require.Nil(t, value) + + value, err = db.Get([]byte("key2")) + require.Nil(t, err) + require.Nil(t, value) + + value, err = db.Get([]byte("key3")) + require.Nil(t, err) + require.Nil(t, value) + + err = removeAllFiles("./fiber.leveldb") + require.Nil(t, err) + +} + +func Test_Close(t *testing.T) { + db := New() + + db.Close() + + err := db.Conn().Put([]byte("key"), []byte("value"), nil) + require.Error(t, err) + + err = removeAllFiles("./fiber.leveldb") + require.Nil(t, err) +} + +func Test_GarbageCollection_AfterWorking(t *testing.T) { + db := New(Config{ + GCInterval: time.Millisecond * 100, + }) + + require.NoError(t, db.Set([]byte("key"), []byte("value"), time.Millisecond*100)) + + deadline := time.Now().Add(time.Second) + for time.Now().Before(deadline) { + _, err := db.Conn().Get([]byte("key"), nil) + if err != nil { + break + } + time.Sleep(time.Millisecond * 10) + } + + value, err := db.Conn().Get([]byte("key"), nil) + require.Error(t, err) + require.Equal(t, []byte{}, value) + + err = removeAllFiles("./fiber.leveldb") + require.Nil(t, err) +} + +func Test_GarbageCollection_BeforeWorking(t *testing.T) { + t.Cleanup(func() { + require.NoError(t, removeAllFiles("./fiber.leveldb")) + }) + + db := New(Config{ + GCInterval: time.Second * 1, + }) + require.NoError(t, db.Set([]byte("key"), []byte("value"), time.Second*1)) + + value, err := db.Conn().Get([]byte("key"), nil) + require.Nil(t, err) + require.NotNil(t, value) +} + +func Test_GarbageCollection_Interval(t *testing.T) { + t.Cleanup(func() { + require.NoError(t, removeAllFiles("./fiber.leveldb")) + }) + + db := New(Config{ + GCInterval: time.Hour, // Uzun aralık + }) + require.NoError(t, db.Set([]byte("key"), []byte("value"), time.Millisecond)) + + // GC çalışmadığı için değer hala var olmalı + deadline := time.Now().Add(time.Millisecond * 100) + for time.Now().Before(deadline) { + value, err := db.Conn().Get([]byte("key"), nil) + if err == nil && value != nil { + return + } + time.Sleep(time.Millisecond * 10) + } + + t.Error("value should still exist as GC hasn't run yet") +} + +func Test_Close_Channel(t *testing.T) { + db := New() + + err := db.Close() + require.Nil(t, err) + + select { + case _, ok := <-db.done: + require.False(t, ok, "channel should be closed") + default: + t.Error("channel should be closed") + } + + err = removeAllFiles("./fiber.leveldb") + require.Nil(t, err) +} + +func Benchmark_Set(b *testing.B) { + db := New() + defer func() { + _ = db.Close() + _ = removeAllFiles("./fiber.leveldb") + }() + + key := []byte("test_key") + value := []byte("test_value") + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + if err := db.Set(key, value, 0); err != nil { + b.Fatal(err) + } + } + }) +} + +func Benchmark_Get(b *testing.B) { + db := New() + defer func() { + _ = db.Close() + _ = removeAllFiles("./fiber.leveldb") + }() + + key := []byte("test_key") + value := []byte("test_value") + if err := db.Set(key, value, 0); err != nil { + b.Fatal(err) + } + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + if _, err := db.Get(key); err != nil { + b.Fatal(err) + } + } + }) +} + +func Benchmark_Delete(b *testing.B) { + db := New() + defer func() { + _ = db.Close() + _ = removeAllFiles("./fiber.leveldb") + }() + + key := "test_key" + if err := db.Set([]byte(key), []byte("test_value"), 0); err != nil { + b.Fatal(err) + } + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + if err := db.Delete(key); err != nil { + b.Fatal(err) + } + } + }) +} diff --git a/postgres/postgres.go b/postgres/postgres.go index e995e954..3250b187 100644 --- a/postgres/postgres.go +++ b/postgres/postgres.go @@ -26,19 +26,31 @@ type Storage struct { } var ( - checkSchemaMsg = "The `v` row has an incorrect data type. " + - "It should be BYTEA but is instead %s. This will cause encoding-related panics if the DB is not migrated (see https://github.com/gofiber/storage/blob/main/MIGRATE.md)." - dropQuery = `DROP TABLE IF EXISTS %s;` + checkSchemaMsg = "The `%s` row has an incorrect data type. " + + "It should be %s but is instead %s. This will cause encoding-related panics if the DB is not migrated (see https://github.com/gofiber/storage/blob/main/MIGRATE.md)." + dropQuery = `DROP TABLE IF EXISTS %s;` + checkTableExistsQuery = `SELECT COUNT(table_name) + FROM information_schema.tables + WHERE table_schema = '%s' + AND table_name = '%s';` initQuery = []string{ - `CREATE TABLE IF NOT EXISTS %s ( + `CREATE TABLE %s ( k VARCHAR(64) PRIMARY KEY NOT NULL DEFAULT '', v BYTEA NOT NULL, e BIGINT NOT NULL DEFAULT '0' );`, `CREATE INDEX IF NOT EXISTS e ON %s (e);`, } - checkSchemaQuery = `SELECT DATA_TYPE FROM INFORMATION_SCHEMA.COLUMNS - WHERE table_name = '%s' AND COLUMN_NAME = 'v';` + checkSchemaQuery = `SELECT column_name, data_type + FROM information_schema.columns + WHERE table_schema = '%s' + AND table_name = '%s' + AND column_name IN ('k','v','e');` + checkSchemaTargetDataType = map[string]string{ + "k": "character varying", + "v": "bytea", + "e": "bigint", + } ) // New creates a new storage @@ -61,6 +73,14 @@ func New(config ...Config) *Storage { panic(err) } + // Parse out schema in config, if provided + schema := "public" + tableName := cfg.Table + if strings.Contains(cfg.Table, ".") { + schema = strings.Split(cfg.Table, ".")[0] + tableName = strings.Split(cfg.Table, ".")[1] + } + // Drop table if set to true if cfg.Reset { if _, err := db.Exec(context.Background(), fmt.Sprintf(dropQuery, cfg.Table)); err != nil { @@ -69,11 +89,23 @@ func New(config ...Config) *Storage { } } + // Determine if table exists + tableExists := false + row := db.QueryRow(context.Background(), fmt.Sprintf(checkTableExistsQuery, schema, tableName)) + var count int + if err := row.Scan(&count); err != nil { + db.Close() + panic(err) + } + tableExists = count > 0 + // Init database queries - for _, query := range initQuery { - if _, err := db.Exec(context.Background(), fmt.Sprintf(query, cfg.Table)); err != nil { - db.Close() - panic(err) + if !tableExists { + for _, query := range initQuery { + if _, err := db.Exec(context.Background(), fmt.Sprintf(query, cfg.Table)); err != nil { + db.Close() + panic(err) + } } } @@ -185,15 +217,41 @@ func (s *Storage) gc(t time.Time) { _, _ = s.db.Exec(context.Background(), s.sqlGC, t.Unix()) } -func (s *Storage) checkSchema(tableName string) { - var data []byte +func (s *Storage) checkSchema(fullTableName string) { + schema := "public" + tableName := fullTableName + if strings.Contains(fullTableName, ".") { + schema = strings.Split(fullTableName, ".")[0] + tableName = strings.Split(fullTableName, ".")[1] + } - row := s.db.QueryRow(context.Background(), fmt.Sprintf(checkSchemaQuery, tableName)) - if err := row.Scan(&data); err != nil { + rows, err := s.db.Query(context.Background(), fmt.Sprintf(checkSchemaQuery, schema, tableName)) + if err != nil { panic(err) } + defer rows.Close() - if strings.ToLower(string(data)) != "bytea" { - fmt.Printf(checkSchemaMsg, string(data)) + data := make(map[string]string) + + rowCount := 0 + for rows.Next() { + var columnName, dataType string + if err := rows.Scan(&columnName, &dataType); err != nil { + panic(err) + } + data[columnName] = dataType + rowCount++ + } + if rowCount == 0 { + panic(fmt.Errorf("table %s does not exist", tableName)) + } + for columnName, dataType := range checkSchemaTargetDataType { + dt, ok := data[columnName] + if !ok { + panic(fmt.Errorf("required column %s does not exist in table %s", columnName, tableName)) + } + if dt != dataType { + panic(fmt.Errorf(checkSchemaMsg, columnName, dataType, dt)) + } } } diff --git a/postgres/postgres_test.go b/postgres/postgres_test.go index b67b7b31..a2c2b3e9 100644 --- a/postgres/postgres_test.go +++ b/postgres/postgres_test.go @@ -3,6 +3,7 @@ package postgres import ( "context" "os" + "strconv" "testing" "time" @@ -51,6 +52,157 @@ func newTestStore(t testing.TB) (*Storage, error) { }), nil } +func TestNoCreateUser(t *testing.T) { + // Create a new user + // give the use usage permissions to the database (but not create) + ctx := context.Background() + conn := testStore.Conn() + + username := "testuser" + strconv.Itoa(int(time.Now().UnixNano())) + password := "testpassword" + + _, err := conn.Exec(ctx, "CREATE USER "+username+" WITH PASSWORD '"+password+"'") + require.NoError(t, err) + + _, err = conn.Exec(ctx, "GRANT CONNECT ON DATABASE "+os.Getenv("POSTGRES_DATABASE")+" TO "+username) + require.NoError(t, err) + + _, err = conn.Exec(ctx, "GRANT USAGE ON SCHEMA public TO "+username) + require.NoError(t, err) + + _, err = conn.Exec(ctx, "GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA public TO "+username) + require.NoError(t, err) + + _, err = conn.Exec(ctx, "ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT, INSERT, UPDATE, DELETE ON TABLES TO "+username) + require.NoError(t, err) + + _, err = conn.Exec(ctx, "REVOKE CREATE ON SCHEMA public FROM "+username) + require.NoError(t, err) + + t.Run("should panic if limited user tries to create table", func(t *testing.T) { + tableThatDoesNotExist := "public.table_does_not_exists_" + strconv.Itoa(int(time.Now().UnixNano())) + + defer func() { + r := recover() + require.NotNil(t, r, "Expected a panic when creating a table without permissions") + }() + + // This should panic since the user doesn't have CREATE permissions + New(Config{ + Database: os.Getenv("POSTGRES_DATABASE"), + Username: username, + Password: password, + Reset: true, + Table: tableThatDoesNotExist, + }) + }) + + // connect to an existing table using an unprivileged user + limitedStore := New(Config{ + Database: os.Getenv("POSTGRES_DATABASE"), + Username: username, + Password: password, + Reset: false, + }) + + defer func() { + limitedStore.Close() + conn.Exec(ctx, "DROP USER "+username) + }() + + t.Run("should set", func(t *testing.T) { + var ( + key = "john" + strconv.Itoa(int(time.Now().UnixNano())) + val = []byte("doe" + strconv.Itoa(int(time.Now().UnixNano()))) + ) + + err := limitedStore.Set(key, val, 0) + require.NoError(t, err) + }) + t.Run("should set override", func(t *testing.T) { + var ( + key = "john" + strconv.Itoa(int(time.Now().UnixNano())) + val = []byte("doe" + strconv.Itoa(int(time.Now().UnixNano()))) + ) + err := limitedStore.Set(key, val, 0) + require.NoError(t, err) + err = limitedStore.Set(key, val, 0) + require.NoError(t, err) + }) + t.Run("should get", func(t *testing.T) { + var ( + key = "john" + strconv.Itoa(int(time.Now().UnixNano())) + val = []byte("doe" + strconv.Itoa(int(time.Now().UnixNano()))) + ) + err := limitedStore.Set(key, val, 0) + require.NoError(t, err) + result, err := limitedStore.Get(key) + require.NoError(t, err) + require.Equal(t, val, result) + }) + t.Run("should set expiration", func(t *testing.T) { + var ( + key = "john" + strconv.Itoa(int(time.Now().UnixNano())) + val = []byte("doe" + strconv.Itoa(int(time.Now().UnixNano()))) + exp = 100 * time.Millisecond + ) + err := limitedStore.Set(key, val, exp) + require.NoError(t, err) + }) + t.Run("should get expired", func(t *testing.T) { + var ( + key = "john" + strconv.Itoa(int(time.Now().UnixNano())) + val = []byte("doe" + strconv.Itoa(int(time.Now().UnixNano()))) + exp = 100 * time.Millisecond + ) + err := limitedStore.Set(key, val, exp) + require.NoError(t, err) + time.Sleep(200 * time.Millisecond) + result, err := limitedStore.Get(key) + require.NoError(t, err) + require.Zero(t, len(result)) + }) + t.Run("should get not exists", func(t *testing.T) { + result, err := limitedStore.Get("nonexistentkey") + require.NoError(t, err) + require.Zero(t, len(result)) + }) + t.Run("should delete", func(t *testing.T) { + var ( + key = "john" + strconv.Itoa(int(time.Now().UnixNano())) + val = []byte("doe" + strconv.Itoa(int(time.Now().UnixNano()))) + ) + err := limitedStore.Set(key, val, 0) + require.NoError(t, err) + err = limitedStore.Delete(key) + require.NoError(t, err) + result, err := limitedStore.Get(key) + require.NoError(t, err) + require.Zero(t, len(result)) + }) + +} +func Test_Should_Panic_On_Wrong_Schema(t *testing.T) { + // Create a test table with wrong schema + _, err := testStore.Conn().Exec(context.Background(), ` + CREATE TABLE IF NOT EXISTS test_schema_table ( + k VARCHAR(64) PRIMARY KEY NOT NULL DEFAULT '', + v BYTEA NOT NULL, + e VARCHAR(64) NOT NULL DEFAULT '' -- Changed e from BIGINT to VARCHAR + ); + `) + require.NoError(t, err) + defer func() { + _, err := testStore.Conn().Exec(context.Background(), "DROP TABLE IF EXISTS test_schema_table;") + require.NoError(t, err) + }() + + // Call checkSchema with the wrong table + require.Panics(t, func() { + testStore.checkSchema("test_schema_table") + }) +} + func Test_Postgres_Set(t *testing.T) { var ( key = "john"