This commit is contained in:
gospider
2025-04-16 16:59:51 +08:00
parent 7d1618aca3
commit 0f6556f385
4 changed files with 85 additions and 103 deletions

View File

@@ -22,9 +22,8 @@ import (
type reqTask struct {
bodyCtx context.Context
ctx context.Context
cnl context.CancelFunc
cnl context.CancelCauseFunc
reqCtx *Response
err error
enableRetry bool
disRetry bool
isNotice bool
@@ -143,20 +142,21 @@ func (obj *roundTripper) ghttp3Dial(ctx *Response, remoteAddress Address, proxyA
if ctx.option.UquicConfig != nil {
quicConfig = ctx.option.QuicConfig.Clone()
}
var udpCtx context.Context
if ct, ok := udpConn.(interface {
Context() context.Context
}); ok {
udpCtx = ct.Context()
}
netConn, err := quic.DialEarly(ctx.Context(), udpConn, &net.UDPAddr{IP: remoteAddress.IP, Port: remoteAddress.Port}, tlsConfig, quicConfig)
if err != nil {
return nil, err
}
conn = obj.newConnecotr()
conn.Conn, err = http3.NewClient(udpCtx, netConn, func() {
conn.Conn, err = http3.NewClient(netConn, func() {
conn.forceCnl(errors.New("http3 client close"))
})
if ct, ok := udpConn.(interface {
Context() context.Context
}); ok {
context.AfterFunc(ct.Context(), func() {
conn.forceCnl(errors.New("http3 client close with udp"))
})
}
return
}
@@ -178,13 +178,6 @@ func (obj *roundTripper) uhttp3Dial(ctx *Response, spec uquic.QUICSpec, remoteAd
if ctx.option.UquicConfig != nil {
quicConfig = ctx.option.UquicConfig.Clone()
}
var udpCtx context.Context
if ct, ok := udpConn.(interface {
Context() context.Context
}); ok {
udpCtx = ct.Context()
}
netConn, err := (&uquic.UTransport{
Transport: &uquic.Transport{
Conn: udpConn,
@@ -195,9 +188,16 @@ func (obj *roundTripper) uhttp3Dial(ctx *Response, spec uquic.QUICSpec, remoteAd
return nil, err
}
conn = obj.newConnecotr()
conn.Conn, err = http3.NewClient(udpCtx, netConn, func() {
conn.Conn, err = http3.NewClient(netConn, func() {
conn.forceCnl(errors.New("http3 client close"))
})
if ct, ok := udpConn.(interface {
Context() context.Context
}); ok {
context.AfterFunc(ct.Context(), func() {
conn.forceCnl(errors.New("http3 client close with udp"))
})
}
return
}
@@ -278,7 +278,7 @@ func (obj *roundTripper) dialConnecotr(ctx *Response, conne *connecotr, h2 bool)
return err
}
} else {
conne.Conn = newClientConn(conne.forceCtx, conne.c, func(err error) {
conne.Conn = newClientConn(conne.c, func(err error) {
conne.forceCnl(tools.WrapError(err, "http1 client close"))
})
}
@@ -329,42 +329,43 @@ func (obj *roundTripper) initProxys(ctx *Response) ([]Address, error) {
return proxys, nil
}
func (obj *roundTripper) poolRoundTrip(task *reqTask) {
func (obj *roundTripper) poolRoundTrip(task *reqTask) error {
connPool := obj.connPools.get(task)
if connPool == nil {
obj.newRoudTrip(task)
return
return obj.newRoudTrip(task)
}
task.ctx, task.cnl = context.WithTimeout(task.reqCtx.Context(), task.reqCtx.option.ResponseHeaderTimeout)
task.ctx, task.cnl = context.WithCancelCause(task.reqCtx.Context())
select {
case connPool.tasks <- task:
<-task.ctx.Done()
if task.err == nil && task.reqCtx.response == nil {
task.err = context.Cause(task.ctx)
err := context.Cause(task.ctx)
if errors.Is(err, errNoErr) {
err = nil
}
return err
default:
obj.newRoudTrip(task)
return
return obj.newRoudTrip(task)
}
}
func (obj *roundTripper) newRoudTrip(task *reqTask) {
func (obj *roundTripper) newRoudTrip(task *reqTask) error {
task.reqCtx.isNewConn = true
conn, err := obj.dial(task.reqCtx)
if err != nil {
task.err = tools.WrapError(err, "newRoudTrip dial error")
err = tools.WrapError(err, "newRoudTrip dial error")
if task.reqCtx.option.ErrCallBack != nil {
task.reqCtx.err = err
if err2 := task.reqCtx.option.ErrCallBack(task.reqCtx); err2 != nil {
task.err = err2
err = err2
}
}
task.enableRetry = true
}
if task.err == nil {
if err == nil {
obj.putConnPool(task, conn)
obj.poolRoundTrip(task)
err = obj.poolRoundTrip(task)
}
return err
}
func (obj *roundTripper) closeConns() {
@@ -413,18 +414,18 @@ func (obj *roundTripper) RoundTrip(ctx *Response) (err error) {
if err != nil {
return err
}
obj.poolRoundTrip(task)
if task.err == nil || !task.suppertRetry() {
err = obj.poolRoundTrip(task)
if err == nil || !task.suppertRetry() {
break
}
if task.isNotice {
currentRetry--
}
}
if task.err == nil && ctx.option.RequestCallBack != nil {
if err = ctx.option.RequestCallBack(ctx); err != nil {
task.err = err
if err == nil && ctx.option.RequestCallBack != nil {
if err2 := ctx.option.RequestCallBack(ctx); err2 != nil {
err = err2
}
}
return task.err
return err
}