This commit is contained in:
chenjiekun
2023-03-02 20:18:09 +08:00
parent a077ec7a20
commit 81dbcdca32
9 changed files with 35 additions and 108 deletions

View File

@@ -1,83 +0,0 @@
package proxy_mirror
import (
"errors"
"fmt"
"github.com/eolinker/eosc/eocontext"
)
var (
errNoValidNode = errors.New("no valid node")
)
type node struct {
labels eocontext.Attrs
id string
ip string
port int
status eocontext.NodeStatus
}
// newNode 创建新节点
func newNode(labels map[string]string, id string, ip string, port int) eocontext.INode {
return &node{labels: labels, id: id, ip: ip, port: port, status: eocontext.Running}
}
// GetAttrs 获取节点属性集合
func (n *node) GetAttrs() eocontext.Attrs {
return n.labels
}
// GetAttrByName 通过属性名获取节点属性
func (n *node) GetAttrByName(name string) (string, bool) {
v, ok := n.labels[name]
return v, ok
}
// IP 返回节点IP
func (n *node) IP() string {
return n.ip
}
// Port 返回节点端口
func (n *node) Port() int {
return n.port
}
// ID 返回节点ID
func (n *node) ID() string {
return n.id
}
// Status 返回节点状态
func (n *node) Status() eocontext.NodeStatus {
return n.status
}
// Labels 返回节点标签集合
func (n *node) Labels() map[string]string {
return n.labels
}
// Addr 返回节点地址
func (n *node) Addr() string {
if n.port == 0 {
return n.ip
}
return fmt.Sprintf("%s:%d", n.ip, n.port)
}
// Up 将节点状态置为运行中
func (n *node) Up() {
n.status = eocontext.Running
}
// Down 将节点状态置为不可用
func (n *node) Down() {
n.status = eocontext.Down
}
// Leave 将节点状态置为离开
func (n *node) Leave() {
n.status = eocontext.Leave
}

View File

@@ -1,13 +1,19 @@
package proxy_mirror package proxy_mirror
import ( import (
"errors"
"fmt" "fmt"
"github.com/eolinker/apinto/discovery"
"github.com/eolinker/eosc/eocontext" "github.com/eolinker/eosc/eocontext"
"strconv" "strconv"
"strings" "strings"
"time" "time"
) )
var (
errNoValidNode = errors.New("no valid node")
)
type mirrorService struct { type mirrorService struct {
scheme string scheme string
passHost eocontext.PassHostMod passHost eocontext.PassHostMod
@@ -32,7 +38,7 @@ func newMirrorService(target, passHost, host string, timeout time.Duration) *mir
port, _ = strconv.Atoi(portStr) port, _ = strconv.Atoi(portStr)
} }
inode := newNode(labels, fmt.Sprintf("%s:%d", ip, port), ip, port) inode := discovery.NewNode(labels, fmt.Sprintf("%s:%d", ip, port), ip, port)
var mode eocontext.PassHostMod var mode eocontext.PassHostMod
switch passHost { switch passHost {

View File

@@ -54,8 +54,8 @@ func (h *httpHandler) ServeHTTP(ctx eocontext.EoContext) {
ctx = wsCtx ctx = wsCtx
} }
//set retry timeout //set retry timeout
ctx.WithValue(http_complete.KeyHttpRetry, h.retry) ctx.WithValue(http_context.KeyHttpRetry, h.retry)
ctx.WithValue(http_complete.KeyHttpTimeout, h.timeout) ctx.WithValue(http_context.KeyHttpTimeout, h.timeout)
//Set Label //Set Label
ctx.SetLabel("api", h.routerName) ctx.SetLabel("api", h.routerName)

View File

@@ -295,11 +295,8 @@ func (d *DubboContext) IsCloneable() bool {
} }
func (d *DubboContext) Clone() (eocontext.EoContext, error) { func (d *DubboContext) Clone() (eocontext.EoContext, error) {
if !d.IsCloneable() {
return nil, fmt.Errorf("%s %w", "DubboContext", eocontext.ErrEoCtxUnCloneable)
}
//TODO //TODO
return nil, nil return nil, fmt.Errorf("%s %w", "DubboContext", eocontext.ErrEoCtxUnCloneable)
} }
func addrToIP(addr net.Addr) net.IP { func addrToIP(addr net.Addr) net.IP {

View File

@@ -266,9 +266,6 @@ func (c *Context) IsCloneable() bool {
} }
func (c *Context) Clone() (eocontext.EoContext, error) { func (c *Context) Clone() (eocontext.EoContext, error) {
if !c.IsCloneable() {
return nil, fmt.Errorf("%s %w", "GrpcContext", eocontext.ErrEoCtxUnCloneable)
}
//TODO //TODO
return nil, nil return nil, fmt.Errorf("%s %w", "GrpcContext", eocontext.ErrEoCtxUnCloneable)
} }

View File

@@ -179,9 +179,7 @@ func (ctx *cloneContext) IsCloneable() bool {
} }
func (ctx *cloneContext) Clone() (eoscContext.EoContext, error) { func (ctx *cloneContext) Clone() (eoscContext.EoContext, error) {
return nil, fmt.Errorf("%s %w", "HttpContext", eoscContext.ErrEoCtxUnCloneable) return nil, fmt.Errorf("%s %w", "HttpContext", eoscContext.ErrEoCtxUnCloneable)
} }
var copyKey = struct{}{} var copyKey = struct{}{}

