diff --git a/compressConn.go b/compressConn.go index 15a038c..f27fde9 100644 --- a/compressConn.go +++ b/compressConn.go @@ -9,27 +9,27 @@ import ( "sync" "time" + "github.com/golang/snappy" "github.com/klauspost/compress/zstd" - "github.com/mholt/archives" ) type CompressionConn struct { conn net.Conn oneFunc func() w io.WriteCloser - r io.ReadCloser + r io.Reader f interface{ Flush() error } } type Compression interface { - OpenReader(r io.Reader) (io.ReadCloser, error) + OpenReader(r io.Reader) (io.Reader, error) OpenWriter(w io.Writer) (io.WriteCloser, error) } type compression struct { - openReader func(r io.Reader) (io.ReadCloser, error) + openReader func(r io.Reader) (io.Reader, error) openWriter func(w io.Writer) (io.WriteCloser, error) } -func (obj compression) OpenReader(r io.Reader) (io.ReadCloser, error) { +func (obj compression) OpenReader(r io.Reader) (io.Reader, error) { return obj.openReader(r) } func (obj compression) OpenWriter(w io.Writer) (io.WriteCloser, error) { @@ -48,24 +48,16 @@ func NewCompression(decode string) (Compression, error) { switch strings.ToLower(decode) { case "s2": arch = compression{ - openReader: func(r io.Reader) (io.ReadCloser, error) { - return archives.Sz{ - S2: archives.S2{ - Compression: archives.S2LevelBetter, - }, - }.OpenReader(r) + openReader: func(r io.Reader) (io.Reader, error) { + return getSnappyReader(r), nil }, openWriter: func(w io.Writer) (io.WriteCloser, error) { - return archives.Sz{ - S2: archives.S2{ - Compression: archives.S2LevelBetter, - }, - }.OpenWriter(w) + return getSnappyWriter(w), nil }, } case "zstd": arch = compression{ - openReader: func(r io.Reader) (io.ReadCloser, error) { + openReader: func(r io.Reader) (io.Reader, error) { decoder, err := zstd.NewReader(r) if err != nil { return nil, err @@ -82,7 +74,7 @@ func NewCompression(decode string) (Compression, error) { } case "flate": arch = compression{ - openReader: func(r io.Reader) (io.ReadCloser, error) { + openReader: func(r io.Reader) (io.Reader, error) { buf := make([]byte, 1) n, err := r.Read(buf) if err != nil { @@ -119,14 +111,18 @@ func NewCompressionConn(conn net.Conn, arch Compression) (net.Conn, error) { if err != nil { return nil, err } - ccon := &CompressionConn{ conn: conn, r: r, w: w, oneFunc: sync.OnceFunc(func() { defer recover() - w.Close() + if snW, ok := w.(*snappy.Writer); ok { + putSnappyWriter(snW) + } + if snR, ok := r.(*snappy.Reader); ok { + putSnappyReader(snR) + } }), } if f, ok := w.(interface{ Flush() error }); ok { diff --git a/flate.go b/flate.go new file mode 100644 index 0000000..5870f16 --- /dev/null +++ b/flate.go @@ -0,0 +1,26 @@ +package requests + +import ( + "compress/flate" + "io" + "sync" +) + +var flateWriterPool = sync.Pool{ + New: func() interface{} { + w, _ := flate.NewWriter(io.Discard, flate.DefaultCompression) + return w + }, +} + +func getFlateWriter(dst io.Writer) *flate.Writer { + w := flateWriterPool.Get().(*flate.Writer) + w.Reset(dst) + return w +} + +func putFlateWriter(w *flate.Writer) { + // w.Close() // flush buffer + w.Reset(io.Discard) + flateWriterPool.Put(w) +} diff --git a/go.mod b/go.mod index d525141..4d2300d 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/gospider007/requests go 1.24.0 require ( + github.com/golang/snappy v0.0.4 github.com/gospider007/bar v0.0.0-20250217074946-47896d8de2ba github.com/gospider007/bs4 v0.0.0-20250413121342-fed910fb00c9 github.com/gospider007/gson v0.0.0-20250611163241-fa021e9c5531 @@ -14,7 +15,6 @@ require ( github.com/gospider007/tools v0.0.0-20250611120310-b7d5692a72bf github.com/gospider007/websocket v0.0.0-20250429035144-b1cf6819063a github.com/klauspost/compress v1.18.0 - github.com/mholt/archives v0.1.2 github.com/quic-go/quic-go v0.52.0 github.com/refraction-networking/uquic v0.0.6 github.com/refraction-networking/utls v1.7.3 @@ -53,6 +53,7 @@ require ( github.com/kr/pretty v0.3.1 // indirect github.com/libdns/libdns v1.1.0 // indirect github.com/mholt/acmez/v3 v3.1.2 // indirect + github.com/mholt/archives v0.1.2 // indirect github.com/miekg/dns v1.1.66 // indirect github.com/minio/minlz v1.0.1 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect diff --git a/go.sum b/go.sum index 5e773e4..cffd49b 100644 --- a/go.sum +++ b/go.sum @@ -80,6 +80,8 @@ github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= diff --git a/http.go b/http.go index 4fba730..c247878 100644 --- a/http.go +++ b/http.go @@ -15,14 +15,6 @@ import ( ) type reqReadWriteCtx struct { - // req *http.Request - // res *http.Response - // orderHeaders []interface { - // Key() string - // Val() any - // } - // err error - writeCtx context.Context writeCnl context.CancelFunc @@ -110,10 +102,6 @@ func (obj *clientConn) send(req *http.Request, orderHeaders []interface { return } -// func (obj *clientConn) CloseCtx() context.Context { -// return obj.closeCtx -// } - func (obj *clientConn) Close() error { return obj.CloseWithError(nil) } @@ -201,6 +189,7 @@ func (obj *clientConn) Stream() io.ReadWriteCloser { w: obj.conn, } } + func (obj *clientConn) httpWrite(req *http.Request, rawHeaders http.Header, orderHeaders []interface { Key() string Val() any diff --git a/roundTripper.go b/roundTripper.go index c904b8f..79ca7c5 100644 --- a/roundTripper.go +++ b/roundTripper.go @@ -206,36 +206,6 @@ func (obj *roundTripper) uhttp3Dial(ctx *Response, remoteAddress Address, proxyA return } -// func (obj *roundTripper) thttp3Dial(ctx *Response, remoteAddress Address, proxyAddress ...Address) (conn *connecotr, err error) { -// // var rawNetConn net.Conn - -// // if len(proxys) > 0 { -// // comp := proxys[len(proxys)-1] -// // if comp.Compression != "" { -// // arch, err = NewCompression(comp.Compression, CompressionLevelBest) -// // if err != nil { -// // return nil, err -// // } -// // } -// // _, rawNetConn, err = obj.dialer.DialProxyContext(ctx, "tcp", ctx.option.TlsConfig.Clone(), append(proxys, remoteAddress)...) -// // } else { -// // var remoteAddress Address -// // remoteAddress, err = GetAddressWithUrl(ctx.request.URL) -// // if err != nil { -// // return nil, err -// // } -// // rawNetConn, err = obj.dialer.DialContext(ctx, "tcp", remoteAddress) -// // } -// // defer func() { -// // if err != nil && rawNetConn != nil { -// // rawNetConn.Close() -// // } -// // }() -// // if err != nil { -// // return nil, err -// // } -// } - func (obj *roundTripper) dial(ctx *Response) (conn *connecotr, err error) { proxys, err := obj.initProxys(ctx) if err != nil { diff --git a/snappy.go b/snappy.go new file mode 100644 index 0000000..c4f18e0 --- /dev/null +++ b/snappy.go @@ -0,0 +1,49 @@ +package requests + +import ( + "io" + "sync" + + "github.com/golang/snappy" +) + +// 定义 snappy.Writer 池,包装 io.Writer +var snappyWriterPool = sync.Pool{ + New: func() interface{} { + // 先给一个空 buffer,后面可以Reset替换输出目标 + return snappy.NewBufferedWriter(nil) + }, +} + +// 定义 snappy.Reader 池,包装 io.Reader +var snappyReaderPool = sync.Pool{ + New: func() interface{} { + // 先给一个空 reader,后面可以Reset替换输入来源 + return snappy.NewReader(nil) + }, +} + +// 获取并初始化 snappy.Writer +func getSnappyWriter(w io.Writer) *snappy.Writer { + sw := snappyWriterPool.Get().(*snappy.Writer) + sw.Reset(w) + return sw +} + +// 释放 snappy.Writer +func putSnappyWriter(sw *snappy.Writer) { + snappyWriterPool.Put(sw) +} + +// 获取并初始化 snappy.Reader +func getSnappyReader(r io.Reader) *snappy.Reader { + sr := snappyReaderPool.Get().(*snappy.Reader) + sr.Reset(r) + return sr +} + +// 释放 snappy.Reader +func putSnappyReader(sr *snappy.Reader) { + // snappy.Reader 没有 Close 方法,直接放回池 + snappyReaderPool.Put(sr) +} diff --git a/socks5.go b/socks5.go index bf2da76..6a59105 100644 --- a/socks5.go +++ b/socks5.go @@ -5,6 +5,7 @@ import ( "encoding/binary" "errors" "io" + "log" "math" "net" "strconv" @@ -166,6 +167,7 @@ func (c *UDPConn) SetTcpCloseFunc(f func(error)) { c.tcpCloseFunc = f } func (c *UDPConn) Close() error { + log.Print("正在关闭tcp") c.tcpConn.Close() return c.PacketConn.Close() } diff --git a/socks_linux.go b/socks_linux.go index a35e9b7..d622c85 100644 --- a/socks_linux.go +++ b/socks_linux.go @@ -8,9 +8,10 @@ import ( func Control(network, address string, c syscall.RawConn) error { return c.Control(func(fd uintptr) { + syscall.SetsockoptInt(int(fd), syscall.IPPROTO_TCP, syscall.TCP_CORK, 1) + syscall.SetsockoptInt(int(fd), syscall.IPPROTO_TCP, syscall.TCP_NODELAY, 0) syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1) // 启用地址重用 - syscall.SetsockoptInt(int(fd), syscall.IPPROTO_TCP, syscall.TCP_NODELAY, 1) syscall.SetsockoptLinger(int(fd), syscall.SOL_SOCKET, syscall.SO_LINGER, &syscall.Linger{Onoff: 1, Linger: 0}) - syscall.SetNonblock(int(fd), true) + syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_KEEPALIVE, 1) }) } diff --git a/socks_mac.go b/socks_mac.go index 4fb339d..3139465 100644 --- a/socks_mac.go +++ b/socks_mac.go @@ -10,8 +10,8 @@ func Control(network, address string, c syscall.RawConn) error { return c.Control(func(fd uintptr) { syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1) // 启用地址重用 syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_REUSEPORT, 1) // 启用端口重用 - syscall.SetsockoptInt(int(fd), syscall.IPPROTO_TCP, syscall.TCP_NODELAY, 1) + syscall.SetsockoptInt(int(fd), syscall.IPPROTO_TCP, syscall.TCP_NODELAY, 0) //0 表示关闭 TCP_NODELAY,即开启 Nagle 算法。 syscall.SetsockoptLinger(int(fd), syscall.SOL_SOCKET, syscall.SO_LINGER, &syscall.Linger{Onoff: 1, Linger: 0}) - syscall.SetNonblock(int(fd), true) + syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_KEEPALIVE, 1) }) } diff --git a/socks_win.go b/socks_win.go index 19a8715..68fa0ad 100644 --- a/socks_win.go +++ b/socks_win.go @@ -10,7 +10,7 @@ func Control(network, address string, c syscall.RawConn) error { return c.Control(func(fd uintptr) { syscall.SetsockoptInt(syscall.Handle(fd), syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1) // 启用地址重用 syscall.SetsockoptInt(syscall.Handle(fd), syscall.IPPROTO_TCP, syscall.TCP_NODELAY, 1) + syscall.SetsockoptInt(syscall.Handle(fd), syscall.SOL_SOCKET, syscall.SO_KEEPALIVE, 1) syscall.SetsockoptLinger(syscall.Handle(fd), syscall.SOL_SOCKET, syscall.SO_LINGER, &syscall.Linger{Onoff: 1, Linger: 0}) - syscall.SetNonblock(syscall.Handle(fd), true) }) } diff --git a/test/fingerprint/quic_test.go b/test/fingerprint/quic_test.go index ebcb6b1..d03b256 100644 --- a/test/fingerprint/quic_test.go +++ b/test/fingerprint/quic_test.go @@ -31,7 +31,7 @@ func TestHttp3(t *testing.T) { func TestHttp32(t *testing.T) { resp, err := requests.Get(context.TODO(), "https://cloudflare-quic.com/", requests.RequestOption{ ClientOption: requests.ClientOption{ - USpec: true, + // USpec: true, ForceHttp3: true, }, }, @@ -43,6 +43,8 @@ func TestHttp32(t *testing.T) { if resp.StatusCode() != 200 { t.Error("resp.StatusCode!= 200") } + resp.CloseConn() + // time.Sleep(time.Second * 5) if resp.Proto() != "HTTP/3.0" { t.Error("resp.Proto!= HTTP/3.0") } diff --git a/test/proxy/http3_proxy_test.go b/test/proxy/http3_proxy_test.go index bb8a39e..560b6de 100644 --- a/test/proxy/http3_proxy_test.go +++ b/test/proxy/http3_proxy_test.go @@ -122,6 +122,7 @@ func TestHttp3Proxy2(t *testing.T) { resp, err := requests.Get(context.TODO(), "https://cloudflare-quic.com/", requests.RequestOption{ ClientOption: requests.ClientOption{ + USpec: true, ForceHttp3: true, // Logger: func(l requests.Log) { // log.Print(l)