修复grpc获取响应存在截断的问题

This commit is contained in:
Liujian
2024-07-22 17:28:14 +08:00
parent a863b0477c
commit fd7d0bd775
14 changed files with 154 additions and 63 deletions

View File

@@ -3,10 +3,11 @@ package redis
import (
"context"
"fmt"
"strconv"
"time"
"github.com/go-redis/redis/v8"
redis "github.com/go-redis/redis/v8"
)
type Vector struct {

View File

@@ -23,15 +23,14 @@ type httpHandler struct {
routerName string
routerId string
serviceName string
finisher eocontext.FinishHandler
service service.IService
filters eocontext.IChainPro
disable bool
websocket bool
labels map[string]string
retry int
timeout time.Duration
finisher eocontext.FinishHandler
service service.IService
filters eocontext.IChainPro
disable bool
websocket bool
labels map[string]string
retry int
timeout time.Duration
}
func (h *httpHandler) Serve(ctx eocontext.EoContext) {
@@ -70,6 +69,7 @@ func (h *httpHandler) Serve(ctx eocontext.EoContext) {
ctx.SetLabel("service", h.serviceName)
if h.service != nil {
ctx.SetLabel("service_id", h.service.Id())
ctx.SetLabel("service_title", h.service.Title())
}
ctx.SetLabel("method", httpContext.Request().Method())

View File

@@ -9,16 +9,17 @@ import (
// Config service_http驱动配置
type Config struct {
Timeout int64 `json:"timeout" label:"请求超时时间" default:"2000" minimum:"1" description:"单位ms最小值1"`
Title string `json:"title" label:"标题"`
Timeout int64 `json:"timeout" label:"请求超时时间" default:"2000" minimum:"1" title:"单位ms最小值1"`
Retry int `json:"retry" label:"失败重试次数"`
Scheme string `json:"scheme" label:"请求协议" enum:"HTTP,HTTPS"`
Discovery eosc.RequireId `json:"discovery" required:"false" empty_label:"使用匿名上游" label:"服务发现" skill:"github.com/eolinker/apinto/discovery.discovery.IDiscovery"`
Service string `json:"service" required:"false" label:"服务名 or 配置" switch:"discovery !==''"`
Nodes []string `json:"nodes" label:"静态配置" switch:"discovery===''"`
Balance string `json:"balance" enum:"round-robin,ip-hash" label:"负载均衡算法"`
PassHost string `json:"pass_host" enum:"pass,node,rewrite" default:"pass" label:"转发域名" description:"请求发给上游时的 host 设置选型pass:将客户端的 host 透传给上游node:使用node中配置的hostrewrite:使用下面指定的host值"`
UpstreamHost string `json:"upstream_host" label:"上游host" description:"指定上游请求的host只有在 转发域名 配置为 rewrite 时有效" switch:"pass_host==='rewrite'"`
KeepSession bool `json:"keep_session" label:"会话保持" description:"同一用户session会被分配到同一台服务器上"`
PassHost string `json:"pass_host" enum:"pass,node,rewrite" default:"pass" label:"转发域名" title:"请求发给上游时的 host 设置选型pass:将客户端的 host 透传给上游node:使用node中配置的hostrewrite:使用下面指定的host值"`
UpstreamHost string `json:"upstream_host" label:"上游host" title:"指定上游请求的host只有在 转发域名 配置为 rewrite 时有效" switch:"pass_host==='rewrite'"`
KeepSession bool `json:"keep_session" label:"会话保持" title:"同一用户session会被分配到同一台服务器上"`
}
func (c *Config) String() string {

View File

@@ -24,8 +24,8 @@ var (
type Service struct {
eocontext.BalanceHandler
app discovery.IApp
app discovery.IApp
title string
scheme string
timeout time.Duration
@@ -40,7 +40,7 @@ func (s *Service) PassHost() (eocontext.PassHostMod, string) {
func (s *Service) Nodes() []eocontext.INode {
return s.app.Nodes()
}
func (s *Service) Scheme() string {
@@ -57,6 +57,7 @@ func (s *Service) Reset(conf interface{}, workers map[eosc.RequireId]eosc.IWorke
if !ok {
return fmt.Errorf("need %s,now %s", config.TypeNameOf((*Config)(nil)), config.TypeNameOf(conf))
}
s.title = data.Title
data.rebuild()
if reflect.DeepEqual(data, s.lastConfig) {
return nil
@@ -148,3 +149,7 @@ func compareArray[T comparable](a, b []T) bool {
}
return true
}
func (s *Service) Title() string {
return s.title
}

View File

@@ -2,6 +2,7 @@ package service
import (
"errors"
"github.com/eolinker/apinto/drivers"
"github.com/eolinker/apinto/service"
)
@@ -11,6 +12,7 @@ var (
ErrorInvalidDiscovery = errors.New("invalid Discovery")
)
var _ service.IService = (*serviceWorker)(nil)
type serviceWorker struct {

View File

@@ -77,6 +77,7 @@ func (a *tActuatorSet) Strategy(ctx eocontext.EoContext, next eocontext.IChain,
if ach.Assert(ctx) {
err := ach.Check(ctx, handlers, scalars)
if err != nil {
ctx.SetLabel("handler", "limiting")
return err
}

View File

@@ -2,12 +2,15 @@ package limiting_strategy
import (
"errors"
"strconv"
"github.com/eolinker/apinto/resources"
http_entry "github.com/eolinker/apinto/entries/http-entry"
"github.com/eolinker/apinto/utils/response"
"github.com/eolinker/eosc/eocontext"
http_service "github.com/eolinker/eosc/eocontext/http-context"
"github.com/eolinker/eosc/log"
"strconv"
)
func init() {
@@ -33,6 +36,30 @@ func (hd *actuatorHttp) Assert(ctx eocontext.EoContext) bool {
return true
}
func (hd *actuatorHttp) compareAndAddCount(ctx http_service.IHttpContext, vector resources.Vector, metricsValue, handlerName, period string, threshold int64, response response.IResponse) error {
if vector.Get(metricsValue) >= threshold {
setLimitingStrategyContent(ctx, handlerName, response)
log.DebugF("refuse by limiting strategy %s of %s query.", handlerName, period)
ctx.WithValue("is_block", true)
ctx.SetLabel("block_name", handlerName)
return ErrorLimitingRefuse
}
vector.Add(metricsValue, 1)
return nil
}
func (hd *actuatorHttp) compareAndAddLength(ctx http_service.IHttpContext, vector resources.Vector, metricsValue, handlerName, period string, threshold, contentLength int64, response response.IResponse) error {
if vector.Get(metricsValue) >= threshold {
setLimitingStrategyContent(ctx, handlerName, response)
log.DebugF("refuse by limiting strategy %s of %s traffic.", handlerName, period)
ctx.WithValue("is_block", true)
ctx.SetLabel("block_name", handlerName)
return ErrorLimitingRefuse
}
vector.Add(metricsValue, contentLength)
return nil
}
func (hd *actuatorHttp) Check(ctx eocontext.EoContext, handlers []*LimitingHandler, scalars *Scalars) error {
httpContext, err := http_service.Assert(ctx)
if err != nil {
@@ -51,51 +78,45 @@ func (hd *actuatorHttp) Check(ctx eocontext.EoContext, handlers []*LimitingHandl
metricsAlready.Add(key)
metricsValue := h.Metrics().Metrics(entry)
if h.query.Second > 0 && scalars.QuerySecond.Get(metricsValue) >= h.query.Second {
setLimitingStrategyContent(httpContext, h.Name(), h.Response())
log.DebugF("refuse by limiting strategy %s of second query ", h.Name())
return ErrorLimitingRefuse
if h.query.Second > 0 {
err = hd.compareAndAddCount(httpContext, scalars.QuerySecond, metricsValue, "second", h.name, h.query.Second, h.Response())
if err != nil {
return err
}
}
if h.query.Minute > 0 && scalars.QueryMinute.Get(metricsValue) >= h.query.Minute {
setLimitingStrategyContent(httpContext, h.Name(), h.Response())
log.DebugF("refuse by limiting strategy %s of minute query ", h.Name())
return ErrorLimitingRefuse
if h.query.Minute > 0 {
err = hd.compareAndAddCount(httpContext, scalars.QueryMinute, metricsValue, "minute", h.name, h.query.Minute, h.Response())
if err != nil {
return err
}
}
if h.query.Hour > 0 && scalars.QueryHour.Get(metricsValue) >= h.query.Hour {
setLimitingStrategyContent(httpContext, h.Name(), h.Response())
log.DebugF("refuse by limiting strategy %s of hour query ", h.Name())
return ErrorLimitingRefuse
}
if h.traffic.Second > 0 && scalars.TrafficsSecond.Get(metricsValue) >= h.traffic.Second {
setLimitingStrategyContent(httpContext, h.Name(), h.Response())
log.DebugF("refuse by limiting strategy %s of second traffic ", h.Name())
return ErrorLimitingRefuse
}
if h.traffic.Minute > 0 && scalars.TrafficsMinute.Get(metricsValue) >= h.traffic.Minute {
setLimitingStrategyContent(httpContext, h.Name(), h.Response())
log.DebugF("refuse by limiting strategy %s of minute traffic ", h.Name())
return ErrorLimitingRefuse
if h.query.Hour > 0 {
err = hd.compareAndAddCount(httpContext, scalars.QueryHour, metricsValue, "hour", h.name, h.query.Hour, h.Response())
if err != nil {
return err
}
}
if h.traffic.Hour > 0 && scalars.TrafficsHour.Get(metricsValue) >= h.traffic.Hour {
setLimitingStrategyContent(httpContext, h.Name(), h.Response())
log.DebugF("refuse by limiting strategy %s of hour traffic ", h.Name())
return ErrorLimitingRefuse
if h.traffic.Second > 0 {
err = hd.compareAndAddLength(httpContext, scalars.TrafficsSecond, metricsValue, "second", h.name, h.traffic.Second, contentLength, h.Response())
if err != nil {
return err
}
}
if h.traffic.Minute > 0 {
err = hd.compareAndAddLength(httpContext, scalars.TrafficsMinute, metricsValue, "minute", h.name, h.traffic.Minute, contentLength, h.Response())
if err != nil {
return err
}
}
scalars.QuerySecond.Add(metricsValue, 1)
scalars.QueryMinute.Add(metricsValue, 1)
scalars.QueryHour.Add(metricsValue, 1)
scalars.TrafficsSecond.Add(metricsValue, contentLength)
scalars.TrafficsMinute.Add(metricsValue, contentLength)
scalars.TrafficsHour.Add(metricsValue, contentLength)
if h.traffic.Hour > 0 {
err = hd.compareAndAddLength(httpContext, scalars.TrafficsHour, metricsValue, "hour", h.name, h.traffic.Hour, contentLength, h.Response())
if err != nil {
return err
}
}
}
}
return nil

View File

@@ -83,6 +83,7 @@ func (a *tActuator) Strategy(ctx eocontext.EoContext, next eocontext.IChain) err
handlers := a.handlers
a.lock.RUnlock()
pass := true
var name string
for _, handler := range handlers {
// 匹配Filter
if !handler.filter.Check(httpCtx) {
@@ -94,9 +95,11 @@ func (a *tActuator) Strategy(ctx eocontext.EoContext, next eocontext.IChain) err
if match {
// 匹配成功
pass = handler.rule.visit
name = handler.name
break
}
pass = !handler.rule.visit
name = handler.name
if handler.rule.isContinue {
continue
}
@@ -107,6 +110,8 @@ func (a *tActuator) Strategy(ctx eocontext.EoContext, next eocontext.IChain) err
httpCtx.Response().SetStatus(403, "")
errInfo := "not allowed"
httpCtx.Response().SetBody([]byte(errInfo))
ctx.WithValue("is_block", true)
ctx.SetLabel("block_name", name)
return errors.New(errInfo)
}
if next != nil {

View File

@@ -11,6 +11,8 @@ import (
"strings"
"time"
"github.com/eolinker/eosc/env"
"github.com/eolinker/apinto/utils/version"
"github.com/eolinker/apinto/utils"
@@ -34,6 +36,7 @@ func (f Fields) Read(name string, ctx http_service.IHttpContext) (interface{}, b
if has {
return r.Read("", ctx)
}
fs := strings.SplitN(name, "_", 2)
if len(fs) == 2 {
r, has = f[fs[0]]
@@ -41,6 +44,10 @@ func (f Fields) Read(name string, ctx http_service.IHttpContext) (interface{}, b
return r.Read(fs[1], ctx)
}
}
value := ctx.Value(name)
if value != nil {
return value, true
}
label := ctx.GetLabel(name)
if label != "" {
@@ -137,6 +144,48 @@ var (
return ctxRule.Read(name, ctx)
}),
"gateway_host": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) {
hosts, has := env.GetEnv("GATEWAY_ADVERTISE_HOSTS")
if !has {
return "", false
}
return strings.Split(hosts, ",")[0], true
}),
"gateway_hosts": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) {
hosts, has := env.GetEnv("GATEWAY_ADVERTISE_HOSTS")
if !has {
return "", false
}
return strings.Split(hosts, ","), true
}),
"peer_host": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) {
hosts, has := env.GetEnv("PEER_ADVERTISE_HOSTS")
if !has {
return "", false
}
return strings.Split(hosts, ",")[0], true
}),
"peer_hosts": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) {
hosts, has := env.GetEnv("PEER_ADVERTISE_HOSTS")
if !has {
return "", false
}
return strings.Split(hosts, ","), true
}),
"client_host": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) {
hosts, has := env.GetEnv("CLIENT_ADVERTISE_HOSTS")
if !has {
return "", false
}
return strings.Split(hosts, ",")[0], true
}),
"client_hosts": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) {
hosts, has := env.GetEnv("CLIENT_ADVERTISE_HOSTS")
if !has {
return "", false
}
return strings.Split(hosts, ","), true
}),
"request": Fields{
"body": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) {

4
go.mod
View File

@@ -32,7 +32,7 @@ require (
golang.org/x/crypto v0.21.0
golang.org/x/net v0.22.0
google.golang.org/grpc v1.61.0
google.golang.org/protobuf v1.33.1-0.20240408130810-98873a205002
google.golang.org/protobuf v1.34.2
)
require (
@@ -182,4 +182,4 @@ require (
replace github.com/soheilhy/cmux v0.1.5 => github.com/hmzzrcs/cmux v0.1.6
//replace github.com/eolinker/eosc => ../eosc
replace github.com/eolinker/eosc => ../eosc

View File

@@ -226,7 +226,7 @@ func (c *Context) doInvoke(address string, timeout time.Duration) error {
return err
}
c.proxy.Headers().Set("grpc-timeout", fmt.Sprintf("%dn", timeout))
//c.proxy.Headers().Set("grpc-timeout", fmt.Sprintf("%d", timeout))
clientCtx, _ := context.WithCancel(metadata.NewOutgoingContext(c.Context(), c.proxy.Headers().Copy()))
serverHeaders := &metadata.MD{}
serverTrailers := &metadata.MD{}

View File

@@ -33,6 +33,7 @@ type Request struct {
}
func (r *Request) SetHost(s string) {
r.headers.Set(":authority", s)
r.host = s
}

View File

@@ -3,15 +3,15 @@ package grpc_context
import (
"io"
"google.golang.org/grpc/metadata"
"google.golang.org/protobuf/types/known/emptypb"
grpc_context "github.com/eolinker/eosc/eocontext/grpc-context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/anypb"
)
var (
@@ -72,7 +72,7 @@ func handlerStream(serverStream grpc.ServerStream, clientStream grpc.ClientStrea
func forwardClientToServer(src grpc.ClientStream, dst grpc.ServerStream) chan error {
ret := make(chan error, 1)
go func() {
f := &anypb.Any{}
// This is a bit of a hack, but client to server headers are only readable after first client msg is
// received but must be written to server stream before the first msg is flushed.
// This is the only place to do it nicely.
@@ -85,7 +85,10 @@ func forwardClientToServer(src grpc.ClientStream, dst grpc.ServerStream) chan er
ret <- err
return
}
f := &emptypb.Empty{}
for {
if err := src.RecvMsg(f); err != nil {
ret <- err // this can be io.EOF which is happy case
break
@@ -102,8 +105,9 @@ func forwardClientToServer(src grpc.ClientStream, dst grpc.ServerStream) chan er
func forwardServerToClient(src grpc.ServerStream, dst grpc.ClientStream) chan error {
ret := make(chan error, 1)
go func() {
f := &anypb.Any{}
f := &emptypb.Empty{}
for {
if err := src.RecvMsg(f); err != nil {
ret <- err // this can be io.EOF which is happy case
break

View File

@@ -14,6 +14,7 @@ type IService interface {
eoscContext.EoApp
eoscContext.BalanceHandler
eoscContext.UpstreamHostHandler
Title() string
}
// CheckSkill 检查目标技能是否符合