diff --git a/connclient.go b/connclient.go index 5669efee..6e254428 100644 --- a/connclient.go +++ b/connclient.go @@ -502,18 +502,23 @@ func (c *ConnClient) Setup(u *base.URL, mode headers.TransportMode, proto base.S } } + clockRate, err := track.ClockRate() + if err != nil { + if proto == StreamProtocolUDP { + rtpListener.close() + rtcpListener.close() + } + return nil, fmt.Errorf("unable to get the track clock rate: %s", err) + } + if mode == headers.TransportModePlay { - c.rtcpReceivers[track.Id] = rtcpreceiver.New(nil) + c.rtcpReceivers[track.Id] = rtcpreceiver.New(nil, clockRate) if proto == StreamProtocolUDP { v := time.Now().Unix() c.udpLastFrameTimes[track.Id] = &v } } else { - clockRate, err := track.ClockRate() - if err != nil { - return nil, fmt.Errorf("unable to get track clock rate: %s", err) - } c.rtcpSenders[track.Id] = rtcpsender.New(clockRate) } diff --git a/connclientpublish.go b/connclientpublish.go index f6991c80..4eae86f3 100644 --- a/connclientpublish.go +++ b/connclientpublish.go @@ -113,9 +113,9 @@ func (c *ConnClient) backgroundRecordUDP() { c.publishWriteMutex.Lock() now := time.Now() for trackId := range c.rtcpSenders { - report := c.rtcpSenders[trackId].Report(now) - if report != nil { - c.udpRtcpListeners[trackId].write(report) + r := c.rtcpSenders[trackId].Report(now) + if r != nil { + c.udpRtcpListeners[trackId].write(r) } } c.publishWriteMutex.Unlock() @@ -148,13 +148,13 @@ func (c *ConnClient) backgroundRecordTCP() { c.publishWriteMutex.Lock() now := time.Now() for trackId := range c.rtcpSenders { - report := c.rtcpSenders[trackId].Report(now) - if report != nil { + r := c.rtcpSenders[trackId].Report(now) + if r != nil { c.nconn.SetWriteDeadline(time.Now().Add(c.d.WriteTimeout)) frame := base.InterleavedFrame{ TrackId: trackId, StreamType: StreamTypeRtcp, - Content: report, + Content: r, } frame.Write(c.bw) } diff --git a/connclientread.go b/connclientread.go index 2cac73f4..23468035 100644 --- a/connclientread.go +++ b/connclientread.go @@ -96,8 +96,10 @@ func (c *ConnClient) backgroundPlayUDP(onFrameDone chan error) { case <-reportTicker.C: now := time.Now() for trackId := range c.rtcpReceivers { - report := c.rtcpReceivers[trackId].Report(now) - c.udpRtcpListeners[trackId].write(report) + r := c.rtcpReceivers[trackId].Report(now) + if r != nil { + c.udpRtcpListeners[trackId].write(r) + } } case <-keepaliveTicker.C: @@ -191,14 +193,16 @@ func (c *ConnClient) backgroundPlayTCP(onFrameDone chan error) { case <-reportTicker.C: now := time.Now() for trackId := range c.rtcpReceivers { - report := c.rtcpReceivers[trackId].Report(now) - c.nconn.SetWriteDeadline(time.Now().Add(c.d.WriteTimeout)) - frame := base.InterleavedFrame{ - TrackId: trackId, - StreamType: StreamTypeRtcp, - Content: report, + r := c.rtcpReceivers[trackId].Report(now) + if r != nil { + c.nconn.SetWriteDeadline(time.Now().Add(c.d.WriteTimeout)) + frame := base.InterleavedFrame{ + TrackId: trackId, + StreamType: StreamTypeRtcp, + Content: r, + } + frame.Write(c.bw) } - frame.Write(c.bw) } case err := <-readerDone: diff --git a/pkg/rtcpreceiver/rtcpreceiver.go b/pkg/rtcpreceiver/rtcpreceiver.go index 6a928374..5491d8ba 100644 --- a/pkg/rtcpreceiver/rtcpreceiver.go +++ b/pkg/rtcpreceiver/rtcpreceiver.go @@ -13,21 +13,29 @@ import ( // RtcpReceiver is a utility to generate RTCP receiver reports. type RtcpReceiver struct { - receiverSSRC uint32 - mutex sync.Mutex + receiverSSRC uint32 + clockRate float64 + mutex sync.Mutex + + // data from rtp packets firstRtpReceived bool - senderSSRC uint32 sequenceNumberCycles uint16 lastSequenceNumber uint16 + lastRtpTimeRtp uint32 + lastRtpTimeTime time.Time + totalLost uint32 + totalLostSinceReport uint32 + totalSinceReport uint32 + jitter float64 + + // data from rtcp packets + senderSSRC uint32 lastSenderReport uint32 lastSenderReportTime time.Time - totalLost uint32 - totalLostSinceRR uint32 - totalSinceRR uint32 } // New allocates a RtcpReceiver. -func New(receiverSSRC *uint32) *RtcpReceiver { +func New(receiverSSRC *uint32, clockRate int) *RtcpReceiver { return &RtcpReceiver{ receiverSSRC: func() uint32 { if receiverSSRC == nil { @@ -35,41 +43,70 @@ func New(receiverSSRC *uint32) *RtcpReceiver { } return *receiverSSRC }(), + clockRate: float64(clockRate), } } -// OnFrame processes a RTP or RTCP frame and extract the needed data. +// OnFrame extracts the needed data from RTP or RTCP frames. func (rr *RtcpReceiver) OnFrame(ts time.Time, streamType base.StreamType, buf []byte) { rr.mutex.Lock() defer rr.mutex.Unlock() if streamType == base.StreamTypeRtp { - if len(buf) >= 3 { - // extract the sequence number of the first frame + // do not parse the entire packet, extract only the fields we need + if len(buf) >= 8 { sequenceNumber := uint16(buf[2])<<8 | uint16(buf[3]) + rtpTime := uint32(buf[4])<<24 | uint32(buf[5])<<16 | uint32(buf[6])<<8 | uint32(buf[7]) // first frame if !rr.firstRtpReceived { rr.firstRtpReceived = true - rr.totalSinceRR = 1 + rr.totalSinceReport = 1 + rr.lastSequenceNumber = sequenceNumber + rr.lastRtpTimeRtp = rtpTime + rr.lastRtpTimeTime = ts // subsequent frames } else { - diff := (sequenceNumber - rr.lastSequenceNumber) + diff := int32(sequenceNumber) - int32(rr.lastSequenceNumber) - if sequenceNumber != (rr.lastSequenceNumber + 1) { - rr.totalLost += uint32(diff) - 1 - rr.totalLostSinceRR += uint32(diff) - 1 + // following frame or following frame after an overflow + if diff > 0 || diff < -0x0FFF { + // overflow + if diff < -0x0FFF { + rr.sequenceNumberCycles += 1 + } + + // detect lost frames + if sequenceNumber != (rr.lastSequenceNumber + 1) { + rr.totalLost += uint32(uint16(diff) - 1) + rr.totalLostSinceReport += uint32(uint16(diff) - 1) + + // allow up to 24 bits + if rr.totalLost > 0xFFFFFF { + rr.totalLost = 0xFFFFFF + } + if rr.totalLostSinceReport > 0xFFFFFF { + rr.totalLostSinceReport = 0xFFFFFF + } + } + + // compute jitter + // https://tools.ietf.org/html/rfc3550#page-39 + D := ts.Sub(rr.lastRtpTimeTime).Seconds()*rr.clockRate - + (float64(rtpTime) - float64(rr.lastRtpTimeRtp)) + if D < 0 { + D = -D + } + rr.jitter += (D - rr.jitter) / 16 + + rr.totalSinceReport += uint32(uint16(diff)) + rr.lastSequenceNumber = sequenceNumber + rr.lastRtpTimeRtp = rtpTime + rr.lastRtpTimeTime = ts } - - if sequenceNumber < rr.lastSequenceNumber { - rr.sequenceNumberCycles += 1 - } - - rr.totalSinceRR += uint32(diff) + // ignore invalid frames (diff = 0) or reordered frames (diff < 0) } - - rr.lastSequenceNumber = sequenceNumber } } else { @@ -102,19 +139,24 @@ func (rr *RtcpReceiver) Report(ts time.Time) []byte { LastSenderReport: rr.lastSenderReport, // equivalent to taking the integer part after multiplying the // loss fraction by 256 - FractionLost: uint8(float64(rr.totalLostSinceRR*256) / float64(rr.totalSinceRR)), + FractionLost: uint8(float64(rr.totalLostSinceReport*256) / float64(rr.totalSinceReport)), TotalLost: rr.totalLost, // delay, expressed in units of 1/65536 seconds, between // receiving the last SR packet from source SSRC_n and sending this // reception report block - Delay: uint32(ts.Sub(rr.lastSenderReportTime).Seconds() * 65536), + Delay: uint32(ts.Sub(rr.lastSenderReportTime).Seconds() * 65536), + Jitter: uint32(rr.jitter), }, }, } - rr.totalLostSinceRR = 0 - rr.totalSinceRR = 0 + rr.totalLostSinceReport = 0 + rr.totalSinceReport = 0 + + byts, err := report.Marshal() + if err != nil { + panic(err) + } - byts, _ := report.Marshal() return byts } diff --git a/pkg/rtcpreceiver/rtcpreceiver_test.go b/pkg/rtcpreceiver/rtcpreceiver_test.go index d1da149e..0ea6472b 100644 --- a/pkg/rtcpreceiver/rtcpreceiver_test.go +++ b/pkg/rtcpreceiver/rtcpreceiver_test.go @@ -13,12 +13,12 @@ import ( func TestRtcpReceiverBase(t *testing.T) { v := uint32(0x65f83afb) - rr := New(&v) + rr := New(&v, 90000) srPkt := rtcp.SenderReport{ SSRC: 0xba9da416, NTPTime: 0xe363887a17ced916, - RTPTime: 1287981738, + RTPTime: 0xafb45733, PacketCount: 714, OctetCount: 859127, } @@ -32,7 +32,7 @@ func TestRtcpReceiverBase(t *testing.T) { Marker: true, PayloadType: 96, SequenceNumber: 946, - Timestamp: 1287987768, + Timestamp: 0xafb45733, SSRC: 0xba9da416, }, Payload: []byte("\x00\x00"), @@ -41,25 +41,40 @@ func TestRtcpReceiverBase(t *testing.T) { ts = time.Date(2008, 05, 20, 22, 15, 20, 0, time.UTC) rr.OnFrame(ts, base.StreamTypeRtp, byts) + rtpPkt = rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 947, + Timestamp: 0xafb45733 + 90000, + SSRC: 0xba9da416, + }, + Payload: []byte("\x00\x00"), + } + byts, _ = rtpPkt.Marshal() + ts = time.Date(2008, 05, 20, 22, 15, 21, 0, time.UTC) + rr.OnFrame(ts, base.StreamTypeRtp, byts) + expectedPkt := rtcp.ReceiverReport{ SSRC: 0x65f83afb, Reports: []rtcp.ReceptionReport{ { SSRC: 0xba9da416, - LastSequenceNumber: 946, + LastSequenceNumber: 947, LastSenderReport: 0x887a17ce, - Delay: 1 * 65536, + Delay: 2 * 65536, }, }, } expected, _ := expectedPkt.Marshal() - ts = time.Date(2008, 05, 20, 22, 15, 21, 0, time.UTC) + ts = time.Date(2008, 05, 20, 22, 15, 22, 0, time.UTC) require.Equal(t, expected, rr.Report(ts)) } -func TestRtcpReceiverSequenceOverflow(t *testing.T) { +func TestRtcpReceiverOverflow(t *testing.T) { v := uint32(0x65f83afb) - rr := New(&v) + rr := New(&v, 90000) srPkt := rtcp.SenderReport{ SSRC: 0xba9da416, @@ -78,7 +93,7 @@ func TestRtcpReceiverSequenceOverflow(t *testing.T) { Marker: true, PayloadType: 96, SequenceNumber: 0xffff, - Timestamp: 1287987768, + Timestamp: 0xafb45733, SSRC: 0xba9da416, }, Payload: []byte("\x00\x00"), @@ -93,7 +108,7 @@ func TestRtcpReceiverSequenceOverflow(t *testing.T) { Marker: true, PayloadType: 96, SequenceNumber: 0x0000, - Timestamp: 1287987768, + Timestamp: 0xafb45733, SSRC: 0xba9da416, }, Payload: []byte("\x00\x00"), @@ -120,7 +135,7 @@ func TestRtcpReceiverSequenceOverflow(t *testing.T) { func TestRtcpReceiverPacketLost(t *testing.T) { v := uint32(0x65f83afb) - rr := New(&v) + rr := New(&v, 90000) srPkt := rtcp.SenderReport{ SSRC: 0xba9da416, @@ -139,7 +154,7 @@ func TestRtcpReceiverPacketLost(t *testing.T) { Marker: true, PayloadType: 96, SequenceNumber: 0x0120, - Timestamp: 1287987768, + Timestamp: 0xafb45733, SSRC: 0xba9da416, }, Payload: []byte("\x00\x00"), @@ -154,7 +169,7 @@ func TestRtcpReceiverPacketLost(t *testing.T) { Marker: true, PayloadType: 96, SequenceNumber: 0x0122, - Timestamp: 1287987768, + Timestamp: 0xafb45733, SSRC: 0xba9da416, }, Payload: []byte("\x00\x00"), @@ -184,9 +199,9 @@ func TestRtcpReceiverPacketLost(t *testing.T) { require.Equal(t, expected, rr.Report(ts)) } -func TestRtcpReceiverSequenceOverflowPacketLost(t *testing.T) { +func TestRtcpReceiverOverflowPacketLost(t *testing.T) { v := uint32(0x65f83afb) - rr := New(&v) + rr := New(&v, 90000) srPkt := rtcp.SenderReport{ SSRC: 0xba9da416, @@ -205,7 +220,7 @@ func TestRtcpReceiverSequenceOverflowPacketLost(t *testing.T) { Marker: true, PayloadType: 96, SequenceNumber: 0xffff, - Timestamp: 1287987768, + Timestamp: 0xafb45733, SSRC: 0xba9da416, }, Payload: []byte("\x00\x00"), @@ -220,7 +235,7 @@ func TestRtcpReceiverSequenceOverflowPacketLost(t *testing.T) { Marker: true, PayloadType: 96, SequenceNumber: 0x0002, - Timestamp: 1287987768, + Timestamp: 0xafb45733, SSRC: 0xba9da416, }, Payload: []byte("\x00\x00"), @@ -249,3 +264,126 @@ func TestRtcpReceiverSequenceOverflowPacketLost(t *testing.T) { ts = time.Date(2008, 05, 20, 22, 15, 21, 0, time.UTC) require.Equal(t, expected, rr.Report(ts)) } + +func TestRtcpReceiverReorderedPackets(t *testing.T) { + v := uint32(0x65f83afb) + rr := New(&v, 90000) + + srPkt := rtcp.SenderReport{ + SSRC: 0xba9da416, + NTPTime: 0xe363887a17ced916, + RTPTime: 1287981738, + PacketCount: 714, + OctetCount: 859127, + } + byts, _ := srPkt.Marshal() + ts := time.Date(2008, 05, 20, 22, 15, 20, 0, time.UTC) + rr.OnFrame(ts, base.StreamTypeRtcp, byts) + + rtpPkt := rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 0x43a7, + Timestamp: 0xafb45733, + SSRC: 0xba9da416, + }, + Payload: []byte("\x00\x00"), + } + byts, _ = rtpPkt.Marshal() + ts = time.Date(2008, 05, 20, 22, 15, 20, 0, time.UTC) + rr.OnFrame(ts, base.StreamTypeRtp, byts) + + rtpPkt = rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 0x43a6, + Timestamp: 0xafb45733, + SSRC: 0xba9da416, + }, + Payload: []byte("\x00\x00"), + } + byts, _ = rtpPkt.Marshal() + ts = time.Date(2008, 05, 20, 22, 15, 20, 0, time.UTC) + rr.OnFrame(ts, base.StreamTypeRtp, byts) + + expectedPkt := rtcp.ReceiverReport{ + SSRC: 0x65f83afb, + Reports: []rtcp.ReceptionReport{ + { + SSRC: 0xba9da416, + LastSequenceNumber: 0x43a7, + LastSenderReport: 0x887a17ce, + Delay: 1 * 65536, + }, + }, + } + expected, _ := expectedPkt.Marshal() + ts = time.Date(2008, 05, 20, 22, 15, 21, 0, time.UTC) + require.Equal(t, expected, rr.Report(ts)) +} + +func TestRtcpReceiverJitter(t *testing.T) { + v := uint32(0x65f83afb) + rr := New(&v, 90000) + + srPkt := rtcp.SenderReport{ + SSRC: 0xba9da416, + NTPTime: 0xe363887a17ced916, + RTPTime: 0xafb45733, + PacketCount: 714, + OctetCount: 859127, + } + byts, _ := srPkt.Marshal() + ts := time.Date(2008, 05, 20, 22, 15, 20, 0, time.UTC) + rr.OnFrame(ts, base.StreamTypeRtcp, byts) + + rtpPkt := rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 946, + Timestamp: 0xafb45733, + SSRC: 0xba9da416, + }, + Payload: []byte("\x00\x00"), + } + byts, _ = rtpPkt.Marshal() + ts = time.Date(2008, 05, 20, 22, 15, 20, 0, time.UTC) + rr.OnFrame(ts, base.StreamTypeRtp, byts) + + rtpPkt = rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 947, + Timestamp: 0xafb45733 + 45000, + SSRC: 0xba9da416, + }, + Payload: []byte("\x00\x00"), + } + byts, _ = rtpPkt.Marshal() + ts = time.Date(2008, 05, 20, 22, 15, 21, 0, time.UTC) + rr.OnFrame(ts, base.StreamTypeRtp, byts) + + expectedPkt := rtcp.ReceiverReport{ + SSRC: 0x65f83afb, + Reports: []rtcp.ReceptionReport{ + { + SSRC: 0xba9da416, + LastSequenceNumber: 947, + LastSenderReport: 0x887a17ce, + Delay: 2 * 65536, + Jitter: 45000 / 16, + }, + }, + } + expected, _ := expectedPkt.Marshal() + ts = time.Date(2008, 05, 20, 22, 15, 22, 0, time.UTC) + require.Equal(t, expected, rr.Report(ts)) +} diff --git a/pkg/rtcpsender/rtcpsender.go b/pkg/rtcpsender/rtcpsender.go index 6db1cc7a..3fe3f6d3 100644 --- a/pkg/rtcpsender/rtcpsender.go +++ b/pkg/rtcpsender/rtcpsender.go @@ -13,12 +13,14 @@ import ( // RtcpSender is a utility to generate RTCP sender reports. type RtcpSender struct { - clockRate float64 - mutex sync.Mutex + clockRate float64 + mutex sync.Mutex + + // data from rtp packets firstRtpReceived bool senderSSRC uint32 - rtpTimeOffset uint32 - rtpTimeTime time.Time + lastRtpTimeRtp uint32 + lastRtpTimeTime time.Time packetCount uint32 octetCount uint32 } @@ -30,7 +32,7 @@ func New(clockRate int) *RtcpSender { } } -// OnFrame processes a RTP or RTCP frame and extract the needed data. +// OnFrame extracts the needed data from RTP or RTCP frames. func (rs *RtcpSender) OnFrame(ts time.Time, streamType base.StreamType, buf []byte) { rs.mutex.Lock() defer rs.mutex.Unlock() @@ -45,8 +47,8 @@ func (rs *RtcpSender) OnFrame(ts time.Time, streamType base.StreamType, buf []by } // always update time to minimize errors - rs.rtpTimeOffset = pkt.Timestamp - rs.rtpTimeTime = ts + rs.lastRtpTimeRtp = pkt.Timestamp + rs.lastRtpTimeTime = ts rs.packetCount++ rs.octetCount += uint32(len(pkt.Payload)) @@ -74,11 +76,15 @@ func (rs *RtcpSender) Report(ts time.Time) []byte { fractionalPart := uint32((s - float64(integerPart)) * 0xFFFFFFFF) return uint64(integerPart)<<32 | uint64(fractionalPart) }(), - RTPTime: rs.rtpTimeOffset + uint32((ts.Sub(rs.rtpTimeTime)).Seconds()*float64(rs.clockRate)), + RTPTime: rs.lastRtpTimeRtp + uint32((ts.Sub(rs.lastRtpTimeTime)).Seconds()*float64(rs.clockRate)), PacketCount: rs.packetCount, OctetCount: rs.octetCount, } - byts, _ := report.Marshal() + byts, err := report.Marshal() + if err != nil { + panic(err) + } + return byts }