diff --git a/main.go b/main.go index 3c4508c..f1d5c29 100644 --- a/main.go +++ b/main.go @@ -89,7 +89,8 @@ func main() { log.Infof(ctx, "New streamID received: %s", source.StreamID()) if conf.MP4.Record { mp4 := mp4.NewMP4(mp4.MP4Args{ - Hub: hub, + Hub: hub, + SplitIntervalMS: 3000, }) err = mp4.Start(ctx, source) if err != nil { @@ -98,7 +99,9 @@ func main() { } if conf.EBML.Record { webmStarter := webm.NewWEBM(webm.WebMArgs{ - Hub: hub, + Hub: hub, + SplitIntervalMS: 6000, + StreamID: source.StreamID(), }) err = webmStarter.Start(ctx, source) if err != nil { diff --git a/media/hub/dto.go b/media/hub/dto.go index 831a5dc..39abb5a 100644 --- a/media/hub/dto.go +++ b/media/hub/dto.go @@ -17,7 +17,7 @@ type H264Video struct { Data []byte SPS []byte PPS []byte - SliceType SliceType + SliceTypes []SliceType CodecData []byte } diff --git a/media/streamer/egress/record/mp4/handler.go b/media/streamer/egress/record/mp4/handler.go index 5d702e0..bb17cdc 100644 --- a/media/streamer/egress/record/mp4/handler.go +++ b/media/streamer/egress/record/mp4/handler.go @@ -5,14 +5,12 @@ import ( "context" "errors" "fmt" - "io" "liveflow/media/streamer/egress/record" "liveflow/media/streamer/processes" - "math/rand" "os" + "time" astiav "github.com/asticode/go-astiav" - "github.com/deepch/vdk/codec/aacparser" "github.com/sirupsen/logrus" gomp4 "github.com/yapingcat/gomedia/go-mp4" @@ -31,56 +29,6 @@ const ( audioSampleRate = 48000 ) -type cacheWriterSeeker struct { - buf []byte - offset int -} - -func newCacheWriterSeeker(capacity int) *cacheWriterSeeker { - return &cacheWriterSeeker{ - buf: make([]byte, 0, capacity), - offset: 0, - } -} - -func (ws *cacheWriterSeeker) Write(p []byte) (n int, err error) { - if cap(ws.buf)-ws.offset >= len(p) { - if len(ws.buf) < ws.offset+len(p) { - ws.buf = ws.buf[:ws.offset+len(p)] - } - copy(ws.buf[ws.offset:], p) - ws.offset += len(p) - return len(p), nil - } - tmp := make([]byte, len(ws.buf), cap(ws.buf)+len(p)*2) - copy(tmp, ws.buf) - if len(ws.buf) < ws.offset+len(p) { - tmp = tmp[:ws.offset+len(p)] - } - copy(tmp[ws.offset:], p) - ws.buf = tmp - ws.offset += len(p) - return len(p), nil -} - -func (ws *cacheWriterSeeker) Seek(offset int64, whence int) (int64, error) { - if whence == io.SeekCurrent { - if ws.offset+int(offset) > len(ws.buf) { - return -1, errors.New(fmt.Sprint("SeekCurrent out of range", len(ws.buf), offset, ws.offset)) - } - ws.offset += int(offset) - return int64(ws.offset), nil - } else if whence == io.SeekStart { - if offset > int64(len(ws.buf)) { - return -1, errors.New(fmt.Sprint("SeekStart out of range", len(ws.buf), offset, ws.offset)) - } - ws.offset = int(offset) - return offset, nil - } else { - return 0, errors.New("unsupport SeekEnd") - } -} - type MP4 struct { hub *hub.Hub muxer *gomp4.Movmuxer @@ -91,15 +39,24 @@ type MP4 struct { audioIndex uint32 mpeg4AudioConfigBytes []byte mpeg4AudioConfig *aacparser.MPEG4AudioConfig + streamID string + + // New fields for splitting + splitIntervalMS int64 + lastSplitTime int64 + fileIndex int + splitPending bool // Indicates if a split is pending } type MP4Args struct { - Hub *hub.Hub + Hub *hub.Hub + SplitIntervalMS int64 } func NewMP4(args MP4Args) *MP4 { return &MP4{ - hub: args.Hub, + hub: args.Hub, + splitIntervalMS: args.SplitIntervalMS, } } @@ -110,6 +67,7 @@ func (m *MP4) Start(ctx context.Context, source hub.Source) error { if !hub.HasCodecType(source.MediaSpecs(), hub.CodecTypeH264) { return ErrUnsupportedCodec } + m.streamID = source.StreamID() ctx = log.WithFields(ctx, logrus.Fields{ fields.StreamID: source.StreamID(), fields.SourceName: source.Name(), @@ -118,26 +76,26 @@ func (m *MP4) Start(ctx context.Context, source hub.Source) error { sub := m.hub.Subscribe(source.StreamID()) go func() { var err error - mp4File, err := record.CreateFileInDir(fmt.Sprintf("videos/%d.mp4", rand.Int())) + + // Initialize the splitting logic + m.fileIndex = 0 + err = m.createNewFile(ctx) if err != nil { - fmt.Println(err) + log.Error(ctx, err, "failed to create mp4 file") return } - defer func() { - err := mp4File.Close() - if err != nil { - log.Error(ctx, err, "failed to close mp4 file") - } - }() - muxer, err := gomp4.CreateMp4Muxer(mp4File) - if err != nil { - fmt.Println(err) - return - } - m.muxer = muxer + defer m.closeFile(ctx) var audioTranscodingProcess *processes.AudioTranscodingProcess for data := range sub { + // Check if we need to initiate a split + if data.H264Video != nil { + if !m.splitPending && data.H264Video.RawDTS()-m.lastSplitTime >= m.splitIntervalMS { + m.splitPending = true + } + + } + if data.H264Video != nil { m.onVideo(ctx, data.H264Video) } @@ -160,7 +118,7 @@ func (m *MP4) Start(ctx context.Context, source hub.Source) error { } } } - err = muxer.WriteTrailer() + err = m.muxer.WriteTrailer() if err != nil { log.Error(ctx, err, "failed to write trailer") } @@ -168,14 +126,84 @@ func (m *MP4) Start(ctx context.Context, source hub.Source) error { return nil } +// createNewFile creates a new MP4 file and initializes the muxer +func (m *MP4) createNewFile(ctx context.Context) error { + var err error + m.closeFile(ctx) // Close previous file if any + timestamp := time.Now().Format("2006-01-02-15-04-05") + fileName := fmt.Sprintf("videos/%s_%s.mp4", m.streamID, timestamp) + m.tempFile, err = record.CreateFileInDir(fileName) + if err != nil { + return err + } + m.muxer, err = gomp4.CreateMp4Muxer(m.tempFile) + if err != nil { + return err + } + m.hasVideo = false + m.hasAudio = false + m.videoIndex = 0 + m.audioIndex = 0 + m.lastSplitTime = 0 + m.fileIndex++ + return nil +} + +// closeFile closes the current MP4 file and muxer +func (m *MP4) closeFile(ctx context.Context) { + if m.muxer != nil { + err := m.muxer.WriteTrailer() + if err != nil { + log.Error(ctx, err, "failed to write trailer") + } + m.muxer = nil + } + if m.tempFile != nil { + err := m.tempFile.Close() + if err != nil { + log.Error(ctx, err, "failed to close mp4 file") + } + m.tempFile = nil + } +} + +// splitFile handles the logic to split the MP4 file +func (m *MP4) splitFile(ctx context.Context) error { + // Close current file + m.closeFile(ctx) + // Create a new file + return m.createNewFile(ctx) +} + func (m *MP4) onVideo(ctx context.Context, h264Video *hub.H264Video) { + // Check if this is a keyframe + isKeyFrame := false + for _, sliceType := range h264Video.SliceTypes { + if sliceType == hub.SliceI { + isKeyFrame = true + break + } + } + + // If a split is pending and we have a keyframe, perform the split + if m.splitPending && isKeyFrame { + err := m.splitFile(ctx) + if err != nil { + log.Error(ctx, err, "failed to split mp4 file") + return + } + m.lastSplitTime = h264Video.RawDTS() + m.splitPending = false // Reset the split pending flag + } + if !m.hasVideo { m.hasVideo = true m.videoIndex = m.muxer.AddVideoTrack(gomp4.MP4_CODEC_H264) } + videoData := make([]byte, len(h264Video.Data)) copy(videoData, h264Video.Data) - err := m.muxer.Write(m.videoIndex, videoData, uint64(h264Video.RawPTS()), uint64(h264Video.RawDTS())) + err := m.muxer.Write(m.videoIndex, videoData, uint64(h264Video.RawPTS()-m.lastSplitTime), uint64(h264Video.RawDTS()-m.lastSplitTime)) if err != nil { log.Error(ctx, err, "failed to write video") } @@ -201,7 +229,7 @@ func (m *MP4) onAudio(ctx context.Context, aacAudio *hub.AACAudio) { adtsHeader := make([]byte, adtsHeaderSize) aacparser.FillADTSHeader(adtsHeader, *m.mpeg4AudioConfig, aacSamples, len(aacAudio.Data)) audioData = append(adtsHeader, aacAudio.Data...) - err := m.muxer.Write(m.audioIndex, audioData, uint64(aacAudio.RawPTS()), uint64(aacAudio.RawDTS())) + err := m.muxer.Write(m.audioIndex, audioData, uint64(aacAudio.RawPTS()-m.lastSplitTime), uint64(aacAudio.RawDTS()-m.lastSplitTime)) if err != nil { log.Error(ctx, err, "failed to write audio") } diff --git a/media/streamer/egress/record/mp4/writerseeker.go b/media/streamer/egress/record/mp4/writerseeker.go new file mode 100644 index 0000000..a8e95f7 --- /dev/null +++ b/media/streamer/egress/record/mp4/writerseeker.go @@ -0,0 +1,57 @@ +package mp4 + +import ( + "errors" + "fmt" + "io" +) + +type cacheWriterSeeker struct { + buf []byte + offset int +} + +func newCacheWriterSeeker(capacity int) *cacheWriterSeeker { + return &cacheWriterSeeker{ + buf: make([]byte, 0, capacity), + offset: 0, + } +} + +func (ws *cacheWriterSeeker) Write(p []byte) (n int, err error) { + if cap(ws.buf)-ws.offset >= len(p) { + if len(ws.buf) < ws.offset+len(p) { + ws.buf = ws.buf[:ws.offset+len(p)] + } + copy(ws.buf[ws.offset:], p) + ws.offset += len(p) + return len(p), nil + } + tmp := make([]byte, len(ws.buf), cap(ws.buf)+len(p)*2) + copy(tmp, ws.buf) + if len(ws.buf) < ws.offset+len(p) { + tmp = tmp[:ws.offset+len(p)] + } + copy(tmp[ws.offset:], p) + ws.buf = tmp + ws.offset += len(p) + return len(p), nil +} + +func (ws *cacheWriterSeeker) Seek(offset int64, whence int) (int64, error) { + if whence == io.SeekCurrent { + if ws.offset+int(offset) > len(ws.buf) { + return -1, errors.New(fmt.Sprint("SeekCurrent out of range", len(ws.buf), offset, ws.offset)) + } + ws.offset += int(offset) + return int64(ws.offset), nil + } else if whence == io.SeekStart { + if offset > int64(len(ws.buf)) { + return -1, errors.New(fmt.Sprint("SeekStart out of range", len(ws.buf), offset, ws.offset)) + } + ws.offset = int(offset) + return offset, nil + } else { + return 0, errors.New("unsupport SeekEnd") + } +} diff --git a/media/streamer/egress/record/webm/handler.go b/media/streamer/egress/record/webm/handler.go index 13fa767..5c4b09e 100644 --- a/media/streamer/egress/record/webm/handler.go +++ b/media/streamer/egress/record/webm/handler.go @@ -6,12 +6,13 @@ import ( "fmt" "liveflow/log" "liveflow/media/hub" + "liveflow/media/streamer/egress/record" "liveflow/media/streamer/fields" "liveflow/media/streamer/processes" + "time" "github.com/asticode/go-astiav" "github.com/deepch/vdk/codec/aacparser" - "github.com/pion/webrtc/v3" "github.com/sirupsen/logrus" ) @@ -24,19 +25,28 @@ const ( ) type WebMArgs struct { - Tracks map[string][]*webrtc.TrackLocalStaticRTP - Hub *hub.Hub + Hub *hub.Hub + SplitIntervalMS int64 // Add SplitIntervalMS to arguments + StreamID string // Add StreamID } type WebM struct { - hub *hub.Hub - webmMuxer *EBMLMuxer - samples int + hub *hub.Hub + webmMuxer *EBMLMuxer + samples int + splitIntervalMS int64 + lastSplitTime int64 + splitPending bool + streamID string + audioTranscodingProcess *processes.AudioTranscodingProcess + mediaSpecs []hub.MediaSpec } func NewWEBM(args WebMArgs) *WebM { return &WebM{ - hub: args.Hub, + hub: args.Hub, + splitIntervalMS: args.SplitIntervalMS, + streamID: args.StreamID, } } @@ -51,59 +61,130 @@ func (w *WebM) Start(ctx context.Context, source hub.Source) error { if err != nil { return err } + w.mediaSpecs = source.MediaSpecs() ctx = log.WithFields(ctx, logrus.Fields{ fields.StreamID: source.StreamID(), fields.SourceName: source.Name(), }) - muxer := NewEBMLMuxer(int(audioClockRate), 2, ContainerMKV) - err = muxer.Init(ctx) - if err != nil { - return err - } log.Info(ctx, "start webm") sub := w.hub.Subscribe(source.StreamID()) go func() { - var audioTranscodingProcess *processes.AudioTranscodingProcess + // Initialize splitting logic + err := w.createNewMuxer(ctx, int(audioClockRate)) + if err != nil { + log.Error(ctx, err, "failed to create webm muxer") + return + } + + // Initialize audio transcoding process if needed + if hub.HasCodecType(source.MediaSpecs(), hub.CodecTypeAAC) { + w.audioTranscodingProcess = processes.NewTranscodingProcess(astiav.CodecIDAac, astiav.CodecIDOpus, audioSampleRate) + w.audioTranscodingProcess.Init() + defer w.audioTranscodingProcess.Close() + } + for data := range sub { + // Check if we need to initiate a split + if data.H264Video != nil { - w.onVideo(ctx, muxer, data.H264Video) + if !w.splitPending && data.H264Video.RawDTS()-w.lastSplitTime >= w.splitIntervalMS { + w.splitPending = true + } + w.onVideo(ctx, data.H264Video) } if data.AACAudio != nil { - if audioTranscodingProcess == nil { - audioTranscodingProcess = processes.NewTranscodingProcess(astiav.CodecIDAac, astiav.CodecIDOpus, audioSampleRate) - audioTranscodingProcess.Init() - defer audioTranscodingProcess.Close() - } - w.onAACAudio(ctx, muxer, data.AACAudio, audioTranscodingProcess) + w.onAACAudio(ctx, data.AACAudio) } else if data.OPUSAudio != nil { - w.onAudio(ctx, muxer, data.OPUSAudio) + w.onAudio(ctx, data.OPUSAudio) } } - err = muxer.Finalize(ctx) - if err != nil { - log.Error(ctx, err, "failed to finalize") - } + // Ensure the muxer is finalized + w.closeMuxer(ctx) }() return nil } -func (w *WebM) onVideo(ctx context.Context, muxer *EBMLMuxer, data *hub.H264Video) { - keyFrame := data.SliceType == hub.SliceI - err := muxer.WriteVideo(data.Data, keyFrame, uint64(data.RawPTS()), uint64(data.RawDTS())) +// createNewMuxer initializes a new EBMLMuxer +func (w *WebM) createNewMuxer(ctx context.Context, audioClockRate int) error { + // Initialize new muxer + w.webmMuxer = NewEBMLMuxer(audioClockRate, 2, ContainerMKV) + err := w.webmMuxer.Init(ctx) + if err != nil { + return err + } + return nil +} + +// closeMuxer finalizes the current muxer and writes to the output file +func (w *WebM) closeMuxer(ctx context.Context) { + if w.webmMuxer != nil { + // Create output file with timestamp + timestamp := time.Now().Format("2006-01-02-15-04-05") + fileName := fmt.Sprintf("videos/%s_%s.mkv", w.streamID, timestamp) + outputFile, err := record.CreateFileInDir(fileName) + if err != nil { + log.Error(ctx, err, "failed to create output file") + return + } + defer outputFile.Close() + + // Finalize muxer with output file + err = w.webmMuxer.Finalize(ctx, outputFile) + if err != nil { + log.Error(ctx, err, "failed to finalize muxer") + } + w.webmMuxer = nil + } +} + +// splitMuxer handles the logic to split the WebM file +func (w *WebM) splitMuxer(ctx context.Context) error { + // Close current muxer + w.closeMuxer(ctx) + // Create a new muxer + audioClockRate, err := hub.AudioClockRate(w.mediaSpecs) + if err != nil { + return err + } + return w.createNewMuxer(ctx, int(audioClockRate)) +} + +func (w *WebM) onVideo(ctx context.Context, data *hub.H264Video) { + keyFrame := false + for _, sliceType := range data.SliceTypes { + if sliceType == hub.SliceI { + keyFrame = true + break + } + } + + // If a split is pending and we have a keyframe, perform the split + if w.splitPending && keyFrame { + err := w.splitMuxer(ctx) + if err != nil { + log.Error(ctx, err, "failed to split webm file") + return + } + w.lastSplitTime = data.RawDTS() + w.splitPending = false // Reset the split pending flag + } + + err := w.webmMuxer.WriteVideo(data.Data, keyFrame, uint64(data.RawPTS()-w.lastSplitTime), uint64(data.RawDTS()-w.lastSplitTime)) if err != nil { log.Error(ctx, err, "failed to write video") } } -func (w *WebM) onAudio(ctx context.Context, muxer *EBMLMuxer, data *hub.OPUSAudio) { - err := muxer.WriteAudio(data.Data, false, uint64(data.RawPTS()), uint64(data.RawDTS())) +func (w *WebM) onAudio(ctx context.Context, data *hub.OPUSAudio) { + fmt.Println("dts: ", data.RawDTS()) + err := w.webmMuxer.WriteAudio(data.Data, false, uint64(data.RawPTS()-w.lastSplitTime), uint64(data.RawDTS()-w.lastSplitTime)) if err != nil { log.Error(ctx, err, "failed to write audio") } } -func (w *WebM) onAACAudio(ctx context.Context, muxer *EBMLMuxer, aac *hub.AACAudio, transcodingProcess *processes.AudioTranscodingProcess) { +func (w *WebM) onAACAudio(ctx context.Context, aac *hub.AACAudio) { if len(aac.Data) == 0 { log.Warn(ctx, "no data") return @@ -120,7 +201,7 @@ func (w *WebM) onAACAudio(ctx context.Context, muxer *EBMLMuxer, aac *hub.AACAud aacparser.FillADTSHeader(adtsHeader, *aac.MPEG4AudioConfig, aacSamples, len(aac.Data)) audioDataWithADTS := append(adtsHeader, aac.Data...) - packets, err := transcodingProcess.Process(&processes.MediaPacket{ + packets, err := w.audioTranscodingProcess.Process(&processes.MediaPacket{ Data: audioDataWithADTS, PTS: aac.PTS, DTS: aac.DTS, @@ -129,7 +210,7 @@ func (w *WebM) onAACAudio(ctx context.Context, muxer *EBMLMuxer, aac *hub.AACAud fmt.Println(err) } for _, packet := range packets { - w.onAudio(ctx, muxer, &hub.OPUSAudio{ + w.onAudio(ctx, &hub.OPUSAudio{ Data: packet.Data, PTS: packet.PTS, DTS: packet.DTS, diff --git a/media/streamer/egress/record/webm/webm.go b/media/streamer/egress/record/webm/webm.go index cb0b0ae..9cc3af7 100644 --- a/media/streamer/egress/record/webm/webm.go +++ b/media/streamer/egress/record/webm/webm.go @@ -5,10 +5,9 @@ import ( "encoding/binary" "errors" "fmt" - "liveflow/log" - "liveflow/media/streamer/egress/record" + "io" + "io/ioutil" "math" - "math/rand" "os" "github.com/at-wat/ebml-go" @@ -44,11 +43,11 @@ const ( type EBMLMuxer struct { writers []mkvcore.BlockWriteCloser + tempFile *os.File container Name - tempFileName string audioSampleRate float64 audioChannels uint64 - durationPos uint64 + durationPos int64 duration int64 audioStreamIndex int videoStreamIndex int @@ -57,7 +56,6 @@ type EBMLMuxer struct { func NewEBMLMuxer(sampleRate int, channels int, container Name) *EBMLMuxer { return &EBMLMuxer{ writers: nil, - tempFileName: "", audioSampleRate: float64(sampleRate), audioChannels: uint64(channels), durationPos: 0, @@ -71,12 +69,9 @@ func (w *EBMLMuxer) makeWebmWriters() ([]mkvcore.BlockWriteCloser, error) { trackTypeVideo = 1 trackTypeAudio = 2 ) - tempFile, err := record.CreateFileInDir(fmt.Sprintf("videos/%d.webm", rand.Int())) - if err != nil { - return nil, err - } w.audioStreamIndex = 0 w.videoStreamIndex = 1 + trackEntries := []webm.TrackEntry{ { Name: trackNameAudio, @@ -101,31 +96,30 @@ func (w *EBMLMuxer) makeWebmWriters() ([]mkvcore.BlockWriteCloser, error) { }, }, } - writers, err := webm.NewSimpleBlockWriter(tempFile, trackEntries, + + var err error + w.tempFile, err = ioutil.TempFile("", "ebmlmuxer-*.webm") + if err != nil { + return nil, err + } + + writers, err := webm.NewSimpleBlockWriter(w.tempFile, trackEntries, mkvcore.WithSeekHead(true), - mkvcore.WithOnErrorHandler(func(err error) { - log.Error(context.Background(), err, "failed to construct webm writer (error)") - }), - mkvcore.WithOnFatalHandler(func(err error) { - log.Error(context.Background(), err, "failed to construct webm writer (fatal)") - }), mkvcore.WithSegmentInfo(&webm.Info{ TimecodeScale: defaultTimecode, // 1ms - MuxingApp: "mrw-v4.ebml-go.webm", - WritingApp: "mrw-v4.ebml-go.webm", - Duration: defaultDuration, // Arbitrarily set to default videoSplitIntervalMs, final value is adjusted in writeTrailer. + MuxingApp: "your_app_name", + WritingApp: "your_app_name", + Duration: defaultDuration, // Placeholder duration; final value is adjusted in overwritePTS. }), mkvcore.WithMarshalOptions(ebml.WithElementWriteHooks(func(e *ebml.Element) { - switch e.Name { - case "Duration": - w.durationPos = e.Position + 4 // Duration header size = 3, SegmentInfo header size delta = 1 + if e.Name == "Duration" { + w.durationPos = int64(e.Position + 4) // Adjust position to overwrite duration later. } })), ) if err != nil { return nil, err } - w.tempFileName = tempFile.Name() var mkvWriters []mkvcore.BlockWriteCloser for _, writer := range writers { mkvWriters = append(mkvWriters, writer) @@ -138,67 +132,62 @@ func (w *EBMLMuxer) makeMKVWriters() ([]mkvcore.BlockWriteCloser, error) { trackTypeVideo = 1 trackTypeAudio = 2 ) - tempFile, err := record.CreateFileInDir(fmt.Sprintf("videos/%d.mkv", rand.Int())) + w.audioStreamIndex = 0 + w.videoStreamIndex = 1 + + mkvTrackDesc := []mkvcore.TrackDescription{ + { + TrackNumber: 1, + TrackEntry: webm.TrackEntry{ + Name: trackNameAudio, + TrackNumber: 1, + TrackUID: 1, + CodecID: codecIDOPUS, + TrackType: trackTypeAudio, + Audio: &webm.Audio{ + SamplingFrequency: w.audioSampleRate, + Channels: w.audioChannels, + }, + }, + }, + { + TrackNumber: webmVideoTrackNumber, + TrackEntry: webm.TrackEntry{ + TrackNumber: webmVideoTrackNumber, + TrackUID: webmVideoTrackNumber, + TrackType: trackTypeVideo, + Name: trackNameVideo, + CodecID: codecIDH264, + DefaultDuration: 0, + }, + }, + } + + var err error + w.tempFile, err = ioutil.TempFile("/tmp", "ebmlmuxer-*.mkv") if err != nil { return nil, err } - var mkvTrackDesc []mkvcore.TrackDescription - w.audioStreamIndex = 0 - w.videoStreamIndex = 1 - mkvTrackDesc = append(mkvTrackDesc, mkvcore.TrackDescription{ - TrackNumber: 1, - TrackEntry: webm.TrackEntry{ - Name: trackNameAudio, - TrackNumber: 1, - TrackUID: 1, - CodecID: codecIDOPUS, - TrackType: trackTypeAudio, - Audio: &webm.Audio{ - SamplingFrequency: w.audioSampleRate, - Channels: 2, - }, - }, - }) - mkvTrackDesc = append(mkvTrackDesc, mkvcore.TrackDescription{ - TrackNumber: webmVideoTrackNumber, - TrackEntry: webm.TrackEntry{ - TrackNumber: webmVideoTrackNumber, - TrackUID: webmVideoTrackNumber, - TrackType: trackTypeVideo, - DefaultDuration: 0, - Name: trackNameVideo, - CodecID: codecIDH264, - SeekPreRoll: 0, - // TODO: The resolution may need to be written later, but it works fine without it for now. - //Video: &webm.Video{ - // PixelWidth: 1280, - // PixelHeight: 720, - //}, - }, - }) - var mkvWriters []mkvcore.BlockWriteCloser - mkvWriters, err = mkvcore.NewSimpleBlockWriter(tempFile, mkvTrackDesc, + + writers, err := mkvcore.NewSimpleBlockWriter(w.tempFile, mkvTrackDesc, mkvcore.WithSeekHead(true), mkvcore.WithEBMLHeader(mkv.DefaultEBMLHeader), mkvcore.WithSegmentInfo(&webm.Info{ - TimecodeScale: defaultTimecode, // 1ms - MuxingApp: "mrw-v4.ebml-go.mkv", - WritingApp: "mrw-v4.ebml-go.mkv", - Duration: defaultDuration, // Arbitrarily set to default videoSplitIntervalMs, final value is adjusted in writeTrailer. + TimecodeScale: defaultTimecode, + MuxingApp: "your_app_name", + WritingApp: "your_app_name", + Duration: defaultDuration, }), - mkvcore.WithBlockInterceptor(webm.DefaultBlockInterceptor), mkvcore.WithMarshalOptions(ebml.WithElementWriteHooks(func(e *ebml.Element) { - switch e.Name { - case "Duration": - w.durationPos = e.Position + 4 // Duration header size = 3, SegmentInfo header size delta = 1 + if e.Name == "Duration" { + w.durationPos = int64(e.Position + 4) } })), ) if err != nil { return nil, err } - w.tempFileName = tempFile.Name() - return mkvWriters, nil + return writers, nil } func (w *EBMLMuxer) Init(_ context.Context) error { @@ -238,53 +227,52 @@ func (w *EBMLMuxer) WriteAudio(data []byte, keyframe bool, pts uint64, _ uint64) return nil } -func (w *EBMLMuxer) Finalize(ctx context.Context) error { - defer func() { - w.cleanup() - }() - log.Info(ctx, "finalize webm muxer") - fileName := w.tempFileName +func (w *EBMLMuxer) Finalize(ctx context.Context, output io.Writer) error { + defer w.cleanup() + + if err := w.overwritePTS(); err != nil { + return fmt.Errorf("overwrite PTS error: %w", err) + } + + // Copy the data from the temporary file to the output writer + if _, err := w.tempFile.Seek(0, io.SeekStart); err != nil { + return fmt.Errorf("seek error: %w", err) + } + if _, err := io.Copy(output, w.tempFile); err != nil { + return fmt.Errorf("copy error: %w", err) + } + for _, writer := range w.writers { if err := writer.Close(); err != nil { - log.Error(ctx, err, "failed to close writer") + return fmt.Errorf("writer close error: %w", err) } } - if err := w.overwritePTS(ctx, fileName); err != nil { - return err - } return nil } -func (w *EBMLMuxer) ContainerName() string { - return string(w.container) -} - -func (w *EBMLMuxer) overwritePTS(ctx context.Context, fileName string) error { - tempFile, err := os.OpenFile(fileName, os.O_RDWR, 0o600) - if err != nil { +func (w *EBMLMuxer) overwritePTS() error { + ptsBytes := make([]byte, 8) + binary.BigEndian.PutUint64(ptsBytes, math.Float64bits(float64(w.duration))) + if _, err := w.tempFile.Seek(w.durationPos, io.SeekStart); err != nil { return err } - defer func() { - if err := tempFile.Close(); err != nil { - log.Error(ctx, err, "failed to close temp file") - } - }() - ptsBytes, _ := EncodeFloat64(float64(w.duration)) - if _, err := tempFile.WriteAt(ptsBytes, int64(w.durationPos)); err != nil { + if _, err := w.tempFile.Write(ptsBytes); err != nil { return err } return nil } func (w *EBMLMuxer) cleanup() { + if w.tempFile != nil { + w.tempFile.Close() + //os.Remove(w.tempFile.Name()) + w.tempFile = nil + } w.writers = nil - w.tempFileName = "" w.duration = 0 w.durationPos = 0 } -func EncodeFloat64(i float64) ([]byte, error) { - b := make([]byte, 8) - binary.BigEndian.PutUint64(b, math.Float64bits(i)) - return b, nil +func (w *EBMLMuxer) ContainerName() string { + return string(w.container) } diff --git a/media/streamer/ingress/h264util.go b/media/streamer/ingress/h264util.go new file mode 100644 index 0000000..c130c2f --- /dev/null +++ b/media/streamer/ingress/h264util.go @@ -0,0 +1,35 @@ +package ingress + +import ( + "liveflow/media/hub" + + "github.com/deepch/vdk/codec/h264parser" +) + +func SliceTypes(payload []byte) []hub.SliceType { + nalus, _ := h264parser.SplitNALUs(payload) + slices := make([]hub.SliceType, 0) + for _, nalu := range nalus { + if len(nalu) < 1 { + continue + } + nalUnitType := nalu[0] & 0x1f + switch nalUnitType { + case h264parser.NALU_SPS: + slices = append(slices, hub.SliceSPS) + case h264parser.NALU_PPS: + slices = append(slices, hub.SlicePPS) + default: + sliceType, _ := h264parser.ParseSliceHeaderFromNALU(nalu) + switch sliceType { + case h264parser.SLICE_I: + slices = append(slices, hub.SliceI) + case h264parser.SLICE_P: + slices = append(slices, hub.SliceP) + case h264parser.SLICE_B: + slices = append(slices, hub.SliceB) + } + } + } + return slices +} diff --git a/media/streamer/ingress/rtmp/handler.go b/media/streamer/ingress/rtmp/handler.go index d4f5e13..fb521c6 100644 --- a/media/streamer/ingress/rtmp/handler.go +++ b/media/streamer/ingress/rtmp/handler.go @@ -5,6 +5,7 @@ import ( "context" "fmt" "io" + "liveflow/media/streamer/ingress" "os" "path/filepath" @@ -335,6 +336,7 @@ func (h *Handler) publishVideoData(timestamp uint32, compositionTime int32, vide dts := int64(timestamp) pts := int64(compositionTime) + dts + sliceTypes := ingress.SliceTypes(videoDataToSend) h.hub.Publish(h.streamID, &hub.FrameData{ H264Video: &hub.H264Video{ VideoClockRate: 90000, @@ -344,6 +346,7 @@ func (h *Handler) publishVideoData(timestamp uint32, compositionTime int32, vide SPS: h.sps, PPS: h.pps, CodecData: nil, + SliceTypes: sliceTypes, }, }) } diff --git a/media/streamer/ingress/whip/handler.go b/media/streamer/ingress/whip/handler.go index e0fa1b2..882eb34 100644 --- a/media/streamer/ingress/whip/handler.go +++ b/media/streamer/ingress/whip/handler.go @@ -4,11 +4,11 @@ import ( "context" "fmt" "io" + "liveflow/media/streamer/ingress" "net/http" "strings" "time" - "github.com/deepch/vdk/codec/h264parser" "github.com/labstack/echo/v4" "github.com/pion/rtp" "github.com/pion/rtp/codecs" @@ -227,30 +227,7 @@ func (w *WebRTCHandler) onVideo(ctx context.Context, packets []*rtp.Packet) erro return nil } pts := w.videoTimestampGen.Generate(int64(packets[0].Timestamp)) - nalus, _ := h264parser.SplitNALUs(payload) - var slice hub.SliceType - for _, nalu := range nalus { - if len(nalu) < 1 { - continue - } - nalUnitType := nalu[0] & 0x1f - switch nalUnitType { - case h264parser.NALU_SPS: - slice = hub.SliceSPS - case h264parser.NALU_PPS: - slice = hub.SlicePPS - default: - sliceType, _ := h264parser.ParseSliceHeaderFromNALU(nalu) - switch sliceType { - case h264parser.SLICE_I: - slice = hub.SliceI - case h264parser.SLICE_P: - slice = hub.SliceP - case h264parser.SLICE_B: - slice = hub.SliceB - } - } - } + sliceTypes := ingress.SliceTypes(payload) w.hub.Publish(w.streamID, &hub.FrameData{ H264Video: &hub.H264Video{ PTS: pts, @@ -259,7 +236,7 @@ func (w *WebRTCHandler) onVideo(ctx context.Context, packets []*rtp.Packet) erro Data: payload, SPS: nil, PPS: nil, - SliceType: slice, + SliceTypes: sliceTypes, CodecData: nil, }, AACAudio: nil,