betteralign

This commit is contained in:
gospider
2025-02-10 23:12:37 +08:00
parent 112a5f9bd4
commit e06234a258
8 changed files with 1720 additions and 1772 deletions

299
client.go
View File

@@ -1,151 +1,148 @@
package requests
import (
"context"
"crypto/tls"
"fmt"
"io"
"net/http"
"github.com/gospider007/gtls"
utls "github.com/refraction-networking/utls"
)
// Connection Management
type Client struct {
ClientOption ClientOption
transport *roundTripper
ctx context.Context
cnl context.CancelFunc
closed bool
}
var defaultClient, _ = NewClient(context.TODO())
// New Connection Management
func NewClient(preCtx context.Context, options ...ClientOption) (*Client, error) {
if preCtx == nil {
preCtx = context.TODO()
}
var option ClientOption
if len(options) > 0 {
option = options[0]
}
result := new(Client)
result.ctx, result.cnl = context.WithCancel(preCtx)
result.transport = newRoundTripper(result.ctx)
result.ClientOption = option
if result.ClientOption.TlsConfig == nil {
result.ClientOption.TlsConfig = &tls.Config{
InsecureSkipVerify: true,
ClientSessionCache: tls.NewLRUClientSessionCache(0),
}
}
if result.ClientOption.UtlsConfig == nil {
result.ClientOption.UtlsConfig = &utls.Config{
InsecureSkipVerify: true,
ClientSessionCache: utls.NewLRUClientSessionCache(0),
InsecureSkipTimeVerify: true,
OmitEmptyPsk: true,
PreferSkipResumptionOnNilExtension: true,
}
}
//cookiesjar
if !result.ClientOption.DisCookie {
if result.ClientOption.Jar == nil {
result.ClientOption.Jar = NewJar()
}
}
var err error
if result.ClientOption.Proxy != "" {
_, err = gtls.VerifyProxy(result.ClientOption.Proxy)
}
return result, err
}
// Close idle connections. If the connection is in use, wait until it ends before closing
func (obj *Client) CloseConns() {
obj.transport.closeConns()
}
// Close the connection, even if it is in use, it will be closed
func (obj *Client) ForceCloseConns() {
obj.transport.forceCloseConns()
}
// Close the client and cannot be used again after shutdown
func (obj *Client) Close() {
obj.closed = true
obj.ForceCloseConns()
obj.cnl()
}
func (obj *Client) do(ctx *Response) (err error) {
var redirectNum int
for {
redirectNum++
err = obj.send(ctx)
if ctx.Request().Body != nil {
ctx.Request().Body.Close()
}
if err != nil {
return
}
if ctx.Option().MaxRedirect < 0 { //dis redirect
return
}
if ctx.Option().MaxRedirect > 0 && redirectNum > ctx.Option().MaxRedirect {
return
}
loc := ctx.response.Header.Get("Location")
if loc == "" {
return nil
}
u, err := ctx.Request().URL.Parse(loc)
if err != nil {
return fmt.Errorf("failed to parse Location header %q: %v", loc, err)
}
ctx.request, err = NewRequestWithContext(ctx.Context(), http.MethodGet, u, nil)
if err != nil {
return err
}
var shouldRedirect bool
ctx.request.Method, shouldRedirect, _ = redirectBehavior(ctx.Request().Method, ctx.response, ctx.request)
if !shouldRedirect {
return nil
}
ctx.request.Response = ctx.response
ctx.request.Header = defaultHeaders()
ctx.request.Header.Set("Referer", ctx.Request().URL.String())
for key := range ctx.request.Header {
if val := ctx.Request().Header.Get(key); val != "" {
ctx.request.Header.Set(key, val)
}
}
if getDomain(u) == getDomain(ctx.Request().URL) {
if Authorization := ctx.Request().Header.Get("Authorization"); Authorization != "" {
ctx.request.Header.Set("Authorization", Authorization)
}
cookies := Cookies(ctx.Request().Cookies()).String()
if cookies != "" {
ctx.request.Header.Set("Cookie", cookies)
}
addCookie(ctx.request, ctx.response.Cookies())
}
io.Copy(io.Discard, ctx.response.Body)
ctx.response.Body.Close()
}
}
func (obj *Client) send(ctx *Response) (err error) {
if ctx.Option().Jar != nil {
addCookie(ctx.Request(), ctx.Option().Jar.GetCookies(ctx.Request().URL))
}
err = obj.transport.RoundTrip(ctx)
if ctx.Option().Jar != nil && ctx.response != nil {
if rc := ctx.response.Cookies(); len(rc) > 0 {
ctx.Option().Jar.SetCookies(ctx.Request().URL, rc)
}
}
return err
}
package requests
import (
"context"
"crypto/tls"
"fmt"
"github.com/gospider007/gtls"
utls "github.com/refraction-networking/utls"
"io"
"net/http"
)
// Connection Management
type Client struct {
ctx context.Context
transport *roundTripper
cnl context.CancelFunc
ClientOption ClientOption
closed bool
}
var defaultClient, _ = NewClient(context.TODO())
// New Connection Management
func NewClient(preCtx context.Context, options ...ClientOption) (*Client, error) {
if preCtx == nil {
preCtx = context.TODO()
}
var option ClientOption
if len(options) > 0 {
option = options[0]
}
result := new(Client)
result.ctx, result.cnl = context.WithCancel(preCtx)
result.transport = newRoundTripper(result.ctx)
result.ClientOption = option
if result.ClientOption.TlsConfig == nil {
result.ClientOption.TlsConfig = &tls.Config{
InsecureSkipVerify: true,
ClientSessionCache: tls.NewLRUClientSessionCache(0),
}
}
if result.ClientOption.UtlsConfig == nil {
result.ClientOption.UtlsConfig = &utls.Config{
InsecureSkipVerify: true,
ClientSessionCache: utls.NewLRUClientSessionCache(0),
InsecureSkipTimeVerify: true,
OmitEmptyPsk: true,
PreferSkipResumptionOnNilExtension: true,
}
}
//cookiesjar
if !result.ClientOption.DisCookie {
if result.ClientOption.Jar == nil {
result.ClientOption.Jar = NewJar()
}
}
var err error
if result.ClientOption.Proxy != "" {
_, err = gtls.VerifyProxy(result.ClientOption.Proxy)
}
return result, err
}
// Close idle connections. If the connection is in use, wait until it ends before closing
func (obj *Client) CloseConns() {
obj.transport.closeConns()
}
// Close the connection, even if it is in use, it will be closed
func (obj *Client) ForceCloseConns() {
obj.transport.forceCloseConns()
}
// Close the client and cannot be used again after shutdown
func (obj *Client) Close() {
obj.closed = true
obj.ForceCloseConns()
obj.cnl()
}
func (obj *Client) do(ctx *Response) (err error) {
var redirectNum int
for {
redirectNum++
err = obj.send(ctx)
if ctx.Request().Body != nil {
ctx.Request().Body.Close()
}
if err != nil {
return
}
if ctx.Option().MaxRedirect < 0 { //dis redirect
return
}
if ctx.Option().MaxRedirect > 0 && redirectNum > ctx.Option().MaxRedirect {
return
}
loc := ctx.response.Header.Get("Location")
if loc == "" {
return nil
}
u, err := ctx.Request().URL.Parse(loc)
if err != nil {
return fmt.Errorf("failed to parse Location header %q: %v", loc, err)
}
ctx.request, err = NewRequestWithContext(ctx.Context(), http.MethodGet, u, nil)
if err != nil {
return err
}
var shouldRedirect bool
ctx.request.Method, shouldRedirect, _ = redirectBehavior(ctx.Request().Method, ctx.response, ctx.request)
if !shouldRedirect {
return nil
}
ctx.request.Response = ctx.response
ctx.request.Header = defaultHeaders()
ctx.request.Header.Set("Referer", ctx.Request().URL.String())
for key := range ctx.request.Header {
if val := ctx.Request().Header.Get(key); val != "" {
ctx.request.Header.Set(key, val)
}
}
if getDomain(u) == getDomain(ctx.Request().URL) {
if Authorization := ctx.Request().Header.Get("Authorization"); Authorization != "" {
ctx.request.Header.Set("Authorization", Authorization)
}
cookies := Cookies(ctx.Request().Cookies()).String()
if cookies != "" {
ctx.request.Header.Set("Cookie", cookies)
}
addCookie(ctx.request, ctx.response.Cookies())
}
io.Copy(io.Discard, ctx.response.Body)
ctx.response.Body.Close()
}
}
func (obj *Client) send(ctx *Response) (err error) {
if ctx.Option().Jar != nil {
addCookie(ctx.Request(), ctx.Option().Jar.GetCookies(ctx.Request().URL))
}
err = obj.transport.RoundTrip(ctx)
if ctx.Option().Jar != nil && ctx.response != nil {
if rc := ctx.response.Cookies(); len(rc) > 0 {
ctx.Option().Jar.SetCookies(ctx.Request().URL, rc)
}
}
return err
}

662
conn.go
View File

