This commit is contained in:
gospider
2025-02-20 08:33:22 +08:00
parent 252fbf4cda
commit a31eb9ed6a
5 changed files with 224 additions and 153 deletions

135
conn.go
View File

@@ -20,21 +20,33 @@ var maxRetryCount = 10
type Conn interface {
CloseWithError(err error) error
DoRequest(*http.Request, []string) (*http.Response, error)
CloseCtx() context.Context
}
type conn struct {
err error
r *bufio.Reader
w *bufio.Writer
conn net.Conn
closeFunc func()
err error
r *bufio.Reader
w *bufio.Writer
conn net.Conn
bodyLock sync.Mutex
bodyRun atomic.Bool
closeFunc func(error)
closeCtx context.Context
closeCnl context.CancelCauseFunc
}
func newConn(ctx context.Context, con net.Conn, closeFunc func()) *conn {
var errIoCopyClosedOk = errors.New("io copy is closed ok")
func newConn(ctx context.Context, con net.Conn, closeFunc func(error)) *conn {
c := &conn{
conn: con,
closeFunc: closeFunc,
}
c.closeCtx, c.closeCnl = context.WithCancelCause(ctx)
pr, pw := io.Pipe()
// c.r = bufio.NewReader(pr)
// c.w = bufio.NewWriter(con)
c.r = bufio.NewReader(pr)
c.w = bufio.NewWriter(c)
go func() {
@@ -44,13 +56,22 @@ func newConn(ctx context.Context, con net.Conn, closeFunc func()) *conn {
})
defer stop()
_, err := io.Copy(pw, c.conn)
c.closeCnl(err)
if c.err == nil {
c.CloseWithError(err)
if err == nil {
c.CloseWithError(errIoCopyClosedOk)
} else {
c.CloseWithError(err)
}
}
pr.CloseWithError(c.err)
pw.CloseWithError(c.err)
}()
return c
}
func (obj *conn) CloseCtx() context.Context {
return obj.closeCtx
}
func (obj *conn) Close() error {
return obj.CloseWithError(nil)
}
@@ -61,15 +82,14 @@ func (obj *conn) CloseWithError(err error) error {
obj.err = tools.WrapError(err, "connecotr closeWithError close")
}
if obj.closeFunc != nil {
obj.closeFunc()
obj.closeFunc(obj.err)
}
return obj.conn.Close()
}
func (obj *conn) DoRequest(req *http.Request, orderHeaders []string) (*http.Response, error) {
err := httpWrite(req, obj.w, orderHeaders)
if err != nil {
return nil, err
}
go func() {
obj.httpWrite(req, orderHeaders)
}()
res, err := http.ReadResponse(obj.r, req)
if err != nil {
err = tools.WrapError(err, "http1 read error")
@@ -80,9 +100,11 @@ func (obj *conn) DoRequest(req *http.Request, orderHeaders []string) (*http.Resp
}
return res, err
}
func (obj *conn) Read(b []byte) (i int, err error) {
return obj.r.Read(b)
}
func (obj *conn) Write(b []byte) (int, error) {
return obj.conn.Write(b)
}
@@ -111,8 +133,9 @@ type connecotr struct {
bodyCtx context.Context //body close
bodyCnl context.CancelCauseFunc
Conn Conn
c net.Conn
proxys []Address
c net.Conn
proxys []Address
}
func (obj *connecotr) withCancel(forceCtx context.Context, safeCtx context.Context) {
@@ -138,43 +161,39 @@ func (obj *connecotr) wrapBody(task *reqTask) {
task.reqCtx.response.Body = body
}
func (obj *connecotr) httpReq(task *reqTask, done chan struct{}) {
if task.reqCtx.response, task.err = obj.Conn.DoRequest(task.reqCtx.request, task.reqCtx.option.OrderHeaders); task.reqCtx.response != nil && task.err == nil {
defer close(done)
task.reqCtx.response, task.err = obj.Conn.DoRequest(task.reqCtx.request, task.reqCtx.option.OrderHeaders)
if task.reqCtx.response != nil {
obj.wrapBody(task)
} else if task.err != nil {
}
if task.err != nil {
task.err = tools.WrapError(task.err, "roundTrip error")
}
close(done)
}
func (obj *connecotr) taskMain(task *reqTask) (retry bool) {
func (obj *connecotr) taskMain(task *reqTask) (isNotice bool) {
task.head = make(chan struct{})
defer func() {
if retry {
task.retry++
if task.retry > maxRetryCount {
retry = false
}
}
if task.err != nil && task.reqCtx.option.ErrCallBack != nil {
task.reqCtx.err = task.err
if err2 := task.reqCtx.option.ErrCallBack(task.reqCtx); err2 != nil {
retry = false
isNotice = false
task.disRetry = true
task.err = err2
}
}
if retry {
task.err = nil
if task.err != nil {
obj.CloseWithError(errors.New("taskMain retry close"))
if task.reqCtx.response != nil && task.reqCtx.response.Body != nil {
task.reqCtx.response.Body.Close()
}
} else {
task.cnl()
if task.err == nil && task.reqCtx.response != nil && task.reqCtx.response.Body != nil {
if task.reqCtx.response != nil && task.reqCtx.response.Body != nil {
task.cnl()
select {
case <-obj.bodyCtx.Done(): //wait body close
if task.err = context.Cause(obj.bodyCtx); !errors.Is(task.err, errGospiderBodyClose) {
task.err = tools.WrapError(task.err, "bodyCtx close")
} else {
task.err = nil
}
case <-task.reqCtx.Context().Done(): //wait request close
task.err = tools.WrapError(context.Cause(task.reqCtx.Context()), "requestCtx close")
@@ -200,9 +219,15 @@ func (obj *connecotr) taskMain(task *reqTask) (retry bool) {
}()
select {
case <-obj.safeCtx.Done():
return true
task.err = obj.safeCtx.Err()
task.enableRetry = true
isNotice = true
return
case <-obj.forceCtx.Done(): //force conn close
return true
task.err = obj.forceCtx.Err()
task.enableRetry = true
isNotice = true
return
default:
}
done := make(chan struct{})
@@ -210,17 +235,12 @@ func (obj *connecotr) taskMain(task *reqTask) (retry bool) {
select {
case <-task.ctx.Done():
task.err = tools.WrapError(context.Cause(task.ctx), "task.ctx error: ")
return false
case <-done:
if task.err != nil {
return task.suppertRetry()
}
if task.reqCtx.response == nil {
task.err = context.Cause(task.ctx)
if task.err == nil {
task.err = errors.New("response is nil")
}
return task.suppertRetry()
}
if task.reqCtx.option.Logger != nil {
task.reqCtx.option.Logger(Log{
@@ -230,20 +250,10 @@ func (obj *connecotr) taskMain(task *reqTask) (retry bool) {
Msg: "response header",
})
}
return false
case <-obj.forceCtx.Done(): //force conn close
err := context.Cause(obj.forceCtx)
task.err = tools.WrapError(err, "taskMain delete ctx error: ")
select {
case <-obj.parentForceCtx.Done():
return false
default:
if errors.Is(err, errConnectionForceClosed) {
return false
}
return true
}
task.err = tools.WrapError(context.Cause(obj.forceCtx), "taskMain delete ctx error: ")
}
return false
}
type connPool struct {
@@ -263,15 +273,15 @@ type connPools struct {
func newConnPools() *connPools {
return new(connPools)
}
func (obj *connPools) get(key string) *connPool {
val, ok := obj.connPools.Load(key)
func (obj *connPools) get(task *reqTask) *connPool {
val, ok := obj.connPools.Load(task.key)
if !ok {
return nil
}
return val.(*connPool)
}
func (obj *connPools) set(key string, pool *connPool) {
obj.connPools.Store(key, pool)
func (obj *connPools) set(task *reqTask, pool *connPool) {
obj.connPools.Store(task.key, pool)
}
func (obj *connPools) del(key string) {
obj.connPools.Delete(key)
@@ -283,10 +293,14 @@ func (obj *connPools) Range() iter.Seq2[string, *connPool] {
})
}
}
func (obj *connPool) notice(task *reqTask) {
func (obj *connPool) notices(task *reqTask) bool {
select {
case obj.tasks <- task:
case task.emptyPool <- struct{}{}:
return true
default:
task.isNotice = true
return false
}
}
func (obj *connPool) rwMain(done chan struct{}, conn *connecotr) {
@@ -309,9 +323,12 @@ func (obj *connPool) rwMain(done chan struct{}, conn *connecotr) {
if task == nil {
return
}
if conn.taskMain(task) {
obj.notice(task)
return
task.isNotice = false
task.disRetry = false
task.enableRetry = false
task.err = nil
if !conn.taskMain(task) || !obj.notices(task) {
task.cnl()
}
if task.err != nil {
return

View File

@@ -5,18 +5,20 @@ import (
"bytes"
"context"
"errors"
"github.com/gospider007/bar"
"github.com/gospider007/bs4"
"github.com/gospider007/gson"
"github.com/gospider007/re"
"github.com/gospider007/tools"
"github.com/gospider007/websocket"
"io"
"iter"
"net/http"
"net/url"
"strconv"
"strings"
"sync"
"github.com/gospider007/bar"
"github.com/gospider007/bs4"
"github.com/gospider007/gson"
"github.com/gospider007/re"
"github.com/gospider007/tools"
"github.com/gospider007/websocket"
)
func NewResponse(ctx context.Context, option RequestOption) *Response {
@@ -54,23 +56,24 @@ func (obj *Response) Client() *Client {
}
type Response struct {
err error
ctx context.Context
request *http.Request
rawConn *readWriteCloser
response *http.Response
webSocket *websocket.Conn
sse *SSE
cnl context.CancelFunc
option *RequestOption
client *Client
encoding string
filePath string
requestId string
content []byte
proxys []*url.URL
readBody bool
isNewConn bool
err error
ctx context.Context
request *http.Request
rawConn *readWriteCloser
response *http.Response
webSocket *websocket.Conn
sse *SSE
cnl context.CancelFunc
option *RequestOption
client *Client
encoding string
filePath string
requestId string
content []byte
proxys []*url.URL
readBody bool
readBodyLock sync.Mutex
isNewConn bool
}
type SSE struct {
reader *bufio.Reader
@@ -317,16 +320,19 @@ func (obj *Response) IsSSE() bool {
// read body
func (obj *Response) ReadBody() (err error) {
if obj.readBody {
return nil
}
obj.readBodyLock.Lock()
defer obj.readBodyLock.Unlock()
if obj.IsWebSocket() && obj.IsSSE() {
return errors.New("can not read stream")
}
if obj.readBody {
return nil
}
obj.readBody = true
bBody := bytes.NewBuffer(nil)
done := make(chan struct{})
go func() {
defer close(done)
if obj.option.Bar && obj.ContentLength() > 0 {
_, err = io.Copy(&barBody{
bar: bar.NewClient(obj.response.ContentLength),
@@ -338,7 +344,6 @@ func (obj *Response) ReadBody() (err error) {
if err == io.ErrUnexpectedEOF {
err = nil
}
close(done)
}()
select {
case <-obj.ctx.Done():

View File

@@ -21,15 +21,25 @@ import (
)
type reqTask struct {
ctx context.Context
cnl context.CancelFunc
reqCtx *Response
emptyPool chan struct{}
err error
retry int
head chan struct{}
ctx context.Context
cnl context.CancelFunc
reqCtx *Response
err error
enableRetry bool
disRetry bool
isNotice bool
key string
}
func (obj *reqTask) suppertRetry() bool {
if obj.disRetry {
return false
}
if obj.enableRetry {
return true
}
if obj.reqCtx.request.Body == nil {
return true
} else if body, ok := obj.reqCtx.request.Body.(io.Seeker); ok {
@@ -69,9 +79,9 @@ func newRoundTripper(preCtx context.Context) *roundTripper {
connPools: newConnPools(),
}
}
func (obj *roundTripper) newConnPool(done chan struct{}, conn *connecotr, key string) *connPool {
func (obj *roundTripper) newConnPool(done chan struct{}, conn *connecotr, task *reqTask) *connPool {
pool := new(connPool)
pool.connKey = key
pool.connKey = task.key
pool.forceCtx, pool.forceCnl = context.WithCancelCause(obj.ctx)
pool.safeCtx, pool.safeCnl = context.WithCancelCause(pool.forceCtx)
pool.tasks = make(chan *reqTask)
@@ -81,14 +91,14 @@ func (obj *roundTripper) newConnPool(done chan struct{}, conn *connecotr, key st
go pool.rwMain(done, conn)
return pool
}
func (obj *roundTripper) putConnPool(key string, conn *connecotr) {
pool := obj.connPools.get(key)
func (obj *roundTripper) putConnPool(task *reqTask, conn *connecotr) {
pool := obj.connPools.get(task)
done := make(chan struct{})
if pool != nil {
pool.total.Add(1)
go pool.rwMain(done, conn)
} else {
obj.connPools.set(key, obj.newConnPool(done, conn, key))
obj.connPools.set(task, obj.newConnPool(done, conn, task))
}
<-done
}
@@ -243,8 +253,8 @@ func (obj *roundTripper) dialConnecotr(ctx *Response, conne *connecotr, h2 bool)
return err
}
} else {
conne.Conn = newConn(conne.forceCtx, conne.c, func() {
conne.forceCnl(errors.New("http1 client close"))
conne.Conn = newConn(conne.forceCtx, conne.c, func(err error) {
conne.forceCnl(tools.WrapError(err, "http1 client close"))
})
}
return err
@@ -326,38 +336,48 @@ func (obj *roundTripper) initProxys(ctx *Response) ([]Address, error) {
return proxys, nil
}
func (obj *roundTripper) poolRoundTrip(pool *connPool, task *reqTask, key string) (isOk bool, err error) {
func (obj *roundTripper) poolRoundTrip(task *reqTask) {
connPool := obj.connPools.get(task)
if connPool == nil {
obj.createPool(task)
if task.err != nil {
return
}
obj.poolRoundTrip(task)
return
}
task.ctx, task.cnl = context.WithTimeout(task.reqCtx.Context(), task.reqCtx.option.ResponseHeaderTimeout)
select {
case pool.tasks <- task:
select {
case <-task.emptyPool:
return false, nil
case <-task.ctx.Done():
if task.err == nil && task.reqCtx.response == nil {
task.err = context.Cause(task.ctx)
}
return true, task.err
case connPool.tasks <- task:
<-task.ctx.Done()
if task.err == nil && task.reqCtx.response == nil {
task.err = context.Cause(task.ctx)
}
default:
return obj.createPool(task, key)
obj.createPool(task)
if task.err != nil {
return
}
obj.poolRoundTrip(task)
}
}
func (obj *roundTripper) createPool(task *reqTask, key string) (isOk bool, err error) {
func (obj *roundTripper) createPool(task *reqTask) {
task.reqCtx.isNewConn = true
conn, err := obj.dial(task.reqCtx)
if err != nil {
task.err = err
if task.reqCtx.option.ErrCallBack != nil {
task.reqCtx.err = err
if err2 := task.reqCtx.option.ErrCallBack(task.reqCtx); err2 != nil {
return true, err2
task.err = err2
}
}
return false, err
task.enableRetry = true
}
if task.err == nil {
obj.putConnPool(task, conn)
}
obj.putConnPool(key, conn)
return false, nil
}
func (obj *roundTripper) closeConns() {
@@ -378,7 +398,7 @@ func (obj *roundTripper) newReqTask(ctx *Response) *reqTask {
}
task := new(reqTask)
task.reqCtx = ctx
task.emptyPool = make(chan struct{})
task.key = getKey(ctx) //pool key
return task
}
func (obj *roundTripper) RoundTrip(ctx *Response) (err error) {
@@ -394,33 +414,27 @@ func (obj *roundTripper) RoundTrip(ctx *Response) (err error) {
return err
}
}
key := getKey(ctx) //pool key
task := obj.newReqTask(ctx)
var isOk bool
for {
currentRetry := 0
for currentRetry := 0; currentRetry <= maxRetryCount; currentRetry++ {
select {
case <-ctx.Context().Done():
return context.Cause(ctx.Context())
default:
}
if task.retry >= maxRetryCount {
obj.poolRoundTrip(task)
if task.err == nil || !task.suppertRetry() {
break
}
if task.isNotice {
currentRetry--
}
}
if currentRetry > maxRetryCount {
if task.err == nil {
task.err = fmt.Errorf("roundTrip retry %d times", maxRetryCount)
break
}
pool := obj.connPools.get(key)
if pool == nil {
isOk, err = obj.createPool(task, key)
} else {
isOk, err = obj.poolRoundTrip(pool, task, key)
}
if isOk {
if err != nil {
task.err = err
}
break
}
if err != nil {
task.retry++
task.err = tools.WrapError(err, fmt.Errorf("roundTrip retry %d times", maxRetryCount))
}
}
if task.err == nil && ctx.option.RequestCallBack != nil {

33
rw.go
View File

@@ -1,30 +1,36 @@
package requests
import (
"context"
"errors"
"io"
"net"
"sync/atomic"
"github.com/gospider007/tools"
)
type readWriteCloser struct {
body io.ReadCloser
err error
conn *connecotr
isClosed bool
isClosed atomic.Bool
}
func (obj *readWriteCloser) Conn() net.Conn {
return obj.conn.Conn.(net.Conn)
}
func (obj *readWriteCloser) Read(p []byte) (n int, err error) {
if obj.isClosed {
if obj.isClosed.Load() {
return 0, obj.err
}
i, err := obj.body.Read(p)
if err == io.EOF {
obj.Close()
if err != nil {
obj.err = err
if err == io.EOF {
obj.Close()
}
}
obj.err = err
return i, err
}
func (obj *readWriteCloser) Proxys() []Address {
@@ -34,8 +40,21 @@ func (obj *readWriteCloser) Proxys() []Address {
var errGospiderBodyClose = errors.New("gospider body close error")
func (obj *readWriteCloser) Close() (err error) {
obj.isClosed = true
obj.conn.bodyCnl(errGospiderBodyClose)
return obj.CloseWithError(nil)
}
func (obj *readWriteCloser) ConnCloseCtx() context.Context {
return obj.conn.Conn.CloseCtx()
}
func (obj *readWriteCloser) CloseWithError(err error) error {
if err == nil {
err = errGospiderBodyClose
obj.err = io.EOF
} else {
err = tools.WrapError(obj.err, err)
obj.err = err
}
obj.isClosed.Store(true)
obj.conn.bodyCnl(err)
return obj.body.Close() //reuse conn
}

View File

@@ -1,11 +1,11 @@
package requests
import (
"bufio"
"context"
"errors"
"fmt"
"io"
"log"
"net"
"net/http"
"net/textproto"
@@ -217,7 +217,13 @@ func redirectBehavior(reqMethod string, resp *http.Response, ireq *http.Request)
var filterHeaderKeys = ja3.DefaultOrderHeadersWithH2()
func httpWrite(r *http.Request, w *bufio.Writer, orderHeaders []string) (err error) {
func (obj *conn) httpWrite(r *http.Request, orderHeaders []string) (err error) {
obj.bodyLock.Lock()
defer obj.bodyLock.Unlock()
if obj.bodyRun.Load() {
log.Print("body already run")
return errors.New("body already run")
}
for i := range orderHeaders {
orderHeaders[i] = textproto.CanonicalMIMEHeaderKey(orderHeaders[i])
}
@@ -250,7 +256,7 @@ func httpWrite(r *http.Request, w *bufio.Writer, orderHeaders []string) (err err
if r.Header.Get("Content-Length") == "" && r.ContentLength != 0 && shouldSendContentLength(r) {
r.Header.Set("Content-Length", fmt.Sprint(r.ContentLength))
}
if _, err = w.WriteString(fmt.Sprintf("%s %s %s\r\n", r.Method, ruri, r.Proto)); err != nil {
if _, err = obj.w.WriteString(fmt.Sprintf("%s %s %s\r\n", r.Method, ruri, r.Proto)); err != nil {
return err
}
for _, k := range orderHeaders {
@@ -262,7 +268,7 @@ func httpWrite(r *http.Request, w *bufio.Writer, orderHeaders []string) (err err
continue
}
for _, v := range vs {
if _, err = w.WriteString(fmt.Sprintf("%s: %s\r\n", k, v)); err != nil {
if _, err = obj.w.WriteString(fmt.Sprintf("%s: %s\r\n", k, v)); err != nil {
return err
}
}
@@ -277,21 +283,31 @@ func httpWrite(r *http.Request, w *bufio.Writer, orderHeaders []string) (err err
continue
}
for _, v := range vs {
if _, err = w.WriteString(fmt.Sprintf("%s: %s\r\n", k, v)); err != nil {
if _, err = obj.w.WriteString(fmt.Sprintf("%s: %s\r\n", k, v)); err != nil {
return err
}
}
}
}
if _, err = w.WriteString("\r\n"); err != nil {
if _, err = obj.w.WriteString("\r\n"); err != nil {
return err
}
if r.Body != nil {
if _, err = io.Copy(w, r.Body); err != nil {
return err
}
if r.Body == nil {
return obj.w.Flush()
}
return w.Flush()
go func() {
obj.bodyRun.Store(true)
defer obj.bodyRun.Store(false)
if _, err = io.Copy(obj.w, r.Body); err != nil {
obj.CloseWithError(tools.WrapError(err, "failed to send request body"))
return
} else if err = obj.w.Flush(); err != nil {
obj.CloseWithError(tools.WrapError(err, "failed to flush request body"))
return
}
}()
return nil
// return obj.w.Flush()
}
type requestBody struct {