Files
rpcx/client/selector.go
2025-01-29 01:04:03 +08:00

321 lines
7.0 KiB
Go

package client
import (
"container/ring"
"context"
"math"
"math/rand"
"net/url"
"sort"
"strconv"
"time"
"github.com/edwingeng/doublejump"
"github.com/valyala/fastrand"
)
type SelectFunc func(ctx context.Context, servicePath, serviceMethod string, args interface{}) string
// Selector defines selector that selects one service from candidates.
type Selector interface {
Select(ctx context.Context, servicePath, serviceMethod string, args interface{}) string // SelectFunc
UpdateServer(servers map[string]string)
}
func newSelector(selectMode SelectMode, servers map[string]string) Selector {
switch selectMode {
case RandomSelect:
return newRandomSelector(servers)
case RoundRobin:
return newRoundRobinSelector(servers)
case WeightedRoundRobin:
return newWeightedRoundRobinSelector(servers)
case WeightedICMP:
return newWeightedICMPSelector(servers)
case ConsistentHash:
return newConsistentHashSelector(servers)
case SelectByUser:
return nil
default:
return newRandomSelector(servers)
}
}
// randomSelector selects randomly.
type randomSelector struct {
servers []string
}
func newRandomSelector(servers map[string]string) Selector {
ss := make([]string, 0, len(servers))
for k := range servers {
ss = append(ss, k)
}
return &randomSelector{servers: ss}
}
func (s *randomSelector) Select(ctx context.Context, servicePath, serviceMethod string, args interface{}) string {
ss := s.servers
if len(ss) == 0 {
return ""
}
i := fastrand.Uint32n(uint32(len(ss)))
return ss[i]
}
func (s *randomSelector) UpdateServer(servers map[string]string) {
ss := make([]string, 0, len(servers))
for k := range servers {
ss = append(ss, k)
}
s.servers = ss
}
// roundRobinSelector selects servers with roundrobin.
type roundRobinSelector struct {
servers []string
i int
}
func newRoundRobinSelector(servers map[string]string) Selector {
ss := make([]string, 0, len(servers))
for k := range servers {
ss = append(ss, k)
}
return &roundRobinSelector{servers: ss}
}
func (s *roundRobinSelector) Select(ctx context.Context, servicePath, serviceMethod string, args interface{}) string {
ss := s.servers
if len(ss) == 0 {
return ""
}
i := s.i
i = i % len(ss)
s.i = i + 1
return ss[i]
}
func (s *roundRobinSelector) UpdateServer(servers map[string]string) {
ss := make([]string, 0, len(servers))
for k := range servers {
ss = append(ss, k)
}
s.servers = ss
}
// weightedRoundRobinSelector selects servers with weighted.
type weightedRoundRobinSelector struct {
servers []*Weighted
totalWeight int
rr *ring.Ring
}
func newWeightedRoundRobinSelector(servers map[string]string) Selector {
ss := createWeighted(servers)
s := &weightedRoundRobinSelector{servers: ss}
s.buildRing()
return s
}
func (s *weightedRoundRobinSelector) Select(ctx context.Context, servicePath, serviceMethod string, args interface{}) string {
ss := s.servers
if len(ss) == 0 {
return ""
}
val := s.rr.Value
s.rr = s.rr.Next()
return val.(*Weighted).Server
}
func (s *weightedRoundRobinSelector) UpdateServer(servers map[string]string) {
newServer := newWeightedRoundRobinSelector(servers).(*weightedRoundRobinSelector)
*s = *newServer
}
func (s *weightedRoundRobinSelector) buildRing() {
s.totalWeight = 0
for _, w := range s.servers {
s.totalWeight += w.Weight
}
s.rr = ring.New(s.totalWeight)
for i := 0; i < s.totalWeight; i++ {
n := s.next()
s.rr.Value = n
s.rr = s.rr.Next()
}
}
func (s *weightedRoundRobinSelector) next() *Weighted {
if len(s.servers) == 0 {
return nil
}
n := len(s.servers)
if n == 0 {
return nil
}
if n == 1 {
return s.servers[0]
}
flag := 0
m := 0
for i := 0; i < n; i++ {
s.servers[i].CurrentWeight += s.servers[i].Weight
if s.servers[i].CurrentWeight > m {
m = s.servers[i].CurrentWeight
flag = i
}
}
s.servers[flag].CurrentWeight -= s.totalWeight
return s.servers[flag]
}
func createWeighted(servers map[string]string) []*Weighted {
ss := make([]*Weighted, 0, len(servers))
for k, metadata := range servers {
w := &Weighted{Server: k, Weight: 1}
if v, err := url.ParseQuery(metadata); err == nil {
ww := v.Get("weight")
if ww != "" {
if weight, err := strconv.Atoi(ww); err == nil {
w.Weight = weight
}
}
}
ss = append(ss, w)
}
return ss
}
type geoServer struct {
Server string
Latitude float64
Longitude float64
}
// geoSelector selects servers based on location.
type geoSelector struct {
servers []*geoServer
Latitude float64
Longitude float64
r *rand.Rand
}
func newGeoSelector(servers map[string]string, latitude, longitude float64) Selector {
ss := createGeoServer(servers)
r := rand.New(rand.NewSource(time.Now().UnixNano()))
return &geoSelector{servers: ss, Latitude: latitude, Longitude: longitude, r: r}
}
func (s *geoSelector) Select(ctx context.Context, servicePath, serviceMethod string, args interface{}) string {
if len(s.servers) == 0 {
return ""
}
var server []string
minNum := math.MaxFloat64
for _, gs := range s.servers {
d := getDistanceFrom(s.Latitude, s.Longitude, gs.Latitude, gs.Longitude)
if d < minNum {
server = []string{gs.Server}
minNum = d
} else if d == minNum {
server = append(server, gs.Server)
}
}
if len(server) == 1 {
return server[0]
}
return server[s.r.Intn(len(server))]
}
func (s *geoSelector) UpdateServer(servers map[string]string) {
ss := createGeoServer(servers)
s.servers = ss
}
func createGeoServer(servers map[string]string) []*geoServer {
geoServers := make([]*geoServer, 0, len(servers))
for s, metadata := range servers {
if v, err := url.ParseQuery(metadata); err == nil {
latStr := v.Get("latitude")
lonStr := v.Get("longitude")
if latStr == "" || lonStr == "" {
continue
}
lat, err := strconv.ParseFloat(latStr, 64)
if err != nil {
continue
}
lon, err := strconv.ParseFloat(lonStr, 64)
if err != nil {
continue
}
geoServers = append(geoServers, &geoServer{Server: s, Latitude: lat, Longitude: lon})
}
}
return geoServers
}
// consistentHashSelector selects based on JumpConsistentHash.
type consistentHashSelector struct {
h *doublejump.Hash
servers []string
}
func newConsistentHashSelector(servers map[string]string) Selector {
h := doublejump.NewHash()
ss := make([]string, 0, len(servers))
for k := range servers {
ss = append(ss, k)
h.Add(k)
}
sort.Slice(ss, func(i, j int) bool { return ss[i] < ss[j] })
return &consistentHashSelector{servers: ss, h: h}
}
func (s *consistentHashSelector) Select(ctx context.Context, servicePath, serviceMethod string, args interface{}) string {
ss := s.servers
if len(ss) == 0 {
return ""
}
key := genKey(servicePath, serviceMethod, args)
selected, _ := s.h.Get(key).(string)
return selected
}
func (s *consistentHashSelector) UpdateServer(servers map[string]string) {
ss := make([]string, 0, len(servers))
for k := range servers {
s.h.Add(k)
ss = append(ss, k)
}
sort.Slice(ss, func(i, j int) bool { return ss[i] < ss[j] })
for _, k := range s.servers {
if _, exist := servers[k]; !exist { // remove
s.h.Remove(k)
}
}
s.servers = ss
}