完成complete方法

This commit is contained in:
chenjiekun
2023-03-01 18:09:23 +08:00
parent 1c0e58563d
commit bf7143c930
6 changed files with 186 additions and 13 deletions

View File

@@ -0,0 +1,106 @@
package proxy_mirror
import (
"fmt"
"github.com/eolinker/eosc/eocontext"
http_service "github.com/eolinker/eosc/eocontext/http-context"
"github.com/eolinker/eosc/log"
"time"
)
type httpComplete struct {
proxyCfg *Config
}
func newHttpMirrorComplete(proxyCfg *Config) eocontext.CompleteHandler {
return &httpComplete{
proxyCfg: proxyCfg,
}
}
func (h *httpComplete) Complete(org eocontext.EoContext) error {
ctx, err := http_service.Assert(org)
if err != nil {
return err
}
//设置响应开始时间
proxyTime := time.Now()
defer func() {
//设置原始响应状态码
ctx.Response().SetProxyStatus(ctx.Response().StatusCode(), "")
//设置上游响应总时间, 单位为毫秒
//ctx.WithValue("response_time", time.Now().Sub(proxyTime).Milliseconds())
ctx.Response().SetResponseTime(time.Now().Sub(proxyTime))
ctx.SetLabel("handler", "proxy")
}()
var lastErr error
timeOut := time.Duration(h.proxyCfg.Timeout) * time.Millisecond
//构造addr
path := ctx.Request().URI().Path()
if h.proxyCfg.Path != "" {
switch h.proxyCfg.PathMode {
case pathModeReplace:
path = h.proxyCfg.Path
case pathModePrefix:
path = fmt.Sprintf("%s%s", h.proxyCfg.Path, path)
}
}
addr := fmt.Sprintf("%s%s", h.proxyCfg.Host, path)
lastErr = ctx.SendTo(addr, timeOut)
if lastErr == nil {
return nil
}
log.Error("http proxyMirror send error: ", lastErr)
return lastErr
}
type dubbo2Complete struct {
proxyCfg *Config
}
func newDubbo2MirrorComplete(proxyCfg *Config) eocontext.CompleteHandler {
return &httpComplete{
proxyCfg: proxyCfg,
}
}
func (d *dubbo2Complete) Complete(ctx eocontext.EoContext) error {
//TODO implement me
return nil
}
type grpcComplete struct {
proxyCfg *Config
}
func newGrpcMirrorComplete(proxyCfg *Config) eocontext.CompleteHandler {
return &httpComplete{
proxyCfg: proxyCfg,
}
}
func (g *grpcComplete) Complete(ctx eocontext.EoContext) error {
//TODO implement me
return nil
}
type websocketComplete struct {
proxyCfg *Config
}
func newWebsocketMirrorComplete(proxyCfg *Config) eocontext.CompleteHandler {
return &httpComplete{
proxyCfg: proxyCfg,
}
}
func (w *websocketComplete) Complete(ctx eocontext.EoContext) error {
//TODO implement me
return nil
}

View File

