Mirror of https://github.com/mochi-mqtt/server.git
Synced 2025-10-30 19:16:30 +08:00

Compare commits (25 commits)
- b277600823
- 685c050fdd
- 0abbaf5070
- 1ab1928cff
- 8890bb9dd4
- f9348aaf93
- c2a42a16ca
- d14d944de9
- 480e60b3f0
- d4cbf1abdc
- 8a1c53432e
- 7c7b8d58fe
- ce773b3978
- f3e7469478
- b5685ca0ee
- 66edb0564c
- 1d9fa4199c
- dec880231d
- 21d4e54e74
- aeb4190733
- 484e4abd56
- d51bad30fc
- 060fbffa79
- 7c68614912
- 124be96c0e
.gitignore (vendored, new file, 2 lines)

@@ -0,0 +1,2 @@
cmd/mqtt
.DS_Store

README.md (175 lines changed)

@@ -4,7 +4,6 @@

[](https://travis-ci.com/mochi-co/mqtt)
[](https://github.com/mochi-co/mqtt/issues)
[](https://codecov.io/gh/mochi-co/mqtt)
[](https://www.codacy.com/app/mochi-co/mqtt?utm_source=github.com&utm_medium=referral&utm_content=mochi-co/mqtt&utm_campaign=Badge_Grade)
[](https://pkg.go.dev/github.com/mochi-co/mqtt)

</p>

@@ -25,78 +24,13 @@ MQTT stands for MQ Telemetry Transport. It is a publish/subscribe, extremely sim

- TCP, Websocket (including SSL/TLS), and Dashboard listeners.
- Interfaces for Client Authentication and Topic access control.
- Bolt-backed persistence and storage interfaces.
- Direct publishing from the embedding service (`s.Publish(topic, message, retain)`).

#### Roadmap

- Inline Pub-sub (without client) and event hooks
- Event Hooks (eg. provide handler functions for `onMessage`).
- Docker Image
- MQTT v5 compatibility
||||
#### Performance (messages/second)

Performance benchmarks were tested using [MQTT-Stresser](https://github.com/inovex/mqtt-stresser) on a 13-inch, Early 2015 MacBook Pro (2.7 GHz Intel Core i5). Taking into account bursts of high and low throughput, the median scores are the most useful. Higher is better. SEND = Publish throughput, RECV = Subscribe throughput.

> As usual, any performance benchmarks should be taken with a pinch of salt, but they are shown to demonstrate typical throughput compared to the other leading MQTT brokers.

**Single Client, 10,000 messages**

_With only 1 client, there is no variation in throughput, so the benchmark reports the same number for high, low, and median._



`mqtt-stresser -broker tcp://localhost:1883 -num-clients=1 -num-messages=10000`

|             | Mochi  | Mosquitto | EMQX  | VerneMQ | Mosca |
| :---------- | -----: | --------: | ----: | ------: | ----: |
| SEND Max    | 36505  | 30597     | 27202 | 32782   | 30125 |
| SEND Min    | 36505  | 30597     | 27202 | 32782   | 30125 |
| SEND Median | 36505  | 30597     | 27202 | 32782   | 30125 |
| RECV Max    | 152221 | 59130     | 7879  | 17551   | 9145  |
| RECV Min    | 152221 | 59130     | 7879  | 17551   | 9145  |
| RECV Median | 152221 | 59130     | 7879  | 17551   | 9145  |
||||

**10 Clients, 1,000 Messages**



`mqtt-stresser -broker tcp://localhost:1883 -num-clients=10 -num-messages=1000`

|             | Mochi | Mosquitto | EMQX  | VerneMQ | Mosca |
| :---------- | ----: | --------: | ----: | ------: | ----: |
| SEND Max    | 37193 | 15775     | 17455 | 34138   | 36575 |
| SEND Min    | 6529  | 6446      | 7714  | 8583    | 7383  |
| SEND Median | 15127 | 7813      | 10305 | 9887    | 8169  |
| RECV Max    | 33535 | 3710      | 3022  | 4534    | 9411  |
| RECV Min    | 7484  | 2661      | 1689  | 2021    | 2275  |
| RECV Median | 11427 | 3142      | 1831  | 2468    | 4692  |
||||

**10 Clients, 10,000 Messages**



`mqtt-stresser -broker tcp://localhost:1883 -num-clients=10 -num-messages=10000`

|             | Mochi | Mosquitto | EMQX  | VerneMQ | Mosca |
| :---------- | ----: | --------: | ----: | ------: | ----: |
| SEND Max    | 13153 | 13270     | 12229 | 13025   | 38446 |
| SEND Min    | 8728  | 8513      | 8193  | 6483    | 3889  |
| SEND Median | 9045  | 9532      | 9252  | 8031    | 9210  |
| RECV Max    | 20774 | 5052      | 2093  | 2071    | 43008 |
| RECV Min    | 10718 | 3995      | 1531  | 1673    | 18764 |
| RECV Median | 16339 | 4607      | 1620  | 1907    | 33524 |
||||

**500 Clients, 100 Messages**



`mqtt-stresser -broker tcp://localhost:1883 -num-clients=500 -num-messages=100`

|             | Mochi  | Mosquitto | EMQX  | VerneMQ | Mosca |
| :---------- | -----: | --------: | ----: | ------: | ----: |
| SEND Max    | 70688  | 72686     | 71392 | 75336   | 73192 |
| SEND Min    | 1021   | 2577      | 1603  | 8417    | 2344  |
| SEND Median | 49871  | 33076     | 33637 | 35200   | 31312 |
| RECV Max    | 116163 | 4215      | 3427  | 5484    | 10100 |
| RECV Min    | 1044   | 156       | 56    | 83      | 169   |
| RECV Median | 24398  | 208       | 94    | 413     | 474   |
||||

#### Using the Broker

Mochi MQTT can be used as a standalone broker. Simply check out this repository and run the `main.go` entrypoint in the `cmd` folder, which will expose TCP (:1883), Websocket (:1882), and Dashboard (:8080) listeners. A Docker image is coming soon.
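
The standalone entrypoint simply embeds the broker; a minimal sketch of doing the same in your own program, modelled on the `examples/events/main.go` file added in this changeset (the listener ID `t1` and the port are illustrative):

```go
package main

import (
	"log"
	"os"
	"os/signal"
	"syscall"

	mqtt "github.com/mochi-co/mqtt/server"
	"github.com/mochi-co/mqtt/server/listeners"
	"github.com/mochi-co/mqtt/server/listeners/auth"
)

func main() {
	server := mqtt.New()

	// Attach a TCP listener on the standard MQTT port, allowing all connections.
	tcp := listeners.NewTCP("t1", ":1883")
	if err := server.AddListener(tcp, &listeners.Config{Auth: new(auth.Allow)}); err != nil {
		log.Fatal(err)
	}

	// Serve starts the listeners and returns; the broker runs in the background.
	if err := server.Serve(); err != nil {
		log.Fatal(err)
	}

	// Block until interrupted, then shut the broker down cleanly.
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	<-sigs
	server.Close()
}
```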

@@ -169,6 +103,43 @@ SSL may be configured on both the TCP and Websocket listeners by providing a pub

> Note the mandatory inclusion of the Auth Controller!
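
The SSL configuration block itself is cut off by the hunk boundary above. For illustration only, a sketch of a TLS-enabled listener, assuming the v1 `listeners.Config` carries a `TLS` struct with PEM-encoded `Certificate` and `PrivateKey` byte slices (these field names are an assumption, not confirmed by this diff; `publicCertificate` and `privateKey` are loaded elsewhere):

```go
// Assumed field names for illustration; check listeners.Config in this revision.
tcps := listeners.NewTCP("t1s", ":8883")
err := server.AddListener(tcps, &listeners.Config{
	Auth: new(auth.Allow), // note the mandatory Auth controller.
	TLS: &listeners.TLS{
		Certificate: publicCertificate, // PEM-encoded certificate, loaded elsewhere.
		PrivateKey:  privateKey,        // PEM-encoded private key, loaded elsewhere.
	},
})
if err != nil {
	log.Fatal(err)
}
```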

#### Event Hooks

Some basic Event Hooks have been added, allowing you to call your own functions when certain events occur. The execution of these functions is blocking - if necessary, please handle goroutines within the embedding service.

##### OnMessage

`server.Events.OnMessage` is called when a Publish packet is received. The function receives the published message and information about the client who published it. This function will block message dispatching until it returns.

> This hook is only triggered when a message is received from clients. It is not triggered when using the direct `server.Publish` method.

```go
import "github.com/mochi-co/mqtt/server/events"

server.Events.OnMessage = func(cl events.Client, pk events.Packet) (pkx events.Packet, err error) {
	if string(pk.Payload) == "hello" {
		pkx = pk
		pkx.Payload = []byte("hello world")
		return pkx, nil
	}

	return pk, nil
}
```

A working example can be found in the `examples/events` folder. Please open an issue if there is a particular event hook you are interested in!

#### Direct Publishing

When the broker is being embedded in a larger codebase, it can be useful to be able to publish messages directly to clients without having to implement a loopback TCP connection with an MQTT client. The `Publish` method allows you to inject publish messages directly into a queue to be delivered to any clients with matching topic filters. The `Retain` flag is supported.

```go
// func (s *Server) Publish(topic string, payload []byte, retain bool) error
err := s.Publish("a/b/c", []byte("hello"), false)
if err != nil {
	log.Fatal(err)
}
```

A working example can be found in the `examples/events` folder.

#### Data Persistence

Mochi MQTT provides a `persistence.Store` interface for developing and attaching persistent stores to the broker. The default persistence mechanism packaged with the broker is backed by [Bolt](https://github.com/etcd-io/bbolt) and can be enabled by assigning a `*bolt.Store` to the server.
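
The code block that shows the store being attached falls outside this hunk. As an illustration, a minimal sketch assuming the v1 `AddStore` method and the bundled `persistence/bolt` package (both are assumptions based on the surrounding description, not confirmed by this diff):

```go
import (
	"log"
	"time"

	"github.com/mochi-co/mqtt/server/persistence/bolt" // assumed package path.
	"go.etcd.io/bbolt"
)

// Attach a Bolt-backed store so retained and in-flight messages survive restarts.
err := server.AddStore(bolt.New("mochi.db", &bbolt.Options{
	Timeout: 500 * time.Millisecond, // avoid hanging if the db file is locked.
}))
if err != nil {
	log.Fatal(err)
}
```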

@@ -183,6 +154,74 @@ Mochi MQTT provides a `persistence.Store` interface for developing and attaching

#### Paho Interoperability Test

You can check the broker against the [Paho Interoperability Test](https://github.com/eclipse/paho.mqtt.testing/tree/master/interoperability) by starting the broker using `examples/paho/main.go`, and then running the test with `python3 client_test.py` from the _interoperability_ folder.

#### Performance (messages/second)

Performance benchmarks were tested using [MQTT-Stresser](https://github.com/inovex/mqtt-stresser) on a 13-inch, Early 2015 MacBook Pro (2.7 GHz Intel Core i5). Taking into account bursts of high and low throughput, the median scores are the most useful. Higher is better. SEND = Publish throughput, RECV = Subscribe throughput.

> As usual, any performance benchmarks should be taken with a pinch of salt, but they are shown to demonstrate typical throughput compared to the other leading MQTT brokers.

**Single Client, 10,000 messages**

_With only 1 client, there is no variation in throughput, so the benchmark reports the same number for high, low, and median._



`mqtt-stresser -broker tcp://localhost:1883 -num-clients=1 -num-messages=10000`

|             | Mochi  | Mosquitto | EMQX  | VerneMQ | Mosca |
| :---------- | -----: | --------: | ----: | ------: | ----: |
| SEND Max    | 36505  | 30597     | 27202 | 32782   | 30125 |
| SEND Min    | 36505  | 30597     | 27202 | 32782   | 30125 |
| SEND Median | 36505  | 30597     | 27202 | 32782   | 30125 |
| RECV Max    | 152221 | 59130     | 7879  | 17551   | 9145  |
| RECV Min    | 152221 | 59130     | 7879  | 17551   | 9145  |
| RECV Median | 152221 | 59130     | 7879  | 17551   | 9145  |

**10 Clients, 1,000 Messages**



`mqtt-stresser -broker tcp://localhost:1883 -num-clients=10 -num-messages=1000`

|             | Mochi | Mosquitto | EMQX  | VerneMQ | Mosca |
| :---------- | ----: | --------: | ----: | ------: | ----: |
| SEND Max    | 37193 | 15775     | 17455 | 34138   | 36575 |
| SEND Min    | 6529  | 6446      | 7714  | 8583    | 7383  |
| SEND Median | 15127 | 7813      | 10305 | 9887    | 8169  |
| RECV Max    | 33535 | 3710      | 3022  | 4534    | 9411  |
| RECV Min    | 7484  | 2661      | 1689  | 2021    | 2275  |
| RECV Median | 11427 | 3142      | 1831  | 2468    | 4692  |

**10 Clients, 10,000 Messages**



`mqtt-stresser -broker tcp://localhost:1883 -num-clients=10 -num-messages=10000`

|             | Mochi | Mosquitto | EMQX  | VerneMQ | Mosca |
| :---------- | ----: | --------: | ----: | ------: | ----: |
| SEND Max    | 13153 | 13270     | 12229 | 13025   | 38446 |
| SEND Min    | 8728  | 8513      | 8193  | 6483    | 3889  |
| SEND Median | 9045  | 9532      | 9252  | 8031    | 9210  |
| RECV Max    | 20774 | 5052      | 2093  | 2071    | 43008 |
| RECV Min    | 10718 | 3995      | 1531  | 1673    | 18764 |
| RECV Median | 16339 | 4607      | 1620  | 1907    | 33524 |

**500 Clients, 100 Messages**



`mqtt-stresser -broker tcp://localhost:1883 -num-clients=500 -num-messages=100`

|             | Mochi  | Mosquitto | EMQX  | VerneMQ | Mosca |
| :---------- | -----: | --------: | ----: | ------: | ----: |
| SEND Max    | 70688  | 72686     | 71392 | 75336   | 73192 |
| SEND Min    | 1021   | 2577      | 1603  | 8417    | 2344  |
| SEND Median | 49871  | 33076     | 33637 | 35200   | 31312 |
| RECV Max    | 116163 | 4215      | 3427  | 5484    | 10100 |
| RECV Min    | 1044   | 156       | 56    | 83      | 169   |
| RECV Median | 24398  | 208       | 94    | 413     | 474   |

## Contributions

Contributions and feedback are both welcomed and encouraged! Open an [issue](https://github.com/mochi-co/mqtt/issues) to report a bug, ask a question, or make a feature request.

examples/events/main.go (new file, 77 lines)

@@ -0,0 +1,77 @@
package main

import (
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/logrusorgru/aurora"

	mqtt "github.com/mochi-co/mqtt/server"
	"github.com/mochi-co/mqtt/server/events"
	"github.com/mochi-co/mqtt/server/listeners"
	"github.com/mochi-co/mqtt/server/listeners/auth"
)

func main() {
	sigs := make(chan os.Signal, 1)
	done := make(chan bool, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sigs
		done <- true
	}()

	fmt.Println(aurora.Magenta("Mochi MQTT Server initializing..."), aurora.Cyan("TCP"))

	server := mqtt.New()
	tcp := listeners.NewTCP("t1", ":1883")
	err := server.AddListener(tcp, &listeners.Config{
		Auth: new(auth.Allow),
	})
	if err != nil {
		log.Fatal(err)
	}

	// Start the server.
	go func() {
		err := server.Serve()
		if err != nil {
			log.Fatal(err)
		}
	}()

	// Add OnMessage Event Hook.
	server.Events.OnMessage = func(cl events.Client, pk events.Packet) (pkx events.Packet, err error) {
		pkx = pk
		if string(pk.Payload) == "hello" {
			pkx.Payload = []byte("hello world")
			fmt.Printf("< OnMessage modified message from client %s: %s\n", cl.ID, string(pkx.Payload))
		} else {
			fmt.Printf("< OnMessage received message from client %s: %s\n", cl.ID, string(pkx.Payload))
		}

		return pkx, nil
	}

	// Demonstration of directly publishing messages to a topic via the
	// `server.Publish` method. Subscribe to `direct/publish` using your
	// MQTT client to see the messages.
	go func() {
		for range time.Tick(time.Second * 10) {
			server.Publish("direct/publish", []byte("scheduled message"), false)
			fmt.Println("> issued direct message to direct/publish")
		}
	}()

	fmt.Println(aurora.BgMagenta(" Started! "))

	<-done
	fmt.Println(aurora.BgRed(" Caught Signal "))

	server.Close()
	fmt.Println(aurora.BgGreen(" Finished "))
}

server/events/events.go (new file, 36 lines)

@@ -0,0 +1,36 @@
package events

import (
	"github.com/mochi-co/mqtt/server/internal/clients"
	"github.com/mochi-co/mqtt/server/internal/packets"
)

// Events contains the overrideable server event hooks.
type Events struct {
	OnMessage // published message received.
}

// Packet is an alias for the internal packets.Packet type.
type Packet packets.Packet

// Client contains information about the client that triggered an event.
type Client struct {
	ID       string
	Listener string
}

// FromClient returns an event client from a client.
func FromClient(cl clients.Client) Client {
	return Client{
		ID:       cl.ID,
		Listener: cl.Listener,
	}
}

// OnMessage function is called when a publish message is received. Note,
// this hook is ONLY called by connected client publishers, it is not triggered when
// using the direct s.Publish method. The function receives the sent message and the
// data of the client who published it, and allows the packet to be modified
// before it is dispatched to subscribers. If no modification is required, return
// the original packet data. If an error occurs, the original packet will
// be dispatched as if the event hook had not been triggered.
// This function will block message dispatching until it returns. To minimise this,
// have the function open a new goroutine on the embedding side.
type OnMessage func(Client, Packet) (Packet, error)
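
Because the hook blocks dispatch, expensive processing is best handed off to a goroutine on the embedding side, as the comment above advises. A minimal sketch of that pattern; the `slowWork` channel and its buffer size are hypothetical, only the `events` types are taken from this file:

```go
// slowWork is a hypothetical sink for processing that must not stall dispatch.
slowWork := make(chan events.Packet, 1024)

go func() {
	for pk := range slowWork {
		_ = pk // expensive processing happens here, off the dispatch path.
	}
}()

server.Events.OnMessage = func(cl events.Client, pk events.Packet) (events.Packet, error) {
	select {
	case slowWork <- pk: // hand the packet off without blocking.
	default: // drop the work rather than stalling the broker if the sink is full.
	}
	return pk, nil // dispatch the original packet immediately.
}
```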

@@ -77,6 +77,7 @@ func (b *Writer) WriteTo(w io.Writer) (total int, err error) {
}

// Write writes p to the buffer, returning the number of bytes written.
// The bytes written to the buffer are picked up by WriteTo.
func (b *Writer) Write(p []byte) (total int, err error) {
	err = b.awaitEmpty(len(p))
	if err != nil {

@@ -305,8 +305,9 @@ func (cl *Client) ReadFixedHeader(fh *packets.FixedHeader) error {
	return nil
}

// Read reads new packets from a client connection
func (cl *Client) Read(h func(*Client, packets.Packet) error) error {
// Read loops forever reading new packets from a client connection until
// an error is encountered (or the connection is closed).
func (cl *Client) Read(packetHandler func(*Client, packets.Packet) error) error {
	for {
		if atomic.LoadInt64(&cl.State.Done) == 1 && cl.r.CapDelta() == 0 {
			return nil

@@ -324,7 +325,7 @@ func (cl *Client) Read(h func(*Client, packets.Packet) error) error {
		return err
	}

	err = h(cl, pk) // Process inbound packet.
	err = packetHandler(cl, pk) // Process inbound packet.
	if err != nil {
		return err
	}

@@ -437,6 +438,7 @@ func (cl *Client) WritePacket(pk packets.Packet) (n int, err error) {
		return
	}

	// Write the packet bytes to the client byte buffer.
	n, err = cl.w.Write(buf.Bytes())
	if err != nil {
		return

@@ -80,8 +80,8 @@ func (l *TCP) Listen(s *system.Info) error {
	return nil
}

// Serve starts waiting for new TCP connections, and calls the connection
// establishment callback for any received.
// Serve starts waiting for new TCP connections, and calls the establish
// connection callback for any received.
func (l *TCP) Serve(establish EstablishFunc) {
	for {
		if atomic.LoadInt64(&l.end) == 1 {

server/server.go (136 lines changed)

@@ -9,6 +9,7 @@ import (
	"sync/atomic"
	"time"

	"github.com/mochi-co/mqtt/server/events"
	"github.com/mochi-co/mqtt/server/internal/circ"
	"github.com/mochi-co/mqtt/server/internal/clients"
	"github.com/mochi-co/mqtt/server/internal/packets"

@@ -20,11 +21,7 @@ import (
)

const (
	Version = "1.0.0" // the server version.

	// maxPacketID is the maximum value of a 16-bit packet ID. If a
	// packet ID reaches this number, it resets to 0.
	maxPacketID = 65535
	Version = "1.0.1" // the server version.
)

var (

@@ -47,14 +44,22 @@ var (
// Server is an MQTT broker server. It should be created with server.New()
// in order to ensure all the internal fields are correctly populated.
type Server struct {
	done     chan bool      // indicate that the server is ending.
	bytepool circ.BytesPool // a byte pool for incoming and outgoing packets.
	Listeners *listeners.Listeners // listeners are network interfaces which listen for new connections.
	Clients   *clients.Clients     // clients which are known to the broker.
	Topics    *topics.Index        // an index of topic filter subscriptions and retained messages.
	System    *system.Info         // values about the server commonly found in $SYS topics.
	Store     persistence.Store    // a persistent storage backend if desired.
	done      chan bool            // indicate that the server is ending.
	bytepool  circ.BytesPool       // a byte pool for incoming and outgoing packets.
	sysTicker *time.Ticker         // the interval ticker for sending updated $SYS topics.
	inline    inlineMessages       // channels for direct publishing.
	Events    events.Events        // overrideable event hooks.
}

// inlineMessages contains channels for handling inline (direct) publishing.
type inlineMessages struct {
	done chan bool           // indicate that the server is ending.
	pub  chan packets.Packet // a channel of packets to publish to clients
}

// New returns a new instance of an MQTT broker.

@@ -69,6 +74,11 @@ func New() *Server {
			Started: time.Now().Unix(),
		},
		sysTicker: time.NewTicker(SysTopicInterval * time.Millisecond),
		inline: inlineMessages{
			done: make(chan bool),
			pub:  make(chan packets.Packet, 1024),
		},
		Events: events.Events{},
	}

	// Expose server stats using the system listener so it can be used in the

@@ -119,9 +129,10 @@ func (s *Server) Serve() error {
		}
	}

	go s.eventLoop()
	s.Listeners.ServeAll(s.EstablishConnection)
	s.publishSysTopics()
	go s.eventLoop()                            // spin up event loop for issuing $SYS values and closing server.
	go s.inlineClient()                         // spin up inline client for direct message publishing.
	s.Listeners.ServeAll(s.EstablishConnection) // start listening on all listeners.
	s.publishSysTopics()                        // begin publishing $SYS system values.

	return nil
}

@@ -132,6 +143,7 @@ func (s *Server) eventLoop() {
	select {
	case <-s.done:
		s.sysTicker.Stop()
		close(s.inline.done)
		return
	case <-s.sysTicker.C:
		s.publishSysTopics()

@@ -139,11 +151,25 @@ func (s *Server) eventLoop() {
	}
}

// EstablishConnection establishes a new client when a listener accepts a new
// connection.
// inlineClient loops forever, sending directly-published messages
// from the Publish method to subscribers.
func (s *Server) inlineClient() {
	for {
		select {
		case <-s.inline.done:
			close(s.inline.pub)
			return
		case pk := <-s.inline.pub:
			s.publishToSubscribers(pk)
		}
	}
}

// EstablishConnection establishes a new client when a listener
// accepts a new connection.
func (s *Server) EstablishConnection(lid string, c net.Conn, ac auth.Controller) error {
	xbr := s.bytepool.Get() // Get byte buffer from pools for sending and receiving packet data.
	xbw := s.bytepool.Get()
	xbr := s.bytepool.Get() // Get byte buffer from pools for receiving packet data.
	xbw := s.bytepool.Get() // and for sending.

	cl := clients.NewClient(c,
		circ.NewReaderFromSlice(0, xbr),

@@ -325,11 +351,38 @@ func (s *Server) processPingreq(cl *clients.Client, pk packets.Packet) error {
	return nil
}

// Publish creates a publish packet from a payload and sends it to the inline.pub
// channel, where it is written directly to the outgoing byte buffers of any
// clients subscribed to the given topic. Because the message is written directly
// within the server, QoS is inherently 2 (exactly once).
func (s *Server) Publish(topic string, payload []byte, retain bool) error {
	if len(topic) >= 4 && topic[0:4] == "$SYS" {
		return ErrInvalidTopic
	}

	pk := packets.Packet{
		FixedHeader: packets.FixedHeader{
			Type: packets.Publish,
		},
		TopicName: topic,
		Payload:   payload,
	}

	if retain {
		s.retainMessage(pk)
	}

	// handoff packet to s.inline.pub channel for writing to client buffers
	// to avoid blocking the calling function.
	s.inline.pub <- pk

	return nil
}

// processPublish processes a Publish packet.
func (s *Server) processPublish(cl *clients.Client, pk packets.Packet) error {
	if len(pk.TopicName) >= 4 && pk.TopicName[0:4] == "$SYS" {
		// Clients can't publish to $SYS topics.
		return nil
		return nil // Clients can't publish to $SYS topics, so fail silently as per spec.
	}

	if !cl.AC.ACL(cl.Username, pk.TopicName, true) {

@@ -337,22 +390,7 @@ func (s *Server) processPublish(cl *clients.Client, pk packets.Packet) error {
	}

	if pk.FixedHeader.Retain {
		out := pk.PublishCopy()
		q := s.Topics.RetainMessage(out)
		atomic.AddInt64(&s.System.Retained, q)
		if s.Store != nil {
			if q == 1 {
				s.Store.WriteRetained(persistence.Message{
					ID:          "ret_" + out.TopicName,
					T:           persistence.KRetained,
					FixedHeader: persistence.FixedHeader(out.FixedHeader),
					TopicName:   out.TopicName,
					Payload:     out.Payload,
				})
			} else {
				s.Store.DeleteRetained("ret_" + out.TopicName)
			}
		}
		s.retainMessage(pk)
	}

	if pk.FixedHeader.Qos > 0 {

@@ -367,21 +405,49 @@ func (s *Server) processPublish(cl *clients.Client, pk packets.Packet) error {
			ack.FixedHeader.Type = packets.Pubrec
		}

		s.writeClient(cl, ack)
		// omit errors in case of broken connection / LWT publish. ack send failures
		// will be handled by in-flight resending on next reconnect.
		s.writeClient(cl, ack)
	}

	// if an OnMessage hook exists, potentially modify the packet.
	if s.Events.OnMessage != nil {
		if pkx, err := s.Events.OnMessage(events.FromClient(*cl), events.Packet(pk)); err == nil {
			pk = packets.Packet(pkx)
		}
	}

	// write packet to the byte buffers of any clients with matching topic filters.
	s.publishToSubscribers(pk)

	return nil
}

// retainMessage adds a message to a topic, and if a persistent store is provided,
// adds the message to the store so it can be reloaded if necessary.
func (s *Server) retainMessage(pk packets.Packet) {
	out := pk.PublishCopy()
	q := s.Topics.RetainMessage(out)
	atomic.AddInt64(&s.System.Retained, q)
	if s.Store != nil {
		if q == 1 {
			s.Store.WriteRetained(persistence.Message{
				ID:          "ret_" + out.TopicName,
				T:           persistence.KRetained,
				FixedHeader: persistence.FixedHeader(out.FixedHeader),
				TopicName:   out.TopicName,
				Payload:     out.Payload,
			})
		} else {
			s.Store.DeleteRetained("ret_" + out.TopicName)
		}
	}
}

// publishToSubscribers publishes a publish packet to all subscribers with
// matching topic filters.
func (s *Server) publishToSubscribers(pk packets.Packet) {
	subs := s.Topics.Subscribers(pk.TopicName)
	for id, qos := range subs {
	for id, qos := range s.Topics.Subscribers(pk.TopicName) {
		if client, ok := s.Clients.Get(id); ok {
			out := pk.PublishCopy()
			if qos > out.FixedHeader.Qos { // Inherit higher desired qos values.

@@ -2,6 +2,7 @@ package server

import (
	//"errors"
	"fmt"
	"io/ioutil"
	"net"
	"strconv"

@@ -11,6 +12,7 @@ import (

	"github.com/stretchr/testify/require"

	"github.com/mochi-co/mqtt/server/events"
	"github.com/mochi-co/mqtt/server/internal/circ"
	"github.com/mochi-co/mqtt/server/internal/clients"
	"github.com/mochi-co/mqtt/server/internal/packets"

@@ -650,7 +652,7 @@ func TestServerProcessPublishQoS2(t *testing.T) {
	require.Equal(t, int64(0), atomic.LoadInt64(&s.System.Retained))
}

func TestServerProcessPublishUnretain(t *testing.T) {
func TestServerProcessPublishUnretainByEmptyPayload(t *testing.T) {
	s, cl1, r1, w1 := setupClient()
	s.Clients.Add(cl1)

@@ -840,6 +842,235 @@ func TestServerProcessPublishWriteAckError(t *testing.T) {
	require.Error(t, err)
}

func TestServerPublishInline(t *testing.T) {
	s, cl1, r1, w1 := setupClient()
	cl1.ID = "inline"
	s.Clients.Add(cl1)
	s.Topics.Subscribe("a/b/+", cl1.ID, 0)
	go s.inlineClient()

	ack1 := make(chan []byte)
	go func() {
		buf, err := ioutil.ReadAll(r1)
		if err != nil {
			panic(err)
		}
		ack1 <- buf
	}()

	err := s.Publish("a/b/c", []byte("hello"), false)
	require.NoError(t, err)

	time.Sleep(10 * time.Millisecond)
	w1.Close()

	require.Equal(t, []byte{
		byte(packets.Publish << 4), 12,
		0, 5,
		'a', '/', 'b', '/', 'c',
		'h', 'e', 'l', 'l', 'o',
	}, <-ack1)

	require.Equal(t, int64(14), s.System.BytesSent)

	close(s.inline.done)
}

func TestServerPublishInlineRetain(t *testing.T) {
	s, cl1, r1, w1 := setupClient()
	cl1.ID = "inline"

	ack1 := make(chan []byte)
	go func() {
		buf, err := ioutil.ReadAll(r1)
		if err != nil {
			panic(err)
		}
		ack1 <- buf
	}()

	err := s.Publish("a/b/c", []byte("hello"), true)
	require.NoError(t, err)

	time.Sleep(10 * time.Millisecond)

	s.Clients.Add(cl1)
	s.Topics.Subscribe("a/b/+", cl1.ID, 0)
	go s.inlineClient()

	time.Sleep(10 * time.Millisecond)

	w1.Close()

	require.Equal(t, []byte{
		byte(packets.Publish << 4), 12,
		0, 5,
		'a', '/', 'b', '/', 'c',
		'h', 'e', 'l', 'l', 'o',
	}, <-ack1)

	require.Equal(t, int64(14), s.System.BytesSent)

	close(s.inline.done)
}

func TestServerPublishInlineSysTopicError(t *testing.T) {
	s, _, _, _ := setupClient()

	err := s.Publish("$SYS/stuff", []byte("hello"), false)
	require.Error(t, err)
	require.Equal(t, int64(0), s.System.BytesSent)
}

func TestServerProcessPublishHookOnMessage(t *testing.T) {
	s, cl1, r1, w1 := setupClient()
	s.Clients.Add(cl1)
	s.Topics.Subscribe("a/b/+", cl1.ID, 0)

	var hookedPacket events.Packet
	var hookedClient events.Client
	s.Events.OnMessage = func(cl events.Client, pk events.Packet) (events.Packet, error) {
		hookedClient = cl
		hookedPacket = pk
		return pk, nil
	}

	ack1 := make(chan []byte)
	go func() {
		buf, err := ioutil.ReadAll(r1)
		if err != nil {
			panic(err)
		}
		ack1 <- buf
	}()

	pk1 := packets.Packet{
		FixedHeader: packets.FixedHeader{
			Type: packets.Publish,
		},
		TopicName: "a/b/c",
		Payload:   []byte("hello"),
	}
	err := s.processPacket(cl1, pk1)

	require.NoError(t, err)
	time.Sleep(10 * time.Millisecond)

	require.Equal(t, events.Client{
		ID:       "mochi",
		Listener: "",
	}, hookedClient)

	require.Equal(t, events.Packet(pk1), hookedPacket)

	w1.Close()

	require.Equal(t, []byte{
		byte(packets.Publish << 4), 12,
		0, 5,
		'a', '/', 'b', '/', 'c',
		'h', 'e', 'l', 'l', 'o',
	}, <-ack1)

	require.Equal(t, int64(14), s.System.BytesSent)
}

func TestServerProcessPublishHookOnMessageModify(t *testing.T) {
	s, cl1, r1, w1 := setupClient()
	s.Clients.Add(cl1)
	s.Topics.Subscribe("a/b/+", cl1.ID, 0)

	var hookedPacket events.Packet
	var hookedClient events.Client
	s.Events.OnMessage = func(cl events.Client, pk events.Packet) (events.Packet, error) {
		hookedPacket = pk
		hookedPacket.Payload = []byte("world")
		hookedClient = cl
		return hookedPacket, nil
	}

	ack1 := make(chan []byte)
	go func() {
		buf, err := ioutil.ReadAll(r1)
		if err != nil {
			panic(err)
		}
		ack1 <- buf
	}()

	pk1 := packets.Packet{
		FixedHeader: packets.FixedHeader{
			Type: packets.Publish,
		},
		TopicName: "a/b/c",
		Payload:   []byte("hello"),
	}
	err := s.processPacket(cl1, pk1)

	require.NoError(t, err)
	time.Sleep(10 * time.Millisecond)

	require.Equal(t, events.Client{
		ID:       "mochi",
		Listener: "",
	}, hookedClient)

	w1.Close()

	require.Equal(t, []byte{
		byte(packets.Publish << 4), 12,
		0, 5,
		'a', '/', 'b', '/', 'c',
		'w', 'o', 'r', 'l', 'd',
	}, <-ack1)

	require.Equal(t, int64(14), s.System.BytesSent)
}

func TestServerProcessPublishHookOnMessageModifyError(t *testing.T) {
	s, cl1, r1, w1 := setupClient()
	s.Clients.Add(cl1)
	s.Topics.Subscribe("a/b/+", cl1.ID, 0)

	s.Events.OnMessage = func(cl events.Client, pk events.Packet) (events.Packet, error) {
		pkx := pk
		pkx.Payload = []byte("world")
		return pkx, fmt.Errorf("error")
	}

	ack1 := make(chan []byte)
	go func() {
		buf, err := ioutil.ReadAll(r1)
		if err != nil {
			panic(err)
		}
		ack1 <- buf
	}()

	pk1 := packets.Packet{
		FixedHeader: packets.FixedHeader{
			Type: packets.Publish,
		},
		TopicName: "a/b/c",
		Payload:   []byte("hello"),
	}
	err := s.processPacket(cl1, pk1)

	require.NoError(t, err)
	time.Sleep(10 * time.Millisecond)

	w1.Close()

	require.Equal(t, []byte{
		byte(packets.Publish << 4), 12,
		0, 5,
		'a', '/', 'b', '/', 'c',
		'h', 'e', 'l', 'l', 'o',
	}, <-ack1)

	require.Equal(t, int64(14), s.System.BytesSent)
}

func TestServerProcessPuback(t *testing.T) {
	s, cl, _, _ := setupClient()
	cl.Inflight.Set(11, clients.InflightMessage{Packet: packets.Packet{PacketID: 11}, Sent: 0})