feature:mp4-upload-s3 (#325)

* iFLOW CLI Automated Issue Triage

* Update for ai

* Revert "Update for ai"

This reverts commit b85978298a.

* Update ai md

* feature:mp4-upload-s3
eanfs authored on 2025-09-08 08:53:15 +08:00; committed by GitHub
parent f60c9fd421
commit 2311931432
7 changed files with 631 additions and 18 deletions

.github/workflows/iflow.yml (new file, 101 additions)

@@ -0,0 +1,101 @@
name: '🏷️ iFLOW CLI Automated Issue Triage'
on:
issues:
types:
- 'opened'
- 'reopened'
issue_comment:
types:
- 'created'
workflow_dispatch:
inputs:
issue_number:
description: 'issue number to triage'
required: true
type: 'number'
concurrency:
group: '${{ github.workflow }}-${{ github.event.issue.number }}'
cancel-in-progress: true
defaults:
run:
shell: 'bash'
permissions:
contents: 'read'
issues: 'write'
statuses: 'write'
jobs:
triage-issue:
if: |-
github.event_name == 'issues' ||
github.event_name == 'workflow_dispatch' ||
(
github.event_name == 'issue_comment' &&
contains(github.event.comment.body, '@iflow-cli /triage') &&
contains(fromJSON('["OWNER", "MEMBER", "COLLABORATOR"]'), github.event.comment.author_association)
)
timeout-minutes: 5
runs-on: 'ubuntu-latest'
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: 'Run iFlow CLI Issue Triage'
uses: vibe-ideas/iflow-cli-action@main
id: 'iflow_cli_issue_triage'
env:
GITHUB_TOKEN: '${{ secrets.GITHUB_TOKEN }}'
ISSUE_TITLE: '${{ github.event.issue.title }}'
ISSUE_BODY: '${{ github.event.issue.body }}'
ISSUE_NUMBER: '${{ github.event.issue.number }}'
REPOSITORY: '${{ github.repository }}'
with:
api_key: ${{ secrets.IFLOW_API_KEY }}
timeout: "3600"
extra_args: "--debug"
prompt: |
## Role
You are an issue triage assistant. Analyze the current GitHub issue
and apply the most appropriate existing labels. Use the available
tools to gather information; do not ask for information to be
provided.
## Steps
1. Run: `gh label list` to get all available labels.
2. Review the issue title and body provided in the environment
variables: "${ISSUE_TITLE}" and "${ISSUE_BODY}".
3. Classify the issue by kind (bug, enhancement, documentation,
cleanup, etc.) and by priority (p0, p1, p2, p3), selecting
labels that follow the `kind/*` and `priority/*` patterns.
4. Apply the selected labels to this issue using:
`gh issue edit "${ISSUE_NUMBER}" --add-label "label1,label2"`
5. If the "status/needs-triage" label is present, remove it using:
`gh issue edit "${ISSUE_NUMBER}" --remove-label "status/needs-triage"`
## Guidelines
- Only use labels that already exist in the repository
- Do not add comments or modify the issue content
- Triage only the current issue
- Assign all applicable labels based on the issue content
- Reference all shell variables as "${VAR}" (with quotes and braces)
- name: 'Post Issue Triage Failure Comment'
if: |-
${{ failure() && steps.iflow_cli_issue_triage.outcome == 'failure' }}
uses: 'actions/github-script@60a0d83039c74a4aee543508d2ffcb1c3799cdea'
with:
github-token: '${{ secrets.GITHUB_TOKEN }}'
script: |-
github.rest.issues.createComment({
owner: '${{ github.repository }}'.split('/')[0],
repo: '${{ github.repository }}'.split('/')[1],
issue_number: '${{ github.event.issue.number }}',
body: 'There is a problem with the iFlow CLI issue triaging. Please check the [action logs](${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}) for details.'
})

CLAUDE.md (176 additions)

@@ -61,7 +61,70 @@ go test ./...
**Configuration System (`pkg/config/`):** Hierarchical configuration system with priority order: dynamic modifications > environment variables > config files > default YAML > global config > defaults.
**Task System (`pkg/task/`):** Advanced asynchronous task management system with multiple layers:
- **Task:** Basic unit of work with lifecycle management (Start/Run/Dispose)
- **Job:** Container that manages multiple child tasks and provides event loops
- **Work:** Special type of Job that acts as a persistent queue manager (keepalive=true)
- **Channel:** Event-driven task for handling continuous data streams
### Task System Deep Dive
#### Task Hierarchy and Lifecycle
```
Work (Queue Manager)
└── Job (Container with Event Loop)
└── Task (Basic Work Unit)
├── Start() - Initialization phase
├── Run() - Main execution phase
└── Dispose() - Cleanup phase
```
#### Queue-based Asynchronous Processing
The Task system supports sophisticated queue-based processing patterns:
1. **Work as Queue Manager:** Work instances stay alive indefinitely and manage queues of tasks
2. **Task Queuing:** Use `workInstance.AddTask(task, logger)` to queue tasks
3. **Automatic Lifecycle:** Tasks are automatically started, executed, and disposed
4. **Error Handling:** Built-in retry mechanisms and error propagation
**Example Pattern (from S3 plugin):**
```go
type UploadQueueTask struct {
task.Work // Persistent queue manager
}
type FileUploadTask struct {
task.Task // Individual work item
// ... task-specific fields
}
// Initialize queue manager (typically in init())
var uploadQueueTask UploadQueueTask
m7s.Servers.AddTask(&uploadQueueTask)
// Queue individual tasks
uploadQueueTask.AddTask(&FileUploadTask{...}, logger)
```
#### Cross-Plugin Task Cooperation
Tasks can coordinate across different plugins through:
1. **Global Instance Pattern:** Plugins expose global instances for cross-plugin access
2. **Event-based Triggers:** One plugin triggers tasks in another plugin
3. **Shared Queue Managers:** Multiple plugins can use the same Work instance
**Example (MP4 → S3 Integration):**
```go
// In MP4 plugin: trigger S3 upload after recording completes
s3plugin.TriggerUpload(filePath, deleteAfter)
// S3 plugin receives trigger and queues upload task
func TriggerUpload(filePath string, deleteAfter bool) {
if s3PluginInstance != nil {
objectKey := s3PluginInstance.generateObjectKey(filePath)
s3PluginInstance.QueueUpload(filePath, objectKey, deleteAfter)
}
}
```
### Key Interfaces
@@ -80,6 +143,18 @@ go test ./...
4. **Transformers** can process streams between publishers and subscribers
5. **Plugins** provide protocol-specific implementations
### Post-Recording Workflow
Monibuca implements a sophisticated post-recording processing pipeline:
1. **Recording Completion:** MP4 recorder finishes writing stream data
2. **Trailer Writing:** Asynchronous task moves MOOV box to file beginning for web compatibility
3. **File Optimization:** Temporary file operations ensure atomic updates (sketched below)
4. **External Storage Integration:** Automatic upload to S3-compatible services
5. **Cleanup:** Optional local file deletion after successful upload
This workflow uses queue-based task processing to avoid blocking the main recording pipeline.
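Step 3 can be read as a standard write-to-temp-then-rename update. A minimal sketch under that assumption (`writeMoovFirst` is a hypothetical helper, not the plugin's actual function):
```go
// Rewrite srcPath with the MOOV box moved to the front, atomically.
func finalizeMP4(srcPath string) error {
	tmp, err := os.CreateTemp(filepath.Dir(srcPath), "*.mp4.tmp")
	if err != nil {
		return err
	}
	if err = writeMoovFirst(tmp, srcPath); err != nil { // hypothetical helper
		tmp.Close()
		os.Remove(tmp.Name())
		return err
	}
	if err = tmp.Close(); err != nil {
		return err
	}
	// Renaming within the same directory is atomic on POSIX filesystems,
	// so readers never observe a half-written file.
	return os.Rename(tmp.Name(), srcPath)
}
```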
## Plugin Development
### Creating a Plugin
@@ -100,6 +175,81 @@ go test ./...
3. **Run:** Active operation
4. **Dispose:** Cleanup and shutdown
### Cross-Plugin Communication Patterns
#### 1. Global Instance Pattern
```go
// Expose global instance for cross-plugin access
var s3PluginInstance *S3Plugin
func (p *S3Plugin) Start() error {
s3PluginInstance = p // Set global instance
// ... rest of start logic
return nil
}
// Provide public API functions
func TriggerUpload(filePath string, deleteAfter bool) {
if s3PluginInstance != nil {
objectKey := s3PluginInstance.generateObjectKey(filePath)
s3PluginInstance.QueueUpload(filePath, objectKey, deleteAfter)
}
}
```
#### 2. Event-Driven Integration
```go
// In one plugin: trigger event after completion
if t.filePath != "" {
t.Info("MP4 file processing completed, triggering S3 upload")
s3plugin.TriggerUpload(t.filePath, false)
}
```
#### 3. Shared Queue Managers
Multiple plugins can share Work instances for coordinated processing, as sketched below.
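A minimal sketch, reusing the hypothetical `MyQueueManager` and `MyTask` types from the best-practices section below (logger types assumed to be `*slog.Logger`):
```go
// sharedQueue is the single Work instance, registered once at startup
// (see the init pattern in the best-practices section below).
var sharedQueue MyQueueManager

// Any plugin holding a reference can queue onto the same instance,
// so tasks from multiple plugins flow through one coordinated queue.
func queueFromPluginA(logger *slog.Logger) {
	sharedQueue.AddTask(&MyTask{}, logger)
}

func queueFromPluginB(logger *slog.Logger) {
	sharedQueue.AddTask(&MyTask{}, logger)
}
```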
### Asynchronous Task Development Best Practices
#### 1. Implement Task Interfaces
```go
type MyTask struct {
task.Task
// ... custom fields
}
func (t *MyTask) Start() error {
// Initialize resources, validate inputs
return nil
}
func (t *MyTask) Run() error {
// Main work execution
// Return task.ErrTaskComplete for successful completion
return task.ErrTaskComplete
}
```
#### 2. Use Work for Queue Management
```go
type MyQueueManager struct {
task.Work
}
var myQueue MyQueueManager
func init() {
m7s.Servers.AddTask(&myQueue)
}
// Queue tasks from anywhere
myQueue.AddTask(&MyTask{...}, logger)
```
#### 3. Error Handling and Retry
- Tasks automatically support retry mechanisms
- Use `task.SetRetry(maxRetry, interval)` for custom retry behavior
- Return `task.ErrTaskComplete` for successful completion
- Return other errors to trigger retry or failure handling (see the sketch below)
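A minimal sketch of these rules in the same snippet style (`doWork` is hypothetical; `SetRetry` is assumed to be promoted from the embedded `task.Task`):
```go
type FlakyTask struct {
	task.Task
}

func (t *FlakyTask) Start() error {
	// Retry up to 3 times, pausing 5 seconds between attempts.
	t.SetRetry(3, 5*time.Second)
	return nil
}

func (t *FlakyTask) Run() error {
	if err := doWork(); err != nil {
		return err // a plain error triggers the configured retry policy
	}
	return task.ErrTaskComplete // signals successful completion
}
```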
## Configuration Structure
### Global Configuration
@@ -131,8 +281,9 @@ Automatic migration is handled for core models including users, proxies, and str
- **WebRTC:** Web real-time communication
- **GB28181:** Chinese surveillance standard
- **FLV:** Flash video format
- **MP4:** MPEG-4 format with post-processing capabilities
- **SRT:** Secure reliable transport
- **S3:** File upload integration with AWS S3/MinIO compatibility
## Authentication & Security
@@ -154,11 +305,18 @@ Automatic migration is handled for core models including users, proxies, and str
- Integration tests can use the example configurations
- Use the mock.py script for protocol testing
### Async Task Development
- Always use Work instances for queue management
- Implement proper Start/Run lifecycle in tasks
- Use global instance pattern for cross-plugin communication
- Handle errors gracefully with appropriate retry strategies
### Performance Considerations
- Memory pool is enabled by default (disable with `disable_rm`)
- Zero-copy design for media data where possible
- Lock-free data structures for high concurrency
- Efficient buffer management with ring buffers
- Queue-based processing prevents blocking main threads
## Debugging
@@ -173,6 +331,12 @@ Automatic migration is handled for core models including users, proxies, and str
- Log rotation support
- Fatal crash logging
### Task System Debugging
- Tasks automatically include detailed logging with task IDs and types
- Use `task.Debug/Info/Warn/Error` methods for consistent logging (see the sketch below)
- Task state and progress can be monitored through descriptions
- Event loop status and queue lengths are logged automatically
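A minimal sketch of these logging conventions (`doWork` is again hypothetical):
```go
type VerboseTask struct {
	task.Task
}

func (t *VerboseTask) Run() error {
	// Key-value logging via the embedded task.Task; the task ID and
	// type are attached to each record automatically.
	t.Info("work starting")
	if err := doWork(); err != nil {
		t.Error("work failed", "err", err)
		return err
	}
	return task.ErrTaskComplete
}
```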
## Web Admin Interface
- Web-based admin UI served from `admin.zip`
@@ -196,4 +360,10 @@ Automatic migration is handled for core models including users, proxies, and str
### Plugin Loading
- Plugins are auto-discovered from imports
- Check plugin enable/disable status
- Verify configuration merging
### Task System Issues
- Ensure Work instances are added to server during initialization
- Check task queue status if tasks aren't executing
- Verify proper error handling in task implementation
- Monitor task retry counts and failure reasons in logs

GEMINI.md (new file, 92 additions)

@@ -0,0 +1,92 @@
# Gemini Context: Monibuca Project
This document provides a summary of the Monibuca project to give context for AI-assisted development.
## Project Overview
Monibuca is a modular, high-performance streaming media server framework written in Go. Its core design is lightweight and plugin-based, allowing developers to extend functionality by adding or developing plugins for different streaming protocols and features. The project's module path is `m7s.live/v5`.
The architecture is centered around a core engine (`m7s.live/v5`) that manages plugins, streams, and the main event loop. Functionality is added by importing plugins, which register themselves with the core engine.
**Key Technologies:**
- **Language:** Go
- **Architecture:** Plugin-based
- **APIs:** RESTful HTTP API, gRPC API
**Supported Protocols (based on plugins):**
- RTMP
- RTSP
- HLS
- FLV
- WebRTC
- GB28181
- SRT
- And more...
## Building and Running
### Build
To build the server, run the following command from the project root:
```bash
go build -v .
```
### Test
To run the test suite:
```bash
go test -v ./...
```
### Running the Server
The server is typically run by creating a `main.go` file that imports the core engine and the desired plugins.
**Example `main.go`:**
```go
package main
import (
"m7s.live/v4"
// Import desired plugins to register them
_ "m7s.live/plugin/rtmp/v4"
_ "m7s.live/plugin/rtsp/v4"
_ "m7s.live/plugin/hls/v4"
_ "m7s.live/plugin/webrtc/v4"
)
func main() {
m7s.Run()
}
```
The server is executed by running `go run main.go`. Configuration is managed through a `config.yaml` file in the same directory.
### Docker
The project includes a `Dockerfile` to build and run in a container.
```bash
# Build the image
docker build -t monibuca .
# Run the container
docker run -p 8080:8080 monibuca
```
## Development Conventions
### Project Structure
- `server.go`: Core engine logic.
- `plugin/`: Contains individual plugins for different protocols and features.
- `pkg/`: Shared packages and utilities used across the project.
- `pb/`: Protobuf definitions for the gRPC API.
- `example/`: Example implementations and configurations.
- `doc/`: Project documentation.
### Plugin System
The primary way to add functionality is by creating or enabling plugins. A plugin is a Go package that registers itself with the core engine upon import (using the `init()` function). This modular approach keeps the core small and allows for custom builds with only the necessary features.
### API
- **RESTful API:** Defined in `api.go`, provides HTTP endpoints for controlling and monitoring the server.
- **gRPC API:** Defined in the `pb/` directory using protobuf. `protoc.sh` is used to generate the Go code from the `.proto` files.
### Code Style and CI
- The project uses `golangci-lint` for linting, as seen in the `.github/workflows/go.yml` file.
- Static analysis is configured via `staticcheck.conf` and `qodana.yaml`.
- All code should be formatted with `gofmt`.

IFLOW.md (new file, 124 additions)

@@ -0,0 +1,124 @@
# Monibuca v5 Project Overview
Monibuca is a highly extensible, high-performance streaming media server development framework written in pure Go. It aims to provide high-concurrency, low-latency stream processing and supports a wide range of streaming protocols and features.
## Core Features
* **High performance**: Lock-free design, selective manual memory management, and multi-core computation.
* **Low latency**: Zero-wait forwarding with sub-second end-to-end latency.
* **Modular**: Load only what you need; unlimited extensibility.
* **Flexible**: Highly configurable for all kinds of streaming scenarios.
* **Scalable**: Distributed deployment support for large-scale scenarios.
* **Debug friendly**: Built-in debug plugin with real-time performance monitoring and analysis.
* **Media processing**: Screenshots, transcoding, and SEI data processing.
* **Clustering**: Built-in cascading and room management.
* **Preview**: Video preview, multi-screen preview, and custom screen layouts.
* **Security**: Encrypted transport and stream authentication.
* **Performance monitoring**: Load testing and metric collection (integrated into the test plugin).
* **Log management**: Log rotation, automatic cleanup, and custom extensions.
* **Recording and playback**: MP4, HLS, and FLV formats with variable speed, seeking, and pause.
* **Dynamic time-shift**: Dynamic cache design supporting live time-shift playback.
* **Remote invocation**: gRPC interfaces for cross-language integration.
* **Stream aliases**: Dynamic stream aliases for flexible multi-stream management.
* **AI capabilities**: Integrated inference engine with ONNX model support and custom pre- and post-processing.
* **WebHook**: Subscribe to stream lifecycle events for business-system integration.
* **Private protocols**: Custom private protocols for special business requirements.
## Supported Protocols
* RTMP
* RTSP
* HTTP-FLV
* WS-FLV
* HLS
* WebRTC
* GB28181
* ONVIF
* SRT
## Technical Architecture
Monibuca is built on a plugin architecture, with core functionality extended through plugins. The main components are:
* **Server**: The core server, managing streams, plugins, tasks, and more.
* **Plugin**: The plugin system providing feature extensions.
* **Publisher**: The stream publisher, receiving and managing stream data.
* **Subscriber**: The stream subscriber, consuming stream data.
* **Task**: The task system, managing asynchronous tasks and their lifecycles.
* **Config**: The configuration system, supporting layered configuration (environment variables, config files, defaults, and so on).
## Building and Running
### Prerequisites
* Go 1.23 or later
* A basic understanding of streaming protocols
### Running the Default Configuration
```bash
cd example/default
go run -tags sqlite main.go
```
### Build Tags
The following build tags can be used to customize the build:
| Build Tag | Description |
| :--- | :--- |
| `disable_rm` | Disable the memory pool |
| `sqlite` | Enable the SQLite DB |
| `sqliteCGO` | Enable the cgo build of the SQLite DB |
| `mysql` | Enable the MySQL DB |
| `postgres` | Enable the Postgres DB |
| `duckdb` | Enable the DuckDB DB |
| `taskpanic` | Throw panics (for testing) |
| `fasthttp` | Use the fasthttp server instead of net/http |
### Web UI
Place the `admin.zip` file (do not unzip it) in the same directory as the configuration file, then open http://localhost:8080 to access the UI.
## Development Conventions
### Project Structure
* `example/`: Usage examples.
* `pkg/`: Core library code.
* `plugin/`: Feature plugins.
* `pb/`: Code generated from the Protocol Buffer definitions.
* `doc/`: Project documentation.
* `scripts/`: Script files.
### Configuration
* Configuration uses YAML.
* Multi-level configuration override is supported (environment variables > config files > defaults).
* Plugin configuration is usually prefixed with the lowercase plugin name.
### Logging
* Logging uses `slog`.
* Multiple log levels are supported (debug, info, warn, error, trace).
* Plugins can have their own loggers.
### Plugin Development
* A plugin must implement the `IPlugin` interface.
* Plugins are registered through the `InstallPlugin` function (see the sketch after this list).
* Plugins can register HTTP handlers, gRPC services, and more.
* Plugins can define their own configuration structs.
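A minimal plugin skeleton, modeled on the S3 plugin elsewhere in this commit (the `HelloPlugin` name and its field are hypothetical):
```go
package plugin_hello

import (
	"m7s.live/v5"
)

// HelloPlugin embeds m7s.Plugin for lifecycle and logging;
// exported fields become plugin configuration.
type HelloPlugin struct {
	m7s.Plugin
	Greeting string `default:"hello" desc:"example config field"`
}

// Register at import time, following the S3 plugin's pattern.
var _ = m7s.InstallPlugin[HelloPlugin](m7s.PluginMeta{})

// Start runs when the plugin is enabled.
func (p *HelloPlugin) Start() error {
	p.Info("hello plugin started", "greeting", p.Greeting)
	return nil
}
```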
### Task System
* Asynchronous tasks are managed with the `task` package.
* Tasks have lifecycle management (start, stop, dispose).
* Tasks can form parent-child relationships, building a task tree.
* Task retry is supported.
### Testing
* Use the standard Go `testing` package.
* Write integration tests under the `test/` directory.
* Use the `example/test` directory for functional testing.


@@ -94,6 +94,17 @@ hls:
# live/test: rtsp://admin:1qaz2wsx3EDC@giroro.tpddns.cn:1554/Streaming/Channels/101
# live/test: rtsp://admin:1qaz2wsx3EDC@localhost:8554/live/test
s3:
auto: true # enable automatic upload
deleteAfterUpload: false # keep the local file after upload
endpoint: "storage-dev.xiding.tech"
accessKeyId: "xidinguser"
secretAccessKey: "U2FsdGVkX1/7uyvj0trCzSNFsfDZ66dMSAEZjNlvW1c="
bucket: "vidu-media-bucket"
pathPrefix: "recordings"
forcePathStyle: true
useSSL: true
snap:
enable: false
onpub:


@@ -13,6 +13,7 @@ import (
"m7s.live/v5/pkg/config"
"m7s.live/v5/pkg/task"
"m7s.live/v5/plugin/mp4/pkg/box"
s3plugin "m7s.live/v5/plugin/s3"
)
type WriteTrailerQueueTask struct {
@@ -23,8 +24,9 @@ var writeTrailerQueueTask WriteTrailerQueueTask
type writeTrailerTask struct {
task.Task
muxer *Muxer
file *os.File
filePath string
}
func (task *writeTrailerTask) Start() (err error) {
@@ -102,6 +104,13 @@ func (t *writeTrailerTask) Run() (err error) {
if err = temp.Close(); err != nil {
t.Error("close temp file", "err", err)
}
// Trigger an S3 upload once the MP4 file has been processed
if t.filePath != "" {
t.Info("MP4 file processing completed, triggering S3 upload", "filePath", t.filePath)
s3plugin.TriggerUpload(t.filePath, false) // do not delete the local file here; deletion is governed by configuration
}
return
}
@@ -122,8 +131,9 @@ type Recorder struct {
func (r *Recorder) writeTailer(end time.Time) {
r.WriteTail(end, &writeTrailerQueueTask)
writeTrailerQueueTask.AddTask(&writeTrailerTask{
muxer: r.muxer,
file: r.file,
filePath: r.Event.FilePath,
}, r.Logger)
}


@@ -3,6 +3,7 @@ package plugin_s3
import (
"fmt"
"os"
"path/filepath"
"strings"
"github.com/aws/aws-sdk-go/aws"
@@ -10,23 +11,41 @@ import (
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"m7s.live/v5"
"m7s.live/v5/pkg/task"
"m7s.live/v5/plugin/s3/pb"
)
// Upload task queue worker
type UploadQueueTask struct {
task.Work
}
var uploadQueueTask UploadQueueTask
// File upload task
type FileUploadTask struct {
task.Task
plugin *S3Plugin
filePath string
objectKey string
deleteAfterUpload bool
}
type S3Plugin struct {
pb.UnimplementedApiServer
m7s.Plugin
Endpoint string `desc:"S3 service endpoint, such as MinIO address"`
Region string `default:"us-east-1" desc:"AWS region"`
AccessKeyID string `desc:"S3 access key ID"`
SecretAccessKey string `desc:"S3 secret access key"`
Bucket string `desc:"S3 bucket name"`
PathPrefix string `desc:"file path prefix"`
ForcePathStyle bool `desc:"force path style (required for MinIO)"`
UseSSL bool `default:"true" desc:"whether to use SSL"`
Auto bool `desc:"whether to automatically upload recorded files"`
DeleteAfterUpload bool `desc:"whether to delete local file after successful upload"`
Timeout int `default:"30" desc:"upload timeout in seconds"`
s3Client *s3.S3
}
var _ = m7s.InstallPlugin[S3Plugin](m7s.PluginMeta{
@@ -34,7 +53,18 @@ var _ = m7s.InstallPlugin[S3Plugin](m7s.PluginMeta{
RegisterGRPCHandler: pb.RegisterApiHandler,
})
// Global S3 plugin instance
var s3PluginInstance *S3Plugin
func init() {
// Register the upload queue task with the server
m7s.Servers.AddTask(&uploadQueueTask)
}
func (p *S3Plugin) Start() error {
// Set the global instance
s3PluginInstance = p
// Set default configuration
if p.Region == "" {
p.Region = "us-east-1"
@@ -129,3 +159,78 @@ func (p *S3Plugin) uploadFile(filePath, objectKey string) error {
p.Info("File uploaded successfully", "objectKey", objectKey, "size", fileInfo.Size())
return nil
}
// Start initializes the FileUploadTask
func (task *FileUploadTask) Start() error {
task.Info("Starting file upload", "filePath", task.filePath, "objectKey", task.objectKey)
return nil
}
// Run executes the FileUploadTask
func (task *FileUploadTask) Run() error {
// Check that the file exists
if _, err := os.Stat(task.filePath); os.IsNotExist(err) {
return fmt.Errorf("file does not exist: %s", task.filePath)
}
// Perform the upload
err := task.plugin.uploadFile(task.filePath, task.objectKey)
if err != nil {
task.Error("Failed to upload file", "error", err, "filePath", task.filePath)
return err
}
// Delete the local file if deletion after upload is configured
if task.deleteAfterUpload {
if err := os.Remove(task.filePath); err != nil {
task.Warn("Failed to delete local file after upload", "error", err, "filePath", task.filePath)
} else {
task.Info("Local file deleted after successful upload", "filePath", task.filePath)
}
}
task.Info("File upload completed successfully", "filePath", task.filePath, "objectKey", task.objectKey)
return nil
}
// QueueUpload queues a file upload
func (p *S3Plugin) QueueUpload(filePath, objectKey string, deleteAfter bool) {
if !p.Auto {
p.Debug("Auto upload is disabled, skipping upload", "filePath", filePath)
return
}
uploadTask := &FileUploadTask{
plugin: p,
filePath: filePath,
objectKey: objectKey,
deleteAfterUpload: deleteAfter || p.DeleteAfterUpload,
}
// Add the upload task to the queue
uploadQueueTask.AddTask(uploadTask, p.Logger.With("filePath", filePath, "objectKey", objectKey))
p.Info("File upload queued", "filePath", filePath, "objectKey", objectKey)
}
// generateObjectKey builds the S3 object key for a local file
func (p *S3Plugin) generateObjectKey(filePath string) string {
// Extract the file name
fileName := filepath.Base(filePath)
// Prepend the path prefix if one is configured
if p.PathPrefix != "" {
return strings.TrimSuffix(p.PathPrefix, "/") + "/" + fileName
}
return fileName
}
// TriggerUpload is a package-level function other plugins call to trigger an S3 upload
func TriggerUpload(filePath string, deleteAfter bool) {
if s3PluginInstance == nil {
return // the S3 plugin is not enabled or not initialized
}
objectKey := s3PluginInstance.generateObjectKey(filePath)
s3PluginInstance.QueueUpload(filePath, objectKey, deleteAfter)
}