Merge remote-tracking branch 'gitlab/release/v0.12.0' into release/v0.12.0

This commit is contained in:
Liujian
2023-03-03 21:14:10 +08:00
33 changed files with 2860 additions and 28 deletions

View File

@@ -27,10 +27,12 @@ import (
"github.com/eolinker/apinto/drivers/plugins/gzip"
http_to_dubbo2 "github.com/eolinker/apinto/drivers/plugins/http-to-dubbo2"
http_to_grpc "github.com/eolinker/apinto/drivers/plugins/http-to-gRPC"
"github.com/eolinker/apinto/drivers/plugins/http_mocking"
ip_restriction "github.com/eolinker/apinto/drivers/plugins/ip-restriction"
"github.com/eolinker/apinto/drivers/plugins/monitor"
params_transformer "github.com/eolinker/apinto/drivers/plugins/params-transformer"
prometheus_plugin "github.com/eolinker/apinto/drivers/plugins/prometheus"
proxy_mirror "github.com/eolinker/apinto/drivers/plugins/proxy-mirror"
proxy_rewrite "github.com/eolinker/apinto/drivers/plugins/proxy-rewrite"
proxy_rewriteV2 "github.com/eolinker/apinto/drivers/plugins/proxy_rewrite_v2"
rate_limiting "github.com/eolinker/apinto/drivers/plugins/rate-limiting"
@@ -146,4 +148,7 @@ func Register(extenderRegister eosc.IExtenderDriverRegister) {
http_to_grpc.Register(extenderRegister)
protocbuf.Register(extenderRegister)
grpc_to_http.Register(extenderRegister)
proxy_mirror.Register(extenderRegister)
http_mocking.Register(extenderRegister)
}

View File

@@ -17,6 +17,7 @@ install() {
upgrade() {
apinto stop
install
sleep 10s
apinto start
}

View File

@@ -0,0 +1,107 @@
package http_mocking
import (
"encoding/json"
"github.com/eolinker/apinto/utils"
"github.com/eolinker/eosc/eocontext"
http_context "github.com/eolinker/eosc/eocontext/http-context"
"github.com/eolinker/eosc/log"
)
type complete struct {
responseStatus int
contentType string
responseExample string
responseSchema map[string]interface{}
responseHeader map[string]string
}
func NewComplete(responseStatus int, contentType string, responseExample string, responseSchema map[string]interface{}, responseHeader map[string]string) *complete {
return &complete{responseStatus: responseStatus, contentType: contentType, responseExample: responseExample, responseSchema: responseSchema, responseHeader: responseHeader}
}
func (c *complete) Complete(org eocontext.EoContext) error {
ctx, err := http_context.Assert(org)
if err != nil {
return err
}
return c.writeHttp(ctx)
}
func (c *complete) writeHttp(ctx http_context.IHttpContext) error {
ctx.Response().SetHeader("Content-Type", c.contentType)
ctx.Response().SetStatus(c.responseStatus, "")
for k, v := range c.responseHeader {
ctx.Response().SetHeader(k, v)
}
if c.responseExample != "" {
ctx.Response().SetBody([]byte(c.responseExample))
return nil
}
schema := utils.JsonSchemaMockJsUnmarshal(c.responseSchema)
bytes, err := json.Marshal(schema)
if err != nil {
log.Errorf("mocking complete err=%s", err.Error())
return err
}
ctx.Response().SetBody(bytes)
return nil
}
//func (c *complete) writeDubbo2(ctx dubbo2_context.IDubbo2Context) error {
// if c.responseExample != "" {
//
// var val interface{}
// if err := json.Unmarshal([]byte(c.responseExample), &val); err != nil {
// ctx.Response().SetBody(Dubbo2ErrorResult(err))
// return err
// }
//
// ctx.Response().SetBody(getDubbo2Response(val, ctx.Proxy().Attachments()))
// return nil
// }
//
// schema := jsonSchemaUnmarshal(c.responseSchema)
// ctx.Response().SetBody(getDubbo2Response(schema, ctx.Proxy().Attachments()))
// return nil
//}
//func (c *complete) writeGrpc(ctx grpc_context.IGrpcContext) error {
// descriptor, err := c.descriptor.Descriptor().FindSymbol(fmt.Sprintf("%s.%s", ctx.Proxy().Service(), ctx.Proxy().Method()))
// if err != nil {
// return err
// }
// methodDesc := descriptor.GetFile().FindService(ctx.Proxy().Service()).FindMethodByName(ctx.Proxy().Method())
//
// message := dynamic.NewMessage(methodDesc.GetOutputType())
//
// fields := message.GetKnownFields()
// for _, field := range fields {
// switch field.GetType() {
// case descriptorpb.FieldDescriptorProto_TYPE_DOUBLE, descriptorpb.FieldDescriptorProto_TYPE_FLOAT: //float32
// message.SetField(field, gofakeit.Float32())
// case descriptorpb.FieldDescriptorProto_TYPE_INT64, descriptorpb.FieldDescriptorProto_TYPE_SINT64, descriptorpb.FieldDescriptorProto_TYPE_SFIXED64: //int64
// message.SetField(field, gofakeit.Int64())
// case descriptorpb.FieldDescriptorProto_TYPE_INT32, descriptorpb.FieldDescriptorProto_TYPE_SINT32, descriptorpb.FieldDescriptorProto_TYPE_SFIXED32: //int32
// message.SetField(field, gofakeit.Int32())
// case descriptorpb.FieldDescriptorProto_TYPE_UINT32, descriptorpb.FieldDescriptorProto_TYPE_FIXED32: //uint32
// message.SetField(field, gofakeit.Uint32())
// case descriptorpb.FieldDescriptorProto_TYPE_UINT64, descriptorpb.FieldDescriptorProto_TYPE_FIXED64: //uint64
// message.SetField(field, gofakeit.Uint64())
// case descriptorpb.FieldDescriptorProto_TYPE_BOOL: //bool
// message.SetField(field, gofakeit.Bool())
// case descriptorpb.FieldDescriptorProto_TYPE_STRING: //string
// message.SetField(field, gofakeit.LetterN(5))
// case descriptorpb.FieldDescriptorProto_TYPE_BYTES: //bytes
// message.SetField(field, []byte(gofakeit.LetterN(5)))
// }
//
// }
//
// ctx.Response().Write(message)
// return nil
//}

View File

@@ -0,0 +1,13 @@
package http_mocking
type Config struct {
ResponseStatus int `json:"response_status" default:"200" label:"返回响应的 HTTP 状态码仅http路由有效"`
ContentType string `json:"content_type" label:"返回响应的 Header Content-Type" enum:"application/json"`
ResponseExample string `json:"response_example" format:"text" label:"返回响应的Body与jsonschema字段二选一"`
ResponseSchema string `json:"response_schema" format:"text" label:"指定响应的jsonschema对象"`
ResponseHeader map[string]string `json:"response_header" label:"响应头"`
}
const (
contentTypeJson = "application/json"
)

View File

@@ -0,0 +1,87 @@
package http_mocking
import (
"encoding/json"
"errors"
"fmt"
"github.com/eolinker/apinto/drivers"
grpc_descriptor "github.com/eolinker/apinto/grpc-descriptor"
"github.com/eolinker/eosc"
"github.com/eolinker/eosc/common/bean"
"github.com/eolinker/eosc/log"
"strings"
)
func check(v interface{}) (*Config, error) {
conf, err := drivers.Assert[Config](v)
if err != nil {
return nil, err
}
if conf.ResponseStatus < 100 {
conf.ResponseStatus = 200
}
if conf.ContentType == contentTypeJson {
if len(strings.TrimSpace(conf.ResponseSchema)) == 0 && len(strings.TrimSpace(conf.ResponseExample)) == 0 {
log.Errorf("mocking check schema is null && example is null ")
return nil, errors.New("param err")
}
if len(strings.TrimSpace(conf.ResponseExample)) > 0 {
var val interface{}
if err = json.Unmarshal([]byte(conf.ResponseExample), &val); err != nil {
log.Errorf("mocking check example Format err = %s example=%s", err.Error(), conf.ResponseExample)
return nil, errors.New("param err")
}
}
if len(strings.TrimSpace(conf.ResponseSchema)) > 0 {
var val interface{}
if err = json.Unmarshal([]byte(conf.ResponseSchema), &val); err != nil {
log.Errorf("mocking check Schema Format err = %s Schema=%s", err.Error(), conf.ResponseSchema)
return nil, errors.New("param err")
}
}
}
return conf, nil
}
func Create(id, name string, conf *Config, workers map[eosc.RequireId]eosc.IWorker) (eosc.IWorker, error) {
once.Do(func() {
bean.Autowired(&worker)
})
jsonSchema := make(map[string]interface{})
if conf.ResponseSchema != "" {
if err := json.Unmarshal([]byte(conf.ResponseSchema), &jsonSchema); err != nil {
log.Errorf("create mocking err=%s,jsonSchema=%s", err.Error(), conf.ResponseSchema)
return nil, err
}
}
return &Mocking{
WorkerBase: drivers.Worker(id, name),
handler: NewComplete(conf.ResponseStatus, conf.ContentType, conf.ResponseExample, jsonSchema, conf.ResponseHeader),
}, nil
}
func getDescSource(protobufID string) (grpc_descriptor.IDescriptor, error) {
w, ok := worker.Get(protobufID)
if ok {
v, vOk := w.(grpc_descriptor.IDescriptor)
if !vOk {
return nil, fmt.Errorf("invalid protobuf id: %s", protobufID)
}
return v, nil
}
return nil, fmt.Errorf("protobuf worker(%s) is not exist", protobufID)
}

View File

@@ -0,0 +1,25 @@
package http_mocking
import (
"sync"
"github.com/eolinker/apinto/drivers"
"github.com/eolinker/eosc"
)
const (
Name = "http-mocking"
)
var (
once = sync.Once{}
worker eosc.IWorkers
)
func Register(register eosc.IExtenderDriverRegister) {
register.RegisterExtenderDriver(Name, NewFactory())
}
func NewFactory() eosc.IExtenderDriverFactory {
return drivers.NewFactory[Config](Create)
}

View File

@@ -0,0 +1,80 @@
package http_mocking
import (
"encoding/json"
"github.com/eolinker/apinto/drivers"
"github.com/eolinker/eosc"
"github.com/eolinker/eosc/eocontext"
http_context "github.com/eolinker/eosc/eocontext/http-context"
log "github.com/eolinker/eosc/log"
)
var _ eocontext.IFilter = (*Mocking)(nil)
var _ http_context.HttpFilter = (*Mocking)(nil)
type Mocking struct {
drivers.WorkerBase
responseStatus int
contentType string
responseExample string
responseSchema string
responseHeader map[string]string
handler eocontext.CompleteHandler
}
func (m *Mocking) DoHttpFilter(ctx http_context.IHttpContext, next eocontext.IChain) (err error) {
if m.handler != nil {
ctx.SetCompleteHandler(m.handler)
}
if next != nil {
return next.DoChain(ctx)
}
return nil
}
func (m *Mocking) DoFilter(ctx eocontext.EoContext, next eocontext.IChain) (err error) {
return http_context.DoHttpFilter(m, ctx, next)
}
func (m *Mocking) Destroy() {
return
}
func (m *Mocking) Start() error {
return nil
}
func (m *Mocking) Reset(v interface{}, workers map[eosc.RequireId]eosc.IWorker) error {
conf, err := check(v)
if err != nil {
return err
}
m.responseSchema = conf.ResponseSchema
m.responseExample = conf.ResponseExample
m.contentType = conf.ContentType
m.responseStatus = conf.ResponseStatus
m.responseHeader = conf.ResponseHeader
jsonSchema := make(map[string]interface{})
if conf.ResponseSchema != "" {
if err = json.Unmarshal([]byte(conf.ResponseSchema), &jsonSchema); err != nil {
log.Errorf("create mocking err=%s,jsonSchema=%s", err.Error(), conf.ResponseSchema)
return err
}
}
m.handler = NewComplete(m.responseStatus, m.contentType, m.responseExample, jsonSchema, m.responseHeader)
return nil
}
func (m *Mocking) Stop() error {
return nil
}
func (m *Mocking) CheckSkill(skill string) bool {
return true
}

View File

@@ -0,0 +1,71 @@
package proxy_mirror
import (
"github.com/eolinker/apinto/utils"
"strings"
)
type Config struct {
Addr string `json:"Addr" label:"服务地址" description:"镜像服务地址, 需要包含scheme"`
SampleConf *SampleConfig `json:"sample_conf" label:"采样配置"`
Timeout int `json:"timeout" label:"请求超时时间"`
PassHost string `json:"pass_host" enum:"pass,node,rewrite" default:"pass" label:"转发域名" description:"请求发给上游时的 host 设置选型pass:将客户端的 host 透传给上游node:使用addr中配置的hostrewrite:使用下面指定的host值"`
Host string `json:"host" label:"新host" description:"指定上游请求的host只有在 转发域名 配置为 rewrite 时有效" switch:"pass_host==='rewrite'"`
}
type SampleConfig struct {
RandomRange int `json:"random_range" label:"随机数范围"`
RandomPivot int `json:"random_pivot" label:"随机数锚点"`
}
const (
modePass = "pass"
modeNode = "node"
modeRewrite = "rewrite"
)
func (c *Config) doCheck() error {
//校验addr
if !utils.IsMatchSchemeIpPort(c.Addr) && !utils.IsMatchSchemeDomainPort(c.Addr) {
return errAddr
}
//scheme小写
schemeIdx := strings.Index(c.Addr, "://")
c.Addr = strings.ToLower(c.Addr[:schemeIdx]) + c.Addr[schemeIdx:]
//校验采样配置
if c.SampleConf.RandomRange <= 0 {
return errRandomRangeNum
}
if c.SampleConf.RandomPivot <= 0 {
return errRandomPivotNum
}
if c.SampleConf.RandomPivot > c.SampleConf.RandomRange {
return errRandomPivot
}
//校验镜像请求超时时间
if c.Timeout < 0 {
c.Timeout = 3000
}
//校验passHost
switch c.PassHost {
case modePass:
case modeNode:
case modeRewrite:
default:
return errUnsupportedPassHost
}
//校验host
if c.PassHost == modeRewrite {
if c.Host == "" {
return errHostNull
} else if !utils.IsMatchIpPort(c.Host) && !utils.IsMatchDomainPort(c.Host) {
return errAddr
}
}
return nil
}

View File

@@ -0,0 +1,42 @@
package proxy_mirror
import (
"github.com/eolinker/apinto/drivers"
"github.com/eolinker/eosc"
"time"
)
func Check(v *Config, workers map[eosc.RequireId]eosc.IWorker) error {
return v.doCheck()
}
func check(v interface{}) (*Config, error) {
conf, err := drivers.Assert[Config](v)
if err != nil {
return nil, err
}
err = conf.doCheck()
if err != nil {
return nil, err
}
return conf, nil
}
func Create(id, name string, conf *Config, workers map[eosc.RequireId]eosc.IWorker) (eosc.IWorker, error) {
err := conf.doCheck()
if err != nil {
return nil, err
}
pm := &proxyMirror{
WorkerBase: drivers.Worker(id, name),
randomRange: conf.SampleConf.RandomRange,
randomPivot: conf.SampleConf.RandomPivot,
service: newMirrorService(conf.Addr, conf.PassHost, conf.Host, time.Duration(conf.Timeout)),
}
return pm, nil
}

View File

@@ -0,0 +1,15 @@
package proxy_mirror
import "github.com/pkg/errors"
var (
errUnsupportedContextType = errors.New("send mirror proxy fail. Unsupported Context Type")
errHostNull = errors.New("host can't be null when pass_host is rewrite. ")
errUnsupportedPassHost = errors.New("unsupported pass_host. ")
errRandomRangeNum = errors.New("random_range should be bigger than 0. ")
errRandomPivotNum = errors.New("random_pivot should be bigger than 0. ")
errRandomPivot = errors.New("random_pivot should be smaller than random_range. ")
errAddr = errors.New("addr is illegal. ")
)

View File

@@ -0,0 +1,18 @@
package proxy_mirror
import (
"github.com/eolinker/apinto/drivers"
"github.com/eolinker/eosc"
)
const (
Name = "proxy_mirror"
)
func Register(register eosc.IExtenderDriverRegister) {
register.RegisterExtenderDriver(Name, NewFactory())
}
func NewFactory() eosc.IExtenderDriverFactory {
return drivers.NewFactory[Config](Create, Check)
}

View File

@@ -0,0 +1,45 @@
package proxy_mirror
import (
"github.com/eolinker/eosc/eocontext"
"github.com/eolinker/eosc/log"
)
type proxyMirrorCompleteHandler struct {
orgComplete eocontext.CompleteHandler
service *mirrorService
}
func newMirrorHandler(eoCtx eocontext.EoContext, service *mirrorService) (eocontext.CompleteHandler, error) {
handler := &proxyMirrorCompleteHandler{
orgComplete: eoCtx.GetComplete(),
service: service,
}
return handler, nil
}
func (p *proxyMirrorCompleteHandler) Complete(ctx eocontext.EoContext) error {
cloneCtx, err := ctx.Clone()
//先执行原始Complete, 再执行镜像请求的Complete
orgErr := p.orgComplete.Complete(ctx)
if err != nil {
log.Warn(err)
return orgErr
}
cloneCtx.SetApp(p.service)
cloneCtx.SetBalance(p.service)
cloneCtx.SetUpstreamHostHandler(p.service)
go func() {
err = p.orgComplete.Complete(cloneCtx)
if err != nil {
log.Error(err)
}
}()
return orgErr
}

View File

@@ -0,0 +1,83 @@
package proxy_mirror
import (
"errors"
"fmt"
"github.com/eolinker/apinto/discovery"
"github.com/eolinker/eosc/eocontext"
"strconv"
"strings"
"time"
)
var (
errNoValidNode = errors.New("no valid node")
)
type mirrorService struct {
scheme string
passHost eocontext.PassHostMod
host string
timeout time.Duration
nodes []eocontext.INode
}
func newMirrorService(target, passHost, host string, timeout time.Duration) *mirrorService {
labels := map[string]string{}
idx := strings.Index(target, "://")
scheme := target[:idx]
addr := target[idx+3:]
idx = strings.Index(addr, ":")
ip := addr
port := 0
if idx > 0 {
ip = addr[:idx]
portStr := addr[idx+1:]
port, _ = strconv.Atoi(portStr)
}
inode := discovery.NewNode(labels, fmt.Sprintf("%s:%d", ip, port), ip, port)
var mode eocontext.PassHostMod
switch passHost {
case modePass:
mode = eocontext.PassHost
case modeNode:
mode = eocontext.NodeHost
case modeRewrite:
mode = eocontext.ReWriteHost
}
return &mirrorService{
scheme: scheme,
passHost: mode,
host: host,
timeout: timeout,
nodes: []eocontext.INode{inode},
}
}
func (m *mirrorService) Nodes() []eocontext.INode {
return m.nodes
}
func (m *mirrorService) Scheme() string {
return m.scheme
}
func (m *mirrorService) TimeOut() time.Duration {
return m.timeout
}
func (m *mirrorService) PassHost() (eocontext.PassHostMod, string) {
return m.passHost, m.host
}
func (m *mirrorService) Select(ctx eocontext.EoContext) (eocontext.INode, error) {
if len(m.nodes) < 1 {
return nil, errNoValidNode
}
return m.nodes[0], nil
}

View File

@@ -0,0 +1,78 @@
package proxy_mirror
import (
"github.com/eolinker/apinto/drivers"
"github.com/eolinker/eosc"
"github.com/eolinker/eosc/eocontext"
http_service "github.com/eolinker/eosc/eocontext/http-context"
"github.com/eolinker/eosc/log"
"math/rand"
"time"
)
var _ eocontext.IFilter = (*proxyMirror)(nil)
type proxyMirror struct {
drivers.WorkerBase
randomRange int
randomPivot int
service *mirrorService
}
func (p *proxyMirror) DoFilter(ctx eocontext.EoContext, next eocontext.IChain) error {
//进行采样, 生成随机数判断
rand.Seed(time.Now().UnixNano())
randomNum := rand.Intn(p.randomRange + 1) //[0,range]范围内整型
if randomNum <= p.randomPivot { //若随机数在[0,pivot]范围内则进行转发
setMirrorProxy(p.service, ctx)
}
if next != nil {
return next.DoChain(ctx)
}
return nil
}
func setMirrorProxy(service *mirrorService, ctx eocontext.EoContext) {
//先判断当前Ctx是否能Copy
if !ctx.IsCloneable() {
log.Info(errUnsupportedContextType)
return
}
//给ctx设置新的FinishHandler
newCompleteHandler, err := newMirrorHandler(ctx, service)
if err != nil {
log.Info(err)
return
}
ctx.SetCompleteHandler(newCompleteHandler)
}
func (p *proxyMirror) Start() error {
return nil
}
func (p *proxyMirror) Reset(v interface{}, workers map[eosc.RequireId]eosc.IWorker) error {
conf, err := check(v)
if err != nil {
return err
}
p.service = newMirrorService(conf.Addr, conf.PassHost, conf.Host, time.Duration(conf.Timeout))
p.randomRange = conf.SampleConf.RandomRange
p.randomPivot = conf.SampleConf.RandomPivot
return nil
}
func (p *proxyMirror) Stop() error {
return nil
}
func (p *proxyMirror) Destroy() {
}
func (p *proxyMirror) CheckSkill(skill string) bool {
return http_service.FilterSkillName == skill
}

View File

@@ -4,6 +4,7 @@ import (
"github.com/eolinker/apinto/drivers"
"github.com/eolinker/apinto/utils"
"github.com/eolinker/eosc/eocontext"
"github.com/eolinker/eosc/log"
"strconv"
"github.com/eolinker/eosc"
@@ -66,9 +67,12 @@ func (r *ResponseRewrite) CheckSkill(skill string) bool {
return http_service.FilterSkillName == skill
}
func (r *ResponseRewrite) DoHttpFilter(ctx http_service.IHttpContext, next eocontext.IChain) (err error) {
func (r *ResponseRewrite) DoHttpFilter(ctx http_service.IHttpContext, next eocontext.IChain) error {
if next != nil {
err = next.DoChain(ctx)
err := next.DoChain(ctx)
if err != nil {
log.Error(err)
}
}
return r.rewrite(ctx)

View File

@@ -17,12 +17,10 @@ var (
)
type HttpComplete struct {
retry int
timeOut time.Duration
}
func NewHttpComplete(retry int, timeOut time.Duration) *HttpComplete {
return &HttpComplete{retry: retry, timeOut: timeOut}
func NewHttpComplete() *HttpComplete {
return &HttpComplete{}
}
func (h *HttpComplete) Complete(org eocontext.EoContext) error {
@@ -56,9 +54,22 @@ func (h *HttpComplete) Complete(org eocontext.EoContext) error {
}
timeOut := app.TimeOut()
for index := 0; index <= h.retry; index++ {
if h.timeOut > 0 && time.Now().Sub(proxyTime) > h.timeOut {
retryValue := ctx.Value(http_service.KeyHttpRetry)
retry, ok := retryValue.(int)
if !ok {
retry = 1
}
timeoutValue := ctx.Value(http_service.KeyHttpTimeout)
timeout, ok := timeoutValue.(time.Duration)
if !ok {
timeout = 3000 * time.Millisecond
}
for index := 0; index <= retry; index++ {
if timeout > 0 && time.Now().Sub(proxyTime) > timeout {
return ErrorTimeoutComplete
}
node, err := balance.Select(ctx)

View File

@@ -2,6 +2,7 @@ package http_router
import (
"net/http"
"time"
http_service "github.com/eolinker/apinto/node/http-context"
@@ -26,10 +27,12 @@ type httpHandler struct {
filters eocontext.IChainPro
disable bool
websocket bool
retry int
timeout time.Duration
}
func (h *httpHandler) ServeHTTP(ctx eocontext.EoContext) {
httpContext, err := http_context.Assert(ctx)
if err != nil {
return
@@ -50,6 +53,9 @@ func (h *httpHandler) ServeHTTP(ctx eocontext.EoContext) {
}
ctx = wsCtx
}
//set retry timeout
ctx.WithValue(http_context.KeyHttpRetry, h.retry)
ctx.WithValue(http_context.KeyHttpTimeout, h.timeout)
//Set Label
ctx.SetLabel("api", h.routerName)

View File

@@ -60,6 +60,8 @@ func (h *HttpRouter) reset(cfg *Config, workers map[eosc.RequireId]eosc.IWorker)
finisher: defaultFinisher,
disable: cfg.Disable,
websocket: cfg.Websocket,
retry: cfg.Retry,
timeout: time.Duration(cfg.TimeOut) * time.Millisecond,
}
if !cfg.Disable {
@@ -94,7 +96,7 @@ func (h *HttpRouter) reset(cfg *Config, workers map[eosc.RequireId]eosc.IWorker)
handler.completeHandler = websocket.NewComplete(cfg.Retry, time.Duration(cfg.TimeOut)*time.Millisecond)
methods = []string{http.MethodGet}
} else {
handler.completeHandler = http_complete.NewHttpComplete(cfg.Retry, time.Duration(cfg.TimeOut)*time.Millisecond)
handler.completeHandler = http_complete.NewHttpComplete()
}
}
}

3
go.mod
View File

@@ -4,6 +4,7 @@ go 1.19
require (
github.com/Shopify/sarama v1.32.0
github.com/brianvoe/gofakeit/v6 v6.20.1
github.com/coocood/freecache v1.2.2
github.com/dubbogo/gost v1.13.1
github.com/eolinker/eosc v0.10.1
@@ -197,4 +198,4 @@ require (
gopkg.in/yaml.v2 v2.4.0 // indirect
)
//replace github.com/eolinker/eosc => ../eosc
replace github.com/eolinker/eosc => ../eosc

View File

@@ -290,6 +290,15 @@ func (d *DubboContext) LocalPort() int {
return d.port
}
func (d *DubboContext) IsCloneable() bool {
return false
}
func (d *DubboContext) Clone() (eocontext.EoContext, error) {
//TODO
return nil, fmt.Errorf("%s %w", "DubboContext", eocontext.ErrEoCtxUnCloneable)
}
func addrToIP(addr net.Addr) net.IP {
x, ok := addr.(*net.TCPAddr)
if !ok {

View File

@@ -260,3 +260,12 @@ func (c *Context) reset() {
pool.Put(c)
}
func (c *Context) IsCloneable() bool {
return false
}
func (c *Context) Clone() (eocontext.EoContext, error) {
//TODO
return nil, fmt.Errorf("%s %w", "GrpcContext", eocontext.ErrEoCtxUnCloneable)
}

View File

@@ -2,13 +2,12 @@ package http_context
import (
"bytes"
"io"
"strings"
http_context "github.com/eolinker/eosc/eocontext/http-context"
"github.com/valyala/fasthttp"
"io/ioutil"
"mime"
"mime/multipart"
"net/url"
@@ -227,7 +226,7 @@ func (b *BodyRequestHandler) resetFile() error {
return err
}
data, err := ioutil.ReadAll(fio)
data, err := io.ReadAll(fio)
if err != nil {
fio.Close()
return err
@@ -288,6 +287,5 @@ func (b *BodyRequestHandler) SetForm(values url.Values) error {
func (b *BodyRequestHandler) SetRaw(contentType string, body []byte) {
b.request.SetBodyRaw(body)
b.request.Header.SetContentType(contentType)
return
}

201
node/http-context/clone.go Normal file
View File

@@ -0,0 +1,201 @@
package http_context
import (
"context"
"fmt"
"net"
"time"
"github.com/eolinker/eosc/utils/config"
fasthttp_client "github.com/eolinker/apinto/node/fasthttp-client"
eoscContext "github.com/eolinker/eosc/eocontext"
http_service "github.com/eolinker/eosc/eocontext/http-context"
)
var _ http_service.IHttpContext = (*cloneContext)(nil)
// HttpContext fasthttpRequestCtx
type cloneContext struct {
org *HttpContext
proxyRequest ProxyRequest
response Response
proxyRequests []http_service.IProxy
ctx context.Context
completeHandler eoscContext.CompleteHandler
finishHandler eoscContext.FinishHandler
app eoscContext.EoApp
balance eoscContext.BalanceHandler
upstreamHostHandler eoscContext.UpstreamHostHandler
labels map[string]string
responseError error
}
func (ctx *cloneContext) GetUpstreamHostHandler() eoscContext.UpstreamHostHandler {
return ctx.upstreamHostHandler
}
func (ctx *cloneContext) SetUpstreamHostHandler(handler eoscContext.UpstreamHostHandler) {
ctx.upstreamHostHandler = handler
}
func (ctx *cloneContext) LocalIP() net.IP {
return ctx.org.LocalIP()
}
func (ctx *cloneContext) LocalAddr() net.Addr {
return ctx.org.LocalAddr()
}
func (ctx *cloneContext) LocalPort() int {
return ctx.org.LocalPort()
}
func (ctx *cloneContext) GetApp() eoscContext.EoApp {
return ctx.app
}
func (ctx *cloneContext) SetApp(app eoscContext.EoApp) {
ctx.app = app
}
func (ctx *cloneContext) GetBalance() eoscContext.BalanceHandler {
return ctx.balance
}
func (ctx *cloneContext) SetBalance(handler eoscContext.BalanceHandler) {
ctx.balance = handler
}
func (ctx *cloneContext) SetLabel(name, value string) {
ctx.labels[name] = value
}
func (ctx *cloneContext) GetLabel(name string) string {
return ctx.labels[name]
}
func (ctx *cloneContext) Labels() map[string]string {
return ctx.labels
}
func (ctx *cloneContext) GetComplete() eoscContext.CompleteHandler {
return ctx.completeHandler
}
func (ctx *cloneContext) SetCompleteHandler(handler eoscContext.CompleteHandler) {
ctx.completeHandler = handler
}
func (ctx *cloneContext) GetFinish() eoscContext.FinishHandler {
return ctx.finishHandler
}
func (ctx *cloneContext) SetFinish(handler eoscContext.FinishHandler) {
ctx.finishHandler = handler
}
func (ctx *cloneContext) Scheme() string {
return ctx.org.Scheme()
}
func (ctx *cloneContext) Assert(i interface{}) error {
if v, ok := i.(*http_service.IHttpContext); ok {
*v = ctx
return nil
}
return fmt.Errorf("not suport:%s", config.TypeNameOf(i))
}
func (ctx *cloneContext) Proxies() []http_service.IProxy {
return ctx.proxyRequests
}
func (ctx *cloneContext) Response() http_service.IResponse {
return &ctx.response
}
func (ctx *cloneContext) SendTo(address string, timeout time.Duration) error {
scheme, host := readAddress(address)
request := ctx.proxyRequest.Request()
passHost, targetHost := ctx.GetUpstreamHostHandler().PassHost()
switch passHost {
case eoscContext.PassHost:
case eoscContext.NodeHost:
request.URI().SetHost(host)
case eoscContext.ReWriteHost:
request.URI().SetHost(targetHost)
}
beginTime := time.Now()
ctx.responseError = fasthttp_client.ProxyTimeout(address, request, ctx.response.Response, timeout)
agent := newRequestAgent(&ctx.proxyRequest, host, scheme, beginTime, time.Now())
if ctx.responseError != nil {
agent.setStatusCode(504)
} else {
agent.setStatusCode(ctx.response.Response.StatusCode())
}
agent.setResponseLength(ctx.response.Response.Header.ContentLength())
ctx.proxyRequests = append(ctx.proxyRequests, agent)
return ctx.responseError
}
func (ctx *cloneContext) Context() context.Context {
return ctx.ctx
}
func (ctx *cloneContext) AcceptTime() time.Time {
return ctx.org.AcceptTime()
}
func (ctx *cloneContext) Value(key interface{}) interface{} {
return ctx.ctx.Value(key)
}
func (ctx *cloneContext) WithValue(key, val interface{}) {
ctx.ctx = context.WithValue(ctx.Context(), key, val)
}
func (ctx *cloneContext) Proxy() http_service.IRequest {
return &ctx.proxyRequest
}
func (ctx *cloneContext) Request() http_service.IRequestReader {
return ctx.org.Request()
}
func (ctx *cloneContext) IsCloneable() bool {
return false
}
func (ctx *cloneContext) Clone() (eoscContext.EoContext, error) {
return nil, fmt.Errorf("%s %w", "HttpContext", eoscContext.ErrEoCtxUnCloneable)
}
// RequestId 请求ID
func (ctx *cloneContext) RequestId() string {
return ctx.org.requestID
}
// Finish finish
func (ctx *cloneContext) FastFinish() {
ctx.ctx = nil
ctx.app = nil
ctx.balance = nil
ctx.upstreamHostHandler = nil
ctx.finishHandler = nil
ctx.completeHandler = nil
ctx.proxyRequest.Finish()
}

View File

@@ -13,7 +13,7 @@ import (
eoscContext "github.com/eolinker/eosc/eocontext"
http_service "github.com/eolinker/eosc/eocontext/http-context"
uuid "github.com/google/uuid"
"github.com/google/uuid"
"github.com/valyala/fasthttp"
)
@@ -153,6 +153,7 @@ func (ctx *HttpContext) SendTo(address string, timeout time.Duration) error {
func (ctx *HttpContext) Context() context.Context {
if ctx.ctx == nil {
ctx.ctx = context.Background()
}
return ctx.ctx
@@ -179,6 +180,40 @@ func (ctx *HttpContext) Request() http_service.IRequestReader {
return &ctx.requestReader
}
func (ctx *HttpContext) IsCloneable() bool {
return true
}
func (ctx *HttpContext) Clone() (eoscContext.EoContext, error) {
copyContext := copyPool.Get().(*cloneContext)
copyContext.org = ctx
copyContext.proxyRequests = make([]http_service.IProxy, 0, 2)
req := fasthttp.AcquireRequest()
ctx.fastHttpRequestCtx.Request.CopyTo(req)
resp := fasthttp.AcquireResponse()
ctx.fastHttpRequestCtx.Response.CopyTo(resp)
copyContext.proxyRequest.reset(req, ctx.requestReader.remoteAddr)
copyContext.response.reset(resp)
copyContext.completeHandler = ctx.completeHandler
copyContext.finishHandler = ctx.finishHandler
cloneLabels := make(map[string]string, len(ctx.labels))
for k, v := range ctx.labels {
cloneLabels[k] = v
}
copyContext.labels = cloneLabels
//记录请求时间
copyContext.ctx = context.WithValue(ctx.Context(), http_service.KeyCloneCtx, true)
copyContext.WithValue(http_service.KeyHttpRetry, 0)
copyContext.WithValue(http_service.KeyHttpTimeout, time.Duration(0))
return copyContext, nil
}
// NewContext 创建Context
func NewContext(ctx *fasthttp.RequestCtx, port int) *HttpContext {
@@ -229,7 +264,7 @@ func (ctx *HttpContext) FastFinish() {
ctx.response.Finish()
ctx.fastHttpRequestCtx = nil
pool.Put(ctx)
return
}
func NotFound(ctx *HttpContext) {

View File

@@ -10,6 +10,9 @@ var (
pool = sync.Pool{
New: newContext,
}
copyPool = sync.Pool{
New: newCopyContext,
}
)
func newContext() interface{} {
@@ -17,3 +20,9 @@ func newContext() interface{} {
h.proxyRequests = make([]http_service.IProxy, 0, 5)
return h
}
func newCopyContext() interface{} {
h := new(cloneContext)
h.proxyRequests = make([]http_service.IProxy, 0, 5)
return h
}

View File

@@ -3,6 +3,7 @@ package http_context
import (
"bytes"
"fmt"
"github.com/eolinker/eosc/log"
http_service "github.com/eolinker/eosc/eocontext/http-context"
"github.com/valyala/fasthttp"
@@ -20,7 +21,10 @@ type ProxyRequest struct {
func (r *ProxyRequest) Finish() error {
fasthttp.ReleaseRequest(r.req)
r.RequestReader.Finish()
err := r.RequestReader.Finish()
if err != nil {
log.Warn(err)
}
return nil
}
func (r *ProxyRequest) Header() http_service.IHeaderWriter {

View File

@@ -3,6 +3,7 @@ package http_context
import (
"errors"
"fmt"
eoscContext "github.com/eolinker/eosc/eocontext"
"io"
"net"
"sync"
@@ -79,3 +80,12 @@ func (w *WebsocketContext) Assert(i interface{}) error {
}
return fmt.Errorf("not suport:%s", config.TypeNameOf(i))
}
func (w *WebsocketContext) IsCloneable() bool {
return false
}
func (w *WebsocketContext) Clone() (eoscContext.EoContext, error) {
//TODO
return nil, fmt.Errorf("%s %w", "WebsocketContext", eoscContext.ErrEoCtxUnCloneable)
}

View File

@@ -1,14 +1,8 @@
package utils
import (
"fmt"
"testing"
)
func TestAes(t *testing.T) {
key := Md5("open-api")
enValue := AES_CBC_Encrypt([]byte(Md5("Key123qaz:open-api")), []byte(key))
deValue := AES_CBC_Decrypt(enValue, []byte(key))
log.Debug(enValue)
log.Debug(string(deValue))
}

7
utils/float.go Normal file
View File

@@ -0,0 +1,7 @@
package utils
import "math/rand"
func RandFloats(min, max float64) float64 {
return min + rand.Float64()*(max-min)
}

10
utils/int.go Normal file
View File

@@ -0,0 +1,10 @@
package utils
import "math/rand"
func RandInt64(min, max int64) int64 {
if min >= max || min == 0 || max == 0 {
return max
}
return rand.Int63n(max-min) + min
}

View File

@@ -2,12 +2,18 @@ package utils
import (
"encoding/json"
"errors"
"fmt"
"github.com/brianvoe/gofakeit/v6"
"math/rand"
"strconv"
"strings"
"time"
"github.com/robertkrimen/otto"
)
//JSObjectToJSON 将js对象转为json
// JSObjectToJSON 将js对象转为json
func JSObjectToJSON(s string) ([]byte, error) {
vm := otto.New()
v, err := vm.Run(fmt.Sprintf(`
@@ -20,7 +26,7 @@ func JSObjectToJSON(s string) ([]byte, error) {
return []byte(v.String()), nil
}
//JSONUnmarshal 将json格式的s解码成v所需的json格式
// JSONUnmarshal 将json格式的s解码成v所需的json格式
func JSONUnmarshal(s, v interface{}) error {
data, err := json.Marshal(s)
if err != nil {
@@ -28,3 +34,314 @@ func JSONUnmarshal(s, v interface{}) error {
}
return json.Unmarshal(data, v)
}
// JsonSchemaMockJsUnmarshal
// 解析mockJs生成的json schema 并根据规则生成随机值
// 不是mockJs生成的json 走默认规则解析
func JsonSchemaMockJsUnmarshal(valueMap interface{}) interface{} {
rand.Seed(time.Now().UnixMicro())
gofakeit.Seed(time.Now().UnixMicro())
value, vOk := valueMap.(map[string]interface{})
if vOk {
switch valType := value["properties"].(type) {
case []interface{}:
resultMap := make(map[string]interface{})
for _, v := range valType {
if m, ok := v.(map[string]interface{}); ok {
name, nameOk := m["name"].(string)
if !nameOk {
return jsonSchemaFormat
}
template := m["template"]
if t, tOk := m["type"].(string); tOk {
rule, ruleOk := m["rule"].(map[string]interface{})
if !ruleOk || len(rule) == 0 {
if template != nil {
resultMap[name] = mockConstant(template)
continue
}
}
switch t {
case "string":
minVal, maxVal, _, _, err := getMinMaxDminDmax(rule)
if err != nil {
return err
}
randomNum := 0
if minVal > 0 && maxVal == 0 {
randomNum = int(minVal)
}
if minVal > 0 && maxVal > 0 {
randomNum = int(RandInt64(int64(minVal), int64(maxVal)))
}
if template != nil {
templateStr, sOk := template.(string)
if !sOk {
return jsonSchemaFormat
}
temp := ""
for i := 0; i < randomNum; i++ {
temp = temp + templateStr
}
resultMap[name] = temp
continue
}
case "number":
minVal, maxVal, dminVal, dmaxVal, err := getMinMaxDminDmax(rule)
if err != nil {
return err
}
randomValue := 0.0
if minVal > 0.0 && maxVal == 0.0 {
randomValue = minVal
randomValue += RandFloats(0, 1)
} else if minVal > 0.0 && maxVal > 0.0 {
randomValue = RandFloats(minVal, maxVal)
}
if randomValue == 0.0 {
resultMap[name] = template
continue
}
if dminVal > 0.0 && dmaxVal == 0.0 {
randomValue, _ = strconv.ParseFloat(strconv.FormatFloat(randomValue, 'f', int(dminVal), 64), 64)
} else if dminVal > 0.0 && dmaxVal > 0.0 {
floats := RandFloats(dminVal, dmaxVal)
randomValue, _ = strconv.ParseFloat(strconv.FormatFloat(randomValue, 'f', int(floats), 64), 64)
} else {
randomValue, _ = strconv.ParseFloat(strconv.FormatFloat(randomValue, 'f', 0, 64), 64)
}
resultMap[name] = randomValue
case "boolean":
resultMap[name] = gofakeit.Bool()
case "object":
templateMap, templateOk := template.(map[string]interface{})
if templateOk {
minVal, maxVal, _, _, err := getMinMaxDminDmax(rule)
if err != nil {
return err
}
randomNum := 0
if minVal > 0 && maxVal == 0 {
randomNum = int(minVal)
}
if minVal > 0 && maxVal > 0 {
randomNum = int(RandInt64(int64(minVal), int64(maxVal)))
}
tempMap := make(map[string]interface{})
i := 1
for key, val := range templateMap {
split := strings.Split(key, "|")
tempMap[split[0]] = mockConstant(val)
if i == randomNum {
break
}
i++
}
resultMap[name] = tempMap
}
case "array":
templateList, templateOk := template.([]interface{})
if templateOk {
minVal, maxVal, _, _, err := getMinMaxDminDmax(rule)
if err != nil {
return err
}
randomNum := 0
if minVal > 0.0 && maxVal == 0.0 {
randomNum = int(minVal)
}
if minVal > 0.0 && maxVal > 0.0 {
randomNum = int(RandInt64(int64(minVal), int64(maxVal)))
}
if randomNum == 1 {
if len(templateList) > 1 {
resultMap[name] = templateList[rand.Intn(len(templateList))]
continue
}
switch templateVal := templateList[0].(type) {
case map[string]interface{}:
tempMap := make(map[string]interface{})
for key, val := range templateVal {
split := strings.Split(key, "|")
tempMap[split[0]] = mockConstant(val)
}
resultMap[name] = tempMap
default:
resultMap[name] = templateVal
}
continue
}
tempList := make([]interface{}, 0)
for i := 0; i < randomNum; i++ {
for _, templateType := range templateList {
switch templateVal := templateType.(type) {
case map[string]interface{}:
tempMap := make(map[string]interface{})
for key, val := range templateVal {
split := strings.Split(key, "|")
tempMap[split[0]] = mockConstant(val)
}
tempList = append(tempList, tempMap)
default:
tempList = append(tempList, templateType)
}
}
}
resultMap[name] = tempList
}
}
}
}
}
return resultMap
}
}
return jsonSchemaUnmarshal(value)
}
func mockConstant(v interface{}) interface{} {
if templateStr, ok := v.(string); ok {
templateStr = strings.ToLower(templateStr)
switch templateStr {
case "@cname":
return gofakeit.Username()
case "@cfirst":
return gofakeit.FirstName()
case "@clast":
return gofakeit.LastName()
case "@name", "@name(true)":
return gofakeit.Name()
case "@first":
return gofakeit.FirstName()
case "@last":
return gofakeit.LastName()
case "@email":
return gofakeit.Email()
case "@ip":
return gofakeit.IPv4Address()
case "@zip":
return gofakeit.Address().Zip
case "@city", "@city(true)":
return gofakeit.Address().Address
case "@url":
return gofakeit.URL()
default:
return v
}
}
return v
}
var jsonSchemaFormat = errors.New("json schema format err")
func getMinMaxDminDmax(rule map[string]interface{}) (float64, float64, float64, float64, error) {
minVal := 0.0
min, minOk := rule["min"]
if minOk && min != nil {
mOk := false
minVal, mOk = min.(float64)
if !mOk {
return 0, 0, 0, 0, jsonSchemaFormat
}
}
maxVal := 0.0
max, maxOk := rule["max"]
if maxOk && max != nil {
mOk := false
maxVal, mOk = max.(float64)
if !mOk {
return 0, 0, 0, 0, jsonSchemaFormat
}
}
dminVal := 0.0
dmin, dminOk := rule["dmin"]
if dminOk && dmin != nil {
mOk := false
dminVal, mOk = dmin.(float64)
if !mOk {
return 0, 0, 0, 0, jsonSchemaFormat
}
}
dmaxVal := 0.0
dmax, dmaxOk := rule["dmax"]
if dmaxOk && dmax != nil {
mOk := false
dmaxVal, mOk = dmax.(float64)
if !mOk {
return 0, 0, 0, 0, jsonSchemaFormat
}
}
return minVal, maxVal, dminVal, dmaxVal, nil
}
func jsonSchemaUnmarshal(properties interface{}) interface{} {
propertiesMap, ok := properties.(map[string]interface{})
if !ok {
return jsonSchemaFormat
}
if val, ok := propertiesMap["example"]; ok {
return val
} else {
if t, tOk := propertiesMap["type"].(string); tOk {
switch t {
case "string":
return gofakeit.LetterN(10)
case "number":
return gofakeit.Float64()
case "integer":
return gofakeit.Int64()
case "boolean":
return gofakeit.Bool()
case "object":
propertiesMaps, pOk := propertiesMap["properties"].(map[string]interface{})
if !pOk {
return jsonSchemaFormat
}
resultMap := make(map[string]interface{})
for key, vProperties := range propertiesMaps {
resultMap[key] = jsonSchemaUnmarshal(vProperties)
}
return resultMap
case "array":
items, iOk := propertiesMap["items"].(map[string]interface{})
if !iOk {
return jsonSchemaFormat
}
resultList := make([]interface{}, 0)
resultList = append(resultList, jsonSchemaUnmarshal(items))
return resultList
}
}
return jsonSchemaFormat
}
}

1503
utils/json_test.go Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -7,11 +7,23 @@ const (
regexUrlPathStr = `^\/[a-zA-Z0-9\/_\-\.]*$`
//objectivesExp 校验0.5:0.1,0.9:0.001的格式
objectivesExp = `^((0\.[0-9]+)\:(0\.[0-9]+)(\,)?)+$`
// schemeIPPortExp scheme://IP:PORT
schemeIPPortExp = `^[a-zA-z]+://((2(5[0-5]|[0-4]\d))|[0-1]?\d{1,2})(\.((2(5[0-5]|[0-4]\d))|[0-1]?\d{1,2})){3}:[0-9]+$`
// schemeDomainPortExp scheme://域名或者域名:端口
schemeDomainPortExp = `^[a-zA-z]+://[a-zA-Z0-9][-a-zA-Z0-9]{0,62}(\.[a-zA-Z0-9][-a-zA-Z0-9]{0,62})+\.?(:[0-9]+)?$`
// domainPortExp IP:PORT或者IP
ipPortExp = `^((2(5[0-5]|[0-4]\d))|[0-1]?\d{1,2})(\.((2(5[0-5]|[0-4]\d))|[0-1]?\d{1,2})){3}(:[0-9]+)?$`
// domainPortExp 域名或者域名:端口
domainPortExp = `^[a-zA-Z0-9][-a-zA-Z0-9]{0,62}(\.[a-zA-Z0-9][-a-zA-Z0-9]{0,62})+\.?(:[0-9]+)?$`
)
var (
regexUrlPath = regexp.MustCompile(regexUrlPathStr)
objectivesRegexp = regexp.MustCompile(objectivesExp)
regexUrlPath = regexp.MustCompile(regexUrlPathStr)
objectivesRegexp = regexp.MustCompile(objectivesExp)
schemeIPPortReg = regexp.MustCompile(schemeIPPortExp)
schemeDomainPortReg = regexp.MustCompile(schemeDomainPortExp)
ipPortReg = regexp.MustCompile(ipPortExp)
domainPortReg = regexp.MustCompile(domainPortExp)
)
func CheckUrlPath(path string) bool {
@@ -22,3 +34,23 @@ func CheckUrlPath(path string) bool {
func CheckObjectives(objectives string) bool {
return objectivesRegexp.MatchString(objectives)
}
// IsMatchSchemeIpPort 判断字符串是否符合scheme://ip:port
func IsMatchSchemeIpPort(s string) bool {
return schemeIPPortReg.MatchString(s)
}
// IsMatchSchemeDomainPort 判断字符串是否符合 scheme://域名或者域名:port
func IsMatchSchemeDomainPort(s string) bool {
return schemeDomainPortReg.MatchString(s)
}
// IsMatchIpPort 判断字符串是否符合 ip:port或者ip
func IsMatchIpPort(s string) bool {
return ipPortReg.MatchString(s)
}
// IsMatchDomainPort 判断字符串是否符合 域名或者域名:port
func IsMatchDomainPort(s string) bool {
return domainPortReg.MatchString(s)
}