finish AppSink impl and add example

This commit is contained in:
tinyzimmer
2020-10-03 15:55:47 +03:00
parent 36df9e707b
commit 3870298b98
8 changed files with 481 additions and 34 deletions

113
examples/appsink/main.go Normal file
View File

@@ -0,0 +1,113 @@
package main
import (
"fmt"
"math"
"time"
"github.com/tinyzimmer/go-gst/examples"
"github.com/tinyzimmer/go-gst/gst"
"github.com/tinyzimmer/go-gst/gst/app"
)
func createPipeline() *gst.Pipeline {
gst.Init(nil)
pipeline, err := gst.NewPipeline("")
if err != nil {
panic(err)
}
src, err := gst.NewElement("audiotestsrc")
if err != nil {
panic(err)
}
sink, err := app.NewAppSink()
if err != nil {
panic(err)
}
pipeline.AddMany(src, sink.Element)
src.Link(sink.Element)
// Tell the appsink what format we want. It will then be the audiotestsrc's job to
// provide the format we request.
// This can be set after linking the two objects, because format negotiation between
// both elements will happen during pre-rolling of the pipeline.
sink.SetCaps(gst.NewCapsFromString(
"audio/x-raw, format=S16LE, layout=interleaved, channels=1",
))
// Getting data out of the appsink is done by setting callbacks on it.
// The appsink will then call those handlers, as soon as data is available.
sink.SetCallbacks(&app.SinkCallbacks{
// Add a "new-sample" callback
NewSampleFunc: func(sink *app.Sink) gst.FlowReturn {
// Pull the sample that triggered this callback
sample := sink.PullSample()
if sample == nil {
return gst.FlowEOS
}
// Retrieve the buffer from the sample
buffer := sample.GetBuffer()
if buffer == nil {
return gst.FlowError
}
// At this point, buffer is only a reference to an existing memory region somewhere.
// When we want to access its content, we have to map it while requesting the required
// mode of access (read, read/write).
//
// We also know what format to expect because we set it with the caps. So we convert
// the map directly to signed 16-bit integers.
samples := buffer.Map(gst.MapRead).AsInt16Slice()
// Calculate the root mean square for the buffer
// (https://en.wikipedia.org/wiki/Root_mean_square)
var square float64
for _, i := range samples {
square += float64(i * i)
}
rms := math.Sqrt(square / float64(len(samples)))
fmt.Println("rms:", rms)
return gst.FlowOK
},
})
return pipeline
}
func mainLoop(pipeline *gst.Pipeline) error {
defer pipeline.Destroy()
pipeline.SetState(gst.StatePlaying)
bus := pipeline.GetPipelineBus()
for {
msg := bus.TimedPop(time.Duration(-1))
if msg == nil {
break
}
switch msg.Type() {
case gst.MessageEOS:
break
case gst.MessageError:
return msg.ParseError()
}
}
return nil
}
func main() {
examples.Run(func() error {
pipeline := createPipeline()
return mainLoop(pipeline)
})
}

23
examples/common.go Normal file
View File

@@ -0,0 +1,23 @@
package examples
import (
"fmt"
"github.com/tinyzimmer/go-gst/gst"
)
// Run is used to wrap the given function in a main loop and print any error
func Run(f func() error) {
mainLoop := gst.NewMainLoop(gst.DefaultMainContext(), false)
defer mainLoop.Unref()
go func() {
if err := f(); err != nil {
fmt.Println("ERROR!", err)
}
mainLoop.Quit()
}()
mainLoop.Run()
}

65
gst/app/cgo_exports.go Normal file
View File

