From fa165ecf85f08d39f8e11e15eab9f5524388248b Mon Sep 17 00:00:00 2001 From: tinyzimmer <38474291+tinyzimmer@users.noreply.github.com> Date: Sat, 3 Oct 2020 17:10:03 +0300 Subject: [PATCH] finish appsrc impl --- gst/app/cgo_exports.go | 62 ++++++++++++-- gst/app/gst_app_sink.go | 6 +- gst/app/gst_app_src.go | 184 ++++++++++++++++++++++++++++++++++++---- 3 files changed, 228 insertions(+), 24 deletions(-) diff --git a/gst/app/cgo_exports.go b/gst/app/cgo_exports.go index fe83aec..d780997 100644 --- a/gst/app/cgo_exports.go +++ b/gst/app/cgo_exports.go @@ -9,7 +9,7 @@ import ( "github.com/tinyzimmer/go-gst/gst" ) -func getCbsFromPtr(userData C.gpointer) *SinkCallbacks { +func getSinkCbsFromPtr(userData C.gpointer) *SinkCallbacks { ptr := gopointer.Restore(unsafe.Pointer(userData)) cbs, ok := ptr.(*SinkCallbacks) if !ok { @@ -19,13 +19,63 @@ func getCbsFromPtr(userData C.gpointer) *SinkCallbacks { return cbs } +func getSrcCbsFromPtr(userData C.gpointer) *SourceCallbacks { + ptr := gopointer.Restore(unsafe.Pointer(userData)) + cbs, ok := ptr.(*SourceCallbacks) + if !ok { + gopointer.Unref(unsafe.Pointer(userData)) + return nil + } + return cbs +} + func wrapCSink(sink *C.GstAppSink) *Sink { return wrapAppSink(gst.FromGstElementUnsafe(unsafe.Pointer(sink))) } +func wrapCSource(src *C.GstAppSrc) *Source { + return wrapAppSrc(gst.FromGstElementUnsafe(unsafe.Pointer(src))) +} + +//export goNeedDataCb +func goNeedDataCb(src *C.GstAppSrc, length C.guint, userData C.gpointer) { + cbs := getSrcCbsFromPtr(userData) + if cbs == nil { + return + } + if cbs.NeedDataFunc == nil { + return + } + cbs.NeedDataFunc(wrapCSource(src), uint(length)) +} + +//export goEnoughDataDb +func goEnoughDataDb(src *C.GstAppSrc, userData C.gpointer) { + cbs := getSrcCbsFromPtr(userData) + if cbs == nil { + return + } + if cbs.EnoughDataFunc == nil { + return + } + cbs.EnoughDataFunc(wrapCSource(src)) +} + +//export goSeekDataCb +func goSeekDataCb(src *C.GstAppSrc, offset C.guint64, userData C.gpointer) C.gboolean { + cbs := getSrcCbsFromPtr(userData) + if cbs == nil { + return gboolean(false) + } + if cbs.SeekDataFunc == nil { + return gboolean(true) + } + return gboolean(cbs.SeekDataFunc(wrapCSource(src), uint64(offset))) +} + //export goSinkEOSCb func goSinkEOSCb(sink *C.GstAppSink, userData C.gpointer) { - cbs := getCbsFromPtr(userData) + cbs := getSinkCbsFromPtr(userData) if cbs == nil { return } @@ -37,7 +87,7 @@ func goSinkEOSCb(sink *C.GstAppSink, userData C.gpointer) { //export goSinkNewPrerollCb func goSinkNewPrerollCb(sink *C.GstAppSink, userData C.gpointer) C.GstFlowReturn { - cbs := getCbsFromPtr(userData) + cbs := getSinkCbsFromPtr(userData) if cbs == nil { return C.GstFlowReturn(gst.FlowError) } @@ -49,7 +99,7 @@ func goSinkNewPrerollCb(sink *C.GstAppSink, userData C.gpointer) C.GstFlowReturn //export goSinkNewSampleCb func goSinkNewSampleCb(sink *C.GstAppSink, userData C.gpointer) C.GstFlowReturn { - cbs := getCbsFromPtr(userData) + cbs := getSinkCbsFromPtr(userData) if cbs == nil { return C.GstFlowReturn(gst.FlowError) } @@ -59,7 +109,7 @@ func goSinkNewSampleCb(sink *C.GstAppSink, userData C.gpointer) C.GstFlowReturn return C.GstFlowReturn(cbs.NewSampleFunc(wrapCSink(sink))) } -//export goSinkGDestroyNotifyFunc -func goSinkGDestroyNotifyFunc(ptr C.gpointer) { +//export goAppGDestroyNotifyFunc +func goAppGDestroyNotifyFunc(ptr C.gpointer) { gopointer.Unref(unsafe.Pointer(ptr)) } diff --git a/gst/app/gst_app_sink.go b/gst/app/gst_app_sink.go index 8736e93..6ab4889 100644 --- a/gst/app/gst_app_sink.go +++ b/gst/app/gst_app_sink.go @@ -3,13 +3,13 @@ package app /* #include "gst.go.h" -extern void goSinkGDestroyNotifyFunc (gpointer user_data); +extern void goAppGDestroyNotifyFunc (gpointer user_data); extern void goSinkEOSCb (GstAppSink * sink, gpointer user_data); extern GstFlowReturn goSinkNewPrerollCb (GstAppSink * sink, gpointer user_data); extern GstFlowReturn goSinkNewSampleCb (GstAppSink * sink, gpointer user_data); -void cgoGDestroyNotifyFunc (gpointer user_data) { goSinkGDestroyNotifyFunc(user_data); } +void cgoSinkGDestroyNotifyFunc (gpointer user_data) { goAppGDestroyNotifyFunc(user_data); } void cgoSinkEOSCb (GstAppSink * sink, gpointer user_data) { return goSinkEOSCb(sink, user_data); } GstFlowReturn cgoSinkNewPrerollCb (GstAppSink * sink, gpointer user_data) { return goSinkNewPrerollCb(sink, user_data); } GstFlowReturn cgoSinkNewSampleCb (GstAppSink * sink, gpointer user_data) { return goSinkNewSampleCb(sink, user_data); } @@ -149,7 +149,7 @@ func (a *Sink) SetCallbacks(cbs *SinkCallbacks) { a.Instance(), appSinkCallbacks, (C.gpointer)(unsafe.Pointer(ptr)), - C.GDestroyNotify(C.cgoGDestroyNotifyFunc), + C.GDestroyNotify(C.cgoSinkGDestroyNotifyFunc), ) } diff --git a/gst/app/gst_app_src.go b/gst/app/gst_app_src.go index e5ef2d4..d5b02ca 100644 --- a/gst/app/gst_app_src.go +++ b/gst/app/gst_app_src.go @@ -1,14 +1,47 @@ package app -// #include "gst.go.h" +/* +#include "gst.go.h" + +extern void goAppGDestroyNotifyFunc (gpointer user_data); + +extern void goNeedDataCb (GstAppSrc *src, guint length, gpointer user_data); +extern void goEnoughDataDb (GstAppSrc *src, gpointer user_data); +extern gboolean goSeekDataCb (GstAppSrc *src, guint64 offset, gpointer user_data); + +void cgoSrcGDestroyNotifyFunc (gpointer user_data) { goAppGDestroyNotifyFunc(user_data); } + +void cgoNeedDataCb (GstAppSrc *src, guint length, gpointer user_data) { goNeedDataCb(src, length, user_data); } +void cgoEnoughDataCb (GstAppSrc *src, gpointer user_data) { goEnoughDataDb(src, user_data); } +gboolean cgoSeekDataCb (GstAppSrc *src, guint64 offset, gpointer user_data) { return goSeekDataCb(src, offset, user_data); } + +*/ import "C" import ( "time" "unsafe" + gopointer "github.com/mattn/go-pointer" "github.com/tinyzimmer/go-gst/gst" ) +// SourceCallbacks represents callbacks to configure on an AppSource. +type SourceCallbacks struct { + NeedDataFunc func(src *Source, length uint) + EnoughDataFunc func(src *Source) + SeekDataFunc func(src *Source, offset uint64) bool +} + +// StreamType casts GstAppStreamType +type StreamType int + +// Type castings +const ( + AppStreamTypeStream StreamType = C.GST_APP_STREAM_TYPE_STREAM // (0) – No seeking is supported in the stream, such as a live stream. + AppStreamTypeSeekable StreamType = C.GST_APP_STREAM_TYPE_SEEKABLE // (1) – The stream is seekable but seeking might not be very fast, such as data from a webserver. + AppStreamTypeRandomAccess StreamType = C.GST_APP_STREAM_TYPE_RANDOM_ACCESS // (2) – The stream is seekable and seeking is fast, such as in a local file. +) + // Source wraps an Element made with the appsrc plugin with additional methods for pushing samples. type Source struct{ *gst.Element } @@ -24,18 +57,6 @@ func NewAppSrc() (*Source, error) { // Instance returns the native GstAppSink instance. func (a *Source) Instance() *C.GstAppSrc { return C.toGstAppSrc(a.Unsafe()) } -// SetSize sets the size of the source stream in bytes. You should call this for -// streams of fixed length. -func (a *Source) SetSize(size int64) { - C.gst_app_src_set_size((*C.GstAppSrc)(a.Instance()), (C.gint64)(size)) -} - -// SetDuration sets the duration of the source stream. You should call -// this if the value is known. -func (a *Source) SetDuration(dur time.Duration) { - C.gst_app_src_set_duration((*C.GstAppSrc)(a.Instance()), C.GstClockTime(dur.Nanoseconds())) -} - // EndStream signals to the app source that the stream has ended after the last queued // buffer. func (a *Source) EndStream() gst.FlowReturn { @@ -43,8 +64,55 @@ func (a *Source) EndStream() gst.FlowReturn { return gst.FlowReturn(ret) } -// SetLive sets whether or not this is a live stream. -func (a *Source) SetLive(b bool) error { return a.Set("is-live", b) } +// GetCaps gets the configures caps on the app src. +func (a *Source) GetCaps() *gst.Caps { + caps := C.gst_app_src_get_caps(a.Instance()) + if caps == nil { + return nil + } + return gst.FromGstCapsUnsafe(unsafe.Pointer(caps)) +} + +// GetCurrentLevelBytes gets the number of currently queued bytes inside appsrc. +func (a *Source) GetCurrentLevelBytes() uint64 { + return uint64(C.gst_app_src_get_current_level_bytes(a.Instance())) +} + +// GetDuration gets the duration of the stream in nanoseconds. A negative value means that the duration is not known. +func (a *Source) GetDuration() time.Duration { + dur := C.gst_app_src_get_duration(a.Instance()) + if gst.ClockTime(dur) == gst.ClockTimeNone { + return time.Duration(-1) + } + return time.Duration(uint64(dur)) * time.Nanosecond +} + +// GetEmitSignals checks if appsrc will emit the "new-preroll" and "new-buffer" signals. +func (a *Source) GetEmitSignals() bool { + return gobool(C.gst_app_src_get_emit_signals(a.Instance())) +} + +// GetLatency retrieves the min and max latencies in min and max respectively. +func (a *Source) GetLatency() (min, max uint64) { + var gmin, gmax C.guint64 + C.gst_app_src_get_latency(a.Instance(), &gmin, &gmax) + return uint64(gmin), uint64(gmax) +} + +// GetMaxBytes gets the maximum amount of bytes that can be queued in appsrc. +func (a *Source) GetMaxBytes() uint64 { + return uint64(C.gst_app_src_get_max_bytes(a.Instance())) +} + +// GetSize gets the size of the stream in bytes. A value of -1 means that the size is not known. +func (a *Source) GetSize() int64 { + return int64(C.gst_app_src_get_size(a.Instance())) +} + +// GetStreamType gets the stream type. Control the stream type of appsrc with SetStreamType. +func (a *Source) GetStreamType() StreamType { + return StreamType(C.gst_app_src_get_stream_type(a.Instance())) +} // PushBuffer pushes a buffer to the appsrc. Currently by default this will block // until the source is ready to accept the buffer. @@ -55,3 +123,89 @@ func (a *Source) PushBuffer(buf *gst.Buffer) gst.FlowReturn { ) return gst.FlowReturn(ret) } + +// PushBufferList adds a buffer list to the queue of buffers and buffer lists that the appsrc element will push +// to its source pad. This function takes ownership of buffer_list. +// +// When the block property is TRUE, this function can block until free space becomes available in the queue. +func (a *Source) PushBufferList(bufList *gst.BufferList) gst.FlowReturn { + return gst.FlowReturn(C.gst_app_src_push_buffer_list( + a.Instance(), (*C.GstBufferList)(unsafe.Pointer(bufList.Instance())), + )) +} + +// PushSample Extract a buffer from the provided sample and adds it to the queue of buffers that the appsrc element will +// push to its source pad. Any previous caps that were set on appsrc will be replaced by the caps associated with the +// sample if not equal. +// +// This function does not take ownership of the sample so the sample needs to be unreffed after calling this function. +// +// When the block property is TRUE, this function can block until free space becomes available in the queue. +func (a *Source) PushSample(sample *gst.Sample) gst.FlowReturn { + return gst.FlowReturn(C.gst_app_src_push_sample( + a.Instance(), (*C.GstSample)(unsafe.Pointer(sample.Instance())), + )) +} + +// SetCallbacks sets callbacks which will be executed when data is needed, enough data has been collected or when a seek +// should be performed. This is an alternative to using the signals, it has lower overhead and is thus less expensive, +// but also less flexible. +// +// If callbacks are installed, no signals will be emitted for performance reasons. +// +// Before 1.16.3 it was not possible to change the callbacks in a thread-safe way. +func (a *Source) SetCallbacks(cbs *SourceCallbacks) { + ptr := gopointer.Save(cbs) + appSrcCallbacks := &C.GstAppSrcCallbacks{ + need_data: (*[0]byte)(unsafe.Pointer(C.cgoNeedDataCb)), + enough_data: (*[0]byte)(unsafe.Pointer(C.cgoEnoughDataCb)), + seek_data: (*[0]byte)(unsafe.Pointer(C.cgoSeekDataCb)), + } + C.gst_app_src_set_callbacks( + a.Instance(), + appSrcCallbacks, + (C.gpointer)(unsafe.Pointer(ptr)), + C.GDestroyNotify(C.cgoSrcGDestroyNotifyFunc), + ) +} + +// SetCaps sets the capabilities on the appsrc element. This function takes a copy of the caps structure. After calling this method, +// the source will only produce caps that match caps. caps must be fixed and the caps on the buffers must match the caps or left NULL. +func (a *Source) SetCaps(caps *gst.Caps) { + C.gst_app_src_set_caps(a.Instance(), (*C.GstCaps)(unsafe.Pointer(caps.Instance()))) +} + +// SetDuration sets the duration of the source stream. You should call +// this if the value is known. +func (a *Source) SetDuration(dur time.Duration) { + C.gst_app_src_set_duration((*C.GstAppSrc)(a.Instance()), C.GstClockTime(dur.Nanoseconds())) +} + +// SetEmitSignals makes appsrc emit the "new-preroll" and "new-buffer" signals. This option is by default disabled because signal emission +// is expensive and unneeded when the application prefers to operate in pull mode. +func (a *Source) SetEmitSignals(emit bool) { + C.gst_app_src_set_emit_signals(a.Instance(), gboolean(emit)) +} + +// SetLatency configures the min and max latency in src. If min is set to -1, the default latency calculations for pseudo-live sources +// will be used. +func (a *Source) SetLatency(min, max uint64) { + C.gst_app_src_set_latency(a.Instance(), C.guint64(min), C.guint64(max)) +} + +// SetMaxBytes sets the maximum amount of bytes that can be queued in appsrc. After the maximum amount of bytes are queued, appsrc will +// emit the "enough-data" signal. +func (a *Source) SetMaxBytes(max uint64) { + C.gst_app_src_set_max_bytes(a.Instance(), C.guint64(max)) +} + +// SetSize sets the size of the source stream in bytes. You should call this for +// streams of fixed length. +func (a *Source) SetSize(size int64) { + C.gst_app_src_set_size((*C.GstAppSrc)(a.Instance()), (C.gint64)(size)) +} + +// SetStreamType sets the stream type on appsrc. For seekable streams, the "seek" signal must be connected to. +func (a *Source) SetStreamType(streamType StreamType) { + C.gst_app_src_set_stream_type(a.Instance(), C.GstAppStreamType(streamType)) +}