@@ -1,339 +1,323 @@
package requests
import (
"bufio"
"context"
"errors"
"io"
"iter"
"net"
"net/http"
"sync"
"sync/atomic"
"time"
"github.com/gospider007/tools"
)
var maxRetryCount = 10
type Conn interface {
CloseWithError(err error) error
DoRequest(*http.Request, []string) (*http.Response, error)
}
type conn struct {
r *bufio.Reader
w *bufio.Writer
pr *pipCon
pw *pipCon
conn net.Conn
closeFunc func()
}
func newConn(ctx context.Context, con net.Conn, closeFunc func()) *conn {
c := &conn{
conn: con,
closeFunc: closeFunc,
}
c.pr, c.pw = pipe(ctx)
c.r = bufio.NewReader(c)
c.w = bufio.NewWriter(c)
go c.run()
return c
}
func (obj *conn) Close() error {
return obj.CloseWithError(nil)
}
func (obj *conn) CloseWithError(err error) error {
if err == nil {
err = errors.New("connecotr closeWithError close")
} else {
err = tools.WrapError(err, "connecotr closeWithError close")
}
if obj.closeFunc != nil {
obj.closeFunc()
}
obj.conn.Close()
return obj.pr.CloseWitError(err)
}
func (obj *conn) DoRequest(req *http.Request, orderHeaders []string) (*http.Response, error) {
err := httpWrite(req, obj.w, orderHeaders)
if err != nil {
return nil, err
}
res, err := http.ReadResponse(obj.r, req)
if err != nil {
err = tools.WrapError(err, "http1 read error")
return nil, err
}
if res == nil {
err = errors.New("response is nil")
}
return res, err
}
func (obj *conn) run() (err error) {
_, err = io.Copy(obj.pw, obj.conn)
return obj.CloseWithError(err)
}
func (obj *conn) Read(b []byte) (i int, err error) {
return obj.pr.Read(b)
}
func (obj *conn) Write(b []byte) (int, error) {
return obj.conn.Write(b)
}
func (obj *conn) LocalAddr() net.Addr {
return obj.conn.LocalAddr()
}
func (obj *conn) RemoteAddr() net.Addr {
return obj.conn.RemoteAddr()
}
func (obj *conn) SetDeadline(t time.Time) error {
return obj.conn.SetDeadline(t)
}
func (obj *conn) SetReadDeadline(t time.Time) error {
return obj.conn.SetReadDeadline(t)
}
func (obj *conn) SetWriteDeadline(t time.Time) error {
return obj.conn.SetWriteDeadline(t)
}
type connecotr struct {
parentForceCtx context.Context //parent force close
forceCtx context.Context //force close
forceCnl context.CancelCauseFunc
safeCtx context.Context //safe close
safeCnl context.CancelCauseFunc
bodyCtx context.Context //body close
bodyCnl context.CancelCauseFunc
Conn Conn
c net.Conn
proxys []Address
}
func (obj *connecotr) withCancel(forceCtx context.Context, safeCtx context.Context) {
obj.parentForceCtx = forceCtx
obj.forceCtx, obj.forceCnl = context.WithCancelCause(forceCtx)
obj.safeCtx, obj.safeCnl = context.WithCancelCause(safeCtx)
}
func (obj *connecotr) Close() error {
return obj.CloseWithError(errors.New("connecotr Close close"))
}
func (obj *connecotr) CloseWithError(err error) error {
err = obj.Conn.CloseWithError(err)
if obj.c != nil {
return obj.c.Close()
}
return err
}
func (obj *connecotr) wrapBody(task *reqTask) {
body := new(readWriteCloser)
obj.bodyCtx, obj.bodyCnl = context.WithCancelCause(task.reqCtx.Context())
body.body = task.reqCtx.response.Body
body.conn = obj
task.reqCtx.response.Body = body
}
func (obj *connecotr) httpReq(task *reqTask, done chan struct{}) {
if task.reqCtx.response, task.err = obj.Conn.DoRequest(task.reqCtx.request, task.reqCtx.option.OrderHeaders); task.reqCtx.response != nil && task.err == nil {
obj.wrapBody(task)
} else if task.err != nil {
task.err = tools.WrapError(task.err, "roundTrip error")
}
close(done)
}
func (obj *connecotr) taskMain(task *reqTask) (retry bool) {
defer func() {
if retry {
task.retry++
if task.retry > maxRetryCount {
retry = false
}
}
if task.err != nil && task.reqCtx.option.ErrCallBack != nil {
task.reqCtx.err = task.err
if err2 := task.reqCtx.option.ErrCallBack(task.reqCtx); err2 != nil {
retry = false
task.err = err2
}
}
if retry {
task.err = nil
obj.CloseWithError(errors.New("taskMain retry close"))
if task.reqCtx.response != nil && task.reqCtx.response.Body != nil {
task.reqCtx.response.Body.Close()
}
} else {
task.cnl()
if task.err == nil && task.reqCtx.response != nil && task.reqCtx.response.Body != nil {
select {
case <-obj.bodyCtx.Done(): //wait body close
if task.err = context.Cause(obj.bodyCtx); !errors.Is(task.err, errGospiderBodyClose) {
task.err = tools.WrapError(task.err, "bodyCtx close")
} else {
task.err = nil
}
case <-task.reqCtx.Context().Done(): //wait request close
task.err = tools.WrapError(context.Cause(task.reqCtx.Context()), "requestCtx close")
case <-obj.forceCtx.Done(): //force conn close
task.err = tools.WrapError(context.Cause(obj.forceCtx), "connecotr force close")
}
if task.reqCtx.option.Logger != nil {
task.reqCtx.option.Logger(Log{
Id: task.reqCtx.requestId,
Time: time.Now(),
Type: LogType_ResponseBody,
Msg: "response body",
})
}
}
if task.err != nil {
obj.CloseWithError(task.err)
if task.reqCtx.response != nil && task.reqCtx.response.Body != nil {
task.reqCtx.response.Body.Close()
}
}
}
}()
select {
case <-obj.safeCtx.Done():
return true
case <-obj.forceCtx.Done(): //force conn close
return true
default:
}
done := make(chan struct{})
go obj.httpReq(task, done)
select {
case <-task.ctx.Done():
task.err = tools.WrapError(context.Cause(task.ctx), "task.ctx error: ")
return false
case <-done:
if task.err != nil {
return task.suppertRetry()
}
if task.reqCtx.response == nil {
task.err = context.Cause(task.ctx)
if task.err == nil {
task.err = errors.New("response is nil")
}
return task.suppertRetry()
}
if task.reqCtx.option.Logger != nil {
task.reqCtx.option.Logger(Log{
Id: task.reqCtx.requestId,
Time: time.Now(),
Type: LogType_ResponseHeader,
Msg: "response header",
})
}
return false
case <-obj.forceCtx.Done(): //force conn close
err := context.Cause(obj.forceCtx)
task.err = tools.WrapError(err, "taskMain delete ctx error: ")
select {
case <-obj.parentForceCtx.Done():
return false
default:
if errors.Is(err, errConnectionForceClosed) {
return false
}
return true
}
}
}
type connPool struct {
forceCtx context.Context
forceCnl context.CancelCauseFunc
safeCtx context.Context
safeCnl context.CancelCauseFunc
connKey string
total atomic.Int64
tasks chan *reqTask
connPools *connPools
}
type connPools struct {
connPools sync.Map
}
func newConnPools() *connPools {
return new(connPools)
}
func (obj *connPools) get(key string) *connPool {
val, ok := obj.connPools.Load(key)
if !ok {
return nil
}
return val.(*connPool)
}
func (obj *connPools) set(key string, pool *connPool) {
obj.connPools.Store(key, pool)
}
func (obj *connPools) del(key string) {
obj.connPools.Delete(key)
}
func (obj *connPools) Range() iter.Seq2[string, *connPool] {
return func(yield func(string, *connPool) bool) {
obj.connPools.Range(func(key, value any) bool {
return yield(key.(string), value.(*connPool))
})
}
}
func (obj *connPool) notice(task *reqTask) {
select {
case obj.tasks <- task:
case task.emptyPool <- struct{}{}:
}
}
func (obj *connPool) rwMain(done chan struct{}, conn *connecotr) {
conn.withCancel(obj.forceCtx, obj.safeCtx)
defer func() {
conn.CloseWithError(errors.New("connPool rwMain close"))
obj.total.Add(-1)
if obj.total.Load() <= 0 {
obj.safeClose()
}
}()
close(done)
for {
select {
case <-conn.safeCtx.Done(): //safe close conn
return
case <-conn.forceCtx.Done(): //force close conn
return
case task := <-obj.tasks: //recv task
if task == nil {
return
}
if conn.taskMain(task) {
obj.notice(task)
return
}
if task.err != nil {
return
}
}
}
}
func (obj *connPool) forceClose() {
obj.safeClose()
obj.forceCnl(errors.New("connPool forceClose"))
}
func (obj *connPool) safeClose() {
obj.connPools.del(obj.connKey)
obj.safeCnl(errors.New("connPool close"))
}
package requests
import (
"bufio"
"context"
"errors"
"github.com/gospider007/tools"
"io"
"iter"
"net"
"net/http"
"sync"
"sync/atomic"
"time"
)
var maxRetryCount = 10
type Conn interface {
CloseWithError(err error) error
DoRequest(*http.Request, []string) (*http.Response, error)
}
type conn struct {
r *bufio.Reader
w *bufio.Writer
pr *pipCon
pw *pipCon
conn net.Conn
closeFunc func()
}
func newConn(ctx context.Context, con net.Conn, closeFunc func()) *conn {
c := &conn{
conn: con,
closeFunc: closeFunc,
}
c.pr, c.pw = pipe(ctx)
c.r = bufio.NewReader(c)
c.w = bufio.NewWriter(c)
go c.run()
return c
}
func (obj *conn) Close() error {
return obj.CloseWithError(nil)
}
func (obj *conn) CloseWithError(err error) error {
if err == nil {
err = errors.New("connecotr closeWithError close")
} else {
err = tools.WrapError(err, "connecotr closeWithError close")
}
if obj.closeFunc != nil {
obj.closeFunc()
}
obj.conn.Close()
return obj.pr.CloseWitError(err)
}
func (obj *conn) DoRequest(req *http.Request, orderHeaders []string) (*http.Response, error) {
err := httpWrite(req, obj.w, orderHeaders)
if err != nil {
return nil, err
}
res, err := http.ReadResponse(obj.r, req)
if err != nil {
err = tools.WrapError(err, "http1 read error")
return nil, err
}
if res == nil {
err = errors.New("response is nil")
}
return res, err
}
func (obj *conn) run() (err error) {
_, err = io.Copy(obj.pw, obj.conn)
return obj.CloseWithError(err)
}
func (obj *conn) Read(b []byte) (i int, err error) {
return obj.pr.Read(b)
}
func (obj *conn) Write(b []byte) (int, error) {
return obj.conn.Write(b)
}
func (obj *conn) LocalAddr() net.Addr {
return obj.conn.LocalAddr()
}
func (obj *conn) RemoteAddr() net.Addr {
return obj.conn.RemoteAddr()
}
func (obj *conn) SetDeadline(t time.Time) error {
return obj.conn.SetDeadline(t)
}
func (obj *conn) SetReadDeadline(t time.Time) error {
return obj.conn.SetReadDeadline(t)
}
func (obj *conn) SetWriteDeadline(t time.Time) error {
return obj.conn.SetWriteDeadline(t)
}
type connecotr struct {
parentForceCtx context.Context //parent force close
forceCtx context.Context //force close
forceCnl context.CancelCauseFunc
safeCtx context.Context //safe close
safeCnl context.CancelCauseFunc
bodyCtx context.Context //body close
bodyCnl context.CancelCauseFunc
Conn Conn
c net.Conn
proxys []Address
}
func (obj *connecotr) withCancel(forceCtx context.Context, safeCtx context.Context) {
obj.parentForceCtx = forceCtx
obj.forceCtx, obj.forceCnl = context.WithCancelCause(forceCtx)
obj.safeCtx, obj.safeCnl = context.WithCancelCause(safeCtx)
}
func (obj *connecotr) Close() error {
return obj.CloseWithError(errors.New("connecotr Close close"))
}
func (obj *connecotr) CloseWithError(err error) error {
err = obj.Conn.CloseWithError(err)
if obj.c != nil {
return obj.c.Close()
}
return err
}
func (obj *connecotr) wrapBody(task *reqTask) {
body := new(readWriteCloser)
obj.bodyCtx, obj.bodyCnl = context.WithCancelCause(task.reqCtx.Context())
body.body = task.reqCtx.response.Body
body.conn = obj
task.reqCtx.response.Body = body
}
func (obj *connecotr) httpReq(task *reqTask, done chan struct{}) {
if task.reqCtx.response, task.err = obj.Conn.DoRequest(task.reqCtx.request, task.reqCtx.option.OrderHeaders); task.reqCtx.response != nil && task.err == nil {
obj.wrapBody(task)
} else if task.err != nil {
task.err = tools.WrapError(task.err, "roundTrip error")
}
close(done)
}
func (obj *connecotr) taskMain(task *reqTask) (retry bool) {
defer func() {
if retry {
task.retry++
if task.retry > maxRetryCount {
retry = false
}
}
if task.err != nil && task.reqCtx.option.ErrCallBack != nil {
task.reqCtx.err = task.err
if err2 := task.reqCtx.option.ErrCallBack(task.reqCtx); err2 != nil {
retry = false
task.err = err2
}
}
if retry {
task.err = nil
obj.CloseWithError(errors.New("taskMain retry close"))
if task.reqCtx.response != nil && task.reqCtx.response.Body != nil {
task.reqCtx.response.Body.Close()
}
} else {
task.cnl()
if task.err == nil && task.reqCtx.response != nil && task.reqCtx.response.Body != nil {
select {
case <-obj.bodyCtx.Done(): //wait body close
if task.err = context.Cause(obj.bodyCtx); !errors.Is(task.err, errGospiderBodyClose) {
task.err = tools.WrapError(task.err, "bodyCtx close")
} else {
task.err = nil
}
case <-task.reqCtx.Context().Done(): //wait request close
task.err = tools.WrapError(context.Cause(task.reqCtx.Context()), "requestCtx close")
case <-obj.forceCtx.Done(): //force conn close
task.err = tools.WrapError(context.Cause(obj.forceCtx), "connecotr force close")
}
if task.reqCtx.option.Logger != nil {
task.reqCtx.option.Logger(Log{
Id: task.reqCtx.requestId,
Time: time.Now(),
Type: LogType_ResponseBody,
Msg: "response body",
})
}
}
if task.err != nil {
obj.CloseWithError(task.err)
if task.reqCtx.response != nil && task.reqCtx.response.Body != nil {
task.reqCtx.response.Body.Close()
}
}
}
}()
select {
case <-obj.safeCtx.Done():
return true
case <-obj.forceCtx.Done(): //force conn close
return true
default:
}
done := make(chan struct{})
go obj.httpReq(task, done)
select {
case <-task.ctx.Done():
task.err = tools.WrapError(context.Cause(task.ctx), "task.ctx error: ")
return false
case <-done:
if task.err != nil {
return task.suppertRetry()
}
if task.reqCtx.response == nil {
task.err = context.Cause(task.ctx)
if task.err == nil {
task.err = errors.New("response is nil")
}
return task.suppertRetry()
}
if task.reqCtx.option.Logger != nil {
task.reqCtx.option.Logger(Log{
Id: task.reqCtx.requestId,
Time: time.Now(),
Type: LogType_ResponseHeader,
Msg: "response header",
})
}
return false
case <-obj.forceCtx.Done(): //force conn close
err := context.Cause(obj.forceCtx)
task.err = tools.WrapError(err, "taskMain delete ctx error: ")
select {
case <-obj.parentForceCtx.Done():
return false
default:
if errors.Is(err, errConnectionForceClosed) {
return false
}
return true
}
}
}
type connPool struct {
forceCtx context.Context
safeCtx context.Context
forceCnl context.CancelCauseFunc
safeCnl context.CancelCauseFunc
tasks chan *reqTask
connPools *connPools
connKey string
total atomic.Int64
}
type connPools struct {
connPools sync.Map
}
func newConnPools() *connPools {
return new(connPools)
}
func (obj *connPools) get(key string) *connPool {
val, ok := obj.connPools.Load(key)
if !ok {
return nil
}
return val.(*connPool)
}
func (obj *connPools) set(key string, pool *connPool) {
obj.connPools.Store(key, pool)
}
func (obj *connPools) del(key string) {
obj.connPools.Delete(key)
}
func (obj *connPools) Range() iter.Seq2[string, *connPool] {
return func(yield func(string, *connPool) bool) {
obj.connPools.Range(func(key, value any) bool {
return yield(key.(string), value.(*connPool))
})
}
}
func (obj *connPool) notice(task *reqTask) {
select {
case obj.tasks <- task:
case task.emptyPool <- struct{}{}:
}
}
func (obj *connPool) rwMain(done chan struct{}, conn *connecotr) {
conn.withCancel(obj.forceCtx, obj.safeCtx)
defer func() {
conn.CloseWithError(errors.New("connPool rwMain close"))
obj.total.Add(-1)
if obj.total.Load() <= 0 {
obj.safeClose()
}
}()
close(done)
for {
select {
case <-conn.safeCtx.Done(): //safe close conn
return
case <-conn.forceCtx.Done(): //force close conn
return
case task := <-obj.tasks: //recv task
if task == nil {
return
}
if conn.taskMain(task) {
obj.notice(task)
return
}
if task.err != nil {
return
}
}
}
}
func (obj *connPool) forceClose() {
obj.safeClose()
obj.forceCnl(errors.New("connPool forceClose"))
}
func (obj *connPool) safeClose() {
obj.connPools.del(obj.connKey)
obj.safeCnl(errors.New("connPool close"))
}

