Finish RTCRtpReceiver and RTCRtpSender

Allow sending of media via ORTC, examples will be added in the future.
But for now all of this state has been pulled out of RTCPeerConnection

Resolves #312
This commit is contained in:
Sean DuBois
2019-01-26 21:47:05 -08:00
parent 3c37b6c4b4
commit 56a5827d5c
7 changed files with 456 additions and 353 deletions

View File

@@ -13,9 +13,15 @@ import (
"github.com/pions/dtls" "github.com/pions/dtls"
"github.com/pions/webrtc/internal/mux" "github.com/pions/webrtc/internal/mux"
"github.com/pions/webrtc/internal/srtp"
"github.com/pions/webrtc/pkg/rtcerr" "github.com/pions/webrtc/pkg/rtcerr"
) )
const (
srtpMasterKeyLen = 16
srtpMasterKeySaltLen = 14
)
// RTCDtlsTransport allows an application access to information about the DTLS // RTCDtlsTransport allows an application access to information about the DTLS
// transport over which RTP and RTCP packets are sent and received by // transport over which RTP and RTCP packets are sent and received by
// RTCRtpSender and RTCRtpReceiver, as well other data such as SCTP packets sent // RTCRtpSender and RTCRtpReceiver, as well other data such as SCTP packets sent
@@ -31,13 +37,23 @@ type RTCDtlsTransport struct {
// OnError func() // OnError func()
conn *dtls.Conn conn *dtls.Conn
srtpSession *srtp.SessionSRTP
srtpEndpoint *mux.Endpoint
srtcpSession *srtp.SessionSRTCP
srtcpEndpoint *mux.Endpoint
} }
// NewRTCDtlsTransport creates a new RTCDtlsTransport. // NewRTCDtlsTransport creates a new RTCDtlsTransport.
// This constructor is part of the ORTC API. It is not // This constructor is part of the ORTC API. It is not
// meant to be used together with the basic WebRTC API. // meant to be used together with the basic WebRTC API.
func (api *API) NewRTCDtlsTransport(transport *RTCIceTransport, certificates []RTCCertificate) (*RTCDtlsTransport, error) { func (api *API) NewRTCDtlsTransport(transport *RTCIceTransport, certificates []RTCCertificate) (*RTCDtlsTransport, error) {
t := &RTCDtlsTransport{iceTransport: transport} t := &RTCDtlsTransport{
iceTransport: transport,
srtpSession: srtp.CreateSessionSRTP(),
srtcpSession: srtp.CreateSessionSRTCP(),
}
if len(certificates) > 0 { if len(certificates) > 0 {
now := time.Now() now := time.Now()
@@ -132,7 +148,64 @@ func (t *RTCDtlsTransport) Start(remoteParameters RTCDtlsParameters) error {
fmt.Println("Warning: Certificate not checked") fmt.Println("Warning: Certificate not checked")
} }
return nil return t.startSRTP(isClient)
}
// Start SRTP transport
// RTCDtlsTransport needs to own SRTP state to satisfy ORTC interfaces
func (t *RTCDtlsTransport) startSRTP(isClient bool) error {
keyingMaterial, err := t.conn.ExportKeyingMaterial([]byte("EXTRACTOR-dtls_srtp"), nil, (srtpMasterKeyLen*2)+(srtpMasterKeySaltLen*2))
if err != nil {
return err
}
t.srtpEndpoint = t.iceTransport.mux.NewEndpoint(mux.MatchSRTP)
t.srtcpEndpoint = t.iceTransport.mux.NewEndpoint(mux.MatchSRTCP)
offset := 0
clientWriteKey := append([]byte{}, keyingMaterial[offset:offset+srtpMasterKeyLen]...)
offset += srtpMasterKeyLen
serverWriteKey := append([]byte{}, keyingMaterial[offset:offset+srtpMasterKeyLen]...)
offset += srtpMasterKeyLen
clientWriteKey = append(clientWriteKey, keyingMaterial[offset:offset+srtpMasterKeySaltLen]...)
offset += srtpMasterKeySaltLen
serverWriteKey = append(serverWriteKey, keyingMaterial[offset:offset+srtpMasterKeySaltLen]...)
if !isClient {
err = t.srtpSession.Start(
serverWriteKey[0:16], serverWriteKey[16:],
clientWriteKey[0:16], clientWriteKey[16:],
srtp.ProtectionProfileAes128CmHmacSha1_80, t.srtpEndpoint,
)
if err == nil {
err = t.srtcpSession.Start(
serverWriteKey[0:16], serverWriteKey[16:],
clientWriteKey[0:16], clientWriteKey[16:],
srtp.ProtectionProfileAes128CmHmacSha1_80, t.srtcpEndpoint,
)
}
} else {
err = t.srtpSession.Start(
clientWriteKey[0:16], clientWriteKey[16:],
serverWriteKey[0:16], serverWriteKey[16:],
srtp.ProtectionProfileAes128CmHmacSha1_80, t.srtpEndpoint,
)
if err == nil {
err = t.srtcpSession.Start(
clientWriteKey[0:16], clientWriteKey[16:],
serverWriteKey[0:16], serverWriteKey[16:],
srtp.ProtectionProfileAes128CmHmacSha1_80, t.srtcpEndpoint,
)
}
}
return err
} }
func (t *RTCDtlsTransport) validateFingerPrint(remoteParameters RTCDtlsParameters, remoteCert *x509.Certificate) error { func (t *RTCDtlsTransport) validateFingerPrint(remoteParameters RTCDtlsParameters, remoteCert *x509.Certificate) error {

View File

@@ -5,22 +5,18 @@ import (
"crypto/ecdsa" "crypto/ecdsa"
"crypto/elliptic" "crypto/elliptic"
"crypto/rand" "crypto/rand"
"encoding/binary"
"fmt" "fmt"
"net" "net"
"strconv"
"strings" "strings"
"sync" "sync"
"time" "time"
"github.com/pions/sdp" "github.com/pions/sdp"
"github.com/pions/webrtc/internal/mux"
"github.com/pions/webrtc/internal/srtp"
"github.com/pions/webrtc/pkg/ice" "github.com/pions/webrtc/pkg/ice"
"github.com/pions/webrtc/pkg/logging" "github.com/pions/webrtc/pkg/logging"
"github.com/pions/webrtc/pkg/media"
"github.com/pions/webrtc/pkg/rtcerr" "github.com/pions/webrtc/pkg/rtcerr"
"github.com/pions/webrtc/pkg/rtcp" "github.com/pions/webrtc/pkg/rtcp"
"github.com/pions/webrtc/pkg/rtp"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
@@ -33,9 +29,6 @@ const (
unknownStr = "unknown" unknownStr = "unknown"
receiveMTU = 8192 receiveMTU = 8192
srtpMasterKeyLen = 16
srtpMasterKeySaltLen = 14
) )
// RTCPeerConnection represents a WebRTC connection that establishes a // RTCPeerConnection represents a WebRTC connection that establishes a
@@ -118,12 +111,6 @@ type RTCPeerConnection struct {
dtlsTransport *RTCDtlsTransport dtlsTransport *RTCDtlsTransport
sctpTransport *RTCSctpTransport sctpTransport *RTCSctpTransport
srtpSession *srtp.SessionSRTP
srtpEndpoint *mux.Endpoint
srtcpSession *srtp.SessionSRTCP
srtcpEndpoint *mux.Endpoint
// A reference to the associated API state used by this connection // A reference to the associated API state used by this connection
api *API api *API
} }
@@ -133,7 +120,7 @@ func (api *API) NewRTCPeerConnection(configuration RTCConfiguration) (*RTCPeerCo
// https://w3c.github.io/webrtc-pc/#constructor (Step #2) // https://w3c.github.io/webrtc-pc/#constructor (Step #2)
// Some variables defined explicitly despite their implicit zero values to // Some variables defined explicitly despite their implicit zero values to
// allow better readability to understand what is happening. // allow better readability to understand what is happening.
pc := RTCPeerConnection{ pc := &RTCPeerConnection{
configuration: RTCConfiguration{ configuration: RTCConfiguration{
IceServers: []RTCIceServer{}, IceServers: []RTCIceServer{},
IceTransportPolicy: RTCIceTransportPolicyAll, IceTransportPolicy: RTCIceTransportPolicyAll,
@@ -153,9 +140,6 @@ func (api *API) NewRTCPeerConnection(configuration RTCConfiguration) (*RTCPeerCo
ConnectionState: RTCPeerConnectionStateNew, ConnectionState: RTCPeerConnectionStateNew,
dataChannels: make(map[uint16]*RTCDataChannel), dataChannels: make(map[uint16]*RTCDataChannel),
srtpSession: srtp.CreateSessionSRTP(),
srtcpSession: srtp.CreateSessionSRTCP(),
api: api, api: api,
} }
@@ -177,7 +161,18 @@ func (api *API) NewRTCPeerConnection(configuration RTCConfiguration) (*RTCPeerCo
return nil, err return nil, err
} }
return &pc, nil // Create the ice transport
iceTransport := pc.createICETransport()
pc.iceTransport = iceTransport
// Create the DTLS transport
dtlsTransport, err := pc.createDTLSTransport()
if err != nil {
return nil, err
}
pc.dtlsTransport = dtlsTransport
return pc, nil
} }
// initConfiguration defines validation of the specified RTCConfiguration and // initConfiguration defines validation of the specified RTCConfiguration and
@@ -744,10 +739,6 @@ func (pc *RTCPeerConnection) SetRemoteDescription(desc RTCSessionDescription) er
weOffer = false weOffer = false
} }
// Create the ice transport
iceTransport := pc.createICETransport()
pc.iceTransport = iceTransport
for _, m := range pc.RemoteDescription().parsed.MediaDescriptions { for _, m := range pc.RemoteDescription().parsed.MediaDescriptions {
for _, a := range m.Attributes { for _, a := range m.Attributes {
if a.IsICECandidate() { if a.IsICECandidate() {
@@ -772,13 +763,6 @@ func (pc *RTCPeerConnection) SetRemoteDescription(desc RTCSessionDescription) er
} }
} }
// Create the DTLS transport
dtlsTransport, err := pc.createDTLSTransport()
if err != nil {
return err
}
pc.dtlsTransport = dtlsTransport
fingerprint, ok := desc.parsed.Attribute("fingerprint") fingerprint, ok := desc.parsed.Attribute("fingerprint")
if !ok { if !ok {
fingerprint, ok = desc.parsed.MediaDescriptions[0].Attribute("fingerprint") fingerprint, ok = desc.parsed.MediaDescriptions[0].Attribute("fingerprint")
@@ -844,19 +828,12 @@ func (pc *RTCPeerConnection) SetRemoteDescription(desc RTCSessionDescription) er
return return
} }
pc.srtpEndpoint = pc.iceTransport.mux.NewEndpoint(mux.MatchSRTP) if pc.onTrackHandler != nil {
pc.srtcpEndpoint = pc.iceTransport.mux.NewEndpoint(mux.MatchSRTCP) pc.openSRTP()
} else {
err = pc.startSRTP(weOffer) pcLog.Warnf("OnTrack unset, unable to handle incoming media streams")
if err != nil {
// TODO: Handle error
pcLog.Warnf("Failed to start RTP: %s", err)
return
} }
go pc.acceptSRTP()
go pc.drainSRTCP()
// Start sctp // Start sctp
err = pc.sctpTransport.Start(RTCSctpCapabilities{ err = pc.sctpTransport.Start(RTCSctpCapabilities{
MaxMessageSize: 0, MaxMessageSize: 0,
@@ -885,56 +862,65 @@ func (pc *RTCPeerConnection) openDataChannels() {
} }
} }
// startSRTP initializes all the cryptographic context needed for encrypted RTP // openSRTP opens knows inbound SRTP streams from the RemoteDescription
func (pc *RTCPeerConnection) startSRTP(isOffer bool) error { func (pc *RTCPeerConnection) openSRTP() {
keyingMaterial, err := pc.dtlsTransport.conn.ExportKeyingMaterial([]byte("EXTRACTOR-dtls_srtp"), nil, (srtpMasterKeyLen*2)+(srtpMasterKeySaltLen*2)) incomingSSRCes := map[uint32]RTCRtpCodecType{}
if err != nil {
return err
}
offset := 0 for _, media := range pc.RemoteDescription().parsed.MediaDescriptions {
clientWriteKey := append([]byte{}, keyingMaterial[offset:offset+srtpMasterKeyLen]...) for _, attr := range media.Attributes {
offset += srtpMasterKeyLen var codecType RTCRtpCodecType
if media.MediaName.Media == "audio" {
serverWriteKey := append([]byte{}, keyingMaterial[offset:offset+srtpMasterKeyLen]...) codecType = RTCRtpCodecTypeAudio
offset += srtpMasterKeyLen } else if media.MediaName.Media == "video" {
codecType = RTCRtpCodecTypeVideo
clientWriteKey = append(clientWriteKey, keyingMaterial[offset:offset+srtpMasterKeySaltLen]...)
offset += srtpMasterKeySaltLen
serverWriteKey = append(serverWriteKey, keyingMaterial[offset:offset+srtpMasterKeySaltLen]...)
if isOffer {
err = pc.srtpSession.Start(
serverWriteKey[0:16], serverWriteKey[16:],
clientWriteKey[0:16], clientWriteKey[16:],
srtp.ProtectionProfileAes128CmHmacSha1_80, pc.srtpEndpoint,
)
if err == nil {
err = pc.srtcpSession.Start(
serverWriteKey[0:16], serverWriteKey[16:],
clientWriteKey[0:16], clientWriteKey[16:],
srtp.ProtectionProfileAes128CmHmacSha1_80, pc.srtcpEndpoint,
)
}
} else { } else {
err = pc.srtpSession.Start( continue
clientWriteKey[0:16], clientWriteKey[16:], }
serverWriteKey[0:16], serverWriteKey[16:],
srtp.ProtectionProfileAes128CmHmacSha1_80, pc.srtpEndpoint,
)
if err == nil { if attr.Key == sdp.AttrKeySsrc {
err = pc.srtcpSession.Start( ssrc, err := strconv.ParseUint(strings.Split(attr.Value, " ")[0], 10, 32)
clientWriteKey[0:16], clientWriteKey[16:], if err != nil {
serverWriteKey[0:16], serverWriteKey[16:], pcLog.Warnf("Failed to parse SSRC: %v", err)
srtp.ProtectionProfileAes128CmHmacSha1_80, pc.srtcpEndpoint, continue
) }
incomingSSRCes[uint32(ssrc)] = codecType
}
} }
} }
return err for i := range incomingSSRCes {
go func(ssrc uint32, codecType RTCRtpCodecType) {
receiver := NewRTCRtpReceiver(codecType, pc.dtlsTransport)
<-receiver.Receive(RTCRtpReceiveParameters{
encodings: RTCRtpDecodingParameters{
RTCRtpCodingParameters{SSRC: ssrc},
}})
sdpCodec, err := pc.CurrentLocalDescription.parsed.GetCodecForPayloadType(receiver.Track.PayloadType)
if err != nil {
pcLog.Warnf("no codec could be found in RemoteDescription for payloadType %d", receiver.Track.PayloadType)
return
}
codec, err := pc.api.mediaEngine.getCodecSDP(sdpCodec)
if err != nil {
pcLog.Warnf("codec %s in not registered", sdpCodec)
return
}
receiver.Track.Kind = codec.Type
receiver.Track.Codec = codec
pc.newRTCRtpTransceiver(
receiver,
nil,
RTCRtpTransceiverDirectionRecvonly,
)
pc.onTrack(receiver.Track)
}(i, incomingSSRCes[i])
}
} }
// drainSRTCP pulls and discards RTCP packets that don't match any SRTP // drainSRTCP pulls and discards RTCP packets that don't match any SRTP
@@ -943,24 +929,22 @@ func (pc *RTCPeerConnection) startSRTP(isOffer bool) error {
// and provides useful debugging messages // and provides useful debugging messages
func (pc *RTCPeerConnection) drainSRTCP() { func (pc *RTCPeerConnection) drainSRTCP() {
for { for {
r, ssrc, err := pc.srtcpSession.AcceptStream() r, ssrc, err := pc.dtlsTransport.srtcpSession.AcceptStream()
if err != nil { if err != nil {
pcLog.Warnf("Failed to accept RTCP %v \n", err) pcLog.Warnf("Failed to accept RTCP %v \n", err)
return return
} }
go func() { go func() {
var rtcpPacket rtcp.Packet
for {
rtcpBuf := make([]byte, receiveMTU) rtcpBuf := make([]byte, receiveMTU)
for {
i, err := r.Read(rtcpBuf) i, err := r.Read(rtcpBuf)
if err != nil { if err != nil {
pcLog.Warnf("Failed to read, RTCTrack done for: %v %d \n", err, ssrc) pcLog.Warnf("Failed to read, drainSRTCP done for: %v %d \n", err, ssrc)
return return
} }
rtcpPacket, _, err = rtcp.Unmarshal(rtcpBuf[:i]) rtcpPacket, _, err := rtcp.Unmarshal(rtcpBuf[:i])
if err != nil { if err != nil {
pcLog.Warnf("Failed to unmarshal RTCP packet, discarding: %v \n", err) pcLog.Warnf("Failed to unmarshal RTCP packet, discarding: %v \n", err)
continue continue
@@ -972,77 +956,6 @@ func (pc *RTCPeerConnection) drainSRTCP() {
} }
} }
// TODO: Move to RTCRTpSender?
func (pc *RTCPeerConnection) acceptSRTP() {
for {
r, ssrc, err := pc.srtpSession.AcceptStream()
if err != nil {
return
}
_, h, err := r.ReadRTP(make([]byte, receiveMTU))
if err != nil {
pcLog.Warnf("Failed to read, ignoring AcceptStream: %v \n", err)
continue
}
rtpChannel, rtcpChannel, err := pc.generateChannel(h)
if err != nil {
pcLog.Warnf("Failed to create output channels, ignoring AcceptStream: %v \n", err)
continue
}
// RTP
go func() {
for {
rtpBuf := make([]byte, receiveMTU)
rtpLen, h, err := r.ReadRTP(rtpBuf)
if err != nil {
pcLog.Warnf("Failed to read, RTCTrack done for: %v %d \n", err, ssrc)
return
}
select {
case rtpChannel <- &rtp.Packet{Header: *h, Raw: rtpBuf[:rtpLen], Payload: rtpBuf[h.PayloadOffset:rtpLen]}:
default:
}
}
}()
// RTCP
go func() {
readStream, err := pc.srtcpSession.OpenReadStream(ssrc)
if err != nil {
pcLog.Warnf("Failed to open RTCP ReadStream, RTCTrack done for: %v %d \n", err, ssrc)
return
}
for {
var (
rtcpPacket rtcp.Packet
rtcpLen int
)
rtcpBuf := make([]byte, receiveMTU)
rtcpLen, err = readStream.Read(rtcpBuf)
if err != nil {
pcLog.Warnf("Failed to read, RTCTrack done for: %v %d \n", err, ssrc)
return
}
rtcpPacket, _, err = rtcp.Unmarshal(rtcpBuf[:rtcpLen])
if err != nil {
pcLog.Warnf("Failed to unmarshal RTCP packet, discarding: %v \n", err)
continue
}
select {
case rtcpChannel <- rtcpPacket:
default:
}
}
}()
}
}
// RemoteDescription returns PendingRemoteDescription if it is not null and // RemoteDescription returns PendingRemoteDescription if it is not null and
// otherwise it returns CurrentRemoteDescription. This property is used to // otherwise it returns CurrentRemoteDescription. This property is used to
// determine if setRemoteDescription has already been called. // determine if setRemoteDescription has already been called.
@@ -1138,13 +1051,16 @@ func (pc *RTCPeerConnection) AddTrack(track *RTCTrack) (*RTCRtpSender, error) {
return nil, err return nil, err
} }
} else { } else {
var receiver *RTCRtpReceiver sender := NewRTCRtpSender(track, pc.dtlsTransport)
sender := newRTCRtpSender(track)
transceiver = pc.newRTCRtpTransceiver( transceiver = pc.newRTCRtpTransceiver(
receiver, nil,
sender, sender,
RTCRtpTransceiverDirectionSendonly, RTCRtpTransceiverDirectionSendonly,
) )
sender.Send(RTCRtpSendParameters{
encodings: RTCRtpEncodingParameters{
RTCRtpCodingParameters{SSRC: track.Ssrc, PayloadType: track.PayloadType},
}})
} }
transceiver.Mid = track.Kind.String() // TODO: Mid generation transceiver.Mid = track.Kind.String() // TODO: Mid generation
@@ -1332,7 +1248,7 @@ func (pc *RTCPeerConnection) SendRTCP(pkt rtcp.Packet) error {
return err return err
} }
writeStream, err := pc.srtcpSession.OpenWriteStream() writeStream, err := pc.dtlsTransport.srtcpSession.OpenWriteStream()
if err != nil { if err != nil {
return fmt.Errorf("SendRTCP failed to open WriteStream: %v", err) return fmt.Errorf("SendRTCP failed to open WriteStream: %v", err)
} }
@@ -1382,13 +1298,13 @@ func (pc *RTCPeerConnection) Close() error {
// Conn if one of the endpoints is closed down. To // Conn if one of the endpoints is closed down. To
// continue the chain the Mux has to be closed. // continue the chain the Mux has to be closed.
if err := pc.srtpSession.Close(); err != nil { // if err := pc.srtpSession.Close(); err != nil {
closeErrs = append(closeErrs, err) // closeErrs = append(closeErrs, err)
} // }
if err := pc.srtcpSession.Close(); err != nil { // if err := pc.srtcpSession.Close(); err != nil {
closeErrs = append(closeErrs, err) // closeErrs = append(closeErrs, err)
} // }
if pc.sctpTransport != nil { if pc.sctpTransport != nil {
if err := pc.sctpTransport.Stop(); err != nil { if err := pc.sctpTransport.Stop(); err != nil {
@@ -1426,44 +1342,6 @@ func flattenErrs(errs []error) error {
return fmt.Errorf(strings.Join(errstrings, "\n")) return fmt.Errorf(strings.Join(errstrings, "\n"))
} }
/* Everything below is private */
func (pc *RTCPeerConnection) generateChannel(h *rtp.Header) (chan *rtp.Packet, chan rtcp.Packet, error) {
pc.RLock()
if pc.onTrackHandler == nil {
pc.RUnlock()
return nil, nil, fmt.Errorf("OnTrack unset, unable to handle incoming")
}
pc.RUnlock()
sdpCodec, err := pc.CurrentLocalDescription.parsed.GetCodecForPayloadType(h.PayloadType)
if err != nil {
return nil, nil, fmt.Errorf("no codec could be found in RemoteDescription for payloadType %d", h.PayloadType)
}
codec, err := pc.api.mediaEngine.getCodecSDP(sdpCodec)
if err != nil {
return nil, nil, fmt.Errorf("codec %s in not registered", sdpCodec)
}
rtpTransport := make(chan *rtp.Packet, 15)
rtcpTransport := make(chan rtcp.Packet, 15)
track := &RTCTrack{
PayloadType: h.PayloadType,
Kind: codec.Type,
ID: "0", // TODO extract from remoteDescription
Label: "", // TODO extract from remoteDescription
Ssrc: h.SSRC,
Codec: codec,
Packets: rtpTransport,
RTCPPackets: rtcpTransport,
}
// TODO: Register the receiving Track
pc.onTrack(track)
return rtpTransport, rtcpTransport, nil
}
func (pc *RTCPeerConnection) iceStateChange(newState ice.ConnectionState) { func (pc *RTCPeerConnection) iceStateChange(newState ice.ConnectionState) {
pc.Lock() pc.Lock()
pc.IceConnectionState = newState pc.IceConnectionState = newState
@@ -1568,20 +1446,17 @@ func (pc *RTCPeerConnection) addDataMediaSection(d *sdp.SessionDescription, midV
d.WithMedia(media) d.WithMedia(media)
} }
// TODO RTCRtpSender // NewRawRTPTrack Creates a new RTCTrack
func (pc *RTCPeerConnection) sendRTP(packet *rtp.Packet) { //
writeStream, err := pc.srtpSession.OpenWriteStream() // See NewRTCSampleTrack for documentation
if err != nil { func (pc *RTCPeerConnection) NewRawRTPTrack(payloadType uint8, ssrc uint32, id, label string) (*RTCTrack, error) {
pcLog.Warnf("SendRTP failed to open WriteStream: %v", err) return NewRawRTPTrack(payloadType, ssrc, id, label)
return
}
if _, err := writeStream.WriteRTP(&packet.Header, packet.Payload); err != nil {
pcLog.Warnf("SendRTP failed to write: %v", err)
}
} }
func (pc *RTCPeerConnection) newRTCTrack(payloadType uint8, ssrc uint32, id, label string) (*RTCTrack, error) { // NewRTCSampleTrack Creates a new RTCTrack
//
// See NewRTCSampleTrack for documentation
func (pc *RTCPeerConnection) NewRTCSampleTrack(payloadType uint8, id, label string) (*RTCTrack, error) {
codec, err := pc.api.mediaEngine.getCodec(payloadType) codec, err := pc.api.mediaEngine.getCodec(payloadType)
if err != nil { if err != nil {
return nil, err return nil, err
@@ -1589,118 +1464,7 @@ func (pc *RTCPeerConnection) newRTCTrack(payloadType uint8, ssrc uint32, id, lab
return nil, errors.New("codec payloader not set") return nil, errors.New("codec payloader not set")
} }
trackInput := make(chan media.RTCSample, 15) // Is the buffering needed? return NewRTCSampleTrack(payloadType, id, label, codec)
rawPackets := make(chan *rtp.Packet, 15) // Is the buffering needed?
rtcpPackets := make(chan rtcp.Packet, 15) // Is the buffering needed?
isRawRTP := false
if ssrc == 0 {
buf := make([]byte, 4)
_, err = rand.Read(buf)
if err != nil {
return nil, errors.New("failed to generate random value")
}
ssrc = binary.LittleEndian.Uint32(buf)
go func() {
packetizer := rtp.NewPacketizer(
1400,
payloadType,
ssrc,
codec.Payloader,
rtp.NewRandomSequencer(),
codec.ClockRate,
)
for {
in, ok := <-trackInput
if !ok {
return
}
packets := packetizer.Packetize(in.Data, in.Samples)
for _, p := range packets {
pc.sendRTP(p)
}
}
}()
close(rawPackets)
} else {
// If SSRC is not 0, then we are working with an established RTP stream
// and need to accept raw RTP packets for forwarding.
isRawRTP = true
go func() {
for {
p, ok := <-rawPackets
if !ok {
return
}
pc.sendRTP(p)
}
}()
close(trackInput)
}
t := &RTCTrack{
isRawRTP: isRawRTP,
PayloadType: payloadType,
Kind: codec.Type,
ID: id,
Label: label,
Ssrc: ssrc,
Codec: codec,
RTCPPackets: rtcpPackets,
Samples: trackInput,
RawRTP: rawPackets,
}
// Inbound RTCP
go func() {
readStream, err := pc.srtcpSession.OpenReadStream(ssrc)
if err != nil {
pcLog.Warnf("Failed to open RTCP ReadStream, RTCTrack done for: %v %d \n", err, ssrc)
return
}
var rtcpPacket rtcp.Packet
for {
rtcpBuf := make([]byte, receiveMTU)
i, err := readStream.Read(rtcpBuf)
if err != nil {
pcLog.Warnf("Failed to read, RTCTrack done for: %v %d \n", err, ssrc)
return
}
rtcpPacket, _, err = rtcp.Unmarshal(rtcpBuf[:i])
if err != nil {
pcLog.Warnf("Failed to unmarshal RTCP packet, discarding: %v \n", err)
continue
}
select {
case rtcpPackets <- rtcpPacket:
default:
}
}
}()
return t, nil
}
// NewRawRTPTrack initializes a new *RTCTrack configured to accept raw *rtp.Packet
//
// NB: If the source RTP stream is being broadcast to multiple tracks, each track
// must receive its own copies of the source packets in order to avoid packet corruption.
func (pc *RTCPeerConnection) NewRawRTPTrack(payloadType uint8, ssrc uint32, id, label string) (*RTCTrack, error) {
if ssrc == 0 {
return nil, errors.New("SSRC supplied to NewRawRTPTrack() must be non-zero")
}
return pc.newRTCTrack(payloadType, ssrc, id, label)
}
// NewRTCSampleTrack initializes a new *RTCTrack configured to accept media.RTCSample
func (pc *RTCPeerConnection) NewRTCSampleTrack(payloadType uint8, id, label string) (*RTCTrack, error) {
return pc.newRTCTrack(payloadType, 0, id, label)
} }
// NewRTCTrack is used to create a new RTCTrack // NewRTCTrack is used to create a new RTCTrack

View File

@@ -0,0 +1,6 @@
package webrtc
// RTCRtpReceiveParameters contains the RTP stack settings used by receivers
type RTCRtpReceiveParameters struct {
encodings RTCRtpDecodingParameters
}

View File

@@ -1,14 +1,113 @@
package webrtc package webrtc
import (
"github.com/pions/webrtc/pkg/rtcp"
"github.com/pions/webrtc/pkg/rtp"
)
// RTCRtpReceiver allows an application to inspect the receipt of a RTCTrack // RTCRtpReceiver allows an application to inspect the receipt of a RTCTrack
type RTCRtpReceiver struct { type RTCRtpReceiver struct {
kind RTCRtpCodecType
transport *RTCDtlsTransport
hasRecv chan bool
Track *RTCTrack Track *RTCTrack
// receiverTrack *RTCTrack
// receiverTransport rtpOut chan *rtp.Packet
// receiverRtcpTransport rtcpOut chan rtcp.Packet
} }
// TODO: receiving side // NewRTCRtpReceiver constructs a new RTCRtpReceiver
// func newRTCRtpReceiver(kind, id string) { func NewRTCRtpReceiver(kind RTCRtpCodecType, transport *RTCDtlsTransport) *RTCRtpReceiver {
// return &RTCRtpReceiver{
// } kind: kind,
transport: transport,
hasRecv: make(chan bool),
rtpOut: make(chan *rtp.Packet, 15),
rtcpOut: make(chan rtcp.Packet, 15),
}
}
// Receive blocks until the RTCTrack is available
func (r *RTCRtpReceiver) Receive(parameters RTCRtpReceiveParameters) chan bool {
// TODO atomic only allow this to fire once
r.Track = &RTCTrack{
Kind: r.kind,
Ssrc: parameters.encodings.SSRC,
Packets: r.rtpOut,
RTCPPackets: r.rtcpOut,
}
// RTP ReadLoop
go func() {
payloadSet := false
defer func() {
if !payloadSet {
close(r.hasRecv)
}
}()
readStream, err := r.transport.srtpSession.OpenReadStream(parameters.encodings.SSRC)
if err != nil {
pcLog.Warnf("Failed to open RTCP ReadStream, RTCTrack done for: %v %d \n", err, parameters.encodings.SSRC)
return
}
readBuf := make([]byte, receiveMTU)
for {
rtpLen, err := readStream.Read(readBuf)
if err != nil {
pcLog.Warnf("Failed to read, RTCTrack done for: %v %d \n", err, parameters.encodings.SSRC)
return
}
var rtpPacket rtp.Packet
if err = rtpPacket.Unmarshal(append([]byte{}, readBuf[:rtpLen]...)); err != nil {
pcLog.Warnf("Failed to unmarshal RTP packet, discarding: %v \n", err)
continue
}
if !payloadSet {
r.Track.PayloadType = rtpPacket.PayloadType
payloadSet = true
close(r.hasRecv)
}
select {
case r.rtpOut <- &rtpPacket:
default:
}
}
}()
// RTCP ReadLoop
go func() {
readStream, err := r.transport.srtcpSession.OpenReadStream(parameters.encodings.SSRC)
if err != nil {
pcLog.Warnf("Failed to open RTCP ReadStream, RTCTrack done for: %v %d \n", err, parameters.encodings.SSRC)
return
}
readBuf := make([]byte, receiveMTU)
for {
rtcpLen, err := readStream.Read(readBuf)
if err != nil {
pcLog.Warnf("Failed to read, RTCTrack done for: %v %d \n", err, parameters.encodings.SSRC)
return
}
rtcpPacket, _, err := rtcp.Unmarshal(append([]byte{}, readBuf[:rtcpLen]...))
if err != nil {
pcLog.Warnf("Failed to unmarshal RTCP packet, discarding: %v \n", err)
continue
}
select {
case r.rtcpOut <- rtcpPacket:
default:
}
}
}()
return r.hasRecv
}

View File

@@ -1,16 +1,122 @@
package webrtc package webrtc
import (
"github.com/pions/webrtc/pkg/media"
"github.com/pions/webrtc/pkg/rtcp"
"github.com/pions/webrtc/pkg/rtp"
)
const rtpOutboundMTU = 1400
// RTCRtpSender allows an application to control how a given RTCTrack is encoded and transmitted to a remote peer // RTCRtpSender allows an application to control how a given RTCTrack is encoded and transmitted to a remote peer
type RTCRtpSender struct { type RTCRtpSender struct {
Track *RTCTrack Track *RTCTrack
// senderTrack *RTCTrack
// senderTransport transport *RTCDtlsTransport
// senderRtcpTransport
} }
func newRTCRtpSender(track *RTCTrack) *RTCRtpSender { // NewRTCRtpSender constructs a new RTCRtpSender
s := &RTCRtpSender{ func NewRTCRtpSender(track *RTCTrack, transport *RTCDtlsTransport) *RTCRtpSender {
return &RTCRtpSender{
Track: track, Track: track,
transport: transport,
}
}
// Send Attempts to set the parameters controlling the sending of media.
func (r *RTCRtpSender) Send(parameters RTCRtpSendParameters) {
sampleInput := make(chan media.RTCSample, 15) // Is the buffering needed?
rawInput := make(chan *rtp.Packet, 15) // Is the buffering needed?
rtcpInput := make(chan rtcp.Packet, 15) // Is the buffering needed?
r.Track.Samples = sampleInput
r.Track.RawRTP = rawInput
r.Track.RTCPPackets = rtcpInput
if r.Track.isRawRTP {
close(r.Track.Samples)
go r.handleRawRTP(r.transport, rawInput)
} else {
close(r.Track.RawRTP)
go r.handleSampleRTP(r.transport, sampleInput)
}
go r.handleRTCP(r.transport, rtcpInput)
}
func (r *RTCRtpSender) handleRawRTP(transport *RTCDtlsTransport, rtpPackets chan *rtp.Packet) {
for {
p, ok := <-rtpPackets
if !ok {
return
}
r.sendRTP(p)
}
}
func (r *RTCRtpSender) handleSampleRTP(transport *RTCDtlsTransport, rtpPackets chan media.RTCSample) {
packetizer := rtp.NewPacketizer(
rtpOutboundMTU,
r.Track.PayloadType,
r.Track.Ssrc,
r.Track.Codec.Payloader,
rtp.NewRandomSequencer(),
r.Track.Codec.ClockRate,
)
for {
in, ok := <-rtpPackets
if !ok {
return
}
packets := packetizer.Packetize(in.Data, in.Samples)
for _, p := range packets {
r.sendRTP(p)
}
}
}
func (r *RTCRtpSender) handleRTCP(transport *RTCDtlsTransport, rtcpPackets chan rtcp.Packet) {
readStream, err := transport.srtcpSession.OpenReadStream(r.Track.Ssrc)
if err != nil {
pcLog.Warnf("Failed to open RTCP ReadStream, RTCTrack done for: %v %d \n", err, r.Track.Ssrc)
return
}
var rtcpPacket rtcp.Packet
for {
rtcpBuf := make([]byte, receiveMTU)
i, err := readStream.Read(rtcpBuf)
if err != nil {
pcLog.Warnf("Failed to read, RTCTrack done for: %v %d \n", err, r.Track.Ssrc)
return
}
rtcpPacket, _, err = rtcp.Unmarshal(rtcpBuf[:i])
if err != nil {
pcLog.Warnf("Failed to unmarshal RTCP packet, discarding: %v \n", err)
continue
}
select {
case rtcpPackets <- rtcpPacket:
default:
}
}
}
func (r *RTCRtpSender) sendRTP(packet *rtp.Packet) {
writeStream, err := r.transport.srtpSession.OpenWriteStream()
if err != nil {
pcLog.Warnf("SendRTP failed to open WriteStream: %v", err)
return
}
if _, err := writeStream.WriteRTP(&packet.Header, packet.Payload); err != nil {
pcLog.Warnf("SendRTP failed to write: %v", err)
} }
return s
} }

6
rtcrtpsendparameters.go Normal file
View File

@@ -0,0 +1,6 @@
package webrtc
// RTCRtpSendParameters contains the RTP stack settings used by receivers
type RTCRtpSendParameters struct {
encodings RTCRtpEncodingParameters
}

View File

@@ -1,22 +1,71 @@
package webrtc package webrtc
import ( import (
"crypto/rand"
"encoding/binary"
"github.com/pions/webrtc/pkg/media" "github.com/pions/webrtc/pkg/media"
"github.com/pions/webrtc/pkg/rtcp" "github.com/pions/webrtc/pkg/rtcp"
"github.com/pions/webrtc/pkg/rtp" "github.com/pions/webrtc/pkg/rtp"
"github.com/pkg/errors"
) )
// RTCTrack represents a track that is communicated // RTCTrack represents a track that is communicated
type RTCTrack struct { type RTCTrack struct {
isRawRTP bool
ID string ID string
PayloadType uint8 PayloadType uint8
Kind RTCRtpCodecType Kind RTCRtpCodecType
Label string Label string
Ssrc uint32 Ssrc uint32
Codec *RTCRtpCodec Codec *RTCRtpCodec
Packets <-chan *rtp.Packet Packets <-chan *rtp.Packet
RTCPPackets <-chan rtcp.Packet RTCPPackets <-chan rtcp.Packet
Samples chan<- media.RTCSample Samples chan<- media.RTCSample
RawRTP chan<- *rtp.Packet RawRTP chan<- *rtp.Packet
isRawRTP bool }
// NewRawRTPTrack initializes a new *RTCTrack configured to accept raw *rtp.Packet
//
// NB: If the source RTP stream is being broadcast to multiple tracks, each track
// must receive its own copies of the source packets in order to avoid packet corruption.
func NewRawRTPTrack(payloadType uint8, ssrc uint32, id, label string) (*RTCTrack, error) {
if ssrc == 0 {
return nil, errors.New("SSRC supplied to NewRawRTPTrack() must be non-zero")
}
return &RTCTrack{
isRawRTP: true,
ID: id,
PayloadType: payloadType,
Label: label,
Ssrc: ssrc,
}, nil
}
// NewRTCSampleTrack initializes a new *RTCTrack configured to accept media.RTCSample
func NewRTCSampleTrack(payloadType uint8, id, label string, codec *RTCRtpCodec) (*RTCTrack, error) {
if codec == nil {
return nil, errors.New("codec supplied to NewRTCSampleTrack() must not be nil")
}
buf := make([]byte, 4)
if _, err := rand.Read(buf); err != nil {
return nil, errors.New("failed to generate random value")
}
return &RTCTrack{
isRawRTP: false,
ID: id,
PayloadType: payloadType,
Kind: codec.Type,
Label: label,
Ssrc: binary.LittleEndian.Uint32(buf),
Codec: codec,
}, nil
} }