Split video with interval(Closes #2) (#12)

This commit is contained in:
Han Gyoung-Su
2024-09-24 00:09:57 +09:00
committed by GitHub
parent c2301694db
commit c1c90c8010
9 changed files with 406 additions and 234 deletions

View File

@@ -90,6 +90,7 @@ func main() {
if conf.MP4.Record { if conf.MP4.Record {
mp4 := mp4.NewMP4(mp4.MP4Args{ mp4 := mp4.NewMP4(mp4.MP4Args{
Hub: hub, Hub: hub,
SplitIntervalMS: 3000,
}) })
err = mp4.Start(ctx, source) err = mp4.Start(ctx, source)
if err != nil { if err != nil {
@@ -99,6 +100,8 @@ func main() {
if conf.EBML.Record { if conf.EBML.Record {
webmStarter := webm.NewWEBM(webm.WebMArgs{ webmStarter := webm.NewWEBM(webm.WebMArgs{
Hub: hub, Hub: hub,
SplitIntervalMS: 6000,
StreamID: source.StreamID(),
}) })
err = webmStarter.Start(ctx, source) err = webmStarter.Start(ctx, source)
if err != nil { if err != nil {

View File

@@ -17,7 +17,7 @@ type H264Video struct {
Data []byte Data []byte
SPS []byte SPS []byte
PPS []byte PPS []byte
SliceType SliceType SliceTypes []SliceType
CodecData []byte CodecData []byte
} }

View File

@@ -5,14 +5,12 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"io"
"liveflow/media/streamer/egress/record" "liveflow/media/streamer/egress/record"
"liveflow/media/streamer/processes" "liveflow/media/streamer/processes"
"math/rand"
"os" "os"
"time"
astiav "github.com/asticode/go-astiav" astiav "github.com/asticode/go-astiav"
"github.com/deepch/vdk/codec/aacparser" "github.com/deepch/vdk/codec/aacparser"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
gomp4 "github.com/yapingcat/gomedia/go-mp4" gomp4 "github.com/yapingcat/gomedia/go-mp4"
@@ -31,56 +29,6 @@ const (
audioSampleRate = 48000 audioSampleRate = 48000
) )
type cacheWriterSeeker struct {
buf []byte
offset int
}
func newCacheWriterSeeker(capacity int) *cacheWriterSeeker {
return &cacheWriterSeeker{
buf: make([]byte, 0, capacity),
offset: 0,
}
}
func (ws *cacheWriterSeeker) Write(p []byte) (n int, err error) {
if cap(ws.buf)-ws.offset >= len(p) {
if len(ws.buf) < ws.offset+len(p) {
ws.buf = ws.buf[:ws.offset+len(p)]
}
copy(ws.buf[ws.offset:], p)
ws.offset += len(p)
return len(p), nil
}
tmp := make([]byte, len(ws.buf), cap(ws.buf)+len(p)*2)
copy(tmp, ws.buf)
if len(ws.buf) < ws.offset+len(p) {
tmp = tmp[:ws.offset+len(p)]
}
copy(tmp[ws.offset:], p)
ws.buf = tmp
ws.offset += len(p)
return len(p), nil
}
func (ws *cacheWriterSeeker) Seek(offset int64, whence int) (int64, error) {
if whence == io.SeekCurrent {
if ws.offset+int(offset) > len(ws.buf) {
return -1, errors.New(fmt.Sprint("SeekCurrent out of range", len(ws.buf), offset, ws.offset))
}
ws.offset += int(offset)
return int64(ws.offset), nil
} else if whence == io.SeekStart {
if offset > int64(len(ws.buf)) {
return -1, errors.New(fmt.Sprint("SeekStart out of range", len(ws.buf), offset, ws.offset))
}
ws.offset = int(offset)
return offset, nil
} else {
return 0, errors.New("unsupport SeekEnd")
}
}
type MP4 struct { type MP4 struct {
hub *hub.Hub hub *hub.Hub
muxer *gomp4.Movmuxer muxer *gomp4.Movmuxer
@@ -91,15 +39,24 @@ type MP4 struct {
audioIndex uint32 audioIndex uint32
mpeg4AudioConfigBytes []byte mpeg4AudioConfigBytes []byte
mpeg4AudioConfig *aacparser.MPEG4AudioConfig mpeg4AudioConfig *aacparser.MPEG4AudioConfig
streamID string
// New fields for splitting
splitIntervalMS int64
lastSplitTime int64
fileIndex int
splitPending bool // Indicates if a split is pending
} }
type MP4Args struct { type MP4Args struct {
Hub *hub.Hub Hub *hub.Hub
SplitIntervalMS int64
} }
func NewMP4(args MP4Args) *MP4 { func NewMP4(args MP4Args) *MP4 {
return &MP4{ return &MP4{
hub: args.Hub, hub: args.Hub,
splitIntervalMS: args.SplitIntervalMS,
} }
} }
@@ -110,6 +67,7 @@ func (m *MP4) Start(ctx context.Context, source hub.Source) error {
if !hub.HasCodecType(source.MediaSpecs(), hub.CodecTypeH264) { if !hub.HasCodecType(source.MediaSpecs(), hub.CodecTypeH264) {
return ErrUnsupportedCodec return ErrUnsupportedCodec
} }
m.streamID = source.StreamID()
ctx = log.WithFields(ctx, logrus.Fields{ ctx = log.WithFields(ctx, logrus.Fields{
fields.StreamID: source.StreamID(), fields.StreamID: source.StreamID(),
fields.SourceName: source.Name(), fields.SourceName: source.Name(),
@@ -118,26 +76,26 @@ func (m *MP4) Start(ctx context.Context, source hub.Source) error {
sub := m.hub.Subscribe(source.StreamID()) sub := m.hub.Subscribe(source.StreamID())
go func() { go func() {
var err error var err error
mp4File, err := record.CreateFileInDir(fmt.Sprintf("videos/%d.mp4", rand.Int()))
// Initialize the splitting logic
m.fileIndex = 0
err = m.createNewFile(ctx)
if err != nil { if err != nil {
fmt.Println(err) log.Error(ctx, err, "failed to create mp4 file")
return return
} }
defer func() { defer m.closeFile(ctx)
err := mp4File.Close()
if err != nil {
log.Error(ctx, err, "failed to close mp4 file")
}
}()
muxer, err := gomp4.CreateMp4Muxer(mp4File)
if err != nil {
fmt.Println(err)
return
}
m.muxer = muxer
var audioTranscodingProcess *processes.AudioTranscodingProcess var audioTranscodingProcess *processes.AudioTranscodingProcess
for data := range sub { for data := range sub {
// Check if we need to initiate a split
if data.H264Video != nil {
if !m.splitPending && data.H264Video.RawDTS()-m.lastSplitTime >= m.splitIntervalMS {
m.splitPending = true
}
}
if data.H264Video != nil { if data.H264Video != nil {
m.onVideo(ctx, data.H264Video) m.onVideo(ctx, data.H264Video)
} }
@@ -160,7 +118,7 @@ func (m *MP4) Start(ctx context.Context, source hub.Source) error {
} }
} }
} }
err = muxer.WriteTrailer() err = m.muxer.WriteTrailer()
if err != nil { if err != nil {
log.Error(ctx, err, "failed to write trailer") log.Error(ctx, err, "failed to write trailer")
} }
@@ -168,14 +126,84 @@ func (m *MP4) Start(ctx context.Context, source hub.Source) error {
return nil return nil
} }
// createNewFile creates a new MP4 file and initializes the muxer
func (m *MP4) createNewFile(ctx context.Context) error {
var err error
m.closeFile(ctx) // Close previous file if any
timestamp := time.Now().Format("2006-01-02-15-04-05")
fileName := fmt.Sprintf("videos/%s_%s.mp4", m.streamID, timestamp)
m.tempFile, err = record.CreateFileInDir(fileName)
if err != nil {
return err
}
m.muxer, err = gomp4.CreateMp4Muxer(m.tempFile)
if err != nil {
return err
}
m.hasVideo = false
m.hasAudio = false
m.videoIndex = 0
m.audioIndex = 0
m.lastSplitTime = 0
m.fileIndex++
return nil
}
// closeFile closes the current MP4 file and muxer
func (m *MP4) closeFile(ctx context.Context) {
if m.muxer != nil {
err := m.muxer.WriteTrailer()
if err != nil {
log.Error(ctx, err, "failed to write trailer")
}
m.muxer = nil
}
if m.tempFile != nil {
err := m.tempFile.Close()
if err != nil {
log.Error(ctx, err, "failed to close mp4 file")
}
m.tempFile = nil
}
}
// splitFile handles the logic to split the MP4 file
func (m *MP4) splitFile(ctx context.Context) error {
// Close current file
m.closeFile(ctx)
// Create a new file
return m.createNewFile(ctx)
}
func (m *MP4) onVideo(ctx context.Context, h264Video *hub.H264Video) { func (m *MP4) onVideo(ctx context.Context, h264Video *hub.H264Video) {
// Check if this is a keyframe
isKeyFrame := false
for _, sliceType := range h264Video.SliceTypes {
if sliceType == hub.SliceI {
isKeyFrame = true
break
}
}
// If a split is pending and we have a keyframe, perform the split
if m.splitPending && isKeyFrame {
err := m.splitFile(ctx)
if err != nil {
log.Error(ctx, err, "failed to split mp4 file")
return
}
m.lastSplitTime = h264Video.RawDTS()
m.splitPending = false // Reset the split pending flag
}
if !m.hasVideo { if !m.hasVideo {
m.hasVideo = true m.hasVideo = true
m.videoIndex = m.muxer.AddVideoTrack(gomp4.MP4_CODEC_H264) m.videoIndex = m.muxer.AddVideoTrack(gomp4.MP4_CODEC_H264)
} }
videoData := make([]byte, len(h264Video.Data)) videoData := make([]byte, len(h264Video.Data))
copy(videoData, h264Video.Data) copy(videoData, h264Video.Data)
err := m.muxer.Write(m.videoIndex, videoData, uint64(h264Video.RawPTS()), uint64(h264Video.RawDTS())) err := m.muxer.Write(m.videoIndex, videoData, uint64(h264Video.RawPTS()-m.lastSplitTime), uint64(h264Video.RawDTS()-m.lastSplitTime))
if err != nil { if err != nil {
log.Error(ctx, err, "failed to write video") log.Error(ctx, err, "failed to write video")
} }
@@ -201,7 +229,7 @@ func (m *MP4) onAudio(ctx context.Context, aacAudio *hub.AACAudio) {
adtsHeader := make([]byte, adtsHeaderSize) adtsHeader := make([]byte, adtsHeaderSize)
aacparser.FillADTSHeader(adtsHeader, *m.mpeg4AudioConfig, aacSamples, len(aacAudio.Data)) aacparser.FillADTSHeader(adtsHeader, *m.mpeg4AudioConfig, aacSamples, len(aacAudio.Data))
audioData = append(adtsHeader, aacAudio.Data...) audioData = append(adtsHeader, aacAudio.Data...)
err := m.muxer.Write(m.audioIndex, audioData, uint64(aacAudio.RawPTS()), uint64(aacAudio.RawDTS())) err := m.muxer.Write(m.audioIndex, audioData, uint64(aacAudio.RawPTS()-m.lastSplitTime), uint64(aacAudio.RawDTS()-m.lastSplitTime))
if err != nil { if err != nil {
log.Error(ctx, err, "failed to write audio") log.Error(ctx, err, "failed to write audio")
} }

View File

@@ -0,0 +1,57 @@
package mp4
import (
"errors"
"fmt"
"io"
)
type cacheWriterSeeker struct {
buf []byte
offset int
}
func newCacheWriterSeeker(capacity int) *cacheWriterSeeker {
return &cacheWriterSeeker{
buf: make([]byte, 0, capacity),
offset: 0,
}
}
func (ws *cacheWriterSeeker) Write(p []byte) (n int, err error) {
if cap(ws.buf)-ws.offset >= len(p) {
if len(ws.buf) < ws.offset+len(p) {
ws.buf = ws.buf[:ws.offset+len(p)]
}
copy(ws.buf[ws.offset:], p)
ws.offset += len(p)
return len(p), nil
}
tmp := make([]byte, len(ws.buf), cap(ws.buf)+len(p)*2)
copy(tmp, ws.buf)
if len(ws.buf) < ws.offset+len(p) {
tmp = tmp[:ws.offset+len(p)]
}
copy(tmp[ws.offset:], p)
ws.buf = tmp
ws.offset += len(p)
return len(p), nil
}
func (ws *cacheWriterSeeker) Seek(offset int64, whence int) (int64, error) {
if whence == io.SeekCurrent {
if ws.offset+int(offset) > len(ws.buf) {
return -1, errors.New(fmt.Sprint("SeekCurrent out of range", len(ws.buf), offset, ws.offset))
}
ws.offset += int(offset)
return int64(ws.offset), nil
} else if whence == io.SeekStart {
if offset > int64(len(ws.buf)) {
return -1, errors.New(fmt.Sprint("SeekStart out of range", len(ws.buf), offset, ws.offset))
}
ws.offset = int(offset)
return offset, nil
} else {
return 0, errors.New("unsupport SeekEnd")
}
}

View File

@@ -6,12 +6,13 @@ import (
"fmt" "fmt"
"liveflow/log" "liveflow/log"
"liveflow/media/hub" "liveflow/media/hub"
"liveflow/media/streamer/egress/record"
"liveflow/media/streamer/fields" "liveflow/media/streamer/fields"
"liveflow/media/streamer/processes" "liveflow/media/streamer/processes"
"time"
"github.com/asticode/go-astiav" "github.com/asticode/go-astiav"
"github.com/deepch/vdk/codec/aacparser" "github.com/deepch/vdk/codec/aacparser"
"github.com/pion/webrtc/v3"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
@@ -24,19 +25,28 @@ const (
) )
type WebMArgs struct { type WebMArgs struct {
Tracks map[string][]*webrtc.TrackLocalStaticRTP
Hub *hub.Hub Hub *hub.Hub
SplitIntervalMS int64 // Add SplitIntervalMS to arguments
StreamID string // Add StreamID
} }
type WebM struct { type WebM struct {
hub *hub.Hub hub *hub.Hub
webmMuxer *EBMLMuxer webmMuxer *EBMLMuxer
samples int samples int
splitIntervalMS int64
lastSplitTime int64
splitPending bool
streamID string
audioTranscodingProcess *processes.AudioTranscodingProcess
mediaSpecs []hub.MediaSpec
} }
func NewWEBM(args WebMArgs) *WebM { func NewWEBM(args WebMArgs) *WebM {
return &WebM{ return &WebM{
hub: args.Hub, hub: args.Hub,
splitIntervalMS: args.SplitIntervalMS,
streamID: args.StreamID,
} }
} }
@@ -51,59 +61,130 @@ func (w *WebM) Start(ctx context.Context, source hub.Source) error {
if err != nil { if err != nil {
return err return err
} }
w.mediaSpecs = source.MediaSpecs()
ctx = log.WithFields(ctx, logrus.Fields{ ctx = log.WithFields(ctx, logrus.Fields{
fields.StreamID: source.StreamID(), fields.StreamID: source.StreamID(),
fields.SourceName: source.Name(), fields.SourceName: source.Name(),
}) })
muxer := NewEBMLMuxer(int(audioClockRate), 2, ContainerMKV)
err = muxer.Init(ctx)
if err != nil {
return err
}
log.Info(ctx, "start webm") log.Info(ctx, "start webm")
sub := w.hub.Subscribe(source.StreamID()) sub := w.hub.Subscribe(source.StreamID())
go func() { go func() {
var audioTranscodingProcess *processes.AudioTranscodingProcess // Initialize splitting logic
err := w.createNewMuxer(ctx, int(audioClockRate))
if err != nil {
log.Error(ctx, err, "failed to create webm muxer")
return
}
// Initialize audio transcoding process if needed
if hub.HasCodecType(source.MediaSpecs(), hub.CodecTypeAAC) {
w.audioTranscodingProcess = processes.NewTranscodingProcess(astiav.CodecIDAac, astiav.CodecIDOpus, audioSampleRate)
w.audioTranscodingProcess.Init()
defer w.audioTranscodingProcess.Close()
}
for data := range sub { for data := range sub {
// Check if we need to initiate a split
if data.H264Video != nil { if data.H264Video != nil {
w.onVideo(ctx, muxer, data.H264Video) if !w.splitPending && data.H264Video.RawDTS()-w.lastSplitTime >= w.splitIntervalMS {
w.splitPending = true
}
w.onVideo(ctx, data.H264Video)
} }
if data.AACAudio != nil { if data.AACAudio != nil {
if audioTranscodingProcess == nil { w.onAACAudio(ctx, data.AACAudio)
audioTranscodingProcess = processes.NewTranscodingProcess(astiav.CodecIDAac, astiav.CodecIDOpus, audioSampleRate)
audioTranscodingProcess.Init()
defer audioTranscodingProcess.Close()
}
w.onAACAudio(ctx, muxer, data.AACAudio, audioTranscodingProcess)
} else if data.OPUSAudio != nil { } else if data.OPUSAudio != nil {
w.onAudio(ctx, muxer, data.OPUSAudio) w.onAudio(ctx, data.OPUSAudio)
} }
} }
err = muxer.Finalize(ctx) // Ensure the muxer is finalized
if err != nil { w.closeMuxer(ctx)
log.Error(ctx, err, "failed to finalize")
}
}() }()
return nil return nil
} }
func (w *WebM) onVideo(ctx context.Context, muxer *EBMLMuxer, data *hub.H264Video) { // createNewMuxer initializes a new EBMLMuxer
keyFrame := data.SliceType == hub.SliceI func (w *WebM) createNewMuxer(ctx context.Context, audioClockRate int) error {
err := muxer.WriteVideo(data.Data, keyFrame, uint64(data.RawPTS()), uint64(data.RawDTS())) // Initialize new muxer
w.webmMuxer = NewEBMLMuxer(audioClockRate, 2, ContainerMKV)
err := w.webmMuxer.Init(ctx)
if err != nil {
return err
}
return nil
}
// closeMuxer finalizes the current muxer and writes to the output file
func (w *WebM) closeMuxer(ctx context.Context) {
if w.webmMuxer != nil {
// Create output file with timestamp
timestamp := time.Now().Format("2006-01-02-15-04-05")
fileName := fmt.Sprintf("videos/%s_%s.mkv", w.streamID, timestamp)
outputFile, err := record.CreateFileInDir(fileName)
if err != nil {
log.Error(ctx, err, "failed to create output file")
return
}
defer outputFile.Close()
// Finalize muxer with output file
err = w.webmMuxer.Finalize(ctx, outputFile)
if err != nil {
log.Error(ctx, err, "failed to finalize muxer")
}
w.webmMuxer = nil
}
}
// splitMuxer handles the logic to split the WebM file
func (w *WebM) splitMuxer(ctx context.Context) error {
// Close current muxer
w.closeMuxer(ctx)
// Create a new muxer
audioClockRate, err := hub.AudioClockRate(w.mediaSpecs)
if err != nil {
return err
}
return w.createNewMuxer(ctx, int(audioClockRate))
}
func (w *WebM) onVideo(ctx context.Context, data *hub.H264Video) {
keyFrame := false
for _, sliceType := range data.SliceTypes {
if sliceType == hub.SliceI {
keyFrame = true
break
}
}
// If a split is pending and we have a keyframe, perform the split
if w.splitPending && keyFrame {
err := w.splitMuxer(ctx)
if err != nil {
log.Error(ctx, err, "failed to split webm file")
return
}
w.lastSplitTime = data.RawDTS()
w.splitPending = false // Reset the split pending flag
}
err := w.webmMuxer.WriteVideo(data.Data, keyFrame, uint64(data.RawPTS()-w.lastSplitTime), uint64(data.RawDTS()-w.lastSplitTime))
if err != nil { if err != nil {
log.Error(ctx, err, "failed to write video") log.Error(ctx, err, "failed to write video")
} }
} }
func (w *WebM) onAudio(ctx context.Context, muxer *EBMLMuxer, data *hub.OPUSAudio) { func (w *WebM) onAudio(ctx context.Context, data *hub.OPUSAudio) {
err := muxer.WriteAudio(data.Data, false, uint64(data.RawPTS()), uint64(data.RawDTS())) fmt.Println("dts: ", data.RawDTS())
err := w.webmMuxer.WriteAudio(data.Data, false, uint64(data.RawPTS()-w.lastSplitTime), uint64(data.RawDTS()-w.lastSplitTime))
if err != nil { if err != nil {
log.Error(ctx, err, "failed to write audio") log.Error(ctx, err, "failed to write audio")
} }
} }
func (w *WebM) onAACAudio(ctx context.Context, muxer *EBMLMuxer, aac *hub.AACAudio, transcodingProcess *processes.AudioTranscodingProcess) { func (w *WebM) onAACAudio(ctx context.Context, aac *hub.AACAudio) {
if len(aac.Data) == 0 { if len(aac.Data) == 0 {
log.Warn(ctx, "no data") log.Warn(ctx, "no data")
return return
@@ -120,7 +201,7 @@ func (w *WebM) onAACAudio(ctx context.Context, muxer *EBMLMuxer, aac *hub.AACAud
aacparser.FillADTSHeader(adtsHeader, *aac.MPEG4AudioConfig, aacSamples, len(aac.Data)) aacparser.FillADTSHeader(adtsHeader, *aac.MPEG4AudioConfig, aacSamples, len(aac.Data))
audioDataWithADTS := append(adtsHeader, aac.Data...) audioDataWithADTS := append(adtsHeader, aac.Data...)
packets, err := transcodingProcess.Process(&processes.MediaPacket{ packets, err := w.audioTranscodingProcess.Process(&processes.MediaPacket{
Data: audioDataWithADTS, Data: audioDataWithADTS,
PTS: aac.PTS, PTS: aac.PTS,
DTS: aac.DTS, DTS: aac.DTS,
@@ -129,7 +210,7 @@ func (w *WebM) onAACAudio(ctx context.Context, muxer *EBMLMuxer, aac *hub.AACAud
fmt.Println(err) fmt.Println(err)
} }
for _, packet := range packets { for _, packet := range packets {
w.onAudio(ctx, muxer, &hub.OPUSAudio{ w.onAudio(ctx, &hub.OPUSAudio{
Data: packet.Data, Data: packet.Data,
PTS: packet.PTS, PTS: packet.PTS,
DTS: packet.DTS, DTS: packet.DTS,

View File

@@ -5,10 +5,9 @@ import (
"encoding/binary" "encoding/binary"
"errors" "errors"
"fmt" "fmt"
"liveflow/log" "io"
"liveflow/media/streamer/egress/record" "io/ioutil"
"math" "math"
"math/rand"
"os" "os"
"github.com/at-wat/ebml-go" "github.com/at-wat/ebml-go"
@@ -44,11 +43,11 @@ const (
type EBMLMuxer struct { type EBMLMuxer struct {
writers []mkvcore.BlockWriteCloser writers []mkvcore.BlockWriteCloser
tempFile *os.File
container Name container Name
tempFileName string
audioSampleRate float64 audioSampleRate float64
audioChannels uint64 audioChannels uint64
durationPos uint64 durationPos int64
duration int64 duration int64
audioStreamIndex int audioStreamIndex int
videoStreamIndex int videoStreamIndex int
@@ -57,7 +56,6 @@ type EBMLMuxer struct {
func NewEBMLMuxer(sampleRate int, channels int, container Name) *EBMLMuxer { func NewEBMLMuxer(sampleRate int, channels int, container Name) *EBMLMuxer {
return &EBMLMuxer{ return &EBMLMuxer{
writers: nil, writers: nil,
tempFileName: "",
audioSampleRate: float64(sampleRate), audioSampleRate: float64(sampleRate),
audioChannels: uint64(channels), audioChannels: uint64(channels),
durationPos: 0, durationPos: 0,
@@ -71,12 +69,9 @@ func (w *EBMLMuxer) makeWebmWriters() ([]mkvcore.BlockWriteCloser, error) {
trackTypeVideo = 1 trackTypeVideo = 1
trackTypeAudio = 2 trackTypeAudio = 2
) )
tempFile, err := record.CreateFileInDir(fmt.Sprintf("videos/%d.webm", rand.Int()))
if err != nil {
return nil, err
}
w.audioStreamIndex = 0 w.audioStreamIndex = 0
w.videoStreamIndex = 1 w.videoStreamIndex = 1
trackEntries := []webm.TrackEntry{ trackEntries := []webm.TrackEntry{
{ {
Name: trackNameAudio, Name: trackNameAudio,
@@ -101,31 +96,30 @@ func (w *EBMLMuxer) makeWebmWriters() ([]mkvcore.BlockWriteCloser, error) {
}, },
}, },
} }
writers, err := webm.NewSimpleBlockWriter(tempFile, trackEntries,
var err error
w.tempFile, err = ioutil.TempFile("", "ebmlmuxer-*.webm")
if err != nil {
return nil, err
}
writers, err := webm.NewSimpleBlockWriter(w.tempFile, trackEntries,
mkvcore.WithSeekHead(true), mkvcore.WithSeekHead(true),
mkvcore.WithOnErrorHandler(func(err error) {
log.Error(context.Background(), err, "failed to construct webm writer (error)")
}),
mkvcore.WithOnFatalHandler(func(err error) {
log.Error(context.Background(), err, "failed to construct webm writer (fatal)")
}),
mkvcore.WithSegmentInfo(&webm.Info{ mkvcore.WithSegmentInfo(&webm.Info{
TimecodeScale: defaultTimecode, // 1ms TimecodeScale: defaultTimecode, // 1ms
MuxingApp: "mrw-v4.ebml-go.webm", MuxingApp: "your_app_name",
WritingApp: "mrw-v4.ebml-go.webm", WritingApp: "your_app_name",
Duration: defaultDuration, // Arbitrarily set to default videoSplitIntervalMs, final value is adjusted in writeTrailer. Duration: defaultDuration, // Placeholder duration; final value is adjusted in overwritePTS.
}), }),
mkvcore.WithMarshalOptions(ebml.WithElementWriteHooks(func(e *ebml.Element) { mkvcore.WithMarshalOptions(ebml.WithElementWriteHooks(func(e *ebml.Element) {
switch e.Name { if e.Name == "Duration" {
case "Duration": w.durationPos = int64(e.Position + 4) // Adjust position to overwrite duration later.
w.durationPos = e.Position + 4 // Duration header size = 3, SegmentInfo header size delta = 1
} }
})), })),
) )
if err != nil { if err != nil {
return nil, err return nil, err
} }
w.tempFileName = tempFile.Name()
var mkvWriters []mkvcore.BlockWriteCloser var mkvWriters []mkvcore.BlockWriteCloser
for _, writer := range writers { for _, writer := range writers {
mkvWriters = append(mkvWriters, writer) mkvWriters = append(mkvWriters, writer)
@@ -138,14 +132,11 @@ func (w *EBMLMuxer) makeMKVWriters() ([]mkvcore.BlockWriteCloser, error) {
trackTypeVideo = 1 trackTypeVideo = 1
trackTypeAudio = 2 trackTypeAudio = 2
) )
tempFile, err := record.CreateFileInDir(fmt.Sprintf("videos/%d.mkv", rand.Int()))
if err != nil {
return nil, err
}
var mkvTrackDesc []mkvcore.TrackDescription
w.audioStreamIndex = 0 w.audioStreamIndex = 0
w.videoStreamIndex = 1 w.videoStreamIndex = 1
mkvTrackDesc = append(mkvTrackDesc, mkvcore.TrackDescription{
mkvTrackDesc := []mkvcore.TrackDescription{
{
TrackNumber: 1, TrackNumber: 1,
TrackEntry: webm.TrackEntry{ TrackEntry: webm.TrackEntry{
Name: trackNameAudio, Name: trackNameAudio,
@@ -155,50 +146,48 @@ func (w *EBMLMuxer) makeMKVWriters() ([]mkvcore.BlockWriteCloser, error) {
TrackType: trackTypeAudio, TrackType: trackTypeAudio,
Audio: &webm.Audio{ Audio: &webm.Audio{
SamplingFrequency: w.audioSampleRate, SamplingFrequency: w.audioSampleRate,
Channels: 2, Channels: w.audioChannels,
}, },
}, },
}) },
mkvTrackDesc = append(mkvTrackDesc, mkvcore.TrackDescription{ {
TrackNumber: webmVideoTrackNumber, TrackNumber: webmVideoTrackNumber,
TrackEntry: webm.TrackEntry{ TrackEntry: webm.TrackEntry{
TrackNumber: webmVideoTrackNumber, TrackNumber: webmVideoTrackNumber,
TrackUID: webmVideoTrackNumber, TrackUID: webmVideoTrackNumber,
TrackType: trackTypeVideo, TrackType: trackTypeVideo,
DefaultDuration: 0,
Name: trackNameVideo, Name: trackNameVideo,
CodecID: codecIDH264, CodecID: codecIDH264,
SeekPreRoll: 0, DefaultDuration: 0,
// TODO: The resolution may need to be written later, but it works fine without it for now.
//Video: &webm.Video{
// PixelWidth: 1280,
// PixelHeight: 720,
//},
}, },
}) },
var mkvWriters []mkvcore.BlockWriteCloser }
mkvWriters, err = mkvcore.NewSimpleBlockWriter(tempFile, mkvTrackDesc,
var err error
w.tempFile, err = ioutil.TempFile("/tmp", "ebmlmuxer-*.mkv")
if err != nil {
return nil, err
}
writers, err := mkvcore.NewSimpleBlockWriter(w.tempFile, mkvTrackDesc,
mkvcore.WithSeekHead(true), mkvcore.WithSeekHead(true),
mkvcore.WithEBMLHeader(mkv.DefaultEBMLHeader), mkvcore.WithEBMLHeader(mkv.DefaultEBMLHeader),
mkvcore.WithSegmentInfo(&webm.Info{ mkvcore.WithSegmentInfo(&webm.Info{
TimecodeScale: defaultTimecode, // 1ms TimecodeScale: defaultTimecode,
MuxingApp: "mrw-v4.ebml-go.mkv", MuxingApp: "your_app_name",
WritingApp: "mrw-v4.ebml-go.mkv", WritingApp: "your_app_name",
Duration: defaultDuration, // Arbitrarily set to default videoSplitIntervalMs, final value is adjusted in writeTrailer. Duration: defaultDuration,
}), }),
mkvcore.WithBlockInterceptor(webm.DefaultBlockInterceptor),
mkvcore.WithMarshalOptions(ebml.WithElementWriteHooks(func(e *ebml.Element) { mkvcore.WithMarshalOptions(ebml.WithElementWriteHooks(func(e *ebml.Element) {
switch e.Name { if e.Name == "Duration" {
case "Duration": w.durationPos = int64(e.Position + 4)
w.durationPos = e.Position + 4 // Duration header size = 3, SegmentInfo header size delta = 1
} }
})), })),
) )
if err != nil { if err != nil {
return nil, err return nil, err
} }
w.tempFileName = tempFile.Name() return writers, nil
return mkvWriters, nil
} }
func (w *EBMLMuxer) Init(_ context.Context) error { func (w *EBMLMuxer) Init(_ context.Context) error {
@@ -238,53 +227,52 @@ func (w *EBMLMuxer) WriteAudio(data []byte, keyframe bool, pts uint64, _ uint64)
return nil return nil
} }
func (w *EBMLMuxer) Finalize(ctx context.Context) error { func (w *EBMLMuxer) Finalize(ctx context.Context, output io.Writer) error {
defer func() { defer w.cleanup()
w.cleanup()
}() if err := w.overwritePTS(); err != nil {
log.Info(ctx, "finalize webm muxer") return fmt.Errorf("overwrite PTS error: %w", err)
fileName := w.tempFileName }
// Copy the data from the temporary file to the output writer
if _, err := w.tempFile.Seek(0, io.SeekStart); err != nil {
return fmt.Errorf("seek error: %w", err)
}
if _, err := io.Copy(output, w.tempFile); err != nil {
return fmt.Errorf("copy error: %w", err)
}
for _, writer := range w.writers { for _, writer := range w.writers {
if err := writer.Close(); err != nil { if err := writer.Close(); err != nil {
log.Error(ctx, err, "failed to close writer") return fmt.Errorf("writer close error: %w", err)
} }
} }
if err := w.overwritePTS(ctx, fileName); err != nil {
return err
}
return nil return nil
} }
func (w *EBMLMuxer) ContainerName() string { func (w *EBMLMuxer) overwritePTS() error {
return string(w.container) ptsBytes := make([]byte, 8)
} binary.BigEndian.PutUint64(ptsBytes, math.Float64bits(float64(w.duration)))
if _, err := w.tempFile.Seek(w.durationPos, io.SeekStart); err != nil {
func (w *EBMLMuxer) overwritePTS(ctx context.Context, fileName string) error {
tempFile, err := os.OpenFile(fileName, os.O_RDWR, 0o600)
if err != nil {
return err return err
} }
defer func() { if _, err := w.tempFile.Write(ptsBytes); err != nil {
if err := tempFile.Close(); err != nil {
log.Error(ctx, err, "failed to close temp file")
}
}()
ptsBytes, _ := EncodeFloat64(float64(w.duration))
if _, err := tempFile.WriteAt(ptsBytes, int64(w.durationPos)); err != nil {
return err return err
} }
return nil return nil
} }
func (w *EBMLMuxer) cleanup() { func (w *EBMLMuxer) cleanup() {
if w.tempFile != nil {
w.tempFile.Close()
//os.Remove(w.tempFile.Name())
w.tempFile = nil
}
w.writers = nil w.writers = nil
w.tempFileName = ""
w.duration = 0 w.duration = 0
w.durationPos = 0 w.durationPos = 0
} }
func EncodeFloat64(i float64) ([]byte, error) { func (w *EBMLMuxer) ContainerName() string {
b := make([]byte, 8) return string(w.container)
binary.BigEndian.PutUint64(b, math.Float64bits(i))
return b, nil
} }

View File

@@ -0,0 +1,35 @@
package ingress
import (
"liveflow/media/hub"
"github.com/deepch/vdk/codec/h264parser"
)
func SliceTypes(payload []byte) []hub.SliceType {
nalus, _ := h264parser.SplitNALUs(payload)
slices := make([]hub.SliceType, 0)
for _, nalu := range nalus {
if len(nalu) < 1 {
continue
}
nalUnitType := nalu[0] & 0x1f
switch nalUnitType {
case h264parser.NALU_SPS:
slices = append(slices, hub.SliceSPS)
case h264parser.NALU_PPS:
slices = append(slices, hub.SlicePPS)
default:
sliceType, _ := h264parser.ParseSliceHeaderFromNALU(nalu)
switch sliceType {
case h264parser.SLICE_I:
slices = append(slices, hub.SliceI)
case h264parser.SLICE_P:
slices = append(slices, hub.SliceP)
case h264parser.SLICE_B:
slices = append(slices, hub.SliceB)
}
}
}
return slices
}

View File

@@ -5,6 +5,7 @@ import (
"context" "context"
"fmt" "fmt"
"io" "io"
"liveflow/media/streamer/ingress"
"os" "os"
"path/filepath" "path/filepath"
@@ -335,6 +336,7 @@ func (h *Handler) publishVideoData(timestamp uint32, compositionTime int32, vide
dts := int64(timestamp) dts := int64(timestamp)
pts := int64(compositionTime) + dts pts := int64(compositionTime) + dts
sliceTypes := ingress.SliceTypes(videoDataToSend)
h.hub.Publish(h.streamID, &hub.FrameData{ h.hub.Publish(h.streamID, &hub.FrameData{
H264Video: &hub.H264Video{ H264Video: &hub.H264Video{
VideoClockRate: 90000, VideoClockRate: 90000,
@@ -344,6 +346,7 @@ func (h *Handler) publishVideoData(timestamp uint32, compositionTime int32, vide
SPS: h.sps, SPS: h.sps,
PPS: h.pps, PPS: h.pps,
CodecData: nil, CodecData: nil,
SliceTypes: sliceTypes,
}, },
}) })
} }

View File

@@ -4,11 +4,11 @@ import (
"context" "context"
"fmt" "fmt"
"io" "io"
"liveflow/media/streamer/ingress"
"net/http" "net/http"
"strings" "strings"
"time" "time"
"github.com/deepch/vdk/codec/h264parser"
"github.com/labstack/echo/v4" "github.com/labstack/echo/v4"
"github.com/pion/rtp" "github.com/pion/rtp"
"github.com/pion/rtp/codecs" "github.com/pion/rtp/codecs"
@@ -227,30 +227,7 @@ func (w *WebRTCHandler) onVideo(ctx context.Context, packets []*rtp.Packet) erro
return nil return nil
} }
pts := w.videoTimestampGen.Generate(int64(packets[0].Timestamp)) pts := w.videoTimestampGen.Generate(int64(packets[0].Timestamp))
nalus, _ := h264parser.SplitNALUs(payload) sliceTypes := ingress.SliceTypes(payload)
var slice hub.SliceType
for _, nalu := range nalus {
if len(nalu) < 1 {
continue
}
nalUnitType := nalu[0] & 0x1f
switch nalUnitType {
case h264parser.NALU_SPS:
slice = hub.SliceSPS
case h264parser.NALU_PPS:
slice = hub.SlicePPS
default:
sliceType, _ := h264parser.ParseSliceHeaderFromNALU(nalu)
switch sliceType {
case h264parser.SLICE_I:
slice = hub.SliceI
case h264parser.SLICE_P:
slice = hub.SliceP
case h264parser.SLICE_B:
slice = hub.SliceB
}
}
}
w.hub.Publish(w.streamID, &hub.FrameData{ w.hub.Publish(w.streamID, &hub.FrameData{
H264Video: &hub.H264Video{ H264Video: &hub.H264Video{
PTS: pts, PTS: pts,
@@ -259,7 +236,7 @@ func (w *WebRTCHandler) onVideo(ctx context.Context, packets []*rtp.Packet) erro
Data: payload, Data: payload,
SPS: nil, SPS: nil,
PPS: nil, PPS: nil,
SliceType: slice, SliceTypes: sliceTypes,
CodecData: nil, CodecData: nil,
}, },
AACAudio: nil, AACAudio: nil,