971
dial.go
View File

@@ -1,490 +1,481 @@
package requests
import (
"bufio"
"context"
"crypto/tls"
"errors"
"io"
"net"
"net/url"
"sync"
"time"
"net/http"
"github.com/gospider007/gtls"
"github.com/gospider007/ja3"
"github.com/gospider007/tools"
utls "github.com/refraction-networking/utls"
)
type msgClient struct {
time time.Time
ip net.IP
}
type DialOption struct {
DialTimeout time.Duration
KeepAlive time.Duration
LocalAddr *net.TCPAddr //network card ip
AddrType gtls.AddrType //first ip type
Dns *net.UDPAddr
GetAddrType func(host string) gtls.AddrType
}
type dialer interface {
DialContext(ctx context.Context, network string, address string) (net.Conn, error)
LookupIPAddr(ctx context.Context, host string) ([]net.IPAddr, error)
}
// 自定义dialer
type Dialer struct {
dnsIpData sync.Map
// dialer dialer
}
type myDialer struct {
dialer *net.Dialer
}
func (d *myDialer) DialContext(ctx context.Context, network string, address string) (net.Conn, error) {
return d.dialer.DialContext(ctx, network, address)
}
func (d *myDialer) LookupIPAddr(ctx context.Context, host string) ([]net.IPAddr, error) {
return d.dialer.Resolver.LookupIPAddr(ctx, host)
}
func newDialer(option DialOption) dialer {
if option.KeepAlive == 0 {
option.KeepAlive = time.Second * 5
}
if option.DialTimeout == 0 {
option.DialTimeout = time.Second * 5
}
var dialer myDialer
dialer.dialer = &net.Dialer{
Timeout: option.DialTimeout,
KeepAlive: option.KeepAlive,
LocalAddr: option.LocalAddr,
FallbackDelay: time.Nanosecond,
KeepAliveConfig: net.KeepAliveConfig{
Enable: true,
Idle: time.Second * 5,
Interval: time.Second * 5,
Count: 3,
},
}
if option.LocalAddr != nil {
dialer.dialer.LocalAddr = option.LocalAddr
}
if option.Dns != nil {
dialer.dialer.Resolver = &net.Resolver{
PreferGo: true,
Dial: func(ctx context.Context, network, address string) (net.Conn, error) {
return (&net.Dialer{
Timeout: option.DialTimeout,
KeepAlive: option.KeepAlive,
}).DialContext(ctx, network, option.Dns.String())
},
}
}
dialer.dialer.SetMultipathTCP(true)
return &dialer
}
func (obj *Dialer) dialContext(ctx *Response, network string, addr Address, isProxy bool) (net.Conn, error) {
var err error
if addr.IP == nil {
addr.IP, err = obj.loadHost(ctx, addr.Name)
}
if ctx.option != nil && ctx.option.Logger != nil {
if isProxy {
ctx.option.Logger(Log{
Id: ctx.requestId,
Time: time.Now(),
Type: LogType_ProxyDNSLookup,
Msg: addr.Name,
})
} else {
ctx.option.Logger(Log{
Id: ctx.requestId,
Time: time.Now(),
Type: LogType_DNSLookup,
Msg: addr.Name,
})
}
}
if err != nil {
return nil, err
}
con, err := newDialer(ctx.option.DialOption).DialContext(ctx.Context(), network, addr.String())
if ctx.option != nil && ctx.option.Logger != nil {
if isProxy {
ctx.option.Logger(Log{
Id: ctx.requestId,
Time: time.Now(),
Type: LogType_ProxyTCPConnect,
Msg: addr,
})
} else {
ctx.option.Logger(Log{
Id: ctx.requestId,
Time: time.Now(),
Type: LogType_TCPConnect,
Msg: addr,
})
}
}
return con, err
}
func (obj *Dialer) DialContext(ctx *Response, network string, addr Address) (net.Conn, error) {
return obj.dialContext(ctx, network, addr, false)
}
func (obj *Dialer) ProxyDialContext(ctx *Response, network string, addr Address) (net.Conn, error) {
return obj.dialContext(ctx, network, addr, true)
}
func (obj *Dialer) DialProxyContext(ctx *Response, network string, proxyTlsConfig *tls.Config, proxyUrls ...Address) (net.PacketConn, net.Conn, error) {
proxyLen := len(proxyUrls)
if proxyLen < 2 {
return nil, nil, errors.New("proxyUrls is nil")
}
var conn net.Conn
var err error
var packCon net.PacketConn
for index := range proxyLen - 1 {
oneProxy := proxyUrls[index]
remoteUrl := proxyUrls[index+1]
if index == 0 {
conn, err = obj.dialProxyContext(ctx, network, oneProxy)
if err != nil {
return packCon, conn, err
}
}
packCon, conn, err = obj.verifyProxyToRemote(ctx, conn, proxyTlsConfig, oneProxy, remoteUrl, index == proxyLen-2)
}
return packCon, conn, err
}
func (obj *Dialer) dialProxyContext(ctx *Response, network string, proxyUrl Address) (net.Conn, error) {
return obj.ProxyDialContext(ctx, network, proxyUrl)
}
func (obj *Dialer) verifyProxyToRemote(ctx *Response, conn net.Conn, proxyTlsConfig *tls.Config, proxyAddress Address, remoteAddress Address, isLast bool) (net.PacketConn, net.Conn, error) {
var err error
var packCon net.PacketConn
if proxyAddress.Scheme == "https" {
if conn, err = obj.addTls(ctx.Context(), conn, proxyAddress.Host, true, proxyTlsConfig); err != nil {
return packCon, conn, err
}
if ctx.option.Logger != nil {
ctx.option.Logger(Log{
Id: ctx.requestId,
Time: time.Now(),
Type: LogType_ProxyTLSHandshake,
Msg: proxyAddress.String(),
})
}
}
done := make(chan struct{})
go func() {
switch proxyAddress.Scheme {
case "http", "https":
err = obj.clientVerifyHttps(ctx.Context(), conn, proxyAddress, remoteAddress)
if ctx.option.Logger != nil {
ctx.option.Logger(Log{
Id: ctx.requestId,
Time: time.Now(),
Type: LogType_ProxyConnectRemote,
Msg: remoteAddress.String(),
})
}
case "socks5":
if isLast && ctx.option.H3 {
packCon, err = obj.verifyUDPSocks5(ctx.Context(), conn, proxyAddress, remoteAddress)
} else {
err = obj.verifyTCPSocks5(conn, proxyAddress, remoteAddress)
}
if ctx.option.Logger != nil {
ctx.option.Logger(Log{
Id: ctx.requestId,
Time: time.Now(),
Type: LogType_ProxyConnectRemote,
Msg: remoteAddress.String(),
})
}
}
close(done)
}()
select {
case <-ctx.Context().Done():
return packCon, conn, context.Cause(ctx.Context())
case <-done:
return packCon, conn, err
}
}
func (obj *Dialer) loadHost(ctx *Response, host string) (net.IP, error) {
msgDataAny, ok := obj.dnsIpData.Load(host)
if ok {
msgdata := msgDataAny.(msgClient)
if time.Since(msgdata.time) < time.Second*60*5 {
return msgdata.ip, nil
}
}
ip, ipInt := gtls.ParseHost(host)
if ipInt != 0 {
return ip, nil
}
var addrType gtls.AddrType
if ctx.option.DialOption.AddrType != 0 {
addrType = ctx.option.DialOption.AddrType
} else if ctx.option.DialOption.GetAddrType != nil {
addrType = ctx.option.DialOption.GetAddrType(host)
}
ips, err := newDialer(ctx.option.DialOption).LookupIPAddr(ctx.Context(), host)
if err != nil {
return net.IP{}, err
}
if ip, err = obj.addrToIp(host, ips, addrType); err != nil {
return nil, err
}
return ip, nil
}
func (obj *Dialer) addrToIp(host string, ips []net.IPAddr, addrType gtls.AddrType) (net.IP, error) {
ip, err := obj.lookupIPAddr(ips, addrType)
if err != nil {
return ip, tools.WrapError(err, "addrToIp error,lookupIPAddr")
}
obj.dnsIpData.Store(host, msgClient{time: time.Now(), ip: ip})
return ip, nil
}
func (obj *Dialer) verifySocks5(conn net.Conn, network string, proxyAddr Address, remoteAddr Address) (proxyAddress Address, err error) {
err = obj.verifySocks5Auth(conn, proxyAddr)
if err != nil {
return
}
err = obj.writeCmd(conn, network)
if err != nil {
return
}
remoteAddr.NetWork = network
err = WriteUdpAddr(conn, remoteAddr)
if err != nil {
return
}
readCon := make([]byte, 3)
if _, err = io.ReadFull(conn, readCon); err != nil {
return
}
if readCon[0] != 5 {
err = errors.New("socks version error")
return
}
if readCon[1] != 0 {
err = errors.New("socks conn error")
return
}
proxyAddress, err = ReadUdpAddr(conn)
return
}
func (obj *Dialer) verifyTCPSocks5(conn net.Conn, proxyAddr Address, remoteAddr Address) (err error) {
_, err = obj.verifySocks5(conn, "tcp", proxyAddr, remoteAddr)
return
}
func (obj *Dialer) verifyUDPSocks5(ctx context.Context, conn net.Conn, proxyAddr Address, remoteAddr Address) (wrapConn net.PacketConn, err error) {
remoteAddr.NetWork = "udp"
proxyAddress, err := obj.verifySocks5(conn, "udp", proxyAddr, remoteAddr)
if err != nil {
return nil, err
}
var listener net.ListenConfig
wrapConn, err = listener.ListenPacket(ctx, "udp", ":0")
if err != nil {
return nil, err
}
wrapConn, err = NewUDPConn(wrapConn, &net.UDPAddr{IP: proxyAddress.IP, Port: proxyAddress.Port})
if err != nil {
return wrapConn, err
}
go func() {
var buf [1]byte
for {
_, err := conn.Read(buf[:])
if err != nil {
wrapConn.Close()
break
}
}
}()
return wrapConn, nil
}
func (obj *Dialer) writeCmd(conn net.Conn, network string) (err error) {
var cmd byte
switch network {
case "tcp":
cmd = 1
case "udp":
cmd = 3
default:
return errors.New("not support network")
}
_, err = conn.Write([]byte{5, cmd, 0})
return
}
func (obj *Dialer) verifySocks5Auth(conn net.Conn, proxyAddr Address) (err error) {
if _, err = conn.Write([]byte{5, 2, 0, 2}); err != nil {
return
}
readCon := make([]byte, 2)
if _, err = io.ReadFull(conn, readCon); err != nil {
return
}
switch readCon[1] {
case 2:
if proxyAddr.User == "" || proxyAddr.Password == "" {
err = errors.New("socks5 need auth")
return
}
if _, err = conn.Write(append(
append(
[]byte{1, byte(len(proxyAddr.User))},
tools.StringToBytes(proxyAddr.User)...,
),
append(
[]byte{byte(len(proxyAddr.Password))},
tools.StringToBytes(proxyAddr.Password)...,
)...,
)); err != nil {
return
}
if _, err = io.ReadFull(conn, readCon); err != nil {
return
}
switch readCon[1] {
case 0:
default:
err = errors.New("socks5 auth error")
}
case 0:
default:
err = errors.New("not support auth format")
}
return
}
func (obj *Dialer) lookupIPAddr(ips []net.IPAddr, addrType gtls.AddrType) (net.IP, error) {
for _, ipAddr := range ips {
ip := ipAddr.IP
if ipType := gtls.ParseIp(ip); ipType == 4 || ipType == 6 {
if addrType == 0 || addrType == ipType {
return ip, nil
}
}
}
for _, ipAddr := range ips {
ip := ipAddr.IP
if ipType := gtls.ParseIp(ip); ipType == 4 || ipType == 6 {
return ip, nil
}
}
return nil, errors.New("dns parse host error")
}
func (obj *Dialer) addTls(ctx context.Context, conn net.Conn, host string, h2 bool, tlsConfig *tls.Config) (*tls.Conn, error) {
var tlsConn *tls.Conn
tlsConfig.ServerName = gtls.GetServerName(host)
if h2 {
tlsConfig.NextProtos = []string{"h2", "http/1.1"}
} else {
tlsConfig.NextProtos = []string{"http/1.1"}
}
tlsConn = tls.Client(conn, tlsConfig)
return tlsConn, tlsConn.HandshakeContext(ctx)
}
func (obj *Dialer) addJa3Tls(ctx context.Context, conn net.Conn, host string, h2 bool, spec ja3.Spec, tlsConfig *utls.Config) (*utls.UConn, error) {
tlsConfig.ServerName = gtls.GetServerName(host)
if h2 {
tlsConfig.NextProtos = []string{"h2", "http/1.1"}
} else {
tlsConfig.NextProtos = []string{"http/1.1"}
}
return ja3.NewClient(ctx, conn, spec, h2, tlsConfig)
}
func (obj *Dialer) Socks5TcpProxy(ctx *Response, proxyAddr Address, remoteAddr Address) (conn net.Conn, err error) {
if conn, err = obj.DialContext(ctx, "tcp", proxyAddr); err != nil {
return
}
defer func() {
if err != nil && conn != nil {
conn.Close()
}
}()
didVerify := make(chan struct{})
go func() {
defer close(didVerify)
err = obj.verifyTCPSocks5(conn, proxyAddr, remoteAddr)
}()
select {
case <-ctx.Context().Done():
return conn, context.Cause(ctx.Context())
case <-didVerify:
return
}
}
func (obj *Dialer) Socks5UdpProxy(ctx *Response, proxyAddress Address, remoteAddress Address) (udpConn net.PacketConn, err error) {
conn, err := obj.ProxyDialContext(ctx, "tcp", proxyAddress)
if err != nil {
return nil, err
}
defer func() {
if err != nil {
if conn != nil {
conn.Close()
}
if udpConn != nil {
udpConn.Close()
}
}
}()
didVerify := make(chan struct{})
go func() {
defer close(didVerify)
udpConn, err = obj.verifyUDPSocks5(ctx.Context(), conn, proxyAddress, remoteAddress)
if ctx.option.Logger != nil {
ctx.option.Logger(Log{
Id: ctx.requestId,
Time: time.Now(),
Type: LogType_ProxyConnectRemote,
Msg: remoteAddress.String(),
})
}
}()
select {
case <-ctx.Context().Done():
return udpConn, context.Cause(ctx.Context())
case <-didVerify:
return
}
}
func (obj *Dialer) clientVerifyHttps(ctx context.Context, conn net.Conn, proxyAddress Address, remoteAddress Address) (err error) {
hdr := make(http.Header)
hdr.Set("User-Agent", tools.UserAgent)
if proxyAddress.User != "" && proxyAddress.Password != "" {
hdr.Set("Proxy-Authorization", "Basic "+tools.Base64Encode(proxyAddress.User+":"+proxyAddress.Password))
}
connectReq, err := NewRequestWithContext(ctx, http.MethodConnect, &url.URL{Opaque: remoteAddress.String()}, nil)
if err != nil {
return err
}
connectReq.Header = hdr
connectReq.Host = remoteAddress.Host
if err = connectReq.Write(conn); err != nil {
return err
}
resp, err := http.ReadResponse(bufio.NewReader(conn), connectReq)
if err != nil {
return err
}
if resp.StatusCode != 200 {
return errors.New(resp.Status)
}
return
}
package requests
import (
"bufio"
"context"
"crypto/tls"
"errors"
"github.com/gospider007/gtls"
"github.com/gospider007/ja3"
"github.com/gospider007/tools"
utls "github.com/refraction-networking/utls"
"io"
"net"
"net/http"
"net/url"
"sync"
"time"
)
type msgClient struct {
time time.Time
ip net.IP
}
type DialOption struct {
LocalAddr *net.TCPAddr //network card ip
Dns *net.UDPAddr
GetAddrType func(host string) gtls.AddrType
DialTimeout time.Duration
KeepAlive time.Duration
AddrType gtls.AddrType //first ip type
}
type dialer interface {
DialContext(ctx context.Context, network string, address string) (net.Conn, error)
LookupIPAddr(ctx context.Context, host string) ([]net.IPAddr, error)
}
// 自定义dialer
type Dialer struct {
dnsIpData sync.Map
// dialer dialer
}
type myDialer struct {
dialer *net.Dialer
}
func (d *myDialer) DialContext(ctx context.Context, network string, address string) (net.Conn, error) {
return d.dialer.DialContext(ctx, network, address)
}
func (d *myDialer) LookupIPAddr(ctx context.Context, host string) ([]net.IPAddr, error) {
return d.dialer.Resolver.LookupIPAddr(ctx, host)
}
func newDialer(option DialOption) dialer {
if option.KeepAlive == 0 {
option.KeepAlive = time.Second * 5
}
if option.DialTimeout == 0 {
option.DialTimeout = time.Second * 5
}
var dialer myDialer
dialer.dialer = &net.Dialer{
Timeout: option.DialTimeout,
KeepAlive: option.KeepAlive,
LocalAddr: option.LocalAddr,
FallbackDelay: time.Nanosecond,
KeepAliveConfig: net.KeepAliveConfig{
Enable: true,
Idle: time.Second * 5,
Interval: time.Second * 5,
Count: 3,
},
}
if option.LocalAddr != nil {
dialer.dialer.LocalAddr = option.LocalAddr
}
if option.Dns != nil {
dialer.dialer.Resolver = &net.Resolver{
PreferGo: true,
Dial: func(ctx context.Context, network, address string) (net.Conn, error) {
return (&net.Dialer{
Timeout: option.DialTimeout,
KeepAlive: option.KeepAlive,
}).DialContext(ctx, network, option.Dns.String())
},
}
}
dialer.dialer.SetMultipathTCP(true)
return &dialer
}
func (obj *Dialer) dialContext(ctx *Response, network string, addr Address, isProxy bool) (net.Conn, error) {
var err error
if addr.IP == nil {
addr.IP, err = obj.loadHost(ctx, addr.Name)
}
if ctx.option != nil && ctx.option.Logger != nil {
if isProxy {
ctx.option.Logger(Log{
Id: ctx.requestId,
Time: time.Now(),
Type: LogType_ProxyDNSLookup,
Msg: addr.Name,
})
} else {
ctx.option.Logger(Log{
Id: ctx.requestId,
Time: time.Now(),
Type: LogType_DNSLookup,
Msg: addr.Name,
})
}
}
if err != nil {
return nil, err
}
con, err := newDialer(ctx.option.DialOption).DialContext(ctx.Context(), network, addr.String())
if ctx.option != nil && ctx.option.Logger != nil {
if isProxy {
ctx.option.Logger(Log{
Id: ctx.requestId,
Time: time.Now(),
Type: LogType_ProxyTCPConnect,
Msg: addr,
})
} else {
ctx.option.Logger(Log{
Id: ctx.requestId,
Time: time.Now(),
Type: LogType_TCPConnect,
Msg: addr,
})
}
}
return con, err
}
func (obj *Dialer) DialContext(ctx *Response, network string, addr Address) (net.Conn, error) {
return obj.dialContext(ctx, network, addr, false)
}
func (obj *Dialer) ProxyDialContext(ctx *Response, network string, addr Address) (net.Conn, error) {
return obj.dialContext(ctx, network, addr, true)
}
func (obj *Dialer) DialProxyContext(ctx *Response, network string, proxyTlsConfig *tls.Config, proxyUrls ...Address) (net.PacketConn, net.Conn, error) {
proxyLen := len(proxyUrls)
if proxyLen < 2 {
return nil, nil, errors.New("proxyUrls is nil")
}
var conn net.Conn
var err error
var packCon net.PacketConn
for index := range proxyLen - 1 {
oneProxy := proxyUrls[index]
remoteUrl := proxyUrls[index+1]
if index == 0 {
conn, err = obj.dialProxyContext(ctx, network, oneProxy)
if err != nil {
return packCon, conn, err
}
}
packCon, conn, err = obj.verifyProxyToRemote(ctx, conn, proxyTlsConfig, oneProxy, remoteUrl, index == proxyLen-2)
}
return packCon, conn, err
}
func (obj *Dialer) dialProxyContext(ctx *Response, network string, proxyUrl Address) (net.Conn, error) {
return obj.ProxyDialContext(ctx, network, proxyUrl)
}
func (obj *Dialer) verifyProxyToRemote(ctx *Response, conn net.Conn, proxyTlsConfig *tls.Config, proxyAddress Address, remoteAddress Address, isLast bool) (net.PacketConn, net.Conn, error) {
var err error
var packCon net.PacketConn
if proxyAddress.Scheme == "https" {
if conn, err = obj.addTls(ctx.Context(), conn, proxyAddress.Host, true, proxyTlsConfig); err != nil {
return packCon, conn, err
}
if ctx.option.Logger != nil {
ctx.option.Logger(Log{
Id: ctx.requestId,
Time: time.Now(),
Type: LogType_ProxyTLSHandshake,
Msg: proxyAddress.String(),
})
}
}
done := make(chan struct{})
go func() {
switch proxyAddress.Scheme {
case "http", "https":
err = obj.clientVerifyHttps(ctx.Context(), conn, proxyAddress, remoteAddress)
if ctx.option.Logger != nil {
ctx.option.Logger(Log{
Id: ctx.requestId,
Time: time.Now(),
Type: LogType_ProxyConnectRemote,
Msg: remoteAddress.String(),
})
}
case "socks5":
if isLast && ctx.option.H3 {
packCon, err = obj.verifyUDPSocks5(ctx.Context(), conn, proxyAddress, remoteAddress)
} else {
err = obj.verifyTCPSocks5(conn, proxyAddress, remoteAddress)
}
if ctx.option.Logger != nil {
ctx.option.Logger(Log{
Id: ctx.requestId,
Time: time.Now(),
Type: LogType_ProxyConnectRemote,
Msg: remoteAddress.String(),
})
}
}
close(done)
}()
select {
case <-ctx.Context().Done():
return packCon, conn, context.Cause(ctx.Context())
case <-done:
return packCon, conn, err
}
}
func (obj *Dialer) loadHost(ctx *Response, host string) (net.IP, error) {
msgDataAny, ok := obj.dnsIpData.Load(host)
if ok {
msgdata := msgDataAny.(msgClient)
if time.Since(msgdata.time) < time.Second*60*5 {
return msgdata.ip, nil
}
}
ip, ipInt := gtls.ParseHost(host)
if ipInt != 0 {
return ip, nil
}
var addrType gtls.AddrType
if ctx.option.DialOption.AddrType != 0 {
addrType = ctx.option.DialOption.AddrType
} else if ctx.option.DialOption.GetAddrType != nil {
addrType = ctx.option.DialOption.GetAddrType(host)
}
ips, err := newDialer(ctx.option.DialOption).LookupIPAddr(ctx.Context(), host)
if err != nil {
return net.IP{}, err
}
if ip, err = obj.addrToIp(host, ips, addrType); err != nil {
return nil, err
}
return ip, nil
}
func (obj *Dialer) addrToIp(host string, ips []net.IPAddr, addrType gtls.AddrType) (net.IP, error) {
ip, err := obj.lookupIPAddr(ips, addrType)
if err != nil {
return ip, tools.WrapError(err, "addrToIp error,lookupIPAddr")
}
obj.dnsIpData.Store(host, msgClient{time: time.Now(), ip: ip})
return ip, nil
}
func (obj *Dialer) verifySocks5(conn net.Conn, network string, proxyAddr Address, remoteAddr Address) (proxyAddress Address, err error) {
err = obj.verifySocks5Auth(conn, proxyAddr)
if err != nil {
return
}
err = obj.writeCmd(conn, network)
if err != nil {
return
}
remoteAddr.NetWork = network
err = WriteUdpAddr(conn, remoteAddr)
if err != nil {
return
}
readCon := make([]byte, 3)
if _, err = io.ReadFull(conn, readCon); err != nil {
return
}
if readCon[0] != 5 {
err = errors.New("socks version error")
return
}
if readCon[1] != 0 {
err = errors.New("socks conn error")
return
}
proxyAddress, err = ReadUdpAddr(conn)
return
}
func (obj *Dialer) verifyTCPSocks5(conn net.Conn, proxyAddr Address, remoteAddr Address) (err error) {
_, err = obj.verifySocks5(conn, "tcp", proxyAddr, remoteAddr)
return
}
func (obj *Dialer) verifyUDPSocks5(ctx context.Context, conn net.Conn, proxyAddr Address, remoteAddr Address) (wrapConn net.PacketConn, err error) {
remoteAddr.NetWork = "udp"
proxyAddress, err := obj.verifySocks5(conn, "udp", proxyAddr, remoteAddr)
if err != nil {
return nil, err
}
var listener net.ListenConfig
wrapConn, err = listener.ListenPacket(ctx, "udp", ":0")
if err != nil {
return nil, err
}
wrapConn, err = NewUDPConn(wrapConn, &net.UDPAddr{IP: proxyAddress.IP, Port: proxyAddress.Port})
if err != nil {
return wrapConn, err
}
go func() {
var buf [1]byte
for {
_, err := conn.Read(buf[:])
if err != nil {
wrapConn.Close()
break
}
}
}()
return wrapConn, nil
}
func (obj *Dialer) writeCmd(conn net.Conn, network string) (err error) {
var cmd byte
switch network {
case "tcp":
cmd = 1
case "udp":
cmd = 3
default:
return errors.New("not support network")
}
_, err = conn.Write([]byte{5, cmd, 0})
return
}
func (obj *Dialer) verifySocks5Auth(conn net.Conn, proxyAddr Address) (err error) {
if _, err = conn.Write([]byte{5, 2, 0, 2}); err != nil {
return
}
readCon := make([]byte, 2)
if _, err = io.ReadFull(conn, readCon); err != nil {
return
}
switch readCon[1] {
case 2:
if proxyAddr.User == "" || proxyAddr.Password == "" {
err = errors.New("socks5 need auth")
return
}
if _, err = conn.Write(append(
append(
[]byte{1, byte(len(proxyAddr.User))},
tools.StringToBytes(proxyAddr.User)...,
),
append(
[]byte{byte(len(proxyAddr.Password))},
tools.StringToBytes(proxyAddr.Password)...,
)...,
)); err != nil {
return
}
if _, err = io.ReadFull(conn, readCon); err != nil {
return
}
switch readCon[1] {
case 0:
default:
err = errors.New("socks5 auth error")
}
case 0:
default:
err = errors.New("not support auth format")
}
return
}
func (obj *Dialer) lookupIPAddr(ips []net.IPAddr, addrType gtls.AddrType) (net.IP, error) {
for _, ipAddr := range ips {
ip := ipAddr.IP
if ipType := gtls.ParseIp(ip); ipType == 4 || ipType == 6 {
if addrType == 0 || addrType == ipType {
return ip, nil
}
}
}
for _, ipAddr := range ips {
ip := ipAddr.IP
if ipType := gtls.ParseIp(ip); ipType == 4 || ipType == 6 {
return ip, nil
}
}
return nil, errors.New("dns parse host error")
}
func (obj *Dialer) addTls(ctx context.Context, conn net.Conn, host string, h2 bool, tlsConfig *tls.Config) (*tls.Conn, error) {
var tlsConn *tls.Conn
tlsConfig.ServerName = gtls.GetServerName(host)
if h2 {
tlsConfig.NextProtos = []string{"h2", "http/1.1"}
} else {
tlsConfig.NextProtos = []string{"http/1.1"}
}
tlsConn = tls.Client(conn, tlsConfig)
return tlsConn, tlsConn.HandshakeContext(ctx)
}
func (obj *Dialer) addJa3Tls(ctx context.Context, conn net.Conn, host string, h2 bool, spec ja3.Spec, tlsConfig *utls.Config) (*utls.UConn, error) {
tlsConfig.ServerName = gtls.GetServerName(host)
if h2 {
tlsConfig.NextProtos = []string{"h2", "http/1.1"}
} else {
tlsConfig.NextProtos = []string{"http/1.1"}
}
return ja3.NewClient(ctx, conn, spec, h2, tlsConfig)
}
func (obj *Dialer) Socks5TcpProxy(ctx *Response, proxyAddr Address, remoteAddr Address) (conn net.Conn, err error) {
if conn, err = obj.DialContext(ctx, "tcp", proxyAddr); err != nil {
return
}
defer func() {
if err != nil && conn != nil {
conn.Close()
}
}()
didVerify := make(chan struct{})
go func() {
defer close(didVerify)
err = obj.verifyTCPSocks5(conn, proxyAddr, remoteAddr)
}()
select {
case <-ctx.Context().Done():
return conn, context.Cause(ctx.Context())
case <-didVerify:
return
}
}
func (obj *Dialer) Socks5UdpProxy(ctx *Response, proxyAddress Address, remoteAddress Address) (udpConn net.PacketConn, err error) {
conn, err := obj.ProxyDialContext(ctx, "tcp", proxyAddress)
if err != nil {
return nil, err
}
defer func() {
if err != nil {
if conn != nil {
conn.Close()
}
if udpConn != nil {
udpConn.Close()
}
}
}()
didVerify := make(chan struct{})
go func() {
defer close(didVerify)
udpConn, err = obj.verifyUDPSocks5(ctx.Context(), conn, proxyAddress, remoteAddress)
if ctx.option.Logger != nil {
ctx.option.Logger(Log{
Id: ctx.requestId,
Time: time.Now(),
Type: LogType_ProxyConnectRemote,
Msg: remoteAddress.String(),
})
}
}()
select {
case <-ctx.Context().Done():
return udpConn, context.Cause(ctx.Context())
case <-didVerify:
return
}
}
func (obj *Dialer) clientVerifyHttps(ctx context.Context, conn net.Conn, proxyAddress Address, remoteAddress Address) (err error) {
hdr := make(http.Header)
hdr.Set("User-Agent", tools.UserAgent)
if proxyAddress.User != "" && proxyAddress.Password != "" {
hdr.Set("Proxy-Authorization", "Basic "+tools.Base64Encode(proxyAddress.User+":"+proxyAddress.Password))
}
connectReq, err := NewRequestWithContext(ctx, http.MethodConnect, &url.URL{Opaque: remoteAddress.String()}, nil)
if err != nil {
return err
}
connectReq.Header = hdr
connectReq.Host = remoteAddress.Host
if err = connectReq.Write(conn); err != nil {
return err
}
resp, err := http.ReadResponse(bufio.NewReader(conn), connectReq)
if err != nil {
return err
}
if resp.StatusCode != 200 {
return errors.New(resp.Status)
}
return
}

