This commit is contained in:
gospider
2025-02-26 21:58:50 +08:00
parent f99363680c
commit d7d7796b22
21 changed files with 514 additions and 572 deletions

359
body.go
View File

@@ -6,58 +6,75 @@ import (
"errors"
"fmt"
"io"
"maps"
"mime/multipart"
"net/http"
"net/textproto"
"net/url"
"reflect"
"github.com/gospider007/gson"
"github.com/gospider007/tools"
)
const (
readType = iota
mapType
)
type OrderMap struct {
data map[string]any
keys []string
}
func NewOrderMap() *OrderMap {
return &OrderMap{
data: make(map[string]any),
keys: []string{},
type OrderData struct {
data []struct {
key string
val any
}
}
func (obj *OrderMap) Set(key string, val any) {
obj.Del(key)
obj.data[key] = val
obj.keys = append(obj.keys, key)
func NewOrderData() *OrderData {
return &OrderData{
data: []struct {
key string
val any
}{},
}
}
func (obj *OrderMap) Del(key string) {
delete(obj.data, key)
obj.keys = tools.DelSliceVals(obj.keys, key)
func (obj *OrderData) Add(key string, val any) {
obj.data = append(obj.data, struct {
key string
val any
}{key: key, val: val})
}
func (obj *OrderMap) Keys() []string {
return obj.keys
func (obj *OrderData) Keys() []string {
keys := make([]string, len(obj.data))
for i, value := range obj.data {
keys[i] = value.key
}
return keys
}
func (obj *OrderMap) parseHeaders() (map[string][]string, []string) {
head := make(http.Header)
for _, kk := range obj.keys {
if vvs, ok := obj.data[kk].([]any); ok {
for _, vv := range vvs {
head.Add(kk, fmt.Sprint(vv))
}
} else {
head.Add(kk, fmt.Sprint(obj.data[kk]))
type orderT struct {
key string
val any
}
func (obj orderT) Key() string {
return obj.key
}
func (obj orderT) Val() any {
return obj.val
}
func (obj *OrderData) Data() []interface {
Key() string
Val() any
} {
if obj == nil {
return nil
}
keys := make([]interface {
Key() string
Val() any
}, len(obj.data))
for i, value := range obj.data {
keys[i] = orderT{
key: value.key,
val: value.val,
}
}
return head, obj.keys
return keys
}
func formWrite(writer *multipart.Writer, key string, val any) (err error) {
@@ -109,7 +126,10 @@ func formWrite(writer *multipart.Writer, key string, val any) (err error) {
case string:
err = writer.WriteField(key, value)
default:
con, _ := gson.Decode(val)
con, err := gson.Decode(val)
if err != nil {
return err
}
err = writer.WriteField(key, con.Raw())
if err != nil {
return err
@@ -117,65 +137,23 @@ func formWrite(writer *multipart.Writer, key string, val any) (err error) {
}
return
}
func (obj *OrderMap) parseForm(ctx context.Context) (io.Reader, string, bool, error) {
if len(obj.keys) == 0 || len(obj.data) == 0 {
return nil, "", false, nil
}
if obj.isformPip() {
pr, pw := io.Pipe()
writer := multipart.NewWriter(pw)
go func() {
stop := context.AfterFunc(ctx, func() {
pw.CloseWithError(ctx.Err())
})
defer stop()
pw.CloseWithError(obj.formWriteMain(writer))
}()
return pr, writer.FormDataContentType(), true, nil
}
body := bytes.NewBuffer(nil)
writer := multipart.NewWriter(body)
err := obj.formWriteMain(writer)
if err != nil {
return nil, writer.FormDataContentType(), false, err
}
return bytes.NewReader(body.Bytes()), writer.FormDataContentType(), false, err
}
func (obj *OrderMap) isformPip() bool {
if len(obj.keys) == 0 || len(obj.data) == 0 {
func (obj *OrderData) isformPip() bool {
if len(obj.data) == 0 {
return false
}
for _, key := range obj.keys {
if vals, ok := obj.data[key].([]any); ok {
for _, val := range vals {
if file, ok := val.(File); ok {
if _, ok := file.Content.(io.Reader); ok {
return true
}
}
}
} else {
if file, ok := obj.data[key].(File); ok {
if _, ok := file.Content.(io.Reader); ok {
return true
}
for _, value := range obj.data {
if file, ok := value.val.(File); ok {
if _, ok := file.Content.(io.Reader); ok {
return true
}
}
}
return false
}
func (obj *OrderMap) formWriteMain(writer *multipart.Writer) (err error) {
for _, key := range obj.keys {
if vals, ok := obj.data[key].([]any); ok {
for _, val := range vals {
if err = formWrite(writer, key, val); err != nil {
return
}
}
} else {
if err = formWrite(writer, key, obj.data[key]); err != nil {
return
}
func (obj *OrderData) formWriteMain(writer *multipart.Writer) (err error) {
for _, value := range obj.data {
if err = formWrite(writer, value.key, value.val); err != nil {
return
}
}
return writer.Close()
@@ -193,39 +171,19 @@ func paramsWrite(buf *bytes.Buffer, key string, val any) {
buf.WriteString(url.QueryEscape(fmt.Sprintf("%v", val)))
}
}
func (obj *OrderMap) parseParams() *bytes.Buffer {
buf := bytes.NewBuffer(nil)
for _, k := range obj.keys {
if vals, ok := obj.data[k].([]any); ok {
for _, v := range vals {
paramsWrite(buf, k, v)
}
} else {
paramsWrite(buf, k, obj.data[k])
}
}
return buf
}
func (obj *OrderMap) parseData() io.Reader {
val := obj.parseParams().Bytes()
if val == nil {
return nil
}
return bytes.NewReader(val)
}
func (obj *OrderMap) MarshalJSON() ([]byte, error) {
func (obj *OrderData) MarshalJSON() ([]byte, error) {
buf := bytes.NewBuffer(nil)
err := buf.WriteByte('{')
if err != nil {
return nil, err
}
for i, k := range obj.keys {
for i, value := range obj.data {
if i > 0 {
if err = buf.WriteByte(','); err != nil {
return nil, err
}
}
key, err := gson.Encode(k)
key, err := gson.Encode(value.key)
if err != nil {
return nil, err
}
@@ -235,7 +193,7 @@ func (obj *OrderMap) MarshalJSON() ([]byte, error) {
if err = buf.WriteByte(':'); err != nil {
return nil, err
}
val, err := gson.Encode(obj.data[k])
val, err := gson.Encode(value.val)
if err != nil {
return nil, err
}
@@ -248,107 +206,88 @@ func (obj *OrderMap) MarshalJSON() ([]byte, error) {
}
return buf.Bytes(), nil
}
func (obj *RequestOption) newBody(val any) (io.Reader, *OrderData, bool, error) {
switch value := val.(type) {
case *OrderData:
return nil, value, true, nil
case io.ReadCloser:
obj.once = true
return value, nil, true, nil
case io.Reader:
obj.once = true
return value, nil, true, nil
case string:
return bytes.NewReader(tools.StringToBytes(value)), nil, true, nil
case []byte:
return bytes.NewReader(value), nil, true, nil
case map[string]any:
orderMap := NewOrderData()
for key, val := range value {
orderMap.Add(key, val)
}
return nil, orderMap, true, nil
default:
jsonData, err := gson.Decode(val)
if err != nil {
return nil, nil, false, errors.New("invalid body type")
}
orderMap := NewOrderData()
for kk, vv := range jsonData.Map() {
orderMap.Add(kk, vv.Value())
}
return nil, orderMap, false, nil
}
}
func any2Map(val any) map[string]any {
if reflect.TypeOf(val).Kind() != reflect.Map {
func (obj *OrderData) parseParams() *bytes.Buffer {
buf := bytes.NewBuffer(nil)
for _, value := range obj.data {
paramsWrite(buf, value.key, value.val)
}
return buf
}
func (obj *OrderData) parseForm(ctx context.Context) (io.Reader, bool, error) {
if len(obj.data) == 0 {
return nil, false, nil
}
if obj.isformPip() {
pr, pw := io.Pipe()
writer := multipart.NewWriter(pw)
go func() {
stop := context.AfterFunc(ctx, func() {
pw.CloseWithError(ctx.Err())
})
defer stop()
pw.CloseWithError(obj.formWriteMain(writer))
}()
return pr, true, nil
}
body := bytes.NewBuffer(nil)
writer := multipart.NewWriter(body)
err := obj.formWriteMain(writer)
if err != nil {
return nil, false, err
}
return bytes.NewReader(body.Bytes()), false, err
}
func (obj *OrderData) parseData() io.Reader {
val := obj.parseParams().Bytes()
if val == nil {
return nil
}
mapValue := reflect.ValueOf(val)
result := make(map[string]any)
for _, key := range mapValue.MapKeys() {
valueData := mapValue.MapIndex(key).Interface()
sliceValue := reflect.ValueOf(valueData)
if sliceValue.Kind() == reflect.Slice {
valueData2 := []any{}
for i := 0; i < sliceValue.Len(); i++ {
valueData2 = append(valueData2, sliceValue.Index(i).Interface())
}
result[fmt.Sprint(key.Interface())] = valueData2
} else {
result[fmt.Sprint(key.Interface())] = valueData
}
}
return result
return bytes.NewReader(val)
}
func (obj *RequestOption) newBody(val any, valType int) (reader io.Reader, parseOrderMap *OrderMap, orderKey []string, err error) {
var isOrderMap bool
parseOrderMap, isOrderMap = val.(*OrderMap)
if isOrderMap {
if valType == readType {
readCon, err := parseOrderMap.MarshalJSON()
return bytes.NewReader(readCon), nil, nil, err
} else {
return nil, parseOrderMap, parseOrderMap.Keys(), nil
}
func (obj *OrderData) parseJson() (io.Reader, error) {
con, err := obj.MarshalJSON()
if err != nil {
return nil, err
}
if valType == readType {
switch value := val.(type) {
case io.ReadCloser:
obj.once = true
return value, nil, nil, nil
case io.Reader:
obj.once = true
return value, nil, nil, nil
case string:
return bytes.NewReader(tools.StringToBytes(value)), nil, nil, nil
case []byte:
return bytes.NewReader(value), nil, nil, nil
default:
enData, err := gson.Encode(val)
if err != nil {
return nil, nil, nil, err
}
return bytes.NewReader(enData), nil, nil, nil
}
}
if mapData := any2Map(val); mapData != nil {
val = mapData
}
mapL:
switch value := val.(type) {
case *gson.Client:
if !value.IsObject() {
return nil, nil, nil, errors.New("body-type error")
}
orderMap := NewOrderMap()
for kk, vv := range value.Map() {
if vv.IsArray() {
valData := make([]any, len(vv.Array()))
for i, v := range vv.Array() {
valData[i] = v.Value()
}
orderMap.Set(kk, valData)
} else {
orderMap.Set(kk, vv.Value())
}
}
return nil, orderMap, nil, nil
case *OrderMap:
if mapData := any2Map(value.data); mapData != nil {
value.data = mapData
}
return nil, value, nil, nil
case map[string]any:
orderMap := NewOrderMap()
orderMap.data = value
orderMap.keys = make([]string, len(value))
i := 0
for key := range maps.Keys(value) {
orderMap.keys[i] = key
i++
}
return nil, orderMap, nil, nil
}
if val, err = gson.Decode(val); err != nil {
switch value := val.(type) {
case string:
return bytes.NewReader(tools.StringToBytes(value)), nil, nil, nil
case []byte:
return bytes.NewReader(value), nil, nil, nil
default:
return nil, nil, nil, err
}
}
goto mapL
return bytes.NewReader(con), nil
}
func (obj *OrderData) parseText() (io.Reader, error) {
con, err := obj.MarshalJSON()
if err != nil {
return nil, err
}
return bytes.NewReader(con), nil
}

View File

@@ -63,20 +63,14 @@ func NewClient(preCtx context.Context, options ...ClientOption) (*Client, error)
return result, err
}
// Close idle connections. If the connection is in use, wait until it ends before closing
func (obj *Client) CloseConns() {
obj.transport.closeConns()
}
// Close the connection, even if it is in use, it will be closed
func (obj *Client) ForceCloseConns() {
obj.transport.forceCloseConns()
}
// Close the client and cannot be used again after shutdown
func (obj *Client) Close() {
obj.closed = true
obj.ForceCloseConns()
obj.CloseConns()
obj.cnl()
}
func (obj *Client) do(ctx *Response) (err error) {

57
conn.go
View File

@@ -7,12 +7,10 @@ import (
"iter"
"net"
"net/http"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/gospider007/ja3"
"github.com/gospider007/tools"
)
@@ -20,26 +18,24 @@ var maxRetryCount = 10
type Conn interface {
CloseWithError(err error) error
DoRequest(*http.Request, []string) (*http.Response, context.Context, error)
DoRequest(*http.Request, []interface {
Key() string
Val() any
}) (*http.Response, context.Context, error)
CloseCtx() context.Context
Stream() io.ReadWriteCloser
}
type connecotr struct {
parentForceCtx context.Context //parent force close
forceCtx context.Context //force close
forceCnl context.CancelCauseFunc
safeCtx context.Context //safe close
safeCnl context.CancelCauseFunc
Conn Conn
forceCtx context.Context //force close
forceCnl context.CancelCauseFunc
Conn Conn
c net.Conn
proxys []Address
}
func (obj *connecotr) withCancel(forceCtx context.Context, safeCtx context.Context) {
obj.parentForceCtx = forceCtx
func (obj *connecotr) withCancel(forceCtx context.Context) {
obj.forceCtx, obj.forceCnl = context.WithCancelCause(forceCtx)
obj.safeCtx, obj.safeCnl = context.WithCancelCause(safeCtx)
}
func (obj *connecotr) Close() error {
return obj.CloseWithError(errors.New("connecotr Close close"))
@@ -58,18 +54,10 @@ func (obj *connecotr) wrapBody(task *reqTask) {
body.conn = obj
task.reqCtx.response.Body = body
}
func (obj *connecotr) httpReq(task *reqTask, done chan struct{}) {
defer close(done)
if task.reqCtx.option.OrderHeaders == nil {
task.reqCtx.option.OrderHeaders = ja3.DefaultOrderHeaders()
} else {
orderHeaders := make([]string, len(task.reqCtx.option.OrderHeaders))
for i, v := range task.reqCtx.option.OrderHeaders {
orderHeaders[i] = strings.ToLower(v)
}
task.reqCtx.option.OrderHeaders = orderHeaders
}
task.reqCtx.response, task.bodyCtx, task.err = obj.Conn.DoRequest(task.reqCtx.request, task.reqCtx.option.OrderHeaders)
task.reqCtx.response, task.bodyCtx, task.err = obj.Conn.DoRequest(task.reqCtx.request, task.reqCtx.option.orderHeaders.Data())
if task.reqCtx.response != nil {
obj.wrapBody(task)
}
@@ -92,13 +80,11 @@ func (obj *connecotr) taskMain(task *reqTask) {
if errors.Is(task.err, errLastTaskRuning) {
task.isNotice = true
}
obj.CloseWithError(errors.New("taskMain close with error"))
obj.CloseWithError(tools.WrapError(task.err, errors.New("taskMain close with error")))
}
task.cnl()
if task.err == nil && task.reqCtx.response != nil && task.reqCtx.response.Body != nil {
select {
case <-obj.safeCtx.Done():
task.err = context.Cause(obj.safeCtx)
case <-obj.forceCtx.Done():
task.err = context.Cause(obj.forceCtx)
case <-task.bodyCtx.Done():
@@ -109,11 +95,6 @@ func (obj *connecotr) taskMain(task *reqTask) {
}
}()
select {
case <-obj.safeCtx.Done():
task.err = context.Cause(obj.safeCtx)
task.enableRetry = true
task.isNotice = true
return
case <-obj.forceCtx.Done(): //force conn close
task.err = context.Cause(obj.forceCtx)
task.enableRetry = true
@@ -153,9 +134,7 @@ func (obj *connecotr) taskMain(task *reqTask) {
type connPool struct {
forceCtx context.Context
safeCtx context.Context
forceCnl context.CancelCauseFunc
safeCnl context.CancelCauseFunc
tasks chan *reqTask
connPools *connPools
connKey string
@@ -190,19 +169,17 @@ func (obj *connPools) Range() iter.Seq2[string, *connPool] {
}
func (obj *connPool) rwMain(done chan struct{}, conn *connecotr) {
conn.withCancel(obj.forceCtx, obj.safeCtx)
conn.withCancel(obj.forceCtx)
defer func() {
conn.CloseWithError(errors.New("connPool rwMain close"))
obj.total.Add(-1)
if obj.total.Load() <= 0 {
obj.safeClose()
obj.close(errors.New("conn pool close"))
}
}()
close(done)
for {
select {
case <-conn.safeCtx.Done(): //safe close conn
return
case <-conn.forceCtx.Done(): //force close conn
return
case <-conn.Conn.CloseCtx().Done():
@@ -218,11 +195,7 @@ func (obj *connPool) rwMain(done chan struct{}, conn *connecotr) {
}
}
}
func (obj *connPool) forceClose() {
obj.safeClose()
obj.forceCnl(errors.New("connPool forceClose"))
}
func (obj *connPool) safeClose() {
func (obj *connPool) close(err error) {
obj.connPools.del(obj.connKey)
obj.safeCnl(errors.New("connPool close"))
obj.forceCnl(tools.WrapError(err, errors.New("connPool close")))
}

20
dial.go
View File

@@ -291,25 +291,21 @@ func (obj *Dialer) verifyUDPSocks5(ctx context.Context, conn net.Conn, proxyAddr
remoteAddr.NetWork = "udp"
proxyAddress, err := obj.verifySocks5(conn, "udp", proxyAddr, remoteAddr)
if err != nil {
return nil, err
return
}
var listener net.ListenConfig
wrapConn, err = listener.ListenPacket(ctx, "udp", ":0")
if err != nil {
return nil, err
return
}
wrapConn = NewUDPConn(wrapConn, &net.UDPAddr{IP: proxyAddress.IP, Port: proxyAddress.Port})
var cnl context.CancelFunc
udpCtx, cnl := context.WithCancel(context.TODO())
wrapConn = NewUDPConn(udpCtx, wrapConn, &net.UDPAddr{IP: proxyAddress.IP, Port: proxyAddress.Port})
go func() {
var buf [1]byte
for {
_, err := conn.Read(buf[:])
if err != nil {
wrapConn.Close()
break
}
}
io.Copy(io.Discard, conn)
cnl()
}()
return wrapConn, nil
return
}
func (obj *Dialer) writeCmd(conn net.Conn, network string) (err error) {
var cmd byte

1
go.mod
View File

@@ -18,6 +18,7 @@ require (
github.com/refraction-networking/uquic v0.0.6
github.com/refraction-networking/utls v1.6.7
golang.org/x/net v0.35.0
gopkg.in/errgo.v2 v2.1.0
)
require (

6
go.sum
View File

@@ -147,9 +147,12 @@ github.com/klauspost/cpuid/v2 v2.2.9 h1:66ze0taIn2H33fBvCkXuv9BmCwDfafmiIVpKV9kK
github.com/klauspost/cpuid/v2 v2.2.9/go.mod h1:rqkxqrZ1EhYM9G+hXH7YdowN5R5RGN6NK4QwQ3WMXF8=
github.com/klauspost/pgzip v1.2.6 h1:8RXeL5crjEUFnR2/Sn6GJNWtSQ3Dk8pq4CL3jvdDyjU=
github.com/klauspost/pgzip v1.2.6/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/libdns/libdns v0.2.3 h1:ba30K4ObwMGB/QTmqUxf3H4/GmUrCAIkMWejeGl12v8=
github.com/libdns/libdns v0.2.3/go.mod h1:4Bj9+5CQiNMVGf87wjX4CY3HQJypUHRuLvlsfsZqLWQ=
github.com/mholt/acmez/v3 v3.0.1 h1:4PcjKjaySlgXK857aTfDuRbmnM5gb3Ruz3tvoSJAUp8=
@@ -451,6 +454,9 @@ google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwl
google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/errgo.v2 v2.1.0 h1:0vLT13EuvQ0hNvakwLuFZ/jYrLp5F3kcWHXdRggjCE8=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@@ -1,14 +1,17 @@
package requests
import (
"fmt"
"net/http"
"github.com/gospider007/tools"
"gopkg.in/errgo.v2/fmt/errors"
)
func defaultHeaders() http.Header {
return http.Header{
"User-Agent": []string{tools.UserAgent},
"Connection": []string{"keep-alive"},
"Accept": []string{"text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7"},
"Accept-Encoding": []string{"gzip, deflate, br, zstd"},
"Accept-Language": []string{tools.AcceptLanguage},
@@ -18,26 +21,29 @@ func defaultHeaders() http.Header {
}
}
func (obj *RequestOption) initHeaders() (http.Header, error) {
func (obj *RequestOption) initOrderHeaders() (http.Header, error) {
if obj.Headers == nil {
return nil, nil
return defaultHeaders(), nil
}
switch headers := obj.Headers.(type) {
case http.Header:
return headers.Clone(), nil
case *OrderMap:
head, order := headers.parseHeaders()
obj.OrderHeaders = order
return head, nil
return headers, nil
case *OrderData:
obj.orderHeaders = headers
return make(http.Header), nil
case map[string]any:
results := make(http.Header)
for key, val := range headers {
results.Add(key, fmt.Sprintf("%v", val))
}
return results, nil
case map[string]string:
results := make(http.Header)
for key, val := range headers {
results.Add(key, val)
}
return results, nil
default:
_, dataMap, _, err := obj.newBody(headers, mapType)
if err != nil {
return nil, err
}
if dataMap == nil {
return nil, nil
}
head, _ := dataMap.parseHeaders()
return head, err
return nil, errors.New("headers type error")
}
}

143
http.go
View File

@@ -8,8 +8,7 @@ import (
"io"
"net"
"net/http"
"slices"
"sort"
"strings"
"time"
"github.com/gospider007/tools"
@@ -19,8 +18,11 @@ import (
type httpTask struct {
req *http.Request
res *http.Response
orderHeaders []string
err error
orderHeaders []interface {
Key() string
Val() any
}
err error
ctx context.Context
cnl context.CancelFunc
@@ -206,7 +208,9 @@ func (obj *conn2) keepSendClose() {
var errLastTaskRuning = errors.New("last task is running")
func (obj *conn2) run() (err error) {
defer obj.CloseWithError(err)
defer func() {
obj.CloseWithError(err)
}()
for {
select {
case <-obj.ctx.Done():
@@ -217,12 +221,14 @@ func (obj *conn2) run() (err error) {
obj.keepSendDisable()
go obj.httpWrite(task, task.req.Header.Clone())
task.res, task.err = http.ReadResponse(obj.r, nil)
if task.res != nil && task.res.Body != nil {
if task.res != nil && task.res.Body != nil && task.err == nil {
rawBody := task.res.Body
pr, pw := io.Pipe()
go func() {
var readErr error
defer task.readCnl(readErr)
defer func() {
task.readCnl(readErr)
}()
_, readErr = io.Copy(pw, rawBody)
pw.CloseWithError(readErr)
if readErr != nil && readErr != io.EOF && readErr != io.ErrUnexpectedEOF {
@@ -235,6 +241,15 @@ func (obj *conn2) run() (err error) {
} else {
select {
case <-task.writeCtx.Done():
if task.res.StatusCode == 101 || strings.Contains(task.res.Header.Get("Content-Type"), "text/event-stream") {
obj.keepSendClose()
select {
case <-obj.ctx.Done():
return
case <-obj.closeCtx.Done():
return
}
}
default:
readErr = tools.WrapError(errLastTaskRuning, errors.New("last task not write done with read done"))
task.err = readErr
@@ -274,7 +289,10 @@ func (obj *conn2) CloseWithError(err error) error {
}
return obj.conn.Close()
}
func (obj *conn2) DoRequest(req *http.Request, orderHeaders []string) (*http.Response, context.Context, error) {
func (obj *conn2) DoRequest(req *http.Request, orderHeaders []interface {
Key() string
Val() any
}) (*http.Response, context.Context, error) {
readCtx, readCnl := context.WithCancelCause(obj.closeCtx)
writeCtx, writeCnl := context.WithCancel(obj.closeCtx)
ctx, cnl := context.WithCancel(req.Context())
@@ -296,6 +314,8 @@ func (obj *conn2) DoRequest(req *http.Request, orderHeaders []string) (*http.Res
case <-obj.closeCtx.Done():
return nil, task.readCtx, obj.closeCtx.Err()
case obj.tasks <- task:
default:
return nil, nil, errLastTaskRuning
}
select {
case <-obj.ctx.Done():
@@ -309,18 +329,40 @@ func (obj *conn2) DoRequest(req *http.Request, orderHeaders []string) (*http.Res
}
return task.res, task.readCtx, task.err
}
type websocketConn struct {
r io.Reader
w io.WriteCloser
cnl context.CancelCauseFunc
}
func (obj *websocketConn) Read(p []byte) (n int, err error) {
return obj.r.Read(p)
}
func (obj *websocketConn) Write(p []byte) (n int, err error) {
return obj.w.Write(p)
}
func (obj *websocketConn) Close() error {
obj.cnl(nil)
return obj.w.Close()
}
func (obj *conn2) Stream() io.ReadWriteCloser {
obj.keepSendClose()
return obj.conn
return &websocketConn{
cnl: obj.closeCnl,
r: obj.r,
w: obj.conn,
}
// return obj.conn
}
func (obj *conn2) httpWrite(task *httpTask, rawHeaders http.Header) {
defer task.writeCnl()
defer func() {
if task.err != nil {
obj.CloseWithError(tools.WrapError(task.err, "failed to send request body"))
}
task.writeCnl()
}()
host := task.req.Host
if host == "" {
host = task.req.URL.Host
@@ -330,39 +372,15 @@ func (obj *conn2) httpWrite(task *httpTask, rawHeaders http.Header) {
return
}
host = removeZone(host)
if rawHeaders.Get("Host") == "" {
rawHeaders.Set("Host", host)
}
if rawHeaders.Get("Connection") == "" {
rawHeaders.Set("Connection", "keep-alive")
contentL, chunked := tools.GetContentLength(task.req)
if contentL >= 0 {
rawHeaders.Set("Content-Length", fmt.Sprint(contentL))
} else if chunked {
rawHeaders.Set("Transfer-Encoding", "chunked")
}
if rawHeaders.Get("User-Agent") == "" {
rawHeaders.Set("User-Agent", tools.UserAgent)
}
if rawHeaders.Get("Content-Length") == "" && task.req.ContentLength != 0 && shouldSendContentLength(task.req) {
rawHeaders.Set("Content-Length", fmt.Sprint(task.req.ContentLength))
}
writeHeaders := [][2]string{}
for k, vs := range rawHeaders {
for _, v := range vs {
writeHeaders = append(writeHeaders, [2]string{k, v})
}
}
sort.Slice(writeHeaders, func(x, y int) bool {
xI := slices.Index(task.orderHeaders, writeHeaders[x][0])
yI := slices.Index(task.orderHeaders, writeHeaders[y][0])
if xI < 0 {
return false
}
if yI < 0 {
return true
}
if xI <= yI {
return true
}
return false
})
ruri := task.req.URL.RequestURI()
if task.req.Method == "CONNECT" && task.req.URL.Path == "" {
if task.req.URL.Opaque != "" {
@@ -374,7 +392,7 @@ func (obj *conn2) httpWrite(task *httpTask, rawHeaders http.Header) {
if _, task.err = obj.w.WriteString(fmt.Sprintf("%s %s %s\r\n", task.req.Method, ruri, task.req.Proto)); task.err != nil {
return
}
for _, kv := range writeHeaders {
for _, kv := range tools.NewHeadersWithH1(task.orderHeaders, rawHeaders) {
if _, task.err = obj.w.WriteString(fmt.Sprintf("%s: %s\r\n", kv[0], kv[1])); task.err != nil {
return
}
@@ -386,8 +404,47 @@ func (obj *conn2) httpWrite(task *httpTask, rawHeaders http.Header) {
task.err = obj.w.Flush()
return
}
if _, task.err = io.Copy(obj.w, task.req.Body); task.err != nil {
return
if chunked {
chunkedWriter := newChunkedWriter(obj.w)
if _, task.err = io.Copy(chunkedWriter, task.req.Body); task.err != nil {
return
}
if task.err = chunkedWriter.Close(); task.err != nil {
return
}
} else {
if _, task.err = io.Copy(obj.w, task.req.Body); task.err != nil {
return
}
}
task.err = obj.w.Flush()
}
func newChunkedWriter(w *bufio.Writer) io.WriteCloser {
return &chunkedWriter{w}
}
type chunkedWriter struct {
w *bufio.Writer
}
func (cw *chunkedWriter) Write(data []byte) (n int, err error) {
if len(data) == 0 {
return 0, nil
}
if _, err = fmt.Fprintf(cw.w, "%x\r\n", len(data)); err != nil {
return 0, err
}
if _, err = cw.w.Write(data); err != nil {
return
}
if _, err = io.WriteString(cw.w, "\r\n"); err != nil {
return
}
return len(data), cw.w.Flush()
}
func (cw *chunkedWriter) Close() error {
_, err := io.WriteString(cw.w, "0\r\n\r\n")
return err
}

141
option.go
View File

@@ -1,10 +1,15 @@
package requests
import (
"bytes"
"context"
"crypto/rand"
"crypto/tls"
"errors"
"fmt"
"io"
"net/url"
"strings"
"time"
"github.com/gospider007/ja3"
@@ -56,7 +61,6 @@ type ClientOption struct {
USpec any //support ja3.USpec,uquic.QUICID,bool
Proxy string //proxy,support https,http,socks5
UserAgent string //headers User-Agent value
OrderHeaders []string //order headers
Proxys []string //proxy list,support https,http,socks5
HSpec ja3.HSpec //h2 fingerprint
MaxRetries int //try num
@@ -88,9 +92,10 @@ type RequestOption struct {
Referer string //set headers referer value
ContentType string //headers Content-Type value
ClientOption
Stream bool //disable auto read
DisProxy bool //force disable proxy
once bool
Stream bool //disable auto read
DisProxy bool //force disable proxy
once bool
orderHeaders *OrderData //order headers
}
// Upload files with form-data,
@@ -100,77 +105,89 @@ type File struct {
ContentType string
}
func randomBoundary() string {
var buf [30]byte
io.ReadFull(rand.Reader, buf[:])
boundary := fmt.Sprintf("%x", buf[:])
if strings.ContainsAny(boundary, `()<>@,;:\"/[]?= `) {
boundary = `"` + boundary + `"`
}
return "multipart/form-data; boundary=" + boundary
}
func (obj *RequestOption) initBody(ctx context.Context) (io.Reader, error) {
if obj.Body != nil {
body, _, _, err := obj.newBody(obj.Body, readType)
if err != nil || body == nil {
return nil, err
}
return body, err
} else if obj.Form != nil {
var orderMap *OrderMap
_, orderMap, _, err := obj.newBody(obj.Form, mapType)
body, orderData, _, err := obj.newBody(obj.Body)
if err != nil {
return nil, err
}
if orderMap == nil {
return nil, nil
}
body, contentType, once, err := orderMap.parseForm(ctx)
if err != nil {
return nil, err
}
obj.once = once
if obj.ContentType == "" {
obj.ContentType = contentType
}
if body == nil {
return nil, nil
}
return body, nil
} else if obj.Data != nil {
body, orderMap, _, err := obj.newBody(obj.Data, mapType)
if err != nil {
return body, err
}
if obj.ContentType == "" {
obj.ContentType = "application/x-www-form-urlencoded"
}
if body != nil {
return body, nil
}
if orderMap == nil {
return nil, nil
}
body2 := orderMap.parseData()
if body2 == nil {
return nil, nil
}
return body2, nil
} else if obj.Json != nil {
body, _, _, err := obj.newBody(obj.Json, readType)
con, err := orderData.MarshalJSON()
if err != nil {
return nil, err
}
return bytes.NewReader(con), nil
} else if obj.Form != nil {
if obj.ContentType == "" {
obj.ContentType = randomBoundary()
}
body, orderData, ok, err := obj.newBody(obj.Body)
if err != nil {
return nil, err
}
if !ok {
return nil, errors.New("not support type")
}
if body != nil {
return body, nil
}
body, once, err := orderData.parseForm(ctx)
if err != nil {
return nil, err
}
obj.once = once
return body, err
} else if obj.Data != nil {
if obj.ContentType == "" {
obj.ContentType = "application/x-www-form-urlencoded"
}
body, orderData, ok, err := obj.newBody(obj.Body)
if err != nil {
return nil, err
}
if !ok {
return nil, errors.New("not support type")
}
if body != nil {
return body, nil
}
return orderData.parseData(), nil
} else if obj.Json != nil {
if obj.ContentType == "" {
obj.ContentType = "application/json"
}
if body == nil {
return nil, nil
}
return body, nil
} else if obj.Text != nil {
body, _, _, err := obj.newBody(obj.Text, readType)
body, orderData, _, err := obj.newBody(obj.Json)
if err != nil {
return nil, err
}
if body != nil {
return body, nil
}
return orderData.parseJson()
} else if obj.Text != nil {
if obj.ContentType == "" {
obj.ContentType = "text/plain"
}
if body == nil {
return nil, nil
body, orderData, _, err := obj.newBody(obj.Text)
if err != nil {
return nil, err
}
return body, nil
if body != nil {
return body, nil
}
return orderData.parseText()
} else {
return nil, nil
}
@@ -180,11 +197,23 @@ func (obj *RequestOption) initParams() (*url.URL, error) {
if obj.Params == nil {
return baseUrl, nil
}
_, dataMap, _, err := obj.newBody(obj.Params, mapType)
body, dataData, ok, err := obj.newBody(obj.Params)
if err != nil {
return nil, err
}
query := dataMap.parseParams().String()
if !ok {
return nil, errors.New("not support type")
}
var query string
if body != nil {
paramsBytes, err := io.ReadAll(body)
if err != nil {
return nil, err
}
query = tools.BytesToString(paramsBytes)
} else {
query = dataData.parseParams().String()
}
if query == "" {
return baseUrl, nil
}

View File

@@ -166,7 +166,7 @@ func (obj *Client) Request(ctx context.Context, method string, href string, opti
func (obj *Client) request(ctx *Response) (err error) {
defer func() {
//read body
if err == nil && !ctx.IsWebSocket() && !ctx.IsSSE() && !ctx.IsStream() {
if err == nil && !ctx.IsSSE() && !ctx.IsStream() {
err = ctx.ReadBody()
}
//result callback
@@ -175,7 +175,7 @@ func (obj *Client) request(ctx *Response) (err error) {
err = ctx.option.ResultCallBack(ctx)
}
if err != nil { //err callback, must close body
ctx.CloseBody()
ctx.CloseConn()
if ctx.option.ErrCallBack != nil {
ctx.err = err
if err2 := ctx.option.ErrCallBack(ctx); err2 != nil {
@@ -189,14 +189,6 @@ func (obj *Client) request(ctx *Response) (err error) {
return
}
}
//init headers and orderheaders,befor init ctxData
headers, err := ctx.option.initHeaders()
if err != nil {
return tools.WrapError(err, errors.New("tempRequest init headers error"), err)
}
if headers != nil && ctx.option.UserAgent != "" {
headers.Set("User-Agent", ctx.option.UserAgent)
}
//init tls timeout
if ctx.option.TlsHandshakeTimeout == 0 {
ctx.option.TlsHandshakeTimeout = time.Second * 15
@@ -219,16 +211,13 @@ func (obj *Client) request(ctx *Response) (err error) {
ctx.proxys[i] = tempProxy
}
}
//init headers
if headers == nil {
headers = defaultHeaders()
}
//init ctx,cnl
if ctx.option.Timeout > 0 { //超时
ctx.ctx, ctx.cnl = context.WithTimeout(ctx.Context(), ctx.option.Timeout)
} else {
ctx.ctx, ctx.cnl = context.WithCancel(ctx.Context())
}
var isWebsocket bool
//init Scheme
switch ctx.option.Url.Scheme {
case "file":
@@ -241,11 +230,11 @@ func (obj *Client) request(ctx *Response) (err error) {
case "ws":
ctx.option.ForceHttp1 = true
ctx.option.Url.Scheme = "http"
websocket.SetClientHeadersWithOption(headers, ctx.option.WsOption)
isWebsocket = true
case "wss":
ctx.option.ForceHttp1 = true
ctx.option.Url.Scheme = "https"
websocket.SetClientHeadersWithOption(headers, ctx.option.WsOption)
isWebsocket = true
}
//init url
href, err := ctx.option.initParams()
@@ -253,9 +242,7 @@ func (obj *Client) request(ctx *Response) (err error) {
err = tools.WrapError(err, "url init error")
return
}
if href.User != nil {
headers.Set("Authorization", "Basic "+tools.Base64Encode(href.User.String()))
}
//init body
body, err := ctx.option.initBody(ctx.ctx)
if err != nil {
@@ -266,27 +253,29 @@ func (obj *Client) request(ctx *Response) (err error) {
if err != nil {
return tools.WrapError(errFatal, errors.New("tempRequest 构造request失败"), err)
}
reqs.Header = headers
//add Referer
if reqs.Header.Get("Referer") == "" && ctx.option.Referer != "" {
//init headers
if reqs.Header, err = ctx.option.initOrderHeaders(); err != nil {
return tools.WrapError(err, errors.New("tempRequest init headers error"), err)
}
if isWebsocket && reqs.Header.Get("Sec-WebSocket-Key") == "" {
websocket.SetClientHeadersWithOption(reqs.Header, ctx.option.WsOption)
}
if href.User != nil && reqs.Header.Get("Authorization") == "" {
reqs.Header.Set("Authorization", "Basic "+tools.Base64Encode(href.User.String()))
}
if ctx.option.UserAgent != "" && reqs.Header.Get("User-Agent") == "" {
reqs.Header.Set("User-Agent", ctx.option.UserAgent)
}
if ctx.option.Referer != "" && reqs.Header.Get("Referer") == "" {
reqs.Header.Set("Referer", ctx.option.Referer)
}
//set ContentType
if ctx.option.ContentType != "" && reqs.Header.Get("Content-Type") == "" {
reqs.Header.Set("Content-Type", ctx.option.ContentType)
}
//add host
if ctx.option.Host != "" {
reqs.Host = ctx.option.Host
} else if reqs.Header.Get("Host") != "" {
reqs.Host = reqs.Header.Get("Host")
} else {
reqs.Host = reqs.URL.Host
if ctx.option.Host != "" && reqs.Header.Get("Host") == "" {
reqs.Header.Set("Host", ctx.option.Host)
}
//init headers ok
//init cookies
cookies, err := ctx.option.initCookies()
if err != nil {
@@ -309,10 +298,7 @@ func (obj *Client) request(ctx *Response) (err error) {
if ctx.Body() != nil {
ctx.rawConn = ctx.Body().(*readWriteCloser)
}
if ctx.response.StatusCode == 101 {
ctx.Body()
ctx.webSocket = websocket.NewClientConn(newFakeConn(ctx.rawConn.connStream()), websocket.GetResponseHeaderOption(ctx.response.Header))
} else if strings.Contains(ctx.response.Header.Get("Content-Type"), "text/event-stream") {
if strings.Contains(ctx.response.Header.Get("Content-Type"), "text/event-stream") {
ctx.sse = newSSE(ctx)
} else if encoding := ctx.ContentEncoding(); encoding != "" {
var unCompressionBody io.ReadCloser

View File

@@ -146,7 +146,7 @@ func (obj *SSE) Range() iter.Seq2[Event, error] {
// close SSE
func (obj *SSE) Close() {
obj.response.ForceCloseConn()
obj.response.CloseConn()
}
// return websocket client
@@ -320,19 +320,21 @@ func (obj *Response) IsSSE() bool {
// read body
func (obj *Response) ReadBody() (err error) {
if obj.IsWebSocket() && obj.IsSSE() {
return errors.New("can not read stream")
}
obj.readBodyLock.Lock()
defer func() {
if err != nil {
obj.ForceCloseConn()
}
obj.readBodyLock.Unlock()
}()
if obj.readBody {
return nil
}
defer func() {
if err != nil {
obj.CloseConn()
} else {
obj.Close()
if obj.response.StatusCode == 101 && obj.webSocket == nil {
obj.webSocket = websocket.NewClientConn(newFakeConn(obj.rawConn.connStream()), websocket.GetResponseHeaderOption(obj.response.Header))
}
}
obj.readBodyLock.Unlock()
}()
obj.readBody = true
bBody := bytes.NewBuffer(nil)
done := make(chan struct{})
@@ -364,7 +366,6 @@ func (obj *Response) ReadBody() (err error) {
} else {
obj.content = bBody.Bytes()
}
obj.CloseBody()
return
}
@@ -381,39 +382,28 @@ func (obj *Response) Proxys() []Address {
return nil
}
// close body
func (obj *Response) CloseBody() {
obj.close(false)
}
// safe close conn
func (obj *Response) CloseConn() {
obj.close(true)
}
// close
func (obj *Response) close(closeConn bool) {
func (obj *Response) CloseConn() {
if obj.webSocket != nil {
obj.webSocket.Close()
}
if obj.sse != nil {
obj.sse.Close()
}
if obj.IsWebSocket() || obj.IsSSE() || !obj.readBody {
obj.ForceCloseConn()
} else if obj.rawConn != nil {
if closeConn {
obj.rawConn.CloseConn()
} else {
obj.rawConn.Close()
}
if obj.rawConn != nil {
obj.rawConn.CloseConn()
}
obj.cnl() //must later
}
// force close conn
func (obj *Response) ForceCloseConn() {
// close
func (obj *Response) Close() {
if obj.webSocket != nil {
obj.webSocket.Close()
}
if obj.sse != nil {
obj.sse.Close()
}
if obj.rawConn != nil {
obj.rawConn.ForceCloseConn()
obj.rawConn.Close()
}
}

View File

@@ -82,7 +82,6 @@ func (obj *roundTripper) newConnPool(done chan struct{}, conn *connecotr, task *
pool := new(connPool)
pool.connKey = task.key
pool.forceCtx, pool.forceCnl = context.WithCancelCause(obj.ctx)
pool.safeCtx, pool.safeCnl = context.WithCancelCause(pool.forceCtx)
pool.tasks = make(chan *reqTask)
pool.connPools = obj.connPools
@@ -103,7 +102,7 @@ func (obj *roundTripper) putConnPool(task *reqTask, conn *connecotr) {
}
func (obj *roundTripper) newConnecotr() *connecotr {
conne := new(connecotr)
conne.withCancel(obj.ctx, obj.ctx)
conne.withCancel(obj.ctx)
return conne
}
@@ -137,9 +136,18 @@ func (obj *roundTripper) ghttp3Dial(ctx *Response, remoteAddress Address, proxyA
if ctx.option.UquicConfig != nil {
quicConfig = ctx.option.QuicConfig.Clone()
}
var udpCtx context.Context
if ct, ok := udpConn.(interface {
Context() context.Context
}); ok {
udpCtx = ct.Context()
}
netConn, err := quic.DialEarly(ctx.Context(), udpConn, &net.UDPAddr{IP: remoteAddress.IP, Port: remoteAddress.Port}, tlsConfig, quicConfig)
if err != nil {
return nil, err
}
conn = obj.newConnecotr()
conn.Conn = http3.NewClient(netConn, func() {
conn.Conn, err = http3.NewClient(udpCtx, netConn, func() {
conn.forceCnl(errors.New("http3 client close"))
})
return
@@ -163,14 +171,24 @@ func (obj *roundTripper) uhttp3Dial(ctx *Response, spec uquic.QUICSpec, remoteAd
if ctx.option.UquicConfig != nil {
quicConfig = ctx.option.UquicConfig.Clone()
}
var udpCtx context.Context
if ct, ok := udpConn.(interface {
Context() context.Context
}); ok {
udpCtx = ct.Context()
}
netConn, err := (&uquic.UTransport{
Transport: &uquic.Transport{
Conn: udpConn,
},
QUICSpec: &spec,
}).DialEarly(ctx.Context(), &net.UDPAddr{IP: remoteAddress.IP, Port: remoteAddress.Port}, tlsConfig, quicConfig)
if err != nil {
return nil, err
}
conn = obj.newConnecotr()
conn.Conn = http3.NewUClient(netConn, func() {
conn.Conn, err = http3.NewClient(udpCtx, netConn, func() {
conn.forceCnl(errors.New("http3 client close"))
})
return
@@ -243,13 +261,13 @@ func (obj *roundTripper) dial(ctx *Response) (conn *connecotr, err error) {
}
func (obj *roundTripper) dialConnecotr(ctx *Response, conne *connecotr, h2 bool) (err error) {
if h2 {
if conne.Conn, err = http2.NewClientConn(ctx.Context(), conne.c, ctx.option.HSpec, func() {
conne.forceCnl(errors.New("http2 client close"))
if conne.Conn, err = http2.NewClientConn(ctx.Context(), conne.c, ctx.option.HSpec, func(err error) {
conne.forceCnl(tools.WrapError(err, "http2 client close"))
}); err != nil {
return err
}
} else {
conne.Conn = newConn2(conne.safeCtx, conne.c, func(err error) {
conne.Conn = newConn2(conne.forceCtx, conne.c, func(err error) {
conne.forceCnl(tools.WrapError(err, "http1 client close"))
})
// conne.Conn = newRoudTrip(conne.forceCtx, conne.c, func(err error) {
@@ -375,16 +393,11 @@ func (obj *roundTripper) newRoudTrip(task *reqTask) {
func (obj *roundTripper) closeConns() {
for key, pool := range obj.connPools.Range() {
pool.safeClose()
obj.connPools.del(key)
}
}
func (obj *roundTripper) forceCloseConns() {
for key, pool := range obj.connPools.Range() {
pool.forceClose()
pool.close(errors.New("close all conn"))
obj.connPools.del(key)
}
}
func (obj *roundTripper) newReqTask(ctx *Response) *reqTask {
if ctx.option.ResponseHeaderTimeout == 0 {
ctx.option.ResponseHeaderTimeout = time.Second * 300

43
rw.go
View File

@@ -4,63 +4,38 @@ import (
"context"
"errors"
"io"
"sync/atomic"
"github.com/gospider007/tools"
)
type readWriteCloser struct {
body io.ReadCloser
err error
conn *connecotr
isClosed atomic.Bool
body io.ReadCloser
conn *connecotr
}
func (obj *readWriteCloser) connStream() io.ReadWriteCloser {
return obj.conn.Conn.Stream()
}
func (obj *readWriteCloser) Read(p []byte) (n int, err error) {
if obj.isClosed.Load() {
return 0, obj.err
}
i, err := obj.body.Read(p)
if err != nil {
obj.err = err
if err == io.EOF {
obj.Close()
}
}
return i, err
return obj.body.Read(p)
}
func (obj *readWriteCloser) Proxys() []Address {
return obj.conn.proxys
}
func (obj *readWriteCloser) Close() (err error) {
return obj.CloseWithError(nil)
}
func (obj *readWriteCloser) ConnCloseCtx() context.Context {
return obj.conn.Conn.CloseCtx()
}
func (obj *readWriteCloser) CloseWithError(err error) error {
if err == nil {
obj.err = io.EOF
} else {
err = tools.WrapError(obj.err, err)
obj.err = err
if err != nil {
obj.conn.CloseWithError(err)
}
obj.isClosed.Store(true)
// obj.conn.bodyCnl(err)
return obj.body.Close() //reuse conn
}
func (obj *readWriteCloser) Close() error {
return obj.CloseWithError(nil)
}
// safe close conn
func (obj *readWriteCloser) CloseConn() {
// obj.conn.bodyCnl(errors.New("readWriterCloser close conn"))
obj.conn.safeCnl(errors.New("readWriterCloser close conn"))
}
// force close conn
func (obj *readWriteCloser) ForceCloseConn() {
obj.conn.forceCnl(errors.New("readWriterCloser close conn"))
obj.conn.CloseWithError(errConnectionForceClosed)
}

View File

@@ -2,6 +2,7 @@ package requests
import (
"bytes"
"context"
"encoding/binary"
"errors"
"io"
@@ -126,6 +127,7 @@ func ReadUdpAddr(r io.Reader) (Address, error) {
}
type UDPConn struct {
ctx context.Context
proxyAddress net.Addr
net.PacketConn
prefix []byte
@@ -133,8 +135,9 @@ type UDPConn struct {
bufWrite [MaxUdpPacket]byte
}
func NewUDPConn(packConn net.PacketConn, proxyAddress net.Addr) *UDPConn {
func NewUDPConn(ctx context.Context, packConn net.PacketConn, proxyAddress net.Addr) *UDPConn {
return &UDPConn{
ctx: ctx,
PacketConn: packConn,
proxyAddress: proxyAddress,
prefix: []byte{0, 0, 0},
@@ -183,3 +186,6 @@ func (c *UDPConn) SetReadBuffer(i int) error {
func (c *UDPConn) SetWriteBuffer(i int) error {
return c.PacketConn.(*net.UDPConn).SetWriteBuffer(i)
}
func (c *UDPConn) Context() context.Context {
return c.ctx
}

View File

@@ -12,14 +12,14 @@ import (
func TestOrderHeaders(t *testing.T) {
headers := requests.NewOrderMap()
headers.Set("Accept-Encoding", "gzip, deflate, br")
headers.Set("Accept", "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7")
headers.Set("User-Agent", tools.UserAgent)
headers.Set("Accept-Language", tools.AcceptLanguage)
headers.Set("Sec-Ch-Ua", tools.SecChUa)
headers.Set("Sec-Ch-Ua-Mobile", "?0")
headers.Set("Sec-Ch-Ua-Platform", `"Windows"`)
headers := requests.NewOrderData()
headers.Add("Accept-Encoding", "gzip, deflate, br")
headers.Add("Accept", "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7")
headers.Add("User-Agent", tools.UserAgent)
headers.Add("Accept-Language", tools.AcceptLanguage)
headers.Add("Sec-Ch-Ua", tools.SecChUa)
headers.Add("Sec-Ch-Ua-Mobile", "?0")
headers.Add("Sec-Ch-Ua-Platform", `"Windows"`)
resp, err := requests.Get(nil, "https://tools.scrapfly.io/api/fp/anything", requests.RequestOption{
ClientOption: requests.ClientOption{
Headers: headers,
@@ -76,8 +76,7 @@ func TestOrderHeaders2(t *testing.T) {
resp, err := requests.Get(nil, "https://tools.scrapfly.io/api/fp/anything", requests.RequestOption{
ClientOption: requests.ClientOption{
Headers: headers,
OrderHeaders: orderHeaders,
Headers: headers,
},
// ForceHttp1: true,
})

View File

@@ -51,7 +51,7 @@ func TestSse(t *testing.T) {
if err != nil {
t.Error(err)
}
defer response.CloseBody()
defer response.CloseConn()
sseCli := response.SSE()
defer sseCli.Close()
if sseCli == nil {

View File

@@ -4,6 +4,7 @@ import (
"log"
"strings"
"testing"
"time"
"github.com/gospider007/requests"
"github.com/gospider007/websocket"
@@ -14,12 +15,13 @@ func TestWebSocket(t *testing.T) {
if err != nil {
log.Panic(err)
}
defer response.CloseBody()
defer response.CloseConn()
wsCli := response.WebSocket()
defer wsCli.Close()
if err = wsCli.WriteMessage(websocket.TextMessage, "test1122332211"); err != nil { // Send text message
log.Panic(err)
}
n := 0
for {
msgType, con, err := wsCli.ReadMessage() // Receive message
@@ -31,7 +33,14 @@ func TestWebSocket(t *testing.T) {
}
log.Print(string(con))
if strings.Contains(string(con), "test1122332211") {
break
n++
if n > 6 {
break
}
}
if err = wsCli.WriteMessage(websocket.TextMessage, "test1122332211"); err != nil { // Send text message
log.Panic(err)
}
time.Sleep(time.Second)
}
}

View File

@@ -90,10 +90,10 @@ func TestSendFormWithGson(t *testing.T) {
}
}
func TestSendFormWithOrderMap(t *testing.T) {
orderMap := requests.NewOrderMap()
orderMap.Set("name", "test")
orderMap.Set("age", 11)
orderMap.Set("sex", "boy")
orderMap := requests.NewOrderData()
orderMap.Add("name", "test")
orderMap.Add("age", 11)
orderMap.Add("sex", "boy")
resp, err := requests.Post(nil, "https://httpbin.org/anything", requests.RequestOption{
Form: orderMap,

View File

@@ -99,12 +99,12 @@ func TestSendJsonWithGson(t *testing.T) {
}
}
func TestSendJsonWithOrder(t *testing.T) {
orderMap := requests.NewOrderMap()
orderMap.Set("age", "1")
orderMap.Set("age4", "4")
orderMap.Set("Name", "test")
orderMap.Set("age2", "2")
orderMap.Set("age3", []string{"22", "121"})
orderMap := requests.NewOrderData()
orderMap.Add("age", "1")
orderMap.Add("age4", "4")
orderMap.Add("Name", "test")
orderMap.Add("age2", "2")
orderMap.Add("age3", []string{"22", "121"})
bodyJson, err := gson.Encode(orderMap)
if err != nil {

View File

@@ -32,7 +32,7 @@ func TestStream(t *testing.T) {
// t.Log(string(con))
// t.Log(resp.Text())
time.Sleep(2 * time.Second)
resp.CloseBody()
resp.CloseConn()
time.Sleep(2 * time.Second)
if resp.StatusCode() != 200 {
t.Fatal("resp.StatusCode()!= 200")

View File

@@ -129,43 +129,6 @@ func removeZone(host string) string {
return host[:j] + host[i:]
}
func chunked(te []string) bool { return len(te) > 0 && te[0] == "chunked" }
func isIdentity(te []string) bool { return len(te) == 1 && te[0] == "identity" }
func shouldSendContentLength(t *http.Request) bool {
if chunked(t.TransferEncoding) {
return false
}
if t.ContentLength > 0 {
return true
}
if t.ContentLength < 0 {
return false
}
// Many servers expect a Content-Length for these methods
if t.Method == "POST" || t.Method == "PUT" || t.Method == "PATCH" {
return true
}
if t.ContentLength == 0 && isIdentity(t.TransferEncoding) {
if t.Method == "GET" || t.Method == "HEAD" {
return false
}
return true
}
return false
}
func hasPort(s string) bool { return strings.LastIndex(s, ":") > strings.LastIndex(s, "]") }
// removeEmptyPort strips the empty port in ":port" to ""
// as mandated by RFC 3986 Section 6.2.3.
func removeEmptyPort(host string) string {
if hasPort(host) {
return strings.TrimSuffix(host, ":")
}
return host
}
func outgoingLength(r *http.Request) int64 {
if r.Body == nil || r.Body == http.NoBody {
return 0
@@ -232,13 +195,11 @@ func NewRequestWithContext(ctx context.Context, method string, u *url.URL, body
}
req.URL = u
req.Proto = "HTTP/1.1"
req.ProtoMajor = 1
req.ProtoMinor = 1
req.Host = u.Host
u.Host = removeEmptyPort(u.Host)
if body != nil {
if v, ok := body.(interface{ Len() int }); ok {
req.ContentLength = int64(v.Len())
} else {
req.ContentLength = -1
}
if _, ok := body.(io.Seeker); ok {
req.Body = &requestBody{body}
@@ -247,6 +208,8 @@ func NewRequestWithContext(ctx context.Context, method string, u *url.URL, body
} else {
req.Body = io.NopCloser(body)
}
} else {
req.ContentLength = -1
}
return req, nil
}