// SPDX-FileCopyrightText: 2023 The Pion community // SPDX-License-Identifier: MIT // pion-to-pion is an example of two pion instances communicating directly! package main import ( "bufio" "bytes" "encoding/base64" "encoding/json" "errors" "flag" "fmt" "io" "net/http" "os" "strings" "sync" "time" "github.com/pion/randutil" "github.com/pion/webrtc/v4" ) func signalCandidate(addr string, c *webrtc.ICECandidate) error { payload := []byte(c.ToJSON().Candidate) resp, err := http.Post(fmt.Sprintf("http://%s/candidate", addr), // nolint:noctx "application/json; charset=utf-8", bytes.NewReader(payload)) if err != nil { return err } return resp.Body.Close() } func main() { // nolint:gocognit offerAddr := flag.String("offer-address", "localhost:50000", "Address that the Offer HTTP server is hosted on.") answerAddr := flag.String("answer-address", ":60000", "Address that the Answer HTTP server is hosted on.") flag.Parse() var candidatesMux sync.Mutex pendingCandidates := make([]*webrtc.ICECandidate, 0) // Everything below is the Pion WebRTC API! Thanks for using it ❤️. // Prepare the configuration config := webrtc.Configuration{ ICEServers: []webrtc.ICEServer{ { URLs: []string{"stun:stun.l.google.com:19302"}, }, }, } // Create a new RTCPeerConnection peerConnection, err := webrtc.NewPeerConnection(config) if err != nil { panic(err) } defer func() { if err := peerConnection.Close(); err != nil { fmt.Printf("cannot close peerConnection: %v\n", err) } }() // When an ICE candidate is available send to the other Pion instance // the other Pion instance will add this candidate by calling AddICECandidate peerConnection.OnICECandidate(func(c *webrtc.ICECandidate) { if c == nil { return } candidatesMux.Lock() defer candidatesMux.Unlock() desc := peerConnection.RemoteDescription() if desc == nil { pendingCandidates = append(pendingCandidates, c) } else if onICECandidateErr := signalCandidate(*offerAddr, c); onICECandidateErr != nil { panic(onICECandidateErr) } }) // A HTTP handler that allows the other Pion instance to send us ICE candidates // This allows us to add ICE candidates faster, we don't have to wait for STUN or TURN // candidates which may be slower http.HandleFunc("/candidate", func(w http.ResponseWriter, r *http.Request) { //nolint: revive candidate, candidateErr := io.ReadAll(r.Body) if candidateErr != nil { panic(candidateErr) } if candidateErr := peerConnection.AddICECandidate(webrtc.ICECandidateInit{Candidate: string(candidate)}); candidateErr != nil { panic(candidateErr) } }) // A HTTP handler that processes a SessionDescription given to us from the other Pion process http.HandleFunc("/sdp", func(w http.ResponseWriter, r *http.Request) { // nolint: revive sdp := webrtc.SessionDescription{} if err := json.NewDecoder(r.Body).Decode(&sdp); err != nil { panic(err) } if err := peerConnection.SetRemoteDescription(sdp); err != nil { panic(err) } // Create an answer to send to the other process answer, err := peerConnection.CreateAnswer(nil) if err != nil { panic(err) } // Send our answer to the HTTP server listening in the other process payload, err := json.Marshal(answer) if err != nil { panic(err) } resp, err := http.Post(fmt.Sprintf("http://%s/sdp", *offerAddr), "application/json; charset=utf-8", bytes.NewReader(payload)) // nolint:noctx if err != nil { panic(err) } else if closeErr := resp.Body.Close(); closeErr != nil { panic(closeErr) } // Sets the LocalDescription, and starts our UDP listeners err = peerConnection.SetLocalDescription(answer) if err != nil { panic(err) } candidatesMux.Lock() for _, c := range pendingCandidates { onICECandidateErr := signalCandidate(*offerAddr, c) if onICECandidateErr != nil { panic(onICECandidateErr) } } candidatesMux.Unlock() }) // Set the handler for Peer connection state // This will notify you when the peer has connected/disconnected peerConnection.OnConnectionStateChange(func(s webrtc.PeerConnectionState) { fmt.Printf("Peer Connection State has changed: %s\n", s.String()) if s == webrtc.PeerConnectionStateFailed { // Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart. // Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout. // Note that the PeerConnection may come back from PeerConnectionStateDisconnected. fmt.Println("Peer Connection has gone to failed exiting") os.Exit(0) } if s == webrtc.PeerConnectionStateClosed { // PeerConnection was explicitly closed. This usually happens from a DTLS CloseNotify fmt.Println("Peer Connection has gone to closed exiting") os.Exit(0) } }) // Register data channel creation handling peerConnection.OnDataChannel(func(d *webrtc.DataChannel) { fmt.Printf("New DataChannel %s %d\n", d.Label(), d.ID()) // Register channel opening handling d.OnOpen(func() { fmt.Printf("Data channel '%s'-'%d' open. Random messages will now be sent to any connected DataChannels every 5 seconds\n", d.Label(), d.ID()) for range time.NewTicker(5 * time.Second).C { message, sendTextErr := randutil.GenerateCryptoRandomString(15, "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") if sendTextErr != nil { panic(sendTextErr) } // Send the message as text fmt.Printf("Sending '%s'\n", message) if sendTextErr = d.SendText(message); sendTextErr != nil { panic(sendTextErr) } } }) // Register text message handling d.OnMessage(func(msg webrtc.DataChannelMessage) { fmt.Printf("Message from DataChannel '%s': '%s'\n", d.Label(), string(msg.Data)) }) }) // Start HTTP server that accepts requests from the offer process to exchange SDP and Candidates // nolint: gosec panic(http.ListenAndServe(*answerAddr, nil)) } // Read from stdin until we get a newline func readUntilNewline() (in string) { var err error r := bufio.NewReader(os.Stdin) for { in, err = r.ReadString('\n') if err != nil && !errors.Is(err, io.EOF) { panic(err) } if in = strings.TrimSpace(in); len(in) > 0 { break } } fmt.Println("") return } // JSON encode + base64 a SessionDescription func encode(obj *webrtc.SessionDescription) string { b, err := json.Marshal(obj) if err != nil { panic(err) } return base64.StdEncoding.EncodeToString(b) } // Decode a base64 and unmarshal JSON into a SessionDescription func decode(in string, obj *webrtc.SessionDescription) { b, err := base64.StdEncoding.DecodeString(in) if err != nil { panic(err) } if err = json.Unmarshal(b, obj); err != nil { panic(err) } }