Move to new Track API

See v2.0.0 Release Notes[0] for all changes

Resolves #405

[0] https://github.com/pions/webrtc/wiki/v2.0.0-Release-Notes#media-api
This commit is contained in:
Sean DuBois
2019-02-21 14:53:35 -08:00
parent f5d11df18d
commit 6aeb3425b0
18 changed files with 715 additions and 537 deletions

View File

@@ -156,8 +156,10 @@ func TestDataChannel_MessagesAreOrdered(t *testing.T) {
out := make(chan int) out := make(chan int)
inner := func(msg DataChannelMessage) { inner := func(msg DataChannelMessage) {
// randomly sleep // randomly sleep
// NB: The big.Int/crypto.Rand is overkill but makes the linter happy // math/rand a weak RNG, but this does not need to be secure. Ignore with #nosec
/* #nosec */
randInt, err := rand.Int(rand.Reader, big.NewInt(int64(max))) randInt, err := rand.Int(rand.Reader, big.NewInt(int64(max)))
/* #nosec */
if err != nil { if err != nil {
t.Fatalf("Failed to get random sleep duration: %s", err) t.Fatalf("Failed to get random sleep duration: %s", err)
} }

View File

@@ -34,26 +34,31 @@ func gstreamerReceiveMain() {
// Set a handler for when a new remote track starts, this handler creates a gstreamer pipeline // Set a handler for when a new remote track starts, this handler creates a gstreamer pipeline
// for the given codec // for the given codec
peerConnection.OnTrack(func(track *webrtc.Track) { peerConnection.OnTrack(func(track *webrtc.Track, receiver *webrtc.RTPReceiver) {
// Send a PLI on an interval so that the publisher is pushing a keyframe every rtcpPLIInterval // Send a PLI on an interval so that the publisher is pushing a keyframe every rtcpPLIInterval
// This is a temporary fix until we implement incoming RTCP events, then we would push a PLI only when a viewer requests it // This is a temporary fix until we implement incoming RTCP events, then we would push a PLI only when a viewer requests it
go func() { go func() {
ticker := time.NewTicker(time.Second * 3) ticker := time.NewTicker(time.Second * 3)
for range ticker.C { for range ticker.C {
err := peerConnection.SendRTCP(&rtcp.PictureLossIndication{MediaSSRC: track.SSRC}) err := peerConnection.SendRTCP(&rtcp.PictureLossIndication{MediaSSRC: track.SSRC()})
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
} }
} }
}() }()
codec := track.Codec codec := track.Codec()
fmt.Printf("Track has started, of type %d: %s \n", track.PayloadType, codec.Name) fmt.Printf("Track has started, of type %d: %s \n", track.PayloadType(), codec.Name)
pipeline := gst.CreatePipeline(codec.Name) pipeline := gst.CreatePipeline(codec.Name)
pipeline.Start() pipeline.Start()
buf := make([]byte, 1400)
for { for {
p := <-track.Packets i, err := track.Read(buf)
pipeline.Push(p.Raw) if err != nil {
panic(err)
}
pipeline.Push(buf[:i])
} }
}) })

View File

@@ -2,6 +2,7 @@ package main
import ( import (
"fmt" "fmt"
"math/rand"
"github.com/pions/webrtc" "github.com/pions/webrtc"
@@ -34,7 +35,7 @@ func main() {
}) })
// Create a audio track // Create a audio track
opusTrack, err := peerConnection.NewSampleTrack(webrtc.DefaultPayloadTypeOpus, "audio", "pion1") opusTrack, err := peerConnection.NewTrack(webrtc.DefaultPayloadTypeOpus, rand.Uint32(), "audio", "pion1")
if err != nil { if err != nil {
panic(err) panic(err)
} }
@@ -44,7 +45,7 @@ func main() {
} }
// Create a video track // Create a video track
vp8Track, err := peerConnection.NewSampleTrack(webrtc.DefaultPayloadTypeVP8, "video", "pion2") vp8Track, err := peerConnection.NewTrack(webrtc.DefaultPayloadTypeVP8, rand.Uint32(), "video", "pion2")
if err != nil { if err != nil {
panic(err) panic(err)
} }
@@ -79,8 +80,8 @@ func main() {
} }
// Start pushing buffers on these tracks // Start pushing buffers on these tracks
gst.CreatePipeline(webrtc.Opus, opusTrack.Samples, "audiotestsrc").Start() gst.CreatePipeline(webrtc.Opus, opusTrack, "audiotestsrc").Start()
gst.CreatePipeline(webrtc.VP8, vp8Track.Samples, "videotestsrc").Start() gst.CreatePipeline(webrtc.VP8, vp8Track, "videotestsrc").Start()
// Block forever // Block forever
select {} select {}

View File

