Compare commits

...

25 Commits

Author SHA1 Message Date
mochi
b277600823 Increment server version to 1.0.1 2022-01-05 21:14:11 +00:00
JB
685c050fdd Merge pull request #11 from mochi-co/feature/event-hooks-publish
Feature/event hooks publish
2022-01-05 21:13:01 +00:00
mochi
0abbaf5070 fix onmessage test 2022-01-05 21:09:12 +00:00
mochi
1ab1928cff change scheduled message for clarity 2022-01-05 21:05:31 +00:00
mochi
8890bb9dd4 remove redundant code 2022-01-05 21:05:20 +00:00
mochi
f9348aaf93 Update Readme to add Event Hooks section 2022-01-05 20:59:25 +00:00
mochi
c2a42a16ca Merge OnMessage and OnMessageModify 2022-01-05 20:59:14 +00:00
mochi
d14d944de9 Update events example with publish hooks 2022-01-05 20:38:32 +00:00
mochi
480e60b3f0 Adds tests for publishing event hooks 2022-01-05 20:38:10 +00:00
mochi
d4cbf1abdc Add Event Hooks
Adds basic event hooks (OnMessage, OnMessageModify) to the server using the new events library.
2022-01-05 20:38:00 +00:00
mochi
8a1c53432e Add Events
Events library contains event hook types and related utility functions
2022-01-05 20:37:15 +00:00
mochi
7c7b8d58fe Return packets to internal
Now that we can alias types, there's no compelling reason to expose the packets library
2022-01-05 18:10:24 +00:00
JB
ce773b3978 Merge pull request #10 from mochi-co/expose-packets
Expose packets library
2022-01-05 17:06:36 +00:00
JB
f3e7469478 Merge pull request #8 from mochi-co/feature/inline-publish
Inline Publishing
2022-01-05 17:02:14 +00:00
mochi
b5685ca0ee update packets library import reference 2022-01-05 17:01:15 +00:00
mochi
66edb0564c expose packets library 2022-01-05 17:00:51 +00:00
mochi
1d9fa4199c Add .DS_Store to ignore list 2022-01-05 17:00:17 +00:00
mochi
dec880231d Update with direct publishing
Adds information about direct publishing and moves performance section
2022-01-05 13:49:41 +00:00
mochi
21d4e54e74 Add inline publishing example
Adds an example file which demonstrates the usage of the `Publish` method. This file will also be used to demonstrate event hooks.
2022-01-05 13:48:17 +00:00
mochi
aeb4190733 Add tests for new inline publishing method 2022-01-05 13:32:28 +00:00
mochi
484e4abd56 Directly publish messages from embedding system
When the broker is embedded in a larger Go codebase, it is beneficial to be able to publish messages directly from the system to topics. This change provides a Publish method which adds messages to an inline publishing queue in a separate goroutine, which are then processed in the standard way and issued to all clients with matching topic filters.
2022-01-05 13:32:12 +00:00
mochi
d51bad30fc Update comments and rename input parameter for clarity 2022-01-05 13:14:50 +00:00
mochi
060fbffa79 Update comments for clarity 2022-01-05 13:14:15 +00:00
mochi
7c68614912 Add .gitignore
Ensure we're not committing any binaries
2022-01-05 13:13:54 +00:00
JB
124be96c0e Remove Codacy badge 2021-11-01 21:54:40 +00:00
9 changed files with 563 additions and 109 deletions

2
.gitignore vendored Normal file
View File

@@ -0,0 +1,2 @@
cmd/mqtt
.DS_Store

175
README.md
View File

