Update On Sat Sep 27 20:33:38 CEST 2025

This commit is contained in:
github-action[bot]
2025-09-27 20:33:38 +02:00
parent 22c5f5fb97
commit 52cc3405d5
52 changed files with 2100 additions and 1134 deletions

View File

@@ -1,5 +1,5 @@
<div align="center">
<img src="https://cdn.yobc.de/assets/np-gopher.png" alt="nodepass" width="300">
<img src="https://cdn.yobc.de/assets/np-gopher.png" width="300">
[![Mentioned in Awesome Go](https://awesome.re/mentioned-badge.svg)](https://github.com/avelino/awesome-go#networking)
[![GitHub release](https://img.shields.io/github/v/release/yosebyte/nodepass)](https://github.com/yosebyte/nodepass/releases)
@@ -10,6 +10,8 @@
[![Ask DeepWiki](https://deepwiki.com/badge.svg)](https://deepwiki.com/yosebyte/nodepass)
![GitHub last commit](https://img.shields.io/github/last-commit/yosebyte/nodepass)
<a href="https://apps.apple.com/us/app/nodepass/id6747930492"><img src="https://cdn.yobc.de/assets/appstore.png" width="120"></a>
English | [简体中文](README_zh.md)
</div>
@@ -98,6 +100,8 @@ The [NodePassProject](https://github.com/NodePassProject) organization develops
- **[npsh](https://github.com/NodePassProject/npsh)**: A collection of one-click scripts that provide simple deployment for API or Dashboard with flexible configuration and management.
- **[NodePass-ApplePlatforms](https://github.com/NodePassProject/NodePass-ApplePlatforms)**: An iOS/macOS application that offers a native experience for Apple users.
- **[nodepass-core](https://github.com/NodePassProject/nodepass-core)**: Development branch, featuring previews of new functionalities and performance optimizations, suitable for advanced users and developers.
## 💬 Discussion

View File

@@ -10,6 +10,8 @@
[![Ask DeepWiki](https://deepwiki.com/badge.svg)](https://deepwiki.com/yosebyte/nodepass)
![GitHub last commit](https://img.shields.io/github/last-commit/yosebyte/nodepass)
<a href="https://apps.apple.com/cn/app/nodepass/id6747930492"><img src="https://cdn.yobc.de/assets/appstore.png" width="120"></a>
[English](README.md) | 简体中文
</div>
@@ -98,6 +100,8 @@ nodepass "master://:10101/api?log=debug&tls=1"
- **[npsh](https://github.com/NodePassProject/npsh)**: 简单易用的 NodePass 一键脚本合集,包括 API 主控、Dash 面板的安装部署、灵活配置和辅助管理。
- **[NodePass-ApplePlatforms](https://github.com/NodePassProject/NodePass-ApplePlatforms)**: 一款为 Apple 用户提供原生体验的 iOS/macOS 应用程序。
- **[nodepass-core](https://github.com/NodePassProject/nodepass-core)**: 开发分支,包含新功能预览和性能优化测试,适合高级用户和开发者。
## 💬 讨论

View File

@@ -2,37 +2,80 @@ package main
import (
"crypto/tls"
"fmt"
"net/url"
"os"
"runtime"
"time"
"github.com/NodePassProject/cert"
"github.com/NodePassProject/logs"
"github.com/yosebyte/nodepass/internal"
)
// coreDispatch 根据URL方案分派到不同的运行模式
func coreDispatch(parsedURL *url.URL) {
var core interface{ Run() }
// start 启动核心逻辑
func start(args []string) error {
if len(args) != 2 {
return fmt.Errorf("start: empty URL command")
}
switch scheme := parsedURL.Scheme; scheme {
case "server", "master":
tlsCode, tlsConfig := getTLSProtocol(parsedURL)
if scheme == "server" {
core = internal.NewServer(parsedURL, tlsCode, tlsConfig, logger)
} else {
core = internal.NewMaster(parsedURL, tlsCode, tlsConfig, logger, version)
}
case "client":
core = internal.NewClient(parsedURL, logger)
default:
logger.Error("Unknown core: %v", scheme)
getExitInfo()
parsedURL, err := url.Parse(args[1])
if err != nil {
return fmt.Errorf("start: parse URL failed: %v", err)
}
logger := initLogger(parsedURL.Query().Get("log"))
core, err := createCore(parsedURL, logger)
if err != nil {
return fmt.Errorf("start: create core failed: %v", err)
}
core.Run()
return nil
}
// initLogger 初始化日志记录器
func initLogger(level string) *logs.Logger {
logger := logs.NewLogger(logs.Info, true)
switch level {
case "none":
logger.SetLogLevel(logs.None)
case "debug":
logger.SetLogLevel(logs.Debug)
logger.Debug("Init log level: DEBUG")
case "warn":
logger.SetLogLevel(logs.Warn)
logger.Warn("Init log level: WARN")
case "error":
logger.SetLogLevel(logs.Error)
logger.Error("Init log level: ERROR")
case "event":
logger.SetLogLevel(logs.Event)
logger.Event("Init log level: EVENT")
default:
}
return logger
}
// createCore 创建核心
func createCore(parsedURL *url.URL, logger *logs.Logger) (interface{ Run() }, error) {
switch parsedURL.Scheme {
case "server":
tlsCode, tlsConfig := getTLSProtocol(parsedURL, logger)
return internal.NewServer(parsedURL, tlsCode, tlsConfig, logger)
case "client":
return internal.NewClient(parsedURL, logger)
case "master":
tlsCode, tlsConfig := getTLSProtocol(parsedURL, logger)
return internal.NewMaster(parsedURL, tlsCode, tlsConfig, logger, version)
default:
return nil, fmt.Errorf("unknown core: %v", parsedURL)
}
}
// getTLSProtocol 获取TLS配置
func getTLSProtocol(parsedURL *url.URL) (string, *tls.Config) {
func getTLSProtocol(parsedURL *url.URL, logger *logs.Logger) (string, *tls.Config) {
// 生成基本TLS配置
tlsConfig, err := cert.NewTLSConfig(version)
if err != nil {
@@ -99,3 +142,40 @@ func getTLSProtocol(parsedURL *url.URL) (string, *tls.Config) {
return "0", nil
}
}
// exit 退出程序并显示帮助信息
func exit(err error) {
errMsg1, errMsg2 := "", ""
if err != nil {
errStr := "FAILED: " + err.Error()
if len(errStr) > 35 {
errMsg1 = errStr[:35]
if len(errStr) > 70 {
errMsg2 = errStr[35:67] + "..."
} else {
errMsg2 = errStr[35:]
}
} else {
errMsg1 = errStr
}
}
fmt.Printf(`
╭─────────────────────────────────────╮
│ ░░█▀█░█▀█░░▀█░█▀▀░█▀█░█▀█░█▀▀░█▀▀░░ │
│ ░░█░█░█░█░█▀█░█▀▀░█▀▀░█▀█░▀▀█░▀▀█░░ │
│ ░░▀░▀░▀▀▀░▀▀▀░▀▀▀░▀░░░▀░▀░▀▀▀░▀▀▀░░ │
├─────────────────────────────────────┤
│%*s │
│%*s │
├─────────────────────────────────────┤
│ server://password@host/host?<query> │
│ client://password@host/host?<query> │
│ master://hostname:port/path?<query> │
├─────────────────────────────────────┤
│ %-35s │
│ %-35s │
╰─────────────────────────────────────╯
`, 36, version, 36, fmt.Sprintf("%s/%s", runtime.GOOS, runtime.GOARCH), errMsg1, errMsg2)
os.Exit(1)
}

View File

@@ -1,84 +1,11 @@
package main
import (
"net/url"
"os"
"runtime"
import "os"
"github.com/NodePassProject/logs"
)
var version = "dev"
var (
// 全局日志记录器
logger = logs.NewLogger(logs.Info, true)
// 程序版本
version = "dev"
)
// main 程序入口
func main() {
parsedURL := getParsedURL(os.Args)
initLogLevel(parsedURL.Query().Get("log"))
coreDispatch(parsedURL)
}
// getParsedURL 解析URL参数
func getParsedURL(args []string) *url.URL {
if len(args) < 2 {
getExitInfo()
}
parsedURL, err := url.Parse(args[1])
if err != nil {
logger.Error("URL parse: %v", err)
getExitInfo()
}
return parsedURL
}
// initLogLevel 初始化日志级别
func initLogLevel(level string) {
switch level {
case "none":
logger.SetLogLevel(logs.None)
case "debug":
logger.SetLogLevel(logs.Debug)
logger.Debug("Init log level: DEBUG")
case "warn":
logger.SetLogLevel(logs.Warn)
logger.Warn("Init log level: WARN")
case "error":
logger.SetLogLevel(logs.Error)
logger.Error("Init log level: ERROR")
case "event":
logger.SetLogLevel(logs.Event)
logger.Event("Init log level: EVENT")
default:
logger.SetLogLevel(logs.Info)
logger.Info("Init log level: INFO")
if err := start(os.Args); err != nil {
exit(err)
}
}
// getExitInfo 输出帮助信息并退出程序
func getExitInfo() {
logger.SetLogLevel(logs.Info)
logger.Info(`Version: %v %v/%v
╭─────────────────────────────────────────────╮
│ ░░█▀█░█▀█░░▀█░█▀▀░█▀█░█▀█░█▀▀░█▀▀░░ │
│ ░░█░█░█░█░█▀█░█▀▀░█▀▀░█▀█░▀▀█░▀▀█░░ │
│ ░░▀░▀░▀▀▀░▀▀▀░▀▀▀░▀░░░▀░▀░▀▀▀░▀▀▀░░ │
├─────────────────────────────────────────────┤
│ >Universal TCP/UDP Tunneling Solution │
│ >https://github.com/yosebyte/nodepass │
├─────────────────────────────────────────────┤
│ Usage: nodepass "<your-unique-URL-command>" │
├─────────────────────────────────────────────┤
│ server://password@host/host?<query>&<query> │
│ client://password@host/host?<query>&<query> │
│ master://hostname:port/path?<query>&<query> │
╰─────────────────────────────────────────────╯
`, version, runtime.GOOS, runtime.GOARCH)
os.Exit(1)
}

View File

@@ -42,6 +42,7 @@ nodepass "master://0.0.0.0:9090/admin?log=info&tls=1"
| `/instances/{id}` | DELETE | Delete instance |
| `/events` | GET | SSE real-time event stream |
| `/info` | GET | Get master service info |
| `/info` | POST | Update master alias |
| `/tcping` | GET | TCP connection test |
| `/openapi.json` | GET | OpenAPI specification |
| `/docs` | GET | Swagger UI documentation |
@@ -64,7 +65,13 @@ API Key authentication is enabled by default, automatically generated and saved
"type": "client|server",
"status": "running|stopped|error",
"url": "...",
"config": "server://0.0.0.0:8080/localhost:3000?log=info&tls=1&max=1024&mode=0&read=1h&rate=0&slot=65536&proxy=0",
"restart": true,
"tags": [
{"key": "environment", "value": "production"},
{"key": "region", "value": "us-west-2"},
{"key": "project", "value": "web-service"}
],
"mode": 0,
"ping": 0,
"pool": 0,
@@ -81,7 +88,9 @@ API Key authentication is enabled by default, automatically generated and saved
- `ping`/`pool`: Health check data
- `tcps`/`udps`: Current active connection count statistics
- `tcprx`/`tcptx`/`udprx`/`udptx`: Cumulative traffic statistics
- `config`: Instance configuration URL with complete startup configuration
- `restart`: Auto-restart policy
- `tags`: Optional key-value pairs for labeling and organizing instances
### Instance URL Format
@@ -514,6 +523,31 @@ To properly manage lifecycles:
return data.success;
}
// Update instance tags
async function updateInstanceTags(instanceId, tags) {
const response = await fetch(`${API_URL}/instances/${instanceId}`, {
method: 'PATCH',
headers: {
'Content-Type': 'application/json',
'X-API-Key': apiKey // If API Key is enabled
},
body: JSON.stringify({ tags })
});
const data = await response.json();
return data.success;
}
// Delete specific tags by setting their values to empty string
async function deleteInstanceTags(instanceId, tagKeys) {
const tagsToDelete = {};
tagKeys.forEach(key => {
tagsToDelete[key] = ""; // Empty string removes the tag
});
return await updateInstanceTags(instanceId, tagsToDelete);
}
// Update instance URL configuration
async function updateInstanceURL(instanceId, newURL) {
const response = await fetch(`${API_URL}/instances/${instanceId}`, {
@@ -530,7 +564,44 @@ To properly manage lifecycles:
}
```
5. **Auto-restart Policy Management**: Configure automatic startup behavior
5. **Tag Management**: Label and organize instances using key-value pairs
```javascript
// Update instance tags via PATCH method
async function updateInstanceTags(instanceId, tags) {
const response = await fetch(`${API_URL}/instances/${instanceId}`, {
method: 'PATCH',
headers: {
'Content-Type': 'application/json',
'X-API-Key': apiKey
},
body: JSON.stringify({ tags })
});
return response.json();
}
// Add or update tags - existing tags are preserved unless specified
await updateInstanceTags('abc123', [
{"key": "environment", "value": "production"}, // Add/update
{"key": "region", "value": "us-west-2"}, // Add/update
{"key": "team", "value": "backend"} // Add/update
]);
// Delete specific tags by setting empty values
await updateInstanceTags('abc123', [
{"key": "old-tag", "value": ""}, // Delete this tag
{"key": "temp-env", "value": ""} // Delete this tag
]);
// Mix operations: add/update some tags, delete others
await updateInstanceTags('abc123', [
{"key": "environment", "value": "staging"}, // Update existing
{"key": "version", "value": "2.0"}, // Add new
{"key": "deprecated", "value": ""} // Delete existing
]);
```
6. **Auto-restart Policy Management**: Configure automatic startup behavior
```javascript
async function setAutoStartPolicy(instanceId, enableAutoStart) {
const response = await fetch(`${API_URL}/instances/${instanceId}`, {
@@ -782,6 +853,17 @@ async function configureAutoStartPolicies(instances) {
}
```
### Tag Management Rules
1. **Merge-based Updates**: Tags are processed with merge logic - existing tags are preserved unless explicitly updated or deleted
2. **Array Format**: Tags use array format `[{"key": "key1", "value": "value1"}]`
3. **Key Filtering**: Empty keys are automatically filtered out and rejected by validation
4. **Delete by Empty Value**: Set `value: ""` (empty string) to delete an existing tag key
5. **Add/Update Logic**: Non-empty values will add new tags or update existing ones
6. **Uniqueness Check**: Duplicate keys are not allowed within the same tag operation
7. **Limits**: Maximum 50 tags, key names ≤100 characters, values ≤500 characters
8. **Persistence**: All tag operations are automatically saved to disk and restored after restart
## Instance Data Structure
The instance object in API responses contains the following fields:
@@ -793,7 +875,13 @@ The instance object in API responses contains the following fields:
"type": "server", // Instance type: server or client
"status": "running", // Instance status: running, stopped, or error
"url": "server://...", // Instance configuration URL
"config": "server://0.0.0.0:8080/localhost:3000?log=info&tls=1&max=1024&mode=0&read=1h&rate=0&slot=65536&proxy=0", // Complete configuration URL
"restart": true, // Auto-restart policy
"tags": [ // Tag array
{"key": "environment", "value": "production"},
{"key": "project", "value": "web-service"},
{"key": "region", "value": "us-west-2"}
],
"mode": 0, // Instance mode
"tcprx": 1024, // TCP received bytes
"tcptx": 2048, // TCP transmitted bytes
@@ -804,8 +892,31 @@ The instance object in API responses contains the following fields:
**Note:**
- `alias` field is optional, empty string if not set
- `config` field contains the instance's complete configuration URL, auto-generated by the system
- `mode` field indicates the current runtime mode of the instance
- `restart` field controls the auto-restart behavior of the instance
- `tags` field is optional, only included when tags are present
### Instance Configuration Field
NodePass Master automatically maintains the `config` field for each instance:
- **Auto Generation**: Automatically generated when instances are created and updated, no manual maintenance required
- **Complete Configuration**: Contains the instance's complete URL with all default parameters
- **Configuration Inheritance**: log and tls configurations are inherited from master settings
- **Default Parameters**: Other parameters use system defaults
- **Read-Only Nature**: Auto-generated field that cannot be directly modified through the API
**Example config field value:**
```
server://0.0.0.0:8080/localhost:3000?log=info&tls=1&max=1024&mode=0&read=1h&rate=0&slot=65536&proxy=0
```
This feature is particularly useful for:
- Configuration backup and export
- Instance configuration integrity checks
- Automated deployment scripts
- Configuration documentation generation
## System Information Endpoint
@@ -825,13 +936,14 @@ The response contains the following system information fields:
```json
{
"alias": "dev", // Master alias
"os": "linux", // Operating system type
"arch": "amd64", // System architecture
"cpu": 45, // CPU usage percentage (Linux only)
"mem_total": 8589934592, // Total memory in bytes (Linux only)
"mem_free": 2684354560, // Free memory in bytes (Linux only)
"mem_used": 2684354560, // Used memory in bytes (Linux only)
"swap_total": 3555328000, // Total swap space in bytes (Linux only)
"swap_free": 3555328000, // Free swap space in bytes (Linux only)
"swap_used": 3555328000, // Used swap space in bytes (Linux only)
"netrx": 1048576000, // Network received bytes (cumulative, Linux only)
"nettx": 2097152000, // Network transmitted bytes (cumulative, Linux only)
"diskr": 4194304000, // Disk read bytes (cumulative, Linux only)
@@ -879,12 +991,16 @@ function displaySystemStatus() {
console.log(`CPU usage: ${info.cpu}%`);
}
if (info.mem_total > 0) {
const memUsagePercent = ((info.mem_total - info.mem_free) / info.mem_total * 100).toFixed(1);
console.log(`Memory usage: ${memUsagePercent}% (${(info.mem_free / 1024 / 1024 / 1024).toFixed(1)}GB free of ${(info.mem_total / 1024 / 1024 / 1024).toFixed(1)}GB total)`);
const memUsagePercent = (info.mem_used / info.mem_total * 100).toFixed(1);
const memFreeGB = ((info.mem_total - info.mem_used) / 1024 / 1024 / 1024).toFixed(1);
const memTotalGB = (info.mem_total / 1024 / 1024 / 1024).toFixed(1);
console.log(`Memory usage: ${memUsagePercent}% (${memFreeGB}GB free of ${memTotalGB}GB total)`);
}
if (info.swap_total > 0) {
const swapUsagePercent = ((info.swap_total - info.swap_free) / info.swap_total * 100).toFixed(1);
console.log(`Swap usage: ${swapUsagePercent}% (${(info.swap_free / 1024 / 1024 / 1024).toFixed(1)}GB free of ${(info.swap_total / 1024 / 1024 / 1024).toFixed(1)}GB total)`);
const swapUsagePercent = (info.swap_used / info.swap_total * 100).toFixed(1);
const swapFreeGB = ((info.swap_total - info.swap_used) / 1024 / 1024 / 1024).toFixed(1);
const swapTotalGB = (info.swap_total / 1024 / 1024 / 1024).toFixed(1);
console.log(`Swap usage: ${swapUsagePercent}% (${swapFreeGB}GB free of ${swapTotalGB}GB total)`);
}
} else {
console.log('CPU, memory, swap space, network I/O, disk I/O, and system uptime monitoring is only available on Linux systems');
@@ -910,8 +1026,8 @@ function displaySystemStatus() {
- **Log level verification**: Ensure current log level meets expectations
- **Resource monitoring**: On Linux systems, monitor CPU, memory, swap space, network I/O, disk I/O usage to ensure optimal performance
- CPU usage is calculated by parsing `/proc/stat` (percentage of non-idle time)
- Memory information is obtained by parsing `/proc/meminfo` (total and free memory in bytes)
- Swap space information is obtained by parsing `/proc/meminfo` (total and free swap space in bytes)
- Memory information is obtained by parsing `/proc/meminfo` (total and used memory in bytes, calculated as total minus available)
- Swap space information is obtained by parsing `/proc/meminfo` (total and used swap space in bytes, calculated as total minus free)
- Network I/O is calculated by parsing `/proc/net/dev` (cumulative bytes, excluding virtual interfaces)
- Disk I/O is calculated by parsing `/proc/diskstats` (cumulative bytes, major devices only)
- System uptime is obtained by parsing `/proc/uptime`
@@ -987,13 +1103,12 @@ const instance = await fetch(`${API_URL}/instances/abc123`, {
```
#### PATCH /instances/{id}
- **Description**: Update instance state, alias, or perform control operations
- **Description**: Update instance state, alias, tags, or perform control operations
- **Authentication**: Requires API Key
- **Request body**: `{ "alias": "new alias", "action": "start|stop|restart|reset", "restart": true|false }`
- **Features**: Does not interrupt running instances, only updates specified fields. `action: "reset"` can reset traffic statistics (tcprx, tcptx, udprx, udptx) for the instance to zero.
- **Request body**: `{ "alias": "new alias", "action": "start|stop|restart|reset", "restart": true|false, "tags": [{"key": "key1", "value": "value1"}] }`
- **Example**:
```javascript
// Update alias and auto-restart policy
// Update tags
await fetch(`${API_URL}/instances/abc123`, {
method: 'PATCH',
headers: {
@@ -1001,29 +1116,27 @@ await fetch(`${API_URL}/instances/abc123`, {
'X-API-Key': apiKey
},
body: JSON.stringify({
alias: 'Web Server',
restart: true
tags: [
{"key": "environment", "value": "production"},
{"key": "region", "value": "us-west-2"}
]
})
});
// Control instance operations
// Update and delete tags in one operation
await fetch(`${API_URL}/instances/abc123`, {
method: 'PATCH',
headers: {
'Content-Type': 'application/json',
'X-API-Key': apiKey
},
body: JSON.stringify({ action: 'restart' })
});
// Reset traffic statistics
await fetch(`${API_URL}/instances/abc123`, {
method: 'PATCH',
headers: {
'Content-Type': 'application/json',
'X-API-Key': apiKey
},
body: JSON.stringify({ action: 'reset' })
body: JSON.stringify({
tags: [
{"key": "environment", "value": "staging"}, // Update existing
{"key": "version", "value": "2.0"}, // Add new
{"key": "old-tag", "value": ""} // Delete existing
]
})
});
```
@@ -1074,6 +1187,28 @@ await fetch(`${API_URL}/instances/abc123`, {
- **Authentication**: Requires API Key
- **Response**: Contains system information, version, uptime, CPU and RAM usage, etc.
#### POST /info
- **Description**: Update master alias
- **Authentication**: Requires API Key
- **Request body**: `{ "alias": "new alias" }`
- **Response**: Complete master information (same as GET /info)
- **Example**:
```javascript
// Update master alias
const response = await fetch(`${API_URL}/info`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-API-Key': apiKey
},
body: JSON.stringify({ alias: 'My NodePass Server' })
});
const data = await response.json();
console.log('Updated alias:', data.alias);
// Response contains full system info with updated alias
```
#### GET /tcping
- **Description**: TCP connection test, checks connectivity and latency to target address
- **Authentication**: Requires API Key
@@ -1130,9 +1265,10 @@ Examples:
| `tls` | TLS encryption level | `0`(none), `1`(self-signed), `2`(certificate) | `0` | Server only |
| `crt` | Certificate path | File path | None | Server only |
| `key` | Private key path | File path | None | Server only |
| `mode` | Runtime mode control | `0`(auto), `1`(force mode 1), `2`(force mode 2) | `0` | Both |
| `min` | Minimum pool capacity | Integer > 0 | `64` | Client dual-end handshake mode only |
| `max` | Maximum pool capacity | Integer > 0 | `1024` | Dual-end handshake mode |
| `mode` | Runtime mode control | `0`(auto), `1`(force mode 1), `2`(force mode 2) | `0` | Both |
| `read` | Read timeout duration | Time duration (e.g., `10m`, `30s`, `1h`) | `10m` | Both |
| `rate` | Bandwidth rate limit | Integer (Mbps), 0=unlimited | `0` | Both |
| `slot` | Connection slot count | Integer (1-65536) | `65536` | Both |
| `proxy` | PROXY protocol support | `0`(disabled), `1`(enabled) | `0` | Both |

View File

@@ -254,19 +254,19 @@ nodepass "server://0.0.0.0:10101/0.0.0.0:8080?log=info&tls=1&proxy=1&rate=100"
NodePass allows flexible configuration via URL query parameters. The following table shows which parameters are applicable in server, client, and master modes:
| Parameter | Description | server | client | master |
|-----------|----------------------|:------:|:------:|:------:|
| `log` | Log level | O | O | O |
| `tls` | TLS encryption mode | O | X | O |
| `crt` | Custom certificate path| O | X | O |
| `key` | Custom key path | O | X | O |
| `min` | Minimum pool capacity | X | O | X |
| `max` | Maximum pool capacity | O | X | X |
| `mode` | Run mode control | O | O | X |
| `read` | Data read timeout | O | O | X |
| `rate` | Bandwidth rate limit | O | O | X |
| `slot` | Maximum connection limit | O | O | X |
| `proxy` | PROXY protocol support| O | O | X |
| Parameter | Description | Default | server | client | master |
|-----------|----------------------|---------|:------:|:------:|:------:|
| `log` | Log level | `info` | O | O | O |
| `tls` | TLS encryption mode | `0` | O | X | O |
| `crt` | Custom certificate path| N/A | O | X | O |
| `key` | Custom key path | N/A | O | X | O |
| `min` | Minimum pool capacity | `64` | X | O | X |
| `max` | Maximum pool capacity | `1024` | O | X | X |
| `mode` | Run mode control | `0` | O | O | X |
| `read` | Data read timeout | `1h` | O | O | X |
| `rate` | Bandwidth rate limit | `0` | O | O | X |
| `slot` | Maximum connection limit | `65536` | O | O | X |
| `proxy` | PROXY protocol support| `0` | O | O | X |
- O: Parameter is valid and recommended for configuration
- X: Parameter is not applicable and should be ignored
@@ -285,6 +285,7 @@ NodePass behavior can be fine-tuned using environment variables. Below is the co
| Variable | Description | Default | Example |
|----------|-------------|---------|---------|
| `NP_SEMAPHORE_LIMIT` | Signal channel buffer size | 65536 | `export NP_SEMAPHORE_LIMIT=2048` |
| `NP_TCP_DATA_BUF_SIZE` | Buffer size for TCP data transfer | 32768 | `export NP_TCP_DATA_BUF_SIZE=65536` |
| `NP_UDP_DATA_BUF_SIZE` | Buffer size for UDP packets | 2048 | `export NP_UDP_DATA_BUF_SIZE=16384` |
| `NP_HANDSHAKE_TIMEOUT` | Timeout for handshake operations | 10s | `export NP_HANDSHAKE_TIMEOUT=30s` |
| `NP_TCP_DIAL_TIMEOUT` | Timeout for establishing TCP connections | 30s | `export NP_TCP_DIAL_TIMEOUT=60s` |
@@ -348,6 +349,11 @@ For applications relying heavily on UDP traffic:
For optimizing TCP connections:
- `NP_TCP_DATA_BUF_SIZE`: Buffer size for TCP data transfer
- Default (32768) provides good balance for most applications
- Increase for high-throughput applications requiring larger buffers
- Consider increasing to 65536 or higher for bulk data transfers and streaming
- `NP_TCP_DIAL_TIMEOUT`: Timeout for establishing TCP connections
- Default (30s) is suitable for most network conditions
- Increase for unstable network conditions
@@ -401,6 +407,7 @@ Environment variables:
export NP_MIN_POOL_INTERVAL=50ms
export NP_MAX_POOL_INTERVAL=500ms
export NP_SEMAPHORE_LIMIT=8192
export NP_TCP_DATA_BUF_SIZE=65536
export NP_UDP_DATA_BUF_SIZE=32768
export NP_POOL_GET_TIMEOUT=60s
export NP_REPORT_INTERVAL=10s

View File

@@ -327,7 +327,7 @@ NodePass uses tunnel keys to authenticate connections between clients and server
The handshake process between client and server is as follows:
1. **Client Connection**: Client connects to the server's tunnel address
2. **Key Authentication**: Client sends XOR-encrypted tunnel key
2. **Key Authentication**: Client sends encrypted tunnel key
3. **Server Verification**: Server decrypts and verifies if the key matches
4. **Configuration Sync**: Upon successful verification, server sends tunnel configuration including:
- Data flow direction

View File

@@ -42,6 +42,7 @@ nodepass "master://0.0.0.0:9090/admin?log=info&tls=1"
| `/instances/{id}` | DELETE | 删除实例 |
| `/events` | GET | SSE 实时事件流 |
| `/info` | GET | 获取主控服务信息 |
| `/info` | POST | 更新主控别名 |
| `/tcping` | GET | TCP连接测试 |
| `/openapi.json` | GET | OpenAPI 规范 |
| `/docs` | GET | Swagger UI 文档 |
@@ -64,7 +65,13 @@ API Key 认证默认启用,首次启动自动生成并保存在 `nodepass.gob`
"type": "client|server",
"status": "running|stopped|error",
"url": "...",
"config": "server://0.0.0.0:8080/localhost:3000?log=info&tls=1&max=1024&mode=0&read=1h&rate=0&slot=65536&proxy=0",
"restart": true,
"tags": [
{"key": "environment", "value": "production"},
{"key": "region", "value": "us-west-2"},
{"key": "project", "value": "web-service"}
],
"mode": 0,
"ping": 0,
"pool": 0,
@@ -81,7 +88,9 @@ API Key 认证默认启用,首次启动自动生成并保存在 `nodepass.gob`
- `ping`/`pool`:健康检查数据
- `tcps`/`udps`:当前活动连接数统计
- `tcprx`/`tcptx`/`udprx`/`udptx`:累计流量统计
- `config`实例配置URL包含完整的启动配置
- `restart`:自启动策略
- `tags`:可选的键值对,用于标记和组织实例
### 实例 URL 格式
@@ -112,6 +121,7 @@ API Key 认证默认启用,首次启动自动生成并保存在 `nodepass.gob`
- 所有实例、流量、健康检查、别名、自启动策略均持久化存储,重启后自动恢复
- API 详细规范见 `/openapi.json`Swagger UI 见 `/docs`
```javascript
// 重新生成API Key需要知道当前的API Key
async function regenerateApiKey() {
@@ -513,6 +523,31 @@ NodePass主控模式提供自动备份功能定期备份状态文件以防止
return data.success;
}
// 更新实例标签
async function updateInstanceTags(instanceId, tags) {
const response = await fetch(`${API_URL}/instances/${instanceId}`, {
method: 'PATCH',
headers: {
'Content-Type': 'application/json',
'X-API-Key': apiKey // 如果启用了API Key
},
body: JSON.stringify({ tags })
});
const data = await response.json();
return data.success;
}
// 通过设置空字符串值删除特定标签
async function deleteInstanceTags(instanceId, tagKeys) {
const tagsToDelete = {};
tagKeys.forEach(key => {
tagsToDelete[key] = ""; // 空字符串会删除标签
});
return await updateInstanceTags(instanceId, tagsToDelete);
}
// 更新实例URL配置
async function updateInstanceURL(instanceId, newURL) {
const response = await fetch(`${API_URL}/instances/${instanceId}`, {
@@ -529,7 +564,44 @@ NodePass主控模式提供自动备份功能定期备份状态文件以防止
}
```
5. **自启动策略管理**:配置自动启动行为
5. **标签管理**:使用键值对标记和组织实例
```javascript
// 通过PATCH方法更新实例标签
async function updateInstanceTags(instanceId, tags) {
const response = await fetch(`${API_URL}/instances/${instanceId}`, {
method: 'PATCH',
headers: {
'Content-Type': 'application/json',
'X-API-Key': apiKey
},
body: JSON.stringify({ tags })
});
return response.json();
}
// 添加或更新标签 - 现有标签会保留,除非被显式指定
await updateInstanceTags('abc123', [
{"key": "environment", "value": "production"}, // 添加/更新
{"key": "region", "value": "us-west-2"}, // 添加/更新
{"key": "team", "value": "backend"} // 添加/更新
]);
// 通过设置空值删除特定标签
await updateInstanceTags('abc123', [
{"key": "old-tag", "value": ""}, // 删除此标签
{"key": "temp-env", "value": ""} // 删除此标签
]);
// 混合操作:添加/更新某些标签,删除其他标签
await updateInstanceTags('abc123', [
{"key": "environment", "value": "staging"}, // 更新现有
{"key": "version", "value": "2.0"}, // 添加新的
{"key": "deprecated", "value": ""} // 删除现有
]);
```
6. **自启动策略管理**:配置自动启动行为
```javascript
async function setAutoStartPolicy(instanceId, enableAutoStart) {
const response = await fetch(`${API_URL}/instances/${instanceId}`, {
@@ -781,6 +853,17 @@ async function configureAutoStartPolicies(instances) {
}
```
### 标签管理规则
1. **合并式更新**:标签采用合并逻辑处理 - 现有标签会保留,除非被显式更新或删除
2. **数组格式**:标签使用数组格式 `[{"key": "key1", "value": "value1"}]`
3. **键过滤**:空键会被自动过滤并被验证拒绝
4. **空值删除**:设置 `value: ""` (空字符串)可删除现有标签键
5. **添加/更新逻辑**:非空值会添加新标签或更新现有标签
6. **唯一性检查**:同一标签操作中不允许重复的键名
7. **限制**最多50个标签键名长度≤100字符值长度≤500字符
8. **持久化**:所有标签操作自动保存到磁盘,重启后恢复
## 实例数据结构
API响应中的实例对象包含以下字段
@@ -792,7 +875,13 @@ API响应中的实例对象包含以下字段
"type": "server", // 实例类型server 或 client
"status": "running", // 实例状态running、stopped 或 error
"url": "server://...", // 实例配置URL
"config": "server://0.0.0.0:8080/localhost:3000?log=info&tls=1&max=1024&mode=0&read=1h&rate=0&slot=65536&proxy=0", // 完整配置URL
"restart": true, // 自启动策略
"tags": [ // 标签数组
{"key": "environment", "value": "production"},
{"key": "project", "value": "web-service"},
{"key": "region", "value": "us-west-2"}
],
"mode": 0, // 运行模式
"tcprx": 1024, // TCP接收字节数
"tcptx": 2048, // TCP发送字节数
@@ -803,8 +892,31 @@ API响应中的实例对象包含以下字段
**注意:**
- `alias` 字段为可选,如果未设置则为空字符串
- `config` 字段包含实例的完整配置URL由系统自动生成
- `mode` 字段表示实例当前的运行模式
- `restart` 字段控制实例的自启动行为
- `tags` 字段为可选,仅在设置标签时存在
### 实例配置字段
NodePass主控会自动为每个实例维护 `config` 字段:
- **自动生成**:在实例创建和更新时自动生成,无需手动维护
- **完整配置**包含实例的完整URL带有所有默认参数
- **配置继承**log和tls配置继承自主控设置
- **默认参数**:其他参数使用系统默认值
- **只读性质**自动生成的字段通过API无法直接修改
**示例 config 字段值:**
```
server://0.0.0.0:8080/localhost:3000?log=info&tls=1&max=1024&mode=0&read=1h&rate=0&slot=65536&proxy=0
```
此功能特别适用于:
- 配置备份和导出
- 实例配置的完整性检查
- 自动化部署脚本
- 配置文档生成
## 系统信息端点
@@ -824,13 +936,14 @@ GET /info
```json
{
"alias": "dev", // 主控别名
"os": "linux", // 操作系统类型
"arch": "amd64", // 系统架构
"cpu": 45, // CPU使用率百分比仅Linux系统
"mem_total": 8589934592, // 内存容量字节仅Linux系统
"mem_free": 2684354560, // 内存字节仅Linux系统
"mem_used": 2684354560, // 内存字节仅Linux系统
"swap_total": 3555328000, // 交换区总量字节仅Linux系统
"swap_free": 3555328000, // 交换区字节仅Linux系统
"swap_used": 3555328000, // 交换区字节仅Linux系统
"netrx": 1048576000, // 网络接收字节数累计值仅Linux
"nettx": 2097152000, // 网络发送字节数累计值仅Linux
"diskr": 4194304000, // 磁盘读取字节数累计值仅Linux
@@ -878,12 +991,16 @@ function displaySystemStatus() {
console.log(`CPU使用率: ${info.cpu}%`);
}
if (info.mem_total > 0) {
const memUsagePercent = ((info.mem_total - info.mem_free) / info.mem_total * 100).toFixed(1);
console.log(`内存使用率: ${memUsagePercent}% (${(info.mem_free / 1024 / 1024 / 1024).toFixed(1)}GB 可用,共 ${(info.mem_total / 1024 / 1024 / 1024).toFixed(1)}GB)`);
const memUsagePercent = (info.mem_used / info.mem_total * 100).toFixed(1);
const memFreeGB = ((info.mem_total - info.mem_used) / 1024 / 1024 / 1024).toFixed(1);
const memTotalGB = (info.mem_total / 1024 / 1024 / 1024).toFixed(1);
console.log(`内存使用率: ${memUsagePercent}% (${memFreeGB}GB 可用,共 ${memTotalGB}GB)`);
}
if (info.swap_total > 0) {
const swapUsagePercent = ((info.swap_total - info.swap_free) / info.swap_total * 100).toFixed(1);
console.log(`交换区使用率: ${swapUsagePercent}% (${(info.swap_free / 1024 / 1024 / 1024).toFixed(1)}GB 可用,共 ${(info.swap_total / 1024 / 1024 / 1024).toFixed(1)}GB)`);
const swapUsagePercent = (info.swap_used / info.swap_total * 100).toFixed(1);
const swapFreeGB = ((info.swap_total - info.swap_used) / 1024 / 1024 / 1024).toFixed(1);
const swapTotalGB = (info.swap_total / 1024 / 1024 / 1024).toFixed(1);
console.log(`交换区使用率: ${swapUsagePercent}% (${swapFreeGB}GB 可用,共 ${swapTotalGB}GB)`);
}
} else {
console.log('CPU、内存、交换区、网络I/O、磁盘I/O和系统运行时间监控功能仅在Linux系统上可用');
@@ -909,8 +1026,8 @@ function displaySystemStatus() {
- **日志级别验证**:确认当前日志级别符合预期
- **资源监控**在Linux系统上监控CPU、内存、交换区、网络I/O、磁盘I/O使用情况以确保最佳性能
- CPU使用率通过解析`/proc/stat`计算(非空闲时间百分比)
- 内存信息通过解析`/proc/meminfo`获取(总量和用量,单位为字节)
- 交换区信息通过解析`/proc/meminfo`获取(总量和用量,单位为字节)
- 内存信息通过解析`/proc/meminfo`获取(总量和用量,单位为字节,已用量计算为总量减去可用量
- 交换区信息通过解析`/proc/meminfo`获取(总量和用量,单位为字节,已用量计算为总量减去空闲量
- 网络I/O通过解析`/proc/net/dev`计算(累计字节数,排除虚拟接口)
- 磁盘I/O通过解析`/proc/diskstats`计算(累计字节数,仅统计主设备)
- 系统运行时间通过解析`/proc/uptime`获取
@@ -986,13 +1103,12 @@ const instance = await fetch(`${API_URL}/instances/abc123`, {
```
#### PATCH /instances/{id}
- **描述**:更新实例状态、别名或执行控制操作
- **描述**:更新实例状态、别名、标签或执行控制操作
- **认证**需要API Key
- **请求体**`{ "alias": "新别名", "action": "start|stop|restart|reset", "restart": true|false }`
- **特点**:不中断正在运行的实例,仅更新指定字段。`action: "reset"` 可将该实例的流量统计tcprx、tcptx、udprx、udptx清零。
- **请求体**`{ "alias": "新别名", "action": "start|stop|restart|reset", "restart": true|false, "tags": [{"key": "键", "value": "值"}] }`
- **示例**
```javascript
// 更新别名和自启动策略
// 更新标签
await fetch(`${API_URL}/instances/abc123`, {
method: 'PATCH',
headers: {
@@ -1000,29 +1116,27 @@ await fetch(`${API_URL}/instances/abc123`, {
'X-API-Key': apiKey
},
body: JSON.stringify({
alias: 'Web服务器',
restart: true
tags: [
{"key": "environment", "value": "production"},
{"key": "region", "value": "us-west-2"}
]
})
});
// 控制实例操作
// 一次操作中更新和删除标签
await fetch(`${API_URL}/instances/abc123`, {
method: 'PATCH',
headers: {
'Content-Type': 'application/json',
'X-API-Key': apiKey
},
body: JSON.stringify({ action: 'restart' })
});
// 清零流量统计
await fetch(`${API_URL}/instances/abc123`, {
method: 'PATCH',
headers: {
'Content-Type': 'application/json',
'X-API-Key': apiKey
},
body: JSON.stringify({ action: 'reset' })
body: JSON.stringify({
tags: [
{"key": "environment", "value": "staging"}, // 更新现有
{"key": "version", "value": "2.0"}, // 添加新的
{"key": "old-tag", "value": ""} // 删除现有
]
})
});
```
@@ -1073,6 +1187,28 @@ await fetch(`${API_URL}/instances/abc123`, {
- **认证**需要API Key
- **响应**包含系统信息、版本、运行时间、CPU和RAM使用率等
#### POST /info
- **描述**:更新主控别名
- **认证**需要API Key
- **请求体**`{ "alias": "新别名" }`
- **响应**完整的主控信息与GET /info相同
- **示例**
```javascript
// 更新主控别名
const response = await fetch(`${API_URL}/info`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-API-Key': apiKey
},
body: JSON.stringify({ alias: '我的NodePass服务器' })
});
const data = await response.json();
console.log('更新后的别名:', data.alias);
// 响应包含完整的系统信息,包括更新后的别名
```
#### GET /tcping
- **描述**TCP连接测试检测目标地址的连通性和延迟
- **认证**需要API Key
@@ -1129,9 +1265,10 @@ client://<server_host>:<server_port>/<local_host>:<local_port>?<parameters>
| `tls` | TLS加密级别 | `0`(无), `1`(自签名), `2`(证书) | `0` | 仅服务器 |
| `crt` | 证书路径 | 文件路径 | 无 | 仅服务器 |
| `key` | 私钥路径 | 文件路径 | 无 | 仅服务器 |
| `mode` | 运行模式控制 | `0`(自动), `1`(强制模式1), `2`(强制模式2) | `0` | 两者 |
| `min` | 最小连接池容量 | 整数 > 0 | `64` | 仅客户端双端握手模式 |
| `max` | 最大连接池容量 | 整数 > 0 | `1024` | 双端握手模式 |
| `mode` | 运行模式控制 | `0`(自动), `1`(强制模式1), `2`(强制模式2) | `0` | 两者 |
| `read` | 读取超时时间 | 时间长度 (如 `10m`, `30s`, `1h`) | `10m` | 两者 |
| `rate` | 带宽速率限制 | 整数 (Mbps), 0=无限制 | `0` | 两者 |
| `proxy` | PROXY协议支持 | `0`(禁用), `1`(启用) | `0` | 两者 |
| `slot` | 连接槽位数 | 整数 (1-65536) | `65536` | 两者 |
| `proxy` | PROXY协议支持 | `0`(禁用), `1`(启用) | `0` | 两者 |

View File

@@ -254,19 +254,19 @@ nodepass "server://0.0.0.0:10101/0.0.0.0:8080?log=info&tls=1&proxy=1&rate=100"
NodePass支持通过URL查询参数进行灵活配置不同参数在 server、client、master 模式下的适用性如下表:
| 参数 | 说明 | server | client | master |
|-----------|----------------------|:------:|:------:|:------:|
| `log` | 日志级别 | O | O | O |
| `tls` | TLS加密模式 | O | X | O |
| `crt` | 自定义证书路径 | O | X | O |
| `key` | 自定义密钥路径 | O | X | O |
| `min` | 最小连接池容量 | X | O | X |
| `max` | 最大连接池容量 | O | X | X |
| `mode` | 运行模式控制 | O | O | X |
| `read` | 读取超时时间 | O | O | X |
| `rate` | 带宽速率限制 | O | O | X |
| `slot` | 最大连接数限制 | O | O | X |
| `proxy` | PROXY协议支持 | O | O | X |
| 参数 | 说明 | 默认值 | server | client | master |
|-----------|----------------------|-----------|:------:|:------:|:------:|
| `log` | 日志级别 | `info` | O | O | O |
| `tls` | TLS加密模式 | `0` | O | X | O |
| `crt` | 自定义证书路径 | N/A | O | X | O |
| `key` | 自定义密钥路径 | N/A | O | X | O |
| `min` | 最小连接池容量 | `64` | X | O | X |
| `max` | 最大连接池容量 | `1024` | O | X | X |
| `mode` | 运行模式控制 | `0` | O | O | X |
| `read` | 读取超时时间 | `1h` | O | O | X |
| `rate` | 带宽速率限制 | `0` | O | O | X |
| `slot` | 最大连接数限制 | `65536` | O | O | X |
| `proxy` | PROXY协议支持 | `0` | O | O | X |
- O参数有效推荐根据实际场景配置
@@ -286,6 +286,7 @@ NodePass支持通过URL查询参数进行灵活配置不同参数在 server
| 变量 | 描述 | 默认值 | 示例 |
|----------|-------------|---------|---------|
| `NP_SEMAPHORE_LIMIT` | 信号缓冲区大小 | 65536 | `export NP_SEMAPHORE_LIMIT=2048` |
| `NP_TCP_DATA_BUF_SIZE` | TCP数据传输缓冲区大小 | 32768 | `export NP_TCP_DATA_BUF_SIZE=65536` |
| `NP_UDP_DATA_BUF_SIZE` | UDP数据包缓冲区大小 | 2048 | `export NP_UDP_DATA_BUF_SIZE=16384` |
| `NP_HANDSHAKE_TIMEOUT` | 握手操作超时 | 10s | `export NP_HANDSHAKE_TIMEOUT=30s` |
| `NP_TCP_DIAL_TIMEOUT` | TCP连接建立超时 | 30s | `export NP_TCP_DIAL_TIMEOUT=60s` |
@@ -349,6 +350,11 @@ NodePass支持通过URL查询参数进行灵活配置不同参数在 server
对于TCP连接的优化
- `NP_TCP_DATA_BUF_SIZE`TCP数据传输缓冲区大小
- 默认值(32768)为大多数应用提供良好平衡
- 对于需要大缓冲区的高吞吐量应用增加此值
- 考虑为批量数据传输和流媒体增加到65536或更高
- `NP_TCP_DIAL_TIMEOUT`TCP连接建立超时
- 默认值(30s)适用于大多数网络条件
- 对于网络条件不稳定的环境增加此值
@@ -402,6 +408,7 @@ nodepass "client://server.example.com:10101/127.0.0.1:8080?min=128&rate=500&slot
export NP_MIN_POOL_INTERVAL=50ms
export NP_MAX_POOL_INTERVAL=500ms
export NP_SEMAPHORE_LIMIT=8192
export NP_TCP_DATA_BUF_SIZE=65536
export NP_UDP_DATA_BUF_SIZE=32768
export NP_POOL_GET_TIMEOUT=60s
export NP_REPORT_INTERVAL=10s

View File

@@ -330,7 +330,7 @@ NodePass使用隧道密钥来验证客户端和服务端之间的连接。密钥
客户端与服务端的握手过程如下:
1. **客户端连接**:客户端连接到服务端的隧道地址
2. **密钥验证**:客户端发送XOR加密的隧道密钥
2. **密钥验证**:客户端发送加密的隧道密钥
3. **服务端验证**:服务端解密并验证密钥是否匹配
4. **配置同步**:验证成功后,服务端发送隧道配置信息,包括:
- 数据流向模式

View File

@@ -4,7 +4,7 @@ go 1.25.0
require (
github.com/NodePassProject/cert v1.0.1
github.com/NodePassProject/conn v1.0.12
github.com/NodePassProject/conn v1.0.15
github.com/NodePassProject/logs v1.0.3
github.com/NodePassProject/pool v1.0.24
github.com/NodePassProject/pool v1.0.30
)

View File

@@ -1,8 +1,8 @@
github.com/NodePassProject/cert v1.0.1 h1:BDy2tTOudy6yk7hvcmScAJMw4NrpCdSCsbuu7hHsIuw=
github.com/NodePassProject/cert v1.0.1/go.mod h1:wP7joOJeQAIlIuOUmhHPwMExjuwGa4XApMWQYChGSrk=
github.com/NodePassProject/conn v1.0.12 h1:63Ueb//leEptKNU8MXlrEDF3exJiitZefmJAEyMLnt4=
github.com/NodePassProject/conn v1.0.12/go.mod h1:xfQ7ZLUxrtdLsljGHYYCToW+Hdg6DAbmL1Cs94n5h6E=
github.com/NodePassProject/conn v1.0.15 h1:YJaWphxGO4EZGdel/Lw4taLcb2hmHefjJipkilHn0B4=
github.com/NodePassProject/conn v1.0.15/go.mod h1:xfQ7ZLUxrtdLsljGHYYCToW+Hdg6DAbmL1Cs94n5h6E=
github.com/NodePassProject/logs v1.0.3 h1:CDUZVQ477vmmFQHazrQCWM0gJPNINm0C2N3FzC4jVyw=
github.com/NodePassProject/logs v1.0.3/go.mod h1:TwtPXOzLtb8iH+fdduQjEEywICXivsM39cy9AinMSks=
github.com/NodePassProject/pool v1.0.24 h1:8DSgZ2dxnzYXgplp9ZwZlpwFiKIWU4JYW1glUm+hq/4=
github.com/NodePassProject/pool v1.0.24/go.mod h1:joQFk1oocg56QpJ1QK/2g5Jv/AyqYUQgPXMG1gWe8iA=
github.com/NodePassProject/pool v1.0.30 h1:EupeGn6nTOCzybQMHWmEphPjR7CTEptgOFvkD0UMw6Q=
github.com/NodePassProject/pool v1.0.30/go.mod h1:joQFk1oocg56QpJ1QK/2g5Jv/AyqYUQgPXMG1gWe8iA=

View File

@@ -3,7 +3,6 @@ package internal
import (
"bufio"
"bytes"
"context"
"fmt"
"io"
@@ -13,6 +12,7 @@ import (
"os/signal"
"strconv"
"strings"
"sync"
"syscall"
"time"
@@ -28,25 +28,39 @@ type Client struct {
}
// NewClient 创建新的客户端实例
func NewClient(parsedURL *url.URL, logger *logs.Logger) *Client {
func NewClient(parsedURL *url.URL, logger *logs.Logger) (*Client, error) {
client := &Client{
Common: Common{
logger: logger,
signalChan: make(chan string, semaphoreLimit),
tcpBufferPool: &sync.Pool{
New: func() any {
buf := make([]byte, tcpDataBufSize)
return &buf
},
},
udpBufferPool: &sync.Pool{
New: func() any {
buf := make([]byte, udpDataBufSize)
return &buf
},
},
},
tunnelName: parsedURL.Hostname(),
}
client.initConfig(parsedURL)
if err := client.initConfig(parsedURL); err != nil {
return nil, fmt.Errorf("newClient: initConfig failed: %w", err)
}
client.initRateLimiter()
return client
return client, nil
}
// Run 管理客户端生命周期
func (c *Client) Run() {
logInfo := func(prefix string) {
c.logger.Info("%v: %v@%v/%v?min=%v&mode=%v&read=%v&rate=%v&slot=%v",
c.logger.Info("%v: client://%v@%v/%v?min=%v&mode=%v&read=%v&rate=%v&slot=%v&proxy=%v",
prefix, c.tunnelKey, c.tunnelTCPAddr, c.targetTCPAddr,
c.minPoolCapacity, c.runMode, c.readTimeout, c.rateLimit/125000, c.slotLimit)
c.minPoolCapacity, c.runMode, c.readTimeout, c.rateLimit/125000, c.slotLimit, c.proxyProtocol)
}
logInfo("Client started")
@@ -168,7 +182,7 @@ func (c *Client) tunnelHandshake() error {
c.tunnelTCPConn.SetKeepAlivePeriod(reportInterval)
// 发送隧道密钥
_, err = c.tunnelTCPConn.Write(append(c.xor([]byte(c.tunnelKey)), '\n'))
_, err = c.tunnelTCPConn.Write(c.encode([]byte(c.tunnelKey)))
if err != nil {
return fmt.Errorf("tunnelHandshake: write tunnel key failed: %w", err)
}
@@ -179,8 +193,14 @@ func (c *Client) tunnelHandshake() error {
return fmt.Errorf("tunnelHandshake: readBytes failed: %w", err)
}
// 解码隧道URL
tunnelURLData, err := c.decode(rawTunnelURL)
if err != nil {
return fmt.Errorf("tunnelHandshake: decode tunnel URL failed: %w", err)
}
// 解析隧道URL
tunnelURL, err := url.Parse(string(c.xor(bytes.TrimSuffix(rawTunnelURL, []byte{'\n'}))))
tunnelURL, err := url.Parse(string(tunnelURLData))
if err != nil {
return fmt.Errorf("tunnelHandshake: parse tunnel URL failed: %w", err)
}

View File

@@ -5,6 +5,7 @@ import (
"bufio"
"bytes"
"context"
"encoding/base64"
"encoding/hex"
"fmt"
"hash/fnv"
@@ -48,6 +49,8 @@ type Common struct {
rateLimiter *conn.RateLimiter // 全局限速器
readTimeout time.Duration // 读取超时
bufReader *bufio.Reader // 缓冲读取器
tcpBufferPool *sync.Pool // TCP缓冲区池
udpBufferPool *sync.Pool // UDP缓冲区池
signalChan chan string // 信号通道
checkPoint time.Time // 检查点时间
slotLimit int32 // 槽位限制
@@ -64,6 +67,7 @@ type Common struct {
// 配置变量,可通过环境变量调整
var (
semaphoreLimit = getEnvAsInt("NP_SEMAPHORE_LIMIT", 65536) // 信号量限制
tcpDataBufSize = getEnvAsInt("NP_TCP_DATA_BUF_SIZE", 32768) // TCP缓冲区大小
udpDataBufSize = getEnvAsInt("NP_UDP_DATA_BUF_SIZE", 2048) // UDP缓冲区大小
handshakeTimeout = getEnvAsDuration("NP_HANDSHAKE_TIMEOUT", 10*time.Second) // 握手超时
tcpDialTimeout = getEnvAsDuration("NP_TCP_DIAL_TIMEOUT", 30*time.Second) // TCP拨号超时
@@ -77,28 +81,40 @@ var (
ReloadInterval = getEnvAsDuration("NP_RELOAD_INTERVAL", 1*time.Hour) // 重载间隔
)
// UDP缓冲区池
var udpBufferPool = sync.Pool{
New: func() any {
b := make([]byte, udpDataBufSize)
return &b
},
// 默认配置
const (
defaultMinPool = 64 // 默认最小池容量
defaultMaxPool = 1024 // 默认最大池容量
defaultRunMode = "0" // 默认运行模式
defaultReadTimeout = 1 * time.Hour // 默认读取超时
defaultRateLimit = 0 // 默认速率限制
defaultSlotLimit = 65536 // 默认槽位限制
defaultProxyProtocol = "0" // 默认代理协议
)
// getTCPBuffer 获取TCP缓冲区
func (c *Common) getTCPBuffer() []byte {
buf := c.tcpBufferPool.Get().(*[]byte)
return (*buf)[:tcpDataBufSize]
}
// getUDPBuffer 从池中获取UDP缓冲区
func getUDPBuffer() []byte {
buf := udpBufferPool.Get().(*[]byte)
if cap(*buf) < udpDataBufSize {
b := make([]byte, udpDataBufSize)
return b
// putTCPBuffer 归还TCP缓冲区
func (c *Common) putTCPBuffer(buf []byte) {
if buf != nil && cap(buf) >= tcpDataBufSize {
c.tcpBufferPool.Put(&buf)
}
}
// getUDPBuffer 获取UDP缓冲区
func (c *Common) getUDPBuffer() []byte {
buf := c.udpBufferPool.Get().(*[]byte)
return (*buf)[:udpDataBufSize]
}
// putUDPBuffer UDP缓冲区归还到池中
func putUDPBuffer(buf []byte) {
if buf != nil {
udpBufferPool.Put(&buf)
// putUDPBuffer 归还UDP缓冲区
func (c *Common) putUDPBuffer(buf []byte) {
if buf != nil && cap(buf) >= udpDataBufSize {
c.udpBufferPool.Put(&buf)
}
}
@@ -166,6 +182,59 @@ func (c *Common) xor(data []byte) []byte {
return data
}
// encode base64编码数据
func (c *Common) encode(data []byte) []byte {
return append([]byte(base64.StdEncoding.EncodeToString(c.xor(data))), '\n')
}
// decode base64解码数据
func (c *Common) decode(data []byte) ([]byte, error) {
decoded, err := base64.StdEncoding.DecodeString(string(bytes.TrimSuffix(data, []byte{'\n'})))
if err != nil {
return nil, fmt.Errorf("decode: base64 decode failed: %w", err)
}
return c.xor(decoded), nil
}
// getAddress 解析和设置地址信息
func (c *Common) getAddress(parsedURL *url.URL) error {
// 解析隧道地址
tunnelAddr := parsedURL.Host
// 解析隧道TCP地址
if tunnelTCPAddr, err := net.ResolveTCPAddr("tcp", tunnelAddr); err == nil {
c.tunnelTCPAddr = tunnelTCPAddr
} else {
return fmt.Errorf("getAddress: resolveTCPAddr failed: %w", err)
}
// 解析隧道UDP地址
if tunnelUDPAddr, err := net.ResolveUDPAddr("udp", tunnelAddr); err == nil {
c.tunnelUDPAddr = tunnelUDPAddr
} else {
return fmt.Errorf("getAddress: resolveUDPAddr failed: %w", err)
}
// 处理目标地址
targetAddr := strings.TrimPrefix(parsedURL.Path, "/")
// 解析目标TCP地址
if targetTCPAddr, err := net.ResolveTCPAddr("tcp", targetAddr); err == nil {
c.targetTCPAddr = targetTCPAddr
} else {
return fmt.Errorf("getAddress: resolveTCPAddr failed: %w", err)
}
// 解析目标UDP地址
if targetUDPAddr, err := net.ResolveUDPAddr("udp", targetAddr); err == nil {
c.targetUDPAddr = targetUDPAddr
} else {
return fmt.Errorf("getAddress: resolveUDPAddr failed: %w", err)
}
return nil
}
// getTunnelKey 从URL中获取隧道密钥
func (c *Common) getTunnelKey(parsedURL *url.URL) {
if key := parsedURL.User.Username(); key != "" {
@@ -177,43 +246,6 @@ func (c *Common) getTunnelKey(parsedURL *url.URL) {
}
}
// getAddress 解析和设置地址信息
func (c *Common) getAddress(parsedURL *url.URL) {
// 解析隧道地址
tunnelAddr := parsedURL.Host
// 解析隧道TCP地址
if tunnelTCPAddr, err := net.ResolveTCPAddr("tcp", tunnelAddr); err == nil {
c.tunnelTCPAddr = tunnelTCPAddr
} else {
c.logger.Error("getAddress: resolveTCPAddr failed: %v", err)
}
// 解析隧道UDP地址
if tunnelUDPAddr, err := net.ResolveUDPAddr("udp", tunnelAddr); err == nil {
c.tunnelUDPAddr = tunnelUDPAddr
} else {
c.logger.Error("getAddress: resolveUDPAddr failed: %v", err)
}
// 处理目标地址
targetAddr := strings.TrimPrefix(parsedURL.Path, "/")
// 解析目标TCP地址
if targetTCPAddr, err := net.ResolveTCPAddr("tcp", targetAddr); err == nil {
c.targetTCPAddr = targetTCPAddr
} else {
c.logger.Error("getAddress: resolveTCPAddr failed: %v", err)
}
// 解析目标UDP地址
if targetUDPAddr, err := net.ResolveUDPAddr("udp", targetAddr); err == nil {
c.targetUDPAddr = targetUDPAddr
} else {
c.logger.Error("getAddress: resolveUDPAddr failed: %v", err)
}
}
// getPoolCapacity 获取连接池容量设置
func (c *Common) getPoolCapacity(parsedURL *url.URL) {
if min := parsedURL.Query().Get("min"); min != "" {
@@ -221,7 +253,7 @@ func (c *Common) getPoolCapacity(parsedURL *url.URL) {
c.minPoolCapacity = value
}
} else {
c.minPoolCapacity = 64
c.minPoolCapacity = defaultMinPool
}
if max := parsedURL.Query().Get("max"); max != "" {
@@ -229,7 +261,16 @@ func (c *Common) getPoolCapacity(parsedURL *url.URL) {
c.maxPoolCapacity = value
}
} else {
c.maxPoolCapacity = 1024
c.maxPoolCapacity = defaultMaxPool
}
}
// getRunMode 获取运行模式
func (c *Common) getRunMode(parsedURL *url.URL) {
if mode := parsedURL.Query().Get("mode"); mode != "" {
c.runMode = mode
} else {
c.runMode = defaultRunMode
}
}
@@ -240,16 +281,7 @@ func (c *Common) getReadTimeout(parsedURL *url.URL) {
c.readTimeout = value
}
} else {
c.readTimeout = 1 * time.Hour
}
}
// getRunMode 获取运行模式
func (c *Common) getRunMode(parsedURL *url.URL) {
if mode := parsedURL.Query().Get("mode"); mode != "" {
c.runMode = mode
} else {
c.runMode = "0"
c.readTimeout = defaultReadTimeout
}
}
@@ -260,7 +292,7 @@ func (c *Common) getRateLimit(parsedURL *url.URL) {
c.rateLimit = value * 125000
}
} else {
c.rateLimit = 0
c.rateLimit = defaultRateLimit
}
}
@@ -271,7 +303,7 @@ func (c *Common) getSlotLimit(parsedURL *url.URL) {
c.slotLimit = int32(value)
}
} else {
c.slotLimit = 65536
c.slotLimit = defaultSlotLimit
}
}
@@ -280,20 +312,25 @@ func (c *Common) getProxyProtocol(parsedURL *url.URL) {
if protocol := parsedURL.Query().Get("proxy"); protocol != "" {
c.proxyProtocol = protocol
} else {
c.proxyProtocol = "0"
c.proxyProtocol = defaultProxyProtocol
}
}
// initConfig 初始化配置
func (c *Common) initConfig(parsedURL *url.URL) {
func (c *Common) initConfig(parsedURL *url.URL) error {
if err := c.getAddress(parsedURL); err != nil {
return err
}
c.getTunnelKey(parsedURL)
c.getAddress(parsedURL)
c.getPoolCapacity(parsedURL)
c.getReadTimeout(parsedURL)
c.getRunMode(parsedURL)
c.getReadTimeout(parsedURL)
c.getRateLimit(parsedURL)
c.getSlotLimit(parsedURL)
c.getProxyProtocol(parsedURL)
return nil
}
// sendProxyV1Header 发送PROXY v1
@@ -517,7 +554,19 @@ func (c *Common) commonQueue() error {
if err != nil {
return fmt.Errorf("commonQueue: readBytes failed: %w", err)
}
signal := string(c.xor(bytes.TrimSuffix(rawSignal, []byte{'\n'})))
// 解码信号
signalData, err := c.decode(rawSignal)
if err != nil {
c.logger.Error("commonQueue: decode signal failed: %v", err)
select {
case <-c.ctx.Done():
return fmt.Errorf("commonQueue: context error: %w", c.ctx.Err())
case <-time.After(50 * time.Millisecond):
}
continue
}
signal := string(signalData)
// 将信号发送到通道
select {
@@ -550,7 +599,7 @@ func (c *Common) healthCheck() error {
// 连接池健康度检查
if c.tunnelPool.ErrorCount() > c.tunnelPool.Active()/2 {
// 发送刷新信号到对端
_, err := c.tunnelTCPConn.Write(append(c.xor([]byte(flushURL.String())), '\n'))
_, err := c.tunnelTCPConn.Write(c.encode([]byte(flushURL.String())))
if err != nil {
c.mu.Unlock()
return fmt.Errorf("healthCheck: write flush signal failed: %w", err)
@@ -569,7 +618,7 @@ func (c *Common) healthCheck() error {
// 发送PING信号
c.checkPoint = time.Now()
_, err := c.tunnelTCPConn.Write(append(c.xor([]byte(pingURL.String())), '\n'))
_, err := c.tunnelTCPConn.Write(c.encode([]byte(pingURL.String())))
if err != nil {
c.mu.Unlock()
return fmt.Errorf("healthCheck: write ping signal failed: %w", err)
@@ -674,7 +723,7 @@ func (c *Common) commonTCPLoop() {
}
c.mu.Lock()
_, err = c.tunnelTCPConn.Write(append(c.xor([]byte(launchURL.String())), '\n'))
_, err = c.tunnelTCPConn.Write(c.encode([]byte(launchURL.String())))
c.mu.Unlock()
if err != nil {
@@ -684,9 +733,16 @@ func (c *Common) commonTCPLoop() {
c.logger.Debug("TCP launch signal: cid %v -> %v", id, c.tunnelTCPConn.RemoteAddr())
buffer1 := c.getTCPBuffer()
buffer2 := c.getTCPBuffer()
defer func() {
c.putTCPBuffer(buffer1)
c.putTCPBuffer(buffer2)
}()
// 交换数据
c.logger.Debug("Starting exchange: %v <-> %v", remoteConn.LocalAddr(), targetConn.LocalAddr())
c.logger.Debug("Exchange complete: %v", conn.DataExchange(remoteConn, targetConn, c.readTimeout))
c.logger.Debug("Exchange complete: %v", conn.DataExchange(remoteConn, targetConn, c.readTimeout, buffer1, buffer2))
}(targetConn)
}
}
@@ -698,17 +754,17 @@ func (c *Common) commonUDPLoop() {
return
}
buffer := getUDPBuffer()
buffer := c.getUDPBuffer()
// 读取来自目标的UDP数据
x, clientAddr, err := c.targetUDPConn.ReadFromUDP(buffer)
if err != nil {
if c.ctx.Err() != nil || err == net.ErrClosed {
putUDPBuffer(buffer)
c.putUDPBuffer(buffer)
return
}
c.logger.Error("commonUDPLoop: readFromUDP failed: %v", err)
putUDPBuffer(buffer)
c.putUDPBuffer(buffer)
select {
case <-c.ctx.Done():
@@ -733,7 +789,7 @@ func (c *Common) commonUDPLoop() {
// 尝试获取UDP连接槽位
if !c.tryAcquireSlot(true) {
c.logger.Error("commonUDPLoop: UDP slot limit reached: %v/%v", c.udpSlot, c.slotLimit)
putUDPBuffer(buffer)
c.putUDPBuffer(buffer)
continue
}
@@ -760,8 +816,8 @@ func (c *Common) commonUDPLoop() {
c.releaseSlot(true)
}()
buffer := getUDPBuffer()
defer putUDPBuffer(buffer)
buffer := c.getUDPBuffer()
defer c.putUDPBuffer(buffer)
reader := &conn.TimeoutReader{Conn: remoteConn, Timeout: c.readTimeout}
for {
@@ -800,7 +856,7 @@ func (c *Common) commonUDPLoop() {
}
c.mu.Lock()
_, err = c.tunnelTCPConn.Write(append(c.xor([]byte(launchURL.String())), '\n'))
_, err = c.tunnelTCPConn.Write(c.encode([]byte(launchURL.String())))
c.mu.Unlock()
if err != nil {
c.logger.Error("commonUDPLoop: write launch signal failed: %v", err)
@@ -817,13 +873,13 @@ func (c *Common) commonUDPLoop() {
c.logger.Error("commonUDPLoop: write to tunnel failed: %v", err)
c.targetUDPSession.Delete(sessionKey)
remoteConn.Close()
putUDPBuffer(buffer)
c.putUDPBuffer(buffer)
continue
}
// 传输完成
c.logger.Debug("Transfer complete: %v <-> %v", remoteConn.LocalAddr(), c.targetUDPConn.LocalAddr())
putUDPBuffer(buffer)
c.putUDPBuffer(buffer)
}
}
@@ -878,7 +934,7 @@ func (c *Common) commonOnce() error {
go c.commonUDPOnce(signalURL)
case "i": // PING
c.mu.Lock()
_, err := c.tunnelTCPConn.Write(append(c.xor([]byte(pongURL.String())), '\n'))
_, err := c.tunnelTCPConn.Write(c.encode([]byte(pongURL.String())))
c.mu.Unlock()
if err != nil {
return fmt.Errorf("commonOnce: write pong signal failed: %w", err)
@@ -954,9 +1010,16 @@ func (c *Common) commonTCPOnce(signalURL *url.URL) {
return
}
buffer1 := c.getTCPBuffer()
buffer2 := c.getTCPBuffer()
defer func() {
c.putTCPBuffer(buffer1)
c.putTCPBuffer(buffer2)
}()
// 交换数据
c.logger.Debug("Starting exchange: %v <-> %v", remoteConn.LocalAddr(), targetConn.LocalAddr())
c.logger.Debug("Exchange complete: %v", conn.DataExchange(remoteConn, targetConn, c.readTimeout))
c.logger.Debug("Exchange complete: %v", conn.DataExchange(remoteConn, targetConn, c.readTimeout, buffer1, buffer2))
}
// commonUDPOnce 共用处理单个UDP请求
@@ -1013,8 +1076,8 @@ func (c *Common) commonUDPOnce(signalURL *url.URL) {
go func() {
defer func() { done <- struct{}{} }()
buffer := getUDPBuffer()
defer putUDPBuffer(buffer)
buffer := c.getUDPBuffer()
defer c.putUDPBuffer(buffer)
reader := &conn.TimeoutReader{Conn: remoteConn, Timeout: c.readTimeout}
for {
if c.ctx.Err() != nil {
@@ -1047,8 +1110,8 @@ func (c *Common) commonUDPOnce(signalURL *url.URL) {
go func() {
defer func() { done <- struct{}{} }()
buffer := getUDPBuffer()
defer putUDPBuffer(buffer)
buffer := c.getUDPBuffer()
defer c.putUDPBuffer(buffer)
reader := &conn.TimeoutReader{Conn: targetConn, Timeout: c.readTimeout}
for {
if c.ctx.Err() != nil {
@@ -1204,10 +1267,16 @@ func (c *Common) singleTCPLoop() error {
c.logger.Error("singleTCPLoop: sendProxyV1Header failed: %v", err)
return
}
buffer1 := c.getTCPBuffer()
buffer2 := c.getTCPBuffer()
defer func() {
c.putTCPBuffer(buffer1)
c.putTCPBuffer(buffer2)
}()
// 交换数据
c.logger.Debug("Starting exchange: %v <-> %v", tunnelConn.LocalAddr(), targetConn.LocalAddr())
c.logger.Debug("Exchange complete: %v", conn.DataExchange(tunnelConn, targetConn, c.readTimeout))
c.logger.Debug("Exchange complete: %v", conn.DataExchange(tunnelConn, targetConn, c.readTimeout, buffer1, buffer2))
}(tunnelConn)
}
}
@@ -1219,18 +1288,18 @@ func (c *Common) singleUDPLoop() error {
return fmt.Errorf("singleUDPLoop: context error: %w", c.ctx.Err())
}
buffer := getUDPBuffer()
buffer := c.getUDPBuffer()
// 读取来自隧道的UDP数据
x, clientAddr, err := c.tunnelUDPConn.ReadFromUDP(buffer)
if err != nil {
if c.ctx.Err() != nil || err == net.ErrClosed {
putUDPBuffer(buffer)
c.putUDPBuffer(buffer)
return fmt.Errorf("singleUDPLoop: context error: %w", c.ctx.Err())
}
c.logger.Error("singleUDPLoop: ReadFromUDP failed: %v", err)
putUDPBuffer(buffer)
c.putUDPBuffer(buffer)
select {
case <-c.ctx.Done():
return fmt.Errorf("singleUDPLoop: context error: %w", c.ctx.Err())
@@ -1253,7 +1322,7 @@ func (c *Common) singleUDPLoop() error {
// 尝试获取UDP连接槽位
if !c.tryAcquireSlot(true) {
c.logger.Error("singleUDPLoop: UDP slot limit reached: %v/%v", c.udpSlot, c.slotLimit)
putUDPBuffer(buffer)
c.putUDPBuffer(buffer)
continue
}
@@ -1262,7 +1331,7 @@ func (c *Common) singleUDPLoop() error {
if err != nil {
c.logger.Error("singleUDPLoop: dialTimeout failed: %v", err)
c.releaseSlot(true)
putUDPBuffer(buffer)
c.putUDPBuffer(buffer)
continue
}
targetConn = newSession
@@ -1277,8 +1346,8 @@ func (c *Common) singleUDPLoop() error {
c.releaseSlot(true)
}()
buffer := getUDPBuffer()
defer putUDPBuffer(buffer)
buffer := c.getUDPBuffer()
defer c.putUDPBuffer(buffer)
reader := &conn.TimeoutReader{Conn: targetConn, Timeout: c.readTimeout}
for {
@@ -1326,12 +1395,12 @@ func (c *Common) singleUDPLoop() error {
if targetConn != nil {
targetConn.Close()
}
putUDPBuffer(buffer)
c.putUDPBuffer(buffer)
return fmt.Errorf("singleUDPLoop: write to target failed: %w", err)
}
// 传输完成
c.logger.Debug("Transfer complete: %v <-> %v", targetConn.LocalAddr(), c.tunnelUDPConn.LocalAddr())
putUDPBuffer(buffer)
c.putUDPBuffer(buffer)
}
}

View File

@@ -38,6 +38,9 @@ const (
apiKeyID = "********" // API Key的特殊ID
tcpingSemLimit = 10 // TCPing最大并发数
baseDuration = 100 * time.Millisecond // 基准持续时间
maxTagsCount = 50 // 最大标签数量
maxTagKeyLen = 100 // 标签键最大长度
maxTagValueLen = 500 // 标签值最大长度
)
// Swagger UI HTML模板
@@ -64,6 +67,7 @@ const swaggerUIHTML = `<!DOCTYPE html>
// Master 实现主控模式功能
type Master struct {
Common // 继承通用功能
alias string // 主控别名
prefix string // API前缀
version string // NP版本
hostname string // 隧道名称
@@ -80,7 +84,7 @@ type Master struct {
notifyChannel chan *InstanceEvent // 事件通知通道
tcpingSem chan struct{} // TCPing并发控制
startTime time.Time // 启动时间
backupDone chan struct{} // 备份停止信号
periodicDone chan struct{} // 定期任务停止信号
}
// Instance 实例信息
@@ -90,7 +94,9 @@ type Instance struct {
Type string `json:"type"` // 实例类型
Status string `json:"status"` // 实例状态
URL string `json:"url"` // 实例URL
Config string `json:"config"` // 实例配置
Restart bool `json:"restart"` // 是否自启动
Tags []Tag `json:"tags"` // 标签数组
Mode int32 `json:"mode"` // 实例模式
Ping int32 `json:"ping"` // 端内延迟
Pool int32 `json:"pool"` // 池连接数
@@ -106,25 +112,32 @@ type Instance struct {
UDPTXBase uint64 `json:"-" gob:"-"` // UDP发送字节数基线不序列化
cmd *exec.Cmd `json:"-" gob:"-"` // 命令对象(不序列化)
stopped chan struct{} `json:"-" gob:"-"` // 停止信号通道(不序列化)
deleted bool `json:"-" gob:"-"` // 删除标志(不序列化)
cancelFunc context.CancelFunc `json:"-" gob:"-"` // 取消函数(不序列化)
lastCheckPoint time.Time `json:"-" gob:"-"` // 上次检查点时间(不序列化)
}
// Tag 标签结构体
type Tag struct {
Key string `json:"key"` // 标签键
Value string `json:"value"` // 标签值
}
// InstanceEvent 实例事件信息
type InstanceEvent struct {
Type string `json:"type"` // 事件类型initial, create, update, delete, shutdown, log
Time time.Time `json:"time"` // 事件时间
Instance *Instance `json:"instance"` // 关联的实例
Logs string `json:"logs,omitempty"` // 日志内容仅当Type为log时有效
Type string `json:"type"` // 事件类型initial, create, update, delete, shutdown, log
Time time.Time `json:"time"` // 事件时间
Instance *Instance `json:"instance"` // 关联的实例
Logs string `json:"logs"` // 日志内容
}
// SystemInfo 系统信息结构体
type SystemInfo struct {
CPU int `json:"cpu"` // CPU使用率 (%)
MemTotal uint64 `json:"mem_total"` // 内存容量字节数
MemFree uint64 `json:"mem_free"` // 内存用字节数
MemUsed uint64 `json:"mem_used"` // 内存用字节数
SwapTotal uint64 `json:"swap_total"` // 交换区容量字节数
SwapFree uint64 `json:"swap_free"` // 交换区用字节数
SwapUsed uint64 `json:"swap_used"` // 交换区用字节数
NetRX uint64 `json:"netrx"` // 网络接收字节数
NetTX uint64 `json:"nettx"` // 网络发送字节数
DiskR uint64 `json:"diskr"` // 磁盘读取字节数
@@ -191,6 +204,34 @@ func (m *Master) performTCPing(target string) *TCPingResult {
return result
}
// validateTags 验证标签的有效性
func validateTags(tags []Tag) error {
if len(tags) > maxTagsCount {
return fmt.Errorf("too many tags: maximum %d allowed", maxTagsCount)
}
keySet := make(map[string]bool)
for _, tag := range tags {
if len(tag.Key) == 0 {
return fmt.Errorf("tag key cannot be empty")
}
if len(tag.Key) > maxTagKeyLen {
return fmt.Errorf("tag key exceeds maximum length %d", maxTagKeyLen)
}
if len(tag.Value) > maxTagValueLen {
return fmt.Errorf("tag value for key exceeds maximum length %d", maxTagValueLen)
}
// 检查重复的键
if keySet[tag.Key] {
return fmt.Errorf("duplicate tag key: '%s'", tag.Key)
}
keySet[tag.Key] = true
}
return nil
}
// InstanceLogWriter 实例日志写入器
type InstanceLogWriter struct {
instanceID string // 实例ID
@@ -246,9 +287,11 @@ func (w *InstanceLogWriter) Write(p []byte) (n int, err error) {
}
w.instance.lastCheckPoint = time.Now()
w.master.instances.Store(w.instanceID, w.instance)
// 发送检查点更新事件
w.master.sendSSEEvent("update", w.instance)
// 仅当实例未被删除时才存储和发送更新事件
if !w.instance.deleted {
w.master.instances.Store(w.instanceID, w.instance)
w.master.sendSSEEvent("update", w.instance)
}
// 过滤检查点日志
continue
}
@@ -256,8 +299,10 @@ func (w *InstanceLogWriter) Write(p []byte) (n int, err error) {
// 输出日志加实例ID
fmt.Fprintf(w.target, "%s [%s]\n", line, w.instanceID)
// 发送日志事件
w.master.sendSSEEvent("log", w.instance, line)
// 仅当实例未被删除时才发送日志事件
if !w.instance.deleted {
w.master.sendSSEEvent("log", w.instance, line)
}
}
if err := scanner.Err(); err != nil {
@@ -274,12 +319,11 @@ func setCorsHeaders(w http.ResponseWriter) {
}
// NewMaster 创建新的主控实例
func NewMaster(parsedURL *url.URL, tlsCode string, tlsConfig *tls.Config, logger *logs.Logger, version string) *Master {
func NewMaster(parsedURL *url.URL, tlsCode string, tlsConfig *tls.Config, logger *logs.Logger, version string) (*Master, error) {
// 解析主机地址
host, err := net.ResolveTCPAddr("tcp", parsedURL.Host)
if err != nil {
logger.Error("newMaster: resolveTCPAddr failed: %v", err)
return nil
return nil, fmt.Errorf("newMaster: resolve host failed: %w", err)
}
// 获取隧道名称
@@ -319,7 +363,7 @@ func NewMaster(parsedURL *url.URL, tlsCode string, tlsConfig *tls.Config, logger
notifyChannel: make(chan *InstanceEvent, semaphoreLimit),
tcpingSem: make(chan struct{}, tcpingSemLimit),
startTime: time.Now(),
backupDone: make(chan struct{}),
periodicDone: make(chan struct{}),
}
master.tunnelTCPAddr = host
@@ -329,10 +373,7 @@ func NewMaster(parsedURL *url.URL, tlsCode string, tlsConfig *tls.Config, logger
// 启动事件分发器
go master.startEventDispatcher()
// 启动定期备份
go master.startPeriodicBackup()
return master
return master, nil
}
// Run 管理主控生命周期
@@ -449,6 +490,9 @@ func (m *Master) Run() {
}
}()
// 启动定期任务
go m.startPeriodicTasks()
// 处理系统信号
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
<-ctx.Done()
@@ -467,53 +511,15 @@ func (m *Master) Run() {
// Shutdown 关闭主控
func (m *Master) Shutdown(ctx context.Context) error {
return m.shutdown(ctx, func() {
// 声明一个已关闭通道的集合,避免重复关闭
var closedChannels sync.Map
var wg sync.WaitGroup
// 给所有订阅者一个关闭通知
m.subscribers.Range(func(key, value any) bool {
subscriberChan := value.(chan *InstanceEvent)
wg.Add(1)
go func(ch chan *InstanceEvent) {
defer wg.Done()
// 非阻塞的方式发送关闭事件
select {
case ch <- &InstanceEvent{
Type: "shutdown",
Time: time.Now(),
}:
default:
// 不可用,忽略
}
}(subscriberChan)
return true
})
// 等待所有订阅者处理完关闭事件
time.Sleep(baseDuration)
// 关闭所有订阅者通道
m.subscribers.Range(func(key, value any) bool {
subscriberChan := value.(chan *InstanceEvent)
// 检查通道是否已关闭,如果没有则关闭它
if _, loaded := closedChannels.LoadOrStore(subscriberChan, true); !loaded {
wg.Add(1)
go func(k any, ch chan *InstanceEvent) {
defer wg.Done()
close(ch)
m.subscribers.Delete(k)
}(key, subscriberChan)
}
return true
})
// 通知并关闭SSE连接
m.shutdownSSEConnections()
// 停止所有运行中的实例
var wg sync.WaitGroup
m.instances.Range(func(key, value any) bool {
instance := value.(*Instance)
// 如果实例正在运行,则停止它
if instance.Status == "running" && instance.cmd != nil && instance.cmd.Process != nil {
// 如果实例需要停止,则停止它
if instance.Status != "stopped" && instance.cmd != nil && instance.cmd.Process != nil {
wg.Add(1)
go func(inst *Instance) {
defer wg.Done()
@@ -525,8 +531,8 @@ func (m *Master) Shutdown(ctx context.Context) error {
wg.Wait()
// 关闭定期备份
close(m.backupDone)
// 关闭定期任务
close(m.periodicDone)
// 关闭事件通知通道,停止事件分发器
close(m.notifyChannel)
@@ -545,6 +551,107 @@ func (m *Master) Shutdown(ctx context.Context) error {
})
}
// startPeriodicTasks 启动所有定期任务
func (m *Master) startPeriodicTasks() {
go m.startPeriodicBackup()
go m.startPeriodicCleanup()
go m.startPeriodicRestart()
}
// startPeriodicBackup 启动定期备份
func (m *Master) startPeriodicBackup() {
for {
select {
case <-time.After(ReloadInterval):
// 固定备份文件名
backupPath := fmt.Sprintf("%s.backup", m.statePath)
if err := m.saveStateToPath(backupPath); err != nil {
m.logger.Error("startPeriodicBackup: backup state failed: %v", err)
} else {
m.logger.Info("State backup saved: %v", backupPath)
}
case <-m.periodicDone:
return
}
}
}
// startPeriodicCleanup 启动定期清理重复ID的实例
func (m *Master) startPeriodicCleanup() {
for {
select {
case <-time.After(reportInterval):
// 收集实例并按ID分组
idInstances := make(map[string][]*Instance)
m.instances.Range(func(key, value any) bool {
if id := key.(string); id != apiKeyID {
idInstances[id] = append(idInstances[id], value.(*Instance))
}
return true
})
// 清理重复实例
for _, instances := range idInstances {
if len(instances) <= 1 {
continue
}
// 选择保留实例
keepIdx := 0
for i, inst := range instances {
if inst.Status == "running" && instances[keepIdx].Status != "running" {
keepIdx = i
}
}
// 清理多余实例
for i, inst := range instances {
if i == keepIdx {
continue
}
inst.deleted = true
if inst.Status != "stopped" {
m.stopInstance(inst)
}
m.instances.Delete(inst.ID)
}
}
case <-m.periodicDone:
return
}
}
}
// startPeriodicRestart 启动定期错误实例重启
func (m *Master) startPeriodicRestart() {
for {
select {
case <-time.After(reportInterval):
// 收集所有error状态的实例
var errorInstances []*Instance
m.instances.Range(func(key, value any) bool {
if id := key.(string); id != apiKeyID {
instance := value.(*Instance)
if instance.Status == "error" && !instance.deleted {
errorInstances = append(errorInstances, instance)
}
}
return true
})
// 重启所有error状态的实例
for _, instance := range errorInstances {
m.stopInstance(instance)
time.Sleep(baseDuration)
m.startInstance(instance)
}
case <-m.periodicDone:
return
}
}
}
// saveState 保存实例状态到文件
func (m *Master) saveState() error {
return m.saveStateToPath(m.statePath)
@@ -618,27 +725,15 @@ func (m *Master) saveStateToPath(filePath string) error {
return nil
}
// startPeriodicBackup 启动定期备份
func (m *Master) startPeriodicBackup() {
for {
select {
case <-time.After(ReloadInterval):
// 固定备份文件名
backupPath := fmt.Sprintf("%s.backup", m.statePath)
if err := m.saveStateToPath(backupPath); err != nil {
m.logger.Error("startPeriodicBackup: backup state failed: %v", err)
} else {
m.logger.Info("State backup saved: %v", backupPath)
}
case <-m.backupDone:
return
}
}
}
// loadState 从文件加载实例状态
func (m *Master) loadState() {
// 清理旧的临时文件
if tmpFiles, _ := filepath.Glob(filepath.Join(filepath.Dir(m.statePath), "np-*.tmp")); tmpFiles != nil {
for _, f := range tmpFiles {
os.Remove(f)
}
}
// 检查文件是否存在
if _, err := os.Stat(m.statePath); os.IsNotExist(err) {
return
@@ -663,6 +758,12 @@ func (m *Master) loadState() {
// 恢复实例
for id, instance := range persistentData {
instance.stopped = make(chan struct{})
// 生成完整配置
if instance.Config == "" && instance.ID != apiKeyID {
instance.Config = m.generateConfigURL(instance)
}
m.instances.Store(id, instance)
// 处理自启动
@@ -679,31 +780,56 @@ func (m *Master) loadState() {
func (m *Master) handleOpenAPISpec(w http.ResponseWriter, r *http.Request) {
setCorsHeaders(w)
w.Header().Set("Content-Type", "application/json")
w.Write([]byte(generateOpenAPISpec()))
w.Write([]byte(m.generateOpenAPISpec()))
}
// handleSwaggerUI 处理Swagger UI请求
func (m *Master) handleSwaggerUI(w http.ResponseWriter, r *http.Request) {
setCorsHeaders(w)
w.Header().Set("Content-Type", "text/html")
fmt.Fprintf(w, swaggerUIHTML, generateOpenAPISpec())
fmt.Fprintf(w, swaggerUIHTML, m.generateOpenAPISpec())
}
// handleInfo 处理系统信息请求
func (m *Master) handleInfo(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
httpError(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
switch r.Method {
case http.MethodGet:
writeJSON(w, http.StatusOK, m.getMasterInfo())
case http.MethodPost:
var reqData struct {
Alias string `json:"alias"`
}
if err := json.NewDecoder(r.Body).Decode(&reqData); err != nil {
httpError(w, "Invalid request body", http.StatusBadRequest)
return
}
// 更新主控别名
if len(reqData.Alias) > maxTagKeyLen {
httpError(w, fmt.Sprintf("Master alias exceeds maximum length %d", maxTagKeyLen), http.StatusBadRequest)
return
}
m.alias = reqData.Alias
writeJSON(w, http.StatusOK, m.getMasterInfo())
default:
httpError(w, "Method not allowed", http.StatusMethodNotAllowed)
}
}
// getMasterInfo 获取完整的主控信息
func (m *Master) getMasterInfo() map[string]any {
info := map[string]any{
"alias": m.alias,
"os": runtime.GOOS,
"arch": runtime.GOARCH,
"cpu": -1,
"mem_total": uint64(0),
"mem_free": uint64(0),
"mem_used": uint64(0),
"swap_total": uint64(0),
"swap_free": uint64(0),
"swap_used": uint64(0),
"netrx": uint64(0),
"nettx": uint64(0),
"diskr": uint64(0),
@@ -722,9 +848,9 @@ func (m *Master) handleInfo(w http.ResponseWriter, r *http.Request) {
sysInfo := getLinuxSysInfo()
info["cpu"] = sysInfo.CPU
info["mem_total"] = sysInfo.MemTotal
info["mem_free"] = sysInfo.MemFree
info["mem_used"] = sysInfo.MemUsed
info["swap_total"] = sysInfo.SwapTotal
info["swap_free"] = sysInfo.SwapFree
info["swap_used"] = sysInfo.SwapUsed
info["netrx"] = sysInfo.NetRX
info["nettx"] = sysInfo.NetTX
info["diskr"] = sysInfo.DiskR
@@ -732,7 +858,7 @@ func (m *Master) handleInfo(w http.ResponseWriter, r *http.Request) {
info["sysup"] = sysInfo.SysUp
}
writeJSON(w, http.StatusOK, info)
return info
}
// getLinuxSysInfo 获取Linux系统信息
@@ -740,9 +866,9 @@ func getLinuxSysInfo() SystemInfo {
info := SystemInfo{
CPU: -1,
MemTotal: 0,
MemFree: 0,
MemUsed: 0,
SwapTotal: 0,
SwapFree: 0,
SwapUsed: 0,
NetRX: 0,
NetTX: 0,
DiskR: 0,
@@ -754,7 +880,7 @@ func getLinuxSysInfo() SystemInfo {
return info
}
// CPU使用率:解析/proc/stat
// CPU占用:解析/proc/stat
readStat := func() (idle, total uint64) {
data, err := os.ReadFile("/proc/stat")
if err != nil {
@@ -783,9 +909,9 @@ func getLinuxSysInfo() SystemInfo {
info.CPU = min(int((deltaTotal-deltaIdle)*100/deltaTotal/uint64(numCPU)), 100)
}
// RAM使用率:解析/proc/meminfo
// RAM占用:解析/proc/meminfo
if data, err := os.ReadFile("/proc/meminfo"); err == nil {
var memTotal, memFree, swapTotal, swapFree uint64
var memTotal, memAvailable, swapTotal, swapFree uint64
for line := range strings.SplitSeq(string(data), "\n") {
if fields := strings.Fields(line); len(fields) >= 2 {
if val, err := strconv.ParseUint(fields[1], 10, 64); err == nil {
@@ -793,8 +919,8 @@ func getLinuxSysInfo() SystemInfo {
switch fields[0] {
case "MemTotal:":
memTotal = val
case "MemFree:":
memFree = val
case "MemAvailable:":
memAvailable = val
case "SwapTotal:":
swapTotal = val
case "SwapFree:":
@@ -804,9 +930,9 @@ func getLinuxSysInfo() SystemInfo {
}
}
info.MemTotal = memTotal
info.MemFree = memFree
info.MemUsed = memTotal - memAvailable
info.SwapTotal = swapTotal
info.SwapFree = swapFree
info.SwapUsed = swapTotal - swapFree
}
// 网络I/O解析/proc/net/dev
@@ -914,9 +1040,12 @@ func (m *Master) handleInstances(w http.ResponseWriter, r *http.Request) {
Type: instanceType,
URL: m.enhanceURL(reqData.URL, instanceType),
Status: "stopped",
Restart: false,
Restart: true,
Tags: []Tag{},
stopped: make(chan struct{}),
}
instance.Config = m.generateConfigURL(instance)
m.instances.Store(id, instance)
// 启动实例
@@ -978,6 +1107,7 @@ func (m *Master) handlePatchInstance(w http.ResponseWriter, r *http.Request, id
Alias string `json:"alias,omitempty"`
Action string `json:"action,omitempty"`
Restart *bool `json:"restart,omitempty"`
Tags []Tag `json:"tags,omitempty"`
}
if err := json.NewDecoder(r.Body).Decode(&reqData); err == nil {
if id == apiKeyID {
@@ -988,6 +1118,44 @@ func (m *Master) handlePatchInstance(w http.ResponseWriter, r *http.Request, id
m.sendSSEEvent("update", instance)
}
} else {
// 处理标签更新
if reqData.Tags != nil {
if err := validateTags(reqData.Tags); err != nil {
httpError(w, err.Error(), http.StatusBadRequest)
return
}
// 创建现有标签的映射表
existingTags := make(map[string]Tag)
for _, tag := range instance.Tags {
existingTags[tag.Key] = tag
}
for _, tag := range reqData.Tags {
if tag.Value == "" {
// value为空删除key
delete(existingTags, tag.Key)
} else {
// value非空更新或添加key
existingTags[tag.Key] = tag
}
}
// 将映射表转换回标签数组
newTags := make([]Tag, 0, len(existingTags))
for _, tag := range existingTags {
newTags = append(newTags, tag)
}
instance.Tags = newTags
m.instances.Store(id, instance)
go m.saveState()
m.logger.Info("Tags updated: [%v]", instance.ID)
// 发送标签变更事件
m.sendSSEEvent("update", instance)
}
// 重置流量统计
if reqData.Action == "reset" {
instance.TCPRX = 0
@@ -1015,6 +1183,10 @@ func (m *Master) handlePatchInstance(w http.ResponseWriter, r *http.Request, id
// 更新实例别名
if reqData.Alias != "" && instance.Alias != reqData.Alias {
if len(reqData.Alias) > maxTagKeyLen {
httpError(w, fmt.Sprintf("Instance alias exceeds maximum length %d", maxTagKeyLen), http.StatusBadRequest)
return
}
instance.Alias = reqData.Alias
m.instances.Store(id, instance)
go m.saveState()
@@ -1072,8 +1244,8 @@ func (m *Master) handlePutInstance(w http.ResponseWriter, r *http.Request, id st
return
}
// 如果实例正在运行,先停止它
if instance.Status == "running" {
// 如果实例需要停止,先停止它
if instance.Status != "stopped" {
m.stopInstance(instance)
time.Sleep(baseDuration)
}
@@ -1081,6 +1253,7 @@ func (m *Master) handlePutInstance(w http.ResponseWriter, r *http.Request, id st
// 更新实例URL和类型
instance.URL = enhancedURL
instance.Type = instanceType
instance.Config = m.generateConfigURL(instance)
// 更新实例状态
instance.Status = "stopped"
@@ -1103,8 +1276,9 @@ func (m *Master) handlePutInstance(w http.ResponseWriter, r *http.Request, id st
func (m *Master) regenerateAPIKey(instance *Instance) {
instance.URL = generateAPIKey()
m.instances.Store(apiKeyID, instance)
go m.saveState()
m.logger.Info("API Key regenerated: %v", instance.URL)
go m.saveState()
go m.shutdownSSEConnections()
}
// processInstanceAction 处理实例操作
@@ -1115,19 +1289,15 @@ func (m *Master) processInstanceAction(instance *Instance, action string) {
go m.startInstance(instance)
}
case "stop":
if instance.Status == "running" {
if instance.Status != "stopped" {
go m.stopInstance(instance)
}
case "restart":
if instance.Status == "running" {
go func() {
m.stopInstance(instance)
time.Sleep(baseDuration)
m.startInstance(instance)
}()
} else {
go m.startInstance(instance)
}
go func() {
m.stopInstance(instance)
time.Sleep(baseDuration)
m.startInstance(instance)
}()
}
}
@@ -1139,7 +1309,11 @@ func (m *Master) handleDeleteInstance(w http.ResponseWriter, id string, instance
return
}
if instance.Status == "running" {
// 标记实例为已删除
instance.deleted = true
m.instances.Store(id, instance)
if instance.Status != "stopped" {
m.stopInstance(instance)
}
m.instances.Delete(id)
@@ -1202,12 +1376,14 @@ func (m *Master) handleSSE(w http.ResponseWriter, r *http.Request) {
// 客户端连接关闭标志
connectionClosed := make(chan struct{})
// 监听客户端连接是否关闭但不关闭通道留给Shutdown处理
// 监听客户端连接是否关闭
go func() {
<-ctx.Done()
close(connectionClosed)
// 从映射表中移除,但不关闭通道
m.subscribers.Delete(subscriberID)
// 从映射表中移除关闭通道
if ch, exists := m.subscribers.LoadAndDelete(subscriberID); exists {
close(ch.(chan *InstanceEvent))
}
}()
// 持续发送事件到客户端
@@ -1255,6 +1431,32 @@ func (m *Master) sendSSEEvent(eventType string, instance *Instance, logs ...stri
}
}
// shutdownSSEConnections 通知并关闭SSE连接
func (m *Master) shutdownSSEConnections() {
var wg sync.WaitGroup
// 发送shutdown通知并关闭通道
m.subscribers.Range(func(key, value any) bool {
ch := value.(chan *InstanceEvent)
wg.Add(1)
go func(subscriberID any, eventChan chan *InstanceEvent) {
defer wg.Done()
// 发送shutdown通知
select {
case eventChan <- &InstanceEvent{Type: "shutdown", Time: time.Now()}:
default:
}
// 从映射表中移除并关闭通道
if _, exists := m.subscribers.LoadAndDelete(subscriberID); exists {
close(eventChan)
}
}(key, ch)
return true
})
wg.Wait()
}
// startEventDispatcher 启动事件分发器
func (m *Master) startEventDispatcher() {
for event := range m.notifyChannel {
@@ -1345,9 +1547,7 @@ func (m *Master) startInstance(instance *Instance) {
// monitorInstance 监控实例状态
func (m *Master) monitorInstance(instance *Instance, cmd *exec.Cmd) {
done := make(chan error, 1)
go func() {
done <- cmd.Wait()
}()
go func() { done <- cmd.Wait() }()
for {
select {
@@ -1371,7 +1571,7 @@ func (m *Master) monitorInstance(instance *Instance, cmd *exec.Cmd) {
}
return
case <-time.After(reportInterval):
if !instance.lastCheckPoint.IsZero() && time.Since(instance.lastCheckPoint) > 5*reportInterval {
if !instance.lastCheckPoint.IsZero() && time.Since(instance.lastCheckPoint) > 3*reportInterval {
instance.Status = "error"
m.instances.Store(instance.ID, instance)
m.sendSSEEvent("update", instance)
@@ -1469,6 +1669,86 @@ func (m *Master) enhanceURL(instanceURL string, instanceType string) string {
return parsedURL.String()
}
// generateConfigURL 生成实例的完整URL
func (m *Master) generateConfigURL(instance *Instance) string {
parsedURL, err := url.Parse(instance.URL)
if err != nil {
m.logger.Error("generateConfigURL: invalid URL format: %v", err)
return instance.URL
}
query := parsedURL.Query()
// 设置日志级别
if m.logLevel != "" && query.Get("log") == "" {
query.Set("log", m.logLevel)
}
// 设置TLS配置
if instance.Type == "server" && m.tlsCode != "0" {
if query.Get("tls") == "" {
query.Set("tls", m.tlsCode)
}
// 为TLS code-2设置证书和密钥
if m.tlsCode == "2" {
if m.crtPath != "" && query.Get("crt") == "" {
query.Set("crt", m.crtPath)
}
if m.keyPath != "" && query.Get("key") == "" {
query.Set("key", m.keyPath)
}
}
}
// 根据实例类型设置默认参数
switch instance.Type {
case "client":
// client参数: min, mode, read, rate, slot, proxy
if query.Get("min") == "" {
query.Set("min", strconv.Itoa(defaultMinPool))
}
if query.Get("mode") == "" {
query.Set("mode", defaultRunMode)
}
if query.Get("read") == "" {
query.Set("read", defaultReadTimeout.String())
}
if query.Get("rate") == "" {
query.Set("rate", strconv.Itoa(defaultRateLimit))
}
if query.Get("slot") == "" {
query.Set("slot", strconv.Itoa(defaultSlotLimit))
}
if query.Get("proxy") == "" {
query.Set("proxy", defaultProxyProtocol)
}
case "server":
// server参数: max, mode, read, rate, slot, proxy
if query.Get("max") == "" {
query.Set("max", strconv.Itoa(defaultMaxPool))
}
if query.Get("mode") == "" {
query.Set("mode", defaultRunMode)
}
if query.Get("read") == "" {
query.Set("read", defaultReadTimeout.String())
}
if query.Get("rate") == "" {
query.Set("rate", strconv.Itoa(defaultRateLimit))
}
if query.Get("slot") == "" {
query.Set("slot", strconv.Itoa(defaultSlotLimit))
}
if query.Get("proxy") == "" {
query.Set("proxy", defaultProxyProtocol)
}
}
parsedURL.RawQuery = query.Encode()
return parsedURL.String()
}
// generateID 生成随机ID
func generateID() string {
bytes := make([]byte, 4)
@@ -1500,7 +1780,7 @@ func writeJSON(w http.ResponseWriter, statusCode int, data any) {
}
// generateOpenAPISpec 生成OpenAPI规范文档
func generateOpenAPISpec() string {
func (m *Master) generateOpenAPISpec() string {
return fmt.Sprintf(`{
"openapi": "3.1.1",
"info": {
@@ -1508,7 +1788,7 @@ func generateOpenAPISpec() string {
"description": "API for managing NodePass server and client instances",
"version": "%s"
},
"servers": [{"url": "/{prefix}/v1", "variables": {"prefix": {"default": "api", "description": "API prefix path"}}}],
"servers": [{"url": "%s"}],
"security": [{"ApiKeyAuth": []}],
"paths": {
"/instances": {
@@ -1606,6 +1886,17 @@ func generateOpenAPISpec() string {
"401": {"description": "Unauthorized"},
"405": {"description": "Method not allowed"}
}
},
"post": {
"summary": "Update master alias",
"security": [{"ApiKeyAuth": []}],
"requestBody": {"required": true, "content": {"application/json": {"schema": {"$ref": "#/components/schemas/UpdateMasterAliasRequest"}}}},
"responses": {
"200": {"description": "Success", "content": {"application/json": {"schema": {"$ref": "#/components/schemas/MasterInfo"}}}},
"400": {"description": "Invalid input"},
"401": {"description": "Unauthorized"},
"405": {"description": "Method not allowed"}
}
}
},
"/tcping": {
@@ -1664,7 +1955,9 @@ func generateOpenAPISpec() string {
"type": {"type": "string", "enum": ["client", "server"], "description": "Type of instance"},
"status": {"type": "string", "enum": ["running", "stopped", "error"], "description": "Instance status"},
"url": {"type": "string", "description": "Command string or API Key"},
"config": {"type": "string", "description": "Instance configuration URL"},
"restart": {"type": "boolean", "description": "Restart policy"},
"tags": {"type": "array", "items": {"$ref": "#/components/schemas/Tag"}, "description": "Tag array"},
"mode": {"type": "integer", "description": "Instance mode"},
"ping": {"type": "integer", "description": "TCPing latency"},
"pool": {"type": "integer", "description": "Pool active count"},
@@ -1686,7 +1979,8 @@ func generateOpenAPISpec() string {
"properties": {
"alias": {"type": "string", "description": "Instance alias"},
"action": {"type": "string", "enum": ["start", "stop", "restart", "reset"], "description": "Action for the instance"},
"restart": {"type": "boolean", "description": "Instance restart policy"}
"restart": {"type": "boolean", "description": "Instance restart policy"},
"tags": {"type": "array", "items": {"$ref": "#/components/schemas/Tag"}, "description": "Tag array"}
}
},
"PutInstanceRequest": {
@@ -1697,13 +1991,14 @@ func generateOpenAPISpec() string {
"MasterInfo": {
"type": "object",
"properties": {
"alias": {"type": "string", "description": "Master alias"},
"os": {"type": "string", "description": "Operating system"},
"arch": {"type": "string", "description": "System architecture"},
"cpu": {"type": "integer", "description": "CPU usage percentage"},
"mem_total": {"type": "integer", "format": "int64", "description": "Total memory in bytes"},
"mem_free": {"type": "integer", "format": "int64", "description": "Free memory in bytes"},
"mem_used": {"type": "integer", "format": "int64", "description": "Used memory in bytes"},
"swap_total": {"type": "integer", "format": "int64", "description": "Total swap space in bytes"},
"swap_free": {"type": "integer", "format": "int64", "description": "Free swap space in bytes"},
"swap_used": {"type": "integer", "format": "int64", "description": "Used swap space in bytes"},
"netrx": {"type": "integer", "format": "int64", "description": "Network received bytes"},
"nettx": {"type": "integer", "format": "int64", "description": "Network transmitted bytes"},
"diskr": {"type": "integer", "format": "int64", "description": "Disk read bytes"},
@@ -1718,6 +2013,11 @@ func generateOpenAPISpec() string {
"key": {"type": "string", "description": "Private key path"}
}
},
"UpdateMasterAliasRequest": {
"type": "object",
"required": ["alias"],
"properties": {"alias": {"type": "string", "description": "Master alias"}}
},
"TCPingResult": {
"type": "object",
"properties": {
@@ -1726,8 +2026,16 @@ func generateOpenAPISpec() string {
"latency": {"type": "integer", "format": "int64", "description": "Latency in milliseconds"},
"error": {"type": "string", "nullable": true, "description": "Error message"}
}
},
"Tag": {
"type": "object",
"required": ["key", "value"],
"properties": {
"key": {"type": "string", "description": "Tag key"},
"value": {"type": "string", "description": "Tag value"}
}
}
}
}
}`, openAPIVersion)
}`, openAPIVersion, m.prefix)
}

View File

@@ -3,7 +3,6 @@ package internal
import (
"bufio"
"bytes"
"context"
"crypto/tls"
"fmt"
@@ -13,6 +12,7 @@ import (
"os"
"os/signal"
"strconv"
"sync"
"syscall"
"time"
@@ -29,26 +29,40 @@ type Server struct {
}
// NewServer 创建新的服务端实例
func NewServer(parsedURL *url.URL, tlsCode string, tlsConfig *tls.Config, logger *logs.Logger) *Server {
func NewServer(parsedURL *url.URL, tlsCode string, tlsConfig *tls.Config, logger *logs.Logger) (*Server, error) {
server := &Server{
Common: Common{
tlsCode: tlsCode,
logger: logger,
signalChan: make(chan string, semaphoreLimit),
tcpBufferPool: &sync.Pool{
New: func() any {
buf := make([]byte, tcpDataBufSize)
return &buf
},
},
udpBufferPool: &sync.Pool{
New: func() any {
buf := make([]byte, udpDataBufSize)
return &buf
},
},
},
tlsConfig: tlsConfig,
}
server.initConfig(parsedURL)
if err := server.initConfig(parsedURL); err != nil {
return nil, fmt.Errorf("newServer: initConfig failed: %w", err)
}
server.initRateLimiter()
return server
return server, nil
}
// Run 管理服务端生命周期
func (s *Server) Run() {
logInfo := func(prefix string) {
s.logger.Info("%v: %v@%v/%v?max=%v&mode=%v&read=%v&rate=%v&slot=%v",
s.logger.Info("%v: server://%v@%v/%v?max=%v&mode=%v&read=%v&rate=%v&slot=%v&proxy=%v",
prefix, s.tunnelKey, s.tunnelTCPAddr, s.targetTCPAddr,
s.maxPoolCapacity, s.runMode, s.readTimeout, s.rateLimit/125000, s.slotLimit)
s.maxPoolCapacity, s.runMode, s.readTimeout, s.rateLimit/125000, s.slotLimit, s.proxyProtocol)
}
logInfo("Server started")
@@ -168,7 +182,7 @@ func (s *Server) tunnelHandshake() error {
tunnelTCPConn.SetReadDeadline(time.Now().Add(handshakeTimeout))
bufReader := bufio.NewReader(tunnelTCPConn)
rawTunnelKey, err := bufReader.ReadString('\n')
rawTunnelKey, err := bufReader.ReadBytes('\n')
if err != nil {
s.logger.Warn("tunnelHandshake: handshake timeout: %v", tunnelTCPConn.RemoteAddr())
tunnelTCPConn.Close()
@@ -181,7 +195,20 @@ func (s *Server) tunnelHandshake() error {
}
tunnelTCPConn.SetReadDeadline(time.Time{})
tunnelKey := string(s.xor(bytes.TrimSuffix([]byte(rawTunnelKey), []byte{'\n'})))
// 解码隧道密钥
tunnelKeyData, err := s.decode(rawTunnelKey)
if err != nil {
s.logger.Warn("tunnelHandshake: decode tunnel key failed: %v", tunnelTCPConn.RemoteAddr())
tunnelTCPConn.Close()
select {
case <-s.ctx.Done():
return fmt.Errorf("tunnelHandshake: context error: %w", s.ctx.Err())
case <-time.After(serviceCooldown):
}
continue
}
tunnelKey := string(tunnelKeyData)
if tunnelKey != s.tunnelKey {
s.logger.Warn("tunnelHandshake: access denied: %v", tunnelTCPConn.RemoteAddr())
@@ -212,7 +239,7 @@ func (s *Server) tunnelHandshake() error {
Fragment: s.tlsCode,
}
_, err := s.tunnelTCPConn.Write(append(s.xor([]byte(tunnelURL.String())), '\n'))
_, err := s.tunnelTCPConn.Write(s.encode([]byte(tunnelURL.String())))
if err != nil {
return fmt.Errorf("tunnelHandshake: write tunnel config failed: %w", err)
}