Files
rtcnet/dial.go
2025-05-02 14:32:42 -04:00

313 lines
8.0 KiB
Go

package rtcnet
import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"net"
"sync"
"time"
"github.com/pion/webrtc/v4"
)
func Dial(address string, tlsConfig *tls.Config, ordered bool, iceServers []string) (*Conn, error) {
dialCtx, cancel := context.WithTimeout(context.Background(), 10 * time.Second) // TODO: pass in timeout
defer cancel()
wSock, err := dialWebsocket(address, tlsConfig, dialCtx)
if err != nil {
return nil, err
}
defer wSock.Close()
// Offer WebRtc Upgrade
var candidatesMux sync.Mutex
pendingCandidates := make([]*webrtc.ICECandidate, 0)
config := webrtc.Configuration{
ICEServers: []webrtc.ICEServer{
// {
// URLs: []string{"stun:stun.l.google.com:19302"},
// },
},
}
if len(iceServers) > 0 {
config.ICEServers = append(config.ICEServers,
webrtc.ICEServer{
URLs: iceServers,
})
}
trace("Dial: Starting WebRTC negotiation")
api := getSettingsEngineApi()
peerConnection, err := api.NewPeerConnection(config)
if err != nil {
logger.Error().
Err(err).
Msg("Dial: NewPeerConnection")
return nil, err
}
conn := newConn(peerConnection, wSock.LocalAddr(), wSock.RemoteAddr())
connFinish := make(chan bool)
peerConnection.OnICECandidate(func(c *webrtc.ICECandidate) {
logger.Trace().Msg("Dial: peerConnection.OnICECandidate")
if c == nil {
return
}
candidatesMux.Lock()
defer candidatesMux.Unlock()
desc := peerConnection.RemoteDescription()
if desc == nil {
pendingCandidates = append(pendingCandidates, c)
} else {
sigMsg := signalMsg{
Candidate: &candidateMsg{c.ToJSON()},
}
err := sendMsg(wSock, sigMsg)
if err != nil {
logger.Error().
Err(err).
Msg("Dial: Receive Peer OnIceCandidate")
conn.pushErrorData(err)
return
}
}
})
go func() {
buf := make([]byte, 8 * 1024)
for {
n, err := wSock.Read(buf)
if err != nil {
// TODO: Are there any cases where we might get an error here but its not fatal?
// Assume the websocket is closed and break
logger.Error().
Err(err).
Msg("Failed to read from websocket")
// TODO: We don't want this to cause an error, if it closed for normal reasons. Else we do want it to cause an error
// conn.pushErrorData(err)
return
}
if n == 0 { continue }
var msg signalMsg
err = json.Unmarshal(buf[:n], &msg)
if err != nil {
// There was some problem with the unmarshal. Let's just continue looking for another valid message
logger.Error().
Err(err).
Msg("Failed to unmarshal signalling message")
continue
}
if msg.SDP != nil {
trace("Dial: RtcSdpMsg")
sdp := webrtc.SessionDescription{}
sdp.Type = msg.SDP.Type
sdp.SDP = msg.SDP.SDP
err := peerConnection.SetRemoteDescription(sdp)
if err != nil {
logger.Error().
Err(err).
Msg("Dial: SetRemoteDescription")
conn.pushErrorData(err)
return
}
// Warning: Be very careful with canadidatesMux
{
candidatesMux.Lock()
for _, c := range pendingCandidates {
trace(fmt.Sprintf("Dial: pendingCandidates: %v", *c))
sigMsg := signalMsg{
Candidate: &candidateMsg{c.ToJSON()},
}
err := sendMsg(wSock, sigMsg)
if err != nil {
logger.Error().
Err(err).
Msg("Dial: Failed Websocket Send: Pending Candidate Msg")
conn.pushErrorData(err)
candidatesMux.Unlock()
return
}
}
candidatesMux.Unlock()
}
} else if msg.Candidate != nil {
trace("Dial: RtcCandidateMsg")
err := peerConnection.AddICECandidate(msg.Candidate.CandidateInit)
if err != nil {
logger.Error().
Err(err).
Msg("Dial: AddIceCandidate")
conn.pushErrorData(err)
return
}
} else {
// Warning: no valid message included
trace("Dial: ws received unknown message " + string(buf[:n]))
continue
}
}
}()
// Create a datachannel with label 'data'
// maxRetransmits := uint16(0)
dataChannelOptions := webrtc.DataChannelInit{
Ordered: &ordered,
// MaxRetransmits: &maxRetransmits,
}
dataChannel, err := peerConnection.CreateDataChannel("data", &dataChannelOptions)
if err != nil {
logger.Error().
Err(err).
Msg("Dial: CreateDataChannel")
return nil, err
}
// Set the handler for Peer connection state
// This will notify you when the peer has connected/disconnected
peerConnection.OnConnectionStateChange(func(s webrtc.PeerConnectionState) {
trace("Dial: Peer Connection State has changed: " + s.String())
if s == webrtc.PeerConnectionStateClosed {
trace("Dial: webrtc.PeerConnectionStateClosed")
// This means the webrtc was closed by one side. Just close it on the other side
conn.Close()
}
if s == webrtc.PeerConnectionStateFailed {
trace("Dial: PeerConnectionStateFailed")
// Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart.
// Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout.
// Note that the PeerConnection may come back from PeerConnectionStateDisconnected.
// conn.pushErrorData(fmt.Errorf("Peer Connection has gone to failed"))
conn.Close()
} else if s == webrtc.PeerConnectionStateDisconnected {
trace("Dial: PeerConnectionStateDisconnected")
// conn.errorChan <- fmt.Errorf("Peer Connection has gone to disconnected")
}
})
// Register channel opening handling
dataChannel.OnOpen(func() {
printDataChannel(dataChannel)
trace("Dial: Data Channel OnOpen")
detached, err := dataChannel.Detach()
if err != nil {
conn.pushErrorData(err)
} else {
conn.raw = detached
conn.dataChannel = dataChannel
connFinish <- true
}
})
// Note: Stopped using this now that I have detached data channels
// // Register text message handling
// dataChannel.OnMessage(func(msg webrtc.DataChannelMessage) {
// // log.Print("Client: Received Msg from DataChannel", len(msg.Data))
// if msg.IsString {
// trace("Dial: DataChannel OnMessage: Received string message, skipping")
// return
// }
// conn.pushReadData(msg.Data)
// })
// Create an offer to send to the other process
offer, err := peerConnection.CreateOffer(nil)
if err != nil {
logger.Error().
Err(err).
Msg("Dial: CreateOffer")
return nil, err
}
// fmt.Println("CreateOffer")
// Sets the LocalDescription, and starts our UDP listeners
// Note: this will start the gathering of ICE candidates
err = peerConnection.SetLocalDescription(offer)
if err != nil {
logger.Error().
Err(err).
Msg("Dial: SetLocalDescription")
return nil, err
}
// fmt.Println("SetLocalDesc")
sigMsg := signalMsg{
SDP: &sdpMsg{ offer.Type, offer.SDP },
}
err = sendMsg(wSock, sigMsg)
if err != nil {
logger.Error().
Err(err).
Msg("Dial: websocket.Send RtcSdp Offer")
return nil, err
}
// Wait until the webrtc connection is finished getting setup
select {
case <-dialCtx.Done():
logger.Error().
Err(err).
Msg("Dial: context Done")
return nil, dialCtx.Err()
case err := <-conn.errorChan:
logger.Error().
Err(err).
Msg("Dial: error exit")
return nil, err // There was an error in setup
case <-connFinish:
trace("Dial: normal exit")
// Socket finished getting setup
return conn, nil
}
}
func sendMsg(conn net.Conn, msg signalMsg) error {
// log.Print("sendMsg: ", msg)
msgDat, err := json.Marshal(msg)
if err != nil {
logger.Error().
Err(err).
Msg("sendMsg: Marshal")
return err
}
// log.Print("sendMsg: Marshalled: ", string(msgDat))
_, err = conn.Write(msgDat)
if err != nil {
logger.Error().
Err(err).
Msg("sendMsg: conn write")
return err
}
return nil
}
func printDataChannel(d *webrtc.DataChannel) {
trace(fmt.Sprintf(" Label : %v \n ID: %v \n MaxPacketLifeTime: %v \n MaxRetransmits: %v \n Negotiated: %v \n Ordered: %v \n Protocol: %s \n ReadyState: %v",
d.Label(), d.ID(), d.MaxPacketLifeTime(), d.MaxRetransmits(), d.Negotiated(), d.Ordered(), d.Protocol(), d.ReadyState()),
)
// t := d.Transport()
// log.Print(fmt.Sprintf(" Transport:
}