RTCP Reception mechanism

Add a channel to the RTCTrack object to
recieve incoming RTCP packets with.
Relates to #119
This commit is contained in:
Woodrow Douglass
2018-11-26 11:48:45 -05:00
committed by Max Hawkins
parent bc89ee0fcb
commit de31bb3c60
11 changed files with 321 additions and 33 deletions

View File

@@ -12,9 +12,15 @@ import (
"github.com/pions/webrtc/pkg/datachannel" "github.com/pions/webrtc/pkg/datachannel"
"github.com/pions/webrtc/pkg/ice" "github.com/pions/webrtc/pkg/ice"
"github.com/pions/webrtc/pkg/rtp" "github.com/pions/webrtc/pkg/rtp"
"github.com/pions/webrtc/pkg/rtcp"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
type Transportpair struct {
Rtp chan<- *rtp.Packet
Rtcp chan<- *rtcp.PacketWithHeader
}
// Manager contains all network state (DTLS, SRTP) that is shared between ports // Manager contains all network state (DTLS, SRTP) that is shared between ports
// It is also used to perform operations that involve multiple ports // It is also used to perform operations that involve multiple ports
type Manager struct { type Manager struct {
@@ -30,7 +36,7 @@ type Manager struct {
dataChannelEventHandler DataChannelEventHandler dataChannelEventHandler DataChannelEventHandler
bufferTransportGenerator BufferTransportGenerator bufferTransportGenerator BufferTransportGenerator
bufferTransports map[uint32]chan<- *rtp.Packet bufferTransportPairs map[uint32]*Transportpair
srtpInboundContextLock sync.RWMutex srtpInboundContextLock sync.RWMutex
srtpInboundContext *srtp.Context srtpInboundContext *srtp.Context
@@ -44,11 +50,20 @@ type Manager struct {
ports []*port ports []*port
} }
func (m *Manager) AddTransportPair(ssrc uint32, Rtp chan<- *rtp.Packet, Rtcp chan<- *rtcp.PacketWithHeader) {
bufferTransport := m.bufferTransportPairs[ssrc]
if bufferTransport == nil {
bufferTransport = &Transportpair{Rtp, Rtcp}
m.bufferTransportPairs[ssrc] = bufferTransport
}
}
// NewManager creates a new network.Manager // NewManager creates a new network.Manager
func NewManager(btg BufferTransportGenerator, dcet DataChannelEventHandler, ntf ICENotifier) (m *Manager, err error) { func NewManager(btg BufferTransportGenerator, dcet DataChannelEventHandler, ntf ICENotifier) (m *Manager, err error) {
m = &Manager{ m = &Manager{
iceNotifier: ntf, iceNotifier: ntf,
bufferTransports: make(map[uint32]chan<- *rtp.Packet), bufferTransportPairs: make(map[uint32]*Transportpair),
bufferTransportGenerator: btg, bufferTransportGenerator: btg,
dataChannelEventHandler: dcet, dataChannelEventHandler: dcet,
} }
@@ -80,6 +95,22 @@ func NewManager(btg BufferTransportGenerator, dcet DataChannelEventHandler, ntf
return m, err return m, err
} }
func (m *Manager) getBufferTransports(ssrc uint32) *Transportpair {
fmt.Println(m.bufferTransportPairs)
return m.bufferTransportPairs[ssrc]
}
func (m *Manager) getOrCreateBufferTransports(ssrc uint32, payloadtype uint8) *Transportpair {
bufferTransport := m.bufferTransportPairs[ssrc]
if bufferTransport == nil {
bufferTransport = m.bufferTransportGenerator(ssrc, payloadtype)
fmt.Printf("CREATE FOR %x %v\n", ssrc, bufferTransport)
m.bufferTransportPairs[ssrc] = bufferTransport
}
return bufferTransport
}
func (m *Manager) handleDTLSState(state dtls.ConnectionState) { func (m *Manager) handleDTLSState(state dtls.ConnectionState) {
if state == dtls.Established { if state == dtls.Established {
m.sctpAssociation.Connect() m.sctpAssociation.Connect()

View File

@@ -3,12 +3,11 @@ package network
import ( import (
"github.com/pions/webrtc/pkg/datachannel" "github.com/pions/webrtc/pkg/datachannel"
"github.com/pions/webrtc/pkg/ice" "github.com/pions/webrtc/pkg/ice"
"github.com/pions/webrtc/pkg/rtp"
) )
// BufferTransportGenerator generates a new channel for the associated SSRC // BufferTransportGenerator generates a new channel for the associated SSRC
// This channel is used to send RTP packets to users of pion-WebRTC // This channel is used to send RTP packets to users of pion-WebRTC
type BufferTransportGenerator func(uint32, uint8) chan<- *rtp.Packet type BufferTransportGenerator func(uint32, uint8) *Transportpair
// ICENotifier notifies the RTCPeerConnection if ICE state has changed // ICENotifier notifies the RTCPeerConnection if ICE state has changed
type ICENotifier func(ice.ConnectionState) type ICENotifier func(ice.ConnectionState)

View File

@@ -10,6 +10,7 @@ import (
"github.com/pions/webrtc/internal/sctp" "github.com/pions/webrtc/internal/sctp"
"github.com/pions/webrtc/internal/srtp" "github.com/pions/webrtc/internal/srtp"
"github.com/pions/webrtc/pkg/rtp" "github.com/pions/webrtc/pkg/rtp"
"github.com/pions/webrtc/pkg/rtcp"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
@@ -35,12 +36,59 @@ func (p *port) handleSRTP(buffer []byte) {
return return
} }
if rtcpPacketType >= 192 && rtcpPacketType <= 223 { if rtcpPacketType >= 192 && rtcpPacketType <= 223 {
decrypted, err := p.m.srtpInboundContext.DecryptRTCP(buffer) decrypted, err := p.m.srtpInboundContext.DecryptRTCP(buffer)
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
fmt.Println(decrypted) fmt.Println(decrypted)
return return
} else {
var report rtcp.PacketWithHeader
err = report.Unmarshal(decrypted)
if (err != nil) {
fmt.Println(err)
return
} else {
f := func(ssrc uint32) {
bufferTransport := p.m.getBufferTransports(ssrc)
fmt.Printf("send to %x, %v\n", ssrc, bufferTransport);
if bufferTransport != nil {
select {
case bufferTransport.Rtcp <- &report:
default:
}
}
}
switch report.Header.Type {
case rtcp.TypeSenderReport:
for _, ssrc := range report.Packet.(*rtcp.SenderReport).Reports {
f(ssrc.SSRC)
}
case rtcp.TypeReceiverReport:
for _, ssrc := range report.Packet.(*rtcp.ReceiverReport).Reports {
fmt.Println("ssrc ", ssrc.SSRC);
f(ssrc.SSRC)
}
case rtcp.TypeSourceDescription:
for _, ssrc := range report.Packet.(*rtcp.SourceDescription).Chunks {
f(ssrc.Source)
}
case rtcp.TypeGoodbye:
for _, ssrc := range report.Packet.(*rtcp.Goodbye).Sources {
f(ssrc)
}
case rtcp.TypeTransportSpecificFeedback:
f(report.Packet.(*rtcp.RapidResynchronizationRequest).MediaSSRC)
case rtcp.TypePayloadSpecificFeedback:
f(report.Packet.(*rtcp.PictureLossIndication).MediaSSRC)
}
}
} }
return return
} }
@@ -57,19 +105,13 @@ func (p *port) handleSRTP(buffer []byte) {
return return
} }
bufferTransport := p.m.bufferTransports[packet.SSRC] bufferTransport := p.m.getOrCreateBufferTransports(packet.SSRC, packet.PayloadType)
if bufferTransport == nil { if bufferTransport != nil {
bufferTransport = p.m.bufferTransportGenerator(packet.SSRC, packet.PayloadType)
if bufferTransport == nil {
return
}
p.m.bufferTransports[packet.SSRC] = bufferTransport
}
select { select {
case bufferTransport <- packet: case bufferTransport.Rtp <- packet:
default: default:
} }
}
} }

View File

@@ -3,6 +3,7 @@ package webrtc
import ( import (
"github.com/pions/webrtc/pkg/media" "github.com/pions/webrtc/pkg/media"
"github.com/pions/webrtc/pkg/rtp" "github.com/pions/webrtc/pkg/rtp"
"github.com/pions/webrtc/pkg/rtcp"
) )
// RTCTrack represents a track that is communicated // RTCTrack represents a track that is communicated
@@ -14,6 +15,7 @@ type RTCTrack struct {
Ssrc uint32 Ssrc uint32
Codec *RTCRtpCodec Codec *RTCRtpCodec
Packets <-chan *rtp.Packet Packets <-chan *rtp.Packet
RtcpPackets <-chan *rtcp.PacketWithHeader
Samples chan<- media.RTCSample Samples chan<- media.RTCSample
RawRTP chan<- *rtp.Packet RawRTP chan<- *rtp.Packet
} }

View File

@@ -14,6 +14,7 @@ const (
TypeSourceDescription PacketType = 202 // RFC 3550, 6.5 TypeSourceDescription PacketType = 202 // RFC 3550, 6.5
TypeGoodbye PacketType = 203 // RFC 3550, 6.6 TypeGoodbye PacketType = 203 // RFC 3550, 6.6
TypeApplicationDefined PacketType = 204 // RFC 3550, 6.7 (unimplemented) TypeApplicationDefined PacketType = 204 // RFC 3550, 6.7 (unimplemented)
TypeTransportSpecificFeedback PacketType = 205 // RFC 4585, 6051
TypePayloadSpecificFeedback PacketType = 206 // RFC 4585, 6.3 TypePayloadSpecificFeedback PacketType = 206 // RFC 4585, 6.3
) )
@@ -30,6 +31,8 @@ func (p PacketType) String() string {
return "BYE" return "BYE"
case TypeApplicationDefined: case TypeApplicationDefined:
return "APP" return "APP"
case TypeTransportSpecificFeedback:
return "TSFB"
case TypePayloadSpecificFeedback: case TypePayloadSpecificFeedback:
return "PSFB" return "PSFB"
default: default:

View File

@@ -5,3 +5,78 @@ type Packet interface {
Marshal() ([]byte, error) Marshal() ([]byte, error)
Unmarshal(rawPacket []byte) error Unmarshal(rawPacket []byte) error
} }
type PacketWithHeader struct {
Header
Packet
RawPacket []byte
}
func (p PacketWithHeader) Marshal() ([]byte, error) {
return p.Packet.Marshal()
}
func (p *PacketWithHeader) Unmarshal(rawPacket []byte) error {
p.RawPacket = rawPacket
if err := p.Header.Unmarshal(rawPacket); err != nil {
return err
}
switch p.Header.Type {
case TypeSenderReport:
sr := new(SenderReport)
err := sr.Unmarshal(rawPacket)
if (err == nil) {
p.Packet = sr
} else {
return err
}
case TypeReceiverReport:
rr := new(ReceiverReport)
err := rr.Unmarshal(rawPacket)
if (err == nil) {
p.Packet = rr
} else {
return err
}
case TypeSourceDescription:
sdes := new(SourceDescription)
err := sdes.Unmarshal(rawPacket)
if (err == nil) {
p.Packet = sdes
} else {
return err
}
case TypeGoodbye:
bye := new(Goodbye)
err := bye.Unmarshal(rawPacket)
if (err == nil) {
p.Packet = bye
} else {
return err
}
case TypeTransportSpecificFeedback:
rrr := new(RapidResynchronizationRequest)
err := rrr.Unmarshal(rawPacket)
if (err == nil) {
p.Packet = rrr
} else {
return err
}
case TypePayloadSpecificFeedback:
psfb := new(PictureLossIndication)
err := psfb.Unmarshal(rawPacket)
if (err == nil) {
p.Packet = psfb
} else {
return err
}
default:
return errWrongType
}
return nil
}

View File

@@ -54,7 +54,7 @@ func (p *PictureLossIndication) Unmarshal(rawPacket []byte) error {
return err return err
} }
if h.Type != TypePayloadSpecificFeedback || h.Count != pliFMT { if h.Type != TypePayloadSpecificFeedback || h.Count != 1 {
return errWrongType return errWrongType
} }

