This commit is contained in:
gospider
2025-06-23 17:25:47 +08:00
parent 48eac55e95
commit 4af99d0cd0

View File

@@ -14,10 +14,17 @@ import (
)
type CompressionConn struct {
conn net.Conn
conn net.Conn
w *WriterCompression
r *ReaderCompression
}
type ReaderCompression struct {
oneFunc func()
r io.Reader
}
type WriterCompression struct {
oneFunc func()
w io.WriteCloser
r io.Reader
f interface{ Flush() error }
}
type Compression interface {
@@ -102,26 +109,17 @@ func NewCompression(decode string) (Compression, error) {
return arch, nil
}
func NewCompressionConn(conn net.Conn, arch Compression) (net.Conn, error) {
func NewWriterCompression(conn io.Writer, arch Compression) (*WriterCompression, error) {
w, err := arch.OpenWriter(conn)
if err != nil {
return nil, err
}
r, err := arch.OpenReader(conn)
if err != nil {
return nil, err
}
ccon := &CompressionConn{
conn: conn,
r: r,
w: w,
ccon := &WriterCompression{
w: w,
oneFunc: sync.OnceFunc(func() {
if snW, ok := w.(*snappy.Writer); ok {
putSnappyWriter(snW)
}
if snR, ok := r.(*snappy.Reader); ok {
putSnappyReader(snR)
}
}),
}
if f, ok := w.(interface{ Flush() error }); ok {
@@ -129,11 +127,38 @@ func NewCompressionConn(conn net.Conn, arch Compression) (net.Conn, error) {
}
return ccon, nil
}
func (obj *CompressionConn) Read(b []byte) (n int, err error) {
return obj.r.Read(b)
func NewReaderCompression(conn io.Reader, arch Compression) (*ReaderCompression, error) {
r, err := arch.OpenReader(conn)
if err != nil {
return nil, err
}
ccon := &ReaderCompression{
r: r,
oneFunc: sync.OnceFunc(func() {
if snR, ok := r.(*snappy.Reader); ok {
putSnappyReader(snR)
}
}),
}
return ccon, nil
}
func (obj *CompressionConn) Write(b []byte) (n int, err error) {
func NewCompressionConn(conn net.Conn, arch Compression) (net.Conn, error) {
w, err := NewWriterCompression(conn, arch)
if err != nil {
return nil, err
}
r, err := NewReaderCompression(conn, arch)
if err != nil {
return nil, err
}
ccon := &CompressionConn{
conn: conn,
r: r,
w: w,
}
return ccon, nil
}
func (obj *WriterCompression) Write(b []byte) (n int, err error) {
n, err = obj.w.Write(b)
if err != nil {
return
@@ -143,13 +168,36 @@ func (obj *CompressionConn) Write(b []byte) (n int, err error) {
}
return
}
func (obj *CompressionConn) Close() error {
err := obj.conn.Close()
func (obj *WriterCompression) Close() error {
if obj.oneFunc != nil {
obj.oneFunc()
}
return err
return nil
}
func (obj *ReaderCompression) Read(b []byte) (n int, err error) {
return obj.r.Read(b)
}
func (obj *ReaderCompression) Close() error {
if obj.oneFunc != nil {
obj.oneFunc()
}
return nil
}
func (obj *CompressionConn) Read(b []byte) (n int, err error) {
return obj.r.Read(b)
}
func (obj *CompressionConn) Write(b []byte) (n int, err error) {
return obj.w.Write(b)
}
func (obj *CompressionConn) Close() error {
obj.w.Close()
obj.r.Close()
return obj.conn.Close()
}
func (obj *CompressionConn) LocalAddr() net.Addr {
return obj.conn.LocalAddr()
}