diff --git a/aws/role_test.go b/aws/role_test.go index 4c1e01e..b67d0a5 100644 --- a/aws/role_test.go +++ b/aws/role_test.go @@ -42,11 +42,7 @@ var _ = Describe("Role", func() { Context("Creation", func() { It("Must fail with invalid json", func() { - /* if minioMode { - err = fmt.Errorf("backend not compatible following AWS API reference") - } else { - */_, err = cli.Role().Add(name, "{}") - // } + _, err = cli.Role().Add(name, "{}") Expect(err).To(HaveOccurred()) }) It("Must succeed", func() { @@ -60,11 +56,7 @@ var _ = Describe("Role", func() { }) Context("Attach", func() { It("Must fail with invalid params", func() { - /* if minioMode { - err = fmt.Errorf("backend not compatible following AWS API reference") - } else { - */err = cli.Role().PolicyAttach("policyArn", "roleName") - // } + err = cli.Role().PolicyAttach("policyArn", "roleName") Expect(err).To(HaveOccurred()) }) It("Must succeed", func() { @@ -85,11 +77,7 @@ var _ = Describe("Role", func() { }) Context("List attached policies to role", func() { It("Must fail with invalid role name", func() { - /* if minioMode { - err = fmt.Errorf("backend not compatible following AWS API reference") - } else { - */_, err = cli.Role().PolicyListAttached("invalidRoleName") - // } + _, err = cli.Role().PolicyListAttached("invalidRoleName") Expect(err).To(HaveOccurred()) }) It("Must return 1 policy", func() { @@ -113,11 +101,7 @@ var _ = Describe("Role", func() { }) Context("Detach", func() { It("Must fail with invalid params", func() { - /* if minioMode { - err = fmt.Errorf("backend not compatible following AWS API reference") - } else { - */err = cli.Role().PolicyDetach("policyArn", "roleName") - // } + err = cli.Role().PolicyDetach("policyArn", "roleName") Expect(err).To(HaveOccurred()) }) It("Must succeed", func() { @@ -151,12 +135,7 @@ var _ = Describe("Role", func() { Expect(roleArn).To(Equal(arn)) }) It("Must fail with invalid name", func() { - /* if minioMode { - err = nil - } else { - */_, err = cli.Role().Check("invalid name") - // } - + _, err = cli.Role().Check("invalid name") Expect(err).To(HaveOccurred()) }) }) @@ -189,11 +168,7 @@ var _ = Describe("Role", func() { Expect(err).ToNot(HaveOccurred()) }) It("Must fail", func() { - /* if minioMode { - err = fmt.Errorf("backend not compatible following AWS API reference") - } else { - */err = cli.Role().Delete(name) - // } + err = cli.Role().Delete(name) Expect(err).To(HaveOccurred()) }) }) diff --git a/errors/errors.go b/errors/errors.go index a24f760..4f4b8e4 100644 --- a/errors/errors.go +++ b/errors/errors.go @@ -100,8 +100,10 @@ func (e *ers) Add(parent ...error) { if er, ok = v.(*ers); ok { // prevent circular addition - if e.is(er) { - continue + if e.IsError(er) { + for _, erp := range er.p { + e.Add(erp) + } } else { e.p = append(e.p, er) } diff --git a/network/protocol/interface.go b/network/protocol/interface.go index 595ce78..87e1bfe 100644 --- a/network/protocol/interface.go +++ b/network/protocol/interface.go @@ -46,6 +46,7 @@ const ( NetworkIP NetworkIP4 NetworkIP6 + NetworkUnixGram ) func Parse(str string) NetworkProtocol { @@ -60,6 +61,8 @@ func Parse(str string) NetworkProtocol { return NetworkUDP case strings.EqualFold(NetworkUnix.Code(), str): return NetworkUnix + case strings.EqualFold(NetworkUnixGram.Code(), str): + return NetworkUnixGram default: return NetworkEmpty } diff --git a/network/protocol/model.go b/network/protocol/model.go index 9e8e37d..dc7d81e 100644 --- a/network/protocol/model.go +++ b/network/protocol/model.go @@ -56,6 +56,8 @@ func (n NetworkProtocol) String() string { return "ip4" case NetworkIP6: return "ip6" + case NetworkUnixGram: + return "UnixGram" default: return "" } diff --git a/socket/client/interface_linux.go b/socket/client/interface_linux.go index 4eed425..561efa3 100644 --- a/socket/client/interface_linux.go +++ b/socket/client/interface_linux.go @@ -35,22 +35,28 @@ import ( "strings" libptc "github.com/nabbar/golib/network/protocol" + libsiz "github.com/nabbar/golib/size" libsck "github.com/nabbar/golib/socket" sckclt "github.com/nabbar/golib/socket/client/tcp" sckclu "github.com/nabbar/golib/socket/client/udp" sckclx "github.com/nabbar/golib/socket/client/unix" + sckgrm "github.com/nabbar/golib/socket/client/unixgram" ) -func New(proto libptc.NetworkProtocol, sizeBufferRead int32, address string) (libsck.Client, error) { +func New(proto libptc.NetworkProtocol, sizeBufferRead libsiz.Size, address string) (libsck.Client, error) { switch proto { case libptc.NetworkUnix: if strings.EqualFold(runtime.GOOS, "linux") { return sckclx.New(sizeBufferRead, address), nil } + case libptc.NetworkUnixGram: + if strings.EqualFold(runtime.GOOS, "linux") { + return sckgrm.New(sizeBufferRead, address), nil + } case libptc.NetworkTCP, libptc.NetworkTCP4, libptc.NetworkTCP6: return sckclt.New(sizeBufferRead, address) case libptc.NetworkUDP, libptc.NetworkUDP4, libptc.NetworkUDP6: - return sckclu.New(address) + return sckclu.New(sizeBufferRead, address) } return nil, fmt.Errorf("invalid client protocol") diff --git a/socket/client/interface_other.go b/socket/client/interface_other.go index 37dbc9c..21d8613 100644 --- a/socket/client/interface_other.go +++ b/socket/client/interface_other.go @@ -33,17 +33,18 @@ import ( "fmt" libptc "github.com/nabbar/golib/network/protocol" + libsiz "github.com/nabbar/golib/size" libsck "github.com/nabbar/golib/socket" sckclt "github.com/nabbar/golib/socket/client/tcp" sckclu "github.com/nabbar/golib/socket/client/udp" ) -func New(proto libptc.NetworkProtocol, sizeBufferRead int32, address string) (libsck.Client, error) { +func New(proto libptc.NetworkProtocol, sizeBufferRead libsiz.Size, address string) (libsck.Client, error) { switch proto { case libptc.NetworkTCP, libptc.NetworkTCP4, libptc.NetworkTCP6: return sckclt.New(sizeBufferRead, address) case libptc.NetworkUDP, libptc.NetworkUDP4, libptc.NetworkUDP6: - return sckclu.New(address) + return sckclu.New(sizeBufferRead, address) } return nil, fmt.Errorf("invalid client protocol") diff --git a/socket/client/tcp/error.go b/socket/client/tcp/error.go index 817daf9..5618c7e 100644 --- a/socket/client/tcp/error.go +++ b/socket/client/tcp/error.go @@ -29,10 +29,6 @@ package tcp import "fmt" var ( - ErrInstance = fmt.Errorf("invalid instance") - ErrAddress = fmt.Errorf("invalid dial address") - ErrHostName = fmt.Errorf("invalid dial host name") - ErrHostPort = fmt.Errorf("invalid dial host port") - ErrContextClosed = fmt.Errorf("context closed") - ErrClientClosed = fmt.Errorf("server closed") + ErrInstance = fmt.Errorf("invalid instance") + ErrAddress = fmt.Errorf("invalid dial address") ) diff --git a/socket/client/tcp/interface.go b/socket/client/tcp/interface.go index 6666d7a..01c5d48 100644 --- a/socket/client/tcp/interface.go +++ b/socket/client/tcp/interface.go @@ -27,10 +27,13 @@ package tcp import ( - "net/url" - "strconv" + "net" "sync/atomic" + libptc "github.com/nabbar/golib/network/protocol" + + libsiz "github.com/nabbar/golib/size" + libsck "github.com/nabbar/golib/socket" ) @@ -38,35 +41,25 @@ type ClientTCP interface { libsck.Client } -func New(buffSizeRead int32, address string) (ClientTCP, error) { +func New(buffSizeRead libsiz.Size, address string) (ClientTCP, error) { var ( a = new(atomic.Value) s = new(atomic.Int32) - u = &url.URL{ - Host: address, - } ) - if len(u.Hostname()) < 1 { - return nil, ErrHostName - } else if len(u.Port()) < 1 { - return nil, ErrHostPort - } else if i, e := strconv.Atoi(u.Port()); e != nil { - return nil, e - } else if i < 1 || i > 65534 { - return nil, ErrHostPort - } else { - a.Store(u) + if len(address) < 1 { + return nil, ErrAddress + } else if _, err := net.ResolveTCPAddr(libptc.NetworkTCP.Code(), address); err != nil { + return nil, err } - s.Store(buffSizeRead) + a.Store(address) + s.Store(buffSizeRead.Int32()) - return &cltt{ - a: a, - s: s, - e: new(atomic.Value), - i: new(atomic.Value), - tr: new(atomic.Value), - tw: new(atomic.Value), + return &cli{ + a: a, + s: s, + e: new(atomic.Value), + i: new(atomic.Value), }, nil } diff --git a/socket/client/tcp/model.go b/socket/client/tcp/model.go index fe6372e..6ffd7c4 100644 --- a/socket/client/tcp/model.go +++ b/socket/client/tcp/model.go @@ -27,27 +27,25 @@ package tcp import ( - "bytes" + "bufio" "context" + "errors" "io" "net" - "os" "sync/atomic" libptc "github.com/nabbar/golib/network/protocol" libsck "github.com/nabbar/golib/socket" ) -type cltt struct { - a *atomic.Value // address: hostname + port - s *atomic.Int32 // buffer size - e *atomic.Value // function error - i *atomic.Value // function info - tr *atomic.Value // connection read timeout - tw *atomic.Value // connection write timeout +type cli struct { + a *atomic.Value // ptr net TCP Addr + s *atomic.Int32 // buffer size + e *atomic.Value // function error + i *atomic.Value // function info } -func (o *cltt) RegisterFuncError(f libsck.FuncError) { +func (o *cli) RegisterFuncError(f libsck.FuncError) { if o == nil { return } @@ -55,7 +53,7 @@ func (o *cltt) RegisterFuncError(f libsck.FuncError) { o.e.Store(f) } -func (o *cltt) RegisterFuncInfo(f libsck.FuncInfo) { +func (o *cli) RegisterFuncInfo(f libsck.FuncInfo) { if o == nil { return } @@ -63,7 +61,7 @@ func (o *cltt) RegisterFuncInfo(f libsck.FuncInfo) { o.i.Store(f) } -func (o *cltt) fctError(e error) { +func (o *cli) fctError(e error) { if o == nil { return } @@ -74,7 +72,7 @@ func (o *cltt) fctError(e error) { } } -func (o *cltt) fctInfo(local, remote net.Addr, state libsck.ConnState) { +func (o *cli) fctInfo(local, remote net.Addr, state libsck.ConnState) { if o == nil { return } @@ -85,81 +83,115 @@ func (o *cltt) fctInfo(local, remote net.Addr, state libsck.ConnState) { } } -func (o *cltt) buffRead() *bytes.Buffer { +func (o *cli) buffSize() int { v := o.s.Load() if v > 0 { - return bytes.NewBuffer(make([]byte, 0, int(v))) + return int(v) } - return bytes.NewBuffer(make([]byte, 0, libsck.DefaultBufferSize)) + return libsck.DefaultBufferSize } -func (o *cltt) dial(ctx context.Context) (net.Conn, error) { +func (o *cli) dial(ctx context.Context) (net.Conn, error) { if o == nil { return nil, ErrInstance } v := o.a.Load() + if v == nil { return nil, ErrAddress - } else if _, e := os.Stat(v.(string)); e != nil { - return nil, e + } else if adr, ok := v.(string); !ok { + return nil, ErrAddress } else { d := net.Dialer{} - return d.DialContext(ctx, libptc.NetworkTCP.Code(), v.(string)) + return d.DialContext(ctx, libptc.NetworkTCP.Code(), adr) } } -func (o *cltt) Do(ctx context.Context, request io.Reader, fct libsck.Response) error { +func (o *cli) Do(ctx context.Context, request io.Reader, fct libsck.Response) error { if o == nil { return ErrInstance } var ( - e error - lc net.Addr - rm net.Addr + err error cnn net.Conn ) - o.fctInfo(nil, nil, libsck.ConnectionDial) - if cnn, e = o.dial(ctx); e != nil { - o.fctError(e) - return e - } - - defer o.fctError(cnn.Close()) - - lc = cnn.LocalAddr() - rm = cnn.RemoteAddr() - - defer o.fctInfo(lc, rm, libsck.ConnectionClose) - o.fctInfo(lc, rm, libsck.ConnectionNew) - - if request != nil { - o.fctInfo(lc, rm, libsck.ConnectionWrite) - if _, e = io.Copy(cnn, request); e != nil { - o.fctError(e) - return e + defer func() { + if cnn != nil { + o.fctInfo(cnn.LocalAddr(), cnn.RemoteAddr(), libsck.ConnectionClose) + o.fctError(cnn.Close()) } + }() + + o.fctInfo(&net.TCPAddr{}, &net.TCPAddr{}, libsck.ConnectionDial) + if cnn, err = o.dial(ctx); err != nil { + o.fctError(err) + return err } - o.fctInfo(lc, rm, libsck.ConnectionCloseWrite) - if e = cnn.(*net.TCPConn).CloseWrite(); e != nil { - o.fctError(e) - return e - } + o.fctInfo(cnn.LocalAddr(), cnn.RemoteAddr(), libsck.ConnectionNew) - o.fctInfo(lc, rm, libsck.ConnectionHandler) - if fct != nil { - fct(cnn) - } - - o.fctInfo(lc, rm, libsck.ConnectionCloseRead) - if e = cnn.(*net.TCPConn).CloseRead(); e != nil { - o.fctError(e) - return e - } + o.sendRequest(cnn, request) + o.readResponse(cnn, fct) return nil } + +func (o *cli) sendRequest(con net.Conn, r io.Reader) { + var ( + err error + buf []byte + rdr = bufio.NewReaderSize(r, o.buffSize()) + wrt = bufio.NewWriterSize(con, o.buffSize()) + ) + + defer func() { + if con != nil { + if c, ok := con.(*net.TCPConn); ok { + o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionCloseWrite) + _ = c.CloseWrite() + } + } + }() + + for { + if con == nil && r == nil { + return + } + + buf, err = rdr.ReadBytes('\n') + + if err != nil { + if !errors.Is(err, io.EOF) { + o.fctError(err) + } + return + } + + o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionWrite) + + _, err = wrt.Write(buf) + if err != nil { + o.fctError(err) + return + } + + err = wrt.Flush() + if err != nil { + o.fctError(err) + return + } + } +} + +func (o *cli) readResponse(con net.Conn, f libsck.Response) { + if f == nil { + return + } + + o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionHandler) + f(con) +} diff --git a/socket/client/udp/error.go b/socket/client/udp/error.go index 22c1f08..ba4d0fc 100644 --- a/socket/client/udp/error.go +++ b/socket/client/udp/error.go @@ -29,10 +29,6 @@ package udp import "fmt" var ( - ErrInstance = fmt.Errorf("invalid instance") - ErrAddress = fmt.Errorf("invalid dial address") - ErrHostName = fmt.Errorf("invalid dial host name") - ErrHostPort = fmt.Errorf("invalid dial host port") - ErrContextClosed = fmt.Errorf("context closed") - ErrClientClosed = fmt.Errorf("server closed") + ErrInstance = fmt.Errorf("invalid instance") + ErrAddress = fmt.Errorf("invalid dial address") ) diff --git a/socket/client/udp/interface.go b/socket/client/udp/interface.go index 28bf061..b54cb8f 100644 --- a/socket/client/udp/interface.go +++ b/socket/client/udp/interface.go @@ -27,10 +27,13 @@ package udp import ( - "net/url" - "strconv" + "net" "sync/atomic" + libsiz "github.com/nabbar/golib/size" + + libptc "github.com/nabbar/golib/network/protocol" + libsck "github.com/nabbar/golib/socket" ) @@ -38,31 +41,25 @@ type ClientUDP interface { libsck.Client } -func New(address string) (ClientUDP, error) { +func New(buffSizeRead libsiz.Size, address string) (ClientUDP, error) { var ( a = new(atomic.Value) - u = &url.URL{ - Host: address, - } + s = new(atomic.Int32) ) - if len(u.Hostname()) < 1 { - return nil, ErrHostName - } else if len(u.Port()) < 1 { - return nil, ErrHostPort - } else if i, e := strconv.Atoi(u.Port()); e != nil { - return nil, e - } else if i < 1 || i > 65534 { - return nil, ErrHostPort - } else { - a.Store(u) + if len(address) < 1 { + return nil, ErrAddress + } else if _, err := net.ResolveUDPAddr(libptc.NetworkUDP.Code(), address); err != nil { + return nil, err } - return &cltu{ - a: a, - e: new(atomic.Value), - i: new(atomic.Value), - tr: new(atomic.Value), - tw: new(atomic.Value), + a.Store(address) + s.Store(buffSizeRead.Int32()) + + return &cli{ + a: a, + s: s, + e: new(atomic.Value), + i: new(atomic.Value), }, nil } diff --git a/socket/client/udp/model.go b/socket/client/udp/model.go index d12625f..12c576d 100644 --- a/socket/client/udp/model.go +++ b/socket/client/udp/model.go @@ -27,26 +27,25 @@ package udp import ( - "bytes" + "bufio" "context" + "errors" "io" "net" - "os" "sync/atomic" libptc "github.com/nabbar/golib/network/protocol" libsck "github.com/nabbar/golib/socket" ) -type cltu struct { - a *atomic.Value // address: hostname + port - e *atomic.Value // function error - i *atomic.Value // function info - tr *atomic.Value // connection read timeout - tw *atomic.Value // connection write timeout +type cli struct { + a *atomic.Value // address: hostname + port + s *atomic.Int32 // buffer size + e *atomic.Value // function error + i *atomic.Value // function info } -func (o *cltu) RegisterFuncError(f libsck.FuncError) { +func (o *cli) RegisterFuncError(f libsck.FuncError) { if o == nil { return } @@ -54,7 +53,7 @@ func (o *cltu) RegisterFuncError(f libsck.FuncError) { o.e.Store(f) } -func (o *cltu) RegisterFuncInfo(f libsck.FuncInfo) { +func (o *cli) RegisterFuncInfo(f libsck.FuncInfo) { if o == nil { return } @@ -62,7 +61,7 @@ func (o *cltu) RegisterFuncInfo(f libsck.FuncInfo) { o.i.Store(f) } -func (o *cltu) fctError(e error) { +func (o *cli) fctError(e error) { if o == nil { return } @@ -73,7 +72,7 @@ func (o *cltu) fctError(e error) { } } -func (o *cltu) fctInfo(local, remote net.Addr, state libsck.ConnState) { +func (o *cli) fctInfo(local, remote net.Addr, state libsck.ConnState) { if o == nil { return } @@ -84,64 +83,106 @@ func (o *cltu) fctInfo(local, remote net.Addr, state libsck.ConnState) { } } -func (o *cltu) buffRead() *bytes.Buffer { - return bytes.NewBuffer(make([]byte, 0, 1)) +func (o *cli) buffSize() int { + v := o.s.Load() + if v > 0 { + return int(v) + } + + return libsck.DefaultBufferSize } -func (o *cltu) dial(ctx context.Context) (net.Conn, error) { +func (o *cli) dial(ctx context.Context) (net.Conn, error) { if o == nil { return nil, ErrInstance } v := o.a.Load() + if v == nil { return nil, ErrAddress - } else if _, e := os.Stat(v.(string)); e != nil { - return nil, e + } else if adr, ok := v.(string); !ok { + return nil, ErrAddress } else { d := net.Dialer{} - return d.DialContext(ctx, libptc.NetworkUDP.Code(), v.(string)) + return d.DialContext(ctx, libptc.NetworkUDP.Code(), adr) } } -func (o *cltu) Do(ctx context.Context, request io.Reader, fct libsck.Response) error { +func (o *cli) Do(ctx context.Context, request io.Reader, fct libsck.Response) error { if o == nil { return ErrInstance } var ( - e error - lc net.Addr - rm net.Addr + err error cnn net.Conn ) - o.fctInfo(nil, nil, libsck.ConnectionDial) - if cnn, e = o.dial(ctx); e != nil { - o.fctError(e) - return e - } - - defer o.fctError(cnn.Close()) - - lc = cnn.LocalAddr() - rm = cnn.RemoteAddr() - - defer o.fctInfo(lc, rm, libsck.ConnectionClose) - o.fctInfo(lc, rm, libsck.ConnectionNew) - - if request != nil { - o.fctInfo(lc, rm, libsck.ConnectionWrite) - if _, e = io.Copy(cnn, request); e != nil { - o.fctError(e) - return e + defer func() { + if cnn != nil { + o.fctInfo(cnn.LocalAddr(), cnn.RemoteAddr(), libsck.ConnectionClose) + o.fctError(cnn.Close()) } + }() + + o.fctInfo(&net.UDPAddr{}, &net.UDPAddr{}, libsck.ConnectionDial) + if cnn, err = o.dial(ctx); err != nil { + o.fctError(err) + return err } - o.fctInfo(lc, rm, libsck.ConnectionHandler) - if fct != nil { - fct(cnn) - } + o.fctInfo(cnn.LocalAddr(), cnn.RemoteAddr(), libsck.ConnectionNew) + + o.sendRequest(cnn, request) + o.readResponse(cnn, fct) return nil } + +func (o *cli) sendRequest(con net.Conn, r io.Reader) { + var ( + err error + buf []byte + rdr = bufio.NewReaderSize(r, o.buffSize()) + wrt = bufio.NewWriterSize(con, o.buffSize()) + ) + + for { + if con == nil && r == nil { + return + } + + buf, err = rdr.ReadBytes('\n') + + if err != nil { + if !errors.Is(err, io.EOF) { + o.fctError(err) + } + return + } + + o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionWrite) + + _, err = wrt.Write(buf) + if err != nil { + o.fctError(err) + return + } + + err = wrt.Flush() + if err != nil { + o.fctError(err) + return + } + } +} + +func (o *cli) readResponse(con net.Conn, f libsck.Response) { + if f == nil { + return + } + + o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionHandler) + f(con) +} diff --git a/socket/client/unix/error.go b/socket/client/unix/error.go index d62f3ef..8bf0c5b 100644 --- a/socket/client/unix/error.go +++ b/socket/client/unix/error.go @@ -32,8 +32,6 @@ package unix import "fmt" var ( - ErrInvalidInstance = fmt.Errorf("invalid instance") - ErrUnixFile = fmt.Errorf("invalid unix file") - ErrContextClosed = fmt.Errorf("context closed") - ErrServerClosed = fmt.Errorf("server closed") + ErrInstance = fmt.Errorf("invalid instance") + ErrAddress = fmt.Errorf("invalid dial address") ) diff --git a/socket/client/unix/ignore.go b/socket/client/unix/ignore.go index 5974a97..c5b6188 100644 --- a/socket/client/unix/ignore.go +++ b/socket/client/unix/ignore.go @@ -25,3 +25,5 @@ */ package unix + +// this file is to prevent error on build with system not compatible with unix diff --git a/socket/client/unix/interface.go b/socket/client/unix/interface.go index 50c87e1..53849d2 100644 --- a/socket/client/unix/interface.go +++ b/socket/client/unix/interface.go @@ -32,6 +32,8 @@ package unix import ( "sync/atomic" + libsiz "github.com/nabbar/golib/size" + libsck "github.com/nabbar/golib/socket" ) @@ -39,21 +41,19 @@ type ClientUnix interface { libsck.Client } -func New(buffSizeRead int32, unixfile string) ClientUnix { +func New(buffSizeRead libsiz.Size, unixfile string) ClientUnix { var ( a = new(atomic.Value) s = new(atomic.Int32) ) a.Store(unixfile) - s.Store(buffSizeRead) + s.Store(buffSizeRead.Int32()) - return &cltx{ - a: a, - s: s, - e: new(atomic.Value), - i: new(atomic.Value), - tr: new(atomic.Value), - tw: new(atomic.Value), + return &cli{ + a: a, + s: s, + e: new(atomic.Value), + i: new(atomic.Value), } } diff --git a/socket/client/unix/model.go b/socket/client/unix/model.go index b40f3fc..3b2ebac 100644 --- a/socket/client/unix/model.go +++ b/socket/client/unix/model.go @@ -30,27 +30,25 @@ package unix import ( - "bytes" + "bufio" "context" + "errors" "io" "net" - "os" "sync/atomic" libptc "github.com/nabbar/golib/network/protocol" libsck "github.com/nabbar/golib/socket" ) -type cltx struct { - a *atomic.Value // address : unixfile - s *atomic.Int32 // buffer size - e *atomic.Value // function error - i *atomic.Value // function info - tr *atomic.Value // connection read timeout - tw *atomic.Value // connection write timeout +type cli struct { + a *atomic.Value // address : unixfile + s *atomic.Int32 // buffer size + e *atomic.Value // function error + i *atomic.Value // function info } -func (o *cltx) RegisterFuncError(f libsck.FuncError) { +func (o *cli) RegisterFuncError(f libsck.FuncError) { if o == nil { return } @@ -58,7 +56,7 @@ func (o *cltx) RegisterFuncError(f libsck.FuncError) { o.e.Store(f) } -func (o *cltx) RegisterFuncInfo(f libsck.FuncInfo) { +func (o *cli) RegisterFuncInfo(f libsck.FuncInfo) { if o == nil { return } @@ -66,7 +64,7 @@ func (o *cltx) RegisterFuncInfo(f libsck.FuncInfo) { o.i.Store(f) } -func (o *cltx) fctError(e error) { +func (o *cli) fctError(e error) { if o == nil { return } @@ -77,7 +75,7 @@ func (o *cltx) fctError(e error) { } } -func (o *cltx) fctInfo(local, remote net.Addr, state libsck.ConnState) { +func (o *cli) fctInfo(local, remote net.Addr, state libsck.ConnState) { if o == nil { return } @@ -88,84 +86,115 @@ func (o *cltx) fctInfo(local, remote net.Addr, state libsck.ConnState) { } } -func (o *cltx) buffRead() *bytes.Buffer { +func (o *cli) buffSize() int { v := o.s.Load() if v > 0 { - return bytes.NewBuffer(make([]byte, 0, int(v))) + return int(v) } - return bytes.NewBuffer(make([]byte, 0, libsck.DefaultBufferSize)) + return libsck.DefaultBufferSize } -func (o *cltx) dial(ctx context.Context) (net.Conn, error) { +func (o *cli) dial(ctx context.Context) (net.Conn, error) { if o == nil { - return nil, ErrInvalidInstance + return nil, ErrInstance } v := o.a.Load() + if v == nil { - return nil, ErrUnixFile - } else if _, e := os.Stat(v.(string)); e != nil { - return nil, e + return nil, ErrAddress + } else if adr, ok := v.(string); !ok { + return nil, ErrAddress } else { d := net.Dialer{} - return d.DialContext(ctx, libptc.NetworkUnix.Code(), v.(string)) + return d.DialContext(ctx, libptc.NetworkUnix.Code(), adr) } } -func (o *cltx) Do(ctx context.Context, request io.Reader, fct libsck.Response) error { +func (o *cli) Do(ctx context.Context, request io.Reader, fct libsck.Response) error { if o == nil { - return ErrInvalidInstance + return ErrInstance } var ( - e error - lc net.Addr - rm net.Addr + err error cnn net.Conn ) - o.fctInfo(nil, nil, libsck.ConnectionDial) - if cnn, e = o.dial(ctx); e != nil { - o.fctError(e) - return e - } - defer func() { - err := cnn.Close() - o.fctError(err) + if cnn != nil { + o.fctInfo(cnn.LocalAddr(), cnn.RemoteAddr(), libsck.ConnectionClose) + o.fctError(cnn.Close()) + } }() - lc = cnn.LocalAddr() - rm = cnn.RemoteAddr() - - defer o.fctInfo(lc, rm, libsck.ConnectionClose) - o.fctInfo(lc, rm, libsck.ConnectionNew) - - if request != nil { - o.fctInfo(lc, rm, libsck.ConnectionWrite) - if _, e = io.Copy(cnn, request); e != nil { - o.fctError(e) - return e - } + o.fctInfo(&net.UnixAddr{}, &net.UnixAddr{}, libsck.ConnectionDial) + if cnn, err = o.dial(ctx); err != nil { + o.fctError(err) + return err } - o.fctInfo(lc, rm, libsck.ConnectionCloseWrite) - if e = cnn.(*net.UnixConn).CloseWrite(); e != nil { - o.fctError(e) - return e - } + o.fctInfo(cnn.LocalAddr(), cnn.RemoteAddr(), libsck.ConnectionNew) - o.fctInfo(lc, rm, libsck.ConnectionHandler) - if fct != nil { - fct(cnn) - } - - o.fctInfo(lc, rm, libsck.ConnectionCloseRead) - if e = cnn.(*net.UnixConn).CloseRead(); e != nil { - o.fctError(e) - return e - } + o.sendRequest(cnn, request) + o.readResponse(cnn, fct) return nil } + +func (o *cli) sendRequest(con net.Conn, r io.Reader) { + var ( + err error + buf []byte + rdr = bufio.NewReaderSize(r, o.buffSize()) + wrt = bufio.NewWriterSize(con, o.buffSize()) + ) + + defer func() { + if con != nil { + if c, ok := con.(*net.UnixConn); ok { + o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionCloseWrite) + _ = c.CloseWrite() + } + } + }() + + for { + if con == nil && r == nil { + return + } + + buf, err = rdr.ReadBytes('\n') + + if err != nil { + if !errors.Is(err, io.EOF) { + o.fctError(err) + } + return + } + + o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionWrite) + + _, err = wrt.Write(buf) + if err != nil { + o.fctError(err) + return + } + + err = wrt.Flush() + if err != nil { + o.fctError(err) + return + } + } +} + +func (o *cli) readResponse(con net.Conn, f libsck.Response) { + if f == nil { + return + } + + o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionHandler) + f(con) +} diff --git a/socket/client/unixgram/error.go b/socket/client/unixgram/error.go new file mode 100644 index 0000000..b12132a --- /dev/null +++ b/socket/client/unixgram/error.go @@ -0,0 +1,37 @@ +//go:build linux +// +build linux + +/* + * MIT License + * + * Copyright (c) 2022 Nicolas JUHEL + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + * + */ + +package unixgram + +import "fmt" + +var ( + ErrInstance = fmt.Errorf("invalid instance") + ErrAddress = fmt.Errorf("invalid dial address") +) diff --git a/socket/client/unixgram/ignore.go b/socket/client/unixgram/ignore.go new file mode 100644 index 0000000..bf209ae --- /dev/null +++ b/socket/client/unixgram/ignore.go @@ -0,0 +1,29 @@ +/* + * MIT License + * + * Copyright (c) 2022 Nicolas JUHEL + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + * + */ + +package unixgram + +// this file is to prevent error on build with system not compatible with unix diff --git a/socket/client/unixgram/interface.go b/socket/client/unixgram/interface.go new file mode 100644 index 0000000..733d5b6 --- /dev/null +++ b/socket/client/unixgram/interface.go @@ -0,0 +1,59 @@ +//go:build linux +// +build linux + +/* + * MIT License + * + * Copyright (c) 2022 Nicolas JUHEL + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + * + */ + +package unixgram + +import ( + "sync/atomic" + + libsiz "github.com/nabbar/golib/size" + + libsck "github.com/nabbar/golib/socket" +) + +type ClientUnix interface { + libsck.Client +} + +func New(buffSizeRead libsiz.Size, unixfile string) ClientUnix { + var ( + a = new(atomic.Value) + s = new(atomic.Int32) + ) + + a.Store(unixfile) + s.Store(buffSizeRead.Int32()) + + return &cli{ + a: a, + s: s, + e: new(atomic.Value), + i: new(atomic.Value), + } +} diff --git a/socket/client/unixgram/model.go b/socket/client/unixgram/model.go new file mode 100644 index 0000000..70d4786 --- /dev/null +++ b/socket/client/unixgram/model.go @@ -0,0 +1,191 @@ +//go:build linux +// +build linux + +/* + * MIT License + * + * Copyright (c) 2022 Nicolas JUHEL + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + * + */ + +package unixgram + +import ( + "bufio" + "context" + "errors" + "io" + "net" + "sync/atomic" + + libptc "github.com/nabbar/golib/network/protocol" + libsck "github.com/nabbar/golib/socket" +) + +type cli struct { + a *atomic.Value // address : unixfile + s *atomic.Int32 // buffer size + e *atomic.Value // function error + i *atomic.Value // function info +} + +func (o *cli) RegisterFuncError(f libsck.FuncError) { + if o == nil { + return + } + + o.e.Store(f) +} + +func (o *cli) RegisterFuncInfo(f libsck.FuncInfo) { + if o == nil { + return + } + + o.i.Store(f) +} + +func (o *cli) fctError(e error) { + if o == nil { + return + } + + v := o.e.Load() + if v != nil { + v.(libsck.FuncError)(e) + } +} + +func (o *cli) fctInfo(local, remote net.Addr, state libsck.ConnState) { + if o == nil { + return + } + + v := o.i.Load() + if v != nil { + v.(libsck.FuncInfo)(local, remote, state) + } +} + +func (o *cli) buffSize() int { + v := o.s.Load() + if v > 0 { + return int(v) + } + + return libsck.DefaultBufferSize +} + +func (o *cli) dial(ctx context.Context) (net.Conn, error) { + if o == nil { + return nil, ErrInstance + } + + v := o.a.Load() + + if v == nil { + return nil, ErrAddress + } else if adr, ok := v.(string); !ok { + return nil, ErrAddress + } else { + d := net.Dialer{} + return d.DialContext(ctx, libptc.NetworkUnixGram.Code(), adr) + } +} + +func (o *cli) Do(ctx context.Context, request io.Reader, fct libsck.Response) error { + if o == nil { + return ErrInstance + } + + var ( + err error + cnn net.Conn + ) + + defer func() { + if cnn != nil { + o.fctInfo(cnn.LocalAddr(), cnn.RemoteAddr(), libsck.ConnectionClose) + o.fctError(cnn.Close()) + } + }() + + o.fctInfo(&net.UnixAddr{}, &net.UnixAddr{}, libsck.ConnectionDial) + if cnn, err = o.dial(ctx); err != nil { + o.fctError(err) + return err + } + + o.fctInfo(cnn.LocalAddr(), cnn.RemoteAddr(), libsck.ConnectionNew) + + o.sendRequest(cnn, request) + o.readResponse(cnn, fct) + + return nil +} + +func (o *cli) sendRequest(con net.Conn, r io.Reader) { + var ( + err error + buf []byte + rdr = bufio.NewReaderSize(r, o.buffSize()) + wrt = bufio.NewWriterSize(con, o.buffSize()) + ) + + for { + if con == nil && r == nil { + return + } + + buf, err = rdr.ReadBytes('\n') + + if err != nil { + if !errors.Is(err, io.EOF) { + o.fctError(err) + } + return + } + + o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionWrite) + + _, err = wrt.Write(buf) + if err != nil { + o.fctError(err) + return + } + + err = wrt.Flush() + if err != nil { + o.fctError(err) + return + } + } +} + +func (o *cli) readResponse(con net.Conn, f libsck.Response) { + if f == nil { + return + } + + o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionHandler) + f(con) +} diff --git a/socket/config/client.go b/socket/config/client.go index 346b8f6..3b7c626 100644 --- a/socket/config/client.go +++ b/socket/config/client.go @@ -28,6 +28,7 @@ package config import ( libptc "github.com/nabbar/golib/network/protocol" + libsiz "github.com/nabbar/golib/size" libsck "github.com/nabbar/golib/socket" sckclt "github.com/nabbar/golib/socket/client" ) @@ -35,7 +36,7 @@ import ( type ClientConfig struct { Network libptc.NetworkProtocol `` Address string - ReadBuffSize int32 + ReadBuffSize libsiz.Size } func (o ClientConfig) New() (libsck.Client, error) { diff --git a/socket/config/server.go b/socket/config/server.go index 28be1b8..a131613 100644 --- a/socket/config/server.go +++ b/socket/config/server.go @@ -29,7 +29,6 @@ package config import ( "os" - libdur "github.com/nabbar/golib/duration" libptc "github.com/nabbar/golib/network/protocol" libsiz "github.com/nabbar/golib/size" libsck "github.com/nabbar/golib/socket" @@ -40,18 +39,10 @@ type ServerConfig struct { Network libptc.NetworkProtocol `` Address string PermFile os.FileMode + GroupPerm int32 BuffSizeRead libsiz.Size - TimeoutRead libdur.Duration - TimeoutWrite libdur.Duration } func (o ServerConfig) New(handler libsck.Handler) (libsck.Server, error) { - s, e := scksrv.New(handler, o.Network, o.BuffSizeRead, o.Address, o.PermFile) - - if e != nil { - s.SetReadTimeout(o.TimeoutRead.Time()) - s.SetWriteTimeout(o.TimeoutWrite.Time()) - } - - return s, e + return scksrv.New(handler, o.Network, o.BuffSizeRead, o.Address, o.PermFile, o.GroupPerm) } diff --git a/socket/interface.go b/socket/interface.go index 09ad5c3..16ce06c 100644 --- a/socket/interface.go +++ b/socket/interface.go @@ -30,7 +30,6 @@ import ( "context" "io" "net" - "time" libtls "github.com/nabbar/golib/certificates" ) @@ -84,9 +83,6 @@ type Server interface { RegisterFuncInfo(f FuncInfo) RegisterFuncInfoServer(f FuncInfoSrv) - SetReadTimeout(d time.Duration) - SetWriteTimeout(d time.Duration) - SetTLS(enable bool, config libtls.TLSConfig) error Listen(ctx context.Context) error Shutdown() diff --git a/socket/server/interface_linux.go b/socket/server/interface_linux.go index 4ddc87f..cafed52 100644 --- a/socket/server/interface_linux.go +++ b/socket/server/interface_linux.go @@ -41,15 +41,22 @@ import ( scksrt "github.com/nabbar/golib/socket/server/tcp" scksru "github.com/nabbar/golib/socket/server/udp" scksrx "github.com/nabbar/golib/socket/server/unix" + sckgrm "github.com/nabbar/golib/socket/server/unixgram" ) -func New(handler libsck.Handler, proto libptc.NetworkProtocol, sizeBufferRead libsiz.Size, address string, perm os.FileMode) (libsck.Server, error) { +func New(handler libsck.Handler, proto libptc.NetworkProtocol, sizeBufferRead libsiz.Size, address string, perm os.FileMode, gid int32) (libsck.Server, error) { switch proto { case libptc.NetworkUnix: if strings.EqualFold(runtime.GOOS, "linux") { s := scksrx.New(handler, sizeBufferRead) - s.RegisterSocket(address, perm) - return s, nil + e := s.RegisterSocket(address, perm, gid) + return s, e + } + case libptc.NetworkUnixGram: + if strings.EqualFold(runtime.GOOS, "linux") { + s := sckgrm.New(handler, sizeBufferRead) + e := s.RegisterSocket(address, perm, gid) + return s, e } case libptc.NetworkTCP, libptc.NetworkTCP4, libptc.NetworkTCP6: s := scksrt.New(handler, sizeBufferRead) diff --git a/socket/server/interface_other.go b/socket/server/interface_other.go index 5c04570..cf72ae7 100644 --- a/socket/server/interface_other.go +++ b/socket/server/interface_other.go @@ -40,7 +40,7 @@ import ( scksru "github.com/nabbar/golib/socket/server/udp" ) -func New(handler libsck.Handler, proto libptc.NetworkProtocol, sizeBufferRead libsiz.Size, address string, perm os.FileMode) (libsck.Server, error) { +func New(handler libsck.Handler, proto libptc.NetworkProtocol, sizeBufferRead libsiz.Size, address string, perm os.FileMode, gid int32) (libsck.Server, error) { switch proto { case libptc.NetworkTCP, libptc.NetworkTCP4, libptc.NetworkTCP6: s := scksrt.New(handler, sizeBufferRead) diff --git a/socket/server/tcp/inerface.go b/socket/server/tcp/inerface.go index 93a70c0..a0cf9af 100644 --- a/socket/server/tcp/inerface.go +++ b/socket/server/tcp/inerface.go @@ -60,8 +60,6 @@ func New(h libsck.Handler, sizeBuffRead libsiz.Size) ServerTcp { fe: new(atomic.Value), fi: new(atomic.Value), fs: new(atomic.Value), - tr: new(atomic.Value), - tw: new(atomic.Value), sr: sr, ad: new(atomic.Value), } diff --git a/socket/server/tcp/listener.go b/socket/server/tcp/listener.go index 2fb4349..ced813c 100644 --- a/socket/server/tcp/listener.go +++ b/socket/server/tcp/listener.go @@ -27,52 +27,52 @@ package tcp import ( + "bufio" "bytes" "context" "crypto/tls" "io" "net" - "net/url" - "time" libptc "github.com/nabbar/golib/network/protocol" libsck "github.com/nabbar/golib/socket" ) -func (o *srv) timeoutRead() time.Time { - v := o.tr.Load() - if v != nil { - return time.Now().Add(v.(time.Duration)) - } - - return time.Time{} -} - -func (o *srv) timeoutWrite() time.Time { - v := o.tw.Load() - if v != nil { - return time.Now().Add(v.(time.Duration)) - } - - return time.Time{} -} - -func (o *srv) buffRead() *bytes.Buffer { +func (o *srv) buffSize() int { v := o.sr.Load() if v > 0 { - return bytes.NewBuffer(make([]byte, 0, int(v))) + return int(v) } - return bytes.NewBuffer(make([]byte, 0, libsck.DefaultBufferSize)) + return libsck.DefaultBufferSize } -func (o *srv) getAddress() *url.URL { +func (o *srv) getAddress() string { f := o.ad.Load() + if f != nil { - return f.(*url.URL) + return f.(string) } - return nil + return "" +} + +func (o *srv) getListen(addr string) (net.Listener, error) { + var ( + lis net.Listener + err error + ) + + if lis, err = net.Listen(libptc.NetworkTCP.Code(), addr); err != nil { + return lis, err + } else if t := o.getTLS(); t != nil { + lis = tls.NewListener(lis, t) + o.fctInfoSrv("starting listening socket 'TLS %s %s'", libptc.NetworkTCP.String(), addr) + } else { + o.fctInfoSrv("starting listening socket '%s %s'", libptc.NetworkTCP.String(), addr) + } + + return lis, nil } func (o *srv) Listen(ctx context.Context) error { @@ -82,22 +82,8 @@ func (o *srv) Listen(ctx context.Context) error { a = o.getAddress() ) - if a == nil { - return ErrInvalidAddress - } else if t := o.getTLS(); t == nil { - o.fctInfoSrv("starting listening socket '%s %s'", libptc.NetworkTCP.String(), a.Host) - l, e = net.Listen(libptc.NetworkTCP.Code(), a.Host) - } else { - o.fctInfoSrv("starting listening socket 'TLS %s %s'", libptc.NetworkTCP.String(), a.Host) - l, e = tls.Listen(libptc.NetworkTCP.Code(), a.Host, t) - } - - if e != nil { - o.fctError(e) - return e - } - var fctClose = func() { + o.fctInfoSrv("closing listen socket '%s %s'", libptc.NetworkTCP.String(), a) if l != nil { o.fctError(l.Close()) } @@ -105,6 +91,13 @@ func (o *srv) Listen(ctx context.Context) error { defer fctClose() + if len(a) == 0 { + return ErrInvalidAddress + } else if l, e = o.getListen(a); e != nil { + o.fctError(e) + return e + } + // Accept new connection or stop if context or shutdown trigger for { select { @@ -116,7 +109,11 @@ func (o *srv) Listen(ctx context.Context) error { // Accept an incoming connection. if l == nil { return ErrServerClosed - } else if co, ce := l.Accept(); ce != nil { + } + + co, ce := l.Accept() + + if ce != nil { o.fctError(ce) } else { go o.Conn(co) @@ -126,52 +123,36 @@ func (o *srv) Listen(ctx context.Context) error { } func (o *srv) Conn(conn net.Conn) { - defer o.fctError(conn.Close()) + defer func() { + o.fctInfo(conn.LocalAddr(), conn.RemoteAddr(), libsck.ConnectionClose) + _ = conn.Close() + }() + + o.fctInfo(conn.LocalAddr(), conn.RemoteAddr(), libsck.ConnectionNew) var ( - lc = conn.LocalAddr() - rm = conn.RemoteAddr() - tr = o.timeoutRead() - tw = o.timeoutWrite() - br = o.buffRead() + err error + rdr = bufio.NewReaderSize(conn, o.buffSize()) + buf []byte + hdl libsck.Handler ) - defer o.fctInfo(lc, rm, libsck.ConnectionClose) - o.fctInfo(lc, rm, libsck.ConnectionNew) - - if !tr.IsZero() { - if e := conn.SetReadDeadline(tr); e != nil { - o.fctError(e) - return - } - } - - if !tw.IsZero() { - if e := conn.SetReadDeadline(tw); e != nil { - o.fctError(e) - return - } - } - - o.fctInfo(lc, rm, libsck.ConnectionRead) - if _, e := io.Copy(br, conn); e != nil { - o.fctError(e) + if hdl = o.handler(); hdl == nil { return } - o.fctInfo(lc, rm, libsck.ConnectionCloseRead) - if e := conn.(*net.TCPConn).CloseRead(); e != nil { - o.fctError(e) - return - } + for { + buf, err = rdr.ReadBytes('\n') - if h := o.handler(); h != nil { - o.fctInfo(lc, rm, libsck.ConnectionHandler) - h(br, conn) - } + o.fctInfo(conn.LocalAddr(), conn.RemoteAddr(), libsck.ConnectionRead) + if err != nil { + if err != io.EOF { + o.fctError(err) + } + break + } - o.fctInfo(lc, rm, libsck.ConnectionCloseWrite) - if e := conn.(*net.TCPConn).CloseWrite(); e != nil { - o.fctError(e) + o.fctInfo(conn.LocalAddr(), conn.RemoteAddr(), libsck.ConnectionHandler) + hdl(bytes.NewBuffer(buf), conn) } } diff --git a/socket/server/tcp/model.go b/socket/server/tcp/model.go index b362de1..cfed404 100644 --- a/socket/server/tcp/model.go +++ b/socket/server/tcp/model.go @@ -30,10 +30,9 @@ import ( "crypto/tls" "fmt" "net" - "net/url" - "strconv" "sync/atomic" - "time" + + libptc "github.com/nabbar/golib/network/protocol" libtls "github.com/nabbar/golib/certificates" libsck "github.com/nabbar/golib/socket" @@ -64,10 +63,7 @@ type srv struct { fi *atomic.Value // function info fs *atomic.Value // function info server - tr *atomic.Value // connection read timeout - tw *atomic.Value // connection write timeout sr *atomic.Int32 // read buffer size - ad *atomic.Value // Server address url } @@ -134,38 +130,14 @@ func (o *srv) RegisterFuncInfoServer(f libsck.FuncInfoSrv) { o.fs.Store(f) } -func (o *srv) SetReadTimeout(d time.Duration) { - if o == nil { - return - } - - o.tr.Store(d) -} - -func (o *srv) SetWriteTimeout(d time.Duration) { - if o == nil { - return - } - - o.tw.Store(d) -} - func (o *srv) RegisterServer(address string) error { - var u = &url.URL{ - Host: address, + if len(address) < 1 { + return ErrInvalidAddress + } else if _, err := net.ResolveTCPAddr(libptc.NetworkTCP.Code(), address); err != nil { + return err } - if len(u.Hostname()) < 1 { - return ErrInvalidHostName - } else if len(u.Port()) < 1 { - return ErrInvalidHostPort - } else if i, e := strconv.Atoi(u.Port()); e != nil { - return e - } else if i < 1 || i > 65534 { - return ErrInvalidHostPort - } - - o.ad.Store(u) + o.ad.Store(address) return nil } diff --git a/socket/server/udp/error.go b/socket/server/udp/error.go index d1e4100..afabc1a 100644 --- a/socket/server/udp/error.go +++ b/socket/server/udp/error.go @@ -29,9 +29,8 @@ package udp import "fmt" var ( - ErrInvalidAddress = fmt.Errorf("invalid listen address") - ErrInvalidHostName = fmt.Errorf("invalid server host name") - ErrInvalidHostPort = fmt.Errorf("invalid server host port") - ErrContextClosed = fmt.Errorf("context closed") - ErrServerClosed = fmt.Errorf("server closed") + ErrInvalidAddress = fmt.Errorf("invalid listen address") + ErrContextClosed = fmt.Errorf("context closed") + ErrServerClosed = fmt.Errorf("server closed") + ErrInvalidHandler = fmt.Errorf("invalid handler") ) diff --git a/socket/server/udp/inerface.go b/socket/server/udp/inerface.go index 760a578..d983300 100644 --- a/socket/server/udp/inerface.go +++ b/socket/server/udp/inerface.go @@ -59,8 +59,6 @@ func New(h libsck.Handler, sizeBuffRead libsiz.Size) ServerTcp { fe: new(atomic.Value), fi: new(atomic.Value), fs: new(atomic.Value), - tr: new(atomic.Value), - tw: new(atomic.Value), sr: sr, ad: new(atomic.Value), } diff --git a/socket/server/udp/listener.go b/socket/server/udp/listener.go index 0792e88..92df4b8 100644 --- a/socket/server/udp/listener.go +++ b/socket/server/udp/listener.go @@ -31,71 +31,83 @@ import ( "context" "io" "net" - "net/url" - "time" libptc "github.com/nabbar/golib/network/protocol" libsck "github.com/nabbar/golib/socket" ) -func (o *srv) timeoutRead() time.Time { - v := o.tr.Load() - if v != nil { - return time.Now().Add(v.(time.Duration)) - } - - return time.Time{} -} - -func (o *srv) timeoutWrite() time.Time { - v := o.tw.Load() - if v != nil { - return time.Now().Add(v.(time.Duration)) - } - - return time.Time{} -} - -func (o *srv) buffRead() *bytes.Buffer { +func (o *srv) buffSize() int { v := o.sr.Load() if v > 0 { - return bytes.NewBuffer(make([]byte, 0, int(v))) + return int(v) } - return bytes.NewBuffer(make([]byte, 0, libsck.DefaultBufferSize)) + return libsck.DefaultBufferSize } -func (o *srv) getAddress() *url.URL { +func (o *srv) getAddress() string { f := o.ad.Load() + if f != nil { - return f.(*url.URL) + return f.(string) } - return nil + return "" +} + +func (o *srv) getListen(addr string) (*net.UDPConn, error) { + var ( + adr *net.UDPAddr + lis *net.UDPConn + err error + ) + + if adr, err = net.ResolveUDPAddr(libptc.NetworkUDP.Code(), addr); err != nil { + return nil, err + } else if lis, err = net.ListenUDP(libptc.NetworkUDP.Code(), adr); err != nil { + return lis, err + } else { + o.fctInfoSrv("starting listening socket '%s %s'", libptc.NetworkUDP.String(), addr) + } + + return lis, nil } func (o *srv) Listen(ctx context.Context) error { var ( - e error - l net.Listener - a = o.getAddress() + err error + nbr int + loc *net.UDPAddr + rem net.Addr + con *net.UDPConn + adr = o.getAddress() + hdl libsck.Handler ) - if a == nil { + if len(adr) == 0 { return ErrInvalidAddress - } else if l, e = net.Listen(libptc.NetworkUDP.Code(), a.Host); e != nil { - return e + } else if hdl = o.handler(); hdl == nil { + return ErrInvalidHandler + } else if loc, err = net.ResolveUDPAddr(libptc.NetworkUDP.Code(), adr); err != nil { + return err } var fctClose = func() { - if l != nil { - o.fctError(l.Close()) + o.fctInfoSrv("closing listen socket '%s %s'", libptc.NetworkUDP.String(), adr) + if con != nil { + o.fctError(con.Close()) } } - o.fctInfoSrv("starting listening socket 'TLS %s %s'", libptc.NetworkUDP.String(), a.Host) defer fctClose() + if con, err = net.ListenUDP(libptc.NetworkUDP.Code(), loc); err != nil { + o.fctError(err) + return err + } else { + o.fctInfoSrv("starting listening socket '%s %s'", libptc.NetworkUDP.String(), adr) + } + // Accept new connection or stop if context or shutdown trigger for { select { @@ -105,53 +117,30 @@ func (o *srv) Listen(ctx context.Context) error { return nil default: // Accept an incoming connection. - if l == nil { + if con == nil { return ErrServerClosed - } else if co, ce := l.Accept(); ce != nil { - o.fctError(ce) - } else { - go o.Conn(co) } + + var buf = make([]byte, o.buffSize()) + nbr, rem, err = con.ReadFrom(buf) + + if rem == nil { + rem = &net.UDPAddr{} + } + + o.fctInfo(loc, rem, libsck.ConnectionRead) + + if err != nil { + o.fctError(err) + continue + } + + go func(la, ra net.Addr, b []byte) { + o.fctInfo(la, ra, libsck.ConnectionHandler) + r := bytes.NewBuffer(b) + r.WriteString("\n") + hdl(r, io.Discard) + }(loc, rem, buf[:nbr]) } } } - -func (o *srv) Conn(conn net.Conn) { - defer o.fctError(conn.Close()) - - var ( - lc = conn.LocalAddr() - rm = conn.RemoteAddr() - tr = o.timeoutRead() - tw = o.timeoutWrite() - br = o.buffRead() - ) - - defer o.fctInfo(lc, rm, libsck.ConnectionClose) - o.fctInfo(lc, rm, libsck.ConnectionNew) - - if !tr.IsZero() { - if e := conn.SetReadDeadline(tr); e != nil { - o.fctError(e) - return - } - } - - if !tw.IsZero() { - if e := conn.SetReadDeadline(tw); e != nil { - o.fctError(e) - return - } - } - - o.fctInfo(lc, rm, libsck.ConnectionRead) - if _, e := io.Copy(br, conn); e != nil { - o.fctError(e) - return - } - - if h := o.handler(); h != nil { - o.fctInfo(lc, rm, libsck.ConnectionHandler) - h(br, conn) - } -} diff --git a/socket/server/udp/model.go b/socket/server/udp/model.go index dacf9a8..15a84d9 100644 --- a/socket/server/udp/model.go +++ b/socket/server/udp/model.go @@ -29,10 +29,9 @@ package udp import ( "fmt" "net" - "net/url" - "strconv" "sync/atomic" - "time" + + libptc "github.com/nabbar/golib/network/protocol" libtls "github.com/nabbar/golib/certificates" libsck "github.com/nabbar/golib/socket" @@ -58,10 +57,7 @@ type srv struct { fi *atomic.Value // function info fs *atomic.Value // function info server - tr *atomic.Value // connection read timeout - tw *atomic.Value // connection write timeout sr *atomic.Int32 // read buffer size - ad *atomic.Value // Server address url } @@ -113,38 +109,14 @@ func (o *srv) RegisterFuncInfoServer(f libsck.FuncInfoSrv) { o.fs.Store(f) } -func (o *srv) SetReadTimeout(d time.Duration) { - if o == nil { - return - } - - o.tr.Store(d) -} - -func (o *srv) SetWriteTimeout(d time.Duration) { - if o == nil { - return - } - - o.tw.Store(d) -} - func (o *srv) RegisterServer(address string) error { - var u = &url.URL{ - Host: address, + if len(address) < 1 { + return ErrInvalidAddress + } else if _, err := net.ResolveUDPAddr(libptc.NetworkUDP.Code(), address); err != nil { + return err } - if len(u.Hostname()) < 1 { - return ErrInvalidHostName - } else if len(u.Port()) < 1 { - return ErrInvalidHostPort - } else if i, e := strconv.Atoi(u.Port()); e != nil { - return e - } else if i < 1 || i > 65534 { - return ErrInvalidHostPort - } - - o.ad.Store(u) + o.ad.Store(address) return nil } diff --git a/socket/server/unix/error.go b/socket/server/unix/error.go index a366d6f..2f0e184 100644 --- a/socket/server/unix/error.go +++ b/socket/server/unix/error.go @@ -32,6 +32,8 @@ package unix import "fmt" var ( - ErrContextClosed = fmt.Errorf("context closed") - ErrServerClosed = fmt.Errorf("server closed") + ErrContextClosed = fmt.Errorf("context closed") + ErrServerClosed = fmt.Errorf("server closed") + ErrInvalidGroup = fmt.Errorf("invalid unix group for socket group permission") + ErrInvalidHandler = fmt.Errorf("invalid handler") ) diff --git a/socket/server/unix/ignore.go b/socket/server/unix/ignore.go index 5974a97..c5b6188 100644 --- a/socket/server/unix/ignore.go +++ b/socket/server/unix/ignore.go @@ -25,3 +25,5 @@ */ package unix + +// this file is to prevent error on build with system not compatible with unix diff --git a/socket/server/unix/inerface.go b/socket/server/unix/inerface.go index 73d5c9d..832d8ac 100644 --- a/socket/server/unix/inerface.go +++ b/socket/server/unix/inerface.go @@ -37,9 +37,11 @@ import ( libsck "github.com/nabbar/golib/socket" ) +const maxGID = 32767 + type ServerUnix interface { libsck.Server - RegisterSocket(unixFile string, perm os.FileMode) + RegisterSocket(unixFile string, perm os.FileMode, gid int32) error } func New(h libsck.Handler, sizeBuffRead libsiz.Size) ServerUnix { @@ -64,6 +66,10 @@ func New(h libsck.Handler, sizeBuffRead libsiz.Size) ServerUnix { sp := new(atomic.Int64) sp.Store(0) + // socket group permission + sg := new(atomic.Int32) + sg.Store(0) + return &srv{ l: nil, h: f, @@ -72,10 +78,9 @@ func New(h libsck.Handler, sizeBuffRead libsiz.Size) ServerUnix { fe: new(atomic.Value), fi: new(atomic.Value), fs: new(atomic.Value), - tr: new(atomic.Value), - tw: new(atomic.Value), sr: sr, sf: sf, sp: sp, + sg: sg, } } diff --git a/socket/server/unix/listener.go b/socket/server/unix/listener.go index 52382b9..ed66467 100644 --- a/socket/server/unix/listener.go +++ b/socket/server/unix/listener.go @@ -30,6 +30,7 @@ package unix import ( + "bufio" "bytes" "context" "errors" @@ -40,45 +41,24 @@ import ( "os" "path/filepath" "syscall" - "time" libptc "github.com/nabbar/golib/network/protocol" + libsck "github.com/nabbar/golib/socket" ) -func (o *srv) timeoutRead() time.Time { - v := o.tr.Load() - if v == nil { - return time.Time{} - } else if d, k := v.(time.Duration); !k { - return time.Time{} - } else if d > 0 { - return time.Now().Add(v.(time.Duration)) +func (o *srv) buffSize() int { + v := o.sr.Load() + + if v > 0 { + return int(v) } - return time.Time{} -} - -func (o *srv) timeoutWrite() time.Time { - v := o.tw.Load() - if v == nil { - return time.Time{} - } else if d, k := v.(time.Duration); !k { - return time.Time{} - } else if d > 0 { - return time.Now().Add(v.(time.Duration)) - } - - return time.Time{} + return libsck.DefaultBufferSize } func (o *srv) buffRead() *bytes.Buffer { - v := o.sr.Load() - if v > 0 { - return bytes.NewBuffer(make([]byte, 0, int(v))) - } - - return bytes.NewBuffer(make([]byte, 0, libsck.DefaultBufferSize)) + return bytes.NewBuffer(make([]byte, o.buffSize())) } func (o *srv) getSocketFile() (string, error) { @@ -99,6 +79,20 @@ func (o *srv) getSocketPerm() os.FileMode { return os.FileMode(0770) } +func (o *srv) getSocketGroup() int { + p := o.sg.Load() + if p >= 0 { + return int(p) + } + + gid := syscall.Getgid() + if gid <= maxGID { + return gid + } + + return 0 +} + func (o *srv) checkFile(unixFile string) (string, error) { if len(unixFile) < 1 { return unixFile, fmt.Errorf("missing socket file path") @@ -117,47 +111,73 @@ func (o *srv) checkFile(unixFile string) (string, error) { return unixFile, nil } +func (o *srv) getListen(uxf string) (net.Listener, error) { + var ( + err error + prm = o.getSocketPerm() + grp = o.getSocketGroup() + old int + inf fs.FileInfo + lis net.Listener + ) + + old = syscall.Umask(int(prm)) + defer func() { + syscall.Umask(old) + }() + + if lis, err = net.Listen(libptc.NetworkUnix.Code(), uxf); err != nil { + return nil, err + } else if inf, err = os.Stat(uxf); err != nil { + _ = lis.Close() + return nil, err + } else if inf.Mode() != prm { + if err = os.Chmod(uxf, prm); err != nil { + _ = lis.Close() + return nil, err + } + } + + if stt, ok := inf.Sys().(*syscall.Stat_t); ok { + if int(stt.Gid) != grp { + if err = os.Chown(uxf, syscall.Getuid(), grp); err != nil { + _ = lis.Close() + return nil, err + } + } + } + + o.fctInfoSrv("starting listening socket '%s %s'", libptc.NetworkUnix.String(), uxf) + return lis, nil +} + func (o *srv) Listen(ctx context.Context) error { var ( e error - i fs.FileInfo + f string l net.Listener - - perm = o.getSocketPerm() - unixFile string - - p = syscall.Umask(int(perm)) ) - if unixFile, e = o.getSocketFile(); e != nil { - return e - } else if l, e = net.Listen(libptc.NetworkUnix.Code(), unixFile); e != nil { - return e - } else if i, e = os.Stat(unixFile); e != nil { + if f, e = o.getSocketFile(); e != nil { return e } - syscall.Umask(p) - var fctClose = func() { if l != nil { o.fctError(l.Close()) } - if i, e = os.Stat(unixFile); e == nil { - o.fctError(os.Remove(unixFile)) + if _, e = os.Stat(f); e == nil { + o.fctError(os.Remove(f)) } } - o.fctInfoSrv("starting listening socket 'TLS %s %s'", libptc.NetworkUnix.String(), unixFile) + if l, e = o.getListen(f); e != nil { + return e + } + defer fctClose() - if i.Mode() != perm { - if e = os.Chmod(unixFile, perm); e != nil { - return e - } - } - // Accept new connection or stop if context or shutdown trigger for { select { @@ -172,63 +192,44 @@ func (o *srv) Listen(ctx context.Context) error { } else if co, ce := l.Accept(); ce != nil { o.fctError(ce) } else { + o.fctInfo(co.LocalAddr(), co.RemoteAddr(), libsck.ConnectionNew) go o.Conn(co) } } } } -func (o *srv) Conn(conn net.Conn) { +func (o *srv) Conn(con net.Conn) { defer func() { - e := conn.Close() - o.fctError(e) + o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionClose) + _ = con.Close() }() + o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionNew) + var ( - lc = conn.LocalAddr() - rm = conn.RemoteAddr() - tr = o.timeoutRead() - tw = o.timeoutWrite() - br = o.buffRead() + err error + rdr = bufio.NewReaderSize(con, o.buffSize()) + buf []byte + hdl libsck.Handler ) - defer o.fctInfo(lc, rm, libsck.ConnectionClose) - - o.fctInfo(lc, rm, libsck.ConnectionNew) - - if !tr.IsZero() { - if e := conn.SetReadDeadline(tr); e != nil { - o.fctError(e) - return - } - } - - if !tw.IsZero() { - if e := conn.SetReadDeadline(tw); e != nil { - o.fctError(e) - return - } - } - - o.fctInfo(lc, rm, libsck.ConnectionRead) - if _, e := io.Copy(br, conn); e != nil { - o.fctError(e) + if hdl = o.handler(); hdl == nil { return } - o.fctInfo(lc, rm, libsck.ConnectionCloseRead) - if e := conn.(*net.UnixConn).CloseRead(); e != nil { - o.fctError(e) - return - } + for { + buf, err = rdr.ReadBytes('\n') - if h := o.handler(); h != nil { - o.fctInfo(lc, rm, libsck.ConnectionHandler) - h(br, conn) - } + o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionRead) + if err != nil { + if err != io.EOF { + o.fctError(err) + } + break + } - o.fctInfo(lc, rm, libsck.ConnectionCloseWrite) - if e := conn.(*net.UnixConn).CloseWrite(); e != nil { - o.fctError(e) + o.fctInfo(con.LocalAddr(), con.RemoteAddr(), libsck.ConnectionHandler) + hdl(bytes.NewBuffer(buf), con) } } diff --git a/socket/server/unix/model.go b/socket/server/unix/model.go index 7f5073c..8ba1d1e 100644 --- a/socket/server/unix/model.go +++ b/socket/server/unix/model.go @@ -34,7 +34,8 @@ import ( "net" "os" "sync/atomic" - "time" + + libptc "github.com/nabbar/golib/network/protocol" libtls "github.com/nabbar/golib/certificates" libsck "github.com/nabbar/golib/socket" @@ -60,12 +61,10 @@ type srv struct { fi *atomic.Value // function info fs *atomic.Value // function info server - tr *atomic.Value // connection read timeout - tw *atomic.Value // connection write timeout sr *atomic.Int32 // read buffer size - sf *atomic.Value // file unix socket sp *atomic.Int64 // file unix perm + sg *atomic.Int32 // file unix group perm } func (o *srv) Done() <-chan struct{} { @@ -116,25 +115,18 @@ func (o *srv) RegisterFuncInfoServer(f libsck.FuncInfoSrv) { o.fs.Store(f) } -func (o *srv) SetReadTimeout(d time.Duration) { - if o == nil { - return +func (o *srv) RegisterSocket(unixFile string, perm os.FileMode, gid int32) error { + if _, err := net.ResolveUnixAddr(libptc.NetworkUnix.Code(), unixFile); err != nil { + return err + } else if gid > maxGID { + return ErrInvalidGroup } - o.tr.Store(d) -} - -func (o *srv) SetWriteTimeout(d time.Duration) { - if o == nil { - return - } - - o.tw.Store(d) -} - -func (o *srv) RegisterSocket(unixFile string, perm os.FileMode) { o.sf.Store(unixFile) o.sp.Store(int64(perm)) + o.sg.Store(gid) + + return nil } func (o *srv) fctError(e error) { diff --git a/socket/server/unixgram/error.go b/socket/server/unixgram/error.go new file mode 100644 index 0000000..8ccc85d --- /dev/null +++ b/socket/server/unixgram/error.go @@ -0,0 +1,39 @@ +//go:build linux +// +build linux + +/* + * MIT License + * + * Copyright (c) 2022 Nicolas JUHEL + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + * + */ + +package unixgram + +import "fmt" + +var ( + ErrContextClosed = fmt.Errorf("context closed") + ErrServerClosed = fmt.Errorf("server closed") + ErrInvalidGroup = fmt.Errorf("invalid unix group for socket group permission") + ErrInvalidHandler = fmt.Errorf("invalid handler") +) diff --git a/socket/server/unixgram/ignore.go b/socket/server/unixgram/ignore.go new file mode 100644 index 0000000..bf209ae --- /dev/null +++ b/socket/server/unixgram/ignore.go @@ -0,0 +1,29 @@ +/* + * MIT License + * + * Copyright (c) 2022 Nicolas JUHEL + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + * + */ + +package unixgram + +// this file is to prevent error on build with system not compatible with unix diff --git a/socket/server/unixgram/inerface.go b/socket/server/unixgram/inerface.go new file mode 100644 index 0000000..14e073e --- /dev/null +++ b/socket/server/unixgram/inerface.go @@ -0,0 +1,86 @@ +//go:build linux +// +build linux + +/* + * MIT License + * + * Copyright (c) 2022 Nicolas JUHEL + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + * + */ + +package unixgram + +import ( + "os" + "sync/atomic" + + libsiz "github.com/nabbar/golib/size" + libsck "github.com/nabbar/golib/socket" +) + +const maxGID = 32767 + +type ServerUnixGram interface { + libsck.Server + RegisterSocket(unixFile string, perm os.FileMode, gid int32) error +} + +func New(h libsck.Handler, sizeBuffRead libsiz.Size) ServerUnixGram { + c := new(atomic.Value) + c.Store(make(chan []byte)) + + s := new(atomic.Value) + s.Store(make(chan struct{})) + + f := new(atomic.Value) + f.Store(h) + + // socket read buff size + sr := new(atomic.Int32) + sr.Store(sizeBuffRead.Int32()) + + // socket file + sf := new(atomic.Value) + sf.Store("") + + // socket permission + sp := new(atomic.Int64) + sp.Store(0) + + // socket group permission + sg := new(atomic.Int32) + sg.Store(0) + + return &srv{ + l: nil, + h: f, + c: c, + s: s, + fe: new(atomic.Value), + fi: new(atomic.Value), + fs: new(atomic.Value), + sr: sr, + sf: sf, + sp: sp, + sg: sg, + } +} diff --git a/socket/server/unixgram/listener.go b/socket/server/unixgram/listener.go new file mode 100644 index 0000000..cc0a986 --- /dev/null +++ b/socket/server/unixgram/listener.go @@ -0,0 +1,218 @@ +//go:build linux +// +build linux + +/* + * MIT License + * + * Copyright (c) 2022 Nicolas JUHEL + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + * + */ + +package unixgram + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "io/fs" + "net" + "os" + "path/filepath" + "syscall" + + libptc "github.com/nabbar/golib/network/protocol" + + libsck "github.com/nabbar/golib/socket" +) + +func (o *srv) buffSize() int { + v := o.sr.Load() + + if v > 0 { + return int(v) + } + + return libsck.DefaultBufferSize +} + +func (o *srv) getSocketFile() (string, error) { + f := o.sf.Load() + if f != nil { + return o.checkFile(f.(string)) + } + + return "", os.ErrNotExist +} + +func (o *srv) getSocketPerm() os.FileMode { + p := o.sp.Load() + if p > 0 { + return os.FileMode(p) + } + + return os.FileMode(0770) +} + +func (o *srv) getSocketGroup() int { + p := o.sg.Load() + if p >= 0 { + return int(p) + } + + gid := syscall.Getgid() + if gid <= maxGID { + return gid + } + + return 0 +} + +func (o *srv) checkFile(unixFile string) (string, error) { + if len(unixFile) < 1 { + return unixFile, fmt.Errorf("missing socket file path") + } else { + unixFile = filepath.Join(filepath.Dir(unixFile), filepath.Base(unixFile)) + } + + if _, e := os.Stat(unixFile); e != nil && !errors.Is(e, os.ErrNotExist) { + return unixFile, e + } else if e != nil { + return unixFile, nil + } else if e = os.Remove(unixFile); e != nil { + return unixFile, e + } + + return unixFile, nil +} + +func (o *srv) getListen(uxf string, adr *net.UnixAddr) (*net.UnixConn, error) { + var ( + err error + prm = o.getSocketPerm() + grp = o.getSocketGroup() + old int + inf fs.FileInfo + lis *net.UnixConn + ) + + old = syscall.Umask(int(prm)) + defer func() { + syscall.Umask(old) + }() + + if lis, err = net.ListenUnixgram(libptc.NetworkUnixGram.Code(), adr); err != nil { + return nil, err + } else if inf, err = os.Stat(uxf); err != nil { + _ = lis.Close() + return nil, err + } else if inf.Mode() != prm { + if err = os.Chmod(uxf, prm); err != nil { + _ = lis.Close() + return nil, err + } + } + + if stt, ok := inf.Sys().(*syscall.Stat_t); ok { + if int(stt.Gid) != grp { + if err = os.Chown(uxf, syscall.Getuid(), grp); err != nil { + _ = lis.Close() + return nil, err + } + } + } + + o.fctInfoSrv("starting listening socket '%s %s'", libptc.NetworkUnixGram.String(), uxf) + return lis, nil +} + +func (o *srv) Listen(ctx context.Context) error { + var ( + err error + nbr int + uxf string + loc *net.UnixAddr + rem net.Addr + con *net.UnixConn + hdl libsck.Handler + ) + + if uxf, err = o.getSocketFile(); err != nil { + return err + } else if hdl = o.handler(); hdl == nil { + return ErrInvalidHandler + } else if loc, err = net.ResolveUnixAddr(libptc.NetworkUnixGram.Code(), uxf); err != nil { + return err + } else if con, err = o.getListen(uxf, loc); err != nil { + return err + } + + var fctClose = func() { + if con != nil { + o.fctInfoSrv("closing listen socket '%s %s'", libptc.NetworkUnixGram.String(), uxf) + o.fctError(con.Close()) + } + + if _, err = os.Stat(uxf); err == nil { + o.fctError(os.Remove(uxf)) + } + } + + defer fctClose() + + // Accept new connection or stop if context or shutdown trigger + for { + select { + case <-ctx.Done(): + return ErrContextClosed + case <-o.Done(): + return nil + default: + // Accept an incoming connection. + if con == nil { + return ErrServerClosed + } + + var buf = make([]byte, o.buffSize()) + nbr, rem, err = con.ReadFrom(buf) + + if rem == nil { + rem = &net.UnixAddr{} + } + + o.fctInfo(loc, rem, libsck.ConnectionRead) + + if err != nil { + o.fctError(err) + continue + } + + go func(la, ra net.Addr, b []byte) { + o.fctInfo(la, ra, libsck.ConnectionHandler) + r := bytes.NewBuffer(b) + r.WriteString("\n") + hdl(r, io.Discard) + }(loc, rem, buf[:nbr]) + } + } +} diff --git a/socket/server/unixgram/model.go b/socket/server/unixgram/model.go new file mode 100644 index 0000000..fe4f6ae --- /dev/null +++ b/socket/server/unixgram/model.go @@ -0,0 +1,176 @@ +//go:build linux +// +build linux + +/* + * MIT License + * + * Copyright (c) 2022 Nicolas JUHEL + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + * + */ + +package unixgram + +import ( + "fmt" + "net" + "os" + "sync/atomic" + + libptc "github.com/nabbar/golib/network/protocol" + + libtls "github.com/nabbar/golib/certificates" + libsck "github.com/nabbar/golib/socket" +) + +var ( + closedChanStruct chan struct{} +) + +func init() { + closedChanStruct = make(chan struct{}) + close(closedChanStruct) +} + +type srv struct { + l net.Listener + + h *atomic.Value // handler + c *atomic.Value // chan []byte + s *atomic.Value // chan struct{} + + fe *atomic.Value // function error + fi *atomic.Value // function info + fs *atomic.Value // function info server + + sr *atomic.Int32 // read buffer size + sf *atomic.Value // file unix socket + sp *atomic.Int64 // file unix perm + sg *atomic.Int32 // file unix group perm +} + +func (o *srv) Done() <-chan struct{} { + s := o.s.Load() + if s != nil { + return s.(chan struct{}) + } + + return closedChanStruct +} + +func (o *srv) Shutdown() { + if o == nil { + return + } + + s := o.s.Load() + if s != nil { + o.s.Store(nil) + } +} + +func (o *srv) SetTLS(enable bool, config libtls.TLSConfig) error { + return nil +} + +func (o *srv) RegisterFuncError(f libsck.FuncError) { + if o == nil { + return + } + + o.fe.Store(f) +} + +func (o *srv) RegisterFuncInfo(f libsck.FuncInfo) { + if o == nil { + return + } + + o.fi.Store(f) +} + +func (o *srv) RegisterFuncInfoServer(f libsck.FuncInfoSrv) { + if o == nil { + return + } + + o.fs.Store(f) +} + +func (o *srv) RegisterSocket(unixFile string, perm os.FileMode, gid int32) error { + if _, err := net.ResolveUnixAddr(libptc.NetworkUnixGram.Code(), unixFile); err != nil { + return err + } else if gid > maxGID { + return ErrInvalidGroup + } + + o.sf.Store(unixFile) + o.sp.Store(int64(perm)) + o.sg.Store(gid) + + return nil +} + +func (o *srv) fctError(e error) { + if o == nil { + return + } + + v := o.fe.Load() + if v != nil { + v.(libsck.FuncError)(e) + } +} + +func (o *srv) fctInfo(local, remote net.Addr, state libsck.ConnState) { + if o == nil { + return + } + + v := o.fi.Load() + if v != nil { + v.(libsck.FuncInfo)(local, remote, state) + } +} + +func (o *srv) fctInfoSrv(msg string, args ...interface{}) { + if o == nil { + return + } + + v := o.fs.Load() + if v != nil { + v.(libsck.FuncInfoSrv)(fmt.Sprintf(msg, args...)) + } +} + +func (o *srv) handler() libsck.Handler { + if o == nil { + return nil + } + + v := o.h.Load() + if v != nil { + return v.(libsck.Handler) + } + + return nil +} diff --git a/test/test-socket-client-tcp/main.go b/test/test-socket-client-tcp/main.go new file mode 100644 index 0000000..5737f65 --- /dev/null +++ b/test/test-socket-client-tcp/main.go @@ -0,0 +1,102 @@ +/* + * MIT License + * + * Copyright (c) 2022 Nicolas JUHEL + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + * + */ + +package main + +import ( + "bytes" + "context" + "fmt" + "io" + "net" + "os" + "time" + + netptl "github.com/nabbar/golib/network/protocol" + libsiz "github.com/nabbar/golib/size" + libsck "github.com/nabbar/golib/socket" + sckcfg "github.com/nabbar/golib/socket/config" +) + +func config() sckcfg.ClientConfig { + return sckcfg.ClientConfig{ + Network: netptl.NetworkTCP, + Address: ":9000", + ReadBuffSize: 32 * libsiz.SizeKilo, + } +} + +func printError(err ...error) { + for _, e := range err { + if e == nil { + continue + } + _, _ = fmt.Fprintf(os.Stderr, "Error: %v\n", e) + } +} + +func checkPanic(err ...error) { + var found = false + for _, e := range err { + if e == nil { + continue + } + + found = true + printError(err...) + break + } + + if found { + panic(nil) + } +} + +func request() io.Reader { + var buf = bytes.NewBuffer(make([]byte, 0)) + buf.WriteString(fmt.Sprintf("<14>%s myapp: This is a sample syslog message #%d\n", time.Now().Format(time.RFC3339), buf.Len())) + buf.WriteString(fmt.Sprintf("<14>%s myapp: This is a sample syslog message #%d\n", time.Now().Format(time.RFC3339), buf.Len())) + buf.WriteString(fmt.Sprintf("<14>%s myapp: This is a sample syslog message #%d\n", time.Now().Format(time.RFC3339), buf.Len())) + buf.WriteString(fmt.Sprintf("<14>%s myapp: This is a sample syslog message #%d\n", time.Now().Format(time.RFC3339), buf.Len())) + return buf +} + +func main() { + cli, err := config().New() + checkPanic(err) + + cli.RegisterFuncError(func(e error) { + printError(e) + }) + cli.RegisterFuncInfo(func(local, remote net.Addr, state libsck.ConnState) { + _, _ = fmt.Fprintf(os.Stdout, "[%s %s]=>[%s %s] %s\n", remote.Network(), remote.String(), local.Network(), local.String(), state.String()) + }) + + checkPanic(cli.Do(context.Background(), request(), func(r io.Reader) { + _, e := io.Copy(os.Stdout, r) + printError(e) + })) +} diff --git a/test/test-socket-client-udp/main.go b/test/test-socket-client-udp/main.go new file mode 100644 index 0000000..ab993da --- /dev/null +++ b/test/test-socket-client-udp/main.go @@ -0,0 +1,98 @@ +/* + * MIT License + * + * Copyright (c) 2022 Nicolas JUHEL + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + * + */ + +package main + +import ( + "bytes" + "context" + "fmt" + "io" + "net" + "os" + "time" + + netptl "github.com/nabbar/golib/network/protocol" + libsiz "github.com/nabbar/golib/size" + libsck "github.com/nabbar/golib/socket" + sckcfg "github.com/nabbar/golib/socket/config" +) + +func config() sckcfg.ClientConfig { + return sckcfg.ClientConfig{ + Network: netptl.NetworkUDP, + Address: ":9001", + ReadBuffSize: 32 * libsiz.SizeKilo, + } +} + +func printError(err ...error) { + for _, e := range err { + if e == nil { + continue + } + _, _ = fmt.Fprintf(os.Stderr, "Error: %v\n", e) + } +} + +func checkPanic(err ...error) { + var found = false + for _, e := range err { + if e == nil { + continue + } + + found = true + printError(err...) + break + } + + if found { + panic(nil) + } +} + +func request() io.Reader { + var buf = bytes.NewBuffer(make([]byte, 0)) + for i := 1; i <= 100; i++ { + buf.WriteString(fmt.Sprintf("<14>%s myapp: This is a sample syslog message #%d buff: %d\n", time.Now().Format(time.RFC3339), i, buf.Len())) + } + return buf +} + +func main() { + cli, err := config().New() + checkPanic(err) + + cli.RegisterFuncError(func(e error) { + printError(e) + }) + cli.RegisterFuncInfo(func(local, remote net.Addr, state libsck.ConnState) { + _, _ = fmt.Fprintf(os.Stdout, "[%s %s]=>[%s %s] %s\n", remote.Network(), remote.String(), local.Network(), local.String(), state.String()) + }) + + checkPanic(cli.Do(context.Background(), request(), nil)) +} diff --git a/test/test-socket-client-unix/main.go b/test/test-socket-client-unix/main.go new file mode 100644 index 0000000..7133a37 --- /dev/null +++ b/test/test-socket-client-unix/main.go @@ -0,0 +1,99 @@ +/* + * MIT License + * + * Copyright (c) 2022 Nicolas JUHEL + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + * + */ + +package main + +import ( + "bytes" + "context" + "fmt" + "io" + "net" + "os" + "time" + + netptl "github.com/nabbar/golib/network/protocol" + libsiz "github.com/nabbar/golib/size" + libsck "github.com/nabbar/golib/socket" + sckcfg "github.com/nabbar/golib/socket/config" +) + +func config() sckcfg.ClientConfig { + return sckcfg.ClientConfig{ + Network: netptl.NetworkUnixGram, + Address: "/tmp/test-server-unix.sock", + ReadBuffSize: 32 * libsiz.SizeKilo, + } +} + +func printError(err ...error) { + for _, e := range err { + if e == nil { + continue + } + _, _ = fmt.Fprintf(os.Stderr, "Error: %v\n", e) + } +} + +func checkPanic(err ...error) { + var found = false + for _, e := range err { + if e == nil { + continue + } + + found = true + printError(err...) + break + } + + if found { + panic(nil) + } +} + +func request() io.Reader { + var buf = bytes.NewBuffer(make([]byte, 0)) + buf.WriteString(fmt.Sprintf("<14>%s myapp: This is a sample syslog message #%d\n", time.Now().Format(time.RFC3339), buf.Len())) + buf.WriteString(fmt.Sprintf("<14>%s myapp: This is a sample syslog message #%d\n", time.Now().Format(time.RFC3339), buf.Len())) + buf.WriteString(fmt.Sprintf("<14>%s myapp: This is a sample syslog message #%d\n", time.Now().Format(time.RFC3339), buf.Len())) + buf.WriteString(fmt.Sprintf("<14>%s myapp: This is a sample syslog message #%d\n", time.Now().Format(time.RFC3339), buf.Len())) + return buf +} + +func main() { + cli, err := config().New() + checkPanic(err) + + cli.RegisterFuncError(func(e error) { + printError(e) + }) + cli.RegisterFuncInfo(func(local, remote net.Addr, state libsck.ConnState) { + _, _ = fmt.Fprintf(os.Stdout, "[%s %s]=>[%s %s] %s\n", remote.Network(), remote.String(), local.Network(), local.String(), state.String()) + }) + + checkPanic(cli.Do(context.Background(), request(), nil)) +} diff --git a/test/test-socket-server-tcp/10-rsyslog-omfwd-tcp.conf b/test/test-socket-server-tcp/10-rsyslog-omfwd-tcp.conf new file mode 100644 index 0000000..ae38eaf --- /dev/null +++ b/test/test-socket-server-tcp/10-rsyslog-omfwd-tcp.conf @@ -0,0 +1,9 @@ +## TCP Forward +if $syslogtag contains 'testme' then { + Action( + Type="omfwd" + Target="127.0.0.1" + Port="9000" + Protocol="tcp" + ) +} diff --git a/test/test-socket-server-tcp/main.go b/test/test-socket-server-tcp/main.go new file mode 100644 index 0000000..10c8ada --- /dev/null +++ b/test/test-socket-server-tcp/main.go @@ -0,0 +1,97 @@ +/* + * MIT License + * + * Copyright (c) 2022 Nicolas JUHEL + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + * + */ + +package main + +import ( + "context" + "fmt" + "io" + "net" + "os" + + netptl "github.com/nabbar/golib/network/protocol" + libsiz "github.com/nabbar/golib/size" + libsck "github.com/nabbar/golib/socket" + sckcfg "github.com/nabbar/golib/socket/config" +) + +func config() sckcfg.ServerConfig { + return sckcfg.ServerConfig{ + Network: netptl.NetworkTCP, + Address: ":9000", + PermFile: 0, + BuffSizeRead: 32 * libsiz.SizeKilo, + } +} + +func Handler(request io.Reader, response io.Writer) { + _, e := io.Copy(os.Stdout, request) + printError(e) +} + +func printError(err ...error) { + for _, e := range err { + if e == nil { + continue + } + _, _ = fmt.Fprintf(os.Stderr, "Error: %v\n", e) + } +} + +func checkPanic(err ...error) { + var found = false + for _, e := range err { + if e == nil { + continue + } + + found = true + printError(err...) + break + } + + if found { + panic(nil) + } +} + +func main() { + srv, err := config().New(Handler) + checkPanic(err) + + srv.RegisterFuncError(func(e error) { + printError(e) + }) + srv.RegisterFuncInfo(func(local, remote net.Addr, state libsck.ConnState) { + _, _ = fmt.Fprintf(os.Stdout, "[%s %s]=>[%s %s] %s\n", remote.Network(), remote.String(), local.Network(), local.String(), state.String()) + }) + srv.RegisterFuncInfoServer(func(msg string) { + _, _ = fmt.Fprintf(os.Stdout, "%s\n", msg) + }) + + checkPanic(srv.Listen(context.Background())) +} diff --git a/test/test-socket-server-udp/10-rsyslog-omfwd-udp.conf b/test/test-socket-server-udp/10-rsyslog-omfwd-udp.conf new file mode 100644 index 0000000..796870a --- /dev/null +++ b/test/test-socket-server-udp/10-rsyslog-omfwd-udp.conf @@ -0,0 +1,10 @@ +## UDP Forward +if $syslogtag contains 'testme' then { + Action( + Type="omfwd" + Target="127.0.0.1" + Port="9001" + Protocol="udp" + udp.sendToAll="on" + ) +} diff --git a/test/test-socket-server-udp/main.go b/test/test-socket-server-udp/main.go new file mode 100644 index 0000000..db01fc6 --- /dev/null +++ b/test/test-socket-server-udp/main.go @@ -0,0 +1,97 @@ +/* + * MIT License + * + * Copyright (c) 2022 Nicolas JUHEL + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + * + */ + +package main + +import ( + "context" + "fmt" + "io" + "net" + "os" + + netptl "github.com/nabbar/golib/network/protocol" + libsiz "github.com/nabbar/golib/size" + libsck "github.com/nabbar/golib/socket" + sckcfg "github.com/nabbar/golib/socket/config" +) + +func config() sckcfg.ServerConfig { + return sckcfg.ServerConfig{ + Network: netptl.NetworkUDP, + Address: ":9001", + PermFile: 0, + BuffSizeRead: 32 * libsiz.SizeKilo, + } +} + +func Handler(request io.Reader, response io.Writer) { + _, e := io.Copy(os.Stdout, request) + printError(e) +} + +func printError(err ...error) { + for _, e := range err { + if e == nil { + continue + } + _, _ = fmt.Fprintf(os.Stderr, "Error: %v\n", e) + } +} + +func checkPanic(err ...error) { + var found = false + for _, e := range err { + if e == nil { + continue + } + + found = true + printError(err...) + break + } + + if found { + panic(nil) + } +} + +func main() { + srv, err := config().New(Handler) + checkPanic(err) + + srv.RegisterFuncError(func(e error) { + printError(e) + }) + srv.RegisterFuncInfo(func(local, remote net.Addr, state libsck.ConnState) { + _, _ = fmt.Fprintf(os.Stdout, "[%s %s]=>[%s %s] %s\n", remote.Network(), remote.String(), local.Network(), local.String(), state.String()) + }) + srv.RegisterFuncInfoServer(func(msg string) { + _, _ = fmt.Fprintf(os.Stdout, "%s\n", msg) + }) + + checkPanic(srv.Listen(context.Background())) +} diff --git a/test/test-socket-server-unix/10-rsyslog-omfwd-unixgram.conf b/test/test-socket-server-unix/10-rsyslog-omfwd-unixgram.conf new file mode 100644 index 0000000..8c80b15 --- /dev/null +++ b/test/test-socket-server-unix/10-rsyslog-omfwd-unixgram.conf @@ -0,0 +1,13 @@ +## Unix Forward using sock session less +## So unixgram type and as unix socket +## (based on UDP socket) + +if $syslogtag contains 'testme' then { + Action( + Type="omfwd" + Target="127.0.0.1" + Port="9001" + Protocol="udp" + udp.sendToAll="on" + ) +} diff --git a/test/test-socket-server-unix/main.go b/test/test-socket-server-unix/main.go new file mode 100644 index 0000000..e0a1939 --- /dev/null +++ b/test/test-socket-server-unix/main.go @@ -0,0 +1,98 @@ +/* + * MIT License + * + * Copyright (c) 2022 Nicolas JUHEL + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + * + */ + +package main + +import ( + "context" + "fmt" + "io" + "net" + "os" + + netptl "github.com/nabbar/golib/network/protocol" + libsiz "github.com/nabbar/golib/size" + libsck "github.com/nabbar/golib/socket" + sckcfg "github.com/nabbar/golib/socket/config" +) + +func config() sckcfg.ServerConfig { + return sckcfg.ServerConfig{ + Network: netptl.NetworkUnixGram, + Address: "/tmp/test-server-unix.sock", + PermFile: 0777, + GroupPerm: -1, + BuffSizeRead: 32 * libsiz.SizeKilo, + } +} + +func Handler(request io.Reader, response io.Writer) { + _, e := io.Copy(os.Stdout, request) + printError(e) +} + +func printError(err ...error) { + for _, e := range err { + if e == nil { + continue + } + _, _ = fmt.Fprintf(os.Stderr, "Error: %v\n", e) + } +} + +func checkPanic(err ...error) { + var found = false + for _, e := range err { + if e == nil { + continue + } + + found = true + printError(err...) + break + } + + if found { + panic(nil) + } +} + +func main() { + srv, err := config().New(Handler) + checkPanic(err) + + srv.RegisterFuncError(func(e error) { + printError(e) + }) + srv.RegisterFuncInfo(func(local, remote net.Addr, state libsck.ConnState) { + _, _ = fmt.Fprintf(os.Stdout, "[%s %s]=>[%s %s] %s\n", remote.Network(), remote.String(), local.Network(), local.String(), state.String()) + }) + srv.RegisterFuncInfoServer(func(msg string) { + _, _ = fmt.Fprintf(os.Stdout, "%s\n", msg) + }) + + checkPanic(srv.Listen(context.Background())) +}