mirror of
https://github.com/mochi-mqtt/server.git
synced 2025-09-27 04:26:23 +08:00
Compare commits
18 Commits
Author | SHA1 | Date | |
---|---|---|---|
![]() |
a2c0749640 | ||
![]() |
37293aeecf | ||
![]() |
7a2d4db6a4 | ||
![]() |
03d2a8bc82 | ||
![]() |
4b51e5c7d1 | ||
![]() |
d15ad682bf | ||
![]() |
130ffcbb53 | ||
![]() |
33cf2f991b | ||
![]() |
a360ea6a6c | ||
![]() |
ae3aa0d3fa | ||
![]() |
811ae0e1be | ||
![]() |
51d6825430 | ||
![]() |
514288c53e | ||
![]() |
957fc0a049 | ||
![]() |
03f94f948a | ||
![]() |
1bc752a2b8 | ||
![]() |
b9db59ba12 | ||
![]() |
c0ef58c363 |
3
.gitignore
vendored
3
.gitignore
vendored
@@ -1,2 +1,3 @@
|
||||
cmd/mqtt
|
||||
.DS_Store
|
||||
.DS_Store
|
||||
server/persistence/bolt/testbolt.db
|
||||
|
30
README.md
30
README.md
@@ -25,9 +25,10 @@ MQTT stands for MQ Telemetry Transport. It is a publish/subscribe, extremely sim
|
||||
- Interfaces for Client Authentication and Topic access control.
|
||||
- Bolt-backed persistence and storage interfaces.
|
||||
- Directly Publishing from embedding service (`s.Publish(topic, message, retain)`).
|
||||
- Basic Event Hooks (currently `onMessage`)
|
||||
- Basic Event Hooks (currently `OnMessage`, `OnConnect`, `OnDisconnect`)
|
||||
|
||||
#### Roadmap
|
||||
- Please open an issue to request new features or event hooks.
|
||||
- MQTT v5 compatibility?
|
||||
|
||||
#### Using the Broker
|
||||
@@ -105,8 +106,30 @@ err := server.AddListener(tcp, &listeners.Config{
|
||||
#### Event Hooks
|
||||
Some basic Event Hooks have been added, allowing you to call your own functions when certain events occur. The execution of the functions are blocking - if necessary, please handle goroutines within the embedding service.
|
||||
|
||||
Working examples can be found in the `examples/events` folder. Please open an issue if there is a particular event hook you are interested in!
|
||||
|
||||
##### OnConnect
|
||||
`server.Events.OnConnect` is called when a client successfully connects to the broker. The method receives the connect packet and the id and connection type for the client who connected.
|
||||
|
||||
```go
|
||||
import "github.com/mochi-co/mqtt/server/events"
|
||||
|
||||
server.Events.OnMessage = func(cl events.Client, pk events.Packet) (pkx events.Packet, err error) {
|
||||
fmt.Printf("<< OnConnect client connected %s: %+v\n", cl.ID, pk)
|
||||
}
|
||||
```
|
||||
|
||||
##### OnDisconnect
|
||||
`server.Events.OnDisconnect` is called when a client disconnects to the broker. If the client disconnected abnormally, the reason is indicated in the `err` error parameter.
|
||||
|
||||
```go
|
||||
server.Events.OnDisconnect = func(cl events.Client, err error) {
|
||||
fmt.Printf("<< OnDisconnect client dicconnected %s: %v\n", cl.ID, err)
|
||||
}
|
||||
```
|
||||
|
||||
##### OnMessage
|
||||
`server.Events.OnMessage` is called when a Publish packet is received. The function receives the published message and information about the client who published it. This function will block message dispatching until it returns.
|
||||
`server.Events.OnMessage` is called when a Publish packet is received. The method receives the published message and information about the client who published it.
|
||||
|
||||
> This hook is only triggered when a message is received by clients. It is not triggered when using the direct `server.Publish` method.
|
||||
|
||||
@@ -124,7 +147,8 @@ server.Events.OnMessage = func(cl events.Client, pk events.Packet) (pkx events.P
|
||||
}
|
||||
```
|
||||
|
||||
A working example can be found in the `examples/events` folder. Please open an issue if there is a particular event hook you are interested in!
|
||||
The OnMessage hook can also be used to selectively only deliver messages to one or more clients based on their id, using the `AllowClients []string` field on the packet structure.
|
||||
|
||||
|
||||
#### Direct Publishing
|
||||
When the broker is being embedded in a larger codebase, it can be useful to be able to publish messages directly to clients without having to implement a loopback TCP connection with an MQTT client. The `Publish` method allows you to inject publish messages directly into a queue to be delivered to any clients with matching topic filters. The `Retain` flag is supported.
|
||||
|
@@ -44,6 +44,16 @@ func main() {
|
||||
}
|
||||
}()
|
||||
|
||||
// Add OnConnect Event Hook
|
||||
server.Events.OnConnect = func(cl events.Client, pk events.Packet) {
|
||||
fmt.Printf("<< OnConnect client connected %s: %+v\n", cl.ID, pk)
|
||||
}
|
||||
|
||||
// Add OnDisconnect Event Hook
|
||||
server.Events.OnDisconnect = func(cl events.Client, err error) {
|
||||
fmt.Printf("<< OnDisconnect client dicconnected %s: %v\n", cl.ID, err)
|
||||
}
|
||||
|
||||
// Add OnMessage Event Hook
|
||||
server.Events.OnMessage = func(cl events.Client, pk events.Packet) (pkx events.Packet, err error) {
|
||||
pkx = pk
|
||||
|
@@ -6,7 +6,9 @@ import (
|
||||
)
|
||||
|
||||
type Events struct {
|
||||
OnMessage // published message receieved.
|
||||
OnMessage // published message receieved.
|
||||
OnConnect // client connected.
|
||||
OnDisconnect // client disconnected.
|
||||
}
|
||||
|
||||
type Packet packets.Packet
|
||||
@@ -17,7 +19,7 @@ type Client struct {
|
||||
}
|
||||
|
||||
// FromClient returns an event client from a client.
|
||||
func FromClient(cl clients.Client) Client {
|
||||
func FromClient(cl *clients.Client) Client {
|
||||
return Client{
|
||||
ID: cl.ID,
|
||||
Listener: cl.Listener,
|
||||
@@ -34,3 +36,11 @@ func FromClient(cl clients.Client) Client {
|
||||
// This function will block message dispatching until it returns. To minimise this,
|
||||
// have the function open a new goroutine on the embedding side.
|
||||
type OnMessage func(Client, Packet) (Packet, error)
|
||||
|
||||
// OnConnect is called when a client successfully connects to the broker.
|
||||
type OnConnect func(Client, Packet)
|
||||
|
||||
// OnDisconnect is called when a client disconnects to the broker. An error value
|
||||
// is passed to the function if the client disconnected abnormally, otherwise it
|
||||
// will be nil on a normal disconnect.
|
||||
type OnDisconnect func(Client, error)
|
||||
|
@@ -44,11 +44,8 @@ func (l *MockListener) Serve(establisher EstablishFunc) {
|
||||
l.Lock()
|
||||
l.Serving = true
|
||||
l.Unlock()
|
||||
for {
|
||||
select {
|
||||
case <-l.done:
|
||||
return
|
||||
}
|
||||
for range l.done {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -63,10 +63,12 @@ func (l *TCP) Listen(s *system.Info) error {
|
||||
var err error
|
||||
|
||||
if l.config.TLS != nil && len(l.config.TLS.Certificate) > 0 && len(l.config.TLS.PrivateKey) > 0 {
|
||||
cert, err := tls.X509KeyPair(l.config.TLS.Certificate, l.config.TLS.PrivateKey)
|
||||
var cert tls.Certificate
|
||||
cert, err = tls.X509KeyPair(l.config.TLS.Certificate, l.config.TLS.PrivateKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
l.listen, err = tls.Listen(l.protocol, l.address, &tls.Config{
|
||||
Certificates: []tls.Certificate{cert},
|
||||
})
|
||||
|
@@ -17,12 +17,13 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
ErrInvalidMessage = errors.New("Message type not binary")
|
||||
ErrInvalidMessage = errors.New("message type not binary")
|
||||
|
||||
// wsUpgrader is used to upgrade the incoming http/tcp connection to a
|
||||
// websocket compliant connection.
|
||||
wsUpgrader = &websocket.Upgrader{
|
||||
Subprotocols: []string{"mqtt"},
|
||||
CheckOrigin: func(r *http.Request) bool { return true },
|
||||
}
|
||||
)
|
||||
|
||||
|
@@ -22,7 +22,7 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
Version = "1.0.3" // the server version.
|
||||
Version = "1.0.5" // the server version.
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -259,6 +259,10 @@ func (s *Server) EstablishConnection(lid string, c net.Conn, ac auth.Controller)
|
||||
})
|
||||
}
|
||||
|
||||
if s.Events.OnConnect != nil {
|
||||
s.Events.OnConnect(events.FromClient(cl), events.Packet(pk))
|
||||
}
|
||||
|
||||
err = cl.Read(s.processPacket)
|
||||
if err != nil {
|
||||
s.closeClient(cl, true)
|
||||
@@ -270,6 +274,10 @@ func (s *Server) EstablishConnection(lid string, c net.Conn, ac auth.Controller)
|
||||
atomic.AddInt64(&s.System.ClientsConnected, -1)
|
||||
atomic.AddInt64(&s.System.ClientsDisconnected, 1)
|
||||
|
||||
if s.Events.OnDisconnect != nil {
|
||||
s.Events.OnDisconnect(events.FromClient(cl), err)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -413,7 +421,7 @@ func (s *Server) processPublish(cl *clients.Client, pk packets.Packet) error {
|
||||
|
||||
// if an OnMessage hook exists, potentially modify the packet.
|
||||
if s.Events.OnMessage != nil {
|
||||
if pkx, err := s.Events.OnMessage(events.FromClient(*cl), events.Packet(pk)); err == nil {
|
||||
if pkx, err := s.Events.OnMessage(events.FromClient(cl), events.Packet(pk)); err == nil {
|
||||
pk = packets.Packet(pkx)
|
||||
}
|
||||
}
|
||||
|
@@ -229,6 +229,203 @@ func TestServerEstablishConnectionOKCleanSession(t *testing.T) {
|
||||
w.Close()
|
||||
}
|
||||
|
||||
func TestServerEventOnConnect(t *testing.T) {
|
||||
r, w := net.Pipe()
|
||||
s, cl, _, _ := setupClient()
|
||||
s.Clients.Add(cl)
|
||||
|
||||
var hookedClient events.Client
|
||||
var hookedPacket events.Packet
|
||||
s.Events.OnConnect = func(cl events.Client, pk events.Packet) {
|
||||
hookedClient = cl
|
||||
hookedPacket = pk
|
||||
}
|
||||
|
||||
o := make(chan error)
|
||||
go func() {
|
||||
o <- s.EstablishConnection("tcp", r, new(auth.Allow))
|
||||
}()
|
||||
|
||||
go func() {
|
||||
w.Write([]byte{
|
||||
byte(packets.Connect << 4), 17, // Fixed header
|
||||
0, 4, // Protocol Name - MSB+LSB
|
||||
'M', 'Q', 'T', 'T', // Protocol Name
|
||||
4, // Protocol Version
|
||||
2, // Packet Flags - clean session
|
||||
0, 45, // Keepalive
|
||||
0, 5, // Client ID - MSB+LSB
|
||||
'm', 'o', 'c', 'h', 'i', // Client ID
|
||||
})
|
||||
w.Write([]byte{byte(packets.Disconnect << 4), 0})
|
||||
}()
|
||||
|
||||
// Receive the Connack
|
||||
recv := make(chan []byte)
|
||||
go func() {
|
||||
buf, err := ioutil.ReadAll(w)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
recv <- buf
|
||||
}()
|
||||
|
||||
clw, ok := s.Clients.Get("mochi")
|
||||
require.Equal(t, true, ok)
|
||||
clw.Stop()
|
||||
|
||||
errx := <-o
|
||||
require.NoError(t, errx)
|
||||
require.Equal(t, []byte{
|
||||
byte(packets.Connack << 4), 2,
|
||||
0, packets.Accepted,
|
||||
}, <-recv)
|
||||
require.Empty(t, clw.Subscriptions)
|
||||
|
||||
w.Close()
|
||||
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
||||
require.Equal(t, events.Client{
|
||||
ID: "mochi",
|
||||
Listener: "tcp",
|
||||
}, hookedClient)
|
||||
|
||||
require.Equal(t, events.Packet(packets.Packet{
|
||||
FixedHeader: packets.FixedHeader{
|
||||
Type: packets.Connect,
|
||||
Remaining: 17,
|
||||
},
|
||||
ProtocolName: []byte{'M', 'Q', 'T', 'T'},
|
||||
ProtocolVersion: 4,
|
||||
CleanSession: true,
|
||||
Keepalive: 45,
|
||||
ClientIdentifier: "mochi",
|
||||
}), hookedPacket)
|
||||
|
||||
}
|
||||
|
||||
func TestServerEventOnDisconnect(t *testing.T) {
|
||||
r, w := net.Pipe()
|
||||
s, cl, _, _ := setupClient()
|
||||
s.Clients.Add(cl)
|
||||
|
||||
var hookedClient events.Client
|
||||
var hookedErr error
|
||||
s.Events.OnDisconnect = func(cl events.Client, err error) {
|
||||
hookedClient = cl
|
||||
hookedErr = err
|
||||
}
|
||||
|
||||
o := make(chan error)
|
||||
go func() {
|
||||
o <- s.EstablishConnection("tcp", r, new(auth.Allow))
|
||||
}()
|
||||
|
||||
go func() {
|
||||
w.Write([]byte{
|
||||
byte(packets.Connect << 4), 17, // Fixed header
|
||||
0, 4, // Protocol Name - MSB+LSB
|
||||
'M', 'Q', 'T', 'T', // Protocol Name
|
||||
4, // Protocol Version
|
||||
2, // Packet Flags - clean session
|
||||
0, 45, // Keepalive
|
||||
0, 5, // Client ID - MSB+LSB
|
||||
'm', 'o', 'c', 'h', 'i', // Client ID
|
||||
})
|
||||
w.Write([]byte{byte(packets.Disconnect << 4), 0})
|
||||
}()
|
||||
|
||||
// Receive the Connack
|
||||
recv := make(chan []byte)
|
||||
go func() {
|
||||
buf, err := ioutil.ReadAll(w)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
recv <- buf
|
||||
}()
|
||||
|
||||
clw, ok := s.Clients.Get("mochi")
|
||||
require.Equal(t, true, ok)
|
||||
clw.Stop()
|
||||
|
||||
errx := <-o
|
||||
require.NoError(t, errx)
|
||||
|
||||
require.Equal(t, []byte{
|
||||
byte(packets.Connack << 4), 2,
|
||||
0, packets.Accepted,
|
||||
}, <-recv)
|
||||
require.Empty(t, clw.Subscriptions)
|
||||
|
||||
w.Close()
|
||||
|
||||
require.Equal(t, events.Client{
|
||||
ID: "mochi",
|
||||
Listener: "tcp",
|
||||
}, hookedClient)
|
||||
|
||||
require.Equal(t, nil, hookedErr)
|
||||
}
|
||||
|
||||
func TestServerEventOnDisconnectOnError(t *testing.T) {
|
||||
r, w := net.Pipe()
|
||||
s, cl, _, _ := setupClient()
|
||||
s.Clients.Add(cl)
|
||||
|
||||
var hookedClient events.Client
|
||||
var hookedErr error
|
||||
s.Events.OnDisconnect = func(cl events.Client, err error) {
|
||||
hookedClient = cl
|
||||
hookedErr = err
|
||||
}
|
||||
|
||||
o := make(chan error)
|
||||
go func() {
|
||||
o <- s.EstablishConnection("tcp", r, new(auth.Allow))
|
||||
}()
|
||||
|
||||
go func() {
|
||||
w.Write([]byte{
|
||||
byte(packets.Connect << 4), 17, // Fixed header
|
||||
0, 4, // Protocol Name - MSB+LSB
|
||||
'M', 'Q', 'T', 'T', // Protocol Name
|
||||
4, // Protocol Version
|
||||
2, // Packet Flags - clean session
|
||||
0, 45, // Keepalive
|
||||
0, 5, // Client ID - MSB+LSB
|
||||
'm', 'o', 'c', 'h', 'i', // Client ID
|
||||
})
|
||||
|
||||
w.Write([]byte{0, 0})
|
||||
}()
|
||||
|
||||
// Receive the Connack
|
||||
go func() {
|
||||
_, err := ioutil.ReadAll(w)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}()
|
||||
|
||||
clw, ok := s.Clients.Get("mochi")
|
||||
require.Equal(t, true, ok)
|
||||
clw.Stop()
|
||||
|
||||
errx := <-o
|
||||
require.Error(t, errx)
|
||||
require.Equal(t, "No valid packet available; 0", errx.Error())
|
||||
fmt.Println(hookedErr)
|
||||
require.Equal(t, errx, hookedErr)
|
||||
|
||||
require.Equal(t, events.Client{
|
||||
ID: "mochi",
|
||||
Listener: "tcp",
|
||||
}, hookedClient)
|
||||
|
||||
}
|
||||
|
||||
func TestServerEstablishConnectionOKInheritSession(t *testing.T) {
|
||||
s := New()
|
||||
|
||||
@@ -442,6 +639,10 @@ func TestServerEstablishConnectionReadPacketErr(t *testing.T) {
|
||||
require.Error(t, errx)
|
||||
}
|
||||
|
||||
func TestServerOnDisconnectErr(t *testing.T) {
|
||||
|
||||
}
|
||||
|
||||
func TestServerWriteClient(t *testing.T) {
|
||||
s, cl, r, w := setupClient()
|
||||
cl.ID = "mochi"
|
||||
@@ -931,7 +1132,7 @@ func TestServerPublishInlineSysTopicError(t *testing.T) {
|
||||
require.Equal(t, int64(0), s.System.BytesSent)
|
||||
}
|
||||
|
||||
func TestServerProcessPublishHookOnMessage(t *testing.T) {
|
||||
func TestServerEventOnMessage(t *testing.T) {
|
||||
s, cl1, r1, w1 := setupClient()
|
||||
s.Clients.Add(cl1)
|
||||
s.Topics.Subscribe("a/b/+", cl1.ID, 0)
|
||||
|
Reference in New Issue
Block a user