mirror of
https://github.com/mochi-mqtt/server.git
synced 2025-11-03 02:23:49 +08:00
Fixes sync.cond deadlock
This commit is contained in:
@@ -81,12 +81,25 @@ func (b *buffer) Close() {
|
||||
atomic.StoreInt64(&b.done, 1)
|
||||
debug.Println(b.id, "[B] STORE done=1 buffer closing")
|
||||
|
||||
debug.Println(b.id, "##### [X] wcond.Locking")
|
||||
b.wcond.L.Lock()
|
||||
debug.Println(b.id, "##### [X] wcond.Locked")
|
||||
debug.Println(b.id, "##### [X] wcond.Broadcasting")
|
||||
b.wcond.Broadcast()
|
||||
debug.Println(b.id, "##### [X] wcond.Broadcasted")
|
||||
debug.Println(b.id, "##### [X] wcond.Unlocking")
|
||||
b.wcond.L.Unlock()
|
||||
debug.Println(b.id, "##### [X] wcond.Unlocked")
|
||||
|
||||
debug.Println(b.id, "##### [Y] rcond.Locking")
|
||||
b.rcond.L.Lock()
|
||||
debug.Println(b.id, "##### [Y] rcond.Locked")
|
||||
debug.Println(b.id, "##### [Y] rcond.Broadcasting")
|
||||
b.rcond.Broadcast()
|
||||
debug.Println(b.id, "##### [Y] rcond.Broadcasted")
|
||||
debug.Println(b.id, "##### [Y] rcond.Unlocking")
|
||||
b.rcond.L.Unlock()
|
||||
debug.Println(b.id, "##### [Y] rcond.Unlocked")
|
||||
|
||||
debug.Println(b.id, "[B] DONE REBROADCASTED")
|
||||
}
|
||||
@@ -118,22 +131,32 @@ func (b *buffer) awaitCapacity(n int64) (head int64, err error) {
|
||||
head = atomic.LoadInt64(&b.head)
|
||||
next := head + n
|
||||
wrapped := next - b.size
|
||||
tail := atomic.LoadInt64(&b.tail)
|
||||
//tail := atomic.LoadInt64(&b.tail)
|
||||
var tail int64
|
||||
|
||||
debug.Println(b.id, "[B] awaiting capacity (n)", n)
|
||||
debug.Println(b.id, "##### [B] rcond.Locking")
|
||||
b.rcond.L.Lock()
|
||||
for ; wrapped > tail || (tail > head && next > tail && wrapped < 0); tail = atomic.LoadInt64(&b.tail) {
|
||||
debug.Println(b.id, "##### [B] rcond.Locked")
|
||||
for tail = atomic.LoadInt64(&b.tail); wrapped > tail || (tail > head && next > tail && wrapped < 0); tail = atomic.LoadInt64(&b.tail) {
|
||||
debug.Println(b.id, "[B] iter no capacity")
|
||||
|
||||
//fmt.Println("\t", wrapped, ">", tail, wrapped > tail, "||", tail, ">", head, "&&", next, ">", tail, "&&", wrapped, "<", 0, (tail > head && next > tail && wrapped < 0))
|
||||
if atomic.LoadInt64(&b.done) == 1 {
|
||||
debug.Println(b.id, "************ [B] awaitCap caught DONE")
|
||||
b.rcond.L.Unlock() // Make sure we unlock
|
||||
return 0, io.EOF
|
||||
}
|
||||
|
||||
debug.Println(b.id, "[B] iter no capacity waiting")
|
||||
debug.Println(b.id, "##### [B] rcond.Wating")
|
||||
b.rcond.Wait()
|
||||
debug.Println(b.id, "##### [B] rcond.Waited")
|
||||
}
|
||||
debug.Println(b.id, "##### [B] rcond.Unlocked")
|
||||
b.rcond.L.Unlock()
|
||||
debug.Println(b.id, "##### [B] rcond.Unlocked")
|
||||
|
||||
debug.Println(b.id, "[B] capacity unlocked (tail)", tail)
|
||||
|
||||
return
|
||||
@@ -142,22 +165,31 @@ func (b *buffer) awaitCapacity(n int64) (head int64, err error) {
|
||||
// awaitFilled will hold until there are at least n bytes waiting between the
|
||||
// tail and head.
|
||||
func (b *buffer) awaitFilled(n int64) (tail int64, err error) {
|
||||
head := atomic.LoadInt64(&b.head)
|
||||
//head := atomic.LoadInt64(&b.head)
|
||||
var head int64
|
||||
tail = atomic.LoadInt64(&b.tail)
|
||||
|
||||
debug.Println(b.id, "[B] awaiting filled (tail, head)", tail, head)
|
||||
debug.Println(b.id, "##### [B] wcond.Locking")
|
||||
b.wcond.L.Lock()
|
||||
for ; head > tail && tail+n > head || head < tail && b.size-tail+head < n; head = atomic.LoadInt64(&b.head) {
|
||||
debug.Println(b.id, "##### [B] rcond.Locked")
|
||||
for head = atomic.LoadInt64(&b.head); head > tail && tail+n > head || head < tail && b.size-tail+head < n; head = atomic.LoadInt64(&b.head) {
|
||||
debug.Println(b.id, "[B] iter no fill")
|
||||
|
||||
if atomic.LoadInt64(&b.done) == 1 {
|
||||
debug.Println(b.id, "************ [B] awaitFilled caught DONE")
|
||||
b.wcond.L.Unlock() // Make sure we unlock
|
||||
return 0, io.EOF
|
||||
}
|
||||
|
||||
debug.Println(b.id, "[B] iter no fill waiting")
|
||||
debug.Println(b.id, "##### [B] rcond.Waiting")
|
||||
b.wcond.Wait()
|
||||
debug.Println(b.id, "##### [B] rcond.Waited")
|
||||
}
|
||||
debug.Println(b.id, "##### [B] rcond.Unlocking")
|
||||
b.wcond.L.Unlock()
|
||||
debug.Println(b.id, "##### [B] rcond.Unlocked")
|
||||
debug.Println(b.id, "[B] filled (head)", head)
|
||||
|
||||
return
|
||||
|
||||
@@ -62,15 +62,24 @@ DONE:
|
||||
n, err := r.Read(b.buf[start:end])
|
||||
total += int64(n) // incr total bytes read.
|
||||
if err != nil {
|
||||
debug.Println("*[R] r.READ error", err)
|
||||
break DONE
|
||||
}
|
||||
debug.Println(b.id, "*[R] READ (start, end, n)", start, n, b.buf[start:start+int64(n)])
|
||||
|
||||
// Move the head forward.
|
||||
debug.Println(b.id, "*[R] STORE HEAD", start+int64(n))
|
||||
atomic.StoreInt64(&b.head, start+int64(n))
|
||||
|
||||
debug.Println(b.id, "##### *[R] wcond.Locking")
|
||||
b.wcond.L.Lock()
|
||||
debug.Println(b.id, "##### *[R] wcond.Locked")
|
||||
debug.Println(b.id, "##### *[R] wcond.Broadcasting")
|
||||
b.wcond.Broadcast()
|
||||
debug.Println(b.id, "##### *[R] wcond.Broadcasted")
|
||||
debug.Println(b.id, "##### *[R] wcond.Unlocking")
|
||||
b.wcond.L.Unlock()
|
||||
debug.Println(b.id, "##### *[R] wcond.Unlocked")
|
||||
}
|
||||
|
||||
debug.Println(b.id, "*[R] FINISHED SPIN")
|
||||
@@ -80,19 +89,33 @@ DONE:
|
||||
|
||||
// Peek returns the next n bytes without advancing the reader.
|
||||
func (b *Reader) Peek(n int64) ([]byte, error) {
|
||||
debug.Println(b.id, "[R] START PEEKING (n)", n)
|
||||
|
||||
tail := atomic.LoadInt64(&b.tail)
|
||||
head := atomic.LoadInt64(&b.head)
|
||||
var head int64
|
||||
//head := atomic.LoadInt64(&b.head)
|
||||
|
||||
// Wait until there's at least 1 byte of data.
|
||||
debug.Println(b.id, "[R] PEEKING (tail, head, n)", tail, head, n)
|
||||
debug.Println(b.id, "[R] PEEKING (tail, head, n)", tail, head, n, "/", atomic.LoadInt64(&b.head))
|
||||
|
||||
debug.Println(b.id, "##### [R] wcond.Locking")
|
||||
b.wcond.L.Lock()
|
||||
for ; head == tail; head = atomic.LoadInt64(&b.head) {
|
||||
debug.Println(b.id, "##### [R] wcond.Locked")
|
||||
for head = atomic.LoadInt64(&b.head); head == atomic.LoadInt64(&b.tail); head = atomic.LoadInt64(&b.head) {
|
||||
debug.Println(b.id, "[R] PEEKING insufficient (tail, head)", tail, head)
|
||||
if atomic.LoadInt64(&b.done) == 1 {
|
||||
debug.Println(b.id, "************ [R] PEEK caught DONE")
|
||||
b.wcond.L.Unlock() // make sure we unlock
|
||||
return nil, io.EOF
|
||||
}
|
||||
debug.Println(b.id, "##### [R] wcond.Waiting")
|
||||
b.wcond.Wait()
|
||||
debug.Println(b.id, "##### [R] wcond.Waited")
|
||||
|
||||
}
|
||||
debug.Println(b.id, "##### [R] wcond.Unlocking")
|
||||
b.wcond.L.Unlock()
|
||||
debug.Println(b.id, "##### [R] wcond.Unlocked")
|
||||
debug.Println(b.id, "[R] PEEKING available")
|
||||
|
||||
// Figure out if we can get all n bytes.
|
||||
|
||||
@@ -23,17 +23,20 @@ func NewWriter(size, block int64) *Writer {
|
||||
|
||||
// WriteTo writes the contents of the buffer to an io.Writer.
|
||||
func (b *Writer) WriteTo(w io.Writer) (total int64, err error) {
|
||||
debug.Println(b.id, "*[R] STARTING SPIN")
|
||||
var p []byte
|
||||
var n int
|
||||
DONE:
|
||||
for {
|
||||
cd := b.capDelta(atomic.LoadInt64(&b.tail), atomic.LoadInt64(&b.head))
|
||||
debug.Println(b.id, "SPIN (tail, head, delta)", b.tail, b.head, cd)
|
||||
debug.Println(b.id, "* [W] SPIN (tail, head, delta)", b.tail, b.head, cd)
|
||||
if atomic.LoadInt64(&b.done) == 1 {
|
||||
if cd == 0 {
|
||||
err = io.EOF
|
||||
return total, err
|
||||
debug.Println(b.id, "************ *[W] caught DONE")
|
||||
break DONE
|
||||
} else {
|
||||
debug.Println("[W] //// capDelta not reached", cd)
|
||||
debug.Println("*[W] //// capDelta not reached", cd)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -41,31 +44,40 @@ func (b *Writer) WriteTo(w io.Writer) (total int64, err error) {
|
||||
// of the Reader type.
|
||||
p, err = (*Reader)(b).Peek(b.block)
|
||||
if err != nil {
|
||||
debug.Println(b.id, "[W] writeTo peek err (p, err)", p, err)
|
||||
debug.Println(b.id, "*[W] writeTo peek err (p, err)", p, err)
|
||||
continue
|
||||
//break DONE
|
||||
}
|
||||
debug.Println(b.id, "[W] PEEKED OK (p)", p)
|
||||
debug.Println(b.id, "*[W] PEEKED OK (p)", p)
|
||||
|
||||
// Write the peeked bytes to the io.Writer.
|
||||
n, err = w.Write(p)
|
||||
total += int64(n)
|
||||
if err != nil {
|
||||
return total, err
|
||||
debug.Println(b.id, "*[W] io READ err", err)
|
||||
break DONE
|
||||
}
|
||||
|
||||
debug.Println(b.id, "[W] SENT (n, p)", n, p)
|
||||
debug.Println(b.id, "*[W] SENT (n, p)", n, p)
|
||||
|
||||
// Move the tail forward the bytes written.
|
||||
end := (atomic.LoadInt64(&b.tail) + int64(n)) % b.size
|
||||
debug.Println(b.id, "[W] STORE TAIL", end)
|
||||
debug.Println(b.id, "*[W] STORE TAIL", end)
|
||||
atomic.StoreInt64(&b.tail, end)
|
||||
|
||||
debug.Println(b.id, "##### *[W] rcond.Locking")
|
||||
b.rcond.L.Lock()
|
||||
debug.Println(b.id, "##### *[W] rcond.Locked")
|
||||
debug.Println(b.id, "##### *[W] rcond.Broadcasting")
|
||||
b.rcond.Broadcast()
|
||||
debug.Println(b.id, "##### *[W] rcond.Broadcasted")
|
||||
debug.Println(b.id, "##### *[W] rcond.Unlocking")
|
||||
b.rcond.L.Unlock()
|
||||
debug.Println(b.id, "[W] writeTo unlocked")
|
||||
debug.Println(b.id, "##### *[W] rcond.Unlocked")
|
||||
}
|
||||
|
||||
debug.Println(b.id, "*[W] FINISHED SPIN")
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
@@ -90,9 +102,15 @@ func (b *Writer) Write(p []byte) (nn int, err error) {
|
||||
atomic.StoreInt64(&b.head, next)
|
||||
debug.Println(b.id, "[W] STORE HEAD", next)
|
||||
|
||||
debug.Println(b.id, "##### [W] wcond.Locking")
|
||||
b.wcond.L.Lock()
|
||||
debug.Println(b.id, "##### [W] rcond.Locked")
|
||||
debug.Println(b.id, "##### [W] rcond.Broadcasting")
|
||||
b.wcond.Broadcast()
|
||||
debug.Println(b.id, "##### [W] rcond.Broadcasted")
|
||||
debug.Println(b.id, "##### [W] rcond.Unlocking")
|
||||
b.wcond.L.Unlock()
|
||||
debug.Println(b.id, "##### [W] rcond.Unlocked")
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
@@ -185,6 +185,8 @@ func BenchmarkServerClose(b *testing.B) {
|
||||
*/
|
||||
|
||||
func TestServerEstablishConnectionOKCleanSession(t *testing.T) {
|
||||
for x := 0; x < 10000; x++ {
|
||||
fmt.Println("===========================", x)
|
||||
s := New()
|
||||
r, w := net.Pipe()
|
||||
o := make(chan error)
|
||||
@@ -233,6 +235,7 @@ func TestServerEstablishConnectionOKCleanSession(t *testing.T) {
|
||||
fmt.Println()
|
||||
|
||||
w.Close()
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
|
||||
66
processor.go
66
processor.go
@@ -24,10 +24,13 @@ type Processor struct {
|
||||
W *circ.Writer
|
||||
|
||||
// started tracks the goroutines which have been started.
|
||||
started sync.WaitGroup
|
||||
started *sync.WaitGroup
|
||||
|
||||
// ended tracks the goroutines which have ended.
|
||||
ended sync.WaitGroup
|
||||
// endedW tracks when the writer has ended.
|
||||
endedW *sync.WaitGroup
|
||||
|
||||
// endedR tracks when the reader has ended.
|
||||
endedR *sync.WaitGroup
|
||||
|
||||
// FixedHeader is the FixedHeader from the last read packet.
|
||||
FixedHeader packets.FixedHeader
|
||||
@@ -39,52 +42,65 @@ func NewProcessor(c net.Conn, r *circ.Reader, w *circ.Writer) *Processor {
|
||||
Conn: c,
|
||||
R: r,
|
||||
W: w,
|
||||
started: new(sync.WaitGroup),
|
||||
endedW: new(sync.WaitGroup),
|
||||
endedR: new(sync.WaitGroup),
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// Start spins up the reader and writer goroutines.
|
||||
func (p *Processor) Start() {
|
||||
go func() {
|
||||
defer p.ended.Done()
|
||||
fmt.Println("starting readFrom", p.Conn)
|
||||
p.started.Done()
|
||||
n, err := p.R.ReadFrom(p.Conn)
|
||||
if err != nil {
|
||||
//
|
||||
}
|
||||
fmt.Println(">>> finished ReadFrom", n, err)
|
||||
}()
|
||||
p.started.Add(2)
|
||||
fmt.Println("\t\tWG Started ADD", 2)
|
||||
|
||||
go func() {
|
||||
defer p.ended.Done()
|
||||
fmt.Println("starting writeTo", p.Conn)
|
||||
go func(start, end *sync.WaitGroup) {
|
||||
defer p.endedW.Done()
|
||||
p.started.Done()
|
||||
|
||||
fmt.Println("starting writeTo", p.Conn)
|
||||
n, err := p.W.WriteTo(p.Conn)
|
||||
if err != nil {
|
||||
//
|
||||
// ...
|
||||
}
|
||||
|
||||
fmt.Println(">>> finished WriteTo", n, err)
|
||||
}()
|
||||
}(p.started, p.endedW)
|
||||
p.endedW.Add(1)
|
||||
|
||||
p.started.Add(2)
|
||||
p.ended.Add(2)
|
||||
go func(start, end *sync.WaitGroup) {
|
||||
defer p.endedR.Done()
|
||||
p.started.Done()
|
||||
|
||||
fmt.Println("starting readFrom", p.Conn)
|
||||
n, err := p.R.ReadFrom(p.Conn)
|
||||
if err != nil {
|
||||
// ...
|
||||
}
|
||||
|
||||
//
|
||||
fmt.Println(">>> finished ReadFrom", n, err)
|
||||
}(p.started, p.endedR)
|
||||
p.endedR.Add(1)
|
||||
|
||||
fmt.Println("\t\tWG Started Waiting")
|
||||
p.started.Wait()
|
||||
fmt.Println("Started OK")
|
||||
}
|
||||
|
||||
// Stop stops the processor goroutines.
|
||||
func (p *Processor) Stop() {
|
||||
fmt.Println("processor stop")
|
||||
|
||||
p.W.Close()
|
||||
p.endedW.Wait()
|
||||
|
||||
if p.Conn != nil {
|
||||
p.Conn.Close()
|
||||
} else {
|
||||
fmt.Println("--// Conn is nil")
|
||||
}
|
||||
|
||||
p.R.Close()
|
||||
|
||||
p.ended.Wait()
|
||||
|
||||
p.endedR.Wait()
|
||||
}
|
||||
|
||||
// RefreshDeadline refreshes the read/write deadline for the net.Conn connection.
|
||||
|
||||
Reference in New Issue
Block a user