Compare commits

...

9 Commits

Author SHA1 Message Date
mochi-co
de82bddb83 Update server version 2025-02-25 20:16:39 +00:00
mochi-co
ce88723041 update server version 2025-01-30 18:37:55 +00:00
dagehuifei
d3b62035f9 feat: Add TLS Cert File flag (#434)
Co-authored-by: JB <28275108+mochi-co@users.noreply.github.com>
2025-01-30 18:25:18 +00:00
dependabot[bot]
9441e92595 Bump golang.org/x/net from 0.23.0 to 0.33.0 (#450)
Bumps [golang.org/x/net](https://github.com/golang/net) from 0.23.0 to 0.33.0.
- [Commits](https://github.com/golang/net/compare/v0.23.0...v0.33.0)

---
updated-dependencies:
- dependency-name: golang.org/x/net
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: JB <28275108+mochi-co@users.noreply.github.com>
2025-01-30 18:05:35 +00:00
thedevop
bfaf78332b Suppress OnPublish CodeSuccessIgnore error log (#438)
* OnPublish CodeSuccessIgnore, use debug instead of error log

* Suppress OnPublish CodeSuccessIgnore error log

---------

Co-authored-by: JB <28275108+mochi-co@users.noreply.github.com>
2025-01-30 18:02:25 +00:00
thedevop
043906876e Add cl.IsTakenOver and switch cl.isTakenOver to atomic.Bool (#446)
* OnPublish CodeSuccessIgnore, use debug instead of error log

* Suppress OnPublish CodeSuccessIgnore error log

* Add cl.IsTakenOver and switch to use atomic.Bool

---------

Co-authored-by: JB <28275108+mochi-co@users.noreply.github.com>
2025-01-30 17:58:03 +00:00
dependabot[bot]
dcb814cedf Bump github.com/golang/glog from 1.0.0 to 1.2.4 (#449)
Bumps [github.com/golang/glog](https://github.com/golang/glog) from 1.0.0 to 1.2.4.
- [Release notes](https://github.com/golang/glog/releases)
- [Commits](https://github.com/golang/glog/compare/v1.0.0...v1.2.4)

---
updated-dependencies:
- dependency-name: github.com/golang/glog
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-01-30 17:54:44 +00:00
mochi-co
8f52b891d5 Update server version 2024-10-23 21:06:28 +01:00
s9-hfe
47536b77f2 Fix QoS 1 message delivery after server restart (#427)
Resolved an issue where persisted QoS 1 messages were not correctly loaded
into the appropriate client instance during server startup.
2024-10-23 21:02:45 +01:00
14 changed files with 79 additions and 31 deletions

View File

@@ -150,7 +150,7 @@ type ClientState struct {
disconnected int64 // the time the client disconnected in unix time, for calculating expiry
outbound chan *packets.Packet // queue for pending outbound packets
endOnce sync.Once // only end once
isTakenOver uint32 // used to identify orphaned clients
isTakenOver atomic.Bool // used to identify orphaned clients
packetID uint32 // the current highest packetID
open context.Context // indicate that the client is open for packet exchange
cancelOpen context.CancelFunc // cancel function for open context
@@ -427,6 +427,10 @@ func (cl *Client) Closed() bool {
return cl.State.open == nil || cl.State.open.Err() != nil
}
func (cl *Client) IsTakenOver() bool {
return cl.State.isTakenOver.Load()
}
// ReadFixedHeader reads in the values of the next packet's fixed header.
func (cl *Client) ReadFixedHeader(fh *packets.FixedHeader) error {
if cl.Net.bconn == nil {

View File

@@ -599,6 +599,13 @@ func TestClientClosed(t *testing.T) {
require.True(t, cl.Closed())
}
func TestClientIsTakenOver(t *testing.T) {
cl, _, _ := newTestClient()
require.False(t, cl.IsTakenOver())
cl.State.isTakenOver.Store(true)
require.True(t, cl.IsTakenOver())
}
func TestClientReadFixedHeaderError(t *testing.T) {
cl, r, _ := newTestClient()
defer cl.Stop(errClientStop)

View File

@@ -5,6 +5,7 @@
package main
import (
"crypto/tls"
"flag"
"log"
"os"
@@ -20,6 +21,8 @@ func main() {
tcpAddr := flag.String("tcp", ":1883", "network address for TCP listener")
wsAddr := flag.String("ws", ":1882", "network address for Websocket listener")
infoAddr := flag.String("info", ":8080", "network address for web info dashboard listener")
tlsCertFile := flag.String("tls-cert-file", "", "TLS certificate file")
tlsKeyFile := flag.String("tls-key-file", "", "TLS key file")
flag.Parse()
sigs := make(chan os.Signal, 1)
@@ -30,12 +33,25 @@ func main() {
done <- true
}()
var tlsConfig *tls.Config
if tlsCertFile != nil && tlsKeyFile != nil && *tlsCertFile != "" && *tlsKeyFile != "" {
cert, err := tls.LoadX509KeyPair(*tlsCertFile, *tlsKeyFile)
if err != nil {
return
}
tlsConfig = &tls.Config{
Certificates: []tls.Certificate{cert},
}
}
server := mqtt.New(nil)
_ = server.AddHook(new(auth.AllowHook), nil)
tcp := listeners.NewTCP(listeners.Config{
ID: "t1",
Address: *tcpAddr,
ID: "t1",
Address: *tcpAddr,
TLSConfig: tlsConfig,
})
err := server.AddListener(tcp)
if err != nil {

View File

@@ -57,7 +57,10 @@ func main() {
done <- true
}()
cert, err := tls.X509KeyPair(testCertificate, testPrivateKey)
// Load tls cert from your cert file
cert, err := tls.LoadX509KeyPair("replace_your_cert.pem", "replace_your_cert.key")
//cert, err := tls.X509KeyPair(testCertificate, testPrivateKey)
if err != nil {
log.Fatal(err)
}

8
go.mod
View File

@@ -30,7 +30,7 @@ require (
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/getsentry/sentry-go v0.18.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/glog v1.0.0 // indirect
github.com/golang/glog v1.2.4 // indirect
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
@@ -49,8 +49,8 @@ require (
github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9 // indirect
go.opencensus.io v0.22.5 // indirect
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/net v0.33.0 // indirect
golang.org/x/sys v0.28.0 // indirect
golang.org/x/text v0.21.0 // indirect
google.golang.org/protobuf v1.33.0 // indirect
)

24
go.sum
View File

@@ -110,8 +110,8 @@ github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7a
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/glog v1.0.0 h1:nfP3RFugxnNRyKgeWd4oI1nYvXpxrx8ck8ZrcizshdQ=
github.com/golang/glog v1.0.0/go.mod h1:EWib/APOK0SL3dFbYqvxE3UYd8E6s1ouQ7iEp/0LWV4=
github.com/golang/glog v1.2.4 h1:CNNw5U8lSiiBk7druxtSHHTsRWcxKoac6kZKm2peBBc=
github.com/golang/glog v1.2.4/go.mod h1:6AhwSGph0fcJtXVM/PEHPqZlFeoLxhs7/t5UDAwmO+w=
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e h1:1r7pUrabqp18hOBcwBwiTsbnFeTZHV9eER/QT5JVZxY=
@@ -155,8 +155,8 @@ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
@@ -350,8 +350,8 @@ golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81R
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs=
golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I=
golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@@ -368,8 +368,8 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 h1:uVc8UZUe6tr40fFVnUP5Oj+veunVezqYl9z7DYw9xzw=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@@ -409,8 +409,8 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@@ -418,8 +418,8 @@ golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=

View File

@@ -405,6 +405,8 @@ func (h *Hooks) OnPublish(cl *Client, pk packets.Packet) (pkx packets.Packet, er
"hook", hook.ID(),
"packet", pkx)
return pk, err
} else if errors.Is(err, packets.CodeSuccessIgnore) {
return pk, err
}
h.Log.Error("publish packet error",
"error", err,

View File

@@ -292,6 +292,7 @@ func (h *Hook) OnRetainMessage(cl *mqtt.Client, pk packets.Packet, r int64) {
TopicName: pk.TopicName,
Payload: pk.Payload,
Created: pk.Created,
Client: cl.ID,
Origin: pk.Origin,
Properties: storage.MessageProperties{
PayloadFormat: props.PayloadFormat,
@@ -319,6 +320,7 @@ func (h *Hook) OnQosPublish(cl *mqtt.Client, pk packets.Packet, sent int64, rese
in := &storage.Message{
ID: inflightKey(cl, pk),
T: storage.InflightKey,
Client: cl.ID,
Origin: pk.Origin,
PacketID: pk.PacketID,
FixedHeader: pk.FixedHeader,

View File

@@ -260,6 +260,7 @@ func (h *Hook) OnRetainMessage(cl *mqtt.Client, pk packets.Packet, r int64) {
TopicName: pk.TopicName,
Payload: pk.Payload,
Created: pk.Created,
Client: cl.ID,
Origin: pk.Origin,
Properties: storage.MessageProperties{
PayloadFormat: props.PayloadFormat,
@@ -287,6 +288,7 @@ func (h *Hook) OnQosPublish(cl *mqtt.Client, pk packets.Packet, sent int64, rese
in := &storage.Message{
ID: inflightKey(cl, pk),
T: storage.InflightKey,
Client: cl.ID,
Origin: pk.Origin,
FixedHeader: pk.FixedHeader,
TopicName: pk.TopicName,

View File

@@ -268,6 +268,7 @@ func (h *Hook) OnRetainMessage(cl *mqtt.Client, pk packets.Packet, r int64) {
TopicName: pk.TopicName,
Payload: pk.Payload,
Created: pk.Created,
Client: cl.ID,
Origin: pk.Origin,
Properties: storage.MessageProperties{
PayloadFormat: props.PayloadFormat,
@@ -295,6 +296,7 @@ func (h *Hook) OnQosPublish(cl *mqtt.Client, pk packets.Packet, sent int64, rese
in := &storage.Message{
ID: inflightKey(cl, pk),
T: storage.InflightKey,
Client: cl.ID,
Origin: pk.Origin,
PacketID: pk.PacketID,
FixedHeader: pk.FixedHeader,

View File

@@ -287,6 +287,7 @@ func (h *Hook) OnRetainMessage(cl *mqtt.Client, pk packets.Packet, r int64) {
TopicName: pk.TopicName,
Payload: pk.Payload,
Created: pk.Created,
Client: cl.ID,
Origin: pk.Origin,
Properties: storage.MessageProperties{
PayloadFormat: props.PayloadFormat,
@@ -317,6 +318,7 @@ func (h *Hook) OnQosPublish(cl *mqtt.Client, pk packets.Packet, sent int64, rese
in := &storage.Message{
ID: inflightKey(cl, pk),
T: storage.InflightKey,
Client: cl.ID,
Origin: pk.Origin,
FixedHeader: pk.FixedHeader,
TopicName: pk.TopicName,

View File

@@ -89,6 +89,7 @@ type Message struct {
Payload []byte `json:"payload"` // the message payload (if retained)
T string `json:"t,omitempty"` // the data type
ID string `json:"id,omitempty" storm:"id"` // the storage key
Client string `json:"client,omitempty"` // the client id the message is for
Origin string `json:"origin,omitempty"` // the id of the client who sent the message
TopicName string `json:"topic_name,omitempty"` // the topic the message was sent to (if retained)
FixedHeader packets.FixedHeader `json:"fixedheader"` // the header properties of the message

View File

@@ -27,7 +27,7 @@ import (
)
const (
Version = "2.6.5" // the current server version.
Version = "2.7.8" // the current server version.
defaultSysTopicInterval int64 = 1 // the interval between $SYS topic publishes
LocalListener = "local"
InlineClientId = "inline"
@@ -485,7 +485,7 @@ func (s *Server) attachClient(cl *Client, listener string) error {
expire := (cl.Properties.ProtocolVersion == 5 && cl.Properties.Props.SessionExpiryInterval == 0) || (cl.Properties.ProtocolVersion < 5 && cl.Properties.Clean)
s.hooks.OnDisconnect(cl, err, expire)
if expire && atomic.LoadUint32(&cl.State.isTakenOver) == 0 {
if expire && !cl.IsTakenOver() {
cl.ClearInflights()
s.UnsubscribeClient(cl)
s.Clients.Delete(cl.ID) // [MQTT-4.1.0-2] ![MQTT-3.1.2-23]
@@ -565,11 +565,11 @@ func (s *Server) inheritClientSession(pk packets.Packet, cl *Client) bool {
if pk.Connect.Clean || (existing.Properties.Clean && existing.Properties.ProtocolVersion < 5) { // [MQTT-3.1.2-4] [MQTT-3.1.4-4]
s.UnsubscribeClient(existing)
existing.ClearInflights()
atomic.StoreUint32(&existing.State.isTakenOver, 1) // only set isTakenOver after unsubscribe has occurred
return false // [MQTT-3.2.2-3]
existing.State.isTakenOver.Store(true) // only set isTakenOver after unsubscribe has occurred
return false // [MQTT-3.2.2-3]
}
atomic.StoreUint32(&existing.State.isTakenOver, 1)
existing.State.isTakenOver.Store(true)
if existing.State.Inflight.Len() > 0 {
cl.State.Inflight = existing.State.Inflight.Clone() // [MQTT-3.1.2-5]
if cl.State.Inflight.maximumReceiveQuota == 0 && cl.ops.options.Capabilities.ReceiveMaximum != 0 {
@@ -1358,7 +1358,7 @@ func (s *Server) UnsubscribeClient(cl *Client) {
cl.State.Subscriptions.Delete(k)
}
if atomic.LoadUint32(&cl.State.isTakenOver) == 1 {
if cl.IsTakenOver() {
return
}
@@ -1672,7 +1672,7 @@ func (s *Server) loadClients(v []storage.Client) {
// loadInflight restores inflight messages from the datastore.
func (s *Server) loadInflight(v []storage.Message) {
for _, msg := range v {
if client, ok := s.Clients.Get(msg.Origin); ok {
if client, ok := s.Clients.Get(msg.Client); ok {
client.State.Inflight.Set(msg.ToPacket())
}
}

View File

@@ -667,6 +667,7 @@ func TestEstablishConnectionInheritExisting(t *testing.T) {
clw, ok := s.Clients.Get(packets.TPacketData[packets.Connect].Get(packets.TConnectMqtt311).Packet.Connect.ClientIdentifier)
require.True(t, ok)
require.NotEmpty(t, clw.State.Subscriptions)
require.True(t, cl.IsTakenOver())
// Prevent sequential takeover memory-bloom.
require.Empty(t, cl.State.Subscriptions.GetAll())
@@ -761,6 +762,9 @@ func TestEstablishConnectionInheritExistingTrueTakeover(t *testing.T) {
_, _ = w2.Write(packets.TPacketData[packets.Disconnect].Get(packets.TDisconnect).RawBytes)
require.NoError(t, <-o2)
require.True(t, clp1.IsTakenOver())
require.False(t, clp2.IsTakenOver())
}
func TestEstablishConnectionResentPendingInflightsError(t *testing.T) {
@@ -848,12 +852,15 @@ func TestEstablishConnectionInheritExistingClean(t *testing.T) {
require.Equal(t, packets.TPacketData[packets.Connack].Get(packets.TConnackAcceptedNoSession).RawBytes, <-recv)
require.Equal(t, packets.TPacketData[packets.Disconnect].Get(packets.TDisconnect).RawBytes, <-takeover)
require.True(t, cl.IsTakenOver())
_ = w.Close()
_ = r.Close()
clw, ok := s.Clients.Get(packets.TPacketData[packets.Connect].Get(packets.TConnectMqtt311).Packet.Connect.ClientIdentifier)
require.True(t, ok)
require.Equal(t, 0, clw.State.Subscriptions.Len())
}
func TestEstablishConnectionBadAuthentication(t *testing.T) {
@@ -3416,10 +3423,10 @@ func TestServerLoadInflightMessages(t *testing.T) {
require.Equal(t, 3, s.Clients.Len())
v := []storage.Message{
{Origin: "mochi", PacketID: 1, Payload: []byte("hello world"), TopicName: "a/b/c"},
{Origin: "mochi", PacketID: 2, Payload: []byte("yes"), TopicName: "a/b/c"},
{Origin: "zen", PacketID: 3, Payload: []byte("hello world"), TopicName: "a/b/c"},
{Origin: "mochi-co", PacketID: 4, Payload: []byte("hello world"), TopicName: "a/b/c"},
{Client: "mochi", Origin: "mochi", PacketID: 1, Payload: []byte("hello world"), TopicName: "a/b/c"},
{Client: "mochi", Origin: "mochi", PacketID: 2, Payload: []byte("yes"), TopicName: "a/b/c"},
{Client: "zen", Origin: "zen", PacketID: 3, Payload: []byte("hello world"), TopicName: "a/b/c"},
{Client: "mochi-co", Origin: "mochi-co", PacketID: 4, Payload: []byte("hello world"), TopicName: "a/b/c"},
}
s.loadInflight(v)