Handle non-Simulcast Repair Streams

Same issue with TWCC enabled as 11b887. We need to process the
RTX packets so that we can emit proper reports.
This commit is contained in:
Sean DuBois
2021-10-01 22:32:14 -04:00
parent f93ea80d85
commit 80da22268a
4 changed files with 73 additions and 40 deletions

View File

@@ -1171,6 +1171,8 @@ func (pc *PeerConnection) startReceiver(incoming trackDetails, receiver *RTPRece
if len(incoming.ssrcs) > i { if len(incoming.ssrcs) > i {
encodings[i].SSRC = incoming.ssrcs[i] encodings[i].SSRC = incoming.ssrcs[i]
} }
encodings[i].RTX.SSRC = incoming.repairSsrc
} }
if err := receiver.Receive(RTPReceiveParameters{Encodings: encodings}); err != nil { if err := receiver.Receive(RTPReceiveParameters{Encodings: encodings}); err != nil {
@@ -1451,7 +1453,9 @@ func (pc *PeerConnection) handleIncomingSSRC(rtpStream io.Reader, ssrc SSRC) err
} }
if rsid != "" { if rsid != "" {
return receiver.receiveForRsid(rsid, streamInfo, readStream, interceptor, rtcpReadStream, rtcpInterceptor) receiver.mu.Lock()
defer receiver.mu.Unlock()
return receiver.receiveForRtx(SSRC(0), rsid, streamInfo, readStream, interceptor, rtcpReadStream, rtcpInterceptor)
} }
track, err := receiver.receiveForRid(rid, params, streamInfo, readStream, interceptor, rtcpReadStream, rtcpInterceptor) track, err := receiver.receiveForRid(rid, params, streamInfo, readStream, interceptor, rtcpReadStream, rtcpInterceptor)

View File

@@ -1,5 +1,11 @@
package webrtc package webrtc
// RTPRtxParameters dictionary contains information relating to retransmission (RTX) settings.
// https://draft.ortc.org/#dom-rtcrtprtxparameters
type RTPRtxParameters struct {
SSRC SSRC `json:"ssrc"`
}
// RTPCodingParameters provides information relating to both encoding and decoding. // RTPCodingParameters provides information relating to both encoding and decoding.
// This is a subset of the RFC since Pion WebRTC doesn't implement encoding/decoding itself // This is a subset of the RFC since Pion WebRTC doesn't implement encoding/decoding itself
// http://draft.ortc.org/#dom-rtcrtpcodingparameters // http://draft.ortc.org/#dom-rtcrtpcodingparameters
@@ -7,4 +13,5 @@ type RTPCodingParameters struct {
RID string `json:"rid"` RID string `json:"rid"`
SSRC SSRC `json:"ssrc"` SSRC SSRC `json:"ssrc"`
PayloadType PayloadType `json:"payloadType"` PayloadType PayloadType `json:"payloadType"`
RTX RTPRtxParameters `json:"rtx"`
} }

View File

@@ -158,6 +158,18 @@ func (r *RTPReceiver) Receive(parameters RTPReceiveParameters) error {
} }
r.tracks = append(r.tracks, t) r.tracks = append(r.tracks, t)
if rtxSsrc := parameters.Encodings[i].RTX.SSRC; rtxSsrc != 0 {
streamInfo := createStreamInfo("", rtxSsrc, 0, codec, globalParams.HeaderExtensions)
rtpReadStream, rtpInterceptor, rtcpReadStream, rtcpInterceptor, err := r.transport.streamsForSSRC(rtxSsrc, *streamInfo)
if err != nil {
return err
}
if err = r.receiveForRtx(rtxSsrc, "", streamInfo, rtpReadStream, rtpInterceptor, rtcpReadStream, rtcpInterceptor); err != nil {
return err
}
}
} }
return nil return nil
@@ -323,37 +335,40 @@ func (r *RTPReceiver) receiveForRid(rid string, params RTPParameters, streamInfo
return nil, fmt.Errorf("%w: %s", errRTPReceiverForRIDTrackStreamNotFound, rid) return nil, fmt.Errorf("%w: %s", errRTPReceiverForRIDTrackStreamNotFound, rid)
} }
// receiveForRsid starts a routine that processes the repair stream for a RID // receiveForRtx starts a routine that processes the repair stream
// These packets aren't exposed to the user yet, but we need to process them for // These packets aren't exposed to the user yet, but we need to process them for
// TWCC // TWCC
func (r *RTPReceiver) receiveForRsid(rsid string, streamInfo *interceptor.StreamInfo, rtpReadStream *srtp.ReadStreamSRTP, rtpInterceptor interceptor.RTPReader, rtcpReadStream *srtp.ReadStreamSRTCP, rtcpInterceptor interceptor.RTCPReader) error { func (r *RTPReceiver) receiveForRtx(ssrc SSRC, rsid string, streamInfo *interceptor.StreamInfo, rtpReadStream *srtp.ReadStreamSRTP, rtpInterceptor interceptor.RTPReader, rtcpReadStream *srtp.ReadStreamSRTCP, rtcpInterceptor interceptor.RTCPReader) error {
r.mu.Lock() var track *trackStreams
defer r.mu.Unlock() if ssrc != 0 && len(r.tracks) == 1 {
track = &r.tracks[0]
} else {
for i := range r.tracks { for i := range r.tracks {
if r.tracks[i].track.RID() == rsid { if r.tracks[i].track.RID() == rsid {
var err error track = &r.tracks[i]
}
}
}
r.tracks[i].repairStreamInfo = streamInfo if track == nil {
r.tracks[i].repairReadStream = rtpReadStream return fmt.Errorf("%w: ssrc(%d) rsid(%s)", errRTPReceiverForRIDTrackStreamNotFound, ssrc, rsid)
r.tracks[i].repairInterceptor = rtpInterceptor }
r.tracks[i].repairRtcpReadStream = rtcpReadStream
r.tracks[i].repairRtcpInterceptor = rtcpInterceptor track.repairStreamInfo = streamInfo
track.repairReadStream = rtpReadStream
track.repairInterceptor = rtpInterceptor
track.repairRtcpReadStream = rtcpReadStream
track.repairRtcpInterceptor = rtcpInterceptor
go func() { go func() {
b := make([]byte, r.api.settingEngine.getReceiveMTU()) b := make([]byte, r.api.settingEngine.getReceiveMTU())
for { for {
if _, _, readErr := r.tracks[i].repairInterceptor.Read(b, nil); readErr != nil { if _, _, readErr := track.repairInterceptor.Read(b, nil); readErr != nil {
return return
} }
} }
}() }()
return nil
return err
}
}
return fmt.Errorf("%w: %s", errRTPReceiverForRIDTrackStreamNotFound, rsid)
} }
// SetReadDeadline sets the max amount of time the RTCP stream will block before returning. 0 is forever. // SetReadDeadline sets the max amount of time the RTCP stream will block before returning. 0 is forever.

