add callasync and clean up examples

This commit is contained in:
tinyzimmer
2020-10-05 11:13:50 +03:00
parent 121a651b7f
commit ebdff9159c
5 changed files with 93 additions and 10 deletions

View File

@@ -53,6 +53,7 @@ func createPipeline() (*gst.Pipeline, error) {
if sample == nil { if sample == nil {
return gst.FlowEOS return gst.FlowEOS
} }
defer sample.Unref()
// Retrieve the buffer from the sample // Retrieve the buffer from the sample
buffer := sample.GetBuffer() buffer := sample.GetBuffer()

View File

@@ -57,13 +57,20 @@ func createPipeline() (*gst.Pipeline, error) {
// Log and act accordingly // Log and act accordingly
fmt.Printf("Received custom event with count=%d send_eos=%v\n", customEvent.Count, customEvent.SendEOS) fmt.Printf("Received custom event with count=%d send_eos=%v\n", customEvent.Count, customEvent.SendEOS)
if customEvent.SendEOS { if customEvent.SendEOS {
// We need to use the CallAsync method to send the signal.
// This is becaues the SendEvent method blocks and this could cause a dead lock sending the
// event directly from the probe. This is the near equivalent of using go func() { ... }(),
// however displayed this way for demonstration purposes.
sink.CallAsync(func() {
fmt.Println("Send EOS is true, sending eos") fmt.Println("Send EOS is true, sending eos")
if !pipeline.SendEvent(gst.NewEOSEvent()) { if !pipeline.SendEvent(gst.NewEOSEvent()) {
fmt.Println("WARNING: Failed to send EOS to pipeline") fmt.Println("WARNING: Failed to send EOS to pipeline")
} }
} else { fmt.Println("Sent EOS")
fmt.Println("Send EOS is false ignoring") })
return gst.PadProbeRemove
} }
fmt.Println("Send EOS is false ignoring")
return gst.PadProbeOK return gst.PadProbeOK
}) })

View File

