diff --git a/client.go b/client.go index e2fb3298..5cbeb3e9 100644 --- a/client.go +++ b/client.go @@ -30,6 +30,7 @@ import ( "github.com/aler9/gortsplib/pkg/rtcpreceiver" "github.com/aler9/gortsplib/pkg/rtcpsender" "github.com/aler9/gortsplib/pkg/rtpcleaner" + "github.com/aler9/gortsplib/pkg/rtpreorderer" "github.com/aler9/gortsplib/pkg/sdp" "github.com/aler9/gortsplib/pkg/url" ) @@ -93,6 +94,7 @@ type clientTrack struct { // play udpRTPPacketBuffer *rtpPacketMultiBuffer udpRTCPReceiver *rtcpreceiver.RTCPReceiver + reorderer *rtpreorderer.Reorderer cleaner *rtpcleaner.Cleaner // record @@ -717,8 +719,11 @@ func (c *Client) playRecordStart() { if c.state == clientStatePlay { for _, ct := range c.tracks { + if *c.effectiveTransport == TransportUDP || *c.effectiveTransport == TransportUDPMulticast { + ct.reorderer = rtpreorderer.New() + } _, isH264 := ct.track.(*TrackH264) - ct.cleaner = rtpcleaner.NewCleaner(isH264, *c.effectiveTransport == TransportTCP) + ct.cleaner = rtpcleaner.New(isH264, *c.effectiveTransport == TransportTCP) } c.keepaliveTimer = time.NewTimer(c.keepalivePeriod) @@ -817,7 +822,7 @@ func (c *Client) runReader() { return err } - out, err := track.cleaner.Clear(pkt) + out, err := track.cleaner.Process(pkt) if err != nil { return err } @@ -944,6 +949,7 @@ func (c *Client) playRecordStop(isClosing bool) { for _, ct := range c.tracks { ct.cleaner = nil + ct.reorderer = nil } // stop timers diff --git a/client_read_test.go b/client_read_test.go index 1360217f..62ecf7fd 100644 --- a/client_read_test.go +++ b/client_read_test.go @@ -342,14 +342,12 @@ func TestClientRead(t *testing.T) { // server -> client (RTP) switch transport { case "udp": - time.Sleep(1 * time.Second) l1.WriteTo(testRTPPacketMarshaled, &net.UDPAddr{ IP: net.ParseIP("127.0.0.1"), Port: th.ClientPorts[0], }) case "multicast": - time.Sleep(1 * time.Second) l1.WriteTo(testRTPPacketMarshaled, &net.UDPAddr{ IP: net.ParseIP("224.1.0.1"), Port: 25000, diff --git a/clientudpl.go b/clientudpl.go index 1868ce74..30670a26 100644 --- a/clientudpl.go +++ b/clientudpl.go @@ -196,21 +196,25 @@ func (u *clientUDPListener) processPlayRTP(now time.Time, payload []byte) { return } - out, err := u.ct.cleaner.Clear(pkt) - if err != nil { - return + packets := u.ct.reorderer.Process(pkt) + + for _, pkt := range packets { + out, err := u.ct.cleaner.Process(pkt) + if err != nil { + return + } + out0 := out[0] + + u.ct.udpRTCPReceiver.ProcessPacketRTP(time.Now(), pkt, out0.PTSEqualsDTS) + + u.c.OnPacketRTP(&ClientOnPacketRTPCtx{ + TrackID: u.ct.id, + Packet: out0.Packet, + PTSEqualsDTS: out0.PTSEqualsDTS, + H264NALUs: out0.H264NALUs, + H264PTS: out0.H264PTS, + }) } - out0 := out[0] - - u.ct.udpRTCPReceiver.ProcessPacketRTP(time.Now(), pkt, out0.PTSEqualsDTS) - - u.c.OnPacketRTP(&ClientOnPacketRTPCtx{ - TrackID: u.ct.id, - Packet: out0.Packet, - PTSEqualsDTS: out0.PTSEqualsDTS, - H264NALUs: out0.H264NALUs, - H264PTS: out0.H264PTS, - }) } func (u *clientUDPListener) processPlayRTCP(now time.Time, payload []byte) { diff --git a/pkg/rtcpreceiver/rtcpreceiver.go b/pkg/rtcpreceiver/rtcpreceiver.go index 65b26572..cddcd877 100644 --- a/pkg/rtcpreceiver/rtcpreceiver.go +++ b/pkg/rtcpreceiver/rtcpreceiver.go @@ -151,49 +151,45 @@ func (rr *RTCPReceiver) ProcessPacketRTP(ts time.Time, pkt *rtp.Packet, ptsEqual } else { diff := int32(pkt.Header.SequenceNumber) - int32(*rr.lastSequenceNumber) - // following packet or following packet after an overflow - if diff > 0 || diff < -0x0FFF { - // overflow - if diff < -0x0FFF { - rr.sequenceNumberCycles++ + // overflow + if diff < -0x0FFF { + rr.sequenceNumberCycles++ + } + + // detect lost packets + if pkt.Header.SequenceNumber != (*rr.lastSequenceNumber + 1) { + rr.totalLost += uint32(uint16(diff) - 1) + rr.totalLostSinceReport += uint32(uint16(diff) - 1) + + // allow up to 24 bits + if rr.totalLost > 0xFFFFFF { + rr.totalLost = 0xFFFFFF } - - // detect lost packets - if pkt.Header.SequenceNumber != (*rr.lastSequenceNumber + 1) { - rr.totalLost += uint32(uint16(diff) - 1) - rr.totalLostSinceReport += uint32(uint16(diff) - 1) - - // allow up to 24 bits - if rr.totalLost > 0xFFFFFF { - rr.totalLost = 0xFFFFFF - } - if rr.totalLostSinceReport > 0xFFFFFF { - rr.totalLostSinceReport = 0xFFFFFF - } - } - - rr.totalSinceReport += uint32(uint16(diff)) - v := pkt.Header.SequenceNumber - rr.lastSequenceNumber = &v - - if ptsEqualsDTS { - if rr.lastRTPTimeRTP != nil { - // update jitter - // https://tools.ietf.org/html/rfc3550#page-39 - D := ts.Sub(rr.lastRTPTimeTime).Seconds()*rr.clockRate - - (float64(pkt.Header.Timestamp) - float64(*rr.lastRTPTimeRTP)) - if D < 0 { - D = -D - } - rr.jitter += (D - rr.jitter) / 16 - } - - v := pkt.Header.Timestamp - rr.lastRTPTimeRTP = &v - rr.lastRTPTimeTime = ts + if rr.totalLostSinceReport > 0xFFFFFF { + rr.totalLostSinceReport = 0xFFFFFF } } - // ignore invalid packets (diff = 0) or reordered packets (diff < 0) + + rr.totalSinceReport += uint32(uint16(diff)) + v := pkt.Header.SequenceNumber + rr.lastSequenceNumber = &v + + if ptsEqualsDTS { + if rr.lastRTPTimeRTP != nil { + // update jitter + // https://tools.ietf.org/html/rfc3550#page-39 + D := ts.Sub(rr.lastRTPTimeTime).Seconds()*rr.clockRate - + (float64(pkt.Header.Timestamp) - float64(*rr.lastRTPTimeRTP)) + if D < 0 { + D = -D + } + rr.jitter += (D - rr.jitter) / 16 + } + + v := pkt.Header.Timestamp + rr.lastRTPTimeRTP = &v + rr.lastRTPTimeTime = ts + } } } diff --git a/pkg/rtcpreceiver/rtcpreceiver_test.go b/pkg/rtcpreceiver/rtcpreceiver_test.go index 0cac69f2..514ff6fb 100644 --- a/pkg/rtcpreceiver/rtcpreceiver_test.go +++ b/pkg/rtcpreceiver/rtcpreceiver_test.go @@ -276,70 +276,6 @@ func TestRTCPReceiverOverflowPacketLost(t *testing.T) { <-done } -func TestRTCPReceiverReorderedPackets(t *testing.T) { - done := make(chan struct{}) - now = func() time.Time { - return time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC) - } - v := uint32(0x65f83afb) - - rr := New(500*time.Millisecond, &v, 90000, func(pkt rtcp.Packet) { - require.Equal(t, &rtcp.ReceiverReport{ - SSRC: 0x65f83afb, - Reports: []rtcp.ReceptionReport{ - { - SSRC: 0xba9da416, - LastSequenceNumber: 0x43a7, - LastSenderReport: 0x887a17ce, - Delay: 1 * 65536, - }, - }, - }, pkt) - close(done) - }) - defer rr.Close() - - srPkt := rtcp.SenderReport{ - SSRC: 0xba9da416, - NTPTime: 0xe363887a17ced916, - RTPTime: 1287981738, - PacketCount: 714, - OctetCount: 859127, - } - ts := time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC) - rr.ProcessPacketRTCP(ts, &srPkt) - - rtpPkt := rtp.Packet{ - Header: rtp.Header{ - Version: 2, - Marker: true, - PayloadType: 96, - SequenceNumber: 0x43a7, - Timestamp: 0xafb45733, - SSRC: 0xba9da416, - }, - Payload: []byte("\x00\x00"), - } - ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC) - rr.ProcessPacketRTP(ts, &rtpPkt, true) - - rtpPkt = rtp.Packet{ - Header: rtp.Header{ - Version: 2, - Marker: true, - PayloadType: 96, - SequenceNumber: 0x43a6, - Timestamp: 0xafb45733, - SSRC: 0xba9da416, - }, - Payload: []byte("\x00\x00"), - } - ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC) - rr.ProcessPacketRTP(ts, &rtpPkt, true) - - <-done -} - func TestRTCPReceiverJitter(t *testing.T) { done := make(chan struct{}) now = func() time.Time { diff --git a/pkg/rtpcleaner/cleaner.go b/pkg/rtpcleaner/cleaner.go index 6a658b61..901b64a6 100644 --- a/pkg/rtpcleaner/cleaner.go +++ b/pkg/rtpcleaner/cleaner.go @@ -35,8 +35,8 @@ type Cleaner struct { h264Encoder *rtph264.Encoder } -// NewCleaner allocates a Cleaner. -func NewCleaner(isH264 bool, isTCP bool) *Cleaner { +// New allocates a Cleaner. +func New(isH264 bool, isTCP bool) *Cleaner { p := &Cleaner{ isH264: isH264, isTCP: isTCP, @@ -120,8 +120,8 @@ func (p *Cleaner) processH264(pkt *rtp.Packet) ([]*Output, error) { }}, nil } -// Clear processes a RTP packet. -func (p *Cleaner) Clear(pkt *rtp.Packet) ([]*Output, error) { +// Process processes a RTP packet. +func (p *Cleaner) Process(pkt *rtp.Packet) ([]*Output, error) { // remove padding pkt.Header.Padding = false pkt.PaddingSize = 0 diff --git a/pkg/rtpcleaner/cleaner_test.go b/pkg/rtpcleaner/cleaner_test.go index 0797d228..0a07fa38 100644 --- a/pkg/rtpcleaner/cleaner_test.go +++ b/pkg/rtpcleaner/cleaner_test.go @@ -9,9 +9,9 @@ import ( ) func TestRemovePadding(t *testing.T) { - cleaner := NewCleaner(false, false) + cleaner := New(false, false) - out, err := cleaner.Clear(&rtp.Packet{ + out, err := cleaner.Process(&rtp.Packet{ Header: rtp.Header{ Version: 2, PayloadType: 96, @@ -38,9 +38,9 @@ func TestRemovePadding(t *testing.T) { } func TestGenericOversized(t *testing.T) { - cleaner := NewCleaner(false, true) + cleaner := New(false, true) - _, err := cleaner.Clear(&rtp.Packet{ + _, err := cleaner.Process(&rtp.Packet{ Header: rtp.Header{ Version: 2, PayloadType: 96, @@ -53,9 +53,9 @@ func TestGenericOversized(t *testing.T) { } func TestH264Oversized(t *testing.T) { - cleaner := NewCleaner(true, true) + cleaner := New(true, true) - out, err := cleaner.Clear(&rtp.Packet{ + out, err := cleaner.Process(&rtp.Packet{ Header: rtp.Header{ Version: 2, PayloadType: 96, @@ -70,7 +70,7 @@ func TestH264Oversized(t *testing.T) { require.NoError(t, err) require.Equal(t, []*Output(nil), out) - out, err = cleaner.Clear(&rtp.Packet{ + out, err = cleaner.Process(&rtp.Packet{ Header: rtp.Header{ Version: 2, PayloadType: 96, diff --git a/pkg/rtpreorderer/reorderer.go b/pkg/rtpreorderer/reorderer.go new file mode 100644 index 00000000..11e9a8a3 --- /dev/null +++ b/pkg/rtpreorderer/reorderer.go @@ -0,0 +1,92 @@ +package rtpreorderer + +import ( + "github.com/pion/rtp" +) + +const ( + bufferSize = 64 +) + +// Reorderer filters incoming RTP packets, in order to +// - order packets +// - remove duplicate packets +type Reorderer struct { + initialized bool + expectedSeqNum uint16 + buffer []*rtp.Packet + absPos uint16 +} + +// New allocates a Reorderer. +func New() *Reorderer { + return &Reorderer{ + buffer: make([]*rtp.Packet, bufferSize), + } +} + +// Process processes a RTP packet. +func (r *Reorderer) Process(pkt *rtp.Packet) []*rtp.Packet { + if !r.initialized { + r.initialized = true + r.expectedSeqNum = pkt.SequenceNumber + 1 + return []*rtp.Packet{pkt} + } + + relPos := pkt.SequenceNumber - r.expectedSeqNum + + // packet is a duplicate or has been sent + // before the first packet processed by Reorderer. + // discard. + if relPos > 0xFFF { + return nil + } + + // buffer is full. clear buffer and return current packet. + if relPos >= bufferSize { + for i := 0; i < bufferSize; i++ { + r.buffer[i] = nil + } + r.expectedSeqNum = pkt.SequenceNumber + 1 + return []*rtp.Packet{pkt} + } + + // there's a missing packet + if relPos != 0 { + p := (r.absPos + relPos) & (bufferSize - 1) + + // current packet is a duplicate. discard. + if r.buffer[p] != nil { + return nil + } + + // put current packet in buffer. + r.buffer[p] = pkt + return nil + } + + count := uint16(1) + for { + p := (r.absPos + count) & (bufferSize - 1) + if r.buffer[p] == nil { + break + } + count++ + } + + ret := make([]*rtp.Packet, count) + ret[0] = pkt + + r.absPos++ + r.absPos &= (bufferSize - 1) + + for i := uint16(1); i < count; i++ { + ret[i], r.buffer[r.absPos] = r.buffer[r.absPos], nil + r.absPos++ + r.absPos &= (bufferSize - 1) + } + + r.expectedSeqNum = pkt.SequenceNumber + count + + return ret +} diff --git a/pkg/rtpreorderer/reorderer_test.go b/pkg/rtpreorderer/reorderer_test.go new file mode 100644 index 00000000..12e6614f --- /dev/null +++ b/pkg/rtpreorderer/reorderer_test.go @@ -0,0 +1,182 @@ +package rtpreorderer + +import ( + "testing" + + "github.com/pion/rtp" + "github.com/stretchr/testify/require" +) + +func TestReorder(t *testing.T) { + sequence := []struct { + in *rtp.Packet + out []*rtp.Packet + }{ + { + // first packet + &rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: 65530, + }, + }, + []*rtp.Packet{{ + Header: rtp.Header{ + SequenceNumber: 65530, + }, + }}, + }, + { + // packet sent before first packet + &rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: 65529, + }, + }, + []*rtp.Packet(nil), + }, + { + // ok + &rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: 65531, + }, + }, + []*rtp.Packet{{ + Header: rtp.Header{ + SequenceNumber: 65531, + }, + }}, + }, + { + // duplicated + &rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: 65531, + }, + }, + []*rtp.Packet(nil), + }, + { + // gap + &rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: 65535, + }, + }, + []*rtp.Packet(nil), + }, + { + // unordered + &rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: 65533, + PayloadType: 96, + }, + }, + []*rtp.Packet(nil), + }, + { + // unordered + duplicated + &rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: 65533, + PayloadType: 97, + }, + }, + []*rtp.Packet(nil), + }, + { + // unordered + &rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: 65532, + }, + }, + []*rtp.Packet{ + { + Header: rtp.Header{ + SequenceNumber: 65532, + }, + }, + { + Header: rtp.Header{ + SequenceNumber: 65533, + PayloadType: 96, + }, + }, + }, + }, + { + // unordered + &rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: 65534, + }, + }, + []*rtp.Packet{ + { + Header: rtp.Header{ + SequenceNumber: 65534, + }, + }, + { + Header: rtp.Header{ + SequenceNumber: 65535, + }, + }, + }, + }, + { + // overflow + gap + &rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: 1, + }, + }, + []*rtp.Packet(nil), + }, + { + // unordered + &rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: 0, + }, + }, + []*rtp.Packet{ + { + Header: rtp.Header{ + SequenceNumber: 0, + }, + }, + { + Header: rtp.Header{ + SequenceNumber: 1, + }, + }, + }, + }, + { + // buffer is full + &rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: 67, + }, + }, + []*rtp.Packet{ + { + Header: rtp.Header{ + SequenceNumber: 67, + }, + }, + }, + }, + } + + r := New() + r.absPos = 40 + + for _, entry := range sequence { + out := r.Process(entry.in) + require.Equal(t, entry.out, out) + } +} diff --git a/serverconn.go b/serverconn.go index ec92b9d9..d2ea6f30 100644 --- a/serverconn.go +++ b/serverconn.go @@ -257,7 +257,7 @@ func (sc *ServerConn) readFuncTCP(readRequest chan readReq) error { return err } - out, err := sc.session.setuppedTracks[trackID].cleaner.Clear(pkt) + out, err := sc.session.setuppedTracks[trackID].cleaner.Process(pkt) if err != nil { return err } diff --git a/serversession.go b/serversession.go index 59237239..34316403 100644 --- a/serversession.go +++ b/serversession.go @@ -19,6 +19,7 @@ import ( "github.com/aler9/gortsplib/pkg/ringbuffer" "github.com/aler9/gortsplib/pkg/rtcpreceiver" "github.com/aler9/gortsplib/pkg/rtpcleaner" + "github.com/aler9/gortsplib/pkg/rtpreorderer" "github.com/aler9/gortsplib/pkg/url" ) @@ -151,6 +152,7 @@ type ServerSessionSetuppedTrack struct { // publish udpRTCPReceiver *rtcpreceiver.RTCPReceiver + reorderer *rtpreorderer.Reorderer cleaner *rtpcleaner.Cleaner } @@ -974,8 +976,11 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base ss.state = ServerSessionStateRecord for trackID, st := range ss.setuppedTracks { + if *ss.setuppedTransport == TransportUDP { + st.reorderer = rtpreorderer.New() + } _, isH264 := ss.announcedTracks[trackID].(*TrackH264) - st.cleaner = rtpcleaner.NewCleaner(isH264, *ss.setuppedTransport == TransportTCP) + st.cleaner = rtpcleaner.New(isH264, *ss.setuppedTransport == TransportTCP) } switch *ss.setuppedTransport { @@ -1100,6 +1105,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base for _, st := range ss.setuppedTracks { st.cleaner = nil + st.reorderer = nil } ss.state = ServerSessionStatePreRecord diff --git a/serverudpl.go b/serverudpl.go index ede64b11..70516aea 100644 --- a/serverudpl.go +++ b/serverudpl.go @@ -198,26 +198,30 @@ func (u *serverUDPListener) processRTP(clientData *clientData, payload []byte) { return } - now := time.Now() - atomic.StoreInt64(clientData.ss.udpLastFrameTime, now.Unix()) + packets := clientData.track.reorderer.Process(pkt) - out, err := clientData.track.cleaner.Clear(pkt) - if err != nil { - return - } - out0 := out[0] + for _, pkt := range packets { + now := time.Now() + atomic.StoreInt64(clientData.ss.udpLastFrameTime, now.Unix()) - clientData.track.udpRTCPReceiver.ProcessPacketRTP(now, pkt, out0.PTSEqualsDTS) + out, err := clientData.track.cleaner.Process(pkt) + if err != nil { + return + } + out0 := out[0] - if h, ok := clientData.ss.s.Handler.(ServerHandlerOnPacketRTP); ok { - h.OnPacketRTP(&ServerHandlerOnPacketRTPCtx{ - Session: clientData.ss, - TrackID: clientData.track.id, - Packet: out0.Packet, - PTSEqualsDTS: out0.PTSEqualsDTS, - H264NALUs: out0.H264NALUs, - H264PTS: out0.H264PTS, - }) + clientData.track.udpRTCPReceiver.ProcessPacketRTP(now, pkt, out0.PTSEqualsDTS) + + if h, ok := clientData.ss.s.Handler.(ServerHandlerOnPacketRTP); ok { + h.OnPacketRTP(&ServerHandlerOnPacketRTPCtx{ + Session: clientData.ss, + TrackID: clientData.track.id, + Packet: out0.Packet, + PTSEqualsDTS: out0.PTSEqualsDTS, + H264NALUs: out0.H264NALUs, + H264PTS: out0.H264PTS, + }) + } } }