From ebdff9159caa85357c1244d2dd94de5852ba0ca3 Mon Sep 17 00:00:00 2001 From: tinyzimmer <38474291+tinyzimmer@users.noreply.github.com> Date: Mon, 5 Oct 2020 11:13:50 +0300 Subject: [PATCH] add callasync and clean up examples --- examples/appsink/main.go | 1 + examples/custom_events/main.go | 19 ++++++++----- examples/gif-encoder/main.go | 27 ++++++++++++++++--- gst/cgo_exports.go | 7 +++++ gst/gst_element.go | 49 +++++++++++++++++++++++++++++++++- 5 files changed, 93 insertions(+), 10 deletions(-) diff --git a/examples/appsink/main.go b/examples/appsink/main.go index 212c2bb..a951ce4 100644 --- a/examples/appsink/main.go +++ b/examples/appsink/main.go @@ -53,6 +53,7 @@ func createPipeline() (*gst.Pipeline, error) { if sample == nil { return gst.FlowEOS } + defer sample.Unref() // Retrieve the buffer from the sample buffer := sample.GetBuffer() diff --git a/examples/custom_events/main.go b/examples/custom_events/main.go index 1909441..68ad779 100644 --- a/examples/custom_events/main.go +++ b/examples/custom_events/main.go @@ -57,13 +57,20 @@ func createPipeline() (*gst.Pipeline, error) { // Log and act accordingly fmt.Printf("Received custom event with count=%d send_eos=%v\n", customEvent.Count, customEvent.SendEOS) if customEvent.SendEOS { - fmt.Println("Send EOS is true, sending eos") - if !pipeline.SendEvent(gst.NewEOSEvent()) { - fmt.Println("WARNING: Failed to send EOS to pipeline") - } - } else { - fmt.Println("Send EOS is false ignoring") + // We need to use the CallAsync method to send the signal. + // This is becaues the SendEvent method blocks and this could cause a dead lock sending the + // event directly from the probe. This is the near equivalent of using go func() { ... }(), + // however displayed this way for demonstration purposes. + sink.CallAsync(func() { + fmt.Println("Send EOS is true, sending eos") + if !pipeline.SendEvent(gst.NewEOSEvent()) { + fmt.Println("WARNING: Failed to send EOS to pipeline") + } + fmt.Println("Sent EOS") + }) + return gst.PadProbeRemove } + fmt.Println("Send EOS is false ignoring") return gst.PadProbeOK }) diff --git a/examples/gif-encoder/main.go b/examples/gif-encoder/main.go index 3cb430d..9d0ca31 100644 --- a/examples/gif-encoder/main.go +++ b/examples/gif-encoder/main.go @@ -92,6 +92,9 @@ func encodeGif(mainLoop *gst.MainLoop) error { appSink, _ := app.NewAppSink() pipeline.Add(appSink.Element) jpegenc.Link(appSink.Element) + appSink.SyncStateWithParent() + + appSink.SetWaitOnEOS(false) // We can query the decodebin for the duration of the video it received. We can then // use this value to calculate the total number of frames we expect to produce. @@ -113,21 +116,37 @@ func encodeGif(mainLoop *gst.MainLoop) error { var frameNum int appSink.SetCallbacks(&app.SinkCallbacks{ NewSampleFunc: func(sink *app.Sink) gst.FlowReturn { + if appSink.IsEOS() { + return gst.FlowEOS + } + // Increment the frame number counter frameNum++ if frameNum > totalFrames { // If we've reached the total number of frames we are expecting. We can // signal the main loop to quit. - mainLoop.Quit() + // This needs to be done from a goroutine to not block the app sink + // callback. + appSink.CallAsync(func() { + pipeline.SendEvent(gst.NewEOSEvent()) + }) return gst.FlowEOS } - fmt.Printf("\033[2K\rProcessing image frame %d/%d", frameNum, totalFrames) + // Pull the sample from the sink + sample := sink.PullSample() + if sample == nil { + return gst.FlowOK + } + defer sample.Unref() + + fmt.Printf("\033[2K\r") + fmt.Printf("Processing image frame %d/%d\n", frameNum, totalFrames) // We can retrieve a reader with the raw bytes of the image directly from the // sink. - imgReader := sink.PullSample().GetBuffer().Reader() + imgReader := sample.GetBuffer().Reader() img, err := jpeg.Decode(imgReader) if err != nil { @@ -164,6 +183,8 @@ func encodeGif(mainLoop *gst.MainLoop) error { var isError bool pipeline.GetPipelineBus().AddWatch(func(msg *gst.Message) bool { switch msg.Type() { + case gst.MessageEOS: + mainLoop.Quit() case gst.MessageError: err := msg.ParseError() fmt.Println("ERROR:", err.Error()) diff --git a/gst/cgo_exports.go b/gst/cgo_exports.go index 339445f..53cc764 100644 --- a/gst/cgo_exports.go +++ b/gst/cgo_exports.go @@ -13,6 +13,13 @@ import ( gopointer "github.com/mattn/go-pointer" ) +//export goElementCallAsync +func goElementCallAsync(element *C.GstElement, userData C.gpointer) { + iface := gopointer.Restore(unsafe.Pointer(userData)) + f := iface.(func()) + f() +} + //export goPadStickyEventForEachFunc func goPadStickyEventForEachFunc(gpad *C.GstPad, event **C.GstEvent, userData C.gpointer) C.gboolean { cbIface := gopointer.Restore(unsafe.Pointer(userData)) diff --git a/gst/gst_element.go b/gst/gst_element.go index 9722593..1f0056a 100644 --- a/gst/gst_element.go +++ b/gst/gst_element.go @@ -1,6 +1,22 @@ package gst -// #include "gst.go.h" +/* +#include "gst.go.h" + +extern void goGDestroyNotifyFuncNoRun (gpointer user_data); +extern void goElementCallAsync (GstElement * element, gpointer user_data); + +void cgoElementAsyncDestroyNotify (gpointer user_data) +{ + goGDestroyNotifyFuncNoRun(user_data); +} + +void cgoElementCallAsync (GstElement * element, gpointer user_data) +{ + goElementCallAsync(element, user_data); +} + +*/ import "C" import ( @@ -8,6 +24,7 @@ import ( "unsafe" "github.com/gotk3/gotk3/glib" + gopointer "github.com/mattn/go-pointer" ) // Element is a Go wrapper around a GstElement. @@ -285,3 +302,33 @@ func (e *Element) Emit(signal string, args ...interface{}) (interface{}, error) func (e *Element) SyncStateWithParent() bool { return gobool(C.gst_element_sync_state_with_parent(e.Instance())) } + +// AbortState aborts the state change of the element. This function is used by elements that do asynchronous state changes +// and find out something is wrong. +func (e *Element) AbortState() { C.gst_element_abort_state(e.Instance()) } + +// AddPad adds a pad (link point) to element. pad's parent will be set to element +// +// Pads are automatically activated when added in the PAUSED or PLAYING state. +// +// The pad and the element should be unlocked when calling this function. +// +// This function will emit the pad-added signal on the element. +func (e *Element) AddPad(pad *Pad) bool { + return gobool(C.gst_element_add_pad(e.Instance(), pad.Instance())) +} + +// CallAsync calls f from another thread. This is to be used for cases when a state change has to be performed from a streaming +// thread, directly via SetState or indirectly e.g. via SEEK events. +// +// Calling those functions directly from the streaming thread will cause deadlocks in many situations, as they might involve waiting +// for the streaming thread to shut down from this very streaming thread. +func (e *Element) CallAsync(f func()) { + ptr := gopointer.Save(f) + C.gst_element_call_async( + e.Instance(), + C.GstElementCallAsyncFunc(C.cgoElementCallAsync), + (C.gpointer)(unsafe.Pointer(ptr)), + C.GDestroyNotify(C.cgoElementAsyncDestroyNotify), + ) +}