diff --git a/internal/core/hls_source.go b/internal/core/hls_source.go index 366101fd..38929563 100644 --- a/internal/core/hls_source.go +++ b/internal/core/hls_source.go @@ -80,9 +80,11 @@ func (s *hlsSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan c.OnDataH26x(track, func(pts time.Duration, dts time.Duration, au [][]byte) { stream.WriteUnit(medi, medi.Formats[0], &formatprocessor.UnitH264{ + BaseUnit: formatprocessor.BaseUnit{ + NTP: time.Now(), + }, PTS: pts, AU: au, - NTP: time.Now(), }) }) @@ -99,9 +101,11 @@ func (s *hlsSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan c.OnDataH26x(track, func(pts time.Duration, dts time.Duration, au [][]byte) { stream.WriteUnit(medi, medi.Formats[0], &formatprocessor.UnitH265{ + BaseUnit: formatprocessor.BaseUnit{ + NTP: time.Now(), + }, PTS: pts, AU: au, - NTP: time.Now(), }) }) @@ -119,9 +123,11 @@ func (s *hlsSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan c.OnDataMPEG4Audio(track, func(pts time.Duration, dts time.Duration, aus [][]byte) { stream.WriteUnit(medi, medi.Formats[0], &formatprocessor.UnitMPEG4AudioGeneric{ + BaseUnit: formatprocessor.BaseUnit{ + NTP: time.Now(), + }, PTS: pts, AUs: aus, - NTP: time.Now(), }) }) @@ -136,9 +142,11 @@ func (s *hlsSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan c.OnDataOpus(track, func(pts time.Duration, dts time.Duration, packets [][]byte) { stream.WriteUnit(medi, medi.Formats[0], &formatprocessor.UnitOpus{ + BaseUnit: formatprocessor.BaseUnit{ + NTP: time.Now(), + }, PTS: pts, Packets: packets, - NTP: time.Now(), }) }) } diff --git a/internal/core/rpicamera_source.go b/internal/core/rpicamera_source.go index 9c7fb19e..83bc2541 100644 --- a/internal/core/rpicamera_source.go +++ b/internal/core/rpicamera_source.go @@ -100,9 +100,11 @@ func (s *rpiCameraSource) run(ctx context.Context, cnf *conf.PathConf, reloadCon } stream.WriteUnit(medi, medi.Formats[0], &formatprocessor.UnitH264{ + BaseUnit: formatprocessor.BaseUnit{ + NTP: time.Now(), + }, PTS: dts, AU: au, - NTP: time.Now(), }) } diff --git a/internal/core/rtmp_conn.go b/internal/core/rtmp_conn.go index f6567024..16bf8dc2 100644 --- a/internal/core/rtmp_conn.go +++ b/internal/core/rtmp_conn.go @@ -64,9 +64,11 @@ func getRTMPWriteFunc(medi *media.Media, format formats.Format, stream *stream.S } stream.WriteUnit(medi, format, &formatprocessor.UnitH264{ + BaseUnit: formatprocessor.BaseUnit{ + NTP: time.Now(), + }, PTS: tmsg.DTS + tmsg.PTSDelta, AU: au, - NTP: time.Now(), }) case message.VideoTypeAU: @@ -76,9 +78,11 @@ func getRTMPWriteFunc(medi *media.Media, format formats.Format, stream *stream.S } stream.WriteUnit(medi, format, &formatprocessor.UnitH264{ + BaseUnit: formatprocessor.BaseUnit{ + NTP: time.Now(), + }, PTS: tmsg.DTS + tmsg.PTSDelta, AU: au, - NTP: time.Now(), }) } @@ -95,9 +99,11 @@ func getRTMPWriteFunc(medi *media.Media, format formats.Format, stream *stream.S } stream.WriteUnit(medi, format, &formatprocessor.UnitH265{ + BaseUnit: formatprocessor.BaseUnit{ + NTP: time.Now(), + }, PTS: tmsg.DTS + tmsg.PTSDelta, AU: au, - NTP: time.Now(), }) case *message.ExtendedFramesX: @@ -107,9 +113,11 @@ func getRTMPWriteFunc(medi *media.Media, format formats.Format, stream *stream.S } stream.WriteUnit(medi, format, &formatprocessor.UnitH265{ + BaseUnit: formatprocessor.BaseUnit{ + NTP: time.Now(), + }, PTS: tmsg.DTS, AU: au, - NTP: time.Now(), }) case *message.ExtendedCodedFrames: @@ -119,9 +127,11 @@ func getRTMPWriteFunc(medi *media.Media, format formats.Format, stream *stream.S } stream.WriteUnit(medi, format, &formatprocessor.UnitH265{ + BaseUnit: formatprocessor.BaseUnit{ + NTP: time.Now(), + }, PTS: tmsg.DTS + tmsg.PTSDelta, AU: au, - NTP: time.Now(), }) } @@ -137,9 +147,11 @@ func getRTMPWriteFunc(medi *media.Media, format formats.Format, stream *stream.S } stream.WriteUnit(medi, format, &formatprocessor.UnitAV1{ + BaseUnit: formatprocessor.BaseUnit{ + NTP: time.Now(), + }, PTS: tmsg.DTS, OBUs: obus, - NTP: time.Now(), }) } @@ -151,9 +163,11 @@ func getRTMPWriteFunc(medi *media.Media, format formats.Format, stream *stream.S tmsg := msg.(*message.Audio) stream.WriteUnit(medi, format, &formatprocessor.UnitMPEG2Audio{ + BaseUnit: formatprocessor.BaseUnit{ + NTP: time.Now(), + }, PTS: tmsg.DTS, Frames: [][]byte{tmsg.Payload}, - NTP: time.Now(), }) return nil @@ -165,9 +179,11 @@ func getRTMPWriteFunc(medi *media.Media, format formats.Format, stream *stream.S if tmsg.AACType == message.AudioAACTypeAU { stream.WriteUnit(medi, format, &formatprocessor.UnitMPEG4AudioGeneric{ + BaseUnit: formatprocessor.BaseUnit{ + NTP: time.Now(), + }, PTS: tmsg.DTS, AUs: [][]byte{tmsg.Payload}, - NTP: time.Now(), }) } diff --git a/internal/core/udp_source.go b/internal/core/udp_source.go index 8d930f5f..3780fb02 100644 --- a/internal/core/udp_source.go +++ b/internal/core/udp_source.go @@ -165,9 +165,11 @@ func (s *udpSource) runReader(pc net.PacketConn) error { r.OnDataH26x(track, func(pts int64, _ int64, au [][]byte) error { stream.WriteUnit(medi, medi.Formats[0], &formatprocessor.UnitH264{ + BaseUnit: formatprocessor.BaseUnit{ + NTP: time.Now(), + }, PTS: decodeTime(pts), AU: au, - NTP: time.Now(), }) return nil }) @@ -182,9 +184,11 @@ func (s *udpSource) runReader(pc net.PacketConn) error { r.OnDataH26x(track, func(pts int64, _ int64, au [][]byte) error { stream.WriteUnit(medi, medi.Formats[0], &formatprocessor.UnitH265{ + BaseUnit: formatprocessor.BaseUnit{ + NTP: time.Now(), + }, PTS: decodeTime(pts), AU: au, - NTP: time.Now(), }) return nil }) @@ -203,9 +207,11 @@ func (s *udpSource) runReader(pc net.PacketConn) error { r.OnDataMPEG4Audio(track, func(pts int64, _ int64, aus [][]byte) error { stream.WriteUnit(medi, medi.Formats[0], &formatprocessor.UnitMPEG4AudioGeneric{ + BaseUnit: formatprocessor.BaseUnit{ + NTP: time.Now(), + }, PTS: decodeTime(pts), AUs: aus, - NTP: time.Now(), }) return nil }) @@ -221,9 +227,11 @@ func (s *udpSource) runReader(pc net.PacketConn) error { r.OnDataOpus(track, func(pts int64, _ int64, packets [][]byte) error { stream.WriteUnit(medi, medi.Formats[0], &formatprocessor.UnitOpus{ + BaseUnit: formatprocessor.BaseUnit{ + NTP: time.Now(), + }, PTS: decodeTime(pts), Packets: packets, - NTP: time.Now(), }) return nil }) diff --git a/internal/formatprocessor/av1.go b/internal/formatprocessor/av1.go index b7dd076d..f4a4d9cd 100644 --- a/internal/formatprocessor/av1.go +++ b/internal/formatprocessor/av1.go @@ -14,20 +14,9 @@ import ( // UnitAV1 is an AV1 data unit. type UnitAV1 struct { - RTPPackets []*rtp.Packet - NTP time.Time - PTS time.Duration - OBUs [][]byte -} - -// GetRTPPackets implements Unit. -func (d *UnitAV1) GetRTPPackets() []*rtp.Packet { - return d.RTPPackets -} - -// GetNTP implements Unit. -func (d *UnitAV1) GetNTP() time.Time { - return d.NTP + BaseUnit + PTS time.Duration + OBUs [][]byte } type formatProcessorAV1 struct { @@ -145,7 +134,9 @@ func (t *formatProcessorAV1) Process(unit Unit, hasNonRTSPReaders bool) error { func (t *formatProcessorAV1) UnitForRTPPacket(pkt *rtp.Packet, ntp time.Time) Unit { return &UnitAV1{ - RTPPackets: []*rtp.Packet{pkt}, - NTP: ntp, + BaseUnit: BaseUnit{ + RTPPackets: []*rtp.Packet{pkt}, + NTP: ntp, + }, } } diff --git a/internal/formatprocessor/av1_test.go b/internal/formatprocessor/av1_test.go index 49eb2ce0..f7e0a4ab 100644 --- a/internal/formatprocessor/av1_test.go +++ b/internal/formatprocessor/av1_test.go @@ -8,7 +8,7 @@ import ( "github.com/stretchr/testify/require" ) -func TestAV1KeyFrameWarning(t *testing.T) { +func TestAV1KeyFrameWarning(t *testing.T) { //nolint:dupl forma := &formats.AV1{ PayloadTyp: 96, } @@ -19,19 +19,23 @@ func TestAV1KeyFrameWarning(t *testing.T) { ntp := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC) err = p.Process(&UnitAV1{ + BaseUnit: BaseUnit{ + NTP: ntp, + }, OBUs: [][]byte{ {0x01}, }, - NTP: ntp, }, false) require.NoError(t, err) ntp = ntp.Add(30 * time.Second) err = p.Process(&UnitAV1{ + BaseUnit: BaseUnit{ + NTP: ntp, + }, OBUs: [][]byte{ {0x01}, }, - NTP: ntp, }, false) require.NoError(t, err) diff --git a/internal/formatprocessor/generic.go b/internal/formatprocessor/generic.go index d9dc449c..a8f5bea4 100644 --- a/internal/formatprocessor/generic.go +++ b/internal/formatprocessor/generic.go @@ -12,18 +12,7 @@ import ( // UnitGeneric is a generic data unit. type UnitGeneric struct { - RTPPackets []*rtp.Packet - NTP time.Time -} - -// GetRTPPackets implements Unit. -func (d *UnitGeneric) GetRTPPackets() []*rtp.Packet { - return d.RTPPackets -} - -// GetNTP implements Unit. -func (d *UnitGeneric) GetNTP() time.Time { - return d.NTP + BaseUnit } type formatProcessorGeneric struct { @@ -64,7 +53,9 @@ func (t *formatProcessorGeneric) Process(unit Unit, _ bool) error { func (t *formatProcessorGeneric) UnitForRTPPacket(pkt *rtp.Packet, ntp time.Time) Unit { return &UnitGeneric{ - RTPPackets: []*rtp.Packet{pkt}, - NTP: ntp, + BaseUnit: BaseUnit{ + RTPPackets: []*rtp.Packet{pkt}, + NTP: ntp, + }, } } diff --git a/internal/formatprocessor/generic_test.go b/internal/formatprocessor/generic_test.go index 8e042f3f..b58b480a 100644 --- a/internal/formatprocessor/generic_test.go +++ b/internal/formatprocessor/generic_test.go @@ -33,7 +33,9 @@ func TestGenericRemovePadding(t *testing.T) { } err = p.Process(&UnitGeneric{ - RTPPackets: []*rtp.Packet{pkt}, + BaseUnit: BaseUnit{ + RTPPackets: []*rtp.Packet{pkt}, + }, }, false) require.NoError(t, err) diff --git a/internal/formatprocessor/h264.go b/internal/formatprocessor/h264.go index b330f600..09e7e025 100644 --- a/internal/formatprocessor/h264.go +++ b/internal/formatprocessor/h264.go @@ -71,20 +71,9 @@ func rtpH264ExtractSPSPPS(pkt *rtp.Packet) ([]byte, []byte) { // UnitH264 is a H264 data unit. type UnitH264 struct { - RTPPackets []*rtp.Packet - NTP time.Time - PTS time.Duration - AU [][]byte -} - -// GetRTPPackets implements Unit. -func (d *UnitH264) GetRTPPackets() []*rtp.Packet { - return d.RTPPackets -} - -// GetNTP implements Unit. -func (d *UnitH264) GetNTP() time.Time { - return d.NTP + BaseUnit + PTS time.Duration + AU [][]byte } type formatProcessorH264 struct { @@ -334,7 +323,9 @@ func (t *formatProcessorH264) Process(unit Unit, hasNonRTSPReaders bool) error { func (t *formatProcessorH264) UnitForRTPPacket(pkt *rtp.Packet, ntp time.Time) Unit { return &UnitH264{ - RTPPackets: []*rtp.Packet{pkt}, - NTP: ntp, + BaseUnit: BaseUnit{ + RTPPackets: []*rtp.Packet{pkt}, + NTP: ntp, + }, } } diff --git a/internal/formatprocessor/h264_test.go b/internal/formatprocessor/h264_test.go index 5d7bf38e..de40dafb 100644 --- a/internal/formatprocessor/h264_test.go +++ b/internal/formatprocessor/h264_test.go @@ -37,7 +37,11 @@ func TestH264DynamicParams(t *testing.T) { pkts, err := enc.Encode([][]byte{{byte(h264.NALUTypeIDR)}}, 0) require.NoError(t, err) - data := &UnitH264{RTPPackets: []*rtp.Packet{pkts[0]}} + data := &UnitH264{ + BaseUnit: BaseUnit{ + RTPPackets: []*rtp.Packet{pkts[0]}, + }, + } p.Process(data, true) require.Equal(t, [][]byte{ @@ -46,18 +50,30 @@ func TestH264DynamicParams(t *testing.T) { pkts, err = enc.Encode([][]byte{{7, 4, 5, 6}}, 0) // SPS require.NoError(t, err) - p.Process(&UnitH264{RTPPackets: []*rtp.Packet{pkts[0]}}, false) + p.Process(&UnitH264{ + BaseUnit: BaseUnit{ + RTPPackets: []*rtp.Packet{pkts[0]}, + }, + }, false) pkts, err = enc.Encode([][]byte{{8, 1}}, 0) // PPS require.NoError(t, err) - p.Process(&UnitH264{RTPPackets: []*rtp.Packet{pkts[0]}}, false) + p.Process(&UnitH264{ + BaseUnit: BaseUnit{ + RTPPackets: []*rtp.Packet{pkts[0]}, + }, + }, false) require.Equal(t, []byte{7, 4, 5, 6}, forma.SPS) require.Equal(t, []byte{8, 1}, forma.PPS) pkts, err = enc.Encode([][]byte{{byte(h264.NALUTypeIDR)}}, 0) require.NoError(t, err) - data = &UnitH264{RTPPackets: []*rtp.Packet{pkts[0]}} + data = &UnitH264{ + BaseUnit: BaseUnit{ + RTPPackets: []*rtp.Packet{pkts[0]}, + }, + } p.Process(data, true) require.Equal(t, [][]byte{ @@ -118,7 +134,11 @@ func TestH264OversizedPackets(t *testing.T) { Payload: []byte{0x1c, 0b01000000, 0x01, 0x02, 0x03, 0x04}, }, } { - data := &UnitH264{RTPPackets: []*rtp.Packet{pkt}} + data := &UnitH264{ + BaseUnit: BaseUnit{ + RTPPackets: []*rtp.Packet{pkt}, + }, + } p.Process(data, false) out = append(out, data.RTPPackets...) } @@ -200,19 +220,23 @@ func TestH264KeyFrameWarning(t *testing.T) { ntp := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC) err = p.Process(&UnitH264{ + BaseUnit: BaseUnit{ + NTP: ntp, + }, AU: [][]byte{ {0x01}, }, - NTP: ntp, }, false) require.NoError(t, err) ntp = ntp.Add(30 * time.Second) err = p.Process(&UnitH264{ + BaseUnit: BaseUnit{ + NTP: ntp, + }, AU: [][]byte{ {0x01}, }, - NTP: ntp, }, false) require.NoError(t, err) diff --git a/internal/formatprocessor/h265.go b/internal/formatprocessor/h265.go index 2246d682..a49ea24a 100644 --- a/internal/formatprocessor/h265.go +++ b/internal/formatprocessor/h265.go @@ -78,20 +78,9 @@ func rtpH265ExtractVPSSPSPPS(pkt *rtp.Packet) ([]byte, []byte, []byte) { // UnitH265 is a H265 data unit. type UnitH265 struct { - RTPPackets []*rtp.Packet - NTP time.Time - PTS time.Duration - AU [][]byte -} - -// GetRTPPackets implements Unit. -func (d *UnitH265) GetRTPPackets() []*rtp.Packet { - return d.RTPPackets -} - -// GetNTP implements Unit. -func (d *UnitH265) GetNTP() time.Time { - return d.NTP + BaseUnit + PTS time.Duration + AU [][]byte } type formatProcessorH265 struct { @@ -356,7 +345,9 @@ func (t *formatProcessorH265) Process(unit Unit, hasNonRTSPReaders bool) error { func (t *formatProcessorH265) UnitForRTPPacket(pkt *rtp.Packet, ntp time.Time) Unit { return &UnitH265{ - RTPPackets: []*rtp.Packet{pkt}, - NTP: ntp, + BaseUnit: BaseUnit{ + RTPPackets: []*rtp.Packet{pkt}, + NTP: ntp, + }, } } diff --git a/internal/formatprocessor/h265_test.go b/internal/formatprocessor/h265_test.go index 15b6ecea..6c276975 100644 --- a/internal/formatprocessor/h265_test.go +++ b/internal/formatprocessor/h265_test.go @@ -25,7 +25,11 @@ func TestH265DynamicParams(t *testing.T) { pkts, err := enc.Encode([][]byte{{byte(h265.NALUType_CRA_NUT) << 1, 0}}, 0) require.NoError(t, err) - data := &UnitH265{RTPPackets: []*rtp.Packet{pkts[0]}} + data := &UnitH265{ + BaseUnit: BaseUnit{ + RTPPackets: []*rtp.Packet{pkts[0]}, + }, + } p.Process(data, true) require.Equal(t, [][]byte{ @@ -34,15 +38,27 @@ func TestH265DynamicParams(t *testing.T) { pkts, err = enc.Encode([][]byte{{byte(h265.NALUType_VPS_NUT) << 1, 1, 2, 3}}, 0) require.NoError(t, err) - p.Process(&UnitH265{RTPPackets: []*rtp.Packet{pkts[0]}}, false) + p.Process(&UnitH265{ + BaseUnit: BaseUnit{ + RTPPackets: []*rtp.Packet{pkts[0]}, + }, + }, false) pkts, err = enc.Encode([][]byte{{byte(h265.NALUType_SPS_NUT) << 1, 4, 5, 6}}, 0) require.NoError(t, err) - p.Process(&UnitH265{RTPPackets: []*rtp.Packet{pkts[0]}}, false) + p.Process(&UnitH265{ + BaseUnit: BaseUnit{ + RTPPackets: []*rtp.Packet{pkts[0]}, + }, + }, false) pkts, err = enc.Encode([][]byte{{byte(h265.NALUType_PPS_NUT) << 1, 7, 8, 9}}, 0) require.NoError(t, err) - p.Process(&UnitH265{RTPPackets: []*rtp.Packet{pkts[0]}}, false) + p.Process(&UnitH265{ + BaseUnit: BaseUnit{ + RTPPackets: []*rtp.Packet{pkts[0]}, + }, + }, false) require.Equal(t, []byte{byte(h265.NALUType_VPS_NUT) << 1, 1, 2, 3}, forma.VPS) require.Equal(t, []byte{byte(h265.NALUType_SPS_NUT) << 1, 4, 5, 6}, forma.SPS) @@ -50,7 +66,11 @@ func TestH265DynamicParams(t *testing.T) { pkts, err = enc.Encode([][]byte{{byte(h265.NALUType_CRA_NUT) << 1, 0}}, 0) require.NoError(t, err) - data = &UnitH265{RTPPackets: []*rtp.Packet{pkts[0]}} + data = &UnitH265{ + BaseUnit: BaseUnit{ + RTPPackets: []*rtp.Packet{pkts[0]}, + }, + } p.Process(data, true) require.Equal(t, [][]byte{ @@ -100,7 +120,11 @@ func TestH265OversizedPackets(t *testing.T) { Payload: bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04}, 2000/4), }, } { - data := &UnitH265{RTPPackets: []*rtp.Packet{pkt}} + data := &UnitH265{ + BaseUnit: BaseUnit{ + RTPPackets: []*rtp.Packet{pkt}, + }, + } p.Process(data, false) out = append(out, data.RTPPackets...) } @@ -170,7 +194,7 @@ func TestH265EmptyPacket(t *testing.T) { require.Equal(t, []*rtp.Packet(nil), unit.RTPPackets) } -func TestH265KeyFrameWarning(t *testing.T) { +func TestH265KeyFrameWarning(t *testing.T) { //nolint:dupl forma := &formats.H265{ PayloadTyp: 96, } @@ -181,19 +205,23 @@ func TestH265KeyFrameWarning(t *testing.T) { ntp := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC) err = p.Process(&UnitH265{ + BaseUnit: BaseUnit{ + NTP: ntp, + }, AU: [][]byte{ {0x01}, }, - NTP: ntp, }, false) require.NoError(t, err) ntp = ntp.Add(30 * time.Second) err = p.Process(&UnitH265{ + BaseUnit: BaseUnit{ + NTP: ntp, + }, AU: [][]byte{ {0x01}, }, - NTP: ntp, }, false) require.NoError(t, err) diff --git a/internal/formatprocessor/mpeg2audio.go b/internal/formatprocessor/mpeg2audio.go index 308bbf33..0aaa5e98 100644 --- a/internal/formatprocessor/mpeg2audio.go +++ b/internal/formatprocessor/mpeg2audio.go @@ -13,20 +13,9 @@ import ( // UnitMPEG2Audio is a MPEG-1/2 Audio data unit. type UnitMPEG2Audio struct { - RTPPackets []*rtp.Packet - NTP time.Time - PTS time.Duration - Frames [][]byte -} - -// GetRTPPackets implements Unit. -func (d *UnitMPEG2Audio) GetRTPPackets() []*rtp.Packet { - return d.RTPPackets -} - -// GetNTP implements Unit. -func (d *UnitMPEG2Audio) GetNTP() time.Time { - return d.NTP + BaseUnit + PTS time.Duration + Frames [][]byte } type formatProcessorMPEG2Audio struct { @@ -117,7 +106,9 @@ func (t *formatProcessorMPEG2Audio) Process(unit Unit, hasNonRTSPReaders bool) e func (t *formatProcessorMPEG2Audio) UnitForRTPPacket(pkt *rtp.Packet, ntp time.Time) Unit { return &UnitMPEG2Audio{ - RTPPackets: []*rtp.Packet{pkt}, - NTP: ntp, + BaseUnit: BaseUnit{ + RTPPackets: []*rtp.Packet{pkt}, + NTP: ntp, + }, } } diff --git a/internal/formatprocessor/mpeg4audio_generic.go b/internal/formatprocessor/mpeg4audio_generic.go index 097891dd..044b362d 100644 --- a/internal/formatprocessor/mpeg4audio_generic.go +++ b/internal/formatprocessor/mpeg4audio_generic.go @@ -13,20 +13,9 @@ import ( // UnitMPEG4AudioGeneric is a MPEG-4 Audio data unit. type UnitMPEG4AudioGeneric struct { - RTPPackets []*rtp.Packet - NTP time.Time - PTS time.Duration - AUs [][]byte -} - -// GetRTPPackets implements Unit. -func (d *UnitMPEG4AudioGeneric) GetRTPPackets() []*rtp.Packet { - return d.RTPPackets -} - -// GetNTP implements Unit. -func (d *UnitMPEG4AudioGeneric) GetNTP() time.Time { - return d.NTP + BaseUnit + PTS time.Duration + AUs [][]byte } type formatProcessorMPEG4AudioGeneric struct { @@ -122,7 +111,9 @@ func (t *formatProcessorMPEG4AudioGeneric) Process(unit Unit, hasNonRTSPReaders func (t *formatProcessorMPEG4AudioGeneric) UnitForRTPPacket(pkt *rtp.Packet, ntp time.Time) Unit { return &UnitMPEG4AudioGeneric{ - RTPPackets: []*rtp.Packet{pkt}, - NTP: ntp, + BaseUnit: BaseUnit{ + RTPPackets: []*rtp.Packet{pkt}, + NTP: ntp, + }, } } diff --git a/internal/formatprocessor/mpeg4audio_latm.go b/internal/formatprocessor/mpeg4audio_latm.go index 8676636e..9e51a73b 100644 --- a/internal/formatprocessor/mpeg4audio_latm.go +++ b/internal/formatprocessor/mpeg4audio_latm.go @@ -13,20 +13,9 @@ import ( // UnitMPEG4AudioLATM is a MPEG-4 Audio data unit. type UnitMPEG4AudioLATM struct { - RTPPackets []*rtp.Packet - NTP time.Time - PTS time.Duration - AU []byte -} - -// GetRTPPackets implements Unit. -func (d *UnitMPEG4AudioLATM) GetRTPPackets() []*rtp.Packet { - return d.RTPPackets -} - -// GetNTP implements Unit. -func (d *UnitMPEG4AudioLATM) GetNTP() time.Time { - return d.NTP + BaseUnit + PTS time.Duration + AU []byte } type formatProcessorMPEG4AudioLATM struct { @@ -118,7 +107,9 @@ func (t *formatProcessorMPEG4AudioLATM) Process(unit Unit, hasNonRTSPReaders boo func (t *formatProcessorMPEG4AudioLATM) UnitForRTPPacket(pkt *rtp.Packet, ntp time.Time) Unit { return &UnitMPEG4AudioLATM{ - RTPPackets: []*rtp.Packet{pkt}, - NTP: ntp, + BaseUnit: BaseUnit{ + RTPPackets: []*rtp.Packet{pkt}, + NTP: ntp, + }, } } diff --git a/internal/formatprocessor/opus.go b/internal/formatprocessor/opus.go index 7db741a3..dfb610e0 100644 --- a/internal/formatprocessor/opus.go +++ b/internal/formatprocessor/opus.go @@ -14,20 +14,9 @@ import ( // UnitOpus is a Opus data unit. type UnitOpus struct { - RTPPackets []*rtp.Packet - NTP time.Time - PTS time.Duration - Packets [][]byte -} - -// GetRTPPackets implements Unit. -func (d *UnitOpus) GetRTPPackets() []*rtp.Packet { - return d.RTPPackets -} - -// GetNTP implements Unit. -func (d *UnitOpus) GetNTP() time.Time { - return d.NTP + BaseUnit + PTS time.Duration + Packets [][]byte } type formatProcessorOpus struct { @@ -124,7 +113,9 @@ func (t *formatProcessorOpus) Process(unit Unit, hasNonRTSPReaders bool) error { func (t *formatProcessorOpus) UnitForRTPPacket(pkt *rtp.Packet, ntp time.Time) Unit { return &UnitOpus{ - RTPPackets: []*rtp.Packet{pkt}, - NTP: ntp, + BaseUnit: BaseUnit{ + RTPPackets: []*rtp.Packet{pkt}, + NTP: ntp, + }, } } diff --git a/internal/formatprocessor/unit.go b/internal/formatprocessor/unit.go index c52b6e9a..ceca4800 100644 --- a/internal/formatprocessor/unit.go +++ b/internal/formatprocessor/unit.go @@ -6,6 +6,22 @@ import ( "github.com/pion/rtp" ) +// BaseUnit contains fields shared across all units. +type BaseUnit struct { + RTPPackets []*rtp.Packet + NTP time.Time +} + +// GetRTPPackets implements Unit. +func (u *BaseUnit) GetRTPPackets() []*rtp.Packet { + return u.RTPPackets +} + +// GetNTP implements Unit. +func (u *BaseUnit) GetNTP() time.Time { + return u.NTP +} + // Unit is the elementary data unit routed across the server. type Unit interface { // returns RTP packets contained into the unit. diff --git a/internal/formatprocessor/vp8.go b/internal/formatprocessor/vp8.go index 0c7f2730..5db500ba 100644 --- a/internal/formatprocessor/vp8.go +++ b/internal/formatprocessor/vp8.go @@ -13,20 +13,9 @@ import ( // UnitVP8 is a VP8 data unit. type UnitVP8 struct { - RTPPackets []*rtp.Packet - NTP time.Time - PTS time.Duration - Frame []byte -} - -// GetRTPPackets implements Unit. -func (d *UnitVP8) GetRTPPackets() []*rtp.Packet { - return d.RTPPackets -} - -// GetNTP implements Unit. -func (d *UnitVP8) GetNTP() time.Time { - return d.NTP + BaseUnit + PTS time.Duration + Frame []byte } type formatProcessorVP8 struct { @@ -118,7 +107,9 @@ func (t *formatProcessorVP8) Process(unit Unit, hasNonRTSPReaders bool) error { func (t *formatProcessorVP8) UnitForRTPPacket(pkt *rtp.Packet, ntp time.Time) Unit { return &UnitVP8{ - RTPPackets: []*rtp.Packet{pkt}, - NTP: ntp, + BaseUnit: BaseUnit{ + RTPPackets: []*rtp.Packet{pkt}, + NTP: ntp, + }, } } diff --git a/internal/formatprocessor/vp9.go b/internal/formatprocessor/vp9.go index ce13ecfa..ca255c2f 100644 --- a/internal/formatprocessor/vp9.go +++ b/internal/formatprocessor/vp9.go @@ -13,20 +13,9 @@ import ( // UnitVP9 is a VP9 data unit. type UnitVP9 struct { - RTPPackets []*rtp.Packet - NTP time.Time - PTS time.Duration - Frame []byte -} - -// GetRTPPackets implements Unit. -func (d *UnitVP9) GetRTPPackets() []*rtp.Packet { - return d.RTPPackets -} - -// GetNTP implements Unit. -func (d *UnitVP9) GetNTP() time.Time { - return d.NTP + BaseUnit + PTS time.Duration + Frame []byte } type formatProcessorVP9 struct { @@ -118,7 +107,9 @@ func (t *formatProcessorVP9) Process(unit Unit, hasNonRTSPReaders bool) error { func (t *formatProcessorVP9) UnitForRTPPacket(pkt *rtp.Packet, ntp time.Time) Unit { return &UnitVP9{ - RTPPackets: []*rtp.Packet{pkt}, - NTP: ntp, + BaseUnit: BaseUnit{ + RTPPackets: []*rtp.Packet{pkt}, + NTP: ntp, + }, } }