mirror of
https://github.com/smallnest/rpcx.git
synced 2025-09-26 20:21:14 +08:00
implement metadata
This commit is contained in:
@@ -118,10 +118,11 @@ type Call struct {
|
||||
ServicePath string // The name of the service and method to call.
|
||||
ServiceMethod string // The name of the service and method to call.
|
||||
Metadata map[string]string //metadata
|
||||
Args interface{} // The argument to the function (*struct).
|
||||
Reply interface{} // The reply from the function (*struct).
|
||||
Error error // After completion, the error status.
|
||||
Done chan *Call // Strobes when call is complete.
|
||||
ResMetadata map[string]string
|
||||
Args interface{} // The argument to the function (*struct).
|
||||
Reply interface{} // The reply from the function (*struct).
|
||||
Error error // After completion, the error status.
|
||||
Done chan *Call // Strobes when call is complete.
|
||||
}
|
||||
|
||||
func (call *Call) done() {
|
||||
@@ -153,6 +154,17 @@ func (client *Client) Go(ctx context.Context, servicePath, serviceMethod string,
|
||||
call.ServicePath = servicePath
|
||||
call.ServiceMethod = serviceMethod
|
||||
call.Metadata = metadata
|
||||
meta := ctx.Value(share.ReqMetaDataKey)
|
||||
if meta != nil { //copy meta in context to meta in requests
|
||||
metaMap := meta.(map[string]string)
|
||||
if metadata == nil {
|
||||
call.Metadata = metadata
|
||||
} else {
|
||||
for k, v := range metaMap {
|
||||
metadata[k] = v
|
||||
}
|
||||
}
|
||||
}
|
||||
call.Args = args
|
||||
call.Reply = reply
|
||||
if done == nil {
|
||||
@@ -202,6 +214,13 @@ func (client *Client) call(ctx context.Context, servicePath, serviceMethod strin
|
||||
return ctx.Err()
|
||||
case call := <-Done:
|
||||
err = call.Error
|
||||
meta := ctx.Value(share.ResMetaDataKey)
|
||||
if meta != nil && len(call.ResMetadata) > 0 {
|
||||
resMeta := meta.(map[string]string)
|
||||
for k, v := range call.ResMetadata {
|
||||
resMeta[k] = v
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return err
|
||||
@@ -326,6 +345,7 @@ func (client *Client) input() {
|
||||
case res.MessageStatusType() == protocol.Error:
|
||||
// We've got an error response. Give this to the request;
|
||||
call.Error = ServiceError(res.Metadata[protocol.ServiceError])
|
||||
call.ResMetadata = res.Metadata
|
||||
call.done()
|
||||
default:
|
||||
data := res.Payload
|
||||
@@ -347,7 +367,7 @@ func (client *Client) input() {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
call.ResMetadata = res.Metadata
|
||||
call.done()
|
||||
}
|
||||
}
|
||||
|
@@ -272,17 +272,32 @@ func (s *Server) serveConn(conn net.Conn) {
|
||||
conn.Write(data)
|
||||
return
|
||||
}
|
||||
res, err := s.handleRequest(ctx, req)
|
||||
|
||||
resMetadata := make(map[string]string)
|
||||
newCtx := context.WithValue(context.WithValue(ctx, share.ReqMetaDataKey, req.Metadata),
|
||||
share.ResMetaDataKey, resMetadata)
|
||||
|
||||
res, err := s.handleRequest(newCtx, req)
|
||||
if err != nil {
|
||||
log.Warnf("rpcx: failed to handle request: %v", err)
|
||||
}
|
||||
s.Plugins.DoPreWriteResponse(ctx, req)
|
||||
s.Plugins.DoPreWriteResponse(newCtx, req)
|
||||
if !req.IsOneway() {
|
||||
if len(resMetadata) > 0 { //copy meta in context to request
|
||||
meta := res.Metadata
|
||||
if meta == nil {
|
||||
meta = make(map[string]string)
|
||||
}
|
||||
for k, v := range resMetadata {
|
||||
meta[k] = v
|
||||
}
|
||||
res.Metadata = meta
|
||||
}
|
||||
data := res.Encode()
|
||||
conn.Write(data)
|
||||
//res.WriteTo(conn)
|
||||
}
|
||||
s.Plugins.DoPostWriteResponse(ctx, req, res, err)
|
||||
s.Plugins.DoPostWriteResponse(newCtx, req, res, err)
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
@@ -27,3 +27,12 @@ var (
|
||||
func RegisterCodec(t protocol.SerializeType, c codec.Codec) {
|
||||
Codecs[t] = c
|
||||
}
|
||||
|
||||
// ContextKey defines key type in context.
|
||||
type ContextKey string
|
||||
|
||||
// ReqMetaDataKey is used to set metatdata in context of requests.
|
||||
var ReqMetaDataKey = ContextKey("__req_metadata")
|
||||
|
||||
// ResMetaDataKey is used to set metatdata in context of responses.
|
||||
var ResMetaDataKey = ContextKey("__res_metadata")
|
||||
|
@@ -13,3 +13,12 @@ func StringToSliceByte(s string) []byte {
|
||||
h := [3]uintptr{x[0], x[1], x[1]}
|
||||
return *(*[]byte)(unsafe.Pointer(&h))
|
||||
}
|
||||
|
||||
func CopyMeta(src, dst map[string]string) {
|
||||
if dst == nil {
|
||||
return
|
||||
}
|
||||
for k, v := range src {
|
||||
dst[k] = v
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user