Fix concurrent pc.GracefulClose

Eric Daniels
2024-08-26 07:38:32 -04:00
parent a49914b253
commit 6ac4b71eee
2 changed files with 155 additions and 35 deletions
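
Reviewer context: the hang this commit fixes shows up when many GracefulClose calls race a plain Close. A minimal repro sketch, mirroring the new test below (assumes the v4 module path; adjust the import to your pion/webrtc version):

package main

import (
	"sync"

	"github.com/pion/webrtc/v4"
)

func main() {
	pc, err := webrtc.NewPeerConnection(webrtc.Configuration{})
	if err != nil {
		panic(err)
	}

	var wg sync.WaitGroup
	for i := 0; i < 50; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = pc.GracefulClose() // every caller must return, not hang
		}()
	}
	_ = pc.Close() // a plain close racing with the graceful ones
	wg.Wait()      // before this fix, a losing GracefulClose could wait forever
}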

peerconnection.go

@@ -56,8 +56,9 @@ type PeerConnection struct {
 	idpLoginURL *string

 	isClosed                                 *atomicBool
-	isGracefulClosed                         *atomicBool
-	isGracefulClosedDone                     chan struct{}
+	isGracefullyClosingOrClosed              bool
+	isCloseDone                              chan struct{}
+	isGracefulCloseDone                      chan struct{}
 	isNegotiationNeeded                      *atomicBool
 	updateNegotiationNeededFlagOnEmptyChain  *atomicBool
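
The replacement fields implement the usual close-once / wait-many handshake: whoever wins the close race closes a done channel exactly once, and any number of latecomers block on it without leaking goroutines. A self-contained sketch of that handshake:

package main

import "fmt"

func main() {
	done := make(chan struct{})
	go func() {
		defer close(done) // the single owner signals completion exactly once
		fmt.Println("tearing down")
	}()
	<-done // any number of waiters unblock here; none of them leak
	fmt.Println("closed")
}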
@@ -130,8 +131,8 @@ func (api *API) NewPeerConnection(configuration Configuration) (*PeerConnection,
 			ICECandidatePoolSize: 0,
 		},
 		isClosed:                                &atomicBool{},
-		isGracefulClosed:                        &atomicBool{},
-		isGracefulClosedDone:                    make(chan struct{}),
+		isCloseDone:                             make(chan struct{}),
+		isGracefulCloseDone:                     make(chan struct{}),
 		isNegotiationNeeded:                     &atomicBool{},
 		updateNegotiationNeededFlagOnEmptyChain: &atomicBool{},
 		lastOffer:                               "",
@@ -2101,22 +2102,44 @@ func (pc *PeerConnection) GracefulClose() error {
 func (pc *PeerConnection) close(shouldGracefullyClose bool) error {
 	// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #1)
 	// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #2)
-	alreadyGracefullyClosed := shouldGracefullyClose && pc.isGracefulClosed.swap(true)
-	if pc.isClosed.swap(true) {
-		if alreadyGracefullyClosed {
-			// similar but distinct condition where we may be waiting for some
-			// other graceful close to finish. Incorrectly using isClosed may
-			// leak a goroutine.
-			<-pc.isGracefulClosedDone
-		}
-		return nil
+	pc.mu.Lock()
+	// A lock in this critical section is needed because pc.isClosed and
+	// pc.isGracefullyClosingOrClosed are related to each other in that we
+	// want to make graceful and normal closure one-time operations in order
+	// to avoid any double-closure errors from cropping up. However, there are
+	// some overlapping close cases when both normal and graceful close are
+	// used that should be idempotent, so be cautious when writing new close
+	// behavior to preserve this property.
+	isAlreadyClosingOrClosed := pc.isClosed.swap(true)
+	isAlreadyGracefullyClosingOrClosed := pc.isGracefullyClosingOrClosed
+	if shouldGracefullyClose && !isAlreadyGracefullyClosingOrClosed {
+		pc.isGracefullyClosingOrClosed = true
 	}
-	if shouldGracefullyClose && !alreadyGracefullyClosed {
-		defer close(pc.isGracefulClosedDone)
+	pc.mu.Unlock()
+
+	if isAlreadyClosingOrClosed {
+		if !shouldGracefullyClose {
+			return nil
+		}
+		// Even if we're already closing, it may not be graceful:
+		// if we are not the ones doing the closing, we just wait for the
+		// graceful close to happen and then return.
+		if isAlreadyGracefullyClosingOrClosed {
+			<-pc.isGracefulCloseDone
+			return nil
+		}
+		// Otherwise we need to go through the graceful closure flow once the
+		// normal closure is done, since there are extra steps to take with a
+		// graceful close.
+		<-pc.isCloseDone
+	} else {
+		defer close(pc.isCloseDone)
 	}
-	// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #3)
-	pc.signalingState.Set(SignalingStateClosed)
+	if shouldGracefullyClose {
+		defer close(pc.isGracefulCloseDone)
+	}

 	// Try closing everything and collect the errors
 	// Shutdown strategy:
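
The branching above is easier to follow in isolation. The sketch below is illustrative only (closeLatch, mainTeardown, and gracefulTeardown are invented names, not pion API): the first closer of any kind runs the main teardown, a graceful call that lost that race waits for it and then runs just the graceful extras, and every later graceful call simply waits on the graceful done channel.

package main

import "sync"

type closeLatch struct {
	mu           sync.Mutex
	closing      bool          // plays the role of isClosed
	graceful     bool          // plays the role of isGracefullyClosingOrClosed
	closeDone    chan struct{} // plays the role of isCloseDone
	gracefulDone chan struct{} // plays the role of isGracefulCloseDone
}

func (l *closeLatch) close(wantGraceful bool) {
	l.mu.Lock()
	wasClosing, wasGraceful := l.closing, l.graceful
	l.closing = true
	if wantGraceful {
		l.graceful = true
	}
	l.mu.Unlock()

	if wasClosing {
		if !wantGraceful {
			return // a repeated plain Close has nothing left to do
		}
		if wasGraceful {
			<-l.gracefulDone // another graceful closer owns the extras; wait
			return
		}
		<-l.closeDone // the plain close already ran; add the graceful extras
		defer close(l.gracefulDone)
		l.gracefulTeardown()
		return
	}

	defer close(l.closeDone)
	if wantGraceful {
		defer close(l.gracefulDone)
	}
	l.mainTeardown()
	if wantGraceful {
		l.gracefulTeardown()
	}
}

func (l *closeLatch) mainTeardown()     {} // transports, interceptors, ...
func (l *closeLatch) gracefulTeardown() {} // ops queue, data channels, ICE

func main() {
	l := &closeLatch{
		closeDone:    make(chan struct{}),
		gracefulDone: make(chan struct{}),
	}
	l.close(false) // the plain close wins
	l.close(true)  // still runs the graceful extras, then returns
}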
@@ -2126,6 +2149,34 @@ func (pc *PeerConnection) close(shouldGracefullyClose bool) error {
 	// continue the chain the Mux has to be closed.
 	closeErrs := make([]error, 4)

+	doGracefulCloseOps := func() []error {
+		if !shouldGracefullyClose {
+			return nil
+		}
+
+		// these are all non-canon steps
+		var gracefulCloseErrors []error
+		if pc.iceTransport != nil {
+			gracefulCloseErrors = append(gracefulCloseErrors, pc.iceTransport.GracefulStop())
+		}
+
+		pc.ops.GracefulClose()
+
+		pc.sctpTransport.lock.Lock()
+		for _, d := range pc.sctpTransport.dataChannels {
+			gracefulCloseErrors = append(gracefulCloseErrors, d.GracefulClose())
+		}
+		pc.sctpTransport.lock.Unlock()
+		return gracefulCloseErrors
+	}
+
+	if isAlreadyClosingOrClosed {
+		return util.FlattenErrs(doGracefulCloseOps())
+	}
+
+	// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #3)
+	pc.signalingState.Set(SignalingStateClosed)
+
 	closeErrs = append(closeErrs, pc.api.interceptor.Close())

 	// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #4)
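
Routing late graceful callers through doGracefulCloseOps means GracefulClose stays meaningful after a plain Close has already run: it waits for that close to finish and then performs the graceful-only steps. A usage sketch (shutdown is a hypothetical helper name; import path assumed as above):

package main

import "github.com/pion/webrtc/v4"

// shutdown closes pc twice on purpose: Close tears the connection down,
// and the follow-up GracefulClose blocks until the graceful-only steps
// (ops queue, data channels, ICE) have run as well.
func shutdown(pc *webrtc.PeerConnection) error {
	if err := pc.Close(); err != nil {
		return err
	}
	// Not a no-op: waits for the close above to finish, then runs the
	// graceful extras before returning.
	return pc.GracefulClose()
}

func main() {
	pc, err := webrtc.NewPeerConnection(webrtc.Configuration{})
	if err != nil {
		panic(err)
	}
	if err := shutdown(pc); err != nil {
		panic(err)
	}
}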
@@ -2156,28 +2207,15 @@ func (pc *PeerConnection) close(shouldGracefullyClose bool) error {
 	closeErrs = append(closeErrs, pc.dtlsTransport.Stop())

 	// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #8, #9, #10)
-	if pc.iceTransport != nil {
-		if shouldGracefullyClose {
-			// note that it isn't canon to stop gracefully
-			closeErrs = append(closeErrs, pc.iceTransport.GracefulStop())
-		} else {
-			closeErrs = append(closeErrs, pc.iceTransport.Stop())
-		}
+	if pc.iceTransport != nil && !shouldGracefullyClose {
+		// we will stop gracefully in doGracefulCloseOps
+		closeErrs = append(closeErrs, pc.iceTransport.Stop())
 	}

 	// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #11)
 	pc.updateConnectionState(pc.ICEConnectionState(), pc.dtlsTransport.State())

-	if shouldGracefullyClose {
-		pc.ops.GracefulClose()
-
-		// note that it isn't canon to stop gracefully
-		pc.sctpTransport.lock.Lock()
-		for _, d := range pc.sctpTransport.dataChannels {
-			closeErrs = append(closeErrs, d.GracefulClose())
-		}
-		pc.sctpTransport.lock.Unlock()
-	}
+	closeErrs = append(closeErrs, doGracefulCloseOps()...)

 	return util.FlattenErrs(closeErrs)
 }
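
Both return paths funnel their error slices through util.FlattenErrs. For readers outside the repo, an illustrative stand-in built on errors.Join (the real helper lives in pion's internal util package and may differ):

package main

import "errors"

// flattenErrs mirrors what util.FlattenErrs is used for here: drop the
// nil entries and collapse the rest into one error, or nil when nothing
// failed.
func flattenErrs(errs []error) error {
	nonNil := make([]error, 0, len(errs))
	for _, err := range errs {
		if err != nil {
			nonNil = append(nonNil, err)
		}
	}
	if len(nonNil) == 0 {
		return nil
	}
	return errors.Join(nonNil...)
}

func main() {
	// closeErrs is pre-sized with make([]error, 4) in the diff above, so it
	// starts with four nil slots; flattening must tolerate those.
	closeErrs := make([]error, 4)
	if err := flattenErrs(closeErrs); err != nil {
		panic("unreachable: only nil errors were collected")
	}
}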

peerconnection_close_test.go

@@ -7,6 +7,8 @@
 package webrtc

 import (
+	"fmt"
+	"sync"
 	"testing"
 	"time"
@@ -180,7 +182,7 @@ func TestPeerConnection_Close_DuringICE(t *testing.T) {
 	}
 }

-func TestPeerConnection_CloseWithIncomingMessages(t *testing.T) {
+func TestPeerConnection_GracefulCloseWithIncomingMessages(t *testing.T) {
 	// Limit runtime in case of deadlocks
 	lim := test.TimeOut(time.Second * 20)
 	defer lim.Stop()
@@ -245,3 +247,83 @@ func TestPeerConnection_CloseWithIncomingMessages(t *testing.T) {
 		t.Fatal(err)
 	}
 }
+
+func TestPeerConnection_GracefulCloseWhileOpening(t *testing.T) {
+	// Limit runtime in case of deadlocks
+	lim := test.TimeOut(time.Second * 5)
+	defer lim.Stop()
+
+	report := test.CheckRoutinesStrict(t)
+	defer report()
+
+	pcOffer, pcAnswer, err := newPair()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if _, err = pcOffer.CreateDataChannel("initial_data_channel", nil); err != nil {
+		t.Fatal(err)
+	}
+
+	offer, err := pcOffer.CreateOffer(nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	offerGatheringComplete := GatheringCompletePromise(pcOffer)
+	if err = pcOffer.SetLocalDescription(offer); err != nil {
+		t.Fatal(err)
+	}
+	<-offerGatheringComplete
+
+	err = pcOffer.GracefulClose()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if err = pcAnswer.SetRemoteDescription(offer); err != nil {
+		t.Fatal(err)
+	}
+
+	err = pcAnswer.GracefulClose()
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
+func TestPeerConnection_GracefulCloseConcurrent(t *testing.T) {
+	// Limit runtime in case of deadlocks
+	lim := test.TimeOut(time.Second * 10)
+	defer lim.Stop()
+
+	for _, mixed := range []bool{false, true} {
+		t.Run(fmt.Sprintf("mixed_graceful=%t", mixed), func(t *testing.T) {
+			report := test.CheckRoutinesStrict(t)
+			defer report()
+
+			pc, err := NewPeerConnection(Configuration{})
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			const gracefulCloseConcurrency = 50
+			var wg sync.WaitGroup
+			wg.Add(gracefulCloseConcurrency)
+			for i := 0; i < gracefulCloseConcurrency; i++ {
+				go func() {
+					defer wg.Done()
+					assert.NoError(t, pc.GracefulClose())
+				}()
+			}
+			if !mixed {
+				if err := pc.Close(); err != nil {
+					t.Fatal(err)
+				}
+			} else {
+				if err := pc.GracefulClose(); err != nil {
+					t.Fatal(err)
+				}
+			}
+			wg.Wait()
+		})
+	}
+}