feat: add schedule iframe mode and save record to db

This commit is contained in:
banshan
2024-12-26 22:31:34 +08:00
parent 79150b05de
commit 09f32bbb03
7 changed files with 302 additions and 17 deletions

View File

@@ -55,8 +55,8 @@ snap:
fontcolor: "rgba(255,165,0,1)"
offsetx: 10
offsety: 10
snapmode: 1
snapmode: 0
snaptimeinterval: 1s
snapsavepath: "./snaps"
snapiframeinterval: 0
snapiframeinterval: 3
filter: "^live/.*"

120
plugin/snap/README.md Normal file
View File

@@ -0,0 +1,120 @@
# Snap 插件
Snap 插件提供了对流媒体的截图功能,支持定时截图、按关键帧截图以及手动触发截图。同时支持水印功能和历史截图查询。
## 配置说明
```yaml
snap:
snapwatermark:
text: "" # 水印文字内容
fontpath: "" # 水印字体文件路径
fontcolor: "rgba(255,165,0,1)" # 水印字体颜色支持rgba格式
fontsize: 36 # 水印字体大小
offsetx: 0 # 水印位置X偏移
offsety: 0 # 水印位置Y偏移
snaptimeinterval: 1m # 截图时间间隔默认1分钟
snapsavepath: "snaps" # 截图保存路径
filter: ".*" # 截图流过滤器,支持正则表达式
snapiframeinterval: 3 # 间隔多少帧截图
snapmode: 1 # 截图模式0-时间间隔1-关键帧间隔
snapquerytimedelta: 3 # 查询截图时允许的最大时间差(秒)
```
## HTTP API
### 1. 手动触发截图
```http
GET /{streamPath}
```
参数说明:
- `streamPath`: 流路径
响应:
- 成功:返回 JPEG 图片
- 失败:返回错误信息
### 2. 查询历史截图
```http
GET /query?streamPath={streamPath}&snapTime={timestamp}
```
参数说明:
- `streamPath`: 流路径
- `snapTime`: Unix时间戳
响应:
- 成功:返回最接近请求时间的 JPEG 图片
- 失败:返回错误信息
- 404未找到截图或时间差超出配置范围
- 400参数错误
- 500服务器内部错误
## 截图模式说明
### 时间间隔模式 (snapmode: 0)
- 按照配置的 `snaptimeinterval` 定时对流进行截图
- 适合需要固定时间间隔截图的场景
### 关键帧间隔模式 (snapmode: 1)
- 按照配置的 `snapiframeinterval` 对关键帧进行截图
- 适合需要按视频内容变化进行截图的场景
### HTTP请求模式 (snapmode: 2)
- 通过 HTTP API 手动触发截图
- 适合需要实时获取画面的场景
## 水印功能
支持为截图添加文字水印,可配置:
- 水印文字内容
- 字体文件
- 字体颜色RGBA格式
- 字体大小
- 位置偏移
## 数据库记录
每次截图都会在数据库中记录以下信息:
- 流名称StreamName
- 截图模式SnapMode
- 截图时间SnapTime
- 截图路径SnapPath
- 创建时间CreatedAt
## 使用示例
1. 基础配置示例:
```yaml
snap:
snaptimeinterval: 30s
snapsavepath: "./snapshots"
snapmode: 1
snapiframeinterval: 5
```
2. 带水印的配置示例:
```yaml
snap:
snapwatermark:
text: "测试水印"
fontpath: "/path/to/font.ttf"
fontcolor: "rgba(255,0,0,0.5)"
fontsize: 48
offsetx: 20
offsety: 20
snapmode: 0
snaptimeinterval: 1m
```
3. API调用示例
```bash
# 手动触发截图
curl http://localhost:8080/snap/live/stream1
# 查询历史截图
curl http://localhost:8080/snap/query?streamPath=live/stream1&snapTime=1677123456
```

View File

