mirror of
https://github.com/mochi-mqtt/server.git
synced 2025-09-27 04:26:23 +08:00
Compare commits
12 Commits
v2.6.5
...
dependabot
Author | SHA1 | Date | |
---|---|---|---|
![]() |
06f33b1de6 | ||
![]() |
5b7f94bde4 | ||
![]() |
b4332150f8 | ||
![]() |
de82bddb83 | ||
![]() |
ce88723041 | ||
![]() |
d3b62035f9 | ||
![]() |
9441e92595 | ||
![]() |
bfaf78332b | ||
![]() |
043906876e | ||
![]() |
dcb814cedf | ||
![]() |
8f52b891d5 | ||
![]() |
47536b77f2 |
12
clients.go
12
clients.go
@@ -150,7 +150,7 @@ type ClientState struct {
|
||||
disconnected int64 // the time the client disconnected in unix time, for calculating expiry
|
||||
outbound chan *packets.Packet // queue for pending outbound packets
|
||||
endOnce sync.Once // only end once
|
||||
isTakenOver uint32 // used to identify orphaned clients
|
||||
isTakenOver atomic.Bool // used to identify orphaned clients
|
||||
packetID uint32 // the current highest packetID
|
||||
open context.Context // indicate that the client is open for packet exchange
|
||||
cancelOpen context.CancelFunc // cancel function for open context
|
||||
@@ -427,6 +427,10 @@ func (cl *Client) Closed() bool {
|
||||
return cl.State.open == nil || cl.State.open.Err() != nil
|
||||
}
|
||||
|
||||
func (cl *Client) IsTakenOver() bool {
|
||||
return cl.State.isTakenOver.Load()
|
||||
}
|
||||
|
||||
// ReadFixedHeader reads in the values of the next packet's fixed header.
|
||||
func (cl *Client) ReadFixedHeader(fh *packets.FixedHeader) error {
|
||||
if cl.Net.bconn == nil {
|
||||
@@ -529,7 +533,11 @@ func (cl *Client) WritePacket(pk packets.Packet) error {
|
||||
}
|
||||
|
||||
if pk.Expiry > 0 {
|
||||
pk.Properties.MessageExpiryInterval = uint32(pk.Expiry - time.Now().Unix()) // [MQTT-3.3.2-6]
|
||||
expiry := pk.Expiry - time.Now().Unix()
|
||||
if expiry < 1 {
|
||||
expiry = 1
|
||||
}
|
||||
pk.Properties.MessageExpiryInterval = uint32(expiry) // [MQTT-3.3.2-6]
|
||||
}
|
||||
|
||||
pk.ProtocolVersion = cl.Properties.ProtocolVersion
|
||||
|
@@ -599,6 +599,13 @@ func TestClientClosed(t *testing.T) {
|
||||
require.True(t, cl.Closed())
|
||||
}
|
||||
|
||||
func TestClientIsTakenOver(t *testing.T) {
|
||||
cl, _, _ := newTestClient()
|
||||
require.False(t, cl.IsTakenOver())
|
||||
cl.State.isTakenOver.Store(true)
|
||||
require.True(t, cl.IsTakenOver())
|
||||
}
|
||||
|
||||
func TestClientReadFixedHeaderError(t *testing.T) {
|
||||
cl, r, _ := newTestClient()
|
||||
defer cl.Stop(errClientStop)
|
||||
|
20
cmd/main.go
20
cmd/main.go
@@ -5,6 +5,7 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"flag"
|
||||
"log"
|
||||
"os"
|
||||
@@ -20,6 +21,8 @@ func main() {
|
||||
tcpAddr := flag.String("tcp", ":1883", "network address for TCP listener")
|
||||
wsAddr := flag.String("ws", ":1882", "network address for Websocket listener")
|
||||
infoAddr := flag.String("info", ":8080", "network address for web info dashboard listener")
|
||||
tlsCertFile := flag.String("tls-cert-file", "", "TLS certificate file")
|
||||
tlsKeyFile := flag.String("tls-key-file", "", "TLS key file")
|
||||
flag.Parse()
|
||||
|
||||
sigs := make(chan os.Signal, 1)
|
||||
@@ -30,12 +33,25 @@ func main() {
|
||||
done <- true
|
||||
}()
|
||||
|
||||
var tlsConfig *tls.Config
|
||||
|
||||
if tlsCertFile != nil && tlsKeyFile != nil && *tlsCertFile != "" && *tlsKeyFile != "" {
|
||||
cert, err := tls.LoadX509KeyPair(*tlsCertFile, *tlsKeyFile)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
tlsConfig = &tls.Config{
|
||||
Certificates: []tls.Certificate{cert},
|
||||
}
|
||||
}
|
||||
|
||||
server := mqtt.New(nil)
|
||||
_ = server.AddHook(new(auth.AllowHook), nil)
|
||||
|
||||
tcp := listeners.NewTCP(listeners.Config{
|
||||
ID: "t1",
|
||||
Address: *tcpAddr,
|
||||
ID: "t1",
|
||||
Address: *tcpAddr,
|
||||
TLSConfig: tlsConfig,
|
||||
})
|
||||
err := server.AddListener(tcp)
|
||||
if err != nil {
|
||||
|
@@ -57,7 +57,10 @@ func main() {
|
||||
done <- true
|
||||
}()
|
||||
|
||||
cert, err := tls.X509KeyPair(testCertificate, testPrivateKey)
|
||||
// Load tls cert from your cert file
|
||||
cert, err := tls.LoadX509KeyPair("replace_your_cert.pem", "replace_your_cert.key")
|
||||
|
||||
//cert, err := tls.X509KeyPair(testCertificate, testPrivateKey)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
9
go.mod
9
go.mod
@@ -1,6 +1,7 @@
|
||||
module github.com/mochi-mqtt/server/v2
|
||||
|
||||
go 1.21
|
||||
toolchain go1.24.1
|
||||
|
||||
require (
|
||||
github.com/alicebob/miniredis/v2 v2.23.0
|
||||
@@ -30,7 +31,7 @@ require (
|
||||
github.com/dustin/go-humanize v1.0.0 // indirect
|
||||
github.com/getsentry/sentry-go v0.18.0 // indirect
|
||||
github.com/gogo/protobuf v1.3.2 // indirect
|
||||
github.com/golang/glog v1.0.0 // indirect
|
||||
github.com/golang/glog v1.2.4 // indirect
|
||||
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
|
||||
github.com/golang/protobuf v1.5.2 // indirect
|
||||
github.com/golang/snappy v0.0.4 // indirect
|
||||
@@ -49,8 +50,8 @@ require (
|
||||
github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9 // indirect
|
||||
go.opencensus.io v0.22.5 // indirect
|
||||
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df // indirect
|
||||
golang.org/x/net v0.23.0 // indirect
|
||||
golang.org/x/sys v0.18.0 // indirect
|
||||
golang.org/x/text v0.14.0 // indirect
|
||||
golang.org/x/net v0.38.0 // indirect
|
||||
golang.org/x/sys v0.31.0 // indirect
|
||||
golang.org/x/text v0.23.0 // indirect
|
||||
google.golang.org/protobuf v1.33.0 // indirect
|
||||
)
|
||||
|
24
go.sum
24
go.sum
@@ -110,8 +110,8 @@ github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7a
|
||||
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
|
||||
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
|
||||
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
|
||||
github.com/golang/glog v1.0.0 h1:nfP3RFugxnNRyKgeWd4oI1nYvXpxrx8ck8ZrcizshdQ=
|
||||
github.com/golang/glog v1.0.0/go.mod h1:EWib/APOK0SL3dFbYqvxE3UYd8E6s1ouQ7iEp/0LWV4=
|
||||
github.com/golang/glog v1.2.4 h1:CNNw5U8lSiiBk7druxtSHHTsRWcxKoac6kZKm2peBBc=
|
||||
github.com/golang/glog v1.2.4/go.mod h1:6AhwSGph0fcJtXVM/PEHPqZlFeoLxhs7/t5UDAwmO+w=
|
||||
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
|
||||
github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
|
||||
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e h1:1r7pUrabqp18hOBcwBwiTsbnFeTZHV9eER/QT5JVZxY=
|
||||
@@ -155,8 +155,8 @@ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
|
||||
github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
|
||||
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
|
||||
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
|
||||
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
|
||||
github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
|
||||
@@ -350,8 +350,8 @@ golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81R
|
||||
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
|
||||
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
|
||||
golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
|
||||
golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs=
|
||||
golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
|
||||
golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8=
|
||||
golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8=
|
||||
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
|
||||
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
|
||||
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
|
||||
@@ -368,8 +368,8 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ
|
||||
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 h1:uVc8UZUe6tr40fFVnUP5Oj+veunVezqYl9z7DYw9xzw=
|
||||
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.12.0 h1:MHc5BpPuC30uJk597Ri8TV3CNZcTLu6B6z4lJy+g6Jw=
|
||||
golang.org/x/sync v0.12.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
|
||||
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
@@ -409,8 +409,8 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w
|
||||
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
|
||||
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik=
|
||||
golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
@@ -418,8 +418,8 @@ golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3
|
||||
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
|
||||
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
|
||||
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
|
||||
golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY=
|
||||
golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4=
|
||||
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
|
2
hooks.go
2
hooks.go
@@ -405,6 +405,8 @@ func (h *Hooks) OnPublish(cl *Client, pk packets.Packet) (pkx packets.Packet, er
|
||||
"hook", hook.ID(),
|
||||
"packet", pkx)
|
||||
return pk, err
|
||||
} else if errors.Is(err, packets.CodeSuccessIgnore) {
|
||||
return pk, err
|
||||
}
|
||||
h.Log.Error("publish packet error",
|
||||
"error", err,
|
||||
|
@@ -292,6 +292,7 @@ func (h *Hook) OnRetainMessage(cl *mqtt.Client, pk packets.Packet, r int64) {
|
||||
TopicName: pk.TopicName,
|
||||
Payload: pk.Payload,
|
||||
Created: pk.Created,
|
||||
Client: cl.ID,
|
||||
Origin: pk.Origin,
|
||||
Properties: storage.MessageProperties{
|
||||
PayloadFormat: props.PayloadFormat,
|
||||
@@ -319,6 +320,7 @@ func (h *Hook) OnQosPublish(cl *mqtt.Client, pk packets.Packet, sent int64, rese
|
||||
in := &storage.Message{
|
||||
ID: inflightKey(cl, pk),
|
||||
T: storage.InflightKey,
|
||||
Client: cl.ID,
|
||||
Origin: pk.Origin,
|
||||
PacketID: pk.PacketID,
|
||||
FixedHeader: pk.FixedHeader,
|
||||
|
@@ -260,6 +260,7 @@ func (h *Hook) OnRetainMessage(cl *mqtt.Client, pk packets.Packet, r int64) {
|
||||
TopicName: pk.TopicName,
|
||||
Payload: pk.Payload,
|
||||
Created: pk.Created,
|
||||
Client: cl.ID,
|
||||
Origin: pk.Origin,
|
||||
Properties: storage.MessageProperties{
|
||||
PayloadFormat: props.PayloadFormat,
|
||||
@@ -287,6 +288,7 @@ func (h *Hook) OnQosPublish(cl *mqtt.Client, pk packets.Packet, sent int64, rese
|
||||
in := &storage.Message{
|
||||
ID: inflightKey(cl, pk),
|
||||
T: storage.InflightKey,
|
||||
Client: cl.ID,
|
||||
Origin: pk.Origin,
|
||||
FixedHeader: pk.FixedHeader,
|
||||
TopicName: pk.TopicName,
|
||||
|
@@ -268,6 +268,7 @@ func (h *Hook) OnRetainMessage(cl *mqtt.Client, pk packets.Packet, r int64) {
|
||||
TopicName: pk.TopicName,
|
||||
Payload: pk.Payload,
|
||||
Created: pk.Created,
|
||||
Client: cl.ID,
|
||||
Origin: pk.Origin,
|
||||
Properties: storage.MessageProperties{
|
||||
PayloadFormat: props.PayloadFormat,
|
||||
@@ -295,6 +296,7 @@ func (h *Hook) OnQosPublish(cl *mqtt.Client, pk packets.Packet, sent int64, rese
|
||||
in := &storage.Message{
|
||||
ID: inflightKey(cl, pk),
|
||||
T: storage.InflightKey,
|
||||
Client: cl.ID,
|
||||
Origin: pk.Origin,
|
||||
PacketID: pk.PacketID,
|
||||
FixedHeader: pk.FixedHeader,
|
||||
|
@@ -287,6 +287,7 @@ func (h *Hook) OnRetainMessage(cl *mqtt.Client, pk packets.Packet, r int64) {
|
||||
TopicName: pk.TopicName,
|
||||
Payload: pk.Payload,
|
||||
Created: pk.Created,
|
||||
Client: cl.ID,
|
||||
Origin: pk.Origin,
|
||||
Properties: storage.MessageProperties{
|
||||
PayloadFormat: props.PayloadFormat,
|
||||
@@ -317,6 +318,7 @@ func (h *Hook) OnQosPublish(cl *mqtt.Client, pk packets.Packet, sent int64, rese
|
||||
in := &storage.Message{
|
||||
ID: inflightKey(cl, pk),
|
||||
T: storage.InflightKey,
|
||||
Client: cl.ID,
|
||||
Origin: pk.Origin,
|
||||
FixedHeader: pk.FixedHeader,
|
||||
TopicName: pk.TopicName,
|
||||
|
@@ -89,6 +89,7 @@ type Message struct {
|
||||
Payload []byte `json:"payload"` // the message payload (if retained)
|
||||
T string `json:"t,omitempty"` // the data type
|
||||
ID string `json:"id,omitempty" storm:"id"` // the storage key
|
||||
Client string `json:"client,omitempty"` // the client id the message is for
|
||||
Origin string `json:"origin,omitempty"` // the id of the client who sent the message
|
||||
TopicName string `json:"topic_name,omitempty"` // the topic the message was sent to (if retained)
|
||||
FixedHeader packets.FixedHeader `json:"fixedheader"` // the header properties of the message
|
||||
|
44
server.go
44
server.go
@@ -27,7 +27,7 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
Version = "2.6.5" // the current server version.
|
||||
Version = "2.7.9" // the current server version.
|
||||
defaultSysTopicInterval int64 = 1 // the interval between $SYS topic publishes
|
||||
LocalListener = "local"
|
||||
InlineClientId = "inline"
|
||||
@@ -485,7 +485,7 @@ func (s *Server) attachClient(cl *Client, listener string) error {
|
||||
expire := (cl.Properties.ProtocolVersion == 5 && cl.Properties.Props.SessionExpiryInterval == 0) || (cl.Properties.ProtocolVersion < 5 && cl.Properties.Clean)
|
||||
s.hooks.OnDisconnect(cl, err, expire)
|
||||
|
||||
if expire && atomic.LoadUint32(&cl.State.isTakenOver) == 0 {
|
||||
if expire && !cl.IsTakenOver() {
|
||||
cl.ClearInflights()
|
||||
s.UnsubscribeClient(cl)
|
||||
s.Clients.Delete(cl.ID) // [MQTT-4.1.0-2] ![MQTT-3.1.2-23]
|
||||
@@ -565,11 +565,11 @@ func (s *Server) inheritClientSession(pk packets.Packet, cl *Client) bool {
|
||||
if pk.Connect.Clean || (existing.Properties.Clean && existing.Properties.ProtocolVersion < 5) { // [MQTT-3.1.2-4] [MQTT-3.1.4-4]
|
||||
s.UnsubscribeClient(existing)
|
||||
existing.ClearInflights()
|
||||
atomic.StoreUint32(&existing.State.isTakenOver, 1) // only set isTakenOver after unsubscribe has occurred
|
||||
return false // [MQTT-3.2.2-3]
|
||||
existing.State.isTakenOver.Store(true) // only set isTakenOver after unsubscribe has occurred
|
||||
return false // [MQTT-3.2.2-3]
|
||||
}
|
||||
|
||||
atomic.StoreUint32(&existing.State.isTakenOver, 1)
|
||||
existing.State.isTakenOver.Store(true)
|
||||
if existing.State.Inflight.Len() > 0 {
|
||||
cl.State.Inflight = existing.State.Inflight.Clone() // [MQTT-3.1.2-5]
|
||||
if cl.State.Inflight.maximumReceiveQuota == 0 && cl.ops.options.Capabilities.ReceiveMaximum != 0 {
|
||||
@@ -885,6 +885,11 @@ func (s *Server) processPublish(cl *Client, pk packets.Packet) error {
|
||||
pk.Origin = cl.ID
|
||||
pk.Created = time.Now().Unix()
|
||||
|
||||
if expiry := minimum(s.Options.Capabilities.MaximumMessageExpiryInterval,
|
||||
int64(pk.Properties.MessageExpiryInterval)); expiry > 0 {
|
||||
pk.Expiry = pk.Created + expiry
|
||||
}
|
||||
|
||||
if !cl.Net.Inline {
|
||||
if pki, ok := cl.State.Inflight.Get(pk.PacketID); ok {
|
||||
if pki.FixedHeader.Type == packets.Pubrec { // [MQTT-4.3.3-10]
|
||||
@@ -986,9 +991,11 @@ func (s *Server) publishToSubscribers(pk packets.Packet) {
|
||||
pk.Created = time.Now().Unix()
|
||||
}
|
||||
|
||||
pk.Expiry = pk.Created + s.Options.Capabilities.MaximumMessageExpiryInterval
|
||||
if pk.Properties.MessageExpiryInterval > 0 {
|
||||
pk.Expiry = pk.Created + int64(pk.Properties.MessageExpiryInterval)
|
||||
if pk.Expiry == 0 {
|
||||
if expiry := minimum(s.Options.Capabilities.MaximumMessageExpiryInterval,
|
||||
int64(pk.Properties.MessageExpiryInterval)); expiry > 0 {
|
||||
pk.Expiry = pk.Created + expiry
|
||||
}
|
||||
}
|
||||
|
||||
subscribers := s.Topics.Subscribers(pk.TopicName)
|
||||
@@ -1358,7 +1365,7 @@ func (s *Server) UnsubscribeClient(cl *Client) {
|
||||
cl.State.Subscriptions.Delete(k)
|
||||
}
|
||||
|
||||
if atomic.LoadUint32(&cl.State.isTakenOver) == 1 {
|
||||
if cl.IsTakenOver() {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -1672,7 +1679,7 @@ func (s *Server) loadClients(v []storage.Client) {
|
||||
// loadInflight restores inflight messages from the datastore.
|
||||
func (s *Server) loadInflight(v []storage.Message) {
|
||||
for _, msg := range v {
|
||||
if client, ok := s.Clients.Get(msg.Origin); ok {
|
||||
if client, ok := s.Clients.Get(msg.Client); ok {
|
||||
client.State.Inflight.Set(msg.ToPacket())
|
||||
}
|
||||
}
|
||||
@@ -1755,3 +1762,20 @@ func (s *Server) sendDelayedLWT(dt int64) {
|
||||
func Int64toa(v int64) string {
|
||||
return strconv.FormatInt(v, 10)
|
||||
}
|
||||
|
||||
// minimum differs from built-in min, it returns minimum of the non-zero value a and b.
|
||||
// If both a and b are zero value, it reutrns 0.
|
||||
func minimum(a, b int64) (m int64) {
|
||||
if a != 0 {
|
||||
m = a
|
||||
if b != 0 && b < a {
|
||||
m = b
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if b != 0 {
|
||||
m = b
|
||||
}
|
||||
return
|
||||
}
|
||||
|
@@ -667,6 +667,7 @@ func TestEstablishConnectionInheritExisting(t *testing.T) {
|
||||
clw, ok := s.Clients.Get(packets.TPacketData[packets.Connect].Get(packets.TConnectMqtt311).Packet.Connect.ClientIdentifier)
|
||||
require.True(t, ok)
|
||||
require.NotEmpty(t, clw.State.Subscriptions)
|
||||
require.True(t, cl.IsTakenOver())
|
||||
|
||||
// Prevent sequential takeover memory-bloom.
|
||||
require.Empty(t, cl.State.Subscriptions.GetAll())
|
||||
@@ -761,6 +762,9 @@ func TestEstablishConnectionInheritExistingTrueTakeover(t *testing.T) {
|
||||
|
||||
_, _ = w2.Write(packets.TPacketData[packets.Disconnect].Get(packets.TDisconnect).RawBytes)
|
||||
require.NoError(t, <-o2)
|
||||
|
||||
require.True(t, clp1.IsTakenOver())
|
||||
require.False(t, clp2.IsTakenOver())
|
||||
}
|
||||
|
||||
func TestEstablishConnectionResentPendingInflightsError(t *testing.T) {
|
||||
@@ -848,12 +852,15 @@ func TestEstablishConnectionInheritExistingClean(t *testing.T) {
|
||||
require.Equal(t, packets.TPacketData[packets.Connack].Get(packets.TConnackAcceptedNoSession).RawBytes, <-recv)
|
||||
require.Equal(t, packets.TPacketData[packets.Disconnect].Get(packets.TDisconnect).RawBytes, <-takeover)
|
||||
|
||||
require.True(t, cl.IsTakenOver())
|
||||
|
||||
_ = w.Close()
|
||||
_ = r.Close()
|
||||
|
||||
clw, ok := s.Clients.Get(packets.TPacketData[packets.Connect].Get(packets.TConnectMqtt311).Packet.Connect.ClientIdentifier)
|
||||
require.True(t, ok)
|
||||
require.Equal(t, 0, clw.State.Subscriptions.Len())
|
||||
|
||||
}
|
||||
|
||||
func TestEstablishConnectionBadAuthentication(t *testing.T) {
|
||||
@@ -3416,10 +3423,10 @@ func TestServerLoadInflightMessages(t *testing.T) {
|
||||
require.Equal(t, 3, s.Clients.Len())
|
||||
|
||||
v := []storage.Message{
|
||||
{Origin: "mochi", PacketID: 1, Payload: []byte("hello world"), TopicName: "a/b/c"},
|
||||
{Origin: "mochi", PacketID: 2, Payload: []byte("yes"), TopicName: "a/b/c"},
|
||||
{Origin: "zen", PacketID: 3, Payload: []byte("hello world"), TopicName: "a/b/c"},
|
||||
{Origin: "mochi-co", PacketID: 4, Payload: []byte("hello world"), TopicName: "a/b/c"},
|
||||
{Client: "mochi", Origin: "mochi", PacketID: 1, Payload: []byte("hello world"), TopicName: "a/b/c"},
|
||||
{Client: "mochi", Origin: "mochi", PacketID: 2, Payload: []byte("yes"), TopicName: "a/b/c"},
|
||||
{Client: "zen", Origin: "zen", PacketID: 3, Payload: []byte("hello world"), TopicName: "a/b/c"},
|
||||
{Client: "mochi-co", Origin: "mochi-co", PacketID: 4, Payload: []byte("hello world"), TopicName: "a/b/c"},
|
||||
}
|
||||
s.loadInflight(v)
|
||||
|
||||
@@ -3913,3 +3920,14 @@ func TestServerSubscribeWithRetainDifferentIdentifier(t *testing.T) {
|
||||
require.Equal(t, true, <-finishCh)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMinimum(t *testing.T) {
|
||||
require.EqualValues(t, 0, minimum(0, 0))
|
||||
require.EqualValues(t, 1, minimum(0, 1))
|
||||
require.EqualValues(t, 1, minimum(1, 0))
|
||||
require.EqualValues(t, 10, minimum(10, 20))
|
||||
require.EqualValues(t, 20, minimum(30, 20))
|
||||
require.EqualValues(t, -1, minimum(-1, 0)) // negative values are not used, but included here for completeness
|
||||
require.EqualValues(t, -1, minimum(-1, 20))
|
||||
require.EqualValues(t, -2, minimum(-1, -2))
|
||||
}
|
||||
|
Reference in New Issue
Block a user