mirror of
https://github.com/gofiber/storage.git
synced 2025-10-16 13:41:26 +08:00
Updated Scylladb
This commit is contained in:
7
.github/workflows/test-scylladb.yml
vendored
7
.github/workflows/test-scylladb.yml
vendored
@@ -27,14 +27,9 @@ jobs:
|
||||
|
||||
- name: Run ScyllaDb
|
||||
run: |
|
||||
docker run --name scylladb -p 9042:9042 -p 9142:9142 -p 7000:7000 -p 7001:7001 -p 7199:7199 -p 10000:10000 -p 9180:9180 -p 9100:9100 -p 9160:9160 -p 19042:19042 -p 19142:19142 -d scylladb/scylla:latest --broadcast-address 127.0.0.1 --listen-address 0.0.0.0 --broadcast-rpc-address 127.0.0.1
|
||||
docker run --name scylladb -p 9042:9042 -p 19042:19042 -p 9160:9160 -p 10000 -p 7000:7000 -p 7001:7001 -p 7199:7199 -p 9180:9180 -d scylladb/scylla:latest --broadcast-address 127.0.0.1 --listen-address 0.0.0.0 --broadcast-rpc-address 127.0.0.1
|
||||
sleep 30 # Wait for ScyllaDb to initialize
|
||||
|
||||
- name: Create Default Keyspace
|
||||
run: |
|
||||
docker exec --tty scylladb cqlsh -e "CREATE KEYSPACE IF NOT EXISTS scylla_db WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1};"
|
||||
sleep 10 # Wait for ScyllaDb to initialize
|
||||
|
||||
- name: Install Go
|
||||
uses: actions/setup-go@v4
|
||||
with:
|
||||
|
@@ -1,6 +1,11 @@
|
||||
# ScyllaDB
|
||||
---
|
||||
id: scylladb
|
||||
title: ScyllaDb
|
||||
---
|
||||
|
||||
A ScyllaDB storage driver using [gocql/gocql]("https://github.com/gocql/gocql").
|
||||
# ScyllaDb
|
||||
|
||||
A ScyllaDb storage engine for [Fiber](github.com/gofiber/fiber) using [gocql](github.com/gocql/gocql).
|
||||
|
||||
### Table of Contents
|
||||
- [Signatures](#signatures)
|
||||
@@ -13,14 +18,15 @@ A ScyllaDB storage driver using [gocql/gocql]("https://github.com/gocql/gocql").
|
||||
```go
|
||||
func New(config ...Config) Storage
|
||||
func (s *Storage) Get(key string) ([]byte, error)
|
||||
func (s *Storage) Set(key string, val []byte, exp time.Duration) error
|
||||
func (s *Storage) Set(key string, value []byte, expire time.Duration) error
|
||||
func (s *Storage) Delete(key string) error
|
||||
func (s *Storage) Reset() error
|
||||
func (s *Storage) Close() error
|
||||
func (s *Storage) Conn() *gocql.Session
|
||||
```
|
||||
|
||||
### Installation
|
||||
ScyllaDB is tested on the 2 last [Go versions](https://golang.org/dl/) with support for modules. So make sure to initialize one first if you didn't do that yet:
|
||||
ScyllaDb is tested on the 2 last [Go versions](https://golang.org/dl/) with support for modules. So make sure to initialize one first if you didn't do that yet:
|
||||
```bash
|
||||
go mod init github.com/<user>/<repo>
|
||||
```
|
||||
@@ -42,69 +48,89 @@ store := scylladb.New()
|
||||
|
||||
// Initialize custom config
|
||||
store := scylladb.New(scylladb.Config{
|
||||
Hosts: []string{"127.0.0.1"},
|
||||
Keyspace: "fiber_keyspace",
|
||||
Consistency: "ONE",
|
||||
Table: "fiber_table",
|
||||
Keyspace: "fiber",
|
||||
Hosts: []string{"127.0.0.1"},
|
||||
Port: 9042,
|
||||
Table: "fiber_storage",
|
||||
Consistency: "ONE",
|
||||
Reset: false,
|
||||
})
|
||||
|
||||
// Initialize custom config using scylladb connection
|
||||
cluster, _ := gocql.NewCluster("127.0.0.1")
|
||||
cluster.Keyspace = "fiber"
|
||||
cluster.Port = 9042
|
||||
|
||||
session, _ := cluster.CreateSession()
|
||||
store := scylladb.New(scylladb.Config{
|
||||
Session: session,
|
||||
Keyspace: "fiber",
|
||||
Table: "fiber_storage",
|
||||
Reset: false,
|
||||
})
|
||||
```
|
||||
|
||||
### Config
|
||||
```go
|
||||
type Config struct {
|
||||
// Hosts name where the DB is hosted
|
||||
// Session Will override Keyspace and all other authentication values if used
|
||||
//
|
||||
// Optional. Default is nil
|
||||
Session *gocql.Session
|
||||
|
||||
// Keyspace name
|
||||
//
|
||||
// Optional. Default is "fiber"
|
||||
Keyspace string
|
||||
|
||||
// Host name where the ScyllaDb cluster is hosted
|
||||
//
|
||||
// Optional. Default is "127.0.0.1"
|
||||
Hosts []string
|
||||
|
||||
// Server username
|
||||
// Port where the ScyllaDb cluster is listening on
|
||||
//
|
||||
// Optional. Default is ""
|
||||
Username string
|
||||
// Optional. Default is 9042
|
||||
Port int
|
||||
|
||||
// Server password
|
||||
// Username for ScyllaDb cluster
|
||||
//
|
||||
// Optional. Default is ""
|
||||
Password string
|
||||
// Optional. Default is ""
|
||||
Username string
|
||||
|
||||
// Keyspace name
|
||||
// Password for ScyllaDb cluster
|
||||
//
|
||||
// Optional. Default is "scylladb_db"
|
||||
Keyspace string
|
||||
// Optional. Default is ""
|
||||
Password string
|
||||
|
||||
// Level of the consistency
|
||||
//
|
||||
// Optional. Default is "LOCAL_ONE"
|
||||
Consistency string
|
||||
|
||||
// Number of replication
|
||||
// Table name
|
||||
//
|
||||
// Optional. Default 1
|
||||
ReplicationFactor int
|
||||
// Optional. Default is "fiber_storage"
|
||||
Table string
|
||||
|
||||
// Database to be operated on in the cluster.
|
||||
//
|
||||
// Optional. Default is "".
|
||||
Table string
|
||||
|
||||
// Reset clears any existing keys in existing Table
|
||||
// Level of the consistency
|
||||
//
|
||||
// Optional. Default is false
|
||||
Reset bool
|
||||
// Optional. Default is "LOCAL_ONE"
|
||||
Consistency string
|
||||
|
||||
// Reset clears any existing keys in existing Table
|
||||
//
|
||||
// Optional. Default is false
|
||||
Reset bool
|
||||
}
|
||||
```
|
||||
|
||||
### Default Config
|
||||
```go
|
||||
var ConfigDefault = Config{
|
||||
Hosts: []string{"172.0.0.1"},
|
||||
Username: "",
|
||||
Password: "",
|
||||
Table: "scylla_table",
|
||||
Keyspace: "scylla_db",
|
||||
Consistency: "LOCAL_ONE",
|
||||
ReplicationFactor: 1,
|
||||
Session: nil,
|
||||
Keyspace: "fiber",
|
||||
Hosts: []string{"127.0.0.1"},
|
||||
Username: "",
|
||||
Password: "",
|
||||
Port: 9042,
|
||||
Table: "fiber_storage",
|
||||
Consistency: "ONE",
|
||||
Reset: false,
|
||||
}
|
||||
```
|
||||
```
|
@@ -1,57 +1,68 @@
|
||||
package scylladb
|
||||
|
||||
import "github.com/gocql/gocql"
|
||||
|
||||
type Config struct {
|
||||
// Host name where the DB is hosted
|
||||
// Session is provided by the user to use an existing ScyllaDb session
|
||||
//
|
||||
// Optional. Default is nil
|
||||
Session *gocql.Session
|
||||
|
||||
// Keyspace name
|
||||
//
|
||||
// Optional. Default is "fiber"
|
||||
Keyspace string
|
||||
|
||||
// Host name where the ScyllaDb cluster is hosted
|
||||
//
|
||||
// Optional. Default is "127.0.0.1"
|
||||
Hosts []string
|
||||
|
||||
// Server username
|
||||
// Port where the ScyllaDb cluster is listening on
|
||||
//
|
||||
// Optional. Default is 9042
|
||||
Port int
|
||||
|
||||
// Username for ScyllaDb cluster
|
||||
//
|
||||
// Optional. Default is ""
|
||||
Username string
|
||||
|
||||
// Server password
|
||||
// Password for ScyllaDb cluster
|
||||
//
|
||||
// Optional. Default is ""
|
||||
Password string
|
||||
|
||||
// Name of the keyspace
|
||||
// Table name
|
||||
//
|
||||
// Optional. Default is "scylla_db"
|
||||
Keyspace string
|
||||
// Optional. Default is "fiber_storage"
|
||||
Table string
|
||||
|
||||
// Level of the consistency
|
||||
//
|
||||
// Optional. Default is "LOCAL_ONE"
|
||||
Consistency string
|
||||
|
||||
// Number of replication factor
|
||||
//
|
||||
// Optional. Default 1
|
||||
ReplicationFactor int
|
||||
|
||||
// Database to be operated on in the cluster.
|
||||
//
|
||||
// Optional. Default is "scylla_table".
|
||||
Table string
|
||||
|
||||
// Reset clears any existing keys in existing Table
|
||||
//
|
||||
// Optional. Default is false
|
||||
Reset bool
|
||||
}
|
||||
|
||||
// ConfigDefault is the default config
|
||||
var ConfigDefault = Config{
|
||||
Hosts: []string{"172.0.0.1"},
|
||||
Username: "",
|
||||
Password: "",
|
||||
Table: "scylla_table",
|
||||
Keyspace: "scylla_db",
|
||||
Consistency: "LOCAL_ONE",
|
||||
ReplicationFactor: 1,
|
||||
Session: nil,
|
||||
Keyspace: "fiber",
|
||||
Hosts: []string{"127.0.0.1"},
|
||||
Username: "",
|
||||
Password: "",
|
||||
Port: 9042,
|
||||
Table: "fiber_storage",
|
||||
Consistency: "LOCAL_ONE",
|
||||
Reset: false,
|
||||
}
|
||||
|
||||
// configDefault helps to set a default config
|
||||
func configDefault(config ...Config) Config {
|
||||
// Return default config if nothing provided
|
||||
if len(config) < 1 {
|
||||
@@ -65,18 +76,17 @@ func configDefault(config ...Config) Config {
|
||||
if cfg.Hosts == nil {
|
||||
cfg.Hosts = ConfigDefault.Hosts
|
||||
}
|
||||
if cfg.Keyspace == "" {
|
||||
cfg.Keyspace = ConfigDefault.Keyspace
|
||||
if cfg.Port <= 0 {
|
||||
cfg.Port = ConfigDefault.Port
|
||||
}
|
||||
if cfg.Table == "" {
|
||||
cfg.Table = ConfigDefault.Table
|
||||
}
|
||||
if cfg.Keyspace == "" {
|
||||
cfg.Keyspace = ConfigDefault.Keyspace
|
||||
}
|
||||
if cfg.Consistency == "" {
|
||||
cfg.Consistency = ConfigDefault.Consistency
|
||||
}
|
||||
if cfg.ReplicationFactor <= 0 {
|
||||
cfg.ReplicationFactor = ConfigDefault.ReplicationFactor
|
||||
}
|
||||
|
||||
return cfg
|
||||
}
|
||||
|
@@ -12,5 +12,3 @@ require (
|
||||
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect
|
||||
gopkg.in/inf.v0 v0.9.1 // indirect
|
||||
)
|
||||
|
||||
replace github.com/gocql/gocql => github.com/scylladb/gocql v1.11.1
|
||||
|
@@ -3,15 +3,13 @@ github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCS
|
||||
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY=
|
||||
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/gocql/gocql v1.6.0 h1:IdFdOTbnpbd0pDhl4REKQDM+Q0SzKXQ1Yh+YZZ8T/qU=
|
||||
github.com/gocql/gocql v1.6.0/go.mod h1:3gM2c4D3AnkISwBxGnMMsS8Oy4y2lhbPRsH4xnJrHG8=
|
||||
github.com/gofiber/utils v1.1.0 h1:vdEBpn7AzIUJRhe+CiTOJdUcTg4Q9RK+pEa0KPbLdrM=
|
||||
github.com/gofiber/utils v1.1.0/go.mod h1:poZpsnhBykfnY1Mc0KeEa6mSHrS3dV0+oBWyeQmb2e0=
|
||||
github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA=
|
||||
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
|
||||
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
|
||||
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
|
||||
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
|
||||
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8=
|
||||
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4=
|
||||
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
|
||||
@@ -20,22 +18,7 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
|
||||
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
|
||||
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/scylladb/gocql v1.11.1 h1:AlIPHHZf2l0Cbj8wGjfELspaGfnd4meGj9sPQnr5dn8=
|
||||
github.com/scylladb/gocql v1.11.1/go.mod h1:ZLEJ0EVE5JhmtxIW2stgHq/v1P4fWap0qyyXSKyV8K0=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
|
||||
golang.org/x/net v0.0.0-20220526153639-5463443f8c37 h1:lUkvobShwKsOesNfWWlCS5q7fnbG1MEliIzwu886fn8=
|
||||
golang.org/x/net v0.0.0-20220526153639-5463443f8c37/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
|
||||
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
|
||||
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
|
||||
gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
|
||||
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
|
||||
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
|
||||
sigs.k8s.io/yaml v1.3.0 h1:a2VclLzOGrwOHDiV8EfBGhvjHvP46CtW5j6POvhYGGo=
|
||||
sigs.k8s.io/yaml v1.3.0/go.mod h1:GeOyir5tyXNByN85N/dRIT9es5UQNerPYEKK56eTBm8=
|
||||
|
@@ -1,140 +1,168 @@
|
||||
package scylladb
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/gocql/gocql"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Storage interface that is implemented by storage providers
|
||||
type Storage struct {
|
||||
Cluster *gocql.ClusterConfig
|
||||
Session *gocql.Session
|
||||
session *gocql.Session
|
||||
tableName string
|
||||
|
||||
cqlSelect string
|
||||
cqlInsert string
|
||||
cqlDelete string
|
||||
cqlReset string
|
||||
cqlGC string
|
||||
selectQuery string
|
||||
insertQuery string
|
||||
deleteQuery string
|
||||
resetQuery string
|
||||
}
|
||||
|
||||
var (
|
||||
checkSchemaMsg = "the `data` row has an incorrect data type. " +
|
||||
"The message should be BLOB, but it is instead %s. This could lead to encoding-related issues if the database is not migrated (refer to https://github.com/gofiber/storage/blob/main/MIGRATE.md)"
|
||||
createKeyspaceQuery = "CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};"
|
||||
|
||||
dropQuery = "DROP TABLE IF EXISTS %s;"
|
||||
|
||||
initQuery = []string{
|
||||
`CREATE TABLE IF NOT EXISTS %s (
|
||||
k text PRIMARY KEY,
|
||||
v blob,
|
||||
e bigint
|
||||
);`,
|
||||
}
|
||||
dropQuery = `DROP TABLE IF EXISTS %s.%s;`
|
||||
createTableQuery = `CREATE TABLE IF NOT EXISTS %s.%s (id TEXT PRIMARY KEY, data BLOB, value BIGINT)`
|
||||
checkSchemaQuery = `SELECT type FROM system_schema.columns WHERE keyspace_name = '%s' AND table_name = '%s' AND column_name = 'data';`
|
||||
keyspaceMsg = `Keyspace cannot be empty.`
|
||||
)
|
||||
|
||||
// New creates a new storage
|
||||
func New(config ...Config) *Storage {
|
||||
var err error
|
||||
var session *gocql.Session
|
||||
|
||||
// Set default config
|
||||
cfg := configDefault(config...)
|
||||
|
||||
cluster := gocql.NewCluster(cfg.Hosts...)
|
||||
cluster.Consistency = gocql.ParseConsistency(cfg.Consistency)
|
||||
cluster.Keyspace = cfg.Keyspace
|
||||
|
||||
session, err := cluster.CreateSession()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
if cfg.Keyspace == "" {
|
||||
panic(keyspaceMsg)
|
||||
}
|
||||
|
||||
// Primitive ping method
|
||||
if err := session.Query("SELECT release_version FROM system.local").Exec(); err != nil {
|
||||
session.Close()
|
||||
panic(err)
|
||||
if cfg.Session == nil {
|
||||
// Create a cassandra cluster
|
||||
cluster := gocql.NewCluster(cfg.Hosts...)
|
||||
cluster.Consistency = gocql.ParseConsistency(cfg.Consistency)
|
||||
|
||||
// Set credentials if provided
|
||||
if cfg.Username != "" && cfg.Password != "" {
|
||||
cluster.Authenticator = gocql.PasswordAuthenticator{
|
||||
Username: cfg.Username,
|
||||
Password: cfg.Password,
|
||||
}
|
||||
}
|
||||
|
||||
// Create session
|
||||
session, err = cluster.CreateSession()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
} else {
|
||||
// Set session if provided
|
||||
session = cfg.Session
|
||||
}
|
||||
|
||||
// Create keyspace if it does not exist
|
||||
if err := session.Query(fmt.Sprintf(createKeyspaceQuery, cfg.Keyspace)).Exec(); err != nil {
|
||||
if err = session.Query(fmt.Sprintf(createKeyspaceQuery, cfg.Keyspace)).Exec(); err != nil {
|
||||
session.Close()
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// Drop table if reset set
|
||||
ctx := context.Background()
|
||||
// Drop table if reset is true
|
||||
if cfg.Reset {
|
||||
if err := session.Query(dropQuery, cfg.Table).WithContext(ctx).Exec(); err != nil {
|
||||
if err = session.Query(fmt.Sprintf(dropQuery, cfg.Keyspace, cfg.Table)).Exec(); err != nil {
|
||||
session.Close()
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
// Init database queries
|
||||
ctx = context.Background()
|
||||
for _, query := range initQuery {
|
||||
|
||||
if err := session.Query(fmt.Sprintf(query, cfg.Table)).WithContext(ctx).Exec(); err != nil {
|
||||
session.Close()
|
||||
panic(err)
|
||||
}
|
||||
// Create the storage
|
||||
store := &Storage{
|
||||
session: session,
|
||||
tableName: cfg.Table,
|
||||
selectQuery: fmt.Sprintf("SELECT data, value FROM %s.%s WHERE id = ?", cfg.Keyspace, cfg.Table),
|
||||
insertQuery: fmt.Sprintf("INSERT INTO %s.%s (id, data, value) VALUES (?, ?, ?)", cfg.Keyspace, cfg.Table),
|
||||
deleteQuery: fmt.Sprintf("DELETE FROM %s.%s WHERE id = ?", cfg.Keyspace, cfg.Table),
|
||||
resetQuery: fmt.Sprintf("TRUNCATE %s.%s", cfg.Keyspace, cfg.Table),
|
||||
}
|
||||
|
||||
storage := &Storage{
|
||||
Cluster: cluster,
|
||||
Session: session,
|
||||
|
||||
cqlSelect: fmt.Sprintf(`SELECT v, e FROM %s WHERE k=?`, cfg.Table),
|
||||
cqlInsert: fmt.Sprintf(`INSERT INTO %s (k, v, e) VALUES (?, ?, ?)`, cfg.Table),
|
||||
cqlDelete: fmt.Sprintf(`DELETE FROM %s WHERE k=?`, cfg.Table),
|
||||
cqlReset: fmt.Sprintf(`TRUNCATE %s`, cfg.Table),
|
||||
cqlGC: fmt.Sprintf(`DELETE FROM %s WHERE e <= ? AND e != 0`, cfg.Table),
|
||||
// Create table if not exists
|
||||
if err = store.createTableIfNotExists(cfg.Keyspace); err != nil {
|
||||
session.Close()
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return storage
|
||||
// Check schema
|
||||
store.checkSchema(cfg.Keyspace)
|
||||
|
||||
return store // Return storage
|
||||
}
|
||||
|
||||
// Get value by key
|
||||
func (s *Storage) createTableIfNotExists(keyspace string) error {
|
||||
// Create table if not exists
|
||||
query := fmt.Sprintf(createTableQuery, keyspace, s.tableName)
|
||||
if err := s.session.Query(query).Exec(); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Storage) checkSchema(keyspace string) {
|
||||
// Check schema for data column type (should be blob)
|
||||
var dataType string
|
||||
query := fmt.Sprintf(checkSchemaQuery, keyspace, s.tableName)
|
||||
if err := s.session.Query(query).Scan(&dataType); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
if dataType != "blob" {
|
||||
panic(fmt.Errorf(checkSchemaMsg, dataType))
|
||||
}
|
||||
}
|
||||
|
||||
// Get retrieves a value by key
|
||||
func (s *Storage) Get(key string) ([]byte, error) {
|
||||
ctx := context.Background()
|
||||
var (
|
||||
data []byte
|
||||
exp int64 = 0
|
||||
)
|
||||
if err := s.Session.Query(s.cqlSelect, key).WithContext(ctx).Scan(&data, &exp); err != nil {
|
||||
var value []byte
|
||||
var expiration int64
|
||||
if err := s.session.Query(s.selectQuery, key).Scan(&value, &expiration); err != nil {
|
||||
if errors.Is(err, gocql.ErrNotFound) {
|
||||
return nil, nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return data, nil
|
||||
if expiration > 0 && time.Now().Unix() > expiration {
|
||||
return nil, nil
|
||||
}
|
||||
return value, nil
|
||||
}
|
||||
|
||||
// Set sets a value in the storage for the provided key
|
||||
func (s *Storage) Set(key string, val []byte, exp time.Duration) error {
|
||||
ctx := context.Background()
|
||||
|
||||
return s.Session.Query(s.cqlInsert, key, val, int64(exp.Seconds())).WithContext(ctx).Exec()
|
||||
// Set sets a value by key
|
||||
func (s *Storage) Set(key string, value []byte, expire time.Duration) error {
|
||||
var expiration int64
|
||||
if expire != 0 {
|
||||
expiration = time.Now().Add(expire).Unix()
|
||||
}
|
||||
return s.session.Query(s.insertQuery, key, value, expiration).Exec()
|
||||
}
|
||||
|
||||
// Delete deletes a value from the storage based on the provided key
|
||||
// Delete removes a value by key
|
||||
func (s *Storage) Delete(key string) error {
|
||||
ctx := context.Background()
|
||||
|
||||
return s.Session.Query(s.cqlDelete, key).WithContext(ctx).Exec()
|
||||
return s.session.Query(s.deleteQuery, key).Exec()
|
||||
}
|
||||
|
||||
// Reset resets the storage
|
||||
// Reset resets all values
|
||||
func (s *Storage) Reset() error {
|
||||
ctx := context.Background()
|
||||
|
||||
return s.Session.Query(s.cqlReset).WithContext(ctx).Exec()
|
||||
return s.session.Query(s.resetQuery).Exec()
|
||||
}
|
||||
|
||||
// Close closes the connection to the storage
|
||||
// Close closes the storage
|
||||
func (s *Storage) Close() error {
|
||||
s.Session.Close()
|
||||
s.session.Close()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Conn returns session
|
||||
// Conn returns the underlying gocql session
|
||||
func (s *Storage) Conn() *gocql.Session {
|
||||
return s.Session
|
||||
return s.session
|
||||
}
|
||||
|
@@ -6,7 +6,7 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
var testStore = New(Config{})
|
||||
var testStore = New(Config{Reset: true})
|
||||
|
||||
func Test_Scylla_Set(t *testing.T) {
|
||||
// Create a new instance of the Storage
|
||||
|
Reference in New Issue
Block a user