diff --git a/cluster/api.go b/cluster/api.go
index 75a92475..576bbac3 100644
--- a/cluster/api.go
+++ b/cluster/api.go
@@ -1,15 +1,11 @@
 package cluster
 
 import (
-	"bytes"
 	"context"
-	"encoding/json"
-	"fmt"
-	"io"
 	"net/http"
 	"strings"
-	"time"
 
+	"github.com/datarhei/core/v16/cluster/client"
 	httpapi "github.com/datarhei/core/v16/http/api"
 	"github.com/datarhei/core/v16/http/errorhandler"
 	"github.com/datarhei/core/v16/http/handler/util"
@@ -17,7 +13,6 @@ import (
 	mwlog "github.com/datarhei/core/v16/http/middleware/log"
 	"github.com/datarhei/core/v16/http/validator"
 	"github.com/datarhei/core/v16/log"
-	"github.com/datarhei/core/v16/restream/app"
 
 	"github.com/labstack/echo/v4"
 	"github.com/labstack/echo/v4/middleware"
@@ -42,27 +37,6 @@ type APIConfig struct {
 	Logger log.Logger
 }
 
-type JoinRequest struct {
-	Origin      string `json:"origin"`
-	ID          string `json:"id"`
-	RaftAddress string `json:"raft_address"`
-}
-
-type LeaveRequest struct {
-	Origin string `json:"origin"`
-	ID     string `json:"id"`
-}
-
-type AddProcessRequest struct {
-	Origin string     `json:"origin"`
-	Config app.Config `json:"config"`
-}
-
-type RemoveProcessRequest struct {
-	Origin string `json:"origin"`
-	ID     string `json:"id"`
-}
-
 func NewAPI(config APIConfig) (API, error) {
 	a := &api{
 		id: config.ID,
@@ -105,7 +79,7 @@ func NewAPI(config APIConfig) (API, error) {
 	a.router.Logger.SetOutput(httplog.NewWrapper(a.logger))
 
 	a.router.POST("/v1/join", func(c echo.Context) error {
-		r := JoinRequest{}
+		r := client.JoinRequest{}
 
 		if err := util.ShouldBindJSON(c, &r); err != nil {
 			return httpapi.Err(http.StatusBadRequest, "Invalid JSON", "%s", err)
@@ -127,7 +101,7 @@ func NewAPI(config APIConfig) (API, error) {
 	})
 
 	a.router.POST("/v1/leave", func(c echo.Context) error {
-		r := LeaveRequest{}
+		r := client.LeaveRequest{}
 
 		if err := util.ShouldBindJSON(c, &r); err != nil {
 			return httpapi.Err(http.StatusBadRequest, "Invalid JSON", "%s", err)
@@ -161,7 +135,7 @@ func NewAPI(config APIConfig) (API, error) {
 	})
 
 	a.router.POST("/v1/process", func(c echo.Context) error {
-		r := AddProcessRequest{}
+		r := client.AddProcessRequest{}
 
 		if err := util.ShouldBindJSON(c, &r); err != nil {
 			return httpapi.Err(http.StatusBadRequest, "Invalid JSON", "%s", err)
@@ -183,7 +157,7 @@ func NewAPI(config APIConfig) (API, error) {
 	})
 
 	a.router.POST("/v1/process/:id", func(c echo.Context) error {
-		r := RemoveProcessRequest{}
+		r := client.RemoveProcessRequest{}
 
 		if err := util.ShouldBindJSON(c, &r); err != nil {
 			return httpapi.Err(http.StatusBadRequest, "Invalid JSON", "%s", err)
@@ -220,141 +194,3 @@ func (a *api) Start() error {
 func (a *api) Shutdown(ctx context.Context) error {
 	return a.router.Shutdown(ctx)
 }
-
-type APIClient struct {
-	Address string
-	Client  *http.Client
-}
-
-func (c *APIClient) CoreAPIAddress() (string, error) {
-	data, err := c.call(http.MethodGet, "/core", "", nil)
-	if err != nil {
-		return "", err
-	}
-
-	var address string
-	err = json.Unmarshal(data, &address)
-	if err != nil {
-		return "", err
-	}
-
-	return address, nil
-}
-
-func (c *APIClient) Join(r JoinRequest) error {
-	data, err := json.Marshal(&r)
-	if err != nil {
-		return err
-	}
-
-	_, err = c.call(http.MethodPost, "/join", "application/json", bytes.NewReader(data))
-
-	return err
-}
-
-func (c *APIClient) Leave(r LeaveRequest) error {
-	data, err := json.Marshal(&r)
-	if err != nil {
-		return err
-	}
-
-	_, err = c.call(http.MethodPost, "/leave", "application/json", bytes.NewReader(data))
-
-	return err
-}
-
-func (c *APIClient) AddProcess(r AddProcessRequest) error {
-	data, err := json.Marshal(r)
-	if err != nil {
-		return err
-	}
-
-	_, err = c.call(http.MethodPost, "/process", "application/json", bytes.NewReader(data))
-
-	return err
-}
-
-func (c *APIClient) RemoveProcess(r RemoveProcessRequest) error {
-	data, err := json.Marshal(r)
-	if err != nil {
-		return err
-	}
-
-	_, err = c.call(http.MethodPost, "/process/"+r.ID, "application/json", bytes.NewReader(data))
-
-	return err
-}
-
-func (c *APIClient) Snapshot() (io.ReadCloser, error) {
-	return c.stream(http.MethodGet, "/snapshot", "", nil)
-}
-
-func (c *APIClient) stream(method, path, contentType string, data io.Reader) (io.ReadCloser, error) {
-	if len(c.Address) == 0 {
-		return nil, fmt.Errorf("no address defined")
-	}
-
-	address := "http://" + c.Address + "/v1" + path
-
-	req, err := http.NewRequest(method, address, data)
-	if err != nil {
-		return nil, err
-	}
-
-	if method == "POST" || method == "PUT" {
-		req.Header.Add("Content-Type", contentType)
-	}
-
-	status, body, err := c.request(req)
-	if err != nil {
-		return nil, err
-	}
-
-	if status < 200 || status >= 300 {
-		e := httpapi.Error{}
-
-		defer body.Close()
-
-		x, _ := io.ReadAll(body)
-
-		json.Unmarshal(x, &e)
-
-		return nil, e
-	}
-
-	return body, nil
-}
-
-func (c *APIClient) call(method, path, contentType string, data io.Reader) ([]byte, error) {
-	body, err := c.stream(method, path, contentType, data)
-	if err != nil {
-		return nil, err
-	}
-
-	defer body.Close()
-
-	x, _ := io.ReadAll(body)
-
-	return x, nil
-}
-
-func (c *APIClient) request(req *http.Request) (int, io.ReadCloser, error) {
-	if c.Client == nil {
-		tr := &http.Transport{
-			MaxIdleConns:    10,
-			IdleConnTimeout: 30 * time.Second,
-		}
-
-		c.Client = &http.Client{
-			Transport: tr,
-			Timeout:   5 * time.Second,
-		}
-	}
-
-	resp, err := c.Client.Do(req)
-	if err != nil {
-		return -1, nil, err
-	}
-
-	return resp.StatusCode, resp.Body, nil
-}
diff --git a/cluster/client/client.go b/cluster/client/client.go
new file mode 100644
index 00000000..586f7638
--- /dev/null
+++ b/cluster/client/client.go
@@ -0,0 +1,172 @@
+package client
+
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+	"io"
+	"net/http"
+	"time"
+
+	httpapi "github.com/datarhei/core/v16/http/api"
+	"github.com/datarhei/core/v16/restream/app"
+)
+
+type JoinRequest struct {
+	Origin      string `json:"origin"`
+	ID          string `json:"id"`
+	RaftAddress string `json:"raft_address"`
+}
+
+type LeaveRequest struct {
+	Origin string `json:"origin"`
+	ID     string `json:"id"`
+}
+
+type AddProcessRequest struct {
+	Origin string     `json:"origin"`
+	Config app.Config `json:"config"`
+}
+
+type RemoveProcessRequest struct {
+	Origin string `json:"origin"`
+	ID     string `json:"id"`
+}
+
+type APIClient struct {
+	Address string
+	Client  *http.Client
+}
+
+func (c *APIClient) CoreAPIAddress() (string, error) {
+	data, err := c.call(http.MethodGet, "/core", "", nil)
+	if err != nil {
+		return "", err
+	}
+
+	var address string
+	err = json.Unmarshal(data, &address)
+	if err != nil {
+		return "", err
+	}
+
+	return address, nil
+}
+
+func (c *APIClient) Join(r JoinRequest) error {
+	data, err := json.Marshal(&r)
+	if err != nil {
+		return err
+	}
+
+	_, err = c.call(http.MethodPost, "/join", "application/json", bytes.NewReader(data))
+
+	return err
+}
+
+func (c *APIClient) Leave(r LeaveRequest) error {
+	data, err := json.Marshal(&r)
+	if err != nil {
+		return err
+	}
+
+	_, err = c.call(http.MethodPost, "/leave", "application/json", bytes.NewReader(data))
+
+	return err
+}
+
+func (c *APIClient) AddProcess(r AddProcessRequest) error {
+	data, err := json.Marshal(r)
+	if err != nil {
+		return err
+	}
+
+	_, err = c.call(http.MethodPost, "/process", "application/json", bytes.NewReader(data))
+
+	return err
+}
+
+func (c *APIClient) RemoveProcess(r RemoveProcessRequest) error {
+	data, err := json.Marshal(r)
+	if err != nil {
+		return err
+	}
+
+	_, err = c.call(http.MethodPost, "/process/"+r.ID, "application/json", bytes.NewReader(data))
+
+	return err
+}
+
+func (c *APIClient) Snapshot() (io.ReadCloser, error) {
+	return c.stream(http.MethodGet, "/snapshot", "", nil)
+}
+
+func (c *APIClient) stream(method, path, contentType string, data io.Reader) (io.ReadCloser, error) {
+	if len(c.Address) == 0 {
+		return nil, fmt.Errorf("no address defined")
+	}
+
+	address := "http://" + c.Address + "/v1" + path
+
+	req, err := http.NewRequest(method, address, data)
+	if err != nil {
+		return nil, err
+	}
+
+	if method == "POST" || method == "PUT" {
+		req.Header.Add("Content-Type", contentType)
+	}
+
+	status, body, err := c.request(req)
+	if err != nil {
+		return nil, err
+	}
+
+	if status < 200 || status >= 300 {
+		e := httpapi.Error{}
+
+		defer body.Close()
+
+		x, _ := io.ReadAll(body)
+
+		json.Unmarshal(x, &e)
+
+		return nil, e
+	}
+
+	return body, nil
+}
+
+func (c *APIClient) call(method, path, contentType string, data io.Reader) ([]byte, error) {
+	body, err := c.stream(method, path, contentType, data)
+	if err != nil {
+		return nil, err
+	}
+
+	defer body.Close()
+
+	x, _ := io.ReadAll(body)
+
+	return x, nil
+}
+
+func (c *APIClient) request(req *http.Request) (int, io.ReadCloser, error) {
+	if c.Client == nil {
+		tr := &http.Transport{
+			MaxIdleConns:    10,
+			IdleConnTimeout: 30 * time.Second,
+		}
+
+		c.Client = &http.Client{
+			Transport: tr,
+			Timeout:   5 * time.Second,
+		}
+	}
+
+	resp, err := c.Client.Do(req)
+	if err != nil {
+		return -1, nil, err
+	}
+
+	return resp.StatusCode, resp.Body, nil
+}
diff --git a/cluster/cluster.go b/cluster/cluster.go
index fbf11ba3..ecf3745d 100644
--- a/cluster/cluster.go
+++ b/cluster/cluster.go
@@ -5,7 +5,6 @@ import (
 	"context"
 	"encoding/gob"
 	"encoding/json"
-	"errors"
 	"fmt"
 	"io"
 	gonet "net"
@@ -16,6 +15,11 @@ import (
 	"sync"
 	"time"
 
+	apiclient "github.com/datarhei/core/v16/cluster/client"
+	"github.com/datarhei/core/v16/cluster/forwarder"
+	raftlogger "github.com/datarhei/core/v16/cluster/logger"
+	"github.com/datarhei/core/v16/cluster/proxy"
+	"github.com/datarhei/core/v16/cluster/store"
 	"github.com/datarhei/core/v16/log"
 	"github.com/datarhei/core/v16/net"
 	"github.com/datarhei/core/v16/restream/app"
@@ -46,8 +50,6 @@ import (
 ** all these endpoints will forward the request to the leader.
 */
 
-var ErrNodeNotFound = errors.New("node not found")
-
 type Cluster interface {
 	// Address returns the raft address of this node
 	Address() string
@@ -71,7 +73,7 @@ type Cluster interface {
 	AddProcess(origin string, config *app.Config) error
 	RemoveProcess(origin, id string) error
 
-	ProxyReader() ProxyReader
+	ProxyReader() proxy.ProxyReader
 }
 
 type Peer struct {
@@ -113,7 +115,7 @@ type cluster struct {
 	peers []Peer
 
-	store Store
+	store store.Store
 
 	reassertLeaderCh chan chan error
 	cancelLeaderShip context.CancelFunc
@@ -124,9 +126,9 @@ type cluster struct {
 	shutdownCh   chan struct{}
 	shutdownLock sync.Mutex
 
-	forwarder Forwarder
+	forwarder forwarder.Forwarder
 	api       API
-	proxy     Proxy
+	proxy     proxy.Proxy
 
 	coreAddress string
@@ -135,7 +137,7 @@ type cluster struct {
 	isLeader   bool
 	leaderLock sync.Mutex
 
-	nodes     map[string]Node
+	nodes     map[string]proxy.Node
 	nodesLock sync.RWMutex
 }
@@ -153,7 +155,7 @@ func New(config ClusterConfig) (Cluster, error) {
 		leaveCh:    make(chan struct{}),
 		shutdownCh: make(chan struct{}),
 
-		nodes: map[string]Node{},
+		nodes: map[string]proxy.Node{},
 	}
 
 	u, err := url.Parse(config.CoreAPIAddress)
@@ -173,7 +175,7 @@ func New(config ClusterConfig) (Cluster, error) {
 		c.logger = log.New("")
 	}
 
-	store, err := NewStore()
+	store, err := store.NewStore()
 	if err != nil {
 		return nil, err
 	}
@@ -195,7 +197,7 @@ func New(config ClusterConfig) (Cluster, error) {
 
 	c.api = api
 
-	proxy, err := NewProxy(ProxyConfig{
+	nodeproxy, err := proxy.NewProxy(proxy.ProxyConfig{
 		ID:        c.id,
 		Name:      c.name,
 		IPLimiter: config.IPLimiter,
@@ -206,15 +208,15 @@ func New(config ClusterConfig) (Cluster, error) {
 		return nil, err
 	}
 
-	go func(proxy Proxy) {
-		proxy.Start()
-	}(proxy)
+	go func(nodeproxy proxy.Proxy) {
+		nodeproxy.Start()
+	}(nodeproxy)
 
-	c.proxy = proxy
+	c.proxy = nodeproxy
 
 	go c.trackNodeChanges()
 
-	if forwarder, err := NewForwarder(ForwarderConfig{
+	if forwarder, err := forwarder.New(forwarder.ForwarderConfig{
 		ID:     c.id,
 		Logger: c.logger.WithField("logname", "forwarder"),
 	}); err != nil {
@@ -299,7 +301,7 @@ func (c *cluster) CoreAPIAddress(raftAddress string) (string, error) {
 		return "", err
 	}
 
-	client := APIClient{
+	client := apiclient.APIClient{
 		Address: addr,
 	}
 
@@ -494,7 +496,7 @@ func (c *cluster) Join(origin, id, raftAddress, peerAddress string) error {
 		return fmt.Errorf("peer API doesn't respond: %w", err)
 	}
 
-	node := NewNode(address)
+	node := proxy.NewNode(address)
 	err = node.Connect()
 	if err != nil {
 		return fmt.Errorf("couldn't connect to peer: %w", err)
@@ -640,7 +642,7 @@ func (c *cluster) trackNodeChanges() {
 				continue
 			}
 
-			node := NewNode(address)
+			node := proxy.NewNode(address)
 			err = node.Connect()
 			if err != nil {
 				c.logger.Warn().WithError(err).WithFields(log.Fields{
@@ -745,14 +747,14 @@ func (c *cluster) startRaft(fsm raft.FSM, bootstrap, recover bool, peers []Peer,
 
 	c.logger.Debug().Log("address: %s", addr)
 
-	transport, err := raft.NewTCPTransportWithLogger(c.raftAddress, addr, 3, 10*time.Second, NewLogger(c.logger, hclog.Debug).Named("raft-transport"))
+	transport, err := raft.NewTCPTransportWithLogger(c.raftAddress, addr, 3, 10*time.Second, raftlogger.New(c.logger, hclog.Debug).Named("raft-transport"))
 	if err != nil {
 		return err
 	}
 
 	c.raftTransport = transport
 
-	snapshotLogger := NewLogger(c.logger, hclog.Debug).Named("raft-snapshot")
+	snapshotLogger := raftlogger.New(c.logger, hclog.Debug).Named("raft-snapshot")
 	snapshots, err := raft.NewFileSnapshotStoreWithLogger(c.path, 3, snapshotLogger)
 	if err != nil {
 		return err
@@ -787,7 +789,7 @@ func (c *cluster) startRaft(fsm raft.FSM, bootstrap, recover bool, peers []Peer,
 	cfg := raft.DefaultConfig()
 	cfg.LocalID = raft.ServerID(c.id)
 
-	cfg.Logger = NewLogger(c.logger, hclog.Debug).Named("raft")
+	cfg.Logger = raftlogger.New(c.logger, hclog.Debug).Named("raft")
 
 	hasState, err := raft.HasExistingState(logStore, stableStore, snapshots)
 	if err != nil {
@@ -823,7 +825,7 @@ func (c *cluster) startRaft(fsm raft.FSM, bootstrap, recover bool, peers []Peer,
 		c.logger.Debug().Log("raft node bootstrapped")
 	} else {
 		// Recover cluster
-		fsm, err := NewStore()
+		fsm, err := store.NewStore()
 		if err != nil {
 			return err
 		}
@@ -919,9 +921,9 @@ func (c *cluster) AddProcess(origin string, config *app.Config) error {
 		return c.forwarder.AddProcess(origin, config)
 	}
 
-	cmd := &command{
-		Operation: opAddProcess,
-		Data: &addProcessCommand{
+	cmd := &store.Command{
+		Operation: store.OpAddProcess,
+		Data: &store.CommandAddProcess{
 			Config: *config,
 		},
 	}
@@ -934,9 +936,9 @@ func (c *cluster) RemoveProcess(origin, id string) error {
 		return c.forwarder.RemoveProcess(origin, id)
 	}
 
-	cmd := &command{
-		Operation: opRemoveProcess,
-		Data: &removeProcessCommand{
+	cmd := &store.Command{
+		Operation: store.OpRemoveProcess,
+		Data: &store.CommandRemoveProcess{
 			ID: id,
 		},
 	}
@@ -944,7 +946,7 @@ func (c *cluster) RemoveProcess(origin, id string) error {
 	return c.applyCommand(cmd)
 }
 
-func (c *cluster) applyCommand(cmd *command) error {
+func (c *cluster) applyCommand(cmd *store.Command) error {
 	b, err := json.Marshal(cmd)
 	if err != nil {
 		return err
 	}
@@ -1076,6 +1078,6 @@ func (c *cluster) sentinel() {
 	}
 }
 
-func (c *cluster) ProxyReader() ProxyReader {
+func (c *cluster) ProxyReader() proxy.ProxyReader {
 	return c.proxy.Reader()
 }
diff --git a/cluster/forwarder.go b/cluster/forwarder/forwarder.go
similarity index 88%
rename from cluster/forwarder.go
rename to cluster/forwarder/forwarder.go
index 45098c55..e93bf81e 100644
--- a/cluster/forwarder.go
+++ b/cluster/forwarder/forwarder.go
@@ -1,4 +1,4 @@
-package cluster
+package forwarder
 
 import (
 	"fmt"
@@ -7,6 +7,7 @@ import (
 	"sync"
 	"time"
 
+	apiclient "github.com/datarhei/core/v16/cluster/client"
 	"github.com/datarhei/core/v16/log"
 	"github.com/datarhei/core/v16/restream/app"
 )
@@ -27,7 +28,7 @@ type forwarder struct {
 	id   string
 	lock sync.RWMutex
 
-	client APIClient
+	client apiclient.APIClient
 
 	logger log.Logger
 }
@@ -37,7 +38,7 @@ type ForwarderConfig struct {
 	Logger log.Logger
 }
 
-func NewForwarder(config ForwarderConfig) (Forwarder, error) {
+func New(config ForwarderConfig) (Forwarder, error) {
 	f := &forwarder{
 		id:     config.ID,
 		logger: config.Logger,
@@ -57,7 +58,7 @@ func NewForwarder(config ForwarderConfig) (Forwarder, error) {
 		Timeout: 5 * time.Second,
 	}
 
-	f.client = APIClient{
+	f.client = apiclient.APIClient{
 		Client: client,
 	}
 
@@ -86,7 +87,7 @@ func (f *forwarder) Join(origin, id, raftAddress, peerAddress string) error {
 		origin = f.id
 	}
 
-	r := JoinRequest{
+	r := apiclient.JoinRequest{
 		Origin:      origin,
 		ID:          id,
 		RaftAddress: raftAddress,
@@ -99,7 +100,7 @@ func (f *forwarder) Join(origin, id, raftAddress, peerAddress string) error {
 	f.lock.RUnlock()
 
 	if len(peerAddress) != 0 {
-		client = APIClient{
+		client = apiclient.APIClient{
 			Address: peerAddress,
 			Client:  f.client.Client,
 		}
@@ -113,7 +114,7 @@ func (f *forwarder) Leave(origin, id string) error {
 		origin = f.id
 	}
 
-	r := LeaveRequest{
+	r := apiclient.LeaveRequest{
 		Origin: origin,
 		ID:     id,
 	}
@@ -140,7 +141,7 @@ func (f *forwarder) AddProcess(origin string, config *app.Config) error {
 		origin = f.id
 	}
 
-	r := AddProcessRequest{
+	r := apiclient.AddProcessRequest{
 		Origin: origin,
 		Config: *config,
 	}
@@ -161,7 +162,7 @@ func (f *forwarder) RemoveProcess(origin, id string) error {
 		origin = f.id
 	}
 
-	r := RemoveProcessRequest{
+	r := apiclient.RemoveProcessRequest{
 		Origin: origin,
 		ID:     id,
 	}
diff --git a/cluster/leader.go b/cluster/leader.go
index bd2dea32..5db6a0ff 100644
--- a/cluster/leader.go
+++ b/cluster/leader.go
@@ -8,6 +8,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/datarhei/core/v16/cluster/proxy"
 	"github.com/datarhei/core/v16/log"
 	"github.com/datarhei/core/v16/restream/app"
 )
@@ -471,7 +472,7 @@ func (c *cluster) doRebalance() {
 }
 
 // normalizeProcessesAndResources normalizes the CPU and memory consumption of the processes and resources in-place.
-func normalizeProcessesAndResources(processes []ProcessConfig, resources map[string]NodeResources) {
+func normalizeProcessesAndResources(processes []proxy.ProcessConfig, resources map[string]proxy.NodeResources) {
 	maxNCPU := .0
 	maxMemTotal := .0
 
@@ -520,7 +521,7 @@ func normalizeProcessesAndResources(processes []ProcessConfig, resources map[str
 
 // synchronize returns a list of operations in order to adjust the "have" list to the "want" list
 // with taking the available resources on each node into account.
-func synchronize(want []app.Config, have []ProcessConfig, resources map[string]NodeResources) []interface{} {
+func synchronize(want []app.Config, have []proxy.ProcessConfig, resources map[string]proxy.NodeResources) []interface{} {
 	normalizeProcessesAndResources(have, resources)
 
 	// A map from the process ID to the process config of the processes
@@ -535,7 +536,7 @@ func synchronize(want []app.Config, have []ProcessConfig, resources map[string]N
 	// Now we iterate through the processes we actually have running on the nodes
 	// and remove them from the wantMap. We also make sure that they are running.
 	// If a process is not on the wantMap, it will be deleted from the nodes.
-	haveAfterRemove := []ProcessConfig{}
+	haveAfterRemove := []proxy.ProcessConfig{}
 
 	for _, p := range have {
 		if _, ok := wantMap[p.Config.ID]; !ok {
@@ -664,7 +665,7 @@ type referenceAffinityNodeCount struct {
 	count  uint64
 }
 
-func createReferenceAffinityMap(processes []ProcessConfig) map[string][]referenceAffinityNodeCount {
+func createReferenceAffinityMap(processes []proxy.ProcessConfig) map[string][]referenceAffinityNodeCount {
 	referenceAffinityMap := map[string][]referenceAffinityNodeCount{}
 	for _, p := range processes {
 		if len(p.Config.Reference) == 0 {
@@ -709,11 +710,11 @@ func createReferenceAffinityMap(processes []ProcessConfig) map[string][]referenc
 
 // rebalance returns a list of operations that will move running processes away from nodes
 // that are overloaded.
-func rebalance(have []ProcessConfig, resources map[string]NodeResources) []interface{} {
+func rebalance(have []proxy.ProcessConfig, resources map[string]proxy.NodeResources) []interface{} {
 	normalizeProcessesAndResources(have, resources)
 
 	// Group the processes by node
-	processNodeMap := map[string][]ProcessConfig{}
+	processNodeMap := map[string][]proxy.ProcessConfig{}
 
 	for _, p := range have {
 		processNodeMap[p.NodeID] = append(processNodeMap[p.NodeID], p)
diff --git a/cluster/leader_test.go b/cluster/leader_test.go
index c371af2e..7add6ae8 100644
--- a/cluster/leader_test.go
+++ b/cluster/leader_test.go
@@ -3,13 +3,14 @@ package cluster
 import (
 	"testing"
 
+	"github.com/datarhei/core/v16/cluster/proxy"
 	"github.com/datarhei/core/v16/restream/app"
 	"github.com/stretchr/testify/require"
 )
 
 func TestNormalize(t *testing.T) {
-	have := []ProcessConfig{
+	have := []proxy.ProcessConfig{
 		{
 			NodeID: "node2",
 			Order:  "start",
@@ -23,7 +24,7 @@ func TestNormalize(t *testing.T) {
 		},
 	}
 
-	resources := map[string]NodeResources{
+	resources := map[string]proxy.NodeResources{
 		"node1": {
 			NCPU: 2,
 			CPU:  7,
@@ -40,7 +41,7 @@ func TestNormalize(t *testing.T) {
 
 	normalizeProcessesAndResources(have, resources)
 
-	require.Equal(t, []ProcessConfig{
+	require.Equal(t, []proxy.ProcessConfig{
 		{
 			NodeID: "node2",
 			Order:  "start",
@@ -54,7 +55,7 @@ func TestNormalize(t *testing.T) {
 		},
 	}, have)
 
-	require.Equal(t, map[string]NodeResources{
+	require.Equal(t, map[string]proxy.NodeResources{
 		"node1": {
 			NCPU: 2,
 			CPU:  7,
@@ -72,7 +73,7 @@ func TestNormalize(t *testing.T) {
 	// test idempotency
 	normalizeProcessesAndResources(have, resources)
 
-	require.Equal(t, []ProcessConfig{
+	require.Equal(t, []proxy.ProcessConfig{
 		{
 			NodeID: "node2",
 			Order:  "start",
@@ -86,7 +87,7 @@ func TestNormalize(t *testing.T) {
 		},
 	}, have)
 
-	require.Equal(t, map[string]NodeResources{
+	require.Equal(t, map[string]proxy.NodeResources{
 		"node1": {
 			NCPU: 2,
 			CPU:  7,
@@ -111,9 +112,9 @@ func TestSynchronizeAdd(t *testing.T) {
 		},
 	}
 
-	have := []ProcessConfig{}
+	have := []proxy.ProcessConfig{}
 
-	resources := map[string]NodeResources{
+	resources := map[string]proxy.NodeResources{
 		"node1": {
 			NCPU: 1,
 			CPU:  7,
@@ -145,7 +146,7 @@ func TestSynchronizeAdd(t *testing.T) {
 		},
 	}, stack)
 
-	require.Equal(t, map[string]NodeResources{
+	require.Equal(t, map[string]proxy.NodeResources{
 		"node1": {
 			NCPU: 1,
 			CPU:  17,
@@ -181,7 +182,7 @@ func TestSynchronizeAddReferenceAffinity(t *testing.T) {
 		},
 	}
 
-	have := []ProcessConfig{
+	have := []proxy.ProcessConfig{
 		{
 			NodeID: "node2",
 			Order:  "start",
@@ -196,7 +197,7 @@ func TestSynchronizeAddReferenceAffinity(t *testing.T) {
 		},
 	}
 
-	resources := map[string]NodeResources{
+	resources := map[string]proxy.NodeResources{
 		"node1": {
 			NCPU: 1,
 			CPU:  1,
@@ -239,9 +240,9 @@ func TestSynchronizeAddLimit(t *testing.T) {
 		},
 	}
 
-	have := []ProcessConfig{}
+	have := []proxy.ProcessConfig{}
 
-	resources := map[string]NodeResources{
+	resources := map[string]proxy.NodeResources{
 		"node1": {
 			NCPU: 1,
 			CPU:  81,
@@ -273,7 +274,7 @@ func TestSynchronizeAddLimit(t *testing.T) {
 		},
 	}, stack)
 
-	require.Equal(t, map[string]NodeResources{
+	require.Equal(t, map[string]proxy.NodeResources{
 		"node1": {
 			NCPU: 1,
 			CPU:  81,
@@ -302,9 +303,9 @@ func TestSynchronizeAddNoResourcesCPU(t *testing.T) {
 		},
 	}
 
-	have := []ProcessConfig{}
+	have := []proxy.ProcessConfig{}
 
-	resources := map[string]NodeResources{
+	resources := map[string]proxy.NodeResources{
 		"node1": {
 			NCPU: 1,
 			CPU:  81,
@@ -342,9 +343,9 @@ func TestSynchronizeAddNoResourcesMemory(t *testing.T) {
 		},
 	}
 
-	have := []ProcessConfig{}
+	have := []proxy.ProcessConfig{}
 
-	resources := map[string]NodeResources{
+	resources := map[string]proxy.NodeResources{
 		"node1": {
 			NCPU: 1,
 			CPU:  81,
@@ -380,9 +381,9 @@ func TestSynchronizeAddNoLimits(t *testing.T) {
 		},
 	}
 
-	have := []ProcessConfig{}
+	have := []proxy.ProcessConfig{}
 
-	resources := map[string]NodeResources{
+	resources := map[string]proxy.NodeResources{
 		"node1": {
 			NCPU: 1,
 			CPU:  81,
@@ -414,7 +415,7 @@ func TestSynchronizeAddNoLimits(t *testing.T) {
 func TestSynchronizeRemove(t *testing.T) {
 	want := []app.Config{}
 
-	have := []ProcessConfig{
+	have := []proxy.ProcessConfig{
 		{
 			NodeID: "node2",
 			Order:  "start",
@@ -428,7 +429,7 @@ func TestSynchronizeRemove(t *testing.T) {
 		},
 	}
 
-	resources := map[string]NodeResources{
+	resources := map[string]proxy.NodeResources{
 		"node1": {
 			NCPU: 1,
 			CPU:  7,
@@ -456,7 +457,7 @@ func TestSynchronizeRemove(t *testing.T) {
 		},
 	}, stack)
 
-	require.Equal(t, map[string]NodeResources{
+	require.Equal(t, map[string]proxy.NodeResources{
 		"node1": {
 			NCPU: 1,
 			CPU:  7,
@@ -485,7 +486,7 @@ func TestSynchronizeAddRemove(t *testing.T) {
 		},
 	}
 
-	have := []ProcessConfig{
+	have := []proxy.ProcessConfig{
 		{
 			NodeID: "node2",
 			Order:  "start",
@@ -499,7 +500,7 @@ func TestSynchronizeAddRemove(t *testing.T) {
 		},
 	}
 
-	resources := map[string]NodeResources{
+	resources := map[string]proxy.NodeResources{
 		"node1": {
 			NCPU: 1,
 			CPU:  7,
@@ -535,7 +536,7 @@ func TestSynchronizeAddRemove(t *testing.T) {
 		},
 	}, stack)
 
-	require.Equal(t, map[string]NodeResources{
+	require.Equal(t, map[string]proxy.NodeResources{
 		"node1": {
 			NCPU: 1,
 			CPU:  17,
@@ -556,7 +557,7 @@ func TestSynchronizeAddRemove(t *testing.T) {
 }
 
 func TestRebalanceNothingToDo(t *testing.T) {
-	processes := []ProcessConfig{
+	processes := []proxy.ProcessConfig{
 		{
 			NodeID: "node1",
 			Order:  "start",
@@ -581,7 +582,7 @@ func TestRebalanceNothingToDo(t *testing.T) {
 		},
 	}
 
-	resources := map[string]NodeResources{
+	resources := map[string]proxy.NodeResources{
 		"node1": {
 			NCPU: 1,
 			CPU:  42,
@@ -606,7 +607,7 @@ func TestRebalanceNothingToDo(t *testing.T) {
 }
 
 func TestRebalanceOverload(t *testing.T) {
-	processes := []ProcessConfig{
+	processes := []proxy.ProcessConfig{
 		{
 			NodeID: "node1",
 			Order:  "start",
@@ -642,7 +643,7 @@ func TestRebalanceOverload(t *testing.T) {
 		},
 	}
 
-	resources := map[string]NodeResources{
+	resources := map[string]proxy.NodeResources{
 		"node1": {
 			NCPU: 1,
 			CPU:  91,
@@ -675,7 +676,7 @@ func TestRebalanceOverload(t *testing.T) {
 		},
 	}, opStack)
 
-	require.Equal(t, map[string]NodeResources{
+	require.Equal(t, map[string]proxy.NodeResources{
 		"node1": {
 			NCPU: 1,
 			CPU:  74,
@@ -696,7 +697,7 @@ func TestRebalanceOverload(t *testing.T) {
 }
 
 func TestRebalanceSkip(t *testing.T) {
-	processes := []ProcessConfig{
+	processes := []proxy.ProcessConfig{
 		{
 			NodeID: "node1",
 			Order:  "start",
@@ -732,7 +733,7 @@ func TestRebalanceSkip(t *testing.T) {
 		},
 	}
 
-	resources := map[string]NodeResources{
+	resources := map[string]proxy.NodeResources{
 		"node1": {
 			NCPU: 1,
 			CPU:  91,
@@ -773,7 +774,7 @@ func TestRebalanceSkip(t *testing.T) {
 		},
 	}, opStack)
 
-	require.Equal(t, map[string]NodeResources{
+	require.Equal(t, map[string]proxy.NodeResources{
 		"node1": {
 			NCPU: 1,
 			CPU:  91,
@@ -794,7 +795,7 @@ func TestRebalanceSkip(t *testing.T) {
 }
 
 func TestRebalanceReferenceAffinity(t *testing.T) {
-	processes := []ProcessConfig{
+	processes := []proxy.ProcessConfig{
 		{
 			NodeID: "node1",
 			Order:  "start",
@@ -856,7 +857,7 @@ func TestRebalanceReferenceAffinity(t *testing.T) {
 		},
 	}
 
-	resources := map[string]NodeResources{
+	resources := map[string]proxy.NodeResources{
 		"node1": {
 			NCPU: 1,
 			CPU:  90,
@@ -898,7 +899,7 @@ func TestRebalanceReferenceAffinity(t *testing.T) {
 		},
 	}, opStack)
 
-	require.Equal(t, map[string]NodeResources{
+	require.Equal(t, map[string]proxy.NodeResources{
 		"node1": {
 			NCPU: 1,
 			CPU:  89,
@@ -927,7 +928,7 @@ func TestRebalanceReferenceAffinity(t *testing.T) {
 }
 
 func TestCreateReferenceAffinityNodeMap(t *testing.T) {
-	processes := []ProcessConfig{
+	processes := []proxy.ProcessConfig{
 		{
 			NodeID: "node1",
 			Order:  "start",
diff --git a/cluster/logger.go b/cluster/logger/logger.go
similarity index 97%
rename from cluster/logger.go
rename to cluster/logger/logger.go
index 4f5f03b4..3e24f07b 100644
--- a/cluster/logger.go
+++ b/cluster/logger/logger.go
@@ -1,4 +1,4 @@
-package cluster
+package logger
 
 import (
 	"io"
@@ -19,7 +19,7 @@ type hclogger struct {
 	args   []interface{}
 }
 
-func NewLogger(logger log.Logger, lvl hclog.Level) hclog.Logger {
+func New(logger log.Logger, lvl hclog.Level) hclog.Logger {
 	return &hclogger{
 		logger: logger,
 		level:  lvl,
diff --git a/cluster/node.go b/cluster/proxy/node.go
similarity index 92%
rename from cluster/node.go
rename to cluster/proxy/node.go
index 12703327..b0993428 100644
--- a/cluster/node.go
+++ b/cluster/proxy/node.go
@@ -1,4 +1,4 @@
-package cluster
+package proxy
 
 import (
 	"context"
@@ -375,6 +375,13 @@ func (n *node) IPs() []string {
 }
 
 func (n *node) ID() string {
+	n.peerLock.RLock()
+	defer n.peerLock.RUnlock()
+
+	if n.peer == nil {
+		return ""
+	}
+
 	return n.peer.ID()
 }
 
@@ -383,7 +390,7 @@ func (n *node) Files() NodeFiles {
 	defer n.stateLock.RUnlock()
 
 	state := NodeFiles{
-		ID:         n.peer.ID(),
+		ID:         n.ID(),
 		LastUpdate: n.lastUpdate,
 	}
 
@@ -400,7 +407,7 @@ func (n *node) State() NodeState {
 	defer n.stateLock.RUnlock()
 
 	state := NodeState{
-		ID:          n.peer.ID(),
+		ID:          n.ID(),
 		LastContact: n.lastContact,
 		State:       n.state.String(),
 		Latency:     time.Duration(n.latency * float64(time.Second)),
@@ -445,6 +452,10 @@ func (n *node) files() {
 		n.peerLock.RLock()
 		defer n.peerLock.RUnlock()
 
+		if n.peer == nil {
+			return
+		}
+
 		files, err := n.peer.MemFSList("name", "asc")
 		if err != nil {
 			return
@@ -461,6 +472,10 @@ func (n *node) files() {
 		n.peerLock.RLock()
 		defer n.peerLock.RUnlock()
 
+		if n.peer == nil {
+			return
+		}
+
 		files, err := n.peer.DiskFSList("name", "asc")
 		if err != nil {
 			return
@@ -480,6 +495,10 @@ func (n *node) files() {
 		n.peerLock.RLock()
 		defer n.peerLock.RUnlock()
 
+		if n.peer == nil {
+			return
+		}
+
 		files, err := n.peer.RTMPChannels()
 		if err != nil {
 			return
@@ -500,6 +519,10 @@ func (n *node) files() {
 		n.peerLock.RLock()
 		defer n.peerLock.RUnlock()
 
+		if n.peer == nil {
+			return
+		}
+
 		files, err := n.peer.SRTChannels()
 		if err != nil {
 			return
@@ -570,6 +593,10 @@ func (n *node) GetFile(path string) (io.ReadCloser, error) {
 	n.peerLock.RLock()
 	defer n.peerLock.RUnlock()
 
+	if n.peer == nil {
+		return nil, fmt.Errorf("not connected")
+	}
+
 	if prefix == "mem" {
 		return n.peer.MemFSGetFile(path)
 	} else if prefix == "disk" {
@@ -580,6 +607,13 @@ func (n *node) GetFile(path string) (io.ReadCloser, error) {
 }
 
 func (n *node) ProcessList() ([]ProcessConfig, error) {
+	n.peerLock.RLock()
+	defer n.peerLock.RUnlock()
+
+	if n.peer == nil {
+		return nil, fmt.Errorf("not connected")
+	}
+
 	list, err := n.peer.ProcessList(nil, []string{
 		"state",
 		"config",
@@ -613,6 +647,13 @@ func (n *node) ProcessList() ([]ProcessConfig, error) {
 }
 
 func (n *node) ProcessAdd(config *app.Config) error {
+	n.peerLock.RLock()
+	defer n.peerLock.RUnlock()
+
+	if n.peer == nil {
+		return fmt.Errorf("not connected")
+	}
+
 	cfg := httpapi.ProcessConfig{}
 	cfg.Unmarshal(config)
 
@@ -620,13 +661,34 @@ func (n *node) ProcessAdd(config *app.Config) error {
 }
 
 func (n *node) ProcessStart(id string) error {
+	n.peerLock.RLock()
+	defer n.peerLock.RUnlock()
+
+	if n.peer == nil {
+		return fmt.Errorf("not connected")
+	}
+
 	return n.peer.ProcessCommand(id, "start")
 }
 
 func (n *node) ProcessStop(id string) error {
+	n.peerLock.RLock()
+	defer n.peerLock.RUnlock()
+
+	if n.peer == nil {
+		return fmt.Errorf("not connected")
+	}
+
 	return n.peer.ProcessCommand(id, "stop")
 }
 
 func (n *node) ProcessDelete(id string) error {
+	n.peerLock.RLock()
+	defer n.peerLock.RUnlock()
+
+	if n.peer == nil {
+		return fmt.Errorf("not connected")
+	}
+
 	return n.peer.ProcessDelete(id)
 }
diff --git a/cluster/proxy.go b/cluster/proxy/proxy.go
similarity index 99%
rename from cluster/proxy.go
rename to cluster/proxy/proxy.go
index ced4b1f1..cd2cad59 100644
--- a/cluster/proxy.go
+++ b/cluster/proxy/proxy.go
@@ -1,7 +1,8 @@
-package cluster
+package proxy
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"io"
 	"sync"
@@ -156,6 +157,8 @@ type proxy struct {
 	logger log.Logger
 }
 
+var ErrNodeNotFound = errors.New("node not found")
+
 func NewProxy(config ProxyConfig) (Proxy, error) {
 	p := &proxy{
 		id: config.ID,
diff --git a/cluster/store.go b/cluster/store/store.go
similarity index 86%
rename from cluster/store.go
rename to cluster/store/store.go
index 04f615a6..9266ef95 100644
--- a/cluster/store.go
+++ b/cluster/store/store.go
@@ -1,4 +1,4 @@
-package cluster
+package store
 
 import (
 	"encoding/json"
@@ -21,25 +21,20 @@ type Store interface {
 type operation string
 
 const (
-	opAddProcess    operation = "addProcess"
-	opRemoveProcess operation = "removeProcess"
+	OpAddProcess    operation = "addProcess"
+	OpRemoveProcess operation = "removeProcess"
 )
 
-type command struct {
+type Command struct {
 	Operation operation
 	Data      interface{}
 }
 
-type StoreNode struct {
-	ID      string
-	Address string
-}
-
-type addProcessCommand struct {
+type CommandAddProcess struct {
 	app.Config
 }
 
-type removeProcessCommand struct {
+type CommandRemoveProcess struct {
 	ID string
 }
 
@@ -58,7 +53,7 @@ func NewStore() (Store, error) {
 func (s *store) Apply(log *raft.Log) interface{} {
 	fmt.Printf("a log entry came in (index=%d, term=%d): %s\n", log.Index, log.Term, string(log.Data))
 
-	c := command{}
+	c := Command{}
 
 	err := json.Unmarshal(log.Data, &c)
 	if err != nil {
@@ -70,17 +65,17 @@ func (s *store) Apply(log *raft.Log) interface{} {
 	fmt.Printf("op: %+v\n", c)
 
 	switch c.Operation {
-	case opAddProcess:
+	case OpAddProcess:
 		b, _ := json.Marshal(c.Data)
-		cmd := addProcessCommand{}
+		cmd := CommandAddProcess{}
 		json.Unmarshal(b, &cmd)
 
 		s.lock.Lock()
 		s.Process[cmd.ID] = cmd.Config
 		s.lock.Unlock()
-	case opRemoveProcess:
+	case OpRemoveProcess:
 		b, _ := json.Marshal(c.Data)
-		cmd := removeProcessCommand{}
+		cmd := CommandRemoveProcess{}
 		json.Unmarshal(b, &cmd)
 
 		s.lock.Lock()
diff --git a/http/fs/cluster.go b/http/fs/cluster.go
index bb79cb21..2d1b895b 100644
--- a/http/fs/cluster.go
+++ b/http/fs/cluster.go
@@ -5,7 +5,7 @@ import (
 	gofs "io/fs"
 	"time"
 
-	"github.com/datarhei/core/v16/cluster"
+	"github.com/datarhei/core/v16/cluster/proxy"
 	"github.com/datarhei/core/v16/io/fs"
 )
 
@@ -17,10 +17,10 @@ type filesystem struct {
 	fs.Filesystem
 
 	name string
-	proxy cluster.ProxyReader
+	proxy proxy.ProxyReader
 }
 
-func NewClusterFS(name string, fs fs.Filesystem, proxy cluster.ProxyReader) Filesystem {
+func NewClusterFS(name string, fs fs.Filesystem, proxy proxy.ProxyReader) Filesystem {
 	if proxy == nil {
 		return fs
 	}
diff --git a/http/handler/api/cluster.go b/http/handler/api/cluster.go
index 60198ddb..0794c08b 100644
--- a/http/handler/api/cluster.go
+++ b/http/handler/api/cluster.go
@@ -6,6 +6,7 @@ import (
 	"strings"
 
 	"github.com/datarhei/core/v16/cluster"
+	"github.com/datarhei/core/v16/cluster/proxy"
 	"github.com/datarhei/core/v16/http/api"
 	"github.com/datarhei/core/v16/http/handler/util"
 	"github.com/labstack/echo/v4"
@@ -15,7 +16,7 @@ import (
 // The ClusterHandler type provides handler functions for manipulating the cluster config.
 type ClusterHandler struct {
 	cluster cluster.Cluster
-	proxy   cluster.ProxyReader
+	proxy   proxy.ProxyReader
 }
 
 // NewCluster return a new ClusterHandler type. You have to provide a cluster.
diff --git a/rtmp/rtmp.go b/rtmp/rtmp.go
index a3e0bc38..ea56e49c 100644
--- a/rtmp/rtmp.go
+++ b/rtmp/rtmp.go
@@ -11,7 +11,7 @@ import (
 	"sync"
 	"time"
 
-	"github.com/datarhei/core/v16/cluster"
+	"github.com/datarhei/core/v16/cluster/proxy"
 	"github.com/datarhei/core/v16/log"
 	"github.com/datarhei/core/v16/session"
 
@@ -58,7 +58,7 @@ type Config struct {
 	// with methods like tls.Config.SetSessionTicketKeys.
 	TLSConfig *tls.Config
 
-	Proxy cluster.ProxyReader
+	Proxy proxy.ProxyReader
 }
 
 // Server represents a RTMP server
@@ -93,7 +93,7 @@ type server struct {
 	channels map[string]*channel
 	lock     sync.RWMutex
 
-	proxy cluster.ProxyReader
+	proxy proxy.ProxyReader
 }
 
 // New creates a new RTMP server according to the given config
@@ -119,7 +119,7 @@ func New(config Config) (Server, error) {
 	}
 
 	if s.proxy == nil {
-		s.proxy = cluster.NewNullProxyReader()
+		s.proxy = proxy.NewNullProxyReader()
 	}
 
 	s.server = &rtmp.Server{
diff --git a/srt/srt.go b/srt/srt.go
index 6d8196cc..208b9b29 100644
--- a/srt/srt.go
+++ b/srt/srt.go
@@ -10,7 +10,7 @@ import (
 	"sync"
 	"time"
 
-	"github.com/datarhei/core/v16/cluster"
+	"github.com/datarhei/core/v16/cluster/proxy"
 	"github.com/datarhei/core/v16/log"
 	"github.com/datarhei/core/v16/session"
 	srt "github.com/datarhei/gosrt"
@@ -40,7 +40,7 @@ type Config struct {
 
 	SRTLogTopics []string
 
-	Proxy cluster.ProxyReader
+	Proxy proxy.ProxyReader
 }
 
 // Server represents a SRT server
@@ -77,7 +77,7 @@ type server struct {
 	srtlog     map[string]*ring.Ring // Per logtopic a dedicated ring buffer
 	srtlogLock sync.RWMutex
 
-	proxy cluster.ProxyReader
+	proxy proxy.ProxyReader
 }
 
 func New(config Config) (Server, error) {
@@ -95,7 +95,7 @@ func New(config Config) (Server, error) {
 	}
 
 	if s.proxy == nil {
-		s.proxy = cluster.NewNullProxyReader()
+		s.proxy = proxy.NewNullProxyReader()
 	}
 
 	if s.logger == nil {
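
Usage note (not part of the patch): a minimal sketch of how a node might call a peer's cluster API through the relocated cluster/client package introduced above. The peer address, node IDs, raft address, and HTTP client settings are illustrative assumptions only; the types and methods (APIClient, CoreAPIAddress, Join, JoinRequest) are the ones defined in cluster/client/client.go in this diff.

package main

import (
	"fmt"
	"net/http"
	"time"

	apiclient "github.com/datarhei/core/v16/cluster/client"
)

func main() {
	// Hypothetical peer address; APIClient lazily creates a default
	// http.Client in request() if none is set, so Client is optional.
	peer := apiclient.APIClient{
		Address: "10.0.0.2:8000",
		Client:  &http.Client{Timeout: 5 * time.Second},
	}

	// Ask the peer's cluster API where its core API listens.
	coreAddr, err := peer.CoreAPIAddress()
	if err != nil {
		fmt.Println("peer not reachable:", err)
		return
	}
	fmt.Println("peer core API:", coreAddr)

	// Request to join the raft cluster; Origin, ID, and RaftAddress are made-up values.
	err = peer.Join(apiclient.JoinRequest{
		Origin:      "node1",
		ID:          "node1",
		RaftAddress: "10.0.0.1:8090",
	})
	if err != nil {
		fmt.Println("join failed:", err)
	}
}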