Package socket:

- Client: optimize code & memory
- Client: allow multiple send content for one connection
- Server: optmize code
- Server: optimize memory use & stream

Package server:
- Rework chan / cancel process
- Simplify & optimize chan / cancel process
- Add Function Action for Ticker as FuncTicker
- Rename type func Action to FuncAction

Package httpserver:
- Fix error on starting runner
- Log error on starting runner
- Optimize some code

Package config:
- Add log info for each starting / reloading component (if default logger is set)
- Add log error for starting error or not started for each component on start / reload (if default logger is set)

Package monitor:
- review ticker delay for starting puller (pass from 500ms to 50ms)

Other:
- bump dependencies
This commit is contained in:
Nicolas JUHEL
2024-03-07 15:12:20 +01:00
parent a9a4d1e7c2
commit b190739581
31 changed files with 707 additions and 792 deletions

View File

@@ -34,6 +34,7 @@ import (
cfgcst "github.com/nabbar/golib/config/const"
cfgtps "github.com/nabbar/golib/config/types"
loglvl "github.com/nabbar/golib/logger/level"
spfcbr "github.com/spf13/cobra"
"golang.org/x/exp/slices"
)
@@ -118,12 +119,22 @@ func (c *configModel) ComponentStart() error {
} else if cpt := c.ComponentGet(key); cpt == nil {
continue
} else {
if l := c.getDefaultLogger(); l != nil {
l.Entry(loglvl.InfoLevel, fmt.Sprintf("starting component '%s'", key)).Log()
}
e := cpt.Start()
c.componentUpdate(key, cpt)
if e != nil {
if l := c.getDefaultLogger(); l != nil {
l.Entry(loglvl.ErrorLevel, fmt.Sprintf("component '%s' starting return an error", key)).ErrorAdd(true, e).Log()
}
err.Add(e)
} else if !cpt.IsStarted() {
err.Add(fmt.Errorf("component '%s' has been call to start, but is not started", key))
e = fmt.Errorf("component '%s' has been call to start, but is not started", key)
if l := c.getDefaultLogger(); l != nil {
l.Entry(loglvl.ErrorLevel, fmt.Sprintf("component '%s' is not started", key)).ErrorAdd(true, e).Log()
}
err.Add(e)
}
}
}
@@ -163,11 +174,21 @@ func (c *configModel) ComponentReload() error {
} else if cpt := c.ComponentGet(key); cpt == nil {
continue
} else {
if l := c.getDefaultLogger(); l != nil {
l.Entry(loglvl.InfoLevel, fmt.Sprintf("reloading component '%s'", key)).Log()
}
e := cpt.Reload()
c.componentUpdate(key, cpt)
if e != nil {
if l := c.getDefaultLogger(); l != nil {
l.Entry(loglvl.ErrorLevel, fmt.Sprintf("component '%s' reloading return an error", key)).ErrorAdd(true, e).Log()
}
err.Add(e)
} else if !cpt.IsStarted() {
e = fmt.Errorf("component '%s' has been call to reload, but is not started", key)
if l := c.getDefaultLogger(); l != nil {
l.Entry(loglvl.ErrorLevel, fmt.Sprintf("component '%s' is not started after a reload", key)).ErrorAdd(true, e).Log()
}
err.Add(fmt.Errorf("component '%s' has been call to reload, but is not started", key))
}
}

95
go.mod
View File

