Did some cleanup

This commit is contained in:
Jacob Stewart
2023-08-06 09:36:09 -04:00
parent 88270d58a4
commit 16ca97588c
4 changed files with 55 additions and 38 deletions

43
conn.go
View File

@@ -4,6 +4,9 @@ import (
"fmt"
"net"
"time"
"sync"
"sync/atomic"
"errors"
"github.com/pion/webrtc/v3"
)
@@ -11,20 +14,35 @@ import (
type Conn struct {
peerConn *webrtc.PeerConnection
dataChannel *webrtc.DataChannel
websocket net.Conn
readChan chan []byte
errorChan chan error
closed bool // TODO - atomic?
closeOnce sync.Once
closed atomic.Bool
}
func newConn(peer *webrtc.PeerConnection, websocket net.Conn) *Conn {
func newConn(peer *webrtc.PeerConnection) *Conn {
return &Conn{
peerConn: peer,
websocket: websocket,
readChan: make(chan []byte, 1024), //TODO! - Sizing
errorChan: make(chan error, 16), //TODO! - Sizing
}
}
// For pushing read data out of the datachannel and into the read buffer
func (c *Conn) pushReadData(dat []byte) {
if c.closed.Load() { return } // Skip if we are already closed
c.readChan <- dat
}
// For pushing error data out of the webrtc connection into the error buffer
func (c *Conn) pushErrorData(err error) {
if c.closed.Load() { return } // Skip if we are already closed
c.errorChan <- err
}
func (c *Conn) Read(b []byte) (int, error) {
select {
case err, ok := <-c.errorChan:
@@ -54,20 +72,23 @@ func (c *Conn) Write(b []byte) (int, error) {
}
func (c *Conn) Close() error {
if !c.closed {
c.closed = true
var closeErr error
c.closeOnce.Do(func() {
trace("conn: closing")
c.closed.Store(true)
err1 := c.dataChannel.Close()
err2 := c.peerConn.Close()
err3 := c.websocket.Close()
close(c.readChan)
close(c.errorChan)
if err1 != nil || err2 != nil || err3 != nil {
return fmt.Errorf("Conn Close Error: datachannel: %s peerconn: %s websocket: %s", err1, err2, err3)
if err1 != nil || err2 != nil {
closeErr = errors.Join(errors.New("failed to close: (datachannel, peerconn)"), err1, err2)
logErr("conn: closing error: ", closeErr)
}
}
return nil
})
return closeErr
}
func (c *Conn) LocalAddr() net.Addr {

30
dial.go
View File

@@ -37,7 +37,7 @@ func Dial(address string, tlsConfig *tls.Config) (*Conn, error) {
return nil, err
}
conn := newConn(peerConnection, wSock)
conn := newConn(peerConnection)
connFinish := make(chan bool)
peerConnection.OnICECandidate(func(c *webrtc.ICECandidate) {
// log.Debug().Msg("Dial: OnICECandidate")
@@ -55,12 +55,10 @@ func Dial(address string, tlsConfig *tls.Config) (*Conn, error) {
sigMsg := signalMsg{
Candidate: &candidateMsg{c.ToJSON()},
}
err := sendMsg(conn.websocket, sigMsg)
err := sendMsg(wSock, sigMsg)
if err != nil {
logErr("Dial: Receive Peer OnIceCandidate", err)
if !conn.closed {
conn.errorChan <- err
}
conn.pushErrorData(err)
return
}
}
@@ -69,7 +67,7 @@ func Dial(address string, tlsConfig *tls.Config) (*Conn, error) {
go func() {
buf := make([]byte, 8 * 1024)
for {
n, err := conn.websocket.Read(buf)
n, err := wSock.Read(buf)
if err != nil {
// TODO: Are there any cases where we might get an error here but its not fatal?
// Assume the websocket is closed and break
@@ -94,9 +92,7 @@ func Dial(address string, tlsConfig *tls.Config) (*Conn, error) {
err := peerConnection.SetRemoteDescription(sdp)
if err != nil {
logErr("Dial: SetRemoteDescription", err)
if !conn.closed {
conn.errorChan <- err
}
conn.pushErrorData(err)
return
}
@@ -108,12 +104,10 @@ func Dial(address string, tlsConfig *tls.Config) (*Conn, error) {
sigMsg := signalMsg{
Candidate: &candidateMsg{c.ToJSON()},
}
err := sendMsg(conn.websocket, sigMsg)
err := sendMsg(wSock, sigMsg)
if err != nil {
logErr("Dial: Failed Websocket Send: Pending Candidate Msg", err)
if !conn.closed {
conn.errorChan <- err
}
conn.pushErrorData(err)
return
}
}
@@ -123,9 +117,7 @@ func Dial(address string, tlsConfig *tls.Config) (*Conn, error) {
err := peerConnection.AddICECandidate(msg.Candidate.CandidateInit)
if err != nil {
logErr("Dial: AddIceCandidate", err)
if !conn.closed {
conn.errorChan <- err
}
conn.pushErrorData(err)
return
}
} else {
@@ -167,7 +159,7 @@ func Dial(address string, tlsConfig *tls.Config) (*Conn, error) {
// Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout.
// Note that the PeerConnection may come back from PeerConnectionStateDisconnected.
conn.errorChan <- fmt.Errorf("Peer Connection has gone to failed")
conn.pushErrorData(fmt.Errorf("Peer Connection has gone to failed"))
} else if s == webrtc.PeerConnectionStateDisconnected {
trace("Dial: PeerConnectionStateDisconnected")
// conn.errorChan <- fmt.Errorf("Peer Connection has gone to disconnected")
@@ -189,7 +181,7 @@ func Dial(address string, tlsConfig *tls.Config) (*Conn, error) {
trace("Dial: DataChannel OnMessage: Received string message, skipping")
return
}
conn.readChan <- msg.Data
conn.pushReadData(msg.Data)
})
// Create an offer to send to the other process
@@ -212,7 +204,7 @@ func Dial(address string, tlsConfig *tls.Config) (*Conn, error) {
sigMsg := signalMsg{
SDP: &sdpMsg{ offer.Type, offer.SDP },
}
err = sendMsg(conn.websocket, sigMsg)
err = sendMsg(wSock, sigMsg)
if err != nil {
logErr("Dial: websocket.Send RtcSdp Offer", err)
return nil, err

View File

@@ -65,6 +65,7 @@ func (l *Listener) Accept() (net.Conn, error) {
}
func (l *Listener) Close() error {
l.closed.Store(true)
// TODO: these aren't channel safe. need to do a boolean check before writing to them.
close(l.pendingAccepts)
close(l.pendingAcceptErrors)
@@ -75,6 +76,8 @@ func (l *Listener) Addr() net.Addr {
}
func (l *Listener) attemptWebRtcNegotiation(wsConn net.Conn) {
defer trace("finished attemptWebRtcNegotiation")
var candidatesMux sync.Mutex
pendingCandidates := make([]*webrtc.ICECandidate, 0)
config := webrtc.Configuration{
@@ -139,7 +142,7 @@ func (l *Listener) attemptWebRtcNegotiation(wsConn net.Conn) {
// Register data channel creation handling
peerConnection.OnDataChannel(func(d *webrtc.DataChannel) {
conn := newConn(peerConnection, wsConn)
conn := newConn(peerConnection)
conn.dataChannel = d
// Register channel opening handling
@@ -149,10 +152,10 @@ func (l *Listener) attemptWebRtcNegotiation(wsConn net.Conn) {
l.pendingAccepts <- conn
})
// Register channel opening handling
d.OnClose(func() {
trace("Listener: Data channel was closed!!")
})
// // Register channel opening handling
// d.OnClose(func() {
// trace("Listener: Data channel was closed!")
// })
// Register text message handling
d.OnMessage(func(msg webrtc.DataChannelMessage) {
@@ -161,16 +164,17 @@ func (l *Listener) attemptWebRtcNegotiation(wsConn net.Conn) {
trace("Listener: DataChannel OnMessage: Received string message, skipping")
return
}
conn.readChan <- msg.Data
conn.pushReadData(msg.Data)
})
})
buf := make([]byte, 8 * 1024)
buf := make([]byte, 8 * 1024) // TODO: hardcoded to be big enough for the signalling messages
for {
n, err := wsConn.Read(buf)
if err != nil {
// TODO: Are there any cases where we might get an error here but its not fatal?
// Assume the websocket is closed and break
logErr("error reading websocket", err)
break
}

View File

@@ -55,7 +55,7 @@ func (l dLog) Log(msg ...any) {
}
func (l dLog) LogErr(msg string, err error) {
log.Printf("%s: %w", msg, err)
log.Printf("%s: %s\n", msg, err)
}
// Actual log functions