@@ -92,6 +92,9 @@ func encodeGif(mainLoop *gst.MainLoop) error {
appSink, _ := app.NewAppSink() appSink, _ := app.NewAppSink()
pipeline.Add(appSink.Element) pipeline.Add(appSink.Element)
jpegenc.Link(appSink.Element) jpegenc.Link(appSink.Element)
appSink.SyncStateWithParent()
appSink.SetWaitOnEOS(false)
// We can query the decodebin for the duration of the video it received. We can then // We can query the decodebin for the duration of the video it received. We can then
// use this value to calculate the total number of frames we expect to produce. // use this value to calculate the total number of frames we expect to produce.
@@ -113,21 +116,37 @@ func encodeGif(mainLoop *gst.MainLoop) error {
var frameNum int var frameNum int
appSink.SetCallbacks(&app.SinkCallbacks{ appSink.SetCallbacks(&app.SinkCallbacks{
NewSampleFunc: func(sink *app.Sink) gst.FlowReturn { NewSampleFunc: func(sink *app.Sink) gst.FlowReturn {
if appSink.IsEOS() {
return gst.FlowEOS
}
// Increment the frame number counter // Increment the frame number counter
frameNum++ frameNum++
if frameNum > totalFrames { if frameNum > totalFrames {
// If we've reached the total number of frames we are expecting. We can // If we've reached the total number of frames we are expecting. We can
// signal the main loop to quit. // signal the main loop to quit.
mainLoop.Quit() // This needs to be done from a goroutine to not block the app sink
// callback.
appSink.CallAsync(func() {
pipeline.SendEvent(gst.NewEOSEvent())
})
return gst.FlowEOS return gst.FlowEOS
} }
fmt.Printf("\033[2K\rProcessing image frame %d/%d", frameNum, totalFrames) // Pull the sample from the sink
sample := sink.PullSample()
if sample == nil {
return gst.FlowOK
}
defer sample.Unref()
fmt.Printf("\033[2K\r")
fmt.Printf("Processing image frame %d/%d\n", frameNum, totalFrames)
// We can retrieve a reader with the raw bytes of the image directly from the // We can retrieve a reader with the raw bytes of the image directly from the
// sink. // sink.
imgReader := sink.PullSample().GetBuffer().Reader() imgReader := sample.GetBuffer().Reader()
img, err := jpeg.Decode(imgReader) img, err := jpeg.Decode(imgReader)
if err != nil { if err != nil {
@@ -164,6 +183,8 @@ func encodeGif(mainLoop *gst.MainLoop) error {
var isError bool var isError bool
pipeline.GetPipelineBus().AddWatch(func(msg *gst.Message) bool { pipeline.GetPipelineBus().AddWatch(func(msg *gst.Message) bool {
switch msg.Type() { switch msg.Type() {
case gst.MessageEOS:
mainLoop.Quit()
case gst.MessageError: case gst.MessageError:
err := msg.ParseError() err := msg.ParseError()
fmt.Println("ERROR:", err.Error()) fmt.Println("ERROR:", err.Error())

View File

@@ -13,6 +13,13 @@ import (
gopointer "github.com/mattn/go-pointer" gopointer "github.com/mattn/go-pointer"
) )
//export goElementCallAsync
func goElementCallAsync(element *C.GstElement, userData C.gpointer) {
iface := gopointer.Restore(unsafe.Pointer(userData))
f := iface.(func())
f()
}
//export goPadStickyEventForEachFunc //export goPadStickyEventForEachFunc
func goPadStickyEventForEachFunc(gpad *C.GstPad, event **C.GstEvent, userData C.gpointer) C.gboolean { func goPadStickyEventForEachFunc(gpad *C.GstPad, event **C.GstEvent, userData C.gpointer) C.gboolean {
cbIface := gopointer.Restore(unsafe.Pointer(userData)) cbIface := gopointer.Restore(unsafe.Pointer(userData))

View File

@@ -1,6 +1,22 @@
package gst package gst
// #include "gst.go.h" /*
#include "gst.go.h"
extern void goGDestroyNotifyFuncNoRun (gpointer user_data);
extern void goElementCallAsync (GstElement * element, gpointer user_data);
void cgoElementAsyncDestroyNotify (gpointer user_data)
{
goGDestroyNotifyFuncNoRun(user_data);
}
void cgoElementCallAsync (GstElement * element, gpointer user_data)
{
goElementCallAsync(element, user_data);
}
*/
import "C" import "C"
import ( import (
@@ -8,6 +24,7 @@ import (
"unsafe" "unsafe"
"github.com/gotk3/gotk3/glib" "github.com/gotk3/gotk3/glib"
gopointer "github.com/mattn/go-pointer"
) )
// Element is a Go wrapper around a GstElement. // Element is a Go wrapper around a GstElement.
@@ -285,3 +302,33 @@ func (e *Element) Emit(signal string, args ...interface{}) (interface{}, error)
func (e *Element) SyncStateWithParent() bool { func (e *Element) SyncStateWithParent() bool {
return gobool(C.gst_element_sync_state_with_parent(e.Instance())) return gobool(C.gst_element_sync_state_with_parent(e.Instance()))
} }
// AbortState aborts the state change of the element. This function is used by elements that do asynchronous state changes
// and find out something is wrong.
func (e *Element) AbortState() { C.gst_element_abort_state(e.Instance()) }
// AddPad adds a pad (link point) to element. pad's parent will be set to element
//
// Pads are automatically activated when added in the PAUSED or PLAYING state.
//
// The pad and the element should be unlocked when calling this function.
//
// This function will emit the pad-added signal on the element.
func (e *Element) AddPad(pad *Pad) bool {
return gobool(C.gst_element_add_pad(e.Instance(), pad.Instance()))
}
// CallAsync calls f from another thread. This is to be used for cases when a state change has to be performed from a streaming
// thread, directly via SetState or indirectly e.g. via SEEK events.
//
// Calling those functions directly from the streaming thread will cause deadlocks in many situations, as they might involve waiting
// for the streaming thread to shut down from this very streaming thread.
func (e *Element) CallAsync(f func()) {
ptr := gopointer.Save(f)
C.gst_element_call_async(
e.Instance(),
C.GstElementCallAsyncFunc(C.cgoElementCallAsync),
(C.gpointer)(unsafe.Pointer(ptr)),
C.GDestroyNotify(C.cgoElementAsyncDestroyNotify),
)
}