Package Socket/Server:

- fix UDP as kernel SOCKGRAM
- add unixgram as kernel SOCKGRAM for local socket file
- optimize parallel statment
- optimize code / minor bugs fix
- add IsRunning function
- add timeout shutdown based on periodicly check of IsRunning
- fix close channel before send empty struct to stop process
- optimize clean shutdown
- optimize connection process to use less resources and lock pipeline less time
This commit is contained in:
Nicolas JUHEL
2024-02-05 15:03:58 +01:00
parent f7fd908a14
commit c1fc4e816e
17 changed files with 464 additions and 164 deletions

View File

@@ -35,6 +35,7 @@ import (
)
const DefaultBufferSize = 32 * 1024
const EOL byte = '\n'
type ConnState uint8
@@ -79,13 +80,16 @@ type Handler func(request io.Reader, response io.Writer)
type Response func(r io.Reader)
type Server interface {
io.Closer
RegisterFuncError(f FuncError)
RegisterFuncInfo(f FuncInfo)
RegisterFuncInfoServer(f FuncInfoSrv)
SetTLS(enable bool, config libtls.TLSConfig) error
Listen(ctx context.Context) error
Shutdown()
Shutdown() error
IsRunning() bool
Done() <-chan struct{}
}

View File

@@ -30,8 +30,9 @@ import "fmt"
var (
ErrInvalidAddress = fmt.Errorf("invalid listen address")
ErrInvalidHostName = fmt.Errorf("invalid server host name")
ErrInvalidHostPort = fmt.Errorf("invalid server host port")
ErrContextClosed = fmt.Errorf("context closed")
ErrServerClosed = fmt.Errorf("server closed")
ErrInvalidHandler = fmt.Errorf("invalid handler")
ErrShutdownTimeout = fmt.Errorf("timeout on stopping socket")
ErrInvalidInstance = fmt.Errorf("invalid socket instance")
)

View File

