add Client.PacketNTP(), ServerSession.PacketNTP()

This commit is contained in:
aler9
2023-08-15 00:56:01 +02:00
parent 4ad57d6a75
commit bfef17b717
19 changed files with 704 additions and 263 deletions

View File

@@ -285,10 +285,11 @@ type Client struct {
// private // private
// //
senderReportPeriod time.Duration timeNow func() time.Time
udpReceiverReportPeriod time.Duration senderReportPeriod time.Duration
checkTimeoutPeriod time.Duration receiverReportPeriod time.Duration
keepalivePeriod time.Duration checkTimeoutPeriod time.Duration
keepalivePeriod time.Duration
connURL *url.URL connURL *url.URL
ctx context.Context ctx context.Context
@@ -396,12 +397,15 @@ func (c *Client) Start(scheme string, host string) error {
} }
// private // private
if c.timeNow == nil {
c.timeNow = time.Now
}
if c.senderReportPeriod == 0 { if c.senderReportPeriod == 0 {
c.senderReportPeriod = 10 * time.Second c.senderReportPeriod = 10 * time.Second
} }
if c.udpReceiverReportPeriod == 0 { if c.receiverReportPeriod == 0 {
// some cameras require a maximum of 5secs between keepalives // some cameras require a maximum of 5secs between keepalives
c.udpReceiverReportPeriod = 5 * time.Second c.receiverReportPeriod = 5 * time.Second
} }
if c.checkTimeoutPeriod == 0 { if c.checkTimeoutPeriod == 0 {
c.checkTimeoutPeriod = 1 * time.Second c.checkTimeoutPeriod = 1 * time.Second
@@ -680,7 +684,7 @@ func (c *Client) playRecordStart() {
default: // TCP default: // TCP
c.checkTimeoutTimer = time.NewTimer(c.checkTimeoutPeriod) c.checkTimeoutTimer = time.NewTimer(c.checkTimeoutPeriod)
v := time.Now().Unix() v := c.timeNow().Unix()
c.tcpLastFrameTime = &v c.tcpLastFrameTime = &v
} }
} }
@@ -870,7 +874,7 @@ func (c *Client) atLeastOneUDPPacketHasBeenReceived() bool {
} }
func (c *Client) isInUDPTimeout() bool { func (c *Client) isInUDPTimeout() bool {
now := time.Now() now := c.timeNow()
for _, ct := range c.medias { for _, ct := range c.medias {
lft := time.Unix(atomic.LoadInt64(ct.udpRTPListener.lastPacketTime), 0) lft := time.Unix(atomic.LoadInt64(ct.udpRTPListener.lastPacketTime), 0)
if now.Sub(lft) < c.ReadTimeout { if now.Sub(lft) < c.ReadTimeout {
@@ -886,7 +890,7 @@ func (c *Client) isInUDPTimeout() bool {
} }
func (c *Client) isInTCPTimeout() bool { func (c *Client) isInTCPTimeout() bool {
now := time.Now() now := c.timeNow()
lft := time.Unix(atomic.LoadInt64(c.tcpLastFrameTime), 0) lft := time.Unix(atomic.LoadInt64(c.tcpLastFrameTime), 0)
return now.Sub(lft) >= c.ReadTimeout return now.Sub(lft) >= c.ReadTimeout
} }
@@ -1653,7 +1657,7 @@ func (c *Client) OnPacketRTCP(medi *media.Media, cb OnPacketRTCPFunc) {
// WritePacketRTP writes a RTP packet to the server. // WritePacketRTP writes a RTP packet to the server.
func (c *Client) WritePacketRTP(medi *media.Media, pkt *rtp.Packet) error { func (c *Client) WritePacketRTP(medi *media.Media, pkt *rtp.Packet) error {
return c.WritePacketRTPWithNTP(medi, pkt, time.Now()) return c.WritePacketRTPWithNTP(medi, pkt, c.timeNow())
} }
// WritePacketRTPWithNTP writes a RTP packet to the server. // WritePacketRTPWithNTP writes a RTP packet to the server.
@@ -1695,3 +1699,11 @@ func (c *Client) WritePacketRTCP(medi *media.Media, pkt rtcp.Packet) error {
cm.writePacketRTCP(byts) cm.writePacketRTCP(byts)
return nil return nil
} }
// PacketNTP returns the NTP timestamp of an incoming RTP packet.
// The NTP timestamp is computed from sender reports.
func (c *Client) PacketNTP(medi *media.Media, pkt *rtp.Packet) (time.Time, bool) {
cm := c.medias[medi]
ct := cm.formats[pkt.PayloadType]
return ct.rtcpReceiver.PacketNTP(pkt.Timestamp)
}

View File

@@ -18,8 +18,8 @@ type clientFormat struct {
cm *clientMedia cm *clientMedia
format format.Format format format.Format
udpReorderer *rtpreorderer.Reorderer // play udpReorderer *rtpreorderer.Reorderer // play
udpRTCPReceiver *rtcpreceiver.RTCPReceiver // play
tcpLossDetector *rtplossdetector.LossDetector // play tcpLossDetector *rtplossdetector.LossDetector // play
rtcpReceiver *rtcpreceiver.RTCPReceiver // play
rtcpSender *rtcpsender.RTCPSender // record rtcpSender *rtcpsender.RTCPSender // record
onPacketRTP OnPacketRTPFunc onPacketRTP OnPacketRTPFunc
} }
@@ -36,24 +36,29 @@ func (ct *clientFormat) start() {
if ct.cm.c.state == clientStatePlay { if ct.cm.c.state == clientStatePlay {
if ct.cm.udpRTPListener != nil { if ct.cm.udpRTPListener != nil {
ct.udpReorderer = rtpreorderer.New() ct.udpReorderer = rtpreorderer.New()
var err error
ct.udpRTCPReceiver, err = rtcpreceiver.New(
ct.cm.c.udpReceiverReportPeriod,
nil,
ct.format.ClockRate(), func(pkt rtcp.Packet) {
ct.cm.c.WritePacketRTCP(ct.cm.media, pkt) //nolint:errcheck
})
if err != nil {
panic(err)
}
} else { } else {
ct.tcpLossDetector = rtplossdetector.New() ct.tcpLossDetector = rtplossdetector.New()
} }
var err error
ct.rtcpReceiver, err = rtcpreceiver.New(
ct.format.ClockRate(),
nil,
ct.cm.c.receiverReportPeriod,
ct.cm.c.timeNow,
func(pkt rtcp.Packet) {
if ct.cm.udpRTPListener != nil {
ct.cm.c.WritePacketRTCP(ct.cm.media, pkt) //nolint:errcheck
}
})
if err != nil {
panic(err)
}
} else { } else {
ct.rtcpSender = rtcpsender.New( ct.rtcpSender = rtcpsender.New(
ct.format.ClockRate(), ct.format.ClockRate(),
ct.cm.c.senderReportPeriod, ct.cm.c.senderReportPeriod,
ct.cm.c.timeNow,
func(pkt rtcp.Packet) { func(pkt rtcp.Packet) {
if !ct.cm.c.DisableRTCPSenderReports { if !ct.cm.c.DisableRTCPSenderReports {
ct.cm.c.WritePacketRTCP(ct.cm.media, pkt) //nolint:errcheck ct.cm.c.WritePacketRTCP(ct.cm.media, pkt) //nolint:errcheck
@@ -63,9 +68,9 @@ func (ct *clientFormat) start() {
} }
func (ct *clientFormat) stop() { func (ct *clientFormat) stop() {
if ct.udpRTCPReceiver != nil { if ct.rtcpReceiver != nil {
ct.udpRTCPReceiver.Close() ct.rtcpReceiver.Close()
ct.udpRTCPReceiver = nil ct.rtcpReceiver = nil
} }
if ct.rtcpSender != nil { if ct.rtcpSender != nil {
@@ -95,10 +100,10 @@ func (ct *clientFormat) readRTPUDP(pkt *rtp.Packet) {
// do not return // do not return
} }
now := time.Now() now := ct.cm.c.timeNow()
for _, pkt := range packets { for _, pkt := range packets {
ct.udpRTCPReceiver.ProcessPacket(pkt, now, ct.format.PTSEqualsDTS(pkt)) ct.rtcpReceiver.ProcessPacket(pkt, now, ct.format.PTSEqualsDTS(pkt))
ct.onPacketRTP(pkt) ct.onPacketRTP(pkt)
} }
} }
@@ -117,5 +122,7 @@ func (ct *clientFormat) readRTPTCP(pkt *rtp.Packet) {
// do not return // do not return
} }
now := ct.cm.c.timeNow()
ct.rtcpReceiver.ProcessPacket(pkt, now, ct.format.PTSEqualsDTS(pkt))
ct.onPacketRTP(pkt) ct.onPacketRTP(pkt)
} }

View File

@@ -44,9 +44,7 @@ func (cm *clientMedia) close() {
func (cm *clientMedia) allocateUDPListeners(multicast bool, rtpAddress string, rtcpAddress string) error { func (cm *clientMedia) allocateUDPListeners(multicast bool, rtpAddress string, rtcpAddress string) error {
if rtpAddress != ":0" { if rtpAddress != ":0" {
l1, err := newClientUDPListener( l1, err := newClientUDPListener(
cm.c.ListenPacket, cm.c,
cm.c.AnyPortEnable,
cm.c.WriteTimeout,
multicast, multicast,
rtpAddress, rtpAddress,
) )
@@ -55,9 +53,7 @@ func (cm *clientMedia) allocateUDPListeners(multicast bool, rtpAddress string, r
} }
l2, err := newClientUDPListener( l2, err := newClientUDPListener(
cm.c.ListenPacket, cm.c,
cm.c.AnyPortEnable,
cm.c.WriteTimeout,
multicast, multicast,
rtcpAddress, rtcpAddress,
) )
@@ -71,11 +67,7 @@ func (cm *clientMedia) allocateUDPListeners(multicast bool, rtpAddress string, r
} }
var err error var err error
cm.udpRTPListener, cm.udpRTCPListener, err = newClientUDPListenerPair( cm.udpRTPListener, cm.udpRTCPListener, err = newClientUDPListenerPair(cm.c)
cm.c.ListenPacket,
cm.c.AnyPortEnable,
cm.c.WriteTimeout,
)
return err return err
} }
@@ -144,7 +136,7 @@ func (cm *clientMedia) stop() {
func (cm *clientMedia) findFormatWithSSRC(ssrc uint32) *clientFormat { func (cm *clientMedia) findFormatWithSSRC(ssrc uint32) *clientFormat {
for _, format := range cm.formats { for _, format := range cm.formats {
tssrc, ok := format.udpRTCPReceiver.LastSSRC() tssrc, ok := format.rtcpReceiver.LastSSRC()
if ok && tssrc == ssrc { if ok && tssrc == ssrc {
return format return format
} }
@@ -183,7 +175,7 @@ func (cm *clientMedia) writePacketRTCP(byts []byte) {
} }
func (cm *clientMedia) readRTPTCPPlay(payload []byte) { func (cm *clientMedia) readRTPTCPPlay(payload []byte) {
now := time.Now() now := cm.c.timeNow()
atomic.StoreInt64(cm.c.tcpLastFrameTime, now.Unix()) atomic.StoreInt64(cm.c.tcpLastFrameTime, now.Unix())
pkt := &rtp.Packet{} pkt := &rtp.Packet{}
@@ -203,7 +195,7 @@ func (cm *clientMedia) readRTPTCPPlay(payload []byte) {
} }
func (cm *clientMedia) readRTCPTCPPlay(payload []byte) { func (cm *clientMedia) readRTCPTCPPlay(payload []byte) {
now := time.Now() now := cm.c.timeNow()
atomic.StoreInt64(cm.c.tcpLastFrameTime, now.Unix()) atomic.StoreInt64(cm.c.tcpLastFrameTime, now.Unix())
if len(payload) > udpMaxPayloadSize { if len(payload) > udpMaxPayloadSize {
@@ -219,6 +211,13 @@ func (cm *clientMedia) readRTCPTCPPlay(payload []byte) {
} }
for _, pkt := range packets { for _, pkt := range packets {
if sr, ok := pkt.(*rtcp.SenderReport); ok {
format := cm.findFormatWithSSRC(sr.SSRC)
if format != nil {
format.rtcpReceiver.ProcessSenderReport(sr, now)
}
}
cm.onPacketRTCP(pkt) cm.onPacketRTCP(pkt)
} }
} }
@@ -271,7 +270,7 @@ func (cm *clientMedia) readRTPUDPPlay(payload []byte) {
} }
func (cm *clientMedia) readRTCPUDPPlay(payload []byte) { func (cm *clientMedia) readRTCPUDPPlay(payload []byte) {
now := time.Now() now := cm.c.timeNow()
plen := len(payload) plen := len(payload)
atomic.AddUint64(cm.c.BytesReceived, uint64(plen)) atomic.AddUint64(cm.c.BytesReceived, uint64(plen))
@@ -291,7 +290,7 @@ func (cm *clientMedia) readRTCPUDPPlay(payload []byte) {
if sr, ok := pkt.(*rtcp.SenderReport); ok { if sr, ok := pkt.(*rtcp.SenderReport); ok {
format := cm.findFormatWithSSRC(sr.SSRC) format := cm.findFormatWithSSRC(sr.SSRC)
if format != nil { if format != nil {
format.udpRTCPReceiver.ProcessSenderReport(sr, now) format.rtcpReceiver.ProcessSenderReport(sr, now)
} }
} }

View File

@@ -2138,12 +2138,12 @@ func TestClientPlayRTCPReport(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
// wait for the packet's SSRC to be saved // wait for the packet's SSRC to be saved
time.Sleep(500 * time.Millisecond) time.Sleep(200 * time.Millisecond)
sr := &rtcp.SenderReport{ sr := &rtcp.SenderReport{
SSRC: 753621, SSRC: 753621,
NTPTime: 0, NTPTime: ntpTimeGoToRTCP(time.Date(2017, 8, 12, 15, 30, 0, 0, time.UTC)),
RTPTime: 0, RTPTime: 54352,
PacketCount: 1, PacketCount: 1,
OctetCount: 4, OctetCount: 4,
} }
@@ -2167,7 +2167,7 @@ func TestClientPlayRTCPReport(t *testing.T) {
{ {
SSRC: rr.Reports[0].SSRC, SSRC: rr.Reports[0].SSRC,
LastSequenceNumber: 946, LastSequenceNumber: 946,
LastSenderReport: rr.Reports[0].LastSenderReport, LastSenderReport: 2641887232,
Delay: rr.Reports[0].Delay, Delay: rr.Reports[0].Delay,
}, },
}, },
@@ -2187,7 +2187,7 @@ func TestClientPlayRTCPReport(t *testing.T) {
}() }()
c := Client{ c := Client{
udpReceiverReportPeriod: 1 * time.Second, receiverReportPeriod: 500 * time.Millisecond,
} }
err = readAll(&c, "rtsp://localhost:8554/teststream", nil) err = readAll(&c, "rtsp://localhost:8554/teststream", nil)
@@ -3166,3 +3166,184 @@ func TestClientPlayDecodeErrors(t *testing.T) {
}) })
} }
} }
func TestClientPlayPacketNTP(t *testing.T) {
l, err := net.Listen("tcp", "localhost:8554")
require.NoError(t, err)
defer l.Close()
serverDone := make(chan struct{})
defer func() { <-serverDone }()
go func() {
defer close(serverDone)
nconn, err := l.Accept()
require.NoError(t, err)
defer nconn.Close()
conn := conn.NewConn(nconn)
req, err := conn.ReadRequest()
require.NoError(t, err)
require.Equal(t, base.Options, req.Method)
err = conn.WriteResponse(&base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
"Public": base.HeaderValue{strings.Join([]string{
string(base.Describe),
string(base.Setup),
string(base.Play),
}, ", ")},
},
})
require.NoError(t, err)
req, err = conn.ReadRequest()
require.NoError(t, err)
require.Equal(t, base.Describe, req.Method)
medias := media.Medias{testH264Media}
resetMediaControls(medias)
err = conn.WriteResponse(&base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
"Content-Type": base.HeaderValue{"application/sdp"},
"Content-Base": base.HeaderValue{"rtsp://localhost:8554/teststream/"},
},
Body: mustMarshalMedias(medias),
})
require.NoError(t, err)
req, err = conn.ReadRequest()
require.NoError(t, err)
require.Equal(t, base.Setup, req.Method)
var inTH headers.Transport
err = inTH.Unmarshal(req.Header["Transport"])
require.NoError(t, err)
l1, err := net.ListenPacket("udp", "localhost:27556")
require.NoError(t, err)
defer l1.Close()
l2, err := net.ListenPacket("udp", "localhost:27557")
require.NoError(t, err)
defer l2.Close()
err = conn.WriteResponse(&base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
"Transport": headers.Transport{
Protocol: headers.TransportProtocolUDP,
Delivery: func() *headers.TransportDelivery {
v := headers.TransportDeliveryUnicast
return &v
}(),
ServerPorts: &[2]int{27556, 27557},
ClientPorts: inTH.ClientPorts,
}.Marshal(),
},
})
require.NoError(t, err)
req, err = conn.ReadRequest()
require.NoError(t, err)
require.Equal(t, base.Play, req.Method)
err = conn.WriteResponse(&base.Response{
StatusCode: base.StatusOK,
})
require.NoError(t, err)
// skip firewall opening
buf := make([]byte, 2048)
_, _, err = l2.ReadFrom(buf)
require.NoError(t, err)
pkt := rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 946,
Timestamp: 54352,
SSRC: 753621,
},
Payload: []byte{1, 2, 3, 4},
}
byts, _ := pkt.Marshal()
_, err = l1.WriteTo(byts, &net.UDPAddr{
IP: net.ParseIP("127.0.0.1"),
Port: inTH.ClientPorts[0],
})
require.NoError(t, err)
// wait for the packet's SSRC to be saved
time.Sleep(100 * time.Millisecond)
sr := &rtcp.SenderReport{
SSRC: 753621,
NTPTime: ntpTimeGoToRTCP(time.Date(2017, 8, 12, 15, 30, 0, 0, time.UTC)),
RTPTime: 54352,
PacketCount: 1,
OctetCount: 4,
}
byts, _ = sr.Marshal()
_, err = l2.WriteTo(byts, &net.UDPAddr{
IP: net.ParseIP("127.0.0.1"),
Port: inTH.ClientPorts[1],
})
require.NoError(t, err)
time.Sleep(100 * time.Millisecond)
pkt = rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 947,
Timestamp: 54352 + 90000,
SSRC: 753621,
},
Payload: []byte{5, 6, 7, 8},
}
byts, _ = pkt.Marshal()
_, err = l1.WriteTo(byts, &net.UDPAddr{
IP: net.ParseIP("127.0.0.1"),
Port: inTH.ClientPorts[0],
})
require.NoError(t, err)
req, err = conn.ReadRequest()
require.NoError(t, err)
require.Equal(t, base.Teardown, req.Method)
err = conn.WriteResponse(&base.Response{
StatusCode: base.StatusOK,
})
require.NoError(t, err)
}()
c := Client{}
recv := make(chan struct{})
first := false
err = readAll(&c, "rtsp://localhost:8554/teststream",
func(medi *media.Media, forma format.Format, pkt *rtp.Packet) {
if !first {
first = true
} else {
ntp, ok := c.PacketNTP(medi, pkt)
require.Equal(t, true, ok)
require.Equal(t, time.Date(2017, 8, 12, 15, 30, 1, 0, time.UTC), ntp.UTC())
close(recv)
}
})
require.NoError(t, err)
defer c.Close()
<-recv
}

View File

@@ -5,6 +5,7 @@ import (
"crypto/tls" "crypto/tls"
"net" "net"
"strings" "strings"
"sync"
"testing" "testing"
"time" "time"
@@ -65,6 +66,11 @@ var testRTCPPacketMarshaled = func() []byte {
return byts return byts
}() }()
func ntpTimeGoToRTCP(v time.Time) uint64 {
s := uint64(v.UnixNano()) + 2208988800*1000000000
return (s/1000000000)<<32 | (s % 1000000000)
}
func record(c *Client, ur string, medias media.Medias, cb func(*media.Media, rtcp.Packet)) error { func record(c *Client, ur string, medias media.Medias, cb func(*media.Media, rtcp.Packet)) error {
u, err := url.Parse(ur) u, err := url.Parse(ur)
if err != nil { if err != nil {
@@ -1168,10 +1174,10 @@ func TestClientRecordRTCPReport(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, &rtcp.SenderReport{ require.Equal(t, &rtcp.SenderReport{
SSRC: 0x38F27A2F, SSRC: 0x38F27A2F,
NTPTime: packets[0].(*rtcp.SenderReport).NTPTime, NTPTime: ntpTimeGoToRTCP(time.Date(1996, 2, 13, 14, 33, 5, 0, time.UTC)),
RTPTime: packets[0].(*rtcp.SenderReport).RTPTime, RTPTime: 1300000 + 60*90000,
PacketCount: 2, PacketCount: 1,
OctetCount: 2, OctetCount: 1,
}, packets[0]) }, packets[0])
close(reportReceived) close(reportReceived)
@@ -1186,6 +1192,9 @@ func TestClientRecordRTCPReport(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
}() }()
var curTime time.Time
var curTimeMutex sync.Mutex
c := Client{ c := Client{
Transport: func() *Transport { Transport: func() *Transport {
if ca == "udp" { if ca == "udp" {
@@ -1195,7 +1204,12 @@ func TestClientRecordRTCPReport(t *testing.T) {
v := TransportTCP v := TransportTCP
return &v return &v
}(), }(),
senderReportPeriod: 500 * time.Millisecond, timeNow: func() time.Time {
curTimeMutex.Lock()
defer curTimeMutex.Unlock()
return curTime
},
senderReportPeriod: 100 * time.Millisecond,
} }
medi := testH264Media medi := testH264Media
@@ -1205,17 +1219,27 @@ func TestClientRecordRTCPReport(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
defer c.Close() defer c.Close()
for i := 0; i < 2; i++ { curTimeMutex.Lock()
err = c.WritePacketRTP(medi, &rtp.Packet{ curTime = time.Date(2013, 6, 10, 1, 0, 0, 0, time.UTC)
curTimeMutex.Unlock()
err = c.WritePacketRTPWithNTP(
medi,
&rtp.Packet{
Header: rtp.Header{ Header: rtp.Header{
Version: 2, Version: 2,
PayloadType: 96, PayloadType: 96,
SSRC: 0x38F27A2F, SSRC: 0x38F27A2F,
Timestamp: 1300000,
}, },
Payload: []byte{0x05}, // IDR Payload: []byte{0x05}, // IDR
}) },
require.NoError(t, err) time.Date(1996, 2, 13, 14, 32, 5, 0, time.UTC))
} require.NoError(t, err)
curTimeMutex.Lock()
curTime = time.Date(2013, 6, 10, 1, 1, 0, 0, time.UTC)
curTimeMutex.Unlock()
<-reportReceived <-reportReceived
}) })

View File

@@ -25,9 +25,8 @@ func randInRange(max int) (int, error) {
} }
type clientUDPListener struct { type clientUDPListener struct {
anyPortEnable bool c *Client
writeTimeout time.Duration pc net.PacketConn
pc net.PacketConn
readFunc readFunc readFunc readFunc
readIP net.IP readIP net.IP
@@ -40,11 +39,7 @@ type clientUDPListener struct {
done chan struct{} done chan struct{}
} }
func newClientUDPListenerPair( func newClientUDPListenerPair(c *Client) (*clientUDPListener, *clientUDPListener, error) {
listenPacket func(network, address string) (net.PacketConn, error),
anyPortEnable bool,
writeTimeout time.Duration,
) (*clientUDPListener, *clientUDPListener, error) {
// choose two consecutive ports in range 65535-10000 // choose two consecutive ports in range 65535-10000
// RTP port must be even and RTCP port odd // RTP port must be even and RTCP port odd
for { for {
@@ -55,9 +50,7 @@ func newClientUDPListenerPair(
rtpPort := v*2 + 10000 rtpPort := v*2 + 10000
rtpListener, err := newClientUDPListener( rtpListener, err := newClientUDPListener(
listenPacket, c,
anyPortEnable,
writeTimeout,
false, false,
net.JoinHostPort("", strconv.FormatInt(int64(rtpPort), 10)), net.JoinHostPort("", strconv.FormatInt(int64(rtpPort), 10)),
) )
@@ -67,9 +60,7 @@ func newClientUDPListenerPair(
rtcpPort := rtpPort + 1 rtcpPort := rtpPort + 1
rtcpListener, err := newClientUDPListener( rtcpListener, err := newClientUDPListener(
listenPacket, c,
anyPortEnable,
writeTimeout,
false, false,
net.JoinHostPort("", strconv.FormatInt(int64(rtcpPort), 10)), net.JoinHostPort("", strconv.FormatInt(int64(rtcpPort), 10)),
) )
@@ -83,9 +74,7 @@ func newClientUDPListenerPair(
} }
func newClientUDPListener( func newClientUDPListener(
listenPacket func(network, address string) (net.PacketConn, error), c *Client,
anyPortEnable bool,
writeTimeout time.Duration,
multicast bool, multicast bool,
address string, address string,
) (*clientUDPListener, error) { ) (*clientUDPListener, error) {
@@ -96,7 +85,7 @@ func newClientUDPListener(
return nil, err return nil, err
} }
tmp, err := listenPacket(restrictNetwork("udp", "224.0.0.0:"+port)) tmp, err := c.ListenPacket(restrictNetwork("udp", "224.0.0.0:"+port))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -115,7 +104,7 @@ func newClientUDPListener(
pc = tmp.(*net.UDPConn) pc = tmp.(*net.UDPConn)
} else { } else {
tmp, err := listenPacket(restrictNetwork("udp", address)) tmp, err := c.ListenPacket(restrictNetwork("udp", address))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -128,8 +117,7 @@ func newClientUDPListener(
} }
return &clientUDPListener{ return &clientUDPListener{
anyPortEnable: anyPortEnable, c: c,
writeTimeout: writeTimeout,
pc: pc, pc: pc,
lastPacketTime: int64Ptr(0), lastPacketTime: int64Ptr(0),
}, nil }, nil
@@ -176,13 +164,13 @@ func (u *clientUDPListener) run() {
// in case of anyPortEnable, store the port of the first packet we receive. // in case of anyPortEnable, store the port of the first packet we receive.
// this reduces security issues // this reduces security issues
if u.anyPortEnable && u.readPort == 0 { if u.c.AnyPortEnable && u.readPort == 0 {
u.readPort = uaddr.Port u.readPort = uaddr.Port
} else if u.readPort != uaddr.Port { } else if u.readPort != uaddr.Port {
continue continue
} }
now := time.Now() now := u.c.timeNow()
atomic.StoreInt64(u.lastPacketTime, now.Unix()) atomic.StoreInt64(u.lastPacketTime, now.Unix())
u.readFunc(buf[:n]) u.readFunc(buf[:n])
@@ -192,7 +180,7 @@ func (u *clientUDPListener) run() {
func (u *clientUDPListener) write(payload []byte) error { func (u *clientUDPListener) write(payload []byte) error {
// no mutex is needed here since Write() has an internal lock. // no mutex is needed here since Write() has an internal lock.
// https://github.com/golang/go/issues/27203#issuecomment-534386117 // https://github.com/golang/go/issues/27203#issuecomment-534386117
u.pc.SetWriteDeadline(time.Now().Add(u.writeTimeout)) u.pc.SetWriteDeadline(time.Now().Add(u.c.WriteTimeout))
_, err := u.pc.WriteTo(payload, u.writeAddr) _, err := u.pc.WriteTo(payload, u.writeAddr)
return err return err
} }

View File

@@ -10,6 +10,13 @@ import (
"github.com/pion/rtp" "github.com/pion/rtp"
) )
// seconds since 1st January 1900
// higher 32 bits are the integer part, lower 32 bits are the fractional part
func ntpTimeRTCPToGo(v uint64) time.Time {
nano := int64((v>>32)*1000000000+(v&0xFFFFFFFF)) - 2208988800*1000000000
return time.Unix(0, nano)
}
func randUint32() (uint32, error) { func randUint32() (uint32, error) {
var b [4]byte var b [4]byte
_, err := rand.Read(b[:]) _, err := rand.Read(b[:])
@@ -19,13 +26,12 @@ func randUint32() (uint32, error) {
return uint32(b[0])<<24 | uint32(b[1])<<16 | uint32(b[2])<<8 | uint32(b[3]), nil return uint32(b[0])<<24 | uint32(b[1])<<16 | uint32(b[2])<<8 | uint32(b[3]), nil
} }
var timeNow = time.Now
// RTCPReceiver is a utility to generate RTCP receiver reports. // RTCPReceiver is a utility to generate RTCP receiver reports.
type RTCPReceiver struct { type RTCPReceiver struct {
period time.Duration
receiverSSRC uint32
clockRate float64 clockRate float64
receiverSSRC uint32
period time.Duration
timeNow func() time.Time
writePacketRTCP func(rtcp.Packet) writePacketRTCP func(rtcp.Packet)
mutex sync.Mutex mutex sync.Mutex
@@ -36,16 +42,17 @@ type RTCPReceiver struct {
lastSSRC uint32 lastSSRC uint32
lastSequenceNumber uint16 lastSequenceNumber uint16
lastTimeRTP uint32 lastTimeRTP uint32
lastTimeNTP time.Time lastTimeSystem time.Time
totalLost uint32 totalLost uint32
totalLostSinceReport uint32 totalLostSinceReport uint32
totalSinceReport uint32 totalSinceReport uint32
jitter float64 jitter float64
// data from RTCP packets // data from RTCP packets
firstSenderReportReceived bool firstSenderReportReceived bool
lastSenderReportNTP uint32 lastSenderReportTimeNTP uint64
lastSenderReportTime time.Time lastSenderReportTimeRTP uint32
lastSenderReportTimeSystem time.Time
terminate chan struct{} terminate chan struct{}
done chan struct{} done chan struct{}
@@ -53,9 +60,10 @@ type RTCPReceiver struct {
// New allocates a RTCPReceiver. // New allocates a RTCPReceiver.
func New( func New(
period time.Duration,
receiverSSRC *uint32,
clockRate int, clockRate int,
receiverSSRC *uint32,
period time.Duration,
timeNow func() time.Time,
writePacketRTCP func(rtcp.Packet), writePacketRTCP func(rtcp.Packet),
) (*RTCPReceiver, error) { ) (*RTCPReceiver, error) {
if receiverSSRC == nil { if receiverSSRC == nil {
@@ -66,10 +74,15 @@ func New(
receiverSSRC = &v receiverSSRC = &v
} }
if timeNow == nil {
timeNow = time.Now
}
rr := &RTCPReceiver{ rr := &RTCPReceiver{
period: period,
receiverSSRC: *receiverSSRC,
clockRate: float64(clockRate), clockRate: float64(clockRate),
receiverSSRC: *receiverSSRC,
period: period,
timeNow: timeNow,
writePacketRTCP: writePacketRTCP, writePacketRTCP: writePacketRTCP,
terminate: make(chan struct{}), terminate: make(chan struct{}),
done: make(chan struct{}), done: make(chan struct{}),
@@ -95,7 +108,7 @@ func (rr *RTCPReceiver) run() {
for { for {
select { select {
case <-t.C: case <-t.C:
report := rr.report(timeNow()) report := rr.report()
if report != nil { if report != nil {
rr.writePacketRTCP(report) rr.writePacketRTCP(report)
} }
@@ -106,14 +119,16 @@ func (rr *RTCPReceiver) run() {
} }
} }
func (rr *RTCPReceiver) report(ts time.Time) rtcp.Packet { func (rr *RTCPReceiver) report() rtcp.Packet {
rr.mutex.Lock() rr.mutex.Lock()
defer rr.mutex.Unlock() defer rr.mutex.Unlock()
if !rr.firstRTPPacketReceived || rr.clockRate == 0 { if !rr.firstRTPPacketReceived {
return nil return nil
} }
system := rr.timeNow()
report := &rtcp.ReceiverReport{ report := &rtcp.ReceiverReport{
SSRC: rr.receiverSSRC, SSRC: rr.receiverSSRC,
Reports: []rtcp.ReceptionReport{ Reports: []rtcp.ReceptionReport{
@@ -131,12 +146,12 @@ func (rr *RTCPReceiver) report(ts time.Time) rtcp.Packet {
if rr.firstSenderReportReceived { if rr.firstSenderReportReceived {
// middle 32 bits out of 64 in the NTP timestamp of last sender report // middle 32 bits out of 64 in the NTP timestamp of last sender report
report.Reports[0].LastSenderReport = rr.lastSenderReportNTP report.Reports[0].LastSenderReport = uint32(rr.lastSenderReportTimeNTP >> 16)
// delay, expressed in units of 1/65536 seconds, between // delay, expressed in units of 1/65536 seconds, between
// receiving the last SR packet from source SSRC_n and sending this // receiving the last SR packet from source SSRC_n and sending this
// reception report block // reception report block
report.Reports[0].Delay = uint32(ts.Sub(rr.lastSenderReportTime).Seconds() * 65536) report.Reports[0].Delay = uint32(system.Sub(rr.lastSenderReportTimeSystem).Seconds() * 65536)
} }
rr.totalLostSinceReport = 0 rr.totalLostSinceReport = 0
@@ -146,7 +161,7 @@ func (rr *RTCPReceiver) report(ts time.Time) rtcp.Packet {
} }
// ProcessPacket extracts the needed data from RTP packets. // ProcessPacket extracts the needed data from RTP packets.
func (rr *RTCPReceiver) ProcessPacket(pkt *rtp.Packet, ntp time.Time, ptsEqualsDTS bool) { func (rr *RTCPReceiver) ProcessPacket(pkt *rtp.Packet, system time.Time, ptsEqualsDTS bool) {
rr.mutex.Lock() rr.mutex.Lock()
defer rr.mutex.Unlock() defer rr.mutex.Unlock()
@@ -160,7 +175,7 @@ func (rr *RTCPReceiver) ProcessPacket(pkt *rtp.Packet, ntp time.Time, ptsEqualsD
if ptsEqualsDTS { if ptsEqualsDTS {
rr.timeInitialized = true rr.timeInitialized = true
rr.lastTimeRTP = pkt.Timestamp rr.lastTimeRTP = pkt.Timestamp
rr.lastTimeNTP = ntp rr.lastTimeSystem = system
} }
// subsequent packets // subsequent packets
@@ -194,7 +209,7 @@ func (rr *RTCPReceiver) ProcessPacket(pkt *rtp.Packet, ntp time.Time, ptsEqualsD
if rr.timeInitialized { if rr.timeInitialized {
// update jitter // update jitter
// https://tools.ietf.org/html/rfc3550#page-39 // https://tools.ietf.org/html/rfc3550#page-39
D := ntp.Sub(rr.lastTimeNTP).Seconds()*rr.clockRate - D := system.Sub(rr.lastTimeSystem).Seconds()*rr.clockRate -
(float64(pkt.Timestamp) - float64(rr.lastTimeRTP)) (float64(pkt.Timestamp) - float64(rr.lastTimeRTP))
if D < 0 { if D < 0 {
D = -D D = -D
@@ -204,20 +219,21 @@ func (rr *RTCPReceiver) ProcessPacket(pkt *rtp.Packet, ntp time.Time, ptsEqualsD
rr.timeInitialized = true rr.timeInitialized = true
rr.lastTimeRTP = pkt.Timestamp rr.lastTimeRTP = pkt.Timestamp
rr.lastTimeNTP = ntp rr.lastTimeSystem = system
rr.lastSSRC = pkt.SSRC rr.lastSSRC = pkt.SSRC
} }
} }
} }
// ProcessSenderReport extracts the needed data from RTCP sender reports. // ProcessSenderReport extracts the needed data from RTCP sender reports.
func (rr *RTCPReceiver) ProcessSenderReport(sr *rtcp.SenderReport, ts time.Time) { func (rr *RTCPReceiver) ProcessSenderReport(sr *rtcp.SenderReport, system time.Time) {
rr.mutex.Lock() rr.mutex.Lock()
defer rr.mutex.Unlock() defer rr.mutex.Unlock()
rr.firstSenderReportReceived = true rr.firstSenderReportReceived = true
rr.lastSenderReportNTP = uint32(sr.NTPTime >> 16) rr.lastSenderReportTimeNTP = sr.NTPTime
rr.lastSenderReportTime = ts rr.lastSenderReportTimeRTP = sr.RTPTime
rr.lastSenderReportTimeSystem = system
} }
// LastSSRC returns the SSRC of the last RTP packet. // LastSSRC returns the SSRC of the last RTP packet.
@@ -226,3 +242,18 @@ func (rr *RTCPReceiver) LastSSRC() (uint32, bool) {
defer rr.mutex.Unlock() defer rr.mutex.Unlock()
return rr.lastSSRC, rr.firstRTPPacketReceived return rr.lastSSRC, rr.firstRTPPacketReceived
} }
// PacketNTP returns the NTP timestamp of the packet.
func (rr *RTCPReceiver) PacketNTP(ts uint32) (time.Time, bool) {
rr.mutex.Lock()
defer rr.mutex.Unlock()
if !rr.firstSenderReportReceived {
return time.Time{}, false
}
timeDiff := int32(ts - rr.lastSenderReportTimeRTP)
timeDiffGo := (time.Duration(timeDiff) * time.Second) / time.Duration(rr.clockRate)
return ntpTimeRTCPToGo(rr.lastSenderReportTimeNTP).Add(timeDiffGo), true
}

View File

@@ -9,14 +9,20 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func TestRTCPReceiverBase(t *testing.T) { func uint32Ptr(v uint32) *uint32 {
timeNow = func() time.Time { return &v
return time.Date(2008, 0o5, 20, 22, 15, 22, 0, time.UTC) }
}
done := make(chan struct{})
v := uint32(0x65f83afb)
rr, err := New(500*time.Millisecond, &v, 90000, func TestRTCPReceiverBase(t *testing.T) {
done := make(chan struct{})
rr, err := New(
90000,
uint32Ptr(0x65f83afb),
500*time.Millisecond,
func() time.Time {
return time.Date(2008, 0o5, 20, 22, 15, 22, 0, time.UTC)
},
func(pkt rtcp.Packet) { func(pkt rtcp.Packet) {
require.Equal(t, &rtcp.ReceiverReport{ require.Equal(t, &rtcp.ReceiverReport{
SSRC: 0x65f83afb, SSRC: 0x65f83afb,
@@ -77,25 +83,28 @@ func TestRTCPReceiverBase(t *testing.T) {
func TestRTCPReceiverOverflow(t *testing.T) { func TestRTCPReceiverOverflow(t *testing.T) {
done := make(chan struct{}) done := make(chan struct{})
timeNow = func() time.Time {
return time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC)
}
v := uint32(0x65f83afb)
rr, err := New(250*time.Millisecond, &v, 90000, func(pkt rtcp.Packet) { rr, err := New(
require.Equal(t, &rtcp.ReceiverReport{ 90000,
SSRC: 0x65f83afb, uint32Ptr(0x65f83afb),
Reports: []rtcp.ReceptionReport{ 250*time.Millisecond,
{ func() time.Time {
SSRC: 0xba9da416, return time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC)
LastSequenceNumber: 1 << 16, },
LastSenderReport: 0x887a17ce, func(pkt rtcp.Packet) {
Delay: 1 * 65536, require.Equal(t, &rtcp.ReceiverReport{
SSRC: 0x65f83afb,
Reports: []rtcp.ReceptionReport{
{
SSRC: 0xba9da416,
LastSequenceNumber: 1 << 16,
LastSenderReport: 0x887a17ce,
Delay: 1 * 65536,
},
}, },
}, }, pkt)
}, pkt) close(done)
close(done) })
})
require.NoError(t, err) require.NoError(t, err)
defer rr.Close() defer rr.Close()
@@ -144,30 +153,33 @@ func TestRTCPReceiverOverflow(t *testing.T) {
func TestRTCPReceiverPacketLost(t *testing.T) { func TestRTCPReceiverPacketLost(t *testing.T) {
done := make(chan struct{}) done := make(chan struct{})
timeNow = func() time.Time {
return time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC)
}
v := uint32(0x65f83afb)
rr, err := New(500*time.Millisecond, &v, 90000, func(pkt rtcp.Packet) { rr, err := New(
require.Equal(t, &rtcp.ReceiverReport{ 90000,
SSRC: 0x65f83afb, uint32Ptr(0x65f83afb),
Reports: []rtcp.ReceptionReport{ 500*time.Millisecond,
{ func() time.Time {
SSRC: 0xba9da416, return time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC)
LastSequenceNumber: 0x0122, },
LastSenderReport: 0x887a17ce, func(pkt rtcp.Packet) {
FractionLost: func() uint8 { require.Equal(t, &rtcp.ReceiverReport{
v := float64(1) / 3 SSRC: 0x65f83afb,
return uint8(v * 256) Reports: []rtcp.ReceptionReport{
}(), {
TotalLost: 1, SSRC: 0xba9da416,
Delay: 1 * 65536, LastSequenceNumber: 0x0122,
LastSenderReport: 0x887a17ce,
FractionLost: func() uint8 {
v := float64(1) / 3
return uint8(v * 256)
}(),
TotalLost: 1,
Delay: 1 * 65536,
},
}, },
}, }, pkt)
}, pkt) close(done)
close(done) })
})
require.NoError(t, err) require.NoError(t, err)
defer rr.Close() defer rr.Close()
@@ -214,30 +226,33 @@ func TestRTCPReceiverPacketLost(t *testing.T) {
func TestRTCPReceiverOverflowPacketLost(t *testing.T) { func TestRTCPReceiverOverflowPacketLost(t *testing.T) {
done := make(chan struct{}) done := make(chan struct{})
timeNow = func() time.Time {
return time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC)
}
v := uint32(0x65f83afb)
rr, err := New(500*time.Millisecond, &v, 90000, func(pkt rtcp.Packet) { rr, err := New(
require.Equal(t, &rtcp.ReceiverReport{ 90000,
SSRC: 0x65f83afb, uint32Ptr(0x65f83afb),
Reports: []rtcp.ReceptionReport{ 500*time.Millisecond,
{ func() time.Time {
SSRC: 0xba9da416, return time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC)
LastSequenceNumber: 1<<16 | 0x0002, },
LastSenderReport: 0x887a17ce, func(pkt rtcp.Packet) {
FractionLost: func() uint8 { require.Equal(t, &rtcp.ReceiverReport{
v := float64(2) / 4 SSRC: 0x65f83afb,
return uint8(v * 256) Reports: []rtcp.ReceptionReport{
}(), {
TotalLost: 2, SSRC: 0xba9da416,
Delay: 1 * 65536, LastSequenceNumber: 1<<16 | 0x0002,
LastSenderReport: 0x887a17ce,
FractionLost: func() uint8 {
v := float64(2) / 4
return uint8(v * 256)
}(),
TotalLost: 2,
Delay: 1 * 65536,
},
}, },
}, }, pkt)
}, pkt) close(done)
close(done) })
})
require.NoError(t, err) require.NoError(t, err)
defer rr.Close() defer rr.Close()
@@ -284,26 +299,29 @@ func TestRTCPReceiverOverflowPacketLost(t *testing.T) {
func TestRTCPReceiverJitter(t *testing.T) { func TestRTCPReceiverJitter(t *testing.T) {
done := make(chan struct{}) done := make(chan struct{})
timeNow = func() time.Time {
return time.Date(2008, 0o5, 20, 22, 15, 22, 0, time.UTC)
}
v := uint32(0x65f83afb)
rr, err := New(500*time.Millisecond, &v, 90000, func(pkt rtcp.Packet) { rr, err := New(
require.Equal(t, &rtcp.ReceiverReport{ 90000,
SSRC: 0x65f83afb, uint32Ptr(0x65f83afb),
Reports: []rtcp.ReceptionReport{ 500*time.Millisecond,
{ func() time.Time {
SSRC: 0xba9da416, return time.Date(2008, 0o5, 20, 22, 15, 22, 0, time.UTC)
LastSequenceNumber: 948, },
LastSenderReport: 0x887a17ce, func(pkt rtcp.Packet) {
Delay: 2 * 65536, require.Equal(t, &rtcp.ReceiverReport{
Jitter: 45000 / 16, SSRC: 0x65f83afb,
Reports: []rtcp.ReceptionReport{
{
SSRC: 0xba9da416,
LastSequenceNumber: 948,
LastSenderReport: 0x887a17ce,
Delay: 2 * 65536,
Jitter: 45000 / 16,
},
}, },
}, }, pkt)
}, pkt) close(done)
close(done) })
})
require.NoError(t, err) require.NoError(t, err)
defer rr.Close() defer rr.Close()

View File

@@ -9,8 +9,6 @@ import (
"github.com/pion/rtp" "github.com/pion/rtp"
) )
var timeNow = time.Now
// seconds since 1st January 1900 // seconds since 1st January 1900
// higher 32 bits are the integer part, lower 32 bits are the fractional part // higher 32 bits are the integer part, lower 32 bits are the fractional part
func ntpTimeGoToRTCP(v time.Time) uint64 { func ntpTimeGoToRTCP(v time.Time) uint64 {
@@ -22,6 +20,7 @@ func ntpTimeGoToRTCP(v time.Time) uint64 {
type RTCPSender struct { type RTCPSender struct {
clockRate float64 clockRate float64
period time.Duration period time.Duration
timeNow func() time.Time
writePacketRTCP func(rtcp.Packet) writePacketRTCP func(rtcp.Packet)
mutex sync.Mutex mutex sync.Mutex
@@ -43,11 +42,17 @@ type RTCPSender struct {
func New( func New(
clockRate int, clockRate int,
period time.Duration, period time.Duration,
timeNow func() time.Time,
writePacketRTCP func(rtcp.Packet), writePacketRTCP func(rtcp.Packet),
) *RTCPSender { ) *RTCPSender {
if timeNow == nil {
timeNow = time.Now
}
rs := &RTCPSender{ rs := &RTCPSender{
clockRate: float64(clockRate), clockRate: float64(clockRate),
period: period, period: period,
timeNow: timeNow,
writePacketRTCP: writePacketRTCP, writePacketRTCP: writePacketRTCP,
terminate: make(chan struct{}), terminate: make(chan struct{}),
done: make(chan struct{}), done: make(chan struct{}),
@@ -92,7 +97,7 @@ func (rs *RTCPSender) report() rtcp.Packet {
return nil return nil
} }
systemTimeDiff := timeNow().Sub(rs.lastTimeSystem) systemTimeDiff := rs.timeNow().Sub(rs.lastTimeSystem)
ntpTime := rs.lastTimeNTP.Add(systemTimeDiff) ntpTime := rs.lastTimeNTP.Add(systemTimeDiff)
rtpTime := rs.lastTimeRTP + uint32(systemTimeDiff.Seconds()*rs.clockRate) rtpTime := rs.lastTimeRTP + uint32(systemTimeDiff.Seconds()*rs.clockRate)
@@ -114,7 +119,7 @@ func (rs *RTCPSender) ProcessPacket(pkt *rtp.Packet, ntp time.Time, ptsEqualsDTS
rs.initialized = true rs.initialized = true
rs.lastTimeRTP = pkt.Timestamp rs.lastTimeRTP = pkt.Timestamp
rs.lastTimeNTP = ntp rs.lastTimeNTP = ntp
rs.lastTimeSystem = timeNow() rs.lastTimeSystem = rs.timeNow()
} }
rs.lastSSRC = pkt.SSRC rs.lastSSRC = pkt.SSRC

View File

@@ -20,17 +20,16 @@ func TestRTCPSender(t *testing.T) {
curTime = v curTime = v
} }
timeNow = func() time.Time {
mutex.Lock()
defer mutex.Unlock()
return curTime
}
sent := make(chan struct{}) sent := make(chan struct{})
rs := New( rs := New(
90000, 90000,
100*time.Millisecond, 100*time.Millisecond,
func() time.Time {
mutex.Lock()
defer mutex.Unlock()
return curTime
},
func(pkt rtcp.Packet) { func(pkt rtcp.Packet) {
require.Equal(t, &rtcp.SenderReport{ require.Equal(t, &rtcp.SenderReport{
SSRC: 0xba9da416, SSRC: 0xba9da416,

View File

@@ -118,10 +118,11 @@ type Server struct {
// private // private
// //
udpReceiverReportPeriod time.Duration timeNow func() time.Time
senderReportPeriod time.Duration senderReportPeriod time.Duration
sessionTimeout time.Duration receiverReportPeriod time.Duration
checkStreamPeriod time.Duration sessionTimeout time.Duration
checkStreamPeriod time.Duration
ctx context.Context ctx context.Context
ctxCancel func() ctxCancel func()
@@ -176,12 +177,15 @@ func (s *Server) Start() error {
} }
// private // private
if s.udpReceiverReportPeriod == 0 { if s.timeNow == nil {
s.udpReceiverReportPeriod = 10 * time.Second s.timeNow = time.Now
} }
if s.senderReportPeriod == 0 { if s.senderReportPeriod == 0 {
s.senderReportPeriod = 10 * time.Second s.senderReportPeriod = 10 * time.Second
} }
if s.receiverReportPeriod == 0 {
s.receiverReportPeriod = 10 * time.Second
}
if s.sessionTimeout == 0 { if s.sessionTimeout == 0 {
s.sessionTimeout = 1 * 60 * time.Second s.sessionTimeout = 1 * 60 * time.Second
} }

View File

@@ -6,6 +6,7 @@ import (
"net" "net"
"strconv" "strconv"
"strings" "strings"
"sync"
"sync/atomic" "sync/atomic"
"testing" "testing"
"time" "time"
@@ -977,6 +978,9 @@ func TestServerPlayRTCPReport(t *testing.T) {
t.Run(ca, func(t *testing.T) { t.Run(ca, func(t *testing.T) {
var stream *ServerStream var stream *ServerStream
var curTime time.Time
var curTimeMutex sync.Mutex
s := &Server{ s := &Server{
Handler: &testServerHandler{ Handler: &testServerHandler{
onDescribe: func(ctx *ServerHandlerOnDescribeCtx) (*base.Response, *ServerStream, error) { onDescribe: func(ctx *ServerHandlerOnDescribeCtx) (*base.Response, *ServerStream, error) {
@@ -995,10 +999,15 @@ func TestServerPlayRTCPReport(t *testing.T) {
}, nil }, nil
}, },
}, },
senderReportPeriod: 1 * time.Second, RTSPAddress: "localhost:8554",
RTSPAddress: "localhost:8554", UDPRTPAddress: "127.0.0.1:8000",
UDPRTPAddress: "127.0.0.1:8000", UDPRTCPAddress: "127.0.0.1:8001",
UDPRTCPAddress: "127.0.0.1:8001", timeNow: func() time.Time {
curTimeMutex.Lock()
defer curTimeMutex.Unlock()
return curTime
},
senderReportPeriod: 100 * time.Millisecond,
} }
err := s.Start() err := s.Start()
@@ -1052,17 +1061,27 @@ func TestServerPlayRTCPReport(t *testing.T) {
doPlay(t, conn, "rtsp://localhost:8554/teststream", session) doPlay(t, conn, "rtsp://localhost:8554/teststream", session)
for i := 0; i < 2; i++ { curTimeMutex.Lock()
err := stream.WritePacketRTP(stream.Medias()[0], &rtp.Packet{ curTime = time.Date(2014, 6, 7, 15, 0, 0, 0, time.UTC)
curTimeMutex.Unlock()
err = stream.WritePacketRTPWithNTP(
stream.Medias()[0],
&rtp.Packet{
Header: rtp.Header{ Header: rtp.Header{
Version: 2, Version: 2,
PayloadType: 96, PayloadType: 96,
SSRC: 0x38F27A2F, SSRC: 0x38F27A2F,
Timestamp: 240000,
}, },
Payload: []byte{0x05}, // IDR Payload: []byte{0x05}, // IDR
}) },
require.NoError(t, err) time.Date(2017, 8, 10, 12, 22, 0, 0, time.UTC))
} require.NoError(t, err)
curTimeMutex.Lock()
curTime = time.Date(2014, 6, 7, 15, 0, 30, 0, time.UTC)
curTimeMutex.Unlock()
var buf []byte var buf []byte
@@ -1073,10 +1092,8 @@ func TestServerPlayRTCPReport(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
buf = buf[:n] buf = buf[:n]
} else { } else {
for i := 0; i < 2; i++ { _, err := conn.ReadInterleavedFrame()
_, err := conn.ReadInterleavedFrame() require.NoError(t, err)
require.NoError(t, err)
}
f, err := conn.ReadInterleavedFrame() f, err := conn.ReadInterleavedFrame()
require.NoError(t, err) require.NoError(t, err)
@@ -1088,10 +1105,10 @@ func TestServerPlayRTCPReport(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, &rtcp.SenderReport{ require.Equal(t, &rtcp.SenderReport{
SSRC: 0x38F27A2F, SSRC: 0x38F27A2F,
NTPTime: packets[0].(*rtcp.SenderReport).NTPTime, NTPTime: ntpTimeGoToRTCP(time.Date(2017, 8, 10, 12, 22, 30, 0, time.UTC)),
RTPTime: packets[0].(*rtcp.SenderReport).RTPTime, RTPTime: 240000 + 90000*30,
PacketCount: 2, PacketCount: 1,
OctetCount: 2, OctetCount: 1,
}, packets[0]) }, packets[0])
doTeardown(t, conn, "rtsp://localhost:8554/teststream", session) doTeardown(t, conn, "rtsp://localhost:8554/teststream", session)

View File

@@ -818,10 +818,10 @@ func TestServerRecordRTCPReport(t *testing.T) {
}, nil }, nil
}, },
}, },
udpReceiverReportPeriod: 1 * time.Second, UDPRTPAddress: "127.0.0.1:8000",
UDPRTPAddress: "127.0.0.1:8000", UDPRTCPAddress: "127.0.0.1:8001",
UDPRTCPAddress: "127.0.0.1:8001", RTSPAddress: "localhost:8554",
RTSPAddress: "localhost:8554", receiverReportPeriod: 500 * time.Millisecond,
} }
err := s.Start() err := s.Start()
@@ -874,7 +874,7 @@ func TestServerRecordRTCPReport(t *testing.T) {
Timestamp: 54352, Timestamp: 54352,
SSRC: 753621, SSRC: 753621,
}, },
Payload: []byte{0x01, 0x02, 0x03, 0x04}, Payload: []byte{1, 2, 3, 4},
}).Marshal() }).Marshal()
_, err = l1.WriteTo(byts, &net.UDPAddr{ _, err = l1.WriteTo(byts, &net.UDPAddr{
IP: net.ParseIP("127.0.0.1"), IP: net.ParseIP("127.0.0.1"),
@@ -883,11 +883,11 @@ func TestServerRecordRTCPReport(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
// wait for the packet's SSRC to be saved // wait for the packet's SSRC to be saved
time.Sleep(500 * time.Millisecond) time.Sleep(200 * time.Millisecond)
byts, _ = (&rtcp.SenderReport{ byts, _ = (&rtcp.SenderReport{
SSRC: 753621, SSRC: 753621,
NTPTime: 0xcbddcc34999997ff, NTPTime: ntpTimeGoToRTCP(time.Date(2018, 2, 20, 19, 0, 0, 0, time.UTC)),
RTPTime: 54352, RTPTime: 54352,
PacketCount: 1, PacketCount: 1,
OctetCount: 4, OctetCount: 4,
@@ -916,7 +916,7 @@ func TestServerRecordRTCPReport(t *testing.T) {
{ {
SSRC: rr.Reports[0].SSRC, SSRC: rr.Reports[0].SSRC,
LastSequenceNumber: 534, LastSequenceNumber: 534,
LastSenderReport: rr.Reports[0].LastSenderReport, LastSenderReport: 4004511744,
Delay: rr.Reports[0].Delay, Delay: rr.Reports[0].Delay,
}, },
}, },
@@ -1418,3 +1418,137 @@ func TestServerRecordDecodeErrors(t *testing.T) {
}) })
} }
} }
func TestServerRecordPacketNTP(t *testing.T) {
recv := make(chan struct{})
first := false
s := &Server{
Handler: &testServerHandler{
onAnnounce: func(ctx *ServerHandlerOnAnnounceCtx) (*base.Response, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, nil
},
onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, nil, nil
},
onRecord: func(ctx *ServerHandlerOnRecordCtx) (*base.Response, error) {
ctx.Session.OnPacketRTPAny(func(medi *media.Media, forma format.Format, pkt *rtp.Packet) {
if !first {
first = true
} else {
ntp, ok := ctx.Session.PacketNTP(medi, pkt)
require.Equal(t, true, ok)
require.Equal(t, time.Date(2018, 2, 20, 19, 0, 1, 0, time.UTC), ntp.UTC())
close(recv)
}
})
return &base.Response{
StatusCode: base.StatusOK,
}, nil
},
},
UDPRTPAddress: "127.0.0.1:8000",
UDPRTCPAddress: "127.0.0.1:8001",
RTSPAddress: "localhost:8554",
}
err := s.Start()
require.NoError(t, err)
defer s.Close()
nconn, err := net.Dial("tcp", "localhost:8554")
require.NoError(t, err)
defer nconn.Close()
conn := conn.NewConn(nconn)
medias := media.Medias{testH264Media}
resetMediaControls(medias)
doAnnounce(t, conn, "rtsp://localhost:8554/teststream", medias)
l1, err := net.ListenPacket("udp", "localhost:34556")
require.NoError(t, err)
defer l1.Close()
l2, err := net.ListenPacket("udp", "localhost:34557")
require.NoError(t, err)
defer l2.Close()
inTH := &headers.Transport{
Delivery: func() *headers.TransportDelivery {
v := headers.TransportDeliveryUnicast
return &v
}(),
Mode: func() *headers.TransportMode {
v := headers.TransportModeRecord
return &v
}(),
Protocol: headers.TransportProtocolUDP,
ClientPorts: &[2]int{34556, 34557},
}
res, th := doSetup(t, conn, "rtsp://localhost:8554/teststream/"+medias[0].Control, inTH, "")
session := readSession(t, res)
doRecord(t, conn, "rtsp://localhost:8554/teststream", session)
byts, _ := (&rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 534,
Timestamp: 54352,
SSRC: 753621,
},
Payload: []byte{1, 2, 3, 4},
}).Marshal()
_, err = l1.WriteTo(byts, &net.UDPAddr{
IP: net.ParseIP("127.0.0.1"),
Port: th.ServerPorts[0],
})
require.NoError(t, err)
// wait for the packet's SSRC to be saved
time.Sleep(100 * time.Millisecond)
byts, _ = (&rtcp.SenderReport{
SSRC: 753621,
NTPTime: ntpTimeGoToRTCP(time.Date(2018, 2, 20, 19, 0, 0, 0, time.UTC)),
RTPTime: 54352,
PacketCount: 1,
OctetCount: 4,
}).Marshal()
_, err = l2.WriteTo(byts, &net.UDPAddr{
IP: net.ParseIP("127.0.0.1"),
Port: th.ServerPorts[1],
})
require.NoError(t, err)
time.Sleep(100 * time.Millisecond)
byts, _ = (&rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 535,
Timestamp: 54352 + 90000,
SSRC: 753621,
},
Payload: []byte{1, 2, 3, 4},
}).Marshal()
_, err = l1.WriteTo(byts, &net.UDPAddr{
IP: net.ParseIP("127.0.0.1"),
Port: th.ServerPorts[0],
})
require.NoError(t, err)
<-recv
}

View File

@@ -181,7 +181,7 @@ func newServerSession(
bytesReceived: new(uint64), bytesReceived: new(uint64),
bytesSent: new(uint64), bytesSent: new(uint64),
conns: make(map[*ServerConn]struct{}), conns: make(map[*ServerConn]struct{}),
lastRequestTime: time.Now(), lastRequestTime: s.timeNow(),
udpCheckStreamTimer: emptyTimer(), udpCheckStreamTimer: emptyTimer(),
chHandleRequest: make(chan sessionRequestReq), chHandleRequest: make(chan sessionRequestReq),
chRemoveConn: make(chan *ServerConn), chRemoveConn: make(chan *ServerConn),
@@ -326,7 +326,7 @@ func (ss *ServerSession) runInner() error {
for { for {
select { select {
case req := <-ss.chHandleRequest: case req := <-ss.chHandleRequest:
ss.lastRequestTime = time.Now() ss.lastRequestTime = ss.s.timeNow()
if _, ok := ss.conns[req.sc]; !ok { if _, ok := ss.conns[req.sc]; !ok {
ss.conns[req.sc] = struct{}{} ss.conns[req.sc] = struct{}{}
@@ -402,7 +402,7 @@ func (ss *ServerSession) runInner() error {
} }
case <-ss.udpCheckStreamTimer.C: case <-ss.udpCheckStreamTimer.C:
now := time.Now() now := ss.s.timeNow()
lft := atomic.LoadInt64(ss.udpLastPacketTime) lft := atomic.LoadInt64(ss.udpLastPacketTime)
@@ -873,7 +873,7 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) (
ss.state = ServerSessionStatePlay ss.state = ServerSessionStatePlay
v := time.Now().Unix() v := ss.s.timeNow().Unix()
ss.udpLastPacketTime = &v ss.udpLastPacketTime = &v
for _, sm := range ss.setuppedMedias { for _, sm := range ss.setuppedMedias {
@@ -897,7 +897,7 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) (
} }
var ri headers.RTPInfo var ri headers.RTPInfo
now := time.Now() now := ss.s.timeNow()
for _, sm := range ss.setuppedMediasOrdered { for _, sm := range ss.setuppedMediasOrdered {
entry := ss.setuppedStream.rtpInfoEntry(sm.media, now) entry := ss.setuppedStream.rtpInfoEntry(sm.media, now)
@@ -965,7 +965,7 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) (
ss.state = ServerSessionStateRecord ss.state = ServerSessionStateRecord
v := time.Now().Unix() v := ss.s.timeNow().Unix()
ss.udpLastPacketTime = &v ss.udpLastPacketTime = &v
for _, sm := range ss.setuppedMedias { for _, sm := range ss.setuppedMedias {
@@ -1186,6 +1186,14 @@ func (ss *ServerSession) WritePacketRTCP(medi *media.Media, pkt rtcp.Packet) err
return nil return nil
} }
// PacketNTP returns the NTP timestamp of an incoming RTP packet.
// The NTP timestamp is computed from sender reports.
func (ss *ServerSession) PacketNTP(medi *media.Media, pkt *rtp.Packet) (time.Time, bool) {
sm := ss.setuppedMedias[medi]
sf := sm.formats[pkt.PayloadType]
return sf.rtcpReceiver.PacketNTP(pkt.Timestamp)
}
func (ss *ServerSession) handleRequest(req sessionRequestReq) (*base.Response, *ServerSession, error) { func (ss *ServerSession) handleRequest(req sessionRequestReq) (*base.Response, *ServerSession, error) {
select { select {
case ss.chHandleRequest <- req: case ss.chHandleRequest <- req:

View File

@@ -18,7 +18,7 @@ type serverSessionFormat struct {
format format.Format format format.Format
udpReorderer *rtpreorderer.Reorderer udpReorderer *rtpreorderer.Reorderer
tcpLossDetector *rtplossdetector.LossDetector tcpLossDetector *rtplossdetector.LossDetector
udpRTCPReceiver *rtcpreceiver.RTCPReceiver rtcpReceiver *rtcpreceiver.RTCPReceiver
onPacketRTP OnPacketRTPFunc onPacketRTP OnPacketRTPFunc
} }
@@ -34,27 +34,31 @@ func (sf *serverSessionFormat) start() {
if sf.sm.ss.state != ServerSessionStatePlay { if sf.sm.ss.state != ServerSessionStatePlay {
if *sf.sm.ss.setuppedTransport == TransportUDP || *sf.sm.ss.setuppedTransport == TransportUDPMulticast { if *sf.sm.ss.setuppedTransport == TransportUDP || *sf.sm.ss.setuppedTransport == TransportUDPMulticast {
sf.udpReorderer = rtpreorderer.New() sf.udpReorderer = rtpreorderer.New()
var err error
sf.udpRTCPReceiver, err = rtcpreceiver.New(
sf.sm.ss.s.udpReceiverReportPeriod,
nil,
sf.format.ClockRate(),
func(pkt rtcp.Packet) {
sf.sm.ss.WritePacketRTCP(sf.sm.media, pkt) //nolint:errcheck
})
if err != nil {
panic(err)
}
} else { } else {
sf.tcpLossDetector = rtplossdetector.New() sf.tcpLossDetector = rtplossdetector.New()
} }
var err error
sf.rtcpReceiver, err = rtcpreceiver.New(
sf.format.ClockRate(),
nil,
sf.sm.ss.s.receiverReportPeriod,
sf.sm.ss.s.timeNow,
func(pkt rtcp.Packet) {
if *sf.sm.ss.setuppedTransport == TransportUDP || *sf.sm.ss.setuppedTransport == TransportUDPMulticast {
sf.sm.ss.WritePacketRTCP(sf.sm.media, pkt) //nolint:errcheck
}
})
if err != nil {
panic(err)
}
} }
} }
func (sf *serverSessionFormat) stop() { func (sf *serverSessionFormat) stop() {
if sf.udpRTCPReceiver != nil { if sf.rtcpReceiver != nil {
sf.udpRTCPReceiver.Close() sf.rtcpReceiver.Close()
sf.udpRTCPReceiver = nil sf.rtcpReceiver = nil
} }
} }
@@ -73,7 +77,7 @@ func (sf *serverSessionFormat) readRTPUDP(pkt *rtp.Packet, now time.Time) {
} }
for _, pkt := range packets { for _, pkt := range packets {
sf.udpRTCPReceiver.ProcessPacket(pkt, now, sf.format.PTSEqualsDTS(pkt)) sf.rtcpReceiver.ProcessPacket(pkt, now, sf.format.PTSEqualsDTS(pkt))
sf.onPacketRTP(pkt) sf.onPacketRTP(pkt)
} }
} }
@@ -92,5 +96,7 @@ func (sf *serverSessionFormat) readRTPTCP(pkt *rtp.Packet) {
// do not return // do not return
} }
now := sf.sm.ss.s.timeNow()
sf.rtcpReceiver.ProcessPacket(pkt, now, sf.format.PTSEqualsDTS(pkt))
sf.onPacketRTP(pkt) sf.onPacketRTP(pkt)
} }

View File

@@ -160,7 +160,7 @@ func (sm *serverSessionMedia) readRTCPUDPPlay(payload []byte) {
return return
} }
now := time.Now() now := sm.ss.s.timeNow()
atomic.StoreInt64(sm.ss.udpLastPacketTime, now.Unix()) atomic.StoreInt64(sm.ss.udpLastPacketTime, now.Unix())
for _, pkt := range packets { for _, pkt := range packets {
@@ -191,7 +191,7 @@ func (sm *serverSessionMedia) readRTPUDPRecord(payload []byte) {
return return
} }
now := time.Now() now := sm.ss.s.timeNow()
atomic.StoreInt64(sm.ss.udpLastPacketTime, now.Unix()) atomic.StoreInt64(sm.ss.udpLastPacketTime, now.Unix())
forma.readRTPUDP(pkt, now) forma.readRTPUDP(pkt, now)
@@ -213,19 +213,17 @@ func (sm *serverSessionMedia) readRTCPUDPRecord(payload []byte) {
return return
} }
now := time.Now() now := sm.ss.s.timeNow()
atomic.StoreInt64(sm.ss.udpLastPacketTime, now.Unix()) atomic.StoreInt64(sm.ss.udpLastPacketTime, now.Unix())
for _, pkt := range packets { for _, pkt := range packets {
if sr, ok := pkt.(*rtcp.SenderReport); ok { if sr, ok := pkt.(*rtcp.SenderReport); ok {
format := serverFindFormatWithSSRC(sm.formats, sr.SSRC) format := serverFindFormatWithSSRC(sm.formats, sr.SSRC)
if format != nil { if format != nil {
format.udpRTCPReceiver.ProcessSenderReport(sr, now) format.rtcpReceiver.ProcessSenderReport(sr, now)
} }
} }
}
for _, pkt := range packets {
sm.onPacketRTCP(pkt) sm.onPacketRTCP(pkt)
} }
} }
@@ -281,7 +279,16 @@ func (sm *serverSessionMedia) readRTCPTCPRecord(payload []byte) {
return return
} }
now := sm.ss.s.timeNow()
for _, pkt := range packets { for _, pkt := range packets {
if sr, ok := pkt.(*rtcp.SenderReport); ok {
format := serverFindFormatWithSSRC(sm.formats, sr.SSRC)
if format != nil {
format.rtcpReceiver.ProcessSenderReport(sr, now)
}
}
sm.onPacketRTCP(pkt) sm.onPacketRTCP(pkt)
} }
} }

View File

@@ -236,7 +236,7 @@ func (st *ServerStream) readerSetInactive(ss *ServerSession) {
// WritePacketRTP writes a RTP packet to all the readers of the stream. // WritePacketRTP writes a RTP packet to all the readers of the stream.
func (st *ServerStream) WritePacketRTP(medi *media.Media, pkt *rtp.Packet) error { func (st *ServerStream) WritePacketRTP(medi *media.Media, pkt *rtp.Packet) error {
return st.WritePacketRTPWithNTP(medi, pkt, time.Now()) return st.WritePacketRTPWithNTP(medi, pkt, st.s.timeNow())
} }
// WritePacketRTPWithNTP writes a RTP packet to all the readers of the stream. // WritePacketRTPWithNTP writes a RTP packet to all the readers of the stream.

View File

@@ -35,6 +35,7 @@ func newServerStreamMedia(st *ServerStream, medi *media.Media, trackID int) *ser
tr.rtcpSender = rtcpsender.New( tr.rtcpSender = rtcpsender.New(
forma.ClockRate(), forma.ClockRate(),
st.s.senderReportPeriod, st.s.senderReportPeriod,
st.s.timeNow,
func(pkt rtcp.Packet) { func(pkt rtcp.Packet) {
if !st.s.DisableRTCPSenderReports { if !st.s.DisableRTCPSenderReports {
st.WritePacketRTCP(cmedia, pkt) //nolint:errcheck st.WritePacketRTCP(cmedia, pkt) //nolint:errcheck

View File

@@ -15,7 +15,7 @@ func serverFindFormatWithSSRC(
ssrc uint32, ssrc uint32,
) *serverSessionFormat { ) *serverSessionFormat {
for _, format := range formats { for _, format := range formats {
tssrc, ok := format.udpRTCPReceiver.LastSSRC() tssrc, ok := format.rtcpReceiver.LastSSRC()
if ok && tssrc == ssrc { if ok && tssrc == ssrc {
return format return format
} }