rename rtpproc into rtpcleaner

This commit is contained in:
aler9
2022-06-11 19:50:38 +02:00
parent 4e41b1c88b
commit ac910c63db
7 changed files with 40 additions and 41 deletions

View File

@@ -29,7 +29,7 @@ import (
"github.com/aler9/gortsplib/pkg/ringbuffer" "github.com/aler9/gortsplib/pkg/ringbuffer"
"github.com/aler9/gortsplib/pkg/rtcpreceiver" "github.com/aler9/gortsplib/pkg/rtcpreceiver"
"github.com/aler9/gortsplib/pkg/rtcpsender" "github.com/aler9/gortsplib/pkg/rtcpsender"
"github.com/aler9/gortsplib/pkg/rtpproc" "github.com/aler9/gortsplib/pkg/rtpcleaner"
"github.com/aler9/gortsplib/pkg/sdp" "github.com/aler9/gortsplib/pkg/sdp"
"github.com/aler9/gortsplib/pkg/url" "github.com/aler9/gortsplib/pkg/url"
) )
@@ -93,7 +93,7 @@ type clientTrack struct {
// play // play
udpRTPPacketBuffer *rtpPacketMultiBuffer udpRTPPacketBuffer *rtpPacketMultiBuffer
udpRTCPReceiver *rtcpreceiver.RTCPReceiver udpRTCPReceiver *rtcpreceiver.RTCPReceiver
proc *rtpproc.Processor cleaner *rtpcleaner.Cleaner
// record // record
udpRTCPSender *rtcpsender.RTCPSender udpRTCPSender *rtcpsender.RTCPSender
@@ -715,7 +715,7 @@ func (c *Client) playRecordStart() {
if c.state == clientStatePlay { if c.state == clientStatePlay {
for _, ct := range c.tracks { for _, ct := range c.tracks {
_, isH264 := ct.track.(*TrackH264) _, isH264 := ct.track.(*TrackH264)
ct.proc = rtpproc.NewProcessor(isH264, *c.effectiveTransport == TransportTCP) ct.cleaner = rtpcleaner.NewCleaner(isH264, *c.effectiveTransport == TransportTCP)
} }
c.keepaliveTimer = time.NewTimer(c.keepalivePeriod) c.keepaliveTimer = time.NewTimer(c.keepalivePeriod)
@@ -814,7 +814,7 @@ func (c *Client) runReader() {
return err return err
} }
out, err := track.proc.Process(pkt) out, err := track.cleaner.Clear(pkt)
if err != nil { if err != nil {
return err return err
} }
@@ -940,7 +940,7 @@ func (c *Client) playRecordStop(isClosing bool) {
} }
for _, ct := range c.tracks { for _, ct := range c.tracks {
ct.proc = nil ct.cleaner = nil
} }
// stop timers // stop timers

View File

@@ -196,7 +196,7 @@ func (u *clientUDPListener) processPlayRTP(now time.Time, payload []byte) {
return return
} }
out, err := u.ct.proc.Process(pkt) out, err := u.ct.cleaner.Clear(pkt)
if err != nil { if err != nil {
return return
} }

View File

@@ -1,4 +1,4 @@
package rtpproc package rtpcleaner
import ( import (
"fmt" "fmt"
@@ -15,19 +15,18 @@ const (
maxPacketSize = 1472 maxPacketSize = 1472
) )
// ProcessorOutput is the output of Process(). // Output is the output of Clear().
type ProcessorOutput struct { type Output struct {
Packet *rtp.Packet Packet *rtp.Packet
PTSEqualsDTS bool PTSEqualsDTS bool
H264NALUs [][]byte H264NALUs [][]byte
H264PTS time.Duration H264PTS time.Duration
} }
// Processor is used to process incoming RTP packets, in order to: // Cleaner is used to clean incoming RTP packets, in order to:
// - remove padding // - remove padding
// - decode packets encoded with supported codecs
// - re-encode packets if they are bigger than maximum allowed. // - re-encode packets if they are bigger than maximum allowed.
type Processor struct { type Cleaner struct {
isH264 bool isH264 bool
isTCP bool isTCP bool
@@ -35,9 +34,9 @@ type Processor struct {
h264Encoder *rtph264.Encoder h264Encoder *rtph264.Encoder
} }
// NewProcessor allocates a Processor. // NewCleaner allocates a Cleaner.
func NewProcessor(isH264 bool, isTCP bool) *Processor { func NewCleaner(isH264 bool, isTCP bool) *Cleaner {
p := &Processor{ p := &Cleaner{
isH264: isH264, isH264: isH264,
isTCP: isTCP, isTCP: isTCP,
} }
@@ -50,7 +49,7 @@ func NewProcessor(isH264 bool, isTCP bool) *Processor {
return p return p
} }
func (p *Processor) processH264(pkt *rtp.Packet) ([]*ProcessorOutput, error) { func (p *Cleaner) processH264(pkt *rtp.Packet) ([]*Output, error) {
// check if we need to re-encode // check if we need to re-encode
if p.isTCP && p.h264Encoder == nil && pkt.MarshalSize() > maxPacketSize { if p.isTCP && p.h264Encoder == nil && pkt.MarshalSize() > maxPacketSize {
v1 := pkt.SSRC v1 := pkt.SSRC
@@ -71,7 +70,7 @@ func (p *Processor) processH264(pkt *rtp.Packet) ([]*ProcessorOutput, error) {
if err == rtph264.ErrNonStartingPacketAndNoPrevious || if err == rtph264.ErrNonStartingPacketAndNoPrevious ||
err == rtph264.ErrMorePacketsNeeded { err == rtph264.ErrMorePacketsNeeded {
if p.h264Encoder == nil { if p.h264Encoder == nil {
return []*ProcessorOutput{{ return []*Output{{
Packet: pkt, Packet: pkt,
PTSEqualsDTS: false, PTSEqualsDTS: false,
}}, nil }}, nil
@@ -90,16 +89,16 @@ func (p *Processor) processH264(pkt *rtp.Packet) ([]*ProcessorOutput, error) {
return nil, err return nil, err
} }
output := make([]*ProcessorOutput, len(packets)) output := make([]*Output, len(packets))
for i, pkt := range packets { for i, pkt := range packets {
if i != len(packets)-1 { if i != len(packets)-1 {
output[i] = &ProcessorOutput{ output[i] = &Output{
Packet: pkt, Packet: pkt,
PTSEqualsDTS: false, PTSEqualsDTS: false,
} }
} else { } else {
output[i] = &ProcessorOutput{ output[i] = &Output{
Packet: pkt, Packet: pkt,
PTSEqualsDTS: ptsEqualsDTS, PTSEqualsDTS: ptsEqualsDTS,
H264NALUs: nalus, H264NALUs: nalus,
@@ -111,7 +110,7 @@ func (p *Processor) processH264(pkt *rtp.Packet) ([]*ProcessorOutput, error) {
return output, nil return output, nil
} }
return []*ProcessorOutput{{ return []*Output{{
Packet: pkt, Packet: pkt,
PTSEqualsDTS: ptsEqualsDTS, PTSEqualsDTS: ptsEqualsDTS,
H264NALUs: nalus, H264NALUs: nalus,
@@ -119,8 +118,8 @@ func (p *Processor) processH264(pkt *rtp.Packet) ([]*ProcessorOutput, error) {
}}, nil }}, nil
} }
// Process processes a RTP packet. // Clear processes a RTP packet.
func (p *Processor) Process(pkt *rtp.Packet) ([]*ProcessorOutput, error) { func (p *Cleaner) Clear(pkt *rtp.Packet) ([]*Output, error) {
// remove padding // remove padding
pkt.Header.Padding = false pkt.Header.Padding = false
pkt.PaddingSize = 0 pkt.PaddingSize = 0
@@ -134,7 +133,7 @@ func (p *Processor) Process(pkt *rtp.Packet) ([]*ProcessorOutput, error) {
pkt.MarshalSize(), maxPacketSize) pkt.MarshalSize(), maxPacketSize)
} }
return []*ProcessorOutput{{ return []*Output{{
Packet: pkt, Packet: pkt,
PTSEqualsDTS: true, PTSEqualsDTS: true,
}}, nil }}, nil

View File

@@ -1,4 +1,4 @@
package rtpproc package rtpcleaner
import ( import (
"bytes" "bytes"
@@ -8,10 +8,10 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func TestProcessRemovePadding(t *testing.T) { func TestRemovePadding(t *testing.T) {
proc := NewProcessor(false, false) cleaner := NewCleaner(false, false)
out, err := proc.Process(&rtp.Packet{ out, err := cleaner.Clear(&rtp.Packet{
Header: rtp.Header{ Header: rtp.Header{
Version: 2, Version: 2,
PayloadType: 96, PayloadType: 96,
@@ -23,7 +23,7 @@ func TestProcessRemovePadding(t *testing.T) {
PaddingSize: 64, PaddingSize: 64,
}) })
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, []*ProcessorOutput{{ require.Equal(t, []*Output{{
Packet: &rtp.Packet{ Packet: &rtp.Packet{
Header: rtp.Header{ Header: rtp.Header{
Version: 2, Version: 2,
@@ -37,10 +37,10 @@ func TestProcessRemovePadding(t *testing.T) {
}}, out) }}, out)
} }
func TestProcessH264Oversized(t *testing.T) { func TestH264Oversized(t *testing.T) {
proc := NewProcessor(true, true) cleaner := NewCleaner(true, true)
out, err := proc.Process(&rtp.Packet{ out, err := cleaner.Clear(&rtp.Packet{
Header: rtp.Header{ Header: rtp.Header{
Version: 2, Version: 2,
PayloadType: 96, PayloadType: 96,
@@ -53,9 +53,9 @@ func TestProcessH264Oversized(t *testing.T) {
), ),
}) })
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, []*ProcessorOutput(nil), out) require.Equal(t, []*Output(nil), out)
out, err = proc.Process(&rtp.Packet{ out, err = cleaner.Clear(&rtp.Packet{
Header: rtp.Header{ Header: rtp.Header{
Version: 2, Version: 2,
PayloadType: 96, PayloadType: 96,
@@ -68,7 +68,7 @@ func TestProcessH264Oversized(t *testing.T) {
), ),
}) })
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, []*ProcessorOutput{ require.Equal(t, []*Output{
{ {
Packet: &rtp.Packet{ Packet: &rtp.Packet{
Header: rtp.Header{ Header: rtp.Header{

View File

@@ -257,7 +257,7 @@ func (sc *ServerConn) readFuncTCP(readRequest chan readReq) error {
return err return err
} }
out, err := sc.session.setuppedTracks[trackID].proc.Process(pkt) out, err := sc.session.setuppedTracks[trackID].cleaner.Clear(pkt)
if err != nil { if err != nil {
return err return err
} }

View File

@@ -18,7 +18,7 @@ import (
"github.com/aler9/gortsplib/pkg/liberrors" "github.com/aler9/gortsplib/pkg/liberrors"
"github.com/aler9/gortsplib/pkg/ringbuffer" "github.com/aler9/gortsplib/pkg/ringbuffer"
"github.com/aler9/gortsplib/pkg/rtcpreceiver" "github.com/aler9/gortsplib/pkg/rtcpreceiver"
"github.com/aler9/gortsplib/pkg/rtpproc" "github.com/aler9/gortsplib/pkg/rtpcleaner"
"github.com/aler9/gortsplib/pkg/url" "github.com/aler9/gortsplib/pkg/url"
) )
@@ -151,7 +151,7 @@ type ServerSessionSetuppedTrack struct {
// publish // publish
udpRTCPReceiver *rtcpreceiver.RTCPReceiver udpRTCPReceiver *rtcpreceiver.RTCPReceiver
proc *rtpproc.Processor cleaner *rtpcleaner.Cleaner
} }
// ServerSession is a server-side RTSP session. // ServerSession is a server-side RTSP session.
@@ -973,7 +973,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
for trackID, st := range ss.setuppedTracks { for trackID, st := range ss.setuppedTracks {
_, isH264 := ss.announcedTracks[trackID].(*TrackH264) _, isH264 := ss.announcedTracks[trackID].(*TrackH264)
st.proc = rtpproc.NewProcessor(isH264, *ss.setuppedTransport == TransportTCP) st.cleaner = rtpcleaner.NewCleaner(isH264, *ss.setuppedTransport == TransportTCP)
} }
switch *ss.setuppedTransport { switch *ss.setuppedTransport {
@@ -1097,7 +1097,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
} }
for _, st := range ss.setuppedTracks { for _, st := range ss.setuppedTracks {
st.proc = nil st.cleaner = nil
} }
ss.state = ServerSessionStatePreRecord ss.state = ServerSessionStatePreRecord

View File

@@ -201,7 +201,7 @@ func (u *serverUDPListener) processRTP(clientData *clientData, payload []byte) {
now := time.Now() now := time.Now()
atomic.StoreInt64(clientData.ss.udpLastFrameTime, now.Unix()) atomic.StoreInt64(clientData.ss.udpLastFrameTime, now.Unix())
out, err := clientData.track.proc.Process(pkt) out, err := clientData.track.cleaner.Clear(pkt)
if err != nil { if err != nil {
return return
} }