mirror of
https://github.com/mochi-mqtt/server.git
synced 2025-09-26 20:21:12 +08:00
Lock on close outbound (#213)
This commit is contained in:
22
clients.go
22
clients.go
@@ -356,6 +356,9 @@ func (cl *Client) Read(packetHandler ReadFn) error {
|
||||
// Stop instructs the client to shut down all processing goroutines and disconnect.
|
||||
func (cl *Client) Stop(err error) {
|
||||
cl.State.endOnce.Do(func() {
|
||||
cl.Lock()
|
||||
defer cl.Unlock()
|
||||
|
||||
if cl.Net.Conn != nil {
|
||||
_ = cl.Net.Conn.Close() // omit close error
|
||||
}
|
||||
@@ -479,14 +482,6 @@ func (cl *Client) ReadPacket(fh *packets.FixedHeader) (pk packets.Packet, err er
|
||||
|
||||
// WritePacket encodes and writes a packet to the client.
|
||||
func (cl *Client) WritePacket(pk packets.Packet) error {
|
||||
if atomic.LoadUint32(&cl.State.done) == 1 {
|
||||
return ErrConnectionClosed
|
||||
}
|
||||
|
||||
if cl.Net.Conn == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if pk.Expiry > 0 {
|
||||
pk.Properties.MessageExpiryInterval = uint32(pk.Expiry - time.Now().Unix()) // [MQTT-3.3.2-6]
|
||||
}
|
||||
@@ -550,9 +545,18 @@ func (cl *Client) WritePacket(pk packets.Packet) error {
|
||||
return packets.ErrPacketTooLarge // [MQTT-3.1.2-24] [MQTT-3.1.2-25]
|
||||
}
|
||||
|
||||
nb := net.Buffers{buf.Bytes()}
|
||||
cl.Lock()
|
||||
defer cl.Unlock()
|
||||
|
||||
if atomic.LoadUint32(&cl.State.done) == 1 {
|
||||
return ErrConnectionClosed
|
||||
}
|
||||
|
||||
if cl.Net.Conn == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
nb := net.Buffers{buf.Bytes()}
|
||||
n, err := nb.WriteTo(cl.Net.Conn)
|
||||
if err != nil {
|
||||
return err
|
||||
|
Reference in New Issue
Block a user