mirror of https://github.com/pion/webrtc.git
@@ -40,7 +40,6 @@ type DataChannel struct {
 	readyState atomic.Value // DataChannelState
 	bufferedAmountLowThreshold uint64
 	detachCalled bool
-	readLoopActive chan struct{}

 	// The binaryType represents attribute MUST, on getting, return the value to
 	// which it was last set. On setting, if the new value is either the string
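This hunk removes the readLoopActive completion signal from DataChannel. The field follows a standard Go idiom: a chan struct{} that is only ever closed, never sent on, so any number of waiters can block until the owning goroutine exits. A minimal runnable sketch of the idiom (worker and its names are illustrative, not pion types); note that, as in the next hunk, the channel is published before the goroutine that will close it starts:

package main

import "fmt"

// worker mimics the shape of the removed field: done is allocated before the
// goroutine starts and closed (never sent on) when the goroutine exits, so
// any number of waiters can block on it.
type worker struct {
	done chan struct{}
}

func (w *worker) start(items []string) {
	w.done = make(chan struct{}) // publish the channel before spawning
	go func() {
		defer close(w.done) // broadcast exit to every waiter
		for _, it := range items {
			fmt.Println("processing", it)
		}
	}()
}

func main() {
	w := &worker{}
	w.start([]string{"a", "b"})
	<-w.done // blocks until the goroutine has returned
	fmt.Println("worker exited")
}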
@@ -328,7 +327,6 @@ func (d *DataChannel) handleOpen(dc *datachannel.DataChannel, isRemote, isAlread
 	defer d.mu.Unlock()

 	if !d.api.settingEngine.detach.DataChannels {
-		d.readLoopActive = make(chan struct{})
 		go d.readLoop()
 	}
 }
@@ -358,7 +356,6 @@ var rlBufPool = sync.Pool{New: func() interface{} {
 }}

 func (d *DataChannel) readLoop() {
-	defer close(d.readLoopActive)
 	for {
 		buffer := rlBufPool.Get().([]byte) //nolint:forcetypeassert
 		n, isString, err := d.dataChannel.ReadDataChannel(buffer)
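With the defer gone, nothing marks the moment readLoop exits, so callers can no longer wait for in-flight reads to finish. For context, the loop draws its buffers from rlBufPool, a sync.Pool; here is a hedged, self-contained sketch of that pooled-read pattern, with an illustrative 1 KiB buffer size and a plain io.Reader standing in for ReadDataChannel:

package main

import (
	"bytes"
	"fmt"
	"io"
	"sync"
)

// A pool of reusable read buffers, as in rlBufPool (the size is illustrative).
var bufPool = sync.Pool{New: func() interface{} {
	return make([]byte, 1024)
}}

func readAll(r io.Reader) int {
	total := 0
	for {
		buffer := bufPool.Get().([]byte) // the assertion mirrors the //nolint line above
		n, err := r.Read(buffer)
		total += n
		// A real consumer must copy the n read bytes out before returning
		// the buffer to the pool; here we only count them.
		bufPool.Put(buffer) //nolint:staticcheck
		if err != nil {
			return total
		}
	}
}

func main() {
	fmt.Println(readAll(bytes.NewReader(make([]byte, 4096)))) // prints 4096
}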
@@ -441,22 +438,6 @@ func (d *DataChannel) Detach() (datachannel.ReadWriteCloser, error) {
 // Close Closes the DataChannel. It may be called regardless of whether
 // the DataChannel object was created by this peer or the remote peer.
 func (d *DataChannel) Close() error {
-	return d.close(false)
-}
-
-// Normally, close only stops writes from happening, so waitForReadsDone=true
-// will wait for reads to be finished based on underlying SCTP association
-// closure or a SCTP reset stream from the other side. This is safe to call
-// with waitForReadsDone=true after tearing down a PeerConnection but not
-// necessarily before. For example, if you used a vnet and dropped all packets
-// right before closing the DataChannel, you'd need never see a reset stream.
-func (d *DataChannel) close(waitForReadsDone bool) error {
-	if waitForReadsDone && d.readLoopActive != nil {
-		defer func() {
-			<-d.readLoopActive
-		}()
-	}
-
 	d.mu.Lock()
 	haveSctpTransport := d.dataChannel != nil
 	d.mu.Unlock()
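The deleted close(waitForReadsDone) put the wait in a defer, so the receive from readLoopActive ran only after the teardown below it had already unblocked the reader; the deleted comment spells out when that wait is safe. A simplified, runnable sketch of the shape (conn, stop, and done are illustrative names, not pion's):

package main

import "fmt"

type conn struct {
	stop chan struct{} // closed by close() to unblock the reader
	done chan struct{} // closed by the read goroutine when it exits
}

func newConn() *conn {
	c := &conn{stop: make(chan struct{}), done: make(chan struct{})}
	go func() { // stands in for readLoop
		defer close(c.done)
		<-c.stop // blocks like ReadDataChannel until the transport goes away
	}()
	return c
}

// close mirrors the deleted two-mode shape: the deferred receive runs after
// the teardown below, so the reader has already been unblocked when we wait.
func (c *conn) close(waitForReadsDone bool) error {
	if waitForReadsDone && c.done != nil {
		defer func() { <-c.done }()
	}
	close(c.stop) // the teardown that makes the read goroutine return
	return nil
}

func main() {
	fmt.Println(newConn().close(true))
}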
@@ -56,7 +56,6 @@ type PeerConnection struct {
 	idpLoginURL *string

 	isClosed *atomicBool
-	isClosedDone chan struct{}
 	isNegotiationNeeded *atomicBool
 	updateNegotiationNeededFlagOnEmptyChain *atomicBool

@@ -128,7 +127,6 @@ func (api *API) NewPeerConnection(configuration Configuration) (*PeerConnection,
 			ICECandidatePoolSize: 0,
 		},
 		isClosed: &atomicBool{},
-		isClosedDone: make(chan struct{}),
 		isNegotiationNeeded: &atomicBool{},
 		updateNegotiationNeededFlagOnEmptyChain: &atomicBool{},
 		lastOffer: "",
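These two hunks (the struct field above and its initialization here) remove the isClosedDone plumbing; the Close hunk below removes its only reader and writer. The dropped idea: isClosed.swap(true) elects exactly one closer, and isClosedDone lets every later Close caller block until that closer has finished, instead of returning while teardown is still in flight. A standalone sketch, using the standard library's atomic.Bool where pion has its internal atomicBool:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type closer struct {
	closed atomic.Bool   // stands in for pion's internal atomicBool
	done   chan struct{} // closed once teardown has fully finished
}

func (c *closer) Close() error {
	if c.closed.Swap(true) {
		<-c.done // another caller is closing; wait for it to finish
		return nil
	}
	defer close(c.done)
	// ... the actual teardown would run here ...
	return nil
}

func main() {
	c := &closer{done: make(chan struct{})}
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = c.Close() // every call returns only after teardown is done
		}()
	}
	wg.Wait()
	fmt.Println("all Close calls returned")
}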
@@ -2036,31 +2034,14 @@ func (pc *PeerConnection) writeRTCP(pkts []rtcp.Packet, _ interceptor.Attributes
 	return pc.dtlsTransport.WriteRTCP(pkts)
 }

-// Close ends the PeerConnection.
-// It will make a best effort to wait for all underlying goroutines it spawned to finish,
-// except for cases that would cause deadlocks with itself.
+// Close ends the PeerConnection
 func (pc *PeerConnection) Close() error {
 	// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #1)
 	// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #2)
 	if pc.isClosed.swap(true) {
-		// someone else got here first but may still be closing (e.g. via DTLS close_notify)
-		<-pc.isClosedDone
 		return nil
 	}
-	defer close(pc.isClosedDone)

-	// Try closing everything and collect the errors
-	// Shutdown strategy:
-	// 1. Close all data channels.
-	// 2. All Conn close by closing their underlying Conn.
-	// 3. A Mux stops this chain. It won't close the underlying
-	//    Conn if one of the endpoints is closed down. To
-	//    continue the chain the Mux has to be closed.
-	pc.sctpTransport.lock.Lock()
-	closeErrs := make([]error, 0, 4+len(pc.sctpTransport.dataChannels))
-	pc.sctpTransport.lock.Unlock()
-
-	// canon steps
 	// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #3)
 	pc.signalingState.Set(SignalingStateClosed)

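The deleted lines above sized closeErrs from the data-channel count; with or without that, the collection pattern is the same: every teardown step appends its (possibly nil) error, and util.FlattenErrs (visible further down) folds the slice into a single error. The standard library's errors.Join (Go 1.20+) has the same nil-discarding shape, as this sketch shows:

package main

import (
	"errors"
	"fmt"
)

// closeAll collects one (possibly nil) error per teardown step, then joins
// them; errors.Join discards nil entries and returns nil if all succeeded.
func closeAll(steps []func() error) error {
	var errs []error
	for _, step := range steps {
		errs = append(errs, step())
	}
	return errors.Join(errs...)
}

func main() {
	err := closeAll([]func() error{
		func() error { return nil },                              // e.g. interceptor.Close()
		func() error { return errors.New("dtls: close failed") }, // e.g. dtlsTransport.Stop()
	})
	fmt.Println(err) // dtls: close failed
}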
@@ -2070,6 +2051,7 @@ func (pc *PeerConnection) Close() error {
 	// 2. A Mux stops this chain. It won't close the underlying
 	//    Conn if one of the endpoints is closed down. To
 	//    continue the chain the Mux has to be closed.
+	closeErrs := make([]error, 4)

 	closeErrs = append(closeErrs, pc.api.interceptor.Close())

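A detail worth flagging on the restored line: make([]error, 4) sets the length to 4, so the slice starts with four nil placeholders and every append lands after them, which util.FlattenErrs evidently has to tolerate. make([]error, 0, 4) would reserve the same capacity with no placeholders. A short demonstration:

package main

import (
	"errors"
	"fmt"
)

func main() {
	a := make([]error, 4) // length 4: four nil placeholders
	a = append(a, errors.New("x"))
	fmt.Println(len(a), a[0]) // 5 <nil>

	b := make([]error, 0, 4) // length 0, capacity 4: appends start at index 0
	b = append(b, errors.New("x"))
	fmt.Println(len(b), b[0]) // 1 x
}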
@@ -2096,6 +2078,7 @@ func (pc *PeerConnection) Close() error {

 	// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #7)
 	closeErrs = append(closeErrs, pc.dtlsTransport.Stop())
+
 	// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #8, #9, #10)
 	if pc.iceTransport != nil {
 		closeErrs = append(closeErrs, pc.iceTransport.Stop())
@@ -2104,13 +2087,6 @@ func (pc *PeerConnection) Close() error {
 	// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #11)
 	pc.updateConnectionState(pc.ICEConnectionState(), pc.dtlsTransport.State())

-	// non-canon steps
-	pc.sctpTransport.lock.Lock()
-	for _, d := range pc.sctpTransport.dataChannels {
-		closeErrs = append(closeErrs, d.close(true))
-	}
-	pc.sctpTransport.lock.Unlock()
-
 	return util.FlattenErrs(closeErrs)
 }

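Note that the deleted loop invoked d.close(true), a call that can block until the read loop exits, while holding pc.sctpTransport.lock; the surviving code only reads under that lock. Holding a lock across blocking close calls is a classic deadlock ingredient, and one common alternative, sketched here under stated assumptions and not as what pion does, is to snapshot the collection under the lock and close outside it:

package main

import (
	"fmt"
	"sync"
)

type registry struct {
	mu      sync.Mutex
	closers []func() error
}

// closeAll snapshots the closers under the lock, then runs the potentially
// blocking calls without holding it, so a closer that needs the lock (or
// simply takes a long time) cannot wedge other lock holders.
func (r *registry) closeAll() []error {
	r.mu.Lock()
	snapshot := make([]func() error, len(r.closers))
	copy(snapshot, r.closers)
	r.mu.Unlock()

	var errs []error
	for _, closeFn := range snapshot {
		if err := closeFn(); err != nil {
			errs = append(errs, err)
		}
	}
	return errs
}

func main() {
	r := &registry{closers: []func() error{func() error { return nil }}}
	fmt.Println(len(r.closeAll())) // 0: the one closer succeeded
}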
@@ -7,8 +7,6 @@
 package webrtc

 import (
-	"runtime"
-	"strings"
 	"testing"
 	"time"

@@ -181,103 +179,3 @@ func TestPeerConnection_Close_DuringICE(t *testing.T) {
 		t.Error("pcOffer.Close() Timeout")
 	}
 }
-
-func TestPeerConnection_CloseWithIncomingMessages(t *testing.T) {
-	// Limit runtime in case of deadlocks
-	lim := test.TimeOut(time.Second * 20)
-	defer lim.Stop()
-
-	report := CheckRoutinesIntolerant(t)
-	defer report()
-
-	pcOffer, pcAnswer, err := newPair()
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	var dcAnswer *DataChannel
-	answerDataChannelOpened := make(chan struct{})
-	pcAnswer.OnDataChannel(func(d *DataChannel) {
-		// Make sure this is the data channel we were looking for. (Not the one
-		// created in signalPair).
-		if d.Label() != "data" {
-			return
-		}
-		dcAnswer = d
-		close(answerDataChannelOpened)
-	})
-
-	dcOffer, err := pcOffer.CreateDataChannel("data", nil)
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	offerDataChannelOpened := make(chan struct{})
-	dcOffer.OnOpen(func() {
-		close(offerDataChannelOpened)
-	})
-
-	err = signalPair(pcOffer, pcAnswer)
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	<-offerDataChannelOpened
-	<-answerDataChannelOpened
-
-	msgNum := 0
-	dcOffer.OnMessage(func(_ DataChannelMessage) {
-		t.Log("msg", msgNum)
-		msgNum++
-	})
-
-	// send 50 messages, then close pcOffer, and then send another 50
-	for i := 0; i < 100; i++ {
-		if i == 50 {
-			err = pcOffer.Close()
-			if err != nil {
-				t.Fatal(err)
-			}
-		}
-		_ = dcAnswer.Send([]byte("hello!"))
-	}
-
-	err = pcAnswer.Close()
-	if err != nil {
-		t.Fatal(err)
-	}
-}
-
-// CheckRoutinesIntolerant is used to check for leaked go-routines.
-// It differs from test.CheckRoutines in that it won't wait at all
-// for lingering goroutines. This is helpful for tests that need
-// to ensure clean closure of resources.
-func CheckRoutinesIntolerant(t *testing.T) func() {
-	return func() {
-		routines := getRoutines()
-		if len(routines) == 0 {
-			return
-		}
-		t.Fatalf("%s: \n%s", "Unexpected routines on test end", strings.Join(routines, "\n\n")) // nolint
-	}
-}
-
-func getRoutines() []string {
-	buf := make([]byte, 2<<20)
-	buf = buf[:runtime.Stack(buf, true)]
-	return filterRoutines(strings.Split(string(buf), "\n\n"))
-}
-
-func filterRoutines(routines []string) []string {
-	result := []string{}
-	for _, stack := range routines {
-		if stack == "" || // Empty
-			strings.Contains(stack, "testing.Main(") || // Tests
-			strings.Contains(stack, "testing.(*T).Run(") || // Test run
-			strings.Contains(stack, "getRoutines(") { // This routine
-			continue
-		}
-		result = append(result, stack)
-	}
-	return result
-}
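The deleted helpers are built on runtime.Stack(buf, true), which writes every goroutine's stack into buf as blank-line-separated records and truncates if the 2<<20-byte (2 MiB) buffer is too small; filterRoutines then splits on those blank lines. A self-contained sketch of the same trick:

package main

import (
	"fmt"
	"runtime"
	"strings"
)

// goroutineStacks returns one entry per goroutine, split the same way the
// deleted filterRoutines received them: blank-line-separated stack records.
func goroutineStacks() []string {
	buf := make([]byte, 2<<20) // 2 MiB; runtime.Stack truncates beyond this
	buf = buf[:runtime.Stack(buf, true)]
	return strings.Split(string(buf), "\n\n")
}

func main() {
	for _, stack := range goroutineStacks() {
		fmt.Println(strings.SplitN(stack, "\n", 2)[0]) // each record's header line
	}
}

The "Intolerant" variant failed the test immediately rather than waiting for stragglers, which let the deleted test assert that Close left no reader goroutines behind; with the wait behavior removed, that test goes with it.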
@@ -1623,3 +1623,37 @@ func TestPeerConnectionState(t *testing.T) {
 	assert.NoError(t, pc.Close())
 	assert.Equal(t, PeerConnectionStateClosed, pc.ConnectionState())
 }
+
+func TestPeerConnectionDeadlock(t *testing.T) {
+	lim := test.TimeOut(time.Second * 5)
+	defer lim.Stop()
+
+	report := test.CheckRoutines(t)
+	defer report()
+
+	closeHdlr := func(peerConnection *PeerConnection) {
+		peerConnection.OnICEConnectionStateChange(func(i ICEConnectionState) {
+			if i == ICEConnectionStateFailed || i == ICEConnectionStateClosed {
+				if err := peerConnection.Close(); err != nil {
+					assert.NoError(t, err)
+				}
+			}
+		})
+	}
+
+	pcOffer, pcAnswer, err := NewAPI().newPair(Configuration{})
+	assert.NoError(t, err)
+
+	assert.NoError(t, signalPair(pcOffer, pcAnswer))
+
+	onDataChannel, onDataChannelCancel := context.WithCancel(context.Background())
+	pcAnswer.OnDataChannel(func(*DataChannel) {
+		onDataChannelCancel()
+	})
+	<-onDataChannel.Done()
+
+	closeHdlr(pcOffer)
+	closeHdlr(pcAnswer)
+
+	closePairNow(t, pcOffer, pcAnswer)
+}
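The added test uses context.WithCancel as a one-shot latch: the OnDataChannel callback fires cancel, the test parks on Done() until it has fired, and calling cancel more than once is harmless. A minimal standalone illustration:

package main

import (
	"context"
	"fmt"
)

func main() {
	ready, cancel := context.WithCancel(context.Background())

	onEvent := func() { cancel() } // stands in for the OnDataChannel callback

	go onEvent()
	<-ready.Done() // blocks until cancel() has run
	fmt.Println("event observed")
}

With the latch released, the test installs Close-on-ICE-failure handlers on both peers and closes the pair under a five-second deadline, guarding against Close deadlocking when invoked from a state-change callback.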