diff --git a/agent_stats.go b/agent_stats.go index 167114f..f795e85 100644 --- a/agent_stats.go +++ b/agent_stats.go @@ -15,17 +15,17 @@ func (a *Agent) GetCandidatePairsStats() []CandidatePairStats { result := make([]CandidatePairStats, 0, len(a.checklist)) for _, cp := range a.checklist { stat := CandidatePairStats{ - Timestamp: time.Now(), - LocalCandidateID: cp.Local.ID(), - RemoteCandidateID: cp.Remote.ID(), - State: cp.state, - Nominated: cp.nominated, - // PacketsSent uint32 - // PacketsReceived uint32 - // BytesSent uint64 - // BytesReceived uint64 - // LastPacketSentTimestamp time.Time - // LastPacketReceivedTimestamp time.Time + Timestamp: time.Now(), + LocalCandidateID: cp.Local.ID(), + RemoteCandidateID: cp.Remote.ID(), + State: cp.state, + Nominated: cp.nominated, + PacketsSent: cp.PacketsSent(), + PacketsReceived: cp.PacketsReceived(), + BytesSent: cp.BytesSent(), + BytesReceived: cp.BytesReceived(), + LastPacketSentTimestamp: cp.LastPacketSentAt(), + LastPacketReceivedTimestamp: cp.LastPacketReceivedAt(), FirstRequestTimestamp: cp.FirstRequestSentAt(), LastRequestTimestamp: cp.LastRequestSentAt(), FirstResponseTimestamp: cp.FirstResponseReceivedAt(), @@ -73,17 +73,17 @@ func (a *Agent) GetSelectedCandidatePairStats() (CandidatePairStats, bool) { isAvailable = true res = CandidatePairStats{ - Timestamp: time.Now(), - LocalCandidateID: sp.Local.ID(), - RemoteCandidateID: sp.Remote.ID(), - State: sp.state, - Nominated: sp.nominated, - // PacketsSent uint32 - // PacketsReceived uint32 - // BytesSent uint64 - // BytesReceived uint64 - // LastPacketSentTimestamp time.Time - // LastPacketReceivedTimestamp time.Time + Timestamp: time.Now(), + LocalCandidateID: sp.Local.ID(), + RemoteCandidateID: sp.Remote.ID(), + State: sp.state, + Nominated: sp.nominated, + PacketsSent: sp.PacketsSent(), + PacketsReceived: sp.PacketsReceived(), + BytesSent: sp.BytesSent(), + BytesReceived: sp.BytesReceived(), + LastPacketSentTimestamp: sp.LastPacketSentAt(), + LastPacketReceivedTimestamp: sp.LastPacketReceivedAt(), // FirstRequestTimestamp time.Time // LastRequestTimestamp time.Time // LastResponseTimestamp time.Time diff --git a/agent_test.go b/agent_test.go index 4297592..3a2f43e 100644 --- a/agent_test.go +++ b/agent_test.go @@ -649,6 +649,9 @@ func TestCandidatePairsStats(t *testing.T) { //nolint:cyclop,gocyclo p.UpdateRequestSent() p.UpdateResponseSent() p.UpdateRoundTripTime(time.Second) + + p.UpdatePacketSent(100) + p.UpdatePacketReceived(200) } p := agent.findPair(hostLocal, prflxRemote) @@ -684,10 +687,13 @@ func TestCandidatePairsStats(t *testing.T) { //nolint:cyclop,gocyclo require.False(t, cps.LastResponseTimestamp.IsZero()) require.False(t, cps.FirstRequestReceivedTimestamp.IsZero()) require.False(t, cps.LastRequestReceivedTimestamp.IsZero()) - require.NotZero(t, cps.RequestsReceived) - require.NotZero(t, cps.RequestsSent) - require.NotZero(t, cps.ResponsesSent) - require.NotZero(t, cps.ResponsesReceived) + + require.Equal(t, uint32(1), cps.PacketsSent) + require.Equal(t, uint32(1), cps.PacketsReceived) + require.Equal(t, uint64(100), cps.BytesSent) + require.Equal(t, uint64(200), cps.BytesReceived) + require.False(t, cps.LastPacketSentTimestamp.IsZero()) + require.False(t, cps.LastPacketReceivedTimestamp.IsZero()) } require.Equal(t, relayPairStat.RemoteCandidateID, relayRemote.ID()) @@ -738,17 +744,20 @@ func TestSelectedCandidatePairStats(t *testing.T) { //nolint:cyclop require.False(t, ok) // add pair and populate some RTT stats - p := agent.findPair(hostLocal, srflxRemote) - if p == nil { + candidatePair := agent.findPair(hostLocal, srflxRemote) + if candidatePair == nil { agent.addPair(hostLocal, srflxRemote) - p = agent.findPair(hostLocal, srflxRemote) + candidatePair = agent.findPair(hostLocal, srflxRemote) } for i := 0; i < 10; i++ { - p.UpdateRoundTripTime(time.Duration(i+1) * time.Second) + candidatePair.UpdateRoundTripTime(time.Duration(i+1) * time.Second) } + candidatePair.UpdatePacketSent(150) + candidatePair.UpdatePacketReceived(250) + // set the pair as selected - agent.setSelectedPair(p) + agent.setSelectedPair(candidatePair) stats, ok := agent.GetSelectedCandidatePairStats() require.True(t, ok) @@ -758,6 +767,13 @@ func TestSelectedCandidatePairStats(t *testing.T) { //nolint:cyclop require.Equal(t, float64(10), stats.CurrentRoundTripTime) require.Equal(t, float64(55), stats.TotalRoundTripTime) require.Equal(t, uint64(10), stats.ResponsesReceived) + + require.Equal(t, uint32(1), stats.PacketsSent) + require.Equal(t, uint32(1), stats.PacketsReceived) + require.Equal(t, uint64(150), stats.BytesSent) + require.Equal(t, uint64(250), stats.BytesReceived) + require.False(t, stats.LastPacketSentTimestamp.IsZero()) + require.False(t, stats.LastPacketReceivedTimestamp.IsZero()) } func TestLocalCandidateStats(t *testing.T) { //nolint:cyclop diff --git a/candidate_base.go b/candidate_base.go index 45c090e..0e38d4c 100644 --- a/candidate_base.go +++ b/candidate_base.go @@ -309,11 +309,19 @@ func (c *candidateBase) handleInboundPacket(buf []byte, srcAddr net.Addr) { } // Note: This will return packetio.ErrFull if the buffer ever manages to fill up. - if _, err := agent.buf.Write(buf); err != nil { + n, err := agent.buf.Write(buf) + if err != nil { agent.log.Warnf("Failed to write packet: %s", err) return } + + // Add received application bytes to the currently selected candidate pair. + if n > 0 { + if sp := agent.getSelectedPair(); sp != nil { + sp.UpdatePacketReceived(n) + } + } } // close stops the recvLoop. diff --git a/candidatepair.go b/candidatepair.go index 9fd5059..31b8bf1 100644 --- a/candidatepair.go +++ b/candidatepair.go @@ -34,6 +34,13 @@ type CandidatePair struct { currentRoundTripTime int64 // in ns totalRoundTripTime int64 // in ns + packetsSent uint32 + packetsReceived uint32 + bytesSent uint64 + bytesReceived uint64 + lastPacketSentAt atomic.Value // time.Time + lastPacketReceivedAt atomic.Value // time.Time + requestsReceived uint64 requestsSent uint64 responsesReceived uint64 @@ -180,6 +187,66 @@ func (p *CandidatePair) ResponsesSent() uint64 { return atomic.LoadUint64(&p.responsesSent) } +// PacketsSent returns total application (non-STUN) packets sent on this pair. +func (p *CandidatePair) PacketsSent() uint32 { + return atomic.LoadUint32(&p.packetsSent) +} + +// PacketsReceived returns total application (non-STUN) packets received on this pair. +func (p *CandidatePair) PacketsReceived() uint32 { + return atomic.LoadUint32(&p.packetsReceived) +} + +// BytesSent returns total application bytes sent on this pair. +func (p *CandidatePair) BytesSent() uint64 { + return atomic.LoadUint64(&p.bytesSent) +} + +// BytesReceived returns total application bytes received on this pair. +func (p *CandidatePair) BytesReceived() uint64 { + return atomic.LoadUint64(&p.bytesReceived) +} + +// LastPacketSentAt returns the timestamp of the last application packet sent. +func (p *CandidatePair) LastPacketSentAt() time.Time { + if v, ok := p.lastPacketSentAt.Load().(time.Time); ok { + return v + } + + return time.Time{} +} + +// LastPacketReceivedAt returns the timestamp of the last application packet received. +func (p *CandidatePair) LastPacketReceivedAt() time.Time { + if v, ok := p.lastPacketReceivedAt.Load().(time.Time); ok { + return v + } + + return time.Time{} +} + +// UpdatePacketSent increments packet/byte counters and updates timestamp for a sent application packet. +func (p *CandidatePair) UpdatePacketSent(n int) { + if n <= 0 { + return + } + + atomic.AddUint32(&p.packetsSent, 1) + atomic.AddUint64(&p.bytesSent, uint64(n)) // #nosec G115 -- n > 0 validated above + p.lastPacketSentAt.Store(time.Now()) +} + +// UpdatePacketReceived increments packet/byte counters and updates timestamp for a received application packet. +func (p *CandidatePair) UpdatePacketReceived(n int) { + if n <= 0 { + return + } + + atomic.AddUint32(&p.packetsReceived, 1) + atomic.AddUint64(&p.bytesReceived, uint64(n)) // #nosec G115 -- n > 0 validated above + p.lastPacketReceivedAt.Store(time.Now()) +} + // FirstRequestSentAt returns the timestamp of the first connectivity check sent. func (p *CandidatePair) FirstRequestSentAt() time.Time { if v, ok := p.firstRequestSentAt.Load().(time.Time); ok { diff --git a/transport.go b/transport.go index 81c2a0a..31e1286 100644 --- a/transport.go +++ b/transport.go @@ -103,9 +103,14 @@ func (c *Conn) Write(packet []byte) (int, error) { } } - c.bytesSent.Add(uint64(len(packet))) + // Write application data via the selected pair and update stats with actual bytes written. + n, err := pair.Write(packet) + if n > 0 { + c.bytesSent.Add(uint64(n)) + pair.UpdatePacketSent(n) + } - return pair.Write(packet) + return n, err } // Close implements the Conn Close method. It is used to close