feat: use regctl copy image on local pc

This commit is contained in:
naison
2025-01-29 06:03:36 +00:00
parent 1d40843e99
commit 98baec8253
215 changed files with 29039 additions and 94 deletions

View File

@@ -41,16 +41,7 @@ func (svr *Server) ConnectFork(req *rpc.ConnectRequest, resp rpc.Daemon_ConnectF
Lock: &svr.Lock,
ImagePullSecretName: req.ImagePullSecretName,
}
var sshConf = ssh.ParseSshFromRPC(req.SshJump)
var transferImage = req.TransferImage
defaultlog.Default().SetOutput(io.Discard)
if transferImage {
err = ssh.TransferImage(ctx, sshConf, config.OriginImage, req.Image, out)
if err != nil {
return err
}
}
file, err := util.ConvertToTempKubeconfigFile([]byte(req.KubeconfigBytes))
if err != nil {
return err
@@ -74,7 +65,7 @@ func (svr *Server) ConnectFork(req *rpc.ConnectRequest, resp rpc.Daemon_ConnectF
}()
var path string
path, err = ssh.SshJump(sshCtx, sshConf, flags, false)
path, err = ssh.SshJump(sshCtx, ssh.ParseSshFromRPC(req.SshJump), flags, false)
if err != nil {
return err
}

View File

@@ -61,16 +61,7 @@ func (svr *Server) Connect(req *rpc.ConnectRequest, resp rpc.Daemon_ConnectServe
Lock: &svr.Lock,
ImagePullSecretName: req.ImagePullSecretName,
}
var sshConf = ssh.ParseSshFromRPC(req.SshJump)
var transferImage = req.TransferImage
golog.Default().SetOutput(io.Discard)
if transferImage {
err := ssh.TransferImage(ctx, sshConf, config.OriginImage, req.Image, out)
if err != nil {
return err
}
}
file, err := util.ConvertToTempKubeconfigFile([]byte(req.KubeconfigBytes))
if err != nil {
return err
@@ -92,7 +83,7 @@ func (svr *Server) Connect(req *rpc.ConnectRequest, resp rpc.Daemon_ConnectServe
}
}()
var path string
path, err = ssh.SshJump(sshCtx, sshConf, flags, false)
path, err = ssh.SshJump(sshCtx, ssh.ParseSshFromRPC(req.SshJump), flags, false)
if err != nil {
return err
}

View File

@@ -185,7 +185,7 @@ func PullImage(ctx context.Context, platform *v1.Platform, cli *client.Client, d
Platform: plat,
})
if err != nil {
log.Errorf("Failed to pull image %s, err: %s, please make sure image is exist and can be pulled from local", img, err)
log.Errorf("Failed to pull image %s: %v", img, err)
return err
}
defer readCloser.Close()

View File

@@ -0,0 +1,75 @@
package ascii
import (
"bytes"
"fmt"
"io"
"golang.org/x/term"
)
type Lines struct {
atStart bool
buf []byte
lines int
out io.Writer
width int
}
func NewLines(w io.Writer) *Lines {
width := 0
if wFd, ok := w.(interface{ Fd() uintptr }); ok && term.IsTerminal(int(wFd.Fd())) {
w, _, err := term.GetSize(int(wFd.Fd()))
if err == nil {
width = w
}
}
return &Lines{
buf: []byte{},
out: w,
width: width,
}
}
func (b *Lines) Add(add []byte) {
b.buf = append(b.buf, add...)
}
func (b *Lines) Del() {
b.buf = b.buf[:0]
}
func (b *Lines) Flush() {
b.Clear()
_, err := b.out.Write(b.buf)
if err != nil {
return
}
b.lines = bytes.Count(b.buf, []byte("\n"))
if b.width > 0 {
for _, line := range bytes.Split(b.buf, []byte("\n")) {
if len(line) > b.width {
b.lines += (len(line) - 1) / b.width
}
}
}
b.buf = b.buf[:0]
b.atStart = false
}
func (b *Lines) Clear() {
if !b.atStart {
b.Return()
}
fmt.Fprintf(b.out, "\033[0J")
b.atStart = true
b.lines = 0
}
func (b *Lines) Return() {
if !b.atStart && b.lines > 0 {
fmt.Fprintf(b.out, "\033[%dF", b.lines)
}
b.atStart = true
}

