diff --git a/internal/core/hls_muxer.go b/internal/core/hls_muxer.go index 1cb07970..94cc1ab2 100644 --- a/internal/core/hls_muxer.go +++ b/internal/core/hls_muxer.go @@ -355,21 +355,21 @@ func (m *hlsMuxer) setupVideoMedia(stream *stream) (*media.Media, format.Format) videoStartPTSFilled := false var videoStartPTS time.Duration - stream.readerAdd(m, videoMedia, videoFormatH265, func(dat formatprocessor.Data) { + stream.readerAdd(m, videoMedia, videoFormatH265, func(unit formatprocessor.Unit) { m.ringBuffer.Push(func() error { - tdata := dat.(*formatprocessor.DataH265) + tunit := unit.(*formatprocessor.UnitH265) - if tdata.AU == nil { + if tunit.AU == nil { return nil } if !videoStartPTSFilled { videoStartPTSFilled = true - videoStartPTS = tdata.PTS + videoStartPTS = tunit.PTS } - pts := tdata.PTS - videoStartPTS + pts := tunit.PTS - videoStartPTS - err := m.muxer.WriteH26x(tdata.NTP, pts, tdata.AU) + err := m.muxer.WriteH26x(tunit.NTP, pts, tunit.AU) if err != nil { return fmt.Errorf("muxer error: %v", err) } @@ -388,21 +388,21 @@ func (m *hlsMuxer) setupVideoMedia(stream *stream) (*media.Media, format.Format) videoStartPTSFilled := false var videoStartPTS time.Duration - stream.readerAdd(m, videoMedia, videoFormatH264, func(dat formatprocessor.Data) { + stream.readerAdd(m, videoMedia, videoFormatH264, func(unit formatprocessor.Unit) { m.ringBuffer.Push(func() error { - tdata := dat.(*formatprocessor.DataH264) + tunit := unit.(*formatprocessor.UnitH264) - if tdata.AU == nil { + if tunit.AU == nil { return nil } if !videoStartPTSFilled { videoStartPTSFilled = true - videoStartPTS = tdata.PTS + videoStartPTS = tunit.PTS } - pts := tdata.PTS - videoStartPTS + pts := tunit.PTS - videoStartPTS - err := m.muxer.WriteH26x(tdata.NTP, pts, tdata.AU) + err := m.muxer.WriteH26x(tunit.NTP, pts, tunit.AU) if err != nil { return fmt.Errorf("muxer error: %v", err) } @@ -425,23 +425,23 @@ func (m *hlsMuxer) setupAudioMedia(stream *stream) (*media.Media, format.Format) audioStartPTSFilled := false var audioStartPTS time.Duration - stream.readerAdd(m, audioMedia, audioFormatMPEG4Audio, func(dat formatprocessor.Data) { + stream.readerAdd(m, audioMedia, audioFormatMPEG4Audio, func(unit formatprocessor.Unit) { m.ringBuffer.Push(func() error { - tdata := dat.(*formatprocessor.DataMPEG4Audio) + tunit := unit.(*formatprocessor.UnitMPEG4Audio) - if tdata.AUs == nil { + if tunit.AUs == nil { return nil } if !audioStartPTSFilled { audioStartPTSFilled = true - audioStartPTS = tdata.PTS + audioStartPTS = tunit.PTS } - pts := tdata.PTS - audioStartPTS + pts := tunit.PTS - audioStartPTS - for i, au := range tdata.AUs { + for i, au := range tunit.AUs { err := m.muxer.WriteAudio( - tdata.NTP, + tunit.NTP, pts+time.Duration(i)*mpeg4audio.SamplesPerAccessUnit* time.Second/time.Duration(audioFormatMPEG4Audio.ClockRate()), au) @@ -464,20 +464,20 @@ func (m *hlsMuxer) setupAudioMedia(stream *stream) (*media.Media, format.Format) audioStartPTSFilled := false var audioStartPTS time.Duration - stream.readerAdd(m, audioMedia, audioFormatOpus, func(dat formatprocessor.Data) { + stream.readerAdd(m, audioMedia, audioFormatOpus, func(unit formatprocessor.Unit) { m.ringBuffer.Push(func() error { - tdata := dat.(*formatprocessor.DataOpus) + tunit := unit.(*formatprocessor.UnitOpus) if !audioStartPTSFilled { audioStartPTSFilled = true - audioStartPTS = tdata.PTS + audioStartPTS = tunit.PTS } - pts := tdata.PTS - audioStartPTS + pts := tunit.PTS - audioStartPTS err := m.muxer.WriteAudio( - tdata.NTP, + tunit.NTP, pts, - tdata.Frame) + tunit.Frame) if err != nil { return fmt.Errorf("muxer error: %v", err) } diff --git a/internal/core/hls_source.go b/internal/core/hls_source.go index 74dda62e..cdfda106 100644 --- a/internal/core/hls_source.go +++ b/internal/core/hls_source.go @@ -68,10 +68,10 @@ func (s *hlsSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan case *format.H264: medi.Type = media.TypeVideo - c.OnData(track, func(pts time.Duration, dat interface{}) { - err := stream.writeData(medi, ctrack, &formatprocessor.DataH264{ + c.OnData(track, func(pts time.Duration, unit interface{}) { + err := stream.writeData(medi, ctrack, &formatprocessor.UnitH264{ PTS: pts, - AU: dat.([][]byte), + AU: unit.([][]byte), NTP: time.Now(), }) if err != nil { @@ -82,10 +82,10 @@ func (s *hlsSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan case *format.H265: medi.Type = media.TypeVideo - c.OnData(track, func(pts time.Duration, dat interface{}) { - err := stream.writeData(medi, ctrack, &formatprocessor.DataH265{ + c.OnData(track, func(pts time.Duration, unit interface{}) { + err := stream.writeData(medi, ctrack, &formatprocessor.UnitH265{ PTS: pts, - AU: dat.([][]byte), + AU: unit.([][]byte), NTP: time.Now(), }) if err != nil { @@ -96,10 +96,10 @@ func (s *hlsSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan case *format.MPEG4Audio: medi.Type = media.TypeAudio - c.OnData(track, func(pts time.Duration, dat interface{}) { - err := stream.writeData(medi, ctrack, &formatprocessor.DataMPEG4Audio{ + c.OnData(track, func(pts time.Duration, unit interface{}) { + err := stream.writeData(medi, ctrack, &formatprocessor.UnitMPEG4Audio{ PTS: pts, - AUs: [][]byte{dat.([]byte)}, + AUs: [][]byte{unit.([]byte)}, NTP: time.Now(), }) if err != nil { @@ -110,10 +110,10 @@ func (s *hlsSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan case *format.Opus: medi.Type = media.TypeAudio - c.OnData(track, func(pts time.Duration, dat interface{}) { - err := stream.writeData(medi, ctrack, &formatprocessor.DataOpus{ + c.OnData(track, func(pts time.Duration, unit interface{}) { + err := stream.writeData(medi, ctrack, &formatprocessor.UnitOpus{ PTS: pts, - Frame: dat.([]byte), + Frame: unit.([]byte), NTP: time.Now(), }) if err != nil { diff --git a/internal/core/rpicamera_source.go b/internal/core/rpicamera_source.go index a5916944..66ebc0a7 100644 --- a/internal/core/rpicamera_source.go +++ b/internal/core/rpicamera_source.go @@ -95,7 +95,7 @@ func (s *rpiCameraSource) run(ctx context.Context, cnf *conf.PathConf, reloadCon stream = res.stream } - err := stream.writeData(medi, medi.Formats[0], &formatprocessor.DataH264{ + err := stream.writeData(medi, medi.Formats[0], &formatprocessor.UnitH264{ PTS: dts, AU: au, NTP: time.Now(), diff --git a/internal/core/rtmp_conn.go b/internal/core/rtmp_conn.go index b0ca8ba6..4a2be41c 100644 --- a/internal/core/rtmp_conn.go +++ b/internal/core/rtmp_conn.go @@ -278,24 +278,24 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error { var videoStartPTS time.Duration var videoDTSExtractor *h264.DTSExtractor - res.stream.readerAdd(c, videoMedia, videoFormat, func(dat formatprocessor.Data) { + res.stream.readerAdd(c, videoMedia, videoFormat, func(unit formatprocessor.Unit) { ringBuffer.Push(func() error { - tdata := dat.(*formatprocessor.DataH264) + tunit := unit.(*formatprocessor.UnitH264) - if tdata.AU == nil { + if tunit.AU == nil { return nil } if !videoStartPTSFilled { videoStartPTSFilled = true - videoStartPTS = tdata.PTS + videoStartPTS = tunit.PTS } - pts := tdata.PTS - videoStartPTS + pts := tunit.PTS - videoStartPTS idrPresent := false nonIDRPresent := false - for _, nalu := range tdata.AU { + for _, nalu := range tunit.AU { typ := h264.NALUType(nalu[0] & 0x1F) switch typ { case h264.NALUTypeIDR: @@ -318,7 +318,7 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error { videoDTSExtractor = h264.NewDTSExtractor() var err error - dts, err = videoDTSExtractor.Extract(tdata.AU, pts) + dts, err = videoDTSExtractor.Extract(tunit.AU, pts) if err != nil { return err } @@ -332,7 +332,7 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error { } var err error - dts, err = videoDTSExtractor.Extract(tdata.AU, pts) + dts, err = videoDTSExtractor.Extract(tunit.AU, pts) if err != nil { return err } @@ -341,7 +341,7 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error { pts -= videoStartDTS } - avcc, err := h264.AVCCMarshal(tdata.AU) + avcc, err := h264.AVCCMarshal(tunit.AU) if err != nil { return err } @@ -371,19 +371,19 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error { audioStartPTSFilled := false var audioStartPTS time.Duration - res.stream.readerAdd(c, audioMedia, audioFormat, func(dat formatprocessor.Data) { + res.stream.readerAdd(c, audioMedia, audioFormat, func(unit formatprocessor.Unit) { ringBuffer.Push(func() error { - tdata := dat.(*formatprocessor.DataMPEG4Audio) + tunit := unit.(*formatprocessor.UnitMPEG4Audio) - if tdata.AUs == nil { + if tunit.AUs == nil { return nil } if !audioStartPTSFilled { audioStartPTSFilled = true - audioStartPTS = tdata.PTS + audioStartPTS = tunit.PTS } - pts := tdata.PTS - audioStartPTS + pts := tunit.PTS - audioStartPTS if videoFormat != nil { if !videoFirstIDRFound { @@ -396,7 +396,7 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error { } } - for i, au := range tdata.AUs { + for i, au := range tunit.AUs { c.nconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) err := c.conn.WriteMessage(&message.MsgAudio{ ChunkStreamID: message.MsgAudioChunkStreamID, @@ -542,7 +542,7 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error { if _, ok := videoFormat.(*format.H264); ok { onVideoData = func(pts time.Duration, au [][]byte) { - err = rres.stream.writeData(videoMedia, videoFormat, &formatprocessor.DataH264{ + err = rres.stream.writeData(videoMedia, videoFormat, &formatprocessor.UnitH264{ PTS: pts, AU: au, NTP: time.Now(), @@ -553,7 +553,7 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error { } } else { onVideoData = func(pts time.Duration, au [][]byte) { - err = rres.stream.writeData(videoMedia, videoFormat, &formatprocessor.DataH265{ + err = rres.stream.writeData(videoMedia, videoFormat, &formatprocessor.UnitH265{ PTS: pts, AU: au, NTP: time.Now(), @@ -589,7 +589,7 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error { conf.PPS, } - err := rres.stream.writeData(videoMedia, videoFormat, &formatprocessor.DataH264{ + err := rres.stream.writeData(videoMedia, videoFormat, &formatprocessor.UnitH264{ PTS: tmsg.DTS + tmsg.PTSDelta, AU: au, NTP: time.Now(), @@ -613,7 +613,7 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error { } if tmsg.AACType == flvio.AAC_RAW { - err := rres.stream.writeData(audioMedia, audioFormat, &formatprocessor.DataMPEG4Audio{ + err := rres.stream.writeData(audioMedia, audioFormat, &formatprocessor.UnitMPEG4Audio{ PTS: tmsg.DTS, AUs: [][]byte{tmsg.Payload}, NTP: time.Now(), diff --git a/internal/core/rtmp_source.go b/internal/core/rtmp_source.go index 6e69323e..316f7208 100644 --- a/internal/core/rtmp_source.go +++ b/internal/core/rtmp_source.go @@ -177,7 +177,7 @@ func (s *rtmpSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf cha continue } - err = res.stream.writeData(videoMedia, videoFormat, &formatprocessor.DataH264{ + err = res.stream.writeData(videoMedia, videoFormat, &formatprocessor.UnitH264{ PTS: tmsg.DTS + tmsg.PTSDelta, AU: au, NTP: time.Now(), @@ -193,7 +193,7 @@ func (s *rtmpSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf cha return fmt.Errorf("received an AAC packet, but track is not set up") } - err := res.stream.writeData(audioMedia, audioFormat, &formatprocessor.DataMPEG4Audio{ + err := res.stream.writeData(audioMedia, audioFormat, &formatprocessor.UnitMPEG4Audio{ PTS: tmsg.DTS, AUs: [][]byte{tmsg.Payload}, NTP: time.Now(), diff --git a/internal/core/rtsp_session.go b/internal/core/rtsp_session.go index e40a5487..efbcb009 100644 --- a/internal/core/rtsp_session.go +++ b/internal/core/rtsp_session.go @@ -327,7 +327,7 @@ func (s *rtspSession) onRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.R switch forma.(type) { case *format.H264: ctx.Session.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { - err := s.stream.writeData(cmedia, cformat, &formatprocessor.DataH264{ + err := s.stream.writeData(cmedia, cformat, &formatprocessor.UnitH264{ RTPPackets: []*rtp.Packet{pkt}, NTP: time.Now(), }) @@ -338,7 +338,7 @@ func (s *rtspSession) onRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.R case *format.H265: ctx.Session.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { - err := s.stream.writeData(cmedia, cformat, &formatprocessor.DataH265{ + err := s.stream.writeData(cmedia, cformat, &formatprocessor.UnitH265{ RTPPackets: []*rtp.Packet{pkt}, NTP: time.Now(), }) @@ -349,7 +349,7 @@ func (s *rtspSession) onRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.R case *format.VP8: ctx.Session.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { - err := s.stream.writeData(cmedia, cformat, &formatprocessor.DataVP8{ + err := s.stream.writeData(cmedia, cformat, &formatprocessor.UnitVP8{ RTPPackets: []*rtp.Packet{pkt}, NTP: time.Now(), }) @@ -360,7 +360,7 @@ func (s *rtspSession) onRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.R case *format.VP9: ctx.Session.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { - err := s.stream.writeData(cmedia, cformat, &formatprocessor.DataVP9{ + err := s.stream.writeData(cmedia, cformat, &formatprocessor.UnitVP9{ RTPPackets: []*rtp.Packet{pkt}, NTP: time.Now(), }) @@ -371,7 +371,7 @@ func (s *rtspSession) onRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.R case *format.MPEG4Audio: ctx.Session.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { - err := s.stream.writeData(cmedia, cformat, &formatprocessor.DataMPEG4Audio{ + err := s.stream.writeData(cmedia, cformat, &formatprocessor.UnitMPEG4Audio{ RTPPackets: []*rtp.Packet{pkt}, NTP: time.Now(), }) @@ -382,7 +382,7 @@ func (s *rtspSession) onRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.R case *format.Opus: ctx.Session.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { - err := s.stream.writeData(cmedia, cformat, &formatprocessor.DataOpus{ + err := s.stream.writeData(cmedia, cformat, &formatprocessor.UnitOpus{ RTPPackets: []*rtp.Packet{pkt}, NTP: time.Now(), }) @@ -393,7 +393,7 @@ func (s *rtspSession) onRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.R default: ctx.Session.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { - err := s.stream.writeData(cmedia, cformat, &formatprocessor.DataGeneric{ + err := s.stream.writeData(cmedia, cformat, &formatprocessor.UnitGeneric{ RTPPackets: []*rtp.Packet{pkt}, NTP: time.Now(), }) diff --git a/internal/core/rtsp_source.go b/internal/core/rtsp_source.go index 79f655e0..aa0981c4 100644 --- a/internal/core/rtsp_source.go +++ b/internal/core/rtsp_source.go @@ -139,7 +139,7 @@ func (s *rtspSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf cha switch forma.(type) { case *format.H264: c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { - err := res.stream.writeData(cmedia, cformat, &formatprocessor.DataH264{ + err := res.stream.writeData(cmedia, cformat, &formatprocessor.UnitH264{ RTPPackets: []*rtp.Packet{pkt}, NTP: time.Now(), }) @@ -150,7 +150,7 @@ func (s *rtspSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf cha case *format.H265: c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { - err := res.stream.writeData(cmedia, cformat, &formatprocessor.DataH265{ + err := res.stream.writeData(cmedia, cformat, &formatprocessor.UnitH265{ RTPPackets: []*rtp.Packet{pkt}, NTP: time.Now(), }) @@ -161,7 +161,7 @@ func (s *rtspSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf cha case *format.VP8: c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { - err := res.stream.writeData(cmedia, cformat, &formatprocessor.DataVP8{ + err := res.stream.writeData(cmedia, cformat, &formatprocessor.UnitVP8{ RTPPackets: []*rtp.Packet{pkt}, NTP: time.Now(), }) @@ -172,7 +172,7 @@ func (s *rtspSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf cha case *format.VP9: c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { - err := res.stream.writeData(cmedia, cformat, &formatprocessor.DataVP9{ + err := res.stream.writeData(cmedia, cformat, &formatprocessor.UnitVP9{ RTPPackets: []*rtp.Packet{pkt}, NTP: time.Now(), }) @@ -183,7 +183,7 @@ func (s *rtspSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf cha case *format.MPEG4Audio: c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { - err := res.stream.writeData(cmedia, cformat, &formatprocessor.DataMPEG4Audio{ + err := res.stream.writeData(cmedia, cformat, &formatprocessor.UnitMPEG4Audio{ RTPPackets: []*rtp.Packet{pkt}, NTP: time.Now(), }) @@ -194,7 +194,7 @@ func (s *rtspSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf cha case *format.Opus: c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { - err := res.stream.writeData(cmedia, cformat, &formatprocessor.DataOpus{ + err := res.stream.writeData(cmedia, cformat, &formatprocessor.UnitOpus{ RTPPackets: []*rtp.Packet{pkt}, NTP: time.Now(), }) @@ -205,7 +205,7 @@ func (s *rtspSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf cha default: c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { - err := res.stream.writeData(cmedia, cformat, &formatprocessor.DataGeneric{ + err := res.stream.writeData(cmedia, cformat, &formatprocessor.UnitGeneric{ RTPPackets: []*rtp.Packet{pkt}, NTP: time.Now(), }) diff --git a/internal/core/stream.go b/internal/core/stream.go index b6eb8b34..54dfea7a 100644 --- a/internal/core/stream.go +++ b/internal/core/stream.go @@ -45,7 +45,7 @@ func (s *stream) medias() media.Medias { return s.rtspStream.Medias() } -func (s *stream) readerAdd(r reader, medi *media.Media, forma format.Format, cb func(formatprocessor.Data)) { +func (s *stream) readerAdd(r reader, medi *media.Media, forma format.Format, cb func(formatprocessor.Unit)) { sm := s.smedias[medi] sf := sm.formats[forma] sf.readerAdd(r, cb) @@ -59,7 +59,7 @@ func (s *stream) readerRemove(r reader) { } } -func (s *stream) writeData(medi *media.Media, forma format.Format, data formatprocessor.Data) error { +func (s *stream) writeData(medi *media.Media, forma format.Format, data formatprocessor.Unit) error { sm := s.smedias[medi] sf := sm.formats[forma] return sf.writeData(s, medi, data) diff --git a/internal/core/stream_format.go b/internal/core/stream_format.go index c109ae35..1b5a66d8 100644 --- a/internal/core/stream_format.go +++ b/internal/core/stream_format.go @@ -13,7 +13,7 @@ import ( type streamFormat struct { proc formatprocessor.Processor mutex sync.RWMutex - nonRTSPReaders map[reader]func(formatprocessor.Data) + nonRTSPReaders map[reader]func(formatprocessor.Unit) } func newStreamFormat(forma format.Format, generateRTPPackets bool) (*streamFormat, error) { @@ -24,13 +24,13 @@ func newStreamFormat(forma format.Format, generateRTPPackets bool) (*streamForma sf := &streamFormat{ proc: proc, - nonRTSPReaders: make(map[reader]func(formatprocessor.Data)), + nonRTSPReaders: make(map[reader]func(formatprocessor.Unit)), } return sf, nil } -func (sf *streamFormat) readerAdd(r reader, cb func(formatprocessor.Data)) { +func (sf *streamFormat) readerAdd(r reader, cb func(formatprocessor.Unit)) { sf.mutex.Lock() defer sf.mutex.Unlock() sf.nonRTSPReaders[r] = cb @@ -42,7 +42,7 @@ func (sf *streamFormat) readerRemove(r reader) { delete(sf.nonRTSPReaders, r) } -func (sf *streamFormat) writeData(s *stream, medi *media.Media, data formatprocessor.Data) error { +func (sf *streamFormat) writeData(s *stream, medi *media.Media, data formatprocessor.Unit) error { sf.mutex.RLock() defer sf.mutex.RUnlock() diff --git a/internal/core/webrtc_conn.go b/internal/core/webrtc_conn.go index 8daf437e..d933b978 100644 --- a/internal/core/webrtc_conn.go +++ b/internal/core/webrtc_conn.go @@ -66,7 +66,7 @@ type webRTCTrack struct { media *media.Media format format.Format webRTCTrack *webrtc.TrackLocalStaticRTP - cb func(formatprocessor.Data, context.Context, chan error) + cb func(formatprocessor.Unit, context.Context, chan error) } func gatherMedias(tracks []*webRTCTrack) media.Medias { @@ -502,9 +502,9 @@ outer: for _, track := range tracks { ctrack := track - res.stream.readerAdd(c, track.media, track.format, func(dat formatprocessor.Data) { + res.stream.readerAdd(c, track.media, track.format, func(unit formatprocessor.Unit) { ringBuffer.Push(func() { - ctrack.cb(dat, ctx, writeError) + ctrack.cb(unit, ctx, writeError) }) }) } @@ -567,14 +567,14 @@ func (c *webRTCConn) allocateTracks(medias media.Medias) ([]*webRTCTrack, error) media: vp9Media, format: vp9Format, webRTCTrack: webRTCTrak, - cb: func(dat formatprocessor.Data, ctx context.Context, writeError chan error) { - tdata := dat.(*formatprocessor.DataVP9) + cb: func(unit formatprocessor.Unit, ctx context.Context, writeError chan error) { + tunit := unit.(*formatprocessor.UnitVP9) - if tdata.Frame == nil { + if tunit.Frame == nil { return } - packets, err := encoder.Encode(tdata.Frame, tdata.PTS) + packets, err := encoder.Encode(tunit.Frame, tunit.PTS) if err != nil { return } @@ -614,14 +614,14 @@ func (c *webRTCConn) allocateTracks(medias media.Medias) ([]*webRTCTrack, error) media: vp8Media, format: vp8Format, webRTCTrack: webRTCTrak, - cb: func(dat formatprocessor.Data, ctx context.Context, writeError chan error) { - tdata := dat.(*formatprocessor.DataVP8) + cb: func(unit formatprocessor.Unit, ctx context.Context, writeError chan error) { + tunit := unit.(*formatprocessor.UnitVP8) - if tdata.Frame == nil { + if tunit.Frame == nil { return } - packets, err := encoder.Encode(tdata.Frame, tdata.PTS) + packets, err := encoder.Encode(tunit.Frame, tunit.PTS) if err != nil { return } @@ -664,28 +664,28 @@ func (c *webRTCConn) allocateTracks(medias media.Medias) ([]*webRTCTrack, error) media: h264Media, format: h264Format, webRTCTrack: webRTCTrak, - cb: func(dat formatprocessor.Data, ctx context.Context, writeError chan error) { - tdata := dat.(*formatprocessor.DataH264) + cb: func(unit formatprocessor.Unit, ctx context.Context, writeError chan error) { + tunit := unit.(*formatprocessor.UnitH264) - if tdata.AU == nil { + if tunit.AU == nil { return } if !firstNALUReceived { firstNALUReceived = true - lastPTS = tdata.PTS + lastPTS = tunit.PTS } else { - if tdata.PTS < lastPTS { + if tunit.PTS < lastPTS { select { case writeError <- fmt.Errorf("WebRTC doesn't support H264 streams with B-frames"): case <-ctx.Done(): } return } - lastPTS = tdata.PTS + lastPTS = tunit.PTS } - packets, err := encoder.Encode(tdata.AU, tdata.PTS) + packets, err := encoder.Encode(tunit.AU, tunit.PTS) if err != nil { return } @@ -718,8 +718,8 @@ func (c *webRTCConn) allocateTracks(medias media.Medias) ([]*webRTCTrack, error) media: opusMedia, format: opusFormat, webRTCTrack: webRTCTrak, - cb: func(dat formatprocessor.Data, ctx context.Context, writeError chan error) { - for _, pkt := range dat.GetRTPPackets() { + cb: func(unit formatprocessor.Unit, ctx context.Context, writeError chan error) { + for _, pkt := range unit.GetRTPPackets() { webRTCTrak.WriteRTP(pkt) } }, @@ -748,8 +748,8 @@ func (c *webRTCConn) allocateTracks(medias media.Medias) ([]*webRTCTrack, error) media: g722Media, format: g722Format, webRTCTrack: webRTCTrak, - cb: func(dat formatprocessor.Data, ctx context.Context, writeError chan error) { - for _, pkt := range dat.GetRTPPackets() { + cb: func(unit formatprocessor.Unit, ctx context.Context, writeError chan error) { + for _, pkt := range unit.GetRTPPackets() { webRTCTrak.WriteRTP(pkt) } }, @@ -786,8 +786,8 @@ func (c *webRTCConn) allocateTracks(medias media.Medias) ([]*webRTCTrack, error) media: g711Media, format: g711Format, webRTCTrack: webRTCTrak, - cb: func(dat formatprocessor.Data, ctx context.Context, writeError chan error) { - for _, pkt := range dat.GetRTPPackets() { + cb: func(unit formatprocessor.Unit, ctx context.Context, writeError chan error) { + for _, pkt := range unit.GetRTPPackets() { webRTCTrak.WriteRTP(pkt) } }, diff --git a/internal/formatprocessor/generic.go b/internal/formatprocessor/generic.go index 4901f823..142175e0 100644 --- a/internal/formatprocessor/generic.go +++ b/internal/formatprocessor/generic.go @@ -13,19 +13,19 @@ const ( maxPacketSize = 1472 ) -// DataGeneric is a generic data unit. -type DataGeneric struct { +// UnitGeneric is a generic data unit. +type UnitGeneric struct { RTPPackets []*rtp.Packet NTP time.Time } -// GetRTPPackets implements Data. -func (d *DataGeneric) GetRTPPackets() []*rtp.Packet { +// GetRTPPackets implements Unit. +func (d *UnitGeneric) GetRTPPackets() []*rtp.Packet { return d.RTPPackets } -// GetNTP implements Data. -func (d *DataGeneric) GetNTP() time.Time { +// GetNTP implements Unit. +func (d *UnitGeneric) GetNTP() time.Time { return d.NTP } @@ -39,10 +39,10 @@ func newGeneric(forma format.Format, generateRTPPackets bool) (*formatProcessorG return &formatProcessorGeneric{}, nil } -func (t *formatProcessorGeneric) Process(dat Data, hasNonRTSPReaders bool) error { - tdata := dat.(*DataGeneric) +func (t *formatProcessorGeneric) Process(unit Unit, hasNonRTSPReaders bool) error { + tunit := unit.(*UnitGeneric) - pkt := tdata.RTPPackets[0] + pkt := tunit.RTPPackets[0] // remove padding pkt.Header.Padding = false diff --git a/internal/formatprocessor/generic_test.go b/internal/formatprocessor/generic_test.go index e7d34fd4..6f3ac1fb 100644 --- a/internal/formatprocessor/generic_test.go +++ b/internal/formatprocessor/generic_test.go @@ -32,7 +32,7 @@ func TestGenericRemovePadding(t *testing.T) { PaddingSize: 20, } - err = p.Process(&DataGeneric{ + err = p.Process(&UnitGeneric{ RTPPackets: []*rtp.Packet{pkt}, }, false) require.NoError(t, err) diff --git a/internal/formatprocessor/h264.go b/internal/formatprocessor/h264.go index 240f495d..f557ddee 100644 --- a/internal/formatprocessor/h264.go +++ b/internal/formatprocessor/h264.go @@ -67,21 +67,21 @@ func rtpH264ExtractSPSPPS(pkt *rtp.Packet) ([]byte, []byte) { } } -// DataH264 is a H264 data unit. -type DataH264 struct { +// UnitH264 is a H264 data unit. +type UnitH264 struct { RTPPackets []*rtp.Packet NTP time.Time PTS time.Duration AU [][]byte } -// GetRTPPackets implements Data. -func (d *DataH264) GetRTPPackets() []*rtp.Packet { +// GetRTPPackets implements Unit. +func (d *UnitH264) GetRTPPackets() []*rtp.Packet { return d.RTPPackets } -// GetNTP implements Data. -func (d *DataH264) GetNTP() time.Time { +// GetNTP implements Unit. +func (d *UnitH264) GetNTP() time.Time { return d.NTP } @@ -198,11 +198,11 @@ func (t *formatProcessorH264) remuxAccessUnit(nalus [][]byte) [][]byte { return filteredNALUs } -func (t *formatProcessorH264) Process(dat Data, hasNonRTSPReaders bool) error { //nolint:dupl - tdata := dat.(*DataH264) +func (t *formatProcessorH264) Process(unit Unit, hasNonRTSPReaders bool) error { //nolint:dupl + tunit := unit.(*UnitH264) - if tdata.RTPPackets != nil { - pkt := tdata.RTPPackets[0] + if tunit.RTPPackets != nil { + pkt := tunit.RTPPackets[0] t.updateTrackParametersFromRTPPacket(pkt) if t.encoder == nil { @@ -233,7 +233,7 @@ func (t *formatProcessorH264) Process(dat Data, hasNonRTSPReaders bool) error { } if t.encoder != nil { - tdata.RTPPackets = nil + tunit.RTPPackets = nil } // DecodeUntilMarker() is necessary, otherwise Encode() generates partial groups @@ -245,9 +245,9 @@ func (t *formatProcessorH264) Process(dat Data, hasNonRTSPReaders bool) error { return err } - tdata.AU = au - tdata.PTS = PTS - tdata.AU = t.remuxAccessUnit(tdata.AU) + tunit.AU = au + tunit.PTS = PTS + tunit.AU = t.remuxAccessUnit(tunit.AU) } // route packet as is @@ -255,18 +255,18 @@ func (t *formatProcessorH264) Process(dat Data, hasNonRTSPReaders bool) error { return nil } } else { - t.updateTrackParametersFromNALUs(tdata.AU) - tdata.AU = t.remuxAccessUnit(tdata.AU) + t.updateTrackParametersFromNALUs(tunit.AU) + tunit.AU = t.remuxAccessUnit(tunit.AU) } - if len(tdata.AU) != 0 { - pkts, err := t.encoder.Encode(tdata.AU, tdata.PTS) + if len(tunit.AU) != 0 { + pkts, err := t.encoder.Encode(tunit.AU, tunit.PTS) if err != nil { return err } - tdata.RTPPackets = pkts + tunit.RTPPackets = pkts } else { - tdata.RTPPackets = nil + tunit.RTPPackets = nil } return nil diff --git a/internal/formatprocessor/h264_test.go b/internal/formatprocessor/h264_test.go index f796072a..3d2cab08 100644 --- a/internal/formatprocessor/h264_test.go +++ b/internal/formatprocessor/h264_test.go @@ -23,7 +23,7 @@ func TestH264DynamicParams(t *testing.T) { pkts, err := enc.Encode([][]byte{{byte(h264.NALUTypeIDR)}}, 0) require.NoError(t, err) - data := &DataH264{RTPPackets: []*rtp.Packet{pkts[0]}} + data := &UnitH264{RTPPackets: []*rtp.Packet{pkts[0]}} p.Process(data, true) require.Equal(t, [][]byte{ @@ -32,18 +32,18 @@ func TestH264DynamicParams(t *testing.T) { pkts, err = enc.Encode([][]byte{{7, 4, 5, 6}}, 0) // SPS require.NoError(t, err) - p.Process(&DataH264{RTPPackets: []*rtp.Packet{pkts[0]}}, false) + p.Process(&UnitH264{RTPPackets: []*rtp.Packet{pkts[0]}}, false) pkts, err = enc.Encode([][]byte{{8, 1}}, 0) // PPS require.NoError(t, err) - p.Process(&DataH264{RTPPackets: []*rtp.Packet{pkts[0]}}, false) + p.Process(&UnitH264{RTPPackets: []*rtp.Packet{pkts[0]}}, false) require.Equal(t, []byte{7, 4, 5, 6}, forma.SPS) require.Equal(t, []byte{8, 1}, forma.PPS) pkts, err = enc.Encode([][]byte{{byte(h264.NALUTypeIDR)}}, 0) require.NoError(t, err) - data = &DataH264{RTPPackets: []*rtp.Packet{pkts[0]}} + data = &UnitH264{RTPPackets: []*rtp.Packet{pkts[0]}} p.Process(data, true) require.Equal(t, [][]byte{ @@ -104,7 +104,7 @@ func TestH264OversizedPackets(t *testing.T) { Payload: []byte{0x1c, 0b01000000, 0x01, 0x02, 0x03, 0x04}, }, } { - data := &DataH264{RTPPackets: []*rtp.Packet{pkt}} + data := &UnitH264{RTPPackets: []*rtp.Packet{pkt}} p.Process(data, false) out = append(out, data.RTPPackets...) } @@ -161,7 +161,7 @@ func TestH264EmptyPacket(t *testing.T) { p, err := New(forma, true) require.NoError(t, err) - unit := &DataH264{ + unit := &UnitH264{ AU: [][]byte{ {0x07, 0x01, 0x02, 0x03}, // SPS {0x08, 0x01, 0x02}, // PPS diff --git a/internal/formatprocessor/h265.go b/internal/formatprocessor/h265.go index d420820c..6fb804b2 100644 --- a/internal/formatprocessor/h265.go +++ b/internal/formatprocessor/h265.go @@ -74,21 +74,21 @@ func rtpH265ExtractVPSSPSPPS(pkt *rtp.Packet) ([]byte, []byte, []byte) { } } -// DataH265 is a H265 data unit. -type DataH265 struct { +// UnitH265 is a H265 data unit. +type UnitH265 struct { RTPPackets []*rtp.Packet NTP time.Time PTS time.Duration AU [][]byte } -// GetRTPPackets implements Data. -func (d *DataH265) GetRTPPackets() []*rtp.Packet { +// GetRTPPackets implements Unit. +func (d *UnitH265) GetRTPPackets() []*rtp.Packet { return d.RTPPackets } -// GetNTP implements Data. -func (d *DataH265) GetNTP() time.Time { +// GetNTP implements Unit. +func (d *UnitH265) GetNTP() time.Time { return d.NTP } @@ -219,11 +219,11 @@ func (t *formatProcessorH265) remuxAccessUnit(nalus [][]byte) [][]byte { return filteredNALUs } -func (t *formatProcessorH265) Process(dat Data, hasNonRTSPReaders bool) error { //nolint:dupl - tdata := dat.(*DataH265) +func (t *formatProcessorH265) Process(unit Unit, hasNonRTSPReaders bool) error { //nolint:dupl + tunit := unit.(*UnitH265) - if tdata.RTPPackets != nil { - pkt := tdata.RTPPackets[0] + if tunit.RTPPackets != nil { + pkt := tunit.RTPPackets[0] t.updateTrackParametersFromRTPPacket(pkt) if t.encoder == nil { @@ -254,7 +254,7 @@ func (t *formatProcessorH265) Process(dat Data, hasNonRTSPReaders bool) error { } if t.encoder != nil { - tdata.RTPPackets = nil + tunit.RTPPackets = nil } // DecodeUntilMarker() is necessary, otherwise Encode() generates partial groups @@ -266,9 +266,9 @@ func (t *formatProcessorH265) Process(dat Data, hasNonRTSPReaders bool) error { return err } - tdata.AU = au - tdata.PTS = PTS - tdata.AU = t.remuxAccessUnit(tdata.AU) + tunit.AU = au + tunit.PTS = PTS + tunit.AU = t.remuxAccessUnit(tunit.AU) } // route packet as is @@ -276,18 +276,18 @@ func (t *formatProcessorH265) Process(dat Data, hasNonRTSPReaders bool) error { return nil } } else { - t.updateTrackParametersFromNALUs(tdata.AU) - tdata.AU = t.remuxAccessUnit(tdata.AU) + t.updateTrackParametersFromNALUs(tunit.AU) + tunit.AU = t.remuxAccessUnit(tunit.AU) } - if len(tdata.AU) != 0 { - pkts, err := t.encoder.Encode(tdata.AU, tdata.PTS) + if len(tunit.AU) != 0 { + pkts, err := t.encoder.Encode(tunit.AU, tunit.PTS) if err != nil { return err } - tdata.RTPPackets = pkts + tunit.RTPPackets = pkts } else { - tdata.RTPPackets = nil + tunit.RTPPackets = nil } return nil diff --git a/internal/formatprocessor/h265_test.go b/internal/formatprocessor/h265_test.go index 18832e8c..2b9687ec 100644 --- a/internal/formatprocessor/h265_test.go +++ b/internal/formatprocessor/h265_test.go @@ -22,7 +22,7 @@ func TestH265DynamicParams(t *testing.T) { pkts, err := enc.Encode([][]byte{{byte(h265.NALUType_CRA_NUT) << 1, 0}}, 0) require.NoError(t, err) - data := &DataH265{RTPPackets: []*rtp.Packet{pkts[0]}} + data := &UnitH265{RTPPackets: []*rtp.Packet{pkts[0]}} p.Process(data, true) require.Equal(t, [][]byte{ @@ -31,15 +31,15 @@ func TestH265DynamicParams(t *testing.T) { pkts, err = enc.Encode([][]byte{{byte(h265.NALUType_VPS_NUT) << 1, 1, 2, 3}}, 0) require.NoError(t, err) - p.Process(&DataH265{RTPPackets: []*rtp.Packet{pkts[0]}}, false) + p.Process(&UnitH265{RTPPackets: []*rtp.Packet{pkts[0]}}, false) pkts, err = enc.Encode([][]byte{{byte(h265.NALUType_SPS_NUT) << 1, 4, 5, 6}}, 0) require.NoError(t, err) - p.Process(&DataH265{RTPPackets: []*rtp.Packet{pkts[0]}}, false) + p.Process(&UnitH265{RTPPackets: []*rtp.Packet{pkts[0]}}, false) pkts, err = enc.Encode([][]byte{{byte(h265.NALUType_PPS_NUT) << 1, 7, 8, 9}}, 0) require.NoError(t, err) - p.Process(&DataH265{RTPPackets: []*rtp.Packet{pkts[0]}}, false) + p.Process(&UnitH265{RTPPackets: []*rtp.Packet{pkts[0]}}, false) require.Equal(t, []byte{byte(h265.NALUType_VPS_NUT) << 1, 1, 2, 3}, forma.VPS) require.Equal(t, []byte{byte(h265.NALUType_SPS_NUT) << 1, 4, 5, 6}, forma.SPS) @@ -47,7 +47,7 @@ func TestH265DynamicParams(t *testing.T) { pkts, err = enc.Encode([][]byte{{byte(h265.NALUType_CRA_NUT) << 1, 0}}, 0) require.NoError(t, err) - data = &DataH265{RTPPackets: []*rtp.Packet{pkts[0]}} + data = &UnitH265{RTPPackets: []*rtp.Packet{pkts[0]}} p.Process(data, true) require.Equal(t, [][]byte{ @@ -97,7 +97,7 @@ func TestH265OversizedPackets(t *testing.T) { Payload: bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04}, 2000/4), }, } { - data := &DataH265{RTPPackets: []*rtp.Packet{pkt}} + data := &UnitH265{RTPPackets: []*rtp.Packet{pkt}} p.Process(data, false) out = append(out, data.RTPPackets...) } @@ -153,7 +153,7 @@ func TestH265EmptyPacket(t *testing.T) { p, err := New(forma, true) require.NoError(t, err) - unit := &DataH265{ + unit := &UnitH265{ AU: [][]byte{ {byte(h265.NALUType_VPS_NUT) << 1, 10, 11, 12}, // VPS {byte(h265.NALUType_SPS_NUT) << 1, 13, 14, 15}, // SPS diff --git a/internal/formatprocessor/mpeg4audio.go b/internal/formatprocessor/mpeg4audio.go index df6464a0..6fd76e83 100644 --- a/internal/formatprocessor/mpeg4audio.go +++ b/internal/formatprocessor/mpeg4audio.go @@ -9,21 +9,21 @@ import ( "github.com/pion/rtp" ) -// DataMPEG4Audio is a MPEG4-audio data unit. -type DataMPEG4Audio struct { +// UnitMPEG4Audio is a MPEG4-audio data unit. +type UnitMPEG4Audio struct { RTPPackets []*rtp.Packet NTP time.Time PTS time.Duration AUs [][]byte } -// GetRTPPackets implements Data. -func (d *DataMPEG4Audio) GetRTPPackets() []*rtp.Packet { +// GetRTPPackets implements Unit. +func (d *UnitMPEG4Audio) GetRTPPackets() []*rtp.Packet { return d.RTPPackets } -// GetNTP implements Data. -func (d *DataMPEG4Audio) GetNTP() time.Time { +// GetNTP implements Unit. +func (d *UnitMPEG4Audio) GetNTP() time.Time { return d.NTP } @@ -48,11 +48,11 @@ func newMPEG4Audio( return t, nil } -func (t *formatProcessorMPEG4Audio) Process(dat Data, hasNonRTSPReaders bool) error { //nolint:dupl - tdata := dat.(*DataMPEG4Audio) +func (t *formatProcessorMPEG4Audio) Process(unit Unit, hasNonRTSPReaders bool) error { //nolint:dupl + tunit := unit.(*UnitMPEG4Audio) - if tdata.RTPPackets != nil { - pkt := tdata.RTPPackets[0] + if tunit.RTPPackets != nil { + pkt := tunit.RTPPackets[0] // remove padding pkt.Header.Padding = false @@ -77,19 +77,19 @@ func (t *formatProcessorMPEG4Audio) Process(dat Data, hasNonRTSPReaders bool) er return err } - tdata.AUs = aus - tdata.PTS = PTS + tunit.AUs = aus + tunit.PTS = PTS } // route packet as is return nil } - pkts, err := t.encoder.Encode(tdata.AUs, tdata.PTS) + pkts, err := t.encoder.Encode(tunit.AUs, tunit.PTS) if err != nil { return err } - tdata.RTPPackets = pkts + tunit.RTPPackets = pkts return nil } diff --git a/internal/formatprocessor/opus.go b/internal/formatprocessor/opus.go index 88d48bec..95c1c707 100644 --- a/internal/formatprocessor/opus.go +++ b/internal/formatprocessor/opus.go @@ -9,21 +9,21 @@ import ( "github.com/pion/rtp" ) -// DataOpus is a Opus data unit. -type DataOpus struct { +// UnitOpus is a Opus data unit. +type UnitOpus struct { RTPPackets []*rtp.Packet NTP time.Time PTS time.Duration Frame []byte } -// GetRTPPackets implements Data. -func (d *DataOpus) GetRTPPackets() []*rtp.Packet { +// GetRTPPackets implements Unit. +func (d *UnitOpus) GetRTPPackets() []*rtp.Packet { return d.RTPPackets } -// GetNTP implements Data. -func (d *DataOpus) GetNTP() time.Time { +// GetNTP implements Unit. +func (d *UnitOpus) GetNTP() time.Time { return d.NTP } @@ -48,11 +48,11 @@ func newOpus( return t, nil } -func (t *formatProcessorOpus) Process(dat Data, hasNonRTSPReaders bool) error { //nolint:dupl - tdata := dat.(*DataOpus) +func (t *formatProcessorOpus) Process(unit Unit, hasNonRTSPReaders bool) error { //nolint:dupl + tunit := unit.(*UnitOpus) - if tdata.RTPPackets != nil { - pkt := tdata.RTPPackets[0] + if tunit.RTPPackets != nil { + pkt := tunit.RTPPackets[0] // remove padding pkt.Header.Padding = false @@ -74,19 +74,19 @@ func (t *formatProcessorOpus) Process(dat Data, hasNonRTSPReaders bool) error { return err } - tdata.Frame = frame - tdata.PTS = PTS + tunit.Frame = frame + tunit.PTS = PTS } // route packet as is return nil } - pkt, err := t.encoder.Encode(tdata.Frame, tdata.PTS) + pkt, err := t.encoder.Encode(tunit.Frame, tunit.PTS) if err != nil { return err } - tdata.RTPPackets = []*rtp.Packet{pkt} + tunit.RTPPackets = []*rtp.Packet{pkt} return nil } diff --git a/internal/formatprocessor/processor.go b/internal/formatprocessor/processor.go index 07ed56a3..5bad09c2 100644 --- a/internal/formatprocessor/processor.go +++ b/internal/formatprocessor/processor.go @@ -2,23 +2,13 @@ package formatprocessor import ( - "time" - - "github.com/pion/rtp" - "github.com/aler9/gortsplib/v2/pkg/format" ) -// Data is the elementary data unit routed across the server. -type Data interface { - GetRTPPackets() []*rtp.Packet - GetNTP() time.Time -} - // Processor allows to cleanup and normalize streams. type Processor interface { // clears and normalizes a data unit. - Process(Data, bool) error + Process(Unit, bool) error } // New allocates a Processor. diff --git a/internal/formatprocessor/unit.go b/internal/formatprocessor/unit.go new file mode 100644 index 00000000..0f878d99 --- /dev/null +++ b/internal/formatprocessor/unit.go @@ -0,0 +1,13 @@ +package formatprocessor + +import ( + "time" + + "github.com/pion/rtp" +) + +// Unit is the elementary data unit routed across the server. +type Unit interface { + GetRTPPackets() []*rtp.Packet + GetNTP() time.Time +} diff --git a/internal/formatprocessor/vp8.go b/internal/formatprocessor/vp8.go index 57822847..c860f3f9 100644 --- a/internal/formatprocessor/vp8.go +++ b/internal/formatprocessor/vp8.go @@ -9,21 +9,21 @@ import ( "github.com/pion/rtp" ) -// DataVP8 is a VP8 data unit. -type DataVP8 struct { +// UnitVP8 is a VP8 data unit. +type UnitVP8 struct { RTPPackets []*rtp.Packet NTP time.Time PTS time.Duration Frame []byte } -// GetRTPPackets implements Data. -func (d *DataVP8) GetRTPPackets() []*rtp.Packet { +// GetRTPPackets implements Unit. +func (d *UnitVP8) GetRTPPackets() []*rtp.Packet { return d.RTPPackets } -// GetNTP implements Data. -func (d *DataVP8) GetNTP() time.Time { +// GetNTP implements Unit. +func (d *UnitVP8) GetNTP() time.Time { return d.NTP } @@ -48,11 +48,11 @@ func newVP8( return t, nil } -func (t *formatProcessorVP8) Process(dat Data, hasNonRTSPReaders bool) error { //nolint:dupl - tdata := dat.(*DataVP8) +func (t *formatProcessorVP8) Process(unit Unit, hasNonRTSPReaders bool) error { //nolint:dupl + tunit := unit.(*UnitVP8) - if tdata.RTPPackets != nil { - pkt := tdata.RTPPackets[0] + if tunit.RTPPackets != nil { + pkt := tunit.RTPPackets[0] // remove padding pkt.Header.Padding = false @@ -77,19 +77,19 @@ func (t *formatProcessorVP8) Process(dat Data, hasNonRTSPReaders bool) error { / return err } - tdata.Frame = frame - tdata.PTS = PTS + tunit.Frame = frame + tunit.PTS = PTS } // route packet as is return nil } - pkts, err := t.encoder.Encode(tdata.Frame, tdata.PTS) + pkts, err := t.encoder.Encode(tunit.Frame, tunit.PTS) if err != nil { return err } - tdata.RTPPackets = pkts + tunit.RTPPackets = pkts return nil } diff --git a/internal/formatprocessor/vp9.go b/internal/formatprocessor/vp9.go index a128e1b6..711b7365 100644 --- a/internal/formatprocessor/vp9.go +++ b/internal/formatprocessor/vp9.go @@ -9,21 +9,21 @@ import ( "github.com/pion/rtp" ) -// DataVP9 is a VP9 data unit. -type DataVP9 struct { +// UnitVP9 is a VP9 data unit. +type UnitVP9 struct { RTPPackets []*rtp.Packet NTP time.Time PTS time.Duration Frame []byte } -// GetRTPPackets implements Data. -func (d *DataVP9) GetRTPPackets() []*rtp.Packet { +// GetRTPPackets implements Unit. +func (d *UnitVP9) GetRTPPackets() []*rtp.Packet { return d.RTPPackets } -// GetNTP implements Data. -func (d *DataVP9) GetNTP() time.Time { +// GetNTP implements Unit. +func (d *UnitVP9) GetNTP() time.Time { return d.NTP } @@ -48,11 +48,11 @@ func newVP9( return t, nil } -func (t *formatProcessorVP9) Process(dat Data, hasNonRTSPReaders bool) error { //nolint:dupl - tdata := dat.(*DataVP9) +func (t *formatProcessorVP9) Process(unit Unit, hasNonRTSPReaders bool) error { //nolint:dupl + tunit := unit.(*UnitVP9) - if tdata.RTPPackets != nil { - pkt := tdata.RTPPackets[0] + if tunit.RTPPackets != nil { + pkt := tunit.RTPPackets[0] // remove padding pkt.Header.Padding = false @@ -77,19 +77,19 @@ func (t *formatProcessorVP9) Process(dat Data, hasNonRTSPReaders bool) error { / return err } - tdata.Frame = frame - tdata.PTS = PTS + tunit.Frame = frame + tunit.PTS = PTS } // route packet as is return nil } - pkts, err := t.encoder.Encode(tdata.Frame, tdata.PTS) + pkts, err := t.encoder.Encode(tunit.Frame, tunit.PTS) if err != nil { return err } - tdata.RTPPackets = pkts + tunit.RTPPackets = pkts return nil } diff --git a/internal/hls/client_test.go b/internal/hls/client_test.go index 54135153..1ec10dfd 100644 --- a/internal/hls/client_test.go +++ b/internal/hls/client_test.go @@ -281,13 +281,13 @@ func TestClientMPEGTS(t *testing.T) { ) require.NoError(t, err) - onH264 := func(pts time.Duration, dat interface{}) { + onH264 := func(pts time.Duration, unit interface{}) { require.Equal(t, 2*time.Second, pts) require.Equal(t, [][]byte{ {7, 1, 2, 3}, {8}, {5}, - }, dat) + }, unit) close(packetRecv) } @@ -344,13 +344,13 @@ func TestClientFMP4(t *testing.T) { packetRecv := make(chan struct{}) - onH264 := func(pts time.Duration, dat interface{}) { + onH264 := func(pts time.Duration, unit interface{}) { require.Equal(t, 2*time.Second, pts) require.Equal(t, [][]byte{ {7, 1, 2, 3}, {8}, {5}, - }, dat) + }, unit) close(packetRecv) }