feat: save mp4 to s3 and play mp4 from s3

This commit is contained in:
pggiroro
2025-11-29 22:21:50 +08:00
parent b25fa38a6d
commit 00cb367901
5 changed files with 178 additions and 44 deletions

View File

@@ -63,6 +63,15 @@ func (s *LocalStorage) CreateFile(ctx context.Context, path string) (File, error
return file, nil
}
func (s *LocalStorage) OpenFile(ctx context.Context, path string) (File, error) {
// 只读模式打开文件
file, err := os.Open(path)
if err != nil {
return nil, fmt.Errorf("failed to open file: %w", err)
}
return file, nil
}
func (s *LocalStorage) Delete(ctx context.Context, path string) error {
return os.Remove(path)
}

View File

@@ -110,6 +110,17 @@ func (s *S3Storage) CreateFile(ctx context.Context, path string) (File, error) {
storage: s,
objectKey: objectKey,
ctx: ctx,
readOnly: false,
}, nil
}
func (s *S3Storage) OpenFile(ctx context.Context, path string) (File, error) {
objectKey := s.getObjectKey(path)
return &S3File{
storage: s,
objectKey: objectKey,
ctx: ctx,
readOnly: true, // 只读模式
}, nil
}
@@ -238,6 +249,7 @@ type S3File struct {
ctx context.Context
tempFile *os.File // 本地临时文件,用于支持随机访问
filePath string // 临时文件路径
readOnly bool // 只读模式不上传到S3
}
func (w *S3File) Name() string {
@@ -293,11 +305,23 @@ func (w *S3File) ReadAt(p []byte, off int64) (n int, err error) {
}
func (w *S3File) Sync() error {
// 只读模式不上传
if w.readOnly {
if w.tempFile != nil {
return w.tempFile.Sync()
}
return nil
}
// 如果使用临时文件,先同步到磁盘
if w.tempFile != nil {
if err := w.tempFile.Sync(); err != nil {
return err
}
// 获取文件大小用于日志
if stat, err := w.tempFile.Stat(); err == nil {
fmt.Printf("[S3File.Sync] tempFile size: %d bytes, path: %s\n", stat.Size(), w.filePath)
}
}
if err := w.uploadTempFile(); err != nil {
return err
@@ -349,6 +373,17 @@ func (w *S3File) Stat() (os.FileInfo, error) {
// uploadTempFile 上传临时文件到S3
func (w *S3File) uploadTempFile() (err error) {
// 重置文件指针到开头
if _, err := w.tempFile.Seek(0, 0); err != nil {
fmt.Printf("[S3File.uploadTempFile] failed to seek: %v\n", err)
return fmt.Errorf("failed to seek temp file: %w", err)
}
// 获取文件大小
stat, _ := w.tempFile.Stat()
fmt.Printf("[S3File.uploadTempFile] uploading to S3: bucket=%s, key=%s, size=%d\n",
w.storage.config.Bucket, w.objectKey, stat.Size())
// 上传到S3
_, err = w.storage.uploader.UploadWithContext(w.ctx, &s3manager.UploadInput{
Bucket: aws.String(w.storage.config.Bucket),
@@ -358,9 +393,11 @@ func (w *S3File) uploadTempFile() (err error) {
})
if err != nil {
fmt.Printf("[S3File.uploadTempFile] upload failed: %v\n", err)
return fmt.Errorf("failed to upload to S3: %w", err)
}
fmt.Printf("[S3File.uploadTempFile] upload successful: %s\n", w.objectKey)
return nil
}

View File

@@ -27,6 +27,8 @@ type StorageConfig interface {
// Storage 存储接口
type Storage interface {
CreateFile(ctx context.Context, path string) (File, error)
// OpenFile 以只读模式打开文件(不会上传修改)
OpenFile(ctx context.Context, path string) (File, error)
// Delete 删除文件
Delete(ctx context.Context, path string) error

View File

@@ -20,6 +20,7 @@ import (
"m7s.live/v5/pb"
"m7s.live/v5/pkg"
"m7s.live/v5/pkg/config"
"m7s.live/v5/pkg/storage"
"m7s.live/v5/pkg/util"
mp4pb "m7s.live/v5/plugin/mp4/pb"
mp4 "m7s.live/v5/plugin/mp4/pkg"
@@ -27,22 +28,100 @@ import (
)
type ContentPart struct {
*os.File
file storage.File
Start int64
Size int
boxies []box.IBox
}
func (c *ContentPart) Read(p []byte) (n int, err error) {
return c.file.Read(p)
}
func (c *ContentPart) Seek(offset int64, whence int) (int64, error) {
return c.file.Seek(offset, whence)
}
func (c *ContentPart) Close() error {
return c.file.Close()
}
func (p *MP4Plugin) downloadSingleFile(stream *m7s.RecordStream, flag mp4.Flag, w http.ResponseWriter, r *http.Request) {
if flag == 0 {
http.ServeFile(w, r, stream.FilePath)
} else if flag == mp4.FLAG_FRAGMENT {
file, err := os.Open(stream.FilePath)
// 获取文件(本地或远程)
var file storage.File
var err error
if stream.StorageType != "" && stream.StorageType != "local" {
// 远程存储:从 S3/OSS/COS 获取文件
var storageConfig map[string]any
// 遍历所有录像配置规则,查找包含该 storage 类型的配置
commonConf := p.GetCommonConf()
for _, recConf := range commonConf.OnPub.Record {
if recConf.Storage != nil {
if cfg, ok := recConf.Storage[stream.StorageType]; ok {
if cfgMap, ok := cfg.(map[string]any); ok {
storageConfig = cfgMap
break
}
}
}
}
if storageConfig == nil {
http.Error(w, "storage config not found", http.StatusInternalServerError)
p.Error("storage config not found", "storageType", stream.StorageType)
return
}
// 创建 storage 实例
storageInstance, err := storage.CreateStorage(stream.StorageType, storageConfig)
if err != nil {
http.Error(w, fmt.Sprintf("failed to create storage: %v", err), http.StatusInternalServerError)
p.Error("failed to create storage", "err", err)
return
}
// 对于普通 MP4直接重定向到预签名 URL
if flag == 0 {
url, err := storageInstance.GetURL(context.Background(), stream.FilePath)
if err != nil {
http.Error(w, fmt.Sprintf("failed to get URL: %v", err), http.StatusInternalServerError)
p.Error("failed to get URL", "err", err)
return
}
p.Info("redirect to storage URL", "storageType", stream.StorageType, "url", url)
http.Redirect(w, r, url, http.StatusFound)
return
}
// 对于 fmp4需要读取文件进行转换只读模式不会上传
file, err = storageInstance.OpenFile(context.Background(), stream.FilePath)
if err != nil {
http.Error(w, fmt.Sprintf("failed to open remote file: %v", err), http.StatusInternalServerError)
p.Error("failed to open remote file", "err", err)
return
}
defer file.Close()
p.Info("reading remote file for fmp4 conversion", "storageType", stream.StorageType, "path", stream.FilePath)
} else {
// 本地存储
if flag == 0 {
http.ServeFile(w, r, stream.FilePath)
return
}
// 本地文件用于 fmp4 转换
file, err = os.Open(stream.FilePath)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
p.Info("read", "file", file.Name())
defer file.Close()
}
// fmp4 转换处理(本地和远程文件统一处理)
if flag == mp4.FLAG_FRAGMENT {
p.Info("converting to fmp4", "file", file.Name())
demuxer := mp4.NewDemuxer(file)
err = demuxer.Demux()
if err != nil {
@@ -62,7 +141,7 @@ func (p *MP4Plugin) downloadSingleFile(stream *m7s.RecordStream, flag mp4.Flag,
for track, sample := range demuxer.RangeSample {
if part == nil {
part = &ContentPart{
File: file,
file: file,
Start: sample.Offset,
}
parts = append(parts, part)
@@ -319,7 +398,7 @@ func (p *MP4Plugin) download(w http.ResponseWriter, r *http.Request) {
// 创建内容片段
if part == nil {
part = &ContentPart{
File: file,
file: file,
Start: sample.Offset,
}
}
@@ -404,7 +483,7 @@ func (p *MP4Plugin) download(w http.ResponseWriter, r *http.Request) {
// 写入所有内容片段的数据
for _, part := range parts {
part.Seek(part.Start, io.SeekStart)
written, err = io.CopyN(w, part.File, int64(part.Size))
written, err = io.CopyN(w, part, int64(part.Size))
if err != nil {
return
}
@@ -612,14 +691,14 @@ func (p *MP4Plugin) Delete(ctx context.Context, req *mp4pb.ReqRecordDelete) (res
// CreateTag 创建标签
func (p *MP4Plugin) CreateTag(ctx context.Context, req *mp4pb.ReqCreateTag) (res *mp4pb.ResponseTag, err error) {
res = &mp4pb.ResponseTag{}
// 检查数据库连接
if p.DB == nil {
res.Code = 500
res.Message = pkg.ErrNoDB.Error()
return res, pkg.ErrNoDB
}
// 解析标签时间
tagTime, err := util.TimeQueryParse(req.TagTime)
if err != nil {
@@ -627,21 +706,21 @@ func (p *MP4Plugin) CreateTag(ctx context.Context, req *mp4pb.ReqCreateTag) (res
res.Message = "标签时间格式错误: " + err.Error()
return res, err
}
// 创建标签记录
tag := &mp4.TagModel{
TagName: req.TagName,
StreamPath: req.StreamPath,
TagTime: tagTime,
}
// 保存到数据库
if err = p.DB.Create(tag).Error; err != nil {
res.Code = 500
res.Message = "创建标签失败: " + err.Error()
return res, err
}
// 返回成功结果
res.Code = 0
res.Message = "创建成功"
@@ -653,21 +732,21 @@ func (p *MP4Plugin) CreateTag(ctx context.Context, req *mp4pb.ReqCreateTag) (res
CreatedAt: tag.CreatedAt.Format(time.RFC3339),
UpdatedAt: tag.UpdatedAt.Format(time.RFC3339),
}
return res, nil
}
// UpdateTag 更新标签
func (p *MP4Plugin) UpdateTag(ctx context.Context, req *mp4pb.ReqUpdateTag) (res *mp4pb.ResponseTag, err error) {
res = &mp4pb.ResponseTag{}
// 检查数据库连接
if p.DB == nil {
res.Code = 500
res.Message = pkg.ErrNoDB.Error()
return res, pkg.ErrNoDB
}
// 查询标签是否存在
var tag mp4.TagModel
if err = p.DB.First(&tag, req.Id).Error; err != nil {
@@ -675,7 +754,7 @@ func (p *MP4Plugin) UpdateTag(ctx context.Context, req *mp4pb.ReqUpdateTag) (res
res.Message = "标签不存在: " + err.Error()
return res, err
}
// 更新字段
if req.TagName != "" {
tag.TagName = req.TagName
@@ -692,14 +771,14 @@ func (p *MP4Plugin) UpdateTag(ctx context.Context, req *mp4pb.ReqUpdateTag) (res
}
tag.TagTime = tagTime
}
// 保存更新
if err = p.DB.Save(&tag).Error; err != nil {
res.Code = 500
res.Message = "更新标签失败: " + err.Error()
return res, err
}
// 返回成功结果
res.Code = 0
res.Message = "更新成功"
@@ -711,49 +790,49 @@ func (p *MP4Plugin) UpdateTag(ctx context.Context, req *mp4pb.ReqUpdateTag) (res
CreatedAt: tag.CreatedAt.Format(time.RFC3339),
UpdatedAt: tag.UpdatedAt.Format(time.RFC3339),
}
return res, nil
}
// DeleteTag 删除标签(软删除)
func (p *MP4Plugin) DeleteTag(ctx context.Context, req *mp4pb.ReqDeleteTag) (res *mp4pb.ResponseTag, err error) {
res = &mp4pb.ResponseTag{}
// 检查数据库连接
if p.DB == nil {
res.Code = 500
res.Message = pkg.ErrNoDB.Error()
return res, pkg.ErrNoDB
}
// 软删除标签
if err = p.DB.Delete(&mp4.TagModel{}, req.Id).Error; err != nil {
res.Code = 500
res.Message = "删除标签失败: " + err.Error()
return res, err
}
// 返回成功结果
res.Code = 0
res.Message = "删除成功"
return res, nil
}
// ListTag 查询标签列表
func (p *MP4Plugin) ListTag(ctx context.Context, req *mp4pb.ReqListTag) (res *mp4pb.ResponseTagList, err error) {
res = &mp4pb.ResponseTagList{}
// 检查数据库连接
if p.DB == nil {
res.Code = 500
res.Message = pkg.ErrNoDB.Error()
return res, pkg.ErrNoDB
}
// 构建查询
query := p.DB.Model(&mp4.TagModel{})
// 流路径过滤(默认模糊匹配)
if req.StreamPath != "" {
if strings.Contains(req.StreamPath, "*") {
@@ -762,7 +841,7 @@ func (p *MP4Plugin) ListTag(ctx context.Context, req *mp4pb.ReqListTag) (res *mp
query = query.Where("stream_path LIKE ?", "%"+req.StreamPath+"%")
}
}
// 标签名称过滤(默认模糊匹配)
if req.TagName != "" {
if strings.Contains(req.TagName, "*") {
@@ -771,7 +850,7 @@ func (p *MP4Plugin) ListTag(ctx context.Context, req *mp4pb.ReqListTag) (res *mp
query = query.Where("tag_name LIKE ?", "%"+req.TagName+"%")
}
}
// 时间范围过滤(只有当传入了时间参数时才进行过滤)
if req.Start != "" {
startTime, err := util.TimeQueryParse(req.Start)
@@ -785,7 +864,7 @@ func (p *MP4Plugin) ListTag(ctx context.Context, req *mp4pb.ReqListTag) (res *mp
query = query.Where("tag_time <= ?", endTime)
}
}
// 分页
page := req.Page
count := req.Count
@@ -796,7 +875,7 @@ func (p *MP4Plugin) ListTag(ctx context.Context, req *mp4pb.ReqListTag) (res *mp
count = 10
}
offset := (page - 1) * count
// 获取总数
var total int64
if err = query.Count(&total).Error; err != nil {
@@ -804,7 +883,7 @@ func (p *MP4Plugin) ListTag(ctx context.Context, req *mp4pb.ReqListTag) (res *mp
res.Message = "查询总数失败: " + err.Error()
return res, err
}
// 查询数据
var tags []mp4.TagModel
if err = query.Order("tag_time DESC").Offset(int(offset)).Limit(int(count)).Find(&tags).Error; err != nil {
@@ -812,13 +891,13 @@ func (p *MP4Plugin) ListTag(ctx context.Context, req *mp4pb.ReqListTag) (res *mp
res.Message = "查询标签失败: " + err.Error()
return res, err
}
// 转换为响应格式
res.Code = 0
res.Message = "查询成功"
res.Total = uint32(total)
res.List = make([]*mp4pb.TagInfo, 0, len(tags))
for _, tag := range tags {
res.List = append(res.List, &mp4pb.TagInfo{
Id: uint32(tag.ID),
@@ -829,6 +908,6 @@ func (p *MP4Plugin) ListTag(ctx context.Context, req *mp4pb.ReqListTag) (res *mp
UpdatedAt: tag.UpdatedAt.Format(time.RFC3339),
})
}
return res, nil
}

View File

@@ -53,7 +53,8 @@ type (
CreatedAt time.Time
DeletedAt gorm.DeletedAt `gorm:"index" yaml:"-"`
RecordLevel config.EventLevel `json:"eventLevel" desc:"事件级别" gorm:"type:varchar(255);comment:事件级别,high表示重要事件无法删除且表示无需自动删除,low表示非重要事件,达到自动删除时间后,自动删除;default:'low'"`
StorageLevel int `json:"storageLevel" desc:"存储级别" gorm:"comment:存储级别,1=主存储,2=次级存储;default:1"`
StorageLevel int `json:"storageLevel" desc:"存储级别" gorm:"comment:存储级别,1=主存储,2=次级存储;default:1"`
StorageType string `json:"storageType" desc:"存储类型" gorm:"type:varchar(20);comment:存储类型(local/s3/oss/cos);default:'local'"`
}
)
@@ -72,7 +73,8 @@ func (r *DefaultRecorder) CreateStream(start time.Time, customFileName func(*Rec
// 生成文件路径
filePath := customFileName(recordJob)
recordJob.storage = r.createStorage(recordJob.RecConf.Storage)
var storageType string
recordJob.storage, storageType = r.createStorage(recordJob.RecConf.Storage)
if recordJob.storage == nil {
return fmt.Errorf("storage config is required")
@@ -84,6 +86,7 @@ func (r *DefaultRecorder) CreateStream(start time.Time, customFileName func(*Rec
FilePath: filePath,
Type: recordJob.RecConf.Type,
StorageLevel: 1, // 默认为主存储
StorageType: storageType,
}
if sub.Publisher.HasAudioTrack() {
@@ -105,21 +108,25 @@ func (r *DefaultRecorder) CreateStream(start time.Time, customFileName func(*Rec
return
}
// createStorage 创建存储实例
func (r *DefaultRecorder) createStorage(storageConfig map[string]any) storage.Storage {
// createStorage 创建存储实例,返回 storage 和存储类型
func (r *DefaultRecorder) createStorage(storageConfig map[string]any) (storage.Storage, string) {
for t, conf := range storageConfig {
r.Info("trying to create storage", "type", t, "config", conf)
storage, err := storage.CreateStorage(t, conf)
if err == nil {
return storage
r.Info("storage created successfully", "type", t)
return storage, t
}
r.Error("create storage failed", "type", t, "err", err)
}
r.Warn("falling back to local storage", "path", r.RecordJob.RecConf.FilePath)
localStorage, err := storage.CreateStorage("local", r.RecordJob.RecConf.FilePath)
if err == nil {
return localStorage
return localStorage, "local"
} else {
r.Error("create storage failed", "err", err)
r.Error("create local storage failed", "err", err)
}
return nil
return nil, ""
}
func (r *DefaultRecorder) WriteTail(end time.Time, tailJob task.IJob) {