diff --git a/app/api/api.go b/app/api/api.go
index bfb2b6bb..e1fb0b14 100644
--- a/app/api/api.go
+++ b/app/api/api.go
@@ -12,6 +12,7 @@ import (
 	"net/url"
 	"path/filepath"
 	"runtime/debug"
+	"strings"
 	"sync"
 	"time"
 
@@ -623,6 +624,46 @@ func (a *api) start() error {
 	a.restream = restream
 
 	if cfg.Cluster.Enable {
+		scheme := "http://"
+		address := cfg.Address
+
+		if cfg.TLS.Enable {
+			scheme = "https://"
+			address = cfg.TLS.Address
+		}
+
+		host, port, err := gonet.SplitHostPort(address)
+		if err != nil {
+			return fmt.Errorf("invalid core address: %s: %w", address, err)
+		}
+
+		if len(host) == 0 {
+			chost, _, err := gonet.SplitHostPort(cfg.Cluster.Address)
+			if err != nil {
+				return fmt.Errorf("invalid cluster address: %s: %w", cfg.Cluster.Address, err)
+			}
+
+			if len(chost) == 0 {
+				return fmt.Errorf("invalid cluster address: %s: host is missing", cfg.Cluster.Address)
+			}
+
+			host = chost
+		}
+
+		peers := []cluster.Peer{}
+
+		for _, p := range cfg.Cluster.Peers {
+			id, address, found := strings.Cut(p, "@")
+			if !found {
+				continue
+			}
+
+			peers = append(peers, cluster.Peer{
+				ID:      id,
+				Address: address,
+			})
+		}
+
 		cluster, err := cluster.New(cluster.ClusterConfig{
 			ID:        cfg.ID,
 			Name:      cfg.Name,
@@ -630,8 +671,8 @@ func (a *api) start() error {
 			Bootstrap:       cfg.Cluster.Bootstrap,
 			Recover:         cfg.Cluster.Recover,
 			Address:         cfg.Cluster.Address,
-			JoinAddress:     cfg.Cluster.JoinAddress,
-			CoreAPIAddress:  cfg.Address,
+			Peers:           peers,
+			CoreAPIAddress:  scheme + gonet.JoinHostPort(host, port),
 			CoreAPIUsername: cfg.API.Auth.Username,
 			CoreAPIPassword: cfg.API.Auth.Password,
 			IPLimiter:       a.sessionsLimiter,
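Note: the peer list arrives as comma-separated `id@host:port` entries (e.g. `CORE_CLUSTER_PEERS=node2@10.0.0.2:8000,node3@10.0.0.3:8000`). A minimal sketch of the parsing above, combining the config-side splitting with the loop from this hunk (IDs and addresses are hypothetical):

```go
raw := "node2@10.0.0.2:8000,node3@10.0.0.3:8000"

peers := []cluster.Peer{}
for _, p := range strings.Split(raw, ",") {
	id, address, found := strings.Cut(p, "@")
	if !found {
		continue // entries without an ID are skipped, as in the loop above
	}
	peers = append(peers, cluster.Peer{ID: id, Address: address})
}
// peers == [{node2 10.0.0.2:8000} {node3 10.0.0.3:8000}]
```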
diff --git a/cluster/api.go b/cluster/api.go
index 71e7268b..fd68f185 100644
--- a/cluster/api.go
+++ b/cluster/api.go
@@ -42,8 +42,6 @@ type JoinRequest struct {
 	ID          string `json:"id"`
 	RaftAddress string `json:"raft_address"`
 	APIAddress  string `json:"api_address"`
-	APIUsername string `json:"api_username"`
-	APIPassword string `json:"api_password"`
 }
 
 type LeaveRequest struct {
@@ -73,8 +71,8 @@ func NewAPI(config APIConfig) (API, error) {
 	a.router.Debug = true
 	a.router.HTTPErrorHandler = errorhandler.HTTPErrorHandler
 	a.router.Validator = validator.New()
-	a.router.HideBanner = false
-	a.router.HidePort = false
+	a.router.HideBanner = true
+	a.router.HidePort = true
 
 	mwlog.NewWithConfig(mwlog.Config{
 		Logger: a.logger,
@@ -105,7 +103,7 @@ func NewAPI(config APIConfig) (API, error) {
 			return httpapi.Err(http.StatusLoopDetected, "", "breaking circuit")
 		}
 
-		err := a.cluster.Join(r.Origin, r.ID, r.RaftAddress, r.APIAddress, r.APIUsername, r.APIPassword)
+		err := a.cluster.Join(r.Origin, r.ID, r.RaftAddress, r.APIAddress, "")
 		if err != nil {
 			a.logger.Debug().WithError(err).Log("unable to join cluster")
 			return httpapi.Err(http.StatusInternalServerError, "unable to join cluster", "%s", err)
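With the credential fields gone from JoinRequest, a join payload carries them only embedded in the API address. A hypothetical request as the forwarder builds it (all values illustrative):

```go
r := JoinRequest{
	Origin:      "node2",
	ID:          "node2",
	RaftAddress: "10.0.0.2:8000",
	APIAddress:  "http://admin:secret@10.0.0.2:8080", // credentials travel in the URL now
}
```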
diff --git a/cluster/cluster.go b/cluster/cluster.go
index cc5ef015..9ff7ff8b 100644
--- a/cluster/cluster.go
+++ b/cluster/cluster.go
@@ -9,6 +9,7 @@ import (
 	"fmt"
 	"io"
 	gonet "net"
+	"net/url"
 	"path/filepath"
 	"reflect"
 	"strconv"
@@ -69,33 +70,33 @@ type Cluster interface {
 	Addr() string
 	APIAddr(raftAddress string) (string, error)
 
-	Join(origin, id, raftAddress, apiAddress, apiUsername, apiPassword string) error
+	Join(origin, id, raftAddress, apiAddress, peerAddress string) error
 	Leave(origin, id string) error // gracefully remove a node from the cluster
 	Snapshot() ([]byte, error)
 	Shutdown() error
 
-	AddNode(id, address, username, password string) error
+	AddNode(id, address string) error
 	RemoveNode(id string) error
 	ListNodes() []addNodeCommand
 	GetNode(id string) (addNodeCommand, error)
 
-	AddNodeX(address, username, password string) (string, error)
-	RemoveNodeX(id string) error
-	ListNodesX() []NodeReader
-	GetNodeX(id string) (NodeReader, error)
-
 	ClusterReader
 }
 
+type Peer struct {
+	ID      string
+	Address string
+}
+
 type ClusterConfig struct {
-	ID          string // ID of the node
-	Name        string // Name of the node
-	Path        string // Path where to store all cluster data
-	Bootstrap   bool   // Whether to bootstrap a cluster
-	Recover     bool   // Whether to recover this node
-	Address     string // Listen address for the raft protocol
-	JoinAddress string // Address of a member of a cluster to join
+	ID        string // ID of the node
+	Name      string // Name of the node
+	Path      string // Path where to store all cluster data
+	Bootstrap bool   // Whether to bootstrap a cluster
+	Recover   bool   // Whether to recover this node
+	Address   string // Listen address for the raft protocol
+	Peers     []Peer // Cluster members to join, each as ID and raft address
 
 	CoreAPIAddress  string // Address of the core API
 	CoreAPIUsername string // Username for the core API
@@ -110,19 +111,6 @@ type cluster struct {
 	name string
 	path string
 
-	nodes    map[string]*node     // List of known nodes
-	idfiles  map[string][]string  // Map from nodeid to list of files
-	idupdate map[string]time.Time // Map from nodeid to time of last update
-	fileid   map[string]string    // Map from file name to nodeid
-
-	limiter net.IPLimiter
-
-	updates chan NodeState
-
-	lock   sync.RWMutex
-	cancel context.CancelFunc
-	once   sync.Once
-
 	logger log.Logger
 
 	raft *raft.Raft
@@ -133,7 +121,7 @@ type cluster struct {
 	raftStore             *raftboltdb.BoltStore
 	raftRemoveGracePeriod time.Duration
 
-	joinAddress string
+	peers []Peer
 
 	store Store
 
@@ -147,12 +135,9 @@ type cluster struct {
 	forwarder Forwarder
 	api       API
+	proxy     Proxy
 
-	core struct {
-		address  string
-		username string
-		password string
-	}
+	coreAddress string
 
 	isRaftLeader  bool
 	hasRaftLeader bool
@@ -162,33 +147,32 @@ func New(config ClusterConfig) (Cluster, error) {
 	c := &cluster{
-		id:       config.ID,
-		name:     config.Name,
-		path:     config.Path,
-		nodes:    map[string]*node{},
-		idfiles:  map[string][]string{},
-		idupdate: map[string]time.Time{},
-		fileid:   map[string]string{},
-		limiter:  config.IPLimiter,
-		updates:  make(chan NodeState, 64),
-		logger:   config.Logger,
+		id:     config.ID,
+		name:   config.Name,
+		path:   config.Path,
+		logger: config.Logger,
 
 		raftAddress: config.Address,
-		joinAddress: config.JoinAddress,
+		peers:       config.Peers,
 
 		reassertLeaderCh: make(chan chan error),
 		leaveCh:          make(chan struct{}),
 		shutdownCh:       make(chan struct{}),
 	}
 
-	c.core.address = config.CoreAPIAddress
-	c.core.username = config.CoreAPIUsername
-	c.core.password = config.CoreAPIPassword
-
-	if c.limiter == nil {
-		c.limiter = net.NewNullIPLimiter()
+	u, err := url.Parse(config.CoreAPIAddress)
+	if err != nil {
+		return nil, fmt.Errorf("invalid core API address: %w", err)
 	}
 
+	if len(config.CoreAPIPassword) == 0 {
+		u.User = url.User(config.CoreAPIUsername)
+	} else {
+		u.User = url.UserPassword(config.CoreAPIUsername, config.CoreAPIPassword)
+	}
+
+	c.coreAddress = u.String()
+
 	if c.logger == nil {
 		c.logger = log.New("")
 	}
@@ -215,6 +199,23 @@ func New(config ClusterConfig) (Cluster, error) {
 
 	c.api = api
 
+	proxy, err := NewProxy(ProxyConfig{
+		ID:        c.id,
+		Name:      c.name,
+		IPLimiter: config.IPLimiter,
+		Logger:    c.logger.WithField("logname", "proxy"),
+	})
+	if err != nil {
+		c.shutdownAPI()
+		return nil, err
+	}
+
+	go func(proxy Proxy) {
+		proxy.Start()
+	}(proxy)
+
+	c.proxy = proxy
+
 	if forwarder, err := NewForwarder(ForwarderConfig{
 		ID:     c.id,
 		Logger: c.logger.WithField("logname", "forwarder"),
@@ -227,73 +228,42 @@ func New(config ClusterConfig) (Cluster, error) {
 
 	c.logger.Debug().Log("starting raft")
 
-	err = c.startRaft(store, config.Bootstrap, config.Recover, false)
+	err = c.startRaft(store, config.Bootstrap, config.Recover, config.Peers, false)
 	if err != nil {
 		c.shutdownAPI()
 		return nil, err
 	}
 
-	if len(c.joinAddress) != 0 {
-		addr, _ := c.APIAddr(c.joinAddress)
-		c.forwarder.SetLeader(addr)
-
-		go func(addr string) {
-			ticker := time.NewTicker(time.Second)
-			defer ticker.Stop()
-
-			for {
-				select {
-				case <-c.shutdownCh:
-					return
-				case <-ticker.C:
-					c.logger.Debug().Log("joining cluster at %s", c.joinAddress)
-					err := c.Join("", c.id, c.raftAddress, c.core.address, c.core.username, c.core.password)
-					if err != nil {
-						c.logger.Warn().WithError(err).Log("unable to join %s", c.joinAddress)
-						continue
-					}
-
-					return
-				}
-			}
-		}(addr)
-	}
-
-	go func() {
-		for {
-			select {
-			case <-c.shutdownCh:
-				return
-			case state := <-c.updates:
-				c.logger.Debug().WithFields(log.Fields{
-					"node":  state.ID,
-					"state": state.State,
-					"files": len(state.Files),
-				}).Log("Got update")
-
-				c.lock.Lock()
-
-				// Cleanup
-				files := c.idfiles[state.ID]
-				for _, file := range files {
-					delete(c.fileid, file)
-				}
-				delete(c.idfiles, state.ID)
-				delete(c.idupdate, state.ID)
-
-				if state.State == "connected" {
-					// Add files
-					for _, file := range state.Files {
-						c.fileid[file] = state.ID
-					}
-					c.idfiles[state.ID] = files
-					c.idupdate[state.ID] = state.LastUpdate
-				}
-
-				c.lock.Unlock()
-			}
-		}
-	}()
+	if len(config.Peers) != 0 {
+		for i := 0; i < len(config.Peers); i++ {
+			peerAddress, err := c.APIAddr(config.Peers[i].Address)
+			if err != nil {
+				c.shutdownAPI()
+				c.shutdownRaft()
+				return nil, err
+			}
+
+			go func(peerAddress string) {
+				ticker := time.NewTicker(time.Second)
+				defer ticker.Stop()
+
+				for {
+					select {
+					case <-c.shutdownCh:
+						return
+					case <-ticker.C:
+						err := c.Join("", c.id, c.raftAddress, c.coreAddress, peerAddress)
+						if err != nil {
+							c.logger.Warn().WithError(err).Log("unable to join cluster")
+							continue
+						}
+
+						return
+					}
+				}
+			}(peerAddress)
+		}
+	}
 
 	return c, nil
 }
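For reference, the credential embedding above relies entirely on net/url; a self-contained sketch with hypothetical values:

```go
package main

import (
	"fmt"
	"net/url"
)

func main() {
	u, err := url.Parse("http://10.0.0.1:8080") // hypothetical core API address
	if err != nil {
		panic(err)
	}
	u.User = url.UserPassword("admin", "secret")
	fmt.Println(u.String()) // http://admin:secret@10.0.0.1:8080
}
```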
@@ -329,15 +299,11 @@ func (c *cluster) Shutdown() error {
 	c.shutdown = true
 	close(c.shutdownCh)
 
-	c.lock.Lock()
-	defer c.lock.Unlock()
-
-	for _, node := range c.nodes {
-		node.stop()
+	if c.proxy != nil {
+		c.proxy.Stop()
 	}
 
-	c.nodes = map[string]*node{}
-
+	c.shutdownAPI()
 	c.shutdownRaft()
 
 	return nil
@@ -488,10 +454,10 @@ func (c *cluster) Leave(origin, id string) error {
 	return nil
 }
 
-func (c *cluster) Join(origin, id, raftAddress, apiAddress, apiUsername, apiPassword string) error {
+func (c *cluster) Join(origin, id, raftAddress, apiAddress, peerAddress string) error {
 	if !c.IsRaftLeader() {
 		c.logger.Debug().Log("not leader, forwarding to leader")
-		return c.forwarder.Join(origin, id, raftAddress, apiAddress, apiUsername, apiPassword)
+		return c.forwarder.Join(origin, id, raftAddress, apiAddress, peerAddress)
 	}
 
 	c.logger.Debug().Log("leader: joining %s", raftAddress)
@@ -501,12 +467,16 @@ func (c *cluster) Join(origin, id, raftAddress, apiAddress, peerAddress string) error {
 		"address": raftAddress,
 	}).Log("received join request for remote node")
 
+	// connect to the peer's API in order to find out if our version is compatible
+
 	configFuture := c.raft.GetConfiguration()
 	if err := configFuture.Error(); err != nil {
 		c.logger.Error().WithError(err).Log("failed to get raft configuration")
 		return err
 	}
 
+	nodeExists := false
+
 	for _, srv := range configFuture.Configuration().Servers {
 		// If a node already exists with either the joining node's ID or address,
 		// that node may need to be removed from the config first.
@@ -514,30 +484,32 @@ func (c *cluster) Join(origin, id, raftAddress, apiAddress, peerAddress string) error {
 		// However if *both* the ID and the address are the same, then nothing -- not even
 		// a join operation -- is needed.
 			if srv.ID == raft.ServerID(id) && srv.Address == raft.ServerAddress(raftAddress) {
+				nodeExists = true
 				c.logger.Debug().WithFields(log.Fields{
 					"nodeid":  id,
 					"address": raftAddress,
 				}).Log("node is already member of cluster, ignoring join request")
-				return nil
-			}
-
-			future := c.raft.RemoveServer(srv.ID, 0, 0)
-			if err := future.Error(); err != nil {
-				c.logger.Error().WithError(err).WithFields(log.Fields{
-					"nodeid":  id,
-					"address": raftAddress,
-				}).Log("error removing existing node")
-				return fmt.Errorf("error removing existing node %s at %s: %w", id, raftAddress, err)
+			} else {
+				future := c.raft.RemoveServer(srv.ID, 0, 0)
+				if err := future.Error(); err != nil {
+					c.logger.Error().WithError(err).WithFields(log.Fields{
+						"nodeid":  id,
+						"address": raftAddress,
+					}).Log("error removing existing node")
+					return fmt.Errorf("error removing existing node %s at %s: %w", id, raftAddress, err)
+				}
			}
 		}
 	}
 
-	f := c.raft.AddVoter(raft.ServerID(id), raft.ServerAddress(raftAddress), 0, 0)
-	if err := f.Error(); err != nil {
-		return err
+	if !nodeExists {
+		f := c.raft.AddVoter(raft.ServerID(id), raft.ServerAddress(raftAddress), 0, 0)
+		if err := f.Error(); err != nil {
+			return err
+		}
 	}
 
-	if err := c.AddNode(id, apiAddress, apiUsername, apiPassword); err != nil {
+	if err := c.AddNode(id, apiAddress); err != nil {
 		/*
 			future := c.raft.RemoveServer(raft.ServerID(id), 0, 0)
 			if err := future.Error(); err != nil {
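Each per-peer goroutine in New() above retries this call once per second until the join succeeds. Spelled out with hypothetical values (the peer API address is whatever c.APIAddr returns for the configured raft address):

```go
// origin is empty because the node joins on its own behalf; the last argument
// is the API address of the configured peer that should receive the request.
err := c.Join("", c.id, c.raftAddress, c.coreAddress, "10.0.0.1:9000")
```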
@@ -616,7 +588,7 @@ func (c *cluster) GetNode(id string) (addNodeCommand, error) {
 	return addNodeCommand{}, nil
 }
 
-func (c *cluster) AddNode(id, address, username, password string) error {
+func (c *cluster) AddNode(id, address string) error {
 	if !c.IsRaftLeader() {
 		return fmt.Errorf("not leader")
 	}
@@ -624,10 +596,8 @@ func (c *cluster) AddNode(id, address string) error {
 	com := &command{
 		Operation: opAddNode,
 		Data: &addNodeCommand{
-			ID:       id,
-			Address:  address,
-			Username: username,
-			Password: password,
+			ID:      id,
+			Address: address,
 		},
 	}
 
@@ -717,171 +687,7 @@ func (c *cluster) trackLeaderChanges() {
 	}
 }
 
-func (c *cluster) AddNodeX(address, username, password string) (string, error) {
-	node, err := newNode(address, username, password, c.updates)
-	if err != nil {
-		return "", err
-	}
-
-	id := node.ID()
-
-	if id == c.id {
-		return "", fmt.Errorf("can't add myself as node or a node with the same ID")
-	}
-
-	c.lock.Lock()
-	defer c.lock.Unlock()
-
-	if _, ok := c.nodes[id]; ok {
-		node.stop()
-		return id, nil
-	}
-
-	ips := node.IPs()
-	for _, ip := range ips {
-		c.limiter.AddBlock(ip)
-	}
-
-	c.nodes[id] = node
-
-	c.logger.Info().WithFields(log.Fields{
-		"address": address,
-		"id":      id,
-	}).Log("Added node")
-
-	return id, nil
-}
-
-func (c *cluster) RemoveNodeX(id string) error {
-	c.lock.Lock()
-	defer c.lock.Unlock()
-
-	node, ok := c.nodes[id]
-	if !ok {
-		return ErrNodeNotFound
-	}
-
-	node.stop()
-
-	delete(c.nodes, id)
-
-	ips := node.IPs()
-
-	for _, ip := range ips {
-		c.limiter.RemoveBlock(ip)
-	}
-
-	c.Leave("", id)
-
-	c.logger.Info().WithFields(log.Fields{
-		"id": id,
-	}).Log("Removed node")
-
-	return nil
-}
-
-func (c *cluster) ListNodesX() []NodeReader {
-	list := []NodeReader{}
-
-	c.lock.RLock()
-	defer c.lock.RUnlock()
-
-	for _, node := range c.nodes {
-		list = append(list, node)
-	}
-
-	return list
-}
-
-func (c *cluster) GetNodeX(id string) (NodeReader, error) {
-	c.lock.RLock()
-	defer c.lock.RUnlock()
-
-	node, ok := c.nodes[id]
-	if !ok {
-		return nil, fmt.Errorf("node not found")
-	}
-
-	return node, nil
-}
-
-func (c *cluster) GetURL(path string) (string, error) {
-	c.lock.RLock()
-	defer c.lock.RUnlock()
-
-	id, ok := c.fileid[path]
-	if !ok {
-		c.logger.Debug().WithField("path", path).Log("Not found")
-		return "", fmt.Errorf("file not found")
-	}
-
-	ts, ok := c.idupdate[id]
-	if !ok {
-		c.logger.Debug().WithField("path", path).Log("No age information found")
-		return "", fmt.Errorf("file not found")
-	}
-
-	if time.Since(ts) > 2*time.Second {
-		c.logger.Debug().WithField("path", path).Log("File too old")
-		return "", fmt.Errorf("file not found")
-	}
-
-	node, ok := c.nodes[id]
-	if !ok {
-		c.logger.Debug().WithField("path", path).Log("Unknown node")
-		return "", fmt.Errorf("file not found")
-	}
-
-	url, err := node.getURL(path)
-	if err != nil {
-		c.logger.Debug().WithField("path", path).Log("Invalid path")
-		return "", fmt.Errorf("file not found")
-	}
-
-	c.logger.Debug().WithField("url", url).Log("File cluster url")
-
-	return url, nil
-}
-
-func (c *cluster) GetFile(path string) (io.ReadCloser, error) {
-	c.lock.RLock()
-	defer c.lock.RUnlock()
-
-	id, ok := c.fileid[path]
-	if !ok {
-		c.logger.Debug().WithField("path", path).Log("Not found")
-		return nil, fmt.Errorf("file not found")
-	}
-
-	ts, ok := c.idupdate[id]
-	if !ok {
-		c.logger.Debug().WithField("path", path).Log("No age information found")
-		return nil, fmt.Errorf("file not found")
-	}
-
-	if time.Since(ts) > 2*time.Second {
-		c.logger.Debug().WithField("path", path).Log("File too old")
-		return nil, fmt.Errorf("file not found")
-	}
-
-	node, ok := c.nodes[id]
-	if !ok {
-		c.logger.Debug().WithField("path", path).Log("Unknown node")
-		return nil, fmt.Errorf("file not found")
-	}
-
-	data, err := node.getFile(path)
-	if err != nil {
-		c.logger.Debug().WithField("path", path).Log("Invalid path")
-		return nil, fmt.Errorf("file not found")
-	}
-
-	c.logger.Debug().WithField("path", path).Log("File cluster path")
-
-	return data, nil
-}
-
-func (c *cluster) startRaft(fsm raft.FSM, bootstrap, recover, inmem bool) error {
+func (c *cluster) startRaft(fsm raft.FSM, bootstrap, recover bool, peers []Peer, inmem bool) error {
 	defer func() {
 		if c.raft == nil && c.raftStore != nil {
 			c.raftStore.Close()
@@ -948,16 +754,26 @@ func (c *cluster) startRaft(fsm raft.FSM, bootstrap, recover bool, peers []Peer, inmem bool) error {
 
 	if !hasState {
 		// Bootstrap cluster
-		configuration := raft.Configuration{
-			Servers: []raft.Server{
-				{
-					Suffrage: raft.Voter,
-					ID:       raft.ServerID(c.id),
-					Address:  transport.LocalAddr(),
-				},
+		servers := []raft.Server{
+			{
+				Suffrage: raft.Voter,
+				ID:       raft.ServerID(c.id),
+				Address:  transport.LocalAddr(),
 			},
 		}
 
+		for _, p := range peers {
+			servers = append(servers, raft.Server{
+				Suffrage: raft.Voter,
+				ID:       raft.ServerID(p.ID),
+				Address:  raft.ServerAddress(p.Address),
+			})
+		}
+
+		configuration := raft.Configuration{
+			Servers: servers,
+		}
+
 		if err := raft.BootstrapCluster(cfg, logStore, stableStore, snapshots, transport, configuration); err != nil {
 			return err
 		}
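One assumption worth spelling out for the bootstrap path: since each node builds its initial raft.Configuration locally from itself plus its peer list, the nodes should be configured so that `{self} ∪ peers` names the same member set everywhere.

```go
// Illustrative addresses; the symmetry requirement is an assumption of this
// reading, not something enforced by the code above.
// node1 started with Peers: []cluster.Peer{{ID: "node2", Address: "10.0.0.2:8000"}}
// node2 started with Peers: []cluster.Peer{{ID: "node1", Address: "10.0.0.1:8000"}}
// Both then bootstrap the identical configuration {node1, node2}.
```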
@@ -970,16 +786,26 @@ func (c *cluster) startRaft(fsm raft.FSM, bootstrap, recover bool, peers []Peer, inmem bool) error {
 			return err
 		}
 
-		configuration := raft.Configuration{
-			Servers: []raft.Server{
-				{
-					Suffrage: raft.Voter,
-					ID:       raft.ServerID(c.id),
-					Address:  transport.LocalAddr(),
-				},
+		servers := []raft.Server{
+			{
+				Suffrage: raft.Voter,
+				ID:       raft.ServerID(c.id),
+				Address:  transport.LocalAddr(),
 			},
 		}
 
+		for _, p := range peers {
+			servers = append(servers, raft.Server{
+				Suffrage: raft.Voter,
+				ID:       raft.ServerID(p.ID),
+				Address:  raft.ServerAddress(p.Address),
+			})
+		}
+
+		configuration := raft.Configuration{
+			Servers: servers,
+		}
+
 		if err := raft.RecoverCluster(cfg, fsm, logStore, stableStore, snapshots, transport, configuration); err != nil {
 			return err
 		}
@@ -1123,3 +949,11 @@ func (c *cluster) sentinel() {
 		}
 	}
 }
+
+func (c *cluster) GetURL(path string) (string, error) {
+	return c.proxy.GetURL(path)
+}
+
+func (c *cluster) GetFile(path string) (io.ReadCloser, error) {
+	return c.proxy.GetFile(path)
+}
diff --git a/cluster/forwarder.go b/cluster/forwarder.go
index 1775184a..3171d018 100644
--- a/cluster/forwarder.go
+++ b/cluster/forwarder.go
@@ -17,7 +17,7 @@ import (
 type Forwarder interface {
 	SetLeader(address string)
 	HasLeader() bool
-	Join(origin, id, raftAddress, apiAddress, apiUsername, apiPassword string) error
+	Join(origin, id, raftAddress, apiAddress, peerAddress string) error
 	Leave(origin, id string) error
 	Snapshot() ([]byte, error)
 	AddProcess() error
@@ -82,7 +82,7 @@ func (f *forwarder) HasLeader() bool {
 	return len(f.leaderAddr) != 0
 }
 
-func (f *forwarder) Join(origin, id, raftAddress, apiAddress, apiUsername, apiPassword string) error {
+func (f *forwarder) Join(origin, id, raftAddress, apiAddress, peerAddress string) error {
 	if origin == "" {
 		origin = f.id
 	}
@@ -92,8 +92,6 @@ func (f *forwarder) Join(origin, id, raftAddress, apiAddress, peerAddress string) error {
 		ID:          id,
 		RaftAddress: raftAddress,
 		APIAddress:  apiAddress,
-		APIUsername: apiUsername,
-		APIPassword: apiPassword,
 	}
 
 	f.logger.Debug().WithField("request", r).Log("forwarding to leader")
@@ -103,7 +101,7 @@ func (f *forwarder) Join(origin, id, raftAddress, apiAddress, peerAddress string) error {
 		return err
 	}
 
-	_, err = f.call(http.MethodPost, "/join", "application/json", bytes.NewReader(data))
+	_, err = f.call(http.MethodPost, "/join", "application/json", bytes.NewReader(data), peerAddress)
 
 	return err
 }
@@ -125,13 +123,13 @@ func (f *forwarder) Leave(origin, id string) error {
 		return err
 	}
 
-	_, err = f.call(http.MethodPost, "/leave", "application/json", bytes.NewReader(data))
+	_, err = f.call(http.MethodPost, "/leave", "application/json", bytes.NewReader(data), "")
 
 	return err
 }
 
 func (f *forwarder) Snapshot() ([]byte, error) {
-	r, err := f.stream(http.MethodGet, "/snapshot", "", nil)
+	r, err := f.stream(http.MethodGet, "/snapshot", "", nil, "")
 	if err != nil {
 		return nil, err
 	}
@@ -156,13 +154,18 @@ func (f *forwarder) RemoveProcess() error {
 	return fmt.Errorf("not implemented")
 }
 
-func (f *forwarder) stream(method, path, contentType string, data io.Reader) (io.ReadCloser, error) {
-	if len(f.leaderAddr) == 0 {
+func (f *forwarder) stream(method, path, contentType string, data io.Reader, leaderOverride string) (io.ReadCloser, error) {
+	leaderAddr := f.leaderAddr
+	if len(leaderOverride) != 0 {
+		leaderAddr = leaderOverride
+	}
+
+	if len(leaderAddr) == 0 {
 		return nil, fmt.Errorf("no leader address defined")
 	}
 
 	f.lock.RLock()
-	address := "http://" + f.leaderAddr + "/v1" + path
+	address := "http://" + leaderAddr + "/v1" + path
 	f.lock.RUnlock()
 
 	f.logger.Debug().Log("address: %s", address)
@@ -196,8 +199,8 @@ func (f *forwarder) stream(method, path, contentType string, data io.Reader, leaderOverride string) (io.ReadCloser, error) {
 	return body, nil
 }
 
-func (f *forwarder) call(method, path, contentType string, data io.Reader) ([]byte, error) {
-	body, err := f.stream(method, path, contentType, data)
+func (f *forwarder) call(method, path, contentType string, data io.Reader, leaderOverride string) ([]byte, error) {
+	body, err := f.stream(method, path, contentType, data, leaderOverride)
 	if err != nil {
 		return nil, err
 	}
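Note that stream() still prepends `http://` and `/v1` itself, so leaderOverride is expected to be a bare `host:port` API address, not a URL. A hypothetical direct-to-peer call, mirroring what Join does above:

```go
// Illustrative only: the override is a peer API address (host:port), used
// during an initial join when no leader is tracked yet.
_, err := f.call(http.MethodPost, "/join", "application/json", bytes.NewReader(data), "10.0.0.1:9000")
```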
diff --git a/cluster/leader.go b/cluster/leader.go
index 59f279ea..3fbd8a19 100644
--- a/cluster/leader.go
+++ b/cluster/leader.go
@@ -141,8 +141,6 @@ func (c *cluster) monitorLeadership() {
 			c.isRaftLeader = true
 			c.isLeader = true
 			c.leaderLock.Unlock()
-
-			c.AddNode(c.id, c.core.address, c.core.username, c.core.password)
 		} else if notification == NOTIFY_EMERGENCY {
 			if weAreEmergencyLeaderCh != nil {
 				// we are already emergency leader, don't do anything
diff --git a/cluster/node.go b/cluster/node.go
index 4f093c88..ecb3e154 100644
--- a/cluster/node.go
+++ b/cluster/node.go
@@ -29,6 +29,11 @@ type NodeState struct {
 	LastUpdate time.Time
 }
 
+type NodeSpecs struct {
+	ID      string
+	Address string
+}
+
 type nodeState string
 
 func (n nodeState) String() string {
@@ -67,7 +72,7 @@ type node struct {
 	prefix *regexp.Regexp
 }
 
-func newNode(address, username, password string, updates chan<- NodeState) (*node, error) {
+func newNode(address string, updates chan<- NodeState) (*node, error) {
 	u, err := url.Parse(address)
 	if err != nil {
 		return nil, fmt.Errorf("invalid address: %w", err)
	}
@@ -83,6 +88,12 @@ func newNode(address string, updates chan<- NodeState) (*node, error) {
 		return nil, fmt.Errorf("lookup failed: %w", err)
 	}
 
+	username := u.User.Username()
+	password := ""
+	if pw, ok := u.User.Password(); ok {
+		password = pw
+	}
+
 	n := &node{
 		address: address,
 		ips:     addrs,
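newNode now pulls credentials out of the address URL itself, using the standard userinfo accessors; a self-contained sketch with hypothetical values:

```go
package main

import (
	"fmt"
	"net/url"
)

func main() {
	u, _ := url.Parse("http://admin:secret@10.0.0.2:8080") // hypothetical node address
	username := u.User.Username()
	password, _ := u.User.Password()
	fmt.Println(username, password) // admin secret
}
```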
diff --git a/cluster/proxy.go b/cluster/proxy.go
new file mode 100644
index 00000000..28e1a2aa
--- /dev/null
+++ b/cluster/proxy.go
@@ -0,0 +1,311 @@
+package cluster
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"sync"
+	"time"
+
+	"github.com/datarhei/core/v16/log"
+	"github.com/datarhei/core/v16/net"
+)
+
+type Proxy interface {
+	Start()
+	Stop()
+
+	AddNode(address string) (string, error)
+	RemoveNode(id string) error
+	ListNodes() []NodeReader
+	GetNode(id string) (NodeReader, error)
+
+	GetURL(path string) (string, error)
+	GetFile(path string) (io.ReadCloser, error)
+}
+
+type ProxyConfig struct {
+	ID   string // ID of the node
+	Name string // Name of the node
+
+	IPLimiter net.IPLimiter
+	Logger    log.Logger
+}
+
+type proxy struct {
+	id   string
+	name string
+
+	nodes    map[string]*node     // List of known nodes
+	idfiles  map[string][]string  // Map from nodeid to list of files
+	idupdate map[string]time.Time // Map from nodeid to time of last update
+	fileid   map[string]string    // Map from file name to nodeid
+
+	limiter net.IPLimiter
+
+	updates chan NodeState
+
+	lock   sync.RWMutex
+	cancel context.CancelFunc
+
+	running bool
+
+	logger log.Logger
+}
+
+func NewProxy(config ProxyConfig) (Proxy, error) {
+	p := &proxy{
+		id:       config.ID,
+		name:     config.Name,
+		nodes:    map[string]*node{},
+		idfiles:  map[string][]string{},
+		idupdate: map[string]time.Time{},
+		fileid:   map[string]string{},
+		limiter:  config.IPLimiter,
+		updates:  make(chan NodeState, 64),
+		logger:   config.Logger,
+	}
+
+	if p.limiter == nil {
+		p.limiter = net.NewNullIPLimiter()
+	}
+
+	if p.logger == nil {
+		p.logger = log.New("")
+	}
+
+	return p, nil
+}
+
+func (p *proxy) Start() {
+	p.lock.Lock()
+	defer p.lock.Unlock()
+
+	if p.running {
+		return
+	}
+
+	p.running = true
+
+	p.logger.Debug().Log("starting proxy")
+
+	ctx, cancel := context.WithCancel(context.Background())
+	p.cancel = cancel
+
+	go func(ctx context.Context) {
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case state := <-p.updates:
+				p.logger.Debug().WithFields(log.Fields{
+					"node":  state.ID,
+					"state": state.State,
+					"files": len(state.Files),
+				}).Log("Got update")
+
+				p.lock.Lock()
+
+				// Cleanup
+				files := p.idfiles[state.ID]
+				for _, file := range files {
+					delete(p.fileid, file)
+				}
+				delete(p.idfiles, state.ID)
+				delete(p.idupdate, state.ID)
+
+				if state.State == "connected" {
+					// Add files
+					for _, file := range state.Files {
+						p.fileid[file] = state.ID
+					}
+					// Remember the node's current file list
+					p.idfiles[state.ID] = state.Files
+					p.idupdate[state.ID] = state.LastUpdate
+				}
+
+				p.lock.Unlock()
+			}
+		}
+	}(ctx)
+}
+
+func (p *proxy) Stop() {
+	p.lock.Lock()
+	defer p.lock.Unlock()
+
+	if !p.running {
+		return
+	}
+
+	p.running = false
+
+	p.logger.Debug().Log("stopping proxy")
+
+	// Stop the update goroutine started in Start()
+	p.cancel()
+
+	for _, node := range p.nodes {
+		node.stop()
+	}
+
+	p.nodes = map[string]*node{}
+}
+
+func (p *proxy) AddNode(address string) (string, error) {
+	node, err := newNode(address, p.updates)
+	if err != nil {
+		return "", err
+	}
+
+	id := node.ID()
+
+	if id == p.id {
+		return "", fmt.Errorf("can't add myself as node or a node with the same ID")
+	}
+
+	p.lock.Lock()
+	defer p.lock.Unlock()
+
+	if _, ok := p.nodes[id]; ok {
+		node.stop()
+		return id, nil
+	}
+
+	ips := node.IPs()
+	for _, ip := range ips {
+		p.limiter.AddBlock(ip)
+	}
+
+	p.nodes[id] = node
+
+	p.logger.Info().WithFields(log.Fields{
+		"address": address,
+		"id":      id,
+	}).Log("Added node")
+
+	return id, nil
+}
+
+func (p *proxy) RemoveNode(id string) error {
+	p.lock.Lock()
+	defer p.lock.Unlock()
+
+	node, ok := p.nodes[id]
+	if !ok {
+		return ErrNodeNotFound
+	}
+
+	node.stop()
+
+	delete(p.nodes, id)
+
+	ips := node.IPs()
+
+	for _, ip := range ips {
+		p.limiter.RemoveBlock(ip)
+	}
+
+	p.logger.Info().WithFields(log.Fields{
+		"id": id,
+	}).Log("Removed node")
+
+	return nil
+}
+
+func (p *proxy) ListNodes() []NodeReader {
+	list := []NodeReader{}
+
+	p.lock.RLock()
+	defer p.lock.RUnlock()
+
+	for _, node := range p.nodes {
+		list = append(list, node)
+	}
+
+	return list
+}
+
+func (p *proxy) GetNode(id string) (NodeReader, error) {
+	p.lock.RLock()
+	defer p.lock.RUnlock()
+
+	node, ok := p.nodes[id]
+	if !ok {
+		return nil, fmt.Errorf("node not found")
+	}
+
+	return node, nil
+}
+
+func (p *proxy) GetURL(path string) (string, error) {
+	p.lock.RLock()
+	defer p.lock.RUnlock()
+
+	id, ok := p.fileid[path]
+	if !ok {
+		p.logger.Debug().WithField("path", path).Log("Not found")
+		return "", fmt.Errorf("file not found")
+	}
+
+	ts, ok := p.idupdate[id]
+	if !ok {
+		p.logger.Debug().WithField("path", path).Log("No age information found")
+		return "", fmt.Errorf("file not found")
+	}
+
+	if time.Since(ts) > 2*time.Second {
+		p.logger.Debug().WithField("path", path).Log("File too old")
+		return "", fmt.Errorf("file not found")
+	}
+
+	node, ok := p.nodes[id]
+	if !ok {
+		p.logger.Debug().WithField("path", path).Log("Unknown node")
+		return "", fmt.Errorf("file not found")
+	}
+
+	url, err := node.getURL(path)
+	if err != nil {
+		p.logger.Debug().WithField("path", path).Log("Invalid path")
+		return "", fmt.Errorf("file not found")
+	}
+
+	p.logger.Debug().WithField("url", url).Log("File cluster url")
+
+	return url, nil
+}
+
+func (p *proxy) GetFile(path string) (io.ReadCloser, error) {
+	p.lock.RLock()
+	defer p.lock.RUnlock()
+
+	id, ok := p.fileid[path]
+	if !ok {
+		p.logger.Debug().WithField("path", path).Log("Not found")
+		return nil, fmt.Errorf("file not found")
+	}
+
+	ts, ok := p.idupdate[id]
+	if !ok {
+		p.logger.Debug().WithField("path", path).Log("No age information found")
+		return nil, fmt.Errorf("file not found")
+	}
+
+	if time.Since(ts) > 2*time.Second {
+		p.logger.Debug().WithField("path", path).Log("File too old")
+		return nil, fmt.Errorf("file not found")
+	}
+
+	node, ok := p.nodes[id]
+	if !ok {
+		p.logger.Debug().WithField("path", path).Log("Unknown node")
+		return nil, fmt.Errorf("file not found")
+	}
+
+	data, err := node.getFile(path)
+	if err != nil {
+		p.logger.Debug().WithField("path", path).Log("Invalid path")
+		return nil, fmt.Errorf("file not found")
+	}
+
+	p.logger.Debug().WithField("path", path).Log("File cluster path")
+
+	return data, nil
+}
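A minimal lifecycle sketch for the new Proxy. GetURL only resolves paths that a connected node has reported via its state updates within the last two seconds; addresses and paths below are hypothetical, error handling is elided:

```go
proxy, _ := NewProxy(ProxyConfig{ID: "node1", Name: "core1"})
proxy.Start()
defer proxy.Stop()

// Register a remote core; credentials are carried in the address URL.
id, _ := proxy.AddNode("http://admin:secret@10.0.0.2:8080")
_ = id

// Resolve a file that the remote node has reported.
url, err := proxy.GetURL("/memfs/stream.m3u8")
_, _ = url, err
```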
fmt.Errorf("file not found") + } + + ts, ok := p.idupdate[id] + if !ok { + p.logger.Debug().WithField("path", path).Log("No age information found") + return nil, fmt.Errorf("file not found") + } + + if time.Since(ts) > 2*time.Second { + p.logger.Debug().WithField("path", path).Log("File too old") + return nil, fmt.Errorf("file not found") + } + + node, ok := p.nodes[id] + if !ok { + p.logger.Debug().WithField("path", path).Log("Unknown node") + return nil, fmt.Errorf("file not found") + } + + data, err := node.getFile(path) + if err != nil { + p.logger.Debug().WithField("path", path).Log("Invalid path") + return nil, fmt.Errorf("file not found") + } + + p.logger.Debug().WithField("path", path).Log("File cluster path") + + return data, nil +} diff --git a/cluster/store.go b/cluster/store.go index ac84ffac..6322bd40 100644 --- a/cluster/store.go +++ b/cluster/store.go @@ -31,10 +31,8 @@ type command struct { } type addNodeCommand struct { - ID string - Address string - Username string - Password string + ID string + Address string } type removeNodeCommand struct { @@ -93,6 +91,10 @@ func (s *store) Apply(log *raft.Log) interface{} { delete(s.Nodes, cmd.ID) s.lock.Unlock() } + + s.lock.RLock() + fmt.Printf("\n==> %+v\n\n", s.Nodes) + s.lock.RUnlock() return nil } diff --git a/config/config.go b/config/config.go index 2591766f..dd1aa0cb 100644 --- a/config/config.go +++ b/config/config.go @@ -135,6 +135,8 @@ func (d *Config) Clone() *Config { data.Router.BlockedPrefixes = copy.Slice(d.Router.BlockedPrefixes) data.Router.Routes = copy.StringMap(d.Router.Routes) + data.Cluster.Peers = copy.Slice(d.Cluster.Peers) + data.vars.Transfer(&d.vars) return data @@ -278,8 +280,8 @@ func (d *Config) init() { d.vars.Register(value.NewBool(&d.Cluster.Bootstrap, false), "cluster.bootstrap", "CORE_CLUSTER_BOOTSTRAP", nil, "Bootstrap a cluster", false, false) d.vars.Register(value.NewBool(&d.Cluster.Recover, false), "cluster.recover", "CORE_CLUSTER_RECOVER", nil, "Recover a cluster", false, false) d.vars.Register(value.NewBool(&d.Cluster.Debug, false), "cluster.debug", "CORE_CLUSTER_DEBUG", nil, "Switch to debug mode, not for production", false, false) - d.vars.Register(value.NewAddress(&d.Cluster.Address, ":8000"), "cluster.address", "CORE_CLUSTER_ADDRESS", nil, "Raft listen address", false, true) - d.vars.Register(value.NewString(&d.Cluster.JoinAddress, ""), "cluster.join_address", "CORE_CLUSTER_JOIN_ADDRESS", nil, "Raft address of a core that is part of the cluster", false, true) + d.vars.Register(value.NewClusterAddress(&d.Cluster.Address, ""), "cluster.address", "CORE_CLUSTER_ADDRESS", nil, "Raft listen address", false, true) + d.vars.Register(value.NewClusterPeerList(&d.Cluster.Peers, []string{""}, ","), "cluster.peers", "CORE_CLUSTER_PEERS", nil, "Raft address of a cores that are part of the cluster", false, true) } // Validate validates the current state of the Config for completeness and sanity. 
diff --git a/config/config.go b/config/config.go
index 2591766f..dd1aa0cb 100644
--- a/config/config.go
+++ b/config/config.go
@@ -135,6 +135,8 @@ func (d *Config) Clone() *Config {
 	data.Router.BlockedPrefixes = copy.Slice(d.Router.BlockedPrefixes)
 	data.Router.Routes = copy.StringMap(d.Router.Routes)
 
+	data.Cluster.Peers = copy.Slice(d.Cluster.Peers)
+
 	data.vars.Transfer(&d.vars)
 
 	return data
@@ -278,8 +280,8 @@ func (d *Config) init() {
 	d.vars.Register(value.NewBool(&d.Cluster.Bootstrap, false), "cluster.bootstrap", "CORE_CLUSTER_BOOTSTRAP", nil, "Bootstrap a cluster", false, false)
 	d.vars.Register(value.NewBool(&d.Cluster.Recover, false), "cluster.recover", "CORE_CLUSTER_RECOVER", nil, "Recover a cluster", false, false)
 	d.vars.Register(value.NewBool(&d.Cluster.Debug, false), "cluster.debug", "CORE_CLUSTER_DEBUG", nil, "Switch to debug mode, not for production", false, false)
-	d.vars.Register(value.NewAddress(&d.Cluster.Address, ":8000"), "cluster.address", "CORE_CLUSTER_ADDRESS", nil, "Raft listen address", false, true)
-	d.vars.Register(value.NewString(&d.Cluster.JoinAddress, ""), "cluster.join_address", "CORE_CLUSTER_JOIN_ADDRESS", nil, "Raft address of a core that is part of the cluster", false, true)
+	d.vars.Register(value.NewClusterAddress(&d.Cluster.Address, ""), "cluster.address", "CORE_CLUSTER_ADDRESS", nil, "Raft listen address", false, true)
+	d.vars.Register(value.NewClusterPeerList(&d.Cluster.Peers, []string{""}, ","), "cluster.peers", "CORE_CLUSTER_PEERS", nil, "Raft addresses of cores that are part of the cluster", false, true)
 }
 
 // Validate validates the current state of the Config for completeness and sanity. Errors are
@@ -462,8 +464,8 @@ func (d *Config) Validate(resetLogs bool) {
 
-	// If cluster mode is enabled, we can't join and bootstrap at the same time
+	// If cluster mode is enabled, a raft listen address must be provided
 	if d.Cluster.Enable {
-		if d.Cluster.Bootstrap && len(d.Cluster.JoinAddress) != 0 {
-			d.vars.Log("error", "cluster.join_address", "can't be set if cluster.bootstrap is enabled")
+		if len(d.Cluster.Address) == 0 {
+			d.vars.Log("error", "cluster.address", "must be provided")
 		}
 	}
 }
diff --git a/config/data.go b/config/data.go
index f7a38188..8899e418 100644
--- a/config/data.go
+++ b/config/data.go
@@ -167,12 +167,12 @@ type Data struct {
 		UIPath string `json:"ui_path"`
 	} `json:"router"`
 	Cluster struct {
-		Enable      bool   `json:"enable"`
-		Bootstrap   bool   `json:"bootstrap"`
-		Recover     bool   `json:"recover"`
-		Debug       bool   `json:"debug"`
-		Address     string `json:"address"`
-		JoinAddress string `json:"join_address"`
+		Enable    bool     `json:"enable"`
+		Bootstrap bool     `json:"bootstrap"`
+		Recover   bool     `json:"recover"`
+		Debug     bool     `json:"debug"`
+		Address   string   `json:"address"`
+		Peers     []string `json:"peers"`
 	} `json:"cluster"`
 }
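Matching the JSON tags above, the cluster section of the config file then looks roughly like this (values illustrative):

```json
"cluster": {
	"enable": true,
	"bootstrap": true,
	"recover": false,
	"debug": false,
	"address": "10.0.0.1:8000",
	"peers": ["node2@10.0.0.2:8000", "node3@10.0.0.3:8000"]
}
```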
diff --git a/config/value/cluster.go b/config/value/cluster.go
new file mode 100644
index 00000000..dd4af0f6
--- /dev/null
+++ b/config/value/cluster.go
@@ -0,0 +1,195 @@
+package value
+
+import (
+	"fmt"
+	"net"
+	"regexp"
+	"strings"
+)
+
+// cluster address (host:port)
+
+type ClusterAddress string
+
+func NewClusterAddress(p *string, val string) *ClusterAddress {
+	*p = val
+
+	return (*ClusterAddress)(p)
+}
+
+func (s *ClusterAddress) Set(val string) error {
+	// Check if the new value is only a port number
+	host, port, err := net.SplitHostPort(val)
+	if err != nil {
+		return err
+	}
+
+	if len(host) == 0 || len(port) == 0 {
+		return fmt.Errorf("invalid address: host and port must be provided")
+	}
+
+	re := regexp.MustCompile("^[0-9]+$")
+	if !re.MatchString(port) {
+		return fmt.Errorf("the port must be numerical")
+	}
+
+	*s = ClusterAddress(val)
+
+	return nil
+}
+
+func (s *ClusterAddress) String() string {
+	return string(*s)
+}
+
+func (s *ClusterAddress) Validate() error {
+	host, port, err := net.SplitHostPort(string(*s))
+	if err != nil {
+		return err
+	}
+
+	if len(host) == 0 || len(port) == 0 {
+		return fmt.Errorf("invalid address: host and port must be provided")
+	}
+
+	re := regexp.MustCompile("^[0-9]+$")
+	if !re.MatchString(port) {
+		return fmt.Errorf("the port must be numerical")
+	}
+
+	return nil
+}
+
+func (s *ClusterAddress) IsEmpty() bool {
+	return s.Validate() != nil
+}
+
+// cluster peer (id@host:port)
+
+type ClusterPeer string
+
+func NewClusterPeer(p *string, val string) *ClusterPeer {
+	*p = val
+
+	return (*ClusterPeer)(p)
+}
+
+func (s *ClusterPeer) Set(val string) error {
+	id, address, found := strings.Cut(val, "@")
+	if !found || len(id) == 0 {
+		return fmt.Errorf("id is missing")
+	}
+
+	// Check if the new value is only a port number
+	host, port, err := net.SplitHostPort(address)
+	if err != nil {
+		return err
+	}
+
+	if len(host) == 0 || len(port) == 0 {
+		return fmt.Errorf("invalid address: host and port must be provided")
+	}
+
+	re := regexp.MustCompile("^[0-9]+$")
+	if !re.MatchString(port) {
+		return fmt.Errorf("the port must be numerical")
+	}
+
+	*s = ClusterPeer(val)
+
+	return nil
+}
+
+func (s *ClusterPeer) String() string {
+	return string(*s)
+}
+
+func (s *ClusterPeer) Validate() error {
+	id, address, found := strings.Cut(string(*s), "@")
+	if !found || len(id) == 0 {
+		return fmt.Errorf("id is missing")
+	}
+
+	host, port, err := net.SplitHostPort(address)
+	if err != nil {
+		return err
+	}
+
+	if len(host) == 0 || len(port) == 0 {
+		return fmt.Errorf("invalid address: host and port must be provided")
+	}
+
+	re := regexp.MustCompile("^[0-9]+$")
+	if !re.MatchString(port) {
+		return fmt.Errorf("the port must be numerical")
+	}
+
+	return nil
+}
+
+func (s *ClusterPeer) IsEmpty() bool {
+	return s.Validate() != nil
+}
+
+// array of cluster peers
+
+type ClusterPeerList struct {
+	p         *[]string
+	separator string
+}
+
+func NewClusterPeerList(p *[]string, val []string, separator string) *ClusterPeerList {
+	v := &ClusterPeerList{
+		p:         p,
+		separator: separator,
+	}
+
+	*p = val
+
+	return v
+}
+
+func (s *ClusterPeerList) Set(val string) error {
+	list := []string{}
+
+	for _, elm := range strings.Split(val, s.separator) {
+		elm = strings.TrimSpace(elm)
+		if len(elm) != 0 {
+			p := NewClusterPeer(&elm, elm)
+			if err := p.Validate(); err != nil {
+				return fmt.Errorf("invalid cluster peer: %s: %w", elm, err)
+			}
+			list = append(list, elm)
+		}
+	}
+
+	*s.p = list
+
+	return nil
+}
+
+func (s *ClusterPeerList) String() string {
+	if s.IsEmpty() {
+		return "(empty)"
+	}
+
+	return strings.Join(*s.p, s.separator)
+}
+
+func (s *ClusterPeerList) Validate() error {
+	for _, elm := range *s.p {
+		elm = strings.TrimSpace(elm)
+		if len(elm) != 0 {
+			p := NewClusterPeer(&elm, elm)
+			if err := p.Validate(); err != nil {
+				return fmt.Errorf("invalid cluster peer: %s: %w", elm, err)
+			}
+		}
+	}
+
+	return nil
+}
+
+func (s *ClusterPeerList) IsEmpty() bool {
+	return len(*s.p) == 0
+}
diff --git a/config/value/cluster_test.go b/config/value/cluster_test.go
new file mode 100644
index 00000000..eef9f3e9
--- /dev/null
+++ b/config/value/cluster_test.go
@@ -0,0 +1,79 @@
+package value
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestClusterAddressValue(t *testing.T) {
+	var x string
+
+	val := NewClusterAddress(&x, "foobar:8080")
+
+	require.Equal(t, "foobar:8080", val.String())
+	require.Equal(t, nil, val.Validate())
+	require.Equal(t, false, val.IsEmpty())
+
+	x = "foobaz:9090"
+
+	require.Equal(t, "foobaz:9090", val.String())
+	require.Equal(t, nil, val.Validate())
+	require.Equal(t, false, val.IsEmpty())
+
+	val.Set("fooboz:7070")
+
+	require.Equal(t, "fooboz:7070", x)
+
+	err := val.Set(":7070")
+	require.Error(t, err)
+}
+
+func TestClusterPeerValue(t *testing.T) {
+	var x string
+
+	val := NewClusterPeer(&x, "abc@foobar:8080")
+
+	require.Equal(t, "abc@foobar:8080", val.String())
+	require.Equal(t, nil, val.Validate())
+	require.Equal(t, false, val.IsEmpty())
+
+	x = "xyz@foobaz:9090"
+
+	require.Equal(t, "xyz@foobaz:9090", val.String())
+	require.Equal(t, nil, val.Validate())
+	require.Equal(t, false, val.IsEmpty())
+
+	val.Set("mno@fooboz:7070")
+
+	require.Equal(t, "mno@fooboz:7070", x)
+
+	err := val.Set("foobar:7070")
+	require.Error(t, err)
+}
+
+func TestClusterPeerListValue(t *testing.T) {
+	var x []string
+
+	val := NewClusterPeerList(&x, []string{"abc@foobar:8080"}, ",")
+
+	require.Equal(t, "abc@foobar:8080", val.String())
+	require.Equal(t, nil, val.Validate())
+	require.Equal(t, false, val.IsEmpty())
+
+	x = []string{"abc@foobar:8080", "xyz@foobaz:9090"}
+
+	require.Equal(t, "abc@foobar:8080,xyz@foobaz:9090", val.String())
+	require.Equal(t, nil, val.Validate())
+	require.Equal(t, false, val.IsEmpty())
+
+	val.Set("mno@fooboz:8080,rst@foobax:9090")
+
+	require.Equal(t, []string{"mno@fooboz:8080", "rst@foobax:9090"}, x)
+
+	err := val.Set("mno@:8080")
+	require.Error(t, err)
+
+	err = val.Set("foobax:9090")
+	require.Error(t, err)
+}
diff --git a/http/handler/api/cluster.go b/http/handler/api/cluster.go
index 79d344ec..964484c8 100644
--- a/http/handler/api/cluster.go
+++ b/http/handler/api/cluster.go
@@ -1,16 +1,9 @@
 package api
 
 import (
-	"net/http"
 	"regexp"
-	"sort"
-	"strings"
 
 	"github.com/datarhei/core/v16/cluster"
-	"github.com/datarhei/core/v16/http/api"
-	"github.com/datarhei/core/v16/http/handler/util"
-
-	"github.com/labstack/echo/v4"
 )
 
 // The ClusterHandler type provides handler functions for manipulating the cluster config.
@@ -19,6 +12,7 @@ type ClusterHandler struct {
 	prefix  *regexp.Regexp
 }
 
+/*
 // NewCluster return a new ClusterHandler type. You have to provide a cluster.
 func NewCluster(cluster cluster.Cluster) *ClusterHandler {
 	return &ClusterHandler{
@@ -207,3 +201,4 @@ func (h *ClusterHandler) UpdateNode(c echo.Context) error {
 
 	return c.JSON(http.StatusOK, id)
 }
+*/
diff --git a/http/server.go b/http/server.go
index cd116853..05c3aaf0 100644
--- a/http/server.go
+++ b/http/server.go
@@ -314,7 +314,7 @@ func NewServer(config Config) (Server, error) {
 	})
 
 	if config.Cluster != nil {
-		s.v3handler.cluster = api.NewCluster(config.Cluster)
+		//s.v3handler.cluster = api.NewCluster(config.Cluster)
 	}
 
 	if middleware, err := mwcors.NewWithConfig(mwcors.Config{
@@ -656,17 +656,19 @@ func (s *server) setRoutesV3(v3 *echo.Group) {
 	}
 
 	// v3 Cluster
-	if s.v3handler.cluster != nil {
-		v3.GET("/cluster", s.v3handler.cluster.GetCluster)
-		v3.GET("/cluster/node/:id", s.v3handler.cluster.GetNode)
-		v3.GET("/cluster/node/:id/proxy", s.v3handler.cluster.GetNodeProxy)
+	/*
+		if s.v3handler.cluster != nil {
+			v3.GET("/cluster", s.v3handler.cluster.GetCluster)
+			v3.GET("/cluster/node/:id", s.v3handler.cluster.GetNode)
+			v3.GET("/cluster/node/:id/proxy", s.v3handler.cluster.GetNodeProxy)
 
-		if !s.readOnly {
-			v3.POST("/cluster/node", s.v3handler.cluster.AddNode)
-			v3.PUT("/cluster/node/:id", s.v3handler.cluster.UpdateNode)
-			v3.DELETE("/cluster/node/:id", s.v3handler.cluster.DeleteNode)
+			if !s.readOnly {
+				v3.POST("/cluster/node", s.v3handler.cluster.AddNode)
+				v3.PUT("/cluster/node/:id", s.v3handler.cluster.UpdateNode)
+				v3.DELETE("/cluster/node/:id", s.v3handler.cluster.DeleteNode)
+			}
 		}
-	}
+	*/
 
 	// v3 Log
 	v3.GET("/log", s.v3handler.log.Log)