455
option.go
View File

@@ -1,234 +1,221 @@
package requests
import (
"context"
"crypto/tls"
"io"
"net/url"
"time"
"github.com/gospider007/ja3"
"github.com/gospider007/tools"
"github.com/gospider007/websocket"
"github.com/quic-go/quic-go"
uquic "github.com/refraction-networking/uquic"
utls "github.com/refraction-networking/utls"
)
type LogType string
const (
LogType_DNSLookup LogType = "DNSLookup"
LogType_TCPConnect LogType = "TCPConnect"
LogType_TLSHandshake LogType = "TLSHandshake"
LogType_ProxyDNSLookup LogType = "ProxyDNSLookup"
LogType_ProxyTCPConnect LogType = "ProxyTCPConnect"
LogType_ProxyTLSHandshake LogType = "ProxyTLSHandshake"
LogType_ProxyConnectRemote LogType = "ProxyConnectRemote"
LogType_ResponseHeader LogType = "ResponseHeader"
LogType_ResponseBody LogType = "ResponseBody"
)
type Log struct {
Id string `json:"id"`
Type LogType `json:"type"`
Time time.Time
Msg any `json:"msg"`
}
// Connection Management Options
type ClientOption struct {
Logger func(Log) //debuggable
H3 bool //开启http3
OrderHeaders []string //order headers
Ja3Spec ja3.Spec //custom ja3Spec,use ja3.CreateSpecWithStr or ja3.CreateSpecWithId create
H2Ja3Spec ja3.H2Spec //h2 fingerprint
UJa3Spec ja3.USpec //h3 fingerprint
Proxy string //proxy,support https,http,socks5
Proxys []string //proxy list,support https,http,socks5
ForceHttp1 bool //force use http1 send requests
Ja3 bool //enable ja3 fingerprint
DisCookie bool //disable cookies
DisDecode bool //disable auto decode
Bar bool ////enable bar display
OptionCallBack func(ctx *Response) error //option callback,if error is returnd, break request
ResultCallBack func(ctx *Response) error //result callback,if error is returnd,next errCallback
ErrCallBack func(ctx *Response) error //error callback,if error is returnd,break request
RequestCallBack func(ctx *Response) error //request and response callback,if error is returnd,reponse is error
MaxRetries int //try num
MaxRedirect int //redirect num ,<0 no redirect,==0 no limit
Headers any //default headers
Timeout time.Duration //request timeout
ResponseHeaderTimeout time.Duration //ResponseHeaderTimeout ,default:300
TlsHandshakeTimeout time.Duration //tls timeout,default:15
UserAgent string //headers User-Agent value
GetProxy func(ctx *Response) (string, error) //proxy callback:support https,http,socks5 proxy
GetProxys func(ctx *Response) ([]string, error) //proxys callback:support https,http,socks5 proxy
DialOption DialOption
Jar Jar //custom cookies
TlsConfig *tls.Config
UtlsConfig *utls.Config
QuicConfig *quic.Config
UquicConfig *uquic.Config
}
// Options for sending requests
type RequestOption struct {
ClientOption
// other option
Method string //method
Url *url.URL
Host string
Referer string //set headers referer value
ContentType string //headers Content-Type value
Cookies any // cookies,support :json,map,strhttp.Header
Params any //url paramsjoin url query,json,map
Json any //send application/json,support io.Reader,string,[]bytes,json,map
Data any //send application/x-www-form-urlencoded, support io.Reader, string,[]bytes,json,map
Form any //send multipart/form-data,file upload,support io.Reader, json,map
Text any //send text/xml,support: io.Reader, string,[]bytes,json,map
Body any //not setting context-type,support io.Reader, string,[]bytes,json,map
Stream bool //disable auto read
WsOption websocket.Option //websocket option
DisProxy bool //force disable proxy
once bool
}
// Upload files with form-data,
type File struct {
FileName string
ContentType string
Content any
}
func (obj *RequestOption) initBody(ctx context.Context) (io.Reader, error) {
if obj.Body != nil {
body, _, _, err := obj.newBody(obj.Body, readType)
if err != nil || body == nil {
return nil, err
}
return body, err
} else if obj.Form != nil {
var orderMap *OrderMap
_, orderMap, _, err := obj.newBody(obj.Form, mapType)
if err != nil {
return nil, err
}
if orderMap == nil {
return nil, nil
}
body, contentType, once, err := orderMap.parseForm(ctx)
if err != nil {
return nil, err
}
obj.once = once
if obj.ContentType == "" {
obj.ContentType = contentType
}
if body == nil {
return nil, nil
}
return body, nil
} else if obj.Data != nil {
body, orderMap, _, err := obj.newBody(obj.Data, mapType)
if err != nil {
return body, err
}
if obj.ContentType == "" {
obj.ContentType = "application/x-www-form-urlencoded"
}
if body != nil {
return body, nil
}
if orderMap == nil {
return nil, nil
}
body2 := orderMap.parseData()
if body2 == nil {
return nil, nil
}
return body2, nil
} else if obj.Json != nil {
body, _, _, err := obj.newBody(obj.Json, readType)
if err != nil {
return nil, err
}
if obj.ContentType == "" {
obj.ContentType = "application/json"
}
if body == nil {
return nil, nil
}
return body, nil
} else if obj.Text != nil {
body, _, _, err := obj.newBody(obj.Text, readType)
if err != nil {
return nil, err
}
if obj.ContentType == "" {
obj.ContentType = "text/plain"
}
if body == nil {
return nil, nil
}
return body, nil
} else {
return nil, nil
}
}
func (obj *RequestOption) initParams() (*url.URL, error) {
baseUrl := cloneUrl(obj.Url)
if obj.Params == nil {
return baseUrl, nil
}
_, dataMap, _, err := obj.newBody(obj.Params, mapType)
if err != nil {
return nil, err
}
query := dataMap.parseParams().String()
if query == "" {
return baseUrl, nil
}
pquery := baseUrl.Query().Encode()
if pquery == "" {
baseUrl.RawQuery = query
} else {
baseUrl.RawQuery = pquery + "&" + query
}
return baseUrl, nil
}
func (obj *Client) newRequestOption(option RequestOption) (RequestOption, error) {
err := tools.Merge(&option, obj.ClientOption)
//end
if option.MaxRetries < 0 {
option.MaxRetries = 0
}
if !option.Ja3Spec.IsSet() && option.Ja3 {
option.Ja3Spec = ja3.DefaultSpec()
}
if !option.UJa3Spec.IsSet() && option.Ja3 {
option.UJa3Spec = ja3.DefaultUSpec()
}
if option.UserAgent == "" {
option.UserAgent = obj.ClientOption.UserAgent
}
if option.DisCookie {
option.Jar = nil
}
if option.DisProxy {
option.Proxy = ""
}
return option, err
}
package requests
import (
"context"
"crypto/tls"
"github.com/gospider007/ja3"
"github.com/gospider007/tools"
"github.com/gospider007/websocket"
"github.com/quic-go/quic-go"
uquic "github.com/refraction-networking/uquic"
utls "github.com/refraction-networking/utls"
"io"
"net/url"
"time"
)
type LogType string
const (
LogType_DNSLookup LogType = "DNSLookup"
LogType_TCPConnect LogType = "TCPConnect"
LogType_TLSHandshake LogType = "TLSHandshake"
LogType_ProxyDNSLookup LogType = "ProxyDNSLookup"
LogType_ProxyTCPConnect LogType = "ProxyTCPConnect"
LogType_ProxyTLSHandshake LogType = "ProxyTLSHandshake"
LogType_ProxyConnectRemote LogType = "ProxyConnectRemote"
LogType_ResponseHeader LogType = "ResponseHeader"
LogType_ResponseBody LogType = "ResponseBody"
)
type Log struct {
Time time.Time
Msg any `json:"msg"`
Id string `json:"id"`
Type LogType `json:"type"`
}
// Connection Management Options
type ClientOption struct {
Ja3Spec ja3.Spec //custom ja3Spec,use ja3.CreateSpecWithStr or ja3.CreateSpecWithId create
DialOption DialOption
Headers any //default headers
Jar Jar //custom cookies
Logger func(Log) //debuggable
OptionCallBack func(ctx *Response) error //option callback,if error is returnd, break request
ResultCallBack func(ctx *Response) error //result callback,if error is returnd,next errCallback
ErrCallBack func(ctx *Response) error //error callback,if error is returnd,break request
RequestCallBack func(ctx *Response) error //request and response callback,if error is returnd,reponse is error
GetProxy func(ctx *Response) (string, error) //proxy callback:support https,http,socks5 proxy
GetProxys func(ctx *Response) ([]string, error) //proxys callback:support https,http,socks5 proxy
TlsConfig *tls.Config
UtlsConfig *utls.Config
QuicConfig *quic.Config
UquicConfig *uquic.Config
UJa3Spec ja3.USpec //h3 fingerprint
Proxy string //proxy,support https,http,socks5
UserAgent string //headers User-Agent value
OrderHeaders []string //order headers
Proxys []string //proxy list,support https,http,socks5
H2Ja3Spec ja3.H2Spec //h2 fingerprint
MaxRetries int //try num
MaxRedirect int //redirect num ,<0 no redirect,==0 no limit
Timeout time.Duration //request timeout
ResponseHeaderTimeout time.Duration //ResponseHeaderTimeout ,default:300
TlsHandshakeTimeout time.Duration //tls timeout,default:15
H3 bool //开启http3
ForceHttp1 bool //force use http1 send requests
Ja3 bool //enable ja3 fingerprint
DisCookie bool //disable cookies
DisDecode bool //disable auto decode
Bar bool ////enable bar display
}
// Options for sending requests
type RequestOption struct {
WsOption websocket.Option //websocket option
Cookies any // cookies,support :json,map,strhttp.Header
Params any //url paramsjoin url query,json,map
Json any //send application/json,support io.Reader,string,[]bytes,json,map
Data any //send application/x-www-form-urlencoded, support io.Reader, string,[]bytes,json,map
Form any //send multipart/form-data,file upload,support io.Reader, json,map
Text any //send text/xml,support: io.Reader, string,[]bytes,json,map
Body any //not setting context-type,support io.Reader, string,[]bytes,json,map
Url *url.URL
// other option
Method string //method
Host string
Referer string //set headers referer value
ContentType string //headers Content-Type value
ClientOption
Stream bool //disable auto read
DisProxy bool //force disable proxy
once bool
}
// Upload files with form-data,
type File struct {
Content any
FileName string
ContentType string
}
func (obj *RequestOption) initBody(ctx context.Context) (io.Reader, error) {
if obj.Body != nil {
body, _, _, err := obj.newBody(obj.Body, readType)
if err != nil || body == nil {
return nil, err
}
return body, err
} else if obj.Form != nil {
var orderMap *OrderMap
_, orderMap, _, err := obj.newBody(obj.Form, mapType)
if err != nil {
return nil, err
}
if orderMap == nil {
return nil, nil
}
body, contentType, once, err := orderMap.parseForm(ctx)
if err != nil {
return nil, err
}
obj.once = once
if obj.ContentType == "" {
obj.ContentType = contentType
}
if body == nil {
return nil, nil
}
return body, nil
} else if obj.Data != nil {
body, orderMap, _, err := obj.newBody(obj.Data, mapType)
if err != nil {
return body, err
}
if obj.ContentType == "" {
obj.ContentType = "application/x-www-form-urlencoded"
}
if body != nil {
return body, nil
}
if orderMap == nil {
return nil, nil
}
body2 := orderMap.parseData()
if body2 == nil {
return nil, nil
}
return body2, nil
} else if obj.Json != nil {
body, _, _, err := obj.newBody(obj.Json, readType)
if err != nil {
return nil, err
}
if obj.ContentType == "" {
obj.ContentType = "application/json"
}
if body == nil {
return nil, nil
}
return body, nil
} else if obj.Text != nil {
body, _, _, err := obj.newBody(obj.Text, readType)
if err != nil {
return nil, err
}
if obj.ContentType == "" {
obj.ContentType = "text/plain"
}
if body == nil {
return nil, nil
}
return body, nil
} else {
return nil, nil
}
}
func (obj *RequestOption) initParams() (*url.URL, error) {
baseUrl := cloneUrl(obj.Url)
if obj.Params == nil {
return baseUrl, nil
}
_, dataMap, _, err := obj.newBody(obj.Params, mapType)
if err != nil {
return nil, err
}
query := dataMap.parseParams().String()
if query == "" {
return baseUrl, nil
}
pquery := baseUrl.Query().Encode()
if pquery == "" {
baseUrl.RawQuery = query
} else {
baseUrl.RawQuery = pquery + "&" + query
}
return baseUrl, nil
}
func (obj *Client) newRequestOption(option RequestOption) (RequestOption, error) {
err := tools.Merge(&option, obj.ClientOption)
//end
if option.MaxRetries < 0 {
option.MaxRetries = 0
}
if !option.Ja3Spec.IsSet() && option.Ja3 {
option.Ja3Spec = ja3.DefaultSpec()
}
if !option.UJa3Spec.IsSet() && option.Ja3 {
option.UJa3Spec = ja3.DefaultUSpec()
}
if option.UserAgent == "" {
option.UserAgent = obj.ClientOption.UserAgent
}
if option.DisCookie {
option.Jar = nil
}
if option.DisProxy {
option.Proxy = ""
}
return option, err
}

