mirror of
				https://github.com/mochi-mqtt/server.git
				synced 2025-11-01 03:52:39 +08:00 
			
		
		
		
	Compare commits
	
		
			25 Commits
		
	
	
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
|   | b277600823 | ||
|   | 685c050fdd | ||
|   | 0abbaf5070 | ||
|   | 1ab1928cff | ||
|   | 8890bb9dd4 | ||
|   | f9348aaf93 | ||
|   | c2a42a16ca | ||
|   | d14d944de9 | ||
|   | 480e60b3f0 | ||
|   | d4cbf1abdc | ||
|   | 8a1c53432e | ||
|   | 7c7b8d58fe | ||
|   | ce773b3978 | ||
|   | f3e7469478 | ||
|   | b5685ca0ee | ||
|   | 66edb0564c | ||
|   | 1d9fa4199c | ||
|   | dec880231d | ||
|   | 21d4e54e74 | ||
|   | aeb4190733 | ||
|   | 484e4abd56 | ||
|   | d51bad30fc | ||
|   | 060fbffa79 | ||
|   | 7c68614912 | ||
|   | 124be96c0e | 
							
								
								
									
										2
									
								
								.gitignore
									
									
									
									
										vendored
									
									
										Normal file
									
								
							
							
						
						
									
										2
									
								
								.gitignore
									
									
									
									
										vendored
									
									
										Normal file
									
								
							| @@ -0,0 +1,2 @@ | ||||
| cmd/mqtt | ||||
| .DS_Store | ||||
							
								
								
									
										175
									
								
								README.md
									
									
									
									
									
								
							
							
						
						
									
										175
									
								
								README.md
									
									
									
									
									
								
							| @@ -4,7 +4,6 @@ | ||||
