From 306dc3776934194e66fc021700ddfe827105c5b3 Mon Sep 17 00:00:00 2001 From: LeeTeng2001 <63984709+LeeTeng2001@users.noreply.github.com> Date: Thu, 13 Feb 2025 00:47:42 +0800 Subject: [PATCH] Fix error handling in RTPReceiver.Receive If we failed to startReceive we would still make the Receiver as ready to start reading. Fixes #2929 --- peerconnection_renegotiation_test.go | 9 ++++--- rtpreceiver.go | 10 ++++++-- rtpreceiver_go_test.go | 36 ++++++++++++++++++++++++++++ 3 files changed, 50 insertions(+), 5 deletions(-) diff --git a/peerconnection_renegotiation_test.go b/peerconnection_renegotiation_test.go index 2782e953..fccb0299 100644 --- a/peerconnection_renegotiation_test.go +++ b/peerconnection_renegotiation_test.go @@ -1127,10 +1127,13 @@ func TestPeerConnection_Renegotiation_Simulcast(t *testing.T) { defer trackMapLock.Unlock() for _, track := range trackMap { - _, _, err := track.ReadRTP() // Ignore first Read, this is our peeked data - assert.Nil(t, err) + _, _, err := track.ReadRTP() + + // Ignore first Read, this was our peeked data + if err == nil { + _, _, err = track.ReadRTP() + } - _, _, err = track.ReadRTP() assert.Equal(t, err, io.EOF) } } diff --git a/rtpreceiver.go b/rtpreceiver.go index b4757636..109f88a4 100644 --- a/rtpreceiver.go +++ b/rtpreceiver.go @@ -193,7 +193,6 @@ func (r *RTPReceiver) startReceive(parameters RTPReceiveParameters) error { //no return errRTPReceiverReceiveAlreadyCalled default: } - defer close(r.received) globalParams := r.getParameters() codec := RTPCodecCapability{} @@ -257,6 +256,8 @@ func (r *RTPReceiver) startReceive(parameters RTPReceiveParameters) error { //no } } + close(r.received) + return nil } @@ -404,7 +405,12 @@ func (r *RTPReceiver) streamsForTrack(t *TrackRemote) *trackStreams { // readRTP should only be called by a track, this only exists so we can keep state in one place. func (r *RTPReceiver) readRTP(b []byte, reader *TrackRemote) (n int, a interceptor.Attributes, err error) { - <-r.received + select { + case <-r.received: + case <-r.closed: + return 0, nil, io.EOF + } + if t := r.streamsForTrack(reader); t != nil { return t.rtpInterceptor.Read(b, a) } diff --git a/rtpreceiver_go_test.go b/rtpreceiver_go_test.go index 2b7ff6de..81e7d7ad 100644 --- a/rtpreceiver_go_test.go +++ b/rtpreceiver_go_test.go @@ -8,6 +8,7 @@ package webrtc import ( "context" + "io" "testing" "time" @@ -74,3 +75,38 @@ func TestSetRTPParameters(t *testing.T) { assert.NoError(t, wan.Stop()) closePairNow(t, sender, receiver) } + +func TestReceiveError(t *testing.T) { + api := NewAPI() + + dtlsTransport, err := api.NewDTLSTransport(nil, nil) + assert.NoError(t, err) + + rtpReceiver, err := api.NewRTPReceiver(RTPCodecTypeVideo, dtlsTransport) + assert.NoError(t, err) + + rtpParameters := RTPReceiveParameters{ + Encodings: []RTPDecodingParameters{ + { + RTPCodingParameters: RTPCodingParameters{ + SSRC: 1000, + }, + }, + }, + } + + assert.Error(t, rtpReceiver.Receive(rtpParameters)) + + chanErrs := make(chan error) + go func() { + _, _, chanErr := rtpReceiver.Read(nil) + chanErrs <- chanErr + + _, _, chanErr = rtpReceiver.Track().ReadRTP() + chanErrs <- chanErr + }() + + assert.NoError(t, rtpReceiver.Stop()) + assert.Error(t, io.ErrClosedPipe, <-chanErrs) + assert.Error(t, io.ErrClosedPipe, <-chanErrs) +}