Move cl.WriteLoop() to attachClient() and call cl.Stop() in loadClients() to update client.State. (#344)

* Moving go cl.WriteLoop() out of NewClient() and placing it in server.attachClient().

* Call cl.Stop() to cancel the context, update cl.State with information such as disconnected time, and set the stopCause.

* update README-CN.md

---------

Co-authored-by: JB <28275108+mochi-co@users.noreply.github.com>
This commit is contained in:
werben
2023-12-22 08:34:29 +08:00
committed by GitHub
parent 5523d15a9b
commit b2ab984949
2 changed files with 6 additions and 3 deletions

View File

@@ -200,7 +200,7 @@ server := mqtt.New(&mqtt.Options{
| 数据持久性 | [mochi-mqtt/server/hooks/storage/redis](hooks/storage/redis/redis.go) | 使用 [Redis](https://redis.io) 进行持久性存储。 |
| 调试跟踪 | [mochi-mqtt/server/hooks/debug](hooks/debug/debug.go) | 调试输出以查看数据包在服务端的链路追踪。 |
许多内部函数都已开放给开发者你可以参考上述示例创建自己的Hook钩子。如果你有更好的关于Hook钩子方面的建议或者疑问你可以[提交问题](https://github.com/mochi-mqtt/server/issues)给我们。 |
许多内部函数都已开放给开发者你可以参考上述示例创建自己的Hook钩子。如果你有更好的关于Hook钩子方面的建议或者疑问你可以[提交问题](https://github.com/mochi-mqtt/server/issues)给我们。
### 访问控制(Access Control)

View File

@@ -233,8 +233,6 @@ func (s *Server) NewClient(c net.Conn, listener string, id string, inline bool)
// By default, we don't want to restrict developer publishes,
// but if you do, reset this after creating inline client.
cl.State.Inflight.ResetReceiveQuota(math.MaxInt32)
} else {
go cl.WriteLoop() // can only write to real clients
}
return cl
@@ -332,6 +330,8 @@ func (s *Server) EstablishConnection(listener string, c net.Conn) error {
func (s *Server) attachClient(cl *Client, listener string) error {
defer s.Listeners.ClientsWg.Done()
s.Listeners.ClientsWg.Add(1)
go cl.WriteLoop()
defer cl.Stop(nil)
pk, err := s.readConnectionPacket(cl)
@@ -1554,6 +1554,9 @@ func (s *Server) loadClients(v []storage.Client) {
}
cl.Properties.Will = Will(c.Will)
// cancel the context, update cl.State such as disconnected time and stopCause.
cl.Stop(packets.ErrServerShuttingDown)
expire := (cl.Properties.ProtocolVersion == 5 && cl.Properties.Props.SessionExpiryInterval == 0) || (cl.Properties.ProtocolVersion < 5 && cl.Properties.Clean)
s.hooks.OnDisconnect(cl, packets.ErrServerShuttingDown, expire)
if expire {