Added logging throughput

Resolves #778
This commit is contained in:
Yutaka Takeda
2019-12-07 22:40:40 -08:00
committed by Sean DuBois
parent 1d42a11bde
commit 7d99edd21a
2 changed files with 81 additions and 7 deletions

View File

@@ -1,6 +1,9 @@
package main package main
import ( import (
"log"
"net"
"sync/atomic"
"time" "time"
"github.com/pion/logging" "github.com/pion/logging"
@@ -8,27 +11,96 @@ import (
"github.com/pion/webrtc/v2" "github.com/pion/webrtc/v2"
) )
/* VNet Configuration
+ - - - - - - - - - - - - - - - - - - - - - - - +
VNet
| +-------------------------------------------+ |
| wan:vnet.Router |
| +---------+----------------------+----------+ |
| |
| +---------+----------+ +---------+----------+ |
| offerVNet:vnet.Net | |answerVNet:vnet.Net |
| +---------+----------+ +---------+----------+ |
| |
+ - - - - - + - - - - - - - - - - -+- - - - - - +
| |
+---------+----------+ +---------+----------+
|offerPeerConnection | |answerPeerConnection|
+--------------------+ +--------------------+
*/
func main() { func main() {
var inboundBytes int32 // for offerPeerConnection
var outboundBytes int32 // for offerPeerConnection
// Create a root router
wan, err := vnet.NewRouter(&vnet.RouterConfig{ wan, err := vnet.NewRouter(&vnet.RouterConfig{
CIDR: "1.2.3.0/24", CIDR: "1.2.3.0/24",
LoggerFactory: logging.NewDefaultLoggerFactory(), LoggerFactory: logging.NewDefaultLoggerFactory(),
}) })
panicIfError(err) panicIfError(err)
offerVNet := vnet.NewNet(&vnet.NetConfig{}) // Add a filter that monitors the traffic on the router
wan.AddChunkFilter(func(c vnet.Chunk) bool {
netType := c.SourceAddr().Network()
if netType == "udp" {
dstAddr := c.DestinationAddr().String()
host, _, err2 := net.SplitHostPort(dstAddr)
panicIfError(err2)
if host == "1.2.3.4" {
// c.UserData() returns a []byte of UDP payload
atomic.AddInt32(&inboundBytes, int32(len(c.UserData())))
}
srcAddr := c.SourceAddr().String()
host, _, err2 = net.SplitHostPort(srcAddr)
panicIfError(err2)
if host == "1.2.3.4" {
// c.UserData() returns a []byte of UDP payload
atomic.AddInt32(&outboundBytes, int32(len(c.UserData())))
}
}
return true
})
// Log throughput every 3 seconds
go func() {
duration := 2 * time.Second
for {
time.Sleep(duration)
inBytes := atomic.SwapInt32(&inboundBytes, 0) // read & reset
outBytes := atomic.SwapInt32(&outboundBytes, 0) // read & reset
inboundThroughput := float64(inBytes) / duration.Seconds()
outboundThroughput := float64(outBytes) / duration.Seconds()
log.Printf("inbound throughput : %.01f [Byte/s]\n", inboundThroughput)
log.Printf("outbound throughput: %.01f [Byte/s]\n", outboundThroughput)
}
}()
// Create a network interface for offerer
offerVNet := vnet.NewNet(&vnet.NetConfig{
StaticIPs: []string{"1.2.3.4"},
})
// Add the network interface to the router
panicIfError(wan.AddNet(offerVNet)) panicIfError(wan.AddNet(offerVNet))
offerSettingEngine := webrtc.SettingEngine{} offerSettingEngine := webrtc.SettingEngine{}
offerSettingEngine.SetVNet(offerVNet) offerSettingEngine.SetVNet(offerVNet)
offerAPI := webrtc.NewAPI(webrtc.WithSettingEngine(offerSettingEngine)) offerAPI := webrtc.NewAPI(webrtc.WithSettingEngine(offerSettingEngine))
answerVNet := vnet.NewNet(&vnet.NetConfig{}) // Create a network interface for answerer
answerVNet := vnet.NewNet(&vnet.NetConfig{
StaticIPs: []string{"1.2.3.5"},
})
// Add the network interface to the router
panicIfError(wan.AddNet(answerVNet)) panicIfError(wan.AddNet(answerVNet))
answerSettingEngine := webrtc.SettingEngine{} answerSettingEngine := webrtc.SettingEngine{}
answerSettingEngine.SetVNet(answerVNet) answerSettingEngine.SetVNet(answerVNet)
answerAPI := webrtc.NewAPI(webrtc.WithSettingEngine(answerSettingEngine)) answerAPI := webrtc.NewAPI(webrtc.WithSettingEngine(answerSettingEngine))
// Start the virtual network by calling Start() on the root router
panicIfError(wan.Start()) panicIfError(wan.Start())
offerPeerConnection, err := offerAPI.NewPeerConnection(webrtc.Configuration{}) offerPeerConnection, err := offerAPI.NewPeerConnection(webrtc.Configuration{})
@@ -40,21 +112,23 @@ func main() {
offerDataChannel, err := offerPeerConnection.CreateDataChannel("label", nil) offerDataChannel, err := offerPeerConnection.CreateDataChannel("label", nil)
panicIfError(err) panicIfError(err)
msgSendLoop := func(dc *webrtc.DataChannel) { msgSendLoop := func(dc *webrtc.DataChannel, interval time.Duration) {
for { for {
time.Sleep(500 * time.Millisecond) time.Sleep(interval)
panicIfError(dc.SendText("My DataChannel Message")) panicIfError(dc.SendText("My DataChannel Message"))
} }
} }
offerDataChannel.OnOpen(func() { offerDataChannel.OnOpen(func() {
msgSendLoop(offerDataChannel) // Send test from offerer every 100 msec
msgSendLoop(offerDataChannel, 100*time.Millisecond)
}) })
answerPeerConnection.OnDataChannel(func(answerDataChannel *webrtc.DataChannel) { answerPeerConnection.OnDataChannel(func(answerDataChannel *webrtc.DataChannel) {
answerDataChannel.OnOpen(func() { answerDataChannel.OnOpen(func() {
msgSendLoop(answerDataChannel) // Send test from answerer every 200 msec
msgSendLoop(answerDataChannel, 200*time.Millisecond)
}) })
}) })

View File

@@ -1915,7 +1915,7 @@ func (pc *PeerConnection) startTransports(iceRole ICERole, dtlsRole DTLSRole, re
var openedDCCount uint32 var openedDCCount uint32
for _, d := range dataChannels { for _, d := range dataChannels {
if d.readyState == DataChannelStateConnecting { if d.ReadyState() == DataChannelStateConnecting {
err := d.open(pc.sctpTransport) err := d.open(pc.sctpTransport)
if err != nil { if err != nil {
pc.log.Warnf("failed to open data channel: %s", err) pc.log.Warnf("failed to open data channel: %s", err)