finish appsrc impl

This commit is contained in:
tinyzimmer
2020-10-03 17:10:03 +03:00
parent c172f1e663
commit fa165ecf85
3 changed files with 228 additions and 24 deletions

View File

@@ -9,7 +9,7 @@ import (
"github.com/tinyzimmer/go-gst/gst"
)
func getCbsFromPtr(userData C.gpointer) *SinkCallbacks {
func getSinkCbsFromPtr(userData C.gpointer) *SinkCallbacks {
ptr := gopointer.Restore(unsafe.Pointer(userData))
cbs, ok := ptr.(*SinkCallbacks)
if !ok {
@@ -19,13 +19,63 @@ func getCbsFromPtr(userData C.gpointer) *SinkCallbacks {
return cbs
}
func getSrcCbsFromPtr(userData C.gpointer) *SourceCallbacks {
ptr := gopointer.Restore(unsafe.Pointer(userData))
cbs, ok := ptr.(*SourceCallbacks)
if !ok {
gopointer.Unref(unsafe.Pointer(userData))
return nil
}
return cbs
}
func wrapCSink(sink *C.GstAppSink) *Sink {
return wrapAppSink(gst.FromGstElementUnsafe(unsafe.Pointer(sink)))
}
func wrapCSource(src *C.GstAppSrc) *Source {
return wrapAppSrc(gst.FromGstElementUnsafe(unsafe.Pointer(src)))
}
//export goNeedDataCb
func goNeedDataCb(src *C.GstAppSrc, length C.guint, userData C.gpointer) {
cbs := getSrcCbsFromPtr(userData)
if cbs == nil {
return
}
if cbs.NeedDataFunc == nil {
return
}
cbs.NeedDataFunc(wrapCSource(src), uint(length))
}
//export goEnoughDataDb
func goEnoughDataDb(src *C.GstAppSrc, userData C.gpointer) {
cbs := getSrcCbsFromPtr(userData)
if cbs == nil {
return
}
if cbs.EnoughDataFunc == nil {
return
}
cbs.EnoughDataFunc(wrapCSource(src))
}
//export goSeekDataCb
func goSeekDataCb(src *C.GstAppSrc, offset C.guint64, userData C.gpointer) C.gboolean {
cbs := getSrcCbsFromPtr(userData)
if cbs == nil {
return gboolean(false)
}
if cbs.SeekDataFunc == nil {
return gboolean(true)
}
return gboolean(cbs.SeekDataFunc(wrapCSource(src), uint64(offset)))
}
//export goSinkEOSCb
func goSinkEOSCb(sink *C.GstAppSink, userData C.gpointer) {
cbs := getCbsFromPtr(userData)
cbs := getSinkCbsFromPtr(userData)
if cbs == nil {
return
}
@@ -37,7 +87,7 @@ func goSinkEOSCb(sink *C.GstAppSink, userData C.gpointer) {
//export goSinkNewPrerollCb
func goSinkNewPrerollCb(sink *C.GstAppSink, userData C.gpointer) C.GstFlowReturn {
cbs := getCbsFromPtr(userData)
cbs := getSinkCbsFromPtr(userData)
if cbs == nil {
return C.GstFlowReturn(gst.FlowError)
}
@@ -49,7 +99,7 @@ func goSinkNewPrerollCb(sink *C.GstAppSink, userData C.gpointer) C.GstFlowReturn
//export goSinkNewSampleCb
func goSinkNewSampleCb(sink *C.GstAppSink, userData C.gpointer) C.GstFlowReturn {
cbs := getCbsFromPtr(userData)
cbs := getSinkCbsFromPtr(userData)
if cbs == nil {
return C.GstFlowReturn(gst.FlowError)
}
@@ -59,7 +109,7 @@ func goSinkNewSampleCb(sink *C.GstAppSink, userData C.gpointer) C.GstFlowReturn
return C.GstFlowReturn(cbs.NewSampleFunc(wrapCSink(sink)))
}
//export goSinkGDestroyNotifyFunc
func goSinkGDestroyNotifyFunc(ptr C.gpointer) {
//export goAppGDestroyNotifyFunc
func goAppGDestroyNotifyFunc(ptr C.gpointer) {
gopointer.Unref(unsafe.Pointer(ptr))
}

View File

@@ -3,13 +3,13 @@ package app
/*
#include "gst.go.h"
extern void goSinkGDestroyNotifyFunc (gpointer user_data);
extern void goAppGDestroyNotifyFunc (gpointer user_data);
extern void goSinkEOSCb (GstAppSink * sink, gpointer user_data);
extern GstFlowReturn goSinkNewPrerollCb (GstAppSink * sink, gpointer user_data);
extern GstFlowReturn goSinkNewSampleCb (GstAppSink * sink, gpointer user_data);
void cgoGDestroyNotifyFunc (gpointer user_data) { goSinkGDestroyNotifyFunc(user_data); }
void cgoSinkGDestroyNotifyFunc (gpointer user_data) { goAppGDestroyNotifyFunc(user_data); }
void cgoSinkEOSCb (GstAppSink * sink, gpointer user_data) { return goSinkEOSCb(sink, user_data); }
GstFlowReturn cgoSinkNewPrerollCb (GstAppSink * sink, gpointer user_data) { return goSinkNewPrerollCb(sink, user_data); }
GstFlowReturn cgoSinkNewSampleCb (GstAppSink * sink, gpointer user_data) { return goSinkNewSampleCb(sink, user_data); }
@@ -149,7 +149,7 @@ func (a *Sink) SetCallbacks(cbs *SinkCallbacks) {
a.Instance(),
appSinkCallbacks,
(C.gpointer)(unsafe.Pointer(ptr)),
C.GDestroyNotify(C.cgoGDestroyNotifyFunc),
C.GDestroyNotify(C.cgoSinkGDestroyNotifyFunc),
)
}

View File

@@ -1,14 +1,47 @@
package app
// #include "gst.go.h"
/*
#include "gst.go.h"
extern void goAppGDestroyNotifyFunc (gpointer user_data);
extern void goNeedDataCb (GstAppSrc *src, guint length, gpointer user_data);
extern void goEnoughDataDb (GstAppSrc *src, gpointer user_data);
extern gboolean goSeekDataCb (GstAppSrc *src, guint64 offset, gpointer user_data);
void cgoSrcGDestroyNotifyFunc (gpointer user_data) { goAppGDestroyNotifyFunc(user_data); }
void cgoNeedDataCb (GstAppSrc *src, guint length, gpointer user_data) { goNeedDataCb(src, length, user_data); }
void cgoEnoughDataCb (GstAppSrc *src, gpointer user_data) { goEnoughDataDb(src, user_data); }
gboolean cgoSeekDataCb (GstAppSrc *src, guint64 offset, gpointer user_data) { return goSeekDataCb(src, offset, user_data); }
*/
import "C"
import (
"time"
"unsafe"
gopointer "github.com/mattn/go-pointer"
"github.com/tinyzimmer/go-gst/gst"
)
// SourceCallbacks represents callbacks to configure on an AppSource.
type SourceCallbacks struct {
NeedDataFunc func(src *Source, length uint)
EnoughDataFunc func(src *Source)
SeekDataFunc func(src *Source, offset uint64) bool
}
// StreamType casts GstAppStreamType
type StreamType int
// Type castings
const (
AppStreamTypeStream StreamType = C.GST_APP_STREAM_TYPE_STREAM // (0) No seeking is supported in the stream, such as a live stream.
AppStreamTypeSeekable StreamType = C.GST_APP_STREAM_TYPE_SEEKABLE // (1) The stream is seekable but seeking might not be very fast, such as data from a webserver.
AppStreamTypeRandomAccess StreamType = C.GST_APP_STREAM_TYPE_RANDOM_ACCESS // (2) The stream is seekable and seeking is fast, such as in a local file.
)
// Source wraps an Element made with the appsrc plugin with additional methods for pushing samples.
type Source struct{ *gst.Element }
@@ -24,18 +57,6 @@ func NewAppSrc() (*Source, error) {
// Instance returns the native GstAppSink instance.
func (a *Source) Instance() *C.GstAppSrc { return C.toGstAppSrc(a.Unsafe()) }
// SetSize sets the size of the source stream in bytes. You should call this for
// streams of fixed length.
func (a *Source) SetSize(size int64) {
C.gst_app_src_set_size((*C.GstAppSrc)(a.Instance()), (C.gint64)(size))
}
// SetDuration sets the duration of the source stream. You should call
// this if the value is known.
func (a *Source) SetDuration(dur time.Duration) {
C.gst_app_src_set_duration((*C.GstAppSrc)(a.Instance()), C.GstClockTime(dur.Nanoseconds()))
}
// EndStream signals to the app source that the stream has ended after the last queued
// buffer.
func (a *Source) EndStream() gst.FlowReturn {
@@ -43,8 +64,55 @@ func (a *Source) EndStream() gst.FlowReturn {
return gst.FlowReturn(ret)
}
// SetLive sets whether or not this is a live stream.
func (a *Source) SetLive(b bool) error { return a.Set("is-live", b) }
// GetCaps gets the configures caps on the app src.
func (a *Source) GetCaps() *gst.Caps {
caps := C.gst_app_src_get_caps(a.Instance())
if caps == nil {
return nil
}
return gst.FromGstCapsUnsafe(unsafe.Pointer(caps))
}
// GetCurrentLevelBytes gets the number of currently queued bytes inside appsrc.
func (a *Source) GetCurrentLevelBytes() uint64 {
return uint64(C.gst_app_src_get_current_level_bytes(a.Instance()))
}
// GetDuration gets the duration of the stream in nanoseconds. A negative value means that the duration is not known.
func (a *Source) GetDuration() time.Duration {
dur := C.gst_app_src_get_duration(a.Instance())
if gst.ClockTime(dur) == gst.ClockTimeNone {
return time.Duration(-1)
}
return time.Duration(uint64(dur)) * time.Nanosecond
}
// GetEmitSignals checks if appsrc will emit the "new-preroll" and "new-buffer" signals.
func (a *Source) GetEmitSignals() bool {
return gobool(C.gst_app_src_get_emit_signals(a.Instance()))
}
// GetLatency retrieves the min and max latencies in min and max respectively.
func (a *Source) GetLatency() (min, max uint64) {
var gmin, gmax C.guint64
C.gst_app_src_get_latency(a.Instance(), &gmin, &gmax)
return uint64(gmin), uint64(gmax)
}
// GetMaxBytes gets the maximum amount of bytes that can be queued in appsrc.
func (a *Source) GetMaxBytes() uint64 {
return uint64(C.gst_app_src_get_max_bytes(a.Instance()))
}
// GetSize gets the size of the stream in bytes. A value of -1 means that the size is not known.
func (a *Source) GetSize() int64 {
return int64(C.gst_app_src_get_size(a.Instance()))
}
// GetStreamType gets the stream type. Control the stream type of appsrc with SetStreamType.
func (a *Source) GetStreamType() StreamType {
return StreamType(C.gst_app_src_get_stream_type(a.Instance()))
}
// PushBuffer pushes a buffer to the appsrc. Currently by default this will block
// until the source is ready to accept the buffer.
@@ -55,3 +123,89 @@ func (a *Source) PushBuffer(buf *gst.Buffer) gst.FlowReturn {
)
return gst.FlowReturn(ret)
}
// PushBufferList adds a buffer list to the queue of buffers and buffer lists that the appsrc element will push
// to its source pad. This function takes ownership of buffer_list.
//
// When the block property is TRUE, this function can block until free space becomes available in the queue.
func (a *Source) PushBufferList(bufList *gst.BufferList) gst.FlowReturn {
return gst.FlowReturn(C.gst_app_src_push_buffer_list(
a.Instance(), (*C.GstBufferList)(unsafe.Pointer(bufList.Instance())),
))
}
// PushSample Extract a buffer from the provided sample and adds it to the queue of buffers that the appsrc element will
// push to its source pad. Any previous caps that were set on appsrc will be replaced by the caps associated with the
// sample if not equal.
//
// This function does not take ownership of the sample so the sample needs to be unreffed after calling this function.
//
// When the block property is TRUE, this function can block until free space becomes available in the queue.
func (a *Source) PushSample(sample *gst.Sample) gst.FlowReturn {
return gst.FlowReturn(C.gst_app_src_push_sample(
a.Instance(), (*C.GstSample)(unsafe.Pointer(sample.Instance())),
))
}
// SetCallbacks sets callbacks which will be executed when data is needed, enough data has been collected or when a seek
// should be performed. This is an alternative to using the signals, it has lower overhead and is thus less expensive,
// but also less flexible.
//
// If callbacks are installed, no signals will be emitted for performance reasons.
//
// Before 1.16.3 it was not possible to change the callbacks in a thread-safe way.
func (a *Source) SetCallbacks(cbs *SourceCallbacks) {
ptr := gopointer.Save(cbs)
appSrcCallbacks := &C.GstAppSrcCallbacks{
need_data: (*[0]byte)(unsafe.Pointer(C.cgoNeedDataCb)),
enough_data: (*[0]byte)(unsafe.Pointer(C.cgoEnoughDataCb)),
seek_data: (*[0]byte)(unsafe.Pointer(C.cgoSeekDataCb)),
}
C.gst_app_src_set_callbacks(
a.Instance(),
appSrcCallbacks,
(C.gpointer)(unsafe.Pointer(ptr)),
C.GDestroyNotify(C.cgoSrcGDestroyNotifyFunc),
)
}
// SetCaps sets the capabilities on the appsrc element. This function takes a copy of the caps structure. After calling this method,
// the source will only produce caps that match caps. caps must be fixed and the caps on the buffers must match the caps or left NULL.
func (a *Source) SetCaps(caps *gst.Caps) {
C.gst_app_src_set_caps(a.Instance(), (*C.GstCaps)(unsafe.Pointer(caps.Instance())))
}
// SetDuration sets the duration of the source stream. You should call
// this if the value is known.
func (a *Source) SetDuration(dur time.Duration) {
C.gst_app_src_set_duration((*C.GstAppSrc)(a.Instance()), C.GstClockTime(dur.Nanoseconds()))
}
// SetEmitSignals makes appsrc emit the "new-preroll" and "new-buffer" signals. This option is by default disabled because signal emission
// is expensive and unneeded when the application prefers to operate in pull mode.
func (a *Source) SetEmitSignals(emit bool) {
C.gst_app_src_set_emit_signals(a.Instance(), gboolean(emit))
}
// SetLatency configures the min and max latency in src. If min is set to -1, the default latency calculations for pseudo-live sources
// will be used.
func (a *Source) SetLatency(min, max uint64) {
C.gst_app_src_set_latency(a.Instance(), C.guint64(min), C.guint64(max))
}
// SetMaxBytes sets the maximum amount of bytes that can be queued in appsrc. After the maximum amount of bytes are queued, appsrc will
// emit the "enough-data" signal.
func (a *Source) SetMaxBytes(max uint64) {
C.gst_app_src_set_max_bytes(a.Instance(), C.guint64(max))
}
// SetSize sets the size of the source stream in bytes. You should call this for
// streams of fixed length.
func (a *Source) SetSize(size int64) {
C.gst_app_src_set_size((*C.GstAppSrc)(a.Instance()), (C.gint64)(size))
}
// SetStreamType sets the stream type on appsrc. For seekable streams, the "seek" signal must be connected to.
func (a *Source) SetStreamType(streamType StreamType) {
C.gst_app_src_set_stream_type(a.Instance(), C.GstAppStreamType(streamType))
}