View File

@@ -0,0 +1,65 @@
package rtcp
import (
"encoding/binary"
)
// The RapidResynchronizationRequest packet informs the encoder about the loss of an undefined amount of coded video data belonging to one or more pictures
type RapidResynchronizationRequest struct {
// SSRC of sender
SenderSSRC uint32
// SSRC of the media source
MediaSSRC uint32
}
const (
rrrFMT = 5
rrrLength = 2
)
// Marshal encodes the RapidResynchronizationRequest in binary
func (p RapidResynchronizationRequest) Marshal() ([]byte, error) {
/*
* RRR does not require parameters. Therefore, the length field MUST be
* 2, and there MUST NOT be any Feedback Control Information.
*
* The semantics of this FB message is independent of the payload type.
*/
rawPacket := make([]byte, 8)
binary.BigEndian.PutUint32(rawPacket, p.SenderSSRC)
binary.BigEndian.PutUint32(rawPacket[4:], p.MediaSSRC)
h := Header{
Count: rrrFMT,
Type: TypeTransportSpecificFeedback,
Length: rrrLength,
}
hData, err := h.Marshal()
if err != nil {
return nil, err
}
return append(hData, rawPacket...), nil
}
// Unmarshal decodes the RapidResynchronizationRequest from binary
func (p *RapidResynchronizationRequest) Unmarshal(rawPacket []byte) error {
if len(rawPacket) < (headerLength + (ssrcLength * 2)) {
return errPacketTooShort
}
var h Header
if err := h.Unmarshal(rawPacket); err != nil {
return err
}
if h.Type != TypeTransportSpecificFeedback || h.Count != 1 {
return errWrongType
}
p.SenderSSRC = binary.BigEndian.Uint32(rawPacket[headerLength:])
p.MediaSSRC = binary.BigEndian.Uint32(rawPacket[headerLength+ssrcLength:])
return nil
}

