Fix error handling in RTPReceiver.Receive

If we failed to startReceive we would still make the Receiver as ready
to start reading.
    
Fixes #2929
This commit is contained in:
LeeTeng2001
2025-02-13 00:47:42 +08:00
committed by GitHub
parent 1c45355b8c
commit 306dc37769
3 changed files with 50 additions and 5 deletions

View File

@@ -1127,10 +1127,13 @@ func TestPeerConnection_Renegotiation_Simulcast(t *testing.T) {
defer trackMapLock.Unlock() defer trackMapLock.Unlock()
for _, track := range trackMap { for _, track := range trackMap {
_, _, err := track.ReadRTP() // Ignore first Read, this is our peeked data _, _, err := track.ReadRTP()
assert.Nil(t, err)
// Ignore first Read, this was our peeked data
if err == nil {
_, _, err = track.ReadRTP()
}
_, _, err = track.ReadRTP()
assert.Equal(t, err, io.EOF) assert.Equal(t, err, io.EOF)
} }
} }

View File

@@ -193,7 +193,6 @@ func (r *RTPReceiver) startReceive(parameters RTPReceiveParameters) error { //no
return errRTPReceiverReceiveAlreadyCalled return errRTPReceiverReceiveAlreadyCalled
default: default:
} }
defer close(r.received)
globalParams := r.getParameters() globalParams := r.getParameters()
codec := RTPCodecCapability{} codec := RTPCodecCapability{}
@@ -257,6 +256,8 @@ func (r *RTPReceiver) startReceive(parameters RTPReceiveParameters) error { //no
} }
} }
close(r.received)
return nil return nil
} }
@@ -404,7 +405,12 @@ func (r *RTPReceiver) streamsForTrack(t *TrackRemote) *trackStreams {
// readRTP should only be called by a track, this only exists so we can keep state in one place. // readRTP should only be called by a track, this only exists so we can keep state in one place.
func (r *RTPReceiver) readRTP(b []byte, reader *TrackRemote) (n int, a interceptor.Attributes, err error) { func (r *RTPReceiver) readRTP(b []byte, reader *TrackRemote) (n int, a interceptor.Attributes, err error) {
<-r.received select {
case <-r.received:
case <-r.closed:
return 0, nil, io.EOF
}
if t := r.streamsForTrack(reader); t != nil { if t := r.streamsForTrack(reader); t != nil {
return t.rtpInterceptor.Read(b, a) return t.rtpInterceptor.Read(b, a)
} }

View File

@@ -8,6 +8,7 @@ package webrtc
import ( import (
"context" "context"
"io"
"testing" "testing"
"time" "time"
@@ -74,3 +75,38 @@ func TestSetRTPParameters(t *testing.T) {
assert.NoError(t, wan.Stop()) assert.NoError(t, wan.Stop())
closePairNow(t, sender, receiver) closePairNow(t, sender, receiver)
} }
func TestReceiveError(t *testing.T) {
api := NewAPI()
dtlsTransport, err := api.NewDTLSTransport(nil, nil)
assert.NoError(t, err)
rtpReceiver, err := api.NewRTPReceiver(RTPCodecTypeVideo, dtlsTransport)
assert.NoError(t, err)
rtpParameters := RTPReceiveParameters{
Encodings: []RTPDecodingParameters{
{
RTPCodingParameters: RTPCodingParameters{
SSRC: 1000,
},
},
},
}
assert.Error(t, rtpReceiver.Receive(rtpParameters))
chanErrs := make(chan error)
go func() {
_, _, chanErr := rtpReceiver.Read(nil)
chanErrs <- chanErr
_, _, chanErr = rtpReceiver.Track().ReadRTP()
chanErrs <- chanErr
}()
assert.NoError(t, rtpReceiver.Stop())
assert.Error(t, io.ErrClosedPipe, <-chanErrs)
assert.Error(t, io.ErrClosedPipe, <-chanErrs)
}