feat: 支持开启和结束录制流

This commit is contained in:
ydajiang
2025-09-13 15:45:16 +08:00
parent 7150525c20
commit 69308c466b
4 changed files with 143 additions and 13 deletions

47
api.go
View File

@@ -99,6 +99,11 @@ func startApiServer(addr string) {
handler.ServeHTTP(w, r)
})
})
// 点播, 映射录制资源
// 放在最前面, 避免被后面的路由拦截
apiServer.router.PathPrefix("/record/").Handler(http.StripPrefix("/record/", http.FileServer(http.Dir(stream.AppConfig.Record.Dir))))
// {source}.flv和/{source}/{stream}.flv意味着, 推流id(路径)只能嵌套一层
apiServer.router.HandleFunc("/{source}.flv", filterSourceID(apiServer.onFlv, ".flv"))
apiServer.router.HandleFunc("/{source}/{stream}.flv", filterSourceID(apiServer.onFlv, ".flv"))
@@ -120,6 +125,8 @@ func startApiServer(addr string) {
apiServer.router.HandleFunc("/api/v1/sink/list", withJsonParams(apiServer.OnSinkList, &IDS{})) // 查询某个推流源下,所有的拉流端列表
apiServer.router.HandleFunc("/api/v1/sink/close", withJsonParams(apiServer.OnSinkClose, &IDS{})) // 关闭拉流端
apiServer.router.HandleFunc("/api/v1/sink/add", withJsonParams(apiServer.OnSinkAdd, &GBOffer{})) // 级联/广播/JT转GB
apiServer.router.HandleFunc("/api/v1/record/start", apiServer.OnRecordStart) // 开启录制
apiServer.router.HandleFunc("/api/v1/record/stop", apiServer.OnRecordStop) // 关闭录制
apiServer.router.HandleFunc("/api/v1/streams/statistics", nil) // 统计所有推拉流
@@ -560,6 +567,11 @@ func (api *ApiServer) OnStreamInfo(w http.ResponseWriter, r *http.Request) {
liveGBSUrls[streamName] = url
}
var recordStartTime string
if startTime := source.GetTransStreamPublisher().RecordStartTime(); !startTime.IsZero() {
recordStartTime = startTime.Format("2006-01-02 15:04:05")
}
statistics := source.GetBitrateStatistics()
response := struct {
AudioEnable bool `json:"AudioEnable"`
@@ -583,7 +595,7 @@ func (api *ApiServer) OnStreamInfo(w http.ResponseWriter, r *http.Request) {
RTPLostCount int `json:"RTPLostCount"`
RTPLostRate int `json:"RTPLostRate"`
RTSP string `json:"RTSP"`
RecordStartAt string `json:"RecordStartAt"`
RecordStartAt string `json:"RecordStartAt"` // 录制时间
RelaySize int `json:"RelaySize"`
SMSID string `json:"SMSID"`
SnapURL string `json:"SnapURL"`
@@ -621,7 +633,7 @@ func (api *ApiServer) OnStreamInfo(w http.ResponseWriter, r *http.Request) {
RTPLostCount: 0,
RTPLostRate: 0,
RTSP: liveGBSUrls["rtsp"],
RecordStartAt: "",
RecordStartAt: recordStartTime,
RelaySize: 0,
SMSID: "",
SnapURL: "",
@@ -650,3 +662,34 @@ func (api *ApiServer) OnStreamInfo(w http.ResponseWriter, r *http.Request) {
httpResponseJson(w, &response)
}
func (api *ApiServer) OnRecordStart(w http.ResponseWriter, req *http.Request) {
streamId := req.FormValue("streamid")
source := stream.SourceManager.Find(streamId)
if source == nil {
log.Sugar.Errorf("OnRecordStart stream not found streamid %s", streamId)
w.WriteHeader(http.StatusNotFound)
} else if url, ok := source.GetTransStreamPublisher().StartRecord(); !ok {
w.WriteHeader(http.StatusBadRequest)
} else {
// 返回拉流地址
httpResponseJson(w, &struct {
DownloadURL string `json:"DownloadURL"`
}{
DownloadURL: url,
})
}
}
func (api *ApiServer) OnRecordStop(w http.ResponseWriter, req *http.Request) {
streamId := req.FormValue("streamid")
source := stream.SourceManager.Find(streamId)
if source == nil {
log.Sugar.Errorf("OnRecordStop stream not found streamid %s", streamId)
w.WriteHeader(http.StatusNotFound)
} else if err := source.GetTransStreamPublisher().StopRecord(); err != nil {
w.WriteHeader(http.StatusBadRequest)
httpResponseJson(w, err.Error())
}
}

View File

@@ -11,6 +11,7 @@ import (
type FLVFileSink struct {
stream.BaseSink
file *os.File
path string
fail bool
}
@@ -47,22 +48,27 @@ func (f *FLVFileSink) Close() {
f.file.Close()
f.file = nil
}
if source := stream.SourceManager.Find(f.SourceID); source != nil {
stream.HookRecordEvent(source, f.path)
}
}
// NewFLVFileSink 创建FLV文件录制流Sink
// 保存path: dir/sourceId/yyyy-MM-dd/HH-mm-ss.flv
func NewFLVFileSink(sourceId string) (stream.Sink, string, error) {
now := time.Now().Format("2006-01-02/15-04-05")
path := filepath.Join(stream.AppConfig.Record.Dir, sourceId, now+".flv")
path := filepath.Join(sourceId, now+".flv")
dirPath := filepath.Join(stream.AppConfig.Record.Dir, path)
// 创建目录
dir := filepath.Dir(path)
dir := filepath.Dir(dirPath)
if err := os.MkdirAll(dir, 0666); err != nil {
return nil, "", err
}
// 创建flv文件
file, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0666)
file, err := os.OpenFile(dirPath, os.O_CREATE|os.O_RDWR, 0666)
if err != nil {
return nil, "", err
}

View File

@@ -351,3 +351,8 @@ func limitInt(min, max, value int) int {
return value
}
// GenerateRecordStreamPlayUrl 生成录制文件的播放url
func GenerateRecordStreamPlayUrl(recordFile string) string {
return fmt.Sprintf("http://%s:%d/record/%s", AppConfig.PublicIP, AppConfig.Http.Port, recordFile)
}

View File

@@ -1,6 +1,7 @@
package stream
import (
"bytes"
"fmt"
"github.com/lkmio/avformat"
"github.com/lkmio/avformat/collections"
@@ -9,6 +10,9 @@ import (
"github.com/lkmio/lkm/log"
"github.com/lkmio/lkm/transcode"
"github.com/lkmio/transport"
"path/filepath"
"runtime"
"strconv"
"sync"
"sync/atomic"
"time"
@@ -66,6 +70,16 @@ type TransStreamPublisher interface {
ExecuteSyncEvent(cb func())
SetSourceID(id string)
// StartRecord 开启录制
// 如果AppConfig已经开启了全局录制, 则无需手动开启, 返回false
StartRecord() (string, bool)
// StopRecord 停止录制
// 如果AppConfig已经开启了全局录制, 返回error
StopRecord() error
RecordStartTime() time.Time
}
type transStreamPublisher struct {
@@ -78,6 +92,7 @@ type transStreamPublisher struct {
recordSink Sink // 每个Source的录制流
recordFilePath string // 录制流文件路径
recordStartTime time.Time // 开始录制时间
hlsStream TransStream // HLS传输流
originTracks TrackManager // 推流的原始track
transcodeTracks map[utils.AVCodecID]*TranscodeTrack // 转码Track
@@ -99,7 +114,18 @@ func (t *transStreamPublisher) Post(event *StreamEvent) {
t.streamEvents.Post(event)
}
func getGoroutineID() uint64 {
b := make([]byte, 64)
b = b[:runtime.Stack(b, false)]
b = bytes.TrimPrefix(b, []byte("goroutine "))
b = b[:bytes.IndexByte(b, ' ')]
n, _ := strconv.ParseUint(string(b), 10, 64)
return n
}
func (t *transStreamPublisher) run() {
log.Sugar.Infof("transStreamPublisher run goroutine id: %d", getGoroutineID())
t.streamEvents = NewNonBlockingChannel[*StreamEvent](256)
t.mainContextEvents = make(chan func(), 256)
@@ -165,6 +191,19 @@ func (t *transStreamPublisher) ExecuteSyncEvent(cb func()) {
group.Wait()
}
func (t *transStreamPublisher) createRecordSink() bool {
sink, path, err := CreateRecordStream(t.source)
if err != nil {
log.Sugar.Errorf("创建录制sink失败 source: %s err: %s", t.source, err.Error())
return false
}
t.recordSink = sink
t.recordFilePath = path
t.recordStartTime = time.Now()
return true
}
func (t *transStreamPublisher) CreateDefaultOutStreams() {
if t.transStreams == nil {
t.transStreams = make(map[TransStreamID]TransStream, 10)
@@ -172,13 +211,7 @@ func (t *transStreamPublisher) CreateDefaultOutStreams() {
// 创建录制流
if AppConfig.Record.Enable {
sink, path, err := CreateRecordStream(t.source)
if err != nil {
log.Sugar.Errorf("创建录制sink失败 source: %s err: %s", t.source, err.Error())
} else {
t.recordSink = sink
t.recordFilePath = path
}
t.createRecordSink()
}
// 创建HLS输出流
@@ -628,12 +661,12 @@ func (t *transStreamPublisher) clearSinkStreaming(sink Sink) {
delete(transStreamSinks, sink.GetID())
t.lastStreamEndTime = time.Now()
sink.StopStreaming(t.transStreams[sink.GetTransStreamID()])
delete(t.sinks, sink.GetID())
}
func (t *transStreamPublisher) doRemoveSink(sink Sink) bool {
if _, ok := t.sinks[sink.GetID()]; ok {
t.clearSinkStreaming(sink)
delete(t.sinks, sink.GetID())
t.sinkCount--
log.Sugar.Infof("sink count: %d source: %s", t.sinkCount, t.source)
@@ -873,6 +906,49 @@ func (t *transStreamPublisher) SetSourceID(id string) {
t.source = id
}
func (t *transStreamPublisher) StartRecord() (string, bool) {
if AppConfig.Record.Enable || t.recordSink != nil {
return "", false
}
var ok bool
t.ExecuteSyncEvent(func() {
if t.recordSink == nil && t.createRecordSink() {
ok = t.doAddSink(t.recordSink, false)
}
})
var url string
if ok {
// 去掉反斜杠
url = GenerateRecordStreamPlayUrl(filepath.ToSlash(t.recordFilePath))
}
return url, ok
}
func (t *transStreamPublisher) StopRecord() error {
if AppConfig.Record.Enable {
return fmt.Errorf("录制常开")
}
t.ExecuteSyncEvent(func() {
if t.recordSink != nil {
t.clearSinkStreaming(t.recordSink)
t.recordSink.Close()
t.recordSink = nil
t.recordFilePath = ""
t.recordStartTime = time.Time{}
}
})
return nil
}
func (t *transStreamPublisher) RecordStartTime() time.Time {
return t.recordStartTime
}
func NewTransStreamPublisher(source string) TransStreamPublisher {
return &transStreamPublisher{
transStreams: make(map[TransStreamID]TransStream),