@@ -8,8 +8,10 @@ import (
_ "image/jpeg"
"net/http"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/disintegration/imaging"
"github.com/golang/freetype/truetype"
@@ -57,28 +59,28 @@ func (t *SnapPlugin) snap(streamPath string) (*bytes.Buffer, error) {
// 读取字体文件
fontBytes, err := os.ReadFile(t.SnapWatermark.FontPath)
if err != nil {
t.Error("read font file error", err)
t.Error("read font file failed", "error", err.Error())
return nil, err
}
// 解析字体
font, err := truetype.Parse(fontBytes)
if err != nil {
t.Error("parse font error", err)
t.Error("parse font failed", "error", err.Error())
return nil, err
}
// 解码图片
img, _, err := image.Decode(bytes.NewReader(buf.Bytes()))
if err != nil {
t.Error("decode image error", err)
t.Error("decode image failed", "error", err.Error())
return nil, err
}
// 解码颜色
rgba, err := parseRGBA(t.SnapWatermark.FontColor)
if err != nil {
t.Error("parse color error", err)
t.Error("parse color failed", "error", err.Error())
return nil, err
}
// 确保alpha通道正确
@@ -104,14 +106,14 @@ func (t *SnapPlugin) snap(streamPath string) (*bytes.Buffer, error) {
OffsetY: t.SnapWatermark.OffsetY,
}, false)
if err != nil {
t.Error("add watermark error", err)
t.Error("add watermark failed", "error", err.Error())
return nil, err
}
// 清空原buffer并写入新图片
buf.Reset()
if err := imaging.Encode(buf, result, imaging.JPEG); err != nil {
t.Error("encode image error", err)
t.Error("encode image failed", "error", err.Error())
return nil, err
}
}
@@ -133,17 +135,97 @@ func (t *SnapPlugin) doSnap(rw http.ResponseWriter, r *http.Request) {
return
}
// 保存截图并记录到数据库
if t.DB != nil {
now := time.Now()
filename := fmt.Sprintf("%s_%s.jpg", streamPath, now.Format("20060102150405"))
filename = strings.ReplaceAll(filename, "/", "_")
savePath := filepath.Join(t.SnapSavePath, filename)
// 保存到本地
err = os.WriteFile(savePath, buf.Bytes(), 0644)
if err != nil {
t.Error("save snapshot failed", "error", err.Error())
} else {
// 保存记录到数据库
record := SnapRecord{
StreamName: streamPath,
SnapMode: 2, // HTTP请求截图模式
SnapTime: now,
SnapPath: savePath,
}
if err := t.DB.Create(&record).Error; err != nil {
t.Error("save snapshot record failed", "error", err.Error())
}
}
}
rw.Header().Set("Content-Type", "image/jpeg")
rw.Header().Set("Content-Length", strconv.Itoa(buf.Len()))
if _, err := buf.WriteTo(rw); err != nil {
t.Error("write response error", err.Error())
t.Error("write response failed", "error", err.Error())
return
}
}
func (t *SnapPlugin) querySnap(rw http.ResponseWriter, r *http.Request) {
if t.DB == nil {
http.Error(rw, "database not initialized", http.StatusInternalServerError)
return
}
streamPath := r.URL.Query().Get("streamPath")
if streamPath == "" {
http.Error(rw, "streamPath is required", http.StatusBadRequest)
return
}
snapTimeStr := r.URL.Query().Get("snapTime")
if snapTimeStr == "" {
http.Error(rw, "snapTime is required", http.StatusBadRequest)
return
}
snapTimeUnix, err := strconv.ParseInt(snapTimeStr, 10, 64)
if err != nil {
http.Error(rw, "invalid snapTime format, should be unix timestamp", http.StatusBadRequest)
return
}
targetTime := time.Unix(snapTimeUnix, 0)
var record SnapRecord
// 查询小于等于目标时间的最近一条记录
if err := t.DB.Where("stream_name = ? AND snap_time <= ?", streamPath, targetTime).
Order("snap_time DESC").
First(&record).Error; err != nil {
http.Error(rw, "snapshot not found", http.StatusNotFound)
return
}
// 计算时间差(秒)
timeDiff := targetTime.Sub(record.SnapTime).Seconds()
if timeDiff > float64(t.SnapQueryTimeDelta) {
http.Error(rw, "no snapshot found within time delta", http.StatusNotFound)
return
}
// 读取图片文件
imgData, err := os.ReadFile(record.SnapPath)
if err != nil {
http.Error(rw, "failed to read snapshot file", http.StatusNotFound)
return
}
rw.Header().Set("Content-Type", "image/jpeg")
rw.Header().Set("Content-Length", strconv.Itoa(len(imgData)))
rw.Write(imgData)
}
func (config *SnapPlugin) RegisterHandler() map[string]http.HandlerFunc {
return map[string]http.HandlerFunc{
"/{streamPath...}": config.doSnap,
"/query": config.querySnap,
}
}

View File

