mirror of
https://github.com/gospider007/requests.git
synced 2025-12-24 13:57:52 +08:00
修复bug
This commit is contained in:
@@ -33,8 +33,6 @@ type ClientOption struct {
|
||||
|
||||
RedirectNum int //重定向次数,小于0为禁用,0:不限制
|
||||
|
||||
DisAlive bool //关闭连接复用
|
||||
|
||||
DisDecode bool //关闭自动编码
|
||||
DisUnZip bool //关闭自动解压
|
||||
TryNum int //重试次数
|
||||
@@ -66,7 +64,6 @@ type Client struct {
|
||||
headers any //请求头
|
||||
bar bool //是否开启bar
|
||||
|
||||
disAlive bool
|
||||
disCookie bool
|
||||
client *http.Client
|
||||
noJarClient *http.Client
|
||||
@@ -165,7 +162,6 @@ func NewClient(preCtx context.Context, options ...ClientOption) (*Client, error)
|
||||
|
||||
disCookie: option.DisCookie,
|
||||
redirectNum: option.RedirectNum,
|
||||
disAlive: option.DisAlive,
|
||||
disDecode: option.DisDecode,
|
||||
disUnZip: option.DisUnZip,
|
||||
tryNum: option.TryNum,
|
||||
|
||||
28
dial.go
28
dial.go
@@ -275,10 +275,10 @@ func (obj *DialClient) DialContext(ctx context.Context, netword string, addr str
|
||||
func (obj *DialClient) Dialer() *net.Dialer {
|
||||
return obj.dialer
|
||||
}
|
||||
func (obj *DialClient) AddTls(ctx context.Context, conn net.Conn, host string, disHttp bool, tlsConfig *tls.Config) (*tls.Conn, error) {
|
||||
func (obj *DialClient) AddTls(ctx context.Context, conn net.Conn, host string, disHttp2 bool, tlsConfig *tls.Config) (*tls.Conn, error) {
|
||||
var tlsConn *tls.Conn
|
||||
tlsConfig.ServerName = GetServerName(host)
|
||||
if disHttp {
|
||||
if disHttp2 {
|
||||
tlsConfig.NextProtos = []string{"http/1.1"}
|
||||
} else {
|
||||
tlsConfig.NextProtos = []string{"h2", "http/1.1"}
|
||||
@@ -286,14 +286,14 @@ func (obj *DialClient) AddTls(ctx context.Context, conn net.Conn, host string, d
|
||||
tlsConn = tls.Client(conn, tlsConfig)
|
||||
return tlsConn, tlsConn.HandshakeContext(ctx)
|
||||
}
|
||||
func (obj *DialClient) AddJa3Tls(ctx context.Context, conn net.Conn, host string, disHttp bool, ja3Spec ja3.Ja3Spec, tlsConfig *utls.Config) (*utls.UConn, error) {
|
||||
func (obj *DialClient) AddJa3Tls(ctx context.Context, conn net.Conn, host string, disHttp2 bool, ja3Spec ja3.Ja3Spec, tlsConfig *utls.Config) (*utls.UConn, error) {
|
||||
tlsConfig.ServerName = GetServerName(host)
|
||||
if disHttp {
|
||||
if disHttp2 {
|
||||
tlsConfig.NextProtos = []string{"http/1.1"}
|
||||
} else {
|
||||
tlsConfig.NextProtos = []string{"h2", "http/1.1"}
|
||||
}
|
||||
return ja3.NewClient(ctx, conn, ja3Spec, disHttp, tlsConfig)
|
||||
return ja3.NewClient(ctx, conn, ja3Spec, disHttp2, tlsConfig)
|
||||
}
|
||||
func (obj *DialClient) Socks5Proxy(ctx context.Context, network string, addr string, proxyUrl *url.URL) (conn net.Conn, err error) {
|
||||
defer func() {
|
||||
@@ -324,7 +324,6 @@ func (obj *DialClient) clientVerifyHttps(ctx context.Context, scheme string, pro
|
||||
hdr.Set("Proxy-Authorization", "Basic "+tools.Base64Encode(proxyUrl.User.Username()+":"+password))
|
||||
}
|
||||
}
|
||||
didReadResponse := make(chan struct{}) // closed after CONNECT write+read is done or fails
|
||||
cHost := host
|
||||
_, hport, _ := net.SplitHostPort(host)
|
||||
if hport == "" {
|
||||
@@ -341,6 +340,7 @@ func (obj *DialClient) clientVerifyHttps(ctx context.Context, scheme string, pro
|
||||
}
|
||||
|
||||
var resp *http.Response
|
||||
didReadResponse := make(chan struct{}) // closed after CONNECT write+read is done or fails
|
||||
go func() {
|
||||
defer close(didReadResponse)
|
||||
connectReq := &http.Request{
|
||||
@@ -383,22 +383,16 @@ func (obj *DialClient) DialContextWithProxy(ctx context.Context, netword string,
|
||||
}
|
||||
}
|
||||
switch proxyUrl.Scheme {
|
||||
case "http":
|
||||
case "http", "https":
|
||||
conn, err := obj.DialContext(ctx, netword, net.JoinHostPort(proxyUrl.Hostname(), proxyUrl.Port()))
|
||||
if err != nil {
|
||||
return conn, err
|
||||
} else if proxyUrl.Scheme == "https" {
|
||||
if conn, err = obj.AddTls(ctx, conn, proxyUrl.Host, true, tlsConfig); err != nil {
|
||||
return conn, err
|
||||
}
|
||||
}
|
||||
return conn, obj.clientVerifyHttps(ctx, scheme, proxyUrl, addr, host, conn)
|
||||
case "https":
|
||||
conn, err := obj.DialContext(ctx, netword, net.JoinHostPort(proxyUrl.Hostname(), proxyUrl.Port()))
|
||||
if err != nil {
|
||||
return conn, err
|
||||
}
|
||||
tlsConn, err := obj.AddTls(ctx, conn, proxyUrl.Host, true, tlsConfig)
|
||||
if err == nil {
|
||||
err = obj.clientVerifyHttps(ctx, scheme, proxyUrl, addr, host, tlsConn)
|
||||
}
|
||||
return tlsConn, err
|
||||
case "socks5":
|
||||
return obj.Socks5Proxy(ctx, netword, addr, proxyUrl)
|
||||
default:
|
||||
|
||||
14
go.mod
14
go.mod
@@ -6,13 +6,13 @@ require (
|
||||
gitee.com/baixudong/bar v0.0.0-20230923032414-c19cc384edeb
|
||||
gitee.com/baixudong/bs4 v0.0.0-20230927091031-00dd545f5784
|
||||
gitee.com/baixudong/gson v0.0.0-20230927090937-556b58fb6899
|
||||
gitee.com/baixudong/ja3 v0.0.0-20230927070510-152bc4a3675a
|
||||
gitee.com/baixudong/ja3 v0.0.0-20231010095622-1f26d89ef419
|
||||
gitee.com/baixudong/net v0.0.0-20230927083058-4a6a7a20f917
|
||||
gitee.com/baixudong/re v0.0.0-20230809033040-360c1d945e59
|
||||
gitee.com/baixudong/tools v0.0.0-20230926010219-77f29cbf5935
|
||||
gitee.com/baixudong/websocket v0.0.0-20230927082325-5b4eb178c3ec
|
||||
github.com/refraction-networking/utls v1.5.3
|
||||
golang.org/x/net v0.15.0
|
||||
github.com/refraction-networking/utls v1.5.4
|
||||
golang.org/x/net v0.16.0
|
||||
)
|
||||
|
||||
require (
|
||||
@@ -44,10 +44,10 @@ require (
|
||||
go.mongodb.org/mongo-driver v1.12.1 // indirect
|
||||
go.uber.org/multierr v1.11.0 // indirect
|
||||
go.uber.org/zap v1.26.0 // indirect
|
||||
golang.org/x/crypto v0.13.0 // indirect
|
||||
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
|
||||
golang.org/x/image v0.12.0 // indirect
|
||||
golang.org/x/sys v0.12.0 // indirect
|
||||
golang.org/x/crypto v0.14.0 // indirect
|
||||
golang.org/x/exp v0.0.0-20231006140011-7918f672742d // indirect
|
||||
golang.org/x/image v0.13.0 // indirect
|
||||
golang.org/x/sys v0.13.0 // indirect
|
||||
golang.org/x/text v0.13.0 // indirect
|
||||
nhooyr.io/websocket v1.8.7 // indirect
|
||||
)
|
||||
|
||||
14
go.sum
14
go.sum
@@ -8,6 +8,8 @@ gitee.com/baixudong/gson v0.0.0-20230927090937-556b58fb6899 h1:JFV4d4J07fYDmp5hy
|
||||
gitee.com/baixudong/gson v0.0.0-20230927090937-556b58fb6899/go.mod h1:1DBHleeT45+7ogNCRkJYKhq28Krc3oC5Hr4pHmd+GM0=
|
||||
gitee.com/baixudong/ja3 v0.0.0-20230927070510-152bc4a3675a h1:IONxzaxrHVCEINNw6oMcguyYqAu5DD9DjmYq2DoOuhY=
|
||||
gitee.com/baixudong/ja3 v0.0.0-20230927070510-152bc4a3675a/go.mod h1:Y3tcqTP6YapuS8pWDT7zkHgoKV14Sx652+OuJJyssk4=
|
||||
gitee.com/baixudong/ja3 v0.0.0-20231010095622-1f26d89ef419 h1:EUeBpoYVKI/rK0FQ8I8g50wx510HrQNju1orZ9RF25M=
|
||||
gitee.com/baixudong/ja3 v0.0.0-20231010095622-1f26d89ef419/go.mod h1:SiOf/JBnmWJ/aTQKVUHFKSDLF+gnlV2rIZ73Ecy/RD8=
|
||||
gitee.com/baixudong/kinds v0.0.0-20230809033013-c3d6d3479f8c h1:MZewpjU0+82TcA+nHrcAbbgZhTCqVH2I5zwYBWJ4v54=
|
||||
gitee.com/baixudong/kinds v0.0.0-20230809033013-c3d6d3479f8c/go.mod h1:J3r4FWnZOMgk7/1EWEEDaPQnx5uH4OLCRzpTOyKg5cA=
|
||||
gitee.com/baixudong/net v0.0.0-20230927083058-4a6a7a20f917 h1:/eAVC7BfyvZCMNMhYXtAw50RezEREx5baecMQZHGFAs=
|
||||
@@ -127,6 +129,8 @@ github.com/quic-go/quic-go v0.39.0 h1:AgP40iThFMY0bj8jGxROhw3S0FMGa8ryqsmi9tBH3S
|
||||
github.com/quic-go/quic-go v0.39.0/go.mod h1:T09QsDQWjLiQ74ZmacDfqZmhY/NLnw5BC40MANNNZ1Q=
|
||||
github.com/refraction-networking/utls v1.5.3 h1:Ds5Ocg1+MC1ahNx5iBEcHe0jHeLaA/fLey61EENm7ro=
|
||||
github.com/refraction-networking/utls v1.5.3/go.mod h1:SPuDbBmgLGp8s+HLNc83FuavwZCFoMmExj+ltUHiHUw=
|
||||
github.com/refraction-networking/utls v1.5.4 h1:9k6EO2b8TaOGsQ7Pl7p9w6PUhx18/ZCeT0WNTZ7Uw4o=
|
||||
github.com/refraction-networking/utls v1.5.4/go.mod h1:SPuDbBmgLGp8s+HLNc83FuavwZCFoMmExj+ltUHiHUw=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
|
||||
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
|
||||
@@ -173,10 +177,16 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y
|
||||
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
|
||||
golang.org/x/crypto v0.13.0 h1:mvySKfSWJ+UKUii46M40LOvyWfN0s2U+46/jDd0e6Ck=
|
||||
golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc=
|
||||
golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc=
|
||||
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
|
||||
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g=
|
||||
golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k=
|
||||
golang.org/x/exp v0.0.0-20231006140011-7918f672742d h1:jtJma62tbqLibJ5sFQz8bKtEM8rJBtfilJ2qTU199MI=
|
||||
golang.org/x/exp v0.0.0-20231006140011-7918f672742d/go.mod h1:ldy0pHrwJyGW56pPQzzkH36rKxoZW1tw7ZJpeKx+hdo=
|
||||
golang.org/x/image v0.12.0 h1:w13vZbU4o5rKOFFR8y7M+c4A5jXDC0uXTdHYRP8X2DQ=
|
||||
golang.org/x/image v0.12.0/go.mod h1:Lu90jvHG7GfemOIcldsh9A2hS01ocl6oNO7ype5mEnk=
|
||||
golang.org/x/image v0.13.0 h1:3cge/F/QTkNLauhf2QoE9zp+7sr+ZcL4HnoZmdwg9sg=
|
||||
golang.org/x/image v0.13.0/go.mod h1:6mmbMOeV28HuMTgA6OSRkdXKYw/t5W9Uwn2Yv1r3Yxk=
|
||||
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
|
||||
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
|
||||
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
@@ -189,6 +199,8 @@ golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
|
||||
golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns=
|
||||
golang.org/x/net v0.15.0 h1:ugBLEUaxABaB5AJqW9enI0ACdci2RUd4eP51NTBvuJ8=
|
||||
golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk=
|
||||
golang.org/x/net v0.16.0 h1:7eBu7KsSvFDtSXUIDbh3aqlK4DPsZ1rByC8PFfBThos=
|
||||
golang.org/x/net v0.16.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
|
||||
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
@@ -204,6 +216,8 @@ golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o=
|
||||
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
|
||||
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
|
||||
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
|
||||
|
||||
@@ -40,7 +40,6 @@ type RequestOption struct {
|
||||
ContentType string //headers 中Content-Type 的值
|
||||
Raw any //不设置context-type,支持string,[]bytes,json,map
|
||||
|
||||
DisAlive bool //关闭连接复用
|
||||
DisCookie bool //关闭cookies管理,这个请求不用cookies池
|
||||
DisDecode bool //关闭自动解码
|
||||
DisRead bool //关闭自动读取
|
||||
@@ -217,9 +216,6 @@ func (obj *Client) newRequestOption(option RequestOption) RequestOption {
|
||||
if option.Timeout == 0 {
|
||||
option.Timeout = obj.timeout
|
||||
}
|
||||
if !option.DisAlive {
|
||||
option.DisAlive = obj.disAlive
|
||||
}
|
||||
if !option.DisCookie {
|
||||
option.DisCookie = obj.disCookie
|
||||
}
|
||||
|
||||
35
requests.go
35
requests.go
@@ -139,8 +139,6 @@ type reqCtxData struct {
|
||||
proxy *url.URL
|
||||
disProxy bool
|
||||
|
||||
disAlive bool
|
||||
ws bool
|
||||
requestCallBack func(context.Context, *http.Request) error
|
||||
responseCallBack func(context.Context, *http.Request, *http.Response) error
|
||||
|
||||
@@ -349,7 +347,7 @@ func (obj *Client) request(preCtx context.Context, option RequestOption) (respon
|
||||
var reqs *http.Request
|
||||
//构造ctxData
|
||||
ctxData := new(reqCtxData)
|
||||
ctxData.disAlive = option.DisAlive
|
||||
|
||||
ctxData.requestCallBack = option.RequestCallBack
|
||||
ctxData.responseCallBack = option.ResponseCallBack
|
||||
//构造代理
|
||||
@@ -389,26 +387,23 @@ func (obj *Client) request(preCtx context.Context, option RequestOption) (respon
|
||||
return response, tools.WrapError(errFatal, errors.New("tempRequest 构造request失败"), err)
|
||||
}
|
||||
|
||||
//判断file
|
||||
if reqs.URL.Scheme == "file" { //文件直接返回
|
||||
//解析Scheme
|
||||
var isWebSocket bool
|
||||
switch reqs.URL.Scheme {
|
||||
case "ws", "wss":
|
||||
isWebSocket = true
|
||||
case "file":
|
||||
response.filePath = re.Sub(`^/+`, "", reqs.URL.Path)
|
||||
response.content, err = os.ReadFile(response.filePath)
|
||||
if err != nil {
|
||||
err = tools.WrapError(errFatal, errors.New("read filePath data error"), err)
|
||||
}
|
||||
return
|
||||
case "http", "https":
|
||||
default:
|
||||
err = tools.WrapError(errFatal, fmt.Errorf("url scheme error: %s", reqs.URL.Scheme))
|
||||
return
|
||||
}
|
||||
|
||||
//判断ws
|
||||
switch reqs.URL.Scheme {
|
||||
case "ws":
|
||||
ctxData.ws = true
|
||||
reqs.URL.Scheme = "http"
|
||||
case "wss":
|
||||
ctxData.ws = true
|
||||
reqs.URL.Scheme = "https"
|
||||
}
|
||||
|
||||
//添加headers
|
||||
var headOk bool
|
||||
if reqs.Header, headOk = option.Headers.(http.Header); !headOk {
|
||||
@@ -440,20 +435,20 @@ func (obj *Client) request(preCtx context.Context, option RequestOption) (respon
|
||||
}
|
||||
}
|
||||
//开始发送请求
|
||||
if ctxData.ws { //设置websocket headers
|
||||
if isWebSocket { //设置websocket headers
|
||||
websocket.SetClientHeadersOption(reqs.Header, option.WsOption)
|
||||
}
|
||||
if response.response, err = obj.getClient(option).Do(reqs); err != nil {
|
||||
return
|
||||
}
|
||||
if response.response == nil {
|
||||
} else if response.response == nil {
|
||||
err = errors.New("response is nil")
|
||||
return
|
||||
}
|
||||
if !response.disUnzip {
|
||||
response.disUnzip = response.response.Uncompressed
|
||||
}
|
||||
if ctxData.ws { //判断ws 的状态码是否正确
|
||||
|
||||
if isWebSocket { //判断ws 的状态码是否正确
|
||||
if response.response.StatusCode == 101 {
|
||||
response.webSocket, err = websocket.NewClientConn(response.response)
|
||||
} else {
|
||||
|
||||
@@ -334,14 +334,10 @@ func (obj *Response) ReadBody() error { //读取body,对body 解压,解码操
|
||||
}
|
||||
|
||||
func (obj *Response) Delete() { //通知关闭连接,不会影响正在传输中的数据
|
||||
if delFunc, ok := obj.response.Body.(interface{ Delete() }); ok {
|
||||
delFunc.Delete()
|
||||
}
|
||||
obj.response.Body.(interface{ Delete() }).Delete()
|
||||
}
|
||||
func (obj *Response) ForceDelete() { //强制关闭连接,立刻马上,正在传输中的数据立马中断
|
||||
if delFunc, ok := obj.response.Body.(interface{ ForceDelete() }); ok {
|
||||
delFunc.ForceDelete()
|
||||
}
|
||||
obj.response.Body.(interface{ ForceDelete() }).ForceDelete()
|
||||
}
|
||||
|
||||
// 关闭response ,当DisRead 为true,websocket,sse 协议 请一定要手动关闭
|
||||
|
||||
248
roundTripper.go
248
roundTripper.go
@@ -15,7 +15,6 @@ import (
|
||||
|
||||
"net/http"
|
||||
|
||||
"gitee.com/baixudong/ja3"
|
||||
"gitee.com/baixudong/net/http2"
|
||||
"gitee.com/baixudong/tools"
|
||||
utls "github.com/refraction-networking/utls"
|
||||
@@ -34,6 +33,10 @@ type reqTask struct {
|
||||
err error
|
||||
}
|
||||
|
||||
func (obj *reqTask) isPool() bool {
|
||||
return obj.err == nil && obj.res != nil && obj.res.StatusCode != 101 && obj.res.Header.Get("Content-Type") != "text/event-stream"
|
||||
}
|
||||
|
||||
type connPool struct {
|
||||
ctx context.Context //控制请求的生命周期
|
||||
cnl context.CancelFunc
|
||||
@@ -83,10 +86,10 @@ func getKey(ctxData *reqCtxData, req *http.Request) string {
|
||||
func (obj *RoundTripper) newConnPool(key string, conn *Connecotr) *connPool { //新建连接池
|
||||
pool := new(connPool)
|
||||
pool.ctx, pool.cnl = context.WithCancel(obj.ctx)
|
||||
pool.total.Add(1)
|
||||
pool.tasks = make(chan *reqTask)
|
||||
pool.rt = obj
|
||||
pool.key = key
|
||||
pool.total.Add(1)
|
||||
go pool.rwMain(conn)
|
||||
return pool
|
||||
}
|
||||
@@ -116,10 +119,10 @@ func (obj *RoundTripper) TlsConfig() *tls.Config {
|
||||
func (obj *RoundTripper) UtlsConfig() *utls.Config {
|
||||
return obj.utlsConfig.Clone()
|
||||
}
|
||||
func (obj *RoundTripper) dial(ctxData *reqCtxData, addr string, key string, req *http.Request) (conn *Connecotr, oneSessionCache *ja3.OneSessionCache, err error) {
|
||||
func (obj *RoundTripper) dial(ctxData *reqCtxData, addr string, key string, req *http.Request) (conn *Connecotr, err error) {
|
||||
if !ctxData.disProxy && ctxData.proxy == nil { //确定代理
|
||||
if ctxData.proxy, err = obj.GetProxy(req.Context(), req.URL); err != nil {
|
||||
return conn, oneSessionCache, err
|
||||
return conn, err
|
||||
}
|
||||
}
|
||||
var netConn net.Conn
|
||||
@@ -130,43 +133,27 @@ func (obj *RoundTripper) dial(ctxData *reqCtxData, addr string, key string, req
|
||||
netConn, err = obj.dialer.DialContextWithProxy(req.Context(), "tcp", req.URL.Scheme, addr, host, ctxData.proxy, obj.TlsConfig())
|
||||
}
|
||||
if err != nil {
|
||||
return conn, oneSessionCache, err
|
||||
return conn, err
|
||||
}
|
||||
conne := new(Connecotr)
|
||||
|
||||
conne.rn = make(chan int)
|
||||
conne.rc = make(chan []byte)
|
||||
|
||||
conne.deleteCtx, conne.deleteCnl = context.WithCancel(obj.ctx)
|
||||
conne.closeCtx, conne.closeCnl = context.WithCancel(conne.deleteCtx)
|
||||
|
||||
if req.URL.Scheme == "https" {
|
||||
conne.WithCancel(obj.ctx)
|
||||
isWebSocket := req.URL.Scheme == "wss"
|
||||
if req.URL.Scheme == "https" || isWebSocket {
|
||||
ctx, cnl := context.WithTimeout(req.Context(), obj.tlsHandshakeTimeout)
|
||||
defer cnl()
|
||||
if ctxData.ja3Spec.IsSet() {
|
||||
session, ok := obj.clientSessionCache.Get(addr)
|
||||
utlsConfig := obj.UtlsConfig()
|
||||
oneSessionCache = ja3.NewOneSessionCache(session)
|
||||
utlsConfig.ClientSessionCache = oneSessionCache
|
||||
if ok {
|
||||
if !ctxData.ja3Spec.HasPsk() {
|
||||
ja3.AddPsk(&ctxData.ja3Spec)
|
||||
}
|
||||
} else {
|
||||
if ctxData.ja3Spec.HasPsk() {
|
||||
ja3.DelPsk(&ctxData.ja3Spec)
|
||||
}
|
||||
}
|
||||
tlsConn, err := obj.dialer.AddJa3Tls(ctx, netConn, host, ctxData.ws, ctxData.ja3Spec, utlsConfig)
|
||||
tlsConn, err := obj.dialer.AddJa3Tls(ctx, netConn, host, isWebSocket, ctxData.ja3Spec, obj.UtlsConfig())
|
||||
if err != nil {
|
||||
return conne, oneSessionCache, err
|
||||
return conne, err
|
||||
}
|
||||
conne.h2 = tlsConn.ConnectionState().NegotiatedProtocol == "h2"
|
||||
netConn = tlsConn
|
||||
} else {
|
||||
tlsConn, err := obj.dialer.AddTls(ctx, netConn, host, ctxData.ws, obj.TlsConfig())
|
||||
tlsConn, err := obj.dialer.AddTls(ctx, netConn, host, isWebSocket, obj.TlsConfig())
|
||||
if err != nil {
|
||||
return conne, oneSessionCache, err
|
||||
return conne, err
|
||||
}
|
||||
conne.h2 = tlsConn.ConnectionState().NegotiatedProtocol == "h2"
|
||||
netConn = tlsConn
|
||||
@@ -177,23 +164,24 @@ func (obj *RoundTripper) dial(ctxData *reqCtxData, addr string, key string, req
|
||||
if conne.h2RawConn, err = http2.NewClientConn(func() {
|
||||
conne.deleteCnl()
|
||||
}, netConn, ctxData.h2Ja3Spec); err != nil {
|
||||
return conne, oneSessionCache, err
|
||||
return conne, err
|
||||
}
|
||||
} else {
|
||||
conne.r = bufio.NewReader(conne)
|
||||
}
|
||||
return conne, oneSessionCache, err
|
||||
return conne, err
|
||||
}
|
||||
|
||||
type Connecotr struct {
|
||||
err error
|
||||
deleteCtx context.Context //强制关闭
|
||||
deleteCtx context.Context //强制关闭,直接关闭所有底层conn
|
||||
deleteCnl context.CancelFunc
|
||||
|
||||
closeCtx context.Context //通知关闭
|
||||
closeCtx context.Context //通知关闭,安全的关闭,等待body 完成生命周期
|
||||
closeCnl context.CancelFunc
|
||||
|
||||
bodyCtx context.Context //body的生命周期
|
||||
bodyCtx context.Context //body的生命周期
|
||||
bodyCnl context.CancelFunc //body的生命周期
|
||||
|
||||
rawConn net.Conn
|
||||
h2 bool
|
||||
@@ -205,6 +193,10 @@ type Connecotr struct {
|
||||
isPool bool
|
||||
}
|
||||
|
||||
func (obj *Connecotr) WithCancel(ctx context.Context) {
|
||||
obj.deleteCtx, obj.deleteCnl = context.WithCancel(ctx)
|
||||
obj.closeCtx, obj.closeCnl = context.WithCancel(obj.deleteCtx)
|
||||
}
|
||||
func (obj *Connecotr) Close() error {
|
||||
obj.deleteCnl()
|
||||
if obj.h2RawConn != nil {
|
||||
@@ -213,8 +205,8 @@ func (obj *Connecotr) Close() error {
|
||||
return obj.rawConn.Close()
|
||||
}
|
||||
func (obj *Connecotr) read() {
|
||||
defer obj.Close()
|
||||
obj.isRead = true
|
||||
defer obj.Close()
|
||||
con := make([]byte, 4096)
|
||||
var i int
|
||||
for {
|
||||
@@ -224,15 +216,15 @@ func (obj *Connecotr) read() {
|
||||
b := con[:i]
|
||||
for once := true; once || len(b) > 0; once = false {
|
||||
select {
|
||||
case obj.rc <- b:
|
||||
select {
|
||||
case nw := <-obj.rn:
|
||||
b = b[nw:]
|
||||
case <-obj.deleteCtx.Done():
|
||||
return
|
||||
}
|
||||
case <-obj.deleteCtx.Done():
|
||||
return
|
||||
case obj.rc <- b:
|
||||
select {
|
||||
case <-obj.deleteCtx.Done():
|
||||
return
|
||||
case nw := <-obj.rn:
|
||||
b = b[nw:]
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -242,21 +234,21 @@ func (obj *Connecotr) Read(b []byte) (i int, err error) {
|
||||
return obj.rawConn.Read(b)
|
||||
}
|
||||
select {
|
||||
case <-obj.deleteCtx.Done():
|
||||
if err = obj.err; err == nil {
|
||||
err = tools.WrapError(obj.deleteCtx.Err(), "connecotr close")
|
||||
}
|
||||
case con := <-obj.rc:
|
||||
i, err = copy(b, con), obj.err
|
||||
select {
|
||||
case obj.rn <- i:
|
||||
if i < len(con) {
|
||||
err = nil
|
||||
}
|
||||
case <-obj.deleteCtx.Done():
|
||||
if err = obj.err; err == nil {
|
||||
err = tools.WrapError(obj.deleteCtx.Err(), "connecotr close")
|
||||
}
|
||||
}
|
||||
case <-obj.deleteCtx.Done():
|
||||
if err = obj.err; err == nil {
|
||||
err = tools.WrapError(obj.deleteCtx.Err(), "connecotr close")
|
||||
case obj.rn <- i:
|
||||
if i < len(con) {
|
||||
err = nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
@@ -286,7 +278,6 @@ func (obj *Connecotr) h2Closed() bool {
|
||||
}
|
||||
|
||||
type ReadWriteCloser struct {
|
||||
cnl context.CancelFunc
|
||||
body io.ReadCloser
|
||||
conn *Connecotr
|
||||
}
|
||||
@@ -297,12 +288,14 @@ func (obj *ReadWriteCloser) Conn() net.Conn {
|
||||
func (obj *ReadWriteCloser) Read(p []byte) (n int, err error) {
|
||||
return obj.body.Read(p)
|
||||
}
|
||||
func (obj *ReadWriteCloser) Close() error {
|
||||
func (obj *ReadWriteCloser) Close() (err error) {
|
||||
err = obj.body.Close()
|
||||
if !obj.conn.isPool {
|
||||
defer obj.Delete()
|
||||
obj.ForceDelete()
|
||||
} else {
|
||||
obj.conn.bodyCnl()
|
||||
}
|
||||
defer obj.cnl()
|
||||
return obj.body.Close()
|
||||
return
|
||||
}
|
||||
|
||||
func (obj *ReadWriteCloser) Delete() { //通知关闭连接,不会影响正在传输中的数据
|
||||
@@ -313,26 +306,26 @@ func (obj *ReadWriteCloser) ForceDelete() { //强制关闭连接,立刻马上
|
||||
obj.conn.Close()
|
||||
}
|
||||
|
||||
func wrapBody(conn *Connecotr, task *reqTask) {
|
||||
func (obj *Connecotr) wrapBody(task *reqTask) {
|
||||
body := new(ReadWriteCloser)
|
||||
conn.bodyCtx, body.cnl = context.WithCancel(conn.deleteCtx)
|
||||
obj.bodyCtx, obj.bodyCnl = context.WithCancel(obj.deleteCtx)
|
||||
body.body = task.res.Body
|
||||
body.conn = conn
|
||||
body.conn = obj
|
||||
task.res.Body = body
|
||||
}
|
||||
func http1Req(conn *Connecotr, task *reqTask) {
|
||||
func (obj *Connecotr) http1Req(task *reqTask) {
|
||||
defer task.cnl()
|
||||
if task.err = task.req.Write(conn); task.err == nil {
|
||||
if task.res, task.err = http.ReadResponse(conn.r, task.req); task.res != nil && task.err == nil {
|
||||
wrapBody(conn, task)
|
||||
if task.err = task.req.Write(obj); task.err == nil {
|
||||
if task.res, task.err = http.ReadResponse(obj.r, task.req); task.res != nil && task.err == nil {
|
||||
obj.wrapBody(task)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func http2Req(conn *Connecotr, task *reqTask) {
|
||||
func (obj *Connecotr) http2Req(task *reqTask) {
|
||||
defer task.cnl()
|
||||
if task.res, task.err = conn.h2RawConn.RoundTrip(task.req); task.res != nil && task.err == nil {
|
||||
wrapBody(conn, task)
|
||||
if task.res, task.err = obj.h2RawConn.RoundTrip(task.req); task.res != nil && task.err == nil {
|
||||
obj.wrapBody(task)
|
||||
}
|
||||
}
|
||||
func (obj *connPool) notice(task *reqTask) {
|
||||
@@ -341,9 +334,46 @@ func (obj *connPool) notice(task *reqTask) {
|
||||
case task.emptyPool <- struct{}{}: //告诉提交任务方,池子没有可用连接
|
||||
}
|
||||
}
|
||||
func (obj *Connecotr) taskMain(task *reqTask, afterTime *time.Timer, responseHeaderTimeout time.Duration) (*http.Response, error, bool) {
|
||||
if obj.h2 && obj.h2Closed() { //连接不可用
|
||||
return nil, errors.New("连接不可用"), true
|
||||
}
|
||||
select {
|
||||
case <-obj.closeCtx.Done(): //连接池通知关闭,等待连接被释放掉
|
||||
return nil, obj.closeCtx.Err(), true
|
||||
default:
|
||||
}
|
||||
if obj.h2 {
|
||||
go obj.http2Req(task)
|
||||
} else {
|
||||
go obj.http1Req(task)
|
||||
}
|
||||
//等待任务完成
|
||||
if afterTime == nil {
|
||||
afterTime = time.NewTimer(responseHeaderTimeout)
|
||||
} else {
|
||||
afterTime.Reset(responseHeaderTimeout)
|
||||
}
|
||||
if !obj.isPool {
|
||||
defer afterTime.Stop()
|
||||
}
|
||||
select {
|
||||
case <-task.ctx.Done():
|
||||
if task.res != nil && task.err == nil && obj.isPool {
|
||||
<-obj.bodyCtx.Done() //等待body close
|
||||
}
|
||||
case <-obj.deleteCtx.Done(): //强制关闭
|
||||
task.err = obj.deleteCtx.Err()
|
||||
task.cnl()
|
||||
case <-afterTime.C:
|
||||
task.err = errors.New("response Header is Timeout")
|
||||
task.cnl()
|
||||
}
|
||||
return task.res, task.err, false
|
||||
}
|
||||
|
||||
func (obj *connPool) rwMain(conn *Connecotr) {
|
||||
conn.deleteCtx, conn.deleteCnl = context.WithCancel(obj.ctx)
|
||||
conn.closeCtx, conn.closeCnl = context.WithCancel(conn.deleteCtx)
|
||||
conn.WithCancel(obj.ctx)
|
||||
var afterTime *time.Timer
|
||||
defer func() {
|
||||
if afterTime != nil {
|
||||
@@ -358,46 +388,22 @@ func (obj *connPool) rwMain(conn *Connecotr) {
|
||||
delete(obj.rt.connPools, obj.key)
|
||||
}
|
||||
}()
|
||||
if !conn.h2 {
|
||||
<-conn.bodyCtx.Done() //等待连接占用被释放
|
||||
select {
|
||||
case <-conn.deleteCtx.Done(): //强制关闭所有连接
|
||||
return
|
||||
case <-conn.bodyCtx.Done(): //等待连接占用被释放
|
||||
}
|
||||
for {
|
||||
select {
|
||||
case <-conn.closeCtx.Done(): //连接池通知关闭,等待连接被释放掉
|
||||
return
|
||||
case task := <-obj.tasks: //接收到任务
|
||||
if conn.h2 && conn.h2Closed() { //连接不可用
|
||||
res, err, notice := conn.taskMain(task, afterTime, obj.rt.responseHeaderTimeout)
|
||||
if notice {
|
||||
obj.notice(task)
|
||||
return
|
||||
}
|
||||
select {
|
||||
case <-conn.closeCtx.Done(): //连接池通知关闭,等待连接被释放掉
|
||||
obj.notice(task)
|
||||
return
|
||||
default:
|
||||
}
|
||||
if conn.h2 {
|
||||
go http2Req(conn, task)
|
||||
} else {
|
||||
go http1Req(conn, task)
|
||||
}
|
||||
//等待任务完成
|
||||
if afterTime == nil {
|
||||
afterTime = time.NewTimer(obj.rt.responseHeaderTimeout)
|
||||
} else {
|
||||
afterTime.Reset(obj.rt.responseHeaderTimeout)
|
||||
}
|
||||
select {
|
||||
case <-task.ctx.Done():
|
||||
if task.req == nil || task.err != nil { //如果没有response返回,就认定这个连接异常,直接返回
|
||||
return
|
||||
}
|
||||
if !conn.h2 {
|
||||
<-conn.bodyCtx.Done() //等待body close
|
||||
}
|
||||
case <-afterTime.C:
|
||||
task.err = errors.New("response Header is Timeout")
|
||||
task.cnl()
|
||||
if res == nil || err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -405,7 +411,6 @@ func (obj *connPool) rwMain(conn *Connecotr) {
|
||||
}
|
||||
|
||||
type RoundTripper struct {
|
||||
clientSessionCache utls.ClientSessionCache
|
||||
ctx context.Context
|
||||
cnl context.CancelFunc
|
||||
connPools map[string]*connPool
|
||||
@@ -456,13 +461,14 @@ func NewRoundTripper(preCtx context.Context, option RoundTripperOption) *RoundTr
|
||||
ClientSessionCache: tls.NewLRUClientSessionCache(0),
|
||||
}
|
||||
utlsConfig := &utls.Config{
|
||||
InsecureSkipVerify: true,
|
||||
InsecureSkipTimeVerify: true,
|
||||
SessionTicketKey: [32]byte{},
|
||||
OmitEmptyPsk: true,
|
||||
InsecureSkipVerify: true,
|
||||
InsecureSkipTimeVerify: true,
|
||||
SessionTicketKey: [32]byte{},
|
||||
ClientSessionCache: utls.NewLRUClientSessionCache(0),
|
||||
OmitEmptyPsk: true,
|
||||
PreferSkipResumptionOnNilExtension: true,
|
||||
}
|
||||
return &RoundTripper{
|
||||
clientSessionCache: ja3.NewClientSessionCache(),
|
||||
tlsConfig: tlsConfig,
|
||||
utlsConfig: utlsConfig,
|
||||
ctx: ctx,
|
||||
@@ -525,42 +531,34 @@ func (obj *RoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
key := getKey(ctxData, req)
|
||||
key := getKey(ctxData, req) //pool key
|
||||
task := &reqTask{req: req, emptyPool: make(chan struct{})}
|
||||
task.ctx, task.cnl = context.WithCancel(obj.ctx)
|
||||
defer task.cnl()
|
||||
if !ctxData.disAlive {
|
||||
ok, err := obj.poolRoundTrip(task, key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if ok {
|
||||
if ctxData.responseCallBack != nil {
|
||||
if err = ctxData.responseCallBack(task.req.Context(), req, task.res); err != nil {
|
||||
task.err = err
|
||||
}
|
||||
//get pool conn
|
||||
if ok, err := obj.poolRoundTrip(task, key); err != nil {
|
||||
return nil, err
|
||||
} else if ok { //is conn multi
|
||||
if ctxData.responseCallBack != nil {
|
||||
if err = ctxData.responseCallBack(task.req.Context(), req, task.res); err != nil {
|
||||
task.err = err
|
||||
}
|
||||
return task.res, task.err
|
||||
}
|
||||
return task.res, task.err
|
||||
}
|
||||
newConn:
|
||||
addr := getAddr(req.URL)
|
||||
conn, oneSessionCache, err := obj.dial(ctxData, addr, key, req)
|
||||
conn, err := obj.dial(ctxData, addr, key, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !conn.h2 {
|
||||
go http1Req(conn, task)
|
||||
} else {
|
||||
go http2Req(conn, task)
|
||||
}
|
||||
<-task.ctx.Done()
|
||||
if oneSessionCache != nil && oneSessionCache.Session() != nil {
|
||||
obj.clientSessionCache.Put(addr, oneSessionCache.Session())
|
||||
if _, _, notice := conn.taskMain(task, nil, obj.responseHeaderTimeout); notice {
|
||||
goto newConn
|
||||
}
|
||||
if task.err == nil && task.res == nil {
|
||||
task.err = obj.ctx.Err()
|
||||
}
|
||||
if task.err == nil && task.res != nil && task.res.StatusCode != 101 && task.res.Header.Get("Content-Type") != "text/event-stream" && !ctxData.disAlive {
|
||||
if task.isPool() {
|
||||
obj.putConnPool(key, conn)
|
||||
}
|
||||
if ctxData.responseCallBack != nil {
|
||||
|
||||
Reference in New Issue
Block a user