From 8516f33fe0df7e22c73c9711f0098200073ee09c Mon Sep 17 00:00:00 2001 From: Nicolas JUHEL Date: Thu, 15 Apr 2021 15:42:49 +0200 Subject: [PATCH] # Fix Race detection - logger - password # Fix Archive : - create archive : add params to clean prefix path from contents path # Fix Nutsdb : - fix client command detection - add test for unknown command before parsing - fix snapshoot process for cluster --- archive/archive.go | 8 +-- archive/gzip/writer.go | 2 +- archive/tar/writer.go | 21 ++++++-- logger/level.go | 51 +++++++++++++------ logger/logger.go | 2 +- nutsdb/entryKv.go | 8 ++- nutsdb/snap.go | 2 +- password/password.go | 8 +-- test/test-nutsdb/main.go | 104 ++++++++++++++++++++++++++++++++------- 9 files changed, 154 insertions(+), 52 deletions(-) diff --git a/archive/archive.go b/archive/archive.go index a17b1b4..1f65601 100644 --- a/archive/archive.go +++ b/archive/archive.go @@ -199,7 +199,7 @@ func ExtractAll(src libiot.FileProgress, originalName, outputPath string, defaul return nil } -func CreateArchive(archiveType ArchiveType, archive libiot.FileProgress, pathContent ...string) (created bool, err liberr.Error) { +func CreateArchive(archiveType ArchiveType, archive libiot.FileProgress, stripPath string, pathContent ...string) (created bool, err liberr.Error) { //@TODO: make function if len(pathContent) < 1 { //nolint #goerr113 @@ -208,11 +208,11 @@ func CreateArchive(archiveType ArchiveType, archive libiot.FileProgress, pathCon switch archiveType { case TypeGzip: - return libgzp.Create(archive, pathContent...) + return libgzp.Create(archive, stripPath, pathContent...) case TypeTar: - return libtar.Create(archive, pathContent...) + return libtar.Create(archive, stripPath, pathContent...) case TypeTarGzip: - return libtar.CreateGzip(archive, pathContent...) + return libtar.CreateGzip(archive, stripPath, pathContent...) } return false, nil diff --git a/archive/gzip/writer.go b/archive/gzip/writer.go index 62e7fab..47dd604 100644 --- a/archive/gzip/writer.go +++ b/archive/gzip/writer.go @@ -36,7 +36,7 @@ import ( liberr "github.com/nabbar/golib/errors" ) -func Create(archive io.WriteSeeker, content ...string) (bool, liberr.Error) { +func Create(archive io.WriteSeeker, stripPath string, content ...string) (bool, liberr.Error) { var ( w *gzip.Writer f *os.File diff --git a/archive/tar/writer.go b/archive/tar/writer.go index 702abd4..823d541 100644 --- a/archive/tar/writer.go +++ b/archive/tar/writer.go @@ -34,17 +34,18 @@ import ( "io" "os" "path/filepath" + "strings" liberr "github.com/nabbar/golib/errors" ) -func Create(archive io.WriteSeeker, content ...string) (bool, liberr.Error) { +func Create(archive io.WriteSeeker, stripPath string, content ...string) (bool, liberr.Error) { if _, err := archive.Seek(0, io.SeekStart); err != nil { return false, ErrorFileSeek.ErrorParent(err) } - if ok, err := createTar(archive, content...); err != nil || !ok { + if ok, err := createTar(archive, stripPath, content...); err != nil || !ok { return ok, err } @@ -55,7 +56,7 @@ func Create(archive io.WriteSeeker, content ...string) (bool, liberr.Error) { return true, nil } -func CreateGzip(archive io.WriteSeeker, content ...string) (bool, liberr.Error) { +func CreateGzip(archive io.WriteSeeker, stripPath string, content ...string) (bool, liberr.Error) { if _, err := archive.Seek(0, io.SeekStart); err != nil { return false, ErrorFileSeek.ErrorParent(err) @@ -63,7 +64,7 @@ func CreateGzip(archive io.WriteSeeker, content ...string) (bool, liberr.Error) z := gzip.NewWriter(archive) - if ok, err := createTar(z, content...); err != nil || !ok { + if ok, err := createTar(z, stripPath, content...); err != nil || !ok { return ok, err } @@ -78,7 +79,7 @@ func CreateGzip(archive io.WriteSeeker, content ...string) (bool, liberr.Error) return true, nil } -func createTar(w io.Writer, content ...string) (bool, liberr.Error) { +func createTar(w io.Writer, stripPath string, content ...string) (bool, liberr.Error) { var ( t *tar.Writer n int64 @@ -87,6 +88,7 @@ func createTar(w io.Writer, content ...string) (bool, liberr.Error) { lEr = ErrorTarCreateAddFile.Error(nil) ) + stripPath = strings.TrimLeft(stripPath, "/") t = tar.NewWriter(w) for i := 0; i < len(content); i++ { @@ -111,6 +113,15 @@ func createTar(w io.Writer, content ...string) (bool, liberr.Error) { // (see https://golang.org/src/archive/tar/common.go?#L626) h.Name = filepath.ToSlash(file) + if stripPath != "" { + h.Name = filepath.Clean(strings.Replace(strings.TrimLeft(h.Name, "/"), stripPath, "", 1)) + } + h.Name = strings.TrimLeft(h.Name, "/") + + if h.Name == "" || h.Name == "." { + return nil + } + // write header if e = t.WriteHeader(h); e != nil { return e diff --git a/logger/level.go b/logger/level.go index f5264d6..6781751 100644 --- a/logger/level.go +++ b/logger/level.go @@ -28,6 +28,7 @@ import ( "encoding/json" "fmt" "strings" + "sync/atomic" "time" "github.com/gin-gonic/gin" @@ -56,18 +57,13 @@ const ( ) var ( - curLevel = NilLevel + curLevel *atomic.Value ) func init() { SetLevel(InfoLevel) } -//GetCurrentLevel return the current loglevel setting in the logger. All log entry matching this level or below will be logged. -func GetCurrentLevel() Level { - return curLevel -} - // GetLevelListString return a list ([]string) of all string loglevel available. func GetLevelListString() []string { return []string{ @@ -80,6 +76,28 @@ func GetLevelListString() []string { } } +//GetCurrentLevel return the current loglevel setting in the logger. All log entry matching this level or below will be logged. +func GetCurrentLevel() Level { + if curLevel == nil { + curLevel = new(atomic.Value) + } + + if i := curLevel.Load(); i == nil { + return NilLevel + } else if l, ok := i.(Level); !ok { + return NilLevel + } else { + return l + } +} + +func setCurLevel(lvl Level) { + if curLevel == nil { + curLevel = new(atomic.Value) + } + curLevel.Store(lvl) +} + // SetLevel Change the Level of all log entry with the Level type given in parameter. The change is apply for next log entry only. // If the given Level type is not matching a correct Level type, no change will be apply. /* @@ -90,30 +108,31 @@ func SetLevel(level Level) { switch level { case PanicLevel: - curLevel = PanicLevel + setCurLevel(PanicLevel) logrus.SetLevel(logrus.PanicLevel) case FatalLevel: - curLevel = FatalLevel + setCurLevel(FatalLevel) logrus.SetLevel(logrus.FatalLevel) case ErrorLevel: - curLevel = ErrorLevel + setCurLevel(ErrorLevel) logrus.SetLevel(logrus.ErrorLevel) case WarnLevel: - curLevel = WarnLevel + setCurLevel(WarnLevel) logrus.SetLevel(logrus.WarnLevel) case InfoLevel: - curLevel = InfoLevel + setCurLevel(InfoLevel) logrus.SetLevel(logrus.InfoLevel) case DebugLevel: - curLevel = DebugLevel + setCurLevel(DebugLevel) logrus.SetLevel(logrus.DebugLevel) case NilLevel: + setCurLevel(NilLevel) return } @@ -126,8 +145,8 @@ func setViperLogTrace() { return } - jwalterweatherman.SetLogOutput(GetIOWriter(curLevel, "[Log Config Viper]")) - jwalterweatherman.SetStdoutOutput(GetIOWriter(curLevel, "[Std Config Viper]")) + jwalterweatherman.SetLogOutput(GetIOWriter(GetCurrentLevel(), "[Log Config Viper]")) + jwalterweatherman.SetStdoutOutput(GetIOWriter(GetCurrentLevel(), "[Std Config Viper]")) if filetrace { jwalterweatherman.SetStdoutThreshold(jwalterweatherman.LevelTrace) @@ -135,7 +154,7 @@ func setViperLogTrace() { } //nolint exhaustive - switch curLevel { + switch GetCurrentLevel() { case PanicLevel: jwalterweatherman.SetStdoutThreshold(jwalterweatherman.LevelCritical) @@ -370,7 +389,7 @@ func (level Level) logDetails(message string, data interface{}, err error, field tags[tagTime] = level.String() - if filetrace && curLevel == DebugLevel { + if filetrace && GetCurrentLevel() == DebugLevel { frame := getFrame() tags[tagCaller] = frame.Function tags[tagFile] = filterPath(frame.File) diff --git a/logger/logger.go b/logger/logger.go index b8c034e..8874902 100644 --- a/logger/logger.go +++ b/logger/logger.go @@ -175,7 +175,7 @@ func ginTonicAddError(c *gin.Context, err error) { } func proceed(lvl Level) bool { - return lvl != NilLevel && lvl <= curLevel + return lvl != NilLevel && lvl <= GetCurrentLevel() } func filterPath(pathname string) string { diff --git a/nutsdb/entryKv.go b/nutsdb/entryKv.go index cd8a40a..72bcfe3 100644 --- a/nutsdb/entryKv.go +++ b/nutsdb/entryKv.go @@ -31,6 +31,7 @@ import ( "fmt" "reflect" "runtime" + "strings" "github.com/fxamacker/cbor/v2" liberr "github.com/nabbar/golib/errors" @@ -72,7 +73,8 @@ func NewCommandByCaller(params ...interface{}) *CommandRequest { f := runtime.FuncForPC(pc[0]) d := &CommandRequest{} - d.Cmd = CmdCodeFromName(f.Name()) + fn := strings.Split(f.Name(), ".") + d.Cmd = CmdCodeFromName(fn[len(fn)-1]) if len(params) > 0 { d.Params = params @@ -232,6 +234,10 @@ func (c *CommandRequest) RunLocal(tx *nutsdb.Tx) (*CommandResponse, liberr.Error } func (c *CommandRequest) Run(tx *nutsdb.Tx) ([]byte, liberr.Error) { + if c.Cmd == CmdUnknown { + return nil, ErrorClientCommandInvalid.Error(nil) + } + if r, err := c.RunLocal(tx); err != nil { return nil, err } else if p, e := cbor.Marshal(r); e != nil { diff --git a/nutsdb/snap.go b/nutsdb/snap.go index e2545ee..904516d 100644 --- a/nutsdb/snap.go +++ b/nutsdb/snap.go @@ -104,7 +104,7 @@ func (s *snap) Save(opt Options, writer io.Writer) liberr.Error { _ = t.Close() }() - if _, e = archive.CreateArchive(archive.TypeTarGzip, t, s.path); e != nil { + if _, e = archive.CreateArchive(archive.TypeTarGzip, t, s.path, s.path); e != nil { return ErrorFolderArchive.Error(e) } diff --git a/password/password.go b/password/password.go index 77dca99..08f1c14 100644 --- a/password/password.go +++ b/password/password.go @@ -41,13 +41,9 @@ const ( loopRandMaxLen = 10 ) -var src = rand.NewSource(time.Now().UnixNano()) - -func init() { - rand.Seed(time.Now().UnixNano()) -} - func randStringBytesMaskImprSrc(n int) string { + var src = rand.NewSource(time.Now().UnixNano()) + b := make([]byte, n) // A src.Int63() generates 63 random bits, enough for letterIdxMax characters! for i, cache, remain := n-1, src.Int63(), letterIdxMax; i >= 0; { diff --git a/test/test-nutsdb/main.go b/test/test-nutsdb/main.go index b7bebc0..e43efda 100644 --- a/test/test-nutsdb/main.go +++ b/test/test-nutsdb/main.go @@ -28,13 +28,18 @@ package main import ( + "bytes" "context" "errors" "fmt" "os" + "runtime" + "strings" "sync/atomic" "time" + "github.com/nabbar/golib/password" + "github.com/nabbar/golib/logger" libclu "github.com/nabbar/golib/cluster" @@ -48,8 +53,10 @@ import ( const ( BaseDirPattern = "/nutsdb/node-%d" NbInstances = 3 - NbEntries = 100000 + NbEntries = 10000 LoggerFile = "/nutsdb/nutsdb.log" + AllowPut = true + AllowGet = true ) var ( @@ -59,7 +66,7 @@ var ( func init() { liberr.SetModeReturnError(liberr.ErrorReturnCodeErrorTraceFull) - logger.SetLevel(logger.WarnLevel) + logger.SetLevel(logger.InfoLevel) logger.AddGID(true) logger.EnableColor() logger.FileTrace(true) @@ -96,10 +103,19 @@ func main() { } }() + println(fmt.Sprintf("Running test with %d threads...", runtime.GOMAXPROCS(0))) println(fmt.Sprintf("Init cluster...")) tStart := time.Now() cluster := Start(ctx) + + logger.SetLevel(logger.WarnLevel) + defer func() { + Stop(ctx, cluster) + }() + tInit := time.Since(tStart) + mInit := fmt.Sprintf("Memory used after Init: \n%s", strings.Join(GetMemUsage(), "\n")) + runtime.GC() println(fmt.Sprintf("Init done. \n")) pgb := progress.NewProgressBarWithContext(ctx, mpb.WithWidth(64), mpb.WithRefreshRate(200*time.Millisecond)) @@ -114,14 +130,18 @@ func main() { go func(ctx context.Context, bar progress.Bar, clu libndb.NutsDB, num int) { defer bar.DeferWorker() - Put(ctx, clu, fmt.Sprintf("key-%3d", num), fmt.Sprintf("val-%03d", num)) - }(ctx, barPut, cluster[i%3], i) + if AllowPut { + Put(ctx, clu, fmt.Sprintf("key-%03d", num), fmt.Sprintf("val-%03d|%s|%s|%s", num, password.Generate(50), password.Generate(50), password.Generate(50))) + } + }(ctx, barPut, cluster[i%3], i+1) } if e := barPut.WaitAll(); e != nil { panic(e) } tPut := time.Since(tStart) + mPut := fmt.Sprintf("Memory used after Put entries: \n%s", strings.Join(GetMemUsage(), "\n")) + runtime.GC() barGet := pgb.NewBarSimpleCounter("GetEntry", int64(NbEntries)) defer barGet.DeferMain(false) @@ -139,20 +159,52 @@ func main() { go func(ctx context.Context, bar progress.Bar, clu libndb.NutsDB, num int) { defer bar.DeferWorker() - Get(ctx, clu, fmt.Sprintf("key-%3d", num)) - }(ctx, barGet, cluster[c], i) + if AllowGet { + Get(ctx, clu, fmt.Sprintf("key-%03d", num), fmt.Sprintf("val-%03d", num)) + } + }(ctx, barGet, cluster[c], i+1) } if e := barGet.WaitAll(); e != nil { panic(e) } tGet := time.Since(tStart) + mGet := fmt.Sprintf("Memory used after Get entries: \n%s", strings.Join(GetMemUsage(), "\n")) + runtime.GC() - time.Sleep(10 * time.Second) + barPut.DeferMain(false) + barPut = nil - println(fmt.Sprintf("Time for init cluster: %s", tInit.String())) - println(fmt.Sprintf("Time for %d Put in DB: %s", NbEntries, tPut.String())) - println(fmt.Sprintf("Time for %d Get in DB: %s", NbEntries, tGet.String())) + barGet.DeferMain(false) + barGet = nil + + pgb = nil + res := []string{ + fmt.Sprintf("Time for init cluster: %s", tInit.String()), + fmt.Sprintf("Time for %d Put in DB: %s ( %s by entry )", NbEntries, tPut.String(), (tPut / NbEntries).String()), + fmt.Sprintf("Time for %d Get in DB: %s ( %s by entry )", NbEntries, tGet.String(), (tGet / NbEntries).String()), + mInit, + mPut, + mGet, + } + runtime.GC() + logger.SetLevel(logger.InfoLevel) + time.Sleep(5 * time.Second) + + println(strings.Join(res, "\n")) + logger.InfoLevel.Logf("Results testing: \n%s", strings.Join(res, "\n")) + time.Sleep(5 * time.Second) +} + +func GetMemUsage() []string { + var m runtime.MemStats + runtime.ReadMemStats(&m) + return []string{ + fmt.Sprintf("\t - Alloc = %v MiB", m.Alloc/1024/1024), + fmt.Sprintf("\t - TotalAlloc = %v MiB", m.TotalAlloc/1024/1024), + fmt.Sprintf("\t - Sys = %v MiB", m.Sys/1024/1024), + fmt.Sprintf("\t - NumGC = %v\n", m.NumGC), + } } func Put(ctx context.Context, c libndb.NutsDB, key, val string) { @@ -161,10 +213,16 @@ func Put(ctx context.Context, c libndb.NutsDB, key, val string) { //fmt.Printf("Cmd Put(%s, %s) : %v\n", key, val, res) } -func Get(ctx context.Context, c libndb.NutsDB, key string) { - _, _ = c.Client(ctx, 100*time.Microsecond).Get("myBucket", []byte(key)) - //v, e := c.Client(ctx, 100*time.Microsecond).Get("myBucket", []byte(key)) - //fmt.Printf("Cmd Get(%s) : %v --- err : %v\n", key, string(v.Value), e) +func Get(ctx context.Context, c libndb.NutsDB, key, val string) { + //_, _ = c.Client(ctx, 100*time.Microsecond).Get("myBucket", []byte(key)) + v, e := c.Client(ctx, 100*time.Microsecond).Get("myBucket", []byte(key)) + if e != nil { + logger.ErrorLevel.Logf("Cmd Get for key '%s', error : %v", key, e) + fmt.Printf("Cmd Get for key '%s', error : %v", key, e) + } else if !bytes.HasPrefix(v.Value, []byte(val)) { + logger.ErrorLevel.Logf("Cmd Get for key '%s', awaiting value start with '%s', but find : %s", key, val, string(v.Value)) + fmt.Printf("Cmd Get for key '%s', awaiting value start with '%s', but find : %s", key, val, string(v.Value)) + } } func Start(ctx context.Context) []libndb.NutsDB { @@ -173,6 +231,7 @@ func Start(ctx context.Context) []libndb.NutsDB { for i := 0; i < NbInstances; i++ { clusters[i] = initNutDB(i + 1) + logger.InfoLevel.Logf("Starting node ID #%d...", i+1) if err := clusters[i].Listen(); err != nil { panic(err) } @@ -183,6 +242,17 @@ func Start(ctx context.Context) []libndb.NutsDB { return clusters } +func Stop(ctx context.Context, clusters []libndb.NutsDB) { + for i := 0; i < NbInstances; i++ { + logger.InfoLevel.Logf("Stopping node ID #%d...", i+1) + if err := clusters[i].Shutdown(); err != nil { + panic(err) + } + + time.Sleep(5 * time.Second) + } +} + func initNutDB(num int) libndb.NutsDB { cfg := configNutDB() cfg.Cluster.Cluster.NodeID = uint64(num) @@ -195,10 +265,10 @@ func initNutDB(num int) libndb.NutsDB { func configNutDB() libndb.Config { cfg := libndb.Config{ DB: libndb.NutsDBOptions{ - EntryIdxMode: nutsdb.HintKeyValAndRAMIdxMode, + EntryIdxMode: nutsdb.HintKeyAndRAMIdxMode, RWMode: nutsdb.FileIO, - SegmentSize: 8 * 1024 * 1024, - SyncEnable: false, + SegmentSize: 64 * 1024, + SyncEnable: true, StartFileLoadingMode: nutsdb.MMap, },