mirror of https://github.com/mochi-mqtt/server.git (synced 2025-10-26 17:40:38 +08:00)

Compare commits

4 Commits
| Author | SHA1 | Date |
|---|---|---|
| | 75504ff201 | |
| | a556feb325 | |
| | 8d4cc091b4 | |
| | d8f28cb843 | |
clients.go

```diff
@@ -383,6 +383,10 @@ func (cl *Client) ReadFixedHeader(fh *packets.FixedHeader) error {
 		return err
 	}
 
+	if cl.ops.capabilities.MaximumPacketSize > 0 && uint32(fh.Remaining+1) > cl.ops.capabilities.MaximumPacketSize {
+		return packets.ErrPacketTooLarge // [MQTT-3.2.2-15]
+	}
+
 	atomic.AddInt64(&cl.ops.info.BytesReceived, int64(bu+1))
 	return nil
 }
```
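The new guard rejects any inbound packet whose declared size exceeds the broker's MaximumPacketSize capability. As a rough usage sketch (not part of this diff), an embedding application could set that capability before serving. The Options/Capabilities shape and the import path are assumed from the current v2 API of this mirror and may differ at the version these commits target:

```go
package main

import (
	"log"

	mqtt "github.com/mochi-mqtt/server/v2"
)

func main() {
	server := mqtt.New(nil)
	defer server.Close()

	// Assumed v2 API: a zero value disables the check; any non-zero value
	// makes ReadFixedHeader reject larger inbound packets with
	// packets.ErrPacketTooLarge [MQTT-3.2.2-15].
	server.Options.Capabilities.MaximumPacketSize = 4096

	if err := server.Serve(); err != nil {
		log.Fatal(err)
	}
}
```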
clients_test.go

```diff
@@ -350,6 +350,22 @@ func TestClientReadFixedHeaderDecodeError(t *testing.T) {
 	require.Error(t, err)
 }
 
+func TestClientReadFixedHeaderPacketOversized(t *testing.T) {
+	cl, r, _ := newTestClient()
+	cl.ops.capabilities.MaximumPacketSize = 2
+	defer cl.Stop(errClientStop)
+
+	go func() {
+		r.Write(packets.TPacketData[packets.Publish].Get(packets.TPublishQos1Dup).RawBytes)
+		r.Close()
+	}()
+
+	fh := new(packets.FixedHeader)
+	err := cl.ReadFixedHeader(fh)
+	require.Error(t, err)
+	require.ErrorIs(t, err, packets.ErrPacketTooLarge)
+}
+
 func TestClientReadFixedHeaderReadEOF(t *testing.T) {
 	cl, r, _ := newTestClient()
 	defer cl.Stop(errClientStop)
```
server.go (25 changed lines)
```diff
@@ -26,7 +26,7 @@ import (
 )
 
 const (
-	Version                        = "2.0.5" // the current server version.
+	Version                        = "2.0.7" // the current server version.
 	defaultSysTopicInterval int64  = 1       // the interval between $SYS topic publishes
 	defaultFanPoolSize      uint64 = 32      // the number of concurrent workers in the pool
 	defaultFanPoolQueueSize uint64 = 1024    // the capacity of each worker queue
```
```diff
@@ -376,7 +376,7 @@ func (s *Server) attachClient(cl *Client, listener string) error {
 	expire := (cl.Properties.ProtocolVersion == 5 && cl.Properties.Props.SessionExpiryIntervalFlag && cl.Properties.Props.SessionExpiryInterval == 0) || (cl.Properties.ProtocolVersion < 5 && cl.Properties.Clean)
 	s.hooks.OnDisconnect(cl, err, expire)
 	if expire {
-		s.unsubscribeClient(cl)
+		s.UnsubscribeClient(cl)
 		cl.ClearInflights(math.MaxInt64, 0)
 		s.Clients.Delete(cl.ID) // [MQTT-4.1.0-2] ![MQTT-3.1.2-23]
 	}
```
```diff
@@ -455,7 +455,7 @@ func (s *Server) inheritClientSession(pk packets.Packet, cl *Client) bool {
 	defer existing.Unlock()
 	s.DisconnectClient(existing, packets.ErrSessionTakenOver) // [MQTT-3.1.4-3]
 	if pk.Connect.Clean || (existing.Properties.Clean && cl.Properties.ProtocolVersion < 5) { // [MQTT-3.1.2-4] [MQTT-3.1.4-4]
-		s.unsubscribeClient(existing)
+		s.UnsubscribeClient(existing)
 		existing.ClearInflights(math.MaxInt64, 0)
 		return false // [MQTT-3.2.2-3]
 	}
```
```diff
@@ -1072,14 +1072,20 @@ func (s *Server) processUnsubscribe(cl *Client, pk packets.Packet) error {
 	return cl.WritePacket(ack)
 }
 
-// unsubscribeClient unsubscribes a client from all of their subscriptions.
-func (s *Server) unsubscribeClient(cl *Client) {
-	for k := range cl.State.Subscriptions.GetAll() {
+// UnsubscribeClient unsubscribes a client from all of their subscriptions.
+func (s *Server) UnsubscribeClient(cl *Client) {
+	i := 0
+	filterMap := cl.State.Subscriptions.GetAll()
+	filters := make([]packets.Subscription, len(filterMap))
+	for k, v := range filterMap {
 		cl.State.Subscriptions.Delete(k)
 		if s.Topics.Unsubscribe(k, cl.ID) {
 			atomic.AddInt64(&s.Info.Subscriptions, -1)
 		}
+		filters[i] = v
+		i++
 	}
+	s.hooks.OnUnsubscribed(cl, packets.Packet{Filters: filters})
 }
 
 // processAuth processes an Auth packet.
```
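Besides exporting the method, this change makes bulk removal observable: the collected filters are passed to the OnUnsubscribed hook in a synthetic packet. A minimal sketch of a hook that watches those events, assuming the v2 hooks API (HookBase, the mqtt.OnUnsubscribed event byte, packets.Subscription's Filter field); treat the names as illustrative:

```go
package hooks

import (
	"bytes"
	"log"

	mqtt "github.com/mochi-mqtt/server/v2"
	"github.com/mochi-mqtt/server/v2/packets"
)

// UnsubLogger logs every filter a client is removed from, including the
// bulk removals now reported by Server.UnsubscribeClient.
type UnsubLogger struct {
	mqtt.HookBase
}

func (h *UnsubLogger) ID() string { return "unsub-logger" }

// Provides declares which hook events this hook subscribes to.
func (h *UnsubLogger) Provides(b byte) bool {
	return bytes.Contains([]byte{mqtt.OnUnsubscribed}, []byte{b})
}

// OnUnsubscribed receives the pk.Filters slice that UnsubscribeClient builds.
func (h *UnsubLogger) OnUnsubscribed(cl *mqtt.Client, pk packets.Packet) {
	for _, f := range pk.Filters {
		log.Printf("client %s unsubscribed from %q", cl.ID, f.Filter)
	}
}
```

It would be registered on the server with something like `server.AddHook(new(UnsubLogger), nil)`.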
```diff
@@ -1126,12 +1132,15 @@ func (s *Server) DisconnectClient(cl *Client, code packets.Code) error {
 
 	// We already have a code we are using to disconnect the client, so we are not
 	// interested if the write packet fails due to a closed connection (as we are closing it).
-	_ = cl.WritePacket(out)
+	err := cl.WritePacket(out)
 	if !s.Options.Capabilities.Compatibilities.PassiveClientDisconnect {
 		cl.Stop(code)
+		if code.Code >= packets.ErrUnspecifiedError.Code {
+			return code
+		}
 	}
 
-	return code
+	return err
 }
 
 // publishSysTopics publishes the current values to the server $SYS topics.
```
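With this change, a normal disconnect surfaces the packet-write error, while error-class codes (at or above packets.ErrUnspecifiedError.Code) are still returned as the code itself. A hedged usage sketch; kickClient and its parameters are hypothetical names, not part of the library:

```go
package example

import (
	"log"

	mqtt "github.com/mochi-mqtt/server/v2"
	"github.com/mochi-mqtt/server/v2/packets"
)

// kickClient is a hypothetical helper demonstrating the new return value:
// for a normal disconnect, a non-nil error now means the DISCONNECT packet
// could not be written to the client before the connection was stopped.
func kickClient(srv *mqtt.Server, cl *mqtt.Client) {
	if err := srv.DisconnectClient(cl, packets.CodeDisconnect); err != nil {
		log.Printf("disconnect of client %s failed: %v", cl.ID, err)
	}
}
```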
server_test.go

```diff
@@ -844,7 +844,7 @@ func TestServerUnsubscribeClient(t *testing.T) {
 	s.Topics.Subscribe(cl.ID, pk)
 	subs := s.Topics.Subscribers("a/b/c")
 	require.Equal(t, 1, len(subs.Subscriptions))
-	s.unsubscribeClient(cl)
+	s.UnsubscribeClient(cl)
 	subs = s.Topics.Subscribers("a/b/c")
 	require.Equal(t, 0, len(subs.Subscriptions))
 }
```
```diff
@@ -2291,6 +2291,21 @@ func TestServerRecievePacketDisconnectClientZeroNonZero(t *testing.T) {
 	require.Equal(t, packets.TPacketData[packets.Disconnect].Get(packets.TDisconnectZeroNonZeroExpiry).RawBytes, buf)
 }
 
+func TestServerRecievePacketDisconnectClient(t *testing.T) {
+	s := newServer()
+	cl, r, w := newTestClient()
+
+	go func() {
+		err := s.DisconnectClient(cl, packets.CodeDisconnect)
+		require.NoError(t, err)
+		w.Close()
+	}()
+
+	buf, err := io.ReadAll(r)
+	require.NoError(t, err)
+	require.Equal(t, packets.TPacketData[packets.Disconnect].Get(packets.TDisconnect).RawBytes, buf)
+}
+
 func TestServerProcessPacketDisconnect(t *testing.T) {
 	s := newServer()
 	cl, _, _ := newTestClient()
```