// Package internal implements the master (control) mode.
package internal
import (
"bufio"
"context"
"crypto/rand"
"crypto/tls"
"encoding/gob"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"net/url"
"os"
"os/exec"
"os/signal"
"path/filepath"
"regexp"
"runtime"
"strconv"
"strings"
"sync"
"syscall"
"time"
"github.com/NodePassProject/logs"
)
// Constants used by the master API.
const (
	openAPIVersion = "v1"                   // OpenAPI version; NewMaster appends it to the API prefix
	stateFilePath  = "gob"                  // directory (under the executable's directory) holding the persisted state
	stateFileName  = "nodepass.gob"         // file name of the persisted instance state
	sseRetryTime   = 3000                   // retry interval announced to SSE clients (milliseconds)
	apiKeyID       = "********"             // reserved instance ID under which the API key is stored
	tcpingSemLimit = 10                     // maximum number of concurrent TCPing probes
	baseDuration   = 100 * time.Millisecond // base pause used between dependent operations
	maxTagsCount   = 50                     // maximum number of tags accepted per request
	maxTagKeyLen   = 100                    // maximum tag key length in bytes (also used to cap alias length)
	maxTagValueLen = 500                    // maximum tag value length in bytes
)
// swaggerUIHTML is the Swagger UI HTML template. handleSwaggerUI passes it to
// fmt.Fprintf as the format string, with the generated OpenAPI spec as the
// only argument.
const swaggerUIHTML = `
NodePass API
`
// Master implements the master (control) mode: an HTTP API that manages
// instances, persists their state and streams events to SSE subscribers.
type Master struct {
	Common                            // embedded shared functionality
	alias         string              // master alias
	prefix        string              // API prefix (including the OpenAPI version)
	version       string              // NodePass version
	hostname      string              // tunnel name
	logLevel      string              // log level
	crtPath       string              // certificate path
	keyPath       string              // private key path
	instances     sync.Map            // instance table: ID (string) -> *Instance
	server        *http.Server        // HTTP server
	tlsConfig     *tls.Config         // TLS configuration
	masterURL     *url.URL            // master URL
	statePath     string              // path of the persisted instance state file
	stateMu       sync.Mutex          // serializes writes of the state file
	subscribers   sync.Map            // SSE subscribers: subscriber ID -> chan *InstanceEvent
	notifyChannel chan *InstanceEvent // event notification channel consumed by the dispatcher
	tcpingSem     chan struct{}       // semaphore bounding concurrent TCPing probes
	startTime     time.Time           // time the master was created
	periodicDone  chan struct{}       // closed to stop the periodic tasks
}
// Instance describes one managed instance together with its latest
// statistics and runtime handles.
//
// NOTE(review): encoding/gob does not interpret struct tags, so the `gob:"-"`
// tags below presumably have no effect: unexported fields are skipped by gob
// anyway, while the exported *Base fields are likely still encoded — confirm.
type Instance struct {
	ID             string             `json:"id"`        // instance ID
	Alias          string             `json:"alias"`     // instance alias
	Type           string             `json:"type"`      // instance type ("client" or "server")
	Status         string             `json:"status"`    // instance status
	URL            string             `json:"url"`       // instance URL (for the API key entry: the key itself)
	Config         string             `json:"config"`    // instance configuration URL
	Restart        bool               `json:"restart"`   // whether the instance is auto-started
	Tags           []Tag              `json:"tags"`      // tag list
	Mode           int32              `json:"mode"`      // instance mode
	Ping           int32              `json:"ping"`      // latency reported by the instance (ms)
	Pool           int32              `json:"pool"`      // pooled connection count
	TCPS           int32              `json:"tcps"`      // TCP connection count
	UDPS           int32              `json:"udps"`      // UDP connection count
	TCPRX          uint64             `json:"tcprx"`     // TCP bytes received
	TCPTX          uint64             `json:"tcptx"`     // TCP bytes sent
	UDPRX          uint64             `json:"udprx"`     // UDP bytes received
	UDPTX          uint64             `json:"udptx"`     // UDP bytes sent
	TCPRXBase      uint64             `json:"-" gob:"-"` // TCP received baseline added to reported counters
	TCPTXBase      uint64             `json:"-" gob:"-"` // TCP sent baseline added to reported counters
	UDPRXBase      uint64             `json:"-" gob:"-"` // UDP received baseline added to reported counters
	UDPTXBase      uint64             `json:"-" gob:"-"` // UDP sent baseline added to reported counters
	cmd            *exec.Cmd          `json:"-" gob:"-"` // command handle (not serialized)
	stopped        chan struct{}      `json:"-" gob:"-"` // stop signal channel (not serialized)
	deleted        bool               `json:"-" gob:"-"` // deletion flag (not serialized)
	cancelFunc     context.CancelFunc `json:"-" gob:"-"` // cancel function (not serialized)
	lastCheckPoint time.Time          `json:"-" gob:"-"` // time of the last checkpoint (not serialized)
}
// Tag is a key/value label attached to an instance.
type Tag struct {
	Key   string `json:"key"`   // tag key
	Value string `json:"value"` // tag value
}
// InstanceEvent is the payload streamed to SSE subscribers.
type InstanceEvent struct {
	Type     string    `json:"type"`     // event type: initial, create, update, delete, shutdown, log
	Time     time.Time `json:"time"`     // event time
	Instance *Instance `json:"instance"` // instance the event refers to
	Logs     string    `json:"logs"`     // log content (set for log events)
}
// SystemInfo holds host metrics gathered by getLinuxSysInfo.
type SystemInfo struct {
	CPU       int    `json:"cpu"`        // CPU usage (%); -1 when unavailable
	MemTotal  uint64 `json:"mem_total"`  // total memory in bytes
	MemUsed   uint64 `json:"mem_used"`   // used memory in bytes
	SwapTotal uint64 `json:"swap_total"` // total swap in bytes
	SwapUsed  uint64 `json:"swap_used"`  // used swap in bytes
	NetRX     uint64 `json:"netrx"`      // network bytes received
	NetTX     uint64 `json:"nettx"`      // network bytes sent
	DiskR     uint64 `json:"diskr"`      // disk bytes read
	DiskW     uint64 `json:"diskw"`      // disk bytes written
	SysUp     uint64 `json:"sysup"`      // system uptime (seconds)
}
// TCPingResult is the outcome of a single TCPing probe.
type TCPingResult struct {
	Target    string  `json:"target"`    // address that was probed
	Connected bool    `json:"connected"` // true when the TCP connection was established
	Latency   int64   `json:"latency"`   // connect time in milliseconds (0 on failure)
	Error     *string `json:"error"`     // failure description; nil on success
}
// handleTCPing serves the TCPing endpoint. Only GET is accepted and the
// "target" query parameter is mandatory; the probe result is returned as JSON.
func (m *Master) handleTCPing(w http.ResponseWriter, r *http.Request) {
	switch addr := r.URL.Query().Get("target"); {
	case r.Method != http.MethodGet:
		httpError(w, "Method not allowed", http.StatusMethodNotAllowed)
	case addr == "":
		httpError(w, "Target address required", http.StatusBadRequest)
	default:
		// Run the probe and report its outcome.
		writeJSON(w, http.StatusOK, m.performTCPing(addr))
	}
}
// performTCPing runs one TCP connect probe against target and returns its
// result. At most cap(m.tcpingSem) probes run at once; a probe that cannot
// obtain a slot within one second fails with "too many requests".
func (m *Master) performTCPing(target string) *TCPingResult {
	res := &TCPingResult{Target: target}
	// fail records the error text on the result and hands the result back.
	fail := func(msg string) *TCPingResult {
		res.Error = &msg
		return res
	}

	// Concurrency control: take a semaphore slot or give up after a second.
	select {
	case <-time.After(time.Second):
		return fail("too many requests")
	case m.tcpingSem <- struct{}{}:
		defer func() { <-m.tcpingSem }()
	}

	began := time.Now()
	conn, dialErr := net.DialTimeout("tcp", target, reportInterval)
	if dialErr != nil {
		return fail(dialErr.Error())
	}
	res.Latency = time.Since(began).Milliseconds()
	res.Connected = true
	conn.Close()
	return res
}
// validateTags checks a tag list: it must hold at most maxTagsCount entries,
// every key must be non-empty and at most maxTagKeyLen bytes, every value at
// most maxTagValueLen bytes, and keys must be unique. The first violation
// found is returned as an error; nil means the list is valid.
func validateTags(tags []Tag) error {
	if count := len(tags); count > maxTagsCount {
		return fmt.Errorf("too many tags: maximum %d allowed", maxTagsCount)
	}

	seen := make(map[string]struct{}, len(tags))
	for i := range tags {
		key, value := tags[i].Key, tags[i].Value
		switch {
		case key == "":
			return fmt.Errorf("tag key cannot be empty")
		case len(key) > maxTagKeyLen:
			return fmt.Errorf("tag key exceeds maximum length %d", maxTagKeyLen)
		case len(value) > maxTagValueLen:
			return fmt.Errorf("tag value for key exceeds maximum length %d", maxTagValueLen)
		}
		// Reject keys that already appeared earlier in the list.
		if _, dup := seen[key]; dup {
			return fmt.Errorf("duplicate tag key: '%s'", key)
		}
		seen[key] = struct{}{}
	}
	return nil
}
// InstanceLogWriter is an io.Writer that receives an instance's log output,
// extracts checkpoint statistics from it and forwards the remaining lines.
type InstanceLogWriter struct {
	instanceID string         // instance ID appended to forwarded log lines
	instance   *Instance      // instance whose statistics are updated
	target     io.Writer      // destination for forwarded log lines
	master     *Master        // owning master (instance table and SSE events)
	checkPoint *regexp.Regexp // pattern matching checkpoint log lines
}
// instanceCheckPointPattern matches the checkpoint lines emitted by instances.
// Its nine capture groups are, in order: MODE, PING, POOL, TCPS, UDPS, TCPRX,
// TCPTX, UDPRX and UDPTX. It is compiled once at package scope instead of once
// per writer; a *regexp.Regexp is safe for concurrent use.
var instanceCheckPointPattern = regexp.MustCompile(`CHECK_POINT\|MODE=(\d+)\|PING=(\d+)ms\|POOL=(\d+)\|TCPS=(\d+)\|UDPS=(\d+)\|TCPRX=(\d+)\|TCPTX=(\d+)\|UDPRX=(\d+)\|UDPTX=(\d+)`)

