// SPDX-FileCopyrightText: 2023 The Pion community // SPDX-License-Identifier: MIT //go:build !js // +build !js // broadcast demonstrates how to broadcast a video to many peers, while only requiring the broadcaster to upload once. package main import ( "encoding/base64" "encoding/json" "errors" "flag" "fmt" "io" "net/http" "strconv" "github.com/pion/interceptor" "github.com/pion/interceptor/pkg/intervalpli" "github.com/pion/webrtc/v4" ) // nolint:gocognit, cyclop func main() { port := flag.Int("port", 8080, "http server port") flag.Parse() sdpChan := httpSDPServer(*port) // Everything below is the Pion WebRTC API, thanks for using it ❤️. offer := webrtc.SessionDescription{} decode(<-sdpChan, &offer) fmt.Println("") peerConnectionConfig := webrtc.Configuration{ ICEServers: []webrtc.ICEServer{ { URLs: []string{"stun:stun.l.google.com:19302"}, }, }, } mediaEngine := &webrtc.MediaEngine{} if err := mediaEngine.RegisterDefaultCodecs(); err != nil { panic(err) } // Create a InterceptorRegistry. This is the user configurable RTP/RTCP Pipeline. // This provides NACKs, RTCP Reports and other features. If you use `webrtc.NewPeerConnection` // this is enabled by default. If you are manually managing You MUST create a InterceptorRegistry // for each PeerConnection. interceptorRegistry := &interceptor.Registry{} // Use the default set of Interceptors if err := webrtc.RegisterDefaultInterceptors(mediaEngine, interceptorRegistry); err != nil { panic(err) } // Register a intervalpli factory // This interceptor sends a PLI every 3 seconds. A PLI causes a video keyframe to be generated by the sender. // This makes our video seekable and more error resilent, but at a cost of lower picture quality and higher bitrates // A real world application should process incoming RTCP packets from viewers and forward them to senders intervalPliFactory, err := intervalpli.NewReceiverInterceptor() if err != nil { panic(err) } interceptorRegistry.Add(intervalPliFactory) // Create a new RTCPeerConnection peerConnection, err := webrtc.NewAPI( webrtc.WithMediaEngine(mediaEngine), webrtc.WithInterceptorRegistry(interceptorRegistry), ).NewPeerConnection(peerConnectionConfig) if err != nil { panic(err) } defer func() { if cErr := peerConnection.Close(); cErr != nil { fmt.Printf("cannot close peerConnection: %v\n", cErr) } }() // Allow us to receive 1 video track if _, err = peerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo); err != nil { panic(err) } localTrackChan := make(chan *webrtc.TrackLocalStaticRTP) // Set a handler for when a new remote track starts, this just distributes all our packets // to connected peers peerConnection.OnTrack(func(remoteTrack *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) { //nolint: revive // Create a local track, all our SFU clients will be fed via this track localTrack, newTrackErr := webrtc.NewTrackLocalStaticRTP(remoteTrack.Codec().RTPCodecCapability, "video", "pion") if newTrackErr != nil { panic(newTrackErr) } localTrackChan <- localTrack rtpBuf := make([]byte, 1400) for { i, _, readErr := remoteTrack.Read(rtpBuf) if readErr != nil { panic(readErr) } // ErrClosedPipe means we don't have any subscribers, this is ok if no peers have connected yet if _, err = localTrack.Write(rtpBuf[:i]); err != nil && !errors.Is(err, io.ErrClosedPipe) { panic(err) } } }) // Set the remote SessionDescription err = peerConnection.SetRemoteDescription(offer) if err != nil { panic(err) } // Create answer answer, err := peerConnection.CreateAnswer(nil) if err != nil { panic(err) } // Create channel that is blocked until ICE Gathering is complete gatherComplete := webrtc.GatheringCompletePromise(peerConnection) // Sets the LocalDescription, and starts our UDP listeners err = peerConnection.SetLocalDescription(answer) if err != nil { panic(err) } // Block until ICE Gathering is complete, disabling trickle ICE // we do this because we only can exchange one signaling message // in a production application you should exchange ICE Candidates via OnICECandidate <-gatherComplete // Get the LocalDescription and take it to base64 so we can paste in browser fmt.Println(encode(peerConnection.LocalDescription())) localTrack := <-localTrackChan for { fmt.Println("") fmt.Println("Curl an base64 SDP to start sendonly peer connection") recvOnlyOffer := webrtc.SessionDescription{} decode(<-sdpChan, &recvOnlyOffer) // Create a new PeerConnection peerConnection, err := webrtc.NewPeerConnection(peerConnectionConfig) if err != nil { panic(err) } rtpSender, err := peerConnection.AddTrack(localTrack) if err != nil { panic(err) } // Read incoming RTCP packets // Before these packets are returned they are processed by interceptors. For things // like NACK this needs to be called. go func() { rtcpBuf := make([]byte, 1500) for { if _, _, rtcpErr := rtpSender.Read(rtcpBuf); rtcpErr != nil { return } } }() // Set the remote SessionDescription err = peerConnection.SetRemoteDescription(recvOnlyOffer) if err != nil { panic(err) } // Create answer answer, err := peerConnection.CreateAnswer(nil) if err != nil { panic(err) } // Create channel that is blocked until ICE Gathering is complete gatherComplete = webrtc.GatheringCompletePromise(peerConnection) // Sets the LocalDescription, and starts our UDP listeners err = peerConnection.SetLocalDescription(answer) if err != nil { panic(err) } // Block until ICE Gathering is complete, disabling trickle ICE // we do this because we only can exchange one signaling message // in a production application you should exchange ICE Candidates via OnICECandidate <-gatherComplete // Get the LocalDescription and take it to base64 so we can paste in browser fmt.Println(encode(peerConnection.LocalDescription())) } } // JSON encode + base64 a SessionDescription. func encode(obj *webrtc.SessionDescription) string { b, err := json.Marshal(obj) if err != nil { panic(err) } return base64.StdEncoding.EncodeToString(b) } // Decode a base64 and unmarshal JSON into a SessionDescription. func decode(in string, obj *webrtc.SessionDescription) { b, err := base64.StdEncoding.DecodeString(in) if err != nil { panic(err) } if err = json.Unmarshal(b, obj); err != nil { panic(err) } } // httpSDPServer starts a HTTP Server that consumes SDPs. func httpSDPServer(port int) chan string { sdpChan := make(chan string) http.HandleFunc("/", func(res http.ResponseWriter, req *http.Request) { body, _ := io.ReadAll(req.Body) fmt.Fprintf(res, "done") //nolint: errcheck sdpChan <- string(body) }) go func() { // nolint: gosec panic(http.ListenAndServe(":"+strconv.Itoa(port), nil)) }() return sdpChan }