Add opus sending support

This commit is contained in:
John Bradley
2018-07-03 18:00:45 -05:00
parent f7ae8e3d0a
commit 912a8e18f8
8 changed files with 80 additions and 26 deletions

View File

@@ -43,7 +43,6 @@ GstElement *gstreamer_recieve_create_pipeline(char *pipeline) {
}
void gstreamer_recieve_start_pipeline(GstElement *pipeline) {
GstElement *source, *demuxer, *decoder, *conv, *sink;
GstBus *bus;
guint bus_watch_id;

View File

@@ -2,8 +2,6 @@
#include <gst/app/gstappsrc.h>
#define PIPELINE "videotestsrc ! vp8enc ! appsink name=appsink"
static gboolean gstreamer_send_bus_call(GstBus *bus, GstMessage *msg, gpointer data) {
GMainLoop *loop = (GMainLoop *)data;
@@ -52,10 +50,10 @@ GstFlowReturn gstreamer_send_new_sample_handler(GstElement *object, gpointer use
return GST_FLOW_OK;
}
GstElement *gstreamer_send_create_pipeline() {
GstElement *gstreamer_send_create_pipeline(char *pipeline) {
gst_init(NULL, NULL);
GError *error = NULL;
return gst_parse_launch(PIPELINE, &error);
return gst_parse_launch(pipeline, &error);
}
void gstreamer_send_start_pipeline(GstElement *pipeline) {

View File

@@ -9,28 +9,46 @@ package gst
import "C"
import (
"fmt"
"github.com/pions/webrtc"
"unsafe"
)
// Pipeline is a wrapper for a GStreamer Pipeline
type Pipeline struct {
Pipeline *C.GstElement
in chan<- []byte
in chan<- webrtc.RTCSample
samples uint32
}
// This allows cgo to access pipeline, this will not work if you want multiple
var globalPipeline *Pipeline
// CreatePipeline creates a GStreamer Pipeline
func CreatePipeline(in chan<- []byte) *Pipeline {
func CreatePipeline(codec webrtc.TrackType, in chan<- webrtc.RTCSample) *Pipeline {
pipelineStr := "appsink name=appsink"
var samples uint32
switch codec {
case webrtc.VP8:
pipelineStr = "videotestsrc ! vp8enc ! " + pipelineStr
case webrtc.VP9:
pipelineStr = "videotestsrc ! vp9enc ! " + pipelineStr
case webrtc.Opus:
pipelineStr = "audiotestsrc ! opusenc ! " + pipelineStr
default:
panic("Unhandled codec " + codec.String())
}
pipelineStrUnsafe := C.CString(pipelineStr)
defer C.free(unsafe.Pointer(pipelineStrUnsafe))
globalPipeline = &Pipeline{
Pipeline: C.gstreamer_send_create_pipeline(),
Pipeline: C.gstreamer_send_create_pipeline(pipelineStrUnsafe),
in: in,
samples: samples,
}
return globalPipeline
}
// This allows cgo to access pipeline, this will not work if you want multiple
var globalPipeline *Pipeline
// Start starts the GStreamer Pipeline
func (p *Pipeline) Start() {
C.gstreamer_send_start_pipeline(p.Pipeline)
@@ -42,9 +60,9 @@ func (p *Pipeline) Stop() {
}
//export goHandlePipelineBuffer
func goHandlePipelineBuffer(buffer unsafe.Pointer, bufferLen C.int) {
func goHandlePipelineBuffer(buffer unsafe.Pointer, bufferLen C.int, samples C.int) {
if globalPipeline != nil {
globalPipeline.in <- C.GoBytes(buffer, bufferLen)
globalPipeline.in <- webrtc.RTCSample{C.GoBytes(buffer, bufferLen), samples}
} else {
fmt.Println("discarding buffer, globalPipeline not set")
}

View File

@@ -8,7 +8,7 @@
extern void goHandlePipelineBuffer(void *buffer, int bufferLen);
GstElement *gstreamer_send_create_pipeline();
GstElement *gstreamer_send_create_pipeline(char *pipeline);
void gstreamer_send_start_pipeline(GstElement *pipeline);
void gstreamer_send_stop_pipeline(GstElement *pipeline);

View File

@@ -31,7 +31,7 @@ func main() {
peerConnection := &webrtc.RTCPeerConnection{}
// Create a video track, and start pushing buffers
in, err := peerConnection.AddTrack(webrtc.VP8)
in, err := peerConnection.AddTrack(webrtc.Opus)
if err != nil {
panic(err)
}
@@ -56,6 +56,6 @@ func main() {
localDescriptionStr := peerConnection.LocalDescription.Marshal()
fmt.Println(base64.StdEncoding.EncodeToString([]byte(localDescriptionStr)))
gst.CreatePipeline(in).Start()
gst.CreatePipeline(webrtc.Opus, in).Start()
select {}
}

View File

@@ -0,0 +1,22 @@
package codecs
import "github.com/pions/webrtc/pkg/rtp"
// OpusPayloader payloads Opus packets
type OpusPayloader struct{}
// Payload fragments an Opus packet across one or more byte arrays
func (p *OpusPayloader) Payload(mtu int, payload []byte) [][]byte {
return [][]byte{payload}
}
// OpusPacket represents the VP8 header that is stored in the payload of an RTP Packet
type OpusPacket struct {
Payload []byte
}
// Unmarshal parses the passed byte slice and stores the result in the OpusPacket this method is called upon
func (p *OpusPacket) Unmarshal(packet *rtp.Packet) error {
p.Payload = packet.Payload
return nil
}

View File

@@ -7,7 +7,7 @@ type Payloader interface {
// Packetizer packetizes a payload
type Packetizer interface {
Packetize(payload []byte) []*Packet
Packetize(payload []byte, samples uint32) []*Packet
}
type packetizer struct {
@@ -17,10 +17,11 @@ type packetizer struct {
Payloader Payloader
Sequencer Sequencer
Timestamp uint32
ClockRate uint32
}
// NewPacketizer returns a new instance of a Packetizer for a specific payloader
func NewPacketizer(mtu int, pt uint8, ssrc uint32, payloader Payloader, sequencer Sequencer) Packetizer {
func NewPacketizer(mtu int, pt uint8, ssrc uint32, payloader Payloader, sequencer Sequencer, clockRate uint32) Packetizer {
return &packetizer{
mtu,
pt,
@@ -28,11 +29,12 @@ func NewPacketizer(mtu int, pt uint8, ssrc uint32, payloader Payloader, sequence
payloader,
sequencer,
0,
clockRate,
}
}
// Packetize packetizes the payload of an RTP packet and returns one or more RTP packets
func (p *packetizer) Packetize(payload []byte) []*Packet {
func (p *packetizer) Packetize(payload []byte, samples uint32) []*Packet {
payloads := p.Payloader.Payload(p.MTU-12, payload)
packets := make([]*Packet, len(payloads))
@@ -48,7 +50,7 @@ func (p *packetizer) Packetize(payload []byte) []*Packet {
SSRC: p.SSRC,
Payload: pp,
}
p.Timestamp++
p.Timestamp += samples
}
return packets

View File

@@ -21,6 +21,11 @@ func init() {
rand.Seed(time.Now().UTC().UnixNano())
}
type RTCSample struct {
Data []byte
Samples uint32
}
// TrackType determines the type of media we are sending receiving
type TrackType int
@@ -117,23 +122,33 @@ func (r *RTCPeerConnection) CreateOffer() error {
// AddTrack adds a new track to the RTCPeerConnection
// This function returns a channel to push buffers on, and an error if the channel can't be added
// Closing the channel ends this stream
func (r *RTCPeerConnection) AddTrack(mediaType TrackType) (buffers chan<- []byte, err error) {
if mediaType != VP8 {
func (r *RTCPeerConnection) AddTrack(mediaType TrackType, clockRate uint32) (samples chan<- RTCSample, err error) {
if mediaType != VP8 && mediaType != Opus {
panic("TODO Discarding packet, need media parsing")
}
trackInput := make(chan []byte, 15)
trackInput := make(chan RTCSample, 15)
go func() {
ssrc := rand.Uint32()
sdpTrack := &sdp.SessionBuilderTrack{SSRC: ssrc}
if mediaType == Opus {
var payloader rtp.Payloader
var payloadType uint8
switch mediaType {
case Opus:
sdpTrack.IsAudio = true
payloader = &codecs.OpusPayloader{}
payloadType = 111
case VP8:
payloader = &codecs.VP8Payloader{}
payloadType = 96
}
r.localTracks = append(r.localTracks, sdpTrack)
packetizer := rtp.NewPacketizer(1500, 96, ssrc, &codecs.VP8Payloader{}, rtp.NewRandomSequencer())
packetizer := rtp.NewPacketizer(1500, payloadType, ssrc, payloader, rtp.NewRandomSequencer(), clockRate)
for {
packets := packetizer.Packetize(<-trackInput)
in := <-trackInput
packets := packetizer.Packetize(in.Data, in.Samples)
for _, p := range packets {
for _, port := range r.ports {
port.Send(p)