Package Socket:

- rework client / server to offer tcp & udp server/client in addition of unix socket file
- add config model
- add aggregation function
This commit is contained in:
nabbar
2023-07-30 14:30:51 +02:00
parent aae7b9ea0e
commit 212cfdc0cf
30 changed files with 2110 additions and 100 deletions

View File

@@ -0,0 +1,57 @@
//go:build linux
// +build linux
/*
* MIT License
*
* Copyright (c) 2022 Nicolas JUHEL
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
*
*/
package client
import (
"fmt"
"runtime"
"strings"
libptc "github.com/nabbar/golib/network/protocol"
libsck "github.com/nabbar/golib/socket"
sckclt "github.com/nabbar/golib/socket/client/tcp"
sckclu "github.com/nabbar/golib/socket/client/udp"
sckclx "github.com/nabbar/golib/socket/client/unix"
)
func New(proto libptc.NetworkProtocol, sizeBufferRead int32, address string) (libsck.Client, error) {
switch proto {
case libptc.NetworkUnix:
if strings.EqualFold(runtime.GOOS, "linux") {
return sckclx.New(sizeBufferRead, address), nil
}
case libptc.NetworkTCP, libptc.NetworkTCP4, libptc.NetworkTCP6:
return sckclt.New(sizeBufferRead, address)
case libptc.NetworkUDP, libptc.NetworkUDP4, libptc.NetworkUDP6:
return sckclu.New(address)
}
return nil, fmt.Errorf("invalid client protocol")
}

View File

@@ -0,0 +1,50 @@
//go:build !linux
// +build !linux
/*
* MIT License
*
* Copyright (c) 2022 Nicolas JUHEL
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
*
*/
package client
import (
"fmt"
libptc "github.com/nabbar/golib/network/protocol"
libsck "github.com/nabbar/golib/socket"
sckclt "github.com/nabbar/golib/socket/client/tcp"
sckclu "github.com/nabbar/golib/socket/client/udp"
)
func New(proto libptc.NetworkProtocol, sizeBufferRead int32, address string) (libsck.Client, error) {
switch proto {
case libptc.NetworkTCP, libptc.NetworkTCP4, libptc.NetworkTCP6:
return sckclt.New(sizeBufferRead, address)
case libptc.NetworkUDP, libptc.NetworkUDP4, libptc.NetworkUDP6:
return sckclu.New(address)
}
return nil, fmt.Errorf("invalid client protocol")
}

View File

@@ -0,0 +1,38 @@
/*
* MIT License
*
* Copyright (c) 2022 Nicolas JUHEL
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
*
*/
package tcp
import "fmt"
var (
ErrInstance = fmt.Errorf("invalid instance")
ErrAddress = fmt.Errorf("invalid dial address")
ErrHostName = fmt.Errorf("invalid dial host name")
ErrHostPort = fmt.Errorf("invalid dial host port")
ErrContextClosed = fmt.Errorf("context closed")
ErrClientClosed = fmt.Errorf("server closed")
)

View File

@@ -0,0 +1,72 @@
/*
* MIT License
*
* Copyright (c) 2022 Nicolas JUHEL
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
*
*/
package tcp
import (
"net/url"
"strconv"
"sync/atomic"
libsck "github.com/nabbar/golib/socket"
)
type ClientTCP interface {
libsck.Client
}
func New(buffSizeRead int32, address string) (ClientTCP, error) {
var (
a = new(atomic.Value)
s = new(atomic.Int32)
u = &url.URL{
Host: address,
}
)
if len(u.Hostname()) < 1 {
return nil, ErrHostName
} else if len(u.Port()) < 1 {
return nil, ErrHostPort
} else if i, e := strconv.Atoi(u.Port()); e != nil {
return nil, e
} else if i < 1 || i > 65534 {
return nil, ErrHostPort
} else {
a.Store(u)
}
s.Store(buffSizeRead)
return &cltt{
a: a,
s: s,
e: new(atomic.Value),
i: new(atomic.Value),
tr: new(atomic.Value),
tw: new(atomic.Value),
}, nil
}

166
socket/client/tcp/model.go Normal file
View File

@@ -0,0 +1,166 @@
/*
* MIT License
*
* Copyright (c) 2022 Nicolas JUHEL
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
*
*/
package tcp
import (
"bytes"
"context"
"io"
"net"
"os"
"sync/atomic"
libptc "github.com/nabbar/golib/network/protocol"
libsck "github.com/nabbar/golib/socket"
)
type cltt struct {
a *atomic.Value // address: hostname + port
s *atomic.Int32 // buffer size
e *atomic.Value // function error
i *atomic.Value // function info
tr *atomic.Value // connection read timeout
tw *atomic.Value // connection write timeout
}
func (o *cltt) RegisterFuncError(f libsck.FuncError) {
if o == nil {
return
}
o.e.Store(f)
}
func (o *cltt) RegisterFuncInfo(f libsck.FuncInfo) {
if o == nil {
return
}
o.i.Store(f)
}
func (o *cltt) fctError(e error) {
if o == nil {
return
}
v := o.e.Load()
if v != nil {
v.(libsck.FuncError)(e)
}
}
func (o *cltt) fctInfo(local, remote net.Addr, state libsck.ConnState) {
if o == nil {
return
}
v := o.i.Load()
if v != nil {
v.(libsck.FuncInfo)(local, remote, state)
}
}
func (o *cltt) buffRead() *bytes.Buffer {
v := o.s.Load()
if v > 0 {
return bytes.NewBuffer(make([]byte, 0, int(v)))
}
return bytes.NewBuffer(make([]byte, 0, libsck.DefaultBufferSize))
}
func (o *cltt) dial(ctx context.Context) (net.Conn, error) {
if o == nil {
return nil, ErrInstance
}
v := o.a.Load()
if v == nil {
return nil, ErrAddress
} else if _, e := os.Stat(v.(string)); e != nil {
return nil, e
} else {
d := net.Dialer{}
return d.DialContext(ctx, libptc.NetworkTCP.Code(), v.(string))
}
}
func (o *cltt) Do(ctx context.Context, request io.Reader) (io.Reader, error) {
if o == nil {
return nil, ErrInstance
}
var (
e error
lc net.Addr
rm net.Addr
cnn net.Conn
buf = o.buffRead()
)
o.fctInfo(nil, nil, libsck.ConnectionDial)
if cnn, e = o.dial(ctx); e != nil {
o.fctError(e)
return nil, e
}
defer o.fctError(cnn.Close())
lc = cnn.LocalAddr()
rm = cnn.RemoteAddr()
defer o.fctInfo(lc, rm, libsck.ConnectionClose)
o.fctInfo(lc, rm, libsck.ConnectionNew)
if request != nil {
o.fctInfo(lc, rm, libsck.ConnectionWrite)
if _, e = io.Copy(cnn, request); e != nil {
o.fctError(e)
return nil, e
}
}
o.fctInfo(lc, rm, libsck.ConnectionCloseWrite)
if e = cnn.(*net.TCPConn).CloseWrite(); e != nil {
o.fctError(e)
return nil, e
}
o.fctInfo(lc, rm, libsck.ConnectionRead)
if _, e = io.Copy(buf, cnn); e != nil {
o.fctError(e)
return nil, e
}
o.fctInfo(lc, rm, libsck.ConnectionCloseRead)
o.fctError(cnn.(*net.TCPConn).CloseRead())
return buf, nil
}

