Files
goproxy/internal/loadbalance/loadbalancer.go
2025-03-13 15:56:33 +08:00

331 lines
5.9 KiB
Go

package loadbalance
import (
"math/rand"
"net/url"
"sync"
"sync/atomic"
"time"
)
// Strategy identifies a load-balancing strategy.
// Its values are the Strategy* constants declared below.
type Strategy int
// Supported load-balancing strategies. Values are assigned with iota,
// starting at 0 for StrategyRoundRobin.
const (
// StrategyRoundRobin is the round-robin strategy.
StrategyRoundRobin Strategy = iota
// StrategyRandom is the random strategy.
StrategyRandom
// StrategyWeightedRoundRobin is the weighted round-robin strategy.
StrategyWeightedRoundRobin
// StrategyIPHash is the IP-hash strategy.
StrategyIPHash
)
// LoadBalancer is the interface implemented by every load balancer.
type LoadBalancer interface {
// Next returns the URL of the next backend to use.
// The RoundRobinBalancer and RandomBalancer implementations ignore key and
// return (nil, nil) when no backend is available, so callers must check the
// returned URL for nil.
// NOTE(review): key is presumably the hash input for hash-based strategies
// such as StrategyIPHash — confirm against those implementations.
Next(key string) (*url.URL, error)
// Add registers a backend, given as a URL string, with the given weight.
Add(backend string, weight int) error
// Remove unregisters a backend.
Remove(backend string) error
// MarkDown marks a backend as unavailable.
MarkDown(backend string) error
// MarkUp marks a backend as available.
MarkUp(backend string) error
// Reset resets the load balancer.
Reset() error
}
// Backend describes a single backend server tracked by a balancer.
type Backend struct {
// URL is the backend's URL.
URL *url.URL
// Weight is the backend's weight. RoundRobinBalancer and RandomBalancer
// store it but do not consult it when selecting a backend.
Weight int
// Down reports whether the backend is unavailable; Next skips backends
// whose Down flag is set.
Down bool
}
// RoundRobinBalancer is a load balancer that hands out backends in rotation.
type RoundRobinBalancer struct {
// backends is the list of registered backends; accessed under mutex.
backends []*Backend
// current is the rotation counter. Next advances it with atomic.AddInt32
// and Reset clears it with atomic.StoreInt32.
current int32
// mutex guards backends and the Down flag of each backend.
mutex sync.RWMutex
}
// NewRoundRobinBalancer returns a round-robin balancer with no backends
// registered and its rotation counter at zero.
func NewRoundRobinBalancer() *RoundRobinBalancer {
	balancer := new(RoundRobinBalancer)
	// Start with an empty, non-nil backend list; the counter keeps its zero value.
	balancer.backends = []*Backend{}
	return balancer
}
// Next returns the URL of the next available backend in rotation.
//
// The key argument is ignored by this strategy. When no backend is registered,
// or every registered backend is marked down, Next returns (nil, nil); callers
// must check the returned URL for nil before using it.
func (lb *RoundRobinBalancer) Next(key string) (*url.URL, error) {
	lb.mutex.RLock()
	defer lb.mutex.RUnlock()

	n := uint32(len(lb.backends))
	if n == 0 {
		return nil, nil
	}

	// Probe at most one full lap. The shared counter is advanced once per
	// probe, so skipped (down) backends do not shift extra traffic onto the
	// backend that follows them.
	for i := uint32(0); i < n; i++ {
		// The counter is an int32 that wraps to negative values after
		// math.MaxInt32 increments. A signed remainder would then be negative
		// and index out of range, so reinterpret the counter as uint32 before
		// taking the modulo.
		idx := uint32(atomic.AddInt32(&lb.current, 1)) % n
		if b := lb.backends[idx]; !b.Down {
			return b.URL, nil
		}
	}

	// Every probed backend was down.
	return nil, nil
}
// Add registers backend (a URL string) with the given weight.
//
// It returns the parse error when backend is not a valid URL. Adding a
// backend whose normalized URL string is already registered is a no-op.
// New backends start out available.
func (lb *RoundRobinBalancer) Add(backend string, weight int) error {
	lb.mutex.Lock()
	defer lb.mutex.Unlock()

	parsed, parseErr := url.Parse(backend)
	if parseErr != nil {
		return parseErr
	}

	// Compare on the normalized string form of the URL.
	target := parsed.String()
	for i := range lb.backends {
		if lb.backends[i].URL.String() == target {
			return nil
		}
	}

	entry := &Backend{URL: parsed, Weight: weight}
	lb.backends = append(lb.backends, entry)
	return nil
}
// Remove unregisters the backend whose normalized URL string equals that of
// backend.
//
// It returns the parse error when backend is not a valid URL. Removing a
// backend that is not registered is a no-op and returns nil. The relative
// order of the remaining backends is preserved.
func (lb *RoundRobinBalancer) Remove(backend string) error {
	lb.mutex.Lock()
	defer lb.mutex.Unlock()

	target, err := url.Parse(backend)
	if err != nil {
		return err
	}

	want := target.String()
	for i, b := range lb.backends {
		if b.URL.String() != want {
			continue
		}
		// Shift the tail left, then clear the vacated last slot before
		// truncating. Without the clear, the backing array keeps a stale
		// *Backend beyond len, so the removed (or duplicated last) backend
		// stays reachable and cannot be garbage-collected.
		last := len(lb.backends) - 1
		copy(lb.backends[i:], lb.backends[i+1:])
		lb.backends[last] = nil
		lb.backends = lb.backends[:last]
		return nil
	}
	return nil
}
// MarkDown flags the backend whose normalized URL string equals that of
// backend as unavailable, so Next skips it.
//
// It returns the parse error when backend is not a valid URL. Marking a
// backend that is not registered is a no-op and returns nil.
func (lb *RoundRobinBalancer) MarkDown(backend string) error {
	lb.mutex.Lock()
	defer lb.mutex.Unlock()

	parsed, parseErr := url.Parse(backend)
	if parseErr != nil {
		return parseErr
	}

	target := parsed.String()
	for idx := 0; idx < len(lb.backends); idx++ {
		candidate := lb.backends[idx]
		if candidate.URL.String() == target {
			// Only the first match is updated.
			candidate.Down = true
			break
		}
	}
	return nil
}
// MarkUp flags the backend whose normalized URL string equals that of
// backend as available again, so Next may return it.
//
// It returns the parse error when backend is not a valid URL. Marking a
// backend that is not registered is a no-op and returns nil.
func (lb *RoundRobinBalancer) MarkUp(backend string) error {
	lb.mutex.Lock()
	defer lb.mutex.Unlock()

	parsed, parseErr := url.Parse(backend)
	if parseErr != nil {
		return parseErr
	}

	target := parsed.String()
	for idx := 0; idx < len(lb.backends); idx++ {
		candidate := lb.backends[idx]
		if candidate.URL.String() == target {
			// Only the first match is updated.
			candidate.Down = false
			break
		}
	}
	return nil
}
// Reset drops every registered backend and rewinds the rotation counter to
// zero. It always returns nil.
func (lb *RoundRobinBalancer) Reset() error {
	lb.mutex.Lock()
	defer lb.mutex.Unlock()

	atomic.StoreInt32(&lb.current, 0)
	// Replace the list with a fresh empty (non-nil) slice.
	lb.backends = []*Backend{}
	return nil
}
// RandomBalancer is a load balancer that picks an available backend at random.
type RandomBalancer struct {
// backends is the list of registered backends; accessed under mutex.
backends []*Backend
// mutex guards backends and the Down flag of each backend.
mutex sync.RWMutex
// rand is the random source used by Next; NewRandomBalancer seeds it from
// the current time.
// NOTE(review): a *rand.Rand built with rand.NewSource is documented as not
// safe for concurrent use, so calls on it must be serialized.
rand *rand.Rand
}
// NewRandomBalancer returns a random balancer with no backends registered and
// a private random generator seeded from the current time in nanoseconds.
func NewRandomBalancer() *RandomBalancer {
	seed := time.Now().UnixNano()
	return &RandomBalancer{
		backends: []*Backend{},
		rand:     rand.New(rand.NewSource(seed)),
	}
}
// Next returns the URL of a randomly chosen available backend.
//
// The key argument is ignored by this strategy. When no backend is registered,
// or every registered backend is marked down, Next returns (nil, nil); callers
// must check the returned URL for nil before using it.
func (lb *RandomBalancer) Next(key string) (*url.URL, error) {
	// Take the exclusive lock rather than a read lock: lb.rand is a
	// *rand.Rand, which is not safe for concurrent use, so two Next calls
	// must never draw from it at the same time.
	lb.mutex.Lock()
	defer lb.mutex.Unlock()

	// First pass: count the available backends.
	available := 0
	for _, b := range lb.backends {
		if !b.Down {
			available++
		}
	}
	if available == 0 {
		return nil, nil
	}

	// Second pass: walk to the pick-th available backend. This selects the
	// same backend as indexing a filtered slice, without allocating one on
	// every call.
	pick := lb.rand.Intn(available)
	for _, b := range lb.backends {
		if b.Down {
			continue
		}
		if pick == 0 {
			return b.URL, nil
		}
		pick--
	}

	// Not reachable: pick is always smaller than the number of available backends.
	return nil, nil
}
// Add registers backend (a URL string) with the given weight.
//
// It returns the parse error when backend is not a valid URL. Adding a
// backend whose normalized URL string is already registered is a no-op.
// New backends start out available.
func (lb *RandomBalancer) Add(backend string, weight int) error {
	lb.mutex.Lock()
	defer lb.mutex.Unlock()

	endpoint, err := url.Parse(backend)
	if err != nil {
		return err
	}

	// Look for an existing registration with the same normalized URL.
	wanted := endpoint.String()
	known := false
	for _, existing := range lb.backends {
		if existing.URL.String() == wanted {
			known = true
			break
		}
	}

	if !known {
		lb.backends = append(lb.backends, &Backend{URL: endpoint, Weight: weight, Down: false})
	}
	return nil
}
// Remove unregisters the backend whose normalized URL string equals that of
// backend.
//
// It returns the parse error when backend is not a valid URL. Removing a
// backend that is not registered is a no-op and returns nil. The relative
// order of the remaining backends is preserved.
func (lb *RandomBalancer) Remove(backend string) error {
	lb.mutex.Lock()
	defer lb.mutex.Unlock()

	target, err := url.Parse(backend)
	if err != nil {
		return err
	}

	want := target.String()
	for i, b := range lb.backends {
		if b.URL.String() != want {
			continue
		}
		// Shift the tail left, then clear the vacated last slot before
		// truncating. Without the clear, the backing array keeps a stale
		// *Backend beyond len, so the removed (or duplicated last) backend
		// stays reachable and cannot be garbage-collected.
		last := len(lb.backends) - 1
		copy(lb.backends[i:], lb.backends[i+1:])
		lb.backends[last] = nil
		lb.backends = lb.backends[:last]
		return nil
	}
	return nil
}
// MarkDown flags the backend whose normalized URL string equals that of
// backend as unavailable, so Next no longer selects it.
//
// It returns the parse error when backend is not a valid URL. Marking a
// backend that is not registered is a no-op and returns nil.
func (lb *RandomBalancer) MarkDown(backend string) error {
	lb.mutex.Lock()
	defer lb.mutex.Unlock()

	endpoint, err := url.Parse(backend)
	if err != nil {
		return err
	}

	wanted := endpoint.String()
	for i := range lb.backends {
		if entry := lb.backends[i]; entry.URL.String() == wanted {
			// Stop at the first match.
			entry.Down = true
			return nil
		}
	}
	return nil
}
// MarkUp flags the backend whose normalized URL string equals that of
// backend as available again, so Next may select it.
//
// It returns the parse error when backend is not a valid URL. Marking a
// backend that is not registered is a no-op and returns nil.
func (lb *RandomBalancer) MarkUp(backend string) error {
	lb.mutex.Lock()
	defer lb.mutex.Unlock()

	endpoint, err := url.Parse(backend)
	if err != nil {
		return err
	}

	wanted := endpoint.String()
	for i := range lb.backends {
		if entry := lb.backends[i]; entry.URL.String() == wanted {
			// Stop at the first match.
			entry.Down = false
			return nil
		}
	}
	return nil
}
// Reset drops every registered backend, leaving an empty (non-nil) list.
// The random generator is left untouched. It always returns nil.
func (lb *RandomBalancer) Reset() error {
	lb.mutex.Lock()
	defer lb.mutex.Unlock()

	fresh := []*Backend{}
	lb.backends = fresh
	return nil
}