164
pip.go
View File

@@ -1,83 +1,81 @@
package requests
import (
"context"
"sync"
)
type pipCon struct {
reader <-chan []byte
writer chan<- []byte
readerI <-chan int
writerI chan<- int
lock sync.Mutex
ctx context.Context
cnl context.CancelCauseFunc
}
func (obj *pipCon) Read(b []byte) (n int, err error) {
select {
case con := <-obj.reader:
n = copy(b, con)
select {
case obj.writerI <- n:
return
case <-obj.ctx.Done():
return n, context.Cause(obj.ctx)
}
case <-obj.ctx.Done():
return n, context.Cause(obj.ctx)
}
}
func (obj *pipCon) Write(b []byte) (n int, err error) {
obj.lock.Lock()
defer obj.lock.Unlock()
for once := true; once || len(b) > 0; once = false {
select {
case obj.writer <- b:
select {
case i := <-obj.readerI:
b = b[i:]
n += i
case <-obj.ctx.Done():
return n, context.Cause(obj.ctx)
}
case <-obj.ctx.Done():
return n, context.Cause(obj.ctx)
}
}
return
}
func (obj *pipCon) CloseWitError(err error) error {
obj.cnl(err)
return nil
}
func (obj *pipCon) Close() error {
return obj.CloseWitError(nil)
}
func pipe(preCtx context.Context) (*pipCon, *pipCon) {
ctx, cnl := context.WithCancelCause(preCtx)
readerCha := make(chan []byte)
writerCha := make(chan []byte)
readerI := make(chan int)
writerI := make(chan int)
localpipCon := &pipCon{
reader: readerCha,
readerI: readerI,
writer: writerCha,
writerI: writerI,
ctx: ctx,
cnl: cnl,
}
remotepipCon := &pipCon{
reader: writerCha,
readerI: writerI,
writer: readerCha,
writerI: readerI,
ctx: ctx,
cnl: cnl,
}
return localpipCon, remotepipCon
}
package requests
import (
"context"
"sync"
)
type pipCon struct {
ctx context.Context
reader <-chan []byte
writer chan<- []byte
readerI <-chan int
writerI chan<- int
cnl context.CancelCauseFunc
lock sync.Mutex
}
func (obj *pipCon) Read(b []byte) (n int, err error) {
select {
case con := <-obj.reader:
n = copy(b, con)
select {
case obj.writerI <- n:
return
case <-obj.ctx.Done():
return n, context.Cause(obj.ctx)
}
case <-obj.ctx.Done():
return n, context.Cause(obj.ctx)
}
}
func (obj *pipCon) Write(b []byte) (n int, err error) {
obj.lock.Lock()
defer obj.lock.Unlock()
for once := true; once || len(b) > 0; once = false {
select {
case obj.writer <- b:
select {
case i := <-obj.readerI:
b = b[i:]
n += i
case <-obj.ctx.Done():
return n, context.Cause(obj.ctx)
}
case <-obj.ctx.Done():
return n, context.Cause(obj.ctx)
}
}
return
}
func (obj *pipCon) CloseWitError(err error) error {
obj.cnl(err)
return nil
}
func (obj *pipCon) Close() error {
return obj.CloseWitError(nil)
}
func pipe(preCtx context.Context) (*pipCon, *pipCon) {
ctx, cnl := context.WithCancelCause(preCtx)
readerCha := make(chan []byte)
writerCha := make(chan []byte)
readerI := make(chan int)
writerI := make(chan int)
localpipCon := &pipCon{
reader: readerCha,
readerI: readerI,
writer: writerCha,
writerI: writerI,
ctx: ctx,
cnl: cnl,
}
remotepipCon := &pipCon{
reader: writerCha,
readerI: writerI,
writer: readerCha,
writerI: readerI,
ctx: ctx,
cnl: cnl,
}
return localpipCon, remotepipCon
}

