From 65e01112a18e5511f0fbdedf77b67c7dbc0796e3 Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Fri, 26 May 2023 20:24:13 +0200 Subject: [PATCH] Send circuit breaker in the header --- cluster/api.go | 87 +++++++++++++++------------------- cluster/client/client.go | 80 ++++++++++--------------------- cluster/forwarder/forwarder.go | 35 ++++---------- 3 files changed, 72 insertions(+), 130 deletions(-) diff --git a/cluster/api.go b/cluster/api.go index 4510ee6f..9a4e9c9f 100644 --- a/cluster/api.go +++ b/cluster/api.go @@ -74,7 +74,7 @@ func NewAPI(config APIConfig) (API, error) { })) a.router.Logger.SetOutput(httplog.NewWrapper(a.logger)) - a.router.POST("/v1/join", func(c echo.Context) error { + a.router.POST("/v1/server", func(c echo.Context) error { r := client.JoinRequest{} if err := util.ShouldBindJSON(c, &r); err != nil { @@ -86,11 +86,13 @@ func NewAPI(config APIConfig) (API, error) { "request": r, }).Log("Join request: %+v", r) - if r.Origin == a.id { + origin := c.Request().Header.Get("X-Cluster-Origin") + + if origin == a.id { return httpapi.Err(http.StatusLoopDetected, "", "breaking circuit") } - err := a.cluster.Join(r.Origin, r.ID, r.RaftAddress, "") + err := a.cluster.Join(origin, r.ID, r.RaftAddress, "") if err != nil { a.logger.Debug().WithError(err).WithField("id", r.ID).Log("Unable to join cluster") return httpapi.Err(http.StatusInternalServerError, "unable to join cluster", "%s", err) @@ -99,25 +101,22 @@ func NewAPI(config APIConfig) (API, error) { return c.JSON(http.StatusOK, "OK") }) - a.router.POST("/v1/leave", func(c echo.Context) error { - r := client.LeaveRequest{} - - if err := util.ShouldBindJSON(c, &r); err != nil { - return httpapi.Err(http.StatusBadRequest, "Invalid JSON", "%s", err) - } + a.router.POST("/v1/server/:id", func(c echo.Context) error { + id := util.PathParam(c, "id") a.logger.Debug().WithFields(log.Fields{ - "id": r.ID, - "request": r, - }).Log("Leave request: %+v", r) + "id": id, + }).Log("Leave request") - if r.Origin == a.id { + origin := c.Request().Header.Get("X-Cluster-Origin") + + if origin == a.id { return httpapi.Err(http.StatusLoopDetected, "", "breaking circuit") } - err := a.cluster.Leave(r.Origin, r.ID) + err := a.cluster.Leave(origin, id) if err != nil { - a.logger.Debug().WithError(err).WithField("id", r.ID).Log("Unable to leave cluster") + a.logger.Debug().WithError(err).WithField("id", id).Log("Unable to leave cluster") return httpapi.Err(http.StatusInternalServerError, "unable to leave cluster", "%s", err) } @@ -143,13 +142,15 @@ func NewAPI(config APIConfig) (API, error) { return httpapi.Err(http.StatusBadRequest, "Invalid JSON", "%s", err) } - if r.Origin == a.id { + origin := c.Request().Header.Get("X-Cluster-Origin") + + if origin == a.id { return httpapi.Err(http.StatusLoopDetected, "", "breaking circuit") } a.logger.Debug().WithField("id", r.Config.ID).Log("Add process request") - err := a.cluster.AddProcess(r.Origin, &r.Config) + err := a.cluster.AddProcess(origin, &r.Config) if err != nil { a.logger.Debug().WithError(err).WithField("id", r.Config.ID).Log("Unable to add process") return httpapi.Err(http.StatusInternalServerError, "unable to add process", "%s", err) @@ -158,28 +159,20 @@ func NewAPI(config APIConfig) (API, error) { return c.JSON(http.StatusOK, "OK") }) - a.router.POST("/v1/process/:id", func(c echo.Context) error { + a.router.DELETE("/v1/process/:id", func(c echo.Context) error { id := util.PathParam(c, "id") - r := client.RemoveProcessRequest{} + origin := c.Request().Header.Get("X-Cluster-Origin") - if err := util.ShouldBindJSON(c, &r); err != nil { - return httpapi.Err(http.StatusBadRequest, "Invalid JSON", "%s", err) - } - - if r.Origin == a.id { + if origin == a.id { return httpapi.Err(http.StatusLoopDetected, "", "breaking circuit") } - if id != r.ID { - return httpapi.Err(http.StatusBadRequest, "Invalid data", "the ID in the path and the request do not match") - } + a.logger.Debug().WithField("id", id).Log("Remove process request") - a.logger.Debug().WithField("id", r.ID).Log("Remove process request") - - err := a.cluster.RemoveProcess(r.Origin, r.ID) + err := a.cluster.RemoveProcess(origin, id) if err != nil { - a.logger.Debug().WithError(err).WithField("id", r.ID).Log("Unable to remove process") + a.logger.Debug().WithError(err).WithField("id", id).Log("Unable to remove process") return httpapi.Err(http.StatusInternalServerError, "unable to remove process", "%s", err) } @@ -195,7 +188,9 @@ func NewAPI(config APIConfig) (API, error) { return httpapi.Err(http.StatusBadRequest, "Invalid JSON", "%s", err) } - if r.Origin == a.id { + origin := c.Request().Header.Get("X-Cluster-Origin") + + if origin == a.id { return httpapi.Err(http.StatusLoopDetected, "", "breaking circuit") } @@ -208,7 +203,7 @@ func NewAPI(config APIConfig) (API, error) { "new_id": r.Config.ID, }).Log("Update process request") - err := a.cluster.UpdateProcess(r.Origin, r.ID, &r.Config) + err := a.cluster.UpdateProcess(origin, r.ID, &r.Config) if err != nil { a.logger.Debug().WithError(err).WithField("id", r.ID).Log("Unable to update process") return httpapi.Err(http.StatusInternalServerError, "unable to update process", "%s", err) @@ -224,13 +219,15 @@ func NewAPI(config APIConfig) (API, error) { return httpapi.Err(http.StatusBadRequest, "Invalid JSON", "%s", err) } - if r.Origin == a.id { + origin := c.Request().Header.Get("X-Cluster-Origin") + + if origin == a.id { return httpapi.Err(http.StatusLoopDetected, "", "breaking circuit") } a.logger.Debug().WithField("identity", r.Identity).Log("Add identity request") - err := a.cluster.AddIdentity(r.Origin, r.Identity) + err := a.cluster.AddIdentity(origin, r.Identity) if err != nil { a.logger.Debug().WithError(err).WithField("identity", r.Identity).Log("Unable to add identity") return httpapi.Err(http.StatusInternalServerError, "unable to add identity", "%s", err) @@ -239,28 +236,20 @@ func NewAPI(config APIConfig) (API, error) { return c.JSON(http.StatusOK, "OK") }) - a.router.POST("/v1/iam/user/:name", func(c echo.Context) error { + a.router.DELETE("/v1/iam/user/:name", func(c echo.Context) error { name := util.PathParam(c, "name") - r := client.RemoveIdentityRequest{} + origin := c.Request().Header.Get("X-Cluster-Origin") - if err := util.ShouldBindJSON(c, &r); err != nil { - return httpapi.Err(http.StatusBadRequest, "Invalid JSON", "%s", err) - } - - if r.Origin == a.id { + if origin == a.id { return httpapi.Err(http.StatusLoopDetected, "", "breaking circuit") } - if name != r.Name { - return httpapi.Err(http.StatusBadRequest, "Invalid data", "the name in the path and the request do not match") - } + a.logger.Debug().WithField("identity", name).Log("Remove identity request") - a.logger.Debug().WithField("identity", r.Name).Log("Remove identity request") - - err := a.cluster.RemoveIdentity(r.Origin, r.Name) + err := a.cluster.RemoveIdentity(origin, name) if err != nil { - a.logger.Debug().WithError(err).WithField("identity", r.Name).Log("Unable to remove identity") + a.logger.Debug().WithError(err).WithField("identity", name).Log("Unable to remove identity") return httpapi.Err(http.StatusInternalServerError, "unable to remove identity", "%s", err) } diff --git a/cluster/client/client.go b/cluster/client/client.go index e9688c8c..238e533d 100644 --- a/cluster/client/client.go +++ b/cluster/client/client.go @@ -14,49 +14,34 @@ import ( ) type JoinRequest struct { - Origin string `json:"origin"` ID string `json:"id"` RaftAddress string `json:"raft_address"` } type LeaveRequest struct { - Origin string `json:"origin"` - ID string `json:"id"` + ID string `json:"id"` } type AddProcessRequest struct { - Origin string `json:"origin"` Config app.Config `json:"config"` } -type RemoveProcessRequest struct { - Origin string `json:"origin"` - ID string `json:"id"` -} - type UpdateProcessRequest struct { - Origin string `json:"origin"` ID string `json:"id"` Config app.Config `json:"config"` } type AddIdentityRequest struct { - Origin string `json:"origin"` Identity iamidentity.User `json:"identity"` } -type RemoveIdentityRequest struct { - Origin string `json:"origin"` - Name string `json:"name"` -} - type APIClient struct { Address string Client *http.Client } func (c *APIClient) CoreAPIAddress() (string, error) { - data, err := c.call(http.MethodGet, "/core", "", nil) + data, err := c.call(http.MethodGet, "/core", "", nil, "") if err != nil { return "", err } @@ -70,88 +55,73 @@ func (c *APIClient) CoreAPIAddress() (string, error) { return address, nil } -func (c *APIClient) Join(r JoinRequest) error { +func (c *APIClient) Join(origin string, r JoinRequest) error { data, err := json.Marshal(&r) if err != nil { return err } - _, err = c.call(http.MethodPost, "/join", "application/json", bytes.NewReader(data)) + _, err = c.call(http.MethodPost, "/server", "application/json", bytes.NewReader(data), origin) return err } -func (c *APIClient) Leave(r LeaveRequest) error { - data, err := json.Marshal(&r) - if err != nil { - return err - } - - _, err = c.call(http.MethodPost, "/leave", "application/json", bytes.NewReader(data)) +func (c *APIClient) Leave(origin string, id string) error { + _, err := c.call(http.MethodDelete, "/server/"+id, "application/json", nil, origin) return err } -func (c *APIClient) AddProcess(r AddProcessRequest) error { +func (c *APIClient) AddProcess(origin string, r AddProcessRequest) error { data, err := json.Marshal(r) if err != nil { return err } - _, err = c.call(http.MethodPost, "/process", "application/json", bytes.NewReader(data)) + _, err = c.call(http.MethodPost, "/process", "application/json", bytes.NewReader(data), origin) return err } -func (c *APIClient) RemoveProcess(r RemoveProcessRequest) error { +func (c *APIClient) RemoveProcess(origin string, id string) error { + _, err := c.call(http.MethodDelete, "/process/"+id, "application/json", nil, origin) + + return err +} + +func (c *APIClient) UpdateProcess(origin string, r UpdateProcessRequest) error { data, err := json.Marshal(r) if err != nil { return err } - _, err = c.call(http.MethodPost, "/process/"+r.ID, "application/json", bytes.NewReader(data)) + _, err = c.call(http.MethodPut, "/process/"+r.ID, "application/json", bytes.NewReader(data), origin) return err } -func (c *APIClient) UpdateProcess(r UpdateProcessRequest) error { +func (c *APIClient) AddIdentity(origin string, r AddIdentityRequest) error { data, err := json.Marshal(r) if err != nil { return err } - _, err = c.call(http.MethodPut, "/process/"+r.ID, "application/json", bytes.NewReader(data)) + _, err = c.call(http.MethodPost, "/iam/user", "application/json", bytes.NewReader(data), origin) return err } -func (c *APIClient) AddIdentity(r AddIdentityRequest) error { - data, err := json.Marshal(r) - if err != nil { - return err - } - - _, err = c.call(http.MethodPost, "/iam/user", "application/json", bytes.NewReader(data)) - - return err -} - -func (c *APIClient) RemoveIdentity(r RemoveIdentityRequest) error { - data, err := json.Marshal(r) - if err != nil { - return err - } - - _, err = c.call(http.MethodPost, "/iam/user/:name", "application/json", bytes.NewReader(data)) +func (c *APIClient) RemoveIdentity(origin string, name string) error { + _, err := c.call(http.MethodDelete, "/iam/user/"+name, "application/json", nil, origin) return err } func (c *APIClient) Snapshot() (io.ReadCloser, error) { - return c.stream(http.MethodGet, "/snapshot", "", nil) + return c.stream(http.MethodGet, "/snapshot", "", nil, "") } -func (c *APIClient) stream(method, path, contentType string, data io.Reader) (io.ReadCloser, error) { +func (c *APIClient) stream(method, path, contentType string, data io.Reader, origin string) (io.ReadCloser, error) { if len(c.Address) == 0 { return nil, fmt.Errorf("no address defined") } @@ -163,6 +133,8 @@ func (c *APIClient) stream(method, path, contentType string, data io.Reader) (io return nil, err } + req.Header.Add("X-Cluster-Origin", origin) + if method == "POST" || method == "PUT" { req.Header.Add("Content-Type", contentType) } @@ -187,8 +159,8 @@ func (c *APIClient) stream(method, path, contentType string, data io.Reader) (io return body, nil } -func (c *APIClient) call(method, path, contentType string, data io.Reader) ([]byte, error) { - body, err := c.stream(method, path, contentType, data) +func (c *APIClient) call(method, path, contentType string, data io.Reader, origin string) ([]byte, error) { + body, err := c.stream(method, path, contentType, data, origin) if err != nil { return nil, err } diff --git a/cluster/forwarder/forwarder.go b/cluster/forwarder/forwarder.go index e75af10a..b88f5b13 100644 --- a/cluster/forwarder/forwarder.go +++ b/cluster/forwarder/forwarder.go @@ -93,7 +93,6 @@ func (f *forwarder) Join(origin, id, raftAddress, peerAddress string) error { } r := apiclient.JoinRequest{ - Origin: origin, ID: id, RaftAddress: raftAddress, } @@ -111,7 +110,7 @@ func (f *forwarder) Join(origin, id, raftAddress, peerAddress string) error { } } - return client.Join(r) + return client.Join(origin, r) } func (f *forwarder) Leave(origin, id string) error { @@ -119,18 +118,13 @@ func (f *forwarder) Leave(origin, id string) error { origin = f.id } - r := apiclient.LeaveRequest{ - Origin: origin, - ID: id, - } - - f.logger.Debug().WithField("request", r).Log("Forwarding to leader") + f.logger.Debug().WithField("id", id).Log("Forwarding to leader") f.lock.RLock() client := f.client f.lock.RUnlock() - return client.Leave(r) + return client.Leave(origin, id) } func (f *forwarder) Snapshot() (io.ReadCloser, error) { @@ -147,7 +141,6 @@ func (f *forwarder) AddProcess(origin string, config *app.Config) error { } r := apiclient.AddProcessRequest{ - Origin: origin, Config: *config, } @@ -155,7 +148,7 @@ func (f *forwarder) AddProcess(origin string, config *app.Config) error { client := f.client f.lock.RUnlock() - return client.AddProcess(r) + return client.AddProcess(origin, r) } func (f *forwarder) UpdateProcess(origin, id string, config *app.Config) error { @@ -164,7 +157,6 @@ func (f *forwarder) UpdateProcess(origin, id string, config *app.Config) error { } r := apiclient.UpdateProcessRequest{ - Origin: origin, ID: id, Config: *config, } @@ -173,7 +165,7 @@ func (f *forwarder) UpdateProcess(origin, id string, config *app.Config) error { client := f.client f.lock.RUnlock() - return client.UpdateProcess(r) + return client.UpdateProcess(origin, r) } func (f *forwarder) RemoveProcess(origin, id string) error { @@ -181,16 +173,11 @@ func (f *forwarder) RemoveProcess(origin, id string) error { origin = f.id } - r := apiclient.RemoveProcessRequest{ - Origin: origin, - ID: id, - } - f.lock.RLock() client := f.client f.lock.RUnlock() - return client.RemoveProcess(r) + return client.RemoveProcess(origin, id) } func (f *forwarder) AddIdentity(origin string, identity iamidentity.User) error { @@ -199,7 +186,6 @@ func (f *forwarder) AddIdentity(origin string, identity iamidentity.User) error } r := apiclient.AddIdentityRequest{ - Origin: origin, Identity: identity, } @@ -207,7 +193,7 @@ func (f *forwarder) AddIdentity(origin string, identity iamidentity.User) error client := f.client f.lock.RUnlock() - return client.AddIdentity(r) + return client.AddIdentity(origin, r) } func (f *forwarder) RemoveIdentity(origin string, name string) error { @@ -215,14 +201,9 @@ func (f *forwarder) RemoveIdentity(origin string, name string) error { origin = f.id } - r := apiclient.RemoveIdentityRequest{ - Origin: origin, - Name: name, - } - f.lock.RLock() client := f.client f.lock.RUnlock() - return client.RemoveIdentity(r) + return client.RemoveIdentity(origin, name) }