@@ -0,0 +1,65 @@
package app
// #include "gst.go.h"
import "C"
import (
"unsafe"
gopointer "github.com/mattn/go-pointer"
"github.com/tinyzimmer/go-gst/gst"
)
func getCbsFromPtr(userData C.gpointer) *SinkCallbacks {
ptr := gopointer.Restore(unsafe.Pointer(userData))
cbs, ok := ptr.(*SinkCallbacks)
if !ok {
gopointer.Unref(unsafe.Pointer(userData))
return nil
}
return cbs
}
func wrapCSink(sink *C.GstAppSink) *Sink {
return wrapAppSink(gst.FromGstElementUnsafe(unsafe.Pointer(sink)))
}
//export goSinkEOSCb
func goSinkEOSCb(sink *C.GstAppSink, userData C.gpointer) {
cbs := getCbsFromPtr(userData)
if cbs == nil {
return
}
if cbs.EOSFunc == nil {
return
}
cbs.EOSFunc(wrapCSink(sink))
}
//export goSinkNewPrerollCb
func goSinkNewPrerollCb(sink *C.GstAppSink, userData C.gpointer) C.GstFlowReturn {
cbs := getCbsFromPtr(userData)
if cbs == nil {
return C.GstFlowReturn(gst.FlowError)
}
if cbs.NewPrerollFunc == nil {
return C.GstFlowReturn(gst.FlowOK)
}
return C.GstFlowReturn(cbs.NewPrerollFunc(wrapCSink(sink)))
}
//export goSinkNewSampleCb
func goSinkNewSampleCb(sink *C.GstAppSink, userData C.gpointer) C.GstFlowReturn {
cbs := getCbsFromPtr(userData)
if cbs == nil {
return C.GstFlowReturn(gst.FlowError)
}
if cbs.NewSampleFunc == nil {
return C.GstFlowReturn(gst.FlowOK)
}
return C.GstFlowReturn(cbs.NewSampleFunc(wrapCSink(sink)))
}
//export goSinkGDestroyNotifyFunc
func goSinkGDestroyNotifyFunc(ptr C.gpointer) {
gopointer.Unref(unsafe.Pointer(ptr))
}

View File

