add support for GstPromise

This commit is contained in:
RSWilli
2024-08-20 17:56:36 +02:00
parent bf761a7aa1
commit 32a6a0568d
5 changed files with 346 additions and 8 deletions

View File

@@ -275,9 +275,18 @@ func goLogFunction(
}
}
// goDestroyBusWatch frees the go memory associated with the bus watch.
// goUnrefGopointerUserData is a GDestroyNotify used to unref the gopointer from the userdata, used for callback functions
//
//export goDestroyBusWatch
func goDestroyBusWatch(fPtr C.gpointer) {
//export goUnrefGopointerUserData
func goUnrefGopointerUserData(fPtr C.gpointer) {
gopointer.Unref(unsafe.Pointer(fPtr))
}
// goPromiseChangeFunc is the function the GstPromise calls when it changes state
//
//export goPromiseChangeFunc
func goPromiseChangeFunc(_ *C.GstPromise, fPtr C.gpointer) {
f := gopointer.Restore(unsafe.Pointer(fPtr)).(func())
f()
}

View File

@@ -5,16 +5,16 @@ package gst
extern GstBusSyncReply goBusSyncHandler (GstBus * bus, GstMessage * message, gpointer user_data);
extern gboolean goBusFunc (GstBus * bus, GstMessage * msg, gpointer user_data);
extern void goDestroyBusWatch (gpointer);
extern void goUnrefGopointerUserData (gpointer);
gboolean cgoBusFunc (GstBus * bus, GstMessage * msg, gpointer user_data)
{
return goBusFunc(bus, msg, user_data);
}
void cgoDestroyBusWatch (gpointer data)
void cgoUnrefGopointerUserData (gpointer data)
{
goDestroyBusWatch(data);
goUnrefGopointerUserData(data);
}
GstBusSyncReply cgoBusSyncHandler (GstBus * bus, GstMessage * message, gpointer user_data)
@@ -141,7 +141,7 @@ func (b *Bus) AddWatch(busFunc BusWatchFunc) bool {
C.G_PRIORITY_DEFAULT,
C.GstBusFunc(C.cgoBusFunc),
(C.gpointer)(unsafe.Pointer(fPtr)),
C.GDestroyNotify(C.cgoDestroyBusWatch),
C.GDestroyNotify(C.cgoUnrefGopointerUserData),
)),
)
}
@@ -308,7 +308,7 @@ func (b *Bus) SetSyncHandler(f BusSyncHandler) {
b.Instance(),
C.GstBusSyncHandler(C.cgoBusSyncHandler),
(C.gpointer)(unsafe.Pointer(ptr)),
C.GDestroyNotify(C.cgoDestroyBusWatch),
C.GDestroyNotify(C.cgoUnrefGopointerUserData),
)
}

202
gst/gst_promise.go Normal file
View File

