# Fix Race detection

- logger
- password

# Fix Archive :
- create archive : add params to clean prefix path from contents path

# Fix Nutsdb :
- fix client command detection
- add test for unknown command before parsing
- fix snapshoot process for cluster
This commit is contained in:
Nicolas JUHEL
2021-04-15 15:42:49 +02:00
parent fcfd320993
commit 8516f33fe0
9 changed files with 154 additions and 52 deletions

View File

@@ -199,7 +199,7 @@ func ExtractAll(src libiot.FileProgress, originalName, outputPath string, defaul
return nil
}
func CreateArchive(archiveType ArchiveType, archive libiot.FileProgress, pathContent ...string) (created bool, err liberr.Error) {
func CreateArchive(archiveType ArchiveType, archive libiot.FileProgress, stripPath string, pathContent ...string) (created bool, err liberr.Error) {
//@TODO: make function
if len(pathContent) < 1 {
//nolint #goerr113
@@ -208,11 +208,11 @@ func CreateArchive(archiveType ArchiveType, archive libiot.FileProgress, pathCon
switch archiveType {
case TypeGzip:
return libgzp.Create(archive, pathContent...)
return libgzp.Create(archive, stripPath, pathContent...)
case TypeTar:
return libtar.Create(archive, pathContent...)
return libtar.Create(archive, stripPath, pathContent...)
case TypeTarGzip:
return libtar.CreateGzip(archive, pathContent...)
return libtar.CreateGzip(archive, stripPath, pathContent...)
}
return false, nil

View File

@@ -36,7 +36,7 @@ import (
liberr "github.com/nabbar/golib/errors"
)
func Create(archive io.WriteSeeker, content ...string) (bool, liberr.Error) {
func Create(archive io.WriteSeeker, stripPath string, content ...string) (bool, liberr.Error) {
var (
w *gzip.Writer
f *os.File

View File

@@ -34,17 +34,18 @@ import (
"io"
"os"
"path/filepath"
"strings"
liberr "github.com/nabbar/golib/errors"
)
func Create(archive io.WriteSeeker, content ...string) (bool, liberr.Error) {
func Create(archive io.WriteSeeker, stripPath string, content ...string) (bool, liberr.Error) {
if _, err := archive.Seek(0, io.SeekStart); err != nil {
return false, ErrorFileSeek.ErrorParent(err)
}
if ok, err := createTar(archive, content...); err != nil || !ok {
if ok, err := createTar(archive, stripPath, content...); err != nil || !ok {
return ok, err
}
@@ -55,7 +56,7 @@ func Create(archive io.WriteSeeker, content ...string) (bool, liberr.Error) {
return true, nil
}
func CreateGzip(archive io.WriteSeeker, content ...string) (bool, liberr.Error) {
func CreateGzip(archive io.WriteSeeker, stripPath string, content ...string) (bool, liberr.Error) {
if _, err := archive.Seek(0, io.SeekStart); err != nil {
return false, ErrorFileSeek.ErrorParent(err)
@@ -63,7 +64,7 @@ func CreateGzip(archive io.WriteSeeker, content ...string) (bool, liberr.Error)
z := gzip.NewWriter(archive)
if ok, err := createTar(z, content...); err != nil || !ok {
if ok, err := createTar(z, stripPath, content...); err != nil || !ok {
return ok, err
}
@@ -78,7 +79,7 @@ func CreateGzip(archive io.WriteSeeker, content ...string) (bool, liberr.Error)
return true, nil
}
func createTar(w io.Writer, content ...string) (bool, liberr.Error) {
func createTar(w io.Writer, stripPath string, content ...string) (bool, liberr.Error) {
var (
t *tar.Writer
n int64
@@ -87,6 +88,7 @@ func createTar(w io.Writer, content ...string) (bool, liberr.Error) {
lEr = ErrorTarCreateAddFile.Error(nil)
)
stripPath = strings.TrimLeft(stripPath, "/")
t = tar.NewWriter(w)
for i := 0; i < len(content); i++ {
@@ -111,6 +113,15 @@ func createTar(w io.Writer, content ...string) (bool, liberr.Error) {
// (see https://golang.org/src/archive/tar/common.go?#L626)
h.Name = filepath.ToSlash(file)
if stripPath != "" {
h.Name = filepath.Clean(strings.Replace(strings.TrimLeft(h.Name, "/"), stripPath, "", 1))
}
h.Name = strings.TrimLeft(h.Name, "/")
if h.Name == "" || h.Name == "." {
return nil
}
// write header
if e = t.WriteHeader(h); e != nil {
return e

View File

@@ -28,6 +28,7 @@ import (
"encoding/json"
"fmt"
"strings"
"sync/atomic"
"time"
"github.com/gin-gonic/gin"
@@ -56,18 +57,13 @@ const (
)
var (
curLevel = NilLevel
curLevel *atomic.Value
)
func init() {
SetLevel(InfoLevel)
}
//GetCurrentLevel return the current loglevel setting in the logger. All log entry matching this level or below will be logged.
func GetCurrentLevel() Level {
return curLevel
}
// GetLevelListString return a list ([]string) of all string loglevel available.
func GetLevelListString() []string {
return []string{
@@ -80,6 +76,28 @@ func GetLevelListString() []string {
}
}
//GetCurrentLevel return the current loglevel setting in the logger. All log entry matching this level or below will be logged.
func GetCurrentLevel() Level {
if curLevel == nil {
curLevel = new(atomic.Value)
}
if i := curLevel.Load(); i == nil {
return NilLevel
} else if l, ok := i.(Level); !ok {
return NilLevel
} else {
return l
}
}
func setCurLevel(lvl Level) {
if curLevel == nil {
curLevel = new(atomic.Value)
}
curLevel.Store(lvl)
}
// SetLevel Change the Level of all log entry with the Level type given in parameter. The change is apply for next log entry only.
// If the given Level type is not matching a correct Level type, no change will be apply.
/*
@@ -90,30 +108,31 @@ func SetLevel(level Level) {
switch level {
case PanicLevel:
curLevel = PanicLevel
setCurLevel(PanicLevel)
logrus.SetLevel(logrus.PanicLevel)
case FatalLevel:
curLevel = FatalLevel
setCurLevel(FatalLevel)
logrus.SetLevel(logrus.FatalLevel)
case ErrorLevel:
curLevel = ErrorLevel
setCurLevel(ErrorLevel)
logrus.SetLevel(logrus.ErrorLevel)
case WarnLevel:
curLevel = WarnLevel
setCurLevel(WarnLevel)
logrus.SetLevel(logrus.WarnLevel)
case InfoLevel:
curLevel = InfoLevel
setCurLevel(InfoLevel)
logrus.SetLevel(logrus.InfoLevel)
case DebugLevel:
curLevel = DebugLevel
setCurLevel(DebugLevel)
logrus.SetLevel(logrus.DebugLevel)
case NilLevel:
setCurLevel(NilLevel)
return
}
@@ -126,8 +145,8 @@ func setViperLogTrace() {
return
}
jwalterweatherman.SetLogOutput(GetIOWriter(curLevel, "[Log Config Viper]"))
jwalterweatherman.SetStdoutOutput(GetIOWriter(curLevel, "[Std Config Viper]"))
jwalterweatherman.SetLogOutput(GetIOWriter(GetCurrentLevel(), "[Log Config Viper]"))
jwalterweatherman.SetStdoutOutput(GetIOWriter(GetCurrentLevel(), "[Std Config Viper]"))
if filetrace {
jwalterweatherman.SetStdoutThreshold(jwalterweatherman.LevelTrace)
@@ -135,7 +154,7 @@ func setViperLogTrace() {
}
//nolint exhaustive
switch curLevel {
switch GetCurrentLevel() {
case PanicLevel:
jwalterweatherman.SetStdoutThreshold(jwalterweatherman.LevelCritical)
@@ -370,7 +389,7 @@ func (level Level) logDetails(message string, data interface{}, err error, field
tags[tagTime] = level.String()
if filetrace && curLevel == DebugLevel {
if filetrace && GetCurrentLevel() == DebugLevel {
frame := getFrame()
tags[tagCaller] = frame.Function
tags[tagFile] = filterPath(frame.File)

View File

@@ -175,7 +175,7 @@ func ginTonicAddError(c *gin.Context, err error) {
}
func proceed(lvl Level) bool {
return lvl != NilLevel && lvl <= curLevel
return lvl != NilLevel && lvl <= GetCurrentLevel()
}
func filterPath(pathname string) string {

View File

@@ -31,6 +31,7 @@ import (
"fmt"
"reflect"
"runtime"
"strings"
"github.com/fxamacker/cbor/v2"
liberr "github.com/nabbar/golib/errors"
@@ -72,7 +73,8 @@ func NewCommandByCaller(params ...interface{}) *CommandRequest {
f := runtime.FuncForPC(pc[0])
d := &CommandRequest{}
d.Cmd = CmdCodeFromName(f.Name())
fn := strings.Split(f.Name(), ".")
d.Cmd = CmdCodeFromName(fn[len(fn)-1])
if len(params) > 0 {
d.Params = params
@@ -232,6 +234,10 @@ func (c *CommandRequest) RunLocal(tx *nutsdb.Tx) (*CommandResponse, liberr.Error
}
func (c *CommandRequest) Run(tx *nutsdb.Tx) ([]byte, liberr.Error) {
if c.Cmd == CmdUnknown {
return nil, ErrorClientCommandInvalid.Error(nil)
}
if r, err := c.RunLocal(tx); err != nil {
return nil, err
} else if p, e := cbor.Marshal(r); e != nil {

View File

@@ -104,7 +104,7 @@ func (s *snap) Save(opt Options, writer io.Writer) liberr.Error {
_ = t.Close()
}()
if _, e = archive.CreateArchive(archive.TypeTarGzip, t, s.path); e != nil {
if _, e = archive.CreateArchive(archive.TypeTarGzip, t, s.path, s.path); e != nil {
return ErrorFolderArchive.Error(e)
}

View File

@@ -41,13 +41,9 @@ const (
loopRandMaxLen = 10
)
func randStringBytesMaskImprSrc(n int) string {
var src = rand.NewSource(time.Now().UnixNano())
func init() {
rand.Seed(time.Now().UnixNano())
}
func randStringBytesMaskImprSrc(n int) string {
b := make([]byte, n)
// A src.Int63() generates 63 random bits, enough for letterIdxMax characters!
for i, cache, remain := n-1, src.Int63(), letterIdxMax; i >= 0; {

View File

@@ -28,13 +28,18 @@
package main
import (
"bytes"
"context"
"errors"
"fmt"
"os"
"runtime"
"strings"
"sync/atomic"
"time"
"github.com/nabbar/golib/password"
"github.com/nabbar/golib/logger"
libclu "github.com/nabbar/golib/cluster"
@@ -48,8 +53,10 @@ import (
const (
BaseDirPattern = "/nutsdb/node-%d"
NbInstances = 3
NbEntries = 100000
NbEntries = 10000
LoggerFile = "/nutsdb/nutsdb.log"
AllowPut = true
AllowGet = true
)
var (
@@ -59,7 +66,7 @@ var (
func init() {
liberr.SetModeReturnError(liberr.ErrorReturnCodeErrorTraceFull)
logger.SetLevel(logger.WarnLevel)
logger.SetLevel(logger.InfoLevel)
logger.AddGID(true)
logger.EnableColor()
logger.FileTrace(true)
@@ -96,10 +103,19 @@ func main() {
}
}()
println(fmt.Sprintf("Running test with %d threads...", runtime.GOMAXPROCS(0)))
println(fmt.Sprintf("Init cluster..."))
tStart := time.Now()
cluster := Start(ctx)
logger.SetLevel(logger.WarnLevel)
defer func() {
Stop(ctx, cluster)
}()
tInit := time.Since(tStart)
mInit := fmt.Sprintf("Memory used after Init: \n%s", strings.Join(GetMemUsage(), "\n"))
runtime.GC()
println(fmt.Sprintf("Init done. \n"))
pgb := progress.NewProgressBarWithContext(ctx, mpb.WithWidth(64), mpb.WithRefreshRate(200*time.Millisecond))
@@ -114,14 +130,18 @@ func main() {
go func(ctx context.Context, bar progress.Bar, clu libndb.NutsDB, num int) {
defer bar.DeferWorker()
Put(ctx, clu, fmt.Sprintf("key-%3d", num), fmt.Sprintf("val-%03d", num))
}(ctx, barPut, cluster[i%3], i)
if AllowPut {
Put(ctx, clu, fmt.Sprintf("key-%03d", num), fmt.Sprintf("val-%03d|%s|%s|%s", num, password.Generate(50), password.Generate(50), password.Generate(50)))
}
}(ctx, barPut, cluster[i%3], i+1)
}
if e := barPut.WaitAll(); e != nil {
panic(e)
}
tPut := time.Since(tStart)
mPut := fmt.Sprintf("Memory used after Put entries: \n%s", strings.Join(GetMemUsage(), "\n"))
runtime.GC()
barGet := pgb.NewBarSimpleCounter("GetEntry", int64(NbEntries))
defer barGet.DeferMain(false)
@@ -139,20 +159,52 @@ func main() {
go func(ctx context.Context, bar progress.Bar, clu libndb.NutsDB, num int) {
defer bar.DeferWorker()
Get(ctx, clu, fmt.Sprintf("key-%3d", num))
}(ctx, barGet, cluster[c], i)
if AllowGet {
Get(ctx, clu, fmt.Sprintf("key-%03d", num), fmt.Sprintf("val-%03d", num))
}
}(ctx, barGet, cluster[c], i+1)
}
if e := barGet.WaitAll(); e != nil {
panic(e)
}
tGet := time.Since(tStart)
mGet := fmt.Sprintf("Memory used after Get entries: \n%s", strings.Join(GetMemUsage(), "\n"))
runtime.GC()
time.Sleep(10 * time.Second)
barPut.DeferMain(false)
barPut = nil
println(fmt.Sprintf("Time for init cluster: %s", tInit.String()))
println(fmt.Sprintf("Time for %d Put in DB: %s", NbEntries, tPut.String()))
println(fmt.Sprintf("Time for %d Get in DB: %s", NbEntries, tGet.String()))
barGet.DeferMain(false)
barGet = nil
pgb = nil
res := []string{
fmt.Sprintf("Time for init cluster: %s", tInit.String()),
fmt.Sprintf("Time for %d Put in DB: %s ( %s by entry )", NbEntries, tPut.String(), (tPut / NbEntries).String()),
fmt.Sprintf("Time for %d Get in DB: %s ( %s by entry )", NbEntries, tGet.String(), (tGet / NbEntries).String()),
mInit,
mPut,
mGet,
}
runtime.GC()
logger.SetLevel(logger.InfoLevel)
time.Sleep(5 * time.Second)
println(strings.Join(res, "\n"))
logger.InfoLevel.Logf("Results testing: \n%s", strings.Join(res, "\n"))
time.Sleep(5 * time.Second)
}
func GetMemUsage() []string {
var m runtime.MemStats
runtime.ReadMemStats(&m)
return []string{
fmt.Sprintf("\t - Alloc = %v MiB", m.Alloc/1024/1024),
fmt.Sprintf("\t - TotalAlloc = %v MiB", m.TotalAlloc/1024/1024),
fmt.Sprintf("\t - Sys = %v MiB", m.Sys/1024/1024),
fmt.Sprintf("\t - NumGC = %v\n", m.NumGC),
}
}
func Put(ctx context.Context, c libndb.NutsDB, key, val string) {
@@ -161,10 +213,16 @@ func Put(ctx context.Context, c libndb.NutsDB, key, val string) {
//fmt.Printf("Cmd Put(%s, %s) : %v\n", key, val, res)
}
func Get(ctx context.Context, c libndb.NutsDB, key string) {
_, _ = c.Client(ctx, 100*time.Microsecond).Get("myBucket", []byte(key))
//v, e := c.Client(ctx, 100*time.Microsecond).Get("myBucket", []byte(key))
//fmt.Printf("Cmd Get(%s) : %v --- err : %v\n", key, string(v.Value), e)
func Get(ctx context.Context, c libndb.NutsDB, key, val string) {
//_, _ = c.Client(ctx, 100*time.Microsecond).Get("myBucket", []byte(key))
v, e := c.Client(ctx, 100*time.Microsecond).Get("myBucket", []byte(key))
if e != nil {
logger.ErrorLevel.Logf("Cmd Get for key '%s', error : %v", key, e)
fmt.Printf("Cmd Get for key '%s', error : %v", key, e)
} else if !bytes.HasPrefix(v.Value, []byte(val)) {
logger.ErrorLevel.Logf("Cmd Get for key '%s', awaiting value start with '%s', but find : %s", key, val, string(v.Value))
fmt.Printf("Cmd Get for key '%s', awaiting value start with '%s', but find : %s", key, val, string(v.Value))
}
}
func Start(ctx context.Context) []libndb.NutsDB {
@@ -173,6 +231,7 @@ func Start(ctx context.Context) []libndb.NutsDB {
for i := 0; i < NbInstances; i++ {
clusters[i] = initNutDB(i + 1)
logger.InfoLevel.Logf("Starting node ID #%d...", i+1)
if err := clusters[i].Listen(); err != nil {
panic(err)
}
@@ -183,6 +242,17 @@ func Start(ctx context.Context) []libndb.NutsDB {
return clusters
}
func Stop(ctx context.Context, clusters []libndb.NutsDB) {
for i := 0; i < NbInstances; i++ {
logger.InfoLevel.Logf("Stopping node ID #%d...", i+1)
if err := clusters[i].Shutdown(); err != nil {
panic(err)
}
time.Sleep(5 * time.Second)
}
}
func initNutDB(num int) libndb.NutsDB {
cfg := configNutDB()
cfg.Cluster.Cluster.NodeID = uint64(num)
@@ -195,10 +265,10 @@ func initNutDB(num int) libndb.NutsDB {
func configNutDB() libndb.Config {
cfg := libndb.Config{
DB: libndb.NutsDBOptions{
EntryIdxMode: nutsdb.HintKeyValAndRAMIdxMode,
EntryIdxMode: nutsdb.HintKeyAndRAMIdxMode,
RWMode: nutsdb.FileIO,
SegmentSize: 8 * 1024 * 1024,
SyncEnable: false,
SegmentSize: 64 * 1024,
SyncEnable: true,
StartFileLoadingMode: nutsdb.MMap,
},