View File

@@ -1,418 +1,409 @@
package requests
import (
"bufio"
"bytes"
"context"
"errors"
"io"
"iter"
"net/url"
"strconv"
"strings"
"net/http"
"github.com/gospider007/bar"
"github.com/gospider007/bs4"
"github.com/gospider007/gson"
"github.com/gospider007/re"
"github.com/gospider007/tools"
"github.com/gospider007/websocket"
)
func NewResponse(ctx context.Context, option RequestOption) *Response {
return &Response{
ctx: ctx,
option: &option,
}
}
func (obj *Response) Err() error {
if obj.err != nil {
return obj.err
}
if obj.request != nil {
return obj.request.Context().Err()
}
return obj.ctx.Err()
}
func (obj *Response) Request() *http.Request {
return obj.request
}
func (obj *Response) Response() *http.Response {
return obj.response
}
func (obj *Response) Context() context.Context {
if obj.request != nil {
return obj.request.Context()
}
return obj.ctx
}
func (obj *Response) Option() *RequestOption {
return obj.option
}
func (obj *Response) Client() *Client {
return obj.client
}
type Response struct {
err error
request *http.Request
rawConn *readWriteCloser
response *http.Response
webSocket *websocket.Conn
sse *SSE
ctx context.Context
cnl context.CancelFunc
option *RequestOption
content []byte
encoding string
filePath string
readBody bool
client *Client
requestId string
proxys []*url.URL
isNewConn bool
}
type SSE struct {
reader *bufio.Reader
response *Response
}
type Event struct {
Data string //data
Event string //event
Id string //id
Retry int //retry num
Comment string //comment info
}
func newSSE(response *Response) *SSE {
return &SSE{response: response, reader: bufio.NewReader(response.Body())}
}
// recv SSE envent data
func (obj *SSE) Recv() (Event, error) {
var event Event
for {
readStr, err := obj.reader.ReadString('\n')
if err != nil || readStr == "\n" {
return event, err
}
reResult := re.Search(`data:\s?(.*)`, readStr)
if reResult != nil {
event.Data += reResult.Group(1)
continue
}
reResult = re.Search(`event:\s?(.*)`, readStr)
if reResult != nil {
event.Event = reResult.Group(1)
continue
}
reResult = re.Search(`id:\s?(.*)`, readStr)
if reResult != nil {
event.Id = reResult.Group(1)
continue
}
reResult = re.Search(`retry:\s?(.*)`, readStr)
if reResult != nil {
if event.Retry, err = strconv.Atoi(reResult.Group(1)); err != nil {
return event, err
}
continue
}
reResult = re.Search(`:\s?(.*)`, readStr)
if reResult != nil {
event.Comment = reResult.Group(1)
continue
}
return event, errors.New("content parse error:" + readStr)
}
}
func (obj *SSE) Range() iter.Seq2[Event, error] {
return func(yield func(Event, error) bool) {
defer obj.Close()
for {
event, err := obj.Recv()
if err == io.EOF {
return
}
if !yield(event, err) || err != nil {
return
}
}
}
}
// close SSE
func (obj *SSE) Close() {
obj.response.ForceCloseConn()
}
// return websocket client
func (obj *Response) WebSocket() *websocket.Conn {
return obj.webSocket
}
// return SSE client
func (obj *Response) SSE() *SSE {
return obj.sse
}
// return URL redirected address
func (obj *Response) Location() (*url.URL, error) {
u, err := obj.response.Location()
if err == http.ErrNoLocation {
err = nil
}
return u, err
}
// return response Proto
func (obj *Response) Proto() string {
return obj.response.Proto
}
// return response cookies
func (obj *Response) Cookies() Cookies {
if obj.filePath != "" {
return nil
}
return obj.response.Cookies()
}
// return response status code
func (obj *Response) StatusCode() int {
if obj.filePath != "" {
return 200
}
return obj.response.StatusCode
}
// return response status
func (obj *Response) Status() string {
if obj.filePath != "" {
return "200 OK"
}
return obj.response.Status
}
// return response url
func (obj *Response) Url() *url.URL {
if obj.filePath != "" {
return nil
}
return obj.response.Request.URL
}
// return response headers
func (obj *Response) Headers() http.Header {
if obj.filePath != "" {
return http.Header{
"Content-Type": []string{obj.ContentType()},
}
}
return obj.response.Header
}
// change decoding with content
func (obj *Response) Decode(encoding string) {
if obj.encoding != encoding {
obj.encoding = encoding
obj.SetContent(tools.Decode(obj.Content(), encoding))
}
}
// return content with map[string]any
func (obj *Response) Map() (data map[string]any, err error) {
_, err = gson.Decode(obj.Content(), &data)
return
}
// return content with json and you can parse struct
func (obj *Response) Json(vals ...any) (*gson.Client, error) {
return gson.Decode(obj.Content(), vals...)
}
// return content with string
func (obj *Response) Text() string {
return tools.BytesToString(obj.Content())
}
// set response content with []byte
func (obj *Response) SetContent(val []byte) {
obj.content = val
}
// return content with []byte
func (obj *Response) Content() []byte {
if !obj.IsWebSocket() && !obj.IsSSE() {
obj.ReadBody()
}
return obj.content
}
// return content with parse html
func (obj *Response) Html() *bs4.Client {
return bs4.NewClient(obj.Text(), obj.Url().String())
}
// return content type
func (obj *Response) ContentType() string {
if obj.filePath != "" {
return http.DetectContentType(obj.content)
}
contentType := obj.response.Header.Get("Content-Type")
if contentType == "" {
contentType = http.DetectContentType(obj.content)
}
return contentType
}
// return content encoding
func (obj *Response) ContentEncoding() string {
if obj.filePath != "" {
return ""
}
return obj.response.Header.Get("Content-Encoding")
}
// return content length
func (obj *Response) ContentLength() int64 {
if obj.filePath != "" {
return int64(len(obj.content))
}
if obj.response.ContentLength >= 0 {
return obj.response.ContentLength
}
return int64(len(obj.content))
}
type barBody struct {
body *bytes.Buffer
bar *bar.Client
}
func (obj *barBody) Write(con []byte) (int, error) {
l, err := obj.body.Write(con)
obj.bar.Add(int64(l))
return l, err
}
func (obj *Response) defaultDecode() bool {
return strings.Contains(obj.ContentType(), "html")
}
func (obj *Response) Body() io.ReadCloser {
return obj.response.Body
}
// return true if response is stream
func (obj *Response) IsStream() bool {
return obj.option.Stream
}
// return true if response is other stream
func (obj *Response) IsWebSocket() bool {
return obj.webSocket != nil
}
func (obj *Response) IsSSE() bool {
return obj.sse != nil
}
// read body
func (obj *Response) ReadBody() (err error) {
if obj.readBody {
return nil
}
if obj.IsWebSocket() && obj.IsSSE() {
return errors.New("can not read stream")
}
obj.readBody = true
bBody := bytes.NewBuffer(nil)
done := make(chan struct{})
go func() {
if obj.option.Bar && obj.ContentLength() > 0 {
_, err = io.Copy(&barBody{
bar: bar.NewClient(obj.response.ContentLength),
body: bBody,
}, obj.Body())
} else {
_, err = io.Copy(bBody, obj.Body())
}
if err == io.ErrUnexpectedEOF {
err = nil
}
close(done)
}()
select {
case <-obj.ctx.Done():
err = obj.ctx.Err()
case <-done:
}
if err != nil {
obj.ForceCloseConn()
return errors.New("response read content error: " + err.Error())
}
if !obj.option.DisDecode && obj.defaultDecode() {
obj.content, obj.encoding, _ = tools.Charset(bBody.Bytes(), obj.ContentType())
} else {
obj.content = bBody.Bytes()
}
obj.CloseBody()
return
}
// conn is new conn
func (obj *Response) IsNewConn() bool {
return obj.isNewConn
}
// conn proxy
func (obj *Response) Proxys() []Address {
if obj.rawConn != nil {
return obj.rawConn.Proxys()
}
return nil
}
// close body
func (obj *Response) CloseBody() {
obj.close(false)
}
// safe close conn
func (obj *Response) CloseConn() {
obj.close(true)
}
// close
func (obj *Response) close(closeConn bool) {
if obj.webSocket != nil {
obj.webSocket.Close()
}
if obj.sse != nil {
obj.sse.Close()
}
if obj.IsWebSocket() || obj.IsSSE() || !obj.readBody {
obj.ForceCloseConn()
} else if obj.rawConn != nil {
if closeConn {
obj.rawConn.CloseConn()
} else {
obj.rawConn.Close()
}
}
obj.cnl() //must later
}
// force close conn
func (obj *Response) ForceCloseConn() {
if obj.rawConn != nil {
obj.rawConn.ForceCloseConn()
}
}
package requests
import (
"bufio"
"bytes"
"context"
"errors"
"github.com/gospider007/bar"
"github.com/gospider007/bs4"
"github.com/gospider007/gson"
"github.com/gospider007/re"
"github.com/gospider007/tools"
"github.com/gospider007/websocket"
"io"
"iter"
"net/http"
"net/url"
"strconv"
"strings"
)
func NewResponse(ctx context.Context, option RequestOption) *Response {
return &Response{
ctx: ctx,
option: &option,
}
}
func (obj *Response) Err() error {
if obj.err != nil {
return obj.err
}
if obj.request != nil {
return obj.request.Context().Err()
}
return obj.ctx.Err()
}
func (obj *Response) Request() *http.Request {
return obj.request
}
func (obj *Response) Response() *http.Response {
return obj.response
}
func (obj *Response) Context() context.Context {
if obj.request != nil {
return obj.request.Context()
}
return obj.ctx
}
func (obj *Response) Option() *RequestOption {
return obj.option
}
func (obj *Response) Client() *Client {
return obj.client
}
type Response struct {
err error
ctx context.Context
request *http.Request
rawConn *readWriteCloser
response *http.Response
webSocket *websocket.Conn
sse *SSE
cnl context.CancelFunc
option *RequestOption
client *Client
encoding string
filePath string
requestId string
content []byte
proxys []*url.URL
readBody bool
isNewConn bool
}
type SSE struct {
reader *bufio.Reader
response *Response
}
type Event struct {
Data string //data
Event string //event
Id string //id
Comment string //comment info
Retry int //retry num
}
func newSSE(response *Response) *SSE {
return &SSE{response: response, reader: bufio.NewReader(response.Body())}
}
// recv SSE envent data
func (obj *SSE) Recv() (Event, error) {
var event Event
for {
readStr, err := obj.reader.ReadString('\n')
if err != nil || readStr == "\n" {
return event, err
}
reResult := re.Search(`data:\s?(.*)`, readStr)
if reResult != nil {
event.Data += reResult.Group(1)
continue
}
reResult = re.Search(`event:\s?(.*)`, readStr)
if reResult != nil {
event.Event = reResult.Group(1)
continue
}
reResult = re.Search(`id:\s?(.*)`, readStr)
if reResult != nil {
event.Id = reResult.Group(1)
continue
}
reResult = re.Search(`retry:\s?(.*)`, readStr)
if reResult != nil {
if event.Retry, err = strconv.Atoi(reResult.Group(1)); err != nil {
return event, err
}
continue
}
reResult = re.Search(`:\s?(.*)`, readStr)
if reResult != nil {
event.Comment = reResult.Group(1)
continue
}
return event, errors.New("content parse error:" + readStr)
}
}
func (obj *SSE) Range() iter.Seq2[Event, error] {
return func(yield func(Event, error) bool) {
defer obj.Close()
for {
event, err := obj.Recv()
if err == io.EOF {
return
}
if !yield(event, err) || err != nil {
return
}
}
}
}
// close SSE
func (obj *SSE) Close() {
obj.response.ForceCloseConn()
}
// return websocket client
func (obj *Response) WebSocket() *websocket.Conn {
return obj.webSocket
}
// return SSE client
func (obj *Response) SSE() *SSE {
return obj.sse
}
// return URL redirected address
func (obj *Response) Location() (*url.URL, error) {
u, err := obj.response.Location()
if err == http.ErrNoLocation {
err = nil
}
return u, err
}
// return response Proto
func (obj *Response) Proto() string {
return obj.response.Proto
}
// return response cookies
func (obj *Response) Cookies() Cookies {
if obj.filePath != "" {
return nil
}
return obj.response.Cookies()
}
// return response status code
func (obj *Response) StatusCode() int {
if obj.filePath != "" {
return 200
}
return obj.response.StatusCode
}
// return response status
func (obj *Response) Status() string {
if obj.filePath != "" {
return "200 OK"
}
return obj.response.Status
}
// return response url
func (obj *Response) Url() *url.URL {
if obj.filePath != "" {
return nil
}
return obj.response.Request.URL
}
// return response headers
func (obj *Response) Headers() http.Header {
if obj.filePath != "" {
return http.Header{
"Content-Type": []string{obj.ContentType()},
}
}
return obj.response.Header
}
// change decoding with content
func (obj *Response) Decode(encoding string) {
if obj.encoding != encoding {
obj.encoding = encoding
obj.SetContent(tools.Decode(obj.Content(), encoding))
}
}
// return content with map[string]any
func (obj *Response) Map() (data map[string]any, err error) {
_, err = gson.Decode(obj.Content(), &data)
return
}
// return content with json and you can parse struct
func (obj *Response) Json(vals ...any) (*gson.Client, error) {
return gson.Decode(obj.Content(), vals...)
}
// return content with string
func (obj *Response) Text() string {
return tools.BytesToString(obj.Content())
}
// set response content with []byte
func (obj *Response) SetContent(val []byte) {
obj.content = val
}
// return content with []byte
func (obj *Response) Content() []byte {
if !obj.IsWebSocket() && !obj.IsSSE() {
obj.ReadBody()
}
return obj.content
}
// return content with parse html
func (obj *Response) Html() *bs4.Client {
return bs4.NewClient(obj.Text(), obj.Url().String())
}
// return content type
func (obj *Response) ContentType() string {
if obj.filePath != "" {
return http.DetectContentType(obj.content)
}
contentType := obj.response.Header.Get("Content-Type")
if contentType == "" {
contentType = http.DetectContentType(obj.content)
}
return contentType
}
// return content encoding
func (obj *Response) ContentEncoding() string {
if obj.filePath != "" {
return ""
}
return obj.response.Header.Get("Content-Encoding")
}
// return content length
func (obj *Response) ContentLength() int64 {
if obj.filePath != "" {
return int64(len(obj.content))
}
if obj.response.ContentLength >= 0 {
return obj.response.ContentLength
}
return int64(len(obj.content))
}
type barBody struct {
body *bytes.Buffer
bar *bar.Client
}
func (obj *barBody) Write(con []byte) (int, error) {
l, err := obj.body.Write(con)
obj.bar.Add(int64(l))
return l, err
}
func (obj *Response) defaultDecode() bool {
return strings.Contains(obj.ContentType(), "html")
}
func (obj *Response) Body() io.ReadCloser {
return obj.response.Body
}
// return true if response is stream
func (obj *Response) IsStream() bool {
return obj.option.Stream
}
// return true if response is other stream
func (obj *Response) IsWebSocket() bool {
return obj.webSocket != nil
}
func (obj *Response) IsSSE() bool {
return obj.sse != nil
}
// read body
func (obj *Response) ReadBody() (err error) {
if obj.readBody {
return nil
}
if obj.IsWebSocket() && obj.IsSSE() {
return errors.New("can not read stream")
}
obj.readBody = true
bBody := bytes.NewBuffer(nil)
done := make(chan struct{})
go func() {
if obj.option.Bar && obj.ContentLength() > 0 {
_, err = io.Copy(&barBody{
bar: bar.NewClient(obj.response.ContentLength),
body: bBody,
}, obj.Body())
} else {
_, err = io.Copy(bBody, obj.Body())
}
if err == io.ErrUnexpectedEOF {
err = nil
}
close(done)
}()
select {
case <-obj.ctx.Done():
err = obj.ctx.Err()
case <-done:
}
if err != nil {
obj.ForceCloseConn()
return errors.New("response read content error: " + err.Error())
}
if !obj.option.DisDecode && obj.defaultDecode() {
obj.content, obj.encoding, _ = tools.Charset(bBody.Bytes(), obj.ContentType())
} else {
obj.content = bBody.Bytes()
}
obj.CloseBody()
return
}
// conn is new conn
func (obj *Response) IsNewConn() bool {
return obj.isNewConn
}
// conn proxy
func (obj *Response) Proxys() []Address {
if obj.rawConn != nil {
return obj.rawConn.Proxys()
}
return nil
}
// close body
func (obj *Response) CloseBody() {
obj.close(false)
}
// safe close conn
func (obj *Response) CloseConn() {
obj.close(true)
}
// close
func (obj *Response) close(closeConn bool) {
if obj.webSocket != nil {
obj.webSocket.Close()
}
if obj.sse != nil {
obj.sse.Close()
}
if obj.IsWebSocket() || obj.IsSSE() || !obj.readBody {
obj.ForceCloseConn()
} else if obj.rawConn != nil {
if closeConn {
obj.rawConn.CloseConn()
} else {
obj.rawConn.Close()
}
}
obj.cnl() //must later
}
// force close conn
func (obj *Response) ForceCloseConn() {
if obj.rawConn != nil {
obj.rawConn.ForceCloseConn()
}
}