| [](https://travis-ci.com/mochi-co/mqtt) | ||||
| [](https://github.com/mochi-co/mqtt/issues) | ||||
| [](https://codecov.io/gh/mochi-co/mqtt) | ||||
| [](https://www.codacy.com/app/mochi-co/mqtt?utm_source=github.com&utm_medium=referral&utm_content=mochi-co/mqtt&utm_campaign=Badge_Grade) | ||||
| [](https://pkg.go.dev/github.com/mochi-co/mqtt) | ||||
|  | ||||
| </p> | ||||
| @@ -25,78 +24,13 @@ MQTT stands for MQ Telemetry Transport. It is a publish/subscribe, extremely sim | ||||
| - TCP, Websocket, (including SSL/TLS) and Dashboard listeners. | ||||
| - Interfaces for Client Authentication and Topic access control. | ||||
| - Bolt-backed persistence and storage interfaces. | ||||
| - Directly Publishing from embedding service (`s.Publish(topic, message, retain)`). | ||||
|  | ||||
| #### Roadmap | ||||
| - Inline Pub-sub (without client) and event hooks | ||||
| - Event Hooks (eg. provide handler functions for `onMessage`). | ||||
| - Docker Image | ||||
| - MQTT v5 compatibility | ||||
|  | ||||
| #### Performance (messages/second) | ||||
| Performance benchmarks were tested using [MQTT-Stresser](https://github.com/inovex/mqtt-stresser) on a  13-inch, Early 2015 Macbook Pro (2.7 GHz Intel Core i5). Taking into account bursts of high and low throughput, the median scores are the most useful. Higher is better. SEND = Publish throughput, RECV = Subscribe throughput. | ||||
|  | ||||
| > As usual, any performance benchmarks should be taken with a pinch of salt, but are shown to demonstrate typical throughput compared to the other leading MQTT brokers. | ||||
|  | ||||
| **Single Client, 10,000 messages** | ||||
| _With only 1 client, there is no variation in throughput so the benchmark is reports the same number for high, low, and median._ | ||||
|  | ||||
|  | ||||
|  | ||||
| `mqtt-stresser -broker tcp://localhost:1883 -num-clients=1 -num-messages=10000` | ||||
|  | ||||
| |              | Mochi     | Mosquitto   | EMQX     | VerneMQ   | Mosca   |   | ||||
| | :----------- | --------: | ----------: | -------: | --------: | --------: | ||||
| | SEND Max    | 36505  |   30597  | 27202  | 32782  | 30125   | | ||||
| | SEND Min    |  36505    |  30597  | 27202   |  32782  | 30125  | | ||||
| | SEND Median  | 36505   | 30597   | 27202   |32782    | 30125  | | ||||
| | RECV Max    | 152221  |  59130  | 7879   | 17551   | 9145   | | ||||
| | RECV Min    | 152221  | 59130   | 7879   |  17551    |  9145    | | ||||
| | RECV Median    | 152221  |  59130  | 7879   |  17551   |  9145   | | ||||
|  | ||||
| **10 Clients, 1,000 Messages** | ||||
|  | ||||
|  | ||||
|  | ||||
| `mqtt-stresser -broker tcp://localhost:1883 -num-clients=10 -num-messages=1000` | ||||
|  | ||||
| |              | Mochi     | Mosquitto   | EMQX     | VerneMQ   | Mosca   |   | ||||
| | :----------- | --------: | ----------: | -------: | --------: | --------: | ||||
| | SEND Max    |  37193 | 	15775 |	17455 |	34138 |	36575  | | ||||
| | SEND Min    |   6529 |	6446 |	7714 |	8583 |	7383      | | ||||
| | SEND Median  |  15127 |	7813 | 	10305 |	9887 |	8169     | | ||||
| | RECV Max    |  33535	 | 3710	| 3022 |	4534 |	9411    | | ||||
| | RECV Min    |   7484	| 2661	| 1689 |	2021 |	2275     | | ||||
| | RECV Median    |   11427 |  3142 | 1831 |	2468 |	4692      | | ||||
|  | ||||
| **10 Clients, 10,000 Messages** | ||||
|  | ||||
|  | ||||
|  | ||||
| `mqtt-stresser -broker tcp://localhost:1883 -num-clients=10 -num-messages=10000` | ||||
|  | ||||
| |              | Mochi     | Mosquitto   | EMQX     | VerneMQ   | Mosca   |   | ||||
| | :----------- | --------: | ----------: | -------: | --------: | --------: | ||||
| | SEND Max    |   13153 |	13270 |	12229 |	13025 |	38446  | | ||||
| | SEND Min    |  8728	| 8513	| 8193 | 	6483 |	3889    | | ||||
| | SEND Median  |   9045	| 9532	| 9252 |	8031 |	9210    | | ||||
| | RECV Max    |  20774	| 5052	| 2093 |	2071 | 	43008    | | ||||
| | RECV Min    |   10718	 |3995	| 1531	| 1673	| 18764   | | ||||
| | RECV Median    |  16339 |	4607 |	1620 | 	1907	| 33524  | | ||||
|  | ||||
| **500 Clients, 100 Messages** | ||||
|  | ||||
|  | ||||
|  | ||||
| `mqtt-stresser -broker tcp://localhost:1883 -num-clients=500 -num-messages=100` | ||||
|  | ||||
| |              | Mochi     | Mosquitto   | EMQX     | VerneMQ   | Mosca   |   | ||||
| | :----------- | --------: | ----------: | -------: | --------: | --------: | ||||
| | SEND Max    |  70688	| 72686	| 71392 |	75336 |	73192   | | ||||
| | SEND Min    |   1021	| 2577 |	1603 |	8417 |	2344  | | ||||
| | SEND Median  |  49871	| 33076 |	33637 |	35200 |	31312   | | ||||
| | RECV Max    |  116163 |	4215 |	3427 |	5484 |	10100 | | ||||
| | RECV Min    |   1044	| 156 | 	56 | 	83	| 169   | | ||||
| | RECV Median    |     24398 | 208 |	94 |	413 |	474     | | ||||
|  | ||||
| #### Using the Broker | ||||
| Mochi MQTT can be used as a standalone broker. Simply checkout this repository and run the `main.go` entrypoint in the `cmd` folder which will expose tcp (:1883), websocket (:1882), and dashboard (:8080) listeners. A docker image is coming soon. | ||||
|  | ||||
| @@ -169,6 +103,43 @@ SSL may be configured on both the TCP and Websocket listeners by providing a pub | ||||
| ``` | ||||
| > Note the mandatory inclusion of the Auth Controller! | ||||
|  | ||||
| #### Event Hooks | ||||
| Some basic Event Hooks have been added, allowing you to call your own functions when certain events occur. The execution of the functions are blocking - if necessary, please handle goroutines within the embedding service. | ||||
|  | ||||
| ##### OnMessage | ||||
| `server.Events.OnMessage` is called when a Publish packet is received. The function receives the published message and information about the client who published it. This function will block message dispatching until it returns. | ||||
|  | ||||
| > This hook is only triggered when a message is received by clients. It is not triggered when using the direct `server.Publish` method. | ||||
|  | ||||
| ```go | ||||
| import "github.com/mochi-co/mqtt/server/events" | ||||
|  | ||||
| server.Events.OnMessage = func(cl events.Client, pk events.Packet) (pkx events.Packet, err error) { | ||||
| 	if string(pk.Payload) == "hello" { | ||||
| 		pkx = pk | ||||
| 		pkx.Payload = []byte("hello world") | ||||
| 		return pkx, nil | ||||
| 	}  | ||||
|  | ||||
| 	return pk, nil | ||||
| } | ||||
| ``` | ||||
|  | ||||
| A working example can be found in the `examples/events` folder. Please open an issue if there is a particular event hook you are interested in! | ||||
|  | ||||
| #### Direct Publishing | ||||
| When the broker is being embedded in a larger codebase, it can be useful to be able to publish messages directly to clients without having to implement a loopback TCP connection with an MQTT client. The `Publish` method allows you to inject publish messages directly into a queue to be delivered to any clients with matching topic filters. The `Retain` flag is supported. | ||||
|  | ||||
| ```go  | ||||
| 	// func (s *Server) Publish(topic string, payload []byte, retain bool) error | ||||
| 	err := s.Publish("a/b/c", []byte("hello"), false) | ||||
| 	if err != nil { | ||||
| 		log.Fatal(err) | ||||
| 	} | ||||
| ``` | ||||
|  | ||||
| A working example can be found in the `examples/events` folder. | ||||
|  | ||||
| #### Data Persistence | ||||
| Mochi MQTT provides a `persistence.Store` interface for developing and attaching persistent stores to the broker. The default persistence mechanism packaged with the broker is backed by [Bolt](https://github.com/etcd-io/bbolt) and can be enabled by assigning a `*bolt.Store` to the server. | ||||
| ```go | ||||
| @@ -183,6 +154,74 @@ Mochi MQTT provides a `persistence.Store` interface for developing and attaching | ||||
| #### Paho Interoperability Test | ||||
| You can check the broker against the [Paho Interoperability Test](https://github.com/eclipse/paho.mqtt.testing/tree/master/interoperability) by starting the broker using `examples/paho/main.go`, and then running the test with `python3 client_test.py` from the _interoperability_ folder. | ||||
|  | ||||
|  | ||||
| #### Performance (messages/second) | ||||
| Performance benchmarks were tested using [MQTT-Stresser](https://github.com/inovex/mqtt-stresser) on a  13-inch, Early 2015 Macbook Pro (2.7 GHz Intel Core i5). Taking into account bursts of high and low throughput, the median scores are the most useful. Higher is better. SEND = Publish throughput, RECV = Subscribe throughput. | ||||
|  | ||||
| > As usual, any performance benchmarks should be taken with a pinch of salt, but are shown to demonstrate typical throughput compared to the other leading MQTT brokers. | ||||
|  | ||||
| **Single Client, 10,000 messages** | ||||
| _With only 1 client, there is no variation in throughput so the benchmark is reports the same number for high, low, and median._ | ||||
|  | ||||
|  | ||||
|  | ||||
| `mqtt-stresser -broker tcp://localhost:1883 -num-clients=1 -num-messages=10000` | ||||
|  | ||||
| |              | Mochi     | Mosquitto   | EMQX     | VerneMQ   | Mosca   |   | ||||
| | :----------- | --------: | ----------: | -------: | --------: | --------: | ||||
| | SEND Max    | 36505  |   30597  | 27202  | 32782  | 30125   | | ||||
| | SEND Min    |  36505    |  30597  | 27202   |  32782  | 30125  | | ||||
| | SEND Median  | 36505   | 30597   | 27202   |32782    | 30125  | | ||||
| | RECV Max    | 152221  |  59130  | 7879   | 17551   | 9145   | | ||||
| | RECV Min    | 152221  | 59130   | 7879   |  17551    |  9145    | | ||||
| | RECV Median    | 152221  |  59130  | 7879   |  17551   |  9145   | | ||||
|  | ||||
| **10 Clients, 1,000 Messages** | ||||
|  | ||||
|  | ||||
|  | ||||
| `mqtt-stresser -broker tcp://localhost:1883 -num-clients=10 -num-messages=1000` | ||||
|  | ||||
| |              | Mochi     | Mosquitto   | EMQX     | VerneMQ   | Mosca   |   | ||||
| | :----------- | --------: | ----------: | -------: | --------: | --------: | ||||
| | SEND Max    |  37193 | 	15775 |	17455 |	34138 |	36575  | | ||||
| | SEND Min    |   6529 |	6446 |	7714 |	8583 |	7383      | | ||||
| | SEND Median  |  15127 |	7813 | 	10305 |	9887 |	8169     | | ||||
| | RECV Max    |  33535	 | 3710	| 3022 |	4534 |	9411    | | ||||
| | RECV Min    |   7484	| 2661	| 1689 |	2021 |	2275     | | ||||
| | RECV Median    |   11427 |  3142 | 1831 |	2468 |	4692      | | ||||
|  | ||||
| **10 Clients, 10,000 Messages** | ||||
|  | ||||
|  | ||||
|  | ||||
| `mqtt-stresser -broker tcp://localhost:1883 -num-clients=10 -num-messages=10000` | ||||
|  | ||||
| |              | Mochi     | Mosquitto   | EMQX     | VerneMQ   | Mosca   |   | ||||
| | :----------- | --------: | ----------: | -------: | --------: | --------: | ||||
| | SEND Max    |   13153 |	13270 |	12229 |	13025 |	38446  | | ||||
| | SEND Min    |  8728	| 8513	| 8193 | 	6483 |	3889    | | ||||
| | SEND Median  |   9045	| 9532	| 9252 |	8031 |	9210    | | ||||
| | RECV Max    |  20774	| 5052	| 2093 |	2071 | 	43008    | | ||||
| | RECV Min    |   10718	 |3995	| 1531	| 1673	| 18764   | | ||||
| | RECV Median    |  16339 |	4607 |	1620 | 	1907	| 33524  | | ||||
|  | ||||
| **500 Clients, 100 Messages** | ||||
|  | ||||
|  | ||||
|  | ||||
| `mqtt-stresser -broker tcp://localhost:1883 -num-clients=500 -num-messages=100` | ||||
|  | ||||
| |              | Mochi     | Mosquitto   | EMQX     | VerneMQ   | Mosca   |   | ||||
| | :----------- | --------: | ----------: | -------: | --------: | --------: | ||||
| | SEND Max    |  70688	| 72686	| 71392 |	75336 |	73192   | | ||||
| | SEND Min    |   1021	| 2577 |	1603 |	8417 |	2344  | | ||||
| | SEND Median  |  49871	| 33076 |	33637 |	35200 |	31312   | | ||||
| | RECV Max    |  116163 |	4215 |	3427 |	5484 |	10100 | | ||||
| | RECV Min    |   1044	| 156 | 	56 | 	83	| 169   | | ||||
| | RECV Median    |     24398 | 208 |	94 |	413 |	474     | | ||||
|  | ||||
|  | ||||
| ## Contributions | ||||
| Contributions and feedback are both welcomed and encouraged! Open an [issue](https://github.com/mochi-co/mqtt/issues) to report a bug, ask a question, or make a feature request. | ||||
|  | ||||
|   | ||||
							
								
								
									
										77
									
								
								examples/events/main.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										77
									
								
								examples/events/main.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,77 @@ | ||||
| package main | ||||
|  | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"log" | ||||
| 	"os" | ||||
| 	"os/signal" | ||||
| 	"syscall" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/logrusorgru/aurora" | ||||
|  | ||||
| 	mqtt "github.com/mochi-co/mqtt/server" | ||||
| 	"github.com/mochi-co/mqtt/server/events" | ||||
| 	"github.com/mochi-co/mqtt/server/listeners" | ||||
| 	"github.com/mochi-co/mqtt/server/listeners/auth" | ||||
| ) | ||||
|  | ||||
| func main() { | ||||
| 	sigs := make(chan os.Signal, 1) | ||||
| 	done := make(chan bool, 1) | ||||
| 	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) | ||||
| 	go func() { | ||||
| 		<-sigs | ||||
| 		done <- true | ||||
| 	}() | ||||
|  | ||||
| 	fmt.Println(aurora.Magenta("Mochi MQTT Server initializing..."), aurora.Cyan("TCP")) | ||||
|  | ||||
| 	server := mqtt.New() | ||||
| 	tcp := listeners.NewTCP("t1", ":1883") | ||||
| 	err := server.AddListener(tcp, &listeners.Config{ | ||||
| 		Auth: new(auth.Allow), | ||||
| 	}) | ||||
| 	if err != nil { | ||||
| 		log.Fatal(err) | ||||
| 	} | ||||
|  | ||||
| 	// Start the server | ||||
| 	go func() { | ||||
| 		err := server.Serve() | ||||
| 		if err != nil { | ||||
| 			log.Fatal(err) | ||||
| 		} | ||||
| 	}() | ||||
|  | ||||
| 	// Add OnMessage Event Hook | ||||
| 	server.Events.OnMessage = func(cl events.Client, pk events.Packet) (pkx events.Packet, err error) { | ||||
| 		pkx = pk | ||||
| 		if string(pk.Payload) == "hello" { | ||||
| 			pkx.Payload = []byte("hello world") | ||||
| 			fmt.Printf("< OnMessage modified message from client %s: %s\n", cl.ID, string(pkx.Payload)) | ||||
| 		} else { | ||||
| 			fmt.Printf("< OnMessage received message from client %s: %s\n", cl.ID, string(pkx.Payload)) | ||||
| 		} | ||||
|  | ||||
| 		return pkx, nil | ||||
| 	} | ||||
|  | ||||
| 	// Demonstration of directly publishing messages to a topic via the | ||||
| 	// `server.Publish` method. Subscribe to `direct/publish` using your | ||||
| 	// MQTT client to see the messages. | ||||
| 	go func() { | ||||
| 		for range time.Tick(time.Second * 10) { | ||||
| 			server.Publish("direct/publish", []byte("scheduled message"), false) | ||||
| 			fmt.Println("> issued direct message to direct/publish") | ||||
| 		} | ||||
| 	}() | ||||
|  | ||||
| 	fmt.Println(aurora.BgMagenta("  Started!  ")) | ||||
|  | ||||
| 	<-done | ||||
| 	fmt.Println(aurora.BgRed("  Caught Signal  ")) | ||||
|  | ||||
| 	server.Close() | ||||
| 	fmt.Println(aurora.BgGreen("  Finished  ")) | ||||
| } | ||||
							
								
								
									
										36
									
								
								server/events/events.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										36
									
								
								server/events/events.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,36 @@ | ||||
| package events | ||||
|  | ||||
| import ( | ||||
| 	"github.com/mochi-co/mqtt/server/internal/clients" | ||||
| 	"github.com/mochi-co/mqtt/server/internal/packets" | ||||
| ) | ||||
|  | ||||
| type Events struct { | ||||
| 	OnMessage // published message receieved. | ||||
| } | ||||
|  | ||||
| type Packet packets.Packet | ||||
|  | ||||
| type Client struct { | ||||
| 	ID       string | ||||
| 	Listener string | ||||
| } | ||||
|  | ||||
| // FromClient returns an event client from a client. | ||||
| func FromClient(cl clients.Client) Client { | ||||
| 	return Client{ | ||||
| 		ID:       cl.ID, | ||||
| 		Listener: cl.Listener, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // OnMessage function is called when a publish message is received. Note, | ||||
| // this hook is ONLY called by connected client publishers, it is not triggered when | ||||
| // using the direct s.Publish method. The function receives the sent message and the | ||||
| // data of the client who published it, and allows the packet to be modified | ||||
| // before it is dispatched to subscribers. If no modification is required, return | ||||
| // the original packet data. If an error occurs, the original packet will | ||||
| // be dispatched as if the event hook had not been triggered. | ||||
| // This function will block message dispatching until it returns. To minimise this, | ||||
| // have the function open a new goroutine on the embedding side. | ||||
| type OnMessage func(Client, Packet) (Packet, error) | ||||
| @@ -77,6 +77,7 @@ func (b *Writer) WriteTo(w io.Writer) (total int, err error) { | ||||
| } | ||||
|  | ||||
| // Write writes the buffer to the buffer p, returning the number of bytes written. | ||||
| // The bytes written to the buffer are picked up by WriteTo. | ||||
| func (b *Writer) Write(p []byte) (total int, err error) { | ||||
| 	err = b.awaitEmpty(len(p)) | ||||
| 	if err != nil { | ||||
|   | ||||
| @@ -305,8 +305,9 @@ func (cl *Client) ReadFixedHeader(fh *packets.FixedHeader) error { | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // Read reads new packets from a client connection | ||||
| func (cl *Client) Read(h func(*Client, packets.Packet) error) error { | ||||
| // Read loops forever reading new packets from a client connection until | ||||
| // an error is encountered (or the connection is closed). | ||||
| func (cl *Client) Read(packetHandler func(*Client, packets.Packet) error) error { | ||||
| 	for { | ||||
| 		if atomic.LoadInt64(&cl.State.Done) == 1 && cl.r.CapDelta() == 0 { | ||||
| 			return nil | ||||
| @@ -324,7 +325,7 @@ func (cl *Client) Read(h func(*Client, packets.Packet) error) error { | ||||
| 			return err | ||||
| 		} | ||||
|  | ||||
| 		err = h(cl, pk) // Process inbound packet. | ||||
| 		err = packetHandler(cl, pk) // Process inbound packet. | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| @@ -437,6 +438,7 @@ func (cl *Client) WritePacket(pk packets.Packet) (n int, err error) { | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	// Write the packet bytes to the client byte buffer. | ||||
| 	n, err = cl.w.Write(buf.Bytes()) | ||||
| 	if err != nil { | ||||
| 		return | ||||
|   | ||||
| @@ -80,8 +80,8 @@ func (l *TCP) Listen(s *system.Info) error { | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // Serve starts waiting for new TCP connections, and calls the connection | ||||
| // establishment callback for any received. | ||||
| // Serve starts waiting for new TCP connections, and calls the establish | ||||
| // connection callback for any received. | ||||
| func (l *TCP) Serve(establish EstablishFunc) { | ||||
| 	for { | ||||
| 		if atomic.LoadInt64(&l.end) == 1 { | ||||
|   | ||||
							
								
								
									
										146
									
								
								server/server.go
									
									
									
									
									
								
							
							
						
						
									
										146
									
								
								server/server.go
									
									
									
									
									
								
							| @@ -9,6 +9,7 @@ import ( | ||||
| 	"sync/atomic" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/mochi-co/mqtt/server/events" | ||||
| 	"github.com/mochi-co/mqtt/server/internal/circ" | ||||
| 	"github.com/mochi-co/mqtt/server/internal/clients" | ||||
| 	"github.com/mochi-co/mqtt/server/internal/packets" | ||||
| @@ -20,11 +21,7 @@ import ( | ||||
| ) | ||||
|  | ||||
| const ( | ||||
| 	Version = "1.0.0" // the server version. | ||||
|  | ||||
| 	// maxPacketID is the maximum value of a 16-bit packet ID. If a | ||||
| 	// packet ID reaches this number, it resets to 0. | ||||
| 	maxPacketID = 65535 | ||||
| 	Version = "1.0.1" // the server version. | ||||
| ) | ||||
|  | ||||
| var ( | ||||
| @@ -47,14 +44,22 @@ var ( | ||||
| // Server is an MQTT broker server. It should be created with server.New() | ||||
| // in order to ensure all the internal fields are correctly populated. | ||||
| type Server struct { | ||||
| 	done      chan bool            // indicate that the server is ending. | ||||
| 	bytepool  circ.BytesPool       // a byte pool for incoming and outgoing packets. | ||||
| 	Listeners *listeners.Listeners // listeners are network interfaces which listen for new connections. | ||||
| 	Clients   *clients.Clients     // clients which are known to the broker. | ||||
| 	Topics    *topics.Index        // an index of topic filter subscriptions and retained messages. | ||||
| 	System    *system.Info         // values about the server commonly found in $SYS topics. | ||||
| 	Store     persistence.Store    // a persistent storage backend if desired. | ||||
| 	done      chan bool            // indicate that the server is ending. | ||||
| 	bytepool  circ.BytesPool       // a byte pool for incoming and outgoing packets. | ||||
| 	sysTicker *time.Ticker         // the interval ticker for sending updating $SYS topics. | ||||
| 	inline    inlineMessages       // channels for direct publishing. | ||||
| 	Events    events.Events        // overrideable event hooks. | ||||
| } | ||||
|  | ||||
| // inlineMessages contains channels for handling inline (direct) publishing. | ||||
| type inlineMessages struct { | ||||
| 	done chan bool           // indicate that the server is ending. | ||||
| 	pub  chan packets.Packet // a channel of packets to publish to clients | ||||
| } | ||||
|  | ||||
| // New returns a new instance of an MQTT broker. | ||||
| @@ -69,6 +74,11 @@ func New() *Server { | ||||
| 			Started: time.Now().Unix(), | ||||
| 		}, | ||||
| 		sysTicker: time.NewTicker(SysTopicInterval * time.Millisecond), | ||||
| 		inline: inlineMessages{ | ||||
| 			done: make(chan bool), | ||||
| 			pub:  make(chan packets.Packet, 1024), | ||||
| 		}, | ||||
| 		Events: events.Events{}, | ||||
| 	} | ||||
|  | ||||
| 	// Expose server stats using the system listener so it can be used in the | ||||
| @@ -119,9 +129,10 @@ func (s *Server) Serve() error { | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	go s.eventLoop() | ||||
| 	s.Listeners.ServeAll(s.EstablishConnection) | ||||
| 	s.publishSysTopics() | ||||
| 	go s.eventLoop()                            // spin up event loop for issuing $SYS values and closing server. | ||||
| 	go s.inlineClient()                         // spin up inline client for direct message publishing. | ||||
| 	s.Listeners.ServeAll(s.EstablishConnection) // start listening on all listeners. | ||||
| 	s.publishSysTopics()                        // begin publishing $SYS system values. | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
| @@ -132,6 +143,7 @@ func (s *Server) eventLoop() { | ||||
| 		select { | ||||
| 		case <-s.done: | ||||
| 			s.sysTicker.Stop() | ||||
| 			close(s.inline.done) | ||||
| 			return | ||||
| 		case <-s.sysTicker.C: | ||||
| 			s.publishSysTopics() | ||||
| @@ -139,11 +151,25 @@ func (s *Server) eventLoop() { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // EstablishConnection establishes a new client when a listener accepts a new | ||||
| // connection. | ||||
| // inlineClient loops forever, sending directly-published messages | ||||
| // from the Publish method to subscribers. | ||||
| func (s *Server) inlineClient() { | ||||
| 	for { | ||||
| 		select { | ||||
| 		case <-s.inline.done: | ||||
| 			close(s.inline.pub) | ||||
| 			return | ||||
| 		case pk := <-s.inline.pub: | ||||
| 			s.publishToSubscribers(pk) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // EstablishConnection establishes a new client when a listener | ||||
| // accepts a new connection. | ||||
| func (s *Server) EstablishConnection(lid string, c net.Conn, ac auth.Controller) error { | ||||
| 	xbr := s.bytepool.Get() // Get byte buffer from pools for sending and receiving packet data. | ||||
| 	xbw := s.bytepool.Get() | ||||
| 	xbr := s.bytepool.Get() // Get byte buffer from pools for receiving packet data. | ||||
| 	xbw := s.bytepool.Get() // and for sending. | ||||
|  | ||||
| 	cl := clients.NewClient(c, | ||||
| 		circ.NewReaderFromSlice(0, xbr), | ||||
| @@ -325,11 +351,38 @@ func (s *Server) processPingreq(cl *clients.Client, pk packets.Packet) error { | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // Publish creates a publish packet from a payload and sends it to the inline.pub | ||||
| // channel, where it is written directly to the outgoing byte buffers of any | ||||
| // clients subscribed to the given topic. Because the message is written directly | ||||
| // within the server, QoS is inherently 2 (exactly once). | ||||
| func (s *Server) Publish(topic string, payload []byte, retain bool) error { | ||||
| 	if len(topic) >= 4 && topic[0:4] == "$SYS" { | ||||
| 		return ErrInvalidTopic | ||||
| 	} | ||||
|  | ||||
| 	pk := packets.Packet{ | ||||
| 		FixedHeader: packets.FixedHeader{ | ||||
| 			Type: packets.Publish, | ||||
| 		}, | ||||
| 		TopicName: topic, | ||||
| 		Payload:   payload, | ||||
| 	} | ||||
|  | ||||
| 	if retain { | ||||
| 		s.retainMessage(pk) | ||||
| 	} | ||||
|  | ||||
| 	// handoff packet to s.inline.pub channel for writing to client buffers | ||||
| 	// to avoid blocking the calling function. | ||||
| 	s.inline.pub <- pk | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // processPublish processes a Publish packet. | ||||
| func (s *Server) processPublish(cl *clients.Client, pk packets.Packet) error { | ||||
| 	if len(pk.TopicName) >= 4 && pk.TopicName[0:4] == "$SYS" { | ||||
| 		// Clients can't publish to $SYS topics. | ||||
| 		return nil | ||||
| 		return nil // Clients can't publish to $SYS topics, so fail silently as per spec. | ||||
| 	} | ||||
|  | ||||
| 	if !cl.AC.ACL(cl.Username, pk.TopicName, true) { | ||||
| @@ -337,6 +390,42 @@ func (s *Server) processPublish(cl *clients.Client, pk packets.Packet) error { | ||||
| 	} | ||||
|  | ||||
| 	if pk.FixedHeader.Retain { | ||||
| 		s.retainMessage(pk) | ||||
| 	} | ||||
|  | ||||
| 	if pk.FixedHeader.Qos > 0 { | ||||
| 		ack := packets.Packet{ | ||||
| 			FixedHeader: packets.FixedHeader{ | ||||
| 				Type: packets.Puback, | ||||
| 			}, | ||||
| 			PacketID: pk.PacketID, | ||||
| 		} | ||||
|  | ||||
| 		if pk.FixedHeader.Qos == 2 { | ||||
| 			ack.FixedHeader.Type = packets.Pubrec | ||||
| 		} | ||||
|  | ||||
| 		// omit errors in case of broken connection / LWT publish. ack send failures | ||||
| 		// will be handled by in-flight resending on next reconnect. | ||||
| 		s.writeClient(cl, ack) | ||||
| 	} | ||||
|  | ||||
| 	// if an OnMessage hook exists, potentially modify the packet. | ||||
| 	if s.Events.OnMessage != nil { | ||||
| 		if pkx, err := s.Events.OnMessage(events.FromClient(*cl), events.Packet(pk)); err == nil { | ||||
| 			pk = packets.Packet(pkx) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	// write packet to the byte buffers of any clients with matching topic filters. | ||||
| 	s.publishToSubscribers(pk) | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // retainMessage adds a message to a topic, and if a persistent store is provided, | ||||
| // adds the message to the store so it can be reloaded if necessary. | ||||
| func (s *Server) retainMessage(pk packets.Packet) { | ||||
| 	out := pk.PublishCopy() | ||||
| 	q := s.Topics.RetainMessage(out) | ||||
| 	atomic.AddInt64(&s.System.Retained, q) | ||||
| @@ -355,33 +444,10 @@ func (s *Server) processPublish(cl *clients.Client, pk packets.Packet) error { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| 	if pk.FixedHeader.Qos > 0 { | ||||
| 		ack := packets.Packet{ | ||||
| 			FixedHeader: packets.FixedHeader{ | ||||
| 				Type: packets.Puback, | ||||
| 			}, | ||||
| 			PacketID: pk.PacketID, | ||||
| 		} | ||||
|  | ||||
| 		if pk.FixedHeader.Qos == 2 { | ||||
| 			ack.FixedHeader.Type = packets.Pubrec | ||||
| 		} | ||||
|  | ||||
| 		s.writeClient(cl, ack) | ||||
| 		// omit errors in case of broken connection / LWT publish. ack send failures | ||||
| 		// will be handled by in-flight resending on next reconnect. | ||||
| 	} | ||||
|  | ||||
| 	s.publishToSubscribers(pk) | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // publishToSubscribers publishes a publish packet to all subscribers with | ||||
| // matching topic filters. | ||||
| func (s *Server) publishToSubscribers(pk packets.Packet) { | ||||
| 	subs := s.Topics.Subscribers(pk.TopicName) | ||||
| 	for id, qos := range subs { | ||||
| 	for id, qos := range s.Topics.Subscribers(pk.TopicName) { | ||||
| 		if client, ok := s.Clients.Get(id); ok { | ||||
| 			out := pk.PublishCopy() | ||||
| 			if qos > out.FixedHeader.Qos { // Inherit higher desired qos values. | ||||
|   | ||||
| @@ -2,6 +2,7 @@ package server | ||||
|  | ||||
| import ( | ||||
| 	//"errors" | ||||
| 	"fmt" | ||||
| 	"io/ioutil" | ||||
| 	"net" | ||||
| 	"strconv" | ||||
| @@ -11,6 +12,7 @@ import ( | ||||
|  | ||||
| 	"github.com/stretchr/testify/require" | ||||
|  | ||||
| 	"github.com/mochi-co/mqtt/server/events" | ||||
| 	"github.com/mochi-co/mqtt/server/internal/circ" | ||||
| 	"github.com/mochi-co/mqtt/server/internal/clients" | ||||
| 	"github.com/mochi-co/mqtt/server/internal/packets" | ||||
| @@ -650,7 +652,7 @@ func TestServerProcessPublishQoS2(t *testing.T) { | ||||
| 	require.Equal(t, int64(0), atomic.LoadInt64(&s.System.Retained)) | ||||
| } | ||||
|  | ||||
| func TestServerProcessPublishUnretain(t *testing.T) { | ||||
| func TestServerProcessPublishUnretainByEmptyPayload(t *testing.T) { | ||||
| 	s, cl1, r1, w1 := setupClient() | ||||
| 	s.Clients.Add(cl1) | ||||
|  | ||||
| @@ -840,6 +842,235 @@ func TestServerProcessPublishWriteAckError(t *testing.T) { | ||||
| 	require.Error(t, err) | ||||
| } | ||||
|  | ||||
| func TestServerPublishInline(t *testing.T) { | ||||
| 	s, cl1, r1, w1 := setupClient() | ||||
| 	cl1.ID = "inline" | ||||
| 	s.Clients.Add(cl1) | ||||
| 	s.Topics.Subscribe("a/b/+", cl1.ID, 0) | ||||
| 	go s.inlineClient() | ||||
|  | ||||
| 	ack1 := make(chan []byte) | ||||
| 	go func() { | ||||
| 		buf, err := ioutil.ReadAll(r1) | ||||
| 		if err != nil { | ||||
| 			panic(err) | ||||
| 		} | ||||
| 		ack1 <- buf | ||||
| 	}() | ||||
|  | ||||
| 	err := s.Publish("a/b/c", []byte("hello"), false) | ||||
| 	require.NoError(t, err) | ||||
|  | ||||
| 	time.Sleep(10 * time.Millisecond) | ||||
| 	w1.Close() | ||||
|  | ||||
| 	require.Equal(t, []byte{ | ||||
| 		byte(packets.Publish << 4), 12, | ||||
| 		0, 5, | ||||
| 		'a', '/', 'b', '/', 'c', | ||||
| 		'h', 'e', 'l', 'l', 'o', | ||||
| 	}, <-ack1) | ||||
|  | ||||
| 	require.Equal(t, int64(14), s.System.BytesSent) | ||||
|  | ||||
| 	close(s.inline.done) | ||||
| } | ||||
|  | ||||
| func TestServerPublishInlineRetain(t *testing.T) { | ||||
| 	s, cl1, r1, w1 := setupClient() | ||||
| 	cl1.ID = "inline" | ||||
|  | ||||
| 	ack1 := make(chan []byte) | ||||
| 	go func() { | ||||
| 		buf, err := ioutil.ReadAll(r1) | ||||
| 		if err != nil { | ||||
| 			panic(err) | ||||
| 		} | ||||
| 		ack1 <- buf | ||||
| 	}() | ||||
|  | ||||
| 	err := s.Publish("a/b/c", []byte("hello"), true) | ||||
| 	require.NoError(t, err) | ||||
|  | ||||
| 	time.Sleep(10 * time.Millisecond) | ||||
|  | ||||
| 	s.Clients.Add(cl1) | ||||
| 	s.Topics.Subscribe("a/b/+", cl1.ID, 0) | ||||
| 	go s.inlineClient() | ||||
|  | ||||
| 	time.Sleep(10 * time.Millisecond) | ||||
|  | ||||
| 	w1.Close() | ||||
|  | ||||
| 	require.Equal(t, []byte{ | ||||
| 		byte(packets.Publish << 4), 12, | ||||
| 		0, 5, | ||||
| 		'a', '/', 'b', '/', 'c', | ||||
| 		'h', 'e', 'l', 'l', 'o', | ||||
| 	}, <-ack1) | ||||
|  | ||||
| 	require.Equal(t, int64(14), s.System.BytesSent) | ||||
|  | ||||
| 	close(s.inline.done) | ||||
| } | ||||
|  | ||||
| func TestServerPublishInlineSysTopicError(t *testing.T) { | ||||
| 	s, _, _, _ := setupClient() | ||||
|  | ||||
| 	err := s.Publish("$SYS/stuff", []byte("hello"), false) | ||||
| 	require.Error(t, err) | ||||
| 	require.Equal(t, int64(0), s.System.BytesSent) | ||||
| } | ||||
|  | ||||
| func TestServerProcessPublishHookOnMessage(t *testing.T) { | ||||
| 	s, cl1, r1, w1 := setupClient() | ||||
| 	s.Clients.Add(cl1) | ||||
| 	s.Topics.Subscribe("a/b/+", cl1.ID, 0) | ||||
|  | ||||
| 	var hookedPacket events.Packet | ||||
| 	var hookedClient events.Client | ||||
| 	s.Events.OnMessage = func(cl events.Client, pk events.Packet) (events.Packet, error) { | ||||
| 		hookedClient = cl | ||||
| 		hookedPacket = pk | ||||
| 		return pk, nil | ||||
| 	} | ||||
|  | ||||
| 	ack1 := make(chan []byte) | ||||
| 	go func() { | ||||
| 		buf, err := ioutil.ReadAll(r1) | ||||
| 		if err != nil { | ||||
| 			panic(err) | ||||
| 		} | ||||
| 		ack1 <- buf | ||||
| 	}() | ||||
|  | ||||
| 	pk1 := packets.Packet{ | ||||
| 		FixedHeader: packets.FixedHeader{ | ||||
| 			Type: packets.Publish, | ||||
| 		}, | ||||
| 		TopicName: "a/b/c", | ||||
| 		Payload:   []byte("hello"), | ||||
| 	} | ||||
| 	err := s.processPacket(cl1, pk1) | ||||
|  | ||||
| 	require.NoError(t, err) | ||||
| 	time.Sleep(10 * time.Millisecond) | ||||
|  | ||||
| 	require.Equal(t, events.Client{ | ||||
| 		ID:       "mochi", | ||||
| 		Listener: "", | ||||
| 	}, hookedClient) | ||||
|  | ||||
| 	require.Equal(t, events.Packet(pk1), hookedPacket) | ||||
|  | ||||
| 	w1.Close() | ||||
|  | ||||
| 	require.Equal(t, []byte{ | ||||
| 		byte(packets.Publish << 4), 12, | ||||
| 		0, 5, | ||||
| 		'a', '/', 'b', '/', 'c', | ||||
| 		'h', 'e', 'l', 'l', 'o', | ||||
| 	}, <-ack1) | ||||
|  | ||||
| 	require.Equal(t, int64(14), s.System.BytesSent) | ||||
| } | ||||
|  | ||||
| func TestServerProcessPublishHookOnMessageModify(t *testing.T) { | ||||
| 	s, cl1, r1, w1 := setupClient() | ||||
| 	s.Clients.Add(cl1) | ||||
| 	s.Topics.Subscribe("a/b/+", cl1.ID, 0) | ||||
|  | ||||
| 	var hookedPacket events.Packet | ||||
| 	var hookedClient events.Client | ||||
| 	s.Events.OnMessage = func(cl events.Client, pk events.Packet) (events.Packet, error) { | ||||
| 		hookedPacket = pk | ||||
| 		hookedPacket.Payload = []byte("world") | ||||
| 		hookedClient = cl | ||||
| 		return hookedPacket, nil | ||||
| 	} | ||||
|  | ||||
| 	ack1 := make(chan []byte) | ||||
| 	go func() { | ||||
| 		buf, err := ioutil.ReadAll(r1) | ||||
| 		if err != nil { | ||||
| 			panic(err) | ||||
| 		} | ||||
| 		ack1 <- buf | ||||
| 	}() | ||||
|  | ||||
| 	pk1 := packets.Packet{ | ||||
| 		FixedHeader: packets.FixedHeader{ | ||||
| 			Type: packets.Publish, | ||||
| 		}, | ||||
| 		TopicName: "a/b/c", | ||||
| 		Payload:   []byte("hello"), | ||||
| 	} | ||||
| 	err := s.processPacket(cl1, pk1) | ||||
|  | ||||
| 	require.NoError(t, err) | ||||
| 	time.Sleep(10 * time.Millisecond) | ||||
|  | ||||
| 	require.Equal(t, events.Client{ | ||||
| 		ID:       "mochi", | ||||
| 		Listener: "", | ||||
| 	}, hookedClient) | ||||
|  | ||||
| 	w1.Close() | ||||
|  | ||||
| 	require.Equal(t, []byte{ | ||||
| 		byte(packets.Publish << 4), 12, | ||||
| 		0, 5, | ||||
| 		'a', '/', 'b', '/', 'c', | ||||
| 		'w', 'o', 'r', 'l', 'd', | ||||
| 	}, <-ack1) | ||||
|  | ||||
| 	require.Equal(t, int64(14), s.System.BytesSent) | ||||
| } | ||||
|  | ||||
| func TestServerProcessPublishHookOnMessageModifyError(t *testing.T) { | ||||
| 	s, cl1, r1, w1 := setupClient() | ||||
| 	s.Clients.Add(cl1) | ||||
| 	s.Topics.Subscribe("a/b/+", cl1.ID, 0) | ||||
|  | ||||
| 	s.Events.OnMessage = func(cl events.Client, pk events.Packet) (events.Packet, error) { | ||||
| 		pkx := pk | ||||
| 		pkx.Payload = []byte("world") | ||||
| 		return pkx, fmt.Errorf("error") | ||||
| 	} | ||||
|  | ||||
| 	ack1 := make(chan []byte) | ||||
| 	go func() { | ||||
| 		buf, err := ioutil.ReadAll(r1) | ||||
| 		if err != nil { | ||||
| 			panic(err) | ||||
| 		} | ||||
| 		ack1 <- buf | ||||
| 	}() | ||||
|  | ||||
| 	pk1 := packets.Packet{ | ||||
| 		FixedHeader: packets.FixedHeader{ | ||||
| 			Type: packets.Publish, | ||||
| 		}, | ||||
| 		TopicName: "a/b/c", | ||||
| 		Payload:   []byte("hello"), | ||||
| 	} | ||||
| 	err := s.processPacket(cl1, pk1) | ||||
|  | ||||
| 	require.NoError(t, err) | ||||
| 	time.Sleep(10 * time.Millisecond) | ||||
|  | ||||
| 	w1.Close() | ||||
|  | ||||
| 	require.Equal(t, []byte{ | ||||
| 		byte(packets.Publish << 4), 12, | ||||
| 		0, 5, | ||||
| 		'a', '/', 'b', '/', 'c', | ||||
| 		'h', 'e', 'l', 'l', 'o', | ||||
| 	}, <-ack1) | ||||
|  | ||||
| 	require.Equal(t, int64(14), s.System.BytesSent) | ||||
| } | ||||
|  | ||||
| func TestServerProcessPuback(t *testing.T) { | ||||
| 	s, cl, _, _ := setupClient() | ||||
| 	cl.Inflight.Set(11, clients.InflightMessage{Packet: packets.Packet{PacketID: 11}, Sent: 0}) | ||||
|   | ||||
		Reference in New Issue
	
	Block a user