decodebin example and fix up signal handling

This commit is contained in:
tinyzimmer
2020-10-04 09:38:25 +03:00
parent c17e48f52e
commit 4017c9e6d0
8 changed files with 234 additions and 193 deletions

2
.gitignore vendored
View File

@@ -4,6 +4,7 @@
*.dll *.dll
*.so *.so
*.dylib *.dylib
*.mp3
# Test binary, built with `go test -c` # Test binary, built with `go test -c`
*.test *.test
@@ -18,3 +19,4 @@ dist/
.vscode/ .vscode/
_bin/ _bin/
.devcontainer/ .devcontainer/
_rust/

View File

@@ -9,6 +9,8 @@ import (
"github.com/tinyzimmer/go-gst/gst" "github.com/tinyzimmer/go-gst/gst"
) )
// ExampleCustomEvent demonstrates a custom event structue. Currerntly nested structs
// are not supported.
type ExampleCustomEvent struct { type ExampleCustomEvent struct {
Count int Count int
SendEOS bool SendEOS bool

170
examples/decodebin/main.go Normal file
View File

@@ -0,0 +1,170 @@
package main
import (
"errors"
"flag"
"fmt"
"os"
"strings"
"time"
"github.com/tinyzimmer/go-gst/examples"
"github.com/tinyzimmer/go-gst/gst"
)
var srcFile string
func buildPipeline() (*gst.Pipeline, error) {
gst.Init(nil)
pipeline, err := gst.NewPipeline("")
if err != nil {
return nil, err
}
src, err := gst.NewElement("filesrc")
if err != nil {
return nil, err
}
decodebin, err := gst.NewElement("decodebin")
if err != nil {
return nil, err
}
src.Set("location", srcFile)
pipeline.AddMany(src, decodebin)
src.Link(decodebin)
// Connect to decodebin's pad-added signal, that is emitted whenever
// it found another stream from the input file and found a way to decode it to its raw format.
// decodebin automatically adds a src-pad for this raw stream, which
// we can use to build the follow-up pipeline.
decodebin.Connect("pad-added", func(self *gst.Element, srcPad *gst.Pad) {
// Try to detect whether this is video or audio
var isAudio, isVideo bool
caps := srcPad.GetCurrentCaps()
for i := 0; i < caps.GetSize(); i++ {
st := caps.GetStructureAt(i)
if strings.HasPrefix(st.Name(), "audio/") {
isAudio = true
}
if strings.HasPrefix(st.Name(), "video/") {
isVideo = true
}
}
fmt.Printf("New pad added, is_audio=%v, is_video=%v\n", isAudio, isVideo)
if !isAudio && !isVideo {
err := errors.New("Could not detect media stream type")
// We can send errors directly to the pipeline bus if they occur.
// These will be handled downstream.
msg := gst.NewErrorMessage(self, gst.NewGError(1, err), fmt.Sprintf("Received caps: %s", caps.String()), nil)
pipeline.GetPipelineBus().Post(msg)
return
}
if isAudio {
// decodebin found a raw audiostream, so we build the follow-up pipeline to
// play it on the default audio playback device (using autoaudiosink).
elements, err := gst.NewElementMany("queue", "audioconvert", "audioresample", "autoaudiosink")
if err != nil {
msg := gst.NewErrorMessage(self, gst.NewGError(2, err), "", nil)
pipeline.GetPipelineBus().Post(msg)
fmt.Println("ERROR: Could not create elements for audio pipeline")
return
}
pipeline.AddMany(elements...)
gst.ElementLinkMany(elements...)
// !!ATTENTION!!:
// This is quite important and people forget it often. Without making sure that
// the new elements have the same state as the pipeline, things will fail later.
// They would still be in Null state and can't process data.
for _, e := range elements {
e.SyncStateWithParent()
}
// The queue was the first element returned above
queue := elements[0]
// Get the queue element's sink pad and link the decodebin's newly created
// src pad for the audio stream to it.
sinkPad := queue.GetStaticPad("sink")
srcPad.Link(sinkPad)
} else if isVideo {
// decodebin found a raw videostream, so we build the follow-up pipeline to
// display it using the autovideosink.
elements, err := gst.NewElementMany("queue", "videoconvert", "videoscale", "autovideosink")
if err != nil {
msg := gst.NewErrorMessage(self, gst.NewGError(2, err), "", nil)
pipeline.GetPipelineBus().Post(msg)
fmt.Println("ERROR: Could not create elements for audio pipeline")
return
}
pipeline.AddMany(elements...)
gst.ElementLinkMany(elements...)
for _, e := range elements {
e.SyncStateWithParent()
}
queue := elements[0]
// Get the queue element's sink pad and link the decodebin's newly created
// src pad for the video stream to it.
sinkPad := queue.GetStaticPad("sink")
srcPad.Link(sinkPad)
}
})
return pipeline, nil
}
func handleMessage(msg *gst.Message) error {
defer msg.Unref() // Messages are a good candidate for trying out runtime finalizers
switch msg.Type() {
case gst.MessageEOS:
return errors.New("end-of-stream")
case gst.MessageError:
return msg.ParseError()
}
return nil
}
func runPipeline(pipeline *gst.Pipeline) error {
pipeline.SetState(gst.StatePlaying)
bus := pipeline.GetPipelineBus()
for {
msg := bus.TimedPop(time.Duration(-1))
if msg == nil {
break
}
if err := handleMessage(msg); err != nil {
return err
}
}
return nil
}
func main() {
flag.StringVar(&srcFile, "f", "", "The file to decode")
flag.Parse()
if srcFile == "" {
flag.Usage()
os.Exit(1)
}
examples.RunLoop(func(loop *gst.MainLoop) error {
pipeline, err := buildPipeline()
if err != nil {
return err
}
return runPipeline(pipeline)
})
}

View File

@@ -1,91 +0,0 @@
package gst
// #include "gst.go.h"
import "C"
import (
"unsafe"
gopointer "github.com/mattn/go-pointer"
)
// AtomicQueue wraps a GstAtomicQueue that can be used from multiple threads
// without performing any blocking operations.
type AtomicQueue struct {
ptr *C.GstAtomicQueue
}
/*
NewAtomicQueue creates a new atomic queue with the given size. The size will
be rounded up to the nearest power of 2 and used as the initial size of the queue.
Example
queue := gst.NewAtomicQueue(2)
defer queue.Unref()
queue.Push("hello world")
fmt.Println("There are", queue.Length(), "item(s) in the queue")
peeked := queue.Peek()
str := peeked.(string)
fmt.Println("Head item in queue is:", str)
fmt.Println("There are", queue.Length(), "item(s) in the queue")
popped := queue.Pop()
str = popped.(string)
fmt.Println("Head item in queue was:", str)
fmt.Println("There are", queue.Length(), "item(s) in the queue")
*/
func NewAtomicQueue(size int) *AtomicQueue {
return wrapAtomicQueue(C.gst_atomic_queue_new(C.guint(size)))
}
// Instance returns the underlying queue instance.
func (a *AtomicQueue) Instance() *C.GstAtomicQueue { return a.ptr }
// Length returns the amount of items in this queue.
func (a *AtomicQueue) Length() int {
return int(C.gst_atomic_queue_length(a.Instance()))
}
// Peek looks at the first item in the queue without removing it. This function
// returns nil if the queue is empty.
func (a *AtomicQueue) Peek() interface{} {
ptr := C.gst_atomic_queue_peek(a.Instance())
if ptr == nil {
return nil
}
return gopointer.Restore(unsafe.Pointer(ptr))
}
// Pop pops the head element off the queue. This function returns nil if the queue
// is empty.
func (a *AtomicQueue) Pop() interface{} {
ptr := C.gst_atomic_queue_pop(a.Instance())
if ptr == nil {
return nil
}
defer gopointer.Unref(unsafe.Pointer(ptr))
return gopointer.Restore(unsafe.Pointer(ptr))
}
// Push appends the given data to the end of the queue.
func (a *AtomicQueue) Push(data interface{}) {
ptr := gopointer.Save(data)
C.gst_atomic_queue_push(a.Instance(), (C.gpointer)(unsafe.Pointer(ptr)))
}
// Ref increases the ref count on the queue by one.
func (a *AtomicQueue) Ref() {
C.gst_atomic_queue_ref(a.Instance())
}
// Unref decreaes the ref count on the queue by one. Memory is freed when the
// refcount reaches zero.
func (a *AtomicQueue) Unref() { C.gst_atomic_queue_unref(a.Instance()) }

View File

@@ -251,3 +251,19 @@ func (e *Element) QueryPosition(format Format) (bool, int64) {
func (e *Element) SendEvent(ev *Event) bool { func (e *Element) SendEvent(ev *Event) bool {
return gobool(C.gst_element_send_event(e.Instance(), ev.Instance())) return gobool(C.gst_element_send_event(e.Instance(), ev.Instance()))
} }
// Connect connects to the given signal on this element, and applies f as the callback. The callback must
// match the signature of the expected callback from the documentation. However, instead of specifying C types
// for arguments specify the go-gst equivalent (e.g. *gst.Element for almost all GstElement derivitives).
func (e *Element) Connect(signal string, f interface{}) (glib.SignalHandle, error) {
// Elements are sometimes their own type unique from TYPE_ELEMENT. So make sure a type marshaler
// is registered for whatever this type is. Use the built-in element marshaler.
glib.RegisterGValueMarshalers([]glib.TypeMarshaler{{T: e.TypeFromInstance(), F: marshalElement}})
return e.Object.Connect(signal, f, nil)
}
// SyncStateWithParent tries to change the state of the element to the same as its parent. If this function returns
// FALSE, the state of element is undefined.
func (e *Element) SyncStateWithParent() bool {
return gobool(C.gst_element_sync_state_with_parent(e.Instance()))
}

View File

@@ -58,6 +58,8 @@ func StructureFromGValue(gval *glib.Value) *Structure {
return wrapStructure(st) return wrapStructure(st)
} }
// MarshalStructure will convert the given go struct into a GstStructure. Currently nested
// structs are not supported.
func MarshalStructure(data interface{}) *Structure { func MarshalStructure(data interface{}) *Structure {
typeOf := reflect.TypeOf(data) typeOf := reflect.TypeOf(data)
valsOf := reflect.ValueOf(data) valsOf := reflect.ValueOf(data)
@@ -70,6 +72,8 @@ func MarshalStructure(data interface{}) *Structure {
return st return st
} }
// UnmarshalInto will unmarshal this structure into the given pointer. The object
// reflected by the pointer must be non-nil.
func (s *Structure) UnmarshalInto(data interface{}) error { func (s *Structure) UnmarshalInto(data interface{}) error {
rv := reflect.ValueOf(data) rv := reflect.ValueOf(data)
if rv.Kind() != reflect.Ptr || rv.IsNil() { if rv.Kind() != reflect.Ptr || rv.IsNil() {

View File

@@ -14,7 +14,6 @@ import (
func init() { func init() {
tm := []glib.TypeMarshaler{ tm := []glib.TypeMarshaler{
// Enums
{ {
T: glib.Type(C.gst_buffering_mode_get_type()), T: glib.Type(C.gst_buffering_mode_get_type()),
F: marshalBufferingMode, F: marshalBufferingMode,
@@ -47,8 +46,6 @@ func init() {
T: glib.Type(C.gst_state_change_return_get_type()), T: glib.Type(C.gst_state_change_return_get_type()),
F: marshalStateChangeReturn, F: marshalStateChangeReturn,
}, },
// Objects/Interfaces
{ {
T: glib.Type(C.gst_buffer_get_type()), T: glib.Type(C.gst_buffer_get_type()),
F: marshalBuffer, F: marshalBuffer,
@@ -101,10 +98,6 @@ func init() {
T: glib.Type(C.GST_TYPE_MEMORY), T: glib.Type(C.GST_TYPE_MEMORY),
F: marshalMemory, F: marshalMemory,
}, },
{
T: glib.Type(C.gst_atomic_queue_get_type()),
F: marshalAtomicQueue,
},
{ {
T: glib.Type(C.bufferListType()), T: glib.Type(C.bufferListType()),
F: marshalBufferList, F: marshalBufferList,
@@ -149,10 +142,12 @@ func init() {
T: glib.Type(C.GST_TYPE_QUERY), T: glib.Type(C.GST_TYPE_QUERY),
F: marshalQuery, F: marshalQuery,
}, },
{
// Boxed T: glib.Type(C.gst_message_get_type()),
{T: glib.Type(C.gst_message_get_type()), F: marshalMessage}, F: marshalMessage,
},
} }
glib.RegisterGValueMarshalers(tm) glib.RegisterGValueMarshalers(tm)
} }
@@ -163,41 +158,40 @@ func uintptrToGVal(p uintptr) *C.GValue {
// Object wrappers // Object wrappers
func wrapAllocator(obj *glib.Object) *Allocator { return &Allocator{wrapObject(obj)} } func wrapAllocator(obj *glib.Object) *Allocator { return &Allocator{wrapObject(obj)} }
func wrapAtomicQueue(queue *C.GstAtomicQueue) *AtomicQueue { return &AtomicQueue{ptr: queue} } func wrapBin(obj *glib.Object) *Bin { return &Bin{wrapElement(obj)} }
func wrapBin(obj *glib.Object) *Bin { return &Bin{wrapElement(obj)} } func wrapBuffer(buf *C.GstBuffer) *Buffer { return &Buffer{ptr: buf} }
func wrapBuffer(buf *C.GstBuffer) *Buffer { return &Buffer{ptr: buf} } func wrapBufferList(bufList *C.GstBufferList) *BufferList { return &BufferList{ptr: bufList} }
func wrapBufferList(bufList *C.GstBufferList) *BufferList { return &BufferList{ptr: bufList} } func wrapBufferPool(obj *glib.Object) *BufferPool { return &BufferPool{wrapObject(obj)} }
func wrapBufferPool(obj *glib.Object) *BufferPool { return &BufferPool{wrapObject(obj)} } func wrapBus(obj *glib.Object) *Bus { return &Bus{Object: wrapObject(obj)} }
func wrapBus(obj *glib.Object) *Bus { return &Bus{Object: wrapObject(obj)} } func wrapCaps(caps *C.GstCaps) *Caps { return &Caps{native: caps} }
func wrapCaps(caps *C.GstCaps) *Caps { return &Caps{native: caps} } func wrapChildProxy(c *C.GstChildProxy) *ChildProxy { return &ChildProxy{ptr: c} }
func wrapChildProxy(c *C.GstChildProxy) *ChildProxy { return &ChildProxy{ptr: c} } func wrapClock(obj *glib.Object) *Clock { return &Clock{wrapObject(obj)} }
func wrapClock(obj *glib.Object) *Clock { return &Clock{wrapObject(obj)} } func wrapContext(ctx *C.GstContext) *Context { return &Context{ptr: ctx} }
func wrapContext(ctx *C.GstContext) *Context { return &Context{ptr: ctx} } func wrapDevice(obj *glib.Object) *Device { return &Device{wrapObject(obj)} }
func wrapDevice(obj *glib.Object) *Device { return &Device{wrapObject(obj)} } func wrapElement(obj *glib.Object) *Element { return &Element{wrapObject(obj)} }
func wrapElement(obj *glib.Object) *Element { return &Element{wrapObject(obj)} } func wrapEvent(ev *C.GstEvent) *Event { return &Event{ptr: ev} }
func wrapEvent(ev *C.GstEvent) *Event { return &Event{ptr: ev} } func wrapGhostPad(obj *glib.Object) *GhostPad { return &GhostPad{wrapProxyPad(obj)} }
func wrapGhostPad(obj *glib.Object) *GhostPad { return &GhostPad{wrapProxyPad(obj)} } func wrapMainContext(ctx *C.GMainContext) *MainContext { return &MainContext{ptr: ctx} }
func wrapMainContext(ctx *C.GMainContext) *MainContext { return &MainContext{ptr: ctx} } func wrapMainLoop(loop *C.GMainLoop) *MainLoop { return &MainLoop{ptr: loop} }
func wrapMainLoop(loop *C.GMainLoop) *MainLoop { return &MainLoop{ptr: loop} } func wrapMemory(mem *C.GstMemory) *Memory { return &Memory{ptr: mem} }
func wrapMemory(mem *C.GstMemory) *Memory { return &Memory{ptr: mem} } func wrapMessage(msg *C.GstMessage) *Message { return &Message{msg: msg} }
func wrapMessage(msg *C.GstMessage) *Message { return &Message{msg: msg} } func wrapMeta(meta *C.GstMeta) *Meta { return &Meta{ptr: meta} }
func wrapMeta(meta *C.GstMeta) *Meta { return &Meta{ptr: meta} } func wrapMetaInfo(info *C.GstMetaInfo) *MetaInfo { return &MetaInfo{ptr: info} }
func wrapMetaInfo(info *C.GstMetaInfo) *MetaInfo { return &MetaInfo{ptr: info} } func wrapPad(obj *glib.Object) *Pad { return &Pad{wrapObject(obj)} }
func wrapPad(obj *glib.Object) *Pad { return &Pad{wrapObject(obj)} } func wrapPadTemplate(obj *glib.Object) *PadTemplate { return &PadTemplate{wrapObject(obj)} }
func wrapPadTemplate(obj *glib.Object) *PadTemplate { return &PadTemplate{wrapObject(obj)} } func wrapPipeline(obj *glib.Object) *Pipeline { return &Pipeline{Bin: wrapBin(obj)} }
func wrapPipeline(obj *glib.Object) *Pipeline { return &Pipeline{Bin: wrapBin(obj)} } func wrapPluginFeature(obj *glib.Object) *PluginFeature { return &PluginFeature{wrapObject(obj)} }
func wrapPluginFeature(obj *glib.Object) *PluginFeature { return &PluginFeature{wrapObject(obj)} } func wrapPlugin(obj *glib.Object) *Plugin { return &Plugin{wrapObject(obj)} }
func wrapPlugin(obj *glib.Object) *Plugin { return &Plugin{wrapObject(obj)} } func wrapProxyPad(obj *glib.Object) *ProxyPad { return &ProxyPad{wrapPad(obj)} }
func wrapProxyPad(obj *glib.Object) *ProxyPad { return &ProxyPad{wrapPad(obj)} } func wrapQuery(query *C.GstQuery) *Query { return &Query{ptr: query} }
func wrapQuery(query *C.GstQuery) *Query { return &Query{ptr: query} } func wrapRegistry(obj *glib.Object) *Registry { return &Registry{wrapObject(obj)} }
func wrapRegistry(obj *glib.Object) *Registry { return &Registry{wrapObject(obj)} } func wrapSample(sample *C.GstSample) *Sample { return &Sample{sample: sample} }
func wrapSample(sample *C.GstSample) *Sample { return &Sample{sample: sample} } func wrapSegment(segment *C.GstSegment) *Segment { return &Segment{ptr: segment} }
func wrapSegment(segment *C.GstSegment) *Segment { return &Segment{ptr: segment} } func wrapStream(obj *glib.Object) *Stream { return &Stream{wrapObject(obj)} }
func wrapStream(obj *glib.Object) *Stream { return &Stream{wrapObject(obj)} } func wrapTagList(tagList *C.GstTagList) *TagList { return &TagList{ptr: tagList} }
func wrapTagList(tagList *C.GstTagList) *TagList { return &TagList{ptr: tagList} } func wrapTOC(toc *C.GstToc) *TOC { return &TOC{ptr: toc} }
func wrapTOC(toc *C.GstToc) *TOC { return &TOC{ptr: toc} } func wrapTOCEntry(toc *C.GstTocEntry) *TOCEntry { return &TOCEntry{ptr: toc} }
func wrapTOCEntry(toc *C.GstTocEntry) *TOCEntry { return &TOCEntry{ptr: toc} }
func wrapCapsFeatures(features *C.GstCapsFeatures) *CapsFeatures { func wrapCapsFeatures(features *C.GstCapsFeatures) *CapsFeatures {
return &CapsFeatures{native: features} return &CapsFeatures{native: features}
@@ -356,12 +350,6 @@ func marshalMemory(p uintptr) (interface{}, error) {
return wrapMemory(obj), nil return wrapMemory(obj), nil
} }
func marshalAtomicQueue(p uintptr) (interface{}, error) {
c := C.g_value_get_object(uintptrToGVal(p))
obj := (*C.GstAtomicQueue)(unsafe.Pointer(c))
return wrapAtomicQueue(obj), nil
}
func marshalBuffer(p uintptr) (interface{}, error) { func marshalBuffer(p uintptr) (interface{}, error) {
c := C.getBufferValue(uintptrToGVal(p)) c := C.getBufferValue(uintptrToGVal(p))
return wrapBuffer(c), nil return wrapBuffer(c), nil

View File

@@ -1,50 +0,0 @@
package main
import (
"fmt"
"time"
"github.com/tinyzimmer/go-gst/gst"
)
func wait() {
gst.Init(nil)
clock := gst.ObtainSystemClock()
id := clock.NewSingleShotID(clock.GetTime() + gst.ClockTime(time.Minute.Nanoseconds()))
go func() {
res, _ := id.Wait()
if res != gst.ClockOK {
panic(res)
}
fmt.Println("I waited")
}()
fmt.Println("I am waiting")
time.Sleep(time.Second)
fmt.Println("Still waiting")
}
func capsWeirdness() {
gst.Init(nil)
caps := gst.NewCapsFromString("audio/x-raw")
caps.ForEach(func(features *gst.CapsFeatures, structure *gst.Structure) bool {
fmt.Println(features)
return true
})
caps.FilterAndMapInPlace(func(features *gst.CapsFeatures, structure *gst.Structure) bool {
fmt.Println(features)
return true
})
caps.Unref()
}
func main() {
wait()
}