mirror of
				https://github.com/aler9/rtsp-simple-server
				synced 2025-10-31 19:13:22 +08:00 
			
		
		
		
	
		
			
				
	
	
		
			833 lines
		
	
	
		
			19 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			833 lines
		
	
	
		
			19 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Package webrtc contains WebRTC utilities.
 | |
| package webrtc
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"errors"
 | |
| 	"fmt"
 | |
| 	"net"
 | |
| 	"slices"
 | |
| 	"strconv"
 | |
| 	"strings"
 | |
| 	"sync"
 | |
| 	"sync/atomic"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/pion/ice/v4"
 | |
| 	"github.com/pion/interceptor"
 | |
| 	"github.com/pion/sdp/v3"
 | |
| 	"github.com/pion/webrtc/v4"
 | |
| 
 | |
| 	"github.com/bluenviron/mediamtx/internal/conf"
 | |
| 	"github.com/bluenviron/mediamtx/internal/logger"
 | |
| )
 | |
| 
 | |
| const (
 | |
| 	webrtcStreamID = "mediamtx"
 | |
| )
 | |
| 
 | |
| func interfaceIPs(interfaceList []string) ([]string, error) {
 | |
| 	intfs, err := net.Interfaces()
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	var ips []string
 | |
| 
 | |
| 	for _, intf := range intfs {
 | |
| 		if len(interfaceList) == 0 || slices.Contains(interfaceList, intf.Name) {
 | |
| 			var addrs []net.Addr
 | |
| 			addrs, err = intf.Addrs()
 | |
| 			if err == nil {
 | |
| 				for _, addr := range addrs {
 | |
| 					var ip net.IP
 | |
| 
 | |
| 					switch v := addr.(type) {
 | |
| 					case *net.IPNet:
 | |
| 						ip = v.IP
 | |
| 					case *net.IPAddr:
 | |
| 						ip = v.IP
 | |
| 					}
 | |
| 
 | |
| 					if ip != nil {
 | |
| 						ips = append(ips, ip.String())
 | |
| 					}
 | |
| 				}
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return ips, nil
 | |
| }
 | |
| 
 | |
| // * skip ConfigureRTCPReports
 | |
| // * add statsInterceptor
 | |
| func registerInterceptors(
 | |
| 	mediaEngine *webrtc.MediaEngine,
 | |
| 	interceptorRegistry *interceptor.Registry,
 | |
| 	onStatsInterceptor func(s *statsInterceptor),
 | |
| ) error {
 | |
| 	err := webrtc.ConfigureNack(mediaEngine, interceptorRegistry)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	err = webrtc.ConfigureSimulcastExtensionHeaders(mediaEngine)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	err = webrtc.ConfigureTWCCSender(mediaEngine, interceptorRegistry)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	interceptorRegistry.Add(&statsInterceptorFactory{
 | |
| 		onCreate: onStatsInterceptor,
 | |
| 	})
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func candidateLabel(c *webrtc.ICECandidate) string {
 | |
| 	return c.Typ.String() + "/" + c.Protocol.String() + "/" +
 | |
| 		c.Address + "/" + strconv.FormatInt(int64(c.Port), 10)
 | |
| }
 | |
| 
 | |
| // TracksAreValid checks whether tracks in the SDP are valid
 | |
| func TracksAreValid(medias []*sdp.MediaDescription) error {
 | |
| 	videoTrack := false
 | |
| 	audioTrack := false
 | |
| 
 | |
| 	for _, media := range medias {
 | |
| 		switch media.MediaName.Media {
 | |
| 		case "video":
 | |
| 			if videoTrack {
 | |
| 				return fmt.Errorf("only a single video and a single audio track are supported")
 | |
| 			}
 | |
| 			videoTrack = true
 | |
| 
 | |
| 		case "audio":
 | |
| 			if audioTrack {
 | |
| 				return fmt.Errorf("only a single video and a single audio track are supported")
 | |
| 			}
 | |
| 			audioTrack = true
 | |
| 
 | |
| 		default:
 | |
| 			return fmt.Errorf("unsupported media '%s'", media.MediaName.Media)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if !videoTrack && !audioTrack {
 | |
| 		return fmt.Errorf("no valid tracks found")
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| type trackRecvPair struct {
 | |
| 	track    *webrtc.TrackRemote
 | |
| 	receiver *webrtc.RTPReceiver
 | |
| }
 | |
| 
 | |
| // PeerConnection is a wrapper around webrtc.PeerConnection.
 | |
| type PeerConnection struct {
 | |
| 	LocalRandomUDP        bool
 | |
| 	ICEUDPMux             ice.UDPMux
 | |
| 	ICETCPMux             *TCPMuxWrapper
 | |
| 	ICEServers            []webrtc.ICEServer
 | |
| 	IPsFromInterfaces     bool
 | |
| 	IPsFromInterfacesList []string
 | |
| 	AdditionalHosts       []string
 | |
| 	HandshakeTimeout      conf.Duration
 | |
| 	TrackGatherTimeout    conf.Duration
 | |
| 	STUNGatherTimeout     conf.Duration
 | |
| 	Publish               bool
 | |
| 	OutgoingTracks        []*OutgoingTrack
 | |
| 	UseAbsoluteTimestamp  bool
 | |
| 	Log                   logger.Writer
 | |
| 
 | |
| 	wr                 *webrtc.PeerConnection
 | |
| 	ctx                context.Context
 | |
| 	ctxCancel          context.CancelFunc
 | |
| 	incomingTracks     []*IncomingTrack
 | |
| 	startedReading     *int64
 | |
| 	rtpPacketsReceived *uint64
 | |
| 	rtpPacketsSent     *uint64
 | |
| 	rtpPacketsLost     *uint64
 | |
| 	statsInterceptor   *statsInterceptor
 | |
| 
 | |
| 	newLocalCandidate chan *webrtc.ICECandidateInit
 | |
| 	incomingTrack     chan trackRecvPair
 | |
| 	connected         chan struct{}
 | |
| 	failed            chan struct{}
 | |
| 	closed            chan struct{}
 | |
| 	gatheringDone     chan struct{}
 | |
| 	done              chan struct{}
 | |
| 	chStartReading    chan struct{}
 | |
| }
 | |
| 
 | |
| // Start starts the peer connection.
 | |
| func (co *PeerConnection) Start() error {
 | |
| 	settingsEngine := webrtc.SettingEngine{}
 | |
| 
 | |
| 	settingsEngine.SetIncludeLoopbackCandidate(true)
 | |
| 
 | |
| 	// always enable TCP since we might be the client of a remote TCP listener
 | |
| 	networkTypes := []webrtc.NetworkType{
 | |
| 		webrtc.NetworkTypeTCP4,
 | |
| 		webrtc.NetworkTypeTCP6,
 | |
| 	}
 | |
| 
 | |
| 	if co.LocalRandomUDP || co.ICEUDPMux != nil || len(co.ICEServers) != 0 {
 | |
| 		networkTypes = append(networkTypes, webrtc.NetworkTypeUDP4, webrtc.NetworkTypeUDP6)
 | |
| 	}
 | |
| 
 | |
| 	settingsEngine.SetNetworkTypes(networkTypes)
 | |
| 
 | |
| 	if co.ICEUDPMux != nil {
 | |
| 		settingsEngine.SetICEUDPMux(co.ICEUDPMux)
 | |
| 	}
 | |
| 
 | |
| 	if co.ICETCPMux != nil {
 | |
| 		settingsEngine.SetICETCPMux(co.ICETCPMux.Mux)
 | |
| 	}
 | |
| 
 | |
| 	settingsEngine.SetSTUNGatherTimeout(time.Duration(co.STUNGatherTimeout))
 | |
| 
 | |
| 	mediaEngine := &webrtc.MediaEngine{}
 | |
| 
 | |
| 	if co.Publish {
 | |
| 		videoSetupped := false
 | |
| 		audioSetupped := false
 | |
| 		for _, tr := range co.OutgoingTracks {
 | |
| 			if tr.isVideo() {
 | |
| 				videoSetupped = true
 | |
| 			} else {
 | |
| 				audioSetupped = true
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		// When audio is not used, a track has to be present anyway,
 | |
| 		// otherwise video is not displayed on Firefox and Chrome.
 | |
| 		if !audioSetupped {
 | |
| 			co.OutgoingTracks = append(co.OutgoingTracks, &OutgoingTrack{
 | |
| 				Caps: webrtc.RTPCodecCapability{
 | |
| 					MimeType:  webrtc.MimeTypePCMU,
 | |
| 					ClockRate: 8000,
 | |
| 				},
 | |
| 			})
 | |
| 		}
 | |
| 
 | |
| 		for i, tr := range co.OutgoingTracks {
 | |
| 			var codecType webrtc.RTPCodecType
 | |
| 			if tr.isVideo() {
 | |
| 				codecType = webrtc.RTPCodecTypeVideo
 | |
| 			} else {
 | |
| 				codecType = webrtc.RTPCodecTypeAudio
 | |
| 			}
 | |
| 
 | |
| 			err := mediaEngine.RegisterCodec(webrtc.RTPCodecParameters{
 | |
| 				RTPCodecCapability: tr.Caps,
 | |
| 				PayloadType:        webrtc.PayloadType(96 + i),
 | |
| 			}, codecType)
 | |
| 			if err != nil {
 | |
| 				return err
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		// When video is not used, a track must not be added but a codec has to present.
 | |
| 		// Otherwise audio is muted on Firefox and Chrome.
 | |
| 		if !videoSetupped {
 | |
| 			err := mediaEngine.RegisterCodec(webrtc.RTPCodecParameters{
 | |
| 				RTPCodecCapability: webrtc.RTPCodecCapability{
 | |
| 					MimeType:  webrtc.MimeTypeVP8,
 | |
| 					ClockRate: 90000,
 | |
| 				},
 | |
| 				PayloadType: 96,
 | |
| 			}, webrtc.RTPCodecTypeVideo)
 | |
| 			if err != nil {
 | |
| 				return err
 | |
| 			}
 | |
| 		}
 | |
| 	} else {
 | |
| 		for _, codec := range incomingVideoCodecs {
 | |
| 			err := mediaEngine.RegisterCodec(codec, webrtc.RTPCodecTypeVideo)
 | |
| 			if err != nil {
 | |
| 				return err
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		for _, codec := range incomingAudioCodecs {
 | |
| 			err := mediaEngine.RegisterCodec(codec, webrtc.RTPCodecTypeAudio)
 | |
| 			if err != nil {
 | |
| 				return err
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	interceptorRegistry := &interceptor.Registry{}
 | |
| 
 | |
| 	err := registerInterceptors(
 | |
| 		mediaEngine,
 | |
| 		interceptorRegistry,
 | |
| 		func(s *statsInterceptor) {
 | |
| 			co.statsInterceptor = s
 | |
| 		},
 | |
| 	)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	api := webrtc.NewAPI(
 | |
| 		webrtc.WithSettingEngine(settingsEngine),
 | |
| 		webrtc.WithMediaEngine(mediaEngine),
 | |
| 		webrtc.WithInterceptorRegistry(interceptorRegistry))
 | |
| 
 | |
| 	co.wr, err = api.NewPeerConnection(webrtc.Configuration{
 | |
| 		ICEServers: co.ICEServers,
 | |
| 	})
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	co.ctx, co.ctxCancel = context.WithCancel(context.Background())
 | |
| 
 | |
| 	co.startedReading = new(int64)
 | |
| 	co.rtpPacketsReceived = new(uint64)
 | |
| 	co.rtpPacketsSent = new(uint64)
 | |
| 	co.rtpPacketsLost = new(uint64)
 | |
| 
 | |
| 	co.newLocalCandidate = make(chan *webrtc.ICECandidateInit)
 | |
| 	co.connected = make(chan struct{})
 | |
| 	co.failed = make(chan struct{})
 | |
| 	co.closed = make(chan struct{})
 | |
| 	co.gatheringDone = make(chan struct{})
 | |
| 	co.incomingTrack = make(chan trackRecvPair)
 | |
| 	co.done = make(chan struct{})
 | |
| 	co.chStartReading = make(chan struct{})
 | |
| 
 | |
| 	if co.Publish {
 | |
| 		for _, tr := range co.OutgoingTracks {
 | |
| 			err = tr.setup(co)
 | |
| 			if err != nil {
 | |
| 				co.wr.GracefulClose() //nolint:errcheck
 | |
| 				return err
 | |
| 			}
 | |
| 		}
 | |
| 	} else {
 | |
| 		_, err = co.wr.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo, webrtc.RTPTransceiverInit{
 | |
| 			Direction: webrtc.RTPTransceiverDirectionRecvonly,
 | |
| 		})
 | |
| 		if err != nil {
 | |
| 			co.wr.GracefulClose() //nolint:errcheck
 | |
| 			return err
 | |
| 		}
 | |
| 
 | |
| 		_, err = co.wr.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, webrtc.RTPTransceiverInit{
 | |
| 			Direction: webrtc.RTPTransceiverDirectionRecvonly,
 | |
| 		})
 | |
| 		if err != nil {
 | |
| 			co.wr.GracefulClose() //nolint:errcheck
 | |
| 			return err
 | |
| 		}
 | |
| 
 | |
| 		co.wr.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
 | |
| 			select {
 | |
| 			case co.incomingTrack <- trackRecvPair{track, receiver}:
 | |
| 			case <-co.ctx.Done():
 | |
| 			}
 | |
| 		})
 | |
| 	}
 | |
| 
 | |
| 	var stateChangeMutex sync.Mutex
 | |
| 
 | |
| 	co.wr.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
 | |
| 		stateChangeMutex.Lock()
 | |
| 		defer stateChangeMutex.Unlock()
 | |
| 
 | |
| 		select {
 | |
| 		case <-co.closed:
 | |
| 			return
 | |
| 		default:
 | |
| 		}
 | |
| 
 | |
| 		co.Log.Log(logger.Debug, "peer connection state: "+state.String())
 | |
| 
 | |
| 		switch state {
 | |
| 		case webrtc.PeerConnectionStateConnected:
 | |
| 			// PeerConnectionStateConnected can arrive twice, since state can
 | |
| 			// switch from "disconnected" to "connected".
 | |
| 			// contrarily, we're interested into emitting "connected" once.
 | |
| 			select {
 | |
| 			case <-co.connected:
 | |
| 				return
 | |
| 			default:
 | |
| 			}
 | |
| 
 | |
| 			co.Log.Log(logger.Info, "peer connection established, local candidate: %v, remote candidate: %v",
 | |
| 				co.LocalCandidate(), co.RemoteCandidate())
 | |
| 
 | |
| 			close(co.connected)
 | |
| 
 | |
| 		case webrtc.PeerConnectionStateFailed:
 | |
| 			close(co.failed)
 | |
| 
 | |
| 		case webrtc.PeerConnectionStateClosed:
 | |
| 			// "closed" can arrive before "failed" and without
 | |
| 			// the Close() method being called at all.
 | |
| 			// It happens when the other peer sends a termination
 | |
| 			// message like a DTLS CloseNotify.
 | |
| 			select {
 | |
| 			case <-co.failed:
 | |
| 			default:
 | |
| 				close(co.failed)
 | |
| 			}
 | |
| 
 | |
| 			close(co.closed)
 | |
| 		}
 | |
| 	})
 | |
| 
 | |
| 	co.wr.OnICECandidate(func(i *webrtc.ICECandidate) {
 | |
| 		if i != nil {
 | |
| 			v := i.ToJSON()
 | |
| 			select {
 | |
| 			case co.newLocalCandidate <- &v:
 | |
| 			case <-co.connected:
 | |
| 			case <-co.ctx.Done():
 | |
| 			}
 | |
| 		} else {
 | |
| 			close(co.gatheringDone)
 | |
| 		}
 | |
| 	})
 | |
| 
 | |
| 	go co.run()
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Close closes the connection.
 | |
| func (co *PeerConnection) Close() {
 | |
| 	co.ctxCancel()
 | |
| 	<-co.done
 | |
| }
 | |
| 
 | |
| func (co *PeerConnection) run() {
 | |
| 	defer close(co.done)
 | |
| 
 | |
| 	defer func() {
 | |
| 		for _, track := range co.incomingTracks {
 | |
| 			track.close()
 | |
| 		}
 | |
| 		for _, track := range co.OutgoingTracks {
 | |
| 			track.close()
 | |
| 		}
 | |
| 
 | |
| 		co.wr.GracefulClose() //nolint:errcheck
 | |
| 
 | |
| 		// even if GracefulClose() should wait for any goroutine to return,
 | |
| 		// we have to wait for OnConnectionStateChange to return anyway,
 | |
| 		// since it is executed in an uncontrolled goroutine.
 | |
| 		// https://github.com/pion/webrtc/blob/4742d1fd54abbc3f81c3b56013654574ba7254f3/peerconnection.go#L509
 | |
| 		<-co.closed
 | |
| 	}()
 | |
| 
 | |
| 	for {
 | |
| 		select {
 | |
| 		case <-co.chStartReading:
 | |
| 			for _, track := range co.incomingTracks {
 | |
| 				track.start()
 | |
| 			}
 | |
| 			atomic.StoreInt64(co.startedReading, 1)
 | |
| 
 | |
| 		case <-co.ctx.Done():
 | |
| 			return
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (co *PeerConnection) removeUnwantedCandidates(firstMedia *sdp.MediaDescription) error {
 | |
| 	var allowedIPs []string
 | |
| 	if co.IPsFromInterfaces {
 | |
| 		var err error
 | |
| 		allowedIPs, err = interfaceIPs(co.IPsFromInterfacesList)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	var newAttributes []sdp.Attribute //nolint:prealloc
 | |
| 
 | |
| 	for _, attr := range firstMedia.Attributes {
 | |
| 		if attr.Key == "candidate" {
 | |
| 			parts := strings.Split(attr.Value, " ")
 | |
| 
 | |
| 			// hide random UDP candidates
 | |
| 			if !co.LocalRandomUDP && co.ICEUDPMux == nil && parts[2] == "udp" && parts[7] == "host" {
 | |
| 				continue
 | |
| 			}
 | |
| 
 | |
| 			// hide disallowed IPs
 | |
| 			if parts[7] == "host" && !slices.Contains(allowedIPs, parts[4]) {
 | |
| 				continue
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		newAttributes = append(newAttributes, attr)
 | |
| 	}
 | |
| 
 | |
| 	firstMedia.Attributes = newAttributes
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (co *PeerConnection) addAdditionalCandidates(firstMedia *sdp.MediaDescription) error {
 | |
| 	i := 0
 | |
| 	for _, attr := range firstMedia.Attributes {
 | |
| 		if attr.Key == "end-of-candidates" {
 | |
| 			break
 | |
| 		}
 | |
| 		i++
 | |
| 	}
 | |
| 
 | |
| 	for _, host := range co.AdditionalHosts {
 | |
| 		var ips []string
 | |
| 		if net.ParseIP(host) != nil {
 | |
| 			ips = []string{host}
 | |
| 		} else {
 | |
| 			tmp, err := net.LookupIP(host)
 | |
| 			if err != nil {
 | |
| 				return err
 | |
| 			}
 | |
| 
 | |
| 			ips = make([]string, len(tmp))
 | |
| 			for i, e := range tmp {
 | |
| 				ips[i] = e.String()
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		for _, ip := range ips {
 | |
| 			newAttrs := append([]sdp.Attribute(nil), firstMedia.Attributes[:i]...)
 | |
| 
 | |
| 			if co.ICEUDPMux != nil {
 | |
| 				port := strconv.FormatInt(int64(co.ICEUDPMux.GetListenAddresses()[0].(*net.UDPAddr).Port), 10)
 | |
| 
 | |
| 				tmp, err := randUint32()
 | |
| 				if err != nil {
 | |
| 					return err
 | |
| 				}
 | |
| 				id := strconv.FormatInt(int64(tmp), 10)
 | |
| 
 | |
| 				newAttrs = append(newAttrs, sdp.Attribute{
 | |
| 					Key:   "candidate",
 | |
| 					Value: id + " 1 udp 2130706431 " + ip + " " + port + " typ host",
 | |
| 				})
 | |
| 				newAttrs = append(newAttrs, sdp.Attribute{
 | |
| 					Key:   "candidate",
 | |
| 					Value: id + " 2 udp 2130706431 " + ip + " " + port + " typ host",
 | |
| 				})
 | |
| 			}
 | |
| 
 | |
| 			if co.ICETCPMux != nil {
 | |
| 				port := strconv.FormatInt(int64(co.ICETCPMux.Ln.Addr().(*net.TCPAddr).Port), 10)
 | |
| 
 | |
| 				tmp, err := randUint32()
 | |
| 				if err != nil {
 | |
| 					return err
 | |
| 				}
 | |
| 				id := strconv.FormatInt(int64(tmp), 10)
 | |
| 
 | |
| 				newAttrs = append(newAttrs, sdp.Attribute{
 | |
| 					Key:   "candidate",
 | |
| 					Value: id + " 1 tcp 1671430143 " + ip + " " + port + " typ host tcptype passive",
 | |
| 				})
 | |
| 				newAttrs = append(newAttrs, sdp.Attribute{
 | |
| 					Key:   "candidate",
 | |
| 					Value: id + " 2 tcp 1671430143 " + ip + " " + port + " typ host tcptype passive",
 | |
| 				})
 | |
| 			}
 | |
| 
 | |
| 			newAttrs = append(newAttrs, firstMedia.Attributes[i:]...)
 | |
| 			firstMedia.Attributes = newAttrs
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (co *PeerConnection) filterLocalDescription(desc *webrtc.SessionDescription) (*webrtc.SessionDescription, error) {
 | |
| 	var psdp sdp.SessionDescription
 | |
| 	psdp.Unmarshal([]byte(desc.SDP)) //nolint:errcheck
 | |
| 
 | |
| 	firstMedia := psdp.MediaDescriptions[0]
 | |
| 
 | |
| 	err := co.removeUnwantedCandidates(firstMedia)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	err = co.addAdditionalCandidates(firstMedia)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	out, _ := psdp.Marshal()
 | |
| 	desc.SDP = string(out)
 | |
| 
 | |
| 	return desc, nil
 | |
| }
 | |
| 
 | |
| // CreatePartialOffer creates a partial offer.
 | |
| func (co *PeerConnection) CreatePartialOffer() (*webrtc.SessionDescription, error) {
 | |
| 	tmp, err := co.wr.CreateOffer(nil)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	offer := &tmp
 | |
| 
 | |
| 	err = co.wr.SetLocalDescription(*offer)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	offer, err = co.filterLocalDescription(offer)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	return offer, nil
 | |
| }
 | |
| 
 | |
| // SetAnswer sets the answer.
 | |
| func (co *PeerConnection) SetAnswer(answer *webrtc.SessionDescription) error {
 | |
| 	return co.wr.SetRemoteDescription(*answer)
 | |
| }
 | |
| 
 | |
| // AddRemoteCandidate adds a remote candidate.
 | |
| func (co *PeerConnection) AddRemoteCandidate(candidate *webrtc.ICECandidateInit) error {
 | |
| 	return co.wr.AddICECandidate(*candidate)
 | |
| }
 | |
| 
 | |
| // CreateFullAnswer creates a full answer.
 | |
| func (co *PeerConnection) CreateFullAnswer(offer *webrtc.SessionDescription) (*webrtc.SessionDescription, error) {
 | |
| 	err := co.wr.SetRemoteDescription(*offer)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	tmp, err := co.wr.CreateAnswer(nil)
 | |
| 	if err != nil {
 | |
| 		if errors.Is(err, webrtc.ErrSenderWithNoCodecs) {
 | |
| 			return nil, fmt.Errorf("codecs not supported by client")
 | |
| 		}
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	answer := &tmp
 | |
| 
 | |
| 	err = co.wr.SetLocalDescription(*answer)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	err = co.waitGatheringDone()
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	answer = co.wr.LocalDescription()
 | |
| 
 | |
| 	answer, err = co.filterLocalDescription(answer)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	return answer, nil
 | |
| }
 | |
| 
 | |
| func (co *PeerConnection) waitGatheringDone() error {
 | |
| 	for {
 | |
| 		select {
 | |
| 		case <-co.NewLocalCandidate():
 | |
| 		case <-co.GatheringDone():
 | |
| 			return nil
 | |
| 		case <-co.ctx.Done():
 | |
| 			return fmt.Errorf("terminated")
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // WaitUntilConnected waits until connection is established.
 | |
| func (co *PeerConnection) WaitUntilConnected() error {
 | |
| 	t := time.NewTimer(time.Duration(co.HandshakeTimeout))
 | |
| 	defer t.Stop()
 | |
| 
 | |
| outer:
 | |
| 	for {
 | |
| 		select {
 | |
| 		case <-t.C:
 | |
| 			return fmt.Errorf("deadline exceeded while waiting connection")
 | |
| 
 | |
| 		case <-co.connected:
 | |
| 			break outer
 | |
| 
 | |
| 		case <-co.ctx.Done():
 | |
| 			return fmt.Errorf("terminated")
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // GatherIncomingTracks gathers incoming tracks.
 | |
| func (co *PeerConnection) GatherIncomingTracks() error {
 | |
| 	var sdp sdp.SessionDescription
 | |
| 	sdp.Unmarshal([]byte(co.wr.RemoteDescription().SDP)) //nolint:errcheck
 | |
| 
 | |
| 	maxTrackCount := len(sdp.MediaDescriptions)
 | |
| 
 | |
| 	t := time.NewTimer(time.Duration(co.TrackGatherTimeout))
 | |
| 	defer t.Stop()
 | |
| 
 | |
| 	for {
 | |
| 		select {
 | |
| 		case <-t.C:
 | |
| 			if len(co.incomingTracks) != 0 {
 | |
| 				return nil
 | |
| 			}
 | |
| 			return fmt.Errorf("deadline exceeded while waiting tracks")
 | |
| 
 | |
| 		case pair := <-co.incomingTrack:
 | |
| 			t := &IncomingTrack{
 | |
| 				useAbsoluteTimestamp: co.UseAbsoluteTimestamp,
 | |
| 				track:                pair.track,
 | |
| 				receiver:             pair.receiver,
 | |
| 				writeRTCP:            co.wr.WriteRTCP,
 | |
| 				log:                  co.Log,
 | |
| 				rtpPacketsReceived:   co.rtpPacketsReceived,
 | |
| 				rtpPacketsLost:       co.rtpPacketsLost,
 | |
| 			}
 | |
| 			t.initialize()
 | |
| 			co.incomingTracks = append(co.incomingTracks, t)
 | |
| 
 | |
| 			if len(co.incomingTracks) >= maxTrackCount {
 | |
| 				return nil
 | |
| 			}
 | |
| 
 | |
| 		case <-co.Failed():
 | |
| 			return fmt.Errorf("peer connection closed")
 | |
| 
 | |
| 		case <-co.ctx.Done():
 | |
| 			return fmt.Errorf("terminated")
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Connected returns when connected.
 | |
| func (co *PeerConnection) Connected() <-chan struct{} {
 | |
| 	return co.connected
 | |
| }
 | |
| 
 | |
| // Failed returns when failed.
 | |
| func (co *PeerConnection) Failed() <-chan struct{} {
 | |
| 	return co.failed
 | |
| }
 | |
| 
 | |
| // NewLocalCandidate returns when there's a new local candidate.
 | |
| func (co *PeerConnection) NewLocalCandidate() <-chan *webrtc.ICECandidateInit {
 | |
| 	return co.newLocalCandidate
 | |
| }
 | |
| 
 | |
| // GatheringDone returns when candidate gathering is complete.
 | |
| func (co *PeerConnection) GatheringDone() <-chan struct{} {
 | |
| 	return co.gatheringDone
 | |
| }
 | |
| 
 | |
| // IncomingTracks returns incoming tracks.
 | |
| func (co *PeerConnection) IncomingTracks() []*IncomingTrack {
 | |
| 	return co.incomingTracks
 | |
| }
 | |
| 
 | |
| // StartReading starts reading incoming tracks.
 | |
| func (co *PeerConnection) StartReading() {
 | |
| 	select {
 | |
| 	case co.chStartReading <- struct{}{}:
 | |
| 	case <-co.ctx.Done():
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // LocalCandidate returns the local candidate.
 | |
| func (co *PeerConnection) LocalCandidate() string {
 | |
| 	receivers := co.wr.GetReceivers()
 | |
| 	if len(receivers) < 1 {
 | |
| 		return ""
 | |
| 	}
 | |
| 
 | |
| 	cp, err := receivers[0].Transport().ICETransport().GetSelectedCandidatePair()
 | |
| 	if err != nil || cp == nil {
 | |
| 		return ""
 | |
| 	}
 | |
| 
 | |
| 	return candidateLabel(cp.Local)
 | |
| }
 | |
| 
 | |
| // RemoteCandidate returns the remote candidate.
 | |
| func (co *PeerConnection) RemoteCandidate() string {
 | |
| 	receivers := co.wr.GetReceivers()
 | |
| 	if len(receivers) < 1 {
 | |
| 		return ""
 | |
| 	}
 | |
| 
 | |
| 	cp, err := receivers[0].Transport().ICETransport().GetSelectedCandidatePair()
 | |
| 	if err != nil || cp == nil {
 | |
| 		return ""
 | |
| 	}
 | |
| 
 | |
| 	return candidateLabel(cp.Remote)
 | |
| }
 | |
| 
 | |
| func bytesStats(wr *webrtc.PeerConnection) (uint64, uint64) {
 | |
| 	for _, stats := range wr.GetStats() {
 | |
| 		if tstats, ok := stats.(webrtc.TransportStats); ok {
 | |
| 			if tstats.ID == "iceTransport" {
 | |
| 				return tstats.BytesReceived, tstats.BytesSent
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 	return 0, 0
 | |
| }
 | |
| 
 | |
| // Stats returns statistics.
 | |
| func (co *PeerConnection) Stats() *Stats {
 | |
| 	bytesReceived, bytesSent := bytesStats(co.wr)
 | |
| 
 | |
| 	v := float64(0)
 | |
| 	n := float64(0)
 | |
| 
 | |
| 	if atomic.LoadInt64(co.startedReading) == 1 {
 | |
| 		for _, tr := range co.incomingTracks {
 | |
| 			if recvStats := tr.rtcpReceiver.Stats(); recvStats != nil {
 | |
| 				v += recvStats.Jitter
 | |
| 				n++
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	var rtpPacketsJitter float64
 | |
| 	if n != 0 {
 | |
| 		rtpPacketsJitter = v / n
 | |
| 	} else {
 | |
| 		rtpPacketsJitter = 0
 | |
| 	}
 | |
| 
 | |
| 	return &Stats{
 | |
| 		BytesReceived:       bytesReceived,
 | |
| 		BytesSent:           bytesSent,
 | |
| 		RTPPacketsReceived:  atomic.LoadUint64(co.rtpPacketsReceived),
 | |
| 		RTPPacketsSent:      atomic.LoadUint64(co.rtpPacketsSent),
 | |
| 		RTPPacketsLost:      atomic.LoadUint64(co.rtpPacketsLost),
 | |
| 		RTPPacketsJitter:    rtpPacketsJitter,
 | |
| 		RTCPPacketsReceived: atomic.LoadUint64(co.statsInterceptor.rtcpPacketsReceived),
 | |
| 		RTCPPacketsSent:     atomic.LoadUint64(co.statsInterceptor.rtcpPacketsSent),
 | |
| 	}
 | |
| }
 | 
