api: match WebRTC api more closely

This commit is contained in:
backkem
2018-07-15 20:29:34 +02:00
committed by Sean DuBois
parent 2564609560
commit 7f682d2d2e
21 changed files with 2003 additions and 468 deletions

92
errors.go Normal file
View File

@@ -0,0 +1,92 @@
package webrtc
import (
"errors"
"fmt"
)
// Types of InvalidStateErrors
var (
ErrConnectionClosed = errors.New("connection closed")
)
// InvalidStateError indicates the object is in an invalid state.
type InvalidStateError struct {
Err error
}
func (e *InvalidStateError) Error() string {
return fmt.Sprintf("invalid state error: %v", e.Err)
}
// Types of UnknownErrors
var (
ErrNoConfig = errors.New("no configuration provided")
)
// UnknownError indicates the operation failed for an unknown transient reason
type UnknownError struct {
Err error
}
func (e *UnknownError) Error() string {
return fmt.Sprintf("unknown error: %v", e.Err)
}
// Types of InvalidAccessErrors
var (
ErrCertificateExpired = errors.New("certificate expired")
ErrNoTurnCred = errors.New("turn server credentials required")
ErrTurnCred = errors.New("invalid turn server credentials")
ErrExistingTrack = errors.New("track aready exists")
)
// InvalidAccessError indicates the object does not support the operation or argument.
type InvalidAccessError struct {
Err error
}
func (e *InvalidAccessError) Error() string {
return fmt.Sprintf("invalid access error: %v", e.Err)
}
// Types of NotSupportedErrors
var ()
// NotSupportedError indicates the operation is not supported.
type NotSupportedError struct {
Err error
}
func (e *NotSupportedError) Error() string {
return fmt.Sprintf("not supported error: %v", e.Err)
}
// Types of InvalidModificationErrors
var (
ErrModPeerIdentity = errors.New("peer identity cannot be modified")
ErrModCertificates = errors.New("certificates cannot be modified")
ErrModRtcpMuxPolicy = errors.New("rtcp mux policy cannot be modified")
ErrModIceCandidatePoolSize = errors.New("ice candidate pool size cannot be modified")
)
// InvalidModificationError indicates the object can not be modified in this way.
type InvalidModificationError struct {
Err error
}
func (e *InvalidModificationError) Error() string {
return fmt.Sprintf("invalid modification error: %v", e.Err)
}
// Types of SyntaxErrors
var ()
// SyntaxError indicates the string did not match the expected pattern.
type SyntaxError struct {
Err error
}
func (e *SyntaxError) Error() string {
return fmt.Sprintf("syntax error: %v", e.Err)
}

View File

