mirror of
				https://github.com/mochi-mqtt/server.git
				synced 2025-10-31 11:36:25 +08:00 
			
		
		
		
	Compare commits
	
		
			25 Commits
		
	
	
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
|   | b277600823 | ||
|   | 685c050fdd | ||
|   | 0abbaf5070 | ||
|   | 1ab1928cff | ||
|   | 8890bb9dd4 | ||
|   | f9348aaf93 | ||
|   | c2a42a16ca | ||
|   | d14d944de9 | ||
|   | 480e60b3f0 | ||
|   | d4cbf1abdc | ||
|   | 8a1c53432e | ||
|   | 7c7b8d58fe | ||
|   | ce773b3978 | ||
|   | f3e7469478 | ||
|   | b5685ca0ee | ||
|   | 66edb0564c | ||
|   | 1d9fa4199c | ||
|   | dec880231d | ||
|   | 21d4e54e74 | ||
|   | aeb4190733 | ||
|   | 484e4abd56 | ||
|   | d51bad30fc | ||
|   | 060fbffa79 | ||
|   | 7c68614912 | ||
|   | 124be96c0e | 
							
								
								
									
										2
									
								
								.gitignore
									
									
									
									
										vendored
									
									
										Normal file
									
								
							
							
						
						
									
										2
									
								
								.gitignore
									
									
									
									
										vendored
									
									
										Normal file
									
								
							| @@ -0,0 +1,2 @@ | |||||||
|  | cmd/mqtt | ||||||
|  | .DS_Store | ||||||
							
								
								
									
										175
									
								
								README.md
									
									
									
									
									
								
							
							
						
						
									
										175
									
								
								README.md
									
									
									
									
									
								
							| @@ -4,7 +4,6 @@ | |||||||
