mirror of
https://github.com/nabbar/golib.git
synced 2025-10-04 23:42:46 +08:00
Package Socket:
- fix tcp,udp,unix socket - add socket unixgram for session less unix socket - optmize process & code - add some standalone test for server / client (add example server config for rsyslog) Package Network/Protocol: - add unixgram protocol Package Errors: - fix bug in loop to prevent circular include
This commit is contained in:
@@ -30,27 +30,25 @@
|
||||
package unix
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"bufio"
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"net"
|
||||
"os"
|
||||
"sync/atomic"
|
||||
|
||||
libptc "github.com/nabbar/golib/network/protocol"
|
||||
libsck "github.com/nabbar/golib/socket"
|
||||
)
|
||||
|
||||
type cltx struct {
|
||||
a *atomic.Value // address : unixfile
|
||||
s *atomic.Int32 // buffer size
|
||||
e *atomic.Value // function error
|
||||
i *atomic.Value // function info
|
||||
tr *atomic.Value // connection read timeout
|
||||
tw *atomic.Value // connection write timeout
|
||||
type cli struct {
|
||||
a *atomic.Value // address : unixfile
|
||||
s *atomic.Int32 // buffer size
|
||||
e *atomic.Value // function error
|
||||
i *atomic.Value // function info
|
||||
}
|
||||
|
||||
func (o *cltx) RegisterFuncError(f libsck.FuncError) {
|
||||
func (o *cli) RegisterFuncError(f libsck.FuncError) {
|
||||
if o == nil {
|
||||
return
|
||||
}
|
||||
@@ -58,7 +56,7 @@ func (o *cltx) RegisterFuncError(f libsck.FuncError) {
|
||||
o.e.Store(f)
|
||||
}
|
||||
|
||||
func (o *cltx) RegisterFuncInfo(f libsck.FuncInfo) {
|
||||
func (o *cli) RegisterFuncInfo(f libsck.FuncInfo) {
|
||||
if o == nil {
|
||||
return
|
||||
}
|
||||
@@ -66,7 +64,7 @@ func (o *cltx) RegisterFuncInfo(f libsck.FuncInfo) {
|
||||
o.i.Store(f)
|
||||
}
|
||||
|
||||
func (o *cltx) fctError(e error) {
|
||||
func (o *cli) fctError(e error) {
|
||||
if o == nil {
|
||||
return
|
||||
}
|
||||
@@ -77,7 +75,7 @@ func (o *cltx) fctError(e error) {
|
||||
}
|
||||
}
|
||||
|
||||
func (o *cltx) fctInfo(local, remote net.Addr, state libsck.ConnState) {
|
||||
func (o *cli) fctInfo(local, remote net.Addr, state libsck.ConnState) {
|
||||
if o == nil {
|
||||
return
|
||||
}
|
||||
@@ -88,84 +86,115 @@ func (o *cltx) fctInfo(local, remote net.Addr, state libsck.ConnState) {
|
||||
}
|
||||
}
|
||||
|
||||
func (o *cltx) buffRead() *bytes.Buffer {
|
||||
func (o *cli) buffSize() int {
|
||||
v := o.s.Load()
|
||||
if v > 0 {
|
||||
return bytes.NewBuffer(make([]byte, 0, int(v)))
|
||||
return int(v)
|
||||
}
|
||||
|
||||
return bytes.NewBuffer(make([]byte, 0, libsck.DefaultBufferSize))
|
||||
return libsck.DefaultBufferSize
|
||||
}
|
||||
|
||||
func (o *cltx) dial(ctx context.Context) (net.Conn, error) {
|
||||
func (o *cli) dial(ctx context.Context) (net.Conn, error) {
|
||||
if o == nil {
|
||||
return nil, ErrInvalidInstance
|
||||
return nil, ErrInstance
|
||||
}
|
||||
|
||||
v := o.a.Load()
|
||||
|
||||
if v == nil {
|
||||
return nil, ErrUnixFile
|
||||
} else if _, e := os.Stat(v.(string)); e != nil {
|
||||
return nil, e
|
||||
return nil, ErrAddress
|
||||
} else if adr, ok := v.(string); !ok {
|
||||
return nil, ErrAddress
|
||||
} else {
|
||||
d := net.Dialer{}
|
||||
return d.DialContext(ctx, libptc.NetworkUnix.Code(), v.(string))
|
||||
return d.DialContext(ctx, libptc.NetworkUnix.Code(), adr)
|
||||
}
|
||||
}
|
||||
|
||||
func (o *cltx) Do(ctx context.Context, request io.Reader, fct libsck.Response) error {
|
||||
func (o *cli) Do(ctx context.Context, request io.Reader, fct libsck.Response) error {
|
||||
if o == nil {
|
||||
return ErrInvalidInstance
|
||||
return ErrInstance
|
||||
}
|
||||
|
||||
var (
|
||||
e error
|
||||
lc net.Addr
|
||||
rm net.Addr
|
||||
err error
|
||||
cnn net.Conn
|
||||
)
|
||||
|
||||
o.fctInfo(nil, nil, libsck.ConnectionDial)
|
||||
if cnn, e = o.dial(ctx); e != nil {
|
||||
o.fctError(e)
|
||||
return e
|
||||
}
|
||||
|
||||
defer func() {
|
||||
err := cnn.Close()
|
||||
o.fctError(err)
|
||||
if cnn != nil {
|
||||
o.fctInfo(cnn.LocalAddr(), cnn.RemoteAddr(), libsck.ConnectionClose)
|
||||
o.fctError(cnn.Close())
|
||||
}
|
||||
}()
|
||||
|
||||
lc = cnn.LocalAddr()
|
||||
rm = cnn.RemoteAddr()
|
||||
|
||||
defer o.fctInfo(lc, rm, libsck.ConnectionClose)
|
||||
o.fctInfo(lc, rm, libsck.ConnectionNew)
|
||||
|
||||
if request != nil {
|
||||
o.fctInfo(lc, rm, libsck.ConnectionWrite)
|
||||
if _, e = io.Copy(cnn, request); e != nil {
|
||||
o.fctError(e)
|
||||
return e
|
||||
}
|
||||
o.fctInfo(&net.UnixAddr{}, &net.UnixAddr{}, libsck.ConnectionDial)
|
||||
if cnn, err = o.dial(ctx); err != nil {
|
||||
o.fctError(err)
|
||||
return err
|
||||
}
|
||||
|
||||
o.fctInfo(lc, rm, libsck.ConnectionCloseWrite)
|
||||
if e = cnn.(*net.UnixConn).CloseWrite(); e != nil {
|
||||
o.fctError(e)
|
||||
return e
|
||||
}
|
||||
o.fctInfo(cnn.LocalAddr(), cnn.RemoteAddr(), libsck.ConnectionNew)
|
||||
|
||||
o.fctInfo(lc, rm, libsck.ConnectionHandler)
|
||||
if fct != nil {
|
||||
fct(cnn)
|
||||
}
|
||||
|
||||
o.fctInfo(lc, rm, libsck.ConnectionCloseRead)
|
||||
if e = cnn.(*net.UnixConn).CloseRead(); e != nil {
|
||||
o.fctError(e)
|
||||
return e
|
||||
}
|
||||
o.sendRequest(cnn, request)
|
||||
o.readResponse(cnn, fct)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (o *cli) sendRequest(con net.Conn, r io.Reader) {
|
||||
var (
|
||||
err error
|
||||
buf []byte
|
||||
rdr = bufio.NewReaderSize(r, o.buffSize())
|
||||
wrt = bufio.NewWriterSize(con, o.buffSize())
|
||||
)
|
||||
|
||||
defer func() {
|
||||
if con != nil {
|
||||
if c, ok := con.(*net.UnixConn); ok {
|
||||
o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionCloseWrite)
|
||||
_ = c.CloseWrite()
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
if con == nil && r == nil {
|
||||
return
|
||||
}
|
||||
|
||||
buf, err = rdr.ReadBytes('\n')
|
||||
|
||||
if err != nil {
|
||||
if !errors.Is(err, io.EOF) {
|
||||
o.fctError(err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionWrite)
|
||||
|
||||
_, err = wrt.Write(buf)
|
||||
if err != nil {
|
||||
o.fctError(err)
|
||||
return
|
||||
}
|
||||
|
||||
err = wrt.Flush()
|
||||
if err != nil {
|
||||
o.fctError(err)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (o *cli) readResponse(con net.Conn, f libsck.Response) {
|
||||
if f == nil {
|
||||
return
|
||||
}
|
||||
|
||||
o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionHandler)
|
||||
f(con)
|
||||
}
|
||||
|
Reference in New Issue
Block a user