mirror of
https://github.com/eolinker/apinto
synced 2025-09-27 13:12:20 +08:00
282 lines
6.1 KiB
Go
282 lines
6.1 KiB
Go
package grpc_context
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
|
|
"google.golang.org/grpc/metadata"
|
|
|
|
"google.golang.org/grpc/peer"
|
|
|
|
"github.com/eolinker/eosc/utils/config"
|
|
|
|
"google.golang.org/grpc"
|
|
|
|
"github.com/eolinker/eosc/eocontext"
|
|
|
|
grpc_context "github.com/eolinker/eosc/eocontext/grpc-context"
|
|
)
|
|
|
|
var _ grpc_context.IGrpcContext = (*Context)(nil)
|
|
|
|
var (
|
|
pool = sync.Pool{
|
|
New: newContext,
|
|
}
|
|
)
|
|
|
|
func newContext() interface{} {
|
|
h := new(Context)
|
|
return h
|
|
}
|
|
|
|
type Context struct {
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
serverStream grpc.ServerStream
|
|
addr net.Addr
|
|
srv interface{}
|
|
acceptTime time.Time
|
|
requestId string
|
|
request grpc_context.IRequest
|
|
proxy grpc_context.IRequest
|
|
response grpc_context.IResponse
|
|
completeHandler eocontext.CompleteHandler
|
|
finishHandler eocontext.FinishHandler
|
|
app eocontext.EoApp
|
|
balance eocontext.BalanceHandler
|
|
upstreamHostHandler eocontext.UpstreamHostHandler
|
|
labels map[string]string
|
|
tls bool
|
|
insecureCertificateVerify bool
|
|
port int
|
|
finish bool
|
|
errChan chan error
|
|
}
|
|
|
|
func (c *Context) RealIP() string {
|
|
if c.request != nil {
|
|
return c.request.RealIP()
|
|
}
|
|
return ""
|
|
}
|
|
|
|
func (c *Context) EnableTls(b bool) {
|
|
c.tls = b
|
|
}
|
|
|
|
func (c *Context) InsecureCertificateVerify(b bool) {
|
|
c.insecureCertificateVerify = b
|
|
}
|
|
|
|
func NewContext(srv interface{}, stream grpc.ServerStream) *Context {
|
|
now := time.Now()
|
|
ctx, cancel := context.WithCancel(stream.Context())
|
|
var addr net.Addr = zeroTCPAddr
|
|
p, has := peer.FromContext(ctx)
|
|
if has {
|
|
addr = p.Addr
|
|
}
|
|
|
|
newCtx := &Context{
|
|
requestId: uuid.New().String(),
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
addr: addr,
|
|
srv: srv,
|
|
serverStream: stream,
|
|
request: NewRequest(stream),
|
|
proxy: NewRequest(stream),
|
|
response: NewResponse(),
|
|
labels: map[string]string{},
|
|
errChan: make(chan error),
|
|
}
|
|
newCtx.WithValue("request_time", now)
|
|
return newCtx
|
|
}
|
|
|
|
func (c *Context) RequestId() string {
|
|
return c.requestId
|
|
}
|
|
|
|
func (c *Context) AcceptTime() time.Time {
|
|
return c.acceptTime
|
|
}
|
|
|
|
func (c *Context) Context() context.Context {
|
|
return c.ctx
|
|
}
|
|
|
|
func (c *Context) Value(key interface{}) interface{} {
|
|
return c.ctx.Value(key)
|
|
}
|
|
|
|
func (c *Context) WithValue(key, val interface{}) {
|
|
c.ctx = context.WithValue(c.ctx, key, val)
|
|
}
|
|
|
|
func (c *Context) Scheme() string {
|
|
return "grpc"
|
|
}
|
|
|
|
func (c *Context) Assert(i interface{}) error {
|
|
if v, ok := i.(*grpc_context.IGrpcContext); ok {
|
|
*v = c
|
|
return nil
|
|
}
|
|
return fmt.Errorf("not suport:%s", config.TypeNameOf(i))
|
|
}
|
|
|
|
func (c *Context) SetLabel(name, value string) {
|
|
c.labels[name] = value
|
|
}
|
|
|
|
func (c *Context) GetLabel(name string) string {
|
|
return c.labels[name]
|
|
}
|
|
|
|
func (c *Context) Labels() map[string]string {
|
|
return c.labels
|
|
}
|
|
|
|
func (c *Context) GetComplete() eocontext.CompleteHandler {
|
|
return c.completeHandler
|
|
}
|
|
|
|
func (c *Context) SetCompleteHandler(handler eocontext.CompleteHandler) {
|
|
c.completeHandler = handler
|
|
}
|
|
|
|
func (c *Context) GetFinish() eocontext.FinishHandler {
|
|
return c.finishHandler
|
|
}
|
|
|
|
func (c *Context) SetFinish(handler eocontext.FinishHandler) {
|
|
c.finishHandler = handler
|
|
}
|
|
|
|
func (c *Context) GetBalance() eocontext.BalanceHandler {
|
|
return c.balance
|
|
}
|
|
|
|
func (c *Context) SetBalance(handler eocontext.BalanceHandler) {
|
|
c.balance = handler
|
|
}
|
|
|
|
func (c *Context) GetUpstreamHostHandler() eocontext.UpstreamHostHandler {
|
|
return c.upstreamHostHandler
|
|
}
|
|
|
|
func (c *Context) SetUpstreamHostHandler(handler eocontext.UpstreamHostHandler) {
|
|
c.upstreamHostHandler = handler
|
|
}
|
|
|
|
func (c *Context) LocalIP() net.IP {
|
|
return addrToIP(c.addr)
|
|
}
|
|
|
|
func (c *Context) LocalAddr() net.Addr {
|
|
return c.addr
|
|
}
|
|
|
|
func (c *Context) LocalPort() int {
|
|
return c.port
|
|
}
|
|
|
|
func (c *Context) Request() grpc_context.IRequest {
|
|
return c.request
|
|
}
|
|
|
|
func (c *Context) Proxy() grpc_context.IRequest {
|
|
return c.proxy
|
|
}
|
|
|
|
func (c *Context) Response() grpc_context.IResponse {
|
|
return c.response
|
|
}
|
|
|
|
func (c *Context) SetResponse(response grpc_context.IResponse) {
|
|
c.response = response
|
|
}
|
|
|
|
func (c *Context) Invoke(node eocontext.INode, timeout time.Duration) error {
|
|
|
|
err := c.doInvoke(node.Addr(), timeout)
|
|
if err != nil {
|
|
node.Down()
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
func (c *Context) doInvoke(address string, timeout time.Duration) error {
|
|
passHost, targetHost := c.GetUpstreamHostHandler().PassHost()
|
|
switch passHost {
|
|
case eocontext.NodeHost:
|
|
c.proxy.SetHost(address)
|
|
case eocontext.ReWriteHost:
|
|
c.proxy.SetHost(targetHost)
|
|
}
|
|
clientConn, err := clientPool.Get(address, c.tls, c.proxy.Host()).Get()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
//c.proxy.Headers().SetProvider("grpc-timeout", fmt.Sprintf("%d", timeout))
|
|
clientCtx, _ := context.WithCancel(metadata.NewOutgoingContext(c.Context(), c.proxy.Headers().Copy()))
|
|
serverHeaders := &metadata.MD{}
|
|
serverTrailers := &metadata.MD{}
|
|
clientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying, clientConn, c.proxy.FullMethodName(), grpc.Header(serverHeaders), grpc.Trailer(serverTrailers))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
c.finish = true
|
|
go c.readError(c.serverStream, clientStream, serverHeaders, serverTrailers, c.response)
|
|
return nil
|
|
}
|
|
|
|
func (c *Context) FastFinish() error {
|
|
defer c.reset()
|
|
if c.finish {
|
|
err, ok := <-c.errChan
|
|
if !ok {
|
|
return nil
|
|
}
|
|
return err
|
|
}
|
|
err := c.response.Error()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
c.serverStream.SendHeader(c.response.Headers())
|
|
c.serverStream.SendMsg(c.response.Message())
|
|
c.serverStream.SetTrailer(c.response.Trailer())
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *Context) reset() {
|
|
c.port = 0
|
|
c.ctx = nil
|
|
c.app = nil
|
|
c.balance = nil
|
|
c.upstreamHostHandler = nil
|
|
c.finishHandler = nil
|
|
c.completeHandler = nil
|
|
|
|
pool.Put(c)
|
|
}
|
|
|
|
func (c *Context) IsCloneable() bool {
|
|
return false
|
|
}
|
|
|
|
func (c *Context) Clone() (eocontext.EoContext, error) {
|
|
//TODO
|
|
return nil, fmt.Errorf("%s %w", "GrpcContext", eocontext.ErrEoCtxUnCloneable)
|
|
}
|