diff --git a/app/apinto/profession.go b/app/apinto/profession.go index 9dc03cd6..40f84d8a 100644 --- a/app/apinto/profession.go +++ b/app/apinto/profession.go @@ -231,5 +231,21 @@ func ApintoProfession() []*eosc.ProfessionConfig { }, Mod: eosc.ProfessionConfig_Worker, }, + { + Name: "transcode", + Label: "编码器", + Desc: "编码器", + Dependencies: nil, + AppendLabels: nil, + Drivers: []*eosc.DriverConfig{ + { + Id: "eolinker.com:apinto:protobuf_transcode", + Name: "protobuf", + Label: "protobuf编码器", + Desc: "protobuf编码器", + }, + }, + Mod: eosc.ProfessionConfig_Worker, + }, } } diff --git a/app/apinto/worker.go b/app/apinto/worker.go index 81f968b8..80dc71bf 100644 --- a/app/apinto/worker.go +++ b/app/apinto/worker.go @@ -21,6 +21,7 @@ import ( dubbo2_proxy_rewrite "github.com/eolinker/apinto/drivers/plugins/dubbo2-proxy-rewrite" dubbo2_to_http "github.com/eolinker/apinto/drivers/plugins/dubbo2-to-http" extra_params "github.com/eolinker/apinto/drivers/plugins/extra-params" + grpc_to_http "github.com/eolinker/apinto/drivers/plugins/gRPC-to-http" grpc_proxy_rewrite "github.com/eolinker/apinto/drivers/plugins/grpc-proxy-rewrite" "github.com/eolinker/apinto/drivers/plugins/gzip" http_to_dubbo2 "github.com/eolinker/apinto/drivers/plugins/http-to-dubbo2" @@ -48,8 +49,9 @@ import ( grey_strategy "github.com/eolinker/apinto/drivers/strategy/grey-strategy" limiting_strategy "github.com/eolinker/apinto/drivers/strategy/limiting-strategy" visit_strategy "github.com/eolinker/apinto/drivers/strategy/visit-strategy" - template "github.com/eolinker/apinto/drivers/template" + "github.com/eolinker/apinto/drivers/template" + "github.com/eolinker/apinto/drivers/transcode/protobuf" "github.com/eolinker/eosc" "github.com/eolinker/eosc/extends" process_worker "github.com/eolinker/eosc/process-worker" @@ -138,5 +140,6 @@ func Register(extenderRegister eosc.IExtenderDriverRegister) { dubbo2_to_http.Register(extenderRegister) http_to_grpc.Register(extenderRegister) - + protocbuf.Register(extenderRegister) + grpc_to_http.Register(extenderRegister) } diff --git a/drivers/plugins/dubbo2-to-http/factory.go b/drivers/plugins/dubbo2-to-http/factory.go index f0caedf6..2cf206ac 100644 --- a/drivers/plugins/dubbo2-to-http/factory.go +++ b/drivers/plugins/dubbo2-to-http/factory.go @@ -6,7 +6,7 @@ import ( ) const ( - Name = "dubbo2-to-http" + Name = "dubbo2_to_http" ) func Register(register eosc.IExtenderDriverRegister) { diff --git a/drivers/plugins/gRPC-to-http/complete.go b/drivers/plugins/gRPC-to-http/complete.go new file mode 100644 index 00000000..00163d04 --- /dev/null +++ b/drivers/plugins/gRPC-to-http/complete.go @@ -0,0 +1,175 @@ +package grpc_to_http + +import ( + "errors" + "fmt" + "net/url" + "strings" + "time" + + grpc_descriptor "github.com/eolinker/apinto/grpc-descriptor" + + "github.com/jhump/protoreflect/dynamic" + + "github.com/jhump/protoreflect/desc" + + "github.com/valyala/fasthttp" + + fasthttp_client "github.com/eolinker/apinto/node/fasthttp-client" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + grpc_context "github.com/eolinker/eosc/eocontext/grpc-context" + + "github.com/eolinker/eosc/log" + + "google.golang.org/grpc/metadata" + + "github.com/eolinker/eosc/eocontext" +) + +var ( + ErrorTimeoutComplete = errors.New("complete timeout") + + defaultTimeout = 10 * time.Second +) + +type complete struct { + descriptor grpc_descriptor.IDescriptor + headers map[string]string + rawQuery string + path string + retry int + timeout time.Duration +} + +func newComplete(descriptor grpc_descriptor.IDescriptor, conf *Config) *complete { + query := url.Values{} + for key, value := range conf.Query { + query.Set(key, value) + } + timeout := defaultTimeout + return &complete{ + descriptor: descriptor, + timeout: timeout, + rawQuery: query.Encode(), + path: conf.Path, + headers: conf.Headers, + } +} + +func (h *complete) Complete(org eocontext.EoContext) error { + proxyTime := time.Now() + ctx, err := grpc_context.Assert(org) + if err != nil { + return err + } + descriptor, err := h.descriptor.Descriptor().FindSymbol(fmt.Sprintf("%s.%s", ctx.Proxy().Service(), ctx.Proxy().Method())) + if err != nil { + return err + } + methodDesc := descriptor.GetFile().FindService(ctx.Proxy().Service()).FindMethodByName(ctx.Proxy().Method()) + message := ctx.Proxy().Message(methodDesc.GetInputType()) + + body, err := message.MarshalJSON() + if err != nil { + return err + } + + app := ctx.GetApp() + + scheme := app.Scheme() + switch strings.ToLower(app.Scheme()) { + case "", "tcp": + scheme = "http" + case "tsl", "ssl", "https": + scheme = "https" + + } + path := h.path + if path == "" { + path = fmt.Sprintf("/%s/%s", ctx.Proxy().Service(), ctx.Proxy().Method()) + } + request := newRequest(ctx.Proxy().Headers(), body, h.headers, path, h.rawQuery) + defer fasthttp.ReleaseRequest(request) + var lastErr error + timeOut := app.TimeOut() + balance := ctx.GetBalance() + for index := 0; index <= h.retry; index++ { + + if h.timeout > 0 && time.Now().Sub(proxyTime) > h.timeout { + return ErrorTimeoutComplete + } + node, err := balance.Select(ctx) + if err != nil { + return status.Error(codes.NotFound, err.Error()) + } + addr := node.Addr() + log.Debug("node: ", addr) + request.URI() + passHost, targetHost := ctx.GetUpstreamHostHandler().PassHost() + switch passHost { + case eocontext.PassHost: + request.URI().SetHost(strings.Join(ctx.Proxy().Headers().Get(":authority"), ",")) + case eocontext.NodeHost: + request.URI().SetHost(node.Addr()) + case eocontext.ReWriteHost: + request.URI().SetHost(targetHost) + } + response := fasthttp.AcquireResponse() + lastErr = fasthttp_client.ProxyTimeout(fmt.Sprintf("%s://%s", scheme, node.Addr()), request, response, timeOut) + if lastErr == nil { + return newGRPCResponse(ctx, response, methodDesc) + } + log.Error("http upstream send error: ", lastErr) + } + + return status.Error(codes.Internal, lastErr.Error()) +} + +func newGRPCResponse(ctx grpc_context.IGrpcContext, response *fasthttp.Response, methodDesc *desc.MethodDescriptor) error { + defer fasthttp.ReleaseResponse(response) + message := dynamic.NewMessage(methodDesc.GetOutputType()) + err := message.UnmarshalJSON(response.Body()) + if err != nil { + log.Debug("body is: ", string(response.Body())) + return status.Error(codes.InvalidArgument, err.Error()) + } + + ctx.Response().Write(message) + hs := strings.Split(response.Header.String(), "\r\n") + for _, t := range hs { + vs := strings.Split(t, ":") + if len(vs) < 2 { + if vs[0] == "" { + continue + } + ctx.Response().Headers().Set(vs[0], strings.TrimSpace("")) + continue + } + ctx.Response().Headers().Set(vs[0], strings.TrimSpace(vs[1])) + } + + return nil +} + +func newRequest(headers metadata.MD, body []byte, additionalHeader map[string]string, path, rawQuery string) *fasthttp.Request { + request := fasthttp.AcquireRequest() + for key, value := range headers { + if strings.ToLower(key) == "grpc-go" { + key = "user-agent" + } + for _, v := range value { + request.Header.Add(key, v) + } + } + for key, value := range additionalHeader { + request.Header.Add(key, value) + } + request.Header.Set("content-type", "application/json") + request.URI().SetPath(path) + request.URI().SetQueryString(rawQuery) + request.SetBody(body) + return request +} diff --git a/drivers/plugins/gRPC-to-http/config.go b/drivers/plugins/gRPC-to-http/config.go new file mode 100644 index 00000000..36658590 --- /dev/null +++ b/drivers/plugins/gRPC-to-http/config.go @@ -0,0 +1,11 @@ +package grpc_to_http + +import "github.com/eolinker/eosc" + +type Config struct { + Path string `json:"path" label:"请求路径"` + Method string `json:"method" label:"请求方式" enum:"POST,PUT,PATCH"` + ProtobufID eosc.RequireId `json:"protobuf_id" required:"true" label:"Protobuf ID" skill:"github.com/eolinker/apinto/grpc-transcode.transcode.IDescriptor"` + Headers map[string]string `json:"headers" label:"额外头部"` + Query map[string]string `json:"query" label:"query参数"` +} diff --git a/drivers/plugins/gRPC-to-http/driver.go b/drivers/plugins/gRPC-to-http/driver.go new file mode 100644 index 00000000..eeffa9fc --- /dev/null +++ b/drivers/plugins/gRPC-to-http/driver.go @@ -0,0 +1,52 @@ +package grpc_to_http + +import ( + "fmt" + + "github.com/eolinker/eosc/common/bean" + + grpc_descriptor "github.com/eolinker/apinto/grpc-descriptor" + + "github.com/eolinker/apinto/drivers" + "github.com/eolinker/eosc" +) + +func check(v interface{}) (*Config, error) { + conf, err := drivers.Assert[Config](v) + if err != nil { + return nil, err + } + if conf.ProtobufID == "" { + return nil, fmt.Errorf("protobuf id is empty") + } + + return conf, nil +} + +func Create(id, name string, conf *Config, workers map[eosc.RequireId]eosc.IWorker) (eosc.IWorker, error) { + once.Do(func() { + bean.Autowired(&worker) + }) + descSource, err := getDescSource(string(conf.ProtobufID)) + if err != nil { + return nil, err + } + return &toHttp{ + WorkerBase: drivers.Worker(id, name), + handler: newComplete(descSource, conf), + }, nil +} + +func getDescSource(protobufID string) (grpc_descriptor.IDescriptor, error) { + + w, ok := worker.Get(protobufID) + if ok { + v, ok := w.(grpc_descriptor.IDescriptor) + if !ok { + return nil, fmt.Errorf("invalid protobuf id: %s", protobufID) + } + return v, nil + } + return nil, fmt.Errorf("protobuf worker(%s) is not exist", protobufID) + +} diff --git a/drivers/plugins/gRPC-to-http/factory.go b/drivers/plugins/gRPC-to-http/factory.go new file mode 100644 index 00000000..a9096d75 --- /dev/null +++ b/drivers/plugins/gRPC-to-http/factory.go @@ -0,0 +1,34 @@ +package grpc_to_http + +import ( + "fmt" + "sync" + + "github.com/eolinker/apinto/drivers" + "github.com/eolinker/eosc" +) + +const ( + Name = "grpc_to_http" +) + +var ( + once = sync.Once{} + worker eosc.IWorkers +) + +func Register(register eosc.IExtenderDriverRegister) { + register.RegisterExtenderDriver(Name, NewFactory()) +} + +func NewFactory() eosc.IExtenderDriverFactory { + return drivers.NewFactory[Config](Create, Check) +} + +func Check(cfg *Config, workers map[eosc.RequireId]eosc.IWorker) error { + if cfg.ProtobufID == "" { + return fmt.Errorf("protobuf id is empty") + } + + return nil +} diff --git a/drivers/plugins/gRPC-to-http/tohttp.go b/drivers/plugins/gRPC-to-http/tohttp.go new file mode 100644 index 00000000..8e3fd247 --- /dev/null +++ b/drivers/plugins/gRPC-to-http/tohttp.go @@ -0,0 +1,59 @@ +package grpc_to_http + +import ( + "github.com/eolinker/apinto/drivers" + "github.com/eolinker/eosc" + "github.com/eolinker/eosc/eocontext" + grpc_context "github.com/eolinker/eosc/eocontext/grpc-context" + http_context "github.com/eolinker/eosc/eocontext/http-context" +) + +type toHttp struct { + drivers.WorkerBase + handler eocontext.CompleteHandler +} + +func (t *toHttp) DoGrpcFilter(ctx grpc_context.IGrpcContext, next eocontext.IChain) (err error) { + if t.handler != nil { + ctx.SetCompleteHandler(t.handler) + } + if next != nil { + return next.DoChain(ctx) + } + return nil +} + +func (t *toHttp) Start() error { + return nil +} + +func (t *toHttp) DoFilter(ctx eocontext.EoContext, next eocontext.IChain) (err error) { + return grpc_context.DoGrpcFilter(t, ctx, next) +} + +func (t *toHttp) Destroy() { + t.handler = nil + return +} + +func (t *toHttp) Reset(conf interface{}, workers map[eosc.RequireId]eosc.IWorker) error { + cfg, err := check(conf) + if err != nil { + return err + } + descSource, err := getDescSource(string(cfg.ProtobufID)) + if err != nil { + return err + } + t.handler = newComplete(descSource, cfg) + return nil +} + +func (t *toHttp) Stop() error { + t.Destroy() + return nil +} + +func (t *toHttp) CheckSkill(skill string) bool { + return http_context.FilterSkillName == skill +} diff --git a/drivers/plugins/http-to-dubbo2/factory.go b/drivers/plugins/http-to-dubbo2/factory.go index c070e148..d1916697 100644 --- a/drivers/plugins/http-to-dubbo2/factory.go +++ b/drivers/plugins/http-to-dubbo2/factory.go @@ -6,7 +6,7 @@ import ( ) const ( - Name = "http-to-dubbo2" + Name = "http_to_dubbo2" ) func Register(register eosc.IExtenderDriverRegister) { diff --git a/drivers/plugins/http-to-gRPC/complete.go b/drivers/plugins/http-to-gRPC/complete.go index bdd909d3..88c886da 100644 --- a/drivers/plugins/http-to-gRPC/complete.go +++ b/drivers/plugins/http-to-gRPC/complete.go @@ -4,12 +4,13 @@ import ( "context" "crypto/tls" "encoding/json" - "errors" "fmt" "net/http" "strings" "time" + grpc_descriptor "github.com/eolinker/apinto/grpc-descriptor" + "google.golang.org/grpc/codes" "github.com/jhump/protoreflect/grpcreflect" @@ -27,16 +28,13 @@ import ( "github.com/fullstorydev/grpcurl" - "google.golang.org/grpc/metadata" - http_context "github.com/eolinker/eosc/eocontext/http-context" "github.com/eolinker/eosc/eocontext" ) var ( - ErrorTimeoutComplete = errors.New("complete timeout") - options = grpcurl.FormatOptions{ + options = grpcurl.FormatOptions{ AllowUnknownFields: true, } defaultTimeout = 10 * time.Second @@ -44,7 +42,7 @@ var ( type complete struct { format grpcurl.Format - descSource grpcurl.DescriptorSource + descriptor grpc_descriptor.IDescriptor timeout time.Duration authority string service string @@ -54,11 +52,11 @@ type complete struct { reflect bool } -func newComplete(descSource grpcurl.DescriptorSource, conf *Config) *complete { +func newComplete(descriptor grpc_descriptor.IDescriptor, conf *Config) *complete { timeout := defaultTimeout return &complete{ format: grpcurl.Format(conf.Format), - descSource: descSource, + descriptor: descriptor, timeout: timeout, authority: conf.Authority, service: conf.Service, @@ -68,6 +66,20 @@ func newComplete(descSource grpcurl.DescriptorSource, conf *Config) *complete { } } +func getSymbol(path string, service string, method string) string { + ps := strings.Split(strings.TrimPrefix(path, "/"), "/") + if service == "" { + service = ps[0] + } + if method == "" { + if len(ps) > 1 { + method = ps[1] + } + } + return fmt.Sprintf("%s/%s", service, method) + +} + func (h *complete) Complete(org eocontext.EoContext) error { ctx, err := http_context.Assert(org) @@ -84,11 +96,10 @@ func (h *complete) Complete(org eocontext.EoContext) error { app := ctx.GetApp() md := httpHeaderToMD(ctx.Proxy().Header().Headers(), h.headers) - + newCtx := ctx.Context() opts := genDialOpts(app.Scheme() == "https", h.authority) - newCtx := metadata.NewOutgoingContext(ctx.Context(), md) - symbol := fmt.Sprintf("%s/%s", h.service, h.method) + symbol := getSymbol(ctx.Proxy().URI().Path(), h.service, h.method) var lastErr error var conn *grpc.ClientConn for i := h.retry + 1; i > 0; i-- { @@ -102,15 +113,17 @@ func (h *complete) Complete(org eocontext.EoContext) error { log.Error("dial error: ", lastErr) continue } - descSource := h.descSource + var descSource grpcurl.DescriptorSource if h.reflect { refClient := grpcreflect.NewClientV1Alpha(newCtx, reflectpb.NewServerReflectionClient(conn)) refSource := grpcurl.DescriptorSourceFromServer(newCtx, refClient) - if h.descSource == nil { + if descSource == nil { descSource = refSource } else { - descSource = &compositeSource{reflection: refSource, file: h.descSource} + descSource = &compositeSource{reflection: refSource, file: descSource} } + } else { + descSource = h.descriptor.Descriptor() } rf, formatter, err := grpcurl.RequestParserAndFormatter(h.format, descSource, in, options) @@ -119,10 +132,11 @@ func (h *complete) Complete(org eocontext.EoContext) error { } response := NewResponse() handler := &grpcurl.DefaultEventHandler{ - Out: response, - Formatter: formatter, + VerbosityLevel: 2, + Out: response, + Formatter: formatter, } - err = grpcurl.InvokeRPC(newCtx, descSource, conn, symbol, []string{}, handler, rf.Next) + err = grpcurl.InvokeRPC(newCtx, descSource, conn, symbol, md, handler, rf.Next) if err != nil { if errStatus, ok := status.FromError(err); ok { data, _ := json.Marshal(StatusErr{ @@ -141,6 +155,10 @@ func (h *complete) Complete(org eocontext.EoContext) error { ctx.Response().SetBody(data) return err } + for key, value := range response.Header() { + ctx.Response().SetHeader(key, value) + } + ctx.Response().SetHeader("content-type", "application/json") ctx.Response().SetBody(response.Body()) return nil } @@ -152,21 +170,25 @@ type StatusErr struct { Msg string `json:"msg"` } -func httpHeaderToMD(headers http.Header, additionalHeader map[string]string) metadata.MD { - md := metadata.New(map[string]string{}) +func httpHeaderToMD(headers http.Header, additionalHeader map[string]string) []string { + headers.Set("content-type", "application/grpc") + headers.Del("connection") + md := make([]string, len(headers)+len(additionalHeader)) + //md := metadata.New(map[string]string{}) for key, value := range headers { if strings.ToLower(key) == "user-agent" { - md.Set("grpc-go", value...) + for _, v := range value { + md = append(md, fmt.Sprintf("%s: %s", key, v)) + } continue } - - md.Set(key, value...) + for _, v := range value { + md = append(md, fmt.Sprintf("%s: %s", key, v)) + } } for key, value := range additionalHeader { - md.Set(key, value) + md = append(md, fmt.Sprintf("%s: %s", key, value)) } - md.Set("content-type", "application/grpc") - md.Delete("connection") return md } diff --git a/drivers/plugins/http-to-gRPC/driver.go b/drivers/plugins/http-to-gRPC/driver.go index 5c454220..31567779 100644 --- a/drivers/plugins/http-to-gRPC/driver.go +++ b/drivers/plugins/http-to-gRPC/driver.go @@ -3,10 +3,11 @@ package http_to_grpc import ( "fmt" + "github.com/eolinker/eosc/common/bean" + "github.com/eolinker/apinto/drivers" grpc_descriptor "github.com/eolinker/apinto/grpc-descriptor" "github.com/eolinker/eosc" - "github.com/fullstorydev/grpcurl" ) func check(v interface{}) (*Config, error) { @@ -19,7 +20,11 @@ func check(v interface{}) (*Config, error) { } func Create(id, name string, conf *Config, workers map[eosc.RequireId]eosc.IWorker) (eosc.IWorker, error) { - descSource, err := getDescSource(conf.ProtobufID, workers) + once.Do(func() { + bean.Autowired(&worker) + }) + + descSource, err := getDescSource(string(conf.ProtobufID), conf.Reflect) if err != nil { return nil, err } @@ -29,17 +34,21 @@ func Create(id, name string, conf *Config, workers map[eosc.RequireId]eosc.IWork }, nil } -func getDescSource(protobufID eosc.RequireId, workers map[eosc.RequireId]eosc.IWorker) (grpcurl.DescriptorSource, error) { - var descSource grpcurl.DescriptorSource - if protobufID != "" { - worker, ok := workers[protobufID] - if ok { - v, ok := worker.(grpc_descriptor.IDescriptor) - if !ok { - return nil, fmt.Errorf("invalid protobuf id: %s", protobufID) - } - descSource = v.Descriptor() - } +func getDescSource(protobufID string, reflect bool) (grpc_descriptor.IDescriptor, error) { + if reflect { + return nil, nil } - return descSource, nil + if protobufID == "" { + return nil, fmt.Errorf("protobuf id is empty") + } + + w, ok := worker.Get(protobufID) + if ok { + v, ok := w.(grpc_descriptor.IDescriptor) + if !ok { + return nil, fmt.Errorf("invalid protobuf id: %s", protobufID) + } + return v, nil + } + return nil, fmt.Errorf("protobuf worker(%s) is not exist", protobufID) } diff --git a/drivers/plugins/http-to-gRPC/factory.go b/drivers/plugins/http-to-gRPC/factory.go index 6dfba18a..39775012 100644 --- a/drivers/plugins/http-to-gRPC/factory.go +++ b/drivers/plugins/http-to-gRPC/factory.go @@ -1,6 +1,8 @@ package http_to_grpc import ( + "sync" + "github.com/eolinker/apinto/drivers" "github.com/eolinker/eosc" ) @@ -9,6 +11,11 @@ const ( Name = "http_to_grpc" ) +var ( + once = sync.Once{} + worker eosc.IWorkers +) + func Register(register eosc.IExtenderDriverRegister) { register.RegisterExtenderDriver(Name, NewFactory()) } diff --git a/drivers/plugins/http-to-gRPC/response.go b/drivers/plugins/http-to-gRPC/response.go index 9f32e519..f3a180cd 100644 --- a/drivers/plugins/http-to-gRPC/response.go +++ b/drivers/plugins/http-to-gRPC/response.go @@ -1,18 +1,65 @@ package http_to_grpc +import ( + "strings" + + "go.uber.org/zap/buffer" +) + +var ( + responseHeaderPre = "\nResponse headers received:" + responseContentPre = "\nResponse contents:" + responseTrailerPre = "\nResponse trailers received:" +) + func NewResponse() *Response { - return &Response{} + return &Response{ + header: make(map[string]string), + bodyWrite: false, + body: &buffer.Buffer{}, + } } type Response struct { - buf []byte + header map[string]string + bodyWrite bool + body *buffer.Buffer } func (r *Response) Write(p []byte) (n int, err error) { - r.buf = p - return len(r.buf), nil + str := string(p) + if strings.HasPrefix(str, responseHeaderPre) || strings.HasPrefix(str, responseTrailerPre) { + headers := strings.Split(str, "\n") + if len(headers) == 2 && strings.HasPrefix(headers[1], "(empty)") { + return len(p), nil + } + for index, header := range headers { + if index == 0 { + continue + } + values := strings.Split(header, ":") + var v string + if len(values) > 1 { + v = values[1] + } + r.header[values[0]] = v + } + } + if strings.HasPrefix(str, responseContentPre) { + r.bodyWrite = true + return len(p), nil + } + if r.bodyWrite { + r.body.Write(p) + r.bodyWrite = false + } + return len(p), nil } func (r *Response) Body() []byte { - return r.buf + return r.body.Bytes() +} + +func (r *Response) Header() map[string]string { + return r.header } diff --git a/drivers/plugins/http-to-gRPC/toGRPC.go b/drivers/plugins/http-to-gRPC/toGRPC.go index bd5d2d39..feb27ba1 100644 --- a/drivers/plugins/http-to-gRPC/toGRPC.go +++ b/drivers/plugins/http-to-gRPC/toGRPC.go @@ -39,7 +39,7 @@ func (t *toGRPC) Reset(conf interface{}, workers map[eosc.RequireId]eosc.IWorker if err != nil { return err } - descSource, err := getDescSource(cfg.ProtobufID, workers) + descSource, err := getDescSource(string(cfg.ProtobufID), cfg.Reflect) if err != nil { return err } diff --git a/drivers/router/grpc-router/handler.go b/drivers/router/grpc-router/handler.go index dd8698b7..421cc27a 100644 --- a/drivers/router/grpc-router/handler.go +++ b/drivers/router/grpc-router/handler.go @@ -48,5 +48,8 @@ func (h *grpcRouter) ServeHTTP(ctx eocontext.EoContext) { ctx.SetBalance(h.service) ctx.SetUpstreamHostHandler(h.service) ctx.SetFinish(h.finisher) - h.filters.Chain(ctx, completeCaller) + err = h.filters.Chain(ctx, completeCaller) + if err != nil { + grpcContext.Response().SetErr(err) + } } diff --git a/drivers/router/http-router/config.go b/drivers/router/http-router/config.go index 0a7fa8d7..24682974 100644 --- a/drivers/router/http-router/config.go +++ b/drivers/router/http-router/config.go @@ -7,7 +7,7 @@ import ( type Config struct { Listen int `json:"listen" yaml:"listen" title:"port" description:"使用端口" default:"80" label:"端口号" maximum:"65535"` - Method []string `json:"method" yaml:"method" enum:"GET,POST,PUT,DELETE,PATH,HEAD,OPTIONS" label:"请求方式"` + Method []string `json:"method" yaml:"method" enum:"GET,POST,PUT,DELETE,PATCH,HEAD,OPTIONS" label:"请求方式"` Host []string `json:"host" yaml:"host" label:"域名"` Path string `json:"location"` Rules []Rule `json:"rules" yaml:"rules" label:"路由规则"` @@ -21,7 +21,7 @@ type Config struct { TimeOut int `json:"time_out" label:"超时时间"` } -//Rule 规则 +// Rule 规则 type Rule struct { Type string `json:"type" yaml:"type" label:"类型" enum:"header,query,cookie"` Name string `json:"name" yaml:"name" label:"参数名"` diff --git a/drivers/router/http-router/websocket/complate.go b/drivers/router/http-router/websocket/complate.go index f8ab5624..303b704f 100644 --- a/drivers/router/http-router/websocket/complate.go +++ b/drivers/router/http-router/websocket/complate.go @@ -69,7 +69,7 @@ func (h *Complete) Complete(org eocontext.EoContext) error { conn, resp, lastErr = DialWithTimeout(u.String(), ctx.Proxy().Header().Headers(), timeOut) if lastErr == nil { resp.Body.Close() - ctx.SetUpstreamConn(conn) + ctx.SetUpstreamConn(&Conn{conn}) break } log.Error("websocket upstream send error: ", lastErr) @@ -80,3 +80,24 @@ func (h *Complete) Complete(org eocontext.EoContext) error { return lastErr } + +type Conn struct { + *websocket.Conn +} + +func (c *Conn) Read(b []byte) (n int, err error) { + return c.Conn.UnderlyingConn().Read(b) +} + +func (c *Conn) Write(b []byte) (n int, err error) { + return c.Conn.UnderlyingConn().Write(b) +} + +func (c *Conn) SetDeadline(t time.Time) error { + err := c.Conn.SetWriteDeadline(t) + if err != nil { + return err + } + + return c.Conn.SetReadDeadline(t) +} diff --git a/drivers/transcode/protobuf/config.go b/drivers/transcode/protobuf/config.go index 0cc7b821..d197425c 100644 --- a/drivers/transcode/protobuf/config.go +++ b/drivers/transcode/protobuf/config.go @@ -2,10 +2,13 @@ package protocbuf import ( "encoding/json" + + "github.com/eolinker/eosc" ) -// Config service_http驱动配置 +// Config protobuf驱动配置 type Config struct { + ProtoFiles eosc.EoFiles `json:"proto_files" label:"proto文件列表"` } func (c *Config) String() string { diff --git a/drivers/transcode/protobuf/driver.go b/drivers/transcode/protobuf/driver.go index e250f82c..edf486bb 100644 --- a/drivers/transcode/protobuf/driver.go +++ b/drivers/transcode/protobuf/driver.go @@ -1,10 +1,20 @@ package protocbuf import ( + "github.com/eolinker/apinto/drivers" "github.com/eolinker/eosc" ) // Create 创建service_http驱动的实例 func Create(id, name string, v *Config, workers map[eosc.RequireId]eosc.IWorker) (eosc.IWorker, error) { - return nil, nil + + source, err := parseFiles(v.ProtoFiles) + if err != nil { + return nil, err + } + + return &Worker{ + WorkerBase: drivers.Worker(id, name), + source: source, + }, nil } diff --git a/drivers/transcode/protobuf/factory.go b/drivers/transcode/protobuf/factory.go index 8c7e547b..7b437829 100644 --- a/drivers/transcode/protobuf/factory.go +++ b/drivers/transcode/protobuf/factory.go @@ -2,20 +2,13 @@ package protocbuf import ( "github.com/eolinker/apinto/drivers" - "github.com/eolinker/apinto/drivers/discovery/static" round_robin "github.com/eolinker/apinto/upstream/round-robin" "github.com/eolinker/eosc" ) var DriverName = "protobuf_transcode" -var ( - defaultHttpDiscovery = static.CreateAnonymous(&static.Config{ - Health: nil, - HealthOn: false, - }) -) -// Register 注册service_http驱动工厂 +// Register 注册protobuf驱动工厂 func Register(register eosc.IExtenderDriverRegister) { register.RegisterExtenderDriver(DriverName, NewFactory()) } diff --git a/drivers/transcode/protobuf/file_test.go b/drivers/transcode/protobuf/file_test.go new file mode 100644 index 00000000..0138225d --- /dev/null +++ b/drivers/transcode/protobuf/file_test.go @@ -0,0 +1,34 @@ +package protocbuf + +import ( + "encoding/json" + "fmt" + "testing" + + "github.com/eolinker/eosc" +) + +func TestParseFile(t *testing.T) { + var data = `[ + { + "data":"H4sIAAAAAAAAA12OMU4DMRBF+z3FKAfAYrdcbU8Noo7MMjIma4/xeFEQooAqQkRCAkTJDQg0AUUch7XouAJeSESgfX/mv88nNsgxVDBwngIVgzLLyAVNFhQNnaxHUmEfKx0O2r2NmoxAarQdoRfSaRtI4Fga16BQ3tViHw0NGf2xrrEvW1Xs/KBEhIAtbBraxqMWOcDH7CU+X2QGmfvDP9lpBsDBa6vAStMP2SwTStoiB9uaKi9/LzTvJnFVlNnZuoUdWUbobqbd4vbz7SpOrrvLhzifxPNZvJ8nYli9vy4OmWy8e4rTx/9blg1rY9LHasuSoPeJ5N/uL+SfHy9VAQAA", + "name":"msg.proto", + "size":341, + "type":"text/plain" + }, + { + "data":"H4sIAAAAAAAAA53OvQrCMBAA4L1PcXTSxQyO4uDmbB+gxHjEYJKLyVWU0ne3NhERFMTl4P6+u3TzLK+whjpEYlrWq6qiwIY8aGqDVCep8dHWho/dfqHICSRr/AmjkMF4JoFX6YJFoWNQ4oCO2oTxYhSOmHGBIkPtkl5MFx4HnmyTx8ZKWYAtWkvQVwCjlbPZFHd47jDxHCJyF32CZzkF8gnn/bAqSw1HlK7Mz9KUwZ9Gbnz74N0uCLyUjbUZ+vjFD8xwB3HHA1WeAQAA", + "name":"service.proto", + "size":414, + "type":"text/plain" + } + ]` + fileData := make(eosc.EoFiles, 0) + err := json.Unmarshal([]byte(data), &fileData) + if err != nil { + return + } + + desc, err := parseFiles(fileData) + fmt.Println(desc, err) +} diff --git a/drivers/transcode/protobuf/worker.go b/drivers/transcode/protobuf/worker.go new file mode 100644 index 00000000..4ce5c5dd --- /dev/null +++ b/drivers/transcode/protobuf/worker.go @@ -0,0 +1,69 @@ +package protocbuf + +import ( + "errors" + "fmt" + + "github.com/jhump/protoreflect/desc/protoparse" + + "github.com/eolinker/apinto/drivers" + grpc_descriptor "github.com/eolinker/apinto/grpc-descriptor" + "github.com/eolinker/eosc" + "github.com/fullstorydev/grpcurl" +) + +type Worker struct { + drivers.WorkerBase + source grpcurl.DescriptorSource +} + +func (w *Worker) Descriptor() grpcurl.DescriptorSource { + return w.source +} + +func (w *Worker) Start() error { + return nil +} + +func (w *Worker) Reset(conf interface{}, workers map[eosc.RequireId]eosc.IWorker) error { + cfg, ok := conf.(*Config) + if !ok { + return errors.New("illegal config type") + } + source, err := parseFiles(cfg.ProtoFiles) + if err != nil { + return err + } + w.source = source + return nil +} + +func (w *Worker) Stop() error { + w.source = nil + return nil +} + +func (w *Worker) CheckSkill(skill string) bool { + return grpc_descriptor.Skill == skill +} + +func parseFiles(files eosc.EoFiles) (grpcurl.DescriptorSource, error) { + descSourceFiles := map[string]string{} + fileNames := make([]string, 0, len(files)) + for _, f := range files { + v, err := f.DecodeData() + if err != nil { + return nil, fmt.Errorf("file(%s) data decode error: %v", f.Name, err) + } + descSourceFiles[f.Name] = string(v) + fileNames = append(fileNames, f.Name) + + } + p := &protoparse.Parser{Accessor: protoparse.FileContentsFromMap(descSourceFiles)} + descSources, err := p.ParseFiles(fileNames...) + if err != nil { + return nil, err + } + return grpcurl.DescriptorSourceFromFileDescriptors(descSources...) + +} diff --git a/go.mod b/go.mod index 0fbd804b..5382cb6d 100644 --- a/go.mod +++ b/go.mod @@ -198,4 +198,4 @@ require ( gopkg.in/yaml.v3 v3.0.1 // indirect ) -//replace github.com/eolinker/eosc => ../eosc +replace github.com/eolinker/eosc => ../eosc diff --git a/grpc-descriptor/transcode.go b/grpc-descriptor/transcode.go index d95ea941..39031675 100644 --- a/grpc-descriptor/transcode.go +++ b/grpc-descriptor/transcode.go @@ -1,20 +1,18 @@ package grpc_descriptor import ( - "github.com/eolinker/eosc" "github.com/fullstorydev/grpcurl" ) const ( - ServiceSkill = "github.com/eolinker/apinto/grpc-transcode.transcode.IDescriptor" + Skill = "github.com/eolinker/apinto/grpc-transcode.transcode.IDescriptor" ) type IDescriptor interface { - eosc.IWorker Descriptor() grpcurl.DescriptorSource } -// CheckSkill 检查目标技能是否符合 +// CheckSkill 检查目标能力是否符合 func CheckSkill(skill string) bool { - return skill == ServiceSkill + return skill == Skill } diff --git a/node/grpc-context/request.go b/node/grpc-context/request.go index a29b97b7..5a98662d 100644 --- a/node/grpc-context/request.go +++ b/node/grpc-context/request.go @@ -11,9 +11,8 @@ import ( "github.com/jhump/protoreflect/desc" - "github.com/eolinker/eosc/log" - grpc_context "github.com/eolinker/eosc/eocontext/grpc-context" + "github.com/eolinker/eosc/log" "google.golang.org/grpc" "google.golang.org/grpc/metadata" ) @@ -101,33 +100,41 @@ func (r *Request) Message(msgDesc *desc.MessageDescriptor) *dynamic.Message { if r.message != nil { return r.message } - msg := dynamic.NewMessage(msgDesc) - if r.stream != nil { - ctx, _ := context.WithTimeout(context.Background(), 5*time.Second) - errChan := make(chan error) - defer close(errChan) - go func() { - var err error - for { - err = r.stream.RecvMsg(msg) - if err != nil { - errChan <- err - } - } - }() + r.message = dynamic.NewMessage(msgDesc) + if r.stream == nil { + return r.message + } + + ctx, _ := context.WithTimeout(context.Background(), 5*time.Second) + errChan := make(chan error) + + go func() { + var err error for { - select { - case <-ctx.Done(): - break - case err := <-errChan: - if err == io.EOF { - log.Debug("read message eof.") - } - break + err = r.stream.RecvMsg(r.message) + if err != nil { + errChan <- err + close(errChan) + return } } + }() + for { + select { + case <-ctx.Done(): + return r.message + case err, ok := <-errChan: + if !ok { + return r.message + } + if err != nil { + if err == io.EOF { + log.Debug("read message eof.") + } else { + log.Debug("read message error: ", err) + } + } + return r.message + } } - r.message = msg - - return msg } diff --git a/node/grpc-context/stream.go b/node/grpc-context/stream.go index 5f449724..2404519b 100644 --- a/node/grpc-context/stream.go +++ b/node/grpc-context/stream.go @@ -72,25 +72,23 @@ func forwardClientToServer(src grpc.ClientStream, dst grpc.ServerStream) chan er ret := make(chan error, 1) go func() { f := &anypb.Any{} - for i := 0; ; i++ { + // This is a bit of a hack, but client to server headers are only readable after first client msg is + // received but must be written to server stream before the first msg is flushed. + // This is the only place to do it nicely. + md, err := src.Header() + if err != nil { + ret <- err + return + } + if err := dst.SendHeader(md); err != nil { + ret <- err + return + } + for { if err := src.RecvMsg(f); err != nil { ret <- err // this can be io.EOF which is happy case break } - if i == 0 { - // This is a bit of a hack, but client to server headers are only readable after first client msg is - // received but must be written to server stream before the first msg is flushed. - // This is the only place to do it nicely. - md, err := src.Header() - if err != nil { - ret <- err - break - } - if err := dst.SendHeader(md); err != nil { - ret <- err - break - } - } if err := dst.SendMsg(f); err != nil { ret <- err break @@ -104,7 +102,7 @@ func forwardServerToClient(src grpc.ServerStream, dst grpc.ClientStream) chan er ret := make(chan error, 1) go func() { f := &anypb.Any{} - for i := 0; ; i++ { + for { if err := src.RecvMsg(f); err != nil { ret <- err // this can be io.EOF which is happy case break diff --git a/node/http-context/websocket-context.go b/node/http-context/websocket-context.go index cb3a82a4..627c077c 100644 --- a/node/http-context/websocket-context.go +++ b/node/http-context/websocket-context.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "io" + "net" "sync" "github.com/eolinker/eosc/log" @@ -19,7 +20,7 @@ var _ http_context.IWebsocketContext = (*WebsocketContext)(nil) type WebsocketContext struct { *HttpContext - upstreamConn *websocket.Conn + upstreamConn net.Conn } var upgrader = websocket.FastHTTPUpgrader{} @@ -36,12 +37,12 @@ func (w *WebsocketContext) Upgrade() error { wg := &sync.WaitGroup{} wg.Add(2) go func() { - size, err := io.Copy(conn.UnderlyingConn(), w.upstreamConn.UnderlyingConn()) + size, err := io.Copy(conn.UnderlyingConn(), w.upstreamConn) log.Infof("finish copy upstream: size is %d,err is %v", size, err) wg.Done() }() go func() { - size, err := io.Copy(w.upstreamConn.UnderlyingConn(), conn.UnderlyingConn()) + size, err := io.Copy(w.upstreamConn, conn.UnderlyingConn()) log.Infof("finish copy upstream: size is %d,err is %v", size, err) wg.Done() }() @@ -63,7 +64,7 @@ func NewWebsocketContext(ctx http_context.IHttpContext) (*WebsocketContext, erro return &WebsocketContext{HttpContext: httpCtx}, nil } -func (w *WebsocketContext) SetUpstreamConn(conn *websocket.Conn) { +func (w *WebsocketContext) SetUpstreamConn(conn net.Conn) { w.upstreamConn = conn }