View File

@@ -0,0 +1,63 @@
package ascii
import (
"fmt"
"io"
"golang.org/x/term"
)
type ProgressBar struct {
Width, Min, Max int
Start, Done, Active, Pending, End byte
Out io.Writer
}
func NewProgressBar(w io.Writer) *ProgressBar {
width := 0
if wFd, ok := w.(interface{ Fd() uintptr }); ok && term.IsTerminal(int(wFd.Fd())) {
w, _, err := term.GetSize(int(wFd.Fd()))
if err == nil {
width = w
}
}
return &ProgressBar{
Width: width,
Min: 10,
Max: 40,
Out: w,
Start: '[',
Done: '=',
Active: '>',
Pending: ' ',
End: ']',
}
}
func (p *ProgressBar) Generate(pct float64, pre, post string) []byte {
if pct < 0 {
pct = 0
} else if pct > 1 {
pct = 1
}
curWidth := p.Width - (len(pre) + len(post) + 2)
if curWidth < p.Min {
curWidth = p.Min
} else if curWidth > p.Max {
curWidth = p.Max
}
buf := make([]byte, curWidth)
doneLen := int(float64(curWidth) * pct)
for i := 0; i < doneLen; i++ {
buf[i] = p.Done
}
if doneLen < curWidth {
buf[doneLen] = p.Active
}
for i := doneLen + 1; i < curWidth; i++ {
buf[i] = p.Pending
}
return []byte(fmt.Sprintf("%s%c%s%c%s\n", pre, p.Start, buf, p.End, post))
}

144
pkg/util/regctl/image.go Normal file
View File

@@ -0,0 +1,144 @@
package regctl
import (
"fmt"
"sort"
"sync"
"time"
"github.com/docker/go-units"
"github.com/regclient/regclient/types"
"github.com/wencaiwulue/kubevpn/v2/pkg/util/regctl/ascii"
)
type ImageProgress struct {
mu sync.Mutex
Start time.Time
Entries map[string]*ImageProgressEntry
AsciiOut *ascii.Lines
Bar *ascii.ProgressBar
changed bool
}
type ImageProgressEntry struct {
kind types.CallbackKind
instance string
state types.CallbackState
start, last time.Time
cur, total int64
bps []float64
}
func (ip *ImageProgress) Callback(kind types.CallbackKind, instance string, state types.CallbackState, cur, total int64) {
// track kind/instance
ip.mu.Lock()
defer ip.mu.Unlock()
ip.changed = true
now := time.Now()
if e, ok := ip.Entries[kind.String()+":"+instance]; ok {
e.state = state
diff := now.Sub(e.last)
bps := float64(cur-e.cur) / diff.Seconds()
e.state = state
e.last = now
e.cur = cur
e.total = total
if len(e.bps) >= 10 {
e.bps = append(e.bps[1:], bps)
} else {
e.bps = append(e.bps, bps)
}
} else {
ip.Entries[kind.String()+":"+instance] = &ImageProgressEntry{
kind: kind,
instance: instance,
state: state,
start: now,
last: now,
cur: cur,
total: total,
bps: []float64{},
}
}
}
func (ip *ImageProgress) Display(final bool) {
ip.mu.Lock()
defer ip.mu.Unlock()
if !ip.changed && !final {
return // skip since no changes since last display and not the final display
}
var manifestTotal, manifestFinished, sum, skipped, queued int64
// sort entry keys by start time
keys := make([]string, 0, len(ip.Entries))
for k := range ip.Entries {
keys = append(keys, k)
}
sort.Slice(keys, func(a, b int) bool {
if ip.Entries[keys[a]].state != ip.Entries[keys[b]].state {
return ip.Entries[keys[a]].state > ip.Entries[keys[b]].state
} else if ip.Entries[keys[a]].state != types.CallbackActive {
return ip.Entries[keys[a]].last.Before(ip.Entries[keys[b]].last)
} else {
return ip.Entries[keys[a]].cur > ip.Entries[keys[b]].cur
}
})
startCount, startLimit := 0, 2
finishedCount, finishedLimit := 0, 2
// hide old finished entries
for i := len(keys) - 1; i >= 0; i-- {
e := ip.Entries[keys[i]]
if e.kind != types.CallbackManifest && e.state == types.CallbackFinished {
finishedCount++
if finishedCount > finishedLimit {
e.state = types.CallbackArchived
}
}
}
for _, k := range keys {
e := ip.Entries[k]
switch e.kind {
case types.CallbackManifest:
manifestTotal++
if e.state == types.CallbackFinished || e.state == types.CallbackSkipped {
manifestFinished++
}
default:
// show progress bars
if !final && (e.state == types.CallbackActive || (e.state == types.CallbackStarted && startCount < startLimit) || e.state == types.CallbackFinished) {
if e.state == types.CallbackStarted {
startCount++
}
pre := e.instance + " "
if len(pre) > 15 {
pre = pre[:14] + " "
}
pct := float64(e.cur) / float64(e.total)
post := fmt.Sprintf(" %4.2f%% %s/%s", pct*100, units.HumanSize(float64(e.cur)), units.HumanSize(float64(e.total)))
ip.AsciiOut.Add(ip.Bar.Generate(pct, pre, post))
}
// track stats
if e.state == types.CallbackSkipped {
skipped += e.total
} else if e.total > 0 {
sum += e.cur
queued += e.total - e.cur
}
}
}
// show stats summary
ip.AsciiOut.Add([]byte(fmt.Sprintf("Manifests: %d/%d | Blobs: %s copied, %s skipped",
manifestFinished, manifestTotal,
units.HumanSize(float64(sum)),
units.HumanSize(float64(skipped)))))
if queued > 0 {
ip.AsciiOut.Add([]byte(fmt.Sprintf(", %s queued",
units.HumanSize(float64(queued)))))
}
ip.AsciiOut.Add([]byte(fmt.Sprintf(" | Elapsed: %ds\n", int64(time.Since(ip.Start).Seconds()))))
ip.AsciiOut.Flush()
if !final {
ip.AsciiOut.Return()
}
}

