diff --git a/examples/appsink/main.go b/examples/appsink/main.go new file mode 100644 index 0000000..a76b05f --- /dev/null +++ b/examples/appsink/main.go @@ -0,0 +1,113 @@ +package main + +import ( + "fmt" + "math" + "time" + + "github.com/tinyzimmer/go-gst/examples" + "github.com/tinyzimmer/go-gst/gst" + "github.com/tinyzimmer/go-gst/gst/app" +) + +func createPipeline() *gst.Pipeline { + gst.Init(nil) + + pipeline, err := gst.NewPipeline("") + if err != nil { + panic(err) + } + + src, err := gst.NewElement("audiotestsrc") + if err != nil { + panic(err) + } + + sink, err := app.NewAppSink() + if err != nil { + panic(err) + } + + pipeline.AddMany(src, sink.Element) + src.Link(sink.Element) + + // Tell the appsink what format we want. It will then be the audiotestsrc's job to + // provide the format we request. + // This can be set after linking the two objects, because format negotiation between + // both elements will happen during pre-rolling of the pipeline. + sink.SetCaps(gst.NewCapsFromString( + "audio/x-raw, format=S16LE, layout=interleaved, channels=1", + )) + + // Getting data out of the appsink is done by setting callbacks on it. + // The appsink will then call those handlers, as soon as data is available. + sink.SetCallbacks(&app.SinkCallbacks{ + // Add a "new-sample" callback + NewSampleFunc: func(sink *app.Sink) gst.FlowReturn { + + // Pull the sample that triggered this callback + sample := sink.PullSample() + if sample == nil { + return gst.FlowEOS + } + + // Retrieve the buffer from the sample + buffer := sample.GetBuffer() + if buffer == nil { + return gst.FlowError + } + + // At this point, buffer is only a reference to an existing memory region somewhere. + // When we want to access its content, we have to map it while requesting the required + // mode of access (read, read/write). + // + // We also know what format to expect because we set it with the caps. So we convert + // the map directly to signed 16-bit integers. + samples := buffer.Map(gst.MapRead).AsInt16Slice() + + // Calculate the root mean square for the buffer + // (https://en.wikipedia.org/wiki/Root_mean_square) + var square float64 + for _, i := range samples { + square += float64(i * i) + } + rms := math.Sqrt(square / float64(len(samples))) + fmt.Println("rms:", rms) + + return gst.FlowOK + }, + }) + + return pipeline +} + +func mainLoop(pipeline *gst.Pipeline) error { + defer pipeline.Destroy() + + pipeline.SetState(gst.StatePlaying) + + bus := pipeline.GetPipelineBus() + + for { + msg := bus.TimedPop(time.Duration(-1)) + if msg == nil { + break + } + switch msg.Type() { + case gst.MessageEOS: + break + case gst.MessageError: + return msg.ParseError() + } + } + + return nil +} + +func main() { + examples.Run(func() error { + pipeline := createPipeline() + return mainLoop(pipeline) + }) + +} diff --git a/examples/common.go b/examples/common.go new file mode 100644 index 0000000..b1f42b3 --- /dev/null +++ b/examples/common.go @@ -0,0 +1,23 @@ +package examples + +import ( + "fmt" + + "github.com/tinyzimmer/go-gst/gst" +) + +// Run is used to wrap the given function in a main loop and print any error +func Run(f func() error) { + mainLoop := gst.NewMainLoop(gst.DefaultMainContext(), false) + + defer mainLoop.Unref() + + go func() { + if err := f(); err != nil { + fmt.Println("ERROR!", err) + } + mainLoop.Quit() + }() + + mainLoop.Run() +} diff --git a/gst/app/cgo_exports.go b/gst/app/cgo_exports.go new file mode 100644 index 0000000..fe83aec --- /dev/null +++ b/gst/app/cgo_exports.go @@ -0,0 +1,65 @@ +package app + +// #include "gst.go.h" +import "C" +import ( + "unsafe" + + gopointer "github.com/mattn/go-pointer" + "github.com/tinyzimmer/go-gst/gst" +) + +func getCbsFromPtr(userData C.gpointer) *SinkCallbacks { + ptr := gopointer.Restore(unsafe.Pointer(userData)) + cbs, ok := ptr.(*SinkCallbacks) + if !ok { + gopointer.Unref(unsafe.Pointer(userData)) + return nil + } + return cbs +} + +func wrapCSink(sink *C.GstAppSink) *Sink { + return wrapAppSink(gst.FromGstElementUnsafe(unsafe.Pointer(sink))) +} + +//export goSinkEOSCb +func goSinkEOSCb(sink *C.GstAppSink, userData C.gpointer) { + cbs := getCbsFromPtr(userData) + if cbs == nil { + return + } + if cbs.EOSFunc == nil { + return + } + cbs.EOSFunc(wrapCSink(sink)) +} + +//export goSinkNewPrerollCb +func goSinkNewPrerollCb(sink *C.GstAppSink, userData C.gpointer) C.GstFlowReturn { + cbs := getCbsFromPtr(userData) + if cbs == nil { + return C.GstFlowReturn(gst.FlowError) + } + if cbs.NewPrerollFunc == nil { + return C.GstFlowReturn(gst.FlowOK) + } + return C.GstFlowReturn(cbs.NewPrerollFunc(wrapCSink(sink))) +} + +//export goSinkNewSampleCb +func goSinkNewSampleCb(sink *C.GstAppSink, userData C.gpointer) C.GstFlowReturn { + cbs := getCbsFromPtr(userData) + if cbs == nil { + return C.GstFlowReturn(gst.FlowError) + } + if cbs.NewSampleFunc == nil { + return C.GstFlowReturn(gst.FlowOK) + } + return C.GstFlowReturn(cbs.NewSampleFunc(wrapCSink(sink))) +} + +//export goSinkGDestroyNotifyFunc +func goSinkGDestroyNotifyFunc(ptr C.gpointer) { + gopointer.Unref(unsafe.Pointer(ptr)) +} diff --git a/gst/app/gst_app_sink.go b/gst/app/gst_app_sink.go index 27dd02c..8736e93 100644 --- a/gst/app/gst_app_sink.go +++ b/gst/app/gst_app_sink.go @@ -1,14 +1,41 @@ package app -// #include "gst.go.h" +/* +#include "gst.go.h" + +extern void goSinkGDestroyNotifyFunc (gpointer user_data); + +extern void goSinkEOSCb (GstAppSink * sink, gpointer user_data); +extern GstFlowReturn goSinkNewPrerollCb (GstAppSink * sink, gpointer user_data); +extern GstFlowReturn goSinkNewSampleCb (GstAppSink * sink, gpointer user_data); + +void cgoGDestroyNotifyFunc (gpointer user_data) { goSinkGDestroyNotifyFunc(user_data); } +void cgoSinkEOSCb (GstAppSink * sink, gpointer user_data) { return goSinkEOSCb(sink, user_data); } +GstFlowReturn cgoSinkNewPrerollCb (GstAppSink * sink, gpointer user_data) { return goSinkNewPrerollCb(sink, user_data); } +GstFlowReturn cgoSinkNewSampleCb (GstAppSink * sink, gpointer user_data) { return goSinkNewSampleCb(sink, user_data); } + +*/ import "C" + import ( "errors" + "time" "unsafe" + gopointer "github.com/mattn/go-pointer" "github.com/tinyzimmer/go-gst/gst" ) +// SinkCallbacks represents callbacks that can be installed on an app sink when data is available. +type SinkCallbacks struct { + EOSFunc func(appSink *Sink) + NewPrerollFunc func(appSink *Sink) gst.FlowReturn + NewSampleFunc func(appSink *Sink) gst.FlowReturn +} + +// ErrEOS represents that the stream has ended. +var ErrEOS = errors.New("Pipeline has reached end-of-stream") + // Sink wraps an Element made with the appsink plugin with additional methods for pulling samples. type Sink struct{ *gst.Element } @@ -24,41 +51,170 @@ func NewAppSink() (*Sink, error) { // Instance returns the native GstAppSink instance. func (a *Sink) Instance() *C.GstAppSink { return C.toGstAppSink(a.Unsafe()) } -// ErrEOS represents that the stream has ended. -var ErrEOS = errors.New("Pipeline has reached end-of-stream") +// GetBufferListSupport checks if appsink supports buffer lists. +func (a *Sink) GetBufferListSupport() bool { + return gobool(C.gst_app_sink_get_buffer_list_support(a.Instance())) +} + +// GetCaps gets the configured caps on appsink. +func (a *Sink) GetCaps() *gst.Caps { + caps := C.gst_app_sink_get_caps(a.Instance()) + if caps == nil { + return nil + } + return gst.FromGstCapsUnsafe(unsafe.Pointer(caps)) +} + +// GetDrop checks if appsink will drop old buffers when the maximum amount of queued buffers is reached. +func (a *Sink) GetDrop() bool { + return gobool(C.gst_app_sink_get_drop(a.Instance())) +} + +// GetEmitSignals checks if appsink will emit the "new-preroll" and "new-sample" signals. +func (a *Sink) GetEmitSignals() bool { + return gobool(C.gst_app_sink_get_emit_signals(a.Instance())) +} + +// GetMaxBuffers gets the maximum amount of buffers that can be queued in appsink. +func (a *Sink) GetMaxBuffers() uint { + return uint(C.gst_app_sink_get_max_buffers(a.Instance())) +} + +// GetWaitOnEOS checks if appsink will wait for all buffers to be consumed when an EOS is received. +func (a *Sink) GetWaitOnEOS() bool { + return gobool(C.gst_app_sink_get_wait_on_eos(a.Instance())) +} // IsEOS returns true if this AppSink has reached the end-of-stream. func (a *Sink) IsEOS() bool { return gobool(C.gst_app_sink_is_eos((*C.GstAppSink)(a.Instance()))) } -// BlockPullSample will block until a sample becomes available or the stream -// is ended. -func (a *Sink) BlockPullSample() (*gst.Sample, error) { - for { - if a.IsEOS() { - return nil, ErrEOS - } - // This function won't block if the entire pipeline is waiting for data - sample := C.gst_app_sink_pull_sample((*C.GstAppSink)(a.Instance())) - if sample == nil { - continue - } - return gst.FromGstSampleUnsafe(unsafe.Pointer(sample)), nil +// PullPreroll gets the last preroll sample in appsink. This was the sample that caused the appsink to preroll in the PAUSED state. +// +// This function is typically used when dealing with a pipeline in the PAUSED state. Calling this function after doing a seek will +// give the sample right after the seek position. +// +// Calling this function will clear the internal reference to the preroll buffer. +// +// Note that the preroll sample will also be returned as the first sample when calling gst_app_sink_pull_sample. +// +// If an EOS event was received before any buffers, this function returns NULL. Use gst_app_sink_is_eos () to check for the EOS condition. +// +// This function blocks until a preroll sample or EOS is received or the appsink element is set to the READY/NULL state. +func (a *Sink) PullPreroll() *gst.Sample { + smpl := C.gst_app_sink_pull_preroll(a.Instance()) + if smpl == nil { + return nil } + return gst.FromGstSampleUnsafe(unsafe.Pointer(smpl)) } -// PullSample will try to pull a sample or return nil if none is available. -func (a *Sink) PullSample() (*gst.Sample, error) { - if a.IsEOS() { - return nil, ErrEOS +// PullSample blocks until a sample or EOS becomes available or the appsink element is set to the READY/NULL state. +// +// This function will only return samples when the appsink is in the PLAYING state. All rendered buffers will be put in a queue +// so that the application can pull samples at its own rate. Note that when the application does not pull samples fast enough, the queued +// buffers could consume a lot of memory, especially when dealing with raw video frames. +// +// If an EOS event was received before any buffers, this function returns NULL. Use IsEOS() to check for the EOS condition. +func (a *Sink) PullSample() *gst.Sample { + smpl := C.gst_app_sink_pull_sample(a.Instance()) + if smpl == nil { + return nil } - sample := C.gst_app_sink_try_pull_sample( - (*C.GstAppSink)(a.Instance()), - C.GST_SECOND, - ) - if sample != nil { - return gst.FromGstSampleUnsafe(unsafe.Pointer(sample)), nil - } - return nil, nil + return gst.FromGstSampleUnsafe(unsafe.Pointer(smpl)) +} + +// SetBufferListSupport instructs appsink to enable or disable buffer list support. +// +// For backwards-compatibility reasons applications need to opt in to indicate that they will be able to handle buffer lists. +func (a *Sink) SetBufferListSupport(enabled bool) { + C.gst_app_sink_set_buffer_list_support(a.Instance(), gboolean(enabled)) +} + +// SetCallbacks sets callbacks which will be executed for each new preroll, new sample and eos. This is an alternative to using the signals, +// it has lower overhead and is thus less expensive, but also less flexible. +// +// If callbacks are installed, no signals will be emitted for performance reasons. +// +// Before 1.16.3 it was not possible to change the callbacks in a thread-safe way. +func (a *Sink) SetCallbacks(cbs *SinkCallbacks) { + ptr := gopointer.Save(cbs) + appSinkCallbacks := &C.GstAppSinkCallbacks{ + eos: (*[0]byte)(unsafe.Pointer(C.cgoSinkEOSCb)), + new_preroll: (*[0]byte)(unsafe.Pointer(C.cgoSinkNewPrerollCb)), + new_sample: (*[0]byte)(unsafe.Pointer(C.cgoSinkNewSampleCb)), + } + C.gst_app_sink_set_callbacks( + a.Instance(), + appSinkCallbacks, + (C.gpointer)(unsafe.Pointer(ptr)), + C.GDestroyNotify(C.cgoGDestroyNotifyFunc), + ) +} + +// SetCaps sets the capabilities on the appsink element. This function takes a copy of the caps structure. After calling this method, +// the sink will only accept caps that match caps. If caps is non-fixed, or incomplete, you must check the caps on the samples to get +// the actual used caps. +func (a *Sink) SetCaps(caps *gst.Caps) { + C.gst_app_sink_set_caps(a.Instance(), (*C.GstCaps)(unsafe.Pointer(caps.Instance()))) +} + +// SetDrop instructs appsink to drop old buffers when the maximum amount of queued buffers is reached. +func (a *Sink) SetDrop(drop bool) { + C.gst_app_sink_set_drop(a.Instance(), gboolean(drop)) +} + +// SetEmitSignals makes appsink emit the "new-preroll" and "new-sample" signals. This option is by default disabled because signal emission +// is expensive and unneeded when the application prefers to operate in pull mode. +func (a *Sink) SetEmitSignals(emit bool) { + C.gst_app_sink_set_emit_signals(a.Instance(), gboolean(emit)) +} + +// SetMaxBuffers sets the maximum amount of buffers that can be queued in appsink. After this amount of buffers are queued in appsink, +// any more buffers will block upstream elements until a sample is pulled from appsink. +func (a *Sink) SetMaxBuffers(max uint) { + C.gst_app_sink_set_max_buffers(a.Instance(), C.guint(max)) +} + +// SetWaitOnEOS instructs appsink to wait for all buffers to be consumed when an EOS is received. +func (a *Sink) SetWaitOnEOS(wait bool) { + C.gst_app_sink_set_wait_on_eos(a.Instance(), gboolean(wait)) +} + +// TryPullPreroll gets the last preroll sample in appsink. This was the sample that caused the appsink to preroll in the PAUSED state. +// +// This function is typically used when dealing with a pipeline in the PAUSED state. Calling this function after doing a seek will give +// the sample right after the seek position. +// +// Calling this function will clear the internal reference to the preroll buffer. +// +// Note that the preroll sample will also be returned as the first sample when calling PullSample. +// +// If an EOS event was received before any buffers or the timeout expires, this function returns NULL. Use IsEOS () to check for the EOS condition. +// +// This function blocks until a preroll sample or EOS is received, the appsink element is set to the READY/NULL state, or the timeout expires. +func (a *Sink) TryPullPreroll(timeout time.Duration) *gst.Sample { + tm := C.GstClockTime(timeout.Nanoseconds()) + smpl := C.gst_app_sink_try_pull_preroll(a.Instance(), tm) + if smpl == nil { + return nil + } + return gst.FromGstSampleUnsafe(unsafe.Pointer(smpl)) +} + +// TryPullSample blocks until a sample or EOS becomes available or the appsink element is set to the READY/NULL state or the timeout expires. +// +// This function will only return samples when the appsink is in the PLAYING state. All rendered buffers will be put in a queue so that the +// application can pull samples at its own rate. Note that when the application does not pull samples fast enough, the queued buffers could +// consume a lot of memory, especially when dealing with raw video frames. +// +// If an EOS event was received before any buffers or the timeout expires, this function returns NULL. Use IsEOS () to check for the EOS condition. +func (a *Sink) TryPullSample(timeout time.Duration) *gst.Sample { + tm := C.GstClockTime(timeout.Nanoseconds()) + smpl := C.gst_app_sink_try_pull_sample(a.Instance(), tm) + if smpl == nil { + return nil + } + return gst.FromGstSampleUnsafe(unsafe.Pointer(smpl)) } diff --git a/gst/app/wrappers.go b/gst/app/wrappers.go index 75bbebe..fa04b88 100644 --- a/gst/app/wrappers.go +++ b/gst/app/wrappers.go @@ -3,10 +3,20 @@ package app // #include import "C" -import "github.com/tinyzimmer/go-gst/gst" +import ( + "github.com/tinyzimmer/go-gst/gst" +) func wrapAppSink(elem *gst.Element) *Sink { return &Sink{elem} } func wrapAppSrc(elem *gst.Element) *Source { return &Source{elem} } // gobool provides an easy type conversion between C.gboolean and a go bool. func gobool(b C.gboolean) bool { return int(b) > 0 } + +// gboolean converts a go bool to a C.gboolean. +func gboolean(b bool) C.gboolean { + if b { + return C.gboolean(1) + } + return C.gboolean(0) +} diff --git a/gst/gst_caps.go b/gst/gst_caps.go index 4575ca6..5c4246a 100644 --- a/gst/gst_caps.go +++ b/gst/gst_caps.go @@ -26,6 +26,10 @@ type Caps struct { native *C.GstCaps } +// FromGstCapsUnsafe wraps the pointer to the given C GstCaps with the go type. +// This is meant for internal usage and is exported for visibility to other packages. +func FromGstCapsUnsafe(caps unsafe.Pointer) *Caps { return wrapCaps(C.toGstCaps(caps)) } + // CapsMapFunc represents a function passed to the Caps MapInPlace, ForEach, and FilterAndMapInPlace methods. type CapsMapFunc func(features *CapsFeatures, structure *Structure) bool diff --git a/gst/gst_element_factory.go b/gst/gst_element_factory.go index 8f95e38..3ad5589 100644 --- a/gst/gst_element_factory.go +++ b/gst/gst_element_factory.go @@ -10,6 +10,10 @@ import ( "github.com/gotk3/gotk3/glib" ) +// FromGstElementUnsafe wraps the pointer to the given C GstElement with the go type. +// This is meant for internal usage and is exported for visibility to other packages. +func FromGstElementUnsafe(elem unsafe.Pointer) *Element { return wrapElement(toGObject(elem)) } + // NewElement is a generic wrapper around `gst_element_factory_make`. func NewElement(name string) (*Element, error) { elemName := C.CString(name) @@ -23,17 +27,17 @@ func NewElement(name string) (*Element, error) { // NewElementMany is a convenience wrapper around building many GstElements in a // single function call. It returns an error if the creation of any element fails. A -// map containing the ordinal of the argument to the Element created is returned. -func NewElementMany(elemNames ...string) (map[int]*Element, error) { - elemMap := make(map[int]*Element) +// slice in the order the names were given is returned. +func NewElementMany(elemNames ...string) ([]*Element, error) { + elems := make([]*Element, len(elemNames)) for idx, name := range elemNames { elem, err := NewElement(name) if err != nil { return nil, err } - elemMap[idx] = elem + elems[idx] = elem } - return elemMap, nil + return elems, nil } // ElementFactory wraps the GstElementFactory diff --git a/gst/gst_memory.go b/gst/gst_memory.go index e262db7..5b67146 100644 --- a/gst/gst_memory.go +++ b/gst/gst_memory.go @@ -123,6 +123,78 @@ func (m *MapInfo) Bytes() []byte { return C.GoBytes(m.Data, (C.int)(m.Size)) } +// AsInt8Slice returns the contents of this map as a slice of signed 8-bit integers. +func (m *MapInfo) AsInt8Slice() []int8 { + out := make([]int8, m.Size) + for i, t := range (*[1 << 30]int8)(m.Data)[:m.Size:m.Size] { + out[i] = int8(t) + } + return out +} + +// AsInt16Slice returns the contents of this map as a slice of signed 16-bit integers. +func (m *MapInfo) AsInt16Slice() []int16 { + out := make([]int16, m.Size) + for i, t := range (*[1 << 30]int16)(m.Data)[:m.Size:m.Size] { + out[i] = int16(t) + } + return out +} + +// AsInt32Slice returns the contents of this map as a slice of signed 32-bit integers. +func (m *MapInfo) AsInt32Slice() []int32 { + out := make([]int32, m.Size) + for i, t := range (*[1 << 30]int32)(m.Data)[:m.Size:m.Size] { + out[i] = int32(t) + } + return out +} + +// AsInt64Slice returns the contents of this map as a slice of signed 64-bit integers. +func (m *MapInfo) AsInt64Slice() []int64 { + out := make([]int64, m.Size) + for i, t := range (*[1 << 30]int64)(m.Data)[:m.Size:m.Size] { + out[i] = int64(t) + } + return out +} + +// AsUint8Slice returns the contents of this map as a slice of unsigned 8-bit integers. +func (m *MapInfo) AsUint8Slice() []uint8 { + out := make([]uint8, m.Size) + for i, t := range (*[1 << 30]uint8)(m.Data)[:m.Size:m.Size] { + out[i] = uint8(t) + } + return out +} + +// AsUint16Slice returns the contents of this map as a slice of unsigned 16-bit integers. +func (m *MapInfo) AsUint16Slice() []uint16 { + out := make([]uint16, m.Size) + for i, t := range (*[1 << 30]uint16)(m.Data)[:m.Size:m.Size] { + out[i] = uint16(t) + } + return out +} + +// AsUint32Slice returns the contents of this map as a slice of unsigned 32-bit integers. +func (m *MapInfo) AsUint32Slice() []uint32 { + out := make([]uint32, m.Size) + for i, t := range (*[1 << 30]uint32)(m.Data)[:m.Size:m.Size] { + out[i] = uint32(t) + } + return out +} + +// AsUint64Slice returns the contents of this map as a slice of unsigned 64-bit integers. +func (m *MapInfo) AsUint64Slice() []uint64 { + out := make([]uint64, m.Size) + for i, t := range (*[1 << 30]uint64)(m.Data)[:m.Size:m.Size] { + out[i] = uint64(t) + } + return out +} + func wrapMapInfo(mapInfo *C.GstMapInfo, unmapFunc func()) *MapInfo { return &MapInfo{ ptr: mapInfo,