add base class to all data units (#2122)

needed by #2068
This commit is contained in:
Alessandro Ros
2023-07-30 22:55:13 +02:00
committed by GitHub
parent db3862cf0d
commit 08d6d0b888
19 changed files with 213 additions and 195 deletions

View File

@@ -80,9 +80,11 @@ func (s *hlsSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan
c.OnDataH26x(track, func(pts time.Duration, dts time.Duration, au [][]byte) { c.OnDataH26x(track, func(pts time.Duration, dts time.Duration, au [][]byte) {
stream.WriteUnit(medi, medi.Formats[0], &formatprocessor.UnitH264{ stream.WriteUnit(medi, medi.Formats[0], &formatprocessor.UnitH264{
BaseUnit: formatprocessor.BaseUnit{
NTP: time.Now(),
},
PTS: pts, PTS: pts,
AU: au, AU: au,
NTP: time.Now(),
}) })
}) })
@@ -99,9 +101,11 @@ func (s *hlsSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan
c.OnDataH26x(track, func(pts time.Duration, dts time.Duration, au [][]byte) { c.OnDataH26x(track, func(pts time.Duration, dts time.Duration, au [][]byte) {
stream.WriteUnit(medi, medi.Formats[0], &formatprocessor.UnitH265{ stream.WriteUnit(medi, medi.Formats[0], &formatprocessor.UnitH265{
BaseUnit: formatprocessor.BaseUnit{
NTP: time.Now(),
},
PTS: pts, PTS: pts,
AU: au, AU: au,
NTP: time.Now(),
}) })
}) })
@@ -119,9 +123,11 @@ func (s *hlsSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan
c.OnDataMPEG4Audio(track, func(pts time.Duration, dts time.Duration, aus [][]byte) { c.OnDataMPEG4Audio(track, func(pts time.Duration, dts time.Duration, aus [][]byte) {
stream.WriteUnit(medi, medi.Formats[0], &formatprocessor.UnitMPEG4AudioGeneric{ stream.WriteUnit(medi, medi.Formats[0], &formatprocessor.UnitMPEG4AudioGeneric{
BaseUnit: formatprocessor.BaseUnit{
NTP: time.Now(),
},
PTS: pts, PTS: pts,
AUs: aus, AUs: aus,
NTP: time.Now(),
}) })
}) })
@@ -136,9 +142,11 @@ func (s *hlsSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan
c.OnDataOpus(track, func(pts time.Duration, dts time.Duration, packets [][]byte) { c.OnDataOpus(track, func(pts time.Duration, dts time.Duration, packets [][]byte) {
stream.WriteUnit(medi, medi.Formats[0], &formatprocessor.UnitOpus{ stream.WriteUnit(medi, medi.Formats[0], &formatprocessor.UnitOpus{
BaseUnit: formatprocessor.BaseUnit{
NTP: time.Now(),
},
PTS: pts, PTS: pts,
Packets: packets, Packets: packets,
NTP: time.Now(),
}) })
}) })
} }

View File

@@ -100,9 +100,11 @@ func (s *rpiCameraSource) run(ctx context.Context, cnf *conf.PathConf, reloadCon
} }
stream.WriteUnit(medi, medi.Formats[0], &formatprocessor.UnitH264{ stream.WriteUnit(medi, medi.Formats[0], &formatprocessor.UnitH264{
BaseUnit: formatprocessor.BaseUnit{
NTP: time.Now(),
},
PTS: dts, PTS: dts,
AU: au, AU: au,
NTP: time.Now(),
}) })
} }

View File

