mirror of
https://github.com/mochi-mqtt/server.git
synced 2025-10-06 16:47:21 +08:00
Merge OnMessage and OnMessageModify
This commit is contained in:
@@ -45,18 +45,13 @@ func main() {
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
// Add OnMessage Event Hook
|
// Add OnMessage Event Hook
|
||||||
server.Events.OnMessage = func(cl events.Client, pk events.Packet) {
|
server.Events.OnMessage = func(cl events.Client, pk events.Packet) (pkx events.Packet, err error) {
|
||||||
fmt.Printf("< OnMessage received message from client %s: %s\n", cl.ID, string(pk.Payload))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add OnMessage Event Hook
|
|
||||||
server.Events.OnMessageModify = func(cl events.Client, pk events.Packet) (pkx events.Packet, err error) {
|
|
||||||
pkx = pk
|
pkx = pk
|
||||||
if string(pk.Payload) == "hello" {
|
if string(pk.Payload) == "hello" {
|
||||||
pkx.Payload = []byte("hello world")
|
pkx.Payload = []byte("hello world")
|
||||||
fmt.Printf("< OnMessageModify modified message from client %s: %s\n", cl.ID, string(pkx.Payload))
|
fmt.Printf("< OnMessage modified message from client %s: %s\n", cl.ID, string(pkx.Payload))
|
||||||
} else {
|
} else {
|
||||||
fmt.Printf("< OnMessageModify received message from client %s: %s\n", cl.ID, string(pkx.Payload))
|
fmt.Printf("< OnMessage received message from client %s: %s\n", cl.ID, string(pkx.Payload))
|
||||||
}
|
}
|
||||||
|
|
||||||
return pkx, nil
|
return pkx, nil
|
||||||
|
@@ -6,8 +6,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type Events struct {
|
type Events struct {
|
||||||
OnMessage // published message receieved.
|
OnMessage // published message receieved.
|
||||||
OnMessageModify // modify a received published message.
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type Packet packets.Packet
|
type Packet packets.Packet
|
||||||
@@ -28,13 +27,10 @@ func FromClient(cl clients.Client) Client {
|
|||||||
// OnMessage function is called when a publish message is received. Note,
|
// OnMessage function is called when a publish message is received. Note,
|
||||||
// this hook is ONLY called by connected client publishers, it is not triggered when
|
// this hook is ONLY called by connected client publishers, it is not triggered when
|
||||||
// using the direct s.Publish method. The function receives the sent message and the
|
// using the direct s.Publish method. The function receives the sent message and the
|
||||||
// data of the client who published it. This function will block message dispatching
|
// data of the client who published it, and allows the packet to be modified
|
||||||
// until it returns. To minimise this, have the function open a new goroutine on the
|
// before it is dispatched to subscribers. If no modification is required, return
|
||||||
// embedding side.
|
// the original packet data. If an error occurs, the original packet will
|
||||||
type OnMessage func(Client, Packet)
|
// be dispatched as if the event hook had not been triggered.
|
||||||
|
// This function will block message dispatching until it returns. To minimise this,
|
||||||
// OnMessageModify is the same as OnMessage except it allows the packet to be modified
|
// have the function open a new goroutine on the embedding side.
|
||||||
// before it is dispatched to subscribers. If an error occurs, the original packet will
|
type OnMessage func(Client, Packet) (Packet, error)
|
||||||
// be dispatched as if the event hook had not been triggered. Please implement your own
|
|
||||||
// error handling within the hook. This function will block message dispatching until it returns.
|
|
||||||
type OnMessageModify func(Client, Packet) (Packet, error)
|
|
||||||
|
@@ -415,9 +415,9 @@ func (s *Server) processPublish(cl *clients.Client, pk packets.Packet) error {
|
|||||||
s.Events.OnMessage(events.FromClient(*cl), events.Packet(pk))
|
s.Events.OnMessage(events.FromClient(*cl), events.Packet(pk))
|
||||||
}
|
}
|
||||||
|
|
||||||
// if an OnMessageModify hook exists, potentially modify the packet.
|
// if an OnMessage hook exists, potentially modify the packet.
|
||||||
if s.Events.OnMessageModify != nil {
|
if s.Events.OnMessage != nil {
|
||||||
if pkx, err := s.Events.OnMessageModify(events.FromClient(*cl), events.Packet(pk)); err == nil {
|
if pkx, err := s.Events.OnMessage(events.FromClient(*cl), events.Packet(pk)); err == nil {
|
||||||
pk = packets.Packet(pkx)
|
pk = packets.Packet(pkx)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -929,9 +929,8 @@ func TestServerProcessPublishHookOnMessage(t *testing.T) {
|
|||||||
|
|
||||||
var hookedPacket events.Packet
|
var hookedPacket events.Packet
|
||||||
var hookedClient events.Client
|
var hookedClient events.Client
|
||||||
s.Events.OnMessage = func(cl events.Client, pk events.Packet) {
|
s.Events.OnMessage = func(cl events.Client, pk events.Packet) (events.Packet, error) {
|
||||||
hookedPacket = pk
|
return pk, nil
|
||||||
hookedClient = cl
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ack1 := make(chan []byte)
|
ack1 := make(chan []byte)
|
||||||
@@ -981,7 +980,7 @@ func TestServerProcessPublishHookOnMessageModify(t *testing.T) {
|
|||||||
|
|
||||||
var hookedPacket events.Packet
|
var hookedPacket events.Packet
|
||||||
var hookedClient events.Client
|
var hookedClient events.Client
|
||||||
s.Events.OnMessageModify = func(cl events.Client, pk events.Packet) (events.Packet, error) {
|
s.Events.OnMessage = func(cl events.Client, pk events.Packet) (events.Packet, error) {
|
||||||
hookedPacket = pk
|
hookedPacket = pk
|
||||||
hookedPacket.Payload = []byte("world")
|
hookedPacket.Payload = []byte("world")
|
||||||
hookedClient = cl
|
hookedClient = cl
|
||||||
@@ -1031,7 +1030,7 @@ func TestServerProcessPublishHookOnMessageModifyError(t *testing.T) {
|
|||||||
s.Clients.Add(cl1)
|
s.Clients.Add(cl1)
|
||||||
s.Topics.Subscribe("a/b/+", cl1.ID, 0)
|
s.Topics.Subscribe("a/b/+", cl1.ID, 0)
|
||||||
|
|
||||||
s.Events.OnMessageModify = func(cl events.Client, pk events.Packet) (events.Packet, error) {
|
s.Events.OnMessage = func(cl events.Client, pk events.Packet) (events.Packet, error) {
|
||||||
pkx := pk
|
pkx := pk
|
||||||
pkx.Payload = []byte("world")
|
pkx.Payload = []byte("world")
|
||||||
return pkx, fmt.Errorf("error")
|
return pkx, fmt.Errorf("error")
|
||||||
|
Reference in New Issue
Block a user