Fix SRTP shutdown race

Add Stop() to RTCDtlsTransport and guard the shutdown of SRTP
This commit is contained in:
Sean DuBois
2019-01-31 01:19:26 -08:00
parent e88821b6b7
commit 88b495c7f5
5 changed files with 64 additions and 32 deletions

View File

@@ -212,6 +212,31 @@ func (t *RTCDtlsTransport) Start(remoteParameters RTCDtlsParameters) error {
return nil
}
// Stop stops and closes the RTCDtlsTransport object.
func (t *RTCDtlsTransport) Stop() error {
t.lock.Lock()
defer t.lock.Unlock()
// Try closing everything and collect the errors
var closeErrs []error
if t.srtpSession != nil {
if err := t.srtpSession.Close(); err != nil {
closeErrs = append(closeErrs, err)
}
}
if t.srtcpSession != nil {
if err := t.srtcpSession.Close(); err != nil {
closeErrs = append(closeErrs, err)
}
}
// TODO: Close DTLS itself? Currently closed by ICE
return flattenErrs(closeErrs)
}
func (t *RTCDtlsTransport) validateFingerPrint(remoteParameters RTCDtlsParameters, remoteCert *x509.Certificate) error {
for _, fp := range remoteParameters.Fingerprints {
hashAlgo, err := dtls.HashAlgorithmString(fp.Algorithm)

View File

@@ -834,6 +834,16 @@ func (pc *RTCPeerConnection) SetRemoteDescription(desc RTCSessionDescription) er
} else {
pcLog.Warnf("OnTrack unset, unable to handle incoming media streams")
}
for _, tranceiver := range pc.rtpTransceivers {
if tranceiver.Sender != nil {
tranceiver.Sender.Send(RTCRtpSendParameters{
encodings: RTCRtpEncodingParameters{
RTCRtpCodingParameters{SSRC: tranceiver.Sender.Track.Ssrc, PayloadType: tranceiver.Sender.Track.PayloadType},
}})
}
}
go pc.drainSRTP()
// Start sctp
@@ -1099,10 +1109,6 @@ func (pc *RTCPeerConnection) AddTrack(track *RTCTrack) (*RTCRtpSender, error) {
sender,
RTCRtpTransceiverDirectionSendonly,
)
sender.Send(RTCRtpSendParameters{
encodings: RTCRtpEncodingParameters{
RTCRtpCodingParameters{SSRC: track.Ssrc, PayloadType: track.PayloadType},
}})
}
transceiver.Mid = track.Kind.String() // TODO: Mid generation
@@ -1292,7 +1298,7 @@ func (pc *RTCPeerConnection) SendRTCP(pkt rtcp.Packet) error {
srtcpSession, err := pc.dtlsTransport.getSRTCPSession()
if err != nil {
return fmt.Errorf("SendRTCP failed to open SRTCPSession: %v", err)
return nil // TODO SendRTCP before would gracefully discard packets until ready
}
writeStream, err := srtcpSession.OpenWriteStream()
@@ -1335,16 +1341,8 @@ func (pc *RTCPeerConnection) Close() error {
// Conn if one of the endpoints is closed down. To
// continue the chain the Mux has to be closed.
if pc.dtlsTransport.srtpSession != nil {
if err := pc.dtlsTransport.srtpSession.Close(); err != nil {
closeErrs = append(closeErrs, err)
}
}
if pc.dtlsTransport.srtcpSession != nil {
if err := pc.dtlsTransport.srtcpSession.Close(); err != nil {
closeErrs = append(closeErrs, err)
}
if err := pc.dtlsTransport.Stop(); err != nil {
closeErrs = append(closeErrs, err)
}
for _, t := range pc.rtpTransceivers {

View File

@@ -37,13 +37,13 @@ func TestRTCPeerConnection_Media_Sample(t *testing.T) {
go func() {
for {
time.Sleep(time.Millisecond * 100)
if routineErr := pcAnswer.SendRTCP(&rtcp.PictureLossIndication{SenderSSRC: track.Ssrc, MediaSSRC: track.Ssrc}); routineErr != nil {
if routineErr := pcAnswer.SendRTCP(&rtcp.RapidResynchronizationRequest{SenderSSRC: track.Ssrc, MediaSSRC: track.Ssrc}); routineErr != nil {
awaitRTCPRecieverSend <- routineErr
return
}
select {
case <-awaitRTCPRecieverRecv:
case <-awaitRTCPSenderRecv:
close(awaitRTCPRecieverSend)
return
default:

View File

@@ -17,31 +17,37 @@ type RTCRtpSender struct {
// NewRTCRtpSender constructs a new RTCRtpSender
func NewRTCRtpSender(track *RTCTrack, transport *RTCDtlsTransport) *RTCRtpSender {
return &RTCRtpSender{
r := &RTCRtpSender{
Track: track,
transport: transport,
}
r.Track.sampleInput = make(chan media.RTCSample, 15) // Is the buffering needed?
r.Track.rawInput = make(chan *rtp.Packet, 15) // Is the buffering needed?
r.Track.rtcpInput = make(chan rtcp.Packet, 15) // Is the buffering needed?
r.Track.Samples = r.Track.sampleInput
r.Track.RawRTP = r.Track.rawInput
r.Track.RTCPPackets = r.Track.rtcpInput
if r.Track.isRawRTP {
close(r.Track.Samples)
} else {
close(r.Track.RawRTP)
}
return r
}
// Send Attempts to set the parameters controlling the sending of media.
func (r *RTCRtpSender) Send(parameters RTCRtpSendParameters) {
sampleInput := make(chan media.RTCSample, 15) // Is the buffering needed?
rawInput := make(chan *rtp.Packet, 15) // Is the buffering needed?
rtcpInput := make(chan rtcp.Packet, 15) // Is the buffering needed?
r.Track.Samples = sampleInput
r.Track.RawRTP = rawInput
r.Track.RTCPPackets = rtcpInput
if r.Track.isRawRTP {
close(r.Track.Samples)
go r.handleRawRTP(rawInput)
go r.handleRawRTP(r.Track.rawInput)
} else {
close(r.Track.RawRTP)
go r.handleSampleRTP(sampleInput)
go r.handleSampleRTP(r.Track.sampleInput)
}
go r.handleRTCP(r.transport, rtcpInput)
go r.handleRTCP(r.transport, r.Track.rtcpInput)
}
// Stop irreversibly stops the RTCRtpSender

View File

@@ -12,7 +12,10 @@ import (
// RTCTrack represents a track that is communicated
type RTCTrack struct {
isRawRTP bool
isRawRTP bool
sampleInput chan media.RTCSample
rawInput chan *rtp.Packet
rtcpInput chan rtcp.Packet
ID string
PayloadType uint8