playback: ignore NTP fluctiations when merging segments

This commit is contained in:
aler9
2025-05-26 22:26:22 +02:00
parent 99f4c73e07
commit dd12680ad6
4 changed files with 248 additions and 144 deletions

View File

@@ -48,30 +48,30 @@ func seekAndMux(
m muxer,
) error {
if recordFormat == conf.RecordFormatFMP4 {
var firstInit *fmp4.Init
var segmentEnd time.Time
f, err := os.Open(segments[0].Fpath)
if err != nil {
return err
}
defer f.Close()
firstInit, _, err = segmentFMP4ReadHeader(f)
firstInit, _, err := segmentFMP4ReadHeader(f)
if err != nil {
return err
}
m.writeInit(firstInit)
segmentStartOffset := segments[0].Start.Sub(start) // this is negative
dts := segments[0].Start.Sub(start) // this is negative
segmentDuration, err := segmentFMP4MuxParts(f, segmentStartOffset, duration, firstInit, m)
var segDuration time.Duration
segDuration, dts, err = segmentFMP4MuxParts(f, dts, duration, firstInit, m)
if err != nil {
return err
}
segmentEnd = start.Add(segmentDuration)
// use segDuration to compute ntp and call segmentFMP4CanBeConcatenated()
// in order to get the same behavior of the /list endpoint
ntp := start.Add(segDuration - start.Sub(segments[0].Start))
for _, seg := range segments[1:] {
f, err = os.Open(seg.Fpath)
@@ -86,19 +86,18 @@ func seekAndMux(
return err
}
if !segmentFMP4CanBeConcatenated(firstInit, segmentEnd, init, seg.Start) {
if !segmentFMP4CanBeConcatenated(firstInit, ntp, init, seg.Start) {
break
}
segmentStartOffset := seg.Start.Sub(start) // this is positive
var segmentDuration time.Duration
segmentDuration, err = segmentFMP4MuxParts(f, segmentStartOffset, duration, firstInit, m)
segDuration, dts, err = segmentFMP4MuxParts(f, dts, duration, firstInit, m)
if err != nil {
return err
}
segmentEnd = start.Add(segmentDuration)
// use segDuration to compute ntp and call segmentFMP4CanBeConcatenated()
// in order to get the same behavior of the /list endpoint
ntp = seg.Start.Add(segDuration)
}
err = m.flush()

View File

@@ -132,12 +132,12 @@ func writeSegment2(t *testing.T, fpath string) {
{
Tracks: []*fmp4.PartTrack{{
ID: 1,
BaseTime: 0,
BaseTime: 1 * 90000,
Samples: []*fmp4.Sample{
{
Duration: 1 * 90000,
Payload: []byte{7, 8},
}, // 1 sec
}, // 2 sec
},
}},
},
@@ -153,15 +153,18 @@ func writeSegment2(t *testing.T, fpath string) {
{
Duration: 1 * 48000,
Payload: []byte{7, 8},
}, // 2 secs
},
{
Duration: 1 * 48000,
Payload: []byte{9, 10},
}, // 3 secs
},
}},
},
{
SequenceNumber: 5,
Tracks: []*fmp4.PartTrack{{
ID: 1,
BaseTime: 1 * 90000,
BaseTime: 2 * 90000,
Samples: []*fmp4.Sample{
{
Duration: 1 * 90000,
@@ -170,10 +173,6 @@ func writeSegment2(t *testing.T, fpath string) {
{
Duration: 1 * 90000,
Payload: []byte{11, 12},
},
{
Duration: 1 * 90000,
Payload: []byte{13, 14},
}, // 4 secs
},
}},
@@ -239,9 +238,9 @@ func TestOnGet(t *testing.T) {
err = os.Mkdir(filepath.Join(dir, "mypath"), 0o755)
require.NoError(t, err)
writeSegment1(t, filepath.Join(dir, "mypath", "2008-11-07_11-22-00-500000.mp4"))
writeSegment2(t, filepath.Join(dir, "mypath", "2008-11-07_11-23-02-500000.mp4"))
writeSegment2(t, filepath.Join(dir, "mypath", "2008-11-07_11-23-04-500000.mp4"))
writeSegment1(t, filepath.Join(dir, "mypath", "2008-11-07_11-22-00-500000.mp4")) // start=0, len=62s/61s
writeSegment2(t, filepath.Join(dir, "mypath", "2008-11-07_11-23-01-500000.mp4")) // start=61s, len=4s/3s
writeSegment2(t, filepath.Join(dir, "mypath", "2008-11-07_11-23-04-500000.mp4")) // start=64s, len=4s/3s
s := &Server{
Address: "127.0.0.1:9996",
@@ -264,7 +263,7 @@ func TestOnGet(t *testing.T) {
v := url.Values{}
v.Set("path", "mypath")
v.Set("start", time.Date(2008, 11, 0o7, 11, 23, 1, 500000000, time.Local).Format(time.RFC3339Nano))
v.Set("start", time.Date(2008, 11, 7, 11, 23, 0, 500000000, time.Local).Format(time.RFC3339Nano))
v.Set("duration", "3")
v.Set("format", format)
u.RawQuery = v.Encode()
@@ -292,18 +291,14 @@ func TestOnGet(t *testing.T) {
SequenceNumber: 0,
Tracks: []*fmp4.PartTrack{
{
ID: 1,
ID: 1,
BaseTime: 0,
Samples: []*fmp4.Sample{
{
Duration: 0,
PTSOffset: 90000,
Payload: []byte{3, 4},
},
{
Duration: 90000,
PTSOffset: -90000,
IsNonSyncSample: true,
Payload: []byte{5, 6},
PTSOffset: 90000,
IsNonSyncSample: false,
Payload: []uint8{3, 4},
},
},
},
@@ -313,12 +308,14 @@ func TestOnGet(t *testing.T) {
SequenceNumber: 1,
Tracks: []*fmp4.PartTrack{
{
ID: 2,
BaseTime: 48000,
ID: 1,
BaseTime: 90000,
Samples: []*fmp4.Sample{
{
Duration: 48000,
Payload: []byte{5, 6},
Duration: 90000,
PTSOffset: -90000,
IsNonSyncSample: true,
Payload: []uint8{5, 6},
},
},
},
@@ -328,12 +325,14 @@ func TestOnGet(t *testing.T) {
SequenceNumber: 2,
Tracks: []*fmp4.PartTrack{
{
ID: 1,
BaseTime: 90000,
ID: 2,
BaseTime: 0,
Samples: []*fmp4.Sample{
{
Duration: 90000,
Payload: []byte{7, 8},
Duration: 48000,
PTSOffset: 0,
IsNonSyncSample: false,
Payload: []uint8{3, 4},
},
},
},
@@ -343,22 +342,43 @@ func TestOnGet(t *testing.T) {
SequenceNumber: 3,
Tracks: []*fmp4.PartTrack{
{
ID: 1,
BaseTime: 2 * 90000,
ID: 2,
BaseTime: 48000,
Samples: []*fmp4.Sample{
{
Duration: 90000,
Payload: []byte{9, 10},
Duration: 48000,
PTSOffset: 0,
IsNonSyncSample: false,
Payload: []uint8{5, 6},
},
},
},
},
},
{
SequenceNumber: 4,
Tracks: []*fmp4.PartTrack{
{
ID: 1,
BaseTime: 180000,
Samples: []*fmp4.Sample{
{
Duration: 90000,
PTSOffset: 0,
IsNonSyncSample: false,
Payload: []uint8{7, 8},
},
},
},
{
ID: 2,
BaseTime: 2 * 48000,
BaseTime: 96000,
Samples: []*fmp4.Sample{
{
Duration: 48000,
Payload: []byte{7, 8},
Duration: 48000,
PTSOffset: 0,
IsNonSyncSample: false,
Payload: []uint8{7, 8},
},
},
},
@@ -388,15 +408,17 @@ func TestOnGet(t *testing.T) {
{
ID: 1,
TimeScale: 90000,
TimeOffset: -90000,
TimeOffset: 0,
Codec: &mp4.CodecH264{
SPS: test.FormatH264.SPS,
PPS: test.FormatH264.PPS,
},
Samples: []*pmp4.Sample{
{
Duration: 90000,
PayloadSize: 2,
Duration: 90000,
PTSOffset: 0,
IsNonSyncSample: false,
PayloadSize: 2,
},
{
Duration: 90000,
@@ -405,19 +427,17 @@ func TestOnGet(t *testing.T) {
PayloadSize: 2,
},
{
Duration: 90000,
PayloadSize: 2,
},
{
Duration: 90000,
PayloadSize: 2,
Duration: 90000,
PTSOffset: 0,
IsNonSyncSample: false,
PayloadSize: 2,
},
},
},
{
ID: 2,
TimeScale: 48000,
TimeOffset: 48000,
TimeOffset: 0,
Codec: &mp4.CodecMPEG4Audio{
Config: mpeg4audio.Config{
Type: mpeg4audio.ObjectTypeAACLC,
@@ -434,6 +454,10 @@ func TestOnGet(t *testing.T) {
Duration: 48000,
PayloadSize: 2,
},
{
Duration: 48000,
PayloadSize: 2,
},
},
},
},
@@ -444,9 +468,9 @@ func TestOnGet(t *testing.T) {
{3, 4},
{5, 6},
{7, 8},
{9, 10},
},
2: {
{3, 4},
{5, 6},
{7, 8},
},
@@ -534,6 +558,7 @@ func TestOnGetDifferentInit(t *testing.T) {
}, parts)
}
// this tests that even if NTP changes, duration between samples does not.
func TestOnGetNTPCompensation(t *testing.T) {
dir, err := os.MkdirTemp("", "mediamtx-playback")
require.NoError(t, err)
@@ -542,8 +567,107 @@ func TestOnGetNTPCompensation(t *testing.T) {
err = os.Mkdir(filepath.Join(dir, "mypath"), 0o755)
require.NoError(t, err)
writeSegment1(t, filepath.Join(dir, "mypath", "2008-11-07_11-22-00-500000.mp4"))
writeSegment2(t, filepath.Join(dir, "mypath", "2008-11-07_11-23-02-000000.mp4")) // remove 0.5 secs
init := fmp4.Init{
Tracks: []*fmp4.InitTrack{
{
ID: 1,
TimeScale: 90000,
Codec: &mp4.CodecH264{
SPS: test.FormatH264.SPS,
PPS: test.FormatH264.PPS,
},
},
{
ID: 2,
TimeScale: 48000,
Codec: &mp4.CodecMPEG4Audio{
Config: mpeg4audio.Config{
Type: mpeg4audio.ObjectTypeAACLC,
SampleRate: 48000,
ChannelCount: 2,
},
},
},
},
}
func() {
var buf1 seekablebuffer.Buffer
err = init.Marshal(&buf1)
require.NoError(t, err)
var buf2 seekablebuffer.Buffer
parts := fmp4.Parts{
{
Tracks: []*fmp4.PartTrack{
{
ID: 1,
Samples: []*fmp4.Sample{
{
Duration: 1 * 90000,
Payload: []byte{1, 2},
}, // 1sec
},
},
{
ID: 2,
Samples: []*fmp4.Sample{
{
Duration: 0.5 * 48000,
Payload: []byte{3, 4},
}, // 0.5s secs
},
},
},
},
}
err = parts.Marshal(&buf2)
require.NoError(t, err)
err = os.WriteFile(filepath.Join(dir, "mypath", "2008-11-07_11-22-00-500000.mp4"),
append(buf1.Bytes(), buf2.Bytes()...), 0o644)
require.NoError(t, err)
}()
func() {
var buf1 seekablebuffer.Buffer
err = init.Marshal(&buf1)
require.NoError(t, err)
var buf2 seekablebuffer.Buffer
parts := fmp4.Parts{
{
Tracks: []*fmp4.PartTrack{{
ID: 1,
BaseTime: 0.5 * 90000,
Samples: []*fmp4.Sample{
{
Duration: 1 * 90000,
Payload: []byte{5, 6},
}, // 1.5sec
},
}},
},
{
Tracks: []*fmp4.PartTrack{{
ID: 2,
BaseTime: 0,
Samples: []*fmp4.Sample{
{
Duration: 0.5 * 48000,
Payload: []byte{7, 8},
}, // 0.5sec
},
}},
},
}
err = parts.Marshal(&buf2)
require.NoError(t, err)
err = os.WriteFile(filepath.Join(dir, "mypath", "2008-11-07_11-22-01-000000.mp4"), // add 0.5 secs
append(buf1.Bytes(), buf2.Bytes()...), 0o644)
require.NoError(t, err)
}()
s := &Server{
Address: "127.0.0.1:9996",
@@ -566,7 +690,7 @@ func TestOnGetNTPCompensation(t *testing.T) {
v := url.Values{}
v.Set("path", "mypath")
v.Set("start", time.Date(2008, 11, 0o7, 11, 23, 1, 500000000, time.Local).Format(time.RFC3339Nano))
v.Set("start", time.Date(2008, 11, 7, 11, 22, 0, 500000000, time.Local).Format(time.RFC3339Nano))
v.Set("duration", "3")
v.Set("format", "fmp4")
u.RawQuery = v.Encode()
@@ -587,7 +711,7 @@ func TestOnGetNTPCompensation(t *testing.T) {
err = parts.Unmarshal(buf)
require.NoError(t, err)
require.Equal(t, fmp4.Parts{
require.Equal(t, fmp4.Parts{ //nolint:dupl
{
SequenceNumber: 0,
Tracks: []*fmp4.PartTrack{
@@ -595,25 +719,8 @@ func TestOnGetNTPCompensation(t *testing.T) {
ID: 1,
Samples: []*fmp4.Sample{
{
Duration: 0,
PTSOffset: 90000,
Payload: []byte{3, 4},
},
{
Duration: 90000 - 45000,
PTSOffset: -90000,
IsNonSyncSample: true,
Payload: []byte{5, 6},
},
},
},
{
ID: 2,
BaseTime: 24000,
Samples: []*fmp4.Sample{
{
Duration: 48000,
Payload: []byte{5, 6},
Duration: 90000,
Payload: []byte{1, 2},
},
},
},
@@ -624,50 +731,24 @@ func TestOnGetNTPCompensation(t *testing.T) {
Tracks: []*fmp4.PartTrack{
{
ID: 1,
BaseTime: 90000 - 45000,
BaseTime: 90000,
Samples: []*fmp4.Sample{
{
Duration: 90000,
Payload: []byte{7, 8},
},
},
},
},
},
{
SequenceNumber: 2,
Tracks: []*fmp4.PartTrack{
{
ID: 1,
BaseTime: 2*90000 - 45000,
Samples: []*fmp4.Sample{
{
Duration: 90000,
Payload: []byte{9, 10},
},
},
},
},
},
{
SequenceNumber: 3,
Tracks: []*fmp4.PartTrack{
{
ID: 1,
BaseTime: 225000,
Samples: []*fmp4.Sample{
{
Duration: 90000,
Payload: []byte{11, 12},
Payload: []byte{5, 6},
},
},
},
{
ID: 2,
BaseTime: 72000,
BaseTime: 0,
Samples: []*fmp4.Sample{
{
Duration: 48000,
Duration: 0.5 * 48000,
Payload: []byte{3, 4},
},
{
Duration: 0.5 * 48000,
Payload: []byte{7, 8},
},
},

View File

@@ -99,9 +99,8 @@ func concatenateSegments(parsed []*parsedSegment) []listEntry {
out[len(out)-1].Start.Add(time.Duration(out[len(out)-1].Duration)),
parsed.init,
parsed.start) {
prevStart := out[len(out)-1].Start
curEnd := parsed.start.Add(parsed.duration)
out[len(out)-1].Duration = listEntryDuration(curEnd.Sub(prevStart))
out[len(out)-1].Duration = listEntryDuration(curEnd.Sub(out[len(out)-1].Start))
} else {
out = append(out, listEntry{
Start: parsed.start,

View File

@@ -15,6 +15,7 @@ import (
const (
sampleFlagIsNonSyncSample = 1 << 16
concatenationTolerance = 1 * time.Second
maxBasetime = 1 * time.Second
)
var errTerminated = errors.New("terminated")
@@ -25,6 +26,25 @@ type readSeekerAt interface {
io.ReaderAt
}
// try to guess where the next segment will start, using the same algorithm of nextSegmentStartingPos
func guessNextSegmentStartingPos(dtsPerTrack map[int]time.Duration) time.Duration {
var maxElapsed time.Duration
for _, dts := range dtsPerTrack {
if dts > maxElapsed {
maxElapsed = dts
}
}
minElapsed := maxElapsed
for _, dts := range dtsPerTrack {
if (maxElapsed-dts) <= maxBasetime && (dts <= minElapsed) {
minElapsed = dts
}
}
return minElapsed
}
func durationGoToMp4(v time.Duration, timeScale uint32) int64 {
timeScale64 := int64(timeScale)
secs := v / time.Second
@@ -249,7 +269,7 @@ func segmentFMP4ReadDurationFromParts(
return 0, err
}
var maxElapsed time.Duration
var maxDuration time.Duration
// foreach traf
@@ -351,37 +371,39 @@ outer:
return 0, fmt.Errorf("invalid trun box: %w", err)
}
elapsed := int64(tfdt.BaseMediaDecodeTimeV1)
duration := int64(tfdt.BaseMediaDecodeTimeV1)
for _, entry := range trun.Entries {
elapsed += int64(entry.SampleDuration)
duration += int64(entry.SampleDuration)
}
elapsedGo := durationMp4ToGo(elapsed, track.TimeScale)
durationGo := durationMp4ToGo(duration, track.TimeScale)
if elapsedGo > maxElapsed {
maxElapsed = elapsedGo
if durationGo > maxDuration {
maxDuration = durationGo
}
}
return maxElapsed, nil
return maxDuration, nil
}
func segmentFMP4MuxParts(
r readSeekerAt,
dtsOffset time.Duration,
start time.Duration,
duration time.Duration,
init *fmp4.Init,
m muxer,
) (time.Duration, error) {
var dtsOffsetMP4 int64
) (time.Duration, time.Duration, error) {
var startMP4 int64
var durationMP4 int64
moofOffset := uint64(0)
var tfhd *mp4.Tfhd
var tfdt *mp4.Tfdt
var timeScale uint32
var maxMuxerDTS time.Duration
breakAtNextMdat := false
var curTrackID int
dtsPerTrack := make(map[int]time.Duration)
var segDuration time.Duration
_, err := mp4.ReadBoxStructure(r, func(h *mp4.ReadHandle) (interface{}, error) {
switch h.BoxInfo.Type.String() {
@@ -411,9 +433,10 @@ func segmentFMP4MuxParts(
return nil, fmt.Errorf("invalid track ID: %v", tfhd.TrackID)
}
m.setTrack(int(tfhd.TrackID))
curTrackID = int(tfhd.TrackID)
m.setTrack(curTrackID)
timeScale = track.TimeScale
dtsOffsetMP4 = durationGoToMp4(dtsOffset, track.TimeScale)
startMP4 = durationGoToMp4(start, track.TimeScale)
durationMP4 = durationGoToMp4(duration, track.TimeScale)
case "trun":
@@ -424,10 +447,11 @@ func segmentFMP4MuxParts(
trun := box.(*mp4.Trun)
dataOffset := moofOffset + uint64(trun.DataOffset)
muxerDTS := int64(tfdt.BaseMediaDecodeTimeV1) + dtsOffsetMP4
dts := int64(tfdt.BaseMediaDecodeTimeV1) + startMP4
fmt.Println("FIDTS", dts)
for _, e := range trun.Entries {
if muxerDTS >= durationMP4 {
if dts >= durationMP4 {
breakAtNextMdat = true
break
}
@@ -436,7 +460,7 @@ func segmentFMP4MuxParts(
sampleSize := e.SampleSize
err = m.writeSample(
muxerDTS,
dts,
e.SampleCompositionTimeOffsetV1,
(e.SampleFlags&sampleFlagIsNonSyncSample) != 0,
e.SampleSize,
@@ -458,15 +482,15 @@ func segmentFMP4MuxParts(
}
dataOffset += uint64(e.SampleSize)
muxerDTS += int64(e.SampleDuration)
dts += int64(e.SampleDuration)
}
m.writeFinalDTS(muxerDTS)
m.writeFinalDTS(dts)
dtsPerTrack[curTrackID] = durationMp4ToGo(dts, timeScale)
muxerDTSGo := durationMp4ToGo(muxerDTS, timeScale)
if muxerDTSGo > maxMuxerDTS {
maxMuxerDTS = muxerDTSGo
relDuration := durationMp4ToGo(dts-startMP4, timeScale)
if relDuration > segDuration {
segDuration = relDuration
}
case "mdat":
@@ -477,8 +501,9 @@ func segmentFMP4MuxParts(
return nil, nil
})
if err != nil && !errors.Is(err, errTerminated) {
return 0, err
return 0, 0, err
}
return maxMuxerDTS, nil
nextSegmentStart := guessNextSegmentStartingPos(dtsPerTrack)
return segDuration, nextSegmentStart, nil
}