diff --git a/gst/cgo_exports.go b/gst/cgo_exports.go index 526fc5f..f40c91a 100644 --- a/gst/cgo_exports.go +++ b/gst/cgo_exports.go @@ -275,9 +275,18 @@ func goLogFunction( } } -// goDestroyBusWatch frees the go memory associated with the bus watch. +// goUnrefGopointerUserData is a GDestroyNotify used to unref the gopointer from the userdata, used for callback functions // -//export goDestroyBusWatch -func goDestroyBusWatch(fPtr C.gpointer) { +//export goUnrefGopointerUserData +func goUnrefGopointerUserData(fPtr C.gpointer) { gopointer.Unref(unsafe.Pointer(fPtr)) } + +// goPromiseChangeFunc is the function the GstPromise calls when it changes state +// +//export goPromiseChangeFunc +func goPromiseChangeFunc(_ *C.GstPromise, fPtr C.gpointer) { + f := gopointer.Restore(unsafe.Pointer(fPtr)).(func()) + + f() +} diff --git a/gst/gst_bus.go b/gst/gst_bus.go index 72f38ee..da6f49d 100644 --- a/gst/gst_bus.go +++ b/gst/gst_bus.go @@ -5,16 +5,16 @@ package gst extern GstBusSyncReply goBusSyncHandler (GstBus * bus, GstMessage * message, gpointer user_data); extern gboolean goBusFunc (GstBus * bus, GstMessage * msg, gpointer user_data); -extern void goDestroyBusWatch (gpointer); +extern void goUnrefGopointerUserData (gpointer); gboolean cgoBusFunc (GstBus * bus, GstMessage * msg, gpointer user_data) { return goBusFunc(bus, msg, user_data); } -void cgoDestroyBusWatch (gpointer data) +void cgoUnrefGopointerUserData (gpointer data) { - goDestroyBusWatch(data); + goUnrefGopointerUserData(data); } GstBusSyncReply cgoBusSyncHandler (GstBus * bus, GstMessage * message, gpointer user_data) @@ -141,7 +141,7 @@ func (b *Bus) AddWatch(busFunc BusWatchFunc) bool { C.G_PRIORITY_DEFAULT, C.GstBusFunc(C.cgoBusFunc), (C.gpointer)(unsafe.Pointer(fPtr)), - C.GDestroyNotify(C.cgoDestroyBusWatch), + C.GDestroyNotify(C.cgoUnrefGopointerUserData), )), ) } @@ -308,7 +308,7 @@ func (b *Bus) SetSyncHandler(f BusSyncHandler) { b.Instance(), C.GstBusSyncHandler(C.cgoBusSyncHandler), (C.gpointer)(unsafe.Pointer(ptr)), - C.GDestroyNotify(C.cgoDestroyBusWatch), + C.GDestroyNotify(C.cgoUnrefGopointerUserData), ) } diff --git a/gst/gst_promise.go b/gst/gst_promise.go new file mode 100644 index 0000000..20ed8a9 --- /dev/null +++ b/gst/gst_promise.go @@ -0,0 +1,202 @@ +package gst + +/* +#include "gst.go.h" + +extern void goPromiseChangeFunc (GstPromise*, gpointer user_data); +extern void cgoUnrefGopointerUserData (gpointer); + +void cgoPromiseChangeFunc (GstPromise *promise, gpointer data) +{ + goPromiseChangeFunc(promise, data); +} +*/ +import "C" +import ( + "context" + "errors" + "fmt" + "runtime" + "unsafe" + + "github.com/go-gst/go-glib/glib" + gopointer "github.com/mattn/go-pointer" +) + +type PromiseResult int + +func (pr PromiseResult) String() string { + switch pr { + case PromiseResultPending: + return "PENDING" + case PromiseResultInterrupted: + return "INTERRUPTED" + case PromiseResultReplied: + return "REPLIED" + case PromiseResultExpired: + return "EXPIRED" + default: + return "UNKNOWN" + } +} + +const ( + //Initial state. Waiting for transition to any other state. + PromiseResultPending = C.GST_PROMISE_RESULT_PENDING + // Interrupted by the consumer as it doesn't want the value anymore. + PromiseResultInterrupted = C.GST_PROMISE_RESULT_INTERRUPTED + // A producer marked a reply + PromiseResultReplied = C.GST_PROMISE_RESULT_REPLIED + // The promise expired (the carrying object lost all refs) and the promise will never be fulfilled. + PromiseResultExpired = C.GST_PROMISE_RESULT_EXPIRED +) + +// Promise is a go wrapper around a GstPromise. +// See: https://gstreamer.freedesktop.org/documentation/gstreamer/gstpromise.html +// +// it can be awaited on-blocking using Await, given the promise was constructed in go and not received from FFI. +type Promise struct { + ptr *C.GstPromise + + // done will be closed when the GstPromise has changed state + done <-chan struct{} +} + +func NewPromise() *Promise { + done := make(chan struct{}) + + fPtr := gopointer.Save(func() { + close(done) + }) + + cprom := C.gst_promise_new_with_change_func( + C.GstPromiseChangeFunc(C.cgoPromiseChangeFunc), + C.gpointer(fPtr), + C.GDestroyNotify(C.cgoUnrefGopointerUserData), + ) + + prom := &Promise{ + ptr: cprom, + done: done, + } + + runtime.SetFinalizer(prom, func(prom *Promise) { + prom.Unref() + }) + + return prom +} + +func (p *Promise) Instance() *C.GstPromise { + return p.ptr +} + +// Ref increases the ref count on the promise. Exposed for completeness sake. Should not be called +// by application code +func (p *Promise) Ref() { + C.gst_promise_ref(p.ptr) +} + +// Unref decreases the ref count on the promise. Exposed for completeness sake. Should not be called +// by application code +func (p *Promise) Unref() { + C.gst_promise_unref(p.ptr) +} + +var ErrPromiseNotReplied = errors.New("promise was not replied") +var ErrNilPromiseReply = errors.New("promise returned a nil reply") + +// ErrCannotAwaitPromise signifies that we do not have a channel that we can await. +// +// this happens if the promise was marshaled from a GValue coming from C +var ErrCannotAwaitPromise = errors.New("promises received from FFI cannot be awaited") + +// Await awaits the promise without blocking the thread. It returns the reply returned by the GstPromise +// +// its implementation is preferred over the blocking gst_promise_wait, which would lock a thread until the +// promise has changed state. +func (p *Promise) Await(ctx context.Context) (*Structure, error) { + if p.done == nil { + return nil, ErrCannotAwaitPromise + } + + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-p.done: + } + + // gst_promise_wait will not block here, because the promise has already changed state + result := PromiseResult(C.gst_promise_wait(p.ptr)) + + if result != PromiseResultReplied { + return nil, fmt.Errorf("%w: got %s", ErrPromiseNotReplied, result) + } + + structure := p.GetReply() + + if structure == nil { + return nil, ErrNilPromiseReply + } + + return structure, nil +} + +// GetReply wraps gst_promise_get_reply and returns the structure, which can be nil. +func (p *Promise) GetReply() *Structure { + cstruct := C.gst_promise_get_reply(p.ptr) + + if cstruct == nil { + return nil + } + + structure := wrapStructure(cstruct) + + // the structure is owned by the promise, so we keep the promise alive + // until the structure gets GC'ed + p.Ref() + runtime.SetFinalizer(structure, func(_ *Structure) { + p.Unref() + }) + + return structure +} + +// Expire wraps gst_promise_expire +func (p *Promise) Expire() { + C.gst_promise_expire(p.ptr) +} + +// Interrupt wraps gst_promise_interrupt +func (p *Promise) Interrupt() { + C.gst_promise_interrupt(p.ptr) +} + +// Reply wraps gst_promise_reply +func (p *Promise) Reply(answer *Structure) { + C.gst_promise_reply(p.ptr, answer.Instance()) +} + +var TypePromise = glib.Type(C.GST_TYPE_PROMISE) + +// ToGValue implements glib.ValueTransformer +func (p *Promise) ToGValue() (*glib.Value, error) { + val, err := glib.ValueInit(TypePromise) + if err != nil { + return nil, err + } + val.SetInstance(unsafe.Pointer(p.Instance())) + return val, nil +} + +func marshalPromise(p unsafe.Pointer) (interface{}, error) { + c := C.g_value_get_object(toGValue(p)) + obj := (*C.GstPromise)(unsafe.Pointer(c)) + + prom := &Promise{ + ptr: obj, + done: nil, // cannot be awaited if received from FFI + } + + return prom, nil +} diff --git a/gst/gst_promise_test.go b/gst/gst_promise_test.go new file mode 100644 index 0000000..3d29f42 --- /dev/null +++ b/gst/gst_promise_test.go @@ -0,0 +1,123 @@ +package gst + +import ( + "context" + "errors" + "runtime" + "sync" + "testing" + "time" + "unsafe" +) + +//go:noinline +func awaitGC() { + type tmp struct{ v string } + + setup := make(chan struct{}) + done := make(chan struct{}) + + go func() { + v := &tmp{"foo"} + + runtime.SetFinalizer(v, func(v *tmp) { + close(done) + }) + + close(setup) + }() + + <-setup + runtime.GC() + <-done + runtime.GC() + time.Sleep(1 * time.Second) +} + +func TestPromise(t *testing.T) { + initOnce.Do(func() { + Init(nil) + }) + + prom := NewPromise() + cprom := prom.Instance() + + reply := NewStructure("foo/bar") + errchan := make(chan error) + + go func() { + res, err := prom.Await(context.Background()) + + if err != nil { + errchan <- err + } + + // even though we don't use the promise, the result structure should be still accessible. + // the returned structure is owned by the promise, so the promise must not get GC'ed until + // we don't use the structure anymore + awaitGC() + + if res.Name() != reply.Name() { + errchan <- errors.New("name mismatch") + } + + runtime.GC() + runtime.GC() + runtime.GC() + + close(errchan) + }() + + prom.Reply(reply) + + err := <-errchan + + if err != nil { + t.FailNow() + } + + awaitGC() + + if cprom.parent.refcount != 1 { + panic("refcount too high") + } + + runtime.KeepAlive(prom) + + awaitGC() +} + +var initOnce sync.Once + +func TestPromiseMarshal(t *testing.T) { + initOnce.Do(func() { + Init(nil) + }) + + prom := NewPromise() + + gv, err := prom.ToGValue() + + if err != nil { + t.Fatal(err) + } + + receivedPromI, err := marshalPromise(unsafe.Pointer(gv.GValue)) + + if err != nil { + t.Fatal(err) + } + + receivedProm, ok := receivedPromI.(*Promise) + + if !ok { + t.Fatal("could not cast") + } + + // Awaiting received promise should error immediately + _, err = receivedProm.Await(context.Background()) + + if err == nil { + t.FailNow() + } +} diff --git a/gst/gst_wrappers.go b/gst/gst_wrappers.go index 002956d..64659f9 100644 --- a/gst/gst_wrappers.go +++ b/gst/gst_wrappers.go @@ -234,6 +234,10 @@ func registerMarshalers() { T: TypeValueList, F: marshalValueList, }, + { + T: TypePromise, + F: marshalPromise, + }, { T: glib.Type(C.gst_sample_get_type()), F: marshalSample,