extract servicePath and serviceMethod from metadata

This commit is contained in:
smallnest
2017-10-22 01:27:55 +08:00
parent b91d979b59
commit 32518e0470
7 changed files with 150 additions and 82 deletions

View File

@@ -223,8 +223,8 @@ func (client *Client) send(ctx context.Context, call *Call) {
req.Metadata = call.Metadata
}
req.Metadata[protocol.ServicePath] = call.ServicePath
req.Metadata[protocol.ServiceMethod] = call.ServiceMethod
req.ServicePath = call.ServicePath
req.ServiceMethod = call.ServiceMethod
data, err := codec.Encode(call.Args)
if err != nil {

View File

@@ -23,10 +23,6 @@ var (
)
const (
// ServicePath is service name
ServicePath = "__rpcx_path__"
// ServiceMethod is name of the service
ServiceMethod = "__rpcx_method__"
// ServiceError contains error info of service invocation
ServiceError = "__rpcx_error__"
)
@@ -78,6 +74,9 @@ const (
// Message is the generic type of Request and Response.
type Message struct {
*Header
ServicePath string
ServiceMethod string
metaBytes []byte
Metadata map[string]string
Payload []byte
}
@@ -89,7 +88,6 @@ func NewMessage() *Message {
return &Message{
Header: &header,
Metadata: make(map[string]string),
}
}
@@ -197,7 +195,8 @@ func (m Message) Clone() *Message {
header := *m.Header
c := &Message{
Header: &header,
Metadata: make(map[string]string),
ServicePath: m.ServicePath,
ServiceMethod: m.ServiceMethod,
}
return c
}
@@ -206,14 +205,27 @@ func (m Message) Clone() *Message {
func (m Message) Encode() []byte {
meta := encodeMetadata(m.Metadata)
l := 12 + (4 + len(meta)) + (4 + len(m.Payload))
spL := len(m.ServicePath)
spM := len(m.ServiceMethod)
metaStart := 12 + (4 + spL) + (4 + spM)
payLoadStart := metaStart + (4 + len(meta))
l := payLoadStart + (4 + len(m.Payload))
data := make([]byte, l)
copy(data, m.Header[:])
binary.BigEndian.PutUint32(data[12:16], uint32(len(meta)))
copy(data[12:], meta)
binary.BigEndian.PutUint32(data[16+len(meta):], uint32(len(m.Payload)))
copy(data[20+len(meta):], m.Payload)
binary.BigEndian.PutUint32(data[12:16], uint32(spL))
copy(data[16:16+spL], util.StringToSliceByte(m.ServicePath))
binary.BigEndian.PutUint32(data[16+spL:20+spL], uint32(spM))
copy(data[20+spL:metaStart], util.StringToSliceByte(m.ServiceMethod))
binary.BigEndian.PutUint32(data[metaStart:metaStart+4], uint32(len(meta)))
copy(data[metaStart+4:], meta)
binary.BigEndian.PutUint32(data[payLoadStart:payLoadStart+4], uint32(len(m.Payload)))
copy(data[payLoadStart+4:], m.Payload)
return data
}
@@ -225,17 +237,36 @@ func (m Message) WriteTo(w io.Writer) error {
return err
}
//write servicePath and serviceMethod
err = binary.Write(w, binary.BigEndian, uint32(len(m.ServicePath)))
if err != nil {
return err
}
_, err = w.Write(util.StringToSliceByte(m.ServicePath))
if err != nil {
return err
}
err = binary.Write(w, binary.BigEndian, uint32(len(m.ServiceMethod)))
if err != nil {
return err
}
_, err = w.Write(util.StringToSliceByte(m.ServiceMethod))
if err != nil {
return err
}
// write meta
meta := encodeMetadata(m.Metadata)
err = binary.Write(w, binary.BigEndian, uint32(len(meta)))
if err != nil {
return err
}
_, err = w.Write(meta)
if err != nil {
return err
}
//write payload
err = binary.Write(w, binary.BigEndian, uint32(len(m.Payload)))
if err != nil {
return err
@@ -245,95 +276,126 @@ func (m Message) WriteTo(w io.Writer) error {
return err
}
// len,string,len,string,......
func encodeMetadata(m map[string]string) []byte {
var buf bytes.Buffer
for k, v := range m {
buf.WriteString(k)
buf.Write(lineSeparator)
buf.WriteString(v)
buf.Write(lineSeparator)
if len(m) == 0 {
return []byte{}
}
var buf bytes.Buffer
var d = make([]byte, 4)
for k, v := range m {
binary.BigEndian.PutUint32(d, uint32(len(k)))
buf.Write(d)
buf.Write(util.StringToSliceByte(k))
binary.BigEndian.PutUint32(d, uint32(len(v)))
buf.Write(d)
buf.Write(util.StringToSliceByte(v))
}
return buf.Bytes()
}
func decodeMetadata(lenData []byte, r io.Reader) (map[string]string, error) {
func decodeMetadata(lenData []byte, r io.Reader) ([]byte, map[string]string, error) {
_, err := io.ReadFull(r, lenData)
if err != nil {
return nil, err
return nil, nil, err
}
l := binary.BigEndian.Uint32(lenData)
m := make(map[string]string)
if l == 0 {
return m, nil
return nil, nil, nil
}
m := make(map[string]string, 10)
data := make([]byte, l)
_, err = io.ReadFull(r, data)
if err != nil {
return nil, err
return nil, nil, err
}
meta := bytes.Split(data, lineSeparator)
n := uint32(0)
for n < l {
// parse one key and value
// key
sl := binary.BigEndian.Uint32(data[n : n+4])
n = n + 4
if n+sl > l-4 {
return nil, m, ErrMetaKVMissing
}
k := util.SliceByteToString(data[n : n+sl])
n = n + sl
// last element is empty
if len(meta)%2 != 1 {
return nil, ErrMetaKVMissing
// value
sl = binary.BigEndian.Uint32(data[n : n+4])
n = n + 4
if n+sl > l {
return nil, m, ErrMetaKVMissing
}
v := util.SliceByteToString(data[n : n+sl])
n = n + sl
m[k] = v
}
for i := 0; i < len(meta)-1; i = i + 2 {
m[util.SliceByteToString(meta[i])] = util.SliceByteToString(meta[i+1])
}
return m, nil
return data, m, nil
}
// Read reads a message from r.
func Read(r io.Reader) (*Message, error) {
msg := NewMessage()
_, err := io.ReadFull(r, msg.Header[:])
err := msg.Decode(r)
if err != nil {
return nil, err
}
lenData := make([]byte, 4)
msg.Metadata, err = decodeMetadata(lenData, r)
if err != nil {
return nil, err
}
_, err = io.ReadFull(r, lenData)
if err != nil {
return nil, err
}
l := binary.BigEndian.Uint32(lenData)
msg.Payload = make([]byte, l)
_, err = io.ReadFull(r, msg.Payload)
return msg, err
return msg, nil
}
// Decode decodes a message from reader.
func (m *Message) Decode(r io.Reader) error {
// parse header
_, err := io.ReadFull(r, m.Header[:])
if err != nil {
return err
}
// parse servicePath
lenData := make([]byte, 4)
m.Metadata, err = decodeMetadata(lenData, r)
if err != nil {
return err
}
_, err = io.ReadFull(r, lenData)
if err != nil {
return err
}
l := binary.BigEndian.Uint32(lenData)
sp := make([]byte, l)
_, err = io.ReadFull(r, sp)
if err != nil {
return err
}
m.ServicePath = util.SliceByteToString(sp)
// parse serviceMethod
_, err = io.ReadFull(r, lenData)
if err != nil {
return err
}
l = binary.BigEndian.Uint32(lenData)
sm := make([]byte, l)
_, err = io.ReadFull(r, sm)
if err != nil {
return err
}
m.ServiceMethod = util.SliceByteToString(sm)
// parse meta
m.metaBytes, m.Metadata, err = decodeMetadata(lenData, r)
if err != nil {
return err
}
// parse payload
_, err = io.ReadFull(r, lenData)
if err != nil {
return err
}
l = binary.BigEndian.Uint32(lenData)
m.Payload = make([]byte, l)
_, err = io.ReadFull(r, m.Payload)

View File

@@ -18,7 +18,8 @@ func TestMessage(t *testing.T) {
req.SetSeq(1234567890)
m := make(map[string]string)
m["__METHOD"] = "Arith.Add"
req.ServicePath = "Arith"
req.ServiceMethod = "Add"
m["__ID"] = "6ba7b810-9dad-11d1-80b4-00c04fd430c9"
req.Metadata = m
@@ -34,7 +35,6 @@ func TestMessage(t *testing.T) {
if err != nil {
t.Fatal(err)
}
res, err := Read(&buf)
if err != nil {
t.Fatal(err)
@@ -49,8 +49,8 @@ func TestMessage(t *testing.T) {
t.Errorf("expect 1234567890 but got %d", res.Seq())
}
if res.Metadata["__METHOD"] != "Arith.Add" && res.Metadata["__METHOD"] != "6ba7b810-9dad-11d1-80b4-00c04fd430c9" {
t.Errorf("got wrong meatadata: %v", res.Metadata)
if res.ServicePath != "Arith" || res.ServiceMethod != "Add" || res.Metadata["__ID"] != "6ba7b810-9dad-11d1-80b4-00c04fd430c9" {
t.Errorf("got wrong metadata: %v", res.Metadata)
}
if string(res.Payload) != payload {

View File

@@ -300,8 +300,8 @@ func (s *Server) handleRequest(ctx context.Context, req *protocol.Message) (res
res.SetMessageType(protocol.Response)
serviceName := req.Metadata[protocol.ServicePath]
methodName := req.Metadata[protocol.ServiceMethod]
serviceName := req.ServicePath
methodName := req.ServiceMethod
s.serviceMapMu.RLock()
service := s.serviceMap[serviceName]
@@ -362,6 +362,9 @@ func (s *Server) handleRequest(ctx context.Context, req *protocol.Message) (res
func handleError(res *protocol.Message, err error) (*protocol.Message, error) {
res.SetMessageStatusType(protocol.Error)
if res.Metadata == nil {
res.Metadata = make(map[string]string)
}
res.Metadata[protocol.ServiceError] = err.Error()
return res, err
}

View File

@@ -38,8 +38,8 @@ func TestHandleRequest(t *testing.T) {
req.SetSerializeType(protocol.JSON)
req.SetSeq(1234567890)
req.Metadata[protocol.ServicePath] = "Arith"
req.Metadata[protocol.ServiceMethod] = "Mul"
req.ServicePath = "Arith"
req.ServiceMethod = "Mul"
argv := &Args{
A: 10,

View File

@@ -41,14 +41,17 @@ func NewAliasPlugin() *AliasPlugin {
// PostReadRequest converts the alias of this service.
func (p *AliasPlugin) PostReadRequest(ctx context.Context, r *protocol.Message, e error) error {
var sp = r.Metadata[protocol.ServicePath]
var sm = r.Metadata[protocol.ServiceMethod]
var sp = r.ServicePath
var sm = r.ServiceMethod
k := sp + "." + sm
if p.Aliases != nil {
if pm := p.Aliases[k]; pm != nil {
r.Metadata[protocol.ServicePath] = pm.servicePath
r.Metadata[protocol.ServiceMethod] = pm.serviceMethod
r.ServicePath = pm.servicePath
r.ServiceMethod = pm.serviceMethod
if r.Metadata == nil {
r.Metadata = make(map[string]string)
}
r.Metadata[aliasAppliedKey] = "true"
}
}
@@ -60,14 +63,14 @@ func (p *AliasPlugin) PreWriteResponse(ctx context.Context, r *protocol.Message)
if r.Metadata[aliasAppliedKey] != "true" {
return nil
}
var sp = r.Metadata[protocol.ServicePath]
var sm = r.Metadata[protocol.ServiceMethod]
var sp = r.ServicePath
var sm = r.ServiceMethod
k := sp + "." + sm
if p.ReseverseAliases != nil {
if pm := p.ReseverseAliases[k]; pm != nil {
r.Metadata[protocol.ServicePath] = pm.servicePath
r.Metadata[protocol.ServiceMethod] = pm.serviceMethod
r.ServicePath = pm.servicePath
r.ServiceMethod = pm.serviceMethod
delete(r.Metadata, aliasAppliedKey)
}
}

View File

@@ -58,8 +58,8 @@ func (p *MetricsPlugin) PreReadRequest(ctx context.Context) error {
// PostReadRequest counts read
func (p *MetricsPlugin) PostReadRequest(ctx context.Context, r *protocol.Message, e error) error {
sp := r.Metadata[protocol.ServicePath]
sm := r.Metadata[protocol.ServiceMethod]
sp := r.ServicePath
sm := r.ServiceMethod
m := metrics.GetOrRegisterMeter(p.withPrefix("service_"+sp+"."+sm+"_Read_Qps"), p.Registry)
m.Mark(1)
@@ -68,8 +68,8 @@ func (p *MetricsPlugin) PostReadRequest(ctx context.Context, r *protocol.Message
// PostWriteResponse count write
func (p *MetricsPlugin) PostWriteResponse(ctx context.Context, req *protocol.Message, res *protocol.Message, e error) error {
sp := res.Metadata[protocol.ServicePath]
sm := res.Metadata[protocol.ServiceMethod]
sp := res.ServicePath
sm := res.ServiceMethod
m := metrics.GetOrRegisterMeter(p.withPrefix("service_"+sp+"."+sm+"_Write_Qps"), p.Registry)
m.Mark(1)