mirror of
https://github.com/eolinker/apinto
synced 2025-12-24 13:28:15 +08:00
access log新增系统变量
This commit is contained in:
@@ -2,6 +2,8 @@ package prometheus
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/eolinker/apinto/drivers"
|
||||
scope_manager "github.com/eolinker/apinto/scope-manager"
|
||||
"github.com/eolinker/apinto/utils"
|
||||
@@ -9,7 +11,6 @@ import (
|
||||
"github.com/eolinker/eosc/router"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
"strings"
|
||||
)
|
||||
|
||||
func Check(v *Config, workers map[eosc.RequireId]eosc.IWorker) error {
|
||||
@@ -164,7 +165,6 @@ func Create(id, name string, cfg *Config, workers map[eosc.RequireId]eosc.IWorke
|
||||
}
|
||||
metrics[metric.Metric] = m
|
||||
}
|
||||
|
||||
//注册路由
|
||||
p.registry = registry
|
||||
p.handler = promhttp.InstrumentMetricHandler(
|
||||
|
||||
@@ -126,7 +126,8 @@ func (h *complete) Complete(org eocontext.EoContext) error {
|
||||
request.URI().SetHost(targetHost)
|
||||
}
|
||||
response := fasthttp.AcquireResponse()
|
||||
lastErr = fasthttp_client.ProxyTimeout(scheme, node, request, response, timeOut)
|
||||
|
||||
_, lastErr = fasthttp_client.ProxyTimeout(scheme, node, request, response, timeOut)
|
||||
if lastErr == nil {
|
||||
return newGRPCResponse(ctx, response, methodDesc)
|
||||
}
|
||||
|
||||
@@ -235,6 +235,12 @@ var (
|
||||
"set_cookies": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) {
|
||||
return strings.Split(ctx.Response().GetHeader("Set-Cookie"), "; "), true
|
||||
}),
|
||||
"dst_ip": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) {
|
||||
return ctx.Response().RemoteIP(), true
|
||||
}),
|
||||
"dst_port": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) {
|
||||
return ctx.Response().RemotePort(), true
|
||||
}),
|
||||
"proxy": proxyFields,
|
||||
}
|
||||
|
||||
@@ -265,6 +271,12 @@ var (
|
||||
"addr": ProxyReadFunc(func(name string, proxy http_service.IProxy) (interface{}, bool) {
|
||||
return proxy.URI().Host(), true
|
||||
}),
|
||||
"dst_ip": ProxyReadFunc(func(name string, proxy http_service.IProxy) (interface{}, bool) {
|
||||
return proxy.RemotePort(), true
|
||||
}),
|
||||
"dst_port": ProxyReadFunc(func(name string, proxy http_service.IProxy) (interface{}, bool) {
|
||||
return proxy.RemotePort(), true
|
||||
}),
|
||||
"scheme": ProxyReadFunc(func(name string, proxy http_service.IProxy) (interface{}, bool) {
|
||||
return proxy.URI().Scheme(), true
|
||||
}),
|
||||
|
||||
@@ -13,13 +13,41 @@ import (
|
||||
"github.com/valyala/fasthttp"
|
||||
)
|
||||
|
||||
func ProxyTimeout(scheme string, node eocontext.INode, req *fasthttp.Request, resp *fasthttp.Response, timeout time.Duration) error {
|
||||
addr := fmt.Sprintf("%s://%s", scheme, node.Addr())
|
||||
err := defaultClient.ProxyTimeout(addr, req, resp, timeout)
|
||||
type Addr struct {
|
||||
IP net.IP
|
||||
Port int
|
||||
}
|
||||
|
||||
func resolveAddr(scheme string, addr string) (*Addr, error) {
|
||||
tcpAddr, err := net.ResolveTCPAddr("tcp", addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
port := tcpAddr.Port
|
||||
if port == 0 {
|
||||
if scheme == "http" {
|
||||
port = 80
|
||||
} else if scheme == "https" {
|
||||
port = 443
|
||||
}
|
||||
}
|
||||
return &Addr{
|
||||
IP: tcpAddr.IP,
|
||||
Port: port,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func ProxyTimeout(scheme string, node eocontext.INode, req *fasthttp.Request, resp *fasthttp.Response, timeout time.Duration) (*Addr, error) {
|
||||
tcpAddr, err := resolveAddr(scheme, node.Addr())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
addr := fmt.Sprintf("%s://%s:%d", scheme, tcpAddr.IP.String(), tcpAddr.Port)
|
||||
err = defaultClient.ProxyTimeout(addr, req, resp, timeout)
|
||||
if err != nil {
|
||||
node.Down()
|
||||
}
|
||||
return err
|
||||
return tcpAddr, err
|
||||
}
|
||||
|
||||
var defaultClient Client
|
||||
@@ -101,6 +129,32 @@ func (c *Client) getHostClient(addr string) (*fasthttp.HostClient, string, error
|
||||
return hc, scheme, nil
|
||||
}
|
||||
|
||||
//func (c *Client) getDialFunc() fasthttp.DialFunc {
|
||||
// return func(addr string) (net.Conn, error) {
|
||||
// atomic.AddInt64(&dialCount, 1)
|
||||
// conn, err := tcpDial.Dial(addr)
|
||||
// if err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
// c.conn = conn
|
||||
// return &debugConn{Conn: conn}, nil
|
||||
// }
|
||||
//}
|
||||
//
|
||||
//func (c *Client) RemoteAddr() string {
|
||||
// if c.conn != nil {
|
||||
// return c.conn.RemoteAddr().String()
|
||||
// }
|
||||
// return "unknown"
|
||||
//}
|
||||
//
|
||||
//func (c *Client) LocalAddr() string {
|
||||
// if c.conn != nil {
|
||||
// return c.conn.RemoteAddr().String()
|
||||
// }
|
||||
// return "unknown"
|
||||
//}
|
||||
|
||||
// ProxyTimeout performs the given request and waits for response during
|
||||
// the given timeout duration.
|
||||
//
|
||||
|
||||
@@ -2,13 +2,14 @@ package fasthttp_client
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/eolinker/eosc/debug"
|
||||
"github.com/valyala/fasthttp"
|
||||
"net"
|
||||
"net/http"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/eolinker/eosc/debug"
|
||||
"github.com/valyala/fasthttp"
|
||||
)
|
||||
|
||||
var (
|
||||
|
||||
@@ -139,9 +139,9 @@ func (ctx *cloneContext) SendTo(scheme string, node eoscContext.INode, timeout t
|
||||
case eoscContext.ReWriteHost:
|
||||
request.URI().SetHost(targetHost)
|
||||
}
|
||||
|
||||
var tcpAddr *fasthttp_client.Addr
|
||||
beginTime := time.Now()
|
||||
ctx.responseError = fasthttp_client.ProxyTimeout(scheme, node, request, ctx.response.Response, timeout)
|
||||
tcpAddr, ctx.responseError = fasthttp_client.ProxyTimeout(scheme, node, request, ctx.response.Response, timeout)
|
||||
agent := newRequestAgent(&ctx.proxyRequest, host, scheme, beginTime, time.Now())
|
||||
if ctx.responseError != nil {
|
||||
agent.setStatusCode(504)
|
||||
@@ -149,7 +149,8 @@ func (ctx *cloneContext) SendTo(scheme string, node eoscContext.INode, timeout t
|
||||
agent.setStatusCode(ctx.response.Response.StatusCode())
|
||||
}
|
||||
agent.responseBody = string(ctx.response.Response.Body())
|
||||
|
||||
agent.setRemoteIP(tcpAddr.IP.String())
|
||||
agent.setRemotePort(tcpAddr.Port)
|
||||
agent.setResponseLength(ctx.response.Response.Header.ContentLength())
|
||||
|
||||
ctx.proxyRequests = append(ctx.proxyRequests, agent)
|
||||
|
||||
@@ -143,9 +143,9 @@ func (ctx *HttpContext) SendTo(scheme string, node eoscContext.INode, timeout ti
|
||||
case eoscContext.ReWriteHost:
|
||||
request.URI().SetHost(targetHost)
|
||||
}
|
||||
|
||||
var tcpAddr *fasthttp_client.Addr
|
||||
beginTime := time.Now()
|
||||
ctx.response.responseError = fasthttp_client.ProxyTimeout(scheme, node, request, &ctx.fastHttpRequestCtx.Response, timeout)
|
||||
tcpAddr, ctx.response.responseError = fasthttp_client.ProxyTimeout(scheme, node, request, &ctx.fastHttpRequestCtx.Response, timeout)
|
||||
agent := newRequestAgent(&ctx.proxyRequest, host, scheme, beginTime, time.Now())
|
||||
if ctx.response.responseError != nil {
|
||||
agent.setStatusCode(504)
|
||||
@@ -154,8 +154,12 @@ func (ctx *HttpContext) SendTo(scheme string, node eoscContext.INode, timeout ti
|
||||
agent.setStatusCode(ctx.fastHttpRequestCtx.Response.StatusCode())
|
||||
}
|
||||
agent.responseBody = string(ctx.response.Response.Body())
|
||||
agent.setRemoteIP(tcpAddr.IP.String())
|
||||
agent.setRemotePort(tcpAddr.Port)
|
||||
agent.setResponseLength(ctx.fastHttpRequestCtx.Response.Header.ContentLength())
|
||||
|
||||
ctx.response.remoteIP = tcpAddr.IP.String()
|
||||
ctx.response.remotePort = tcpAddr.Port
|
||||
ctx.proxyRequests = append(ctx.proxyRequests, agent)
|
||||
return ctx.response.responseError
|
||||
|
||||
|
||||
@@ -20,6 +20,8 @@ type requestAgent struct {
|
||||
beginTime time.Time
|
||||
endTime time.Time
|
||||
hostAgent *UrlAgent
|
||||
remoteIP string
|
||||
remotePort int
|
||||
}
|
||||
|
||||
func (a *requestAgent) ResponseBody() string {
|
||||
@@ -53,6 +55,14 @@ func (a *requestAgent) setResponseLength(length int) {
|
||||
}
|
||||
}
|
||||
|
||||
func (a *requestAgent) setRemoteIP(ip string) {
|
||||
a.remoteIP = ip
|
||||
}
|
||||
|
||||
func (a *requestAgent) setRemotePort(port int) {
|
||||
a.remotePort = port
|
||||
}
|
||||
|
||||
func newRequestAgent(IRequest http_service.IRequest, host string, scheme string, beginTime, endTime time.Time) *requestAgent {
|
||||
return &requestAgent{IRequest: IRequest, host: host, scheme: scheme, beginTime: beginTime, endTime: endTime}
|
||||
}
|
||||
@@ -61,6 +71,14 @@ func (a *requestAgent) ResponseTime() int64 {
|
||||
return a.endTime.Sub(a.beginTime).Milliseconds()
|
||||
}
|
||||
|
||||
func (a *requestAgent) RemoteIP() string {
|
||||
return a.remoteIP
|
||||
}
|
||||
|
||||
func (a *requestAgent) RemotePort() int {
|
||||
return a.remotePort
|
||||
}
|
||||
|
||||
func (a *requestAgent) URI() http_service.IURIWriter {
|
||||
if a.hostAgent == nil {
|
||||
a.hostAgent = NewUrlAgent(a.IRequest.URI(), a.host, a.scheme)
|
||||
|
||||
@@ -19,6 +19,8 @@ type Response struct {
|
||||
responseTime time.Duration
|
||||
proxyStatusCode int
|
||||
responseError error
|
||||
remoteIP string
|
||||
remotePort int
|
||||
}
|
||||
|
||||
func (r *Response) ContentLength() int {
|
||||
@@ -119,3 +121,11 @@ func (r *Response) SetResponseTime(t time.Duration) {
|
||||
func (r *Response) ResponseTime() time.Duration {
|
||||
return r.responseTime
|
||||
}
|
||||
|
||||
func (r *Response) RemoteIP() string {
|
||||
return r.remoteIP
|
||||
}
|
||||
|
||||
func (r *Response) RemotePort() int {
|
||||
return r.remotePort
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user