Compare commits

...

18 Commits

Author SHA1 Message Date
mochi
a2c0749640 Update server version to 1.0.5 2022-01-24 18:46:34 +00:00
JB
37293aeecf Merge pull request #18 from mochi-co/feature/connect-disconnect-hooks
OnConnect and OnDisconnect Event Hooks
2022-01-24 18:44:39 +00:00
mochi
7a2d4db6a4 Update for OnConnect and OnDisconnect hooks 2022-01-24 18:42:09 +00:00
mochi
03d2a8bc82 Add tests for OnConnect, OnDisconnect 2022-01-24 18:29:18 +00:00
mochi
4b51e5c7d1 Add OnConnect and OnDisconnect hooks to example 2022-01-24 17:42:33 +00:00
mochi
d15ad682bf Call OnDisconnect Event if applicable 2022-01-24 17:42:19 +00:00
mochi
130ffcbb53 Add OnDisconnect Event Hook 2022-01-24 17:42:04 +00:00
mochi
33cf2f991b Add testbolt file to ignore list 2022-01-24 17:41:46 +00:00
mochi
a360ea6a6c Call OnConnect Event if applicable 2022-01-24 17:37:11 +00:00
mochi
ae3aa0d3fa Add OnConnect event hook 2022-01-24 17:36:50 +00:00
mochi
811ae0e1be Prevent locks being copied by passing non-pointer to FromClient 2022-01-24 17:36:14 +00:00
JB
51d6825430 Merge pull request #15 from ClarkQAQ/master
Fixed some bugs, wish the project better and better
2022-01-17 10:08:20 +00:00
clark
514288c53e update tcp.go maybe this will be better 2022-01-16 20:06:49 +08:00
clark
957fc0a049 fix local variable black hole 2022-01-16 18:23:45 +08:00
clark
03f94f948a update mock.go plase use range 2022-01-16 18:22:37 +08:00
clark
1bc752a2b8 fix [ST1005] strings should not be capitalized 2022-01-16 18:21:33 +08:00
clark
b9db59ba12 update websocket.go fix check origin 2022-01-16 18:20:06 +08:00
JB
c0ef58c363 Update README.md 2022-01-14 17:48:21 +00:00
9 changed files with 270 additions and 16 deletions

3
.gitignore vendored
View File

@@ -1,2 +1,3 @@
cmd/mqtt
.DS_Store
.DS_Store
server/persistence/bolt/testbolt.db

View File

