feat: add flv recorder

This commit is contained in:
langhuihui
2025-01-07 13:14:30 +08:00
parent 0b1bd41192
commit 142d9c26a6
20 changed files with 3185 additions and 1287 deletions

View File

@@ -1,6 +1,7 @@
package flv
import (
"errors"
"io"
"os"
"strconv"
@@ -18,7 +19,7 @@ import (
type (
RecordReader struct {
m7s.RecordFilePuller
reader *FlvReader
reader *util.BufReader
}
)
@@ -41,6 +42,7 @@ func (p *RecordReader) Run() (err error) {
pullStartTime := p.PullStartTime
publisher := pullJob.Publisher
allocator := util.NewScalableMemoryAllocator(1 << 10)
var tagHeader [11]byte
var ts, tsOffset int64
var realTime time.Time
defer allocator.Recycle()
@@ -66,32 +68,28 @@ func (p *RecordReader) Run() (err error) {
if err != nil {
continue
}
p.reader = NewFlvReader(p.File)
if err = p.reader.ReadHeader(); err != nil {
p.reader = util.NewBufReader(p.File)
var head util.Memory
head, err = p.reader.ReadBytes(9)
if err != nil {
return
}
var flvHead [3]byte
var version, flag byte
err = head.NewReader().ReadByteTo(&flvHead[0], &flvHead[1], &flvHead[2], &version, &flag)
if err != nil {
return
}
if flvHead != [3]byte{'F', 'L', 'V'} {
return errors.New("not flv file")
}
startTimestamp := int64(0)
if i == 0 {
startTimestamp := pullStartTime.Sub(stream.StartTime).Milliseconds()
startTimestamp = pullStartTime.Sub(stream.StartTime).Milliseconds()
if startTimestamp < 0 {
startTimestamp = 0
}
var metaData rtmp.EcmaArray
if metaData, err = ReadMetaData(p.File); err != nil {
return
}
if keyframes, ok := metaData["keyframes"].(map[string]any); ok {
filepositions := keyframes["filepositions"].([]float64)
times := keyframes["times"].([]float64)
for i, t := range times {
if int64(t*1000) >= startTimestamp {
if _, err = p.File.Seek(int64(filepositions[i]), io.SeekStart); err != nil {
return
}
tsOffset = -int64(t * 1000)
break
}
}
}
}
for {
@@ -101,35 +99,83 @@ func (p *RecordReader) Run() (err error) {
if publisher.Paused != nil {
publisher.Paused.Await()
}
var tag *Tag
if tag, err = p.reader.ReadTag(); err != nil {
if err == io.EOF {
err = nil
break
}
if _, err = p.reader.ReadBE(4); err != nil { // previous tag size
return
}
// Read tag header (11 bytes total)
if err = p.reader.ReadNto(11, tagHeader[:]); err != nil {
return
}
ts = int64(tag.Timestamp) + tsOffset
realTime = stream.StartTime.Add(time.Duration(tag.Timestamp) * time.Millisecond)
t := tagHeader[0] // tag type (1 byte)
dataSize := int(tagHeader[1])<<16 | int(tagHeader[2])<<8 | int(tagHeader[3]) // data size (3 bytes)
timestamp := uint32(tagHeader[4])<<16 | uint32(tagHeader[5])<<8 | uint32(tagHeader[6]) // timestamp (3 bytes)
timestamp |= uint32(tagHeader[7]) << 24 // timestamp extended (1 byte)
// stream id is tagHeader[8:11] (3 bytes), always 0
var frame rtmp.RTMPData
frame.SetAllocator(allocator)
if err = p.reader.ReadNto(dataSize, frame.NextN(dataSize)); err != nil {
return
}
ts = int64(timestamp) + tsOffset
realTime = stream.StartTime.Add(time.Duration(timestamp) * time.Millisecond)
if p.MaxTS > 0 && ts > p.MaxTS {
return
}
switch tag.Type {
frame.Timestamp = uint32(ts)
switch t {
case FLV_TAG_TYPE_AUDIO:
var audioFrame rtmp.RTMPAudio
audioFrame.SetAllocator(allocator)
audioFrame.Timestamp = uint32(ts)
audioFrame.AddRecycleBytes(tag.Data[:len(tag.Data)-4]) // remove previous tag size
err = publisher.WriteAudio(&audioFrame)
if publisher.PubAudio {
err = publisher.WriteAudio(frame.WrapAudio())
}
case FLV_TAG_TYPE_VIDEO:
var videoFrame rtmp.RTMPVideo
videoFrame.SetAllocator(allocator)
videoFrame.Timestamp = uint32(ts)
videoFrame.AddRecycleBytes(tag.Data[:len(tag.Data)-4]) // remove previous tag size
err = publisher.WriteVideo(&videoFrame)
if publisher.PubVideo {
err = publisher.WriteVideo(frame.WrapVideo())
}
case FLV_TAG_TYPE_SCRIPT:
r := frame.NewReader()
amf := &rtmp.AMF{
Buffer: util.Buffer(r.ToBytes()),
}
frame.Recycle()
var obj any
if obj, err = amf.Unmarshal(); err != nil {
return
}
name := obj
if obj, err = amf.Unmarshal(); err != nil {
return
}
if i == 0 {
if metaData, ok := obj.(rtmp.EcmaArray); ok {
if keyframes, ok := metaData["keyframes"].(map[string]any); ok {
filepositions := keyframes["filepositions"].([]any)
times := keyframes["times"].([]any)
currentPos, err := p.File.Seek(0, io.SeekCurrent)
for i, t := range times {
if int64(t.(float64)*1000) >= startTimestamp {
if err != nil {
return err
}
if _, err = p.File.Seek(int64(filepositions[i].(float64)), io.SeekStart); err != nil {
return err
}
tsOffset = -int64(t.(float64) * 1000)
break
}
}
if _, err = p.File.Seek(currentPos, io.SeekStart); err != nil {
return err
}
}
}
} else {
publisher.Info("script", name, obj)
}
}
if err != nil {
return