From 066c7584accc1809df976b1d0a21362ddad04c7b Mon Sep 17 00:00:00 2001 From: Jacob Stewart <2606873+unitoftime@users.noreply.github.com> Date: Sun, 6 Aug 2023 16:01:47 -0400 Subject: [PATCH] Started using detached datachannels for performance boost --- conn.go | 42 +++++++++++------------------------------- conn_test.go | 7 ++++++- dial.go | 33 +++++++++++++++++++++------------ listener.go | 31 ++++++++++++++++++++----------- rtcnet.go | 9 ++++++++- 5 files changed, 66 insertions(+), 56 deletions(-) diff --git a/conn.go b/conn.go index 37edd47..2fb7423 100644 --- a/conn.go +++ b/conn.go @@ -1,7 +1,6 @@ package rtcnet import ( - "fmt" "net" "time" "sync" @@ -9,11 +8,14 @@ import ( "errors" "github.com/pion/webrtc/v3" + "github.com/pion/datachannel" ) type Conn struct { peerConn *webrtc.PeerConnection dataChannel *webrtc.DataChannel + raw datachannel.ReadWriteCloser + readChan chan []byte errorChan chan error @@ -23,18 +25,10 @@ type Conn struct { func newConn(peer *webrtc.PeerConnection) *Conn { return &Conn{ peerConn: peer, - readChan: make(chan []byte, 1024), //TODO! - Sizing errorChan: make(chan error, 16), //TODO! - Sizing } } -// For pushing read data out of the datachannel and into the read buffer -func (c *Conn) pushReadData(dat []byte) { - if c.closed.Load() { return } // Skip if we are already closed - - c.readChan <- dat -} - // For pushing error data out of the webrtc connection into the error buffer func (c *Conn) pushErrorData(err error) { if c.closed.Load() { return } // Skip if we are already closed @@ -45,30 +39,16 @@ func (c *Conn) pushErrorData(err error) { func (c *Conn) Read(b []byte) (int, error) { select { - case err, ok := <-c.errorChan: - if !ok { - return 0, net.ErrClosed - } + case err := <-c.errorChan: return 0, err // There was some error - - case dat, ok := <- c.readChan: - if !ok { - return 0, net.ErrClosed - } - if len(dat) > len(b) { - return 0, fmt.Errorf("message too big") // TODO - Instead of failing, should we just return partial? - } - copy(b, dat) - return len(dat), nil + default: + // Just exit } + return c.raw.Read(b) } func (c *Conn) Write(b []byte) (int, error) { - err := c.dataChannel.Send(b) - if err != nil { - return 0, err - } - return len(b), nil + return c.raw.Write(b) } func (c *Conn) Close() error { @@ -79,12 +59,12 @@ func (c *Conn) Close() error { err1 := c.dataChannel.Close() err2 := c.peerConn.Close() + err3 := c.raw.Close() - close(c.readChan) close(c.errorChan) - if err1 != nil || err2 != nil { - closeErr = errors.Join(errors.New("failed to close: (datachannel, peerconn)"), err1, err2) + if err1 != nil || err2 != nil || err3 != nil { + closeErr = errors.Join(errors.New("failed to close: (datachannel, peerconn, raw)"), err1, err2, err3) logErr("conn: closing error: ", closeErr) } }) diff --git a/conn_test.go b/conn_test.go index 5bdeae7..722ddb8 100644 --- a/conn_test.go +++ b/conn_test.go @@ -126,10 +126,15 @@ func TestConn(t *testing.T) { compare(t, buf[i], dat[i]) } + fmt.Printf(".") successCount++ } - fmt.Println("Success: ", successCount) + fmt.Println("Success: ", successCount) + err = conn.Close() + if err != nil { + fmt.Errorf("%v", err) + } } fmt.Println("Done") diff --git a/dial.go b/dial.go index 14ea3e3..11f4cf2 100644 --- a/dial.go +++ b/dial.go @@ -31,7 +31,9 @@ func Dial(address string, tlsConfig *tls.Config) (*Conn, error) { trace("Dial: Starting WebRTC negotiation") - peerConnection, err := webrtc.NewPeerConnection(config) + api := getSettingsEngineApi() + + peerConnection, err := api.NewPeerConnection(config) if err != nil { logErr("Dial: NewPeerConnection", err) return nil, err @@ -170,19 +172,26 @@ func Dial(address string, tlsConfig *tls.Config) (*Conn, error) { dataChannel.OnOpen(func() { printDataChannel(dataChannel) trace("Dial: Data Channel OnOpen") - conn.dataChannel = dataChannel - connFinish <- true + + conn.raw, err = dataChannel.Detach() + if err != nil { + conn.pushErrorData(err) + } else { + conn.dataChannel = dataChannel + connFinish <- true + } }) - // Register text message handling - dataChannel.OnMessage(func(msg webrtc.DataChannelMessage) { - // log.Print("Client: Received Msg from DataChannel", len(msg.Data)) - if msg.IsString { - trace("Dial: DataChannel OnMessage: Received string message, skipping") - return - } - conn.pushReadData(msg.Data) - }) + // Note: Stopped using this now that I have detached data channels + // // Register text message handling + // dataChannel.OnMessage(func(msg webrtc.DataChannelMessage) { + // // log.Print("Client: Received Msg from DataChannel", len(msg.Data)) + // if msg.IsString { + // trace("Dial: DataChannel OnMessage: Received string message, skipping") + // return + // } + // conn.pushReadData(msg.Data) + // }) // Create an offer to send to the other process offer, err := peerConnection.CreateOffer(nil) diff --git a/listener.go b/listener.go index b65003d..ae237e2 100644 --- a/listener.go +++ b/listener.go @@ -78,6 +78,8 @@ func (l *Listener) Addr() net.Addr { func (l *Listener) attemptWebRtcNegotiation(wsConn net.Conn) { defer trace("finished attemptWebRtcNegotiation") + api := getSettingsEngineApi() + var candidatesMux sync.Mutex pendingCandidates := make([]*webrtc.ICECandidate, 0) config := webrtc.Configuration{ @@ -88,7 +90,7 @@ func (l *Listener) attemptWebRtcNegotiation(wsConn net.Conn) { }, } - peerConnection, err := webrtc.NewPeerConnection(config) + peerConnection, err := api.NewPeerConnection(config) if err != nil { l.pendingAcceptErrors <- err return @@ -149,7 +151,13 @@ func (l *Listener) attemptWebRtcNegotiation(wsConn net.Conn) { d.OnOpen(func() { printDataChannel(d) wsConn.Close() - l.pendingAccepts <- conn + + conn.raw, err = d.Detach() + if err != nil { + l.pendingAcceptErrors <- err + } else { + l.pendingAccepts <- conn + } }) // // Register channel opening handling @@ -157,15 +165,16 @@ func (l *Listener) attemptWebRtcNegotiation(wsConn net.Conn) { // trace("Listener: Data channel was closed!") // }) - // Register text message handling - d.OnMessage(func(msg webrtc.DataChannelMessage) { - // log.Print("Server: Received Msg from DataChannel", len(msg.Data)) - if msg.IsString { - trace("Listener: DataChannel OnMessage: Received string message, skipping") - return - } - conn.pushReadData(msg.Data) - }) + // Note: Stopped using this now that I have detached data channels + // // Register text message handling + // d.OnMessage(func(msg webrtc.DataChannelMessage) { + // // log.Print("Server: Received Msg from DataChannel", len(msg.Data)) + // if msg.IsString { + // trace("Listener: DataChannel OnMessage: Received string message, skipping") + // return + // } + // conn.pushReadData(msg.Data) + // }) }) buf := make([]byte, 8 * 1024) // TODO: hardcoded to be big enough for the signalling messages diff --git a/rtcnet.go b/rtcnet.go index fd34af2..4d68876 100644 --- a/rtcnet.go +++ b/rtcnet.go @@ -8,7 +8,6 @@ import ( // Notes: https://webrtcforthecurious.com/docs/01-what-why-and-how/ // Notes: about reliability: https://stackoverflow.com/questions/54292824/webrtc-channel-reliability -// TODO - Investigate Detaching the datachannel: https://github.com/pion/webrtc/tree/master/examples/data-channels-detach // TODO - Let stun server list be injectable // TODO - Interesting note: if you run this in a docker container with the networking set to something other than "host" (ie the default is bridge), then what happens is the docker container gets NAT'ed behind the original host which causes you to need an ICE server. So You must run this code with HOST networking!!! @@ -17,6 +16,14 @@ import ( // - TODO - There might be some way to avoid this with: https://pkg.go.dev/github.com/pion/webrtc/v3#SettingEngine.SetNAT1To1IPs // - TODO - also this: https://pkg.go.dev/github.com/pion/webrtc/v3#SettingEngine.SetICEUDPMux +// Current settings engine settings +// Detaching the datachannel: https://github.com/pion/webrtc/tree/master/examples/data-channels-detach +func getSettingsEngineApi() *webrtc.API { + s := webrtc.SettingEngine{} + s.DetachDataChannels() + return webrtc.NewAPI(webrtc.WithSettingEngine(s)) +} + // Internal messages used for webrtc negotiation/signalling type signalMsg struct { SDP *sdpMsg