View File

@@ -0,0 +1,38 @@
/*
* MIT License
*
* Copyright (c) 2022 Nicolas JUHEL
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
*
*/
package udp
import "fmt"
var (
ErrInstance = fmt.Errorf("invalid instance")
ErrAddress = fmt.Errorf("invalid dial address")
ErrHostName = fmt.Errorf("invalid dial host name")
ErrHostPort = fmt.Errorf("invalid dial host port")
ErrContextClosed = fmt.Errorf("context closed")
ErrClientClosed = fmt.Errorf("server closed")
)

View File

@@ -0,0 +1,68 @@
/*
* MIT License
*
* Copyright (c) 2022 Nicolas JUHEL
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
*
*/
package udp
import (
"net/url"
"strconv"
"sync/atomic"
libsck "github.com/nabbar/golib/socket"
)
type ClientUDP interface {
libsck.Client
}
func New(address string) (ClientUDP, error) {
var (
a = new(atomic.Value)
u = &url.URL{
Host: address,
}
)
if len(u.Hostname()) < 1 {
return nil, ErrHostName
} else if len(u.Port()) < 1 {
return nil, ErrHostPort
} else if i, e := strconv.Atoi(u.Port()); e != nil {
return nil, e
} else if i < 1 || i > 65534 {
return nil, ErrHostPort
} else {
a.Store(u)
}
return &cltu{
a: a,
e: new(atomic.Value),
i: new(atomic.Value),
tr: new(atomic.Value),
tw: new(atomic.Value),
}, nil
}

View File

@@ -1,6 +1,3 @@
//go:build linux
// +build linux
/* /*
* MIT License * MIT License
* *
@@ -27,100 +24,122 @@
* *
*/ */
package client package udp
import ( import (
"bytes" "bytes"
"context" "context"
"fmt"
"io" "io"
"net" "net"
"os" "os"
"sync/atomic" "sync/atomic"
libptc "github.com/nabbar/golib/network/protocol"
libsck "github.com/nabbar/golib/socket" libsck "github.com/nabbar/golib/socket"
) )
type clt struct { type cltu struct {
u *atomic.Value // unixfile a *atomic.Value // address: hostname + port
f *atomic.Value // function error e *atomic.Value // function error
i *atomic.Value // function info
tr *atomic.Value // connection read timeout tr *atomic.Value // connection read timeout
tw *atomic.Value // connection write timeout tw *atomic.Value // connection write timeout
} }
func (o *clt) RegisterFuncError(f libsck.FuncError) { func (o *cltu) RegisterFuncError(f libsck.FuncError) {
if o == nil { if o == nil {
return return
} }
o.f.Store(f) o.e.Store(f)
} }
func (o *clt) fctError(e error) { func (o *cltu) RegisterFuncInfo(f libsck.FuncInfo) {
if o == nil { if o == nil {
return return
} }
v := o.f.Load() o.i.Store(f)
}
func (o *cltu) fctError(e error) {
if o == nil {
return
}
v := o.e.Load()
if v != nil { if v != nil {
v.(libsck.FuncError)(e) v.(libsck.FuncError)(e)
} }
} }
func (o *clt) dial(ctx context.Context) (net.Conn, error) { func (o *cltu) fctInfo(local, remote net.Addr, state libsck.ConnState) {
if o == nil { if o == nil {
return nil, fmt.Errorf("invalid instance") return
} }
v := o.u.Load() v := o.i.Load()
if v != nil {
v.(libsck.FuncInfo)(local, remote, state)
}
}
func (o *cltu) buffRead() *bytes.Buffer {
return bytes.NewBuffer(make([]byte, 0, 1))
}
func (o *cltu) dial(ctx context.Context) (net.Conn, error) {
if o == nil {
return nil, ErrInstance
}
v := o.a.Load()
if v == nil { if v == nil {
return nil, fmt.Errorf("invalid unix file") return nil, ErrAddress
} else if _, e := os.Stat(v.(string)); e != nil { } else if _, e := os.Stat(v.(string)); e != nil {
return nil, e return nil, e
} else { } else {
d := net.Dialer{} d := net.Dialer{}
return d.DialContext(ctx, "unix", v.(string)) return d.DialContext(ctx, libptc.NetworkUDP.Code(), v.(string))
} }
} }
func (o *clt) Connection(ctx context.Context, request io.Reader) (io.Reader, error) { func (o *cltu) Do(ctx context.Context, request io.Reader) (io.Reader, error) {
if o == nil { if o == nil {
return nil, fmt.Errorf("invalid instance") return nil, ErrInstance
} }
var ( var (
e error e error
lc net.Addr
rm net.Addr
cnn net.Conn cnn net.Conn
buf = o.buffRead()
) )
o.fctInfo(nil, nil, libsck.ConnectionDial)
if cnn, e = o.dial(ctx); e != nil { if cnn, e = o.dial(ctx); e != nil {
o.fctError(e) o.fctError(e)
return nil, e return nil, e
} }
defer cnn.Close() defer o.fctError(cnn.Close())
lc = cnn.LocalAddr()
rm = cnn.RemoteAddr()
defer o.fctInfo(lc, rm, libsck.ConnectionClose)
o.fctInfo(lc, rm, libsck.ConnectionNew)
if request != nil { if request != nil {
o.fctInfo(lc, rm, libsck.ConnectionWrite)
if _, e = io.Copy(cnn, request); e != nil { if _, e = io.Copy(cnn, request); e != nil {
o.fctError(e) o.fctError(e)
return nil, e return nil, e
} }
} }
if e = cnn.(*net.UnixConn).CloseWrite(); e != nil {
o.fctError(e)
return nil, e
}
var buf = bytes.NewBuffer(make([]byte, 0, 32*1024))
if _, e = io.Copy(buf, cnn); e != nil {
o.fctError(e)
return nil, e
}
_ = cnn.(*net.UnixConn).CloseRead()
return buf, nil return buf, nil
} }

