This commit is contained in:
黄孟柱
2021-11-17 15:47:52 +08:00
parent cd4e2a94c0
commit e909c779e2
3 changed files with 484 additions and 54 deletions

View File

@@ -0,0 +1,475 @@
package fasthttp_client
import (
"crypto/tls"
"fmt"
"net"
"strconv"
"strings"
"sync"
"time"
"github.com/valyala/fasthttp"
)
// Proxy performs the given http request and fills the given http response.
//
// Request must contain at least non-zero RequestURI with full url (including
// scheme and host) or non-zero Host header + RequestURI.
//
// Client determines the server to be requested in the following order:
//
// - from RequestURI if it contains full url with scheme and host;
// - from Host header otherwise.
//
// The function doesn't follow redirects. Use Get* for following redirects.
//
// Response is ignored if resp is nil.
//
// ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
// to the requested host are busy.
//
// It is recommended obtaining req and resp via AcquireRequest
// and AcquireResponse in performance-critical code.
func Proxy(addr string, req *fasthttp.Request, resp *fasthttp.Response) error {
return defaultClient.Proxy(addr, req, resp)
}
// ProxyTimeout performs the given request and waits for response during
// the given timeout duration.
//
// Request must contain at least non-zero RequestURI with full url (including
// scheme and host) or non-zero Host header + RequestURI.
//
// Client determines the server to be requested in the following order:
//
// - from RequestURI if it contains full url with scheme and host;
// - from Host header otherwise.
//
// The function doesn't follow redirects. Use Get* for following redirects.
//
// Response is ignored if resp is nil.
//
// ErrTimeout is returned if the response wasn't returned during
// the given timeout.
//
// ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
// to the requested host are busy.
//
// It is recommended obtaining req and resp via AcquireRequest
// and AcquireResponse in performance-critical code.
//
// Warning: ProxyTimeout does not terminate the request itself. The request will
// continue in the background and the response will be discarded.
// If requests take too long and the connection pool gets filled up please
// try using a Client and setting a ReadTimeout.
func ProxyTimeout(addr string, req *fasthttp.Request, resp *fasthttp.Response, timeout time.Duration) error {
return defaultClient.ProxyTimeout(addr, req, resp, timeout)
}
// ProxyDeadline performs the given request and waits for response until
// the given deadline.
//
// Request must contain at least non-zero RequestURI with full url (including
// scheme and host) or non-zero Host header + RequestURI.
//
// Client determines the server to be requested in the following order:
//
// - from RequestURI if it contains full url with scheme and host;
// - from Host header otherwise.
//
// The function doesn't follow redirects. Use Get* for following redirects.
//
// Response is ignored if resp is nil.
//
// ErrTimeout is returned if the response wasn't returned until
// the given deadline.
//
// ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
// to the requested host are busy.
//
// It is recommended obtaining req and resp via AcquireRequest
// and AcquireResponse in performance-critical code.
func ProxyDeadline(addr string, req *fasthttp.Request, resp *fasthttp.Response, deadline time.Time) error {
return defaultClient.ProxyDeadline(addr, req, resp, deadline)
}
var defaultClient Client
// Client implements http client.
//
// Copying Client by value is prohibited. Create new instance instead.
//
// It is safe calling Client methods from concurrently running goroutines.
//
// The fields of a Client should not be changed while it is in use.
type Client struct {
// Client name. Used in User-Agent request header.
//
// Default client name is used if not set.
Name string
// NoDefaultUserAgentHeader when set to true, causes the default
// User-Agent header to be excluded from the Request.
NoDefaultUserAgentHeader bool
// Callback for establishing new connections to hosts.
//
// Default Dial is used if not set.
Dial fasthttp.DialFunc
// Attempt to connect to both ipv4 and ipv6 addresses if set to true.
//
// This option is used only if default TCP dialer is used,
// i.e. if Dial is blank.
//
// By default client connects only to ipv4 addresses,
// since unfortunately ipv6 remains broken in many networks worldwide :)
DialDualStack bool
// TLS config for https connections.
//
// Default TLS config is used if not set.
TLSConfig *tls.Config
// Maximum number of connections per each host which may be established.
//
// DefaultMaxConnsPerHost is used if not set.
MaxConnsPerHost int
// Idle keep-alive connections are closed after this duration.
//
// By default idle connections are closed
// after DefaultMaxIdleConnDuration.
MaxIdleConnDuration time.Duration
// Keep-alive connections are closed after this duration.
//
// By default connection duration is unlimited.
MaxConnDuration time.Duration
// Maximum number of attempts for idempotent calls
//
// DefaultMaxIdemponentCallAttempts is used if not set.
MaxIdemponentCallAttempts int
// Per-connection buffer size for responses' reading.
// This also limits the maximum header size.
//
// Default buffer size is used if 0.
ReadBufferSize int
// Per-connection buffer size for requests' writing.
//
// Default buffer size is used if 0.
WriteBufferSize int
// Maximum duration for full response reading (including body).
//
// By default response read timeout is unlimited.
ReadTimeout time.Duration
// Maximum duration for full request writing (including body).
//
// By default request write timeout is unlimited.
WriteTimeout time.Duration
// Maximum response body size.
//
// The client returns ErrBodyTooLarge if this limit is greater than 0
// and response body is greater than the limit.
//
// By default response body size is unlimited.
MaxResponseBodySize int
// Header names are passed as-is without normalization
// if this option is set.
//
// Disabled header names' normalization may be useful only for proxying
// responses to other clients expecting case-sensitive
// header names. See https://github.com/valyala/fasthttp/issues/57
// for details.
//
// By default request and response header names are normalized, i.e.
// The first letter and the first letters following dashes
// are uppercased, while all the other letters are lowercased.
// Examples:
//
// * HOST -> Host
// * content-type -> Content-Type
// * cONTENT-lenGTH -> Content-Length
DisableHeaderNamesNormalizing bool
// Path values are sent as-is without normalization
//
// Disabled path normalization may be useful for proxying incoming requests
// to servers that are expecting paths to be forwarded as-is.
//
// By default path values are normalized, i.e.
// extra slashes are removed, special characters are encoded.
DisablePathNormalizing bool
// Maximum duration for waiting for a free connection.
//
// By default will not waiting, return ErrNoFreeConns immediately
MaxConnWaitTimeout time.Duration
// RetryIf controls whether a retry should be attempted after an error.
//
// By default will use isIdempotent function
RetryIf fasthttp.RetryIfFunc
mLock sync.Mutex
m map[string]*fasthttp.HostClient
ms map[string]*fasthttp.HostClient
}
func readAddress(addr string) (scheme, host string) {
if i := strings.Index(addr, "://"); i > 0 {
return strings.ToLower(addr[:i]), addr[i+3:]
}
return "http", addr
}
func (c *Client) getHostClient(addr string) (*fasthttp.HostClient, string, error) {
scheme, host := readAddress(addr)
isTLS := false
if strings.EqualFold(scheme, "https") {
isTLS = true
} else if !strings.EqualFold(scheme, "http") {
return nil, "", fmt.Errorf("unsupported protocol %q. http and https are supported", scheme)
}
startCleaner := false
c.mLock.Lock()
m := c.m
if isTLS {
m = c.ms
}
if m == nil {
m = make(map[string]*fasthttp.HostClient)
if isTLS {
c.ms = m
} else {
c.m = m
}
}
hc := m[host]
if hc == nil {
hc = &fasthttp.HostClient{
Addr: addMissingPort(host, isTLS),
Name: c.Name,
NoDefaultUserAgentHeader: c.NoDefaultUserAgentHeader,
Dial: c.Dial,
DialDualStack: c.DialDualStack,
IsTLS: isTLS,
TLSConfig: c.TLSConfig,
MaxConns: c.MaxConnsPerHost,
MaxIdleConnDuration: c.MaxIdleConnDuration,
MaxConnDuration: c.MaxConnDuration,
MaxIdemponentCallAttempts: c.MaxIdemponentCallAttempts,
ReadBufferSize: c.ReadBufferSize,
WriteBufferSize: c.WriteBufferSize,
ReadTimeout: c.ReadTimeout,
WriteTimeout: c.WriteTimeout,
MaxResponseBodySize: c.MaxResponseBodySize,
DisableHeaderNamesNormalizing: c.DisableHeaderNamesNormalizing,
DisablePathNormalizing: c.DisablePathNormalizing,
MaxConnWaitTimeout: c.MaxConnWaitTimeout,
RetryIf: c.RetryIf,
}
m[string(host)] = hc
if len(m) == 1 {
startCleaner = true
}
}
c.mLock.Unlock()
if startCleaner {
go c.mCleaner(m)
}
return hc, scheme, nil
}
// ProxyTimeout performs the given request and waits for response during
// the given timeout duration.
//
// Request must contain at least non-zero RequestURI with full url (including
// scheme and host) or non-zero Host header + RequestURI.
//
// Client determines the server to be requested in the following order:
//
// - from RequestURI if it contains full url with scheme and host;
// - from Host header otherwise.
//
// The function doesn't follow redirects. Use Get* for following redirects.
//
// Response is ignored if resp is nil.
//
// ErrTimeout is returned if the response wasn't returned during
// the given timeout.
//
// ErrNoFreeConns is returned if all Client.MaxConnsPerHost connections
// to the requested host are busy.
//
// It is recommended obtaining req and resp via AcquireRequest
// and AcquireResponse in performance-critical code.
//
// Warning: ProxyTimeout does not terminate the request itself. The request will
// continue in the background and the response will be discarded.
// If requests take too long and the connection pool gets filled up please
// try setting a ReadTimeout.
func (c *Client) ProxyTimeout(addr string, req *fasthttp.Request, resp *fasthttp.Response, timeout time.Duration) error {
client, scheme, err := c.getHostClient(addr)
if err != nil {
return err
}
old := string(req.URI().Scheme())
req.URI().SetScheme(scheme)
defer req.URI().SetScheme(old)
return client.DoTimeout(req, resp, timeout)
}
// ProxyDeadline performs the given request and waits for response until
// the given deadline.
//
// Request must contain at least non-zero RequestURI with full url (including
// scheme and host) or non-zero Host header + RequestURI.
//
// Client determines the server to be requested in the following order:
//
// - from RequestURI if it contains full url with scheme and host;
// - from Host header otherwise.
//
// The function doesn't follow redirects. Use Get* for following redirects.
//
// Response is ignored if resp is nil.
//
// ErrTimeout is returned if the response wasn't returned until
// the given deadline.
//
// ErrNoFreeConns is returned if all Client.MaxConnsPerHost connections
// to the requested host are busy.
//
// It is recommended obtaining req and resp via AcquireRequest
// and AcquireResponse in performance-critical code.
func (c *Client) ProxyDeadline(address string, req *fasthttp.Request, resp *fasthttp.Response, deadline time.Time) error {
client, scheme, err := c.getHostClient(address)
if err != nil {
return err
}
old := string(req.URI().Scheme())
req.URI().SetScheme(scheme)
defer req.URI().SetScheme(old)
return client.DoDeadline(req, resp, deadline)
}
// DoRedirects performs the given http request and fills the given http response,
// following up to maxRedirectsCount redirects. When the redirect count exceeds
// maxRedirectsCount, ErrTooManyRedirects is returned.
//
// Request must contain at least non-zero RequestURI with full url (including
// scheme and host) or non-zero Host header + RequestURI.
//
// Client determines the server to be requested in the following order:
//
// - from RequestURI if it contains full url with scheme and host;
// - from Host header otherwise.
//
// Response is ignored if resp is nil.
//
// ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
// to the requested host are busy.
//
// It is recommended obtaining req and resp via AcquireRequest
// and AcquireResponse in performance-critical code.
func (c *Client) DoRedirects(address string, req *fasthttp.Request, resp *fasthttp.Response, maxRedirectsCount int) error {
client, scheme, err := c.getHostClient(address)
if err != nil {
return err
}
old := string(req.URI().Scheme())
req.URI().SetScheme(scheme)
defer req.URI().SetScheme(old)
return client.DoRedirects(req, resp, maxRedirectsCount)
}
// Proxy performs the given http request and fills the given http response.
//
// Request must contain at least non-zero RequestURI with full url (including
// scheme and host) or non-zero Host header + RequestURI.
//
// Client determines the server to be requested in the following order:
//
// - from RequestURI if it contains full url with scheme and host;
// - from Host header otherwise.
//
// Response is ignored if resp is nil.
//
// The function doesn't follow redirects. Use Get* for following redirects.
//
// ErrNoFreeConns is returned if all Client.MaxConnsPerHost connections
// to the requested host are busy.
//
// It is recommended obtaining req and resp via AcquireRequest
// and AcquireResponse in performance-critical code.
func (c *Client) Proxy(address string, req *fasthttp.Request, resp *fasthttp.Response) error {
client, scheme, err := c.getHostClient(address)
if err != nil {
return err
}
old := string(req.URI().Scheme())
req.URI().SetScheme(scheme)
defer req.URI().SetScheme(old)
return client.Do(req, resp)
}
func (c *Client) mCleaner(m map[string]*fasthttp.HostClient) {
mustStop := false
sleep := c.MaxIdleConnDuration
if sleep < time.Second {
sleep = time.Second
} else if sleep > 10*time.Second {
sleep = 10 * time.Second
}
for {
c.mLock.Lock()
for k, v := range m {
shouldRemove := v.ConnsCount() == 0
if shouldRemove {
delete(m, k)
}
}
if len(m) == 0 {
mustStop = true
}
c.mLock.Unlock()
if mustStop {
break
}
time.Sleep(sleep)
}
}
func addMissingPort(addr string, isTLS bool) string {
n := strings.Index(addr, ":")
if n >= 0 {
return addr
}
port := 80
if isTLS {
port = 443
}
return net.JoinHostPort(addr, strconv.Itoa(port))
}