@@ -32,6 +32,7 @@ type SnapPlugin struct {
Filter string `default:".*" desc:"截图流过滤器,支持正则表达式"`
SnapIFrameInterval int `default:"3" desc:"间隔多少帧截图"`
SnapMode int `default:"1" desc:"截图模式 0:间隔时间 1:间隔关键帧"`
SnapQueryTimeDelta int `default:"3" desc:"查询截图时允许的最大时间差(秒)"`
filterRegex *regexp.Regexp
}
@@ -46,6 +47,15 @@ func (p *SnapPlugin) OnInit() (err error) {
return fmt.Errorf("invalid snap mode: %d, valid range is 0-1", p.SnapMode)
}
// 初始化数据库
if p.DB != nil {
err = p.DB.AutoMigrate(&SnapRecord{})
if err != nil {
p.Error("failed to migrate database", "error", err.Error())
return
}
}
// 创建保存目录
if err = os.MkdirAll(p.SnapSavePath, 0755); err != nil {
return
@@ -105,6 +115,11 @@ func (p *SnapPlugin) OnInit() (err error) {
)
}
//如果截图模式不是时间模式,则不加定时任务
if p.SnapMode != 0 {
return
}
// 如果间隔时间小于0则不添加定时任务;等于0则走onpub的transform
if p.SnapTimeInterval <= 0 {
return

15
plugin/snap/model.go Normal file
View File

@@ -0,0 +1,15 @@
package plugin_snap
import (
"time"
)
// SnapRecord 截图记录
type SnapRecord struct {
ID uint `gorm:"primarykey"`
StreamName string `gorm:"index"` // 流名称
SnapMode int // 截图模式
SnapTime time.Time `gorm:"index"` // 截图时间
SnapPath string // 截图路径
CreatedAt time.Time
}

View File

@@ -96,7 +96,7 @@ func processWithFFmpeg(annexb pkg.AnnexB, output io.Writer) error {
}
// 保存截图到文件
func saveSnapshot(annexb pkg.AnnexB, savePath string) error {
func saveSnapshot(annexb pkg.AnnexB, savePath string, plugin *m7s.Plugin, streamPath string, snapMode int) error {
var buf bytes.Buffer
if err := processWithFFmpeg(annexb, &buf); err != nil {
return fmt.Errorf("process with ffmpeg error: %w", err)
@@ -108,10 +108,38 @@ func saveSnapshot(annexb pkg.AnnexB, savePath string) error {
if err != nil {
return fmt.Errorf("add watermark error: %w", err)
}
return os.WriteFile(savePath, imgData, 0644)
err = os.WriteFile(savePath, imgData, 0644)
if err != nil {
return err
}
} else {
err := os.WriteFile(savePath, buf.Bytes(), 0644)
if err != nil {
return err
}
}
return os.WriteFile(savePath, buf.Bytes(), 0644)
// 保存记录到数据库
if plugin != nil && plugin.DB != nil {
record := struct {
ID uint `gorm:"primarykey"`
StreamName string `gorm:"index"` // 流名称
SnapMode int // 截图模式
SnapTime time.Time `gorm:"index"` // 截图时间
SnapPath string // 截图路径
CreatedAt time.Time
}{
StreamName: streamPath,
SnapMode: snapMode,
SnapTime: time.Now(),
SnapPath: savePath,
}
if err := plugin.DB.Create(&record).Error; err != nil {
return fmt.Errorf("save snapshot record failed: %w", err)
}
}
return nil
}
func NewTransform() m7s.ITransformer {
@@ -284,7 +312,7 @@ func (t *Transformer) Go() error {
savePath := filepath.Join(t.savePath, filename)
// 保存截图(带水印)
if err := saveSnapshot(annexb, savePath); err != nil {
if err := saveSnapshot(annexb, savePath, t.TransformJob.Plugin, subscriber.StreamPath, t.snapMode); err != nil {
t.Error("save snapshot failed",
"error", err.Error(),
"stream", subscriber.StreamPath,

View File

@@ -32,22 +32,47 @@ func (t *SnapTimerTask) Tick(any) {
}
if publisher.HasVideoTrack() {
streamPath := publisher.StreamPath
go func() {
buf, err := t.Plugin.snap(publisher.StreamPath)
buf, err := t.Plugin.snap(streamPath)
if err != nil {
t.Error("take snapshot failed", "error", err.Error())
return
}
filename := fmt.Sprintf("%s_%s.jpg", publisher.StreamPath, time.Now().Format("20060102150405"))
now := time.Now()
filename := fmt.Sprintf("%s_%s.jpg", streamPath, now.Format("20060102150405"))
filename = strings.ReplaceAll(filename, "/", "_")
savePath := filepath.Join(t.SavePath, filename)
// 保存到本地
err = os.WriteFile(savePath, buf.Bytes(), 0644)
if err != nil {
t.Error("take snapshot failed", "error", err.Error())
return
}
t.Info("take snapshot success", "path", savePath)
// 保存记录到数据库
if t.Plugin.DB != nil {
record := SnapRecord{
StreamName: streamPath,
SnapMode: t.Plugin.SnapMode,
SnapTime: now,
SnapPath: savePath,
}
if err := t.Plugin.DB.Create(&record).Error; err != nil {
t.Error("save snapshot record failed",
"error", err.Error(),
"record", record,
)
} else {
t.Info("save snapshot record success",
"stream", streamPath,
"path", savePath,
"time", now,
)
}
} else {
t.Info("take snapshot success", "path", savePath)
t.Warn("database not initialized, skip saving record")
}
}()
}