Files
mochi-mqtt/listeners/net.go
Derek Duncan 44bac0adc5 Migrate from zerolog to slog (#248)
* Begin adding new slog calls

* Fixed unit tests

* Add leveler example

* Add debug log level to Redis example

* Change location of server.Close() and add logs to example/hooks

* Begin removing references to zerolog

* Removed final references to zerolog

* Change where server.Close() occurs in main

* Change to 1.21 to remove x dependency

* Add slog

* Update references to 1.21

* Begin change of LogAttrs to standard logging interface

* Change the rest of LogAttrs to default

* Fix bad log

* Update badger.go

Changing "data" to "key" or "id" here might be more appropriate.

* Update badger.go

Changing "data" to "key" or "id" here might be more appropriate.

* Update server.go

Not checking if err is equal to nil

* Update server.go

printing information for ID or error is missing.

* Change references of err.Error() to err in slog

* Remove missed removal of Error() references for logging

---------

Co-authored-by: Derek Duncan <dduncan@atlassian.com>
Co-authored-by: Derek Duncan <derekduncan@gmail.com>
Co-authored-by: JB <28275108+mochi-co@users.noreply.github.com>
Co-authored-by: werbenhu <werben@qq.com>
2023-09-06 15:21:04 +01:00

93 lines
1.9 KiB
Go

// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2023 mochi-mqtt, mochi-co
// SPDX-FileContributor: Jeroen Rinzema
package listeners
import (
"net"
"sync"
"sync/atomic"
"log/slog"
)
// Net is a listener for establishing client connections on basic TCP protocol.
type Net struct { // [MQTT-4.2.0-1]
mu sync.Mutex
listener net.Listener // a net.Listener which will listen for new clients
id string // the internal id of the listener
log *slog.Logger // server logger
end uint32 // ensure the close methods are only called once
}
// NewNet initialises and returns a listener serving incoming connections on the given net.Listener
func NewNet(id string, listener net.Listener) *Net {
return &Net{
id: id,
listener: listener,
}
}
// ID returns the id of the listener.
func (l *Net) ID() string {
return l.id
}
// Address returns the address of the listener.
func (l *Net) Address() string {
return l.listener.Addr().String()
}
// Protocol returns the network of the listener.
func (l *Net) Protocol() string {
return l.listener.Addr().Network()
}
// Init initializes the listener.
func (l *Net) Init(log *slog.Logger) error {
l.log = log
return nil
}
// Serve starts waiting for new TCP connections, and calls the establish
// connection callback for any received.
func (l *Net) Serve(establish EstablishFn) {
for {
if atomic.LoadUint32(&l.end) == 1 {
return
}
conn, err := l.listener.Accept()
if err != nil {
return
}
if atomic.LoadUint32(&l.end) == 0 {
go func() {
err = establish(l.id, conn)
if err != nil {
l.log.Warn("", "error", err)
}
}()
}
}
}
// Close closes the listener and any client connections.
func (l *Net) Close(closeClients CloseFn) {
l.mu.Lock()
defer l.mu.Unlock()
if atomic.CompareAndSwapUint32(&l.end, 0, 1) {
closeClients(l.id)
}
if l.listener != nil {
err := l.listener.Close()
if err != nil {
return
}
}
}