@@ -57,6 +57,7 @@ func New(h libsck.Handler, sizeBuffRead libsiz.Size) ServerTcp {
h: f,
c: c,
s: s,
r: new(atomic.Bool),
fe: new(atomic.Value),
fi: new(atomic.Value),
fs: new(atomic.Value),

View File

@@ -33,6 +33,7 @@ import (
"crypto/tls"
"io"
"net"
"sync/atomic"
libptc "github.com/nabbar/golib/network/protocol"
libsck "github.com/nabbar/golib/socket"
@@ -80,46 +81,69 @@ func (o *srv) Listen(ctx context.Context) error {
e error
l net.Listener
a = o.getAddress()
s = new(atomic.Bool)
)
var fctClose = func() {
o.fctInfoSrv("closing listen socket '%s %s'", libptc.NetworkTCP.String(), a)
if l != nil {
o.fctError(l.Close())
}
}
defer fctClose()
if len(a) == 0 {
return ErrInvalidAddress
} else if hdl := o.handler(); hdl == nil {
return ErrInvalidHandler
} else if l, e = o.getListen(a); e != nil {
o.fctError(e)
return e
}
var fctClose = func() {
o.fctInfoSrv("closing listen socket '%s %s'", libptc.NetworkTCP.String(), a)
if l != nil {
_ = l.Close()
}
o.r.Store(false)
}
defer fctClose()
s.Store(false)
go func() {
<-ctx.Done()
go func() {
_ = o.Shutdown()
}()
return
}()
go func() {
<-o.Done()
e = nil
s.Store(true)
if l != nil {
o.fctError(l.Close())
}
return
}()
o.r.Store(true)
// Accept new connection or stop if context or shutdown trigger
for {
select {
case <-ctx.Done():
return ErrContextClosed
case <-o.Done():
return nil
default:
// Accept an incoming connection.
if l == nil {
return ErrServerClosed
} else if s.Load() {
return e
}
co, ce := l.Accept()
if ce != nil {
if co, ce := l.Accept(); ce != nil && !s.Load() {
o.fctError(ce)
} else {
o.fctInfo(co.LocalAddr(), co.RemoteAddr(), libsck.ConnectionNew)
go o.Conn(co)
}
}
}
}
func (o *srv) Conn(conn net.Conn) {
@@ -128,12 +152,10 @@ func (o *srv) Conn(conn net.Conn) {
_ = conn.Close()
}()
o.fctInfo(conn.LocalAddr(), conn.RemoteAddr(), libsck.ConnectionNew)
var (
err error
rdr = bufio.NewReaderSize(conn, o.buffSize())
buf []byte
msg []byte
hdl libsck.Handler
)
@@ -142,7 +164,7 @@ func (o *srv) Conn(conn net.Conn) {
}
for {
buf, err = rdr.ReadBytes('\n')
msg, err = rdr.ReadBytes('\n')
o.fctInfo(conn.LocalAddr(), conn.RemoteAddr(), libsck.ConnectionRead)
if err != nil {
@@ -152,7 +174,13 @@ func (o *srv) Conn(conn net.Conn) {
break
}
var buf = bytes.NewBuffer(msg)
if !bytes.HasSuffix(msg, []byte{libsck.EOL}) {
buf.Write([]byte{libsck.EOL})
}
o.fctInfo(conn.LocalAddr(), conn.RemoteAddr(), libsck.ConnectionHandler)
hdl(bytes.NewBuffer(buf), conn)
hdl(buf, conn)
}
}

View File

@@ -27,10 +27,12 @@
package tcp
import (
"context"
"crypto/tls"
"fmt"
"net"
"sync/atomic"
"time"
libptc "github.com/nabbar/golib/network/protocol"
@@ -58,6 +60,7 @@ type srv struct {
h *atomic.Value // handler
c *atomic.Value // chan []byte
s *atomic.Value // chan struct{}
r *atomic.Bool // is Running
fe *atomic.Value // function error
fi *atomic.Value // function info
@@ -67,6 +70,10 @@ type srv struct {
ad *atomic.Value // Server address url
}
func (o *srv) IsRunning() bool {
return o.r.Load()
}
func (o *srv) Done() <-chan struct{} {
s := o.s.Load()
if s != nil {
@@ -76,14 +83,47 @@ func (o *srv) Done() <-chan struct{} {
return closedChanStruct
}
func (o *srv) Shutdown() {
func (o *srv) Close() error {
return o.Shutdown()
}
func (o *srv) Shutdown() error {
if o == nil {
return
return ErrInvalidInstance
}
s := o.s.Load()
if s != nil {
o.s.Store(nil)
if c, k := s.(chan struct{}); k {
c <- struct{}{}
o.s.Store(c)
}
}
var (
tck = time.NewTicker(100 * time.Millisecond)
ctx, cnl = context.WithTimeout(context.Background(), 25*time.Second)
)
defer func() {
if s != nil {
o.s.Store(closedChanStruct)
}
tck.Stop()
cnl()
}()
for {
select {
case <-ctx.Done():
return ErrShutdownTimeout
case <-tck.C:
if o.IsRunning() {
continue
}
return nil
}
}
}

View File

@@ -33,4 +33,6 @@ var (
ErrContextClosed = fmt.Errorf("context closed")
ErrServerClosed = fmt.Errorf("server closed")
ErrInvalidHandler = fmt.Errorf("invalid handler")
ErrShutdownTimeout = fmt.Errorf("timeout on stopping socket")
ErrInvalidInstance = fmt.Errorf("invalid socket instance")
)

View File

@@ -56,6 +56,7 @@ func New(h libsck.Handler, sizeBuffRead libsiz.Size) ServerTcp {
h: f,
c: c,
s: s,
r: new(atomic.Bool),
fe: new(atomic.Value),
fi: new(atomic.Value),
fs: new(atomic.Value),

View File

@@ -31,6 +31,7 @@ import (
"context"
"io"
"net"
"sync/atomic"
libptc "github.com/nabbar/golib/network/protocol"
libsck "github.com/nabbar/golib/socket"
@@ -78,6 +79,7 @@ func (o *srv) Listen(ctx context.Context) error {
err error
nbr int
loc *net.UDPAddr
stp = new(atomic.Bool)
rem net.Addr
con *net.UDPConn
adr = o.getAddress()
@@ -92,37 +94,63 @@ func (o *srv) Listen(ctx context.Context) error {
return err
}
var fctClose = func() {
o.fctInfoSrv("closing listen socket '%s %s'", libptc.NetworkUDP.String(), adr)
if con != nil {
o.fctError(con.Close())
}
}
defer fctClose()
if con, err = net.ListenUDP(libptc.NetworkUDP.Code(), loc); err != nil {
o.fctError(err)
return err
} else {
o.fctInfoSrv("starting listening socket '%s %s'", libptc.NetworkUDP.String(), adr)
}
var fctClose = func() {
o.fctInfoSrv("closing listen socket '%s %s'", libptc.NetworkUDP.String(), adr)
if con != nil {
_ = con.Close()
}
o.r.Store(false)
}
defer fctClose()
stp.Store(false)
go func() {
<-ctx.Done()
go func() {
_ = o.Shutdown()
}()
return
}()
go func() {
<-o.Done()
err = nil
stp.Store(true)
if con != nil {
o.fctError(con.Close())
}
return
}()
o.r.Store(true)
o.fctInfoSrv("starting listening socket '%s %s'", libptc.NetworkUDP.String(), adr)
// Accept new connection or stop if context or shutdown trigger
for {
select {
case <-ctx.Done():
return ErrContextClosed
case <-o.Done():
return nil
default:
// Accept an incoming connection.
if con == nil {
return ErrServerClosed
} else if stp.Load() {
return err
}
var buf = make([]byte, o.buffSize())
nbr, rem, err = con.ReadFrom(buf)
var (
buf = make([]byte, o.buffSize())
rer error
)
nbr, rem, rer = con.ReadFrom(buf)
if rem == nil {
rem = &net.UDPAddr{}
@@ -130,17 +158,22 @@ func (o *srv) Listen(ctx context.Context) error {
o.fctInfo(loc, rem, libsck.ConnectionRead)
if err != nil {
o.fctError(err)
if rer != nil {
if !stp.Load() {
o.fctError(rer)
}
continue
}
go func(la, ra net.Addr, b []byte) {
o.fctInfo(la, ra, libsck.ConnectionHandler)
r := bytes.NewBuffer(b)
r.WriteString("\n")
if !bytes.HasSuffix(b, []byte{libsck.EOL}) {
r.Write([]byte{libsck.EOL})
}
hdl(r, io.Discard)
}(loc, rem, buf[:nbr])
}
}
}

View File

@@ -27,9 +27,11 @@
package udp
import (
"context"
"fmt"
"net"
"sync/atomic"
"time"
libptc "github.com/nabbar/golib/network/protocol"
@@ -52,6 +54,7 @@ type srv struct {
h *atomic.Value // handler
c *atomic.Value // chan []byte
s *atomic.Value // chan struct{}
r *atomic.Bool // is Running
fe *atomic.Value // function error
fi *atomic.Value // function info
@@ -61,6 +64,10 @@ type srv struct {
ad *atomic.Value // Server address url
}
func (o *srv) IsRunning() bool {
return o.r.Load()
}
func (o *srv) Done() <-chan struct{} {
s := o.s.Load()
if s != nil {
@@ -70,14 +77,47 @@ func (o *srv) Done() <-chan struct{} {
return closedChanStruct
}
func (o *srv) Shutdown() {
func (o *srv) Close() error {
return o.Shutdown()
}
func (o *srv) Shutdown() error {
if o == nil {
return
return ErrInvalidInstance
}
s := o.s.Load()
if s != nil {
o.s.Store(nil)
if c, k := s.(chan struct{}); k {
c <- struct{}{}
o.s.Store(c)
}
}
var (
tck = time.NewTicker(100 * time.Millisecond)
ctx, cnl = context.WithTimeout(context.Background(), 25*time.Second)
)
defer func() {
if s != nil {
o.s.Store(closedChanStruct)
}
tck.Stop()
cnl()
}()
for {
select {
case <-ctx.Done():
return ErrShutdownTimeout
case <-tck.C:
if o.IsRunning() {
continue
}
return nil
}
}
}

View File

@@ -36,4 +36,6 @@ var (
ErrServerClosed = fmt.Errorf("server closed")
ErrInvalidGroup = fmt.Errorf("invalid unix group for socket group permission")
ErrInvalidHandler = fmt.Errorf("invalid handler")
ErrShutdownTimeout = fmt.Errorf("timeout on stopping socket")
ErrInvalidInstance = fmt.Errorf("invalid socket instance")
)

View File

@@ -75,6 +75,7 @@ func New(h libsck.Handler, sizeBuffRead libsiz.Size) ServerUnix {
h: f,
c: c,
s: s,
r: new(atomic.Bool),
fe: new(atomic.Value),
fi: new(atomic.Value),
fs: new(atomic.Value),

View File

@@ -40,6 +40,7 @@ import (
"net"
"os"
"path/filepath"
"sync/atomic"
"syscall"
libptc "github.com/nabbar/golib/network/protocol"
@@ -156,47 +157,74 @@ func (o *srv) Listen(ctx context.Context) error {
e error
f string
l net.Listener
s = new(atomic.Bool)
)
if f, e = o.getSocketFile(); e != nil {
return e
}
var fctClose = func() {
if l != nil {
o.fctError(l.Close())
}
if _, e = os.Stat(f); e == nil {
o.fctError(os.Remove(f))
}
} else if hdl := o.handler(); hdl == nil {
return ErrInvalidHandler
}
if l, e = o.getListen(f); e != nil {
return e
}
defer fctClose()
var fctClose = func() {
o.fctInfoSrv("closing listen socket '%s %s'", libptc.NetworkUnixGram.String(), f)
if l != nil {
_ = l.Close()
}
if _, e = os.Stat(f); e == nil {
o.fctError(os.Remove(f))
}
o.r.Store(false)
}
defer fctClose()
s.Store(false)
go func() {
<-ctx.Done()
go func() {
_ = o.Shutdown()
}()
return
}()
go func() {
<-o.Done()
e = nil
s.Store(true)
if l != nil {
o.fctError(l.Close())
}
return
}()
o.r.Store(true)
// Accept new connection or stop if context or shutdown trigger
for {
select {
case <-ctx.Done():
return ErrContextClosed
case <-o.Done():
return nil
default:
// Accept an incoming connection.
if l == nil {
return ErrServerClosed
} else if co, ce := l.Accept(); ce != nil {
} else if s.Load() {
return e
}
if co, ce := l.Accept(); ce != nil && !s.Load() {
o.fctError(ce)
} else {
o.fctInfo(co.LocalAddr(), co.RemoteAddr(), libsck.ConnectionNew)
go o.Conn(co)
}
}
}
}
func (o *srv) Conn(con net.Conn) {
@@ -205,12 +233,10 @@ func (o *srv) Conn(con net.Conn) {
_ = con.Close()
}()
o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionNew)
var (
err error
rdr = bufio.NewReaderSize(con, o.buffSize())
buf []byte
msg []byte
hdl libsck.Handler
)
@@ -219,7 +245,7 @@ func (o *srv) Conn(con net.Conn) {
}
for {
buf, err = rdr.ReadBytes('\n')
msg, err = rdr.ReadBytes('\n')
o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionRead)
if err != nil {
@@ -229,7 +255,13 @@ func (o *srv) Conn(con net.Conn) {
break
}
var buf = bytes.NewBuffer(msg)
if !bytes.HasSuffix(msg, []byte{libsck.EOL}) {
buf.Write([]byte{libsck.EOL})
}
o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionHandler)
hdl(bytes.NewBuffer(buf), con)
hdl(buf, con)
}
}

View File

@@ -30,10 +30,12 @@
package unix
import (
"context"
"fmt"
"net"
"os"
"sync/atomic"
"time"
libptc "github.com/nabbar/golib/network/protocol"
@@ -56,6 +58,7 @@ type srv struct {
h *atomic.Value // handler
c *atomic.Value // chan []byte
s *atomic.Value // chan struct{}
r *atomic.Bool // is Running
fe *atomic.Value // function error
fi *atomic.Value // function info
@@ -67,6 +70,10 @@ type srv struct {
sg *atomic.Int32 // file unix group perm
}
func (o *srv) IsRunning() bool {
return o.r.Load()
}
func (o *srv) Done() <-chan struct{} {
s := o.s.Load()
if s != nil {
@@ -76,14 +83,47 @@ func (o *srv) Done() <-chan struct{} {
return closedChanStruct
}
func (o *srv) Shutdown() {
func (o *srv) Close() error {
return o.Shutdown()
}
func (o *srv) Shutdown() error {
if o == nil {
return
return ErrInvalidInstance
}
s := o.s.Load()
if s != nil {
o.s.Store(nil)
if c, k := s.(chan struct{}); k {
c <- struct{}{}
o.s.Store(c)
}
}
var (
tck = time.NewTicker(100 * time.Millisecond)
ctx, cnl = context.WithTimeout(context.Background(), 25*time.Second)
)
defer func() {
if s != nil {
o.s.Store(closedChanStruct)
}
tck.Stop()
cnl()
}()
for {
select {
case <-ctx.Done():
return ErrShutdownTimeout
case <-tck.C:
if o.IsRunning() {
continue
}
return nil
}
}
}

View File

@@ -36,4 +36,6 @@ var (
ErrServerClosed = fmt.Errorf("server closed")
ErrInvalidGroup = fmt.Errorf("invalid unix group for socket group permission")
ErrInvalidHandler = fmt.Errorf("invalid handler")
ErrShutdownTimeout = fmt.Errorf("timeout on stopping socket")
ErrInvalidInstance = fmt.Errorf("invalid socket instance")
)

View File

@@ -75,6 +75,7 @@ func New(h libsck.Handler, sizeBuffRead libsiz.Size) ServerUnixGram {
h: f,
c: c,
s: s,
r: new(atomic.Bool),
fe: new(atomic.Value),
fi: new(atomic.Value),
fs: new(atomic.Value),

View File

@@ -39,6 +39,7 @@ import (
"net"
"os"
"path/filepath"
"sync/atomic"
"syscall"
libptc "github.com/nabbar/golib/network/protocol"
@@ -151,6 +152,7 @@ func (o *srv) Listen(ctx context.Context) error {
err error
nbr int
uxf string
stp = new(atomic.Bool)
loc *net.UnixAddr
rem net.Addr
con *net.UnixConn
@@ -168,9 +170,10 @@ func (o *srv) Listen(ctx context.Context) error {
}
var fctClose = func() {
if con != nil {
o.fctInfoSrv("closing listen socket '%s %s'", libptc.NetworkUnixGram.String(), uxf)
o.fctError(con.Close())
if con != nil {
_ = con.Close()
}
if _, err = os.Stat(uxf); err == nil {
@@ -179,22 +182,45 @@ func (o *srv) Listen(ctx context.Context) error {
}
defer fctClose()
stp.Store(false)
go func() {
<-ctx.Done()
go func() {
_ = o.Shutdown()
}()
return
}()
go func() {
<-o.Done()
err = nil
stp.Store(true)
if con != nil {
o.fctError(con.Close())
}
return
}()
o.r.Store(true)
// Accept new connection or stop if context or shutdown trigger
for {
select {
case <-ctx.Done():
return ErrContextClosed
case <-o.Done():
return nil
default:
// Accept an incoming connection.
if con == nil {
return ErrServerClosed
} else if stp.Load() {
return err
}
var buf = make([]byte, o.buffSize())
nbr, rem, err = con.ReadFrom(buf)
var (
buf = make([]byte, o.buffSize())
rer error
)
nbr, rem, rer = con.ReadFrom(buf)
if rem == nil {
rem = &net.UnixAddr{}
@@ -202,17 +228,23 @@ func (o *srv) Listen(ctx context.Context) error {
o.fctInfo(loc, rem, libsck.ConnectionRead)
if err != nil {
o.fctError(err)
if rer != nil {
if !stp.Load() {
o.fctError(rer)
}
continue
}
go func(la, ra net.Addr, b []byte) {
o.fctInfo(la, ra, libsck.ConnectionHandler)
r := bytes.NewBuffer(b)
r.WriteString("\n")
if !bytes.HasSuffix(b, []byte{libsck.EOL}) {
r.Write([]byte{libsck.EOL})
}
hdl(r, io.Discard)
}(loc, rem, buf[:nbr])
}
}
}

View File

@@ -30,10 +30,12 @@
package unixgram
import (
"context"
"fmt"
"net"
"os"
"sync/atomic"
"time"
libptc "github.com/nabbar/golib/network/protocol"
@@ -56,6 +58,7 @@ type srv struct {
h *atomic.Value // handler
c *atomic.Value // chan []byte
s *atomic.Value // chan struct{}
r *atomic.Bool // is Running
fe *atomic.Value // function error
fi *atomic.Value // function info
@@ -67,6 +70,10 @@ type srv struct {
sg *atomic.Int32 // file unix group perm
}
func (o *srv) IsRunning() bool {
return o.r.Load()
}
func (o *srv) Done() <-chan struct{} {
s := o.s.Load()
if s != nil {
@@ -76,14 +83,47 @@ func (o *srv) Done() <-chan struct{} {
return closedChanStruct
}
func (o *srv) Shutdown() {
func (o *srv) Close() error {
return o.Shutdown()
}
func (o *srv) Shutdown() error {
if o == nil {
return
return ErrInvalidInstance
}
s := o.s.Load()
if s != nil {
o.s.Store(nil)
if c, k := s.(chan struct{}); k {
c <- struct{}{}
o.s.Store(c)
}
}
var (
tck = time.NewTicker(100 * time.Millisecond)
ctx, cnl = context.WithTimeout(context.Background(), 25*time.Second)
)
defer func() {
if s != nil {
o.s.Store(closedChanStruct)
}
tck.Stop()
cnl()
}()
for {
select {
case <-ctx.Done():
return ErrShutdownTimeout
case <-tck.C:
if o.IsRunning() {
continue
}
return nil
}
}
}