Compare commits

..

1 Commits

Author SHA1 Message Date
mochi-co
ef5dcf68d0 Use context to signal client open state 2023-05-06 11:49:02 +01:00
70 changed files with 443 additions and 1333 deletions

1
.gitignore vendored
View File

@@ -1,4 +1,3 @@
cmd/mqtt
.DS_Store
*.db
.idea

View File

@@ -1,8 +1,7 @@
The MIT License (MIT)
Copyright (c) 2023 Mochi-MQTT Organisation
Copyright (c) 2019, 2022, 2023 Jonathan Blake (mochi-co)
Copyright (c) 2019, 2022 Jonathan Blake (mochi-co)
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal

179
README.md
View File

@@ -1,11 +1,11 @@
<p align="center">
![build status](https://github.com/mochi-mqtt/server/actions/workflows/build.yml/badge.svg)
[![Coverage Status](https://coveralls.io/repos/github/mochi-mqtt/server/badge.svg?branch=master&v2)](https://coveralls.io/github/mochi-mqtt/server?branch=master)
[![Go Report Card](https://goreportcard.com/badge/github.com/mochi-mqtt/server)](https://goreportcard.com/report/github.com/mochi-mqtt/server/v2)
[![Go Reference](https://pkg.go.dev/badge/github.com/mochi-mqtt/server.svg)](https://pkg.go.dev/github.com/mochi-mqtt/server/v2)
[![contributions welcome](https://img.shields.io/badge/contributions-welcome-brightgreen.svg?style=flat)](https://github.com/mochi-mqtt/server/issues)
![build status](https://github.com/mochi-co/mqtt/actions/workflows/build.yml/badge.svg)
[![Coverage Status](https://coveralls.io/repos/github/mochi-co/mqtt/badge.svg?branch=master&v2)](https://coveralls.io/github/mochi-co/mqtt?branch=master)
[![Go Report Card](https://goreportcard.com/badge/github.com/mochi-co/mqtt)](https://goreportcard.com/report/github.com/mochi-co/mqtt/v2)
[![Go Reference](https://pkg.go.dev/badge/github.com/mochi-co/mqtt.svg)](https://pkg.go.dev/github.com/mochi-co/mqtt/v2)
[![contributions welcome](https://img.shields.io/badge/contributions-welcome-brightgreen.svg?style=flat)](https://github.com/mochi-co/mqtt/issues)
</p>
@@ -16,15 +16,12 @@ Mochi MQTT is an embeddable [fully compliant](https://docs.oasis-open.org/mqtt/m
### What is MQTT?
MQTT stands for [MQ Telemetry Transport](https://en.wikipedia.org/wiki/MQTT). It is a publish/subscribe, extremely simple and lightweight messaging protocol, designed for constrained devices and low-bandwidth, high-latency or unreliable networks ([Learn more](https://mqtt.org/faq)). Mochi MQTT fully implements version 5.0.0 of the MQTT protocol.
### When is this repo updated?
Unless it's a critical issue, new releases typically go out over the weekend.
## What's new in Version 2?
## What's new in Version 2.0.0?
Version 2.0.0 takes all the great things we loved about Mochi MQTT v1.0.0, learns from the mistakes, and improves on the things we wished we'd had. It's a total from-scratch rewrite, designed to fully implement MQTT v5 as a first-class feature.
Don't forget to use the new v2 import paths:
```go
import "github.com/mochi-mqtt/server/v2"
import "github.com/mochi-co/mqtt/v2"
```
- Full MQTTv5 Feature Compliance, compatibility for MQTT v3.1.1 and v3.0.0:
@@ -40,7 +37,7 @@ import "github.com/mochi-mqtt/server/v2"
- Plus all the original MQTT features of Mochi MQTT v1, such as Full QoS(0,1,2), $SYS topics, retained messages, etc.
- Developer-centric:
- Most core broker code is now exported and accessible, for total developer control.
- Full-featured and flexible Hook-based interfacing system to provide easy 'plugin' development.
- Full featured and flexible Hook-based interfacing system to provide easy 'plugin' development.
- Direct Packet Injection using special inline client, or masquerade as existing clients.
- Performant and Stable:
- Our classic trie-based Topic-Subscription model.
@@ -59,7 +56,7 @@ Because of the overlap between the v5 specification and previous versions of mqt
Support for MQTT v3.0.0 and v3.1.1 is considered hybrid-compatibility. Where not specifically restricted in the v3 specification, more modern and safety-first v5 behaviours are used instead - such as expiry for inflight and retained messages, and clients - and quality-of-service flow control limits.
## Roadmap
- Please [open an issue](https://github.com/mochi-mqtt/server/issues) to request new features or event hooks!
- Please [open an issue](https://github.com/mochi-co/mqtt/issues) to request new features or event hooks!
- Cluster support.
- Enhanced Metrics support.
- File-based server configuration (supporting docker).
@@ -88,9 +85,9 @@ Importing Mochi MQTT as a package requires just a few lines of code to get start
import (
"log"
"github.com/mochi-mqtt/server/v2"
"github.com/mochi-mqtt/server/v2/hooks/auth"
"github.com/mochi-mqtt/server/v2/listeners"
"github.com/mochi-co/mqtt/v2"
"github.com/mochi-co/mqtt/v2/hooks/auth"
"github.com/mochi-co/mqtt/v2/listeners"
)
func main() {
@@ -119,14 +116,13 @@ Examples of running the broker with various configurations can be found in the [
#### Network Listeners
The server comes with a variety of pre-packaged network listeners which allow the broker to accept connections on different protocols. The current listeners are:
| Listener | Usage |
|------------------------------|----------------------------------------------------------------------------------------------|
| listeners.NewTCP | A TCP listener |
| listeners.NewUnixSock | A Unix Socket listener |
| listeners.NewNet | A net.Listener listener |
| listeners.NewWebsocket | A Websocket listener |
| listeners.NewHTTPStats | An HTTP $SYS info dashboard |
| listeners.NewHTTPHealthCheck | An HTTP healthcheck listener to provide health check responses for e.g. cloud infrastructure |
| Listener | Usage |
| --- | --- |
| listeners.NewTCP | A TCP listener |
| listeners.NewUnixSock | A Unix Socket listener |
| listeners.NewNet | A net.Listener listener |
| listeners.NewWebsocket | A Websocket listener |
| listeners.NewHTTPStats | An HTTP $SYS info dashboard |
> Use the `listeners.Listener` interface to develop new listeners. If you do, please let us know!
@@ -140,13 +136,13 @@ A number of configurable options are available which can be used to alter the be
```go
server := mqtt.New(&mqtt.Options{
Capabilities: mqtt.Capabilities{
ClientNetWriteBufferSize: 4096,
ClientNetReadBufferSize: 4096,
MaximumSessionExpiryInterval: 3600,
Compatibilities: mqtt.Compatibilities{
ObscureNotAuthorized: true,
},
},
ClientNetWriteBufferSize: 4096,
ClientNetReadBufferSize: 4096,
SysTopicResendInterval: 10,
})
```
@@ -161,14 +157,14 @@ Hooks are stackable - you can add multiple hooks to a server, and they will be r
| Type | Import | Info |
| -- | -- | -- |
| Access Control | [mochi-mqtt/server/hooks/auth . AllowHook](hooks/auth/allow_all.go) | Allow access to all connecting clients and read/write to all topics. |
| Access Control | [mochi-mqtt/server/hooks/auth . Auth](hooks/auth/auth.go) | Rule-based access control ledger. |
| Persistence | [mochi-mqtt/server/hooks/storage/bolt](hooks/storage/bolt/bolt.go) | Persistent storage using [BoltDB](https://dbdb.io/db/boltdb) (deprecated). |
| Persistence | [mochi-mqtt/server/hooks/storage/badger](hooks/storage/badger/badger.go) | Persistent storage using [BadgerDB](https://github.com/dgraph-io/badger). |
| Persistence | [mochi-mqtt/server/hooks/storage/redis](hooks/storage/redis/redis.go) | Persistent storage using [Redis](https://redis.io). |
| Debugging | [mochi-mqtt/server/hooks/debug](hooks/debug/debug.go) | Additional debugging output to visualise packet flow. |
| Access Control | [mochi-co/mqtt/hooks/auth . AllowHook](hooks/auth/allow_all.go) | Allow access to all connecting clients and read/write to all topics. |
| Access Control | [mochi-co/mqtt/hooks/auth . Auth](hooks/auth/auth.go) | Rule-based access control ledger. |
| Persistence | [mochi-co/mqtt/hooks/storage/bolt](hooks/storage/bolt/bolt.go) | Persistent storage using [BoltDB](https://dbdb.io/db/boltdb) (deprecated). |
| Persistence | [mochi-co/mqtt/hooks/storage/badger](hooks/storage/badger/badger.go) | Persistent storage using [BadgerDB](https://github.com/dgraph-io/badger). |
| Persistence | [mochi-co/mqtt/hooks/storage/redis](hooks/storage/redis/redis.go) | Persistent storage using [Redis](https://redis.io). |
| Debugging | [mochi-co/mqtt/hooks/debug](hooks/debug/debug.go) | Additional debugging output to visualise packet flow. |
Many of the internal server functions are now exposed to developers, so you can make your own Hooks by using the above as examples. If you do, please [Open an issue](https://github.com/mochi-mqtt/server/issues) and let everyone know!
Many of the internal server functions are now exposed to developers, so you can make your own Hooks by using the above as examples. If you do, please [Open an issue](https://github.com/mochi-co/mqtt/issues) and let everyone know!
### Access Control
#### Allow Hook
@@ -273,51 +269,51 @@ For more information on how the badger hook works, or how to use it, see the [ex
There is also a BoltDB hook which has been deprecated in favour of Badger, but if you need it, check [examples/persistence/bolt/main.go](examples/persistence/bolt/main.go).
## Developing with Event Hooks
Many hooks are available for interacting with the broker and client lifecycle.
The function signatures for all the hooks and `mqtt.Hook` interface can be found in [hooks.go](hooks.go).
> The most flexible event hooks are OnPacketRead, OnPacketEncode, and OnPacketSent - these hooks be used to control and modify all incoming and outgoing packets.
| Function | Usage |
|------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| OnStarted | Called when the server has successfully started. |
| OnStopped | Called when the server has successfully stopped. |
| OnConnectAuthenticate | Called when a user attempts to authenticate with the server. An implementation of this method MUST be used to allow or deny access to the server (see hooks/auth/allow_all or basic). It can be used in custom hooks to check connecting users against an existing user database. Returns true if allowed. |
| OnACLCheck | Called when a user attempts to publish or subscribe to a topic filter. As above. |
| OnSysInfoTick | Called when the $SYS topic values are published out. |
| OnConnect | Called when a new client connects, may return an error or packet code to halt the client connection process. |
| OnSessionEstablish | Called immediately after a new client connects and authenticates and immediately before the session is established and CONNACK is sent.
| OnSessionEstablished | Called when a new client successfully establishes a session (after OnConnect) |
| OnDisconnect | Called when a client is disconnected for any reason. |
| OnAuthPacket | Called when an auth packet is received. It is intended to allow developers to create their own mqtt v5 Auth Packet handling mechanisms. Allows packet modification. |
| OnPacketRead | Called when a packet is received from a client. Allows packet modification. |
| OnPacketEncode | Called immediately before a packet is encoded to be sent to a client. Allows packet modification. |
| OnPacketSent | Called when a packet has been sent to a client. |
| OnPacketProcessed | Called when a packet has been received and successfully handled by the broker. |
| OnSubscribe | Called when a client subscribes to one or more filters. Allows packet modification. |
| OnSubscribed | Called when a client successfully subscribes to one or more filters. |
| OnSelectSubscribers | Called when subscribers have been collected for a topic, but before shared subscription subscribers have been selected. Allows receipient modification. |
| OnUnsubscribe | Called when a client unsubscribes from one or more filters. Allows packet modification. |
| OnUnsubscribed | Called when a client successfully unsubscribes from one or more filters. |
| OnPublish | Called when a client publishes a message. Allows packet modification. |
| OnPublished | Called when a client has published a message to subscribers. |
| OnPublishDropped | Called when a message to a client is dropped before delivery, such as if the client is taking too long to respond. |
| OnRetainMessage | Called then a published message is retained. |
| OnRetainPublished | Called then a retained message is published to a client. |
| OnQosPublish | Called when a publish packet with Qos >= 1 is issued to a subscriber. |
| OnQosComplete | Called when the Qos flow for a message has been completed. |
| OnQosDropped | Called when an inflight message expires before completion. |
| OnPacketIDExhausted | Called when a client runs out of unused packet ids to assign. |
| OnWill | Called when a client disconnects and intends to issue a will message. Allows packet modification. |
| OnWillSent | Called when an LWT message has been issued from a disconnecting client. |
| OnClientExpired | Called when a client session has expired and should be deleted. |
| OnRetainedExpired | Called when a retained message has expired and should be deleted. |
| StoredClients | Returns clients, eg. from a persistent store. |
| StoredSubscriptions | Returns client subscriptions, eg. from a persistent store. |
| StoredInflightMessages | Returns inflight messages, eg. from a persistent store. |
| StoredRetainedMessages | Returns retained messages, eg. from a persistent store. |
| StoredSysInfo | Returns stored system info values, eg. from a persistent store. |
| Function | Usage |
| -------------------------- | -- |
| OnStarted | Called when the server has successfully started.|
| OnStopped | Called when the server has successfully stopped. |
| OnConnectAuthenticate | Called when a user attempts to authenticate with the server. An implementation of this method MUST be used to allow or deny access to the server (see hooks/auth/allow_all or basic). It can be used in custom hooks to check connecting users against an existing user database. Returns true if allowed. |
| OnACLCheck | Called when a user attempts to publish or subscribe to a topic filter. As above. |
| OnSysInfoTick | Called when the $SYS topic values are published out. |
| OnConnect | Called when a new client connects |
| OnSessionEstablished | Called when a new client successfully establishes a session (after OnConnect) |
| OnDisconnect | Called when a client is disconnected for any reason. |
| OnAuthPacket | Called when an auth packet is received. It is intended to allow developers to create their own mqtt v5 Auth Packet handling mechanisms. Allows packet modification. |
| OnPacketRead | Called when a packet is received from a client. Allows packet modification. |
| OnPacketEncode | Called immediately before a packet is encoded to be sent to a client. Allows packet modification. |
| OnPacketSent | Called when a packet has been sent to a client. |
| OnPacketProcessed | Called when a packet has been received and successfully handled by the broker. |
| OnSubscribe | Called when a client subscribes to one or more filters. Allows packet modification. |
| OnSubscribed | Called when a client successfully subscribes to one or more filters. |
| OnSelectSubscribers | Called when subscribers have been collected for a topic, but before shared subscription subscribers have been selected. Allows receipient modification.|
| OnUnsubscribe | Called when a client unsubscribes from one or more filters. Allows packet modification. |
| OnUnsubscribed | Called when a client successfully unsubscribes from one or more filters. |
| OnPublish | Called when a client publishes a message. Allows packet modification. |
| OnPublished | Called when a client has published a message to subscribers. |
| OnPublishDropped | Called when a message to a client is dropped before delivery, such as if the client is taking too long to respond. |
| OnRetainMessage | Called then a published message is retained. |
| OnQosPublish | Called when a publish packet with Qos >= 1 is issued to a subscriber. |
| OnQosComplete | Called when the Qos flow for a message has been completed. |
| OnQosDropped | Called when an inflight message expires before completion. |
| OnPacketIDExhausted | Called when a client runs out of unused packet ids to assign. |
| OnWill | Called when a client disconnects and intends to issue a will message. Allows packet modification. |
| OnWillSent | Called when an LWT message has been issued from a disconnecting client. |
| OnClientExpired | Called when a client session has expired and should be deleted. |
| OnRetainedExpired | Called when a retained message has expired and should be deleted. |
| StoredClients | Returns clients, eg. from a persistent store. |
| StoredSubscriptions | Returns client subscriptions, eg. from a persistent store. |
| StoredInflightMessages | Returns inflight messages, eg. from a persistent store. |
| StoredRetainedMessages | Returns retained messages, eg. from a persistent store. |
| StoredSysInfo | Returns stored system info values, eg. from a persistent store. |
If you are building a persistent storage hook, see the existing persistent hooks for inspiration and patterns. If you are building an auth hook, you will need `OnACLCheck` and `OnConnectAuthenticate`.
@@ -371,54 +367,39 @@ Mochi MQTT performance is comparable with popular brokers such as Mosquitto, EMQ
Performance benchmarks were tested using [MQTT-Stresser](https://github.com/inovex/mqtt-stresser) on a Apple Macbook Air M2, using `cmd/main.go` default settings. Taking into account bursts of high and low throughput, the median scores are the most useful. Higher is better.
> The values presented in the benchmark are not representative of true messages per second throughput. They rely on an unusual calculation by mqtt-stresser, but are usable as they are consistent across all brokers.
> Benchmarks are provided as a general performance expectation guideline only. Comparisons are performed using out-of-the-box default configurations.
> Benchmarks are provided as a general performance expectation guideline only.
`mqtt-stresser -broker tcp://localhost:1883 -num-clients=2 -num-messages=10000`
| Broker | publish fastest | median | slowest | receive fastest | median | slowest |
| -- | -- | -- | -- | -- | -- | -- |
| Mochi v2.2.10 | 124,772 | 125,456 | 124,614 | 314,461 | 313,186 | 311,910 |
| [Mosquitto v2.0.15](https://github.com/eclipse/mosquitto) | 155,920 | 155,919 | 155,918 | 185,485 | 185,097 | 184,709 |
| [EMQX v5.0.11](https://github.com/emqx/emqx) | 156,945 | 156,257 | 155,568 | 17,918 | 17,783 | 17,649 |
| [Rumqtt v0.21.0](https://github.com/bytebeamio/rumqtt) | 112,208 | 108,480 | 104,753 | 135,784 | 126,446 | 117,108 |
| Mochi v2.2.0 | 127,216 | 125,748 | 124,279 | 319,250 | 309,327 | 299,405 |
| Mosquitto v2.0.15 | 155,920 | 155,919 | 155,918 | 185,485 | 185,097 | 184,709 |
| EMQX v5.0.11 | 156,945 | 156,257 | 155,568 | 17,918 | 17,783 | 17,649 |
`mqtt-stresser -broker tcp://localhost:1883 -num-clients=10 -num-messages=10000`
| Broker | publish fastest | median | slowest | receive fastest | median | slowest |
| -- | -- | -- | -- | -- | -- | -- |
| Mochi v2.2.10 | 41,825 | 31,663| 23,008 | 144,058 | 65,903 | 37,618 |
| Mochi v2.2.0 | 45,615 | 30,129 | 21,138 | 232,717 | 86,323 | 50,402 |
| Mosquitto v2.0.15 | 42,729 | 38,633 | 29,879 | 23,241 | 19,714 | 18,806 |
| EMQX v5.0.11 | 21,553 | 17,418 | 14,356 | 4,257 | 3,980 | 3,756 |
| Rumqtt v0.21.0 | 42,213 | 23,153 | 20,814 | 49,465 | 36,626 | 19,283 |
Million Message Challenge (hit the server with 1 million messages immediately):
`mqtt-stresser -broker tcp://localhost:1883 -num-clients=100 -num-messages=10000`
| Broker | publish fastest | median | slowest | receive fastest | median | slowest |
| -- | -- | -- | -- | -- | -- | -- |
| Mochi v2.2.10 | 13,532 | 4,425 | 2,344 | 52,120 | 7,274 | 2,701 |
| Mochi v2.2.0 | 51,044 | 4,682 | 2,345 | 72,634 | 7,645 | 2,464 |
| Mosquitto v2.0.15 | 3,826 | 3,395 | 3,032 | 1,200 | 1,150 | 1,118 |
| EMQX v5.0.11 | 4,086 | 2,432 | 2,274 | 434 | 333 | 311 |
| Rumqtt v0.21.0 | 78,972 | 5,047 | 3,804 | 4,286 | 3,249 | 2,027 |
> Not sure what's going on with EMQX here, perhaps the docker out-of-the-box settings are not optimal, so take it with a pinch of salt as we know for a fact it's a solid piece of software.
## Contribution Guidelines
Contributions and feedback are both welcomed and encouraged! [Open an issue](https://github.com/mochi-mqtt/server/issues) to report a bug, ask a question, or make a feature request. If you open a pull request, please try to follow the following guidelines:
- Try to maintain test coverage where reasonably possible.
- Clearly state what the PR does and why.
- Remember to add your SPDX FileContributor tag to files where you have made a meaningful contribution.
[SPDX Annotations](https://spdx.dev) are used to clearly indicate the license, copyright, and contributions of each file in a machine-readable format. If you are adding a new file to the repository, please ensure it has the following SPDX header:
```go
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2023 mochi-mqtt
// SPDX-FileContributor: Your name or alias <optional@email.address>
package name
```
Please ensure to add a new `SPDX-FileContributor` line for each contributor to the file. Refer to other files for examples. Please remember to do this, your contributions to this project are valuable and appreciated - it's important to receive credit!
## Stargazers over time 🥰
[![Stargazers over time](https://starchart.cc/mochi-mqtt/server.svg)](https://starchart.cc/mochi-mqtt/server)
Are you using Mochi MQTT in a project? [Let us know!](https://github.com/mochi-mqtt/server/issues)
[![Stargazers over time](https://starchart.cc/mochi-co/mqtt.svg)](https://starchart.cc/mochi-co/mqtt)
Are you using Mochi MQTT in a project? [Let us know!](https://github.com/mochi-co/mqtt/issues)
## Contributions
Contributions and feedback are both welcomed and encouraged! [Open an issue](https://github.com/mochi-co/mqtt/issues) to report a bug, ask a question, or make a feature request.

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2023 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 J. Blake / mochi-co
// SPDX-FileContributor: mochi-co
package mqtt
@@ -17,7 +17,7 @@ import (
"github.com/rs/xid"
"github.com/mochi-mqtt/server/v2/packets"
"github.com/mochi-co/mqtt/v2/packets"
)
const (
@@ -136,34 +136,30 @@ type Will struct {
// State tracks the state of the client.
type ClientState struct {
TopicAliases TopicAliases // a map of topic aliases
stopCause atomic.Value // reason for stopping
Inflight *Inflight // a map of in-flight qos messages
Subscriptions *Subscriptions // a map of the subscription filters a client maintains
disconnected int64 // the time the client disconnected in unix time, for calculating expiry
outbound chan *packets.Packet // queue for pending outbound packets
endOnce sync.Once // only end once
isTakenOver uint32 // used to identify orphaned clients
packetID uint32 // the current highest packetID
open context.Context // indicate that the client is open for packet exchange
cancelOpen context.CancelFunc // cancel function for open context
outboundQty int32 // number of messages currently in the outbound queue
Keepalive uint16 // the number of seconds the connection can wait
ServerKeepalive bool // keepalive was set by the server
TopicAliases TopicAliases // a map of topic aliases
stopCause atomic.Value // reason for stopping
Inflight *Inflight // a map of in-flight qos messages
Subscriptions *Subscriptions // a map of the subscription filters a client maintains
disconnected int64 // the time the client disconnected in unix time, for calculating expiry
outbound chan *packets.Packet // queue for pending outbound packets
endOnce sync.Once // only end once
isTakenOver uint32 // used to identify orphaned clients
packetID uint32 // the current highest packetID
open context.Context // indicate that the client is open for packet exchange
outboundQty int32 // number of messages currently in the outbound queue
keepalive uint16 // the number of seconds the connection can wait
}
// newClient returns a new instance of Client. This is almost exclusively used by Server
// for creating new clients, but it lives here because it's not dependent.
func newClient(c net.Conn, o *ops) *Client {
ctx, cancel := context.WithCancel(context.Background())
cl := &Client{
State: ClientState{
Inflight: NewInflights(),
Subscriptions: NewSubscriptions(),
TopicAliases: NewTopicAliases(o.options.Capabilities.TopicAliasMaximum),
open: ctx,
cancelOpen: cancel,
Keepalive: defaultKeepalive,
keepalive: defaultKeepalive,
open: context.Background(),
outbound: make(chan *packets.Packet, o.options.Capabilities.MaximumClientWritesPending),
},
Properties: ClientProperties{
@@ -177,27 +173,24 @@ func newClient(c net.Conn, o *ops) *Client {
Conn: c,
bconn: bufio.NewReadWriter(
bufio.NewReaderSize(c, o.options.ClientNetReadBufferSize),
bufio.NewWriterSize(c, o.options.ClientNetWriteBufferSize),
bufio.NewWriterSize(c, o.options.ClientNetReadBufferSize),
),
Remote: c.RemoteAddr().String(),
}
}
cl.refreshDeadline(cl.State.keepalive)
return cl
}
// WriteLoop ranges over pending outbound messages and writes them to the client connection.
func (cl *Client) WriteLoop() {
for {
select {
case pk := <-cl.State.outbound:
if err := cl.WritePacket(*pk); err != nil {
cl.ops.log.Debug().Err(err).Str("client", cl.ID).Interface("packet", pk).Msg("failed publishing packet")
}
atomic.AddInt32(&cl.State.outboundQty, -1)
case <-cl.State.open.Done():
return
for pk := range cl.State.outbound {
if err := cl.WritePacket(*pk); err != nil {
cl.ops.log.Debug().Err(err).Str("client", cl.ID).Interface("packet", pk).Msg("failed publishing packet")
}
atomic.AddInt32(&cl.State.outboundQty, -1)
}
}
@@ -210,9 +203,9 @@ func (cl *Client) ParseConnect(lid string, pk packets.Packet) {
cl.Properties.Clean = pk.Connect.Clean
cl.Properties.Props = pk.Properties.Copy(false)
cl.State.Keepalive = pk.Connect.Keepalive // [MQTT-3.2.2-22]
cl.State.Inflight.ResetReceiveQuota(int32(cl.ops.options.Capabilities.ReceiveMaximum)) // server receive max per client
cl.State.Inflight.ResetSendQuota(int32(cl.Properties.Props.ReceiveMaximum)) // client receive max
cl.State.TopicAliases.Outbound = NewOutboundTopicAliases(cl.Properties.Props.TopicAliasMaximum)
cl.ID = pk.Connect.ClientIdentifier
@@ -221,6 +214,11 @@ func (cl *Client) ParseConnect(lid string, pk packets.Packet) {
cl.Properties.Props.AssignedClientID = cl.ID
}
cl.State.keepalive = cl.ops.options.Capabilities.ServerKeepAlive
if pk.Connect.Keepalive > 0 {
cl.State.keepalive = pk.Connect.Keepalive // [MQTT-3.2.2-22]
}
if pk.Connect.WillFlag {
cl.Properties.Will = Will{
Qos: pk.Connect.WillQos,
@@ -238,6 +236,8 @@ func (cl *Client) ParseConnect(lid string, pk packets.Packet) {
cl.Properties.Will.Flag = 1 // atomic for checking
}
}
cl.refreshDeadline(cl.State.keepalive)
}
// refreshDeadline refreshes the read/write deadline for the net.Conn connection.
@@ -336,7 +336,7 @@ func (cl *Client) Read(packetHandler ReadFn) error {
return nil
}
cl.refreshDeadline(cl.State.Keepalive)
cl.refreshDeadline(cl.State.keepalive)
fh := new(packets.FixedHeader)
err = cl.ReadFixedHeader(fh)
if err != nil {
@@ -358,6 +358,8 @@ func (cl *Client) Read(packetHandler ReadFn) error {
// Stop instructs the client to shut down all processing goroutines and disconnect.
func (cl *Client) Stop(err error) {
cl.State.endOnce.Do(func() {
cl.Lock()
defer cl.Unlock()
if cl.Net.Conn != nil {
_ = cl.Net.Conn.Close() // omit close error
@@ -367,8 +369,14 @@ func (cl *Client) Stop(err error) {
cl.State.stopCause.Store(err)
}
if cl.State.cancelOpen != nil {
cl.State.cancelOpen()
if cl.State.outbound != nil {
close(cl.State.outbound)
}
if cl.State.open != nil {
var cancel context.CancelFunc
cl.State.open, cancel = context.WithCancel(cl.State.open)
cancel()
}
atomic.StoreInt64(&cl.State.disconnected, time.Now().Unix())
@@ -481,14 +489,6 @@ func (cl *Client) ReadPacket(fh *packets.FixedHeader) (pk packets.Packet, err er
// WritePacket encodes and writes a packet to the client.
func (cl *Client) WritePacket(pk packets.Packet) error {
if cl.Closed() {
return ErrConnectionClosed
}
if cl.Net.Conn == nil {
return nil
}
if pk.Expiry > 0 {
pk.Properties.MessageExpiryInterval = uint32(pk.Expiry - time.Now().Unix()) // [MQTT-3.3.2-6]
}
@@ -552,12 +552,19 @@ func (cl *Client) WritePacket(pk packets.Packet) error {
return packets.ErrPacketTooLarge // [MQTT-3.1.2-24] [MQTT-3.1.2-25]
}
cl.Lock()
defer cl.Unlock()
if cl.Closed() {
return ErrConnectionClosed
}
if cl.Net.Conn == nil {
return nil
}
nb := net.Buffers{buf.Bytes()}
n, err := func() (int64, error) {
cl.Lock()
defer cl.Unlock()
return nb.WriteTo(cl.Net.Conn)
}()
n, err := nb.WriteTo(cl.Net.Conn)
if err != nil {
return err
}

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileContributor: mochi-co
package mqtt
@@ -13,8 +13,8 @@ import (
"testing"
"time"
"github.com/mochi-mqtt/server/v2/packets"
"github.com/mochi-mqtt/server/v2/system"
"github.com/mochi-co/mqtt/v2/packets"
"github.com/mochi-co/mqtt/v2/system"
"github.com/stretchr/testify/require"
)
@@ -133,7 +133,7 @@ func TestNewClient(t *testing.T) {
require.NotNil(t, cl.State.Inflight.internal)
require.NotNil(t, cl.State.Subscriptions)
require.NotNil(t, cl.State.TopicAliases)
require.Equal(t, defaultKeepalive, cl.State.Keepalive)
require.Equal(t, defaultKeepalive, cl.State.keepalive)
require.Equal(t, defaultClientProtocolVersion, cl.Properties.ProtocolVersion)
require.NotNil(t, cl.Net.Conn)
require.NotNil(t, cl.Net.bconn)
@@ -165,7 +165,7 @@ func TestClientParseConnect(t *testing.T) {
cl.ParseConnect("tcp1", pk)
require.Equal(t, pk.Connect.ClientIdentifier, cl.ID)
require.Equal(t, pk.Connect.Keepalive, cl.State.Keepalive)
require.Equal(t, pk.Connect.Keepalive, cl.State.keepalive)
require.Equal(t, pk.Connect.Clean, cl.Properties.Clean)
require.Equal(t, pk.Connect.ClientIdentifier, cl.ID)
require.Equal(t, pk.Connect.WillTopic, cl.Properties.Will.TopicName)
@@ -467,7 +467,7 @@ func TestClientReadOK(t *testing.T) {
func TestClientReadDone(t *testing.T) {
cl, _, _ := newTestClient()
defer cl.Stop(errClientStop)
cl.State.cancelOpen()
cl.State.open = nil
o := make(chan error)
go func() {

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileContributor: mochi-co
package main
@@ -11,9 +11,9 @@ import (
"os/signal"
"syscall"
"github.com/mochi-mqtt/server/v2"
"github.com/mochi-mqtt/server/v2/hooks/auth"
"github.com/mochi-mqtt/server/v2/listeners"
"github.com/mochi-co/mqtt/v2"
"github.com/mochi-co/mqtt/v2/hooks/auth"
"github.com/mochi-co/mqtt/v2/listeners"
)
func main() {

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileContributor: mochi-co
package main
@@ -10,9 +10,9 @@ import (
"os/signal"
"syscall"
"github.com/mochi-mqtt/server/v2"
"github.com/mochi-mqtt/server/v2/hooks/auth"
"github.com/mochi-mqtt/server/v2/listeners"
"github.com/mochi-co/mqtt/v2"
"github.com/mochi-co/mqtt/v2/hooks/auth"
"github.com/mochi-co/mqtt/v2/listeners"
)
func main() {

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileContributor: mochi-co
package main
@@ -11,9 +11,9 @@ import (
"os/signal"
"syscall"
"github.com/mochi-mqtt/server/v2"
"github.com/mochi-mqtt/server/v2/hooks/auth"
"github.com/mochi-mqtt/server/v2/listeners"
"github.com/mochi-co/mqtt/v2"
"github.com/mochi-co/mqtt/v2/hooks/auth"
"github.com/mochi-co/mqtt/v2/listeners"
)
func main() {

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileContributor: mochi-co
package main
@@ -11,9 +11,9 @@ import (
"os/signal"
"syscall"
"github.com/mochi-mqtt/server/v2"
"github.com/mochi-mqtt/server/v2/hooks/auth"
"github.com/mochi-mqtt/server/v2/listeners"
"github.com/mochi-co/mqtt/v2"
"github.com/mochi-co/mqtt/v2/hooks/auth"
"github.com/mochi-co/mqtt/v2/listeners"
)
func main() {

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileContributor: mochi-co
package main
@@ -10,10 +10,10 @@ import (
"os/signal"
"syscall"
"github.com/mochi-mqtt/server/v2"
"github.com/mochi-mqtt/server/v2/hooks/auth"
"github.com/mochi-mqtt/server/v2/hooks/debug"
"github.com/mochi-mqtt/server/v2/listeners"
"github.com/mochi-co/mqtt/v2"
"github.com/mochi-co/mqtt/v2/hooks/auth"
"github.com/mochi-co/mqtt/v2/hooks/debug"
"github.com/mochi-co/mqtt/v2/listeners"
"github.com/rs/zerolog"
)
@@ -30,14 +30,14 @@ func main() {
l := server.Log.Level(zerolog.DebugLevel)
server.Log = &l
err := server.AddHook(new(debug.Hook), &debug.Options{
// ShowPacketData: true,
})
err := server.AddHook(new(auth.AllowHook), nil)
if err != nil {
log.Fatal(err)
}
err = server.AddHook(new(auth.AllowHook), nil)
err = server.AddHook(new(debug.Hook), &debug.Options{
// ShowPacketData: true,
})
if err != nil {
log.Fatal(err)
}

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileContributor: mochi-co
package main
@@ -12,10 +12,10 @@ import (
"syscall"
"time"
"github.com/mochi-mqtt/server/v2"
"github.com/mochi-mqtt/server/v2/hooks/auth"
"github.com/mochi-mqtt/server/v2/listeners"
"github.com/mochi-mqtt/server/v2/packets"
"github.com/mochi-co/mqtt/v2"
"github.com/mochi-co/mqtt/v2/hooks/auth"
"github.com/mochi-co/mqtt/v2/listeners"
"github.com/mochi-co/mqtt/v2/packets"
)
func main() {
@@ -110,9 +110,8 @@ func (h *ExampleHook) Init(config any) error {
return nil
}
func (h *ExampleHook) OnConnect(cl *mqtt.Client, pk packets.Packet) error {
func (h *ExampleHook) OnConnect(cl *mqtt.Client, pk packets.Packet) {
h.Log.Info().Str("client", cl.ID).Msgf("client connected")
return nil
}
func (h *ExampleHook) OnDisconnect(cl *mqtt.Client, err error, expire bool) {

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileContributor: mochi-co
package main
@@ -11,9 +11,9 @@ import (
"os/signal"
"syscall"
"github.com/mochi-mqtt/server/v2"
"github.com/mochi-mqtt/server/v2/listeners"
"github.com/mochi-mqtt/server/v2/packets"
"github.com/mochi-co/mqtt/v2"
"github.com/mochi-co/mqtt/v2/listeners"
"github.com/mochi-co/mqtt/v2/packets"
)
func main() {
@@ -26,9 +26,9 @@ func main() {
}()
server := mqtt.New(nil)
server.Options.Capabilities.ServerKeepAlive = 60
server.Options.Capabilities.Compatibilities.ObscureNotAuthorized = true
server.Options.Capabilities.Compatibilities.PassiveClientDisconnect = true
server.Options.Capabilities.Compatibilities.NoInheritedPropertiesOnAck = true
_ = server.AddHook(new(pahoAuthHook), nil)
tcp := listeners.NewTCP("t1", ":1883", nil)
@@ -61,7 +61,6 @@ func (h *pahoAuthHook) ID() string {
func (h *pahoAuthHook) Provides(b byte) bool {
return bytes.Contains([]byte{
mqtt.OnConnectAuthenticate,
mqtt.OnConnect,
mqtt.OnACLCheck,
}, []byte{b})
}
@@ -73,12 +72,3 @@ func (h *pahoAuthHook) OnConnectAuthenticate(cl *mqtt.Client, pk packets.Packet)
func (h *pahoAuthHook) OnACLCheck(cl *mqtt.Client, topic string, write bool) bool {
return topic != "test/nosubscribe"
}
func (h *pahoAuthHook) OnConnect(cl *mqtt.Client, pk packets.Packet) error {
// Handle paho test_server_keep_alive
if pk.Connect.Keepalive == 120 && pk.Connect.Clean {
cl.State.Keepalive = 60
cl.State.ServerKeepalive = true
}
return nil
}

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileContributor: mochi-co
package main
@@ -10,10 +10,10 @@ import (
"os/signal"
"syscall"
"github.com/mochi-mqtt/server/v2"
"github.com/mochi-mqtt/server/v2/hooks/auth"
"github.com/mochi-mqtt/server/v2/hooks/storage/badger"
"github.com/mochi-mqtt/server/v2/listeners"
"github.com/mochi-co/mqtt/v2"
"github.com/mochi-co/mqtt/v2/hooks/auth"
"github.com/mochi-co/mqtt/v2/hooks/storage/badger"
"github.com/mochi-co/mqtt/v2/listeners"
)
func main() {

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileContributor: mochi-co
package main
@@ -11,10 +11,10 @@ import (
"syscall"
"time"
"github.com/mochi-mqtt/server/v2"
"github.com/mochi-mqtt/server/v2/hooks/auth"
"github.com/mochi-mqtt/server/v2/hooks/storage/bolt"
"github.com/mochi-mqtt/server/v2/listeners"
"github.com/mochi-co/mqtt/v2"
"github.com/mochi-co/mqtt/v2/hooks/auth"
"github.com/mochi-co/mqtt/v2/hooks/storage/bolt"
"github.com/mochi-co/mqtt/v2/listeners"
"go.etcd.io/bbolt"
)

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileContributor: mochi-co
package main
@@ -10,10 +10,10 @@ import (
"os/signal"
"syscall"
"github.com/mochi-mqtt/server/v2"
"github.com/mochi-mqtt/server/v2/hooks/auth"
"github.com/mochi-mqtt/server/v2/hooks/storage/redis"
"github.com/mochi-mqtt/server/v2/listeners"
"github.com/mochi-co/mqtt/v2"
"github.com/mochi-co/mqtt/v2/hooks/auth"
"github.com/mochi-co/mqtt/v2/hooks/storage/redis"
"github.com/mochi-co/mqtt/v2/listeners"
"github.com/rs/zerolog"
rv8 "github.com/go-redis/redis/v8"

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileContributor: mochi-co
package main
@@ -10,9 +10,9 @@ import (
"os/signal"
"syscall"
"github.com/mochi-mqtt/server/v2"
"github.com/mochi-mqtt/server/v2/hooks/auth"
"github.com/mochi-mqtt/server/v2/listeners"
"github.com/mochi-co/mqtt/v2"
"github.com/mochi-co/mqtt/v2/hooks/auth"
"github.com/mochi-co/mqtt/v2/listeners"
)
func main() {

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileContributor: mochi-co
package main
@@ -11,9 +11,9 @@ import (
"os/signal"
"syscall"
"github.com/mochi-mqtt/server/v2"
"github.com/mochi-mqtt/server/v2/hooks/auth"
"github.com/mochi-mqtt/server/v2/listeners"
"github.com/mochi-co/mqtt/v2"
"github.com/mochi-co/mqtt/v2/hooks/auth"
"github.com/mochi-co/mqtt/v2/listeners"
)
var (
@@ -97,7 +97,7 @@ func main() {
stats := listeners.NewHTTPStats("stats", ":8080", &listeners.Config{
TLSConfig: tlsConfig,
}, server.Info)
}, nil)
err = server.AddListener(stats)
if err != nil {
log.Fatal(err)

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileContributor: mochi-co
package main
@@ -10,9 +10,9 @@ import (
"os/signal"
"syscall"
"github.com/mochi-mqtt/server/v2"
"github.com/mochi-mqtt/server/v2/hooks/auth"
"github.com/mochi-mqtt/server/v2/listeners"
"github.com/mochi-co/mqtt/v2"
"github.com/mochi-co/mqtt/v2/hooks/auth"
"github.com/mochi-co/mqtt/v2/listeners"
)
func main() {

2
go.mod
View File

@@ -1,4 +1,4 @@
module github.com/mochi-mqtt/server/v2
module github.com/mochi-co/mqtt/v2
go 1.19

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileContributor: mochi-co, thedevop
package mqtt
@@ -10,9 +10,9 @@ import (
"sync"
"sync/atomic"
"github.com/mochi-mqtt/server/v2/hooks/storage"
"github.com/mochi-mqtt/server/v2/packets"
"github.com/mochi-mqtt/server/v2/system"
"github.com/mochi-co/mqtt/v2/hooks/storage"
"github.com/mochi-co/mqtt/v2/packets"
"github.com/mochi-co/mqtt/v2/system"
"github.com/rs/zerolog"
)
@@ -25,7 +25,6 @@ const (
OnConnectAuthenticate
OnACLCheck
OnConnect
OnSessionEstablish
OnSessionEstablished
OnDisconnect
OnAuthPacket
@@ -42,7 +41,6 @@ const (
OnPublished
OnPublishDropped
OnRetainMessage
OnRetainPublished
OnQosPublish
OnQosComplete
OnQosDropped
@@ -76,8 +74,7 @@ type Hook interface {
OnConnectAuthenticate(cl *Client, pk packets.Packet) bool
OnACLCheck(cl *Client, topic string, write bool) bool
OnSysInfoTick(*system.Info)
OnConnect(cl *Client, pk packets.Packet) error
OnSessionEstablish(cl *Client, pk packets.Packet)
OnConnect(cl *Client, pk packets.Packet)
OnSessionEstablished(cl *Client, pk packets.Packet)
OnDisconnect(cl *Client, err error, expire bool)
OnAuthPacket(cl *Client, pk packets.Packet) (packets.Packet, error)
@@ -94,7 +91,6 @@ type Hook interface {
OnPublished(cl *Client, pk packets.Packet)
OnPublishDropped(cl *Client, pk packets.Packet)
OnRetainMessage(cl *Client, pk packets.Packet, r int64)
OnRetainPublished(cl *Client, pk packets.Packet)
OnQosPublish(cl *Client, pk packets.Packet, sent int64, resends int)
OnQosComplete(cl *Client, pk packets.Packet)
OnQosDropped(cl *Client, pk packets.Packet)
@@ -218,25 +214,11 @@ func (h *Hooks) OnStopped() {
}
}
// OnConnect is called when a new client connects, and may return a packets.Code as an error to halt the connection.
func (h *Hooks) OnConnect(cl *Client, pk packets.Packet) error {
// OnConnect is called when a new client connects.
func (h *Hooks) OnConnect(cl *Client, pk packets.Packet) {
for _, hook := range h.GetAll() {
if hook.Provides(OnConnect) {
err := hook.OnConnect(cl, pk)
if err != nil {
return err
}
}
}
return nil
}
// OnSessionEstablish is called right after a new client connects and authenticates and right before
// the session is established and CONNACK is sent.
func (h *Hooks) OnSessionEstablish(cl *Client, pk packets.Packet) {
for _, hook := range h.GetAll() {
if hook.Provides(OnSessionEstablish) {
hook.OnSessionEstablish(cl, pk)
hook.OnConnect(cl, pk)
}
}
}
@@ -392,14 +374,13 @@ func (h *Hooks) OnPublish(cl *Client, pk packets.Packet) (pkx packets.Packet, er
for _, hook := range h.GetAll() {
if hook.Provides(OnPublish) {
npk, err := hook.OnPublish(cl, pkx)
if err != nil {
if errors.Is(err, packets.ErrRejectPacket) {
h.Log.Debug().Err(err).Str("hook", hook.ID()).Interface("packet", pkx).Msg("publish packet rejected")
return pk, err
}
h.Log.Error().Err(err).Str("hook", hook.ID()).Interface("packet", pkx).Msg("publish packet error")
if err != nil && errors.Is(err, packets.ErrRejectPacket) {
h.Log.Debug().Err(err).Str("hook", hook.ID()).Interface("packet", pkx).Msg("publish packet rejected")
return pk, err
} else if err != nil {
continue
}
pkx = npk
}
}
@@ -435,15 +416,6 @@ func (h *Hooks) OnRetainMessage(cl *Client, pk packets.Packet, r int64) {
}
}
// OnRetainPublished is called when a retained message is published.
func (h *Hooks) OnRetainPublished(cl *Client, pk packets.Packet) {
for _, hook := range h.GetAll() {
if hook.Provides(OnRetainPublished) {
hook.OnRetainPublished(cl, pk)
}
}
}
// OnQosPublish is called when a publish packet with Qos >= 1 is issued to a subscriber.
// In other words, this method is called when a new inflight message is created or resent.
// It is typically used to store a new inflight message.
@@ -696,7 +668,7 @@ func (h *HookBase) SetOpts(l *zerolog.Logger, opts *HookOptions) {
h.Opts = opts
}
// Stop is called to gracefully shut down the hook.
// Stop is called to gracefully shutdown the hook.
func (h *HookBase) Stop() error {
return nil
}
@@ -721,13 +693,7 @@ func (h *HookBase) OnACLCheck(cl *Client, topic string, write bool) bool {
}
// OnConnect is called when a new client connects.
func (h *HookBase) OnConnect(cl *Client, pk packets.Packet) error {
return nil
}
// OnSessionEstablish is called right after a new client connects and authenticates and right before
// the session is established and CONNACK is sent.
func (h *HookBase) OnSessionEstablish(cl *Client, pk packets.Packet) {}
func (h *HookBase) OnConnect(cl *Client, pk packets.Packet) {}
// OnSessionEstablished is called when a new client establishes a session (after OnConnect).
func (h *HookBase) OnSessionEstablished(cl *Client, pk packets.Packet) {}
@@ -791,9 +757,6 @@ func (h *HookBase) OnPublishDropped(cl *Client, pk packets.Packet) {}
// OnRetainMessage is called then a published message is retained.
func (h *HookBase) OnRetainMessage(cl *Client, pk packets.Packet, r int64) {}
// OnRetainPublished is called when a retained message is published.
func (h *HookBase) OnRetainPublished(cl *Client, pk packets.Packet) {}
// OnQosPublish is called when a publish packet with Qos > 1 is issued to a subscriber.
func (h *HookBase) OnQosPublish(cl *Client, pk packets.Packet, sent int64, resends int) {}

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileContributor: mochi-co
package auth
@@ -7,8 +7,8 @@ package auth
import (
"bytes"
"github.com/mochi-mqtt/server/v2"
"github.com/mochi-mqtt/server/v2/packets"
"github.com/mochi-co/mqtt/v2"
"github.com/mochi-co/mqtt/v2/packets"
)
// AllowHook is an authentication hook which allows connection access

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileContributor: mochi-co
package auth
@@ -7,8 +7,8 @@ package auth
import (
"testing"
"github.com/mochi-mqtt/server/v2"
"github.com/mochi-mqtt/server/v2/packets"
"github.com/mochi-co/mqtt/v2"
"github.com/mochi-co/mqtt/v2/packets"
"github.com/stretchr/testify/require"
)

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileContributor: mochi-co
package auth
@@ -7,8 +7,8 @@ package auth
import (
"bytes"
"github.com/mochi-mqtt/server/v2"
"github.com/mochi-mqtt/server/v2/packets"
"github.com/mochi-co/mqtt/v2"
"github.com/mochi-co/mqtt/v2/packets"
)
// Options contains the configuration/rules data for the auth ledger.

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileContributor: mochi-co
package auth
@@ -8,8 +8,8 @@ import (
"os"
"testing"
"github.com/mochi-mqtt/server/v2"
"github.com/mochi-mqtt/server/v2/packets"
"github.com/mochi-co/mqtt/v2"
"github.com/mochi-co/mqtt/v2/packets"
"github.com/rs/zerolog"
"github.com/stretchr/testify/require"
)

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileContributor: mochi-co
package auth
@@ -9,10 +9,9 @@ import (
"strings"
"sync"
"github.com/mochi-co/mqtt/v2"
"github.com/mochi-co/mqtt/v2/packets"
"gopkg.in/yaml.v3"
"github.com/mochi-mqtt/server/v2"
"github.com/mochi-mqtt/server/v2/packets"
)
const (
@@ -189,29 +188,15 @@ func (l *Ledger) ACLOk(cl *mqtt.Client, topic string, write bool) (n int, ok boo
return n, true
}
if write {
for filter, access := range rule.Filters {
if access == WriteOnly || access == ReadWrite {
if filter.FilterMatches(topic) {
return n, true
}
}
}
}
if !write {
for filter, access := range rule.Filters {
if access == ReadOnly || access == ReadWrite {
if filter.FilterMatches(topic) {
return n, true
}
}
}
}
for filter, _ := range rule.Filters {
for filter, access := range rule.Filters {
if filter.FilterMatches(topic) {
return n, false
if !write && (access == ReadOnly || access == ReadWrite) {
return n, true
} else if write && (access == WriteOnly || access == ReadWrite) {
return n, true
} else {
return n, false
}
}
}
}

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileContributor: mochi-co
package auth
@@ -7,8 +7,8 @@ package auth
import (
"testing"
"github.com/mochi-mqtt/server/v2"
"github.com/mochi-mqtt/server/v2/packets"
"github.com/mochi-co/mqtt/v2"
"github.com/mochi-co/mqtt/v2/packets"
"github.com/stretchr/testify/require"
)

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileContributor: mochi-co
package debug
@@ -7,9 +7,9 @@ package debug
import (
"strings"
"github.com/mochi-mqtt/server/v2"
"github.com/mochi-mqtt/server/v2/hooks/storage"
"github.com/mochi-mqtt/server/v2/packets"
"github.com/mochi-co/mqtt/v2"
"github.com/mochi-co/mqtt/v2/hooks/storage"
"github.com/mochi-co/mqtt/v2/packets"
"github.com/rs/zerolog"
)

View File

@@ -1,6 +1,6 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileContributor: mochi-co, gsagula
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileContributor: mochi-co
package badger
@@ -9,10 +9,10 @@ import (
"errors"
"strings"
"github.com/mochi-mqtt/server/v2"
"github.com/mochi-mqtt/server/v2/hooks/storage"
"github.com/mochi-mqtt/server/v2/packets"
"github.com/mochi-mqtt/server/v2/system"
"github.com/mochi-co/mqtt/v2"
"github.com/mochi-co/mqtt/v2/hooks/storage"
"github.com/mochi-co/mqtt/v2/packets"
"github.com/mochi-co/mqtt/v2/system"
"github.com/timshannon/badgerhold"
)
@@ -346,7 +346,7 @@ func (h *Hook) OnSysInfoTick(sys *system.Info) {
in := &storage.SystemInfo{
ID: sysInfoKey(),
T: storage.SysInfoKey,
Info: *sys.Clone(),
Info: *sys,
}
err := h.db.Upsert(in.ID, in)

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileContributor: mochi-co
package badger
@@ -10,10 +10,10 @@ import (
"testing"
"time"
"github.com/mochi-mqtt/server/v2"
"github.com/mochi-mqtt/server/v2/hooks/storage"
"github.com/mochi-mqtt/server/v2/packets"
"github.com/mochi-mqtt/server/v2/system"
"github.com/mochi-co/mqtt/v2"
"github.com/mochi-co/mqtt/v2/hooks/storage"
"github.com/mochi-co/mqtt/v2/packets"
"github.com/mochi-co/mqtt/v2/system"
"github.com/rs/zerolog"
"github.com/stretchr/testify/require"
"github.com/timshannon/badgerhold"

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileContributor: mochi-co
// package bolt is provided for historical compatibility and may not be actively updated, you should use the badger hook instead.
package bolt
@@ -9,10 +9,10 @@ import (
"errors"
"time"
"github.com/mochi-mqtt/server/v2"
"github.com/mochi-mqtt/server/v2/hooks/storage"
"github.com/mochi-mqtt/server/v2/packets"
"github.com/mochi-mqtt/server/v2/system"
"github.com/mochi-co/mqtt/v2"
"github.com/mochi-co/mqtt/v2/hooks/storage"
"github.com/mochi-co/mqtt/v2/packets"
"github.com/mochi-co/mqtt/v2/system"
sgob "github.com/asdine/storm/codec/gob"
"github.com/asdine/storm/v3"

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileContributor: mochi-co
package bolt
@@ -9,10 +9,10 @@ import (
"testing"
"time"
"github.com/mochi-mqtt/server/v2"
"github.com/mochi-mqtt/server/v2/hooks/storage"
"github.com/mochi-mqtt/server/v2/packets"
"github.com/mochi-mqtt/server/v2/system"
"github.com/mochi-co/mqtt/v2"
"github.com/mochi-co/mqtt/v2/hooks/storage"
"github.com/mochi-co/mqtt/v2/packets"
"github.com/mochi-co/mqtt/v2/system"
"github.com/asdine/storm/v3"
"github.com/rs/zerolog"

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileContributor: mochi-co
package redis
@@ -10,10 +10,10 @@ import (
"errors"
"fmt"
"github.com/mochi-mqtt/server/v2"
"github.com/mochi-mqtt/server/v2/hooks/storage"
"github.com/mochi-mqtt/server/v2/packets"
"github.com/mochi-mqtt/server/v2/system"
"github.com/mochi-co/mqtt/v2"
"github.com/mochi-co/mqtt/v2/hooks/storage"
"github.com/mochi-co/mqtt/v2/packets"
"github.com/mochi-co/mqtt/v2/system"
redis "github.com/go-redis/redis/v8"
)

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileContributor: mochi-co
package redis
@@ -10,10 +10,10 @@ import (
"testing"
"time"
"github.com/mochi-mqtt/server/v2"
"github.com/mochi-mqtt/server/v2/hooks/storage"
"github.com/mochi-mqtt/server/v2/packets"
"github.com/mochi-mqtt/server/v2/system"
"github.com/mochi-co/mqtt/v2"
"github.com/mochi-co/mqtt/v2/hooks/storage"
"github.com/mochi-co/mqtt/v2/packets"
"github.com/mochi-co/mqtt/v2/system"
miniredis "github.com/alicebob/miniredis/v2"
redis "github.com/go-redis/redis/v8"

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileContributor: mochi-co
package storage
@@ -8,8 +8,8 @@ import (
"encoding/json"
"errors"
"github.com/mochi-mqtt/server/v2/packets"
"github.com/mochi-mqtt/server/v2/system"
"github.com/mochi-co/mqtt/v2/packets"
"github.com/mochi-co/mqtt/v2/system"
)
const (

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileContributor: mochi-co
package storage
@@ -8,8 +8,8 @@ import (
"testing"
"time"
"github.com/mochi-mqtt/server/v2/packets"
"github.com/mochi-mqtt/server/v2/system"
"github.com/mochi-co/mqtt/v2/packets"
"github.com/mochi-co/mqtt/v2/system"
"github.com/stretchr/testify/require"
)

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileContributor: mochi-co
package mqtt
@@ -11,9 +11,9 @@ import (
"testing"
"time"
"github.com/mochi-mqtt/server/v2/hooks/storage"
"github.com/mochi-mqtt/server/v2/packets"
"github.com/mochi-mqtt/server/v2/system"
"github.com/mochi-co/mqtt/v2/hooks/storage"
"github.com/mochi-co/mqtt/v2/packets"
"github.com/mochi-co/mqtt/v2/system"
"github.com/stretchr/testify/require"
)
@@ -50,14 +50,6 @@ func (h *modifiedHookBase) Stop() error {
return nil
}
func (h *modifiedHookBase) OnConnect(cl *Client, pk packets.Packet) error {
if h.fail {
return errTestHook
}
return nil
}
func (h *modifiedHookBase) OnConnectAuthenticate(cl *Client, pk packets.Packet) bool {
return true
}
@@ -236,7 +228,7 @@ func TestHooksNonReturns(t *testing.T) {
h.OnStarted()
h.OnStopped()
h.OnSysInfoTick(new(system.Info))
h.OnSessionEstablish(cl, packets.Packet{})
h.OnConnect(cl, packets.Packet{})
h.OnSessionEstablished(cl, packets.Packet{})
h.OnDisconnect(cl, nil, false)
h.OnPacketSent(cl, packets.Packet{}, []byte{})
@@ -246,7 +238,6 @@ func TestHooksNonReturns(t *testing.T) {
h.OnPublished(cl, packets.Packet{})
h.OnPublishDropped(cl, packets.Packet{})
h.OnRetainMessage(cl, packets.Packet{}, 0)
h.OnRetainPublished(cl, packets.Packet{})
h.OnQosPublish(cl, packets.Packet{}, time.Now().Unix(), 0)
h.OnQosComplete(cl, packets.Packet{})
h.OnQosDropped(cl, packets.Packet{})
@@ -347,7 +338,7 @@ func TestHooksOnPublish(t *testing.T) {
// coverage: failure
hook.fail = true
pk, err = h.OnPublish(new(Client), packets.Packet{PacketID: 10})
require.Error(t, err)
require.NoError(t, err)
require.Equal(t, uint16(10), pk.PacketID)
// coverage: reject packet
@@ -402,22 +393,6 @@ func TestHooksOnAuthPacket(t *testing.T) {
require.Equal(t, uint16(10), pk.PacketID)
}
func TestHooksOnConnect(t *testing.T) {
h := new(Hooks)
h.Log = &logger
hook := new(modifiedHookBase)
err := h.Add(hook, nil)
require.NoError(t, err)
err = h.OnConnect(new(Client), packets.Packet{PacketID: 10})
require.NoError(t, err)
hook.fail = true
err = h.OnConnect(new(Client), packets.Packet{PacketID: 10})
require.Error(t, err)
}
func TestHooksOnPacketEncode(t *testing.T) {
h := new(Hooks)
h.Log = &logger
@@ -590,19 +565,12 @@ func TestHookBaseOnConnectAuthenticate(t *testing.T) {
v := h.OnConnectAuthenticate(new(Client), packets.Packet{})
require.False(t, v)
}
func TestHookBaseOnACLCheck(t *testing.T) {
h := new(HookBase)
v := h.OnACLCheck(new(Client), "topic", true)
require.False(t, v)
}
func TestHookBaseOnConnect(t *testing.T) {
h := new(HookBase)
err := h.OnConnect(new(Client), packets.Packet{})
require.NoError(t, err)
}
func TestHookBaseOnPublish(t *testing.T) {
h := new(HookBase)
pk, err := h.OnPublish(new(Client), packets.Packet{PacketID: 10})

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2023 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 J. Blake / mochi-co
// SPDX-FileContributor: mochi-co
package mqtt
@@ -9,7 +9,7 @@ import (
"sync"
"sync/atomic"
"github.com/mochi-mqtt/server/v2/packets"
"github.com/mochi-co/mqtt/v2/packets"
)
// Inflight is a map of InflightMessage keyed on packet id.

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileContributor: mochi-co
package mqtt
@@ -8,7 +8,7 @@ import (
"sync/atomic"
"testing"
"github.com/mochi-mqtt/server/v2/packets"
"github.com/mochi-co/mqtt/v2/packets"
"github.com/stretchr/testify/require"
)

View File

@@ -1,104 +0,0 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2023 mochi-mqtt, mochi-co
// SPDX-FileContributor: Derek Duncan
package listeners
import (
"context"
"net/http"
"sync"
"sync/atomic"
"time"
"github.com/rs/zerolog"
)
// HTTPHealthCheck is a listener for providing an HTTP healthcheck endpoint.
type HTTPHealthCheck struct {
sync.RWMutex
id string // the internal id of the listener
address string // the network address to bind to
config *Config // configuration values for the listener
listen *http.Server // the http server
log *zerolog.Logger // server logger
end uint32 // ensure the close methods are only called once
}
// NewHTTPHealthCheck initialises and returns a new HTTP listener, listening on an address.
func NewHTTPHealthCheck(id, address string, config *Config) *HTTPHealthCheck {
if config == nil {
config = new(Config)
}
return &HTTPHealthCheck{
id: id,
address: address,
config: config,
}
}
// ID returns the id of the listener.
func (l *HTTPHealthCheck) ID() string {
return l.id
}
// Address returns the address of the listener.
func (l *HTTPHealthCheck) Address() string {
return l.address
}
// Protocol returns the address of the listener.
func (l *HTTPHealthCheck) Protocol() string {
if l.listen != nil && l.listen.TLSConfig != nil {
return "https"
}
return "http"
}
// Init initializes the listener.
func (l *HTTPHealthCheck) Init(log *zerolog.Logger) error {
l.log = log
mux := http.NewServeMux()
mux.HandleFunc("/healthcheck", func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
w.WriteHeader(http.StatusMethodNotAllowed)
}
})
l.listen = &http.Server{
ReadTimeout: 5 * time.Second,
WriteTimeout: 5 * time.Second,
Addr: l.address,
Handler: mux,
}
if l.config.TLSConfig != nil {
l.listen.TLSConfig = l.config.TLSConfig
}
return nil
}
// Serve starts listening for new connections and serving responses.
func (l *HTTPHealthCheck) Serve(establish EstablishFn) {
if l.listen.TLSConfig != nil {
l.listen.ListenAndServeTLS("", "")
} else {
l.listen.ListenAndServe()
}
}
// Close closes the listener and any client connections.
func (l *HTTPHealthCheck) Close(closeClients CloseFn) {
l.Lock()
defer l.Unlock()
if atomic.CompareAndSwapUint32(&l.end, 0, 1) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
l.listen.Shutdown(ctx)
}
closeClients(l.id)
}

View File

@@ -1,143 +0,0 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2023 mochi-mqtt, mochi-co
// SPDX-FileContributor: Derek Duncan
package listeners
import (
"io"
"net/http"
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestNewHTTPHealthCheck(t *testing.T) {
l := NewHTTPHealthCheck("healthcheck", testAddr, nil)
require.Equal(t, "healthcheck", l.id)
require.Equal(t, testAddr, l.address)
}
func TestHTTPHealthCheckID(t *testing.T) {
l := NewHTTPHealthCheck("healthcheck", testAddr, nil)
require.Equal(t, "healthcheck", l.ID())
}
func TestHTTPHealthCheckAddress(t *testing.T) {
l := NewHTTPHealthCheck("healthcheck", testAddr, nil)
require.Equal(t, testAddr, l.Address())
}
func TestHTTPHealthCheckProtocol(t *testing.T) {
l := NewHTTPHealthCheck("healthcheck", testAddr, nil)
require.Equal(t, "http", l.Protocol())
}
func TestHTTPHealthCheckTLSProtocol(t *testing.T) {
l := NewHTTPHealthCheck("healthcheck", testAddr, &Config{
TLSConfig: tlsConfigBasic,
})
l.Init(nil)
require.Equal(t, "https", l.Protocol())
}
func TestHTTPHealthCheckInit(t *testing.T) {
l := NewHTTPHealthCheck("healthcheck", testAddr, nil)
err := l.Init(nil)
require.NoError(t, err)
require.NotNil(t, l.listen)
require.Equal(t, testAddr, l.listen.Addr)
}
func TestHTTPHealthCheckServeAndClose(t *testing.T) {
// setup http stats listener
l := NewHTTPHealthCheck("healthcheck", testAddr, nil)
err := l.Init(nil)
require.NoError(t, err)
o := make(chan bool)
go func(o chan bool) {
l.Serve(MockEstablisher)
o <- true
}(o)
time.Sleep(time.Millisecond)
// call healthcheck
resp, err := http.Get("http://localhost" + testAddr + "/healthcheck")
require.NoError(t, err)
require.NotNil(t, resp)
defer resp.Body.Close()
_, err = io.ReadAll(resp.Body)
require.NoError(t, err)
// ensure listening is closed
var closed bool
l.Close(func(id string) {
closed = true
})
require.Equal(t, true, closed)
_, err = http.Get("http://localhost/healthcheck" + testAddr + "/healthcheck")
require.Error(t, err)
<-o
}
func TestHTTPHealthCheckServeAndCloseMethodNotAllowed(t *testing.T) {
// setup http stats listener
l := NewHTTPHealthCheck("healthcheck", testAddr, nil)
err := l.Init(nil)
require.NoError(t, err)
o := make(chan bool)
go func(o chan bool) {
l.Serve(MockEstablisher)
o <- true
}(o)
time.Sleep(time.Millisecond)
// make disallowed method type http request
resp, err := http.Post("http://localhost"+testAddr+"/healthcheck", "application/json", http.NoBody)
require.NoError(t, err)
require.NotNil(t, resp)
defer resp.Body.Close()
_, err = io.ReadAll(resp.Body)
require.NoError(t, err)
// ensure listening is closed
var closed bool
l.Close(func(id string) {
closed = true
})
require.Equal(t, true, closed)
_, err = http.Post("http://localhost/healthcheck"+testAddr+"/healthcheck", "application/json", http.NoBody)
require.Error(t, err)
<-o
}
func TestHTTPHealthCheckServeTLSAndClose(t *testing.T) {
l := NewHTTPHealthCheck("healthcheck", testAddr, &Config{
TLSConfig: tlsConfigBasic,
})
err := l.Init(nil)
require.NoError(t, err)
o := make(chan bool)
go func(o chan bool) {
l.Serve(MockEstablisher)
o <- true
}(o)
time.Sleep(time.Millisecond)
l.Close(MockCloser)
}

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileContributor: mochi-co
package listeners
@@ -13,7 +13,7 @@ import (
"sync/atomic"
"time"
"github.com/mochi-mqtt/server/v2/system"
"github.com/mochi-co/mqtt/v2/system"
"github.com/rs/zerolog"
)

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileContributor: mochi-co
package listeners
@@ -11,7 +11,7 @@ import (
"testing"
"time"
"github.com/mochi-mqtt/server/v2/system"
"github.com/mochi-co/mqtt/v2/system"
"github.com/stretchr/testify/require"
)

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileContributor: mochi-co
package listeners

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileContributor: mochi-co
package listeners

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileContributor: mochi-co
package listeners

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileContributor: mochi-co
package listeners

View File

@@ -1,7 +1,3 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2023 mochi-mqtt, mochi-co
// SPDX-FileContributor: Jeroen Rinzema
package listeners
import (

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileContributor: mochi-co
package listeners

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileContributor: mochi-co
package listeners

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileContributor: jason@zgwit.com
package listeners

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileContributor: jason@zgwit.com
package listeners

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileContributor: mochi-co
package listeners
@@ -99,7 +99,7 @@ func (l *Websocket) handler(w http.ResponseWriter, r *http.Request) {
}
defer c.Close()
err = l.establish(l.id, &wsConn{Conn: c.UnderlyingConn(), c: c})
err = l.establish(l.id, &wsConn{c.UnderlyingConn(), c})
if err != nil {
l.log.Warn().Err(err).Send()
}
@@ -135,41 +135,25 @@ func (l *Websocket) Close(closeClients CloseFn) {
type wsConn struct {
net.Conn
c *websocket.Conn
// reader for the current message (may be nil)
r io.Reader
}
// Read reads the next span of bytes from the websocket connection and returns the number of bytes read.
func (ws *wsConn) Read(p []byte) (int, error) {
if ws.r == nil {
op, r, err := ws.c.NextReader()
if err != nil {
return 0, err
}
if op != websocket.BinaryMessage {
err = ErrInvalidMessage
return 0, err
}
ws.r = r
op, r, err := ws.c.NextReader()
if err != nil {
return 0, err
}
var n int
for {
// buffer is full, return what we've read so far
if n == len(p) {
return n, nil
}
if op != websocket.BinaryMessage {
err = ErrInvalidMessage
return 0, err
}
br, err := ws.r.Read(p[n:])
var n, br int
for {
br, err = r.Read(p[n:])
n += br
if err != nil {
// when ANY error occurs, we consider this the end of the current message (either because it really is, via
// io.EOF, or because something bad happened, in which case we want to drop the remainder)
ws.r = nil
if errors.Is(err, io.EOF) {
err = nil
}

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileContributor: mochi-co
package listeners
@@ -112,44 +112,3 @@ func TestWebsocketUpgrade(t *testing.T) {
s.Close()
ws.Close()
}
func TestWebsocketConnectionReads(t *testing.T) {
l := NewWebsocket("t1", testAddr, nil)
l.Init(nil)
recv := make(chan []byte)
l.establish = func(id string, c net.Conn) error {
var out []byte
for {
buf := make([]byte, 2048)
n, err := c.Read(buf)
require.NoError(t, err)
out = append(out, buf[:n]...)
if n < 2048 {
break
}
}
recv <- out
return nil
}
s := httptest.NewServer(http.HandlerFunc(l.handler))
ws, _, err := websocket.DefaultDialer.Dial("ws"+strings.TrimPrefix(s.URL, "http"), nil)
require.NoError(t, err)
pkt := make([]byte, 3000) // make sure this is >2048
for i := 0; i < len(pkt); i++ {
pkt[i] = byte(i % 100)
}
err = ws.WriteMessage(websocket.BinaryMessage, pkt)
require.NoError(t, err)
got := <-recv
require.Equal(t, 3000, len(got))
require.Equal(t, pkt, got)
s.Close()
ws.Close()
}

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileContributor: mochi-co
package packets

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileContributor: mochi-co
package packets

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileContributor: mochi-co
package packets
@@ -28,7 +28,6 @@ var (
2: CodeGrantedQos2,
}
CodeSuccessIgnore = Code{Code: 0x00, Reason: "ignore packet"}
CodeSuccess = Code{Code: 0x00, Reason: "success"}
CodeDisconnect = Code{Code: 0x00, Reason: "disconnected"}
CodeGrantedQos0 = Code{Code: 0x00, Reason: "granted qos 0"}

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileContributor: mochi-co
package packets

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileContributor: mochi-co
package packets

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileContributor: mochi-co
package packets

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileContributor: mochi-co
package packets
@@ -135,7 +135,6 @@ type Packet struct {
SessionPresent bool // session existed for connack
ReasonCode byte // reason code for a packet response (acks, etc)
ReservedBit byte // reserved, do not use (except in testing)
Ignore bool // if true, do not perform any message forwarding operations
}
// Mods specifies certain values required for certain mqtt v5 compliance within packet encoding/decoding.
@@ -176,7 +175,6 @@ type Subscription struct {
Qos byte
RetainAsPublished bool
NoLocal bool
FwdRetainedFlag bool // true if the subscription forms part of a publish response to a client subscription and packet is retained.
}
// Copy creates a new instance of a packet, but with an empty header for inheriting new QoS flags, etc.
@@ -389,7 +387,7 @@ func (pk *Packet) ConnectDecode(buf []byte) error {
offset += n
}
pk.Connect.ClientIdentifier, offset, err = decodeString(buf, offset) // [MQTT-3.1.3-1] [MQTT-3.1.3-2] [MQTT-3.1.3-3] [MQTT-3.1.3-4]
pk.Connect.ClientIdentifier, offset, err = decodeString(buf, offset) //[MQTT-3.1.3-1] [MQTT-3.1.3-2] [MQTT-3.1.3-3] [MQTT-3.1.3-4]
if err != nil {
return ErrClientIdentifierNotValid // [MQTT-3.1.3-8]
}

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2023 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 J. Blake / mochi-co
// SPDX-FileContributor: mochi-co
package packets

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileContributor: mochi-co
package packets

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileContributor: mochi-co
package packets

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileContributor: mochi-co
package packets
@@ -82,7 +82,6 @@ const (
TConnackAcceptedAdjustedExpiryInterval
TConnackMinMqtt5
TConnackMinCleanMqtt5
TConnackServerKeepalive
TConnackInvalidMinMqtt5
TConnackBadProtocolVersion
TConnackProtocolViolationNoSession
@@ -103,7 +102,6 @@ const (
TPublishBasicMqtt5
TPublishMqtt5
TPublishQos1
TPublishQos1Mqtt5
TPublishQos1NoPayload
TPublishQos1Dup
TPublishQos2
@@ -133,7 +131,6 @@ const (
TPubackMqtt5
TPubackMalPacketID
TPubackMalProperties
TPubackUnexpectedError
TPubrec
TPubrecMqtt5
TPubrecMqtt5IDInUse
@@ -1088,22 +1085,25 @@ var TPacketData = map[byte]TPacketCases{
Desc: "accepted, no session, adjusted expiry interval mqtt5",
Primary: true,
RawBytes: []byte{
Connack << 4, 8, // fixed header
Connack << 4, 11, // fixed header
0, // Session present
CodeSuccess.Code,
5, // length
8, // length
17, 0, 0, 0, 120, // Session Expiry Interval (17)
19, 0, 10, // Server Keep Alive (19)
},
Packet: &Packet{
ProtocolVersion: 5,
FixedHeader: FixedHeader{
Type: Connack,
Remaining: 8,
Remaining: 11,
},
ReasonCode: CodeSuccess.Code,
Properties: Properties{
SessionExpiryInterval: uint32(120),
SessionExpiryIntervalFlag: true,
ServerKeepAlive: uint16(10),
ServerKeepAliveFlag: true,
},
},
},
@@ -1190,25 +1190,28 @@ var TPacketData = map[byte]TPacketCases{
Desc: "accepted min properties mqtt5",
Primary: true,
RawBytes: []byte{
Connack << 4, 13, // fixed header
Connack << 4, 16, // fixed header
1, // existing session
CodeSuccess.Code,
10, // Properties length
13, // Properties length
18, 0, 5, 'm', 'o', 'c', 'h', 'i', // Assigned Client ID (18)
19, 0, 20, // Server Keep Alive (19)
36, 1, // Maximum Qos (36)
},
Packet: &Packet{
ProtocolVersion: 5,
FixedHeader: FixedHeader{
Type: Connack,
Remaining: 13,
Remaining: 16,
},
SessionPresent: true,
ReasonCode: CodeSuccess.Code,
Properties: Properties{
AssignedClientID: "mochi",
MaximumQos: byte(1),
MaximumQosFlag: true,
ServerKeepAlive: uint16(20),
ServerKeepAliveFlag: true,
AssignedClientID: "mochi",
MaximumQos: byte(1),
MaximumQosFlag: true,
},
},
},
@@ -1216,29 +1219,9 @@ var TPacketData = map[byte]TPacketCases{
Case: TConnackMinCleanMqtt5,
Desc: "accepted min properties mqtt5b",
Primary: true,
RawBytes: []byte{
Connack << 4, 3, // fixed header
0, // existing session
CodeSuccess.Code,
0, // Properties length
},
Packet: &Packet{
ProtocolVersion: 5,
FixedHeader: FixedHeader{
Type: Connack,
Remaining: 16,
},
SessionPresent: false,
ReasonCode: CodeSuccess.Code,
},
},
{
Case: TConnackServerKeepalive,
Desc: "server set keepalive",
Primary: true,
RawBytes: []byte{
Connack << 4, 6, // fixed header
1, // existing session
0, // existing session
CodeSuccess.Code,
3, // Properties length
19, 0, 10, // server keepalive
@@ -1247,9 +1230,9 @@ var TPacketData = map[byte]TPacketCases{
ProtocolVersion: 5,
FixedHeader: FixedHeader{
Type: Connack,
Remaining: 6,
Remaining: 16,
},
SessionPresent: true,
SessionPresent: false,
ReasonCode: CodeSuccess.Code,
Properties: Properties{
ServerKeepAlive: uint16(10),
@@ -1262,23 +1245,26 @@ var TPacketData = map[byte]TPacketCases{
Desc: "failure min properties mqtt5",
Primary: true,
RawBytes: append([]byte{
Connack << 4, 23, // fixed header
Connack << 4, 26, // fixed header
0, // No existing session
ErrUnspecifiedError.Code,
// Properties
20, // length
23, // length
19, 0, 20, // Server Keep Alive (19)
31, 0, 17, // Reason String (31)
}, []byte(ErrUnspecifiedError.Reason)...),
Packet: &Packet{
ProtocolVersion: 5,
FixedHeader: FixedHeader{
Type: Connack,
Remaining: 23,
Remaining: 25,
},
SessionPresent: false,
ReasonCode: ErrUnspecifiedError.Code,
Properties: Properties{
ReasonString: ErrUnspecifiedError.Reason,
ServerKeepAlive: uint16(20),
ServerKeepAliveFlag: true,
ReasonString: ErrUnspecifiedError.Reason,
},
},
},
@@ -1706,43 +1692,6 @@ var TPacketData = map[byte]TPacketCases{
PacketID: 7,
},
},
{
Case: TPublishQos1Mqtt5,
Desc: "mqtt v5",
Primary: true,
RawBytes: []byte{
Publish<<4 | 1<<1, 37, // Fixed header
0, 5, // Topic Name - LSB+MSB
'a', '/', 'b', '/', 'c', // Topic Name
0, 7, // Packet ID - LSB+MSB
// Properties
16, // length
38, // User Properties (38)
0, 5, 'h', 'e', 'l', 'l', 'o',
0, 6, 228, 184, 150, 231, 149, 140,
'h', 'e', 'l', 'l', 'o', ' ', 'm', 'o', 'c', 'h', 'i', // Payload
},
Packet: &Packet{
ProtocolVersion: 5,
FixedHeader: FixedHeader{
Type: Publish,
Remaining: 37,
Qos: 1,
},
PacketID: 7,
TopicName: "a/b/c",
Properties: Properties{
User: []UserProperty{
{
Key: "hello",
Val: "世界",
},
},
},
Payload: []byte("hello mochi"),
},
},
{
Case: TPublishQos1Dup,
Desc: "qos:1, dup:true, packet id",
@@ -2274,32 +2223,6 @@ var TPacketData = map[byte]TPacketCases{
},
},
},
{
Case: TPubackUnexpectedError,
Desc: "unexpected error",
Group: "decode",
RawBytes: []byte{
Puback << 4, 29, // Fixed header
0, 7, // Packet ID - LSB+MSB
ErrPayloadFormatInvalid.Code, // Reason Code
25, // Properties Length
31, 0, 22, 'p', 'a', 'y', 'l', 'o', 'a', 'd',
' ', 'f', 'o', 'r', 'm', 'a', 't',
' ', 'i', 'n', 'v', 'a', 'l', 'i', 'd', // Reason String (31)
},
Packet: &Packet{
ProtocolVersion: 5,
FixedHeader: FixedHeader{
Type: Puback,
Remaining: 28,
},
PacketID: 7,
ReasonCode: ErrPayloadFormatInvalid.Code,
Properties: Properties{
ReasonString: ErrPayloadFormatInvalid.Reason,
},
},
},
// Fail states
{
@@ -2381,17 +2304,14 @@ var TPacketData = map[byte]TPacketCases{
Desc: "packet id in use mqtt5",
Primary: true,
RawBytes: []byte{
Pubrec << 4, 47, // Fixed header
Pubrec << 4, 31, // Fixed header
0, 7, // Packet ID - LSB+MSB
ErrPacketIdentifierInUse.Code, // Reason Code
43, // Properties Length
27, // Properties Length
31, 0, 24, 'p', 'a', 'c', 'k', 'e', 't',
' ', 'i', 'd', 'e', 'n', 't', 'i', 'f', 'i', 'e', 'r',
' ', 'i', 'n',
' ', 'u', 's', 'e', // Reason String (31)
38, // User Properties (38)
0, 5, 'h', 'e', 'l', 'l', 'o',
0, 6, 228, 184, 150, 231, 149, 140,
},
Packet: &Packet{
ProtocolVersion: 5,
@@ -2403,12 +2323,6 @@ var TPacketData = map[byte]TPacketCases{
ReasonCode: ErrPacketIdentifierInUse.Code,
Properties: Properties{
ReasonString: ErrPacketIdentifierInUse.Reason,
User: []UserProperty{
{
Key: "hello",
Val: "世界",
},
},
},
},
},

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileContributor: mochi-co
package packets

120
server.go
View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileContributor: mochi-co
// package mqtt provides a high performance, fully compliant MQTT v5 broker server with v3.1.1 backward compatibility.
@@ -17,17 +17,17 @@ import (
"sync/atomic"
"time"
"github.com/mochi-mqtt/server/v2/hooks/storage"
"github.com/mochi-mqtt/server/v2/listeners"
"github.com/mochi-mqtt/server/v2/packets"
"github.com/mochi-mqtt/server/v2/system"
"github.com/mochi-co/mqtt/v2/hooks/storage"
"github.com/mochi-co/mqtt/v2/listeners"
"github.com/mochi-co/mqtt/v2/packets"
"github.com/mochi-co/mqtt/v2/system"
"github.com/rs/zerolog"
)
const (
Version = "2.3.0" // the current server version.
defaultSysTopicInterval int64 = 1 // the interval between $SYS topic publishes
Version = "2.2.8" // the current server version.
defaultSysTopicInterval int64 = 1 // the interval between $SYS topic publishes
)
var (
@@ -43,6 +43,7 @@ var (
WildcardSubAvailable: 1, // wildcard subscriptions are available
SubIDAvailable: 1, // subscription identifiers are available
SharedSubAvailable: 1, // shared subscriptions are available
ServerKeepAlive: 10, // default keepalive for clients
MinimumProtocolVersion: 3, // minimum supported mqtt version (3.0.0)
MaximumClientWritesPending: 1024 * 8, // maximum number of pending message writes for a client
}
@@ -60,6 +61,7 @@ type Capabilities struct {
maximumPacketID uint32 // unexported, used for testing only
ReceiveMaximum uint16
TopicAliasMaximum uint16
ServerKeepAlive uint16
SharedSubAvailable byte
MinimumProtocolVersion byte
Compatibilities Compatibilities
@@ -71,11 +73,10 @@ type Capabilities struct {
// Compatibilities provides flags for using compatibility modes.
type Compatibilities struct {
ObscureNotAuthorized bool // return unspecified errors instead of not authorized
PassiveClientDisconnect bool // don't disconnect the client forcefully after sending disconnect packet (paho - spec violation)
AlwaysReturnResponseInfo bool // always return response info (useful for testing)
RestoreSysInfoOnRestart bool // restore system info from store as if server never stopped
NoInheritedPropertiesOnAck bool // don't allow inherited user properties on ack (paho - spec violation)
ObscureNotAuthorized bool // return unspecified errors instead of not authorized
PassiveClientDisconnect bool // don't disconnect the client forcefully after sending disconnect packet (paho)
AlwaysReturnResponseInfo bool // always return response info (useful for testing)
RestoreSysInfoOnRestart bool // restore system info from store as if server never stopped
}
// Options contains configurable options for the server.
@@ -214,7 +215,7 @@ func (s *Server) NewClient(c net.Conn, listener string, id string, inline bool)
if inline { // inline clients bypass acl and some validity checks.
cl.Net.Inline = true
// By default, we don't want to restrict developer publishes,
// By default we don't want to restrict developer publishes,
// but if you do, reset this after creating inline client.
cl.State.Inflight.ResetReceiveQuota(math.MaxInt32)
} else {
@@ -323,20 +324,16 @@ func (s *Server) attachClient(cl *Client, listener string) error {
cl.ParseConnect(listener, pk)
code := s.validateConnect(cl, pk) // [MQTT-3.1.4-1] [MQTT-3.1.4-2]
if code != packets.CodeSuccess {
if err := s.SendConnack(cl, code, false, nil); err != nil {
if err := s.sendConnack(cl, code, false); err != nil {
return fmt.Errorf("invalid connection send ack: %w", err)
}
return code // [MQTT-3.2.2-7] [MQTT-3.1.4-6]
}
err = s.hooks.OnConnect(cl, pk)
if err != nil {
return err
}
s.hooks.OnConnect(cl, pk)
cl.refreshDeadline(cl.State.Keepalive)
if !s.hooks.OnConnectAuthenticate(cl, pk) { // [MQTT-3.1.4-2]
err := s.SendConnack(cl, packets.ErrBadUsernameOrPassword, false, nil)
err := s.sendConnack(cl, packets.ErrBadUsernameOrPassword, false)
if err != nil {
return fmt.Errorf("invalid connection send ack: %w", err)
}
@@ -347,12 +344,10 @@ func (s *Server) attachClient(cl *Client, listener string) error {
atomic.AddInt64(&s.Info.ClientsConnected, 1)
defer atomic.AddInt64(&s.Info.ClientsConnected, -1)
s.hooks.OnSessionEstablish(cl, pk)
sessionPresent := s.inheritClientSession(pk, cl)
s.Clients.Add(cl) // [MQTT-4.1.0-1]
err = s.SendConnack(cl, code, sessionPresent, nil) // [MQTT-3.1.4-5] [MQTT-3.2.0-1] [MQTT-3.2.0-2] &[MQTT-3.14.0-1]
err = s.sendConnack(cl, code, sessionPresent) // [MQTT-3.1.4-5] [MQTT-3.2.0-1] [MQTT-3.2.0-2] &[MQTT-3.14.0-1]
if err != nil {
return fmt.Errorf("ack connection packet: %w", err)
}
@@ -500,18 +495,12 @@ func (s *Server) inheritClientSession(pk packets.Packet, cl *Client) bool {
return false // [MQTT-3.2.2-2]
}
// SendConnack returns a Connack packet to a client.
func (s *Server) SendConnack(cl *Client, reason packets.Code, present bool, properties *packets.Properties) error {
if properties == nil {
properties = &packets.Properties{
ReceiveMaximum: s.Options.Capabilities.ReceiveMaximum,
}
}
properties.ReceiveMaximum = s.Options.Capabilities.ReceiveMaximum // 3.2.2.3.3 Receive Maximum
if cl.State.ServerKeepalive { // You can set this dynamically using the OnConnect hook.
properties.ServerKeepAlive = cl.State.Keepalive // [MQTT-3.1.2-21]
properties.ServerKeepAliveFlag = true
// sendConnack returns a Connack packet to a client.
func (s *Server) sendConnack(cl *Client, reason packets.Code, present bool) error {
properties := packets.Properties{
ServerKeepAlive: s.Options.Capabilities.ServerKeepAlive, // [MQTT-3.1.2-21]
ServerKeepAliveFlag: true,
ReceiveMaximum: s.Options.Capabilities.ReceiveMaximum, // 3.2.2.3.3 Receive Maximum
}
if reason.Code >= packets.ErrUnspecifiedError.Code {
@@ -528,8 +517,9 @@ func (s *Server) SendConnack(cl *Client, reason packets.Code, present bool, prop
},
SessionPresent: false, // [MQTT-3.2.2-6]
ReasonCode: reason.Code, // [MQTT-3.2.2-8]
Properties: *properties,
Properties: properties,
}
return cl.WritePacket(ack)
}
@@ -549,15 +539,14 @@ func (s *Server) SendConnack(cl *Client, reason packets.Code, present bool, prop
cl.Properties.Props.SessionExpiryIntervalFlag = true
}
ack := packets.Packet{
return cl.WritePacket(packets.Packet{
FixedHeader: packets.FixedHeader{
Type: packets.Connack,
},
SessionPresent: present,
ReasonCode: reason.Code, // [MQTT-3.2.2-8]
Properties: *properties,
}
return cl.WritePacket(ack)
Properties: properties,
})
}
// processPacket processes an inbound packet for a client. Since the method is
@@ -716,19 +705,10 @@ func (s *Server) processPublish(cl *Client, pk packets.Packet) error {
pk.FixedHeader.Qos = s.Options.Capabilities.MaximumQos // [MQTT-3.2.2-9] Reduce Qos based on server max qos capability
}
pkx, err := s.hooks.OnPublish(cl, pk)
if err == nil {
if pkx, err := s.hooks.OnPublish(cl, pk); err == nil {
pk = pkx
} else if errors.Is(err, packets.ErrRejectPacket) {
return nil
} else if errors.Is(err, packets.CodeSuccessIgnore) {
pk.Ignore = true
} else if cl.Properties.ProtocolVersion == 5 && pk.FixedHeader.Qos > 0 && errors.As(err, new(packets.Code)) {
err = cl.WritePacket(s.buildAck(pk.PacketID, packets.Puback, 0, pk.Properties, err.(packets.Code)))
if err != nil {
return err
}
return nil
}
if pk.FixedHeader.Retain { // [MQTT-3.3.1-5] ![MQTT-3.3.1-8]
@@ -752,7 +732,7 @@ func (s *Server) processPublish(cl *Client, pk packets.Packet) error {
s.hooks.OnQosPublish(cl, ack, ack.Created, 0)
}
err = cl.WritePacket(ack)
err := cl.WritePacket(ack)
if err != nil {
return err
}
@@ -772,12 +752,8 @@ func (s *Server) processPublish(cl *Client, pk packets.Packet) error {
}
// retainMessage adds a message to a topic, and if a persistent store is provided,
// adds the message to the store to be reloaded if necessary.
// adds the message to the store so it can be reloaded if necessary.
func (s *Server) retainMessage(cl *Client, pk packets.Packet) {
if s.Options.Capabilities.RetainAvailable == 0 || pk.Ignore {
return
}
out := pk.Copy(false)
r := s.Topics.RetainMessage(out)
s.hooks.OnRetainMessage(cl, pk, r)
@@ -786,10 +762,6 @@ func (s *Server) retainMessage(cl *Client, pk packets.Packet) {
// publishToSubscribers publishes a publish packet to all subscribers with matching topic filters.
func (s *Server) publishToSubscribers(pk packets.Packet) {
if pk.Ignore {
return
}
if pk.Created == 0 {
pk.Created = time.Now().Unix()
}
@@ -824,7 +796,7 @@ func (s *Server) publishToClient(cl *Client, sub packets.Subscription, pk packet
}
out := pk.Copy(false)
if !sub.FwdRetainedFlag && ((cl.Properties.ProtocolVersion == 5 && !sub.RetainAsPublished) || cl.Properties.ProtocolVersion < 5) { // ![MQTT-3.3.1-13] [v3 MQTT-3.3.1-9]
if cl.Properties.ProtocolVersion == 5 && !sub.RetainAsPublished { // ![MQTT-3.3.1-13]
out.FixedHeader.Retain = false // [MQTT-3.3.1-12]
}
@@ -836,10 +808,6 @@ func (s *Server) publishToClient(cl *Client, sub packets.Subscription, pk packet
sort.Ints(out.Properties.SubscriptionIdentifier)
}
if out.FixedHeader.Qos > sub.Qos {
out.FixedHeader.Qos = sub.Qos
}
if out.FixedHeader.Qos > s.Options.Capabilities.MaximumQos {
out.FixedHeader.Qos = s.Options.Capabilities.MaximumQos // [MQTT-3.2.2-9]
}
@@ -906,22 +874,17 @@ func (s *Server) publishRetainedToClient(cl *Client, sub packets.Subscription, e
return
}
sub.FwdRetainedFlag = true
for _, pkv := range s.Topics.Messages(sub.Filter) { // [MQTT-3.8.4-4]
_, err := s.publishToClient(cl, sub, pkv)
if err != nil {
s.Log.Debug().Err(err).Str("client", cl.ID).Str("listener", cl.Net.Listener).Interface("packet", pkv).Msg("failed to publish retained message")
continue
}
s.hooks.OnRetainPublished(cl, pkv)
}
}
// buildAck builds a standardised ack message for Puback, Pubrec, Pubrel, Pubcomp packets.
// buildAck builds an standardised ack message for Puback, Pubrec, Pubrel, Pubcomp packets.
func (s *Server) buildAck(packetID uint16, pkt, qos byte, properties packets.Properties, reason packets.Code) packets.Packet {
if s.Options.Capabilities.Compatibilities.NoInheritedPropertiesOnAck {
properties = packets.Properties{}
}
properties = packets.Properties{} // PRL
if reason.Code >= packets.ErrUnspecifiedError.Code {
properties.ReasonString = reason.Reason
}
@@ -1064,7 +1027,7 @@ func (s *Server) processSubscribe(cl *Client, pk packets.Packet) error {
}
}
ack := packets.Packet{ // [MQTT-3.8.4-1] [MQTT-3.8.4-5]
ack := packets.Packet{ //[MQTT-3.8.4-1] [MQTT-3.8.4-5]
FixedHeader: packets.FixedHeader{
Type: packets.Suback,
},
@@ -1160,7 +1123,7 @@ func (s *Server) UnsubscribeClient(cl *Client) {
filters[i] = v
i++
}
s.hooks.OnUnsubscribed(cl, packets.Packet{FixedHeader: packets.FixedHeader{Type: packets.Unsubscribe}, Filters: filters})
s.hooks.OnUnsubscribed(cl, packets.Packet{Filters: filters})
}
// processAuth processes an Auth packet.
@@ -1271,7 +1234,7 @@ func (s *Server) publishSysTopics() {
s.hooks.OnSysInfoTick(s.Info)
}
// Close attempts to gracefully shut down the server, all listeners, clients, and stores.
// Close attempts to gracefully shutdown the server, all listeners, clients, and stores.
func (s *Server) Close() error {
close(s.done)
s.Listeners.CloseAll(s.closeListenerClients)
@@ -1320,10 +1283,6 @@ func (s *Server) sendLWT(cl *Client) {
return
}
if pk.FixedHeader.Retain {
s.retainMessage(cl, pk)
}
s.publishToSubscribers(pk) // [MQTT-3.1.2-8]
atomic.StoreUint32(&cl.Properties.Will.Flag, 0) // [MQTT-3.1.2-10]
s.hooks.OnWillSent(cl, pk)
@@ -1516,9 +1475,6 @@ func (s *Server) sendDelayedLWT(dt int64) {
if dt > pk.Expiry {
s.publishToSubscribers(pk) // [MQTT-3.1.2-8]
if cl, ok := s.Clients.Get(id); ok {
if pk.FixedHeader.Retain {
s.retainMessage(cl, pk)
}
cl.Properties.Will = Will{} // [MQTT-3.1.2-10]
s.hooks.OnWillSent(cl, pk)
}

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileContributor: mochi-co
package mqtt
@@ -15,10 +15,10 @@ import (
"testing"
"time"
"github.com/mochi-mqtt/server/v2/hooks/storage"
"github.com/mochi-mqtt/server/v2/listeners"
"github.com/mochi-mqtt/server/v2/packets"
"github.com/mochi-mqtt/server/v2/system"
"github.com/mochi-co/mqtt/v2/hooks/storage"
"github.com/mochi-co/mqtt/v2/listeners"
"github.com/mochi-co/mqtt/v2/packets"
"github.com/mochi-co/mqtt/v2/system"
"github.com/rs/zerolog"
"github.com/stretchr/testify/require"
@@ -127,7 +127,7 @@ func TestServerNewClient(t *testing.T) {
require.NotNil(t, cl.State.Inflight.internal)
require.NotNil(t, cl.State.Subscriptions)
require.NotNil(t, cl.State.TopicAliases)
require.Equal(t, defaultKeepalive, cl.State.Keepalive)
require.Equal(t, defaultKeepalive, cl.State.keepalive)
require.Equal(t, defaultClientProtocolVersion, cl.Properties.ProtocolVersion)
require.NotNil(t, cl.Net.Conn)
require.NotNil(t, cl.Net.bconn)
@@ -478,7 +478,7 @@ func TestEstablishConnectionInheritExisting(t *testing.T) {
require.Empty(t, cl.State.Subscriptions.GetAll())
}
// See https://github.com/mochi-mqtt/server/issues/173
// See https://github.com/mochi-co/mqtt/issues/173
func TestEstablishConnectionInheritExistingTrueTakeover(t *testing.T) {
s := newServer()
d := new(DelayHook)
@@ -748,7 +748,7 @@ func TestServerEstablishConnectionInvalidConnect(t *testing.T) {
r.Close()
}
// See https://github.com/mochi-mqtt/server/issues/178
// See https://github.com/mochi-co/mqtt/issues/178
func TestServerEstablishConnectionZeroByteUsernameIsValid(t *testing.T) {
s := newServer()
@@ -817,40 +817,17 @@ func TestServerEstablishConnectionBadPacket(t *testing.T) {
r.Close()
}
func TestServerEstablishConnectionOnConnectError(t *testing.T) {
s := newServer()
hook := new(modifiedHookBase)
hook.fail = true
err := s.AddHook(hook, nil)
require.NoError(t, err)
r, w := net.Pipe()
o := make(chan error)
go func() {
o <- s.EstablishConnection("tcp", r)
}()
go func() {
w.Write(packets.TPacketData[packets.Connect].Get(packets.TConnectClean).RawBytes)
}()
err = <-o
require.Error(t, err)
require.ErrorIs(t, err, errTestHook)
r.Close()
}
func TestServerSendConnack(t *testing.T) {
s := newServer()
cl, r, w := newTestClient()
cl.Properties.ProtocolVersion = 5
s.Options.Capabilities.ServerKeepAlive = 20
s.Options.Capabilities.MaximumQos = 1
cl.Properties.Props = packets.Properties{
AssignedClientID: "mochi",
}
go func() {
err := s.SendConnack(cl, packets.CodeSuccess, true, nil)
err := s.sendConnack(cl, packets.CodeSuccess, true)
require.NoError(t, err)
w.Close()
}()
@@ -864,8 +841,9 @@ func TestServerSendConnackFailureReason(t *testing.T) {
s := newServer()
cl, r, w := newTestClient()
cl.Properties.ProtocolVersion = 5
s.Options.Capabilities.ServerKeepAlive = 20
go func() {
err := s.SendConnack(cl, packets.ErrUnspecifiedError, true, nil)
err := s.sendConnack(cl, packets.ErrUnspecifiedError, true)
require.NoError(t, err)
w.Close()
}()
@@ -875,23 +853,6 @@ func TestServerSendConnackFailureReason(t *testing.T) {
require.Equal(t, packets.TPacketData[packets.Connack].Get(packets.TConnackInvalidMinMqtt5).RawBytes, buf)
}
func TestServerSendConnackWithServerKeepalive(t *testing.T) {
s := newServer()
cl, r, w := newTestClient()
cl.Properties.ProtocolVersion = 5
cl.State.Keepalive = 10
cl.State.ServerKeepalive = true
go func() {
err := s.SendConnack(cl, packets.CodeSuccess, true, nil)
require.NoError(t, err)
w.Close()
}()
buf, err := io.ReadAll(r)
require.NoError(t, err)
require.Equal(t, packets.TPacketData[packets.Connack].Get(packets.TConnackServerKeepalive).RawBytes, buf)
}
func TestServerValidateConnect(t *testing.T) {
packet := *packets.TPacketData[packets.Connect].Get(packets.TConnectMqtt5).Packet
invalidBitPacket := packet
@@ -961,7 +922,7 @@ func TestServerSendConnackAdjustedExpiryInterval(t *testing.T) {
cl.Properties.Props.SessionExpiryInterval = uint32(300)
s.Options.Capabilities.MaximumSessionExpiryInterval = 120
go func() {
err := s.SendConnack(cl, packets.CodeSuccess, false, nil)
err := s.sendConnack(cl, packets.CodeSuccess, false)
require.NoError(t, err)
w.Close()
}()
@@ -1194,54 +1155,10 @@ func TestServerProcessPacketPublishAndReceive(t *testing.T) {
w2.Close()
}()
require.Equal(t, packets.TPacketData[packets.Publish].Get(packets.TPublishBasic).RawBytes, <-receiverBuf)
require.Equal(t, packets.TPacketData[packets.Publish].Get(packets.TPublishRetain).RawBytes, <-receiverBuf)
require.Equal(t, 1, len(s.Topics.Messages("a/b/c")))
}
func TestServerBuildAck(t *testing.T) {
s := newServer()
properties := packets.Properties{
User: []packets.UserProperty{
{Key: "hello", Val: "世界"},
},
}
ack := s.buildAck(7, packets.Puback, 1, properties, packets.CodeGrantedQos1)
require.Equal(t, packets.Puback, ack.FixedHeader.Type)
require.Equal(t, uint8(1), ack.FixedHeader.Qos)
require.Equal(t, packets.CodeGrantedQos1.Code, ack.ReasonCode)
require.Equal(t, properties, ack.Properties)
}
func TestServerBuildAckError(t *testing.T) {
s := newServer()
properties := packets.Properties{
User: []packets.UserProperty{
{Key: "hello", Val: "世界"},
},
}
ack := s.buildAck(7, packets.Puback, 1, properties, packets.ErrMalformedPacket)
require.Equal(t, packets.Puback, ack.FixedHeader.Type)
require.Equal(t, uint8(1), ack.FixedHeader.Qos)
require.Equal(t, packets.ErrMalformedPacket.Code, ack.ReasonCode)
properties.ReasonString = packets.ErrMalformedPacket.Reason
require.Equal(t, properties, ack.Properties)
}
func TestServerBuildAckPahoCompatibility(t *testing.T) {
s := newServer()
s.Options.Capabilities.Compatibilities.NoInheritedPropertiesOnAck = true
properties := packets.Properties{
User: []packets.UserProperty{
{Key: "hello", Val: "世界"},
},
}
ack := s.buildAck(7, packets.Puback, 1, properties, packets.CodeGrantedQos1)
require.Equal(t, packets.Puback, ack.FixedHeader.Type)
require.Equal(t, uint8(1), ack.FixedHeader.Qos)
require.Equal(t, packets.CodeGrantedQos1.Code, ack.ReasonCode)
require.Equal(t, packets.Properties{}, ack.Properties)
}
func TestServerProcessPacketAndNextImmediate(t *testing.T) {
s := newServer()
cl, r, w := newTestClient()
@@ -1266,7 +1183,7 @@ func TestServerProcessPacketAndNextImmediate(t *testing.T) {
require.Equal(t, int32(4), cl.State.Inflight.sendQuota)
}
func TestServerProcessPublishAckFailure(t *testing.T) {
func TestServerProcessPacketPublishAckFailure(t *testing.T) {
s := newServer()
s.Serve()
defer s.Close()
@@ -1280,92 +1197,6 @@ func TestServerProcessPublishAckFailure(t *testing.T) {
require.ErrorIs(t, err, io.ErrClosedPipe)
}
func TestServerProcessPublishOnPublishAckErrorRWError(t *testing.T) {
s := newServer()
hook := new(modifiedHookBase)
hook.fail = true
hook.err = packets.ErrUnspecifiedError
err := s.AddHook(hook, nil)
require.NoError(t,err)
cl, _, w := newTestClient()
cl.Properties.ProtocolVersion = 5
s.Clients.Add(cl)
w.Close()
err = s.processPublish(cl, *packets.TPacketData[packets.Publish].Get(packets.TPublishQos1).Packet)
require.Error(t, err)
require.ErrorIs(t, err, io.ErrClosedPipe)
}
func TestServerProcessPublishOnPublishAckErrorContinue(t *testing.T) {
s := newServer()
hook := new(modifiedHookBase)
hook.fail = true
hook.err = packets.ErrPayloadFormatInvalid
err := s.AddHook(hook, nil)
require.NoError(t,err)
s.Serve()
defer s.Close()
cl, r, w := newTestClient()
cl.Properties.ProtocolVersion = 5
s.Clients.Add(cl)
go func() {
err := s.processPacket(cl, *packets.TPacketData[packets.Publish].Get(packets.TPublishQos1).Packet)
require.NoError(t, err)
w.Close()
}()
buf, err := io.ReadAll(r)
require.NoError(t, err)
require.Equal(t, packets.TPacketData[packets.Puback].Get(packets.TPubackUnexpectedError).RawBytes, buf)
}
func TestServerProcessPublishOnPublishPkIgnore(t *testing.T) {
s := newServer()
hook := new(modifiedHookBase)
hook.fail = true
hook.err = packets.CodeSuccessIgnore
err := s.AddHook(hook, nil)
require.NoError(t,err)
s.Serve()
defer s.Close()
cl, r, w := newTestClient()
s.Clients.Add(cl)
receiver, r2, w2 := newTestClient()
receiver.ID = "receiver"
s.Clients.Add(receiver)
s.Topics.Subscribe(receiver.ID, packets.Subscription{Filter: "a/b/c"})
require.Equal(t, int64(0), atomic.LoadInt64(&s.Info.PacketsReceived))
require.Equal(t, 0, len(s.Topics.Messages("a/b/c")))
receiverBuf := make(chan []byte)
go func() {
buf, err := io.ReadAll(r2)
require.NoError(t, err)
receiverBuf <- buf
}()
go func() {
err := s.processPacket(cl, *packets.TPacketData[packets.Publish].Get(packets.TPublishQos1).Packet)
require.NoError(t, err)
w.Close()
w2.Close()
}()
buf, err := io.ReadAll(r)
require.NoError(t, err)
require.Equal(t, packets.TPacketData[packets.Puback].Get(packets.TPuback).RawBytes, buf)
require.Equal(t, []byte{}, <-receiverBuf)
require.Equal(t, 0, len(s.Topics.Messages("a/b/c")))
}
func TestServerProcessPacketPublishMaximumReceive(t *testing.T) {
s := newServer()
s.Serve()
@@ -1523,7 +1354,6 @@ func TestServerProcessPacketPublishDowngradeQos(t *testing.T) {
require.Equal(t, packets.TPacketData[packets.Puback].Get(packets.TPuback).RawBytes, buf)
}
func TestPublishToSubscribersSelfNoLocal(t *testing.T) {
s := newServer()
cl, r, w := newTestClient()
@@ -1668,32 +1498,6 @@ func TestPublishToSubscribersIdentifiers(t *testing.T) {
require.Equal(t, packets.TPacketData[packets.Publish].Get(packets.TPublishSubscriberIdentifier).RawBytes, <-receiverBuf)
}
func TestPublishToSubscribersPkIgnore(t *testing.T) {
s := newServer()
cl, r, w := newTestClient()
s.Clients.Add(cl)
subbed := s.Topics.Subscribe(cl.ID, packets.Subscription{Filter: "#", Identifier: 1})
require.True(t, subbed)
go func() {
pk := *packets.TPacketData[packets.Publish].Get(packets.TPublishBasic).Packet
pk.Ignore = true
s.publishToSubscribers(pk)
time.Sleep(time.Millisecond)
w.Close()
}()
receiverBuf := make(chan []byte)
go func() {
buf, err := io.ReadAll(r)
require.NoError(t, err)
receiverBuf <- buf
}()
require.Equal(t, []byte{}, <-receiverBuf)
}
func TestPublishToClientServerDowngradeQos(t *testing.T) {
s := newServer()
s.Options.Capabilities.MaximumQos = 1
@@ -1723,35 +1527,6 @@ func TestPublishToClientServerDowngradeQos(t *testing.T) {
require.Equal(t, packets.TPacketData[packets.Publish].Get(packets.TPublishQos1).RawBytes, <-receiverBuf)
}
func TestPublishToClientSubscriptionDowngradeQos(t *testing.T) {
s := newServer()
s.Options.Capabilities.MaximumQos = 2
cl, r, w := newTestClient()
s.Clients.Add(cl)
_, ok := cl.State.Inflight.Get(1)
require.False(t, ok)
cl.State.packetID = 6 // just to match the same packet id (7) in the fixtures
go func() {
pkx := *packets.TPacketData[packets.Publish].Get(packets.TPublishQos1).Packet
pkx.FixedHeader.Qos = 2
s.publishToClient(cl, packets.Subscription{Filter: "a/b/c", Qos: 1}, pkx)
time.Sleep(time.Microsecond * 100)
w.Close()
}()
receiverBuf := make(chan []byte)
go func() {
buf, err := io.ReadAll(r)
require.NoError(t, err)
receiverBuf <- buf
}()
require.Equal(t, packets.TPacketData[packets.Publish].Get(packets.TPublishQos1).RawBytes, <-receiverBuf)
}
func TestPublishToClientExceedClientWritesPending(t *testing.T) {
s := newServer()
@@ -1809,29 +1584,6 @@ func TestPublishToClientServerTopicAlias(t *testing.T) {
require.Equal(t, append(pk1, pk2...), ret)
}
func TestPublishToClientMqtt3RetainFalseLeverageNoConn(t *testing.T) {
s := newServer()
cl, _, _ := newTestClient()
cl.Net.Conn = nil
out, err := s.publishToClient(cl, packets.Subscription{Filter: "a/b/c", RetainAsPublished: true}, *packets.TPacketData[packets.Publish].Get(packets.TPublishRetain).Packet)
require.False(t, out.FixedHeader.Retain)
require.Error(t, err)
require.ErrorIs(t, err, packets.CodeDisconnect)
}
func TestPublishToClientMqtt5RetainAsPublishedTrueLeverageNoConn(t *testing.T) {
s := newServer()
cl, _, _ := newTestClient()
cl.Properties.ProtocolVersion = 5
cl.Net.Conn = nil
out, err := s.publishToClient(cl, packets.Subscription{Filter: "a/b/c", RetainAsPublished: true}, *packets.TPacketData[packets.Publish].Get(packets.TPublishRetain).Packet)
require.True(t, out.FixedHeader.Retain)
require.Error(t, err)
require.ErrorIs(t, err, packets.CodeDisconnect)
}
func TestPublishToClientExhaustedPacketID(t *testing.T) {
s := newServer()
cl, _, _ := newTestClient()
@@ -1839,7 +1591,7 @@ func TestPublishToClientExhaustedPacketID(t *testing.T) {
cl.State.Inflight.Set(packets.Packet{PacketID: uint16(i)})
}
_, err := s.publishToClient(cl, packets.Subscription{Filter: "a/b/c", Qos: 1}, *packets.TPacketData[packets.Publish].Get(packets.TPublishQos1).Packet)
_, err := s.publishToClient(cl, packets.Subscription{Filter: "a/b/c"}, *packets.TPacketData[packets.Publish].Get(packets.TPublishQos1).Packet)
require.Error(t, err)
require.ErrorIs(t, err, packets.ErrQuotaExceeded)
}
@@ -1993,37 +1745,6 @@ func TestPublishRetainedToClientError(t *testing.T) {
s.publishRetainedToClient(cl, sub, false)
}
func TestNoRetainMessageIfUnavailable(t *testing.T) {
s := newServer()
s.Options.Capabilities.RetainAvailable = 0
cl, _, _ := newTestClient()
s.Clients.Add(cl)
s.retainMessage(new(Client), *packets.TPacketData[packets.Publish].Get(packets.TPublishRetain).Packet)
require.Equal(t, int64(0), atomic.LoadInt64(&s.Info.Retained))
}
func TestNoRetainMessageIfPkIgnore(t *testing.T) {
s := newServer()
cl, _, _ := newTestClient()
s.Clients.Add(cl)
pk := *packets.TPacketData[packets.Publish].Get(packets.TPublishRetain).Packet
pk.Ignore = true
s.retainMessage(new(Client), pk)
require.Equal(t, int64(0), atomic.LoadInt64(&s.Info.Retained))
}
func TestNoRetainMessage(t *testing.T) {
s := newServer()
cl, _, _ := newTestClient()
s.Clients.Add(cl)
s.retainMessage(new(Client), *packets.TPacketData[packets.Publish].Get(packets.TPublishRetain).Packet)
require.Equal(t, int64(1), atomic.LoadInt64(&s.Info.Retained))
}
func TestServerProcessPacketPuback(t *testing.T) {
tt := ProtocolTest{
{
@@ -2850,46 +2571,6 @@ func TestServerSendLWT(t *testing.T) {
require.Equal(t, packets.TPacketData[packets.Publish].Get(packets.TPublishBasic).RawBytes, <-receiverBuf)
}
func TestServerSendLWTRetain(t *testing.T) {
s := newServer()
s.Serve()
defer s.Close()
sender, _, w1 := newTestClient()
sender.ID = "sender"
sender.Properties.Will = Will{
Flag: 1,
TopicName: "a/b/c",
Payload: []byte("hello mochi"),
Retain: true,
}
s.Clients.Add(sender)
receiver, r2, w2 := newTestClient()
receiver.ID = "receiver"
s.Clients.Add(receiver)
s.Topics.Subscribe(receiver.ID, packets.Subscription{Filter: "a/b/c", Qos: 0})
require.Equal(t, int64(0), atomic.LoadInt64(&s.Info.PacketsReceived))
require.Equal(t, 0, len(s.Topics.Messages("a/b/c")))
receiverBuf := make(chan []byte)
go func() {
buf, err := io.ReadAll(r2)
require.NoError(t, err)
receiverBuf <- buf
}()
go func() {
s.sendLWT(sender)
time.Sleep(time.Millisecond * 10)
w1.Close()
w2.Close()
}()
require.Equal(t, packets.TPacketData[packets.Publish].Get(packets.TPublishBasic).RawBytes, <-receiverBuf)
}
func TestServerSendLWTDelayed(t *testing.T) {
s := newServer()
cl1, _, _ := newTestClient()
@@ -2928,7 +2609,7 @@ func TestServerSendLWTDelayed(t *testing.T) {
recv <- buf
}()
require.Equal(t, packets.TPacketData[packets.Publish].Get(packets.TPublishBasic).RawBytes, <-recv)
require.Equal(t, packets.TPacketData[packets.Publish].Get(packets.TPublishRetain).RawBytes, <-recv)
}
func TestServerReadStore(t *testing.T) {
@@ -3125,7 +2806,7 @@ func TestServerClearExpiredClients(t *testing.T) {
cl0, _, _ := newTestClient()
cl0.ID = "c0"
cl0.State.disconnected = n - 10
cl0.State.cancelOpen()
cl0.State.open = nil
cl0.Properties.ProtocolVersion = 5
cl0.Properties.Props.SessionExpiryInterval = 12
cl0.Properties.Props.SessionExpiryIntervalFlag = true
@@ -3135,7 +2816,7 @@ func TestServerClearExpiredClients(t *testing.T) {
cl1, _, _ := newTestClient()
cl1.ID = "c1"
cl1.State.disconnected = n - 10
cl1.State.cancelOpen()
cl1.State.open = nil
cl1.Properties.ProtocolVersion = 5
cl1.Properties.Props.SessionExpiryInterval = 8
cl1.Properties.Props.SessionExpiryIntervalFlag = true
@@ -3145,7 +2826,7 @@ func TestServerClearExpiredClients(t *testing.T) {
cl2, _, _ := newTestClient()
cl2.ID = "c2"
cl2.State.disconnected = n - 10
cl2.State.cancelOpen()
cl2.State.open = nil
cl2.Properties.ProtocolVersion = 5
cl2.Properties.Props.SessionExpiryInterval = 0
cl2.Properties.Props.SessionExpiryIntervalFlag = true

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileContributor: mochi-co
package system

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2023 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 J. Blake / mochi-co
// SPDX-FileContributor: mochi-co
package mqtt
@@ -9,7 +9,7 @@ import (
"sync"
"sync/atomic"
"github.com/mochi-mqtt/server/v2/packets"
"github.com/mochi-co/mqtt/v2/packets"
)
var (
@@ -327,9 +327,7 @@ func (x *TopicsIndex) Unsubscribe(filter, client string) bool {
defer x.root.Unlock()
var d int
prefix, _ := isolateParticle(filter, 0)
shareSub := strings.EqualFold(prefix, SharePrefix)
if shareSub {
if strings.HasPrefix(filter, SharePrefix) {
d = 2
}
@@ -338,7 +336,8 @@ func (x *TopicsIndex) Unsubscribe(filter, client string) bool {
return false
}
if shareSub {
prefix, _ := isolateParticle(filter, 0)
if strings.EqualFold(prefix, SharePrefix) {
group, _ := isolateParticle(filter, 1)
particle.shared.Delete(group, client)
} else {
@@ -501,27 +500,20 @@ func (x *TopicsIndex) scanSubscribers(topic string, d int, n *particle, subs *Su
}
key, hasNext := isolateParticle(topic, d)
for _, partKey := range []string{key, "+"} {
for _, partKey := range []string{key, "+", "#"} {
if particle := n.particles.get(partKey); particle != nil { // [MQTT-3.3.2-3]
x.gatherSubscriptions(topic, particle, subs)
x.gatherSharedSubscriptions(particle, subs)
if wild := particle.particles.get("#"); wild != nil && partKey != "#" && partKey != "+" {
x.gatherSubscriptions(topic, wild, subs) // also match any subs where filter/# is filter as per 4.7.1.2
}
if hasNext {
x.scanSubscribers(topic, d+1, particle, subs)
} else {
x.gatherSubscriptions(topic, particle, subs)
x.gatherSharedSubscriptions(particle, subs)
if wild := particle.particles.get("#"); wild != nil && partKey != "+" {
x.gatherSubscriptions(topic, wild, subs) // also match any subs where filter/# is filter as per 4.7.1.2
x.gatherSharedSubscriptions(wild, subs)
}
}
}
}
if particle := n.particles.get("#"); particle != nil {
x.gatherSubscriptions(topic, particle, subs)
x.gatherSharedSubscriptions(particle, subs)
}
return subs
}

View File

@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2023 mochi-mqtt, mochi-co
// SPDX-FileCopyrightText: 2022 J. Blake / mochi-co
// SPDX-FileContributor: mochi-co
package mqtt
@@ -7,7 +7,7 @@ package mqtt
import (
"testing"
"github.com/mochi-mqtt/server/v2/packets"
"github.com/mochi-co/mqtt/v2/packets"
"github.com/stretchr/testify/require"
)
@@ -319,7 +319,7 @@ func TestUnsubscribeShared(t *testing.T) {
require.True(t, exists)
require.Equal(t, byte(2), client.Qos)
require.True(t, index.Unsubscribe("$share/tmp/a/b/c", "cl1"))
require.True(t, index.Unsubscribe("$SHARE/tmp/a/b/c", "cl1"))
_, exists = final.shared.Get("tmp", "cl1")
require.False(t, exists)
}
@@ -501,40 +501,28 @@ func TestScanSubscribers(t *testing.T) {
index.Subscribe("cl2", packets.Subscription{Qos: 0, Filter: "$SYS/test", Identifier: 2})
subs := index.scanSubscribers("a/b/c", 0, nil, new(Subscribers))
require.Equal(t, 3, len(subs.Subscriptions))
require.Equal(t, 4, len(subs.Subscriptions))
require.Contains(t, subs.Subscriptions, "cl1")
require.Contains(t, subs.Subscriptions, "cl2")
require.Contains(t, subs.Subscriptions, "cl3")
require.Contains(t, subs.Subscriptions, "cl4")
require.Equal(t, byte(1), subs.Subscriptions["cl1"].Qos)
require.Equal(t, byte(2), subs.Subscriptions["cl2"].Qos)
require.Equal(t, byte(1), subs.Subscriptions["cl3"].Qos)
require.Equal(t, byte(0), subs.Subscriptions["cl4"].Qos)
require.Equal(t, 22, subs.Subscriptions["cl1"].Identifiers["a/b/c"])
require.Equal(t, 0, subs.Subscriptions["cl2"].Identifiers["a/#"])
require.Equal(t, 77, subs.Subscriptions["cl2"].Identifiers["a/b/+"])
require.Equal(t, 0, subs.Subscriptions["cl2"].Identifiers["a/b/c"])
require.Equal(t, 5, subs.Subscriptions["cl4"].Identifiers["#"])
subs = index.scanSubscribers("d/e/f/g", 0, nil, new(Subscribers))
require.Equal(t, 1, len(subs.Subscriptions))
require.Contains(t, subs.Subscriptions, "cl4")
require.Equal(t, byte(0), subs.Subscriptions["cl4"].Qos)
require.Equal(t, 234, subs.Subscriptions["cl3"].Identifiers["+/b"])
require.Equal(t, 5, subs.Subscriptions["cl4"].Identifiers["#"])
subs = index.scanSubscribers("", 0, nil, new(Subscribers))
require.Equal(t, 0, len(subs.Subscriptions))
}
func TestScanSubscribersTopicInheritanceBug(t *testing.T) {
index := NewTopicsIndex()
index.Subscribe("cl1", packets.Subscription{Qos: 0, Filter: "a/b/c"})
index.Subscribe("cl2", packets.Subscription{Qos: 0, Filter: "a/b"})
subs := index.scanSubscribers("a/b/c", 0, nil, new(Subscribers))
require.Equal(t, 1, len(subs.Subscriptions))
}
func TestScanSubscribersShared(t *testing.T) {
index := NewTopicsIndex()
index.Subscribe("cl1", packets.Subscription{Qos: 1, Filter: SharePrefix + "/tmp/a/b/c", Identifier: 111})
@@ -543,9 +531,8 @@ func TestScanSubscribersShared(t *testing.T) {
index.Subscribe("cl2", packets.Subscription{Qos: 0, Filter: SharePrefix + "/tmp/a/b/+", Identifier: 10})
index.Subscribe("cl3", packets.Subscription{Qos: 1, Filter: SharePrefix + "/tmp/a/b/+", Identifier: 200})
index.Subscribe("cl4", packets.Subscription{Qos: 0, Filter: SharePrefix + "/tmp/a/b/+", Identifier: 201})
index.Subscribe("cl5", packets.Subscription{Qos: 0, Filter: SharePrefix + "/tmp/a/b/c/#"})
subs := index.scanSubscribers("a/b/c", 0, nil, new(Subscribers))
require.Equal(t, 4, len(subs.Shared))
require.Equal(t, 3, len(subs.Shared))
}
func TestSelectSharedSubscriber(t *testing.T) {