From b2ab9849492ebf8c2f7e536a93d31db724df5003 Mon Sep 17 00:00:00 2001 From: werben Date: Fri, 22 Dec 2023 08:34:29 +0800 Subject: [PATCH] Move cl.WriteLoop() to attachClient() and call cl.Stop() in loadClients() to update client.State. (#344) * Moving go cl.WriteLoop() out of NewClient() and placing it in server.attachClient(). * Call cl.Stop() to cancel the context, update cl.State with information such as disconnected time, and set the stopCause. * update README-CN.md --------- Co-authored-by: JB <28275108+mochi-co@users.noreply.github.com> --- README-CN.md | 2 +- server.go | 7 +++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/README-CN.md b/README-CN.md index 7b4b54b..76807a8 100644 --- a/README-CN.md +++ b/README-CN.md @@ -200,7 +200,7 @@ server := mqtt.New(&mqtt.Options{ | 数据持久性 | [mochi-mqtt/server/hooks/storage/redis](hooks/storage/redis/redis.go) | 使用 [Redis](https://redis.io) 进行持久性存储。 | | 调试跟踪 | [mochi-mqtt/server/hooks/debug](hooks/debug/debug.go) | 调试输出以查看数据包在服务端的链路追踪。 | -许多内部函数都已开放给开发者,你可以参考上述示例创建自己的Hook钩子。如果你有更好的关于Hook钩子方面的建议或者疑问,你可以[提交问题](https://github.com/mochi-mqtt/server/issues)给我们。 | +许多内部函数都已开放给开发者,你可以参考上述示例创建自己的Hook钩子。如果你有更好的关于Hook钩子方面的建议或者疑问,你可以[提交问题](https://github.com/mochi-mqtt/server/issues)给我们。 ### 访问控制(Access Control) diff --git a/server.go b/server.go index e94950f..33b3ef2 100644 --- a/server.go +++ b/server.go @@ -233,8 +233,6 @@ func (s *Server) NewClient(c net.Conn, listener string, id string, inline bool) // By default, we don't want to restrict developer publishes, // but if you do, reset this after creating inline client. cl.State.Inflight.ResetReceiveQuota(math.MaxInt32) - } else { - go cl.WriteLoop() // can only write to real clients } return cl @@ -332,6 +330,8 @@ func (s *Server) EstablishConnection(listener string, c net.Conn) error { func (s *Server) attachClient(cl *Client, listener string) error { defer s.Listeners.ClientsWg.Done() s.Listeners.ClientsWg.Add(1) + + go cl.WriteLoop() defer cl.Stop(nil) pk, err := s.readConnectionPacket(cl) @@ -1554,6 +1554,9 @@ func (s *Server) loadClients(v []storage.Client) { } cl.Properties.Will = Will(c.Will) + // cancel the context, update cl.State such as disconnected time and stopCause. + cl.Stop(packets.ErrServerShuttingDown) + expire := (cl.Properties.ProtocolVersion == 5 && cl.Properties.Props.SessionExpiryInterval == 0) || (cl.Properties.ProtocolVersion < 5 && cl.Properties.Clean) s.hooks.OnDisconnect(cl, packets.ErrServerShuttingDown, expire) if expire {