feat: add syncthing re-connect (#315)

This commit is contained in:
naison
2024-08-03 15:02:48 +08:00
committed by GitHub
parent 0826f2e20c
commit 947d50af85
4 changed files with 149 additions and 22 deletions

View File

@@ -15,6 +15,8 @@ const (
// EnvDisableSyncthingLog disable syncthing log, because it can not set output writer, only write os.Stdout or io.Discard
EnvDisableSyncthingLog = "LOGGER_DISCARD"
SyncthingAPIKey = "kubevpn"
)
var LocalCert tls.Certificate

View File

@@ -7,6 +7,7 @@ import (
"net"
"net/url"
"path/filepath"
"sort"
"strconv"
"strings"
"time"
@@ -15,6 +16,7 @@ import (
"github.com/google/uuid"
log "github.com/sirupsen/logrus"
libconfig "github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/netutil"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
@@ -30,6 +32,8 @@ import (
"k8s.io/client-go/util/retry"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
"k8s.io/kubectl/pkg/cmd/util/podcmd"
"k8s.io/kubectl/pkg/polymorphichelpers"
"k8s.io/kubectl/pkg/util/podutils"
"k8s.io/utils/pointer"
"k8s.io/utils/ptr"
@@ -401,33 +405,81 @@ func (d *CloneOptions) DoClone(ctx context.Context, kubeconfigJsonBytes []byte)
}
log.Infof("Create clone resource %s/%s in target cluster", u.GetObjectKind().GroupVersionKind().GroupKind().String(), u.GetName())
log.Infof("Wait for clone resource %s/%s to be ready", u.GetObjectKind().GroupVersionKind().GroupKind().String(), u.GetName())
log.Infoln()
err = util.WaitPodToBeReady(ctx, d.targetClientset.CoreV1().Pods(d.TargetNamespace), metav1.LabelSelector{MatchLabels: labelsMap})
if err != nil {
return err
}
_ = util.RolloutStatus(ctx, d.factory, d.Namespace, workload, time.Minute*60)
if d.LocalDir == "" {
continue
if d.LocalDir != "" {
err = d.SyncDir(ctx, fields.SelectorFromSet(labelsMap).String())
if err != nil {
return err
}
}
list, err := util.GetRunningPodList(ctx, d.targetClientset, d.TargetNamespace, fields.SelectorFromSet(labelsMap).String())
if err != nil {
return err
}
remoteAddr := net.JoinHostPort(list[0].Status.PodIP, strconv.Itoa(libconfig.DefaultTCPPort))
localPort, _ := util.GetAvailableTCPPortOrDie()
localAddr := net.JoinHostPort("127.0.0.1", strconv.Itoa(localPort))
err = syncthing.StartClient(d.ctx, d.LocalDir, localAddr, remoteAddr)
if err != nil {
return err
}
d.syncthingGUIAddr = (&url.URL{Scheme: "http", Host: localAddr}).String()
log.Infof("Access the syncthing GUI via the following URL: %s", d.syncthingGUIAddr)
}
return nil
}
func (d *CloneOptions) SyncDir(ctx context.Context, labels string) error {
list, err := util.GetRunningPodList(ctx, d.targetClientset, d.TargetNamespace, labels)
if err != nil {
return err
}
remoteAddr := net.JoinHostPort(list[0].Status.PodIP, strconv.Itoa(libconfig.DefaultTCPPort))
localPort, _ := util.GetAvailableTCPPortOrDie()
localAddr := net.JoinHostPort("127.0.0.1", strconv.Itoa(localPort))
err = syncthing.StartClient(d.ctx, d.LocalDir, localAddr, remoteAddr)
if err != nil {
return err
}
d.syncthingGUIAddr = (&url.URL{Scheme: "http", Host: localAddr}).String()
log.Infof("Access the syncthing GUI via the following URL: %s", d.syncthingGUIAddr)
go func() {
client := syncthing.NewClient(localAddr)
podName := list[0].Name
for d.ctx.Err() == nil {
func() {
defer time.Sleep(time.Second * 2)
util.CheckPodStatus(d.ctx, func() {}, podName, d.targetClientset.CoreV1().Pods(d.TargetNamespace))
sortBy := func(pods []*v1.Pod) sort.Interface { return sort.Reverse(podutils.ActivePods(pods)) }
_, _, _ = polymorphichelpers.GetFirstPod(d.targetClientset.CoreV1(), d.TargetNamespace, labels, time.Second*30, sortBy)
list, err := util.GetRunningPodList(d.ctx, d.targetClientset, d.TargetNamespace, labels)
if err != nil {
log.Error(err)
return
}
if podName == list[0].Name {
return
}
podName = list[0].Name
log.Debugf("Detect newer pod %s", podName)
var conf *libconfig.Configuration
conf, err = client.GetConfig(d.ctx)
if err != nil {
log.Errorf("Failed to get config from syncthing: %v", err)
return
}
for i := range conf.Devices {
if config.RemoteDeviceID.Equals(conf.Devices[i].DeviceID) {
addr := netutil.AddressURL("tcp", net.JoinHostPort(list[0].Status.PodIP, strconv.Itoa(libconfig.DefaultTCPPort)))
conf.Devices[i].Addresses = []string{addr}
log.Debugf("Use newer remote syncthing endpoint: %s", addr)
}
}
err = client.PutConfig(d.ctx, conf)
if err != nil {
log.Errorf("Failed to set config to syncthing: %v", err)
}
}()
}
}()
return nil
}
func RemoveUselessInfo(u *unstructured.Unstructured) {
if u == nil {
return

71
pkg/syncthing/api.go Normal file
View File

@@ -0,0 +1,71 @@
package syncthing
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"path"
"time"
"github.com/syncthing/syncthing/lib/config"
config2 "github.com/wencaiwulue/kubevpn/v2/pkg/config"
)
type Client struct {
GUIAddress string
client *http.Client
}
func NewClient(addr string) *Client {
c := &Client{
GUIAddress: addr,
client: &http.Client{Timeout: 5 * time.Second},
}
return c
}
func (c *Client) GetConfig(ctx context.Context) (*config.Configuration, error) {
buf, err := c.Call(ctx, "rest/config", "GET", nil)
var configuration config.Configuration
err = json.Unmarshal(buf, &configuration)
if err != nil {
return nil, err
}
return &configuration, nil
}
func (c *Client) PutConfig(ctx context.Context, conf *config.Configuration) error {
marshal, err := json.Marshal(conf)
if err != nil {
return err
}
_, err = c.Call(ctx, "rest/config", "PUT", marshal)
return err
}
func (c *Client) Call(ctx context.Context, uri, method string, body []byte) ([]byte, error) {
var url = path.Join(c.GUIAddress, uri)
req, err := http.NewRequest(method, fmt.Sprintf("http://%s", url), bytes.NewBuffer(body))
if err != nil {
return nil, fmt.Errorf("failed to initialize syncthing API request: %w", err)
}
req = req.WithContext(ctx)
req.Header.Set("X-API-Key", config2.SyncthingAPIKey)
resp, err := c.client.Do(req)
if err != nil {
return nil, fmt.Errorf("failed to call syncthing [%s]: %w", uri, err)
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
return nil, fmt.Errorf("unexpected response from syncthing [%s | %d]: %s", req.URL.String(), resp.StatusCode, string(body))
}
body, err = io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("failed to read response from syncthing [%s]: %w", uri, err)
}
return body, nil
}

View File

@@ -23,11 +23,12 @@ import (
)
const (
dir = "dir"
local = "local"
remote = "remote"
label = "kubevpn"
guiPort = 8384
dir = "dir"
local = "local"
remote = "remote"
label = "kubevpn"
guiPort = 8384
rescanInterval = 3
)
var (
@@ -86,7 +87,7 @@ func StartClient(ctx context.Context, localDir string, localAddr, remoteAddr str
Paused: false,
FSWatcherEnabled: true,
FSWatcherDelayS: 0.01,
RescanIntervalS: 2,
RescanIntervalS: rescanInterval,
Devices: []config.FolderDeviceConfiguration{
{DeviceID: localID},
{DeviceID: remoteID},
@@ -98,6 +99,7 @@ func StartClient(ctx context.Context, localDir string, localAddr, remoteAddr str
GUI: config.GUIConfiguration{
Enabled: true,
RawAddress: localAddr,
APIKey: pkgconfig.SyncthingAPIKey,
},
Options: config.OptionsConfiguration{
AutoUpgradeIntervalH: 0,