@@ -64,9 +64,11 @@ func getRTMPWriteFunc(medi *media.Media, format formats.Format, stream *stream.S
} }
stream.WriteUnit(medi, format, &formatprocessor.UnitH264{ stream.WriteUnit(medi, format, &formatprocessor.UnitH264{
BaseUnit: formatprocessor.BaseUnit{
NTP: time.Now(),
},
PTS: tmsg.DTS + tmsg.PTSDelta, PTS: tmsg.DTS + tmsg.PTSDelta,
AU: au, AU: au,
NTP: time.Now(),
}) })
case message.VideoTypeAU: case message.VideoTypeAU:
@@ -76,9 +78,11 @@ func getRTMPWriteFunc(medi *media.Media, format formats.Format, stream *stream.S
} }
stream.WriteUnit(medi, format, &formatprocessor.UnitH264{ stream.WriteUnit(medi, format, &formatprocessor.UnitH264{
BaseUnit: formatprocessor.BaseUnit{
NTP: time.Now(),
},
PTS: tmsg.DTS + tmsg.PTSDelta, PTS: tmsg.DTS + tmsg.PTSDelta,
AU: au, AU: au,
NTP: time.Now(),
}) })
} }
@@ -95,9 +99,11 @@ func getRTMPWriteFunc(medi *media.Media, format formats.Format, stream *stream.S
} }
stream.WriteUnit(medi, format, &formatprocessor.UnitH265{ stream.WriteUnit(medi, format, &formatprocessor.UnitH265{
BaseUnit: formatprocessor.BaseUnit{
NTP: time.Now(),
},
PTS: tmsg.DTS + tmsg.PTSDelta, PTS: tmsg.DTS + tmsg.PTSDelta,
AU: au, AU: au,
NTP: time.Now(),
}) })
case *message.ExtendedFramesX: case *message.ExtendedFramesX:
@@ -107,9 +113,11 @@ func getRTMPWriteFunc(medi *media.Media, format formats.Format, stream *stream.S
} }
stream.WriteUnit(medi, format, &formatprocessor.UnitH265{ stream.WriteUnit(medi, format, &formatprocessor.UnitH265{
BaseUnit: formatprocessor.BaseUnit{
NTP: time.Now(),
},
PTS: tmsg.DTS, PTS: tmsg.DTS,
AU: au, AU: au,
NTP: time.Now(),
}) })
case *message.ExtendedCodedFrames: case *message.ExtendedCodedFrames:
@@ -119,9 +127,11 @@ func getRTMPWriteFunc(medi *media.Media, format formats.Format, stream *stream.S
} }
stream.WriteUnit(medi, format, &formatprocessor.UnitH265{ stream.WriteUnit(medi, format, &formatprocessor.UnitH265{
BaseUnit: formatprocessor.BaseUnit{
NTP: time.Now(),
},
PTS: tmsg.DTS + tmsg.PTSDelta, PTS: tmsg.DTS + tmsg.PTSDelta,
AU: au, AU: au,
NTP: time.Now(),
}) })
} }
@@ -137,9 +147,11 @@ func getRTMPWriteFunc(medi *media.Media, format formats.Format, stream *stream.S
} }
stream.WriteUnit(medi, format, &formatprocessor.UnitAV1{ stream.WriteUnit(medi, format, &formatprocessor.UnitAV1{
BaseUnit: formatprocessor.BaseUnit{
NTP: time.Now(),
},
PTS: tmsg.DTS, PTS: tmsg.DTS,
OBUs: obus, OBUs: obus,
NTP: time.Now(),
}) })
} }
@@ -151,9 +163,11 @@ func getRTMPWriteFunc(medi *media.Media, format formats.Format, stream *stream.S
tmsg := msg.(*message.Audio) tmsg := msg.(*message.Audio)
stream.WriteUnit(medi, format, &formatprocessor.UnitMPEG2Audio{ stream.WriteUnit(medi, format, &formatprocessor.UnitMPEG2Audio{
BaseUnit: formatprocessor.BaseUnit{
NTP: time.Now(),
},
PTS: tmsg.DTS, PTS: tmsg.DTS,
Frames: [][]byte{tmsg.Payload}, Frames: [][]byte{tmsg.Payload},
NTP: time.Now(),
}) })
return nil return nil
@@ -165,9 +179,11 @@ func getRTMPWriteFunc(medi *media.Media, format formats.Format, stream *stream.S
if tmsg.AACType == message.AudioAACTypeAU { if tmsg.AACType == message.AudioAACTypeAU {
stream.WriteUnit(medi, format, &formatprocessor.UnitMPEG4AudioGeneric{ stream.WriteUnit(medi, format, &formatprocessor.UnitMPEG4AudioGeneric{
BaseUnit: formatprocessor.BaseUnit{
NTP: time.Now(),
},
PTS: tmsg.DTS, PTS: tmsg.DTS,
AUs: [][]byte{tmsg.Payload}, AUs: [][]byte{tmsg.Payload},
NTP: time.Now(),
}) })
} }

View File

@@ -165,9 +165,11 @@ func (s *udpSource) runReader(pc net.PacketConn) error {
r.OnDataH26x(track, func(pts int64, _ int64, au [][]byte) error { r.OnDataH26x(track, func(pts int64, _ int64, au [][]byte) error {
stream.WriteUnit(medi, medi.Formats[0], &formatprocessor.UnitH264{ stream.WriteUnit(medi, medi.Formats[0], &formatprocessor.UnitH264{
BaseUnit: formatprocessor.BaseUnit{
NTP: time.Now(),
},
PTS: decodeTime(pts), PTS: decodeTime(pts),
AU: au, AU: au,
NTP: time.Now(),
}) })
return nil return nil
}) })
@@ -182,9 +184,11 @@ func (s *udpSource) runReader(pc net.PacketConn) error {
r.OnDataH26x(track, func(pts int64, _ int64, au [][]byte) error { r.OnDataH26x(track, func(pts int64, _ int64, au [][]byte) error {
stream.WriteUnit(medi, medi.Formats[0], &formatprocessor.UnitH265{ stream.WriteUnit(medi, medi.Formats[0], &formatprocessor.UnitH265{
BaseUnit: formatprocessor.BaseUnit{
NTP: time.Now(),
},
PTS: decodeTime(pts), PTS: decodeTime(pts),
AU: au, AU: au,
NTP: time.Now(),
}) })
return nil return nil
}) })
@@ -203,9 +207,11 @@ func (s *udpSource) runReader(pc net.PacketConn) error {
r.OnDataMPEG4Audio(track, func(pts int64, _ int64, aus [][]byte) error { r.OnDataMPEG4Audio(track, func(pts int64, _ int64, aus [][]byte) error {
stream.WriteUnit(medi, medi.Formats[0], &formatprocessor.UnitMPEG4AudioGeneric{ stream.WriteUnit(medi, medi.Formats[0], &formatprocessor.UnitMPEG4AudioGeneric{
BaseUnit: formatprocessor.BaseUnit{
NTP: time.Now(),
},
PTS: decodeTime(pts), PTS: decodeTime(pts),
AUs: aus, AUs: aus,
NTP: time.Now(),
}) })
return nil return nil
}) })
@@ -221,9 +227,11 @@ func (s *udpSource) runReader(pc net.PacketConn) error {
r.OnDataOpus(track, func(pts int64, _ int64, packets [][]byte) error { r.OnDataOpus(track, func(pts int64, _ int64, packets [][]byte) error {
stream.WriteUnit(medi, medi.Formats[0], &formatprocessor.UnitOpus{ stream.WriteUnit(medi, medi.Formats[0], &formatprocessor.UnitOpus{
BaseUnit: formatprocessor.BaseUnit{
NTP: time.Now(),
},
PTS: decodeTime(pts), PTS: decodeTime(pts),
Packets: packets, Packets: packets,
NTP: time.Now(),
}) })
return nil return nil
}) })

View File

@@ -14,20 +14,9 @@ import (
// UnitAV1 is an AV1 data unit. // UnitAV1 is an AV1 data unit.
type UnitAV1 struct { type UnitAV1 struct {
RTPPackets []*rtp.Packet BaseUnit
NTP time.Time PTS time.Duration
PTS time.Duration OBUs [][]byte
OBUs [][]byte
}
// GetRTPPackets implements Unit.
func (d *UnitAV1) GetRTPPackets() []*rtp.Packet {
return d.RTPPackets
}
// GetNTP implements Unit.
func (d *UnitAV1) GetNTP() time.Time {
return d.NTP
} }
type formatProcessorAV1 struct { type formatProcessorAV1 struct {
@@ -145,7 +134,9 @@ func (t *formatProcessorAV1) Process(unit Unit, hasNonRTSPReaders bool) error {
func (t *formatProcessorAV1) UnitForRTPPacket(pkt *rtp.Packet, ntp time.Time) Unit { func (t *formatProcessorAV1) UnitForRTPPacket(pkt *rtp.Packet, ntp time.Time) Unit {
return &UnitAV1{ return &UnitAV1{
RTPPackets: []*rtp.Packet{pkt}, BaseUnit: BaseUnit{
NTP: ntp, RTPPackets: []*rtp.Packet{pkt},
NTP: ntp,
},
} }
} }

View File

@@ -8,7 +8,7 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func TestAV1KeyFrameWarning(t *testing.T) { func TestAV1KeyFrameWarning(t *testing.T) { //nolint:dupl
forma := &formats.AV1{ forma := &formats.AV1{
PayloadTyp: 96, PayloadTyp: 96,
} }
@@ -19,19 +19,23 @@ func TestAV1KeyFrameWarning(t *testing.T) {
ntp := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC) ntp := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
err = p.Process(&UnitAV1{ err = p.Process(&UnitAV1{
BaseUnit: BaseUnit{
NTP: ntp,
},
OBUs: [][]byte{ OBUs: [][]byte{
{0x01}, {0x01},
}, },
NTP: ntp,
}, false) }, false)
require.NoError(t, err) require.NoError(t, err)
ntp = ntp.Add(30 * time.Second) ntp = ntp.Add(30 * time.Second)
err = p.Process(&UnitAV1{ err = p.Process(&UnitAV1{
BaseUnit: BaseUnit{
NTP: ntp,
},
OBUs: [][]byte{ OBUs: [][]byte{
{0x01}, {0x01},
}, },
NTP: ntp,
}, false) }, false)
require.NoError(t, err) require.NoError(t, err)

View File

@@ -12,18 +12,7 @@ import (
// UnitGeneric is a generic data unit. // UnitGeneric is a generic data unit.
type UnitGeneric struct { type UnitGeneric struct {
RTPPackets []*rtp.Packet BaseUnit
NTP time.Time
}
// GetRTPPackets implements Unit.
func (d *UnitGeneric) GetRTPPackets() []*rtp.Packet {
return d.RTPPackets
}
// GetNTP implements Unit.
func (d *UnitGeneric) GetNTP() time.Time {
return d.NTP
} }
type formatProcessorGeneric struct { type formatProcessorGeneric struct {
@@ -64,7 +53,9 @@ func (t *formatProcessorGeneric) Process(unit Unit, _ bool) error {
func (t *formatProcessorGeneric) UnitForRTPPacket(pkt *rtp.Packet, ntp time.Time) Unit { func (t *formatProcessorGeneric) UnitForRTPPacket(pkt *rtp.Packet, ntp time.Time) Unit {
return &UnitGeneric{ return &UnitGeneric{
RTPPackets: []*rtp.Packet{pkt}, BaseUnit: BaseUnit{
NTP: ntp, RTPPackets: []*rtp.Packet{pkt},
NTP: ntp,
},
} }
} }

View File

@@ -33,7 +33,9 @@ func TestGenericRemovePadding(t *testing.T) {
} }
err = p.Process(&UnitGeneric{ err = p.Process(&UnitGeneric{
RTPPackets: []*rtp.Packet{pkt}, BaseUnit: BaseUnit{
RTPPackets: []*rtp.Packet{pkt},
},
}, false) }, false)
require.NoError(t, err) require.NoError(t, err)

View File

@@ -71,20 +71,9 @@ func rtpH264ExtractSPSPPS(pkt *rtp.Packet) ([]byte, []byte) {
// UnitH264 is a H264 data unit. // UnitH264 is a H264 data unit.
type UnitH264 struct { type UnitH264 struct {
RTPPackets []*rtp.Packet BaseUnit
NTP time.Time PTS time.Duration
PTS time.Duration AU [][]byte
AU [][]byte
}
// GetRTPPackets implements Unit.
func (d *UnitH264) GetRTPPackets() []*rtp.Packet {
return d.RTPPackets
}
// GetNTP implements Unit.
func (d *UnitH264) GetNTP() time.Time {
return d.NTP
} }
type formatProcessorH264 struct { type formatProcessorH264 struct {
@@ -334,7 +323,9 @@ func (t *formatProcessorH264) Process(unit Unit, hasNonRTSPReaders bool) error {
func (t *formatProcessorH264) UnitForRTPPacket(pkt *rtp.Packet, ntp time.Time) Unit { func (t *formatProcessorH264) UnitForRTPPacket(pkt *rtp.Packet, ntp time.Time) Unit {
return &UnitH264{ return &UnitH264{
RTPPackets: []*rtp.Packet{pkt}, BaseUnit: BaseUnit{
NTP: ntp, RTPPackets: []*rtp.Packet{pkt},
NTP: ntp,
},
} }
} }

View File

@@ -37,7 +37,11 @@ func TestH264DynamicParams(t *testing.T) {
pkts, err := enc.Encode([][]byte{{byte(h264.NALUTypeIDR)}}, 0) pkts, err := enc.Encode([][]byte{{byte(h264.NALUTypeIDR)}}, 0)
require.NoError(t, err) require.NoError(t, err)
data := &UnitH264{RTPPackets: []*rtp.Packet{pkts[0]}} data := &UnitH264{
BaseUnit: BaseUnit{
RTPPackets: []*rtp.Packet{pkts[0]},
},
}
p.Process(data, true) p.Process(data, true)
require.Equal(t, [][]byte{ require.Equal(t, [][]byte{
@@ -46,18 +50,30 @@ func TestH264DynamicParams(t *testing.T) {
pkts, err = enc.Encode([][]byte{{7, 4, 5, 6}}, 0) // SPS pkts, err = enc.Encode([][]byte{{7, 4, 5, 6}}, 0) // SPS
require.NoError(t, err) require.NoError(t, err)
p.Process(&UnitH264{RTPPackets: []*rtp.Packet{pkts[0]}}, false) p.Process(&UnitH264{
BaseUnit: BaseUnit{
RTPPackets: []*rtp.Packet{pkts[0]},
},
}, false)
pkts, err = enc.Encode([][]byte{{8, 1}}, 0) // PPS pkts, err = enc.Encode([][]byte{{8, 1}}, 0) // PPS
require.NoError(t, err) require.NoError(t, err)
p.Process(&UnitH264{RTPPackets: []*rtp.Packet{pkts[0]}}, false) p.Process(&UnitH264{
BaseUnit: BaseUnit{
RTPPackets: []*rtp.Packet{pkts[0]},
},
}, false)
require.Equal(t, []byte{7, 4, 5, 6}, forma.SPS) require.Equal(t, []byte{7, 4, 5, 6}, forma.SPS)
require.Equal(t, []byte{8, 1}, forma.PPS) require.Equal(t, []byte{8, 1}, forma.PPS)
pkts, err = enc.Encode([][]byte{{byte(h264.NALUTypeIDR)}}, 0) pkts, err = enc.Encode([][]byte{{byte(h264.NALUTypeIDR)}}, 0)
require.NoError(t, err) require.NoError(t, err)
data = &UnitH264{RTPPackets: []*rtp.Packet{pkts[0]}} data = &UnitH264{
BaseUnit: BaseUnit{
RTPPackets: []*rtp.Packet{pkts[0]},
},
}
p.Process(data, true) p.Process(data, true)
require.Equal(t, [][]byte{ require.Equal(t, [][]byte{
@@ -118,7 +134,11 @@ func TestH264OversizedPackets(t *testing.T) {
Payload: []byte{0x1c, 0b01000000, 0x01, 0x02, 0x03, 0x04}, Payload: []byte{0x1c, 0b01000000, 0x01, 0x02, 0x03, 0x04},
}, },
} { } {
data := &UnitH264{RTPPackets: []*rtp.Packet{pkt}} data := &UnitH264{
BaseUnit: BaseUnit{
RTPPackets: []*rtp.Packet{pkt},
},
}
p.Process(data, false) p.Process(data, false)
out = append(out, data.RTPPackets...) out = append(out, data.RTPPackets...)
} }
@@ -200,19 +220,23 @@ func TestH264KeyFrameWarning(t *testing.T) {
ntp := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC) ntp := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
err = p.Process(&UnitH264{ err = p.Process(&UnitH264{
BaseUnit: BaseUnit{
NTP: ntp,
},
AU: [][]byte{ AU: [][]byte{
{0x01}, {0x01},
}, },
NTP: ntp,
}, false) }, false)
require.NoError(t, err) require.NoError(t, err)
ntp = ntp.Add(30 * time.Second) ntp = ntp.Add(30 * time.Second)
err = p.Process(&UnitH264{ err = p.Process(&UnitH264{
BaseUnit: BaseUnit{
NTP: ntp,
},
AU: [][]byte{ AU: [][]byte{
{0x01}, {0x01},
}, },
NTP: ntp,
}, false) }, false)
require.NoError(t, err) require.NoError(t, err)

View File

@@ -78,20 +78,9 @@ func rtpH265ExtractVPSSPSPPS(pkt *rtp.Packet) ([]byte, []byte, []byte) {
// UnitH265 is a H265 data unit. // UnitH265 is a H265 data unit.
type UnitH265 struct { type UnitH265 struct {
RTPPackets []*rtp.Packet BaseUnit
NTP time.Time PTS time.Duration
PTS time.Duration AU [][]byte
AU [][]byte
}
// GetRTPPackets implements Unit.
func (d *UnitH265) GetRTPPackets() []*rtp.Packet {
return d.RTPPackets
}
// GetNTP implements Unit.
func (d *UnitH265) GetNTP() time.Time {
return d.NTP
} }
type formatProcessorH265 struct { type formatProcessorH265 struct {
@@ -356,7 +345,9 @@ func (t *formatProcessorH265) Process(unit Unit, hasNonRTSPReaders bool) error {
func (t *formatProcessorH265) UnitForRTPPacket(pkt *rtp.Packet, ntp time.Time) Unit { func (t *formatProcessorH265) UnitForRTPPacket(pkt *rtp.Packet, ntp time.Time) Unit {
return &UnitH265{ return &UnitH265{
RTPPackets: []*rtp.Packet{pkt}, BaseUnit: BaseUnit{
NTP: ntp, RTPPackets: []*rtp.Packet{pkt},
NTP: ntp,
},
} }
} }

View File

@@ -25,7 +25,11 @@ func TestH265DynamicParams(t *testing.T) {
pkts, err := enc.Encode([][]byte{{byte(h265.NALUType_CRA_NUT) << 1, 0}}, 0) pkts, err := enc.Encode([][]byte{{byte(h265.NALUType_CRA_NUT) << 1, 0}}, 0)
require.NoError(t, err) require.NoError(t, err)
data := &UnitH265{RTPPackets: []*rtp.Packet{pkts[0]}} data := &UnitH265{
BaseUnit: BaseUnit{
RTPPackets: []*rtp.Packet{pkts[0]},
},
}
p.Process(data, true) p.Process(data, true)
require.Equal(t, [][]byte{ require.Equal(t, [][]byte{
@@ -34,15 +38,27 @@ func TestH265DynamicParams(t *testing.T) {
pkts, err = enc.Encode([][]byte{{byte(h265.NALUType_VPS_NUT) << 1, 1, 2, 3}}, 0) pkts, err = enc.Encode([][]byte{{byte(h265.NALUType_VPS_NUT) << 1, 1, 2, 3}}, 0)
require.NoError(t, err) require.NoError(t, err)
p.Process(&UnitH265{RTPPackets: []*rtp.Packet{pkts[0]}}, false) p.Process(&UnitH265{
BaseUnit: BaseUnit{
RTPPackets: []*rtp.Packet{pkts[0]},
},
}, false)
pkts, err = enc.Encode([][]byte{{byte(h265.NALUType_SPS_NUT) << 1, 4, 5, 6}}, 0) pkts, err = enc.Encode([][]byte{{byte(h265.NALUType_SPS_NUT) << 1, 4, 5, 6}}, 0)
require.NoError(t, err) require.NoError(t, err)
p.Process(&UnitH265{RTPPackets: []*rtp.Packet{pkts[0]}}, false) p.Process(&UnitH265{
BaseUnit: BaseUnit{
RTPPackets: []*rtp.Packet{pkts[0]},
},
}, false)
pkts, err = enc.Encode([][]byte{{byte(h265.NALUType_PPS_NUT) << 1, 7, 8, 9}}, 0) pkts, err = enc.Encode([][]byte{{byte(h265.NALUType_PPS_NUT) << 1, 7, 8, 9}}, 0)
require.NoError(t, err) require.NoError(t, err)
p.Process(&UnitH265{RTPPackets: []*rtp.Packet{pkts[0]}}, false) p.Process(&UnitH265{
BaseUnit: BaseUnit{
RTPPackets: []*rtp.Packet{pkts[0]},
},
}, false)
require.Equal(t, []byte{byte(h265.NALUType_VPS_NUT) << 1, 1, 2, 3}, forma.VPS) require.Equal(t, []byte{byte(h265.NALUType_VPS_NUT) << 1, 1, 2, 3}, forma.VPS)
require.Equal(t, []byte{byte(h265.NALUType_SPS_NUT) << 1, 4, 5, 6}, forma.SPS) require.Equal(t, []byte{byte(h265.NALUType_SPS_NUT) << 1, 4, 5, 6}, forma.SPS)
@@ -50,7 +66,11 @@ func TestH265DynamicParams(t *testing.T) {
pkts, err = enc.Encode([][]byte{{byte(h265.NALUType_CRA_NUT) << 1, 0}}, 0) pkts, err = enc.Encode([][]byte{{byte(h265.NALUType_CRA_NUT) << 1, 0}}, 0)
require.NoError(t, err) require.NoError(t, err)
data = &UnitH265{RTPPackets: []*rtp.Packet{pkts[0]}} data = &UnitH265{
BaseUnit: BaseUnit{
RTPPackets: []*rtp.Packet{pkts[0]},
},
}
p.Process(data, true) p.Process(data, true)
require.Equal(t, [][]byte{ require.Equal(t, [][]byte{
@@ -100,7 +120,11 @@ func TestH265OversizedPackets(t *testing.T) {
Payload: bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04}, 2000/4), Payload: bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04}, 2000/4),
}, },
} { } {
data := &UnitH265{RTPPackets: []*rtp.Packet{pkt}} data := &UnitH265{
BaseUnit: BaseUnit{
RTPPackets: []*rtp.Packet{pkt},
},
}
p.Process(data, false) p.Process(data, false)
out = append(out, data.RTPPackets...) out = append(out, data.RTPPackets...)
} }
@@ -170,7 +194,7 @@ func TestH265EmptyPacket(t *testing.T) {
require.Equal(t, []*rtp.Packet(nil), unit.RTPPackets) require.Equal(t, []*rtp.Packet(nil), unit.RTPPackets)
} }
func TestH265KeyFrameWarning(t *testing.T) { func TestH265KeyFrameWarning(t *testing.T) { //nolint:dupl
forma := &formats.H265{ forma := &formats.H265{
PayloadTyp: 96, PayloadTyp: 96,
} }
@@ -181,19 +205,23 @@ func TestH265KeyFrameWarning(t *testing.T) {
ntp := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC) ntp := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
err = p.Process(&UnitH265{ err = p.Process(&UnitH265{
BaseUnit: BaseUnit{
NTP: ntp,
},
AU: [][]byte{ AU: [][]byte{
{0x01}, {0x01},
}, },
NTP: ntp,
}, false) }, false)
require.NoError(t, err) require.NoError(t, err)
ntp = ntp.Add(30 * time.Second) ntp = ntp.Add(30 * time.Second)
err = p.Process(&UnitH265{ err = p.Process(&UnitH265{
BaseUnit: BaseUnit{
NTP: ntp,
},
AU: [][]byte{ AU: [][]byte{
{0x01}, {0x01},
}, },
NTP: ntp,
}, false) }, false)
require.NoError(t, err) require.NoError(t, err)

View File

@@ -13,20 +13,9 @@ import (
// UnitMPEG2Audio is a MPEG-1/2 Audio data unit. // UnitMPEG2Audio is a MPEG-1/2 Audio data unit.
type UnitMPEG2Audio struct { type UnitMPEG2Audio struct {
RTPPackets []*rtp.Packet BaseUnit
NTP time.Time PTS time.Duration
PTS time.Duration Frames [][]byte
Frames [][]byte
}
// GetRTPPackets implements Unit.
func (d *UnitMPEG2Audio) GetRTPPackets() []*rtp.Packet {
return d.RTPPackets
}
// GetNTP implements Unit.
func (d *UnitMPEG2Audio) GetNTP() time.Time {
return d.NTP
} }
type formatProcessorMPEG2Audio struct { type formatProcessorMPEG2Audio struct {
@@ -117,7 +106,9 @@ func (t *formatProcessorMPEG2Audio) Process(unit Unit, hasNonRTSPReaders bool) e
func (t *formatProcessorMPEG2Audio) UnitForRTPPacket(pkt *rtp.Packet, ntp time.Time) Unit { func (t *formatProcessorMPEG2Audio) UnitForRTPPacket(pkt *rtp.Packet, ntp time.Time) Unit {
return &UnitMPEG2Audio{ return &UnitMPEG2Audio{
RTPPackets: []*rtp.Packet{pkt}, BaseUnit: BaseUnit{
NTP: ntp, RTPPackets: []*rtp.Packet{pkt},
NTP: ntp,
},
} }
} }

View File

@@ -13,20 +13,9 @@ import (
// UnitMPEG4AudioGeneric is a MPEG-4 Audio data unit. // UnitMPEG4AudioGeneric is a MPEG-4 Audio data unit.
type UnitMPEG4AudioGeneric struct { type UnitMPEG4AudioGeneric struct {
RTPPackets []*rtp.Packet BaseUnit
NTP time.Time PTS time.Duration
PTS time.Duration AUs [][]byte
AUs [][]byte
}
// GetRTPPackets implements Unit.
func (d *UnitMPEG4AudioGeneric) GetRTPPackets() []*rtp.Packet {
return d.RTPPackets
}
// GetNTP implements Unit.
func (d *UnitMPEG4AudioGeneric) GetNTP() time.Time {
return d.NTP
} }
type formatProcessorMPEG4AudioGeneric struct { type formatProcessorMPEG4AudioGeneric struct {
@@ -122,7 +111,9 @@ func (t *formatProcessorMPEG4AudioGeneric) Process(unit Unit, hasNonRTSPReaders
func (t *formatProcessorMPEG4AudioGeneric) UnitForRTPPacket(pkt *rtp.Packet, ntp time.Time) Unit { func (t *formatProcessorMPEG4AudioGeneric) UnitForRTPPacket(pkt *rtp.Packet, ntp time.Time) Unit {
return &UnitMPEG4AudioGeneric{ return &UnitMPEG4AudioGeneric{
RTPPackets: []*rtp.Packet{pkt}, BaseUnit: BaseUnit{
NTP: ntp, RTPPackets: []*rtp.Packet{pkt},
NTP: ntp,
},
} }
} }

View File

@@ -13,20 +13,9 @@ import (
// UnitMPEG4AudioLATM is a MPEG-4 Audio data unit. // UnitMPEG4AudioLATM is a MPEG-4 Audio data unit.
type UnitMPEG4AudioLATM struct { type UnitMPEG4AudioLATM struct {
RTPPackets []*rtp.Packet BaseUnit
NTP time.Time PTS time.Duration
PTS time.Duration AU []byte
AU []byte
}
// GetRTPPackets implements Unit.
func (d *UnitMPEG4AudioLATM) GetRTPPackets() []*rtp.Packet {
return d.RTPPackets
}
// GetNTP implements Unit.
func (d *UnitMPEG4AudioLATM) GetNTP() time.Time {
return d.NTP
} }
type formatProcessorMPEG4AudioLATM struct { type formatProcessorMPEG4AudioLATM struct {
@@ -118,7 +107,9 @@ func (t *formatProcessorMPEG4AudioLATM) Process(unit Unit, hasNonRTSPReaders boo
func (t *formatProcessorMPEG4AudioLATM) UnitForRTPPacket(pkt *rtp.Packet, ntp time.Time) Unit { func (t *formatProcessorMPEG4AudioLATM) UnitForRTPPacket(pkt *rtp.Packet, ntp time.Time) Unit {
return &UnitMPEG4AudioLATM{ return &UnitMPEG4AudioLATM{
RTPPackets: []*rtp.Packet{pkt}, BaseUnit: BaseUnit{
NTP: ntp, RTPPackets: []*rtp.Packet{pkt},
NTP: ntp,
},
} }
} }

View File

@@ -14,20 +14,9 @@ import (
// UnitOpus is a Opus data unit. // UnitOpus is a Opus data unit.
type UnitOpus struct { type UnitOpus struct {
RTPPackets []*rtp.Packet BaseUnit
NTP time.Time PTS time.Duration
PTS time.Duration Packets [][]byte
Packets [][]byte
}
// GetRTPPackets implements Unit.
func (d *UnitOpus) GetRTPPackets() []*rtp.Packet {
return d.RTPPackets
}
// GetNTP implements Unit.
func (d *UnitOpus) GetNTP() time.Time {
return d.NTP
} }
type formatProcessorOpus struct { type formatProcessorOpus struct {
@@ -124,7 +113,9 @@ func (t *formatProcessorOpus) Process(unit Unit, hasNonRTSPReaders bool) error {
func (t *formatProcessorOpus) UnitForRTPPacket(pkt *rtp.Packet, ntp time.Time) Unit { func (t *formatProcessorOpus) UnitForRTPPacket(pkt *rtp.Packet, ntp time.Time) Unit {
return &UnitOpus{ return &UnitOpus{
RTPPackets: []*rtp.Packet{pkt}, BaseUnit: BaseUnit{
NTP: ntp, RTPPackets: []*rtp.Packet{pkt},
NTP: ntp,
},
} }
} }

View File

@@ -6,6 +6,22 @@ import (
"github.com/pion/rtp" "github.com/pion/rtp"
) )
// BaseUnit contains fields shared across all units.
type BaseUnit struct {
RTPPackets []*rtp.Packet
NTP time.Time
}
// GetRTPPackets implements Unit.
func (u *BaseUnit) GetRTPPackets() []*rtp.Packet {
return u.RTPPackets
}
// GetNTP implements Unit.
func (u *BaseUnit) GetNTP() time.Time {
return u.NTP
}
// Unit is the elementary data unit routed across the server. // Unit is the elementary data unit routed across the server.
type Unit interface { type Unit interface {
// returns RTP packets contained into the unit. // returns RTP packets contained into the unit.

View File

@@ -13,20 +13,9 @@ import (
// UnitVP8 is a VP8 data unit. // UnitVP8 is a VP8 data unit.
type UnitVP8 struct { type UnitVP8 struct {
RTPPackets []*rtp.Packet BaseUnit
NTP time.Time PTS time.Duration
PTS time.Duration Frame []byte
Frame []byte
}
// GetRTPPackets implements Unit.
func (d *UnitVP8) GetRTPPackets() []*rtp.Packet {
return d.RTPPackets
}
// GetNTP implements Unit.
func (d *UnitVP8) GetNTP() time.Time {
return d.NTP
} }
type formatProcessorVP8 struct { type formatProcessorVP8 struct {
@@ -118,7 +107,9 @@ func (t *formatProcessorVP8) Process(unit Unit, hasNonRTSPReaders bool) error {
func (t *formatProcessorVP8) UnitForRTPPacket(pkt *rtp.Packet, ntp time.Time) Unit { func (t *formatProcessorVP8) UnitForRTPPacket(pkt *rtp.Packet, ntp time.Time) Unit {
return &UnitVP8{ return &UnitVP8{
RTPPackets: []*rtp.Packet{pkt}, BaseUnit: BaseUnit{
NTP: ntp, RTPPackets: []*rtp.Packet{pkt},
NTP: ntp,
},
} }
} }

View File

@@ -13,20 +13,9 @@ import (
// UnitVP9 is a VP9 data unit. // UnitVP9 is a VP9 data unit.
type UnitVP9 struct { type UnitVP9 struct {
RTPPackets []*rtp.Packet BaseUnit
NTP time.Time PTS time.Duration
PTS time.Duration Frame []byte
Frame []byte
}
// GetRTPPackets implements Unit.
func (d *UnitVP9) GetRTPPackets() []*rtp.Packet {
return d.RTPPackets
}
// GetNTP implements Unit.
func (d *UnitVP9) GetNTP() time.Time {
return d.NTP
} }
type formatProcessorVP9 struct { type formatProcessorVP9 struct {
@@ -118,7 +107,9 @@ func (t *formatProcessorVP9) Process(unit Unit, hasNonRTSPReaders bool) error {
func (t *formatProcessorVP9) UnitForRTPPacket(pkt *rtp.Packet, ntp time.Time) Unit { func (t *formatProcessorVP9) UnitForRTPPacket(pkt *rtp.Packet, ntp time.Time) Unit {
return &UnitVP9{ return &UnitVP9{
RTPPackets: []*rtp.Packet{pkt}, BaseUnit: BaseUnit{
NTP: ntp, RTPPackets: []*rtp.Packet{pkt},
NTP: ntp,
},
} }
} }