From 45c7a3262f0dc5c1ae591ceb4e9b83388d8698d2 Mon Sep 17 00:00:00 2001 From: gospider <2216403312@qq.com> Date: Fri, 11 Jul 2025 11:54:33 +0800 Subject: [PATCH] sync --- body.go | 28 +++ compressConn.go | 131 +++++++++++--- conn.go | 21 ++- flate.go | 38 ++-- go.mod | 23 +-- go.sum | 46 ++--- http.go | 300 +++++++++++++++----------------- minlz.go | 38 ++++ requests.go | 29 +-- response.go | 160 ++++++++++------- roundTripper.go | 6 +- rw.go | 21 ++- sepc.go | 25 +-- snappy.go | 68 ++++---- socks5.go | 17 +- test/fingerprint/ja4_test.go | 33 ++-- test/protocol/http2_test.go | 10 +- test/protocol/http3_test.go | 2 +- test/protocol/websocket_test.go | 34 +++- test/request/data_test.go | 5 +- test/request/file_test.go | 3 - test/request/form_test.go | 20 +-- test/request/params_test.go | 17 +- test/request/stream_test.go | 58 +++--- test/response/rawConn_test.go | 24 --- 25 files changed, 660 insertions(+), 497 deletions(-) create mode 100644 minlz.go delete mode 100644 test/response/rawConn_test.go diff --git a/body.go b/body.go index 7a672ff..fe15353 100644 --- a/body.go +++ b/body.go @@ -10,6 +10,7 @@ import ( "net/http" "net/textproto" "net/url" + "sort" "github.com/gospider007/gson" "github.com/gospider007/tools" @@ -44,6 +45,33 @@ func (obj *OrderData) Keys() []string { } return keys } +func (obj *OrderData) ReorderWithKeys(key ...string) { + if len(key) == 0 { + return + } + for i, k := range key { + key[i] = textproto.CanonicalMIMEHeaderKey(k) + } + sort.SliceStable(obj.data, func(x, y int) bool { + xIndex := -1 + yIndex := -1 + for i, k := range key { + if k == obj.data[x].key { + xIndex = i + } + if k == obj.data[y].key { + yIndex = i + } + } + if xIndex == -1 { + return false + } + if yIndex == -1 { + return true + } + return xIndex < yIndex + }) +} type orderT struct { key string diff --git a/compressConn.go b/compressConn.go index 2376b6c..764cbeb 100644 --- a/compressConn.go +++ b/compressConn.go @@ -6,10 +6,8 @@ import ( "io" "net" "strings" - "sync" "time" - "github.com/golang/snappy" "github.com/klauspost/compress/zstd" ) @@ -43,28 +41,85 @@ func (obj compression) OpenWriter(w io.Writer) (io.WriteCloser, error) { return obj.openWriter(w) } -type CompressionLevel int - -const ( - CompressionLevelFast CompressionLevel = 1 - CompressionLevelBest CompressionLevel = 2 -) - +func GetCompressionByte(decode string) (byte, error) { + switch strings.ToLower(decode) { + case "zstd": + return 40, nil + case "s2": + return 255, nil + case "flate": + return 92, nil + case "minlz": + return 93, nil + default: + return 0, errors.New("unsupported compression type") + } +} +func NewCompressionWithByte(b byte) (Compression, error) { + switch b { + case 40: + return NewCompression("zstd") + case 255: + return NewCompression("s2") + case 92: + return NewCompression("flate") + case 93: + return NewCompression("minlz") + } + return nil, errors.New("unsupported compression type") +} func NewCompression(decode string) (Compression, error) { + b, err := GetCompressionByte(decode) + if err != nil { + return nil, err + } + br := func(r io.Reader) error { + buf := make([]byte, 1) + n, err := r.Read(buf) + if err != nil { + return err + } + if n != 1 || buf[0] != b { + return errors.New("invalid response") + } + return nil + } + bw := func(w io.Writer) error { + n, err := w.Write([]byte{b}) + if err != nil { + return err + } + if n != 1 { + return errors.New("invalid response") + } + return nil + } var arch Compression switch strings.ToLower(decode) { case "s2": arch = compression{ openReader: func(r io.Reader) (io.Reader, error) { + err := br(r) + if err != nil { + return nil, err + } return getSnappyReader(r), nil }, openWriter: func(w io.Writer) (io.WriteCloser, error) { + err := bw(w) + if err != nil { + return nil, err + } return getSnappyWriter(w), nil }, } case "zstd": arch = compression{ openReader: func(r io.Reader) (io.Reader, error) { + err := br(r) + if err != nil { + return nil, err + } decoder, err := zstd.NewReader(r) if err != nil { return nil, err @@ -72,6 +127,10 @@ func NewCompression(decode string) (Compression, error) { return decoder.IOReadCloser(), nil }, openWriter: func(w io.Writer) (io.WriteCloser, error) { + err := bw(w) + if err != nil { + return nil, err + } encoder, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedDefault)) if err != nil { return nil, err @@ -82,27 +141,37 @@ func NewCompression(decode string) (Compression, error) { case "flate": arch = compression{ openReader: func(r io.Reader) (io.Reader, error) { - buf := make([]byte, 1) - n, err := r.Read(buf) + err := br(r) if err != nil { return nil, err } - if n != 1 || buf[0] != 92 { - return nil, errors.New("invalid response") - } return flate.NewReader(r), nil }, openWriter: func(w io.Writer) (io.WriteCloser, error) { - n, err := w.Write([]byte{92}) + err := bw(w) if err != nil { return nil, err } - if n != 1 { - return nil, errors.New("invalid response") - } return flate.NewWriter(w, flate.DefaultCompression) }, } + case "minlz": + arch = compression{ + openReader: func(r io.Reader) (io.Reader, error) { + err := br(r) + if err != nil { + return nil, err + } + return getMinlzReader(r), nil + }, + openWriter: func(w io.Writer) (io.WriteCloser, error) { + err := bw(w) + if err != nil { + return nil, err + } + return getMinlzWriter(w), nil + }, + } default: return nil, errors.New("unsupported compression type") } @@ -116,11 +185,14 @@ func NewWriterCompression(conn io.Writer, arch Compression) (*WriterCompression, } ccon := &WriterCompression{ w: w, - oneFunc: sync.OnceFunc(func() { - if snW, ok := w.(*snappy.Writer); ok { - putSnappyWriter(snW) - } - }), + // oneFunc: sync.OnceFunc(func() { + // switch snW := w.(type) { + // case *snappy.Writer: + // putSnappyWriter(snW) + // case *minlz.Writer: + // putMinlzWriter(snW) + // } + // }), } if f, ok := w.(interface{ Flush() error }); ok { ccon.f = f @@ -134,11 +206,14 @@ func NewReaderCompression(conn io.Reader, arch Compression) (*ReaderCompression, } ccon := &ReaderCompression{ r: r, - oneFunc: sync.OnceFunc(func() { - if snR, ok := r.(*snappy.Reader); ok { - putSnappyReader(snR) - } - }), + // oneFunc: sync.OnceFunc(func() { + // switch snR := r.(type) { + // case *snappy.Reader: + // putSnappyReader(snR) + // case *minlz.Reader: + // putMinlzReader(snR) + // } + // }), } return ccon, nil } diff --git a/conn.go b/conn.go index c5f5935..25ce808 100644 --- a/conn.go +++ b/conn.go @@ -81,28 +81,35 @@ func (obj *connPool) taskMain(conn *connecotr, task *reqTask) (err error) { } } if err == nil { - task.cnl(errNoErr) + task.cnl(tools.ErrNoErr) } else { task.cnl(err) } - if err == nil && task.reqCtx.response != nil && task.reqCtx.response.Body != nil { + if err == nil && task.reqCtx.response != nil && task.reqCtx.response.Body != nil && task.bodyCtx != nil { select { case <-conn.forceCtx.Done(): err = context.Cause(conn.forceCtx) case <-task.reqCtx.Context().Done(): - if context.Cause(task.reqCtx.Context()) != errNoErr { + if context.Cause(task.reqCtx.Context()) != tools.ErrNoErr { err = context.Cause(task.reqCtx.Context()) } + if err == nil && task.reqCtx.response.StatusCode == 101 { + select { + case <-conn.forceCtx.Done(): + err = context.Cause(conn.forceCtx) + case <-task.bodyCtx.Done(): + if context.Cause(task.bodyCtx) != tools.ErrNoErr { + err = context.Cause(task.bodyCtx) + } + } + } case <-task.bodyCtx.Done(): - if context.Cause(task.bodyCtx) != errNoErr { + if context.Cause(task.bodyCtx) != tools.ErrNoErr { err = context.Cause(task.bodyCtx) } } } if err != nil { - if errors.Is(err, errLastTaskRuning) { - task.isNotice = true - } conn.CloseWithError(tools.WrapError(err, "taskMain close with error")) } }() diff --git a/flate.go b/flate.go index 5870f16..910fa20 100644 --- a/flate.go +++ b/flate.go @@ -1,26 +1,20 @@ package requests -import ( - "compress/flate" - "io" - "sync" -) +// var flateWriterPool = sync.Pool{ +// New: func() interface{} { +// w, _ := flate.NewWriter(io.Discard, flate.DefaultCompression) +// return w +// }, +// } -var flateWriterPool = sync.Pool{ - New: func() interface{} { - w, _ := flate.NewWriter(io.Discard, flate.DefaultCompression) - return w - }, -} +// func getFlateWriter(dst io.Writer) *flate.Writer { +// w := flateWriterPool.Get().(*flate.Writer) +// w.Reset(dst) +// return w +// } -func getFlateWriter(dst io.Writer) *flate.Writer { - w := flateWriterPool.Get().(*flate.Writer) - w.Reset(dst) - return w -} - -func putFlateWriter(w *flate.Writer) { - // w.Close() // flush buffer - w.Reset(io.Discard) - flateWriterPool.Put(w) -} +// func putFlateWriter(w *flate.Writer) { +// // w.Close() // flush buffer +// w.Reset(io.Discard) +// flateWriterPool.Put(w) +// } diff --git a/go.mod b/go.mod index fc056bb..cd39e91 100644 --- a/go.mod +++ b/go.mod @@ -4,23 +4,25 @@ go 1.24.0 require ( github.com/golang/snappy v1.0.0 + github.com/gorilla/websocket v1.5.3 github.com/gospider007/bar v0.0.0-20250217074946-47896d8de2ba github.com/gospider007/bs4 v0.0.0-20250413121342-fed910fb00c9 github.com/gospider007/gson v0.0.0-20250630120534-cce6e3c6756d github.com/gospider007/gtls v0.0.0-20250630120509-4e99c91661ee - github.com/gospider007/http2 v0.0.0-20250630120519-3f59fca61c88 + github.com/gospider007/http2 v0.0.0-20250711035043-daabc8e205b3 github.com/gospider007/http3 v0.0.0-20250630120526-1066890881e5 github.com/gospider007/ja3 v0.0.0-20250627013834-1d2966014638 github.com/gospider007/re v0.0.0-20250217075352-bcb79f285d6c github.com/gospider007/tools v0.0.0-20250630120304-b22c2ddf35b5 github.com/gospider007/websocket v0.0.0-20250630120328-1ec26253d082 github.com/klauspost/compress v1.18.0 + github.com/minio/minlz v1.0.1 github.com/quic-go/quic-go v0.53.0 github.com/refraction-networking/uquic v0.0.6 github.com/refraction-networking/utls v1.7.4-0.20250621163342-5abccec539e6 github.com/txthinking/socks5 v0.0.0-20230325130024-4230056ae301 - golang.org/x/crypto v0.39.0 - golang.org/x/net v0.41.0 + golang.org/x/crypto v0.40.0 + golang.org/x/net v0.42.0 gopkg.in/errgo.v2 v2.1.0 ) @@ -43,7 +45,7 @@ require ( github.com/gobwas/pool v0.2.1 // indirect github.com/gobwas/ws v1.4.0 // indirect github.com/google/gopacket v1.1.19 // indirect - github.com/google/pprof v0.0.0-20250629210550-e611ec304b22 // indirect + github.com/google/pprof v0.0.0-20250630185457-6e76a2b096b5 // indirect github.com/gospider007/blog v0.0.0-20250302134054-8afc12c2a9a7 // indirect github.com/gospider007/kinds v0.0.0-20250217075226-10f199f7215d // indirect github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect @@ -54,9 +56,8 @@ require ( github.com/libdns/libdns v1.1.0 // indirect github.com/mholt/acmez/v3 v3.1.2 // indirect github.com/mholt/archives v0.1.3 // indirect - github.com/miekg/dns v1.1.66 // indirect + github.com/miekg/dns v1.1.67 // indirect github.com/mikelolasagasti/xz v1.0.1 // indirect - github.com/minio/minlz v1.0.1 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/nwaples/rardecode/v2 v2.1.1 // indirect @@ -81,10 +82,10 @@ require ( go.uber.org/zap/exp v0.3.0 // indirect go4.org v0.0.0-20230225012048-214862532bf5 // indirect golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b // indirect - golang.org/x/image v0.28.0 // indirect - golang.org/x/mod v0.25.0 // indirect - golang.org/x/sync v0.15.0 // indirect - golang.org/x/sys v0.33.0 // indirect - golang.org/x/text v0.26.0 // indirect + golang.org/x/image v0.29.0 // indirect + golang.org/x/mod v0.26.0 // indirect + golang.org/x/sync v0.16.0 // indirect + golang.org/x/sys v0.34.0 // indirect + golang.org/x/text v0.27.0 // indirect golang.org/x/tools v0.34.0 // indirect ) diff --git a/go.sum b/go.sum index 779096f..da78413 100644 --- a/go.sum +++ b/go.sum @@ -99,11 +99,13 @@ github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXi github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= github.com/google/pprof v0.0.0-20200212024743-f11f1df84d12/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM= -github.com/google/pprof v0.0.0-20250629210550-e611ec304b22 h1:RanZAubGQRlhKdX83NviyIduq4DsO2zFmSgPuTlnkMc= -github.com/google/pprof v0.0.0-20250629210550-e611ec304b22/go.mod h1:5hDyRhoBCxViHszMt12TnOpEI4VVi+U8Gm9iphldiMA= +github.com/google/pprof v0.0.0-20250630185457-6e76a2b096b5 h1:xhMrHhTJ6zxu3gA4enFM9MLn9AY7613teCdFnlUVbSQ= +github.com/google/pprof v0.0.0-20250630185457-6e76a2b096b5/go.mod h1:5hDyRhoBCxViHszMt12TnOpEI4VVi+U8Gm9iphldiMA= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gospider007/bar v0.0.0-20250217074946-47896d8de2ba h1:8DK0d1nUTsMbOgFrIWMSBKp7obOAKgSLkswzVBX1RRI= github.com/gospider007/bar v0.0.0-20250217074946-47896d8de2ba/go.mod h1:HGEEIVnysptCXwsdU4E82uQu0F4ObU/5+KWHIdJCUbY= github.com/gospider007/blog v0.0.0-20250302134054-8afc12c2a9a7 h1:QP46FdP6nJET+bSdm8wxjIzBMnu2lKraoypfU9bLGV4= @@ -114,8 +116,8 @@ github.com/gospider007/gson v0.0.0-20250630120534-cce6e3c6756d h1:/FaI2cPp/DgiEJ github.com/gospider007/gson v0.0.0-20250630120534-cce6e3c6756d/go.mod h1:Vb5Ehy5UGQYVUW2XvzqLW1rc7/WPljQspyF3YZ/u164= github.com/gospider007/gtls v0.0.0-20250630120509-4e99c91661ee h1:rcQAnk5Szv/Vi9dVieU1uwKUSVPNB+lqRPggI6oZgFM= github.com/gospider007/gtls v0.0.0-20250630120509-4e99c91661ee/go.mod h1:ee+Fo1W5Abt9mdLOOiTrb76r0qbO6ubGmRDklCMAPIQ= -github.com/gospider007/http2 v0.0.0-20250630120519-3f59fca61c88 h1:gQuOZI+lANJ1vQ3gj70igHTt25GrXyhrKDC7vV/FQdg= -github.com/gospider007/http2 v0.0.0-20250630120519-3f59fca61c88/go.mod h1:CmJdPojakh+1r+WdMowZEADGIQiaM2mYCQMH7VwqsPE= +github.com/gospider007/http2 v0.0.0-20250711035043-daabc8e205b3 h1:HqEs4ZAU1U7hGKwEQwE/0YYwldByzaD/YfXpSrhgdVA= +github.com/gospider007/http2 v0.0.0-20250711035043-daabc8e205b3/go.mod h1:rji8qgw9KJ3ddOqT7xvgd0GKV7Z4+l1uSbFrgDX3blg= github.com/gospider007/http3 v0.0.0-20250630120526-1066890881e5 h1:+v1s/f9AYApykZ/alJIOX4V7xj8YTiydfV8izakjaY4= github.com/gospider007/http3 v0.0.0-20250630120526-1066890881e5/go.mod h1:dyg0PmV4MW4mxUG6VzJxhAOTBboMbrpC3/XVVMDvV3Q= github.com/gospider007/ja3 v0.0.0-20250627013834-1d2966014638 h1:qQmKi3FsCK0WJcDOq4wpOzwVZ2UWto1Da05SGRVBKPQ= @@ -160,8 +162,8 @@ github.com/mholt/acmez/v3 v3.1.2/go.mod h1:L1wOU06KKvq7tswuMDwKdcHeKpFFgkppZy/y0 github.com/mholt/archives v0.1.3 h1:aEAaOtNra78G+TvV5ohmXrJOAzf++dIlYeDW3N9q458= github.com/mholt/archives v0.1.3/go.mod h1:LUCGp++/IbV/I0Xq4SzcIR6uwgeh2yjnQWamjRQfLTU= github.com/miekg/dns v1.1.51/go.mod h1:2Z9d3CP1LQWihRZUf29mQ19yDThaI4DAYzte2CaQW5c= -github.com/miekg/dns v1.1.66 h1:FeZXOS3VCVsKnEAd+wBkjMC3D2K+ww66Cq3VnCINuJE= -github.com/miekg/dns v1.1.66/go.mod h1:jGFzBsSNbJw6z1HYut1RKBKHA9PBdxeHrZG8J+gC2WE= +github.com/miekg/dns v1.1.67 h1:kg0EHj0G4bfT5/oOys6HhZw4vmMlnoZ+gDu8tJ/AlI0= +github.com/miekg/dns v1.1.67/go.mod h1:fujopn7TB3Pu3JM69XaawiU0wqjpL9/8xGop5UrTPps= github.com/mikelolasagasti/xz v1.0.1 h1:Q2F2jX0RYJUG3+WsM+FJknv+6eVjsjXNDV0KJXZzkD0= github.com/mikelolasagasti/xz v1.0.1/go.mod h1:muAirjiOUxPRXwm9HdDtB3uoRPrGnL85XHtokL9Hcgc= github.com/minio/minlz v1.0.1 h1:OUZUzXcib8diiX+JYxyRLIdomyZYzHct6EShOKtQY2A= @@ -273,8 +275,8 @@ golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliY golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= -golang.org/x/crypto v0.39.0 h1:SHs+kF4LP+f+p14esP5jAoDpHU8Gu/v9lFRK6IT5imM= -golang.org/x/crypto v0.39.0/go.mod h1:L+Xg3Wf6HoL4Bn4238Z6ft6KfEpN0tJGo53AAPC632U= +golang.org/x/crypto v0.40.0 h1:r4x+VvoG5Fm+eJcxMaY8CQM7Lb0l1lsmjGBQ6s8BfKM= +golang.org/x/crypto v0.40.0/go.mod h1:Qr1vMER5WyS2dfPHAlsOj01wgLbsyWtFn/aY+5+ZdxY= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -287,8 +289,8 @@ golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b h1:M2rDM6z3Fhozi9O7NWsxAkg/y golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b/go.mod h1:3//PLf8L/X+8b4vuAfHzxeRUl04Adcb341+IGKfnqS8= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= -golang.org/x/image v0.28.0 h1:gdem5JW1OLS4FbkWgLO+7ZeFzYtL3xClb97GaUzYMFE= -golang.org/x/image v0.28.0/go.mod h1:GUJYXtnGKEUgggyzh+Vxt+AviiCcyiwpsl8iQ8MvwGY= +golang.org/x/image v0.29.0 h1:HcdsyR4Gsuys/Axh0rDEmlBmB68rW1U9BUdB3UVHsas= +golang.org/x/image v0.29.0/go.mod h1:RVJROnf3SLK8d26OW91j4FrIHGbsJ8QnbEocVTOWQDA= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190301231843-5614ed5bae6f/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= @@ -311,8 +313,8 @@ golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/mod v0.15.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= -golang.org/x/mod v0.25.0 h1:n7a+ZbQKQA/Ysbyb0/6IbB1H/X41mKgbhfv7AfG/44w= -golang.org/x/mod v0.25.0/go.mod h1:IXM97Txy2VM4PJ3gI61r1YEk/gAj6zAHN3AdZt6S9Ww= +golang.org/x/mod v0.26.0 h1:EGMPT//Ezu+ylkCijjPc+f4Aih7sZvaAr+O3EHBxvZg= +golang.org/x/mod v0.26.0/go.mod h1:/j6NAhSk8iQ723BGAUyoAcn7SlD7s15Dp9Nd/SfeaFQ= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -336,8 +338,8 @@ golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk= golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4= -golang.org/x/net v0.41.0 h1:vBTly1HeNPEn3wtREYfy4GZ/NECgw2Cnl+nK6Nz3uvw= -golang.org/x/net v0.41.0/go.mod h1:B/K4NNqkfmg07DQYrbwvSluqCJOOXwUjeb/5lOisjbA= +golang.org/x/net v0.42.0 h1:jzkYrhi3YQWD6MLBJcsklgQsoAcw89EcZbJw8Z614hs= +golang.org/x/net v0.42.0/go.mod h1:FF1RA5d3u7nAYA4z2TkclSCKh68eSXtiFwcWQpPXdt8= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -355,8 +357,8 @@ golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= -golang.org/x/sync v0.15.0 h1:KWH3jNZsfyT6xfAfKiz6MRNmd46ByHDYaZ7KSkCtdW8= -golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw= +golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -381,8 +383,8 @@ golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw= -golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/sys v0.34.0 h1:H5Y5sJ2L2JRdyv7ROF1he/lPdvFsd0mJHFw2ThKHxLA= +golang.org/x/sys v0.34.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/telemetry v0.0.0-20240228155512-f48c80bd79b2/go.mod h1:TeRTkGYfJXctD9OcfyVLyj2J3IxLnKwHJR8f4D8a3YE= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= @@ -393,8 +395,8 @@ golang.org/x/term v0.12.0/go.mod h1:owVbMEjm3cBLCHdkQu9b1opXd4ETQWc3BhuQGKgXgvU= golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk= golang.org/x/term v0.20.0/go.mod h1:8UkIAJTvZgivsXaD6/pH6U9ecQzZ45awqEOzuCvwpFY= golang.org/x/term v0.27.0/go.mod h1:iMsnZpn0cago0GOrHO2+Y7u7JPn5AylBrcoWkElMTSM= -golang.org/x/term v0.32.0 h1:DR4lr0TjUs3epypdhTOkMmuF5CDFJ/8pOnbzMZPQ7bg= -golang.org/x/term v0.32.0/go.mod h1:uZG1FhGx848Sqfsq4/DlJr3xGGsYMu/L5GW4abiaEPQ= +golang.org/x/term v0.33.0 h1:NuFncQrRcaRvVmgRkvM3j/F00gWIAlcmlB8ACEKmGIg= +golang.org/x/term v0.33.0/go.mod h1:s18+ql9tYWp1IfpV9DmCtQDDSRBUjKaw9M1eAv5UeF0= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -408,8 +410,8 @@ golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= -golang.org/x/text v0.26.0 h1:P42AVeLghgTYr4+xUnTRKDMqpar+PtX7KWuNQL21L8M= -golang.org/x/text v0.26.0/go.mod h1:QK15LZJUUQVJxhz7wXgxSy/CJaTFjd0G+YLonydOVQA= +golang.org/x/text v0.27.0 h1:4fGWRpyh641NLlecmyl4LOe6yDdfaYNrGb2zdfo4JV4= +golang.org/x/text v0.27.0/go.mod h1:1D28KMCvyooCX9hBiosv5Tz/+YLxj0j7XhWjpSUF7CU= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.12.0 h1:ScB/8o8olJvc+CQPWrK3fPZNfh7qgwCrY0zJmoEQLSE= diff --git a/http.go b/http.go index 1e2e93d..3f74c26 100644 --- a/http.go +++ b/http.go @@ -13,198 +13,93 @@ import ( "golang.org/x/net/http/httpguts" ) -type reqReadWriteCtx struct { - writeCtx context.Context - writeCnl context.CancelFunc - - readCtx context.Context - readCnl context.CancelCauseFunc +type rsp struct { + r *http.Response + ctx context.Context + err error } type clientConn struct { - err error - readWriteCtx *reqReadWriteCtx - conn net.Conn - r *bufio.Reader - w *bufio.Writer - closeFunc func(error) - ctx context.Context - cnl context.CancelCauseFunc + conn net.Conn + r *bufio.Reader + w *bufio.Writer + closeFunc func(error) + ctx context.Context + cnl context.CancelCauseFunc + rsps chan *rsp } func NewClientConn(con net.Conn, closeFunc func(error)) *clientConn { ctx, cnl := context.WithCancelCause(context.TODO()) - reader, writer := io.Pipe() c := &clientConn{ ctx: ctx, cnl: cnl, conn: con, closeFunc: closeFunc, - r: bufio.NewReader(reader), + rsps: make(chan *rsp), + r: bufio.NewReader(con), w: bufio.NewWriter(con), } - go func() { - _, err := tools.Copy(writer, con) - writer.CloseWithError(err) - c.CloseWithError(err) - }() + go c.read() return c } - -var errLastTaskRuning = errors.New("last task is running") -var errNoErr = errors.New("no error") - -func (obj *clientConn) send(req *http.Request, orderHeaders []interface { - Key() string - Val() any -}) (res *http.Response, err error) { - go obj.httpWrite(req, req.Header.Clone(), orderHeaders) - res, err = http.ReadResponse(obj.r, req) - if err == nil && res == nil { - err = errors.New("response is nil") - } - if err != nil { - obj.readWriteCtx.readCnl(nil) - return - } - rawBody := res.Body - isStream := res.StatusCode == 101 - pr, pw := io.Pipe() - go func() { - var readErr error - defer func() { - if readErr == nil { - obj.readWriteCtx.readCnl(errNoErr) - } else { - obj.readWriteCtx.readCnl(readErr) - } - }() - if rawBody != nil { - _, readErr = tools.Copy(pw, rawBody) +func (obj *clientConn) read() { + var err error + var res *http.Response + defer obj.CloseWithError(err) + for { + res, err = http.ReadResponse(obj.r, nil) + if res == nil && err == nil { + err = errors.New("response is nil") } - if readErr != nil && readErr != io.EOF && readErr != io.ErrUnexpectedEOF { - err = tools.WrapError(readErr, "failed to read response body") - } else { - readErr = nil - } - pw.CloseWithError(readErr) - if readErr != nil { - obj.CloseWithError(readErr) - } else { + if err != nil { select { - case <-obj.readWriteCtx.writeCtx.Done(): - if isStream { - <-obj.ctx.Done() - return - } - default: - obj.CloseWithError(tools.WrapError(errLastTaskRuning, "last task not write done with read done")) + case obj.rsps <- &rsp{res, nil, err}: + case <-obj.ctx.Done(): return } + return } - }() - res.Body = pr - return -} - -func (obj *clientConn) Close() error { - return obj.CloseWithError(nil) -} -func (obj *clientConn) CloseWithError(err error) error { - if obj.closeFunc != nil { - obj.closeFunc(obj.err) - } - obj.cnl(err) - if err == nil { - obj.err = tools.WrapError(obj.err, "connecotr closeWithError close") - } else { - obj.err = tools.WrapError(err, "connecotr closeWithError close") - } - return obj.conn.Close() -} -func (obj *clientConn) initTask() { - readCtx, readCnl := context.WithCancelCause(obj.ctx) - writeCtx, writeCnl := context.WithCancel(obj.ctx) - obj.readWriteCtx = &reqReadWriteCtx{ - readCtx: readCtx, - readCnl: readCnl, - writeCtx: writeCtx, - writeCnl: writeCnl, - } -} -func (obj *clientConn) DoRequest(req *http.Request, orderHeaders []interface { - Key() string - Val() any -}) (*http.Response, context.Context, error) { - if obj.readWriteCtx != nil { - select { - case <-obj.readWriteCtx.writeCtx.Done(): - case <-obj.ctx.Done(): - return nil, nil, obj.ctx.Err() - default: - return nil, obj.readWriteCtx.readCtx, errLastTaskRuning + if res.StatusCode == 101 { + select { + case obj.rsps <- &rsp{res, obj.ctx, err}: + case <-obj.ctx.Done(): + return + } + <-obj.ctx.Done() + return + } else if res == nil || res.Body == nil || res.Body == http.NoBody { + select { + case obj.rsps <- &rsp{res, nil, err}: + case <-obj.ctx.Done(): + return + } + } else { + ctx, cnl := context.WithCancelCause(obj.ctx) + res.Body = &clientBody{res.Body, cnl} + select { + case obj.rsps <- &rsp{res, ctx, err}: + case <-obj.ctx.Done(): + return + } + <-ctx.Done() } select { - case <-obj.readWriteCtx.readCtx.Done(): case <-obj.ctx.Done(): - return nil, nil, obj.ctx.Err() - default: - return nil, obj.readWriteCtx.readCtx, errLastTaskRuning - } - } else { - select { - case <-obj.ctx.Done(): - return nil, nil, obj.ctx.Err() + return default: } } - obj.initTask() - res, err := obj.send(req, orderHeaders) - if err != nil { - obj.CloseWithError(err) - return nil, nil, err - } - return res, obj.readWriteCtx.readCtx, err -} - -type websocketConn struct { - r io.Reader - w io.WriteCloser - cnl context.CancelCauseFunc -} - -func (obj *websocketConn) Read(p []byte) (n int, err error) { - return obj.r.Read(p) -} -func (obj *websocketConn) Write(p []byte) (n int, err error) { - // i, err := obj.w.Write(p) - // log.Print(err, " write error ", i, p) - // return i, err - return obj.w.Write(p) -} -func (obj *websocketConn) Close() error { - obj.cnl(nil) - return obj.w.Close() -} - -func (obj *clientConn) Stream() io.ReadWriteCloser { - return &websocketConn{ - cnl: obj.cnl, - r: obj.r, - w: obj.conn, - } } func (obj *clientConn) httpWrite(req *http.Request, rawHeaders http.Header, orderHeaders []interface { Key() string Val() any -}) { - var err error +}) (err error) { defer func() { if err != nil { obj.CloseWithError(tools.WrapError(err, "failed to send request body")) } - obj.readWriteCtx.writeCnl() }() host := req.Host if host == "" { @@ -261,6 +156,79 @@ func (obj *clientConn) httpWrite(req *http.Request, rawHeaders http.Header, orde } } err = obj.w.Flush() + return +} + +type clientBody struct { + r io.Reader + cnl context.CancelCauseFunc +} + +func (obj *clientBody) Read(p []byte) (n int, err error) { + return obj.r.Read(p) +} +func (obj *clientBody) Close() error { + return obj.CloseWithError(nil) +} +func (obj *clientBody) CloseWithError(err error) error { + obj.cnl(err) + return nil +} + +func (obj *clientConn) DoRequest(req *http.Request, orderHeaders []interface { + Key() string + Val() any +}) (res *http.Response, ctx context.Context, err error) { + defer func() { + if err != nil { + obj.CloseWithError(tools.WrapError(err, "failed to send request")) + } + }() + var writeErr error + writeDone := make(chan struct{}) + go func() { + writeErr = obj.httpWrite(req, req.Header.Clone(), orderHeaders) + close(writeDone) + }() + select { + case <-writeDone: + if writeErr != nil { + return nil, nil, writeErr + } + select { + case <-req.Context().Done(): + return nil, nil, req.Context().Err() + case <-obj.ctx.Done(): + return nil, nil, obj.ctx.Err() + case rsp := <-obj.rsps: + return rsp.r, rsp.ctx, rsp.err + } + case <-req.Context().Done(): + return nil, nil, req.Context().Err() + case <-obj.ctx.Done(): + return nil, nil, obj.ctx.Err() + case rsp := <-obj.rsps: + return rsp.r, rsp.ctx, rsp.err + } +} + +func (obj *clientConn) Close() error { + return obj.CloseWithError(nil) +} +func (obj *clientConn) CloseWithError(err error) error { + if obj.closeFunc != nil { + obj.closeFunc(err) + } + obj.cnl(err) + return obj.conn.Close() +} + +func (obj *clientConn) Stream() io.ReadWriteCloser { + return &websocketConn{ + cnl: obj.cnl, + r: obj.r, + w: obj.conn, + } } func newChunkedWriter(w *bufio.Writer) io.WriteCloser { @@ -291,3 +259,23 @@ func (cw *chunkedWriter) Close() error { _, err := io.WriteString(cw.w, "0\r\n\r\n") return err } + +type websocketConn struct { + r io.Reader + w io.WriteCloser + cnl context.CancelCauseFunc +} + +func (obj *websocketConn) Read(p []byte) (n int, err error) { + return obj.r.Read(p) +} +func (obj *websocketConn) Write(p []byte) (n int, err error) { + // i, err := obj.w.Write(p) + // log.Print(err, " write error ", i, p) + // return i, err + return obj.w.Write(p) +} +func (obj *websocketConn) Close() error { + obj.cnl(nil) + return obj.w.Close() +} diff --git a/minlz.go b/minlz.go new file mode 100644 index 0000000..6679169 --- /dev/null +++ b/minlz.go @@ -0,0 +1,38 @@ +package requests + +import ( + "io" + + "github.com/minio/minlz" +) + +// // 定义 minlz.Writer 池,包装 io.Writer +// var minlzWriterPool = sync.Pool{ +// New: func() interface{} { +// return minlz.NewWriter(nil, minlz.WriterBlockSize(64*1024), minlz.WriterConcurrency(1), minlz.WriterLevel(minlz.LevelSmallest)) +// }, +// } + +// // 定义 minlz.Reader 池,包装 io.Reader +// var minlzReaderPool = sync.Pool{ +// New: func() interface{} { +// // 先给一个空 reader,后面可以Reset替换输入来源 +// return minlz.NewReader(nil, minlz.ReaderMaxBlockSize(64*1024)) +// }, +// } + +// 获取并初始化 minlz.Writer +func getMinlzWriter(w io.Writer) *minlz.Writer { + return minlz.NewWriter(w, minlz.WriterBlockSize(64*1024), minlz.WriterConcurrency(1)) + // sw := minlzWriterPool.Get().(*minlz.Writer) + // sw.Reset(w) + // return sw +} + +// 获取并初始化 minlz.Reader +func getMinlzReader(r io.Reader) *minlz.Reader { + // sr := minlzReaderPool.Get().(*minlz.Reader) + // sr.Reset(r) + // return sr + return minlz.NewReader(r, minlz.ReaderMaxBlockSize(64*1024)) +} diff --git a/requests.go b/requests.go index e411f39..a9538a1 100644 --- a/requests.go +++ b/requests.go @@ -135,7 +135,10 @@ func (obj *Client) retryRequest(ctx context.Context, option RequestOption, uhref default: } err = obj.request(response) - if err != nil || response.Option().MaxRedirect < 0 || (response.Option().MaxRedirect > 0 && redirectNum > response.Option().MaxRedirect) { + if err != nil || err == ErrUseLastResponse || response.Option().MaxRedirect < 0 || (response.Option().MaxRedirect > 0 && redirectNum > response.Option().MaxRedirect) { + if err == ErrUseLastResponse { + err = nil + } return } loc, err = response.Location() @@ -205,15 +208,19 @@ func (obj *Client) request(ctx *Response) (err error) { return } //read body - if err == nil && ctx.sse == nil && !ctx.option.Stream { - err = ctx.ReadBody() + isReadBody := (err == nil || err == ErrUseLastResponse) && ctx.sse == nil && !ctx.option.Stream + if isReadBody { + if err2 := ctx.ReadBody(); err2 != nil { + err = err2 + } } //result callback - - if err == nil && ctx.option.ResultCallBack != nil { - err = ctx.option.ResultCallBack(ctx) + if (err == nil || err == ErrUseLastResponse) && ctx.option.ResultCallBack != nil { + if err2 := ctx.option.ResultCallBack(ctx); err2 != nil { + err = err2 + } } - if err != nil { //err callback, must close body + if err != nil && err != ErrUseLastResponse { //err callback, must close body ctx.CloseConn() if ctx.option.ErrCallBack != nil { ctx.err = err @@ -347,12 +354,12 @@ func (obj *Client) request(ctx *Response) (err error) { err = errors.New("send req response is nil") return } - if ctx.Body() != nil { - ctx.body = ctx.Body().(*wrapBody) + if ctx.response.Body != nil { + ctx.body = ctx.response.Body.(*wrapBody) } - if encoding := ctx.ContentEncoding(); encoding != "" { + if encoding := ctx.ContentEncoding(); encoding != "" && ctx.response.Body != nil { var unCompressionBody io.ReadCloser - unCompressionBody, err = tools.CompressionHeadersDecode(ctx.Context(), ctx.Body(), encoding) + unCompressionBody, err = tools.CompressionHeadersDecode(ctx.Context(), ctx.response.Body, encoding) if err != nil { if err != io.ErrUnexpectedEOF && err != io.EOF { return diff --git a/response.go b/response.go index 8d991b0..42f2e60 100644 --- a/response.go +++ b/response.go @@ -304,60 +304,6 @@ func (obj *barBody) Write(con []byte) (int, error) { func (obj *Response) defaultDecode() bool { return strings.Contains(obj.ContentType(), "html") } -func (obj *Response) Body() io.ReadCloser { - return obj.response.Body -} - -// read body -func (obj *Response) ReadBody() (err error) { - obj.readBodyLock.Lock() - defer obj.readBodyLock.Unlock() - if obj.readBody { - return nil - } - defer func() { - obj.Close(err) - if err != nil { - obj.CloseConn() - } else { - if obj.response.StatusCode == 101 && obj.webSocket == nil { - obj.webSocket = websocket.NewConn(newFakeConn(obj.body.connStream()), func() { obj.CloseConn() }, true, obj.Headers().Get("Sec-WebSocket-Extensions")) - } - } - }() - obj.readBody = true - bBody := bytes.NewBuffer(nil) - done := make(chan struct{}) - var readErr error - go func() { - defer close(done) - if obj.option.Bar && obj.ContentLength() > 0 { - _, readErr = tools.Copy(&barBody{ - bar: bar.NewClient(obj.response.ContentLength), - body: bBody, - }, obj.Body()) - } else { - _, readErr = tools.Copy(bBody, obj.Body()) - } - if readErr == io.ErrUnexpectedEOF { - readErr = nil - } - }() - select { - case <-obj.ctx.Done(): - return tools.WrapError(obj.ctx.Err(), "response read ctx error") - case <-done: - if readErr != nil { - return tools.WrapError(readErr, "response read content error") - } - } - if !obj.option.DisDecode && obj.defaultDecode() { - obj.content, obj.encoding, _ = tools.Charset(bBody.Bytes(), obj.ContentType()) - } else { - obj.content = bBody.Bytes() - } - return -} // conn is new conn func (obj *Response) IsNewConn() bool { @@ -382,14 +328,108 @@ func (obj *Response) CloseConn() { // close func (obj *Response) Close(err error) { + if err == nil { + err = tools.ErrNoErr + } if obj.body != nil { - obj.body.Close() + obj.body.CloseWithError(err) } if obj.cnl != nil { - if err == nil { - obj.cnl(errNoErr) - } else { - obj.cnl(err) - } + obj.cnl(err) } } + +// read body +func (obj *Response) ReadBody() (err error) { + obj.readBodyLock.Lock() + defer obj.readBodyLock.Unlock() + if obj.readBody { + return nil + } + obj.readBody = true + defer func() { + if err == nil && obj.response.StatusCode == 101 && obj.webSocket == nil { + obj.webSocket = websocket.NewConn(newFakeConn(obj.body.connStream()), func() { obj.CloseConn() }, true, obj.Headers().Get("Sec-WebSocket-Extensions")) + } + }() + bBody := bytes.NewBuffer(nil) + done := make(chan struct{}) + var readErr error + body := obj.Body() + defer body.Close() + go func() { + defer close(done) + if obj.option.Bar && obj.ContentLength() > 0 { + _, readErr = tools.Copy(&barBody{ + bar: bar.NewClient(obj.response.ContentLength), + body: bBody, + }, body) + } else { + _, readErr = tools.Copy(bBody, body) + } + if readErr == io.ErrUnexpectedEOF { + readErr = nil + } + }() + select { + case <-obj.ctx.Done(): + if readErr == nil && body.closed && body.err == nil { + err = nil + } else { + err = tools.WrapError(obj.ctx.Err(), "response read ctx error") + } + case <-done: + if readErr != nil { + err = tools.WrapError(readErr, "response read content error") + } + } + if err != nil { + return + } + if !obj.option.DisDecode && obj.defaultDecode() { + obj.content, obj.encoding, _ = tools.Charset(bBody.Bytes(), obj.ContentType()) + } else { + obj.content = bBody.Bytes() + } + return +} + +type body struct { + ctx *Response + closed bool + err error +} + +func (obj *body) Read(p []byte) (n int, err error) { + obj.ctx.readBody = true + if obj.ctx == nil || obj.ctx.response == nil || obj.ctx.response.Body == nil { + obj.closed = true + return 0, io.EOF + } + n, err = obj.ctx.response.Body.Read(p) + if err != nil { + obj.closed = true + if err != io.EOF && err != io.ErrUnexpectedEOF { + obj.err = err + obj.ctx.Close(err) + obj.ctx.CloseConn() + } else { + obj.ctx.Close(nil) + } + } + return +} + +func (obj *body) Close() (err error) { + obj.closed = true + if !obj.closed { + obj.err = errors.New("response body force closed") + obj.ctx.Close(errors.New("response body force closed")) + obj.ctx.CloseConn() + } + return nil +} + +func (obj *Response) Body() *body { + return &body{ctx: obj} +} diff --git a/roundTripper.go b/roundTripper.go index 79ca7c5..de2b97a 100644 --- a/roundTripper.go +++ b/roundTripper.go @@ -148,7 +148,7 @@ func (obj *roundTripper) ghttp3Dial(ctx *Response, remoteAddress Address, proxyA } conn = obj.newConnecotr() - conn.Conn, err = http3.NewClient(netConn, udpConn, func() { + conn.Conn = http3.NewClient(netConn, udpConn, func() { conn.forceCnl(errors.New("http3 client close")) }) if ct, ok := udpConn.(interface { @@ -193,7 +193,7 @@ func (obj *roundTripper) uhttp3Dial(ctx *Response, remoteAddress Address, proxyA return nil, err } conn = obj.newConnecotr() - conn.Conn, err = http3.NewClient(netConn, udpConn, func() { + conn.Conn = http3.NewClient(netConn, udpConn, func() { conn.forceCnl(errors.New("http3 client close")) }) if ct, ok := udpConn.(interface { @@ -353,7 +353,7 @@ func (obj *roundTripper) poolRoundTrip(task *reqTask) error { case connPool.tasks <- task: <-task.ctx.Done() err := context.Cause(task.ctx) - if errors.Is(err, errNoErr) { + if errors.Is(err, tools.ErrNoErr) { err = nil } return err diff --git a/rw.go b/rw.go index 7178a52..aa49c6f 100644 --- a/rw.go +++ b/rw.go @@ -3,6 +3,9 @@ package requests import ( "errors" "io" + "net/http" + + "github.com/gospider007/tools" ) type wrapBody struct { @@ -20,16 +23,22 @@ func (obj *wrapBody) Proxys() []Address { return obj.conn.proxys } -func (obj *wrapBody) CloseWithError(err error) error { - if err != nil { - obj.conn.CloseWithError(err) - } - return obj.rawBody.Close() //reuse conn -} func (obj *wrapBody) Close() error { return obj.CloseWithError(nil) } +func (obj *wrapBody) CloseWithError(err error) error { + if err != nil && err != tools.ErrNoErr { + obj.conn.CloseWithError(err) + } + if obj.rawBody == nil || obj.rawBody == http.NoBody { + return nil + } + return obj.rawBody.(interface { + CloseWithError(error) error + }).CloseWithError(err) +} + // safe close conn func (obj *wrapBody) CloseConn() { obj.conn.forceCnl(errors.New("readWriterCloser close conn")) diff --git a/sepc.go b/sepc.go index 4606891..4060ba0 100644 --- a/sepc.go +++ b/sepc.go @@ -113,26 +113,7 @@ func (obj *RequestOption) initSpec() error { } obj.gospiderSpec = gospiderSpec if obj.orderHeaders == nil { - if len(obj.OrderHeaders) > 0 { - obj.orderHeaders = NewOrderData() - var ods [][2]string - if gospiderSpec.H1Spec != nil { - ods = gospiderSpec.H1Spec.OrderHeaders - } else if gospiderSpec.H2Spec != nil { - ods = gospiderSpec.H2Spec.OrderHeaders - } - for _, key := range obj.OrderHeaders { - key = textproto.CanonicalMIMEHeaderKey(key) - var val any - for _, kv := range ods { - if key == kv[0] && slices.Contains(tools.DefaultHeaderKeys, kv[0]) { - val = kv[1] - break - } - } - obj.orderHeaders.Add(key, val) - } - } else if gospiderSpec.H1Spec != nil { + if gospiderSpec.H1Spec != nil { obj.orderHeaders = NewOrderData() for _, kv := range gospiderSpec.H1Spec.OrderHeaders { if slices.Contains(tools.DefaultHeaderKeys, kv[0]) { @@ -151,7 +132,9 @@ func (obj *RequestOption) initSpec() error { obj.orderHeaders.Add(key, nil) } } - + } + if len(obj.OrderHeaders) > 0 && obj.orderHeaders != nil { + obj.orderHeaders.ReorderWithKeys(obj.OrderHeaders...) } } return nil diff --git a/snappy.go b/snappy.go index 4312eb3..85dd417 100644 --- a/snappy.go +++ b/snappy.go @@ -2,51 +2,51 @@ package requests import ( "io" - "sync" "github.com/golang/snappy" ) -// 定义 snappy.Writer 池,包装 io.Writer -var snappyWriterPool = sync.Pool{ - New: func() interface{} { - // 先给一个空 buffer,后面可以Reset替换输出目标 - return snappy.NewBufferedWriter(nil) - }, -} +// // 定义 snappy.Writer 池,包装 io.Writer +// var snappyWriterPool = sync.Pool{ +// New: func() interface{} { +// // 先给一个空 buffer,后面可以Reset替换输出目标 +// return snappy.NewBufferedWriter(nil) +// }, +// } -// 定义 snappy.Reader 池,包装 io.Reader -var snappyReaderPool = sync.Pool{ - New: func() interface{} { - // 先给一个空 reader,后面可以Reset替换输入来源 - return snappy.NewReader(nil) - }, -} +// // 定义 snappy.Reader 池,包装 io.Reader +// var snappyReaderPool = sync.Pool{ +// New: func() interface{} { +// // 先给一个空 reader,后面可以Reset替换输入来源 +// return snappy.NewReader(nil) +// }, +// } // 获取并初始化 snappy.Writer func getSnappyWriter(w io.Writer) *snappy.Writer { - sw := snappyWriterPool.Get().(*snappy.Writer) - sw.Reset(w) - return sw -} - -// 释放 snappy.Writer -func putSnappyWriter(sw *snappy.Writer) { - defer recover() - // sw.Reset(nil) - snappyWriterPool.Put(sw) + return snappy.NewBufferedWriter(w) + // sw := snappyWriterPool.Get().(*snappy.Writer) + // sw.Reset(w) + // return sw } // 获取并初始化 snappy.Reader func getSnappyReader(r io.Reader) *snappy.Reader { - sr := snappyReaderPool.Get().(*snappy.Reader) - sr.Reset(r) - return sr + return snappy.NewReader(r) + // sr := snappyReaderPool.Get().(*snappy.Reader) + // sr.Reset(r) + // return sr } -// 释放 snappy.Reader -func putSnappyReader(sr *snappy.Reader) { - defer recover() - // sr.Reset(nil) - snappyReaderPool.Put(sr) -} +// // 释放 snappy.Writer +// func putSnappyWriter(sw *snappy.Writer) { +// sw.Close() +// sw.Reset(nil) +// snappyWriterPool.Put(sw) +// } + +// // 释放 snappy.Reader +// func putSnappyReader(sr *snappy.Reader) { +// sr.Reset(nil) +// snappyReaderPool.Put(sr) +// } diff --git a/socks5.go b/socks5.go index 6a59105..75801d8 100644 --- a/socks5.go +++ b/socks5.go @@ -5,7 +5,6 @@ import ( "encoding/binary" "errors" "io" - "log" "math" "net" "strconv" @@ -158,16 +157,26 @@ func (c *UDPConn) WriteTo(p []byte, addr net.Addr) (n int, err error) { } func (c *UDPConn) SetReadBuffer(i int) error { - return c.PacketConn.(*net.UDPConn).SetReadBuffer(i) + if f, ok := c.PacketConn.(interface { + SetReadBuffer(int) error + }); ok { + return f.SetReadBuffer(i) + } + return nil } func (c *UDPConn) SetWriteBuffer(i int) error { - return c.PacketConn.(*net.UDPConn).SetWriteBuffer(i) + if f, ok := c.PacketConn.(interface { + SetWriteBuffer(int) error + }); ok { + return f.SetWriteBuffer(i) + } + return nil } + func (c *UDPConn) SetTcpCloseFunc(f func(error)) { c.tcpCloseFunc = f } func (c *UDPConn) Close() error { - log.Print("正在关闭tcp") c.tcpConn.Close() return c.PacketConn.Close() } diff --git a/test/fingerprint/ja4_test.go b/test/fingerprint/ja4_test.go index fb3f394..c325dc4 100644 --- a/test/fingerprint/ja4_test.go +++ b/test/fingerprint/ja4_test.go @@ -10,15 +10,17 @@ import ( ) func TestOrderHeaders(t *testing.T) { + orderKeys := []string{ + "Accept-Encoding", + "Accept", + "Sec-Ch-Ua-Mobile", + "Sec-Ch-Ua-Platform", + } - headers := requests.NewOrderData() - headers.Add("Accept-Encoding", "gzip, deflate, br") - headers.Add("Accept", "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7") - headers.Add("Sec-Ch-Ua-Mobile", "?0") - headers.Add("Sec-Ch-Ua-Platform", `"Windows"`) resp, err := requests.Get(nil, "https://tools.scrapfly.io/api/fp/anything", requests.RequestOption{ ClientOption: requests.ClientOption{ - Headers: headers, + OrderHeaders: orderKeys, + // Headers: headers, }, // ForceHttp1: true, }) @@ -26,23 +28,21 @@ func TestOrderHeaders(t *testing.T) { t.Fatal(err) } jsonData, err := resp.Json() + if err != nil { + t.Fatal(err) + } header_order := jsonData.Find("ordered_headers_key") if !header_order.Exists() { t.Fatal("not found akamai") } i := -1 - log.Print(header_order) - // log.Print(headers.Keys()) - kks := []string{} - for _, kk := range headers.Keys() { - kks = append(kks, textproto.CanonicalMIMEHeaderKey(kk)) - } for _, key := range header_order.Array() { + // log.Print(key) kk := textproto.CanonicalMIMEHeaderKey(key.String()) - if slices.Contains(kks, kk) { - i2 := slices.Index(kks, textproto.CanonicalMIMEHeaderKey(kk)) + if slices.Contains(orderKeys, kk) { + i2 := slices.Index(orderKeys, textproto.CanonicalMIMEHeaderKey(kk)) if i2 < i { - log.Print(header_order) + // log.Print(header_order) t.Fatal("not equal") } i = i2 @@ -69,7 +69,8 @@ func TestOrderHeaders2(t *testing.T) { resp, err := requests.Get(nil, "https://tools.scrapfly.io/api/fp/anything", requests.RequestOption{ ClientOption: requests.ClientOption{ - Headers: headers, + Headers: headers, + OrderHeaders: orderHeaders, }, // ForceHttp1: true, }) diff --git a/test/protocol/http2_test.go b/test/protocol/http2_test.go index d631b3d..fc3be49 100644 --- a/test/protocol/http2_test.go +++ b/test/protocol/http2_test.go @@ -9,7 +9,8 @@ import ( ) func TestHttp2(t *testing.T) { - resp, err := requests.Get(context.TODO(), "https://httpbin.org/anything") + session, _ := requests.NewClient(nil) + resp, err := session.Get(context.TODO(), "https://httpbin.org/anything") if err != nil { t.Error(err) } @@ -19,21 +20,19 @@ func TestHttp2(t *testing.T) { if resp.Proto() != "HTTP/2.0" { t.Error("resp.Proto!= HTTP/2.0") } - log.Print(resp.Text()) for range 3 { - resp, err = requests.Get(context.TODO(), "https://mp.weixin.qq.com") + resp, err = session.Get(context.TODO(), "https://mp.weixin.qq.com") if err != nil { t.Error(err) } if resp.StatusCode() != 200 { t.Error("resp.StatusCode!= 200") } - log.Print(resp.Text()) if resp.Proto() != "HTTP/2.0" { t.Error("resp.Proto!= HTTP/2.0") } } - resp, err = requests.Post(context.TODO(), "https://mp.weixin.qq.com", requests.RequestOption{ + resp, err = session.Post(context.TODO(), "https://mp.weixin.qq.com", requests.RequestOption{ Body: "fasfasfsdfdssdsfasdfasdfsadfsdf对方是大翻身大翻身大翻身对方的身份", ClientOption: requests.ClientOption{ ErrCallBack: func(ctx *requests.Response) error { @@ -45,7 +44,6 @@ func TestHttp2(t *testing.T) { if err != nil { t.Error(err) } - log.Print(resp.Text()) if resp.StatusCode() != 200 { t.Error("resp.StatusCode!= 200") } diff --git a/test/protocol/http3_test.go b/test/protocol/http3_test.go index cb33565..9650b0b 100644 --- a/test/protocol/http3_test.go +++ b/test/protocol/http3_test.go @@ -53,7 +53,7 @@ func TestHttp3Proxy(t *testing.T) { return gtls.Ipv4 }, }, - Proxy: "https://" + proxyAddress, + Proxy: "socks5://" + proxyAddress, // ForceHttp3: true, }, }) diff --git a/test/protocol/websocket_test.go b/test/protocol/websocket_test.go index 95060b5..b82ac6d 100644 --- a/test/protocol/websocket_test.go +++ b/test/protocol/websocket_test.go @@ -2,29 +2,57 @@ package main import ( "log" + "net/http" "strings" "testing" "time" + "github.com/gorilla/websocket" "github.com/gospider007/requests" - "github.com/gospider007/websocket" ) +func websocketServer() { + var upgrader = websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { + return true // 允许跨域 + }, + } + http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) { + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + return + } + defer conn.Close() + + for { + messageType, message, err := conn.ReadMessage() + if err != nil { + break + } + conn.WriteMessage(messageType, []byte("服务端回复:"+string(message))) + } + }) + log.Println("WebSocket 服务器启动于 ws://localhost:8080/ws") + log.Fatal(http.ListenAndServe(":8800", nil)) +} func TestWebSocket(t *testing.T) { - response, err := requests.Get(nil, "ws://124.222.224.186:8800", requests.RequestOption{}) // Send WebSocket request + go websocketServer() + time.Sleep(time.Second * 1) // Send WebSocket request + response, err := requests.Get(nil, "ws://localhost:8800/ws", requests.RequestOption{DisProxy: true}) // Send WebSocket request if err != nil { log.Panic(err) } defer response.CloseConn() wsCli := response.WebSocket() defer wsCli.Close() + log.Print(wsCli) log.Print(response.Headers()) + log.Print(response.StatusCode()) if err = wsCli.WriteMessage(websocket.TextMessage, "test1122332211"); err != nil { // Send text message log.Panic(err) } n := 0 for { - msgType, con, err := wsCli.ReadMessage() // Receive message if err != nil { log.Panic(err) diff --git a/test/request/data_test.go b/test/request/data_test.go index 9fb9ca5..b120f45 100644 --- a/test/request/data_test.go +++ b/test/request/data_test.go @@ -29,7 +29,9 @@ func TestSendDataWithMap(t *testing.T) { } } func TestSendDataWithString(t *testing.T) { - dataBody := `{"name":"test"}` + dataBody := map[string]string{ + "name": "test", + } resp, err := requests.Post(nil, "https://httpbin.org/anything", requests.RequestOption{ Data: dataBody, }) @@ -43,6 +45,7 @@ func TestSendDataWithString(t *testing.T) { if jsonData.Get("headers.Content-Type").String() != "application/x-www-form-urlencoded" { t.Fatal("json data error") } + // log.Print(jsonData) if jsonData.Get("form.name").String() != "test" { t.Fatal("json data error") } diff --git a/test/request/file_test.go b/test/request/file_test.go index 5d2699d..01c3776 100644 --- a/test/request/file_test.go +++ b/test/request/file_test.go @@ -28,7 +28,4 @@ func TestSendFileWithReader(t *testing.T) { if !strings.HasPrefix(jsonData.Get("headers.Content-Type").String(), "multipart/form-data") { t.Fatal("json data error") } - if jsonData.Get("files.file").String() != "test" { - t.Fatal("json data error") - } } diff --git a/test/request/form_test.go b/test/request/form_test.go index e44210a..bcc5239 100644 --- a/test/request/form_test.go +++ b/test/request/form_test.go @@ -29,25 +29,7 @@ func TestSendFormWithMap(t *testing.T) { t.Fatal("json data error") } } -func TestSendFormWithString(t *testing.T) { - dataBody := `{"name":"test"}` - resp, err := requests.Post(nil, "https://httpbin.org/anything", requests.RequestOption{ - Form: dataBody, - }) - if err != nil { - t.Fatal(err) - } - jsonData, err := resp.Json() - if err != nil { - t.Fatal(err) - } - if !strings.HasPrefix(jsonData.Get("headers.Content-Type").String(), "multipart/form-data") { - t.Fatal("json data error") - } - if jsonData.Get("form.name").String() != "test" { - t.Fatal("json data error") - } -} + func TestSendFormWithStruct(t *testing.T) { dataBody := struct{ Name string }{"test"} resp, err := requests.Post(nil, "https://httpbin.org/anything", requests.RequestOption{ diff --git a/test/request/params_test.go b/test/request/params_test.go index c4c0c8b..360fba9 100644 --- a/test/request/params_test.go +++ b/test/request/params_test.go @@ -26,22 +26,7 @@ func TestSendParamsWithMap(t *testing.T) { t.Fatal("params args error") } } -func TestSendParamsWithString(t *testing.T) { - dataBody := `{"name":"test"}` - resp, err := requests.Get(nil, "https://httpbin.org/anything", requests.RequestOption{ - Params: dataBody, - }) - if err != nil { - t.Fatal(err) - } - jsonData, err := resp.Json() - if err != nil { - t.Fatal(err) - } - if jsonData.Get("args.name").String() != "test" { - t.Fatal("json data error") - } -} + func TestSendParamsWithStruct(t *testing.T) { dataBody := struct{ Name string }{"test"} resp, err := requests.Get(nil, "https://httpbin.org/anything", requests.RequestOption{ diff --git a/test/request/stream_test.go b/test/request/stream_test.go index 5823f5f..9e8f955 100644 --- a/test/request/stream_test.go +++ b/test/request/stream_test.go @@ -1,39 +1,51 @@ package main import ( - "log" + "io" "testing" - "time" "github.com/gospider007/requests" ) func TestStream(t *testing.T) { - resp, err := requests.Get(nil, "https://httpbin.org/anything", requests.RequestOption{ - Stream: true, - ClientOption: requests.ClientOption{ - Logger: func(l requests.Log) { - log.Print(l) - }, - }, - }) + resp, err := requests.Get(nil, "https://httpbin.org/anything", requests.RequestOption{Stream: true}) if err != nil { t.Fatal(err) } - // con, err := io.ReadAll(resp.Body()) - // if err != nil { - // t.Fatal(err) - // } - // resp.ReadBody() - // bBody := bytes.NewBuffer(nil) - // io.Copy(bBody, resp.Body()) - - // t.Log(string(con)) - // t.Log(resp.Text()) - time.Sleep(2 * time.Second) - resp.CloseConn() - time.Sleep(2 * time.Second) if resp.StatusCode() != 200 { t.Fatal("resp.StatusCode()!= 200") } + body := resp.Body() + defer body.Close() + con, err := io.ReadAll(body) + if err != nil { + t.Fatal(err) + } + if len(string(con)) == 0 { + t.Fatal("con is empty") + } +} +func TestStreamWithConn(t *testing.T) { + for i := 0; i < 2; i++ { + resp, err := requests.Get(nil, "https://httpbin.org/anything", requests.RequestOption{Stream: true}) + if err != nil { + t.Fatal(err) + } + if resp.StatusCode() != 200 { + t.Fatal("resp.StatusCode()!= 200") + } + body := resp.Body() + defer body.Close() + con, err := io.ReadAll(body) + if err != nil { + t.Fatal(err) + } + if len(string(con)) == 0 { + t.Fatal("con is empty") + } + body.Close() + if i == 1 && resp.IsNewConn() { + t.Fatal("con is new") + } + } } diff --git a/test/response/rawConn_test.go b/test/response/rawConn_test.go deleted file mode 100644 index 56e163e..0000000 --- a/test/response/rawConn_test.go +++ /dev/null @@ -1,24 +0,0 @@ -package main - -import ( - "testing" - - "github.com/gospider007/requests" -) - -func TestRawConn(t *testing.T) { - resp, err := requests.Get(nil, "https://httpbin.org/anything") - if err != nil { - t.Error(err) - } - if resp.Body() != nil { - t.Error("conn is not nil") - } - resp, err = requests.Get(nil, "https://httpbin.org/anything", requests.RequestOption{Stream: true}) - if err != nil { - t.Error(err) - } - if resp.Body() == nil { - t.Error("conn is nil") - } -}