From 6c88f85bed3bdf8e8a6adc26628191f9bcece302 Mon Sep 17 00:00:00 2001 From: pyihe <785131182@qq.com> Date: Thu, 28 Apr 2022 15:09:19 +0800 Subject: [PATCH] fix --- bytes/bytes.go | 2 +- go.mod | 1 - packets/packet.go | 53 ++++++++++++++++++++++------------------- rediss/pool.go | 2 +- snowflakes/snowflake.go | 34 +++++++++----------------- syncs/counter.go | 9 +++++++ 6 files changed, 51 insertions(+), 50 deletions(-) diff --git a/bytes/bytes.go b/bytes/bytes.go index b9cc0bb..dd3bd11 100644 --- a/bytes/bytes.go +++ b/bytes/bytes.go @@ -9,7 +9,7 @@ import ( type ByteBuffer = bytebufferpool.ByteBuffer var ( - Get = bytebufferpool.Get() + Get = bytebufferpool.Get Put = func(b *ByteBuffer) { if b != nil { bytebufferpool.Put(b) diff --git a/go.mod b/go.mod index 8c40848..63754bb 100644 --- a/go.mod +++ b/go.mod @@ -13,5 +13,4 @@ require ( github.com/vmihailenco/msgpack/v5 v5.3.4 // indirect go.uber.org/zap v1.18.1 // indirect golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e // indirect - gopkg.in/ini.v1 v1.66.4 // indirect ) diff --git a/packets/packet.go b/packets/packet.go index fd228c7..541d40c 100644 --- a/packets/packet.go +++ b/packets/packet.go @@ -10,8 +10,10 @@ import ( ) type Packet interface { - Packet(message Message) (data []byte, err error) - UnPacket(reader io.Reader, message Message) error + HeaderLen() int + MaxMessageLen() int + Packet(message []byte) (data []byte, err error) + UnPacket(reader io.Reader) ([]byte, error) } type Message interface { @@ -25,44 +27,48 @@ type packet struct { } func NewPacket(headerLen, maxDataLen int) Packet { - if headerLen <= 0 { + if maths.MaxInt(0, headerLen) == 0 { headerLen = 4 } + if maths.MaxInt(0, maxDataLen) == 0 { + maxDataLen = 2046 + } return &packet{ headerLen: headerLen, maxDataLen: maths.MaxInt(0, maxDataLen), } } +func (p *packet) HeaderLen() int { + if p != nil { + return p.headerLen + } + return -1 +} + +func (p *packet) MaxMessageLen() int { + if p != nil { + return p.maxDataLen + } + return -1 +} + // Packet 封包 -func (p *packet) Packet(message Message) (data []byte, err error) { - if message == nil { - err = errors.New("nil Message") - return - } - mBytes, err := message.Marshal() - if err != nil { - return - } - if p.maxDataLen > 0 && len(mBytes) > p.maxDataLen { +func (p *packet) Packet(message []byte) (data []byte, err error) { + if p.maxDataLen > 0 && len(message) > p.maxDataLen { err = errors.New("packet: message is too large") return } - data = make([]byte, p.headerLen+len(mBytes)) + data = make([]byte, p.headerLen+len(message)) // 头headerLen个字节存放数据长度 - binary.LittleEndian.PutUint32(data[:4], uint32(len(mBytes))) + binary.LittleEndian.PutUint32(data[:4], uint32(len(message))) // 将数据写进剩余的字节 - copy(data[4:], mBytes) + copy(data[4:], message) return } // UnPacket 拆包 -func (p *packet) UnPacket(reader io.Reader, message Message) (err error) { - if message == nil { - err = errors.New("nil Message") - return - } - +func (p *packet) UnPacket(reader io.Reader) (b []byte, err error) { // 先读取header中的数据长度 header := make([]byte, p.headerLen) n, err := io.ReadFull(reader, header) @@ -84,8 +90,7 @@ func (p *packet) UnPacket(reader io.Reader, message Message) (err error) { data := make([]byte, dataLen) n, err = io.ReadFull(reader, data) if err == nil { - // 反序列化数据到对应的结构体中 - err = message.Unmarshal(data[:n]) + b = data[:n] } return } diff --git a/rediss/pool.go b/rediss/pool.go index 9b08931..1b4709e 100644 --- a/rediss/pool.go +++ b/rediss/pool.go @@ -111,7 +111,7 @@ func NewPool(opts ...InitOptions) (RedisPool, error) { defaultPool.db = 1 } if defaultPool.net == "" { - defaultPool.net = "tcp" + defaultPool.net = "tcps" } defaultPool.p = &redis.Pool{ Dial: func() (conn redis.Conn, e error) { diff --git a/snowflakes/snowflake.go b/snowflakes/snowflake.go index 0214de2..70f8480 100644 --- a/snowflakes/snowflake.go +++ b/snowflakes/snowflake.go @@ -1,7 +1,6 @@ package snowflakes import ( - "errors" "strconv" "sync" "time" @@ -36,38 +35,26 @@ type builder struct { number int64 // 当前毫秒已经生成的id序列号(从0开始累加) 1毫秒内最多生成4096个ID } -// 实例化一个工作节点 -func NewWorker(opts ...Option) Worker { +func NewWorker(workerId int64) Worker { + assertWorkId(workerId) b := &builder{ - epoch: time.Now().Unix() * 1000, + epoch: time.Now().Unix() * 1000, + workerId: workerId, } - for _, opt := range opts { - if err := opt(b); err != nil { - panic(err) - } - } - return b } -func WithWorkerId(workerId int64) Option { - return func(b *builder) error { - if workerId < 0 || workerId > nodeMax { - return errors.New("work id cannot more than 1024") - } - b.workerId = workerId - return nil +func assertWorkId(workerId int64) { + if workerId < 0 || workerId > nodeMax { + panic("work id cannot more than 1024") } } -func (w *builder) GetInt64() int64 { +func (w *builder) GetInt64() (id int64) { w.mu.Lock() - defer w.mu.Unlock() - now := time.Now().UnixNano() / 1e6 if w.timestamp == now { w.number++ - if w.number > numberMax { for now <= w.timestamp { now = time.Now().UnixNano() / 1e6 @@ -77,8 +64,9 @@ func (w *builder) GetInt64() int64 { w.number = 0 w.timestamp = now } - - return (now-w.epoch)<