View File

@@ -2,9 +2,10 @@ package http_context
import ( import (
"context" "context"
"net/url"
"time" "time"
fasthttp_client "github.com/eolinker/goku/node/fasthttp-client"
"github.com/valyala/fasthttp" "github.com/valyala/fasthttp"
http_service "github.com/eolinker/eosc/http-service" http_service "github.com/eolinker/eosc/http-service"
@@ -39,38 +40,11 @@ type Finish interface {
func (ctx *Context) SendTo(address string, timeout time.Duration) error { func (ctx *Context) SendTo(address string, timeout time.Duration) error {
target, err := url.Parse(address)
if err != nil {
ctx.responseError = err
return err
}
request := ctx.proxyRequest.Request() request := ctx.proxyRequest.Request()
backScheme := string(request.URI().Scheme()) ctx.responseError = fasthttp_client.ProxyTimeout(address, request, &ctx.fastHttpRequestCtx.Response, timeout)
request.URI().SetScheme(target.Scheme) return ctx.responseError
tem := fasthttp.AcquireResponse()
defer func() {
request.URI().SetScheme(backScheme)
fasthttp.ReleaseResponse(tem)
}()
c := fasthttp.HostClient{
Addr: target.Host,
}
err = c.DoTimeout(request, tem, timeout)
if err != nil {
ctx.responseError = err
return err
}
ctx.SetResponse(tem)
return nil
} }
@@ -108,7 +82,7 @@ func NewContext(ctx *fasthttp.RequestCtx) *Context {
requestID: requestID, requestID: requestID,
requestReader: NewRequestReader(&ctx.Request, ctx.RemoteIP().String()), requestReader: NewRequestReader(&ctx.Request, ctx.RemoteIP().String()),
proxyRequest: NewProxyRequest(&ctx.Request, ctx.RemoteIP().String()), proxyRequest: NewProxyRequest(&ctx.Request, ctx.RemoteIP().String()),
response: NewResponse(), response: NewResponse(ctx),
responseError: nil, responseError: nil,
} }
@@ -120,12 +94,6 @@ func (ctx *Context) RequestId() string {
return ctx.requestID return ctx.requestID
} }
func (ctx *Context) SetResponse(response *fasthttp.Response) {
ctx.response.Set(response)
ctx.responseError = nil
}
//Finish finish //Finish finish
func (ctx *Context) Finish() { func (ctx *Context) Finish() {
@@ -135,9 +103,6 @@ func (ctx *Context) Finish() {
return return
} }
ctx.response.WriteTo(ctx.fastHttpRequestCtx)
ctx.response.Finish()
ctx.requestReader.Finish() ctx.requestReader.Finish()
ctx.proxyRequest.Finish() ctx.proxyRequest.Finish()
return return

View File

@@ -15,15 +15,13 @@ type Response struct {
*fasthttp.Response *fasthttp.Response
} }
func (r *Response) Finish() error { func (r *Response) reset() error {
r.ResponseHeader.tmp = nil
fasthttp.ReleaseResponse(r.Response)
return nil return nil
} }
func NewResponse() *Response { func NewResponse(ctx *fasthttp.RequestCtx) *Response {
response := fasthttp.AcquireResponse() return &Response{Response: &ctx.Response, ResponseHeader: NewResponseHeader(&ctx.Response.Header)}
return &Response{Response: fasthttp.AcquireResponse(), ResponseHeader: NewResponseHeader(&response.Header)}
} }
func (r *Response) GetBody() []byte { func (r *Response) GetBody() []byte {
@@ -45,11 +43,3 @@ func (r *Response) SetStatus(code int, status string) {
func (r *Response) SetBody(bytes []byte) { func (r *Response) SetBody(bytes []byte) {
r.Response.SetBody(bytes) r.Response.SetBody(bytes)
} }
func (r *Response) Set(response *fasthttp.Response) {
if response != nil {
r.Response.Reset()
response.CopyTo(r.Response)
r.ResponseHeader = NewResponseHeader(&r.Response.Header)
}
}