View File

@@ -13,7 +13,7 @@ import (
eoscContext "github.com/eolinker/eosc/eocontext" eoscContext "github.com/eolinker/eosc/eocontext"
http_service "github.com/eolinker/eosc/eocontext/http-context" http_service "github.com/eolinker/eosc/eocontext/http-context"
uuid "github.com/google/uuid" "github.com/google/uuid"
"github.com/valyala/fasthttp" "github.com/valyala/fasthttp"
) )
@@ -185,19 +185,25 @@ func (ctx *HttpContext) IsCloneable() bool {
} }
func (ctx *HttpContext) Clone() (eoscContext.EoContext, error) { func (ctx *HttpContext) Clone() (eoscContext.EoContext, error) {
copyContext := copyPool.Get().(*cloneContext)
copyContext := &cloneContext{ copyContext.org = ctx
org: ctx, copyContext.proxyRequests = make([]http_service.IProxy, 0, 2)
proxyRequests: make([]http_service.IProxy, 0, 5),
}
req := fasthttp.AcquireRequest() req := fasthttp.AcquireRequest()
ctx.fastHttpRequestCtx.Request.CopyTo(req) ctx.fastHttpRequestCtx.Request.CopyTo(req)
copyContext.proxyRequest.reset(req, ctx.requestReader.remoteAddr) copyContext.proxyRequest.reset(req, ctx.requestReader.remoteAddr)
copyContext.proxyRequests = copyContext.proxyRequests[:0] copyContext.app = ctx.app
copyContext.balance = ctx.balance
copyContext.upstreamHostHandler = ctx.upstreamHostHandler
copyContext.completeHandler = ctx.completeHandler
copyContext.finishHandler = ctx.finishHandler
copyContext.labels = ctx.Labels() cloneLabels := make(map[string]string, len(ctx.labels))
for k, v := range ctx.labels {
cloneLabels[k] = v
}
copyContext.labels = cloneLabels
//记录请求时间 //记录请求时间
copyContext.ctx = context.WithValue(ctx.Context(), copyKey, true) copyContext.ctx = context.WithValue(ctx.Context(), copyKey, true)

View File

@@ -10,6 +10,9 @@ var (
pool = sync.Pool{ pool = sync.Pool{
New: newContext, New: newContext,
} }
copyPool = sync.Pool{
New: newCopyContext,
}
) )
func newContext() interface{} { func newContext() interface{} {
@@ -17,3 +20,9 @@ func newContext() interface{} {
h.proxyRequests = make([]http_service.IProxy, 0, 5) h.proxyRequests = make([]http_service.IProxy, 0, 5)
return h return h
} }
func newCopyContext() interface{} {
h := new(cloneContext)
h.proxyRequests = make([]http_service.IProxy, 0, 5)
return h
}

View File

@@ -86,9 +86,6 @@ func (w *WebsocketContext) IsCloneable() bool {
} }
func (w *WebsocketContext) Clone() (eoscContext.EoContext, error) { func (w *WebsocketContext) Clone() (eoscContext.EoContext, error) {
if !w.IsCloneable() {
return nil, fmt.Errorf("%s %w", "WebsocketContext", eoscContext.ErrEoCtxUnCloneable)
}
//TODO //TODO
return nil, nil return nil, fmt.Errorf("%s %w", "WebsocketContext", eoscContext.ErrEoCtxUnCloneable)
} }