port more examples over to new generated bindings

plugins not yet working, examples mostly untested
This commit is contained in:
RSWilli
2025-09-16 22:36:07 +02:00
parent 9fb9393213
commit 846581a077
37 changed files with 364 additions and 3196 deletions

View File

@@ -2,11 +2,6 @@
This directory contains examples of some common use cases of gstreamer using the go bindings.
The common package provided to each example exports two methods.
- `Run(f)` - This wraps the given function in a goroutine and wraps a GMainLoop around it.
- `RunLoop(f(loop))` - This simply creates (but does not start) a GMainLoop and passes it to the example to manage.
Each example can be run in one of two ways:
```bash
@@ -16,4 +11,6 @@ go run <example>/main.go [..args]
# For multiple-file examples (but would also work for single file examples)
cd <example> && go build .
./<example> [..args]
```
```
See the plugins subdirectory to learn how to write custom elements in `go-gst`

View File

@@ -50,13 +50,9 @@ func buildPipeline() (*gst.Pipeline, error) {
pipeline := gst.NewPipeline("")
src, ok := gst.ElementFactoryMake("filesrc", "").(*gst.Element)
src := gst.ElementFactoryMake("filesrc", "")
if !ok {
return nil, fmt.Errorf("could not create filesource")
}
decodebin, ok := gst.ElementFactoryMake("decodebin", "").(*gst.Bin)
decodebin, ok := gst.ElementFactoryMake("decodebin", "").(*gst.Bin) // must cast since we need a weak reference
if !ok {
return nil, fmt.Errorf("could not create decodebin")
}

View File

@@ -15,13 +15,13 @@ import (
"os"
"time"
"github.com/go-gst/go-gst/gst"
"github.com/go-gst/go-gst/gst/pbutils"
"github.com/go-gst/go-gst/pkg/gst"
"github.com/go-gst/go-gst/pkg/gstpbutils"
)
func main() {
gst.Init(nil)
gst.Init()
if len(os.Args) < 2 {
fmt.Printf("USAGE: %s <uri>\n", os.Args[0])
@@ -30,7 +30,7 @@ func main() {
uri := os.Args[1]
discoverer, err := pbutils.NewDiscoverer(gst.ClockTime(time.Second * 15))
discoverer, err := gstpbutils.NewDiscoverer(gst.ClockTime(time.Second * 15))
if err != nil {
fmt.Println("ERROR:", err)
os.Exit(2)
@@ -45,23 +45,23 @@ func main() {
printDiscovererInfo(info)
}
func printDiscovererInfo(info *pbutils.DiscovererInfo) {
fmt.Println("URI:", info.GetURI())
fmt.Println("Duration:", info.GetDuration())
func printDiscovererInfo(info *gstpbutils.DiscovererInfo) {
fmt.Println("URI:", info.URI())
fmt.Println("Duration:", info.Duration())
printTags(info)
printStreamInfo(info.GetStreamInfo())
printStreamInfo(info.StreamInfo())
children := info.GetStreamList()
children := info.StreamList()
fmt.Println("Children streams:")
for _, child := range children {
printStreamInfo(child)
}
}
func printTags(info *pbutils.DiscovererInfo) {
func printTags(info *gstpbutils.DiscovererInfo) {
fmt.Println("Tags:")
tags := info.GetTags()
tags := info.Tags()
if tags != nil {
fmt.Println(" ", tags)
return
@@ -69,13 +69,13 @@ func printTags(info *pbutils.DiscovererInfo) {
fmt.Println(" no tags")
}
func printStreamInfo(info *pbutils.DiscovererStreamInfo) {
func printStreamInfo(info *gstpbutils.DiscovererStreamInfo) {
if info == nil {
return
}
fmt.Println("Stream: ")
fmt.Println(" Stream id:", info.GetStreamID())
if caps := info.GetCaps(); caps != nil {
fmt.Println(" Stream id:", info.StreamID())
if caps := info.Caps(); caps != nil {
fmt.Println(" Format:", caps)
}
}

View File

@@ -12,49 +12,39 @@ package main
import (
"errors"
"fmt"
"math"
"github.com/go-gst/go-glib/glib"
"github.com/go-gst/go-gst/examples"
"github.com/go-gst/go-gst/gst"
"github.com/go-gst/go-gst/pkg/gst"
)
func padProbes(mainLoop *glib.MainLoop) error {
gst.Init(nil)
func main() {
gst.Init()
// Parse the pipeline we want to probe from a static in-line string.
// Here we give our audiotestsrc a name, so we can retrieve that element
// from the resulting pipeline.
pipeline, err := gst.NewPipelineFromString(
ret, err := gst.ParseLaunch(
"audiotestsrc name=src ! audio/x-raw,format=S16LE,channels=1 ! fakesink",
)
if err != nil {
return err
panic("could not create pipeline")
}
pipeline := ret.(*gst.Pipeline)
// Get the audiotestsrc element from the pipeline that GStreamer
// created for us while parsing the launch syntax above.
//
// TODO: There are some discrepancies still between methods that check the nil
// value and return an error, versus those that will instead just return nil.
// Need to settle on one way or the other.
src, err := pipeline.GetElementByName("src")
if err != nil {
return err
}
src := pipeline.ByName("src").(*gst.Element)
// Get the audiotestsrc's src-pad.
srcPad := src.GetStaticPad("src")
if srcPad == nil {
return errors.New("src pad on src element was nil")
}
srcPad := src.StaticPad("src")
// Add a probe handler on the audiotestsrc's src-pad.
// This handler gets called for every buffer that passes the pad we probe.
srcPad.AddProbe(gst.PadProbeTypeBuffer, func(self *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn {
// Interpret the data sent over the pad as a buffer. We know to expect this because of
// the probe mask defined above.
buffer := info.GetBuffer()
buffer := info.Buffer()
// At this point, buffer is only a reference to an existing memory region somewhere.
// When we want to access its content, we have to map it while requesting the required
@@ -63,24 +53,31 @@ func padProbes(mainLoop *glib.MainLoop) error {
// on the machine's main memory itself, but rather in the GPU's memory.
// So mapping the buffer makes the underlying memory region accessible to us.
// See: https://gstreamer.freedesktop.org/documentation/plugin-development/advanced/allocation.html
mapInfo := buffer.Map(gst.MapRead)
defer buffer.Unmap()
mapInfo, ok := buffer.Map(gst.MapRead)
if !ok {
panic("could not map buffer")
}
defer buffer.Unmap(mapInfo)
// TODO: make mapInfo data accessible
// We know what format the data in the memory region has, since we requested
// it by setting the fakesink's caps. So what we do here is interpret the
// memory region we mapped as an array of signed 16 bit integers.
samples := mapInfo.AsInt16LESlice()
if len(samples) == 0 {
return gst.PadProbeOK
}
// // memory region we mapped as an array of signed 16 bit integers.
// samples := mapInfo
// if len(samples) == 0 {
// return gst.PadProbeOK
// }
// For each buffer (= chunk of samples) calculate the root mean square.
var square float64
for _, i := range samples {
square += float64(i * i)
}
rms := math.Sqrt(square / float64(len(samples)))
fmt.Println("rms:", rms)
// // For each buffer (= chunk of samples) calculate the root mean square.
// var square float64
// for _, i := range samples {
// square += float64(i * i)
// }
// rms := math.Sqrt(square / float64(len(samples)))
// fmt.Println("rms:", rms)
return gst.PadProbeOK
})
@@ -90,29 +87,25 @@ func padProbes(mainLoop *glib.MainLoop) error {
// Block on messages coming in from the bus instead of using the main loop
for {
msg := pipeline.GetPipelineBus().TimedPop(gst.ClockTimeNone)
msg := pipeline.Bus().TimedPop(gst.ClockTimeNone)
if msg == nil {
break
}
if err := handleMessage(msg); err != nil {
return err
fmt.Println(err)
return
}
}
return nil
}
func handleMessage(msg *gst.Message) error {
defer msg.Unref()
switch msg.Type() {
case gst.MessageEOS:
case gst.MessageEos:
return errors.New("end-of-stream")
case gst.MessageError:
return msg.ParseError()
err, _ := msg.ParseError()
return err
}
return nil
}
func main() {
examples.RunLoop(padProbes)
}

View File

@@ -16,40 +16,46 @@ import (
"fmt"
"os"
"github.com/go-gst/go-glib/glib"
"github.com/go-gst/go-gst/examples"
"github.com/go-gst/go-gst/gst"
"github.com/diamondburned/gotk4/pkg/glib/v2"
"github.com/go-gst/go-gst/pkg/gst"
)
func playbin(mainLoop *glib.MainLoop) error {
func playbin() error {
gst.Init()
mainLoop := glib.NewMainLoop(glib.MainContextDefault(), false)
if len(os.Args) < 2 {
return errors.New("usage: playbin <uri>")
}
gst.Init(nil)
gst.Init()
// Create a new playbin and set the URI on it
playbin, err := gst.NewElement("playbin")
if err != nil {
return err
ret := gst.ElementFactoryMake("playbin", "")
if ret != nil {
return fmt.Errorf("could not create playbin")
}
playbin.Set("uri", os.Args[1])
playbin := ret.(*gst.Pipeline)
playbin.SetObjectProperty("uri", os.Args[1])
// The playbin element itself is a pipeline, so it can be used as one, despite being
// created from an element factory.
bus := playbin.GetBus()
bus := playbin.Bus()
playbin.SetState(gst.StatePlaying)
bus.AddWatch(func(msg *gst.Message) bool {
bus.AddWatch(0, func(bus *gst.Bus, msg *gst.Message) bool {
switch msg.Type() {
case gst.MessageEOS:
case gst.MessageEos:
mainLoop.Quit()
return false
case gst.MessageError:
err := msg.ParseError()
err, debug := msg.ParseError()
fmt.Println("ERROR:", err.Error())
if debug := err.DebugString(); debug != "" {
if debug != "" {
fmt.Println("DEBUG")
fmt.Println(debug)
}
@@ -57,33 +63,36 @@ func playbin(mainLoop *glib.MainLoop) error {
return false
// Watch state change events
case gst.MessageStateChanged:
if _, newState := msg.ParseStateChanged(); newState == gst.StatePlaying {
bin := gst.ToGstBin(playbin)
if _, newState, _ := msg.ParseStateChanged(); newState == gst.StatePlaying {
// Generate a dot graph of the pipeline to GST_DEBUG_DUMP_DOT_DIR if defined
bin.DebugBinToDotFile(gst.DebugGraphShowAll, "PLAYING")
gst.DebugBinToDotFile(&playbin.Bin, gst.DebugGraphShowAll, "PLAYING")
}
// Tag messages contain changes to tags on the stream. This can include metadata about
// the stream such as codecs, artists, albums, etc.
case gst.MessageTag:
tags := msg.ParseTags()
tags := msg.ParseTag()
fmt.Println("Tags:")
if artist, ok := tags.GetString(gst.TagArtist); ok {
if artist, ok := tags.String(gst.TAG_ARTIST); ok {
fmt.Println(" Artist:", artist)
}
if album, ok := tags.GetString(gst.TagAlbum); ok {
if album, ok := tags.String(gst.TAG_ALBUM); ok {
fmt.Println(" Album:", album)
}
if title, ok := tags.GetString(gst.TagTitle); ok {
if title, ok := tags.String(gst.TAG_TITLE); ok {
fmt.Println(" Title:", title)
}
}
return true
})
return mainLoop.RunError()
mainLoop.Run()
return nil
}
func main() {
examples.RunLoop(playbin)
if err := playbin(); err != nil {
fmt.Println(err)
}
}

View File

@@ -1,13 +0,0 @@
PLUGINS ?= $(patsubst %/,%,$(sort $(dir $(wildcard */))))
all: $(PLUGINS)
.PHONY: $(PLUGINS)
$(PLUGINS):
cd $@ && \
go generate && \
go build -o ../libgst$@.so -buildmode c-shared .
rm libgst$@.h
clean:
rm -f *.so *.h

View File

@@ -1,8 +1,3 @@
# Plugins
This directory contains examples of writing GStreamer plugins using `go-gst`.
The metadata required by GStreamer is generated via `go generate` with the code for the generator contained in this repo
at [`cmd/gst-plugin-gen`](../../cmd/gst-plugin-gen).
The generator assumes the above is compiled and accessible in your PATH as `gst-plugin-gen`.
You can build and install it to your `GOPATH` by running `make install-plugin-gen` in the root of the repository.
This directory contains examples of writing GStreamer plugins using `go-gst`.

View File

@@ -1,232 +0,0 @@
//lint:file-ignore U1000 Ignore all unused code, this is example code
// +plugin:Name=async-identity
// +plugin:Description=A go-gst example plugin with async state changes
// +plugin:Version=v0.0.1
// +plugin:License=gst.LicenseLGPL
// +plugin:Source=go-gst
// +plugin:Package=examples
// +plugin:Origin=https://github.com/go-gst/go-gst
// +plugin:ReleaseDate=2024-09-13
//
// +element:Name=asyncidentity
// +element:Rank=gst.RankNone
// +element:Impl=asyncidentity
// +element:Subclass=gst.ExtendsElement
//
//go:generate gst-plugin-gen
package main
import (
"fmt"
"sync/atomic"
"time"
"github.com/go-gst/go-glib/glib"
"github.com/go-gst/go-gst/gst"
)
var (
_cat = gst.NewDebugCategory(
"asyncidentity",
gst.DebugColorNone,
"asyncidentity element",
)
_srcPadTemplate = gst.NewPadTemplate("generic-src", gst.PadDirectionSource,
gst.PadPresenceAlways, gst.NewAnyCaps())
_sinkPadTemplate = gst.NewPadTemplate("generic-sink", gst.PadDirectionSink,
gst.PadPresenceAlways, gst.NewAnyCaps())
_properties = []*glib.ParamSpec{
glib.NewUint64Param(
"delay",
"ns state change delay",
"Duration in nanoseconds to wait until a state changes",
_delayNsMin, _delayNsMax, _delayNsDefault,
glib.ParameterReadWrite,
),
}
)
const (
_propDelayNs = 0
_delayNsMin = uint64(0)
_delayNsMax = uint64(time.Second) * 10
_delayNsDefault = uint64(time.Second)
)
func main() {}
type asyncidentity struct {
// inner state
sinkpad *gst.Pad
srcpad *gst.Pad
asyncPending atomic.Bool
// property storage
delayNs atomic.Uint64
}
var _ glib.GoObjectSubclass = (*asyncidentity)(nil)
func (g *asyncidentity) New() glib.GoObjectSubclass { return &asyncidentity{} }
func (g *asyncidentity) ClassInit(klass *glib.ObjectClass) {
class := gst.ToElementClass(klass)
class.SetMetadata(
"Async Identity Example",
"General",
"An async state changing identity like element",
"Artem Martus <artemmartus2012@gmail.com>",
)
class.AddStaticPadTemplate(_srcPadTemplate)
class.AddStaticPadTemplate(_sinkPadTemplate)
class.InstallProperties(_properties)
}
var _ glib.GoObject = (*asyncidentity)(nil)
func (g *asyncidentity) SetProperty(obj *glib.Object, id uint, value *glib.Value) {
self := gst.ToElement(obj)
switch id {
case _propDelayNs:
newDelayErased, err := value.GoValue()
if err != nil {
self.Error("Failed unmarshalling the 'delay' property", err)
return
}
newDelay, ok := newDelayErased.(uint64)
if !ok {
self.Error("Failed Go-casting the 'delay' interface{} into uint64",
fmt.Errorf("interfaced value: %+v", newDelayErased))
return
}
oldDelay := g.delayNs.Swap(newDelay)
self.Log(_cat, gst.LevelInfo,
fmt.Sprintf("Changed delay property %s => %s",
time.Duration(oldDelay),
time.Duration(newDelay),
))
default:
self.Error("Tried to set unknown property",
fmt.Errorf("prop id %d: %s", id, value.TypeName()))
}
}
func (g *asyncidentity) GetProperty(obj *glib.Object, id uint) *glib.Value {
var (
out *glib.Value
err error
)
switch id {
case _propDelayNs:
out, err = glib.GValue(g.delayNs.Load())
default:
err = fmt.Errorf("unknown property id: %d", id)
}
if err != nil {
self := gst.ToElement(obj)
self.Error("Get property error", err)
out = nil
}
return out
}
func (g *asyncidentity) Constructed(self *glib.Object) {
elem := gst.ToElement(self)
srcPad := gst.NewPadFromTemplate(_srcPadTemplate, "src")
sinkPad := gst.NewPadFromTemplate(_sinkPadTemplate, "sink")
sinkPad.SetChainFunction(g.sink_chain_function)
// Have to set proxy flags on a pads
proxyFlags := gst.PadFlagProxyAllocation | gst.PadFlagProxyCaps | gst.PadFlagProxyScheduling
sinkPad.SetFlags(proxyFlags)
srcPad.SetFlags(proxyFlags)
// Or setup query & event functions like so
// sinkPad.SetQueryFunction(func(self *gst.Pad, parent *gst.Object, query *gst.Query) bool {
// return srcPad.PeerQuery(query)
// })
// sinkPad.SetEventFunction(func(self *gst.Pad, parent *gst.Object, event *gst.Event) bool {
// return srcPad.PushEvent(event)
// })
// srcPad.SetQueryFunction(func(self *gst.Pad, parent *gst.Object, query *gst.Query) bool {
// return sinkPad.PeerQuery(query)
// })
// srcPad.SetEventFunction(func(self *gst.Pad, parent *gst.Object, event *gst.Event) bool {
// return sinkPad.PushEvent(event)
// })
elem.AddPad(srcPad)
elem.AddPad(sinkPad)
g.srcpad = srcPad
g.sinkpad = sinkPad
g.delayNs.Store(_delayNsDefault)
}
func (g *asyncidentity) sink_chain_function(
_self *gst.Pad,
_parent *gst.Object,
buffer *gst.Buffer,
) gst.FlowReturn {
return g.srcpad.Push(buffer)
}
// var _ gst.ElementImpl = (*asyncidentity)(nil)
func (g *asyncidentity) ChangeState(el *gst.Element, transition gst.StateChange) gst.StateChangeReturn {
if ret := el.ParentChangeState(transition); ret == gst.StateChangeFailure {
return ret
}
switch transition {
case gst.StateChangeNullToReady:
// async will be ignored due to target state <= READY
case gst.StateChangeReadyToPaused:
// async will be ignored due to no_preroll
return gst.StateChangeNoPreroll
case gst.StateChangePausedToPlaying:
fallthrough
case gst.StateChangePlayingToPaused:
g.asyncStateChange(el)
return gst.StateChangeAsync
case gst.StateChangePausedToReady:
// async will be ignored due to target state <= READY
case gst.StateChangeReadyToNull:
}
// check against forcing state change
if g.asyncPending.Load() {
return gst.StateChangeAsync
}
return gst.StateChangeSuccess
}
func (g *asyncidentity) asyncStateChange(el *gst.Element) {
msg := gst.NewAsyncStartMessage(el)
_ = el.PostMessage(msg)
go func(el *gst.Element) {
g.asyncPending.Store(true)
delay := time.Duration(g.delayNs.Load())
<-time.After(delay)
msg := gst.NewAsyncDoneMessage(el, gst.ClockTimeNone)
_ = el.PostMessage(msg)
g.asyncPending.Store(false)
}(el)
}

View File

@@ -1,68 +1,9 @@
package customtransform
import (
"github.com/go-gst/go-glib/glib"
"github.com/go-gst/go-gst/gst"
"github.com/go-gst/go-gst/gst/base"
"github.com/go-gst/go-gst/pkg/gstbase"
)
type customBaseTransform struct{}
// ClassInit is the place where you define pads and properties
func (*customBaseTransform) ClassInit(klass *glib.ObjectClass) {
class := gst.ToElementClass(klass)
class.SetMetadata(
"custom base transform",
"Transform/demo",
"custom base transform",
"Wilhelm Bartel <bartel.wilhelm@gmail.com>",
)
class.AddPadTemplate(gst.NewPadTemplate(
"src",
gst.PadDirectionSource,
gst.PadPresenceAlways,
gst.NewCapsFromString("audio/x-raw,channels=2,rate=48000"),
))
class.AddPadTemplate(gst.NewPadTemplate(
"sink",
gst.PadDirectionSink,
gst.PadPresenceAlways,
gst.NewCapsFromString("audio/x-raw,channels=2,rate=48000"),
))
}
// SetProperty gets called for every property. The id is the index in the slice defined above.
func (s *customBaseTransform) SetProperty(self *glib.Object, id uint, value *glib.Value) {}
// GetProperty is called to retrieve the value of the property at index `id` in the properties
// slice provided at ClassInit.
func (o *customBaseTransform) GetProperty(self *glib.Object, id uint) *glib.Value {
return nil
}
// New is called by the bindings to create a new instance of your go element. Use this to initialize channels, maps, etc.
//
// Think of New like the constructor of your struct
func (*customBaseTransform) New() glib.GoObjectSubclass {
return &customBaseTransform{}
}
// InstanceInit should initialize the element. Keep in mind that the properties are not yet present. When this is called.
func (s *customBaseTransform) InstanceInit(instance *glib.Object) {}
func (s *customBaseTransform) Constructed(o *glib.Object) {}
func (s *customBaseTransform) Finalize(o *glib.Object) {}
// see base.GstBaseTransformImpl interface for the method signatures of the virtual methods
//
// it is not required to implement all methods
var _ base.GstBaseTransformImpl = nil
func (s *customBaseTransform) SinkEvent(self *base.GstBaseTransform, event *gst.Event) bool {
return self.ParentSinkEvent(event)
}
func (s *customBaseTransform) SrcEvent(self *base.GstBaseTransform, event *gst.Event) bool {
return self.ParentSrcEvent(event)
type customBaseTransform struct {
gstbase.BaseTransform
}

View File

@@ -1,23 +1,51 @@
package customtransform
import (
"github.com/go-gst/go-gst/gst"
"github.com/go-gst/go-gst/gst/base"
"github.com/diamondburned/gotk4/pkg/core/glib"
"github.com/go-gst/go-gst/pkg/gst"
"github.com/go-gst/go-gst/pkg/gstbase"
)
// Register needs to be called after gst.Init() to make the gocustombin available in the standard
// gst element registry. After this call the element can be used like any other gstreamer element
func Register() bool {
return gst.RegisterElement(
registered := glib.RegisterSubclassWithConstructor[*customBaseTransform](
func() *customBaseTransform {
return &customBaseTransform{}
},
glib.WithOverrides[*customBaseTransform, gstbase.BaseTransformOverrides](func(b *customBaseTransform) gstbase.BaseTransformOverrides {
return gstbase.BaseTransformOverrides{}
}),
glib.WithClassInit[*gstbase.BaseTransformClass](func(class *gstbase.BaseTransformClass) {
class.ParentClass().SetStaticMetadata(
"custom base transform",
"Transform/demo",
"custom base transform",
"Wilhelm Bartel <bartel.wilhelm@gmail.com>",
)
class.ParentClass().AddPadTemplate(gst.NewPadTemplate(
"src",
gst.PadSrc,
gst.PadAlways,
gst.CapsFromString("audio/x-raw,channels=2,rate=48000"),
))
class.ParentClass().AddPadTemplate(gst.NewPadTemplate(
"sink",
gst.PadSink,
gst.PadAlways,
gst.CapsFromString("audio/x-raw,channels=2,rate=48000"),
))
}),
)
return gst.ElementRegister(
// no plugin:
nil,
// The name of the element
"gocustomtransform",
// The rank of the element
gst.RankNone,
// The GoElement implementation for the element
&customBaseTransform{},
// The base subclass this element extends
base.ExtendsBaseTransform,
uint(gst.RankNone),
registered.Type(),
)
}

View File

@@ -5,32 +5,33 @@ import (
"fmt"
"os"
"os/signal"
"time"
"github.com/go-gst/go-gst/examples/plugins/basetransform/internal/customtransform"
"github.com/go-gst/go-gst/gst"
"github.com/go-gst/go-gst/pkg/gst"
)
func run(ctx context.Context) error {
ctx, cancel := signal.NotifyContext(ctx, os.Interrupt)
defer cancel()
gst.Init(nil)
gst.Init()
customtransform.Register()
pipeline, err := gst.NewPipelineFromString("audiotestsrc ! gocustomtransform ! fakesink")
ret, err := gst.ParseLaunch("audiotestsrc ! gocustomtransform ! fakesink")
if err != nil {
return err
}
pipeline := ret.(*gst.Pipeline)
pipeline.SetState(gst.StatePlaying)
<-ctx.Done()
pipeline.BlockSetState(gst.StateNull)
gst.Deinit()
pipeline.BlockSetState(gst.StateNull, gst.ClockTime(time.Second))
return ctx.Err()
}

View File

@@ -1,40 +0,0 @@
//lint:file-ignore U1000 Ignore all unused code, this is example code
// +plugin:Name=boilerplate
// +plugin:Description=My plugin written in go
// +plugin:Version=v0.0.1
// +plugin:License=gst.LicenseLGPL
// +plugin:Source=go-gst
// +plugin:Package=examples
// +plugin:Origin=https://github.com/go-gst/go-gst
// +plugin:ReleaseDate=2021-01-18
//
// +element:Name=myelement
// +element:Rank=gst.RankNone
// +element:Impl=myelement
// +element:Subclass=gst.ExtendsElement
//
//go:generate gst-plugin-gen
package main
import (
"github.com/go-gst/go-glib/glib"
"github.com/go-gst/go-gst/gst"
)
func main() {}
type myelement struct{}
func (g *myelement) New() glib.GoObjectSubclass { return &myelement{} }
func (g *myelement) ClassInit(klass *glib.ObjectClass) {
// Set the plugin's longname as it is a basic requirement for a GStreamer plugin
class := gst.ToElementClass(klass)
class.SetMetadata(
"Boilerplate",
"General",
"An empty element which does nothing",
"Avi Zimmerman <avi.zimmerman@gmail.com>",
)
}

View File

@@ -1,41 +0,0 @@
//lint:file-ignore U1000 Ignore all unused code, this is a work in progress
// +plugin:Name=gobin
// +plugin:Description=A bin element written in go
// +plugin:Version=v0.0.1
// +plugin:License=gst.LicenseLGPL
// +plugin:Source=go-gst
// +plugin:Package=examples
// +plugin:Origin=https://github.com/go-gst/go-gst
// +plugin:ReleaseDate=2021-01-18
//
// +element:Name=gobin
// +element:Rank=gst.RankNone
// +element:Impl=gobin
// +element:Subclass=gst.ExtendsBin
// +element:Interfaces=gst.InterfaceChildProxy
//
//go:generate gst-plugin-gen
package main
import (
"github.com/go-gst/go-glib/glib"
"github.com/go-gst/go-gst/gst"
)
func main() {}
type gobin struct{}
func (g *gobin) New() glib.GoObjectSubclass { return &gobin{} }
func (g *gobin) ClassInit(klass *glib.ObjectClass) {
// Set the plugin's longname as it is a basic requirement for a GStreamer plugin
class := gst.ToElementClass(klass)
class.SetMetadata(
"GoBin example",
"General",
"An empty GstBin element which does nothing",
"Avi Zimmerman <avi.zimmerman@gmail.com>",
)
}

View File

@@ -1,285 +0,0 @@
// This example demonstrates a filesink plugin implemented in Go.
//
// Every element in a Gstreamer pipeline is provided by plugins. Some are builtin while
// others are provided by third-parties or distributed privately. The plugins are built
// around the GObject type system.
//
// Go-gst offers loose bindings around the GObject type system to provide the necessary
// functionality to implement these plugins. The example in this code produces an element
// that can write to a file on the local system.
//
// In order to build the plugin for use by GStreamer, you can do the following:
//
// $ go generate
// $ go build -o libgstgofilesink.so -buildmode c-shared .
//
// +plugin:Name=gofilesink
// +plugin:Description=File plugins written in go
// +plugin:Version=v0.0.1
// +plugin:License=gst.LicenseLGPL
// +plugin:Source=go-gst
// +plugin:Package=examples
// +plugin:Origin=https://github.com/go-gst/go-gst
// +plugin:ReleaseDate=2021-01-04
//
// +element:Name=gofilesink
// +element:Rank=gst.RankNone
// +element:Impl=FileSink
// +element:Subclass=base.ExtendsBaseSink
// +element:Interfaces=gst.InterfaceURIHandler
//
//go:generate gst-plugin-gen
package main
import (
"errors"
"fmt"
"io"
"os"
"strings"
"github.com/go-gst/go-glib/glib"
"github.com/go-gst/go-gst/gst"
"github.com/go-gst/go-gst/gst/base"
)
// main is left unimplemented since these files are compiled to c-shared.
func main() {}
// CAT is the log category for the gofilesink. It is safe to define GStreamer objects as globals
// without calling gst.Init, since in the context of a loaded plugin all initialization has
// already been taken care of by the loading application.
var CAT = gst.NewDebugCategory(
"gofilesink",
gst.DebugColorNone,
"GoFileSink Element",
)
// Here we define a list of ParamSpecs that will make up the properties for our element.
// This element only has a single property, the location of the file to write to.
// When getting and setting properties later on, you will reference them by their index in
// this list.
var properties = []*glib.ParamSpec{
glib.NewStringParam(
"location", // The name of the parameter
"File Location", // The long name for the parameter
"Location to write the file to", // A blurb about the parameter
nil, // A default value for the parameter
glib.ParameterReadWrite, // Flags for the parameter
),
}
// Here we declare a private struct to hold our internal state.
type state struct {
// Whether the element is started or not
started bool
// The file the element is writing to
file *os.File
// The current position in the file
position uint64
}
// This is another private struct where we hold the parameter values set on our
// element.
type settings struct {
location string
}
// Finally a structure is defined that implements (at a minimum) the glib.GoObject interface.
// It is possible to signal to the bindings to inherit from other classes or implement other
// interfaces via the registration and TypeInit processes.
type FileSink struct {
// The settings for the element
settings *settings
// The current state of the element
state *state
}
// setLocation is a simple method to check the validity of a provided file path and set the
// local value with it.
func (f *FileSink) setLocation(path string) error {
if f.state.started {
return errors.New("changing the `location` property on a started `GoFileSink` is not supported")
}
f.settings.location = strings.TrimPrefix(path, "file://") // should obviously use url.URL and do actual parsing
return nil
}
// The ObjectSubclass implementations below are for registering the various aspects of our
// element and its capabilities with the type system. These are the minimum methods that
// should be implemented by an element.
// Every element needs to provide its own constructor that returns an initialized glib.GoObjectSubclass
// implementation. Here we simply create a new fileSink with zeroed settings and state objects.
func (f *FileSink) New() glib.GoObjectSubclass {
CAT.Log(gst.LevelLog, "Initializing new fileSink object")
return &FileSink{
settings: &settings{},
state: &state{},
}
}
// The ClassInit method should specify the metadata for this element and add any pad templates
// and properties.
func (f *FileSink) ClassInit(klass *glib.ObjectClass) {
CAT.Log(gst.LevelLog, "Initializing gofilesink class")
class := gst.ToElementClass(klass)
class.SetMetadata(
"File Sink",
"Sink/File",
"Write stream to a file",
"Avi Zimmerman <avi.zimmerman@gmail.com>",
)
CAT.Log(gst.LevelLog, "Adding sink pad template and properties to class")
class.AddPadTemplate(gst.NewPadTemplate(
"sink",
gst.PadDirectionSink,
gst.PadPresenceAlways,
gst.NewAnyCaps(),
))
class.InstallProperties(properties)
}
// Object implementations are used during the initialization of an element. The
// methods are called once the object is constructed and its properties are read
// and written to. These and the rest of the methods described below are documented
// in interfaces in the bindings, however only individual methods needs from those
// interfaces need to be implemented. When left unimplemented, the behavior of the parent
// class is inherited.
// SetProperty is called when a `value` is set to the property at index `id` in the
// properties slice that we installed during ClassInit. It should attempt to register
// the value locally or signal any errors that occur in the process.
func (f *FileSink) SetProperty(self *glib.Object, id uint, value *glib.Value) {
param := properties[id]
switch param.Name() {
case "location":
var val string
if value == nil {
val = ""
} else {
val, _ = value.GetString()
}
if err := f.setLocation(val); err != nil {
gst.ToElement(self).ErrorMessage(gst.DomainLibrary, gst.LibraryErrorSettings,
fmt.Sprintf("Could not set location on object: %s", err.Error()),
"",
)
return
}
gst.ToElement(self).Log(CAT, gst.LevelInfo, fmt.Sprintf("Set `location` to %s", f.settings.location))
}
}
// GetProperty is called to retrieve the value of the property at index `id` in the properties
// slice provided at ClassInit.
func (f *FileSink) GetProperty(self *glib.Object, id uint) *glib.Value {
param := properties[id]
switch param.Name() {
case "location":
if f.settings.location == "" {
return nil
}
val, err := glib.GValue(f.settings.location)
if err == nil {
return val
}
gst.ToElement(self).ErrorMessage(gst.DomainLibrary, gst.LibraryErrorFailed,
fmt.Sprintf("Could not convert %s to GValue", f.settings.location),
err.Error(),
)
}
return nil
}
// GstBaseSink implementations are optional methods to implement from the base.GstBaseSinkImpl interface.
// If the method is not overridden by the implementing struct, it will be inherited from the parent class.
// Start is called to start the filesink. Open the file for writing and set the internal state.
func (f *FileSink) Start(self *base.GstBaseSink) bool {
if f.state.started {
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorSettings, "GoFileSink is already started", "")
return false
}
if f.settings.location == "" {
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorSettings, "No location configured on the filesink", "")
return false
}
destFile := f.settings.location
var err error
f.state.file, err = os.Create(destFile)
if err != nil {
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorOpenWrite,
fmt.Sprintf("Could not open %s for writing", destFile), err.Error())
return false
}
self.Log(CAT, gst.LevelDebug, fmt.Sprintf("Opened file %s for writing", destFile))
f.state.started = true
self.Log(CAT, gst.LevelInfo, "GoFileSink has started")
return true
}
// Stop is called to stop the element. Set the internal state and close the file.
func (f *FileSink) Stop(self *base.GstBaseSink) bool {
if !f.state.started {
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorSettings, "GoFileSink is not started", "")
return false
}
if err := f.state.file.Close(); err != nil {
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorWrite, "Failed to close the destination file", err.Error())
return false
}
self.Log(CAT, gst.LevelInfo, "GoFileSink has stopped")
return true
}
// Render is called when a buffer is ready to be written to the file.
func (f *FileSink) Render(self *base.GstBaseSink, buffer *gst.Buffer) gst.FlowReturn {
if !f.state.started {
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorSettings, "GoFileSink is not started", "")
return gst.FlowError
}
self.Log(CAT, gst.LevelTrace, fmt.Sprintf("Rendering buffer at %v", buffer.Instance()))
newPos, err := io.Copy(f.state.file, buffer.Reader())
if err != nil {
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorWrite, "Error copying buffer to file", err.Error())
return gst.FlowError
}
f.state.position += uint64(newPos)
self.Log(CAT, gst.LevelTrace, fmt.Sprintf("New position in file: %v", f.state.position))
return gst.FlowOK
}
// URIHandler implementations are the methods required by the GstURIHandler interface.
// GetURI returns the currently configured URI
func (f *FileSink) GetURI() string { return fmt.Sprintf("file://%s", f.settings.location) }
// GetURIType returns the types of URI this element supports.
func (f *FileSink) GetURIType() gst.URIType { return gst.URISource }
// GetProtocols returns the protcols this element supports.
func (f *FileSink) GetProtocols() []string { return []string{"file"} }
// SetURI should set the URI that this element is working on.
func (f *FileSink) SetURI(uri string) (bool, error) {
if uri == "file://" {
return true, nil
}
err := f.setLocation(uri)
if err != nil {
return false, err
}
CAT.Log(gst.LevelInfo, fmt.Sprintf("Set `location` to %s via URIHandler", f.settings.location))
return true, nil
}

View File

@@ -1,354 +0,0 @@
// This example demonstrates a filesrc plugin implemented in Go.
//
// Every element in a Gstreamer pipeline is provided by plugins. Some are builtin while
// others are provided by third-parties or distributed privately. The plugins are built
// around the GObject type system.
//
// Go-gst offers loose bindings around the GObject type system to provide the necessary
// functionality to implement these plugins. The example in this code produces an element
// that can read from a file on the local system.
//
// In order to build the plugin for use by GStreamer, you can do the following:
//
// $ go generate
// $ go build -o libgstgofilesrc.so -buildmode c-shared .
//
// +plugin:Name=gofilesrc
// +plugin:Description=File plugins written in go
// +plugin:Version=v0.0.1
// +plugin:License=gst.LicenseLGPL
// +plugin:Source=go-gst
// +plugin:Package=examples
// +plugin:Origin=https://github.com/go-gst/go-gst
// +plugin:ReleaseDate=2021-01-04
//
// +element:Name=gofilesrc
// +element:Rank=gst.RankNone
// +element:Impl=FileSrc
// +element:Subclass=base.ExtendsBaseSrc
// +element:Interfaces=gst.InterfaceURIHandler
//
//go:generate gst-plugin-gen
package main
import (
"errors"
"fmt"
"io"
"os"
"strings"
"github.com/go-gst/go-glib/glib"
"github.com/go-gst/go-gst/gst"
"github.com/go-gst/go-gst/gst/base"
)
// main is left unimplemented since these files are compiled to c-shared.
func main() {}
// CAT is the log category for the gofilesrc. It is safe to define GStreamer objects as globals
// without calling gst.Init, since in the context of a loaded plugin all initialization has
// already been taken care of by the loading application.
var CAT = gst.NewDebugCategory(
"gofilesrc",
gst.DebugColorNone,
"GoFileSrc Element",
)
// Here we define a list of ParamSpecs that will make up the properties for our element.
// This element only has a single property, the location of the file to read from.
// When getting and setting properties later on, you will reference them by their index in
// this list.
var properties = []*glib.ParamSpec{
glib.NewStringParam(
"location", // The name of the parameter
"File Location", // The long name for the parameter
"Location of the file to read from", // A blurb about the parameter
nil, // A default value for the parameter
glib.ParameterReadWrite, // Flags for the parameter
),
}
// Here we declare a private struct to hold our internal state.
type state struct {
// Whether the element is started or not
started bool
// The file the element is reading from
file *os.File
// The information about the file retrieved from stat
fileInfo os.FileInfo
// The current position in the file
position uint64
}
// This is another private struct where we hold the parameter values set on our
// element.
type settings struct {
location string
}
// Finally a structure is defined that implements (at a minimum) the gst.GoElement interface.
// It is possible to signal to the bindings to inherit from other classes or implement other
// interfaces via the registration and TypeInit processes.
type FileSrc struct {
// The settings for the element
settings *settings
// The current state of the element
state *state
}
// Private methods only used internally by the plugin
// setLocation is a simple method to check the validity of a provided file path and set the
// local value with it.
func (f *FileSrc) setLocation(path string) error {
if f.state.started {
return errors.New("changing the `location` property on a started `GoFileSrc` is not supported")
}
f.settings.location = strings.TrimPrefix(path, "file://") // should obviously use url.URL and do actual parsing
return nil
}
// The ObjectSubclass implementations below are for registering the various aspects of our
// element and its capabilities with the type system. These are the minimum methods that
// should be implemented by an element.
// Every element needs to provide its own constructor that returns an initialized
// glib.GoObjectSubclass and state objects.
func (f *FileSrc) New() glib.GoObjectSubclass {
CAT.Log(gst.LevelLog, "Initializing new fileSrc object")
return &FileSrc{
settings: &settings{},
state: &state{},
}
}
// The ClassInit method should specify the metadata for this element and add any pad templates
// and properties.
func (f *FileSrc) ClassInit(klass *glib.ObjectClass) {
CAT.Log(gst.LevelLog, "Initializing gofilesrc class")
class := gst.ToElementClass(klass)
class.SetMetadata(
"File Source",
"Source/File",
"Read stream from a file",
"Avi Zimmerman <avi.zimmerman@gmail.com>",
)
CAT.Log(gst.LevelLog, "Adding src pad template and properties to class")
class.AddPadTemplate(gst.NewPadTemplate(
"src",
gst.PadDirectionSource,
gst.PadPresenceAlways,
gst.NewAnyCaps(),
))
class.InstallProperties(properties)
}
// Object implementations are used during the initialization of an element. The
// methods are called once the object is constructed and its properties are read
// and written to. These and the rest of the methods described below are documented
// in interfaces in the bindings, however only individual methods needs from those
// interfaces need to be implemented. When left unimplemented, the behavior of the parent
// class is inherited.
// SetProperty is called when a `value` is set to the property at index `id` in the
// properties slice that we installed during ClassInit. It should attempt to register
// the value locally or signal any errors that occur in the process.
func (f *FileSrc) SetProperty(self *glib.Object, id uint, value *glib.Value) {
param := properties[id]
switch param.Name() {
case "location":
var val string
if value == nil {
val = ""
} else {
val, _ = value.GetString()
}
if err := f.setLocation(val); err != nil {
gst.ToElement(self).ErrorMessage(gst.DomainLibrary, gst.LibraryErrorSettings,
fmt.Sprintf("Could not set location on object: %s", err.Error()),
"",
)
return
}
gst.ToElement(self).Log(CAT, gst.LevelInfo, fmt.Sprintf("Set `location` to %s", f.settings.location))
}
}
// GetProperty is called to retrieve the value of the property at index `id` in the properties
// slice provided at ClassInit.
func (f *FileSrc) GetProperty(self *glib.Object, id uint) *glib.Value {
param := properties[id]
switch param.Name() {
case "location":
if f.settings.location == "" {
return nil
}
val, err := glib.GValue(f.settings.location)
if err == nil {
return val
}
gst.ToElement(self).ErrorMessage(gst.DomainLibrary, gst.LibraryErrorFailed,
fmt.Sprintf("Could not convert %s to GValue", f.settings.location),
err.Error(),
)
}
return nil
}
// Constructed is called when the type system is done constructing the object. Any finalizations required
// during the initialization process can be performed here. In this example, we set the format on our
// underlying GstBaseSrc to bytes.
func (f *FileSrc) Constructed(self *glib.Object) {
base.ToGstBaseSrc(self).Log(CAT, gst.LevelLog, "Setting format of GstBaseSrc to bytes")
base.ToGstBaseSrc(self).SetFormat(gst.FormatBytes)
}
// GstBaseSrc implementations are optional methods to implement from the base.GstBaseSrcImpl interface.
// If the method is not overridden by the implementing struct, it will be inherited from the parent class.
// IsSeekable returns that we are, in fact, seekable.
func (f *FileSrc) IsSeekable(*base.GstBaseSrc) bool { return true }
// GetSize will return the total size of the file at the configured location.
func (f *FileSrc) GetSize(self *base.GstBaseSrc) (bool, int64) {
if !f.state.started {
return false, 0
}
return true, f.state.fileInfo.Size()
}
// Start is called to start this element. In this example, the configured file is opened for reading,
// and any error encountered in the process is posted to the pipeline.
func (f *FileSrc) Start(self *base.GstBaseSrc) bool {
if f.state.started {
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorSettings, "GoFileSrc is already started", "")
return false
}
if f.settings.location == "" {
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorSettings, "File location is not defined", "")
return false
}
stat, err := os.Stat(f.settings.location)
if err != nil {
if os.IsNotExist(err) {
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorOpenRead,
fmt.Sprintf("%s does not exist", f.settings.location), "")
return false
}
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorOpenRead,
fmt.Sprintf("Could not stat %s, err: %s", f.settings.location, err.Error()), "")
return false
}
if stat.IsDir() {
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorOpenRead,
fmt.Sprintf("%s is a directory", f.settings.location), "")
return false
}
f.state.fileInfo = stat
self.Log(CAT, gst.LevelDebug, fmt.Sprintf("file stat - name: %s size: %d mode: %v modtime: %v", stat.Name(), stat.Size(), stat.Mode(), stat.ModTime()))
self.Log(CAT, gst.LevelDebug, fmt.Sprintf("Opening file %s for reading", f.settings.location))
f.state.file, err = os.Open(f.settings.location)
if err != nil {
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorOpenRead,
fmt.Sprintf("Could not open file %s for reading", f.settings.location), err.Error())
return false
}
f.state.position = 0
f.state.started = true
self.StartComplete(gst.FlowOK)
self.Log(CAT, gst.LevelInfo, "GoFileSrc has started")
return true
}
// Stop is called to stop the element. The file is closed and the local values are zeroed out.
func (f *FileSrc) Stop(self *base.GstBaseSrc) bool {
if !f.state.started {
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorSettings, "FileSrc is not started", "")
return false
}
if err := f.state.file.Close(); err != nil {
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorClose, "Failed to close the source file", err.Error())
return false
}
f.state.file = nil
f.state.position = 0
f.state.started = false
self.Log(CAT, gst.LevelInfo, "GoFileSrc has stopped")
return true
}
// Fill is called to fill a pre-allocated buffer with the data at offset up to the given size.
// Since we declared that we are seekable, we need to support the provided offset not necessarily matching
// where we currently are in the file. This is why we store the position in the file locally.
func (f *FileSrc) Fill(self *base.GstBaseSrc, offset uint64, size uint, buffer *gst.Buffer) gst.FlowReturn {
if !f.state.started || f.state.file == nil {
self.ErrorMessage(gst.DomainCore, gst.CoreErrorFailed, "Not started yet", "")
return gst.FlowError
}
self.Log(CAT, gst.LevelLog, fmt.Sprintf("Request to fill buffer from offset %v with size %v", offset, size))
if f.state.position != offset {
self.Log(CAT, gst.LevelDebug, fmt.Sprintf("Seeking to new position at offset %v from previous position at offset %v", offset, f.state.position))
if _, err := f.state.file.Seek(int64(offset), 0); err != nil {
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorSeek,
fmt.Sprintf("Failed to seek to %d in file", offset), err.Error())
return gst.FlowError
}
f.state.position = offset
}
bufmap := buffer.Map(gst.MapWrite)
if bufmap == nil {
self.ErrorMessage(gst.DomainLibrary, gst.LibraryErrorFailed, "Failed to map buffer", "")
return gst.FlowError
}
defer buffer.Unmap()
self.Log(CAT, gst.LevelLog, fmt.Sprintf("Reading %v bytes from offset %v in file into buffer at %v", size, f.state.position, bufmap.Data()))
if _, err := io.CopyN(bufmap.Writer(), f.state.file, int64(size)); err != nil {
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorRead,
fmt.Sprintf("Failed to read %d bytes from file at %d into buffer", size, offset), err.Error())
return gst.FlowError
}
buffer.SetSize(int64(size))
f.state.position = f.state.position + uint64(size)
self.Log(CAT, gst.LevelLog, fmt.Sprintf("Incremented current position to %v", f.state.position))
return gst.FlowOK
}
// URIHandler implementations are the methods required by the GstURIHandler interface.
// GetURI returns the currently configured URI
func (f *FileSrc) GetURI() string { return fmt.Sprintf("file://%s", f.settings.location) }
// GetURIType returns the types of URI this element supports.
func (f *FileSrc) GetURIType() gst.URIType { return gst.URISource }
// GetProtocols returns the protcols this element supports.
func (f *FileSrc) GetProtocols() []string { return []string{"file"} }
// SetURI should set the URI that this element is working on.
func (f *FileSrc) SetURI(uri string) (bool, error) {
if uri == "file://" {
return true, nil
}
err := f.setLocation(uri)
if err != nil {
return false, err
}
CAT.Log(gst.LevelInfo, fmt.Sprintf("Set `location` to %s via URIHandler", f.settings.location))
return true, nil
}

View File

@@ -1,167 +0,0 @@
package main
import (
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"net/http"
"os"
"github.com/go-gst/go-glib/glib"
"github.com/go-gst/go-gst/gst"
minio "github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
)
const (
accessKeyIDEnvVar = "MINIO_ACCESS_KEY_ID"
secretAccessKeyEnvVar = "MINIO_SECRET_ACCESS_KEY"
)
var (
defaultEndpoint = "play.min.io"
defaultUseTLS = true
defaultRegion = "us-east-1"
defaultInsecureSkipVerify = false
)
type settings struct {
endpoint string
useTLS bool
region string
bucket string
key string
accessKeyID string
secretAccessKey string
insecureSkipVerify bool
caCertFile string
partSize uint64
}
func (s *settings) safestring() string {
return fmt.Sprintf("%+v", &settings{
endpoint: s.endpoint,
useTLS: s.useTLS,
region: s.region,
bucket: s.bucket,
key: s.key,
insecureSkipVerify: s.insecureSkipVerify,
caCertFile: s.caCertFile,
})
}
func defaultSettings() *settings {
return &settings{
endpoint: defaultEndpoint,
useTLS: defaultUseTLS,
region: defaultRegion,
accessKeyID: os.Getenv(accessKeyIDEnvVar),
secretAccessKey: os.Getenv(secretAccessKeyEnvVar),
insecureSkipVerify: defaultInsecureSkipVerify,
partSize: defaultPartSize,
}
}
func getMinIOClient(settings *settings) (*minio.Client, error) {
transport := http.DefaultTransport.(*http.Transport).Clone()
if settings.useTLS {
if transport.TLSClientConfig == nil {
transport.TLSClientConfig = &tls.Config{}
}
if settings.caCertFile != "" {
certPool := x509.NewCertPool()
body, err := ioutil.ReadFile(settings.caCertFile)
if err != nil {
return nil, err
}
certPool.AppendCertsFromPEM(body)
transport.TLSClientConfig.RootCAs = certPool
}
transport.TLSClientConfig.InsecureSkipVerify = settings.insecureSkipVerify
}
return minio.New(settings.endpoint, &minio.Options{
Creds: credentials.NewStaticV4(settings.accessKeyID, settings.secretAccessKey, ""),
Secure: settings.useTLS,
Region: settings.region,
})
}
func setProperty(elem *gst.Element, properties []*glib.ParamSpec, settings *settings, id uint, value *glib.Value) {
prop := properties[id]
val, err := value.GoValue()
if err != nil {
elem.ErrorMessage(gst.DomainLibrary, gst.LibraryErrorSettings,
fmt.Sprintf("Could not coerce %v to go value", value), err.Error())
}
switch prop.Name() {
case "endpoint":
settings.endpoint = val.(string)
case "use-tls":
settings.useTLS = val.(bool)
case "tls-skip-verify":
settings.insecureSkipVerify = val.(bool)
case "ca-cert-file":
settings.caCertFile = val.(string)
case "region":
settings.region = val.(string)
case "bucket":
settings.bucket = val.(string)
case "key":
settings.key = val.(string)
case "access-key-id":
settings.accessKeyID = val.(string)
case "secret-access-key":
settings.secretAccessKey = val.(string)
case "part-size":
settings.partSize = val.(uint64)
}
}
func getProperty(elem *gst.Element, properties []*glib.ParamSpec, settings *settings, id uint) *glib.Value {
prop := properties[id]
var localVal interface{}
switch prop.Name() {
case "endpoint":
localVal = settings.endpoint
case "use-tls":
localVal = settings.useTLS
case "tls-skip-verify":
localVal = settings.insecureSkipVerify
case "ca-cert-file":
localVal = settings.caCertFile
case "region":
localVal = settings.region
case "bucket":
localVal = settings.bucket
case "key":
localVal = settings.key
case "access-key-id":
localVal = settings.accessKeyID
case "secret-access-key":
localVal = "<private>"
case "part-size":
localVal = settings.partSize
default:
elem.ErrorMessage(gst.DomainLibrary, gst.LibraryErrorSettings,
fmt.Sprintf("Cannot get invalid property %s", prop.Name()), "")
return nil
}
val, err := glib.GValue(localVal)
if err != nil {
elem.ErrorMessage(gst.DomainLibrary, gst.LibraryErrorFailed,
fmt.Sprintf("Could not convert %v to GValue", localVal),
err.Error(),
)
return nil
}
return val
}

View File

@@ -1,29 +0,0 @@
module github.com/go-gst/go-gst/examples/plugins/minio
go 1.22
require (
github.com/go-gst/go-glib v1.2.1
github.com/go-gst/go-gst v1.2.1
github.com/minio/minio-go/v7 v7.0.76
)
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/go-ini/ini v1.67.0 // indirect
github.com/goccy/go-json v0.10.3 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
github.com/mattn/go-pointer v0.0.1 // indirect
github.com/minio/md5-simd v1.1.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rs/xid v1.6.0 // indirect
golang.org/x/crypto v0.31.0 // indirect
golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e // indirect
golang.org/x/net v0.29.0 // indirect
golang.org/x/sys v0.28.0 // indirect
golang.org/x/text v0.21.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

View File

@@ -1,45 +0,0 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/go-gst/go-glib v1.2.1 h1:ibAr5N1NmuHmZ5RaCFjFjeUy0Rk3t3LgvGutmwBeR9E=
github.com/go-gst/go-glib v1.2.1/go.mod h1:JybIYeoHNwCkHGaBf1fHNIaM4sQTrJPkPLsi7dmPNOU=
github.com/go-gst/go-gst v1.2.1 h1:FqUFGFllbuC8LkQoqULgAui2ZS0VU1WEBCNekIMcBEE=
github.com/go-gst/go-gst v1.2.1/go.mod h1:OGPRsJdvNYCKjt3e4H4i8J6KVd2Wk5S2lzsEQ8mO1+g=
github.com/go-ini/ini v1.67.0 h1:z6ZrTEZqSWOTyH2FlglNbNgARyHG8oLW9gMELqKr06A=
github.com/go-ini/ini v1.67.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8=
github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA=
github.com/goccy/go-json v0.10.3/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA=
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.2.8 h1:+StwCXwm9PdpiEkPyzBXIy+M9KUb4ODm0Zarf1kS5BM=
github.com/klauspost/cpuid/v2 v2.2.8/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
github.com/mattn/go-pointer v0.0.1 h1:n+XhsuGeVO6MEAp7xyEukFINEa+Quek5psIR/ylA6o0=
github.com/mattn/go-pointer v0.0.1/go.mod h1:2zXcozF6qYGgmsG+SeTZz3oAbFLdD3OWqnUbNvJZAlc=
github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34=
github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM=
github.com/minio/minio-go/v7 v7.0.76 h1:9nxHH2XDai61cT/EFhyIw/wW4vJfpPNvl7lSFpRt+Ng=
github.com/minio/minio-go/v7 v7.0.76/go.mod h1:AVM3IUN6WwKzmwBxVdjzhH8xq+f57JSbbvzqvUzR6eg=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rs/xid v1.6.0 h1:fV591PaemRlL6JfRxGDEPl69wICngIQ3shQtzfy2gxU=
github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U=
golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e h1:I88y4caeGeuDQxgdoFPUq097j7kNfw6uvuiNxUBfcBk=
golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e/go.mod h1:akd2r19cwCdwSwWeIdzYQGa/EZZyqcOdwWiwj5L5eKQ=
golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo=
golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@@ -1,225 +0,0 @@
package main
import (
"fmt"
"io"
"os"
"strings"
"sync"
"github.com/go-gst/go-glib/glib"
"github.com/go-gst/go-gst/gst"
"github.com/go-gst/go-gst/gst/base"
)
var sinkCAT = gst.NewDebugCategory(
"miniosink",
gst.DebugColorNone,
"MinIOSink Element",
)
type minioSink struct {
settings *settings
state *sinkstate
writer *seekWriter
mux sync.Mutex
}
type sinkstate struct {
started bool
}
func (m *minioSink) New() glib.GoObjectSubclass {
srcCAT.Log(gst.LevelLog, "Creating new minioSink object")
return &minioSink{
settings: defaultSettings(),
state: &sinkstate{},
}
}
func (m *minioSink) ClassInit(klass *glib.ObjectClass) {
class := gst.ToElementClass(klass)
sinkCAT.Log(gst.LevelLog, "Initializing miniosink class")
class.SetMetadata(
"MinIO Sink",
"Sink/File",
"Write stream to a MinIO object",
"Avi Zimmerman <avi.zimmerman@gmail.com>",
)
sinkCAT.Log(gst.LevelLog, "Adding sink pad template and properties to class")
class.AddPadTemplate(gst.NewPadTemplate(
"sink",
gst.PadDirectionSink,
gst.PadPresenceAlways,
gst.NewAnyCaps(),
))
class.InstallProperties(sinkProperties)
}
func (m *minioSink) Constructed(obj *glib.Object) { base.ToGstBaseSink(obj).SetSync(false) }
func (m *minioSink) SetProperty(self *glib.Object, id uint, value *glib.Value) {
setProperty(gst.ToElement(self), sinkProperties, m.settings, id, value)
}
func (m *minioSink) GetProperty(self *glib.Object, id uint) *glib.Value {
return getProperty(gst.ToElement(self), sinkProperties, m.settings, id)
}
func (m *minioSink) Query(self *base.GstBaseSink, query *gst.Query) bool {
switch query.Type() {
case gst.QuerySeeking:
self.Log(sinkCAT, gst.LevelDebug, "Answering seeking query")
query.SetSeeking(gst.FormatTime, true, 0, -1)
return true
}
return false
}
func (m *minioSink) Start(self *base.GstBaseSink) bool {
m.mux.Lock()
defer m.mux.Unlock()
if m.state.started {
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorSettings,
"MinIOSink is already started", "")
return false
}
if m.settings.bucket == "" {
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorSettings,
"No bucket configured on the miniosink", "")
return false
}
if m.settings.key == "" {
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorSettings,
"No bucket configured on the miniosink", "")
return false
}
self.Log(sinkCAT, gst.LevelDebug, m.settings.safestring())
if strings.HasPrefix(m.settings.accessKeyID, "env:") {
spl := strings.Split(m.settings.accessKeyID, "env:")
m.settings.accessKeyID = os.Getenv(spl[len(spl)-1])
}
if strings.HasPrefix(m.settings.secretAccessKey, "env:") {
spl := strings.Split(m.settings.secretAccessKey, "env:")
m.settings.secretAccessKey = os.Getenv(spl[len(spl)-1])
}
self.Log(sinkCAT, gst.LevelInfo, fmt.Sprintf("Creating new MinIO client for %s", m.settings.endpoint))
client, err := getMinIOClient(m.settings)
if err != nil {
self.Log(sinkCAT, gst.LevelError, err.Error())
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorFailed,
fmt.Sprintf("Failed to connect to MinIO endpoint %s", m.settings.endpoint), err.Error())
return false
}
self.Log(sinkCAT, gst.LevelInfo, "Initializing new MinIO writer")
m.writer = newSeekWriter(client, int64(m.settings.partSize), m.settings.bucket, m.settings.key)
m.state.started = true
self.Log(sinkCAT, gst.LevelInfo, "MinIOSink has started")
return true
}
func (m *minioSink) Stop(self *base.GstBaseSink) bool {
self.Log(sinkCAT, gst.LevelInfo, "Stopping MinIOSink")
m.mux.Lock()
defer m.mux.Unlock()
if !m.state.started {
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorSettings, "MinIOSink is not started", "")
return false
}
m.writer = nil
m.state.started = false
self.Log(sinkCAT, gst.LevelInfo, "MinIOSink has stopped")
return true
}
func (m *minioSink) Render(self *base.GstBaseSink, buffer *gst.Buffer) gst.FlowReturn {
m.mux.Lock()
defer m.mux.Unlock()
if !m.state.started {
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorSettings, "MinIOSink is not started", "")
return gst.FlowError
}
self.Log(sinkCAT, gst.LevelTrace, fmt.Sprintf("Rendering buffer %v", buffer))
if _, err := m.writer.Write(buffer.Bytes()); err != nil {
self.Log(sinkCAT, gst.LevelError, err.Error())
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorWrite, fmt.Sprintf("Failed to write data to minio buffer: %s", err.Error()), "")
return gst.FlowError
}
return gst.FlowOK
}
func (m *minioSink) Event(self *base.GstBaseSink, event *gst.Event) bool {
switch event.Type() {
case gst.EventTypeSegment:
segment := event.ParseSegment()
if segment.GetFormat() == gst.FormatBytes {
if uint64(m.writer.currentPosition) != segment.GetStart() {
m.mux.Lock()
self.Log(sinkCAT, gst.LevelInfo, fmt.Sprintf("Seeking to %d", segment.GetStart()))
if _, err := m.writer.Seek(int64(segment.GetStart()), io.SeekStart); err != nil {
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorFailed, err.Error(), "")
m.mux.Unlock()
return false
}
m.mux.Unlock()
} else {
self.Log(sinkCAT, gst.LevelDebug, "Ignored SEGMENT, no seek needed")
}
} else {
self.Log(sinkCAT, gst.LevelDebug, fmt.Sprintf("Ignored SEGMENT event of format %s", segment.GetFormat().String()))
}
case gst.EventTypeFlushStop:
self.Log(sinkCAT, gst.LevelInfo, "Flushing contents of writer and seeking back to start")
if m.writer.currentPosition != 0 {
m.mux.Lock()
if err := m.writer.flush(true); err != nil {
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorWrite, err.Error(), "")
m.mux.Unlock()
return false
}
if _, err := m.writer.Seek(0, io.SeekStart); err != nil {
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorFailed, err.Error(), "")
m.mux.Unlock()
return false
}
m.mux.Unlock()
}
case gst.EventTypeEOS:
self.Log(sinkCAT, gst.LevelInfo, "Received EOS, closing MinIO writer")
m.mux.Lock()
if err := m.writer.Close(); err != nil {
self.Log(sinkCAT, gst.LevelError, err.Error())
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorClose, fmt.Sprintf("Failed to close MinIO writer: %s", err.Error()), "")
m.mux.Unlock()
return false
}
m.mux.Unlock()
}
return self.ParentEvent(event)
}

View File

@@ -1,211 +0,0 @@
package main
import (
"context"
"fmt"
"io"
"os"
"strings"
"sync"
minio "github.com/minio/minio-go/v7"
"github.com/go-gst/go-glib/glib"
"github.com/go-gst/go-gst/gst"
"github.com/go-gst/go-gst/gst/base"
)
var srcCAT = gst.NewDebugCategory(
"miniosrc",
gst.DebugColorNone,
"MinIOSrc Element",
)
type minioSrc struct {
settings *settings
state *srcstate
}
type srcstate struct {
started bool
object *minio.Object
objInfo minio.ObjectInfo
mux sync.Mutex
}
func (m *minioSrc) New() glib.GoObjectSubclass {
srcCAT.Log(gst.LevelLog, "Creating new minioSrc object")
return &minioSrc{
settings: defaultSettings(),
state: &srcstate{},
}
}
func (m *minioSrc) ClassInit(klass *glib.ObjectClass) {
class := gst.ToElementClass(klass)
srcCAT.Log(gst.LevelLog, "Initializing miniosrc class")
class.SetMetadata(
"MinIO Source",
"Source/File",
"Read stream from a MinIO object",
"Avi Zimmerman <avi.zimmerman@gmail.com>",
)
srcCAT.Log(gst.LevelLog, "Adding src pad template and properties to class")
class.AddPadTemplate(gst.NewPadTemplate(
"src",
gst.PadDirectionSource,
gst.PadPresenceAlways,
gst.NewAnyCaps(),
))
class.InstallProperties(srcProperties)
}
func (m *minioSrc) SetProperty(self *glib.Object, id uint, value *glib.Value) {
setProperty(gst.ToElement(self), srcProperties, m.settings, id, value)
}
func (m *minioSrc) GetProperty(self *glib.Object, id uint) *glib.Value {
return getProperty(gst.ToElement(self), srcProperties, m.settings, id)
}
func (m *minioSrc) Constructed(self *glib.Object) {
base.ToGstBaseSrc(self).Log(srcCAT, gst.LevelLog, "Setting format of GstBaseSrc to bytes")
base.ToGstBaseSrc(self).SetFormat(gst.FormatBytes)
}
func (m *minioSrc) IsSeekable(*base.GstBaseSrc) bool { return true }
func (m *minioSrc) GetSize(self *base.GstBaseSrc) (bool, int64) {
if !m.state.started {
return false, 0
}
return true, m.state.objInfo.Size
}
func (m *minioSrc) Start(self *base.GstBaseSrc) bool {
if m.state.started {
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorFailed, "MinIOSrc is already started", "")
return false
}
if m.settings.bucket == "" {
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorFailed, "No source bucket defined", "")
return false
}
if m.settings.key == "" {
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorFailed, "No object key defined", "")
return false
}
m.state.mux.Lock()
if strings.HasPrefix(m.settings.accessKeyID, "env:") {
spl := strings.Split(m.settings.accessKeyID, "env:")
m.settings.accessKeyID = os.Getenv(spl[len(spl)-1])
}
if strings.HasPrefix(m.settings.secretAccessKey, "env:") {
spl := strings.Split(m.settings.secretAccessKey, "env:")
m.settings.secretAccessKey = os.Getenv(spl[len(spl)-1])
}
client, err := getMinIOClient(m.settings)
if err != nil {
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorFailed,
fmt.Sprintf("Failed to connect to MinIO endpoint %s", m.settings.endpoint), err.Error())
m.state.mux.Unlock()
return false
}
self.Log(srcCAT, gst.LevelInfo, fmt.Sprintf("Requesting %s/%s from %s", m.settings.bucket, m.settings.key, m.settings.endpoint))
m.state.object, err = client.GetObject(context.Background(), m.settings.bucket, m.settings.key, minio.GetObjectOptions{})
if err != nil {
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorOpenRead,
fmt.Sprintf("Failed to retrieve object %q from bucket %q", m.settings.key, m.settings.bucket), err.Error())
m.state.mux.Unlock()
return false
}
self.Log(srcCAT, gst.LevelInfo, "Getting HEAD for object")
m.state.objInfo, err = m.state.object.Stat()
if err != nil {
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorOpenRead,
fmt.Sprintf("Failed to stat object %q in bucket %q: %s", m.settings.key, m.settings.bucket, err.Error()), "")
m.state.mux.Unlock()
return false
}
self.Log(srcCAT, gst.LevelInfo, fmt.Sprintf("%+v", m.state.objInfo))
m.state.started = true
m.state.mux.Unlock()
self.StartComplete(gst.FlowOK)
self.Log(srcCAT, gst.LevelInfo, "MinIOSrc has started")
return true
}
func (m *minioSrc) Stop(self *base.GstBaseSrc) bool {
self.Log(srcCAT, gst.LevelInfo, "Stopping MinIOSrc")
m.state.mux.Lock()
defer m.state.mux.Unlock()
if !m.state.started {
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorSettings, "MinIOSrc is not started", "")
return false
}
if err := m.state.object.Close(); err != nil {
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorClose, "Failed to close the bucket object", err.Error())
return false
}
m.state.object = nil
m.state.started = false
self.Log(srcCAT, gst.LevelInfo, "MinIOSrc has stopped")
return true
}
func (m *minioSrc) Fill(self *base.GstBaseSrc, offset uint64, size uint, buffer *gst.Buffer) gst.FlowReturn {
if !m.state.started || m.state.object == nil {
self.ErrorMessage(gst.DomainCore, gst.CoreErrorFailed, "MinIOSrc is not started yet", "")
return gst.FlowError
}
self.Log(srcCAT, gst.LevelLog, fmt.Sprintf("Request to fill buffer from offset %v with size %v", offset, size))
m.state.mux.Lock()
defer m.state.mux.Unlock()
data := make([]byte, size)
read, err := m.state.object.ReadAt(data, int64(offset))
if err != nil && err != io.EOF {
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorRead,
fmt.Sprintf("Failed to read %d bytes from object at offset %d", size, offset), err.Error())
return gst.FlowError
}
if read < int(size) {
self.Log(srcCAT, gst.LevelDebug, fmt.Sprintf("Only read %d bytes from object, trimming", read))
trim := make([]byte, read)
copy(trim, data)
data = trim
}
bufmap := buffer.Map(gst.MapWrite)
if bufmap == nil {
self.ErrorMessage(gst.DomainLibrary, gst.LibraryErrorFailed, "Failed to map buffer", "")
return gst.FlowError
}
defer buffer.Unmap()
bufmap.WriteData(data)
buffer.SetSize(int64(read))
return gst.FlowOK
}

View File

@@ -1,71 +0,0 @@
// This example demonstrates a src element that reads from objects in a minio bucket.
// Since minio implements the S3 API this plugin could also be used for S3 buckets by
// setting the correct endpoints and credentials.
//
// By default this plugin will use the credentials set in the environment at MINIO_ACCESS_KEY_ID
// and MINIO_SECRET_ACCESS_KEY however these can also be set on the element directly.
//
// In order to build the plugin for use by GStreamer, you can do the following:
//
// $ go build -o libgstminio.so -buildmode c-shared .
package main
import "C"
import (
"unsafe"
"github.com/go-gst/go-gst/gst"
"github.com/go-gst/go-gst/gst/base"
)
// The metadata for this plugin
var pluginMeta = &gst.PluginMetadata{
MajorVersion: gst.VersionMajor,
MinorVersion: gst.VersionMinor,
Name: "minio-plugins",
Description: "GStreamer plugins for reading and writing from Minio",
Version: "v0.0.1",
License: gst.LicenseLGPL,
Source: "gst-pipeline-operator",
Package: "plugins",
Origin: "https://github.com/go-gst/gst-pipeline-operator",
ReleaseDate: "2021-01-12",
// The init function is called to register elements provided by the plugin.
Init: func(plugin *gst.Plugin) bool {
if ok := gst.RegisterElement(
plugin,
// The name of the element
"miniosrc",
// The rank of the element
gst.RankNone,
// The GoElement implementation for the element
&minioSrc{},
// The base subclass this element extends
base.ExtendsBaseSrc,
); !ok {
return ok
}
if ok := gst.RegisterElement(
plugin,
// The name of the element
"miniosink",
// The rank of the element
gst.RankNone,
// The GoElement implementation for the element
&minioSink{},
// The base subclass this element extends
base.ExtendsBaseSink,
); !ok {
return ok
}
return true
},
}
func main() {}
//export gst_plugin_minio_get_desc
func gst_plugin_minio_get_desc() unsafe.Pointer { return pluginMeta.Export() }

View File

@@ -1,153 +0,0 @@
package main
import (
"math"
"github.com/go-gst/go-glib/glib"
)
// Even though there is overlap in properties, they have to be declared twice.
// This is because the GType system doesn't allow for GObjects to share pointers
// to the exact same GParamSpecs.
const defaultPartSize = 1024 * 1024 * 128
const minPartSize = 1024 * 1024 * 5
var sinkProperties = []*glib.ParamSpec{
glib.NewStringParam(
"endpoint",
"S3 API Endpoint",
"The endpoint for the S3 API server",
&defaultEndpoint,
glib.ParameterReadWrite,
),
glib.NewBoolParam(
"use-tls",
"Use TLS",
"Use HTTPS for API requests",
defaultUseTLS,
glib.ParameterReadWrite,
),
glib.NewBoolParam(
"tls-skip-verify",
"Disable TLS Verification",
"Don't verify the signature of the MinIO server certificate",
defaultInsecureSkipVerify,
glib.ParameterReadWrite,
),
glib.NewStringParam(
"ca-cert-file",
"PEM CA Cert Bundle",
"A file containing a PEM certificate bundle to use to verify the MinIO certificate",
nil,
glib.ParameterReadWrite,
),
glib.NewStringParam(
"region",
"Bucket region",
"The region where the bucket is",
&defaultRegion,
glib.ParameterReadWrite,
),
glib.NewStringParam(
"bucket",
"Bucket name",
"The name of the MinIO bucket",
nil,
glib.ParameterReadWrite,
),
glib.NewStringParam(
"key",
"Object key",
"The key of the object inside the bucket",
nil,
glib.ParameterReadWrite,
),
glib.NewStringParam(
"access-key-id",
"Access Key ID",
"The access key ID to use for authentication",
nil,
glib.ParameterReadWrite,
),
glib.NewStringParam(
"secret-access-key",
"Secret Access Key",
"The secret access key to use for authentication",
nil,
glib.ParameterReadWrite,
),
glib.NewUint64Param(
"part-size",
"Part Size",
"Size for each part in the multi-part upload",
minPartSize, math.MaxInt64, defaultPartSize,
glib.ParameterReadWrite,
),
}
var srcProperties = []*glib.ParamSpec{
glib.NewStringParam(
"endpoint",
"S3 API Endpoint",
"The endpoint for the S3 API server",
&defaultEndpoint,
glib.ParameterReadWrite,
),
glib.NewBoolParam(
"use-tls",
"Use TLS",
"Use HTTPS for API requests",
defaultUseTLS,
glib.ParameterReadWrite,
),
glib.NewBoolParam(
"tls-skip-verify",
"Disable TLS Verification",
"Don't verify the signature of the MinIO server certificate",
defaultInsecureSkipVerify,
glib.ParameterReadWrite,
),
glib.NewStringParam(
"ca-cert-file",
"PEM CA Cert Bundle",
"A file containing a PEM certificate bundle to use to verify the MinIO certificate",
nil,
glib.ParameterReadWrite,
),
glib.NewStringParam(
"region",
"Bucket region",
"The region where the bucket is",
&defaultRegion,
glib.ParameterReadWrite,
),
glib.NewStringParam(
"bucket",
"Bucket name",
"The name of the MinIO bucket",
nil,
glib.ParameterReadWrite,
),
glib.NewStringParam(
"key",
"Object key",
"The key of the object inside the bucket",
nil,
glib.ParameterReadWrite,
),
glib.NewStringParam(
"access-key-id",
"Access Key ID",
"The access key ID to use for authentication. Use env: prefix to denote an environment variable.",
nil,
glib.ParameterReadWrite,
),
glib.NewStringParam(
"secret-access-key",
"Secret Access Key",
"The secret access key to use for authentication. Use env: prefix to denote an environment variable.",
nil,
glib.ParameterReadWrite,
),
}

View File

@@ -1,190 +0,0 @@
package main
import (
"bytes"
"context"
"crypto/sha256"
"fmt"
"io/ioutil"
"path"
minio "github.com/minio/minio-go/v7"
)
type seekWriter struct {
// The current position in the buffer
currentPosition int64
// The size of each part to upload
partSize int64
// A map of in memory parts to their content
parts map[int64][]byte
// A map of uploaded parts to the checksum at time of upload
uploadedParts map[int64]string
// A local reference to the minio client
client *minio.Client
bucket, key string
}
func newSeekWriter(client *minio.Client, partsize int64, bucket, key string) *seekWriter {
return &seekWriter{
currentPosition: 0,
partSize: partsize,
parts: make(map[int64][]byte),
uploadedParts: make(map[int64]string),
client: client,
bucket: bucket, key: key,
}
}
func (s *seekWriter) Write(p []byte) (int, error) {
wrote, err := s.buffer(0, p)
if err != nil {
return wrote, err
}
return wrote, s.flush(false)
}
func (s *seekWriter) Seek(offset int64, whence int) (int64, error) {
// Only needs to support SeekStart
s.currentPosition = offset
return s.currentPosition, nil
}
func (s *seekWriter) Close() error {
if err := s.flush(true); err != nil {
return err
}
if len(s.uploadedParts) == 0 {
return nil
}
opts := make([]minio.CopySrcOptions, len(s.uploadedParts))
for i := 0; i < len(opts); i++ {
opts[i] = minio.CopySrcOptions{
Bucket: s.bucket,
Object: s.keyForPart(int64(i)),
}
}
_, err := s.client.ComposeObject(context.Background(), minio.CopyDestOptions{
Bucket: s.bucket,
Object: s.key,
}, opts...)
if err != nil {
return err
}
for _, opt := range opts {
if err := s.client.RemoveObject(context.Background(), opt.Bucket, opt.Object, minio.RemoveObjectOptions{}); err != nil {
return err
}
}
return nil
}
func (s *seekWriter) buffer(from int, p []byte) (int, error) {
currentPart := s.currentPosition / s.partSize
writeat := s.currentPosition % s.partSize
lenToWrite := int64(len(p))
var buf []byte
var ok bool
if buf, ok = s.parts[currentPart]; !ok {
if _, ok := s.uploadedParts[currentPart]; !ok {
s.parts[currentPart] = make([]byte, writeat+lenToWrite)
buf = s.parts[currentPart]
} else {
var err error
buf, err = s.fetchRemotePart(currentPart)
if err != nil {
return from, err
}
}
}
if lenToWrite+writeat > s.partSize {
newbuf := make([]byte, s.partSize)
copy(newbuf, buf)
s.parts[currentPart] = newbuf
buf = newbuf
} else if lenToWrite+writeat > int64(len(buf)) {
newbuf := make([]byte, lenToWrite+writeat)
copy(newbuf, buf)
s.parts[currentPart] = newbuf
buf = newbuf
}
wrote := copy(buf[writeat:], p)
s.currentPosition += int64(wrote)
if int64(wrote) != lenToWrite {
return s.buffer(from+wrote, p[wrote:])
}
return from + wrote, nil
}
func (s *seekWriter) flush(all bool) error {
for part, buf := range s.parts {
if all || int64(len(buf)) == s.partSize {
if err := s.uploadPart(part, buf); err != nil {
return err
}
continue
}
if !all {
continue
}
if err := s.uploadPart(part, buf); err != nil {
return err
}
}
return nil
}
func (s *seekWriter) uploadPart(part int64, data []byte) error {
h := sha256.New()
if _, err := h.Write(data); err != nil {
return err
}
datasum := fmt.Sprintf("%x", h.Sum(nil))
if sum, ok := s.uploadedParts[part]; ok && sum == datasum {
return nil
}
_, err := s.client.PutObject(context.Background(),
s.bucket, s.keyForPart(part),
bytes.NewReader(data), int64(len(data)),
minio.PutObjectOptions{
ContentType: "application/octet-stream",
},
)
if err != nil {
return err
}
delete(s.parts, part)
s.uploadedParts[part] = datasum
return nil
}
func (s *seekWriter) fetchRemotePart(part int64) ([]byte, error) {
object, err := s.client.GetObject(context.Background(), s.bucket, s.keyForPart(part), minio.GetObjectOptions{})
if err != nil {
return nil, err
}
body, err := ioutil.ReadAll(object)
if err != nil {
return nil, err
}
s.parts[part] = body
return body, nil
}
func (s *seekWriter) keyForPart(part int64) string {
if path.Dir(s.key) == "" {
return fmt.Sprintf("%s_tmp/%d", s.key, part)
}
return path.Join(
path.Dir(s.key),
fmt.Sprintf("%s_tmp/%d", path.Base(s.key), part),
)
}

View File

@@ -1,11 +0,0 @@
package common
import "fmt"
var FinalizersCalled int = 0
func AssertFinalizersCalled(x int) {
if FinalizersCalled != x {
panic(fmt.Sprintf("finalizers did not run correctly, memory leak, wanted: %d, got: %d", x, FinalizersCalled))
}
}

View File

@@ -1,9 +0,0 @@
package common
func Must[T any](v T, err error) T {
if err != nil {
panic("got error:" + err.Error())
}
return v
}

View File

@@ -3,85 +3,40 @@ package custombin
import (
"time"
"github.com/go-gst/go-glib/glib"
"github.com/go-gst/go-gst/examples/plugins/registered_elements/internal/common"
"github.com/go-gst/go-gst/gst"
"github.com/go-gst/go-gst/pkg/gst"
)
type customBin struct {
// self *gst.Bin
source1 *gst.Element
source2 *gst.Element
mixer *gst.Element
gst.Bin // parent object must be first embedded field
source1 gst.Elementer
source2 gst.Elementer
mixer gst.Elementer
}
// ClassInit is the place where you define pads and properties
func (*customBin) ClassInit(klass *glib.ObjectClass) {
class := gst.ToElementClass(klass)
class.SetMetadata(
"custom test source",
"Src/Test",
"Demo source bin with volume",
"Wilhelm Bartel <bartel.wilhelm@gmail.com>",
)
class.AddPadTemplate(gst.NewPadTemplate(
"src",
gst.PadDirectionSource,
gst.PadPresenceAlways,
gst.NewCapsFromString("audio/x-raw,channels=2,rate=48000"),
))
}
// SetProperty gets called for every property. The id is the index in the slice defined above.
func (s *customBin) SetProperty(self *glib.Object, id uint, value *glib.Value) {}
// GetProperty is called to retrieve the value of the property at index `id` in the properties
// slice provided at ClassInit.
func (o *customBin) GetProperty(self *glib.Object, id uint) *glib.Value {
return nil
}
// New is called by the bindings to create a new instance of your go element. Use this to initialize channels, maps, etc.
//
// Think of New like the constructor of your struct
func (*customBin) New() glib.GoObjectSubclass {
return &customBin{}
}
// InstanceInit should initialize the element. Keep in mind that the properties are not yet present. When this is called.
func (s *customBin) InstanceInit(instance *glib.Object) {
self := gst.ToGstBin(instance)
s.source1 = common.Must(gst.NewElementWithProperties("gocustomsrc", map[string]interface{}{
// init should initialize the element. Keep in mind that the properties are not yet present. When this is called.
func (bin *customBin) init() {
bin.source1 = gst.ElementFactoryMakeWithProperties("gocustomsrc", map[string]interface{}{
"duration": int64(5 * time.Second),
}))
s.source2 = common.Must(gst.NewElementWithProperties("gocustomsrc", map[string]interface{}{
})
bin.source2 = gst.ElementFactoryMakeWithProperties("gocustomsrc", map[string]interface{}{
"duration": int64(10 * time.Second),
}))
})
s.mixer = common.Must(gst.NewElement("audiomixer"))
bin.mixer = gst.ElementFactoryMake("audiomixer", "")
klass := instance.Class()
class := gst.ToElementClass(klass)
self.AddMany(
s.source1,
s.source2,
s.mixer,
bin.AddMany(
bin.source1,
bin.source2,
bin.mixer,
)
srcpad := s.mixer.GetStaticPad("src")
srcpad := bin.mixer.StaticPad("src")
ghostpad := gst.NewGhostPadFromTemplate("src", srcpad, class.GetPadTemplate("src"))
ghostpad := gst.NewGhostPadFromTemplate("src", srcpad, bin.PadTemplate("src"))
s.source1.Link(s.mixer)
s.source2.Link(s.mixer)
bin.source1.Link(bin.mixer)
bin.source2.Link(bin.mixer)
self.AddPad(ghostpad.Pad)
}
func (s *customBin) Constructed(o *glib.Object) {}
func (s *customBin) Finalize(o *glib.Object) {
common.FinalizersCalled++
bin.AddPad(&ghostpad.Pad)
}

View File

@@ -1,22 +1,45 @@
package custombin
import (
"github.com/go-gst/go-gst/gst"
"github.com/diamondburned/gotk4/pkg/core/glib"
"github.com/go-gst/go-gst/pkg/gst"
)
// Register needs to be called after gst.Init() to make the gocustombin available in the standard
// gst element registry. After this call the element can be used like any other gstreamer element
func Register() bool {
return gst.RegisterElement(
registered := glib.RegisterSubclassWithConstructor[*customBin](
func() *customBin {
return &customBin{}
},
glib.WithOverrides[*customBin, gst.BinOverrides](func(b *customBin) gst.BinOverrides {
return gst.BinOverrides{}
}),
glib.WithClassInit[*gst.BinClass](func(bc *gst.BinClass) {
bc.ParentClass().SetStaticMetadata(
"custom test source",
"Src/Test",
"Demo source bin with volume",
"Wilhelm Bartel <bartel.wilhelm@gmail.com>",
)
bc.ParentClass().AddPadTemplate(gst.NewPadTemplate(
"src",
gst.PadSrc,
gst.PadAlways,
gst.CapsFromString("audio/x-raw,channels=2,rate=48000"),
))
}),
)
return gst.ElementRegister(
// no plugin:
nil,
// The name of the element
"gocustombin",
// The rank of the element
gst.RankNone,
uint(gst.RankNone),
// The GoElement implementation for the element
&customBin{},
// The base subclass this element extends
gst.ExtendsBin,
registered.Type(),
)
}

View File

@@ -1,13 +1,10 @@
package customsrc
import (
"fmt"
"math"
"time"
"github.com/go-gst/go-glib/glib"
"github.com/go-gst/go-gst/examples/plugins/registered_elements/internal/common"
"github.com/go-gst/go-gst/gst"
"github.com/go-gst/go-gst/pkg/gst"
)
// default: 1024, this value makes it easier to calculate num buffers with the sample rate
@@ -15,125 +12,44 @@ const samplesperbuffer = 4800
const samplerate = 48000
var properties = []*glib.ParamSpec{
glib.NewInt64Param(
"duration",
"duration",
"duration the source",
0,
math.MaxInt64,
0,
glib.ParameterReadWrite,
),
}
type customSrc struct {
// self *gst.Bin
source *gst.Element
volume *gst.Element
gst.Bin // parent must be embedded as the first field
duration time.Duration
}
source gst.Elementer
volume gst.Elementer
// ClassInit is the place where you define pads and properties
func (*customSrc) ClassInit(klass *glib.ObjectClass) {
class := gst.ToElementClass(klass)
class.SetMetadata(
"custom test source",
"Src/Test",
"Demo source bin with volume",
"Wilhelm Bartel <bartel.wilhelm@gmail.com>",
)
class.AddPadTemplate(gst.NewPadTemplate(
"src",
gst.PadDirectionSource,
gst.PadPresenceAlways,
gst.NewCapsFromString(fmt.Sprintf("audio/x-raw,channels=2,rate=%d", samplerate)),
))
class.InstallProperties(properties)
}
// SetProperty gets called for every property. The id is the index in the slice defined above.
func (s *customSrc) SetProperty(self *glib.Object, id uint, value *glib.Value) {
param := properties[id]
bin := gst.ToGstBin(self)
switch param.Name() {
case "duration":
state := bin.GetCurrentState()
if !(state == gst.StateNull || state != gst.StateReady) {
return
}
gv, _ := value.GoValue()
durI, _ := gv.(int64)
s.duration = time.Duration(durI)
s.updateSource()
}
}
// GetProperty is called to retrieve the value of the property at index `id` in the properties
// slice provided at ClassInit.
func (o *customSrc) GetProperty(self *glib.Object, id uint) *glib.Value {
param := properties[id]
switch param.Name() {
case "duration":
v, _ := glib.GValue(int64(o.duration))
return v
}
return nil
}
func (*customSrc) New() glib.GoObjectSubclass {
return &customSrc{}
Duration time.Duration `glib:"duration"`
}
// InstanceInit should initialize the element. Keep in mind that the properties are not yet present. When this is called.
func (s *customSrc) InstanceInit(instance *glib.Object) {
self := gst.ToGstBin(instance)
func (bin *customSrc) init() {
bin.source = gst.ElementFactoryMake("audiotestsrc", "")
bin.volume = gst.ElementFactoryMake("volume", "")
s.source = common.Must(gst.NewElement("audiotestsrc"))
s.volume = common.Must(gst.NewElement("volume"))
klass := instance.Class()
class := gst.ToElementClass(klass)
self.AddMany(
s.source,
s.volume,
bin.AddMany(
bin.source,
bin.volume,
)
srcpad := s.volume.GetStaticPad("src")
srcpad := bin.volume.StaticPad("src")
ghostpad := gst.NewGhostPadFromTemplate("src", srcpad, class.GetPadTemplate("src"))
ghostpad := gst.NewGhostPadFromTemplate("src", srcpad, bin.PadTemplate("src"))
gst.ElementLinkMany(
s.source,
s.volume,
gst.LinkMany(
bin.source,
bin.volume,
)
self.AddPad(ghostpad.Pad)
bin.AddPad(&ghostpad.Pad)
s.updateSource()
}
func (s *customSrc) Constructed(o *glib.Object) {}
func (s *customSrc) Finalize(o *glib.Object) {
common.FinalizersCalled++
bin.updateSource()
}
// updateSource will get called to update the audiotestsrc when a property changes
func (s *customSrc) updateSource() {
if s.source != nil {
numBuffers := (float64(s.duration / time.Second)) / (float64(samplesperbuffer) / float64(samplerate))
numBuffers := (float64(s.Duration / time.Second)) / (float64(samplesperbuffer) / float64(samplerate))
s.source.SetProperty("num-buffers", int(math.Ceil(numBuffers)))
s.source.SetObjectProperty("num-buffers", int(math.Ceil(numBuffers)))
}
}

View File

@@ -1,22 +1,44 @@
package customsrc
import (
"github.com/go-gst/go-gst/gst"
"github.com/diamondburned/gotk4/pkg/core/glib"
"github.com/go-gst/go-gst/pkg/gst"
)
// Register needs to be called after gst.Init() to make the gocustomsrc available in the standard
// gst element registry. After this call the element can be used like any other gstreamer element
func Register() bool {
return gst.RegisterElement(
registered := glib.RegisterSubclassWithConstructor[*customSrc](
func() *customSrc {
return &customSrc{}
},
glib.WithOverrides[*customSrc, gst.BinOverrides](func(b *customSrc) gst.BinOverrides {
return gst.BinOverrides{}
}),
glib.WithClassInit[*gst.BinClass](func(bc *gst.BinClass) {
bc.ParentClass().SetStaticMetadata(
"custom test source",
"Src/Test",
"Demo source bin with volume",
"Wilhelm Bartel <bartel.wilhelm@gmail.com>",
)
bc.ParentClass().AddPadTemplate(gst.NewPadTemplate(
"src",
gst.PadSrc,
gst.PadAlways,
gst.CapsFromString("audio/x-raw,channels=2,rate=48000"),
))
}),
)
return gst.ElementRegister(
// no plugin:
nil,
// The name of the element
"gocustomsrc",
// The rank of the element
gst.RankNone,
// The GoElement implementation for the element
&customSrc{},
// The base subclass this element extends
gst.ExtendsBin,
uint(gst.RankNone),
registered.Type(),
)
}

View File

@@ -6,13 +6,12 @@ import (
"os"
"os/signal"
"path/filepath"
"runtime/pprof"
"time"
"github.com/go-gst/go-glib/glib"
"github.com/go-gst/go-gst/examples/plugins/registered_elements/internal/common"
"github.com/diamondburned/gotk4/pkg/glib/v2"
"github.com/go-gst/go-gst/examples/plugins/registered_elements/internal/custombin"
"github.com/go-gst/go-gst/examples/plugins/registered_elements/internal/customsrc"
"github.com/go-gst/go-gst/gst"
"github.com/go-gst/go-gst/pkg/gst"
)
func run(ctx context.Context) error {
@@ -25,32 +24,34 @@ func run(ctx context.Context) error {
return err
}
gst.Init(nil)
gst.Init()
customsrc.Register()
custombin.Register()
systemclock := gst.ObtainSystemClock()
systemclock := gst.SystemClockObtain()
pipeline, err := gst.NewPipelineFromString("gocustombin ! fakesink sync=true")
ret, err := gst.ParseLaunch("gocustombin ! fakesink sync=true")
if err != nil {
return err
}
pipeline.ForceClock(systemclock.Clock)
pipeline := ret.(*gst.Pipeline)
bus := pipeline.GetBus()
pipeline.UseClock(systemclock)
bus := pipeline.Bus()
mainloop := glib.NewMainLoop(glib.MainContextDefault(), false)
pipeline.SetState(gst.StatePlaying)
bus.AddWatch(func(msg *gst.Message) bool {
bus.AddWatch(0, func(bus *gst.Bus, msg *gst.Message) bool {
switch msg.Type() {
case gst.MessageStateChanged:
old, new := msg.ParseStateChanged()
dot := pipeline.DebugBinToDotData(gst.DebugGraphShowVerbose)
old, new, _ := msg.ParseStateChanged()
dot := gst.DebugBinToDotData(&pipeline.Bin, gst.DebugGraphShowVerbose)
f, err := os.OpenFile(filepath.Join(wd, fmt.Sprintf("pipeline-%s-to-%s.dot", old, new)), os.O_TRUNC|os.O_CREATE|os.O_RDWR, 0600)
@@ -69,14 +70,12 @@ func run(ctx context.Context) error {
return false
}
case gst.MessageEOS:
fmt.Println(msg.String())
case gst.MessageEos:
fmt.Println("reached EOS")
cancel()
return false
}
// the String method is expensive and should not be used in prodution:
fmt.Println(msg.String())
return true
})
@@ -86,7 +85,7 @@ func run(ctx context.Context) error {
mainloop.Quit()
pipeline.BlockSetState(gst.StateNull)
pipeline.BlockSetState(gst.StateNull, gst.ClockTime(time.Second))
gst.Deinit()
@@ -101,14 +100,4 @@ func main() {
if err != nil {
fmt.Fprintln(os.Stderr, err)
}
// this is very helpful to find memory leaks, see github.com/go-gst/asanutils
// asanutils.CheckLeaks()
prof := pprof.Lookup("go-glib-reffed-objects")
prof.WriteTo(os.Stdout, 1)
// we are creating 3 custom elements in total. If this panics, then the go struct will memory leak
common.AssertFinalizersCalled(3)
}

View File

@@ -1,14 +0,0 @@
module github.com/go-gst/go-gst/examples/plugins/websocketsrc
go 1.22
require (
github.com/go-gst/go-glib v1.2.1
github.com/go-gst/go-gst v1.2.1
golang.org/x/net v0.29.0
)
require (
github.com/mattn/go-pointer v0.0.1 // indirect
golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e // indirect
)

View File

@@ -1,10 +0,0 @@
github.com/go-gst/go-glib v1.2.1 h1:ibAr5N1NmuHmZ5RaCFjFjeUy0Rk3t3LgvGutmwBeR9E=
github.com/go-gst/go-glib v1.2.1/go.mod h1:JybIYeoHNwCkHGaBf1fHNIaM4sQTrJPkPLsi7dmPNOU=
github.com/go-gst/go-gst v1.2.1 h1:FqUFGFllbuC8LkQoqULgAui2ZS0VU1WEBCNekIMcBEE=
github.com/go-gst/go-gst v1.2.1/go.mod h1:OGPRsJdvNYCKjt3e4H4i8J6KVd2Wk5S2lzsEQ8mO1+g=
github.com/mattn/go-pointer v0.0.1 h1:n+XhsuGeVO6MEAp7xyEukFINEa+Quek5psIR/ylA6o0=
github.com/mattn/go-pointer v0.0.1/go.mod h1:2zXcozF6qYGgmsG+SeTZz3oAbFLdD3OWqnUbNvJZAlc=
golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e h1:I88y4caeGeuDQxgdoFPUq097j7kNfw6uvuiNxUBfcBk=
golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e/go.mod h1:akd2r19cwCdwSwWeIdzYQGa/EZZyqcOdwWiwj5L5eKQ=
golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo=
golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0=

View File

@@ -1,552 +0,0 @@
// This is a GStreamer element implemented in Go that uses inbound data on a websocket
// connection as the source for the stream.
//
// In order to build the plugin for use by GStreamer, you can do the following:
//
// $ go generate
// $ go build -o libgstwebsocketsrc.so -buildmode c-shared .
//
// +plugin:Name=websocketsrc
// +plugin:Description=GStreamer Websocket Source
// +plugin:Version=v0.0.1
// +plugin:License=gst.LicenseLGPL
// +plugin:Source=go-gst
// +plugin:Package=examples
// +plugin:Origin=https://github.com/go-gst/go-gst
// +plugin:ReleaseDate=2021-01-10
//
// +element:Name=websocketsrc
// +element:Rank=gst.RankNone
// +element:Impl=websocketSrc
// +element:Subclass=gst.ExtendsElement
//
//go:generate gst-plugin-gen
package main
import (
"context"
"fmt"
"net/http"
"sync"
"time"
"github.com/go-gst/go-glib/glib"
"github.com/go-gst/go-gst/gst"
"golang.org/x/net/websocket"
)
// MaxPayloadSize to accept over websocket connections. Also the size of buffers.
const MaxPayloadSize = 1024
// Defaults //
var (
DefaultAddress string = "0.0.0.0"
DefaultPort int = 5000
DefaultRetrieveRemoteAddr bool = true
)
func main() {}
// CAT is the log category for the websocketsrc.
var CAT = gst.NewDebugCategory(
"websocketsrc",
gst.DebugColorNone,
"WebsocketSrc Element",
)
var properties = []*glib.ParamSpec{
glib.NewStringParam(
"address",
"Server Address",
"The address to bind the server to",
&DefaultAddress,
glib.ParameterReadWrite,
),
glib.NewIntParam(
"port",
"Server Port",
"The port to bind the server to",
1024, 65535,
DefaultPort,
glib.ParameterReadWrite,
),
// not implemented yet
glib.NewBoolParam(
"retrieve-remote-addr",
"Retrieve Remote Address",
"Include the remote client's address in the buffer metadata",
DefaultRetrieveRemoteAddr,
glib.ParameterReadWrite,
),
}
// Internals //
// A private settings struct to hold the values of the above parameters
type settings struct {
address string
port int
retrieveRemoteAddr bool
}
// Helper function to retrieve a settings object set to the default values.
func defaultSettings() *settings {
return &settings{
address: DefaultAddress,
port: DefaultPort,
retrieveRemoteAddr: DefaultRetrieveRemoteAddr,
}
}
// The internal state object
type state struct {
serverStarted, channelsStarted, sentInitialEvents, sentSegment bool
server *http.Server
srcpad *gst.Pad
bufferpool *gst.BufferPool
bufferchan chan []byte
stopchan chan struct{}
mux sync.Mutex
connmux sync.Mutex
}
// Base struct definition for the websocket src
type websocketSrc struct {
settings *settings
state *state
}
// prepare verifies the src pad has been added to the element, and then sets up server
// handlers and a buffer pool
func (w *websocketSrc) prepare(elem *gst.Element) error {
w.state.mux.Lock()
defer w.state.mux.Unlock()
// Make sure we have a srcpad
if w.state.srcpad == nil {
w.setupSrcPad(elem)
}
elem.Log(CAT, gst.LevelDebug, "Creating channels for goroutines")
// Setup a channel for handling buffers
w.state.bufferchan = make(chan []byte)
w.state.stopchan = make(chan struct{})
elem.Log(CAT, gst.LevelDebug, "Setting up the HTTP server")
// Setup the HTTP server instance
w.state.server = &http.Server{
Addr: fmt.Sprintf("%s:%d", w.settings.address, w.settings.port),
ReadTimeout: 300 * time.Second,
WriteTimeout: 300 * time.Second,
Handler: &websocket.Server{
// Don't check the Origin header
Handshake: func(*websocket.Config, *http.Request) error { return nil },
Handler: func(conn *websocket.Conn) {
elem.Log(CAT, gst.LevelInfo, fmt.Sprintf("Received new connection from: %s", conn.Request().RemoteAddr))
// Only allow a stream from one client at a time
w.state.connmux.Lock()
defer w.state.connmux.Unlock()
conn.PayloadType = websocket.BinaryFrame
conn.MaxPayloadBytes = MaxPayloadSize
for {
// Read the PayloadSize into a bytes slice
buf := make([]byte, conn.MaxPayloadBytes)
size, err := conn.Read(buf)
if err != nil {
elem.ErrorMessage(gst.DomainStream, gst.StreamErrorFailed, "Error reading bytes from client", err.Error())
return
}
// The goroutine listening for buffers will use the size to determine offsets,
// So trim the zeroes if we receive a buffer less than the requested size.
if size < conn.MaxPayloadBytes {
trimmed := make([]byte, size)
copy(trimmed, buf)
buf = trimmed
}
// Queue the buffer for processing
elem.Log(CAT, gst.LevelLog, fmt.Sprintf("Queueing %d bytes for processing", len(buf)))
w.state.bufferchan <- buf
}
},
},
}
elem.Log(CAT, gst.LevelDebug, "Configuring a buffer pool")
// Configure a buffer pool
w.state.bufferpool = gst.NewBufferPool()
cfg := w.state.bufferpool.GetConfig()
cfg.SetParams(nil, MaxPayloadSize, 0, 0)
w.state.bufferpool.SetConfig(cfg)
w.state.bufferpool.SetActive(true)
return nil
}
// This runs in a goroutine and checks for pause events or new buffers to push onto the pad.
func (w *websocketSrc) watchChannels(elem *gst.Element) {
for {
select {
case data, more := <-w.state.bufferchan:
if !more {
elem.Log(CAT, gst.LevelInfo, "Buffer channel has closed, stopping processing")
return
}
elem.Log(CAT, gst.LevelDebug, "Retrieving buffer from the pool")
buf, ret := w.state.bufferpool.AcquireBuffer(nil)
if ret != gst.FlowOK {
elem.ErrorMessage(gst.DomainResource, gst.ResourceErrorFailed,
fmt.Sprintf("Could not allocate buffer for data: %s", ret), "")
return
}
elem.Log(CAT, gst.LevelDebug, "Writing data to buffer")
buf.Map(gst.MapWrite).WriteData(data)
buf.Unmap()
buf.SetSize(int64(len(data)))
elem.Log(CAT, gst.LevelDebug, "Pushing buffer onto src pad")
w.pushPrelude(elem)
if ret := w.state.srcpad.Push(buf); ret == gst.FlowError {
elem.ErrorMessage(gst.DomainResource, gst.ResourceErrorFailed,
fmt.Sprintf("Failed to push buffer to srcpad: %s", ret), "")
return
}
case <-w.state.stopchan:
elem.Log(CAT, gst.LevelInfo, "Received signal on stopchan to halt buffer processing")
return
}
}
}
// start will start the websocket server and the buffer processing goroutines.
func (w *websocketSrc) start(elem *gst.Element) {
w.state.mux.Lock()
defer w.state.mux.Unlock()
if !w.state.serverStarted {
elem.Log(CAT, gst.LevelInfo, "Starting the HTTP server")
go w.startServer(elem)
w.state.serverStarted = true
}
if !w.state.channelsStarted {
elem.Log(CAT, gst.LevelInfo, "Starting channel goroutine")
go w.watchChannels(elem)
w.state.channelsStarted = true
}
elem.Log(CAT, gst.LevelInfo, "WebsocketSrc has started")
}
// starts the server, is called as a goroutine.
func (w *websocketSrc) startServer(elem *gst.Element) {
if err := w.state.server.ListenAndServe(); err != nil {
if err == http.ErrServerClosed {
elem.Log(CAT, gst.LevelInfo, "Server exited cleanly")
return
}
elem.ErrorMessage(gst.DomainResource, gst.ResourceErrorFailed, "Failed to start websocket server", err.Error())
}
}
// Checks if initial stream events were sent and pushes them onto the pad if needed.
func (w *websocketSrc) pushPrelude(elem *gst.Element) {
w.state.mux.Lock()
defer w.state.mux.Unlock()
if !w.state.sentInitialEvents {
elem.Log(CAT, gst.LevelDebug, "Sending stream start event")
streamid := "blahblahblah"
ev := gst.NewStreamStartEvent(streamid)
if res := w.state.srcpad.PushEvent(ev); !res {
elem.ErrorMessage(gst.DomainLibrary, gst.LibraryErrorFailed, "Failed to notify elements of stream start", "")
return
}
w.state.sentInitialEvents = true
}
if !w.state.sentSegment {
elem.Log(CAT, gst.LevelDebug, "Sending new segment event")
ev := gst.NewSegmentEvent(gst.NewFormattedSegment(gst.FormatTime))
if res := w.state.srcpad.PushEvent(ev); !res {
elem.ErrorMessage(gst.DomainLibrary, gst.LibraryErrorFailed, "Failed to notify elements of new segment", "")
return
}
w.state.sentSegment = true
}
}
// Stops the goroutines and the websocket server
func (w *websocketSrc) stop(elem *gst.Element) {
w.state.mux.Lock()
defer w.state.mux.Unlock()
if w.state.channelsStarted {
elem.Log(CAT, gst.LevelInfo, "Sending stop signal to go routines")
w.state.stopchan <- struct{}{}
w.state.channelsStarted = false
}
if w.state.serverStarted {
elem.Log(CAT, gst.LevelInfo, "Shutting down HTTP server")
w.state.server.Shutdown(context.Background())
w.state.serverStarted = false
}
}
// Just stops the buffer processing routine, but leaves the server running
func (w *websocketSrc) pause(elem *gst.Element) {
w.state.mux.Lock()
defer w.state.mux.Unlock()
elem.Log(CAT, gst.LevelDebug, "Sending stop signal to go routines")
w.state.stopchan <- struct{}{}
w.state.channelsStarted = false
}
// Tears down all resources for the element.
func (w *websocketSrc) unprepare(elem *gst.Element) {
w.state.mux.Lock()
defer w.state.mux.Unlock()
elem.Log(CAT, gst.LevelDebug, "Freeing pads and buffers")
w.state.bufferpool.SetActive(false)
w.state.bufferpool.Unref()
elem.Log(CAT, gst.LevelDebug, "Closing channels and clearing state")
close(w.state.bufferchan)
close(w.state.stopchan)
w.state = &state{}
}
// Sets up a src pad for an element and adds the necessary callbacks.
func (w *websocketSrc) setupSrcPad(elem *gst.Element) {
// Configure the src pad
elem.Log(CAT, gst.LevelDebug, "Configuring the src pad")
w.state.srcpad = gst.NewPadFromTemplate(elem.GetPadTemplates()[0], "src")
elem.AddPad(w.state.srcpad)
// Set a function for handling events
w.state.srcpad.SetEventFunction(func(pad *gst.Pad, parent *gst.Object, event *gst.Event) bool {
var ret bool
pad.Log(CAT, gst.LevelLog, fmt.Sprintf("Handling event: %s", event.Type()))
switch event.Type() {
case gst.EventTypeReconfigure:
ret = true
case gst.EventTypeLatency:
ret = true
default:
ret = false
}
if ret {
pad.Log(CAT, gst.LevelDebug, fmt.Sprintf("Handled event: %s", event.Type()))
} else {
pad.Log(CAT, gst.LevelLog, fmt.Sprintf("Didn't handle event: %s", event.Type()))
}
return ret
})
// Set a query handler for the src pad
w.state.srcpad.SetQueryFunction(func(pad *gst.Pad, parent *gst.Object, query *gst.Query) bool {
var ret bool
pad.Log(CAT, gst.LevelLog, fmt.Sprintf("Handling query: %s", query.Type()))
switch query.Type() {
case gst.QueryLatency:
query.SetLatency(true, 0, gst.ClockTimeNone)
ret = true
case gst.QueryScheduling:
query.SetScheduling(gst.SchedulingFlagSequential, 1, -1, 0)
query.AddSchedulingMode(gst.PadModePush)
ret = true
case gst.QueryCaps:
query.SetCapsResult(gst.NewAnyCaps())
ret = true
default:
ret = false
}
if ret {
pad.Log(CAT, gst.LevelDebug, fmt.Sprintf("Handled query: %s", query.Type()))
} else {
pad.Log(CAT, gst.LevelLog, fmt.Sprintf("Didn't handle query: %s", query.Type()))
}
return ret
})
}
// * ObjectSubclass * //
func (w *websocketSrc) New() glib.GoObjectSubclass {
return &websocketSrc{
settings: defaultSettings(),
state: &state{},
}
}
func (w *websocketSrc) ClassInit(klass *glib.ObjectClass) {
class := gst.ToElementClass(klass)
class.SetMetadata(
"Websocket Src",
"Src/Websocket",
"Write stream from a connection over a websocket server",
"Avi Zimmerman <avi.zimmerman@gmail.com>",
)
class.AddPadTemplate(gst.NewPadTemplate(
"src",
gst.PadDirectionSource,
gst.PadPresenceAlways,
gst.NewAnyCaps(),
))
class.InstallProperties(properties)
}
// * Object * //
func (w *websocketSrc) SetProperty(self *glib.Object, id uint, value *glib.Value) {
prop := properties[id]
switch prop.Name() {
case "address":
val, err := value.GetString()
if err != nil {
gst.ToElement(self).ErrorMessage(gst.DomainLibrary, gst.LibraryErrorFailed,
"Could not get string from GValue",
err.Error(),
)
return
}
w.settings.address = val
case "port":
val, err := value.GoValue()
if err != nil {
gst.ToElement(self).ErrorMessage(gst.DomainLibrary, gst.LibraryErrorFailed,
"Could not get go value from GValue",
err.Error(),
)
return
}
intval, ok := val.(int)
if !ok {
gst.ToElement(self).ErrorMessage(gst.DomainLibrary, gst.LibraryErrorFailed,
fmt.Sprintf("Could not coerce govalue %v to integer", val),
err.Error(),
)
return
}
w.settings.port = intval
case "retrieve-remote-addr":
val, err := value.GoValue()
if err != nil {
gst.ToElement(self).ErrorMessage(gst.DomainLibrary, gst.LibraryErrorFailed,
"Could not get go value from GValue",
err.Error(),
)
return
}
boolval, ok := val.(bool)
if !ok {
gst.ToElement(self).ErrorMessage(gst.DomainLibrary, gst.LibraryErrorFailed,
fmt.Sprintf("Could not coerce govalue %v to bool", val),
err.Error(),
)
return
}
w.settings.retrieveRemoteAddr = boolval
default:
gst.ToElement(self).ErrorMessage(gst.DomainLibrary, gst.LibraryErrorSettings,
fmt.Sprintf("Cannot set invalid property %s", prop.Name()), "")
}
}
func (w *websocketSrc) GetProperty(self *glib.Object, id uint) *glib.Value {
prop := properties[id]
var localVal interface{}
switch prop.Name() {
case "address":
localVal = w.settings.address
case "port":
localVal = w.settings.port
case "retrieve-remote-addr":
localVal = w.settings.retrieveRemoteAddr
default:
gst.ToElement(self).ErrorMessage(gst.DomainLibrary, gst.LibraryErrorSettings,
fmt.Sprintf("Cannot get invalid property %s", prop.Name()), "")
return nil
}
val, err := glib.GValue(localVal)
if err != nil {
gst.ToElement(self).ErrorMessage(gst.DomainLibrary, gst.LibraryErrorFailed,
fmt.Sprintf("Could not convert %v to GValue", localVal),
err.Error(),
)
}
return val
}
func (w *websocketSrc) Constructed(self *glib.Object) {
elem := gst.ToElement(self)
w.setupSrcPad(elem)
}
// * Element * //
func (w *websocketSrc) ChangeState(self *gst.Element, transition gst.StateChange) (ret gst.StateChangeReturn) {
self.Log(CAT, gst.LevelTrace, fmt.Sprintf("Changing state: %s", transition))
ret = gst.StateChangeSuccess
switch transition {
case gst.StateChangeNullToReady:
if err := w.prepare(self); err != nil {
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorFailed, err.Error(), "")
return gst.StateChangeFailure
}
case gst.StateChangePlayingToPaused:
w.pause(self)
case gst.StateChangeReadyToNull:
w.unprepare(self)
}
// Apply the transition to the parent element
if ret = self.ParentChangeState(transition); ret == gst.StateChangeFailure {
return
}
switch transition {
case gst.StateChangeReadyToPaused:
ret = gst.StateChangeNoPreroll
case gst.StateChangePausedToPlaying:
w.start(self)
case gst.StateChangePlayingToPaused:
ret = gst.StateChangeNoPreroll
case gst.StateChangePausedToReady:
w.stop(self)
}
return
}

View File

@@ -20,29 +20,32 @@ import (
"strings"
"time"
"github.com/go-gst/go-glib/glib"
"github.com/go-gst/go-gst/examples"
"github.com/go-gst/go-gst/gst"
"github.com/diamondburned/gotk4/pkg/glib/v2"
"github.com/go-gst/go-gst/pkg/gst"
)
func queries(mainLoop *glib.MainLoop) error {
func queries() error {
if len(os.Args) < 2 {
fmt.Println("USAGE: queries <pipeline>")
os.Exit(1)
}
gst.Init(nil)
gst.Init()
mainLoop := glib.NewMainLoop(glib.MainContextDefault(), false)
// Let GStreamer create a pipeline from the parsed launch syntax on the cli.
pipelineStr := strings.Join(os.Args[1:], " ")
pipeline, err := gst.NewPipelineFromString(pipelineStr)
ret, err := gst.ParseLaunch(pipelineStr)
if err != nil {
return err
}
pipeline := ret.(gst.Binner)
// Get a reference to the pipeline bus
bus := pipeline.GetPipelineBus()
bus := pipeline.Bus()
// Start the pipeline
pipeline.SetState(gst.StatePlaying)
@@ -54,14 +57,14 @@ func queries(mainLoop *glib.MainLoop) error {
// Create a new position query and send it to the pipeline.
// This will traverse all elements in the pipeline, until one feels
// capable of answering the query.
pos := gst.NewPositionQuery(gst.FormatTime)
pos := gst.NewQueryPosition(gst.FormatTime)
if ok := pipeline.Query(pos); !ok {
fmt.Println("Failed to query position from pipeline")
}
// Create a new duration query and send it to the pipeline.
// This will traverse all elements in the pipeline, until one feels
// capable of answering the query.
dur := gst.NewDurationQuery(gst.FormatTime)
dur := gst.NewQueryDuration(gst.FormatTime)
if ok := pipeline.Query(dur); !ok {
fmt.Println("Failed to query duration from pipeline")
}
@@ -77,14 +80,14 @@ func queries(mainLoop *glib.MainLoop) error {
}
}()
bus.AddWatch(func(msg *gst.Message) bool {
bus.AddWatch(0, func(bus *gst.Bus, msg *gst.Message) bool {
switch msg.Type() {
case gst.MessageEOS:
case gst.MessageEos:
mainLoop.Quit()
case gst.MessageError:
gstErr := msg.ParseError()
fmt.Printf("Error from %s: %s\n", msg.Source(), gstErr.Error())
if debug := gstErr.DebugString(); debug != "" {
gstErr, debug := msg.ParseError()
fmt.Printf("Error from %s: %s\n", msg.Src(), gstErr.Error())
if debug != "" {
fmt.Println("go-gst-debug:", debug)
}
mainLoop.Quit()
@@ -100,5 +103,7 @@ func queries(mainLoop *glib.MainLoop) error {
}
func main() {
examples.RunLoop(queries)
if err := queries(); err != nil {
fmt.Println(err)
}
}

View File

@@ -3,11 +3,11 @@ package main
import (
"errors"
"fmt"
"github.com/go-gst/go-glib/glib"
"github.com/go-gst/go-gst/examples"
"github.com/go-gst/go-gst/gst"
"os"
"time"
"github.com/go-gst/go-glib/glib"
"github.com/go-gst/go-gst/pkg/gst"
)
type workflow struct {
@@ -15,58 +15,39 @@ type workflow struct {
}
func (w *workflow) newSrc() {
src, err := gst.NewElementWithName("videotestsrc", "src2")
if err != nil {
fmt.Printf("err %v\n", err)
return
}
src.Set("is-live", true)
src := gst.ElementFactoryMake("videotestsrc", "src2")
src.SetObjectProperty("is-live", true)
w.Add(src)
caps, err := gst.NewElementWithName("capsfilter", "caps2")
if err != nil {
fmt.Printf("err %v\n", err)
return
}
caps.Set("caps", gst.NewCapsFromString("video/x-raw , width=640, height=360"))
caps := gst.ElementFactoryMake("capsfilter", "caps2")
caps.SetObjectProperty("caps", gst.CapsFromString("video/x-raw , width=640, height=360"))
w.Add(caps)
src.Link(caps)
// Get a sink pad on compositor
mixer, err := w.GetElementByName("mixer")
if err != nil {
fmt.Printf("err %v\n", err)
return
}
pad := mixer.GetRequestPad("sink_%u")
pad.SetProperty("xpos", 640)
pad.SetProperty("ypos", 0)
mixer := w.ByName("mixer")
caps.GetStaticPad("src").Link(pad)
pad := mixer.GetRequestPad("sink_%u")
pad.SetObjectProperty("xpos", 640)
pad.SetObjectProperty("ypos", 0)
caps.StaticPad("src").Link(pad)
caps.SyncStateWithParent()
src.SyncStateWithParent()
}
func (w *workflow) delSrc() {
mixer, err := w.GetElementByName("mixer")
if err != nil {
fmt.Printf("err %v\n", err)
return
}
mixer := w.ByName("mixer")
src, err := w.GetElementByName("src2")
if err != nil {
fmt.Printf("err %v\n", err)
return
}
caps, err := w.GetElementByName("caps2")
if err != nil {
fmt.Printf("err %v\n", err)
return
}
pad := mixer.GetStaticPad("sink_1")
src := w.ByName("src2")
caps := w.ByName("caps2")
pad := mixer.StaticPad("sink_1")
if pad == nil {
fmt.Printf("pad is null\n")
return
@@ -81,26 +62,16 @@ func (w *workflow) delSrc() {
}
func createPipeline() (*gst.Pipeline, error) {
gst.Init(nil)
var err error
var w workflow
w.Pipeline, err = gst.NewPipeline("")
gst.Init()
ret, err := gst.ParseLaunch("videotestsrc ! video/x-raw , capsfilter caps=width=640,height=360 name=caps1 ! compositor name=mixer ! autovideosink")
if err != nil {
fmt.Println(err)
os.Exit(2)
}
elements, err := gst.NewElementMany("videotestsrc", "capsfilter", "compositor", "autovideosink")
caps := elements[1]
caps.SetProperty("caps", gst.NewCapsFromString("video/x-raw , width=640, height=360"))
caps.SetProperty("name", "caps1")
mixer := elements[2]
mixer.SetProperty("name", "mixer")
if err != nil {
fmt.Printf("err %v\n", err)
return nil, err
}
w.AddMany(elements...)
gst.ElementLinkMany(elements...)
var w workflow
w.Pipeline = ret.(*gst.Pipeline)
go func() {
time.Sleep(time.Second)
@@ -118,19 +89,19 @@ func runPipeline(loop *glib.MainLoop, pipeline *gst.Pipeline) error {
pipeline.SetState(gst.StatePlaying)
// Add a message watch to the bus to quit on any error
pipeline.GetPipelineBus().AddWatch(func(msg *gst.Message) bool {
pipeline.Bus().AddWatch(0, func(bus *gst.Bus, msg *gst.Message) bool {
var err error
// If the stream has ended or any element posts an error to the
// bus, populate error.
switch msg.Type() {
case gst.MessageEOS:
case gst.MessageEos:
err = errors.New("end-of-stream")
case gst.MessageError:
// The parsed error implements the error interface, but also
// contains additional debug information.
gerr := msg.ParseError()
fmt.Println("go-gst-debug:", gerr.DebugString())
gerr, debug := msg.ParseError()
fmt.Println("go-gst-debug:", debug)
err = gerr
}
@@ -149,11 +120,12 @@ func runPipeline(loop *glib.MainLoop, pipeline *gst.Pipeline) error {
}
func main() {
examples.RunLoop(func(loop *glib.MainLoop) error {
pipeline, err := createPipeline()
if err != nil {
return err
}
return runPipeline(loop, pipeline)
})
loop := glib.NewMainLoop(glib.MainContextDefault(), false)
pipeline, err := createPipeline()
if err != nil {
os.Exit(2)
}
runPipeline(loop, pipeline)
}

View File

@@ -24,30 +24,29 @@ package main
import (
"fmt"
"github.com/go-gst/go-gst/examples"
"github.com/go-gst/go-gst/gst"
coreglib "github.com/diamondburned/gotk4/pkg/core/glib"
"github.com/go-gst/go-gst/pkg/gst"
)
func tagsetter() error {
gst.Init(nil)
gst.Init()
pipeline, err := gst.NewPipelineFromString(
ret, err := gst.ParseLaunch(
"audiotestsrc wave=white-noise num-buffers=10000 ! flacenc ! filesink location=test.flac",
)
if err != nil {
return err
}
pipeline := ret.(*gst.Pipeline)
// Query the pipeline for elements implementing the GstTagsetter interface.
// In our case, this will return the flacenc element.
element, err := pipeline.GetByInterface(gst.InterfaceTagSetter)
if err != nil {
return err
}
element := pipeline.ByInterface(gst.GTypeTagSetter)
// We actually just retrieved a *gst.Element with the above call. We can retrieve
// the underying TagSetter interface like this.
tagsetter := element.TagSetter()
tagsetter := element.(*gst.TagSetter)
// Tell the element implementing the GstTagsetter interface how to handle already existing
// metadata.
@@ -57,14 +56,14 @@ func tagsetter() error {
//
// The first parameter gst.TagMergeAppend tells the tagsetter to append this title
// if there already is one.
tagsetter.AddTagValue(gst.TagMergeAppend, gst.TagTitle, "Special randomized white-noise")
tagsetter.AddTagValue(gst.TagMergeAppend, gst.TAG_TITLE, coreglib.NewValue("Special randomized white-noise"))
pipeline.SetState(gst.StatePlaying)
var cont bool
var pipelineErr error
for {
msg := pipeline.GetPipelineBus().TimedPop(gst.ClockTimeNone)
msg := pipeline.Bus().TimedPop(gst.ClockTimeNone)
if msg == nil {
break
}
@@ -78,18 +77,18 @@ func tagsetter() error {
}
func handleMessage(msg *gst.Message) (bool, error) {
defer msg.Unref()
switch msg.Type() {
case gst.MessageTag:
fmt.Println(msg) // Prirnt our tags
case gst.MessageEOS:
case gst.MessageEos:
return false, nil
case gst.MessageError:
return false, msg.ParseError()
err, _ := msg.ParseError()
return false, err
}
return true, nil
}
func main() {
examples.Run(tagsetter)
}

View File

@@ -17,57 +17,43 @@ import (
"os"
"time"
"github.com/go-gst/go-glib/glib"
"github.com/go-gst/go-gst/examples"
"github.com/go-gst/go-gst/gst"
"github.com/go-gst/go-gst/pkg/gst"
)
func tagsetter(mainLoop *glib.MainLoop) error {
gst.Init(nil)
func tagsetter() error {
gst.Init()
if len(os.Args) < 2 {
return errors.New("usage: toc <file>")
}
pipeline, err := gst.NewPipeline("")
if err != nil {
return err
}
pipeline := gst.NewPipeline("")
src, err := gst.NewElement("filesrc")
if err != nil {
return err
}
decodebin, err := gst.NewElement("decodebin")
if err != nil {
return err
}
src := gst.ElementFactoryMake("filesrc", "")
src.SetProperty("location", os.Args[1])
decodebin := gst.ElementFactoryMake("decodebin", "")
src.SetObjectProperty("location", os.Args[1])
pipeline.AddMany(src, decodebin)
gst.ElementLinkMany(src, decodebin)
gst.LinkMany(src, decodebin)
// Connect to decodebin's pad-added signal, that is emitted whenever it found another stream
// from the input file and found a way to decode it to its raw format.
decodebin.Connect("pad-added", func(_ *gst.Element, srcPad *gst.Pad) {
decodebin.ConnectPadAdded(func(srcPad *gst.Pad) {
// In this example, we are only interested about parsing the ToC, so
// we simply pipe every encountered stream into a fakesink, essentially
// throwing away the data.
elems, err := gst.NewElementMany("queue", "fakesink")
if err != nil {
fmt.Println("Could not create decodebin pipeline")
return
}
pipeline.AddMany(elems...)
gst.ElementLinkMany(elems...)
for _, e := range elems {
e.SyncStateWithParent()
}
queue := gst.ElementFactoryMake("queue", "")
fakesink := gst.ElementFactoryMake("fakesink", "")
queue := elems[0]
sinkPad := queue.GetStaticPad("sink")
pipeline.AddMany(queue, fakesink)
gst.LinkMany(queue, fakesink)
queue.SyncStateWithParent()
fakesink.SyncStateWithParent()
sinkPad := queue.StaticPad("sink")
if sinkPad == nil {
fmt.Println("Could not get static pad from sink")
return
@@ -77,8 +63,8 @@ func tagsetter(mainLoop *glib.MainLoop) error {
Link(sinkPad)
})
if err := pipeline.SetState(gst.StatePaused); err != nil {
return err
if ret := pipeline.BlockSetState(gst.StatePaused, gst.ClockTime(time.Second)); ret != gst.StateChangeSuccess {
return fmt.Errorf("could not change state")
}
// Instead of using the main loop, we manually iterate over GStreamer's bus messages
@@ -87,37 +73,32 @@ func tagsetter(mainLoop *glib.MainLoop) error {
// timed_pop on the bus with the desired timeout for when to stop waiting for new messages.
// (-1 = Wait forever)
for {
msg := pipeline.GetPipelineBus().TimedPop(gst.ClockTimeNone)
msg := pipeline.Bus().TimedPop(gst.ClockTimeNone)
switch msg.Type() {
// When we use this method of popping from the bus (instead of a Watch), we own a
// reference to every message received (this may be abstracted later).
default:
// fmt.Println(msg)
msg.Unref()
// End of stream
case gst.MessageEOS:
msg.Unref()
case gst.MessageEos:
// Errors from any elements
case gst.MessageError:
gerr := msg.ParseError()
if debug := gerr.DebugString(); debug != "" {
gerr, debug := msg.ParseError()
if debug != "" {
fmt.Println("go-gst-debug:", debug)
}
msg.Unref()
return gerr
// Some element found a ToC in the current media stream and told
// us by posting a message to GStreamer's bus.
case gst.MessageTOC:
case gst.MessageToc:
// Parse the toc from the message
toc, updated := msg.ParseTOC()
msg.Unref()
fmt.Printf("Received toc: %s - updated %v\n", toc.GetScope(), updated)
toc, updated := msg.ParseToc()
fmt.Printf("Received toc: %s - updated %v\n", toc.Scope().String(), updated)
// Get a list of tags that are ToC specific.
if tags := toc.GetTags(); tags != nil {
if tags := toc.Tags(); tags != nil {
fmt.Println("- tags:", tags)
}
// ToCs do not have a fixed structure. Depending on the format that
@@ -127,43 +108,41 @@ func tagsetter(mainLoop *glib.MainLoop) error {
// interpreting the ToC manually.
// In this example, we simply want to print the ToC structure, so
// we iterate everything and don't try to interpret anything.
for _, entry := range toc.GetEntries() {
for _, entry := range toc.Entries() {
// Every entry in a ToC has its own type. One type could for
// example be Chapter.
fmt.Printf("\t%s - %s\n", entry.GetEntryTypeString(), entry.GetUID())
fmt.Printf("\t%s - %s\n", entry.EntryType().String(), entry.Uid())
// Every ToC entry can have a set of timestamps (start, stop).
if ok, start, stop := entry.GetStartStopTimes(); ok {
if start, stop, ok := entry.StartStopTimes(); ok {
startDur := time.Duration(start) * time.Nanosecond
stopDur := time.Duration(stop) * time.Nanosecond
fmt.Printf("\t- start: %s, stop: %s\n", startDur, stopDur)
}
// Every ToC entry can have tags to it.
if tags := entry.GetTags(); tags != nil {
if tags := entry.Tags(); tags != nil {
fmt.Println("\t- tags:", tags)
}
// Every ToC entry can have a set of child entries.
// With this structure, you can create trees of arbitrary depth.
for _, subEntry := range entry.GetSubEntries() {
fmt.Printf("\n\t\t%s - %s\n", subEntry.GetEntryTypeString(), subEntry.GetUID())
if ok, start, stop := entry.GetStartStopTimes(); ok {
for _, subEntry := range entry.SubEntries() {
fmt.Printf("\n\t\t%s - %s\n", subEntry.EntryType().String(), subEntry.Uid())
if start, stop, ok := entry.StartStopTimes(); ok {
startDur := time.Duration(start) * time.Nanosecond
stopDur := time.Duration(stop) * time.Nanosecond
fmt.Printf("\t\t- start: %s, stop: %s\n", startDur, stopDur)
}
if tags := entry.GetTags(); tags != nil {
if tags := entry.Tags(); tags != nil {
fmt.Println("\t\t- tags:", tags)
}
}
}
toc.Unref()
}
}
}
func main() {
examples.RunLoop(tagsetter)
tagsetter()
}