@@ -23,9 +23,9 @@ type Pipeline struct {
}
// CreatePipeline creates a GStreamer Pipeline
func CreatePipeline(codec webrtc.TrackType) *Pipeline {
func CreatePipeline(codecName string) *Pipeline {
pipelineStr := "appsrc format=time is-live=true do-timestamp=true name=src ! application/x-rtp"
switch codec {
switch codecName {
case webrtc.VP8:
pipelineStr += ", encoding-name=VP8-DRAFT-IETF-01 ! rtpvp8depay ! decodebin ! autovideosink"
case webrtc.Opus:
@@ -35,7 +35,7 @@ func CreatePipeline(codec webrtc.TrackType) *Pipeline {
case webrtc.H264:
pipelineStr += " ! rtph264depay ! decodebin ! autovideosink"
default:
panic("Unhandled codec " + codec.String())
panic("Unhandled codec " + codecName)
}
pipelineStrUnsafe := C.CString(pipelineStr)

View File

@@ -2,6 +2,7 @@ package main
import (
"fmt"
"io"
"os"
"bufio"
@@ -10,13 +11,12 @@ import (
"github.com/pions/webrtc"
"github.com/pions/webrtc/examples/gstreamer-receive/gst"
"github.com/pions/webrtc/pkg/ice"
"github.com/pions/webrtc/pkg/rtp"
)
func main() {
reader := bufio.NewReader(os.Stdin)
rawSd, err := reader.ReadString('\n')
if err != nil {
if err != nil && err != io.EOF {
panic(err)
}
@@ -28,20 +28,25 @@ func main() {
/* Everything below is the pion-WebRTC API, thanks for using it! */
// Setup the codecs you want to use.
// We'll use the default ones but you can also define your own
webrtc.RegisterDefaultCodecs()
// Create a new RTCPeerConnection
peerConnection, err := webrtc.New(&webrtc.RTCConfiguration{})
peerConnection, err := webrtc.New(webrtc.RTCConfiguration{})
if err != nil {
panic(err)
}
// Set a handler for when a new remote track starts, this handler creates a gstreamer pipeline
// for the given codec
peerConnection.Ontrack = func(mediaType webrtc.TrackType, packets <-chan *rtp.Packet) {
fmt.Printf("Track has started, of type %s \n", mediaType.String())
pipeline := gst.CreatePipeline(mediaType)
peerConnection.Ontrack = func(track *webrtc.RTCTrack) {
codec := track.Codec
fmt.Printf("Track has started, of type %d: %s \n", track.PayloadType, codec.Name)
pipeline := gst.CreatePipeline(codec.Name)
pipeline.Start()
for {
p := <-packets
p := <-track.Packets
pipeline.Push(p.Raw)
}
}
@@ -53,17 +58,21 @@ func main() {
}
// Set the remote SessionDescription
if err := peerConnection.SetRemoteDescription(string(sd)); err != nil {
offer := webrtc.RTCSessionDescription{
Typ: webrtc.RTCSdpTypeOffer,
Sdp: string(sd),
}
if err := peerConnection.SetRemoteDescription(offer); err != nil {
panic(err)
}
// Sets the LocalDescription, and starts our UDP listeners
if err := peerConnection.CreateAnswer(); err != nil {
answer, err := peerConnection.CreateAnswer(nil)
if err != nil {
panic(err)
}
// Get the LocalDescription and take it to base64 so we can paste in browser
localDescriptionStr := peerConnection.LocalDescription.Marshal()
fmt.Println(base64.StdEncoding.EncodeToString([]byte(localDescriptionStr)))
fmt.Println(base64.StdEncoding.EncodeToString([]byte(answer.Sdp)))
select {}
}

View File

@@ -24,16 +24,16 @@ type Pipeline struct {
Pipeline *C.GstElement
in chan<- webrtc.RTCSample
id int
codec webrtc.TrackType
codecName string
}
var pipelines = make(map[int]*Pipeline)
var pipelinesLock sync.Mutex
// CreatePipeline creates a GStreamer Pipeline
func CreatePipeline(codec webrtc.TrackType, in chan<- webrtc.RTCSample) *Pipeline {
func CreatePipeline(codecName string, in chan<- webrtc.RTCSample) *Pipeline {
pipelineStr := "appsink name=appsink"
switch codec {
switch codecName {
case webrtc.VP8:
pipelineStr = "videotestsrc ! vp8enc ! " + pipelineStr
case webrtc.VP9:
@@ -43,7 +43,7 @@ func CreatePipeline(codec webrtc.TrackType, in chan<- webrtc.RTCSample) *Pipelin
case webrtc.Opus:
pipelineStr = "audiotestsrc ! opusenc ! " + pipelineStr
default:
panic("Unhandled codec " + codec.String())
panic("Unhandled codec " + codecName)
}
pipelineStrUnsafe := C.CString(pipelineStr)
@@ -56,7 +56,7 @@ func CreatePipeline(codec webrtc.TrackType, in chan<- webrtc.RTCSample) *Pipelin
Pipeline: C.gstreamer_send_create_pipeline(pipelineStrUnsafe),
in: in,
id: len(pipelines),
codec: codec,
codecName: codecName,
}
pipelines[pipeline.id] = pipeline
@@ -85,7 +85,7 @@ func goHandlePipelineBuffer(buffer unsafe.Pointer, bufferLen C.int, duration C.i
if pipeline, ok := pipelines[int(pipelineID)]; ok {
var samples uint32
if pipeline.codec == webrtc.Opus {
if pipeline.codecName == webrtc.Opus {
samples = uint32(audioClockRate * (float32(duration) / 1000000000))
} else {
samples = uint32(videoClockRate * (float32(duration) / 1000000000))

View File

@@ -2,6 +2,7 @@ package main
import (
"fmt"
"io"
"os"
"bufio"
@@ -15,7 +16,7 @@ import (
func main() {
reader := bufio.NewReader(os.Stdin)
rawSd, err := reader.ReadString('\n')
if err != nil {
if err != nil && err != io.EOF {
panic(err)
}
@@ -27,46 +28,62 @@ func main() {
/* Everything below is the pion-WebRTC API, thanks for using it! */
// Setup the codecs you want to use.
// We'll use the default ones but you can also define your own
webrtc.RegisterDefaultCodecs()
// Create a new RTCPeerConnection
peerConnection, err := webrtc.New(&webrtc.RTCConfiguration{})
peerConnection, err := webrtc.New(webrtc.RTCConfiguration{})
if err != nil {
panic(err)
}
// Create a audio track
opusIn, err := peerConnection.AddTrack(webrtc.Opus, 48000)
if err != nil {
panic(err)
}
// Create a video track
vp8In, err := peerConnection.AddTrack(webrtc.VP8, 90000)
if err != nil {
panic(err)
}
// Set the remote SessionDescription
if err := peerConnection.SetRemoteDescription(string(sd)); err != nil {
panic(err)
}
// Sets the LocalDescription, and starts our UDP listeners
if err := peerConnection.CreateAnswer(); err != nil {
panic(err)
}
// Set the handler for ICE connection state
// This will notify you when the peer has connected/disconnected
peerConnection.OnICEConnectionStateChange = func(connectionState ice.ConnectionState) {
fmt.Printf("Connection State has changed %s \n", connectionState.String())
}
// Create a audio track
opusTrack, err := peerConnection.NewRTCTrack(webrtc.PayloadTypeOpus, "audio", "pions1")
if err != nil {
panic(err)
}
_, err = peerConnection.AddTrack(opusTrack)
if err != nil {
panic(err)
}
// Create a video track
vp8Track, err := peerConnection.NewRTCTrack(webrtc.PayloadTypeVP8, "video", "pions2")
if err != nil {
panic(err)
}
_, err = peerConnection.AddTrack(vp8Track)
if err != nil {
panic(err)
}
// Set the remote SessionDescription
offer := webrtc.RTCSessionDescription{
Typ: webrtc.RTCSdpTypeOffer,
Sdp: string(sd),
}
if err := peerConnection.SetRemoteDescription(offer); err != nil {
panic(err)
}
// Sets the LocalDescription, and starts our UDP listeners
answer, err := peerConnection.CreateAnswer(nil)
if err != nil {
panic(err)
}
// Get the LocalDescription and take it to base64 so we can paste in browser
localDescriptionStr := peerConnection.LocalDescription.Marshal()
fmt.Println(base64.StdEncoding.EncodeToString([]byte(localDescriptionStr)))
fmt.Println(base64.StdEncoding.EncodeToString([]byte(answer.Sdp)))
// Start pushing buffers on these tracks
gst.CreatePipeline(webrtc.Opus, opusIn).Start()
gst.CreatePipeline(webrtc.VP8, vp8In).Start()
gst.CreatePipeline(webrtc.Opus, opusTrack.Samples).Start()
gst.CreatePipeline(webrtc.VP8, vp8Track.Samples).Start()
select {}
}

View File

@@ -4,17 +4,17 @@ import (
"bufio"
"encoding/base64"
"fmt"
"io"
"os"
"github.com/pions/webrtc"
"github.com/pions/webrtc/pkg/ice"
"github.com/pions/webrtc/pkg/rtp"
)
func main() {
reader := bufio.NewReader(os.Stdin)
rawSd, err := reader.ReadString('\n')
if err != nil {
if err != nil && err != io.EOF {
panic(err)
}
@@ -26,8 +26,13 @@ func main() {
/* Everything below is the pion-WebRTC API, thanks for using it! */
// Setup the codecs you want to use.
// We'll use a VP8 codec but you can also define your own
webrtc.RegisterCodec(webrtc.NewRTCRtpOpusCodec(webrtc.PayloadTypeOpus, 48000, 2))
webrtc.RegisterCodec(webrtc.NewRTCRtpVP8Codec(webrtc.PayloadTypeVP8, 90000))
// Create a new RTCPeerConnection
peerConnection, err := webrtc.New(&webrtc.RTCConfiguration{})
peerConnection, err := webrtc.New(webrtc.RTCConfiguration{})
if err != nil {
panic(err)
}
@@ -35,15 +40,15 @@ func main() {
// Set a handler for when a new remote track starts, this handler saves buffers to disk as
// an ivf file, since we could have multiple video tracks we provide a counter.
// In your application this is where you would handle/process video
peerConnection.Ontrack = func(mediaType webrtc.TrackType, packets <-chan *rtp.Packet) {
if mediaType == webrtc.VP8 {
peerConnection.Ontrack = func(track *webrtc.RTCTrack) {
if track.Codec.Name == webrtc.VP8 {
fmt.Println("Got VP8 track, saving to disk as output.ivf")
i, err := newIVFWriter("output.ivf")
if err != nil {
panic(err)
}
for {
i.addPacket(<-packets)
i.addPacket(<-track.Packets)
}
}
}
@@ -55,17 +60,21 @@ func main() {
}
// Set the remote SessionDescription
if err := peerConnection.SetRemoteDescription(string(sd)); err != nil {
offer := webrtc.RTCSessionDescription{
Typ: webrtc.RTCSdpTypeOffer,
Sdp: string(sd),
}
if err := peerConnection.SetRemoteDescription(offer); err != nil {
panic(err)
}
// Sets the LocalDescription, and starts our UDP listeners
if err := peerConnection.CreateAnswer(); err != nil {
answer, err := peerConnection.CreateAnswer(nil)
if err != nil {
panic(err)
}
// Get the LocalDescription and take it to base64 so we can paste in browser
localDescriptionStr := peerConnection.LocalDescription.Marshal()
fmt.Println(base64.StdEncoding.EncodeToString([]byte(localDescriptionStr)))
fmt.Println(base64.StdEncoding.EncodeToString([]byte(answer.Sdp)))
select {}
}

View File

@@ -4,17 +4,17 @@ import (
"bufio"
"encoding/base64"
"fmt"
"io"
"os"
"github.com/pions/webrtc"
"github.com/pions/webrtc/pkg/ice"
"github.com/pions/webrtc/pkg/rtp"
)
func main() {
reader := bufio.NewReader(os.Stdin)
rawSd, err := reader.ReadString('\n')
if err != nil {
if err != nil && err != io.EOF {
panic(err)
}
@@ -24,7 +24,14 @@ func main() {
panic(err)
}
peerConnection, err := webrtc.New(&webrtc.RTCConfiguration{
/* Everything below is the pion-WebRTC API, thanks for using it! */
// Setup the codecs you want to use.
// We'll use the default ones but you can also define your own
webrtc.RegisterDefaultCodecs()
// Create a new RTCPeerConnection, providing ICE servers
peerConnection, err := webrtc.New(webrtc.RTCConfiguration{
ICEServers: []webrtc.RTCICEServer{
{
URLs: []string{"stun:stun.l.google.com:19302"},
@@ -35,23 +42,30 @@ func main() {
panic(err)
}
peerConnection.Ontrack = func(mediaType webrtc.TrackType, packets <-chan *rtp.Packet) {
fmt.Printf("Got a %s track\n", mediaType)
peerConnection.Ontrack = func(track *webrtc.RTCTrack) {
fmt.Printf("Got a %s track\n", track.Codec.Name)
}
peerConnection.OnICEConnectionStateChange = func(connectionState ice.ConnectionState) {
fmt.Printf("Connection State has changed %s \n", connectionState.String())
}
if err := peerConnection.SetRemoteDescription(string(sd)); err != nil {
// Set the remote SessionDescription
offer := webrtc.RTCSessionDescription{
Typ: webrtc.RTCSdpTypeOffer,
Sdp: string(sd),
}
if err := peerConnection.SetRemoteDescription(offer); err != nil {
panic(err)
}
if err := peerConnection.CreateAnswer(); err != nil {
// Sets the LocalDescription, and starts our UDP listeners
answer, err := peerConnection.CreateAnswer(nil)
if err != nil {
panic(err)
}
localDescriptionStr := peerConnection.LocalDescription.Marshal()
fmt.Println(base64.StdEncoding.EncodeToString([]byte(localDescriptionStr)))
// Get the LocalDescription and take it to base64 so we can paste in browser
fmt.Println(base64.StdEncoding.EncodeToString([]byte(answer.Sdp)))
select {}
}

View File

@@ -94,7 +94,7 @@ func (p *Port) handleICE(in *incomingPacket, remoteKey []byte, iceTimer *time.Ti
); err != nil {
fmt.Println(err)
} else {
p.ICEState = ice.Completed
p.ICEState = ice.ConnectionStateCompleted
iceTimer.Reset(iceTimeout)
iceNotifier(p)
}
@@ -131,7 +131,7 @@ func (p *Port) networkLoop(remoteKey []byte, tlscfg *dtls.TLSCfg, b BufferTransp
for {
select {
case <-iceTimer.C:
p.ICEState = ice.Failed
p.ICEState = ice.ConnectionStateFailed
iceNotifier(p)
case in, inValid := <-incomingPackets:
if !inValid {

135
internal/sdp/jsep.go Normal file
View File

@@ -0,0 +1,135 @@
package sdp
import (
"fmt"
"time"
)
// Constants for SDP attributes used in JSEP
const (
AttrKeyIdentity = "identity"
AttrKeyGroup = "group"
AttrKeySsrc = "ssrc"
AttrKeySsrcGroup = "ssrc-group"
AttrKeyMsidSemantic = "msid-semantic"
AttrKeyConnectionSetup = "setup"
AttrKeyMID = "mid"
AttrKeyICELite = "ice-lite"
AttrKeyRtcpMux = "rtcp-mux"
AttrKeyRtcpRsize = "rtcp-rsize"
)
// Constants for semantic tokens used in JSEP
const (
SemanticTokenLipSynchronization = "LS"
SemanticTokenFlowIdentification = "FID"
SemanticTokenForwardErrorCorrection = "FEC"
SemanticTokenWebRTCMediaStreams = "WMS"
)
// API to match draft-ietf-rtcweb-jsep
// Move to webrtc or its own package?
// NewJSEPSessionDescription creates a new SessionDescription with
// some settings that are required by the JSEP spec.
func NewJSEPSessionDescription(fingerprint string, identity bool) *SessionDescription {
d := &SessionDescription{
ProtocolVersion: 0,
Origin: fmt.Sprintf(
"- %d %d IN IP4 0.0.0.0",
newSessionID(),
time.Now().Unix(),
),
SessionName: "-",
Timing: []string{"0 0"},
Attributes: []string{
// "ice-options:trickle", // TODO: implement trickle ICE
"fingerprint:sha-256 " + fingerprint,
},
}
if identity {
d.WithPropertyAttribute(AttrKeyIdentity)
}
return d
}
// WithPropertyAttribute adds a property attribute 'a=key' to the session description
func (d *SessionDescription) WithPropertyAttribute(key string) *SessionDescription {
d.Attributes = append(d.Attributes, key)
return d
}
// WithValueAttribute adds a value attribute 'a=key:value' to the session description
func (d *SessionDescription) WithValueAttribute(key, value string) *SessionDescription {
d.Attributes = append(d.Attributes, fmt.Sprintf("%s:%s", key, value))
return d
}
// WithMedia adds a media description to the session description
func (d *SessionDescription) WithMedia(md *MediaDescription) *SessionDescription {
d.MediaDescriptions = append(d.MediaDescriptions, md)
return d
}
// NewJSEPMediaDescription creates a new MediaDescription with
// some settings that are required by the JSEP spec.
func NewJSEPMediaDescription(typ string, codecPrefs []string) *MediaDescription {
// TODO: handle codecPrefs
d := &MediaDescription{
MediaName: fmt.Sprintf("%s 9 UDP/TLS/RTP/SAVPF", typ), // TODO: other transports?
ConnectionData: "IN IP4 0.0.0.0",
Attributes: []string{},
}
return d
}
// WithPropertyAttribute adds a property attribute 'a=key' to the media description
func (d *MediaDescription) WithPropertyAttribute(key string) *MediaDescription {
d.Attributes = append(d.Attributes, key)
return d
}
// WithValueAttribute adds a value attribute 'a=key:value' to the media description
func (d *MediaDescription) WithValueAttribute(key, value string) *MediaDescription {
d.Attributes = append(d.Attributes, fmt.Sprintf("%s:%s", key, value))
return d
}
// WithICECredentials adds ICE credentials to the media description
func (d *MediaDescription) WithICECredentials(username, password string) *MediaDescription {
return d.
WithValueAttribute("ice-ufrag", username).
WithValueAttribute("ice-pwd", password)
}
// WithCodec adds codec information to the media description
func (d *MediaDescription) WithCodec(payloadType uint8, name string, clockrate uint32, channels uint16, fmtp string) *MediaDescription {
d.MediaName = fmt.Sprintf("%s %d", d.MediaName, payloadType)
rtpmap := fmt.Sprintf("%d %s/%d", payloadType, name, clockrate)
if channels > 0 {
rtpmap = rtpmap + fmt.Sprintf("/%d", channels)
}
d.WithValueAttribute("rtpmap", rtpmap)
if fmtp != "" {
d.WithValueAttribute("fmtp", fmt.Sprintf("%d %s", payloadType, fmtp))
}
return d
}
// WithMediaSource adds media source information to the media description
func (d *MediaDescription) WithMediaSource(ssrc uint32, cname, streamLabel, label string) *MediaDescription {
return d.
WithValueAttribute("ssrc", fmt.Sprintf("%d cname:%s", ssrc, cname)). // Deprecated but not pased out?
WithValueAttribute("ssrc", fmt.Sprintf("%d msid:%s %s", ssrc, streamLabel, label)).
WithValueAttribute("ssrc", fmt.Sprintf("%d mslabel:%s", ssrc, streamLabel)). // Deprecated but not pased out?
WithValueAttribute("ssrc", fmt.Sprintf("%d label:%s", ssrc, label)) // Deprecated but not pased out?
}
// WithCandidate adds an ICE candidate to the media description
func (d *MediaDescription) WithCandidate(id int, transport string, basePriority uint16, ip string, port int, typ string) *MediaDescription {
return d.
WithValueAttribute("candidate",
fmt.Sprintf("%scandidate %d %s %d %s %d typ %s", transport, id, transport, basePriority, ip, port, typ))
}

View File

@@ -1,6 +1,7 @@
package sdp
import (
"errors"
"fmt"
"math/rand"
"strconv"
@@ -22,102 +23,99 @@ type SessionBuilder struct {
Tracks []*SessionBuilderTrack
}
// BaseSessionDescription generates a default SDP response that is ice-lite, initiates the DTLS session and
// supports VP8, VP9, H264 and Opus
func BaseSessionDescription(b *SessionBuilder) *SessionDescription {
addMediaCandidates := func(m *MediaDescription) *MediaDescription {
m.Attributes = append(m.Attributes, b.Candidates...)
m.Attributes = append(m.Attributes, "end-of-candidates")
return m
}
// ConnectionRole indicates which of the end points should initiate the connection establishment
type ConnectionRole int
audioMediaDescription := &MediaDescription{
MediaName: "audio 9 RTP/SAVPF 111",
ConnectionData: "IN IP4 127.0.0.1",
Attributes: []string{
"setup:active",
"mid:audio",
"sendrecv",
"ice-ufrag:" + b.IceUsername,
"ice-pwd:" + b.IcePassword,
"ice-lite",
"fingerprint:sha-256 " + b.Fingerprint,
"rtcp-mux",
"rtcp-rsize",
"rtpmap:111 opus/48000/2",
"fmtp:111 minptime=10;useinbandfec=1",
},
}
const (
videoMediaDescription := &MediaDescription{
MediaName: "video 9 RTP/SAVPF 96 98 100",
ConnectionData: "IN IP4 127.0.0.1",
Attributes: []string{
"setup:active",
"mid:video",
"sendrecv",
"ice-ufrag:" + b.IceUsername,
"ice-pwd:" + b.IcePassword,
"ice-lite",
"fingerprint:sha-256 " + b.Fingerprint,
"rtcp-mux",
"rtcp-rsize",
"rtpmap:96 VP8/90000",
"rtpmap:98 VP9/90000",
"rtpmap:100 H264/90000",
"fmtp:100 level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42001f",
},
}
// ConnectionRoleActive indicates the endpoint will initiate an outgoing connection.
ConnectionRoleActive ConnectionRole = iota + 1
mediaStreamsAttribute := "msid-semantic: WMS"
for i, track := range b.Tracks {
var attributes *[]string
if track.IsAudio {
attributes = &audioMediaDescription.Attributes
} else {
attributes = &videoMediaDescription.Attributes
}
appendAttr := func(attr string) {
*attributes = append(*attributes, attr)
}
// ConnectionRolePassive indicates the endpoint will accept an incoming connection.
ConnectionRolePassive
appendAttr("ssrc:" + fmt.Sprint(track.SSRC) + " cname:pion" + strconv.Itoa(i))
appendAttr("ssrc:" + fmt.Sprint(track.SSRC) + " msid:pion" + strconv.Itoa(i) + " pion" + strconv.Itoa(i))
appendAttr("ssrc:" + fmt.Sprint(track.SSRC) + " mslabel:pion" + strconv.Itoa(i))
appendAttr("ssrc:" + fmt.Sprint(track.SSRC) + " label:pion" + strconv.Itoa(i))
// ConnectionRoleActpass indicates the endpoint is willing to accept an incoming connection or to initiate an outgoing connection.
ConnectionRoleActpass
mediaStreamsAttribute += " pion" + strconv.Itoa(i)
}
// ConnectionRoleHoldconn indicates the endpoint does not want the connection to be established for the time being.
ConnectionRoleHoldconn
)
sessionID := strconv.FormatUint(uint64(rand.Uint32())<<32+uint64(rand.Uint32()), 10)
return &SessionDescription{
ProtocolVersion: 0,
Origin: "pion-webrtc " + sessionID + " 2 IN IP4 0.0.0.0",
SessionName: "-",
Timing: []string{"0 0"},
Attributes: []string{
"group:BUNDLE audio video",
mediaStreamsAttribute,
},
MediaDescriptions: []*MediaDescription{
addMediaCandidates(audioMediaDescription),
addMediaCandidates(videoMediaDescription),
},
func (t ConnectionRole) String() string {
switch t {
case ConnectionRoleActive:
return "active"
case ConnectionRolePassive:
return "passive"
case ConnectionRoleActpass:
return "actpass"
case ConnectionRoleHoldconn:
return "holdconn"
default:
return "Unknown"
}
}
func newSessionID() uint64 {
return uint64(rand.Uint32())<<32 + uint64(rand.Uint32())
}
// Codec represents a codec
type Codec struct {
PayloadType uint8
Name string
ClockRate uint32
EncodingParameters string
Fmtp string
}
func (c Codec) String() string {
return fmt.Sprintf("%d %s/%d/%s", c.PayloadType, c.Name, c.ClockRate, c.EncodingParameters)
}
// GetCodecForPayloadType scans the SessionDescription for the given payloadType and returns the codec
func GetCodecForPayloadType(payloadType uint8, sd *SessionDescription) (ok bool, codec string) {
func (sd *SessionDescription) GetCodecForPayloadType(payloadType uint8) (Codec, error) {
codec := Codec{
PayloadType: payloadType,
}
found := false
payloadTypeString := strconv.Itoa(int(payloadType))
rtpmapPrefix := "rtpmap:" + payloadTypeString
fmtpPrefix := "fmtp:" + payloadTypeString
for _, m := range sd.MediaDescriptions {
for _, a := range m.Attributes {
if strings.Contains(a, "rtpmap:"+strconv.Itoa(int(payloadType))) {
if strings.HasPrefix(a, rtpmapPrefix) {
found = true
// a=rtpmap:<payload type> <encoding name>/<clock rate> [/<encoding parameters>]
split := strings.Split(a, " ")
if len(split) == 2 {
split := strings.Split(split[1], "/")
return true, split[0]
codec.Name = split[0]
parts := len(split)
if parts > 1 {
rate, err := strconv.Atoi(split[1])
if err != nil {
return codec, err
}
codec.ClockRate = uint32(rate)
}
if parts > 2 {
codec.EncodingParameters = split[2]
}
}
} else if strings.HasPrefix(a, fmtpPrefix) {
// a=fmtp:<format> <format specific parameters>
split := strings.Split(a, " ")
if len(split) == 2 {
codec.Fmtp = split[1]
}
}
}
if found {
return codec, nil
}
return false, ""
}
return codec, errors.New("payload type not found")
}

246
media.go Normal file
View File

@@ -0,0 +1,246 @@
package webrtc
import (
"math/rand"
"github.com/pions/webrtc/pkg/rtp"
"github.com/pkg/errors"
)
// RTCRtpReceiver allows an application to inspect the receipt of a RTCTrack
type RTCRtpReceiver struct {
Track *RTCTrack
// receiverTrack *RTCTrack
// receiverTransport
// receiverRtcpTransport
}
// TODO: receiving side
// func newRTCRtpReceiver(kind, id string) {
//
// }
// RTCRtpSender allows an application to control how a given RTCTrack is encoded and transmitted to a remote peer
type RTCRtpSender struct {
Track *RTCTrack
// senderTrack *RTCTrack
// senderTransport
// senderRtcpTransport
}
func newRTCRtpSender(track *RTCTrack) *RTCRtpSender {
s := &RTCRtpSender{
Track: track,
}
return s
}
// RTCRtpTransceiverDirection indicates the direction of the RTCRtpTransceiver
type RTCRtpTransceiverDirection int
const (
// RTCRtpTransceiverDirectionSendrecv indicates the RTCRtpSender will offer to send RTP and RTCRtpReceiver the will offer to receive RTP
RTCRtpTransceiverDirectionSendrecv RTCRtpTransceiverDirection = iota + 1
// RTCRtpTransceiverDirectionSendonly indicates the RTCRtpSender will offer to send RTP
RTCRtpTransceiverDirectionSendonly
// RTCRtpTransceiverDirectionRecvonly indicates the RTCRtpReceiver the will offer to receive RTP
RTCRtpTransceiverDirectionRecvonly
// RTCRtpTransceiverDirectionInactive indicates the RTCRtpSender won't offer to send RTP and RTCRtpReceiver the won't offer to receive RTP
RTCRtpTransceiverDirectionInactive
)
func (t RTCRtpTransceiverDirection) String() string {
switch t {
case RTCRtpTransceiverDirectionSendrecv:
return "sendrecv"
case RTCRtpTransceiverDirectionSendonly:
return "sendonly"
case RTCRtpTransceiverDirectionRecvonly:
return "recvonly"
case RTCRtpTransceiverDirectionInactive:
return "inactive"
default:
return "Unknown"
}
}
// RTCRtpTransceiver represents a combination of an RTCRtpSender and an RTCRtpReceiver that share a common mid.
type RTCRtpTransceiver struct {
Mid string
Sender *RTCRtpSender
Receiver *RTCRtpReceiver
Direction RTCRtpTransceiverDirection
// currentDirection RTCRtpTransceiverDirection
// firedDirection RTCRtpTransceiverDirection
// receptive bool
stopped bool
}
func (t *RTCRtpTransceiver) setSendingTrack(track *RTCTrack) {
t.Sender.Track = track
switch t.Direction {
case RTCRtpTransceiverDirectionRecvonly:
t.Direction = RTCRtpTransceiverDirectionSendrecv
case RTCRtpTransceiverDirectionInactive:
t.Direction = RTCRtpTransceiverDirectionSendonly
default:
panic("Invalid state change in RTCRtpTransceiver.setSending")
}
}
func (r *RTCPeerConnection) newRTCRtpTransceiver(
receiver *RTCRtpReceiver,
sender *RTCRtpSender,
direction RTCRtpTransceiverDirection,
) *RTCRtpTransceiver {
t := &RTCRtpTransceiver{
Receiver: receiver,
Sender: sender,
Direction: direction,
}
r.rtpTransceivers = append(r.rtpTransceivers, t)
return t
}
// Stop irreversibly stops the RTCRtpTransceiver
func (t *RTCRtpTransceiver) Stop() error {
panic("TODO")
}
// RTCSample contains media, and the amount of samples in it
type RTCSample struct {
Data []byte
Samples uint32
}
// RTCTrack represents a track that is communicated
type RTCTrack struct {
PayloadType uint8
Kind RTCRtpCodecType
ID string
Label string
Ssrc uint32
Codec *RTCRtpCodec
Packets <-chan *rtp.Packet
Samples chan<- RTCSample
}
// NewRTCTrack is used to create a new RTCTrack
func (r *RTCPeerConnection) NewRTCTrack(payloadType uint8, id, label string) (*RTCTrack, error) {
codec, err := rtcMediaEngine.getCodec(payloadType)
if err != nil {
return nil, err
}
if codec.Payloader == nil {
return nil, errors.New("codec payloader not set")
}
trackInput := make(chan RTCSample, 15) // Is the buffering needed?
ssrc := rand.Uint32()
go func() {
packetizer := rtp.NewPacketizer(
1400,
payloadType,
ssrc,
codec.Payloader,
rtp.NewRandomSequencer(),
codec.ClockRate,
)
for {
in := <-trackInput
packets := packetizer.Packetize(in.Data, in.Samples)
for _, p := range packets {
for _, port := range r.ports {
port.Send(p)
}
}
}
}()
t := &RTCTrack{
PayloadType: payloadType,
Kind: codec.Type,
ID: id,
Label: label,
Ssrc: ssrc,
Codec: codec,
Samples: trackInput,
}
return t, nil
}
// AddTrack adds a RTCTrack to the RTCPeerConnection
func (r *RTCPeerConnection) AddTrack(track *RTCTrack) (*RTCRtpSender, error) {
if r.IsClosed {
return nil, &InvalidStateError{Err: ErrConnectionClosed}
}
for _, tranceiver := range r.rtpTransceivers {
if tranceiver.Sender.Track == nil {
continue
}
if track.ID == tranceiver.Sender.Track.ID {
return nil, &InvalidAccessError{Err: ErrExistingTrack}
}
}
var tranciever *RTCRtpTransceiver
for _, t := range r.rtpTransceivers {
if !t.stopped &&
// t.Sender == nil && // TODO: check that the sender has never sent
t.Sender.Track == nil &&
t.Receiver.Track != nil &&
t.Receiver.Track.Kind == track.Kind {
tranciever = t
break
}
}
if tranciever != nil {
tranciever.setSendingTrack(track)
} else {
var receiver *RTCRtpReceiver
sender := newRTCRtpSender(track)
tranciever = r.newRTCRtpTransceiver(
receiver,
sender,
RTCRtpTransceiverDirectionSendonly,
)
}
tranciever.Mid = track.Kind.String() // TODO: Mid generation
return tranciever.Sender, nil
}
// GetSenders returns the RTCRtpSender that are currently attached to this RTCPeerConnection
func (r *RTCPeerConnection) GetSenders() []RTCRtpSender {
result := make([]RTCRtpSender, len(r.rtpTransceivers))
for i, tranceiver := range r.rtpTransceivers {
result[i] = *tranceiver.Sender
}
return result
}
// GetReceivers returns the RTCRtpReceivers that are currently attached to this RTCPeerConnection
func (r *RTCPeerConnection) GetReceivers() []RTCRtpReceiver {
result := make([]RTCRtpReceiver, len(r.rtpTransceivers))
for i, tranceiver := range r.rtpTransceivers {
result[i] = *tranceiver.Receiver
}
return result
}
// GetTransceivers returns the RTCRtpTransceiver that are currently attached to this RTCPeerConnection
func (r *RTCPeerConnection) GetTransceivers() []RTCRtpTransceiver {
result := make([]RTCRtpTransceiver, len(r.rtpTransceivers))
for i, tranceiver := range r.rtpTransceivers {
result[i] = *tranceiver
}
return result
}

210
mediaengine.go Normal file
View File

@@ -0,0 +1,210 @@
package webrtc
import (
"strconv"
"github.com/pions/webrtc/internal/sdp"
"github.com/pions/webrtc/pkg/rtp"
"github.com/pions/webrtc/pkg/rtp/codecs"
"github.com/pkg/errors"
)
// PayloadTypes for the default codecs
const (
PayloadTypeOpus = 111
PayloadTypeVP8 = 96
PayloadTypeVP9 = 98
PayloadTypeH264 = 100
)
// Names for the default codecs
const (
Opus = "opus"
VP8 = "VP8"
VP9 = "VP9"
H264 = "H264"
)
var rtcMediaEngine = &mediaEngine{}
// RegisterDefaultCodecs is a helper that registers the default codecs supported by pions-webrtc
func RegisterDefaultCodecs() {
RegisterCodec(NewRTCRtpOpusCodec(PayloadTypeOpus, 48000, 2))
RegisterCodec(NewRTCRtpVP8Codec(PayloadTypeVP8, 90000))
RegisterCodec(NewRTCRtpH264Codec(PayloadTypeH264, 90000))
RegisterCodec(NewRTCRtpVP9Codec(PayloadTypeVP9, 90000))
}
// RegisterCodec is used to register a codec
func RegisterCodec(codec *RTCRtpCodec) {
rtcMediaEngine.RegisterCodec(codec)
}
type mediaEngine struct {
codecs []*RTCRtpCodec
}
func (m *mediaEngine) RegisterCodec(codec *RTCRtpCodec) uint8 {
// TODO: generate PayloadType if not set
m.codecs = append(m.codecs, codec)
return codec.PayloadType
}
func (m *mediaEngine) ClearCodecs() {
m.codecs = nil
}
func (m *mediaEngine) getCodec(payloadType uint8) (*RTCRtpCodec, error) {
for _, codec := range m.codecs {
if codec.PayloadType == payloadType {
return codec, nil
}
}
return nil, errors.New("Codec not found")
}
func (m *mediaEngine) getCodecSDP(sdpCodec sdp.Codec) (*RTCRtpCodec, error) {
for _, codec := range m.codecs {
if codec.Name == sdpCodec.Name &&
codec.ClockRate == sdpCodec.ClockRate &&
(sdpCodec.EncodingParameters == "" ||
strconv.Itoa(int(codec.Channels)) == sdpCodec.EncodingParameters) &&
codec.SdpFmtpLine == sdpCodec.Fmtp { // TODO: Protocol specific matching?
return codec, nil
}
}
return nil, errors.New("Codec not found")
}
func (m *mediaEngine) getCodecsByKind(kind RTCRtpCodecType) []*RTCRtpCodec {
var codecs []*RTCRtpCodec
for _, codec := range rtcMediaEngine.codecs {
if codec.Type == kind {
codecs = append(codecs, codec)
}
}
return codecs
}
// NewRTCRtpOpusCodec is a helper to create an Opus codec
func NewRTCRtpOpusCodec(payloadType uint8, clockrate uint32, channels uint16) *RTCRtpCodec {
c := NewRTCRtpCodec(RTCRtpCodecTypeAudio,
Opus,
clockrate,
channels,
"minptime=10;useinbandfec=1",
payloadType,
&codecs.OpusPayloader{})
return c
}
// NewRTCRtpVP8Codec is a helper to create an VP8 codec
func NewRTCRtpVP8Codec(payloadType uint8, clockrate uint32) *RTCRtpCodec {
c := NewRTCRtpCodec(RTCRtpCodecTypeVideo,
VP8,
clockrate,
0,
"",
payloadType,
&codecs.VP8Payloader{})
return c
}
// NewRTCRtpVP9Codec is a helper to create an VP9 codec
func NewRTCRtpVP9Codec(payloadType uint8, clockrate uint32) *RTCRtpCodec {
c := NewRTCRtpCodec(RTCRtpCodecTypeVideo,
VP9,
clockrate,
0,
"",
payloadType,
nil) // TODO
return c
}
// NewRTCRtpH264Codec is a helper to create an H264 codec
func NewRTCRtpH264Codec(payloadType uint8, clockrate uint32) *RTCRtpCodec {
c := NewRTCRtpCodec(RTCRtpCodecTypeVideo,
H264,
clockrate,
0,
"level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42001f",
payloadType,
&codecs.H264Payloader{})
return c
}
// RTCRtpCodecType determines the type of a codec
type RTCRtpCodecType int
const (
// RTCRtpCodecTypeAudio indicates this is an audio codec
RTCRtpCodecTypeAudio RTCRtpCodecType = iota + 1
// RTCRtpCodecTypeVideo indicates this is a video codec
RTCRtpCodecTypeVideo
)
func (t RTCRtpCodecType) String() string {
switch t {
case RTCRtpCodecTypeAudio:
return "audio"
case RTCRtpCodecTypeVideo:
return "video"
default:
return "Unknown"
}
}
// RTCRtpCodec represents a codec supported by the PeerConnection
type RTCRtpCodec struct {
RTCRtpCodecCapability
Type RTCRtpCodecType
Name string
PayloadType uint8
Payloader rtp.Payloader
}
// NewRTCRtpCodec is used to define a new codec
func NewRTCRtpCodec(
typ RTCRtpCodecType,
name string,
clockrate uint32,
channels uint16,
fmtp string,
payloadType uint8,
payloader rtp.Payloader,
) *RTCRtpCodec {
return &RTCRtpCodec{
RTCRtpCodecCapability: RTCRtpCodecCapability{
MimeType: typ.String() + "/" + name,
ClockRate: clockrate,
Channels: channels,
SdpFmtpLine: fmtp,
},
PayloadType: payloadType,
Payloader: payloader,
Type: typ,
Name: name,
}
}
// RTCRtpCodecCapability provides information about codec capabilities.
type RTCRtpCodecCapability struct {
MimeType string
ClockRate uint32
Channels uint16
SdpFmtpLine string
}
// RTCRtpHeaderExtensionCapability is used to define a RFC5285 RTP header extension supported by the codec.
type RTCRtpHeaderExtensionCapability struct {
URI string
}
// RTCRtpCapabilities represents the capabilities of a transceiver
type RTCRtpCapabilities struct {
Codecs []RTCRtpCodecCapability
HeaderExtensions []RTCRtpHeaderExtensionCapability
}

179
pkg/ice/address.go Normal file
View File

@@ -0,0 +1,179 @@
package ice
import (
"log"
"strconv"
"strings"
"github.com/pkg/errors"
)
// TODO: Migrate address parsing to STUN/TURN packages?
var (
// ErrServerType indicates the server type could not be parsed
ErrServerType = errors.New("unknown server type")
// ErrSTUNQuery indicates query arguments are provided in a STUN URL
ErrSTUNQuery = errors.New("queries not supported in stun address")
// ErrInvalidQuery indicates an unsupported query is provided
ErrInvalidQuery = errors.New("invalid query")
// ErrTransportType indicates an unsupported transport type was provided
ErrTransportType = errors.New("invalid transport type")
// ErrHost indicates the server hostname could not be parsed
ErrHost = errors.New("invalid hostname")
// ErrPort indicates the server port could not be parsed
ErrPort = errors.New("invalid port")
)
// ServerType indicates the type of server used
type ServerType int
const (
// ServerTypeSTUN indicates the URL represents a STUN server
ServerTypeSTUN ServerType = iota + 1
// ServerTypeTURN indicates the URL represents a TURN server
ServerTypeTURN
)
func (t ServerType) String() string {
switch t {
case ServerTypeSTUN:
return "stun"
case ServerTypeTURN:
return "turn"
default:
return "Unknown"
}
}
// TransportType indicates the transport that is used
type TransportType int
const (
// TransportUDP indicates the URL uses a UDP transport
TransportUDP TransportType = iota + 1
// TransportTCP indicates the URL uses a TCP transport
TransportTCP
)
func (t TransportType) String() string {
switch t {
case TransportUDP:
return "udp"
case TransportTCP:
return "tcp"
default:
return "Unknown"
}
}
// URL represents a STUN (rfc7064) or TRUN (rfc7065) URL
type URL struct {
Type ServerType
Secure bool
Host string
Port int
TransportType TransportType
}
// NewURL creates a new URL by parsing a STUN (rfc7064) or TRUN (rfc7065) uri string
func NewURL(address string) (URL, error) {
var result URL
var scheme string
scheme, address = split(address, ":")
switch strings.ToLower(scheme) {
case "stun":
result.Type = ServerTypeSTUN
result.Secure = false
case "stuns":
result.Type = ServerTypeSTUN
result.Secure = true
case "turn":
result.Type = ServerTypeTURN
result.Secure = false
case "turns":
result.Type = ServerTypeTURN
result.Secure = true
default:
return result, ErrServerType
}
var query string
address, query = split(address, "?")
if query != "" {
if result.Type == ServerTypeSTUN {
return result, ErrSTUNQuery
}
key, value := split(query, "=")
if strings.ToLower(key) != "transport" {
return result, ErrInvalidQuery
}
switch strings.ToLower(value) {
case "udp":
result.TransportType = TransportUDP
case "tcp":
result.TransportType = TransportTCP
default:
return result, ErrTransportType
}
} else {
if result.Secure {
result.TransportType = TransportTCP
} else {
result.TransportType = TransportUDP
}
}
var host string
var port string
colon := strings.IndexByte(address, ':')
if colon == -1 {
host = address
if result.Secure {
port = "5349"
} else {
port = "3478"
}
} else if i := strings.IndexByte(address, ']'); i != -1 {
host = strings.TrimPrefix(address[:i], "[")
port = address[i+1+len(":"):]
log.Println(port)
} else {
host = address[:colon]
port = address[colon+len(":"):]
}
if host == "" {
return result, ErrHost
}
result.Host = strings.ToLower(host)
var err error
result.Port, err = strconv.Atoi(port)
if err != nil {
return result, ErrPort
}
return result, nil
}
func split(s string, c string) (string, string) {
i := strings.Index(s, c)
if i < 0 {
return s, ""
}
return s[:i], s[i+len(c):]
}

68
pkg/ice/address_test.go Normal file
View File

@@ -0,0 +1,68 @@
package ice
import "testing"
func TestNewURL(t *testing.T) {
t.Run("Success", func(t *testing.T) {
testCases := []struct {
rawURL string
expectedType ServerType
expectedSecure bool
expectedHost string
expectedPort int
expectedTransportType TransportType
}{
{"stun:google.de", ServerTypeSTUN, false, "google.de", 3478, TransportUDP},
{"stun:google.de:1234", ServerTypeSTUN, false, "google.de", 1234, TransportUDP},
{"stuns:google.de", ServerTypeSTUN, true, "google.de", 5349, TransportTCP},
{"stun:[::1]:123", ServerTypeSTUN, false, "::1", 123, TransportUDP},
{"turn:google.de", ServerTypeTURN, false, "google.de", 3478, TransportUDP},
{"turns:google.de", ServerTypeTURN, true, "google.de", 5349, TransportTCP},
{"turn:google.de?transport=udp", ServerTypeTURN, false, "google.de", 3478, TransportUDP},
{"turn:google.de?transport=tcp", ServerTypeTURN, false, "google.de", 3478, TransportTCP},
}
for i, testCase := range testCases {
url, err := NewURL(testCase.rawURL)
if err != nil {
t.Errorf("Case %d: got error: %v", i, err)
}
if url.Type != testCase.expectedType ||
url.Secure != testCase.expectedSecure ||
url.Host != testCase.expectedHost ||
url.Port != testCase.expectedPort ||
url.TransportType != testCase.expectedTransportType {
t.Errorf("Case %d: got %s %t %s %d %s",
i,
url.Type,
url.Secure,
url.Host,
url.Port,
url.TransportType,
)
}
}
})
t.Run("Failure", func(t *testing.T) {
testCases := []struct {
rawURL string
expectedErr error
}{
{"", ErrServerType},
{":::", ErrServerType},
{"google.de", ErrServerType},
{"stun:", ErrHost},
{"stun:google.de:abc", ErrPort},
{"stun:google.de?transport=udp", ErrSTUNQuery},
{"turn:google.de?trans=udp", ErrInvalidQuery},
{"turn:google.de?transport=ip", ErrTransportType},
}
for i, testCase := range testCases {
if _, err := NewURL(testCase.rawURL); err != testCase.expectedErr {
t.Errorf("Case %d: got error '%v' expected '%v'", i, err, testCase.expectedErr)
}
}
})
}

23
pkg/ice/agent.go Normal file
View File

@@ -0,0 +1,23 @@
package ice
import "github.com/pions/webrtc/internal/util"
// Agent represents the ICE agent
type Agent struct {
Servers [][]URL
Ufrag string
Pwd string
}
// NewAgent creates a new Agent
func NewAgent() *Agent {
return &Agent{
Ufrag: util.RandSeq(16),
Pwd: util.RandSeq(32),
}
}
// SetServers is used to set the ICE servers used by the Agent
func (a *Agent) SetServers(urls [][]URL) {
a.Servers = urls
}

View File

@@ -7,49 +7,76 @@ type ConnectionState int
// List of supported States
const (
// New ICE agent is gathering addresses
New = iota + 1
// ConnectionStateNew ICE agent is gathering addresses
ConnectionStateNew = iota + 1
// Checking ICE agent has been given local and remote candidates, and is attempting to find a match
Checking
// ConnectionStateChecking ICE agent has been given local and remote candidates, and is attempting to find a match
ConnectionStateChecking
// Connected ICE agent has a pairing, but is still checking other pairs
Connected
// ConnectionStateConnected ICE agent has a pairing, but is still checking other pairs
ConnectionStateConnected
// Completed ICE agent has finished
Completed
// ConnectionStateCompleted ICE agent has finished
ConnectionStateCompleted
// Failed ICE agent never could sucessfully connect
Failed
// ConnectionStateFailed ICE agent never could sucessfully connect
ConnectionStateFailed
// Failed ICE agent connected sucessfully, but has entered a failed state
Disconnected
// ConnectionStateDisconnected ICE agent connected sucessfully, but has entered a failed state
ConnectionStateDisconnected
// Closed ICE agent has finished and is no longer handling requests
Closed
// ConnectionStateClosed ICE agent has finished and is no longer handling requests
ConnectionStateClosed
)
func (c ConnectionState) String() string {
switch c {
case New:
case ConnectionStateNew:
return "New"
case Checking:
case ConnectionStateChecking:
return "Checking"
case Connected:
case ConnectionStateConnected:
return "Connected"
case Completed:
case ConnectionStateCompleted:
return "Completed"
case Failed:
case ConnectionStateFailed:
return "Failed"
case Disconnected:
case ConnectionStateDisconnected:
return "Disconnected"
case Closed:
case ConnectionStateClosed:
return "Closed"
default:
return "Invalid"
}
}
// GatheringState describes the state of the candidate gathering process
type GatheringState int
const (
// GatheringStateNew indicates candidate gatering is not yet started
GatheringStateNew GatheringState = iota + 1
// GatheringStateGathering indicates candidate gatering is ongoing
GatheringStateGathering
// GatheringStateComplete indicates candidate gatering has been completed
GatheringStateComplete
)
func (t GatheringState) String() string {
switch t {
case GatheringStateNew:
return "new"
case GatheringStateGathering:
return "gathering"
case GatheringStateComplete:
return "complete"
default:
return "Unknown"
}
}
// HostInterfaces generates a slice of all the IPs associated with interfaces
func HostInterfaces() (ips []string) {
ifaces, err := net.Interfaces()

View File

@@ -1,68 +1,133 @@
package webrtc
import (
"fmt"
"strings"
"time"
"github.com/pions/webrtc/pkg/ice"
)
// RTCCredentialType specifies the type of credentials provided
type RTCCredentialType string
// RTCICECredentialType indicates the type of credentials used to connect to an ICE server
type RTCICECredentialType int
var (
// RTCCredentialTypePassword describes username+pasword based credentials
RTCCredentialTypePassword RTCCredentialType = "password"
// RTCCredentialTypeToken describes token based credentials
RTCCredentialTypeToken RTCCredentialType = "token"
const (
// RTCICECredentialTypePassword describes username+pasword based credentials
RTCICECredentialTypePassword RTCICECredentialType = iota + 1
// RTCICECredentialTypeOauth describes token based credentials
RTCICECredentialTypeOauth
)
// RTCServerType is used to identify different ICE server types
type RTCServerType string
func (t RTCICECredentialType) String() string {
switch t {
case RTCICECredentialTypePassword:
return "password"
case RTCICECredentialTypeOauth:
return "oauth"
default:
return "Unknown"
}
}
var (
// RTCServerTypeSTUN is used to identify STUN servers. Prefix is stun:
RTCServerTypeSTUN RTCServerType = "STUN"
// RTCServerTypeTURN is used to identify TURN servers. Prefix is turn:
RTCServerTypeTURN RTCServerType = "TURN"
// RTCServerTypeUnknown is used when an ICE server can not be identified properly.
RTCServerTypeUnknown RTCServerType = "UnknownType"
)
// RTCCertificate represents a certificate used to authenticate WebRTC communications.
type RTCCertificate struct {
expires time.Time
// TODO: Finish during DTLS implementation
}
// Equals determines if two certificates are identical
func (c RTCCertificate) Equals(other RTCCertificate) bool {
return c.expires == other.expires
}
// RTCICEServer describes a single ICE server, as well as required credentials
type RTCICEServer struct {
CredentialType RTCCredentialType
URLs []string
Username string
Credential string
Credential RTCICECredential
CredentialType RTCICECredentialType
}
// RTCICECredential represents credentials used to connect to an ICE server
// Two types of credentials are supported:
// - Password (type string)
// - Password (type RTCOAuthCredential)
type RTCICECredential interface{}
// RTCOAuthCredential represents OAuth credentials used to connect to an ICE server
type RTCOAuthCredential struct {
MacKey string
AccessToken string
}
// RTCICETransportPolicy defines the ICE candidate policy [JSEP] (section 3.5.3.) used to surface the permitted candidates
type RTCICETransportPolicy int
const (
stunPrefix = "stun:"
stunsPrefix = "stuns:"
turnPrefix = "turn:"
turnsPrefix = "turns:"
// RTCICETransportPolicyRelay indicates only media relay candidates such as candidates passing through a TURN server are used
RTCICETransportPolicyRelay RTCICETransportPolicy = iota + 1
// RTCICETransportPolicyAll indicates any type of candidate is used
RTCICETransportPolicyAll
)
func (c RTCICEServer) serverType() RTCServerType {
for _, url := range c.URLs {
if strings.HasPrefix(url, stunPrefix) || strings.HasPrefix(url, stunsPrefix) {
return RTCServerTypeSTUN
func (t RTCICETransportPolicy) String() string {
switch t {
case RTCICETransportPolicyRelay:
return "relay"
case RTCICETransportPolicyAll:
return "all"
default:
return "Unknown"
}
if strings.HasPrefix(url, turnPrefix) || strings.HasPrefix(url, turnsPrefix) {
return RTCServerTypeTURN
}
}
return RTCServerTypeUnknown
}
func protocolAndHost(url string) (string, string, error) {
if strings.HasPrefix(url, stunPrefix) {
return "udp", url[len(stunPrefix):], nil
// RTCBundlePolicy affects which media tracks are negotiated if the remote endpoint is not bundle-aware, and what ICE candidates are gathered.
type RTCBundlePolicy int
const (
// RTCRtcpMuxPolicyBalanced indicates to gather ICE candidates for each media type in use (audio, video, and data).
RTCRtcpMuxPolicyBalanced RTCBundlePolicy = iota + 1
// RTCRtcpMuxPolicyMaxCompat indicates to gather ICE candidates for each track.
RTCRtcpMuxPolicyMaxCompat
// RTCRtcpMuxPolicyMaxBundle indicates to gather ICE candidates for only one track.
RTCRtcpMuxPolicyMaxBundle
)
func (t RTCBundlePolicy) String() string {
switch t {
case RTCRtcpMuxPolicyBalanced:
return "balanced"
case RTCRtcpMuxPolicyMaxCompat:
return "max-compat"
case RTCRtcpMuxPolicyMaxBundle:
return "max-bundle"
default:
return "Unknown"
}
if strings.HasPrefix(url, stunsPrefix) {
return "tcp", url[len(stunsPrefix):], nil
}
// RTCRtcpMuxPolicy affects what ICE candidates are gathered to support non-multiplexed RTCP
type RTCRtcpMuxPolicy int
const (
// RTCRtcpMuxPolicyNegotiate indicates to gather ICE candidates for both RTP and RTCP candidates.
RTCRtcpMuxPolicyNegotiate RTCRtcpMuxPolicy = iota + 1
// RTCRtcpMuxPolicyRequire indicates to gather ICE candidates only for RTP and multiplex RTCP on the RTP candidates
RTCRtcpMuxPolicyRequire
)
func (t RTCRtcpMuxPolicy) String() string {
switch t {
case RTCRtcpMuxPolicyNegotiate:
return "negotiate"
case RTCRtcpMuxPolicyRequire:
return "require"
default:
return "Unknown"
}
// TODO TURN urls
return "", "", fmt.Errorf("Unknown protocol in URL %q", url)
}
// RTCConfiguration contains RTCPeerConfiguration options
@@ -71,4 +136,152 @@ type RTCConfiguration struct {
// these are typically STUN and/or TURN servers. If this isn't specified, the ICE agent may choose to use its own ICE servers;
// otherwise, the connection attempt will be made with no STUN or TURN server available, which limits the connection to local peers.
ICEServers []RTCICEServer
ICETransportPolicy RTCICETransportPolicy
BundlePolicy RTCBundlePolicy
RtcpMuxPolicy RTCRtcpMuxPolicy
PeerIdentity string
Certificates []RTCCertificate
ICECandidatePoolSize uint8
}
// SetConfiguration updates the configuration of the RTCPeerConnection
func (r *RTCPeerConnection) SetConfiguration(config RTCConfiguration) error {
err := r.validatePeerIdentity(config)
if err != nil {
return err
}
err = r.validateCertificates(config)
if err != nil {
return err
}
err = r.validateBundlePolicy(config)
if err != nil {
return err
}
err = r.validateRtcpMuxPolicy(config)
if err != nil {
return err
}
err = r.validateICECandidatePoolSize(config)
if err != nil {
return err
}
err = r.setICEServers(config)
if err != nil {
return err
}
r.config = config
return nil
}
func (r *RTCPeerConnection) validatePeerIdentity(config RTCConfiguration) error {
current := r.config
if current.PeerIdentity != "" &&
config.PeerIdentity != "" &&
config.PeerIdentity != current.PeerIdentity {
return &InvalidModificationError{Err: ErrModPeerIdentity}
}
return nil
}
func (r *RTCPeerConnection) validateCertificates(config RTCConfiguration) error {
current := r.config
if len(current.Certificates) > 0 &&
len(config.Certificates) > 0 {
if len(config.Certificates) != len(current.Certificates) {
return &InvalidModificationError{Err: ErrModCertificates}
}
for i, cert := range config.Certificates {
if !current.Certificates[i].Equals(cert) {
return &InvalidModificationError{Err: ErrModCertificates}
}
}
}
now := time.Now()
for _, cert := range config.Certificates {
if now.After(cert.expires) {
return &InvalidAccessError{Err: ErrCertificateExpired}
}
// TODO: Check certificate 'origin'
}
return nil
}
func (r *RTCPeerConnection) validateBundlePolicy(config RTCConfiguration) error {
current := r.config
if config.BundlePolicy != current.BundlePolicy {
return &InvalidModificationError{Err: ErrModRtcpMuxPolicy}
}
return nil
}
func (r *RTCPeerConnection) validateRtcpMuxPolicy(config RTCConfiguration) error {
current := r.config
if config.RtcpMuxPolicy != current.RtcpMuxPolicy {
return &InvalidModificationError{Err: ErrModRtcpMuxPolicy}
}
return nil
}
func (r *RTCPeerConnection) validateICECandidatePoolSize(config RTCConfiguration) error {
current := r.config
if r.LocalDescription != nil &&
config.ICECandidatePoolSize != current.ICECandidatePoolSize {
return &InvalidModificationError{Err: ErrModIceCandidatePoolSize}
}
return nil
}
func (r *RTCPeerConnection) setICEServers(config RTCConfiguration) error {
if len(config.ICEServers) > 0 {
var servers [][]ice.URL
for _, server := range config.ICEServers {
var urls []ice.URL
for _, rawURL := range server.URLs {
url, err := parseICEServer(server, rawURL)
if err != nil {
return err
}
urls = append(urls, url)
}
if len(urls) > 0 {
servers = append(servers, urls)
}
}
if len(servers) > 0 {
r.iceAgent.SetServers(servers)
}
}
return nil
}
func parseICEServer(server RTCICEServer, rawURL string) (ice.URL, error) {
iceurl, err := ice.NewURL(rawURL)
if err != nil {
return iceurl, &SyntaxError{Err: err}
}
_, isPass := server.Credential.(string)
_, isOauth := server.Credential.(RTCOAuthCredential)
noPass := !isPass && !isOauth
if iceurl.Type == ice.ServerTypeTURN {
if server.Username == "" ||
noPass {
return iceurl, &InvalidAccessError{Err: ErrNoTurnCred}
}
if server.CredentialType == RTCICECredentialTypePassword &&
!isPass {
return iceurl, &InvalidAccessError{Err: ErrTurnCred}
}
if server.CredentialType == RTCICECredentialTypeOauth &&
!isOauth {
return iceurl, &InvalidAccessError{Err: ErrTurnCred}
}
}
return iceurl, nil
}

View File

@@ -1,44 +0,0 @@
package webrtc
import "testing"
func TestRTCICEServer_isStun(t *testing.T) {
testCases := []struct {
expectedType RTCServerType
server RTCICEServer
}{
{RTCServerTypeSTUN, RTCICEServer{URLs: []string{"stun:google.de"}}},
{RTCServerTypeTURN, RTCICEServer{URLs: []string{"turn:google.de"}}},
{RTCServerTypeUnknown, RTCICEServer{URLs: []string{"google.de"}}},
}
for _, testCase := range testCases {
if serverType := testCase.server.serverType(); serverType != testCase.expectedType {
t.Errorf("Expected %q to be %s, but got %s", testCase.server.URLs, testCase.expectedType, serverType)
}
}
}
func TestPortAndHost(t *testing.T) {
testCases := []struct {
url string
expectedHost string
expectedProtocol string
}{
{"stun:stun.l.google.com:19302", "stun.l.google.com:19302", "udp"},
{"stuns:stun.l.google.com:19302", "stun.l.google.com:19302", "tcp"},
}
for _, testCase := range testCases {
proto, host, err := protocolAndHost(testCase.url)
if err != nil {
t.Fatalf("unable to get proto and host: %v", err)
}
if proto != testCase.expectedProtocol {
t.Fatalf("expected %s, got %s", testCase.expectedProtocol, proto)
}
if host != testCase.expectedHost {
t.Fatalf("expected %s, got %s", testCase.expectedHost, host)
}
}
}

View File

@@ -3,18 +3,14 @@ package webrtc
import (
"fmt"
"math/rand"
"net"
"sync"
"time"
"github.com/pions/pkg/stun"
"github.com/pions/webrtc/internal/dtls"
"github.com/pions/webrtc/internal/network"
"github.com/pions/webrtc/internal/sdp"
"github.com/pions/webrtc/internal/util"
"github.com/pions/webrtc/pkg/ice"
"github.com/pions/webrtc/pkg/rtp"
"github.com/pions/webrtc/pkg/rtp/codecs"
"github.com/pkg/errors"
)
@@ -23,212 +19,119 @@ func init() {
rand.Seed(time.Now().UTC().UnixNano())
}
// RTCSample contains media, and the amount of samples in it
type RTCSample struct {
Data []byte
Samples uint32
}
// RTCPeerConnectionState indicates the state of the peer connection
type RTCPeerConnectionState int
// TrackType determines the type of media we are sending receiving
type TrackType int
// List of supported TrackTypes
const (
VP8 TrackType = iota + 1
VP9
H264
Opus
// RTCPeerConnectionStateNew indicates some of the ICE or DTLS transports are in status new
RTCPeerConnectionStateNew RTCPeerConnectionState = iota + 1
// RTCPeerConnectionStateConnecting indicates some of the ICE or DTLS transports are in status connecting or checking
RTCPeerConnectionStateConnecting
// RTCPeerConnectionStateConnected indicates all of the ICE or DTLS transports are in status connected or completed
RTCPeerConnectionStateConnected
// RTCPeerConnectionStateDisconnected indicates some of the ICE or DTLS transports are in status disconnected
RTCPeerConnectionStateDisconnected
// RTCPeerConnectionStateFailed indicates some of the ICE or DTLS transports are in status failed
RTCPeerConnectionStateFailed
// RTCPeerConnectionStateClosed indicates the peer connection is closed
RTCPeerConnectionStateClosed
)
func (t TrackType) String() string {
func (t RTCPeerConnectionState) String() string {
switch t {
case VP8:
return "VP8"
case VP9:
return "VP9"
case H264:
return "H264"
case Opus:
return "Opus"
case RTCPeerConnectionStateNew:
return "new"
case RTCPeerConnectionStateConnecting:
return "connecting"
case RTCPeerConnectionStateConnected:
return "connected"
case RTCPeerConnectionStateDisconnected:
return "disconnected"
case RTCPeerConnectionStateFailed:
return "failed"
case RTCPeerConnectionStateClosed:
return "closed"
default:
return "Unknown"
}
}
// New creates a new RTCPeerConfiguration with the provided configuration
func New(config *RTCConfiguration) (*RTCPeerConnection, error) {
return &RTCPeerConnection{
config: config,
}, nil
}
// RTCPeerConnection represents a WebRTC connection between itself and a remote peer
type RTCPeerConnection struct {
Ontrack func(mediaType TrackType, buffers <-chan *rtp.Packet)
LocalDescription *sdp.SessionDescription
// ICE
OnICEConnectionStateChange func(iceConnectionState ice.ConnectionState)
config *RTCConfiguration
config RTCConfiguration
tlscfg *dtls.TLSCfg
iceUfrag string
icePwd string
// ICE: TODO: Move to ICEAgent
iceAgent *ice.Agent
iceState ice.ConnectionState
iceGatheringState ice.GatheringState
iceConnectionState ice.ConnectionState
portsLock sync.RWMutex
ports []*network.Port
// Signaling
// pendingLocalDescription *RTCSessionDescription
// currentLocalDescription *RTCSessionDescription
LocalDescription *sdp.SessionDescription
// pendingRemoteDescription *RTCSessionDescription
currentRemoteDescription *RTCSessionDescription
remoteDescription *sdp.SessionDescription
localTracks []*sdp.SessionBuilderTrack
idpLoginURL *string
IsClosed bool
NegotiationNeeded bool
// lastOffer string
// lastAnswer string
signalingState RTCSignalingState
connectionState RTCPeerConnectionState
// Media
rtpTransceivers []*RTCRtpTransceiver
Ontrack func(*RTCTrack)
}
// New creates a new RTCPeerConfiguration with the provided configuration
func New(config RTCConfiguration) (*RTCPeerConnection, error) {
r := &RTCPeerConnection{
config: config,
signalingState: RTCSignalingStateStable,
iceAgent: ice.NewAgent(),
iceGatheringState: ice.GatheringStateNew,
iceConnectionState: ice.ConnectionStateNew,
connectionState: RTCPeerConnectionStateNew,
}
err := r.SetConfiguration(config)
if err != nil {
return nil, err
}
r.tlscfg = dtls.NewTLSCfg()
// TODO: Initialize ICE Agent
return r, nil
}
// Public
// SetRemoteDescription sets the SessionDescription of the remote peer
func (r *RTCPeerConnection) SetRemoteDescription(rawSessionDescription string) error {
if r.remoteDescription != nil {
return errors.Errorf("remoteDescription is already defined, SetRemoteDescription can only be called once")
}
r.remoteDescription = &sdp.SessionDescription{}
return r.remoteDescription.Unmarshal(rawSessionDescription)
}
// CreateOffer starts the RTCPeerConnection and generates the localDescription
func (r *RTCPeerConnection) CreateOffer() error {
return errors.Errorf("CreateOffer is not implemented")
}
// CreateAnswer starts the RTCPeerConnection and generates the localDescription
func (r *RTCPeerConnection) CreateAnswer() error {
if r.tlscfg != nil {
return errors.Errorf("tlscfg is already defined, CreateOffer can only be called once")
}
r.tlscfg = dtls.NewTLSCfg()
r.iceUfrag = util.RandSeq(16)
r.icePwd = util.RandSeq(32)
r.portsLock.Lock()
defer r.portsLock.Unlock()
candidates := []string{}
basePriority := uint16(rand.Uint32() & (1<<16 - 1))
for _, c := range ice.HostInterfaces() {
port, err := network.NewPort(c+":0", []byte(r.icePwd), r.tlscfg, r.generateChannel, r.iceStateChange)
if err != nil {
return err
}
candidates = append(candidates, fmt.Sprintf("candidate:udpcandidate 1 udp %d %s %d typ host", basePriority, c, port.ListeningAddr.Port))
basePriority = basePriority + 1
r.ports = append(r.ports, port)
}
if r.config != nil {
for _, server := range r.config.ICEServers {
if server.serverType() != RTCServerTypeSTUN {
continue
}
for _, iceURL := range server.URLs {
proto, host, err := protocolAndHost(iceURL)
// TODO if one of the URLs does not work we should just ignore it.
if err != nil {
return errors.Wrapf(err, "Failed to parse ICE URL")
}
// TODO Do we want the timeout to be configurable?
client, err := stun.NewClient(proto, host, time.Second*5)
if err != nil {
return errors.Wrapf(err, "Failed to create STUN client")
}
localAddr, ok := client.LocalAddr().(*net.UDPAddr)
if !ok {
return errors.Errorf("Failed to cast STUN client to UDPAddr")
}
resp, err := client.Request()
if err != nil {
return errors.Wrapf(err, "Failed to make STUN request")
}
if err := client.Close(); err != nil {
return errors.Wrapf(err, "Failed to close STUN client")
}
attr, ok := resp.GetOneAttribute(stun.AttrXORMappedAddress)
if !ok {
return errors.Errorf("Got respond from STUN server that did not contain XORAddress")
}
var addr stun.XorAddress
if err := addr.Unpack(resp, attr); err != nil {
return errors.Wrapf(err, "Failed to unpack STUN XorAddress response")
}
port, err := network.NewPort(fmt.Sprintf("0.0.0.0:%d", localAddr.Port), []byte(r.icePwd), r.tlscfg, r.generateChannel, r.iceStateChange)
if err != nil {
return errors.Wrapf(err, "Failed to build network/port")
}
candidates = append(candidates, fmt.Sprintf("candidate:%scandidate 1 %s %d %s %d typ srflx", proto, proto, basePriority, addr.IP.String(), localAddr.Port))
basePriority = basePriority + 1
r.ports = append(r.ports, port)
}
}
}
r.LocalDescription = sdp.BaseSessionDescription(&sdp.SessionBuilder{
IceUsername: r.iceUfrag,
IcePassword: r.icePwd,
Fingerprint: r.tlscfg.Fingerprint(),
Candidates: candidates,
Tracks: r.localTracks,
})
return nil
}
// AddTrack adds a new track to the RTCPeerConnection
// This function returns a channel to push buffers on, and an error if the channel can't be added
// Closing the channel ends this stream
func (r *RTCPeerConnection) AddTrack(mediaType TrackType, clockRate uint32) (samples chan<- RTCSample, err error) {
if mediaType != VP8 && mediaType != H264 && mediaType != Opus {
panic("TODO Discarding packet, need media parsing")
}
trackInput := make(chan RTCSample, 15)
go func() {
ssrc := rand.Uint32()
sdpTrack := &sdp.SessionBuilderTrack{SSRC: ssrc}
var payloader rtp.Payloader
var payloadType uint8
switch mediaType {
case Opus:
sdpTrack.IsAudio = true
payloader = &codecs.OpusPayloader{}
payloadType = 111
case VP8:
payloader = &codecs.VP8Payloader{}
payloadType = 96
case H264:
payloader = &codecs.H264Payloader{}
payloadType = 100
}
r.localTracks = append(r.localTracks, sdpTrack)
packetizer := rtp.NewPacketizer(1400, payloadType, ssrc, payloader, rtp.NewRandomSequencer(), clockRate)
for {
in := <-trackInput
packets := packetizer.Packetize(in.Data, in.Samples)
for _, p := range packets {
for _, port := range r.ports {
port.Send(p)
}
}
}
}()
return trackInput, nil
// SetIdentityProvider is used to configure an identity provider to generate identity assertions
func (r *RTCPeerConnection) SetIdentityProvider(provider string) error {
panic("TODO SetIdentityProvider")
}
// Close ends the RTCPeerConnection
@@ -252,29 +155,32 @@ func (r *RTCPeerConnection) generateChannel(ssrc uint32, payloadType uint8) (buf
return nil
}
var codec TrackType
ok, codecStr := sdp.GetCodecForPayloadType(payloadType, r.remoteDescription)
if !ok {
sdpCodec, err := r.remoteDescription.GetCodecForPayloadType(payloadType)
if err != nil {
fmt.Printf("No codec could be found in RemoteDescription for payloadType %d \n", payloadType)
return nil
}
switch codecStr {
case "VP8":
codec = VP8
case "VP9":
codec = VP9
case "opus":
codec = Opus
case "H264":
codec = H264
default:
fmt.Printf("Codec %s in not supported by pion-WebRTC \n", codecStr)
return nil
codec, err := rtcMediaEngine.getCodecSDP(sdpCodec)
if err != nil {
fmt.Printf("Codec %s in not registered\n", sdpCodec)
}
bufferTransport := make(chan *rtp.Packet, 15)
go r.Ontrack(codec, bufferTransport)
track := &RTCTrack{
PayloadType: payloadType,
Kind: codec.Type,
ID: "0", // TODO extract from remoteDescription
Label: "", // TODO extract from remoteDescription
Ssrc: ssrc,
Codec: codec,
Packets: bufferTransport,
}
// TODO: Register the receiving Track
go r.Ontrack(track)
return bufferTransport
}
@@ -287,7 +193,7 @@ func (r *RTCPeerConnection) iceStateChange(p *network.Port) {
r.iceState = newState
}
if p.ICEState == ice.Failed {
if p.ICEState == ice.ConnectionStateFailed {
if err := p.Close(); err != nil {
fmt.Println(errors.Wrap(err, "Failed to close Port when ICE went to failed"))
}
@@ -301,9 +207,9 @@ func (r *RTCPeerConnection) iceStateChange(p *network.Port) {
}
if len(r.ports) == 0 {
updateAndNotify(ice.Disconnected)
updateAndNotify(ice.ConnectionStateDisconnected)
}
} else {
updateAndNotify(ice.Connected)
updateAndNotify(ice.ConnectionStateConnected)
}
}

407
signaling.go Normal file
View File

@@ -0,0 +1,407 @@
package webrtc
import (
"fmt"
"math/rand"
"net"
"time"
"github.com/pions/pkg/stun"
"github.com/pions/webrtc/internal/network"
"github.com/pions/webrtc/internal/sdp"
"github.com/pions/webrtc/pkg/ice"
"github.com/pkg/errors"
)
/*
setRemote(OFFER) setLocal(PRANSWER)
/-----\ /-----\
| | | |
v | v |
+---------------+ | +---------------+ |
| |----/ | |----/
| have- | setLocal(PRANSWER) | have- |
| remote-offer |------------------- >| local-pranswer|
| | | |
| | | |
+---------------+ +---------------+
^ | |
| | setLocal(ANSWER) |
setRemote(OFFER) | |
| V setLocal(ANSWER) |
+---------------+ |
| | |
| |<---------------------------+
| stable |
| |<---------------------------+
| | |
+---------------+ setRemote(ANSWER) |
^ | |
| | setLocal(OFFER) |
setRemote(ANSWER) | |
| V |
+---------------+ +---------------+
| | | |
| have- | setRemote(PRANSWER) |have- |
| local-offer |------------------- >|remote-pranswer|
| | | |
| |----\ | |----\
+---------------+ | +---------------+ |
^ | ^ |
| | | |
\-----/ \-----/
setLocal(OFFER) setRemote(PRANSWER)
*/
// RTCSignalingState indicates the state of the offer/answer process
type RTCSignalingState int
const (
// RTCSignalingStateStable indicates there is no offer­answer exchange in progress.
RTCSignalingStateStable RTCSignalingState = iota + 1
// RTCSignalingStateHaveLocalOffer indicates A local description, of type "offer", has been successfully applied.
RTCSignalingStateHaveLocalOffer
// RTCSignalingStateHaveRemoteOffer indicates A remote description, of type "offer", has been successfully applied.
RTCSignalingStateHaveRemoteOffer
// RTCSignalingStateHaveLocalPranswer indicates A remote description of type "offer" has been successfully applied and a local description of type "pranswer" has been successfully applied.
RTCSignalingStateHaveLocalPranswer
// RTCSignalingStateHaveRemotePranswer indicates A local description of type "offer" has been successfully applied and a remote description of type "pranswer" has been successfully applied.
RTCSignalingStateHaveRemotePranswer
// RTCSignalingStateClosed indicates The RTCPeerConnection has been closed.
RTCSignalingStateClosed
)
func (t RTCSignalingState) String() string {
switch t {
case RTCSignalingStateStable:
return "stable"
case RTCSignalingStateHaveLocalOffer:
return "have-local-offer"
case RTCSignalingStateHaveRemoteOffer:
return "have-remote-offer"
case RTCSignalingStateHaveLocalPranswer:
return "have-local-pranswer"
case RTCSignalingStateHaveRemotePranswer:
return "have-remote-pranswer"
case RTCSignalingStateClosed:
return "closed"
default:
return "Unknown"
}
}
// RTCSdpType describes the type of an RTCSessionDescription
type RTCSdpType int
const (
// RTCSdpTypeOffer indicates that a description MUST be treated as an SDP offer.
RTCSdpTypeOffer RTCSdpType = iota + 1
// RTCSdpTypePranswer indicates that a description MUST be treated as an SDP answer, but not a final answer.
RTCSdpTypePranswer
// RTCSdpTypeAnswer indicates that a description MUST be treated as an SDP final answer, and the offer-answer exchange MUST be considered complete.
RTCSdpTypeAnswer
// RTCSdpTypeRollback indicates that a description MUST be treated as canceling the current SDP negotiation and moving the SDP offer and answer back to what it was in the previous stable state.
RTCSdpTypeRollback
)
func (t RTCSdpType) String() string {
switch t {
case RTCSdpTypeOffer:
return "offer"
case RTCSdpTypePranswer:
return "pranswer"
case RTCSdpTypeAnswer:
return "answer"
case RTCSdpTypeRollback:
return "rollback"
default:
return "Unknown"
}
}
// RTCSessionDescription is used to expose local and remote session descriptions.
type RTCSessionDescription struct {
Typ RTCSdpType
Sdp string
}
// SetRemoteDescription sets the SessionDescription of the remote peer
func (r *RTCPeerConnection) SetRemoteDescription(desc RTCSessionDescription) error {
if r.remoteDescription != nil {
return errors.Errorf("remoteDescription is already defined, SetRemoteDescription can only be called once")
}
r.currentRemoteDescription = &desc
r.remoteDescription = &sdp.SessionDescription{}
return r.remoteDescription.Unmarshal(desc.Sdp)
}
// RTCOfferOptions describes the options used to control the offer creation process
type RTCOfferOptions struct {
VoiceActivityDetection bool
IceRestart bool
}
// CreateOffer starts the RTCPeerConnection and generates the localDescription
func (r *RTCPeerConnection) CreateOffer(options *RTCOfferOptions) (RTCSessionDescription, error) {
if options != nil {
panic("TODO handle options")
}
if r.IsClosed {
return RTCSessionDescription{}, &InvalidStateError{Err: ErrConnectionClosed}
}
useIdentity := r.idpLoginURL != nil
if useIdentity {
panic("TODO handle identity provider")
}
d := sdp.NewJSEPSessionDescription(
r.tlscfg.Fingerprint(),
useIdentity).
WithValueAttribute(sdp.AttrKeyGroup, "BUNDLE audio video") // TODO: Support BUNDLE
var streamlabels string
for _, tranceiver := range r.rtpTransceivers {
if tranceiver.Sender == nil ||
tranceiver.Sender.Track == nil {
continue
}
track := tranceiver.Sender.Track
cname := "poins" // TODO: Support RTP streams synchronisation
steamlabel := "poins" // TODO: Support steam labels
codec, err := rtcMediaEngine.getCodec(track.PayloadType)
if err != nil {
return RTCSessionDescription{}, err
}
media := sdp.NewJSEPMediaDescription(track.Kind.String(), []string{}).
WithValueAttribute(sdp.AttrKeyConnectionSetup, sdp.ConnectionRoleActive.String()). // TODO: Support other connection types
WithValueAttribute(sdp.AttrKeyMID, tranceiver.Mid).
WithPropertyAttribute(tranceiver.Direction.String()).
WithICECredentials(r.iceAgent.Ufrag, r.iceAgent.Pwd).
WithPropertyAttribute(sdp.AttrKeyICELite). // TODO: get ICE type from ICE Agent
WithPropertyAttribute(sdp.AttrKeyRtcpMux). // TODO: support RTCP fallback
WithPropertyAttribute(sdp.AttrKeyRtcpRsize). // TODO: Support Reduced-Size RTCP?
WithCodec(
codec.PayloadType,
codec.Name,
codec.ClockRate,
codec.Channels,
codec.SdpFmtpLine,
).
WithMediaSource(track.Ssrc, cname, steamlabel, track.Label)
err = r.addICECandidates(media)
if err != nil {
return RTCSessionDescription{}, err
}
streamlabels = streamlabels + " " + steamlabel
d.WithMedia(media)
}
d.WithValueAttribute(sdp.AttrKeyMsidSemantic, " "+sdp.SemanticTokenWebRTCMediaStreams+streamlabels)
return RTCSessionDescription{
Typ: RTCSdpTypeOffer,
Sdp: d.Marshal(),
}, nil
}
// RTCAnswerOptions describes the options used to control the answer creation process
type RTCAnswerOptions struct {
VoiceActivityDetection bool
}
// CreateAnswer starts the RTCPeerConnection and generates the localDescription
func (r *RTCPeerConnection) CreateAnswer(options *RTCOfferOptions) (RTCSessionDescription, error) {
if options != nil {
panic("TODO handle options")
}
if r.IsClosed {
return RTCSessionDescription{}, &InvalidStateError{Err: ErrConnectionClosed}
}
useIdentity := r.idpLoginURL != nil
if useIdentity {
panic("TODO handle identity provider")
}
d := sdp.NewJSEPSessionDescription(
r.tlscfg.Fingerprint(),
useIdentity).
WithValueAttribute(sdp.AttrKeyGroup, "BUNDLE audio video") // TODO: Support BUNDLE
// TODO: Actually reply based on the offer (#21)
// TODO: Media lines should be in the same order as the offer
audioStreamLabels, err := r.addAnswerMedia(d, RTCRtpCodecTypeAudio)
if err != nil {
return RTCSessionDescription{}, err
}
videoStreamLabels, err := r.addAnswerMedia(d, RTCRtpCodecTypeVideo)
if err != nil {
return RTCSessionDescription{}, err
}
d.WithValueAttribute(sdp.AttrKeyMsidSemantic, " "+sdp.SemanticTokenWebRTCMediaStreams+audioStreamLabels+videoStreamLabels)
return RTCSessionDescription{
Typ: RTCSdpTypeAnswer,
Sdp: d.Marshal(),
}, nil
}
func (r *RTCPeerConnection) addAnswerMedia(d *sdp.SessionDescription, typ RTCRtpCodecType) (string, error) {
added := false
var streamlabels string
for _, tranceiver := range r.rtpTransceivers {
if tranceiver.Sender == nil ||
tranceiver.Sender.Track == nil ||
tranceiver.Sender.Track.Kind != typ {
continue
}
track := tranceiver.Sender.Track
cname := track.Label // TODO: Support RTP streams synchronisation
steamlabel := track.Label // TODO: Support steam labels
codec, err := rtcMediaEngine.getCodec(track.PayloadType)
if err != nil {
return "", err
}
media := sdp.NewJSEPMediaDescription(track.Kind.String(), []string{}).
WithValueAttribute(sdp.AttrKeyConnectionSetup, sdp.ConnectionRoleActive.String()). // TODO: Support other connection types
WithValueAttribute(sdp.AttrKeyMID, tranceiver.Mid).
WithPropertyAttribute(tranceiver.Direction.String()).
WithICECredentials(r.iceAgent.Ufrag, r.iceAgent.Pwd).
WithPropertyAttribute(sdp.AttrKeyICELite). // TODO: get ICE type from ICE Agent
WithPropertyAttribute(sdp.AttrKeyRtcpMux). // TODO: support RTCP fallback
WithPropertyAttribute(sdp.AttrKeyRtcpRsize). // TODO: Support Reduced-Size RTCP?
WithCodec(
codec.PayloadType,
codec.Name,
codec.ClockRate,
codec.Channels,
codec.SdpFmtpLine,
).
WithMediaSource(track.Ssrc, cname, steamlabel, track.Label)
err = r.addICECandidates(media)
if err != nil {
return "", err
}
d.WithMedia(media)
streamlabels = streamlabels + " " + steamlabel
added = true
}
if !added {
// Add media line to advertise capabilities
media := sdp.NewJSEPMediaDescription(typ.String(), []string{}).
WithValueAttribute(sdp.AttrKeyConnectionSetup, sdp.ConnectionRoleActive.String()). // TODO: Support other connection types
WithValueAttribute(sdp.AttrKeyMID, typ.String()).
WithPropertyAttribute(RTCRtpTransceiverDirectionSendrecv.String()).
WithICECredentials(r.iceAgent.Ufrag, r.iceAgent.Pwd). // TODO: get credendials form ICE agent
WithPropertyAttribute(sdp.AttrKeyICELite). // TODO: get ICE type from ICE Agent (#23)
WithPropertyAttribute(sdp.AttrKeyRtcpMux). // TODO: support RTCP fallback
WithPropertyAttribute(sdp.AttrKeyRtcpRsize) // TODO: Support Reduced-Size RTCP?
for _, codec := range rtcMediaEngine.getCodecsByKind(typ) {
media.WithCodec(
codec.PayloadType,
codec.Name,
codec.ClockRate,
codec.Channels,
codec.SdpFmtpLine,
)
}
err := r.addICECandidates(media)
if err != nil {
return "", err
}
d.WithMedia(media)
}
return streamlabels, nil
}
func (r *RTCPeerConnection) addICECandidates(d *sdp.MediaDescription) error {
r.portsLock.Lock()
defer r.portsLock.Unlock()
// TODO: Move gathering to ice.Agent
basePriority := uint16(rand.Uint32() & (1<<16 - 1))
for _, c := range ice.HostInterfaces() {
port, err := network.NewPort(c+":0", []byte(r.iceAgent.Pwd), r.tlscfg, r.generateChannel, r.iceStateChange)
if err != nil {
return err
}
d.WithCandidate(1, "udp", basePriority, c, port.ListeningAddr.Port, "host")
basePriority = basePriority + 1
r.ports = append(r.ports, port)
}
for _, servers := range r.iceAgent.Servers {
for _, server := range servers {
if server.Type != ice.ServerTypeSTUN {
continue
}
// TODO Do we want the timeout to be configurable?
proto := server.TransportType.String()
client, err := stun.NewClient(proto, fmt.Sprintf("%s:%d", server.Host, server.Port), time.Second*5)
if err != nil {
return errors.Wrapf(err, "Failed to create STUN client")
}
localAddr, ok := client.LocalAddr().(*net.UDPAddr)
if !ok {
return errors.Errorf("Failed to cast STUN client to UDPAddr")
}
resp, err := client.Request()
if err != nil {
return errors.Wrapf(err, "Failed to make STUN request")
}
if err := client.Close(); err != nil {
return errors.Wrapf(err, "Failed to close STUN client")
}
attr, ok := resp.GetOneAttribute(stun.AttrXORMappedAddress)
if !ok {
return errors.Errorf("Got respond from STUN server that did not contain XORAddress")
}
var addr stun.XorAddress
if err := addr.Unpack(resp, attr); err != nil {
return errors.Wrapf(err, "Failed to unpack STUN XorAddress response")
}
port, err := network.NewPort(fmt.Sprintf("0.0.0.0:%d", localAddr.Port), []byte(r.iceAgent.Pwd), r.tlscfg, r.generateChannel, r.iceStateChange)
if err != nil {
return errors.Wrapf(err, "Failed to build network/port")
}
d.WithCandidate(1, proto, basePriority, addr.IP.String(), localAddr.Port, "srflx")
basePriority = basePriority + 1
r.ports = append(r.ports, port)
}
}
d.WithPropertyAttribute("end-of-candidates") // TODO: Support full trickle-ice
return nil
}

26
signaling_test.go Normal file
View File

@@ -0,0 +1,26 @@
package webrtc
import (
"testing"
"github.com/pions/webrtc/internal/sdp"
)
func TestSetRemoteDescription(t *testing.T) {
testCases := []struct {
desc RTCSessionDescription
}{
{RTCSessionDescription{RTCSdpTypeOffer, sdp.NewJSEPSessionDescription("", false).Marshal()}},
}
for i, testCase := range testCases {
peerConn, err := New(RTCConfiguration{})
if err != nil {
t.Errorf("Case %d: got error: %v", i, err)
}
err = peerConn.SetRemoteDescription(testCase.desc)
if err != nil {
t.Errorf("Case %d: got error: %v", i, err)
}
}
}