#212 implement compressor

This commit is contained in:
smallnest
2018-04-25 20:21:59 +08:00
parent 83ff6b6b47
commit f1c003a1e0
5 changed files with 97 additions and 9 deletions

View File

@@ -300,6 +300,7 @@ func (client *Client) SendRaw(ctx context.Context, r *protocol.Message) (map[str
call.Error = err call.Error = err
call.done() call.done()
} }
return nil, nil, err
} }
if r.IsOneway() { if r.IsOneway() {
client.mutex.Lock() client.mutex.Lock()
@@ -309,6 +310,7 @@ func (client *Client) SendRaw(ctx context.Context, r *protocol.Message) (map[str
if call != nil { if call != nil {
call.done() call.done()
} }
return nil, nil, nil
} }
var m map[string]string var m map[string]string
@@ -437,8 +439,7 @@ func (client *Client) send(ctx context.Context, call *Call) {
call.done() call.done()
return return
} }
if len(data) > 1024 && client.option.CompressType == protocol.Gzip { if len(data) > 1024 && client.option.CompressType != protocol.None {
data, err = util.Zip(data)
if err != nil { if err != nil {
call.Error = err call.Error = err
call.done() call.done()
@@ -463,6 +464,7 @@ func (client *Client) send(ctx context.Context, call *Call) {
call.Error = err call.Error = err
call.done() call.done()
} }
return
} }
isOneway := req.IsOneway() isOneway := req.IsOneway()

32
protocol/compressor.go Normal file
View File

@@ -0,0 +1,32 @@
package protocol
import "github.com/smallnest/rpcx/util"
// Compressor defines a common compression interface.
type Compressor interface {
Zip([]byte) ([]byte, error)
Unzip([]byte) ([]byte, error)
}
// GzipCompressor implements gzip compressor.
type GzipCompressor struct {
}
func (c GzipCompressor) Zip(data []byte) ([]byte, error) {
return util.Zip(data)
}
func (c GzipCompressor) Unzip(data []byte) ([]byte, error) {
return util.Unzip(data)
}
type RawDataCompressor struct {
}
func (c RawDataCompressor) Zip(data []byte) ([]byte, error) {
return data, nil
}
func (c RawDataCompressor) Unzip(data []byte) ([]byte, error) {
return data, nil
}

View File

@@ -9,6 +9,14 @@ import (
"github.com/smallnest/rpcx/util" "github.com/smallnest/rpcx/util"
) )
var (
// Compressors are compressors supported by rpcx. You can add customized compressor in Compressors.
Compressors = map[CompressType]Compressor{
None: &RawDataCompressor{},
Gzip: &GzipCompressor{},
}
)
// MaxMessageLength is the max length of a message. // MaxMessageLength is the max length of a message.
// Default is 0 that means does not limit length of messages. // Default is 0 that means does not limit length of messages.
// It is used to validate when read messages from io.Reader. // It is used to validate when read messages from io.Reader.
@@ -23,6 +31,8 @@ var (
ErrMetaKVMissing = errors.New("wrong metadata lines. some keys or values are missing") ErrMetaKVMissing = errors.New("wrong metadata lines. some keys or values are missing")
// ErrMessageTooLong message is too long // ErrMessageTooLong message is too long
ErrMessageTooLong = errors.New("message is too long") ErrMessageTooLong = errors.New("message is too long")
ErrUnsupportedCompressor = errors.New("unsupported compressor")
) )
const ( const (
@@ -197,6 +207,7 @@ func (h *Header) SetSeq(seq uint64) {
func (m Message) Clone() *Message { func (m Message) Clone() *Message {
header := *m.Header header := *m.Header
c := GetPooledMsg() c := GetPooledMsg()
header.SetCompressType(None)
c.Header = &header c.Header = &header
c.ServicePath = m.ServicePath c.ServicePath = m.ServicePath
c.ServiceMethod = m.ServiceMethod c.ServiceMethod = m.ServiceMethod
@@ -210,7 +221,23 @@ func (m Message) Encode() []byte {
spL := len(m.ServicePath) spL := len(m.ServicePath)
smL := len(m.ServiceMethod) smL := len(m.ServiceMethod)
totalL := (4 + spL) + (4 + smL) + (4 + len(meta)) + (4 + len(m.Payload)) var err error
payload := m.Payload
if m.CompressType() != None {
compressor := Compressors[m.CompressType()]
if compressor == nil {
m.SetCompressType(None)
} else {
payload, err = compressor.Zip(m.Payload)
if err != nil {
m.SetCompressType(None)
payload = m.Payload
}
}
}
totalL := (4 + spL) + (4 + smL) + (4 + len(meta)) + (4 + len(payload))
// header + dataLen + spLen + sp + smLen + sm + metaL + meta + payloadLen + payload // header + dataLen + spLen + sp + smLen + sm + metaL + meta + payloadLen + payload
metaStart := 12 + 4 + (4 + spL) + (4 + smL) metaStart := 12 + 4 + (4 + spL) + (4 + smL)
@@ -233,8 +260,8 @@ func (m Message) Encode() []byte {
binary.BigEndian.PutUint32(data[metaStart:metaStart+4], uint32(len(meta))) binary.BigEndian.PutUint32(data[metaStart:metaStart+4], uint32(len(meta)))
copy(data[metaStart+4:], meta) copy(data[metaStart+4:], meta)
binary.BigEndian.PutUint32(data[payLoadStart:payLoadStart+4], uint32(len(m.Payload))) binary.BigEndian.PutUint32(data[payLoadStart:payLoadStart+4], uint32(len(payload)))
copy(data[payLoadStart+4:], m.Payload) copy(data[payLoadStart+4:], payload)
return data return data
} }
@@ -251,7 +278,19 @@ func (m Message) WriteTo(w io.Writer) error {
spL := len(m.ServicePath) spL := len(m.ServicePath)
smL := len(m.ServiceMethod) smL := len(m.ServiceMethod)
totalL := (4 + spL) + (4 + smL) + (4 + len(meta)) + (4 + len(m.Payload)) payload := m.Payload
if m.CompressType() != None {
compressor := Compressors[m.CompressType()]
if compressor == nil {
return ErrUnsupportedCompressor
}
payload, err = compressor.Zip(m.Payload)
if err != nil {
return err
}
}
totalL := (4 + spL) + (4 + smL) + (4 + len(meta)) + (4 + len(payload))
err = binary.Write(w, binary.BigEndian, uint32(totalL)) err = binary.Write(w, binary.BigEndian, uint32(totalL))
if err != nil { if err != nil {
return err return err
@@ -286,12 +325,12 @@ func (m Message) WriteTo(w io.Writer) error {
} }
//write payload //write payload
err = binary.Write(w, binary.BigEndian, uint32(len(m.Payload))) err = binary.Write(w, binary.BigEndian, uint32(len(payload)))
if err != nil { if err != nil {
return err return err
} }
_, err = w.Write(m.Payload) _, err = w.Write(payload)
return err return err
} }
@@ -421,6 +460,17 @@ func (m *Message) Decode(r io.Reader) error {
n = n + 4 n = n + 4
m.Payload = data[n:] m.Payload = data[n:]
if m.CompressType() != None {
compressor := Compressors[m.CompressType()]
if compressor == nil {
return ErrUnsupportedCompressor
}
m.Payload, err = compressor.Unzip(m.Payload)
if err != nil {
return err
}
}
return err return err
} }

View File

@@ -318,8 +318,12 @@ func (s *Server) serveConn(conn net.Conn) {
if !req.IsOneway() { if !req.IsOneway() {
res := req.Clone() res := req.Clone()
res.SetMessageType(protocol.Response) res.SetMessageType(protocol.Response)
if len(res.Payload) > 1024 && req.CompressType() != protocol.None {
res.SetCompressType(req.CompressType())
}
handleError(res, err) handleError(res, err)
data := res.Encode() data := res.Encode()
s.Plugins.DoPreWriteResponse(ctx, req, res) s.Plugins.DoPreWriteResponse(ctx, req, res)
conn.Write(data) conn.Write(data)
s.Plugins.DoPostWriteResponse(ctx, req, res, err) s.Plugins.DoPostWriteResponse(ctx, req, res, err)

View File

@@ -14,7 +14,7 @@ const (
) )
var ( var (
// Codecs are codecs supported by rpcx. // Codecs are codecs supported by rpcx. You can add customized codecs in Codecs.
Codecs = map[protocol.SerializeType]codec.Codec{ Codecs = map[protocol.SerializeType]codec.Codec{
protocol.SerializeNone: &codec.ByteCodec{}, protocol.SerializeNone: &codec.ByteCodec{},
protocol.JSON: &codec.JSONCodec{}, protocol.JSON: &codec.JSONCodec{},