grpc代码完成-待调试

This commit is contained in:
Liujian
2023-02-10 20:03:00 +08:00
parent 8e74ed33e5
commit 3563ff7fd8
12 changed files with 250 additions and 209 deletions

View File

@@ -15,7 +15,7 @@ func (f *Finisher) Finish(org eocontext.EoContext) error {
if err != nil { if err != nil {
return err return err
} }
ctx.Response() ctx.FastFinish()
return nil return nil
} }

View File

@@ -1,12 +1,16 @@
package grpc_router package grpc_router
import ( import (
"github.com/eolinker/apinto/drivers/router/grpc-router/manager"
"github.com/eolinker/apinto/service" "github.com/eolinker/apinto/service"
grpc_context "github.com/eolinker/eosc/eocontext/grpc-context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/eolinker/eosc/eocontext" "github.com/eolinker/eosc/eocontext"
) )
var completeCaller = NewCompleteCaller() var completeCaller = manager.NewCompleteCaller()
type grpcRouter struct { type grpcRouter struct {
completeHandler eocontext.CompleteHandler completeHandler eocontext.CompleteHandler
@@ -22,5 +26,27 @@ type grpcRouter struct {
} }
func (h *grpcRouter) ServeHTTP(ctx eocontext.EoContext) { func (h *grpcRouter) ServeHTTP(ctx eocontext.EoContext) {
grpcContext, err := grpc_context.Assert(ctx)
if err != nil {
return
}
if h.disable {
grpcContext.SetFinish(manager.NewErrHandler(status.Error(codes.Unavailable, "router is disable")))
grpcContext.FastFinish()
return
}
//Set Label
ctx.SetLabel("api", h.routerName)
ctx.SetLabel("api_id", h.routerId)
ctx.SetLabel("service", h.serviceName)
ctx.SetLabel("service_id", h.service.Id())
ctx.SetLabel("ip", grpcContext.Request().RealIP())
ctx.SetCompleteHandler(h.completeHandler)
ctx.SetApp(h.service)
ctx.SetBalance(h.service)
ctx.SetUpstreamHostHandler(h.service)
ctx.SetFinish(h.finisher)
h.filters.Chain(ctx, completeCaller)
} }

View File

@@ -1,8 +1,7 @@
package grpc_router package manager
import ( import (
"errors" "errors"
"fmt"
"time" "time"
grpc_context "github.com/eolinker/eosc/eocontext/grpc-context" grpc_context "github.com/eolinker/eosc/eocontext/grpc-context"
@@ -35,7 +34,6 @@ func (h *Complete) Complete(org eocontext.EoContext) error {
balance := ctx.GetBalance() balance := ctx.GetBalance()
app := ctx.GetApp() app := ctx.GetApp()
var lastErr error var lastErr error
scheme := app.Scheme()
timeOut := app.TimeOut() timeOut := app.TimeOut()
for index := 0; index <= h.retry; index++ { for index := 0; index <= h.retry; index++ {
@@ -45,17 +43,15 @@ func (h *Complete) Complete(org eocontext.EoContext) error {
} }
node, err := balance.Select(ctx) node, err := balance.Select(ctx)
if err != nil { if err != nil {
return err return err
} }
log.Debug("node: ", node.Addr()) log.Debug("node addr : ", node.Addr())
addr := fmt.Sprintf("%s://%s", scheme, node.Addr()) lastErr = ctx.Invoke(node.Addr(), timeOut)
lastErr = ctx.Invoke(addr, timeOut)
if lastErr == nil { if lastErr == nil {
return nil return nil
} }
log.Error("http upstream send error: ", lastErr) log.Error("grpc upstream send error: ", lastErr)
} }
return lastErr return lastErr

View File

@@ -23,8 +23,8 @@ func init() {
serverHandler := func(port int, ln net.Listener) { serverHandler := func(port int, ln net.Listener) {
opts := []grpc.ServerOption{ opts := []grpc.ServerOption{
grpc.UnknownServiceHandler(func(srv interface{}, stream grpc.ServerStream) error { grpc.UnknownServiceHandler(func(srv interface{}, stream grpc.ServerStream) error {
routerManager.FastHandler(port, srv, stream) err := routerManager.FastHandler(port, srv, stream)
return nil return err
}), }),
} }
server := grpc.NewServer(opts...) server := grpc.NewServer(opts...)

View File

@@ -1,23 +1,23 @@
package manager package manager
import ( import (
"fmt"
"sync" "sync"
"sync/atomic" "sync/atomic"
"google.golang.org/grpc/peer" "google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
grpc_context "github.com/eolinker/apinto/node/grpc-context"
"github.com/eolinker/apinto/router" "github.com/eolinker/apinto/router"
"google.golang.org/grpc" "google.golang.org/grpc"
http_complete "github.com/eolinker/apinto/drivers/router/http-router/http-complete" "github.com/eolinker/eosc/eocontext"
eoscContext "github.com/eolinker/eosc/eocontext"
"github.com/eolinker/eosc/log" "github.com/eolinker/eosc/log"
) )
var _ IManger = (*Manager)(nil) var _ IManger = (*Manager)(nil)
var notFound = new(NotFoundHandler) var completeCaller = NewCompleteCaller()
var completeCaller = http_complete.NewHttpCompleteCaller()
type IManger interface { type IManger interface {
Set(id string, port int, hosts []string, service string, method string, append []AppendRule, router router.IRouterHandler) error Set(id string, port int, hosts []string, service string, method string, append []AppendRule, router router.IRouterHandler) error
@@ -28,10 +28,10 @@ type Manager struct {
matcher router.IMatcher matcher router.IMatcher
routersData IRouterData routersData IRouterData
globalFilters atomic.Pointer[eoscContext.IChainPro] globalFilters atomic.Pointer[eocontext.IChainPro]
} }
func (m *Manager) SetGlobalFilters(globalFilters *eoscContext.IChainPro) { func (m *Manager) SetGlobalFilters(globalFilters *eocontext.IChainPro) {
m.globalFilters.Store(globalFilters) m.globalFilters.Store(globalFilters)
} }
@@ -68,24 +68,45 @@ func (m *Manager) Delete(id string) {
return return
} }
func (m *Manager) FastHandler(port int, srv interface{}, stream grpc.ServerStream) { func (m *Manager) FastHandler(port int, srv interface{}, stream grpc.ServerStream) error {
p, has := peer.FromContext(stream.Context()) ctx := grpc_context.NewContext(srv, stream)
if !has {
return
}
fmt.Println(p.Addr.String(), p.AuthInfo.AuthType())
if m.matcher == nil { if m.matcher == nil {
return return status.Error(codes.NotFound, "not found")
} }
r, has := m.matcher.Match(port, ctx.Request())
if !has {
errHandler := NewErrHandler(status.Error(codes.NotFound, "not found"))
ctx.SetFinish(errHandler)
ctx.SetCompleteHandler(errHandler)
globalFilters := m.globalFilters.Load()
if globalFilters != nil {
(*globalFilters).Chain(ctx, completeCaller)
}
} else {
log.Debug("match has:", port)
r.ServeHTTP(ctx)
}
finishHandler := ctx.GetFinish()
if finishHandler != nil {
return finishHandler.Finish(ctx)
}
return nil
} }
type NotFoundHandler struct { type ErrHandler struct {
err error
} }
func (h *NotFoundHandler) Complete(ctx eoscContext.EoContext) error { func (e *ErrHandler) Complete(ctx eocontext.EoContext) error {
panic("no implement") return e.err
} }
func (h *NotFoundHandler) Finish(ctx eoscContext.EoContext) error { func NewErrHandler(err error) *ErrHandler {
panic("no implement") return &ErrHandler{err: err}
}
func (e *ErrHandler) Finish(ctx eocontext.EoContext) error {
return e.err
} }

View File

@@ -51,7 +51,7 @@ func (h *GrpcRouter) reset(cfg *Config, workers map[eosc.RequireId]eosc.IWorker)
routerName: h.name, routerName: h.name,
routerId: h.id, routerId: h.id,
serviceName: strings.TrimSuffix(string(cfg.Service), "@service"), serviceName: strings.TrimSuffix(string(cfg.Service), "@service"),
completeHandler: NewComplete(cfg.Retry, time.Duration(cfg.TimeOut)*time.Millisecond), completeHandler: manager.NewComplete(cfg.Retry, time.Duration(cfg.TimeOut)*time.Millisecond),
finisher: defaultFinisher, finisher: defaultFinisher,
service: nil, service: nil,
filters: nil, filters: nil,

View File

@@ -86,7 +86,6 @@ func (m *Manager) FastHandler(port int, ctx *fasthttp.RequestCtx) {
globalFilters := m.globalFilters.Load() globalFilters := m.globalFilters.Load()
if globalFilters != nil { if globalFilters != nil {
(*globalFilters).Chain(httpContext, completeCaller) (*globalFilters).Chain(httpContext, completeCaller)
} }
} else { } else {
log.Debug("match has:", port) log.Debug("match has:", port)

7
go.mod
View File

@@ -11,14 +11,18 @@ require (
github.com/google/uuid v1.3.0 github.com/google/uuid v1.3.0
github.com/hashicorp/consul/api v1.9.1 github.com/hashicorp/consul/api v1.9.1
github.com/influxdata/influxdb-client-go/v2 v2.12.1 github.com/influxdata/influxdb-client-go/v2 v2.12.1
github.com/jhump/protoreflect v1.14.1
github.com/nsqio/go-nsq v1.1.0 github.com/nsqio/go-nsq v1.1.0
github.com/ohler55/ojg v1.12.9 github.com/ohler55/ojg v1.12.9
github.com/pkg/sftp v1.13.4 github.com/pkg/sftp v1.13.4
github.com/robertkrimen/otto v0.0.0-20211024170158-b87d35c0b86f github.com/robertkrimen/otto v0.0.0-20211024170158-b87d35c0b86f
github.com/soheilhy/cmux v0.1.5
github.com/urfave/cli/v2 v2.23.4 github.com/urfave/cli/v2 v2.23.4
github.com/valyala/fasthttp v1.42.0 github.com/valyala/fasthttp v1.42.0
golang.org/x/crypto v0.1.0 golang.org/x/crypto v0.1.0
golang.org/x/net v0.1.0 golang.org/x/net v0.1.0
google.golang.org/grpc v1.50.1
google.golang.org/protobuf v1.28.1
) )
require ( require (
@@ -79,7 +83,6 @@ require (
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/savsgio/gotils v0.0.0-20211223103454-d0aaa54c5899 // indirect github.com/savsgio/gotils v0.0.0-20211223103454-d0aaa54c5899 // indirect
github.com/soheilhy/cmux v0.1.5 // indirect
github.com/stretchr/objx v0.5.0 // indirect github.com/stretchr/objx v0.5.0 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
@@ -102,8 +105,6 @@ require (
golang.org/x/text v0.4.0 // indirect golang.org/x/text v0.4.0 // indirect
golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 // indirect golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 // indirect
google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c // indirect google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c // indirect
google.golang.org/grpc v1.50.1 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/sourcemap.v1 v1.0.5 // indirect gopkg.in/sourcemap.v1 v1.0.5 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect

View File

@@ -4,15 +4,9 @@ import (
"context" "context"
"crypto/tls" "crypto/tls"
"fmt" "fmt"
"io"
"net" "net"
"time" "time"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/grpc/metadata" "google.golang.org/grpc/metadata"
"google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials"
@@ -42,7 +36,7 @@ type Context struct {
requestId string requestId string
request *Request request *Request
proxy *Request proxy *Request
response *Response response grpc_context.IResponse
completeHandler eocontext.CompleteHandler completeHandler eocontext.CompleteHandler
finishHandler eocontext.FinishHandler finishHandler eocontext.FinishHandler
app eocontext.EoApp app eocontext.EoApp
@@ -52,6 +46,7 @@ type Context struct {
tls bool tls bool
insecureCertificateVerify bool insecureCertificateVerify bool
port int port int
finish bool
} }
func (c *Context) EnableTls(b bool) { func (c *Context) EnableTls(b bool) {
@@ -70,18 +65,16 @@ func NewContext(srv interface{}, stream grpc.ServerStream) *Context {
addr = p.Addr addr = p.Addr
} }
return &Context{ return &Context{
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
addr: addr, addr: addr,
srv: srv, srv: srv,
request: NewRequest(stream), request: NewRequest(stream),
proxy: NewRequest(stream),
response: NewResponse(),
} }
} }
//func (c *Context) ServerStream() grpc.ServerStream {
// return c.serverStream
//}
func (c *Context) RequestId() string { func (c *Context) RequestId() string {
return c.requestId return c.requestId
} }
@@ -191,7 +184,7 @@ func (c *Context) Response() grpc_context.IResponse {
} }
func (c *Context) SetResponse(response grpc_context.IResponse) { func (c *Context) SetResponse(response grpc_context.IResponse) {
c.response = response
} }
func (c *Context) Invoke(address string, timeout time.Duration) error { func (c *Context) Invoke(address string, timeout time.Duration) error {
@@ -199,7 +192,19 @@ func (c *Context) Invoke(address string, timeout time.Duration) error {
if err != nil { if err != nil {
return err return err
} }
return handlerStream(outgoingCtx, c.serverStream, clientConn, c.proxy.FullMethodName()) go handlerStream(outgoingCtx, c.serverStream, clientConn, c.proxy.FullMethodName(), c.response)
c.finish = true
return nil
}
func (c *Context) FastFinish() {
if c.finish {
return
}
c.serverStream.SendHeader(c.response.Headers())
c.serverStream.SendMsg(c.response.Message())
c.serverStream.SetTrailer(c.response.Trailer())
} }
func (c *Context) dial(address string, timeout time.Duration) (context.Context, *grpc.ClientConn, error) { func (c *Context) dial(address string, timeout time.Duration) (context.Context, *grpc.ClientConn, error) {
@@ -223,98 +228,3 @@ func (c *Context) dial(address string, timeout time.Duration) (context.Context,
conn, err := grpc.DialContext(c.ctx, address, opts...) conn, err := grpc.DialContext(c.ctx, address, opts...)
return metadata.NewOutgoingContext(c.Context(), c.proxy.Headers().Copy()), conn, err return metadata.NewOutgoingContext(c.Context(), c.proxy.Headers().Copy()), conn, err
} }
func handlerStream(outgoingCtx context.Context, serverStream grpc.ServerStream, clientConn *grpc.ClientConn, fullMethodName string) error {
// We require that the director's returned context inherits from the serverStream.Context().
clientCtx, clientCancel := context.WithCancel(outgoingCtx)
defer clientCancel()
// TODO(mwitkow): Add a `forwarded` header to metadata, https://en.wikipedia.org/wiki/X-Forwarded-For.
clientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying, clientConn, fullMethodName)
if err != nil {
return err
}
// Explicitly *do not close* s2cErrChan and c2sErrChan, otherwise the select below will not terminate.
// Channels do not have to be closed, it is just a control flow mechanism, see
// https://groups.google.com/forum/#!msg/golang-nuts/pZwdYRGxCIk/qpbHxRRPJdUJ
s2cErrChan := forwardServerToClient(serverStream, clientStream)
c2sErrChan := forwardClientToServer(clientStream, serverStream)
// We don't know which side is going to stop sending first, so we need a select between the two.
for i := 0; i < 2; i++ {
select {
case s2cErr := <-s2cErrChan:
if s2cErr == io.EOF {
// this is the happy case where the sender has encountered io.EOF, and won't be sending anymore./
// the clientStream>serverStream may continue pumping though.
clientStream.CloseSend()
} else {
// however, we may have gotten a receive error (stream disconnected, a read error etc) in which case we need
// to cancel the clientStream to the backend, let all of its goroutines be freed up by the CancelFunc and
// exit with an error to the stack
clientCancel()
return status.Errorf(codes.Internal, "failed proxying s2c: %v", s2cErr)
}
case c2sErr := <-c2sErrChan:
// This happens when the clientStream has nothing else to offer (io.EOF), returned a gRPC error. In those two
// cases we may have received Trailers as part of the call. In case of other errors (stream closed) the trailers
// will be nil.
serverStream.SetTrailer(clientStream.Trailer())
// c2sErr will contain RPC error from client code. If not io.EOF return the RPC error as server stream error.
if c2sErr != io.EOF {
return c2sErr
}
return nil
}
}
return status.Errorf(codes.Internal, "gRPC proxying should never reach this stage.")
}
func forwardClientToServer(src grpc.ClientStream, dst grpc.ServerStream) chan error {
ret := make(chan error, 1)
go func() {
f := &anypb.Any{}
for i := 0; ; i++ {
if err := src.RecvMsg(f); err != nil {
ret <- err // this can be io.EOF which is happy case
break
}
if i == 0 {
// This is a bit of a hack, but client to server headers are only readable after first client msg is
// received but must be written to server stream before the first msg is flushed.
// This is the only place to do it nicely.
md, err := src.Header()
if err != nil {
ret <- err
break
}
if err := dst.SendHeader(md); err != nil {
ret <- err
break
}
}
if err := dst.SendMsg(f); err != nil {
ret <- err
break
}
}
}()
return ret
}
func forwardServerToClient(src grpc.ServerStream, dst grpc.ClientStream) chan error {
ret := make(chan error, 1)
go func() {
f := &anypb.Any{}
for i := 0; ; i++ {
if err := src.RecvMsg(f); err != nil {
ret <- err // this can be io.EOF which is happy case
break
}
if err := dst.SendMsg(f); err != nil {
ret <- err
break
}
}
}()
return ret
}

View File

@@ -27,6 +27,7 @@ type Request struct {
method string method string
message *dynamic.Message message *dynamic.Message
stream grpc.ServerStream stream grpc.ServerStream
realIP string
} }
func (r *Request) SetService(service string) { func (r *Request) SetService(service string) {
@@ -79,6 +80,17 @@ func (r *Request) FullMethodName() string {
return fmt.Sprintf("/%s/%s", r.service, r.method) return fmt.Sprintf("/%s/%s", r.service, r.method)
} }
func (r *Request) RealIP() string {
if r.realIP == "" {
r.realIP = strings.Join(r.headers.Get("x-real-ip"), ";")
}
return r.realIP
}
func (r *Request) ForwardIP() string {
return strings.Join(r.headers.Get("x-forwarded-for"), ";")
}
func (r *Request) Message(msgDesc *desc.MessageDescriptor) *dynamic.Message { func (r *Request) Message(msgDesc *desc.MessageDescriptor) *dynamic.Message {
if r.message != nil { if r.message != nil {
return r.message return r.message

View File

@@ -1,86 +1,39 @@
package grpc_context package grpc_context
import ( import (
"context"
"io"
"time"
grpc_context "github.com/eolinker/eosc/eocontext/grpc-context" grpc_context "github.com/eolinker/eosc/eocontext/grpc-context"
"github.com/eolinker/eosc/log"
"github.com/jhump/protoreflect/desc"
"github.com/jhump/protoreflect/dynamic" "github.com/jhump/protoreflect/dynamic"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata" "google.golang.org/grpc/metadata"
) )
var _ grpc_context.IResponse = (*Response)(nil) var _ grpc_context.IResponse = (*Response)(nil)
type Response struct { type Response struct {
stream grpc.ClientStream header metadata.MD
message *dynamic.Message
headers metadata.MD
trailer metadata.MD trailer metadata.MD
msg *dynamic.Message
} }
func NewResponse(stream grpc.ClientStream) *Response { func (r *Response) Write(msg *dynamic.Message) {
headers, err := stream.Header() r.msg = msg
if err != nil { }
log.Error("get grpc response header error: ", err)
headers = metadata.New(map[string]string{})
}
func NewResponse() *Response {
return &Response{ return &Response{
stream: stream, header: metadata.New(map[string]string{}),
headers: headers, trailer: metadata.New(map[string]string{}),
trailer: stream.Trailer(), msg: nil,
} }
} }
func (r *Response) Headers() metadata.MD { func (r *Response) Headers() metadata.MD {
return r.headers return r.header
} }
func (r *Response) Message(msgDesc *desc.MessageDescriptor) *dynamic.Message { func (r *Response) Message() *dynamic.Message {
if r.message != nil { return r.msg
return r.message
}
msg := dynamic.NewMessage(msgDesc)
if r.stream != nil {
ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
errChan := make(chan error)
defer close(errChan)
go func() {
var err error
for {
err = r.stream.RecvMsg(msg)
if err != nil {
errChan <- err
}
}
}()
for {
select {
case <-ctx.Done():
// 超时关闭通道
r.stream.CloseSend()
break
case err := <-errChan:
if err == io.EOF {
log.Debug("read message eof.")
}
break
}
}
}
r.message = msg
return msg
} }
func (r *Response) Trailer() metadata.MD { func (r *Response) Trailer() metadata.MD {
return r.trailer return r.trailer
} }
func (r *Response) ClientStream() grpc.ClientStream {
return r.stream
}

123
node/grpc-context/stream.go Normal file
View File

@@ -0,0 +1,123 @@
package grpc_context
import (
"context"
"io"
"google.golang.org/grpc/metadata"
grpc_context "github.com/eolinker/eosc/eocontext/grpc-context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/anypb"
)
var (
clientStreamDescForProxying = &grpc.StreamDesc{
ServerStreams: true,
ClientStreams: true,
}
)
func handlerStream(outgoingCtx context.Context, serverStream grpc.ServerStream, clientConn *grpc.ClientConn, fullMethodName string, response grpc_context.IResponse) error {
// We require that the director's returned context inherits from the serverStream.Context().
clientCtx, clientCancel := context.WithCancel(outgoingCtx)
defer clientCancel()
clientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying, clientConn, fullMethodName)
if err != nil {
return err
}
// Explicitly *do not close* s2cErrChan and c2sErrChan, otherwise the select below will not terminate.
// Channels do not have to be closed, it is just a control flow mechanism, see
// https://groups.google.com/forum/#!msg/golang-nuts/pZwdYRGxCIk/qpbHxRRPJdUJ
s2cErrChan := forwardServerToClient(serverStream, clientStream)
c2sErrChan := forwardClientToServer(clientStream, serverStream)
// We don't know which side is going to stop sending first, so we need a select between the two.
for i := 0; i < 2; i++ {
select {
case s2cErr := <-s2cErrChan:
if s2cErr == io.EOF {
// this is the happy case where the sender has encountered io.EOF, and won't be sending anymore./
// the clientStream>serverStream may continue pumping though.
clientStream.CloseSend()
} else {
// however, we may have gotten a receive error (stream disconnected, a read error etc) in which case we need
// to cancel the clientStream to the backend, let all of its goroutines be freed up by the CancelFunc and
// exit with an error to the stack
clientCancel()
return status.Errorf(codes.Internal, "failed proxying s2c: %v", s2cErr)
}
case c2sErr := <-c2sErrChan:
// This happens when the clientStream has nothing else to offer (io.EOF), returned a gRPC error. In those two
// cases we may have received Trailers as part of the call. In case of other errors (stream closed) the trailers
// will be nil.
header, err := clientStream.Header()
if err != nil {
serverStream.SendHeader(response.Headers())
} else {
serverStream.SendHeader(metadata.Join(response.Headers(), header))
}
serverStream.SetTrailer(metadata.Join(response.Trailer(), clientStream.Trailer()))
// c2sErr will contain RPC error from client code. If not io.EOF return the RPC error as server stream error.
if c2sErr != io.EOF {
return c2sErr
}
return nil
}
}
return status.Errorf(codes.Internal, "gRPC proxying should never reach this stage.")
}
func forwardClientToServer(src grpc.ClientStream, dst grpc.ServerStream) chan error {
ret := make(chan error, 1)
go func() {
f := &anypb.Any{}
for i := 0; ; i++ {
if err := src.RecvMsg(f); err != nil {
ret <- err // this can be io.EOF which is happy case
break
}
if i == 0 {
// This is a bit of a hack, but client to server headers are only readable after first client msg is
// received but must be written to server stream before the first msg is flushed.
// This is the only place to do it nicely.
md, err := src.Header()
if err != nil {
ret <- err
break
}
if err := dst.SendHeader(md); err != nil {
ret <- err
break
}
}
if err := dst.SendMsg(f); err != nil {
ret <- err
break
}
}
}()
return ret
}
func forwardServerToClient(src grpc.ServerStream, dst grpc.ClientStream) chan error {
ret := make(chan error, 1)
go func() {
f := &anypb.Any{}
for i := 0; ; i++ {
if err := src.RecvMsg(f); err != nil {
ret <- err // this can be io.EOF which is happy case
break
}
if err := dst.SendMsg(f); err != nil {
ret <- err
break
}
}
}()
return ret
}