mirror of
https://github.com/mochi-mqtt/server.git
synced 2025-09-27 12:32:16 +08:00
Compare commits
27 Commits
use-contex
...
v2.2.15
Author | SHA1 | Date | |
---|---|---|---|
![]() |
9c52292732 | ||
![]() |
990f308faa | ||
![]() |
fe0c1d15a6 | ||
![]() |
0648e39507 | ||
![]() |
233a82e448 | ||
![]() |
51a8d8cb54 | ||
![]() |
23c3208310 | ||
![]() |
23e1092cda | ||
![]() |
d498576927 | ||
![]() |
7e14ce99b5 | ||
![]() |
4db49a4b9d | ||
![]() |
e60b8ff0c9 | ||
![]() |
b9d5dcb5f0 | ||
![]() |
6d394d1fe9 | ||
![]() |
1ee2158711 | ||
![]() |
af79b55b9f | ||
![]() |
e1a9497c25 | ||
![]() |
62659e17ba | ||
![]() |
7ad6dd8e1a | ||
![]() |
565e07747e | ||
![]() |
6acd775a6b | ||
![]() |
493f6c8bb0 | ||
![]() |
d3785c2717 | ||
![]() |
52a347169a | ||
![]() |
797d75cb34 | ||
![]() |
5225a357e5 | ||
![]() |
a734a0dc73 |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -1,3 +1,4 @@
|
||||
cmd/mqtt
|
||||
.DS_Store
|
||||
*.db
|
||||
.idea
|
138
README.md
138
README.md
@@ -16,7 +16,7 @@ Mochi MQTT is an embeddable [fully compliant](https://docs.oasis-open.org/mqtt/m
|
||||
### What is MQTT?
|
||||
MQTT stands for [MQ Telemetry Transport](https://en.wikipedia.org/wiki/MQTT). It is a publish/subscribe, extremely simple and lightweight messaging protocol, designed for constrained devices and low-bandwidth, high-latency or unreliable networks ([Learn more](https://mqtt.org/faq)). Mochi MQTT fully implements version 5.0.0 of the MQTT protocol.
|
||||
|
||||
## What's new in Version 2.0.0?
|
||||
## What's new in Version 2?
|
||||
Version 2.0.0 takes all the great things we loved about Mochi MQTT v1.0.0, learns from the mistakes, and improves on the things we wished we'd had. It's a total from-scratch rewrite, designed to fully implement MQTT v5 as a first-class feature.
|
||||
|
||||
Don't forget to use the new v2 import paths:
|
||||
@@ -37,7 +37,7 @@ import "github.com/mochi-co/mqtt/v2"
|
||||
- Plus all the original MQTT features of Mochi MQTT v1, such as Full QoS(0,1,2), $SYS topics, retained messages, etc.
|
||||
- Developer-centric:
|
||||
- Most core broker code is now exported and accessible, for total developer control.
|
||||
- Full featured and flexible Hook-based interfacing system to provide easy 'plugin' development.
|
||||
- Full-featured and flexible Hook-based interfacing system to provide easy 'plugin' development.
|
||||
- Direct Packet Injection using special inline client, or masquerade as existing clients.
|
||||
- Performant and Stable:
|
||||
- Our classic trie-based Topic-Subscription model.
|
||||
@@ -116,13 +116,14 @@ Examples of running the broker with various configurations can be found in the [
|
||||
#### Network Listeners
|
||||
The server comes with a variety of pre-packaged network listeners which allow the broker to accept connections on different protocols. The current listeners are:
|
||||
|
||||
| Listener | Usage |
|
||||
| --- | --- |
|
||||
| listeners.NewTCP | A TCP listener |
|
||||
| listeners.NewUnixSock | A Unix Socket listener |
|
||||
| listeners.NewNet | A net.Listener listener |
|
||||
| listeners.NewWebsocket | A Websocket listener |
|
||||
| listeners.NewHTTPStats | An HTTP $SYS info dashboard |
|
||||
| Listener | Usage |
|
||||
|------------------------------|----------------------------------------------------------------------------------------------|
|
||||
| listeners.NewTCP | A TCP listener |
|
||||
| listeners.NewUnixSock | A Unix Socket listener |
|
||||
| listeners.NewNet | A net.Listener listener |
|
||||
| listeners.NewWebsocket | A Websocket listener |
|
||||
| listeners.NewHTTPStats | An HTTP $SYS info dashboard |
|
||||
| listeners.NewHTTPHealthCheck | An HTTP healthcheck listener to provide health check responses for e.g. cloud infrastructure |
|
||||
|
||||
> Use the `listeners.Listener` interface to develop new listeners. If you do, please let us know!
|
||||
|
||||
@@ -136,13 +137,13 @@ A number of configurable options are available which can be used to alter the be
|
||||
```go
|
||||
server := mqtt.New(&mqtt.Options{
|
||||
Capabilities: mqtt.Capabilities{
|
||||
ClientNetWriteBufferSize: 4096,
|
||||
ClientNetReadBufferSize: 4096,
|
||||
MaximumSessionExpiryInterval: 3600,
|
||||
Compatibilities: mqtt.Compatibilities{
|
||||
ObscureNotAuthorized: true,
|
||||
},
|
||||
},
|
||||
ClientNetWriteBufferSize: 4096,
|
||||
ClientNetReadBufferSize: 4096,
|
||||
SysTopicResendInterval: 10,
|
||||
})
|
||||
```
|
||||
@@ -269,51 +270,51 @@ For more information on how the badger hook works, or how to use it, see the [ex
|
||||
|
||||
There is also a BoltDB hook which has been deprecated in favour of Badger, but if you need it, check [examples/persistence/bolt/main.go](examples/persistence/bolt/main.go).
|
||||
|
||||
|
||||
|
||||
## Developing with Event Hooks
|
||||
Many hooks are available for interacting with the broker and client lifecycle.
|
||||
The function signatures for all the hooks and `mqtt.Hook` interface can be found in [hooks.go](hooks.go).
|
||||
|
||||
> The most flexible event hooks are OnPacketRead, OnPacketEncode, and OnPacketSent - these hooks be used to control and modify all incoming and outgoing packets.
|
||||
|
||||
| Function | Usage |
|
||||
| -------------------------- | -- |
|
||||
| OnStarted | Called when the server has successfully started.|
|
||||
| OnStopped | Called when the server has successfully stopped. |
|
||||
| OnConnectAuthenticate | Called when a user attempts to authenticate with the server. An implementation of this method MUST be used to allow or deny access to the server (see hooks/auth/allow_all or basic). It can be used in custom hooks to check connecting users against an existing user database. Returns true if allowed. |
|
||||
| OnACLCheck | Called when a user attempts to publish or subscribe to a topic filter. As above. |
|
||||
| OnSysInfoTick | Called when the $SYS topic values are published out. |
|
||||
| OnConnect | Called when a new client connects |
|
||||
| OnSessionEstablished | Called when a new client successfully establishes a session (after OnConnect) |
|
||||
| OnDisconnect | Called when a client is disconnected for any reason. |
|
||||
| OnAuthPacket | Called when an auth packet is received. It is intended to allow developers to create their own mqtt v5 Auth Packet handling mechanisms. Allows packet modification. |
|
||||
| OnPacketRead | Called when a packet is received from a client. Allows packet modification. |
|
||||
| OnPacketEncode | Called immediately before a packet is encoded to be sent to a client. Allows packet modification. |
|
||||
| OnPacketSent | Called when a packet has been sent to a client. |
|
||||
| OnPacketProcessed | Called when a packet has been received and successfully handled by the broker. |
|
||||
| OnSubscribe | Called when a client subscribes to one or more filters. Allows packet modification. |
|
||||
| OnSubscribed | Called when a client successfully subscribes to one or more filters. |
|
||||
| OnSelectSubscribers | Called when subscribers have been collected for a topic, but before shared subscription subscribers have been selected. Allows receipient modification.|
|
||||
| OnUnsubscribe | Called when a client unsubscribes from one or more filters. Allows packet modification. |
|
||||
| OnUnsubscribed | Called when a client successfully unsubscribes from one or more filters. |
|
||||
| OnPublish | Called when a client publishes a message. Allows packet modification. |
|
||||
| OnPublished | Called when a client has published a message to subscribers. |
|
||||
| OnPublishDropped | Called when a message to a client is dropped before delivery, such as if the client is taking too long to respond. |
|
||||
| OnRetainMessage | Called then a published message is retained. |
|
||||
| OnQosPublish | Called when a publish packet with Qos >= 1 is issued to a subscriber. |
|
||||
| OnQosComplete | Called when the Qos flow for a message has been completed. |
|
||||
| OnQosDropped | Called when an inflight message expires before completion. |
|
||||
| OnPacketIDExhausted | Called when a client runs out of unused packet ids to assign. |
|
||||
| OnWill | Called when a client disconnects and intends to issue a will message. Allows packet modification. |
|
||||
| OnWillSent | Called when an LWT message has been issued from a disconnecting client. |
|
||||
| OnClientExpired | Called when a client session has expired and should be deleted. |
|
||||
| OnRetainedExpired | Called when a retained message has expired and should be deleted. |
|
||||
| StoredClients | Returns clients, eg. from a persistent store. |
|
||||
| StoredSubscriptions | Returns client subscriptions, eg. from a persistent store. |
|
||||
| StoredInflightMessages | Returns inflight messages, eg. from a persistent store. |
|
||||
| StoredRetainedMessages | Returns retained messages, eg. from a persistent store. |
|
||||
| StoredSysInfo | Returns stored system info values, eg. from a persistent store. |
|
||||
| Function | Usage |
|
||||
|------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|
||||
| OnStarted | Called when the server has successfully started. |
|
||||
| OnStopped | Called when the server has successfully stopped. |
|
||||
| OnConnectAuthenticate | Called when a user attempts to authenticate with the server. An implementation of this method MUST be used to allow or deny access to the server (see hooks/auth/allow_all or basic). It can be used in custom hooks to check connecting users against an existing user database. Returns true if allowed. |
|
||||
| OnACLCheck | Called when a user attempts to publish or subscribe to a topic filter. As above. |
|
||||
| OnSysInfoTick | Called when the $SYS topic values are published out. |
|
||||
| OnConnect | Called when a new client connects, may return an error or packet code to halt the client connection process. |
|
||||
| OnSessionEstablish | Called immediately after a new client connects and authenticates and immediately before the session is established and CONNACK is sent.
|
||||
| OnSessionEstablished | Called when a new client successfully establishes a session (after OnConnect) |
|
||||
| OnDisconnect | Called when a client is disconnected for any reason. |
|
||||
| OnAuthPacket | Called when an auth packet is received. It is intended to allow developers to create their own mqtt v5 Auth Packet handling mechanisms. Allows packet modification. |
|
||||
| OnPacketRead | Called when a packet is received from a client. Allows packet modification. |
|
||||
| OnPacketEncode | Called immediately before a packet is encoded to be sent to a client. Allows packet modification. |
|
||||
| OnPacketSent | Called when a packet has been sent to a client. |
|
||||
| OnPacketProcessed | Called when a packet has been received and successfully handled by the broker. |
|
||||
| OnSubscribe | Called when a client subscribes to one or more filters. Allows packet modification. |
|
||||
| OnSubscribed | Called when a client successfully subscribes to one or more filters. |
|
||||
| OnSelectSubscribers | Called when subscribers have been collected for a topic, but before shared subscription subscribers have been selected. Allows receipient modification. |
|
||||
| OnUnsubscribe | Called when a client unsubscribes from one or more filters. Allows packet modification. |
|
||||
| OnUnsubscribed | Called when a client successfully unsubscribes from one or more filters. |
|
||||
| OnPublish | Called when a client publishes a message. Allows packet modification. |
|
||||
| OnPublished | Called when a client has published a message to subscribers. |
|
||||
| OnPublishDropped | Called when a message to a client is dropped before delivery, such as if the client is taking too long to respond. |
|
||||
| OnRetainMessage | Called then a published message is retained. |
|
||||
| OnRetainPublished | Called then a retained message is published to a client. |
|
||||
| OnQosPublish | Called when a publish packet with Qos >= 1 is issued to a subscriber. |
|
||||
| OnQosComplete | Called when the Qos flow for a message has been completed. |
|
||||
| OnQosDropped | Called when an inflight message expires before completion. |
|
||||
| OnPacketIDExhausted | Called when a client runs out of unused packet ids to assign. |
|
||||
| OnWill | Called when a client disconnects and intends to issue a will message. Allows packet modification. |
|
||||
| OnWillSent | Called when an LWT message has been issued from a disconnecting client. |
|
||||
| OnClientExpired | Called when a client session has expired and should be deleted. |
|
||||
| OnRetainedExpired | Called when a retained message has expired and should be deleted. |
|
||||
| StoredClients | Returns clients, eg. from a persistent store. |
|
||||
| StoredSubscriptions | Returns client subscriptions, eg. from a persistent store. |
|
||||
| StoredInflightMessages | Returns inflight messages, eg. from a persistent store. |
|
||||
| StoredRetainedMessages | Returns retained messages, eg. from a persistent store. |
|
||||
| StoredSysInfo | Returns stored system info values, eg. from a persistent store. |
|
||||
|
||||
If you are building a persistent storage hook, see the existing persistent hooks for inspiration and patterns. If you are building an auth hook, you will need `OnACLCheck` and `OnConnectAuthenticate`.
|
||||
|
||||
@@ -367,39 +368,54 @@ Mochi MQTT performance is comparable with popular brokers such as Mosquitto, EMQ
|
||||
Performance benchmarks were tested using [MQTT-Stresser](https://github.com/inovex/mqtt-stresser) on a Apple Macbook Air M2, using `cmd/main.go` default settings. Taking into account bursts of high and low throughput, the median scores are the most useful. Higher is better.
|
||||
|
||||
> The values presented in the benchmark are not representative of true messages per second throughput. They rely on an unusual calculation by mqtt-stresser, but are usable as they are consistent across all brokers.
|
||||
> Benchmarks are provided as a general performance expectation guideline only.
|
||||
> Benchmarks are provided as a general performance expectation guideline only. Comparisons are performed using out-of-the-box default configurations.
|
||||
|
||||
`mqtt-stresser -broker tcp://localhost:1883 -num-clients=2 -num-messages=10000`
|
||||
| Broker | publish fastest | median | slowest | receive fastest | median | slowest |
|
||||
| -- | -- | -- | -- | -- | -- | -- |
|
||||
| Mochi v2.2.0 | 127,216 | 125,748 | 124,279 | 319,250 | 309,327 | 299,405 |
|
||||
| Mosquitto v2.0.15 | 155,920 | 155,919 | 155,918 | 185,485 | 185,097 | 184,709 |
|
||||
| EMQX v5.0.11 | 156,945 | 156,257 | 155,568 | 17,918 | 17,783 | 17,649 |
|
||||
| Mochi v2.2.10 | 124,772 | 125,456 | 124,614 | 314,461 | 313,186 | 311,910 |
|
||||
| [Mosquitto v2.0.15](https://github.com/eclipse/mosquitto) | 155,920 | 155,919 | 155,918 | 185,485 | 185,097 | 184,709 |
|
||||
| [EMQX v5.0.11](https://github.com/emqx/emqx) | 156,945 | 156,257 | 155,568 | 17,918 | 17,783 | 17,649 |
|
||||
| [Rumqtt v0.21.0](https://github.com/bytebeamio/rumqtt) | 112,208 | 108,480 | 104,753 | 135,784 | 126,446 | 117,108 |
|
||||
|
||||
`mqtt-stresser -broker tcp://localhost:1883 -num-clients=10 -num-messages=10000`
|
||||
| Broker | publish fastest | median | slowest | receive fastest | median | slowest |
|
||||
| -- | -- | -- | -- | -- | -- | -- |
|
||||
| Mochi v2.2.0 | 45,615 | 30,129 | 21,138 | 232,717 | 86,323 | 50,402 |
|
||||
| Mochi v2.2.10 | 41,825 | 31,663| 23,008 | 144,058 | 65,903 | 37,618 |
|
||||
| Mosquitto v2.0.15 | 42,729 | 38,633 | 29,879 | 23,241 | 19,714 | 18,806 |
|
||||
| EMQX v5.0.11 | 21,553 | 17,418 | 14,356 | 4,257 | 3,980 | 3,756 |
|
||||
| Rumqtt v0.21.0 | 42,213 | 23,153 | 20,814 | 49,465 | 36,626 | 19,283 |
|
||||
|
||||
Million Message Challenge (hit the server with 1 million messages immediately):
|
||||
|
||||
`mqtt-stresser -broker tcp://localhost:1883 -num-clients=100 -num-messages=10000`
|
||||
| Broker | publish fastest | median | slowest | receive fastest | median | slowest |
|
||||
| -- | -- | -- | -- | -- | -- | -- |
|
||||
| Mochi v2.2.0 | 51,044 | 4,682 | 2,345 | 72,634 | 7,645 | 2,464 |
|
||||
| Mochi v2.2.10 | 13,532 | 4,425 | 2,344 | 52,120 | 7,274 | 2,701 |
|
||||
| Mosquitto v2.0.15 | 3,826 | 3,395 | 3,032 | 1,200 | 1,150 | 1,118 |
|
||||
| EMQX v5.0.11 | 4,086 | 2,432 | 2,274 | 434 | 333 | 311 |
|
||||
| Rumqtt v0.21.0 | 78,972 | 5,047 | 3,804 | 4,286 | 3,249 | 2,027 |
|
||||
|
||||
> Not sure what's going on with EMQX here, perhaps the docker out-of-the-box settings are not optimal, so take it with a pinch of salt as we know for a fact it's a solid piece of software.
|
||||
|
||||
## Contribution Guidelines
|
||||
Contributions and feedback are both welcomed and encouraged! [Open an issue](https://github.com/mochi-co/mqtt/issues) to report a bug, ask a question, or make a feature request. If you open a pull request, please try to follow the following guidelines:
|
||||
- Try to maintain test coverage where reasonably possible.
|
||||
- Clearly state what the PR does and why.
|
||||
- Remember to add your SPDX FileContributor tag to files where you have made a meaningful contribution.
|
||||
|
||||
[SPDX Annotations](https://spdx.dev) are used to clearly indicate the license, copyright, and contributions of each file in a machine-readable format. If you are adding a new file to the repository, please ensure it has the following SPDX header:
|
||||
```go
|
||||
// SPDX-License-Identifier: MIT
|
||||
// SPDX-FileCopyrightText: 2022 mochi-co
|
||||
// SPDX-FileContributor: Your name or alias <optional@email.address>
|
||||
|
||||
package name
|
||||
```
|
||||
|
||||
Please ensure to add a new `SPDX-FileContributor` line for each contributor to the file. Refer to other files for examples. Please remember to do this, your contributions to this project are valuable and appreciated - it's important to receive credit!
|
||||
|
||||
## Stargazers over time 🥰
|
||||
[](https://starchart.cc/mochi-co/mqtt)
|
||||
Are you using Mochi MQTT in a project? [Let us know!](https://github.com/mochi-co/mqtt/issues)
|
||||
|
||||
## Contributions
|
||||
Contributions and feedback are both welcomed and encouraged! [Open an issue](https://github.com/mochi-co/mqtt/issues) to report a bug, ask a question, or make a feature request.
|
||||
|
||||
|
||||
|
||||
|
96
clients.go
96
clients.go
@@ -7,6 +7,7 @@ package mqtt
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
@@ -87,7 +88,7 @@ func (cl *Clients) GetByListener(id string) []*Client {
|
||||
defer cl.RUnlock()
|
||||
clients := make([]*Client, 0, cl.Len())
|
||||
for _, client := range cl.internal {
|
||||
if client.Net.Listener == id && atomic.LoadUint32(&client.State.done) == 0 {
|
||||
if client.Net.Listener == id && !client.Closed() {
|
||||
clients = append(clients, client)
|
||||
}
|
||||
}
|
||||
@@ -135,29 +136,34 @@ type Will struct {
|
||||
|
||||
// State tracks the state of the client.
|
||||
type ClientState struct {
|
||||
TopicAliases TopicAliases // a map of topic aliases
|
||||
stopCause atomic.Value // reason for stopping
|
||||
Inflight *Inflight // a map of in-flight qos messages
|
||||
Subscriptions *Subscriptions // a map of the subscription filters a client maintains
|
||||
disconnected int64 // the time the client disconnected in unix time, for calculating expiry
|
||||
outbound chan *packets.Packet // queue for pending outbound packets
|
||||
endOnce sync.Once // only end once
|
||||
isTakenOver uint32 // used to identify orphaned clients
|
||||
packetID uint32 // the current highest packetID
|
||||
done uint32 // atomic counter which indicates that the client has closed
|
||||
outboundQty int32 // number of messages currently in the outbound queue
|
||||
keepalive uint16 // the number of seconds the connection can wait
|
||||
TopicAliases TopicAliases // a map of topic aliases
|
||||
stopCause atomic.Value // reason for stopping
|
||||
Inflight *Inflight // a map of in-flight qos messages
|
||||
Subscriptions *Subscriptions // a map of the subscription filters a client maintains
|
||||
disconnected int64 // the time the client disconnected in unix time, for calculating expiry
|
||||
outbound chan *packets.Packet // queue for pending outbound packets
|
||||
endOnce sync.Once // only end once
|
||||
isTakenOver uint32 // used to identify orphaned clients
|
||||
packetID uint32 // the current highest packetID
|
||||
open context.Context // indicate that the client is open for packet exchange
|
||||
cancelOpen context.CancelFunc // cancel function for open context
|
||||
outboundQty int32 // number of messages currently in the outbound queue
|
||||
Keepalive uint16 // the number of seconds the connection can wait
|
||||
ServerKeepalive bool // keepalive was set by the server
|
||||
}
|
||||
|
||||
// newClient returns a new instance of Client. This is almost exclusively used by Server
|
||||
// for creating new clients, but it lives here because it's not dependent.
|
||||
func newClient(c net.Conn, o *ops) *Client {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
cl := &Client{
|
||||
State: ClientState{
|
||||
Inflight: NewInflights(),
|
||||
Subscriptions: NewSubscriptions(),
|
||||
TopicAliases: NewTopicAliases(o.options.Capabilities.TopicAliasMaximum),
|
||||
keepalive: defaultKeepalive,
|
||||
open: ctx,
|
||||
cancelOpen: cancel,
|
||||
Keepalive: defaultKeepalive,
|
||||
outbound: make(chan *packets.Packet, o.options.Capabilities.MaximumClientWritesPending),
|
||||
},
|
||||
Properties: ClientProperties{
|
||||
@@ -177,18 +183,21 @@ func newClient(c net.Conn, o *ops) *Client {
|
||||
}
|
||||
}
|
||||
|
||||
cl.refreshDeadline(cl.State.keepalive)
|
||||
|
||||
return cl
|
||||
}
|
||||
|
||||
// WriteLoop ranges over pending outbound messages and writes them to the client connection.
|
||||
func (cl *Client) WriteLoop() {
|
||||
for pk := range cl.State.outbound {
|
||||
if err := cl.WritePacket(*pk); err != nil {
|
||||
cl.ops.log.Debug().Err(err).Str("client", cl.ID).Interface("packet", pk).Msg("failed publishing packet")
|
||||
for {
|
||||
select {
|
||||
case pk := <-cl.State.outbound:
|
||||
if err := cl.WritePacket(*pk); err != nil {
|
||||
cl.ops.log.Debug().Err(err).Str("client", cl.ID).Interface("packet", pk).Msg("failed publishing packet")
|
||||
}
|
||||
atomic.AddInt32(&cl.State.outboundQty, -1)
|
||||
case <-cl.State.open.Done():
|
||||
return
|
||||
}
|
||||
atomic.AddInt32(&cl.State.outboundQty, -1)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -201,9 +210,9 @@ func (cl *Client) ParseConnect(lid string, pk packets.Packet) {
|
||||
cl.Properties.Clean = pk.Connect.Clean
|
||||
cl.Properties.Props = pk.Properties.Copy(false)
|
||||
|
||||
cl.State.Keepalive = pk.Connect.Keepalive // [MQTT-3.2.2-22]
|
||||
cl.State.Inflight.ResetReceiveQuota(int32(cl.ops.options.Capabilities.ReceiveMaximum)) // server receive max per client
|
||||
cl.State.Inflight.ResetSendQuota(int32(cl.Properties.Props.ReceiveMaximum)) // client receive max
|
||||
|
||||
cl.State.TopicAliases.Outbound = NewOutboundTopicAliases(cl.Properties.Props.TopicAliasMaximum)
|
||||
|
||||
cl.ID = pk.Connect.ClientIdentifier
|
||||
@@ -212,11 +221,6 @@ func (cl *Client) ParseConnect(lid string, pk packets.Packet) {
|
||||
cl.Properties.Props.AssignedClientID = cl.ID
|
||||
}
|
||||
|
||||
cl.State.keepalive = cl.ops.options.Capabilities.ServerKeepAlive
|
||||
if pk.Connect.Keepalive > 0 {
|
||||
cl.State.keepalive = pk.Connect.Keepalive // [MQTT-3.2.2-22]
|
||||
}
|
||||
|
||||
if pk.Connect.WillFlag {
|
||||
cl.Properties.Will = Will{
|
||||
Qos: pk.Connect.WillQos,
|
||||
@@ -234,8 +238,6 @@ func (cl *Client) ParseConnect(lid string, pk packets.Packet) {
|
||||
cl.Properties.Will.Flag = 1 // atomic for checking
|
||||
}
|
||||
}
|
||||
|
||||
cl.refreshDeadline(cl.State.keepalive)
|
||||
}
|
||||
|
||||
// refreshDeadline refreshes the read/write deadline for the net.Conn connection.
|
||||
@@ -330,11 +332,11 @@ func (cl *Client) Read(packetHandler ReadFn) error {
|
||||
var err error
|
||||
|
||||
for {
|
||||
if atomic.LoadUint32(&cl.State.done) == 1 {
|
||||
if cl.Closed() {
|
||||
return nil
|
||||
}
|
||||
|
||||
cl.refreshDeadline(cl.State.keepalive)
|
||||
cl.refreshDeadline(cl.State.Keepalive)
|
||||
fh := new(packets.FixedHeader)
|
||||
err = cl.ReadFixedHeader(fh)
|
||||
if err != nil {
|
||||
@@ -356,8 +358,6 @@ func (cl *Client) Read(packetHandler ReadFn) error {
|
||||
// Stop instructs the client to shut down all processing goroutines and disconnect.
|
||||
func (cl *Client) Stop(err error) {
|
||||
cl.State.endOnce.Do(func() {
|
||||
cl.Lock()
|
||||
defer cl.Unlock()
|
||||
|
||||
if cl.Net.Conn != nil {
|
||||
_ = cl.Net.Conn.Close() // omit close error
|
||||
@@ -367,11 +367,10 @@ func (cl *Client) Stop(err error) {
|
||||
cl.State.stopCause.Store(err)
|
||||
}
|
||||
|
||||
if cl.State.outbound != nil {
|
||||
close(cl.State.outbound)
|
||||
if cl.State.cancelOpen != nil {
|
||||
cl.State.cancelOpen()
|
||||
}
|
||||
|
||||
atomic.StoreUint32(&cl.State.done, 1)
|
||||
atomic.StoreInt64(&cl.State.disconnected, time.Now().Unix())
|
||||
})
|
||||
}
|
||||
@@ -386,7 +385,7 @@ func (cl *Client) StopCause() error {
|
||||
|
||||
// Closed returns true if client connection is closed.
|
||||
func (cl *Client) Closed() bool {
|
||||
return atomic.LoadUint32(&cl.State.done) == 1
|
||||
return cl.State.open == nil || cl.State.open.Err() != nil
|
||||
}
|
||||
|
||||
// ReadFixedHeader reads in the values of the next packet's fixed header.
|
||||
@@ -482,6 +481,14 @@ func (cl *Client) ReadPacket(fh *packets.FixedHeader) (pk packets.Packet, err er
|
||||
|
||||
// WritePacket encodes and writes a packet to the client.
|
||||
func (cl *Client) WritePacket(pk packets.Packet) error {
|
||||
if cl.Closed() {
|
||||
return ErrConnectionClosed
|
||||
}
|
||||
|
||||
if cl.Net.Conn == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if pk.Expiry > 0 {
|
||||
pk.Properties.MessageExpiryInterval = uint32(pk.Expiry - time.Now().Unix()) // [MQTT-3.3.2-6]
|
||||
}
|
||||
@@ -545,19 +552,12 @@ func (cl *Client) WritePacket(pk packets.Packet) error {
|
||||
return packets.ErrPacketTooLarge // [MQTT-3.1.2-24] [MQTT-3.1.2-25]
|
||||
}
|
||||
|
||||
cl.Lock()
|
||||
defer cl.Unlock()
|
||||
|
||||
if atomic.LoadUint32(&cl.State.done) == 1 {
|
||||
return ErrConnectionClosed
|
||||
}
|
||||
|
||||
if cl.Net.Conn == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
nb := net.Buffers{buf.Bytes()}
|
||||
n, err := nb.WriteTo(cl.Net.Conn)
|
||||
n, err := func() (int64, error) {
|
||||
cl.Lock()
|
||||
defer cl.Unlock()
|
||||
return nb.WriteTo(cl.Net.Conn)
|
||||
}()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@@ -5,6 +5,7 @@
|
||||
package mqtt
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"net"
|
||||
@@ -114,8 +115,8 @@ func TestClientsDelete(t *testing.T) {
|
||||
|
||||
func TestClientsGetByListener(t *testing.T) {
|
||||
cl := NewClients()
|
||||
cl.Add(&Client{ID: "t1", Net: ClientConnection{Listener: "tcp1"}})
|
||||
cl.Add(&Client{ID: "t2", Net: ClientConnection{Listener: "ws1"}})
|
||||
cl.Add(&Client{ID: "t1", State: ClientState{open: context.Background()}, Net: ClientConnection{Listener: "tcp1"}})
|
||||
cl.Add(&Client{ID: "t2", State: ClientState{open: context.Background()}, Net: ClientConnection{Listener: "ws1"}})
|
||||
require.Contains(t, cl.internal, "t1")
|
||||
require.Contains(t, cl.internal, "t2")
|
||||
|
||||
@@ -132,7 +133,7 @@ func TestNewClient(t *testing.T) {
|
||||
require.NotNil(t, cl.State.Inflight.internal)
|
||||
require.NotNil(t, cl.State.Subscriptions)
|
||||
require.NotNil(t, cl.State.TopicAliases)
|
||||
require.Equal(t, defaultKeepalive, cl.State.keepalive)
|
||||
require.Equal(t, defaultKeepalive, cl.State.Keepalive)
|
||||
require.Equal(t, defaultClientProtocolVersion, cl.Properties.ProtocolVersion)
|
||||
require.NotNil(t, cl.Net.Conn)
|
||||
require.NotNil(t, cl.Net.bconn)
|
||||
@@ -164,7 +165,7 @@ func TestClientParseConnect(t *testing.T) {
|
||||
|
||||
cl.ParseConnect("tcp1", pk)
|
||||
require.Equal(t, pk.Connect.ClientIdentifier, cl.ID)
|
||||
require.Equal(t, pk.Connect.Keepalive, cl.State.keepalive)
|
||||
require.Equal(t, pk.Connect.Keepalive, cl.State.Keepalive)
|
||||
require.Equal(t, pk.Connect.Clean, cl.Properties.Clean)
|
||||
require.Equal(t, pk.Connect.ClientIdentifier, cl.ID)
|
||||
require.Equal(t, pk.Connect.WillTopic, cl.Properties.Will.TopicName)
|
||||
@@ -466,7 +467,7 @@ func TestClientReadOK(t *testing.T) {
|
||||
func TestClientReadDone(t *testing.T) {
|
||||
cl, _, _ := newTestClient()
|
||||
defer cl.Stop(errClientStop)
|
||||
cl.State.done = 1
|
||||
cl.State.cancelOpen()
|
||||
|
||||
o := make(chan error)
|
||||
go func() {
|
||||
@@ -483,7 +484,7 @@ func TestClientStop(t *testing.T) {
|
||||
cl.Stop(nil)
|
||||
require.Equal(t, nil, cl.State.stopCause.Load())
|
||||
require.Equal(t, time.Now().Unix(), cl.State.disconnected)
|
||||
require.Equal(t, uint32(1), cl.State.done)
|
||||
require.True(t, cl.Closed())
|
||||
require.Equal(t, nil, cl.StopCause())
|
||||
}
|
||||
|
||||
|
@@ -30,14 +30,14 @@ func main() {
|
||||
l := server.Log.Level(zerolog.DebugLevel)
|
||||
server.Log = &l
|
||||
|
||||
err := server.AddHook(new(auth.AllowHook), nil)
|
||||
err := server.AddHook(new(debug.Hook), &debug.Options{
|
||||
// ShowPacketData: true,
|
||||
})
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
err = server.AddHook(new(debug.Hook), &debug.Options{
|
||||
// ShowPacketData: true,
|
||||
})
|
||||
err = server.AddHook(new(auth.AllowHook), nil)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
@@ -110,8 +110,9 @@ func (h *ExampleHook) Init(config any) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *ExampleHook) OnConnect(cl *mqtt.Client, pk packets.Packet) {
|
||||
func (h *ExampleHook) OnConnect(cl *mqtt.Client, pk packets.Packet) error {
|
||||
h.Log.Info().Str("client", cl.ID).Msgf("client connected")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *ExampleHook) OnDisconnect(cl *mqtt.Client, err error, expire bool) {
|
||||
|
@@ -26,7 +26,6 @@ func main() {
|
||||
}()
|
||||
|
||||
server := mqtt.New(nil)
|
||||
server.Options.Capabilities.ServerKeepAlive = 60
|
||||
server.Options.Capabilities.Compatibilities.ObscureNotAuthorized = true
|
||||
server.Options.Capabilities.Compatibilities.PassiveClientDisconnect = true
|
||||
|
||||
@@ -61,6 +60,7 @@ func (h *pahoAuthHook) ID() string {
|
||||
func (h *pahoAuthHook) Provides(b byte) bool {
|
||||
return bytes.Contains([]byte{
|
||||
mqtt.OnConnectAuthenticate,
|
||||
mqtt.OnConnect,
|
||||
mqtt.OnACLCheck,
|
||||
}, []byte{b})
|
||||
}
|
||||
@@ -72,3 +72,12 @@ func (h *pahoAuthHook) OnConnectAuthenticate(cl *mqtt.Client, pk packets.Packet)
|
||||
func (h *pahoAuthHook) OnACLCheck(cl *mqtt.Client, topic string, write bool) bool {
|
||||
return topic != "test/nosubscribe"
|
||||
}
|
||||
|
||||
func (h *pahoAuthHook) OnConnect(cl *mqtt.Client, pk packets.Packet) error {
|
||||
// Handle paho test_server_keep_alive
|
||||
if pk.Connect.Keepalive == 120 && pk.Connect.Clean {
|
||||
cl.State.Keepalive = 60
|
||||
cl.State.ServerKeepalive = true
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@@ -97,7 +97,7 @@ func main() {
|
||||
|
||||
stats := listeners.NewHTTPStats("stats", ":8080", &listeners.Config{
|
||||
TLSConfig: tlsConfig,
|
||||
}, nil)
|
||||
}, server.Info)
|
||||
err = server.AddListener(stats)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
|
59
hooks.go
59
hooks.go
@@ -25,6 +25,7 @@ const (
|
||||
OnConnectAuthenticate
|
||||
OnACLCheck
|
||||
OnConnect
|
||||
OnSessionEstablish
|
||||
OnSessionEstablished
|
||||
OnDisconnect
|
||||
OnAuthPacket
|
||||
@@ -41,6 +42,7 @@ const (
|
||||
OnPublished
|
||||
OnPublishDropped
|
||||
OnRetainMessage
|
||||
OnRetainPublished
|
||||
OnQosPublish
|
||||
OnQosComplete
|
||||
OnQosDropped
|
||||
@@ -74,7 +76,8 @@ type Hook interface {
|
||||
OnConnectAuthenticate(cl *Client, pk packets.Packet) bool
|
||||
OnACLCheck(cl *Client, topic string, write bool) bool
|
||||
OnSysInfoTick(*system.Info)
|
||||
OnConnect(cl *Client, pk packets.Packet)
|
||||
OnConnect(cl *Client, pk packets.Packet) error
|
||||
OnSessionEstablish(cl *Client, pk packets.Packet)
|
||||
OnSessionEstablished(cl *Client, pk packets.Packet)
|
||||
OnDisconnect(cl *Client, err error, expire bool)
|
||||
OnAuthPacket(cl *Client, pk packets.Packet) (packets.Packet, error)
|
||||
@@ -91,6 +94,7 @@ type Hook interface {
|
||||
OnPublished(cl *Client, pk packets.Packet)
|
||||
OnPublishDropped(cl *Client, pk packets.Packet)
|
||||
OnRetainMessage(cl *Client, pk packets.Packet, r int64)
|
||||
OnRetainPublished(cl *Client, pk packets.Packet)
|
||||
OnQosPublish(cl *Client, pk packets.Packet, sent int64, resends int)
|
||||
OnQosComplete(cl *Client, pk packets.Packet)
|
||||
OnQosDropped(cl *Client, pk packets.Packet)
|
||||
@@ -214,11 +218,25 @@ func (h *Hooks) OnStopped() {
|
||||
}
|
||||
}
|
||||
|
||||
// OnConnect is called when a new client connects.
|
||||
func (h *Hooks) OnConnect(cl *Client, pk packets.Packet) {
|
||||
// OnConnect is called when a new client connects, and may return a packets.Code as an error to halt the connection.
|
||||
func (h *Hooks) OnConnect(cl *Client, pk packets.Packet) error {
|
||||
for _, hook := range h.GetAll() {
|
||||
if hook.Provides(OnConnect) {
|
||||
hook.OnConnect(cl, pk)
|
||||
err := hook.OnConnect(cl, pk)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// OnSessionEstablish is called right after a new client connects and authenticates and right before
|
||||
// the session is established and CONNACK is sent.
|
||||
func (h *Hooks) OnSessionEstablish(cl *Client, pk packets.Packet) {
|
||||
for _, hook := range h.GetAll() {
|
||||
if hook.Provides(OnSessionEstablish) {
|
||||
hook.OnSessionEstablish(cl, pk)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -374,13 +392,14 @@ func (h *Hooks) OnPublish(cl *Client, pk packets.Packet) (pkx packets.Packet, er
|
||||
for _, hook := range h.GetAll() {
|
||||
if hook.Provides(OnPublish) {
|
||||
npk, err := hook.OnPublish(cl, pkx)
|
||||
if err != nil && errors.Is(err, packets.ErrRejectPacket) {
|
||||
h.Log.Debug().Err(err).Str("hook", hook.ID()).Interface("packet", pkx).Msg("publish packet rejected")
|
||||
if err != nil {
|
||||
if errors.Is(err, packets.ErrRejectPacket) {
|
||||
h.Log.Debug().Err(err).Str("hook", hook.ID()).Interface("packet", pkx).Msg("publish packet rejected")
|
||||
return pk, err
|
||||
}
|
||||
h.Log.Error().Err(err).Str("hook", hook.ID()).Interface("packet", pkx).Msg("publish packet error")
|
||||
return pk, err
|
||||
} else if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
pkx = npk
|
||||
}
|
||||
}
|
||||
@@ -416,6 +435,15 @@ func (h *Hooks) OnRetainMessage(cl *Client, pk packets.Packet, r int64) {
|
||||
}
|
||||
}
|
||||
|
||||
// OnRetainPublished is called when a retained message is published.
|
||||
func (h *Hooks) OnRetainPublished(cl *Client, pk packets.Packet) {
|
||||
for _, hook := range h.GetAll() {
|
||||
if hook.Provides(OnRetainPublished) {
|
||||
hook.OnRetainPublished(cl, pk)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// OnQosPublish is called when a publish packet with Qos >= 1 is issued to a subscriber.
|
||||
// In other words, this method is called when a new inflight message is created or resent.
|
||||
// It is typically used to store a new inflight message.
|
||||
@@ -668,7 +696,7 @@ func (h *HookBase) SetOpts(l *zerolog.Logger, opts *HookOptions) {
|
||||
h.Opts = opts
|
||||
}
|
||||
|
||||
// Stop is called to gracefully shutdown the hook.
|
||||
// Stop is called to gracefully shut down the hook.
|
||||
func (h *HookBase) Stop() error {
|
||||
return nil
|
||||
}
|
||||
@@ -693,7 +721,13 @@ func (h *HookBase) OnACLCheck(cl *Client, topic string, write bool) bool {
|
||||
}
|
||||
|
||||
// OnConnect is called when a new client connects.
|
||||
func (h *HookBase) OnConnect(cl *Client, pk packets.Packet) {}
|
||||
func (h *HookBase) OnConnect(cl *Client, pk packets.Packet) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// OnSessionEstablish is called right after a new client connects and authenticates and right before
|
||||
// the session is established and CONNACK is sent.
|
||||
func (h *HookBase) OnSessionEstablish(cl *Client, pk packets.Packet) {}
|
||||
|
||||
// OnSessionEstablished is called when a new client establishes a session (after OnConnect).
|
||||
func (h *HookBase) OnSessionEstablished(cl *Client, pk packets.Packet) {}
|
||||
@@ -757,6 +791,9 @@ func (h *HookBase) OnPublishDropped(cl *Client, pk packets.Packet) {}
|
||||
// OnRetainMessage is called then a published message is retained.
|
||||
func (h *HookBase) OnRetainMessage(cl *Client, pk packets.Packet, r int64) {}
|
||||
|
||||
// OnRetainPublished is called when a retained message is published.
|
||||
func (h *HookBase) OnRetainPublished(cl *Client, pk packets.Packet) {}
|
||||
|
||||
// OnQosPublish is called when a publish packet with Qos > 1 is issued to a subscriber.
|
||||
func (h *HookBase) OnQosPublish(cl *Client, pk packets.Packet, sent int64, resends int) {}
|
||||
|
||||
|
@@ -50,6 +50,14 @@ func (h *modifiedHookBase) Stop() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *modifiedHookBase) OnConnect(cl *Client, pk packets.Packet) error {
|
||||
if h.fail {
|
||||
return errTestHook
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *modifiedHookBase) OnConnectAuthenticate(cl *Client, pk packets.Packet) bool {
|
||||
return true
|
||||
}
|
||||
@@ -228,7 +236,7 @@ func TestHooksNonReturns(t *testing.T) {
|
||||
h.OnStarted()
|
||||
h.OnStopped()
|
||||
h.OnSysInfoTick(new(system.Info))
|
||||
h.OnConnect(cl, packets.Packet{})
|
||||
h.OnSessionEstablish(cl, packets.Packet{})
|
||||
h.OnSessionEstablished(cl, packets.Packet{})
|
||||
h.OnDisconnect(cl, nil, false)
|
||||
h.OnPacketSent(cl, packets.Packet{}, []byte{})
|
||||
@@ -238,6 +246,7 @@ func TestHooksNonReturns(t *testing.T) {
|
||||
h.OnPublished(cl, packets.Packet{})
|
||||
h.OnPublishDropped(cl, packets.Packet{})
|
||||
h.OnRetainMessage(cl, packets.Packet{}, 0)
|
||||
h.OnRetainPublished(cl, packets.Packet{})
|
||||
h.OnQosPublish(cl, packets.Packet{}, time.Now().Unix(), 0)
|
||||
h.OnQosComplete(cl, packets.Packet{})
|
||||
h.OnQosDropped(cl, packets.Packet{})
|
||||
@@ -338,7 +347,7 @@ func TestHooksOnPublish(t *testing.T) {
|
||||
// coverage: failure
|
||||
hook.fail = true
|
||||
pk, err = h.OnPublish(new(Client), packets.Packet{PacketID: 10})
|
||||
require.NoError(t, err)
|
||||
require.Error(t, err)
|
||||
require.Equal(t, uint16(10), pk.PacketID)
|
||||
|
||||
// coverage: reject packet
|
||||
@@ -393,6 +402,22 @@ func TestHooksOnAuthPacket(t *testing.T) {
|
||||
require.Equal(t, uint16(10), pk.PacketID)
|
||||
}
|
||||
|
||||
func TestHooksOnConnect(t *testing.T) {
|
||||
h := new(Hooks)
|
||||
h.Log = &logger
|
||||
|
||||
hook := new(modifiedHookBase)
|
||||
err := h.Add(hook, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = h.OnConnect(new(Client), packets.Packet{PacketID: 10})
|
||||
require.NoError(t, err)
|
||||
|
||||
hook.fail = true
|
||||
err = h.OnConnect(new(Client), packets.Packet{PacketID: 10})
|
||||
require.Error(t, err)
|
||||
}
|
||||
|
||||
func TestHooksOnPacketEncode(t *testing.T) {
|
||||
h := new(Hooks)
|
||||
h.Log = &logger
|
||||
@@ -565,12 +590,19 @@ func TestHookBaseOnConnectAuthenticate(t *testing.T) {
|
||||
v := h.OnConnectAuthenticate(new(Client), packets.Packet{})
|
||||
require.False(t, v)
|
||||
}
|
||||
|
||||
func TestHookBaseOnACLCheck(t *testing.T) {
|
||||
h := new(HookBase)
|
||||
v := h.OnACLCheck(new(Client), "topic", true)
|
||||
require.False(t, v)
|
||||
}
|
||||
|
||||
func TestHookBaseOnConnect(t *testing.T) {
|
||||
h := new(HookBase)
|
||||
err := h.OnConnect(new(Client), packets.Packet{})
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestHookBaseOnPublish(t *testing.T) {
|
||||
h := new(HookBase)
|
||||
pk, err := h.OnPublish(new(Client), packets.Packet{PacketID: 10})
|
||||
|
104
listeners/http_healthcheck.go
Normal file
104
listeners/http_healthcheck.go
Normal file
@@ -0,0 +1,104 @@
|
||||
// SPDX-License-Identifier: MIT
|
||||
// SPDX-FileCopyrightText: 2023 mochi-co
|
||||
// SPDX-FileContributor: Derek Duncan
|
||||
|
||||
package listeners
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/rs/zerolog"
|
||||
)
|
||||
|
||||
// HTTPHealthCheck is a listener for providing an HTTP healthcheck endpoint.
|
||||
type HTTPHealthCheck struct {
|
||||
sync.RWMutex
|
||||
id string // the internal id of the listener
|
||||
address string // the network address to bind to
|
||||
config *Config // configuration values for the listener
|
||||
listen *http.Server // the http server
|
||||
log *zerolog.Logger // server logger
|
||||
end uint32 // ensure the close methods are only called once
|
||||
}
|
||||
|
||||
// NewHTTPHealthCheck initialises and returns a new HTTP listener, listening on an address.
|
||||
func NewHTTPHealthCheck(id, address string, config *Config) *HTTPHealthCheck {
|
||||
if config == nil {
|
||||
config = new(Config)
|
||||
}
|
||||
return &HTTPHealthCheck{
|
||||
id: id,
|
||||
address: address,
|
||||
config: config,
|
||||
}
|
||||
}
|
||||
|
||||
// ID returns the id of the listener.
|
||||
func (l *HTTPHealthCheck) ID() string {
|
||||
return l.id
|
||||
}
|
||||
|
||||
// Address returns the address of the listener.
|
||||
func (l *HTTPHealthCheck) Address() string {
|
||||
return l.address
|
||||
}
|
||||
|
||||
// Protocol returns the address of the listener.
|
||||
func (l *HTTPHealthCheck) Protocol() string {
|
||||
if l.listen != nil && l.listen.TLSConfig != nil {
|
||||
return "https"
|
||||
}
|
||||
|
||||
return "http"
|
||||
}
|
||||
|
||||
// Init initializes the listener.
|
||||
func (l *HTTPHealthCheck) Init(log *zerolog.Logger) error {
|
||||
l.log = log
|
||||
|
||||
mux := http.NewServeMux()
|
||||
mux.HandleFunc("/healthcheck", func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodGet {
|
||||
w.WriteHeader(http.StatusMethodNotAllowed)
|
||||
}
|
||||
})
|
||||
l.listen = &http.Server{
|
||||
ReadTimeout: 5 * time.Second,
|
||||
WriteTimeout: 5 * time.Second,
|
||||
Addr: l.address,
|
||||
Handler: mux,
|
||||
}
|
||||
|
||||
if l.config.TLSConfig != nil {
|
||||
l.listen.TLSConfig = l.config.TLSConfig
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Serve starts listening for new connections and serving responses.
|
||||
func (l *HTTPHealthCheck) Serve(establish EstablishFn) {
|
||||
if l.listen.TLSConfig != nil {
|
||||
l.listen.ListenAndServeTLS("", "")
|
||||
} else {
|
||||
l.listen.ListenAndServe()
|
||||
}
|
||||
}
|
||||
|
||||
// Close closes the listener and any client connections.
|
||||
func (l *HTTPHealthCheck) Close(closeClients CloseFn) {
|
||||
l.Lock()
|
||||
defer l.Unlock()
|
||||
|
||||
if atomic.CompareAndSwapUint32(&l.end, 0, 1) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
l.listen.Shutdown(ctx)
|
||||
}
|
||||
|
||||
closeClients(l.id)
|
||||
}
|
143
listeners/http_healthcheck_test.go
Normal file
143
listeners/http_healthcheck_test.go
Normal file
@@ -0,0 +1,143 @@
|
||||
// SPDX-License-Identifier: MIT
|
||||
// SPDX-FileCopyrightText: 2023 mochi-co
|
||||
// SPDX-FileContributor: Derek Duncan
|
||||
|
||||
package listeners
|
||||
|
||||
import (
|
||||
"io"
|
||||
"net/http"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestNewHTTPHealthCheck(t *testing.T) {
|
||||
l := NewHTTPHealthCheck("healthcheck", testAddr, nil)
|
||||
require.Equal(t, "healthcheck", l.id)
|
||||
require.Equal(t, testAddr, l.address)
|
||||
}
|
||||
|
||||
func TestHTTPHealthCheckID(t *testing.T) {
|
||||
l := NewHTTPHealthCheck("healthcheck", testAddr, nil)
|
||||
require.Equal(t, "healthcheck", l.ID())
|
||||
}
|
||||
|
||||
func TestHTTPHealthCheckAddress(t *testing.T) {
|
||||
l := NewHTTPHealthCheck("healthcheck", testAddr, nil)
|
||||
require.Equal(t, testAddr, l.Address())
|
||||
}
|
||||
|
||||
func TestHTTPHealthCheckProtocol(t *testing.T) {
|
||||
l := NewHTTPHealthCheck("healthcheck", testAddr, nil)
|
||||
require.Equal(t, "http", l.Protocol())
|
||||
}
|
||||
|
||||
func TestHTTPHealthCheckTLSProtocol(t *testing.T) {
|
||||
l := NewHTTPHealthCheck("healthcheck", testAddr, &Config{
|
||||
TLSConfig: tlsConfigBasic,
|
||||
})
|
||||
|
||||
l.Init(nil)
|
||||
require.Equal(t, "https", l.Protocol())
|
||||
}
|
||||
|
||||
func TestHTTPHealthCheckInit(t *testing.T) {
|
||||
l := NewHTTPHealthCheck("healthcheck", testAddr, nil)
|
||||
err := l.Init(nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NotNil(t, l.listen)
|
||||
require.Equal(t, testAddr, l.listen.Addr)
|
||||
}
|
||||
|
||||
func TestHTTPHealthCheckServeAndClose(t *testing.T) {
|
||||
// setup http stats listener
|
||||
l := NewHTTPHealthCheck("healthcheck", testAddr, nil)
|
||||
err := l.Init(nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
o := make(chan bool)
|
||||
go func(o chan bool) {
|
||||
l.Serve(MockEstablisher)
|
||||
o <- true
|
||||
}(o)
|
||||
|
||||
time.Sleep(time.Millisecond)
|
||||
|
||||
// call healthcheck
|
||||
resp, err := http.Get("http://localhost" + testAddr + "/healthcheck")
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, resp)
|
||||
|
||||
defer resp.Body.Close()
|
||||
_, err = io.ReadAll(resp.Body)
|
||||
require.NoError(t, err)
|
||||
|
||||
// ensure listening is closed
|
||||
var closed bool
|
||||
l.Close(func(id string) {
|
||||
closed = true
|
||||
})
|
||||
|
||||
require.Equal(t, true, closed)
|
||||
|
||||
_, err = http.Get("http://localhost/healthcheck" + testAddr + "/healthcheck")
|
||||
require.Error(t, err)
|
||||
<-o
|
||||
}
|
||||
|
||||
func TestHTTPHealthCheckServeAndCloseMethodNotAllowed(t *testing.T) {
|
||||
// setup http stats listener
|
||||
l := NewHTTPHealthCheck("healthcheck", testAddr, nil)
|
||||
err := l.Init(nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
o := make(chan bool)
|
||||
go func(o chan bool) {
|
||||
l.Serve(MockEstablisher)
|
||||
o <- true
|
||||
}(o)
|
||||
|
||||
time.Sleep(time.Millisecond)
|
||||
|
||||
// make disallowed method type http request
|
||||
resp, err := http.Post("http://localhost"+testAddr+"/healthcheck", "application/json", http.NoBody)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, resp)
|
||||
|
||||
defer resp.Body.Close()
|
||||
_, err = io.ReadAll(resp.Body)
|
||||
require.NoError(t, err)
|
||||
|
||||
// ensure listening is closed
|
||||
var closed bool
|
||||
l.Close(func(id string) {
|
||||
closed = true
|
||||
})
|
||||
|
||||
require.Equal(t, true, closed)
|
||||
|
||||
_, err = http.Post("http://localhost/healthcheck"+testAddr+"/healthcheck", "application/json", http.NoBody)
|
||||
require.Error(t, err)
|
||||
<-o
|
||||
}
|
||||
|
||||
func TestHTTPHealthCheckServeTLSAndClose(t *testing.T) {
|
||||
l := NewHTTPHealthCheck("healthcheck", testAddr, &Config{
|
||||
TLSConfig: tlsConfigBasic,
|
||||
})
|
||||
|
||||
err := l.Init(nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
o := make(chan bool)
|
||||
go func(o chan bool) {
|
||||
l.Serve(MockEstablisher)
|
||||
o <- true
|
||||
}(o)
|
||||
|
||||
time.Sleep(time.Millisecond)
|
||||
l.Close(MockCloser)
|
||||
}
|
@@ -1,3 +1,7 @@
|
||||
// SPDX-License-Identifier: MIT
|
||||
// SPDX-FileCopyrightText: 2023 mochi-co
|
||||
// SPDX-FileContributor: Jeroen Rinzema
|
||||
|
||||
package listeners
|
||||
|
||||
import (
|
||||
|
@@ -82,6 +82,7 @@ const (
|
||||
TConnackAcceptedAdjustedExpiryInterval
|
||||
TConnackMinMqtt5
|
||||
TConnackMinCleanMqtt5
|
||||
TConnackServerKeepalive
|
||||
TConnackInvalidMinMqtt5
|
||||
TConnackBadProtocolVersion
|
||||
TConnackProtocolViolationNoSession
|
||||
@@ -1085,25 +1086,22 @@ var TPacketData = map[byte]TPacketCases{
|
||||
Desc: "accepted, no session, adjusted expiry interval mqtt5",
|
||||
Primary: true,
|
||||
RawBytes: []byte{
|
||||
Connack << 4, 11, // fixed header
|
||||
Connack << 4, 8, // fixed header
|
||||
0, // Session present
|
||||
CodeSuccess.Code,
|
||||
8, // length
|
||||
5, // length
|
||||
17, 0, 0, 0, 120, // Session Expiry Interval (17)
|
||||
19, 0, 10, // Server Keep Alive (19)
|
||||
},
|
||||
Packet: &Packet{
|
||||
ProtocolVersion: 5,
|
||||
FixedHeader: FixedHeader{
|
||||
Type: Connack,
|
||||
Remaining: 11,
|
||||
Remaining: 8,
|
||||
},
|
||||
ReasonCode: CodeSuccess.Code,
|
||||
Properties: Properties{
|
||||
SessionExpiryInterval: uint32(120),
|
||||
SessionExpiryIntervalFlag: true,
|
||||
ServerKeepAlive: uint16(10),
|
||||
ServerKeepAliveFlag: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
@@ -1190,28 +1188,25 @@ var TPacketData = map[byte]TPacketCases{
|
||||
Desc: "accepted min properties mqtt5",
|
||||
Primary: true,
|
||||
RawBytes: []byte{
|
||||
Connack << 4, 16, // fixed header
|
||||
Connack << 4, 13, // fixed header
|
||||
1, // existing session
|
||||
CodeSuccess.Code,
|
||||
13, // Properties length
|
||||
10, // Properties length
|
||||
18, 0, 5, 'm', 'o', 'c', 'h', 'i', // Assigned Client ID (18)
|
||||
19, 0, 20, // Server Keep Alive (19)
|
||||
36, 1, // Maximum Qos (36)
|
||||
},
|
||||
Packet: &Packet{
|
||||
ProtocolVersion: 5,
|
||||
FixedHeader: FixedHeader{
|
||||
Type: Connack,
|
||||
Remaining: 16,
|
||||
Remaining: 13,
|
||||
},
|
||||
SessionPresent: true,
|
||||
ReasonCode: CodeSuccess.Code,
|
||||
Properties: Properties{
|
||||
ServerKeepAlive: uint16(20),
|
||||
ServerKeepAliveFlag: true,
|
||||
AssignedClientID: "mochi",
|
||||
MaximumQos: byte(1),
|
||||
MaximumQosFlag: true,
|
||||
AssignedClientID: "mochi",
|
||||
MaximumQos: byte(1),
|
||||
MaximumQosFlag: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
@@ -1220,11 +1215,10 @@ var TPacketData = map[byte]TPacketCases{
|
||||
Desc: "accepted min properties mqtt5b",
|
||||
Primary: true,
|
||||
RawBytes: []byte{
|
||||
Connack << 4, 6, // fixed header
|
||||
Connack << 4, 3, // fixed header
|
||||
0, // existing session
|
||||
CodeSuccess.Code,
|
||||
3, // Properties length
|
||||
19, 0, 10, // server keepalive
|
||||
0, // Properties length
|
||||
},
|
||||
Packet: &Packet{
|
||||
ProtocolVersion: 5,
|
||||
@@ -1234,6 +1228,27 @@ var TPacketData = map[byte]TPacketCases{
|
||||
},
|
||||
SessionPresent: false,
|
||||
ReasonCode: CodeSuccess.Code,
|
||||
},
|
||||
},
|
||||
{
|
||||
Case: TConnackServerKeepalive,
|
||||
Desc: "server set keepalive",
|
||||
Primary: true,
|
||||
RawBytes: []byte{
|
||||
Connack << 4, 6, // fixed header
|
||||
1, // existing session
|
||||
CodeSuccess.Code,
|
||||
3, // Properties length
|
||||
19, 0, 10, // server keepalive
|
||||
},
|
||||
Packet: &Packet{
|
||||
ProtocolVersion: 5,
|
||||
FixedHeader: FixedHeader{
|
||||
Type: Connack,
|
||||
Remaining: 6,
|
||||
},
|
||||
SessionPresent: true,
|
||||
ReasonCode: CodeSuccess.Code,
|
||||
Properties: Properties{
|
||||
ServerKeepAlive: uint16(10),
|
||||
ServerKeepAliveFlag: true,
|
||||
@@ -1245,26 +1260,23 @@ var TPacketData = map[byte]TPacketCases{
|
||||
Desc: "failure min properties mqtt5",
|
||||
Primary: true,
|
||||
RawBytes: append([]byte{
|
||||
Connack << 4, 26, // fixed header
|
||||
Connack << 4, 23, // fixed header
|
||||
0, // No existing session
|
||||
ErrUnspecifiedError.Code,
|
||||
// Properties
|
||||
23, // length
|
||||
19, 0, 20, // Server Keep Alive (19)
|
||||
20, // length
|
||||
31, 0, 17, // Reason String (31)
|
||||
}, []byte(ErrUnspecifiedError.Reason)...),
|
||||
Packet: &Packet{
|
||||
ProtocolVersion: 5,
|
||||
FixedHeader: FixedHeader{
|
||||
Type: Connack,
|
||||
Remaining: 25,
|
||||
Remaining: 23,
|
||||
},
|
||||
SessionPresent: false,
|
||||
ReasonCode: ErrUnspecifiedError.Code,
|
||||
Properties: Properties{
|
||||
ServerKeepAlive: uint16(20),
|
||||
ServerKeepAliveFlag: true,
|
||||
ReasonString: ErrUnspecifiedError.Reason,
|
||||
ReasonString: ErrUnspecifiedError.Reason,
|
||||
},
|
||||
},
|
||||
},
|
||||
|
69
server.go
69
server.go
@@ -26,8 +26,8 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
Version = "2.2.8" // the current server version.
|
||||
defaultSysTopicInterval int64 = 1 // the interval between $SYS topic publishes
|
||||
Version = "2.2.15" // the current server version.
|
||||
defaultSysTopicInterval int64 = 1 // the interval between $SYS topic publishes
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -43,7 +43,6 @@ var (
|
||||
WildcardSubAvailable: 1, // wildcard subscriptions are available
|
||||
SubIDAvailable: 1, // subscription identifiers are available
|
||||
SharedSubAvailable: 1, // shared subscriptions are available
|
||||
ServerKeepAlive: 10, // default keepalive for clients
|
||||
MinimumProtocolVersion: 3, // minimum supported mqtt version (3.0.0)
|
||||
MaximumClientWritesPending: 1024 * 8, // maximum number of pending message writes for a client
|
||||
}
|
||||
@@ -61,7 +60,6 @@ type Capabilities struct {
|
||||
maximumPacketID uint32 // unexported, used for testing only
|
||||
ReceiveMaximum uint16
|
||||
TopicAliasMaximum uint16
|
||||
ServerKeepAlive uint16
|
||||
SharedSubAvailable byte
|
||||
MinimumProtocolVersion byte
|
||||
Compatibilities Compatibilities
|
||||
@@ -215,7 +213,7 @@ func (s *Server) NewClient(c net.Conn, listener string, id string, inline bool)
|
||||
|
||||
if inline { // inline clients bypass acl and some validity checks.
|
||||
cl.Net.Inline = true
|
||||
// By default we don't want to restrict developer publishes,
|
||||
// By default, we don't want to restrict developer publishes,
|
||||
// but if you do, reset this after creating inline client.
|
||||
cl.State.Inflight.ResetReceiveQuota(math.MaxInt32)
|
||||
} else {
|
||||
@@ -324,16 +322,20 @@ func (s *Server) attachClient(cl *Client, listener string) error {
|
||||
cl.ParseConnect(listener, pk)
|
||||
code := s.validateConnect(cl, pk) // [MQTT-3.1.4-1] [MQTT-3.1.4-2]
|
||||
if code != packets.CodeSuccess {
|
||||
if err := s.sendConnack(cl, code, false); err != nil {
|
||||
if err := s.SendConnack(cl, code, false, nil); err != nil {
|
||||
return fmt.Errorf("invalid connection send ack: %w", err)
|
||||
}
|
||||
return code // [MQTT-3.2.2-7] [MQTT-3.1.4-6]
|
||||
}
|
||||
|
||||
s.hooks.OnConnect(cl, pk)
|
||||
err = s.hooks.OnConnect(cl, pk)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cl.refreshDeadline(cl.State.Keepalive)
|
||||
if !s.hooks.OnConnectAuthenticate(cl, pk) { // [MQTT-3.1.4-2]
|
||||
err := s.sendConnack(cl, packets.ErrBadUsernameOrPassword, false)
|
||||
err := s.SendConnack(cl, packets.ErrBadUsernameOrPassword, false, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid connection send ack: %w", err)
|
||||
}
|
||||
@@ -344,10 +346,12 @@ func (s *Server) attachClient(cl *Client, listener string) error {
|
||||
atomic.AddInt64(&s.Info.ClientsConnected, 1)
|
||||
defer atomic.AddInt64(&s.Info.ClientsConnected, -1)
|
||||
|
||||
s.hooks.OnSessionEstablish(cl, pk)
|
||||
|
||||
sessionPresent := s.inheritClientSession(pk, cl)
|
||||
s.Clients.Add(cl) // [MQTT-4.1.0-1]
|
||||
|
||||
err = s.sendConnack(cl, code, sessionPresent) // [MQTT-3.1.4-5] [MQTT-3.2.0-1] [MQTT-3.2.0-2] &[MQTT-3.14.0-1]
|
||||
err = s.SendConnack(cl, code, sessionPresent, nil) // [MQTT-3.1.4-5] [MQTT-3.2.0-1] [MQTT-3.2.0-2] &[MQTT-3.14.0-1]
|
||||
if err != nil {
|
||||
return fmt.Errorf("ack connection packet: %w", err)
|
||||
}
|
||||
@@ -495,12 +499,18 @@ func (s *Server) inheritClientSession(pk packets.Packet, cl *Client) bool {
|
||||
return false // [MQTT-3.2.2-2]
|
||||
}
|
||||
|
||||
// sendConnack returns a Connack packet to a client.
|
||||
func (s *Server) sendConnack(cl *Client, reason packets.Code, present bool) error {
|
||||
properties := packets.Properties{
|
||||
ServerKeepAlive: s.Options.Capabilities.ServerKeepAlive, // [MQTT-3.1.2-21]
|
||||
ServerKeepAliveFlag: true,
|
||||
ReceiveMaximum: s.Options.Capabilities.ReceiveMaximum, // 3.2.2.3.3 Receive Maximum
|
||||
// SendConnack returns a Connack packet to a client.
|
||||
func (s *Server) SendConnack(cl *Client, reason packets.Code, present bool, properties *packets.Properties) error {
|
||||
if properties == nil {
|
||||
properties = &packets.Properties{
|
||||
ReceiveMaximum: s.Options.Capabilities.ReceiveMaximum,
|
||||
}
|
||||
}
|
||||
|
||||
properties.ReceiveMaximum = s.Options.Capabilities.ReceiveMaximum // 3.2.2.3.3 Receive Maximum
|
||||
if cl.State.ServerKeepalive { // You can set this dynamically using the OnConnect hook.
|
||||
properties.ServerKeepAlive = cl.State.Keepalive // [MQTT-3.1.2-21]
|
||||
properties.ServerKeepAliveFlag = true
|
||||
}
|
||||
|
||||
if reason.Code >= packets.ErrUnspecifiedError.Code {
|
||||
@@ -517,9 +527,8 @@ func (s *Server) sendConnack(cl *Client, reason packets.Code, present bool) erro
|
||||
},
|
||||
SessionPresent: false, // [MQTT-3.2.2-6]
|
||||
ReasonCode: reason.Code, // [MQTT-3.2.2-8]
|
||||
Properties: properties,
|
||||
Properties: *properties,
|
||||
}
|
||||
|
||||
return cl.WritePacket(ack)
|
||||
}
|
||||
|
||||
@@ -539,14 +548,15 @@ func (s *Server) sendConnack(cl *Client, reason packets.Code, present bool) erro
|
||||
cl.Properties.Props.SessionExpiryIntervalFlag = true
|
||||
}
|
||||
|
||||
return cl.WritePacket(packets.Packet{
|
||||
ack := packets.Packet{
|
||||
FixedHeader: packets.FixedHeader{
|
||||
Type: packets.Connack,
|
||||
},
|
||||
SessionPresent: present,
|
||||
ReasonCode: reason.Code, // [MQTT-3.2.2-8]
|
||||
Properties: properties,
|
||||
})
|
||||
Properties: *properties,
|
||||
}
|
||||
return cl.WritePacket(ack)
|
||||
}
|
||||
|
||||
// processPacket processes an inbound packet for a client. Since the method is
|
||||
@@ -752,7 +762,7 @@ func (s *Server) processPublish(cl *Client, pk packets.Packet) error {
|
||||
}
|
||||
|
||||
// retainMessage adds a message to a topic, and if a persistent store is provided,
|
||||
// adds the message to the store so it can be reloaded if necessary.
|
||||
// adds the message to the store to be reloaded if necessary.
|
||||
func (s *Server) retainMessage(cl *Client, pk packets.Packet) {
|
||||
out := pk.Copy(false)
|
||||
r := s.Topics.RetainMessage(out)
|
||||
@@ -847,7 +857,7 @@ func (s *Server) publishToClient(cl *Client, sub packets.Subscription, pk packet
|
||||
}
|
||||
}
|
||||
|
||||
if cl.Net.Conn == nil || atomic.LoadUint32(&cl.State.done) == 1 {
|
||||
if cl.Net.Conn == nil || cl.Closed() {
|
||||
return out, packets.CodeDisconnect
|
||||
}
|
||||
|
||||
@@ -878,11 +888,13 @@ func (s *Server) publishRetainedToClient(cl *Client, sub packets.Subscription, e
|
||||
_, err := s.publishToClient(cl, sub, pkv)
|
||||
if err != nil {
|
||||
s.Log.Debug().Err(err).Str("client", cl.ID).Str("listener", cl.Net.Listener).Interface("packet", pkv).Msg("failed to publish retained message")
|
||||
continue
|
||||
}
|
||||
s.hooks.OnRetainPublished(cl, pkv)
|
||||
}
|
||||
}
|
||||
|
||||
// buildAck builds an standardised ack message for Puback, Pubrec, Pubrel, Pubcomp packets.
|
||||
// buildAck builds a standardised ack message for Puback, Pubrec, Pubrel, Pubcomp packets.
|
||||
func (s *Server) buildAck(packetID uint16, pkt, qos byte, properties packets.Properties, reason packets.Code) packets.Packet {
|
||||
properties = packets.Properties{} // PRL
|
||||
if reason.Code >= packets.ErrUnspecifiedError.Code {
|
||||
@@ -1027,7 +1039,7 @@ func (s *Server) processSubscribe(cl *Client, pk packets.Packet) error {
|
||||
}
|
||||
}
|
||||
|
||||
ack := packets.Packet{ //[MQTT-3.8.4-1] [MQTT-3.8.4-5]
|
||||
ack := packets.Packet{ // [MQTT-3.8.4-1] [MQTT-3.8.4-5]
|
||||
FixedHeader: packets.FixedHeader{
|
||||
Type: packets.Suback,
|
||||
},
|
||||
@@ -1234,7 +1246,7 @@ func (s *Server) publishSysTopics() {
|
||||
s.hooks.OnSysInfoTick(s.Info)
|
||||
}
|
||||
|
||||
// Close attempts to gracefully shutdown the server, all listeners, clients, and stores.
|
||||
// Close attempts to gracefully shut down the server, all listeners, clients, and stores.
|
||||
func (s *Server) Close() error {
|
||||
close(s.done)
|
||||
s.Listeners.CloseAll(s.closeListenerClients)
|
||||
@@ -1283,6 +1295,10 @@ func (s *Server) sendLWT(cl *Client) {
|
||||
return
|
||||
}
|
||||
|
||||
if pk.FixedHeader.Retain {
|
||||
s.retainMessage(cl, pk)
|
||||
}
|
||||
|
||||
s.publishToSubscribers(pk) // [MQTT-3.1.2-8]
|
||||
atomic.StoreUint32(&cl.Properties.Will.Flag, 0) // [MQTT-3.1.2-10]
|
||||
s.hooks.OnWillSent(cl, pk)
|
||||
@@ -1475,6 +1491,9 @@ func (s *Server) sendDelayedLWT(dt int64) {
|
||||
if dt > pk.Expiry {
|
||||
s.publishToSubscribers(pk) // [MQTT-3.1.2-8]
|
||||
if cl, ok := s.Clients.Get(id); ok {
|
||||
if pk.FixedHeader.Retain {
|
||||
s.retainMessage(cl, pk)
|
||||
}
|
||||
cl.Properties.Will = Will{} // [MQTT-3.1.2-10]
|
||||
s.hooks.OnWillSent(cl, pk)
|
||||
}
|
||||
|
@@ -127,7 +127,7 @@ func TestServerNewClient(t *testing.T) {
|
||||
require.NotNil(t, cl.State.Inflight.internal)
|
||||
require.NotNil(t, cl.State.Subscriptions)
|
||||
require.NotNil(t, cl.State.TopicAliases)
|
||||
require.Equal(t, defaultKeepalive, cl.State.keepalive)
|
||||
require.Equal(t, defaultKeepalive, cl.State.Keepalive)
|
||||
require.Equal(t, defaultClientProtocolVersion, cl.Properties.ProtocolVersion)
|
||||
require.NotNil(t, cl.Net.Conn)
|
||||
require.NotNil(t, cl.Net.bconn)
|
||||
@@ -817,17 +817,40 @@ func TestServerEstablishConnectionBadPacket(t *testing.T) {
|
||||
r.Close()
|
||||
}
|
||||
|
||||
func TestServerEstablishConnectionOnConnectError(t *testing.T) {
|
||||
s := newServer()
|
||||
hook := new(modifiedHookBase)
|
||||
hook.fail = true
|
||||
err := s.AddHook(hook, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
r, w := net.Pipe()
|
||||
o := make(chan error)
|
||||
go func() {
|
||||
o <- s.EstablishConnection("tcp", r)
|
||||
}()
|
||||
|
||||
go func() {
|
||||
w.Write(packets.TPacketData[packets.Connect].Get(packets.TConnectClean).RawBytes)
|
||||
}()
|
||||
|
||||
err = <-o
|
||||
require.Error(t, err)
|
||||
require.ErrorIs(t, err, errTestHook)
|
||||
|
||||
r.Close()
|
||||
}
|
||||
|
||||
func TestServerSendConnack(t *testing.T) {
|
||||
s := newServer()
|
||||
cl, r, w := newTestClient()
|
||||
cl.Properties.ProtocolVersion = 5
|
||||
s.Options.Capabilities.ServerKeepAlive = 20
|
||||
s.Options.Capabilities.MaximumQos = 1
|
||||
cl.Properties.Props = packets.Properties{
|
||||
AssignedClientID: "mochi",
|
||||
}
|
||||
go func() {
|
||||
err := s.sendConnack(cl, packets.CodeSuccess, true)
|
||||
err := s.SendConnack(cl, packets.CodeSuccess, true, nil)
|
||||
require.NoError(t, err)
|
||||
w.Close()
|
||||
}()
|
||||
@@ -841,9 +864,8 @@ func TestServerSendConnackFailureReason(t *testing.T) {
|
||||
s := newServer()
|
||||
cl, r, w := newTestClient()
|
||||
cl.Properties.ProtocolVersion = 5
|
||||
s.Options.Capabilities.ServerKeepAlive = 20
|
||||
go func() {
|
||||
err := s.sendConnack(cl, packets.ErrUnspecifiedError, true)
|
||||
err := s.SendConnack(cl, packets.ErrUnspecifiedError, true, nil)
|
||||
require.NoError(t, err)
|
||||
w.Close()
|
||||
}()
|
||||
@@ -853,6 +875,23 @@ func TestServerSendConnackFailureReason(t *testing.T) {
|
||||
require.Equal(t, packets.TPacketData[packets.Connack].Get(packets.TConnackInvalidMinMqtt5).RawBytes, buf)
|
||||
}
|
||||
|
||||
func TestServerSendConnackWithServerKeepalive(t *testing.T) {
|
||||
s := newServer()
|
||||
cl, r, w := newTestClient()
|
||||
cl.Properties.ProtocolVersion = 5
|
||||
cl.State.Keepalive = 10
|
||||
cl.State.ServerKeepalive = true
|
||||
go func() {
|
||||
err := s.SendConnack(cl, packets.CodeSuccess, true, nil)
|
||||
require.NoError(t, err)
|
||||
w.Close()
|
||||
}()
|
||||
|
||||
buf, err := io.ReadAll(r)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, packets.TPacketData[packets.Connack].Get(packets.TConnackServerKeepalive).RawBytes, buf)
|
||||
}
|
||||
|
||||
func TestServerValidateConnect(t *testing.T) {
|
||||
packet := *packets.TPacketData[packets.Connect].Get(packets.TConnectMqtt5).Packet
|
||||
invalidBitPacket := packet
|
||||
@@ -922,7 +961,7 @@ func TestServerSendConnackAdjustedExpiryInterval(t *testing.T) {
|
||||
cl.Properties.Props.SessionExpiryInterval = uint32(300)
|
||||
s.Options.Capabilities.MaximumSessionExpiryInterval = 120
|
||||
go func() {
|
||||
err := s.sendConnack(cl, packets.CodeSuccess, false)
|
||||
err := s.SendConnack(cl, packets.CodeSuccess, false, nil)
|
||||
require.NoError(t, err)
|
||||
w.Close()
|
||||
}()
|
||||
@@ -2476,7 +2515,7 @@ func TestServerProcessPacketDisconnect(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, 0, s.loop.willDelayed.Len())
|
||||
require.Equal(t, uint32(1), atomic.LoadUint32(&cl.State.done))
|
||||
require.True(t, cl.Closed())
|
||||
require.Equal(t, time.Now().Unix(), atomic.LoadInt64(&cl.State.disconnected))
|
||||
}
|
||||
|
||||
@@ -2571,6 +2610,46 @@ func TestServerSendLWT(t *testing.T) {
|
||||
require.Equal(t, packets.TPacketData[packets.Publish].Get(packets.TPublishBasic).RawBytes, <-receiverBuf)
|
||||
}
|
||||
|
||||
func TestServerSendLWTRetain(t *testing.T) {
|
||||
s := newServer()
|
||||
s.Serve()
|
||||
defer s.Close()
|
||||
|
||||
sender, _, w1 := newTestClient()
|
||||
sender.ID = "sender"
|
||||
sender.Properties.Will = Will{
|
||||
Flag: 1,
|
||||
TopicName: "a/b/c",
|
||||
Payload: []byte("hello mochi"),
|
||||
Retain: true,
|
||||
}
|
||||
s.Clients.Add(sender)
|
||||
|
||||
receiver, r2, w2 := newTestClient()
|
||||
receiver.ID = "receiver"
|
||||
s.Clients.Add(receiver)
|
||||
s.Topics.Subscribe(receiver.ID, packets.Subscription{Filter: "a/b/c", Qos: 0})
|
||||
|
||||
require.Equal(t, int64(0), atomic.LoadInt64(&s.Info.PacketsReceived))
|
||||
require.Equal(t, 0, len(s.Topics.Messages("a/b/c")))
|
||||
|
||||
receiverBuf := make(chan []byte)
|
||||
go func() {
|
||||
buf, err := io.ReadAll(r2)
|
||||
require.NoError(t, err)
|
||||
receiverBuf <- buf
|
||||
}()
|
||||
|
||||
go func() {
|
||||
s.sendLWT(sender)
|
||||
time.Sleep(time.Millisecond * 10)
|
||||
w1.Close()
|
||||
w2.Close()
|
||||
}()
|
||||
|
||||
require.Equal(t, packets.TPacketData[packets.Publish].Get(packets.TPublishRetain).RawBytes, <-receiverBuf)
|
||||
}
|
||||
|
||||
func TestServerSendLWTDelayed(t *testing.T) {
|
||||
s := newServer()
|
||||
cl1, _, _ := newTestClient()
|
||||
@@ -2806,7 +2885,7 @@ func TestServerClearExpiredClients(t *testing.T) {
|
||||
cl0, _, _ := newTestClient()
|
||||
cl0.ID = "c0"
|
||||
cl0.State.disconnected = n - 10
|
||||
cl0.State.done = 1
|
||||
cl0.State.cancelOpen()
|
||||
cl0.Properties.ProtocolVersion = 5
|
||||
cl0.Properties.Props.SessionExpiryInterval = 12
|
||||
cl0.Properties.Props.SessionExpiryIntervalFlag = true
|
||||
@@ -2816,7 +2895,7 @@ func TestServerClearExpiredClients(t *testing.T) {
|
||||
cl1, _, _ := newTestClient()
|
||||
cl1.ID = "c1"
|
||||
cl1.State.disconnected = n - 10
|
||||
cl1.State.done = 1
|
||||
cl1.State.cancelOpen()
|
||||
cl1.Properties.ProtocolVersion = 5
|
||||
cl1.Properties.Props.SessionExpiryInterval = 8
|
||||
cl1.Properties.Props.SessionExpiryIntervalFlag = true
|
||||
@@ -2826,7 +2905,7 @@ func TestServerClearExpiredClients(t *testing.T) {
|
||||
cl2, _, _ := newTestClient()
|
||||
cl2.ID = "c2"
|
||||
cl2.State.disconnected = n - 10
|
||||
cl2.State.done = 1
|
||||
cl2.State.cancelOpen()
|
||||
cl2.Properties.ProtocolVersion = 5
|
||||
cl2.Properties.Props.SessionExpiryInterval = 0
|
||||
cl2.Properties.Props.SessionExpiryIntervalFlag = true
|
||||
|
28
topics.go
28
topics.go
@@ -327,7 +327,9 @@ func (x *TopicsIndex) Unsubscribe(filter, client string) bool {
|
||||
defer x.root.Unlock()
|
||||
|
||||
var d int
|
||||
if strings.HasPrefix(filter, SharePrefix) {
|
||||
prefix, _ := isolateParticle(filter, 0)
|
||||
shareSub := strings.EqualFold(prefix, SharePrefix)
|
||||
if shareSub {
|
||||
d = 2
|
||||
}
|
||||
|
||||
@@ -336,8 +338,7 @@ func (x *TopicsIndex) Unsubscribe(filter, client string) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
prefix, _ := isolateParticle(filter, 0)
|
||||
if strings.EqualFold(prefix, SharePrefix) {
|
||||
if shareSub {
|
||||
group, _ := isolateParticle(filter, 1)
|
||||
particle.shared.Delete(group, client)
|
||||
} else {
|
||||
@@ -500,20 +501,27 @@ func (x *TopicsIndex) scanSubscribers(topic string, d int, n *particle, subs *Su
|
||||
}
|
||||
|
||||
key, hasNext := isolateParticle(topic, d)
|
||||
for _, partKey := range []string{key, "+", "#"} {
|
||||
for _, partKey := range []string{key, "+"} {
|
||||
if particle := n.particles.get(partKey); particle != nil { // [MQTT-3.3.2-3]
|
||||
x.gatherSubscriptions(topic, particle, subs)
|
||||
x.gatherSharedSubscriptions(particle, subs)
|
||||
if wild := particle.particles.get("#"); wild != nil && partKey != "#" && partKey != "+" {
|
||||
x.gatherSubscriptions(topic, wild, subs) // also match any subs where filter/# is filter as per 4.7.1.2
|
||||
}
|
||||
|
||||
if hasNext {
|
||||
x.scanSubscribers(topic, d+1, particle, subs)
|
||||
} else {
|
||||
x.gatherSubscriptions(topic, particle, subs)
|
||||
x.gatherSharedSubscriptions(particle, subs)
|
||||
|
||||
if wild := particle.particles.get("#"); wild != nil && partKey != "+" {
|
||||
x.gatherSubscriptions(topic, wild, subs) // also match any subs where filter/# is filter as per 4.7.1.2
|
||||
x.gatherSharedSubscriptions(wild, subs)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if particle := n.particles.get("#"); particle != nil {
|
||||
x.gatherSubscriptions(topic, particle, subs)
|
||||
x.gatherSharedSubscriptions(particle, subs)
|
||||
}
|
||||
|
||||
return subs
|
||||
}
|
||||
|
||||
|
@@ -319,7 +319,7 @@ func TestUnsubscribeShared(t *testing.T) {
|
||||
require.True(t, exists)
|
||||
require.Equal(t, byte(2), client.Qos)
|
||||
|
||||
require.True(t, index.Unsubscribe("$SHARE/tmp/a/b/c", "cl1"))
|
||||
require.True(t, index.Unsubscribe("$share/tmp/a/b/c", "cl1"))
|
||||
_, exists = final.shared.Get("tmp", "cl1")
|
||||
require.False(t, exists)
|
||||
}
|
||||
@@ -501,28 +501,40 @@ func TestScanSubscribers(t *testing.T) {
|
||||
index.Subscribe("cl2", packets.Subscription{Qos: 0, Filter: "$SYS/test", Identifier: 2})
|
||||
|
||||
subs := index.scanSubscribers("a/b/c", 0, nil, new(Subscribers))
|
||||
require.Equal(t, 4, len(subs.Subscriptions))
|
||||
require.Equal(t, 3, len(subs.Subscriptions))
|
||||
require.Contains(t, subs.Subscriptions, "cl1")
|
||||
require.Contains(t, subs.Subscriptions, "cl2")
|
||||
require.Contains(t, subs.Subscriptions, "cl3")
|
||||
require.Contains(t, subs.Subscriptions, "cl4")
|
||||
|
||||
require.Equal(t, byte(1), subs.Subscriptions["cl1"].Qos)
|
||||
require.Equal(t, byte(2), subs.Subscriptions["cl2"].Qos)
|
||||
require.Equal(t, byte(1), subs.Subscriptions["cl3"].Qos)
|
||||
require.Equal(t, byte(0), subs.Subscriptions["cl4"].Qos)
|
||||
|
||||
require.Equal(t, 22, subs.Subscriptions["cl1"].Identifiers["a/b/c"])
|
||||
require.Equal(t, 0, subs.Subscriptions["cl2"].Identifiers["a/#"])
|
||||
require.Equal(t, 77, subs.Subscriptions["cl2"].Identifiers["a/b/+"])
|
||||
require.Equal(t, 0, subs.Subscriptions["cl2"].Identifiers["a/b/c"])
|
||||
require.Equal(t, 234, subs.Subscriptions["cl3"].Identifiers["+/b"])
|
||||
require.Equal(t, 5, subs.Subscriptions["cl4"].Identifiers["#"])
|
||||
|
||||
subs = index.scanSubscribers("d/e/f/g", 0, nil, new(Subscribers))
|
||||
require.Equal(t, 1, len(subs.Subscriptions))
|
||||
require.Contains(t, subs.Subscriptions, "cl4")
|
||||
require.Equal(t, byte(0), subs.Subscriptions["cl4"].Qos)
|
||||
require.Equal(t, 5, subs.Subscriptions["cl4"].Identifiers["#"])
|
||||
|
||||
subs = index.scanSubscribers("", 0, nil, new(Subscribers))
|
||||
require.Equal(t, 0, len(subs.Subscriptions))
|
||||
}
|
||||
|
||||
func TestScanSubscribersTopicInheritanceBug(t *testing.T) {
|
||||
index := NewTopicsIndex()
|
||||
index.Subscribe("cl1", packets.Subscription{Qos: 0, Filter: "a/b/c"})
|
||||
index.Subscribe("cl2", packets.Subscription{Qos: 0, Filter: "a/b"})
|
||||
|
||||
subs := index.scanSubscribers("a/b/c", 0, nil, new(Subscribers))
|
||||
require.Equal(t, 1, len(subs.Subscriptions))
|
||||
}
|
||||
|
||||
func TestScanSubscribersShared(t *testing.T) {
|
||||
index := NewTopicsIndex()
|
||||
index.Subscribe("cl1", packets.Subscription{Qos: 1, Filter: SharePrefix + "/tmp/a/b/c", Identifier: 111})
|
||||
@@ -531,8 +543,9 @@ func TestScanSubscribersShared(t *testing.T) {
|
||||
index.Subscribe("cl2", packets.Subscription{Qos: 0, Filter: SharePrefix + "/tmp/a/b/+", Identifier: 10})
|
||||
index.Subscribe("cl3", packets.Subscription{Qos: 1, Filter: SharePrefix + "/tmp/a/b/+", Identifier: 200})
|
||||
index.Subscribe("cl4", packets.Subscription{Qos: 0, Filter: SharePrefix + "/tmp/a/b/+", Identifier: 201})
|
||||
index.Subscribe("cl5", packets.Subscription{Qos: 0, Filter: SharePrefix + "/tmp/a/b/c/#"})
|
||||
subs := index.scanSubscribers("a/b/c", 0, nil, new(Subscribers))
|
||||
require.Equal(t, 3, len(subs.Shared))
|
||||
require.Equal(t, 4, len(subs.Shared))
|
||||
}
|
||||
|
||||
func TestSelectSharedSubscriber(t *testing.T) {
|
||||
|
Reference in New Issue
Block a user