From 69308c466b0479898304fd66fcfa3c4700867ef9 Mon Sep 17 00:00:00 2001 From: ydajiang Date: Sat, 13 Sep 2025 15:45:16 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=94=AF=E6=8C=81=E5=BC=80=E5=90=AF?= =?UTF-8?q?=E5=92=8C=E7=BB=93=E6=9D=9F=E5=BD=95=E5=88=B6=E6=B5=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- api.go | 47 ++++++++++++++++++- record/record_flv.go | 12 +++-- stream/config.go | 5 +++ stream/stream_publisher.go | 92 ++++++++++++++++++++++++++++++++++---- 4 files changed, 143 insertions(+), 13 deletions(-) diff --git a/api.go b/api.go index 75b5528..0c4a0d2 100644 --- a/api.go +++ b/api.go @@ -99,6 +99,11 @@ func startApiServer(addr string) { handler.ServeHTTP(w, r) }) }) + + // 点播, 映射录制资源 + // 放在最前面, 避免被后面的路由拦截 + apiServer.router.PathPrefix("/record/").Handler(http.StripPrefix("/record/", http.FileServer(http.Dir(stream.AppConfig.Record.Dir)))) + // {source}.flv和/{source}/{stream}.flv意味着, 推流id(路径)只能嵌套一层 apiServer.router.HandleFunc("/{source}.flv", filterSourceID(apiServer.onFlv, ".flv")) apiServer.router.HandleFunc("/{source}/{stream}.flv", filterSourceID(apiServer.onFlv, ".flv")) @@ -120,6 +125,8 @@ func startApiServer(addr string) { apiServer.router.HandleFunc("/api/v1/sink/list", withJsonParams(apiServer.OnSinkList, &IDS{})) // 查询某个推流源下,所有的拉流端列表 apiServer.router.HandleFunc("/api/v1/sink/close", withJsonParams(apiServer.OnSinkClose, &IDS{})) // 关闭拉流端 apiServer.router.HandleFunc("/api/v1/sink/add", withJsonParams(apiServer.OnSinkAdd, &GBOffer{})) // 级联/广播/JT转GB + apiServer.router.HandleFunc("/api/v1/record/start", apiServer.OnRecordStart) // 开启录制 + apiServer.router.HandleFunc("/api/v1/record/stop", apiServer.OnRecordStop) // 关闭录制 apiServer.router.HandleFunc("/api/v1/streams/statistics", nil) // 统计所有推拉流 @@ -560,6 +567,11 @@ func (api *ApiServer) OnStreamInfo(w http.ResponseWriter, r *http.Request) { liveGBSUrls[streamName] = url } + var recordStartTime string + if startTime := source.GetTransStreamPublisher().RecordStartTime(); !startTime.IsZero() { + recordStartTime = startTime.Format("2006-01-02 15:04:05") + } + statistics := source.GetBitrateStatistics() response := struct { AudioEnable bool `json:"AudioEnable"` @@ -583,7 +595,7 @@ func (api *ApiServer) OnStreamInfo(w http.ResponseWriter, r *http.Request) { RTPLostCount int `json:"RTPLostCount"` RTPLostRate int `json:"RTPLostRate"` RTSP string `json:"RTSP"` - RecordStartAt string `json:"RecordStartAt"` + RecordStartAt string `json:"RecordStartAt"` // 录制时间 RelaySize int `json:"RelaySize"` SMSID string `json:"SMSID"` SnapURL string `json:"SnapURL"` @@ -621,7 +633,7 @@ func (api *ApiServer) OnStreamInfo(w http.ResponseWriter, r *http.Request) { RTPLostCount: 0, RTPLostRate: 0, RTSP: liveGBSUrls["rtsp"], - RecordStartAt: "", + RecordStartAt: recordStartTime, RelaySize: 0, SMSID: "", SnapURL: "", @@ -650,3 +662,34 @@ func (api *ApiServer) OnStreamInfo(w http.ResponseWriter, r *http.Request) { httpResponseJson(w, &response) } + +func (api *ApiServer) OnRecordStart(w http.ResponseWriter, req *http.Request) { + streamId := req.FormValue("streamid") + source := stream.SourceManager.Find(streamId) + if source == nil { + log.Sugar.Errorf("OnRecordStart stream not found streamid %s", streamId) + w.WriteHeader(http.StatusNotFound) + } else if url, ok := source.GetTransStreamPublisher().StartRecord(); !ok { + w.WriteHeader(http.StatusBadRequest) + } else { + // 返回拉流地址 + httpResponseJson(w, &struct { + DownloadURL string `json:"DownloadURL"` + }{ + DownloadURL: url, + }) + } + +} + +func (api *ApiServer) OnRecordStop(w http.ResponseWriter, req *http.Request) { + streamId := req.FormValue("streamid") + source := stream.SourceManager.Find(streamId) + if source == nil { + log.Sugar.Errorf("OnRecordStop stream not found streamid %s", streamId) + w.WriteHeader(http.StatusNotFound) + } else if err := source.GetTransStreamPublisher().StopRecord(); err != nil { + w.WriteHeader(http.StatusBadRequest) + httpResponseJson(w, err.Error()) + } +} diff --git a/record/record_flv.go b/record/record_flv.go index e9ee149..779ff2d 100644 --- a/record/record_flv.go +++ b/record/record_flv.go @@ -11,6 +11,7 @@ import ( type FLVFileSink struct { stream.BaseSink file *os.File + path string fail bool } @@ -47,22 +48,27 @@ func (f *FLVFileSink) Close() { f.file.Close() f.file = nil } + + if source := stream.SourceManager.Find(f.SourceID); source != nil { + stream.HookRecordEvent(source, f.path) + } } // NewFLVFileSink 创建FLV文件录制流Sink // 保存path: dir/sourceId/yyyy-MM-dd/HH-mm-ss.flv func NewFLVFileSink(sourceId string) (stream.Sink, string, error) { now := time.Now().Format("2006-01-02/15-04-05") - path := filepath.Join(stream.AppConfig.Record.Dir, sourceId, now+".flv") + path := filepath.Join(sourceId, now+".flv") + dirPath := filepath.Join(stream.AppConfig.Record.Dir, path) // 创建目录 - dir := filepath.Dir(path) + dir := filepath.Dir(dirPath) if err := os.MkdirAll(dir, 0666); err != nil { return nil, "", err } // 创建flv文件 - file, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0666) + file, err := os.OpenFile(dirPath, os.O_CREATE|os.O_RDWR, 0666) if err != nil { return nil, "", err } diff --git a/stream/config.go b/stream/config.go index 698c835..3b798ec 100644 --- a/stream/config.go +++ b/stream/config.go @@ -351,3 +351,8 @@ func limitInt(min, max, value int) int { return value } + +// GenerateRecordStreamPlayUrl 生成录制文件的播放url +func GenerateRecordStreamPlayUrl(recordFile string) string { + return fmt.Sprintf("http://%s:%d/record/%s", AppConfig.PublicIP, AppConfig.Http.Port, recordFile) +} diff --git a/stream/stream_publisher.go b/stream/stream_publisher.go index 93a8cf5..b326109 100644 --- a/stream/stream_publisher.go +++ b/stream/stream_publisher.go @@ -1,6 +1,7 @@ package stream import ( + "bytes" "fmt" "github.com/lkmio/avformat" "github.com/lkmio/avformat/collections" @@ -9,6 +10,9 @@ import ( "github.com/lkmio/lkm/log" "github.com/lkmio/lkm/transcode" "github.com/lkmio/transport" + "path/filepath" + "runtime" + "strconv" "sync" "sync/atomic" "time" @@ -66,6 +70,16 @@ type TransStreamPublisher interface { ExecuteSyncEvent(cb func()) SetSourceID(id string) + + // StartRecord 开启录制 + // 如果AppConfig已经开启了全局录制, 则无需手动开启, 返回false + StartRecord() (string, bool) + + // StopRecord 停止录制 + // 如果AppConfig已经开启了全局录制, 返回error + StopRecord() error + + RecordStartTime() time.Time } type transStreamPublisher struct { @@ -78,6 +92,7 @@ type transStreamPublisher struct { recordSink Sink // 每个Source的录制流 recordFilePath string // 录制流文件路径 + recordStartTime time.Time // 开始录制时间 hlsStream TransStream // HLS传输流 originTracks TrackManager // 推流的原始track transcodeTracks map[utils.AVCodecID]*TranscodeTrack // 转码Track @@ -99,7 +114,18 @@ func (t *transStreamPublisher) Post(event *StreamEvent) { t.streamEvents.Post(event) } +func getGoroutineID() uint64 { + b := make([]byte, 64) + b = b[:runtime.Stack(b, false)] + b = bytes.TrimPrefix(b, []byte("goroutine ")) + b = b[:bytes.IndexByte(b, ' ')] + n, _ := strconv.ParseUint(string(b), 10, 64) + return n +} + func (t *transStreamPublisher) run() { + log.Sugar.Infof("transStreamPublisher run goroutine id: %d", getGoroutineID()) + t.streamEvents = NewNonBlockingChannel[*StreamEvent](256) t.mainContextEvents = make(chan func(), 256) @@ -165,6 +191,19 @@ func (t *transStreamPublisher) ExecuteSyncEvent(cb func()) { group.Wait() } +func (t *transStreamPublisher) createRecordSink() bool { + sink, path, err := CreateRecordStream(t.source) + if err != nil { + log.Sugar.Errorf("创建录制sink失败 source: %s err: %s", t.source, err.Error()) + return false + } + + t.recordSink = sink + t.recordFilePath = path + t.recordStartTime = time.Now() + return true +} + func (t *transStreamPublisher) CreateDefaultOutStreams() { if t.transStreams == nil { t.transStreams = make(map[TransStreamID]TransStream, 10) @@ -172,13 +211,7 @@ func (t *transStreamPublisher) CreateDefaultOutStreams() { // 创建录制流 if AppConfig.Record.Enable { - sink, path, err := CreateRecordStream(t.source) - if err != nil { - log.Sugar.Errorf("创建录制sink失败 source: %s err: %s", t.source, err.Error()) - } else { - t.recordSink = sink - t.recordFilePath = path - } + t.createRecordSink() } // 创建HLS输出流 @@ -628,12 +661,12 @@ func (t *transStreamPublisher) clearSinkStreaming(sink Sink) { delete(transStreamSinks, sink.GetID()) t.lastStreamEndTime = time.Now() sink.StopStreaming(t.transStreams[sink.GetTransStreamID()]) + delete(t.sinks, sink.GetID()) } func (t *transStreamPublisher) doRemoveSink(sink Sink) bool { if _, ok := t.sinks[sink.GetID()]; ok { t.clearSinkStreaming(sink) - delete(t.sinks, sink.GetID()) t.sinkCount-- log.Sugar.Infof("sink count: %d source: %s", t.sinkCount, t.source) @@ -873,6 +906,49 @@ func (t *transStreamPublisher) SetSourceID(id string) { t.source = id } +func (t *transStreamPublisher) StartRecord() (string, bool) { + if AppConfig.Record.Enable || t.recordSink != nil { + return "", false + } + + var ok bool + t.ExecuteSyncEvent(func() { + if t.recordSink == nil && t.createRecordSink() { + ok = t.doAddSink(t.recordSink, false) + } + }) + + var url string + if ok { + // 去掉反斜杠 + url = GenerateRecordStreamPlayUrl(filepath.ToSlash(t.recordFilePath)) + } + + return url, ok +} + +func (t *transStreamPublisher) StopRecord() error { + if AppConfig.Record.Enable { + return fmt.Errorf("录制常开") + } + + t.ExecuteSyncEvent(func() { + if t.recordSink != nil { + t.clearSinkStreaming(t.recordSink) + t.recordSink.Close() + t.recordSink = nil + t.recordFilePath = "" + t.recordStartTime = time.Time{} + } + }) + + return nil +} + +func (t *transStreamPublisher) RecordStartTime() time.Time { + return t.recordStartTime +} + func NewTransStreamPublisher(source string) TransStreamPublisher { return &transStreamPublisher{ transStreams: make(map[TransStreamID]TransStream),