// NewInstanceLogWriter returns a log writer for the given instance.
//
// instanceID is appended to every forwarded line, instance receives the parsed
// checkpoint statistics, target receives the forwarded log lines and master is
// used to store the instance and to emit SSE events.
func NewInstanceLogWriter(instanceID string, instance *Instance, target io.Writer, master *Master) *InstanceLogWriter {
	return &InstanceLogWriter{
		instanceID: instanceID,
		instance:   instance,
		target:     target,
		master:     master,
		checkPoint: instanceCheckPointPattern,
	}
}
// Write implements io.Writer. It splits p into lines and handles each one:
//
//   - A checkpoint line (one matching w.checkPoint with all nine capture
//     groups) updates the instance statistics and is not forwarded. Unless the
//     instance is flagged deleted, the instance is stored back into the master
//     table and an "update" SSE event is sent.
//   - Any other line is written to w.target with the instance ID appended and,
//     unless the instance is flagged deleted, sent as a "log" SSE event.
//
// Write always reports len(p) bytes consumed and a nil error.
//
// NOTE(review): the instance fields are written here without any visible
// synchronization — confirm that concurrent readers are acceptable.
func (w *InstanceLogWriter) Write(p []byte) (n int, err error) {
	s := string(p)
	scanner := bufio.NewScanner(strings.NewReader(s))
	// The default Scanner stops with ErrTooLong on a line longer than
	// bufio.MaxScanTokenSize (64 KiB); the fallback below would then re-emit
	// the whole chunk, duplicating lines that were already forwarded. Allowing
	// a token as large as the chunk itself rules that out.
	scanner.Buffer(nil, len(p)+1)

	for scanner.Scan() {
		line := scanner.Text()

		// Parse and apply checkpoint information.
		if matches := w.checkPoint.FindStringSubmatch(line); len(matches) == 10 {
			// Groups 1-5: MODE, PING, POOL, TCPS, UDPS.
			gauges := []*int32{&w.instance.Mode, &w.instance.Ping, &w.instance.Pool, &w.instance.TCPS, &w.instance.UDPS}
			for i, gauge := range gauges {
				if v, err := strconv.ParseInt(matches[i+1], 10, 32); err == nil {
					*gauge = int32(v)
				}
			}

			// Groups 6-9: TCPRX, TCPTX, UDPRX, UDPTX. The reported counter is
			// added to the baseline held on the instance.
			stats := []*uint64{&w.instance.TCPRX, &w.instance.TCPTX, &w.instance.UDPRX, &w.instance.UDPTX}
			bases := []uint64{w.instance.TCPRXBase, w.instance.TCPTXBase, w.instance.UDPRXBase, w.instance.UDPTXBase}
			for i, stat := range stats {
				if v, err := strconv.ParseUint(matches[i+6], 10, 64); err == nil {
					*stat = bases[i] + v
				}
			}

			w.instance.lastCheckPoint = time.Now()

			// Store and publish the update only while the instance still exists.
			if !w.instance.deleted {
				w.master.instances.Store(w.instanceID, w.instance)
				w.master.sendSSEEvent("update", w.instance)
			}

			// Checkpoint lines are filtered out of the forwarded log.
			continue
		}

		// Forward the line with the instance ID appended.
		fmt.Fprintf(w.target, "%s [%s]\n", line, w.instanceID)

		// Publish the log line only while the instance still exists.
		if !w.instance.deleted {
			w.master.sendSSEEvent("log", w.instance, line)
		}
	}

	// Defensive fallback: should scanning still fail, emit the raw chunk.
	if err := scanner.Err(); err != nil {
		fmt.Fprintf(w.target, "%s [%s]", s, w.instanceID)
	}
	return len(p), nil
}
// setCorsHeaders sets the CORS response headers: any origin is allowed,
// together with the HTTP methods and request headers used by the API.
func setCorsHeaders(w http.ResponseWriter) {
	cors := [...][2]string{
		{"Access-Control-Allow-Origin", "*"},
		{"Access-Control-Allow-Methods", "GET, PATCH, POST, PUT, DELETE, OPTIONS"},
		{"Access-Control-Allow-Headers", "Content-Type, Authorization, X-API-Key, Cache-Control"},
	}
	header := w.Header()
	for _, pair := range cors {
		header.Set(pair[0], pair[1])
	}
}
// NewMaster builds a Master from the parsed master URL.
//
// The URL host is resolved as the TCP listen address; the tunnel name is the
// TLS ServerName when one is configured, otherwise the URL hostname; the API
// prefix is the URL path (default "/api") followed by the OpenAPI version.
// The state file lives in the "gob" directory next to the executable. Before
// returning, persisted state is loaded and the event dispatcher goroutine is
// started. An error is returned only when the host cannot be resolved.
func NewMaster(parsedURL *url.URL, tlsCode string, tlsConfig *tls.Config, logger *logs.Logger, version string) (*Master, error) {
	// Resolve the listen address.
	tcpAddr, err := net.ResolveTCPAddr("tcp", parsedURL.Host)
	if err != nil {
		return nil, fmt.Errorf("newMaster: resolve host failed: %w", err)
	}

	// Tunnel name: the TLS server name wins over the URL hostname.
	name := parsedURL.Hostname()
	if tlsConfig != nil && tlsConfig.ServerName != "" {
		name = tlsConfig.ServerName
	}

	// API prefix: URL path without trailing slashes, "/api" when absent.
	apiPrefix := "/api"
	if p := parsedURL.Path; p != "" && p != "/" {
		apiPrefix = strings.TrimRight(p, "/")
	}

	// The state file is kept relative to the executable's directory.
	exe, _ := os.Executable()
	stateFile := filepath.Join(filepath.Dir(exe), stateFilePath, stateFileName)

	query := parsedURL.Query()
	m := &Master{
		Common: Common{
			tlsCode: tlsCode,
			logger:  logger,
		},
		prefix:        apiPrefix + "/" + openAPIVersion,
		version:       version,
		logLevel:      query.Get("log"),
		crtPath:       query.Get("crt"),
		keyPath:       query.Get("key"),
		hostname:      name,
		tlsConfig:     tlsConfig,
		masterURL:     parsedURL,
		statePath:     stateFile,
		notifyChannel: make(chan *InstanceEvent, semaphoreLimit),
		tcpingSem:     make(chan struct{}, tcpingSemLimit),
		startTime:     time.Now(),
		periodicDone:  make(chan struct{}),
	}
	m.tunnelTCPAddr = tcpAddr

	// Restore persisted instances, then start dispatching events.
	m.loadState()
	go m.startEventDispatcher()

	return m, nil
}
// Run manages the master's lifecycle. It makes sure an API key exists,
// registers the HTTP routes, starts the HTTP(S) server and the periodic tasks,
// then blocks until SIGINT or SIGTERM arrives and performs a graceful shutdown
// bounded by shutdownTimeout.
func (m *Master) Run() {
	m.logger.Info("Master started: %v%v", m.tunnelTCPAddr, m.prefix)

	// Initialise the API key: reuse the persisted one or create a new one.
	if apiKey, ok := m.findInstance(apiKeyID); ok {
		m.logger.Info("API Key loaded: %v", apiKey.URL)
	} else {
		apiKey = &Instance{
			ID:  apiKeyID,
			URL: generateAPIKey(),
		}
		m.instances.Store(apiKeyID, apiKey)
		// A failed save is reported instead of being dropped silently.
		if err := m.saveState(); err != nil {
			m.logger.Error("run: save state failed: %v", err)
		}
		m.logger.Info("API Key created: %v", apiKey.URL)
	}

	mux := http.NewServeMux()

	// Endpoints that require API key authentication.
	protectedEndpoints := map[string]http.HandlerFunc{
		fmt.Sprintf("%s/instances", m.prefix):  m.handleInstances,
		fmt.Sprintf("%s/instances/", m.prefix): m.handleInstanceDetail,
		fmt.Sprintf("%s/events", m.prefix):     m.handleSSE,
		fmt.Sprintf("%s/info", m.prefix):       m.handleInfo,
		fmt.Sprintf("%s/tcping", m.prefix):     m.handleTCPing,
	}

	// Endpoints that are reachable without an API key.
	publicEndpoints := map[string]http.HandlerFunc{
		fmt.Sprintf("%s/openapi.json", m.prefix): m.handleOpenAPISpec,
		fmt.Sprintf("%s/docs", m.prefix):         m.handleSwaggerUI,
	}

	// CORS middleware: sets the CORS headers and answers preflight requests.
	corsMiddleware := func(next http.HandlerFunc) http.HandlerFunc {
		return func(w http.ResponseWriter, r *http.Request) {
			setCorsHeaders(w)
			if r.Method == "OPTIONS" {
				w.WriteHeader(http.StatusOK)
				return
			}
			next(w, r)
		}
	}

	// API key middleware: CORS handling first (so preflight requests are
	// answered without a key), then the X-API-Key header is verified.
	apiKeyMiddleware := func(next http.HandlerFunc) http.HandlerFunc {
		return corsMiddleware(func(w http.ResponseWriter, r *http.Request) {
			// Authentication is enforced only when a non-empty key is stored.
			if apiKeyInstance, keyExists := m.findInstance(apiKeyID); keyExists && apiKeyInstance.URL != "" {
				reqAPIKey := r.Header.Get("X-API-Key")
				if reqAPIKey == "" {
					httpError(w, "Unauthorized: API key required", http.StatusUnauthorized)
					return
				}
				if reqAPIKey != apiKeyInstance.URL {
					httpError(w, "Unauthorized: Invalid API key", http.StatusUnauthorized)
					return
				}
			}
			next(w, r)
		})
	}

	for path, handler := range protectedEndpoints {
		mux.HandleFunc(path, apiKeyMiddleware(handler))
	}
	for path, handler := range publicEndpoints {
		mux.HandleFunc(path, corsMiddleware(handler))
	}

	// readHeaderTimeout bounds how long a client may take to send its request
	// headers, so slow-header (Slowloris) connections cannot pin the server.
	// It does not limit response streaming, so SSE connections are unaffected.
	const readHeaderTimeout = 10 * time.Second

	m.server = &http.Server{
		Addr:              m.tunnelTCPAddr.String(),
		ErrorLog:          m.logger.StdLogger(),
		Handler:           mux,
		TLSConfig:         m.tlsConfig,
		ReadHeaderTimeout: readHeaderTimeout,
	}

	// Serve in the background; TLS is used when a TLS config is present.
	go func() {
		var err error
		if m.tlsConfig != nil {
			err = m.server.ListenAndServeTLS("", "")
		} else {
			err = m.server.ListenAndServe()
		}
		if err != nil && err != http.ErrServerClosed {
			m.logger.Error("run: listen failed: %v", err)
		}
	}()

	go m.startPeriodicTasks()

	// Block until an interrupt or termination signal arrives.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	<-ctx.Done()
	stop()

	// Graceful shutdown.
	shutdownCtx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
	defer cancel()
	if err := m.Shutdown(shutdownCtx); err != nil {
		m.logger.Error("Master shutdown error: %v", err)
	} else {
		m.logger.Info("Master shutdown complete")
	}
}
// Shutdown stops the master. The cleanup steps run, in order, through the
// embedded shutdown helper: SSE subscribers are notified and disconnected,
// every instance that is not stopped and has a process is stopped in parallel,
// the periodic tasks and the event dispatcher are stopped, the instance state
// is saved and finally the HTTP server is shut down with ctx.
func (m *Master) Shutdown(ctx context.Context) error {
	cleanup := func() {
		// Notify SSE clients and close their connections.
		m.shutdownSSEConnections()

		// Stop all instances that still have a live process.
		var pending sync.WaitGroup
		m.instances.Range(func(_, v any) bool {
			inst := v.(*Instance)
			if inst.Status == "stopped" || inst.cmd == nil || inst.cmd.Process == nil {
				return true
			}
			pending.Add(1)
			go func() {
				defer pending.Done()
				m.stopInstance(inst)
			}()
			return true
		})
		pending.Wait()

		// Stop the periodic tasks, then the event dispatcher.
		close(m.periodicDone)
		close(m.notifyChannel)

		// Persist the instance state.
		if saveErr := m.saveState(); saveErr == nil {
			m.logger.Info("Instances saved: %v", m.statePath)
		} else {
			m.logger.Error("shutdown: save gob failed: %v", saveErr)
		}

		// Shut the HTTP server down.
		if apiErr := m.server.Shutdown(ctx); apiErr != nil {
			m.logger.Error("shutdown: api shutdown error: %v", apiErr)
		}
	}
	return m.shutdown(ctx, cleanup)
}
// startPeriodicTasks launches every periodic task (backup, cleanup and
// restart), each in its own goroutine.
func (m *Master) startPeriodicTasks() {
	tasks := []func(){
		m.startPeriodicBackup,
		m.startPeriodicCleanup,
		m.startPeriodicRestart,
	}
	for _, task := range tasks {
		go task()
	}
}
// startPeriodicBackup writes a backup of the instance state to
// "<statePath>.backup" every ReloadInterval until periodicDone is closed.
func (m *Master) startPeriodicBackup() {
	for {
		select {
		case <-m.periodicDone:
			return
		case <-time.After(ReloadInterval):
		}

		// The backup always goes to the same fixed file name.
		backupPath := m.statePath + ".backup"
		err := m.saveStateToPath(backupPath)
		if err == nil {
			m.logger.Info("State backup saved: %v", backupPath)
			continue
		}
		m.logger.Error("startPeriodicBackup: backup state failed: %v", err)
	}
}
// startPeriodicCleanup removes instances that share an ID, once every
// reportInterval until periodicDone is closed. Within a group of duplicates
// one instance is kept (a running one is preferred); the others are flagged
// deleted, stopped when necessary and removed from the instance table.
//
// NOTE(review): instances are grouped by their sync.Map key, and map keys are
// unique, so a group appears to never hold more than one entry — confirm
// whether grouping by another field was intended.
func (m *Master) startPeriodicCleanup() {
	for {
		select {
		case <-m.periodicDone:
			return
		case <-time.After(reportInterval):
		}

		// Collect the instances grouped by ID, skipping the API key entry.
		groups := make(map[string][]*Instance)
		m.instances.Range(func(k, v any) bool {
			id := k.(string)
			if id != apiKeyID {
				groups[id] = append(groups[id], v.(*Instance))
			}
			return true
		})

		for _, group := range groups {
			if len(group) < 2 {
				continue
			}

			// Pick the survivor: the first running instance, else the first.
			keep := 0
			for i := range group {
				if group[i].Status == "running" && group[keep].Status != "running" {
					keep = i
				}
			}

			// Drop everything else in the group.
			for i, dup := range group {
				if i == keep {
					continue
				}
				dup.deleted = true
				if dup.Status != "stopped" {
					m.stopInstance(dup)
				}
				m.instances.Delete(dup.ID)
			}
		}
	}
}
// startPeriodicRestart restarts instances whose status is "error", once every
// reportInterval until periodicDone is closed. Each such instance is stopped,
// given baseDuration to settle and then started again.
func (m *Master) startPeriodicRestart() {
	for {
		select {
		case <-m.periodicDone:
			return
		case <-time.After(reportInterval):
		}

		// Collect the failed instances first; they are restarted afterwards.
		var failed []*Instance
		m.instances.Range(func(k, v any) bool {
			if k.(string) == apiKeyID {
				return true
			}
			if inst := v.(*Instance); inst.Status == "error" && !inst.deleted {
				failed = append(failed, inst)
			}
			return true
		})

		for _, inst := range failed {
			m.stopInstance(inst)
			time.Sleep(baseDuration)
			m.startInstance(inst)
		}
	}
}
// saveState persists the instance state to the master's state file.
func (m *Master) saveState() error {
	target := m.statePath
	return m.saveStateToPath(target)
}
// saveStateToPath gob-encodes the current instance table into filePath.
//
// The data is first written to a temporary file in the destination directory
// and then renamed over filePath, so readers never observe a partial file.
// When the table is empty, an existing filePath is removed instead.
//
// Saves are serialized by stateMu. A concurrent caller waits for its turn
// rather than being skipped: a skipped save would report success although
// nothing was written, which could lose the newest state or the final save
// at shutdown.
func (m *Master) saveStateToPath(filePath string) error {
	m.stateMu.Lock()
	defer m.stateMu.Unlock()

	// Snapshot the sync.Map into a plain map for encoding.
	persistentData := make(map[string]*Instance)
	m.instances.Range(func(key, value any) bool {
		persistentData[key.(string)] = value.(*Instance)
		return true
	})

	// Nothing to persist: remove a stale state file if there is one.
	if len(persistentData) == 0 {
		if err := os.Remove(filePath); err != nil && !os.IsNotExist(err) {
			return fmt.Errorf("saveStateToPath: remove failed: %w", err)
		}
		return nil
	}

	// Make sure the destination directory exists.
	dir := filepath.Dir(filePath)
	if err := os.MkdirAll(dir, 0755); err != nil {
		return fmt.Errorf("saveStateToPath: mkdirAll failed: %w", err)
	}

	// Write to a temporary file in the same directory so the final rename
	// stays on one filesystem.
	tempFile, err := os.CreateTemp(dir, "np-*.tmp")
	if err != nil {
		return fmt.Errorf("saveStateToPath: createTemp failed: %w", err)
	}
	tempPath := tempFile.Name()

	// removeTemp discards the temporary file on the error paths; this is best
	// effort, and loadState sweeps leftovers on the next start.
	removeTemp := func() { _ = os.Remove(tempPath) }

	if err := gob.NewEncoder(tempFile).Encode(persistentData); err != nil {
		tempFile.Close()
		removeTemp()
		return fmt.Errorf("saveStateToPath: encode failed: %w", err)
	}

	if err := tempFile.Close(); err != nil {
		removeTemp()
		return fmt.Errorf("saveStateToPath: close temp file failed: %w", err)
	}

	// Atomically replace the destination.
	if err := os.Rename(tempPath, filePath); err != nil {
		removeTemp()
		return fmt.Errorf("saveStateToPath: rename temp file failed: %w", err)
	}
	return nil
}
// loadState restores the instance table from the state file. Leftover
// "np-*.tmp" files in the state directory are removed first. A missing state
// file is not an error; open and decode failures are logged and leave the
// table untouched. Every restored instance gets a fresh stop channel, a
// generated Config when it has none (API key entry excluded), and is started
// right away when its Restart flag is set.
func (m *Master) loadState() {
	// Sweep temporary files left behind by interrupted saves.
	pattern := filepath.Join(filepath.Dir(m.statePath), "np-*.tmp")
	leftovers, _ := filepath.Glob(pattern)
	for _, leftover := range leftovers {
		os.Remove(leftover)
	}

	// Nothing to load when the state file does not exist.
	if _, statErr := os.Stat(m.statePath); os.IsNotExist(statErr) {
		return
	}

	src, openErr := os.Open(m.statePath)
	if openErr != nil {
		m.logger.Error("loadState: open file failed: %v", openErr)
		return
	}
	defer src.Close()

	var restored map[string]*Instance
	if decodeErr := gob.NewDecoder(src).Decode(&restored); decodeErr != nil {
		m.logger.Error("loadState: decode file failed: %v", decodeErr)
		return
	}

	for key, inst := range restored {
		inst.stopped = make(chan struct{})

		// Fill in the full configuration URL when it is missing.
		if inst.ID != apiKeyID && inst.Config == "" {
			inst.Config = m.generateConfigURL(inst)
		}
		m.instances.Store(key, inst)

		// Auto-start instances flagged for restart.
		if !inst.Restart {
			continue
		}
		m.logger.Info("Auto-starting instance: %v [%v]", inst.URL, inst.ID)
		m.startInstance(inst)
	}

	m.logger.Info("Loaded %v instances from %v", len(restored), m.statePath)
}
// handleOpenAPISpec responds with the generated OpenAPI specification as JSON.
func (m *Master) handleOpenAPISpec(w http.ResponseWriter, r *http.Request) {
	setCorsHeaders(w)
	w.Header().Set("Content-Type", "application/json")
	spec := []byte(m.generateOpenAPISpec())
	w.Write(spec)
}
// handleSwaggerUI responds with the Swagger UI page: the swaggerUIHTML
// template formatted with the generated OpenAPI specification.
func (m *Master) handleSwaggerUI(w http.ResponseWriter, r *http.Request) {
	setCorsHeaders(w)
	w.Header().Set("Content-Type", "text/html")
	spec := m.generateOpenAPISpec()
	fmt.Fprintf(w, swaggerUIHTML, spec)
}
// handleInfo serves the master information endpoint. GET returns the master
// info; POST sets the master alias from the JSON body (at most maxTagKeyLen
// bytes) and returns the refreshed info. Other methods are rejected with 405.
func (m *Master) handleInfo(w http.ResponseWriter, r *http.Request) {
	if r.Method == http.MethodGet {
		writeJSON(w, http.StatusOK, m.getMasterInfo())
		return
	}
	if r.Method != http.MethodPost {
		httpError(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}

	var body struct {
		Alias string `json:"alias"`
	}
	if decodeErr := json.NewDecoder(r.Body).Decode(&body); decodeErr != nil {
		httpError(w, "Invalid request body", http.StatusBadRequest)
		return
	}

	// Update the master alias, subject to the length limit.
	if len(body.Alias) > maxTagKeyLen {
		httpError(w, fmt.Sprintf("Master alias exceeds maximum length %d", maxTagKeyLen), http.StatusBadRequest)
		return
	}
	m.alias = body.Alias
	writeJSON(w, http.StatusOK, m.getMasterInfo())
}
// getMasterInfo returns the full master information map: identity and
// configuration of the master plus host metrics. The metrics are collected
// only on Linux; elsewhere "cpu" is -1 and the remaining metrics are zero.
func (m *Master) getMasterInfo() map[string]any {
	// Uptime is taken before the metrics are sampled.
	uptime := uint64(time.Since(m.startTime).Seconds())

	sys := SystemInfo{CPU: -1}
	if runtime.GOOS == "linux" {
		sys = getLinuxSysInfo()
	}

	return map[string]any{
		"alias":      m.alias,
		"os":         runtime.GOOS,
		"arch":       runtime.GOARCH,
		"cpu":        sys.CPU,
		"mem_total":  sys.MemTotal,
		"mem_used":   sys.MemUsed,
		"swap_total": sys.SwapTotal,
		"swap_used":  sys.SwapUsed,
		"netrx":      sys.NetRX,
		"nettx":      sys.NetTX,
		"diskr":      sys.DiskR,
		"diskw":      sys.DiskW,
		"sysup":      sys.SysUp,
		"ver":        m.version,
		"name":       m.hostname,
		"uptime":     uptime,
		"log":        m.logLevel,
		"tls":        m.tlsCode,
		"crt":        m.crtPath,
		"key":        m.keyPath,
	}
}
// getLinuxSysInfo collects host metrics from /proc. On a non-Linux system, or
// for any source that cannot be read, the corresponding fields keep their
// defaults (CPU -1, everything else 0).
//
// CPU usage is measured over a baseDuration window, so the call blocks for
// that long.
func getLinuxSysInfo() SystemInfo {
	info := SystemInfo{CPU: -1}
	if runtime.GOOS != "linux" {
		return info
	}

	// sub is a saturating subtraction: unsigned counters must never wrap
	// around when a sample is missing or inconsistent.
	sub := func(a, b uint64) uint64 {
		if b > a {
			return 0
		}
		return a - b
	}

	// CPU usage: parse the aggregate "cpu" line of /proc/stat. idle is the
	// fourth value of that line, total the sum of all of its values.
	readStat := func() (idle, total uint64, ok bool) {
		data, err := os.ReadFile("/proc/stat")
		if err != nil {
			return 0, 0, false
		}
		for line := range strings.SplitSeq(string(data), "\n") {
			if !strings.HasPrefix(line, "cpu ") {
				continue
			}
			for i, v := range strings.Fields(line)[1:] {
				val, _ := strconv.ParseUint(v, 10, 64)
				total += val
				if i == 3 {
					idle = val
				}
			}
			return idle, total, true
		}
		return 0, 0, false
	}

	idle1, total1, ok1 := readStat()
	time.Sleep(baseDuration)
	idle2, total2, ok2 := readStat()
	// Both samples must be valid and the counters must have advanced. The
	// aggregate line already sums every core, so busy/total is the overall
	// utilisation and must not be divided by the number of CPUs again.
	if ok1 && ok2 && total2 > total1 {
		deltaTotal := total2 - total1
		busy := sub(deltaTotal, sub(idle2, idle1))
		info.CPU = min(int(busy*100/deltaTotal), 100)
	}

	// Memory usage: parse /proc/meminfo (values are given in KiB).
	if data, err := os.ReadFile("/proc/meminfo"); err == nil {
		var memTotal, memAvailable, swapTotal, swapFree uint64
		for line := range strings.SplitSeq(string(data), "\n") {
			fields := strings.Fields(line)
			if len(fields) < 2 {
				continue
			}
			val, err := strconv.ParseUint(fields[1], 10, 64)
			if err != nil {
				continue
			}
			val *= 1024
			switch fields[0] {
			case "MemTotal:":
				memTotal = val
			case "MemAvailable:":
				memAvailable = val
			case "SwapTotal:":
				swapTotal = val
			case "SwapFree:":
				swapFree = val
			}
		}
		info.MemTotal = memTotal
		info.MemUsed = sub(memTotal, memAvailable)
		info.SwapTotal = swapTotal
		info.SwapUsed = sub(swapTotal, swapFree)
	}

	// Network I/O: parse /proc/net/dev. The first two lines are headers; the
	// length check keeps a truncated file from causing an out-of-range slice.
	if data, err := os.ReadFile("/proc/net/dev"); err == nil {
		if lines := strings.Split(string(data), "\n"); len(lines) > 2 {
			for _, line := range lines[2:] {
				fields := strings.Fields(line)
				if len(fields) < 10 {
					continue
				}
				ifname := strings.TrimSuffix(fields[0], ":")
				// Skip loopback and virtual/container interfaces.
				if strings.HasPrefix(ifname, "lo") || strings.HasPrefix(ifname, "veth") ||
					strings.HasPrefix(ifname, "docker") || strings.HasPrefix(ifname, "podman") ||
					strings.HasPrefix(ifname, "br-") || strings.HasPrefix(ifname, "virbr") {
					continue
				}
				if val, err := strconv.ParseUint(fields[1], 10, 64); err == nil {
					info.NetRX += val
				}
				if val, err := strconv.ParseUint(fields[9], 10, 64); err == nil {
					info.NetTX += val
				}
			}
		}
	}

	// Disk I/O: parse /proc/diskstats (sector counts, 512 bytes per sector).
	if data, err := os.ReadFile("/proc/diskstats"); err == nil {
		for line := range strings.SplitSeq(string(data), "\n") {
			fields := strings.Fields(line)
			if len(fields) < 14 {
				continue
			}
			deviceName := fields[2]
			// Skip loop, RAM, device-mapper and md devices.
			if strings.Contains(deviceName, "loop") || strings.Contains(deviceName, "ram") ||
				strings.HasPrefix(deviceName, "dm-") || strings.HasPrefix(deviceName, "md") {
				continue
			}
			// Skip names ending in a digit (partitions such as sda1). This is
			// the same test as the former `\d+$` pattern without compiling a
			// regexp for every line.
			// NOTE(review): it also skips whole disks whose names end in a
			// digit (e.g. nvme0n1, mmcblk0) — confirm whether that is intended.
			if last := deviceName[len(deviceName)-1]; last >= '0' && last <= '9' {
				continue
			}
			if val, err := strconv.ParseUint(fields[5], 10, 64); err == nil {
				info.DiskR += val * 512
			}
			if val, err := strconv.ParseUint(fields[9], 10, 64); err == nil {
				info.DiskW += val * 512
			}
		}
	}

	// System uptime: first value of /proc/uptime, in seconds.
	if data, err := os.ReadFile("/proc/uptime"); err == nil {
		if fields := strings.Fields(string(data)); len(fields) > 0 {
			if uptime, err := strconv.ParseFloat(fields[0], 64); err == nil {
				info.SysUp = uint64(uptime)
			}
		}
	}

	return info
}
// handleInstances serves the instance collection. GET lists every entry of
// the instance table. POST creates an instance from the "url" field of the
// JSON body (scheme must be "client" or "server"), starts it asynchronously,
// schedules a state save, replies 201 with the new instance and emits a
// "create" event. Other methods are rejected with 405.
func (m *Master) handleInstances(w http.ResponseWriter, r *http.Request) {
	if r.Method == http.MethodGet {
		// List all instances.
		list := []*Instance{}
		m.instances.Range(func(_, v any) bool {
			list = append(list, v.(*Instance))
			return true
		})
		writeJSON(w, http.StatusOK, list)
		return
	}
	if r.Method != http.MethodPost {
		httpError(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}

	// Create a new instance.
	var body struct {
		URL string `json:"url"`
	}
	decodeErr := json.NewDecoder(r.Body).Decode(&body)
	if decodeErr != nil || body.URL == "" {
		httpError(w, "Invalid request body", http.StatusBadRequest)
		return
	}

	parsed, parseErr := url.Parse(body.URL)
	if parseErr != nil {
		httpError(w, "Invalid URL format", http.StatusBadRequest)
		return
	}

	// The URL scheme doubles as the instance type.
	kind := parsed.Scheme
	if !(kind == "client" || kind == "server") {
		httpError(w, "Invalid URL scheme", http.StatusBadRequest)
		return
	}

	// Generate the instance ID and refuse a collision.
	newID := generateID()
	if _, taken := m.instances.Load(newID); taken {
		httpError(w, "Instance ID already exists", http.StatusConflict)
		return
	}

	created := &Instance{
		ID:      newID,
		Type:    kind,
		URL:     m.enhanceURL(body.URL, kind),
		Status:  "stopped",
		Restart: true,
		Tags:    []Tag{},
		stopped: make(chan struct{}),
	}
	created.Config = m.generateConfigURL(created)
	m.instances.Store(newID, created)

	// Start the instance in the background.
	go m.startInstance(created)

	// Persist the state shortly afterwards.
	go func() {
		time.Sleep(baseDuration)
		m.saveState()
	}()

	writeJSON(w, http.StatusCreated, created)

	// Announce the creation.
	m.sendSSEEvent("create", created)
}
// handleInstanceDetail serves requests for a single instance. The instance ID
// is the part of the path after "<prefix>/instances/"; a missing ID yields
// 400 and an unknown one 404. GET, PATCH, PUT and DELETE are dispatched to
// their handlers; any other method is rejected with 405.
func (m *Master) handleInstanceDetail(w http.ResponseWriter, r *http.Request) {
	// Extract the instance ID from the path.
	id := strings.TrimPrefix(r.URL.Path, m.prefix+"/instances/")
	switch id {
	case "", "/":
		httpError(w, "Instance ID is required", http.StatusBadRequest)
		return
	}

	// Look the instance up.
	instance, found := m.findInstance(id)
	if !found {
		httpError(w, "Instance not found", http.StatusNotFound)
		return
	}

	if r.Method == http.MethodGet {
		m.handleGetInstance(w, instance)
	} else if r.Method == http.MethodPatch {
		m.handlePatchInstance(w, r, id, instance)
	} else if r.Method == http.MethodPut {
		m.handlePutInstance(w, r, id, instance)
	} else if r.Method == http.MethodDelete {
		m.handleDeleteInstance(w, id, instance)
	} else {
		httpError(w, "Method not allowed", http.StatusMethodNotAllowed)
	}
}
// handleGetInstance replies with the given instance as JSON and status 200.
func (m *Master) handleGetInstance(w http.ResponseWriter, instance *Instance) {
	const status = http.StatusOK
	writeJSON(w, status, instance)
}
// handlePatchInstance applies a partial update to an instance.
//
// The JSON body may carry any of:
//   - "tags":    tags to merge into the instance; an empty value deletes the key
//   - "action":  "reset" clears the traffic counters; any other non-empty
//     action is passed to processInstanceAction
//   - "restart": the auto-start flag
//   - "alias":   a new, non-empty alias (at most maxTagKeyLen bytes)
//
// For the API key entry only the "restart" action is honoured; it regenerates
// the key.
//
// A malformed body is rejected with 400, matching the POST and PUT handlers.
// All validation runs before anything is changed, so a rejected request never
// leaves the instance partially updated. On success the (updated) instance is
// returned with status 200.
func (m *Master) handlePatchInstance(w http.ResponseWriter, r *http.Request, id string, instance *Instance) {
	var reqData struct {
		Alias   string `json:"alias,omitempty"`
		Action  string `json:"action,omitempty"`
		Restart *bool  `json:"restart,omitempty"`
		Tags    []Tag  `json:"tags,omitempty"`
	}
	if err := json.NewDecoder(r.Body).Decode(&reqData); err != nil {
		httpError(w, "Invalid request body", http.StatusBadRequest)
		return
	}

	// The API key entry only supports the restart action (key regeneration).
	if id == apiKeyID {
		if reqData.Action == "restart" {
			m.regenerateAPIKey(instance)
			// The API key is the only case whose event is sent from here.
			m.sendSSEEvent("update", instance)
		}
		writeJSON(w, http.StatusOK, instance)
		return
	}

	// Validate the whole request before mutating the instance.
	if reqData.Tags != nil {
		if err := validateTags(reqData.Tags); err != nil {
			httpError(w, err.Error(), http.StatusBadRequest)
			return
		}
	}
	aliasChanged := reqData.Alias != "" && instance.Alias != reqData.Alias
	if aliasChanged && len(reqData.Alias) > maxTagKeyLen {
		httpError(w, fmt.Sprintf("Instance alias exceeds maximum length %d", maxTagKeyLen), http.StatusBadRequest)
		return
	}

	// Merge the tags.
	if reqData.Tags != nil {
		// Index the current tags, then apply the requested changes.
		existingTags := make(map[string]Tag)
		for _, tag := range instance.Tags {
			existingTags[tag.Key] = tag
		}
		for _, tag := range reqData.Tags {
			if tag.Value == "" {
				// Empty value: delete the key.
				delete(existingTags, tag.Key)
			} else {
				// Non-empty value: add or update the key.
				existingTags[tag.Key] = tag
			}
		}

		// Emit the result in a stable order — current tags first, then newly
		// added ones in request order — instead of random map order.
		newTags := make([]Tag, 0, len(existingTags))
		appendOnce := func(key string) {
			if tag, ok := existingTags[key]; ok {
				newTags = append(newTags, tag)
				delete(existingTags, key)
			}
		}
		for _, tag := range instance.Tags {
			appendOnce(tag.Key)
		}
		for _, tag := range reqData.Tags {
			appendOnce(tag.Key)
		}

		instance.Tags = newTags
		m.instances.Store(id, instance)
		go m.saveState()
		m.logger.Info("Tags updated: [%v]", instance.ID)
		m.sendSSEEvent("update", instance)
	}

	// Reset the traffic statistics.
	if reqData.Action == "reset" {
		instance.TCPRX = 0
		instance.TCPTX = 0
		instance.UDPRX = 0
		instance.UDPTX = 0
		m.instances.Store(id, instance)
		go m.saveState()
		m.logger.Info("Traffic stats reset: [%v]", instance.ID)
		m.sendSSEEvent("update", instance)
	}

	// Update the auto-start setting.
	if reqData.Restart != nil && instance.Restart != *reqData.Restart {
		instance.Restart = *reqData.Restart
		m.instances.Store(id, instance)
		go m.saveState()
		m.logger.Info("Restart policy updated: %v [%v]", *reqData.Restart, instance.ID)
		m.sendSSEEvent("update", instance)
	}

	// Update the alias (its length was validated above).
	if aliasChanged {
		instance.Alias = reqData.Alias
		m.instances.Store(id, instance)
		go m.saveState()
		m.logger.Info("Alias updated: %v [%v]", reqData.Alias, instance.ID)
		m.sendSSEEvent("update", instance)
	}

	// Run the requested lifecycle action, if any.
	if reqData.Action != "" && reqData.Action != "reset" {
		m.processInstanceAction(instance, reqData.Action)
	}

	writeJSON(w, http.StatusOK, instance)
}
// handlePutInstance replaces an instance's URL. The API key entry cannot be
// modified (403). The body must carry a "url" whose scheme is "client" or
// "server"; a URL equal to the current one is answered with 409. A running
// instance is stopped first, then URL, type and config are updated, the
// instance is restarted asynchronously, a state save is scheduled and the
// instance is returned with status 200.
func (m *Master) handlePutInstance(w http.ResponseWriter, r *http.Request, id string, instance *Instance) {
	// The API key entry must not have its URL changed.
	if id == apiKeyID {
		httpError(w, "Forbidden: API Key", http.StatusForbidden)
		return
	}

	var body struct {
		URL string `json:"url"`
	}
	decodeErr := json.NewDecoder(r.Body).Decode(&body)
	if decodeErr != nil || body.URL == "" {
		httpError(w, "Invalid request body", http.StatusBadRequest)
		return
	}

	parsed, parseErr := url.Parse(body.URL)
	if parseErr != nil {
		httpError(w, "Invalid URL format", http.StatusBadRequest)
		return
	}

	// The URL scheme doubles as the instance type.
	kind := parsed.Scheme
	switch kind {
	case "client", "server":
	default:
		httpError(w, "Invalid URL scheme", http.StatusBadRequest)
		return
	}

	// Enhance the URL first so it can be compared with the current one.
	candidate := m.enhanceURL(body.URL, kind)
	if candidate == instance.URL {
		httpError(w, "Instance URL conflict", http.StatusConflict)
		return
	}

	// Stop the instance before changing it.
	if instance.Status != "stopped" {
		m.stopInstance(instance)
		time.Sleep(baseDuration)
	}

	// Apply the new URL, type and configuration.
	instance.URL, instance.Type = candidate, kind
	instance.Config = m.generateConfigURL(instance)
	instance.Status = "stopped"
	m.instances.Store(id, instance)

	// Start the instance in the background.
	go m.startInstance(instance)

	// Persist the state shortly afterwards.
	go func() {
		time.Sleep(baseDuration)
		m.saveState()
	}()

	writeJSON(w, http.StatusOK, instance)
	m.logger.Info("Instance URL updated: %v [%v]", instance.URL, instance.ID)
}
// regenerateAPIKey replaces the API key held by instance with a newly
// generated one, stores it under apiKeyID and logs it. The state save and the
// disconnection of the current SSE subscribers run in the background.
func (m *Master) regenerateAPIKey(instance *Instance) {
	newKey := generateAPIKey()
	instance.URL = newKey
	m.instances.Store(apiKeyID, instance)
	m.logger.Info("API Key regenerated: %v", instance.URL)

	for _, followUp := range []func(){
		func() { m.saveState() },
		m.shutdownSSEConnections,
	} {
		go followUp()
	}
}
// processInstanceAction performs a lifecycle action on instance in the
// background: "start" (unless it is already running), "stop" (unless it is
// already stopped) or "restart" (stop, pause for baseDuration, start). Any
// other action is ignored.
func (m *Master) processInstanceAction(instance *Instance, action string) {
	if action == "start" {
		if instance.Status != "running" {
			go m.startInstance(instance)
		}
		return
	}
	if action == "stop" {
		if instance.Status != "stopped" {
			go m.stopInstance(instance)
		}
		return
	}
	if action != "restart" {
		return
	}
	go func() {
		m.stopInstance(instance)
		time.Sleep(baseDuration)
		m.startInstance(instance)
	}()
}
// handleDeleteInstance deletes an instance. The API key entry cannot be
// deleted (403). The instance is flagged deleted, stopped when it is not
// already stopped and removed from the table; a state save is scheduled, 204
// is returned and a "delete" event is emitted.
func (m *Master) handleDeleteInstance(w http.ResponseWriter, id string, instance *Instance) {
	// The API key entry must not be deleted.
	if id == apiKeyID {
		httpError(w, "Forbidden: API Key", http.StatusForbidden)
		return
	}

	// Flag the instance first so late log output no longer re-stores it.
	instance.deleted = true
	m.instances.Store(id, instance)

	if running := instance.Status != "stopped"; running {
		m.stopInstance(instance)
	}
	m.instances.Delete(id)

	// Persist the state now that the instance is gone.
	go m.saveState()

	w.WriteHeader(http.StatusNoContent)

	// Announce the deletion.
	m.sendSSEEvent("delete", instance)
}
// handleSSE serves the Server-Sent Events stream. Only GET is accepted. After
// registering as a subscriber it sends the retry interval and one "initial"
// event per instance, then forwards every event delivered to the subscriber
// channel until the client disconnects or the channel is closed by
// shutdownSSEConnections.
//
// The subscriber channel is never closed from this (receiving) side: the
// event dispatcher may still be sending on it, and a send on a closed channel
// panics. On disconnect the subscriber is only removed from the table.
func (m *Master) handleSSE(w http.ResponseWriter, r *http.Request) {
	// Only GET requests are valid.
	if r.Method != http.MethodGet {
		httpError(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}

	// Streaming needs a flushable writer; check once up front instead of
	// risking a panic on an unchecked type assertion later.
	flusher, ok := w.(http.Flusher)
	if !ok {
		httpError(w, "Streaming unsupported", http.StatusInternalServerError)
		return
	}

	// SSE response headers.
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")
	w.Header().Set("Access-Control-Allow-Origin", "*")

	// Register this connection as a subscriber with its own buffered channel.
	subscriberID := generateID()
	events := make(chan *InstanceEvent, 10)
	m.subscribers.Store(subscriberID, events)
	defer m.subscribers.Delete(subscriberID)

	// Tell the client how long to wait before reconnecting.
	fmt.Fprintf(w, "retry: %d\n\n", sseRetryTime)
	flusher.Flush()

	// Send the current state of every instance.
	m.instances.Range(func(_, value any) bool {
		event := &InstanceEvent{
			Type:     "initial",
			Time:     time.Now(),
			Instance: value.(*Instance),
		}
		if data, err := json.Marshal(event); err == nil {
			fmt.Fprintf(w, "event: instance\ndata: %s\n\n", data)
			flusher.Flush()
		}
		return true
	})

	// Forward events until the client goes away or the channel is closed.
	ctx := r.Context()
	for {
		select {
		case <-ctx.Done():
			return
		case event, ok := <-events:
			if !ok {
				return
			}
			data, err := json.Marshal(event)
			if err != nil {
				m.logger.Error("handleSSE: event marshal error: %v", err)
				continue
			}
			fmt.Fprintf(w, "event: instance\ndata: %s\n\n", data)
			flusher.Flush()
		}
	}
}
// sendSSEEvent queues an event of the given type for instance on the
// notification channel. When logs is non-empty, its first element becomes the
// event's log content. The send never blocks: the event is dropped when the
// channel is full or has already been closed.
func (m *Master) sendSSEEvent(eventType string, instance *Instance, logs ...string) {
	event := &InstanceEvent{
		Type:     eventType,
		Time:     time.Now(),
		Instance: instance,
	}

	// Attach the log content, if any.
	if len(logs) > 0 {
		event.Logs = logs[0]
	}

	// Shutdown closes notifyChannel while producers (HTTP handlers, instance
	// log writers) may still be running. A send on a closed channel panics
	// even inside a select with a default case, so the panic is absorbed here
	// to really ignore a closed channel.
	defer func() { _ = recover() }()

	// Non-blocking send.
	select {
	case m.notifyChannel <- event:
	default:
		// Channel full: drop the event.
	}
}
// shutdownSSEConnections offers a "shutdown" event to every registered SSE
// subscriber (without blocking), then unregisters the subscriber and closes
// its channel. Each subscriber is handled in its own goroutine; the function
// returns once all of them have finished.
func (m *Master) shutdownSSEConnections() {
	var pending sync.WaitGroup

	notifyAndClose := func(id any, sink chan *InstanceEvent) {
		defer pending.Done()

		// Best-effort shutdown notice; skipped when the buffer is full.
		select {
		case sink <- &InstanceEvent{Type: "shutdown", Time: time.Now()}:
		default:
		}

		// Only whoever removes the map entry closes the channel.
		if _, removed := m.subscribers.LoadAndDelete(id); !removed {
			return
		}
		close(sink)
	}

	m.subscribers.Range(func(id, raw any) bool {
		sink := raw.(chan *InstanceEvent)
		pending.Add(1)
		go notifyAndClose(id, sink)
		return true
	})

	pending.Wait()
}
// startEventDispatcher drains the notify channel and fans each event out to
// every registered subscriber channel. Delivery is non-blocking: a subscriber
// that cannot take the event immediately misses it. The loop ends when the
// notify channel is closed.
func (m *Master) startEventDispatcher() {
	var current *InstanceEvent

	// offer tries to hand the current event to one subscriber.
	offer := func(_, raw any) bool {
		select {
		case raw.(chan *InstanceEvent) <- current:
		default:
			// Subscriber not ready; skip it.
		}
		return true
	}

	for current = range m.notifyChannel {
		m.subscribers.Range(offer)
	}
}
// findInstance looks up an instance by ID in the instance map. It returns the
// instance and true when present, or nil and false otherwise.
func (m *Master) findInstance(id string) (*Instance, bool) {
	if raw, ok := m.instances.Load(id); ok {
		return raw.(*Instance), true
	}
	return nil, false
}
// startInstance launches the instance as a child process of the current
// executable, passing instance.URL as its only argument.
//
// If the instance map holds an entry for the same ID, that entry is used, and
// nothing happens when it is already "running". Before launching, the current
// traffic counters are copied into the baseline fields. On any failure the
// instance is marked "error", stored, and an "update" event is sent. On
// success it is marked "running", a monitor goroutine is started, and an
// "update" event is sent.
func (m *Master) startInstance(instance *Instance) {
	// Work on the stored record when there is one.
	if current, ok := m.instances.Load(instance.ID); ok {
		instance = current.(*Instance)
		if instance.Status == "running" {
			return
		}
	}

	// Record traffic baselines before the new run.
	instance.TCPRXBase, instance.TCPTXBase = instance.TCPRX, instance.TCPTX
	instance.UDPRXBase, instance.UDPTXBase = instance.UDPRX, instance.UDPTX

	// markFailed publishes the "error" state of the instance.
	markFailed := func() {
		instance.Status = "error"
		m.instances.Store(instance.ID, instance)
		m.sendSSEEvent("update", instance)
	}

	selfPath, pathErr := os.Executable()
	if pathErr != nil {
		m.logger.Error("startInstance: get path failed: %v [%v]", pathErr, instance.ID)
		markFailed()
		return
	}

	// The child is tied to a cancellable context; the cancel function is kept
	// on the instance.
	runCtx, stop := context.WithCancel(context.Background())
	child := exec.CommandContext(runCtx, selfPath, instance.URL)
	instance.cancelFunc = stop

	// Route both output streams of the child through the instance log writer.
	sink := NewInstanceLogWriter(instance.ID, instance, os.Stdout, m)
	child.Stdout = sink
	child.Stderr = sink

	m.logger.Info("Instance starting: %v [%v]", instance.URL, instance.ID)

	startErr := child.Start()
	if startErr != nil || child.Process == nil || child.Process.Pid <= 0 {
		if startErr == nil {
			m.logger.Error("startInstance: instance start failed [%v]", instance.ID)
		} else {
			m.logger.Error("startInstance: instance error: %v [%v]", startErr, instance.ID)
		}
		markFailed()
		stop()
		return
	}

	instance.cmd = child
	instance.Status = "running"
	go m.monitorInstance(instance, child)
	m.instances.Store(instance.ID, instance)

	// Announce the start.
	m.sendSSEEvent("update", instance)
}
// monitorInstance watches a started instance until it is stopped or its
// process exits.
//
//   - When instance.stopped fires, monitoring ends without further action.
//   - When cmd.Wait returns, the stored instance (if any, and only if still
//     "running") becomes "error" on a non-nil wait error or "stopped"
//     otherwise, is stored, and an "update" event is sent.
//   - Every reportInterval, if lastCheckPoint is set and older than three
//     report intervals, the instance is marked "error", stored, and an
//     "update" event is sent.
func (m *Master) monitorInstance(instance *Instance, cmd *exec.Cmd) {
	exited := make(chan error, 1)
	go func() { exited <- cmd.Wait() }()

	for {
		select {
		case <-instance.stopped:
			// Explicit stop.
			return

		case waitErr := <-exited:
			raw, found := m.instances.Load(instance.ID)
			if !found {
				return
			}
			latest := raw.(*Instance)
			if latest.Status != "running" {
				return
			}
			if waitErr == nil {
				latest.Status = "stopped"
			} else {
				m.logger.Error("monitorInstance: instance error: %v [%v]", waitErr, latest.ID)
				latest.Status = "error"
			}
			m.instances.Store(latest.ID, latest)
			m.sendSSEEvent("update", latest)
			return

		case <-time.After(reportInterval):
			// Skip when no checkpoint was recorded or it is recent enough.
			if instance.lastCheckPoint.IsZero() || time.Since(instance.lastCheckPoint) <= 3*reportInterval {
				continue
			}
			instance.Status = "error"
			m.instances.Store(instance.ID, instance)
			m.sendSSEEvent("update", instance)
		}
	}
}
// stopInstance stops a running instance.
//
// An instance already "stopped" is left alone. One without a command or
// process is just marked "stopped" and announced. Otherwise the process is
// sent a termination signal (os.Interrupt on Windows, SIGTERM elsewhere),
// given baseDuration to react, the stopped channel is closed, and the process
// is ended through the stored cancel function or, lacking one, killed. The
// instance is then marked "stopped" with a fresh stopped channel, the state
// is saved in the background, and an "update" event is sent.
func (m *Master) stopInstance(instance *Instance) {
	if instance.Status == "stopped" {
		return
	}

	// Resolve the process, if any.
	var proc *os.Process
	if instance.cmd != nil {
		proc = instance.cmd.Process
	}
	if proc == nil {
		instance.Status = "stopped"
		m.instances.Store(instance.ID, instance)
		m.sendSSEEvent("update", instance)
		return
	}

	// Ask the process to terminate; the result is not checked.
	var sig os.Signal = syscall.SIGTERM
	if runtime.GOOS == "windows" {
		sig = os.Interrupt
	}
	_ = proc.Signal(sig)
	time.Sleep(baseDuration)

	// Signal the stop to anything waiting on the stopped channel.
	close(instance.stopped)

	// Cancel the command context, or kill directly when there is none.
	if cancel := instance.cancelFunc; cancel != nil {
		cancel()
	} else if err := proc.Kill(); err != nil {
		m.logger.Error("stopInstance: instance error: %v [%v]", err, instance.ID)
	}
	m.logger.Info("Instance stopped [%v]", instance.ID)

	// Reset the instance's run state.
	instance.Status = "stopped"
	instance.stopped = make(chan struct{})
	instance.cancelFunc = nil
	m.instances.Store(instance.ID, instance)

	// Persist in the background, then announce.
	go m.saveState()
	m.sendSSEEvent("update", instance)
}
// enhanceURL returns instanceURL with master-level defaults added to its
// query string. Parameters already present with a non-empty value are kept.
//
//   - "log" is set to the master log level when that is non-empty.
//   - For "server" instances with a TLS code other than "0", "tls" is set to
//     that code; with code "2", "crt" and "key" are set from the master's
//     certificate and key paths when those are non-empty.
//
// If instanceURL cannot be parsed, the error is logged and the input is
// returned unchanged.
func (m *Master) enhanceURL(instanceURL string, instanceType string) string {
	u, err := url.Parse(instanceURL)
	if err != nil {
		m.logger.Error("enhanceURL: invalid URL format: %v", err)
		return instanceURL
	}

	params := u.Query()
	// fill sets key only when the URL has no non-empty value for it.
	fill := func(key, value string) {
		if params.Get(key) == "" {
			params.Set(key, value)
		}
	}

	if m.logLevel != "" {
		fill("log", m.logLevel)
	}

	if instanceType == "server" && m.tlsCode != "0" {
		fill("tls", m.tlsCode)
		if m.tlsCode == "2" {
			if m.crtPath != "" {
				fill("crt", m.crtPath)
			}
			if m.keyPath != "" {
				fill("key", m.keyPath)
			}
		}
	}

	u.RawQuery = params.Encode()
	return u.String()
}
// generateConfigURL returns the instance URL with every supported query
// parameter made explicit. Parameters already present with a non-empty value
// are kept.
//
// It adds the master-level "log" and, for "server" instances, the "tls",
// "crt" and "key" defaults. It then adds the per-type defaults: "min" for
// clients or "max" for servers, plus "mode", "read", "rate", "slot" and
// "proxy" for both. Other instance types get no per-type defaults.
//
// If instance.URL cannot be parsed, the error is logged and the URL is
// returned unchanged.
func (m *Master) generateConfigURL(instance *Instance) string {
	u, err := url.Parse(instance.URL)
	if err != nil {
		m.logger.Error("generateConfigURL: invalid URL format: %v", err)
		return instance.URL
	}

	params := u.Query()
	// fill sets key only when the URL has no non-empty value for it.
	fill := func(key, value string) {
		if params.Get(key) == "" {
			params.Set(key, value)
		}
	}

	// Master log level.
	if m.logLevel != "" {
		fill("log", m.logLevel)
	}

	// Master TLS settings apply to server instances only.
	if instance.Type == "server" && m.tlsCode != "0" {
		fill("tls", m.tlsCode)
		if m.tlsCode == "2" {
			if m.crtPath != "" {
				fill("crt", m.crtPath)
			}
			if m.keyPath != "" {
				fill("key", m.keyPath)
			}
		}
	}

	// Clients and servers differ only in the pool bound parameter.
	poolKey, poolDefault := "", 0
	switch instance.Type {
	case "client":
		poolKey, poolDefault = "min", defaultMinPool
	case "server":
		poolKey, poolDefault = "max", defaultMaxPool
	}
	if poolKey != "" {
		fill(poolKey, strconv.Itoa(poolDefault))
		fill("mode", defaultRunMode)
		fill("read", defaultReadTimeout.String())
		fill("rate", strconv.Itoa(defaultRateLimit))
		fill("slot", strconv.Itoa(defaultSlotLimit))
		fill("proxy", defaultProxyProtocol)
	}

	u.RawQuery = params.Encode()
	return u.String()
}
// generateID returns 4 bytes read from crypto/rand, encoded as 8 lowercase
// hexadecimal characters.
func generateID() string {
	var raw [4]byte
	// The read error is not checked, matching the existing behavior.
	_, _ = rand.Read(raw[:])
	return hex.EncodeToString(raw[:])
}
// generateAPIKey returns a new API key: 16 bytes read from crypto/rand,
// encoded as 32 lowercase hexadecimal characters.
//
// It panics if the random source fails. Returning a key built from an
// unfilled buffer would hand out a predictable (all-zero) credential, so
// failing closed is the only safe option.
func generateAPIKey() string {
	var raw [16]byte
	if _, err := rand.Read(raw[:]); err != nil {
		panic(fmt.Errorf("generateAPIKey: read random bytes: %w", err))
	}
	return hex.EncodeToString(raw[:])
}
// httpError writes a JSON error response of the form {"error": message} with
// the given status code, after applying the CORS headers. The result of
// encoding the body is not checked.
func httpError(w http.ResponseWriter, message string, statusCode int) {
	payload := map[string]string{"error": message}

	setCorsHeaders(w)
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(statusCode)

	enc := json.NewEncoder(w)
	_ = enc.Encode(payload)
}
// writeJSON writes data as a JSON response body with the given status code,
// after applying the CORS headers and the JSON content type. The result of
// encoding the body is not checked.
func writeJSON(w http.ResponseWriter, statusCode int, data any) {
	setCorsHeaders(w)

	hdr := w.Header()
	hdr.Set("Content-Type", "application/json")
	w.WriteHeader(statusCode)

	enc := json.NewEncoder(w)
	_ = enc.Encode(data)
}
// generateOpenAPISpec returns the OpenAPI document for the master API as a
// JSON string.
//
// The document is a fixed template with two substitutions: openAPIVersion is
// written into info.version and m.prefix into servers[0].url. Both values are
// inserted verbatim by fmt.Sprintf, without JSON escaping.
func (m *Master) generateOpenAPISpec() string {
return fmt.Sprintf(`{
"openapi": "3.1.1",
"info": {
"title": "NodePass API",
"description": "API for managing NodePass server and client instances",
"version": "%s"
},
"servers": [{"url": "%s"}],
"security": [{"ApiKeyAuth": []}],
"paths": {
"/instances": {
"get": {
"summary": "List all instances",
"security": [{"ApiKeyAuth": []}],
"responses": {
"200": {"description": "Success", "content": {"application/json": {"schema": {"type": "array", "items": {"$ref": "#/components/schemas/Instance"}}}}},
"401": {"description": "Unauthorized"},
"405": {"description": "Method not allowed"}
}
},
"post": {
"summary": "Create a new instance",
"security": [{"ApiKeyAuth": []}],
"requestBody": {"required": true, "content": {"application/json": {"schema": {"$ref": "#/components/schemas/CreateInstanceRequest"}}}},
"responses": {
"201": {"description": "Created", "content": {"application/json": {"schema": {"$ref": "#/components/schemas/Instance"}}}},
"400": {"description": "Invalid input"},
"401": {"description": "Unauthorized"},
"405": {"description": "Method not allowed"},
"409": {"description": "Instance ID already exists"}
}
}
},
"/instances/{id}": {
"parameters": [{"name": "id", "in": "path", "required": true, "schema": {"type": "string"}}],
"get": {
"summary": "Get instance details",
"security": [{"ApiKeyAuth": []}],
"responses": {
"200": {"description": "Success", "content": {"application/json": {"schema": {"$ref": "#/components/schemas/Instance"}}}},
"400": {"description": "Instance ID required"},
"401": {"description": "Unauthorized"},
"404": {"description": "Not found"},
"405": {"description": "Method not allowed"}
}
},
"patch": {
"summary": "Update instance",
"security": [{"ApiKeyAuth": []}],
"requestBody": {"required": true, "content": {"application/json": {"schema": {"$ref": "#/components/schemas/UpdateInstanceRequest"}}}},
"responses": {
"200": {"description": "Success", "content": {"application/json": {"schema": {"$ref": "#/components/schemas/Instance"}}}},
"400": {"description": "Instance ID required or invalid input"},
"401": {"description": "Unauthorized"},
"404": {"description": "Not found"},
"405": {"description": "Method not allowed"}
}
},
"put": {
"summary": "Update instance URL",
"security": [{"ApiKeyAuth": []}],
"requestBody": {"required": true, "content": {"application/json": {"schema": {"$ref": "#/components/schemas/PutInstanceRequest"}}}},
"responses": {
"200": {"description": "Success", "content": {"application/json": {"schema": {"$ref": "#/components/schemas/Instance"}}}},
"400": {"description": "Instance ID required or invalid input"},
"401": {"description": "Unauthorized"},
"403": {"description": "Forbidden"},
"404": {"description": "Not found"},
"405": {"description": "Method not allowed"},
"409": {"description": "Instance URL conflict"}
}
},
"delete": {
"summary": "Delete instance",
"security": [{"ApiKeyAuth": []}],
"responses": {
"204": {"description": "Deleted"},
"400": {"description": "Instance ID required"},
"401": {"description": "Unauthorized"},
"403": {"description": "Forbidden"},
"404": {"description": "Not found"},
"405": {"description": "Method not allowed"}
}
}
},
"/events": {
"get": {
"summary": "Subscribe to instance events",
"security": [{"ApiKeyAuth": []}],
"responses": {
"200": {"description": "Success", "content": {"text/event-stream": {}}},
"401": {"description": "Unauthorized"},
"405": {"description": "Method not allowed"}
}
}
},
"/info": {
"get": {
"summary": "Get master information",
"security": [{"ApiKeyAuth": []}],
"responses": {
"200": {"description": "Success", "content": {"application/json": {"schema": {"$ref": "#/components/schemas/MasterInfo"}}}},
"401": {"description": "Unauthorized"},
"405": {"description": "Method not allowed"}
}
},
"post": {
"summary": "Update master alias",
"security": [{"ApiKeyAuth": []}],
"requestBody": {"required": true, "content": {"application/json": {"schema": {"$ref": "#/components/schemas/UpdateMasterAliasRequest"}}}},
"responses": {
"200": {"description": "Success", "content": {"application/json": {"schema": {"$ref": "#/components/schemas/MasterInfo"}}}},
"400": {"description": "Invalid input"},
"401": {"description": "Unauthorized"},
"405": {"description": "Method not allowed"}
}
}
},
"/tcping": {
"get": {
"summary": "TCP connectivity test",
"security": [{"ApiKeyAuth": []}],
"parameters": [
{
"name": "target",
"in": "query",
"required": true,
"schema": {"type": "string"},
"description": "Target address in format host:port"
}
],
"responses": {
"200": {"description": "Success", "content": {"application/json": {"schema": {"$ref": "#/components/schemas/TCPingResult"}}}},
"400": {"description": "Target address required"},
"401": {"description": "Unauthorized"},
"405": {"description": "Method not allowed"}
}
}
},
"/openapi.json": {
"get": {
"summary": "Get OpenAPI specification",
"responses": {
"200": {"description": "Success", "content": {"application/json": {}}}
}
}
},
"/docs": {
"get": {
"summary": "Get Swagger UI",
"responses": {
"200": {"description": "Success", "content": {"text/html": {}}}
}
}
}
},
"components": {
"securitySchemes": {
"ApiKeyAuth": {
"type": "apiKey",
"in": "header",
"name": "X-API-Key",
"description": "API Key for authentication"
}
},
"schemas": {
"Instance": {
"type": "object",
"properties": {
"id": {"type": "string", "description": "Unique identifier"},
"alias": {"type": "string", "description": "Instance alias"},
"type": {"type": "string", "enum": ["client", "server"], "description": "Type of instance"},
"status": {"type": "string", "enum": ["running", "stopped", "error"], "description": "Instance status"},
"url": {"type": "string", "description": "Command string or API Key"},
"config": {"type": "string", "description": "Instance configuration URL"},
"restart": {"type": "boolean", "description": "Restart policy"},
"tags": {"type": "array", "items": {"$ref": "#/components/schemas/Tag"}, "description": "Tag array"},
"mode": {"type": "integer", "description": "Instance mode"},
"ping": {"type": "integer", "description": "TCPing latency"},
"pool": {"type": "integer", "description": "Pool active count"},
"tcps": {"type": "integer", "description": "TCP connection count"},
"udps": {"type": "integer", "description": "UDP connection count"},
"tcprx": {"type": "integer", "description": "TCP received bytes"},
"tcptx": {"type": "integer", "description": "TCP transmitted bytes"},
"udprx": {"type": "integer", "description": "UDP received bytes"},
"udptx": {"type": "integer", "description": "UDP transmitted bytes"}
}
},
"CreateInstanceRequest": {
"type": "object",
"required": ["url"],
"properties": {"url": {"type": "string", "description": "Command string(scheme://host:port/host:port)"}}
},
"UpdateInstanceRequest": {
"type": "object",
"properties": {
"alias": {"type": "string", "description": "Instance alias"},
"action": {"type": "string", "enum": ["start", "stop", "restart", "reset"], "description": "Action for the instance"},
"restart": {"type": "boolean", "description": "Instance restart policy"},
"tags": {"type": "array", "items": {"$ref": "#/components/schemas/Tag"}, "description": "Tag array"}
}
},
"PutInstanceRequest": {
"type": "object",
"required": ["url"],
"properties": {"url": {"type": "string", "description": "New command string(scheme://host:port/host:port)"}}
},
"MasterInfo": {
"type": "object",
"properties": {
"alias": {"type": "string", "description": "Master alias"},
"os": {"type": "string", "description": "Operating system"},
"arch": {"type": "string", "description": "System architecture"},
"cpu": {"type": "integer", "description": "CPU usage percentage"},
"mem_total": {"type": "integer", "format": "int64", "description": "Total memory in bytes"},
"mem_used": {"type": "integer", "format": "int64", "description": "Used memory in bytes"},
"swap_total": {"type": "integer", "format": "int64", "description": "Total swap space in bytes"},
"swap_used": {"type": "integer", "format": "int64", "description": "Used swap space in bytes"},
"netrx": {"type": "integer", "format": "int64", "description": "Network received bytes"},
"nettx": {"type": "integer", "format": "int64", "description": "Network transmitted bytes"},
"diskr": {"type": "integer", "format": "int64", "description": "Disk read bytes"},
"diskw": {"type": "integer", "format": "int64", "description": "Disk write bytes"},
"sysup": {"type": "integer", "format": "int64", "description": "System uptime in seconds"},
"ver": {"type": "string", "description": "NodePass version"},
"name": {"type": "string", "description": "Hostname"},
"uptime": {"type": "integer", "format": "int64", "description": "API uptime in seconds"},
"log": {"type": "string", "description": "Log level"},
"tls": {"type": "string", "description": "TLS code"},
"crt": {"type": "string", "description": "Certificate path"},
"key": {"type": "string", "description": "Private key path"}
}
},
"UpdateMasterAliasRequest": {
"type": "object",
"required": ["alias"],
"properties": {"alias": {"type": "string", "description": "Master alias"}}
},
"TCPingResult": {
"type": "object",
"properties": {
"target": {"type": "string", "description": "Target address"},
"connected": {"type": "boolean", "description": "Is connected"},
"latency": {"type": "integer", "format": "int64", "description": "Latency in milliseconds"},
"error": {"type": "string", "nullable": true, "description": "Error message"}
}
},
"Tag": {
"type": "object",
"required": ["key", "value"],
"properties": {
"key": {"type": "string", "description": "Tag key"},
"value": {"type": "string", "description": "Tag value"}
}
}
}
}
}`, openAPIVersion, m.prefix)
}