diff --git a/app/api/api.go b/app/api/api.go
index b9d2c010..65f0e27c 100644
--- a/app/api/api.go
+++ b/app/api/api.go
@@ -474,13 +474,12 @@ func (a *api) start() error {
 			ID:                     cfg.ID,
 			Name:                   cfg.Name,
 			Path:                   filepath.Join(cfg.DB.Dir, "cluster"),
-			Bootstrap:              cfg.Cluster.Bootstrap,
-			Recover:                cfg.Cluster.Recover,
 			Address:                cfg.Cluster.Address,
 			Peers:                  peers,
 			SyncInterval:           time.Duration(cfg.Cluster.SyncInterval) * time.Second,
 			NodeRecoverTimeout:     time.Duration(cfg.Cluster.NodeRecoverTimeout) * time.Second,
 			EmergencyLeaderTimeout: time.Duration(cfg.Cluster.EmergencyLeaderTimeout) * time.Second,
+			Config:                 cfg.Clone(),
 			CoreAPIAddress:         scheme + gonet.JoinHostPort(host, port),
 			CoreAPIUsername:        cfg.API.Auth.Username,
 			CoreAPIPassword:        cfg.API.Auth.Password,
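The removed `Bootstrap`/`Recover` flags disappear from the cluster wiring entirely (see the raft and config hunks below); in their place the API hands the cluster a snapshot of the core config so peers can compare settings. A minimal sketch of the intent, assuming `cfg.Clone()` returns a deep copy (the method is used here but not shown in this patch):

```go
// Sketch only: the cluster gets its own copy of the config, so later
// changes to the live cfg cannot skew the config comparison between nodes.
clusterConfig := cluster.ClusterConfig{
	ID:     cfg.ID,
	Name:   cfg.Name,
	Config: cfg.Clone(), // deep copy, not the shared *config.Config
}
```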
diff --git a/cluster/cluster.go b/cluster/cluster.go
index 66f5a0e3..539f17ec 100644
--- a/cluster/cluster.go
+++ b/cluster/cluster.go
@@ -7,7 +7,6 @@ import (
 	"fmt"
 	"io"
 	gonet "net"
-	"net/http"
 	"net/url"
 	"strconv"
 	"sync"
@@ -20,6 +19,7 @@ import (
 	"github.com/datarhei/core/v16/cluster/proxy"
 	"github.com/datarhei/core/v16/cluster/raft"
 	"github.com/datarhei/core/v16/cluster/store"
+	"github.com/datarhei/core/v16/config"
 	"github.com/datarhei/core/v16/iam"
 	iamaccess "github.com/datarhei/core/v16/iam/access"
 	iamidentity "github.com/datarhei/core/v16/iam/identity"
@@ -74,13 +74,11 @@ type Peer struct {
 }
 
 type ClusterConfig struct {
-	ID        string // ID of the node
-	Name      string // Name of the node
-	Path      string // Path where to store all cluster data
-	Bootstrap bool   // Whether to bootstrap a cluster
-	Recover   bool   // Whether to recover this node
-	Address   string // Listen address for the raft protocol
-	Peers     []Peer // Address of a member of a cluster to join
+	ID      string // ID of the node
+	Name    string // Name of the node
+	Path    string // Path where to store all cluster data
+	Address string // Listen address for the raft protocol
+	Peers   []Peer // Address of a member of a cluster to join
 
 	SyncInterval       time.Duration // Interval between aligning the process in the cluster DB with the processes on the nodes
 	NodeRecoverTimeout time.Duration // Timeout for a node to recover before rebalancing the processes
@@ -90,6 +88,8 @@ type ClusterConfig struct {
 	CoreAPIUsername string // Username for the core API
 	CoreAPIPassword string // Password for the core API
 
+	Config *config.Config
+
 	IPLimiter net.IPLimiter
 	Logger    log.Logger
 }
@@ -124,17 +124,19 @@ type cluster struct {
 	api   API
 	proxy proxy.Proxy
 
+	config      *config.Config
 	coreAddress string
 
-	isDegraded bool
-	stateLock  sync.Mutex
+	isDegraded    bool
+	isDegradedErr error
+	stateLock     sync.Mutex
 
 	isRaftLeader  bool
 	hasRaftLeader bool
 	isLeader      bool
 	leaderLock    sync.Mutex
 
-	nodes     map[string]proxy.Node
+	nodes     map[string]*clusterNode
 	nodesLock sync.RWMutex
 }
@@ -156,7 +158,12 @@ func New(config ClusterConfig) (Cluster, error) {
 		nodeRecoverTimeout:     config.NodeRecoverTimeout,
 		emergencyLeaderTimeout: config.EmergencyLeaderTimeout,
 
-		nodes: map[string]proxy.Node{},
+		config: config.Config,
+		nodes:  map[string]*clusterNode{},
+	}
+
+	if c.config == nil {
+		return nil, fmt.Errorf("the core config must be provided")
 	}
 
 	u, err := url.Parse(config.CoreAPIAddress)
@@ -245,8 +252,6 @@ func New(config ClusterConfig) (Cluster, error) {
 	raft, err := raft.New(raft.Config{
 		ID:        config.ID,
 		Path:      config.Path,
-		Bootstrap: config.Bootstrap,
-		Recover:   config.Recover,
 		Address:   config.Address,
 		Peers:     peers,
 		Store:     store,
@@ -361,7 +366,7 @@ func (c *cluster) Shutdown() error {
 	close(c.shutdownCh)
 
 	for id, node := range c.nodes {
-		node.Disconnect()
+		node.Stop()
 
 		if c.proxy != nil {
 			c.proxy.RemoveNode(id)
@@ -624,7 +629,7 @@ func (c *cluster) trackNodeChanges() {
 			}
 
 			for _, server := range servers {
-				id := server.ID
+				id := server.ID + server.Address
 
 				_, ok := c.nodes[id]
 				if !ok {
@@ -633,42 +638,32 @@ func (c *cluster) trackNodeChanges() {
 						"address": server.Address,
 					})
 
-					address, err := c.ClusterAPIAddress(server.Address)
+					address, err := clusterAPIAddress(server.Address)
 					if err != nil {
 						logger.Warn().WithError(err).Log("Discovering cluster API address")
-						continue
 					}
 
-					if !checkClusterVersion(address) {
+					cnode := NewClusterNode(address)
+
+					if !verifyClusterVersion(cnode.Version()) {
 						logger.Warn().Log("Version mismatch. Cluster will end up in degraded mode")
 					}
 
-					client := apiclient.APIClient{
-						Address: address,
+					if err := verifyClusterConfig(c.config, cnode.Config()); err != nil {
+						logger.Warn().WithError(err).Log("Config mismatch. Cluster will end up in degraded mode")
 					}
 
-					coreAddress, err := client.CoreAPIAddress()
-					if err != nil {
-						logger.Warn().WithError(err).Log("Retrieve core API address")
-						continue
-					}
-
-					node := proxy.NewNode(coreAddress)
-					err = node.Connect()
-					if err != nil {
-						c.logger.Warn().WithError(err).Log("Connecting to core API")
-						continue
-					}
-
-					// TODO: Check constraints
-
-					if _, err := c.proxy.AddNode(id, node); err != nil {
+					if err := cnode.Start(); err != nil {
+						logger.Warn().WithError(err).Log("Starting the node")
+						continue
+					}
+
+					if _, err := c.proxy.AddNode(id, cnode.Proxy()); err != nil {
 						c.logger.Warn().WithError(err).Log("Adding node")
-						node.Disconnect()
 						continue
 					}
 
-					c.nodes[id] = node
+					c.nodes[id] = cnode
 				} else {
 					delete(removeNodes, id)
 				}
@@ -681,17 +676,23 @@ func (c *cluster) trackNodeChanges() {
 				}
 
 				c.proxy.RemoveNode(id)
-				node.Disconnect()
+				node.Stop()
 
 				delete(c.nodes, id)
 			}
 			c.nodesLock.Unlock()
 
-			// Put the cluster in "degraded" mode in case there's a version mismatch
-			isDegraded := !c.checkClusterVersions(servers)
+			// Put the cluster in "degraded" mode in case there's a mismatch in expected values
+			err = c.checkClusterNodes()
 
 			c.stateLock.Lock()
-			c.isDegraded = isDegraded
+			if err != nil {
+				c.isDegraded = true
+				c.isDegradedErr = err
+			} else {
+				c.isDegraded = false
+				c.isDegradedErr = nil
+			}
 			c.stateLock.Unlock()
 		case <-c.shutdownCh:
 			return
@@ -699,63 +700,30 @@ func (c *cluster) trackNodeChanges() {
 		}
 	}
 }
 
-func (c *cluster) checkClusterVersions(servers []raft.Server) bool {
-	ok := true
-	okChan := make(chan bool, 64)
+func (c *cluster) checkClusterNodes() error {
+	c.nodesLock.RLock()
+	defer c.nodesLock.RUnlock()
 
-	wgSummary := sync.WaitGroup{}
-	wgSummary.Add(1)
-
-	go func() {
-		defer wgSummary.Done()
-
-		for okServer := range okChan {
-			if !okServer {
-				ok = false
-			}
+	for id, node := range c.nodes {
+		if status, statusErr := node.Status(); status == "offline" {
+			return fmt.Errorf("node %s is offline: %w", id, statusErr)
 		}
-	}()
 
-	wg := sync.WaitGroup{}
+		version := node.Version()
+		if !verifyClusterVersion(version) {
+			return fmt.Errorf("node %s has a different version: %s", id, version)
+		}
 
-	for _, server := range servers {
-		wg.Add(1)
-
-		go func(server raft.Server, p chan<- bool) {
-			defer wg.Done()
-
-			address, err := clusterAPIAddress(server.Address)
-			if err != nil {
-				p <- false
-				return
-			}
-
-			p <- checkClusterVersion(address)
-		}(server, okChan)
+		config := node.Config()
+		if configErr := verifyClusterConfig(c.config, config); configErr != nil {
+			return fmt.Errorf("node %s has a different configuration: %w", id, configErr)
+		}
 	}
 
-	wg.Wait()
-
-	close(okChan)
-
-	wgSummary.Wait()
-
-	return ok
+	return nil
 }
 
-func checkClusterVersion(address string) bool {
-	client := apiclient.APIClient{
-		Address: address,
-		Client: &http.Client{
-			Timeout: 5 * time.Second,
-		},
-	}
-
-	v, err := client.Version()
-	if err != nil {
-		return false
-	}
-
+func verifyClusterVersion(v string) bool {
 	version, err := ParseClusterVersion(v)
 	if err != nil {
 		return false
@@ -768,6 +736,46 @@ func verifyClusterVersion(v string) bool {
 	return true
 }
 
+func verifyClusterConfig(local, remote *config.Config) error {
+	if remote == nil {
+		return fmt.Errorf("config is not available")
+	}
+
+	if local.Cluster.Enable != remote.Cluster.Enable {
+		return fmt.Errorf("cluster.enable is different")
+	}
+
+	if local.Cluster.SyncInterval != remote.Cluster.SyncInterval {
+		return fmt.Errorf("cluster.sync_interval_sec is different, local: %ds vs. remote: %ds", local.Cluster.SyncInterval, remote.Cluster.SyncInterval)
+	}
+
+	if local.Cluster.NodeRecoverTimeout != remote.Cluster.NodeRecoverTimeout {
+		return fmt.Errorf("cluster.node_recover_timeout_sec is different, local: %ds vs. remote: %ds", local.Cluster.NodeRecoverTimeout, remote.Cluster.NodeRecoverTimeout)
+	}
+
+	if local.Cluster.EmergencyLeaderTimeout != remote.Cluster.EmergencyLeaderTimeout {
+		return fmt.Errorf("cluster.emergency_leader_timeout_sec is different, local: %ds vs. remote: %ds", local.Cluster.EmergencyLeaderTimeout, remote.Cluster.EmergencyLeaderTimeout)
+	}
+
+	if local.RTMP.Enable != remote.RTMP.Enable {
+		return fmt.Errorf("rtmp.enable is different")
+	}
+
+	if local.RTMP.App != remote.RTMP.App {
+		return fmt.Errorf("rtmp.app is different")
+	}
+
+	if local.SRT.Enable != remote.SRT.Enable {
+		return fmt.Errorf("srt.enable is different")
+	}
+
+	if local.SRT.Passphrase != remote.SRT.Passphrase {
+		return fmt.Errorf("srt.passphrase is different")
+	}
+
+	return nil
+}
+
 // trackLeaderChanges registers an Observer with raft in order to receive updates
 // about leader changes, in order to keep the forwarder up to date.
 func (c *cluster) trackLeaderChanges() {
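`verifyClusterConfig` compares only the settings that must agree cluster-wide (cluster timings, RTMP enable/app, SRT enable/passphrase). A test-style sketch of how a single mismatch surfaces, assuming `config.Config` exposes the `Data` fields directly (as the hunk's `local.Cluster.Enable` accesses suggest); the values are invented:

```go
// Sketch: one diverging value is enough to degrade the cluster.
local := &config.Config{}
local.Cluster.SyncInterval = 5

remote := &config.Config{}
remote.Cluster.SyncInterval = 10

err := verifyClusterConfig(local, remote)
// err: "cluster.sync_interval_sec is different, local: 5s vs. remote: 10s"
```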
diff --git a/cluster/leader.go b/cluster/leader.go
index ac6747c1..245e95fd 100644
--- a/cluster/leader.go
+++ b/cluster/leader.go
@@ -333,14 +333,19 @@ func (c *cluster) startSynchronizeAndRebalance(ctx context.Context, interval time.Duration) {
 		case <-ctx.Done():
 			return
 		case <-ticker.C:
-			if c.IsDegraded() {
-				break
-			}
-
-			c.doSynchronize(emergency)
-			if !emergency {
-				c.doRebalance(emergency)
+			if !emergency {
+				if c.IsDegraded() {
+					break
+				}
+
+				c.doSynchronize(emergency)
+
+				if !emergency {
+					c.doRebalance(emergency)
+				}
+			} else {
+				c.doSynchronize(emergency)
 			}
 		}
 	}
 }
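Written out, the post-patch tick handler has this equivalent control flow: regular mode skips all work while the cluster is degraded and additionally rebalances, whereas an emergency leader keeps synchronizing unconditionally. (The patch retains an inner `if !emergency` check that is always true in the outer branch; it is dropped here for readability.)

```go
case <-ticker.C:
	if !emergency {
		if c.IsDegraded() {
			break // skip sync/rebalance while the cluster is degraded
		}

		c.doSynchronize(emergency)
		c.doRebalance(emergency) // rebalancing stays leader-only
	} else {
		// Emergency leaders keep synchronizing even when degraded.
		c.doSynchronize(emergency)
	}
```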
diff --git a/cluster/node.go b/cluster/node.go
index 9d1c7b06..3dd8df90 100644
--- a/cluster/node.go
+++ b/cluster/node.go
@@ -2,26 +2,33 @@ package cluster
 
 import (
 	"context"
+	"fmt"
 	"net/http"
 	"sync"
 	"time"
 
 	"github.com/datarhei/core/v16/cluster/client"
 	"github.com/datarhei/core/v16/cluster/proxy"
+	"github.com/datarhei/core/v16/config"
 )
 
 type clusterNode struct {
 	client client.APIClient
 
-	version     string
-	lastContact time.Time
-	latency     time.Duration
-	pingLock    sync.RWMutex
+	version            string
+	lastContact        time.Time
+	lastContactErr     error
+	lastCoreContact    time.Time
+	lastCoreContactErr error
+	latency            time.Duration
+	pingLock           sync.RWMutex
 
 	runLock    sync.Mutex
 	cancelPing context.CancelFunc
 
-	proxyNode proxy.Node
+	proxyLock    sync.Mutex
+	proxyNode    proxy.Node
+	proxyNodeErr error
 }
@@ -35,6 +42,26 @@ func NewClusterNode(address string) *clusterNode {
 		},
 	}
 
+	version, err := n.client.Version()
+	if err != nil {
+		version = "0.0.0"
+	}
+
+	n.version = version
+
+	// The proxy must talk to the core API, not to the cluster API
+	// address this node was created with.
+	if coreAddress, err := n.CoreAPIAddress(); err != nil {
+		n.proxyNodeErr = err
+	} else {
+		p := proxy.NewNode(coreAddress)
+		if err := p.Connect(); err != nil {
+			n.proxyNodeErr = err
+		} else {
+			n.proxyNode = p
+		}
+	}
+
 	return n
 }
 
@@ -46,24 +73,47 @@ func (n *clusterNode) Start() error {
 		return nil
 	}
 
-	version, err := n.client.Version()
-	if err != nil {
-		return err
-	}
-
-	n.version = version
-
-	address, err := n.CoreAPIAddress()
-	if err != nil {
-		return err
-	}
-
-	n.proxyNode = proxy.NewNode(address)
-
 	ctx, cancel := context.WithCancel(context.Background())
 	n.cancelPing = cancel
 
 	go n.ping(ctx)
+	go n.pingCore(ctx)
+
+	address, _ := n.CoreAPIAddress()
+
+	p := proxy.NewNode(address)
+	err := p.Connect()
+
+	// pingCore() is already running and reads the proxy node, therefore
+	// all writes to it must hold proxyLock.
+	n.proxyLock.Lock()
+	if err != nil {
+		n.proxyNodeErr = err
+	} else {
+		n.proxyNode = p
+	}
+	n.proxyLock.Unlock()
+
+	if err != nil {
+		go func(ctx context.Context, address string) {
+			ticker := time.NewTicker(3 * time.Second)
+			defer ticker.Stop()
+
+			for {
+				select {
+				case <-ticker.C:
+					p := proxy.NewNode(address)
+					if err := p.Connect(); err == nil {
+						n.proxyLock.Lock()
+						n.proxyNode = p
+						n.proxyLock.Unlock()
+						return
+					}
+				case <-ctx.Done():
+					return
+				}
+			}
+		}(ctx, address)
+	}
 
 	return nil
 }
@@ -76,6 +126,12 @@ func (n *clusterNode) Stop() error {
 		return nil
 	}
 
+	n.proxyLock.Lock()
+	if n.proxyNode != nil {
+		n.proxyNode.Disconnect()
+	}
+	n.proxyLock.Unlock()
+
 	n.cancelPing()
 	n.cancelPing = nil
 
@@ -89,10 +145,45 @@ func (n *clusterNode) Version() string {
 	return n.version
 }
 
+func (n *clusterNode) Status() (string, error) {
+	n.pingLock.RLock()
+	defer n.pingLock.RUnlock()
+
+	since := time.Since(n.lastContact)
+	if since > 5*time.Second {
+		return "offline", fmt.Errorf("the cluster API didn't respond for %s because: %w", since, n.lastContactErr)
+	}
+
+	since = time.Since(n.lastCoreContact)
+	if since > 5*time.Second {
+		return "offline", fmt.Errorf("the core API didn't respond for %s because: %w", since, n.lastCoreContactErr)
+	}
+
+	return "online", nil
+}
+
+func (n *clusterNode) LastContact() time.Time {
+	n.pingLock.RLock()
+	defer n.pingLock.RUnlock()
+
+	return n.lastContact
+}
+
+// Config returns the remote node's core configuration. Retrieving it via
+// the cluster API is not implemented yet; verifyClusterConfig treats the
+// nil result as "config is not available".
+func (n *clusterNode) Config() *config.Config {
+	return nil
+}
+
 func (n *clusterNode) CoreAPIAddress() (string, error) {
 	return n.client.CoreAPIAddress()
 }
 
+func (n *clusterNode) Proxy() proxy.Node {
+	return n.proxyNode
+}
+
 func (n *clusterNode) ping(ctx context.Context) {
 	ticker := time.NewTicker(time.Second)
 	defer ticker.Stop()
@@ -108,6 +199,41 @@ func (n *clusterNode) ping(ctx context.Context) {
 				n.lastContact = time.Now()
 				n.latency = time.Since(start)
 				n.pingLock.Unlock()
+			} else {
+				n.pingLock.Lock()
+				n.lastContactErr = err
+				n.pingLock.Unlock()
+			}
+		case <-ctx.Done():
+			return
+		}
+	}
+}
+
+func (n *clusterNode) pingCore(ctx context.Context) {
+	ticker := time.NewTicker(time.Second)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			var err error
+			n.proxyLock.Lock()
+			if n.proxyNode == nil {
+				err = fmt.Errorf("can't connect to core api: %w", n.proxyNodeErr)
+			} else {
+				_, err = n.proxyNode.Ping()
+			}
+			n.proxyLock.Unlock()
+
+			if err == nil {
+				n.pingLock.Lock()
+				n.lastCoreContact = time.Now()
+				n.pingLock.Unlock()
+			} else {
+				n.pingLock.Lock()
+				n.lastCoreContactErr = err
+				n.pingLock.Unlock()
 			}
 		case <-ctx.Done():
 			return
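The node wrapper now owns both connections: the cluster API client (version, config, ping) and the proxy to the core API. A hedged sketch of the intended lifecycle, matching how trackNodeChanges uses it:

```go
// Sketch: lifecycle of a tracked node, names as in this patch.
cnode := NewClusterNode(address) // connects the client and the proxy immediately

if err := cnode.Start(); err != nil { // starts the ping() and pingCore() loops
	return err
}
defer cnode.Stop() // disconnects the proxy and cancels the pings

if status, err := cnode.Status(); status == "offline" {
	// no successful ping on either API for more than 5 seconds
	return err
}
```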
diff --git a/cluster/proxy/node.go b/cluster/proxy/node.go
index c77a1a9b..b8074398 100644
--- a/cluster/proxy/node.go
+++ b/cluster/proxy/node.go
@@ -22,6 +22,8 @@ type Node interface {
 	Connect() error
 	Disconnect()
 
+	Config() clientapi.ConfigV3
+
 	StartFiles(updates chan<- NodeFiles) error
 	StopFiles()
 
@@ -39,6 +41,7 @@ type Node interface {
 
 type NodeReader interface {
 	IPs() []string
+	Ping() (time.Duration, error)
 	About() NodeAbout
 	Version() NodeVersion
 	Resources() NodeResources
@@ -114,6 +117,8 @@ type node struct {
 		memLimit uint64
 	}
 
+	config clientapi.ConfigV3
+
 	state     nodeState
 	latency   float64 // Seconds
 	stateLock sync.RWMutex
@@ -190,6 +195,8 @@ func (n *node) Connect() error {
 		return fmt.Errorf("failed to convert config to expected version")
 	}
 
+	n.config = config
+
 	n.httpAddress = u
 
 	if config.RTMP.Enable {
@@ -205,6 +212,8 @@ func (n *node) Connect() error {
 			n.rtmpAddress.Scheme = "rtmps:"
 		}
 
+		n.rtmpAddress = n.rtmpAddress.JoinPath(config.RTMP.App)
+
 		n.rtmpAddress.Host = address
 	}
@@ -242,10 +251,10 @@ func (n *node) Connect() error {
 			select {
 			case <-ticker.C:
 				// Ping
-				ok, latency := n.Ping()
+				latency, err := n.Ping()
 
 				n.stateLock.Lock()
-				if !ok {
+				if err != nil {
 					n.state = stateDisconnected
 				} else {
 					n.lastContact = time.Now()
@@ -350,15 +359,26 @@ func (n *node) Connect() error {
 	return nil
 }
 
-func (n *node) Ping() (bool, time.Duration) {
+func (n *node) Config() clientapi.ConfigV3 {
+	return n.config
+}
+
+func (n *node) Ping() (time.Duration, error) {
 	n.peerLock.RLock()
 	defer n.peerLock.RUnlock()
 
 	if n.peer == nil {
-		return false, 0
+		return 0, fmt.Errorf("not connected")
 	}
 
-	return n.peer.Ping()
+	ok, latency := n.peer.Ping()
+
+	var err error
+	if !ok {
+		err = fmt.Errorf("not connected")
+	}
+
+	return latency, err
 }
 
 func (n *node) Metrics(query clientapi.MetricsQuery) (clientapi.MetricsResponse, error) {
@@ -727,18 +747,18 @@ func cloneURL(src *url.URL) *url.URL {
 	return dst
 }
 
-func (n *node) GetURL(prefix, path string) (*url.URL, error) {
+func (n *node) GetURL(prefix, resource string) (*url.URL, error) {
 	var u *url.URL
 
 	if prefix == "mem" {
 		u = cloneURL(n.httpAddress)
-		u.JoinPath("memfs", path)
+		u = u.JoinPath("memfs", resource)
 	} else if prefix == "disk" {
 		u = cloneURL(n.httpAddress)
-		u.JoinPath(path)
+		u = u.JoinPath(resource)
 	} else if prefix == "rtmp" {
 		u = cloneURL(n.rtmpAddress)
-		u.JoinPath(path)
+		u = u.JoinPath(resource)
 	} else if prefix == "srt" {
 		u = cloneURL(n.srtAddress)
 	} else {
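Note that `(*url.URL).JoinPath` (Go 1.19+) returns a new URL and leaves its receiver unchanged, which is why the joined result has to be assigned back in the hunks above. A standalone illustration:

```go
package main

import (
	"fmt"
	"net/url"
)

func main() {
	u, _ := url.Parse("http://example.com")

	u.JoinPath("memfs", "a.ts") // result discarded: u.Path is still ""
	fmt.Println(u.String())     // http://example.com

	u = u.JoinPath("memfs", "a.ts")
	fmt.Println(u.String()) // http://example.com/memfs/a.ts
}
```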
diff --git a/cluster/raft/raft.go b/cluster/raft/raft.go
index bf2c82fa..32484058 100644
--- a/cluster/raft/raft.go
+++ b/cluster/raft/raft.go
@@ -82,12 +82,10 @@ type Stats struct {
 }
 
 type Config struct {
-	ID        string // ID of the node
-	Path      string // Path where to store all cluster data
-	Bootstrap bool   // Whether to bootstrap a cluster
-	Recover   bool   // Whether to recover this node
-	Address   string // Listen address for the raft protocol
-	Peers     []Peer // Address of a member of a cluster to join
+	ID      string // ID of the node
+	Path    string // Path where to store all cluster data
+	Address string // Listen address for the raft protocol
+	Peers   []Peer // Address of a member of a cluster to join
 
 	Store hcraft.FSM
@@ -118,7 +116,7 @@ func New(config Config) (Raft, error) {
 		r.logger = log.New("")
 	}
 
-	err := r.start(config.Store, config.Bootstrap, config.Recover, config.Peers, false)
+	err := r.start(config.Store, config.Peers, false)
 	if err != nil {
 		return nil, fmt.Errorf("failed to start raft: %w", err)
 	}
@@ -320,7 +318,7 @@ func (r *raft) Snapshot() (io.ReadCloser, error) {
 	return &readCloserWrapper{&buffer}, nil
 }
 
-func (r *raft) start(fsm hcraft.FSM, bootstrap, recover bool, peers []Peer, inmem bool) error {
+func (r *raft) start(fsm hcraft.FSM, peers []Peer, inmem bool) error {
 	defer func() {
 		if r.raft == nil && r.raftStore != nil {
 			r.raftStore.Close()
diff --git a/config/config.go b/config/config.go
index 6e32efc4..2a9e5c74 100644
--- a/config/config.go
+++ b/config/config.go
@@ -284,14 +284,12 @@ func (d *Config) init() {
 
 	// Cluster
 	d.vars.Register(value.NewBool(&d.Cluster.Enable, false), "cluster.enable", "CORE_CLUSTER_ENABLE", nil, "Enable cluster mode", false, false)
-	d.vars.Register(value.NewBool(&d.Cluster.Bootstrap, false), "cluster.bootstrap", "CORE_CLUSTER_BOOTSTRAP", nil, "Bootstrap a cluster", false, false)
-	d.vars.Register(value.NewBool(&d.Cluster.Recover, false), "cluster.recover", "CORE_CLUSTER_RECOVER", nil, "Recover a cluster", false, false)
 	d.vars.Register(value.NewBool(&d.Cluster.Debug, false), "cluster.debug", "CORE_CLUSTER_DEBUG", nil, "Switch to debug mode, not for production", false, false)
 	d.vars.Register(value.NewClusterAddress(&d.Cluster.Address, "127.0.0.1:8000"), "cluster.address", "CORE_CLUSTER_ADDRESS", nil, "Raft listen address", true, false)
 	d.vars.Register(value.NewClusterPeerList(&d.Cluster.Peers, []string{""}, ","), "cluster.peers", "CORE_CLUSTER_PEERS", nil, "Raft addresses of cores that are part of the cluster", false, false)
-	d.vars.Register(value.NewInt64(&d.Cluster.SyncInterval, 5), "cluster.sync_interval", "CORE_CLUSTER_SYNC_INTERVAL", nil, "Interval between aligning the process in the cluster DB with the processes on the nodes", true, false)
-	d.vars.Register(value.NewInt64(&d.Cluster.NodeRecoverTimeout, 120), "cluster.node_recover_timeout", "CORE_CLUSTER_NODE_RECOVER_TIMEOUT", nil, "Timeout for a node to recover before rebalancing the processes", true, false)
-	d.vars.Register(value.NewInt64(&d.Cluster.EmergencyLeaderTimeout, 10), "cluster.emergency_leader_timeout", "CORE_CLUSTER_EMERGENCY_LEADER_TIMEOUT", nil, "Timeout for establishing the emergency leadership after lost contact to raft leader", true, false)
+	d.vars.Register(value.NewInt64(&d.Cluster.SyncInterval, 5), "cluster.sync_interval_sec", "CORE_CLUSTER_SYNC_INTERVAL", nil, "Interval between aligning the process in the cluster DB with the processes on the nodes", true, false)
+	d.vars.Register(value.NewInt64(&d.Cluster.NodeRecoverTimeout, 120), "cluster.node_recover_timeout_sec", "CORE_CLUSTER_NODE_RECOVER_TIMEOUT", nil, "Timeout for a node to recover before rebalancing the processes", true, false)
+	d.vars.Register(value.NewInt64(&d.Cluster.EmergencyLeaderTimeout, 10), "cluster.emergency_leader_timeout_sec", "CORE_CLUSTER_EMERGENCY_LEADER_TIMEOUT", nil, "Timeout for establishing the emergency leadership after lost contact to raft leader", true, false)
 }
 
 // Validate validates the current state of the Config for completeness and sanity. Errors are
diff --git a/config/data.go b/config/data.go
index 17234092..e1d2393e 100644
--- a/config/data.go
+++ b/config/data.go
@@ -174,14 +174,12 @@ type Data struct {
 	} `json:"resources"`
 	Cluster struct {
 		Enable                 bool     `json:"enable"`
-		Bootstrap              bool     `json:"bootstrap"`
-		Recover                bool     `json:"recover"`
 		Debug                  bool     `json:"debug"`
 		Address                string   `json:"address"` // ip:port
 		Peers                  []string `json:"peers"`
-		SyncInterval           int64    `json:"sync_interval" format:"int64"`            // seconds
-		NodeRecoverTimeout     int64    `json:"node_recover_timeout" format:"int64"`     // seconds
-		EmergencyLeaderTimeout int64    `json:"emergency_leader_timeout" format:"int64"` // seconds
+		SyncInterval           int64    `json:"sync_interval_sec" format:"int64"`            // seconds
+		NodeRecoverTimeout     int64    `json:"node_recover_timeout_sec" format:"int64"`     // seconds
+		EmergencyLeaderTimeout int64    `json:"emergency_leader_timeout_sec" format:"int64"` // seconds
 	} `json:"cluster"`
 }
"CORE_CLUSTER_NODE_RECOVER_TIMEOUT", nil, "Timeout for a node to recover before rebalancing the processes", true, false) + d.vars.Register(value.NewInt64(&d.Cluster.EmergencyLeaderTimeout, 10), "cluster.emergency_leader_timeout_sec", "CORE_CLUSTER_EMERGENCY_LEADER_TIMEOUT", nil, "Timeout for establishing the emergency leadership after lost contact to raft leader", true, false) } // Validate validates the current state of the Config for completeness and sanity. Errors are diff --git a/config/data.go b/config/data.go index 17234092..e1d2393e 100644 --- a/config/data.go +++ b/config/data.go @@ -174,14 +174,12 @@ type Data struct { } `json:"resources"` Cluster struct { Enable bool `json:"enable"` - Bootstrap bool `json:"bootstrap"` - Recover bool `json:"recover"` Debug bool `json:"debug"` Address string `json:"address"` // ip:port Peers []string `json:"peers"` - SyncInterval int64 `json:"sync_interval" format:"int64"` // seconds - NodeRecoverTimeout int64 `json:"node_recover_timeout" format:"int64"` // seconds - EmergencyLeaderTimeout int64 `json:"emergency_leader_timeout" format:"int64"` // seconds + SyncInterval int64 `json:"sync_interval_sec" format:"int64"` // seconds + NodeRecoverTimeout int64 `json:"node_recover_timeout_sec" format:"int64"` // seconds + EmergencyLeaderTimeout int64 `json:"emergency_leader_timeout_sec" format:"int64"` // seconds } `json:"cluster"` } diff --git a/rtmp/rtmp.go b/rtmp/rtmp.go index 3250f488..0d6ee0ff 100644 --- a/rtmp/rtmp.go +++ b/rtmp/rtmp.go @@ -277,9 +277,9 @@ func (s *server) handlePlay(conn *rtmp.Conn) { if ch == nil { // Check in the cluster for that stream - url, err := s.proxy.GetURL("rtmp", conn.URL.Path) + url, err := s.proxy.GetURL("rtmp", playpath) if err != nil { - s.log(identity, "PLAY", "NOTFOUND", conn.URL.Path, "", remote) + s.log(identity, "PLAY", "NOTFOUND", playpath, "", remote) return } @@ -289,7 +289,7 @@ func (s *server) handlePlay(conn *rtmp.Conn) { src, err := avutil.Open(peerurl) if err != nil { s.logger.Error().WithField("address", url).WithError(err).Log("Proxying address failed") - s.log(identity, "PLAY", "NOTFOUND", conn.URL.Path, "", remote) + s.log(identity, "PLAY", "NOTFOUND", playpath, "", remote) return }