Added emebedded API methods to change Protocol and logical DB. Added comments to SwapDBs and Flush methods.

This commit is contained in:
Kelvin Mwinuka
2024-06-25 23:25:59 +08:00
parent 1a8d2314cd
commit 0bb4ce6756
2 changed files with 136 additions and 75 deletions

View File

@@ -13,3 +13,58 @@
// limitations under the License.
package echovault
import (
"errors"
"slices"
)
// SetProtocol sets the RESP protocol that's expected from responses to embedded API calls.
// This command does not affect the RESP protocol expected by any of the TCP clients.
//
// Parameters:
//
// `protocol` - int - The RESP version (either 2 or 3).
//
// Errors:
//
// "protocol must be either 2 or 3" - When the provided protocol is not either 2 or 3.
func (server *EchoVault) SetProtocol(protocol int) error {
if !slices.Contains([]int{2, 3}, protocol) {
return errors.New("protocol must be either 2 or 3")
}
server.connInfo.mut.Lock()
defer server.connInfo.mut.Unlock()
server.connInfo.embedded.Protocol = protocol
return nil
}
// SelectDB sets the logical database to use for all embedded API calls.
// All subsequent calls after this call will use the new logical database.
// This does not affect the databases used by any of the TCP clients.
//
// Parameters:
//
// `database` - int - The Database index.
//
// Errors:
//
// "database index must be 0 or higher" - When the database index is less than 0.
func (server *EchoVault) SelectDB(database int) error {
if database < 0 {
return errors.New("database index must be 0 or higher")
}
// If the database index does not exist, create the new database.
server.storeLock.Lock()
if server.store[database] == nil {
server.createDatabase(database)
}
server.storeLock.Unlock()
// Set the DB.
server.connInfo.mut.Lock()
defer server.connInfo.mut.Unlock()
server.connInfo.embedded.Database = database
return nil
}

View File

@@ -28,6 +28,87 @@ import (
"time"
)
// SwapDBs swaps every TCP client connection from database1 over to database2.
// It also swaps every TCP client connection from database2 over to database1.
// This only affects TCP connections, it does not swap the logical database currently
// being used by the embedded API.
func (server *EchoVault) SwapDBs(database1, database2 int) {
// If the databases are the same, skip the swap.
if database1 == database2 {
return
}
// If any of the databases does not exist, create them.
server.storeLock.Lock()
for _, database := range []int{database1, database2} {
if server.store[database] == nil {
server.createDatabase(database)
}
}
server.storeLock.Unlock()
// Swap the connections for each database.
server.connInfo.mut.Lock()
defer server.connInfo.mut.Unlock()
for connection, info := range server.connInfo.tcpClients {
switch info.Database {
case database1:
server.connInfo.tcpClients[connection] = internal.ConnectionInfo{
Id: info.Id,
Name: info.Name,
Protocol: info.Protocol,
Database: database2,
}
case database2:
server.connInfo.tcpClients[connection] = internal.ConnectionInfo{
Id: info.Id,
Name: info.Name,
Protocol: info.Protocol,
Database: database1,
}
}
}
}
// Flush flushes all the data from the database at the specified index.
// When -1 is passed, all the logical databases are cleared.
func (server *EchoVault) Flush(database int) {
server.storeLock.Lock()
defer server.storeLock.Unlock()
server.keysWithExpiry.rwMutex.Lock()
defer server.keysWithExpiry.rwMutex.Unlock()
server.lfuCache.mutex.Lock()
defer server.lfuCache.mutex.Unlock()
server.lruCache.mutex.Lock()
defer server.lruCache.mutex.Unlock()
if database == -1 {
for db, _ := range server.store {
// Clear db store.
clear(server.store[db])
// Clear db volatile key tracker.
clear(server.keysWithExpiry.keys[db])
// Clear db LFU cache.
server.lfuCache.cache[db] = eviction.NewCacheLFU()
// Clear db LRU cache.
server.lruCache.cache[db] = eviction.NewCacheLRU()
}
return
}
// Clear db store.
clear(server.store[database])
// Clear db volatile key tracker.
clear(server.keysWithExpiry.keys[database])
// Clear db LFU cache.
server.lfuCache.cache[database] = eviction.NewCacheLFU()
// Clear db LRU cache.
server.lruCache.cache[database] = eviction.NewCacheLRU()
}
func (server *EchoVault) keysExist(ctx context.Context, keys []string) map[string]bool {
server.storeLock.RLock()
defer server.storeLock.RUnlock()
@@ -227,81 +308,6 @@ func (server *EchoVault) createDatabase(database int) {
server.lruCache.cache[database] = eviction.NewCacheLRU()
}
func (server *EchoVault) SwapDBs(database1, database2 int) {
// If the databases are the same, skip the swap.
if database1 == database2 {
return
}
// If any of the databases does not exist, create them.
server.storeLock.Lock()
for _, database := range []int{database1, database2} {
if server.store[database] == nil {
server.createDatabase(database)
}
}
server.storeLock.Unlock()
// Swap the connections for each database.
server.connInfo.mut.Lock()
defer server.connInfo.mut.Unlock()
for connection, info := range server.connInfo.tcpClients {
switch info.Database {
case database1:
server.connInfo.tcpClients[connection] = internal.ConnectionInfo{
Id: info.Id,
Name: info.Name,
Protocol: info.Protocol,
Database: database2,
}
case database2:
server.connInfo.tcpClients[connection] = internal.ConnectionInfo{
Id: info.Id,
Name: info.Name,
Protocol: info.Protocol,
Database: database1,
}
}
}
}
func (server *EchoVault) Flush(database int) {
server.storeLock.Lock()
defer server.storeLock.Unlock()
server.keysWithExpiry.rwMutex.Lock()
defer server.keysWithExpiry.rwMutex.Unlock()
server.lfuCache.mutex.Lock()
defer server.lfuCache.mutex.Unlock()
server.lruCache.mutex.Lock()
defer server.lruCache.mutex.Unlock()
if database == -1 {
for db, _ := range server.store {
// Clear db store.
clear(server.store[db])
// Clear db volatile key tracker.
clear(server.keysWithExpiry.keys[db])
// Clear db LFU cache.
server.lfuCache.cache[db] = eviction.NewCacheLFU()
// Clear db LRU cache.
server.lruCache.cache[db] = eviction.NewCacheLRU()
}
return
}
// Clear db store.
clear(server.store[database])
// Clear db volatile key tracker.
clear(server.keysWithExpiry.keys[database])
// Clear db LFU cache.
server.lfuCache.cache[database] = eviction.NewCacheLFU()
// Clear db LRU cache.
server.lruCache.cache[database] = eviction.NewCacheLRU()
}
func (server *EchoVault) getState() map[int]map[string]interface{} {
// Wait unit there's no state mutation or copy in progress before starting a new copy process.
for {