feature: add feature that hls record and vod hls record file

This commit is contained in:
pg
2025-01-22 15:41:20 +08:00
committed by pggiroro
parent 9a0d22fa4e
commit b33a72caab
4 changed files with 479 additions and 47 deletions

View File

@@ -4,17 +4,17 @@ import (
"embed"
"fmt"
"net/http"
"os"
"path"
"strings"
"time"
"m7s.live/v5/pkg/util"
"m7s.live/v5"
"m7s.live/v5/pkg/util"
hls "m7s.live/v5/plugin/hls/pkg"
)
var _ = m7s.InstallPlugin[HLSPlugin](hls.NewPuller, hls.NewTransform)
var _ = m7s.InstallPlugin[HLSPlugin](hls.NewTransform, hls.NewRecorder)
//go:embed hls.js
var hls_js embed.FS
@@ -59,8 +59,45 @@ func (config *HLSPlugin) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if strings.HasSuffix(r.URL.Path, ".m3u8") {
w.Header().Add("Content-Type", "application/vnd.apple.mpegurl")
streamPath := strings.TrimSuffix(fileName, ".m3u8")
// If memory lookup failed or returned empty, try database
startTime, endTime, _ := util.TimeRangeQueryParse(r.URL.Query())
if !startTime.IsZero() {
if config.DB != nil {
var records []m7s.RecordStream
query := `stream_path = ? AND type = 'hls' AND start_time IS NOT NULL AND end_time IS NOT NULL AND ? <= end_time AND ? >= start_time`
config.DB.Where(query, streamPath, startTime, endTime).Find(&records)
if len(records) > 0 {
playlist := hls.Playlist{
Version: 3,
Sequence: 0,
Targetduration: 90,
}
var plBuffer util.Buffer
playlist.Writer = &plBuffer
playlist.Init()
for _, record := range records {
duration := record.EndTime.Sub(record.StartTime).Seconds()
playlist.WriteInf(hls.PlaylistInf{
Duration: duration,
Title: path.Base(record.FilePath),
FilePath: record.FilePath,
})
}
plBuffer.WriteString("#EXT-X-ENDLIST\n")
w.Write(plBuffer)
return
}
}
}
if v, ok := hls.MemoryM3u8.Load(streamPath); ok && v.(string) != "" {
w.Write([]byte(v.(string)))
return
}
for {
if v, ok := hls.MemoryM3u8.Load(streamPath); ok {
if v, ok := hls.MemoryM3u8.Load(streamPath); ok && v.(string) != "" {
w.Write([]byte(v.(string)))
return
}
@@ -72,49 +109,29 @@ func (config *HLSPlugin) ServeHTTP(w http.ResponseWriter, r *http.Request) {
break
}
}
// w.Write([]byte(fmt.Sprintf(`#EXTM3U
// #EXT-X-VERSION:3
// #EXT-X-MEDIA-SEQUENCE:%d
// #EXT-X-TARGETDURATION:%d
// #EXT-X-DISCONTINUITY-SEQUENCE:%d
// #EXT-X-DISCONTINUITY
// #EXTINF:%.3f,
// default.ts`, defaultSeq, int(math.Ceil(config.DefaultTSDuration.Seconds())), defaultSeq, config.DefaultTSDuration.Seconds())))
} else if strings.HasSuffix(r.URL.Path, ".ts") {
w.Header().Add("Content-Type", "video/mp2t") //video/mp2t
parts := strings.Split(fileName, "/")
filePath := strings.Join(parts[1:], "/")
data, err := os.ReadFile(filePath)
if err == nil {
w.Write(data)
return
}
streamPath := path.Dir(fileName)
for {
tsData, ok := hls.MemoryTs.Load(streamPath)
if !ok {
tsData, ok = hls.MemoryTs.Load(path.Dir(streamPath))
if !ok {
if waitTimeout > 0 && time.Since(waitStart) < waitTimeout {
time.Sleep(time.Second)
continue
} else {
// w.Write(defaultTS)
return
}
}
}
for {
if tsData, ok := tsData.(hls.TsCacher).GetTs(fileName); ok {
switch v := tsData.(type) {
case *hls.TsInMemory:
v.WriteTo(w)
case util.Buffer:
w.Write(v)
}
return
} else {
if waitTimeout > 0 && time.Since(waitStart) < waitTimeout {
time.Sleep(time.Second)
continue
} else {
// w.Write(defaultTS)
return
}
tsData, ok := hls.MemoryTs.Load(streamPath)
if !ok {
tsData, ok = hls.MemoryTs.Load(path.Dir(streamPath))
}
if ok {
if tsData, ok := tsData.(hls.TsCacher).GetTs(fileName); ok {
switch v := tsData.(type) {
case *hls.TsInMemory:
v.WriteTo(w)
case util.Buffer:
w.Write(v)
}
return
}
}
} else {

View File

@@ -31,10 +31,19 @@ type Puller struct {
memoryTs sync.Map
}
func NewPuller(_ config.Pull) m7s.IPuller {
p := &Puller{}
p.SetDescription(task.OwnerTypeKey, "HLSPuller")
return p
func NewPuller(conf config.Pull) m7s.IPuller {
if strings.HasPrefix(conf.URL, "http") || strings.HasSuffix(conf.URL, ".m3u8") {
p := &Puller{}
p.SetDescription(task.OwnerTypeKey, "HLSPuller")
return p
}
if conf.Args.Get(util.StartKey) != "" {
p := &RecordReader{}
p.Type = "hls"
p.SetDescription(task.OwnerTypeKey, "HLSRecordReader")
return p
}
return nil
}
func (p *Puller) GetPullJob() *m7s.PullJob {

219
plugin/hls/pkg/record.go Normal file
View File

@@ -0,0 +1,219 @@
package hls
import (
"fmt"
"os"
"path/filepath"
"time"
"gorm.io/gorm"
"m7s.live/v5"
"m7s.live/v5/pkg"
"m7s.live/v5/pkg/codec"
"m7s.live/v5/pkg/task"
"m7s.live/v5/pkg/util"
mpegts "m7s.live/v5/plugin/hls/pkg/ts"
)
func NewRecorder() m7s.IRecorder {
return &Recorder{}
}
type Recorder struct {
m7s.DefaultRecorder
stream m7s.RecordStream
ts *TsInFile
pesAudio *mpegts.MpegtsPESFrame
pesVideo *mpegts.MpegtsPESFrame
segmentCount uint32
lastTs time.Duration
firstSegment bool
}
var CustomFileName = func(job *m7s.RecordJob) string {
if job.Fragment == 0 || job.Append {
return fmt.Sprintf("%s/%s.ts", job.FilePath, time.Now().Format("20060102150405"))
}
return filepath.Join(job.FilePath, time.Now().Format("20060102150405")+".ts")
}
func (r *Recorder) createStream(start time.Time) (err error) {
recordJob := &r.RecordJob
sub := recordJob.Subscriber
r.stream = m7s.RecordStream{
StartTime: start,
StreamPath: sub.StreamPath,
FilePath: CustomFileName(&r.RecordJob),
EventId: recordJob.EventId,
EventDesc: recordJob.EventDesc,
EventName: recordJob.EventName,
EventLevel: recordJob.EventLevel,
BeforeDuration: recordJob.BeforeDuration,
AfterDuration: recordJob.AfterDuration,
Mode: recordJob.Mode,
Type: "hls",
}
dir := filepath.Dir(r.stream.FilePath)
dir = filepath.Clean(dir)
if err = os.MkdirAll(dir, 0755); err != nil {
r.Error("create directory failed", "err", err, "dir", dir)
return
}
if sub.Publisher.HasAudioTrack() {
r.stream.AudioCodec = sub.Publisher.AudioTrack.ICodecCtx.FourCC().String()
}
if sub.Publisher.HasVideoTrack() {
r.stream.VideoCodec = sub.Publisher.VideoTrack.ICodecCtx.FourCC().String()
}
if recordJob.Plugin.DB != nil {
recordJob.Plugin.DB.Save(&r.stream)
}
return
}
type eventRecordCheck struct {
task.Task
DB *gorm.DB
streamPath string
}
func (t *eventRecordCheck) Run() (err error) {
var eventRecordStreams []m7s.RecordStream
queryRecord := m7s.RecordStream{
EventLevel: m7s.EventLevelHigh,
Mode: m7s.RecordModeEvent,
Type: "hls",
}
t.DB.Where(&queryRecord).Find(&eventRecordStreams, "stream_path=?", t.streamPath) //搜索事件录像,且为重要事件(无法自动删除)
if len(eventRecordStreams) > 0 {
for _, recordStream := range eventRecordStreams {
var unimportantEventRecordStreams []m7s.RecordStream
queryRecord.EventLevel = m7s.EventLevelLow
query := `(start_time BETWEEN ? AND ?)
OR (end_time BETWEEN ? AND ?)
OR (? BETWEEN start_time AND end_time)
OR (? BETWEEN start_time AND end_time) AND stream_path=? `
t.DB.Where(&queryRecord).Where(query, recordStream.StartTime, recordStream.EndTime, recordStream.StartTime, recordStream.EndTime, recordStream.StartTime, recordStream.EndTime, recordStream.StreamPath).Find(&unimportantEventRecordStreams)
if len(unimportantEventRecordStreams) > 0 {
for _, unimportantEventRecordStream := range unimportantEventRecordStreams {
unimportantEventRecordStream.EventLevel = m7s.EventLevelHigh
t.DB.Save(&unimportantEventRecordStream)
}
}
}
}
return
}
func (r *Recorder) writeTailer(end time.Time) {
if r.stream.EndTime.After(r.stream.StartTime) {
return
}
r.stream.EndTime = end
if r.RecordJob.Plugin.DB != nil {
r.RecordJob.Plugin.DB.Save(&r.stream)
}
}
func (r *Recorder) Dispose() {
// 如果当前有未完成的片段,先保存
if r.ts != nil {
r.ts.Close()
}
r.writeTailer(time.Now())
}
func (r *Recorder) createNewTs() {
var oldPMT util.Buffer
if r.ts != nil {
oldPMT = r.ts.PMT
r.ts.Close()
}
var err error
r.ts, err = NewTsInFile(r.stream.FilePath)
if err != nil {
r.Error("create ts file failed", "err", err, "path", r.stream.FilePath)
return
}
if oldPMT.Len() > 0 {
r.ts.PMT = oldPMT
}
}
func (r *Recorder) writeSegment(ts time.Duration) (err error) {
if dur := ts - r.lastTs; dur >= r.RecordJob.Fragment || r.lastTs == 0 {
if dur == ts && r.lastTs == 0 { //时间戳不对的情况首个默认为2s
dur = time.Duration(2) * time.Second
}
// 如果是第一个片段,跳过写入,只记录时间戳
if r.firstSegment {
r.lastTs = ts
r.firstSegment = false
return nil
}
// 结束当前片段的记录
r.writeTailer(time.Now())
// 创建新的数据库记录
err = r.createStream(time.Now())
if err != nil {
return
}
// 创建新的ts文件
r.createNewTs()
r.segmentCount++
r.lastTs = ts
}
return
}
func (r *Recorder) Run() (err error) {
ctx := &r.RecordJob
suber := ctx.Subscriber
startTime := time.Now()
if ctx.BeforeDuration > 0 {
startTime = startTime.Add(-ctx.BeforeDuration)
}
// 创建第一个片段记录
if err = r.createStream(startTime); err != nil {
return
}
// 初始化HLS相关结构
r.createNewTs()
r.pesAudio = &mpegts.MpegtsPESFrame{
Pid: mpegts.PID_AUDIO,
}
r.pesVideo = &mpegts.MpegtsPESFrame{
Pid: mpegts.PID_VIDEO,
}
r.firstSegment = true
var audioCodec, videoCodec codec.FourCC
if suber.Publisher.HasAudioTrack() {
audioCodec = suber.Publisher.AudioTrack.FourCC()
}
if suber.Publisher.HasVideoTrack() {
videoCodec = suber.Publisher.VideoTrack.FourCC()
}
r.ts.WritePMTPacket(audioCodec, videoCodec)
return m7s.PlayBlock(suber, r.ProcessADTS, r.ProcessAnnexB)
}
func (r *Recorder) ProcessADTS(audio *pkg.ADTS) (err error) {
return r.ts.WriteAudioFrame(audio, r.pesAudio)
}
func (r *Recorder) ProcessAnnexB(video *pkg.AnnexB) (err error) {
if r.RecordJob.Subscriber.VideoReader.Value.IDR {
if err = r.writeSegment(video.GetTimestamp()); err != nil {
return
}
}
return r.ts.WriteVideoFrame(video, r.pesVideo)
}

View File

@@ -0,0 +1,187 @@
package hls
import (
"errors"
"io"
"net"
"os"
"path/filepath"
"slices"
"m7s.live/v5/pkg"
"m7s.live/v5/pkg/codec"
"m7s.live/v5/pkg/util"
mpegts "m7s.live/v5/plugin/hls/pkg/ts"
)
type TsInFile struct {
PMT util.Buffer
file *os.File
path string
closed bool
}
func NewTsInFile(path string) (*TsInFile, error) {
// 确保目录存在
dir := filepath.Dir(path)
if err := os.MkdirAll(dir, 0755); err != nil {
return nil, err
}
file, err := os.Create(path)
if err != nil {
return nil, err
}
ts := &TsInFile{
path: path,
file: file,
}
return ts, nil
}
func (ts *TsInFile) Close() error {
if ts.closed {
return nil
}
ts.closed = true
return ts.file.Close()
}
func (ts *TsInFile) WritePMTPacket(audio, video codec.FourCC) {
ts.PMT.Reset()
mpegts.WritePMTPacket(&ts.PMT, video, audio)
// 写入PAT和PMT
ts.file.Write(mpegts.DefaultPATPacket)
ts.file.Write(ts.PMT)
}
func (ts *TsInFile) WritePESPacket(frame *mpegts.MpegtsPESFrame, packet mpegts.MpegTsPESPacket) (err error) {
if packet.Header.PacketStartCodePrefix != 0x000001 {
err = errors.New("packetStartCodePrefix != 0x000001")
return
}
var pesHeadItem util.Buffer
pesHeadItem.Reset()
_, err = mpegts.WritePESHeader(&pesHeadItem, packet.Header)
if err != nil {
return
}
pesBuffers := append(net.Buffers{pesHeadItem}, packet.Buffers...)
pesPktLength := int64(util.SizeOfBuffers(pesBuffers))
for i := 0; pesPktLength > 0; i++ {
var tsBuffer util.Buffer
tsBuffer.Reset()
tsHeader := mpegts.MpegTsHeader{
SyncByte: 0x47,
TransportErrorIndicator: 0,
PayloadUnitStartIndicator: 0,
TransportPriority: 0,
Pid: frame.Pid,
TransportScramblingControl: 0,
AdaptionFieldControl: 1,
ContinuityCounter: frame.ContinuityCounter,
}
frame.ContinuityCounter++
frame.ContinuityCounter = frame.ContinuityCounter % 16
if i == 0 {
tsHeader.PayloadUnitStartIndicator = 1
if frame.IsKeyFrame {
tsHeader.AdaptionFieldControl = 0x03
tsHeader.AdaptationFieldLength = 7
tsHeader.PCRFlag = 1
tsHeader.RandomAccessIndicator = 1
tsHeader.ProgramClockReferenceBase = frame.ProgramClockReferenceBase
}
}
if pesPktLength < mpegts.TS_PACKET_SIZE-4 {
var tsStuffingLength uint8
tsHeader.AdaptionFieldControl = 0x03
tsHeader.AdaptationFieldLength = uint8(mpegts.TS_PACKET_SIZE - 4 - 1 - pesPktLength)
if tsHeader.AdaptationFieldLength >= 1 {
tsStuffingLength = tsHeader.AdaptationFieldLength - 1
}
tsHeaderLength, err := mpegts.WriteTsHeader(&tsBuffer, tsHeader)
if err != nil {
return err
}
if tsStuffingLength > 0 {
if _, err = tsBuffer.Write(mpegts.Stuffing[:tsStuffingLength]); err != nil {
return err
}
}
tsPayloadLength := mpegts.TS_PACKET_SIZE - tsHeaderLength - int(tsStuffingLength)
written, _ := io.CopyN(&tsBuffer, &pesBuffers, int64(tsPayloadLength))
pesPktLength -= written
} else {
tsHeaderLength, err := mpegts.WriteTsHeader(&tsBuffer, tsHeader)
if err != nil {
return err
}
tsPayloadLength := mpegts.TS_PACKET_SIZE - tsHeaderLength
written, _ := io.CopyN(&tsBuffer, &pesBuffers, int64(tsPayloadLength))
pesPktLength -= written
}
// 直接写入文件
if _, err = ts.file.Write(tsBuffer); err != nil {
return err
}
}
return nil
}
func (ts *TsInFile) WriteAudioFrame(frame *pkg.ADTS, pes *mpegts.MpegtsPESFrame) (err error) {
var packet mpegts.MpegTsPESPacket
packet.Header.PesPacketLength = uint16(frame.Size + 8)
packet.Buffers = slices.Clone(frame.Buffers)
packet.Header.Pts = uint64(frame.DTS)
packet.Header.PacketStartCodePrefix = 0x000001
packet.Header.ConstTen = 0x80
packet.Header.StreamID = mpegts.STREAM_ID_AUDIO
pes.ProgramClockReferenceBase = packet.Header.Pts
packet.Header.PtsDtsFlags = 0x80
packet.Header.PesHeaderDataLength = 5
return ts.WritePESPacket(pes, packet)
}
func (ts *TsInFile) WriteVideoFrame(frame *pkg.AnnexB, pes *mpegts.MpegtsPESFrame) (err error) {
var buffer net.Buffers
if frame.Hevc {
buffer = append(buffer, codec.AudNalu)
} else {
buffer = append(buffer, codec.NALU_AUD_BYTE)
}
buffer = append(buffer, frame.Buffers...)
pktLength := util.SizeOfBuffers(buffer) + 10 + 3
if pktLength > 0xffff {
pktLength = 0
}
var packet mpegts.MpegTsPESPacket
packet.Header.PacketStartCodePrefix = 0x000001
packet.Header.ConstTen = 0x80
packet.Header.StreamID = mpegts.STREAM_ID_VIDEO
packet.Header.PesPacketLength = uint16(pktLength)
packet.Header.Pts = uint64(frame.PTS)
pes.ProgramClockReferenceBase = packet.Header.Pts
packet.Header.Dts = uint64(frame.DTS)
packet.Header.PtsDtsFlags = 0xC0
packet.Header.PesHeaderDataLength = 10
packet.Buffers = buffer
return ts.WritePESPacket(pes, packet)
}