Revert "Make RTPTransceiver Stopped an atomic"

This reverts commit 6c3620093d.
This commit would cause sender.ReadRTCP() to never return
even when pc associated with this sender was closed.
The aftermath is leaked goroutines that will never stop.
This commit is contained in:
digitalix
2021-09-24 14:59:31 +01:00
committed by Patryk Rogalski
parent 5e98c50d8b
commit f93ea80d85
5 changed files with 59 additions and 97 deletions

View File

@@ -12,20 +12,9 @@ func (b *atomicBool) set(value bool) { // nolint: unparam
i = 1
}
atomic.StoreInt32(&b.val, i)
atomic.StoreInt32(&(b.val), i)
}
func (b *atomicBool) get() bool {
return atomic.LoadInt32(&b.val) != 0
}
func (b *atomicBool) compareAndSwap(old, new bool) (swapped bool) {
var oldval, newval int32
if old {
oldval = 1
}
if new {
newval = 1
}
return atomic.CompareAndSwapInt32(&b.val, oldval, newval)
return atomic.LoadInt32(&(b.val)) != 0
}

View File

@@ -377,15 +377,15 @@ func (pc *PeerConnection) checkNegotiationNeeded() bool { //nolint:gocognit
for _, t := range pc.rtpTransceivers {
// https://www.w3.org/TR/webrtc/#dfn-update-the-negotiation-needed-flag
// Step 5.1
// if t.stopping && !t.Stopped() {
// if t.stopping && !t.stopped {
// return true
// }
m := getByMid(t.Mid(), localDesc)
// Step 5.2
if !t.Stopped() && m == nil {
if !t.stopped && m == nil {
return true
}
if !t.Stopped() && m != nil {
if !t.stopped && m != nil {
// Step 5.3.1
if t.Direction() == RTPTransceiverDirectionSendrecv || t.Direction() == RTPTransceiverDirectionSendonly {
descMsid, okMsid := m.Attribute(sdp.AttrKeyMsid)
@@ -414,7 +414,7 @@ func (pc *PeerConnection) checkNegotiationNeeded() bool { //nolint:gocognit
}
}
// Step 5.4
if t.Stopped() && t.Mid() != "" {
if t.stopped && t.Mid() != "" {
if getByMid(t.Mid(), localDesc) != nil || getByMid(t.Mid(), remoteDesc) != nil {
return true
}
@@ -1257,7 +1257,7 @@ func (pc *PeerConnection) startRTPReceivers(incomingTracks []trackDetails, curre
}
receiver := t.Receiver()
if (incomingTrack.kind != t.Kind()) ||
if (incomingTrack.kind != t.kind) ||
(t.Direction() != RTPTransceiverDirectionRecvonly && t.Direction() != RTPTransceiverDirectionSendrecv) ||
receiver == nil ||
(receiver.haveReceived()) {
@@ -1621,7 +1621,7 @@ func (pc *PeerConnection) AddTrack(track TrackLocal) (*RTPSender, error) {
pc.mu.Lock()
defer pc.mu.Unlock()
for _, t := range pc.rtpTransceivers {
if !t.Stopped() && t.Kind() == track.Kind() && t.Sender() == nil {
if !t.stopped && t.kind == track.Kind() && t.Sender() == nil {
sender, err := pc.api.NewRTPSender(track, pc.dtlsTransport)
if err == nil {
err = t.SetSender(sender, track)
@@ -1882,7 +1882,7 @@ func (pc *PeerConnection) Close() error {
// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #4)
pc.mu.Lock()
for _, t := range pc.rtpTransceivers {
if !t.Stopped() {
if !t.stopped {
closeErrs = append(closeErrs, t.Stop())
}
}
@@ -2186,9 +2186,9 @@ func (pc *PeerConnection) generateUnmatchedSDP(transceivers []*RTPTransceiver, u
audio := make([]*RTPTransceiver, 0)
for _, t := range transceivers {
if t.Kind() == RTPCodecTypeVideo {
if t.kind == RTPCodecTypeVideo {
video = append(video, t)
} else if t.Kind() == RTPCodecTypeAudio {
} else if t.kind == RTPCodecTypeAudio {
audio = append(audio, t)
}
if sender := t.Sender(); sender != nil {
@@ -2288,7 +2288,8 @@ func (pc *PeerConnection) generateMatchedSDP(transceivers []*RTPTransceiver, use
t, localTransceivers = satisfyTypeAndDirection(kind, direction, localTransceivers)
if t == nil {
if len(mediaTransceivers) == 0 {
t = newRTPTransceiver(nil, nil, RTPTransceiverDirectionInactive, kind, pc.api)
t = &RTPTransceiver{kind: kind, api: pc.api, codecs: pc.api.mediaEngine.getCodecsByKind(kind)}
t.setDirection(RTPTransceiverDirectionInactive)
mediaTransceivers = append(mediaTransceivers, t)
}
break

View File

@@ -178,7 +178,7 @@ func TestPeerConnection_SetConfiguration_Go(t *testing.T) {
certificate2, err := GenerateCertificate(secretKey2)
assert.Nil(t, err)
for _, testcase := range []struct {
for _, test := range []struct {
name string
init func() (*PeerConnection, error)
config Configuration
@@ -266,14 +266,14 @@ func TestPeerConnection_SetConfiguration_Go(t *testing.T) {
wantErr: &rtcerr.InvalidAccessError{Err: ErrNoTurnCredentials},
},
} {
pc, err := testcase.init()
pc, err := test.init()
if err != nil {
t.Errorf("SetConfiguration %q: init failed: %v", testcase.name, err)
t.Errorf("SetConfiguration %q: init failed: %v", test.name, err)
}
err = pc.SetConfiguration(testcase.config)
if got, want := err, testcase.wantErr; !reflect.DeepEqual(got, want) {
t.Errorf("SetConfiguration %q: err = %v, want %v", testcase.name, got, want)
err = pc.SetConfiguration(test.config)
if got, want := err, test.wantErr; !reflect.DeepEqual(got, want) {
t.Errorf("SetConfiguration %q: err = %v, want %v", test.name, got, want)
}
assert.NoError(t, pc.Close())
@@ -446,7 +446,14 @@ func TestPeerConnection_AnswerWithClosedConnection(t *testing.T) {
}
func TestPeerConnection_satisfyTypeAndDirection(t *testing.T) {
for _, testcase := range []struct {
createTransceiver := func(kind RTPCodecType, direction RTPTransceiverDirection) *RTPTransceiver {
r := &RTPTransceiver{kind: kind}
r.setDirection(direction)
return r
}
for _, test := range []struct {
name string
kinds []RTPCodecType
@@ -459,7 +466,7 @@ func TestPeerConnection_satisfyTypeAndDirection(t *testing.T) {
"Audio and Video Transceivers can not satisfy each other",
[]RTPCodecType{RTPCodecTypeVideo},
[]RTPTransceiverDirection{RTPTransceiverDirectionSendrecv},
[]*RTPTransceiver{newRTPTransceiver(nil, nil, RTPTransceiverDirectionSendrecv, RTPCodecTypeAudio, nil)},
[]*RTPTransceiver{createTransceiver(RTPCodecTypeAudio, RTPTransceiverDirectionSendrecv)},
[]*RTPTransceiver{nil},
},
{
@@ -481,9 +488,9 @@ func TestPeerConnection_satisfyTypeAndDirection(t *testing.T) {
[]RTPCodecType{RTPCodecTypeVideo},
[]RTPTransceiverDirection{RTPTransceiverDirectionSendrecv},
[]*RTPTransceiver{newRTPTransceiver(nil, nil, RTPTransceiverDirectionRecvonly, RTPCodecTypeVideo, nil)},
[]*RTPTransceiver{createTransceiver(RTPCodecTypeVideo, RTPTransceiverDirectionRecvonly)},
[]*RTPTransceiver{newRTPTransceiver(nil, nil, RTPTransceiverDirectionRecvonly, RTPCodecTypeVideo, nil)},
[]*RTPTransceiver{createTransceiver(RTPCodecTypeVideo, RTPTransceiverDirectionRecvonly)},
},
{
"Don't satisfy a Sendonly with a SendRecv, later SendRecv will be marked as Inactive",
@@ -491,39 +498,39 @@ func TestPeerConnection_satisfyTypeAndDirection(t *testing.T) {
[]RTPTransceiverDirection{RTPTransceiverDirectionSendonly, RTPTransceiverDirectionSendrecv},
[]*RTPTransceiver{
newRTPTransceiver(nil, nil, RTPTransceiverDirectionSendrecv, RTPCodecTypeVideo, nil),
newRTPTransceiver(nil, nil, RTPTransceiverDirectionRecvonly, RTPCodecTypeVideo, nil),
createTransceiver(RTPCodecTypeVideo, RTPTransceiverDirectionSendrecv),
createTransceiver(RTPCodecTypeVideo, RTPTransceiverDirectionRecvonly),
},
[]*RTPTransceiver{
newRTPTransceiver(nil, nil, RTPTransceiverDirectionRecvonly, RTPCodecTypeVideo, nil),
newRTPTransceiver(nil, nil, RTPTransceiverDirectionSendrecv, RTPCodecTypeVideo, nil),
createTransceiver(RTPCodecTypeVideo, RTPTransceiverDirectionRecvonly),
createTransceiver(RTPCodecTypeVideo, RTPTransceiverDirectionSendrecv),
},
},
} {
if len(testcase.kinds) != len(testcase.directions) {
if len(test.kinds) != len(test.directions) {
t.Fatal("Kinds and Directions must be the same length")
}
got := []*RTPTransceiver{}
for i := range testcase.kinds {
res, filteredLocalTransceivers := satisfyTypeAndDirection(testcase.kinds[i], testcase.directions[i], testcase.localTransceivers)
for i := range test.kinds {
res, filteredLocalTransceivers := satisfyTypeAndDirection(test.kinds[i], test.directions[i], test.localTransceivers)
got = append(got, res)
testcase.localTransceivers = filteredLocalTransceivers
test.localTransceivers = filteredLocalTransceivers
}
if !reflect.DeepEqual(got, testcase.want) {
if !reflect.DeepEqual(got, test.want) {
gotStr := ""
for _, t := range got {
gotStr += fmt.Sprintf("%+v\n", t)
}
wantStr := ""
for _, t := range testcase.want {
for _, t := range test.want {
wantStr += fmt.Sprintf("%+v\n", t)
}
t.Errorf("satisfyTypeAndDirection %q: \ngot\n%s \nwant\n%s", testcase.name, gotStr, wantStr)
t.Errorf("satisfyTypeAndDirection %q: \ngot\n%s \nwant\n%s", test.name, gotStr, wantStr)
}
}
}
@@ -1251,7 +1258,7 @@ func TestPeerConnection_TransceiverDirection(t *testing.T) {
return err
}
for _, testcase := range []struct {
for _, test := range []struct {
name string
offerDirection RTPTransceiverDirection
answerStartDirection RTPTransceiverDirection
@@ -1312,11 +1319,11 @@ func TestPeerConnection_TransceiverDirection(t *testing.T) {
[]RTPTransceiverDirection{RTPTransceiverDirectionRecvonly, RTPTransceiverDirectionSendonly},
},
} {
offerDirection := testcase.offerDirection
answerStartDirection := testcase.answerStartDirection
answerFinalDirections := testcase.answerFinalDirections
offerDirection := test.offerDirection
answerStartDirection := test.answerStartDirection
answerFinalDirections := test.answerFinalDirections
t.Run(testcase.name, func(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
pcOffer, pcAnswer, err := newPair()
assert.NoError(t, err)
@@ -1426,34 +1433,3 @@ func TestPeerConnectionNilCallback(t *testing.T) {
assert.NoError(t, pc.Close())
}
func TestPeerConnection_SkipStoppedTransceiver(t *testing.T) {
defer test.TimeOut(time.Second).Stop()
pc, err := NewPeerConnection(Configuration{})
assert.NoError(t, err)
track, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: "video/vp8"}, "video1", "pion")
assert.NoError(t, err)
transceiver, err := pc.AddTransceiverFromTrack(track)
assert.NoError(t, err)
assert.Equal(t, 1, len(pc.GetTransceivers()))
assert.NoError(t, pc.RemoveTrack(transceiver.Sender()))
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
assert.NoError(t, transceiver.Stop())
}() // no error, no panic
}
wg.Wait()
track, err = NewTrackLocalStaticSample(RTPCodecCapability{MimeType: "video/vp8"}, "video2", "pion")
assert.NoError(t, err)
_, err = pc.AddTrack(track) // should not use the above stopped transceiver
assert.NoError(t, err)
assert.Equal(t, 2, len(pc.GetTransceivers()))
assert.NoError(t, pc.Close())
}

View File

@@ -19,7 +19,7 @@ type RTPTransceiver struct {
codecs []RTPCodecParameters // User provided codecs via SetCodecPreferences
stopped atomicBool
stopped bool
kind RTPCodecType
api *API
@@ -141,26 +141,21 @@ func (t *RTPTransceiver) Direction() RTPTransceiverDirection {
// Stop irreversibly stops the RTPTransceiver
func (t *RTPTransceiver) Stop() error {
if t.stopped.compareAndSwap(false, true) {
if sender := t.Sender(); sender != nil {
if err := sender.Stop(); err != nil {
return err
}
}
if receiver := t.Receiver(); receiver != nil {
if err := receiver.Stop(); err != nil {
return err
}
t.setDirection(RTPTransceiverDirectionInactive)
if sender := t.Sender(); sender != nil {
if err := sender.Stop(); err != nil {
return err
}
}
if receiver := t.Receiver(); receiver != nil {
if err := receiver.Stop(); err != nil {
return err
}
}
t.setDirection(RTPTransceiverDirectionInactive)
return nil
}
// Stopped indicates whether or not RTPTransceiver has been stopped
func (t *RTPTransceiver) Stopped() bool { return t.stopped.get() }
func (t *RTPTransceiver) setReceiver(r *RTPReceiver) {
if r != nil {
r.setRTPTransceiver(t)

View File

@@ -375,7 +375,8 @@ func TestPopulateSDP(t *testing.T) {
assert.NoError(t, me.RegisterDefaultCodecs())
api := NewAPI(WithMediaEngine(me))
tr := newRTPTransceiver(nil, nil, RTPTransceiverDirectionRecvonly, RTPCodecTypeVideo, api)
tr := &RTPTransceiver{kind: RTPCodecTypeVideo, api: api, codecs: me.videoCodecs}
tr.setDirection(RTPTransceiverDirectionRecvonly)
ridMap := map[string]string{
"ridkey": "some",
}