View File

@@ -0,0 +1,39 @@
//go:build linux
// +build linux
/*
* MIT License
*
* Copyright (c) 2022 Nicolas JUHEL
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
*
*/
package unix
import "fmt"
var (
ErrInvalidInstance = fmt.Errorf("invalid instance")
ErrUnixFile = fmt.Errorf("invalid unix file")
ErrContextClosed = fmt.Errorf("context closed")
ErrServerClosed = fmt.Errorf("server closed")
)

View File

@@ -24,4 +24,4 @@
* *
*/ */
package client package unix

View File

@@ -27,28 +27,32 @@
* *
*/ */
package client package unix
import ( import (
"context"
"io"
"sync/atomic" "sync/atomic"
libsck "github.com/nabbar/golib/socket" libsck "github.com/nabbar/golib/socket"
) )
type Client interface { type ClientUnix interface {
RegisterFuncError(f libsck.FuncError) libsck.Client
Connection(ctx context.Context, request io.Reader) (io.Reader, error)
} }
func New(unixfile string) Client { func New(buffSizeRead int32, unixfile string) ClientUnix {
u := new(atomic.Value) var (
u.Store(unixfile) a = new(atomic.Value)
s = new(atomic.Int32)
)
return &clt{ a.Store(unixfile)
u: u, s.Store(buffSizeRead)
f: new(atomic.Value),
return &cltx{
a: a,
s: s,
e: new(atomic.Value),
i: new(atomic.Value),
tr: new(atomic.Value), tr: new(atomic.Value),
tw: new(atomic.Value), tw: new(atomic.Value),
} }

169
socket/client/unix/model.go Normal file
View File

@@ -0,0 +1,169 @@
//go:build linux
// +build linux
/*
* MIT License
*
* Copyright (c) 2022 Nicolas JUHEL
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
*
*/
package unix
import (
"bytes"
"context"
"io"
"net"
"os"
"sync/atomic"
libptc "github.com/nabbar/golib/network/protocol"
libsck "github.com/nabbar/golib/socket"
)
type cltx struct {
a *atomic.Value // address : unixfile
s *atomic.Int32 // buffer size
e *atomic.Value // function error
i *atomic.Value // function info
tr *atomic.Value // connection read timeout
tw *atomic.Value // connection write timeout
}
func (o *cltx) RegisterFuncError(f libsck.FuncError) {
if o == nil {
return
}
o.e.Store(f)
}
func (o *cltx) RegisterFuncInfo(f libsck.FuncInfo) {
if o == nil {
return
}
o.i.Store(f)
}
func (o *cltx) fctError(e error) {
if o == nil {
return
}
v := o.e.Load()
if v != nil {
v.(libsck.FuncError)(e)
}
}
func (o *cltx) fctInfo(local, remote net.Addr, state libsck.ConnState) {
if o == nil {
return
}
v := o.i.Load()
if v != nil {
v.(libsck.FuncInfo)(local, remote, state)
}
}
func (o *cltx) buffRead() *bytes.Buffer {
v := o.s.Load()
if v > 0 {
return bytes.NewBuffer(make([]byte, 0, int(v)))
}
return bytes.NewBuffer(make([]byte, 0, libsck.DefaultBufferSize))
}
func (o *cltx) dial(ctx context.Context) (net.Conn, error) {
if o == nil {
return nil, ErrInvalidInstance
}
v := o.a.Load()
if v == nil {
return nil, ErrUnixFile
} else if _, e := os.Stat(v.(string)); e != nil {
return nil, e
} else {
d := net.Dialer{}
return d.DialContext(ctx, libptc.NetworkUnix.Code(), v.(string))
}
}
func (o *cltx) Do(ctx context.Context, request io.Reader) (io.Reader, error) {
if o == nil {
return nil, ErrInvalidInstance
}
var (
e error
lc net.Addr
rm net.Addr
cnn net.Conn
buf = o.buffRead()
)
o.fctInfo(nil, nil, libsck.ConnectionDial)
if cnn, e = o.dial(ctx); e != nil {
o.fctError(e)
return nil, e
}
defer o.fctError(cnn.Close())
lc = cnn.LocalAddr()
rm = cnn.RemoteAddr()
defer o.fctInfo(lc, rm, libsck.ConnectionClose)
o.fctInfo(lc, rm, libsck.ConnectionNew)
if request != nil {
o.fctInfo(lc, rm, libsck.ConnectionWrite)
if _, e = io.Copy(cnn, request); e != nil {
o.fctError(e)
return nil, e
}
}
o.fctInfo(lc, rm, libsck.ConnectionCloseWrite)
if e = cnn.(*net.UnixConn).CloseWrite(); e != nil {
o.fctError(e)
return nil, e
}
o.fctInfo(lc, rm, libsck.ConnectionRead)
if _, e = io.Copy(buf, cnn); e != nil {
o.fctError(e)
return nil, e
}
o.fctInfo(lc, rm, libsck.ConnectionCloseRead)
o.fctError(cnn.(*net.UnixConn).CloseRead())
return buf, nil
}

43
socket/config/client.go Normal file
View File

@@ -0,0 +1,43 @@
/*
* MIT License
*
* Copyright (c) 2022 Nicolas JUHEL
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
*
*/
package config
import (
libptc "github.com/nabbar/golib/network/protocol"
libsck "github.com/nabbar/golib/socket"
sckclt "github.com/nabbar/golib/socket/client"
)
type ClientConfig struct {
Network libptc.NetworkProtocol ``
Address string
ReadBuffSize int32
}
func (o ClientConfig) New() (libsck.Client, error) {
return sckclt.New(o.Network, o.ReadBuffSize, o.Address)
}

57
socket/config/server.go Normal file
View File

@@ -0,0 +1,57 @@
/*
* MIT License
*
* Copyright (c) 2022 Nicolas JUHEL
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
*
*/
package config
import (
"os"
"time"
libptc "github.com/nabbar/golib/network/protocol"
libsck "github.com/nabbar/golib/socket"
scksrv "github.com/nabbar/golib/socket/server"
)
type ServerConfig struct {
Network libptc.NetworkProtocol ``
Address string
PermFile os.FileMode
BuffSizeRead int32
BuffSizeWrite int32
TimeoutRead time.Duration
TimeoutWrite time.Duration
}
func (o ServerConfig) New(handler libsck.Handler) (libsck.Server, error) {
s, e := scksrv.New(handler, o.Network, o.BuffSizeRead, o.BuffSizeWrite, o.Address, o.PermFile)
if e != nil {
s.SetReadTimeout(o.TimeoutRead)
s.SetWriteTimeout(o.TimeoutWrite)
}
return s, e
}

72
socket/interface.go Normal file
View File

@@ -0,0 +1,72 @@
/*
* MIT License
*
* Copyright (c) 2022 Nicolas JUHEL
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
*
*/
package socket
import (
"context"
"io"
"net"
"time"
)
const DefaultBufferSize = 32 * 1024
type ConnState uint8
const (
ConnectionDial ConnState = iota
ConnectionNew
ConnectionRead
ConnectionCloseRead
ConnectionHandler
ConnectionWrite
ConnectionCloseWrite
ConnectionClose
)
type FuncError func(e error)
type FuncInfo func(local, remote net.Addr, state ConnState)
type Handler func(request io.Reader, response io.Writer)
type Server interface {
RegisterFuncError(f FuncError)
RegisterFuncInfo(f FuncInfo)
SetReadTimeout(d time.Duration)
SetWriteTimeout(d time.Duration)
Listen(ctx context.Context) error
Shutdown()
Done() <-chan struct{}
}
type Client interface {
RegisterFuncError(f FuncError)
RegisterFuncInfo(f FuncInfo)
Do(ctx context.Context, request io.Reader) (io.Reader, error)
}

View File

@@ -0,0 +1,64 @@
//go:build linux
// +build linux
/*
* MIT License
*
* Copyright (c) 2022 Nicolas JUHEL
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
*
*/
package server
import (
"fmt"
"os"
"runtime"
"strings"
libptc "github.com/nabbar/golib/network/protocol"
libsck "github.com/nabbar/golib/socket"
scksrt "github.com/nabbar/golib/socket/server/tcp"
scksru "github.com/nabbar/golib/socket/server/udp"
scksrx "github.com/nabbar/golib/socket/server/unix"
)
func New(handler libsck.Handler, proto libptc.NetworkProtocol, sizeBufferRead, sizeBufferWrite int32, address string, perm os.FileMode) (libsck.Server, error) {
switch proto {
case libptc.NetworkUnix:
if strings.EqualFold(runtime.GOOS, "linux") {
s := scksrx.New(handler, sizeBufferRead, sizeBufferWrite)
s.RegisterSocket(address, perm)
return s, nil
}
case libptc.NetworkTCP, libptc.NetworkTCP4, libptc.NetworkTCP6:
s := scksrt.New(handler, sizeBufferRead, sizeBufferWrite)
e := s.RegisterServer(address)
return s, e
case libptc.NetworkUDP, libptc.NetworkUDP4, libptc.NetworkUDP6:
s := scksru.New(handler, sizeBufferRead, sizeBufferWrite)
e := s.RegisterServer(address)
return s, e
}
return nil, fmt.Errorf("invalid server protocol")
}

View File

@@ -0,0 +1,55 @@
//go:build !linux
// +build !linux
/*
* MIT License
*
* Copyright (c) 2022 Nicolas JUHEL
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
*
*/
package server
import (
"fmt"
"os"
libptc "github.com/nabbar/golib/network/protocol"
libsck "github.com/nabbar/golib/socket"
scksrt "github.com/nabbar/golib/socket/server/tcp"
scksru "github.com/nabbar/golib/socket/server/udp"
)
func New(handler libsck.Handler, proto libptc.NetworkProtocol, sizeBufferRead, sizeBufferWrite int32, address string, perm os.FileMode) (libsck.Server, error) {
switch proto {
case libptc.NetworkTCP, libptc.NetworkTCP4, libptc.NetworkTCP6:
s := scksrt.New(handler, sizeBufferRead, sizeBufferWrite)
e := s.RegisterServer(address)
return s, e
case libptc.NetworkUDP, libptc.NetworkUDP4, libptc.NetworkUDP6:
s := scksru.New(handler, sizeBufferRead, sizeBufferWrite)
e := s.RegisterServer(address)
return s, e
}
return nil, fmt.Errorf("invalid server protocol")
}

View File

@@ -0,0 +1,37 @@
/*
* MIT License
*
* Copyright (c) 2022 Nicolas JUHEL
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
*
*/
package tcp
import "fmt"
var (
ErrInvalidAddress = fmt.Errorf("invalid listen address")
ErrInvalidHostName = fmt.Errorf("invalid server host name")
ErrInvalidHostPort = fmt.Errorf("invalid server host port")
ErrContextClosed = fmt.Errorf("context closed")
ErrServerClosed = fmt.Errorf("server closed")
)

View File

@@ -0,0 +1,68 @@
/*
* MIT License
*
* Copyright (c) 2022 Nicolas JUHEL
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
*
*/
package tcp
import (
"sync/atomic"
libsck "github.com/nabbar/golib/socket"
)
type ServerTcp interface {
libsck.Server
RegisterServer(address string) error
}
func New(h libsck.Handler, sizeBuffRead, sizeBuffWrite int32) ServerTcp {
c := new(atomic.Value)
c.Store(make(chan []byte))
s := new(atomic.Value)
s.Store(make(chan struct{}))
f := new(atomic.Value)
f.Store(h)
sr := new(atomic.Int32)
sr.Store(sizeBuffRead)
sw := new(atomic.Int32)
sw.Store(sizeBuffWrite)
return &srv{
l: nil,
h: f,
c: c,
s: s,
e: new(atomic.Value),
i: new(atomic.Value),
tr: new(atomic.Value),
tw: new(atomic.Value),
sr: sr,
sw: sw,
}
}

View File

@@ -0,0 +1,184 @@
/*
* MIT License
*
* Copyright (c) 2022 Nicolas JUHEL
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
*
*/
package tcp
import (
"bytes"
"context"
"io"
"net"
"net/url"
"time"
libptc "github.com/nabbar/golib/network/protocol"
libsck "github.com/nabbar/golib/socket"
)
func (o *srv) timeoutRead() time.Time {
v := o.tr.Load()
if v != nil {
return time.Now().Add(v.(time.Duration))
}
return time.Time{}
}
func (o *srv) timeoutWrite() time.Time {
v := o.tw.Load()
if v != nil {
return time.Now().Add(v.(time.Duration))
}
return time.Time{}
}
func (o *srv) buffRead() *bytes.Buffer {
v := o.sr.Load()
if v > 0 {
return bytes.NewBuffer(make([]byte, 0, int(v)))
}
return bytes.NewBuffer(make([]byte, 0, libsck.DefaultBufferSize))
}
func (o *srv) buffWrite() *bytes.Buffer {
v := o.sw.Load()
if v > 0 {
return bytes.NewBuffer(make([]byte, 0, int(v)))
}
return bytes.NewBuffer(make([]byte, 0, libsck.DefaultBufferSize))
}
func (o *srv) getAddress() *url.URL {
f := o.ad.Load()
if f != nil {
return f.(*url.URL)
}
return nil
}
func (o *srv) Listen(ctx context.Context) error {
var (
e error
l net.Listener
a = o.getAddress()
)
if a == nil {
return ErrInvalidAddress
} else if l, e = net.Listen(libptc.NetworkTCP.Code(), a.Host); e != nil {
return e
}
var fctClose = func() {
if l != nil {
o.fctError(l.Close())
}
}
defer fctClose()
// Accept new connection or stop if context or shutdown trigger
for {
select {
case <-ctx.Done():
return ErrContextClosed
case <-o.Done():
return nil
default:
// Accept an incoming connection.
if l == nil {
return ErrServerClosed
} else if co, ce := l.Accept(); ce != nil {
o.fctError(ce)
} else {
go o.Conn(co)
}
}
}
}
func (o *srv) Conn(conn net.Conn) {
defer o.fctError(conn.Close())
var (
lc = conn.LocalAddr()
rm = conn.RemoteAddr()
tr = o.timeoutRead()
tw = o.timeoutWrite()
br = o.buffRead()
bw = o.buffWrite()
)
defer o.fctInfo(lc, rm, libsck.ConnectionClose)
o.fctInfo(lc, rm, libsck.ConnectionNew)
if !tr.IsZero() {
if e := conn.SetReadDeadline(tr); e != nil {
o.fctError(e)
return
}
}
if !tw.IsZero() {
if e := conn.SetReadDeadline(tw); e != nil {
o.fctError(e)
return
}
}
o.fctInfo(lc, rm, libsck.ConnectionRead)
if _, e := io.Copy(br, conn); e != nil {
o.fctError(e)
return
}
o.fctInfo(lc, rm, libsck.ConnectionCloseRead)
if e := conn.(*net.TCPConn).CloseRead(); e != nil {
o.fctError(e)
return
}
if h := o.handler(); h != nil {
o.fctInfo(lc, rm, libsck.ConnectionHandler)
h(br, bw)
}
if bw.Len() > 0 {
o.fctInfo(lc, rm, libsck.ConnectionWrite)
if _, e := io.Copy(conn, bw); e != nil {
o.fctError(e)
}
}
o.fctInfo(lc, rm, libsck.ConnectionCloseWrite)
if e := conn.(*net.TCPConn).CloseWrite(); e != nil {
o.fctError(e)
}
}

169
socket/server/tcp/model.go Normal file
View File

@@ -0,0 +1,169 @@
/*
* MIT License
*
* Copyright (c) 2022 Nicolas JUHEL
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
*
*/
package tcp
import (
"net"
"net/url"
"strconv"
"sync/atomic"
"time"
libsck "github.com/nabbar/golib/socket"
)
var (
closedChanStruct chan struct{}
)
func init() {
closedChanStruct = make(chan struct{})
close(closedChanStruct)
}
type srv struct {
l net.Listener
h *atomic.Value // handler
c *atomic.Value // chan []byte
s *atomic.Value // chan struct{}
e *atomic.Value // function error
i *atomic.Value // function info
tr *atomic.Value // connection read timeout
tw *atomic.Value // connection write timeout
sr *atomic.Int32 // read buffer size
sw *atomic.Int32 // write buffer size
ad *atomic.Value // Server address url
}
func (o *srv) Done() <-chan struct{} {
s := o.s.Load()
if s != nil {
return s.(chan struct{})
}
return closedChanStruct
}
func (o *srv) Shutdown() {
if o == nil {
return
}
s := o.s.Load()
if s != nil {
o.s.Store(nil)
}
}
func (o *srv) RegisterFuncError(f libsck.FuncError) {
if o == nil {
return
}
o.e.Store(f)
}
func (o *srv) RegisterFuncInfo(f libsck.FuncInfo) {
if o == nil {
return
}
o.i.Store(f)
}
func (o *srv) SetReadTimeout(d time.Duration) {
if o == nil {
return
}
o.tr.Store(d)
}
func (o *srv) SetWriteTimeout(d time.Duration) {
if o == nil {
return
}
o.tw.Store(d)
}
func (o *srv) RegisterServer(address string) error {
var u = &url.URL{
Host: address,
}
if len(u.Hostname()) < 1 {
return ErrInvalidHostName
} else if len(u.Port()) < 1 {
return ErrInvalidHostPort
} else if i, e := strconv.Atoi(u.Port()); e != nil {
return e
} else if i < 1 || i > 65534 {
return ErrInvalidHostPort
}
o.ad.Store(u)
return nil
}
func (o *srv) fctError(e error) {
if o == nil {
return
}
v := o.e.Load()
if v != nil {
v.(libsck.FuncError)(e)
}
}
func (o *srv) fctInfo(local, remote net.Addr, state libsck.ConnState) {
if o == nil {
return
}
v := o.i.Load()
if v != nil {
v.(libsck.FuncInfo)(local, remote, state)
}
}
func (o *srv) handler() libsck.Handler {
if o == nil {
return nil
}
v := o.h.Load()
if v != nil {
return v.(libsck.Handler)
}
return nil
}

View File

@@ -0,0 +1,37 @@
/*
* MIT License
*
* Copyright (c) 2022 Nicolas JUHEL
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
*
*/
package udp
import "fmt"
var (
ErrInvalidAddress = fmt.Errorf("invalid listen address")
ErrInvalidHostName = fmt.Errorf("invalid server host name")
ErrInvalidHostPort = fmt.Errorf("invalid server host port")
ErrContextClosed = fmt.Errorf("context closed")
ErrServerClosed = fmt.Errorf("server closed")
)

View File

@@ -0,0 +1,68 @@
/*
* MIT License
*
* Copyright (c) 2022 Nicolas JUHEL
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
*
*/
package udp
import (
"sync/atomic"
libsck "github.com/nabbar/golib/socket"
)
type ServerTcp interface {
libsck.Server
RegisterServer(address string) error
}
func New(h libsck.Handler, sizeBuffRead, sizeBuffWrite int32) ServerTcp {
c := new(atomic.Value)
c.Store(make(chan []byte))
s := new(atomic.Value)
s.Store(make(chan struct{}))
f := new(atomic.Value)
f.Store(h)
sr := new(atomic.Int32)
sr.Store(sizeBuffRead)
sw := new(atomic.Int32)
sw.Store(sizeBuffWrite)
return &srv{
l: nil,
h: f,
c: c,
s: s,
e: new(atomic.Value),
i: new(atomic.Value),
tr: new(atomic.Value),
tw: new(atomic.Value),
sr: sr,
sw: sw,
}
}

View File

@@ -0,0 +1,166 @@
/*
* MIT License
*
* Copyright (c) 2022 Nicolas JUHEL
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
*
*/
package udp
import (
"bytes"
"context"
"io"
"net"
"net/url"
"time"
libptc "github.com/nabbar/golib/network/protocol"
libsck "github.com/nabbar/golib/socket"
)
func (o *srv) timeoutRead() time.Time {
v := o.tr.Load()
if v != nil {
return time.Now().Add(v.(time.Duration))
}
return time.Time{}
}
func (o *srv) timeoutWrite() time.Time {
v := o.tw.Load()
if v != nil {
return time.Now().Add(v.(time.Duration))
}
return time.Time{}
}
func (o *srv) buffRead() *bytes.Buffer {
v := o.sr.Load()
if v > 0 {
return bytes.NewBuffer(make([]byte, 0, int(v)))
}
return bytes.NewBuffer(make([]byte, 0, libsck.DefaultBufferSize))
}
func (o *srv) buffWrite() *bytes.Buffer {
v := o.sw.Load()
if v > 0 {
return bytes.NewBuffer(make([]byte, 0, int(v)))
}
return bytes.NewBuffer(make([]byte, 0, libsck.DefaultBufferSize))
}
func (o *srv) getAddress() *url.URL {
f := o.ad.Load()
if f != nil {
return f.(*url.URL)
}
return nil
}
func (o *srv) Listen(ctx context.Context) error {
var (
e error
l net.Listener
a = o.getAddress()
)
if a == nil {
return ErrInvalidAddress
} else if l, e = net.Listen(libptc.NetworkUDP.Code(), a.Host); e != nil {
return e
}
var fctClose = func() {
if l != nil {
o.fctError(l.Close())
}
}
defer fctClose()
// Accept new connection or stop if context or shutdown trigger
for {
select {
case <-ctx.Done():
return ErrContextClosed
case <-o.Done():
return nil
default:
// Accept an incoming connection.
if l == nil {
return ErrServerClosed
} else if co, ce := l.Accept(); ce != nil {
o.fctError(ce)
} else {
go o.Conn(co)
}
}
}
}
func (o *srv) Conn(conn net.Conn) {
defer o.fctError(conn.Close())
var (
lc = conn.LocalAddr()
rm = conn.RemoteAddr()
tr = o.timeoutRead()
tw = o.timeoutWrite()
br = o.buffRead()
bw = io.Discard
)
defer o.fctInfo(lc, rm, libsck.ConnectionClose)
o.fctInfo(lc, rm, libsck.ConnectionNew)
if !tr.IsZero() {
if e := conn.SetReadDeadline(tr); e != nil {
o.fctError(e)
return
}
}
if !tw.IsZero() {
if e := conn.SetReadDeadline(tw); e != nil {
o.fctError(e)
return
}
}
o.fctInfo(lc, rm, libsck.ConnectionRead)
if _, e := io.Copy(br, conn); e != nil {
o.fctError(e)
return
}
if h := o.handler(); h != nil {
o.fctInfo(lc, rm, libsck.ConnectionHandler)
h(br, bw)
}
}

169
socket/server/udp/model.go Normal file
View File

@@ -0,0 +1,169 @@
/*
* MIT License
*
* Copyright (c) 2022 Nicolas JUHEL
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
*
*/
package udp
import (
"net"
"net/url"
"strconv"
"sync/atomic"
"time"
libsck "github.com/nabbar/golib/socket"
)
var (
closedChanStruct chan struct{}
)
func init() {
closedChanStruct = make(chan struct{})
close(closedChanStruct)
}
type srv struct {
l net.Listener
h *atomic.Value // handler
c *atomic.Value // chan []byte
s *atomic.Value // chan struct{}
e *atomic.Value // function error
i *atomic.Value // function info
tr *atomic.Value // connection read timeout
tw *atomic.Value // connection write timeout
sr *atomic.Int32 // read buffer size
sw *atomic.Int32 // write buffer size
ad *atomic.Value // Server address url
}
func (o *srv) Done() <-chan struct{} {
s := o.s.Load()
if s != nil {
return s.(chan struct{})
}
return closedChanStruct
}
func (o *srv) Shutdown() {
if o == nil {
return
}
s := o.s.Load()
if s != nil {
o.s.Store(nil)
}
}
func (o *srv) RegisterFuncError(f libsck.FuncError) {
if o == nil {
return
}
o.e.Store(f)
}
func (o *srv) RegisterFuncInfo(f libsck.FuncInfo) {
if o == nil {
return
}
o.i.Store(f)
}
func (o *srv) SetReadTimeout(d time.Duration) {
if o == nil {
return
}
o.tr.Store(d)
}
func (o *srv) SetWriteTimeout(d time.Duration) {
if o == nil {
return
}
o.tw.Store(d)
}
func (o *srv) RegisterServer(address string) error {
var u = &url.URL{
Host: address,
}
if len(u.Hostname()) < 1 {
return ErrInvalidHostName
} else if len(u.Port()) < 1 {
return ErrInvalidHostPort
} else if i, e := strconv.Atoi(u.Port()); e != nil {
return e
} else if i < 1 || i > 65534 {
return ErrInvalidHostPort
}
o.ad.Store(u)
return nil
}
func (o *srv) fctError(e error) {
if o == nil {
return
}
v := o.e.Load()
if v != nil {
v.(libsck.FuncError)(e)
}
}
func (o *srv) fctInfo(local, remote net.Addr, state libsck.ConnState) {
if o == nil {
return
}
v := o.i.Load()
if v != nil {
v.(libsck.FuncInfo)(local, remote, state)
}
}
func (o *srv) handler() libsck.Handler {
if o == nil {
return nil
}
v := o.h.Load()
if v != nil {
return v.(libsck.Handler)
}
return nil
}

View File

@@ -0,0 +1,37 @@
//go:build linux
// +build linux
/*
* MIT License
*
* Copyright (c) 2022 Nicolas JUHEL
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
*
*/
package unix
import "fmt"
var (
ErrContextClosed = fmt.Errorf("context closed")
ErrServerClosed = fmt.Errorf("server closed")
)

View File

@@ -24,4 +24,4 @@
* *
*/ */
package socket package unix

View File

@@ -27,30 +27,21 @@
* *
*/ */
package socket package unix
import ( import (
"context"
"io"
"os" "os"
"sync/atomic" "sync/atomic"
"time"
libsck "github.com/nabbar/golib/socket"
) )
type FuncError func(e error) type ServerUnix interface {
type Handler func(request io.Reader, response io.Writer) libsck.Server
RegisterSocket(unixFile string, perm os.FileMode)
type Server interface {
RegisterFuncError(f FuncError)
SetReadTimeout(d time.Duration)
SetWriteTimeout(d time.Duration)
Listen(ctx context.Context, unixFile string, perm os.FileMode)
Shutdown()
Done() <-chan struct{}
} }
func New(h Handler, sizeBuffRead, sizeBuffWrite int32) Server { func New(h libsck.Handler, sizeBuffRead, sizeBuffWrite int32) ServerUnix {
c := new(atomic.Value) c := new(atomic.Value)
c.Store(make(chan []byte)) c.Store(make(chan []byte))
@@ -66,15 +57,24 @@ func New(h Handler, sizeBuffRead, sizeBuffWrite int32) Server {
sw := new(atomic.Int32) sw := new(atomic.Int32)
sw.Store(sizeBuffWrite) sw.Store(sizeBuffWrite)
fp := new(atomic.Value)
fp.Store("")
pe := new(atomic.Int64)
pe.Store(0)
return &srv{ return &srv{
l: nil, l: nil,
h: f, h: f,
c: c, c: c,
s: s, s: s,
f: new(atomic.Value), e: new(atomic.Value),
i: new(atomic.Value),
tr: new(atomic.Value), tr: new(atomic.Value),
tw: new(atomic.Value), tw: new(atomic.Value),
sr: sr, sr: sr,
sw: sw, sw: sw,
fs: fp,
fp: pe,
} }
} }

View File

@@ -27,7 +27,7 @@
* *
*/ */
package socket package unix
import ( import (
"bytes" "bytes"
@@ -41,6 +41,9 @@ import (
"path/filepath" "path/filepath"
"syscall" "syscall"
"time" "time"
libptc "github.com/nabbar/golib/network/protocol"
libsck "github.com/nabbar/golib/socket"
) )
func (o *srv) timeoutRead() time.Time { func (o *srv) timeoutRead() time.Time {
@@ -67,7 +70,7 @@ func (o *srv) buffRead() *bytes.Buffer {
return bytes.NewBuffer(make([]byte, 0, int(v))) return bytes.NewBuffer(make([]byte, 0, int(v)))
} }
return bytes.NewBuffer(make([]byte, 0, 32*1024)) return bytes.NewBuffer(make([]byte, 0, libsck.DefaultBufferSize))
} }
func (o *srv) buffWrite() *bytes.Buffer { func (o *srv) buffWrite() *bytes.Buffer {
@@ -76,7 +79,25 @@ func (o *srv) buffWrite() *bytes.Buffer {
return bytes.NewBuffer(make([]byte, 0, int(v))) return bytes.NewBuffer(make([]byte, 0, int(v)))
} }
return bytes.NewBuffer(make([]byte, 0, 32*1024)) return bytes.NewBuffer(make([]byte, 0, libsck.DefaultBufferSize))
}
func (o *srv) getSocketFile() (string, error) {
f := o.fs.Load()
if f != nil {
return o.checkFile(f.(string))
}
return "", os.ErrNotExist
}
func (o *srv) getSocketPerm() os.FileMode {
p := o.fp.Load()
if p > 0 {
return os.FileMode(p)
}
return os.FileMode(0770)
} }
func (o *srv) checkFile(unixFile string) (string, error) { func (o *srv) checkFile(unixFile string) (string, error) {
@@ -97,23 +118,24 @@ func (o *srv) checkFile(unixFile string) (string, error) {
return unixFile, nil return unixFile, nil
} }
func (o *srv) Listen(ctx context.Context, unixFile string, perm os.FileMode) { func (o *srv) Listen(ctx context.Context) error {
var ( var (
e error e error
i fs.FileInfo i fs.FileInfo
l net.Listener l net.Listener
perm = o.getSocketPerm()
unixFile string
p = syscall.Umask(int(perm)) p = syscall.Umask(int(perm))
) )
if unixFile, e = o.checkFile(unixFile); e != nil { if unixFile, e = o.getSocketFile(); e != nil {
o.fctError(e) return e
return } else if l, e = net.Listen(libptc.NetworkUnix.Code(), unixFile); e != nil {
} else if l, e = net.Listen("unix", unixFile); e != nil { return e
o.fctError(e)
return
} else if i, e = os.Stat(unixFile); e != nil { } else if i, e = os.Stat(unixFile); e != nil {
o.fctError(e) return e
return
} }
syscall.Umask(p) syscall.Umask(p)
@@ -132,7 +154,7 @@ func (o *srv) Listen(ctx context.Context, unixFile string, perm os.FileMode) {
if i.Mode() != perm { if i.Mode() != perm {
if e = os.Chmod(unixFile, perm); e != nil { if e = os.Chmod(unixFile, perm); e != nil {
o.fctError(e) return e
} }
} }
@@ -140,13 +162,13 @@ func (o *srv) Listen(ctx context.Context, unixFile string, perm os.FileMode) {
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return ErrContextClosed
case <-o.Done(): case <-o.Done():
return return nil
default: default:
// Accept an incoming connection. // Accept an incoming connection.
if l == nil { if l == nil {
return return ErrServerClosed
} else if co, ce := l.Accept(); ce != nil { } else if co, ce := l.Accept(); ce != nil {
o.fctError(ce) o.fctError(ce)
} else { } else {
@@ -157,15 +179,21 @@ func (o *srv) Listen(ctx context.Context, unixFile string, perm os.FileMode) {
} }
func (o *srv) Conn(conn net.Conn) { func (o *srv) Conn(conn net.Conn) {
defer conn.Close() defer o.fctError(conn.Close())
var ( var (
lc = conn.LocalAddr()
rm = conn.RemoteAddr()
tr = o.timeoutRead() tr = o.timeoutRead()
tw = o.timeoutWrite() tw = o.timeoutWrite()
br = o.buffRead() br = o.buffRead()
bw = o.buffWrite() bw = o.buffWrite()
) )
defer o.fctInfo(lc, rm, libsck.ConnectionClose)
o.fctInfo(lc, rm, libsck.ConnectionNew)
if !tr.IsZero() { if !tr.IsZero() {
if e := conn.SetReadDeadline(tr); e != nil { if e := conn.SetReadDeadline(tr); e != nil {
o.fctError(e) o.fctError(e)
@@ -180,23 +208,34 @@ func (o *srv) Conn(conn net.Conn) {
} }
} }
o.fctInfo(lc, rm, libsck.ConnectionRead)
if _, e := io.Copy(br, conn); e != nil { if _, e := io.Copy(br, conn); e != nil {
o.fctError(e) o.fctError(e)
return return
} else if e = conn.(*net.UnixConn).CloseRead(); e != nil { }
o.fctInfo(lc, rm, libsck.ConnectionCloseRead)
if e := conn.(*net.UnixConn).CloseRead(); e != nil {
o.fctError(e) o.fctError(e)
return return
} }
if h := o.handler(); h != nil { if h := o.handler(); h != nil {
o.fctInfo(lc, rm, libsck.ConnectionHandler)
h(br, bw) h(br, bw)
} }
if _, e := io.Copy(conn, bw); e != nil { if bw.Len() > 0 {
o.fctInfo(lc, rm, libsck.ConnectionWrite)
if _, e := io.Copy(conn, bw); e != nil {
o.fctError(e)
}
}
o.fctInfo(lc, rm, libsck.ConnectionCloseWrite)
if e := conn.(*net.UnixConn).CloseWrite(); e != nil {
o.fctError(e) o.fctError(e)
return
} else if e = conn.(*net.UnixConn).CloseWrite(); e != nil {
o.fctError(e)
return
} }
} }

View File

@@ -27,17 +27,15 @@
* *
*/ */
package socket package unix
import ( import (
"net" "net"
"os"
"sync/atomic" "sync/atomic"
"time" "time"
)
const ( libsck "github.com/nabbar/golib/socket"
defaultTimeoutRead = time.Second
defaultTimeoutWrite = 5 * time.Second
) )
var ( var (
@@ -55,12 +53,15 @@ type srv struct {
h *atomic.Value // handler h *atomic.Value // handler
c *atomic.Value // chan []byte c *atomic.Value // chan []byte
s *atomic.Value // chan struct{} s *atomic.Value // chan struct{}
f *atomic.Value // function error e *atomic.Value // function error
i *atomic.Value // function info
tr *atomic.Value // connection read timeout tr *atomic.Value // connection read timeout
tw *atomic.Value // connection write timeout tw *atomic.Value // connection write timeout
sr *atomic.Int32 // read buffer size sr *atomic.Int32 // read buffer size
sw *atomic.Int32 // write buffer size sw *atomic.Int32 // write buffer size
fs *atomic.Value // file unix socket
fp *atomic.Int64 // file unix perm
} }
func (o *srv) Done() <-chan struct{} { func (o *srv) Done() <-chan struct{} {
@@ -83,12 +84,20 @@ func (o *srv) Shutdown() {
} }
} }
func (o *srv) RegisterFuncError(f FuncError) { func (o *srv) RegisterFuncError(f libsck.FuncError) {
if o == nil { if o == nil {
return return
} }
o.f.Store(f) o.e.Store(f)
}
func (o *srv) RegisterFuncInfo(f libsck.FuncInfo) {
if o == nil {
return
}
o.i.Store(f)
} }
func (o *srv) SetReadTimeout(d time.Duration) { func (o *srv) SetReadTimeout(d time.Duration) {
@@ -107,25 +116,41 @@ func (o *srv) SetWriteTimeout(d time.Duration) {
o.tw.Store(d) o.tw.Store(d)
} }
func (o *srv) RegisterSocket(unixFile string, perm os.FileMode) {
o.fs.Store(unixFile)
o.fp.Store(int64(perm))
}
func (o *srv) fctError(e error) { func (o *srv) fctError(e error) {
if o == nil { if o == nil {
return return
} }
v := o.f.Load() v := o.e.Load()
if v != nil { if v != nil {
v.(FuncError)(e) v.(libsck.FuncError)(e)
} }
} }
func (o *srv) handler() Handler { func (o *srv) fctInfo(local, remote net.Addr, state libsck.ConnState) {
if o == nil {
return
}
v := o.i.Load()
if v != nil {
v.(libsck.FuncInfo)(local, remote, state)
}
}
func (o *srv) handler() libsck.Handler {
if o == nil { if o == nil {
return nil return nil
} }
v := o.h.Load() v := o.h.Load()
if v != nil { if v != nil {
return v.(Handler) return v.(libsck.Handler)
} }
return nil return nil