WIP: internal API

Ingo Oppermann
2023-04-17 14:01:20 +02:00
parent 7643959bf8
commit 27f19f9188
9 changed files with 374 additions and 45 deletions

View File

@@ -623,17 +623,20 @@ func (a *api) start() error {
	a.restream = restream
	if cfg.Cluster.Enable {
-		if cluster, err := cluster.New(cluster.ClusterConfig{
+		cluster, err := cluster.New(cluster.ClusterConfig{
			ID:        cfg.ID,
			Name:      cfg.Name,
			Path:      filepath.Join(cfg.DB.Dir, "cluster"),
			IPLimiter: a.sessionsLimiter,
			Logger:    a.log.logger.core.WithComponent("Cluster"),
-		}); err != nil {
+			Bootstrap: cfg.Cluster.Bootstrap,
+			Address:   cfg.Cluster.Address,
+		})
+		if err != nil {
			return fmt.Errorf("unable to create cluster: %w", err)
-		} else {
-			a.cluster = cluster
		}
+		a.cluster = cluster
	}
	var httpjwt jwt.JWT

View File

@@ -2,11 +2,13 @@ package cluster
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
gonet "net"
"path/filepath"
"reflect"
"sync"
"time"
@@ -18,6 +20,26 @@ import (
"go.etcd.io/bbolt"
)
/*
/api/v3:
GET /cluster/db/node - list all nodes that are stored in the FSM - Cluster.Store.ListNodes()
POST /cluster/db/node - add a node to the FSM - Cluster.Store.AddNode()
DELETE /cluster/db/node/:id - remove a node from the FSM - Cluster.Store.RemoveNode()
GET /cluster/db/process - list all process configs that are stored in the FSM - Cluster.Store.ListProcesses()
POST /cluster/db/process - add a process config to the FSM - Cluster.Store.AddProcess()
PUT /cluster/db/process/:id - update a process config in the FSM - Cluster.Store.UpdateProcess()
DELETE /cluster/db/process/:id - remove a process config from the FSM - Cluster.Store.RemoveProcess()
** for the processes, the leader will decide where to actually run them. the process configs will
also be added to the regular process DB of each core.
POST /cluster/join - join the cluster - Cluster.Join()
DELETE /cluster/:id - leave the cluster - Cluster.Leave()
** all these endpoints will forward the request to the leader.
*/
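A minimal sketch of how these planned endpoints could be wired up, not part of this diff: the registerClusterAPI helper, the joinRequest payload and the net/http and labstack/echo/v4 imports are assumptions; only the paths listed above and the Join/Store* methods further down in this file come from the commit.

// Hypothetical request body for POST /cluster/join; the field names are
// made up, only the parameters of Join() are given by this commit.
type joinRequest struct {
	ID          string `json:"id"`
	RaftAddress string `json:"raft_address"`
	APIAddress  string `json:"api_address"`
	Username    string `json:"username"`
	Password    string `json:"password"`
}

// registerClusterAPI sketches how the planned routes could delegate to the
// cluster methods in this file.
func registerClusterAPI(g *echo.Group, c *cluster) {
	g.GET("/cluster/db/node", func(ctx echo.Context) error {
		// Served from the local copy of the FSM, no forwarding required.
		return ctx.JSON(http.StatusOK, c.StoreListNodes())
	})

	g.DELETE("/cluster/db/node/:id", func(ctx echo.Context) error {
		// On a follower this request would have to be forwarded to the leader.
		if err := c.StoreRemoveNode(ctx.Param("id")); err != nil {
			return echo.NewHTTPError(http.StatusInternalServerError, err.Error())
		}
		return ctx.NoContent(http.StatusOK)
	})

	g.POST("/cluster/join", func(ctx echo.Context) error {
		r := joinRequest{}
		if err := ctx.Bind(&r); err != nil {
			return echo.NewHTTPError(http.StatusBadRequest, err.Error())
		}
		if err := c.Join(r.ID, r.RaftAddress, r.APIAddress, r.Username, r.Password); err != nil {
			return echo.NewHTTPError(http.StatusInternalServerError, err.Error())
		}
		return ctx.NoContent(http.StatusOK)
	})
}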
var ErrNodeNotFound = errors.New("node not found")
type ClusterReader interface {
@@ -55,6 +77,8 @@ type ClusterConfig struct {
Path string
IPLimiter net.IPLimiter
Logger log.Logger
Bootstrap bool
Address string
}
type cluster struct {
@@ -84,6 +108,8 @@ type cluster struct {
raftStore *raftboltdb.BoltStore
raftRemoveGracePeriod time.Duration
store Store
reassertLeaderCh chan chan error
leaveCh chan struct{}
@@ -106,6 +132,8 @@ func New(config ClusterConfig) (Cluster, error) {
updates: make(chan NodeState, 64),
logger: config.Logger,
raftAddress: config.Address,
reassertLeaderCh: make(chan chan error),
leaveCh: make(chan struct{}),
shutdownCh: make(chan struct{}),
@@ -119,12 +147,19 @@ func New(config ClusterConfig) (Cluster, error) {
c.logger = log.New("")
}
-	fsm, err := NewFSM()
+	store, err := NewStore()
	if err != nil {
		return nil, err
	}
-	c.startRaft(fsm, true, false)
+	c.store = store
+	c.logger.Debug().Log("starting raft")
+	err = c.startRaft(store, config.Bootstrap, false)
+	if err != nil {
+		return nil, err
+	}
go func() {
for {
@@ -213,6 +248,7 @@ func (c *cluster) Leave() error {
if err := c.leadershipTransfer(); err == nil {
isLeader = false
} else {
c.StoreRemoveNode(c.id)
future := c.raft.RemoveServer(raft.ServerID(c.id), 0, 0)
if err := future.Error(); err != nil {
c.logger.Error().WithError(err).Log("failed to remove ourself as raft peer")
@@ -224,6 +260,9 @@ func (c *cluster) Leave() error {
// must wait to allow the raft replication to take place, otherwise an
// immediate shutdown could cause a loss of quorum.
if !isLeader {
// Send leave-request to leader
// DELETE leader//api/v3/cluster/node/:id
left := false
limit := time.Now().Add(c.raftRemoveGracePeriod)
for !left && time.Now().Before(limit) {
@@ -259,6 +298,210 @@ func (c *cluster) IsLeader() bool {
return c.raft.State() == raft.Leader
}
func (c *cluster) leave(id string) error {
if !c.IsLeader() {
return fmt.Errorf("not leader")
}
c.logger.Debug().WithFields(log.Fields{
"nodeid": id,
}).Log("received leave request for remote node")
if id == c.id {
err := c.leadershipTransfer()
if err != nil {
c.logger.Warn().WithError(err).Log("failed to transfer leadership")
}
}
future := c.raft.RemoveServer(raft.ServerID(id), 0, 0)
if err := future.Error(); err != nil {
c.logger.Error().WithError(err).WithFields(log.Fields{
"nodeid": id,
}).Log("failed to remove node")
}
return nil
}
func (c *cluster) Join(id, raftAddress, apiAddress, username, password string) error {
if !c.IsLeader() {
return fmt.Errorf("not leader")
}
c.logger.Debug().WithFields(log.Fields{
"nodeid": id,
"address": raftAddress,
}).Log("received join request for remote node")
configFuture := c.raft.GetConfiguration()
if err := configFuture.Error(); err != nil {
c.logger.Error().WithError(err).Log("failed to get raft configuration")
return err
}
for _, srv := range configFuture.Configuration().Servers {
// If a node already exists with either the joining node's ID or address,
// that node may need to be removed from the config first.
if srv.ID == raft.ServerID(id) || srv.Address == raft.ServerAddress(raftAddress) {
// However if *both* the ID and the address are the same, then nothing -- not even
// a join operation -- is needed.
if srv.ID == raft.ServerID(id) && srv.Address == raft.ServerAddress(raftAddress) {
c.logger.Debug().WithFields(log.Fields{
"nodeid": id,
"address": raftAddress,
}).Log("node is already member of cluster, ignoring join request")
return nil
}
future := c.raft.RemoveServer(srv.ID, 0, 0)
if err := future.Error(); err != nil {
c.logger.Error().WithError(err).WithFields(log.Fields{
"nodeid": id,
"address": raftAddress,
}).Log("error removing existing node")
return fmt.Errorf("error removing existing node %s at %s: %w", id, raftAddress, err)
}
}
}
f := c.raft.AddVoter(raft.ServerID(id), raft.ServerAddress(raftAddress), 0, 5*time.Second)
if err := f.Error(); err != nil {
return err
}
if err := c.StoreAddNode(id, apiAddress, username, password); err != nil {
future := c.raft.RemoveServer(raft.ServerID(id), 0, 0)
if err := future.Error(); err != nil {
c.logger.Error().WithError(err).WithFields(log.Fields{
"nodeid": id,
"address": raftAddress,
}).Log("error removing existing node")
return err
}
return err
}
c.logger.Info().WithFields(log.Fields{
"nodeid": id,
"address": raftAddress,
}).Log("node joined successfully")
return nil
}
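For the other direction, a rough sketch of the follower side of the join flow, assuming the bytes and net/http imports: a freshly started core configured with cluster.join_address could announce itself to an existing member, which forwards the request to the leader, which in turn calls Join() as above. The helper, the payload shape and the exact URL are assumptions.

func sendJoinRequest(joinAddress, id, raftAddress, apiAddress, username, password string) error {
	// The payload mirrors the parameters of Join(); the field names are made up.
	body, err := json.Marshal(map[string]string{
		"id":           id,
		"raft_address": raftAddress,
		"api_address":  apiAddress,
		"username":     username,
		"password":     password,
	})
	if err != nil {
		return err
	}

	resp, err := http.Post("http://"+joinAddress+"/api/v3/cluster/join", "application/json", bytes.NewReader(body))
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("join request failed with status %d", resp.StatusCode)
	}

	return nil
}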
type command struct {
Operation string
Data interface{}
}
type addNodeCommand struct {
ID string
Address string
Username string
Password string
}
func (c *cluster) StoreListNodes() []addNodeCommand {
c.store.ListNodes()
return nil
}
func (c *cluster) StoreAddNode(id, address, username, password string) error {
if !c.IsLeader() {
return fmt.Errorf("not leader")
}
com := &command{
Operation: "addNode",
Data: &addNodeCommand{
ID: id,
Address: address,
Username: username,
Password: password,
},
}
b, err := json.Marshal(com)
if err != nil {
return err
}
future := c.raft.Apply(b, 5*time.Second)
if err := future.Error(); err != nil {
return fmt.Errorf("applying command failed: %w", err)
}
return nil
}
type removeNodeCommand struct {
ID string
}
func (c *cluster) StoreRemoveNode(id string) error {
if !c.IsLeader() {
return fmt.Errorf("not leader")
}
com := &command{
Operation: "removeNode",
Data: &removeNodeCommand{
ID: id,
},
}
b, err := json.Marshal(com)
if err != nil {
return err
}
future := c.raft.Apply(b, 5*time.Second)
if err := future.Error(); err != nil {
return fmt.Errorf("applying command failed: %w", err)
}
return nil
}
// trackLeaderChanges registers an Observer with raft in order to receive updates
// about leader changes, so that the forwarder can be kept up to date.
func (c *cluster) trackLeaderChanges() {
obsCh := make(chan raft.Observation, 16)
observer := raft.NewObserver(obsCh, false, func(o *raft.Observation) bool {
_, leaderOK := o.Data.(raft.LeaderObservation)
_, peerOK := o.Data.(raft.PeerObservation)
return leaderOK || peerOK
})
c.raft.RegisterObserver(observer)
for {
select {
case obs := <-obsCh:
if leaderObs, ok := obs.Data.(raft.LeaderObservation); ok {
// TODO: update the forwarder
c.logger.Debug().WithFields(log.Fields{
"id": leaderObs.LeaderID,
"address": leaderObs.LeaderAddr,
}).Log("new leader observation")
} else if peerObs, ok := obs.Data.(raft.PeerObservation); ok {
c.logger.Debug().WithFields(log.Fields{
"removed": peerObs.Removed,
"address": peerObs.Peer.Address,
}).Log("new peer observation")
} else {
c.logger.Debug().WithField("type", reflect.TypeOf(obs.Data)).Log("got unknown observation type from raft")
continue
}
case <-c.shutdownCh:
c.raft.DeregisterObserver(observer)
return
}
}
}
func (c *cluster) AddNode(address, username, password string) (string, error) {
node, err := newNode(address, username, password, c.updates)
if err != nil {
@@ -313,6 +556,8 @@ func (c *cluster) RemoveNode(id string) error {
c.limiter.RemoveBlock(ip)
}
c.leave(id)
c.logger.Info().WithFields(log.Fields{
"id": id,
}).Log("Removed node")
@@ -435,12 +680,16 @@ func (c *cluster) startRaft(fsm raft.FSM, bootstrap, inmem bool) error {
return err
}
-	transport, err := raft.NewTCPTransportWithLogger(c.raftAddress, addr, 3, 10*time.Second, NewLogger(c.logger.WithComponent("raft"), hclog.Debug).Named("transport"))
+	c.logger.Debug().Log("address: %s", addr)
+	transport, err := raft.NewTCPTransportWithLogger(c.raftAddress, addr, 3, 10*time.Second, NewLogger(c.logger, hclog.Debug).Named("raft-transport"))
if err != nil {
return err
}
-	snapshotLogger := NewLogger(c.logger.WithComponent("raft"), hclog.Debug).Named("snapshot")
+	c.raftTransport = transport
+	snapshotLogger := NewLogger(c.logger, hclog.Debug).Named("raft-snapshot")
snapshots, err := raft.NewFileSnapshotStoreWithLogger(filepath.Join(c.path, "snapshots"), 10, snapshotLogger)
if err != nil {
return err
@@ -475,7 +724,7 @@ func (c *cluster) startRaft(fsm raft.FSM, bootstrap, inmem bool) error {
cfg := raft.DefaultConfig()
cfg.LocalID = raft.ServerID(c.id)
-	cfg.Logger = NewLogger(c.logger.WithComponent("raft"), hclog.Debug)
+	cfg.Logger = NewLogger(c.logger, hclog.Debug).Named("raft")
if bootstrap {
hasState, err := raft.HasExistingState(logStore, stableStore, snapshots)
@@ -512,8 +761,11 @@ func (c *cluster) startRaft(fsm raft.FSM, bootstrap, inmem bool) error {
c.raft = node
go c.trackLeaderChanges()
go c.monitorLeadership()
c.logger.Debug().Log("raft started")
return nil
}
@@ -529,3 +781,7 @@ func (c *cluster) shutdownRaft() {
}
}
}
// nodeLoop is run by every node in the cluster. It mainly checks the list of
// nodes from the FSM in order to connect to them and to fetch their file lists.
func (c *cluster) nodeLoop() {}
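The empty nodeLoop above could eventually look something like the following sketch, based on its comment: periodically read the node list from the FSM and reconcile it with the nodes we track. The ticker interval and the reconciliation details are assumptions, nothing here is part of this commit.

func (c *cluster) nodeLoopSketch() {
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			// Compare the nodes stored in the FSM with the nodes we already
			// track and connect to the missing ones in order to fetch their
			// file lists. ListNodes() is still a stub in this commit.
			for _, node := range c.store.ListNodes() {
				c.logger.Debug().WithField("node", node).Log("reconciling node")
			}
		case <-c.shutdownCh:
			return
		}
	}
}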

cluster/forwarder.go Normal file
View File

@@ -0,0 +1,28 @@
package cluster
import "github.com/labstack/echo/v4"
// Forwarder forwards any HTTP request from a follower to the leader
type Forwarder interface {
SetLeader(address string)
Forward(c echo.Context)
}
type forwarder struct {
leaderAddr string
}
func NewForwarder() (Forwarder, error) {
return &forwarder{}, nil
}
func (f *forwarder) SetLeader(address string) {
	if f.leaderAddr == address {
		return
	}

	f.leaderAddr = address
}
func (f *forwarder) Forward(c echo.Context) {
}
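Both SetLeader and Forward are still stubs. Below is a sketch of how Forward could proxy a follower's request to the leader using the standard library, assuming the net/http, net/http/httputil, net/url and sync imports; the locking and URL handling are assumptions, not part of this commit.

type proxyForwarder struct {
	lock   sync.RWMutex
	leader string
	proxy  *httputil.ReverseProxy
}

func (f *proxyForwarder) SetLeader(address string) {
	f.lock.Lock()
	defer f.lock.Unlock()

	if f.leader == address {
		return
	}

	f.leader = address
	f.proxy = httputil.NewSingleHostReverseProxy(&url.URL{
		Scheme: "http",
		Host:   address,
	})
}

func (f *proxyForwarder) Forward(c echo.Context) {
	f.lock.RLock()
	proxy := f.proxy
	f.lock.RUnlock()

	if proxy == nil {
		c.NoContent(http.StatusServiceUnavailable)
		return
	}

	proxy.ServeHTTP(c.Response(), c.Request())
}

trackLeaderChanges in cluster.go already receives leader observations, so its TODO could call SetLeader with the observed leader address.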

View File

@@ -1,27 +1,68 @@
package cluster
import (
"encoding/json"
"fmt"
"io"
"github.com/hashicorp/raft"
)
-// Implement a FSM
-type fsm struct{}
+type Store interface {
+	raft.FSM
-func NewFSM() (raft.FSM, error) {
-	return &fsm{}, nil
+	ListNodes() []string
}
-func (f *fsm) Apply(*raft.Log) interface{} {
+// Implement a FSM
+type store struct{}
+func NewStore() (Store, error) {
+	return &store{}, nil
+}
+func (s *store) Apply(log *raft.Log) interface{} {
fmt.Printf("a log entry came in (index=%d, term=%d): %s\n", log.Index, log.Term, string(log.Data))
c := command{}
err := json.Unmarshal(log.Data, &c)
if err != nil {
fmt.Printf("invalid log entry\n")
return nil
}
fmt.Printf("op: %s\n", c.Operation)
fmt.Printf("op: %+v\n", c)
switch c.Operation {
case "addNode":
b, _ := json.Marshal(c.Data)
cmd := addNodeCommand{}
json.Unmarshal(b, &cmd)
fmt.Printf("addNode: %+v\n", cmd)
case "removeNode":
b, _ := json.Marshal(c.Data)
cmd := removeNodeCommand{}
json.Unmarshal(b, &cmd)
fmt.Printf("removeNode: %+v\n", cmd)
}
return nil
}
-func (f *fsm) Snapshot() (raft.FSMSnapshot, error) {
+func (s *store) Snapshot() (raft.FSMSnapshot, error) {
fmt.Printf("a snapshot is requested\n")
return &fsmSnapshot{}, nil
}
-func (f *fsm) Restore(snapshot io.ReadCloser) error {
+func (s *store) Restore(snapshot io.ReadCloser) error {
fmt.Printf("a snapshot is restored\n")
return nil
}
func (s *store) ListNodes() []string {
return nil
}
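Right now the store only prints the commands it receives and ListNodes returns nil. Below is a sketch of how it could keep actual state, assuming the sync import and a map of nodes; the field layout and the return value of ListNodes are assumptions, not part of this commit.

type memoryStore struct {
	lock  sync.RWMutex
	nodes map[string]addNodeCommand
}

func newMemoryStore() *memoryStore {
	return &memoryStore{
		nodes: map[string]addNodeCommand{},
	}
}

// applyCommand would be called from Apply() for every replicated log entry.
func (s *memoryStore) applyCommand(c command) {
	b, err := json.Marshal(c.Data)
	if err != nil {
		return
	}

	s.lock.Lock()
	defer s.lock.Unlock()

	switch c.Operation {
	case "addNode":
		cmd := addNodeCommand{}
		if err := json.Unmarshal(b, &cmd); err == nil {
			s.nodes[cmd.ID] = cmd
		}
	case "removeNode":
		cmd := removeNodeCommand{}
		if err := json.Unmarshal(b, &cmd); err == nil {
			delete(s.nodes, cmd.ID)
		}
	}
}

func (s *memoryStore) ListNodes() []string {
	s.lock.RLock()
	defer s.lock.RUnlock()

	ids := []string{}
	for id := range s.nodes {
		ids = append(ids, id)
	}

	return ids
}

Snapshot and Restore would then have to serialize and restore this map so the state survives raft snapshots, and StoreListNodes() in cluster.go could return the stored nodes instead of nil.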

View File

@@ -21,6 +21,7 @@ func (c *cluster) monitorLeadership() {
var weAreLeaderCh chan struct{}
var leaderLoop sync.WaitGroup
for {
select {
case isLeader := <-raftNotifyCh:
@@ -39,6 +40,8 @@ func (c *cluster) monitorLeadership() {
}(weAreLeaderCh)
c.logger.Info().Log("cluster leadership acquired")
c.StoreAddNode(c.id, ":8080", "foo", "bar")
default:
if weAreLeaderCh == nil {
c.logger.Error().Log("attempted to stop the leader loop while not running")
@@ -87,7 +90,7 @@ func (c *cluster) leaderLoop(stopCh chan struct{}) {
establishedLeader := false
RECONCILE:
// Setup a reconciliation timer
-	interval := time.After(s.config.ReconcileInterval)
+	interval := time.After(10 * time.Second)
// Apply a raft barrier to ensure our FSM is caught up
barrier := c.raft.Barrier(time.Minute)
@@ -98,7 +101,7 @@ RECONCILE:
// Check if we need to handle initial leadership actions
if !establishedLeader {
-		if err := c.establishLeadership(stopCtx); err != nil {
+		if err := c.establishLeadership(context.TODO()); err != nil {
c.logger.Error().WithError(err).Log("failed to establish leadership")
// Immediately revoke leadership since we didn't successfully
// establish leadership.
@@ -130,8 +133,7 @@ WAIT:
default:
}
-	// Periodically reconcile as long as we are the leader,
-	// or when Serf events arrive
+	// Periodically reconcile as long as we are the leader
for {
select {
case <-stopCh:
@@ -157,7 +159,7 @@ WAIT:
// leader, which means revokeLeadership followed by an
// establishLeadership().
c.revokeLeadership()
-			err := c.establishLeadership(stopCtx)
+			err := c.establishLeadership(context.TODO())
errCh <- err
// in case establishLeadership failed, we will try to
@@ -187,9 +189,10 @@ WAIT:
}
func (c *cluster) establishLeadership(ctx context.Context) error {
c.logger.Debug().Log("establishing leadership")
return nil
}
func (c *cluster) revokeLeadership() {
c.logger.Debug().Log("revoking leadership")
}

View File

@@ -20,23 +20,8 @@ type hclogger struct {
}
func NewLogger(logger log.Logger, lvl hclog.Level) hclog.Logger {
-	level := log.Linfo
-	switch lvl {
-	case hclog.Trace:
-		level = log.Ldebug
-	case hclog.Debug:
-		level = log.Ldebug
-	case hclog.Info:
-		level = log.Linfo
-	case hclog.Warn:
-		level = log.Lwarn
-	case hclog.Error:
-		level = log.Lerror
-	}
	return &hclogger{
-		logger: logger.WithOutput(log.NewSyncWriter(log.NewConsoleWriter(os.Stderr, level, true))),
+		logger: logger,
		level:  lvl,
	}
}

View File

@@ -242,7 +242,7 @@ func (n *node) files() {
}
for _, file := range files {
filesChan <- "mem:" + file.Name
f <- "mem:" + file.Name
}
}(filesChan)
@@ -255,7 +255,7 @@ func (n *node) files() {
}
for _, file := range files {
filesChan <- "disk:" + file.Name
f <- "disk:" + file.Name
}
}(filesChan)
@@ -271,7 +271,7 @@ func (n *node) files() {
}
for _, file := range files {
filesChan <- "rtmp:" + file.Name
f <- "rtmp:" + file.Name
}
}(filesChan)
}
@@ -288,7 +288,7 @@ func (n *node) files() {
}
for _, file := range files {
filesChan <- "srt:" + file.Name
f <- "srt:" + file.Name
}
}(filesChan)
}

View File

@@ -134,6 +134,8 @@ func (d *Config) Clone() *Config {
data.Router.BlockedPrefixes = copy.Slice(d.Router.BlockedPrefixes)
data.Router.Routes = copy.StringMap(d.Router.Routes)
data.Cluster = d.Cluster
data.vars.Transfer(&d.vars)
return data
@@ -276,6 +278,8 @@ func (d *Config) init() {
d.vars.Register(value.NewBool(&d.Cluster.Enable, false), "cluster.enable", "CORE_CLUSTER_ENABLE", nil, "Enable cluster mode", false, false)
d.vars.Register(value.NewBool(&d.Cluster.Bootstrap, false), "cluster.bootstrap", "CORE_CLUSTER_BOOTSTRAP", nil, "Bootstrap a cluster", false, false)
d.vars.Register(value.NewBool(&d.Cluster.Debug, false), "cluster.debug", "CORE_CLUSTER_DEBUG", nil, "Switch to debug mode, not for production", false, false)
d.vars.Register(value.NewAddress(&d.Cluster.Address, ":8000"), "cluster.address", "CORE_CLUSTER_ADDRESS", nil, "Raft listen address", false, true)
d.vars.Register(value.NewString(&d.Cluster.JoinAddress, ""), "cluster.join_address", "CORE_CLUSTER_JOIN_ADDRESS", nil, "Address of a core that is part of the cluster", false, true)
}
// Validate validates the current state of the Config for completeness and sanity. Errors are
@@ -455,6 +459,13 @@ func (d *Config) Validate(resetLogs bool) {
d.vars.Log("error", "metrics.interval", "must be smaller than the range")
}
}
// If cluster mode is enabled, we can't join and bootstrap at the same time
if d.Cluster.Enable {
if d.Cluster.Bootstrap && len(d.Cluster.JoinAddress) != 0 {
d.vars.Log("error", "cluster.join_address", "can't be set if cluster.bootstrap is enabled")
}
}
}
// Merge merges the values of the known environment variables into the configuration

View File

@@ -167,9 +167,11 @@ type Data struct {
UIPath string `json:"ui_path"`
} `json:"router"`
Cluster struct {
-		Enable    bool `json:"enable"`
-		Bootstrap bool `json:"bootstrap"`
-		Debug     bool `json:"debug"`
+		Enable      bool   `json:"enable"`
+		Bootstrap   bool   `json:"bootstrap"`
+		Debug       bool   `json:"debug"`
+		Address     string `json:"address"`
+		JoinAddress string `json:"join_address"`
} `json:"cluster"`
}