Started using detached datachannels for performance boost

This commit is contained in:
Jacob Stewart
2023-08-06 16:01:47 -04:00
parent d367d3e91e
commit 066c7584ac
5 changed files with 66 additions and 56 deletions

42
conn.go
View File

@@ -1,7 +1,6 @@
package rtcnet package rtcnet
import ( import (
"fmt"
"net" "net"
"time" "time"
"sync" "sync"
@@ -9,11 +8,14 @@ import (
"errors" "errors"
"github.com/pion/webrtc/v3" "github.com/pion/webrtc/v3"
"github.com/pion/datachannel"
) )
type Conn struct { type Conn struct {
peerConn *webrtc.PeerConnection peerConn *webrtc.PeerConnection
dataChannel *webrtc.DataChannel dataChannel *webrtc.DataChannel
raw datachannel.ReadWriteCloser
readChan chan []byte readChan chan []byte
errorChan chan error errorChan chan error
@@ -23,18 +25,10 @@ type Conn struct {
func newConn(peer *webrtc.PeerConnection) *Conn { func newConn(peer *webrtc.PeerConnection) *Conn {
return &Conn{ return &Conn{
peerConn: peer, peerConn: peer,
readChan: make(chan []byte, 1024), //TODO! - Sizing
errorChan: make(chan error, 16), //TODO! - Sizing errorChan: make(chan error, 16), //TODO! - Sizing
} }
} }
// For pushing read data out of the datachannel and into the read buffer
func (c *Conn) pushReadData(dat []byte) {
if c.closed.Load() { return } // Skip if we are already closed
c.readChan <- dat
}
// For pushing error data out of the webrtc connection into the error buffer // For pushing error data out of the webrtc connection into the error buffer
func (c *Conn) pushErrorData(err error) { func (c *Conn) pushErrorData(err error) {
if c.closed.Load() { return } // Skip if we are already closed if c.closed.Load() { return } // Skip if we are already closed
@@ -45,30 +39,16 @@ func (c *Conn) pushErrorData(err error) {
func (c *Conn) Read(b []byte) (int, error) { func (c *Conn) Read(b []byte) (int, error) {
select { select {
case err, ok := <-c.errorChan: case err := <-c.errorChan:
if !ok {
return 0, net.ErrClosed
}
return 0, err // There was some error return 0, err // There was some error
default:
case dat, ok := <- c.readChan: // Just exit
if !ok {
return 0, net.ErrClosed
}
if len(dat) > len(b) {
return 0, fmt.Errorf("message too big") // TODO - Instead of failing, should we just return partial?
}
copy(b, dat)
return len(dat), nil
} }
return c.raw.Read(b)
} }
func (c *Conn) Write(b []byte) (int, error) { func (c *Conn) Write(b []byte) (int, error) {
err := c.dataChannel.Send(b) return c.raw.Write(b)
if err != nil {
return 0, err
}
return len(b), nil
} }
func (c *Conn) Close() error { func (c *Conn) Close() error {
@@ -79,12 +59,12 @@ func (c *Conn) Close() error {
err1 := c.dataChannel.Close() err1 := c.dataChannel.Close()
err2 := c.peerConn.Close() err2 := c.peerConn.Close()
err3 := c.raw.Close()
close(c.readChan)
close(c.errorChan) close(c.errorChan)
if err1 != nil || err2 != nil { if err1 != nil || err2 != nil || err3 != nil {
closeErr = errors.Join(errors.New("failed to close: (datachannel, peerconn)"), err1, err2) closeErr = errors.Join(errors.New("failed to close: (datachannel, peerconn, raw)"), err1, err2, err3)
logErr("conn: closing error: ", closeErr) logErr("conn: closing error: ", closeErr)
} }
}) })

View File

@@ -126,10 +126,15 @@ func TestConn(t *testing.T) {
compare(t, buf[i], dat[i]) compare(t, buf[i], dat[i])
} }
fmt.Printf(".")
successCount++ successCount++
} }
fmt.Println("Success: ", successCount) fmt.Println("Success: ", successCount)
err = conn.Close()
if err != nil {
fmt.Errorf("%v", err)
}
} }
fmt.Println("Done") fmt.Println("Done")

33
dial.go
View File

@@ -31,7 +31,9 @@ func Dial(address string, tlsConfig *tls.Config) (*Conn, error) {
trace("Dial: Starting WebRTC negotiation") trace("Dial: Starting WebRTC negotiation")
peerConnection, err := webrtc.NewPeerConnection(config) api := getSettingsEngineApi()
peerConnection, err := api.NewPeerConnection(config)
if err != nil { if err != nil {
logErr("Dial: NewPeerConnection", err) logErr("Dial: NewPeerConnection", err)
return nil, err return nil, err
@@ -170,19 +172,26 @@ func Dial(address string, tlsConfig *tls.Config) (*Conn, error) {
dataChannel.OnOpen(func() { dataChannel.OnOpen(func() {
printDataChannel(dataChannel) printDataChannel(dataChannel)
trace("Dial: Data Channel OnOpen") trace("Dial: Data Channel OnOpen")
conn.dataChannel = dataChannel
connFinish <- true conn.raw, err = dataChannel.Detach()
if err != nil {
conn.pushErrorData(err)
} else {
conn.dataChannel = dataChannel
connFinish <- true
}
}) })
// Register text message handling // Note: Stopped using this now that I have detached data channels
dataChannel.OnMessage(func(msg webrtc.DataChannelMessage) { // // Register text message handling
// log.Print("Client: Received Msg from DataChannel", len(msg.Data)) // dataChannel.OnMessage(func(msg webrtc.DataChannelMessage) {
if msg.IsString { // // log.Print("Client: Received Msg from DataChannel", len(msg.Data))
trace("Dial: DataChannel OnMessage: Received string message, skipping") // if msg.IsString {
return // trace("Dial: DataChannel OnMessage: Received string message, skipping")
} // return
conn.pushReadData(msg.Data) // }
}) // conn.pushReadData(msg.Data)
// })
// Create an offer to send to the other process // Create an offer to send to the other process
offer, err := peerConnection.CreateOffer(nil) offer, err := peerConnection.CreateOffer(nil)

View File

@@ -78,6 +78,8 @@ func (l *Listener) Addr() net.Addr {
func (l *Listener) attemptWebRtcNegotiation(wsConn net.Conn) { func (l *Listener) attemptWebRtcNegotiation(wsConn net.Conn) {
defer trace("finished attemptWebRtcNegotiation") defer trace("finished attemptWebRtcNegotiation")
api := getSettingsEngineApi()
var candidatesMux sync.Mutex var candidatesMux sync.Mutex
pendingCandidates := make([]*webrtc.ICECandidate, 0) pendingCandidates := make([]*webrtc.ICECandidate, 0)
config := webrtc.Configuration{ config := webrtc.Configuration{
@@ -88,7 +90,7 @@ func (l *Listener) attemptWebRtcNegotiation(wsConn net.Conn) {
}, },
} }
peerConnection, err := webrtc.NewPeerConnection(config) peerConnection, err := api.NewPeerConnection(config)
if err != nil { if err != nil {
l.pendingAcceptErrors <- err l.pendingAcceptErrors <- err
return return
@@ -149,7 +151,13 @@ func (l *Listener) attemptWebRtcNegotiation(wsConn net.Conn) {
d.OnOpen(func() { d.OnOpen(func() {
printDataChannel(d) printDataChannel(d)
wsConn.Close() wsConn.Close()
l.pendingAccepts <- conn
conn.raw, err = d.Detach()
if err != nil {
l.pendingAcceptErrors <- err
} else {
l.pendingAccepts <- conn
}
}) })
// // Register channel opening handling // // Register channel opening handling
@@ -157,15 +165,16 @@ func (l *Listener) attemptWebRtcNegotiation(wsConn net.Conn) {
// trace("Listener: Data channel was closed!") // trace("Listener: Data channel was closed!")
// }) // })
// Register text message handling // Note: Stopped using this now that I have detached data channels
d.OnMessage(func(msg webrtc.DataChannelMessage) { // // Register text message handling
// log.Print("Server: Received Msg from DataChannel", len(msg.Data)) // d.OnMessage(func(msg webrtc.DataChannelMessage) {
if msg.IsString { // // log.Print("Server: Received Msg from DataChannel", len(msg.Data))
trace("Listener: DataChannel OnMessage: Received string message, skipping") // if msg.IsString {
return // trace("Listener: DataChannel OnMessage: Received string message, skipping")
} // return
conn.pushReadData(msg.Data) // }
}) // conn.pushReadData(msg.Data)
// })
}) })
buf := make([]byte, 8 * 1024) // TODO: hardcoded to be big enough for the signalling messages buf := make([]byte, 8 * 1024) // TODO: hardcoded to be big enough for the signalling messages

View File

@@ -8,7 +8,6 @@ import (
// Notes: https://webrtcforthecurious.com/docs/01-what-why-and-how/ // Notes: https://webrtcforthecurious.com/docs/01-what-why-and-how/
// Notes: about reliability: https://stackoverflow.com/questions/54292824/webrtc-channel-reliability // Notes: about reliability: https://stackoverflow.com/questions/54292824/webrtc-channel-reliability
// TODO - Investigate Detaching the datachannel: https://github.com/pion/webrtc/tree/master/examples/data-channels-detach
// TODO - Let stun server list be injectable // TODO - Let stun server list be injectable
// TODO - Interesting note: if you run this in a docker container with the networking set to something other than "host" (ie the default is bridge), then what happens is the docker container gets NAT'ed behind the original host which causes you to need an ICE server. So You must run this code with HOST networking!!! // TODO - Interesting note: if you run this in a docker container with the networking set to something other than "host" (ie the default is bridge), then what happens is the docker container gets NAT'ed behind the original host which causes you to need an ICE server. So You must run this code with HOST networking!!!
@@ -17,6 +16,14 @@ import (
// - TODO - There might be some way to avoid this with: https://pkg.go.dev/github.com/pion/webrtc/v3#SettingEngine.SetNAT1To1IPs // - TODO - There might be some way to avoid this with: https://pkg.go.dev/github.com/pion/webrtc/v3#SettingEngine.SetNAT1To1IPs
// - TODO - also this: https://pkg.go.dev/github.com/pion/webrtc/v3#SettingEngine.SetICEUDPMux // - TODO - also this: https://pkg.go.dev/github.com/pion/webrtc/v3#SettingEngine.SetICEUDPMux
// Current settings engine settings
// Detaching the datachannel: https://github.com/pion/webrtc/tree/master/examples/data-channels-detach
func getSettingsEngineApi() *webrtc.API {
s := webrtc.SettingEngine{}
s.DetachDataChannels()
return webrtc.NewAPI(webrtc.WithSettingEngine(s))
}
// Internal messages used for webrtc negotiation/signalling // Internal messages used for webrtc negotiation/signalling
type signalMsg struct { type signalMsg struct {
SDP *sdpMsg SDP *sdpMsg