102
rw.go
View File

@@ -1,51 +1,51 @@
package requests
import (
"errors"
"io"
"net"
)
type readWriteCloser struct {
body io.ReadCloser
conn *connecotr
isClosed bool
err error
}
func (obj *readWriteCloser) Conn() net.Conn {
return obj.conn.Conn.(net.Conn)
}
func (obj *readWriteCloser) Read(p []byte) (n int, err error) {
if obj.isClosed {
return 0, obj.err
}
i, err := obj.body.Read(p)
if err == io.EOF {
obj.Close()
}
obj.err = err
return i, err
}
func (obj *readWriteCloser) Proxys() []Address {
return obj.conn.proxys
}
var errGospiderBodyClose = errors.New("gospider body close error")
func (obj *readWriteCloser) Close() (err error) {
obj.isClosed = true
obj.conn.bodyCnl(errGospiderBodyClose)
return obj.body.Close() //reuse conn
}
// safe close conn
func (obj *readWriteCloser) CloseConn() {
obj.conn.bodyCnl(errors.New("readWriterCloser close conn"))
obj.conn.safeCnl(errors.New("readWriterCloser close conn"))
}
// force close conn
func (obj *readWriteCloser) ForceCloseConn() {
obj.conn.CloseWithError(errConnectionForceClosed)
}
package requests
import (
"errors"
"io"
"net"
)
type readWriteCloser struct {
body io.ReadCloser
err error
conn *connecotr
isClosed bool
}
func (obj *readWriteCloser) Conn() net.Conn {
return obj.conn.Conn.(net.Conn)
}
func (obj *readWriteCloser) Read(p []byte) (n int, err error) {
if obj.isClosed {
return 0, obj.err
}
i, err := obj.body.Read(p)
if err == io.EOF {
obj.Close()
}
obj.err = err
return i, err
}
func (obj *readWriteCloser) Proxys() []Address {
return obj.conn.proxys
}
var errGospiderBodyClose = errors.New("gospider body close error")
func (obj *readWriteCloser) Close() (err error) {
obj.isClosed = true
obj.conn.bodyCnl(errGospiderBodyClose)
return obj.body.Close() //reuse conn
}
// safe close conn
func (obj *readWriteCloser) CloseConn() {
obj.conn.bodyCnl(errors.New("readWriterCloser close conn"))
obj.conn.safeCnl(errors.New("readWriterCloser close conn"))
}
// force close conn
func (obj *readWriteCloser) ForceCloseConn() {
obj.conn.CloseWithError(errConnectionForceClosed)
}

View File

@@ -74,10 +74,10 @@ type Address struct {
Password string
Name string
Host string
IP net.IP
Port int
NetWork string
Scheme string
IP net.IP
Port int
}
func (a Address) String() string {
@@ -132,13 +132,13 @@ func ReadUdpAddr(r io.Reader) (Address, error) {
}
type UDPConn struct {
bufRead [MaxUdpPacket]byte
bufWrite [MaxUdpPacket]byte
proxyAddress net.Addr
// defaultTarget net.Addr
prefix []byte
net.PacketConn
UDPConn *net.UDPConn
// defaultTarget net.Addr
prefix []byte
bufRead [MaxUdpPacket]byte
bufWrite [MaxUdpPacket]byte
}
func NewUDPConn(packConn net.PacketConn, proxyAddress net.Addr) (*UDPConn, error) {