From 4af99d0cd09584fbf8b26aba4a493bd79606859a Mon Sep 17 00:00:00 2001 From: gospider <2216403312@qq.com> Date: Mon, 23 Jun 2025 17:25:47 +0800 Subject: [PATCH] sync --- compressConn.go | 90 +++++++++++++++++++++++++++++++++++++------------ 1 file changed, 69 insertions(+), 21 deletions(-) diff --git a/compressConn.go b/compressConn.go index 2dd83ca..2376b6c 100644 --- a/compressConn.go +++ b/compressConn.go @@ -14,10 +14,17 @@ import ( ) type CompressionConn struct { - conn net.Conn + conn net.Conn + w *WriterCompression + r *ReaderCompression +} +type ReaderCompression struct { + oneFunc func() + r io.Reader +} +type WriterCompression struct { oneFunc func() w io.WriteCloser - r io.Reader f interface{ Flush() error } } type Compression interface { @@ -102,26 +109,17 @@ func NewCompression(decode string) (Compression, error) { return arch, nil } -func NewCompressionConn(conn net.Conn, arch Compression) (net.Conn, error) { +func NewWriterCompression(conn io.Writer, arch Compression) (*WriterCompression, error) { w, err := arch.OpenWriter(conn) if err != nil { return nil, err } - r, err := arch.OpenReader(conn) - if err != nil { - return nil, err - } - ccon := &CompressionConn{ - conn: conn, - r: r, - w: w, + ccon := &WriterCompression{ + w: w, oneFunc: sync.OnceFunc(func() { if snW, ok := w.(*snappy.Writer); ok { putSnappyWriter(snW) } - if snR, ok := r.(*snappy.Reader); ok { - putSnappyReader(snR) - } }), } if f, ok := w.(interface{ Flush() error }); ok { @@ -129,11 +127,38 @@ func NewCompressionConn(conn net.Conn, arch Compression) (net.Conn, error) { } return ccon, nil } - -func (obj *CompressionConn) Read(b []byte) (n int, err error) { - return obj.r.Read(b) +func NewReaderCompression(conn io.Reader, arch Compression) (*ReaderCompression, error) { + r, err := arch.OpenReader(conn) + if err != nil { + return nil, err + } + ccon := &ReaderCompression{ + r: r, + oneFunc: sync.OnceFunc(func() { + if snR, ok := r.(*snappy.Reader); ok { + putSnappyReader(snR) + } + }), + } + return ccon, nil } -func (obj *CompressionConn) Write(b []byte) (n int, err error) { +func NewCompressionConn(conn net.Conn, arch Compression) (net.Conn, error) { + w, err := NewWriterCompression(conn, arch) + if err != nil { + return nil, err + } + r, err := NewReaderCompression(conn, arch) + if err != nil { + return nil, err + } + ccon := &CompressionConn{ + conn: conn, + r: r, + w: w, + } + return ccon, nil +} +func (obj *WriterCompression) Write(b []byte) (n int, err error) { n, err = obj.w.Write(b) if err != nil { return @@ -143,13 +168,36 @@ func (obj *CompressionConn) Write(b []byte) (n int, err error) { } return } -func (obj *CompressionConn) Close() error { - err := obj.conn.Close() + +func (obj *WriterCompression) Close() error { if obj.oneFunc != nil { obj.oneFunc() } - return err + return nil } + +func (obj *ReaderCompression) Read(b []byte) (n int, err error) { + return obj.r.Read(b) +} +func (obj *ReaderCompression) Close() error { + if obj.oneFunc != nil { + obj.oneFunc() + } + return nil +} + +func (obj *CompressionConn) Read(b []byte) (n int, err error) { + return obj.r.Read(b) +} +func (obj *CompressionConn) Write(b []byte) (n int, err error) { + return obj.w.Write(b) +} +func (obj *CompressionConn) Close() error { + obj.w.Close() + obj.r.Close() + return obj.conn.Close() +} + func (obj *CompressionConn) LocalAddr() net.Addr { return obj.conn.LocalAddr() }