@@ -13,6 +13,11 @@ type SampleConfig struct {
RandomPivot int `json:"random_pivot" label:"随机数锚点"`
}
const (
pathModeReplace = "replace"
pathModePrefix = "prefix"
)
func (c *Config) doCheck() error {
//TODO

View File

@@ -1 +1,5 @@
package proxy_mirror
import "github.com/pkg/errors"
var ErrUnsupportedType = errors.New("send mirror proxy fail. Unsupported Context Type")

View File

@@ -0,0 +1,52 @@
package proxy_mirror
import (
"github.com/eolinker/eosc/eocontext"
dubbo2_context "github.com/eolinker/eosc/eocontext/dubbo2-context"
grpc_context "github.com/eolinker/eosc/eocontext/grpc-context"
http_service "github.com/eolinker/eosc/eocontext/http-context"
log "github.com/eolinker/goku-api-gateway/goku-log"
)
type proxyMirrorCompleteHandler struct {
orgComplete eocontext.CompleteHandler
mirrorComplete eocontext.CompleteHandler
}
func newMirrorHandler(eoCtx eocontext.EoContext, proxyCfg *Config) (eocontext.CompleteHandler, error) {
handler := &proxyMirrorCompleteHandler{
orgComplete: eoCtx.GetComplete(),
}
if _, success := eoCtx.(http_service.IHttpContext); success {
handler.mirrorComplete = newHttpMirrorComplete(proxyCfg)
} else if _, success = eoCtx.(grpc_context.IGrpcContext); success {
handler.mirrorComplete = newGrpcMirrorComplete(proxyCfg)
} else if _, success = eoCtx.(dubbo2_context.IDubbo2Context); success {
handler.mirrorComplete = newDubbo2MirrorComplete(proxyCfg)
} else if _, success = eoCtx.(http_service.IWebsocketContext); success {
handler.mirrorComplete = newWebsocketMirrorComplete(proxyCfg)
} else {
return nil, ErrUnsupportedType
}
return handler, nil
}
func (p *proxyMirrorCompleteHandler) Complete(ctx eocontext.EoContext) error {
//执行镜像请求的Complete
cloneCtx, err := ctx.Clone()
if err != nil {
return err
}
go func() {
err = p.mirrorComplete.Complete(cloneCtx)
if err != nil {
log.Error(err)
}
}()
//执行原始Complete
return p.orgComplete.Complete(ctx)
}

View File

@@ -11,7 +11,6 @@ import (
)
var _ eocontext.IFilter = (*proxyMirror)(nil)
var _ http_service.HttpFilter = (*proxyMirror)(nil)
type proxyMirror struct {
drivers.WorkerBase
@@ -19,10 +18,6 @@ type proxyMirror struct {
}
func (p *proxyMirror) DoFilter(ctx eocontext.EoContext, next eocontext.IChain) error {
return http_service.DoHttpFilter(p, ctx, next)
}
func (p *proxyMirror) DoHttpFilter(ctx http_service.IHttpContext, next eocontext.IChain) error {
if next != nil {
err := next.DoChain(ctx)
if err != nil {
@@ -34,19 +29,26 @@ func (p *proxyMirror) DoHttpFilter(ctx http_service.IHttpContext, next eocontext
rand.Seed(time.Now().UnixNano())
randomNum := rand.Intn(p.proxyConf.SampleConf.RandomRange + 1) //[0,range]范围内整型
if randomNum <= p.proxyConf.SampleConf.RandomPivot { //若随机数在[0,pivot]范围内则进行转发
//进行转发
go sendMirrorProxy(p.proxyConf, ctx)
setMirrorProxy(p.proxyConf, ctx)
}
}
return nil
}
func sendMirrorProxy(proxyCfg *Config, ctx http_service.IHttpContext) {
//先判断当前Ctx是否能Copy,若可以就进行copy并且设置新的APP
//send
func setMirrorProxy(proxyCfg *Config, ctx eocontext.EoContext) {
//先判断当前Ctx是否能Copy
if !ctx.IsCloneable() {
log.Info(ErrUnsupportedType)
return
}
//给ctx设置新的FinishHandler
newCompleteHandler, err := newMirrorHandler(ctx, proxyCfg)
if err != nil {
log.Info(err)
return
}
ctx.SetCompleteHandler(newCompleteHandler)
}
func (p *proxyMirror) Start() error {

View File

@@ -196,7 +196,11 @@ func (ctx *HttpContext) Clone() (eoscContext.EoContext, error) {
cloneResp := fasthttp.AcquireResponse()
ctx.response.Response.CopyTo(cloneResp)
//cloneRequestCtx := new(fasthttp.RequestCtx)
//cloneRequestCtx.Request = *cloneReq
//cloneRequestCtx.Response = *cloneResp
cloneCtx.fastHttpRequestCtx = ctx.fastHttpRequestCtx //TODO
cloneCtx.requestID = ctx.requestID //TODO
cloneCtx.requestReader.reset(cloneReq, remoteAddr)
cloneCtx.proxyRequest.reset(cloneReq, remoteAddr)