Implement multi-pipeline gstreamer-send example

This commit is contained in:
Sean DuBois
2018-07-03 17:11:56 -07:00
parent 912a8e18f8
commit b1da546d24
5 changed files with 60 additions and 31 deletions

View File

@@ -14,7 +14,7 @@ go get github.com/pions/webrtc/examples/gstreamer-send
```
### Open gstreamer-send example page
[jsfiddle.net](http://jsfiddle.net/12kan4j5/) you should see two text-areas and a 'Start Session' button
[jsfiddle.net](http://jsfiddle.net/12kan4j5/4/) you should see two text-areas and a 'Start Session' button
### Run gstreamer-send with your browsers SessionDescription as stdin
In the jsfiddle the top textarea is your browser, copy that and:

View File

@@ -2,6 +2,17 @@
#include <gst/app/gstappsrc.h>
typedef struct SampleHandlerUserData {
int pipelineId;
} SampleHandlerUserData;
GMainLoop *main_loop = NULL;
void gstreamer_send_mainloop(void) {
main_loop = g_main_loop_new(NULL, FALSE);
g_main_loop_run(main_loop);
}
static gboolean gstreamer_send_bus_call(GstBus *bus, GstMessage *msg, gpointer data) {
GMainLoop *loop = (GMainLoop *)data;
@@ -36,13 +47,14 @@ GstFlowReturn gstreamer_send_new_sample_handler(GstElement *object, gpointer use
GstBuffer *buffer = NULL;
gpointer copy = NULL;
gsize copy_size = 0;
SampleHandlerUserData *s = (SampleHandlerUserData *)user_data;
g_signal_emit_by_name (object, "pull-sample", &sample);
if (sample) {
buffer = gst_sample_get_buffer(sample);
if (buffer) {
gst_buffer_extract_dup(buffer, 0, gst_buffer_get_size(buffer), &copy, &copy_size);
goHandlePipelineBuffer(copy, copy_size);
goHandlePipelineBuffer(copy, copy_size, 0, s->pipelineId);
}
gst_sample_unref (sample);
}
@@ -56,26 +68,21 @@ GstElement *gstreamer_send_create_pipeline(char *pipeline) {
return gst_parse_launch(pipeline, &error);
}
void gstreamer_send_start_pipeline(GstElement *pipeline) {
void gstreamer_send_start_pipeline(GstElement *pipeline, int pipelineId) {
GMainLoop *loop = g_main_loop_new(NULL, FALSE);
SampleHandlerUserData *s = calloc(1, sizeof(SampleHandlerUserData));
s->pipelineId = pipelineId;
GstBus *bus = gst_pipeline_get_bus(GST_PIPELINE(pipeline));
guint bus_watch_id = gst_bus_add_watch(bus, gstreamer_send_bus_call, loop);
gst_object_unref(bus);
GstElement *appsink = gst_bin_get_by_name(GST_BIN(pipeline), "appsink");
g_object_set(appsink, "emit-signals", TRUE, NULL);
g_signal_connect(appsink, "new-sample", G_CALLBACK(gstreamer_send_new_sample_handler), appsink);
g_signal_connect(appsink, "new-sample", G_CALLBACK(gstreamer_send_new_sample_handler), s);
gst_element_set_state(pipeline, GST_STATE_PLAYING);
g_main_loop_run(loop);
gst_element_set_state(pipeline, GST_STATE_NULL);
gst_object_unref(GST_OBJECT(pipeline));
g_source_remove(bus_watch_id);
g_main_loop_unref(loop);
}
void gstreamer_send_stop_pipeline(GstElement *pipeline) {

View File

@@ -9,21 +9,29 @@ package gst
import "C"
import (
"fmt"
"github.com/pions/webrtc"
"sync"
"unsafe"
"github.com/pions/webrtc"
)
func init() {
go C.gstreamer_send_mainloop()
}
// Pipeline is a wrapper for a GStreamer Pipeline
type Pipeline struct {
Pipeline *C.GstElement
in chan<- webrtc.RTCSample
samples uint32
id int
}
var pipelines = make(map[int]*Pipeline)
var pipelinesLock sync.Mutex
// CreatePipeline creates a GStreamer Pipeline
func CreatePipeline(codec webrtc.TrackType, in chan<- webrtc.RTCSample) *Pipeline {
pipelineStr := "appsink name=appsink"
var samples uint32
switch codec {
case webrtc.VP8:
pipelineStr = "videotestsrc ! vp8enc ! " + pipelineStr
@@ -37,21 +45,23 @@ func CreatePipeline(codec webrtc.TrackType, in chan<- webrtc.RTCSample) *Pipelin
pipelineStrUnsafe := C.CString(pipelineStr)
defer C.free(unsafe.Pointer(pipelineStrUnsafe))
globalPipeline = &Pipeline{
pipelinesLock.Lock()
defer pipelinesLock.Unlock()
pipeline := &Pipeline{
Pipeline: C.gstreamer_send_create_pipeline(pipelineStrUnsafe),
in: in,
samples: samples,
id: len(pipelines),
}
return globalPipeline
pipelines[pipeline.id] = pipeline
return pipeline
}
// This allows cgo to access pipeline, this will not work if you want multiple
var globalPipeline *Pipeline
// Start starts the GStreamer Pipeline
func (p *Pipeline) Start() {
C.gstreamer_send_start_pipeline(p.Pipeline)
C.gstreamer_send_start_pipeline(p.Pipeline, C.int(p.id))
}
// Stop stops the GStreamer Pipeline
@@ -60,11 +70,14 @@ func (p *Pipeline) Stop() {
}
//export goHandlePipelineBuffer
func goHandlePipelineBuffer(buffer unsafe.Pointer, bufferLen C.int, samples C.int) {
if globalPipeline != nil {
globalPipeline.in <- webrtc.RTCSample{C.GoBytes(buffer, bufferLen), samples}
func goHandlePipelineBuffer(buffer unsafe.Pointer, bufferLen C.int, samples C.int, pipelineId C.int) {
pipelinesLock.Lock()
defer pipelinesLock.Unlock()
if pipeline, ok := pipelines[int(pipelineId)]; ok {
pipeline.in <- webrtc.RTCSample{C.GoBytes(buffer, bufferLen), uint32(samples)}
} else {
fmt.Println("discarding buffer, globalPipeline not set")
fmt.Printf("discarding buffer, no pipeline with id %d", int(pipelineId))
}
C.free(buffer)
}

View File

@@ -6,10 +6,11 @@
#include <stdint.h>
#include <stdlib.h>
extern void goHandlePipelineBuffer(void *buffer, int bufferLen);
extern void goHandlePipelineBuffer(void *buffer, int bufferLen, int samples, int pipelineId);
GstElement *gstreamer_send_create_pipeline(char *pipeline);
void gstreamer_send_start_pipeline(GstElement *pipeline);
void gstreamer_send_start_pipeline(GstElement *pipeline, int pipelineId);
void gstreamer_send_stop_pipeline(GstElement *pipeline);
void gstreamer_send_mainloop(void);
#endif

View File

@@ -30,8 +30,14 @@ func main() {
// Create a new RTCPeerConnection
peerConnection := &webrtc.RTCPeerConnection{}
// Create a video track, and start pushing buffers
in, err := peerConnection.AddTrack(webrtc.Opus)
// Create a audio track
opusIn, err := peerConnection.AddTrack(webrtc.Opus, 48000)
if err != nil {
panic(err)
}
// Create a video track
vp8In, err := peerConnection.AddTrack(webrtc.VP8, 90000)
if err != nil {
panic(err)
}
@@ -56,6 +62,8 @@ func main() {
localDescriptionStr := peerConnection.LocalDescription.Marshal()
fmt.Println(base64.StdEncoding.EncodeToString([]byte(localDescriptionStr)))
gst.CreatePipeline(webrtc.Opus, in).Start()
// Start pushing buffers on these tracks
gst.CreatePipeline(webrtc.Opus, opusIn).Start()
gst.CreatePipeline(webrtc.VP8, vp8In).Start()
select {}
}