// SPDX-FileCopyrightText: 2023 The Pion community // SPDX-License-Identifier: MIT //go:build !js // +build !js package webrtc import ( "fmt" "strings" "sync" "sync/atomic" "time" "github.com/pion/ice/v4" "github.com/pion/logging" "github.com/pion/stun/v3" ) // ICEGatherer gathers local host, server reflexive and relay // candidates, as well as enabling the retrieval of local Interactive // Connectivity Establishment (ICE) parameters which can be // exchanged in signaling. type ICEGatherer struct { lock sync.RWMutex log logging.LeveledLogger state ICEGathererState validatedServers []*stun.URI gatherPolicy ICETransportPolicy agent *ice.Agent onLocalCandidateHandler atomic.Value // func(candidate *ICECandidate) onStateChangeHandler atomic.Value // func(state ICEGathererState) // Used for GatheringCompletePromise onGatheringCompleteHandler atomic.Value // func() api *API // Used to set the corresponding media stream identification tag and media description index // for ICE candidates generated by this gatherer. sdpMid atomic.Value // string sdpMLineIndex atomic.Uint32 // uint16 } // ICEAddressRewriteMode controls whether a rule replaces or appends candidates. type ICEAddressRewriteMode byte const ( ICEAddressRewriteModeUnspecified ICEAddressRewriteMode = iota ICEAddressRewriteReplace ICEAddressRewriteAppend ) func (r ICEAddressRewriteMode) toICE() ice.AddressRewriteMode { return ice.AddressRewriteMode(r) } // ICEAddressRewriteRule represents a rule for remapping candidate addresses. type ICEAddressRewriteRule struct { External []string Local string Iface string CIDR string AsCandidateType ICECandidateType Mode ICEAddressRewriteMode Networks []NetworkType } func (r ICEAddressRewriteRule) toICE() ice.AddressRewriteRule { candidateType := r.AsCandidateType.toICE() mode := r.Mode.toICE() networks := toICENetworkTypes(r.Networks) rule := ice.AddressRewriteRule{ External: append([]string(nil), r.External...), Local: r.Local, Iface: r.Iface, CIDR: r.CIDR, AsCandidateType: candidateType, Mode: mode, Networks: networks, } return rule } // NewICEGatherer creates a new NewICEGatherer. // This constructor is part of the ORTC API. It is not // meant to be used together with the basic WebRTC API. func (api *API) NewICEGatherer(opts ICEGatherOptions) (*ICEGatherer, error) { var validatedServers []*stun.URI if len(opts.ICEServers) > 0 { for _, server := range opts.ICEServers { url, err := server.urls() if err != nil { return nil, err } validatedServers = append(validatedServers, url...) } } return &ICEGatherer{ state: ICEGathererStateNew, gatherPolicy: opts.ICEGatherPolicy, validatedServers: validatedServers, api: api, log: api.settingEngine.LoggerFactory.NewLogger("ice"), sdpMid: atomic.Value{}, sdpMLineIndex: atomic.Uint32{}, }, nil } func (g *ICEGatherer) createAgent() error { g.lock.Lock() defer g.lock.Unlock() if g.agent != nil || g.State() != ICEGathererStateNew { return nil } options, err := g.buildAgentOptions() if err != nil { return err } agent, err := ice.NewAgentWithOptions(options...) if err != nil { return err } g.agent = agent return nil } func (g *ICEGatherer) buildAgentOptions() ([]ice.AgentOption, error) { candidateTypes := g.resolveCandidateTypes() nat1To1CandiTyp := g.resolveNAT1To1CandidateType() mDNSMode := g.sanitizedMDNSMode() options := g.baseAgentOptions(mDNSMode) if len(candidateTypes) > 0 { options = append(options, ice.WithCandidateTypes(candidateTypes)) } options = append(options, g.credentialOptions()...) rewriteOptions, err := g.addressRewriteOptions(nat1To1CandiTyp) if err != nil { return nil, err } options = append(options, rewriteOptions...) options = append(options, g.timeoutOptions()...) options = append(options, g.miscOptions()...) options = append(options, g.renominationOptions()...) requestedNetworkTypes := g.api.settingEngine.candidates.ICENetworkTypes if len(requestedNetworkTypes) == 0 { requestedNetworkTypes = supportedNetworkTypes() } return append(options, ice.WithNetworkTypes(toICENetworkTypes(requestedNetworkTypes))), nil } func (g *ICEGatherer) resolveCandidateTypes() []ice.CandidateType { if g.api.settingEngine.candidates.ICELite { return []ice.CandidateType{ice.CandidateTypeHost} } switch g.gatherPolicy { case ICETransportPolicyRelay: return []ice.CandidateType{ice.CandidateTypeRelay} case ICETransportPolicyNoHost: return []ice.CandidateType{ice.CandidateTypeServerReflexive, ice.CandidateTypeRelay} default: } return nil } func (g *ICEGatherer) resolveNAT1To1CandidateType() ice.CandidateType { switch g.api.settingEngine.candidates.NAT1To1IPCandidateType { case ICECandidateTypeHost: return ice.CandidateTypeHost case ICECandidateTypeSrflx: return ice.CandidateTypeServerReflexive default: return ice.CandidateTypeUnspecified } } func (g *ICEGatherer) sanitizedMDNSMode() ice.MulticastDNSMode { mode := g.api.settingEngine.candidates.MulticastDNSMode if mode == ice.MulticastDNSModeDisabled || mode == ice.MulticastDNSModeQueryAndGather { return mode } return ice.MulticastDNSModeQueryOnly } func (g *ICEGatherer) baseAgentOptions(mDNSMode ice.MulticastDNSMode) []ice.AgentOption { return []ice.AgentOption{ ice.WithICELite(g.api.settingEngine.candidates.ICELite), ice.WithUrls(g.validatedServers), ice.WithPortRange(g.api.settingEngine.ephemeralUDP.PortMin, g.api.settingEngine.ephemeralUDP.PortMax), ice.WithLoggerFactory(g.api.settingEngine.LoggerFactory), ice.WithInterfaceFilter(g.api.settingEngine.candidates.InterfaceFilter), ice.WithIPFilter(g.api.settingEngine.candidates.IPFilter), ice.WithNet(g.api.settingEngine.net), ice.WithMulticastDNSMode(mDNSMode), ice.WithTCPMux(g.api.settingEngine.iceTCPMux), ice.WithUDPMux(g.api.settingEngine.iceUDPMux), ice.WithProxyDialer(g.api.settingEngine.iceProxyDialer), ice.WithBindingRequestHandler(g.api.settingEngine.iceBindingRequestHandler), } } func (g *ICEGatherer) credentialOptions() []ice.AgentOption { ufrag := g.api.settingEngine.candidates.UsernameFragment pass := g.api.settingEngine.candidates.Password if ufrag == "" && pass == "" { return nil } return []ice.AgentOption{ ice.WithLocalCredentials(g.api.settingEngine.candidates.UsernameFragment, g.api.settingEngine.candidates.Password), } } func (g *ICEGatherer) addressRewriteOptions(candidateType ice.CandidateType) ([]ice.AgentOption, error) { rules := g.api.settingEngine.candidates.addressRewriteRules nat1To1IPs := g.api.settingEngine.candidates.NAT1To1IPs if len(rules) > 0 && len(nat1To1IPs) > 0 { return nil, errAddressRewriteWithNAT1To1 } if len(rules) > 0 { return []ice.AgentOption{ice.WithAddressRewriteRules(rules...)}, nil } if len(nat1To1IPs) == 0 { return nil, nil } return []ice.AgentOption{ ice.WithAddressRewriteRules( legacyNAT1To1AddressRewriteRules( nat1To1IPs, candidateType, )..., ), }, nil } func (g *ICEGatherer) timeoutOptions() []ice.AgentOption { opts := make([]ice.AgentOption, 0, 8) if g.api.settingEngine.timeout.ICEDisconnectedTimeout != nil { opts = append(opts, ice.WithDisconnectedTimeout(*g.api.settingEngine.timeout.ICEDisconnectedTimeout)) } if g.api.settingEngine.timeout.ICEFailedTimeout != nil { opts = append(opts, ice.WithFailedTimeout(*g.api.settingEngine.timeout.ICEFailedTimeout)) } if g.api.settingEngine.timeout.ICEKeepaliveInterval != nil { opts = append(opts, ice.WithKeepaliveInterval(*g.api.settingEngine.timeout.ICEKeepaliveInterval)) } if g.api.settingEngine.timeout.ICEHostAcceptanceMinWait != nil { opts = append(opts, ice.WithHostAcceptanceMinWait(*g.api.settingEngine.timeout.ICEHostAcceptanceMinWait)) } if g.api.settingEngine.timeout.ICESrflxAcceptanceMinWait != nil { opts = append(opts, ice.WithSrflxAcceptanceMinWait(*g.api.settingEngine.timeout.ICESrflxAcceptanceMinWait)) } if g.api.settingEngine.timeout.ICEPrflxAcceptanceMinWait != nil { opts = append(opts, ice.WithPrflxAcceptanceMinWait(*g.api.settingEngine.timeout.ICEPrflxAcceptanceMinWait)) } if g.api.settingEngine.timeout.ICERelayAcceptanceMinWait != nil { opts = append(opts, ice.WithRelayAcceptanceMinWait(*g.api.settingEngine.timeout.ICERelayAcceptanceMinWait)) } if g.api.settingEngine.timeout.ICESTUNGatherTimeout != nil { opts = append(opts, ice.WithSTUNGatherTimeout(*g.api.settingEngine.timeout.ICESTUNGatherTimeout)) } return opts } func (g *ICEGatherer) miscOptions() []ice.AgentOption { opts := make([]ice.AgentOption, 0, 4) if g.api.settingEngine.candidates.MulticastDNSHostName != "" { opts = append(opts, ice.WithMulticastDNSHostName(g.api.settingEngine.candidates.MulticastDNSHostName)) } if g.api.settingEngine.candidates.IncludeLoopbackCandidate { opts = append(opts, ice.WithIncludeLoopback()) } if g.api.settingEngine.iceDisableActiveTCP { opts = append(opts, ice.WithDisableActiveTCP()) } if g.api.settingEngine.iceMaxBindingRequests != nil { opts = append(opts, ice.WithMaxBindingRequests(*g.api.settingEngine.iceMaxBindingRequests)) } return opts } func (g *ICEGatherer) renominationOptions() []ice.AgentOption { renom := g.api.settingEngine.renomination if !renom.enabled && !renom.automatic { return nil } generator := renom.generator opts := []ice.AgentOption{ ice.WithRenomination(func() uint32 { return generator() }), } if renom.automatic { interval := time.Duration(0) if renom.automaticInterval != nil { interval = *renom.automaticInterval } opts = append(opts, ice.WithAutomaticRenomination(interval)) } return opts } func legacyNAT1To1AddressRewriteRules(ips []string, candidateType ice.CandidateType) []ice.AddressRewriteRule { catchAll := make([]string, 0, len(ips)) rules := make([]ice.AddressRewriteRule, 0, len(ips)+1) for _, ip := range ips { splits := strings.SplitN(ip, "/", 2) if len(splits) == 2 { rules = append(rules, ice.AddressRewriteRule{ External: []string{splits[0]}, Local: splits[1], AsCandidateType: candidateType, }) catchAll = append(catchAll, splits[0]) } else { catchAll = append(catchAll, ip) } } if len(catchAll) > 0 { rules = append(rules, ice.AddressRewriteRule{ External: catchAll, AsCandidateType: candidateType, }) } return rules } // Gather ICE candidates. func (g *ICEGatherer) Gather() error { //nolint:cyclop if err := g.createAgent(); err != nil { return err } agent := g.getAgent() // it is possible agent had just been closed if agent == nil { return fmt.Errorf("%w: unable to gather", errICEAgentNotExist) } g.setState(ICEGathererStateGathering) if err := agent.OnCandidate(func(candidate ice.Candidate) { onLocalCandidateHandler := func(*ICECandidate) {} if handler, ok := g.onLocalCandidateHandler.Load().(func(candidate *ICECandidate)); ok && handler != nil { onLocalCandidateHandler = handler } onGatheringCompleteHandler := func() {} if handler, ok := g.onGatheringCompleteHandler.Load().(func()); ok && handler != nil { onGatheringCompleteHandler = handler } sdpMid := "" if mid, ok := g.sdpMid.Load().(string); ok { sdpMid = mid } sdpMLineIndex := uint16(g.sdpMLineIndex.Load()) //nolint:gosec // G115 if candidate != nil { c, err := newICECandidateFromICE(candidate, sdpMid, sdpMLineIndex) if err != nil { g.log.Warnf("Failed to convert ice.Candidate: %s", err) return } onLocalCandidateHandler(&c) } else { g.setState(ICEGathererStateComplete) onGatheringCompleteHandler() onLocalCandidateHandler(nil) } }); err != nil { return err } return agent.GatherCandidates() } // set media stream identification tag and media description index for this gatherer. func (g *ICEGatherer) setMediaStreamIdentification(mid string, mLineIndex uint16) { g.sdpMid.Store(mid) g.sdpMLineIndex.Store(uint32(mLineIndex)) } // Close prunes all local candidates, and closes the ports. func (g *ICEGatherer) Close() error { return g.close(false /* shouldGracefullyClose */) } // GracefulClose prunes all local candidates, and closes the ports. It also waits // for any goroutines it started to complete. This is only safe to call outside of // ICEGatherer callbacks or if in a callback, in its own goroutine. func (g *ICEGatherer) GracefulClose() error { return g.close(true /* shouldGracefullyClose */) } func (g *ICEGatherer) close(shouldGracefullyClose bool) error { g.lock.Lock() defer g.lock.Unlock() if g.agent == nil { return nil } if shouldGracefullyClose { if err := g.agent.GracefulClose(); err != nil { return err } } else { if err := g.agent.Close(); err != nil { return err } } g.agent = nil g.setState(ICEGathererStateClosed) return nil } // GetLocalParameters returns the ICE parameters of the ICEGatherer. func (g *ICEGatherer) GetLocalParameters() (ICEParameters, error) { if err := g.createAgent(); err != nil { return ICEParameters{}, err } agent := g.getAgent() // it is possible agent had just been closed if agent == nil { return ICEParameters{}, fmt.Errorf("%w: unable to get local parameters", errICEAgentNotExist) } frag, pwd, err := agent.GetLocalUserCredentials() if err != nil { return ICEParameters{}, err } return ICEParameters{ UsernameFragment: frag, Password: pwd, ICELite: false, }, nil } // GetLocalCandidates returns the sequence of valid local candidates associated with the ICEGatherer. func (g *ICEGatherer) GetLocalCandidates() ([]ICECandidate, error) { if err := g.createAgent(); err != nil { return nil, err } agent := g.getAgent() // it is possible agent had just been closed if agent == nil { return nil, fmt.Errorf("%w: unable to get local candidates", errICEAgentNotExist) } iceCandidates, err := agent.GetLocalCandidates() if err != nil { return nil, err } sdpMid := "" if mid, ok := g.sdpMid.Load().(string); ok { sdpMid = mid } sdpMLineIndex := uint16(g.sdpMLineIndex.Load()) //nolint:gosec // G115 return newICECandidatesFromICE(iceCandidates, sdpMid, sdpMLineIndex) } // OnLocalCandidate sets an event handler which fires when a new local ICE candidate is available // Take note that the handler will be called with a nil pointer when gathering is finished. func (g *ICEGatherer) OnLocalCandidate(f func(*ICECandidate)) { g.onLocalCandidateHandler.Store(f) } // OnStateChange fires any time the ICEGatherer changes. func (g *ICEGatherer) OnStateChange(f func(ICEGathererState)) { g.onStateChangeHandler.Store(f) } // State indicates the current state of the ICE gatherer. func (g *ICEGatherer) State() ICEGathererState { return atomicLoadICEGathererState(&g.state) } func (g *ICEGatherer) setState(s ICEGathererState) { atomicStoreICEGathererState(&g.state, s) if handler, ok := g.onStateChangeHandler.Load().(func(state ICEGathererState)); ok && handler != nil { handler(s) } } func (g *ICEGatherer) getAgent() *ice.Agent { g.lock.RLock() defer g.lock.RUnlock() return g.agent } func (g *ICEGatherer) collectStats(collector *statsReportCollector) { agent := g.getAgent() if agent == nil { return } collector.Collecting() go func(collector *statsReportCollector, agent *ice.Agent) { for _, candidatePairStats := range agent.GetCandidatePairsStats() { collector.Collecting() stats, err := toICECandidatePairStats(candidatePairStats) if err != nil { g.log.Error(err.Error()) collector.Done() continue } collector.Collect(stats.ID, stats) } for _, candidateStats := range agent.GetLocalCandidatesStats() { collector.Collecting() networkType, err := getNetworkType(candidateStats.NetworkType) if err != nil { g.log.Error(err.Error()) } candidateType, err := getCandidateType(candidateStats.CandidateType) if err != nil { g.log.Error(err.Error()) } stats := ICECandidateStats{ Timestamp: statsTimestampFrom(candidateStats.Timestamp), ID: candidateStats.ID, Type: StatsTypeLocalCandidate, IP: candidateStats.IP, Port: int32(candidateStats.Port), //nolint:gosec // G115, no overflow, port Protocol: networkType.Protocol(), CandidateType: candidateType, Priority: int32(candidateStats.Priority), //nolint:gosec URL: candidateStats.URL, RelayProtocol: candidateStats.RelayProtocol, Deleted: candidateStats.Deleted, } collector.Collect(stats.ID, stats) } for _, candidateStats := range agent.GetRemoteCandidatesStats() { collector.Collecting() networkType, err := getNetworkType(candidateStats.NetworkType) if err != nil { g.log.Error(err.Error()) } candidateType, err := getCandidateType(candidateStats.CandidateType) if err != nil { g.log.Error(err.Error()) } stats := ICECandidateStats{ Timestamp: statsTimestampFrom(candidateStats.Timestamp), ID: candidateStats.ID, Type: StatsTypeRemoteCandidate, IP: candidateStats.IP, Port: int32(candidateStats.Port), //nolint:gosec // G115, no overflow, port Protocol: networkType.Protocol(), CandidateType: candidateType, Priority: int32(candidateStats.Priority), //nolint:gosec // G115 URL: candidateStats.URL, RelayProtocol: candidateStats.RelayProtocol, } collector.Collect(stats.ID, stats) } collector.Done() }(collector, agent) } func (g *ICEGatherer) getSelectedCandidatePairStats() (ICECandidatePairStats, bool) { agent := g.getAgent() if agent == nil { return ICECandidatePairStats{}, false } selectedCandidatePairStats, isAvailable := agent.GetSelectedCandidatePairStats() if !isAvailable { return ICECandidatePairStats{}, false } stats, err := toICECandidatePairStats(selectedCandidatePairStats) if err != nil { g.log.Error(err.Error()) return ICECandidatePairStats{}, false } return stats, true }