@@ -3,6 +3,7 @@ package main
import ( import (
"flag" "flag"
"fmt" "fmt"
"math/rand"
"github.com/pions/webrtc" "github.com/pions/webrtc"
@@ -39,7 +40,7 @@ func main() {
}) })
// Create a audio track // Create a audio track
opusTrack, err := peerConnection.NewSampleTrack(webrtc.DefaultPayloadTypeOpus, "audio", "pion1") opusTrack, err := peerConnection.NewTrack(webrtc.DefaultPayloadTypeOpus, rand.Uint32(), "audio", "pion1")
if err != nil { if err != nil {
panic(err) panic(err)
} }
@@ -49,7 +50,7 @@ func main() {
} }
// Create a video track // Create a video track
vp8Track, err := peerConnection.NewSampleTrack(webrtc.DefaultPayloadTypeVP8, "video", "pion2") vp8Track, err := peerConnection.NewTrack(webrtc.DefaultPayloadTypeVP8, rand.Uint32(), "video", "pion2")
if err != nil { if err != nil {
panic(err) panic(err)
} }
@@ -84,8 +85,8 @@ func main() {
fmt.Println(signal.Encode(answer)) fmt.Println(signal.Encode(answer))
// Start pushing buffers on these tracks // Start pushing buffers on these tracks
gst.CreatePipeline(webrtc.Opus, opusTrack.Samples, *audioSrc).Start() gst.CreatePipeline(webrtc.Opus, opusTrack, *audioSrc).Start()
gst.CreatePipeline(webrtc.VP8, vp8Track.Samples, *videoSrc).Start() gst.CreatePipeline(webrtc.VP8, vp8Track, *videoSrc).Start()
// Block forever // Block forever
select {} select {}

View File

@@ -23,7 +23,7 @@ func init() {
// Pipeline is a wrapper for a GStreamer Pipeline // Pipeline is a wrapper for a GStreamer Pipeline
type Pipeline struct { type Pipeline struct {
Pipeline *C.GstElement Pipeline *C.GstElement
in chan<- media.Sample track *webrtc.Track
// stop acts as a signal that this pipeline is stopped // stop acts as a signal that this pipeline is stopped
// any pending sends to Pipeline.in should be cancelled // any pending sends to Pipeline.in should be cancelled
stop chan interface{} stop chan interface{}
@@ -35,7 +35,7 @@ var pipelines = make(map[int]*Pipeline)
var pipelinesLock sync.Mutex var pipelinesLock sync.Mutex
// CreatePipeline creates a GStreamer Pipeline // CreatePipeline creates a GStreamer Pipeline
func CreatePipeline(codecName string, in chan<- media.Sample, pipelineSrc string) *Pipeline { func CreatePipeline(codecName string, track *webrtc.Track, pipelineSrc string) *Pipeline {
pipelineStr := "appsink name=appsink" pipelineStr := "appsink name=appsink"
switch codecName { switch codecName {
case webrtc.VP8: case webrtc.VP8:
@@ -60,7 +60,7 @@ func CreatePipeline(codecName string, in chan<- media.Sample, pipelineSrc string
pipeline := &Pipeline{ pipeline := &Pipeline{
Pipeline: C.gstreamer_send_create_pipeline(pipelineStrUnsafe), Pipeline: C.gstreamer_send_create_pipeline(pipelineStrUnsafe),
in: in, track: track,
id: len(pipelines), id: len(pipelines),
codecName: codecName, codecName: codecName,
} }
@@ -105,9 +105,8 @@ func goHandlePipelineBuffer(buffer unsafe.Pointer, bufferLen C.int, duration C.i
} }
// We need to be able to cancel this function even f pipeline.in isn't being serviced // We need to be able to cancel this function even f pipeline.in isn't being serviced
// When pipeline.stop is closed the sending of data will be cancelled. // When pipeline.stop is closed the sending of data will be cancelled.
select { if err := pipeline.track.WriteSample(media.Sample{Data: C.GoBytes(buffer, bufferLen), Samples: samples}); err != nil {
case pipeline.in <- media.Sample{Data: C.GoBytes(buffer, bufferLen), Samples: samples}: panic(err)
case <-pipeline.stop:
} }
} else { } else {
fmt.Printf("discarding buffer, no pipeline with id %d", int(pipelineID)) fmt.Printf("discarding buffer, no pipeline with id %d", int(pipelineID))

View File

@@ -52,8 +52,8 @@ func main() {
fmt.Printf("Connection State has changed %s \n", connectionState.String()) fmt.Printf("Connection State has changed %s \n", connectionState.String())
}) })
peerConnection.OnTrack(func(track *webrtc.Track) { peerConnection.OnTrack(func(track *webrtc.Track, receiver *webrtc.RTPReceiver) {
if track.Codec.Name == webrtc.Opus { if track.Codec().Name == webrtc.Opus {
return return
} }
@@ -62,8 +62,14 @@ func main() {
if err != nil { if err != nil {
panic(err) panic(err)
} }
for { for {
err = i.AddPacket(<-track.Packets) packet, err := track.ReadRTP()
if err != nil {
panic(err)
}
err = i.AddPacket(packet)
if err != nil { if err != nil {
panic(err) panic(err)
} }

View File

@@ -3,6 +3,7 @@ package main
import ( import (
"fmt" "fmt"
"log" "log"
"math/rand"
janus "github.com/notedit/janus-go" janus "github.com/notedit/janus-go"
"github.com/pions/webrtc" "github.com/pions/webrtc"
@@ -54,7 +55,7 @@ func main() {
}) })
// Create a audio track // Create a audio track
opusTrack, err := peerConnection.NewSampleTrack(webrtc.DefaultPayloadTypeOpus, "audio", "pion1") opusTrack, err := peerConnection.NewTrack(webrtc.DefaultPayloadTypeOpus, rand.Uint32(), "audio", "pion1")
if err != nil { if err != nil {
panic(err) panic(err)
} }
@@ -64,7 +65,7 @@ func main() {
} }
// Create a video track // Create a video track
vp8Track, err := peerConnection.NewSampleTrack(webrtc.DefaultPayloadTypeVP8, "video", "pion2") vp8Track, err := peerConnection.NewTrack(webrtc.DefaultPayloadTypeVP8, rand.Uint32(), "video", "pion2")
if err != nil { if err != nil {
panic(err) panic(err)
} }
@@ -134,8 +135,8 @@ func main() {
} }
// Start pushing buffers on these tracks // Start pushing buffers on these tracks
gst.CreatePipeline(webrtc.Opus, opusTrack.Samples, "audiotestsrc").Start() gst.CreatePipeline(webrtc.Opus, opusTrack, "audiotestsrc").Start()
gst.CreatePipeline(webrtc.VP8, vp8Track.Samples, "videotestsrc").Start() gst.CreatePipeline(webrtc.VP8, vp8Track, "videotestsrc").Start()
} }
select {} select {}

View File

@@ -44,27 +44,32 @@ func main() {
// Set a handler for when a new remote track starts, this handler saves buffers to disk as // Set a handler for when a new remote track starts, this handler saves buffers to disk as
// an ivf file, since we could have multiple video tracks we provide a counter. // an ivf file, since we could have multiple video tracks we provide a counter.
// In your application this is where you would handle/process video // In your application this is where you would handle/process video
peerConnection.OnTrack(func(track *webrtc.Track) { peerConnection.OnTrack(func(track *webrtc.Track, receiver *webrtc.RTPReceiver) {
// Send a PLI on an interval so that the publisher is pushing a keyframe every rtcpPLIInterval // Send a PLI on an interval so that the publisher is pushing a keyframe every rtcpPLIInterval
// This is a temporary fix until we implement incoming RTCP events, then we would push a PLI only when a viewer requests it
go func() { go func() {
ticker := time.NewTicker(time.Second * 3) ticker := time.NewTicker(time.Second * 3)
for range ticker.C { for range ticker.C {
err := peerConnection.SendRTCP(&rtcp.PictureLossIndication{MediaSSRC: track.SSRC}) err := peerConnection.SendRTCP(&rtcp.PictureLossIndication{MediaSSRC: track.SSRC()})
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
} }
} }
}() }()
if track.Codec.Name == webrtc.VP8 { if track.Codec().Name == webrtc.VP8 {
fmt.Println("Got VP8 track, saving to disk as output.ivf") fmt.Println("Got VP8 track, saving to disk as output.ivf")
i, err := ivfwriter.New("output.ivf") i, err := ivfwriter.New("output.ivf")
if err != nil { if err != nil {
panic(err) panic(err)
} }
for { for {
err = i.AddPacket(<-track.Packets) packet, err := track.ReadRTP()
if err != nil {
panic(err)
}
err = i.AddPacket(packet)
if err != nil { if err != nil {
panic(err) panic(err)
} }

View File

@@ -7,11 +7,9 @@ import (
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"strconv" "strconv"
"sync"
"time" "time"
"github.com/pions/rtcp" "github.com/pions/rtcp"
"github.com/pions/rtp"
"github.com/pions/webrtc" "github.com/pions/webrtc"
"github.com/pions/webrtc/examples/internal/signal" "github.com/pions/webrtc/examples/internal/signal"
@@ -84,41 +82,38 @@ func main() {
panic(err) panic(err)
} }
inboundSSRC := make(chan uint32) localTrackChan := make(chan *webrtc.Track)
inboundPayloadType := make(chan uint8)
outboundRTP := []chan<- *rtp.Packet{}
var outboundRTPLock sync.RWMutex
// Set a handler for when a new remote track starts, this just distributes all our packets // Set a handler for when a new remote track starts, this just distributes all our packets
// to connected peers // to connected peers
peerConnection.OnTrack(func(track *webrtc.Track) { peerConnection.OnTrack(func(remoteTrack *webrtc.Track, receiver *webrtc.RTPReceiver) {
// Send a PLI on an interval so that the publisher is pushing a keyframe every rtcpPLIInterval // Send a PLI on an interval so that the publisher is pushing a keyframe every rtcpPLIInterval
// This can be less wasteful by processing incoming RTCP events, then we would emit a NACK/PLI when a viewer requests it // This can be less wasteful by processing incoming RTCP events, then we would emit a NACK/PLI when a viewer requests it
go func() { go func() {
ticker := time.NewTicker(rtcpPLIInterval) ticker := time.NewTicker(rtcpPLIInterval)
for range ticker.C { for range ticker.C {
if err := peerConnection.SendRTCP(&rtcp.PictureLossIndication{MediaSSRC: track.SSRC}); err != nil { if err := peerConnection.SendRTCP(&rtcp.PictureLossIndication{MediaSSRC: remoteTrack.SSRC()}); err != nil {
fmt.Println(err) fmt.Println(err)
} }
} }
}() }()
inboundSSRC <- track.SSRC // Create a local track, all our SFU clients will be fed via this track
inboundPayloadType <- track.PayloadType localTrack, err := peerConnection.NewTrack(remoteTrack.PayloadType(), remoteTrack.SSRC(), "video", "pion")
if err != nil {
panic(err)
}
localTrackChan <- localTrack
rtpBuf := make([]byte, 1400)
for { for {
rtpPacket := <-track.Packets i, err := remoteTrack.Read(rtpBuf)
if err != nil {
outboundRTPLock.RLock() panic(err)
for _, outChan := range outboundRTP { }
outPacket := rtpPacket
outPacket.Payload = append([]byte{}, outPacket.Payload...) if _, err = localTrack.Write(rtpBuf[:i]); err != nil {
select { panic(err)
case outChan <- outPacket:
default:
}
} }
outboundRTPLock.RUnlock()
} }
}) })
@@ -143,8 +138,7 @@ func main() {
// Get the LocalDescription and take it to base64 so we can paste in browser // Get the LocalDescription and take it to base64 so we can paste in browser
fmt.Println(signal.Encode(answer)) fmt.Println(signal.Encode(answer))
outboundSSRC := <-inboundSSRC localTrack := <-localTrackChan
outboundPayloadType := <-inboundPayloadType
for { for {
fmt.Println("") fmt.Println("")
fmt.Println("Curl an base64 SDP to start sendonly peer connection") fmt.Println("Curl an base64 SDP to start sendonly peer connection")
@@ -158,21 +152,11 @@ func main() {
panic(err) panic(err)
} }
// Create a single VP8 Track to send videa _, err = peerConnection.AddTrack(localTrack)
vp8Track, err := peerConnection.NewRawRTPTrack(outboundPayloadType, outboundSSRC, "video", "pion")
if err != nil { if err != nil {
panic(err) panic(err)
} }
_, err = peerConnection.AddTrack(vp8Track)
if err != nil {
panic(err)
}
outboundRTPLock.Lock()
outboundRTP = append(outboundRTP, vp8Track.RawRTP)
outboundRTPLock.Unlock()
// Set the remote SessionDescription // Set the remote SessionDescription
err = peerConnection.SetRemoteDescription(recvOnlyOffer) err = peerConnection.SetRemoteDescription(recvOnlyOffer)
if err != nil { if err != nil {

93
lossy_stream.go Normal file
View File

@@ -0,0 +1,93 @@
package webrtc
import (
"fmt"
"io"
"sync"
)
// lossyReader wraps an io.Reader and discards data if it isn't read in time
// Allowing us to only deliver the newest data to the caller
type lossyReadCloser struct {
nextReader io.ReadCloser
mu sync.RWMutex
incomingBuf chan []byte
amountRead chan int
readError error
hasErrored chan interface{}
closed chan interface{}
}
func newLossyReadCloser(nextReader io.ReadCloser) *lossyReadCloser {
l := &lossyReadCloser{
nextReader: nextReader,
closed: make(chan interface{}),
incomingBuf: make(chan []byte),
hasErrored: make(chan interface{}),
amountRead: make(chan int),
}
go func() {
readBuf := make([]byte, receiveMTU)
for {
i, err := nextReader.Read(readBuf)
if err != nil {
l.mu.Lock()
l.readError = err
l.mu.Unlock()
close(l.hasErrored)
break
}
select {
case in := <-l.incomingBuf:
copy(in, readBuf[:i])
l.amountRead <- i
default: // Discard if we have no inbound read
}
}
}()
return l
}
func (l *lossyReadCloser) Read(b []byte) (n int, err error) {
select {
case <-l.closed:
return 0, fmt.Errorf("lossyReadCloser is closed")
case <-l.hasErrored:
l.mu.RLock()
defer l.mu.RUnlock()
return 0, l.readError
case l.incomingBuf <- b:
}
select {
case <-l.closed:
return 0, fmt.Errorf("lossyReadCloser is closed")
case <-l.hasErrored:
l.mu.RLock()
defer l.mu.RUnlock()
return 0, l.readError
case i := <-l.amountRead:
return i, nil
}
}
func (l *lossyReadCloser) Close() error {
select {
case <-l.closed:
return fmt.Errorf("lossyReader is already closed")
default:
}
close(l.closed)
return l.nextReader.Close()
}

View File

@@ -13,7 +13,6 @@ import (
"time" "time"
"github.com/pions/rtcp" "github.com/pions/rtcp"
"github.com/pions/rtp"
"github.com/pions/sdp/v2" "github.com/pions/sdp/v2"
"github.com/pions/webrtc/pkg/ice" "github.com/pions/webrtc/pkg/ice"
"github.com/pions/webrtc/pkg/logging" "github.com/pions/webrtc/pkg/logging"
@@ -103,7 +102,7 @@ type PeerConnection struct {
onSignalingStateChangeHandler func(SignalingState) onSignalingStateChangeHandler func(SignalingState)
onICEConnectionStateChangeHandler func(ICEConnectionState) onICEConnectionStateChangeHandler func(ICEConnectionState)
onTrackHandler func(*Track) onTrackHandler func(*Track, *RTPReceiver)
onDataChannelHandler func(*DataChannel) onDataChannelHandler func(*DataChannel)
iceGatherer *ICEGatherer iceGatherer *ICEGatherer
@@ -279,13 +278,13 @@ func (pc *PeerConnection) OnDataChannel(f func(*DataChannel)) {
// OnTrack sets an event handler which is called when remote track // OnTrack sets an event handler which is called when remote track
// arrives from a remote peer. // arrives from a remote peer.
func (pc *PeerConnection) OnTrack(f func(*Track)) { func (pc *PeerConnection) OnTrack(f func(*Track, *RTPReceiver)) {
pc.mu.Lock() pc.mu.Lock()
defer pc.mu.Unlock() defer pc.mu.Unlock()
pc.onTrackHandler = f pc.onTrackHandler = f
} }
func (pc *PeerConnection) onTrack(t *Track) (done chan struct{}) { func (pc *PeerConnection) onTrack(t *Track, r *RTPReceiver) (done chan struct{}) {
pc.mu.RLock() pc.mu.RLock()
hdlr := pc.onTrackHandler hdlr := pc.onTrackHandler
pc.mu.RUnlock() pc.mu.RUnlock()
@@ -298,7 +297,7 @@ func (pc *PeerConnection) onTrack(t *Track) (done chan struct{}) {
} }
go func() { go func() {
hdlr(t) hdlr(t, r)
close(done) close(done)
}() }()
@@ -856,18 +855,21 @@ func (pc *PeerConnection) SetRemoteDescription(desc SessionDescription) error {
return return
} }
if pc.onTrackHandler != nil { pc.openSRTP()
pc.openSRTP()
} else {
pcLog.Warnf("OnTrack unset, unable to handle incoming media streams")
}
for _, tranceiver := range pc.rtpTransceivers { for _, tranceiver := range pc.rtpTransceivers {
if tranceiver.Sender != nil { if tranceiver.Sender != nil {
tranceiver.Sender.Send(RTPSendParameters{ err = tranceiver.Sender.Send(RTPSendParameters{
encodings: RTPEncodingParameters{ encodings: RTPEncodingParameters{
RTPCodingParameters{SSRC: tranceiver.Sender.Track.SSRC, PayloadType: tranceiver.Sender.Track.PayloadType}, RTPCodingParameters{
SSRC: tranceiver.Sender.track.SSRC(),
PayloadType: tranceiver.Sender.track.PayloadType(),
},
}}) }})
if err != nil {
pcLog.Warnf("Failed to start Sender: %s", err)
}
} }
} }
@@ -931,15 +933,37 @@ func (pc *PeerConnection) openSRTP() {
for i := range incomingSSRCes { for i := range incomingSSRCes {
go func(ssrc uint32, codecType RTPCodecType) { go func(ssrc uint32, codecType RTPCodecType) {
receiver := pc.api.NewRTPReceiver(codecType, pc.dtlsTransport) receiver, err := pc.api.NewRTPReceiver(codecType, pc.dtlsTransport)
<-receiver.Receive(RTPReceiveParameters{ if err != nil {
pcLog.Warnf("Could not create RTPReceiver %s", err)
return
}
if err = receiver.Receive(RTPReceiveParameters{
encodings: RTPDecodingParameters{ encodings: RTPDecodingParameters{
RTPCodingParameters{SSRC: ssrc}, RTPCodingParameters{SSRC: ssrc},
}}) }}); err != nil {
pcLog.Warnf("RTPReceiver Receive failed %s", err)
return
}
sdpCodec, err := pc.CurrentLocalDescription.parsed.GetCodecForPayloadType(receiver.Track.PayloadType) pc.newRTPTransceiver(
receiver,
nil,
RTPTransceiverDirectionRecvonly,
)
if err = receiver.Track().determinePayloadType(); err != nil {
pcLog.Warnf("Could not determine PayloadType for SSRC %d", receiver.Track().SSRC())
return
}
pc.mu.RLock()
defer pc.mu.RUnlock()
sdpCodec, err := pc.CurrentLocalDescription.parsed.GetCodecForPayloadType(receiver.Track().PayloadType())
if err != nil { if err != nil {
pcLog.Warnf("no codec could be found in RemoteDescription for payloadType %d", receiver.Track.PayloadType) pcLog.Warnf("no codec could be found in RemoteDescription for payloadType %d", receiver.Track().PayloadType())
return return
} }
@@ -949,18 +973,18 @@ func (pc *PeerConnection) openSRTP() {
return return
} }
receiver.Track.Kind = codec.Type receiver.Track().mu.Lock()
receiver.Track.Codec = codec receiver.Track().kind = codec.Type
pc.newRTPTransceiver( receiver.Track().codec = codec
receiver, receiver.Track().mu.Unlock()
nil,
RTPTransceiverDirectionRecvonly,
)
pc.onTrack(receiver.Track) if pc.onTrackHandler != nil {
pc.onTrack(receiver.Track(), receiver)
} else {
pcLog.Warnf("OnTrack unset, unable to handle incoming media streams")
}
}(i, incomingSSRCes[i]) }(i, incomingSSRCes[i])
} }
} }
// drainSRTP pulls and discards RTP/RTCP packets that don't match any SRTP // drainSRTP pulls and discards RTP/RTCP packets that don't match any SRTP
@@ -984,20 +1008,14 @@ func (pc *PeerConnection) drainSRTP() {
go func() { go func() {
rtpBuf := make([]byte, receiveMTU) rtpBuf := make([]byte, receiveMTU)
rtpPacket := &rtp.Packet{}
for { for {
i, err := r.Read(rtpBuf) _, rtpHeader, err := r.ReadRTP(rtpBuf)
if err != nil { if err != nil {
pcLog.Warnf("Failed to read, drainSRTP done for: %v %d \n", err, ssrc) pcLog.Warnf("Failed to read, drainSRTP done for: %v %d \n", err, ssrc)
return return
} }
if err := rtpPacket.Unmarshal(rtpBuf[:i]); err != nil { pcLog.Debugf("got RTP: %+v", rtpHeader)
pcLog.Warnf("Failed to unmarshal RTP packet, discarding: %v \n", err)
continue
}
pcLog.Debugf("got RTP: %+v", rtpPacket)
} }
}() }()
} }
@@ -1019,18 +1037,12 @@ func (pc *PeerConnection) drainSRTP() {
go func() { go func() {
rtcpBuf := make([]byte, receiveMTU) rtcpBuf := make([]byte, receiveMTU)
for { for {
i, err := r.Read(rtcpBuf) _, header, err := r.ReadRTCP(rtcpBuf)
if err != nil { if err != nil {
pcLog.Warnf("Failed to read, drainSRTCP done for: %v %d \n", err, ssrc) pcLog.Warnf("Failed to read, drainSRTCP done for: %v %d \n", err, ssrc)
return return
} }
pcLog.Debugf("got RTCP: %+v", header)
rtcpPacket, _, err := rtcp.Unmarshal(rtcpBuf[:i])
if err != nil {
pcLog.Warnf("Failed to unmarshal RTCP packet, discarding: %v \n", err)
continue
}
pcLog.Debugf("got RTCP: %+v", rtcpPacket)
} }
}() }()
} }
@@ -1087,10 +1099,10 @@ func (pc *PeerConnection) GetSenders() []*RTPSender {
pc.mu.Lock() pc.mu.Lock()
defer pc.mu.Unlock() defer pc.mu.Unlock()
result := make([]*RTPSender, len(pc.rtpTransceivers)) result := []*RTPSender{}
for i, tranceiver := range pc.rtpTransceivers { for _, tranceiver := range pc.rtpTransceivers {
if tranceiver.Sender != nil { if tranceiver.Sender != nil {
result[i] = tranceiver.Sender result = append(result, tranceiver.Sender)
} }
} }
return result return result
@@ -1101,11 +1113,10 @@ func (pc *PeerConnection) GetReceivers() []*RTPReceiver {
pc.mu.Lock() pc.mu.Lock()
defer pc.mu.Unlock() defer pc.mu.Unlock()
result := make([]*RTPReceiver, len(pc.rtpTransceivers)) result := []*RTPReceiver{}
for i, tranceiver := range pc.rtpTransceivers { for _, tranceiver := range pc.rtpTransceivers {
if tranceiver.Receiver != nil { if tranceiver.Receiver != nil {
result[i] = tranceiver.Receiver result = append(result, tranceiver.Receiver)
} }
} }
return result return result
@@ -1125,10 +1136,10 @@ func (pc *PeerConnection) AddTrack(track *Track) (*RTPSender, error) {
return nil, &rtcerr.InvalidStateError{Err: ErrConnectionClosed} return nil, &rtcerr.InvalidStateError{Err: ErrConnectionClosed}
} }
for _, transceiver := range pc.rtpTransceivers { for _, transceiver := range pc.rtpTransceivers {
if transceiver.Sender.Track == nil { if transceiver.Sender.track == nil {
continue continue
} }
if track.ID == transceiver.Sender.Track.ID { if track.ID() == transceiver.Sender.track.ID() {
return nil, &rtcerr.InvalidAccessError{Err: ErrExistingTrack} return nil, &rtcerr.InvalidAccessError{Err: ErrExistingTrack}
} }
} }
@@ -1136,9 +1147,9 @@ func (pc *PeerConnection) AddTrack(track *Track) (*RTPSender, error) {
for _, t := range pc.rtpTransceivers { for _, t := range pc.rtpTransceivers {
if !t.stopped && if !t.stopped &&
// t.Sender == nil && // TODO: check that the sender has never sent // t.Sender == nil && // TODO: check that the sender has never sent
t.Sender.Track == nil && t.Sender.track == nil &&
t.Receiver.Track != nil && t.Receiver.Track() != nil &&
t.Receiver.Track.Kind == track.Kind { t.Receiver.Track().Kind() == track.Kind() {
transceiver = t transceiver = t
break break
} }
@@ -1148,7 +1159,10 @@ func (pc *PeerConnection) AddTrack(track *Track) (*RTPSender, error) {
return nil, err return nil, err
} }
} else { } else {
sender := pc.api.NewRTPSender(track, pc.dtlsTransport) sender, err := pc.api.NewRTPSender(track, pc.dtlsTransport)
if err != nil {
return nil, err
}
transceiver = pc.newRTPTransceiver( transceiver = pc.newRTPTransceiver(
nil, nil,
sender, sender,
@@ -1156,7 +1170,7 @@ func (pc *PeerConnection) AddTrack(track *Track) (*RTPSender, error) {
) )
} }
transceiver.Mid = track.Kind.String() // TODO: Mid generation transceiver.Mid = track.Kind().String() // TODO: Mid generation
return transceiver.Sender, nil return transceiver.Sender, nil
} }
@@ -1328,16 +1342,16 @@ func (pc *PeerConnection) Close() error {
// Conn if one of the endpoints is closed down. To // Conn if one of the endpoints is closed down. To
// continue the chain the Mux has to be closed. // continue the chain the Mux has to be closed.
if err := pc.dtlsTransport.Stop(); err != nil {
closeErrs = append(closeErrs, err)
}
for _, t := range pc.rtpTransceivers { for _, t := range pc.rtpTransceivers {
if err := t.Stop(); err != nil { if err := t.Stop(); err != nil {
closeErrs = append(closeErrs, err) closeErrs = append(closeErrs, err)
} }
} }
if err := pc.dtlsTransport.Stop(); err != nil {
closeErrs = append(closeErrs, err)
}
if pc.sctpTransport != nil { if pc.sctpTransport != nil {
if err := pc.sctpTransport.Stop(); err != nil { if err := pc.sctpTransport.Stop(); err != nil {
closeErrs = append(closeErrs, err) closeErrs = append(closeErrs, err)
@@ -1421,13 +1435,13 @@ func (pc *PeerConnection) addRTPMediaSection(d *sdp.SessionDescription, codecTyp
weSend := false weSend := false
for _, transceiver := range pc.rtpTransceivers { for _, transceiver := range pc.rtpTransceivers {
if transceiver.Sender == nil || if transceiver.Sender == nil ||
transceiver.Sender.Track == nil || transceiver.Sender.track == nil ||
transceiver.Sender.Track.Kind != codecType { transceiver.Sender.track.Kind() != codecType {
continue continue
} }
weSend = true weSend = true
track := transceiver.Sender.Track track := transceiver.Sender.track
media = media.WithMediaSource(track.SSRC, track.Label /* cname */, track.Label /* streamLabel */, track.Label) media = media.WithMediaSource(track.SSRC(), track.Label() /* cname */, track.Label() /* streamLabel */, track.Label())
} }
media = media.WithPropertyAttribute(localDirection(weSend, peerDirection).String()) media = media.WithPropertyAttribute(localDirection(weSend, peerDirection).String())
@@ -1479,10 +1493,8 @@ func (pc *PeerConnection) addDataMediaSection(d *sdp.SessionDescription, midValu
d.WithMedia(media) d.WithMedia(media)
} }
// NewRawRTPTrack Creates a new Track // NewTrack Creates a new Track
// func (pc *PeerConnection) NewTrack(payloadType uint8, ssrc uint32, id, label string) (*Track, error) {
// See NewSampleTrack for documentation
func (pc *PeerConnection) NewRawRTPTrack(payloadType uint8, ssrc uint32, id, label string) (*Track, error) {
codec, err := pc.api.mediaEngine.getCodec(payloadType) codec, err := pc.api.mediaEngine.getCodec(payloadType)
if err != nil { if err != nil {
return nil, err return nil, err
@@ -1490,28 +1502,7 @@ func (pc *PeerConnection) NewRawRTPTrack(payloadType uint8, ssrc uint32, id, lab
return nil, errors.New("codec payloader not set") return nil, errors.New("codec payloader not set")
} }
return NewRawRTPTrack(payloadType, ssrc, id, label, codec) return NewTrack(payloadType, ssrc, id, label, codec)
}
// NewSampleTrack Creates a new Track
//
// See NewSampleTrack for documentation
func (pc *PeerConnection) NewSampleTrack(payloadType uint8, id, label string) (*Track, error) {
codec, err := pc.api.mediaEngine.getCodec(payloadType)
if err != nil {
return nil, err
} else if codec.Payloader == nil {
return nil, errors.New("codec payloader not set")
}
return NewSampleTrack(payloadType, id, label, codec)
}
// NewTrack is used to create a new Track
//
// Deprecated: Use NewSampleTrack() instead
func (pc *PeerConnection) NewTrack(payloadType uint8, id, label string) (*Track, error) {
return pc.NewSampleTrack(payloadType, id, label)
} }
func (pc *PeerConnection) newRTPTransceiver( func (pc *PeerConnection) newRTPTransceiver(

View File

@@ -2,6 +2,8 @@ package webrtc
import ( import (
"bytes" "bytes"
"fmt"
"math/rand"
"sync" "sync"
"testing" "testing"
"time" "time"
@@ -32,14 +34,14 @@ func TestPeerConnection_Media_Sample(t *testing.T) {
awaitRTCPSenderRecv := make(chan bool) awaitRTCPSenderRecv := make(chan bool)
awaitRTCPSenderSend := make(chan error) awaitRTCPSenderSend := make(chan error)
awaitRTCPRecieverRecv := make(chan bool) awaitRTCPRecieverRecv := make(chan error)
awaitRTCPRecieverSend := make(chan error) awaitRTCPRecieverSend := make(chan error)
pcAnswer.OnTrack(func(track *Track) { pcAnswer.OnTrack(func(track *Track, receiver *RTPReceiver) {
go func() { go func() {
for { for {
time.Sleep(time.Millisecond * 100) time.Sleep(time.Millisecond * 100)
if routineErr := pcAnswer.SendRTCP(&rtcp.RapidResynchronizationRequest{SenderSSRC: track.SSRC, MediaSSRC: track.SSRC}); routineErr != nil { if routineErr := pcAnswer.SendRTCP(&rtcp.RapidResynchronizationRequest{SenderSSRC: track.SSRC(), MediaSSRC: track.SSRC()}); routineErr != nil {
awaitRTCPRecieverSend <- routineErr awaitRTCPRecieverSend <- routineErr
return return
} }
@@ -54,14 +56,18 @@ func TestPeerConnection_Media_Sample(t *testing.T) {
}() }()
go func() { go func() {
<-track.RTCPPackets _, routineErr := receiver.Read(make([]byte, 1400))
close(awaitRTCPRecieverRecv) if routineErr != nil {
awaitRTCPRecieverRecv <- routineErr
} else {
close(awaitRTCPRecieverRecv)
}
}() }()
haveClosedAwaitRTPRecv := false haveClosedAwaitRTPRecv := false
for { for {
p, ok := <-track.Packets p, routineErr := track.ReadRTP()
if !ok { if routineErr != nil {
close(awaitRTPRecvClosed) close(awaitRTPRecvClosed)
return return
} else if bytes.Equal(p.Payload, []byte{0x10, 0x00}) && !haveClosedAwaitRTPRecv { } else if bytes.Equal(p.Payload, []byte{0x10, 0x00}) && !haveClosedAwaitRTPRecv {
@@ -71,18 +77,21 @@ func TestPeerConnection_Media_Sample(t *testing.T) {
} }
}) })
vp8Track, err := pcOffer.NewSampleTrack(DefaultPayloadTypeVP8, "video", "pion") vp8Track, err := pcOffer.NewTrack(DefaultPayloadTypeVP8, rand.Uint32(), "video", "pion")
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
if _, err = pcOffer.AddTrack(vp8Track); err != nil { rtpReceiver, err := pcOffer.AddTrack(vp8Track)
if err != nil {
t.Fatal(err) t.Fatal(err)
} }
go func() { go func() {
for { for {
time.Sleep(time.Millisecond * 100) time.Sleep(time.Millisecond * 100)
vp8Track.Samples <- media.Sample{Data: []byte{0x00}, Samples: 1} if routineErr := vp8Track.WriteSample(media.Sample{Data: []byte{0x00}, Samples: 1}); routineErr != nil {
fmt.Println(routineErr)
}
select { select {
case <-awaitRTPRecv: case <-awaitRTPRecv:
@@ -96,7 +105,7 @@ func TestPeerConnection_Media_Sample(t *testing.T) {
go func() { go func() {
for { for {
time.Sleep(time.Millisecond * 100) time.Sleep(time.Millisecond * 100)
if routineErr := pcOffer.SendRTCP(&rtcp.PictureLossIndication{SenderSSRC: vp8Track.SSRC, MediaSSRC: vp8Track.SSRC}); routineErr != nil { if routineErr := pcOffer.SendRTCP(&rtcp.PictureLossIndication{SenderSSRC: vp8Track.SSRC(), MediaSSRC: vp8Track.SSRC()}); routineErr != nil {
awaitRTCPSenderSend <- routineErr awaitRTCPSenderSend <- routineErr
} }
@@ -110,8 +119,9 @@ func TestPeerConnection_Media_Sample(t *testing.T) {
}() }()
go func() { go func() {
<-vp8Track.RTCPPackets if _, routineErr := rtpReceiver.Read(make([]byte, 1400)); routineErr == nil {
close(awaitRTCPSenderRecv) close(awaitRTCPSenderRecv)
}
}() }()
err = signalPair(pcOffer, pcAnswer) err = signalPair(pcOffer, pcAnswer)
@@ -158,38 +168,41 @@ This test adds an input track and asserts
func TestPeerConnection_Media_Shutdown(t *testing.T) { func TestPeerConnection_Media_Shutdown(t *testing.T) {
iceComplete := make(chan bool) iceComplete := make(chan bool)
api := NewAPI()
lim := test.TimeOut(time.Second * 30) lim := test.TimeOut(time.Second * 30)
defer lim.Stop() defer lim.Stop()
report := test.CheckRoutines(t) report := test.CheckRoutines(t)
defer report() defer report()
api.mediaEngine.RegisterDefaultCodecs() pcOffer, err := NewPeerConnection(Configuration{})
pcOffer, pcAnswer, err := api.newPair()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
opusTrack, err := pcOffer.NewSampleTrack(DefaultPayloadTypeOpus, "audio", "pion1") pcAnswer, err := NewPeerConnection(Configuration{})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
vp8Track, err := pcOffer.NewSampleTrack(DefaultPayloadTypeVP8, "video", "pion2")
opusTrack, err := pcOffer.NewTrack(DefaultPayloadTypeOpus, rand.Uint32(), "audio", "pion1")
if err != nil {
t.Fatal(err)
}
vp8Track, err := pcOffer.NewTrack(DefaultPayloadTypeVP8, rand.Uint32(), "video", "pion2")
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
if _, err = pcOffer.AddTrack(opusTrack); err != nil { if _, err = pcOffer.AddTrack(opusTrack); err != nil {
t.Fatal(err) t.Fatal(err)
} else if _, err = pcOffer.AddTrack(vp8Track); err != nil { } else if _, err = pcAnswer.AddTrack(vp8Track); err != nil {
t.Fatal(err) t.Fatal(err)
} }
var onTrackFiredLock sync.RWMutex var onTrackFiredLock sync.RWMutex
onTrackFired := false onTrackFired := false
pcAnswer.OnTrack(func(track *Track) { pcAnswer.OnTrack(func(track *Track, receiver *RTPReceiver) {
onTrackFiredLock.Lock() onTrackFiredLock.Lock()
defer onTrackFiredLock.Unlock() defer onTrackFiredLock.Unlock()
onTrackFired = true onTrackFired = true
@@ -208,9 +221,26 @@ func TestPeerConnection_Media_Shutdown(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
<-iceComplete <-iceComplete
// Each PeerConnection should have one sender, one receiver and two transceivers
for _, pc := range []*PeerConnection{pcOffer, pcAnswer} {
senders := pc.GetSenders()
if len(senders) != 1 {
t.Errorf("Each PeerConnection should have one RTPSender, we have %d", len(senders))
}
receivers := pc.GetReceivers()
if len(receivers) != 1 {
t.Errorf("Each PeerConnection should have one RTPReceiver, we have %d", len(receivers))
}
transceivers := pc.GetTransceivers()
if len(transceivers) != 2 {
t.Errorf("Each PeerConnection should have two RTPTransceivers, we have %d", len(transceivers))
}
}
err = pcOffer.Close() err = pcOffer.Close()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@@ -226,5 +256,4 @@ func TestPeerConnection_Media_Shutdown(t *testing.T) {
t.Fatalf("PeerConnection OnTrack fired even though we got no packets") t.Fatalf("PeerConnection OnTrack fired even though we got no packets")
} }
onTrackFiredLock.Unlock() onTrackFiredLock.Unlock()
} }

View File

@@ -10,9 +10,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/pions/rtp"
"github.com/pions/webrtc/pkg/ice" "github.com/pions/webrtc/pkg/ice"
"github.com/pions/webrtc/pkg/media"
"github.com/pions/webrtc/pkg/rtcerr" "github.com/pions/webrtc/pkg/rtcerr"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
@@ -431,55 +429,6 @@ func TestCreateOfferAnswer(t *testing.T) {
} }
} }
func TestPeerConnection_NewRawRTPTrack(t *testing.T) {
api := NewAPI()
api.mediaEngine.RegisterDefaultCodecs()
pc, err := api.NewPeerConnection(Configuration{})
assert.Nil(t, err)
_, err = pc.NewRawRTPTrack(DefaultPayloadTypeH264, 0, "trackId", "trackLabel")
assert.NotNil(t, err)
track, err := pc.NewRawRTPTrack(DefaultPayloadTypeH264, 123456, "trackId", "trackLabel")
assert.Nil(t, err)
_, err = pc.AddTrack(track)
assert.Nil(t, err)
// This channel should not be set up for a RawRTP track
assert.Panics(t, func() {
track.Samples <- media.Sample{}
})
assert.NotPanics(t, func() {
track.RawRTP <- &rtp.Packet{}
})
}
func TestPeerConnection_NewSampleTrack(t *testing.T) {
api := NewAPI()
api.mediaEngine.RegisterDefaultCodecs()
pc, err := api.NewPeerConnection(Configuration{})
assert.Nil(t, err)
track, err := pc.NewSampleTrack(DefaultPayloadTypeH264, "trackId", "trackLabel")
assert.Nil(t, err)
_, err = pc.AddTrack(track)
assert.Nil(t, err)
// This channel should not be set up for a Sample track
assert.Panics(t, func() {
track.RawRTP <- &rtp.Packet{}
})
assert.NotPanics(t, func() {
track.Samples <- media.Sample{}
})
}
func TestPeerConnection_EventHandlers(t *testing.T) { func TestPeerConnection_EventHandlers(t *testing.T) {
api := NewAPI() api := NewAPI()
pc, err := api.NewPeerConnection(Configuration{}) pc, err := api.NewPeerConnection(Configuration{})
@@ -490,10 +439,10 @@ func TestPeerConnection_EventHandlers(t *testing.T) {
onDataChannelCalled := make(chan bool) onDataChannelCalled := make(chan bool)
// Verify that the noop case works // Verify that the noop case works
assert.NotPanics(t, func() { pc.onTrack(nil) }) assert.NotPanics(t, func() { pc.onTrack(nil, nil) })
assert.NotPanics(t, func() { pc.onICEConnectionStateChange(ice.ConnectionStateNew) }) assert.NotPanics(t, func() { pc.onICEConnectionStateChange(ice.ConnectionStateNew) })
pc.OnTrack(func(t *Track) { pc.OnTrack(func(t *Track, r *RTPReceiver) {
onTrackCalled <- true onTrackCalled <- true
}) })
@@ -506,11 +455,11 @@ func TestPeerConnection_EventHandlers(t *testing.T) {
}) })
// Verify that the handlers deal with nil inputs // Verify that the handlers deal with nil inputs
assert.NotPanics(t, func() { pc.onTrack(nil) }) assert.NotPanics(t, func() { pc.onTrack(nil, nil) })
assert.NotPanics(t, func() { go pc.onDataChannelHandler(nil) }) assert.NotPanics(t, func() { go pc.onDataChannelHandler(nil) })
// Verify that the set handlers are called // Verify that the set handlers are called
assert.NotPanics(t, func() { pc.onTrack(&Track{}) }) assert.NotPanics(t, func() { pc.onTrack(&Track{}, &RTPReceiver{}) })
assert.NotPanics(t, func() { pc.onICEConnectionStateChange(ice.ConnectionStateNew) }) assert.NotPanics(t, func() { pc.onICEConnectionStateChange(ice.ConnectionStateNew) })
assert.NotPanics(t, func() { go pc.onDataChannelHandler(&DataChannel{api: api}) }) assert.NotPanics(t, func() { go pc.onDataChannelHandler(&DataChannel{api: api}) })

51
pkg/ice/ice_test.go Normal file
View File

@@ -0,0 +1,51 @@
package ice
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestConnectedState_String(t *testing.T) {
testCases := []struct {
connectionState ConnectionState
expectedString string
}{
{ConnectionState(Unknown), "Invalid"},
{ConnectionStateNew, "New"},
{ConnectionStateChecking, "Checking"},
{ConnectionStateConnected, "Connected"},
{ConnectionStateCompleted, "Completed"},
{ConnectionStateFailed, "Failed"},
{ConnectionStateDisconnected, "Disconnected"},
{ConnectionStateClosed, "Closed"},
}
for i, testCase := range testCases {
assert.Equal(t,
testCase.expectedString,
testCase.connectionState.String(),
"testCase: %d %v", i, testCase,
)
}
}
func TestGatheringState_String(t *testing.T) {
testCases := []struct {
gatheringState GatheringState
expectedString string
}{
{GatheringState(Unknown), ErrUnknownType.Error()},
{GatheringStateNew, "new"},
{GatheringStateGathering, "gathering"},
{GatheringStateComplete, "complete"},
}
for i, testCase := range testCases {
assert.Equal(t,
testCase.expectedString,
testCase.gatheringState.String(),
"testCase: %d %v", i, testCase,
)
}
}

View File

@@ -5,8 +5,6 @@ import (
"sync" "sync"
"github.com/pions/rtcp" "github.com/pions/rtcp"
"github.com/pions/rtp"
"github.com/pions/srtp"
) )
// RTPReceiver allows an application to inspect the receipt of a Track // RTPReceiver allows an application to inspect the receipt of a Track
@@ -14,149 +12,101 @@ type RTPReceiver struct {
kind RTPCodecType kind RTPCodecType
transport *DTLSTransport transport *DTLSTransport
hasRecv chan bool track *Track
Track *Track closed, received chan interface{}
mu sync.RWMutex
closed bool rtpReadStream, rtcpReadStream *lossyReadCloser
mu sync.Mutex
rtpOut chan *rtp.Packet
rtpReadStream *srtp.ReadStreamSRTP
rtpOutDone chan struct{}
rtcpOut chan rtcp.Packet
rtcpReadStream *srtp.ReadStreamSRTCP
rtcpOutDone chan struct{}
// A reference to the associated api object // A reference to the associated api object
api *API api *API
} }
// NewRTPReceiver constructs a new RTPReceiver // NewRTPReceiver constructs a new RTPReceiver
func (api *API) NewRTPReceiver(kind RTPCodecType, transport *DTLSTransport) *RTPReceiver { func (api *API) NewRTPReceiver(kind RTPCodecType, transport *DTLSTransport) (*RTPReceiver, error) {
if transport == nil {
return nil, fmt.Errorf("DTLSTransport must not be nil")
}
return &RTPReceiver{ return &RTPReceiver{
kind: kind, kind: kind,
transport: transport, transport: transport,
api: api,
closed: make(chan interface{}),
received: make(chan interface{}),
}, nil
}
rtpOut: make(chan *rtp.Packet, 15), // Track returns the RTCRtpTransceiver track
rtpOutDone: make(chan struct{}), func (r *RTPReceiver) Track() *Track {
r.mu.RLock()
defer r.mu.RUnlock()
return r.track
}
rtcpOut: make(chan rtcp.Packet, 15), // Receive initialize the track and starts all the transports
rtcpOutDone: make(chan struct{}), func (r *RTPReceiver) Receive(parameters RTPReceiveParameters) error {
r.mu.Lock()
defer r.mu.Unlock()
select {
case <-r.received:
return fmt.Errorf("Receive has already been called")
default:
}
close(r.received)
hasRecv: make(chan bool), r.track = &Track{
kind: r.kind,
ssrc: parameters.encodings.SSRC,
receiver: r,
}
api: api, srtpSession, err := r.transport.getSRTPSession()
if err != nil {
return err
}
srtpReadStream, err := srtpSession.OpenReadStream(parameters.encodings.SSRC)
if err != nil {
return err
}
srtcpSession, err := r.transport.getSRTCPSession()
if err != nil {
return err
}
srtcpReadStream, err := srtcpSession.OpenReadStream(parameters.encodings.SSRC)
if err != nil {
return err
}
r.rtpReadStream = newLossyReadCloser(srtpReadStream)
r.rtcpReadStream = newLossyReadCloser(srtcpReadStream)
return nil
}
// Read reads incoming RTCP for this RTPReceiver
func (r *RTPReceiver) Read(b []byte) (n int, err error) {
select {
case <-r.closed:
return 0, fmt.Errorf("RTPSender has been stopped")
case <-r.received:
return r.rtcpReadStream.Read(b)
} }
} }
// Receive blocks until the Track is available // ReadRTCP is a convenience method that wraps Read and unmarshals for you
func (r *RTPReceiver) Receive(parameters RTPReceiveParameters) chan bool { func (r *RTPReceiver) ReadRTCP() (rtcp.Packet, error) {
// TODO atomic only allow this to fire once b := make([]byte, receiveMTU)
r.Track = &Track{ i, err := r.Read(b)
Kind: r.kind, if err != nil {
SSRC: parameters.encodings.SSRC, return nil, err
Packets: r.rtpOut,
RTCPPackets: r.rtcpOut,
} }
// RTP ReadLoop pkt, _, err := rtcp.Unmarshal(b[:i])
go func() { return pkt, err
payloadSet := false
defer func() {
if !payloadSet {
close(r.hasRecv)
}
close(r.rtpOut)
close(r.rtpOutDone)
}()
srtpSession, err := r.transport.getSRTPSession()
if err != nil {
pcLog.Warnf("Failed to open SRTPSession, Track done for: %v %d \n", err, parameters.encodings.SSRC)
return
}
readStream, err := srtpSession.OpenReadStream(parameters.encodings.SSRC)
if err != nil {
pcLog.Warnf("Failed to open RTCP ReadStream, Track done for: %v %d \n", err, parameters.encodings.SSRC)
return
}
r.mu.Lock()
r.rtpReadStream = readStream
r.mu.Unlock()
readBuf := make([]byte, receiveMTU)
for {
rtpLen, err := readStream.Read(readBuf)
if err != nil {
pcLog.Warnf("Failed to read, Track done for: %v %d \n", err, parameters.encodings.SSRC)
return
}
var rtpPacket rtp.Packet
if err = rtpPacket.Unmarshal(append([]byte{}, readBuf[:rtpLen]...)); err != nil {
pcLog.Warnf("Failed to unmarshal RTP packet, discarding: %v \n", err)
continue
}
if !payloadSet {
r.Track.PayloadType = rtpPacket.PayloadType
payloadSet = true
close(r.hasRecv)
}
select {
case r.rtpOut <- &rtpPacket:
default:
}
}
}()
// RTCP ReadLoop
go func() {
defer func() {
close(r.rtcpOut)
close(r.rtcpOutDone)
}()
srtcpSession, err := r.transport.getSRTCPSession()
if err != nil {
pcLog.Warnf("Failed to open SRTCPSession, Track done for: %v %d \n", err, parameters.encodings.SSRC)
return
}
readStream, err := srtcpSession.OpenReadStream(parameters.encodings.SSRC)
if err != nil {
pcLog.Warnf("Failed to open RTCP ReadStream, Track done for: %v %d \n", err, parameters.encodings.SSRC)
return
}
r.mu.Lock()
r.rtcpReadStream = readStream
r.mu.Unlock()
readBuf := make([]byte, receiveMTU)
for {
rtcpLen, err := readStream.Read(readBuf)
if err != nil {
pcLog.Warnf("Failed to read, Track done for: %v %d \n", err, parameters.encodings.SSRC)
return
}
rtcpPacket, _, err := rtcp.Unmarshal(append([]byte{}, readBuf[:rtcpLen]...))
if err != nil {
pcLog.Warnf("Failed to unmarshal RTCP packet, discarding: %v \n", err)
continue
}
select {
case r.rtcpOut <- rtcpPacket:
default:
}
}
}()
return r.hasRecv
} }
// Stop irreversibly stops the RTPReceiver // Stop irreversibly stops the RTPReceiver
@@ -164,26 +114,33 @@ func (r *RTPReceiver) Stop() error {
r.mu.Lock() r.mu.Lock()
defer r.mu.Unlock() defer r.mu.Unlock()
if r.closed { select {
return fmt.Errorf("RTPReceiver has already been closed") case <-r.closed:
return nil
default:
} }
select { select {
case <-r.hasRecv: case <-r.received:
if err := r.rtcpReadStream.Close(); err != nil {
return err
}
if err := r.rtpReadStream.Close(); err != nil {
return err
}
default: default:
return fmt.Errorf("RTPReceiver has not been started")
} }
if err := r.rtcpReadStream.Close(); err != nil { close(r.closed)
return err
}
if err := r.rtpReadStream.Close(); err != nil {
return err
}
<-r.rtcpOutDone
<-r.rtpOutDone
r.closed = true
return nil return nil
} }
// readRTP should only be called by a track, this only exists so we can keep state in one place
func (r *RTPReceiver) readRTP(b []byte) (n int, err error) {
select {
case <-r.closed:
return 0, fmt.Errorf("RTPSender has been stopped")
case <-r.received:
return r.rtpReadStream.Read(b)
}
}

View File

@@ -1,154 +1,146 @@
package webrtc package webrtc
import ( import (
"github.com/pions/rtcp" "fmt"
"github.com/pions/rtp" "sync"
"github.com/pions/webrtc/pkg/media"
)
const rtpOutboundMTU = 1400 "github.com/pions/rtcp"
)
// RTPSender allows an application to control how a given Track is encoded and transmitted to a remote peer // RTPSender allows an application to control how a given Track is encoded and transmitted to a remote peer
type RTPSender struct { type RTPSender struct {
Track *Track track *Track
rtcpReadStream *lossyReadCloser
transport *DTLSTransport transport *DTLSTransport
// A reference to the associated api object // A reference to the associated api object
api *API api *API
mu sync.RWMutex
sendCalled, stopCalled chan interface{}
} }
// NewRTPSender constructs a new RTPSender // NewRTPSender constructs a new RTPSender
func (api *API) NewRTPSender(track *Track, transport *DTLSTransport) *RTPSender { func (api *API) NewRTPSender(track *Track, transport *DTLSTransport) (*RTPSender, error) {
r := &RTPSender{ if track == nil {
Track: track, return nil, fmt.Errorf("Track must not be nil")
transport: transport, } else if transport == nil {
api: api, return nil, fmt.Errorf("DTLSTransport must not be nil")
} }
r.Track.sampleInput = make(chan media.Sample, 15) // Is the buffering needed? track.mu.RLock()
r.Track.rawInput = make(chan *rtp.Packet, 15) // Is the buffering needed? defer track.mu.RUnlock()
r.Track.rtcpInput = make(chan rtcp.Packet, 15) // Is the buffering needed? if track.receiver != nil {
return nil, fmt.Errorf("RTPSender can not be constructed with remote track")
r.Track.Samples = r.Track.sampleInput
r.Track.RawRTP = r.Track.rawInput
r.Track.RTCPPackets = r.Track.rtcpInput
if r.Track.isRawRTP {
close(r.Track.Samples)
} else {
close(r.Track.RawRTP)
} }
return r return &RTPSender{
track: track,
transport: transport,
api: api,
sendCalled: make(chan interface{}),
stopCalled: make(chan interface{}),
}, nil
} }
// Send Attempts to set the parameters controlling the sending of media. // Send Attempts to set the parameters controlling the sending of media.
func (r *RTPSender) Send(parameters RTPSendParameters) { func (r *RTPSender) Send(parameters RTPSendParameters) error {
if r.Track.isRawRTP { r.mu.Lock()
go r.handleRawRTP(r.Track.rawInput) defer r.mu.Unlock()
} else { select {
go r.handleSampleRTP(r.Track.sampleInput) case <-r.sendCalled:
return fmt.Errorf("Send has already been called")
default:
} }
go r.handleRTCP(r.transport, r.Track.rtcpInput) srtcpSession, err := r.transport.getSRTCPSession()
if err != nil {
return err
}
srtcpReadStream, err := srtcpSession.OpenReadStream(parameters.encodings.SSRC)
if err != nil {
return err
}
r.rtcpReadStream = newLossyReadCloser(srtcpReadStream)
r.track.mu.Lock()
r.track.senders = append(r.track.senders, r)
r.track.mu.Unlock()
close(r.sendCalled)
return nil
} }
// Stop irreversibly stops the RTPSender // Stop irreversibly stops the RTPSender
func (r *RTPSender) Stop() { func (r *RTPSender) Stop() error {
if r.Track.isRawRTP { r.mu.Lock()
close(r.Track.RawRTP) defer r.mu.Unlock()
} else {
close(r.Track.Samples) select {
case <-r.stopCalled:
return nil
default:
} }
// TODO properly tear down all loops (and test that) r.track.mu.Lock()
defer r.track.mu.Unlock()
filtered := []*RTPSender{}
for _, s := range r.track.senders {
if s != r {
filtered = append(filtered, s)
}
}
r.track.senders = filtered
select {
case <-r.sendCalled:
return r.rtcpReadStream.Close()
default:
}
close(r.stopCalled)
return nil
} }
func (r *RTPSender) handleRawRTP(rtpPackets chan *rtp.Packet) { // Read reads incoming RTCP for this RTPReceiver
for { func (r *RTPSender) Read(b []byte) (n int, err error) {
p, ok := <-rtpPackets select {
if !ok { case <-r.stopCalled:
return return 0, fmt.Errorf("RTPSender has been stopped")
} case <-r.sendCalled:
return r.rtcpReadStream.Read(b)
r.sendRTP(p)
} }
} }
func (r *RTPSender) handleSampleRTP(rtpPackets chan media.Sample) { // ReadRTCP is a convenience method that wraps Read and unmarshals for you
packetizer := rtp.NewPacketizer( func (r *RTPSender) ReadRTCP() (rtcp.Packet, error) {
rtpOutboundMTU, b := make([]byte, receiveMTU)
r.Track.PayloadType, i, err := r.Read(b)
r.Track.SSRC,
r.Track.Codec.Payloader,
rtp.NewRandomSequencer(),
r.Track.Codec.ClockRate,
)
for {
in, ok := <-rtpPackets
if !ok {
return
}
packets := packetizer.Packetize(in.Data, in.Samples)
for _, p := range packets {
r.sendRTP(p)
}
}
}
func (r *RTPSender) handleRTCP(transport *DTLSTransport, rtcpPackets chan rtcp.Packet) {
srtcpSession, err := transport.getSRTCPSession()
if err != nil { if err != nil {
pcLog.Warnf("Failed to open SRTCPSession, Track done for: %v %d \n", err, r.Track.SSRC) return nil, err
return
} }
readStream, err := srtcpSession.OpenReadStream(r.Track.SSRC) pkt, _, err := rtcp.Unmarshal(b[:i])
if err != nil { return pkt, err
pcLog.Warnf("Failed to open RTCP ReadStream, Track done for: %v %d \n", err, r.Track.SSRC) }
return
}
var rtcpPacket rtcp.Packet // sendRTP should only be called by a track, this only exists so we can keep state in one place
for { func (r *RTPSender) sendRTP(b []byte) (int, error) {
rtcpBuf := make([]byte, receiveMTU) select {
i, err := readStream.Read(rtcpBuf) case <-r.stopCalled:
return 0, fmt.Errorf("RTPSender has been stopped")
case <-r.sendCalled:
srtpSession, err := r.transport.getSRTPSession()
if err != nil { if err != nil {
pcLog.Warnf("Failed to read, Track done for: %v %d \n", err, r.Track.SSRC) return 0, err
return
} }
rtcpPacket, _, err = rtcp.Unmarshal(rtcpBuf[:i]) writeStream, err := srtpSession.OpenWriteStream()
if err != nil { if err != nil {
pcLog.Warnf("Failed to unmarshal RTCP packet, discarding: %v \n", err) return 0, err
continue
} }
select { return writeStream.Write(b)
case rtcpPackets <- rtcpPacket:
default:
}
}
}
func (r *RTPSender) sendRTP(packet *rtp.Packet) {
srtpSession, err := r.transport.getSRTPSession()
if err != nil {
pcLog.Warnf("SendRTP failed to open SrtpSession: %v", err)
return
}
writeStream, err := srtpSession.OpenWriteStream()
if err != nil {
pcLog.Warnf("SendRTP failed to open WriteStream: %v", err)
return
}
if _, err := writeStream.WriteRTP(&packet.Header, packet.Payload); err != nil {
pcLog.Warnf("SendRTP failed to write: %v", err)
} }
} }

View File

@@ -17,7 +17,7 @@ type RTPTransceiver struct {
} }
func (t *RTPTransceiver) setSendingTrack(track *Track) error { func (t *RTPTransceiver) setSendingTrack(track *Track) error {
t.Sender.Track = track t.Sender.track = track
switch t.Direction { switch t.Direction {
case RTPTransceiverDirectionRecvonly: case RTPTransceiverDirectionRecvonly:
@@ -33,7 +33,9 @@ func (t *RTPTransceiver) setSendingTrack(track *Track) error {
// Stop irreversibly stops the RTPTransceiver // Stop irreversibly stops the RTPTransceiver
func (t *RTPTransceiver) Stop() error { func (t *RTPTransceiver) Stop() error {
if t.Sender != nil { if t.Sender != nil {
t.Sender.Stop() if err := t.Sender.Stop(); err != nil {
return err
}
} }
if t.Receiver != nil { if t.Receiver != nil {
if err := t.Receiver.Stop(); err != nil { if err := t.Receiver.Stop(); err != nil {

214
track.go
View File

@@ -1,76 +1,186 @@
package webrtc package webrtc
import ( import (
"crypto/rand" "fmt"
"encoding/binary" "sync"
"github.com/pions/rtcp"
"github.com/pions/rtp" "github.com/pions/rtp"
"github.com/pions/webrtc/pkg/media" "github.com/pions/webrtc/pkg/media"
"github.com/pkg/errors"
) )
// Track represents a track that is communicated const rtpOutboundMTU = 1400
// Track represents a single media track
type Track struct { type Track struct {
isRawRTP bool mu sync.RWMutex
sampleInput chan media.Sample
rawInput chan *rtp.Packet
rtcpInput chan rtcp.Packet
ID string id string
PayloadType uint8 payloadType uint8
Kind RTPCodecType kind RTPCodecType
Label string label string
SSRC uint32 ssrc uint32
Codec *RTPCodec codec *RTPCodec
Packets <-chan *rtp.Packet packetizer rtp.Packetizer
RTCPPackets <-chan rtcp.Packet receiver *RTPReceiver
senders []*RTPSender
Samples chan<- media.Sample
RawRTP chan<- *rtp.Packet
} }
// NewRawRTPTrack initializes a new *Track configured to accept raw *rtp.Packet // ID gets the ID of the track
// func (t *Track) ID() string {
// NB: If the source RTP stream is being broadcast to multiple tracks, each track t.mu.RLock()
// must receive its own copies of the source packets in order to avoid packet corruption. defer t.mu.RUnlock()
func NewRawRTPTrack(payloadType uint8, ssrc uint32, id, label string, codec *RTPCodec) (*Track, error) { return t.id
}
// PayloadType gets the PayloadType of the track
func (t *Track) PayloadType() uint8 {
t.mu.RLock()
defer t.mu.RUnlock()
return t.payloadType
}
// Kind gets the Kind of the track
func (t *Track) Kind() RTPCodecType {
t.mu.RLock()
defer t.mu.RUnlock()
return t.kind
}
// Label gets the Label of the track
func (t *Track) Label() string {
t.mu.RLock()
defer t.mu.RUnlock()
return t.label
}
// SSRC gets the SSRC of the track
func (t *Track) SSRC() uint32 {
t.mu.RLock()
defer t.mu.RUnlock()
return t.ssrc
}
// Codec gets the Codec of the track
func (t *Track) Codec() *RTPCodec {
t.mu.RLock()
defer t.mu.RUnlock()
return t.codec
}
// Read reads data from the track. If this is a local track this will error
func (t *Track) Read(b []byte) (n int, err error) {
t.mu.RLock()
if len(t.senders) != 0 {
t.mu.RUnlock()
return 0, fmt.Errorf("this is a local track and must not be read from")
}
r := t.receiver
t.mu.RUnlock()
return r.readRTP(b)
}
// ReadRTP is a convenience method that wraps Read and unmarshals for you
func (t *Track) ReadRTP() (*rtp.Packet, error) {
b := make([]byte, receiveMTU)
i, err := t.Read(b)
if err != nil {
return nil, err
}
r := &rtp.Packet{}
if err := r.Unmarshal(b[:i]); err != nil {
return nil, err
}
return r, nil
}
// Write writes data to the track. If this is a remote track this will error
func (t *Track) Write(b []byte) (n int, err error) {
t.mu.RLock()
if t.receiver != nil {
t.mu.RUnlock()
return 0, fmt.Errorf("this is a remote track and must not be written to")
}
senders := t.senders
t.mu.RUnlock()
for _, s := range senders {
if _, err := s.sendRTP(b); err != nil {
return 0, err
}
}
return len(b), nil
}
// WriteSample packetizes and writes to the track
func (t *Track) WriteSample(s media.Sample) error {
packets := t.packetizer.Packetize(s.Data, s.Samples)
for _, p := range packets {
buf, err := p.Marshal()
if err != nil {
return err
}
if _, err := t.Write(buf); err != nil {
return err
}
}
return nil
}
// WriteRTP writes RTP packets to the track
func (t *Track) WriteRTP(p *rtp.Packet) error {
buf, err := p.Marshal()
if err != nil {
return err
}
if _, err := t.Write(buf); err != nil {
return err
}
return nil
}
// NewTrack initializes a new *Track
func NewTrack(payloadType uint8, ssrc uint32, id, label string, codec *RTPCodec) (*Track, error) {
if ssrc == 0 { if ssrc == 0 {
return nil, errors.New("SSRC supplied to NewRawRTPTrack() must be non-zero") return nil, fmt.Errorf("SSRC supplied to NewTrack() must be non-zero")
} }
return &Track{ packetizer := rtp.NewPacketizer(
isRawRTP: true, rtpOutboundMTU,
payloadType,
ssrc,
codec.Payloader,
rtp.NewRandomSequencer(),
codec.ClockRate,
)
ID: id, return &Track{
PayloadType: payloadType, id: id,
Kind: codec.Type, payloadType: payloadType,
Label: label, kind: codec.Type,
SSRC: ssrc, label: label,
Codec: codec, ssrc: ssrc,
codec: codec,
packetizer: packetizer,
}, nil }, nil
} }
// NewSampleTrack initializes a new *Track configured to accept media.Sample // determinePayloadType blocks and reads a single packet to determine the PayloadType for this Track
func NewSampleTrack(payloadType uint8, id, label string, codec *RTPCodec) (*Track, error) { // this is useful if we are dealing with a remote track and we can't announce it to the user until we know the payloadType
if codec == nil { func (t *Track) determinePayloadType() error {
return nil, errors.New("codec supplied to NewSampleTrack() must not be nil") r, err := t.ReadRTP()
if err != nil {
return err
} }
buf := make([]byte, 4) t.mu.Lock()
if _, err := rand.Read(buf); err != nil { t.payloadType = r.PayloadType
return nil, errors.New("failed to generate random value") defer t.mu.Unlock()
}
return &Track{ return nil
isRawRTP: false,
ID: id,
PayloadType: payloadType,
Kind: codec.Type,
Label: label,
SSRC: binary.LittleEndian.Uint32(buf),
Codec: codec,
}, nil
} }