From 673acb657afcd72ce16962a4871f620b3e79b0f3 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Sun, 22 Nov 2020 15:11:05 +0100 Subject: [PATCH] rtcpsender: extract clock rate from sdp instead of estimating it --- connclient.go | 15 +++++++----- pkg/rtcpreceiver/rtcpreceiver.go | 2 +- pkg/rtcpsender/rtcpsender.go | 40 ++++++++++++++------------------ pkg/rtpaac/encoder.go | 6 ++--- track.go | 26 +++++++++++++++++++++ 5 files changed, 56 insertions(+), 33 deletions(-) diff --git a/connclient.go b/connclient.go index fea445a5..f3f49d3d 100644 --- a/connclient.go +++ b/connclient.go @@ -503,11 +503,6 @@ func (c *ConnClient) Setup(u *base.URL, mode headers.TransportMode, proto base.S } } - c.streamUrl = u - c.streamProtocol = &proto - - c.tracks = append(c.tracks, track) - if mode == headers.TransportModePlay { c.rtcpReceivers[track.Id] = rtcpreceiver.New(nil) @@ -516,9 +511,17 @@ func (c *ConnClient) Setup(u *base.URL, mode headers.TransportMode, proto base.S c.udpLastFrameTimes[track.Id] = &v } } else { - c.rtcpSenders[track.Id] = rtcpsender.New() + clockRate, err := track.ClockRate() + if err != nil { + return nil, fmt.Errorf("unable to get track clock rate: %s", err) + } + c.rtcpSenders[track.Id] = rtcpsender.New(clockRate) } + c.streamUrl = u + c.streamProtocol = &proto + c.tracks = append(c.tracks, track) + if proto == StreamProtocolUDP { rtpListener.remoteIp = c.nconn.RemoteAddr().(*net.TCPAddr).IP rtpListener.remoteZone = c.nconn.RemoteAddr().(*net.TCPAddr).Zone diff --git a/pkg/rtcpreceiver/rtcpreceiver.go b/pkg/rtcpreceiver/rtcpreceiver.go index f5508735..1d9caf5d 100644 --- a/pkg/rtcpreceiver/rtcpreceiver.go +++ b/pkg/rtcpreceiver/rtcpreceiver.go @@ -12,10 +12,10 @@ import ( // RtcpReceiver allows to generate RTCP receiver reports. type RtcpReceiver struct { + receiverSSRC uint32 mutex sync.Mutex firstRtpReceived bool senderSSRC uint32 - receiverSSRC uint32 sequenceNumberCycles uint16 lastSequenceNumber uint16 lastSenderReportTime uint32 diff --git a/pkg/rtcpsender/rtcpsender.go b/pkg/rtcpsender/rtcpsender.go index e2794a66..794c7b79 100644 --- a/pkg/rtcpsender/rtcpsender.go +++ b/pkg/rtcpsender/rtcpsender.go @@ -13,20 +13,21 @@ import ( // RtcpSender allows to generate RTCP sender reports. type RtcpSender struct { - mutex sync.Mutex - firstRtpReceived bool - secondRtpReceived bool - senderSSRC uint32 - packetCount uint32 - octetCount uint32 - rtpTimeOffset uint32 - rtpTimeTime time.Time - clock float64 + clockRate float64 + mutex sync.Mutex + firstRtpReceived bool + senderSSRC uint32 + rtpTimeOffset uint32 + rtpTimeTime time.Time + packetCount uint32 + octetCount uint32 } // New allocates a RtcpSender. -func New() *RtcpSender { - return &RtcpSender{} +func New(clockRate int) *RtcpSender { + return &RtcpSender{ + clockRate: float64(clockRate), + } } // OnFrame processes a RTP or RTCP frame and extract the needed data. @@ -45,13 +46,6 @@ func (rs *RtcpSender) OnFrame(ts time.Time, streamType base.StreamType, buf []by // save RTP time offset and correspondent time rs.rtpTimeOffset = pkt.Timestamp rs.rtpTimeTime = ts - - } else if !rs.secondRtpReceived && pkt.Timestamp != rs.rtpTimeOffset { - rs.secondRtpReceived = true - - // estimate clock - rs.clock = float64(pkt.Timestamp-rs.rtpTimeOffset) / - ts.Sub(rs.rtpTimeTime).Seconds() } rs.packetCount++ @@ -65,7 +59,7 @@ func (rs *RtcpSender) Report(ts time.Time) []byte { rs.mutex.Lock() defer rs.mutex.Unlock() - if !rs.firstRtpReceived || !rs.secondRtpReceived { + if !rs.firstRtpReceived { return nil } @@ -73,14 +67,14 @@ func (rs *RtcpSender) Report(ts time.Time) []byte { SSRC: rs.senderSSRC, NTPTime: func() uint64 { // seconds since 1st January 1900 - n := (float64(ts.UnixNano()) / 1000000000) + 2208988800 + s := (float64(ts.UnixNano()) / 1000000000) + 2208988800 // higher 32 bits are the integer part, lower 32 bits are the fractional part - integerPart := uint32(n) - fractionalPart := uint32((n - float64(integerPart)) * 0xFFFFFFFF) + integerPart := uint32(s) + fractionalPart := uint32((s - float64(integerPart)) * 0xFFFFFFFF) return uint64(integerPart)<<32 | uint64(fractionalPart) }(), - RTPTime: rs.rtpTimeOffset + uint32((ts.Sub(rs.rtpTimeTime)).Seconds()*rs.clock), + RTPTime: rs.rtpTimeOffset + uint32((ts.Sub(rs.rtpTimeTime)).Seconds()*float64(rs.clockRate)), PacketCount: rs.packetCount, OctetCount: rs.octetCount, } diff --git a/pkg/rtpaac/encoder.go b/pkg/rtpaac/encoder.go index c6a6a862..7fdf8b10 100644 --- a/pkg/rtpaac/encoder.go +++ b/pkg/rtpaac/encoder.go @@ -19,7 +19,7 @@ const ( // Encoder is a RPT/AAC encoder. type Encoder struct { payloadType uint8 - samplingRate float64 + sampleRate float64 sequenceNumber uint16 ssrc uint32 initialTs uint32 @@ -35,7 +35,7 @@ func NewEncoder(relativeType uint8, config []byte) (*Encoder, error) { return &Encoder{ payloadType: 96 + relativeType, - samplingRate: float64(codec.Config.SampleRate), + sampleRate: float64(codec.Config.SampleRate), sequenceNumber: uint16(rand.Uint32()), ssrc: rand.Uint32(), initialTs: rand.Uint32(), @@ -52,7 +52,7 @@ func (e *Encoder) Write(data []byte, timestamp time.Duration) ([][]byte, error) return nil, fmt.Errorf("data is too big") } - rtpTs := e.initialTs + uint32((timestamp-e.started).Seconds()*e.samplingRate) + rtpTs := e.initialTs + uint32((timestamp-e.started).Seconds()*e.sampleRate) // 13 bits payload size // 3 bits AU-Index(-delta) diff --git a/track.go b/track.go index 85baec34..685655d7 100644 --- a/track.go +++ b/track.go @@ -107,6 +107,32 @@ func NewTrackAac(id int, config []byte) (*Track, error) { }, nil } +// ClockRate returns the clock rate of the track. +func (t *Track) ClockRate() (int, error) { + // https://tools.ietf.org/html/rfc4566 + // a=rtpmap: / [/] + for _, a := range t.Media.Attributes { + if a.Key == "rtpmap" { + tmp := strings.Split(a.Value, " ") + if len(tmp) != 2 { + return 0, fmt.Errorf("invalid format (%s)", a.Value) + } + + tmp = strings.Split(tmp[1], "/") + if len(tmp) != 2 && len(tmp) != 3 { + return 0, fmt.Errorf("invalid format (%s)", a.Value) + } + + v, err := strconv.ParseInt(tmp[1], 10, 64) + if err != nil { + return 0, err + } + return int(v), nil + } + } + return 0, fmt.Errorf("attribute 'rtpmap' not found") +} + // Tracks is a list of tracks. type Tracks []*Track