From df4d5e8913d40c013ebc717a97385ea0995c1599 Mon Sep 17 00:00:00 2001 From: ydajiang Date: Fri, 12 Dec 2025 14:35:14 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E8=BF=98=E6=9C=AA=E7=94=9F=E6=88=90?= =?UTF-8?q?=E6=88=AA=E5=9B=BE,=20=E8=AE=BF=E9=97=AE=E5=BA=94=E7=AD=94404?= =?UTF-8?q?=E9=97=AE=E9=A2=98;=20=E6=88=AA=E5=9B=BE=E7=BC=A9=E6=94=BE?= =?UTF-8?q?=E5=88=B0960*540=E5=A4=A7=E5=B0=8F;?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- api/api.go | 21 +++++++++++++-- api/api_hook.go | 32 +++++++++++++++++----- api/api_stream.go | 2 +- api/snapshot.go | 40 ++++++++++++++++++++++++--- api/snapshot_async.go | 63 +++++++++++++++++++++++++++++++++++++++++++ go.mod | 2 ++ 6 files changed, 148 insertions(+), 12 deletions(-) create mode 100644 api/snapshot_async.go diff --git a/api/api.go b/api/api.go index 1e1ff01..4e14ba9 100644 --- a/api/api.go +++ b/api/api.go @@ -259,6 +259,22 @@ func (api *ApiServer) registerStatisticsHandler(actionName, path string, handler api.actionNames[path] = actionName } +type SnapshotFilter struct { +} + +func (f *SnapshotFilter) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if "" != r.URL.Query().Get("preview_snapshot") { + t := r.URL.Query().Get("t") + ok := DefaultSnapshotManager.Get(t, 2*time.Second) + if !ok { + w.WriteHeader(http.StatusNotFound) + return + } + } + + http.FileServer(http.Dir("./snapshot")).ServeHTTP(w, r) +} + func StartApiServer(addr string) { apiServer.router.HandleFunc("/api/v1/hook/on_play", common.WithJsonParams(apiServer.OnPlay, &PlayDoneParams{})) apiServer.router.HandleFunc("/api/v1/hook/on_play_done", common.WithJsonParams(apiServer.OnPlayDone, &PlayDoneParams{})) @@ -268,7 +284,7 @@ func StartApiServer(addr string) { apiServer.router.HandleFunc("/api/v1/hook/on_receive_timeout", common.WithJsonParams(apiServer.OnReceiveTimeout, &StreamParams{})) apiServer.router.HandleFunc("/api/v1/hook/on_record", common.WithJsonParams(apiServer.OnRecord, &RecordParams{})) apiServer.router.HandleFunc("/api/v1/hook/on_started", apiServer.OnStarted) - apiServer.router.HandleFunc("/api/v1/hook/on_snapshot", common.WithJsonParams(apiServer.OnSnapshot, &SnapshotParams{})) + apiServer.router.HandleFunc("/api/v1/hook/on_snapshot", apiServer.OnSnapshot) apiServer.registerStatisticsHandler("开始预览", "/api/v1/stream/start", withVerify(common.WithFormDataParams(apiServer.OnStreamStart, InviteParams{}))) // 实时预览 apiServer.registerStatisticsHandler("停止预览", "/api/v1/stream/stop", withVerify(common.WithFormDataParams(apiServer.OnCloseLiveStream, InviteParams{}))) // 关闭实时预览 @@ -411,7 +427,8 @@ func StartApiServer(addr string) { }) // 映射snapshot目录 - apiServer.router.PathPrefix("/snapshot/").Handler(http.StripPrefix("/snapshot/", http.FileServer(http.Dir("./snapshot")))) + filter := SnapshotFilter{} + apiServer.router.PathPrefix("/snapshot/").Handler(http.StripPrefix("/snapshot/", &filter)) // 前端路由 htmlRoot := "./html/" diff --git a/api/api_hook.go b/api/api_hook.go index 7dcdea7..c67dfe8 100644 --- a/api/api_hook.go +++ b/api/api_hook.go @@ -8,6 +8,7 @@ import ( "gb-cms/stack" "github.com/csnewman/ffmpeg-go" "github.com/lkmio/avformat/utils" + "io" "net/http" "os" "path" @@ -234,11 +235,28 @@ func (api *ApiServer) OnStarted(_ http.ResponseWriter, _ *http.Request) { } } -func (api *ApiServer) OnSnapshot(v *SnapshotParams, writer http.ResponseWriter, request *http.Request) { +func (api *ApiServer) OnSnapshot(w http.ResponseWriter, r *http.Request) { if VideoKeyFrame2JPG == nil { return } + r.Body = http.MaxBytesReader(w, r.Body, 1024*1024*2) + data, err := io.ReadAll(r.Body) + if err != nil { + if err.Error() == "http: request body too large" { + http.Error(w, "file too large", http.StatusRequestEntityTooLarge) + } else { + http.Error(w, "read body failed", http.StatusInternalServerError) + } + return + } + + v := &SnapshotParams{} + v.Stream = common.StreamID(r.Header.Get("stream")) + v.Session = r.Header.Get("session") + v.Codec = r.Header.Get("codec") + v.KeyFrameData = data + var codecId ffmpeg.AVCodecID switch strings.ToLower(v.Codec) { case "h264": @@ -250,8 +268,8 @@ func (api *ApiServer) OnSnapshot(v *SnapshotParams, writer http.ResponseWriter, return } - jpgPath := GetSnapshotPath(v.Stream, v.Session) - err := os.MkdirAll(path.Dir(jpgPath), 0755) + jpgPath := GetSnapshotPath(v.Stream) + err = os.MkdirAll(path.Dir(jpgPath), 0755) if err != nil { log.Sugar.Errorf("创建目录失败 err: %s", err.Error()) return @@ -261,16 +279,18 @@ func (api *ApiServer) OnSnapshot(v *SnapshotParams, writer http.ResponseWriter, err = VideoKeyFrame2JPG(codecId, v.KeyFrameData, jpgPath) if err != nil { log.Sugar.Errorf("转换为JPEG失败 err: %s", err.Error()) - } else if err = dao.Channel.SetSnapshotPath(v.Stream.DeviceID(), v.Stream.ChannelID(), jpgPath); err != nil { + } else if err = dao.Channel.SetSnapshotPath(v.Stream.DeviceID(), v.Stream.ChannelID(), jpgPath+"?t="+v.Session); err != nil { // 数据库更新通道的最新截图 log.Sugar.Errorf("更新通道最新截图失败 err: %s", err.Error()) + } else { + DefaultSnapshotManager.Put(v.Session, true) } } -func GetSnapshotPath(streamID common.StreamID, sessionID string) string { +func GetSnapshotPath(streamID common.StreamID) string { if VideoKeyFrame2JPG == nil { return "" } - return path.Join("./snapshot/", string(streamID), sessionID+".jpg") + return path.Join("./snapshot/", string(streamID)+".jpg") } diff --git a/api/api_stream.go b/api/api_stream.go index e5e4935..153e39b 100644 --- a/api/api_stream.go +++ b/api/api_stream.go @@ -116,7 +116,7 @@ func (api *ApiServer) DoStartStream(v *InviteParams, w http.ResponseWriter, r *h RecordStartAt: "", RelaySize: 0, SMSID: "", - SnapURL: GetSnapshotPath(stream.StreamID, stream.SessionID), + SnapURL: GetSnapshotPath(stream.StreamID) + "?preview_snapshot=1&t=" + stream.SessionID, SourceAudioCodecName: "", SourceAudioSampleRate: 0, SourceVideoCodecName: "", diff --git a/api/snapshot.go b/api/snapshot.go index cee0a7b..f9e1413 100644 --- a/api/snapshot.go +++ b/api/snapshot.go @@ -3,8 +3,10 @@ package api import ( + "bytes" "fmt" "github.com/csnewman/ffmpeg-go" + "github.com/disintegration/imaging" "os" "unsafe" ) @@ -111,9 +113,41 @@ func saveFrameAsJPEG(frame *ffmpeg.AVFrame, filename string) error { return fmt.Errorf("接收包失败 %v", err) } - // 写入文件 - if _, err := file.Write(unsafe.Slice((*byte)(pkt.Data()), pkt.Size())); err != nil { - return fmt.Errorf("写入文件失败 %v", err) + // 获取编码后的 JPEG 原始字节数据 + jpegBytes := unsafe.Slice((*byte)(pkt.Data()), pkt.Size()) + + // 判断是否需要缩放 + targetW, targetH := 960, 540 + if (frame.Width() > frame.Height()) && frame.Width() > targetW || frame.Height() > targetH { + // [路径 A: 需要缩放] + + // 1. 将 JPEG 字节流解码为 Go Image 对象 + img, err := imaging.Decode(bytes.NewReader(jpegBytes)) + if err != nil { + return fmt.Errorf("Go解码图片失败: %v", err) + } + + // 2. 使用 imaging 库进行缩放 (Fit 会保持比例,适应 960x540 的框) + // 使用 Lanczos 算法保证缩放后的清晰度 + dstImg := imaging.Fit(img, targetW, targetH, imaging.Lanczos) + + // 3. 保存缩放后的图片到文件 + if err := imaging.Save(dstImg, filename, imaging.JPEGQuality(80)); err != nil { + return fmt.Errorf("保存缩放图片失败: %v", err) + } + + } else { + // [路径 B: 不需要缩放] + // 直接将 FFmpeg 编码好的字节写入文件,效率最高 + file, err := os.Create(filename) + if err != nil { + return fmt.Errorf("创建文件失败 %v", err) + } + defer file.Close() + + if _, err := file.Write(jpegBytes); err != nil { + return fmt.Errorf("写入文件失败 %v", err) + } } return nil diff --git a/api/snapshot_async.go b/api/snapshot_async.go new file mode 100644 index 0000000..184f87e --- /dev/null +++ b/api/snapshot_async.go @@ -0,0 +1,63 @@ +package api + +import ( + "container/list" + "context" + "sync" + "time" +) + +const ( + SnapshotQueueMaxSize = 128 +) + +var ( + DefaultSnapshotManager = &SnapshotManager{ + snapshots: make(map[string][]func(), SnapshotQueueMaxSize*1.5), + keys: list.New(), + } +) + +type SnapshotManager struct { + lock sync.RWMutex + snapshots map[string][]func() + keys *list.List +} + +func (s *SnapshotManager) Get(id string, timeout time.Duration) bool { + s.lock.Lock() + if _, ok := s.snapshots[id]; ok { + s.lock.Unlock() + return true + } + + background, cancel := context.WithCancel(context.Background()) + s.snapshots[id] = append(s.snapshots[id], cancel) + // 超时未获取到截图, 则认为不存在 + after := time.After(timeout) + + s.lock.Unlock() + select { + case <-after: + return false + case <-background.Done(): + return true + } +} + +func (s *SnapshotManager) Put(id string, ok bool) { + s.lock.Lock() + defer s.lock.Unlock() + + // 通知获取截图的goroutine, 截图已准备好 + for _, cancel := range s.snapshots[id] { + cancel() + } + + // 超出最大队列长度, 则删除最早的截图 + for s.keys.Len() > SnapshotQueueMaxSize { + oldElement := s.keys.Front() + s.keys.Remove(oldElement) + delete(s.snapshots, oldElement.Value.(string)) + } +} diff --git a/go.mod b/go.mod index 58f3b3b..b5d08dc 100644 --- a/go.mod +++ b/go.mod @@ -43,6 +43,7 @@ require ( github.com/yusufpapurcu/wmi v1.2.4 // indirect go.uber.org/multierr v1.10.0 // indirect golang.org/x/crypto v0.24.0 // indirect + golang.org/x/image v0.0.0-20191009234506-e7c1f5e7dbb8 // indirect golang.org/x/sys v0.21.0 // indirect golang.org/x/term v0.21.0 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect @@ -54,6 +55,7 @@ require ( require ( github.com/csnewman/ffmpeg-go v0.6.0 + github.com/disintegration/imaging v1.6.2 github.com/glebarez/sqlite v1.11.0 github.com/go-co-op/gocron/v2 v2.16.6 github.com/gorilla/mux v1.8.1