@@ -1,14 +1,41 @@
package app package app
// #include "gst.go.h" /*
#include "gst.go.h"
extern void goSinkGDestroyNotifyFunc (gpointer user_data);
extern void goSinkEOSCb (GstAppSink * sink, gpointer user_data);
extern GstFlowReturn goSinkNewPrerollCb (GstAppSink * sink, gpointer user_data);
extern GstFlowReturn goSinkNewSampleCb (GstAppSink * sink, gpointer user_data);
void cgoGDestroyNotifyFunc (gpointer user_data) { goSinkGDestroyNotifyFunc(user_data); }
void cgoSinkEOSCb (GstAppSink * sink, gpointer user_data) { return goSinkEOSCb(sink, user_data); }
GstFlowReturn cgoSinkNewPrerollCb (GstAppSink * sink, gpointer user_data) { return goSinkNewPrerollCb(sink, user_data); }
GstFlowReturn cgoSinkNewSampleCb (GstAppSink * sink, gpointer user_data) { return goSinkNewSampleCb(sink, user_data); }
*/
import "C" import "C"
import ( import (
"errors" "errors"
"time"
"unsafe" "unsafe"
gopointer "github.com/mattn/go-pointer"
"github.com/tinyzimmer/go-gst/gst" "github.com/tinyzimmer/go-gst/gst"
) )
// SinkCallbacks represents callbacks that can be installed on an app sink when data is available.
type SinkCallbacks struct {
EOSFunc func(appSink *Sink)
NewPrerollFunc func(appSink *Sink) gst.FlowReturn
NewSampleFunc func(appSink *Sink) gst.FlowReturn
}
// ErrEOS represents that the stream has ended.
var ErrEOS = errors.New("Pipeline has reached end-of-stream")
// Sink wraps an Element made with the appsink plugin with additional methods for pulling samples. // Sink wraps an Element made with the appsink plugin with additional methods for pulling samples.
type Sink struct{ *gst.Element } type Sink struct{ *gst.Element }
@@ -24,41 +51,170 @@ func NewAppSink() (*Sink, error) {
// Instance returns the native GstAppSink instance. // Instance returns the native GstAppSink instance.
func (a *Sink) Instance() *C.GstAppSink { return C.toGstAppSink(a.Unsafe()) } func (a *Sink) Instance() *C.GstAppSink { return C.toGstAppSink(a.Unsafe()) }
// ErrEOS represents that the stream has ended. // GetBufferListSupport checks if appsink supports buffer lists.
var ErrEOS = errors.New("Pipeline has reached end-of-stream") func (a *Sink) GetBufferListSupport() bool {
return gobool(C.gst_app_sink_get_buffer_list_support(a.Instance()))
}
// GetCaps gets the configured caps on appsink.
func (a *Sink) GetCaps() *gst.Caps {
caps := C.gst_app_sink_get_caps(a.Instance())
if caps == nil {
return nil
}
return gst.FromGstCapsUnsafe(unsafe.Pointer(caps))
}
// GetDrop checks if appsink will drop old buffers when the maximum amount of queued buffers is reached.
func (a *Sink) GetDrop() bool {
return gobool(C.gst_app_sink_get_drop(a.Instance()))
}
// GetEmitSignals checks if appsink will emit the "new-preroll" and "new-sample" signals.
func (a *Sink) GetEmitSignals() bool {
return gobool(C.gst_app_sink_get_emit_signals(a.Instance()))
}
// GetMaxBuffers gets the maximum amount of buffers that can be queued in appsink.
func (a *Sink) GetMaxBuffers() uint {
return uint(C.gst_app_sink_get_max_buffers(a.Instance()))
}
// GetWaitOnEOS checks if appsink will wait for all buffers to be consumed when an EOS is received.
func (a *Sink) GetWaitOnEOS() bool {
return gobool(C.gst_app_sink_get_wait_on_eos(a.Instance()))
}
// IsEOS returns true if this AppSink has reached the end-of-stream. // IsEOS returns true if this AppSink has reached the end-of-stream.
func (a *Sink) IsEOS() bool { func (a *Sink) IsEOS() bool {
return gobool(C.gst_app_sink_is_eos((*C.GstAppSink)(a.Instance()))) return gobool(C.gst_app_sink_is_eos((*C.GstAppSink)(a.Instance())))
} }
// BlockPullSample will block until a sample becomes available or the stream // PullPreroll gets the last preroll sample in appsink. This was the sample that caused the appsink to preroll in the PAUSED state.
// is ended. //
func (a *Sink) BlockPullSample() (*gst.Sample, error) { // This function is typically used when dealing with a pipeline in the PAUSED state. Calling this function after doing a seek will
for { // give the sample right after the seek position.
if a.IsEOS() { //
return nil, ErrEOS // Calling this function will clear the internal reference to the preroll buffer.
} //
// This function won't block if the entire pipeline is waiting for data // Note that the preroll sample will also be returned as the first sample when calling gst_app_sink_pull_sample.
sample := C.gst_app_sink_pull_sample((*C.GstAppSink)(a.Instance())) //
if sample == nil { // If an EOS event was received before any buffers, this function returns NULL. Use gst_app_sink_is_eos () to check for the EOS condition.
continue //
} // This function blocks until a preroll sample or EOS is received or the appsink element is set to the READY/NULL state.
return gst.FromGstSampleUnsafe(unsafe.Pointer(sample)), nil func (a *Sink) PullPreroll() *gst.Sample {
smpl := C.gst_app_sink_pull_preroll(a.Instance())
if smpl == nil {
return nil
} }
return gst.FromGstSampleUnsafe(unsafe.Pointer(smpl))
} }
// PullSample will try to pull a sample or return nil if none is available. // PullSample blocks until a sample or EOS becomes available or the appsink element is set to the READY/NULL state.
func (a *Sink) PullSample() (*gst.Sample, error) { //
if a.IsEOS() { // This function will only return samples when the appsink is in the PLAYING state. All rendered buffers will be put in a queue
return nil, ErrEOS // so that the application can pull samples at its own rate. Note that when the application does not pull samples fast enough, the queued
// buffers could consume a lot of memory, especially when dealing with raw video frames.
//
// If an EOS event was received before any buffers, this function returns NULL. Use IsEOS() to check for the EOS condition.
func (a *Sink) PullSample() *gst.Sample {
smpl := C.gst_app_sink_pull_sample(a.Instance())
if smpl == nil {
return nil
} }
sample := C.gst_app_sink_try_pull_sample( return gst.FromGstSampleUnsafe(unsafe.Pointer(smpl))
(*C.GstAppSink)(a.Instance()), }
C.GST_SECOND,
// SetBufferListSupport instructs appsink to enable or disable buffer list support.
//
// For backwards-compatibility reasons applications need to opt in to indicate that they will be able to handle buffer lists.
func (a *Sink) SetBufferListSupport(enabled bool) {
C.gst_app_sink_set_buffer_list_support(a.Instance(), gboolean(enabled))
}
// SetCallbacks sets callbacks which will be executed for each new preroll, new sample and eos. This is an alternative to using the signals,
// it has lower overhead and is thus less expensive, but also less flexible.
//
// If callbacks are installed, no signals will be emitted for performance reasons.
//
// Before 1.16.3 it was not possible to change the callbacks in a thread-safe way.
func (a *Sink) SetCallbacks(cbs *SinkCallbacks) {
ptr := gopointer.Save(cbs)
appSinkCallbacks := &C.GstAppSinkCallbacks{
eos: (*[0]byte)(unsafe.Pointer(C.cgoSinkEOSCb)),
new_preroll: (*[0]byte)(unsafe.Pointer(C.cgoSinkNewPrerollCb)),
new_sample: (*[0]byte)(unsafe.Pointer(C.cgoSinkNewSampleCb)),
}
C.gst_app_sink_set_callbacks(
a.Instance(),
appSinkCallbacks,
(C.gpointer)(unsafe.Pointer(ptr)),
C.GDestroyNotify(C.cgoGDestroyNotifyFunc),
) )
if sample != nil {
return gst.FromGstSampleUnsafe(unsafe.Pointer(sample)), nil
} }
return nil, nil
// SetCaps sets the capabilities on the appsink element. This function takes a copy of the caps structure. After calling this method,
// the sink will only accept caps that match caps. If caps is non-fixed, or incomplete, you must check the caps on the samples to get
// the actual used caps.
func (a *Sink) SetCaps(caps *gst.Caps) {
C.gst_app_sink_set_caps(a.Instance(), (*C.GstCaps)(unsafe.Pointer(caps.Instance())))
}
// SetDrop instructs appsink to drop old buffers when the maximum amount of queued buffers is reached.
func (a *Sink) SetDrop(drop bool) {
C.gst_app_sink_set_drop(a.Instance(), gboolean(drop))
}
// SetEmitSignals makes appsink emit the "new-preroll" and "new-sample" signals. This option is by default disabled because signal emission
// is expensive and unneeded when the application prefers to operate in pull mode.
func (a *Sink) SetEmitSignals(emit bool) {
C.gst_app_sink_set_emit_signals(a.Instance(), gboolean(emit))
}
// SetMaxBuffers sets the maximum amount of buffers that can be queued in appsink. After this amount of buffers are queued in appsink,
// any more buffers will block upstream elements until a sample is pulled from appsink.
func (a *Sink) SetMaxBuffers(max uint) {
C.gst_app_sink_set_max_buffers(a.Instance(), C.guint(max))
}
// SetWaitOnEOS instructs appsink to wait for all buffers to be consumed when an EOS is received.
func (a *Sink) SetWaitOnEOS(wait bool) {
C.gst_app_sink_set_wait_on_eos(a.Instance(), gboolean(wait))
}
// TryPullPreroll gets the last preroll sample in appsink. This was the sample that caused the appsink to preroll in the PAUSED state.
//
// This function is typically used when dealing with a pipeline in the PAUSED state. Calling this function after doing a seek will give
// the sample right after the seek position.
//
// Calling this function will clear the internal reference to the preroll buffer.
//
// Note that the preroll sample will also be returned as the first sample when calling PullSample.
//
// If an EOS event was received before any buffers or the timeout expires, this function returns NULL. Use IsEOS () to check for the EOS condition.
//
// This function blocks until a preroll sample or EOS is received, the appsink element is set to the READY/NULL state, or the timeout expires.
func (a *Sink) TryPullPreroll(timeout time.Duration) *gst.Sample {
tm := C.GstClockTime(timeout.Nanoseconds())
smpl := C.gst_app_sink_try_pull_preroll(a.Instance(), tm)
if smpl == nil {
return nil
}
return gst.FromGstSampleUnsafe(unsafe.Pointer(smpl))
}
// TryPullSample blocks until a sample or EOS becomes available or the appsink element is set to the READY/NULL state or the timeout expires.
//
// This function will only return samples when the appsink is in the PLAYING state. All rendered buffers will be put in a queue so that the
// application can pull samples at its own rate. Note that when the application does not pull samples fast enough, the queued buffers could
// consume a lot of memory, especially when dealing with raw video frames.
//
// If an EOS event was received before any buffers or the timeout expires, this function returns NULL. Use IsEOS () to check for the EOS condition.
func (a *Sink) TryPullSample(timeout time.Duration) *gst.Sample {
tm := C.GstClockTime(timeout.Nanoseconds())
smpl := C.gst_app_sink_try_pull_sample(a.Instance(), tm)
if smpl == nil {
return nil
}
return gst.FromGstSampleUnsafe(unsafe.Pointer(smpl))
} }

View File

@@ -3,10 +3,20 @@ package app
// #include <gst/gst.h> // #include <gst/gst.h>
import "C" import "C"
import "github.com/tinyzimmer/go-gst/gst" import (
"github.com/tinyzimmer/go-gst/gst"
)
func wrapAppSink(elem *gst.Element) *Sink { return &Sink{elem} } func wrapAppSink(elem *gst.Element) *Sink { return &Sink{elem} }
func wrapAppSrc(elem *gst.Element) *Source { return &Source{elem} } func wrapAppSrc(elem *gst.Element) *Source { return &Source{elem} }
// gobool provides an easy type conversion between C.gboolean and a go bool. // gobool provides an easy type conversion between C.gboolean and a go bool.
func gobool(b C.gboolean) bool { return int(b) > 0 } func gobool(b C.gboolean) bool { return int(b) > 0 }
// gboolean converts a go bool to a C.gboolean.
func gboolean(b bool) C.gboolean {
if b {
return C.gboolean(1)
}
return C.gboolean(0)
}

View File

@@ -26,6 +26,10 @@ type Caps struct {
native *C.GstCaps native *C.GstCaps
} }
// FromGstCapsUnsafe wraps the pointer to the given C GstCaps with the go type.
// This is meant for internal usage and is exported for visibility to other packages.
func FromGstCapsUnsafe(caps unsafe.Pointer) *Caps { return wrapCaps(C.toGstCaps(caps)) }
// CapsMapFunc represents a function passed to the Caps MapInPlace, ForEach, and FilterAndMapInPlace methods. // CapsMapFunc represents a function passed to the Caps MapInPlace, ForEach, and FilterAndMapInPlace methods.
type CapsMapFunc func(features *CapsFeatures, structure *Structure) bool type CapsMapFunc func(features *CapsFeatures, structure *Structure) bool

