Files
apinto/node/grpc-context/stream.go
2024-07-22 17:28:14 +08:00

123 lines
3.9 KiB
Go

package grpc_context
import (
"io"
"google.golang.org/protobuf/types/known/emptypb"
grpc_context "github.com/eolinker/eosc/eocontext/grpc-context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/grpc"
)
// clientStreamDescForProxying is the stream descriptor used for proxied calls.
// Both ClientStreams and ServerStreams are enabled so the stream is treated as
// bidirectional.
var clientStreamDescForProxying = &grpc.StreamDesc{
	ClientStreams: true,
	ServerStreams: true,
}
// readError runs handlerStream with the given streams and metadata, sends the
// resulting error (nil on success) on c.errChan, and then closes c.errChan so
// that receivers observe the end of the exchange.
func (c *Context) readError(serverStream grpc.ServerStream, clientStream grpc.ClientStream, serverHeaders *metadata.MD, trailers *metadata.MD, response grpc_context.IResponse) {
	result := handlerStream(serverStream, clientStream, serverHeaders, trailers, response)
	c.errChan <- result
	close(c.errChan)
}
// handlerStream pumps messages in both directions between serverStream and
// clientStream until one direction fails or the clientStream side finishes.
//
// When the clientStream->serverStream direction finishes, the headers and
// trailers of response are joined with the metadata referenced by
// serverHeaders and trailers and written to serverStream. Either pointer may
// be nil, in which case only the metadata taken from response is used.
//
// It returns nil when the clientStream side ended with io.EOF, the error
// reported by that side when it ended with anything else, and a codes.Internal
// status error when the serverStream->clientStream direction failed with
// something other than io.EOF.
func handlerStream(serverStream grpc.ServerStream, clientStream grpc.ClientStream, serverHeaders *metadata.MD, trailers *metadata.MD, response grpc_context.IResponse) error {
	// Explicitly *do not close* s2cErrChan and c2sErrChan, otherwise the select below will not terminate.
	// Channels do not have to be closed, it is just a control flow mechanism, see
	// https://groups.google.com/forum/#!msg/golang-nuts/pZwdYRGxCIk/qpbHxRRPJdUJ
	s2cErrChan := forwardServerToClient(serverStream, clientStream)
	c2sErrChan := forwardClientToServer(clientStream, serverStream)
	// We don't know which side is going to stop sending first, so we need a select between the two.
	for i := 0; i < 2; i++ {
		select {
		case s2cErr := <-s2cErrChan:
			if s2cErr != io.EOF {
				// A receive error (stream disconnected, a read error etc.): exit with an
				// error to the stack. Cancelling the clientStream is left to the caller;
				// this function holds no cancel function.
				return status.Errorf(codes.Internal, "failed proxying s2c: %v", s2cErr)
			}
			// Happy case: the sender hit io.EOF and won't be sending anymore. The
			// clientStream->serverStream direction may continue pumping, so only the
			// send side is closed. Closing is best-effort; a failure here surfaces
			// through c2sErrChan.
			_ = clientStream.CloseSend()
		case c2sErr := <-c2sErrChan:
			// This happens when the clientStream has nothing else to offer (io.EOF) or
			// returned a gRPC error.
			//
			// The pointers are dereferenced here rather than at function entry so that
			// any values written through them while the streams were running are
			// observed. A nil pointer contributes no metadata instead of panicking.
			var upstreamHeaders, upstreamTrailers metadata.MD
			if serverHeaders != nil {
				upstreamHeaders = *serverHeaders
			}
			if trailers != nil {
				upstreamTrailers = *trailers
			}
			// forwardClientToServer also calls SendHeader on this stream, and per the
			// grpc.ServerStream contract SendHeader fails when called more than once.
			// This call is therefore best-effort and its error is deliberately ignored.
			_ = serverStream.SendHeader(metadata.Join(response.Headers(), upstreamHeaders))
			serverStream.SetTrailer(metadata.Join(response.Trailer(), upstreamTrailers))
			// c2sErr will contain RPC error from client code. If not io.EOF return the RPC error as server stream error.
			if c2sErr != io.EOF {
				return c2sErr
			}
			return nil
		}
	}
	return status.Error(codes.Internal, "gRPC proxying should never reach this stage.")
}
// forwardClientToServer starts a goroutine that first copies the header
// metadata of src to dst and then relays every message received from src to
// dst. The returned channel has a buffer of one and receives exactly one
// error: the first failure encountered, which is io.EOF when src simply has
// nothing more to deliver. The channel is never closed.
func forwardClientToServer(src grpc.ClientStream, dst grpc.ServerStream) chan error {
	errCh := make(chan error, 1)
	relay := func() error {
		// Headers must be written to the server stream before the first message is
		// flushed, so they are fetched and forwarded ahead of the message loop.
		header, err := src.Header()
		if err != nil {
			return err
		}
		if err = dst.SendHeader(header); err != nil {
			return err
		}
		msg := &emptypb.Empty{}
		for {
			if err = src.RecvMsg(msg); err != nil {
				return err // may be io.EOF, which is the happy case
			}
			if err = dst.SendMsg(msg); err != nil {
				return err
			}
		}
	}
	go func() { errCh <- relay() }()
	return errCh
}
// forwardServerToClient starts a goroutine that relays every message received
// from src to dst. The returned channel has a buffer of one and receives
// exactly one error: the first failure encountered, which is io.EOF when src
// has finished sending. The channel is never closed.
func forwardServerToClient(src grpc.ServerStream, dst grpc.ClientStream) chan error {
	errCh := make(chan error, 1)
	relay := func() error {
		msg := &emptypb.Empty{}
		for {
			if err := src.RecvMsg(msg); err != nil {
				return err // may be io.EOF, which is the happy case
			}
			if err := dst.SendMsg(msg); err != nil {
				return err
			}
		}
	}
	go func() { errCh <- relay() }()
	return errCh
}