From c1fc4e816e2355f34cede15a16b8205fa326a57e Mon Sep 17 00:00:00 2001 From: Nicolas JUHEL Date: Mon, 5 Feb 2024 15:03:58 +0100 Subject: [PATCH] Package Socket/Server: - fix UDP as kernel SOCKGRAM - add unixgram as kernel SOCKGRAM for local socket file - optimize parallel statment - optimize code / minor bugs fix - add IsRunning function - add timeout shutdown based on periodicly check of IsRunning - fix close channel before send empty struct to stop process - optimize clean shutdown - optimize connection process to use less resources and lock pipeline less time --- socket/interface.go | 6 +- socket/server/tcp/error.go | 5 +- socket/server/tcp/inerface.go | 1 + socket/server/tcp/listener.go | 90 ++++++++++++++-------- socket/server/tcp/model.go | 46 +++++++++++- socket/server/udp/error.go | 10 ++- socket/server/udp/inerface.go | 1 + socket/server/udp/listener.go | 117 ++++++++++++++++++----------- socket/server/udp/model.go | 46 +++++++++++- socket/server/unix/error.go | 10 ++- socket/server/unix/inerface.go | 1 + socket/server/unix/listener.go | 94 +++++++++++++++-------- socket/server/unix/model.go | 46 +++++++++++- socket/server/unixgram/error.go | 10 ++- socket/server/unixgram/inerface.go | 1 + socket/server/unixgram/listener.go | 98 ++++++++++++++++-------- socket/server/unixgram/model.go | 46 +++++++++++- 17 files changed, 464 insertions(+), 164 deletions(-) diff --git a/socket/interface.go b/socket/interface.go index 16ce06c..8f359a8 100644 --- a/socket/interface.go +++ b/socket/interface.go @@ -35,6 +35,7 @@ import ( ) const DefaultBufferSize = 32 * 1024 +const EOL byte = '\n' type ConnState uint8 @@ -79,13 +80,16 @@ type Handler func(request io.Reader, response io.Writer) type Response func(r io.Reader) type Server interface { + io.Closer + RegisterFuncError(f FuncError) RegisterFuncInfo(f FuncInfo) RegisterFuncInfoServer(f FuncInfoSrv) SetTLS(enable bool, config libtls.TLSConfig) error Listen(ctx context.Context) error - Shutdown() + Shutdown() error + IsRunning() bool Done() <-chan struct{} } diff --git a/socket/server/tcp/error.go b/socket/server/tcp/error.go index ee2abb3..84e7328 100644 --- a/socket/server/tcp/error.go +++ b/socket/server/tcp/error.go @@ -30,8 +30,9 @@ import "fmt" var ( ErrInvalidAddress = fmt.Errorf("invalid listen address") - ErrInvalidHostName = fmt.Errorf("invalid server host name") - ErrInvalidHostPort = fmt.Errorf("invalid server host port") ErrContextClosed = fmt.Errorf("context closed") ErrServerClosed = fmt.Errorf("server closed") + ErrInvalidHandler = fmt.Errorf("invalid handler") + ErrShutdownTimeout = fmt.Errorf("timeout on stopping socket") + ErrInvalidInstance = fmt.Errorf("invalid socket instance") ) diff --git a/socket/server/tcp/inerface.go b/socket/server/tcp/inerface.go index a0cf9af..22222bd 100644 --- a/socket/server/tcp/inerface.go +++ b/socket/server/tcp/inerface.go @@ -57,6 +57,7 @@ func New(h libsck.Handler, sizeBuffRead libsiz.Size) ServerTcp { h: f, c: c, s: s, + r: new(atomic.Bool), fe: new(atomic.Value), fi: new(atomic.Value), fs: new(atomic.Value), diff --git a/socket/server/tcp/listener.go b/socket/server/tcp/listener.go index ced813c..d2e99e9 100644 --- a/socket/server/tcp/listener.go +++ b/socket/server/tcp/listener.go @@ -33,6 +33,7 @@ import ( "crypto/tls" "io" "net" + "sync/atomic" libptc "github.com/nabbar/golib/network/protocol" libsck "github.com/nabbar/golib/socket" @@ -80,44 +81,67 @@ func (o *srv) Listen(ctx context.Context) error { e error l net.Listener a = o.getAddress() + s = new(atomic.Bool) ) - var fctClose = func() { - o.fctInfoSrv("closing listen socket '%s %s'", libptc.NetworkTCP.String(), a) - if l != nil { - o.fctError(l.Close()) - } - } - - defer fctClose() - if len(a) == 0 { return ErrInvalidAddress + } else if hdl := o.handler(); hdl == nil { + return ErrInvalidHandler } else if l, e = o.getListen(a); e != nil { o.fctError(e) return e } + var fctClose = func() { + o.fctInfoSrv("closing listen socket '%s %s'", libptc.NetworkTCP.String(), a) + + if l != nil { + _ = l.Close() + } + + o.r.Store(false) + } + + defer fctClose() + s.Store(false) + + go func() { + <-ctx.Done() + go func() { + _ = o.Shutdown() + }() + return + }() + + go func() { + <-o.Done() + + e = nil + s.Store(true) + + if l != nil { + o.fctError(l.Close()) + } + + return + }() + + o.r.Store(true) // Accept new connection or stop if context or shutdown trigger for { - select { - case <-ctx.Done(): - return ErrContextClosed - case <-o.Done(): - return nil - default: - // Accept an incoming connection. - if l == nil { - return ErrServerClosed - } + // Accept an incoming connection. + if l == nil { + return ErrServerClosed + } else if s.Load() { + return e + } - co, ce := l.Accept() - - if ce != nil { - o.fctError(ce) - } else { - go o.Conn(co) - } + if co, ce := l.Accept(); ce != nil && !s.Load() { + o.fctError(ce) + } else { + o.fctInfo(co.LocalAddr(), co.RemoteAddr(), libsck.ConnectionNew) + go o.Conn(co) } } } @@ -128,12 +152,10 @@ func (o *srv) Conn(conn net.Conn) { _ = conn.Close() }() - o.fctInfo(conn.LocalAddr(), conn.RemoteAddr(), libsck.ConnectionNew) - var ( err error rdr = bufio.NewReaderSize(conn, o.buffSize()) - buf []byte + msg []byte hdl libsck.Handler ) @@ -142,7 +164,7 @@ func (o *srv) Conn(conn net.Conn) { } for { - buf, err = rdr.ReadBytes('\n') + msg, err = rdr.ReadBytes('\n') o.fctInfo(conn.LocalAddr(), conn.RemoteAddr(), libsck.ConnectionRead) if err != nil { @@ -152,7 +174,13 @@ func (o *srv) Conn(conn net.Conn) { break } + var buf = bytes.NewBuffer(msg) + + if !bytes.HasSuffix(msg, []byte{libsck.EOL}) { + buf.Write([]byte{libsck.EOL}) + } + o.fctInfo(conn.LocalAddr(), conn.RemoteAddr(), libsck.ConnectionHandler) - hdl(bytes.NewBuffer(buf), conn) + hdl(buf, conn) } } diff --git a/socket/server/tcp/model.go b/socket/server/tcp/model.go index cfed404..707fc1d 100644 --- a/socket/server/tcp/model.go +++ b/socket/server/tcp/model.go @@ -27,10 +27,12 @@ package tcp import ( + "context" "crypto/tls" "fmt" "net" "sync/atomic" + "time" libptc "github.com/nabbar/golib/network/protocol" @@ -58,6 +60,7 @@ type srv struct { h *atomic.Value // handler c *atomic.Value // chan []byte s *atomic.Value // chan struct{} + r *atomic.Bool // is Running fe *atomic.Value // function error fi *atomic.Value // function info @@ -67,6 +70,10 @@ type srv struct { ad *atomic.Value // Server address url } +func (o *srv) IsRunning() bool { + return o.r.Load() +} + func (o *srv) Done() <-chan struct{} { s := o.s.Load() if s != nil { @@ -76,14 +83,47 @@ func (o *srv) Done() <-chan struct{} { return closedChanStruct } -func (o *srv) Shutdown() { +func (o *srv) Close() error { + return o.Shutdown() +} + +func (o *srv) Shutdown() error { if o == nil { - return + return ErrInvalidInstance } s := o.s.Load() if s != nil { - o.s.Store(nil) + if c, k := s.(chan struct{}); k { + c <- struct{}{} + o.s.Store(c) + } + } + + var ( + tck = time.NewTicker(100 * time.Millisecond) + ctx, cnl = context.WithTimeout(context.Background(), 25*time.Second) + ) + + defer func() { + if s != nil { + o.s.Store(closedChanStruct) + } + + tck.Stop() + cnl() + }() + + for { + select { + case <-ctx.Done(): + return ErrShutdownTimeout + case <-tck.C: + if o.IsRunning() { + continue + } + return nil + } } } diff --git a/socket/server/udp/error.go b/socket/server/udp/error.go index afabc1a..e69288d 100644 --- a/socket/server/udp/error.go +++ b/socket/server/udp/error.go @@ -29,8 +29,10 @@ package udp import "fmt" var ( - ErrInvalidAddress = fmt.Errorf("invalid listen address") - ErrContextClosed = fmt.Errorf("context closed") - ErrServerClosed = fmt.Errorf("server closed") - ErrInvalidHandler = fmt.Errorf("invalid handler") + ErrInvalidAddress = fmt.Errorf("invalid listen address") + ErrContextClosed = fmt.Errorf("context closed") + ErrServerClosed = fmt.Errorf("server closed") + ErrInvalidHandler = fmt.Errorf("invalid handler") + ErrShutdownTimeout = fmt.Errorf("timeout on stopping socket") + ErrInvalidInstance = fmt.Errorf("invalid socket instance") ) diff --git a/socket/server/udp/inerface.go b/socket/server/udp/inerface.go index d983300..07c3042 100644 --- a/socket/server/udp/inerface.go +++ b/socket/server/udp/inerface.go @@ -56,6 +56,7 @@ func New(h libsck.Handler, sizeBuffRead libsiz.Size) ServerTcp { h: f, c: c, s: s, + r: new(atomic.Bool), fe: new(atomic.Value), fi: new(atomic.Value), fs: new(atomic.Value), diff --git a/socket/server/udp/listener.go b/socket/server/udp/listener.go index 92df4b8..62664ae 100644 --- a/socket/server/udp/listener.go +++ b/socket/server/udp/listener.go @@ -31,6 +31,7 @@ import ( "context" "io" "net" + "sync/atomic" libptc "github.com/nabbar/golib/network/protocol" libsck "github.com/nabbar/golib/socket" @@ -78,6 +79,7 @@ func (o *srv) Listen(ctx context.Context) error { err error nbr int loc *net.UDPAddr + stp = new(atomic.Bool) rem net.Addr con *net.UDPConn adr = o.getAddress() @@ -92,55 +94,86 @@ func (o *srv) Listen(ctx context.Context) error { return err } - var fctClose = func() { - o.fctInfoSrv("closing listen socket '%s %s'", libptc.NetworkUDP.String(), adr) - if con != nil { - o.fctError(con.Close()) - } - } - - defer fctClose() - if con, err = net.ListenUDP(libptc.NetworkUDP.Code(), loc); err != nil { o.fctError(err) return err - } else { - o.fctInfoSrv("starting listening socket '%s %s'", libptc.NetworkUDP.String(), adr) } + var fctClose = func() { + o.fctInfoSrv("closing listen socket '%s %s'", libptc.NetworkUDP.String(), adr) + + if con != nil { + _ = con.Close() + } + + o.r.Store(false) + } + + defer fctClose() + stp.Store(false) + + go func() { + <-ctx.Done() + go func() { + _ = o.Shutdown() + }() + return + }() + + go func() { + <-o.Done() + + err = nil + stp.Store(true) + + if con != nil { + o.fctError(con.Close()) + } + + return + }() + + o.r.Store(true) + + o.fctInfoSrv("starting listening socket '%s %s'", libptc.NetworkUDP.String(), adr) // Accept new connection or stop if context or shutdown trigger for { - select { - case <-ctx.Done(): - return ErrContextClosed - case <-o.Done(): - return nil - default: - // Accept an incoming connection. - if con == nil { - return ErrServerClosed - } - - var buf = make([]byte, o.buffSize()) - nbr, rem, err = con.ReadFrom(buf) - - if rem == nil { - rem = &net.UDPAddr{} - } - - o.fctInfo(loc, rem, libsck.ConnectionRead) - - if err != nil { - o.fctError(err) - continue - } - - go func(la, ra net.Addr, b []byte) { - o.fctInfo(la, ra, libsck.ConnectionHandler) - r := bytes.NewBuffer(b) - r.WriteString("\n") - hdl(r, io.Discard) - }(loc, rem, buf[:nbr]) + // Accept an incoming connection. + if con == nil { + return ErrServerClosed + } else if stp.Load() { + return err } + + var ( + buf = make([]byte, o.buffSize()) + rer error + ) + + nbr, rem, rer = con.ReadFrom(buf) + + if rem == nil { + rem = &net.UDPAddr{} + } + + o.fctInfo(loc, rem, libsck.ConnectionRead) + + if rer != nil { + if !stp.Load() { + o.fctError(rer) + } + continue + } + + go func(la, ra net.Addr, b []byte) { + o.fctInfo(la, ra, libsck.ConnectionHandler) + + r := bytes.NewBuffer(b) + if !bytes.HasSuffix(b, []byte{libsck.EOL}) { + r.Write([]byte{libsck.EOL}) + } + + hdl(r, io.Discard) + }(loc, rem, buf[:nbr]) } } diff --git a/socket/server/udp/model.go b/socket/server/udp/model.go index 15a84d9..b74a69b 100644 --- a/socket/server/udp/model.go +++ b/socket/server/udp/model.go @@ -27,9 +27,11 @@ package udp import ( + "context" "fmt" "net" "sync/atomic" + "time" libptc "github.com/nabbar/golib/network/protocol" @@ -52,6 +54,7 @@ type srv struct { h *atomic.Value // handler c *atomic.Value // chan []byte s *atomic.Value // chan struct{} + r *atomic.Bool // is Running fe *atomic.Value // function error fi *atomic.Value // function info @@ -61,6 +64,10 @@ type srv struct { ad *atomic.Value // Server address url } +func (o *srv) IsRunning() bool { + return o.r.Load() +} + func (o *srv) Done() <-chan struct{} { s := o.s.Load() if s != nil { @@ -70,14 +77,47 @@ func (o *srv) Done() <-chan struct{} { return closedChanStruct } -func (o *srv) Shutdown() { +func (o *srv) Close() error { + return o.Shutdown() +} + +func (o *srv) Shutdown() error { if o == nil { - return + return ErrInvalidInstance } s := o.s.Load() if s != nil { - o.s.Store(nil) + if c, k := s.(chan struct{}); k { + c <- struct{}{} + o.s.Store(c) + } + } + + var ( + tck = time.NewTicker(100 * time.Millisecond) + ctx, cnl = context.WithTimeout(context.Background(), 25*time.Second) + ) + + defer func() { + if s != nil { + o.s.Store(closedChanStruct) + } + + tck.Stop() + cnl() + }() + + for { + select { + case <-ctx.Done(): + return ErrShutdownTimeout + case <-tck.C: + if o.IsRunning() { + continue + } + return nil + } } } diff --git a/socket/server/unix/error.go b/socket/server/unix/error.go index 2f0e184..e10cf8e 100644 --- a/socket/server/unix/error.go +++ b/socket/server/unix/error.go @@ -32,8 +32,10 @@ package unix import "fmt" var ( - ErrContextClosed = fmt.Errorf("context closed") - ErrServerClosed = fmt.Errorf("server closed") - ErrInvalidGroup = fmt.Errorf("invalid unix group for socket group permission") - ErrInvalidHandler = fmt.Errorf("invalid handler") + ErrContextClosed = fmt.Errorf("context closed") + ErrServerClosed = fmt.Errorf("server closed") + ErrInvalidGroup = fmt.Errorf("invalid unix group for socket group permission") + ErrInvalidHandler = fmt.Errorf("invalid handler") + ErrShutdownTimeout = fmt.Errorf("timeout on stopping socket") + ErrInvalidInstance = fmt.Errorf("invalid socket instance") ) diff --git a/socket/server/unix/inerface.go b/socket/server/unix/inerface.go index 832d8ac..2a21eab 100644 --- a/socket/server/unix/inerface.go +++ b/socket/server/unix/inerface.go @@ -75,6 +75,7 @@ func New(h libsck.Handler, sizeBuffRead libsiz.Size) ServerUnix { h: f, c: c, s: s, + r: new(atomic.Bool), fe: new(atomic.Value), fi: new(atomic.Value), fs: new(atomic.Value), diff --git a/socket/server/unix/listener.go b/socket/server/unix/listener.go index ed66467..606d1c6 100644 --- a/socket/server/unix/listener.go +++ b/socket/server/unix/listener.go @@ -40,6 +40,7 @@ import ( "net" "os" "path/filepath" + "sync/atomic" "syscall" libptc "github.com/nabbar/golib/network/protocol" @@ -156,45 +157,72 @@ func (o *srv) Listen(ctx context.Context) error { e error f string l net.Listener + s = new(atomic.Bool) ) if f, e = o.getSocketFile(); e != nil { return e - } - - var fctClose = func() { - if l != nil { - o.fctError(l.Close()) - } - - if _, e = os.Stat(f); e == nil { - o.fctError(os.Remove(f)) - } + } else if hdl := o.handler(); hdl == nil { + return ErrInvalidHandler } if l, e = o.getListen(f); e != nil { return e } - defer fctClose() + var fctClose = func() { + o.fctInfoSrv("closing listen socket '%s %s'", libptc.NetworkUnixGram.String(), f) + if l != nil { + _ = l.Close() + } + + if _, e = os.Stat(f); e == nil { + o.fctError(os.Remove(f)) + } + + o.r.Store(false) + } + + defer fctClose() + s.Store(false) + + go func() { + <-ctx.Done() + go func() { + _ = o.Shutdown() + }() + return + }() + + go func() { + <-o.Done() + + e = nil + s.Store(true) + + if l != nil { + o.fctError(l.Close()) + } + + return + }() + + o.r.Store(true) // Accept new connection or stop if context or shutdown trigger for { - select { - case <-ctx.Done(): - return ErrContextClosed - case <-o.Done(): - return nil - default: - // Accept an incoming connection. - if l == nil { - return ErrServerClosed - } else if co, ce := l.Accept(); ce != nil { - o.fctError(ce) - } else { - o.fctInfo(co.LocalAddr(), co.RemoteAddr(), libsck.ConnectionNew) - go o.Conn(co) - } + // Accept an incoming connection. + if l == nil { + return ErrServerClosed + } else if s.Load() { + return e + } + + if co, ce := l.Accept(); ce != nil && !s.Load() { + o.fctError(ce) + } else { + o.fctInfo(co.LocalAddr(), co.RemoteAddr(), libsck.ConnectionNew) + go o.Conn(co) } } } @@ -205,12 +233,10 @@ func (o *srv) Conn(con net.Conn) { _ = con.Close() }() - o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionNew) - var ( err error rdr = bufio.NewReaderSize(con, o.buffSize()) - buf []byte + msg []byte hdl libsck.Handler ) @@ -219,7 +245,7 @@ func (o *srv) Conn(con net.Conn) { } for { - buf, err = rdr.ReadBytes('\n') + msg, err = rdr.ReadBytes('\n') o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionRead) if err != nil { @@ -229,7 +255,13 @@ func (o *srv) Conn(con net.Conn) { break } + var buf = bytes.NewBuffer(msg) + + if !bytes.HasSuffix(msg, []byte{libsck.EOL}) { + buf.Write([]byte{libsck.EOL}) + } + o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionHandler) - hdl(bytes.NewBuffer(buf), con) + hdl(buf, con) } } diff --git a/socket/server/unix/model.go b/socket/server/unix/model.go index 8ba1d1e..2cfb5c7 100644 --- a/socket/server/unix/model.go +++ b/socket/server/unix/model.go @@ -30,10 +30,12 @@ package unix import ( + "context" "fmt" "net" "os" "sync/atomic" + "time" libptc "github.com/nabbar/golib/network/protocol" @@ -56,6 +58,7 @@ type srv struct { h *atomic.Value // handler c *atomic.Value // chan []byte s *atomic.Value // chan struct{} + r *atomic.Bool // is Running fe *atomic.Value // function error fi *atomic.Value // function info @@ -67,6 +70,10 @@ type srv struct { sg *atomic.Int32 // file unix group perm } +func (o *srv) IsRunning() bool { + return o.r.Load() +} + func (o *srv) Done() <-chan struct{} { s := o.s.Load() if s != nil { @@ -76,14 +83,47 @@ func (o *srv) Done() <-chan struct{} { return closedChanStruct } -func (o *srv) Shutdown() { +func (o *srv) Close() error { + return o.Shutdown() +} + +func (o *srv) Shutdown() error { if o == nil { - return + return ErrInvalidInstance } s := o.s.Load() if s != nil { - o.s.Store(nil) + if c, k := s.(chan struct{}); k { + c <- struct{}{} + o.s.Store(c) + } + } + + var ( + tck = time.NewTicker(100 * time.Millisecond) + ctx, cnl = context.WithTimeout(context.Background(), 25*time.Second) + ) + + defer func() { + if s != nil { + o.s.Store(closedChanStruct) + } + + tck.Stop() + cnl() + }() + + for { + select { + case <-ctx.Done(): + return ErrShutdownTimeout + case <-tck.C: + if o.IsRunning() { + continue + } + return nil + } } } diff --git a/socket/server/unixgram/error.go b/socket/server/unixgram/error.go index 8ccc85d..c798d2b 100644 --- a/socket/server/unixgram/error.go +++ b/socket/server/unixgram/error.go @@ -32,8 +32,10 @@ package unixgram import "fmt" var ( - ErrContextClosed = fmt.Errorf("context closed") - ErrServerClosed = fmt.Errorf("server closed") - ErrInvalidGroup = fmt.Errorf("invalid unix group for socket group permission") - ErrInvalidHandler = fmt.Errorf("invalid handler") + ErrContextClosed = fmt.Errorf("context closed") + ErrServerClosed = fmt.Errorf("server closed") + ErrInvalidGroup = fmt.Errorf("invalid unix group for socket group permission") + ErrInvalidHandler = fmt.Errorf("invalid handler") + ErrShutdownTimeout = fmt.Errorf("timeout on stopping socket") + ErrInvalidInstance = fmt.Errorf("invalid socket instance") ) diff --git a/socket/server/unixgram/inerface.go b/socket/server/unixgram/inerface.go index 14e073e..0446d67 100644 --- a/socket/server/unixgram/inerface.go +++ b/socket/server/unixgram/inerface.go @@ -75,6 +75,7 @@ func New(h libsck.Handler, sizeBuffRead libsiz.Size) ServerUnixGram { h: f, c: c, s: s, + r: new(atomic.Bool), fe: new(atomic.Value), fi: new(atomic.Value), fs: new(atomic.Value), diff --git a/socket/server/unixgram/listener.go b/socket/server/unixgram/listener.go index cc0a986..e1252ae 100644 --- a/socket/server/unixgram/listener.go +++ b/socket/server/unixgram/listener.go @@ -39,6 +39,7 @@ import ( "net" "os" "path/filepath" + "sync/atomic" "syscall" libptc "github.com/nabbar/golib/network/protocol" @@ -151,6 +152,7 @@ func (o *srv) Listen(ctx context.Context) error { err error nbr int uxf string + stp = new(atomic.Bool) loc *net.UnixAddr rem net.Addr con *net.UnixConn @@ -168,9 +170,10 @@ func (o *srv) Listen(ctx context.Context) error { } var fctClose = func() { + o.fctInfoSrv("closing listen socket '%s %s'", libptc.NetworkUnixGram.String(), uxf) + if con != nil { - o.fctInfoSrv("closing listen socket '%s %s'", libptc.NetworkUnixGram.String(), uxf) - o.fctError(con.Close()) + _ = con.Close() } if _, err = os.Stat(uxf); err == nil { @@ -179,40 +182,69 @@ func (o *srv) Listen(ctx context.Context) error { } defer fctClose() + stp.Store(false) + go func() { + <-ctx.Done() + go func() { + _ = o.Shutdown() + }() + return + }() + + go func() { + <-o.Done() + + err = nil + stp.Store(true) + + if con != nil { + o.fctError(con.Close()) + } + + return + }() + + o.r.Store(true) // Accept new connection or stop if context or shutdown trigger for { - select { - case <-ctx.Done(): - return ErrContextClosed - case <-o.Done(): - return nil - default: - // Accept an incoming connection. - if con == nil { - return ErrServerClosed - } - - var buf = make([]byte, o.buffSize()) - nbr, rem, err = con.ReadFrom(buf) - - if rem == nil { - rem = &net.UnixAddr{} - } - - o.fctInfo(loc, rem, libsck.ConnectionRead) - - if err != nil { - o.fctError(err) - continue - } - - go func(la, ra net.Addr, b []byte) { - o.fctInfo(la, ra, libsck.ConnectionHandler) - r := bytes.NewBuffer(b) - r.WriteString("\n") - hdl(r, io.Discard) - }(loc, rem, buf[:nbr]) + // Accept an incoming connection. + if con == nil { + return ErrServerClosed + } else if stp.Load() { + return err } + + var ( + buf = make([]byte, o.buffSize()) + rer error + ) + + nbr, rem, rer = con.ReadFrom(buf) + + if rem == nil { + rem = &net.UnixAddr{} + } + + o.fctInfo(loc, rem, libsck.ConnectionRead) + + if rer != nil { + if !stp.Load() { + o.fctError(rer) + } + continue + } + + go func(la, ra net.Addr, b []byte) { + o.fctInfo(la, ra, libsck.ConnectionHandler) + + r := bytes.NewBuffer(b) + + if !bytes.HasSuffix(b, []byte{libsck.EOL}) { + r.Write([]byte{libsck.EOL}) + } + + hdl(r, io.Discard) + }(loc, rem, buf[:nbr]) } } diff --git a/socket/server/unixgram/model.go b/socket/server/unixgram/model.go index fe4f6ae..3803566 100644 --- a/socket/server/unixgram/model.go +++ b/socket/server/unixgram/model.go @@ -30,10 +30,12 @@ package unixgram import ( + "context" "fmt" "net" "os" "sync/atomic" + "time" libptc "github.com/nabbar/golib/network/protocol" @@ -56,6 +58,7 @@ type srv struct { h *atomic.Value // handler c *atomic.Value // chan []byte s *atomic.Value // chan struct{} + r *atomic.Bool // is Running fe *atomic.Value // function error fi *atomic.Value // function info @@ -67,6 +70,10 @@ type srv struct { sg *atomic.Int32 // file unix group perm } +func (o *srv) IsRunning() bool { + return o.r.Load() +} + func (o *srv) Done() <-chan struct{} { s := o.s.Load() if s != nil { @@ -76,14 +83,47 @@ func (o *srv) Done() <-chan struct{} { return closedChanStruct } -func (o *srv) Shutdown() { +func (o *srv) Close() error { + return o.Shutdown() +} + +func (o *srv) Shutdown() error { if o == nil { - return + return ErrInvalidInstance } s := o.s.Load() if s != nil { - o.s.Store(nil) + if c, k := s.(chan struct{}); k { + c <- struct{}{} + o.s.Store(c) + } + } + + var ( + tck = time.NewTicker(100 * time.Millisecond) + ctx, cnl = context.WithTimeout(context.Background(), 25*time.Second) + ) + + defer func() { + if s != nil { + o.s.Store(closedChanStruct) + } + + tck.Stop() + cnl() + }() + + for { + select { + case <-ctx.Done(): + return ErrShutdownTimeout + case <-tck.C: + if o.IsRunning() { + continue + } + return nil + } } }