mirror of
https://github.com/mochi-mqtt/server.git
synced 2025-09-27 04:26:23 +08:00

* Begin adding new slog calls * Fixed unit tests * Add leveler example * Add debug log level to Redis example * Change location of server.Close() and add logs to example/hooks * Begin removing references to zerolog * Removed final references to zerolog * Change where server.Close() occurs in main * Change to 1.21 to remove x dependency * Add slog * Update references to 1.21 * Begin change of LogAttrs to standard logging interface * Change the rest of LogAttrs to default * Fix bad log * Update badger.go Changing "data" to "key" or "id" here might be more appropriate. * Update badger.go Changing "data" to "key" or "id" here might be more appropriate. * Update server.go Not checking if err is equal to nil * Update server.go printing information for ID or error is missing. * Change references of err.Error() to err in slog * Remove missed removal of Error() references for logging --------- Co-authored-by: Derek Duncan <dduncan@atlassian.com> Co-authored-by: Derek Duncan <derekduncan@gmail.com> Co-authored-by: JB <28275108+mochi-co@users.noreply.github.com> Co-authored-by: werbenhu <werben@qq.com>
93 lines
1.9 KiB
Go
93 lines
1.9 KiB
Go
// SPDX-License-Identifier: MIT
|
|
// SPDX-FileCopyrightText: 2023 mochi-mqtt, mochi-co
|
|
// SPDX-FileContributor: Jeroen Rinzema
|
|
|
|
package listeners
|
|
|
|
import (
|
|
"net"
|
|
"sync"
|
|
"sync/atomic"
|
|
|
|
"log/slog"
|
|
)
|
|
|
|
// Net is a listener for establishing client connections on basic TCP protocol.
|
|
type Net struct { // [MQTT-4.2.0-1]
|
|
mu sync.Mutex
|
|
listener net.Listener // a net.Listener which will listen for new clients
|
|
id string // the internal id of the listener
|
|
log *slog.Logger // server logger
|
|
end uint32 // ensure the close methods are only called once
|
|
}
|
|
|
|
// NewNet initialises and returns a listener serving incoming connections on the given net.Listener
|
|
func NewNet(id string, listener net.Listener) *Net {
|
|
return &Net{
|
|
id: id,
|
|
listener: listener,
|
|
}
|
|
}
|
|
|
|
// ID returns the id of the listener.
|
|
func (l *Net) ID() string {
|
|
return l.id
|
|
}
|
|
|
|
// Address returns the address of the listener.
|
|
func (l *Net) Address() string {
|
|
return l.listener.Addr().String()
|
|
}
|
|
|
|
// Protocol returns the network of the listener.
|
|
func (l *Net) Protocol() string {
|
|
return l.listener.Addr().Network()
|
|
}
|
|
|
|
// Init initializes the listener.
|
|
func (l *Net) Init(log *slog.Logger) error {
|
|
l.log = log
|
|
return nil
|
|
}
|
|
|
|
// Serve starts waiting for new TCP connections, and calls the establish
|
|
// connection callback for any received.
|
|
func (l *Net) Serve(establish EstablishFn) {
|
|
for {
|
|
if atomic.LoadUint32(&l.end) == 1 {
|
|
return
|
|
}
|
|
|
|
conn, err := l.listener.Accept()
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
if atomic.LoadUint32(&l.end) == 0 {
|
|
go func() {
|
|
err = establish(l.id, conn)
|
|
if err != nil {
|
|
l.log.Warn("", "error", err)
|
|
}
|
|
}()
|
|
}
|
|
}
|
|
}
|
|
|
|
// Close closes the listener and any client connections.
|
|
func (l *Net) Close(closeClients CloseFn) {
|
|
l.mu.Lock()
|
|
defer l.mu.Unlock()
|
|
|
|
if atomic.CompareAndSwapUint32(&l.end, 0, 1) {
|
|
closeClients(l.id)
|
|
}
|
|
|
|
if l.listener != nil {
|
|
err := l.listener.Close()
|
|
if err != nil {
|
|
return
|
|
}
|
|
}
|
|
}
|