View File

@@ -0,0 +1,64 @@
package rtcp
import (
"encoding/binary"
)
// The PictureLossIndication packet informs the encoder about the loss of an undefined amount of coded video data belonging to one or more pictures
type PictureLossIndication struct {
// SSRC of sender
SenderSSRC uint32
// SSRC where the loss was experienced
MediaSSRC uint32
}
const (
pliFMT = 1
pliLength = 2
)
// Marshal encodes the PictureLossIndication in binary
func (p PictureLossIndication) Marshal() ([]byte, error) {
/*
* PLI does not require parameters. Therefore, the length field MUST be
* 2, and there MUST NOT be any Feedback Control Information.
*
* The semantics of this FB message is independent of the payload type.
*/
rawPacket := make([]byte, 8)
binary.BigEndian.PutUint32(rawPacket, p.SenderSSRC)
binary.BigEndian.PutUint32(rawPacket[4:], p.MediaSSRC)
h := Header{
Count: pliFMT,
Type: TypePayloadSpecificFeedback,
Length: pliLength,
}
hData, err := h.Marshal()
if err != nil {
return nil, err
}
return append(hData, rawPacket...), nil
}
// Unmarshal decodes the PictureLossIndication from binary
func (p *PictureLossIndication) Unmarshal(rawPacket []byte) error {
if len(rawPacket) < (headerLength + (ssrcLength * 2)) {
return errPacketTooShort
}
var h Header
if err := h.Unmarshal(rawPacket); err != nil {
return err
}
if h.Type != TypePayloadSpecificFeedback || h.Count != pliFMT {
return errWrongType
}
p.SenderSSRC = binary.BigEndian.Uint32(rawPacket[headerLength:])
p.MediaSSRC = binary.BigEndian.Uint32(rawPacket[headerLength+ssrcLength:])
return nil
}