@@ -4,7 +4,6 @@
[![Build Status](https://travis-ci.com/mochi-co/mqtt.svg?token=59nqixhtefy2iQRwsPcu&branch=master)](https://travis-ci.com/mochi-co/mqtt)
[![contributions welcome](https://img.shields.io/badge/contributions-welcome-brightgreen.svg?style=flat)](https://github.com/mochi-co/mqtt/issues)
[![codecov](https://codecov.io/gh/mochi-co/mqtt/branch/master/graph/badge.svg?token=6vBUgYVaVB)](https://codecov.io/gh/mochi-co/mqtt)
[![Codacy Badge](https://api.codacy.com/project/badge/Grade/b48e17f87cee4221b60a45c02d49148c)](https://www.codacy.com/app/mochi-co/mqtt?utm_source=github.com&utm_medium=referral&utm_content=mochi-co/mqtt&utm_campaign=Badge_Grade)
[![GoDoc](https://godoc.org/github.com/mochi-co/mqtt?status.svg)](https://pkg.go.dev/github.com/mochi-co/mqtt)
</p>
@@ -25,78 +24,13 @@ MQTT stands for MQ Telemetry Transport. It is a publish/subscribe, extremely sim
- TCP, Websocket, (including SSL/TLS) and Dashboard listeners.
- Interfaces for Client Authentication and Topic access control.
- Bolt-backed persistence and storage interfaces.
- Directly Publishing from embedding service (`s.Publish(topic, message, retain)`).
#### Roadmap
- Inline Pub-sub (without client) and event hooks
- Event Hooks (eg. provide handler functions for `onMessage`).
- Docker Image
- MQTT v5 compatibility
#### Performance (messages/second)
Performance benchmarks were tested using [MQTT-Stresser](https://github.com/inovex/mqtt-stresser) on a 13-inch, Early 2015 Macbook Pro (2.7 GHz Intel Core i5). Taking into account bursts of high and low throughput, the median scores are the most useful. Higher is better. SEND = Publish throughput, RECV = Subscribe throughput.
> As usual, any performance benchmarks should be taken with a pinch of salt, but are shown to demonstrate typical throughput compared to the other leading MQTT brokers.
**Single Client, 10,000 messages**
_With only 1 client, there is no variation in throughput so the benchmark is reports the same number for high, low, and median._
![1 Client, 10,000 Messages](assets/benchmarkchart_1_10000.png "1 Client, 10,000 Messages")
`mqtt-stresser -broker tcp://localhost:1883 -num-clients=1 -num-messages=10000`
| | Mochi | Mosquitto | EMQX | VerneMQ | Mosca |
| :----------- | --------: | ----------: | -------: | --------: | --------:
| SEND Max | 36505 | 30597 | 27202 | 32782 | 30125 |
| SEND Min | 36505 | 30597 | 27202 | 32782 | 30125 |
| SEND Median | 36505 | 30597 | 27202 |32782 | 30125 |
| RECV Max | 152221 | 59130 | 7879 | 17551 | 9145 |
| RECV Min | 152221 | 59130 | 7879 | 17551 | 9145 |
| RECV Median | 152221 | 59130 | 7879 | 17551 | 9145 |
**10 Clients, 1,000 Messages**
![10 Clients, 1,000 Messages](assets/benchmarkchart_10_1000.png "10 Clients, 1,000 Messages")
`mqtt-stresser -broker tcp://localhost:1883 -num-clients=10 -num-messages=1000`
| | Mochi | Mosquitto | EMQX | VerneMQ | Mosca |
| :----------- | --------: | ----------: | -------: | --------: | --------:
| SEND Max | 37193 | 15775 | 17455 | 34138 | 36575 |
| SEND Min | 6529 | 6446 | 7714 | 8583 | 7383 |
| SEND Median | 15127 | 7813 | 10305 | 9887 | 8169 |
| RECV Max | 33535 | 3710 | 3022 | 4534 | 9411 |
| RECV Min | 7484 | 2661 | 1689 | 2021 | 2275 |
| RECV Median | 11427 | 3142 | 1831 | 2468 | 4692 |
**10 Clients, 10,000 Messages**
![10 Clients, 10000 Messages](assets/benchmarkchart_10_10000.png "10 Clients, 10000 Messages")
`mqtt-stresser -broker tcp://localhost:1883 -num-clients=10 -num-messages=10000`
| | Mochi | Mosquitto | EMQX | VerneMQ | Mosca |
| :----------- | --------: | ----------: | -------: | --------: | --------:
| SEND Max | 13153 | 13270 | 12229 | 13025 | 38446 |
| SEND Min | 8728 | 8513 | 8193 | 6483 | 3889 |
| SEND Median | 9045 | 9532 | 9252 | 8031 | 9210 |
| RECV Max | 20774 | 5052 | 2093 | 2071 | 43008 |
| RECV Min | 10718 |3995 | 1531 | 1673 | 18764 |
| RECV Median | 16339 | 4607 | 1620 | 1907 | 33524 |
**500 Clients, 100 Messages**
![500 Clients, 100 Messages](assets/benchmarkchart_500_100.png "500 Clients, 100 Messages")
`mqtt-stresser -broker tcp://localhost:1883 -num-clients=500 -num-messages=100`
| | Mochi | Mosquitto | EMQX | VerneMQ | Mosca |
| :----------- | --------: | ----------: | -------: | --------: | --------:
| SEND Max | 70688 | 72686 | 71392 | 75336 | 73192 |
| SEND Min | 1021 | 2577 | 1603 | 8417 | 2344 |
| SEND Median | 49871 | 33076 | 33637 | 35200 | 31312 |
| RECV Max | 116163 | 4215 | 3427 | 5484 | 10100 |
| RECV Min | 1044 | 156 | 56 | 83 | 169 |
| RECV Median | 24398 | 208 | 94 | 413 | 474 |
#### Using the Broker
Mochi MQTT can be used as a standalone broker. Simply checkout this repository and run the `main.go` entrypoint in the `cmd` folder which will expose tcp (:1883), websocket (:1882), and dashboard (:8080) listeners. A docker image is coming soon.
@@ -169,6 +103,43 @@ SSL may be configured on both the TCP and Websocket listeners by providing a pub
```
> Note the mandatory inclusion of the Auth Controller!
#### Event Hooks
Some basic Event Hooks have been added, allowing you to call your own functions when certain events occur. The execution of the functions are blocking - if necessary, please handle goroutines within the embedding service.
##### OnMessage
`server.Events.OnMessage` is called when a Publish packet is received. The function receives the published message and information about the client who published it. This function will block message dispatching until it returns.
> This hook is only triggered when a message is received by clients. It is not triggered when using the direct `server.Publish` method.
```go
import "github.com/mochi-co/mqtt/server/events"
server.Events.OnMessage = func(cl events.Client, pk events.Packet) (pkx events.Packet, err error) {
if string(pk.Payload) == "hello" {
pkx = pk
pkx.Payload = []byte("hello world")
return pkx, nil
}
return pk, nil
}
```
A working example can be found in the `examples/events` folder. Please open an issue if there is a particular event hook you are interested in!
#### Direct Publishing
When the broker is being embedded in a larger codebase, it can be useful to be able to publish messages directly to clients without having to implement a loopback TCP connection with an MQTT client. The `Publish` method allows you to inject publish messages directly into a queue to be delivered to any clients with matching topic filters. The `Retain` flag is supported.
```go
// func (s *Server) Publish(topic string, payload []byte, retain bool) error
err := s.Publish("a/b/c", []byte("hello"), false)
if err != nil {
log.Fatal(err)
}
```
A working example can be found in the `examples/events` folder.
#### Data Persistence
Mochi MQTT provides a `persistence.Store` interface for developing and attaching persistent stores to the broker. The default persistence mechanism packaged with the broker is backed by [Bolt](https://github.com/etcd-io/bbolt) and can be enabled by assigning a `*bolt.Store` to the server.
```go
@@ -183,6 +154,74 @@ Mochi MQTT provides a `persistence.Store` interface for developing and attaching
#### Paho Interoperability Test
You can check the broker against the [Paho Interoperability Test](https://github.com/eclipse/paho.mqtt.testing/tree/master/interoperability) by starting the broker using `examples/paho/main.go`, and then running the test with `python3 client_test.py` from the _interoperability_ folder.
#### Performance (messages/second)
Performance benchmarks were tested using [MQTT-Stresser](https://github.com/inovex/mqtt-stresser) on a 13-inch, Early 2015 Macbook Pro (2.7 GHz Intel Core i5). Taking into account bursts of high and low throughput, the median scores are the most useful. Higher is better. SEND = Publish throughput, RECV = Subscribe throughput.
> As usual, any performance benchmarks should be taken with a pinch of salt, but are shown to demonstrate typical throughput compared to the other leading MQTT brokers.
**Single Client, 10,000 messages**
_With only 1 client, there is no variation in throughput so the benchmark is reports the same number for high, low, and median._
![1 Client, 10,000 Messages](assets/benchmarkchart_1_10000.png "1 Client, 10,000 Messages")
`mqtt-stresser -broker tcp://localhost:1883 -num-clients=1 -num-messages=10000`
| | Mochi | Mosquitto | EMQX | VerneMQ | Mosca |
| :----------- | --------: | ----------: | -------: | --------: | --------:
| SEND Max | 36505 | 30597 | 27202 | 32782 | 30125 |
| SEND Min | 36505 | 30597 | 27202 | 32782 | 30125 |
| SEND Median | 36505 | 30597 | 27202 |32782 | 30125 |
| RECV Max | 152221 | 59130 | 7879 | 17551 | 9145 |
| RECV Min | 152221 | 59130 | 7879 | 17551 | 9145 |
| RECV Median | 152221 | 59130 | 7879 | 17551 | 9145 |
**10 Clients, 1,000 Messages**
![10 Clients, 1,000 Messages](assets/benchmarkchart_10_1000.png "10 Clients, 1,000 Messages")
`mqtt-stresser -broker tcp://localhost:1883 -num-clients=10 -num-messages=1000`
| | Mochi | Mosquitto | EMQX | VerneMQ | Mosca |
| :----------- | --------: | ----------: | -------: | --------: | --------:
| SEND Max | 37193 | 15775 | 17455 | 34138 | 36575 |
| SEND Min | 6529 | 6446 | 7714 | 8583 | 7383 |
| SEND Median | 15127 | 7813 | 10305 | 9887 | 8169 |
| RECV Max | 33535 | 3710 | 3022 | 4534 | 9411 |
| RECV Min | 7484 | 2661 | 1689 | 2021 | 2275 |
| RECV Median | 11427 | 3142 | 1831 | 2468 | 4692 |
**10 Clients, 10,000 Messages**
![10 Clients, 10000 Messages](assets/benchmarkchart_10_10000.png "10 Clients, 10000 Messages")
`mqtt-stresser -broker tcp://localhost:1883 -num-clients=10 -num-messages=10000`
| | Mochi | Mosquitto | EMQX | VerneMQ | Mosca |
| :----------- | --------: | ----------: | -------: | --------: | --------:
| SEND Max | 13153 | 13270 | 12229 | 13025 | 38446 |
| SEND Min | 8728 | 8513 | 8193 | 6483 | 3889 |
| SEND Median | 9045 | 9532 | 9252 | 8031 | 9210 |
| RECV Max | 20774 | 5052 | 2093 | 2071 | 43008 |
| RECV Min | 10718 |3995 | 1531 | 1673 | 18764 |
| RECV Median | 16339 | 4607 | 1620 | 1907 | 33524 |
**500 Clients, 100 Messages**
![500 Clients, 100 Messages](assets/benchmarkchart_500_100.png "500 Clients, 100 Messages")
`mqtt-stresser -broker tcp://localhost:1883 -num-clients=500 -num-messages=100`
| | Mochi | Mosquitto | EMQX | VerneMQ | Mosca |
| :----------- | --------: | ----------: | -------: | --------: | --------:
| SEND Max | 70688 | 72686 | 71392 | 75336 | 73192 |
| SEND Min | 1021 | 2577 | 1603 | 8417 | 2344 |
| SEND Median | 49871 | 33076 | 33637 | 35200 | 31312 |
| RECV Max | 116163 | 4215 | 3427 | 5484 | 10100 |
| RECV Min | 1044 | 156 | 56 | 83 | 169 |
| RECV Median | 24398 | 208 | 94 | 413 | 474 |
## Contributions
Contributions and feedback are both welcomed and encouraged! Open an [issue](https://github.com/mochi-co/mqtt/issues) to report a bug, ask a question, or make a feature request.

77
examples/events/main.go Normal file
View File

@@ -0,0 +1,77 @@
package main
import (
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
"github.com/logrusorgru/aurora"
mqtt "github.com/mochi-co/mqtt/server"
"github.com/mochi-co/mqtt/server/events"
"github.com/mochi-co/mqtt/server/listeners"
"github.com/mochi-co/mqtt/server/listeners/auth"
)
func main() {
sigs := make(chan os.Signal, 1)
done := make(chan bool, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigs
done <- true
}()
fmt.Println(aurora.Magenta("Mochi MQTT Server initializing..."), aurora.Cyan("TCP"))
server := mqtt.New()
tcp := listeners.NewTCP("t1", ":1883")
err := server.AddListener(tcp, &listeners.Config{
Auth: new(auth.Allow),
})
if err != nil {
log.Fatal(err)
}
// Start the server
go func() {
err := server.Serve()
if err != nil {
log.Fatal(err)
}
}()
// Add OnMessage Event Hook
server.Events.OnMessage = func(cl events.Client, pk events.Packet) (pkx events.Packet, err error) {
pkx = pk
if string(pk.Payload) == "hello" {
pkx.Payload = []byte("hello world")
fmt.Printf("< OnMessage modified message from client %s: %s\n", cl.ID, string(pkx.Payload))
} else {
fmt.Printf("< OnMessage received message from client %s: %s\n", cl.ID, string(pkx.Payload))
}
return pkx, nil
}
// Demonstration of directly publishing messages to a topic via the
// `server.Publish` method. Subscribe to `direct/publish` using your
// MQTT client to see the messages.
go func() {
for range time.Tick(time.Second * 10) {
server.Publish("direct/publish", []byte("scheduled message"), false)
fmt.Println("> issued direct message to direct/publish")
}
}()
fmt.Println(aurora.BgMagenta(" Started! "))
<-done
fmt.Println(aurora.BgRed(" Caught Signal "))
server.Close()
fmt.Println(aurora.BgGreen(" Finished "))
}

36
server/events/events.go Normal file
View File

@@ -0,0 +1,36 @@
package events
import (
"github.com/mochi-co/mqtt/server/internal/clients"
"github.com/mochi-co/mqtt/server/internal/packets"
)
type Events struct {
OnMessage // published message receieved.
}
type Packet packets.Packet
type Client struct {
ID string
Listener string
}
// FromClient returns an event client from a client.
func FromClient(cl clients.Client) Client {
return Client{
ID: cl.ID,
Listener: cl.Listener,
}
}
// OnMessage function is called when a publish message is received. Note,
// this hook is ONLY called by connected client publishers, it is not triggered when
// using the direct s.Publish method. The function receives the sent message and the
// data of the client who published it, and allows the packet to be modified
// before it is dispatched to subscribers. If no modification is required, return
// the original packet data. If an error occurs, the original packet will
// be dispatched as if the event hook had not been triggered.
// This function will block message dispatching until it returns. To minimise this,
// have the function open a new goroutine on the embedding side.
type OnMessage func(Client, Packet) (Packet, error)

View File

@@ -77,6 +77,7 @@ func (b *Writer) WriteTo(w io.Writer) (total int, err error) {
}
// Write writes the buffer to the buffer p, returning the number of bytes written.
// The bytes written to the buffer are picked up by WriteTo.
func (b *Writer) Write(p []byte) (total int, err error) {
err = b.awaitEmpty(len(p))
if err != nil {

View File

@@ -305,8 +305,9 @@ func (cl *Client) ReadFixedHeader(fh *packets.FixedHeader) error {
return nil
}
// Read reads new packets from a client connection
func (cl *Client) Read(h func(*Client, packets.Packet) error) error {
// Read loops forever reading new packets from a client connection until
// an error is encountered (or the connection is closed).
func (cl *Client) Read(packetHandler func(*Client, packets.Packet) error) error {
for {
if atomic.LoadInt64(&cl.State.Done) == 1 && cl.r.CapDelta() == 0 {
return nil
@@ -324,7 +325,7 @@ func (cl *Client) Read(h func(*Client, packets.Packet) error) error {
return err
}
err = h(cl, pk) // Process inbound packet.
err = packetHandler(cl, pk) // Process inbound packet.
if err != nil {
return err
}
@@ -437,6 +438,7 @@ func (cl *Client) WritePacket(pk packets.Packet) (n int, err error) {
return
}
// Write the packet bytes to the client byte buffer.
n, err = cl.w.Write(buf.Bytes())
if err != nil {
return

View File

@@ -80,8 +80,8 @@ func (l *TCP) Listen(s *system.Info) error {
return nil
}
// Serve starts waiting for new TCP connections, and calls the connection
// establishment callback for any received.
// Serve starts waiting for new TCP connections, and calls the establish
// connection callback for any received.
func (l *TCP) Serve(establish EstablishFunc) {
for {
if atomic.LoadInt64(&l.end) == 1 {

View File

@@ -9,6 +9,7 @@ import (
"sync/atomic"
"time"
"github.com/mochi-co/mqtt/server/events"
"github.com/mochi-co/mqtt/server/internal/circ"
"github.com/mochi-co/mqtt/server/internal/clients"
"github.com/mochi-co/mqtt/server/internal/packets"
@@ -20,11 +21,7 @@ import (
)
const (
Version = "1.0.0" // the server version.
// maxPacketID is the maximum value of a 16-bit packet ID. If a
// packet ID reaches this number, it resets to 0.
maxPacketID = 65535
Version = "1.0.1" // the server version.
)
var (
@@ -47,14 +44,22 @@ var (
// Server is an MQTT broker server. It should be created with server.New()
// in order to ensure all the internal fields are correctly populated.
type Server struct {
done chan bool // indicate that the server is ending.
bytepool circ.BytesPool // a byte pool for incoming and outgoing packets.
Listeners *listeners.Listeners // listeners are network interfaces which listen for new connections.
Clients *clients.Clients // clients which are known to the broker.
Topics *topics.Index // an index of topic filter subscriptions and retained messages.
System *system.Info // values about the server commonly found in $SYS topics.
Store persistence.Store // a persistent storage backend if desired.
done chan bool // indicate that the server is ending.
bytepool circ.BytesPool // a byte pool for incoming and outgoing packets.
sysTicker *time.Ticker // the interval ticker for sending updating $SYS topics.
inline inlineMessages // channels for direct publishing.
Events events.Events // overrideable event hooks.
}
// inlineMessages contains channels for handling inline (direct) publishing.
type inlineMessages struct {
done chan bool // indicate that the server is ending.
pub chan packets.Packet // a channel of packets to publish to clients
}
// New returns a new instance of an MQTT broker.
@@ -69,6 +74,11 @@ func New() *Server {
Started: time.Now().Unix(),
},
sysTicker: time.NewTicker(SysTopicInterval * time.Millisecond),
inline: inlineMessages{
done: make(chan bool),
pub: make(chan packets.Packet, 1024),
},
Events: events.Events{},
}
// Expose server stats using the system listener so it can be used in the
@@ -119,9 +129,10 @@ func (s *Server) Serve() error {
}
}
go s.eventLoop()
s.Listeners.ServeAll(s.EstablishConnection)
s.publishSysTopics()
go s.eventLoop() // spin up event loop for issuing $SYS values and closing server.
go s.inlineClient() // spin up inline client for direct message publishing.
s.Listeners.ServeAll(s.EstablishConnection) // start listening on all listeners.
s.publishSysTopics() // begin publishing $SYS system values.
return nil
}
@@ -132,6 +143,7 @@ func (s *Server) eventLoop() {
select {
case <-s.done:
s.sysTicker.Stop()
close(s.inline.done)
return
case <-s.sysTicker.C:
s.publishSysTopics()
@@ -139,11 +151,25 @@ func (s *Server) eventLoop() {
}
}
// EstablishConnection establishes a new client when a listener accepts a new
// connection.
// inlineClient loops forever, sending directly-published messages
// from the Publish method to subscribers.
func (s *Server) inlineClient() {
for {
select {
case <-s.inline.done:
close(s.inline.pub)
return
case pk := <-s.inline.pub:
s.publishToSubscribers(pk)
}
}
}
// EstablishConnection establishes a new client when a listener
// accepts a new connection.
func (s *Server) EstablishConnection(lid string, c net.Conn, ac auth.Controller) error {
xbr := s.bytepool.Get() // Get byte buffer from pools for sending and receiving packet data.
xbw := s.bytepool.Get()
xbr := s.bytepool.Get() // Get byte buffer from pools for receiving packet data.
xbw := s.bytepool.Get() // and for sending.
cl := clients.NewClient(c,
circ.NewReaderFromSlice(0, xbr),
@@ -325,11 +351,38 @@ func (s *Server) processPingreq(cl *clients.Client, pk packets.Packet) error {
return nil
}
// Publish creates a publish packet from a payload and sends it to the inline.pub
// channel, where it is written directly to the outgoing byte buffers of any
// clients subscribed to the given topic. Because the message is written directly
// within the server, QoS is inherently 2 (exactly once).
func (s *Server) Publish(topic string, payload []byte, retain bool) error {
if len(topic) >= 4 && topic[0:4] == "$SYS" {
return ErrInvalidTopic
}
pk := packets.Packet{
FixedHeader: packets.FixedHeader{
Type: packets.Publish,
},
TopicName: topic,
Payload: payload,
}
if retain {
s.retainMessage(pk)
}
// handoff packet to s.inline.pub channel for writing to client buffers
// to avoid blocking the calling function.
s.inline.pub <- pk
return nil
}
// processPublish processes a Publish packet.
func (s *Server) processPublish(cl *clients.Client, pk packets.Packet) error {
if len(pk.TopicName) >= 4 && pk.TopicName[0:4] == "$SYS" {
// Clients can't publish to $SYS topics.
return nil
return nil // Clients can't publish to $SYS topics, so fail silently as per spec.
}
if !cl.AC.ACL(cl.Username, pk.TopicName, true) {
@@ -337,22 +390,7 @@ func (s *Server) processPublish(cl *clients.Client, pk packets.Packet) error {
}
if pk.FixedHeader.Retain {
out := pk.PublishCopy()
q := s.Topics.RetainMessage(out)
atomic.AddInt64(&s.System.Retained, q)
if s.Store != nil {
if q == 1 {
s.Store.WriteRetained(persistence.Message{
ID: "ret_" + out.TopicName,
T: persistence.KRetained,
FixedHeader: persistence.FixedHeader(out.FixedHeader),
TopicName: out.TopicName,
Payload: out.Payload,
})
} else {
s.Store.DeleteRetained("ret_" + out.TopicName)
}
}
s.retainMessage(pk)
}
if pk.FixedHeader.Qos > 0 {
@@ -367,21 +405,49 @@ func (s *Server) processPublish(cl *clients.Client, pk packets.Packet) error {
ack.FixedHeader.Type = packets.Pubrec
}
s.writeClient(cl, ack)
// omit errors in case of broken connection / LWT publish. ack send failures
// will be handled by in-flight resending on next reconnect.
s.writeClient(cl, ack)
}
// if an OnMessage hook exists, potentially modify the packet.
if s.Events.OnMessage != nil {
if pkx, err := s.Events.OnMessage(events.FromClient(*cl), events.Packet(pk)); err == nil {
pk = packets.Packet(pkx)
}
}
// write packet to the byte buffers of any clients with matching topic filters.
s.publishToSubscribers(pk)
return nil
}
// retainMessage adds a message to a topic, and if a persistent store is provided,
// adds the message to the store so it can be reloaded if necessary.
func (s *Server) retainMessage(pk packets.Packet) {
out := pk.PublishCopy()
q := s.Topics.RetainMessage(out)
atomic.AddInt64(&s.System.Retained, q)
if s.Store != nil {
if q == 1 {
s.Store.WriteRetained(persistence.Message{
ID: "ret_" + out.TopicName,
T: persistence.KRetained,
FixedHeader: persistence.FixedHeader(out.FixedHeader),
TopicName: out.TopicName,
Payload: out.Payload,
})
} else {
s.Store.DeleteRetained("ret_" + out.TopicName)
}
}
}
// publishToSubscribers publishes a publish packet to all subscribers with
// matching topic filters.
func (s *Server) publishToSubscribers(pk packets.Packet) {
subs := s.Topics.Subscribers(pk.TopicName)
for id, qos := range subs {
for id, qos := range s.Topics.Subscribers(pk.TopicName) {
if client, ok := s.Clients.Get(id); ok {
out := pk.PublishCopy()
if qos > out.FixedHeader.Qos { // Inherit higher desired qos values.

View File

@@ -2,6 +2,7 @@ package server
import (
//"errors"
"fmt"
"io/ioutil"
"net"
"strconv"
@@ -11,6 +12,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/mochi-co/mqtt/server/events"
"github.com/mochi-co/mqtt/server/internal/circ"
"github.com/mochi-co/mqtt/server/internal/clients"
"github.com/mochi-co/mqtt/server/internal/packets"
@@ -650,7 +652,7 @@ func TestServerProcessPublishQoS2(t *testing.T) {
require.Equal(t, int64(0), atomic.LoadInt64(&s.System.Retained))
}
func TestServerProcessPublishUnretain(t *testing.T) {
func TestServerProcessPublishUnretainByEmptyPayload(t *testing.T) {
s, cl1, r1, w1 := setupClient()
s.Clients.Add(cl1)
@@ -840,6 +842,235 @@ func TestServerProcessPublishWriteAckError(t *testing.T) {
require.Error(t, err)
}
func TestServerPublishInline(t *testing.T) {
s, cl1, r1, w1 := setupClient()
cl1.ID = "inline"
s.Clients.Add(cl1)
s.Topics.Subscribe("a/b/+", cl1.ID, 0)
go s.inlineClient()
ack1 := make(chan []byte)
go func() {
buf, err := ioutil.ReadAll(r1)
if err != nil {
panic(err)
}
ack1 <- buf
}()
err := s.Publish("a/b/c", []byte("hello"), false)
require.NoError(t, err)
time.Sleep(10 * time.Millisecond)
w1.Close()
require.Equal(t, []byte{
byte(packets.Publish << 4), 12,
0, 5,
'a', '/', 'b', '/', 'c',
'h', 'e', 'l', 'l', 'o',
}, <-ack1)
require.Equal(t, int64(14), s.System.BytesSent)
close(s.inline.done)
}
func TestServerPublishInlineRetain(t *testing.T) {
s, cl1, r1, w1 := setupClient()
cl1.ID = "inline"
ack1 := make(chan []byte)
go func() {
buf, err := ioutil.ReadAll(r1)
if err != nil {
panic(err)
}
ack1 <- buf
}()
err := s.Publish("a/b/c", []byte("hello"), true)
require.NoError(t, err)
time.Sleep(10 * time.Millisecond)
s.Clients.Add(cl1)
s.Topics.Subscribe("a/b/+", cl1.ID, 0)
go s.inlineClient()
time.Sleep(10 * time.Millisecond)
w1.Close()
require.Equal(t, []byte{
byte(packets.Publish << 4), 12,
0, 5,
'a', '/', 'b', '/', 'c',
'h', 'e', 'l', 'l', 'o',
}, <-ack1)
require.Equal(t, int64(14), s.System.BytesSent)
close(s.inline.done)
}
func TestServerPublishInlineSysTopicError(t *testing.T) {
s, _, _, _ := setupClient()
err := s.Publish("$SYS/stuff", []byte("hello"), false)
require.Error(t, err)
require.Equal(t, int64(0), s.System.BytesSent)
}
func TestServerProcessPublishHookOnMessage(t *testing.T) {
s, cl1, r1, w1 := setupClient()
s.Clients.Add(cl1)
s.Topics.Subscribe("a/b/+", cl1.ID, 0)
var hookedPacket events.Packet
var hookedClient events.Client
s.Events.OnMessage = func(cl events.Client, pk events.Packet) (events.Packet, error) {
hookedClient = cl
hookedPacket = pk
return pk, nil
}
ack1 := make(chan []byte)
go func() {
buf, err := ioutil.ReadAll(r1)
if err != nil {
panic(err)
}
ack1 <- buf
}()
pk1 := packets.Packet{
FixedHeader: packets.FixedHeader{
Type: packets.Publish,
},
TopicName: "a/b/c",
Payload: []byte("hello"),
}
err := s.processPacket(cl1, pk1)
require.NoError(t, err)
time.Sleep(10 * time.Millisecond)
require.Equal(t, events.Client{
ID: "mochi",
Listener: "",
}, hookedClient)
require.Equal(t, events.Packet(pk1), hookedPacket)
w1.Close()
require.Equal(t, []byte{
byte(packets.Publish << 4), 12,
0, 5,
'a', '/', 'b', '/', 'c',
'h', 'e', 'l', 'l', 'o',
}, <-ack1)
require.Equal(t, int64(14), s.System.BytesSent)
}
func TestServerProcessPublishHookOnMessageModify(t *testing.T) {
s, cl1, r1, w1 := setupClient()
s.Clients.Add(cl1)
s.Topics.Subscribe("a/b/+", cl1.ID, 0)
var hookedPacket events.Packet
var hookedClient events.Client
s.Events.OnMessage = func(cl events.Client, pk events.Packet) (events.Packet, error) {
hookedPacket = pk
hookedPacket.Payload = []byte("world")
hookedClient = cl
return hookedPacket, nil
}
ack1 := make(chan []byte)
go func() {
buf, err := ioutil.ReadAll(r1)
if err != nil {
panic(err)
}
ack1 <- buf
}()
pk1 := packets.Packet{
FixedHeader: packets.FixedHeader{
Type: packets.Publish,
},
TopicName: "a/b/c",
Payload: []byte("hello"),
}
err := s.processPacket(cl1, pk1)
require.NoError(t, err)
time.Sleep(10 * time.Millisecond)
require.Equal(t, events.Client{
ID: "mochi",
Listener: "",
}, hookedClient)
w1.Close()
require.Equal(t, []byte{
byte(packets.Publish << 4), 12,
0, 5,
'a', '/', 'b', '/', 'c',
'w', 'o', 'r', 'l', 'd',
}, <-ack1)
require.Equal(t, int64(14), s.System.BytesSent)
}
func TestServerProcessPublishHookOnMessageModifyError(t *testing.T) {
s, cl1, r1, w1 := setupClient()
s.Clients.Add(cl1)
s.Topics.Subscribe("a/b/+", cl1.ID, 0)
s.Events.OnMessage = func(cl events.Client, pk events.Packet) (events.Packet, error) {
pkx := pk
pkx.Payload = []byte("world")
return pkx, fmt.Errorf("error")
}
ack1 := make(chan []byte)
go func() {
buf, err := ioutil.ReadAll(r1)
if err != nil {
panic(err)
}
ack1 <- buf
}()
pk1 := packets.Packet{
FixedHeader: packets.FixedHeader{
Type: packets.Publish,
},
TopicName: "a/b/c",
Payload: []byte("hello"),
}
err := s.processPacket(cl1, pk1)
require.NoError(t, err)
time.Sleep(10 * time.Millisecond)
w1.Close()
require.Equal(t, []byte{
byte(packets.Publish << 4), 12,
0, 5,
'a', '/', 'b', '/', 'c',
'h', 'e', 'l', 'l', 'o',
}, <-ack1)
require.Equal(t, int64(14), s.System.BytesSent)
}
func TestServerProcessPuback(t *testing.T) {
s, cl, _, _ := setupClient()
cl.Inflight.Set(11, clients.InflightMessage{Packet: packets.Packet{PacketID: 11}, Sent: 0})