Fixes issue 1822

Previously we could have situations where during
first condition like `transceiver.Sender() != nil`
there would be another condition like
`transceiver.Sender().isNegotiated()` where
`.Sender()` could become nil if changed in
a different goroutine.
This commit is contained in:
digitalix
2021-08-03 16:04:34 +01:00
committed by Sean DuBois
parent cffa6afc34
commit 06a5a14197
3 changed files with 42 additions and 39 deletions

View File

@@ -1220,7 +1220,7 @@ func (pc *PeerConnection) startRTPReceivers(incomingTracks []trackDetails, curre
incomingTrack := incomingTracks[i]
for _, t := range localTransceivers {
if (t.Receiver()) == nil || t.Receiver().Track() == nil || t.Receiver().Track().ssrc != incomingTrack.ssrc {
if receiver := t.Receiver(); receiver == nil || receiver.Track() == nil || receiver.Track().ssrc != incomingTrack.ssrc {
continue
}
@@ -1239,14 +1239,15 @@ func (pc *PeerConnection) startRTPReceivers(incomingTracks []trackDetails, curre
continue
}
receiver := t.Receiver()
if (incomingTrack.kind != t.kind) ||
(t.Direction() != RTPTransceiverDirectionRecvonly && t.Direction() != RTPTransceiverDirectionSendrecv) ||
(t.Receiver()) == nil ||
(t.Receiver().haveReceived()) {
receiver == nil ||
(receiver.haveReceived()) {
continue
}
pc.startReceiver(incomingTrack, t.Receiver())
pc.startReceiver(incomingTrack, receiver)
trackHandled = true
break
}
@@ -1273,8 +1274,8 @@ func (pc *PeerConnection) startRTPReceivers(incomingTracks []trackDetails, curre
// startRTPSenders starts all outbound RTP streams
func (pc *PeerConnection) startRTPSenders(currentTransceivers []*RTPTransceiver) error {
for _, transceiver := range currentTransceivers {
if transceiver.Sender() != nil && transceiver.Sender().isNegotiated() && !transceiver.Sender().hasSent() {
err := transceiver.Sender().Send(transceiver.Sender().GetParameters())
if sender := transceiver.Sender(); sender != nil && sender.isNegotiated() && !sender.hasSent() {
err := sender.Send(sender.GetParameters())
if err != nil {
return err
}
@@ -1394,15 +1395,16 @@ func (pc *PeerConnection) handleUndeclaredSSRC(rtpStream io.Reader, ssrc SSRC) e
}
for _, t := range pc.GetTransceivers() {
if t.Mid() != mid || t.Receiver() == nil {
receiver := t.Receiver()
if t.Mid() != mid || receiver == nil {
continue
}
track, err := t.Receiver().receiveForRid(rid, params, ssrc)
track, err := receiver.receiveForRid(rid, params, ssrc)
if err != nil {
return err
}
pc.onTrack(track, t.Receiver())
pc.onTrack(track, receiver)
return nil
}
}
@@ -1521,8 +1523,8 @@ func (pc *PeerConnection) GetSenders() (result []*RTPSender) {
defer pc.mu.Unlock()
for _, transceiver := range pc.rtpTransceivers {
if transceiver.Sender() != nil {
result = append(result, transceiver.Sender())
if sender := transceiver.Sender(); sender != nil {
result = append(result, sender)
}
}
return result
@@ -1534,8 +1536,8 @@ func (pc *PeerConnection) GetReceivers() (receivers []*RTPReceiver) {
defer pc.mu.Unlock()
for _, transceiver := range pc.rtpTransceivers {
if transceiver.Receiver() != nil {
receivers = append(receivers, transceiver.Receiver())
if receiver := transceiver.Receiver(); receiver != nil {
receivers = append(receivers, receiver)
}
}
return
@@ -2031,28 +2033,29 @@ func (pc *PeerConnection) startRTP(isRenegotiation bool, remoteDesc *SessionDesc
trackDetails := trackDetailsFromSDP(pc.log, remoteDesc.parsed)
if isRenegotiation {
for _, t := range currentTransceivers {
if t.Receiver() == nil || t.Receiver().Track() == nil {
receiver := t.Receiver()
if receiver == nil || t.Receiver().Track() == nil {
continue
}
t.Receiver().Track().mu.Lock()
ssrc := t.Receiver().Track().ssrc
receiver.Track().mu.Lock()
ssrc := receiver.Track().ssrc
if details := trackDetailsForSSRC(trackDetails, ssrc); details != nil {
t.Receiver().Track().id = details.id
t.Receiver().Track().streamID = details.streamID
t.Receiver().Track().mu.Unlock()
receiver.Track().id = details.id
receiver.Track().streamID = details.streamID
receiver.Track().mu.Unlock()
continue
}
t.Receiver().Track().mu.Unlock()
receiver.Track().mu.Unlock()
if err := t.Receiver().Stop(); err != nil {
if err := receiver.Stop(); err != nil {
pc.log.Warnf("Failed to stop RtpReceiver: %s", err)
continue
}
receiver, err := pc.api.NewRTPReceiver(t.Receiver().kind, pc.dtlsTransport)
receiver, err := pc.api.NewRTPReceiver(receiver.kind, pc.dtlsTransport)
if err != nil {
pc.log.Warnf("Failed to create new RtpReceiver: %s", err)
continue
@@ -2106,8 +2109,8 @@ func (pc *PeerConnection) generateUnmatchedSDP(transceivers []*RTPTransceiver, u
} else if t.kind == RTPCodecTypeAudio {
audio = append(audio, t)
}
if t.Sender() != nil {
t.Sender().setNegotiated()
if sender := t.Sender(); sender != nil {
sender.setNegotiated()
}
}
@@ -2123,8 +2126,8 @@ func (pc *PeerConnection) generateUnmatchedSDP(transceivers []*RTPTransceiver, u
}
} else {
for _, t := range transceivers {
if t.Sender() != nil {
t.Sender().setNegotiated()
if sender := t.Sender(); sender != nil {
sender.setNegotiated()
}
mediaSections = append(mediaSections, mediaSection{id: t.Mid(), transceivers: []*RTPTransceiver{t}})
}
@@ -2209,8 +2212,8 @@ func (pc *PeerConnection) generateMatchedSDP(transceivers []*RTPTransceiver, use
}
break
}
if t.Sender() != nil {
t.Sender().setNegotiated()
if sender := t.Sender(); sender != nil {
sender.setNegotiated()
}
mediaTransceivers = append(mediaTransceivers, t)
}
@@ -2223,8 +2226,8 @@ func (pc *PeerConnection) generateMatchedSDP(transceivers []*RTPTransceiver, use
if t == nil {
return nil, fmt.Errorf("%w: %q", errPeerConnTranscieverMidNil, midValue)
}
if t.Sender() != nil {
t.Sender().setNegotiated()
if sender := t.Sender(); sender != nil {
sender.setNegotiated()
}
mediaTransceivers := []*RTPTransceiver{t}
mediaSections = append(mediaSections, mediaSection{id: midValue, transceivers: mediaTransceivers, ridMap: getRids(media)})
@@ -2235,8 +2238,8 @@ func (pc *PeerConnection) generateMatchedSDP(transceivers []*RTPTransceiver, use
if includeUnmatched {
if !detectedPlanB {
for _, t := range localTransceivers {
if t.Sender() != nil {
t.Sender().setNegotiated()
if sender := t.Sender(); sender != nil {
sender.setNegotiated()
}
mediaSections = append(mediaSections, mediaSection{id: t.Mid(), transceivers: []*RTPTransceiver{t}})
}

View File

@@ -141,13 +141,13 @@ func (t *RTPTransceiver) Direction() RTPTransceiverDirection {
// Stop irreversibly stops the RTPTransceiver
func (t *RTPTransceiver) Stop() error {
if t.Sender() != nil {
if err := t.Sender().Stop(); err != nil {
if sender := t.Sender(); sender != nil {
if err := sender.Stop(); err != nil {
return err
}
}
if t.Receiver() != nil {
if err := t.Receiver().Stop(); err != nil {
if receiver := t.Receiver(); receiver != nil {
if err := receiver.Stop(); err != nil {
return err
}
}

6
sdp.go
View File

@@ -344,9 +344,9 @@ func addTransceiverSDP(d *sdp.SessionDescription, isPlanB, shouldAddCandidates b
}
for _, mt := range transceivers {
if mt.Sender() != nil && mt.Sender().Track() != nil {
track := mt.Sender().Track()
media = media.WithMediaSource(uint32(mt.Sender().ssrc), track.StreamID() /* cname */, track.StreamID() /* streamLabel */, track.ID())
if sender := mt.Sender(); sender != nil && sender.Track() != nil {
track := sender.Track()
media = media.WithMediaSource(uint32(sender.ssrc), track.StreamID() /* cname */, track.StreamID() /* streamLabel */, track.ID())
if !isPlanB {
media = media.WithPropertyAttribute("msid:" + track.StreamID() + " " + track.ID())
break