diff --git a/circ/buffer.go b/circ/buffer.go index 5c8db2a..daa96fe 100644 --- a/circ/buffer.go +++ b/circ/buffer.go @@ -81,12 +81,25 @@ func (b *buffer) Close() { atomic.StoreInt64(&b.done, 1) debug.Println(b.id, "[B] STORE done=1 buffer closing") + debug.Println(b.id, "##### [X] wcond.Locking") b.wcond.L.Lock() + debug.Println(b.id, "##### [X] wcond.Locked") + debug.Println(b.id, "##### [X] wcond.Broadcasting") b.wcond.Broadcast() + debug.Println(b.id, "##### [X] wcond.Broadcasted") + debug.Println(b.id, "##### [X] wcond.Unlocking") b.wcond.L.Unlock() + debug.Println(b.id, "##### [X] wcond.Unlocked") + + debug.Println(b.id, "##### [Y] rcond.Locking") b.rcond.L.Lock() + debug.Println(b.id, "##### [Y] rcond.Locked") + debug.Println(b.id, "##### [Y] rcond.Broadcasting") b.rcond.Broadcast() + debug.Println(b.id, "##### [Y] rcond.Broadcasted") + debug.Println(b.id, "##### [Y] rcond.Unlocking") b.rcond.L.Unlock() + debug.Println(b.id, "##### [Y] rcond.Unlocked") debug.Println(b.id, "[B] DONE REBROADCASTED") } @@ -118,22 +131,32 @@ func (b *buffer) awaitCapacity(n int64) (head int64, err error) { head = atomic.LoadInt64(&b.head) next := head + n wrapped := next - b.size - tail := atomic.LoadInt64(&b.tail) + //tail := atomic.LoadInt64(&b.tail) + var tail int64 debug.Println(b.id, "[B] awaiting capacity (n)", n) + debug.Println(b.id, "##### [B] rcond.Locking") b.rcond.L.Lock() - for ; wrapped > tail || (tail > head && next > tail && wrapped < 0); tail = atomic.LoadInt64(&b.tail) { + debug.Println(b.id, "##### [B] rcond.Locked") + for tail = atomic.LoadInt64(&b.tail); wrapped > tail || (tail > head && next > tail && wrapped < 0); tail = atomic.LoadInt64(&b.tail) { debug.Println(b.id, "[B] iter no capacity") //fmt.Println("\t", wrapped, ">", tail, wrapped > tail, "||", tail, ">", head, "&&", next, ">", tail, "&&", wrapped, "<", 0, (tail > head && next > tail && wrapped < 0)) if atomic.LoadInt64(&b.done) == 1 { + debug.Println(b.id, "************ [B] awaitCap caught DONE") + b.rcond.L.Unlock() // Make sure we unlock return 0, io.EOF } debug.Println(b.id, "[B] iter no capacity waiting") + debug.Println(b.id, "##### [B] rcond.Wating") b.rcond.Wait() + debug.Println(b.id, "##### [B] rcond.Waited") } + debug.Println(b.id, "##### [B] rcond.Unlocked") b.rcond.L.Unlock() + debug.Println(b.id, "##### [B] rcond.Unlocked") + debug.Println(b.id, "[B] capacity unlocked (tail)", tail) return @@ -142,22 +165,31 @@ func (b *buffer) awaitCapacity(n int64) (head int64, err error) { // awaitFilled will hold until there are at least n bytes waiting between the // tail and head. func (b *buffer) awaitFilled(n int64) (tail int64, err error) { - head := atomic.LoadInt64(&b.head) + //head := atomic.LoadInt64(&b.head) + var head int64 tail = atomic.LoadInt64(&b.tail) debug.Println(b.id, "[B] awaiting filled (tail, head)", tail, head) + debug.Println(b.id, "##### [B] wcond.Locking") b.wcond.L.Lock() - for ; head > tail && tail+n > head || head < tail && b.size-tail+head < n; head = atomic.LoadInt64(&b.head) { + debug.Println(b.id, "##### [B] rcond.Locked") + for head = atomic.LoadInt64(&b.head); head > tail && tail+n > head || head < tail && b.size-tail+head < n; head = atomic.LoadInt64(&b.head) { debug.Println(b.id, "[B] iter no fill") if atomic.LoadInt64(&b.done) == 1 { + debug.Println(b.id, "************ [B] awaitFilled caught DONE") + b.wcond.L.Unlock() // Make sure we unlock return 0, io.EOF } debug.Println(b.id, "[B] iter no fill waiting") + debug.Println(b.id, "##### [B] rcond.Waiting") b.wcond.Wait() + debug.Println(b.id, "##### [B] rcond.Waited") } + debug.Println(b.id, "##### [B] rcond.Unlocking") b.wcond.L.Unlock() + debug.Println(b.id, "##### [B] rcond.Unlocked") debug.Println(b.id, "[B] filled (head)", head) return diff --git a/circ/reader.go b/circ/reader.go index dfa80ac..ddb1789 100644 --- a/circ/reader.go +++ b/circ/reader.go @@ -62,15 +62,24 @@ DONE: n, err := r.Read(b.buf[start:end]) total += int64(n) // incr total bytes read. if err != nil { + debug.Println("*[R] r.READ error", err) break DONE } + debug.Println(b.id, "*[R] READ (start, end, n)", start, n, b.buf[start:start+int64(n)]) // Move the head forward. debug.Println(b.id, "*[R] STORE HEAD", start+int64(n)) atomic.StoreInt64(&b.head, start+int64(n)) + + debug.Println(b.id, "##### *[R] wcond.Locking") b.wcond.L.Lock() + debug.Println(b.id, "##### *[R] wcond.Locked") + debug.Println(b.id, "##### *[R] wcond.Broadcasting") b.wcond.Broadcast() + debug.Println(b.id, "##### *[R] wcond.Broadcasted") + debug.Println(b.id, "##### *[R] wcond.Unlocking") b.wcond.L.Unlock() + debug.Println(b.id, "##### *[R] wcond.Unlocked") } debug.Println(b.id, "*[R] FINISHED SPIN") @@ -80,19 +89,33 @@ DONE: // Peek returns the next n bytes without advancing the reader. func (b *Reader) Peek(n int64) ([]byte, error) { + debug.Println(b.id, "[R] START PEEKING (n)", n) + tail := atomic.LoadInt64(&b.tail) - head := atomic.LoadInt64(&b.head) + var head int64 + //head := atomic.LoadInt64(&b.head) // Wait until there's at least 1 byte of data. - debug.Println(b.id, "[R] PEEKING (tail, head, n)", tail, head, n) + debug.Println(b.id, "[R] PEEKING (tail, head, n)", tail, head, n, "/", atomic.LoadInt64(&b.head)) + + debug.Println(b.id, "##### [R] wcond.Locking") b.wcond.L.Lock() - for ; head == tail; head = atomic.LoadInt64(&b.head) { + debug.Println(b.id, "##### [R] wcond.Locked") + for head = atomic.LoadInt64(&b.head); head == atomic.LoadInt64(&b.tail); head = atomic.LoadInt64(&b.head) { + debug.Println(b.id, "[R] PEEKING insufficient (tail, head)", tail, head) if atomic.LoadInt64(&b.done) == 1 { + debug.Println(b.id, "************ [R] PEEK caught DONE") + b.wcond.L.Unlock() // make sure we unlock return nil, io.EOF } + debug.Println(b.id, "##### [R] wcond.Waiting") b.wcond.Wait() + debug.Println(b.id, "##### [R] wcond.Waited") + } + debug.Println(b.id, "##### [R] wcond.Unlocking") b.wcond.L.Unlock() + debug.Println(b.id, "##### [R] wcond.Unlocked") debug.Println(b.id, "[R] PEEKING available") // Figure out if we can get all n bytes. diff --git a/circ/writer.go b/circ/writer.go index 0e422ad..3592ed1 100644 --- a/circ/writer.go +++ b/circ/writer.go @@ -23,17 +23,20 @@ func NewWriter(size, block int64) *Writer { // WriteTo writes the contents of the buffer to an io.Writer. func (b *Writer) WriteTo(w io.Writer) (total int64, err error) { + debug.Println(b.id, "*[R] STARTING SPIN") var p []byte var n int +DONE: for { cd := b.capDelta(atomic.LoadInt64(&b.tail), atomic.LoadInt64(&b.head)) - debug.Println(b.id, "SPIN (tail, head, delta)", b.tail, b.head, cd) + debug.Println(b.id, "* [W] SPIN (tail, head, delta)", b.tail, b.head, cd) if atomic.LoadInt64(&b.done) == 1 { if cd == 0 { err = io.EOF - return total, err + debug.Println(b.id, "************ *[W] caught DONE") + break DONE } else { - debug.Println("[W] //// capDelta not reached", cd) + debug.Println("*[W] //// capDelta not reached", cd) } } @@ -41,31 +44,40 @@ func (b *Writer) WriteTo(w io.Writer) (total int64, err error) { // of the Reader type. p, err = (*Reader)(b).Peek(b.block) if err != nil { - debug.Println(b.id, "[W] writeTo peek err (p, err)", p, err) + debug.Println(b.id, "*[W] writeTo peek err (p, err)", p, err) continue //break DONE } - debug.Println(b.id, "[W] PEEKED OK (p)", p) + debug.Println(b.id, "*[W] PEEKED OK (p)", p) // Write the peeked bytes to the io.Writer. n, err = w.Write(p) total += int64(n) if err != nil { - return total, err + debug.Println(b.id, "*[W] io READ err", err) + break DONE } - debug.Println(b.id, "[W] SENT (n, p)", n, p) + debug.Println(b.id, "*[W] SENT (n, p)", n, p) // Move the tail forward the bytes written. end := (atomic.LoadInt64(&b.tail) + int64(n)) % b.size - debug.Println(b.id, "[W] STORE TAIL", end) + debug.Println(b.id, "*[W] STORE TAIL", end) atomic.StoreInt64(&b.tail, end) + + debug.Println(b.id, "##### *[W] rcond.Locking") b.rcond.L.Lock() + debug.Println(b.id, "##### *[W] rcond.Locked") + debug.Println(b.id, "##### *[W] rcond.Broadcasting") b.rcond.Broadcast() + debug.Println(b.id, "##### *[W] rcond.Broadcasted") + debug.Println(b.id, "##### *[W] rcond.Unlocking") b.rcond.L.Unlock() - debug.Println(b.id, "[W] writeTo unlocked") + debug.Println(b.id, "##### *[W] rcond.Unlocked") } + debug.Println(b.id, "*[W] FINISHED SPIN") + return } @@ -90,9 +102,15 @@ func (b *Writer) Write(p []byte) (nn int, err error) { atomic.StoreInt64(&b.head, next) debug.Println(b.id, "[W] STORE HEAD", next) + debug.Println(b.id, "##### [W] wcond.Locking") b.wcond.L.Lock() + debug.Println(b.id, "##### [W] rcond.Locked") + debug.Println(b.id, "##### [W] rcond.Broadcasting") b.wcond.Broadcast() + debug.Println(b.id, "##### [W] rcond.Broadcasted") + debug.Println(b.id, "##### [W] rcond.Unlocking") b.wcond.L.Unlock() + debug.Println(b.id, "##### [W] rcond.Unlocked") return } diff --git a/mqtt_test.go b/mqtt_test.go index 6b0ee72..4bc413b 100644 --- a/mqtt_test.go +++ b/mqtt_test.go @@ -185,54 +185,57 @@ func BenchmarkServerClose(b *testing.B) { */ func TestServerEstablishConnectionOKCleanSession(t *testing.T) { - s := New() - r, w := net.Pipe() - o := make(chan error) - go func() { - o <- s.EstablishConnection("tcp", r, new(auth.Allow)) - }() + for x := 0; x < 10000; x++ { + fmt.Println("===========================", x) + s := New() + r, w := net.Pipe() + o := make(chan error) + go func() { + o <- s.EstablishConnection("tcp", r, new(auth.Allow)) + }() - go func() { - w.Write([]byte{ - byte(packets.Connect << 4), 15, // Fixed header - 0, 4, // Protocol Name - MSB+LSB - 'M', 'Q', 'T', 'T', // Protocol Name - 4, // Protocol Version - 2, // Packet Flags - clean session - 0, 45, // Keepalive - 0, 3, // Client ID - MSB+LSB - 'z', 'e', 'n', // Client ID "zen" - }) - w.Write([]byte{byte(packets.Disconnect << 4), 0}) - }() + go func() { + w.Write([]byte{ + byte(packets.Connect << 4), 15, // Fixed header + 0, 4, // Protocol Name - MSB+LSB + 'M', 'Q', 'T', 'T', // Protocol Name + 4, // Protocol Version + 2, // Packet Flags - clean session + 0, 45, // Keepalive + 0, 3, // Client ID - MSB+LSB + 'z', 'e', 'n', // Client ID "zen" + }) + w.Write([]byte{byte(packets.Disconnect << 4), 0}) + }() - recv := make(chan []byte) - go func() { - buf, err := ioutil.ReadAll(w) - if err != nil { - panic(err) + recv := make(chan []byte) + go func() { + buf, err := ioutil.ReadAll(w) + if err != nil { + panic(err) + } + recv <- buf + }() + + //time.Sleep(10 * time.Millisecond) + s.clients.Lock() + for _, v := range s.clients.internal { + v.close() + break } - recv <- buf - }() + s.clients.Unlock() - //time.Sleep(10 * time.Millisecond) - s.clients.Lock() - for _, v := range s.clients.internal { - v.close() - break + errx := <-o + require.NoError(t, errx) + require.Equal(t, []byte{ + byte(packets.Connack << 4), 2, + 0, packets.Accepted, + }, <-recv) + + fmt.Println() + + w.Close() } - s.clients.Unlock() - - errx := <-o - require.NoError(t, errx) - require.Equal(t, []byte{ - byte(packets.Connack << 4), 2, - 0, packets.Accepted, - }, <-recv) - - fmt.Println() - - w.Close() } /* diff --git a/processor.go b/processor.go index 2b179e0..2839626 100644 --- a/processor.go +++ b/processor.go @@ -24,10 +24,13 @@ type Processor struct { W *circ.Writer // started tracks the goroutines which have been started. - started sync.WaitGroup + started *sync.WaitGroup - // ended tracks the goroutines which have ended. - ended sync.WaitGroup + // endedW tracks when the writer has ended. + endedW *sync.WaitGroup + + // endedR tracks when the reader has ended. + endedR *sync.WaitGroup // FixedHeader is the FixedHeader from the last read packet. FixedHeader packets.FixedHeader @@ -36,55 +39,68 @@ type Processor struct { // NewProcessor returns a new instance of Processor. func NewProcessor(c net.Conn, r *circ.Reader, w *circ.Writer) *Processor { return &Processor{ - Conn: c, - R: r, - W: w, + Conn: c, + R: r, + W: w, + started: new(sync.WaitGroup), + endedW: new(sync.WaitGroup), + endedR: new(sync.WaitGroup), } } // Start spins up the reader and writer goroutines. func (p *Processor) Start() { - go func() { - defer p.ended.Done() - fmt.Println("starting readFrom", p.Conn) - p.started.Done() - n, err := p.R.ReadFrom(p.Conn) - if err != nil { - // - } - fmt.Println(">>> finished ReadFrom", n, err) - }() + p.started.Add(2) + fmt.Println("\t\tWG Started ADD", 2) - go func() { - defer p.ended.Done() - fmt.Println("starting writeTo", p.Conn) + go func(start, end *sync.WaitGroup) { + defer p.endedW.Done() p.started.Done() + + fmt.Println("starting writeTo", p.Conn) n, err := p.W.WriteTo(p.Conn) if err != nil { - // + // ... } fmt.Println(">>> finished WriteTo", n, err) - }() + }(p.started, p.endedW) + p.endedW.Add(1) - p.started.Add(2) - p.ended.Add(2) + go func(start, end *sync.WaitGroup) { + defer p.endedR.Done() + p.started.Done() + + fmt.Println("starting readFrom", p.Conn) + n, err := p.R.ReadFrom(p.Conn) + if err != nil { + // ... + } + + // + fmt.Println(">>> finished ReadFrom", n, err) + }(p.started, p.endedR) + p.endedR.Add(1) + + fmt.Println("\t\tWG Started Waiting") p.started.Wait() + fmt.Println("Started OK") } // Stop stops the processor goroutines. func (p *Processor) Stop() { - fmt.Println("processor stop") - p.W.Close() + p.endedW.Wait() + if p.Conn != nil { p.Conn.Close() + } else { + fmt.Println("--// Conn is nil") } + p.R.Close() - - p.ended.Wait() - + p.endedR.Wait() } // RefreshDeadline refreshes the read/write deadline for the net.Conn connection.