mirror of
https://github.com/duke-git/lancet.git
synced 2025-09-27 03:45:58 +08:00
258 lines
6.0 KiB
Go
258 lines
6.0 KiB
Go
// Copyright 2025 dudaodong@gmail.com. All rights reserved.
|
|
// Use of this source code is governed by MIT license
|
|
|
|
// Package concurrency contain some functions to support concurrent programming. eg, goroutine, channel, locker.
|
|
package concurrency
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
)
|
|
|
|
// KeyedLocker is a simple implementation of a keyed locker that allows for non-blocking lock acquisition.
|
|
type KeyedLocker[K comparable] struct {
|
|
locks sync.Map
|
|
ttl time.Duration
|
|
}
|
|
|
|
type lockEntry struct {
|
|
mu sync.Mutex
|
|
ref int32
|
|
timer atomic.Pointer[time.Timer]
|
|
}
|
|
|
|
// NewKeyedLocker creates a new KeyedLocker with the specified TTL for lock expiration.
|
|
// The TTL is used to automatically release locks that are no longer held.
|
|
// Play: https://go.dev/play/p/GzeyC33T5rw
|
|
func NewKeyedLocker[K comparable](ttl time.Duration) *KeyedLocker[K] {
|
|
return &KeyedLocker[K]{ttl: ttl}
|
|
}
|
|
|
|
// Do acquires a lock for the specified key and executes the provided function.
|
|
// It returns an error if the context is canceled before the function completes.
|
|
// Play: https://go.dev/play/p/GzeyC33T5rw
|
|
func (l *KeyedLocker[K]) Do(ctx context.Context, key K, fn func()) error {
|
|
entry := l.acquire(key)
|
|
defer l.release(key, entry, key)
|
|
|
|
done := make(chan struct{})
|
|
|
|
go func() {
|
|
entry.mu.Lock()
|
|
defer entry.mu.Unlock()
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
default:
|
|
fn()
|
|
}
|
|
close(done)
|
|
}()
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case <-done:
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// acquire tries to acquire a lock for the specified key.
|
|
func (l *KeyedLocker[K]) acquire(key K) *lockEntry {
|
|
lock, _ := l.locks.LoadOrStore(key, &lockEntry{})
|
|
entry := lock.(*lockEntry)
|
|
|
|
atomic.AddInt32(&entry.ref, 1)
|
|
if t := entry.timer.Swap(nil); t != nil {
|
|
t.Stop()
|
|
}
|
|
|
|
return entry
|
|
}
|
|
|
|
// release releases the lock for the specified key.
|
|
func (l *KeyedLocker[K]) release(key K, entry *lockEntry, rawKey K) {
|
|
if atomic.AddInt32(&entry.ref, -1) == 0 {
|
|
entry.mu.Lock()
|
|
defer entry.mu.Unlock()
|
|
|
|
if entry.ref == 0 {
|
|
if t := entry.timer.Swap(nil); t != nil {
|
|
t.Stop()
|
|
}
|
|
|
|
l.locks.Delete(rawKey)
|
|
} else {
|
|
if entry.timer.Load() == nil {
|
|
t := time.AfterFunc(l.ttl, func() {
|
|
l.release(key, entry, rawKey)
|
|
})
|
|
entry.timer.Store(t)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// RWKeyedLocker is a read-write version of KeyedLocker.
|
|
type RWKeyedLocker[K comparable] struct {
|
|
locks sync.Map
|
|
ttl time.Duration
|
|
}
|
|
|
|
type rwLockEntry struct {
|
|
mu sync.RWMutex
|
|
ref int32
|
|
timer atomic.Pointer[time.Timer]
|
|
}
|
|
|
|
// NewRWKeyedLocker creates a new RWKeyedLocker with the specified TTL for lock expiration.
|
|
// The TTL is used to automatically release locks that are no longer held.
|
|
// Play: https://go.dev/play/p/CkaJWWwZm9
|
|
func NewRWKeyedLocker[K comparable](ttl time.Duration) *RWKeyedLocker[K] {
|
|
return &RWKeyedLocker[K]{ttl: ttl}
|
|
}
|
|
|
|
// RLock acquires a read lock for the specified key and executes the provided function.
|
|
// It returns an error if the context is canceled before the function completes.
|
|
// Play: https://go.dev/play/p/ZrCr8sMo77T
|
|
func (l *RWKeyedLocker[K]) RLock(ctx context.Context, key K, fn func()) error {
|
|
entry := l.acquire(key)
|
|
defer l.release(entry, key)
|
|
|
|
done := make(chan struct{})
|
|
|
|
go func() {
|
|
entry.mu.RLock()
|
|
defer entry.mu.RUnlock()
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
default:
|
|
fn()
|
|
}
|
|
close(done)
|
|
}()
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case <-done:
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// Lock acquires a write lock for the specified key and executes the provided function.
|
|
// It returns an error if the context is canceled before the function completes.
|
|
// Play: https://go.dev/play/p/WgAcXbOPKGk
|
|
func (l *RWKeyedLocker[K]) Lock(ctx context.Context, key K, fn func()) error {
|
|
entry := l.acquire(key)
|
|
defer l.release(entry, key)
|
|
|
|
done := make(chan struct{})
|
|
|
|
go func() {
|
|
entry.mu.Lock()
|
|
defer entry.mu.Unlock()
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
default:
|
|
fn()
|
|
}
|
|
close(done)
|
|
}()
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case <-done:
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// acquire tries to acquire a read lock for the specified key.
|
|
func (l *RWKeyedLocker[K]) acquire(key K) *rwLockEntry {
|
|
actual, _ := l.locks.LoadOrStore(key, &rwLockEntry{})
|
|
entry := actual.(*rwLockEntry)
|
|
atomic.AddInt32(&entry.ref, 1)
|
|
|
|
if t := entry.timer.Swap(nil); t != nil {
|
|
t.Stop()
|
|
}
|
|
return entry
|
|
}
|
|
|
|
// release releases the lock for the specified key.
|
|
func (l *RWKeyedLocker[K]) release(entry *rwLockEntry, rawKey K) {
|
|
if atomic.AddInt32(&entry.ref, -1) == 0 {
|
|
timer := time.AfterFunc(l.ttl, func() {
|
|
if atomic.LoadInt32(&entry.ref) == 0 {
|
|
l.locks.Delete(rawKey)
|
|
}
|
|
})
|
|
entry.timer.Store(timer)
|
|
}
|
|
}
|
|
|
|
// TryKeyedLocker is a non-blocking version of KeyedLocker.
|
|
// It allows for trying to acquire a lock without blocking if the lock is already held.
|
|
type TryKeyedLocker[K comparable] struct {
|
|
mu sync.Mutex
|
|
locks map[K]*casMutex
|
|
}
|
|
|
|
// NewTryKeyedLocker creates a new TryKeyedLocker.
|
|
// Play: https://go.dev/play/p/VG9qLvyetE2
|
|
func NewTryKeyedLocker[K comparable]() *TryKeyedLocker[K] {
|
|
return &TryKeyedLocker[K]{locks: make(map[K]*casMutex)}
|
|
}
|
|
|
|
// TryLock tries to acquire a lock for the specified key.
|
|
// It returns true if the lock was acquired, false otherwise.
|
|
// Play: https://go.dev/play/p/VG9qLvyetE2
|
|
func (l *TryKeyedLocker[K]) TryLock(key K) bool {
|
|
l.mu.Lock()
|
|
|
|
lock, ok := l.locks[key]
|
|
if !ok {
|
|
lock = &casMutex{}
|
|
l.locks[key] = lock
|
|
}
|
|
l.mu.Unlock()
|
|
|
|
return lock.TryLock()
|
|
}
|
|
|
|
// Unlock releases the lock for the specified key.
|
|
// Play: https://go.dev/play/p/VG9qLvyetE2
|
|
func (l *TryKeyedLocker[K]) Unlock(key K) {
|
|
l.mu.Lock()
|
|
defer l.mu.Unlock()
|
|
|
|
lock, ok := l.locks[key]
|
|
if ok {
|
|
lock.Unlock()
|
|
if lock.lock == 0 {
|
|
delete(l.locks, key)
|
|
}
|
|
}
|
|
}
|
|
|
|
// casMutex is a simple mutex that uses atomic operations to provide a non-blocking lock.
|
|
type casMutex struct {
|
|
lock int32
|
|
}
|
|
|
|
// TryLock tries to acquire the lock without blocking.
|
|
// It returns true if the lock was acquired, false otherwise.
|
|
func (m *casMutex) TryLock() bool {
|
|
return atomic.CompareAndSwapInt32(&m.lock, 0, 1)
|
|
}
|
|
|
|
// Unlock releases the lock.
|
|
func (m *casMutex) Unlock() {
|
|
atomic.StoreInt32(&m.lock, 0)
|
|
}
|