feat: support import keys from csv file

This commit is contained in:
Lykin
2023-12-27 15:44:08 +08:00
parent 2bc7a57773
commit 3fe8767c44
15 changed files with 411 additions and 73 deletions

View File

@@ -1999,7 +1999,7 @@ func (b *browserService) DeleteKeys(server string, db int, ks []any, serialNo st
var deletedKeys = make([]any, 0, total)
var mutex sync.Mutex
del := func(ctx context.Context, cli redis.UniversalClient) error {
startTime := time.Now()
startTime := time.Now().Add(-10 * time.Second)
for i, k := range ks {
// emit progress per second
param := map[string]any{
@@ -2010,6 +2010,8 @@ func (b *browserService) DeleteKeys(server string, db int, ks []any, serialNo st
if i >= total-1 || time.Now().Sub(startTime).Milliseconds() > 100 {
startTime = time.Now()
runtime.EventsEmit(b.ctx, processEvent, param)
// do some sleep to prevent blocking the Redis server
time.Sleep(10 * time.Millisecond)
}
key := strutil.DecodeRedisKey(k)
@@ -2026,8 +2028,6 @@ func (b *browserService) DeleteKeys(server string, db int, ks []any, serialNo st
canceled = true
break
}
// do some sleep to prevent blocking the Redis server
time.Sleep(100 * time.Microsecond)
}
return nil
}
@@ -2093,13 +2093,17 @@ func (b *browserService) ExportKey(server string, db int, ks []any, path string)
total := len(ks)
var exported, failed int64
var canceled bool
startTime := time.Now().Add(-10 * time.Second)
for i, k := range ks {
param := map[string]any{
"total": total,
"progress": i + 1,
"processing": k,
if i >= total-1 || time.Now().Sub(startTime).Milliseconds() > 100 {
startTime = time.Now()
param := map[string]any{
"total": total,
"progress": i + 1,
"processing": k,
}
runtime.EventsEmit(b.ctx, processEvent, param)
}
runtime.EventsEmit(b.ctx, processEvent, param)
key := strutil.DecodeRedisKey(k)
content, dumpErr := client.Dump(ctx, key).Bytes()
@@ -2128,6 +2132,121 @@ func (b *browserService) ExportKey(server string, db int, ks []any, path string)
return
}
// ImportCSV import data from csv file
func (b *browserService) ImportCSV(server string, db int, path string, conflict int) (resp types.JSResp) {
// connect a new connection to export keys
conf := Connection().getConnection(server)
if conf == nil {
resp.Msg = fmt.Sprintf("no connection profile named: %s", server)
return
}
var client redis.UniversalClient
var err error
var connConfig = conf.ConnectionConfig
connConfig.LastDB = db
if client, err = b.createRedisClient(connConfig); err != nil {
resp.Msg = err.Error()
return
}
ctx, cancelFunc := context.WithCancel(b.ctx)
defer client.Close()
defer cancelFunc()
file, err := os.Open(path)
if err != nil {
resp.Msg = err.Error()
return
}
defer file.Close()
reader := csv.NewReader(file)
cancelEvent := "import:stop:" + path
runtime.EventsOnce(ctx, cancelEvent, func(data ...any) {
cancelFunc()
})
processEvent := "importing:" + path
var line []string
var readErr error
var key, value []byte
var ttl int64
var imported, ignored int64
var canceled bool
startTime := time.Now().Add(-10 * time.Second)
for {
readErr = nil
ttl = redis.KeepTTL
line, readErr = reader.Read()
if readErr != nil {
break
}
if len(line) < 1 {
continue
}
if key, readErr = hex.DecodeString(line[0]); readErr != nil {
continue
}
if value, readErr = hex.DecodeString(line[1]); readErr != nil {
continue
}
// get ttl
if len(line) > 2 {
if ttl, readErr = strconv.ParseInt(line[2], 10, 64); readErr != nil {
ttl = redis.KeepTTL
}
}
if conflict == 0 {
readErr = client.RestoreReplace(ctx, string(key), time.Duration(ttl), string(value)).Err()
} else {
keyStr := string(key)
// go-redis may crash when batch calling restore
// use "exists" to filter first
if n, _ := client.Exists(ctx, keyStr).Result(); n <= 0 {
readErr = client.Restore(ctx, keyStr, time.Duration(ttl), string(value)).Err()
} else {
readErr = errors.New("key existed")
}
}
if readErr != nil {
// restore fail
ignored += 1
} else {
imported += 1
}
if errors.Is(readErr, context.Canceled) || canceled {
canceled = true
break
}
if time.Now().Sub(startTime).Milliseconds() > 100 {
startTime = time.Now()
param := map[string]any{
"imported": imported,
"ignored": ignored,
//"processing": string(key),
}
runtime.EventsEmit(b.ctx, processEvent, param)
// do some sleep to prevent blocking the Redis server
time.Sleep(10 * time.Millisecond)
}
}
runtime.EventsOff(ctx, cancelEvent)
resp.Success = true
resp.Data = struct {
Canceled bool `json:"canceled"`
Imported int64 `json:"imported"`
Ignored int64 `json:"ignored"`
}{
Canceled: canceled,
Imported: imported,
Ignored: ignored,
}
return
}
// FlushDB flush database
func (b *browserService) FlushDB(connName string, db int, async bool) (resp types.JSResp) {
item, err := b.getRedisClient(connName, db)