mirror of
https://github.com/mochi-mqtt/server.git
synced 2025-09-26 20:21:12 +08:00
Refactor Listener WG to track clients (#301)
This commit is contained in:
@@ -38,8 +38,8 @@ type Listener interface {
|
||||
|
||||
// Listeners contains the network listeners for the broker.
|
||||
type Listeners struct {
|
||||
wg sync.WaitGroup // a waitgroup that waits for all listeners to finish.
|
||||
internal map[string]Listener // a map of active listeners.
|
||||
ClientsWg sync.WaitGroup // a waitgroup that waits for all clients in all listeners to finish.
|
||||
internal map[string]Listener // a map of active listeners.
|
||||
sync.RWMutex
|
||||
}
|
||||
|
||||
@@ -86,8 +86,6 @@ func (l *Listeners) Serve(id string, establisher EstablishFn) {
|
||||
listener := l.internal[id]
|
||||
|
||||
go func(e EstablishFn) {
|
||||
defer l.wg.Done()
|
||||
l.wg.Add(1)
|
||||
listener.Serve(e)
|
||||
}(establisher)
|
||||
}
|
||||
@@ -131,5 +129,5 @@ func (l *Listeners) CloseAll(closer CloseFn) {
|
||||
for _, id := range ids {
|
||||
l.Close(id, closer)
|
||||
}
|
||||
l.wg.Wait()
|
||||
l.ClientsWg.Wait()
|
||||
}
|
||||
|
@@ -330,7 +330,10 @@ func (s *Server) EstablishConnection(listener string, c net.Conn) error {
|
||||
// attachClient validates an incoming client connection and if viable, attaches the client
|
||||
// to the server, performs session housekeeping, and reads incoming packets.
|
||||
func (s *Server) attachClient(cl *Client, listener string) error {
|
||||
defer s.Listeners.ClientsWg.Done()
|
||||
s.Listeners.ClientsWg.Add(1)
|
||||
defer cl.Stop(nil)
|
||||
|
||||
pk, err := s.readConnectionPacket(cl)
|
||||
if err != nil {
|
||||
return fmt.Errorf("read connection: %w", err)
|
||||
@@ -1384,6 +1387,7 @@ func (s *Server) publishSysTopics() {
|
||||
// Close attempts to gracefully shut down the server, all listeners, clients, and stores.
|
||||
func (s *Server) Close() error {
|
||||
close(s.done)
|
||||
s.Log.Info("gracefully stopping server")
|
||||
s.Listeners.CloseAll(s.closeListenerClients)
|
||||
s.hooks.OnStopped()
|
||||
s.hooks.Stop()
|
||||
|
Reference in New Issue
Block a user