@@ -2,16 +2,16 @@ module github.com/nabbar/golib
go 1.21
toolchain go1.21.6
toolchain go1.21.7
require (
github.com/aws/aws-sdk-go v1.50.19
github.com/aws/aws-sdk-go-v2 v1.25.0
github.com/aws/aws-sdk-go-v2/config v1.27.0
github.com/aws/aws-sdk-go-v2/credentials v1.17.0
github.com/aws/aws-sdk-go-v2/service/iam v1.29.0
github.com/aws/aws-sdk-go-v2/service/s3 v1.49.0
github.com/aws/smithy-go v1.20.0
github.com/aws/aws-sdk-go v1.50.33
github.com/aws/aws-sdk-go-v2 v1.25.2
github.com/aws/aws-sdk-go-v2/config v1.27.6
github.com/aws/aws-sdk-go-v2/credentials v1.17.6
github.com/aws/aws-sdk-go-v2/service/iam v1.31.1
github.com/aws/aws-sdk-go-v2/service/s3 v1.51.3
github.com/aws/smithy-go v1.20.1
github.com/bits-and-blooms/bitset v1.13.0
github.com/c-bata/go-prompt v0.2.6
github.com/fatih/color v1.16.0
@@ -19,7 +19,7 @@ require (
github.com/fxamacker/cbor/v2 v2.6.0
github.com/gin-gonic/gin v1.9.1
github.com/go-ldap/ldap/v3 v3.4.6
github.com/go-playground/validator/v10 v10.18.0
github.com/go-playground/validator/v10 v10.19.0
github.com/google/go-github/v33 v33.0.0
github.com/hashicorp/go-hclog v1.6.2
github.com/hashicorp/go-retryablehttp v0.7.5
@@ -31,14 +31,14 @@ require (
github.com/mattn/go-colorable v0.1.13
github.com/mitchellh/go-homedir v1.1.0
github.com/mitchellh/mapstructure v1.5.0
github.com/nats-io/jwt/v2 v2.5.4
github.com/nats-io/jwt/v2 v2.5.5
github.com/nats-io/nats-server/v2 v2.10.11
github.com/nats-io/nats.go v1.33.0
github.com/nats-io/nats.go v1.33.1
github.com/nutsdb/nutsdb v0.14.3
github.com/onsi/ginkgo/v2 v2.15.0
github.com/onsi/ginkgo/v2 v2.16.0
github.com/onsi/gomega v1.31.1
github.com/pelletier/go-toml v1.9.5
github.com/prometheus/client_golang v1.18.0
github.com/prometheus/client_golang v1.19.0
github.com/shirou/gopsutil v3.21.11+incompatible
github.com/sirupsen/logrus v1.9.3
github.com/spf13/cobra v1.8.0
@@ -46,15 +46,15 @@ require (
github.com/spf13/viper v1.18.2
github.com/ugorji/go/codec v1.2.12
github.com/vbauerster/mpb/v8 v8.7.2
github.com/xanzy/go-gitlab v0.97.0
github.com/xanzy/go-gitlab v0.99.0
github.com/xhit/go-simple-mail v2.2.2+incompatible
github.com/xujiajun/utils v0.0.0-20220904132955-5f7c5b914235
golang.org/x/exp v0.0.0-20240213143201-ec583247a57a
golang.org/x/net v0.21.0
golang.org/x/oauth2 v0.17.0
golang.org/x/exp v0.0.0-20240222234643-814bf88cf225
golang.org/x/net v0.22.0
golang.org/x/oauth2 v0.18.0
golang.org/x/sync v0.6.0
golang.org/x/sys v0.17.0
golang.org/x/term v0.17.0
golang.org/x/sys v0.18.0
golang.org/x/term v0.18.0
gopkg.in/yaml.v3 v3.0.1
gorm.io/driver/clickhouse v0.6.0
gorm.io/driver/mysql v1.5.4
@@ -66,12 +66,12 @@ require (
require (
github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 // indirect
github.com/ClickHouse/ch-go v0.61.2 // indirect
github.com/ClickHouse/clickhouse-go/v2 v2.18.0 // indirect
github.com/ClickHouse/ch-go v0.61.3 // indirect
github.com/ClickHouse/clickhouse-go/v2 v2.20.0 // indirect
github.com/Masterminds/goutils v1.1.1 // indirect
github.com/Masterminds/semver v1.5.0 // indirect
github.com/Masterminds/sprig v2.22.0+incompatible // indirect
github.com/PuerkitoBio/goquery v1.8.1 // indirect
github.com/PuerkitoBio/goquery v1.9.1 // indirect
github.com/VictoriaMetrics/metrics v1.6.2 // indirect
github.com/VividCortex/ewma v1.2.0 // indirect
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d // indirect
@@ -80,22 +80,22 @@ require (
github.com/antlabs/stl v0.0.2 // indirect
github.com/antlabs/timer v0.1.3 // indirect
github.com/armon/go-metrics v0.4.1 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.0 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.0 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.0 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.0 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.1 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.2 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.2 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.2 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.0 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.0 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.0 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.0 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.0 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.19.0 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.22.0 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.27.0 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.2 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.1 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.4 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.4 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.2 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.20.1 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.1 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.28.3 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bwmarrin/snowflake v0.3.0 // indirect
github.com/bytedance/sonic v1.10.2 // indirect
github.com/bytedance/sonic v1.11.2 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d // indirect
github.com/chenzhuoyu/iasm v0.9.1 // indirect
@@ -120,12 +120,12 @@ require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 // indirect
github.com/golang-sql/sqlexp v0.1.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/btree v1.0.1 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/go-querystring v1.1.0 // indirect
github.com/google/pprof v0.0.0-20240207164012-fb44976bdcd5 // indirect
github.com/google/pprof v0.0.0-20240227163752-401108e1b7e7 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/gorilla/css v1.0.1 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
@@ -142,7 +142,7 @@ require (
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 // indirect
github.com/jackc/pgx/v5 v5.5.3 // indirect
github.com/jackc/pgx/v5 v5.5.4 // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/jaytaylor/html2text v0.0.0-20230321000545-74c2419ad056 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
@@ -150,8 +150,8 @@ require (
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/juju/ratelimit v1.0.2-0.20191002062651-f60b32039441 // indirect
github.com/klauspost/compress v1.17.6 // indirect
github.com/klauspost/cpuid/v2 v2.2.6 // indirect
github.com/klauspost/compress v1.17.7 // indirect
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
@@ -161,8 +161,7 @@ require (
github.com/mattn/go-runewidth v0.0.15 // indirect
github.com/mattn/go-sqlite3 v1.14.22 // indirect
github.com/mattn/go-tty v0.0.5 // indirect
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect
github.com/microsoft/go-mssqldb v1.6.0 // indirect
github.com/microsoft/go-mssqldb v1.7.0 // indirect
github.com/miekg/dns v1.1.43 // indirect
github.com/minio/highwayhash v1.0.2 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
@@ -178,7 +177,7 @@ require (
github.com/pkg/errors v0.9.1 // indirect
github.com/pkg/term v1.2.0-beta.2 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.45.0 // indirect
github.com/prometheus/common v0.48.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/rivo/uniseg v0.4.7 // indirect
github.com/rogpeppe/go-internal v1.10.0 // indirect
@@ -203,15 +202,15 @@ require (
github.com/x448/float16 v0.8.4 // indirect
github.com/xujiajun/mmap-go v1.0.1 // indirect
github.com/yusufpapurcu/wmi v1.2.3 // indirect
go.opentelemetry.io/otel v1.23.1 // indirect
go.opentelemetry.io/otel/trace v1.23.1 // indirect
go.opentelemetry.io/otel v1.24.0 // indirect
go.opentelemetry.io/otel/trace v1.24.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/arch v0.7.0 // indirect
golang.org/x/crypto v0.19.0 // indirect
golang.org/x/crypto v0.21.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/tools v0.18.0 // indirect
golang.org/x/tools v0.19.0 // indirect
google.golang.org/appengine v1.6.8 // indirect
google.golang.org/protobuf v1.32.0 // indirect
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
)

View File

@@ -86,7 +86,27 @@ func (o *srv) runStart(ctx context.Context) error {
return ErrorServerValidate.Error(nil)
}
return o.r.Start(ctx)
if e := o.r.Start(ctx); e != nil {
return e
}
var x, n = context.WithTimeout(ctx, 30*time.Second)
defer n()
for !o.r.IsRunning() {
select {
case <-x.Done():
return errNotRunning
default:
time.Sleep(100 * time.Millisecond)
if o.r.IsRunning() {
return o.GetError()
}
}
}
return o.GetError()
}
func (o *srv) runStop(ctx context.Context) error {
@@ -239,3 +259,25 @@ func (o *srv) Uptime() time.Duration {
return 0
}
func (o *srv) IsError() bool {
if el := o.r.ErrorsList(); len(el) > 0 {
for _, e := range el {
if e != nil {
return true
}
}
}
return false
}
func (o *srv) GetError() error {
var err = ErrorServerStart.Error(o.r.ErrorsList()...)
if err.HasParent() {
return err
}
return nil
}

View File

@@ -36,7 +36,7 @@ import (
const (
MaxPoolStart = 15 * time.Second
MaxTickPooler = 500 * time.Millisecond
MaxTickPooler = 50 * time.Millisecond
)
func (o *mon) Start(ctx context.Context) error {
@@ -126,10 +126,10 @@ func (o *mon) poolIsRunning(ctx context.Context) error {
for {
select {
case <-tck.C:
if time.Since(tms) >= MaxPoolStart {
return ErrorTimeout.Error(nil)
} else if o.IsRunning() {
if o.IsRunning() {
return nil
} else if time.Since(tms) >= MaxPoolStart {
return ErrorTimeout.Error(nil)
}
case <-ctx.Done():
return ctx.Err()

View File

@@ -31,7 +31,8 @@ import (
"time"
)
type Action func(ctx context.Context) error
type FuncAction func(ctx context.Context) error
type FuncTicker func(ctx context.Context, tck *time.Ticker) error
type Server interface {
// Start is used to start the server

View File

@@ -1,124 +0,0 @@
/*
* MIT License
*
* Copyright (c) 2022 Nicolas JUHEL
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
*
*/
package startStop
var closeChan = make(chan struct{})
func init() {
close(closeChan)
}
func (o *run) getChan() *chData {
if i := o.c.Load(); i == nil {
return &chData{
cl: true,
ch: closeChan,
}
} else if d, k := i.(*chData); !k {
return &chData{
cl: true,
ch: closeChan,
}
} else {
return d
}
}
func (o *run) setChan(d *chData) {
if d == nil {
d = &chData{
cl: true,
ch: closeChan,
}
}
o.c.Store(d)
}
func (o *run) IsRunning() bool {
if e := o.checkMe(); e != nil {
return false
}
c := o.getChan()
if c.cl || c.ch == closeChan {
return false
}
select {
case <-c.ch:
return false
default:
return true
}
}
func (o *run) chanInit() {
if e := o.checkMe(); e != nil {
return
}
o.setChan(&chData{
cl: false,
ch: make(chan struct{}),
})
}
func (o *run) chanDone() <-chan struct{} {
if e := o.checkMe(); e != nil {
return nil
}
return o.getChan().ch
}
func (o *run) chanSend() {
if e := o.checkMe(); e != nil {
return
}
c := o.getChan()
if !c.cl && c.ch != closeChan {
c.ch <- struct{}{}
o.setChan(c)
}
}
func (o *run) chanClose() {
if e := o.checkMe(); e != nil {
return
}
c := o.getChan()
if !c.cl && c.ch != closeChan {
close(c.ch)
o.setChan(&chData{
cl: true,
ch: closeChan,
})
}
}

View File

@@ -45,6 +45,7 @@ func New(start, stop func(ctx context.Context) error) StartStop {
f: start,
s: stop,
t: new(atomic.Value),
c: new(atomic.Value),
r: new(atomic.Bool),
n: new(atomic.Value),
}
}

View File

@@ -49,13 +49,14 @@ type chData struct {
type run struct {
e *atomic.Value // slice []error
f libsrv.Action
s libsrv.Action
f libsrv.FuncAction
s libsrv.FuncAction
t *atomic.Value
c *atomic.Value // chan struct{}
r *atomic.Bool // Want Stop
n *atomic.Value // context.CancelFunc
}
func (o *run) getFctStart() libsrv.Action {
func (o *run) getFctStart() libsrv.FuncAction {
if o.f == nil {
return func(ctx context.Context) error {
return fmt.Errorf("invalid start function")
@@ -64,7 +65,7 @@ func (o *run) getFctStart() libsrv.Action {
return o.f
}
func (o *run) getFctStop() libsrv.Action {
func (o *run) getFctStop() libsrv.FuncAction {
if o.s == nil {
return func(ctx context.Context) error {
return fmt.Errorf("invalid stop function")
@@ -73,102 +74,6 @@ func (o *run) getFctStop() libsrv.Action {
return o.s
}
func (o *run) Restart(ctx context.Context) error {
if e := o.Stop(ctx); e != nil {
return e
} else if e = o.Start(ctx); e != nil {
_ = o.Stop(ctx)
return e
}
return nil
}
func (o *run) Stop(ctx context.Context) error {
if e := o.checkMe(); e != nil {
return e
} else if !o.IsRunning() {
return nil
}
var (
e error
t = time.NewTicker(pollStop)
)
o.t.Store(time.Time{})
defer func() {
libsrv.RecoveryCaller("golib/server/startstop", recover())
t.Stop()
}()
for {
select {
case <-t.C:
if o.IsRunning() {
o.chanSend()
e = o.callStop(ctx)
} else {
return e
}
case <-ctx.Done():
return ctx.Err()
}
}
}
func (o *run) callStop(ctx context.Context) error {
if o == nil {
return nil
} else {
return o.getFctStop()(ctx)
}
}
func (o *run) Start(ctx context.Context) error {
if e := o.checkMeStart(ctx); e != nil {
return e
}
var can context.CancelFunc
ctx, can = context.WithCancel(ctx)
o.t.Store(time.Now())
go func(x context.Context, n context.CancelFunc) {
defer n()
o.chanInit()
defer func() {
libsrv.RecoveryCaller("golib/server/startstop", recover())
_ = o.Stop(ctx)
}()
if e := o.getFctStart()(x); e != nil {
o.errorsAdd(e)
return
}
}(ctx, can)
go func(x context.Context, n context.CancelFunc) {
defer n()
defer func() {
_ = o.Stop(ctx)
}()
for {
select {
case <-o.chanDone():
return
case <-x.Done():
return
}
}
}(ctx, can)
return nil
}
func (o *run) checkMe() error {
if o == nil {
return ErrInvalid
@@ -200,3 +105,112 @@ func (o *run) Uptime() time.Duration {
return time.Since(t)
}
}
func (o *run) IsRunning() bool {
if e := o.checkMe(); e != nil {
return false
}
return o.r.Load()
}
func (o *run) Restart(ctx context.Context) error {
if e := o.Stop(ctx); e != nil {
return e
} else if e = o.Start(ctx); e != nil {
_ = o.Stop(ctx)
return e
}
return nil
}
func (o *run) Stop(ctx context.Context) error {
if e := o.checkMe(); e != nil {
return e
} else if !o.IsRunning() {
return nil
}
var (
e error
t = time.NewTicker(pollStop)
)
o.errorsClean()
if i := o.n.Load(); i != nil {
if f, k := i.(context.CancelFunc); k {
f()
}
}
defer func() {
libsrv.RecoveryCaller("golib/server/startstop", recover())
t.Stop()
}()
for {
select {
case <-t.C:
if o.IsRunning() {
e = o.getFctStop()(ctx)
} else {
return e
}
case <-ctx.Done():
return ctx.Err()
}
}
}
func (o *run) Start(ctx context.Context) error {
if e := o.checkMeStart(ctx); e != nil {
return e
}
cx, ca := context.WithCancel(ctx)
if i := o.n.Swap(ca); i != nil {
if f, k := i.(context.CancelFunc); k {
f()
}
}
o.errorsClean()
fStart := func(c context.Context) error {
defer libsrv.RecoveryCaller("golib/server/startstop", recover())
return o.getFctStart()(c)
}
fStop := func(c context.Context) error {
defer libsrv.RecoveryCaller("golib/server/startstop", recover())
return o.getFctStop()(c)
}
go func(x context.Context, n context.CancelFunc, start, stop libsrv.FuncAction) {
defer func() {
libsrv.RecoveryCaller("golib/server/startstop", recover())
_ = stop(ctx)
n()
o.t.Store(time.Time{})
o.r.Store(false)
}()
o.t.Store(time.Now())
for !o.r.Load() && ctx.Err() == nil {
o.r.Store(true)
if e := start(x); e != nil {
o.errorsAdd(e)
return
}
}
}(cx, ca, fStart, fStop)
return nil
}

View File

@@ -1,124 +0,0 @@
/*
* MIT License
*
* Copyright (c) 2022 Nicolas JUHEL
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
*
*/
package ticker
var closeChan = make(chan struct{})
func init() {
close(closeChan)
}
func (o *run) getChan() *chData {
if i := o.c.Load(); i == nil {
return &chData{
cl: true,
ch: closeChan,
}
} else if d, k := i.(*chData); !k {
return &chData{
cl: true,
ch: closeChan,
}
} else {
return d
}
}
func (o *run) setChan(d *chData) {
if d == nil {
d = &chData{
cl: true,
ch: closeChan,
}
}
o.c.Store(d)
}
func (o *run) IsRunning() bool {
if e := o.checkMe(); e != nil {
return false
}
c := o.getChan()
if c.cl || c.ch == closeChan {
return false
}
select {
case <-c.ch:
return false
default:
return true
}
}
func (o *run) chanInit() {
if e := o.checkMe(); e != nil {
return
}
o.setChan(&chData{
cl: false,
ch: make(chan struct{}),
})
}
func (o *run) chanDone() <-chan struct{} {
if e := o.checkMe(); e != nil {
return nil
}
return o.getChan().ch
}
func (o *run) chanSend() {
if e := o.checkMe(); e != nil {
return
}
c := o.getChan()
if !c.cl && c.ch != closeChan {
c.ch <- struct{}{}
o.setChan(c)
}
}
func (o *run) chanClose() {
if e := o.checkMe(); e != nil {
return
}
c := o.getChan()
if !c.cl && c.ch != closeChan {
close(c.ch)
o.setChan(&chData{
cl: true,
ch: closeChan,
})
}
}

View File

@@ -46,6 +46,7 @@ func New(tick time.Duration, fct func(ctx context.Context, tck *time.Ticker) err
f: fct,
d: tick,
t: new(atomic.Value),
c: new(atomic.Value),
r: new(atomic.Bool),
n: new(atomic.Value),
}
}

View File

@@ -29,6 +29,7 @@ package ticker
import (
"context"
"errors"
"fmt"
"sync/atomic"
"time"
@@ -42,17 +43,13 @@ const (
var ErrInvalid = errors.New("invalid instance")
type chData struct {
cl bool
ch chan struct{}
}
type run struct {
e *atomic.Value // slice []error
f func(ctx context.Context, tck *time.Ticker) error
f libsrv.FuncTicker
d time.Duration
t *atomic.Value
c *atomic.Value // chan struct{}
r *atomic.Bool
n *atomic.Value
}
func (o *run) getDuration() time.Duration {
@@ -60,98 +57,17 @@ func (o *run) getDuration() time.Duration {
return o.d
}
func (o *run) getFunction() func(ctx context.Context, tck *time.Ticker) error {
func (o *run) getFunction() libsrv.FuncTicker {
// still check on function checkMe
if o.f == nil {
return func(ctx context.Context, tck *time.Ticker) error {
return fmt.Errorf("invalid function ticker")
}
}
return o.f
}
func (o *run) Restart(ctx context.Context) error {
if e := o.Stop(ctx); e != nil {
return e
} else if e = o.Start(ctx); e != nil {
_ = o.Stop(ctx)
return e
}
return nil
}
func (o *run) Stop(ctx context.Context) error {
if e := o.checkMe(); e != nil {
return e
}
o.t.Store(time.Time{})
if !o.IsRunning() {
return nil
}
var t = time.NewTicker(pollStop)
defer t.Stop()
for {
select {
case <-t.C:
if o.IsRunning() {
o.chanSend()
} else {
return nil
}
case <-ctx.Done():
return ctx.Err()
}
}
}
func (o *run) Start(ctx context.Context) error {
if e := o.checkMeStart(ctx); e != nil {
return e
}
go func(con context.Context) {
var (
tck = time.NewTicker(o.getDuration())
x, n = context.WithCancel(con)
)
defer func() {
libsrv.RecoveryCaller("golib/server/ticker", recover())
if n != nil {
n()
}
if tck != nil {
tck.Stop()
}
o.chanClose()
}()
o.t.Store(time.Now())
o.chanInit()
o.errorsClean()
for {
select {
case <-tck.C:
f := func(ctx context.Context, tck *time.Ticker) error {
defer libsrv.RecoveryCaller("golib/server/ticker", recover())
return o.getFunction()(ctx, tck)
}
if e := f(x, tck); e != nil {
o.errorsAdd(e)
}
case <-con.Done():
return
case <-o.chanDone():
return
}
}
}(ctx)
return nil
}
func (o *run) checkMe() error {
if o == nil {
return ErrInvalid
@@ -183,3 +99,107 @@ func (o *run) Uptime() time.Duration {
return time.Since(t)
}
}
func (o *run) IsRunning() bool {
if e := o.checkMe(); e != nil {
return false
}
return o.r.Load()
}
func (o *run) Restart(ctx context.Context) error {
if e := o.Stop(ctx); e != nil {
return e
} else if e = o.Start(ctx); e != nil {
_ = o.Stop(ctx)
return e
}
return nil
}
func (o *run) Stop(ctx context.Context) error {
if e := o.checkMe(); e != nil {
return e
} else if !o.IsRunning() {
return nil
}
var t = time.NewTicker(pollStop)
defer t.Stop()
o.errorsClean()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-t.C:
if o.IsRunning() {
if i := o.n.Load(); i != nil {
if f, k := i.(context.CancelFunc); k {
f()
} else {
o.r.Store(false)
}
} else {
o.r.Store(false)
}
} else {
return nil
}
}
}
}
func (o *run) Start(ctx context.Context) error {
if e := o.checkMeStart(ctx); e != nil {
return e
}
cx, ca := context.WithCancel(ctx)
if i := o.n.Swap(ca); i != nil {
if f, k := i.(context.CancelFunc); k {
f()
}
}
o.errorsClean()
fct := func(ctx context.Context, tck *time.Ticker) error {
defer libsrv.RecoveryCaller("golib/server/ticker", recover())
return o.getFunction()(ctx, tck)
}
go func(x context.Context, n context.CancelFunc, f libsrv.FuncTicker) {
var tck = time.NewTicker(o.getDuration())
defer func() {
libsrv.RecoveryCaller("golib/server/ticker", recover())
tck.Stop()
n()
o.t.Store(time.Time{})
o.r.Store(false)
}()
o.r.Store(true)
o.t.Store(time.Now())
for {
select {
case <-x.Done():
return
case <-tck.C:
o.r.Store(true)
if e := f(x, tck); e != nil {
o.errorsAdd(e)
}
}
}
}(cx, ca, fct)
return nil
}

View File

@@ -29,6 +29,7 @@ package tcp
import "fmt"
var (
ErrInstance = fmt.Errorf("invalid instance")
ErrAddress = fmt.Errorf("invalid dial address")
ErrInstance = fmt.Errorf("invalid instance")
ErrConnection = fmt.Errorf("invalid connection")
ErrAddress = fmt.Errorf("invalid dial address")
)

View File

@@ -59,5 +59,6 @@ func New(buffSizeRead libsiz.Size, address string) (ClientTCP, error) {
s: s,
e: new(atomic.Value),
i: new(atomic.Value),
c: new(atomic.Value),
}, nil
}

View File

@@ -27,7 +27,6 @@
package tcp
import (
"bufio"
"context"
"errors"
"io"
@@ -43,6 +42,7 @@ type cli struct {
s *atomic.Int32 // buffer size
e *atomic.Value // function error
i *atomic.Value // function info
c *atomic.Value // net.Conn
}
func (o *cli) RegisterFuncError(f libsck.FuncError) {
@@ -109,89 +109,98 @@ func (o *cli) dial(ctx context.Context) (net.Conn, error) {
}
}
func (o *cli) Do(ctx context.Context, request io.Reader, fct libsck.Response) error {
func (o *cli) Connect(ctx context.Context) error {
if o == nil {
return ErrInstance
}
var (
err error
cnn net.Conn
con net.Conn
)
defer func() {
if cnn != nil {
o.fctInfo(cnn.LocalAddr(), cnn.RemoteAddr(), libsck.ConnectionClose)
o.fctError(cnn.Close())
}
}()
o.fctInfo(&net.TCPAddr{}, &net.TCPAddr{}, libsck.ConnectionDial)
if cnn, err = o.dial(ctx); err != nil {
if con, err = o.dial(ctx); err != nil {
o.fctError(err)
return err
}
o.fctInfo(cnn.LocalAddr(), cnn.RemoteAddr(), libsck.ConnectionNew)
o.sendRequest(cnn, request)
o.readResponse(cnn, fct)
o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionNew)
o.c.Store(con)
return nil
}
func (o *cli) sendRequest(con net.Conn, r io.Reader) {
var (
err error
buf []byte
rdr = bufio.NewReaderSize(r, o.buffSize())
wrt = bufio.NewWriterSize(con, o.buffSize())
)
func (o *cli) Read(p []byte) (n int, err error) {
if o == nil {
return 0, ErrInstance
} else if i := o.c.Load(); i == nil {
return 0, ErrConnection
} else if c, k := i.(net.Conn); !k {
return 0, ErrConnection
} else {
o.fctInfo(c.LocalAddr(), c.RemoteAddr(), libsck.ConnectionRead)
return c.Read(p)
}
}
func (o *cli) Write(p []byte) (n int, err error) {
if o == nil {
return 0, ErrInstance
} else if i := o.c.Load(); i == nil {
return 0, ErrConnection
} else if c, k := i.(net.Conn); !k {
return 0, ErrConnection
} else {
o.fctInfo(c.LocalAddr(), c.RemoteAddr(), libsck.ConnectionWrite)
return c.Write(p)
}
}
func (o *cli) Close() error {
if o == nil {
return ErrInstance
} else if i := o.c.Load(); i == nil {
return ErrConnection
} else if c, k := i.(net.Conn); !k {
return ErrConnection
} else {
o.fctInfo(c.LocalAddr(), c.RemoteAddr(), libsck.ConnectionClose)
e := c.Close()
o.c.Store(c)
return e
}
}
func (o *cli) Once(ctx context.Context, request io.Reader, fct libsck.Response) error {
if o == nil {
return ErrInstance
}
defer func() {
if con != nil {
if c, ok := con.(*net.TCPConn); ok {
o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionCloseWrite)
_ = c.CloseWrite()
}
}
o.fctError(o.Close())
}()
for {
if con == nil && r == nil {
return
}
var err error
buf, err = rdr.ReadBytes('\n')
if err = o.Connect(ctx); err != nil {
o.fctError(err)
return err
}
for {
_, err = io.Copy(o, request)
if err != nil {
if !errors.Is(err, io.EOF) {
o.fctError(err)
return err
} else {
break
}
return
}
o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionWrite)
_, err = wrt.Write(buf)
if err != nil {
o.fctError(err)
return
}
err = wrt.Flush()
if err != nil {
o.fctError(err)
return
}
}
}
func (o *cli) readResponse(con net.Conn, f libsck.Response) {
if f == nil {
return
}
o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionHandler)
f(con)
fct(o)
return nil
}

View File

@@ -29,6 +29,7 @@ package udp
import "fmt"
var (
ErrInstance = fmt.Errorf("invalid instance")
ErrAddress = fmt.Errorf("invalid dial address")
ErrInstance = fmt.Errorf("invalid instance")
ErrConnection = fmt.Errorf("invalid connection")
ErrAddress = fmt.Errorf("invalid dial address")
)

View File

@@ -59,5 +59,6 @@ func New(buffSizeRead libsiz.Size, address string) (ClientUDP, error) {
s: s,
e: new(atomic.Value),
i: new(atomic.Value),
c: new(atomic.Value),
}, nil
}

View File

@@ -27,7 +27,6 @@
package udp
import (
"bufio"
"context"
"errors"
"io"
@@ -43,6 +42,7 @@ type cli struct {
s *atomic.Int32 // buffer size
e *atomic.Value // function error
i *atomic.Value // function info
c *atomic.Value // net.Conn
}
func (o *cli) RegisterFuncError(f libsck.FuncError) {
@@ -109,80 +109,98 @@ func (o *cli) dial(ctx context.Context) (net.Conn, error) {
}
}
func (o *cli) Do(ctx context.Context, request io.Reader, fct libsck.Response) error {
func (o *cli) Connect(ctx context.Context) error {
if o == nil {
return ErrInstance
}
var (
err error
cnn net.Conn
con net.Conn
)
defer func() {
if cnn != nil {
o.fctInfo(cnn.LocalAddr(), cnn.RemoteAddr(), libsck.ConnectionClose)
o.fctError(cnn.Close())
}
}()
o.fctInfo(&net.UDPAddr{}, &net.UDPAddr{}, libsck.ConnectionDial)
if cnn, err = o.dial(ctx); err != nil {
if con, err = o.dial(ctx); err != nil {
o.fctError(err)
return err
}
o.fctInfo(cnn.LocalAddr(), cnn.RemoteAddr(), libsck.ConnectionNew)
o.sendRequest(cnn, request)
o.readResponse(cnn, fct)
o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionNew)
o.c.Store(con)
return nil
}
func (o *cli) sendRequest(con net.Conn, r io.Reader) {
var (
err error
buf []byte
rdr = bufio.NewReaderSize(r, o.buffSize())
wrt = bufio.NewWriterSize(con, o.buffSize())
)
func (o *cli) Read(p []byte) (n int, err error) {
if o == nil {
return 0, ErrInstance
} else if i := o.c.Load(); i == nil {
return 0, ErrConnection
} else if c, k := i.(net.Conn); !k {
return 0, ErrConnection
} else {
o.fctInfo(c.LocalAddr(), c.RemoteAddr(), libsck.ConnectionRead)
return c.Read(p)
}
}
func (o *cli) Write(p []byte) (n int, err error) {
if o == nil {
return 0, ErrInstance
} else if i := o.c.Load(); i == nil {
return 0, ErrConnection
} else if c, k := i.(net.Conn); !k {
return 0, ErrConnection
} else {
o.fctInfo(c.LocalAddr(), c.RemoteAddr(), libsck.ConnectionWrite)
return c.Write(p)
}
}
func (o *cli) Close() error {
if o == nil {
return ErrInstance
} else if i := o.c.Load(); i == nil {
return ErrConnection
} else if c, k := i.(net.Conn); !k {
return ErrConnection
} else {
o.fctInfo(c.LocalAddr(), c.RemoteAddr(), libsck.ConnectionClose)
e := c.Close()
o.c.Store(c)
return e
}
}
func (o *cli) Once(ctx context.Context, request io.Reader, fct libsck.Response) error {
if o == nil {
return ErrInstance
}
defer func() {
o.fctError(o.Close())
}()
var err error
if err = o.Connect(ctx); err != nil {
o.fctError(err)
return err
}
for {
if con == nil && r == nil {
return
}
buf, err = rdr.ReadBytes('\n')
_, err = io.Copy(o, request)
if err != nil {
if !errors.Is(err, io.EOF) {
o.fctError(err)
return err
} else {
break
}
return
}
o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionWrite)
_, err = wrt.Write(buf)
if err != nil {
o.fctError(err)
return
}
err = wrt.Flush()
if err != nil {
o.fctError(err)
return
}
}
}
func (o *cli) readResponse(con net.Conn, f libsck.Response) {
if f == nil {
return
}
o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionHandler)
f(con)
fct(o)
return nil
}

View File

@@ -32,6 +32,7 @@ package unix
import "fmt"
var (
ErrInstance = fmt.Errorf("invalid instance")
ErrAddress = fmt.Errorf("invalid dial address")
ErrInstance = fmt.Errorf("invalid instance")
ErrConnection = fmt.Errorf("invalid connection")
ErrAddress = fmt.Errorf("invalid dial address")
)

View File

@@ -54,5 +54,6 @@ func New(buffSizeRead libsiz.Size, unixfile string) ClientUnix {
s: s,
e: new(atomic.Value),
i: new(atomic.Value),
c: new(atomic.Value),
}
}

View File

@@ -30,7 +30,6 @@
package unix
import (
"bufio"
"context"
"errors"
"io"
@@ -46,6 +45,7 @@ type cli struct {
s *atomic.Int32 // buffer size
e *atomic.Value // function error
i *atomic.Value // function info
c *atomic.Value // net.Conn
}
func (o *cli) RegisterFuncError(f libsck.FuncError) {
@@ -112,91 +112,98 @@ func (o *cli) dial(ctx context.Context) (net.Conn, error) {
}
}
func (o *cli) Do(ctx context.Context, request io.Reader, fct libsck.Response) error {
func (o *cli) Connect(ctx context.Context) error {
if o == nil {
return ErrInstance
}
var (
err error
cnn net.Conn
con net.Conn
)
defer func() {
if cnn != nil {
o.fctInfo(cnn.LocalAddr(), cnn.RemoteAddr(), libsck.ConnectionClose)
o.fctError(cnn.Close())
}
}()
o.fctInfo(&net.UnixAddr{}, &net.UnixAddr{}, libsck.ConnectionDial)
if cnn, err = o.dial(ctx); err != nil {
if con, err = o.dial(ctx); err != nil {
o.fctError(err)
return err
}
o.fctInfo(cnn.LocalAddr(), cnn.RemoteAddr(), libsck.ConnectionNew)
o.sendRequest(cnn, request)
o.readResponse(cnn, fct)
o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionNew)
o.c.Store(con)
return nil
}
func (o *cli) sendRequest(con net.Conn, r io.Reader) {
var (
err error
buf []byte
rdr = bufio.NewReaderSize(r, o.buffSize())
wrt = bufio.NewWriterSize(con, o.buffSize())
)
func (o *cli) Read(p []byte) (n int, err error) {
if o == nil {
return 0, ErrInstance
} else if i := o.c.Load(); i == nil {
return 0, ErrConnection
} else if c, k := i.(net.Conn); !k {
return 0, ErrConnection
} else {
o.fctInfo(c.LocalAddr(), c.RemoteAddr(), libsck.ConnectionRead)
return c.Read(p)
}
}
func (o *cli) Write(p []byte) (n int, err error) {
if o == nil {
return 0, ErrInstance
} else if i := o.c.Load(); i == nil {
return 0, ErrConnection
} else if c, k := i.(net.Conn); !k {
return 0, ErrConnection
} else {
o.fctInfo(c.LocalAddr(), c.RemoteAddr(), libsck.ConnectionWrite)
return c.Write(p)
}
}
func (o *cli) Close() error {
if o == nil {
return ErrInstance
} else if i := o.c.Load(); i == nil {
return ErrConnection
} else if c, k := i.(net.Conn); !k {
return ErrConnection
} else {
o.fctInfo(c.LocalAddr(), c.RemoteAddr(), libsck.ConnectionClose)
e := c.Close()
o.c.Store(c)
return e
}
}
func (o *cli) Once(ctx context.Context, request io.Reader, fct libsck.Response) error {
if o == nil {
return ErrInstance
}
defer func() {
if con != nil {
if c, ok := con.(*net.UnixConn); ok {
o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionCloseWrite)
_ = c.CloseWrite()
}
}
o.fctError(o.Close())
}()
for {
if con == nil && r == nil {
return
}
var err error
buf, err = rdr.ReadBytes('\n')
if err = o.Connect(ctx); err != nil {
o.fctError(err)
return err
}
for {
_, err = io.Copy(o, request)
if err != nil {
if !errors.Is(err, io.EOF) {
o.fctError(err)
}
if len(buf) < 1 {
return
return err
} else {
break
}
}
o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionWrite)
_, err = wrt.Write(buf)
if err != nil {
o.fctError(err)
return
}
err = wrt.Flush()
if err != nil {
o.fctError(err)
return
}
}
}
func (o *cli) readResponse(con net.Conn, f libsck.Response) {
if f == nil {
return
}
o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionHandler)
f(con)
fct(o)
return nil
}

View File

@@ -32,6 +32,7 @@ package unixgram
import "fmt"
var (
ErrInstance = fmt.Errorf("invalid instance")
ErrAddress = fmt.Errorf("invalid dial address")
ErrInstance = fmt.Errorf("invalid instance")
ErrConnection = fmt.Errorf("invalid connection")
ErrAddress = fmt.Errorf("invalid dial address")
)

View File

@@ -54,5 +54,6 @@ func New(buffSizeRead libsiz.Size, unixfile string) ClientUnix {
s: s,
e: new(atomic.Value),
i: new(atomic.Value),
c: new(atomic.Value),
}
}

View File

@@ -30,7 +30,6 @@
package unixgram
import (
"bufio"
"context"
"errors"
"io"
@@ -46,6 +45,7 @@ type cli struct {
s *atomic.Int32 // buffer size
e *atomic.Value // function error
i *atomic.Value // function info
c *atomic.Value // net.Conn
}
func (o *cli) RegisterFuncError(f libsck.FuncError) {
@@ -112,80 +112,98 @@ func (o *cli) dial(ctx context.Context) (net.Conn, error) {
}
}
func (o *cli) Do(ctx context.Context, request io.Reader, fct libsck.Response) error {
func (o *cli) Connect(ctx context.Context) error {
if o == nil {
return ErrInstance
}
var (
err error
cnn net.Conn
con net.Conn
)
defer func() {
if cnn != nil {
o.fctInfo(cnn.LocalAddr(), cnn.RemoteAddr(), libsck.ConnectionClose)
o.fctError(cnn.Close())
}
}()
o.fctInfo(&net.UnixAddr{}, &net.UnixAddr{}, libsck.ConnectionDial)
if cnn, err = o.dial(ctx); err != nil {
if con, err = o.dial(ctx); err != nil {
o.fctError(err)
return err
}
o.fctInfo(cnn.LocalAddr(), cnn.RemoteAddr(), libsck.ConnectionNew)
o.sendRequest(cnn, request)
o.readResponse(cnn, fct)
o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionNew)
o.c.Store(con)
return nil
}
func (o *cli) sendRequest(con net.Conn, r io.Reader) {
var (
err error
buf []byte
rdr = bufio.NewReaderSize(r, o.buffSize())
wrt = bufio.NewWriterSize(con, o.buffSize())
)
func (o *cli) Read(p []byte) (n int, err error) {
if o == nil {
return 0, ErrInstance
} else if i := o.c.Load(); i == nil {
return 0, ErrConnection
} else if c, k := i.(net.Conn); !k {
return 0, ErrConnection
} else {
o.fctInfo(c.LocalAddr(), c.RemoteAddr(), libsck.ConnectionRead)
return c.Read(p)
}
}
func (o *cli) Write(p []byte) (n int, err error) {
if o == nil {
return 0, ErrInstance
} else if i := o.c.Load(); i == nil {
return 0, ErrConnection
} else if c, k := i.(net.Conn); !k {
return 0, ErrConnection
} else {
o.fctInfo(c.LocalAddr(), c.RemoteAddr(), libsck.ConnectionWrite)
return c.Write(p)
}
}
func (o *cli) Close() error {
if o == nil {
return ErrInstance
} else if i := o.c.Load(); i == nil {
return ErrConnection
} else if c, k := i.(net.Conn); !k {
return ErrConnection
} else {
o.fctInfo(c.LocalAddr(), c.RemoteAddr(), libsck.ConnectionClose)
e := c.Close()
o.c.Store(c)
return e
}
}
func (o *cli) Once(ctx context.Context, request io.Reader, fct libsck.Response) error {
if o == nil {
return ErrInstance
}
defer func() {
o.fctError(o.Close())
}()
var err error
if err = o.Connect(ctx); err != nil {
o.fctError(err)
return err
}
for {
if con == nil && r == nil {
return
}
buf, err = rdr.ReadBytes('\n')
_, err = io.Copy(o, request)
if err != nil {
if !errors.Is(err, io.EOF) {
o.fctError(err)
return err
} else {
break
}
return
}
o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionWrite)
_, err = wrt.Write(buf)
if err != nil {
o.fctError(err)
return
}
err = wrt.Flush()
if err != nil {
o.fctError(err)
return
}
}
}
func (o *cli) readResponse(con net.Conn, f libsck.Response) {
if f == nil {
return
}
o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionHandler)
f(con)
fct(o)
return nil
}

View File

@@ -94,8 +94,11 @@ type Server interface {
}
type Client interface {
io.ReadWriteCloser
RegisterFuncError(f FuncError)
RegisterFuncInfo(f FuncInfo)
Do(ctx context.Context, request io.Reader, fct Response) error
Connect(ctx context.Context) error
Once(ctx context.Context, request io.Reader, fct Response) error
}

View File

@@ -146,15 +146,16 @@ func (o *srv) Listen(ctx context.Context) error {
}
}
func (o *srv) Conn(conn net.Conn) {
func (o *srv) Conn(con net.Conn) {
defer func() {
o.fctInfo(conn.LocalAddr(), conn.RemoteAddr(), libsck.ConnectionClose)
_ = conn.Close()
o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionClose)
_ = con.Close()
}()
var (
err error
rdr = bufio.NewReaderSize(conn, o.buffSize())
nbr int
rdr = bufio.NewReaderSize(con, o.buffSize())
msg []byte
hdl libsck.Handler
)
@@ -164,25 +165,26 @@ func (o *srv) Conn(conn net.Conn) {
}
for {
msg = msg[:0]
msg, err = rdr.ReadBytes('\n')
nbr = len(msg)
o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionRead)
if nbr > 0 {
if !bytes.HasSuffix(msg, []byte{libsck.EOL}) {
msg = append(msg, libsck.EOL)
nbr++
}
o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionHandler)
hdl(bytes.NewBuffer(msg[:nbr]), con)
}
o.fctInfo(conn.LocalAddr(), conn.RemoteAddr(), libsck.ConnectionRead)
if err != nil {
if err != io.EOF {
o.fctError(err)
}
if len(msg) < 1 {
break
}
}
var buf = bytes.NewBuffer(msg)
if !bytes.HasSuffix(msg, []byte{libsck.EOL}) {
buf.Write([]byte{libsck.EOL})
}
o.fctInfo(conn.LocalAddr(), conn.RemoteAddr(), libsck.ConnectionHandler)
hdl(buf, conn)
}
}

View File

@@ -137,6 +137,13 @@ func (o *srv) Listen(ctx context.Context) error {
o.fctInfoSrv("starting listening socket '%s %s'", libptc.NetworkUDP.String(), adr)
// Accept new connection or stop if context or shutdown trigger
var (
siz = o.buffSize()
buf []byte
rer error
)
for {
// Accept an incoming connection.
if con == nil {
@@ -145,11 +152,7 @@ func (o *srv) Listen(ctx context.Context) error {
return err
}
var (
buf = make([]byte, o.buffSize())
rer error
)
buf = make([]byte, siz)
nbr, rem, rer = con.ReadFrom(buf)
if rem == nil {
@@ -162,20 +165,16 @@ func (o *srv) Listen(ctx context.Context) error {
if !stp.Load() {
o.fctError(rer)
}
if nbr < 1 {
continue
}
}
go func(la, ra net.Addr, b []byte) {
o.fctInfo(la, ra, libsck.ConnectionHandler)
r := bytes.NewBuffer(b)
if !bytes.HasSuffix(b, []byte{libsck.EOL}) {
r.Write([]byte{libsck.EOL})
if nbr > 0 {
if !bytes.HasSuffix(buf, []byte{libsck.EOL}) {
buf = append(buf, libsck.EOL)
nbr++
}
hdl(r, io.Discard)
}(loc, rem, buf[:nbr])
o.fctInfo(loc, rem, libsck.ConnectionHandler)
hdl(bytes.NewBuffer(buf[:nbr]), io.Discard)
}
}
}

View File

@@ -234,6 +234,7 @@ func (o *srv) Conn(con net.Conn) {
var (
err error
nbr int
rdr = bufio.NewReaderSize(con, o.buffSize())
msg []byte
hdl libsck.Handler
@@ -244,25 +245,26 @@ func (o *srv) Conn(con net.Conn) {
}
for {
msg = msg[:0]
msg, err = rdr.ReadBytes('\n')
nbr = len(msg)
o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionRead)
if nbr > 0 {
if !bytes.HasSuffix(msg, []byte{libsck.EOL}) {
msg = append(msg, libsck.EOL)
nbr++
}
o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionHandler)
hdl(bytes.NewBuffer(msg[:nbr]), con)
}
if err != nil {
if err != io.EOF {
o.fctError(err)
}
if len(msg) < 1 {
break
}
}
var buf = bytes.NewBuffer(msg)
if !bytes.HasSuffix(msg, []byte{libsck.EOL}) {
buf.Write([]byte{libsck.EOL})
}
o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionHandler)
hdl(buf, con)
}
}

View File

@@ -205,6 +205,13 @@ func (o *srv) Listen(ctx context.Context) error {
}()
o.r.Store(true)
var (
siz = o.buffSize()
buf []byte
rer error
)
// Accept new connection or stop if context or shutdown trigger
for {
// Accept an incoming connection.
@@ -214,11 +221,7 @@ func (o *srv) Listen(ctx context.Context) error {
return err
}
var (
buf = make([]byte, o.buffSize())
rer error
)
buf = make([]byte, siz)
nbr, rem, rer = con.ReadFrom(buf)
if rem == nil {
@@ -227,25 +230,20 @@ func (o *srv) Listen(ctx context.Context) error {
o.fctInfo(loc, rem, libsck.ConnectionRead)
if nbr > 0 {
if !bytes.HasSuffix(buf, []byte{libsck.EOL}) {
buf = append(buf, libsck.EOL)
nbr++
}
o.fctInfo(loc, rem, libsck.ConnectionHandler)
hdl(bytes.NewBuffer(buf[:nbr]), io.Discard)
}
if rer != nil {
if !stp.Load() {
o.fctError(rer)
}
if nbr < 1 {
continue
}
}
go func(la, ra net.Addr, b []byte) {
o.fctInfo(la, ra, libsck.ConnectionHandler)
r := bytes.NewBuffer(b)
if !bytes.HasSuffix(b, []byte{libsck.EOL}) {
r.Write([]byte{libsck.EOL})
}
hdl(r, io.Discard)
}(loc, rem, buf[:nbr])
}
}

View File

@@ -95,7 +95,7 @@ func main() {
_, _ = fmt.Fprintf(os.Stdout, "[%s %s]=>[%s %s] %s\n", remote.Network(), remote.String(), local.Network(), local.String(), state.String())
})
checkPanic(cli.Do(context.Background(), request(), func(r io.Reader) {
checkPanic(cli.Once(context.Background(), request(), func(r io.Reader) {
_, e := io.Copy(os.Stdout, r)
printError(e)
}))

View File

@@ -94,5 +94,5 @@ func main() {
_, _ = fmt.Fprintf(os.Stdout, "[%s %s]=>[%s %s] %s\n", remote.Network(), remote.String(), local.Network(), local.String(), state.String())
})
checkPanic(cli.Do(context.Background(), request(), nil))
checkPanic(cli.Once(context.Background(), request(), nil))
}

View File

@@ -95,5 +95,5 @@ func main() {
_, _ = fmt.Fprintf(os.Stdout, "[%s %s]=>[%s %s] %s\n", remote.Network(), remote.String(), local.Network(), local.String(), state.String())
})
checkPanic(cli.Do(context.Background(), request(), nil))
checkPanic(cli.Once(context.Background(), request(), nil))
}