diff --git a/client.go b/client.go index 4c6f79f5..212f63a2 100644 --- a/client.go +++ b/client.go @@ -285,10 +285,11 @@ type Client struct { // private // - senderReportPeriod time.Duration - udpReceiverReportPeriod time.Duration - checkTimeoutPeriod time.Duration - keepalivePeriod time.Duration + timeNow func() time.Time + senderReportPeriod time.Duration + receiverReportPeriod time.Duration + checkTimeoutPeriod time.Duration + keepalivePeriod time.Duration connURL *url.URL ctx context.Context @@ -396,12 +397,15 @@ func (c *Client) Start(scheme string, host string) error { } // private + if c.timeNow == nil { + c.timeNow = time.Now + } if c.senderReportPeriod == 0 { c.senderReportPeriod = 10 * time.Second } - if c.udpReceiverReportPeriod == 0 { + if c.receiverReportPeriod == 0 { // some cameras require a maximum of 5secs between keepalives - c.udpReceiverReportPeriod = 5 * time.Second + c.receiverReportPeriod = 5 * time.Second } if c.checkTimeoutPeriod == 0 { c.checkTimeoutPeriod = 1 * time.Second @@ -680,7 +684,7 @@ func (c *Client) playRecordStart() { default: // TCP c.checkTimeoutTimer = time.NewTimer(c.checkTimeoutPeriod) - v := time.Now().Unix() + v := c.timeNow().Unix() c.tcpLastFrameTime = &v } } @@ -870,7 +874,7 @@ func (c *Client) atLeastOneUDPPacketHasBeenReceived() bool { } func (c *Client) isInUDPTimeout() bool { - now := time.Now() + now := c.timeNow() for _, ct := range c.medias { lft := time.Unix(atomic.LoadInt64(ct.udpRTPListener.lastPacketTime), 0) if now.Sub(lft) < c.ReadTimeout { @@ -886,7 +890,7 @@ func (c *Client) isInUDPTimeout() bool { } func (c *Client) isInTCPTimeout() bool { - now := time.Now() + now := c.timeNow() lft := time.Unix(atomic.LoadInt64(c.tcpLastFrameTime), 0) return now.Sub(lft) >= c.ReadTimeout } @@ -1653,7 +1657,7 @@ func (c *Client) OnPacketRTCP(medi *media.Media, cb OnPacketRTCPFunc) { // WritePacketRTP writes a RTP packet to the server. func (c *Client) WritePacketRTP(medi *media.Media, pkt *rtp.Packet) error { - return c.WritePacketRTPWithNTP(medi, pkt, time.Now()) + return c.WritePacketRTPWithNTP(medi, pkt, c.timeNow()) } // WritePacketRTPWithNTP writes a RTP packet to the server. @@ -1695,3 +1699,11 @@ func (c *Client) WritePacketRTCP(medi *media.Media, pkt rtcp.Packet) error { cm.writePacketRTCP(byts) return nil } + +// PacketNTP returns the NTP timestamp of an incoming RTP packet. +// The NTP timestamp is computed from sender reports. +func (c *Client) PacketNTP(medi *media.Media, pkt *rtp.Packet) (time.Time, bool) { + cm := c.medias[medi] + ct := cm.formats[pkt.PayloadType] + return ct.rtcpReceiver.PacketNTP(pkt.Timestamp) +} diff --git a/client_format.go b/client_format.go index f7094c0e..cd0ea071 100644 --- a/client_format.go +++ b/client_format.go @@ -18,8 +18,8 @@ type clientFormat struct { cm *clientMedia format format.Format udpReorderer *rtpreorderer.Reorderer // play - udpRTCPReceiver *rtcpreceiver.RTCPReceiver // play tcpLossDetector *rtplossdetector.LossDetector // play + rtcpReceiver *rtcpreceiver.RTCPReceiver // play rtcpSender *rtcpsender.RTCPSender // record onPacketRTP OnPacketRTPFunc } @@ -36,24 +36,29 @@ func (ct *clientFormat) start() { if ct.cm.c.state == clientStatePlay { if ct.cm.udpRTPListener != nil { ct.udpReorderer = rtpreorderer.New() - - var err error - ct.udpRTCPReceiver, err = rtcpreceiver.New( - ct.cm.c.udpReceiverReportPeriod, - nil, - ct.format.ClockRate(), func(pkt rtcp.Packet) { - ct.cm.c.WritePacketRTCP(ct.cm.media, pkt) //nolint:errcheck - }) - if err != nil { - panic(err) - } } else { ct.tcpLossDetector = rtplossdetector.New() } + + var err error + ct.rtcpReceiver, err = rtcpreceiver.New( + ct.format.ClockRate(), + nil, + ct.cm.c.receiverReportPeriod, + ct.cm.c.timeNow, + func(pkt rtcp.Packet) { + if ct.cm.udpRTPListener != nil { + ct.cm.c.WritePacketRTCP(ct.cm.media, pkt) //nolint:errcheck + } + }) + if err != nil { + panic(err) + } } else { ct.rtcpSender = rtcpsender.New( ct.format.ClockRate(), ct.cm.c.senderReportPeriod, + ct.cm.c.timeNow, func(pkt rtcp.Packet) { if !ct.cm.c.DisableRTCPSenderReports { ct.cm.c.WritePacketRTCP(ct.cm.media, pkt) //nolint:errcheck @@ -63,9 +68,9 @@ func (ct *clientFormat) start() { } func (ct *clientFormat) stop() { - if ct.udpRTCPReceiver != nil { - ct.udpRTCPReceiver.Close() - ct.udpRTCPReceiver = nil + if ct.rtcpReceiver != nil { + ct.rtcpReceiver.Close() + ct.rtcpReceiver = nil } if ct.rtcpSender != nil { @@ -95,10 +100,10 @@ func (ct *clientFormat) readRTPUDP(pkt *rtp.Packet) { // do not return } - now := time.Now() + now := ct.cm.c.timeNow() for _, pkt := range packets { - ct.udpRTCPReceiver.ProcessPacket(pkt, now, ct.format.PTSEqualsDTS(pkt)) + ct.rtcpReceiver.ProcessPacket(pkt, now, ct.format.PTSEqualsDTS(pkt)) ct.onPacketRTP(pkt) } } @@ -117,5 +122,7 @@ func (ct *clientFormat) readRTPTCP(pkt *rtp.Packet) { // do not return } + now := ct.cm.c.timeNow() + ct.rtcpReceiver.ProcessPacket(pkt, now, ct.format.PTSEqualsDTS(pkt)) ct.onPacketRTP(pkt) } diff --git a/client_media.go b/client_media.go index fe094e6f..c6046abe 100644 --- a/client_media.go +++ b/client_media.go @@ -44,9 +44,7 @@ func (cm *clientMedia) close() { func (cm *clientMedia) allocateUDPListeners(multicast bool, rtpAddress string, rtcpAddress string) error { if rtpAddress != ":0" { l1, err := newClientUDPListener( - cm.c.ListenPacket, - cm.c.AnyPortEnable, - cm.c.WriteTimeout, + cm.c, multicast, rtpAddress, ) @@ -55,9 +53,7 @@ func (cm *clientMedia) allocateUDPListeners(multicast bool, rtpAddress string, r } l2, err := newClientUDPListener( - cm.c.ListenPacket, - cm.c.AnyPortEnable, - cm.c.WriteTimeout, + cm.c, multicast, rtcpAddress, ) @@ -71,11 +67,7 @@ func (cm *clientMedia) allocateUDPListeners(multicast bool, rtpAddress string, r } var err error - cm.udpRTPListener, cm.udpRTCPListener, err = newClientUDPListenerPair( - cm.c.ListenPacket, - cm.c.AnyPortEnable, - cm.c.WriteTimeout, - ) + cm.udpRTPListener, cm.udpRTCPListener, err = newClientUDPListenerPair(cm.c) return err } @@ -144,7 +136,7 @@ func (cm *clientMedia) stop() { func (cm *clientMedia) findFormatWithSSRC(ssrc uint32) *clientFormat { for _, format := range cm.formats { - tssrc, ok := format.udpRTCPReceiver.LastSSRC() + tssrc, ok := format.rtcpReceiver.LastSSRC() if ok && tssrc == ssrc { return format } @@ -183,7 +175,7 @@ func (cm *clientMedia) writePacketRTCP(byts []byte) { } func (cm *clientMedia) readRTPTCPPlay(payload []byte) { - now := time.Now() + now := cm.c.timeNow() atomic.StoreInt64(cm.c.tcpLastFrameTime, now.Unix()) pkt := &rtp.Packet{} @@ -203,7 +195,7 @@ func (cm *clientMedia) readRTPTCPPlay(payload []byte) { } func (cm *clientMedia) readRTCPTCPPlay(payload []byte) { - now := time.Now() + now := cm.c.timeNow() atomic.StoreInt64(cm.c.tcpLastFrameTime, now.Unix()) if len(payload) > udpMaxPayloadSize { @@ -219,6 +211,13 @@ func (cm *clientMedia) readRTCPTCPPlay(payload []byte) { } for _, pkt := range packets { + if sr, ok := pkt.(*rtcp.SenderReport); ok { + format := cm.findFormatWithSSRC(sr.SSRC) + if format != nil { + format.rtcpReceiver.ProcessSenderReport(sr, now) + } + } + cm.onPacketRTCP(pkt) } } @@ -271,7 +270,7 @@ func (cm *clientMedia) readRTPUDPPlay(payload []byte) { } func (cm *clientMedia) readRTCPUDPPlay(payload []byte) { - now := time.Now() + now := cm.c.timeNow() plen := len(payload) atomic.AddUint64(cm.c.BytesReceived, uint64(plen)) @@ -291,7 +290,7 @@ func (cm *clientMedia) readRTCPUDPPlay(payload []byte) { if sr, ok := pkt.(*rtcp.SenderReport); ok { format := cm.findFormatWithSSRC(sr.SSRC) if format != nil { - format.udpRTCPReceiver.ProcessSenderReport(sr, now) + format.rtcpReceiver.ProcessSenderReport(sr, now) } } diff --git a/client_play_test.go b/client_play_test.go index 3b2b471b..2f3197e6 100644 --- a/client_play_test.go +++ b/client_play_test.go @@ -2138,12 +2138,12 @@ func TestClientPlayRTCPReport(t *testing.T) { require.NoError(t, err) // wait for the packet's SSRC to be saved - time.Sleep(500 * time.Millisecond) + time.Sleep(200 * time.Millisecond) sr := &rtcp.SenderReport{ SSRC: 753621, - NTPTime: 0, - RTPTime: 0, + NTPTime: ntpTimeGoToRTCP(time.Date(2017, 8, 12, 15, 30, 0, 0, time.UTC)), + RTPTime: 54352, PacketCount: 1, OctetCount: 4, } @@ -2167,7 +2167,7 @@ func TestClientPlayRTCPReport(t *testing.T) { { SSRC: rr.Reports[0].SSRC, LastSequenceNumber: 946, - LastSenderReport: rr.Reports[0].LastSenderReport, + LastSenderReport: 2641887232, Delay: rr.Reports[0].Delay, }, }, @@ -2187,7 +2187,7 @@ func TestClientPlayRTCPReport(t *testing.T) { }() c := Client{ - udpReceiverReportPeriod: 1 * time.Second, + receiverReportPeriod: 500 * time.Millisecond, } err = readAll(&c, "rtsp://localhost:8554/teststream", nil) @@ -3166,3 +3166,184 @@ func TestClientPlayDecodeErrors(t *testing.T) { }) } } + +func TestClientPlayPacketNTP(t *testing.T) { + l, err := net.Listen("tcp", "localhost:8554") + require.NoError(t, err) + defer l.Close() + + serverDone := make(chan struct{}) + defer func() { <-serverDone }() + go func() { + defer close(serverDone) + + nconn, err := l.Accept() + require.NoError(t, err) + defer nconn.Close() + conn := conn.NewConn(nconn) + + req, err := conn.ReadRequest() + require.NoError(t, err) + require.Equal(t, base.Options, req.Method) + + err = conn.WriteResponse(&base.Response{ + StatusCode: base.StatusOK, + Header: base.Header{ + "Public": base.HeaderValue{strings.Join([]string{ + string(base.Describe), + string(base.Setup), + string(base.Play), + }, ", ")}, + }, + }) + require.NoError(t, err) + + req, err = conn.ReadRequest() + require.NoError(t, err) + require.Equal(t, base.Describe, req.Method) + + medias := media.Medias{testH264Media} + resetMediaControls(medias) + + err = conn.WriteResponse(&base.Response{ + StatusCode: base.StatusOK, + Header: base.Header{ + "Content-Type": base.HeaderValue{"application/sdp"}, + "Content-Base": base.HeaderValue{"rtsp://localhost:8554/teststream/"}, + }, + Body: mustMarshalMedias(medias), + }) + require.NoError(t, err) + + req, err = conn.ReadRequest() + require.NoError(t, err) + require.Equal(t, base.Setup, req.Method) + + var inTH headers.Transport + err = inTH.Unmarshal(req.Header["Transport"]) + require.NoError(t, err) + + l1, err := net.ListenPacket("udp", "localhost:27556") + require.NoError(t, err) + defer l1.Close() + + l2, err := net.ListenPacket("udp", "localhost:27557") + require.NoError(t, err) + defer l2.Close() + + err = conn.WriteResponse(&base.Response{ + StatusCode: base.StatusOK, + Header: base.Header{ + "Transport": headers.Transport{ + Protocol: headers.TransportProtocolUDP, + Delivery: func() *headers.TransportDelivery { + v := headers.TransportDeliveryUnicast + return &v + }(), + ServerPorts: &[2]int{27556, 27557}, + ClientPorts: inTH.ClientPorts, + }.Marshal(), + }, + }) + require.NoError(t, err) + + req, err = conn.ReadRequest() + require.NoError(t, err) + require.Equal(t, base.Play, req.Method) + + err = conn.WriteResponse(&base.Response{ + StatusCode: base.StatusOK, + }) + require.NoError(t, err) + + // skip firewall opening + buf := make([]byte, 2048) + _, _, err = l2.ReadFrom(buf) + require.NoError(t, err) + + pkt := rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 946, + Timestamp: 54352, + SSRC: 753621, + }, + Payload: []byte{1, 2, 3, 4}, + } + byts, _ := pkt.Marshal() + _, err = l1.WriteTo(byts, &net.UDPAddr{ + IP: net.ParseIP("127.0.0.1"), + Port: inTH.ClientPorts[0], + }) + require.NoError(t, err) + + // wait for the packet's SSRC to be saved + time.Sleep(100 * time.Millisecond) + + sr := &rtcp.SenderReport{ + SSRC: 753621, + NTPTime: ntpTimeGoToRTCP(time.Date(2017, 8, 12, 15, 30, 0, 0, time.UTC)), + RTPTime: 54352, + PacketCount: 1, + OctetCount: 4, + } + byts, _ = sr.Marshal() + _, err = l2.WriteTo(byts, &net.UDPAddr{ + IP: net.ParseIP("127.0.0.1"), + Port: inTH.ClientPorts[1], + }) + require.NoError(t, err) + + time.Sleep(100 * time.Millisecond) + + pkt = rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 947, + Timestamp: 54352 + 90000, + SSRC: 753621, + }, + Payload: []byte{5, 6, 7, 8}, + } + byts, _ = pkt.Marshal() + _, err = l1.WriteTo(byts, &net.UDPAddr{ + IP: net.ParseIP("127.0.0.1"), + Port: inTH.ClientPorts[0], + }) + require.NoError(t, err) + + req, err = conn.ReadRequest() + require.NoError(t, err) + require.Equal(t, base.Teardown, req.Method) + + err = conn.WriteResponse(&base.Response{ + StatusCode: base.StatusOK, + }) + require.NoError(t, err) + }() + + c := Client{} + + recv := make(chan struct{}) + first := false + + err = readAll(&c, "rtsp://localhost:8554/teststream", + func(medi *media.Media, forma format.Format, pkt *rtp.Packet) { + if !first { + first = true + } else { + ntp, ok := c.PacketNTP(medi, pkt) + require.Equal(t, true, ok) + require.Equal(t, time.Date(2017, 8, 12, 15, 30, 1, 0, time.UTC), ntp.UTC()) + close(recv) + } + }) + require.NoError(t, err) + defer c.Close() + + <-recv +} diff --git a/client_record_test.go b/client_record_test.go index 3894ff6d..96684646 100644 --- a/client_record_test.go +++ b/client_record_test.go @@ -5,6 +5,7 @@ import ( "crypto/tls" "net" "strings" + "sync" "testing" "time" @@ -65,6 +66,11 @@ var testRTCPPacketMarshaled = func() []byte { return byts }() +func ntpTimeGoToRTCP(v time.Time) uint64 { + s := uint64(v.UnixNano()) + 2208988800*1000000000 + return (s/1000000000)<<32 | (s % 1000000000) +} + func record(c *Client, ur string, medias media.Medias, cb func(*media.Media, rtcp.Packet)) error { u, err := url.Parse(ur) if err != nil { @@ -1168,10 +1174,10 @@ func TestClientRecordRTCPReport(t *testing.T) { require.NoError(t, err) require.Equal(t, &rtcp.SenderReport{ SSRC: 0x38F27A2F, - NTPTime: packets[0].(*rtcp.SenderReport).NTPTime, - RTPTime: packets[0].(*rtcp.SenderReport).RTPTime, - PacketCount: 2, - OctetCount: 2, + NTPTime: ntpTimeGoToRTCP(time.Date(1996, 2, 13, 14, 33, 5, 0, time.UTC)), + RTPTime: 1300000 + 60*90000, + PacketCount: 1, + OctetCount: 1, }, packets[0]) close(reportReceived) @@ -1186,6 +1192,9 @@ func TestClientRecordRTCPReport(t *testing.T) { require.NoError(t, err) }() + var curTime time.Time + var curTimeMutex sync.Mutex + c := Client{ Transport: func() *Transport { if ca == "udp" { @@ -1195,7 +1204,12 @@ func TestClientRecordRTCPReport(t *testing.T) { v := TransportTCP return &v }(), - senderReportPeriod: 500 * time.Millisecond, + timeNow: func() time.Time { + curTimeMutex.Lock() + defer curTimeMutex.Unlock() + return curTime + }, + senderReportPeriod: 100 * time.Millisecond, } medi := testH264Media @@ -1205,17 +1219,27 @@ func TestClientRecordRTCPReport(t *testing.T) { require.NoError(t, err) defer c.Close() - for i := 0; i < 2; i++ { - err = c.WritePacketRTP(medi, &rtp.Packet{ + curTimeMutex.Lock() + curTime = time.Date(2013, 6, 10, 1, 0, 0, 0, time.UTC) + curTimeMutex.Unlock() + + err = c.WritePacketRTPWithNTP( + medi, + &rtp.Packet{ Header: rtp.Header{ Version: 2, PayloadType: 96, SSRC: 0x38F27A2F, + Timestamp: 1300000, }, Payload: []byte{0x05}, // IDR - }) - require.NoError(t, err) - } + }, + time.Date(1996, 2, 13, 14, 32, 5, 0, time.UTC)) + require.NoError(t, err) + + curTimeMutex.Lock() + curTime = time.Date(2013, 6, 10, 1, 1, 0, 0, time.UTC) + curTimeMutex.Unlock() <-reportReceived }) diff --git a/client_udp_listener.go b/client_udp_listener.go index 1f3f78a0..de089c19 100644 --- a/client_udp_listener.go +++ b/client_udp_listener.go @@ -25,9 +25,8 @@ func randInRange(max int) (int, error) { } type clientUDPListener struct { - anyPortEnable bool - writeTimeout time.Duration - pc net.PacketConn + c *Client + pc net.PacketConn readFunc readFunc readIP net.IP @@ -40,11 +39,7 @@ type clientUDPListener struct { done chan struct{} } -func newClientUDPListenerPair( - listenPacket func(network, address string) (net.PacketConn, error), - anyPortEnable bool, - writeTimeout time.Duration, -) (*clientUDPListener, *clientUDPListener, error) { +func newClientUDPListenerPair(c *Client) (*clientUDPListener, *clientUDPListener, error) { // choose two consecutive ports in range 65535-10000 // RTP port must be even and RTCP port odd for { @@ -55,9 +50,7 @@ func newClientUDPListenerPair( rtpPort := v*2 + 10000 rtpListener, err := newClientUDPListener( - listenPacket, - anyPortEnable, - writeTimeout, + c, false, net.JoinHostPort("", strconv.FormatInt(int64(rtpPort), 10)), ) @@ -67,9 +60,7 @@ func newClientUDPListenerPair( rtcpPort := rtpPort + 1 rtcpListener, err := newClientUDPListener( - listenPacket, - anyPortEnable, - writeTimeout, + c, false, net.JoinHostPort("", strconv.FormatInt(int64(rtcpPort), 10)), ) @@ -83,9 +74,7 @@ func newClientUDPListenerPair( } func newClientUDPListener( - listenPacket func(network, address string) (net.PacketConn, error), - anyPortEnable bool, - writeTimeout time.Duration, + c *Client, multicast bool, address string, ) (*clientUDPListener, error) { @@ -96,7 +85,7 @@ func newClientUDPListener( return nil, err } - tmp, err := listenPacket(restrictNetwork("udp", "224.0.0.0:"+port)) + tmp, err := c.ListenPacket(restrictNetwork("udp", "224.0.0.0:"+port)) if err != nil { return nil, err } @@ -115,7 +104,7 @@ func newClientUDPListener( pc = tmp.(*net.UDPConn) } else { - tmp, err := listenPacket(restrictNetwork("udp", address)) + tmp, err := c.ListenPacket(restrictNetwork("udp", address)) if err != nil { return nil, err } @@ -128,8 +117,7 @@ func newClientUDPListener( } return &clientUDPListener{ - anyPortEnable: anyPortEnable, - writeTimeout: writeTimeout, + c: c, pc: pc, lastPacketTime: int64Ptr(0), }, nil @@ -176,13 +164,13 @@ func (u *clientUDPListener) run() { // in case of anyPortEnable, store the port of the first packet we receive. // this reduces security issues - if u.anyPortEnable && u.readPort == 0 { + if u.c.AnyPortEnable && u.readPort == 0 { u.readPort = uaddr.Port } else if u.readPort != uaddr.Port { continue } - now := time.Now() + now := u.c.timeNow() atomic.StoreInt64(u.lastPacketTime, now.Unix()) u.readFunc(buf[:n]) @@ -192,7 +180,7 @@ func (u *clientUDPListener) run() { func (u *clientUDPListener) write(payload []byte) error { // no mutex is needed here since Write() has an internal lock. // https://github.com/golang/go/issues/27203#issuecomment-534386117 - u.pc.SetWriteDeadline(time.Now().Add(u.writeTimeout)) + u.pc.SetWriteDeadline(time.Now().Add(u.c.WriteTimeout)) _, err := u.pc.WriteTo(payload, u.writeAddr) return err } diff --git a/pkg/rtcpreceiver/rtcpreceiver.go b/pkg/rtcpreceiver/rtcpreceiver.go index 1cbdbada..caf8cf57 100644 --- a/pkg/rtcpreceiver/rtcpreceiver.go +++ b/pkg/rtcpreceiver/rtcpreceiver.go @@ -10,6 +10,13 @@ import ( "github.com/pion/rtp" ) +// seconds since 1st January 1900 +// higher 32 bits are the integer part, lower 32 bits are the fractional part +func ntpTimeRTCPToGo(v uint64) time.Time { + nano := int64((v>>32)*1000000000+(v&0xFFFFFFFF)) - 2208988800*1000000000 + return time.Unix(0, nano) +} + func randUint32() (uint32, error) { var b [4]byte _, err := rand.Read(b[:]) @@ -19,13 +26,12 @@ func randUint32() (uint32, error) { return uint32(b[0])<<24 | uint32(b[1])<<16 | uint32(b[2])<<8 | uint32(b[3]), nil } -var timeNow = time.Now - // RTCPReceiver is a utility to generate RTCP receiver reports. type RTCPReceiver struct { - period time.Duration - receiverSSRC uint32 clockRate float64 + receiverSSRC uint32 + period time.Duration + timeNow func() time.Time writePacketRTCP func(rtcp.Packet) mutex sync.Mutex @@ -36,16 +42,17 @@ type RTCPReceiver struct { lastSSRC uint32 lastSequenceNumber uint16 lastTimeRTP uint32 - lastTimeNTP time.Time + lastTimeSystem time.Time totalLost uint32 totalLostSinceReport uint32 totalSinceReport uint32 jitter float64 // data from RTCP packets - firstSenderReportReceived bool - lastSenderReportNTP uint32 - lastSenderReportTime time.Time + firstSenderReportReceived bool + lastSenderReportTimeNTP uint64 + lastSenderReportTimeRTP uint32 + lastSenderReportTimeSystem time.Time terminate chan struct{} done chan struct{} @@ -53,9 +60,10 @@ type RTCPReceiver struct { // New allocates a RTCPReceiver. func New( - period time.Duration, - receiverSSRC *uint32, clockRate int, + receiverSSRC *uint32, + period time.Duration, + timeNow func() time.Time, writePacketRTCP func(rtcp.Packet), ) (*RTCPReceiver, error) { if receiverSSRC == nil { @@ -66,10 +74,15 @@ func New( receiverSSRC = &v } + if timeNow == nil { + timeNow = time.Now + } + rr := &RTCPReceiver{ - period: period, - receiverSSRC: *receiverSSRC, clockRate: float64(clockRate), + receiverSSRC: *receiverSSRC, + period: period, + timeNow: timeNow, writePacketRTCP: writePacketRTCP, terminate: make(chan struct{}), done: make(chan struct{}), @@ -95,7 +108,7 @@ func (rr *RTCPReceiver) run() { for { select { case <-t.C: - report := rr.report(timeNow()) + report := rr.report() if report != nil { rr.writePacketRTCP(report) } @@ -106,14 +119,16 @@ func (rr *RTCPReceiver) run() { } } -func (rr *RTCPReceiver) report(ts time.Time) rtcp.Packet { +func (rr *RTCPReceiver) report() rtcp.Packet { rr.mutex.Lock() defer rr.mutex.Unlock() - if !rr.firstRTPPacketReceived || rr.clockRate == 0 { + if !rr.firstRTPPacketReceived { return nil } + system := rr.timeNow() + report := &rtcp.ReceiverReport{ SSRC: rr.receiverSSRC, Reports: []rtcp.ReceptionReport{ @@ -131,12 +146,12 @@ func (rr *RTCPReceiver) report(ts time.Time) rtcp.Packet { if rr.firstSenderReportReceived { // middle 32 bits out of 64 in the NTP timestamp of last sender report - report.Reports[0].LastSenderReport = rr.lastSenderReportNTP + report.Reports[0].LastSenderReport = uint32(rr.lastSenderReportTimeNTP >> 16) // delay, expressed in units of 1/65536 seconds, between // receiving the last SR packet from source SSRC_n and sending this // reception report block - report.Reports[0].Delay = uint32(ts.Sub(rr.lastSenderReportTime).Seconds() * 65536) + report.Reports[0].Delay = uint32(system.Sub(rr.lastSenderReportTimeSystem).Seconds() * 65536) } rr.totalLostSinceReport = 0 @@ -146,7 +161,7 @@ func (rr *RTCPReceiver) report(ts time.Time) rtcp.Packet { } // ProcessPacket extracts the needed data from RTP packets. -func (rr *RTCPReceiver) ProcessPacket(pkt *rtp.Packet, ntp time.Time, ptsEqualsDTS bool) { +func (rr *RTCPReceiver) ProcessPacket(pkt *rtp.Packet, system time.Time, ptsEqualsDTS bool) { rr.mutex.Lock() defer rr.mutex.Unlock() @@ -160,7 +175,7 @@ func (rr *RTCPReceiver) ProcessPacket(pkt *rtp.Packet, ntp time.Time, ptsEqualsD if ptsEqualsDTS { rr.timeInitialized = true rr.lastTimeRTP = pkt.Timestamp - rr.lastTimeNTP = ntp + rr.lastTimeSystem = system } // subsequent packets @@ -194,7 +209,7 @@ func (rr *RTCPReceiver) ProcessPacket(pkt *rtp.Packet, ntp time.Time, ptsEqualsD if rr.timeInitialized { // update jitter // https://tools.ietf.org/html/rfc3550#page-39 - D := ntp.Sub(rr.lastTimeNTP).Seconds()*rr.clockRate - + D := system.Sub(rr.lastTimeSystem).Seconds()*rr.clockRate - (float64(pkt.Timestamp) - float64(rr.lastTimeRTP)) if D < 0 { D = -D @@ -204,20 +219,21 @@ func (rr *RTCPReceiver) ProcessPacket(pkt *rtp.Packet, ntp time.Time, ptsEqualsD rr.timeInitialized = true rr.lastTimeRTP = pkt.Timestamp - rr.lastTimeNTP = ntp + rr.lastTimeSystem = system rr.lastSSRC = pkt.SSRC } } } // ProcessSenderReport extracts the needed data from RTCP sender reports. -func (rr *RTCPReceiver) ProcessSenderReport(sr *rtcp.SenderReport, ts time.Time) { +func (rr *RTCPReceiver) ProcessSenderReport(sr *rtcp.SenderReport, system time.Time) { rr.mutex.Lock() defer rr.mutex.Unlock() rr.firstSenderReportReceived = true - rr.lastSenderReportNTP = uint32(sr.NTPTime >> 16) - rr.lastSenderReportTime = ts + rr.lastSenderReportTimeNTP = sr.NTPTime + rr.lastSenderReportTimeRTP = sr.RTPTime + rr.lastSenderReportTimeSystem = system } // LastSSRC returns the SSRC of the last RTP packet. @@ -226,3 +242,18 @@ func (rr *RTCPReceiver) LastSSRC() (uint32, bool) { defer rr.mutex.Unlock() return rr.lastSSRC, rr.firstRTPPacketReceived } + +// PacketNTP returns the NTP timestamp of the packet. +func (rr *RTCPReceiver) PacketNTP(ts uint32) (time.Time, bool) { + rr.mutex.Lock() + defer rr.mutex.Unlock() + + if !rr.firstSenderReportReceived { + return time.Time{}, false + } + + timeDiff := int32(ts - rr.lastSenderReportTimeRTP) + timeDiffGo := (time.Duration(timeDiff) * time.Second) / time.Duration(rr.clockRate) + + return ntpTimeRTCPToGo(rr.lastSenderReportTimeNTP).Add(timeDiffGo), true +} diff --git a/pkg/rtcpreceiver/rtcpreceiver_test.go b/pkg/rtcpreceiver/rtcpreceiver_test.go index 2a2333a8..7b2a2a08 100644 --- a/pkg/rtcpreceiver/rtcpreceiver_test.go +++ b/pkg/rtcpreceiver/rtcpreceiver_test.go @@ -9,14 +9,20 @@ import ( "github.com/stretchr/testify/require" ) -func TestRTCPReceiverBase(t *testing.T) { - timeNow = func() time.Time { - return time.Date(2008, 0o5, 20, 22, 15, 22, 0, time.UTC) - } - done := make(chan struct{}) - v := uint32(0x65f83afb) +func uint32Ptr(v uint32) *uint32 { + return &v +} - rr, err := New(500*time.Millisecond, &v, 90000, +func TestRTCPReceiverBase(t *testing.T) { + done := make(chan struct{}) + + rr, err := New( + 90000, + uint32Ptr(0x65f83afb), + 500*time.Millisecond, + func() time.Time { + return time.Date(2008, 0o5, 20, 22, 15, 22, 0, time.UTC) + }, func(pkt rtcp.Packet) { require.Equal(t, &rtcp.ReceiverReport{ SSRC: 0x65f83afb, @@ -77,25 +83,28 @@ func TestRTCPReceiverBase(t *testing.T) { func TestRTCPReceiverOverflow(t *testing.T) { done := make(chan struct{}) - timeNow = func() time.Time { - return time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC) - } - v := uint32(0x65f83afb) - rr, err := New(250*time.Millisecond, &v, 90000, func(pkt rtcp.Packet) { - require.Equal(t, &rtcp.ReceiverReport{ - SSRC: 0x65f83afb, - Reports: []rtcp.ReceptionReport{ - { - SSRC: 0xba9da416, - LastSequenceNumber: 1 << 16, - LastSenderReport: 0x887a17ce, - Delay: 1 * 65536, + rr, err := New( + 90000, + uint32Ptr(0x65f83afb), + 250*time.Millisecond, + func() time.Time { + return time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC) + }, + func(pkt rtcp.Packet) { + require.Equal(t, &rtcp.ReceiverReport{ + SSRC: 0x65f83afb, + Reports: []rtcp.ReceptionReport{ + { + SSRC: 0xba9da416, + LastSequenceNumber: 1 << 16, + LastSenderReport: 0x887a17ce, + Delay: 1 * 65536, + }, }, - }, - }, pkt) - close(done) - }) + }, pkt) + close(done) + }) require.NoError(t, err) defer rr.Close() @@ -144,30 +153,33 @@ func TestRTCPReceiverOverflow(t *testing.T) { func TestRTCPReceiverPacketLost(t *testing.T) { done := make(chan struct{}) - timeNow = func() time.Time { - return time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC) - } - v := uint32(0x65f83afb) - rr, err := New(500*time.Millisecond, &v, 90000, func(pkt rtcp.Packet) { - require.Equal(t, &rtcp.ReceiverReport{ - SSRC: 0x65f83afb, - Reports: []rtcp.ReceptionReport{ - { - SSRC: 0xba9da416, - LastSequenceNumber: 0x0122, - LastSenderReport: 0x887a17ce, - FractionLost: func() uint8 { - v := float64(1) / 3 - return uint8(v * 256) - }(), - TotalLost: 1, - Delay: 1 * 65536, + rr, err := New( + 90000, + uint32Ptr(0x65f83afb), + 500*time.Millisecond, + func() time.Time { + return time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC) + }, + func(pkt rtcp.Packet) { + require.Equal(t, &rtcp.ReceiverReport{ + SSRC: 0x65f83afb, + Reports: []rtcp.ReceptionReport{ + { + SSRC: 0xba9da416, + LastSequenceNumber: 0x0122, + LastSenderReport: 0x887a17ce, + FractionLost: func() uint8 { + v := float64(1) / 3 + return uint8(v * 256) + }(), + TotalLost: 1, + Delay: 1 * 65536, + }, }, - }, - }, pkt) - close(done) - }) + }, pkt) + close(done) + }) require.NoError(t, err) defer rr.Close() @@ -214,30 +226,33 @@ func TestRTCPReceiverPacketLost(t *testing.T) { func TestRTCPReceiverOverflowPacketLost(t *testing.T) { done := make(chan struct{}) - timeNow = func() time.Time { - return time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC) - } - v := uint32(0x65f83afb) - rr, err := New(500*time.Millisecond, &v, 90000, func(pkt rtcp.Packet) { - require.Equal(t, &rtcp.ReceiverReport{ - SSRC: 0x65f83afb, - Reports: []rtcp.ReceptionReport{ - { - SSRC: 0xba9da416, - LastSequenceNumber: 1<<16 | 0x0002, - LastSenderReport: 0x887a17ce, - FractionLost: func() uint8 { - v := float64(2) / 4 - return uint8(v * 256) - }(), - TotalLost: 2, - Delay: 1 * 65536, + rr, err := New( + 90000, + uint32Ptr(0x65f83afb), + 500*time.Millisecond, + func() time.Time { + return time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC) + }, + func(pkt rtcp.Packet) { + require.Equal(t, &rtcp.ReceiverReport{ + SSRC: 0x65f83afb, + Reports: []rtcp.ReceptionReport{ + { + SSRC: 0xba9da416, + LastSequenceNumber: 1<<16 | 0x0002, + LastSenderReport: 0x887a17ce, + FractionLost: func() uint8 { + v := float64(2) / 4 + return uint8(v * 256) + }(), + TotalLost: 2, + Delay: 1 * 65536, + }, }, - }, - }, pkt) - close(done) - }) + }, pkt) + close(done) + }) require.NoError(t, err) defer rr.Close() @@ -284,26 +299,29 @@ func TestRTCPReceiverOverflowPacketLost(t *testing.T) { func TestRTCPReceiverJitter(t *testing.T) { done := make(chan struct{}) - timeNow = func() time.Time { - return time.Date(2008, 0o5, 20, 22, 15, 22, 0, time.UTC) - } - v := uint32(0x65f83afb) - rr, err := New(500*time.Millisecond, &v, 90000, func(pkt rtcp.Packet) { - require.Equal(t, &rtcp.ReceiverReport{ - SSRC: 0x65f83afb, - Reports: []rtcp.ReceptionReport{ - { - SSRC: 0xba9da416, - LastSequenceNumber: 948, - LastSenderReport: 0x887a17ce, - Delay: 2 * 65536, - Jitter: 45000 / 16, + rr, err := New( + 90000, + uint32Ptr(0x65f83afb), + 500*time.Millisecond, + func() time.Time { + return time.Date(2008, 0o5, 20, 22, 15, 22, 0, time.UTC) + }, + func(pkt rtcp.Packet) { + require.Equal(t, &rtcp.ReceiverReport{ + SSRC: 0x65f83afb, + Reports: []rtcp.ReceptionReport{ + { + SSRC: 0xba9da416, + LastSequenceNumber: 948, + LastSenderReport: 0x887a17ce, + Delay: 2 * 65536, + Jitter: 45000 / 16, + }, }, - }, - }, pkt) - close(done) - }) + }, pkt) + close(done) + }) require.NoError(t, err) defer rr.Close() diff --git a/pkg/rtcpsender/rtcpsender.go b/pkg/rtcpsender/rtcpsender.go index d341e6ed..805d80e7 100644 --- a/pkg/rtcpsender/rtcpsender.go +++ b/pkg/rtcpsender/rtcpsender.go @@ -9,8 +9,6 @@ import ( "github.com/pion/rtp" ) -var timeNow = time.Now - // seconds since 1st January 1900 // higher 32 bits are the integer part, lower 32 bits are the fractional part func ntpTimeGoToRTCP(v time.Time) uint64 { @@ -22,6 +20,7 @@ func ntpTimeGoToRTCP(v time.Time) uint64 { type RTCPSender struct { clockRate float64 period time.Duration + timeNow func() time.Time writePacketRTCP func(rtcp.Packet) mutex sync.Mutex @@ -43,11 +42,17 @@ type RTCPSender struct { func New( clockRate int, period time.Duration, + timeNow func() time.Time, writePacketRTCP func(rtcp.Packet), ) *RTCPSender { + if timeNow == nil { + timeNow = time.Now + } + rs := &RTCPSender{ clockRate: float64(clockRate), period: period, + timeNow: timeNow, writePacketRTCP: writePacketRTCP, terminate: make(chan struct{}), done: make(chan struct{}), @@ -92,7 +97,7 @@ func (rs *RTCPSender) report() rtcp.Packet { return nil } - systemTimeDiff := timeNow().Sub(rs.lastTimeSystem) + systemTimeDiff := rs.timeNow().Sub(rs.lastTimeSystem) ntpTime := rs.lastTimeNTP.Add(systemTimeDiff) rtpTime := rs.lastTimeRTP + uint32(systemTimeDiff.Seconds()*rs.clockRate) @@ -114,7 +119,7 @@ func (rs *RTCPSender) ProcessPacket(pkt *rtp.Packet, ntp time.Time, ptsEqualsDTS rs.initialized = true rs.lastTimeRTP = pkt.Timestamp rs.lastTimeNTP = ntp - rs.lastTimeSystem = timeNow() + rs.lastTimeSystem = rs.timeNow() } rs.lastSSRC = pkt.SSRC diff --git a/pkg/rtcpsender/rtcpsender_test.go b/pkg/rtcpsender/rtcpsender_test.go index 3569b253..ac3dc7ab 100644 --- a/pkg/rtcpsender/rtcpsender_test.go +++ b/pkg/rtcpsender/rtcpsender_test.go @@ -20,17 +20,16 @@ func TestRTCPSender(t *testing.T) { curTime = v } - timeNow = func() time.Time { - mutex.Lock() - defer mutex.Unlock() - return curTime - } - sent := make(chan struct{}) rs := New( 90000, 100*time.Millisecond, + func() time.Time { + mutex.Lock() + defer mutex.Unlock() + return curTime + }, func(pkt rtcp.Packet) { require.Equal(t, &rtcp.SenderReport{ SSRC: 0xba9da416, diff --git a/server.go b/server.go index 9983219a..2e6a4bbd 100644 --- a/server.go +++ b/server.go @@ -118,10 +118,11 @@ type Server struct { // private // - udpReceiverReportPeriod time.Duration - senderReportPeriod time.Duration - sessionTimeout time.Duration - checkStreamPeriod time.Duration + timeNow func() time.Time + senderReportPeriod time.Duration + receiverReportPeriod time.Duration + sessionTimeout time.Duration + checkStreamPeriod time.Duration ctx context.Context ctxCancel func() @@ -176,12 +177,15 @@ func (s *Server) Start() error { } // private - if s.udpReceiverReportPeriod == 0 { - s.udpReceiverReportPeriod = 10 * time.Second + if s.timeNow == nil { + s.timeNow = time.Now } if s.senderReportPeriod == 0 { s.senderReportPeriod = 10 * time.Second } + if s.receiverReportPeriod == 0 { + s.receiverReportPeriod = 10 * time.Second + } if s.sessionTimeout == 0 { s.sessionTimeout = 1 * 60 * time.Second } diff --git a/server_play_test.go b/server_play_test.go index ddbda355..cfbf835a 100644 --- a/server_play_test.go +++ b/server_play_test.go @@ -6,6 +6,7 @@ import ( "net" "strconv" "strings" + "sync" "sync/atomic" "testing" "time" @@ -977,6 +978,9 @@ func TestServerPlayRTCPReport(t *testing.T) { t.Run(ca, func(t *testing.T) { var stream *ServerStream + var curTime time.Time + var curTimeMutex sync.Mutex + s := &Server{ Handler: &testServerHandler{ onDescribe: func(ctx *ServerHandlerOnDescribeCtx) (*base.Response, *ServerStream, error) { @@ -995,10 +999,15 @@ func TestServerPlayRTCPReport(t *testing.T) { }, nil }, }, - senderReportPeriod: 1 * time.Second, - RTSPAddress: "localhost:8554", - UDPRTPAddress: "127.0.0.1:8000", - UDPRTCPAddress: "127.0.0.1:8001", + RTSPAddress: "localhost:8554", + UDPRTPAddress: "127.0.0.1:8000", + UDPRTCPAddress: "127.0.0.1:8001", + timeNow: func() time.Time { + curTimeMutex.Lock() + defer curTimeMutex.Unlock() + return curTime + }, + senderReportPeriod: 100 * time.Millisecond, } err := s.Start() @@ -1052,17 +1061,27 @@ func TestServerPlayRTCPReport(t *testing.T) { doPlay(t, conn, "rtsp://localhost:8554/teststream", session) - for i := 0; i < 2; i++ { - err := stream.WritePacketRTP(stream.Medias()[0], &rtp.Packet{ + curTimeMutex.Lock() + curTime = time.Date(2014, 6, 7, 15, 0, 0, 0, time.UTC) + curTimeMutex.Unlock() + + err = stream.WritePacketRTPWithNTP( + stream.Medias()[0], + &rtp.Packet{ Header: rtp.Header{ Version: 2, PayloadType: 96, SSRC: 0x38F27A2F, + Timestamp: 240000, }, Payload: []byte{0x05}, // IDR - }) - require.NoError(t, err) - } + }, + time.Date(2017, 8, 10, 12, 22, 0, 0, time.UTC)) + require.NoError(t, err) + + curTimeMutex.Lock() + curTime = time.Date(2014, 6, 7, 15, 0, 30, 0, time.UTC) + curTimeMutex.Unlock() var buf []byte @@ -1073,10 +1092,8 @@ func TestServerPlayRTCPReport(t *testing.T) { require.NoError(t, err) buf = buf[:n] } else { - for i := 0; i < 2; i++ { - _, err := conn.ReadInterleavedFrame() - require.NoError(t, err) - } + _, err := conn.ReadInterleavedFrame() + require.NoError(t, err) f, err := conn.ReadInterleavedFrame() require.NoError(t, err) @@ -1088,10 +1105,10 @@ func TestServerPlayRTCPReport(t *testing.T) { require.NoError(t, err) require.Equal(t, &rtcp.SenderReport{ SSRC: 0x38F27A2F, - NTPTime: packets[0].(*rtcp.SenderReport).NTPTime, - RTPTime: packets[0].(*rtcp.SenderReport).RTPTime, - PacketCount: 2, - OctetCount: 2, + NTPTime: ntpTimeGoToRTCP(time.Date(2017, 8, 10, 12, 22, 30, 0, time.UTC)), + RTPTime: 240000 + 90000*30, + PacketCount: 1, + OctetCount: 1, }, packets[0]) doTeardown(t, conn, "rtsp://localhost:8554/teststream", session) diff --git a/server_record_test.go b/server_record_test.go index 6163d621..143c2dff 100644 --- a/server_record_test.go +++ b/server_record_test.go @@ -818,10 +818,10 @@ func TestServerRecordRTCPReport(t *testing.T) { }, nil }, }, - udpReceiverReportPeriod: 1 * time.Second, - UDPRTPAddress: "127.0.0.1:8000", - UDPRTCPAddress: "127.0.0.1:8001", - RTSPAddress: "localhost:8554", + UDPRTPAddress: "127.0.0.1:8000", + UDPRTCPAddress: "127.0.0.1:8001", + RTSPAddress: "localhost:8554", + receiverReportPeriod: 500 * time.Millisecond, } err := s.Start() @@ -874,7 +874,7 @@ func TestServerRecordRTCPReport(t *testing.T) { Timestamp: 54352, SSRC: 753621, }, - Payload: []byte{0x01, 0x02, 0x03, 0x04}, + Payload: []byte{1, 2, 3, 4}, }).Marshal() _, err = l1.WriteTo(byts, &net.UDPAddr{ IP: net.ParseIP("127.0.0.1"), @@ -883,11 +883,11 @@ func TestServerRecordRTCPReport(t *testing.T) { require.NoError(t, err) // wait for the packet's SSRC to be saved - time.Sleep(500 * time.Millisecond) + time.Sleep(200 * time.Millisecond) byts, _ = (&rtcp.SenderReport{ SSRC: 753621, - NTPTime: 0xcbddcc34999997ff, + NTPTime: ntpTimeGoToRTCP(time.Date(2018, 2, 20, 19, 0, 0, 0, time.UTC)), RTPTime: 54352, PacketCount: 1, OctetCount: 4, @@ -916,7 +916,7 @@ func TestServerRecordRTCPReport(t *testing.T) { { SSRC: rr.Reports[0].SSRC, LastSequenceNumber: 534, - LastSenderReport: rr.Reports[0].LastSenderReport, + LastSenderReport: 4004511744, Delay: rr.Reports[0].Delay, }, }, @@ -1418,3 +1418,137 @@ func TestServerRecordDecodeErrors(t *testing.T) { }) } } + +func TestServerRecordPacketNTP(t *testing.T) { + recv := make(chan struct{}) + first := false + + s := &Server{ + Handler: &testServerHandler{ + onAnnounce: func(ctx *ServerHandlerOnAnnounceCtx) (*base.Response, error) { + return &base.Response{ + StatusCode: base.StatusOK, + }, nil + }, + onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) { + return &base.Response{ + StatusCode: base.StatusOK, + }, nil, nil + }, + onRecord: func(ctx *ServerHandlerOnRecordCtx) (*base.Response, error) { + ctx.Session.OnPacketRTPAny(func(medi *media.Media, forma format.Format, pkt *rtp.Packet) { + if !first { + first = true + } else { + ntp, ok := ctx.Session.PacketNTP(medi, pkt) + require.Equal(t, true, ok) + require.Equal(t, time.Date(2018, 2, 20, 19, 0, 1, 0, time.UTC), ntp.UTC()) + close(recv) + } + }) + + return &base.Response{ + StatusCode: base.StatusOK, + }, nil + }, + }, + UDPRTPAddress: "127.0.0.1:8000", + UDPRTCPAddress: "127.0.0.1:8001", + RTSPAddress: "localhost:8554", + } + + err := s.Start() + require.NoError(t, err) + defer s.Close() + + nconn, err := net.Dial("tcp", "localhost:8554") + require.NoError(t, err) + defer nconn.Close() + conn := conn.NewConn(nconn) + + medias := media.Medias{testH264Media} + resetMediaControls(medias) + + doAnnounce(t, conn, "rtsp://localhost:8554/teststream", medias) + + l1, err := net.ListenPacket("udp", "localhost:34556") + require.NoError(t, err) + defer l1.Close() + + l2, err := net.ListenPacket("udp", "localhost:34557") + require.NoError(t, err) + defer l2.Close() + + inTH := &headers.Transport{ + Delivery: func() *headers.TransportDelivery { + v := headers.TransportDeliveryUnicast + return &v + }(), + Mode: func() *headers.TransportMode { + v := headers.TransportModeRecord + return &v + }(), + Protocol: headers.TransportProtocolUDP, + ClientPorts: &[2]int{34556, 34557}, + } + + res, th := doSetup(t, conn, "rtsp://localhost:8554/teststream/"+medias[0].Control, inTH, "") + + session := readSession(t, res) + + doRecord(t, conn, "rtsp://localhost:8554/teststream", session) + + byts, _ := (&rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 534, + Timestamp: 54352, + SSRC: 753621, + }, + Payload: []byte{1, 2, 3, 4}, + }).Marshal() + _, err = l1.WriteTo(byts, &net.UDPAddr{ + IP: net.ParseIP("127.0.0.1"), + Port: th.ServerPorts[0], + }) + require.NoError(t, err) + + // wait for the packet's SSRC to be saved + time.Sleep(100 * time.Millisecond) + + byts, _ = (&rtcp.SenderReport{ + SSRC: 753621, + NTPTime: ntpTimeGoToRTCP(time.Date(2018, 2, 20, 19, 0, 0, 0, time.UTC)), + RTPTime: 54352, + PacketCount: 1, + OctetCount: 4, + }).Marshal() + _, err = l2.WriteTo(byts, &net.UDPAddr{ + IP: net.ParseIP("127.0.0.1"), + Port: th.ServerPorts[1], + }) + require.NoError(t, err) + + time.Sleep(100 * time.Millisecond) + + byts, _ = (&rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 535, + Timestamp: 54352 + 90000, + SSRC: 753621, + }, + Payload: []byte{1, 2, 3, 4}, + }).Marshal() + _, err = l1.WriteTo(byts, &net.UDPAddr{ + IP: net.ParseIP("127.0.0.1"), + Port: th.ServerPorts[0], + }) + require.NoError(t, err) + + <-recv +} diff --git a/server_session.go b/server_session.go index d9e1e693..ae94d6e1 100644 --- a/server_session.go +++ b/server_session.go @@ -181,7 +181,7 @@ func newServerSession( bytesReceived: new(uint64), bytesSent: new(uint64), conns: make(map[*ServerConn]struct{}), - lastRequestTime: time.Now(), + lastRequestTime: s.timeNow(), udpCheckStreamTimer: emptyTimer(), chHandleRequest: make(chan sessionRequestReq), chRemoveConn: make(chan *ServerConn), @@ -326,7 +326,7 @@ func (ss *ServerSession) runInner() error { for { select { case req := <-ss.chHandleRequest: - ss.lastRequestTime = time.Now() + ss.lastRequestTime = ss.s.timeNow() if _, ok := ss.conns[req.sc]; !ok { ss.conns[req.sc] = struct{}{} @@ -402,7 +402,7 @@ func (ss *ServerSession) runInner() error { } case <-ss.udpCheckStreamTimer.C: - now := time.Now() + now := ss.s.timeNow() lft := atomic.LoadInt64(ss.udpLastPacketTime) @@ -873,7 +873,7 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) ( ss.state = ServerSessionStatePlay - v := time.Now().Unix() + v := ss.s.timeNow().Unix() ss.udpLastPacketTime = &v for _, sm := range ss.setuppedMedias { @@ -897,7 +897,7 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) ( } var ri headers.RTPInfo - now := time.Now() + now := ss.s.timeNow() for _, sm := range ss.setuppedMediasOrdered { entry := ss.setuppedStream.rtpInfoEntry(sm.media, now) @@ -965,7 +965,7 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) ( ss.state = ServerSessionStateRecord - v := time.Now().Unix() + v := ss.s.timeNow().Unix() ss.udpLastPacketTime = &v for _, sm := range ss.setuppedMedias { @@ -1186,6 +1186,14 @@ func (ss *ServerSession) WritePacketRTCP(medi *media.Media, pkt rtcp.Packet) err return nil } +// PacketNTP returns the NTP timestamp of an incoming RTP packet. +// The NTP timestamp is computed from sender reports. +func (ss *ServerSession) PacketNTP(medi *media.Media, pkt *rtp.Packet) (time.Time, bool) { + sm := ss.setuppedMedias[medi] + sf := sm.formats[pkt.PayloadType] + return sf.rtcpReceiver.PacketNTP(pkt.Timestamp) +} + func (ss *ServerSession) handleRequest(req sessionRequestReq) (*base.Response, *ServerSession, error) { select { case ss.chHandleRequest <- req: diff --git a/server_session_format.go b/server_session_format.go index 6872cf4c..b2f45ecd 100644 --- a/server_session_format.go +++ b/server_session_format.go @@ -18,7 +18,7 @@ type serverSessionFormat struct { format format.Format udpReorderer *rtpreorderer.Reorderer tcpLossDetector *rtplossdetector.LossDetector - udpRTCPReceiver *rtcpreceiver.RTCPReceiver + rtcpReceiver *rtcpreceiver.RTCPReceiver onPacketRTP OnPacketRTPFunc } @@ -34,27 +34,31 @@ func (sf *serverSessionFormat) start() { if sf.sm.ss.state != ServerSessionStatePlay { if *sf.sm.ss.setuppedTransport == TransportUDP || *sf.sm.ss.setuppedTransport == TransportUDPMulticast { sf.udpReorderer = rtpreorderer.New() - var err error - sf.udpRTCPReceiver, err = rtcpreceiver.New( - sf.sm.ss.s.udpReceiverReportPeriod, - nil, - sf.format.ClockRate(), - func(pkt rtcp.Packet) { - sf.sm.ss.WritePacketRTCP(sf.sm.media, pkt) //nolint:errcheck - }) - if err != nil { - panic(err) - } } else { sf.tcpLossDetector = rtplossdetector.New() } + + var err error + sf.rtcpReceiver, err = rtcpreceiver.New( + sf.format.ClockRate(), + nil, + sf.sm.ss.s.receiverReportPeriod, + sf.sm.ss.s.timeNow, + func(pkt rtcp.Packet) { + if *sf.sm.ss.setuppedTransport == TransportUDP || *sf.sm.ss.setuppedTransport == TransportUDPMulticast { + sf.sm.ss.WritePacketRTCP(sf.sm.media, pkt) //nolint:errcheck + } + }) + if err != nil { + panic(err) + } } } func (sf *serverSessionFormat) stop() { - if sf.udpRTCPReceiver != nil { - sf.udpRTCPReceiver.Close() - sf.udpRTCPReceiver = nil + if sf.rtcpReceiver != nil { + sf.rtcpReceiver.Close() + sf.rtcpReceiver = nil } } @@ -73,7 +77,7 @@ func (sf *serverSessionFormat) readRTPUDP(pkt *rtp.Packet, now time.Time) { } for _, pkt := range packets { - sf.udpRTCPReceiver.ProcessPacket(pkt, now, sf.format.PTSEqualsDTS(pkt)) + sf.rtcpReceiver.ProcessPacket(pkt, now, sf.format.PTSEqualsDTS(pkt)) sf.onPacketRTP(pkt) } } @@ -92,5 +96,7 @@ func (sf *serverSessionFormat) readRTPTCP(pkt *rtp.Packet) { // do not return } + now := sf.sm.ss.s.timeNow() + sf.rtcpReceiver.ProcessPacket(pkt, now, sf.format.PTSEqualsDTS(pkt)) sf.onPacketRTP(pkt) } diff --git a/server_session_media.go b/server_session_media.go index d911621d..cf484b67 100644 --- a/server_session_media.go +++ b/server_session_media.go @@ -160,7 +160,7 @@ func (sm *serverSessionMedia) readRTCPUDPPlay(payload []byte) { return } - now := time.Now() + now := sm.ss.s.timeNow() atomic.StoreInt64(sm.ss.udpLastPacketTime, now.Unix()) for _, pkt := range packets { @@ -191,7 +191,7 @@ func (sm *serverSessionMedia) readRTPUDPRecord(payload []byte) { return } - now := time.Now() + now := sm.ss.s.timeNow() atomic.StoreInt64(sm.ss.udpLastPacketTime, now.Unix()) forma.readRTPUDP(pkt, now) @@ -213,19 +213,17 @@ func (sm *serverSessionMedia) readRTCPUDPRecord(payload []byte) { return } - now := time.Now() + now := sm.ss.s.timeNow() atomic.StoreInt64(sm.ss.udpLastPacketTime, now.Unix()) for _, pkt := range packets { if sr, ok := pkt.(*rtcp.SenderReport); ok { format := serverFindFormatWithSSRC(sm.formats, sr.SSRC) if format != nil { - format.udpRTCPReceiver.ProcessSenderReport(sr, now) + format.rtcpReceiver.ProcessSenderReport(sr, now) } } - } - for _, pkt := range packets { sm.onPacketRTCP(pkt) } } @@ -281,7 +279,16 @@ func (sm *serverSessionMedia) readRTCPTCPRecord(payload []byte) { return } + now := sm.ss.s.timeNow() + for _, pkt := range packets { + if sr, ok := pkt.(*rtcp.SenderReport); ok { + format := serverFindFormatWithSSRC(sm.formats, sr.SSRC) + if format != nil { + format.rtcpReceiver.ProcessSenderReport(sr, now) + } + } + sm.onPacketRTCP(pkt) } } diff --git a/server_stream.go b/server_stream.go index da8ffed0..6d389861 100644 --- a/server_stream.go +++ b/server_stream.go @@ -236,7 +236,7 @@ func (st *ServerStream) readerSetInactive(ss *ServerSession) { // WritePacketRTP writes a RTP packet to all the readers of the stream. func (st *ServerStream) WritePacketRTP(medi *media.Media, pkt *rtp.Packet) error { - return st.WritePacketRTPWithNTP(medi, pkt, time.Now()) + return st.WritePacketRTPWithNTP(medi, pkt, st.s.timeNow()) } // WritePacketRTPWithNTP writes a RTP packet to all the readers of the stream. diff --git a/server_stream_media.go b/server_stream_media.go index 0cbc40bc..4da3b598 100644 --- a/server_stream_media.go +++ b/server_stream_media.go @@ -35,6 +35,7 @@ func newServerStreamMedia(st *ServerStream, medi *media.Media, trackID int) *ser tr.rtcpSender = rtcpsender.New( forma.ClockRate(), st.s.senderReportPeriod, + st.s.timeNow, func(pkt rtcp.Packet) { if !st.s.DisableRTCPSenderReports { st.WritePacketRTCP(cmedia, pkt) //nolint:errcheck diff --git a/server_udp_listener.go b/server_udp_listener.go index 2cdf3cbe..c0f4e43c 100644 --- a/server_udp_listener.go +++ b/server_udp_listener.go @@ -15,7 +15,7 @@ func serverFindFormatWithSSRC( ssrc uint32, ) *serverSessionFormat { for _, format := range formats { - tssrc, ok := format.udpRTCPReceiver.LastSSRC() + tssrc, ok := format.rtcpReceiver.LastSSRC() if ok && tssrc == ssrc { return format }