View File

@@ -61,6 +61,7 @@ func (r ReceiverReport) Marshal() ([]byte, error) {
rawPacket = append(rawPacket, data...) rawPacket = append(rawPacket, data...)
} }
if len(r.Reports) > countMax { if len(r.Reports) > countMax {
return nil, errTooManyReports return nil, errTooManyReports
} }
@@ -125,7 +126,7 @@ func (r *ReceiverReport) Unmarshal(rawPacket []byte) error {
r.SSRC = binary.BigEndian.Uint32(rawPacket[rrSSRCOffset:]) r.SSRC = binary.BigEndian.Uint32(rawPacket[rrSSRCOffset:])
for i := rrReportOffset; i < len(rawPacket); i += receptionReportLength { for i := rrReportOffset; i < len(rawPacket) && len(r.Reports) < int(h.Count); i += receptionReportLength {
var rr ReceptionReport var rr ReceptionReport
if err := rr.Unmarshal(rawPacket[i:]); err != nil { if err := rr.Unmarshal(rawPacket[i:]); err != nil {
return err return err

View File

@@ -1008,7 +1008,7 @@ func (pc *RTCPeerConnection) Close() error {
} }
/* Everything below is private */ /* Everything below is private */
func (pc *RTCPeerConnection) generateChannel(ssrc uint32, payloadType uint8) (buffers chan<- *rtp.Packet) { func (pc *RTCPeerConnection) generateChannel(ssrc uint32, payloadType uint8) (tpair *network.Transportpair) {
pc.RLock() pc.RLock()
if pc.onTrackHandler == nil { if pc.onTrackHandler == nil {
pc.RUnlock() pc.RUnlock()
@@ -1028,7 +1028,8 @@ func (pc *RTCPeerConnection) generateChannel(ssrc uint32, payloadType uint8) (bu
return nil return nil
} }
bufferTransport := make(chan *rtp.Packet, 15) rtpTransport := make(chan *rtp.Packet, 15)
rtcpTransport := make(chan *rtcp.PacketWithHeader, 15)
track := &RTCTrack{ track := &RTCTrack{
PayloadType: payloadType, PayloadType: payloadType,
@@ -1037,13 +1038,14 @@ func (pc *RTCPeerConnection) generateChannel(ssrc uint32, payloadType uint8) (bu
Label: "", // TODO extract from remoteDescription Label: "", // TODO extract from remoteDescription
Ssrc: ssrc, Ssrc: ssrc,
Codec: codec, Codec: codec,
Packets: bufferTransport, Packets: rtpTransport,
RtcpPackets: rtcpTransport,
} }
// TODO: Register the receiving Track // TODO: Register the receiving Track
pc.onTrack(track) pc.onTrack(track)
return bufferTransport return &network.Transportpair{rtpTransport, rtcpTransport}
} }
func (pc *RTCPeerConnection) iceStateChange(newState ice.ConnectionState) { func (pc *RTCPeerConnection) iceStateChange(newState ice.ConnectionState) {
@@ -1200,6 +1202,7 @@ func (pc *RTCPeerConnection) newRTCTrack(payloadType uint8, ssrc uint32, id, lab
trackInput := make(chan media.RTCSample, 15) // Is the buffering needed? trackInput := make(chan media.RTCSample, 15) // Is the buffering needed?
rawPackets := make(chan *rtp.Packet) rawPackets := make(chan *rtp.Packet)
rtcpPackets := make(chan *rtcp.PacketWithHeader)
if ssrc == 0 { if ssrc == 0 {
buf := make([]byte, 4) buf := make([]byte, 4)
_, err = rand.Read(buf) _, err = rand.Read(buf)
@@ -1246,10 +1249,13 @@ func (pc *RTCPeerConnection) newRTCTrack(payloadType uint8, ssrc uint32, id, lab
Label: label, Label: label,
Ssrc: ssrc, Ssrc: ssrc,
Codec: codec, Codec: codec,
RtcpPackets: rtcpPackets,
Samples: trackInput, Samples: trackInput,
RawRTP: rawPackets, RawRTP: rawPackets,
} }
pc.networkManager.AddTransportPair(ssrc, nil, rtcpPackets)
return t, nil return t, nil
} }