15
sdp.go
View File

@@ -23,6 +23,7 @@ type trackDetails struct {
streamID string streamID string
id string id string
ssrcs []SSRC ssrcs []SSRC
repairSsrc SSRC
rids []string rids []string
} }
@@ -73,7 +74,7 @@ func filterTrackWithSSRC(incomingTracks []trackDetails, ssrc SSRC) []trackDetail
func trackDetailsFromSDP(log logging.LeveledLogger, s *sdp.SessionDescription) (incomingTracks []trackDetails) { // nolint:gocognit func trackDetailsFromSDP(log logging.LeveledLogger, s *sdp.SessionDescription) (incomingTracks []trackDetails) { // nolint:gocognit
for _, media := range s.MediaDescriptions { for _, media := range s.MediaDescriptions {
tracksInMediaSection := []trackDetails{} tracksInMediaSection := []trackDetails{}
rtxRepairFlows := map[uint32]bool{} rtxRepairFlows := map[uint64]uint64{}
// Plan B can have multiple tracks in a signle media section // Plan B can have multiple tracks in a signle media section
streamID := "" streamID := ""
@@ -106,7 +107,7 @@ func trackDetailsFromSDP(log logging.LeveledLogger, s *sdp.SessionDescription) (
// as this declares that the second SSRC (632943048) is a rtx repair flow (RFC4588) for the first // as this declares that the second SSRC (632943048) is a rtx repair flow (RFC4588) for the first
// (2231627014) as specified in RFC5576 // (2231627014) as specified in RFC5576
if len(split) == 3 { if len(split) == 3 {
_, err := strconv.ParseUint(split[1], 10, 32) baseSsrc, err := strconv.ParseUint(split[1], 10, 32)
if err != nil { if err != nil {
log.Warnf("Failed to parse SSRC: %v", err) log.Warnf("Failed to parse SSRC: %v", err)
continue continue
@@ -116,7 +117,7 @@ func trackDetailsFromSDP(log logging.LeveledLogger, s *sdp.SessionDescription) (
log.Warnf("Failed to parse SSRC: %v", err) log.Warnf("Failed to parse SSRC: %v", err)
continue continue
} }
rtxRepairFlows[uint32(rtxRepairFlow)] = true rtxRepairFlows[rtxRepairFlow] = baseSsrc
tracksInMediaSection = filterTrackWithSSRC(tracksInMediaSection, SSRC(rtxRepairFlow)) // Remove if rtx was added as track before tracksInMediaSection = filterTrackWithSSRC(tracksInMediaSection, SSRC(rtxRepairFlow)) // Remove if rtx was added as track before
} }
} }
@@ -139,7 +140,7 @@ func trackDetailsFromSDP(log logging.LeveledLogger, s *sdp.SessionDescription) (
continue continue
} }
if rtxRepairFlow := rtxRepairFlows[uint32(ssrc)]; rtxRepairFlow { if _, ok := rtxRepairFlows[ssrc]; ok {
continue // This ssrc is a RTX repair flow, ignore continue // This ssrc is a RTX repair flow, ignore
} }
@@ -165,6 +166,12 @@ func trackDetailsFromSDP(log logging.LeveledLogger, s *sdp.SessionDescription) (
trackDetails.id = trackID trackDetails.id = trackID
trackDetails.ssrcs = []SSRC{SSRC(ssrc)} trackDetails.ssrcs = []SSRC{SSRC(ssrc)}
for repairSsrc, baseSsrc := range rtxRepairFlows {
if baseSsrc == ssrc {
trackDetails.repairSsrc = SSRC(repairSsrc)
}
}
if isNewTrack { if isNewTrack {
tracksInMediaSection = append(tracksInMediaSection, *trackDetails) tracksInMediaSection = append(tracksInMediaSection, *trackDetails)
} }