fix: concrurrent map writing

This commit is contained in:
langhuihui
2023-08-18 09:56:27 +08:00
parent 8f33e9b802
commit c2bc5bf730
2 changed files with 32 additions and 11 deletions

View File

@@ -8,6 +8,7 @@ import (
"path/filepath" "path/filepath"
"regexp" "regexp"
"strings" "strings"
"sync"
"time" "time"
"m7s.live/engine/v4/util" "m7s.live/engine/v4/util"
@@ -19,6 +20,19 @@ type FileWr interface {
io.Seeker io.Seeker
io.Closer io.Closer
} }
var WritingFiles sync.Map
type FileWriter struct {
filePath string
*os.File
}
func (f *FileWriter) Close() error {
WritingFiles.Delete(f.File.Name())
return f.File.Close()
}
type VideoFileInfo struct { type VideoFileInfo struct {
Path string Path string
Size int64 Size int64
@@ -35,7 +49,6 @@ type Record struct {
fs http.Handler fs http.Handler
CreateFileFn func(filename string, append bool) (FileWr, error) `json:"-" yaml:"-"` CreateFileFn func(filename string, append bool) (FileWr, error) `json:"-" yaml:"-"`
GetDurationFn func(file io.ReadSeeker) uint32 `json:"-" yaml:"-"` GetDurationFn func(file io.ReadSeeker) uint32 `json:"-" yaml:"-"`
recording map[string]IRecorder
} }
func (r *Record) ServeHTTP(w http.ResponseWriter, req *http.Request) { func (r *Record) ServeHTTP(w http.ResponseWriter, req *http.Request) {
@@ -47,18 +60,27 @@ func (r *Record) NeedRecord(streamPath string) bool {
} }
func (r *Record) Init() { func (r *Record) Init() {
r.recording = make(map[string]IRecorder) os.MkdirAll(r.Path, 0666)
os.MkdirAll(r.Path, 0766)
if r.Filter != "" { if r.Filter != "" {
r.filterReg = regexp.MustCompile(r.Filter) r.filterReg = regexp.MustCompile(r.Filter)
} }
r.fs = http.FileServer(http.Dir(r.Path)) r.fs = http.FileServer(http.Dir(r.Path))
r.CreateFileFn = func(filename string, append bool) (file FileWr, err error) { r.CreateFileFn = func(filename string, append bool) (file FileWr, err error) {
filePath := filepath.Join(r.Path, filename) filePath := filepath.Join(r.Path, filename)
if err = os.MkdirAll(filepath.Dir(filePath), 0766); err != nil { if err = os.MkdirAll(filepath.Dir(filePath), 0666); err != nil {
return file, err return file, err
} }
file, err = os.OpenFile(filePath, os.O_CREATE | os.O_RDWR | util.Conditoinal(append, os.O_APPEND, os.O_TRUNC), 0766) fw := &FileWriter{filePath: filePath}
if !append {
if _, loaded := WritingFiles.LoadOrStore(filePath, fw); loaded {
return file, ErrRecordExist
}
}
file, err = os.OpenFile(filePath, os.O_CREATE|os.O_RDWR|util.Conditoinal(append, os.O_APPEND, os.O_TRUNC), 0666)
if err == nil && !append {
fw.File = file.(*os.File)
return fw, nil
}
return return
} }
} }

View File

@@ -24,6 +24,7 @@ type Recorder struct {
Record `json:"-" yaml:"-"` Record `json:"-" yaml:"-"`
File FileWr `json:"-" yaml:"-"` File FileWr `json:"-" yaml:"-"`
FileName string // 自定义文件名,分段录像无效 FileName string // 自定义文件名,分段录像无效
filePath string // 文件路径
append bool // 是否追加模式 append bool // 是否追加模式
} }
@@ -36,12 +37,12 @@ func (r *Recorder) CreateFile() (FileWr, error) {
} }
func (r *Recorder) createFile() (f FileWr, err error) { func (r *Recorder) createFile() (f FileWr, err error) {
filePath := r.getFileName(r.Stream.Path) + r.Ext r.filePath = r.getFileName(r.Stream.Path) + r.Ext
f, err = r.CreateFileFn(filePath, r.append) f, err = r.CreateFileFn(r.filePath, r.append)
if err == nil { if err == nil {
r.Info("create file", zap.String("path", filePath)) r.Info("create file", zap.String("path", r.filePath))
} else { } else {
r.Error("create file", zap.String("path", filePath), zap.Error(err)) r.Error("create file", zap.String("path", r.filePath), zap.Error(err))
} }
return return
} }
@@ -64,12 +65,10 @@ func (r *Recorder) start(re IRecorder, streamPath string, subType byte) (err err
if _, loaded := RecordPluginConfig.recordings.LoadOrStore(r.ID, re); loaded { if _, loaded := RecordPluginConfig.recordings.LoadOrStore(r.ID, re); loaded {
return ErrRecordExist return ErrRecordExist
} }
r.recording[streamPath] = re
r.Closer = re r.Closer = re
go func() { go func() {
r.PlayBlock(subType) r.PlayBlock(subType)
RecordPluginConfig.recordings.Delete(r.ID) RecordPluginConfig.recordings.Delete(r.ID)
delete(r.recording, streamPath)
}() }()
} }
return return