提取服务端trailer

This commit is contained in:
Liujian
2024-07-04 13:38:34 +08:00
parent be8e769a72
commit a863b0477c
2 changed files with 7 additions and 6 deletions

View File

@@ -229,12 +229,13 @@ func (c *Context) doInvoke(address string, timeout time.Duration) error {
c.proxy.Headers().Set("grpc-timeout", fmt.Sprintf("%dn", timeout))
clientCtx, _ := context.WithCancel(metadata.NewOutgoingContext(c.Context(), c.proxy.Headers().Copy()))
serverHeaders := &metadata.MD{}
clientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying, clientConn, c.proxy.FullMethodName(), grpc.Header(serverHeaders))
serverTrailers := &metadata.MD{}
clientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying, clientConn, c.proxy.FullMethodName(), grpc.Header(serverHeaders), grpc.Trailer(serverTrailers))
if err != nil {
return err
}
c.finish = true
go c.readError(c.serverStream, clientStream, serverHeaders, c.response)
go c.readError(c.serverStream, clientStream, serverHeaders, serverTrailers, c.response)
return nil
}

View File

@@ -21,12 +21,12 @@ var (
}
)
func (c *Context) readError(serverStream grpc.ServerStream, clientStream grpc.ClientStream, serverHeaders *metadata.MD, response grpc_context.IResponse) {
c.errChan <- handlerStream(serverStream, clientStream, serverHeaders, response)
func (c *Context) readError(serverStream grpc.ServerStream, clientStream grpc.ClientStream, serverHeaders *metadata.MD, trailers *metadata.MD, response grpc_context.IResponse) {
c.errChan <- handlerStream(serverStream, clientStream, serverHeaders, trailers, response)
close(c.errChan)
}
func handlerStream(serverStream grpc.ServerStream, clientStream grpc.ClientStream, serverHeaders *metadata.MD, response grpc_context.IResponse) error {
func handlerStream(serverStream grpc.ServerStream, clientStream grpc.ClientStream, serverHeaders *metadata.MD, trailers *metadata.MD, response grpc_context.IResponse) error {
// Explicitly *do not close* s2cErrChan and c2sErrChan, otherwise the select below will not terminate.
// Channels do not have to be closed, it is just a control flow mechanism, see
@@ -58,7 +58,7 @@ func handlerStream(serverStream grpc.ServerStream, clientStream grpc.ClientStrea
//} else {
serverStream.SendHeader(metadata.Join(response.Headers(), *serverHeaders))
//}
serverStream.SetTrailer(metadata.Join(response.Trailer(), clientStream.Trailer()))
serverStream.SetTrailer(metadata.Join(response.Trailer(), *trailers))
// c2sErr will contain RPC error from client code. If not io.EOF return the RPC error as server stream error.
if c2sErr != io.EOF {
return c2sErr