diff --git a/plugin/mp4/pkg/record.go b/plugin/mp4/pkg/record.go index e8ac297..fc08fb3 100644 --- a/plugin/mp4/pkg/record.go +++ b/plugin/mp4/pkg/record.go @@ -13,7 +13,6 @@ import ( "m7s.live/v5/pkg/config" "m7s.live/v5/pkg/task" "m7s.live/v5/plugin/mp4/pkg/box" - rtmp "m7s.live/v5/plugin/rtmp/pkg" ) type WriteTrailerQueueTask struct { @@ -136,7 +135,21 @@ var CustomFileName = func(job *m7s.RecordJob) string { } func (r *Recorder) createStream(start time.Time) (err error) { - return r.CreateStream(start, CustomFileName) + err = r.CreateStream(start, CustomFileName) + if err != nil { + return + } + r.file, err = os.Create(r.Event.FilePath) + if err != nil { + return + } + if r.RecordJob.RecConf.Type == "fmp4" { + r.Event.Type = "fmp4" + r.muxer = NewMuxerWithStreamPath(FLAG_FRAGMENT, r.Event.StreamPath) + } else { + r.muxer = NewMuxerWithStreamPath(0, r.Event.StreamPath) + } + return r.muxer.WriteInitSegment(r.file) } func (r *Recorder) Dispose() { @@ -149,27 +162,7 @@ func (r *Recorder) Run() (err error) { recordJob := &r.RecordJob sub := recordJob.Subscriber var audioTrack, videoTrack *Track - startTime := time.Now() - if recordJob.Event != nil { - startTime = startTime.Add(-time.Duration(recordJob.Event.BeforeDuration) * time.Millisecond) - } - err = r.createStream(startTime) - if err != nil { - return - } - r.file, err = os.Create(r.Event.FilePath) - if err != nil { - return - } - if recordJob.RecConf.Type == "fmp4" { - r.Event.Type = "fmp4" - r.muxer = NewMuxerWithStreamPath(FLAG_FRAGMENT, r.Event.StreamPath) - } else { - r.muxer = NewMuxerWithStreamPath(0, r.Event.StreamPath) - } - r.muxer.WriteInitSegment(r.file) var at, vt *pkg.AVTrack - checkEventRecordStop := func(absTime uint32) (err error) { if absTime >= recordJob.Event.AfterDuration+recordJob.Event.BeforeDuration { r.RecordJob.Stop(task.ErrStopByUser) @@ -177,19 +170,16 @@ func (r *Recorder) Run() (err error) { return } - checkFragment := func(absTime uint32) (err error) { - if duration := int64(absTime); time.Duration(duration)*time.Millisecond >= recordJob.RecConf.Fragment { - now := time.Now() - r.writeTailer(now) - err = r.createStream(now) + checkFragment := func(reader *pkg.AVRingReader) (err error) { + if duration := int64(reader.AbsTime); time.Duration(duration)*time.Millisecond >= recordJob.RecConf.Fragment { + r.writeTailer(reader.Value.WriteTime) + err = r.createStream(reader.Value.WriteTime) if err != nil { return } at, vt = nil, nil if vr := sub.VideoReader; vr != nil { vr.ResetAbsTime() - //seq := vt.SequenceFrame.(*rtmp.RTMPVideo) - //offset = int64(seq.Size + 15) } if ar := sub.AudioReader; ar != nil { ar.ResetAbsTime() @@ -198,7 +188,13 @@ func (r *Recorder) Run() (err error) { return } - return m7s.PlayBlock(sub, func(audio *pkg.RawAudio) error { + return m7s.PlayBlock(sub, func(audio *Audio) error { + if r.Event.StartTime.IsZero() { + err = r.createStream(sub.AudioReader.Value.WriteTime) + if err != nil { + return err + } + } r.Event.Duration = sub.AudioReader.AbsTime if sub.VideoReader == nil { if recordJob.Event != nil { @@ -208,7 +204,7 @@ func (r *Recorder) Run() (err error) { } } if recordJob.RecConf.Fragment != 0 { - err := checkFragment(sub.AudioReader.AbsTime) + err := checkFragment(sub.AudioReader) if err != nil { return err } @@ -238,12 +234,16 @@ func (r *Recorder) Run() (err error) { track.ChannelCount = uint8(ctx.Channels) } } - dts := sub.AudioReader.AbsTime - return r.muxer.WriteSample(r.file, audioTrack, box.Sample{ - Data: audio.ToBytes(), - Timestamp: uint32(dts), - }) - }, func(video *rtmp.RTMPVideo) error { + sample := audio.Sample + sample.Timestamp = uint32(sub.AudioReader.AbsTime) + return r.muxer.WriteSample(r.file, audioTrack, sample) + }, func(video *Video) error { + if r.Event.StartTime.IsZero() { + err = r.createStream(sub.VideoReader.Value.WriteTime) + if err != nil { + return err + } + } r.Event.Duration = sub.VideoReader.AbsTime if sub.VideoReader.Value.IDR { if recordJob.Event != nil { @@ -253,14 +253,14 @@ func (r *Recorder) Run() (err error) { } } if recordJob.RecConf.Fragment != 0 { - err := checkFragment(sub.VideoReader.AbsTime) + err := checkFragment(sub.VideoReader) if err != nil { return err } } } - offset := 5 - bytes := video.ToBytes() + ctx := vt.ICodecCtx.(pkg.IVideoCodecCtx) + width, height := uint32(ctx.Width()), uint32(ctx.Height()) if vt == nil { vt = sub.VideoReader.Track switch ctx := vt.ICodecCtx.GetBase().(type) { @@ -268,63 +268,36 @@ func (r *Recorder) Run() (err error) { track := r.muxer.AddTrack(box.MP4_CODEC_H264) videoTrack = track track.ExtraData = ctx.Record - track.Width = uint32(ctx.Width()) - track.Height = uint32(ctx.Height()) + track.Width = width + track.Height = height case *codec.H265Ctx: track := r.muxer.AddTrack(box.MP4_CODEC_H265) videoTrack = track track.ExtraData = ctx.Record - track.Width = uint32(ctx.Width()) - track.Height = uint32(ctx.Height()) + track.Width = width + track.Height = height } } - switch ctx := vt.ICodecCtx.(type) { - case *codec.H264Ctx: - if bytes[1] == 0 { - // Check if video resolution has changed - if uint32(ctx.Width()) != videoTrack.Width || uint32(ctx.Height()) != videoTrack.Height { - r.Info("Video resolution changed, restarting recording", - "old", fmt.Sprintf("%dx%d", videoTrack.Width, videoTrack.Height), - "new", fmt.Sprintf("%dx%d", ctx.Width(), ctx.Height())) - now := time.Now() - r.writeTailer(now) - err = r.createStream(now) - if err != nil { - return nil - } - at, vt = nil, nil - if vr := sub.VideoReader; vr != nil { - vr.ResetAbsTime() - //seq := vt.SequenceFrame.(*rtmp.RTMPVideo) - //offset = int64(seq.Size + 15) - } - if ar := sub.AudioReader; ar != nil { - ar.ResetAbsTime() - } - } + + if width != videoTrack.Width || height != videoTrack.Height { + r.Info("Video resolution changed, restarting recording", + "old", fmt.Sprintf("%dx%d", videoTrack.Width, videoTrack.Height), + "new", fmt.Sprintf("%dx%d", width, height)) + r.writeTailer(sub.VideoReader.Value.WriteTime) + err = r.createStream(sub.VideoReader.Value.WriteTime) + if err != nil { return nil } - case *rtmp.H265Ctx: - if ctx.Enhanced { - switch t := bytes[0] & 0b1111; t { - case rtmp.PacketTypeCodedFrames: - offset += 3 - case rtmp.PacketTypeSequenceStart: - return nil - case rtmp.PacketTypeCodedFramesX: - default: - r.Warn("unknown h265 packet type", "type", t) - return nil - } - } else if bytes[1] == 0 { - return nil + at, vt = nil, nil + if vr := sub.VideoReader; vr != nil { + vr.ResetAbsTime() + } + if ar := sub.AudioReader; ar != nil { + ar.ResetAbsTime() } } - return r.muxer.WriteSample(r.file, videoTrack, box.Sample{ - KeyFrame: sub.VideoReader.Value.IDR, - Data: bytes[offset:], - Timestamp: uint32(sub.VideoReader.AbsTime), - CTS: video.CTS, - }) + sample := video.Sample + sample.Timestamp = uint32(sub.VideoReader.AbsTime) + return r.muxer.WriteSample(r.file, videoTrack, sample) }) }