| [](https://travis-ci.com/mochi-co/mqtt) | [](https://travis-ci.com/mochi-co/mqtt) | ||||||
| [](https://github.com/mochi-co/mqtt/issues) | [](https://github.com/mochi-co/mqtt/issues) | ||||||
| [](https://codecov.io/gh/mochi-co/mqtt) | [](https://codecov.io/gh/mochi-co/mqtt) | ||||||
| [](https://www.codacy.com/app/mochi-co/mqtt?utm_source=github.com&utm_medium=referral&utm_content=mochi-co/mqtt&utm_campaign=Badge_Grade) |  | ||||||
| [](https://pkg.go.dev/github.com/mochi-co/mqtt) | [](https://pkg.go.dev/github.com/mochi-co/mqtt) | ||||||
|  |  | ||||||
| </p> | </p> | ||||||
| @@ -25,78 +24,13 @@ MQTT stands for MQ Telemetry Transport. It is a publish/subscribe, extremely sim | |||||||
| - TCP, Websocket, (including SSL/TLS) and Dashboard listeners. | - TCP, Websocket, (including SSL/TLS) and Dashboard listeners. | ||||||
| - Interfaces for Client Authentication and Topic access control. | - Interfaces for Client Authentication and Topic access control. | ||||||
| - Bolt-backed persistence and storage interfaces. | - Bolt-backed persistence and storage interfaces. | ||||||
|  | - Directly Publishing from embedding service (`s.Publish(topic, message, retain)`). | ||||||
|  |  | ||||||
| #### Roadmap | #### Roadmap | ||||||
| - Inline Pub-sub (without client) and event hooks | - Event Hooks (eg. provide handler functions for `onMessage`). | ||||||
| - Docker Image | - Docker Image | ||||||
| - MQTT v5 compatibility | - MQTT v5 compatibility | ||||||
|  |  | ||||||
| #### Performance (messages/second) |  | ||||||
| Performance benchmarks were tested using [MQTT-Stresser](https://github.com/inovex/mqtt-stresser) on a  13-inch, Early 2015 Macbook Pro (2.7 GHz Intel Core i5). Taking into account bursts of high and low throughput, the median scores are the most useful. Higher is better. SEND = Publish throughput, RECV = Subscribe throughput. |  | ||||||
|  |  | ||||||
| > As usual, any performance benchmarks should be taken with a pinch of salt, but are shown to demonstrate typical throughput compared to the other leading MQTT brokers. |  | ||||||
|  |  | ||||||
| **Single Client, 10,000 messages** |  | ||||||
| _With only 1 client, there is no variation in throughput so the benchmark is reports the same number for high, low, and median._ |  | ||||||
|  |  | ||||||
|  |  | ||||||
|  |  | ||||||
| `mqtt-stresser -broker tcp://localhost:1883 -num-clients=1 -num-messages=10000` |  | ||||||
|  |  | ||||||
| |              | Mochi     | Mosquitto   | EMQX     | VerneMQ   | Mosca   |   |  | ||||||
| | :----------- | --------: | ----------: | -------: | --------: | --------: |  | ||||||
| | SEND Max    | 36505  |   30597  | 27202  | 32782  | 30125   | |  | ||||||
| | SEND Min    |  36505    |  30597  | 27202   |  32782  | 30125  | |  | ||||||
| | SEND Median  | 36505   | 30597   | 27202   |32782    | 30125  | |  | ||||||
| | RECV Max    | 152221  |  59130  | 7879   | 17551   | 9145   | |  | ||||||
| | RECV Min    | 152221  | 59130   | 7879   |  17551    |  9145    | |  | ||||||
| | RECV Median    | 152221  |  59130  | 7879   |  17551   |  9145   | |  | ||||||
|  |  | ||||||
| **10 Clients, 1,000 Messages** |  | ||||||
|  |  | ||||||
|  |  | ||||||
|  |  | ||||||
| `mqtt-stresser -broker tcp://localhost:1883 -num-clients=10 -num-messages=1000` |  | ||||||
|  |  | ||||||
| |              | Mochi     | Mosquitto   | EMQX     | VerneMQ   | Mosca   |   |  | ||||||
| | :----------- | --------: | ----------: | -------: | --------: | --------: |  | ||||||
| | SEND Max    |  37193 | 	15775 |	17455 |	34138 |	36575  | |  | ||||||
| | SEND Min    |   6529 |	6446 |	7714 |	8583 |	7383      | |  | ||||||
| | SEND Median  |  15127 |	7813 | 	10305 |	9887 |	8169     | |  | ||||||
| | RECV Max    |  33535	 | 3710	| 3022 |	4534 |	9411    | |  | ||||||
| | RECV Min    |   7484	| 2661	| 1689 |	2021 |	2275     | |  | ||||||
| | RECV Median    |   11427 |  3142 | 1831 |	2468 |	4692      | |  | ||||||
|  |  | ||||||
| **10 Clients, 10,000 Messages** |  | ||||||
|  |  | ||||||
|  |  | ||||||
|  |  | ||||||
| `mqtt-stresser -broker tcp://localhost:1883 -num-clients=10 -num-messages=10000` |  | ||||||
|  |  | ||||||
| |              | Mochi     | Mosquitto   | EMQX     | VerneMQ   | Mosca   |   |  | ||||||
| | :----------- | --------: | ----------: | -------: | --------: | --------: |  | ||||||
| | SEND Max    |   13153 |	13270 |	12229 |	13025 |	38446  | |  | ||||||
| | SEND Min    |  8728	| 8513	| 8193 | 	6483 |	3889    | |  | ||||||
| | SEND Median  |   9045	| 9532	| 9252 |	8031 |	9210    | |  | ||||||
| | RECV Max    |  20774	| 5052	| 2093 |	2071 | 	43008    | |  | ||||||
| | RECV Min    |   10718	 |3995	| 1531	| 1673	| 18764   | |  | ||||||
| | RECV Median    |  16339 |	4607 |	1620 | 	1907	| 33524  | |  | ||||||
|  |  | ||||||
| **500 Clients, 100 Messages** |  | ||||||
|  |  | ||||||
|  |  | ||||||
|  |  | ||||||
| `mqtt-stresser -broker tcp://localhost:1883 -num-clients=500 -num-messages=100` |  | ||||||
|  |  | ||||||
| |              | Mochi     | Mosquitto   | EMQX     | VerneMQ   | Mosca   |   |  | ||||||
| | :----------- | --------: | ----------: | -------: | --------: | --------: |  | ||||||
| | SEND Max    |  70688	| 72686	| 71392 |	75336 |	73192   | |  | ||||||
| | SEND Min    |   1021	| 2577 |	1603 |	8417 |	2344  | |  | ||||||
| | SEND Median  |  49871	| 33076 |	33637 |	35200 |	31312   | |  | ||||||
| | RECV Max    |  116163 |	4215 |	3427 |	5484 |	10100 | |  | ||||||
| | RECV Min    |   1044	| 156 | 	56 | 	83	| 169   | |  | ||||||
| | RECV Median    |     24398 | 208 |	94 |	413 |	474     | |  | ||||||
|  |  | ||||||
| #### Using the Broker | #### Using the Broker | ||||||
| Mochi MQTT can be used as a standalone broker. Simply checkout this repository and run the `main.go` entrypoint in the `cmd` folder which will expose tcp (:1883), websocket (:1882), and dashboard (:8080) listeners. A docker image is coming soon. | Mochi MQTT can be used as a standalone broker. Simply checkout this repository and run the `main.go` entrypoint in the `cmd` folder which will expose tcp (:1883), websocket (:1882), and dashboard (:8080) listeners. A docker image is coming soon. | ||||||
|  |  | ||||||
| @@ -169,6 +103,43 @@ SSL may be configured on both the TCP and Websocket listeners by providing a pub | |||||||
| ``` | ``` | ||||||
| > Note the mandatory inclusion of the Auth Controller! | > Note the mandatory inclusion of the Auth Controller! | ||||||
|  |  | ||||||
|  | #### Event Hooks | ||||||
|  | Some basic Event Hooks have been added, allowing you to call your own functions when certain events occur. The execution of the functions are blocking - if necessary, please handle goroutines within the embedding service. | ||||||
|  |  | ||||||
|  | ##### OnMessage | ||||||
|  | `server.Events.OnMessage` is called when a Publish packet is received. The function receives the published message and information about the client who published it. This function will block message dispatching until it returns. | ||||||
|  |  | ||||||
|  | > This hook is only triggered when a message is received by clients. It is not triggered when using the direct `server.Publish` method. | ||||||
|  |  | ||||||
|  | ```go | ||||||
|  | import "github.com/mochi-co/mqtt/server/events" | ||||||
|  |  | ||||||
|  | server.Events.OnMessage = func(cl events.Client, pk events.Packet) (pkx events.Packet, err error) { | ||||||
|  | 	if string(pk.Payload) == "hello" { | ||||||
|  | 		pkx = pk | ||||||
|  | 		pkx.Payload = []byte("hello world") | ||||||
|  | 		return pkx, nil | ||||||
|  | 	}  | ||||||
|  |  | ||||||
|  | 	return pk, nil | ||||||
|  | } | ||||||
|  | ``` | ||||||
|  |  | ||||||
|  | A working example can be found in the `examples/events` folder. Please open an issue if there is a particular event hook you are interested in! | ||||||
|  |  | ||||||
|  | #### Direct Publishing | ||||||
|  | When the broker is being embedded in a larger codebase, it can be useful to be able to publish messages directly to clients without having to implement a loopback TCP connection with an MQTT client. The `Publish` method allows you to inject publish messages directly into a queue to be delivered to any clients with matching topic filters. The `Retain` flag is supported. | ||||||
|  |  | ||||||
|  | ```go  | ||||||
|  | 	// func (s *Server) Publish(topic string, payload []byte, retain bool) error | ||||||
|  | 	err := s.Publish("a/b/c", []byte("hello"), false) | ||||||
|  | 	if err != nil { | ||||||
|  | 		log.Fatal(err) | ||||||
|  | 	} | ||||||
|  | ``` | ||||||
|  |  | ||||||
|  | A working example can be found in the `examples/events` folder. | ||||||
|  |  | ||||||
| #### Data Persistence | #### Data Persistence | ||||||
| Mochi MQTT provides a `persistence.Store` interface for developing and attaching persistent stores to the broker. The default persistence mechanism packaged with the broker is backed by [Bolt](https://github.com/etcd-io/bbolt) and can be enabled by assigning a `*bolt.Store` to the server. | Mochi MQTT provides a `persistence.Store` interface for developing and attaching persistent stores to the broker. The default persistence mechanism packaged with the broker is backed by [Bolt](https://github.com/etcd-io/bbolt) and can be enabled by assigning a `*bolt.Store` to the server. | ||||||
| ```go | ```go | ||||||
| @@ -183,6 +154,74 @@ Mochi MQTT provides a `persistence.Store` interface for developing and attaching | |||||||
| #### Paho Interoperability Test | #### Paho Interoperability Test | ||||||
| You can check the broker against the [Paho Interoperability Test](https://github.com/eclipse/paho.mqtt.testing/tree/master/interoperability) by starting the broker using `examples/paho/main.go`, and then running the test with `python3 client_test.py` from the _interoperability_ folder. | You can check the broker against the [Paho Interoperability Test](https://github.com/eclipse/paho.mqtt.testing/tree/master/interoperability) by starting the broker using `examples/paho/main.go`, and then running the test with `python3 client_test.py` from the _interoperability_ folder. | ||||||
|  |  | ||||||
|  |  | ||||||
|  | #### Performance (messages/second) | ||||||
|  | Performance benchmarks were tested using [MQTT-Stresser](https://github.com/inovex/mqtt-stresser) on a  13-inch, Early 2015 Macbook Pro (2.7 GHz Intel Core i5). Taking into account bursts of high and low throughput, the median scores are the most useful. Higher is better. SEND = Publish throughput, RECV = Subscribe throughput. | ||||||
|  |  | ||||||
|  | > As usual, any performance benchmarks should be taken with a pinch of salt, but are shown to demonstrate typical throughput compared to the other leading MQTT brokers. | ||||||
|  |  | ||||||
|  | **Single Client, 10,000 messages** | ||||||
|  | _With only 1 client, there is no variation in throughput so the benchmark is reports the same number for high, low, and median._ | ||||||
|  |  | ||||||
|  |  | ||||||
|  |  | ||||||
|  | `mqtt-stresser -broker tcp://localhost:1883 -num-clients=1 -num-messages=10000` | ||||||
|  |  | ||||||
|  | |              | Mochi     | Mosquitto   | EMQX     | VerneMQ   | Mosca   |   | ||||||
|  | | :----------- | --------: | ----------: | -------: | --------: | --------: | ||||||
|  | | SEND Max    | 36505  |   30597  | 27202  | 32782  | 30125   | | ||||||
|  | | SEND Min    |  36505    |  30597  | 27202   |  32782  | 30125  | | ||||||
|  | | SEND Median  | 36505   | 30597   | 27202   |32782    | 30125  | | ||||||
|  | | RECV Max    | 152221  |  59130  | 7879   | 17551   | 9145   | | ||||||
|  | | RECV Min    | 152221  | 59130   | 7879   |  17551    |  9145    | | ||||||
|  | | RECV Median    | 152221  |  59130  | 7879   |  17551   |  9145   | | ||||||
|  |  | ||||||
|  | **10 Clients, 1,000 Messages** | ||||||
|  |  | ||||||
|  |  | ||||||
|  |  | ||||||
|  | `mqtt-stresser -broker tcp://localhost:1883 -num-clients=10 -num-messages=1000` | ||||||
|  |  | ||||||
|  | |              | Mochi     | Mosquitto   | EMQX     | VerneMQ   | Mosca   |   | ||||||
|  | | :----------- | --------: | ----------: | -------: | --------: | --------: | ||||||
|  | | SEND Max    |  37193 | 	15775 |	17455 |	34138 |	36575  | | ||||||
|  | | SEND Min    |   6529 |	6446 |	7714 |	8583 |	7383      | | ||||||
|  | | SEND Median  |  15127 |	7813 | 	10305 |	9887 |	8169     | | ||||||
|  | | RECV Max    |  33535	 | 3710	| 3022 |	4534 |	9411    | | ||||||
|  | | RECV Min    |   7484	| 2661	| 1689 |	2021 |	2275     | | ||||||
|  | | RECV Median    |   11427 |  3142 | 1831 |	2468 |	4692      | | ||||||
|  |  | ||||||
|  | **10 Clients, 10,000 Messages** | ||||||
|  |  | ||||||
|  |  | ||||||
|  |  | ||||||
|  | `mqtt-stresser -broker tcp://localhost:1883 -num-clients=10 -num-messages=10000` | ||||||
|  |  | ||||||
|  | |              | Mochi     | Mosquitto   | EMQX     | VerneMQ   | Mosca   |   | ||||||
|  | | :----------- | --------: | ----------: | -------: | --------: | --------: | ||||||
|  | | SEND Max    |   13153 |	13270 |	12229 |	13025 |	38446  | | ||||||
|  | | SEND Min    |  8728	| 8513	| 8193 | 	6483 |	3889    | | ||||||
|  | | SEND Median  |   9045	| 9532	| 9252 |	8031 |	9210    | | ||||||
|  | | RECV Max    |  20774	| 5052	| 2093 |	2071 | 	43008    | | ||||||
|  | | RECV Min    |   10718	 |3995	| 1531	| 1673	| 18764   | | ||||||
|  | | RECV Median    |  16339 |	4607 |	1620 | 	1907	| 33524  | | ||||||
|  |  | ||||||
|  | **500 Clients, 100 Messages** | ||||||
|  |  | ||||||
|  |  | ||||||
|  |  | ||||||
|  | `mqtt-stresser -broker tcp://localhost:1883 -num-clients=500 -num-messages=100` | ||||||
|  |  | ||||||
|  | |              | Mochi     | Mosquitto   | EMQX     | VerneMQ   | Mosca   |   | ||||||
|  | | :----------- | --------: | ----------: | -------: | --------: | --------: | ||||||
|  | | SEND Max    |  70688	| 72686	| 71392 |	75336 |	73192   | | ||||||
|  | | SEND Min    |   1021	| 2577 |	1603 |	8417 |	2344  | | ||||||
|  | | SEND Median  |  49871	| 33076 |	33637 |	35200 |	31312   | | ||||||
|  | | RECV Max    |  116163 |	4215 |	3427 |	5484 |	10100 | | ||||||
|  | | RECV Min    |   1044	| 156 | 	56 | 	83	| 169   | | ||||||
|  | | RECV Median    |     24398 | 208 |	94 |	413 |	474     | | ||||||
|  |  | ||||||
|  |  | ||||||
| ## Contributions | ## Contributions | ||||||
| Contributions and feedback are both welcomed and encouraged! Open an [issue](https://github.com/mochi-co/mqtt/issues) to report a bug, ask a question, or make a feature request. | Contributions and feedback are both welcomed and encouraged! Open an [issue](https://github.com/mochi-co/mqtt/issues) to report a bug, ask a question, or make a feature request. | ||||||
|  |  | ||||||
|   | |||||||
							
								
								
									
										77
									
								
								examples/events/main.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										77
									
								
								examples/events/main.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,77 @@ | |||||||
|  | package main | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"fmt" | ||||||
|  | 	"log" | ||||||
|  | 	"os" | ||||||
|  | 	"os/signal" | ||||||
|  | 	"syscall" | ||||||
|  | 	"time" | ||||||
|  |  | ||||||
|  | 	"github.com/logrusorgru/aurora" | ||||||
|  |  | ||||||
|  | 	mqtt "github.com/mochi-co/mqtt/server" | ||||||
|  | 	"github.com/mochi-co/mqtt/server/events" | ||||||
|  | 	"github.com/mochi-co/mqtt/server/listeners" | ||||||
|  | 	"github.com/mochi-co/mqtt/server/listeners/auth" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | func main() { | ||||||
|  | 	sigs := make(chan os.Signal, 1) | ||||||
|  | 	done := make(chan bool, 1) | ||||||
|  | 	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) | ||||||
|  | 	go func() { | ||||||
|  | 		<-sigs | ||||||
|  | 		done <- true | ||||||
|  | 	}() | ||||||
|  |  | ||||||
|  | 	fmt.Println(aurora.Magenta("Mochi MQTT Server initializing..."), aurora.Cyan("TCP")) | ||||||
|  |  | ||||||
|  | 	server := mqtt.New() | ||||||
|  | 	tcp := listeners.NewTCP("t1", ":1883") | ||||||
|  | 	err := server.AddListener(tcp, &listeners.Config{ | ||||||
|  | 		Auth: new(auth.Allow), | ||||||
|  | 	}) | ||||||
|  | 	if err != nil { | ||||||
|  | 		log.Fatal(err) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// Start the server | ||||||
|  | 	go func() { | ||||||
|  | 		err := server.Serve() | ||||||
|  | 		if err != nil { | ||||||
|  | 			log.Fatal(err) | ||||||
|  | 		} | ||||||
|  | 	}() | ||||||
|  |  | ||||||
|  | 	// Add OnMessage Event Hook | ||||||
|  | 	server.Events.OnMessage = func(cl events.Client, pk events.Packet) (pkx events.Packet, err error) { | ||||||
|  | 		pkx = pk | ||||||
|  | 		if string(pk.Payload) == "hello" { | ||||||
|  | 			pkx.Payload = []byte("hello world") | ||||||
|  | 			fmt.Printf("< OnMessage modified message from client %s: %s\n", cl.ID, string(pkx.Payload)) | ||||||
|  | 		} else { | ||||||
|  | 			fmt.Printf("< OnMessage received message from client %s: %s\n", cl.ID, string(pkx.Payload)) | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		return pkx, nil | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// Demonstration of directly publishing messages to a topic via the | ||||||
|  | 	// `server.Publish` method. Subscribe to `direct/publish` using your | ||||||
|  | 	// MQTT client to see the messages. | ||||||
|  | 	go func() { | ||||||
|  | 		for range time.Tick(time.Second * 10) { | ||||||
|  | 			server.Publish("direct/publish", []byte("scheduled message"), false) | ||||||
|  | 			fmt.Println("> issued direct message to direct/publish") | ||||||
|  | 		} | ||||||
|  | 	}() | ||||||
|  |  | ||||||
|  | 	fmt.Println(aurora.BgMagenta("  Started!  ")) | ||||||
|  |  | ||||||
|  | 	<-done | ||||||
|  | 	fmt.Println(aurora.BgRed("  Caught Signal  ")) | ||||||
|  |  | ||||||
|  | 	server.Close() | ||||||
|  | 	fmt.Println(aurora.BgGreen("  Finished  ")) | ||||||
|  | } | ||||||
							
								
								
									
										36
									
								
								server/events/events.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										36
									
								
								server/events/events.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,36 @@ | |||||||
|  | package events | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"github.com/mochi-co/mqtt/server/internal/clients" | ||||||
|  | 	"github.com/mochi-co/mqtt/server/internal/packets" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | type Events struct { | ||||||
|  | 	OnMessage // published message receieved. | ||||||
|  | } | ||||||
|  |  | ||||||
|  | type Packet packets.Packet | ||||||
|  |  | ||||||
|  | type Client struct { | ||||||
|  | 	ID       string | ||||||
|  | 	Listener string | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // FromClient returns an event client from a client. | ||||||
|  | func FromClient(cl clients.Client) Client { | ||||||
|  | 	return Client{ | ||||||
|  | 		ID:       cl.ID, | ||||||
|  | 		Listener: cl.Listener, | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // OnMessage function is called when a publish message is received. Note, | ||||||
|  | // this hook is ONLY called by connected client publishers, it is not triggered when | ||||||
|  | // using the direct s.Publish method. The function receives the sent message and the | ||||||
|  | // data of the client who published it, and allows the packet to be modified | ||||||
|  | // before it is dispatched to subscribers. If no modification is required, return | ||||||
|  | // the original packet data. If an error occurs, the original packet will | ||||||
|  | // be dispatched as if the event hook had not been triggered. | ||||||
|  | // This function will block message dispatching until it returns. To minimise this, | ||||||
|  | // have the function open a new goroutine on the embedding side. | ||||||
|  | type OnMessage func(Client, Packet) (Packet, error) | ||||||
| @@ -77,6 +77,7 @@ func (b *Writer) WriteTo(w io.Writer) (total int, err error) { | |||||||
| } | } | ||||||
|  |  | ||||||
| // Write writes the buffer to the buffer p, returning the number of bytes written. | // Write writes the buffer to the buffer p, returning the number of bytes written. | ||||||
|  | // The bytes written to the buffer are picked up by WriteTo. | ||||||
| func (b *Writer) Write(p []byte) (total int, err error) { | func (b *Writer) Write(p []byte) (total int, err error) { | ||||||
| 	err = b.awaitEmpty(len(p)) | 	err = b.awaitEmpty(len(p)) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
|   | |||||||
| @@ -305,8 +305,9 @@ func (cl *Client) ReadFixedHeader(fh *packets.FixedHeader) error { | |||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| // Read reads new packets from a client connection | // Read loops forever reading new packets from a client connection until | ||||||
| func (cl *Client) Read(h func(*Client, packets.Packet) error) error { | // an error is encountered (or the connection is closed). | ||||||
|  | func (cl *Client) Read(packetHandler func(*Client, packets.Packet) error) error { | ||||||
| 	for { | 	for { | ||||||
| 		if atomic.LoadInt64(&cl.State.Done) == 1 && cl.r.CapDelta() == 0 { | 		if atomic.LoadInt64(&cl.State.Done) == 1 && cl.r.CapDelta() == 0 { | ||||||
| 			return nil | 			return nil | ||||||
| @@ -324,7 +325,7 @@ func (cl *Client) Read(h func(*Client, packets.Packet) error) error { | |||||||
| 			return err | 			return err | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		err = h(cl, pk) // Process inbound packet. | 		err = packetHandler(cl, pk) // Process inbound packet. | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return err | 			return err | ||||||
| 		} | 		} | ||||||
| @@ -437,6 +438,7 @@ func (cl *Client) WritePacket(pk packets.Packet) (n int, err error) { | |||||||
| 		return | 		return | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	// Write the packet bytes to the client byte buffer. | ||||||
| 	n, err = cl.w.Write(buf.Bytes()) | 	n, err = cl.w.Write(buf.Bytes()) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return | 		return | ||||||
|   | |||||||
| @@ -80,8 +80,8 @@ func (l *TCP) Listen(s *system.Info) error { | |||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| // Serve starts waiting for new TCP connections, and calls the connection | // Serve starts waiting for new TCP connections, and calls the establish | ||||||
| // establishment callback for any received. | // connection callback for any received. | ||||||
| func (l *TCP) Serve(establish EstablishFunc) { | func (l *TCP) Serve(establish EstablishFunc) { | ||||||
| 	for { | 	for { | ||||||
| 		if atomic.LoadInt64(&l.end) == 1 { | 		if atomic.LoadInt64(&l.end) == 1 { | ||||||
|   | |||||||
							
								
								
									
										136
									
								
								server/server.go
									
									
									
									
									
								
							
							
						
						
									
										136
									
								
								server/server.go
									
									
									
									
									
								
							| @@ -9,6 +9,7 @@ import ( | |||||||
| 	"sync/atomic" | 	"sync/atomic" | ||||||
| 	"time" | 	"time" | ||||||
|  |  | ||||||
|  | 	"github.com/mochi-co/mqtt/server/events" | ||||||
| 	"github.com/mochi-co/mqtt/server/internal/circ" | 	"github.com/mochi-co/mqtt/server/internal/circ" | ||||||
| 	"github.com/mochi-co/mqtt/server/internal/clients" | 	"github.com/mochi-co/mqtt/server/internal/clients" | ||||||
| 	"github.com/mochi-co/mqtt/server/internal/packets" | 	"github.com/mochi-co/mqtt/server/internal/packets" | ||||||
| @@ -20,11 +21,7 @@ import ( | |||||||
| ) | ) | ||||||
|  |  | ||||||
| const ( | const ( | ||||||
| 	Version = "1.0.0" // the server version. | 	Version = "1.0.1" // the server version. | ||||||
|  |  | ||||||
| 	// maxPacketID is the maximum value of a 16-bit packet ID. If a |  | ||||||
| 	// packet ID reaches this number, it resets to 0. |  | ||||||
| 	maxPacketID = 65535 |  | ||||||
| ) | ) | ||||||
|  |  | ||||||
| var ( | var ( | ||||||
| @@ -47,14 +44,22 @@ var ( | |||||||
| // Server is an MQTT broker server. It should be created with server.New() | // Server is an MQTT broker server. It should be created with server.New() | ||||||
| // in order to ensure all the internal fields are correctly populated. | // in order to ensure all the internal fields are correctly populated. | ||||||
| type Server struct { | type Server struct { | ||||||
| 	done      chan bool            // indicate that the server is ending. |  | ||||||
| 	bytepool  circ.BytesPool       // a byte pool for incoming and outgoing packets. |  | ||||||
| 	Listeners *listeners.Listeners // listeners are network interfaces which listen for new connections. | 	Listeners *listeners.Listeners // listeners are network interfaces which listen for new connections. | ||||||
| 	Clients   *clients.Clients     // clients which are known to the broker. | 	Clients   *clients.Clients     // clients which are known to the broker. | ||||||
| 	Topics    *topics.Index        // an index of topic filter subscriptions and retained messages. | 	Topics    *topics.Index        // an index of topic filter subscriptions and retained messages. | ||||||
| 	System    *system.Info         // values about the server commonly found in $SYS topics. | 	System    *system.Info         // values about the server commonly found in $SYS topics. | ||||||
| 	Store     persistence.Store    // a persistent storage backend if desired. | 	Store     persistence.Store    // a persistent storage backend if desired. | ||||||
|  | 	done      chan bool            // indicate that the server is ending. | ||||||
|  | 	bytepool  circ.BytesPool       // a byte pool for incoming and outgoing packets. | ||||||
| 	sysTicker *time.Ticker         // the interval ticker for sending updating $SYS topics. | 	sysTicker *time.Ticker         // the interval ticker for sending updating $SYS topics. | ||||||
|  | 	inline    inlineMessages       // channels for direct publishing. | ||||||
|  | 	Events    events.Events        // overrideable event hooks. | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // inlineMessages contains channels for handling inline (direct) publishing. | ||||||
|  | type inlineMessages struct { | ||||||
|  | 	done chan bool           // indicate that the server is ending. | ||||||
|  | 	pub  chan packets.Packet // a channel of packets to publish to clients | ||||||
| } | } | ||||||
|  |  | ||||||
| // New returns a new instance of an MQTT broker. | // New returns a new instance of an MQTT broker. | ||||||
| @@ -69,6 +74,11 @@ func New() *Server { | |||||||
| 			Started: time.Now().Unix(), | 			Started: time.Now().Unix(), | ||||||
| 		}, | 		}, | ||||||
| 		sysTicker: time.NewTicker(SysTopicInterval * time.Millisecond), | 		sysTicker: time.NewTicker(SysTopicInterval * time.Millisecond), | ||||||
|  | 		inline: inlineMessages{ | ||||||
|  | 			done: make(chan bool), | ||||||
|  | 			pub:  make(chan packets.Packet, 1024), | ||||||
|  | 		}, | ||||||
|  | 		Events: events.Events{}, | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// Expose server stats using the system listener so it can be used in the | 	// Expose server stats using the system listener so it can be used in the | ||||||
| @@ -119,9 +129,10 @@ func (s *Server) Serve() error { | |||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	go s.eventLoop() | 	go s.eventLoop()                            // spin up event loop for issuing $SYS values and closing server. | ||||||
| 	s.Listeners.ServeAll(s.EstablishConnection) | 	go s.inlineClient()                         // spin up inline client for direct message publishing. | ||||||
| 	s.publishSysTopics() | 	s.Listeners.ServeAll(s.EstablishConnection) // start listening on all listeners. | ||||||
|  | 	s.publishSysTopics()                        // begin publishing $SYS system values. | ||||||
|  |  | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| @@ -132,6 +143,7 @@ func (s *Server) eventLoop() { | |||||||
| 		select { | 		select { | ||||||
| 		case <-s.done: | 		case <-s.done: | ||||||
| 			s.sysTicker.Stop() | 			s.sysTicker.Stop() | ||||||
|  | 			close(s.inline.done) | ||||||
| 			return | 			return | ||||||
| 		case <-s.sysTicker.C: | 		case <-s.sysTicker.C: | ||||||
| 			s.publishSysTopics() | 			s.publishSysTopics() | ||||||
| @@ -139,11 +151,25 @@ func (s *Server) eventLoop() { | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| // EstablishConnection establishes a new client when a listener accepts a new | // inlineClient loops forever, sending directly-published messages | ||||||
| // connection. | // from the Publish method to subscribers. | ||||||
|  | func (s *Server) inlineClient() { | ||||||
|  | 	for { | ||||||
|  | 		select { | ||||||
|  | 		case <-s.inline.done: | ||||||
|  | 			close(s.inline.pub) | ||||||
|  | 			return | ||||||
|  | 		case pk := <-s.inline.pub: | ||||||
|  | 			s.publishToSubscribers(pk) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // EstablishConnection establishes a new client when a listener | ||||||
|  | // accepts a new connection. | ||||||
| func (s *Server) EstablishConnection(lid string, c net.Conn, ac auth.Controller) error { | func (s *Server) EstablishConnection(lid string, c net.Conn, ac auth.Controller) error { | ||||||
| 	xbr := s.bytepool.Get() // Get byte buffer from pools for sending and receiving packet data. | 	xbr := s.bytepool.Get() // Get byte buffer from pools for receiving packet data. | ||||||
| 	xbw := s.bytepool.Get() | 	xbw := s.bytepool.Get() // and for sending. | ||||||
|  |  | ||||||
| 	cl := clients.NewClient(c, | 	cl := clients.NewClient(c, | ||||||
| 		circ.NewReaderFromSlice(0, xbr), | 		circ.NewReaderFromSlice(0, xbr), | ||||||
| @@ -325,11 +351,38 @@ func (s *Server) processPingreq(cl *clients.Client, pk packets.Packet) error { | |||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // Publish creates a publish packet from a payload and sends it to the inline.pub | ||||||
|  | // channel, where it is written directly to the outgoing byte buffers of any | ||||||
|  | // clients subscribed to the given topic. Because the message is written directly | ||||||
|  | // within the server, QoS is inherently 2 (exactly once). | ||||||
|  | func (s *Server) Publish(topic string, payload []byte, retain bool) error { | ||||||
|  | 	if len(topic) >= 4 && topic[0:4] == "$SYS" { | ||||||
|  | 		return ErrInvalidTopic | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	pk := packets.Packet{ | ||||||
|  | 		FixedHeader: packets.FixedHeader{ | ||||||
|  | 			Type: packets.Publish, | ||||||
|  | 		}, | ||||||
|  | 		TopicName: topic, | ||||||
|  | 		Payload:   payload, | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if retain { | ||||||
|  | 		s.retainMessage(pk) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// handoff packet to s.inline.pub channel for writing to client buffers | ||||||
|  | 	// to avoid blocking the calling function. | ||||||
|  | 	s.inline.pub <- pk | ||||||
|  |  | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
| // processPublish processes a Publish packet. | // processPublish processes a Publish packet. | ||||||
| func (s *Server) processPublish(cl *clients.Client, pk packets.Packet) error { | func (s *Server) processPublish(cl *clients.Client, pk packets.Packet) error { | ||||||
| 	if len(pk.TopicName) >= 4 && pk.TopicName[0:4] == "$SYS" { | 	if len(pk.TopicName) >= 4 && pk.TopicName[0:4] == "$SYS" { | ||||||
| 		// Clients can't publish to $SYS topics. | 		return nil // Clients can't publish to $SYS topics, so fail silently as per spec. | ||||||
| 		return nil |  | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	if !cl.AC.ACL(cl.Username, pk.TopicName, true) { | 	if !cl.AC.ACL(cl.Username, pk.TopicName, true) { | ||||||
| @@ -337,22 +390,7 @@ func (s *Server) processPublish(cl *clients.Client, pk packets.Packet) error { | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	if pk.FixedHeader.Retain { | 	if pk.FixedHeader.Retain { | ||||||
| 		out := pk.PublishCopy() | 		s.retainMessage(pk) | ||||||
| 		q := s.Topics.RetainMessage(out) |  | ||||||
| 		atomic.AddInt64(&s.System.Retained, q) |  | ||||||
| 		if s.Store != nil { |  | ||||||
| 			if q == 1 { |  | ||||||
| 				s.Store.WriteRetained(persistence.Message{ |  | ||||||
| 					ID:          "ret_" + out.TopicName, |  | ||||||
| 					T:           persistence.KRetained, |  | ||||||
| 					FixedHeader: persistence.FixedHeader(out.FixedHeader), |  | ||||||
| 					TopicName:   out.TopicName, |  | ||||||
| 					Payload:     out.Payload, |  | ||||||
| 				}) |  | ||||||
| 			} else { |  | ||||||
| 				s.Store.DeleteRetained("ret_" + out.TopicName) |  | ||||||
| 			} |  | ||||||
| 		} |  | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	if pk.FixedHeader.Qos > 0 { | 	if pk.FixedHeader.Qos > 0 { | ||||||
| @@ -367,21 +405,49 @@ func (s *Server) processPublish(cl *clients.Client, pk packets.Packet) error { | |||||||
| 			ack.FixedHeader.Type = packets.Pubrec | 			ack.FixedHeader.Type = packets.Pubrec | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		s.writeClient(cl, ack) |  | ||||||
| 		// omit errors in case of broken connection / LWT publish. ack send failures | 		// omit errors in case of broken connection / LWT publish. ack send failures | ||||||
| 		// will be handled by in-flight resending on next reconnect. | 		// will be handled by in-flight resending on next reconnect. | ||||||
|  | 		s.writeClient(cl, ack) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	// if an OnMessage hook exists, potentially modify the packet. | ||||||
|  | 	if s.Events.OnMessage != nil { | ||||||
|  | 		if pkx, err := s.Events.OnMessage(events.FromClient(*cl), events.Packet(pk)); err == nil { | ||||||
|  | 			pk = packets.Packet(pkx) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// write packet to the byte buffers of any clients with matching topic filters. | ||||||
| 	s.publishToSubscribers(pk) | 	s.publishToSubscribers(pk) | ||||||
|  |  | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // retainMessage adds a message to a topic, and if a persistent store is provided, | ||||||
|  | // adds the message to the store so it can be reloaded if necessary. | ||||||
|  | func (s *Server) retainMessage(pk packets.Packet) { | ||||||
|  | 	out := pk.PublishCopy() | ||||||
|  | 	q := s.Topics.RetainMessage(out) | ||||||
|  | 	atomic.AddInt64(&s.System.Retained, q) | ||||||
|  | 	if s.Store != nil { | ||||||
|  | 		if q == 1 { | ||||||
|  | 			s.Store.WriteRetained(persistence.Message{ | ||||||
|  | 				ID:          "ret_" + out.TopicName, | ||||||
|  | 				T:           persistence.KRetained, | ||||||
|  | 				FixedHeader: persistence.FixedHeader(out.FixedHeader), | ||||||
|  | 				TopicName:   out.TopicName, | ||||||
|  | 				Payload:     out.Payload, | ||||||
|  | 			}) | ||||||
|  | 		} else { | ||||||
|  | 			s.Store.DeleteRetained("ret_" + out.TopicName) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
| // publishToSubscribers publishes a publish packet to all subscribers with | // publishToSubscribers publishes a publish packet to all subscribers with | ||||||
| // matching topic filters. | // matching topic filters. | ||||||
| func (s *Server) publishToSubscribers(pk packets.Packet) { | func (s *Server) publishToSubscribers(pk packets.Packet) { | ||||||
| 	subs := s.Topics.Subscribers(pk.TopicName) | 	for id, qos := range s.Topics.Subscribers(pk.TopicName) { | ||||||
| 	for id, qos := range subs { |  | ||||||
| 		if client, ok := s.Clients.Get(id); ok { | 		if client, ok := s.Clients.Get(id); ok { | ||||||
| 			out := pk.PublishCopy() | 			out := pk.PublishCopy() | ||||||
| 			if qos > out.FixedHeader.Qos { // Inherit higher desired qos values. | 			if qos > out.FixedHeader.Qos { // Inherit higher desired qos values. | ||||||
|   | |||||||
| @@ -2,6 +2,7 @@ package server | |||||||
|  |  | ||||||
| import ( | import ( | ||||||
| 	//"errors" | 	//"errors" | ||||||
|  | 	"fmt" | ||||||
| 	"io/ioutil" | 	"io/ioutil" | ||||||
| 	"net" | 	"net" | ||||||
| 	"strconv" | 	"strconv" | ||||||
| @@ -11,6 +12,7 @@ import ( | |||||||
|  |  | ||||||
| 	"github.com/stretchr/testify/require" | 	"github.com/stretchr/testify/require" | ||||||
|  |  | ||||||
|  | 	"github.com/mochi-co/mqtt/server/events" | ||||||
| 	"github.com/mochi-co/mqtt/server/internal/circ" | 	"github.com/mochi-co/mqtt/server/internal/circ" | ||||||
| 	"github.com/mochi-co/mqtt/server/internal/clients" | 	"github.com/mochi-co/mqtt/server/internal/clients" | ||||||
| 	"github.com/mochi-co/mqtt/server/internal/packets" | 	"github.com/mochi-co/mqtt/server/internal/packets" | ||||||
| @@ -650,7 +652,7 @@ func TestServerProcessPublishQoS2(t *testing.T) { | |||||||
| 	require.Equal(t, int64(0), atomic.LoadInt64(&s.System.Retained)) | 	require.Equal(t, int64(0), atomic.LoadInt64(&s.System.Retained)) | ||||||
| } | } | ||||||
|  |  | ||||||
| func TestServerProcessPublishUnretain(t *testing.T) { | func TestServerProcessPublishUnretainByEmptyPayload(t *testing.T) { | ||||||
| 	s, cl1, r1, w1 := setupClient() | 	s, cl1, r1, w1 := setupClient() | ||||||
| 	s.Clients.Add(cl1) | 	s.Clients.Add(cl1) | ||||||
|  |  | ||||||
| @@ -840,6 +842,235 @@ func TestServerProcessPublishWriteAckError(t *testing.T) { | |||||||
| 	require.Error(t, err) | 	require.Error(t, err) | ||||||
| } | } | ||||||
|  |  | ||||||
|  | func TestServerPublishInline(t *testing.T) { | ||||||
|  | 	s, cl1, r1, w1 := setupClient() | ||||||
|  | 	cl1.ID = "inline" | ||||||
|  | 	s.Clients.Add(cl1) | ||||||
|  | 	s.Topics.Subscribe("a/b/+", cl1.ID, 0) | ||||||
|  | 	go s.inlineClient() | ||||||
|  |  | ||||||
|  | 	ack1 := make(chan []byte) | ||||||
|  | 	go func() { | ||||||
|  | 		buf, err := ioutil.ReadAll(r1) | ||||||
|  | 		if err != nil { | ||||||
|  | 			panic(err) | ||||||
|  | 		} | ||||||
|  | 		ack1 <- buf | ||||||
|  | 	}() | ||||||
|  |  | ||||||
|  | 	err := s.Publish("a/b/c", []byte("hello"), false) | ||||||
|  | 	require.NoError(t, err) | ||||||
|  |  | ||||||
|  | 	time.Sleep(10 * time.Millisecond) | ||||||
|  | 	w1.Close() | ||||||
|  |  | ||||||
|  | 	require.Equal(t, []byte{ | ||||||
|  | 		byte(packets.Publish << 4), 12, | ||||||
|  | 		0, 5, | ||||||
|  | 		'a', '/', 'b', '/', 'c', | ||||||
|  | 		'h', 'e', 'l', 'l', 'o', | ||||||
|  | 	}, <-ack1) | ||||||
|  |  | ||||||
|  | 	require.Equal(t, int64(14), s.System.BytesSent) | ||||||
|  |  | ||||||
|  | 	close(s.inline.done) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func TestServerPublishInlineRetain(t *testing.T) { | ||||||
|  | 	s, cl1, r1, w1 := setupClient() | ||||||
|  | 	cl1.ID = "inline" | ||||||
|  |  | ||||||
|  | 	ack1 := make(chan []byte) | ||||||
|  | 	go func() { | ||||||
|  | 		buf, err := ioutil.ReadAll(r1) | ||||||
|  | 		if err != nil { | ||||||
|  | 			panic(err) | ||||||
|  | 		} | ||||||
|  | 		ack1 <- buf | ||||||
|  | 	}() | ||||||
|  |  | ||||||
|  | 	err := s.Publish("a/b/c", []byte("hello"), true) | ||||||
|  | 	require.NoError(t, err) | ||||||
|  |  | ||||||
|  | 	time.Sleep(10 * time.Millisecond) | ||||||
|  |  | ||||||
|  | 	s.Clients.Add(cl1) | ||||||
|  | 	s.Topics.Subscribe("a/b/+", cl1.ID, 0) | ||||||
|  | 	go s.inlineClient() | ||||||
|  |  | ||||||
|  | 	time.Sleep(10 * time.Millisecond) | ||||||
|  |  | ||||||
|  | 	w1.Close() | ||||||
|  |  | ||||||
|  | 	require.Equal(t, []byte{ | ||||||
|  | 		byte(packets.Publish << 4), 12, | ||||||
|  | 		0, 5, | ||||||
|  | 		'a', '/', 'b', '/', 'c', | ||||||
|  | 		'h', 'e', 'l', 'l', 'o', | ||||||
|  | 	}, <-ack1) | ||||||
|  |  | ||||||
|  | 	require.Equal(t, int64(14), s.System.BytesSent) | ||||||
|  |  | ||||||
|  | 	close(s.inline.done) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func TestServerPublishInlineSysTopicError(t *testing.T) { | ||||||
|  | 	s, _, _, _ := setupClient() | ||||||
|  |  | ||||||
|  | 	err := s.Publish("$SYS/stuff", []byte("hello"), false) | ||||||
|  | 	require.Error(t, err) | ||||||
|  | 	require.Equal(t, int64(0), s.System.BytesSent) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func TestServerProcessPublishHookOnMessage(t *testing.T) { | ||||||
|  | 	s, cl1, r1, w1 := setupClient() | ||||||
|  | 	s.Clients.Add(cl1) | ||||||
|  | 	s.Topics.Subscribe("a/b/+", cl1.ID, 0) | ||||||
|  |  | ||||||
|  | 	var hookedPacket events.Packet | ||||||
|  | 	var hookedClient events.Client | ||||||
|  | 	s.Events.OnMessage = func(cl events.Client, pk events.Packet) (events.Packet, error) { | ||||||
|  | 		hookedClient = cl | ||||||
|  | 		hookedPacket = pk | ||||||
|  | 		return pk, nil | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	ack1 := make(chan []byte) | ||||||
|  | 	go func() { | ||||||
|  | 		buf, err := ioutil.ReadAll(r1) | ||||||
|  | 		if err != nil { | ||||||
|  | 			panic(err) | ||||||
|  | 		} | ||||||
|  | 		ack1 <- buf | ||||||
|  | 	}() | ||||||
|  |  | ||||||
|  | 	pk1 := packets.Packet{ | ||||||
|  | 		FixedHeader: packets.FixedHeader{ | ||||||
|  | 			Type: packets.Publish, | ||||||
|  | 		}, | ||||||
|  | 		TopicName: "a/b/c", | ||||||
|  | 		Payload:   []byte("hello"), | ||||||
|  | 	} | ||||||
|  | 	err := s.processPacket(cl1, pk1) | ||||||
|  |  | ||||||
|  | 	require.NoError(t, err) | ||||||
|  | 	time.Sleep(10 * time.Millisecond) | ||||||
|  |  | ||||||
|  | 	require.Equal(t, events.Client{ | ||||||
|  | 		ID:       "mochi", | ||||||
|  | 		Listener: "", | ||||||
|  | 	}, hookedClient) | ||||||
|  |  | ||||||
|  | 	require.Equal(t, events.Packet(pk1), hookedPacket) | ||||||
|  |  | ||||||
|  | 	w1.Close() | ||||||
|  |  | ||||||
|  | 	require.Equal(t, []byte{ | ||||||
|  | 		byte(packets.Publish << 4), 12, | ||||||
|  | 		0, 5, | ||||||
|  | 		'a', '/', 'b', '/', 'c', | ||||||
|  | 		'h', 'e', 'l', 'l', 'o', | ||||||
|  | 	}, <-ack1) | ||||||
|  |  | ||||||
|  | 	require.Equal(t, int64(14), s.System.BytesSent) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func TestServerProcessPublishHookOnMessageModify(t *testing.T) { | ||||||
|  | 	s, cl1, r1, w1 := setupClient() | ||||||
|  | 	s.Clients.Add(cl1) | ||||||
|  | 	s.Topics.Subscribe("a/b/+", cl1.ID, 0) | ||||||
|  |  | ||||||
|  | 	var hookedPacket events.Packet | ||||||
|  | 	var hookedClient events.Client | ||||||
|  | 	s.Events.OnMessage = func(cl events.Client, pk events.Packet) (events.Packet, error) { | ||||||
|  | 		hookedPacket = pk | ||||||
|  | 		hookedPacket.Payload = []byte("world") | ||||||
|  | 		hookedClient = cl | ||||||
|  | 		return hookedPacket, nil | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	ack1 := make(chan []byte) | ||||||
|  | 	go func() { | ||||||
|  | 		buf, err := ioutil.ReadAll(r1) | ||||||
|  | 		if err != nil { | ||||||
|  | 			panic(err) | ||||||
|  | 		} | ||||||
|  | 		ack1 <- buf | ||||||
|  | 	}() | ||||||
|  |  | ||||||
|  | 	pk1 := packets.Packet{ | ||||||
|  | 		FixedHeader: packets.FixedHeader{ | ||||||
|  | 			Type: packets.Publish, | ||||||
|  | 		}, | ||||||
|  | 		TopicName: "a/b/c", | ||||||
|  | 		Payload:   []byte("hello"), | ||||||
|  | 	} | ||||||
|  | 	err := s.processPacket(cl1, pk1) | ||||||
|  |  | ||||||
|  | 	require.NoError(t, err) | ||||||
|  | 	time.Sleep(10 * time.Millisecond) | ||||||
|  |  | ||||||
|  | 	require.Equal(t, events.Client{ | ||||||
|  | 		ID:       "mochi", | ||||||
|  | 		Listener: "", | ||||||
|  | 	}, hookedClient) | ||||||
|  |  | ||||||
|  | 	w1.Close() | ||||||
|  |  | ||||||
|  | 	require.Equal(t, []byte{ | ||||||
|  | 		byte(packets.Publish << 4), 12, | ||||||
|  | 		0, 5, | ||||||
|  | 		'a', '/', 'b', '/', 'c', | ||||||
|  | 		'w', 'o', 'r', 'l', 'd', | ||||||
|  | 	}, <-ack1) | ||||||
|  |  | ||||||
|  | 	require.Equal(t, int64(14), s.System.BytesSent) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func TestServerProcessPublishHookOnMessageModifyError(t *testing.T) { | ||||||
|  | 	s, cl1, r1, w1 := setupClient() | ||||||
|  | 	s.Clients.Add(cl1) | ||||||
|  | 	s.Topics.Subscribe("a/b/+", cl1.ID, 0) | ||||||
|  |  | ||||||
|  | 	s.Events.OnMessage = func(cl events.Client, pk events.Packet) (events.Packet, error) { | ||||||
|  | 		pkx := pk | ||||||
|  | 		pkx.Payload = []byte("world") | ||||||
|  | 		return pkx, fmt.Errorf("error") | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	ack1 := make(chan []byte) | ||||||
|  | 	go func() { | ||||||
|  | 		buf, err := ioutil.ReadAll(r1) | ||||||
|  | 		if err != nil { | ||||||
|  | 			panic(err) | ||||||
|  | 		} | ||||||
|  | 		ack1 <- buf | ||||||
|  | 	}() | ||||||
|  |  | ||||||
|  | 	pk1 := packets.Packet{ | ||||||
|  | 		FixedHeader: packets.FixedHeader{ | ||||||
|  | 			Type: packets.Publish, | ||||||
|  | 		}, | ||||||
|  | 		TopicName: "a/b/c", | ||||||
|  | 		Payload:   []byte("hello"), | ||||||
|  | 	} | ||||||
|  | 	err := s.processPacket(cl1, pk1) | ||||||
|  |  | ||||||
|  | 	require.NoError(t, err) | ||||||
|  | 	time.Sleep(10 * time.Millisecond) | ||||||
|  |  | ||||||
|  | 	w1.Close() | ||||||
|  |  | ||||||
|  | 	require.Equal(t, []byte{ | ||||||
|  | 		byte(packets.Publish << 4), 12, | ||||||
|  | 		0, 5, | ||||||
|  | 		'a', '/', 'b', '/', 'c', | ||||||
|  | 		'h', 'e', 'l', 'l', 'o', | ||||||
|  | 	}, <-ack1) | ||||||
|  |  | ||||||
|  | 	require.Equal(t, int64(14), s.System.BytesSent) | ||||||
|  | } | ||||||
|  |  | ||||||
| func TestServerProcessPuback(t *testing.T) { | func TestServerProcessPuback(t *testing.T) { | ||||||
| 	s, cl, _, _ := setupClient() | 	s, cl, _, _ := setupClient() | ||||||
| 	cl.Inflight.Set(11, clients.InflightMessage{Packet: packets.Packet{PacketID: 11}, Sent: 0}) | 	cl.Inflight.Set(11, clients.InflightMessage{Packet: packets.Packet{PacketID: 11}, Sent: 0}) | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user