Update On Tue Jul 22 20:42:26 CEST 2025

This commit is contained in:
github-action[bot]
2025-07-22 20:42:27 +02:00
parent 6f788af24a
commit e72b5a9691
358 changed files with 7167 additions and 3782 deletions

View File

@@ -34,7 +34,7 @@ func coreDispatch(parsedURL *url.URL) {
// getTLSProtocol 获取TLS配置
func getTLSProtocol(parsedURL *url.URL) (string, *tls.Config) {
// 生成基本TLS配置
tlsConfig, err := cert.NewTLSConfig("yosebyte/nodepass:" + version)
tlsConfig, err := cert.NewTLSConfig(version)
if err != nil {
logger.Error("Generate failed: %v", err)
logger.Warn("TLS code-0: nil cert")

View File

@@ -40,6 +40,8 @@ func getParsedURL(args []string) *url.URL {
// initLogLevel 初始化日志级别
func initLogLevel(level string) {
switch level {
case "none":
logger.SetLogLevel(logs.None)
case "debug":
logger.SetLogLevel(logs.Debug)
logger.Debug("Init log level: DEBUG")
@@ -49,6 +51,9 @@ func initLogLevel(level string) {
case "error":
logger.SetLogLevel(logs.Error)
logger.Error("Init log level: ERROR")
case "event":
logger.SetLogLevel(logs.Event)
logger.Event("Init log level: EVENT")
default:
logger.SetLogLevel(logs.Info)
logger.Info("Init log level: INFO")

View File

@@ -4,19 +4,22 @@
NodePass offers a RESTful API in Master Mode that enables programmatic control and integration with frontend applications. This section provides comprehensive documentation of the API endpoints, integration patterns, and best practices.
# NodePass API Reference
## Overview
NodePass provides a RESTful API in Master Mode for programmatic control and frontend integration. This document covers all endpoints, data structures, and best practices.
## Master Mode API
When running NodePass in Master Mode (`master://`), it exposes a REST API that allows frontend applications to:
When running in `master://` mode, NodePass supports:
1. Create and manage NodePass server and client instances
2. Monitor connection status and statistics
3. Control running instances (start, stop, restart)
4. Configure auto-start policies for automatic instance management
5. Configure behavior through parameters
1. Creating and managing server/client instances
2. Real-time monitoring of status, traffic, and health checks
3. Instance control (start, stop, restart, reset traffic)
4. Configurable auto-start policy
5. Flexible parameter configuration
### Base URL
@@ -24,540 +27,79 @@ When running NodePass in Master Mode (`master://`), it exposes a REST API that a
master://<api_addr>/<prefix>?<log>&<tls>
```
Where:
- `<api_addr>` is the address specified in the master mode URL (e.g., `0.0.0.0:9090`)
- `<prefix>` is the optional API prefix (if not specified, `/api` will be used as the prefix)
- `<api_addr>`: Listen address (e.g. `0.0.0.0:9090`)
- `<prefix>`: API path prefix (default `/api`)
### Starting Master Mode
To start NodePass in Master Mode with default settings:
```bash
nodepass "master://0.0.0.0:9090?log=info"
```
With custom API prefix and TLS enabled:
```bash
nodepass "master://0.0.0.0:9090/admin?log=info&tls=1"
```
### Available Endpoints
### Main Endpoints
| Endpoint | Method | Description |
|----------|--------|-------------|
| `/instances` | GET | List all NodePass instances |
| `/instances` | POST | Create a new NodePass instance |
| `/instances/{id}` | GET | Get details about a specific instance |
| `/instances/{id}` | PATCH | Update instance state or control operations |
| `/instances/{id}` | PUT | Update instance URL configuration |
| `/instances/{id}` | DELETE | Remove a specific instance |
| `/events` | GET | Subscribe to instance events using SSE |
| `/info` | GET | Get master service information |
| `/openapi.json` | GET | OpenAPI specification |
| `/docs` | GET | Swagger UI documentation |
| Endpoint | Method | Description |
|--------------------|--------|----------------------|
| `/instances` | GET | List all instances |
| `/instances` | POST | Create new instance |
| `/instances/{id}` | GET | Get instance details |
| `/instances/{id}` | PATCH | Update/control |
| `/instances/{id}` | PUT | Update instance URL |
| `/instances/{id}` | DELETE | Delete instance |
| `/events` | GET | SSE event stream |
| `/info` | GET | Master info |
| `/openapi.json` | GET | OpenAPI spec |
| `/docs` | GET | Swagger UI |
### API Authentication
The Master API now supports API Key authentication to prevent unauthorized access. The system automatically generates an API Key on first startup.
API Key authentication is enabled by default. The key is auto-generated and stored in `nodepass.gob`.
#### API Key Features
- Protected: `/instances`, `/instances/{id}`, `/events`, `/info`
- Public: `/openapi.json`, `/docs`
- Use header: `X-API-Key: <key>`
- Regenerate: PATCH `/instances/********` with `{ "action": "restart" }`
1. **Automatic Generation**: Created automatically when master mode is first started
2. **Persistent Storage**: The API Key is saved along with other instance configurations in the `nodepass.gob` file
3. **Retention After Restart**: The API Key remains the same after restarting the master
4. **Selective Protection**: Only critical API endpoints are protected, public documentation remains accessible
### Instance Data Structure
#### Protected Endpoints
The following endpoints require API Key authentication:
- `/instances` (all methods)
- `/instances/{id}` (all methods: GET, PATCH, PUT, DELETE)
- `/events`
- `/info`
The following endpoints are publicly accessible (no API Key required):
- `/openapi.json`
- `/docs`
#### How to Use the API Key
Include the API Key in your API requests:
```javascript
// Using an API Key for instance management requests
async function getInstances() {
const response = await fetch(`${API_URL}/instances`, {
method: 'GET',
headers: {
'X-API-Key': 'your-api-key-here'
}
});
return await response.json();
}
```

```json
{
"id": "a1b2c3d4",
"alias": "alias",
"type": "client|server",
"status": "running|stopped|error",
"url": "...",
"restart": true,
"tcprx": 0,
"tcptx": 0,
"udprx": 0,
"udptx": 0,
"pool": 0, // Health check pool size
"ping": 0 // Health check ping (ms)
}
```
#### How to Get and Regenerate API Key
- `pool`/`ping`: health check data, only available in debug mode
- `tcprx`/`tcptx`/`udprx`/`udptx`: cumulative traffic stats
- `restart`: auto-start policy
The API Key can be found in the system startup logs, and can be regenerated using:
### Instance URL Format
```javascript
// Regenerate the API Key (requires knowing the current API Key)
async function regenerateApiKey() {
const response = await fetch(`${API_URL}/instances/${apiKeyID}`, {
method: 'PATCH',
headers: {
'Content-Type': 'application/json',
'X-API-Key': 'current-api-key'
},
body: JSON.stringify({ action: 'restart' })
});
const result = await response.json();
return result.url; // The new API Key
}
```
- Server: `server://<bind_addr>:<bind_port>/<target_host>:<target_port>?<params>`
- Client: `client://<server_host>:<server_port>/<local_host>:<local_port>?<params>`
- Supported params: `tls`, `log`, `crt`, `key`
**Note**: The API Key ID is fixed as `********` (eight asterisks). In the internal implementation, this is a special instance ID used to store and manage the API Key.
### Real-time Events (SSE)
## Frontend Integration Guidelines
- Event types: `initial`, `create`, `update`, `delete`, `shutdown`, `log`
- Only normal logs are sent in `log` events; traffic/health logs are filtered
- Connect to `/events` for real-time instance and log updates
When integrating NodePass with frontend applications, consider the following important points:
### Instance Persistence
NodePass Master Mode now supports instance persistence using the gob serialization format. Instances and their states are saved to a `nodepass.gob` file in the same directory as the executable, and automatically restored when the master restarts.
Key persistence features:
- Instance configurations are automatically saved to disk
- Instance state (running/stopped) is preserved
- Auto-start policies are preserved across master restarts
- Traffic statistics are retained between restarts
- Instances with auto-start policy enabled will automatically start when master restarts
- No need for manual re-registration after restart
**Note:** While instance configurations are now persisted, frontend applications should still maintain their own record of instance configurations as a backup strategy.
### Instance ID Persistence
With NodePass now using gob format for persistent storage of instance state, instance IDs **no longer change** after a master restart. This means:
1. Frontend applications can safely use instance IDs as unique identifiers
2. Instance configurations, states, and statistics are automatically restored after restart
3. No need to implement logic for handling instance ID changes
This greatly simplifies frontend integration by eliminating the previous complexity of handling instance recreation and ID mapping.
### Auto-start Policy Management
NodePass now supports configurable auto-start policies for instances, allowing for automatic instance management and improved reliability. The auto-start policy feature enables:
1. **Automatic Instance Recovery**: Instances with auto-start policy enabled will automatically start when the master service restarts
2. **Selective Auto-start**: Configure which instances should auto-start based on their importance or role
3. **Persistent Policy Storage**: Auto-start policies are saved and restored across master restarts
4. **Fine-grained Control**: Each instance can have its own auto-start policy setting
#### How Auto-start Policy Works
- **Policy Assignment**: Each instance has a `restart` boolean field that determines its auto-start behavior
- **Master Startup**: When the master starts, it automatically launches all instances with `restart: true`
- **Policy Persistence**: Auto-start policies are saved in the same `nodepass.gob` file as other instance data
- **Runtime Management**: Auto-start policies can be modified while instances are running
#### Best Practices for Auto-start Policy
1. **Enable for Server Instances**: Server instances typically should have auto-start policy enabled for high availability
2. **Selective Client Auto-start**: Enable auto-start policy for critical client connections only
3. **Testing Scenarios**: Disable auto-start policy for temporary or testing instances
4. **Load Balancing**: Use auto-start policies to maintain minimum instance counts for load distribution
```javascript
// Example: Configure auto-start policies based on instance role
async function configureAutoStartPolicies(instances) {
for (const instance of instances) {
// Enable auto-start for servers and critical clients
const shouldAutoStart = instance.type === 'server' ||
instance.tags?.includes('critical');
await setAutoStartPolicy(instance.id, shouldAutoStart);
}
}
```
### Instance Lifecycle Management
For proper lifecycle management:
1. **Creation**: Store instance configurations and URLs
```javascript
async function createNodePassInstance(config) {
const response = await fetch(`${API_URL}/instances`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
url: `server://0.0.0.0:${config.port}/${config.target}?tls=${config.tls}`
})
});
const data = await response.json();
// Configure restart policy for new instance based on type
if (data.success) {
const shouldAutoRestart = config.type === 'server' || config.critical === true;
await setRestartPolicy(data.data.id, shouldAutoRestart);
}
// Store in frontend persistence
saveInstanceConfig({
id: data.data.id,
originalConfig: config,
url: data.data.url
});
return data;
}
```
2. **Status Monitoring**: Monitor instance state changes
NodePass provides two methods for monitoring instance status:
A. **Using SSE (Recommended)**: Receive real-time events via persistent connection
```javascript
function connectToEventSource() {
const eventSource = new EventSource(`${API_URL}/events`, {
// If authentication is needed, native EventSource doesn't support custom headers
// Need to use fetch API to implement a custom SSE client
});
// If using API Key, use custom implementation instead of native EventSource
// Example using native EventSource (for non-protected endpoints)
eventSource.addEventListener('instance', (event) => {
const data = JSON.parse(event.data);
switch (data.type) {
case 'initial':
console.log('Initial instance state:', data.instance);
updateInstanceUI(data.instance);
break;
case 'create':
console.log('Instance created:', data.instance);
addInstanceToUI(data.instance);
break;
case 'update':
console.log('Instance updated:', data.instance);
updateInstanceUI(data.instance);
break;
case 'delete':
console.log('Instance deleted:', data.instance);
removeInstanceFromUI(data.instance.id);
break;
case 'log':
console.log(`Instance ${data.instance.id} log:`, data.logs);
appendLogToInstanceUI(data.instance.id, data.logs);
break;
case 'shutdown':
console.log('Master service is shutting down');
// Close the event source and show notification
eventSource.close();
showShutdownNotification();
break;
}
});
eventSource.addEventListener('error', (error) => {
console.error('SSE connection error:', error);
// Attempt to reconnect after a delay
setTimeout(() => {
eventSource.close();
connectToEventSource();
}, 5000);
});
return eventSource;
}
// Example of creating SSE connection with API Key
function connectToEventSourceWithApiKey(apiKey) {
// Native EventSource doesn't support custom headers, need to use fetch API
fetch(`${API_URL}/events`, {
method: 'GET',
headers: {
'X-API-Key': apiKey,
'Cache-Control': 'no-cache'
}
}).then(response => {
if (!response.ok) {
throw new Error(`HTTP error: ${response.status}`);
}
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
function processStream() {
reader.read().then(({ value, done }) => {
if (done) {
console.log('Connection closed');
// Try to reconnect
setTimeout(() => connectToEventSourceWithApiKey(apiKey), 5000);
return;
}
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n\n');
buffer = lines.pop() || '';
for (const line of lines) {
if (line.trim() === '') continue;
const eventMatch = line.match(/^event: (.+)$/m);
const dataMatch = line.match(/^data: (.+)$/m);
if (eventMatch && dataMatch) {
const data = JSON.parse(dataMatch[1]);
// Process events - see switch code above
}
}
processStream();
}).catch(error => {
console.error('Read error:', error);
// Try to reconnect
setTimeout(() => connectToEventSourceWithApiKey(apiKey), 5000);
});
}
processStream();
}).catch(error => {
console.error('Connection error:', error);
// Try to reconnect
setTimeout(() => connectToEventSourceWithApiKey(apiKey), 5000);
});
}
```
B. **Traditional Polling (Alternative)**: Use in environments where SSE is not supported
```javascript
function startInstanceMonitoring(instanceId, interval = 5000) {
return setInterval(async () => {
try {
const response = await fetch(`${API_URL}/instances/${instanceId}`);
const data = await response.json();
if (data.success) {
updateInstanceStatus(instanceId, data.data.status);
updateInstanceMetrics(instanceId, {
connections: data.data.connections,
pool_size: data.data.pool_size,
uptime: data.data.uptime
});
}
} catch (error) {
markInstanceUnreachable(instanceId);
}
}, interval);
}
```
**Recommendation:** Prefer the SSE approach as it provides more efficient real-time monitoring and reduces server load. Only use the polling approach for client environments with specific compatibility needs or where SSE is not supported.
3. **Instance Alias Management**: Set readable names for instances
```javascript
// Batch set instance aliases
async function setInstanceAliases(instances) {
for (const instance of instances) {
// Generate meaningful aliases based on instance type and purpose
const alias = `${instance.type}-${instance.region || 'default'}-${instance.port || 'auto'}`;
await updateInstanceAlias(instance.id, alias);
}
}
// Find instance by alias
async function findInstanceByAlias(targetAlias) {
const response = await fetch(`${API_URL}/instances`, {
headers: { 'X-API-Key': apiKey }
});
const data = await response.json();
if (data.success) {
return data.data.find(instance => instance.alias === targetAlias);
}
return null;
}
```
4. **Control Operations**: Start, stop, restart instances
```javascript
async function controlInstance(instanceId, action) {
// action can be: start, stop, restart
const response = await fetch(`${API_URL}/instances/${instanceId}`, {
method: 'PATCH', // Note: API has been updated to use PATCH instead of PUT
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ action })
});
const data = await response.json();
return data.success;
}
// Update instance alias
async function updateInstanceAlias(instanceId, alias) {
const response = await fetch(`${API_URL}/instances/${instanceId}`, {
method: 'PATCH',
headers: {
'Content-Type': 'application/json',
'X-API-Key': apiKey // If API Key is enabled
},
body: JSON.stringify({ alias })
});
const data = await response.json();
return data.success;
}
// Update instance URL configuration
async function updateInstanceURL(instanceId, newURL) {
const response = await fetch(`${API_URL}/instances/${instanceId}`, {
method: 'PUT',
headers: {
'Content-Type': 'application/json',
'X-API-Key': apiKey // If API Key is enabled
},
body: JSON.stringify({ url: newURL })
});
const data = await response.json();
return data.success;
}
```
5. **Auto-start Policy Management**: Configure automatic startup behavior
```javascript
async function setAutoStartPolicy(instanceId, enableAutoStart) {
const response = await fetch(`${API_URL}/instances/${instanceId}`, {
method: 'PATCH',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ restart: enableAutoStart })
});
const data = await response.json();
return data.success;
}
// Combined operation: control instance and update auto-start policy
async function controlInstanceWithAutoStart(instanceId, action, enableAutoStart) {
const response = await fetch(`${API_URL}/instances/${instanceId}`, {
method: 'PATCH',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
action: action,
restart: enableAutoStart
})
});
const data = await response.json();
return data.success;
}
// Combined operation: update alias, control instance and auto-start policy
async function updateInstanceComplete(instanceId, alias, action, enableAutoStart) {
const response = await fetch(`${API_URL}/instances/${instanceId}`, {
method: 'PATCH',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
alias: alias,
action: action,
restart: enableAutoStart
})
});
const data = await response.json();
return data.success;
}
```
#### Complete Auto-start Policy Usage Example
Here's a comprehensive example showing how to implement auto-start policy management in a real-world scenario:
```javascript
// Scenario: Setting up a load-balanced server cluster with auto-start policies
async function setupServerCluster(serverConfigs) {
const clusterInstances = [];
for (const config of serverConfigs) {
try {
// Create server instance
const instance = await createNodePassInstance({
type: 'server',
port: config.port,
target: config.target,
critical: config.isPrimary, // Primary servers are critical
tls: config.enableTLS
});
if (instance.success) {
// Set meaningful instance alias
const alias = `${config.role}-server-${config.port}`;
await updateInstanceAlias(instance.data.id, alias);
// Configure auto-start policy based on server role
const autoStartPolicy = config.isPrimary || config.role === 'essential';
await setAutoStartPolicy(instance.data.id, autoStartPolicy);
// Start the instance
await controlInstance(instance.data.id, 'start');
clusterInstances.push({
id: instance.data.id,
alias: alias,
role: config.role,
autoStartEnabled: autoStartPolicy
});
console.log(`Server ${alias} created with auto-start policy: ${autoStartPolicy}`);
}
} catch (error) {
console.error(`Failed to create server ${config.role}:`, error);
}
}
return clusterInstances;
}
// Monitor cluster health and adjust auto-start policies dynamically
async function monitorClusterHealth(clusterInstances) {
const healthyInstances = [];
for (const cluster of clusterInstances) {
const instance = await fetch(`${API_URL}/instances/${cluster.id}`);
const data = await instance.json();
if (data.success && data.data.status === 'running') {
healthyInstances.push(cluster);
} else {
// If a critical instance is down, enable auto-start for backup instances
if (cluster.role === 'primary') {
await enableBackupInstanceAutoStart(clusterInstances);
}
}
}
return healthyInstances;
}
async function enableBackupInstanceAutoStart(clusterInstances) {
const backupInstances = clusterInstances.filter(c => c.role === 'backup');
for (const backup of backupInstances) {
await setAutoStartPolicy(backup.id, true);
console.log(`Enabled auto-start policy for backup instance: ${backup.id}`);
}
}
```
### Additional Notes
- All instance, traffic, health, alias, and auto-start data are persisted and restored after restart
- Full OpenAPI spec: `/openapi.json`, Swagger UI: `/docs`
### Real-time Event Monitoring with SSE
NodePass now supports Server-Sent Events (SSE) for real-time monitoring of instance state changes. This allows frontend applications to receive instant notifications about instance creation, updates, and deletions without polling.
@@ -762,16 +304,7 @@ When implementing SSE in your frontend:
The Master API provides traffic statistics, but there are important requirements to note:
1. **Enable Debug Mode**: Traffic statistics are only available when debug mode is enabled.
```bash
# Master with debug mode enabled
nodepass master://0.0.0.0:10101?log=debug
```
Without enabling debug mode, traffic statistics will not be collected or returned by the API.
2. **Basic Traffic Metrics**: NodePass periodically provides cumulative TCP and UDP traffic values in both inbound and outbound directions. The frontend application needs to store and process these values to derive meaningful statistics.
1. **Basic Traffic Metrics**: NodePass periodically provides cumulative TCP and UDP traffic values in both inbound and outbound directions. The frontend application needs to store and process these values to derive meaningful statistics.
```javascript
function processTrafficStats(instanceId, currentStats) {
// Store the current timestamp
@@ -806,7 +339,7 @@ The Master API provides traffic statistics, but there are important requirements
}
```
3. **Data Persistence**: Since the API only provides cumulative values, the frontend must implement proper storage and calculation logic
2. **Data Persistence**: Since the API only provides cumulative values, the frontend must implement proper storage and calculation logic
```javascript
// Example of frontend storage structure for traffic history
const trafficHistory = {};

View File

@@ -4,8 +4,9 @@ NodePass uses a minimalist approach to configuration, with all settings specifie
## Log Levels
NodePass provides five log verbosity levels that control the amount of information displayed:
NodePass provides six log verbosity levels that control the amount of information displayed:
- `none`: Disable logging - no log information displayed
- `debug`: Verbose debugging information - shows all operations and connections
- `info`: General operational information (default) - shows startup, shutdown, and key events
- `warn`: Warning conditions - only shows potential issues that don't affect core functionality
@@ -51,7 +52,7 @@ nodepass "server://0.0.0.0:10101/0.0.0.0:8080?tls=2&crt=/path/to/cert.pem&key=/p
Connection pool capacity can be configured via URL query parameters:
- `min`: Minimum connection pool capacity (default: 64)
- `max`: Maximum connection pool capacity (default: 8192)
- `max`: Maximum connection pool capacity (default: 1024)
Example:
```bash
@@ -59,6 +60,27 @@ Example:
nodepass "client://server.example.com:10101/127.0.0.1:8080?min=32&max=4096"
```
## URL Query Parameter Scope and Applicability
NodePass allows flexible configuration via URL query parameters. The following table shows which parameters are applicable in server, client, and master modes:
| Parameter | Description | server | client | master |
|-----------|----------------------|:------:|:------:|:------:|
| `log` | Log level | O | O | O |
| `tls` | TLS encryption mode | O | X | O |
| `crt` | Custom certificate path| O | X | O |
| `key` | Custom key path | O | X | O |
| `min` | Minimum pool capacity | X | O | X |
| `max` | Maximum pool capacity | O | O | X |
- O: Parameter is valid and recommended for configuration
- X: Parameter is not applicable and should be ignored
**Best Practices:**
- For server/master modes, configure security-related parameters (`tls`, `crt`, `key`) to enhance data channel security.
- For client mode, adjust connection pool capacity (`min`, `max`) based on traffic and resource constraints for optimal performance.
- Log level (`log`) can be set in all modes for easier operations and troubleshooting.
## Environment Variables
NodePass behavior can be fine-tuned using environment variables. Below is the complete list of available variables with their descriptions, default values, and recommended settings for different scenarios.

View File

@@ -18,9 +18,9 @@ Where:
### Query Parameters
Common query parameters:
- `log=<level>`: Log verbosity level (`debug`, `info`, `warn`, `error`, or `event`)
- `log=<level>`: Log verbosity level (`none`, `debug`, `info`, `warn`, `error`, or `event`)
- `min=<min_pool>`: Minimum connection pool capacity (default: 64, client mode only)
- `max=<max_pool>`: Maximum connection pool capacity (default: 8192, client mode only)
- `max=<max_pool>`: Maximum connection pool capacity (default: 1024, client mode only)
TLS-related parameters (server/master modes only):
- `tls=<mode>`: TLS security level for data channels (`0`, `1`, or `2`)
@@ -93,7 +93,7 @@ nodepass "client://<tunnel_addr>/<target_addr>?log=<level>&min=<min_pool>&max=<m
- `target_addr`: The destination address for business data with bidirectional flow support (e.g., 127.0.0.1:8080)
- `log`: Log level (debug, info, warn, error, event)
- `min`: Minimum connection pool capacity (default: 64)
- `max`: Maximum connection pool capacity (default: 8192)
- `max`: Maximum connection pool capacity (default: 1024)
#### How Client Mode Works

View File

@@ -4,100 +4,101 @@
NodePass在主控模式Master Mode下提供了RESTful API使前端应用能够以编程方式进行控制和集成。本节提供API端点、集成模式和最佳实践的全面文档。
## 主控模式API
# NodePass API 参考
当NodePass以主控模式`master://`运行时它会暴露REST API允许前端应用
## 概述
1. 创建和管理NodePass服务器和客户端实例
2. 监控连接状态和统计信息
3. 控制运行中的实例(启动、停止、重启)
4. 配置实例自启动策略以实现自动化管理
5. 通过参数配置行为
NodePass 主控模式Master Mode下提供 RESTful API支持前端集成和自动化。本文档涵盖所有接口、数据结构和最佳实践。
### 基本URL
## 主控模式 API
主控模式(`master://`NodePass 支持:
1. 创建和管理服务端/客户端实例
2. 实时监控状态、流量、健康检查
3. 控制实例(启动、停止、重启、重置流量)
4. 配置自启动策略
5. 灵活参数配置
### 基础 URL
```
master://<api_addr>/<prefix>?<log>&<tls>
```
其中:
- `<api_addr>`是主控模式URL中指定的地址例如`0.0.0.0:9090`
- `<prefix>`是可选的API前缀如果未指定则使用`/api`
- `<api_addr>`:监听地址(如 `0.0.0.0:9090`
- `<prefix>`API 路径前缀(默认 `/api`
### 启动主控模式
使用默认设置启动主控模式的NodePass
```bash
nodepass "master://0.0.0.0:9090?log=info"
```
使用自定义API前缀和启用TLS
```bash
nodepass "master://0.0.0.0:9090/admin?log=info&tls=1"
```
### 可用端点
### 主要接口
| 端点 | 方法 | 描述 |
|----------|--------|-------------|
| `/instances` | GET | 列出所有NodePass实例 |
| `/instances` | POST | 创建新的NodePass实例 |
| `/instances/{id}` | GET | 获取特定实例的详细信息 |
| `/instances/{id}` | PATCH | 更新实例状态或控制操作 |
| `/instances/{id}` | PUT | 更新实例URL配置 |
| `/instances/{id}` | DELETE | 删除特定实例 |
| `/events` | GET | 使用SSE订阅实例事件通知 |
| `/info` | GET | 获取主控服务信息 |
| `/openapi.json` | GET | OpenAPI规范 |
| `/docs` | GET | Swagger UI文档界面 |
| Endpoint | Method | 说明 |
|--------------------|--------|----------------------|
| `/instances` | GET | 获取所有实例 |
| `/instances` | POST | 创建新实例 |
| `/instances/{id}` | GET | 获取实例详情 |
| `/instances/{id}` | PATCH | 更新/控制实例 |
| `/instances/{id}` | PUT | 更新实例 URL |
| `/instances/{id}` | DELETE | 删除实例 |
| `/events` | GET | SSE 实时事件流 |
| `/info` | GET | 获取主控服务信息 |
| `/openapi.json` | GET | OpenAPI 规范 |
| `/docs` | GET | Swagger UI 文档 |
### API认证
### API 鉴权
NodePass主控API现在支持API Key认证可以防止未经授权的访问。系统会在首次启动自动生成一个API Key
API Key 认证默认启用,首次启动自动生成并保存在 `nodepass.gob`
#### API Key特点
- 受保护接口:`/instances``/instances/{id}``/events``/info`
- 公共接口:`/openapi.json``/docs`
- 认证方式:请求头加 `X-API-Key: <key>`
- 重置 KeyPATCH `/instances/********`body `{ "action": "restart" }`
1. **自动生成**:首次启动主控模式时自动创建
2. **持久化存储**API Key与其他实例配置一起保存在`nodepass.gob`文件中
3. **重启后保留**重启主控后API Key保持不变
4. **选择性保护**仅保护关键API端点公共文档仍可访问
### 实例数据结构
#### 受保护的端点
以下端点需要API Key认证
- `/instances`(所有方法)
- `/instances/{id}`所有方法GET、PATCH、PUT、DELETE
- `/events`
- `/info`
以下端点可公开访问无需API Key
- `/openapi.json`
- `/docs`
#### 如何使用API Key
在API请求中包含API Key
```javascript
// 使用API Key进行实例管理请求
async function getInstances() {
const response = await fetch(`${API_URL}/instances`, {
method: 'GET',
headers: {
'X-API-Key': 'your-api-key-here'
}
});
return await response.json();
}
```

```json
{
"id": "a1b2c3d4",
"alias": "别名",
"type": "client|server",
"status": "running|stopped|error",
"url": "...",
"restart": true,
"tcprx": 0,
"tcptx": 0,
"udprx": 0,
"udptx": 0,
"pool": 0, // 健康检查池连接数
"ping": 0 // 健康检查延迟(ms)
}
```
#### 如何获取和重新生成API Key
- `pool`/`ping`:健康检查数据,仅 debug 模式下统计
- `tcprx`/`tcptx`/`udprx`/`udptx`:累计流量统计
- `restart`:自启动策略
API Key可以在系统启动日志中找到也可以通过以下方式重新生成
### 实例 URL 格式
- 服务端:`server://<bind_addr>:<bind_port>/<target_host>:<target_port>?<参数>`
- 客户端:`client://<server_host>:<server_port>/<local_host>:<local_port>?<参数>`
- 支持参数:`tls``log``crt``key`
### 实时事件流SSE
- 事件类型:`initial``create``update``delete``shutdown``log`
- `log` 事件仅推送普通日志,流量/健康检查日志已被过滤
- 连接 `/events` 可实时获取实例变更和日志
### 其他说明
- 所有实例、流量、健康检查、别名、自启动策略均持久化存储,重启后自动恢复
- API 详细规范见 `/openapi.json`Swagger UI 见 `/docs`
```javascript
// 重新生成API Key需要知道当前的API Key
async function regenerateApiKey() {
@@ -633,16 +634,7 @@ async function enableBackupInstanceAutoStart(clusterInstances) {
主控API提供流量统计数据但需要注意以下重要事项
1. **启用调试模式**:流量统计功能仅在启用调试模式时可用
```bash
# 启用调试模式的主控
nodepass master://0.0.0.0:10101?log=debug
```
如果未启用调试模式API将不会收集或返回流量统计数据。
2. **基本流量指标**NodePass周期性地提供TCP和UDP流量在入站和出站方向上的累计值前端应用需要存储和处理这些值以获得有意义的统计信息。
1. **基本流量指标**NodePass周期性地提供TCP和UDP流量在入站和出站方向上的累计值前端应用需要存储和处理这些值以获得有意义的统计信息
```javascript
function processTrafficStats(instanceId, currentStats) {
// 存储当前时间戳
@@ -677,7 +669,7 @@ async function enableBackupInstanceAutoStart(clusterInstances) {
}
```
3. **数据持久化**由于API只提供累计值前端必须实现适当的存储和计算逻辑
2. **数据持久化**由于API只提供累计值前端必须实现适当的存储和计算逻辑
```javascript
// 前端流量历史存储结构示例
const trafficHistory = {};

View File

@@ -4,8 +4,9 @@ NodePass采用极简方法进行配置所有设置都通过命令行参数和
## 日志级别
NodePass提供五种日志详细级别,控制显示的信息量:
NodePass提供六种日志详细级别,控制显示的信息量:
- `none`:禁用日志记录 - 不显示任何日志信息
- `debug`:详细调试信息 - 显示所有操作和连接
- `info`:一般操作信息(默认) - 显示启动、关闭和关键事件
- `warn`:警告条件 - 仅显示不影响核心功能的潜在问题
@@ -51,7 +52,7 @@ nodepass "server://0.0.0.0:10101/0.0.0.0:8080?tls=2&crt=/path/to/cert.pem&key=/p
连接池容量可以通过URL查询参数进行配置
- `min`: 最小连接池容量(默认: 64)
- `max`: 最大连接池容量(默认: 8192)
- `max`: 最大连接池容量(默认: 1024)
示例:
```bash
@@ -59,6 +60,28 @@ nodepass "server://0.0.0.0:10101/0.0.0.0:8080?tls=2&crt=/path/to/cert.pem&key=/p
nodepass "client://server.example.com:10101/127.0.0.1:8080?min=32&max=4096"
```
## URL查询参数配置及作用范围
NodePass支持通过URL查询参数进行灵活配置不同参数在 server、client、master 模式下的适用性如下表:
| 参数 | 说明 | server | client | master |
|-----------|----------------------|:------:|:------:|:------:|
| `log` | 日志级别 | O | O | O |
| `tls` | TLS加密模式 | O | X | O |
| `crt` | 自定义证书路径 | O | X | O |
| `key` | 自定义密钥路径 | O | X | O |
| `min` | 最小连接池容量 | X | O | X |
| `max` | 最大连接池容量 | O | O | X |
- O参数有效推荐根据实际场景配置
- X参数无效忽略设置
**最佳实践:**
- server/master 模式建议配置安全相关参数(如 tls、crt、key),提升数据通道安全性。
- client 模式建议根据流量和资源情况调整连接池容量(min/max),优化性能。
- 日志级别(log)可在所有模式下灵活调整,便于运维和排查。
## 环境变量
可以使用环境变量微调NodePass行为。以下是所有可用变量的完整列表包括其描述、默认值以及不同场景的推荐设置。

View File

@@ -18,9 +18,9 @@ nodepass "<core>://<tunnel_addr>/<target_addr>?log=<level>&tls=<mode>&crt=<cert_
### 查询参数说明
通用查询参数:
- `log=<level>`:日志详细级别(`debug``info``warn``error``event`
- `log=<level>`:日志详细级别(`none``debug``info``warn``error``event`
- `min=<min_pool>`最小连接池容量默认64仅适用于client模式
- `max=<max_pool>`:最大连接池容量(默认:8192仅适用于client模式
- `max=<max_pool>`:最大连接池容量(默认:1024仅适用于client模式
TLS相关参数仅适用于server/master模式
- `tls=<mode>`数据通道的TLS安全级别`0``1``2`
@@ -93,7 +93,7 @@ nodepass "client://<tunnel_addr>/<target_addr>?log=<level>&min=<min_pool>&max=<m
- `target_addr`:业务数据的目标地址,支持双向数据流模式(例如, 127.0.0.1:8080)
- `log`:日志级别(debug, info, warn, error, event)
- `min`最小连接池容量默认64
- `max`:最大连接池容量(默认:8192
- `max`:最大连接池容量(默认:1024
#### 客户端模式工作原理

View File

@@ -4,7 +4,7 @@ go 1.24.3
require (
github.com/NodePassProject/cert v1.0.0
github.com/NodePassProject/conn v1.0.1
github.com/NodePassProject/logs v1.0.1
github.com/NodePassProject/pool v1.0.9
github.com/NodePassProject/conn v1.0.3
github.com/NodePassProject/logs v1.0.2
github.com/NodePassProject/pool v1.0.18
)

View File

@@ -1,8 +1,8 @@
github.com/NodePassProject/cert v1.0.0 h1:cBNNvR+ja22AgNlUmeGWLcCM1vmnLTqpbCQ4Hdn5was=
github.com/NodePassProject/cert v1.0.0/go.mod h1:4EJDS3GozJ74dtICJ/xcq42WKKvF0tiTM9/M7Q9NF9c=
github.com/NodePassProject/conn v1.0.1 h1:vuzcQQj+cqENagzEYPwse9Vvlj/8vfkyNZCp5RvQMKk=
github.com/NodePassProject/conn v1.0.1/go.mod h1:mWe3Rylunp6Sx4v6pkSGgYZe2R+I/O+7nZ2od0yJ3aQ=
github.com/NodePassProject/logs v1.0.1 h1:WDHY1DcTO+7NydBzuRpxhEw6pWYayBdDjjZzU1uDKac=
github.com/NodePassProject/logs v1.0.1/go.mod h1:ocFTMNXBTnQFJFAhF+qobAzu7+y+wYPik7D+a1jPfis=
github.com/NodePassProject/pool v1.0.9 h1:8VKKv8kJqg1FX9Odx9Vu2eTUOpO2G84uSfiyDTuHvvA=
github.com/NodePassProject/pool v1.0.9/go.mod h1:kdRAEDK45j/+iHH4kRTpXt/wI28NIguJ13n/5NDXxkw=
github.com/NodePassProject/conn v1.0.3 h1:yw9rimaOMvQYF2kzMD9a5MfvJ+US7AOFinyx+QbdX78=
github.com/NodePassProject/conn v1.0.3/go.mod h1:mWe3Rylunp6Sx4v6pkSGgYZe2R+I/O+7nZ2od0yJ3aQ=
github.com/NodePassProject/logs v1.0.2 h1:z4b+jAMHtVJoBb2tsD58gVa/9ftd1Dy6DXHrS4IgafM=
github.com/NodePassProject/logs v1.0.2/go.mod h1:ocFTMNXBTnQFJFAhF+qobAzu7+y+wYPik7D+a1jPfis=
github.com/NodePassProject/pool v1.0.18 h1:urZeotSjcVdzoZDBDPOGNt/NtH1Ngxlj60ByF2ZsvY0=
github.com/NodePassProject/pool v1.0.18/go.mod h1:kdRAEDK45j/+iHH4kRTpXt/wI28NIguJ13n/5NDXxkw=

View File

@@ -12,6 +12,7 @@ import (
"syscall"
"time"
"github.com/NodePassProject/conn"
"github.com/NodePassProject/logs"
"github.com/NodePassProject/pool"
)
@@ -28,7 +29,6 @@ func NewClient(parsedURL *url.URL, logger *logs.Logger) *Client {
Common: Common{
logger: logger,
semaphore: make(chan struct{}, semaphoreLimit),
errChan: make(chan error, 2),
signalChan: make(chan string, semaphoreLimit),
},
tunnelName: parsedURL.Hostname(),
@@ -73,7 +73,8 @@ func (c *Client) Run() {
// start 启动客户端服务
func (c *Client) start() error {
c.initContext()
// 初始化基本信息
c.initBackground()
// 通过是否监听成功判断单端转发或双端握手
if err := c.initTunnelListener(); err == nil {
@@ -115,18 +116,15 @@ func (c *Client) start() error {
go c.tunnelPool.ClientManager()
switch c.dataFlow {
case "-":
go c.commonOnce()
go c.commonQueue()
case "+":
if c.dataFlow == "+" {
// 初始化目标监听器
if err := c.initTargetListener(); err != nil {
return err
}
go c.commonLoop()
}
return c.healthCheck()
return c.commonControl()
}
}
@@ -139,7 +137,7 @@ func (c *Client) tunnelHandshake() error {
}
c.tunnelTCPConn = tunnelTCPConn.(*net.TCPConn)
c.bufReader = bufio.NewReader(c.tunnelTCPConn)
c.bufReader = bufio.NewReader(&conn.TimeoutReader{Conn: c.tunnelTCPConn, Timeout: tcpReadTimeout})
c.tunnelTCPConn.SetKeepAlive(true)
c.tunnelTCPConn.SetKeepAlivePeriod(reportInterval)

View File

@@ -6,6 +6,7 @@ import (
"bytes"
"context"
"fmt"
"io"
"net"
"net/url"
"os"
@@ -46,6 +47,7 @@ type Common struct {
bufReader *bufio.Reader // 缓冲读取器
signalChan chan string // 信号通道
errChan chan error // 错误通道
checkPoint time.Time // 检查点时间
ctx context.Context // 上下文
cancel context.CancelFunc // 取消函数
}
@@ -123,7 +125,7 @@ func (c *Common) getPoolCapacity(parsedURL *url.URL) {
c.maxPoolCapacity = value
}
} else {
c.maxPoolCapacity = 8192
c.maxPoolCapacity = 1024
}
}
@@ -165,12 +167,13 @@ func (c *Common) getAddress(parsedURL *url.URL) {
}
}
// initContext 初始化上下文
func (c *Common) initContext() {
// initBackground 初始化基本信息
func (c *Common) initBackground() {
if c.cancel != nil {
c.cancel()
}
c.ctx, c.cancel = context.WithCancel(context.Background())
c.errChan = make(chan error, 3)
}
// initTargetListener 初始化目标监听器
@@ -308,6 +311,21 @@ func (c *Common) shutdown(ctx context.Context, stopFunc func()) error {
}
}
// commonControl 共用控制逻辑
func (c *Common) commonControl() error {
// 信号消纳、信号队列和健康检查
go func() { c.errChan <- c.commonOnce() }()
go func() { c.errChan <- c.commonQueue() }()
go func() { c.errChan <- c.healthCheck() }()
select {
case <-c.ctx.Done():
return c.ctx.Err()
case err := <-c.errChan:
return err
}
}
// commonQueue 共用信号队列
func (c *Common) commonQueue() error {
for {
@@ -335,12 +353,15 @@ func (c *Common) commonQueue() error {
// healthCheck 共用健康度检查
func (c *Common) healthCheck() error {
flushURL := &url.URL{Fragment: "0"} // 连接池刷新信号
pingURL := &url.URL{Fragment: "i"} // PING信号
for {
select {
case <-c.ctx.Done():
return c.ctx.Err()
default:
// 尝试获取锁
if !c.mu.TryLock() {
time.Sleep(time.Millisecond)
continue
}
@@ -353,15 +374,17 @@ func (c *Common) healthCheck() error {
return err
}
c.tunnelPool.Flush()
c.tunnelPool.ResetError()
time.Sleep(reportInterval) // 等待连接池刷新完成
c.logger.Debug("Tunnel pool reset: %v active connections", c.tunnelPool.Active())
} else {
// 发送普通心跳包
_, err := c.tunnelTCPConn.Write([]byte("\n"))
if err != nil {
c.mu.Unlock()
return err
}
}
// 发送PING信号
c.checkPoint = time.Now()
_, err := c.tunnelTCPConn.Write(append(c.xor([]byte(pingURL.String())), '\n'))
if err != nil {
c.mu.Unlock()
return err
}
c.mu.Unlock()
@@ -418,8 +441,7 @@ func (c *Common) commonTCPLoop() {
// 从连接池获取连接
id, remoteConn := c.tunnelPool.ServerGet()
if remoteConn == nil {
c.logger.Error("Get failed: %v not found", id)
c.tunnelPool.AddError()
c.logger.Error("Get failed: %v", id)
return
}
@@ -451,10 +473,11 @@ func (c *Common) commonTCPLoop() {
c.logger.Debug("Starting exchange: %v <-> %v", remoteConn.LocalAddr(), targetConn.LocalAddr())
// 交换数据
rx, tx, _ := conn.DataExchange(remoteConn, targetConn, tcpReadTimeout)
rx, tx, err := conn.DataExchange(remoteConn, targetConn, tcpReadTimeout)
// 交换完成,广播统计信息
c.logger.Event("Exchange complete: TRAFFIC_STATS|TCP_RX=%v|TCP_TX=%v|UDP_RX=0|UDP_TX=0", rx, tx)
c.logger.Debug("Exchange complete: %v", err)
c.logger.Event("TRAFFIC_STATS|TCP_RX=%v|TCP_TX=%v|UDP_RX=0|UDP_TX=0", rx, tx)
}(targetConn)
}
}
@@ -477,99 +500,115 @@ func (c *Common) commonUDPLoop() {
c.logger.Debug("Target connection: %v <-> %v", c.targetUDPConn.LocalAddr(), clientAddr)
// 获取池连接
id, remoteConn := c.tunnelPool.ServerGet()
if remoteConn == nil {
c.logger.Error("Get failed: %v not found", id)
c.tunnelPool.AddError()
continue
}
var id string
var remoteConn net.Conn
sessionKey := clientAddr.String()
c.logger.Debug("Tunnel connection: get %v <- pool active %v", id, c.tunnelPool.Active())
c.logger.Debug("Tunnel connection: %v <-> %v", remoteConn.LocalAddr(), remoteConn.RemoteAddr())
// 使用信号量限制并发数
c.semaphore <- struct{}{}
go func(remoteConn net.Conn, clientAddr *net.UDPAddr, id string) {
defer func() {
c.tunnelPool.Put(id, remoteConn)
c.logger.Debug("Tunnel connection: put %v -> pool active %v", id, c.tunnelPool.Active())
<-c.semaphore
}()
buffer := make([]byte, udpDataBufSize)
for {
select {
case <-c.ctx.Done():
return
default:
// 设置TCP读取超时
if err := remoteConn.SetReadDeadline(time.Now().Add(tcpReadTimeout)); err != nil {
c.logger.Error("SetReadDeadline failed: %v", err)
return
}
// 从池连接读取数据
x, err := remoteConn.Read(buffer)
if err != nil {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
c.logger.Debug("UDP session abort: %v", err)
} else if strings.Contains(err.Error(), "use of closed network connection") {
c.logger.Debug("Read closed: %v", err)
} else {
c.logger.Error("Read failed: %v", err)
}
return
}
// 将数据写入目标UDP连接
tx, err := c.targetUDPConn.WriteToUDP(buffer[:x], clientAddr)
if err != nil {
c.logger.Error("WriteToUDP failed: %v", err)
return
}
// 传输完成,广播统计信息
c.logger.Event("Transfer complete: TRAFFIC_STATS|TCP_RX=0|TCP_TX=0|UDP_RX=0|UDP_TX=%v", tx)
}
// 获取或创建UDP会话
if session, ok := c.targetUDPSession.Load(sessionKey); ok {
// 复用现有会话
remoteConn = session.(net.Conn)
c.logger.Debug("Using UDP session: %v <-> %v", remoteConn.LocalAddr(), remoteConn.RemoteAddr())
} else {
// 获取池连接
id, remoteConn = c.tunnelPool.ServerGet()
if remoteConn == nil {
c.logger.Error("Get failed: %v", id)
continue
}
}(remoteConn, clientAddr, id)
c.targetUDPSession.Store(sessionKey, remoteConn)
c.logger.Debug("Tunnel connection: get %v <- pool active %v", id, c.tunnelPool.Active())
c.logger.Debug("Tunnel connection: %v <-> %v", remoteConn.LocalAddr(), remoteConn.RemoteAddr())
// 构建并发送启动URL到客户端
launchURL := &url.URL{
Host: clientAddr.String(),
Path: id,
Fragment: "2", // UDP模式
// 使用信号量限制并发数
c.semaphore <- struct{}{}
go func(remoteConn net.Conn, clientAddr *net.UDPAddr, sessionKey, id string) {
defer func() {
// 重置池连接的读取超时
remoteConn.SetReadDeadline(time.Time{})
c.tunnelPool.Put(id, remoteConn)
c.logger.Debug("Tunnel connection: put %v -> pool active %v", id, c.tunnelPool.Active())
// 清理UDP会话
c.targetUDPSession.Delete(sessionKey)
<-c.semaphore
}()
buffer := make([]byte, udpDataBufSize)
reader := &conn.TimeoutReader{Conn: remoteConn, Timeout: tcpReadTimeout}
for {
select {
case <-c.ctx.Done():
return
default:
// 从池连接读取数据
x, err := reader.Read(buffer)
if err != nil {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
c.logger.Debug("UDP session abort: %v", err)
} else if err == io.EOF {
c.logger.Debug("UDP session close: %v", err)
} else if strings.Contains(err.Error(), "use of closed network connection") {
c.logger.Debug("UDP session close: %v", err)
} else {
c.logger.Error("Read failed: %v", err)
}
return
}
// 将数据写入目标UDP连接
tx, err := c.targetUDPConn.WriteToUDP(buffer[:x], clientAddr)
if err != nil {
c.logger.Error("WriteToUDP failed: %v", err)
return
}
// 传输完成,广播统计信息
c.logger.Debug("Transfer complete: %v <-> %v", remoteConn.LocalAddr(), c.targetUDPConn.LocalAddr())
c.logger.Event("TRAFFIC_STATS|TCP_RX=0|TCP_TX=0|UDP_RX=0|UDP_TX=%v", tx)
}
}
}(remoteConn, clientAddr, sessionKey, id)
// 构建并发送启动URL到客户端
launchURL := &url.URL{
Host: clientAddr.String(),
Path: id,
Fragment: "2", // UDP模式
}
c.mu.Lock()
_, err = c.tunnelTCPConn.Write(append(c.xor([]byte(launchURL.String())), '\n'))
c.mu.Unlock()
if err != nil {
c.logger.Error("Write failed: %v", err)
continue
}
c.logger.Debug("UDP launch signal: pid %v -> %v", id, c.tunnelTCPConn.RemoteAddr())
c.logger.Debug("Starting transfer: %v <-> %v", remoteConn.LocalAddr(), c.targetUDPConn.LocalAddr())
}
c.mu.Lock()
_, err = c.tunnelTCPConn.Write(append(c.xor([]byte(launchURL.String())), '\n'))
c.mu.Unlock()
if err != nil {
c.logger.Error("Write failed: %v", err)
continue
}
c.logger.Debug("UDP launch signal: pid %v -> %v", id, c.tunnelTCPConn.RemoteAddr())
c.logger.Debug("Starting transfer: %v <-> %v", remoteConn.LocalAddr(), c.targetUDPConn.LocalAddr())
// 将原始数据写入池连接
rx, err := remoteConn.Write(buffer[:n])
if err != nil {
c.logger.Error("Write failed: %v", err)
c.targetUDPSession.Delete(sessionKey)
remoteConn.Close()
continue
}
// 传输完成,广播统计信息
c.logger.Event("Transfer complete: TRAFFIC_STATS|TCP_RX=0|TCP_TX=0|UDP_RX=%v|UDP_TX=0", rx)
c.logger.Debug("Transfer complete: %v <-> %v", remoteConn.LocalAddr(), c.targetUDPConn.LocalAddr())
c.logger.Event("TRAFFIC_STATS|TCP_RX=0|TCP_TX=0|UDP_RX=%v|UDP_TX=0", rx)
}
}
}
// commonOnce 共用处理单个请求
func (c *Common) commonOnce() {
func (c *Common) commonOnce() error {
pongURL := &url.URL{Fragment: "o"} // PONG信号
for {
// 等待连接池准备就绪
if !c.tunnelPool.Ready() {
@@ -579,13 +618,12 @@ func (c *Common) commonOnce() {
select {
case <-c.ctx.Done():
return
return c.ctx.Err()
case signal := <-c.signalChan:
// 解析信号URL
signalURL, err := url.Parse(signal)
if err != nil {
c.logger.Error("Parse failed: %v", err)
continue
return err
}
// 处理信号
@@ -593,6 +631,7 @@ func (c *Common) commonOnce() {
case "0": // 连接池刷新
go func() {
c.tunnelPool.Flush()
c.tunnelPool.ResetError()
time.Sleep(reportInterval) // 等待连接池刷新完成
c.logger.Debug("Tunnel pool reset: %v active connections", c.tunnelPool.Active())
}()
@@ -600,8 +639,17 @@ func (c *Common) commonOnce() {
go c.commonTCPOnce(signalURL.Host)
case "2": // UDP
go c.commonUDPOnce(signalURL)
case "i": // PING
c.mu.Lock()
_, err := c.tunnelTCPConn.Write(append(c.xor([]byte(pongURL.String())), '\n'))
c.mu.Unlock()
if err != nil {
return err
}
case "o": // PONG
c.logger.Event("HEALTH_CHECKS|POOL=%v|PING=%vms", c.tunnelPool.Active(), time.Since(c.checkPoint).Milliseconds())
default:
// 健康检查或无效信号
// 无效信号
}
}
}
@@ -615,6 +663,7 @@ func (c *Common) commonTCPOnce(id string) {
remoteConn := c.tunnelPool.ClientGet(id)
if remoteConn == nil {
c.logger.Error("Get failed: %v not found", id)
c.tunnelPool.AddError()
return
}
@@ -645,10 +694,11 @@ func (c *Common) commonTCPOnce(id string) {
c.logger.Debug("Starting exchange: %v <-> %v", remoteConn.LocalAddr(), targetConn.LocalAddr())
// 交换数据
rx, tx, _ := conn.DataExchange(remoteConn, targetConn, tcpReadTimeout)
rx, tx, err := conn.DataExchange(remoteConn, targetConn, tcpReadTimeout)
// 交换完成,广播统计信息
c.logger.Event("Exchange complete: TRAFFIC_STATS|TCP_RX=%v|TCP_TX=%v|UDP_RX=0|UDP_TX=0", rx, tx)
c.logger.Debug("Exchange complete: %v", err)
c.logger.Event("TRAFFIC_STATS|TCP_RX=%v|TCP_TX=%v|UDP_RX=0|UDP_TX=0", rx, tx)
}
// commonUDPOnce 共用处理单个UDP请求
@@ -660,6 +710,7 @@ func (c *Common) commonUDPOnce(signalURL *url.URL) {
remoteConn := c.tunnelPool.ClientGet(id)
if remoteConn == nil {
c.logger.Error("Get failed: %v not found", id)
c.tunnelPool.AddError()
return
}
c.logger.Debug("Tunnel connection: get %v <- pool active %v", id, c.tunnelPool.Active())
@@ -684,30 +735,28 @@ func (c *Common) commonUDPOnce(signalURL *url.URL) {
targetConn = session.(*net.UDPConn)
c.logger.Debug("Target connection: %v <-> %v", targetConn.LocalAddr(), targetConn.RemoteAddr())
}
c.logger.Debug("Starting transfer: %v <-> %v", remoteConn.LocalAddr(), targetConn.LocalAddr())
done := make(chan struct{}, 2)
go func() {
defer func() { done <- struct{}{} }()
buffer := make([]byte, udpDataBufSize)
reader := &conn.TimeoutReader{Conn: remoteConn, Timeout: tcpReadTimeout}
for {
select {
case <-c.ctx.Done():
return
default:
// 设置TCP读取超时
if err := remoteConn.SetReadDeadline(time.Now().Add(tcpReadTimeout)); err != nil {
c.logger.Error("SetReadDeadline failed: %v", err)
return
}
// 从隧道连接读取数据
x, err := remoteConn.Read(buffer)
x, err := reader.Read(buffer)
if err != nil {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
c.logger.Debug("Read timeout: %v", err)
c.logger.Debug("UDP session abort: %v", err)
} else if err == io.EOF {
c.logger.Debug("UDP session close: %v", err)
} else if strings.Contains(err.Error(), "use of closed network connection") {
c.logger.Debug("Read closed: %v", err)
c.logger.Debug("UDP session close: %v", err)
} else {
c.logger.Error("Read failed: %v", err)
}
@@ -722,7 +771,8 @@ func (c *Common) commonUDPOnce(signalURL *url.URL) {
}
// 传输完成,广播统计信息
c.logger.Event("Transfer complete: TRAFFIC_STATS|TCP_RX=0|TCP_TX=0|UDP_RX=%v|UDP_TX=0", rx)
c.logger.Debug("Transfer complete: %v <-> %v", remoteConn.LocalAddr(), targetConn.LocalAddr())
c.logger.Event("TRAFFIC_STATS|TCP_RX=0|TCP_TX=0|UDP_RX=%v|UDP_TX=0", rx)
}
}
}()
@@ -730,24 +780,21 @@ func (c *Common) commonUDPOnce(signalURL *url.URL) {
go func() {
defer func() { done <- struct{}{} }()
buffer := make([]byte, udpDataBufSize)
reader := &conn.TimeoutReader{Conn: targetConn, Timeout: udpReadTimeout}
for {
select {
case <-c.ctx.Done():
return
default:
// 设置UDP读取超时
if err := targetConn.SetReadDeadline(time.Now().Add(udpReadTimeout)); err != nil {
c.logger.Error("SetReadDeadline failed: %v", err)
return
}
// 从目标UDP连接读取数据
x, err := targetConn.Read(buffer)
x, err := reader.Read(buffer)
if err != nil {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
c.logger.Debug("Read timeout: %v", err)
c.logger.Debug("UDP session abort: %v", err)
} else if err == io.EOF {
c.logger.Debug("UDP session close: %v", err)
} else if strings.Contains(err.Error(), "use of closed network connection") {
c.logger.Debug("Read closed: %v", err)
c.logger.Debug("UDP session close: %v", err)
} else {
c.logger.Error("Read failed: %v", err)
}
@@ -762,13 +809,23 @@ func (c *Common) commonUDPOnce(signalURL *url.URL) {
}
// 传输完成,广播统计信息
c.logger.Event("Transfer complete: TRAFFIC_STATS|TCP_RX=0|TCP_TX=0|UDP_RX=0|UDP_TX=%v", tx)
c.logger.Debug("Transfer complete: %v <-> %v", targetConn.LocalAddr(), remoteConn.LocalAddr())
c.logger.Event("TRAFFIC_STATS|TCP_RX=0|TCP_TX=0|UDP_RX=0|UDP_TX=%v", tx)
}
}
}()
// 等待任一协程完成
<-done
// 清理连接和会话
c.targetUDPSession.Delete(sessionKey)
if targetConn != nil {
targetConn.Close()
}
// 重置池连接的读取超时
remoteConn.SetReadDeadline(time.Time{})
c.tunnelPool.Put(id, remoteConn)
c.logger.Debug("Tunnel connection: put %v -> pool active %v", id, c.tunnelPool.Active())
}
@@ -826,11 +883,13 @@ func (c *Common) singleTCPLoop() error {
return
}
c.logger.Debug("Target connection: get relay-id <- pool active %v", c.tunnelPool.Active())
c.logger.Debug("Target connection: get ******** <- pool active %v / %v per %v",
c.tunnelPool.Active(), c.tunnelPool.Capacity(), c.tunnelPool.Interval())
defer func() {
c.tunnelPool.Put("", targetConn)
c.logger.Debug("Tunnel connection: put relay-id -> pool active %v", c.tunnelPool.Active())
if targetConn != nil {
targetConn.Close()
}
}()
c.targetTCPConn = targetConn.(*net.TCPConn)
@@ -838,10 +897,11 @@ func (c *Common) singleTCPLoop() error {
c.logger.Debug("Starting exchange: %v <-> %v", tunnelConn.LocalAddr(), targetConn.LocalAddr())
// 交换数据
rx, tx, _ := conn.DataExchange(tunnelConn, targetConn, tcpReadTimeout)
rx, tx, err := conn.DataExchange(tunnelConn, targetConn, tcpReadTimeout)
// 交换完成,广播统计信息
c.logger.Event("Exchange complete: TRAFFIC_STATS|TCP_RX=%v|TCP_TX=%v|UDP_RX=0|UDP_TX=0", rx, tx)
c.logger.Debug("Exchange complete: %v", err)
c.logger.Event("TRAFFIC_STATS|TCP_RX=%v|TCP_TX=%v|UDP_RX=0|UDP_TX=0", rx, tx)
}(tunnelConn)
}
}
@@ -896,45 +956,45 @@ func (c *Common) singleUDPLoop() error {
}()
buffer := make([]byte, udpDataBufSize)
reader := &conn.TimeoutReader{Conn: targetConn, Timeout: udpReadTimeout}
for {
select {
case <-c.ctx.Done():
return
default:
// 设置UDP读取超时
if err := targetConn.SetReadDeadline(time.Now().Add(udpReadTimeout)); err != nil {
c.logger.Error("SetReadDeadline failed: %v", err)
c.targetUDPSession.Delete(sessionKey)
targetConn.Close()
return
}
// 从UDP读取响应
n, err := targetConn.Read(buffer)
x, err := reader.Read(buffer)
if err != nil {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
c.logger.Debug("UDP session abort: %v", err)
} else if err == io.EOF {
c.logger.Debug("UDP session close: %v", err)
} else if strings.Contains(err.Error(), "use of closed network connection") {
c.logger.Debug("Read closed: %v", err)
c.logger.Debug("UDP session close: %v", err)
} else {
c.logger.Error("Read failed: %v", err)
}
c.targetUDPSession.Delete(sessionKey)
targetConn.Close()
if targetConn != nil {
targetConn.Close()
}
return
}
// 将响应写回隧道UDP连接
tx, err := c.tunnelUDPConn.WriteToUDP(buffer[:n], clientAddr)
tx, err := c.tunnelUDPConn.WriteToUDP(buffer[:x], clientAddr)
if err != nil {
c.logger.Error("WriteToUDP failed: %v", err)
c.targetUDPSession.Delete(sessionKey)
targetConn.Close()
if targetConn != nil {
targetConn.Close()
}
return
}
// 传输完成,广播统计信息
c.logger.Event("Transfer complete: TRAFFIC_STATS|TCP_RX=0|TCP_TX=0|UDP_RX=0|UDP_TX=%v", tx)
c.logger.Debug("Transfer complete: %v <-> %v", c.tunnelUDPConn.LocalAddr(), targetConn.LocalAddr())
c.logger.Event("TRAFFIC_STATS|TCP_RX=0|TCP_TX=0|UDP_RX=0|UDP_TX=%v", tx)
}
}
}(targetConn, clientAddr, sessionKey)
@@ -946,12 +1006,15 @@ func (c *Common) singleUDPLoop() error {
if err != nil {
c.logger.Error("Write failed: %v", err)
c.targetUDPSession.Delete(sessionKey)
targetConn.Close()
if targetConn != nil {
targetConn.Close()
}
return err
}
// 传输完成,广播统计信息
c.logger.Event("Transfer complete: TRAFFIC_STATS|TCP_RX=0|TCP_TX=0|UDP_RX=%v|UDP_TX=0", rx)
c.logger.Debug("Transfer complete: %v <-> %v", targetConn.LocalAddr(), c.tunnelUDPConn.LocalAddr())
c.logger.Event("TRAFFIC_STATS|TCP_RX=0|TCP_TX=0|UDP_RX=%v|UDP_TX=0", rx)
}
}
}

View File

@@ -49,12 +49,12 @@ const swaggerUIHTML = `<!DOCTYPE html>
<div id="swagger-ui"></div>
<script src="https://cdn.jsdelivr.net/npm/swagger-ui-dist@5/swagger-ui-bundle.js"></script>
<script>
window.onload = () => SwaggerUIBundle({
spec: %s,
dom_id: '#swagger-ui',
presets: [SwaggerUIBundle.presets.apis, SwaggerUIBundle.SwaggerUIStandalonePreset],
layout: "BaseLayout"
});
window.onload = () => SwaggerUIBundle({
spec: %s,
dom_id: '#swagger-ui',
presets: [SwaggerUIBundle.presets.apis, SwaggerUIBundle.SwaggerUIStandalonePreset],
layout: "BaseLayout"
});
</script>
</body>
</html>`
@@ -73,6 +73,7 @@ type Master struct {
tlsConfig *tls.Config // TLS配置
masterURL *url.URL // 主控URL
statePath string // 实例状态持久化文件路径
stateMu sync.Mutex // 持久化文件写入互斥锁
subscribers sync.Map // SSE订阅者映射表
notifyChannel chan *InstanceEvent // 事件通知通道
startTime time.Time // 启动时间
@@ -90,6 +91,8 @@ type Instance struct {
TCPTX uint64 `json:"tcptx"` // TCP发送字节数
UDPRX uint64 `json:"udprx"` // UDP接收字节数
UDPTX uint64 `json:"udptx"` // UDP发送字节数
Pool int64 `json:"pool"` // 健康检查池连接数
Ping int64 `json:"ping"` // 健康检查端内延迟
cmd *exec.Cmd `json:"-" gob:"-"` // 命令对象(不序列化)
stopped chan struct{} `json:"-" gob:"-"` // 停止信号通道(不序列化)
cancelFunc context.CancelFunc `json:"-" gob:"-"` // 取消函数(不序列化)
@@ -105,21 +108,23 @@ type InstanceEvent struct {
// InstanceLogWriter 实例日志写入器
type InstanceLogWriter struct {
instanceID string // 实例ID
instance *Instance // 实例对象
target io.Writer // 目标写入器
master *Master // 主控对象
statRegex *regexp.Regexp // 统计信息正则表达式
instanceID string // 实例ID
instance *Instance // 实例对象
target io.Writer // 目标写入器
master *Master // 主控对象
statRegex *regexp.Regexp // 统计信息正则表达式
healthRegex *regexp.Regexp // 健康检查正则表达式
}
// NewInstanceLogWriter 创建新的实例日志写入器
func NewInstanceLogWriter(instanceID string, instance *Instance, target io.Writer, master *Master) *InstanceLogWriter {
return &InstanceLogWriter{
instanceID: instanceID,
instance: instance,
target: target,
master: master,
statRegex: regexp.MustCompile(`TRAFFIC_STATS\|TCP_RX=(\d+)\|TCP_TX=(\d+)\|UDP_RX=(\d+)\|UDP_TX=(\d+)`),
instanceID: instanceID,
instance: instance,
target: target,
master: master,
statRegex: regexp.MustCompile(`TRAFFIC_STATS\|TCP_RX=(\d+)\|TCP_TX=(\d+)\|UDP_RX=(\d+)\|UDP_TX=(\d+)`),
healthRegex: regexp.MustCompile(`HEALTH_CHECKS\|POOL=(\d+)\|PING=(\d+)ms`),
}
}
@@ -140,9 +145,24 @@ func (w *InstanceLogWriter) Write(p []byte) (n int, err error) {
}
}
w.master.instances.Store(w.instanceID, w.instance)
// 发送流量更新事件
w.master.sendSSEEvent("update", w.instance)
// 过滤统计日志
continue
}
// 解析并处理健康检查信息
if matches := w.healthRegex.FindStringSubmatch(line); len(matches) == 3 {
if v, err := strconv.ParseInt(matches[1], 10, 64); err == nil {
w.instance.Pool = v
}
if v, err := strconv.ParseInt(matches[2], 10, 64); err == nil {
w.instance.Ping = v
}
w.master.instances.Store(w.instanceID, w.instance)
// 发送健康检查更新事件
w.master.sendSSEEvent("update", w.instance)
// 过滤检查日志
continue
}
// 输出日志加实例ID
fmt.Fprintf(w.target, "%s [%s]\n", line, w.instanceID)
@@ -429,6 +449,9 @@ func (m *Master) Shutdown(ctx context.Context) error {
// saveState 保存实例状态到文件
func (m *Master) saveState() error {
m.stateMu.Lock()
defer m.stateMu.Unlock()
// 创建持久化数据
persistentData := make(map[string]*Instance)
@@ -699,7 +722,7 @@ func (m *Master) handlePatchInstance(w http.ResponseWriter, r *http.Request, id
instance.UDPRX = 0
instance.UDPTX = 0
m.instances.Store(id, instance)
m.saveState()
go m.saveState()
m.logger.Info("Traffic stats reset: [%v]", instance.ID)
// 发送流量统计重置事件
@@ -710,7 +733,7 @@ func (m *Master) handlePatchInstance(w http.ResponseWriter, r *http.Request, id
if reqData.Restart != nil && instance.Restart != *reqData.Restart {
instance.Restart = *reqData.Restart
m.instances.Store(id, instance)
m.saveState()
go m.saveState()
m.logger.Info("Restart policy updated: %v [%v]", *reqData.Restart, instance.ID)
// 发送restart策略变更事件
@@ -721,7 +744,7 @@ func (m *Master) handlePatchInstance(w http.ResponseWriter, r *http.Request, id
if reqData.Alias != "" && instance.Alias != reqData.Alias {
instance.Alias = reqData.Alias
m.instances.Store(id, instance)
m.saveState()
go m.saveState()
m.logger.Info("Alias updated: %v [%v]", reqData.Alias, instance.ID)
// 发送别名变更事件
@@ -807,7 +830,7 @@ func (m *Master) handlePutInstance(w http.ResponseWriter, r *http.Request, id st
func (m *Master) regenerateAPIKey(instance *Instance) {
instance.URL = generateAPIKey()
m.instances.Store(apiKeyID, instance)
m.saveState()
go m.saveState()
m.logger.Info("API Key regenerated: %v", instance.URL)
}
@@ -848,7 +871,7 @@ func (m *Master) handleDeleteInstance(w http.ResponseWriter, id string, instance
}
m.instances.Delete(id)
// 删除实例后保存状态
m.saveState()
go m.saveState()
w.WriteHeader(http.StatusNoContent)
// 发送删除事件
@@ -1123,7 +1146,7 @@ func (m *Master) stopInstance(instance *Instance) {
m.instances.Store(instance.ID, instance)
// 保存状态变更
m.saveState()
go m.saveState()
// 发送停止事件
m.sendSSEEvent("update", instance)
@@ -1200,185 +1223,187 @@ func generateOpenAPISpec() string {
return fmt.Sprintf(`{
"openapi": "3.1.1",
"info": {
"title": "NodePass API",
"description": "API for managing NodePass server and client instances",
"version": "%s"
"title": "NodePass API",
"description": "API for managing NodePass server and client instances",
"version": "%s"
},
"servers": [{"url": "/{prefix}/v1", "variables": {"prefix": {"default": "api", "description": "API prefix path"}}}],
"security": [{"ApiKeyAuth": []}],
"paths": {
"/instances": {
"get": {
"summary": "List all instances",
"security": [{"ApiKeyAuth": []}],
"responses": {
"200": {"description": "Success", "content": {"application/json": {"schema": {"type": "array", "items": {"$ref": "#/components/schemas/Instance"}}}}},
"401": {"description": "Unauthorized"},
"405": {"description": "Method not allowed"}
}
},
"post": {
"summary": "Create a new instance",
"security": [{"ApiKeyAuth": []}],
"requestBody": {"required": true, "content": {"application/json": {"schema": {"$ref": "#/components/schemas/CreateInstanceRequest"}}}},
"responses": {
"201": {"description": "Created", "content": {"application/json": {"schema": {"$ref": "#/components/schemas/Instance"}}}},
"400": {"description": "Invalid input"},
"401": {"description": "Unauthorized"},
"405": {"description": "Method not allowed"},
"409": {"description": "Instance ID already exists"}
}
}
},
"/instances/{id}": {
"parameters": [{"name": "id", "in": "path", "required": true, "schema": {"type": "string"}}],
"get": {
"summary": "Get instance details",
"security": [{"ApiKeyAuth": []}],
"responses": {
"200": {"description": "Success", "content": {"application/json": {"schema": {"$ref": "#/components/schemas/Instance"}}}},
"400": {"description": "Instance ID required"},
"401": {"description": "Unauthorized"},
"404": {"description": "Not found"},
"405": {"description": "Method not allowed"}
}
},
"patch": {
"summary": "Update instance",
"security": [{"ApiKeyAuth": []}],
"requestBody": {"required": true, "content": {"application/json": {"schema": {"$ref": "#/components/schemas/UpdateInstanceRequest"}}}},
"responses": {
"200": {"description": "Success", "content": {"application/json": {"schema": {"$ref": "#/components/schemas/Instance"}}}},
"400": {"description": "Instance ID required or invalid input"},
"401": {"description": "Unauthorized"},
"404": {"description": "Not found"},
"405": {"description": "Method not allowed"}
}
},
"put": {
"summary": "Update instance URL",
"security": [{"ApiKeyAuth": []}],
"requestBody": {"required": true, "content": {"application/json": {"schema": {"$ref": "#/components/schemas/PutInstanceRequest"}}}},
"responses": {
"200": {"description": "Success", "content": {"application/json": {"schema": {"$ref": "#/components/schemas/Instance"}}}},
"400": {"description": "Instance ID required or invalid input"},
"401": {"description": "Unauthorized"},
"403": {"description": "Forbidden"},
"404": {"description": "Not found"},
"405": {"description": "Method not allowed"},
"409": {"description": "Instance URL conflict"}
}
},
"delete": {
"summary": "Delete instance",
"security": [{"ApiKeyAuth": []}],
"responses": {
"204": {"description": "Deleted"},
"400": {"description": "Instance ID required"},
"401": {"description": "Unauthorized"},
"403": {"description": "Forbidden"},
"404": {"description": "Not found"},
"405": {"description": "Method not allowed"}
}
}
},
"/events": {
"get": {
"summary": "Subscribe to instance events",
"/instances": {
"get": {
"summary": "List all instances",
"security": [{"ApiKeyAuth": []}],
"responses": {
"200": {"description": "Success", "content": {"text/event-stream": {}}},
"responses": {
"200": {"description": "Success", "content": {"application/json": {"schema": {"type": "array", "items": {"$ref": "#/components/schemas/Instance"}}}}},
"401": {"description": "Unauthorized"},
"405": {"description": "Method not allowed"}
}
}
},
"/info": {
"get": {
"summary": "Get master information",
"security": [{"ApiKeyAuth": []}],
"responses": {
"200": {"description": "Success", "content": {"application/json": {"schema": {"$ref": "#/components/schemas/MasterInfo"}}}},
"401": {"description": "Unauthorized"},
"405": {"description": "Method not allowed"}
}
}
},
"/openapi.json": {
"get": {
"summary": "Get OpenAPI specification",
"responses": {
"200": {"description": "Success", "content": {"application/json": {}}}
}
}
},
"/docs": {
"get": {
"summary": "Get Swagger UI",
"responses": {
"200": {"description": "Success", "content": {"text/html": {}}}
}
}
}
"405": {"description": "Method not allowed"}
}
},
"post": {
"summary": "Create a new instance",
"security": [{"ApiKeyAuth": []}],
"requestBody": {"required": true, "content": {"application/json": {"schema": {"$ref": "#/components/schemas/CreateInstanceRequest"}}}},
"responses": {
"201": {"description": "Created", "content": {"application/json": {"schema": {"$ref": "#/components/schemas/Instance"}}}},
"400": {"description": "Invalid input"},
"401": {"description": "Unauthorized"},
"405": {"description": "Method not allowed"},
"409": {"description": "Instance ID already exists"}
}
}
},
"/instances/{id}": {
"parameters": [{"name": "id", "in": "path", "required": true, "schema": {"type": "string"}}],
"get": {
"summary": "Get instance details",
"security": [{"ApiKeyAuth": []}],
"responses": {
"200": {"description": "Success", "content": {"application/json": {"schema": {"$ref": "#/components/schemas/Instance"}}}},
"400": {"description": "Instance ID required"},
"401": {"description": "Unauthorized"},
"404": {"description": "Not found"},
"405": {"description": "Method not allowed"}
}
},
"patch": {
"summary": "Update instance",
"security": [{"ApiKeyAuth": []}],
"requestBody": {"required": true, "content": {"application/json": {"schema": {"$ref": "#/components/schemas/UpdateInstanceRequest"}}}},
"responses": {
"200": {"description": "Success", "content": {"application/json": {"schema": {"$ref": "#/components/schemas/Instance"}}}},
"400": {"description": "Instance ID required or invalid input"},
"401": {"description": "Unauthorized"},
"404": {"description": "Not found"},
"405": {"description": "Method not allowed"}
}
},
"put": {
"summary": "Update instance URL",
"security": [{"ApiKeyAuth": []}],
"requestBody": {"required": true, "content": {"application/json": {"schema": {"$ref": "#/components/schemas/PutInstanceRequest"}}}},
"responses": {
"200": {"description": "Success", "content": {"application/json": {"schema": {"$ref": "#/components/schemas/Instance"}}}},
"400": {"description": "Instance ID required or invalid input"},
"401": {"description": "Unauthorized"},
"403": {"description": "Forbidden"},
"404": {"description": "Not found"},
"405": {"description": "Method not allowed"},
"409": {"description": "Instance URL conflict"}
}
},
"delete": {
"summary": "Delete instance",
"security": [{"ApiKeyAuth": []}],
"responses": {
"204": {"description": "Deleted"},
"400": {"description": "Instance ID required"},
"401": {"description": "Unauthorized"},
"403": {"description": "Forbidden"},
"404": {"description": "Not found"},
"405": {"description": "Method not allowed"}
}
}
},
"/events": {
"get": {
"summary": "Subscribe to instance events",
"security": [{"ApiKeyAuth": []}],
"responses": {
"200": {"description": "Success", "content": {"text/event-stream": {}}},
"401": {"description": "Unauthorized"},
"405": {"description": "Method not allowed"}
}
}
},
"/info": {
"get": {
"summary": "Get master information",
"security": [{"ApiKeyAuth": []}],
"responses": {
"200": {"description": "Success", "content": {"application/json": {"schema": {"$ref": "#/components/schemas/MasterInfo"}}}},
"401": {"description": "Unauthorized"},
"405": {"description": "Method not allowed"}
}
}
},
"/openapi.json": {
"get": {
"summary": "Get OpenAPI specification",
"responses": {
"200": {"description": "Success", "content": {"application/json": {}}}
}
}
},
"/docs": {
"get": {
"summary": "Get Swagger UI",
"responses": {
"200": {"description": "Success", "content": {"text/html": {}}}
}
}
}
},
"components": {
"securitySchemes": {
"ApiKeyAuth": {
"type": "apiKey",
"in": "header",
"name": "X-API-Key",
"description": "API Key for authentication"
}
},
"schemas": {
"Instance": {
"type": "object",
"properties": {
"id": {"type": "string", "description": "Unique identifier"},
"alias": {"type": "string", "description": "Instance alias"},
"type": {"type": "string", "enum": ["client", "server"], "description": "Type of instance"},
"status": {"type": "string", "enum": ["running", "stopped", "error"], "description": "Instance status"},
"url": {"type": "string", "description": "Command string or API Key"},
"restart": {"type": "boolean", "description": "Restart policy"},
            "tcprx": {"type": "integer", "description": "TCP received bytes"},
            "tcptx": {"type": "integer", "description": "TCP transmitted bytes"},
            "udprx": {"type": "integer", "description": "UDP received bytes"},
            "udptx": {"type": "integer", "description": "UDP transmitted bytes"},
            "pool": {"type": "integer", "description": "Health check pool active"},
            "ping": {"type": "integer", "description": "Health check one-way latency"}
}
},
"CreateInstanceRequest": {
"type": "object",
"required": ["url"],
"properties": {"url": {"type": "string", "description": "Command string(scheme://host:port/host:port)"}}
},
"UpdateInstanceRequest": {
"type": "object",
"properties": {
"alias": {"type": "string", "description": "Instance alias"},
"action": {"type": "string", "enum": ["start", "stop", "restart", "reset"], "description": "Action for the instance"},
"restart": {"type": "boolean", "description": "Instance restart policy"}
}
},
"PutInstanceRequest": {
"type": "object",
"required": ["url"],
"properties": {"url": {"type": "string", "description": "New command string(scheme://host:port/host:port)"}}
},
"MasterInfo": {
"type": "object",
"properties": {
"os": {"type": "string", "description": "Operating system"},
"arch": {"type": "string", "description": "System architecture"},
            "ver": {"type": "string", "description": "NodePass version"},
            "name": {"type": "string", "description": "Hostname"},
"uptime": {"type": "integer", "format": "int64", "description": "Uptime in seconds"},
"log": {"type": "string", "description": "Log level"},
"tls": {"type": "string", "description": "TLS code"},
"crt": {"type": "string", "description": "Certificate path"},
"key": {"type": "string", "description": "Private key path"}
}
}
}
}
}`, openAPIVersion)
}

View File

@@ -13,6 +13,7 @@ import (
"syscall"
"time"
"github.com/NodePassProject/conn"
"github.com/NodePassProject/logs"
"github.com/NodePassProject/pool"
)
@@ -38,6 +39,7 @@ func NewServer(parsedURL *url.URL, tlsCode string, tlsConfig *tls.Config, logger
}
// 初始化公共字段
server.getTunnelKey(parsedURL)
server.getPoolCapacity(parsedURL)
server.getAddress(parsedURL)
return server
}
@@ -75,7 +77,8 @@ func (s *Server) Run() {
// start 启动服务端
func (s *Server) start() error {
s.initContext()
// 初始化基本信息
s.initBackground()
// 初始化隧道监听器
if err := s.initTunnelListener(); err != nil {
@@ -99,6 +102,7 @@ func (s *Server) start() error {
// 初始化隧道连接池
s.tunnelPool = pool.NewServerPool(
s.maxPoolCapacity,
s.clientIP,
s.tlsConfig,
s.tunnelListener,
@@ -106,14 +110,11 @@ func (s *Server) start() error {
go s.tunnelPool.ServerManager()
switch s.dataFlow {
case "-":
if s.dataFlow == "-" {
go s.commonLoop()
case "+":
go s.commonOnce()
go s.commonQueue()
}
return s.healthCheck()
return s.commonControl()
}
// tunnelHandshake 与客户端进行握手
@@ -148,7 +149,7 @@ func (s *Server) tunnelHandshake() error {
continue
} else {
s.tunnelTCPConn = tunnelTCPConn.(*net.TCPConn)
s.bufReader = bufio.NewReader(s.tunnelTCPConn)
s.bufReader = bufio.NewReader(&conn.TimeoutReader{Conn: s.tunnelTCPConn, Timeout: tcpReadTimeout})
s.tunnelTCPConn.SetKeepAlive(true)
s.tunnelTCPConn.SetKeepAlivePeriod(reportInterval)