mirror of
https://github.com/photoprism/photoprism.git
synced 2025-10-22 08:29:40 +08:00
CLI: Add cluster operations and management commands #98
Signed-off-by: Michael Mayer <michael@photoprism.app>
This commit is contained in:
295
internal/commands/cluster_register.go
Normal file
295
internal/commands/cluster_register.go
Normal file
@@ -0,0 +1,295 @@
|
||||
package commands
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"github.com/urfave/cli/v2"
|
||||
yaml "gopkg.in/yaml.v2"
|
||||
|
||||
"github.com/photoprism/photoprism/internal/config"
|
||||
"github.com/photoprism/photoprism/internal/service/cluster"
|
||||
"github.com/photoprism/photoprism/pkg/clean"
|
||||
"github.com/photoprism/photoprism/pkg/fs"
|
||||
"github.com/photoprism/photoprism/pkg/txt/report"
|
||||
)
|
||||
|
||||
// flags for register
|
||||
var (
|
||||
regNameFlag = &cli.StringFlag{Name: "name", Usage: "node `NAME` (lowercase letters, digits, hyphens)"}
|
||||
regTypeFlag = &cli.StringFlag{Name: "type", Usage: "node `TYPE` (instance, service)", Value: "instance"}
|
||||
regIntUrlFlag = &cli.StringFlag{Name: "internal-url", Usage: "internal service `URL`"}
|
||||
regLabelFlag = &cli.StringSliceFlag{Name: "label", Usage: "`k=v` label (repeatable)"}
|
||||
regRotateDB = &cli.BoolFlag{Name: "rotate", Usage: "rotates the node's database password"}
|
||||
regRotateSec = &cli.BoolFlag{Name: "rotate-secret", Usage: "rotates the node's secret used for JWT"}
|
||||
regPortalURL = &cli.StringFlag{Name: "portal-url", Usage: "Portal base `URL` (defaults to config)"}
|
||||
regPortalTok = &cli.StringFlag{Name: "portal-token", Usage: "Portal access `TOKEN` (defaults to config)"}
|
||||
regWriteConf = &cli.BoolFlag{Name: "write-config", Usage: "persists returned secrets and DB settings to local config"}
|
||||
regForceFlag = &cli.BoolFlag{Name: "force", Aliases: []string{"f"}, Usage: "confirm actions that may overwrite/replace local data (e.g., --write-config)"}
|
||||
)
|
||||
|
||||
// ClusterRegisterCommand registers a node with the Portal via HTTP.
|
||||
var ClusterRegisterCommand = &cli.Command{
|
||||
Name: "register",
|
||||
Usage: "Registers/rotates a node via Portal (HTTP)",
|
||||
Flags: append(append([]cli.Flag{regNameFlag, regTypeFlag, regIntUrlFlag, regLabelFlag, regRotateDB, regRotateSec, regPortalURL, regPortalTok, regWriteConf, regForceFlag, JsonFlag}, report.CliFlags...)),
|
||||
Action: clusterRegisterAction,
|
||||
}
|
||||
|
||||
func clusterRegisterAction(ctx *cli.Context) error {
|
||||
return CallWithDependencies(ctx, func(conf *config.Config) error {
|
||||
// Resolve inputs
|
||||
name := clean.TypeLowerDash(ctx.String("name"))
|
||||
if name == "" { // default from config if set
|
||||
name = clean.TypeLowerDash(conf.NodeName())
|
||||
}
|
||||
if name == "" {
|
||||
return fmt.Errorf("node name is required (use --name or set node-name)")
|
||||
}
|
||||
nodeType := clean.TypeLowerDash(ctx.String("type"))
|
||||
switch nodeType {
|
||||
case "instance", "service":
|
||||
default:
|
||||
return fmt.Errorf("invalid --type (must be instance or service)")
|
||||
}
|
||||
|
||||
portalURL := ctx.String("portal-url")
|
||||
if portalURL == "" {
|
||||
portalURL = conf.PortalUrl()
|
||||
}
|
||||
if portalURL == "" {
|
||||
return fmt.Errorf("portal URL is required (use --portal-url or set portal-url)")
|
||||
}
|
||||
token := ctx.String("portal-token")
|
||||
if token == "" {
|
||||
token = conf.PortalToken()
|
||||
}
|
||||
if token == "" {
|
||||
return fmt.Errorf("portal token is required (use --portal-token or set portal-token)")
|
||||
}
|
||||
|
||||
body := map[string]interface{}{
|
||||
"nodeName": name,
|
||||
"nodeType": nodeType,
|
||||
"labels": parseLabelSlice(ctx.StringSlice("label")),
|
||||
"internalUrl": ctx.String("internal-url"),
|
||||
"rotate": ctx.Bool("rotate"),
|
||||
"rotateSecret": ctx.Bool("rotate-secret"),
|
||||
}
|
||||
b, _ := json.Marshal(body)
|
||||
|
||||
// POST with bounded backoff on 429
|
||||
url := stringsTrimRightSlash(portalURL) + "/api/v1/cluster/nodes/register"
|
||||
var resp cluster.RegisterResponse
|
||||
if err := postWithBackoff(url, token, b, &resp); err != nil {
|
||||
var httpErr *httpError
|
||||
if errors.As(err, &httpErr) && httpErr.Status == http.StatusTooManyRequests {
|
||||
return fmt.Errorf("portal rate-limited registration attempts")
|
||||
}
|
||||
// Map common errors
|
||||
if errors.As(err, &httpErr) {
|
||||
switch httpErr.Status {
|
||||
case http.StatusUnauthorized, http.StatusForbidden:
|
||||
return fmt.Errorf("%s", httpErr.Error())
|
||||
case http.StatusConflict:
|
||||
return fmt.Errorf("%s", httpErr.Error())
|
||||
case http.StatusBadRequest:
|
||||
return fmt.Errorf("%s", httpErr.Error())
|
||||
case http.StatusNotFound:
|
||||
return fmt.Errorf("%s", httpErr.Error())
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Output
|
||||
if ctx.Bool("json") {
|
||||
jb, _ := json.Marshal(resp)
|
||||
fmt.Println(string(jb))
|
||||
} else {
|
||||
// Human-readable: node row and credentials if present
|
||||
cols := []string{"ID", "Name", "Type", "DB Name", "DB User", "Host", "Port"}
|
||||
var dbName, dbUser string
|
||||
if resp.DB.Name != "" {
|
||||
dbName = resp.DB.Name
|
||||
}
|
||||
if resp.DB.User != "" {
|
||||
dbUser = resp.DB.User
|
||||
}
|
||||
rows := [][]string{{resp.Node.ID, resp.Node.Name, resp.Node.Type, dbName, dbUser, resp.DB.Host, fmt.Sprintf("%d", resp.DB.Port)}}
|
||||
out, _ := report.RenderFormat(rows, cols, report.CliFormat(ctx))
|
||||
fmt.Printf("\n%s\n", out)
|
||||
|
||||
// Secrets/credentials block if any
|
||||
// Show secrets in up to two tables, then print DSN if present
|
||||
if (resp.Secrets != nil && resp.Secrets.NodeSecret != "") || resp.DB.Password != "" {
|
||||
fmt.Println("PLEASE WRITE DOWN THE FOLLOWING CREDENTIALS; THEY WILL NOT BE SHOWN AGAIN:")
|
||||
if resp.Secrets != nil && resp.Secrets.NodeSecret != "" && resp.DB.Password != "" {
|
||||
fmt.Printf("\n%s\n", report.Credentials("Node Secret", resp.Secrets.NodeSecret, "DB Password", resp.DB.Password))
|
||||
} else if resp.Secrets != nil && resp.Secrets.NodeSecret != "" {
|
||||
fmt.Printf("\n%s\n", report.Credentials("Node Secret", resp.Secrets.NodeSecret, "", ""))
|
||||
} else if resp.DB.Password != "" {
|
||||
fmt.Printf("\n%s\n", report.Credentials("DB User", resp.DB.User, "DB Password", resp.DB.Password))
|
||||
}
|
||||
if resp.DB.DSN != "" {
|
||||
fmt.Printf("DSN: %s\n", resp.DB.DSN)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Optional persistence
|
||||
if ctx.Bool("write-config") {
|
||||
if err := persistRegisterResponse(conf, &resp); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// HTTP helpers and backoff
|
||||
|
||||
type httpError struct {
|
||||
Status int
|
||||
Body string
|
||||
}
|
||||
|
||||
func (e *httpError) Error() string { return fmt.Sprintf("http %d: %s", e.Status, e.Body) }
|
||||
|
||||
func postWithBackoff(url, token string, payload []byte, out any) error {
|
||||
// backoff: 500ms -> max ~8s, 6 attempts with jitter
|
||||
delay := 500 * time.Millisecond
|
||||
for attempt := 0; attempt < 6; attempt++ {
|
||||
req, _ := http.NewRequest(http.MethodPost, url, bytes.NewReader(payload))
|
||||
req.Header.Set("Authorization", "Bearer "+token)
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode == http.StatusTooManyRequests {
|
||||
// backoff and retry
|
||||
time.Sleep(jitter(delay, 0.25))
|
||||
if delay < 8*time.Second {
|
||||
delay *= 2
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
||||
b, _ := io.ReadAll(resp.Body)
|
||||
return &httpError{Status: resp.StatusCode, Body: string(b)}
|
||||
}
|
||||
dec := json.NewDecoder(resp.Body)
|
||||
return dec.Decode(out)
|
||||
}
|
||||
return &httpError{Status: http.StatusTooManyRequests, Body: "rate limited"}
|
||||
}
|
||||
|
||||
func jitter(d time.Duration, frac float64) time.Duration {
|
||||
// simple +/- jitter
|
||||
n := time.Duration(float64(d) * (1 + (randFloat()*2-1)*frac))
|
||||
if n <= 0 {
|
||||
return d
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
// tiny rand without pulling math/rand global state unpredictably
|
||||
func randFloat() float64 { return float64(time.Now().UnixNano()%1000) / 1000.0 }
|
||||
|
||||
func stringsTrimRightSlash(s string) string {
|
||||
for len(s) > 0 && s[len(s)-1] == '/' {
|
||||
s = s[:len(s)-1]
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
// Persistence helpers for --write-config
|
||||
func parseLabelSlice(labels []string) map[string]string {
|
||||
if len(labels) == 0 {
|
||||
return nil
|
||||
}
|
||||
m := make(map[string]string)
|
||||
for _, kv := range labels {
|
||||
if i := bytes.IndexByte([]byte(kv), '='); i > 0 && i < len(kv)-1 {
|
||||
k := kv[:i]
|
||||
v := kv[i+1:]
|
||||
m[k] = v
|
||||
}
|
||||
}
|
||||
if len(m) == 0 {
|
||||
return nil
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
||||
// Persistence helpers for --write-config
|
||||
func persistRegisterResponse(conf *config.Config, resp *cluster.RegisterResponse) error {
|
||||
// Node secret file
|
||||
if resp.Secrets != nil && resp.Secrets.NodeSecret != "" {
|
||||
// Prefer PHOTOPRISM_NODE_SECRET_FILE; otherwise config cluster path
|
||||
fileName := os.Getenv(config.FlagFileVar("NODE_SECRET"))
|
||||
if fileName == "" {
|
||||
fileName = filepath.Join(conf.PortalConfigPath(), "node-secret")
|
||||
}
|
||||
if err := fs.MkdirAll(filepath.Dir(fileName)); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := os.WriteFile(fileName, []byte(resp.Secrets.NodeSecret), 0o600); err != nil {
|
||||
return err
|
||||
}
|
||||
log.Infof("wrote node secret to %s", clean.Log(fileName))
|
||||
}
|
||||
|
||||
// DB settings (MySQL/MariaDB only)
|
||||
if resp.DB.Name != "" && resp.DB.User != "" {
|
||||
if err := mergeOptionsYaml(conf, map[string]any{
|
||||
"DatabaseDriver": config.MySQL,
|
||||
"DatabaseName": resp.DB.Name,
|
||||
"DatabaseServer": fmt.Sprintf("%s:%d", resp.DB.Host, resp.DB.Port),
|
||||
"DatabaseUser": resp.DB.User,
|
||||
"DatabasePassword": resp.DB.Password,
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
log.Infof("updated options.yml with database settings for node %s", clean.LogQuote(resp.Node.Name))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func mergeOptionsYaml(conf *config.Config, kv map[string]any) error {
|
||||
fileName := conf.OptionsYaml()
|
||||
if err := fs.MkdirAll(filepath.Dir(fileName)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var m map[string]any
|
||||
if fs.FileExists(fileName) {
|
||||
if b, err := os.ReadFile(fileName); err == nil && len(b) > 0 {
|
||||
_ = yaml.Unmarshal(b, &m)
|
||||
}
|
||||
}
|
||||
if m == nil {
|
||||
m = map[string]any{}
|
||||
}
|
||||
for k, v := range kv {
|
||||
m[k] = v
|
||||
}
|
||||
b, err := yaml.Marshal(m)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return os.WriteFile(fileName, b, 0o644)
|
||||
}
|
Reference in New Issue
Block a user