diff --git a/cluster/api.go b/cluster/api.go index 576bbac3..12ae14b3 100644 --- a/cluster/api.go +++ b/cluster/api.go @@ -72,7 +72,7 @@ func NewAPI(config APIConfig) (API, error) { a.router.Use(middleware.RecoverWithConfig(middleware.RecoverConfig{ LogErrorFunc: func(c echo.Context, err error, stack []byte) error { rows := strings.Split(string(stack), "\n") - a.logger.Error().WithField("stack", rows).Log("recovered from a panic") + a.logger.Error().WithField("stack", rows).Log("Recovered from a panic") return nil }, })) @@ -85,7 +85,10 @@ func NewAPI(config APIConfig) (API, error) { return httpapi.Err(http.StatusBadRequest, "Invalid JSON", "%s", err) } - a.logger.Debug().WithField("id", r.ID).Log("got join request: %+v", r) + a.logger.Debug().WithFields(log.Fields{ + "id": r.ID, + "request": r, + }).Log("Join request: %+v", r) if r.Origin == a.id { return httpapi.Err(http.StatusLoopDetected, "", "breaking circuit") @@ -93,7 +96,7 @@ func NewAPI(config APIConfig) (API, error) { err := a.cluster.Join(r.Origin, r.ID, r.RaftAddress, "") if err != nil { - a.logger.Debug().WithError(err).WithField("id", r.ID).Log("unable to join cluster") + a.logger.Debug().WithError(err).WithField("id", r.ID).Log("Unable to join cluster") return httpapi.Err(http.StatusInternalServerError, "unable to join cluster", "%s", err) } @@ -107,7 +110,10 @@ func NewAPI(config APIConfig) (API, error) { return httpapi.Err(http.StatusBadRequest, "Invalid JSON", "%s", err) } - a.logger.Debug().WithField("id", r.ID).Log("got leave request: %+v", r) + a.logger.Debug().WithFields(log.Fields{ + "id": r.ID, + "request": r, + }).Log("Leave request: %+v", r) if r.Origin == a.id { return httpapi.Err(http.StatusLoopDetected, "", "breaking circuit") @@ -115,7 +121,7 @@ func NewAPI(config APIConfig) (API, error) { err := a.cluster.Leave(r.Origin, r.ID) if err != nil { - a.logger.Debug().WithError(err).WithField("id", r.ID).Log("unable to leave cluster") + a.logger.Debug().WithError(err).WithField("id", r.ID).Log("Unable to leave cluster") return httpapi.Err(http.StatusInternalServerError, "unable to leave cluster", "%s", err) } @@ -125,7 +131,7 @@ func NewAPI(config APIConfig) (API, error) { a.router.GET("/v1/snaphot", func(c echo.Context) error { data, err := a.cluster.Snapshot() if err != nil { - a.logger.Debug().WithError(err).Log("unable to create snaphot") + a.logger.Debug().WithError(err).Log("Unable to create snapshot") return httpapi.Err(http.StatusInternalServerError, "unable to create snapshot", "%s", err) } @@ -141,7 +147,7 @@ func NewAPI(config APIConfig) (API, error) { return httpapi.Err(http.StatusBadRequest, "Invalid JSON", "%s", err) } - a.logger.Debug().WithField("id", r.Config.ID).Log("got add process request") + a.logger.Debug().WithField("id", r.Config.ID).Log("Add process request") if r.Origin == a.id { return httpapi.Err(http.StatusLoopDetected, "", "breaking circuit") @@ -149,7 +155,7 @@ func NewAPI(config APIConfig) (API, error) { err := a.cluster.AddProcess(r.Origin, &r.Config) if err != nil { - a.logger.Debug().WithError(err).WithField("id", r.Config.ID).Log("unable to add process") + a.logger.Debug().WithError(err).WithField("id", r.Config.ID).Log("Unable to add process") return httpapi.Err(http.StatusInternalServerError, "unable to add process", "%s", err) } @@ -163,7 +169,7 @@ func NewAPI(config APIConfig) (API, error) { return httpapi.Err(http.StatusBadRequest, "Invalid JSON", "%s", err) } - a.logger.Debug().WithField("id", r.ID).Log("got remove process request") + a.logger.Debug().WithField("id", 
r.ID).Log("Remove process request") if r.Origin == a.id { return httpapi.Err(http.StatusLoopDetected, "", "breaking circuit") @@ -171,7 +177,7 @@ func NewAPI(config APIConfig) (API, error) { err := a.cluster.RemoveProcess(r.Origin, r.ID) if err != nil { - a.logger.Debug().WithError(err).WithField("id", r.ID).Log("unable to remove process") + a.logger.Debug().WithError(err).WithField("id", r.ID).Log("Unable to remove process") return httpapi.Err(http.StatusInternalServerError, "unable to remove process", "%s", err) } @@ -187,10 +193,11 @@ func NewAPI(config APIConfig) (API, error) { } func (a *api) Start() error { - a.logger.Debug().Log("starting api at %s", a.address) + a.logger.Debug().WithField("address", a.address).Log("Starting API") return a.router.Start(a.address) } func (a *api) Shutdown(ctx context.Context) error { + a.logger.Debug().WithField("address", a.address).Log("Shutting down API") return a.router.Shutdown(ctx) } diff --git a/cluster/cluster.go b/cluster/cluster.go index 446cb97a..f213c39f 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -158,7 +158,9 @@ func New(config ClusterConfig) (Cluster, error) { c.logger = log.New("") } - store, err := store.NewStore() + store, err := store.NewStore(store.Config{ + Logger: c.logger.WithField("logname", "fsm"), + }) if err != nil { return nil, err } @@ -207,7 +209,7 @@ func New(config ClusterConfig) (Cluster, error) { c.forwarder = forwarder } - c.logger.Debug().Log("starting raft") + c.logger.Debug().Log("Starting raft") peers := []raft.Peer{} @@ -232,7 +234,7 @@ func New(config ClusterConfig) (Cluster, error) { Store: store, LeadershipNotifyCh: c.raftNotifyCh, LeaderObservationCh: c.raftLeaderObservationCh, - Logger: c.logger.WithComponent("raft"), + Logger: c.logger.WithComponent("Raft"), }) if err != nil { c.Shutdown() @@ -260,7 +262,7 @@ func New(config ClusterConfig) (Cluster, error) { case <-ticker.C: err := c.Join("", c.id, c.raftAddress, peerAddress) if err != nil { - c.logger.Warn().WithError(err).Log("unable to join cluster") + c.logger.Warn().WithError(err).Log("Join cluster") continue } @@ -322,7 +324,7 @@ func (c *cluster) CoreAPIAddress(raftAddress string) (string, error) { } func (c *cluster) Shutdown() error { - c.logger.Info().Log("shutting down cluster") + c.logger.Info().Log("Shutting down cluster") c.shutdownLock.Lock() defer c.shutdownLock.Unlock() @@ -373,7 +375,7 @@ func (c *cluster) Leave(origin, id string) error { c.logger.Debug().WithFields(log.Fields{ "nodeid": id, - }).Log("received leave request for node") + }).Log("Received leave request for server") if !c.IsRaftLeader() { // Tell the leader to remove us @@ -386,14 +388,14 @@ func (c *cluster) Leave(origin, id string) error { left := false limit := time.Now().Add(c.raftRemoveGracePeriod) for !left && time.Now().Before(limit) { - c.logger.Debug().Log("waiting for getting removed from the configuration") + c.logger.Debug().Log("Waiting to be removed from the configuration") // Sleep a while before we check. time.Sleep(50 * time.Millisecond) // Get the latest configuration. 
servers, err := c.raft.Servers() if err != nil { - c.logger.Error().WithError(err).Log("failed to get raft configuration") + c.logger.Error().WithError(err).Log("Raft configuration") break } @@ -408,7 +410,7 @@ func (c *cluster) Leave(origin, id string) error { } if !left { - c.logger.Warn().Log("failed to leave raft configuration gracefully, timeout") + c.logger.Warn().Log("Failed to leave raft configuration gracefully, timeout") } return nil @@ -417,7 +419,7 @@ func (c *cluster) Leave(origin, id string) error { // Count the number of servers in the cluster servers, err := c.raft.Servers() if err != nil { - c.logger.Error().WithError(err).Log("failed to get raft configuration") + c.logger.Error().WithError(err).Log("Raft configuration") return err } @@ -427,20 +429,20 @@ func (c *cluster) Leave(origin, id string) error { // We're going to remove ourselves if numPeers <= 1 { // Don't do so if we're the only server in the cluster - c.logger.Debug().Log("we're the leader without any peers, not doing anything") + c.logger.Debug().Log("We're the leader without any peers, not doing anything") return nil } // Transfer the leadership to another server err := c.leadershipTransfer() if err != nil { - c.logger.Warn().WithError(err).Log("failed to transfer leadership") + c.logger.Warn().WithError(err).Log("Transfer leadership") return err } // Wait for new leader election for { - c.logger.Debug().Log("waiting for new leader election") + c.logger.Debug().Log("Waiting for new leader election") time.Sleep(50 * time.Millisecond) @@ -463,14 +465,14 @@ func (c *cluster) Leave(origin, id string) error { left := false limit := time.Now().Add(c.raftRemoveGracePeriod) for !left && time.Now().Before(limit) { - c.logger.Debug().Log("waiting for getting removed from the configuration") + c.logger.Debug().Log("Waiting to be removed from the configuration") // Sleep a while before we check. time.Sleep(50 * time.Millisecond) // Get the latest configuration. 
servers, err := c.raft.Servers() if err != nil { - c.logger.Error().WithError(err).Log("failed to get raft configuration") + c.logger.Error().WithError(err).Log("Raft configuration") break } @@ -492,7 +494,7 @@ func (c *cluster) Leave(origin, id string) error { if err != nil { c.logger.Error().WithError(err).WithFields(log.Fields{ "nodeid": id, - }).Log("failed to remove node") + }).Log("Remove server") return err } @@ -502,16 +504,14 @@ func (c *cluster) Leave(origin, id string) error { func (c *cluster) Join(origin, id, raftAddress, peerAddress string) error { if !c.IsRaftLeader() { - c.logger.Debug().Log("not leader, forwarding to leader") + c.logger.Debug().Log("Not leader, forwarding to leader") return c.forwarder.Join(origin, id, raftAddress, peerAddress) } - c.logger.Debug().Log("leader: joining %s", raftAddress) - c.logger.Debug().WithFields(log.Fields{ "nodeid": id, "address": raftAddress, - }).Log("received join request for remote node") + }).Log("Received join request for remote server") // connect to the peer's API in order to find out if our version is compatible address, err := c.CoreAPIAddress(raftAddress) @@ -528,7 +528,7 @@ func (c *cluster) Join(origin, id, raftAddress, peerAddress string) error { servers, err := c.raft.Servers() if err != nil { - c.logger.Error().WithError(err).Log("failed to get raft configuration") + c.logger.Error().WithError(err).Log("Raft configuration") return err } @@ -545,14 +545,14 @@ func (c *cluster) Join(origin, id, raftAddress, peerAddress string) error { c.logger.Debug().WithFields(log.Fields{ "nodeid": id, "address": raftAddress, - }).Log("node is already member of cluster, ignoring join request") + }).Log("Server is already a member of the cluster, ignoring join request") } else { err := c.raft.RemoveServer(srv.ID) if err != nil { c.logger.Error().WithError(err).WithFields(log.Fields{ "nodeid": id, "address": raftAddress, - }).Log("error removing existing node") + }).Log("Removing existing server") return fmt.Errorf("error removing existing node %s at %s: %w", id, raftAddress, err) } } @@ -569,14 +569,14 @@ func (c *cluster) Join(origin, id, raftAddress, peerAddress string) error { c.logger.Info().WithFields(log.Fields{ "nodeid": id, "address": raftAddress, - }).Log("node joined successfully") + }).Log("Joined successfully") return nil } func (c *cluster) Snapshot() (io.ReadCloser, error) { if !c.IsRaftLeader() { - c.logger.Debug().Log("not leader, forwarding to leader") + c.logger.Debug().Log("Not leader, forwarding to leader") return c.forwarder.Snapshot() } @@ -593,7 +593,7 @@ func (c *cluster) trackNodeChanges() { // Get the latest configuration. 
servers, err := c.raft.Servers() if err != nil { - c.logger.Error().WithError(err).Log("failed to get raft configuration") + c.logger.Error().WithError(err).Log("Raft configuration") continue } @@ -614,7 +614,7 @@ func (c *cluster) trackNodeChanges() { c.logger.Warn().WithError(err).WithFields(log.Fields{ "id": id, "address": server.Address, - }).Log("Discovering core API address failed") + }).Log("Discovering core API address") continue } @@ -624,7 +624,7 @@ func (c *cluster) trackNodeChanges() { c.logger.Warn().WithError(err).WithFields(log.Fields{ "id": id, "address": server.Address, - }).Log("Connecting to core API failed") + }).Log("Connecting to core API") continue } @@ -632,7 +632,7 @@ func (c *cluster) trackNodeChanges() { c.logger.Warn().WithError(err).WithFields(log.Fields{ "id": id, "address": address, - }).Log("Adding node failed") + }).Log("Adding node") } c.nodes[id] = node @@ -667,7 +667,7 @@ func (c *cluster) trackLeaderChanges() { case leaderAddress := <-c.raftLeaderObservationCh: c.logger.Debug().WithFields(log.Fields{ "address": leaderAddress, - }).Log("new leader observation") + }).Log("Leader observation") if len(leaderAddress) != 0 { leaderAddress, _ = c.ClusterAPIAddress(leaderAddress) } @@ -773,7 +773,7 @@ func (c *cluster) About() (ClusterAbout, error) { servers, err := c.raft.Servers() if err != nil { - c.logger.Error().WithError(err).Log("failed to get raft configuration") + c.logger.Error().WithError(err).Log("Raft configuration") return ClusterAbout{}, err } @@ -808,14 +808,14 @@ func (c *cluster) sentinel() { "state": stats.State, "last_contact": stats.LastContact.String(), "num_peers": stats.NumPeers, - }).Log("stats") + }).Log("Stats") if stats.LastContact > 10*time.Second && !isEmergencyLeader { - c.logger.Warn().Log("force leadership due to lost contact to leader") + c.logger.Warn().Log("Force leadership due to lost contact with leader") c.raftEmergencyNotifyCh <- true isEmergencyLeader = true } else if stats.LastContact <= 10*time.Second && isEmergencyLeader { - c.logger.Warn().Log("stop forced leadership due to contact to leader") + c.logger.Warn().Log("Stop forced leadership due to restored contact with leader") c.raftEmergencyNotifyCh <- false isEmergencyLeader = false } diff --git a/cluster/forwarder/forwarder.go b/cluster/forwarder/forwarder.go index e93bf81e..a11183da 100644 --- a/cluster/forwarder/forwarder.go +++ b/cluster/forwarder/forwarder.go @@ -73,7 +73,7 @@ func (f *forwarder) SetLeader(address string) { return } - f.logger.Debug().Log("setting leader address to %s", address) + f.logger.Debug().Log("Setting leader address to %s", address) f.client.Address = address } @@ -93,7 +93,7 @@ func (f *forwarder) Join(origin, id, raftAddress, peerAddress string) error { RaftAddress: raftAddress, } - f.logger.Debug().WithField("request", r).Log("forwarding to leader") + f.logger.Debug().WithField("request", r).Log("Forwarding to leader") f.lock.RLock() client := f.client @@ -119,7 +119,7 @@ func (f *forwarder) Leave(origin, id string) error { ID: id, } - f.logger.Debug().WithField("request", r).Log("forwarding to leader") + f.logger.Debug().WithField("request", r).Log("Forwarding to leader") f.lock.RLock() client := f.client diff --git a/cluster/leader.go b/cluster/leader.go index c5ce188e..2696b994 100644 --- a/cluster/leader.go +++ b/cluster/leader.go @@ -83,14 +83,14 @@ func (c *cluster) monitorLeadership() { // shutdown any leader and emergency loop if weAreLeaderCh != nil { - c.logger.Debug().Log("shutting down leader loop") + c.logger.Debug().Log("Shutting 
down leader loop") close(weAreLeaderCh) leaderLoop.Wait() weAreLeaderCh = nil } if weAreEmergencyLeaderCh != nil { - c.logger.Debug().Log("shutting down emergency leader loop") + c.logger.Debug().Log("Shutting down emergency leader loop") close(weAreEmergencyLeaderCh) emergencyLeaderLoop.Wait() weAreEmergencyLeaderCh = nil @@ -103,7 +103,7 @@ func (c *cluster) monitorLeadership() { c.followerLoop(ch) }(weAreFollowerCh) - c.logger.Info().Log("cluster followship acquired") + c.logger.Info().Log("Cluster followship acquired") c.leaderLock.Lock() c.isRaftLeader = false @@ -117,14 +117,14 @@ func (c *cluster) monitorLeadership() { // shutdown any follower and emergency loop if weAreFollowerCh != nil { - c.logger.Debug().Log("shutting down follower loop") + c.logger.Debug().Log("Shutting down follower loop") close(weAreFollowerCh) followerLoop.Wait() weAreFollowerCh = nil } if weAreEmergencyLeaderCh != nil { - c.logger.Debug().Log("shutting down emergency leader loop") + c.logger.Debug().Log("Shutting down emergency leader loop") close(weAreEmergencyLeaderCh) emergencyLeaderLoop.Wait() weAreEmergencyLeaderCh = nil @@ -136,7 +136,7 @@ func (c *cluster) monitorLeadership() { defer leaderLoop.Done() c.leaderLoop(ch, false) }(weAreLeaderCh) - c.logger.Info().Log("cluster leadership acquired") + c.logger.Info().Log("Cluster leadership acquired") c.leaderLock.Lock() c.isRaftLeader = true @@ -150,14 +150,14 @@ func (c *cluster) monitorLeadership() { // shutdown any follower and leader loop if weAreFollowerCh != nil { - c.logger.Debug().Log("shutting down follower loop") + c.logger.Debug().Log("Shutting down follower loop") close(weAreFollowerCh) followerLoop.Wait() weAreFollowerCh = nil } if weAreLeaderCh != nil { - c.logger.Debug().Log("shutting down leader loop") + c.logger.Debug().Log("Shutting down leader loop") close(weAreLeaderCh) leaderLoop.Wait() weAreLeaderCh = nil @@ -169,7 +169,7 @@ func (c *cluster) monitorLeadership() { defer emergencyLeaderLoop.Done() c.leaderLoop(ch, true) }(weAreEmergencyLeaderCh) - c.logger.Info().Log("cluster emergency leadership acquired") + c.logger.Info().Log("Sluster emergency leadership acquired") c.leaderLock.Lock() c.isRaftLeader = false @@ -192,15 +192,15 @@ func (c *cluster) leadershipTransfer() error { c.logger.Error().WithError(err).WithFields(log.Fields{ "attempt": i, "retry_limit": retryCount, - }).Log("failed to transfer leadership attempt, will retry") + }).Log("Transfer leadership attempt, will retry") } else { c.logger.Info().WithFields(log.Fields{ "attempt": i, "retry_limit": retryCount, - }).Log("successfully transferred leadership") + }).Log("Successfully transferred leadership") for { - c.logger.Debug().Log("waiting for losing leadership") + c.logger.Debug().Log("Waiting for losing leadership") time.Sleep(50 * time.Millisecond) @@ -216,6 +216,7 @@ func (c *cluster) leadershipTransfer() error { return nil } } + return fmt.Errorf("failed to transfer leadership in %d attempts", retryCount) } @@ -232,7 +233,7 @@ RECONCILE: if !emergency { err := c.raft.Barrier(time.Minute) if err != nil { - c.logger.Error().WithError(err).Log("failed to wait for barrier") + c.logger.Error().WithError(err).Log("Wait for barrier") goto WAIT } } @@ -240,7 +241,7 @@ RECONCILE: // Check if we need to handle initial leadership actions if !establishedLeader { if err := c.establishLeadership(context.TODO()); err != nil { - c.logger.Error().WithError(err).Log("failed to establish leadership") + c.logger.Error().WithError(err).Log("Establish leadership") // Immediately revoke 
leadership since we didn't successfully // establish leadership. c.revokeLeadership() @@ -251,7 +252,7 @@ RECONCILE: // will try to acquire it again after // 5 seconds. if err := c.leadershipTransfer(); err != nil { - c.logger.Error().WithError(err).Log("failed to transfer leadership") + c.logger.Error().WithError(err).Log("Transfer leadership") interval = time.After(5 * time.Second) goto WAIT } @@ -285,7 +286,7 @@ WAIT: } func (c *cluster) establishLeadership(ctx context.Context) error { - c.logger.Debug().Log("establishing leadership") + c.logger.Debug().Log("Establishing leadership") ctx, cancel := context.WithCancel(ctx) c.cancelLeaderShip = cancel @@ -296,7 +297,7 @@ func (c *cluster) establishLeadership(ctx context.Context) error { } func (c *cluster) revokeLeadership() { - c.logger.Debug().Log("revoking leadership") + c.logger.Debug().Log("Revoking leadership") c.cancelLeaderShip() } @@ -361,7 +362,7 @@ func (c *cluster) applyOpStack(stack []interface{}) { c.logger.Info().WithError(err).WithFields(log.Fields{ "processid": v.config.ID, "nodeid": v.nodeid, - }).Log("Adding process failed") + }).Log("Adding process") break } err = c.proxy.ProcessStart(v.nodeid, v.config.ID) @@ -369,7 +370,7 @@ func (c *cluster) applyOpStack(stack []interface{}) { c.logger.Info().WithError(err).WithFields(log.Fields{ "processid": v.config.ID, "nodeid": v.nodeid, - }).Log("Starting process failed") + }).Log("Starting process") break } c.logger.Info().WithFields(log.Fields{ @@ -382,7 +383,7 @@ func (c *cluster) applyOpStack(stack []interface{}) { c.logger.Info().WithError(err).WithFields(log.Fields{ "processid": v.processid, "nodeid": v.nodeid, - }).Log("Removing process failed") + }).Log("Removing process") break } c.logger.Info().WithFields(log.Fields{ @@ -396,7 +397,7 @@ func (c *cluster) applyOpStack(stack []interface{}) { "processid": v.config.ID, "fromnodeid": v.fromNodeid, "tonodeid": v.toNodeid, - }).Log("Moving process, adding process failed") + }).Log("Moving process, adding process") break } err = c.proxy.ProcessDelete(v.fromNodeid, v.config.ID) @@ -405,7 +406,7 @@ func (c *cluster) applyOpStack(stack []interface{}) { "processid": v.config.ID, "fromnodeid": v.fromNodeid, "tonodeid": v.toNodeid, - }).Log("Moving process, removing process failed") + }).Log("Moving process, removing process") break } err = c.proxy.ProcessStart(v.toNodeid, v.config.ID) @@ -414,7 +415,7 @@ func (c *cluster) applyOpStack(stack []interface{}) { "processid": v.config.ID, "fromnodeid": v.fromNodeid, "tonodeid": v.toNodeid, - }).Log("Moving process, starting process failed") + }).Log("Moving process, starting process") break } c.logger.Info().WithFields(log.Fields{ @@ -428,7 +429,7 @@ func (c *cluster) applyOpStack(stack []interface{}) { c.logger.Info().WithError(err).WithFields(log.Fields{ "processid": v.processid, "nodeid": v.nodeid, - }).Log("Starting process failed") + }).Log("Starting process") break } c.logger.Info().WithFields(log.Fields{ @@ -453,6 +454,12 @@ func (c *cluster) doSynchronize() { have := c.proxy.ProcessList() resources := c.proxy.Resources() + c.logger.Debug().WithFields(log.Fields{ + "want": want, + "have": have, + "resources": resources, + }).Log("Synchronize") + opStack := synchronize(want, have, resources) c.applyOpStack(opStack) @@ -462,6 +469,11 @@ func (c *cluster) doRebalance() { have := c.proxy.ProcessList() resources := c.proxy.Resources() + c.logger.Debug().WithFields(log.Fields{ + "have": have, + "resources": resources, + }).Log("Rebalance") + opStack := rebalance(have, resources) 
c.applyOpStack(opStack) diff --git a/cluster/logger/logger.go b/cluster/logger/logger.go index 3e24f07b..630eb37d 100644 --- a/cluster/logger/logger.go +++ b/cluster/logger/logger.go @@ -4,6 +4,7 @@ import ( "io" golog "log" "os" + "strings" "github.com/datarhei/core/v16/log" @@ -44,6 +45,10 @@ func (l *hclogger) Log(level hclog.Level, msg string, args ...interface{}) { logger = logger.Error() } + if len(msg) != 0 { + msg = strings.ToUpper(msg[:1]) + msg[1:] + } + logger.Log(msg) } diff --git a/cluster/proxy/proxy.go b/cluster/proxy/proxy.go index cd2cad59..55f5f15e 100644 --- a/cluster/proxy/proxy.go +++ b/cluster/proxy/proxy.go @@ -193,7 +193,7 @@ func (p *proxy) Start() { p.running = true - p.logger.Debug().Log("starting proxy") + p.logger.Debug().Log("Starting proxy") ctx, cancel := context.WithCancel(context.Background()) p.cancel = cancel @@ -246,7 +246,7 @@ func (p *proxy) Stop() { p.running = false - p.logger.Debug().Log("stopping proxy") + p.logger.Debug().Log("Stopping proxy") p.cancel() p.cancel = nil diff --git a/cluster/raft/raft.go b/cluster/raft/raft.go index 2320ef2e..9f77c497 100644 --- a/cluster/raft/raft.go +++ b/cluster/raft/raft.go @@ -110,6 +110,12 @@ func New(config Config) (Raft, error) { raftAddress: config.Address, leadershipNotifyCh: config.LeadershipNotifyCh, leaderObservationCh: config.LeaderObservationCh, + shutdownCh: make(chan struct{}), + logger: config.Logger, + } + + if r.logger == nil { + r.logger = log.New("") } err := r.start(config.Store, config.Bootstrap, config.Recover, config.Peers, false) @@ -319,7 +325,7 @@ func (r *raft) start(fsm hcraft.FSM, bootstrap, recover bool, peers []Peer, inme return err } - r.logger.Debug().Log("address: %s", addr) + r.logger.Debug().Log("Address: %s", addr) transport, err := hcraft.NewTCPTransportWithLogger(r.raftAddress, addr, 3, 10*time.Second, raftlogger.New(r.logger, hclog.Debug).Named("raft-transport")) if err != nil { @@ -396,10 +402,10 @@ func (r *raft) start(fsm hcraft.FSM, bootstrap, recover bool, peers []Peer, inme return fmt.Errorf("bootstrapping cluster: %w", err) } - r.logger.Debug().Log("raft node bootstrapped") + r.logger.Debug().Log("Raft node bootstrapped") } else { // Recover cluster - fsm, err := store.NewStore() + fsm, err := store.NewStore(store.Config{}) if err != nil { return err } @@ -428,7 +434,7 @@ func (r *raft) start(fsm hcraft.FSM, bootstrap, recover bool, peers []Peer, inme return fmt.Errorf("recovering cluster: %w", err) } - r.logger.Debug().Log("raft node recoverd") + r.logger.Debug().Log("Raft node recovered") } // Set up a channel for reliable leader notifications. 
@@ -446,7 +452,7 @@ func (r *raft) start(fsm hcraft.FSM, bootstrap, recover bool, peers []Peer, inme go r.trackLeaderChanges() go r.monitorLeadership() - r.logger.Debug().Log("raft started") + r.logger.Debug().Log("Raft started") return nil } @@ -490,7 +496,7 @@ func (r *raft) trackLeaderChanges() { r.logger.Debug().WithFields(log.Fields{ "id": leaderObs.LeaderID, "address": leaderObs.LeaderAddr, - }).Log("new leader observation") + }).Log("New leader observation") leaderAddress := string(leaderObs.LeaderAddr) @@ -501,7 +507,7 @@ func (r *raft) trackLeaderChanges() { } } } else { - r.logger.Debug().WithField("type", reflect.TypeOf(obs.Data)).Log("got unknown observation type from raft") + r.logger.Debug().WithField("type", reflect.TypeOf(obs.Data)).Log("Unknown observation type from raft") continue } case <-r.shutdownCh: diff --git a/cluster/store/store.go b/cluster/store/store.go index 9266ef95..9d32eaf3 100644 --- a/cluster/store/store.go +++ b/cluster/store/store.go @@ -6,6 +6,7 @@ import ( "io" "sync" + "github.com/datarhei/core/v16/log" "github.com/datarhei/core/v16/restream/app" "github.com/hashicorp/raft" @@ -42,27 +43,44 @@ type CommandRemoveProcess struct { type store struct { lock sync.RWMutex Process map[string]app.Config + + logger log.Logger } -func NewStore() (Store, error) { - return &store{ +type Config struct { + Logger log.Logger +} + +func NewStore(config Config) (Store, error) { + s := &store{ Process: map[string]app.Config{}, - }, nil + logger: config.Logger, + } + + if s.logger == nil { + s.logger = log.New("") + } + + return s, nil } -func (s *store) Apply(log *raft.Log) interface{} { - fmt.Printf("a log entry came in (index=%d, term=%d): %s\n", log.Index, log.Term, string(log.Data)) +func (s *store) Apply(entry *raft.Log) interface{} { + logger := s.logger.WithFields(log.Fields{ + "index": entry.Index, + "term": entry.Term, + }) + + logger.Debug().WithField("data", string(entry.Data)).Log("New entry") c := Command{} - err := json.Unmarshal(log.Data, &c) + err := json.Unmarshal(entry.Data, &c) if err != nil { - fmt.Printf("invalid log entry\n") - return nil + logger.Error().WithError(err).Log("Invalid entry") + return fmt.Errorf("invalid log entry") } - fmt.Printf("op: %s\n", c.Operation) - fmt.Printf("op: %+v\n", c) + logger.Debug().WithField("operation", c.Operation).Log("") switch c.Operation { case OpAddProcess: @@ -84,13 +102,13 @@ func (s *store) Apply(log *raft.Log) interface{} { } s.lock.RLock() - fmt.Printf("\n==> %+v\n\n", s.Process) + s.logger.Debug().WithField("processes", s.Process).Log("") s.lock.RUnlock() return nil } func (s *store) Snapshot() (raft.FSMSnapshot, error) { - fmt.Printf("a snapshot is requested\n") + s.logger.Debug().Log("Snapshot request") s.lock.Lock() defer s.lock.Unlock() @@ -106,7 +124,7 @@ func (s *store) Snapshot() (raft.FSMSnapshot, error) { } func (s *store) Restore(snapshot io.ReadCloser) error { - fmt.Printf("a snapshot is restored\n") + s.logger.Debug().Log("Snapshot restore") defer snapshot.Close()
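
Note on the store.NewStore signature change above: the constructor now takes a store.Config and substitutes log.New("") when config.Logger is nil, so every caller outside this diff must be updated as well. A minimal sketch of the new call site, modeled on the cluster.go hunk in this diff (the surrounding variable names are illustrative, not part of this patch):

    import (
        "github.com/datarhei/core/v16/cluster/store"
        "github.com/datarhei/core/v16/log"
    )

    // logger stands in for whatever log.Logger the caller already has.
    logger := log.New("cluster")

    // The Logger field is optional: as the constructor in this diff shows,
    // NewStore falls back to log.New("") when it is left nil.
    fsm, err := store.NewStore(store.Config{
        Logger: logger.WithField("logname", "fsm"),
    })
    if err != nil {
        // handle the construction error
        panic(err)
    }
    _ = fsm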