playback: fix error 404 when seeking before start of segment (#4276) (#4533) (#4539)

Fixes #4276
Replaces #4533
This commit is contained in:
Alessandro Ros
2025-05-25 18:50:19 +02:00
committed by GitHub
parent 6c8bf4f3d4
commit 9a3bbda8f8
5 changed files with 325 additions and 177 deletions

View File

@@ -6,6 +6,7 @@ import (
"github.com/bluenviron/mediacommon/v2/pkg/formats/fmp4"
"github.com/bluenviron/mediacommon/v2/pkg/formats/fmp4/seekablebuffer"
"github.com/bluenviron/mediamtx/internal/recordstore"
)
const (
@@ -75,7 +76,7 @@ func (w *muxerFMP4) writeSample(
// if sample is a IDR, remove previous GOP
if !isNonSyncSample {
w.curTrack.samples = nil
w.curTrack.samples = w.curTrack.samples[:0]
}
} else {
diff := dts - w.curTrack.lastDTS
@@ -103,10 +104,11 @@ func (w *muxerFMP4) writeSample(
} else {
if !isNonSyncSample { // sample is IDR
// reset GOP
w.curTrack.samples = []*fmp4.Sample{{
w.curTrack.samples = w.curTrack.samples[:0]
w.curTrack.samples = append(w.curTrack.samples, &fmp4.Sample{
IsNonSyncSample: isNonSyncSample,
Payload: pl,
}}
})
} else { // sample is not IDR
// append sample to current GOP, with PTSOffset = 0 and Duration = 0
w.curTrack.samples = append(w.curTrack.samples, &fmp4.Sample{
@@ -120,7 +122,7 @@ func (w *muxerFMP4) writeSample(
}
func (w *muxerFMP4) writeFinalDTS(dts int64) {
if w.curTrack.firstDTS >= 0 {
if len(w.curTrack.samples) != 0 && w.curTrack.firstDTS >= 0 {
diff := dts - w.curTrack.lastDTS
if diff < 0 {
diff = 0
@@ -156,26 +158,20 @@ func (w *muxerFMP4) innerFlush(final bool) error {
}
}
if part.Tracks != nil {
part.SequenceNumber = w.nextSequenceNumber
w.nextSequenceNumber++
// no samples to write
if part.Tracks == nil {
// if no samples has been written before, return an error
if w.init != nil {
err := w.init.Marshal(&w.outBuf)
if err != nil {
return err
}
_, err = w.w.Write(w.outBuf.Bytes())
if err != nil {
return err
}
w.init = nil
w.outBuf.Reset()
return recordstore.ErrNoSegmentsFound
}
return nil
}
err := part.Marshal(&w.outBuf)
part.SequenceNumber = w.nextSequenceNumber
w.nextSequenceNumber++
if w.init != nil {
err := w.init.Marshal(&w.outBuf)
if err != nil {
return err
}
@@ -185,9 +181,22 @@ func (w *muxerFMP4) innerFlush(final bool) error {
return err
}
w.init = nil
w.outBuf.Reset()
}
err := part.Marshal(&w.outBuf)
if err != nil {
return err
}
_, err = w.w.Write(w.outBuf.Bytes())
if err != nil {
return err
}
w.outBuf.Reset()
return nil
}

View File

@@ -5,6 +5,7 @@ import (
"github.com/bluenviron/mediacommon/v2/pkg/formats/fmp4"
"github.com/bluenviron/mediacommon/v2/pkg/formats/pmp4"
"github.com/bluenviron/mediamtx/internal/recordstore"
)
type muxerMP4Track struct {
@@ -55,17 +56,17 @@ func (w *muxerMP4) writeSample(
) error {
// remove GOPs before the GOP of the first sample
if (dts < 0 || (dts >= 0 && w.curTrack.lastDTS < 0)) && !isNonSyncSample {
w.curTrack.Samples = nil
w.curTrack.Samples = w.curTrack.Samples[:0]
}
if w.curTrack.Samples == nil {
if len(w.curTrack.Samples) == 0 {
w.curTrack.TimeOffset = int32(dts)
} else {
diff := dts - w.curTrack.lastDTS
if diff < 0 {
diff = 0
duration := dts - w.curTrack.lastDTS
if duration < 0 {
duration = 0
}
w.curTrack.Samples[len(w.curTrack.Samples)-1].Duration = uint32(diff)
w.curTrack.Samples[len(w.curTrack.Samples)-1].Duration = uint32(duration)
}
// prevent warning "edit list: 1 Missing key frame while searching for timestamp: 0"
@@ -85,14 +86,20 @@ func (w *muxerMP4) writeSample(
}
func (w *muxerMP4) writeFinalDTS(dts int64) {
diff := dts - w.curTrack.lastDTS
if diff < 0 {
diff = 0
if len(w.curTrack.Samples) != 0 {
duration := dts - w.curTrack.lastDTS
if duration < 0 {
duration = 0
}
w.curTrack.Samples[len(w.curTrack.Samples)-1].Duration = uint32(duration)
}
w.curTrack.Samples[len(w.curTrack.Samples)-1].Duration = uint32(diff)
}
func (w *muxerMP4) flush() error {
if len(w.curTrack.Samples) == 0 || w.curTrack.lastDTS < 0 {
return recordstore.ErrNoSegmentsFound
}
h := pmp4.Presentation{
Tracks: make([]*pmp4.Track, len(w.tracks)),
}

View File

@@ -64,9 +64,9 @@ func seekAndMux(
m.writeInit(firstInit)
segmentStartOffset := start.Sub(segments[0].Start)
segmentStartOffset := segments[0].Start.Sub(start) // this is negative
segmentDuration, err := segmentFMP4SeekAndMuxParts(f, segmentStartOffset, duration, firstInit, m)
segmentDuration, err := segmentFMP4MuxParts(f, segmentStartOffset, duration, firstInit, m)
if err != nil {
return err
}
@@ -90,7 +90,7 @@ func seekAndMux(
break
}
segmentStartOffset := seg.Start.Sub(start)
segmentStartOffset := seg.Start.Sub(start) // this is positive
var segmentDuration time.Duration
segmentDuration, err = segmentFMP4MuxParts(f, segmentStartOffset, duration, firstInit, m)

View File

@@ -52,7 +52,6 @@ func writeSegment1(t *testing.T, fpath string) {
var buf2 seekablebuffer.Buffer
parts := fmp4.Parts{
{
SequenceNumber: 2,
Tracks: []*fmp4.PartTrack{
{
ID: 1,
@@ -131,7 +130,6 @@ func writeSegment2(t *testing.T, fpath string) {
var buf2 seekablebuffer.Buffer
parts := fmp4.Parts{
{
SequenceNumber: 3,
Tracks: []*fmp4.PartTrack{{
ID: 1,
BaseTime: 0,
@@ -144,7 +142,6 @@ func writeSegment2(t *testing.T, fpath string) {
}},
},
{
SequenceNumber: 4,
Tracks: []*fmp4.PartTrack{{
ID: 2,
BaseTime: 0,
@@ -210,7 +207,6 @@ func writeSegment3(t *testing.T, fpath string) {
var buf2 seekablebuffer.Buffer
parts := fmp4.Parts{
{
SequenceNumber: 1,
Tracks: []*fmp4.PartTrack{{
ID: 1,
BaseTime: 0,
@@ -677,3 +673,273 @@ func TestOnGetNTPCompensation(t *testing.T) {
},
}, parts)
}
func TestOnGetInMiddleOfLastSample(t *testing.T) {
for _, format := range []string{"fmp4", "mp4"} {
t.Run(format, func(t *testing.T) {
dir, err := os.MkdirTemp("", "mediamtx-playback")
require.NoError(t, err)
defer os.RemoveAll(dir)
err = os.Mkdir(filepath.Join(dir, "mypath"), 0o755)
require.NoError(t, err)
init := fmp4.Init{
Tracks: []*fmp4.InitTrack{
{
ID: 1,
TimeScale: 90000,
Codec: &mp4.CodecH264{
SPS: test.FormatH264.SPS,
PPS: test.FormatH264.PPS,
},
},
},
}
func() {
fpath := filepath.Join(dir, "mypath", "2008-11-07_11-22-00-000000.mp4")
var buf1 seekablebuffer.Buffer
err = init.Marshal(&buf1)
require.NoError(t, err)
var buf2 seekablebuffer.Buffer
parts := fmp4.Parts{
{
Tracks: []*fmp4.PartTrack{
{
ID: 1,
Samples: []*fmp4.Sample{
{
Duration: 1 * 90000,
IsNonSyncSample: false,
Payload: []byte{1, 2},
},
},
},
},
},
}
err = parts.Marshal(&buf2)
require.NoError(t, err)
err = os.WriteFile(fpath, append(buf1.Bytes(), buf2.Bytes()...), 0o644)
require.NoError(t, err)
}()
s := &Server{
Address: "127.0.0.1:9996",
ReadTimeout: conf.Duration(10 * time.Second),
PathConfs: map[string]*conf.Path{
"mypath": {
Name: "mypath",
RecordPath: filepath.Join(dir, "%path/%Y-%m-%d_%H-%M-%S-%f"),
},
},
AuthManager: test.NilAuthManager,
Parent: test.NilLogger,
}
err = s.Initialize()
require.NoError(t, err)
defer s.Close()
u, err := url.Parse("http://myuser:mypass@localhost:9996/get")
require.NoError(t, err)
v := url.Values{}
v.Set("path", "mypath")
v.Set("start", time.Date(2008, 11, 7, 11, 22, 0, 500000000, time.Local).Format(time.RFC3339Nano))
v.Set("duration", "3")
v.Set("format", format)
u.RawQuery = v.Encode()
req, err := http.NewRequest(http.MethodGet, u.String(), nil)
require.NoError(t, err)
res, err := http.DefaultClient.Do(req)
require.NoError(t, err)
defer res.Body.Close()
require.Equal(t, http.StatusNotFound, res.StatusCode)
})
}
}
func TestOnGetBetweenSegments(t *testing.T) {
for _, ca := range []string{
"idr before",
"idr after",
} {
t.Run(ca, func(t *testing.T) {
dir, err := os.MkdirTemp("", "mediamtx-playback")
require.NoError(t, err)
defer os.RemoveAll(dir)
err = os.Mkdir(filepath.Join(dir, "mypath"), 0o755)
require.NoError(t, err)
init := fmp4.Init{
Tracks: []*fmp4.InitTrack{
{
ID: 1,
TimeScale: 90000,
Codec: &mp4.CodecH264{
SPS: test.FormatH264.SPS,
PPS: test.FormatH264.PPS,
},
},
},
}
func() {
fpath := filepath.Join(dir, "mypath", "2008-11-07_11-22-00-000000.mp4")
var buf1 seekablebuffer.Buffer
err = init.Marshal(&buf1)
require.NoError(t, err)
var buf2 seekablebuffer.Buffer
parts := fmp4.Parts{
{
Tracks: []*fmp4.PartTrack{
{
ID: 1,
Samples: []*fmp4.Sample{
{
Duration: 1 * 90000,
IsNonSyncSample: false,
Payload: []byte{1, 2},
},
},
},
},
},
}
err = parts.Marshal(&buf2)
require.NoError(t, err)
err = os.WriteFile(fpath, append(buf1.Bytes(), buf2.Bytes()...), 0o644)
require.NoError(t, err)
}()
func() {
fpath := filepath.Join(dir, "mypath", "2008-11-07_11-22-01-000000.mp4")
var buf1 seekablebuffer.Buffer
err = init.Marshal(&buf1)
require.NoError(t, err)
var buf2 seekablebuffer.Buffer
parts := fmp4.Parts{
{
Tracks: []*fmp4.PartTrack{
{
ID: 1,
Samples: []*fmp4.Sample{
{
Duration: 1 * 90000,
IsNonSyncSample: (ca == "idr before"),
Payload: []byte{3, 4},
},
},
},
},
},
}
err = parts.Marshal(&buf2)
require.NoError(t, err)
err = os.WriteFile(fpath, append(buf1.Bytes(), buf2.Bytes()...), 0o644)
require.NoError(t, err)
}()
s := &Server{
Address: "127.0.0.1:9996",
ReadTimeout: conf.Duration(10 * time.Second),
PathConfs: map[string]*conf.Path{
"mypath": {
Name: "mypath",
RecordPath: filepath.Join(dir, "%path/%Y-%m-%d_%H-%M-%S-%f"),
},
},
AuthManager: test.NilAuthManager,
Parent: test.NilLogger,
}
err = s.Initialize()
require.NoError(t, err)
defer s.Close()
u, err := url.Parse("http://myuser:mypass@localhost:9996/get")
require.NoError(t, err)
v := url.Values{}
v.Set("path", "mypath")
v.Set("start", time.Date(2008, 11, 7, 11, 22, 0, 500000000, time.Local).Format(time.RFC3339Nano))
v.Set("duration", "3")
v.Set("format", "fmp4")
u.RawQuery = v.Encode()
req, err := http.NewRequest(http.MethodGet, u.String(), nil)
require.NoError(t, err)
res, err := http.DefaultClient.Do(req)
require.NoError(t, err)
defer res.Body.Close()
require.Equal(t, http.StatusOK, res.StatusCode)
buf, err := io.ReadAll(res.Body)
require.NoError(t, err)
var parts fmp4.Parts
err = parts.Unmarshal(buf)
require.NoError(t, err)
switch ca {
case "idr before":
require.Equal(t, fmp4.Parts{
{
SequenceNumber: 0,
Tracks: []*fmp4.PartTrack{
{
ID: 1,
BaseTime: 45000,
Samples: []*fmp4.Sample{
{
Duration: 0,
Payload: []byte{1, 2},
},
{
Duration: 90000,
Payload: []byte{3, 4},
IsNonSyncSample: true,
},
},
},
},
},
}, parts)
case "idr after":
require.Equal(t, fmp4.Parts{
{
SequenceNumber: 0,
Tracks: []*fmp4.PartTrack{
{
ID: 1,
BaseTime: 45000,
Samples: []*fmp4.Sample{
{
Duration: 90000,
Payload: []byte{3, 4},
},
},
},
},
},
}, parts)
}
})
}
}

View File

@@ -10,7 +10,6 @@ import (
"github.com/abema/go-mp4"
"github.com/bluenviron/mediacommon/v2/pkg/formats/fmp4"
"github.com/bluenviron/mediamtx/internal/recordstore"
)
const (
@@ -368,143 +367,14 @@ outer:
return maxElapsed, nil
}
func segmentFMP4SeekAndMuxParts(
r readSeekerAt,
segmentStartOffset time.Duration,
duration time.Duration,
init *fmp4.Init,
m muxer,
) (time.Duration, error) {
var segmentStartOffsetMP4 int64
var durationMP4 int64
moofOffset := uint64(0)
var tfhd *mp4.Tfhd
var tfdt *mp4.Tfdt
atLeastOnePartWritten := false
var timeScale uint32
var maxMuxerDTS time.Duration
breakAtNextMdat := false
_, err := mp4.ReadBoxStructure(r, func(h *mp4.ReadHandle) (interface{}, error) {
switch h.BoxInfo.Type.String() {
case "moof":
moofOffset = h.BoxInfo.Offset
return h.Expand()
case "traf":
return h.Expand()
case "tfhd":
box, _, err := h.ReadPayload()
if err != nil {
return nil, err
}
tfhd = box.(*mp4.Tfhd)
case "tfdt":
box, _, err := h.ReadPayload()
if err != nil {
return nil, err
}
tfdt = box.(*mp4.Tfdt)
track := findInitTrack(init.Tracks, int(tfhd.TrackID))
if track == nil {
return nil, fmt.Errorf("invalid track ID: %v", tfhd.TrackID)
}
m.setTrack(int(tfhd.TrackID))
timeScale = track.TimeScale
segmentStartOffsetMP4 = durationGoToMp4(segmentStartOffset, track.TimeScale)
durationMP4 = durationGoToMp4(duration, track.TimeScale)
case "trun":
box, _, err := h.ReadPayload()
if err != nil {
return nil, err
}
trun := box.(*mp4.Trun)
dataOffset := moofOffset + uint64(trun.DataOffset)
muxerDTS := int64(tfdt.BaseMediaDecodeTimeV1) - segmentStartOffsetMP4
atLeastOneSampleWritten := false
for _, e := range trun.Entries {
if muxerDTS >= durationMP4 {
breakAtNextMdat = true
break
}
if muxerDTS >= 0 {
atLeastOnePartWritten = true
}
sampleOffset := dataOffset
sampleSize := e.SampleSize
err = m.writeSample(
muxerDTS,
e.SampleCompositionTimeOffsetV1,
(e.SampleFlags&sampleFlagIsNonSyncSample) != 0,
e.SampleSize,
func() ([]byte, error) {
payload := make([]byte, sampleSize)
n, err2 := r.ReadAt(payload, int64(sampleOffset))
if err2 != nil {
return nil, err2
}
if n != int(sampleSize) {
return nil, fmt.Errorf("partial read")
}
return payload, nil
},
)
if err != nil {
return nil, err
}
atLeastOneSampleWritten = true
dataOffset += uint64(e.SampleSize)
muxerDTS += int64(e.SampleDuration)
}
if atLeastOneSampleWritten {
m.writeFinalDTS(muxerDTS)
}
muxerDTSGo := durationMp4ToGo(muxerDTS, timeScale)
if muxerDTSGo > maxMuxerDTS {
maxMuxerDTS = muxerDTSGo
}
case "mdat":
if breakAtNextMdat {
return nil, errTerminated
}
}
return nil, nil
})
if err != nil && !errors.Is(err, errTerminated) {
return 0, err
}
if !atLeastOnePartWritten {
return 0, recordstore.ErrNoSegmentsFound
}
return maxMuxerDTS, nil
}
func segmentFMP4MuxParts(
r readSeekerAt,
segmentStartOffset time.Duration,
dtsOffset time.Duration,
duration time.Duration,
init *fmp4.Init,
m muxer,
) (time.Duration, error) {
var segmentStartOffsetMP4 int64
var dtsOffsetMP4 int64
var durationMP4 int64
moofOffset := uint64(0)
var tfhd *mp4.Tfhd
@@ -543,7 +413,7 @@ func segmentFMP4MuxParts(
m.setTrack(int(tfhd.TrackID))
timeScale = track.TimeScale
segmentStartOffsetMP4 = durationGoToMp4(segmentStartOffset, track.TimeScale)
dtsOffsetMP4 = durationGoToMp4(dtsOffset, track.TimeScale)
durationMP4 = durationGoToMp4(duration, track.TimeScale)
case "trun":
@@ -554,8 +424,7 @@ func segmentFMP4MuxParts(
trun := box.(*mp4.Trun)
dataOffset := moofOffset + uint64(trun.DataOffset)
muxerDTS := int64(tfdt.BaseMediaDecodeTimeV1) + segmentStartOffsetMP4
atLeastOneSampleWritten := false
muxerDTS := int64(tfdt.BaseMediaDecodeTimeV1) + dtsOffsetMP4
for _, e := range trun.Entries {
if muxerDTS >= durationMP4 {
@@ -588,14 +457,11 @@ func segmentFMP4MuxParts(
return nil, err
}
atLeastOneSampleWritten = true
dataOffset += uint64(e.SampleSize)
muxerDTS += int64(e.SampleDuration)
}
if atLeastOneSampleWritten {
m.writeFinalDTS(muxerDTS)
}
m.writeFinalDTS(muxerDTS)
muxerDTSGo := durationMp4ToGo(muxerDTS, timeScale)