All working

This commit is contained in:
Mochi
2019-10-08 20:42:26 +01:00
parent 2a00e70b02
commit 2fadf08246
5 changed files with 87 additions and 65 deletions

View File

@@ -26,7 +26,7 @@ func main() {
fmt.Println(aurora.Magenta("Mochi MQTT Broker initializing..."))
server := mqtt.New()
tcp := listeners.NewTCP("t1", ":1882")
tcp := listeners.NewTCP("t1", ":1883")
log.Println(tcp)
err := server.AddListener(tcp, nil)
if err != nil {

67
mqtt.go
View File

@@ -4,7 +4,9 @@ import (
"bufio"
"bytes"
"errors"
"log"
"net"
"sync"
"time"
//"github.com/davecgh/go-spew/spew"
@@ -51,6 +53,25 @@ type Server struct {
// topics is an index of topic subscriptions and retained messages.
topics topics.Indexer
// inbound is a small worker pool which processes incoming packets.
inbound chan transitMsg
// outbount is a small worker pool which processes incoming packets.
outbount chan transitMsg
//inboundPool is a waitgroup for the inbound workers.
inboundPool sync.WaitGroup
}
// transitMsg contains data to be sent to the inbound channel.
type transitMsg struct {
// client is the client who received or will receive the message.
client *client
// packet is the packet sent or received.
packet packets.Packet
}
// New returns a pointer to a new instance of the MQTT broker.
@@ -59,9 +80,37 @@ func New() *Server {
listeners: listeners.NewListeners(),
clients: newClients(),
topics: trie.New(),
inbound: make(chan transitMsg),
}
}
func (s *Server) StartProcessing() {
var workers = 8
s.inboundPool = sync.WaitGroup{}
s.inboundPool.Add(workers)
for i := 0; i < workers; i++ {
log.Println("spawning worker", i)
go func(wid int) {
defer s.inboundPool.Done()
for {
select {
case p, ok := <-s.inbound:
if !ok {
log.Println("worker closed", wid)
return
}
s.processPacket(p.client, p.packet)
}
}
}(i)
}
log.Println("spawned all workers")
return
}
// AddListener adds a new network listener to the server.
func (s *Server) AddListener(listener listeners.Listener, config *listeners.Config) error {
if _, ok := s.listeners.Get(listener.ID()); ok {
@@ -80,7 +129,9 @@ func (s *Server) AddListener(listener listeners.Listener, config *listeners.Conf
// Serve begins the event loops for establishing client connections on all
// attached listeners.
func (s *Server) Serve() error {
s.StartProcessing()
s.listeners.ServeAll(s.EstablishConnection)
return nil
}
@@ -133,9 +184,8 @@ func (s *Server) EstablishConnection(lid string, c net.Conn, ac auth.Controller)
// id, inherit the session.
var sessionPresent bool
if existing, ok := s.clients.get(msg.ClientIdentifier); ok {
existing.close()
existing.Lock()
existing.close()
if msg.CleanSession {
for k := range existing.subscriptions {
s.topics.Unsubscribe(k, existing.id)
@@ -242,7 +292,9 @@ DONE:
}
// Process inbound packet.
go s.processPacket(cl, pk)
s.inbound <- transitMsg{client: cl, packet: pk}
//go s.processPacket(cl, pk)
}
}
@@ -531,6 +583,8 @@ func (s *Server) processUnsubscribe(cl *client, pk *packets.UnsubscribePacket) e
// writeClient writes packets to a client connection.
func (s *Server) writeClient(cl *client, pk packets.Packet) error {
cl.p.Lock()
defer cl.p.Unlock()
// Ensure Writer is open.
if cl.p.W == nil {
@@ -565,7 +619,14 @@ func (s *Server) writeClient(cl *client, pk packets.Packet) error {
// Close attempts to gracefully shutdown the server, all listeners, and clients.
func (s *Server) Close() error {
// Close all listeners.
s.listeners.CloseAll(s.closeListenerClients)
// Close down waitgroups and pools.
close(s.inbound)
s.inboundPool.Wait()
return nil
}

View File

@@ -77,6 +77,7 @@ func TestNew(t *testing.T) {
require.NotNil(t, s)
require.NotNil(t, s.listeners)
require.NotNil(t, s.clients)
require.NotNil(t, s.inbound)
// log.Println(s)
}
@@ -86,6 +87,11 @@ func BenchmarkNew(b *testing.B) {
}
}
func TestProcessor(t *testing.T) {
s := New()
}
func TestServerAddListener(t *testing.T) {
s := New()
require.NotNil(t, s)

View File

@@ -64,8 +64,10 @@ func NewParser(c net.Conn, r *bufio.Reader, w BufWriter) *Parser {
// RefreshDeadline refreshes the read/write deadline for the net.Conn connection.
func (p *Parser) RefreshDeadline(keepalive uint16) {
if p.Conn != nil {
expiry := time.Duration(keepalive+(keepalive/2)) * time.Second
p.Conn.SetDeadline(time.Now().Add(expiry))
}
}
// ReadFixedHeader reads in the values of the next packet's fixed header.

View File

@@ -2,7 +2,6 @@ package trie
import (
"fmt"
"log"
"strings"
// "sync"
sync "github.com/sasha-s/go-deadlock"
@@ -22,7 +21,8 @@ func ReLeaf(m string, leaf *Leaf, d int) {
// Index is a prefix/trie tree containing topic subscribers and retained messages.
type Index struct {
sync.RWMutex
mu sync.RWMutex
// sync.RWMutex
Root *Leaf
}
@@ -38,11 +38,12 @@ func New() *Index {
// RetainMessage saves a message payload to the end of a topic branch.
func (x *Index) RetainMessage(msg *packets.PublishPacket) {
x.mu.Lock()
defer x.mu.Unlock()
if len(msg.Payload) > 0 {
n := x.poperate(msg.TopicName)
n.Message = msg
} else {
log.Println("unpoperating", msg.TopicName)
x.unpoperate(msg.TopicName, "", true)
}
//spew.Dump(x.Root)
@@ -50,8 +51,8 @@ func (x *Index) RetainMessage(msg *packets.PublishPacket) {
// Subscribe creates a subscription filter for a client.
func (x *Index) Subscribe(filter, client string, qos byte) {
x.Lock()
defer x.Unlock()
x.mu.Lock()
defer x.mu.Unlock()
n := x.poperate(filter)
n.Clients[client] = qos
n.Filter = filter
@@ -61,62 +62,15 @@ func (x *Index) Subscribe(filter, client string, qos byte) {
// Unsubscribe removes a subscription filter for a client. Returns true if an
// unsubscribe action sucessful.
func (x *Index) Unsubscribe(filter, client string) bool {
x.mu.Lock()
defer x.mu.Unlock()
return x.unpoperate(filter, client, false)
/*x.Lock()
defer x.Unlock()
// Walk to end leaf.
var d int
var particle string
var hasNext = true
e := x.Root
for hasNext {
particle, hasNext = isolateParticle(filter, d)
d++
e, _ = e.Leaves[particle]
// If the topic part doesn't exist in the tree, there's nothing
// left to do.
if e == nil {
return false
}
}
// Step backward removing client and orphaned leaves.
var key string
var orphaned bool
var end = true
for e.Parent != nil {
key = e.Key
// Wipe the client from this leaf if it's the filter end.
if end {
delete(e.Clients, client)
end = false
}
// If this leaf is empty, note it as orphaned.
orphaned = len(e.Clients) == 0 && len(e.Leaves) == 0 && e.Message == nil
// Traverse up the branch.
e = e.Parent
// If the leaf we just came from was empty, delete it.
if orphaned {
delete(e.Leaves, key)
}
}
return true
*/
}
// unpoperate steps backward through a trie sequence and removes any orphaned
// nodes. If a client id is specified, it will unsubscribe a client. If message
// is true, it will delete a retained message.
func (x *Index) unpoperate(filter string, client string, message bool) bool {
x.Lock()
defer x.Unlock()
// Walk to end leaf.
var d int
@@ -199,22 +153,21 @@ func (x *Index) poperate(topic string) *Leaf {
// Subscribers returns a map of clients who are subscribed to matching filters.
func (x *Index) Subscribers(topic string) topics.Subscriptions {
x.RLock()
defer x.RUnlock()
x.mu.RLock()
defer x.mu.RUnlock()
return x.Root.scanSubscribers(topic, 0, make(topics.Subscriptions))
}
// Messages returns a slice of retained topic messages which match a filter.
func (x *Index) Messages(filter string) []*packets.PublishPacket {
// ReLeaf("messages", x.Root, 0)
x.RLock()
defer x.RUnlock()
x.mu.RLock()
defer x.mu.RUnlock()
return x.Root.scanMessages(filter, 0, make([]*packets.PublishPacket, 0, 32))
}
// Leaf is a child node on the tree.
type Leaf struct {
sync.RWMutex
// Key contains the key that was used to create the leaf.
Key string