@@ -0,0 +1,202 @@
package gst
/*
#include "gst.go.h"
extern void goPromiseChangeFunc (GstPromise*, gpointer user_data);
extern void cgoUnrefGopointerUserData (gpointer);
void cgoPromiseChangeFunc (GstPromise *promise, gpointer data)
{
goPromiseChangeFunc(promise, data);
}
*/
import "C"
import (
"context"
"errors"
"fmt"
"runtime"
"unsafe"
"github.com/go-gst/go-glib/glib"
gopointer "github.com/mattn/go-pointer"
)
type PromiseResult int
func (pr PromiseResult) String() string {
switch pr {
case PromiseResultPending:
return "PENDING"
case PromiseResultInterrupted:
return "INTERRUPTED"
case PromiseResultReplied:
return "REPLIED"
case PromiseResultExpired:
return "EXPIRED"
default:
return "UNKNOWN"
}
}
const (
//Initial state. Waiting for transition to any other state.
PromiseResultPending = C.GST_PROMISE_RESULT_PENDING
// Interrupted by the consumer as it doesn't want the value anymore.
PromiseResultInterrupted = C.GST_PROMISE_RESULT_INTERRUPTED
// A producer marked a reply
PromiseResultReplied = C.GST_PROMISE_RESULT_REPLIED
// The promise expired (the carrying object lost all refs) and the promise will never be fulfilled.
PromiseResultExpired = C.GST_PROMISE_RESULT_EXPIRED
)
// Promise is a go wrapper around a GstPromise.
// See: https://gstreamer.freedesktop.org/documentation/gstreamer/gstpromise.html
//
// it can be awaited on-blocking using Await, given the promise was constructed in go and not received from FFI.
type Promise struct {
ptr *C.GstPromise
// done will be closed when the GstPromise has changed state
done <-chan struct{}
}
func NewPromise() *Promise {
done := make(chan struct{})
fPtr := gopointer.Save(func() {
close(done)
})
cprom := C.gst_promise_new_with_change_func(
C.GstPromiseChangeFunc(C.cgoPromiseChangeFunc),
C.gpointer(fPtr),
C.GDestroyNotify(C.cgoUnrefGopointerUserData),
)
prom := &Promise{
ptr: cprom,
done: done,
}
runtime.SetFinalizer(prom, func(prom *Promise) {
prom.Unref()
})
return prom
}
func (p *Promise) Instance() *C.GstPromise {
return p.ptr
}
// Ref increases the ref count on the promise. Exposed for completeness sake. Should not be called
// by application code
func (p *Promise) Ref() {
C.gst_promise_ref(p.ptr)
}
// Unref decreases the ref count on the promise. Exposed for completeness sake. Should not be called
// by application code
func (p *Promise) Unref() {
C.gst_promise_unref(p.ptr)
}
var ErrPromiseNotReplied = errors.New("promise was not replied")
var ErrNilPromiseReply = errors.New("promise returned a nil reply")
// ErrCannotAwaitPromise signifies that we do not have a channel that we can await.
//
// this happens if the promise was marshaled from a GValue coming from C
var ErrCannotAwaitPromise = errors.New("promises received from FFI cannot be awaited")
// Await awaits the promise without blocking the thread. It returns the reply returned by the GstPromise
//
// its implementation is preferred over the blocking gst_promise_wait, which would lock a thread until the
// promise has changed state.
func (p *Promise) Await(ctx context.Context) (*Structure, error) {
if p.done == nil {
return nil, ErrCannotAwaitPromise
}
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-p.done:
}
// gst_promise_wait will not block here, because the promise has already changed state
result := PromiseResult(C.gst_promise_wait(p.ptr))
if result != PromiseResultReplied {
return nil, fmt.Errorf("%w: got %s", ErrPromiseNotReplied, result)
}
structure := p.GetReply()
if structure == nil {
return nil, ErrNilPromiseReply
}
return structure, nil
}
// GetReply wraps gst_promise_get_reply and returns the structure, which can be nil.
func (p *Promise) GetReply() *Structure {
cstruct := C.gst_promise_get_reply(p.ptr)
if cstruct == nil {
return nil
}
structure := wrapStructure(cstruct)
// the structure is owned by the promise, so we keep the promise alive
// until the structure gets GC'ed
p.Ref()
runtime.SetFinalizer(structure, func(_ *Structure) {
p.Unref()
})
return structure
}
// Expire wraps gst_promise_expire
func (p *Promise) Expire() {
C.gst_promise_expire(p.ptr)
}
// Interrupt wraps gst_promise_interrupt
func (p *Promise) Interrupt() {
C.gst_promise_interrupt(p.ptr)
}
// Reply wraps gst_promise_reply
func (p *Promise) Reply(answer *Structure) {
C.gst_promise_reply(p.ptr, answer.Instance())
}
var TypePromise = glib.Type(C.GST_TYPE_PROMISE)
// ToGValue implements glib.ValueTransformer
func (p *Promise) ToGValue() (*glib.Value, error) {
val, err := glib.ValueInit(TypePromise)
if err != nil {
return nil, err
}
val.SetInstance(unsafe.Pointer(p.Instance()))
return val, nil
}
func marshalPromise(p unsafe.Pointer) (interface{}, error) {
c := C.g_value_get_object(toGValue(p))
obj := (*C.GstPromise)(unsafe.Pointer(c))
prom := &Promise{
ptr: obj,
done: nil, // cannot be awaited if received from FFI
}
return prom, nil
}

123
gst/gst_promise_test.go Normal file
View File

@@ -0,0 +1,123 @@
package gst
import (
"context"
"errors"
"runtime"
"sync"
"testing"
"time"
"unsafe"
)
//go:noinline
func awaitGC() {
type tmp struct{ v string }
setup := make(chan struct{})
done := make(chan struct{})
go func() {
v := &tmp{"foo"}
runtime.SetFinalizer(v, func(v *tmp) {
close(done)
})
close(setup)
}()
<-setup
runtime.GC()
<-done
runtime.GC()
time.Sleep(1 * time.Second)
}
func TestPromise(t *testing.T) {
initOnce.Do(func() {
Init(nil)
})
prom := NewPromise()
cprom := prom.Instance()
reply := NewStructure("foo/bar")
errchan := make(chan error)
go func() {
res, err := prom.Await(context.Background())
if err != nil {
errchan <- err
}
// even though we don't use the promise, the result structure should be still accessible.
// the returned structure is owned by the promise, so the promise must not get GC'ed until
// we don't use the structure anymore
awaitGC()
if res.Name() != reply.Name() {
errchan <- errors.New("name mismatch")
}
runtime.GC()
runtime.GC()
runtime.GC()
close(errchan)
}()
prom.Reply(reply)
err := <-errchan
if err != nil {
t.FailNow()
}
awaitGC()
if cprom.parent.refcount != 1 {
panic("refcount too high")
}
runtime.KeepAlive(prom)
awaitGC()
}
var initOnce sync.Once
func TestPromiseMarshal(t *testing.T) {
initOnce.Do(func() {
Init(nil)
})
prom := NewPromise()
gv, err := prom.ToGValue()
if err != nil {
t.Fatal(err)
}
receivedPromI, err := marshalPromise(unsafe.Pointer(gv.GValue))
if err != nil {
t.Fatal(err)
}
receivedProm, ok := receivedPromI.(*Promise)
if !ok {
t.Fatal("could not cast")
}
// Awaiting received promise should error immediately
_, err = receivedProm.Await(context.Background())
if err == nil {
t.FailNow()
}
}

View File

@@ -234,6 +234,10 @@ func registerMarshalers() {
T: TypeValueList,
F: marshalValueList,
},
{
T: TypePromise,
F: marshalPromise,
},
{
T: glib.Type(C.gst_sample_get_type()),
F: marshalSample,