@@ -25,9 +25,10 @@ MQTT stands for MQ Telemetry Transport. It is a publish/subscribe, extremely sim
- Interfaces for Client Authentication and Topic access control.
- Bolt-backed persistence and storage interfaces.
- Directly Publishing from embedding service (`s.Publish(topic, message, retain)`).
- Basic Event Hooks (currently `onMessage`)
- Basic Event Hooks (currently `OnMessage`, `OnConnect`, `OnDisconnect`)
#### Roadmap
- Please open an issue to request new features or event hooks.
- MQTT v5 compatibility?
#### Using the Broker
@@ -105,8 +106,30 @@ err := server.AddListener(tcp, &listeners.Config{
#### Event Hooks
Some basic Event Hooks have been added, allowing you to call your own functions when certain events occur. The execution of the functions are blocking - if necessary, please handle goroutines within the embedding service.
Working examples can be found in the `examples/events` folder. Please open an issue if there is a particular event hook you are interested in!
##### OnConnect
`server.Events.OnConnect` is called when a client successfully connects to the broker. The method receives the connect packet and the id and connection type for the client who connected.
```go
import "github.com/mochi-co/mqtt/server/events"
server.Events.OnMessage = func(cl events.Client, pk events.Packet) (pkx events.Packet, err error) {
fmt.Printf("<< OnConnect client connected %s: %+v\n", cl.ID, pk)
}
```
##### OnDisconnect
`server.Events.OnDisconnect` is called when a client disconnects to the broker. If the client disconnected abnormally, the reason is indicated in the `err` error parameter.
```go
server.Events.OnDisconnect = func(cl events.Client, err error) {
fmt.Printf("<< OnDisconnect client dicconnected %s: %v\n", cl.ID, err)
}
```
##### OnMessage
`server.Events.OnMessage` is called when a Publish packet is received. The function receives the published message and information about the client who published it. This function will block message dispatching until it returns.
`server.Events.OnMessage` is called when a Publish packet is received. The method receives the published message and information about the client who published it.
> This hook is only triggered when a message is received by clients. It is not triggered when using the direct `server.Publish` method.
@@ -124,7 +147,8 @@ server.Events.OnMessage = func(cl events.Client, pk events.Packet) (pkx events.P
}
```
A working example can be found in the `examples/events` folder. Please open an issue if there is a particular event hook you are interested in!
The OnMessage hook can also be used to selectively only deliver messages to one or more clients based on their id, using the `AllowClients []string` field on the packet structure.
#### Direct Publishing
When the broker is being embedded in a larger codebase, it can be useful to be able to publish messages directly to clients without having to implement a loopback TCP connection with an MQTT client. The `Publish` method allows you to inject publish messages directly into a queue to be delivered to any clients with matching topic filters. The `Retain` flag is supported.

View File

@@ -44,6 +44,16 @@ func main() {
}
}()
// Add OnConnect Event Hook
server.Events.OnConnect = func(cl events.Client, pk events.Packet) {
fmt.Printf("<< OnConnect client connected %s: %+v\n", cl.ID, pk)
}
// Add OnDisconnect Event Hook
server.Events.OnDisconnect = func(cl events.Client, err error) {
fmt.Printf("<< OnDisconnect client dicconnected %s: %v\n", cl.ID, err)
}
// Add OnMessage Event Hook
server.Events.OnMessage = func(cl events.Client, pk events.Packet) (pkx events.Packet, err error) {
pkx = pk

View File

@@ -6,7 +6,9 @@ import (
)
type Events struct {
OnMessage // published message receieved.
OnMessage // published message receieved.
OnConnect // client connected.
OnDisconnect // client disconnected.
}
type Packet packets.Packet
@@ -17,7 +19,7 @@ type Client struct {
}
// FromClient returns an event client from a client.
func FromClient(cl clients.Client) Client {
func FromClient(cl *clients.Client) Client {
return Client{
ID: cl.ID,
Listener: cl.Listener,
@@ -34,3 +36,11 @@ func FromClient(cl clients.Client) Client {
// This function will block message dispatching until it returns. To minimise this,
// have the function open a new goroutine on the embedding side.
type OnMessage func(Client, Packet) (Packet, error)
// OnConnect is called when a client successfully connects to the broker.
type OnConnect func(Client, Packet)
// OnDisconnect is called when a client disconnects to the broker. An error value
// is passed to the function if the client disconnected abnormally, otherwise it
// will be nil on a normal disconnect.
type OnDisconnect func(Client, error)

View File

@@ -44,11 +44,8 @@ func (l *MockListener) Serve(establisher EstablishFunc) {
l.Lock()
l.Serving = true
l.Unlock()
for {
select {
case <-l.done:
return
}
for range l.done {
return
}
}

View File

@@ -63,10 +63,12 @@ func (l *TCP) Listen(s *system.Info) error {
var err error
if l.config.TLS != nil && len(l.config.TLS.Certificate) > 0 && len(l.config.TLS.PrivateKey) > 0 {
cert, err := tls.X509KeyPair(l.config.TLS.Certificate, l.config.TLS.PrivateKey)
var cert tls.Certificate
cert, err = tls.X509KeyPair(l.config.TLS.Certificate, l.config.TLS.PrivateKey)
if err != nil {
return err
}
l.listen, err = tls.Listen(l.protocol, l.address, &tls.Config{
Certificates: []tls.Certificate{cert},
})

View File

@@ -17,12 +17,13 @@ import (
)
var (
ErrInvalidMessage = errors.New("Message type not binary")
ErrInvalidMessage = errors.New("message type not binary")
// wsUpgrader is used to upgrade the incoming http/tcp connection to a
// websocket compliant connection.
wsUpgrader = &websocket.Upgrader{
Subprotocols: []string{"mqtt"},
CheckOrigin: func(r *http.Request) bool { return true },
}
)

View File

@@ -22,7 +22,7 @@ import (
)
const (
Version = "1.0.3" // the server version.
Version = "1.0.5" // the server version.
)
var (
@@ -259,6 +259,10 @@ func (s *Server) EstablishConnection(lid string, c net.Conn, ac auth.Controller)
})
}
if s.Events.OnConnect != nil {
s.Events.OnConnect(events.FromClient(cl), events.Packet(pk))
}
err = cl.Read(s.processPacket)
if err != nil {
s.closeClient(cl, true)
@@ -270,6 +274,10 @@ func (s *Server) EstablishConnection(lid string, c net.Conn, ac auth.Controller)
atomic.AddInt64(&s.System.ClientsConnected, -1)
atomic.AddInt64(&s.System.ClientsDisconnected, 1)
if s.Events.OnDisconnect != nil {
s.Events.OnDisconnect(events.FromClient(cl), err)
}
return err
}
@@ -413,7 +421,7 @@ func (s *Server) processPublish(cl *clients.Client, pk packets.Packet) error {
// if an OnMessage hook exists, potentially modify the packet.
if s.Events.OnMessage != nil {
if pkx, err := s.Events.OnMessage(events.FromClient(*cl), events.Packet(pk)); err == nil {
if pkx, err := s.Events.OnMessage(events.FromClient(cl), events.Packet(pk)); err == nil {
pk = packets.Packet(pkx)
}
}

View File

@@ -229,6 +229,203 @@ func TestServerEstablishConnectionOKCleanSession(t *testing.T) {
w.Close()
}
func TestServerEventOnConnect(t *testing.T) {
r, w := net.Pipe()
s, cl, _, _ := setupClient()
s.Clients.Add(cl)
var hookedClient events.Client
var hookedPacket events.Packet
s.Events.OnConnect = func(cl events.Client, pk events.Packet) {
hookedClient = cl
hookedPacket = pk
}
o := make(chan error)
go func() {
o <- s.EstablishConnection("tcp", r, new(auth.Allow))
}()
go func() {
w.Write([]byte{
byte(packets.Connect << 4), 17, // Fixed header
0, 4, // Protocol Name - MSB+LSB
'M', 'Q', 'T', 'T', // Protocol Name
4, // Protocol Version
2, // Packet Flags - clean session
0, 45, // Keepalive
0, 5, // Client ID - MSB+LSB
'm', 'o', 'c', 'h', 'i', // Client ID
})
w.Write([]byte{byte(packets.Disconnect << 4), 0})
}()
// Receive the Connack
recv := make(chan []byte)
go func() {
buf, err := ioutil.ReadAll(w)
if err != nil {
panic(err)
}
recv <- buf
}()
clw, ok := s.Clients.Get("mochi")
require.Equal(t, true, ok)
clw.Stop()
errx := <-o
require.NoError(t, errx)
require.Equal(t, []byte{
byte(packets.Connack << 4), 2,
0, packets.Accepted,
}, <-recv)
require.Empty(t, clw.Subscriptions)
w.Close()
time.Sleep(10 * time.Millisecond)
require.Equal(t, events.Client{
ID: "mochi",
Listener: "tcp",
}, hookedClient)
require.Equal(t, events.Packet(packets.Packet{
FixedHeader: packets.FixedHeader{
Type: packets.Connect,
Remaining: 17,
},
ProtocolName: []byte{'M', 'Q', 'T', 'T'},
ProtocolVersion: 4,
CleanSession: true,
Keepalive: 45,
ClientIdentifier: "mochi",
}), hookedPacket)
}
func TestServerEventOnDisconnect(t *testing.T) {
r, w := net.Pipe()
s, cl, _, _ := setupClient()
s.Clients.Add(cl)
var hookedClient events.Client
var hookedErr error
s.Events.OnDisconnect = func(cl events.Client, err error) {
hookedClient = cl
hookedErr = err
}
o := make(chan error)
go func() {
o <- s.EstablishConnection("tcp", r, new(auth.Allow))
}()
go func() {
w.Write([]byte{
byte(packets.Connect << 4), 17, // Fixed header
0, 4, // Protocol Name - MSB+LSB
'M', 'Q', 'T', 'T', // Protocol Name
4, // Protocol Version
2, // Packet Flags - clean session
0, 45, // Keepalive
0, 5, // Client ID - MSB+LSB
'm', 'o', 'c', 'h', 'i', // Client ID
})
w.Write([]byte{byte(packets.Disconnect << 4), 0})
}()
// Receive the Connack
recv := make(chan []byte)
go func() {
buf, err := ioutil.ReadAll(w)
if err != nil {
panic(err)
}
recv <- buf
}()
clw, ok := s.Clients.Get("mochi")
require.Equal(t, true, ok)
clw.Stop()
errx := <-o
require.NoError(t, errx)
require.Equal(t, []byte{
byte(packets.Connack << 4), 2,
0, packets.Accepted,
}, <-recv)
require.Empty(t, clw.Subscriptions)
w.Close()
require.Equal(t, events.Client{
ID: "mochi",
Listener: "tcp",
}, hookedClient)
require.Equal(t, nil, hookedErr)
}
func TestServerEventOnDisconnectOnError(t *testing.T) {
r, w := net.Pipe()
s, cl, _, _ := setupClient()
s.Clients.Add(cl)
var hookedClient events.Client
var hookedErr error
s.Events.OnDisconnect = func(cl events.Client, err error) {
hookedClient = cl
hookedErr = err
}
o := make(chan error)
go func() {
o <- s.EstablishConnection("tcp", r, new(auth.Allow))
}()
go func() {
w.Write([]byte{
byte(packets.Connect << 4), 17, // Fixed header
0, 4, // Protocol Name - MSB+LSB
'M', 'Q', 'T', 'T', // Protocol Name
4, // Protocol Version
2, // Packet Flags - clean session
0, 45, // Keepalive
0, 5, // Client ID - MSB+LSB
'm', 'o', 'c', 'h', 'i', // Client ID
})
w.Write([]byte{0, 0})
}()
// Receive the Connack
go func() {
_, err := ioutil.ReadAll(w)
if err != nil {
panic(err)
}
}()
clw, ok := s.Clients.Get("mochi")
require.Equal(t, true, ok)
clw.Stop()
errx := <-o
require.Error(t, errx)
require.Equal(t, "No valid packet available; 0", errx.Error())
fmt.Println(hookedErr)
require.Equal(t, errx, hookedErr)
require.Equal(t, events.Client{
ID: "mochi",
Listener: "tcp",
}, hookedClient)
}
func TestServerEstablishConnectionOKInheritSession(t *testing.T) {
s := New()
@@ -442,6 +639,10 @@ func TestServerEstablishConnectionReadPacketErr(t *testing.T) {
require.Error(t, errx)
}
func TestServerOnDisconnectErr(t *testing.T) {
}
func TestServerWriteClient(t *testing.T) {
s, cl, r, w := setupClient()
cl.ID = "mochi"
@@ -931,7 +1132,7 @@ func TestServerPublishInlineSysTopicError(t *testing.T) {
require.Equal(t, int64(0), s.System.BytesSent)
}
func TestServerProcessPublishHookOnMessage(t *testing.T) {
func TestServerEventOnMessage(t *testing.T) {
s, cl1, r1, w1 := setupClient()
s.Clients.Add(cl1)
s.Topics.Subscribe("a/b/+", cl1.ID, 0)