diff --git a/app/api/api.go b/app/api/api.go index 939941ef..2ed471f2 100644 --- a/app/api/api.go +++ b/app/api/api.go @@ -623,17 +623,20 @@ func (a *api) start() error { a.restream = restream if cfg.Cluster.Enable { - if cluster, err := cluster.New(cluster.ClusterConfig{ + cluster, err := cluster.New(cluster.ClusterConfig{ ID: cfg.ID, Name: cfg.Name, Path: filepath.Join(cfg.DB.Dir, "cluster"), IPLimiter: a.sessionsLimiter, Logger: a.log.logger.core.WithComponent("Cluster"), - }); err != nil { + Bootstrap: cfg.Cluster.Bootstrap, + Address: cfg.Cluster.Address, + }) + if err != nil { return fmt.Errorf("unable to create cluster: %w", err) - } else { - a.cluster = cluster } + + a.cluster = cluster } var httpjwt jwt.JWT diff --git a/cluster/cluster.go b/cluster/cluster.go index 3ed587cf..1ae4672d 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -2,11 +2,13 @@ package cluster import ( "context" + "encoding/json" "errors" "fmt" "io" gonet "net" "path/filepath" + "reflect" "sync" "time" @@ -18,6 +20,26 @@ import ( "go.etcd.io/bbolt" ) +/* + /api/v3: + GET /cluster/db/node - list all nodes that are stored in the FSM - Cluster.Store.ListNodes() + POST /cluster/db/node - add a node to the FSM - Cluster.Store.AddNode() + DELETE /cluster/db/node/:id - remove a node from the FSM - Cluster.Store.RemoveNode() + + GET /cluster/db/process - list all process configs that are stored in the FSM - Cluster.Store.ListProcesses() + POST /cluster/db/process - add a process config to the FSM - Cluster.Store.AddProcess() + PUT /cluster/db/process/:id - update a process config in the FSM - Cluster.Store.UpdateProcess() + DELETE /cluster/db/process/:id - remove a process config from the FSM - Cluster.Store.RemoveProcess() + + ** for the processes, the leader will decide where to actually run them. the process configs will + also be added to the regular process DB of each core. + + POST /cluster/join - join the cluster - Cluster.Join() + DELETE /cluster/:id - leave the cluster - Cluster.Leave() + + ** all these endpoints will forward the request to the leader. 
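+
+   ** illustrative only, nothing here is final: a join request would have to carry the
+   ** values Cluster.Join() needs, e.g. a JSON body like
+   **   {"id": "node2", "raft_address": "node2:8000", "api_address": "node2:8080", "username": "...", "password": "..."}
+   ** (the field names and the example node are placeholders, not part of this patch)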
+*/ + var ErrNodeNotFound = errors.New("node not found") type ClusterReader interface { @@ -55,6 +77,8 @@ type ClusterConfig struct { Path string IPLimiter net.IPLimiter Logger log.Logger + Bootstrap bool + Address string } type cluster struct { @@ -84,6 +108,8 @@ type cluster struct { raftStore *raftboltdb.BoltStore raftRemoveGracePeriod time.Duration + store Store + reassertLeaderCh chan chan error leaveCh chan struct{} @@ -106,6 +132,8 @@ func New(config ClusterConfig) (Cluster, error) { updates: make(chan NodeState, 64), logger: config.Logger, + raftAddress: config.Address, + reassertLeaderCh: make(chan chan error), leaveCh: make(chan struct{}), shutdownCh: make(chan struct{}), @@ -119,12 +147,19 @@ func New(config ClusterConfig) (Cluster, error) { c.logger = log.New("") } - fsm, err := NewFSM() + store, err := NewStore() if err != nil { return nil, err } - c.startRaft(fsm, true, false) + c.store = store + + c.logger.Debug().Log("starting raft") + + err = c.startRaft(store, config.Bootstrap, false) + if err != nil { + return nil, err + } go func() { for { @@ -213,6 +248,7 @@ func (c *cluster) Leave() error { if err := c.leadershipTransfer(); err == nil { isLeader = false } else { + c.StoreRemoveNode(c.id) future := c.raft.RemoveServer(raft.ServerID(c.id), 0, 0) if err := future.Error(); err != nil { c.logger.Error().WithError(err).Log("failed to remove ourself as raft peer") @@ -224,6 +260,9 @@ func (c *cluster) Leave() error { // must wait to allow the raft replication to take place, otherwise an // immediate shutdown could cause a loss of quorum. if !isLeader { + // Send leave-request to leader + // DELETE leader//api/v3/cluster/node/:id + left := false limit := time.Now().Add(c.raftRemoveGracePeriod) for !left && time.Now().Before(limit) { @@ -259,6 +298,210 @@ func (c *cluster) IsLeader() bool { return c.raft.State() == raft.Leader } +func (c *cluster) leave(id string) error { + if !c.IsLeader() { + return fmt.Errorf("not leader") + } + + c.logger.Debug().WithFields(log.Fields{ + "nodeid": id, + }).Log("received leave request for remote node") + + if id == c.id { + err := c.leadershipTransfer() + if err != nil { + c.logger.Warn().WithError(err).Log("failed to transfer leadership") + } + } + + future := c.raft.RemoveServer(raft.ServerID(id), 0, 0) + if err := future.Error(); err != nil { + c.logger.Error().WithError(err).WithFields(log.Fields{ + "nodeid": id, + }).Log("failed to remove node") + } + + return nil +} + +func (c *cluster) Join(id, raftAddress, apiAddress, username, password string) error { + if !c.IsLeader() { + return fmt.Errorf("not leader") + } + + c.logger.Debug().WithFields(log.Fields{ + "nodeid": id, + "address": raftAddress, + }).Log("received join request for remote node") + + configFuture := c.raft.GetConfiguration() + if err := configFuture.Error(); err != nil { + c.logger.Error().WithError(err).Log("failed to get raft configuration") + return err + } + + for _, srv := range configFuture.Configuration().Servers { + // If a node already exists with either the joining node's ID or address, + // that node may need to be removed from the config first. + if srv.ID == raft.ServerID(id) || srv.Address == raft.ServerAddress(raftAddress) { + // However if *both* the ID and the address are the same, then nothing -- not even + // a join operation -- is needed. 
+ if srv.ID == raft.ServerID(id) && srv.Address == raft.ServerAddress(raftAddress) { + c.logger.Debug().WithFields(log.Fields{ + "nodeid": id, + "address": raftAddress, + }).Log("node is already member of cluster, ignoring join request") + return nil + } + + future := c.raft.RemoveServer(srv.ID, 0, 0) + if err := future.Error(); err != nil { + c.logger.Error().WithError(err).WithFields(log.Fields{ + "nodeid": id, + "address": raftAddress, + }).Log("error removing existing node") + return fmt.Errorf("error removing existing node %s at %s: %w", id, raftAddress, err) + } + } + } + + f := c.raft.AddVoter(raft.ServerID(id), raft.ServerAddress(raftAddress), 0, 5*time.Second) + if err := f.Error(); err != nil { + return err + } + + if err := c.StoreAddNode(id, apiAddress, username, password); err != nil { + future := c.raft.RemoveServer(raft.ServerID(id), 0, 0) + if err := future.Error(); err != nil { + c.logger.Error().WithError(err).WithFields(log.Fields{ + "nodeid": id, + "address": raftAddress, + }).Log("error removing existing node") + return err + } + return err + } + + c.logger.Info().WithFields(log.Fields{ + "nodeid": id, + "address": raftAddress, + }).Log("node joined successfully") + + return nil +} + +type command struct { + Operation string + Data interface{} +} + +type addNodeCommand struct { + ID string + Address string + Username string + Password string +} + +func (c *cluster) StoreListNodes() []addNodeCommand { + c.store.ListNodes() + + return nil +} + +func (c *cluster) StoreAddNode(id, address, username, password string) error { + if !c.IsLeader() { + return fmt.Errorf("not leader") + } + + com := &command{ + Operation: "addNode", + Data: &addNodeCommand{ + ID: id, + Address: address, + Username: username, + Password: password, + }, + } + + b, err := json.Marshal(com) + if err != nil { + return err + } + + future := c.raft.Apply(b, 5*time.Second) + if err := future.Error(); err != nil { + return fmt.Errorf("applying command failed: %w", err) + } + + return nil +} + +type removeNodeCommand struct { + ID string +} + +func (c *cluster) StoreRemoveNode(id string) error { + if !c.IsLeader() { + return fmt.Errorf("not leader") + } + + com := &command{ + Operation: "removeNode", + Data: &removeNodeCommand{ + ID: id, + }, + } + + b, err := json.Marshal(com) + if err != nil { + return err + } + + future := c.raft.Apply(b, 5*time.Second) + if err := future.Error(); err != nil { + return fmt.Errorf("applying command failed: %w", err) + } + + return nil +} + +// trackLeaderChanges registers an Observer with raft in order to receive updates +// about leader changes, in order to keep the forwarder up to date. 
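+//
+// The filter handed to raft.NewObserver below only lets raft.LeaderObservation and
+// raft.PeerObservation events through to the channel; every other observation type
+// is dropped before it is queued. The observer is deregistered once shutdownCh closes.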
+func (c *cluster) trackLeaderChanges() { + obsCh := make(chan raft.Observation, 16) + observer := raft.NewObserver(obsCh, false, func(o *raft.Observation) bool { + _, leaderOK := o.Data.(raft.LeaderObservation) + _, peerOK := o.Data.(raft.PeerObservation) + + return leaderOK || peerOK + }) + c.raft.RegisterObserver(observer) + + for { + select { + case obs := <-obsCh: + if leaderObs, ok := obs.Data.(raft.LeaderObservation); ok { + // TODO: update the forwarder + c.logger.Debug().WithFields(log.Fields{ + "id": leaderObs.LeaderID, + "address": leaderObs.LeaderAddr, + }).Log("new leader observation") + } else if peerObs, ok := obs.Data.(raft.PeerObservation); ok { + c.logger.Debug().WithFields(log.Fields{ + "removed": peerObs.Removed, + "address": peerObs.Peer.Address, + }).Log("new peer observation") + } else { + c.logger.Debug().WithField("type", reflect.TypeOf(obs.Data)).Log("got unknown observation type from raft") + continue + } + case <-c.shutdownCh: + c.raft.DeregisterObserver(observer) + return + } + } +} + func (c *cluster) AddNode(address, username, password string) (string, error) { node, err := newNode(address, username, password, c.updates) if err != nil { @@ -313,6 +556,8 @@ func (c *cluster) RemoveNode(id string) error { c.limiter.RemoveBlock(ip) } + c.leave(id) + c.logger.Info().WithFields(log.Fields{ "id": id, }).Log("Removed node") @@ -435,12 +680,16 @@ func (c *cluster) startRaft(fsm raft.FSM, bootstrap, inmem bool) error { return err } - transport, err := raft.NewTCPTransportWithLogger(c.raftAddress, addr, 3, 10*time.Second, NewLogger(c.logger.WithComponent("raft"), hclog.Debug).Named("transport")) + c.logger.Debug().Log("address: %s", addr) + + transport, err := raft.NewTCPTransportWithLogger(c.raftAddress, addr, 3, 10*time.Second, NewLogger(c.logger, hclog.Debug).Named("raft-transport")) if err != nil { return err } - snapshotLogger := NewLogger(c.logger.WithComponent("raft"), hclog.Debug).Named("snapshot") + c.raftTransport = transport + + snapshotLogger := NewLogger(c.logger, hclog.Debug).Named("raft-snapshot") snapshots, err := raft.NewFileSnapshotStoreWithLogger(filepath.Join(c.path, "snapshots"), 10, snapshotLogger) if err != nil { return err @@ -475,7 +724,7 @@ func (c *cluster) startRaft(fsm raft.FSM, bootstrap, inmem bool) error { cfg := raft.DefaultConfig() cfg.LocalID = raft.ServerID(c.id) - cfg.Logger = NewLogger(c.logger.WithComponent("raft"), hclog.Debug) + cfg.Logger = NewLogger(c.logger, hclog.Debug).Named("raft") if bootstrap { hasState, err := raft.HasExistingState(logStore, stableStore, snapshots) @@ -512,8 +761,11 @@ func (c *cluster) startRaft(fsm raft.FSM, bootstrap, inmem bool) error { c.raft = node + go c.trackLeaderChanges() go c.monitorLeadership() + c.logger.Debug().Log("raft started") + return nil } @@ -529,3 +781,7 @@ func (c *cluster) shutdownRaft() { } } } + +// nodeLoop is run by every node in the cluster. This is mainly to check the list +// of nodes from the FSM, in order to connect to them and to fetch their file lists. 
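+// The body is still empty and nothing in this changeset starts the loop yet.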
+func (c *cluster) nodeLoop() {} diff --git a/cluster/forwarder.go b/cluster/forwarder.go new file mode 100644 index 00000000..a7a27545 --- /dev/null +++ b/cluster/forwarder.go @@ -0,0 +1,28 @@ +package cluster + +import "github.com/labstack/echo/v4" + +// Forwarder forwards any HTTP request from a follower to the leader +type Forwarder interface { + SetLeader(address string) + Forward(c echo.Context) +} + +type forwarder struct { + leaderAddr string +} + +func NewForwarder() (Forwarder, error) { + return &forwarder{}, nil +} + +func (f *forwarder) SetLeader(address string) { + if f.leaderAddr == address { + return + } + +} + +func (f *forwarder) Forward(c echo.Context) { + +} diff --git a/cluster/fsm.go b/cluster/fsm.go index 746f1cea..8d78b5e9 100644 --- a/cluster/fsm.go +++ b/cluster/fsm.go @@ -1,27 +1,68 @@ package cluster import ( + "encoding/json" + "fmt" "io" "github.com/hashicorp/raft" ) -// Implement a FSM -type fsm struct{} +type Store interface { + raft.FSM -func NewFSM() (raft.FSM, error) { - return &fsm{}, nil + ListNodes() []string } -func (f *fsm) Apply(*raft.Log) interface{} { +// Implement a FSM +type store struct{} + +func NewStore() (Store, error) { + return &store{}, nil +} + +func (s *store) Apply(log *raft.Log) interface{} { + fmt.Printf("a log entry came in (index=%d, term=%d): %s\n", log.Index, log.Term, string(log.Data)) + + c := command{} + + err := json.Unmarshal(log.Data, &c) + if err != nil { + fmt.Printf("invalid log entry\n") + return nil + } + + fmt.Printf("op: %s\n", c.Operation) + fmt.Printf("op: %+v\n", c) + + switch c.Operation { + case "addNode": + b, _ := json.Marshal(c.Data) + cmd := addNodeCommand{} + json.Unmarshal(b, &cmd) + + fmt.Printf("addNode: %+v\n", cmd) + case "removeNode": + b, _ := json.Marshal(c.Data) + cmd := removeNodeCommand{} + json.Unmarshal(b, &cmd) + + fmt.Printf("removeNode: %+v\n", cmd) + } return nil } -func (f *fsm) Snapshot() (raft.FSMSnapshot, error) { +func (s *store) Snapshot() (raft.FSMSnapshot, error) { + fmt.Printf("a snapshot is requested\n") return &fsmSnapshot{}, nil } -func (f *fsm) Restore(snapshot io.ReadCloser) error { +func (s *store) Restore(snapshot io.ReadCloser) error { + fmt.Printf("a snapshot is restored\n") + return nil +} + +func (s *store) ListNodes() []string { return nil } diff --git a/cluster/leader.go b/cluster/leader.go index 6e66e54c..76f8c85d 100644 --- a/cluster/leader.go +++ b/cluster/leader.go @@ -21,6 +21,7 @@ func (c *cluster) monitorLeadership() { var weAreLeaderCh chan struct{} var leaderLoop sync.WaitGroup + for { select { case isLeader := <-raftNotifyCh: @@ -39,6 +40,8 @@ func (c *cluster) monitorLeadership() { }(weAreLeaderCh) c.logger.Info().Log("cluster leadership acquired") + c.StoreAddNode(c.id, ":8080", "foo", "bar") + default: if weAreLeaderCh == nil { c.logger.Error().Log("attempted to stop the leader loop while not running") @@ -87,7 +90,7 @@ func (c *cluster) leaderLoop(stopCh chan struct{}) { establishedLeader := false RECONCILE: // Setup a reconciliation timer - interval := time.After(s.config.ReconcileInterval) + interval := time.After(10 * time.Second) // Apply a raft barrier to ensure our FSM is caught up barrier := c.raft.Barrier(time.Minute) @@ -98,7 +101,7 @@ RECONCILE: // Check if we need to handle initial leadership actions if !establishedLeader { - if err := c.establishLeadership(stopCtx); err != nil { + if err := c.establishLeadership(context.TODO()); err != nil { c.logger.Error().WithError(err).Log("failed to establish leadership") // Immediately revoke leadership 
since we didn't successfully // establish leadership. @@ -130,8 +133,7 @@ WAIT: default: } - // Periodically reconcile as long as we are the leader, - // or when Serf events arrive + // Periodically reconcile as long as we are the leader for { select { case <-stopCh: @@ -157,7 +159,7 @@ WAIT: // leader, which means revokeLeadership followed by an // establishLeadership(). c.revokeLeadership() - err := c.establishLeadership(stopCtx) + err := c.establishLeadership(context.TODO()) errCh <- err // in case establishLeadership failed, we will try to @@ -187,9 +189,10 @@ WAIT: } func (c *cluster) establishLeadership(ctx context.Context) error { + c.logger.Debug().Log("establishing leadership") return nil } func (c *cluster) revokeLeadership() { - + c.logger.Debug().Log("revoking leadership") } diff --git a/cluster/logger.go b/cluster/logger.go index 08a4c5e9..4f5f03b4 100644 --- a/cluster/logger.go +++ b/cluster/logger.go @@ -20,23 +20,8 @@ type hclogger struct { } func NewLogger(logger log.Logger, lvl hclog.Level) hclog.Logger { - level := log.Linfo - - switch lvl { - case hclog.Trace: - level = log.Ldebug - case hclog.Debug: - level = log.Ldebug - case hclog.Info: - level = log.Linfo - case hclog.Warn: - level = log.Lwarn - case hclog.Error: - level = log.Lerror - } - return &hclogger{ - logger: logger.WithOutput(log.NewSyncWriter(log.NewConsoleWriter(os.Stderr, level, true))), + logger: logger, level: lvl, } } diff --git a/cluster/node.go b/cluster/node.go index 58235b99..4f093c88 100644 --- a/cluster/node.go +++ b/cluster/node.go @@ -242,7 +242,7 @@ func (n *node) files() { } for _, file := range files { - filesChan <- "mem:" + file.Name + f <- "mem:" + file.Name } }(filesChan) @@ -255,7 +255,7 @@ func (n *node) files() { } for _, file := range files { - filesChan <- "disk:" + file.Name + f <- "disk:" + file.Name } }(filesChan) @@ -271,7 +271,7 @@ func (n *node) files() { } for _, file := range files { - filesChan <- "rtmp:" + file.Name + f <- "rtmp:" + file.Name } }(filesChan) } @@ -288,7 +288,7 @@ func (n *node) files() { } for _, file := range files { - filesChan <- "srt:" + file.Name + f <- "srt:" + file.Name } }(filesChan) } diff --git a/config/config.go b/config/config.go index a5de6957..4302a1fd 100644 --- a/config/config.go +++ b/config/config.go @@ -134,6 +134,8 @@ func (d *Config) Clone() *Config { data.Router.BlockedPrefixes = copy.Slice(d.Router.BlockedPrefixes) data.Router.Routes = copy.StringMap(d.Router.Routes) + data.Cluster = d.Cluster + data.vars.Transfer(&d.vars) return data @@ -276,6 +278,8 @@ func (d *Config) init() { d.vars.Register(value.NewBool(&d.Cluster.Enable, false), "cluster.enable", "CORE_CLUSTER_ENABLE", nil, "Enable cluster mode", false, false) d.vars.Register(value.NewBool(&d.Cluster.Bootstrap, false), "cluster.bootstrap", "CORE_CLUSTER_BOOTSTRAP", nil, "Bootstrap a cluster", false, false) d.vars.Register(value.NewBool(&d.Cluster.Debug, false), "cluster.debug", "CORE_CLUSTER_DEBUG", nil, "Switch to debug mode, not for production", false, false) + d.vars.Register(value.NewAddress(&d.Cluster.Address, ":8000"), "cluster.address", "CORE_CLUSTER_ADDRESS", nil, "Raft listen address", false, true) + d.vars.Register(value.NewString(&d.Cluster.JoinAddress, ""), "cluster.join_address", "CORE_CLUSTER_JOIN_ADDRESS", nil, "Address of a core that is part of the cluster", false, true) } // Validate validates the current state of the Config for completeness and sanity. 
Errors are @@ -455,6 +459,13 @@ func (d *Config) Validate(resetLogs bool) { d.vars.Log("error", "metrics.interval", "must be smaller than the range") } } + + // If cluster mode is enabled, we can't join and bootstrap at the same time + if d.Cluster.Enable { + if d.Cluster.Bootstrap && len(d.Cluster.JoinAddress) != 0 { + d.vars.Log("error", "cluster.join_address", "can't be set if cluster.bootstrap is enabled") + } + } } // Merge merges the values of the known environment variables into the configuration diff --git a/config/data.go b/config/data.go index 631b215a..a844ae82 100644 --- a/config/data.go +++ b/config/data.go @@ -167,9 +167,11 @@ type Data struct { UIPath string `json:"ui_path"` } `json:"router"` Cluster struct { - Enable bool `json:"enable"` - Bootstrap bool `json:"bootstrap"` - Debug bool `json:"debug"` + Enable bool `json:"enable"` + Bootstrap bool `json:"bootstrap"` + Debug bool `json:"debug"` + Address string `json:"address"` + JoinAddress string `json:"join_address"` } `json:"cluster"` }
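
A minimal standalone sketch of the command round trip this patch introduces (illustrative only, not part of the patch; the node ID "node2" is made up, the other values are the placeholders used in leader.go): StoreAddNode wraps the typed payload in the command envelope and hands the JSON to raft.Apply, and since command.Data is declared as interface{}, the store's Apply sees it as a map[string]interface{} after decoding, which is why fsm.go marshals c.Data again before unmarshaling it into addNodeCommand.

package main

import (
	"encoding/json"
	"fmt"
)

// Mirrors the envelope and payload types added in cluster/cluster.go.
type command struct {
	Operation string
	Data      interface{}
}

type addNodeCommand struct {
	ID       string
	Address  string
	Username string
	Password string
}

func main() {
	// What StoreAddNode hands to raft.Apply.
	b, _ := json.Marshal(&command{
		Operation: "addNode",
		Data: &addNodeCommand{
			ID:       "node2",
			Address:  ":8080",
			Username: "foo",
			Password: "bar",
		},
	})

	// What the store's Apply does with log.Data: the envelope decodes fine,
	// but Data comes back as a map[string]interface{} ...
	c := command{}
	if err := json.Unmarshal(b, &c); err != nil {
		panic(err)
	}

	// ... so it is re-marshaled and decoded into the typed command.
	data, _ := json.Marshal(c.Data)
	cmd := addNodeCommand{}
	if err := json.Unmarshal(data, &cmd); err != nil {
		panic(err)
	}

	fmt.Printf("%s: %+v\n", c.Operation, cmd)
}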