diff --git a/config/components.go b/config/components.go index 0cc3dcd..f8f4f79 100644 --- a/config/components.go +++ b/config/components.go @@ -34,6 +34,7 @@ import ( cfgcst "github.com/nabbar/golib/config/const" cfgtps "github.com/nabbar/golib/config/types" + loglvl "github.com/nabbar/golib/logger/level" spfcbr "github.com/spf13/cobra" "golang.org/x/exp/slices" ) @@ -118,12 +119,22 @@ func (c *configModel) ComponentStart() error { } else if cpt := c.ComponentGet(key); cpt == nil { continue } else { + if l := c.getDefaultLogger(); l != nil { + l.Entry(loglvl.InfoLevel, fmt.Sprintf("starting component '%s'", key)).Log() + } e := cpt.Start() c.componentUpdate(key, cpt) if e != nil { + if l := c.getDefaultLogger(); l != nil { + l.Entry(loglvl.ErrorLevel, fmt.Sprintf("component '%s' starting return an error", key)).ErrorAdd(true, e).Log() + } err.Add(e) } else if !cpt.IsStarted() { - err.Add(fmt.Errorf("component '%s' has been call to start, but is not started", key)) + e = fmt.Errorf("component '%s' has been call to start, but is not started", key) + if l := c.getDefaultLogger(); l != nil { + l.Entry(loglvl.ErrorLevel, fmt.Sprintf("component '%s' is not started", key)).ErrorAdd(true, e).Log() + } + err.Add(e) } } } @@ -163,11 +174,21 @@ func (c *configModel) ComponentReload() error { } else if cpt := c.ComponentGet(key); cpt == nil { continue } else { + if l := c.getDefaultLogger(); l != nil { + l.Entry(loglvl.InfoLevel, fmt.Sprintf("reloading component '%s'", key)).Log() + } e := cpt.Reload() c.componentUpdate(key, cpt) if e != nil { + if l := c.getDefaultLogger(); l != nil { + l.Entry(loglvl.ErrorLevel, fmt.Sprintf("component '%s' reloading return an error", key)).ErrorAdd(true, e).Log() + } err.Add(e) } else if !cpt.IsStarted() { + e = fmt.Errorf("component '%s' has been call to reload, but is not started", key) + if l := c.getDefaultLogger(); l != nil { + l.Entry(loglvl.ErrorLevel, fmt.Sprintf("component '%s' is not started after a reload", key)).ErrorAdd(true, e).Log() + } err.Add(fmt.Errorf("component '%s' has been call to reload, but is not started", key)) } } diff --git a/go.mod b/go.mod index 7b5c09e..d1a26a4 100644 --- a/go.mod +++ b/go.mod @@ -2,16 +2,16 @@ module github.com/nabbar/golib go 1.21 -toolchain go1.21.6 +toolchain go1.21.7 require ( - github.com/aws/aws-sdk-go v1.50.19 - github.com/aws/aws-sdk-go-v2 v1.25.0 - github.com/aws/aws-sdk-go-v2/config v1.27.0 - github.com/aws/aws-sdk-go-v2/credentials v1.17.0 - github.com/aws/aws-sdk-go-v2/service/iam v1.29.0 - github.com/aws/aws-sdk-go-v2/service/s3 v1.49.0 - github.com/aws/smithy-go v1.20.0 + github.com/aws/aws-sdk-go v1.50.33 + github.com/aws/aws-sdk-go-v2 v1.25.2 + github.com/aws/aws-sdk-go-v2/config v1.27.6 + github.com/aws/aws-sdk-go-v2/credentials v1.17.6 + github.com/aws/aws-sdk-go-v2/service/iam v1.31.1 + github.com/aws/aws-sdk-go-v2/service/s3 v1.51.3 + github.com/aws/smithy-go v1.20.1 github.com/bits-and-blooms/bitset v1.13.0 github.com/c-bata/go-prompt v0.2.6 github.com/fatih/color v1.16.0 @@ -19,7 +19,7 @@ require ( github.com/fxamacker/cbor/v2 v2.6.0 github.com/gin-gonic/gin v1.9.1 github.com/go-ldap/ldap/v3 v3.4.6 - github.com/go-playground/validator/v10 v10.18.0 + github.com/go-playground/validator/v10 v10.19.0 github.com/google/go-github/v33 v33.0.0 github.com/hashicorp/go-hclog v1.6.2 github.com/hashicorp/go-retryablehttp v0.7.5 @@ -31,14 +31,14 @@ require ( github.com/mattn/go-colorable v0.1.13 github.com/mitchellh/go-homedir v1.1.0 github.com/mitchellh/mapstructure v1.5.0 - github.com/nats-io/jwt/v2 v2.5.4 + github.com/nats-io/jwt/v2 v2.5.5 github.com/nats-io/nats-server/v2 v2.10.11 - github.com/nats-io/nats.go v1.33.0 + github.com/nats-io/nats.go v1.33.1 github.com/nutsdb/nutsdb v0.14.3 - github.com/onsi/ginkgo/v2 v2.15.0 + github.com/onsi/ginkgo/v2 v2.16.0 github.com/onsi/gomega v1.31.1 github.com/pelletier/go-toml v1.9.5 - github.com/prometheus/client_golang v1.18.0 + github.com/prometheus/client_golang v1.19.0 github.com/shirou/gopsutil v3.21.11+incompatible github.com/sirupsen/logrus v1.9.3 github.com/spf13/cobra v1.8.0 @@ -46,15 +46,15 @@ require ( github.com/spf13/viper v1.18.2 github.com/ugorji/go/codec v1.2.12 github.com/vbauerster/mpb/v8 v8.7.2 - github.com/xanzy/go-gitlab v0.97.0 + github.com/xanzy/go-gitlab v0.99.0 github.com/xhit/go-simple-mail v2.2.2+incompatible github.com/xujiajun/utils v0.0.0-20220904132955-5f7c5b914235 - golang.org/x/exp v0.0.0-20240213143201-ec583247a57a - golang.org/x/net v0.21.0 - golang.org/x/oauth2 v0.17.0 + golang.org/x/exp v0.0.0-20240222234643-814bf88cf225 + golang.org/x/net v0.22.0 + golang.org/x/oauth2 v0.18.0 golang.org/x/sync v0.6.0 - golang.org/x/sys v0.17.0 - golang.org/x/term v0.17.0 + golang.org/x/sys v0.18.0 + golang.org/x/term v0.18.0 gopkg.in/yaml.v3 v3.0.1 gorm.io/driver/clickhouse v0.6.0 gorm.io/driver/mysql v1.5.4 @@ -66,12 +66,12 @@ require ( require ( github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 // indirect - github.com/ClickHouse/ch-go v0.61.2 // indirect - github.com/ClickHouse/clickhouse-go/v2 v2.18.0 // indirect + github.com/ClickHouse/ch-go v0.61.3 // indirect + github.com/ClickHouse/clickhouse-go/v2 v2.20.0 // indirect github.com/Masterminds/goutils v1.1.1 // indirect github.com/Masterminds/semver v1.5.0 // indirect github.com/Masterminds/sprig v2.22.0+incompatible // indirect - github.com/PuerkitoBio/goquery v1.8.1 // indirect + github.com/PuerkitoBio/goquery v1.9.1 // indirect github.com/VictoriaMetrics/metrics v1.6.2 // indirect github.com/VividCortex/ewma v1.2.0 // indirect github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d // indirect @@ -80,22 +80,22 @@ require ( github.com/antlabs/stl v0.0.2 // indirect github.com/antlabs/timer v0.1.3 // indirect github.com/armon/go-metrics v0.4.1 // indirect - github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.0 // indirect - github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.0 // indirect - github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.0 // indirect - github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.0 // indirect + github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.1 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.2 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.2 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.2 // indirect github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 // indirect - github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.0 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.0 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.0 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.0 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.0 // indirect - github.com/aws/aws-sdk-go-v2/service/sso v1.19.0 // indirect - github.com/aws/aws-sdk-go-v2/service/ssooidc v1.22.0 // indirect - github.com/aws/aws-sdk-go-v2/service/sts v1.27.0 // indirect + github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.2 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.1 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.4 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.4 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.2 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.20.1 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.1 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.28.3 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bwmarrin/snowflake v0.3.0 // indirect - github.com/bytedance/sonic v1.10.2 // indirect + github.com/bytedance/sonic v1.11.2 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d // indirect github.com/chenzhuoyu/iasm v0.9.1 // indirect @@ -120,12 +120,12 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 // indirect github.com/golang-sql/sqlexp v0.1.0 // indirect - github.com/golang/protobuf v1.5.3 // indirect + github.com/golang/protobuf v1.5.4 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/btree v1.0.1 // indirect github.com/google/go-cmp v0.6.0 // indirect github.com/google/go-querystring v1.1.0 // indirect - github.com/google/pprof v0.0.0-20240207164012-fb44976bdcd5 // indirect + github.com/google/pprof v0.0.0-20240227163752-401108e1b7e7 // indirect github.com/google/uuid v1.6.0 // indirect github.com/gorilla/css v1.0.1 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect @@ -142,7 +142,7 @@ require ( github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 // indirect - github.com/jackc/pgx/v5 v5.5.3 // indirect + github.com/jackc/pgx/v5 v5.5.4 // indirect github.com/jackc/puddle/v2 v2.2.1 // indirect github.com/jaytaylor/html2text v0.0.0-20230321000545-74c2419ad056 // indirect github.com/jinzhu/inflection v1.0.0 // indirect @@ -150,8 +150,8 @@ require ( github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/juju/ratelimit v1.0.2-0.20191002062651-f60b32039441 // indirect - github.com/klauspost/compress v1.17.6 // indirect - github.com/klauspost/cpuid/v2 v2.2.6 // indirect + github.com/klauspost/compress v1.17.7 // indirect + github.com/klauspost/cpuid/v2 v2.2.7 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/kr/text v0.2.0 // indirect github.com/leodido/go-urn v1.4.0 // indirect @@ -161,8 +161,7 @@ require ( github.com/mattn/go-runewidth v0.0.15 // indirect github.com/mattn/go-sqlite3 v1.14.22 // indirect github.com/mattn/go-tty v0.0.5 // indirect - github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect - github.com/microsoft/go-mssqldb v1.6.0 // indirect + github.com/microsoft/go-mssqldb v1.7.0 // indirect github.com/miekg/dns v1.1.43 // indirect github.com/minio/highwayhash v1.0.2 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect @@ -178,7 +177,7 @@ require ( github.com/pkg/errors v0.9.1 // indirect github.com/pkg/term v1.2.0-beta.2 // indirect github.com/prometheus/client_model v0.5.0 // indirect - github.com/prometheus/common v0.45.0 // indirect + github.com/prometheus/common v0.48.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect github.com/rivo/uniseg v0.4.7 // indirect github.com/rogpeppe/go-internal v1.10.0 // indirect @@ -203,15 +202,15 @@ require ( github.com/x448/float16 v0.8.4 // indirect github.com/xujiajun/mmap-go v1.0.1 // indirect github.com/yusufpapurcu/wmi v1.2.3 // indirect - go.opentelemetry.io/otel v1.23.1 // indirect - go.opentelemetry.io/otel/trace v1.23.1 // indirect + go.opentelemetry.io/otel v1.24.0 // indirect + go.opentelemetry.io/otel/trace v1.24.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/arch v0.7.0 // indirect - golang.org/x/crypto v0.19.0 // indirect + golang.org/x/crypto v0.21.0 // indirect golang.org/x/text v0.14.0 // indirect golang.org/x/time v0.5.0 // indirect - golang.org/x/tools v0.18.0 // indirect + golang.org/x/tools v0.19.0 // indirect google.golang.org/appengine v1.6.8 // indirect - google.golang.org/protobuf v1.32.0 // indirect + google.golang.org/protobuf v1.33.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect ) diff --git a/httpserver/run.go b/httpserver/run.go index c83d2e2..f3d2c31 100644 --- a/httpserver/run.go +++ b/httpserver/run.go @@ -86,7 +86,27 @@ func (o *srv) runStart(ctx context.Context) error { return ErrorServerValidate.Error(nil) } - return o.r.Start(ctx) + if e := o.r.Start(ctx); e != nil { + return e + } + + var x, n = context.WithTimeout(ctx, 30*time.Second) + + defer n() + + for !o.r.IsRunning() { + select { + case <-x.Done(): + return errNotRunning + default: + time.Sleep(100 * time.Millisecond) + if o.r.IsRunning() { + return o.GetError() + } + } + } + + return o.GetError() } func (o *srv) runStop(ctx context.Context) error { @@ -239,3 +259,25 @@ func (o *srv) Uptime() time.Duration { return 0 } + +func (o *srv) IsError() bool { + if el := o.r.ErrorsList(); len(el) > 0 { + for _, e := range el { + if e != nil { + return true + } + } + } + + return false +} + +func (o *srv) GetError() error { + var err = ErrorServerStart.Error(o.r.ErrorsList()...) + + if err.HasParent() { + return err + } + + return nil +} diff --git a/monitor/server.go b/monitor/server.go index c13f27f..4643da4 100644 --- a/monitor/server.go +++ b/monitor/server.go @@ -36,7 +36,7 @@ import ( const ( MaxPoolStart = 15 * time.Second - MaxTickPooler = 500 * time.Millisecond + MaxTickPooler = 50 * time.Millisecond ) func (o *mon) Start(ctx context.Context) error { @@ -126,10 +126,10 @@ func (o *mon) poolIsRunning(ctx context.Context) error { for { select { case <-tck.C: - if time.Since(tms) >= MaxPoolStart { - return ErrorTimeout.Error(nil) - } else if o.IsRunning() { + if o.IsRunning() { return nil + } else if time.Since(tms) >= MaxPoolStart { + return ErrorTimeout.Error(nil) } case <-ctx.Done(): return ctx.Err() diff --git a/server/interface.go b/server/interface.go index e97e5f2..86aa76e 100644 --- a/server/interface.go +++ b/server/interface.go @@ -31,7 +31,8 @@ import ( "time" ) -type Action func(ctx context.Context) error +type FuncAction func(ctx context.Context) error +type FuncTicker func(ctx context.Context, tck *time.Ticker) error type Server interface { // Start is used to start the server diff --git a/server/runner/startStop/chan.go b/server/runner/startStop/chan.go deleted file mode 100644 index f75c466..0000000 --- a/server/runner/startStop/chan.go +++ /dev/null @@ -1,124 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2022 Nicolas JUHEL - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - * - * - */ - -package startStop - -var closeChan = make(chan struct{}) - -func init() { - close(closeChan) -} - -func (o *run) getChan() *chData { - if i := o.c.Load(); i == nil { - return &chData{ - cl: true, - ch: closeChan, - } - } else if d, k := i.(*chData); !k { - return &chData{ - cl: true, - ch: closeChan, - } - } else { - return d - } -} - -func (o *run) setChan(d *chData) { - if d == nil { - d = &chData{ - cl: true, - ch: closeChan, - } - } - o.c.Store(d) -} - -func (o *run) IsRunning() bool { - if e := o.checkMe(); e != nil { - return false - } - - c := o.getChan() - - if c.cl || c.ch == closeChan { - return false - } - - select { - case <-c.ch: - return false - default: - return true - } -} - -func (o *run) chanInit() { - if e := o.checkMe(); e != nil { - return - } - - o.setChan(&chData{ - cl: false, - ch: make(chan struct{}), - }) -} - -func (o *run) chanDone() <-chan struct{} { - if e := o.checkMe(); e != nil { - return nil - } - - return o.getChan().ch -} - -func (o *run) chanSend() { - if e := o.checkMe(); e != nil { - return - } - - c := o.getChan() - if !c.cl && c.ch != closeChan { - c.ch <- struct{}{} - o.setChan(c) - } -} - -func (o *run) chanClose() { - if e := o.checkMe(); e != nil { - return - } - - c := o.getChan() - if !c.cl && c.ch != closeChan { - close(c.ch) - o.setChan(&chData{ - cl: true, - ch: closeChan, - }) - } -} diff --git a/server/runner/startStop/interface.go b/server/runner/startStop/interface.go index c0a7acf..3953360 100644 --- a/server/runner/startStop/interface.go +++ b/server/runner/startStop/interface.go @@ -45,6 +45,7 @@ func New(start, stop func(ctx context.Context) error) StartStop { f: start, s: stop, t: new(atomic.Value), - c: new(atomic.Value), + r: new(atomic.Bool), + n: new(atomic.Value), } } diff --git a/server/runner/startStop/model.go b/server/runner/startStop/model.go index 2f49b41..115b669 100644 --- a/server/runner/startStop/model.go +++ b/server/runner/startStop/model.go @@ -49,13 +49,14 @@ type chData struct { type run struct { e *atomic.Value // slice []error - f libsrv.Action - s libsrv.Action + f libsrv.FuncAction + s libsrv.FuncAction t *atomic.Value - c *atomic.Value // chan struct{} + r *atomic.Bool // Want Stop + n *atomic.Value // context.CancelFunc } -func (o *run) getFctStart() libsrv.Action { +func (o *run) getFctStart() libsrv.FuncAction { if o.f == nil { return func(ctx context.Context) error { return fmt.Errorf("invalid start function") @@ -64,7 +65,7 @@ func (o *run) getFctStart() libsrv.Action { return o.f } -func (o *run) getFctStop() libsrv.Action { +func (o *run) getFctStop() libsrv.FuncAction { if o.s == nil { return func(ctx context.Context) error { return fmt.Errorf("invalid stop function") @@ -73,102 +74,6 @@ func (o *run) getFctStop() libsrv.Action { return o.s } -func (o *run) Restart(ctx context.Context) error { - if e := o.Stop(ctx); e != nil { - return e - } else if e = o.Start(ctx); e != nil { - _ = o.Stop(ctx) - return e - } - - return nil -} - -func (o *run) Stop(ctx context.Context) error { - if e := o.checkMe(); e != nil { - return e - } else if !o.IsRunning() { - return nil - } - - var ( - e error - t = time.NewTicker(pollStop) - ) - - o.t.Store(time.Time{}) - - defer func() { - libsrv.RecoveryCaller("golib/server/startstop", recover()) - t.Stop() - }() - - for { - select { - case <-t.C: - if o.IsRunning() { - o.chanSend() - e = o.callStop(ctx) - } else { - return e - } - case <-ctx.Done(): - return ctx.Err() - } - } -} - -func (o *run) callStop(ctx context.Context) error { - if o == nil { - return nil - } else { - return o.getFctStop()(ctx) - } -} - -func (o *run) Start(ctx context.Context) error { - if e := o.checkMeStart(ctx); e != nil { - return e - } - - var can context.CancelFunc - ctx, can = context.WithCancel(ctx) - o.t.Store(time.Now()) - - go func(x context.Context, n context.CancelFunc) { - defer n() - - o.chanInit() - defer func() { - libsrv.RecoveryCaller("golib/server/startstop", recover()) - _ = o.Stop(ctx) - }() - - if e := o.getFctStart()(x); e != nil { - o.errorsAdd(e) - return - } - }(ctx, can) - - go func(x context.Context, n context.CancelFunc) { - defer n() - defer func() { - _ = o.Stop(ctx) - }() - - for { - select { - case <-o.chanDone(): - return - case <-x.Done(): - return - } - } - }(ctx, can) - - return nil -} - func (o *run) checkMe() error { if o == nil { return ErrInvalid @@ -200,3 +105,112 @@ func (o *run) Uptime() time.Duration { return time.Since(t) } } + +func (o *run) IsRunning() bool { + if e := o.checkMe(); e != nil { + return false + } + + return o.r.Load() +} + +func (o *run) Restart(ctx context.Context) error { + if e := o.Stop(ctx); e != nil { + return e + } else if e = o.Start(ctx); e != nil { + _ = o.Stop(ctx) + return e + } + + return nil +} + +func (o *run) Stop(ctx context.Context) error { + if e := o.checkMe(); e != nil { + return e + } else if !o.IsRunning() { + return nil + } + + var ( + e error + t = time.NewTicker(pollStop) + ) + + o.errorsClean() + + if i := o.n.Load(); i != nil { + if f, k := i.(context.CancelFunc); k { + f() + } + } + + defer func() { + libsrv.RecoveryCaller("golib/server/startstop", recover()) + t.Stop() + }() + + for { + select { + case <-t.C: + if o.IsRunning() { + e = o.getFctStop()(ctx) + } else { + return e + } + case <-ctx.Done(): + return ctx.Err() + } + } +} + +func (o *run) Start(ctx context.Context) error { + if e := o.checkMeStart(ctx); e != nil { + return e + } + + cx, ca := context.WithCancel(ctx) + + if i := o.n.Swap(ca); i != nil { + if f, k := i.(context.CancelFunc); k { + f() + } + } + + o.errorsClean() + + fStart := func(c context.Context) error { + defer libsrv.RecoveryCaller("golib/server/startstop", recover()) + return o.getFctStart()(c) + } + + fStop := func(c context.Context) error { + defer libsrv.RecoveryCaller("golib/server/startstop", recover()) + return o.getFctStop()(c) + } + + go func(x context.Context, n context.CancelFunc, start, stop libsrv.FuncAction) { + defer func() { + libsrv.RecoveryCaller("golib/server/startstop", recover()) + _ = stop(ctx) + + n() + + o.t.Store(time.Time{}) + o.r.Store(false) + }() + + o.t.Store(time.Now()) + + for !o.r.Load() && ctx.Err() == nil { + o.r.Store(true) + + if e := start(x); e != nil { + o.errorsAdd(e) + return + } + } + }(cx, ca, fStart, fStop) + + return nil +} diff --git a/server/runner/ticker/chan.go b/server/runner/ticker/chan.go deleted file mode 100644 index d55c96d..0000000 --- a/server/runner/ticker/chan.go +++ /dev/null @@ -1,124 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2022 Nicolas JUHEL - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - * - * - */ - -package ticker - -var closeChan = make(chan struct{}) - -func init() { - close(closeChan) -} - -func (o *run) getChan() *chData { - if i := o.c.Load(); i == nil { - return &chData{ - cl: true, - ch: closeChan, - } - } else if d, k := i.(*chData); !k { - return &chData{ - cl: true, - ch: closeChan, - } - } else { - return d - } -} - -func (o *run) setChan(d *chData) { - if d == nil { - d = &chData{ - cl: true, - ch: closeChan, - } - } - o.c.Store(d) -} - -func (o *run) IsRunning() bool { - if e := o.checkMe(); e != nil { - return false - } - - c := o.getChan() - - if c.cl || c.ch == closeChan { - return false - } - - select { - case <-c.ch: - return false - default: - return true - } -} - -func (o *run) chanInit() { - if e := o.checkMe(); e != nil { - return - } - - o.setChan(&chData{ - cl: false, - ch: make(chan struct{}), - }) -} - -func (o *run) chanDone() <-chan struct{} { - if e := o.checkMe(); e != nil { - return nil - } - - return o.getChan().ch -} - -func (o *run) chanSend() { - if e := o.checkMe(); e != nil { - return - } - - c := o.getChan() - if !c.cl && c.ch != closeChan { - c.ch <- struct{}{} - o.setChan(c) - } -} - -func (o *run) chanClose() { - if e := o.checkMe(); e != nil { - return - } - - c := o.getChan() - if !c.cl && c.ch != closeChan { - close(c.ch) - o.setChan(&chData{ - cl: true, - ch: closeChan, - }) - } -} diff --git a/server/runner/ticker/interface.go b/server/runner/ticker/interface.go index 8d5e4ca..f71f858 100644 --- a/server/runner/ticker/interface.go +++ b/server/runner/ticker/interface.go @@ -46,6 +46,7 @@ func New(tick time.Duration, fct func(ctx context.Context, tck *time.Ticker) err f: fct, d: tick, t: new(atomic.Value), - c: new(atomic.Value), + r: new(atomic.Bool), + n: new(atomic.Value), } } diff --git a/server/runner/ticker/model.go b/server/runner/ticker/model.go index e06d44e..7fe46f9 100644 --- a/server/runner/ticker/model.go +++ b/server/runner/ticker/model.go @@ -29,6 +29,7 @@ package ticker import ( "context" "errors" + "fmt" "sync/atomic" "time" @@ -42,17 +43,13 @@ const ( var ErrInvalid = errors.New("invalid instance") -type chData struct { - cl bool - ch chan struct{} -} - type run struct { e *atomic.Value // slice []error - f func(ctx context.Context, tck *time.Ticker) error + f libsrv.FuncTicker d time.Duration t *atomic.Value - c *atomic.Value // chan struct{} + r *atomic.Bool + n *atomic.Value } func (o *run) getDuration() time.Duration { @@ -60,98 +57,17 @@ func (o *run) getDuration() time.Duration { return o.d } -func (o *run) getFunction() func(ctx context.Context, tck *time.Ticker) error { +func (o *run) getFunction() libsrv.FuncTicker { // still check on function checkMe + if o.f == nil { + return func(ctx context.Context, tck *time.Ticker) error { + return fmt.Errorf("invalid function ticker") + } + } + return o.f } -func (o *run) Restart(ctx context.Context) error { - if e := o.Stop(ctx); e != nil { - return e - } else if e = o.Start(ctx); e != nil { - _ = o.Stop(ctx) - return e - } - - return nil -} - -func (o *run) Stop(ctx context.Context) error { - if e := o.checkMe(); e != nil { - return e - } - - o.t.Store(time.Time{}) - - if !o.IsRunning() { - return nil - } - - var t = time.NewTicker(pollStop) - defer t.Stop() - - for { - select { - case <-t.C: - if o.IsRunning() { - o.chanSend() - } else { - return nil - } - case <-ctx.Done(): - return ctx.Err() - } - } -} - -func (o *run) Start(ctx context.Context) error { - if e := o.checkMeStart(ctx); e != nil { - return e - } - - go func(con context.Context) { - var ( - tck = time.NewTicker(o.getDuration()) - x, n = context.WithCancel(con) - ) - - defer func() { - libsrv.RecoveryCaller("golib/server/ticker", recover()) - if n != nil { - n() - } - if tck != nil { - tck.Stop() - } - o.chanClose() - }() - - o.t.Store(time.Now()) - - o.chanInit() - o.errorsClean() - - for { - select { - case <-tck.C: - f := func(ctx context.Context, tck *time.Ticker) error { - defer libsrv.RecoveryCaller("golib/server/ticker", recover()) - return o.getFunction()(ctx, tck) - } - if e := f(x, tck); e != nil { - o.errorsAdd(e) - } - case <-con.Done(): - return - case <-o.chanDone(): - return - } - } - }(ctx) - - return nil -} - func (o *run) checkMe() error { if o == nil { return ErrInvalid @@ -183,3 +99,107 @@ func (o *run) Uptime() time.Duration { return time.Since(t) } } + +func (o *run) IsRunning() bool { + if e := o.checkMe(); e != nil { + return false + } + + return o.r.Load() +} + +func (o *run) Restart(ctx context.Context) error { + if e := o.Stop(ctx); e != nil { + return e + } else if e = o.Start(ctx); e != nil { + _ = o.Stop(ctx) + return e + } + + return nil +} + +func (o *run) Stop(ctx context.Context) error { + if e := o.checkMe(); e != nil { + return e + } else if !o.IsRunning() { + return nil + } + + var t = time.NewTicker(pollStop) + defer t.Stop() + + o.errorsClean() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-t.C: + if o.IsRunning() { + if i := o.n.Load(); i != nil { + if f, k := i.(context.CancelFunc); k { + f() + } else { + o.r.Store(false) + } + } else { + o.r.Store(false) + } + } else { + return nil + } + } + } +} + +func (o *run) Start(ctx context.Context) error { + if e := o.checkMeStart(ctx); e != nil { + return e + } + + cx, ca := context.WithCancel(ctx) + + if i := o.n.Swap(ca); i != nil { + if f, k := i.(context.CancelFunc); k { + f() + } + } + + o.errorsClean() + + fct := func(ctx context.Context, tck *time.Ticker) error { + defer libsrv.RecoveryCaller("golib/server/ticker", recover()) + return o.getFunction()(ctx, tck) + } + + go func(x context.Context, n context.CancelFunc, f libsrv.FuncTicker) { + var tck = time.NewTicker(o.getDuration()) + + defer func() { + libsrv.RecoveryCaller("golib/server/ticker", recover()) + tck.Stop() + n() + + o.t.Store(time.Time{}) + o.r.Store(false) + }() + + o.r.Store(true) + o.t.Store(time.Now()) + + for { + select { + case <-x.Done(): + return + case <-tck.C: + o.r.Store(true) + if e := f(x, tck); e != nil { + o.errorsAdd(e) + } + } + } + }(cx, ca, fct) + + return nil +} diff --git a/socket/client/tcp/error.go b/socket/client/tcp/error.go index 5618c7e..d7558bb 100644 --- a/socket/client/tcp/error.go +++ b/socket/client/tcp/error.go @@ -29,6 +29,7 @@ package tcp import "fmt" var ( - ErrInstance = fmt.Errorf("invalid instance") - ErrAddress = fmt.Errorf("invalid dial address") + ErrInstance = fmt.Errorf("invalid instance") + ErrConnection = fmt.Errorf("invalid connection") + ErrAddress = fmt.Errorf("invalid dial address") ) diff --git a/socket/client/tcp/interface.go b/socket/client/tcp/interface.go index a24c7fa..24c7a7e 100644 --- a/socket/client/tcp/interface.go +++ b/socket/client/tcp/interface.go @@ -59,5 +59,6 @@ func New(buffSizeRead libsiz.Size, address string) (ClientTCP, error) { s: s, e: new(atomic.Value), i: new(atomic.Value), + c: new(atomic.Value), }, nil } diff --git a/socket/client/tcp/model.go b/socket/client/tcp/model.go index 6ffd7c4..d71b3cc 100644 --- a/socket/client/tcp/model.go +++ b/socket/client/tcp/model.go @@ -27,7 +27,6 @@ package tcp import ( - "bufio" "context" "errors" "io" @@ -43,6 +42,7 @@ type cli struct { s *atomic.Int32 // buffer size e *atomic.Value // function error i *atomic.Value // function info + c *atomic.Value // net.Conn } func (o *cli) RegisterFuncError(f libsck.FuncError) { @@ -109,89 +109,98 @@ func (o *cli) dial(ctx context.Context) (net.Conn, error) { } } -func (o *cli) Do(ctx context.Context, request io.Reader, fct libsck.Response) error { +func (o *cli) Connect(ctx context.Context) error { if o == nil { return ErrInstance } var ( err error - cnn net.Conn + con net.Conn ) - defer func() { - if cnn != nil { - o.fctInfo(cnn.LocalAddr(), cnn.RemoteAddr(), libsck.ConnectionClose) - o.fctError(cnn.Close()) - } - }() - o.fctInfo(&net.TCPAddr{}, &net.TCPAddr{}, libsck.ConnectionDial) - if cnn, err = o.dial(ctx); err != nil { + if con, err = o.dial(ctx); err != nil { o.fctError(err) return err } - o.fctInfo(cnn.LocalAddr(), cnn.RemoteAddr(), libsck.ConnectionNew) - - o.sendRequest(cnn, request) - o.readResponse(cnn, fct) + o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionNew) + o.c.Store(con) return nil } -func (o *cli) sendRequest(con net.Conn, r io.Reader) { - var ( - err error - buf []byte - rdr = bufio.NewReaderSize(r, o.buffSize()) - wrt = bufio.NewWriterSize(con, o.buffSize()) - ) +func (o *cli) Read(p []byte) (n int, err error) { + if o == nil { + return 0, ErrInstance + } else if i := o.c.Load(); i == nil { + return 0, ErrConnection + } else if c, k := i.(net.Conn); !k { + return 0, ErrConnection + } else { + o.fctInfo(c.LocalAddr(), c.RemoteAddr(), libsck.ConnectionRead) + return c.Read(p) + } +} + +func (o *cli) Write(p []byte) (n int, err error) { + if o == nil { + return 0, ErrInstance + } else if i := o.c.Load(); i == nil { + return 0, ErrConnection + } else if c, k := i.(net.Conn); !k { + return 0, ErrConnection + } else { + o.fctInfo(c.LocalAddr(), c.RemoteAddr(), libsck.ConnectionWrite) + return c.Write(p) + } +} + +func (o *cli) Close() error { + if o == nil { + return ErrInstance + } else if i := o.c.Load(); i == nil { + return ErrConnection + } else if c, k := i.(net.Conn); !k { + return ErrConnection + } else { + o.fctInfo(c.LocalAddr(), c.RemoteAddr(), libsck.ConnectionClose) + e := c.Close() + o.c.Store(c) + return e + } +} + +func (o *cli) Once(ctx context.Context, request io.Reader, fct libsck.Response) error { + if o == nil { + return ErrInstance + } defer func() { - if con != nil { - if c, ok := con.(*net.TCPConn); ok { - o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionCloseWrite) - _ = c.CloseWrite() - } - } + o.fctError(o.Close()) }() - for { - if con == nil && r == nil { - return - } + var err error - buf, err = rdr.ReadBytes('\n') + if err = o.Connect(ctx); err != nil { + o.fctError(err) + return err + } + + for { + _, err = io.Copy(o, request) if err != nil { if !errors.Is(err, io.EOF) { o.fctError(err) + return err + } else { + break } - return - } - - o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionWrite) - - _, err = wrt.Write(buf) - if err != nil { - o.fctError(err) - return - } - - err = wrt.Flush() - if err != nil { - o.fctError(err) - return } } -} -func (o *cli) readResponse(con net.Conn, f libsck.Response) { - if f == nil { - return - } - - o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionHandler) - f(con) + fct(o) + return nil } diff --git a/socket/client/udp/error.go b/socket/client/udp/error.go index ba4d0fc..079128d 100644 --- a/socket/client/udp/error.go +++ b/socket/client/udp/error.go @@ -29,6 +29,7 @@ package udp import "fmt" var ( - ErrInstance = fmt.Errorf("invalid instance") - ErrAddress = fmt.Errorf("invalid dial address") + ErrInstance = fmt.Errorf("invalid instance") + ErrConnection = fmt.Errorf("invalid connection") + ErrAddress = fmt.Errorf("invalid dial address") ) diff --git a/socket/client/udp/interface.go b/socket/client/udp/interface.go index 1ea362c..3dd9bd2 100644 --- a/socket/client/udp/interface.go +++ b/socket/client/udp/interface.go @@ -59,5 +59,6 @@ func New(buffSizeRead libsiz.Size, address string) (ClientUDP, error) { s: s, e: new(atomic.Value), i: new(atomic.Value), + c: new(atomic.Value), }, nil } diff --git a/socket/client/udp/model.go b/socket/client/udp/model.go index 12c576d..b6946a0 100644 --- a/socket/client/udp/model.go +++ b/socket/client/udp/model.go @@ -27,7 +27,6 @@ package udp import ( - "bufio" "context" "errors" "io" @@ -43,6 +42,7 @@ type cli struct { s *atomic.Int32 // buffer size e *atomic.Value // function error i *atomic.Value // function info + c *atomic.Value // net.Conn } func (o *cli) RegisterFuncError(f libsck.FuncError) { @@ -109,80 +109,98 @@ func (o *cli) dial(ctx context.Context) (net.Conn, error) { } } -func (o *cli) Do(ctx context.Context, request io.Reader, fct libsck.Response) error { +func (o *cli) Connect(ctx context.Context) error { if o == nil { return ErrInstance } var ( err error - cnn net.Conn + con net.Conn ) - defer func() { - if cnn != nil { - o.fctInfo(cnn.LocalAddr(), cnn.RemoteAddr(), libsck.ConnectionClose) - o.fctError(cnn.Close()) - } - }() - o.fctInfo(&net.UDPAddr{}, &net.UDPAddr{}, libsck.ConnectionDial) - if cnn, err = o.dial(ctx); err != nil { + if con, err = o.dial(ctx); err != nil { o.fctError(err) return err } - o.fctInfo(cnn.LocalAddr(), cnn.RemoteAddr(), libsck.ConnectionNew) - - o.sendRequest(cnn, request) - o.readResponse(cnn, fct) + o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionNew) + o.c.Store(con) return nil } -func (o *cli) sendRequest(con net.Conn, r io.Reader) { - var ( - err error - buf []byte - rdr = bufio.NewReaderSize(r, o.buffSize()) - wrt = bufio.NewWriterSize(con, o.buffSize()) - ) +func (o *cli) Read(p []byte) (n int, err error) { + if o == nil { + return 0, ErrInstance + } else if i := o.c.Load(); i == nil { + return 0, ErrConnection + } else if c, k := i.(net.Conn); !k { + return 0, ErrConnection + } else { + o.fctInfo(c.LocalAddr(), c.RemoteAddr(), libsck.ConnectionRead) + return c.Read(p) + } +} + +func (o *cli) Write(p []byte) (n int, err error) { + if o == nil { + return 0, ErrInstance + } else if i := o.c.Load(); i == nil { + return 0, ErrConnection + } else if c, k := i.(net.Conn); !k { + return 0, ErrConnection + } else { + o.fctInfo(c.LocalAddr(), c.RemoteAddr(), libsck.ConnectionWrite) + return c.Write(p) + } +} + +func (o *cli) Close() error { + if o == nil { + return ErrInstance + } else if i := o.c.Load(); i == nil { + return ErrConnection + } else if c, k := i.(net.Conn); !k { + return ErrConnection + } else { + o.fctInfo(c.LocalAddr(), c.RemoteAddr(), libsck.ConnectionClose) + e := c.Close() + o.c.Store(c) + return e + } +} + +func (o *cli) Once(ctx context.Context, request io.Reader, fct libsck.Response) error { + if o == nil { + return ErrInstance + } + + defer func() { + o.fctError(o.Close()) + }() + + var err error + + if err = o.Connect(ctx); err != nil { + o.fctError(err) + return err + } for { - if con == nil && r == nil { - return - } - - buf, err = rdr.ReadBytes('\n') + _, err = io.Copy(o, request) if err != nil { if !errors.Is(err, io.EOF) { o.fctError(err) + return err + } else { + break } - return - } - - o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionWrite) - - _, err = wrt.Write(buf) - if err != nil { - o.fctError(err) - return - } - - err = wrt.Flush() - if err != nil { - o.fctError(err) - return } } -} -func (o *cli) readResponse(con net.Conn, f libsck.Response) { - if f == nil { - return - } - - o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionHandler) - f(con) + fct(o) + return nil } diff --git a/socket/client/unix/error.go b/socket/client/unix/error.go index 8bf0c5b..9621daf 100644 --- a/socket/client/unix/error.go +++ b/socket/client/unix/error.go @@ -32,6 +32,7 @@ package unix import "fmt" var ( - ErrInstance = fmt.Errorf("invalid instance") - ErrAddress = fmt.Errorf("invalid dial address") + ErrInstance = fmt.Errorf("invalid instance") + ErrConnection = fmt.Errorf("invalid connection") + ErrAddress = fmt.Errorf("invalid dial address") ) diff --git a/socket/client/unix/interface.go b/socket/client/unix/interface.go index d64912d..3363255 100644 --- a/socket/client/unix/interface.go +++ b/socket/client/unix/interface.go @@ -54,5 +54,6 @@ func New(buffSizeRead libsiz.Size, unixfile string) ClientUnix { s: s, e: new(atomic.Value), i: new(atomic.Value), + c: new(atomic.Value), } } diff --git a/socket/client/unix/model.go b/socket/client/unix/model.go index 7c19fd1..0b8d75b 100644 --- a/socket/client/unix/model.go +++ b/socket/client/unix/model.go @@ -30,7 +30,6 @@ package unix import ( - "bufio" "context" "errors" "io" @@ -46,6 +45,7 @@ type cli struct { s *atomic.Int32 // buffer size e *atomic.Value // function error i *atomic.Value // function info + c *atomic.Value // net.Conn } func (o *cli) RegisterFuncError(f libsck.FuncError) { @@ -112,91 +112,98 @@ func (o *cli) dial(ctx context.Context) (net.Conn, error) { } } -func (o *cli) Do(ctx context.Context, request io.Reader, fct libsck.Response) error { +func (o *cli) Connect(ctx context.Context) error { if o == nil { return ErrInstance } var ( err error - cnn net.Conn + con net.Conn ) - defer func() { - if cnn != nil { - o.fctInfo(cnn.LocalAddr(), cnn.RemoteAddr(), libsck.ConnectionClose) - o.fctError(cnn.Close()) - } - }() - o.fctInfo(&net.UnixAddr{}, &net.UnixAddr{}, libsck.ConnectionDial) - if cnn, err = o.dial(ctx); err != nil { + if con, err = o.dial(ctx); err != nil { o.fctError(err) return err } - o.fctInfo(cnn.LocalAddr(), cnn.RemoteAddr(), libsck.ConnectionNew) - - o.sendRequest(cnn, request) - o.readResponse(cnn, fct) + o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionNew) + o.c.Store(con) return nil } -func (o *cli) sendRequest(con net.Conn, r io.Reader) { - var ( - err error - buf []byte - rdr = bufio.NewReaderSize(r, o.buffSize()) - wrt = bufio.NewWriterSize(con, o.buffSize()) - ) +func (o *cli) Read(p []byte) (n int, err error) { + if o == nil { + return 0, ErrInstance + } else if i := o.c.Load(); i == nil { + return 0, ErrConnection + } else if c, k := i.(net.Conn); !k { + return 0, ErrConnection + } else { + o.fctInfo(c.LocalAddr(), c.RemoteAddr(), libsck.ConnectionRead) + return c.Read(p) + } +} + +func (o *cli) Write(p []byte) (n int, err error) { + if o == nil { + return 0, ErrInstance + } else if i := o.c.Load(); i == nil { + return 0, ErrConnection + } else if c, k := i.(net.Conn); !k { + return 0, ErrConnection + } else { + o.fctInfo(c.LocalAddr(), c.RemoteAddr(), libsck.ConnectionWrite) + return c.Write(p) + } +} + +func (o *cli) Close() error { + if o == nil { + return ErrInstance + } else if i := o.c.Load(); i == nil { + return ErrConnection + } else if c, k := i.(net.Conn); !k { + return ErrConnection + } else { + o.fctInfo(c.LocalAddr(), c.RemoteAddr(), libsck.ConnectionClose) + e := c.Close() + o.c.Store(c) + return e + } +} + +func (o *cli) Once(ctx context.Context, request io.Reader, fct libsck.Response) error { + if o == nil { + return ErrInstance + } defer func() { - if con != nil { - if c, ok := con.(*net.UnixConn); ok { - o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionCloseWrite) - _ = c.CloseWrite() - } - } + o.fctError(o.Close()) }() - for { - if con == nil && r == nil { - return - } + var err error - buf, err = rdr.ReadBytes('\n') + if err = o.Connect(ctx); err != nil { + o.fctError(err) + return err + } + + for { + _, err = io.Copy(o, request) if err != nil { if !errors.Is(err, io.EOF) { o.fctError(err) - } - if len(buf) < 1 { - return + return err + } else { + break } } - - o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionWrite) - - _, err = wrt.Write(buf) - if err != nil { - o.fctError(err) - return - } - - err = wrt.Flush() - if err != nil { - o.fctError(err) - return - } - } -} - -func (o *cli) readResponse(con net.Conn, f libsck.Response) { - if f == nil { - return } - o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionHandler) - f(con) + fct(o) + return nil } diff --git a/socket/client/unixgram/error.go b/socket/client/unixgram/error.go index b12132a..838d18c 100644 --- a/socket/client/unixgram/error.go +++ b/socket/client/unixgram/error.go @@ -32,6 +32,7 @@ package unixgram import "fmt" var ( - ErrInstance = fmt.Errorf("invalid instance") - ErrAddress = fmt.Errorf("invalid dial address") + ErrInstance = fmt.Errorf("invalid instance") + ErrConnection = fmt.Errorf("invalid connection") + ErrAddress = fmt.Errorf("invalid dial address") ) diff --git a/socket/client/unixgram/interface.go b/socket/client/unixgram/interface.go index 27ee86a..d1933e3 100644 --- a/socket/client/unixgram/interface.go +++ b/socket/client/unixgram/interface.go @@ -54,5 +54,6 @@ func New(buffSizeRead libsiz.Size, unixfile string) ClientUnix { s: s, e: new(atomic.Value), i: new(atomic.Value), + c: new(atomic.Value), } } diff --git a/socket/client/unixgram/model.go b/socket/client/unixgram/model.go index 70d4786..fb2e628 100644 --- a/socket/client/unixgram/model.go +++ b/socket/client/unixgram/model.go @@ -30,7 +30,6 @@ package unixgram import ( - "bufio" "context" "errors" "io" @@ -46,6 +45,7 @@ type cli struct { s *atomic.Int32 // buffer size e *atomic.Value // function error i *atomic.Value // function info + c *atomic.Value // net.Conn } func (o *cli) RegisterFuncError(f libsck.FuncError) { @@ -112,80 +112,98 @@ func (o *cli) dial(ctx context.Context) (net.Conn, error) { } } -func (o *cli) Do(ctx context.Context, request io.Reader, fct libsck.Response) error { +func (o *cli) Connect(ctx context.Context) error { if o == nil { return ErrInstance } var ( err error - cnn net.Conn + con net.Conn ) - defer func() { - if cnn != nil { - o.fctInfo(cnn.LocalAddr(), cnn.RemoteAddr(), libsck.ConnectionClose) - o.fctError(cnn.Close()) - } - }() - o.fctInfo(&net.UnixAddr{}, &net.UnixAddr{}, libsck.ConnectionDial) - if cnn, err = o.dial(ctx); err != nil { + if con, err = o.dial(ctx); err != nil { o.fctError(err) return err } - o.fctInfo(cnn.LocalAddr(), cnn.RemoteAddr(), libsck.ConnectionNew) - - o.sendRequest(cnn, request) - o.readResponse(cnn, fct) + o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionNew) + o.c.Store(con) return nil } -func (o *cli) sendRequest(con net.Conn, r io.Reader) { - var ( - err error - buf []byte - rdr = bufio.NewReaderSize(r, o.buffSize()) - wrt = bufio.NewWriterSize(con, o.buffSize()) - ) +func (o *cli) Read(p []byte) (n int, err error) { + if o == nil { + return 0, ErrInstance + } else if i := o.c.Load(); i == nil { + return 0, ErrConnection + } else if c, k := i.(net.Conn); !k { + return 0, ErrConnection + } else { + o.fctInfo(c.LocalAddr(), c.RemoteAddr(), libsck.ConnectionRead) + return c.Read(p) + } +} + +func (o *cli) Write(p []byte) (n int, err error) { + if o == nil { + return 0, ErrInstance + } else if i := o.c.Load(); i == nil { + return 0, ErrConnection + } else if c, k := i.(net.Conn); !k { + return 0, ErrConnection + } else { + o.fctInfo(c.LocalAddr(), c.RemoteAddr(), libsck.ConnectionWrite) + return c.Write(p) + } +} + +func (o *cli) Close() error { + if o == nil { + return ErrInstance + } else if i := o.c.Load(); i == nil { + return ErrConnection + } else if c, k := i.(net.Conn); !k { + return ErrConnection + } else { + o.fctInfo(c.LocalAddr(), c.RemoteAddr(), libsck.ConnectionClose) + e := c.Close() + o.c.Store(c) + return e + } +} + +func (o *cli) Once(ctx context.Context, request io.Reader, fct libsck.Response) error { + if o == nil { + return ErrInstance + } + + defer func() { + o.fctError(o.Close()) + }() + + var err error + + if err = o.Connect(ctx); err != nil { + o.fctError(err) + return err + } for { - if con == nil && r == nil { - return - } - - buf, err = rdr.ReadBytes('\n') + _, err = io.Copy(o, request) if err != nil { if !errors.Is(err, io.EOF) { o.fctError(err) + return err + } else { + break } - return - } - - o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionWrite) - - _, err = wrt.Write(buf) - if err != nil { - o.fctError(err) - return - } - - err = wrt.Flush() - if err != nil { - o.fctError(err) - return } } -} -func (o *cli) readResponse(con net.Conn, f libsck.Response) { - if f == nil { - return - } - - o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionHandler) - f(con) + fct(o) + return nil } diff --git a/socket/interface.go b/socket/interface.go index 8f359a8..7ad1a7f 100644 --- a/socket/interface.go +++ b/socket/interface.go @@ -94,8 +94,11 @@ type Server interface { } type Client interface { + io.ReadWriteCloser + RegisterFuncError(f FuncError) RegisterFuncInfo(f FuncInfo) - Do(ctx context.Context, request io.Reader, fct Response) error + Connect(ctx context.Context) error + Once(ctx context.Context, request io.Reader, fct Response) error } diff --git a/socket/server/tcp/listener.go b/socket/server/tcp/listener.go index cec3b08..6696b33 100644 --- a/socket/server/tcp/listener.go +++ b/socket/server/tcp/listener.go @@ -146,15 +146,16 @@ func (o *srv) Listen(ctx context.Context) error { } } -func (o *srv) Conn(conn net.Conn) { +func (o *srv) Conn(con net.Conn) { defer func() { - o.fctInfo(conn.LocalAddr(), conn.RemoteAddr(), libsck.ConnectionClose) - _ = conn.Close() + o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionClose) + _ = con.Close() }() var ( err error - rdr = bufio.NewReaderSize(conn, o.buffSize()) + nbr int + rdr = bufio.NewReaderSize(con, o.buffSize()) msg []byte hdl libsck.Handler ) @@ -164,25 +165,26 @@ func (o *srv) Conn(conn net.Conn) { } for { + msg = msg[:0] msg, err = rdr.ReadBytes('\n') + nbr = len(msg) + + o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionRead) + + if nbr > 0 { + if !bytes.HasSuffix(msg, []byte{libsck.EOL}) { + msg = append(msg, libsck.EOL) + nbr++ + } + + o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionHandler) + hdl(bytes.NewBuffer(msg[:nbr]), con) + } - o.fctInfo(conn.LocalAddr(), conn.RemoteAddr(), libsck.ConnectionRead) if err != nil { if err != io.EOF { o.fctError(err) } - if len(msg) < 1 { - break - } } - - var buf = bytes.NewBuffer(msg) - - if !bytes.HasSuffix(msg, []byte{libsck.EOL}) { - buf.Write([]byte{libsck.EOL}) - } - - o.fctInfo(conn.LocalAddr(), conn.RemoteAddr(), libsck.ConnectionHandler) - hdl(buf, conn) } } diff --git a/socket/server/udp/listener.go b/socket/server/udp/listener.go index a74f8b0..482d59d 100644 --- a/socket/server/udp/listener.go +++ b/socket/server/udp/listener.go @@ -137,6 +137,13 @@ func (o *srv) Listen(ctx context.Context) error { o.fctInfoSrv("starting listening socket '%s %s'", libptc.NetworkUDP.String(), adr) // Accept new connection or stop if context or shutdown trigger + + var ( + siz = o.buffSize() + buf []byte + rer error + ) + for { // Accept an incoming connection. if con == nil { @@ -145,11 +152,7 @@ func (o *srv) Listen(ctx context.Context) error { return err } - var ( - buf = make([]byte, o.buffSize()) - rer error - ) - + buf = make([]byte, siz) nbr, rem, rer = con.ReadFrom(buf) if rem == nil { @@ -162,20 +165,16 @@ func (o *srv) Listen(ctx context.Context) error { if !stp.Load() { o.fctError(rer) } - if nbr < 1 { - continue - } } - go func(la, ra net.Addr, b []byte) { - o.fctInfo(la, ra, libsck.ConnectionHandler) - - r := bytes.NewBuffer(b) - if !bytes.HasSuffix(b, []byte{libsck.EOL}) { - r.Write([]byte{libsck.EOL}) + if nbr > 0 { + if !bytes.HasSuffix(buf, []byte{libsck.EOL}) { + buf = append(buf, libsck.EOL) + nbr++ } - hdl(r, io.Discard) - }(loc, rem, buf[:nbr]) + o.fctInfo(loc, rem, libsck.ConnectionHandler) + hdl(bytes.NewBuffer(buf[:nbr]), io.Discard) + } } } diff --git a/socket/server/unix/listener.go b/socket/server/unix/listener.go index c2d28d5..838603c 100644 --- a/socket/server/unix/listener.go +++ b/socket/server/unix/listener.go @@ -234,6 +234,7 @@ func (o *srv) Conn(con net.Conn) { var ( err error + nbr int rdr = bufio.NewReaderSize(con, o.buffSize()) msg []byte hdl libsck.Handler @@ -244,25 +245,26 @@ func (o *srv) Conn(con net.Conn) { } for { + msg = msg[:0] msg, err = rdr.ReadBytes('\n') + nbr = len(msg) o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionRead) + + if nbr > 0 { + if !bytes.HasSuffix(msg, []byte{libsck.EOL}) { + msg = append(msg, libsck.EOL) + nbr++ + } + + o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionHandler) + hdl(bytes.NewBuffer(msg[:nbr]), con) + } + if err != nil { if err != io.EOF { o.fctError(err) } - if len(msg) < 1 { - break - } } - - var buf = bytes.NewBuffer(msg) - - if !bytes.HasSuffix(msg, []byte{libsck.EOL}) { - buf.Write([]byte{libsck.EOL}) - } - - o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionHandler) - hdl(buf, con) } } diff --git a/socket/server/unixgram/listener.go b/socket/server/unixgram/listener.go index be456ee..9d99abe 100644 --- a/socket/server/unixgram/listener.go +++ b/socket/server/unixgram/listener.go @@ -205,6 +205,13 @@ func (o *srv) Listen(ctx context.Context) error { }() o.r.Store(true) + + var ( + siz = o.buffSize() + buf []byte + rer error + ) + // Accept new connection or stop if context or shutdown trigger for { // Accept an incoming connection. @@ -214,11 +221,7 @@ func (o *srv) Listen(ctx context.Context) error { return err } - var ( - buf = make([]byte, o.buffSize()) - rer error - ) - + buf = make([]byte, siz) nbr, rem, rer = con.ReadFrom(buf) if rem == nil { @@ -227,25 +230,20 @@ func (o *srv) Listen(ctx context.Context) error { o.fctInfo(loc, rem, libsck.ConnectionRead) + if nbr > 0 { + if !bytes.HasSuffix(buf, []byte{libsck.EOL}) { + buf = append(buf, libsck.EOL) + nbr++ + } + + o.fctInfo(loc, rem, libsck.ConnectionHandler) + hdl(bytes.NewBuffer(buf[:nbr]), io.Discard) + } + if rer != nil { if !stp.Load() { o.fctError(rer) } - if nbr < 1 { - continue - } } - - go func(la, ra net.Addr, b []byte) { - o.fctInfo(la, ra, libsck.ConnectionHandler) - - r := bytes.NewBuffer(b) - - if !bytes.HasSuffix(b, []byte{libsck.EOL}) { - r.Write([]byte{libsck.EOL}) - } - - hdl(r, io.Discard) - }(loc, rem, buf[:nbr]) } } diff --git a/test/test-socket-client-tcp/main.go b/test/test-socket-client-tcp/main.go index 5737f65..4af0484 100644 --- a/test/test-socket-client-tcp/main.go +++ b/test/test-socket-client-tcp/main.go @@ -95,7 +95,7 @@ func main() { _, _ = fmt.Fprintf(os.Stdout, "[%s %s]=>[%s %s] %s\n", remote.Network(), remote.String(), local.Network(), local.String(), state.String()) }) - checkPanic(cli.Do(context.Background(), request(), func(r io.Reader) { + checkPanic(cli.Once(context.Background(), request(), func(r io.Reader) { _, e := io.Copy(os.Stdout, r) printError(e) })) diff --git a/test/test-socket-client-udp/main.go b/test/test-socket-client-udp/main.go index ab993da..d2f7655 100644 --- a/test/test-socket-client-udp/main.go +++ b/test/test-socket-client-udp/main.go @@ -94,5 +94,5 @@ func main() { _, _ = fmt.Fprintf(os.Stdout, "[%s %s]=>[%s %s] %s\n", remote.Network(), remote.String(), local.Network(), local.String(), state.String()) }) - checkPanic(cli.Do(context.Background(), request(), nil)) + checkPanic(cli.Once(context.Background(), request(), nil)) } diff --git a/test/test-socket-client-unix/main.go b/test/test-socket-client-unix/main.go index 7133a37..1c87e4f 100644 --- a/test/test-socket-client-unix/main.go +++ b/test/test-socket-client-unix/main.go @@ -95,5 +95,5 @@ func main() { _, _ = fmt.Fprintf(os.Stdout, "[%s %s]=>[%s %s] %s\n", remote.Network(), remote.String(), local.Network(), local.String(), state.String()) }) - checkPanic(cli.Do(context.Background(), request(), nil)) + checkPanic(cli.Once(context.Background(), request(), nil)) }