From af8c2fbd42095331a7e84ef90659ab07ef76b585 Mon Sep 17 00:00:00 2001 From: impact-eintr Date: Mon, 21 Nov 2022 14:55:24 +0800 Subject: [PATCH] base package --- go.mod | 2 +- ilist/list.go | 137 +++++++++++ rand/rand.go | 11 + sleep/commit_amd64.s | 35 +++ sleep/commit_asm.go | 20 ++ sleep/commit_noasm.go | 42 ++++ sleep/empty.s | 15 ++ sleep/sleep_test.go | 542 ++++++++++++++++++++++++++++++++++++++++++ sleep/sleep_unsafe.go | 395 ++++++++++++++++++++++++++++++ tmutex/tmutex.go | 51 ++++ tmutex/tmutex_test.go | 47 ++++ 11 files changed, 1296 insertions(+), 1 deletion(-) create mode 100644 ilist/list.go create mode 100644 rand/rand.go create mode 100644 sleep/commit_amd64.s create mode 100644 sleep/commit_asm.go create mode 100644 sleep/commit_noasm.go create mode 100644 sleep/empty.s create mode 100644 sleep/sleep_test.go create mode 100644 sleep/sleep_unsafe.go create mode 100644 tmutex/tmutex.go create mode 100644 tmutex/tmutex_test.go diff --git a/go.mod b/go.mod index 1833b1f..35363a7 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,3 @@ module tcpip -go 1.14 +go 1.18 diff --git a/ilist/list.go b/ilist/list.go new file mode 100644 index 0000000..651a9a8 --- /dev/null +++ b/ilist/list.go @@ -0,0 +1,137 @@ +package ilist + +type Linker interface { + Next() Element + Prev() Element + SetNext(Element) + SetPrev(Element) +} + +type Element interface { + Linker +} + +type ElementMapper struct {} + +func (ElementMapper) linkerFor(elem Element) Linker { + return elem; +} + +type List struct { + head Element + tail Element +} + +func (l *List) Reset() { + l.head = nil + l.tail = nil +} + +func (l *List) Empty() bool { + return l.head == nil +} + +func (l *List) Back() Element { + return l.tail +} + +func (l *List) PushFront(e Element) { + ElementMapper{}.linkerFor(e).SetNext(l.head) + ElementMapper{}.linkerFor(e).SetPrev(nil) + + if l.head != nil { + ElementMapper{}.linkerFor(l.head).SetPrev(e) + } else { + l.tail = e + } + l.head = e +} + +func (l *List) PushBack(e Element) { + ElementMapper{}.linkerFor(e).SetNext(nil) + ElementMapper{}.linkerFor(e).SetPrev(l.tail) + + if l.tail != nil { + ElementMapper{}.linkerFor(l.tail).SetNext(e) + } else { + l.head = e + } + l.tail = e +} + +// list merge +func (l *List) PushBackList(m *List) { + if l.head == nil { + l.head = m.head + l.tail = m.tail + } else if m.head != nil { + ElementMapper{}.linkerFor(l.tail).SetNext(m.head) + ElementMapper{}.linkerFor(m.head).SetPrev(l.tail) + + l.tail = m.tail + } + m.head = nil + m.tail = nil +} + +func (l *List) InsertAfter(b, e Element) { + a := ElementMapper{}.linkerFor(b).Next() + ElementMapper{}.linkerFor(e).SetNext(a) + ElementMapper{}.linkerFor(e).SetPrev(b) + ElementMapper{}.linkerFor(b).SetNext(e) + if a != nil { + ElementMapper{}.linkerFor(a).SetPrev(e) + } else { + l.tail = e + } +} + +func (l *List) InsertBefore(a, e Element) { + b := ElementMapper{}.linkerFor(a).Prev() + ElementMapper{}.linkerFor(e).SetNext(a) + ElementMapper{}.linkerFor(e).SetPrev(b) + ElementMapper{}.linkerFor(a).SetPrev(e) + if a != nil { + ElementMapper{}.linkerFor(b).SetNext(e) + } else { + l.head = e + } +} + +func (l *List) Remove(e Element) { + prev := ElementMapper{}.linkerFor(e).Prev() + next := ElementMapper{}.linkerFor(e).Next() + + if prev != nil { + ElementMapper{}.linkerFor(prev).SetNext(next) + } else { + l.head = next + } + + if next != nil { + ElementMapper{}.linkerFor(next).SetPrev(prev) + } else { + l.tail = prev + } +} + +type Entry struct { + next Element + prev Element +} + +func (e *Entry) Next() Element { + return e.next +} + +func (e *Entry) Prev() Element { + return e.prev +} + +func (e *Entry) SetNext(elem Element) { + e.next = elem +} + +func (e *Entry) SetPrev(elem Element) { + e.prev = elem +} diff --git a/rand/rand.go b/rand/rand.go new file mode 100644 index 0000000..6bcef3d --- /dev/null +++ b/rand/rand.go @@ -0,0 +1,11 @@ +package rand + +import "crypto/rand" + +// Reader is the default reader. +var Reader = rand.Reader + +// Read implements io.Reader.Read. +func Read(b []byte) (int, error) { + return rand.Read(b) +} diff --git a/sleep/commit_amd64.s b/sleep/commit_amd64.s new file mode 100644 index 0000000..8585ac6 --- /dev/null +++ b/sleep/commit_amd64.s @@ -0,0 +1,35 @@ +// Copyright 2018 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "textflag.h" + +#define preparingG 1 + +// See commit_noasm.go for a description of commitSleep. +// +// func commitSleep(g uintptr, waitingG *uintptr) bool +TEXT ·commitSleep(SB),NOSPLIT,$0-24 + MOVQ waitingG+8(FP), CX + MOVQ g+0(FP), DX + + // Store the G in waitingG if it's still preparingG. If it's anything + // else it means a waker has aborted the sleep. + MOVQ $preparingG, AX + LOCK + CMPXCHGQ DX, 0(CX) + + SETEQ AX + MOVB AX, ret+16(FP) + + RET diff --git a/sleep/commit_asm.go b/sleep/commit_asm.go new file mode 100644 index 0000000..adc81d5 --- /dev/null +++ b/sleep/commit_asm.go @@ -0,0 +1,20 @@ +// Copyright 2018 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build amd64 + +package sleep + +// See commit_noasm.go for a description of commitSleep. +func commitSleep(g uintptr, waitingG *uintptr) bool diff --git a/sleep/commit_noasm.go b/sleep/commit_noasm.go new file mode 100644 index 0000000..985c580 --- /dev/null +++ b/sleep/commit_noasm.go @@ -0,0 +1,42 @@ +// Copyright 2018 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build !race +// +build !amd64 + +package sleep + +import "sync/atomic" + +// commitSleep signals to wakers that the given g is now sleeping. Wakers can +// then fetch it and wake it. +// +// The commit may fail if wakers have been asserted after our last check, in +// which case they will have set s.waitingG to zero. +// +// It is written in assembly because it is called from g0, so it doesn't have +// a race context. +func commitSleep(g uintptr, waitingG *uintptr) bool { + for { + // Check if the wait was aborted. + if atomic.LoadUintptr(waitingG) == 0 { + return false + } + + // Try to store the G so that wakers know who to wake. + if atomic.CompareAndSwapUintptr(waitingG, preparingG, g) { + return true + } + } +} diff --git a/sleep/empty.s b/sleep/empty.s new file mode 100644 index 0000000..e07189f --- /dev/null +++ b/sleep/empty.s @@ -0,0 +1,15 @@ +// Copyright 2018 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Empty assembly file so empty func definitions work. diff --git a/sleep/sleep_test.go b/sleep/sleep_test.go new file mode 100644 index 0000000..e4e4093 --- /dev/null +++ b/sleep/sleep_test.go @@ -0,0 +1,542 @@ +// Copyright 2018 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sleep + +import ( + "math/rand" + "runtime" + "testing" + "time" +) + +// ZeroWakerNotAsserted tests that a zero-value waker is in non-asserted state. +func ZeroWakerNotAsserted(t *testing.T) { + var w Waker + if w.IsAsserted() { + t.Fatalf("Zero waker is asserted") + } + + if w.Clear() { + t.Fatalf("Zero waker is asserted") + } +} + +// AssertedWakerAfterAssert tests that a waker properly reports its state as +// asserted once its Assert() method is called. +func AssertedWakerAfterAssert(t *testing.T) { + var w Waker + w.Assert() + if !w.IsAsserted() { + t.Fatalf("Asserted waker is not reported as such") + } + + if !w.Clear() { + t.Fatalf("Asserted waker is not reported as such") + } +} + +// AssertedWakerAfterTwoAsserts tests that a waker properly reports its state as +// asserted once its Assert() method is called twice. +func AssertedWakerAfterTwoAsserts(t *testing.T) { + var w Waker + w.Assert() + w.Assert() + if !w.IsAsserted() { + t.Fatalf("Asserted waker is not reported as such") + } + + if !w.Clear() { + t.Fatalf("Asserted waker is not reported as such") + } +} + +// NotAssertedWakerWithSleeper tests that a waker properly reports its state as +// not asserted after a sleeper is associated with it. +func NotAssertedWakerWithSleeper(t *testing.T) { + var w Waker + var s Sleeper + s.AddWaker(&w, 0) + if w.IsAsserted() { + t.Fatalf("Non-asserted waker is reported as asserted") + } + + if w.Clear() { + t.Fatalf("Non-asserted waker is reported as asserted") + } +} + +// NotAssertedWakerAfterWake tests that a waker properly reports its state as +// not asserted after a previous assert is consumed by a sleeper. That is, tests +// the "edge-triggered" behavior. +func NotAssertedWakerAfterWake(t *testing.T) { + var w Waker + var s Sleeper + s.AddWaker(&w, 0) + w.Assert() + s.Fetch(true) + if w.IsAsserted() { + t.Fatalf("Consumed waker is reported as asserted") + } + + if w.Clear() { + t.Fatalf("Consumed waker is reported as asserted") + } +} + +// AssertedWakerBeforeAdd tests that a waker causes a sleeper to not sleep if +// it's already asserted before being added. +func AssertedWakerBeforeAdd(t *testing.T) { + var w Waker + var s Sleeper + w.Assert() + s.AddWaker(&w, 0) + + if _, ok := s.Fetch(false); !ok { + t.Fatalf("Fetch failed even though asserted waker was added") + } +} + +// ClearedWaker tests that a waker properly reports its state as not asserted +// after it is cleared. +func ClearedWaker(t *testing.T) { + var w Waker + w.Assert() + w.Clear() + if w.IsAsserted() { + t.Fatalf("Cleared waker is reported as asserted") + } + + if w.Clear() { + t.Fatalf("Cleared waker is reported as asserted") + } +} + +// ClearedWakerWithSleeper tests that a waker properly reports its state as +// not asserted when it is cleared while it has a sleeper associated with it. +func ClearedWakerWithSleeper(t *testing.T) { + var w Waker + var s Sleeper + s.AddWaker(&w, 0) + w.Clear() + if w.IsAsserted() { + t.Fatalf("Cleared waker is reported as asserted") + } + + if w.Clear() { + t.Fatalf("Cleared waker is reported as asserted") + } +} + +// ClearedWakerAssertedWithSleeper tests that a waker properly reports its state +// as not asserted when it is cleared while it has a sleeper associated with it +// and has been asserted. +func ClearedWakerAssertedWithSleeper(t *testing.T) { + var w Waker + var s Sleeper + s.AddWaker(&w, 0) + w.Assert() + w.Clear() + if w.IsAsserted() { + t.Fatalf("Cleared waker is reported as asserted") + } + + if w.Clear() { + t.Fatalf("Cleared waker is reported as asserted") + } +} + +// TestBlock tests that a sleeper actually blocks waiting for the waker to +// assert its state. +func TestBlock(t *testing.T) { + var w Waker + var s Sleeper + + s.AddWaker(&w, 0) + + // Assert waker after one second. + before := time.Now() + go func() { + time.Sleep(1 * time.Second) + w.Assert() + }() + + // Fetch the result and make sure it took at least 500ms. + if _, ok := s.Fetch(true); !ok { + t.Fatalf("Fetch failed unexpectedly") + } + if d := time.Now().Sub(before); d < 500*time.Millisecond { + t.Fatalf("Duration was too short: %v", d) + } + + // Check that already-asserted waker completes inline. + w.Assert() + if _, ok := s.Fetch(true); !ok { + t.Fatalf("Fetch failed unexpectedly") + } + + // Check that fetch sleeps if waker had been asserted but was reset + // before Fetch is called. + w.Assert() + w.Clear() + before = time.Now() + go func() { + time.Sleep(1 * time.Second) + w.Assert() + }() + if _, ok := s.Fetch(true); !ok { + t.Fatalf("Fetch failed unexpectedly") + } + if d := time.Now().Sub(before); d < 500*time.Millisecond { + t.Fatalf("Duration was too short: %v", d) + } +} + +// TestNonBlock checks that a sleeper won't block if waker isn't asserted. +func TestNonBlock(t *testing.T) { + var w Waker + var s Sleeper + + // Don't block when there's no waker. + if _, ok := s.Fetch(false); ok { + t.Fatalf("Fetch succeeded when there is no waker") + } + + // Don't block when waker isn't asserted. + s.AddWaker(&w, 0) + if _, ok := s.Fetch(false); ok { + t.Fatalf("Fetch succeeded when waker was not asserted") + } + + // Don't block when waker was asserted, but isn't anymore. + w.Assert() + w.Clear() + if _, ok := s.Fetch(false); ok { + t.Fatalf("Fetch succeeded when waker was not asserted anymore") + } + + // Don't block when waker was consumed by previous Fetch(). + w.Assert() + if _, ok := s.Fetch(false); !ok { + t.Fatalf("Fetch failed even though waker was asserted") + } + + if _, ok := s.Fetch(false); ok { + t.Fatalf("Fetch succeeded when waker had been consumed") + } +} + +// TestMultiple checks that a sleeper can wait for and receives notifications +// from multiple wakers. +func TestMultiple(t *testing.T) { + s := Sleeper{} + w1 := Waker{} + w2 := Waker{} + + s.AddWaker(&w1, 0) + s.AddWaker(&w2, 1) + + w1.Assert() + w2.Assert() + + v, ok := s.Fetch(false) + if !ok { + t.Fatalf("Fetch failed when there are asserted wakers") + } + + if v != 0 && v != 1 { + t.Fatalf("Unexpected waker id: %v", v) + } + + want := 1 - v + v, ok = s.Fetch(false) + if !ok { + t.Fatalf("Fetch failed when there is an asserted waker") + } + + if v != want { + t.Fatalf("Unexpected waker id, got %v, want %v", v, want) + } +} + +// TestDoneFunction tests if calling Done() on a sleeper works properly. +func TestDoneFunction(t *testing.T) { + // Trivial case of no waker. + s := Sleeper{} + s.Done() + + // Cases when the sleeper has n wakers, but none are asserted. + for n := 1; n < 20; n++ { + s := Sleeper{} + w := make([]Waker, n) + for j := 0; j < n; j++ { + s.AddWaker(&w[j], j) + } + s.Done() + } + + // Cases when the sleeper has n wakers, and only the i-th one is + // asserted. + for n := 1; n < 20; n++ { + for i := 0; i < n; i++ { + s := Sleeper{} + w := make([]Waker, n) + for j := 0; j < n; j++ { + s.AddWaker(&w[j], j) + } + w[i].Assert() + s.Done() + } + } + + // Cases when the sleeper has n wakers, and the i-th one is asserted + // and cleared. + for n := 1; n < 20; n++ { + for i := 0; i < n; i++ { + s := Sleeper{} + w := make([]Waker, n) + for j := 0; j < n; j++ { + s.AddWaker(&w[j], j) + } + w[i].Assert() + w[i].Clear() + s.Done() + } + } + + // Cases when the sleeper has n wakers, with a random number of them + // asserted. + for n := 1; n < 20; n++ { + for iters := 0; iters < 1000; iters++ { + s := Sleeper{} + w := make([]Waker, n) + for j := 0; j < n; j++ { + s.AddWaker(&w[j], j) + } + + // Pick the number of asserted elements, then assert + // random wakers. + asserted := rand.Int() % (n + 1) + for j := 0; j < asserted; j++ { + w[rand.Int()%n].Assert() + } + s.Done() + } + } +} + +// TestRace tests that multiple wakers can continuously send wake requests to +// the sleeper. +func TestRace(t *testing.T) { + const wakers = 100 + const wakeRequests = 10000 + + counts := make([]int, wakers) + w := make([]Waker, wakers) + s := Sleeper{} + + // Associate each waker and start goroutines that will assert them. + for i := range w { + s.AddWaker(&w[i], i) + go func(w *Waker) { + n := 0 + for n < wakeRequests { + if !w.IsAsserted() { + w.Assert() + n++ + } else { + runtime.Gosched() + } + } + }(&w[i]) + } + + // Wait for all wake up notifications from all wakers. + for i := 0; i < wakers*wakeRequests; i++ { + v, _ := s.Fetch(true) + counts[v]++ + } + + // Check that we got the right number for each. + for i, v := range counts { + if v != wakeRequests { + t.Errorf("Waker %v only got %v wakes", i, v) + } + } +} + +// BenchmarkSleeperMultiSelect measures how long it takes to fetch a wake up +// from 4 wakers when at least one is already asserted. +func BenchmarkSleeperMultiSelect(b *testing.B) { + const count = 4 + s := Sleeper{} + w := make([]Waker, count) + for i := range w { + s.AddWaker(&w[i], i) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + w[count-1].Assert() + s.Fetch(true) + } +} + +// BenchmarkGoMultiSelect measures how long it takes to fetch a zero-length +// struct from one of 4 channels when at least one is ready. +func BenchmarkGoMultiSelect(b *testing.B) { + const count = 4 + ch := make([]chan struct{}, count) + for i := range ch { + ch[i] = make(chan struct{}, 1) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + ch[count-1] <- struct{}{} + select { + case <-ch[0]: + case <-ch[1]: + case <-ch[2]: + case <-ch[3]: + } + } +} + +// BenchmarkSleeperSingleSelect measures how long it takes to fetch a wake up +// from one waker that is already asserted. +func BenchmarkSleeperSingleSelect(b *testing.B) { + s := Sleeper{} + w := Waker{} + s.AddWaker(&w, 0) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + w.Assert() + s.Fetch(true) + } +} + +// BenchmarkGoSingleSelect measures how long it takes to fetch a zero-length +// struct from a channel that already has it buffered. +func BenchmarkGoSingleSelect(b *testing.B) { + ch := make(chan struct{}, 1) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + ch <- struct{}{} + <-ch + } +} + +// BenchmarkSleeperAssertNonWaiting measures how long it takes to assert a +// channel that is already asserted. +func BenchmarkSleeperAssertNonWaiting(b *testing.B) { + w := Waker{} + w.Assert() + for i := 0; i < b.N; i++ { + w.Assert() + } + +} + +// BenchmarkGoAssertNonWaiting measures how long it takes to write to a channel +// that has already something written to it. +func BenchmarkGoAssertNonWaiting(b *testing.B) { + ch := make(chan struct{}, 1) + ch <- struct{}{} + for i := 0; i < b.N; i++ { + select { + case ch <- struct{}{}: + default: + } + } +} + +// BenchmarkSleeperWaitOnSingleSelect measures how long it takes to wait on one +// waker channel while another goroutine wakes up the sleeper. This assumes that +// a new goroutine doesn't run immediately (i.e., the creator of a new goroutine +// is allowed to go to sleep before the new goroutine has a chance to run). +func BenchmarkSleeperWaitOnSingleSelect(b *testing.B) { + s := Sleeper{} + w := Waker{} + s.AddWaker(&w, 0) + for i := 0; i < b.N; i++ { + go func() { + w.Assert() + }() + s.Fetch(true) + } + +} + +// BenchmarkGoWaitOnSingleSelect measures how long it takes to wait on one +// channel while another goroutine wakes up the sleeper. This assumes that a new +// goroutine doesn't run immediately (i.e., the creator of a new goroutine is +// allowed to go to sleep before the new goroutine has a chance to run). +func BenchmarkGoWaitOnSingleSelect(b *testing.B) { + ch := make(chan struct{}, 1) + for i := 0; i < b.N; i++ { + go func() { + ch <- struct{}{} + }() + <-ch + } +} + +// BenchmarkSleeperWaitOnMultiSelect measures how long it takes to wait on 4 +// wakers while another goroutine wakes up the sleeper. This assumes that a new +// goroutine doesn't run immediately (i.e., the creator of a new goroutine is +// allowed to go to sleep before the new goroutine has a chance to run). +func BenchmarkSleeperWaitOnMultiSelect(b *testing.B) { + const count = 4 + s := Sleeper{} + w := make([]Waker, count) + for i := range w { + s.AddWaker(&w[i], i) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + go func() { + w[count-1].Assert() + }() + s.Fetch(true) + } +} + +// BenchmarkGoWaitOnMultiSelect measures how long it takes to wait on 4 channels +// while another goroutine wakes up the sleeper. This assumes that a new +// goroutine doesn't run immediately (i.e., the creator of a new goroutine is +// allowed to go to sleep before the new goroutine has a chance to run). +func BenchmarkGoWaitOnMultiSelect(b *testing.B) { + const count = 4 + ch := make([]chan struct{}, count) + for i := range ch { + ch[i] = make(chan struct{}, 1) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + go func() { + ch[count-1] <- struct{}{} + }() + select { + case <-ch[0]: + case <-ch[1]: + case <-ch[2]: + case <-ch[3]: + } + } +} diff --git a/sleep/sleep_unsafe.go b/sleep/sleep_unsafe.go new file mode 100644 index 0000000..f63cadc --- /dev/null +++ b/sleep/sleep_unsafe.go @@ -0,0 +1,395 @@ +// Copyright 2018 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package sleep allows goroutines to efficiently sleep on multiple sources of +// notifications (wakers). It offers O(1) complexity, which is different from +// multi-channel selects which have O(n) complexity (where n is the number of +// channels) and a considerable constant factor. +// +// It is similar to edge-triggered epoll waits, where the user registers each +// object of interest once, and then can repeatedly wait on all of them. +// +// A Waker object is used to wake a sleeping goroutine (G) up, or prevent it +// from going to sleep next. A Sleeper object is used to receive notifications +// from wakers, and if no notifications are available, to optionally sleep until +// one becomes available. +// +// A Waker can be associated with at most one Sleeper, but a Sleeper can be +// associated with multiple Wakers. A Sleeper has a list of asserted (ready) +// wakers; when Fetch() is called repeatedly, elements from this list are +// returned until the list becomes empty in which case the goroutine goes to +// sleep. When Assert() is called on a Waker, it adds itself to the Sleeper's +// asserted list and wakes the G up from its sleep if needed. +// +// Sleeper objects are expected to be used as follows, with just one goroutine +// executing this code: +// +// // One time set-up. +// s := sleep.Sleeper{} +// s.AddWaker(&w1, constant1) +// s.AddWaker(&w2, constant2) +// +// // Called repeatedly. +// for { +// switch id, _ := s.Fetch(true); id { +// case constant1: +// // Do work triggered by w1 being asserted. +// case constant2: +// // Do work triggered by w2 being asserted. +// } +// } +// +// And Waker objects are expected to call w.Assert() when they want the sleeper +// to wake up and perform work. +// +// The notifications are edge-triggered, which means that if a Waker calls +// Assert() several times before the sleeper has the chance to wake up, it will +// only be notified once and should perform all pending work (alternatively, it +// can also call Assert() on the waker, to ensure that it will wake up again). +// +// The "unsafeness" here is in the casts to/from unsafe.Pointer, which is safe +// when only one type is used for each unsafe.Pointer (which is the case here), +// we should just make sure that this remains the case in the future. The usage +// of unsafe package could be confined to sharedWaker and sharedSleeper types +// that would hold pointers in atomic.Pointers, but the go compiler currently +// can't optimize these as well (it won't inline their method calls), which +// reduces performance. +package sleep + +import ( + "sync/atomic" + "unsafe" +) + +const ( + // preparingG is stored in sleepers to indicate that they're preparing + // to sleep. + preparingG = 1 +) + +var ( + // assertedSleeper is a sentinel sleeper. A pointer to it is stored in + // wakers that are asserted. + assertedSleeper Sleeper +) + +//go:linkname gopark runtime.gopark +func gopark(unlockf func(uintptr, *uintptr) bool, wg *uintptr, reason string, traceEv byte, traceskip int) + +//go:linkname goready runtime.goready +func goready(g uintptr, traceskip int) + +// Sleeper allows a goroutine to sleep and receive wake up notifications from +// Wakers in an efficient way. +// +// This is similar to edge-triggered epoll in that wakers are added to the +// sleeper once and the sleeper can then repeatedly sleep in O(1) time while +// waiting on all wakers. +// +// None of the methods in a Sleeper can be called concurrently. Wakers that have +// been added to a sleeper A can only be added to another sleeper after A.Done() +// returns. These restrictions allow this to be implemented lock-free. +// +// This struct is thread-compatible. +type Sleeper struct { + // sharedList is a "stack" of asserted wakers. They atomically add + // themselves to the front of this list as they become asserted. + sharedList unsafe.Pointer + + // localList is a list of asserted wakers that is only accessible to the + // waiter, and thus doesn't have to be accessed atomically. When + // fetching more wakers, the waiter will first go through this list, and + // only when it's empty will it atomically fetch wakers from + // sharedList. + localList *Waker + + // allWakers is a list with all wakers that have been added to this + // sleeper. It is used during cleanup to remove associations. + allWakers *Waker + + // waitingG holds the G that is sleeping, if any. It is used by wakers + // to determine which G, if any, they should wake. + waitingG uintptr +} + +// AddWaker associates the given waker to the sleeper. id is the value to be +// returned when the sleeper is woken by the given waker. +func (s *Sleeper) AddWaker(w *Waker, id int) { + // Add the waker to the list of all wakers. + w.allWakersNext = s.allWakers + s.allWakers = w + w.id = id + + // Try to associate the waker with the sleeper. If it's already + // asserted, we simply enqueue it in the "ready" list. + for { + p := (*Sleeper)(atomic.LoadPointer(&w.s)) + if p == &assertedSleeper { + s.enqueueAssertedWaker(w) + return + } + + if atomic.CompareAndSwapPointer(&w.s, usleeper(p), usleeper(s)) { + return + } + } +} + +// nextWaker returns the next waker in the notification list, blocking if +// needed. +func (s *Sleeper) nextWaker(block bool) *Waker { + // Attempt to replenish the local list if it's currently empty. + if s.localList == nil { + for atomic.LoadPointer(&s.sharedList) == nil { + // Fail request if caller requested that we + // don't block. + if !block { + return nil + } + + // Indicate to wakers that we're about to sleep, + // this allows them to abort the wait by setting + // waitingG back to zero (which we'll notice + // before committing the sleep). + atomic.StoreUintptr(&s.waitingG, preparingG) + + // Check if something was queued while we were + // preparing to sleep. We need this interleaving + // to avoid missing wake ups. + if atomic.LoadPointer(&s.sharedList) != nil { + atomic.StoreUintptr(&s.waitingG, 0) + break + } + + // Try to commit the sleep and report it to the + // tracer as a select. + // + // gopark puts the caller to sleep and calls + // commitSleep to decide whether to immediately + // wake the caller up or to leave it sleeping. + const traceEvGoBlockSelect = 24 + gopark(commitSleep, &s.waitingG, "sleeper", traceEvGoBlockSelect, 0) + } + + // Pull the shared list out and reverse it in the local + // list. Given that wakers push themselves in reverse + // order, we fix things here. + v := (*Waker)(atomic.SwapPointer(&s.sharedList, nil)) + for v != nil { + cur := v + v = v.next + + cur.next = s.localList + s.localList = cur + } + } + + // Remove the waker in the front of the list. + w := s.localList + s.localList = w.next + + return w +} + +// Fetch fetches the next wake-up notification. If a notification is immediately +// available, it is returned right away. Otherwise, the behavior depends on the +// value of 'block': if true, the current goroutine blocks until a notification +// arrives, then returns it; if false, returns 'ok' as false. +// +// When 'ok' is true, the value of 'id' corresponds to the id associated with +// the waker; when 'ok' is false, 'id' is undefined. +// +// N.B. This method is *not* thread-safe. Only one goroutine at a time is +// allowed to call this method. +func (s *Sleeper) Fetch(block bool) (id int, ok bool) { + for { + w := s.nextWaker(block) + if w == nil { + return -1, false + } + + // Reassociate the waker with the sleeper. If the waker was + // still asserted we can return it, otherwise try the next one. + old := (*Sleeper)(atomic.SwapPointer(&w.s, usleeper(s))) + if old == &assertedSleeper { + return w.id, true + } + } +} + +// Done is used to indicate that the caller won't use this Sleeper anymore. It +// removes the association with all wakers so that they can be safely reused +// by another sleeper after Done() returns. +func (s *Sleeper) Done() { + // Remove all associations that we can, and build a list of the ones + // we could not. An association can be removed right away from waker w + // if w.s has a pointer to the sleeper, that is, the waker is not + // asserted yet. By atomically switching w.s to nil, we guarantee that + // subsequent calls to Assert() on the waker will not result in it being + // queued to this sleeper. + var pending *Waker + w := s.allWakers + for w != nil { + next := w.allWakersNext + for { + t := atomic.LoadPointer(&w.s) + if t != usleeper(s) { + w.allWakersNext = pending + pending = w + break + } + + if atomic.CompareAndSwapPointer(&w.s, t, nil) { + break + } + } + w = next + } + + // The associations that we could not remove are either asserted, or in + // the process of being asserted, or have been asserted and cleared + // before being pulled from the sleeper lists. We must wait for them all + // to make it to the sleeper lists, so that we know that the wakers + // won't do any more work towards waking this sleeper up. + for pending != nil { + pulled := s.nextWaker(true) + + // Remove the waker we just pulled from the list of associated + // wakers. + prev := &pending + for w := *prev; w != nil; w = *prev { + if pulled == w { + *prev = w.allWakersNext + break + } + prev = &w.allWakersNext + } + } + s.allWakers = nil +} + +// enqueueAssertedWaker enqueues an asserted waker to the "ready" circular list +// of wakers that want to notify the sleeper. +func (s *Sleeper) enqueueAssertedWaker(w *Waker) { + // Add the new waker to the front of the list. + for { + v := (*Waker)(atomic.LoadPointer(&s.sharedList)) + w.next = v + if atomic.CompareAndSwapPointer(&s.sharedList, uwaker(v), uwaker(w)) { + break + } + } + + for { + // Nothing to do if there isn't a G waiting. + g := atomic.LoadUintptr(&s.waitingG) + if g == 0 { + return + } + + // Signal to the sleeper that a waker has been asserted. + if atomic.CompareAndSwapUintptr(&s.waitingG, g, 0) { + if g != preparingG { + // We managed to get a G. Wake it up. + goready(g, 0) + } + } + } +} + +// Waker represents a source of wake-up notifications to be sent to sleepers. A +// waker can be associated with at most one sleeper at a time, and at any given +// time is either in asserted or non-asserted state. +// +// Once asserted, the waker remains so until it is manually cleared or a sleeper +// consumes its assertion (i.e., a sleeper wakes up or is prevented from going +// to sleep due to the waker). +// +// This struct is thread-safe, that is, its methods can be called concurrently +// by multiple goroutines. +type Waker struct { + // s is the sleeper that this waker can wake up. Only one sleeper at a + // time is allowed. This field can have three classes of values: + // nil -- the waker is not asserted: it either is not associated with + // a sleeper, or is queued to a sleeper due to being previously + // asserted. This is the zero value. + // &assertedSleeper -- the waker is asserted. + // otherwise -- the waker is not asserted, and is associated with the + // given sleeper. Once it transitions to asserted state, the + // associated sleeper will be woken. + s unsafe.Pointer + + // next is used to form a linked list of asserted wakers in a sleeper. + next *Waker + + // allWakersNext is used to form a linked list of all wakers associated + // to a given sleeper. + allWakersNext *Waker + + // id is the value to be returned to sleepers when they wake up due to + // this waker being asserted. + id int +} + +// Assert moves the waker to an asserted state, if it isn't asserted yet. When +// asserted, the waker will cause its matching sleeper to wake up. +func (w *Waker) Assert() { + // Nothing to do if the waker is already asserted. This check allows us + // to complete this case (already asserted) without any interlocked + // operations on x86. + if atomic.LoadPointer(&w.s) == usleeper(&assertedSleeper) { + return + } + + // Mark the waker as asserted, and wake up a sleeper if there is one. + switch s := (*Sleeper)(atomic.SwapPointer(&w.s, usleeper(&assertedSleeper))); s { + case nil: + case &assertedSleeper: + default: + s.enqueueAssertedWaker(w) + } +} + +// Clear moves the waker to then non-asserted state and returns whether it was +// asserted before being cleared. +// +// N.B. The waker isn't removed from the "ready" list of a sleeper (if it +// happens to be in one), but the sleeper will notice that it is not asserted +// anymore and won't return it to the caller. +func (w *Waker) Clear() bool { + // Nothing to do if the waker is not asserted. This check allows us to + // complete this case (already not asserted) without any interlocked + // operations on x86. + if atomic.LoadPointer(&w.s) != usleeper(&assertedSleeper) { + return false + } + + // Try to store nil in the sleeper, which indicates that the waker is + // not asserted. + return atomic.CompareAndSwapPointer(&w.s, usleeper(&assertedSleeper), nil) +} + +// IsAsserted returns whether the waker is currently asserted (i.e., if it's +// currently in a state that would cause its matching sleeper to wake up). +func (w *Waker) IsAsserted() bool { + return (*Sleeper)(atomic.LoadPointer(&w.s)) == &assertedSleeper +} + +func usleeper(s *Sleeper) unsafe.Pointer { + return unsafe.Pointer(s) +} + +func uwaker(w *Waker) unsafe.Pointer { + return unsafe.Pointer(w) +} diff --git a/tmutex/tmutex.go b/tmutex/tmutex.go new file mode 100644 index 0000000..78d95e0 --- /dev/null +++ b/tmutex/tmutex.go @@ -0,0 +1,51 @@ +package tmutex + +import ( + "sync/atomic" +) + +type Mutex struct { + v int32 + ch chan struct{} +} + +func (m *Mutex) Init() { + m.v = 1 + m.ch = make(chan struct{}, 1) +} + +func (m *Mutex) Lock() { + // ==0时 只有一个锁持有者 + if atomic.AddInt32(&m.v, -1) == 0 { + return + } + // !=0时 有多个想持有锁者 + for { + if v := atomic.LoadInt32(&m.v);v >= 0 && atomic.SwapInt32(&m.v, -1) == 1 { + return + } + <-m.ch // 排队阻塞 等待锁释放 + } +} + +func (m *Mutex) TryLock() bool { + v := atomic.LoadInt32(&m.v) + if v <= 0 { + return false + } + // CAS操作需要输入两个数值,一个旧值(期望操作前的值)和一个新值, + // 在操作期间先比较下旧值有没有发生变化, + // 如果没有发生变化,才交换成新值,发生了变化则不交换。 + return atomic.CompareAndSwapInt32(&m.v, 1, 0) +} + +func (m *Mutex) Unlock() { + if atomic.SwapInt32(&m.v, 1) == 0 { // 没有任何持有者 + return + } + + select { + case m.ch <- struct{}{}: + default: + } +} diff --git a/tmutex/tmutex_test.go b/tmutex/tmutex_test.go new file mode 100644 index 0000000..fb058e7 --- /dev/null +++ b/tmutex/tmutex_test.go @@ -0,0 +1,47 @@ +package tmutex + +import ( + "fmt" + "runtime" + "testing" + "time" +) + +func TestBasicLock(t *testing.T) { + var race = 0 + var m Mutex + m.Init() + + m.Lock() + + go func(){ + m.Lock() + race++ + m.Unlock() + }() + + go func(){ + m.Lock() + race++ + m.Unlock() + }() + + runtime.Gosched() // 让渡cpu + race++ + + m.Unlock() + + time.Sleep(time.Second) +} + +func TestShutOut(t *testing.T) { + + a := 1 + if a < 3 || func() bool { + fmt.Println("ShutOut") + return false + }() { + t.Logf("Ok\n") + } + +}