fix: compatibility with older Redis versions without UNLINK command support #69

perf: add waiting indicator for deleting keys and flushing database
This commit is contained in:
tiny-craft
2023-11-10 15:57:19 +08:00
parent 5a58a57cd5
commit 65e077c0c0
4 changed files with 113 additions and 64 deletions

View File

@@ -1522,23 +1522,25 @@ func (b *browserService) DeleteKey(connName string, db int, k any, async bool) (
if strings.HasSuffix(key, "*") {
// delete by prefix
var mutex sync.Mutex
supportUnlink := true
del := func(ctx context.Context, cli redis.UniversalClient) error {
handleDel := func(ks []string) error {
pipe := cli.Pipeline()
for _, k2 := range ks {
if async {
cli.Unlink(ctx, k2)
} else {
cli.Del(ctx, k2)
var delErr error
if async && supportUnlink {
supportUnlink = false
if delErr = cli.Unlink(ctx, ks...).Err(); delErr != nil {
// not support unlink? try del command
delErr = cli.Del(ctx, ks...).Err()
}
} else {
delErr = cli.Del(ctx, ks...).Err()
}
pipe.Exec(ctx)
mutex.Lock()
deletedKeys = append(deletedKeys, ks...)
mutex.Unlock()
return nil
return delErr
}
scanSize := int64(Preferences().GetScanSize())
@@ -1546,7 +1548,7 @@ func (b *browserService) DeleteKey(connName string, db int, k any, async bool) (
resultKeys := make([]string, 0, 100)
for iter.Next(ctx) {
resultKeys = append(resultKeys, iter.Val())
if len(resultKeys) >= 3 {
if len(resultKeys) >= 20 {
handleDel(resultKeys)
resultKeys = resultKeys[:0:cap(resultKeys)]
}
@@ -1574,12 +1576,14 @@ func (b *browserService) DeleteKey(connName string, db int, k any, async bool) (
} else {
// delete key only
if async {
if _, err = client.Unlink(ctx, key).Result(); err != nil {
resp.Msg = err.Error()
return
if err = client.Unlink(ctx, key).Err(); err != nil {
if err = client.Del(ctx, key).Err(); err != nil {
resp.Msg = err.Error()
return
}
}
} else {
if _, err = client.Del(ctx, key).Result(); err != nil {
if err = client.Del(ctx, key).Err(); err != nil {
resp.Msg = err.Error()
return
}
@@ -1603,8 +1607,8 @@ func (b *browserService) FlushDB(connName string, db int, async bool) (resp type
return
}
flush := func(ctx context.Context, cli redis.UniversalClient) {
cli.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
flush := func(ctx context.Context, cli redis.UniversalClient, async bool) error {
_, e := cli.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.Select(ctx, db)
if async {
pipe.FlushDBAsync(ctx)
@@ -1613,17 +1617,26 @@ func (b *browserService) FlushDB(connName string, db int, async bool) (resp type
}
return nil
})
return e
}
client, ctx := item.client, item.ctx
if cluster, ok := client.(*redis.ClusterClient); ok {
// cluster mode
err = cluster.ForEachMaster(ctx, func(ctx context.Context, cli *redis.Client) error {
flush(ctx, cli)
return nil
return flush(ctx, cli, async)
})
// try sync mode if error cause
if err != nil && async {
err = cluster.ForEachMaster(ctx, func(ctx context.Context, cli *redis.Client) error {
return flush(ctx, cli, false)
})
}
} else {
flush(ctx, client)
if err = flush(ctx, client, async); err != nil && async {
// try sync mode if error cause
err = flush(ctx, client, false)
}
}
if err != nil {