View File

@@ -10,6 +10,10 @@ import (
"github.com/gotk3/gotk3/glib" "github.com/gotk3/gotk3/glib"
) )
// FromGstElementUnsafe wraps the pointer to the given C GstElement with the go type.
// This is meant for internal usage and is exported for visibility to other packages.
func FromGstElementUnsafe(elem unsafe.Pointer) *Element { return wrapElement(toGObject(elem)) }
// NewElement is a generic wrapper around `gst_element_factory_make`. // NewElement is a generic wrapper around `gst_element_factory_make`.
func NewElement(name string) (*Element, error) { func NewElement(name string) (*Element, error) {
elemName := C.CString(name) elemName := C.CString(name)
@@ -23,17 +27,17 @@ func NewElement(name string) (*Element, error) {
// NewElementMany is a convenience wrapper around building many GstElements in a // NewElementMany is a convenience wrapper around building many GstElements in a
// single function call. It returns an error if the creation of any element fails. A // single function call. It returns an error if the creation of any element fails. A
// map containing the ordinal of the argument to the Element created is returned. // slice in the order the names were given is returned.
func NewElementMany(elemNames ...string) (map[int]*Element, error) { func NewElementMany(elemNames ...string) ([]*Element, error) {
elemMap := make(map[int]*Element) elems := make([]*Element, len(elemNames))
for idx, name := range elemNames { for idx, name := range elemNames {
elem, err := NewElement(name) elem, err := NewElement(name)
if err != nil { if err != nil {
return nil, err return nil, err
} }
elemMap[idx] = elem elems[idx] = elem
} }
return elemMap, nil return elems, nil
} }
// ElementFactory wraps the GstElementFactory // ElementFactory wraps the GstElementFactory

View File

@@ -123,6 +123,78 @@ func (m *MapInfo) Bytes() []byte {
return C.GoBytes(m.Data, (C.int)(m.Size)) return C.GoBytes(m.Data, (C.int)(m.Size))
} }
// AsInt8Slice returns the contents of this map as a slice of signed 8-bit integers.
func (m *MapInfo) AsInt8Slice() []int8 {
out := make([]int8, m.Size)
for i, t := range (*[1 << 30]int8)(m.Data)[:m.Size:m.Size] {
out[i] = int8(t)
}
return out
}
// AsInt16Slice returns the contents of this map as a slice of signed 16-bit integers.
func (m *MapInfo) AsInt16Slice() []int16 {
out := make([]int16, m.Size)
for i, t := range (*[1 << 30]int16)(m.Data)[:m.Size:m.Size] {
out[i] = int16(t)
}
return out
}
// AsInt32Slice returns the contents of this map as a slice of signed 32-bit integers.
func (m *MapInfo) AsInt32Slice() []int32 {
out := make([]int32, m.Size)
for i, t := range (*[1 << 30]int32)(m.Data)[:m.Size:m.Size] {
out[i] = int32(t)
}
return out
}
// AsInt64Slice returns the contents of this map as a slice of signed 64-bit integers.
func (m *MapInfo) AsInt64Slice() []int64 {
out := make([]int64, m.Size)
for i, t := range (*[1 << 30]int64)(m.Data)[:m.Size:m.Size] {
out[i] = int64(t)
}
return out
}
// AsUint8Slice returns the contents of this map as a slice of unsigned 8-bit integers.
func (m *MapInfo) AsUint8Slice() []uint8 {
out := make([]uint8, m.Size)
for i, t := range (*[1 << 30]uint8)(m.Data)[:m.Size:m.Size] {
out[i] = uint8(t)
}
return out
}
// AsUint16Slice returns the contents of this map as a slice of unsigned 16-bit integers.
func (m *MapInfo) AsUint16Slice() []uint16 {
out := make([]uint16, m.Size)
for i, t := range (*[1 << 30]uint16)(m.Data)[:m.Size:m.Size] {
out[i] = uint16(t)
}
return out
}
// AsUint32Slice returns the contents of this map as a slice of unsigned 32-bit integers.
func (m *MapInfo) AsUint32Slice() []uint32 {
out := make([]uint32, m.Size)
for i, t := range (*[1 << 30]uint32)(m.Data)[:m.Size:m.Size] {
out[i] = uint32(t)
}
return out
}
// AsUint64Slice returns the contents of this map as a slice of unsigned 64-bit integers.
func (m *MapInfo) AsUint64Slice() []uint64 {
out := make([]uint64, m.Size)
for i, t := range (*[1 << 30]uint64)(m.Data)[:m.Size:m.Size] {
out[i] = uint64(t)
}
return out
}
func wrapMapInfo(mapInfo *C.GstMapInfo, unmapFunc func()) *MapInfo { func wrapMapInfo(mapInfo *C.GstMapInfo, unmapFunc func()) *MapInfo {
return &MapInfo{ return &MapInfo{
ptr: mapInfo, ptr: mapInfo,