Fix data race of RTX packet

Fix data race of RTX packet
This commit is contained in:
cnderrauber
2023-10-01 22:28:19 +08:00
committed by cnderrauber
parent f68b789150
commit 219c6a35ce
2 changed files with 20 additions and 2 deletions

View File

@@ -43,6 +43,15 @@ type trackStreams struct {
type rtxPacketWithAttributes struct {
pkt []byte
attributes interceptor.Attributes
pool *sync.Pool
}
func (p *rtxPacketWithAttributes) release() {
if p.pkt != nil {
b := p.pkt[:cap(p.pkt)]
p.pool.Put(b) // nolint:staticcheck
p.pkt = nil
}
}
// RTPReceiver allows an application to inspect the receipt of a TrackRemote
@@ -59,6 +68,8 @@ type RTPReceiver struct {
// A reference to the associated api object
api *API
rtxPool sync.Pool
}
// NewRTPReceiver constructs a new RTPReceiver
@@ -74,6 +85,9 @@ func (api *API) NewRTPReceiver(kind RTPCodecType, transport *DTLSTransport) (*RT
closed: make(chan interface{}),
received: make(chan interface{}),
tracks: []trackStreams{},
rtxPool: sync.Pool{New: func() interface{} {
return make([]byte, api.settingEngine.getReceiveMTU())
}},
}
return r, nil
@@ -411,10 +425,11 @@ func (r *RTPReceiver) receiveForRtx(ssrc SSRC, rsid string, streamInfo *intercep
track.repairStreamChannel = make(chan rtxPacketWithAttributes)
go func() {
b := make([]byte, r.api.settingEngine.getReceiveMTU())
for {
b := r.rtxPool.Get().([]byte) // nolint:forcetypeassert
i, attributes, err := track.repairInterceptor.Read(b, nil)
if err != nil {
r.rtxPool.Put(b) // nolint:staticcheck
return
}
@@ -435,6 +450,7 @@ func (r *RTPReceiver) receiveForRtx(ssrc SSRC, rsid string, streamInfo *intercep
if i-int(headerLength)-paddingLength < 2 {
// BWE probe packet, ignore
r.rtxPool.Put(b) // nolint:staticcheck
continue
}
@@ -450,8 +466,9 @@ func (r *RTPReceiver) receiveForRtx(ssrc SSRC, rsid string, streamInfo *intercep
select {
case <-r.closed:
r.rtxPool.Put(b) // nolint:staticcheck
return
case track.repairStreamChannel <- rtxPacketWithAttributes{pkt: b[:i-2], attributes: attributes}:
case track.repairStreamChannel <- rtxPacketWithAttributes{pkt: b[:i-2], attributes: attributes, pool: &r.rtxPool}:
}
}
}()

View File

@@ -131,6 +131,7 @@ func (t *TrackRemote) Read(b []byte) (n int, attributes interceptor.Attributes,
if rtxPacketReceived := r.readRTX(t); rtxPacketReceived != nil {
n = copy(b, rtxPacketReceived.pkt)
attributes = rtxPacketReceived.attributes
rtxPacketReceived.release()
err = nil
} else {
// If there's no separate RTX track (or there's a separate RTX track but no RTX packet waiting), wait for and return