67
pkg/util/regctl/regctl.go Normal file
View File

@@ -0,0 +1,67 @@
package regctl
import (
"context"
"fmt"
"log/slog"
"os"
"time"
"github.com/regclient/regclient"
"github.com/regclient/regclient/types/ref"
"github.com/wencaiwulue/kubevpn/v2/pkg/util/regctl/ascii"
)
func TransferImageWithRegctl(ctx context.Context, imageSource, imageTarget string) error {
rc := regclient.New(
regclient.WithDockerCerts(),
regclient.WithDockerCreds(),
regclient.WithSlog(slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelWarn}))),
)
// create a reference for an image
src, err := ref.New(imageSource)
if err != nil {
_, _ = os.Stdout.Write([]byte(fmt.Sprintf("failed to create ref: %v\n", err)))
return err
}
defer rc.Close(ctx, src)
dst, err := ref.New(imageTarget)
if err != nil {
_, _ = os.Stdout.Write([]byte(fmt.Sprintf("failed to create ref: %v\n", err)))
return err
}
defer rc.Close(ctx, dst)
// check for a tty and attach progress reporter
done := make(chan bool)
var progress = &ImageProgress{
Start: time.Now(),
Entries: map[string]*ImageProgressEntry{},
AsciiOut: ascii.NewLines(os.Stdout),
Bar: ascii.NewProgressBar(os.Stdout),
}
progressFreq := time.Millisecond * 250
ticker := time.NewTicker(progressFreq)
defer ticker.Stop()
go func() {
for {
select {
case <-done:
ticker.Stop()
return
case <-ticker.C:
progress.Display(false)
}
}
}()
var opts []regclient.ImageOpts
opts = append(opts, regclient.ImageWithCallback(progress.Callback))
err = rc.ImageCopy(ctx, src, dst, opts...)
close(done)
progress.Display(true)
return err
}